Commit 3bcefbd (1 parent 312db5e): ray-haystack integration (#278)

* ray-haystack integration
* Code review suggestions

File tree: 3 files changed, +350 -0 lines changed

images/ray-pipeline-concurrent.gif (714 KB)

integrations/ray.md (+350 lines)
---
layout: integration
name: Ray
description: Run and scale Haystack Pipelines with Ray in a distributed manner
authors:
  - name: Sergey Bondarenco
    socials:
      github: prosto
pypi: https://pypi.org/project/ray-haystack/
repo: https://github.com/prosto/ray-haystack
type: Distributed Computing
report_issue: https://github.com/prosto/ray-haystack/issues
logo: /logos/ray.png
version: Haystack 2.0
toc: true
---
### Table of Contents

- [Overview](#overview)
- [Installation](#installation)
- [Usage](#usage)
- [Start with an example](#start-with-an-example)
- [Read pipeline events](#read-pipeline-events)
- [Component Serialization](#component-serialization)
- [DocumentStore with Ray](#documentstore-with-ray)
- [RayPipeline Settings](#raypipeline-settings)
- [Middleware](#middleware)
- [Resources](#resources)
- [License](#license)
## Overview

`ray-haystack` is a Python package that runs [Haystack pipelines](https://docs.haystack.deepset.ai/docs/pipelines) on [Ray](https://docs.ray.io/en/latest/ray-overview/index.html) in a distributed manner. The package provides the same API to build and run Haystack pipelines, but under the hood components are distributed to remote nodes for execution using Ray primitives. Specifically, a [Ray Actor](https://docs.ray.io/en/latest/ray-core/actors.html) is created for each component in a pipeline to `run` its logic.

The purpose of this library is to showcase the ability to run Haystack in a distributed setup with Ray, featuring options to configure the workload, e.g.:

- Control with [resources](https://docs.ray.io/en/latest/ray-core/scheduling/resources.html) how much CPU/GPU a component needs to run (per component if needed)
- Manage [environment dependencies](https://docs.ray.io/en/latest/ray-core/handling-dependencies.html) for components to run on dedicated machines
- Run the pipeline on Kubernetes using [KubeRay](https://docs.ray.io/en/latest/cluster/kubernetes/getting-started.html)

Most of the time you will run Haystack pipelines in your local environment; even in production you will want to run a pipeline on a single node if the goal is to return a response to the user quickly, without the overhead you would usually get with a distributed setup. However, for long-running and complex RAG pipelines a distributed setup might help:

- Not every component needs a GPU; most will call some external API. With Ray it is possible to assign resource requirements (CPU, RAM) per component's execution needs.
- Some components might take longer to run, so parallelizing component execution, where possible, decreases overall pipeline run time.
- With asynchronous execution it is possible to interact with different component execution stages (e.g. fire an event before and after a component starts).

`ray-haystack` provides a custom implementation of the pipeline execution logic with the goal of staying **as compliant as possible with the native Haystack implementation**. In most cases, you should expect the same results (outputs) from pipeline runs. On top of that, the package parallelizes component runs where possible: components with no active dependencies can be scheduled without waiting for currently running components.

![Ray Pipeline Parallel](https://raw.githubusercontent.com/deepset-ai/haystack-integrations/main/images/ray-pipeline-concurrent.gif)
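The scheduling idea illustrated by the animation (a component runs as soon as all of its dependencies have finished) can be sketched without Ray using a plain thread pool. This is an illustration of the principle, not ray-haystack internals; the component names and dependency graph below are made up:

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

# Illustrative dependency graph: three independent fetchers feed a joiner,
# which feeds a generator (mirrors the pipeline shape used later in this guide).
dependencies = {
    "tech-fetcher": set(),
    "business-fetcher": set(),
    "politics-fetcher": set(),
    "joiner": {"tech-fetcher", "business-fetcher", "politics-fetcher"},
    "generator": {"joiner"},
}

def run_component(name: str) -> str:
    time.sleep(0.01)  # simulate component work
    return name

finished: set = set()
order: list = []
pending: dict = {}
with ThreadPoolExecutor() as pool:
    while len(finished) < len(dependencies):
        # Schedule every component whose dependencies are all done;
        # independent components end up running concurrently.
        for name, deps in dependencies.items():
            if name not in finished and name not in pending and deps <= finished:
                pending[name] = pool.submit(run_component, name)
        done, _ = wait(pending.values(), return_when=FIRST_COMPLETED)
        for future in done:
            name = future.result()
            finished.add(name)
            order.append(name)
            del pending[name]

print(order)  # the three fetchers finish (in any order) before joiner, then generator
```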
## Installation

`ray-haystack` can be installed like any other Python library, using pip:

```shell
pip install ray-haystack
```

The package works with Python 3.8 and onwards. If you plan to use `ray-haystack` with an existing Ray cluster, make sure you align the Python and `ray` versions with those running in the cluster.

> **Note**
> The `ray-haystack` package installs both `haystack-ai` and `ray` as transitive dependencies. The minimum supported version of Haystack is `2.6.0`.

If you would like to see the [Ray dashboard](https://docs.ray.io/en/latest/ray-observability/getting-started.html) when starting a Ray cluster locally, install Ray as follows:

```shell
pip install -U "ray[default]"
pip install ray-haystack
```

While the pipeline is running locally, access the dashboard in the browser at [http://localhost:8265](http://localhost:8265).
## Usage

### Start with an example

Once `ray-haystack` is installed, let's demonstrate how it works by running a simple example.

We will build a pipeline that fetches RSS news headlines from a list of given URLs and converts each headline to a `Document` with content equal to the headline title. We then ask an LLM (`OpenAIGenerator`) to create a news summary from the list of converted Documents, given a prompt `template`.

```python
import io
import os
from typing import List, Optional
from xml.etree.ElementTree import parse as parse_xml

import ray  # Import ray
from haystack import Document, component
from haystack.components.builders import PromptBuilder
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.generators import OpenAIGenerator
from haystack.components.joiners import DocumentJoiner
from haystack.dataclasses import ByteStream

from ray_haystack import RayPipeline  # Import RayPipeline (instead of `from haystack import Pipeline`)

# Please introduce your OpenAI key here
os.environ["OPENAI_API_KEY"] = "Your OpenAI Key"

@component
class XmlConverter:
    """
    Custom component which parses a given RSS feed (from ByteStream) and extracts values by a
    given XPath, e.g. ".//channel/item/title" will find the "title" of each RSS feed item.
    A Document is created for each extracted title. The `category` attribute can be used as
    an additional metadata field.
    """

    def __init__(self, xpath: str = ".//channel/item/title", category: Optional[str] = None):
        self.xpath = xpath
        self.category = category

    @component.output_types(documents=List[Document])
    def run(self, sources: List[ByteStream]):
        documents: List[Document] = []
        for source in sources:
            xml_content = io.StringIO(source.to_string())
            documents.extend(
                Document(content=elem.text, meta={"category": self.category})
                for elem in parse_xml(xml_content).findall(self.xpath)  # noqa: S314
                if elem.text
            )
        return {"documents": documents}

template = """
Given the news headlines below, provide a summary of what is happening in the world right now in a couple of sentences.
You will be given headline titles in the following format: "<headline category>: <headline title>".
When creating the summary pay attention to common news headlines as those could be most insightful.

HEADLINES:
{% for document in documents %}
{{ document.meta["category"] }}: {{ document.content }}
{% endfor %}

SUMMARY:
"""

# Create an instance of Ray pipeline
pipeline = RayPipeline()

pipeline.add_component("tech-news-fetcher", LinkContentFetcher())
pipeline.add_component("business-news-fetcher", LinkContentFetcher())
pipeline.add_component("politics-news-fetcher", LinkContentFetcher())
pipeline.add_component("tech-xml-converter", XmlConverter(category="tech"))
pipeline.add_component("business-xml-converter", XmlConverter(category="business"))
pipeline.add_component("politics-xml-converter", XmlConverter(category="politics"))
pipeline.add_component("document_joiner", DocumentJoiner(sort_by_score=False))
pipeline.add_component("prompt_builder", PromptBuilder(template=template))
pipeline.add_component("generator", OpenAIGenerator())  # "gpt-4o-mini" is the default model

pipeline.connect("tech-news-fetcher", "tech-xml-converter.sources")
pipeline.connect("business-news-fetcher", "business-xml-converter.sources")
pipeline.connect("politics-news-fetcher", "politics-xml-converter.sources")
pipeline.connect("tech-xml-converter", "document_joiner")
pipeline.connect("business-xml-converter", "document_joiner")
pipeline.connect("politics-xml-converter", "document_joiner")
pipeline.connect("document_joiner", "prompt_builder")
pipeline.connect("prompt_builder", "generator.prompt")

# Draw the pipeline and save it to `pipe.png`
# pipeline.draw("pipe.png")

# Start a local Ray cluster
ray.init()

# Prepare pipeline inputs by specifying RSS urls for each fetcher
pipeline_inputs = {
    "tech-news-fetcher": {
        "urls": [
            "https://www.theverge.com/rss/frontpage/",
            "https://techcrunch.com/feed",
            "https://cnet.com/rss/news",
            "https://wired.com/feed/rss",
        ]
    },
    "business-news-fetcher": {
        "urls": [
            "https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=10001147",
            "https://www.business-standard.com/rss/home_page_top_stories.rss",
            "https://feeds.a.dj.com/rss/WSJcomUSBusiness.xml",
        ]
    },
    "politics-news-fetcher": {
        "urls": [
            "https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=10000113",
            "https://rss.nytimes.com/services/xml/rss/nyt/Politics.xml",
        ]
    },
}

# Run the pipeline with inputs
result = pipeline.run(pipeline_inputs)

# Print the response from the LLM
print("RESULT: ", result["generator"]["replies"][0])
```
Key takeaways from the example:

- import the `ray` module
- import `RayPipeline` (from `ray_haystack`) instead of the `Pipeline` class from `haystack`
- before running the pipeline, start a [local Ray cluster](https://docs.ray.io/en/latest/ray-core/starting-ray.html#start-ray-init) with an explicit `ray.init()` call

Under the hood, `RayPipeline` creates an actor for each component in the pipeline and runs them in a distributed manner until no components are left to run. By default, `RayPipeline` blocks until the pipeline finishes its execution.
### Read pipeline events

In some cases, you may want to react asynchronously to particular pipeline execution points:

- when the pipeline starts
- before a component runs
- after a component finishes
- after the pipeline finishes

Internally, `RayPipeline` creates an instance of a [Ray Queue](https://docs.ray.io/en/latest/ray-core/api/doc/ray.util.queue.Queue.html) where such events are stored and can be consumed.

In addition to the standard `run` method, `RayPipeline` provides a method called `run_nowait`, which returns pipeline execution results without blocking the current logic. We can use `run_nowait` to iterate over pipeline events, e.g.

```python
result = pipeline.run_nowait(pipeline_inputs)

for pipeline_event in result.pipeline_events_sync():
    print(
        f"\n>>> [{pipeline_event.time}] Source: {pipeline_event.source} | Type: {pipeline_event.type} | Data={pipeline_event.data}"
    )
```
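The pattern above (the pipeline keeps running while we consume its events) can be sketched with a plain queue and a worker thread, independently of Ray. Everything below is illustrative; the event type names are made up and are not the ray-haystack event schema:

```python
import queue
import threading
import time

def run_pipeline(events: "queue.Queue") -> None:
    # Stand-in for a pipeline run that publishes events as it progresses
    for name in ("fetcher", "converter", "generator"):
        events.put({"type": "component_started", "source": name})
        time.sleep(0.01)  # simulate component work
        events.put({"type": "component_finished", "source": name})
    events.put({"type": "pipeline_finished", "source": "pipeline"})

events: "queue.Queue" = queue.Queue()
worker = threading.Thread(target=run_pipeline, args=(events,))
worker.start()  # non-blocking start, analogous to run_nowait

seen = []
while True:
    event = events.get()  # consume events as they arrive
    seen.append(event["type"])
    if event["type"] == "pipeline_finished":
        break
worker.join()
print(seen[-1])  # pipeline_finished
```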
### Component Serialization

If you run a native Haystack pipeline locally, the components remain in the same Python process, and there is no need to worry about a distributed setup. When `RayPipeline` runs in a distributed manner, it has to [serialize](https://docs.ray.io/en/latest/ray-core/objects/serialization.html) components before they end up in a remote task or actor.

The `ray-haystack` package relies on Haystack's standard `to_dict` and `from_dict` methods to serialize and deserialize components, respectively. Please refer to the main [documentation](https://github.com/prosto/ray-haystack?tab=readme-ov-file#component-serialization) for a detailed explanation and for features that handle edge cases.
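The `to_dict`/`from_dict` contract can be sketched as a round trip: a component records its import path and init parameters, and can be rebuilt from that dictionary in another process. This is a minimal stand-alone sketch (without Haystack's `@component` decorator or its `default_to_dict` helpers), not the exact ray-haystack mechanics:

```python
from typing import Any, Dict, Optional

class XmlConverter:
    """Hypothetical stand-in for a Haystack component, showing the serialization contract."""

    def __init__(self, xpath: str = ".//channel/item/title", category: Optional[str] = None):
        self.xpath = xpath
        self.category = category

    def to_dict(self) -> Dict[str, Any]:
        # Record the import path and the init parameters needed to rebuild the instance
        return {
            "type": f"{type(self).__module__}.{type(self).__name__}",
            "init_parameters": {"xpath": self.xpath, "category": self.category},
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "XmlConverter":
        return cls(**data["init_parameters"])

converter = XmlConverter(category="tech")
restored = XmlConverter.from_dict(converter.to_dict())
print(restored.category)  # tech
```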
239+
240+
### DocumentStore with Ray
241+
242+
When you use [InMemoryDocumentStore](https://docs.haystack.deepset.ai/docs/inmemorydocumentstore) or any DocumentStore which runs in memory with `RayPipeline`, you will stumble upon an apparent issue: in a distributed environment, these document stores running in memory will fail to operate as components that reference the store will not point to a single instance but rather a copy of it.
243+
244+
`ray-haystack` package provides a wrapper around `InMemoryDocumentStore` by implementing a proxy pattern so that only a single instance of `InMemoryDocumentStore` across Ray cluster is present. With that, components can share a single store. Use `RayInMemoryDocumentStore`, `RayInMemoryEmbeddingRetriever` or `RayInMemoryBM25Retriever` in case you need in-memory document store in your Ray pipelines.
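The proxy idea can be illustrated without Ray: components hold lightweight handles that forward every call to one shared store, instead of each holding its own copy. The class and method names below are illustrative (in ray-haystack the shared instance lives behind a Ray actor), not the package's API:

```python
from typing import Any, Dict, List

class SharedStore:
    """Stand-in for the single store instance shared by all components."""

    def __init__(self) -> None:
        self._documents: List[Dict[str, Any]] = []

    def write_documents(self, documents: List[Dict[str, Any]]) -> int:
        self._documents.extend(documents)
        return len(documents)

    def count_documents(self) -> int:
        return len(self._documents)

class StoreProxy:
    """Components hold a proxy; every call is forwarded to the one shared instance."""

    def __init__(self, store: SharedStore) -> None:
        self._store = store

    def __getattr__(self, name: str) -> Any:
        return getattr(self._store, name)

store = SharedStore()
writer_side, reader_side = StoreProxy(store), StoreProxy(store)
writer_side.write_documents([{"content": "hello"}])
print(reader_side.count_documents())  # 1: both proxies see the same store
```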
245+
246+
### RayPipeline Settings
247+
248+
When an actor is created in Ray, we can control its behavior by providing certain [settings](https://docs.ray.io/en/latest/ray-core/api/doc/ray.actor.ActorClass.options.html).
249+
250+
`ray-haystack` provides means to configure pipeline Actors with the help of `RayPipelineSettings` dictionary:
251+
252+
```python
253+
from typing import Any, Dict
254+
255+
from ray_haystack import RayPipeline, RayPipelineSettings
256+
257+
settings: RayPipelineSettings = {
258+
"common": {
259+
"actor_options": {
260+
"namespace": "haystack", # common namespace name for all actors
261+
}
262+
},
263+
"components": {
264+
"per_component": {
265+
"generator": {
266+
"actor_options": {
267+
"num_cpus": 2, # component specific CPU resource requirement
268+
}
269+
}
270+
}
271+
},
272+
}
273+
274+
# Option 1 - Pass settings through pipeline's metadata
275+
pipeline = RayPipeline(metadata={"ray": settings})
276+
277+
pipeline_inputs: Dict[str, Any] = {}
278+
279+
# Option 2 - Pass settings when in the `run` method
280+
pipeline.run(pipeline_inputs, ray_settings=settings)
281+
```
### Middleware

Sometimes it is useful to run custom logic before and after a component actor runs its component.

It is possible to build custom middleware:

```python
from typing import Any, Literal

import ray
from haystack.components.fetchers import LinkContentFetcher

from ray_haystack import RayPipeline, RayPipelineSettings
from ray_haystack.middleware import ComponentMiddleware, ComponentMiddlewareContext
from ray_haystack.serialization import worker_asset

ray.init()

@worker_asset
class TraceMiddleware(ComponentMiddleware):
    def __init__(self, capture: Literal["input", "output", "input_and_output"] = "input_and_output"):
        self.capture = capture

    def __call__(self, component_input, ctx: ComponentMiddlewareContext) -> Any:
        print(f"Tracer: Before running component '{ctx['component_name']}' with inputs: '{component_input}'")

        outputs = self.next(component_input, ctx)

        print(f"Tracer: After running component '{ctx['component_name']}' with outputs: '{outputs}'")

        return outputs

pipeline = RayPipeline()
pipeline.add_component("cocktail_fetcher", LinkContentFetcher())

settings: RayPipelineSettings = {
    "components": {
        "per_component": {
            # Middleware applies only to the "cocktail_fetcher" component
            "cocktail_fetcher": {
                "middleware": {
                    "trace": {"type": "__main__.TraceMiddleware"},
                },
            },
        }
    },
}

response = pipeline.run(
    {
        "cocktail_fetcher": {"urls": ["https://www.thecocktaildb.com/api/json/v1/1/random.php"]},
    },
    ray_settings=settings,
)
```
## Resources

- The full documentation is available in the [repository](https://github.com/prosto/ray-haystack/tree/main)
- Explore more advanced examples:
  - [Trace Haystack Pipelines in Browser](https://github.com/prosto/ray-haystack/blob/main/examples/pipeline_watch/README.md)
  - [Running Haystack Pipeline on Kubernetes](https://github.com/prosto/ray-haystack/blob/main/examples/pipeline_kubernetes/README.md)
  - [Run pipeline with detached component actors](https://github.com/prosto/ray-haystack/tree/main/examples/pipeline_detached_actors)
- [Learn more about Ray](https://docs.ray.io/en/latest/ray-overview/getting-started.html)

## License

`ray-haystack` is distributed under the terms of the [MIT](https://spdx.org/licenses/MIT.html) license.

logos/ray.png (58.6 KB)
