Skip to content

feat: use sqlc to write queries #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cli/serve.go
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ import (
"github.com/goto/entropy/core"
"github.com/goto/entropy/core/module"
entropyserver "github.com/goto/entropy/internal/server"
"github.com/goto/entropy/internal/store/postgres"
"github.com/goto/entropy/internal/store/pgsql"
"github.com/goto/entropy/modules"
"github.com/goto/entropy/modules/firehose"
"github.com/goto/entropy/modules/job"
@@ -96,8 +96,8 @@ func setupRegistry() module.Registry {
return registry
}

func setupStorage(pgConStr string, syncCfg syncerConf) *postgres.Store {
store, err := postgres.Open(pgConStr, syncCfg.RefreshInterval, syncCfg.ExtendLockBy)
func setupStorage(pgConStr string, syncCfg syncerConf) *pgsql.Store {
store, err := pgsql.Open(pgConStr, syncCfg.RefreshInterval, syncCfg.ExtendLockBy)
if err != nil {
zap.L().Fatal("failed to connect to Postgres database",
zap.Error(err), zap.String("conn_str", pgConStr))
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ require (
github.com/goto/salt v0.3.3
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0
github.com/jackc/pgx/v5 v5.4.3
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.9
github.com/mcuadros/go-defaults v1.2.0
@@ -202,6 +203,8 @@ require (
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/newrelic/csec-go-agent v0.4.0 // indirect
github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1 // indirect
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -912,6 +912,7 @@ github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfG
github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA=
@@ -923,6 +924,8 @@ github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX
github.com/jackc/pgproto3/v2 v2.0.7/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg=
github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc=
github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw=
@@ -937,6 +940,8 @@ github.com/jackc/pgx/v4 v4.5.0/go.mod h1:EpAKPLdnTorwmPUUsqrPxy5fphV18j9q3wrfRXg
github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6fOLDxqtlyhe9UWgfIi9R8+8v8GKV5TRA/o=
github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg=
github.com/jackc/pgx/v4 v4.10.1/go.mod h1:QlrWebbs3kqEZPHCTGyxecvzG6tvIsYu+A5b1raylkA=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
77 changes: 77 additions & 0 deletions internal/store/pgsql/modules.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package pgsql

import (
"context"

"github.com/goto/entropy/core/module"
"github.com/goto/entropy/internal/store/pgsql/queries"
)

func (st *Store) GetModule(ctx context.Context, urn string) (*module.Module, error) {
mod, err := st.qu.GetModuleByURN(ctx, urn)
if err != nil {
return nil, translateSQLErr(err)
}

return &module.Module{
URN: mod.Urn,
Name: mod.Name,
Project: mod.Project,
Configs: mod.Configs,
CreatedAt: mod.CreatedAt.Time,
UpdatedAt: mod.UpdatedAt.Time,
}, nil
}

func (st *Store) ListModules(ctx context.Context, project string) ([]module.Module, error) {
mods, err := st.qu.ListAllModulesForProject(ctx, project)
if err != nil {
return nil, translateSQLErr(err)
}

var modules []module.Module
for _, mod := range mods {
modules = append(modules, module.Module{
URN: mod.Urn,
Name: mod.Name,
Project: mod.Project,
Configs: mod.Configs,
CreatedAt: mod.CreatedAt.Time,
UpdatedAt: mod.UpdatedAt.Time,
})
}

return modules, nil
}

func (st *Store) CreateModule(ctx context.Context, m module.Module) error {
params := queries.InsertModuleParams{
Urn: m.URN,
Project: m.Project,
Name: m.Name,
Configs: m.Configs,
}

if err := st.qu.InsertModule(ctx, params); err != nil {
return translateSQLErr(err)
}
return nil
}

func (st *Store) UpdateModule(ctx context.Context, m module.Module) error {
params := queries.UpdateModuleParams{
Urn: m.URN,
Configs: m.Configs,
}
if err := st.qu.UpdateModule(ctx, params); err != nil {
return translateSQLErr(err)
}
return nil
}

func (st *Store) DeleteModule(ctx context.Context, urn string) error {
if err := st.qu.DeleteModule(ctx, urn); err != nil {
return translateSQLErr(err)
}
return nil
}
132 changes: 132 additions & 0 deletions internal/store/pgsql/pgsql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package pgsql

import (
"context"
"database/sql"
_ "embed"
"fmt"
"strings"
"time"

"github.com/jackc/pgx/v5"
_ "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"

"github.com/goto/entropy/core/resource"
"github.com/goto/entropy/internal/store/pgsql/queries"
"github.com/goto/entropy/pkg/errors"
)

//go:generate sqlc generate

// schema represents the storage schema.
// Note: Update the constants above if the table name is changed.
//
//go:embed schema.sql
var schema string

type Store struct {
qu *queries.Queries
pgx *pgx.Conn
extendInterval time.Duration
refreshInterval time.Duration
}

func (st *Store) Migrate(ctx context.Context) error {
err := st.qu.Migrate(ctx, schema)
return err
}

func (st *Store) Close() error { return st.pgx.Close(context.Background()) }

// Open returns store instance backed by PostgresQL.
func Open(conStr string, refreshInterval, extendInterval time.Duration) (*Store, error) {
conn, err := pgx.Connect(context.Background(), conStr)
if err != nil {
return nil, err
} else if err := conn.Ping(context.Background()); err != nil {
_ = conn.Close(context.Background())
return nil, err
}

if refreshInterval >= extendInterval {
return nil, errors.New("refreshInterval must be lower than extendInterval")
}

return &Store{
qu: queries.New(conn),
pgx: conn,
extendInterval: extendInterval,
refreshInterval: refreshInterval,
}, nil
}

func translateSQLErr(err error) error {
if errors.Is(err, sql.ErrNoRows) {
return errors.ErrNotFound.WithCausef(err.Error())
}

var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
// Refer http://www.postgresql.org/docs/9.3/static/errcodes-appendix.html
switch pgErr.Code {
case "unique_violation":
return errors.ErrConflict.WithCausef(err.Error())

default:
return errors.ErrInternal.WithCausef(err.Error())
}
}

return err
}

func tagsToLabelMap(tags []string) map[string]string {
const keyValueParts = 2

labels := map[string]string{}
for _, tag := range tags {
parts := strings.SplitN(tag, "=", keyValueParts)
key, val := parts[0], parts[1]
labels[key] = val
}
return labels
}

func labelToTag(k, v string) string {
return fmt.Sprintf("%s=%s", k, v)
}

type TxFunc func(ctx context.Context, tx pgx.Tx) error

func withinTx(ctx context.Context, db *pgx.Conn, readOnly bool, fns ...TxFunc) error {
var opts pgx.TxOptions
if readOnly {
opts.AccessMode = pgx.ReadOnly
} else {
opts.AccessMode = pgx.ReadWrite
}

tx, err := db.BeginTx(ctx, opts)
if err != nil {
return err
}

for _, fn := range fns {
if err := fn(ctx, tx); err != nil {
_ = tx.Rollback(ctx)
return err
}
}

return tx.Commit(ctx)
}

func runAllHooks(ctx context.Context, hooks []resource.MutationHook) error {
for _, hook := range hooks {
if err := hook(ctx); err != nil {
return err
}
}
return nil
}
126 changes: 126 additions & 0 deletions internal/store/pgsql/queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
-- =================== Modules ===================

-- name: GetModuleByURN :one
SELECT *
FROM modules
WHERE urn = $1;

-- name: ListAllModulesForProject :many
SELECT *
FROM modules
WHERE project = $1;

-- name: InsertModule :exec
INSERT INTO modules (urn, project, name, configs)
VALUES ($1, $2, $3, $4);

-- name: UpdateModule :exec
UPDATE modules
SET configs = $2,
updated_at = current_timestamp
WHERE urn = $1;

-- name: DeleteModule :exec
DELETE
FROM modules
WHERE urn = $1;

-- =================== Resources ===================

-- name: GetResourceByURN :one
SELECT r.*,
array_agg(rt.tag)::text[] AS tags,
jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
FROM resources r
LEFT JOIN resource_tags rt ON r.id = rt.resource_id
LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id
LEFT JOIN resources d ON rd.depends_on = d.id
WHERE r.urn = $1
GROUP BY r.id;

-- name: GetResourceDependencies :one
SELECT jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
FROM resources r
LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id
LEFT JOIN resources d ON rd.depends_on = d.id
WHERE r.urn = $1
GROUP BY r.id;

-- name: ListResourceURNsByFilter :many
SELECT r.*,
array_agg(rt.tag)::text[] AS tags,
jsonb_object_agg(COALESCE(rd.dependency_key, ''), d.urn) AS dependencies
FROM resources r
LEFT JOIN resource_dependencies rd ON r.id = rd.resource_id
LEFT JOIN resources d ON rd.depends_on = d.id
LEFT JOIN resource_tags rt ON r.id = rt.resource_id
WHERE (sqlc.narg('project')::text IS NULL OR r.project = sqlc.narg('project'))
AND (sqlc.narg('kind')::text IS NULL OR r.kind = sqlc.narg('kind'))
GROUP BY r.id;

-- name: DeleteResourceDependenciesByURN :exec
DELETE
FROM resource_dependencies
WHERE resource_id = (SELECT id FROM resources WHERE urn = $1);

-- name: DeleteResourceTagsByURN :exec
DELETE
FROM resource_tags
WHERE resource_id = (SELECT id FROM resources WHERE urn = $1);

-- name: DeleteResourceByURN :exec
DELETE
FROM resources
WHERE urn = $1;

-- name: InsertResource :one
INSERT INTO resources ("urn", "kind", "project", "name", "created_at", "updated_at", "created_by", "updated_by",
"spec_configs", "state_status", "state_output", "state_module_data",
"state_next_sync", "state_sync_result")
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
RETURNING id;

-- name: FetchResourceForSync :one
SELECT urn FROM resources
Where state_next_sync <= current_timestamp
FOR UPDATE SKIP LOCKED;

-- name: ExtendWaitTime :exec
UPDATE resources
SET state_next_sync = current_timestamp + ($2 ||' seconds')::interval
WHERE urn = $1;

-- name: InsertResourceTags :copyfrom
INSERT INTO resource_tags (resource_id, tag)
VALUES ($1, $2);

-- name: InsertResourceDependency :exec
INSERT INTO resource_dependencies (resource_id, dependency_key, depends_on)
VALUES ($1, $2, (SELECT id FROM resources WHERE urn = $3));

-- name: UpdateResource :one
UPDATE resources
SET updated_at = current_timestamp,
updated_by = $2,
spec_configs = $3,
state_status = $4,
state_output = $5,
state_module_data = $6,
state_next_sync = $7,
state_sync_result = $8
WHERE urn = $1
RETURNING id;

-- =================== Revisions ===================

-- name: ListResourceRevisions :many
SELECT rev.*, array_agg(distinct rt.tag)::text[] AS tags
FROM resources r
JOIN revisions rev ON r.id = rev.resource_id
JOIN revision_tags rt ON rev.id = rt.revision_id
WHERE r.urn = $1
GROUP BY rev.id;

-- name: InsertRevision :exec
INSERT INTO revisions ("resource_id", "reason", "spec_configs", "created_by")
VALUES ($1, $2, $3, $4);
43 changes: 43 additions & 0 deletions internal/store/pgsql/queries/copyfrom.go
33 changes: 33 additions & 0 deletions internal/store/pgsql/queries/db.go
11 changes: 11 additions & 0 deletions internal/store/pgsql/queries/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package queries

import (
"context"
_ "embed"
)

func (q *Queries) Migrate(ctx context.Context, schema string) error {
_, err := q.db.Exec(ctx, schema)
return err
}
61 changes: 61 additions & 0 deletions internal/store/pgsql/queries/models.go
510 changes: 510 additions & 0 deletions internal/store/pgsql/queries/queries.sql.go

Large diffs are not rendered by default.

405 changes: 405 additions & 0 deletions internal/store/pgsql/resources.go

Large diffs are not rendered by default.

77 changes: 77 additions & 0 deletions internal/store/pgsql/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
CREATE TABLE IF NOT EXISTS modules
(
urn TEXT NOT NULL PRIMARY KEY,
name TEXT NOT NULL,
project TEXT NOT NULL,
configs bytea NOT NULL,
created_at timestamptz NOT NULL DEFAULT current_timestamp,
updated_at timestamptz NOT NULL DEFAULT current_timestamp
);
CREATE INDEX IF NOT EXISTS idx_modules_project ON modules (project);

CREATE TABLE IF NOT EXISTS resources
(
id BIGSERIAL NOT NULL PRIMARY KEY,
urn TEXT NOT NULL UNIQUE,
kind TEXT NOT NULL,
name TEXT NOT NULL,
project TEXT NOT NULL,
created_at timestamptz NOT NULL DEFAULT current_timestamp,
updated_at timestamptz NOT NULL DEFAULT current_timestamp,
spec_configs bytea NOT NULL,
state_status TEXT NOT NULL,
state_output bytea NOT NULL,
state_module_data bytea NOT NULL,
state_next_sync timestamptz,
state_sync_result bytea
);
CREATE INDEX IF NOT EXISTS idx_resources_kind ON resources (kind);
CREATE INDEX IF NOT EXISTS idx_resources_project ON resources (project);
CREATE INDEX IF NOT EXISTS idx_resources_state_status ON resources (state_status);
CREATE INDEX IF NOT EXISTS idx_resources_next_sync ON resources (state_next_sync);

CREATE TABLE IF NOT EXISTS resource_dependencies
(
resource_id BIGINT NOT NULL REFERENCES resources (id),
dependency_key TEXT NOT NULL,
depends_on BIGINT NOT NULL REFERENCES resources (id),

UNIQUE (resource_id, dependency_key)
);

CREATE TABLE IF NOT EXISTS resource_tags
(
tag TEXT NOT NULL,
resource_id BIGINT NOT NULL REFERENCES resources (id),

UNIQUE (resource_id, tag)
);
CREATE INDEX IF NOT EXISTS idx_resource_tags_resource_id ON resource_tags (resource_id);
CREATE INDEX IF NOT EXISTS idx_resource_tags_tag ON resource_tags (tag);

CREATE TABLE IF NOT EXISTS revisions
(
id BIGSERIAL NOT NULL PRIMARY KEY,
reason TEXT NOT NULL DEFAULT '<unknown>',
created_at timestamptz NOT NULL DEFAULT current_timestamp,
resource_id BIGINT NOT NULL REFERENCES resources (id),
spec_configs bytea NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_revisions_resource_id ON revisions (resource_id);
CREATE INDEX IF NOT EXISTS idx_revisions_created_at ON revisions (created_at);

CREATE TABLE IF NOT EXISTS revision_tags
(
tag TEXT NOT NULL,
revision_id BIGINT NOT NULL REFERENCES revisions (id),

UNIQUE (revision_id, tag)
);
CREATE INDEX IF NOT EXISTS idx_revision_tags_revision_id ON revision_tags (revision_id);
CREATE INDEX IF NOT EXISTS idx_revision_tags_tag ON revision_tags (tag);

ALTER TABLE resources
ADD COLUMN IF NOT EXISTS created_by TEXT NOT NULL DEFAULT '<unknown>',
ADD COLUMN IF NOT EXISTS updated_by TEXT NOT NULL DEFAULT '<unknown>';

ALTER TABLE revisions ADD COLUMN IF NOT EXISTS created_by TEXT NOT NULL DEFAULT '<unknown>';
11 changes: 11 additions & 0 deletions internal/store/pgsql/sqlc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: "2"

sql:
- engine: "postgresql"
queries: "queries.sql"
schema: "schema.sql"
gen:
go:
package: "queries"
out: "queries"
sql_package: "pgx/v5"