Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ WEAVIATE_HTTP_PORT="443" # or 8080 for localhost
WEAVIATE_GRPC_PORT="443" # or 50051 for localhost
WEAVIATE_HTTP_SECURE="true" # set to false for localhost
WEAVIATE_GRPC_SECURE="true" # set to false for localhost

# Optionally, specify E2B.dev API key for Python Code Interpreter
E2B_API_KEY="e2b_..."
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ uv run --env-file .env \
-m src.2_frameworks.2_multi_agent.planner_worker_gradio
```

Python Code Interpreter demo, using the OpenAI Agents SDK, E2B for a secure code sandbox, and LangFuse for observability.

```bash
uv run --env-file .env -m src.2_frameworks.code_interpreter_gradio
```

### 3. Evals

Synthetic data.
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies = [
"aiohttp>=3.12.14",
"beautifulsoup4>=4.13.4",
"datasets>=3.6.0",
"e2b-code-interpreter>=1.5.2",
"gradio>=5.35.0",
"langfuse>=3.1.3",
"lxml>=6.0.0",
Expand All @@ -23,6 +24,7 @@ dependencies = [
"pydantic-ai-slim[logfire]>=0.3.7",
"pytest-asyncio>=0.25.2",
"scikit-learn>=1.7.0",
"starlette>=0.47.2",
"weaviate-client>=4.15.4",
]

Expand Down
98 changes: 98 additions & 0 deletions src/2_frameworks/code_interpreter_gradio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Code Interpreter example.

Logs traces to LangFuse for observability and evaluation.

You will need your E2B API Key.
"""

import logging
from pathlib import Path

import agents
import gradio as gr
from dotenv import load_dotenv
from gradio.components.chatbot import ChatMessage
from openai import AsyncOpenAI

from src.utils import (
CodeInterpreter,
oai_agent_stream_to_gradio_messages,
pretty_print,
setup_langfuse_tracer,
)
from src.utils.langfuse.shared_client import langfuse_client


# Load environment variables from a `.env` file
# (see `.env.example` for the expected keys, e.g. E2B_API_KEY).
load_dotenv(verbose=True)


logging.basicConfig(level=logging.INFO)

# Agent instructions: describe how the `code_interpreter` tool behaves
# (nothing persists between invocations) and what it may be used for.
CODE_INTERPRETER_INSTRUCTIONS = """\
The `code_interpreter` tool executes Python commands. \
Please note that data is not persisted. Each time you invoke this tool, \
you will need to run import and define all variables from scratch.

You can access the local filesystem using this tool. \
Instead of asking the user for file inputs, you should try to find the file \
using this tool.

Recommended packages: Pandas, Numpy, SymPy, Scikit-learn.

You can also run Jupyter-style shell commands (e.g., `!pip freeze`)
but you won't be able to install packages.
"""

# Model name handed to the chat-completions model wrapper in `_main`.
# NOTE(review): a Gemini model behind `AsyncOpenAI()` presumably relies on an
# OpenAI-compatible endpoint configured through the environment — confirm.
AGENT_LLM_NAME = "gemini-2.5-flash"
async_openai_client = AsyncOpenAI()
# Shared tool instance. The listed example CSV is uploaded to the sandbox
# each time `run_code` is invoked.
code_interpreter = CodeInterpreter(
    local_files=[Path("tests/tool_tests/example_files/example_a.csv")]
)


async def _main(question: str, gr_messages: list[ChatMessage]):
    """Answer `question` with the code-interpreter agent, streaming chat messages.

    Builds the agent, runs it inside a Langfuse span, and yields the growing
    `gr_messages` list after every stream event that leaves it non-empty.
    The list is printed and yielded one final time once the stream ends.
    """
    setup_langfuse_tracer()

    # Expose the sandbox runner to the agent under the name used in the prompt.
    interpreter_tool = agents.function_tool(
        code_interpreter.run_code,
        name_override="code_interpreter",
    )
    chat_model = agents.OpenAIChatCompletionsModel(
        model=AGENT_LLM_NAME, openai_client=async_openai_client
    )
    main_agent = agents.Agent(
        name="Data Analysis Agent",
        instructions=CODE_INTERPRETER_INSTRUCTIONS,
        tools=[interpreter_tool],
        model=chat_model,
    )

    with langfuse_client.start_as_current_span(name="Agents-SDK-Trace") as span:
        span.update(input=question)

        result_stream = agents.Runner.run_streamed(main_agent, input=question)
        async for event in result_stream.stream_events():
            # Append in place so the same list object keeps accumulating.
            gr_messages += oai_agent_stream_to_gradio_messages(event)
            if gr_messages:
                yield gr_messages

        span.update(output=result_stream.final_output)

    pretty_print(gr_messages)
    yield gr_messages


# Chat UI backed by `_main`. The example prompts refer to the CSV file that
# the module-level `code_interpreter` is configured to upload.
demo = gr.ChatInterface(
    _main,
    title="2.1 OAI Agent SDK ReAct + LangFuse Code Interpreter",
    type="messages",
    examples=[
        "What is the sum of the column `x` in this example_a.csv?",
        "What is the sum of the column `y` in this example_a.csv?",
        "Create a linear best-fit line for the data in example_a.csv.",
    ],
)


if __name__ == "__main__":
    # NOTE(review): "0.0.0.0" makes the server listen on all network
    # interfaces — confirm this is intended outside a trusted environment.
    demo.launch(server_name="0.0.0.0")
1 change: 1 addition & 0 deletions src/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
from .langfuse.oai_sdk_setup import setup_langfuse_tracer
from .logging import set_up_logging
from .pretty_printing import pretty_print
from .tools.code_interpreter import CodeInterpreter
from .tools.kb_weaviate import AsyncWeaviateKnowledgeBase, get_weaviate_async_client
from .trees import tree_filter
13 changes: 13 additions & 0 deletions src/utils/gradio/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,18 @@ def oai_agent_stream_to_gradio_messages(
},
)
)
elif isinstance(stream_event, stream_events.RunItemStreamEvent):
name = stream_event.name
item = stream_event.item
if name == "tool_output" and isinstance(item, ToolCallOutputItem):
output.append(
ChatMessage(
role="assistant",
content=f"```\n{item.output}\n```",
metadata={
"title": "*Tool call output*",
},
)
)

return output
129 changes: 129 additions & 0 deletions src/utils/tools/code_interpreter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""Code interpreter tool."""

from pathlib import Path
from typing import Sequence

from e2b_code_interpreter import AsyncSandbox
from pydantic import BaseModel

from ..async_utils import gather_with_progress


class _CodeInterpreterOutputError(BaseModel):
    """Error from code interpreter.

    Built in ``CodeInterpreter.run_code`` from the JSON form of the execution
    result's ``error`` whenever that error is not ``None``.
    """

    # presumably the exception type name — confirm against the sandbox schema
    name: str
    # presumably the exception message — confirm against the sandbox schema
    value: str
    # Traceback text as reported by the sandbox.
    traceback: str


class CodeInterpreterOutput(BaseModel):
    """Captured output of one code-interpreter run.

    ``stdout`` and ``stderr`` are stored one line per list entry; ``error``
    defaults to ``None``.
    """

    stdout: list[str]
    stderr: list[str]
    error: _CodeInterpreterOutputError | None = None

    def __init__(self, stdout: list[str], stderr: list[str], **kwargs):
        """Flatten multi-line chunks so each entry holds exactly one line."""
        stdout_lines = [line for chunk in stdout for line in chunk.splitlines()]
        stderr_lines = [line for chunk in stderr for line in chunk.splitlines()]
        super().__init__(stdout=stdout_lines, stderr=stderr_lines, **kwargs)


async def _upload_file(sandbox: "AsyncSandbox", local_path: "str | Path") -> str:
    """Copy one local file into the sandbox under its base name.

    Parameters
    ----------
    sandbox : AsyncSandbox
        Sandbox that receives the file.
    local_path : str | pathlib.Path
        Local file to read in binary mode.

    Returns
    -------
    str
        The remote path, i.e. the local file name without its directories.
    """
    remote_path = Path(local_path).name
    with open(local_path, "rb") as stream:
        await sandbox.files.write(remote_path, stream)

    return remote_path


async def _upload_files(
sandbox: "AsyncSandbox", paths: Sequence[Path | str]
) -> list[str]:
"""Upload files to the sandbox.

Parameters
----------
paths: Sequence[pathlib.Path | str]
Files to upload to the sandbox.

Returns
-------
list[str]
List of remote paths, one per file.
"""
if not paths:
return []

file_upload_coros = [_upload_file(sandbox, _path) for _path in paths]
remote_paths = await gather_with_progress(
file_upload_coros, description=f"Uploading {len(paths)} to sandbox"
)
return list(remote_paths)


class CodeInterpreter:
    """Code Interpreter tool for the agent."""

    def __init__(
        self,
        local_files: "Sequence[Path | str]| None" = None,
        timeout_seconds: int = 30,
    ):
        """Configure your Code Interpreter session.

        Note that the sandbox is not persistent, and each run_code will
        execute in a fresh sandbox! (e.g., variables need to be re-declared each time.)

        Parameters
        ----------
        local_files : Sequence[pathlib.Path | str] | None
            Optionally, specify a list of local files (as paths)
            to upload to sandbox working directory.
        timeout_seconds : int
            Limit executions to this duration.
        """
        self.timeout_seconds = timeout_seconds
        self.local_files = local_files if local_files else []

    # NOTE: this method is wrapped with `agents.function_tool` by callers, so
    # its docstring is presumably surfaced to the model as the tool
    # description; keep the docstring wording stable.
    async def run_code(self, code: str) -> str:
        """Run the given Python code in a sandbox environment.

        Parameters
        ----------
        code : str
            Python logic to execute.
        """
        sbx = await AsyncSandbox.create(timeout=self.timeout_seconds)

        try:
            # Upload inside the `try` so that a failed upload (missing local
            # file, network error) still kills the sandbox in `finally`
            # instead of leaving it running.
            await _upload_files(sbx, self.local_files)

            result = await sbx.run_code(
                code, on_error=lambda error: print(error.traceback)
            )
            response = CodeInterpreterOutput.model_validate_json(result.logs.to_json())

            # Attach the execution error, if any, to the JSON returned to the caller.
            error = result.error
            if error is not None:
                response.error = _CodeInterpreterOutputError.model_validate_json(
                    error.to_json()
                )

            return response.model_dump_json()
        finally:
            await sbx.kill()
2 changes: 2 additions & 0 deletions tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@

```bash
uv run pytest -sv tests/tool_tests/test_weaviate.py
uv run pytest -sv tests/tool_tests/test_code_interpreter.py
PYTHONPATH="." uv run pytest -sv tests/tool_tests/test_integration.py

