From 914d821ad576ac5a1dc11a1cedfb48b63f2fc285 Mon Sep 17 00:00:00 2001 From: shridhar Date: Tue, 30 Sep 2025 19:47:46 +0530 Subject: [PATCH 1/8] feat: add pgx support to optimise inserts --- cmd/init.go | 107 ++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 87 insertions(+), 20 deletions(-) diff --git a/cmd/init.go b/cmd/init.go index eb6d114..34f2746 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -15,10 +15,13 @@ import ( "github.com/knadh/koanf/providers/file" "github.com/knadh/koanf/providers/posflag" "github.com/knadh/koanf/v2" + flag "github.com/spf13/pflag" "github.com/zerodha/dungbeetle/v2/internal/core" "github.com/zerodha/dungbeetle/v2/internal/dbpool" + "github.com/zerodha/dungbeetle/v2/internal/resultbackends/pgxdb" "github.com/zerodha/dungbeetle/v2/internal/resultbackends/sqldb" + "github.com/zerodha/dungbeetle/v2/models" ) var ( @@ -174,27 +177,10 @@ func initCore(ko *koanf.Koanf) (*core.Core, error) { return nil, err } - // Connect to result DBs. - resPool, err := dbpool.New(resDBs) - if err != nil { - return nil, err - } - // Initialize the result backend controller for every backend. - backends := make(core.ResultBackends) - for name, db := range resPool { - opt := sqldb.Opt{ - DBType: resDBs[name].Type, - ResultsTable: ko.MustString(fmt.Sprintf("results.%s.results_table", name)), - UnloggedTables: resDBs[name].Unlogged, - } - - backend, err := sqldb.NewSQLBackend(db, opt, log) - if err != nil { - return nil, fmt.Errorf("error initializing result backend: %w", err) - } - - backends[name] = backend + backends, err := initResultBackends(resDBs, ko, log) + if err != nil { + return nil, fmt.Errorf("error initializing result backends: %w", err) } if v := ko.MustString("job_queue.broker.type"); v != "redis" { @@ -242,3 +228,84 @@ func initCore(ko *koanf.Koanf) (*core.Core, error) { return co, nil } + +// initResultBackends initializes all result backends defined in config. 
+func initResultBackends(resDBs map[string]dbpool.Config, ko *koanf.Koanf, logger *slog.Logger) (core.ResultBackends, error) { + backends := make(core.ResultBackends) + + for name, config := range resDBs { + var backend models.ResultBackend + var err error + + // Check if we should use optimized pgx backend for PostgreSQL + if config.Type == "postgres" { + backend, err = createPgxResultBackend(name, config, ko, logger) + if err != nil { + // Fall back to standard SQL backend on error + logger.Warn("failed to create pgx backend, falling back to standard SQL", + "name", name, "error", err) + + return nil, fmt.Errorf("error initializing result backend '%s': %w", name, err) + } else { + logger.Info("using optimized pgx backend", + "name", name, + ) + } + } else { + // Use standard SQL backend for other databases + backend, err = createSQLResultBackend(name, config, ko, logger) + } + + if err != nil { + return nil, fmt.Errorf("error initializing result backend '%s': %w", name, err) + } + + backends[name] = backend + } + + return backends, nil +} + +func createPgxResultBackend(name string, config dbpool.Config, ko *koanf.Koanf, logger *slog.Logger) (models.ResultBackend, error) { + opt := pgxdb.Opt{ + ResultsTable: ko.MustString(fmt.Sprintf("results.%s.results_table", name)), + UnloggedTables: config.Unlogged, + BatchInsert: config.BatchInsert, + MaxConns: config.MaxActiveConns, + MaxConnIdleTime: config.MaxIdleConns, + } + + if opt.ResultsTable == "" { + opt.ResultsTable = "results_%s" + } + + return pgxdb.NewPgxBackend(config.DSN, opt, logger) +} + +// createSQLResultBackend creates a standard SQL backend +func createSQLResultBackend(name string, config dbpool.Config, ko *koanf.Koanf, logger *slog.Logger) (models.ResultBackend, error) { + // Connect to result DB using standard database/sql + resPool, err := dbpool.New(map[string]dbpool.Config{ + name: config, + }) + if err != nil { + return nil, err + } + + db, ok := resPool[name] + if !ok { + return nil, 
fmt.Errorf("failed to get database connection for %s", name) + } + + opt := sqldb.Opt{ + DBType: config.Type, + ResultsTable: ko.MustString(fmt.Sprintf("results.%s.results_table", name)), + UnloggedTables: config.Unlogged, + } + + if opt.ResultsTable == "" { + opt.ResultsTable = "results_%s" + } + + return sqldb.NewSQLBackend(db, opt, logger) +} From 1b2c459159d2189a8c6cd9da603967f76aeb3bde Mon Sep 17 00:00:00 2001 From: shridhar Date: Tue, 30 Sep 2025 19:50:48 +0530 Subject: [PATCH 2/8] fix: convert ttl to seconds before converting it to int --- internal/core/core.go | 2 +- internal/dbpool/dbpool.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/core/core.go b/internal/core/core.go index de0423c..5f26594 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -380,7 +380,7 @@ func (co *Core) makeJob(j models.JobReq, taskName string) (tasqueue.Job, error) b, err := msgpack.Marshal(taskMeta{ Args: args, DB: j.DB, - TTL: int(ttl), + TTL: int(ttl.Seconds()), }) if err != nil { return tasqueue.Job{}, err diff --git a/internal/dbpool/dbpool.go b/internal/dbpool/dbpool.go index e690dd6..6dbca4c 100644 --- a/internal/dbpool/dbpool.go +++ b/internal/dbpool/dbpool.go @@ -21,6 +21,8 @@ type Config struct { MaxIdleConns int `koanf:"max_idle"` MaxActiveConns int `koanf:"max_active"` ConnectTimeout time.Duration `koanf:"connect_timeout"` + BatchSize int `koanf:"batch_size"` + BatchInsert bool `koanf:"batch_insert"` } // New takes a map of named DB configurations and returns a named map From 43f4197a00bb5f8e3064e6fb62d26561d56d54c8 Mon Sep 17 00:00:00 2001 From: shridhar Date: Tue, 30 Sep 2025 19:51:42 +0530 Subject: [PATCH 3/8] fix: refactored to handle only mysql db --- internal/resultbackends/sqldb/sqldb.go | 51 ++++---------------------- 1 file changed, 8 insertions(+), 43 deletions(-) diff --git a/internal/resultbackends/sqldb/sqldb.go b/internal/resultbackends/sqldb/sqldb.go index 3cb7266..db562da 100644 --- 
a/internal/resultbackends/sqldb/sqldb.go +++ b/internal/resultbackends/sqldb/sqldb.go @@ -16,11 +16,6 @@ import ( "github.com/zerodha/dungbeetle/v2/models" ) -const ( - dbTypePostgres = "postgres" - dbTypeMysql = "mysql" -) - // Opt represents SQL DB backend's options. type Opt struct { DBType string @@ -48,7 +43,6 @@ type SQLDBResultSet struct { taskName string colsWritten bool cols []string - rows [][]byte tx *sql.Tx tbl string @@ -122,14 +116,7 @@ func (w *SQLDBResultSet) RegisterColTypes(cols []string, colTypes []*sql.ColumnT ) for i := range w.cols { colNameHolder[i] = fmt.Sprintf(`"%s"`, w.cols[i]) - - // This will be filled by the driver. - if w.backend.opt.DBType == dbTypePostgres { - // Postgres placeholders are $1, $2 ... - colValHolder[i] = fmt.Sprintf("$%d", i+1) - } else { - colValHolder[i] = "?" - } + colValHolder[i] = "?" } ins := fmt.Sprintf(`INSERT INTO "%%s" (%s) `, strings.Join(colNameHolder, ",")) @@ -235,20 +222,13 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins for i := range cols { colNameHolder[i] = s.quoteIdentifier(cols[i]) + colValHolder[i] = "?" - // This will be filled by the driver. - if s.opt.DBType == dbTypePostgres { - // Postgres placeholders are $1, $2 ... - colValHolder[i] = fmt.Sprintf("$%d", i+1) - } else { - colValHolder[i] = "?" 
- } } var ( - fields = make([]string, len(cols)) - typ string - unlogged string + fields = make([]string, len(cols)) + typ string ) for i := 0; i < len(cols); i++ { @@ -265,15 +245,10 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins case "BOOLEAN": typ = "BOOLEAN" case "JSON", "JSONB": - if s.opt.DBType == dbTypePostgres { - typ = "JSONB" - } else { - typ = "JSON" - } + typ = "JSON" case "_INT4", "_INT8", "_TEXT": - if s.opt.DBType != dbTypePostgres { - typ = "TEXT" - } + typ = "TEXT" + case "VARCHAR": typ = "VARCHAR(255)" default: @@ -287,16 +262,9 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins fields[i] = fmt.Sprintf("%s %s", s.quoteIdentifier(cols[i]), typ) } - // If the DB is Postgres, optionally create an "unlogged" table that disables - // WAL, improving performance of throw-away cache tables. - // https://www.postgresql.org/docs/current/sql-createtable.html - if s.opt.DBType == dbTypePostgres && s.opt.UnloggedTables { - unlogged = "UNLOGGED" - } - return insertSchema{ dropTable: fmt.Sprintf("DROP TABLE IF EXISTS %s;", s.quoteIdentifier("%s")), - createTable: fmt.Sprintf("CREATE %s TABLE IF NOT EXISTS %s (%s);", unlogged, s.quoteIdentifier("%s"), strings.Join(fields, ",")), + createTable: fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);", s.quoteIdentifier("%s"), strings.Join(fields, ",")), insertRow: fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", s.quoteIdentifier("%s"), strings.Join(colNameHolder, ","), @@ -306,9 +274,6 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins // quoteIdentifier quotes an identifier (table or column name) based on the database type func (s *SqlDB) quoteIdentifier(name string) string { - if s.opt.DBType == dbTypePostgres { - return fmt.Sprintf(`"%s"`, name) - } // MySQL uses backticks return fmt.Sprintf("`%s`", name) } From 7bc58c2821ae5c98cd0198e7bb324c24f957baaa Mon Sep 17 00:00:00 2001 From: shridhar Date: Tue, 30 Sep 2025 
19:52:06 +0530 Subject: [PATCH 4/8] fix: added pgx support --- go.mod | 12 + go.sum | 27 ++ internal/resultbackends/pgxdb/pgxdb.go | 381 +++++++++++++++++++++++++ 3 files changed, 420 insertions(+) create mode 100644 internal/resultbackends/pgxdb/pgxdb.go diff --git a/go.mod b/go.mod index f71ec3c..ea9b48c 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/go-chi/chi/v5 v5.2.2 github.com/go-sql-driver/mysql v1.9.3 github.com/gofrs/uuid/v5 v5.3.2 + github.com/jackc/pgx/v5 v5.7.6 github.com/kalbhor/tasqueue/v2 v2.2.1 github.com/knadh/goyesql/v2 v2.2.0 github.com/knadh/koanf/parsers/toml v0.1.0 @@ -16,6 +17,7 @@ require ( github.com/knadh/koanf/providers/file v1.2.0 github.com/knadh/koanf/providers/posflag v1.0.1 github.com/knadh/koanf/v2 v2.2.2 + github.com/labstack/gommon v0.4.2 github.com/lib/pq v1.10.9 github.com/spf13/pflag v1.0.7 github.com/stretchr/testify v1.10.0 @@ -35,12 +37,19 @@ require ( github.com/go-viper/mapstructure/v2 v2.4.0 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasttemplate v1.2.2 // indirect github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect @@ -48,8 +57,11 @@ require ( go.opentelemetry.io/otel/metric v1.37.0 // indirect 
go.opentelemetry.io/otel/sdk v1.37.0 // indirect go.opentelemetry.io/otel/trace v1.37.0 // indirect + golang.org/x/crypto v0.41.0 // indirect golang.org/x/net v0.43.0 // indirect + golang.org/x/sync v0.16.0 // indirect golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.28.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.36.7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index d9bdd11..b6265a7 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,14 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= +github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/kalbhor/tasqueue/v2 v2.2.1 h1:jSRcbPYdF1qMeuizgdcVBCcFKx0fInMOxTPHoc6A//w= github.com/kalbhor/tasqueue/v2 v2.2.1/go.mod h1:OOPWDU65QhGlzq9fpyW2pBvXrsPzpHiVBtrIaDgn+Rc= @@ -65,9 +73,16 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty 
v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= +github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= @@ -93,8 +108,13 @@ github.com/spf13/pflag v1.0.7 h1:vN6T9TfwStFPFM5XzjsvmzZkLuaLX+HS+0SeFLRgU6M= github.com/spf13/pflag v1.0.7/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 
+github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= +github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= @@ -116,6 +136,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -125,11 +147,15 @@ golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.16.0 
h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -160,5 +186,6 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/resultbackends/pgxdb/pgxdb.go b/internal/resultbackends/pgxdb/pgxdb.go new file mode 100644 index 0000000..7442411 --- /dev/null +++ b/internal/resultbackends/pgxdb/pgxdb.go @@ -0,0 +1,381 @@ +// Package pgxdb is an optimized PostgreSQL backend 
implementation using pgx +// for high-performance bulk operations with COPY protocol support. +package pgxdb + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log/slog" + "strings" + "sync" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/zerodha/dungbeetle/v2/models" +) + +// Opt represents PostgreSQL backend options. +type Opt struct { + ResultsTable string + UnloggedTables bool + BatchInsert bool // Enable bulk inserts + BatchSize int // Batch size for COPY operations + MaxConns int + MaxConnIdleTime int +} + +// PgxDB represents the optimized PostgreSQL backend. +type PgxDB struct { + pool *pgxpool.Pool + opt Opt + logger *slog.Logger + + // Cache for result schemas + resTableSchemas map[string]insertSchema + schemaMutex sync.RWMutex +} + +// PgxResultSet represents a writer that saves results using pgx. +type PgxResultSet struct { + jobID string + taskName string + colsWritten bool + cols []string + colTypes []*sql.ColumnType + + // For COPY protocol + rows [][]interface{} + rowBuffer sync.Pool + + // For standard inserts + tx pgx.Tx + tbl string + + backend *PgxDB + ctx context.Context + cancel context.CancelFunc +} + +// insertSchema contains the generated SQL for creating tables and inserting rows. +type insertSchema struct { + dropTable string + createTable string + insertRow string + copyColumns []string +} + +// NewPgxBackend returns a new pgx result backend instance. 
+func NewPgxBackend(connString string, opt Opt, lo *slog.Logger) (*PgxDB, error) { + config, err := pgxpool.ParseConfig(connString) + if err != nil { + return nil, fmt.Errorf("failed to parse connection string: %w", err) + } + if opt.MaxConns == 0 { + opt.MaxConns = 5 + } + // Optimize connection pool settings + config.MaxConns = int32(opt.MaxConns) + config.MinConns = 5 + config.MaxConnLifetime = time.Hour + config.MaxConnIdleTime = time.Minute * time.Duration(opt.MaxConnIdleTime) + + pool, err := pgxpool.NewWithConfig(context.Background(), config) + if err != nil { + return nil, fmt.Errorf("failed to create connection pool: %w", err) + } + + // Test connection + if err := pool.Ping(context.Background()); err != nil { + pool.Close() + return nil, fmt.Errorf("failed to ping database: %w", err) + } + + s := &PgxDB{ + pool: pool, + opt: opt, + resTableSchemas: make(map[string]insertSchema), + logger: lo, + } + + if opt.BatchSize == 0 { + s.opt.BatchSize = 5000 + } + + return s, nil +} + +// NewResultSet returns a new instance of a pgx result writer. +func (p *PgxDB) NewResultSet(jobID, taskName string, ttl time.Duration) (models.ResultSet, error) { + ctx, cancel := context.WithTimeout(context.Background(), ttl) + + tx, err := p.pool.Begin(ctx) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to begin transaction: %w", err) + } + + rs := &PgxResultSet{ + jobID: jobID, + taskName: taskName, + backend: p, + tbl: fmt.Sprintf(p.opt.ResultsTable, jobID), + tx: tx, + ctx: ctx, + cancel: cancel, + rows: make([][]interface{}, 0, p.opt.BatchSize), + } + + // Initialize row buffer pool for memory efficiency + rs.rowBuffer = sync.Pool{ + New: func() interface{} { + return make([]interface{}, 0, 20) // Adjust based on typical column count + }, + } + + return rs, nil +} + +// RegisterColTypes registers the column types for a task's result set. 
+func (w *PgxResultSet) RegisterColTypes(cols []string, colTypes []*sql.ColumnType) error { + if w.IsColTypesRegistered() { + return errors.New("column types are already registered") + } + + w.cols = make([]string, len(cols)) + copy(w.cols, cols) + w.colTypes = colTypes + + // Create schema and cache it + w.backend.schemaMutex.Lock() + w.backend.resTableSchemas[w.taskName] = w.backend.createTableSchema(cols, colTypes) + w.backend.schemaMutex.Unlock() + + return nil +} + +// IsColTypesRegistered checks if column types are registered. +func (w *PgxResultSet) IsColTypesRegistered() bool { + w.backend.schemaMutex.RLock() + _, ok := w.backend.resTableSchemas[w.taskName] + w.backend.schemaMutex.RUnlock() + return ok +} + +// WriteCols creates the results table. +func (w *PgxResultSet) WriteCols(cols []string) error { + if w.colsWritten { + return fmt.Errorf("columns for '%s' are already written", w.taskName) + } + + w.backend.schemaMutex.RLock() + rSchema, ok := w.backend.resTableSchemas[w.taskName] + w.backend.schemaMutex.RUnlock() + + if !ok { + return fmt.Errorf("column types for '%s' have not been registered", w.taskName) + } + + // Create table in a separate transaction for DDL + ddlTx, err := w.backend.pool.Begin(w.ctx) + if err != nil { + return fmt.Errorf("failed to begin DDL transaction: %w", err) + } + defer ddlTx.Rollback(w.ctx) + + // Drop existing table + fmt.Println("Creating results table:", w.tbl) + if _, err := ddlTx.Exec(w.ctx, fmt.Sprintf(rSchema.dropTable, w.tbl)); err != nil { + return fmt.Errorf("failed to drop table: %w", err) + } + + // Create new table + if _, err := ddlTx.Exec(w.ctx, fmt.Sprintf(rSchema.createTable, w.tbl)); err != nil { + return fmt.Errorf("failed to create table: %w", err) + } + + if err := ddlTx.Commit(w.ctx); err != nil { + return fmt.Errorf("failed to commit DDL transaction: %w", err) + } + + w.colsWritten = true + return nil +} + +// WriteRow writes a row to the result set. 
+func (w *PgxResultSet) WriteRow(row []interface{}) error { + if !w.colsWritten { + return errors.New("columns must be written before rows") + } + + if w.backend.opt.BatchInsert { + // Buffer rows for COPY protocol + rowCopy := make([]interface{}, len(row)) + copy(rowCopy, row) + w.rows = append(w.rows, rowCopy) + + // Flush if we've reached batch size + if len(w.rows) >= w.backend.opt.BatchSize { + fmt.Println("Flushing batch of rows using COPY protocol ", len(w.rows)) + return w.flushCopy() + } + return nil + } + + // Standard insert for non-COPY mode + w.backend.schemaMutex.RLock() + rSchema, ok := w.backend.resTableSchemas[w.taskName] + w.backend.schemaMutex.RUnlock() + + if !ok { + return fmt.Errorf("schema not found for task '%s'", w.taskName) + } + + _, err := w.tx.Exec(w.ctx, fmt.Sprintf(rSchema.insertRow, w.tbl), row...) + return err +} + +// flushCopy performs bulk insert using COPY protocol. +func (w *PgxResultSet) flushCopy() error { + if len(w.rows) == 0 { + return nil + } + + w.backend.schemaMutex.RLock() + rSchema, ok := w.backend.resTableSchemas[w.taskName] + w.backend.schemaMutex.RUnlock() + + if !ok { + return fmt.Errorf("schema not found for task '%s'", w.taskName) + } + + // Use COPY protocol for bulk insert + conn, err := w.backend.pool.Acquire(w.ctx) + if err != nil { + return fmt.Errorf("failed to acquire connection: %w", err) + } + defer conn.Release() + + copyCount, err := conn.Conn().CopyFrom( + w.ctx, + pgx.Identifier{w.tbl}, + rSchema.copyColumns, + pgx.CopyFromRows(w.rows), + ) + + if err != nil { + return fmt.Errorf("COPY failed: %w", err) + } + + if int(copyCount) != len(w.rows) { + return fmt.Errorf("expected to copy %d rows, but copied %d", len(w.rows), copyCount) + } + + // Clear the buffer + w.rows = w.rows[:0] + + return nil +} + +// Flush flushes any buffered rows and commits the transaction. 
+func (w *PgxResultSet) Flush() error { + // Flush any remaining buffered rows if using COPY + if w.backend.opt.BatchInsert && len(w.rows) > 0 { + if err := w.flushCopy(); err != nil { + return err + } + } + + // Commit transaction for standard inserts + if w.tx != nil { + return w.tx.Commit(w.ctx) + } + + return nil +} + +// Close closes the result set and releases resources. +func (w *PgxResultSet) Close() error { + defer w.cancel() + + if w.tx != nil { + return w.tx.Rollback(w.ctx) + } + + return nil +} + +// createTableSchema generates SQL schemas for table operations. +func (p *PgxDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) insertSchema { + var ( + colNameHolder = make([]string, len(cols)) + colValHolder = make([]string, len(cols)) + copyColumns = make([]string, len(cols)) + fields = make([]string, len(cols)) + ) + + for i, col := range cols { + quotedCol := fmt.Sprintf(`"%s"`, col) + colNameHolder[i] = quotedCol + colValHolder[i] = fmt.Sprintf("$%d", i+1) + copyColumns[i] = col // Unquoted for COPY + + // Map SQL types to PostgreSQL types + typ := mapColumnType(colTypes[i]) + + // Add NOT NULL constraint if applicable + if nullable, ok := colTypes[i].Nullable(); ok && !nullable { + typ += " NOT NULL" + } + + fields[i] = fmt.Sprintf("%s %s", quotedCol, typ) + } + + // Build unlogged table modifier if requested + unlogged := "" + if p.opt.UnloggedTables { + unlogged = "UNLOGGED" + } + + return insertSchema{ + dropTable: `DROP TABLE IF EXISTS "%s" CASCADE`, + createTable: fmt.Sprintf(`CREATE %s TABLE IF NOT EXISTS "%%s" (%s)`, + unlogged, strings.Join(fields, ", ")), + insertRow: fmt.Sprintf(`INSERT INTO "%%s" (%s) VALUES (%s)`, + strings.Join(colNameHolder, ", "), + strings.Join(colValHolder, ", ")), + copyColumns: copyColumns, + } +} + +// mapColumnType maps database type names to PostgreSQL types. 
+func mapColumnType(colType *sql.ColumnType) string { + typeName := colType.DatabaseTypeName() + + switch strings.ToUpper(typeName) { + case "INT2", "INT4", "INT8", "TINYINT", "SMALLINT", "INT", "MEDIUMINT", "BIGINT": + return "BIGINT" + case "FLOAT4", "FLOAT8", "DECIMAL", "FLOAT", "DOUBLE", "NUMERIC", "FLOAT32", "FLOAT64": + return "DECIMAL" + case "TIMESTAMP", "DATETIME": + return "TIMESTAMP" + case "DATE": + return "DATE" + case "BOOLEAN": + return "BOOLEAN" + case "JSON", "JSONB": + return "JSONB" + case "VARCHAR": + return "VARCHAR(255)" + default: + return "TEXT" + } +} + +// Close closes the database connection pool. +func (p *PgxDB) Close() error { + p.pool.Close() + return nil +} From 9ac707730ec45f146422ac591db406dbfaa29832 Mon Sep 17 00:00:00 2001 From: shridhar Date: Tue, 30 Sep 2025 19:59:25 +0530 Subject: [PATCH 5/8] fix: added comments --- internal/resultbackends/pgxdb/pgxdb.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/internal/resultbackends/pgxdb/pgxdb.go b/internal/resultbackends/pgxdb/pgxdb.go index 7442411..ee24782 100644 --- a/internal/resultbackends/pgxdb/pgxdb.go +++ b/internal/resultbackends/pgxdb/pgxdb.go @@ -22,7 +22,7 @@ type Opt struct { ResultsTable string UnloggedTables bool BatchInsert bool // Enable bulk inserts - BatchSize int // Batch size for COPY operations + BatchSize int // Batch size for bulk operations MaxConns int MaxConnIdleTime int } @@ -46,7 +46,7 @@ type PgxResultSet struct { cols []string colTypes []*sql.ColumnType - // For COPY protocol + // For batch processing rows [][]interface{} rowBuffer sync.Pool @@ -211,20 +211,19 @@ func (w *PgxResultSet) WriteRow(row []interface{}) error { } if w.backend.opt.BatchInsert { - // Buffer rows for COPY protocol + // Buffer rows for batch processing. 
rowCopy := make([]interface{}, len(row)) copy(rowCopy, row) w.rows = append(w.rows, rowCopy) // Flush if we've reached batch size if len(w.rows) >= w.backend.opt.BatchSize { - fmt.Println("Flushing batch of rows using COPY protocol ", len(w.rows)) - return w.flushCopy() + return w.flushBatch() } return nil } - // Standard insert for non-COPY mode + // Standard insert for non-batch mode w.backend.schemaMutex.RLock() rSchema, ok := w.backend.resTableSchemas[w.taskName] w.backend.schemaMutex.RUnlock() @@ -237,8 +236,8 @@ func (w *PgxResultSet) WriteRow(row []interface{}) error { return err } -// flushCopy performs bulk insert using COPY protocol. -func (w *PgxResultSet) flushCopy() error { +// flushBatch performs bulk insert using batch process. +func (w *PgxResultSet) flushBatch() error { if len(w.rows) == 0 { return nil } @@ -251,7 +250,7 @@ func (w *PgxResultSet) flushCopy() error { return fmt.Errorf("schema not found for task '%s'", w.taskName) } - // Use COPY protocol for bulk insert + // Use batch process for bulk insert conn, err := w.backend.pool.Acquire(w.ctx) if err != nil { return fmt.Errorf("failed to acquire connection: %w", err) @@ -281,9 +280,9 @@ func (w *PgxResultSet) flushCopy() error { // Flush flushes any buffered rows and commits the transaction. 
func (w *PgxResultSet) Flush() error { - // Flush any remaining buffered rows if using COPY + // Flush any remaining buffered rows if using batch inserts if w.backend.opt.BatchInsert && len(w.rows) > 0 { - if err := w.flushCopy(); err != nil { + if err := w.flushBatch(); err != nil { return err } } @@ -320,7 +319,7 @@ func (p *PgxDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins quotedCol := fmt.Sprintf(`"%s"`, col) colNameHolder[i] = quotedCol colValHolder[i] = fmt.Sprintf("$%d", i+1) - copyColumns[i] = col // Unquoted for COPY + copyColumns[i] = col // Unquoted for batch inserts // Map SQL types to PostgreSQL types typ := mapColumnType(colTypes[i]) From 77385a78e452cb90dc3db4236c11536e39b21efd Mon Sep 17 00:00:00 2001 From: shridhar Date: Tue, 30 Sep 2025 21:26:45 +0530 Subject: [PATCH 6/8] fix: go idimatic --- cmd/init.go | 1 + internal/resultbackends/pgxdb/pgxdb.go | 29 ++++++++------------------ 2 files changed, 10 insertions(+), 20 deletions(-) diff --git a/cmd/init.go b/cmd/init.go index 34f2746..d16efc8 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -271,6 +271,7 @@ func createPgxResultBackend(name string, config dbpool.Config, ko *koanf.Koanf, ResultsTable: ko.MustString(fmt.Sprintf("results.%s.results_table", name)), UnloggedTables: config.Unlogged, BatchInsert: config.BatchInsert, + BatchSize: config.BatchSize, MaxConns: config.MaxActiveConns, MaxConnIdleTime: config.MaxIdleConns, } diff --git a/internal/resultbackends/pgxdb/pgxdb.go b/internal/resultbackends/pgxdb/pgxdb.go index ee24782..71c2bbd 100644 --- a/internal/resultbackends/pgxdb/pgxdb.go +++ b/internal/resultbackends/pgxdb/pgxdb.go @@ -29,11 +29,9 @@ type Opt struct { // PgxDB represents the optimized PostgreSQL backend. 
type PgxDB struct { - pool *pgxpool.Pool - opt Opt - logger *slog.Logger - - // Cache for result schemas + pool *pgxpool.Pool + opt Opt + logger *slog.Logger resTableSchemas map[string]insertSchema schemaMutex sync.RWMutex } @@ -76,7 +74,6 @@ func NewPgxBackend(connString string, opt Opt, lo *slog.Logger) (*PgxDB, error) if opt.MaxConns == 0 { opt.MaxConns = 5 } - // Optimize connection pool settings config.MaxConns = int32(opt.MaxConns) config.MinConns = 5 config.MaxConnLifetime = time.Hour @@ -100,7 +97,7 @@ func NewPgxBackend(connString string, opt Opt, lo *slog.Logger) (*PgxDB, error) logger: lo, } - if opt.BatchSize == 0 { + if s.opt.BatchSize == 0 { s.opt.BatchSize = 5000 } @@ -148,7 +145,6 @@ func (w *PgxResultSet) RegisterColTypes(cols []string, colTypes []*sql.ColumnTyp copy(w.cols, cols) w.colTypes = colTypes - // Create schema and cache it w.backend.schemaMutex.Lock() w.backend.resTableSchemas[w.taskName] = w.backend.createTableSchema(cols, colTypes) w.backend.schemaMutex.Unlock() @@ -173,7 +169,6 @@ func (w *PgxResultSet) WriteCols(cols []string) error { w.backend.schemaMutex.RLock() rSchema, ok := w.backend.resTableSchemas[w.taskName] w.backend.schemaMutex.RUnlock() - if !ok { return fmt.Errorf("column types for '%s' have not been registered", w.taskName) } @@ -227,7 +222,6 @@ func (w *PgxResultSet) WriteRow(row []interface{}) error { w.backend.schemaMutex.RLock() rSchema, ok := w.backend.resTableSchemas[w.taskName] w.backend.schemaMutex.RUnlock() - if !ok { return fmt.Errorf("schema not found for task '%s'", w.taskName) } @@ -245,7 +239,6 @@ func (w *PgxResultSet) flushBatch() error { w.backend.schemaMutex.RLock() rSchema, ok := w.backend.resTableSchemas[w.taskName] w.backend.schemaMutex.RUnlock() - if !ok { return fmt.Errorf("schema not found for task '%s'", w.taskName) } @@ -263,7 +256,6 @@ func (w *PgxResultSet) flushBatch() error { rSchema.copyColumns, pgx.CopyFromRows(w.rows), ) - if err != nil { return fmt.Errorf("COPY failed: %w", err) } @@ 
-308,12 +300,10 @@ func (w *PgxResultSet) Close() error { // createTableSchema generates SQL schemas for table operations. func (p *PgxDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) insertSchema { - var ( - colNameHolder = make([]string, len(cols)) - colValHolder = make([]string, len(cols)) - copyColumns = make([]string, len(cols)) - fields = make([]string, len(cols)) - ) + colNameHolder := make([]string, len(cols)) + colValHolder := make([]string, len(cols)) + copyColumns := make([]string, len(cols)) + fields := make([]string, len(cols)) for i, col := range cols { quotedCol := fmt.Sprintf(`"%s"`, col) @@ -328,7 +318,6 @@ func (p *PgxDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins if nullable, ok := colTypes[i].Nullable(); ok && !nullable { typ += " NOT NULL" } - fields[i] = fmt.Sprintf("%s %s", quotedCol, typ) } @@ -339,7 +328,7 @@ func (p *PgxDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins } return insertSchema{ - dropTable: `DROP TABLE IF EXISTS "%s" CASCADE`, + dropTable: fmt.Sprintf(`DROP TABLE IF EXISTS "%%s" CASCADE`), createTable: fmt.Sprintf(`CREATE %s TABLE IF NOT EXISTS "%%s" (%s)`, unlogged, strings.Join(fields, ", ")), insertRow: fmt.Sprintf(`INSERT INTO "%%s" (%s) VALUES (%s)`, From 8279075261d9e9643ef61a591d186da3371baf95 Mon Sep 17 00:00:00 2001 From: shridhar Date: Wed, 1 Oct 2025 15:27:01 +0530 Subject: [PATCH 7/8] fix: code cleanup --- cmd/init.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/cmd/init.go b/cmd/init.go index d16efc8..c2059e1 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -237,20 +237,9 @@ func initResultBackends(resDBs map[string]dbpool.Config, ko *koanf.Koanf, logger var backend models.ResultBackend var err error - // Check if we should use optimized pgx backend for PostgreSQL + // Use pgx backend for PostgreSQL. 
if config.Type == "postgres" { backend, err = createPgxResultBackend(name, config, ko, logger) - if err != nil { - // Fall back to standard SQL backend on error - logger.Warn("failed to create pgx backend, falling back to standard SQL", - "name", name, "error", err) - - return nil, fmt.Errorf("error initializing result backend '%s': %w", name, err) - } else { - logger.Info("using optimized pgx backend", - "name", name, - ) - } } else { // Use standard SQL backend for other databases backend, err = createSQLResultBackend(name, config, ko, logger) From 2aa5f2f47522e9d5ae7dd27967fdfbd3dbe25bf9 Mon Sep 17 00:00:00 2001 From: shridhar Date: Wed, 1 Oct 2025 22:57:46 +0530 Subject: [PATCH 8/8] fix: map column types for clickhouse-specific db --- internal/resultbackends/pgxdb/pgxdb.go | 36 +++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/internal/resultbackends/pgxdb/pgxdb.go b/internal/resultbackends/pgxdb/pgxdb.go index 71c2bbd..2dfe893 100644 --- a/internal/resultbackends/pgxdb/pgxdb.go +++ b/internal/resultbackends/pgxdb/pgxdb.go @@ -342,21 +342,49 @@ func (p *PgxDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins func mapColumnType(colType *sql.ColumnType) string { typeName := colType.DatabaseTypeName() + // Remove Nullable wrapper if present (ClickHouse specific) + if strings.HasPrefix(typeName, "Nullable(") && strings.HasSuffix(typeName, ")") { + typeName = strings.TrimPrefix(typeName, "Nullable(") + typeName = strings.TrimSuffix(typeName, ")") + } + switch strings.ToUpper(typeName) { + // PostgreSQL native types case "INT2", "INT4", "INT8", "TINYINT", "SMALLINT", "INT", "MEDIUMINT", "BIGINT": return "BIGINT" - case "FLOAT4", "FLOAT8", "DECIMAL", "FLOAT", "DOUBLE", "NUMERIC", "FLOAT32", "FLOAT64": + + // ClickHouse integer types + case "UINT8", "UINT16", "UINT32", "UINT64", "INT16", "INT32", "INT64": + return "BIGINT" + + // Float types + case "FLOAT4", "FLOAT8", "DECIMAL", "FLOAT", "DOUBLE", "NUMERIC": + 
return "DECIMAL" + + // ClickHouse float types + case "FLOAT32", "FLOAT64": return "DECIMAL" - case "TIMESTAMP", "DATETIME": + + // String types + case "STRING", "FIXEDSTRING": + return "TEXT" + + // Date/Time types + case "TIMESTAMP", "DATETIME", "DATETIME64": return "TIMESTAMP" - case "DATE": + case "DATE", "DATE32": return "DATE" - case "BOOLEAN": + + // Other types + case "BOOLEAN", "BOOL": return "BOOLEAN" case "JSON", "JSONB": return "JSONB" case "VARCHAR": return "VARCHAR(255)" + case "TEXT": + return "TEXT" + default: return "TEXT" }