Skip to content

Commit 2ea5940

Browse files
authored
Add AsyncExecutor in MCPTool, hayhooks examples (#1643)
1 parent 5da9515 commit 2ea5940

File tree

6 files changed

+263
-86
lines changed

6 files changed

+263
-86
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
"""
6+
Example of a Hayhooks PipelineWrapper for deploying an MCP Tool-based time pipeline as a REST API.
7+
8+
To run this example:
9+
10+
1. Install Hayhooks and dependencies:
11+
$ pip install hayhooks haystack-ai
12+
13+
2. Start the Hayhooks server:
14+
$ hayhooks run
15+
16+
3. Deploy this pipeline wrapper:
17+
$ hayhooks pipeline deploy-files -n time_pipeline {root_dir_for_mcp_haystack_integration}/examples/hayhooks/
18+
19+
4. Invoke via curl:
20+
$ curl -X POST 'http://localhost:1416/time_pipeline/run' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{"query":"What is the time in San Francisco? Be brief"}'
21+
22+
For more information, see: https://github.com/deepset-ai/hayhooks
23+
"""
24+
25+
from hayhooks import BasePipelineWrapper
26+
from haystack import Pipeline
27+
from haystack.components.converters import OutputAdapter
28+
from haystack.components.generators.chat import OpenAIChatGenerator
29+
from haystack.components.tools import ToolInvoker
30+
from haystack.dataclasses import ChatMessage
31+
32+
from haystack_integrations.tools.mcp.mcp_tool import MCPTool, StdioServerInfo
33+
34+
35+
class PipelineWrapper(BasePipelineWrapper):
36+
def setup(self) -> None:
37+
"""
38+
Setup the pipeline with MCP time tool.
39+
40+
This creates a pipeline that uses an MCP time tool to get the current time
41+
and then uses the time to answer a user question.
42+
"""
43+
44+
time_tool = MCPTool(
45+
name="get_current_time",
46+
server_info=StdioServerInfo(command="uvx", args=["mcp-server-time", "--local-timezone=Europe/Berlin"]),
47+
)
48+
49+
self.pipeline = Pipeline()
50+
self.pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini", tools=[time_tool]))
51+
self.pipeline.add_component("tool_invoker", ToolInvoker(tools=[time_tool]))
52+
self.pipeline.add_component(
53+
"adapter",
54+
OutputAdapter(
55+
template="{{ initial_msg + initial_tool_messages + tool_messages }}",
56+
output_type=list[ChatMessage],
57+
unsafe=True,
58+
),
59+
)
60+
self.pipeline.add_component("response_llm", OpenAIChatGenerator(model="gpt-4o-mini"))
61+
self.pipeline.connect("llm.replies", "tool_invoker.messages")
62+
self.pipeline.connect("llm.replies", "adapter.initial_tool_messages")
63+
self.pipeline.connect("tool_invoker.tool_messages", "adapter.tool_messages")
64+
self.pipeline.connect("adapter.output", "response_llm.messages")
65+
66+
def run_api(self, query: str) -> str:
67+
"""
68+
Run the pipeline with a user query.
69+
70+
:param query: The user query asking about time
71+
:return: The response from the LLM
72+
"""
73+
# Create a user message from the query
74+
user_input_msg = ChatMessage.from_user(text=query)
75+
76+
# Run the pipeline
77+
result = self.pipeline.run(
78+
{"llm": {"messages": [user_input_msg]}, "adapter": {"initial_msg": [user_input_msg]}}
79+
)
80+
81+
# Return the text of the first reply
82+
return result["response_llm"]["replies"][0].text

integrations/mcp/examples/mcp_sse_client.py

+13
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,21 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5+
import logging
6+
57
from haystack_integrations.tools.mcp import MCPTool, SSEServerInfo
68

9+
# Setup targeted logging - only show debug logs from our package
10+
logging.basicConfig(level=logging.WARNING) # Set root logger to WARNING
11+
mcp_logger = logging.getLogger("haystack_integrations.tools.mcp")
12+
mcp_logger.setLevel(logging.DEBUG)
13+
# Ensure we have at least one handler to avoid messages going to root logger
14+
if not mcp_logger.handlers:
15+
handler = logging.StreamHandler()
16+
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
17+
mcp_logger.addHandler(handler)
18+
mcp_logger.propagate = False # Prevent propagation to root logger
19+
720
# run this client after running the server mcp_sse_server.py
821
# it shows how easy it is to use the MCPTool with SSE transport
922

integrations/mcp/examples/mcp_stdio_client.py

+7
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,15 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5+
import logging
6+
57
from haystack_integrations.tools.mcp import MCPTool, StdioServerInfo
68

9+
# Setup logging
10+
logging.basicConfig(level=logging.DEBUG)
11+
logger = logging.getLogger("haystack_integrations.tools.mcp")
12+
logger.setLevel(logging.DEBUG)
13+
714
# For stdio MCPTool we don't need to run a server, we can just use the MCPTool directly
815
# Here we use the mcp-server-time server
916
# See https://github.com/modelcontextprotocol/servers/tree/main/src/time for more details

integrations/mcp/examples/time_pipeline.py

+13
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
# See https://github.com/modelcontextprotocol/servers/tree/main/src/time for more details
99
# prior to running this script, pip install mcp-server-time
1010

11+
import logging
12+
1113
from haystack import Pipeline
1214
from haystack.components.converters import OutputAdapter
1315
from haystack.components.generators.chat import OpenAIChatGenerator
@@ -16,6 +18,17 @@
1618

1719
from haystack_integrations.tools.mcp.mcp_tool import MCPTool, StdioServerInfo
1820

21+
# Setup targeted logging - only show debug logs from our package
22+
logging.basicConfig(level=logging.WARNING) # Set root logger to WARNING
23+
mcp_logger = logging.getLogger("haystack_integrations.tools.mcp")
24+
mcp_logger.setLevel(logging.DEBUG)
25+
# Ensure we have at least one handler to avoid messages going to root logger
26+
if not mcp_logger.handlers:
27+
handler = logging.StreamHandler()
28+
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
29+
mcp_logger.addHandler(handler)
30+
mcp_logger.propagate = False # Prevent propagation to root logger
31+
1932

2033
def main():
2134
time_tool = MCPTool(

0 commit comments

Comments
 (0)