Open
Labels: bug (Something isn't working), pending (awaiting review/confirmation by maintainer)
Description
Checked other resources
- This is a bug, not a usage question. For questions, please use the LangChain Forum (https://forum.langchain.com/).
- I added a clear and detailed title that summarizes the issue.
- I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
- I included a self-contained, minimal example that demonstrates the issue, INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.
Example Code
import asyncio
from typing import Annotated, List
from uuid import uuid4

from langchain_core.messages import AnyMessage, HumanMessage, AIMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.types import interrupt, Command
from typing_extensions import TypedDict


@tool
def create_circle(shape_id: str, color: str) -> str:
    """Create a circle on the canvas."""
    response = interrupt({"tool": "create_circle", "shapeId": shape_id, "color": color})
    return f"Created circle {shape_id}. Response: {response}"


@tool
def create_square(shape_id: str, color: str) -> str:
    """Create a square on the canvas."""
    response = interrupt({"tool": "create_square", "shapeId": shape_id, "color": color})
    return f"Created square {shape_id}. Response: {response}"


@tool
def create_triangle(shape_id: str, color: str) -> str:
    """Create a triangle on the canvas."""
    response = interrupt({"tool": "create_triangle", "shapeId": shape_id, "color": color})
    return f"Created triangle {shape_id}. Response: {response}"


class State(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]


tools = [create_circle, create_square, create_triangle]


async def agent_node(state: State):
    """Simulates an LLM returning multiple parallel tool calls."""
    return {"messages": [AIMessage(
        content="I'll create three shapes for you.",
        tool_calls=[
            {"id": "call_1", "name": "create_circle", "args": {"shape_id": "c1", "color": "red"}},
            {"id": "call_2", "name": "create_square", "args": {"shape_id": "s1", "color": "blue"}},
            {"id": "call_3", "name": "create_triangle", "args": {"shape_id": "t1", "color": "green"}},
        ],
    )]}


def router(state: State) -> str:
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return END


async def main():
    print("=" * 60)
    print("Bug Demo: Parallel Interrupts Have Identical IDs")
    print("=" * 60)

    graph = StateGraph(State)
    graph.add_node("agent", agent_node)
    graph.add_node("tools", ToolNode(tools=tools))
    graph.add_edge(START, "agent")
    graph.add_conditional_edges("agent", router, ["tools", END])
    graph.add_edge("tools", "agent")

    checkpointer = MemorySaver()
    workflow = graph.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": str(uuid4())}}

    # First invocation - triggers interrupts
    print("\n--- First Invocation ---")
    await workflow.ainvoke(
        {"messages": [HumanMessage(content="Create 3 shapes")]},
        config=config,
    )

    # Check state for interrupts
    state = await workflow.aget_state(config)
    all_interrupts = []
    for task in state.tasks:
        if task.interrupts:
            all_interrupts.extend(task.interrupts)

    print(f"\nInterrupts collected: {len(all_interrupts)}")
    for i, intr in enumerate(all_interrupts):
        print(f"  {i}: id={intr.id}, tool={intr.value.get('tool')}")

    # Show the problem: all IDs are identical
    unique_ids = set(intr.id for intr in all_interrupts)
    print(f"\nUnique IDs: {len(unique_ids)} (expected: 3)")
    if len(unique_ids) == 1:
        print("\n!!! BUG: All 3 interrupts have the SAME ID !!!")
        print("This makes it impossible to resume each tool with its own value.")

    # Attempt resume - this fails because dict keys collide
    print("\n--- Attempting Resume ---")
    resume_data = {
        intr.id: {"approved": True, "tool": intr.value["tool"]}
        for intr in all_interrupts
    }
    print(f"Resume dict has {len(resume_data)} entries (expected: 3, actual: 1 due to ID collision)")


if __name__ == "__main__":
    asyncio.run(main())

Error Message and Stack Trace (if applicable)
No exception is raised. The problem shows up when resuming, because all the interrupt IDs are identical:
============================================================
Bug Demo: Parallel Interrupts Have Identical IDs
============================================================
--- First Invocation ---
Interrupts collected: 3
0: id=1b09852344aeb4aee6776a24d09c3f9e, tool=create_circle
1: id=1b09852344aeb4aee6776a24d09c3f9e, tool=create_square
2: id=1b09852344aeb4aee6776a24d09c3f9e, tool=create_triangle
Unique IDs: 1 (expected: 3)
!!! BUG: All 3 interrupts have the SAME ID !!!
This makes it impossible to resume each tool with its own value.
--- Attempting Resume ---
Resume dict has 1 entries (expected: 3, actual: 1 due to ID collision)

Description
What I'm trying to do
Execute multiple tools in parallel where each tool calls interrupt() to get human approval, then resume all tools with their respective approval responses.
What I expect to happen
- All 3 tools execute in parallel and call interrupt()
- Each interrupt gets a unique ID so I can match resume values to specific tools
- I can resume with Command(resume={id1: response1, id2: response2, id3: response3})
- Each tool receives its corresponding response

What actually happens
- All 3 tools execute and call interrupt() (this works)
- All 3 interrupts get the SAME ID (this is the bug)
- When building the resume dict, all 3 entries collapse into 1 due to key collision
- Only one tool can be resumed; the other two are stuck
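The collapse in the last two steps is plain Python dict semantics: duplicate keys in a comprehension silently overwrite earlier entries. A minimal stand-alone sketch of the mechanism (no langgraph required; the shared ID is the one printed by the repro, and the dict literals stand in for real Interrupt objects):

```python
# Hypothetical stand-in data: three interrupts carrying the one ID
# observed in the repro output.
SHARED_ID = "1b09852344aeb4aee6776a24d09c3f9e"

all_interrupts = [
    {"id": SHARED_ID, "tool": "create_circle"},
    {"id": SHARED_ID, "tool": "create_square"},
    {"id": SHARED_ID, "tool": "create_triangle"},
]

# Same construction as resume_data in the repro script: keys collide,
# so later entries overwrite earlier ones.
resume_data = {
    intr["id"]: {"approved": True, "tool": intr["tool"]}
    for intr in all_interrupts
}

print(len(resume_data))                # 1 — not 3
print(resume_data[SHARED_ID]["tool"])  # create_triangle — only the last entry survives
```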
Root Cause Analysis
The issue is in langgraph/types.py, in the Interrupt.from_ns() method:

@classmethod
def from_ns(cls, value: Any, ns: str) -> Interrupt:
    return cls(value=value, id=xxh3_128_hexdigest(ns.encode()))

The interrupt ID is generated by hashing only the checkpoint namespace (ns). When multiple tools run in parallel within the same ToolNode, they share the same namespace, so they all get identical IDs.
The interrupt() function does track a per-task interrupt counter via scratchpad.interrupt_counter(), but this counter is not included in the ID hash.
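The collision can be reproduced with nothing but a hash function. The sketch below uses stdlib hashlib.blake2b as a stand-in for xxh3_128_hexdigest (the specific hash is irrelevant to the argument), and the namespace string is made up:

```python
import hashlib


def id_from_ns(ns: str) -> str:
    # Current behavior: the ID depends only on the namespace string.
    return hashlib.blake2b(ns.encode(), digest_size=16).hexdigest()


def id_from_ns_and_idx(ns: str, idx: int) -> str:
    # Suggested fix: mix the per-task interrupt index into the hash input.
    return hashlib.blake2b(f"{ns}:{idx}".encode(), digest_size=16).hexdigest()


# Parallel tool calls inside one ToolNode share a checkpoint namespace.
ns = "tools:example-task"

current_ids = {id_from_ns(ns) for _ in range(3)}
fixed_ids = {id_from_ns_and_idx(ns, i) for i in range(3)}

print(len(current_ids))  # 1 — all three interrupts collide
print(len(fixed_ids))    # 3 — one unique ID per interrupt
```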
Suggested Fix
Include the interrupt index in the ID generation:
@classmethod
def from_ns(cls, value: Any, ns: str, idx: int = 0) -> Interrupt:
    return cls(value=value, id=xxh3_128_hexdigest(f"{ns}:{idx}".encode()))

And update the interrupt() function to pass the index:
raise GraphInterrupt(
    (
        Interrupt.from_ns(
            value=value,
            ns=conf[CONFIG_KEY_CHECKPOINT_NS],
            idx=idx,  # Add this
        ),
    )
)

System Info
System Information
------------------
OS: Darwin
OS Version: Darwin Kernel Version 25.1.0
Python Version: 3.13.x / 3.14.x
Package Information
-------------------
langchain_core: 0.3.x
langgraph: 0.2.x
langgraph_sdk: 0.3.x
keurcien