Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] fix instrumentation and workflow tracebacks #17749

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 41 additions & 14 deletions llama-index-core/llama_index/core/instrumentation/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,48 @@ def handle_future_result(
instance: Any,
context: Context,
) -> None:
from llama_index.core.workflow.errors import WorkflowCancelledByUser
from llama_index.core.workflow.errors import (
WorkflowCancelledByUser,
WorkflowTimeoutError,
WorkflowRuntimeError,
)

try:
# Get exception first to avoid calling result() on failed future
exception = future.exception()
if exception is not None:
raise exception
# Handle expected workflow exceptions
if isinstance(
exception,
(
WorkflowCancelledByUser,
WorkflowTimeoutError,
WorkflowRuntimeError,
),
):
self.span_exit(
id_=span_id,
bound_args=bound_args,
instance=instance,
result=None,
)
# Don't re-raise, just return None to avoid callback exception
return None

# Handle unexpected exceptions
self.event(
SpanDropEvent(span_id=span_id, err_str=str(exception))
)
self.span_drop(
id_=span_id,
bound_args=bound_args,
instance=instance,
err=exception,
)
# Don't re-raise, just return None to avoid callback exception
return None

# Handle successful result
result = future.result()
self.span_exit(
id_=span_id,
Expand All @@ -292,29 +327,21 @@ def handle_future_result(
result=result,
)
return result
except WorkflowCancelledByUser:

except asyncio.CancelledError:
# Handle cancellation cleanly
self.span_exit(
id_=span_id,
bound_args=bound_args,
instance=instance,
result=None,
)
# Don't re-raise, just return None to avoid callback exception
return None
except BaseException as e:
self.event(SpanDropEvent(span_id=span_id, err_str=str(e)))
self.span_drop(
id_=span_id, bound_args=bound_args, instance=instance, err=e
)
raise
finally:
try:
context.run(active_span_id.reset, token)
except ValueError as e:
# TODO: Since the context is created in a sync context, not in an async task,
# detaching the token raises a ValueError saying "token was created
# in a different Context". We should figure out how to handle active spans
# correctly, but for now we just suppress the error so it won't be
# surfaced to the user.
_logger.debug(f"Failed to reset active_span_id: {e}")

try:
Expand Down
61 changes: 38 additions & 23 deletions llama-index-core/llama_index/core/workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,42 +418,57 @@ async def _run_workflow() -> None:
return_when=asyncio.FIRST_EXCEPTION,
)

we_done = False
# Handle task completion
exception_raised = None
for task in done:
e = task.exception()
if type(e) == WorkflowDone:
we_done = True
elif e is not None:
exception_raised = e
break

# Cancel any pending tasks
try:
e = task.exception()
if isinstance(e, WorkflowDone):
result.set_result(ctx._retval)
return
elif e is not None:
exception_raised = e
break
except asyncio.CancelledError:
pass

# Clean up unfinished tasks
for t in unfinished:
t.cancel()

# wait for cancelled tasks to cleanup
await asyncio.gather(*unfinished, return_exceptions=True)

# the context is no longer running
# Handle errors
ctx.is_running = False
ctx.write_event_to_stream(StopEvent())

if exception_raised:
# cancel the stream
ctx.write_event_to_stream(StopEvent())

raise exception_raised

if not we_done:
# cancel the stream
ctx.write_event_to_stream(StopEvent())

msg = f"Operation timed out after {self._timeout} seconds"
raise WorkflowTimeoutError(msg)
if isinstance(
exception_raised,
(
WorkflowTimeoutError,
WorkflowRuntimeError,
WorkflowCancelledByUser,
),
):
result.set_exception(exception_raised)
else:
result.set_exception(
WorkflowRuntimeError(
f"Unexpected error: {exception_raised!s}"
)
)
else:
result.set_exception(
WorkflowTimeoutError(
f"Operation timed out after {self._timeout} seconds"
)
)

result.set_result(ctx._retval)
except Exception as e:
result.set_exception(e)
if not result.cancelled():
result.set_exception(WorkflowRuntimeError(f"Workflow error: {e!s}"))
finally:
if self._sem:
self._sem.release()
Expand Down
Loading