diff --git a/cli/serve.go b/cli/serve.go index 0cc100a0..3e5248fc 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -10,7 +10,7 @@ import ( "github.com/goto/entropy/core" "github.com/goto/entropy/core/module" entropyserver "github.com/goto/entropy/internal/server" - "github.com/goto/entropy/internal/store/postgres" + "github.com/goto/entropy/internal/store/pgsql" "github.com/goto/entropy/modules" "github.com/goto/entropy/modules/firehose" "github.com/goto/entropy/modules/job" @@ -96,8 +96,8 @@ func setupRegistry() module.Registry { return registry } -func setupStorage(pgConStr string, syncCfg syncerConf) *postgres.Store { - store, err := postgres.Open(pgConStr, syncCfg.RefreshInterval, syncCfg.ExtendLockBy) +func setupStorage(pgConStr string, syncCfg syncerConf) *pgsql.Store { + store, err := pgsql.Open(pgConStr, syncCfg.RefreshInterval, syncCfg.ExtendLockBy) if err != nil { zap.L().Fatal("failed to connect to Postgres database", zap.Error(err), zap.String("conn_str", pgConStr)) diff --git a/go.mod b/go.mod index d6e708b6..9418e82c 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/goto/salt v0.3.3 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 + github.com/jackc/pgx/v5 v5.4.3 github.com/jmoiron/sqlx v1.3.5 github.com/lib/pq v1.10.9 github.com/mcuadros/go-defaults v1.2.0 @@ -202,6 +203,8 @@ require ( github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/leodido/go-urn v1.2.4 // indirect github.com/newrelic/csec-go-agent v0.4.0 // indirect github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1 // indirect diff --git a/go.sum b/go.sum index 4fa2af5f..e8c93632 100644 --- a/go.sum +++ b/go.sum @@ -912,6 +912,7 @@ github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfG github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= @@ -923,6 +924,8 @@ github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX github.com/jackc/pgproto3/v2 v2.0.7/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= @@ -937,6 +940,8 @@ github.com/jackc/pgx/v4 v4.5.0/go.mod h1:EpAKPLdnTorwmPUUsqrPxy5fphV18j9q3wrfRXg github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6fOLDxqtlyhe9UWgfIi9R8+8v8GKV5TRA/o= github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg= github.com/jackc/pgx/v4 v4.10.1/go.mod h1:QlrWebbs3kqEZPHCTGyxecvzG6tvIsYu+A5b1raylkA= +github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY= +github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= diff --git a/internal/store/pgsql/modules.go b/internal/store/pgsql/modules.go new file mode 100644 index 00000000..4f749518 --- /dev/null +++ b/internal/store/pgsql/modules.go @@ -0,0 +1,77 @@ +package pgsql + +import ( + "context" + + "github.com/goto/entropy/core/module" + "github.com/goto/entropy/internal/store/pgsql/queries" +) + +func (st *Store) GetModule(ctx context.Context, urn string) (*module.Module, error) { + mod, err := st.qu.GetModuleByURN(ctx, urn) + if err != nil { + return nil, translateSQLErr(err) + } + + return &module.Module{ + URN: mod.Urn, + Name: mod.Name, + Project: mod.Project, + Configs: mod.Configs, + CreatedAt: mod.CreatedAt.Time, + UpdatedAt: mod.UpdatedAt.Time, + }, nil +} + +func (st *Store) ListModules(ctx context.Context, project string) ([]module.Module, error) { + mods, err := st.qu.ListAllModulesForProject(ctx, project) + if err != nil { + return nil, translateSQLErr(err) + } + + var modules []module.Module + for _, mod := range mods { + modules = append(modules, module.Module{ + URN: mod.Urn, + Name: mod.Name, + Project: mod.Project, + Configs: mod.Configs, + CreatedAt: mod.CreatedAt.Time, + UpdatedAt: mod.UpdatedAt.Time, + }) + } + + return modules, nil +} + +func (st *Store) CreateModule(ctx context.Context, m module.Module) error { + params := queries.InsertModuleParams{ + Urn: m.URN, + Project: m.Project, + Name: m.Name, + Configs: m.Configs, + } + + if err := st.qu.InsertModule(ctx, params); err != nil { + return translateSQLErr(err) + } + return nil +} + +func (st *Store) UpdateModule(ctx context.Context, m module.Module) error { + params := queries.UpdateModuleParams{ + Urn: m.URN, + Configs: m.Configs, + } + if err := st.qu.UpdateModule(ctx, params); err != nil { + return translateSQLErr(err) + } + return nil +} + +func (st *Store) DeleteModule(ctx context.Context, urn string) error { + if err := st.qu.DeleteModule(ctx, urn); err != nil { + return translateSQLErr(err) + } + return nil +} diff --git a/internal/store/pgsql/pgsql.go b/internal/store/pgsql/pgsql.go new file mode 100644 index 00000000..230c3eb1 --- /dev/null +++ b/internal/store/pgsql/pgsql.go @@ -0,0 +1,132 @@ +package pgsql + +import ( + "context" + "database/sql" + _ "embed" + "fmt" + "strings" + "time" + + "github.com/jackc/pgx/v5" + _ "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/internal/store/pgsql/queries" + "github.com/goto/entropy/pkg/errors" +) + +//go:generate sqlc generate + +// schema represents the storage schema. +// Note: Update the constants above if the table name is changed. +// +//go:embed schema.sql +var schema string + +type Store struct { + qu *queries.Queries + pgx *pgx.Conn + extendInterval time.Duration + refreshInterval time.Duration +} + +func (st *Store) Migrate(ctx context.Context) error { + err := st.qu.Migrate(ctx, schema) + return err +} + +func (st *Store) Close() error { return st.pgx.Close(context.Background()) } + +// Open returns store instance backed by PostgresQL. +func Open(conStr string, refreshInterval, extendInterval time.Duration) (*Store, error) { + conn, err := pgx.Connect(context.Background(), conStr) + if err != nil { + return nil, err + } else if err := conn.Ping(context.Background()); err != nil { + _ = conn.Close(context.Background()) + return nil, err + } + + if refreshInterval >= extendInterval { + return nil, errors.New("refreshInterval must be lower than extendInterval") + } + + return &Store{ + qu: queries.New(conn), + pgx: conn, + extendInterval: extendInterval, + refreshInterval: refreshInterval, + }, nil +} + +func translateSQLErr(err error) error { + if errors.Is(err, sql.ErrNoRows) { + return errors.ErrNotFound.WithCausef(err.Error()) + } + + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + // Refer http://www.postgresql.org/docs/9.3/static/errcodes-appendix.html + switch pgErr.Code { + case "unique_violation": + return errors.ErrConflict.WithCausef(err.Error()) + + default: + return errors.ErrInternal.WithCausef(err.Error()) + } + } + + return err +} + +func tagsToLabelMap(tags []string) map[string]string { + const keyValueParts = 2 + + labels := map[string]string{} + for _, tag := range tags { + parts := strings.SplitN(tag, "=", keyValueParts) + key, val := parts[0], parts[1] + labels[key] = val + } + return labels +} + +func labelToTag(k, v string) string { + return fmt.Sprintf("%s=%s", k, v) +} + +type TxFunc func(ctx context.Context, tx pgx.Tx) error + +func withinTx(ctx context.Context, db *pgx.Conn, readOnly bool, fns ...TxFunc) error { + var opts pgx.TxOptions + if readOnly { + opts.AccessMode = pgx.ReadOnly + } else { + opts.AccessMode = pgx.ReadWrite + } + + tx, err := db.BeginTx(ctx, opts) + if err != nil { + return err + } + + for _, fn := range fns { + if err := fn(ctx, tx); err != nil { + _ = tx.Rollback(ctx) + return err + } + } + + return tx.Commit(ctx) +} + +func runAllHooks(ctx context.Context, hooks []resource.MutationHook) error { + for _, hook := range hooks { + if err := hook(ctx); err != nil { + return err + } + } + return nil +} diff --git a/internal/store/pgsql/queries.sql b/internal/store/pgsql/queries.sql new file mode 100644 index 00000000..82ab1fac --- /dev/null +++ b/internal/store/pgsql/queries.sql @@ -0,0 +1,126 @@ +-- =================== Modules =================== + +-- name: GetModuleByURN :one +SELECT * +FROM modules +WHERE urn = $1; + +-- name: ListAllModulesForProject :many +SELECT * +FROM modules +WHERE project = $1; + +-- name: InsertModule :exec +INSERT INTO modules (urn, project, name, configs) +VALUES ($1, $2, $3, $4); + +-- name: UpdateModule :exec +UPDATE modules +SET configs = $2, + updated_at = current_timestamp +WHERE urn = $1; + +-- name: DeleteModule :exec +DELETE +FROM modules +WHERE urn = $1; + +-- =================== Resources =================== + +-- name: GetResourceByURN :one +SELECT r.*, + array_agg(rt.tag)::text[] AS tags, + jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies +FROM resources r + LEFT JOIN resource_tags rt ON r.id = rt.resource_id + LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id + LEFT JOIN resources d ON rd.depends_on = d.id +WHERE r.urn = $1 +GROUP BY r.id; + +-- name: GetResourceDependencies :one +SELECT jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies +FROM resources r + LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id + LEFT JOIN resources d ON rd.depends_on = d.id +WHERE r.urn = $1 +GROUP BY r.id; + +-- name: ListResourceURNsByFilter :many +SELECT r.*, + array_agg(rt.tag)::text[] AS tags, + jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies +FROM resources r + LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id + LEFT JOIN resources d ON rd.depends_on = d.id + LEFT JOIN resource_tags rt ON r.id = rt.resource_id +WHERE (sqlc.narg('project')::text IS NULL OR r.project = sqlc.narg('project')) + AND (sqlc.narg('kind')::text IS NULL OR r.kind = sqlc.narg('kind')) +GROUP BY r.id; + +-- name: DeleteResourceDependenciesByURN :exec +DELETE +FROM resource_dependencies +WHERE resource_id = (SELECT id FROM resources WHERE urn = $1); + +-- name: DeleteResourceTagsByURN :exec +DELETE +FROM resource_tags +WHERE resource_id = (SELECT id FROM resources WHERE urn = $1); + +-- name: DeleteResourceByURN :exec +DELETE +FROM resources +WHERE urn = $1; + +-- name: InsertResource :one +INSERT INTO resources ("urn", "kind", "project", "name", "created_at", "updated_at", "created_by", "updated_by", + "spec_configs", "state_status", "state_output", "state_module_data", + "state_next_sync", "state_sync_result") +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) +RETURNING id; + +-- name: FetchResourceForSync :one +SELECT urn FROM resources +Where state_next_sync <= current_timestamp +FOR UPDATE SKIP LOCKED; + +-- name: ExtendWaitTime :exec +UPDATE resources +SET state_next_sync = current_timestamp + ($2 ||' seconds')::interval +WHERE urn = $1; + +-- name: InsertResourceTags :copyfrom +INSERT INTO resource_tags (resource_id, tag) +VALUES ($1, $2); + +-- name: InsertResourceDependency :exec +INSERT INTO resource_dependencies (resource_id, dependency_key, depends_on) +VALUES ($1, $2, (SELECT id FROM resources WHERE urn = $3)); + +-- name: UpdateResource :one +UPDATE resources +SET updated_at = current_timestamp, + updated_by = $2, + spec_configs = $3, + state_status = $4, + state_output = $5, + state_module_data = $6, + state_next_sync = $7, + state_sync_result = $8 +WHERE urn = $1 +RETURNING id; + +-- =================== Revisions =================== + +-- name: ListResourceRevisions :many +SELECT rev.*, array_agg(distinct rt.tag)::text[] AS tags +FROM resources r + JOIN revisions rev ON r.id = rev.resource_id + JOIN revision_tags rt ON rev.id = rt.revision_id +WHERE r.urn = $1 +GROUP BY rev.id; + +-- name: InsertRevision :exec +INSERT INTO revisions ("resource_id", "reason", "spec_configs", "created_by") +VALUES ($1, $2, $3, $4); \ No newline at end of file diff --git a/internal/store/pgsql/queries/copyfrom.go b/internal/store/pgsql/queries/copyfrom.go new file mode 100644 index 00000000..58a35051 --- /dev/null +++ b/internal/store/pgsql/queries/copyfrom.go @@ -0,0 +1,43 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.22.0 +// source: copyfrom.go + +package queries + +import ( + "context" +) + +// iteratorForInsertResourceTags implements pgx.CopyFromSource. +type iteratorForInsertResourceTags struct { + rows []InsertResourceTagsParams + skippedFirstNextCall bool +} + +func (r *iteratorForInsertResourceTags) Next() bool { + if len(r.rows) == 0 { + return false + } + if !r.skippedFirstNextCall { + r.skippedFirstNextCall = true + return true + } + r.rows = r.rows[1:] + return len(r.rows) > 0 +} + +func (r iteratorForInsertResourceTags) Values() ([]interface{}, error) { + return []interface{}{ + r.rows[0].ResourceID, + r.rows[0].Tag, + }, nil +} + +func (r iteratorForInsertResourceTags) Err() error { + return nil +} + +func (q *Queries) InsertResourceTags(ctx context.Context, arg []InsertResourceTagsParams) (int64, error) { + return q.db.CopyFrom(ctx, []string{"resource_tags"}, []string{"resource_id", "tag"}, &iteratorForInsertResourceTags{rows: arg}) +} diff --git a/internal/store/pgsql/queries/db.go b/internal/store/pgsql/queries/db.go new file mode 100644 index 00000000..8bb4e33a --- /dev/null +++ b/internal/store/pgsql/queries/db.go @@ -0,0 +1,33 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.22.0 + +package queries + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row + CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/store/pgsql/queries/migrate.go b/internal/store/pgsql/queries/migrate.go new file mode 100644 index 00000000..5907d75b --- /dev/null +++ b/internal/store/pgsql/queries/migrate.go @@ -0,0 +1,11 @@ +package queries + +import ( + "context" + _ "embed" +) + +func (q *Queries) Migrate(ctx context.Context, schema string) error { + _, err := q.db.Exec(ctx, schema) + return err +} diff --git a/internal/store/pgsql/queries/models.go b/internal/store/pgsql/queries/models.go new file mode 100644 index 00000000..155b28ff --- /dev/null +++ b/internal/store/pgsql/queries/models.go @@ -0,0 +1,61 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.22.0 + +package queries + +import ( + "github.com/jackc/pgx/v5/pgtype" +) + +type Module struct { + Urn string + Name string + Project string + Configs []byte + CreatedAt pgtype.Timestamptz + UpdatedAt pgtype.Timestamptz +} + +type Resource struct { + ID int64 + Urn string + Kind string + Name string + Project string + CreatedAt pgtype.Timestamptz + UpdatedAt pgtype.Timestamptz + SpecConfigs []byte + StateStatus string + StateOutput []byte + StateModuleData []byte + StateNextSync pgtype.Timestamptz + StateSyncResult []byte + CreatedBy string + UpdatedBy string +} + +type ResourceDependency struct { + ResourceID int64 + DependencyKey string + DependsOn int64 +} + +type ResourceTag struct { + Tag string + ResourceID int64 +} + +type Revision struct { + ID int64 + Reason string + CreatedAt pgtype.Timestamptz + ResourceID int64 + SpecConfigs []byte + CreatedBy string +} + +type RevisionTag struct { + Tag string + RevisionID int64 +} diff --git a/internal/store/pgsql/queries/queries.sql.go b/internal/store/pgsql/queries/queries.sql.go new file mode 100644 index 00000000..1fe1be77 --- /dev/null +++ b/internal/store/pgsql/queries/queries.sql.go @@ -0,0 +1,510 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.22.0 +// source: queries.sql + +package queries + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const deleteModule = `-- name: DeleteModule :exec +DELETE +FROM modules +WHERE urn = $1 +` + +func (q *Queries) DeleteModule(ctx context.Context, urn string) error { + _, err := q.db.Exec(ctx, deleteModule, urn) + return err +} + +const deleteResourceByURN = `-- name: DeleteResourceByURN :exec +DELETE +FROM resources +WHERE urn = $1 +` + +func (q *Queries) DeleteResourceByURN(ctx context.Context, urn string) error { + _, err := q.db.Exec(ctx, deleteResourceByURN, urn) + return err +} + +const deleteResourceDependenciesByURN = `-- name: DeleteResourceDependenciesByURN :exec +DELETE +FROM resource_dependencies +WHERE resource_id = (SELECT id FROM resources WHERE urn = $1) +` + +func (q *Queries) DeleteResourceDependenciesByURN(ctx context.Context, urn string) error { + _, err := q.db.Exec(ctx, deleteResourceDependenciesByURN, urn) + return err +} + +const deleteResourceTagsByURN = `-- name: DeleteResourceTagsByURN :exec +DELETE +FROM resource_tags +WHERE resource_id = (SELECT id FROM resources WHERE urn = $1) +` + +func (q *Queries) DeleteResourceTagsByURN(ctx context.Context, urn string) error { + _, err := q.db.Exec(ctx, deleteResourceTagsByURN, urn) + return err +} + +const extendWaitTime = `-- name: ExtendWaitTime :exec +UPDATE resources +SET state_next_sync = current_timestamp + ($2 ||' seconds')::interval +WHERE urn = $1 +` + +type ExtendWaitTimeParams struct { + Urn string + Column2 pgtype.Text +} + +func (q *Queries) ExtendWaitTime(ctx context.Context, arg ExtendWaitTimeParams) error { + _, err := q.db.Exec(ctx, extendWaitTime, arg.Urn, arg.Column2) + return err +} + +const fetchResourceForSync = `-- name: FetchResourceForSync :one +SELECT urn FROM resources +Where state_next_sync <= current_timestamp +FOR UPDATE SKIP LOCKED +` + +func (q *Queries) FetchResourceForSync(ctx context.Context) (string, error) { + row := q.db.QueryRow(ctx, fetchResourceForSync) + var urn string + err := row.Scan(&urn) + return urn, err +} + +const getModuleByURN = `-- name: GetModuleByURN :one + +SELECT urn, name, project, configs, created_at, updated_at +FROM modules +WHERE urn = $1 +` + +// =================== Modules =================== +func (q *Queries) GetModuleByURN(ctx context.Context, urn string) (Module, error) { + row := q.db.QueryRow(ctx, getModuleByURN, urn) + var i Module + err := row.Scan( + &i.Urn, + &i.Name, + &i.Project, + &i.Configs, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const getResourceByURN = `-- name: GetResourceByURN :one + +SELECT r.id, r.urn, r.kind, r.name, r.project, r.created_at, r.updated_at, r.spec_configs, r.state_status, r.state_output, r.state_module_data, r.state_next_sync, r.state_sync_result, r.created_by, r.updated_by, + array_agg(rt.tag)::text[] AS tags, + jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies +FROM resources r + LEFT JOIN resource_tags rt ON r.id = rt.resource_id + LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id + LEFT JOIN resources d ON rd.depends_on = d.id +WHERE r.urn = $1 +GROUP BY r.id +` + +type GetResourceByURNRow struct { + ID int64 + Urn string + Kind string + Name string + Project string + CreatedAt pgtype.Timestamptz + UpdatedAt pgtype.Timestamptz + SpecConfigs []byte + StateStatus string + StateOutput []byte + StateModuleData []byte + StateNextSync pgtype.Timestamptz + StateSyncResult []byte + CreatedBy string + UpdatedBy string + Tags []string + Dependencies []byte +} + +// =================== Resources =================== +func (q *Queries) GetResourceByURN(ctx context.Context, urn string) (GetResourceByURNRow, error) { + row := q.db.QueryRow(ctx, getResourceByURN, urn) + var i GetResourceByURNRow + err := row.Scan( + &i.ID, + &i.Urn, + &i.Kind, + &i.Name, + &i.Project, + &i.CreatedAt, + &i.UpdatedAt, + &i.SpecConfigs, + &i.StateStatus, + &i.StateOutput, + &i.StateModuleData, + &i.StateNextSync, + &i.StateSyncResult, + &i.CreatedBy, + &i.UpdatedBy, + &i.Tags, + &i.Dependencies, + ) + return i, err +} + +const getResourceDependencies = `-- name: GetResourceDependencies :one +SELECT jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies +FROM resources r + LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id + LEFT JOIN resources d ON rd.depends_on = d.id +WHERE r.urn = $1 +GROUP BY r.id +` + +func (q *Queries) GetResourceDependencies(ctx context.Context, urn string) ([]byte, error) { + row := q.db.QueryRow(ctx, getResourceDependencies, urn) + var dependencies []byte + err := row.Scan(&dependencies) + return dependencies, err +} + +const insertModule = `-- name: InsertModule :exec +INSERT INTO modules (urn, project, name, configs) +VALUES ($1, $2, $3, $4) +` + +type InsertModuleParams struct { + Urn string + Project string + Name string + Configs []byte +} + +func (q *Queries) InsertModule(ctx context.Context, arg InsertModuleParams) error { + _, err := q.db.Exec(ctx, insertModule, + arg.Urn, + arg.Project, + arg.Name, + arg.Configs, + ) + return err +} + +const insertResource = `-- name: InsertResource :one +INSERT INTO resources ("urn", "kind", "project", "name", "created_at", "updated_at", "created_by", "updated_by", + "spec_configs", "state_status", "state_output", "state_module_data", + "state_next_sync", "state_sync_result") +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) +RETURNING id +` + +type InsertResourceParams struct { + Urn string + Kind string + Project string + Name string + CreatedAt pgtype.Timestamptz + UpdatedAt pgtype.Timestamptz + CreatedBy string + UpdatedBy string + SpecConfigs []byte + StateStatus string + StateOutput []byte + StateModuleData []byte + StateNextSync pgtype.Timestamptz + StateSyncResult []byte +} + +func (q *Queries) InsertResource(ctx context.Context, arg InsertResourceParams) (int64, error) { + row := q.db.QueryRow(ctx, insertResource, + arg.Urn, + arg.Kind, + arg.Project, + arg.Name, + arg.CreatedAt, + arg.UpdatedAt, + arg.CreatedBy, + arg.UpdatedBy, + arg.SpecConfigs, + arg.StateStatus, + arg.StateOutput, + arg.StateModuleData, + arg.StateNextSync, + arg.StateSyncResult, + ) + var id int64 + err := row.Scan(&id) + return id, err +} + +const insertResourceDependency = `-- name: InsertResourceDependency :exec +INSERT INTO resource_dependencies (resource_id, dependency_key, depends_on) +VALUES ($1, $2, (SELECT id FROM resources WHERE urn = $3)) +` + +type InsertResourceDependencyParams struct { + ResourceID int64 + DependencyKey string + Urn string +} + +func (q *Queries) InsertResourceDependency(ctx context.Context, arg InsertResourceDependencyParams) error { + _, err := q.db.Exec(ctx, insertResourceDependency, arg.ResourceID, arg.DependencyKey, arg.Urn) + return err +} + +type InsertResourceTagsParams struct { + ResourceID int64 + Tag string +} + +const insertRevision = `-- name: InsertRevision :exec +INSERT INTO revisions ("resource_id", "reason", "spec_configs", "created_by") +VALUES ($1, $2, $3, $4) +` + +type InsertRevisionParams struct { + ResourceID int64 + Reason string + SpecConfigs []byte + CreatedBy string +} + +func (q *Queries) InsertRevision(ctx context.Context, arg InsertRevisionParams) error { + _, err := q.db.Exec(ctx, insertRevision, + arg.ResourceID, + arg.Reason, + arg.SpecConfigs, + arg.CreatedBy, + ) + return err +} + +const listAllModulesForProject = `-- name: ListAllModulesForProject :many +SELECT urn, name, project, configs, created_at, updated_at +FROM modules +WHERE project = $1 +` + +func (q *Queries) ListAllModulesForProject(ctx context.Context, project string) ([]Module, error) { + rows, err := q.db.Query(ctx, listAllModulesForProject, project) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Module + for rows.Next() { + var i Module + if err := rows.Scan( + &i.Urn, + &i.Name, + &i.Project, + &i.Configs, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listResourceRevisions = `-- name: ListResourceRevisions :many + +SELECT rev.id, rev.reason, rev.created_at, rev.resource_id, rev.spec_configs, rev.created_by, array_agg(distinct rt.tag)::text[] AS tags +FROM resources r + JOIN revisions rev ON r.id = rev.resource_id + JOIN revision_tags rt ON rev.id = rt.revision_id +WHERE r.urn = $1 +GROUP BY rev.id +` + +type ListResourceRevisionsRow struct { + ID int64 + Reason string + CreatedAt pgtype.Timestamptz + ResourceID int64 + SpecConfigs []byte + CreatedBy string + Tags []string +} + +// =================== Revisions =================== +func (q *Queries) ListResourceRevisions(ctx context.Context, urn string) ([]ListResourceRevisionsRow, error) { + rows, err := q.db.Query(ctx, listResourceRevisions, urn) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListResourceRevisionsRow + for rows.Next() { + var i ListResourceRevisionsRow + if err := rows.Scan( + &i.ID, + &i.Reason, + &i.CreatedAt, + &i.ResourceID, + &i.SpecConfigs, + &i.CreatedBy, + &i.Tags, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listResourceURNsByFilter = `-- name: ListResourceURNsByFilter :many +SELECT r.id, r.urn, r.kind, r.name, r.project, r.created_at, r.updated_at, r.spec_configs, r.state_status, r.state_output, r.state_module_data, r.state_next_sync, r.state_sync_result, r.created_by, r.updated_by, + array_agg(rt.tag)::text[] AS tags, + jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies +FROM resources r + LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id + LEFT JOIN resources d ON rd.depends_on = d.id + LEFT JOIN resource_tags rt ON r.id = rt.resource_id +WHERE ($1::text IS NULL OR r.project = $1) + AND ($2::text IS NULL OR r.kind = $2) +GROUP BY r.id +` + +type ListResourceURNsByFilterParams struct { + Project pgtype.Text + Kind pgtype.Text +} + +type ListResourceURNsByFilterRow struct { + ID int64 + Urn string + Kind string + Name string + Project string + CreatedAt pgtype.Timestamptz + UpdatedAt pgtype.Timestamptz + SpecConfigs []byte + StateStatus string + StateOutput []byte + StateModuleData []byte + StateNextSync pgtype.Timestamptz + StateSyncResult []byte + CreatedBy string + UpdatedBy string + Tags []string + Dependencies []byte +} + +func (q *Queries) ListResourceURNsByFilter(ctx context.Context, arg ListResourceURNsByFilterParams) ([]ListResourceURNsByFilterRow, error) { + rows, err := q.db.Query(ctx, listResourceURNsByFilter, arg.Project, arg.Kind) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListResourceURNsByFilterRow + for rows.Next() { + var i ListResourceURNsByFilterRow + if err := rows.Scan( + &i.ID, + &i.Urn, + &i.Kind, + &i.Name, + &i.Project, + &i.CreatedAt, + &i.UpdatedAt, + &i.SpecConfigs, + &i.StateStatus, + &i.StateOutput, + &i.StateModuleData, + &i.StateNextSync, + &i.StateSyncResult, + &i.CreatedBy, + &i.UpdatedBy, + &i.Tags, + &i.Dependencies, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const updateModule = `-- name: UpdateModule :exec +UPDATE modules +SET configs = $2, + updated_at = current_timestamp +WHERE urn = $1 +` + +type UpdateModuleParams struct { + Urn string + Configs []byte +} + +func (q *Queries) UpdateModule(ctx context.Context, arg UpdateModuleParams) error { + _, err := q.db.Exec(ctx, updateModule, arg.Urn, arg.Configs) + return err +} + +const updateResource = `-- name: UpdateResource :one +UPDATE resources +SET updated_at = current_timestamp, + updated_by = $2, + spec_configs = $3, + state_status = $4, + state_output = $5, + state_module_data = $6, + state_next_sync = $7, + state_sync_result = $8 +WHERE urn = $1 +RETURNING id +` + +type UpdateResourceParams struct { + Urn string + UpdatedBy string + SpecConfigs []byte + StateStatus string + StateOutput []byte + StateModuleData []byte + StateNextSync pgtype.Timestamptz + StateSyncResult []byte +} + +func (q *Queries) UpdateResource(ctx context.Context, arg UpdateResourceParams) (int64, error) { + row := q.db.QueryRow(ctx, updateResource, + arg.Urn, + arg.UpdatedBy, + arg.SpecConfigs, + arg.StateStatus, + arg.StateOutput, + arg.StateModuleData, + arg.StateNextSync, + arg.StateSyncResult, + ) + var id int64 + err := row.Scan(&id) + return id, err +} diff --git a/internal/store/pgsql/resources.go b/internal/store/pgsql/resources.go new file mode 100644 index 00000000..a2afa8a9 --- /dev/null +++ b/internal/store/pgsql/resources.go @@ -0,0 +1,405 @@ +package pgsql + +import ( + "context" + "encoding/json" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + + "github.com/goto/entropy/core/resource" + "github.com/goto/entropy/internal/store/pgsql/queries" + "github.com/goto/entropy/pkg/errors" +) + +func (st *Store) GetByURN(ctx context.Context, urn string) (*resource.Resource, error) { + res, err := st.qu.GetResourceByURN(ctx, urn) + if err != nil { + return nil, translateSQLErr(err) + } + + var nextSyncAt *time.Time + if res.StateNextSync.Valid { + nextSyncAt = &res.StateNextSync.Time + } + + var syncResult resource.SyncResult + if len(res.StateSyncResult) > 0 { + if err := json.Unmarshal(res.StateSyncResult, &syncResult); err != nil { + return nil, err + } + } + + deps, err := depsBytesToMap(res.Dependencies) + if err != nil { + return nil, err + } + + return &resource.Resource{ + URN: res.Urn, + Kind: res.Kind, + Name: res.Name, + Project: res.Project, + Labels: tagsToLabelMap(res.Tags), + CreatedAt: res.CreatedAt.Time, + UpdatedAt: res.UpdatedAt.Time, + UpdatedBy: res.UpdatedBy, + CreatedBy: res.CreatedBy, + Spec: resource.Spec{ + Configs: res.SpecConfigs, + Dependencies: deps, + }, + State: resource.State{ + Status: res.StateStatus, + Output: res.StateOutput, + ModuleData: res.StateModuleData, + NextSyncAt: nextSyncAt, + SyncResult: syncResult, + }, + }, nil +} + +func (st *Store) List(ctx context.Context, filter resource.Filter) ([]resource.Resource, error) { + params := queries.ListResourceURNsByFilterParams{ + Project: pgtype.Text{ + Valid: filter.Project != "", + String: filter.Project, + }, + Kind: pgtype.Text{ + Valid: filter.Kind != "", + String: filter.Kind, + }, + } + + rows, err := st.qu.ListResourceURNsByFilter(ctx, params) + if err != nil { + return nil, translateSQLErr(err) + } + + var result []resource.Resource + for _, res := range rows { + var nextSyncAt *time.Time + if res.StateNextSync.Valid { + nextSyncAt = &res.StateNextSync.Time + } + + var syncResult resource.SyncResult + if len(res.StateSyncResult) > 0 { + if err := json.Unmarshal(res.StateSyncResult, &syncResult); err != nil { + return nil, err + } + } + + deps, err := depsBytesToMap(res.Dependencies) + if err != nil { + return nil, err + } + + result = append(result, resource.Resource{ + URN: res.Urn, + Kind: res.Kind, + Name: res.Name, + Project: res.Project, + Labels: tagsToLabelMap(res.Tags), + CreatedAt: res.CreatedAt.Time, + UpdatedAt: res.UpdatedAt.Time, + UpdatedBy: res.UpdatedBy, + CreatedBy: res.CreatedBy, + Spec: resource.Spec{ + Configs: res.SpecConfigs, + Dependencies: deps, + }, + State: resource.State{ + Status: res.StateStatus, + Output: res.StateOutput, + ModuleData: res.StateModuleData, + NextSyncAt: nextSyncAt, + SyncResult: syncResult, + }, + }) + } + + return result, nil +} + +func (st *Store) Create(ctx context.Context, r resource.Resource, hooks ...resource.MutationHook) error { + createResource := func(ctx context.Context, tx pgx.Tx) error { + id, err := st.qu.InsertResource(ctx, queries.InsertResourceParams{}) + if err != nil { + return err + } + + var params []queries.InsertResourceTagsParams + for key, value := range r.Labels { + params = append(params, queries.InsertResourceTagsParams{ + Tag: labelToTag(key, value), + ResourceID: id, + }) + } + + if _, err := st.qu.InsertResourceTags(ctx, params); err != nil { + return err + } + + for key, dependsOn := range r.Spec.Dependencies { + if err := st.qu.InsertResourceDependency(ctx, queries.InsertResourceDependencyParams{ + ResourceID: id, + DependencyKey: key, + Urn: dependsOn, + }); err != nil { + return err + } + } + + return runAllHooks(ctx, hooks) + } + + if err := withinTx(ctx, st.pgx, false, createResource); err != nil { + return translateSQLErr(err) + } + return nil +} + +func (st *Store) Update(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) error { + updateParams := queries.UpdateResourceParams{ + Urn: r.URN, + UpdatedBy: r.UpdatedBy, + SpecConfigs: r.Spec.Configs, + StateStatus: r.State.Status, + StateOutput: r.State.Output, + StateModuleData: r.State.ModuleData, + } + + syncResult, err := json.Marshal(r.State.SyncResult) + if err != nil { + return err + } + updateParams.StateSyncResult = syncResult + + if r.State.NextSyncAt != nil { + updateParams.StateNextSync = pgtype.Timestamptz{ + Time: *r.State.NextSyncAt, + Valid: true, + } + } + + createResource := func(ctx context.Context, tx pgx.Tx) error { + if err := st.qu.DeleteResourceTagsByURN(ctx, r.URN); err != nil { + return err + } + + if err := st.qu.DeleteResourceDependenciesByURN(ctx, r.URN); err != nil { + return err + } + + resourceID, err := st.qu.UpdateResource(ctx, updateParams) + if err != nil { + return err + } + + var params []queries.InsertResourceTagsParams + for key, value := range r.Labels { + params = append(params, queries.InsertResourceTagsParams{ + Tag: labelToTag(key, value), + ResourceID: resourceID, + }) + } + + if _, err := st.qu.InsertResourceTags(ctx, params); err != nil { + return err + } + + for key, dependsOn := range r.Spec.Dependencies { + if err := st.qu.InsertResourceDependency(ctx, queries.InsertResourceDependencyParams{ + Urn: dependsOn, + ResourceID: resourceID, + DependencyKey: key, + }); err != nil { + return err + } + } + + if saveRevision { + revisionParams := queries.InsertRevisionParams{ + ResourceID: resourceID, + Reason: reason, + SpecConfigs: r.Spec.Configs, + CreatedBy: r.UpdatedBy, + } + if err := st.qu.InsertRevision(ctx, revisionParams); err != nil { + return translateSQLErr(err) + } + } + + return runAllHooks(ctx, hooks) + } + + if err := withinTx(ctx, st.pgx, false, createResource); err != nil { + return translateSQLErr(err) + } + return nil +} + +func (st *Store) Delete(ctx context.Context, urn string, hooks ...resource.MutationHook) error { + deleteResource := func(ctx context.Context, tx pgx.Tx) error { + txQueries := st.qu.WithTx(tx) + if err := txQueries.DeleteResourceTagsByURN(ctx, urn); err != nil { + return err + } + if err := txQueries.DeleteResourceDependenciesByURN(ctx, urn); err != nil { + return err + } + if err := txQueries.DeleteResourceByURN(ctx, urn); err != nil { + return err + } + return runAllHooks(ctx, hooks) + } + + if err := withinTx(ctx, st.pgx, false, deleteResource); err != nil { + return translateSQLErr(err) + } + return nil +} + +func (st *Store) Revisions(ctx context.Context, selector resource.RevisionsSelector) ([]resource.Revision, error) { + revs, err := st.qu.ListResourceRevisions(ctx, selector.URN) + if err != nil { + return nil, translateSQLErr(err) + } + + depsJSON, err := st.qu.GetResourceDependencies(ctx, selector.URN) + if err != nil { + return nil, translateSQLErr(err) + } + + var deps map[string]string + if len(depsJSON) > 0 { + if err := json.Unmarshal(depsJSON, &deps); err != nil { + return nil, err + } + } + + var result []resource.Revision + for _, rev := range revs { + result = append(result, resource.Revision{ + ID: rev.ID, + URN: selector.URN, + Reason: rev.Reason, + Labels: tagsToLabelMap(rev.Tags), + CreatedAt: rev.CreatedAt.Time, + CreatedBy: rev.CreatedBy, + Spec: resource.Spec{ + Configs: rev.SpecConfigs, + Dependencies: deps, + }, + }) + } + + return result, nil +} + +func (st *Store) SyncOne(ctx context.Context, syncFn resource.SyncFn) error { + urn, err := st.fetchResourceForSync(ctx) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + // No resource available for sync. + return nil + } + return err + } + + cur, err := st.GetByURN(ctx, urn) + if err != nil { + return err + } + + synced, err := st.handleDequeued(ctx, *cur, syncFn) + if err != nil { + return err + } + + return st.Update(ctx, *synced, false, "sync") +} + +func (st *Store) fetchResourceForSync(ctx context.Context) (string, error) { + var urn string + + err := withinTx(ctx, st.pgx, false, func(ctx context.Context, tx pgx.Tx) error { + // find a resource ready for sync, extend it next sync time atomically. + // this ensures multiple workers do not pick up same resources for sync. + u, err := st.qu.FetchResourceForSync(ctx) + if err != nil { + return err + } + urn = u + + return st.extendWaitTime(ctx, urn) + }) + if err != nil { + return "", err + } + + return urn, nil +} + +func (st *Store) handleDequeued(baseCtx context.Context, res resource.Resource, fn resource.SyncFn) (*resource.Resource, error) { + runCtx, cancel := context.WithCancel(baseCtx) + defer cancel() + + // Run heartbeat to keep the resource being picked up by some other syncer + // thread. If heartbeat exits, runCtx will be cancelled and fn should exit. + go st.runHeartbeat(runCtx, cancel, res.URN) + + return fn(runCtx, res) +} + +func (st *Store) runHeartbeat(ctx context.Context, cancel context.CancelFunc, id string) { + defer cancel() + + tick := time.NewTicker(st.refreshInterval) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-tick.C: + if err := st.extendWaitTime(ctx, id); err != nil { + return + } + } + } +} + +func (st *Store) extendWaitTime(ctx context.Context, urn string) error { + var column2 pgtype.Text + err := column2.Scan(st.extendInterval.String()) + if err != nil { + return err + } + + return st.qu.ExtendWaitTime(ctx, queries.ExtendWaitTimeParams{ + Urn: urn, + Column2: column2, + }) +} + +func depsBytesToMap(dependencies []byte) (map[string]string, error) { + deps := map[string]string{} + if len(dependencies) > 0 { + if err := json.Unmarshal(dependencies, &deps); err != nil { + return nil, err + } + + for k := range deps { + if k != "" { + break + } + deps = map[string]string{} + } + } + + return deps, nil +} diff --git a/internal/store/pgsql/schema.sql b/internal/store/pgsql/schema.sql new file mode 100644 index 00000000..68f0b53e --- /dev/null +++ b/internal/store/pgsql/schema.sql @@ -0,0 +1,77 @@ +CREATE TABLE IF NOT EXISTS modules +( + urn TEXT NOT NULL PRIMARY KEY, + name TEXT NOT NULL, + project TEXT NOT NULL, + configs bytea NOT NULL, + created_at timestamptz NOT NULL DEFAULT current_timestamp, + updated_at timestamptz NOT NULL DEFAULT current_timestamp +); +CREATE INDEX IF NOT EXISTS idx_modules_project ON modules (project); + +CREATE TABLE IF NOT EXISTS resources +( + id BIGSERIAL NOT NULL PRIMARY KEY, + urn TEXT NOT NULL UNIQUE, + kind TEXT NOT NULL, + name TEXT NOT NULL, + project TEXT NOT NULL, + created_at timestamptz NOT NULL DEFAULT current_timestamp, + updated_at timestamptz NOT NULL DEFAULT current_timestamp, + spec_configs bytea NOT NULL, + state_status TEXT NOT NULL, + state_output bytea NOT NULL, + state_module_data bytea NOT NULL, + state_next_sync timestamptz, + state_sync_result bytea +); +CREATE INDEX IF NOT EXISTS idx_resources_kind ON resources (kind); +CREATE INDEX IF NOT EXISTS idx_resources_project ON resources (project); +CREATE INDEX IF NOT EXISTS idx_resources_state_status ON resources (state_status); +CREATE INDEX IF NOT EXISTS idx_resources_next_sync ON resources (state_next_sync); + +CREATE TABLE IF NOT EXISTS resource_dependencies +( + resource_id BIGINT NOT NULL REFERENCES resources (id), + dependency_key TEXT NOT NULL, + depends_on BIGINT NOT NULL REFERENCES resources (id), + + UNIQUE (resource_id, dependency_key) +); + +CREATE TABLE IF NOT EXISTS resource_tags +( + tag TEXT NOT NULL, + resource_id BIGINT NOT NULL REFERENCES resources (id), + + UNIQUE (resource_id, tag) +); +CREATE INDEX IF NOT EXISTS idx_resource_tags_resource_id ON resource_tags (resource_id); +CREATE INDEX IF NOT EXISTS idx_resource_tags_tag ON resource_tags (tag); + +CREATE TABLE IF NOT EXISTS revisions +( + id BIGSERIAL NOT NULL PRIMARY KEY, + reason TEXT NOT NULL DEFAULT '', + created_at timestamptz NOT NULL DEFAULT current_timestamp, + resource_id BIGINT NOT NULL REFERENCES resources (id), + spec_configs bytea NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_revisions_resource_id ON revisions (resource_id); +CREATE INDEX IF NOT EXISTS idx_revisions_created_at ON revisions (created_at); + +CREATE TABLE IF NOT EXISTS revision_tags +( + tag TEXT NOT NULL, + revision_id BIGINT NOT NULL REFERENCES revisions (id), + + UNIQUE (revision_id, tag) +); +CREATE INDEX IF NOT EXISTS idx_revision_tags_revision_id ON revision_tags (revision_id); +CREATE INDEX IF NOT EXISTS idx_revision_tags_tag ON revision_tags (tag); + +ALTER TABLE resources + ADD COLUMN IF NOT EXISTS created_by TEXT NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS updated_by TEXT NOT NULL DEFAULT ''; + +ALTER TABLE revisions ADD COLUMN IF NOT EXISTS created_by TEXT NOT NULL DEFAULT ''; \ No newline at end of file diff --git a/internal/store/pgsql/sqlc.yaml b/internal/store/pgsql/sqlc.yaml new file mode 100644 index 00000000..0fc5d9c2 --- /dev/null +++ b/internal/store/pgsql/sqlc.yaml @@ -0,0 +1,11 @@ +version: "2" + +sql: + - engine: "postgresql" + queries: "queries.sql" + schema: "schema.sql" + gen: + go: + package: "queries" + out: "queries" + sql_package: "pgx/v5" \ No newline at end of file