Operator
In Zenoh-Flow, an Operator processes data, received from Sources or other Operators, and forwards the results of this processing to Sinks or other Operators. An Operator is typically where one would write the core of their business logic. For instance, in an object detection pipeline, a Source would fetch the images while the actual detection would be done in an Operator.
For Zenoh-Flow to be able to load our Operator, it must be accompanied by a descriptor.
The content of the descriptor for an Operator is as follows:
- (optional) a description,
- (optional) some configuration,
- (optional) some vars,
- its inputs, i.e. the data it requires,
- its outputs, i.e. the data it will produce,
- a library, i.e. where to find its actual implementation.
Below is a valid descriptor that matches the code we are going to write next:
description: my implementation of an Operator
# This configuration is not used and serves as an example.
configuration:
  value: not-used
# This vars section is not used and serves as an example.
vars:
  FOO: not-used
inputs:
  - input
outputs:
  - output
# Linux:
library: file:///absolute/path/to/the/implementation/libmy_operator.so
# MacOS:
# library: file:///absolute/path/to/the/implementation/libmy_operator.dylib
Assuming you want to create an Operator called my-operator, enter the following in a terminal:
cargo new --lib my-operator
Modify the Cargo.toml to add these dependencies and tell rustc that you want a library that can be dynamically loaded:
[dependencies]
async-trait = "0.1.50" # Zenoh-Flow's node traits are asynchronous
zenoh-flow-nodes = { git = "https://github.com/eclipse-zenoh-flow/zenoh-flow.git" }
[lib]
crate-type=["cdylib"]
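With the `cdylib` crate type, `cargo build` produces a shared library (typically under `target/debug` or `target/release`) whose file name depends on the platform, and that file is what the `library` field of the descriptor has to point to. The sketch below, which uses only the Rust standard library, shows how the file name and the matching `file://` URI could be derived. The helpers `library_file_name` and `library_uri` are hypothetical names introduced for illustration; they are not part of Zenoh-Flow.

```rust
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};

/// Hypothetical helper: the file name Cargo gives to a `cdylib` built from `crate_name`.
/// Cargo replaces the hyphens of the crate name with underscores, and the platform
/// decides the prefix and suffix (e.g. `lib` + `.so` on Linux, `lib` + `.dylib` on macOS).
fn library_file_name(crate_name: &str) -> String {
    format!("{}{}{}", DLL_PREFIX, crate_name.replace('-', "_"), DLL_SUFFIX)
}

/// Hypothetical helper: the `file://` URI to write in the descriptor.
/// Assumes `absolute_dir` is an absolute path starting with `/`.
fn library_uri(absolute_dir: &str, crate_name: &str) -> String {
    format!(
        "file://{}/{}",
        absolute_dir.trim_end_matches('/'),
        library_file_name(crate_name)
    )
}
```

Calling `library_uri("/absolute/path/to/the/implementation", "my-operator")` on Linux would yield the same `library` value as in the descriptor above.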
Now modify lib.rs to (i) implement the Zenoh-Flow traits and (ii) include your logic.
Below you can find commented boilerplate code to do (i).
use async_trait::async_trait;
use zenoh_flow_nodes::prelude::*;
// MyOperator is where you implement your business logic.
//
// `Input` and `Output` are structures provided by Zenoh-Flow through which you, respectively,
// receive data from upstream nodes and send data to downstream nodes.
//
// An `Input` and an `Output` are handed to the node through its constructor (see the
// implementation of `Operator::new` below).
//
// That structure is also where a state can be saved. For concurrency reasons, the state must
// implement `Send` and `Sync` (`Arc` and `Mutex` structures can be helpful, in particular their
// `async_std` variant).
//
// The `export_operator` macro is required to properly expose the symbol and information about the
// version of the Rust compiler and Zenoh-Flow, to Zenoh-Flow.
//
// It allows Zenoh-Flow to detect, at runtime, a version mismatch between the Zenoh-Flow daemon and
// the shared library (be it on the version of the Rust compiler or of Zenoh-Flow itself).
#[export_operator]
struct MyOperator {
    input: Input<String>,
    output: Output<String>,
}

#[async_trait]
impl Node for MyOperator {
    async fn iteration(&self) -> Result<()> {
        // Add your business logic here.
        let data_to_process = self.input.recv().await?;
        let processed_data = format!("{} From Zenoh-Flow!", data_to_process);
        self.output.send(processed_data, None).await
    }
}

#[async_trait]
impl Operator for MyOperator {
    async fn new(
        // The `context` provides information about the Zenoh-Flow daemon on which the generated
        // node MyOperator will be executed.
        context: Context,
        // The `configuration`(1) is a re-export of `serde_json::Value`(2). It behaves as a
        // dictionary and allows accessing configuration variables defined in the descriptor.
        configuration: Configuration,
        // The `Inputs` are created by Zenoh-Flow. It is a HashMap whose keys match what was defined
        // in the descriptor file.
        mut inputs: Inputs,
        // The `Outputs` are created by Zenoh-Flow. It is a HashMap whose keys match what was
        // defined in the descriptor file.
        mut outputs: Outputs,
    ) -> Result<Self> {
        let input = inputs
            .take("input")
            .expect("No input named 'input' found")
            // The method `typed` allows automatically deserialising data if it comes from a
            // node located on another process. It will call the provided closure.
            .typed(|bytes| todo!());
        let output = outputs
            .take("output")
            .expect("No output named 'output' found")
            // The method `typed` allows automatically serialising data if it goes to a
            // node located on another process. It will call the provided closure.
            .typed(|buffer, data: &String| todo!());
        Ok(MyOperator { input, output })
    }
}
(1): Configuration (2): serde_json::Value
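
The two `todo!()` closures passed to `typed` are where the deserialisation and serialisation of the payload happen. As a rough illustration, here is a minimal sketch for `String` payloads encoded as UTF-8, written as standalone functions that rely only on the standard library. The parameter types (`&[u8]` for the received bytes, `&mut Vec<u8>` for the buffer), the error type `BoxError` and the function names are assumptions inferred from the closure parameters in the boilerplate, not the confirmed Zenoh-Flow signatures; check the `zenoh-flow-nodes` documentation before adapting them.

```rust
// Assumed error type for this sketch; Zenoh-Flow may expect a different one.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Hypothetical deserialiser: interprets the received bytes as UTF-8 text.
/// Fails if the bytes are not valid UTF-8.
fn deserialize_string(bytes: &[u8]) -> Result<String, BoxError> {
    Ok(String::from_utf8(bytes.to_vec())?)
}

/// Hypothetical serialiser: appends the UTF-8 bytes of `data` to `buffer`.
/// Assumes the buffer is handed over empty.
fn serialize_string(buffer: &mut Vec<u8>, data: &String) -> Result<(), BoxError> {
    buffer.extend_from_slice(data.as_bytes());
    Ok(())
}
```

Keeping both directions as plain functions makes it possible to check that they round-trip a value in a unit test, without starting a Zenoh-Flow daemon.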