Skip to content

Commit

Permalink
output/clickhouse: create clickhouse context to make sure last batch …
Browse files Browse the repository at this point in the history
…will send
  • Loading branch information
parsa97 committed Jul 9, 2024
1 parent d50c4b6 commit 62244c8
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions internal/output/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,15 @@ needs to make sure that there is proper Database connection and table are presen
clickhouse folder for the file tables.sql
*/
func (chConfig clickhouseConfig) Output(ctx context.Context) {
g, gCtx := errgroup.WithContext(ctx)
clickhouseContext := context.Background()
g, gCtx := errgroup.WithContext(clickhouseContext)
for i := 0; i < int(chConfig.ClickhouseWorkers); i++ {
g.Go(func() error { return chConfig.clickhouseOutputWorker(gCtx) })
g.Go(func() error { return chConfig.clickhouseOutputWorker(gCtx, ctx) })
}
}

func (chConfig clickhouseConfig) clickhouseOutputWorker(ctx context.Context) error {
conn, batch := chConfig.connectClickhouseRetry(ctx)
func (chConfig clickhouseConfig) clickhouseOutputWorker(clickhouseContext context.Context, ctx context.Context) error {
conn, batch := chConfig.connectClickhouseRetry(clickhouseContext)
clickhouseSentToOutput := metrics.GetOrRegisterCounter("clickhouseSentToOutput", metrics.DefaultRegistry)
clickhouseSkipped := metrics.GetOrRegisterCounter("clickhouseSkipped", metrics.DefaultRegistry)
clickhouseFailed := metrics.GetOrRegisterCounter("clickhouseFailed", metrics.DefaultRegistry)
Expand Down Expand Up @@ -240,7 +241,7 @@ func (chConfig clickhouseConfig) clickhouseOutputWorker(ctx context.Context) err
clickhouseFailed.Inc(int64(c))
}
c = 0
batch, _ = conn.PrepareBatch(ctx, "INSERT INTO DNS_LOG")
batch, _ = conn.PrepareBatch(clickhouseContext, "INSERT INTO DNS_LOG")
}
}
case <-ticker.C:
Expand All @@ -250,13 +251,14 @@ func (chConfig clickhouseConfig) clickhouseOutputWorker(ctx context.Context) err
clickhouseFailed.Inc(int64(c))
}
c = 0
batch, _ = conn.PrepareBatch(ctx, "INSERT INTO DNS_LOG")
batch, _ = conn.PrepareBatch(clickhouseContext, "INSERT INTO DNS_LOG")
case <-ctx.Done():
err := batch.Flush()
err := batch.Send()
if err != nil {
log.Warnf("Errro while executing batch: %v", err)
clickhouseFailed.Inc(int64(c))
}
clickhouseContext.Done()
conn.Close()
log.Debug("exiting out of clickhouse output") //todo:remove
return nil
Expand Down

0 comments on commit 62244c8

Please sign in to comment.