-
Couldn't load subscription status.
- Fork 18
Refactor: ingester changes #1574
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6c4b9ae
cf2dfb7
64f6305
70bdc74
9c09322
001d80d
e4f2646
c4544b4
1255402
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| """Constant settings for the ingester functions""" | ||
|
|
||
| import os | ||
| import logging | ||
| from utils.validation import is_boolean_or_string_true | ||
|
|
||
| logger = logging.getLogger("ingester") | ||
|
|
||
|
|
||
# Simple ingester settings. Each value is read once from the process
# environment at import time; the literal passed to ``os.environ.get`` is the
# default used when the variable is unset.

VERBOSE: bool = is_boolean_or_string_true(os.environ.get("VERBOSE", False))
"""Variable used to toggle info logs. Default: False"""

CONVERT_LOG_EXCERPT: bool = is_boolean_or_string_true(
    os.environ.get("CONVERT_LOG_EXCERPT", False)
)
"""Toggle to convert the log_excerpt to output_files url. Default: False"""

# NOTE: unlike the INGEST_* settings, a non-numeric value here raises
# ValueError at import time instead of falling back to the default.
LOGEXCERPT_THRESHOLD: int = int(os.environ.get("LOGEXCERPT_THRESHOLD", 256))
"""Bytes threshold for log_excerpt to be converted to url. Default: 256"""

# STORAGE_TOKEN is None when the variable is not set.
STORAGE_TOKEN = os.environ.get("STORAGE_TOKEN", None)
STORAGE_BASE_URL = os.environ.get(
    "STORAGE_BASE_URL", "https://files-staging.kernelci.org"
)
# Upload endpoint derived from the configured storage base URL.
UPLOAD_URL = f"{STORAGE_BASE_URL}/upload"

CACHE_LOGS_SIZE_LIMIT: int = int(os.environ.get("CACHE_LOGS_SIZE_LIMIT", 100000))
"""Arbitrary limit for cache_logs size, adjust as needed. Default: 100000"""

# Path of the YAML file used as the default tree-name lookup source.
TREES_FILE = "/app/trees.yaml"
|
|
||
|
|
||
# Batching and backpressure controls
def _env_number(name, default, cast):
    """Read a numeric setting from the environment.

    Returns ``cast(value)`` for the environment variable ``name``, or for
    ``default`` when the variable is unset. If the value cannot be converted,
    a warning is logged and ``default`` is returned unchanged.

    The default is stated exactly once per setting so the fallback value and
    the warning message can never disagree.
    """
    try:
        return cast(os.environ.get(name, default))
    except (ValueError, TypeError):
        logger.warning("Invalid %s, using default %s", name, default)
        return default


INGEST_BATCH_SIZE: int = _env_number("INGEST_BATCH_SIZE", 10000, int)

# The previous fallback was 5.0 although the documented default (and the
# warning text) said 2.0; both paths now yield 2.0.
INGEST_FLUSH_TIMEOUT_SEC: float = _env_number("INGEST_FLUSH_TIMEOUT_SEC", 2.0, float)

INGEST_QUEUE_MAXSIZE: int = _env_number("INGEST_QUEUE_MAXSIZE", 5000, int)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| import time | ||
| from django.http import HttpRequest | ||
|
|
||
| from kernelCI_app.constants.general import PRODUCTION_HOST, STAGING_HOST | ||
|
|
@@ -17,6 +18,17 @@ def log_message(message: str) -> None: | |
| print(message) | ||
|
|
||
|
|
||
def out(msg: str) -> None:
    """Write debug/perf output to stdout. Logger was unreliable in some environments.

    The message is prefixed with the local time as ``[YYYY-MM-DD HH:MM:SS]``
    and stdout is flushed immediately.

    Args:
        msg: Text to print after the timestamp.
    """
    try:
        # Build the timestamp separately: reusing double quotes inside a
        # double-quoted f-string is a SyntaxError before Python 3.12.
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        print(f"[{timestamp}] {msg}", flush=True)
    except Exception:
        # Deliberately best-effort: diagnostic output must never break the
        # caller (e.g. when stdout is closed).
        pass