```
3 changes: 3 additions & 0 deletions tests/tool_tests/example_files/example_a.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
x,y
1,18
6,108
67 changes: 67 additions & 0 deletions tests/tool_tests/test_code_interpreter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Test code interpreter tool."""

from pathlib import Path

import pytest

from src.utils import pretty_print
from src.utils.tools.code_interpreter import (
CodeInterpreter,
CodeInterpreterOutput,
)


# Script run in the sandbox: prints the pandas version string.
PANDAS_VERSION_SCRIPT = """\
import os
import pandas as pd
print(pd.__version__)
"""

# Script run in the sandbox: asserts that `example_a.csv` exists in the
# working directory, then prints the sum of its `y` column.
PANDAS_READ_FILE_SCRIPT = """\
import pandas as pd
from pathlib import Path

assert Path("example_a.csv").exists()
df = pd.read_csv("example_a.csv")
print(df.sum()["y"])
"""


@pytest.mark.asyncio
async def test_code_interpreter():
    """Check that the interpreter runs Python and reports pandas 2 or newer."""
    interpreter = CodeInterpreter(timeout_seconds=15)

    raw_json = await interpreter.run_code(PANDAS_VERSION_SCRIPT)
    parsed = CodeInterpreterOutput.model_validate_json(raw_json)
    assert parsed.error is None

    pretty_print(parsed)
    # The first printed line is the version string, e.g. "2.x.y".
    version_parts = parsed.stdout[0].strip().split(".")
    assert int(version_parts[0]) >= 2


@pytest.mark.asyncio
async def test_jupyter_command():
    """Test running a Jupyter-style shell command (`!pip freeze`) in the interpreter."""
    session = CodeInterpreter(timeout_seconds=15)

    response = await session.run_code("!pip freeze")
    response_typed = CodeInterpreterOutput.model_validate_json(response)

    pretty_print(response_typed)
    # Without assertions this test could only fail on an exception; check
    # that the command ran cleanly and actually produced output.
    assert response_typed.error is None
    assert response_typed.stdout


@pytest.mark.asyncio
async def test_code_interpreter_upload_file():
    """Test that local files are uploaded and readable from sandbox code."""
    # Resolve the fixture relative to this test file so the test does not
    # depend on the directory pytest is launched from.
    example_paths = [Path(__file__).parent / "example_files" / "example_a.csv"]
    for _path in example_paths:
        assert _path.exists(), f"Missing test fixture: {_path}"

    session = CodeInterpreter(timeout_seconds=15, local_files=example_paths)
    response = await session.run_code(PANDAS_READ_FILE_SCRIPT)
    response_typed = CodeInterpreterOutput.model_validate_json(response)

    pretty_print(response_typed)
    # Report a sandbox failure directly instead of as an IndexError on stdout[0].
    assert response_typed.error is None
    # example_a.csv has y = 18 and 108, so the printed sum is 126.
    assert int(response_typed.stdout[0]) == 126
Loading