Skip to content

Commit 4e0cc51

Browse files
authored
Merge pull request #174 from xataio/pg-processor-handle-errors-gracefully
[PostgreSQL processor] Handle errors gracefully
2 parents b167141 + b13e81d commit 4e0cc51

File tree

5 files changed

+174
-8
lines changed

5 files changed

+174
-8
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ test:
1010

1111
.PHONY: integration-test
1212
integration-test:
13-
@PGSTREAM_INTEGRATION_TESTS=true go test -timeout 90s github.com/xataio/pgstream/pkg/stream/integration
13+
@PGSTREAM_INTEGRATION_TESTS=true go test -timeout 180s github.com/xataio/pgstream/pkg/stream/integration
1414

1515
.PHONY: license-check
1616
license-check:

go.sum

-2
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,6 @@ github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHo
102102
github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
103103
github.com/elastic/go-elasticsearch/v8 v8.14.0 h1:1ywU8WFReLLcxE1WJqii3hTtbPUE2hc38ZK/j4mMFow=
104104
github.com/elastic/go-elasticsearch/v8 v8.14.0/go.mod h1:WRvnlGkSuZyp83M2U8El/LGXpCjYLrvlkSgkAH4O5I4=
105-
github.com/eminano/greenmask v0.0.0-20250210141616-781d3aca2262 h1:IAhjExmPmCH/UWLv56J/50wrjlwJRTbwI8gBhb5Rl/A=
106-
github.com/eminano/greenmask v0.0.0-20250210141616-781d3aca2262/go.mod h1:gBBDtACvj/I3nzyEvCGqrWd2ox1XVlh5+Xv5JXKcKDI=
107105
github.com/eminano/greenmask v0.0.0-20250307113752-035ee2b102e6 h1:m/MuqhOhjJM2oWIjUMkyvrCuCmb/IMC8Ig3/uSM0M+U=
108106
github.com/eminano/greenmask v0.0.0-20250307113752-035ee2b102e6/go.mod h1:gBBDtACvj/I3nzyEvCGqrWd2ox1XVlh5+Xv5JXKcKDI=
109107
github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI=

internal/postgres/errors.go

