generated from langchain-ai/data-enrichment
-
Notifications
You must be signed in to change notification settings - Fork 36
Open
Description
Hello,
I tried to implement a supervisor into the memory template but it would seem to call my supervisor in parallel with the same input and output. Subsequent worker agent calls are also happening twice.
Please find my sample code below.
"""Example chatbot that incorporates user memories with supervisor orchestration (Option A)."""
import datetime
import logging
from dataclasses import dataclass
from typing_extensions import Annotated
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, BaseMessage
from langchain_core.runnables import RunnableConfig
from langgraph.config import get_store
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
from langgraph_sdk import get_client
from chatbot.configuration import ChatConfigurable
from chatbot.utils import format_memories
# Agents + Supervisor
from langgraph_supervisor import create_supervisor
from langgraph.prebuilt import create_react_agent
# ===============================
# Logging
# ===============================
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
# ===============================
# Chat State
# ===============================
@dataclass
class ChatState:
"""The state of the chatbot."""
messages: Annotated[list[BaseMessage], add_messages]
# ===============================
# Tools
# ===============================
def add(a: float, b: float):
"""Add two numbers."""
return a + b
def multiply(a: float, b: float):
"""Multiply two numbers."""
return a * b
def divide(a: float, b: float):
"""Divide two numbers."""
return a / b
# ===============================
# Agents
# ===============================
math_agent = create_react_agent(
model="openai:gpt-4.1",
tools=[add, multiply, divide],
prompt=(
"You are a math agent.\n\n"
"INSTRUCTIONS:\n"
"- Assist ONLY with math-related tasks\n"
"- After you're done with your tasks, respond to the supervisor directly\n"
"- Respond ONLY with the results of your work, do NOT include ANY other text."
),
name="math_agent",
)
supervisor = create_supervisor(
model=init_chat_model("openai:gpt-4.1"),
agents=[math_agent],
prompt=(
"You are a supervisor managing two agents:\n"
"- a research agent. Assign research-related tasks to this agent\n"
"- a math agent. Assign math-related tasks to this agent\n"
"Assign work to one agent at a time, do not call agents in parallel.\n"
"Do not do any work yourself."
),
add_handoff_back_messages=True,
output_mode="full_history",
).compile()
# ===============================
# Bot Node (Supervisor Orchestration)
# ===============================
async def bot(state: ChatState) -> dict[str, list[BaseMessage]]:
"""Main bot logic: inject memory, call supervisor, return updated state."""
logger.info(f"Entering bot node with state: {state.messages}")
configurable = ChatConfigurable.from_context()
namespace = (configurable.user_id,)
store = get_store()
# Search for relevant memories
query = "\n".join(str(message.content) for message in state.messages)
logger.info(f"Memory search query: {query}")
items = await store.asearch(namespace, query=query, limit=10)
logger.info(f"Retrieved memories: {items}")
# Construct system prompt with memory context
prompt = configurable.system_prompt.format(
user_info=format_memories(items),
time=datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d %H:%M:%S"),
)
input_messages = [SystemMessage(content=prompt)] + state.messages
logger.info(f"Input to supervisor: {input_messages}")
# Call supervisor once
try:
result = await supervisor.ainvoke(
{"input": input_messages},
config={"configurable": {"model": configurable.model}},
)
logger.info(f"Supervisor output: {result}")
except Exception as e:
logger.error(f"Supervisor invocation failed: {str(e)}")
result = [AIMessage(content="Sorry, I encountered an error. How can I assist you?")]
# Normalize supervisor output into list[Messages]
if isinstance(result, dict):
result = result.get("messages", [AIMessage(content=str(result))])
elif not isinstance(result, list):
result = [AIMessage(content=str(result))]
updated_messages = state.messages + result
logger.info(f"Updated messages: {updated_messages}")
return {"messages": updated_messages}
# ===============================
# Memory Scheduling Node
# ===============================
async def schedule_memories(state: ChatState, config: RunnableConfig) -> None:
"""Schedule memory formation outside hotpath (debounced)."""
configurable = ChatConfigurable.from_context()
memory_client = get_client()
await memory_client.runs.create(
thread_id=config["configurable"]["thread_id"],
multitask_strategy="enqueue",
after_seconds=configurable.delay_seconds,
assistant_id=configurable.mem_assistant_id,
input={"messages": state.messages},
config={
"configurable": {
"user_id": configurable.user_id,
"memory_types": configurable.memory_types,
},
},
)
# ===============================
# Graph Assembly
# ===============================
builder = StateGraph(ChatState, config_schema=ChatConfigurable)
builder.add_node(bot)
builder.add_node(schedule_memories)
builder.add_edge("__start__", "bot")
builder.add_edge("bot", "schedule_memories")
graph = builder.compile()
The langsmith trace is below.
https://smith.langchain.com/public/902b5b2b-e915-4017-8451-93aba043ca40/r
Is that expected behavior or am I missing something out?
Metadata
Metadata
Assignees
Labels
No labels