Skip to content

Commit

Permalink
improve astream
Browse files Browse the repository at this point in the history
  • Loading branch information
Mustafa Kerem Kurban committed Oct 3, 2024
1 parent 21e0ed4 commit f475770
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions src/neuroagent/multi_agents/hierarchical_multi_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,34 +308,43 @@ async def arun(self, query: str, thread_id: str) -> AgentOutput:
async def astream(
self, thread_id: str, query: str, connection_string: str | None = None
) -> AsyncIterator[str]:
"""Run the agent against a query in streaming way.
"""Run the hierarchical team agent against a query in a streaming way.
Parameters
----------
thread_id
thread_id : str
ID of the thread of the chat.
query
query : str
Query of the user.
connection_string
connection string for the checkpoint database.
connection_string : str | None, optional
Connection string for the checkpoint database.
Yields
------
Iterator streaming the processed output of the LLM
str
Iterator streaming the processed output of the LLM and all agents.
"""
async with (
self.agent.checkpointer.__class__.from_conn_string(connection_string)
if connection_string
else AsyncExitStack() as memory
):
self.memory.__class__.from_conn_string(connection_string)
if connection_string and self.memory
else AsyncExitStack()
) as memory:
if isinstance(memory, BaseCheckpointSaver):
self.agent.checkpointer = memory
config = {"configurable": {"thread_id": thread_id}}
streamed_response = self.agent.astream_events(
{"messages": query}, version="v2", config=config
)
async for event in streamed_response:
yield event
self.memory = memory

config = RunnableConfig(configurable={"thread_id": thread_id})

async for event in self.top_level_chain.astream_events(
{"messages": [HumanMessage(content=query)]},
config=config,
version="v2"
):
if event["event"] == "on_chat_model_stream":
yield event["data"]
elif event["event"] == "on_agent_action":
yield f"Agent {event['name']} is taking action: {event['data']['tool']}"
elif event["event"] == "on_agent_finish":
yield f"Agent {event['name']} finished: {event['data']['return_values']['output']}"

@staticmethod
def _process_output(output: Any) -> AgentOutput:
Expand Down

0 comments on commit f475770

Please sign in to comment.