|
|
||
|
|
||
| def create_endpoint_notification(*, message: str, request: HttpRequest) -> str: | ||
| running_instance = get_running_instance() | ||
| request_path = request.get_full_path() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| import os | ||
| import yaml | ||
| import logging | ||
| from kernelCI_app.helpers.logger import out | ||
| from kernelCI_app.constants.ingester import TREES_FILE | ||
|
|
||
|
|
||
| logger = logging.getLogger("ingester") | ||
|
|
||
|
|
||
def get_spool_files(spool_dir: str) -> tuple[list[str], int]:
    """List the ``.json`` files queued in ``spool_dir`` and their total size.

    Only regular files directly inside ``spool_dir`` are considered. When at
    least one file is found, a one-line spool status is written via ``out``.

    Args:
        spool_dir: Directory to scan.

    Returns:
        A ``(file_names, total_bytes)`` tuple. ``file_names`` holds bare file
        names (not full paths). When nothing is queued the result is
        ``([], 0)`` so callers can always unpack it; previously a bare
        ``return`` produced ``None`` here, contradicting the annotation.

    Raises:
        OSError: If ``spool_dir`` cannot be listed.
    """
    json_files = [
        name
        for name in os.listdir(spool_dir)
        if name.endswith(".json") and os.path.isfile(os.path.join(spool_dir, name))
    ]
    if not json_files:
        return [], 0

    total_bytes = 0
    for name in json_files:
        try:
            total_bytes += os.path.getsize(os.path.join(spool_dir, name))
        except OSError:
            # Size is informational only; skip files that cannot be stat'ed
            # (e.g. removed after the directory was listed).
            continue

    out(
        "Spool status: %d .json files queued (%.2f MB)"
        % (len(json_files), total_bytes / (1024 * 1024))
    )

    return json_files, total_bytes
|
|
||
|
|
||
def load_trees_name(trees_file: str = TREES_FILE) -> dict[str, str]:
    """Reads data from the trees_file, which correlates git_repository_url to tree_name

    Args:
        trees_file: Path of the YAML file to read. Its top-level ``trees``
            mapping is expected to map each tree name to a mapping that has a
            ``url`` key.

    Returns:
        A dict mapping each tree's ``url`` to its tree name. Empty when the
        file is empty or has no (or a null) ``trees`` section.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If a tree entry has no ``url`` key.
    """
    with open(trees_file, "r", encoding="utf-8") as f:
        # safe_load returns None for an empty document; normalise to a dict.
        data = yaml.safe_load(f) or {}

    # A bare "trees:" key loads as None, so guard that as well.
    trees = data.get("trees") or {}

    return {tree["url"]: tree_name for tree_name, tree in trees.items()}
|
|
||
|
|
||
def move_file_to_failed_dir(filename: str, failed_dir: str) -> None:
    """Move ``filename`` into ``failed_dir``, keeping its base name.

    Args:
        filename: Path of the file to move.
        failed_dir: Destination directory.

    Raises:
        Exception: Whatever the rename raised; the error is logged first and
            then re-raised unchanged.
    """
    try:
        destination = os.path.join(failed_dir, os.path.basename(filename))
        os.rename(filename, destination)
    except Exception as e:
        logger.error("Error moving file %s to failed directory: %s", filename, e)
        # Bare raise keeps the original traceback intact.
        raise
|
|
||
|
|
||
def verify_dir(dir: str) -> None:
    """Ensure ``dir`` exists, is a directory and is writable.

    A missing path is created (including parents). Progress and failures are
    reported through the module logger.

    Args:
        dir: Path to check. (The name shadows the builtin but is kept so that
            keyword callers keep working.)

    Raises:
        Exception: If the directory cannot be created (the original error is
            logged and re-raised).
        NotADirectoryError: If the path exists but is not a directory.
        PermissionError: If the directory is not writable.
    """
    if not os.path.exists(dir):
        logger.error("Directory %s does not exist", dir)
        # try to create it
        try:
            # exist_ok avoids a spurious failure when another process creates
            # the directory between the exists() check and this call.
            os.makedirs(dir, exist_ok=True)
            logger.info("Directory %s created", dir)
        except Exception as e:
            logger.error("Error creating directory %s: %s", dir, e)
            raise
    # Both exceptions below subclass Exception, so callers that catch the
    # generic Exception raised previously are unaffected.
    if not os.path.isdir(dir):
        raise NotADirectoryError(f"Directory {dir} is not a directory")
    if not os.access(dir, os.W_OK):
        raise PermissionError(f"Directory {dir} is not writable")
    logger.info("Directory %s is valid and writable", dir)
|
|
||
|
|
||
def verify_spool_dirs(spool_dir: str) -> None:
    """Validate the spool directory and its ``failed`` and ``archive`` subdirectories.

    Each path is passed to ``verify_dir`` in turn: the spool directory itself
    first, then ``failed``, then ``archive``.
    """
    verify_dir(spool_dir)
    for subdir_name in ("failed", "archive"):
        verify_dir(os.path.join(spool_dir, subdir_name))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If default is 2.0, we must set 2.0 here too