Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: UN-1983 BE Enhancements for file centric logging #1157

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions backend/utils/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,25 @@ def is_json(string: str) -> bool:
return False
return True

# TODO: Use from SDK
@staticmethod
def pretty_file_size(num: float, suffix: str = "B") -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be the possible value of suffix other than B ?

"""Gets the human readable size for a file,

Args:
num (int): Size in bytes to parse
suffix (str, optional): _description_. Defaults to "B".

Returns:
str: Human readable size
"""
for unit in ("", "K", "M", "G", "T"):
if abs(num) < 1024.0:
# return f"{num:3.1f} {unit}{suffix}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove commented line

return f"{num:.2f} {unit}{suffix}"
num /= 1024.0
return f"{num:.2f} {suffix}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be return f"{num:.2f} T{suffix}" instead, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we should extend the units all the way up to Y. But our files will never grow that large.



class ModelEnum(Enum):
@classmethod
Expand Down
2 changes: 0 additions & 2 deletions backend/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,5 @@ class ExecutionLogConstants:
LOGS_BATCH_LIMIT: int = settings.LOGS_BATCH_LIMIT
LOG_QUEUE_NAME: str = "log_history_queue"
CELERY_QUEUE_NAME = "celery_periodic_logs"
PERIODIC_TASK_NAME = "workflow_log_history"
PERIODIC_TASK_NAME_V2 = "workflow_log_history_v2"
TASK = "workflow_manager.workflow.execution_log_utils.consume_log_history"
TASK_V2 = "consume_log_history"
7 changes: 5 additions & 2 deletions backend/utils/dto.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import logging
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Optional

from django.utils import timezone as dj_timezone
from unstract.workflow_execution.enums import LogType

