5
5
"encoding/json"
6
6
"fmt"
7
7
"log/slog"
8
+ "strings"
8
9
"sync"
9
10
"time"
10
11
@@ -243,7 +244,7 @@ func (h *Hook[T]) processPayload(ctx context.Context, rawPayload json.RawMessage
243
244
244
245
func (h * Hook [T ]) createNotificationFunction (ctx context.Context ) error {
245
246
query := fmt .Sprintf (`
246
- CREATE OR REPLACE FUNCTION pghook.notify ()
247
+ CREATE OR REPLACE FUNCTION pghook.%[1]s_notify ()
247
248
RETURNS trigger AS $$
248
249
DECLARE
249
250
payload jsonb;
@@ -269,7 +270,7 @@ func (h *Hook[T]) createNotificationFunction(ctx context.Context) error {
269
270
);
270
271
END IF;
271
272
272
- PERFORM pgmq.send('%s', payload);
273
+ PERFORM pgmq.send('%[1] s', payload);
273
274
274
275
RETURN NULL;
275
276
END;
@@ -283,13 +284,16 @@ func (h *Hook[T]) createNotificationFunction(ctx context.Context) error {
283
284
}
284
285
285
286
func (h * Hook [T ]) createTableTrigger (ctx context.Context , table string ) error {
287
+ name := fmt .Sprintf ("pghook_%s" , strings .ReplaceAll (table , "." , "_" ))
288
+
286
289
query := fmt .Sprintf (`
287
- CREATE OR REPLACE TRIGGER pghook_%s AFTER INSERT OR UPDATE OR DELETE ON %s
288
- FOR EACH ROW EXECUTE PROCEDURE pghook.notify();
289
- ` , table , table )
290
+ CREATE OR REPLACE TRIGGER %s AFTER INSERT OR UPDATE OR DELETE ON %s
291
+ FOR EACH ROW EXECUTE PROCEDURE pghook.%s_notify();
292
+ ` , name , table , h .config .eventQueueName )
293
+
290
294
if _ , err := h .querier .Exec (ctx , query ); err != nil {
291
295
return fmt .Errorf ("create trigger failed: %w" , err )
292
296
}
293
- h .config .logger .Debug ("created trigger" , "name" , fmt . Sprintf ( "pghook_%s" , table ) , "table" , table )
297
+ h .config .logger .Debug ("created trigger" , "name" , name , "table" , table )
294
298
return nil
295
299
}
0 commit comments