Skip to content

Commit f466e86

Browse files
feat: add chat bridge implementation and support for the runtime
1 parent 419260a commit f466e86

File tree

4 files changed

+315
-5
lines changed

4 files changed

+315
-5
lines changed

pyproject.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath"
3-
version = "2.2.16"
3+
version = "2.2.17"
44
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"
@@ -143,4 +143,3 @@ name = "testpypi"
143143
url = "https://test.pypi.org/simple/"
144144
publish-url = "https://test.pypi.org/legacy/"
145145
explicit = true
146-

src/uipath/_cli/_chat/_bridge.py

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
"""Chat bridge implementations for conversational agents."""
2+
3+
import asyncio
4+
import logging
5+
import os
6+
from typing import Any
7+
from urllib.parse import urlparse
8+
9+
import socketio
10+
from socketio import AsyncClient
11+
from uipath.core.chat import (
12+
UiPathConversationEvent,
13+
UiPathConversationExchangeEndEvent,
14+
UiPathConversationExchangeEvent,
15+
)
16+
from uipath.runtime.chat import UiPathChatBridgeProtocol
17+
from uipath.runtime.context import UiPathRuntimeContext
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
class WebSocketChatBridge:
23+
"""WebSocket-based chat bridge for streaming conversational events to CAS.
24+
25+
Implements UiPathChatBridgeProtocol using python-socketio library.
26+
"""
27+
28+
def __init__(
29+
self,
30+
websocket_url: str,
31+
conversation_id: str,
32+
exchange_id: str,
33+
headers: dict[str, str],
34+
auth: dict[str, Any] | None = None,
35+
):
36+
"""Initialize the WebSocket chat bridge.
37+
38+
Args:
39+
websocket_url: The WebSocket server URL to connect to
40+
conversation_id: The conversation ID for this session
41+
exchange_id: The exchange ID for this session
42+
headers: HTTP headers to send during connection
43+
auth: Optional authentication data to send during connection
44+
"""
45+
self.websocket_url = websocket_url
46+
self.conversation_id = conversation_id
47+
self.exchange_id = exchange_id
48+
self.auth = auth
49+
self.headers = headers
50+
self._client: AsyncClient | None = None
51+
self._connected_event = asyncio.Event()
52+
53+
async def connect(self, timeout: float = 10.0) -> None:
54+
"""Establish WebSocket connection to the server.
55+
56+
Args:
57+
timeout: Connection timeout in seconds (default: 10.0)
58+
59+
Raises:
60+
RuntimeError: If connection fails or times out
61+
62+
Example:
63+
```python
64+
manager = WebSocketManager("http://localhost:3000")
65+
await manager.connect()
66+
```
67+
"""
68+
if self._client is not None:
69+
logger.warning("WebSocket client already connected")
70+
return
71+
72+
# Create new SocketIO client
73+
self._client = socketio.AsyncClient(
74+
logger=logger,
75+
engineio_logger=logger,
76+
)
77+
78+
# Register connection event handlers
79+
self._client.on("connect", self._handle_connect)
80+
self._client.on("disconnect", self._handle_disconnect)
81+
self._client.on("connect_error", self._handle_connect_error)
82+
83+
# Clear connection event
84+
self._connected_event.clear()
85+
86+
try:
87+
# Attempt to connect with timeout
88+
logger.info(f"Connecting to WebSocket server: {self.websocket_url}")
89+
90+
await asyncio.wait_for(
91+
self._client.connect(
92+
url=self.websocket_url,
93+
headers=self.headers,
94+
auth=self.auth,
95+
transports=["websocket"],
96+
),
97+
timeout=timeout,
98+
)
99+
100+
# Wait for connection confirmation
101+
await asyncio.wait_for(self._connected_event.wait(), timeout=timeout)
102+
103+
logger.info("WebSocket connection established successfully")
104+
105+
except asyncio.TimeoutError as e:
106+
error_message = f"Failed to connect to WebSocket server within {timeout}s timeout"
107+
logger.error(error_message)
108+
await self._cleanup_client()
109+
raise RuntimeError(error_message) from e
110+
111+
except Exception as e:
112+
error_message = f"Failed to connect to WebSocket server: {e}"
113+
logger.error(error_message)
114+
await self._cleanup_client()
115+
raise RuntimeError(error_message) from e
116+
117+
async def disconnect(self) -> None:
118+
"""Close the WebSocket connection gracefully.
119+
120+
Sends an exchange end event before disconnecting to signal that the
121+
exchange is complete. Uses stored conversation/exchange IDs.
122+
"""
123+
if self._client is None:
124+
logger.warning("WebSocket client not connected")
125+
return
126+
127+
# Send exchange end event using stored IDs
128+
if self._client and self._connected_event.is_set():
129+
try:
130+
end_event = UiPathConversationEvent(
131+
conversation_id=self.conversation_id,
132+
exchange=UiPathConversationExchangeEvent(
133+
exchange_id=self.exchange_id,
134+
end=UiPathConversationExchangeEndEvent(),
135+
),
136+
)
137+
event_data = end_event.model_dump(
138+
mode="json", exclude_none=True, by_alias=True
139+
)
140+
await self._client.emit("ConversationEvent", event_data)
141+
logger.info("Exchange end event sent")
142+
except Exception as e:
143+
logger.warning(f"Error sending exchange end event: {e}")
144+
145+
try:
146+
logger.info("Disconnecting from WebSocket server")
147+
await self._client.disconnect()
148+
logger.info("WebSocket disconnected successfully")
149+
except Exception as e:
150+
logger.error(f"Error during WebSocket disconnect: {e}")
151+
finally:
152+
await self._cleanup_client()
153+
154+
async def emit_message_event(self, message_event: Any) -> None:
155+
"""Wrap and send a message event to the WebSocket server.
156+
157+
Args:
158+
message_event: UiPathConversationMessageEvent to wrap and send
159+
160+
Raises:
161+
RuntimeError: If client is not connected
162+
"""
163+
if self._client is None:
164+
raise RuntimeError("WebSocket client not connected. Call connect() first.")
165+
166+
if not self._connected_event.is_set():
167+
raise RuntimeError("WebSocket client not in connected state")
168+
169+
try:
170+
# Wrap message event with conversation/exchange IDs
171+
wrapped_event = UiPathConversationEvent(
172+
conversation_id=self.conversation_id,
173+
exchange=UiPathConversationExchangeEvent(
174+
exchange_id=self.exchange_id,
175+
message=message_event,
176+
),
177+
)
178+
179+
event_data = wrapped_event.model_dump(
180+
mode="json", exclude_none=True, by_alias=True
181+
)
182+
183+
logger.debug("Sending conversation event to WebSocket")
184+
await self._client.emit("ConversationEvent", event_data)
185+
logger.debug("Conversation event sent successfully")
186+
187+
except Exception as e:
188+
logger.error(f"Error sending conversation event to WebSocket: {e}")
189+
raise RuntimeError(f"Failed to send conversation event: {e}") from e
190+
191+
@property
192+
def is_connected(self) -> bool:
193+
"""Check if the WebSocket is currently connected.
194+
195+
Returns:
196+
True if connected, False otherwise
197+
"""
198+
return self._client is not None and self._connected_event.is_set()
199+
200+
async def _handle_connect(self) -> None:
201+
"""Handle successful connection event."""
202+
logger.info("WebSocket connection established")
203+
self._connected_event.set()
204+
205+
async def _handle_disconnect(self) -> None:
206+
"""Handle disconnection event."""
207+
logger.info("WebSocket connection closed")
208+
self._connected_event.clear()
209+
210+
async def _handle_connect_error(self, data: Any) -> None:
211+
"""Handle connection error event."""
212+
logger.error(f"WebSocket connection error: {data}")
213+
214+
async def _cleanup_client(self) -> None:
215+
"""Clean up client resources."""
216+
self._connected_event.clear()
217+
self._client = None
218+
219+
220+
def get_chat_bridge(
221+
context: UiPathRuntimeContext,
222+
conversation_id: str,
223+
exchange_id: str,
224+
) -> UiPathChatBridgeProtocol:
225+
"""Factory to get WebSocket chat bridge for conversational agents.
226+
227+
Args:
228+
context: The runtime context containing environment configuration
229+
conversation_id: The conversation ID for this session
230+
exchange_id: The exchange ID for this session
231+
232+
Returns:
233+
WebSocketChatBridge instance configured for CAS
234+
235+
Raises:
236+
RuntimeError: If UIPATH_URL is not set or invalid
237+
238+
Example:
239+
```python
240+
bridge = get_chat_bridge(context, "conv-123", "exch-456")
241+
await bridge.connect()
242+
await bridge.emit_message_event(message_event)
243+
await bridge.disconnect(conversation_id, exchange_id)
244+
```
245+
"""
246+
# Extract host from UIPATH_URL
247+
base_url = os.environ.get("UIPATH_URL")
248+
if not base_url:
249+
raise RuntimeError("UIPATH_URL environment variable required for conversational mode")
250+
251+
parsed = urlparse(base_url)
252+
if not parsed.netloc:
253+
raise RuntimeError(f"Invalid UIPATH_URL format: {base_url}")
254+
255+
host = parsed.netloc
256+
257+
# Construct WebSocket URL for CAS
258+
websocket_url = f"wss://{host}/autopilotforeveryone_/websocket_/socket.io?conversationId={conversation_id}"
259+
260+
# Build headers from context
261+
headers = {
262+
"Authorization": f"Bearer {os.environ.get('UIPATH_ACCESS_TOKEN', '')}",
263+
"X-UiPath-Internal-TenantId": context.tenant_id or os.environ.get("UIPATH_TENANT_ID", ""),
264+
"X-UiPath-Internal-AccountId": context.org_id or os.environ.get("UIPATH_ORGANIZATION_ID", ""),
265+
}
266+
267+
return WebSocketChatBridge(
268+
websocket_url=websocket_url,
269+
conversation_id=conversation_id,
270+
exchange_id=exchange_id,
271+
headers=headers,
272+
)

