7070}
7171
7272func ensureTables (ctx context.Context , db pginterfaces.PGXPoolConn , opts postgresql.MigrateOptions ) error {
73+ // Create state table if missing, with row_id ready for pagination
7374 exists , err := tableExists (ctx , db , opts .StateTableName )
7475 if err != nil {
7576 return err
@@ -78,42 +79,88 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre
7879 if ! exists {
7980 opts .Logger .Info ("Creating CockroachDB state table" )
8081 _ , err = db .Exec (ctx , fmt .Sprintf (`CREATE TABLE %s (
81- key text NOT NULL PRIMARY KEY,
82- value jsonb NOT NULL,
83- isbinary boolean NOT NULL,
84- etag INT,
85- insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
86- updatedate TIMESTAMP WITH TIME ZONE NULL,
87- expiredate TIMESTAMP WITH TIME ZONE NULL,
88- INDEX expiredate_idx (expiredate)
82+ key text NOT NULL PRIMARY KEY,
83+ value jsonb NOT NULL,
84+ isbinary boolean NOT NULL,
85+ etag INT,
86+ insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
87+ updatedate TIMESTAMP WITH TIME ZONE NULL,
88+ expiredate TIMESTAMP WITH TIME ZONE NULL,
89+ row_id INT8 NOT NULL DEFAULT unique_rowid(),
90+ UNIQUE (row_id)
8991);` , opts .StateTableName ))
9092 if err != nil {
9193 return err
9294 }
93- }
95+ // Indexes created after table create for idempotency
96+ if _ , err = db .Exec (ctx , fmt .Sprintf (
97+ `CREATE INDEX IF NOT EXISTS %s_expiredate_idx ON %s (expiredate);` ,
98+ opts .StateTableName , opts .StateTableName )); err != nil {
99+ return err
100+ }
101+ } else {
102+ // Existing table: make sure columns + indexes exist
103+ // 1) expiredate (idempotent)
104+ if _ , err = db .Exec (ctx , fmt .Sprintf (
105+ `ALTER TABLE %s ADD COLUMN IF NOT EXISTS expiredate TIMESTAMPTZ NULL;` ,
106+ opts .StateTableName )); err != nil {
107+ return err
108+ }
109+ if _ , err = db .Exec (ctx , fmt .Sprintf (
110+ `CREATE INDEX IF NOT EXISTS %s_expiredate_idx ON %s (expiredate);` ,
111+ opts .StateTableName , opts .StateTableName )); err != nil {
112+ return err
113+ }
94114
95- // If table was created before v1.11.
96- _ , err = db .Exec (ctx , fmt .Sprintf (
97- `ALTER TABLE %s ADD COLUMN IF NOT EXISTS expiredate TIMESTAMP WITH TIME ZONE NULL;` , opts .StateTableName ))
98- if err != nil {
99- return err
100- }
101- _ , err = db .Exec (ctx , fmt .Sprintf (
102- `CREATE INDEX IF NOT EXISTS expiredate_idx ON %s (expiredate);` , opts .StateTableName ))
103- if err != nil {
104- return err
115+ // 2) row_id for keyset pagination
116+ opts .Logger .Infof ("Ensuring row_id exists on '%s'" , opts .StateTableName )
117+
118+ // Add column if missing (nullable initially)
119+ if _ , err = db .Exec (ctx , fmt .Sprintf (
120+ `ALTER TABLE %s ADD COLUMN IF NOT EXISTS row_id INT8;` ,
121+ opts .StateTableName )); err != nil {
122+ return err
123+ }
124+
125+ // Ensure it has a default generator
126+ if _ , err = db .Exec (ctx , fmt .Sprintf (
127+ `ALTER TABLE %s ALTER COLUMN row_id SET DEFAULT unique_rowid();` ,
128+ opts .StateTableName )); err != nil {
129+ return err
130+ }
131+
132+ // Backfill NULLs (older rows) with generated values
133+ if _ , err = db .Exec (ctx , fmt .Sprintf (
134+ `UPDATE %s SET row_id = unique_rowid() WHERE row_id IS NULL;` ,
135+ opts .StateTableName )); err != nil {
136+ return err
137+ }
138+
139+ // Enforce NOT NULL
140+ if _ , err = db .Exec (ctx , fmt .Sprintf (
141+ `ALTER TABLE %s ALTER COLUMN row_id SET NOT NULL;` ,
142+ opts .StateTableName )); err != nil {
143+ return err
144+ }
145+
146+ // Unique index to guarantee ordering without changing PK
147+ if _ , err = db .Exec (ctx , fmt .Sprintf (
148+ `CREATE UNIQUE INDEX IF NOT EXISTS %s_row_id_uidx ON %s (row_id);` ,
149+ opts .StateTableName , opts .StateTableName )); err != nil {
150+ return err
151+ }
105152 }
106153
154+ // Metadata table
107155 exists , err = tableExists (ctx , db , opts .MetadataTableName )
108156 if err != nil {
109157 return err
110158 }
111-
112159 if ! exists {
113160 opts .Logger .Info ("Creating CockroachDB metadata table" )
114161 _ , err = db .Exec (ctx , fmt .Sprintf (`CREATE TABLE %s (
115- key text NOT NULL PRIMARY KEY,
116- value text NOT NULL
162+ key text NOT NULL PRIMARY KEY,
163+ value text NOT NULL
117164);` , opts .MetadataTableName ))
118165 if err != nil {
119166 return err
@@ -124,7 +171,7 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre
124171}
125172
126173func tableExists (ctx context.Context , db pginterfaces.PGXPoolConn , tableName string ) (bool , error ) {
127- exists := false
128- err := db .QueryRow (ctx , "SELECT EXISTS (SELECT * FROM pg_tables where tablename = $1)" , tableName ).Scan (& exists )
174+ var exists bool
175+ err := db .QueryRow (ctx , "SELECT EXISTS (SELECT * FROM pg_tables WHERE tablename = $1)" , tableName ).Scan (& exists )
129176 return exists , err
130177}
0 commit comments