website-feeds

Make RSS feeds of your favorite "vote on posts" websites

git clone https://code.pdelong.com/website-feeds.git

  1package pg_store
  2
  3import (
  4	"context"
  5	"errors"
  6	"fmt"
  7	"log/slog"
  8	"time"
  9
 10	"website-feeds/model"
 11
 12	"github.com/jackc/pgx/v5"
 13	"github.com/jackc/pgx/v5/pgxpool"
 14)
 15
// pgStore is a store whose methods run their queries against a pgx
// connection pool.
type pgStore struct {
	// pool is the connection pool used by every method on pgStore.
	pool *pgxpool.Pool
}
 19
// settings is the scan target for a row of the settings table (see the
// Settings method, which collects into it by field name).
//
// The two Reddit credentials are pointers; intoModel treats a nil in either
// one as "no Reddit configuration".
type settings struct {
	RedditClientId, RedditClientSecret *string
	DefaultNumPosts, MaxNumPosts       int
	SiteFetchWait                      time.Duration
}
 27
 28func (s *settings) intoModel() model.Settings {
 29	var reddit *model.RedditSettings
 30	if s.RedditClientId != nil &&
 31		s.RedditClientSecret != nil {
 32		reddit = &model.RedditSettings{
 33			ClientID:     *s.RedditClientId,
 34			ClientSecret: *s.RedditClientSecret,
 35		}
 36	}
 37
 38	return model.Settings{
 39		DefaultNumPosts: s.DefaultNumPosts,
 40		MaxNumPosts:     s.MaxNumPosts,
 41		SiteFetchWait:   s.SiteFetchWait,
 42		Reddit:          reddit,
 43	}
 44}
 45
// site is the scan target for a row of the sites table (SiteByName and
// CreateSite collect into it by field name).
//
// LastFetchedTime and LastFetchedNum are pointers; intoModel only reports a
// last fetch when both are non-nil.
type site struct {
	ID                int
	Name, DisplayName string
	LastFetchedTime   *time.Time
	LastFetchedNum    *int
}
 53
 54func (s *site) intoModel() model.Site {
 55	var lastFetched *model.SiteLastFetched
 56	if s.LastFetchedTime != nil && s.LastFetchedNum != nil {
 57		lastFetched = &model.SiteLastFetched{
 58			Time: *s.LastFetchedTime,
 59			Num:  *s.LastFetchedNum,
 60		}
 61	}
 62
 63	return model.Site{
 64		ID:          s.ID,
 65		Name:        s.Name,
 66		DisplayName: s.DisplayName,
 67		LastFetched: lastFetched,
 68	}
 69}
 70
 71func New(ctx context.Context, url string) (*pgStore, error) {
 72	pool, err := pgxpool.New(ctx, url)
 73	if err != nil {
 74		return nil, fmt.Errorf("failed to create database: %w", err)
 75	}
 76
 77	return newFromPool(ctx, pool)
 78}
 79
 80func newFromPool(ctx context.Context, pool *pgxpool.Pool) (*pgStore, error) {
 81	if err := pool.Ping(ctx); err != nil {
 82		return nil, fmt.Errorf("failed to ping database: %w", err)
 83	}
 84
 85	s := &pgStore{pool}
 86	if err := s.updateSchema(ctx); err != nil {
 87		return nil, fmt.Errorf("failed to update database schema: %w", err)
 88	}
 89
 90	return s, nil
 91}
 92
