Skip to content

Commit

Permalink
Improve logs and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubno committed Sep 4, 2024
1 parent 5d63d50 commit a799cec
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
2 changes: 1 addition & 1 deletion template/server/api/models/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class Result(BaseModel):
"Extra data that can be included. Not part of the standard types."

is_main_result: Optional[bool] = None
"Whether this data is the result of the cell. Data can be produced by display calls of which can be multiple in a cell."
"Whether this data is the result of the execetution. Data can be produced by display calls of which can be multiple in a cell."

def __init__(self, is_main_result: bool, data: [str, str]):
super().__init__()
Expand Down
29 changes: 17 additions & 12 deletions template/server/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self, in_background: bool = False):

class JupyterKernelWebSocket:
_ws: Optional[WebSocketClientProtocol] = None
_receive_task: Optional[asyncio.Task] = None

def __init__(
self,
Expand Down Expand Up @@ -110,7 +111,7 @@ async def execute(
revert_env_vars: Dict[StrictStr, str] = None,
):
message_id = str(uuid.uuid4())
logger.debug(f"Sending execution for code ({message_id}): {code}")
logger.debug(f"Sending code for the execution ({message_id}): {code}")

self._executions[message_id] = Execution(in_background=background)
request = self._get_execute_request(message_id, code, background)
Expand All @@ -135,7 +136,6 @@ async def execute(
)
break

logger.debug(f"Got result for code ({message_id}): {output}")
yield output.model_dump(exclude_none=True)

if revert_env_vars:
Expand Down Expand Up @@ -174,7 +174,6 @@ async def _receive_message(self):

try:
async for message in self._ws:
logger.debug(f"WebSocket received message: {message}".strip())
await self._process_message(json.loads(message))
except Exception as e:
logger.error(f"WebSocket received error while receiving messages: {e}")
Expand All @@ -194,15 +193,13 @@ async def _process_message(self, data: dict):
logger.warning("Parent message ID not found. %s", data)
return

logger.debug(f"Received message {data['msg_type']} for {parent_msg_ig}")

execution = self._executions.get(parent_msg_ig)
if not execution:
return

queue = execution.queue
if data["msg_type"] == "error":
logger.debug(f"Cell {parent_msg_ig} finished execution with error")
logger.debug(f"Execution {parent_msg_ig} finished execution with error")
await queue.put(
Error(
name=data["content"]["ename"],
Expand All @@ -213,35 +210,43 @@ async def _process_message(self, data: dict):

elif data["msg_type"] == "stream":
if data["content"]["name"] == "stdout":
logger.debug(f"Execution {parent_msg_ig} received stdout")
await queue.put(
Stdout(
text=data["content"]["text"], timestamp=data["header"]["date"]
)
)

elif data["content"]["name"] == "stderr":
logger.debug(f"Execution {parent_msg_ig} received stderr")
await queue.put(
Stderr(
text=data["content"]["text"], timestamp=data["header"]["date"]
)
)

elif data["msg_type"] in "display_data":
await queue.put(Result(is_main_result=False, data=data["content"]["data"]))
result = Result(is_main_result=False, data=data["content"]["data"])
logger.debug(f"Execution {parent_msg_ig} received display data with following formats: {result.formats()}")
await queue.put(result)

elif data["msg_type"] == "execute_result":
await queue.put(Result(is_main_result=True, data=data["content"]["data"]))
result = Result(is_main_result=True, data=data["content"]["data"])
logger.debug(f"Execution {parent_msg_ig} received execution result with following formats: {result.formats()}")
await queue.put(result)

elif data["msg_type"] == "status":
if data["content"]["execution_state"] == "busy" and execution.in_background:
logger.debug(f"Cell {parent_msg_ig} started execution")
logger.debug(f"Execution {parent_msg_ig} started execution")
execution.input_accepted = True

if data["content"]["execution_state"] == "idle":
if execution.input_accepted:
logger.debug(f"Cell {parent_msg_ig} finished execution")
logger.debug(f"Execution {parent_msg_ig} finished execution")
await queue.put(EndOfExecution())

elif data["content"]["execution_state"] == "error":
logger.debug(f"Cell {parent_msg_ig} finished execution with error")
logger.debug(f"Execution {parent_msg_ig} finished execution with error")
await queue.put(
Error(
name=data["content"]["ename"],
Expand All @@ -253,7 +258,7 @@ async def _process_message(self, data: dict):

elif data["msg_type"] == "execute_reply":
if data["content"]["status"] == "error":
logger.debug(f"Cell {parent_msg_ig} finished execution with error")
logger.debug(f"Execution {parent_msg_ig} finished execution with error")
await queue.put(
Error(
name=data["content"]["ename"],
Expand Down

0 comments on commit a799cec

Please sign in to comment.