Skip to content

Commit d3f0977

Browse files
authored
Merge pull request #154 from thirdweb-dev/02-18-fix_duplicate_inserts
fix duplicate inserts
2 parents a2aede6 + 216eecd commit d3f0977

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

internal/orchestrator/committer.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,15 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
4747

4848
func (c *Committer) Start(ctx context.Context) {
4949
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
50-
ticker := time.NewTicker(interval)
51-
defer ticker.Stop()
5250

5351
log.Debug().Msgf("Committer running")
5452
for {
5553
select {
5654
case <-ctx.Done():
5755
log.Info().Msg("Committer shutting down")
5856
return
59-
case <-ticker.C:
57+
default:
58+
time.Sleep(interval)
6059
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
6160
if err != nil {
6261
log.Error().Err(err).Msg("Error getting block data to commit")

internal/orchestrator/committer_test.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,11 @@ func TestCommit(t *testing.T) {
130130
mockRPC := mocks.NewMockIRPCClient(t)
131131
mockMainStorage := mocks.NewMockIMainStorage(t)
132132
mockStagingStorage := mocks.NewMockIStagingStorage(t)
133+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
133134
mockStorage := storage.IStorage{
134-
MainStorage: mockMainStorage,
135-
StagingStorage: mockStagingStorage,
135+
MainStorage: mockMainStorage,
136+
StagingStorage: mockStagingStorage,
137+
OrchestratorStorage: mockOrchestratorStorage,
136138
}
137139
committer := NewCommitter(mockRPC, mockStorage)
138140

@@ -186,10 +188,12 @@ func TestStartCommitter(t *testing.T) {
186188
mockRPC := mocks.NewMockIRPCClient(t)
187189
mockMainStorage := mocks.NewMockIMainStorage(t)
188190
mockStagingStorage := mocks.NewMockIStagingStorage(t)
191+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
189192

190193
mockStorage := storage.IStorage{
191-
MainStorage: mockMainStorage,
192-
StagingStorage: mockStagingStorage,
194+
MainStorage: mockMainStorage,
195+
StagingStorage: mockStagingStorage,
196+
OrchestratorStorage: mockOrchestratorStorage,
193197
}
194198

195199
committer := NewCommitter(mockRPC, mockStorage)
@@ -218,9 +222,11 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {
218222
mockRPC := mocks.NewMockIRPCClient(t)
219223
mockMainStorage := mocks.NewMockIMainStorage(t)
220224
mockStagingStorage := mocks.NewMockIStagingStorage(t)
225+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
221226
mockStorage := storage.IStorage{
222-
MainStorage: mockMainStorage,
223-
StagingStorage: mockStagingStorage,
227+
MainStorage: mockMainStorage,
228+
StagingStorage: mockStagingStorage,
229+
OrchestratorStorage: mockOrchestratorStorage,
224230
}
225231

226232
committer := NewCommitter(mockRPC, mockStorage)

internal/storage/clickhouse.go

+3
Original file line numberDiff line numberDiff line change
@@ -1084,6 +1084,9 @@ func (c *ClickHouseConnector) deleteTraces(chainId *big.Int, blockNumbers []*big
10841084

10851085
// TODO make this atomic
10861086
func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
1087+
if len(*data) == 0 {
1088+
return nil
1089+
}
10871090
blocks := make([]common.Block, 0, len(*data))
10881091
logs := make([]common.Log, 0)
10891092
transactions := make([]common.Transaction, 0)

0 commit comments

Comments
 (0)