Skip to content

Commit de536f8

Browse files
committed
refactor: improve ingester typing
This will be handy for checking which objects to update in the denormalized tables
1 parent 41f7df0 commit de536f8

File tree

3 files changed

+62
-58
lines changed

3 files changed

+62
-58
lines changed

backend/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,3 +180,6 @@ runtime/secrets/postgres_password_secret
180180
# This is where the sqlite cache will be stored by default if not running on Docker.
181181
volume_data/*.sqlite3
182182
volume_data/*.yaml
183+
184+
# Temporary submissions directory (for development/testing purposes)
185+
kernelCI_app/management/commands/tmp_submissions/*

backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 43 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents
1818

1919
from kernelCI_app.management.commands.helpers.process_submissions import (
20+
TableNames,
2021
build_instances_from_submission,
2122
)
2223

@@ -293,6 +294,38 @@ def prepare_file_data(filename, trees_name, spool_dir):
293294
}
294295

295296

297+
# TODO: MOVE THESE TYPES AND CONSTS
298+
type TableModels = Issues | Checkouts | Builds | Tests | Incidents
299+
300+
MODEL_MAP: dict[TableNames, TableModels] = {
301+
"issues": Issues,
302+
"checkouts": Checkouts,
303+
"builds": Builds,
304+
"tests": Tests,
305+
"incidents": Incidents,
306+
}
307+
308+
309+
def consume_buffer(buffer: list[TableModels], item_type: TableNames) -> None:
    """
    Bulk-insert a buffer of unsaved model instances into the database.

    This function is called by the db_worker thread. It is a no-op for an
    empty buffer. Rows that conflict with existing ones are silently skipped
    (``ignore_conflicts=True``), matching the previous per-table behavior.

    Args:
        buffer: Unsaved model instances, all belonging to the same table.
        item_type: Logical table name used to pick the model via MODEL_MAP.
    """
    if not buffer:
        return

    model = MODEL_MAP[item_type]

    t0 = time.time()
    model.objects.bulk_create(
        buffer,
        batch_size=INGEST_BATCH_SIZE,
        ignore_conflicts=True,
    )
    # Timing log helps spot slow flushes; f-string replaces dated %-formatting.
    _out(f"bulk_create {item_type}: n={len(buffer)} in {time.time() - t0:.3f}s")
326+
327+
328+
# TODO: lower the complexity of this function
296329
def db_worker(stop_event: threading.Event): # noqa: C901
297330
"""
298331
Worker thread that processes the database queue.
@@ -303,11 +336,11 @@ def db_worker(stop_event: threading.Event): # noqa: C901
303336
"""
304337

305338
# Local buffers for batching
306-
issues_buf = []
307-
checkouts_buf = []
308-
builds_buf = []
309-
tests_buf = []
310-
incidents_buf = []
339+
issues_buf: list[Issues] = []
340+
checkouts_buf: list[Checkouts] = []
341+
builds_buf: list[Builds] = []
342+
tests_buf: list[Tests] = []
343+
incidents_buf: list[Incidents] = []
311344

312345
last_flush_ts = time.time()
313346

@@ -331,55 +364,11 @@ def flush_buffers():
331364
try:
332365
# Single transaction for all tables in the flush
333366
with transaction.atomic():
334-
if issues_buf:
335-
t0 = time.time()
336-
Issues.objects.bulk_create(
337-
issues_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
338-
)
339-
_out(
340-
"bulk_create issues: n=%d in %.3fs"
341-
% (len(issues_buf), time.time() - t0)
342-
)
343-
if checkouts_buf:
344-
t0 = time.time()
345-
Checkouts.objects.bulk_create(
346-
checkouts_buf,
347-
batch_size=INGEST_BATCH_SIZE,
348-
ignore_conflicts=True,
349-
)
350-
_out(
351-
"bulk_create checkouts: n=%d in %.3fs"
352-
% (len(checkouts_buf), time.time() - t0)
353-
)
354-
if builds_buf:
355-
t0 = time.time()
356-
Builds.objects.bulk_create(
357-
builds_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
358-
)
359-
_out(
360-
"bulk_create builds: n=%d in %.3fs"
361-
% (len(builds_buf), time.time() - t0)
362-
)
363-
if tests_buf:
364-
t0 = time.time()
365-
Tests.objects.bulk_create(
366-
tests_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
367-
)
368-
_out(
369-
"bulk_create tests: n=%d in %.3fs"
370-
% (len(tests_buf), time.time() - t0)
371-
)
372-
if incidents_buf:
373-
t0 = time.time()
374-
Incidents.objects.bulk_create(
375-
incidents_buf,
376-
batch_size=INGEST_BATCH_SIZE,
377-
ignore_conflicts=True,
378-
)
379-
_out(
380-
"bulk_create incidents: n=%d in %.3fs"
381-
% (len(incidents_buf), time.time() - t0)
382-
)
367+
consume_buffer(issues_buf, "issues")
368+
consume_buffer(checkouts_buf, "checkouts")
369+
consume_buffer(builds_buf, "builds")
370+
consume_buffer(tests_buf, "tests")
371+
consume_buffer(incidents_buf, "incidents")
383372
except Exception as e:
384373
logger.error("Error during bulk_create flush: %s", e)
385374
finally:

backend/kernelCI_app/management/commands/helpers/process_submissions.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,26 @@
11
import logging
22
from django.utils import timezone
3-
from typing import Any, Literal
3+
from typing import Any, Literal, TypedDict
44

55
from django.db import IntegrityError
66
from pydantic import ValidationError
77

88
from kernelCI_app.models import Builds, Checkouts, Incidents, Issues, Tests
99

1010

11-
TableNames = Literal["issues", "checkouts", "builds", "tests", "incidents"]
11+
# TODO: move to a types file/folder
# Logical table names accepted by the ingestion pipeline (PEP 695 type alias).
type TableNames = Literal["issues", "checkouts", "builds", "tests", "incidents"]


class ProcessedSubmission(TypedDict):
    """Groups the unsaved model instances built from a single submission.

    Every key is always present: the lists can't be None but can be empty.
    """

    issues: list[Issues]
    checkouts: list[Checkouts]
    builds: list[Builds]
    tests: list[Tests]
    incidents: list[Incidents]
1224

1325

1426
logger = logging.getLogger(__name__)
@@ -128,12 +140,12 @@ def make_incident_instance(incident) -> Incidents:
128140
return obj
129141

130142

131-
def build_instances_from_submission(data: dict[str, Any]) -> dict[TableNames, list]:
143+
def build_instances_from_submission(data: dict[str, Any]) -> ProcessedSubmission:
132144
"""
133145
Convert raw submission dicts into unsaved Django model instances, grouped by type.
134146
Per-item errors are logged and the item is skipped, matching the previous behavior.
135147
"""
136-
out: dict[TableNames, list] = {
148+
out: ProcessedSubmission = {
137149
"issues": [],
138150
"checkouts": [],
139151
"builds": [],

0 commit comments

Comments
 (0)