From 72f10a46b56e48ad6b6fe8daf5b3dbac2f7af948 Mon Sep 17 00:00:00 2001 From: Logan Markewich Date: Fri, 7 Feb 2025 15:25:52 -0600 Subject: [PATCH] fix instrumentation and workflow tracebacks --- .../core/instrumentation/dispatcher.py | 55 ++++++++++++----- .../llama_index/core/workflow/workflow.py | 61 ++++++++++++------- 2 files changed, 79 insertions(+), 37 deletions(-) diff --git a/llama-index-core/llama_index/core/instrumentation/dispatcher.py b/llama-index-core/llama_index/core/instrumentation/dispatcher.py index 6ad38eafb1bb5..29754671fdb70 100644 --- a/llama-index-core/llama_index/core/instrumentation/dispatcher.py +++ b/llama-index-core/llama_index/core/instrumentation/dispatcher.py @@ -277,13 +277,48 @@ def handle_future_result( instance: Any, context: Context, ) -> None: - from llama_index.core.workflow.errors import WorkflowCancelledByUser + from llama_index.core.workflow.errors import ( + WorkflowCancelledByUser, + WorkflowTimeoutError, + WorkflowRuntimeError, + ) try: + # Get exception first to avoid calling result() on failed future exception = future.exception() if exception is not None: - raise exception + # Handle expected workflow exceptions + if isinstance( + exception, + ( + WorkflowCancelledByUser, + WorkflowTimeoutError, + WorkflowRuntimeError, + ), + ): + self.span_exit( + id_=span_id, + bound_args=bound_args, + instance=instance, + result=None, + ) + # Don't re-raise, just return None to avoid callback exception + return None + + # Handle unexpected exceptions + self.event( + SpanDropEvent(span_id=span_id, err_str=str(exception)) + ) + self.span_drop( + id_=span_id, + bound_args=bound_args, + instance=instance, + err=exception, + ) + # Don't re-raise, just return None to avoid callback exception + return None + # Handle successful result result = future.result() self.span_exit( id_=span_id, @@ -292,29 +327,21 @@ def handle_future_result( result=result, ) return result - except WorkflowCancelledByUser: + + except asyncio.CancelledError: + # Handle cancellation cleanly self.span_exit( id_=span_id, bound_args=bound_args, instance=instance, result=None, ) + # Don't re-raise, just return None to avoid callback exception return None - except BaseException as e: - self.event(SpanDropEvent(span_id=span_id, err_str=str(e))) - self.span_drop( - id_=span_id, bound_args=bound_args, instance=instance, err=e - ) - raise finally: try: context.run(active_span_id.reset, token) except ValueError as e: - # TODO: Since the context is created in a sync context no in async task, - # detaching the token raises an ValueError saying "token was created - # in a different Context. We should figure out how to handle active spans - # correctly, but for now just suppressing the error so it won't be - # surfaced to the user. _logger.debug(f"Failed to reset active_span_id: {e}") try: diff --git a/llama-index-core/llama_index/core/workflow/workflow.py b/llama-index-core/llama_index/core/workflow/workflow.py index f1e89872067ef..a360ce60c329b 100644 --- a/llama-index-core/llama_index/core/workflow/workflow.py +++ b/llama-index-core/llama_index/core/workflow/workflow.py @@ -418,42 +418,57 @@ async def _run_workflow() -> None: return_when=asyncio.FIRST_EXCEPTION, ) - we_done = False + # Handle task completion exception_raised = None for task in done: - e = task.exception() - if type(e) == WorkflowDone: - we_done = True - elif e is not None: - exception_raised = e - break - - # Cancel any pending tasks + try: + e = task.exception() + if isinstance(e, WorkflowDone): + result.set_result(ctx._retval) + return + elif e is not None: + exception_raised = e + break + except asyncio.CancelledError: + pass + + # Clean up unfinished tasks for t in unfinished: t.cancel() # wait for cancelled tasks to cleanup await asyncio.gather(*unfinished, return_exceptions=True) - # the context is no longer running + # Handle errors ctx.is_running = False + ctx.write_event_to_stream(StopEvent()) if exception_raised: - # cancel the stream - ctx.write_event_to_stream(StopEvent()) - - raise exception_raised - - if not we_done: - # cancel the stream - ctx.write_event_to_stream(StopEvent()) - - msg = f"Operation timed out after {self._timeout} seconds" - raise WorkflowTimeoutError(msg) + if isinstance( + exception_raised, + ( + WorkflowTimeoutError, + WorkflowRuntimeError, + WorkflowCancelledByUser, + ), + ): + result.set_exception(exception_raised) + else: + result.set_exception( + WorkflowRuntimeError( + f"Unexpected error: {exception_raised!s}" + ) + ) + else: + result.set_exception( + WorkflowTimeoutError( + f"Operation timed out after {self._timeout} seconds" + ) + ) - result.set_result(ctx._retval) except Exception as e: - result.set_exception(e) + if not result.cancelled(): + result.set_exception(WorkflowRuntimeError(f"Workflow error: {e!s}")) finally: if self._sem: self._sem.release()