1package pg_store23import (4 "context"5 "errors"6 "fmt"7 "log/slog"8 "time"910 "website-feeds/model"1112 "github.com/jackc/pgx/v5"13 "github.com/jackc/pgx/v5/pgxpool"14)1516type pgStore struct {17 pool *pgxpool.Pool18}1920type settings struct {21 RedditClientId *string22 RedditClientSecret *string23 DefaultNumPosts int24 MaxNumPosts int25 SiteFetchWait time.Duration26}2728func (s *settings) intoModel() model.Settings {29 var reddit *model.RedditSettings30 if s.RedditClientId != nil &&31 s.RedditClientSecret != nil {32 reddit = &model.RedditSettings{33 ClientID: *s.RedditClientId,34 ClientSecret: *s.RedditClientSecret,35 }36 }3738 return model.Settings{39 DefaultNumPosts: s.DefaultNumPosts,40 MaxNumPosts: s.MaxNumPosts,41 SiteFetchWait: s.SiteFetchWait,42 Reddit: reddit,43 }44}4546type site struct {47 ID int48 Name string49 DisplayName string50 LastFetchedTime *time.Time51 LastFetchedNum *int52}5354func (s *site) intoModel() model.Site {55 var lastFetched *model.SiteLastFetched56 if s.LastFetchedTime != nil && s.LastFetchedNum != nil {57 lastFetched = &model.SiteLastFetched{58 Time: *s.LastFetchedTime,59 Num: *s.LastFetchedNum,60 }61 }6263 return model.Site{64 ID: s.ID,65 Name: s.Name,66 DisplayName: s.DisplayName,67 LastFetched: lastFetched,68 }69}7071func New(ctx context.Context, url string) (*pgStore, error) {72 pool, err := pgxpool.New(ctx, url)73 if err != nil {74 return nil, fmt.Errorf("failed to create database: %w", err)75 }7677 return newFromPool(ctx, pool)78}7980func newFromPool(ctx context.Context, pool *pgxpool.Pool) (*pgStore, error) {81 if err := pool.Ping(ctx); err != nil {82 return nil, fmt.Errorf("failed to ping database: %w", err)83 }8485 s := &pgStore{pool}86 if err := s.updateSchema(ctx); err != nil {87 return nil, fmt.Errorf("failed to update database schema: %w", err)88 }8990 return s, nil91}9293func (s *pgStore) Close() {94 s.pool.Close()95}9697func (s *pgStore) SiteByName(98 ctx context.Context,99 name string,100) (*model.Site, error) {101 rows, err := s.pool.Query(102 ctx,103 `SELECT id, name, display_name, last_fetched_time, last_fetched_num104 FROM sites105 WHERE name = $1`,106 name)107 if err != nil {108 return nil, fmt.Errorf("while querying site by name: %w", err)109 }110111 site, err := pgx.CollectExactlyOneRow(rows, pgx.RowToStructByName[site])112 if err != nil {113 return nil, fmt.Errorf("while collecting site: %w", err)114 }115116 m := site.intoModel()117118 return &m, nil119}120121func (s *pgStore) WriteSitePosts(122 ctx context.Context,123 siteID int,124 posts []model.Post,125 lastFetched model.SiteLastFetched,126) error {127 tx, err := s.pool.Begin(ctx)128 if err != nil {129 return fmt.Errorf("failed to begin transaction: %w", err)130 }131 defer func() {132 if err := tx.Rollback(ctx); err != nil &&133 !errors.Is(err, pgx.ErrTxClosed) {134 slog.Warn(135 "Failed to roll back writing posts transaction",136 slog.Any("err", err),137 )138 }139 }()140141 _, err = tx.Exec(142 ctx,143 `UPDATE sites144 SET last_fetched_time = $2,145 last_fetched_num = $3146 WHERE id = $1147 AND (last_fetched_time < $2 OR last_fetched_time IS NULL)`,148 siteID,149 lastFetched.Time,150 lastFetched.Num,151 )152 if err != nil {153 return fmt.Errorf("faled to update site fetched time: %w", err)154 }155156 for _, post := range posts {157 _, err = tx.Exec(158 ctx,159 `INSERT INTO posts (site_unique_id, site_id, title, score, created,160 url, last_fetched)161 VALUES ($1, $2, $3, $4, $5, $6, $7)162 ON CONFLICT (site_unique_id, site_id)163 DO UPDATE SET164 score = $4, title = $3, last_fetched = $7165 WHERE166 posts.last_fetched < $7`,167 post.SiteUniqueID,168 siteID,169 post.Title,170 post.Score,171 post.Created,172 post.URL,173 lastFetched.Time,174 )175 if err != nil {176 return fmt.Errorf("failed to update post: %w", err)177 }178 }179180 return tx.Commit(ctx)181}182183func (s *pgStore) TopPostsForSite(184 ctx context.Context,185 siteID int,186 now time.Time,187 numPosts int,188) ([]model.Post, error) {189 rows, err := s.pool.Query(190 ctx,191 `SELECT site_unique_id, title, score, created, url192 FROM (193 SELECT *194 FROM posts195 INNER JOIN sites ON posts.site_id = sites.id196 WHERE sites.id = $1197 AND created > $2::timestamptz - interval '8 days'198 AND created < $2::timestamptz - interval '1 days'199 ORDER BY score DESC200 LIMIT $3201 )202 ORDER BY created ASC`,203 siteID,204 now,205 numPosts,206 )207 if err != nil {208 return nil, fmt.Errorf("unable to start query: %w", err)209 }210211 posts, err := pgx.CollectRows(rows, pgx.RowToStructByName[model.Post])212 if err != nil {213 return nil, fmt.Errorf("unable to collect rows: %s", err)214 }215216 return posts, nil217}218219func (s *pgStore) CreateSite(220 ctx context.Context,221 name, displayName string,222) (*model.Site, error) {223 rows, err := s.pool.Query(224 ctx,225 `INSERT INTO sites (name, display_name)226 VALUES ($1, $2)227 RETURNING id, name, display_name, last_fetched_time, last_fetched_num`,228 name,229 displayName,230 )231 if err != nil {232 return nil, fmt.Errorf("while creating site: %w", err)233 }234235 site, err := pgx.CollectExactlyOneRow(rows, pgx.RowToStructByName[site])236 if err != nil {237 return nil, fmt.Errorf("while collecting created site: %w", err)238 }239240 m := site.intoModel()241242 return &m, nil243}244245func (s *pgStore) Settings(ctx context.Context) (*model.Settings, error) {246 rows, err := s.pool.Query(247 ctx,248 `SELECT reddit_client_id, reddit_client_secret, default_num_posts, max_num_posts, site_fetch_wait FROM settings`,249 )250 if err != nil {251 return nil, fmt.Errorf("while querying for settings: %w", err)252 }253254 settings, err := pgx.CollectExactlyOneRow(255 rows,256 pgx.RowToStructByName[settings],257 )258 if err != nil {259 return nil, fmt.Errorf("while collecting settings: %w", err)260 }261262 m := settings.intoModel()263264 return &m, nil265}266267func (s *pgStore) UpdateSettings(268 ctx context.Context,269 settings *model.Settings,270) error {271 var redditClientID *string272 var redditClientSecret *string273 if settings.Reddit != nil {274 redditClientID = &settings.Reddit.ClientID275 redditClientSecret = &settings.Reddit.ClientSecret276 }277278 _, err := s.pool.Exec(279 ctx,280 `UPDATE settings281 SET reddit_client_id = $1,282 reddit_client_secret = $2,283 default_num_posts = $3,284 max_num_posts = $4,285 site_fetch_wait = $5`,286 redditClientID,287 redditClientSecret,288 settings.DefaultNumPosts,289 settings.MaxNumPosts,290 settings.SiteFetchWait,291 )292293 return err294}295296func (s *pgStore) WriteRequest(ctx context.Context, r *model.Request) error {297 _, err := s.pool.Exec(298 ctx,299 `INSERT INTO requests (host, site_id, num_posts, default_num_posts, timestamp)300 VALUES ($1, $2, $3, $4, $5)`,301 r.Host,302 r.SiteId,303 r.NumPosts,304 r.DefaultNumPosts,305 r.Timestamp,306 )307 if err != nil {308 return fmt.Errorf("while inserting request: %w", err)309 }310311 return nil312}313314func (s *pgStore) NumFetchPostsForSite(315 ctx context.Context,316 siteID int,317 now time.Time,318) (int, error) {319 rows, err := s.pool.Query(320 ctx,321 `SELECT MAX(num_posts)322 FROM requests323 WHERE site_id = $1324 AND timestamp > $2::timestamptz - interval '1 day'`,325 siteID,326 now,327 )328 if err != nil {329 return -1, fmt.Errorf("while querying requests: %w", err)330 }331332 numFetchPosts, err := pgx.CollectExactlyOneRow(rows, pgx.RowTo[int])333 if err != nil {334 return -1, fmt.Errorf("while collecting numFetchPosts: %w", err)335 }336337 return numFetchPosts, nil338}