Skip to content

Commit 117bcf4

Browse files
jonSuitsHenryHengZJ
authored andcommitted
Add Detailed Streaming to the Tool Agent (FlowiseAI#4155)
* Add Detailed Streaming to the Tool Agent * lint fix --------- Co-authored-by: Henry <[email protected]>
1 parent a8e19a0 commit 117bcf4

File tree

2 files changed

+118
-3
lines changed

2 files changed

+118
-3
lines changed

packages/components/nodes/agents/ToolAgent/ToolAgent.ts

+34-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {
2424
IUsedTool,
2525
IVisionChatModal
2626
} from '../../../src/Interface'
27-
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
27+
import { ConsoleCallbackHandler, CustomChainHandler, CustomStreamingHandler, additionalCallbacks } from '../../../src/handler'
2828
import { AgentExecutor, ToolCallingAgentOutputParser } from '../../../src/agents'
2929
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
3030
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
@@ -101,6 +101,15 @@ class ToolAgent_Agents implements INode {
101101
type: 'number',
102102
optional: true,
103103
additionalParams: true
104+
},
105+
{
106+
label: 'Enable Detailed Streaming',
107+
name: 'enableDetailedStreaming',
108+
type: 'boolean',
109+
default: false,
110+
description: 'Stream detailed intermediate steps during agent execution',
111+
optional: true,
112+
additionalParams: true
104113
}
105114
]
106115
this.sessionId = fields?.sessionId
@@ -113,6 +122,7 @@ class ToolAgent_Agents implements INode {
113122
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string | ICommonObject> {
114123
const memory = nodeData.inputs?.memory as FlowiseMemory
115124
const moderations = nodeData.inputs?.inputModeration as Moderation[]
125+
const enableDetailedStreaming = nodeData.inputs?.enableDetailedStreaming as boolean
116126

117127
const shouldStreamResponse = options.shouldStreamResponse
118128
const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
@@ -136,14 +146,28 @@ class ToolAgent_Agents implements INode {
136146
const loggerHandler = new ConsoleCallbackHandler(options.logger)
137147
const callbacks = await additionalCallbacks(nodeData, options)
138148

149+
// Add custom streaming handler if detailed streaming is enabled
150+
let customStreamingHandler = null
151+
152+
if (enableDetailedStreaming && shouldStreamResponse) {
153+
customStreamingHandler = new CustomStreamingHandler(sseStreamer, chatId)
154+
}
155+
139156
let res: ChainValues = {}
140157
let sourceDocuments: ICommonObject[] = []
141158
let usedTools: IUsedTool[] = []
142159
let artifacts = []
143160

144161
if (shouldStreamResponse) {
145162
const handler = new CustomChainHandler(sseStreamer, chatId)
146-
res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
163+
const allCallbacks = [loggerHandler, handler, ...callbacks]
164+
165+
// Add detailed streaming handler if enabled
166+
if (enableDetailedStreaming && customStreamingHandler) {
167+
allCallbacks.push(customStreamingHandler)
168+
}
169+
170+
res = await executor.invoke({ input }, { callbacks: allCallbacks })
147171
if (res.sourceDocuments) {
148172
if (sseStreamer) {
149173
sseStreamer.streamSourceDocumentsEvent(chatId, flatten(res.sourceDocuments))
@@ -174,7 +198,14 @@ class ToolAgent_Agents implements INode {
174198
}
175199
}
176200
} else {
177-
res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
201+
const allCallbacks = [loggerHandler, ...callbacks]
202+
203+
// Add detailed streaming handler if enabled
204+
if (enableDetailedStreaming && customStreamingHandler) {
205+
allCallbacks.push(customStreamingHandler)
206+
}
207+
208+
res = await executor.invoke({ input }, { callbacks: allCallbacks })
178209
if (res.sourceDocuments) {
179210
sourceDocuments = res.sourceDocuments
180211
}

packages/components/src/handler.ts

+84
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import { LangWatch, LangWatchSpan, LangWatchTrace, autoconvertTypedValues } from
3030
import { DataSource } from 'typeorm'
3131
import { ChatGenerationChunk } from '@langchain/core/outputs'
3232
import { AIMessageChunk } from '@langchain/core/messages'
33+
import { Serialized } from '@langchain/core/load/serializable'
3334

3435
interface AgentRun extends Run {
3536
actions: AgentAction[]
@@ -1499,3 +1500,86 @@ export class AnalyticHandler {
14991500
}
15001501
}
15011502
}
1503+
1504+
/**
1505+
* Custom callback handler for streaming detailed intermediate information
1506+
* during agent execution, specifically tool invocation inputs and outputs.
1507+
*/
1508+
export class CustomStreamingHandler extends BaseCallbackHandler {
1509+
name = 'custom_streaming_handler'
1510+
1511+
private sseStreamer: IServerSideEventStreamer
1512+
private chatId: string
1513+
1514+
constructor(sseStreamer: IServerSideEventStreamer, chatId: string) {
1515+
super()
1516+
this.sseStreamer = sseStreamer
1517+
this.chatId = chatId
1518+
}
1519+
1520+
/**
1521+
* Handle the start of a tool invocation
1522+
*/
1523+
async handleToolStart(tool: Serialized, input: string, runId: string, parentRunId?: string): Promise<void> {
1524+
if (!this.sseStreamer) return
1525+
1526+
const toolName = typeof tool === 'object' && tool.name ? tool.name : 'unknown-tool'
1527+
const toolInput = typeof input === 'string' ? input : JSON.stringify(input, null, 2)
1528+
1529+
// Stream the tool invocation details using the agent_trace event type for consistency
1530+
this.sseStreamer.streamCustomEvent(this.chatId, 'agent_trace', {
1531+
step: 'tool_start',
1532+
name: toolName,
1533+
input: toolInput,
1534+
runId,
1535+
parentRunId: parentRunId || null
1536+
})
1537+
}
1538+
1539+
/**
1540+
* Handle the end of a tool invocation
1541+
*/
1542+
async handleToolEnd(output: string | object, runId: string, parentRunId?: string): Promise<void> {
1543+
if (!this.sseStreamer) return
1544+
1545+
const toolOutput = typeof output === 'string' ? output : JSON.stringify(output, null, 2)
1546+
1547+
// Stream the tool output details using the agent_trace event type for consistency
1548+
this.sseStreamer.streamCustomEvent(this.chatId, 'agent_trace', {
1549+
step: 'tool_end',
1550+
output: toolOutput,
1551+
runId,
1552+
parentRunId: parentRunId || null
1553+
})
1554+
}
1555+
1556+
/**
1557+
* Handle tool errors
1558+
*/
1559+
async handleToolError(error: Error, runId: string, parentRunId?: string): Promise<void> {
1560+
if (!this.sseStreamer) return
1561+
1562+
// Stream the tool error details using the agent_trace event type for consistency
1563+
this.sseStreamer.streamCustomEvent(this.chatId, 'agent_trace', {
1564+
step: 'tool_error',
1565+
error: error.message,
1566+
runId,
1567+
parentRunId: parentRunId || null
1568+
})
1569+
}
1570+
1571+
/**
1572+
* Handle agent actions
1573+
*/
1574+
async handleAgentAction(action: AgentAction, runId: string, parentRunId?: string): Promise<void> {
1575+
if (!this.sseStreamer) return
1576+
1577+
// Stream the agent action details using the agent_trace event type for consistency
1578+
this.sseStreamer.streamCustomEvent(this.chatId, 'agent_trace', {
1579+
step: 'agent_action',
1580+
action: JSON.stringify(action),
1581+
runId,
1582+
parentRunId: parentRunId || null
1583+
})
1584+
}
1585+
}

0 commit comments

Comments
 (0)