// Close closes the store's underlying connection pool.
func (s *pgStore) Close() {
	s.pool.Close()
}
 96
 97func (s *pgStore) SiteByName(
 98	ctx context.Context,
 99	name string,
100) (*model.Site, error) {
101	rows, err := s.pool.Query(
102		ctx,
103		`SELECT id, name, display_name, last_fetched_time, last_fetched_num
104		 FROM sites
105		 WHERE name = $1`,
106		name)
107	if err != nil {
108		return nil, fmt.Errorf("while querying site by name: %w", err)
109	}
110
111	site, err := pgx.CollectExactlyOneRow(rows, pgx.RowToStructByName[site])
112	if err != nil {
113		return nil, fmt.Errorf("while collecting site: %w", err)
114	}
115
116	m := site.intoModel()
117
118	return &m, nil
119}
120
121func (s *pgStore) WriteSitePosts(
122	ctx context.Context,
123	siteID int,
124	posts []model.Post,
125	lastFetched model.SiteLastFetched,
126) error {
127	tx, err := s.pool.Begin(ctx)
128	if err != nil {
129		return fmt.Errorf("failed to begin transaction: %w", err)
130	}
131	defer func() {
132		if err := tx.Rollback(ctx); err != nil &&
133			!errors.Is(err, pgx.ErrTxClosed) {
134			slog.Warn(
135				"Failed to roll back writing posts transaction",
136				slog.Any("err", err),
137			)
138		}
139	}()
140
141	_, err = tx.Exec(
142		ctx,
143		`UPDATE sites
144		 SET last_fetched_time = $2,
145		     last_fetched_num = $3
146		 WHERE id = $1
147		   AND (last_fetched_time < $2 OR last_fetched_time IS NULL)`,
148		siteID,
149		lastFetched.Time,
150		lastFetched.Num,
151	)
152	if err != nil {
153		return fmt.Errorf("faled to update site fetched time: %w", err)
154	}
155
156	for _, post := range posts {
157		_, err = tx.Exec(
158			ctx,
159			`INSERT INTO posts (site_unique_id, site_id, title, score, created,
160			     url, last_fetched)
161			     VALUES ($1, $2, $3, $4, $5, $6, $7)
162			 ON CONFLICT (site_unique_id, site_id)
163			     DO UPDATE SET
164			         score = $4, title = $3, last_fetched = $7
165			     WHERE
166			         posts.last_fetched < $7`,
167			post.SiteUniqueID,
168			siteID,
169			post.Title,
170			post.Score,
171			post.Created,
172			post.URL,
173			lastFetched.Time,
174		)
175		if err != nil {
176			return fmt.Errorf("failed to update post: %w", err)
177		}
178	}
179
180	return tx.Commit(ctx)
181}
182
183func (s *pgStore) TopPostsForSite(
184	ctx context.Context,
185	siteID int,
186	now time.Time,
187	numPosts int,
188) ([]model.Post, error) {
189	rows, err := s.pool.Query(
190		ctx,
191		`SELECT site_unique_id, title, score, created, url
192		 FROM (
193		     SELECT *
194		     FROM posts
195		     INNER JOIN sites ON posts.site_id = sites.id
196		     WHERE sites.id = $1
197		       AND created > $2::timestamptz - interval '8 days'
198		       AND created < $2::timestamptz - interval '1 days'
199		     ORDER BY score DESC
200		     LIMIT $3
201		 )
202		 ORDER BY created ASC`,
203		siteID,
204		now,
205		numPosts,
206	)
207	if err != nil {
208		return nil, fmt.Errorf("unable to start query: %w", err)
209	}
210
211	posts, err := pgx.CollectRows(rows, pgx.RowToStructByName[model.Post])
212	if err != nil {
213		return nil, fmt.Errorf("unable to collect rows: %s", err)
214	}
215
216	return posts, nil
217}
218
219func (s *pgStore) CreateSite(
220	ctx context.Context,
221	name, displayName string,
222) (*model.Site, error) {
223	rows, err := s.pool.Query(
224		ctx,
225		`INSERT INTO sites (name, display_name)
226		 VALUES ($1, $2)
227		 RETURNING id, name, display_name, last_fetched_time, last_fetched_num`,
228		name,
229		displayName,
230	)
231	if err != nil {
232		return nil, fmt.Errorf("while creating site: %w", err)
233	}
234
235	site, err := pgx.CollectExactlyOneRow(rows, pgx.RowToStructByName[site])
236	if err != nil {
237		return nil, fmt.Errorf("while collecting created site: %w", err)
238	}
239
240	m := site.intoModel()
241
242	return &m, nil
243}
244
245func (s *pgStore) Settings(ctx context.Context) (*model.Settings, error) {
246	rows, err := s.pool.Query(
247		ctx,
248		`SELECT reddit_client_id, reddit_client_secret, default_num_posts, max_num_posts, site_fetch_wait FROM settings`,
249	)
250	if err != nil {
251		return nil, fmt.Errorf("while querying for settings: %w", err)
252	}
253
254	settings, err := pgx.CollectExactlyOneRow(
255		rows,
256		pgx.RowToStructByName[settings],
257	)
258	if err != nil {
259		return nil, fmt.Errorf("while collecting settings: %w", err)
260	}
261
262	m := settings.intoModel()
263
264	return &m, nil
265}
266
267func (s *pgStore) UpdateSettings(
268	ctx context.Context,
269	settings *model.Settings,
270) error {
271	var redditClientID *string
272	var redditClientSecret *string
273	if settings.Reddit != nil {
274		redditClientID = &settings.Reddit.ClientID
275		redditClientSecret = &settings.Reddit.ClientSecret
276	}
277
278	_, err := s.pool.Exec(
279		ctx,
280		`UPDATE settings
281		 SET reddit_client_id = $1,
282		     reddit_client_secret = $2,
283		     default_num_posts = $3,
284		     max_num_posts = $4,
285		     site_fetch_wait = $5`,
286		redditClientID,
287		redditClientSecret,
288		settings.DefaultNumPosts,
289		settings.MaxNumPosts,
290		settings.SiteFetchWait,
291	)
292
293	return err
294}
295
296func (s *pgStore) WriteRequest(ctx context.Context, r *model.Request) error {
297	_, err := s.pool.Exec(
298		ctx,
299		`INSERT INTO requests (host, site_id, num_posts, default_num_posts, timestamp)
300		 VALUES ($1, $2, $3, $4, $5)`,
301		r.Host,
302		r.SiteId,
303		r.NumPosts,
304		r.DefaultNumPosts,
305		r.Timestamp,
306	)
307	if err != nil {
308		return fmt.Errorf("while inserting request: %w", err)
309	}
310
311	return nil
312}
313
314func (s *pgStore) NumFetchPostsForSite(
315	ctx context.Context,
316	siteID int,
317	now time.Time,
318) (int, error) {
319	rows, err := s.pool.Query(
320		ctx,
321		`SELECT MAX(num_posts)
322		 FROM requests
323		 WHERE site_id = $1
324		   AND timestamp > $2::timestamptz - interval '1 day'`,
325		siteID,
326		now,
327	)
328	if err != nil {
329		return -1, fmt.Errorf("while querying requests: %w", err)
330	}
331
332	numFetchPosts, err := pgx.CollectExactlyOneRow(rows, pgx.RowTo[int])
333	if err != nil {
334		return -1, fmt.Errorf("while collecting numFetchPosts: %w", err)
335	}
336
337	return numFetchPosts, nil
338}