+15
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package postgres
55
import (
66
"errors"
77
"fmt"
8+
"strings"
89

910
"github.com/jackc/pgx/v5"
1011
"github.com/jackc/pgx/v5/pgconn"
@@ -23,6 +24,14 @@ func (e *ErrRelationDoesNotExist) Error() string {
2324
return fmt.Sprintf("relation does not exist: %s", e.Details)
2425
}
2526

27+
type ErrConstraintViolation struct {
28+
Details string
29+
}
30+
31+
func (e *ErrConstraintViolation) Error() string {
32+
return fmt.Sprintf("constraint violation: %s", e.Details)
33+
}
34+
2635
func mapError(err error) error {
2736
if pgconn.Timeout(err) {
2837
return ErrConnTimeout
@@ -39,6 +48,12 @@ func mapError(err error) error {
3948
Details: pgErr.Message,
4049
}
4150
}
51+
// Class 23 — Integrity Constraint Violation
52+
if strings.HasPrefix(pgErr.Code, "23") {
53+
return &ErrConstraintViolation{
54+
Details: pgErr.Message,
55+
}
56+
}
4257
}
4358

4459
return err

pkg/wal/processor/postgres/postgres_batch_writer.go

+51-5
Original file line numberDiff line numberDiff line change
@@ -195,18 +195,64 @@ func (w *BatchWriter) flushQueries(ctx context.Context, queries []*query) error
195195
if len(queries) == 0 {
196196
return nil
197197
}
198+
199+
var err error
200+
for {
201+
queries, err = w.execQueries(ctx, queries)
202+
if err != nil {
203+
return err
204+
}
205+
206+
if len(queries) == 0 {
207+
return nil
208+
}
209+
}
210+
}
211+
212+
func (w *BatchWriter) execQueries(ctx context.Context, queries []*query) ([]*query, error) {
213+
retryQueries := []*query{}
198214
err := w.pgConn.ExecInTx(ctx, func(tx pglib.Tx) error {
199-
for _, q := range queries {
215+
for i, q := range queries {
200216
if _, err := tx.Exec(ctx, q.sql, q.args...); err != nil {
217+
w.logger.Error(err, "executing sql query", loglib.Fields{
218+
"sql": q.sql,
219+
"args": q.args,
220+
"severity": "DATALOSS",
221+
})
222+
// if a query returns an error, it will abort the tx. Log it as
223+
// dataloss and remove it from the list of queries to be
224+
// retried.
225+
retryQueries = removeIndex(queries, i)
201226
return err
202227
}
203228
}
204229
return nil
205230
})
206-
if err != nil {
207-
return err
231+
if err != nil && w.isInternalError(err) {
232+
// if there was an internal error in the tx, there's no point in
233+
// retrying, return error and stop processing.
234+
return nil, err
208235
}
209236

210-
queries = queries[:0]
211-
return nil
237+
// if there were no errors or no internal errors in the tx, return the
238+
// queries to retry (none if the tx was successful)
239+
return retryQueries, nil
240+
}
241+
242+
func (w *BatchWriter) isInternalError(err error) bool {
243+
var errRelationDoesNotExist *pglib.ErrRelationDoesNotExist
244+
var errConstraintViolation *pglib.ErrConstraintViolation
245+
switch {
246+
case errors.As(err, &errRelationDoesNotExist),
247+
errors.As(err, &errConstraintViolation):
248+
return false
249+
default:
250+
return true
251+
}
252+
}
253+
254+
func removeIndex(s []*query, index int) []*query {
255+
ret := make([]*query, 0)
256+
ret = append(ret, s[:index]...)
257+
return append(ret, s[index+1:]...)
212258
}

pkg/wal/processor/postgres/postgres_batch_writer_test.go

+107
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package postgres
55
import (
66
"context"
77
"errors"
8+
"fmt"
89
"testing"
910
"time"
1011

@@ -239,6 +240,112 @@ func TestBatchWriter_sendBatch(t *testing.T) {
239240
}
240241
}
241242

243+
func TestBatchWriter_flushQueries(t *testing.T) {
244+
t.Parallel()
245+
246+
testQuerySQL := "INSERT INTO test(id, name) VALUES($1, $2)"
247+
args1 := []any{1, "alice"}
248+
args2 := []any{1, "bob"}
249+
250+
testQuery := func(args []any) *query {
251+
return &query{
252+
sql: testQuerySQL,
253+
args: args,
254+
}
255+
}
256+
execCalls := uint(0)
257+
258+
tests := []struct {
259+
name string
260+
pgconn *pgmocks.Querier
261+
queries []*query
262+
263+
wantExecCalls uint
264+
wantErr error
265+
}{
266+
{
267+
name: "ok - no queries",
268+
pgconn: &pgmocks.Querier{
269+
ExecInTxFn: func(ctx context.Context, f func(tx pglib.Tx) error) error {
270+
return errors.New("ExecInTxFn: should not be called")
271+
},
272+
},
273+
queries: []*query{},
274+
275+
wantExecCalls: 0,
276+
wantErr: nil,
277+
},
278+
{
279+
name: "ok",
280+
pgconn: &pgmocks.Querier{
281+
ExecInTxFn: func(ctx context.Context, f func(tx pglib.Tx) error) error {
282+
mockTx := pgmocks.Tx{
283+
ExecFn: func(ctx context.Context, query string, args ...any) (pglib.CommandTag, error) {
284+
execCalls++
285+
require.Equal(t, testQuerySQL, query)
286+
require.Len(t, args, 2)
287+
switch args[1] {
288+
case args1[1]:
289+
return pglib.CommandTag{}, nil
290+
case args2[1]:
291+
return pglib.CommandTag{}, &pglib.ErrConstraintViolation{}
292+
default:
293+
return pglib.CommandTag{}, fmt.Errorf("unexpected argument in sql query: %v", args[1])
294+
}
295+
},
296+
}
297+
return f(&mockTx)
298+
},
299+
},
300+
queries: []*query{testQuery(args1), testQuery(args2)},
301+
302+
wantExecCalls: 3,
303+
wantErr: nil,
304+
},
305+
{
306+
name: "error - internal error in tx exec",
307+
pgconn: &pgmocks.Querier{
308+
ExecInTxFn: func(ctx context.Context, f func(tx pglib.Tx) error) error {
309+
mockTx := pgmocks.Tx{
310+
ExecFn: func(ctx context.Context, query string, args ...any) (pglib.CommandTag, error) {
311+
execCalls++
312+
require.Equal(t, testQuerySQL, query)
313+
require.Len(t, args, 2)
314+
switch args[1] {
315+
case args1[1]:
316+
return pglib.CommandTag{}, nil
317+
case args2[1]:
318+
return pglib.CommandTag{}, errTest
319+
default:
320+
return pglib.CommandTag{}, fmt.Errorf("unexpected argument in sql query: %v", args[1])
321+
}
322+
},
323+
}
324+
return f(&mockTx)
325+
},
326+
},
327+
queries: []*query{testQuery(args1), testQuery(args2)},
328+
329+
wantExecCalls: 2,
330+
wantErr: errTest,
331+
},
332+
}
333+
334+
for _, tc := range tests {
335+
t.Run(tc.name, func(t *testing.T) {
336+
bw := &BatchWriter{
337+
logger: loglib.NewNoopLogger(),
338+
pgConn: tc.pgconn,
339+
}
340+
341+
err := bw.flushQueries(context.Background(), tc.queries)
342+
require.ErrorIs(t, err, tc.wantErr)
343+
require.Equal(t, tc.wantExecCalls, execCalls)
344+
execCalls = 0
345+
})
346+
}
347+
}
348+
242349
func TestBatchWriter(t *testing.T) {
243350
t.Parallel()
244351

0 commit comments

Comments
 (0)