Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] fix instrumentation and workflow tracebacks #17749

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

logan-markewich
Copy link
Collaborator

Attempting to fix how tracebacks are handled in instrumentation and in workflows.

This isn't quite correct though, it breaks unit tests. I think I'm hiding some exceptions, causing some cleanup/handling not to run in the workflow?

Attempts to fix #17728
Attempts to fix #17730

Here's the extra test code I was using (which does actually run cleanly)

import asyncio
import logging

from llama_index.core.workflow import (
    Context,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.core.workflow.errors import WorkflowCancelledByUser, WorkflowTimeoutError, WorkflowRuntimeError

# Disable asyncio debug logging which shows the callback exceptions
logging.getLogger('asyncio').setLevel(logging.WARNING)

class ExampleWorkflow(Workflow):
    @step
    async def first_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        await asyncio.sleep(3600)
        return StopEvent()

class ErrorWorkflow(Workflow):
    @step
    async def first_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        raise ValueError("This is an error")

async def test_workflow(workflow: Workflow, expected_error: type[Exception]) -> None:
    handler = workflow.run(input="input1")
    try:
        await handler
    except expected_error as e:
        print(f"Expected error occurred: {type(e).__name__}: {str(e)}")
    except Exception as e:
        print(f"Unexpected error occurred: {type(e).__name__}: {str(e)}")

async def test_cancellation(workflow: Workflow) -> None:
    handler = workflow.run(input="input1")
    await handler.cancel_run()
    try:
        await handler
    except WorkflowCancelledByUser:
        print("Workflow was cancelled successfully")
    except Exception as e:
        print(f"Unexpected error during cancellation: {type(e).__name__}: {str(e)}")

async def run_workflow():
    print("\nTesting timeout...")
    await test_workflow(ExampleWorkflow(timeout=5), WorkflowTimeoutError)
    
    print("\nTesting runtime error...")
    await test_workflow(ErrorWorkflow(timeout=5), WorkflowRuntimeError)
    
    print("\nTesting cancellation...")
    await test_cancellation(ErrorWorkflow(timeout=5))

if __name__ == "__main__":
    asyncio.run(run_workflow())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
1 participant