Skip to content

Commit e9196c9

Browse files
Refactor: ingester changes (#1574)
* refactor: improve typing and buffer consumption
* feat: add ingester constants
* refactor: move log_excerpt related functions
* refactor: move `out` to logger
* refactor: move ingester file functions
* refactor: improve ingester typing
* refactor: lower complexity of db_worker

Even with this reduction, it is still hard to lower the complexity any further without creating idempotent functions.

Closes #1571
1 parent 27e5500 commit e9196c9

File tree

8 files changed

+464
-348
lines changed

8 files changed

+464
-348
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""Constant settings for the ingester functions"""
2+
3+
import os
4+
import logging
5+
from utils.validation import is_boolean_or_string_true
6+
7+
logger = logging.getLogger("ingester")
8+
9+
10+
# Boolean feature flags read from the environment once, at import time.
# The raw value (or the ``False`` default when the variable is unset) is
# normalised by ``is_boolean_or_string_true``.
VERBOSE: bool = is_boolean_or_string_true(os.getenv("VERBOSE", False))
"""Variable used to toggle info logs. Default: False"""

CONVERT_LOG_EXCERPT: bool = is_boolean_or_string_true(
    os.getenv("CONVERT_LOG_EXCERPT", False)
)
"""Toggle to convert the log_excerpt to output_files url. Default: False"""
17+
18+
# Bytes threshold for log_excerpt to be converted to url. Default: 256
# A value that cannot be parsed as an integer is reported and replaced by the
# default instead of aborting the import, the same way the INGEST_* settings
# in this module are handled.
try:
    LOGEXCERPT_THRESHOLD = int(os.environ.get("LOGEXCERPT_THRESHOLD", 256))
except (ValueError, TypeError):
    logger.warning("Invalid LOGEXCERPT_THRESHOLD, using default 256")
    LOGEXCERPT_THRESHOLD = 256

# Storage service settings. STORAGE_TOKEN is None when the variable is unset.
STORAGE_TOKEN = os.environ.get("STORAGE_TOKEN", None)
STORAGE_BASE_URL = os.environ.get(
    "STORAGE_BASE_URL", "https://files-staging.kernelci.org"
)
# Upload endpoint derived from the base URL.
UPLOAD_URL = f"{STORAGE_BASE_URL}/upload"

# Arbitrary limit for cache_logs size, adjust as needed. Default: 100000
# Falls back to the default (with a warning) on an unparsable value.
try:
    CACHE_LOGS_SIZE_LIMIT = int(os.environ.get("CACHE_LOGS_SIZE_LIMIT", 100000))
except (ValueError, TypeError):
    logger.warning("Invalid CACHE_LOGS_SIZE_LIMIT, using default 100000")
    CACHE_LOGS_SIZE_LIMIT = 100000

# Default location of the YAML file that lists the known trees.
TREES_FILE = "/app/trees.yaml"
31+
32+
33+
# Batching and backpressure controls
#
# Each setting starts at its default and is overridden only when the
# environment supplies a value that parses cleanly; an unparsable value is
# reported on the ingester logger and the default is kept.
INGEST_BATCH_SIZE = 10000
try:
    INGEST_BATCH_SIZE = int(os.getenv("INGEST_BATCH_SIZE", "10000"))
except (ValueError, TypeError):
    logger.warning("Invalid INGEST_BATCH_SIZE, using default 10000")

INGEST_FLUSH_TIMEOUT_SEC = 2.0
try:
    INGEST_FLUSH_TIMEOUT_SEC = float(os.getenv("INGEST_FLUSH_TIMEOUT_SEC", "2.0"))
except (ValueError, TypeError):
    logger.warning("Invalid INGEST_FLUSH_TIMEOUT_SEC, using default 2.0")

INGEST_QUEUE_MAXSIZE = 5000
try:
    INGEST_QUEUE_MAXSIZE = int(os.getenv("INGEST_QUEUE_MAXSIZE", "5000"))
except (ValueError, TypeError):
    logger.warning("Invalid INGEST_QUEUE_MAXSIZE, using default 5000")

backend/kernelCI_app/helpers/logger.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
from django.http import HttpRequest
23

34
from kernelCI_app.constants.general import PRODUCTION_HOST, STAGING_HOST
@@ -17,6 +18,17 @@ def log_message(message: str) -> None:
1718
print(message)
1819

1920

21+
def out(msg: str) -> None:
    """Print ``msg`` to stdout with a local-time prefix and flush immediately.

    Intended for debug/perf output, because the logger was unreliable in some
    environments. The line has the form ``[YYYY-MM-DD HH:MM:SS] msg``.

    This is deliberately best-effort: any ``Exception`` raised while
    formatting or printing is swallowed so that diagnostics never break the
    caller.
    """
    try:
        # With no explicit time tuple, strftime formats the current local time.
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{stamp}] {msg}", flush=True)
    except Exception:
        pass
30+
31+
2032
def create_endpoint_notification(*, message: str, request: HttpRequest) -> str:
2133
running_instance = get_running_instance()
2234
request_path = request.get_full_path()
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import os
2+
import yaml
3+
import logging
4+
from kernelCI_app.constants.ingester import TREES_FILE
5+
6+
7+
logger = logging.getLogger("ingester")
8+
9+
10+
def load_tree_names(trees_file: str = TREES_FILE) -> dict[str, str]:
    """Build a mapping from git repository URL to tree name.

    The YAML document in ``trees_file`` is expected to hold a top-level
    ``trees`` mapping whose keys are tree names and whose values are mappings
    containing a ``url`` entry.

    Args:
        trees_file: Path of the YAML file to read.

    Returns:
        A dict keyed by each tree's ``url`` with the tree name as value. The
        dict is empty when the file is empty or has no (or an empty)
        ``trees`` section.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If a tree entry has no ``url`` key.
    """
    with open(trees_file, "r", encoding="utf-8") as f:
        # safe_load yields None for an empty document; treat it as "no trees"
        # instead of failing on ``None.get``.
        data = yaml.safe_load(f) or {}

    # A bare ``trees:`` key parses to None, which has no ``.items()``.
    trees = data.get("trees") or {}

    return {entry["url"]: tree_name for tree_name, entry in trees.items()}
18+
19+
20+
def move_file_to_failed_dir(filename: str, failed_dir: str) -> None:
    """Rename ``filename`` into ``failed_dir``, keeping its base name.

    Args:
        filename: Path of the file to move.
        failed_dir: Directory that receives the file.

    Raises:
        Exception: Whatever the path handling or ``os.rename`` raised; the
            error is logged on the ingester logger before being re-raised.
    """
    try:
        target = os.path.join(failed_dir, os.path.basename(filename))
        os.rename(filename, target)
    except Exception as err:
        logger.error("Error moving file %s to failed directory: %s", filename, err)
        raise err
26+
27+
28+
def verify_dir(dir: str) -> None:
    """Ensure ``dir`` exists as a writable directory, creating it if missing.

    A missing path is logged as an error and then created with
    ``os.makedirs``. Afterwards the path must be a directory and must be
    writable by the current process.

    Args:
        dir: Path to check.

    Raises:
        Exception: If creation fails (the original error is logged and
            re-raised), if the path is not a directory, or if it is not
            writable.
    """
    path_missing = not os.path.exists(dir)
    if path_missing:
        logger.error("Directory %s does not exist", dir)
        # try to create it
        try:
            os.makedirs(dir)
            logger.info("Directory %s created", dir)
        except Exception as err:
            logger.error("Error creating directory %s: %s", dir, err)
            raise err

    # Collect the first validation failure, if any, and raise it in one place.
    problem = None
    if not os.path.isdir(dir):
        problem = f"Directory {dir} is not a directory"
    elif not os.access(dir, os.W_OK):
        problem = f"Directory {dir} is not writable"
    if problem is not None:
        raise Exception(problem)

    logger.info("Directory %s is valid and writable", dir)
43+
44+
45+
def verify_spool_dirs(spool_dir: str) -> None:
    """Validate the spool directory and its ``failed`` and ``archive`` children.

    Each path is passed to ``verify_dir`` in that order (spool, failed,
    archive); any exception raised by ``verify_dir`` propagates to the caller.

    Args:
        spool_dir: Root spool directory.
    """
    required = (
        spool_dir,
        os.path.join(spool_dir, "failed"),
        os.path.join(spool_dir, "archive"),
    )
    for path in required:
        verify_dir(path)

0 commit comments

Comments
 (0)