website-feeds

Make RSS feeds of your favorite "vote on posts" websites

git clone https://code.pdelong.com/website-feeds.git

  1package pg_store
  2
  3import (
  4	"context"
  5	_ "embed"
  6	"errors"
  7	"fmt"
  8	"log/slog"
  9
 10	"github.com/jackc/pgx/v5"
 11)
 12
 13type updater func(context.Context, pgx.Tx) error
 14
 15func updaterSql(update string) updater {
 16	return func(ctx context.Context, tx pgx.Tx) error {
 17		_, err := tx.Exec(ctx, string(update))
 18		return err
 19	}
 20}
 21
 22//go:embed "schema_files/1.sql"
 23var baseSql string
 24
 25var updates = []updater{
 26	updaterSql(baseSql),
 27}
 28
 29// UpdateSchema updates the database's schema to the latest version.
 30func (s *pgStore) updateSchema(ctx context.Context) error {
 31	tx, err := s.pool.Begin(ctx)
 32	if err != nil {
 33		return fmt.Errorf("failed to start transaction: %w", err)
 34	}
 35
 36	defer func() {
 37		if err := tx.Rollback(ctx); err != nil &&
 38			!errors.Is(err, pgx.ErrTxClosed) {
 39			slog.Error(
 40				"Failed to roll back schema update transaction",
 41				slog.Any("err", err),
 42			)
 43		}
 44	}()
 45
 46	currentVersion, err := currentSchemaVersion(ctx, tx)
 47	if err != nil {
 48		return fmt.Errorf("while querying current schema version: %w", err)
 49	}
 50
 51	maxSchemaVersion := len(updates)
 52	if currentVersion == maxSchemaVersion {
 53		slog.Info(
 54			"DB is up-to-date",
 55			slog.Any("curr", currentVersion),
 56		)
 57		return nil
 58	} else if currentVersion > maxSchemaVersion {
 59		return fmt.Errorf(
 60			"current database version is too new: %d vs %d",
 61			currentVersion,
 62			maxSchemaVersion,
 63		)
 64	} else {
 65		slog.Info(
 66			"DB version is out-of-date. Updating",
 67			slog.Any("curr", currentVersion),
 68			slog.Any("latest", maxSchemaVersion),
 69		)
 70	}
 71
 72	for i := currentVersion; i < maxSchemaVersion; i++ {
 73		fun := updates[i]
 74		slog.Info(
 75			"(PROVISIONAL) Updating schema version",
 76			slog.Any("curr", i),
 77			slog.Any("next", i+1),
 78		)
 79
 80		if err := fun(ctx, tx); err != nil {
 81			return fmt.Errorf(
 82				"failed to update schema from version %d to %d: %w",
 83				i,
 84				i+1,
 85				err,
 86			)
 87		}
 88
 89	}
 90
 91	_, err = tx.Exec(ctx, `DELETE FROM schema_version`)
 92	if err != nil {
 93		return fmt.Errorf("failed to clear schema_version table: %w", err)
 94	}
 95
 96	_, err = tx.Exec(
 97		ctx,
 98		`INSERT INTO schema_version(version) VALUES ($1)`,
 99		maxSchemaVersion,
100	)
101	if err != nil {
102		return fmt.Errorf("failed to update schema_version table: %w", err)
103	}
104
105	err = tx.Commit(ctx)
106	if err != nil {
107		return fmt.Errorf("failed to commit: %w", err)
108	}
109
110	slog.Info("Successfully updated schema")
111
112	return nil
113}
114
115func currentSchemaVersion(ctx context.Context, tx pgx.Tx) (int, error) {
116	tableExists := false
117	err := tx.QueryRow(
118		ctx,
119		`SELECT
120     		COALESCE((
121         		SELECT
122             		TRUE
123         		FROM information_schema.tables
124         		WHERE
125             		table_name = 'schema_version'), FALSE)`,
126	).Scan(&tableExists)
127	if err != nil {
128		return -1, err
129	}
130
131	if !tableExists {
132		return 0, nil
133	}
134
135	currentVersion := -1
136	err = tx.QueryRow(
137		ctx,
138		`SELECT
139     		COALESCE((
140         		SELECT
141             		MAX(version)
142         		FROM schema_version), 0)`,
143	).Scan(&currentVersion)
144	if err != nil {
145		return -1, err
146	}
147
148	return currentVersion, nil
149}