Skip to content

Commit cbd2b5b

Browse files
authored
Merge pull request #179 from xataio/pg-writer-add-syntax-error
[PostgreSQL processor] Add handling of syntax error
2 parents 35a5e40 + 1237353 commit cbd2b5b

File tree

3 files changed

+18
-1
lines changed

3 files changed

+18
-1
lines changed

internal/postgres/errors.go

+13
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ func (e *ErrConstraintViolation) Error() string {
3232
return fmt.Sprintf("constraint violation: %s", e.Details)
3333
}
3434

35+
type ErrSyntaxError struct {
36+
Details string
37+
}
38+
39+
func (e *ErrSyntaxError) Error() string {
40+
return fmt.Sprintf("syntax error: %s", e.Details)
41+
}
42+
3543
func mapError(err error) error {
3644
if pgconn.Timeout(err) {
3745
return ErrConnTimeout
@@ -48,6 +56,11 @@ func mapError(err error) error {
4856
Details: pgErr.Message,
4957
}
5058
}
59+
if pgErr.Code == "42601" {
60+
return &ErrSyntaxError{
61+
Details: pgErr.Message,
62+
}
63+
}
5164
// Class 23 — Integrity Constraint Violation
5265
if strings.HasPrefix(pgErr.Code, "23") {
5366
return &ErrConstraintViolation{

pkg/stream/stream_run.go

+2
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, instrumentat
211211
return err
212212
}
213213
defer pgBatchWriter.Close()
214+
logger.Info("starting postgres batch writer...")
214215

215216
processor = pgBatchWriter
216217

@@ -222,6 +223,7 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, instrumentat
222223
logger.Info("adding transformation layer to processor...")
223224
transformer, err := transformer.New(config.Processor.Transformer, processor, transformer.WithLogger(logger))
224225
if err != nil {
226+
logger.Error(err, "creating transformer layer")
225227
return err
226228
}
227229
processor = transformer

pkg/wal/processor/postgres/postgres_batch_writer.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,11 @@ func (w *BatchWriter) execQueries(ctx context.Context, queries []*query) ([]*que
242242
func (w *BatchWriter) isInternalError(err error) bool {
243243
var errRelationDoesNotExist *pglib.ErrRelationDoesNotExist
244244
var errConstraintViolation *pglib.ErrConstraintViolation
245+
var errSyntaxError *pglib.ErrSyntaxError
245246
switch {
246247
case errors.As(err, &errRelationDoesNotExist),
247-
errors.As(err, &errConstraintViolation):
248+
errors.As(err, &errConstraintViolation),
249+
errors.As(err, &errSyntaxError):
248250
return false
249251
default:
250252
return true

0 commit comments

Comments
 (0)