I'm building a hierarchical LLM agent workflow using LangGraph in which a `research_team` performs research and a `writing_team` drafts the report. Each team has the same structure: a Team Supervisor node plus multiple Worker nodes, and the workflow runs once for each sub-section of the main report. This setup requires both teams to communicate over the same channels while updating parts of the task, such as `section_topic` and `section_content`. Currently, the implementation only appends to a single unified `messages` list, and the LLM output sometimes routes to `__end__` prematurely, leaving some sub-sections incomplete.

I need a way to subclass `MessagesState` (or use a similar Pydantic/TypedDict approach) to create separate channels for communication. Each node, whether supervisor or worker, should update its designated channel without interfering with the overall chat history.
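For reference, the state shape I have in mind is roughly this (a sketch only, not working code; `ReportState` and the reducer choices are my own guesses):

```python
import operator
from typing import Annotated, Optional

from langgraph.graph import MessagesState


class ReportState(MessagesState):
    # "messages" (with its add_messages reducer) is inherited and stays the
    # shared chat history; everything below is a separate channel.
    main_topic: str                     # set once from the user request
    section_topic: Optional[str]        # overwritten per sub-section by a supervisor
    section_content: Optional[str]      # overwritten by the writing team
    sections_done: Annotated[list, operator.add]  # accumulates finished sections
```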
Below is the code I'm working with:
Supervisor Node Function:
```python
from typing import Annotated, List, Literal, Optional, TypedDict

from langchain_core.language_models.chat_models import BaseChatModel
from langgraph.graph import MessagesState, END
from langgraph.types import Command


class SupervisorInput(MessagesState):
    """User request."""
    main_topic: Annotated[str, ..., "The main topic of the request"]
    section_topic: Annotated[Optional[str], None, "Sub-section topic of the main topic"]
    section_content: Annotated[Optional[str], None, "Sub-section topic content"]


def make_supervisor_node(llm: BaseChatModel, system_prompt: str | None, members: List[str]):
    options = ["FINISH"] + members
    if system_prompt is None:
        system_prompt = (
            "You are a supervisor tasked with managing a conversation between the"
            f" following teams: {members}. Given the user request,"
            " respond with the team to act next. Each team will perform a"
            " task and respond with their results and status. You should verify"
            " the task performed by the teams to ensure it satisfies the user request."
            " When finished, respond with FINISH."
        )

    class SupervisorAction(TypedDict):
        """Supervisor action."""
        section_topic: Annotated[str, ..., "Sub-section topic of the main topic"]
        section_search_query: Annotated[Optional[str], None, "Search query for the sub-section topic"]
        next: Literal[*options]

    def supervisor_node(state: SupervisorInput) -> Command[Literal[*members, "__end__"]]:
        """An LLM-based decision maker."""
        messages = [
            {"role": "system", "content": system_prompt},
        ] + state["messages"]
        response = llm.with_structured_output(SupervisorAction).invoke(messages)
        print(f"Supervisor response: {response}")
        goto = response["next"]
        print(f"Going to node: {goto}")
        if goto == "FINISH":
            goto = END
        return Command(goto=goto)

    return supervisor_node
```
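Eventually I think the supervisor should also persist its structured decision into a dedicated channel rather than only routing. An untested variant of `supervisor_node` I'm considering (defined inside the same factory, so it can see `llm`, `system_prompt`, `members`, and `SupervisorAction`):

```python
    def supervisor_node_v2(state: SupervisorInput) -> Command[Literal[*members, "__end__"]]:
        """Untested sketch: route AND write the decision into its own channel."""
        messages = [{"role": "system", "content": system_prompt}] + state["messages"]
        response = llm.with_structured_output(SupervisorAction).invoke(messages)
        goto = END if response["next"] == "FINISH" else response["next"]
        return Command(
            # section_topic goes to its dedicated channel; messages stays untouched
            update={"section_topic": response["section_topic"]},
            goto=goto,
        )
```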
Research Team Graph:
```python
from langchain_community.tools.pubmed.tool import PubmedQueryRun
from langchain_community.tools.semanticscholar.tool import SemanticScholarQueryRun
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START
from langgraph.prebuilt import create_react_agent

## Define tools
research_tools = [TavilySearchResults(max_results=5), PubmedQueryRun(), SemanticScholarQueryRun()]

## Define LLM
research_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

tavily_agent = create_react_agent(research_llm, tools=research_tools)


def tavily_node(state: SupervisorInput) -> Command[Literal["supervisor"]]:
    result = tavily_agent.invoke(state)
    return Command(
        update={
            "messages": [
                HumanMessage(content=result["messages"][-1].content, name="tavily")
            ]
        },
        # We want our workers to ALWAYS "report back" to the supervisor when done
        goto="supervisor",
    )


with open("./prompts/research_supervisor_prompt.txt") as f:
    research_supervisor_prompt = f.read()

research_supervisor_node = make_supervisor_node(research_llm, research_supervisor_prompt, ["tavily"])

## Define Research Team
research_team = StateGraph(SupervisorInput)
research_team.add_node("supervisor", research_supervisor_node)
research_team.add_node("tavily", tavily_node)
research_team.add_edge(START, "supervisor")
research_graph = research_team.compile()
```
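For context, I currently invoke the compiled graph per sub-section roughly like this (simplified; the real loop over sub-sections is omitted and the topic is a placeholder):

```python
from langchain_core.messages import HumanMessage

result = research_graph.invoke(
    {
        "messages": [HumanMessage(content="Research the sub-topics of the report")],
        "main_topic": "LLM agent architectures",  # placeholder topic
    }
)
print(result["messages"][-1].content)
```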
The current code runs, but it produces disjointed LLM outputs, and the workflow reaches `__end__` before all sub-sections are completed. Moreover, continuously appending everything to the shared `messages` list doesn't effectively manage updates. I need guidance on modifying or subclassing `MessagesState` so that both the research and writing teams can update separate channels (e.g., `section_topic` and `section_content`) while collaborating on tasks.
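Concretely, the behavior I'm after in a worker looks something like this (a hypothetical `writer_node`; the LLM call is stubbed out):

```python
def writer_node(state: SupervisorInput) -> Command[Literal["supervisor"]]:
    # Hypothetical writing-team worker: the draft should land in its own
    # section_content channel, while messages only gets a short status note.
    draft = f"Draft for: {state.get('section_topic')}"  # stub for the real LLM call
    return Command(
        update={
            "section_content": draft,
            "messages": [HumanMessage(content="Section drafted.", name="writer")],
        },
        goto="supervisor",
    )
```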