1package pg_store23import (4 "context"5 _ "embed"6 "errors"7 "fmt"8 "log/slog"910 "github.com/jackc/pgx/v5"11)1213type updater func(context.Context, pgx.Tx) error1415func updaterSql(update string) updater {16 return func(ctx context.Context, tx pgx.Tx) error {17 _, err := tx.Exec(ctx, string(update))18 return err19 }20}2122//go:embed "schema_files/1.sql"23var baseSql string2425var updates = []updater{26 updaterSql(baseSql),27}2829// UpdateSchema updates the database's schema to the latest version.30func (s *pgStore) updateSchema(ctx context.Context) error {31 tx, err := s.pool.Begin(ctx)32 if err != nil {33 return fmt.Errorf("failed to start transaction: %w", err)34 }3536 defer func() {37 if err := tx.Rollback(ctx); err != nil &&38 !errors.Is(err, pgx.ErrTxClosed) {39 slog.Error(40 "Failed to roll back schema update transaction",41 slog.Any("err", err),42 )43 }44 }()4546 currentVersion, err := currentSchemaVersion(ctx, tx)47 if err != nil {48 return fmt.Errorf("while querying current schema version: %w", err)49 }5051 maxSchemaVersion := len(updates)52 if currentVersion == maxSchemaVersion {53 slog.Info(54 "DB is up-to-date",55 slog.Any("curr", currentVersion),56 )57 return nil58 } else if currentVersion > maxSchemaVersion {59 return fmt.Errorf(60 "current database version is too new: %d vs %d",61 currentVersion,62 maxSchemaVersion,63 )64 } else {65 slog.Info(66 "DB version is out-of-date. Updating",67 slog.Any("curr", currentVersion),68 slog.Any("latest", maxSchemaVersion),69 )70 }7172 for i := currentVersion; i < maxSchemaVersion; i++ {73 fun := updates[i]74 slog.Info(75 "(PROVISIONAL) Updating schema version",76 slog.Any("curr", i),77 slog.Any("next", i+1),78 )7980 if err := fun(ctx, tx); err != nil {81 return fmt.Errorf(82 "failed to update schema from version %d to %d: %w",83 i,84 i+1,85 err,86 )87 }8889 }9091 _, err = tx.Exec(ctx, `DELETE FROM schema_version`)92 if err != nil {93 return fmt.Errorf("failed to clear schema_version table: %w", err)94 }9596 _, err = tx.Exec(97 ctx,98 `INSERT INTO schema_version(version) VALUES ($1)`,99 maxSchemaVersion,100 )101 if err != nil {102 return fmt.Errorf("failed to update schema_version table: %w", err)103 }104105 err = tx.Commit(ctx)106 if err != nil {107 return fmt.Errorf("failed to commit: %w", err)108 }109110 slog.Info("Successfully updated schema")111112 return nil113}114115func currentSchemaVersion(ctx context.Context, tx pgx.Tx) (int, error) {116 tableExists := false117 err := tx.QueryRow(118 ctx,119 `SELECT120 COALESCE((121 SELECT122 TRUE123 FROM information_schema.tables124 WHERE125 table_name = 'schema_version'), FALSE)`,126 ).Scan(&tableExists)127 if err != nil {128 return -1, err129 }130131 if !tableExists {132 return 0, nil133 }134135 currentVersion := -1136 err = tx.QueryRow(137 ctx,138 `SELECT139 COALESCE((140 SELECT141 MAX(version)142 FROM schema_version), 0)`,143 ).Scan(¤tVersion)144 if err != nil {145 return -1, err146 }147148 return currentVersion, nil149}