diff --git a/backend/kernelCI_app/constants/ingester.py b/backend/kernelCI_app/constants/ingester.py
new file mode 100644
index 000000000..bcb8d7c7e
--- /dev/null
+++ b/backend/kernelCI_app/constants/ingester.py
@@ -0,0 +1,50 @@
+"""Constant settings for the ingester functions"""
+
+import os
+import logging
+from utils.validation import is_boolean_or_string_true
+
+logger = logging.getLogger("ingester")
+
+
+VERBOSE: bool = is_boolean_or_string_true(os.environ.get("VERBOSE", False))
+"""Variable used to toggle info logs. Default: False"""
+
+CONVERT_LOG_EXCERPT = is_boolean_or_string_true(
+    os.environ.get("CONVERT_LOG_EXCERPT", False)
+)
+"""Toggle to convert the log_excerpt to output_files url. Default: False"""
+
+LOGEXCERPT_THRESHOLD = int(os.environ.get("LOGEXCERPT_THRESHOLD", 256))
+"""Bytes threshold for log_excerpt to be converted to url. Default: 256"""
+
+STORAGE_TOKEN = os.environ.get("STORAGE_TOKEN", None)
+STORAGE_BASE_URL = os.environ.get(
+    "STORAGE_BASE_URL", "https://files-staging.kernelci.org"
+)
+UPLOAD_URL = f"{STORAGE_BASE_URL}/upload"
+
+CACHE_LOGS_SIZE_LIMIT = int(os.environ.get("CACHE_LOGS_SIZE_LIMIT", 100000))
+"""Arbitrary limit for cache_logs size, adjust as needed. Default: 100000"""
+
+TREES_FILE = "/app/trees.yaml"
+
+
+# Batching and backpressure controls
+try:
+    INGEST_BATCH_SIZE = int(os.environ.get("INGEST_BATCH_SIZE", "10000"))
+except (ValueError, TypeError):
+    logger.warning("Invalid INGEST_BATCH_SIZE, using default 10000")
+    INGEST_BATCH_SIZE = 10000
+
+try:
+    INGEST_FLUSH_TIMEOUT_SEC = float(os.environ.get("INGEST_FLUSH_TIMEOUT_SEC", "2.0"))
+except (ValueError, TypeError):
+    logger.warning("Invalid INGEST_FLUSH_TIMEOUT_SEC, using default 2.0")
+    INGEST_FLUSH_TIMEOUT_SEC = 2.0
+
+try:
+    INGEST_QUEUE_MAXSIZE = int(os.environ.get("INGEST_QUEUE_MAXSIZE", "5000"))
+except (ValueError, TypeError):
+    logger.warning("Invalid INGEST_QUEUE_MAXSIZE, using default 5000")
+    INGEST_QUEUE_MAXSIZE = 5000
diff --git a/backend/kernelCI_app/helpers/logger.py b/backend/kernelCI_app/helpers/logger.py
index 9b55809e2..2ed8f4c9c 100644
--- a/backend/kernelCI_app/helpers/logger.py
+++ b/backend/kernelCI_app/helpers/logger.py
@@ -1,3 +1,4 @@
+import time
 from django.http import HttpRequest

 from kernelCI_app.constants.general import PRODUCTION_HOST, STAGING_HOST
@@ -17,6 +18,17 @@ def log_message(message: str) -> None:
         print(message)


+def out(msg: str) -> None:
+    """Write debug/perf output to stdout.
+    Logger was unreliable in some environments."""
+    try:
+        print(
+            f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] {msg}",
+            flush=True,
+        )
+    except Exception:
+        pass
+
+
 def create_endpoint_notification(*, message: str, request: HttpRequest) -> str:
     running_instance = get_running_instance()
     request_path = request.get_full_path()
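Note for reviewers: the constants module feeds every env var through `is_boolean_or_string_true`, which lives in `utils/validation.py` and is not part of this patch. A minimal sketch of how it is assumed to behave (the real implementation may differ):

```python
def is_boolean_or_string_true(value) -> bool:
    """Hypothetical sketch: treat real booleans and common truthy strings as True."""
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        # e.g. VERBOSE=true or VERBOSE=1 in the environment
        return value.strip().lower() in {"true", "1", "yes"}
    return False
```

With this behavior, `os.environ.get("VERBOSE", False)` yields False when the variable is unset and True for `VERBOSE=true`, matching the documented defaults.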
diff --git a/backend/kernelCI_app/management/commands/helpers/file_utils.py b/backend/kernelCI_app/management/commands/helpers/file_utils.py
new file mode 100644
index 000000000..5fca4f3df
--- /dev/null
+++ b/backend/kernelCI_app/management/commands/helpers/file_utils.py
@@ -0,0 +1,50 @@
+import os
+import yaml
+import logging
+from kernelCI_app.constants.ingester import TREES_FILE
+
+
+logger = logging.getLogger("ingester")
+
+
+def load_tree_names(trees_file: str = TREES_FILE) -> dict[str, str]:
+    """Reads data from the trees_file, which maps each git_repository_url to its tree_name"""
+    with open(trees_file, "r", encoding="utf-8") as f:
+        data = yaml.safe_load(f)
+
+    tree_names = {v["url"]: tree_name for tree_name, v in data.get("trees", {}).items()}
+
+    return tree_names
+
+
+def move_file_to_failed_dir(filename: str, failed_dir: str) -> None:
+    try:
+        os.rename(filename, os.path.join(failed_dir, os.path.basename(filename)))
+    except Exception as e:
+        logger.error("Error moving file %s to failed directory: %s", filename, e)
+        raise e
+
+
+def verify_dir(dir: str) -> None:
+    if not os.path.exists(dir):
+        logger.error("Directory %s does not exist", dir)
+        # try to create it
+        try:
+            os.makedirs(dir)
+            logger.info("Directory %s created", dir)
+        except Exception as e:
+            logger.error("Error creating directory %s: %s", dir, e)
+            raise e
+    if not os.path.isdir(dir):
+        raise Exception(f"Directory {dir} is not a directory")
+    if not os.access(dir, os.W_OK):
+        raise Exception(f"Directory {dir} is not writable")
+    logger.info("Directory %s is valid and writable", dir)
+
+
+def verify_spool_dirs(spool_dir: str) -> None:
+    failed_dir = os.path.join(spool_dir, "failed")
+    archive_dir = os.path.join(spool_dir, "archive")
+    verify_dir(spool_dir)
+    verify_dir(failed_dir)
+    verify_dir(archive_dir)
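A quick usage sketch of the new file_utils helpers. The paths and the trees.yaml contents below are illustrative, not part of the patch:

```python
from kernelCI_app.management.commands.helpers.file_utils import (
    load_tree_names,
    verify_spool_dirs,
)

# Hypothetical trees.yaml:
# trees:
#   mainline:
#     url: "https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git"

verify_spool_dirs("/spool")  # creates /spool, /spool/failed and /spool/archive if missing
tree_names = load_tree_names("/app/trees.yaml")
# tree_names maps each repository URL to its canonical tree name, e.g.
# {"https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git": "mainline"}
```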
diff --git a/backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py b/backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py
index 4526968a9..be86deeb1 100644
--- a/backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py
+++ b/backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py
@@ -1,101 +1,51 @@
 from os import DirEntry
 from concurrent.futures import ThreadPoolExecutor, as_completed
-import gzip
-import hashlib
 import json
 import logging
 import os
 from queue import Queue, Empty
-import requests
-import tempfile
+from kernelCI_app.constants.ingester import (
+    CONVERT_LOG_EXCERPT,
+    INGEST_BATCH_SIZE,
+    INGEST_FLUSH_TIMEOUT_SEC,
+    INGEST_QUEUE_MAXSIZE,
+    VERBOSE,
+)
 import threading
 import time
 import traceback
-from typing import Any, Literal, Optional
-import yaml
+from typing import Any, Optional, TypedDict
+from kernelCI_app.helpers.logger import out
+from kernelCI_app.management.commands.helpers.file_utils import move_file_to_failed_dir
+from kernelCI_app.management.commands.helpers.log_excerpt_utils import (
+    extract_log_excerpt,
+)
 import kcidb_io
 from django.db import transaction
 from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents
 from kernelCI_app.management.commands.helpers.process_submissions import (
+    TableNames,
     build_instances_from_submission,
 )
+from kernelCI_app.typeModels.modelTypes import MODEL_MAP, TableModels

-VERBOSE = 0
-LOGEXCERPT_THRESHOLD = 256  # 256 bytes threshold for logexcerpt
-CONVERT_LOG_EXCERPT = False  # If True, convert log_excerpt to output_files url
-CACHE_LOGS = {}
-CACHE_LOGS_SIZE_LIMIT = 100000  # Arbitrary limit for cache_logs size, adjust as needed
-cache_logs_lock = threading.Lock()

-STORAGE_TOKEN = os.environ.get("STORAGE_TOKEN", None)
-STORAGE_BASE_URL = os.environ.get(
-    "STORAGE_BASE_URL", "https://files-staging.kernelci.org"
-)
+class SubmissionMetadata(TypedDict):
+    filename: str
+    full_filename: str
+    fsize: int | None
+    processing_time: float | None
+    error: str | None

-TREES_FILE = "/app/trees.yaml"

 logger = logging.getLogger("ingester")

-# Batching and backpressure controls
-try:
-    INGEST_BATCH_SIZE = int(os.environ.get("INGEST_BATCH_SIZE", "10000"))
-except (ValueError, TypeError):
-    logger.warning("Invalid INGEST_BATCH_SIZE, using default 10000")
-    INGEST_BATCH_SIZE = 10000
-
-try:
-    INGEST_FLUSH_TIMEOUT_SEC = float(os.environ.get("INGEST_FLUSH_TIMEOUT_SEC", "2.0"))
-except (ValueError, TypeError):
-    logger.warning("Invalid INGEST_FLUSH_TIMEOUT_SEC, using default 2.0")
-    INGEST_FLUSH_TIMEOUT_SEC = 2.0
-
-try:
-    INGEST_QUEUE_MAXSIZE = int(os.environ.get("INGEST_QUEUE_MAXSIZE", "5000"))
-except (ValueError, TypeError):
-    logger.warning("Invalid INGEST_QUEUE_MAXSIZE, using default 5000")
-    INGEST_QUEUE_MAXSIZE = 5000
-
 # Thread-safe queue for database operations (bounded for backpressure)
 db_queue = Queue(maxsize=INGEST_QUEUE_MAXSIZE)
 db_lock = threading.Lock()


-def _ts() -> str:
-    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-
-
-def _out(msg: str) -> None:
-    """Write debug/perf output to stdout"""
-    # logger was unreliable in some environments
-    try:
-        print(f"[{_ts()}] {msg}", flush=True)
-    except Exception:
-        pass
-
-
-def move_file_to_failed_dir(filename: str, failed_dir: str) -> None:
-    try:
-        os.rename(filename, os.path.join(failed_dir, os.path.basename(filename)))
-    except Exception as e:
-        logger.error("Error moving file %s to failed directory: %s", filename, e)
-        raise e
-
-
-def load_tree_names(trees_file_override: Optional[str]) -> dict[str, str]:
-    global TREES_FILE
-    if trees_file_override is not None:
-        TREES_FILE = trees_file_override
-
-    with open(TREES_FILE, "r", encoding="utf-8") as f:
-        data = yaml.safe_load(f)
-
-    tree_names = {v["url"]: tree_name for tree_name, v in data.get("trees", {}).items()}
-
-    return tree_names
-
-
 def standardize_tree_names(
     input_data: dict[str, Any], tree_names: dict[str, str]
 ) -> None:
@@ -113,140 +63,15 @@ def standardize_tree_names(
             checkout["tree_name"] = correct_tree


-def upload_logexcerpt(logexcerpt: str, id: str) -> str:
-    """
-    Upload logexcerpt to storage and return a reference(URL)
-    """
-    upload_url = f"{STORAGE_BASE_URL}/upload"
-    if VERBOSE:
-        logger.info("Uploading logexcerpt for %s to %s", id, upload_url)
-    # make temporary file with logexcerpt data
-    with tempfile.NamedTemporaryFile(delete=False, suffix=".logexcerpt") as temp_file:
-        logexcerpt_filename = temp_file.name
-        logexcerpt_compressed = gzip.compress(logexcerpt.encode("utf-8"))
-        temp_file.write(logexcerpt_compressed)
-        temp_file.flush()
-    with open(logexcerpt_filename, "rb") as f:
-        hdr = {
-            "Authorization": f"Bearer {STORAGE_TOKEN}",
-        }
-        files = {"file0": ("logexcerpt.txt.gz", f), "path": f"logexcerpt/{id}"}
-        try:
-            r = requests.post(upload_url, headers=hdr, files=files)
-        except Exception as e:
-            logger.error("Error uploading logexcerpt for %s: %s", id, e)
-            os.remove(logexcerpt_filename)
-            return logexcerpt  # Return original logexcerpt if upload fails
-    os.remove(logexcerpt_filename)
-    if r.status_code != 200:
-        logger.error(
-            "Failed to upload logexcerpt for %s: %d : %s", id, r.status_code, r.text
-        )
-        return logexcerpt  # Return original logexcerpt if upload fails
-
-    return f"{STORAGE_BASE_URL}/logexcerpt/{id}/logexcerpt.txt.gz"
-
-
-def get_from_cache(log_hash: str) -> Optional[str]:
-    """
-    Check if log_hash is in the cache
-    """
-    with cache_logs_lock:
-        return CACHE_LOGS.get(log_hash)
-
-
-def set_in_cache(log_hash: str, url: str) -> None:
-    """
-    Set log_hash in the cache with the given URL
-    """
-    with cache_logs_lock:
-        CACHE_LOGS[log_hash] = url
-    if VERBOSE:
-        logger.info("Cached log excerpt with hash %s at %s", log_hash, url)
-
-
-def set_log_excerpt_ofile(item: dict[str, Any], url: str) -> dict[str, Any]:
-    """
-    Clean log_excerpt field
-    Create name/url dict and append to output_files of job
-
-    item is a build or test
-    """
-    item["log_excerpt"] = None
-    data = {
-        "name": "log_excerpt",
-        "url": url,
-    }
-    if "output_files" not in item:
-        item["output_files"] = []
-
-    item["output_files"].append(data)
-    return item
-
-
-def process_log_excerpt_from_item(
-    item: dict[str, Any], item_type: Literal["build", "test"]
-) -> None:
-    """
-    Process log_excerpt from a single build or test (item).
-    If log_excerpt is large, upload it to storage and replace with a reference.
-    """
-    id = item.get("id", "unknown")
-    log_excerpt = item["log_excerpt"]
-
-    if isinstance(log_excerpt, str) and len(log_excerpt) > LOGEXCERPT_THRESHOLD:
-        log_hash = hashlib.sha256(log_excerpt.encode("utf-8")).hexdigest()
-        if VERBOSE:
-            logger.info(
-                "Uploading log_excerpt for %s id %s hash %s with size %d bytes",
-                item_type,
-                id,
-                log_hash,
-                len(log_excerpt),
-            )
-        # check if log_excerpt already uploaded (by hash as key)
-        cached_url = get_from_cache(log_hash)
-        if cached_url:
-            if VERBOSE:
-                logger.info(
-                    "Log excerpt for %s %s already uploaded, using cached URL",
-                    item_type,
-                    id,
-                )
-            set_log_excerpt_ofile(item, cached_url)
-        else:
-            cached_url = upload_logexcerpt(log_excerpt, log_hash)
-            set_in_cache(log_hash, cached_url)
-            set_log_excerpt_ofile(item, cached_url)
-
-
-def extract_log_excerpt(input_data: dict[str, Any]) -> None:
-    """
-    Extract log_excerpt from builds and tests, if it is large,
-    upload to storage and replace with a reference.
-    """
-    if not STORAGE_TOKEN:
-        logger.warning("STORAGE_TOKEN is not set, log_excerpts will not be uploaded")
-        return
-
-    builds: list[dict[str, Any]] = input_data.get("builds", [])
-    tests: list[dict[str, Any]] = input_data.get("tests", [])
-
-    for build in builds:
-        if build.get("log_excerpt"):
-            process_log_excerpt_from_item(item=build, item_type="build")
-
-    for test in tests:
-        if test.get("log_excerpt"):
-            process_log_excerpt_from_item(item=test, item_type="test")
-
-
 def prepare_file_data(
     file: DirEntry[str], tree_names: dict[str, str]
 ) -> tuple[Optional[dict[str, Any]], dict[str, Any]]:
     """
     Prepare file data: read, extract log excerpts, standardize tree names, validate.
     This function does everything except the actual database load.
+
+    Returns `data, metadata`.
+    If an error happens, `data` will be None; if the file is empty, both will be None.
     """
     fsize = file.stat().st_size

@@ -286,6 +111,84 @@ def prepare_file_data(
     }


+def consume_buffer(buffer: list[TableModels], item_type: TableNames) -> None:
+    """
+    Consume a buffer of items and insert them into the database.
+    This function is called by the db_worker thread.
+    """
+    if not buffer:
+        return
+
+    model = MODEL_MAP[item_type]
+
+    t0 = time.time()
+    model.objects.bulk_create(
+        buffer,
+        batch_size=INGEST_BATCH_SIZE,
+        ignore_conflicts=True,
+    )
+    out("bulk_create %s: n=%d in %.3fs" % (item_type, len(buffer), time.time() - t0))
+
+
+def flush_buffers(
+    *,
+    issues_buf: list[Issues],
+    checkouts_buf: list[Checkouts],
+    builds_buf: list[Builds],
+    tests_buf: list[Tests],
+    incidents_buf: list[Incidents],
+) -> None:
+    """
+    Consumes the lists of objects and tries to insert them into the database.
+    """
+    total = (
+        len(issues_buf)
+        + len(checkouts_buf)
+        + len(builds_buf)
+        + len(tests_buf)
+        + len(incidents_buf)
+    )
+
+    if total == 0:
+        return
+
+    # Insert in dependency-safe order
+    flush_start = time.time()
+    try:
+        # Single transaction for all tables in the flush
+        with transaction.atomic():
+            consume_buffer(issues_buf, "issues")
+            consume_buffer(checkouts_buf, "checkouts")
+            consume_buffer(builds_buf, "builds")
+            consume_buffer(tests_buf, "tests")
+            consume_buffer(incidents_buf, "incidents")
+    except Exception as e:
+        logger.error("Error during bulk_create flush: %s", e)
+    finally:
+        flush_dur = time.time() - flush_start
+        rate = total / flush_dur if flush_dur > 0 else 0.0
+        msg = (
+            "Flushed batch in %.3fs (%.1f items/s): "
+            "issues=%d checkouts=%d builds=%d tests=%d incidents=%d"
+            % (
+                flush_dur,
+                rate,
+                len(issues_buf),
+                len(checkouts_buf),
+                len(builds_buf),
+                len(tests_buf),
+                len(incidents_buf),
+            )
+        )
+        out(msg)
+        issues_buf.clear()
+        checkouts_buf.clear()
+        builds_buf.clear()
+        tests_buf.clear()
+        incidents_buf.clear()
+
+
+# TODO: lower the complexity of this function
 def db_worker(stop_event: threading.Event) -> None:  # noqa: C901
     """
     Worker thread that processes the database queue.
@@ -296,15 +199,15 @@ def db_worker(stop_event: threading.Event) -> None:  # noqa: C901
     """

     # Local buffers for batching
-    issues_buf = []
-    checkouts_buf = []
-    builds_buf = []
-    tests_buf = []
-    incidents_buf = []
+    issues_buf: list[Issues] = []
+    checkouts_buf: list[Checkouts] = []
+    builds_buf: list[Builds] = []
+    tests_buf: list[Tests] = []
+    incidents_buf: list[Incidents] = []

     last_flush_ts = time.time()

-    def buffered_total():
+    def buffered_total() -> int:
         return (
             len(issues_buf)
             + len(checkouts_buf)
@@ -313,92 +216,6 @@ def buffered_total():
             + len(incidents_buf)
         )

-    def flush_buffers():
-        nonlocal last_flush_ts
-        total = buffered_total()
-        if total == 0:
-            return
-
-        # Insert in dependency-safe order
-        flush_start = time.time()
-        try:
-            # Single transaction for all tables in the flush
-            with transaction.atomic():
-                if issues_buf:
-                    t0 = time.time()
-                    Issues.objects.bulk_create(
-                        issues_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
-                    )
-                    _out(
-                        "bulk_create issues: n=%d in %.3fs"
-                        % (len(issues_buf), time.time() - t0)
-                    )
-                if checkouts_buf:
-                    t0 = time.time()
-                    Checkouts.objects.bulk_create(
-                        checkouts_buf,
-                        batch_size=INGEST_BATCH_SIZE,
-                        ignore_conflicts=True,
-                    )
-                    _out(
-                        "bulk_create checkouts: n=%d in %.3fs"
-                        % (len(checkouts_buf), time.time() - t0)
-                    )
-                if builds_buf:
-                    t0 = time.time()
-                    Builds.objects.bulk_create(
-                        builds_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
-                    )
-                    _out(
-                        "bulk_create builds: n=%d in %.3fs"
-                        % (len(builds_buf), time.time() - t0)
-                    )
-                if tests_buf:
-                    t0 = time.time()
-                    Tests.objects.bulk_create(
-                        tests_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
-                    )
-                    _out(
-                        "bulk_create tests: n=%d in %.3fs"
-                        % (len(tests_buf), time.time() - t0)
-                    )
-                if incidents_buf:
-                    t0 = time.time()
-                    Incidents.objects.bulk_create(
-                        incidents_buf,
-                        batch_size=INGEST_BATCH_SIZE,
-                        ignore_conflicts=True,
-                    )
-                    _out(
-                        "bulk_create incidents: n=%d in %.3fs"
-                        % (len(incidents_buf), time.time() - t0)
-                    )
-        except Exception as e:
-            logger.error("Error during bulk_create flush: %s", e)
-        finally:
-            flush_dur = time.time() - flush_start
-            rate = total / flush_dur if flush_dur > 0 else 0.0
-            msg = (
-                "Flushed batch in %.3fs (%.1f items/s): "
-                "issues=%d checkouts=%d builds=%d tests=%d incidents=%d"
-                % (
-                    flush_dur,
-                    rate,
-                    len(issues_buf),
-                    len(checkouts_buf),
-                    len(builds_buf),
-                    len(tests_buf),
-                    len(incidents_buf),
-                )
-            )
-            _out(msg)
-            issues_buf.clear()
-            checkouts_buf.clear()
-            builds_buf.clear()
-            tests_buf.clear()
-            incidents_buf.clear()
-            last_flush_ts = time.time()
-
     while not stop_event.is_set() or not db_queue.empty():
         try:
             item = db_queue.get(timeout=0.1)
@@ -415,7 +232,14 @@ def flush_buffers():
                     incidents_buf.extend(inst["incidents"])

                 if buffered_total() >= INGEST_BATCH_SIZE:
-                    flush_buffers()
+                    flush_buffers(
+                        issues_buf=issues_buf,
+                        checkouts_buf=checkouts_buf,
+                        builds_buf=builds_buf,
+                        tests_buf=tests_buf,
+                        incidents_buf=incidents_buf,
+                    )
+                    last_flush_ts = time.time()

                 if VERBOSE:
                     msg = (
@@ -430,7 +254,7 @@ def flush_buffers():
                             len(inst["incidents"]),
                         )
                     )
-                    _out(msg)
+                    out(msg)
             except Exception as e:
                 logger.error("Error processing item in db_worker: %s", e)
             finally:
@@ -440,20 +264,34 @@ def flush_buffers():
             # Time-based flush when idle
             if (time.time() - last_flush_ts) >= INGEST_FLUSH_TIMEOUT_SEC:
                 if VERBOSE:
-                    _out(
+                    out(
                         "Idle flush after %.1fs without new items (buffered=%d)"
                         % (
                             INGEST_FLUSH_TIMEOUT_SEC,
                             buffered_total(),
                         )
                     )
-                flush_buffers()
+                flush_buffers(
+                    issues_buf=issues_buf,
+                    checkouts_buf=checkouts_buf,
+                    builds_buf=builds_buf,
+                    tests_buf=tests_buf,
+                    incidents_buf=incidents_buf,
+                )
+                last_flush_ts = time.time()
             continue
         except Exception as e:
             logger.error("Unexpected error in db_worker: %s", e)

     # Final flush after loop ends
-    flush_buffers()
+    flush_buffers(
+        issues_buf=issues_buf,
+        checkouts_buf=checkouts_buf,
+        builds_buf=builds_buf,
+        tests_buf=tests_buf,
+        incidents_buf=incidents_buf,
+    )
+    last_flush_ts = time.time()


 def process_file(
@@ -464,6 +302,9 @@ def process_file(
 ) -> bool:
     """
     Process a single file in a thread, then queue it for database insertion.
+
+    Returns:
+        True if the file was processed or deleted, False if an error occurred
     """
     data, metadata = prepare_file_data(file, tree_names)
     file = metadata["file"]
@@ -509,7 +350,7 @@ def ingest_submissions_parallel(  # noqa: C901 - orchestrator with IO + threadin
         except Exception:
             pass

-    _out(
+    out(
         "Spool status: %d .json files queued (%.2f MB)"
         % (
             len(json_files),
@@ -586,15 +427,15 @@ def ingest_submissions_parallel(  # noqa: C901 - orchestrator with IO + threadin
                                 qsz,
                             )
                         )
-                        _out(msg)
+                        out(msg)
                         last_progress = now

-        _out("Waiting for DB queue to drain... size=%d" % db_queue.qsize())
+        out("Waiting for DB queue to drain... size=%d" % db_queue.qsize())
         # Wait for all database operations to complete
         db_queue.join()
     except KeyboardInterrupt:
-        _out("KeyboardInterrupt: stopping ingestion and flushing...")
+        out("KeyboardInterrupt: stopping ingestion and flushing...")
         try:
             # Attempt to cancel remaining futures and exit early
             # Note: this only cancels tasks not yet started
@@ -626,46 +467,6 @@ def ingest_submissions_parallel(  # noqa: C901 - orchestrator with IO + threadin
                 mb_per_sec,
             )
         )
-        _out(msg)
+        out(msg)
     else:
-        _out("No files processed, nothing to do")
-
-
-def verify_dir(dir: str) -> None:
-    if not os.path.exists(dir):
-        logger.error("Directory %s does not exist", dir)
-        # try to create it
-        try:
-            os.makedirs(dir)
-            logger.info("Directory %s created", dir)
-        except Exception as e:
-            logger.error("Error creating directory %s: %s", dir, e)
-            raise e
-    if not os.path.isdir(dir):
-        raise Exception(f"Directory {dir} is not a directory")
-    if not os.access(dir, os.W_OK):
-        raise Exception(f"Directory {dir} is not writable")
-    logger.info("Directory %s is valid and writable", dir)
-
-
-def verify_spool_dirs(spool_dir: str) -> None:
-    failed_dir = os.path.join(spool_dir, "failed")
-    archive_dir = os.path.join(spool_dir, "archive")
-    verify_dir(spool_dir)
-    verify_dir(failed_dir)
-    verify_dir(archive_dir)
-
-
-def cache_logs_maintenance() -> None:
-    """
-    Periodically clean up the cache logs to prevent memory leak and slow down.
-    If CACHE_LOGS grow over 100k entries, clear it.
-    """
-
-    # Limit the size of the cache to prevent memory leaks
-    # (we don't really need lock, as workers idle, but just in case)
-    with cache_logs_lock:
-        if len(CACHE_LOGS) > CACHE_LOGS_SIZE_LIMIT:
-            CACHE_LOGS.clear()
-            if VERBOSE:
-                logger.info("Cache logs cleared")
+        out("No files processed, nothing to do")
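The db_worker refactor keeps the same producer/consumer shape: a bounded queue provides backpressure, and flushes happen on batch size or on an idle timeout. A self-contained sketch of that pattern with generic stand-ins (not the module's actual API):

```python
import queue
import threading
import time

work_queue: queue.Queue = queue.Queue(maxsize=5000)  # bounded: producers block when full
BATCH_SIZE = 10000
FLUSH_TIMEOUT_SEC = 2.0


def worker(stop_event: threading.Event) -> None:
    buffer: list = []
    last_flush = time.time()
    while not stop_event.is_set() or not work_queue.empty():
        try:
            buffer.append(work_queue.get(timeout=0.1))
            work_queue.task_done()
        except queue.Empty:
            pass
        # Flush on size, or on idle timeout so small batches don't linger
        if len(buffer) >= BATCH_SIZE or (
            buffer and time.time() - last_flush >= FLUSH_TIMEOUT_SEC
        ):
            print(f"flushing {len(buffer)} items")  # stand-in for bulk_create
            buffer.clear()
            last_flush = time.time()
    if buffer:
        print(f"final flush: {len(buffer)} items")
```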
diff --git a/backend/kernelCI_app/management/commands/helpers/log_excerpt_utils.py b/backend/kernelCI_app/management/commands/helpers/log_excerpt_utils.py
new file mode 100644
index 000000000..5b2b23306
--- /dev/null
+++ b/backend/kernelCI_app/management/commands/helpers/log_excerpt_utils.py
@@ -0,0 +1,177 @@
+import gzip
+import hashlib
+import logging
+import os
+import requests
+import tempfile
+import threading
+from typing import Any, Literal, Optional
+from kernelCI_app.constants.ingester import (
+    CACHE_LOGS_SIZE_LIMIT,
+    LOGEXCERPT_THRESHOLD,
+    STORAGE_TOKEN,
+    STORAGE_BASE_URL,
+    UPLOAD_URL,
+    VERBOSE,
+)
+
+
+CACHE_LOGS = {}
+cache_logs_lock = threading.Lock()
+logger = logging.getLogger("ingester")
+
+
+def upload_logexcerpt(logexcerpt: str, id: str) -> str:
+    """
+    Upload logexcerpt to storage and return a reference (URL string) if successful.
+    If the upload fails, return the original logexcerpt.
+
+    Args:
+        logexcerpt: the unchanged logexcerpt
+        id: the hash of the logexcerpt
+
+    Returns:
+        str: On successful upload, the reference URL; on failed upload, the original logexcerpt
+    """
+    if VERBOSE:
+        logger.info("Uploading logexcerpt for %s to %s", id, UPLOAD_URL)
+    # make temporary file with logexcerpt data
+    with tempfile.NamedTemporaryFile(delete=False, suffix=".logexcerpt") as temp_file:
+        logexcerpt_filename = temp_file.name
+        logexcerpt_compressed = gzip.compress(logexcerpt.encode("utf-8"))
+        temp_file.write(logexcerpt_compressed)
+        temp_file.flush()
+    with open(logexcerpt_filename, "rb") as f:
+        hdr = {
+            "Authorization": f"Bearer {STORAGE_TOKEN}",
+        }
+        files = {"file0": ("logexcerpt.txt.gz", f), "path": f"logexcerpt/{id}"}
+        try:
+            r = requests.post(UPLOAD_URL, headers=hdr, files=files)
+        except Exception as e:
+            logger.error("Error uploading logexcerpt for %s: %s", id, e)
+            os.remove(logexcerpt_filename)
+            return logexcerpt  # Return original logexcerpt if upload fails
+    os.remove(logexcerpt_filename)
+    if r.status_code != 200:
+        logger.error(
+            "Failed to upload logexcerpt for %s: %d : %s", id, r.status_code, r.text
+        )
+        return logexcerpt  # Return original logexcerpt if upload fails
+
+    return f"{STORAGE_BASE_URL}/logexcerpt/{id}/logexcerpt.txt.gz"
+
+
+def get_from_cache(log_hash: str) -> Optional[str]:
+    """
+    Check if log_hash is in the cache
+
+    Returns:
+        str|None: The cached value (ideally a reference URL) if it exists, None otherwise
+    """
+    with cache_logs_lock:
+        return CACHE_LOGS.get(log_hash)
+
+
+def set_in_cache(log_hash: str, log_excerpt: str) -> None:
+    """
+    Set log_hash in the cache with the given log_excerpt.
+    Ideally, the log_excerpt arrives as a reference URL to where it was stored,
+    but it may also arrive as the entire log_excerpt text.
+    """
+    with cache_logs_lock:
+        CACHE_LOGS[log_hash] = log_excerpt
+    if VERBOSE:
+        logger.info("Cached log excerpt with hash %s as %s", log_hash, log_excerpt)
+
+
+def set_log_excerpt_ofile(item: dict[str, Any], url: str) -> dict[str, Any]:
+    """
+    Clear the log_excerpt field, then create a name/url dict and append it
+    to the output_files of the job.
+
+    item is a build or a test
+    """
+    item["log_excerpt"] = None
+    data = {
+        "name": "log_excerpt",
+        "url": url,
+    }
+    if "output_files" not in item:
+        item["output_files"] = []
+
+    item["output_files"].append(data)
+    return item
+
+
+def process_log_excerpt_from_item(
+    item: dict[str, Any], item_type: Literal["build", "test"]
+) -> None:
+    """
+    Process log_excerpt from a single build or test (item).
+    If log_excerpt is large, upload it to storage and replace it with a reference.
+    """
+    id = item.get("id", "unknown")
+    log_excerpt = item["log_excerpt"]
+
+    if isinstance(log_excerpt, str) and len(log_excerpt) > LOGEXCERPT_THRESHOLD:
+        log_hash = hashlib.sha256(log_excerpt.encode("utf-8")).hexdigest()
+        if VERBOSE:
+            logger.info(
+                "Uploading log_excerpt for %s id %s hash %s with size %d bytes",
+                item_type,
+                id,
+                log_hash,
+                len(log_excerpt),
+            )
+        # check if log_excerpt already uploaded (by hash as key)
+        cached_url = get_from_cache(log_hash)
+        if cached_url:
+            if VERBOSE:
+                logger.info(
+                    "Log excerpt for %s %s already uploaded, using cached URL",
+                    item_type,
+                    id,
+                )
+            set_log_excerpt_ofile(item, cached_url)
+        else:
+            cached_url = upload_logexcerpt(log_excerpt, log_hash)
+            # TODO: if upload_logexcerpt fails, should we be caching the entire logexcerpt?
+            set_in_cache(log_hash, cached_url)
+            set_log_excerpt_ofile(item, cached_url)
+
+
+def extract_log_excerpt(input_data: dict[str, Any]) -> None:
+    """
+    Extract log_excerpt from builds and tests; if it is large,
+    upload it to storage and replace it with a reference.
+    """
+    if not STORAGE_TOKEN:
+        logger.warning("STORAGE_TOKEN is not set, log_excerpts will not be uploaded")
+        return
+
+    builds: list[dict[str, Any]] = input_data.get("builds", [])
+    tests: list[dict[str, Any]] = input_data.get("tests", [])
+
+    for build in builds:
+        if build.get("log_excerpt"):
+            process_log_excerpt_from_item(item=build, item_type="build")
+
+    for test in tests:
+        if test.get("log_excerpt"):
+            process_log_excerpt_from_item(item=test, item_type="test")
+
+
+def cache_logs_maintenance() -> None:
+    """
+    Periodically clean up the cache logs to prevent memory leaks and slowdowns.
+    If CACHE_LOGS grows over CACHE_LOGS_SIZE_LIMIT entries (100k by default), clear it.
+    """
+
+    # Limit the size of the cache to prevent memory leaks
+    # (we don't really need a lock, as workers idle, but just in case)
+    with cache_logs_lock:
+        if len(CACHE_LOGS) > CACHE_LOGS_SIZE_LIMIT:
+            CACHE_LOGS.clear()
+            if VERBOSE:
+                logger.info("Cache logs cleared")
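The upload path is content-addressed: identical excerpts hash to the same key, so each unique excerpt is uploaded once and the URL is reused afterwards. Roughly, the flow inside process_log_excerpt_from_item looks like this (the item payload below is illustrative):

```python
import hashlib

from kernelCI_app.management.commands.helpers.log_excerpt_utils import (
    get_from_cache,
    set_in_cache,
    set_log_excerpt_ofile,
    upload_logexcerpt,
)

item = {"id": "build:abc123", "log_excerpt": "x" * 1024}  # above the 256-byte threshold

log_hash = hashlib.sha256(item["log_excerpt"].encode("utf-8")).hexdigest()
url = get_from_cache(log_hash)  # cache hit: reuse the already-uploaded URL
if url is None:
    url = upload_logexcerpt(item["log_excerpt"], log_hash)  # returns the excerpt itself on failure
    set_in_cache(log_hash, url)
set_log_excerpt_ofile(item, url)  # nulls log_excerpt, appends {"name", "url"} to output_files
```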
diff --git a/backend/kernelCI_app/management/commands/helpers/process_submissions.py b/backend/kernelCI_app/management/commands/helpers/process_submissions.py
index e6dce0dbf..525c0533b 100644
--- a/backend/kernelCI_app/management/commands/helpers/process_submissions.py
+++ b/backend/kernelCI_app/management/commands/helpers/process_submissions.py
@@ -1,14 +1,23 @@
 import logging

 from django.utils import timezone
-from typing import Any, Literal
+from typing import Any, TypedDict
 from django.db import IntegrityError
 from pydantic import ValidationError

 from kernelCI_app.models import Builds, Checkouts, Incidents, Issues, Tests
+from kernelCI_app.typeModels.modelTypes import TableNames

-TableNames = Literal["issues", "checkouts", "builds", "tests", "incidents"]
+
+class ProcessedSubmission(TypedDict):
+    """Stores the lists of items in a single submission.
+    Lists can't be None but can be empty."""
+
+    issues: list[Issues]
+    checkouts: list[Checkouts]
+    builds: list[Builds]
+    tests: list[Tests]
+    incidents: list[Incidents]

 logger = logging.getLogger(__name__)
@@ -128,12 +137,12 @@ def make_incident_instance(incident: dict[str, Any]) -> Incidents:
     return obj


-def build_instances_from_submission(data: dict[str, Any]) -> dict[TableNames, list]:
+def build_instances_from_submission(data: dict[str, Any]) -> ProcessedSubmission:
     """
     Convert raw submission dicts into unsaved Django model instances, grouped by type.
    Per-item errors are logged and the item is skipped, matching the previous behavior.
     """
-    out: dict[TableNames, list] = {
+    out: ProcessedSubmission = {
         "issues": [],
         "checkouts": [],
         "builds": [],
diff --git a/backend/kernelCI_app/management/commands/monitor_submissions.py b/backend/kernelCI_app/management/commands/monitor_submissions.py
index 31783176f..0078698a1 100644
--- a/backend/kernelCI_app/management/commands/monitor_submissions.py
+++ b/backend/kernelCI_app/management/commands/monitor_submissions.py
@@ -4,11 +4,15 @@ import time
 import os

 from kernelCI_app.management.commands.helpers.kcidbng_ingester import (
-    cache_logs_maintenance,
     ingest_submissions_parallel,
+)
+from kernelCI_app.management.commands.helpers.file_utils import (
     load_tree_names,
     verify_spool_dirs,
 )
+from kernelCI_app.management.commands.helpers.log_excerpt_utils import (
+    cache_logs_maintenance,
+)

 logger = logging.getLogger(__name__)

@@ -69,7 +73,7 @@ def handle(
         self.stdout.write(f"Using {max_workers} workers")

         verify_spool_dirs(spool_dir)
-        tree_names = load_tree_names(trees_file_override=trees_file)
+        tree_names = load_tree_names(trees_file=trees_file)

         self.stdout.write("Starting file monitoring... (Press Ctrl+C to stop)")
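For reference, the ProcessedSubmission contract as db_worker consumes it. A sketch with an assumed minimal submission payload (field values are illustrative):

```python
from kernelCI_app.management.commands.helpers.process_submissions import (
    build_instances_from_submission,
)

submission = {
    "checkouts": [{"id": "maestro:checkout1", "origin": "maestro"}],  # illustrative payload
    "builds": [],
}

inst = build_instances_from_submission(submission)
inst["checkouts"]  # -> list of unsaved Checkouts instances (invalid items are logged and skipped)
inst["incidents"]  # -> [] (lists are always present, never None)
```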
diff --git a/backend/kernelCI_app/typeModels/modelTypes.py b/backend/kernelCI_app/typeModels/modelTypes.py
new file mode 100644
index 000000000..0369e2ddf
--- /dev/null
+++ b/backend/kernelCI_app/typeModels/modelTypes.py
@@ -0,0 +1,13 @@
+from typing import Literal
+from kernelCI_app.models import Builds, Checkouts, Incidents, Issues, Tests
+
+type TableNames = Literal["issues", "checkouts", "builds", "tests", "incidents"]
+type TableModels = Issues | Checkouts | Builds | Tests | Incidents
+
+MODEL_MAP: dict[TableNames, type[TableModels]] = {
+    "issues": Issues,
+    "checkouts": Checkouts,
+    "builds": Builds,
+    "tests": Tests,
+    "incidents": Incidents,
+}
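MODEL_MAP maps table names to model classes rather than instances, hence the `dict[TableNames, type[TableModels]]` annotation above; consume_buffer relies on that to stay generic across the five tables. A short usage sketch (the buffer contents are illustrative):

```python
from kernelCI_app.models import Builds
from kernelCI_app.typeModels.modelTypes import MODEL_MAP

builds_buf = [Builds(id="maestro:build1")]  # illustrative unsaved instance

model = MODEL_MAP["builds"]  # resolves to the Builds class itself
model.objects.bulk_create(builds_buf, batch_size=10000, ignore_conflicts=True)
```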