Skip to content

Commit 40ba06b

Browse files
committed
refactor: lower complexity of db_worker
Even with this reduction, it's still hard to lower the complexity even further without creating idempotent functions Closes #1571
1 parent df24b3c commit 40ba06b

File tree

1 file changed

+82
-45
lines changed

1 file changed

+82
-45
lines changed

backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 82 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,64 @@ def consume_buffer(buffer: list[TableModels], item_type: TableNames) -> None:
130130
out("bulk_create %s: n=%d in %.3fs" % (item_type, len(buffer), time.time() - t0))
131131

132132

133+
def flush_buffers(
134+
*,
135+
issues_buf: list[Issues],
136+
checkouts_buf: list[Checkouts],
137+
builds_buf: list[Builds],
138+
tests_buf: list[Tests],
139+
incidents_buf: list[Incidents],
140+
) -> None:
141+
"""
142+
Consumes the list of objects and tries to insert them into the database.
143+
"""
144+
total = (
145+
len(issues_buf)
146+
+ len(checkouts_buf)
147+
+ len(builds_buf)
148+
+ len(tests_buf)
149+
+ len(incidents_buf)
150+
)
151+
152+
if total == 0:
153+
return
154+
155+
# Insert in dependency-safe order
156+
flush_start = time.time()
157+
try:
158+
# Single transaction for all tables in the flush
159+
with transaction.atomic():
160+
consume_buffer(issues_buf, "issues")
161+
consume_buffer(checkouts_buf, "checkouts")
162+
consume_buffer(builds_buf, "builds")
163+
consume_buffer(tests_buf, "tests")
164+
consume_buffer(incidents_buf, "incidents")
165+
except Exception as e:
166+
logger.error("Error during bulk_create flush: %s", e)
167+
finally:
168+
flush_dur = time.time() - flush_start
169+
rate = total / flush_dur if flush_dur > 0 else 0.0
170+
msg = (
171+
"Flushed batch in %.3fs (%.1f items/s): "
172+
"issues=%d checkouts=%d builds=%d tests=%d incidents=%d"
173+
% (
174+
flush_dur,
175+
rate,
176+
len(issues_buf),
177+
len(checkouts_buf),
178+
len(builds_buf),
179+
len(tests_buf),
180+
len(incidents_buf),
181+
)
182+
)
183+
out(msg)
184+
issues_buf.clear()
185+
checkouts_buf.clear()
186+
builds_buf.clear()
187+
tests_buf.clear()
188+
incidents_buf.clear()
189+
190+
133191
# TODO: lower the complexity of this function
134192
def db_worker(stop_event: threading.Event) -> None: # noqa: C901
135193
"""
@@ -158,48 +216,6 @@ def buffered_total() -> int:
158216
+ len(incidents_buf)
159217
)
160218

161-
def flush_buffers() -> None:
162-
nonlocal last_flush_ts
163-
total = buffered_total()
164-
if total == 0:
165-
return
166-
167-
# Insert in dependency-safe order
168-
flush_start = time.time()
169-
try:
170-
# Single transaction for all tables in the flush
171-
with transaction.atomic():
172-
consume_buffer(issues_buf, "issues")
173-
consume_buffer(checkouts_buf, "checkouts")
174-
consume_buffer(builds_buf, "builds")
175-
consume_buffer(tests_buf, "tests")
176-
consume_buffer(incidents_buf, "incidents")
177-
except Exception as e:
178-
logger.error("Error during bulk_create flush: %s", e)
179-
finally:
180-
flush_dur = time.time() - flush_start
181-
rate = total / flush_dur if flush_dur > 0 else 0.0
182-
msg = (
183-
"Flushed batch in %.3fs (%.1f items/s): "
184-
"issues=%d checkouts=%d builds=%d tests=%d incidents=%d"
185-
% (
186-
flush_dur,
187-
rate,
188-
len(issues_buf),
189-
len(checkouts_buf),
190-
len(builds_buf),
191-
len(tests_buf),
192-
len(incidents_buf),
193-
)
194-
)
195-
out(msg)
196-
issues_buf.clear()
197-
checkouts_buf.clear()
198-
builds_buf.clear()
199-
tests_buf.clear()
200-
incidents_buf.clear()
201-
last_flush_ts = time.time()
202-
203219
while not stop_event.is_set() or not db_queue.empty():
204220
try:
205221
item = db_queue.get(timeout=0.1)
@@ -216,7 +232,14 @@ def flush_buffers() -> None:
216232
incidents_buf.extend(inst["incidents"])
217233

218234
if buffered_total() >= INGEST_BATCH_SIZE:
219-
flush_buffers()
235+
flush_buffers(
236+
issues_buf=issues_buf,
237+
checkouts_buf=checkouts_buf,
238+
builds_buf=builds_buf,
239+
tests_buf=tests_buf,
240+
incidents_buf=incidents_buf,
241+
)
242+
last_flush_ts = time.time()
220243

221244
if VERBOSE:
222245
msg = (
@@ -248,13 +271,27 @@ def flush_buffers() -> None:
248271
buffered_total(),
249272
)
250273
)
251-
flush_buffers()
274+
flush_buffers(
275+
issues_buf=issues_buf,
276+
checkouts_buf=checkouts_buf,
277+
builds_buf=builds_buf,
278+
tests_buf=tests_buf,
279+
incidents_buf=incidents_buf,
280+
)
281+
last_flush_ts = time.time()
252282
continue
253283
except Exception as e:
254284
logger.error("Unexpected error in db_worker: %s", e)
255285

256286
# Final flush after loop ends
257-
flush_buffers()
287+
flush_buffers(
288+
issues_buf=issues_buf,
289+
checkouts_buf=checkouts_buf,
290+
builds_buf=builds_buf,
291+
tests_buf=tests_buf,
292+
incidents_buf=incidents_buf,
293+
)
294+
last_flush_ts = time.time()
258295

259296

260297
def process_file(

0 commit comments

Comments
 (0)