from unstract.core.constants import LogFieldName
Expand Down Expand Up @@ -35,7 +36,9 @@ def __init__(
self.file_execution_id: Optional[str] = file_execution_id
self.organization_id: str = organization_id
self.timestamp: int = timestamp
self.event_time: datetime = datetime.fromtimestamp(timestamp)
self.event_time: datetime = dj_timezone.make_aware(
datetime.fromtimestamp(timestamp), timezone.utc
)
self.log_type: LogType = log_type
self.data: dict[str, Any] = data

Expand Down
5 changes: 5 additions & 0 deletions backend/workflow_manager/execution/serializer/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ def get_successful_files(self, obj: WorkflowExecution) -> int:
def get_failed_files(self, obj: WorkflowExecution) -> int:
    """Return the count of failed executed files"""
    # A file execution counts as failed when its status is ERROR.
    return obj.file_executions.filter(status=ExecutionStatus.ERROR).count()

def to_representation(self, obj: WorkflowExecution):
    """Serialize the execution, exposing a human readable execution time.

    Overrides the ``execution_time`` field in the default representation
    with the model's pretty-printed variant.
    """
    representation = super().to_representation(obj)
    representation["execution_time"] = obj.pretty_execution_time
    return representation
24 changes: 21 additions & 3 deletions backend/workflow_manager/execution/serializer/file_centric.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,39 @@
from workflow_manager.file_execution.models import (
WorkflowFileExecution as FileExecution,
)
from workflow_manager.workflow_v2.enums import ExecutionStatus

# Shown while a file execution sits in PENDING/QUEUED, before any worker
# has claimed it (no logs exist yet at that point).
INIT_STATUS_MSG = "Waiting for a worker to pick up file's execution..."

# Fallback when the execution has started but no displayable log line
# (non-DEBUG/WARN with a "log" payload) is available yet.
DEFAULT_STATUS_MSG = (
    "No status message available, please check again after a few minutes."
)


class FileCentricExecutionSerializer(serializers.ModelSerializer):
    """Serializes a file's execution with a user friendly status message."""

    # NOTE: the stale `latest_log = serializers.SerializerMethodField()`
    # left over from the rename is removed here — it had no matching
    # `get_latest_log` resolver and would raise during serialization.
    status_msg = serializers.SerializerMethodField()

    class Meta:
        model = FileExecution
        exclude = ["file_hash"]

    def get_status_msg(self, obj: FileExecution) -> str:
        """Return the latest user-facing log line for this file execution.

        Args:
            obj (FileExecution): File execution being serialized

        Returns:
            str: Latest non-DEBUG/WARN log message, a waiting message for
                executions not yet picked up, or a default placeholder.
        """
        # Execution hasn't started yet — there are no logs to surface.
        if obj.status in [ExecutionStatus.PENDING, ExecutionStatus.QUEUED]:
            return INIT_STATUS_MSG

        # NOTE(review): ordering all of a file's logs by event_time per
        # serialized row may be expensive — confirm an index covers
        # (file execution, event_time).
        latest_log = (
            obj.execution_logs.exclude(data__level__in=["DEBUG", "WARN"])
            .order_by("-event_time")
            .first()
        )
        if latest_log and "log" in latest_log.data:
            return latest_log.data["log"]
        return DEFAULT_STATUS_MSG

    def to_representation(self, obj: FileExecution):
        """Add human readable file size and execution time to the payload."""
        data = super().to_representation(obj)
        data["file_size"] = obj.pretty_file_size
        data["execution_time"] = obj.pretty_execution_time
        return data
8 changes: 4 additions & 4 deletions backend/workflow_manager/execution/views/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,23 @@ def get_queryset(self) -> Optional[QuerySet]:
queryset = WorkflowExecution.objects.all()

# Filter based on execution entity
if execution_entity == ExecutionEntity.API:
if execution_entity == ExecutionEntity.API.value:
queryset = queryset.filter(
pipeline_id__in=APIDeployment.objects.values_list("id", flat=True)
)
elif execution_entity == ExecutionEntity.ETL:
elif execution_entity == ExecutionEntity.ETL.value:
queryset = queryset.filter(
pipeline_id__in=Pipeline.objects.filter(
pipeline_type=Pipeline.PipelineType.ETL
).values_list("id", flat=True)
)
elif execution_entity == ExecutionEntity.TASK:
elif execution_entity == ExecutionEntity.TASK.value:
queryset = queryset.filter(
pipeline_id__in=Pipeline.objects.filter(
pipeline_type=Pipeline.PipelineType.TASK
).values_list("id", flat=True)
)
elif execution_entity == ExecutionEntity.WORKFLOW:
elif execution_entity == ExecutionEntity.WORKFLOW.value:
queryset = queryset.filter(
pipeline_id=None,
workflow_id__in=Workflow.objects.values_list("id", flat=True),
Expand Down
20 changes: 20 additions & 0 deletions backend/workflow_manager/file_execution/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import uuid
from datetime import timedelta
from typing import Optional

from django.db import models
from django.utils import timezone
from utils.common_utils import CommonUtils
from utils.models.base_model import BaseModel
from workflow_manager.workflow_v2.enums import ExecutionStatus
from workflow_manager.workflow_v2.models.execution import WorkflowExecution
Expand Down Expand Up @@ -135,6 +137,24 @@ def update_status(
self.execution_error = execution_error
self.save()

@property
def pretty_file_size(self) -> str:
    """Convert file_size from bytes to human-readable format

    Returns:
        str: File size with a precision of 2 decimals (e.g. "1.50 MB")
    """
    # file_size is persisted in bytes; CommonUtils handles unit scaling.
    return CommonUtils.pretty_file_size(self.file_size)

@property
def pretty_execution_time(self) -> str:
    """Human readable execution time.

    Renders ``execution_time`` (seconds) via ``timedelta`` and strips any
    fractional-second part, e.g. ``"0:01:05"``. Note str(timedelta) does
    not zero-pad hours and renders >24h as "N day(s), H:MM:SS".

    Returns:
        str: Elapsed time in H:MM:SS form
    """
    elapsed = timedelta(seconds=self.execution_time)
    # Drop microseconds: "0:00:01.250000" -> "0:00:01".
    return str(elapsed).split(".")[0]

class Meta:
verbose_name = "Workflow File Execution"
verbose_name_plural = "Workflow File Executions"
Expand Down
7 changes: 5 additions & 2 deletions backend/workflow_manager/workflow_v2/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ def publish_final_workflow_logs(
Returns:
None
"""
# To not associate final logs with a file execution
self.file_execution_id = None
self.publish_update_log(LogState.END_WORKFLOW, "1", LogComponent.STATUS_BAR)
self.publish_update_log(
LogState.SUCCESS, "Executed successfully", LogComponent.WORKFLOW
Expand All @@ -321,12 +323,13 @@ def publish_initial_tool_execution_logs(
Returns:
None
"""
msg = f"Processing file '{file_name}' ({current_file_idx}/{total_files})"
self.publish_update_log(
component=LogComponent.STATUS_BAR,
state=LogState.MESSAGE,
message=f"Processing file {file_name} {current_file_idx}/{total_files}",
message=msg,
)
self.publish_log(f"Processing file {file_name}")
self.publish_log(msg)

def execute_input_file(
self,
Expand Down
10 changes: 10 additions & 0 deletions backend/workflow_manager/workflow_v2/models/execution.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import uuid
from datetime import timedelta
from typing import Optional

from api_v2.models import APIDeployment
Expand Down Expand Up @@ -129,6 +130,15 @@ def pipeline_name(self) -> Optional[str]:

return None

@property
def pretty_execution_time(self) -> str:
    """Human readable execution time.

    Builds a ``timedelta`` from ``execution_time`` (seconds) and returns
    its string form with any microsecond fraction removed.

    Returns:
        str: Elapsed time in H:MM:SS form, e.g. ``"0:02:30"``
    """
    # str(timedelta) -> "H:MM:SS[.ffffff]"; keep only the whole seconds.
    formatted = str(timedelta(seconds=self.execution_time))
    return formatted.split(".")[0]

def __str__(self) -> str:
return (
f"Workflow execution: {self.id} ("
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@ def _execute_step(
Args:
step (int): workflow step
sandbox (ToolSandbox): instance of tool sandbox
execution_type (ExecutionType): step or complete
last_step_output (list[Any]): output of previous step

Raises:
error: _description_
Expand Down