feat(tool): add DAP tool subcommand for direct @tool_function invocation #1270
WHY
---
Until now, external callers had no way to invoke a node's @tool_function
methods directly. The only path was through the Question/Answer pipeline
(e.g. QuestionType.EXECUTE for raw SQL, QuestionType.DIALECT for engine
discovery), which forced tool invocations through the full data-lane
lifecycle -- serializing a Question, routing through writeQuestions,
emitting an Answer, and parsing the result from SSE. This was
architecturally wrong: tool invocations are stateless RPC calls, not
conversational turns.
LLM agents already had direct tool access internally via
IInvokeTool.Invoke dispatched through the control plane, but this
mechanism was not exposed to SDK callers. The admin-ui Web AI page's
upcoming document management features (list/stats on vector stores)
need this capability, as does any client that wants to call execute,
dialect, search, upsert, delete, or any other @tool_function without
the overhead of the chat pipeline.
WHAT CHANGED
------------
1. New tool DAP subcommand (data_conn.py)
Added a tool subcommand to on_rrext_process alongside the existing
open/write/close lifecycle. The handler (_tool()) receives a tool
name, node ID, optional input dict, and optional pipe_id.
The dispatch mechanism walks the filter chain via the next pointer
(exposed by the C++ bindings as IServiceFilterInstance.next) to locate
the target node by its component ID (pipeType.id). Once found, it
accesses the node's Python IInstance via node.pyInstance and calls
invoke(param) directly -- bypassing the C++ cb_control dispatcher
entirely.
This bypass is necessary because cb_control checks the pipe's
controller map for registered control listeners of a given classType.
The pipe root (IServiceFilterPipe from getPipe()) does not have tool
entries in its controller map -- those entries are scoped to the node
that owns the control binding (e.g. an agent node that wires
control: [{classType: tool, from: agent_1}]). Since we are
calling from the data connection layer (outside any node), we cannot
go through the normal control dispatch path. Calling
pyInstance.invoke(param) directly invokes IInstanceBase.invoke(),
which dispatches to _dispatch_tool() -- the same code path that
agents use internally.
When the node does not own the requested tool, _dispatch_tool raises
PreventDefault. The _tool() handler catches this and converts it to
a ValueError so the DAP layer surfaces the error to the client.
When pipe_id is provided, the handler reuses the caller's already-open
pipe (looked up from _pipe_map). When omitted, it borrows a pipe from
the endpoint pool via getPipe()/putPipe().
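The lookup-and-invoke step described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in data_conn.py: `FakeNode`, `FakeInstance`, `find_node`, and `invoke_tool` are hypothetical stand-ins, the `PreventDefault` class is a local placeholder for the engine's exception, and the `postgres` value is made up. Only the `next`, `pipeType.id`, and `pyInstance.invoke` attribute names come from the description.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Callable, Dict, Optional


class PreventDefault(Exception):
    """Local placeholder for the engine's 'tool not handled here' signal."""


class FakeInstance:
    """Hypothetical stand-in for a node's Python IInstance."""

    def __init__(self, tools: Dict[str, Callable[[dict], Any]]) -> None:
        self._tools = tools

    def invoke(self, param: SimpleNamespace) -> Any:
        handler = self._tools.get(param.tool)
        if handler is None:
            # The node does not own the requested tool.
            raise PreventDefault(param.tool)
        return handler(param.input)


@dataclass
class FakeNode:
    """Hypothetical filter-chain entry exposing pipeType, pyInstance, and next."""

    pipeType: SimpleNamespace
    pyInstance: FakeInstance
    next: Optional["FakeNode"] = None


def find_node(head: Optional[FakeNode], node_id: str) -> Optional[FakeNode]:
    """Walk the chain via .next until a node with a matching component ID is found."""
    node = head
    while node is not None:
        if node.pipeType.id == node_id:
            return node
        node = node.next
    return None


def invoke_tool(head: Optional[FakeNode], node_id: str, tool: str,
                tool_input: Optional[dict] = None) -> Any:
    node = find_node(head, node_id)
    if node is None:
        raise ValueError(f"node {node_id!r} not found in pipe")
    param = SimpleNamespace(tool=tool, input=tool_input or {})
    try:
        # Call the node's Python instance directly, skipping control dispatch.
        return node.pyInstance.invoke(param)
    except PreventDefault:
        # Surface 'not my tool' as a ValueError for the caller.
        raise ValueError(f"node {node_id!r} has no tool {tool!r}") from None


db = FakeNode(SimpleNamespace(id="db_1"),
              FakeInstance({"dialect": lambda _input: {"dialect": "postgres"}}))
head = FakeNode(SimpleNamespace(id="webhook_1"), FakeInstance({}), next=db)
print(invoke_tool(head, "db_1", "dialect"))
```

Converting the internal signal to `ValueError` at this boundary keeps engine-specific exception types out of the client-facing error path.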
2. Vector store list/stats tools (store.py)
Added two new @tool_function methods to VectorStoreToolMixin:
- list: calls store.getPaths() with optional parent/offset/limit
filtering. Returns the map of unique parent paths.
- stats: calls store.count_documents() and reads store.modelName
and store.vectorSize. Returns collection statistics.
The existing search, upsert, and delete tools become externally
callable for free -- they were already registered via @tool_function but
previously only reachable from within an agent's tool discovery.
Updated test_vectordb_tool_mixin.py to expect the two new tools in
the namespace assertion sets.
3. Python SDK -- two tool() methods
RocketRideClient.tool() (client.py): Standalone method that sends
the tool DAP subcommand via client.call('rrext_process', ...). The
server borrows a pipe for the call. Returns the tool's output directly.
DataPipe.tool() (data.py): Pipe-bound method on the DataPipe inner
class. Sends the same subcommand but includes pipe_id so the server
reuses the caller's already-open pipe.
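To make the difference between the two entry points concrete, here is a hedged sketch using stand-in classes. `SketchClient` and `SketchPipe` are hypothetical and do not reproduce the SDK; the argument names (`subcommand`, `token`, `pipe_id`, `tool`, `nodeId`, `input`) follow the wording of this description, but the exact wire format is an assumption.

```python
from typing import Any, Dict, List, Optional, Tuple


class SketchClient:
    """Hypothetical stand-in for the SDK client: records requests instead of sending them."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Dict[str, Any]]] = []

    def call(self, command: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        # A real client would send a DAP request; this one echoes the input back.
        self.sent.append((command, arguments))
        return {"result": {"echo": arguments["input"]}}

    def tool(self, token: str, tool: str, node_id: str = "",
             input: Optional[Dict[str, Any]] = None) -> Any:
        # Standalone form: no pipe_id, so the server borrows a pipe for the call.
        arguments = {"subcommand": "tool", "token": token, "tool": tool,
                     "nodeId": node_id, "input": input or {}}
        return self.call("rrext_process", arguments)["result"]


class SketchPipe:
    """Hypothetical stand-in for DataPipe; pipe_id is None until the pipe is opened."""

    def __init__(self, client: SketchClient, pipe_id: Optional[int] = None) -> None:
        self._client = client
        self.pipe_id = pipe_id

    def tool(self, tool: str, node_id: str = "",
             input: Optional[Dict[str, Any]] = None) -> Any:
        if self.pipe_id is None:
            raise RuntimeError("pipe is not open")
        # Pipe-bound form: pipe_id tells the server to reuse the open pipe.
        arguments = {"subcommand": "tool", "pipe_id": self.pipe_id, "tool": tool,
                     "nodeId": node_id, "input": input or {}}
        return self._client.call("rrext_process", arguments)["result"]


client = SketchClient()
print(client.tool("task-token", "dialect", node_id="db_1"))
print(SketchPipe(client, pipe_id=7).tool("execute", "db_1", {"sql": "SELECT 1"}))
```

In this sketch the only difference between the two requests is whether `token` or `pipe_id` identifies the target; whether the real pipe-bound request also carries a token is not stated here.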
4. TypeScript SDK -- two tool() methods
RocketRideClient.tool() (client.ts): Standalone method using
client.call(). Accepts a typed options object with token, tool,
nodeId, input, and timeout. Generic return type <T>.
DataPipe.tool() (client.ts): Pipe-bound method that passes
pipe_id automatically from the open pipe.
5. EXECUTE/DIALECT removal from QuestionType
Removed EXECUTE and DIALECT from the QuestionType enum in both
the Python SDK (question.py) and TypeScript SDK (Question.ts).
These were special-purpose question types that bypassed the LLM to
run raw SQL or discover database dialects -- functionality now properly
served by the execute and dialect @tool_function methods.
6. DB nodes -- execute/dialect as @tool_function (db_instance_base.py)
Removed the EXECUTE and DIALECT handling from writeQuestions() in
DatabaseInstanceBase. Added two @tool_function methods in their place:
- execute: validates allow_execute config gate, calls
_executeRawQuery(), sanitizes rows, returns {rows, affected_rows}.
- dialect: returns {dialect: self._db_dialect()}.
The Neo4j node (IInstance.py) receives the same treatment -- its
execute tool calls IGlobal._run_query_raw() and its dialect tool
returns {dialect: neo4j}. Both the old EXECUTE/DIALECT branches
are removed from writeQuestions().
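As a rough illustration of the gate-then-execute shape described for the `execute` and `dialect` tools, the sketch below uses the standard-library `sqlite3` module in place of a real database node. `SketchDbNode` is hypothetical, the `@tool_function` registration is omitted, and raising `PermissionError` when the gate is closed is an assumption rather than the behavior of db_instance_base.py.

```python
import sqlite3
from typing import Any, Dict


class SketchDbNode:
    """Hypothetical node exposing execute/dialect as plain methods."""

    def __init__(self, conn: sqlite3.Connection, allow_execute: bool = False) -> None:
        self._conn = conn
        self._allow_execute = allow_execute

    def execute(self, sql: str) -> Dict[str, Any]:
        # Config gate: refuse raw SQL unless it was explicitly enabled.
        if not self._allow_execute:
            raise PermissionError("raw SQL execution is disabled (allow_execute is false)")
        cur = self._conn.execute(sql)
        # description is None for statements that return no rows.
        columns = [col[0] for col in cur.description] if cur.description else []
        rows = [dict(zip(columns, row)) for row in cur.fetchall()]
        # sqlite3 reports -1 when no row count applies; clamp that to 0.
        return {"rows": rows, "affected_rows": max(cur.rowcount, 0)}

    def dialect(self) -> Dict[str, str]:
        return {"dialect": "sqlite"}


conn = sqlite3.connect(":memory:")
node = SketchDbNode(conn, allow_execute=True)
node.execute("CREATE TABLE t (id INTEGER, name TEXT)")
inserted = node.execute("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
selected = node.execute("SELECT id, name FROM t ORDER BY id")
print(inserted["affected_rows"], selected["rows"])
```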
7. DatabaseApi rewrite (database.py, database.ts)
Both Python and TypeScript DatabaseApi classes are rewritten to call
client.tool() internally instead of constructing Question objects:
- query() calls client.tool(tool='execute', input={sql: sql})
- dialect() calls client.tool(tool='dialect')
The public API signatures are preserved (with the addition of an
optional node_id/nodeId parameter for targeting specific nodes).
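The delegation described above might look roughly like this. `SketchDatabaseApi` and `RecordingClient` are hypothetical stand-ins; the real `client.tool()` also takes a token, which is left out here for brevity, and the canned responses are invented for the example.

```python
from typing import Any, Dict, List, Optional


class RecordingClient:
    """Hypothetical client that records tool() calls and returns canned results."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def tool(self, tool: str, node_id: str = "",
             input: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        self.calls.append({"tool": tool, "node_id": node_id, "input": input or {}})
        if tool == "dialect":
            return {"dialect": "sqlite"}
        return {"rows": [], "affected_rows": 0}


class SketchDatabaseApi:
    """Sketch of a DatabaseApi that delegates to client.tool() instead of building Questions."""

    def __init__(self, client: RecordingClient, node_id: str = "") -> None:
        self._client = client
        self._node_id = node_id

    def query(self, sql: str, node_id: Optional[str] = None) -> Dict[str, Any]:
        target = self._node_id if node_id is None else node_id
        return self._client.tool(tool="execute", node_id=target, input={"sql": sql})

    def dialect(self, node_id: Optional[str] = None) -> str:
        target = self._node_id if node_id is None else node_id
        return self._client.tool(tool="dialect", node_id=target)["dialect"]


recorder = RecordingClient()
api = SketchDatabaseApi(recorder, node_id="db_1")
print(api.dialect())
print(api.query("SELECT 1", node_id="db_2"))
```

Keeping `node_id` optional on each call is what lets the public signatures stay the same while still allowing a specific node to be targeted.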
8. Fix: subprocess error propagation (task_engine.py, cmd_data.py)
Task._send_data() was returning the subprocess DAP response without
checking its success field. When the subprocess returned
success: False (e.g. for an invalid tool name), the mid-tier proxy in
cmd_data.py silently rewrapped it as success: True and the client
never saw the error.
Fixed by adding a did_fail() check in _send_data() that raises
RuntimeError with the subprocess error message. This propagates
through the existing except Exception handler in cmd_data.py (and
cmd_cprofile.py) which re-raises, and the DAP layer converts it to a
proper success: False response to the client.
This was a pre-existing bug affecting all subprocess data operations,
not just the new tool subcommand.
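The before/after behavior of this fix can be sketched with plain dictionaries. The functions below are hypothetical and compress the layers: in the real code the `except` handler re-raises and a separate DAP layer builds the failure response. Only the `success` field is taken from the description; `message` and `body` are assumed field names.

```python
from typing import Any, Callable, Dict

Response = Dict[str, Any]


def did_fail(response: Response) -> bool:
    """Treat a missing or false 'success' field as failure."""
    return not response.get("success", False)


def send_data(transport: Callable[[Response], Response], request: Response) -> Response:
    response = transport(request)
    if did_fail(response):
        # Raise instead of handing a failed response back to the proxy.
        raise RuntimeError(response.get("message", "subprocess request failed"))
    return response


def proxy(transport: Callable[[Response], Response], request: Response) -> Response:
    """Mid-tier wrapper: without the check in send_data, failures were rewrapped as success."""
    try:
        inner = send_data(transport, request)
    except Exception as exc:
        return {"success": False, "message": str(exc)}
    return {"success": True, "body": inner.get("body")}


def failing_subprocess(_request: Response) -> Response:
    return {"success": False, "message": "unknown tool: nope"}


def working_subprocess(_request: Response) -> Response:
    return {"success": True, "body": {"result": 42}}


print(proxy(failing_subprocess, {"tool": "nope"}))
print(proxy(working_subprocess, {"tool": "answer"}))
```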
9. Test infrastructure
conftest.py (client-python): Extracted shared TEST_CONFIG,
ensure_clean_pipeline(), and a client fixture from the main test
file into conftest.py for reuse across test modules.
tool_pipeline.py / tool.pipeline.ts: Pipeline fixture with a
webhook source and a tool_python node connected via
control: [{classType: tool, from: webhook_1}]. The tool_python
node runs Python code in a sandbox with no external dependencies.
test_tool.py / tool.test.ts: Integration tests covering:
- Standalone client.tool() executing Python and validating results
- Stdout capture from sandboxed scripts
- Error handling for invalid tool names (expects RuntimeError/throw)
- Pipe-bound DataPipe.tool() through an open pipe
- Pre-open error (pipe not open)
10. gen-node-tables.mjs -- skip on non-develop branches
The nodes:docs-generate task was regenerating every node's README.md
on every build, even on feature branches. The generated Source URL
contains the branch name (resolved from origin/HEAD), so switching
branches caused 100+ file changes in the diff.
Added an early-exit guard: the generator now checks the current git
branch and skips entirely when not on develop.
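The guard itself lives in a JavaScript module; purely to illustrate the logic, here is a Python rendition. The function names are hypothetical, and reading the branch with `git rev-parse --abbrev-ref HEAD` is an assumption about how the generator determines it.

```python
import subprocess
from typing import Optional


def current_branch(cwd: Optional[str] = None) -> Optional[str]:
    """Return the checked-out branch name, or None if git is unavailable or fails."""
    try:
        proc = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            cwd=cwd, capture_output=True, text=True, check=False,
        )
    except FileNotFoundError:
        return None
    return proc.stdout.strip() if proc.returncode == 0 else None


def should_generate(branch: Optional[str], allowed: str = "develop") -> bool:
    """Only regenerate docs on the designated branch."""
    return branch == allowed


def generate_docs(branch: Optional[str]) -> str:
    if not should_generate(branch):
        return "skipped"  # early exit: leave the generated READMEs untouched
    return "generated"


print(generate_docs("feature/tool-subcommand"))
print(generate_docs("develop"))
```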
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Walkthrough
Adds a new …
Changes
- Tool invocation plumbing
- Doc generation branch guard
Sequence Diagram(s)
sequenceDiagram
rect rgba(173, 216, 230, 0.5)
note over CallerSDK,DataConn: Tool invocation (new path)
end
participant CallerSDK as RocketRideClient / DataPipe
participant DataConn
participant Pipeline
participant ToolNode as `@tool_function` Node
CallerSDK->>DataConn: rrext_process(subcommand=tool, token/pipe_id, tool, nodeId, input)
DataConn->>DataConn: validate tool name non-empty
DataConn->>Pipeline: borrow or reuse pipe by pipe_id / endpoint pool
DataConn->>Pipeline: walk pipe.next to match nodeId
DataConn->>ToolNode: invoke(IInvokeTool.Invoke{tool, input})
alt tool not found on node
ToolNode-->>DataConn: APERR(Ec.PreventDefault)
DataConn-->>CallerSDK: ValueError / RuntimeError
else success
ToolNode-->>DataConn: tool result
DataConn->>Pipeline: return borrowed pipe
DataConn-->>CallerSDK: { result: tool_result }
end
sequenceDiagram
rect rgba(255, 200, 150, 0.5)
note over DatabaseApi,DatabaseInstanceBase: Database operations (old vs new)
end
participant DatabaseApi
participant RocketRideClient
participant DataConn
participant DatabaseInstanceBase
note over DatabaseApi,DatabaseInstanceBase: Old path (removed)
DatabaseApi->>RocketRideClient: chat(Question{type: EXECUTE/DIALECT})
RocketRideClient->>DataConn: writeQuestions with QuestionType branch
note over DatabaseApi,DatabaseInstanceBase: New path
DatabaseApi->>RocketRideClient: tool(execute | dialect, nodeId, {sql})
RocketRideClient->>DataConn: rrext_process(subcommand=tool)
DataConn->>DatabaseInstanceBase: invoke execute() / dialect()
DatabaseInstanceBase->>DatabaseInstanceBase: _executeRawQuery(sql) / _db_dialect()
DatabaseInstanceBase-->>DataConn: { rows, affected_rows } / { dialect }
DataConn-->>RocketRideClient: { result }
RocketRideClient-->>DatabaseApi: Dict[str, Any] / DatabaseDialect
Estimated code review effort: 4 (Complex), ~60 minutes
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/ai/src/ai/common/store.py`:
- Around line 1114-1115: The int() conversions for offset and limit parameters
lack error handling and will raise ValueError for non-integer input values,
crashing the tool invocation. Wrap the int() conversion calls for both the
offset and limit parameters in a try-except block to catch ValueError
exceptions, and when caught, return a structured error response that provides
clear feedback to the caller about the invalid input instead of allowing the
exception to propagate up the call stack.
In `@packages/ai/src/ai/modules/data/data_conn.py`:
- Around line 790-833: The tool_sync function is missing the in_use flag and
activity timestamp guards that protect pipes in the _write method, allowing
_monitor_pipes to reclaim a borrowed pipe mid-invocation. Before executing the
tool invocation through py_instance.invoke(), set conn_pipe.in_use to True and
refresh its activity timestamp (matching the pattern used in _write), then reset
conn_pipe.in_use to False in the finally block to release the lock. This
prevents the monitor process from reclaiming the pipe while the tool is
executing.
- Around line 786-813: The node lookup logic in the tool_sync() function does
not implement the documented broadcast behavior for an empty nodeId. When nodeId
is an empty string, the code should apply a "first handler wins" broadcast
pattern to tool-lane nodes instead of attempting exact ID matching which fails.
Modify the node lookup section where the filter chain is walked: add a
conditional check before the while loop to handle the empty nodeId case
separately, implementing the broadcast behavior for tool-lane nodes, while
keeping the existing exact ID matching logic for non-empty nodeIds. This will
allow optional callers that don't specify a nodeId to properly invoke the first
available handler rather than raising a ValueError.
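A "first handler wins" lookup of the kind requested here could be sketched as below. The `Node` class, `dispatch` function, and local `PreventDefault` exception are hypothetical simplifications; the real handler works on filter-chain instances and may restrict the broadcast to tool-lane nodes, which this sketch does not model.

```python
from typing import Any, Callable, Dict, Optional


class PreventDefault(Exception):
    """Local placeholder for the engine's 'tool not handled here' signal."""


class Node:
    """Hypothetical chain entry with an ID, a tool table, and a next pointer."""

    def __init__(self, node_id: str, tools: Dict[str, Callable[[dict], Any]],
                 next: Optional["Node"] = None) -> None:
        self.node_id = node_id
        self.tools = tools
        self.next = next

    def invoke(self, tool: str, tool_input: dict) -> Any:
        if tool not in self.tools:
            raise PreventDefault(tool)
        return self.tools[tool](tool_input)


def dispatch(head: Optional[Node], node_id: str, tool: str,
             tool_input: Optional[dict] = None) -> Any:
    payload = tool_input or {}
    node = head
    while node is not None:
        if node_id == "" or node.node_id == node_id:
            try:
                return node.invoke(tool, payload)
            except PreventDefault:
                if node_id:
                    # Exact match requested, but that node does not own the tool.
                    raise ValueError(f"node {node_id!r} has no tool {tool!r}") from None
                # Broadcast mode: fall through and try the next node.
        node = node.next
    raise ValueError(f"no node handled tool {tool!r}")


store = Node("store_1", {"stats": lambda _input: {"count": 3}})
chain = Node("webhook_1", {}, next=Node("db_1", {"dialect": lambda _input: {"dialect": "sqlite"}}, next=store))
print(dispatch(chain, "", "stats"))
print(dispatch(chain, "db_1", "dialect"))
```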
📒 Files selected for processing (20)
- nodes/scripts/gen-node-tables.mjs
- nodes/test/test_vectordb_tool_mixin.py
- packages/ai/src/ai/common/database/db_global_base.py
- packages/ai/src/ai/common/database/db_instance_base.py
- packages/ai/src/ai/common/store.py
- packages/ai/src/ai/modules/data/data_conn.py
- packages/ai/src/ai/modules/task/commands/cmd_data.py
- packages/ai/src/ai/modules/task/task_engine.py
- packages/client-python/src/rocketride/client.py
- packages/client-python/src/rocketride/database.py
- packages/client-python/src/rocketride/mixins/data.py
- packages/client-python/src/rocketride/schema/question.py
- packages/client-python/tests/conftest.py
- packages/client-python/tests/test_tool.py
- packages/client-python/tests/tool_pipeline.py
- packages/client-typescript/src/client/client.ts
- packages/client-typescript/src/client/database.ts
- packages/client-typescript/src/client/schema/Question.ts
- packages/client-typescript/tests/tool.pipeline.ts
- packages/client-typescript/tests/tool.test.ts
💤 Files with no reviewable changes (2)
- packages/client-python/src/rocketride/schema/question.py
- packages/client-typescript/src/client/schema/Question.ts
…pipe guards
- store.py: wrap offset/limit int() conversions in try/except to return a structured error instead of crashing on invalid input
- data_conn.py: implement empty-nodeId broadcast behavior (walk all filter-chain nodes, first handler wins) matching the documented contract
- data_conn.py: add in_use flag and activity-timer guards to pipe-bound tool calls so _monitor_pipes cannot reclaim the pipe mid-invocation (mirrors the existing _write pattern)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/ai/src/ai/common/store.py`:
- Around line 1114-1120: The current validation only checks if offset and limit
can be converted to integers, but allows negative values to pass through to
store.getPaths(). Add a validation check after the int() conversions to ensure
both offset and limit are non-negative (>= 0), and return an error response with
an appropriate message if either value is negative. This validation should occur
before the call to store.getPaths() to enforce a consistent contract across all
store implementations.
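Taken together with the earlier comment on the same lines, the requested validation might look like the following sketch. `parse_paging`, the default values, and the shape of the error dictionary are all assumptions for illustration, not the code in store.py.

```python
from typing import Any, Dict, Optional, Tuple

Paging = Tuple[int, int]


def parse_paging(args: Dict[str, Any], default_offset: int = 0,
                 default_limit: int = 100) -> Tuple[Optional[Paging], Optional[Dict[str, str]]]:
    """Return ((offset, limit), None) on success or (None, error) on invalid input."""
    try:
        offset = int(args.get("offset", default_offset))
        limit = int(args.get("limit", default_limit))
    except (TypeError, ValueError):
        # int() raises ValueError for 'abc' and TypeError for None.
        return None, {"error": "offset and limit must be integers"}
    if offset < 0 or limit < 0:
        return None, {"error": "offset and limit must be non-negative"}
    return (offset, limit), None


print(parse_paging({"offset": "10", "limit": 5}))
print(parse_paging({"offset": "abc"}))
print(parse_paging({"limit": -1}))
```

Returning an error value instead of raising keeps a bad argument from crashing the tool invocation, and rejecting negatives before the store call means every store backend sees the same contract.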
In `@packages/ai/src/ai/modules/data/data_conn.py`:
- Around line 803-808: The `_tool()` method bypasses the concurrency limit when
`pipe_id` is omitted because it obtains the pipe via `self._target.getPipe()`
without acquiring the `self._pipe_sem` semaphore. When the pipe is borrowed
(when `conn_pipe is None` in the else block and `borrowed` is set to `True`),
the code must still acquire and release `self._pipe_sem` to enforce the
connection's thread count limit. Apply this fix at all locations where pipes are
borrowed directly from the target, ensuring the semaphore is acquired before
using the borrowed pipe and released when done.
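The two pipe-safety concerns raised for data_conn.py (the concurrency semaphore here and the `in_use` guard from the earlier review) can be combined in one pattern, sketched below. `SketchConn` and `SketchPipe` are hypothetical, and `threading.Semaphore` is an assumption: the real `_pipe_sem` may be an asyncio primitive with a different acquire/release style.

```python
import threading
import time
from typing import Any, Callable, List


class SketchPipe:
    """Hypothetical pipe with the two fields a monitor would inspect."""

    def __init__(self) -> None:
        self.in_use = False
        self.last_activity = 0.0


class SketchConn:
    """Hypothetical connection that borrows pipes under a concurrency limit."""

    def __init__(self, pool: List[SketchPipe], max_threads: int) -> None:
        self._pool = list(pool)
        self._lock = threading.Lock()
        self._pipe_sem = threading.Semaphore(max_threads)

    def _get_pipe(self) -> SketchPipe:
        with self._lock:
            return self._pool.pop()

    def _put_pipe(self, pipe: SketchPipe) -> None:
        with self._lock:
            self._pool.append(pipe)

    def run_tool(self, fn: Callable[[SketchPipe], Any]) -> Any:
        # The semaphore bounds concurrent calls even on the borrowed-pipe path.
        with self._pipe_sem:
            pipe = self._get_pipe()
            pipe.in_use = True  # tell the monitor not to reclaim this pipe
            pipe.last_activity = time.monotonic()
            try:
                return fn(pipe)
            finally:
                # Always clear the flag and return the pipe, even on error.
                pipe.in_use = False
                pipe.last_activity = time.monotonic()
                self._put_pipe(pipe)


conn = SketchConn([SketchPipe()], max_threads=1)
print(conn.run_tool(lambda pipe: pipe.in_use))
```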
📒 Files selected for processing (2)
- packages/ai/src/ai/common/store.py
- packages/ai/src/ai/modules/data/data_conn.py
Summary
- Add `tool` DAP subcommand to `rrext_process` that lets SDK callers invoke any `@tool_function` on any node in a running pipeline directly -- no Question/Answer/SSE overhead
- Remove `QuestionType.EXECUTE` and `DIALECT`, replace with `@tool_function` methods (`execute`, `dialect`) on database nodes
- Add `list` and `stats` tools to vector store nodes
- Add `client.tool()` and `DataPipe.tool()` to both Python and TypeScript SDKs
- Skip `nodes:docs-generate` on non-develop branches to prevent 100+ README file changes in diffs
Usage example
Architecture
The `tool` subcommand walks the pipe's filter chain via `IServiceFilterInstance.next` to locate the target node by component ID, then calls `node.pyInstance.invoke(param)` directly. This bypasses the C++ `cb_control` dispatcher (which requires controller map entries that are scoped to control bindings, not available from the data connection layer) and instead invokes `IInstanceBase.invoke()` -> `_dispatch_tool()` -- the same code path agents use internally.
Two SDK entry points:
- `client.tool(token, tool, node_id, input)` -- server borrows a pipe from the pool
- `pipe.tool(tool, node_id, input)` -- reuses the caller's open pipe via `pipe_id`
Files changed
- data_conn.py: `tool` subcommand + `_tool()` handler
- task_engine.py: `_send_data()` now raises on subprocess `success: False`
- cmd_data.py: … `_send_data`)
- cmd_cprofile.py
- store.py: `list` + `stats` `@tool_function` on VectorStoreToolMixin
- db_instance_base.py: `execute` + `dialect` `@tool_function`, removed EXECUTE/DIALECT from writeQuestions
- db_global_base.py
- client.py: `tool()` method
- data.py: `DataPipe.tool()` pipe-bound method
- database.py: … `client.tool()` internally
- question.py
- client.ts: `tool()` + `DataPipe.tool()`
- database.ts: … `client.tool()` internally
- Question.ts
- test_tool.py, tool.test.ts
- tool_pipeline.py, tool.pipeline.ts
- conftest.py
- test_vectordb_tool_mixin.py
- gen-node-tables.mjs
Test plan
- `builder client-python:test` -- 83 passed
- `builder client-typescript:test` -- all passed
- `builder nodes:test --pytest="-k test_collect_tool_methods"` -- 3 passed
Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
- … `tool()`), including pipeline-bound tool execution.
- … `tool` subcommand.
- … `list` (with pagination) and `stats` (collection stats).
Bug Fixes
Tests
- … `tool()` and tool-bound pipe behavior.
Breaking Changes
- … `EXECUTE` and `DIALECT` question types; use tool-based database APIs instead.