src/uipath/_cli/cli_run.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import click
44
from uipath.core.tracing import UiPathTraceManager
55
from uipath.runtime import (
6+
UiPathChatRuntime,
67
UiPathExecuteOptions,
78
UiPathRuntimeFactoryProtocol,
89
UiPathRuntimeFactoryRegistry,
@@ -13,11 +14,13 @@
1314
from uipath.runtime.context import UiPathRuntimeContext
1415
from uipath.runtime.debug import UiPathDebugBridgeProtocol
1516
from uipath.runtime.errors import UiPathRuntimeError
16-
from uipath.runtime.events import UiPathRuntimeStateEvent
17+
from uipath.runtime.events import UiPathRuntimeMessageEvent, UiPathRuntimeStateEvent
1718

19+
from uipath._cli._chat._bridge import get_chat_bridge
1820
from uipath._cli._debug._bridge import ConsoleDebugBridge
1921
from uipath._cli._utils._common import read_resource_overwrites_from_file
2022
from uipath._cli._utils._debug import setup_debugging
23+
from uipath._cli.models.uipath_json_schema import UiPathJsonConfig
2124
from uipath._utils._bindings import ResourceOverwritesContext
2225
from uipath.tracing import JsonLinesFileExporter, LlmOpsHttpExporter
2326

@@ -132,6 +135,22 @@ async def debug_runtime(
132135
await debug_bridge.emit_state_update(event)
133136
return ctx.result
134137

138+
async def conversational_runtime(
139+
ctx: UiPathRuntimeContext,
140+
runtime: UiPathRuntimeProtocol,
141+
conversation_id: str,
142+
exchange_id: str,
143+
) -> UiPathRuntimeResult | None:
144+
"""Execute runtime in chat mode with WebSocket streaming."""
145+
chat_bridge = get_chat_bridge(ctx, conversation_id, exchange_id)
146+
chat_runtime = UiPathChatRuntime(delegate=runtime, chat_bridge=chat_bridge)
147+
148+
options = UiPathExecuteOptions(resume=resume)
149+
ctx.result = await chat_runtime.execute(
150+
input=ctx.get_input(), options=options
151+
)
152+
return ctx.result
153+
135154
async def execute() -> None:
136155
trace_manager = UiPathTraceManager()
137156

@@ -151,6 +170,21 @@ async def execute() -> None:
151170
JsonLinesFileExporter(ctx.trace_file)
152171
)
153172

173+
# Check if this is a conversational agent
174+
config = UiPathJsonConfig.load_from_file("uipath.json")
175+
is_conversational = config.runtime_options.is_conversational
176+
177+
# Extract conversation_id and exchange_id
178+
if is_conversational:
179+
input_data = ctx.get_input()
180+
conversation_id = input_data.get("conversationId")
181+
exchange_id = input_data.get("exchangeId")
182+
183+
if not conversation_id or not exchange_id:
184+
raise RuntimeError(
185+
"Conversational agents require 'conversationId' and 'exchangeId' in input"
186+
)
187+
154188
async with ResourceOverwritesContext(
155189
lambda: read_resource_overwrites_from_file(ctx.runtime_dir)
156190
):
@@ -162,7 +196,12 @@ async def execute() -> None:
162196
runtime = await factory.new_runtime(
163197
entrypoint, ctx.job_id or "default"
164198
)
165-
if ctx.job_id:
199+
200+
if is_conversational:
201+
ctx.result = await conversational_runtime(
202+
ctx, runtime, conversation_id, exchange_id
203+
)
204+
elif ctx.job_id:
166205
trace_manager.add_span_exporter(LlmOpsHttpExporter())
167206
ctx.result = await execute_runtime(ctx, runtime)
168207
else:

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)