Node
Abstraction of a task.
In Dagrs, we provide the trait Node
as the representation of
user's task. Node
is the basic scheduling unit of Graph
.
Node's interfaces
#[async_trait]
pub trait Node: Send + Sync {
/// id is the unique identifier of each node, it will be assigned
/// by the [`NodeTable`].
/// when creating a new node, you can find this node through
/// this identifier.
fn id(&self) -> NodeId;
/// The node's name.
fn name(&self) -> NodeName;
/// Input Channels of this node.
fn input_channels(&mut self) -> &mut InChannels;
/// Output Channels of this node.
fn output_channels(&mut self) -> &mut OutChannels;
/// Execute a run of this node.
async fn run(&mut self, env: Arc<EnvVar>) -> Output;
}
The above methods ensure the communication and execution function when Graph
schedules Tasks.
Dynamic execution logics
It is possible to execute different logic on the same data layout. So we provide trait
Action
as the abstraction of a task's logic.
Here is an example of implementing Action
, that simply returns Output::Out
containing a String "Hello world".
use dagrs::{Action, InChannels, OutChannels, EnvVar, Output};
/// An implementation of [`Action`] that returns [`Output::Out`] containing a String "Hello world".
#[derive(Default)]
pub struct HelloAction;
#[async_trait]
impl Action for HelloAction {
async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc<EnvVar>) -> Output {
Output::Out(Some(Content::new("Hello world".to_string())))
}
}
Create a node
Get a node id
Before creating a new node, let's first get to know the NodeTable
- a mapping of nodes' names to their ids.
During runtime, we need to figure out the specific node to send/receive messages to/from it. Therefore, each node will be assigned
a unique id before the run.
You can call the alloc_id_for
method of NodeTable
to alloc an id by providing a name, just like the following example.
use dagrs::NodeTable;
let node_table = NodeTable::default();
let name = "Node Name";
let id = node_table.alloc_id_for(name);
Create a default node
If you don't need a Node
to hold any extra information, DefaultNode
is the best choice.
We can create a DefaultNode
with a
custom Action
and other necceray fields. The example below creates a DefaultNode
with a name of
"My Node", an action of HelloAction
(defined above), a NodeId
assigned by the node_table
, and empty
input_channels and output_channels.
use dagrs::{NodeName, NodeTable, DefaultNode, EmptyAction};
let node_name = "My Node";
let mut node_table = NodeTable::new();
let mut node = DefaultNode::with_action(
NodeName::from(node_name),
HelloAction::default(),
&mut node_table,
);
// Check if node table has key-value pair (node.name, node.id)
assert_eq!(node_table.get(node_name).unwrap(), &node.id());
let env = Arc::new(EnvVar::new(node_table));
let out = node.run(env).get_out().unwrap();
let out: &String = out.get().unwrap();
assert_eq!(out, "Hello world");
Implement a customized node
Don't worry if the DefaultNode
can't satisfy your need.
You are free to implement a customized Node
like the following example:
use std::sync::Arc;
use async_trait::async_trait;
use dagrs::{Content, EnvVar, InChannels, Node, NodeId, NodeName, OutChannels, Output};
struct MessageNode {
id: NodeId,
name: NodeName,
in_channels: InChannels,
out_channels: OutChannels,
/*Put your custom fields here.*/
message: String,
}
#[async_trait]
impl Node for MessageNode {
fn id(&self) -> NodeId {
self.id
}
fn name(&self) -> NodeName {
self.name.clone()
}
fn input_channels(&mut self) -> &mut InChannels {
&mut self.in_channels
}
fn output_channels(&mut self) -> &mut OutChannels {
&mut self.out_channels
}
async fn run(&mut self, _: Arc<EnvVar>) -> Output {
Output::Out(Some(Content::new(self.message.clone())))
}
}
The MessageNode
above simply returns its field message
when run. If needed,
the execution logic of this node can also be replaced with an Action
, just like the
DefaultNode
.
If you need many custom Nodes like this, it can be annoying to duplicate these necceray fields
and methods required by the Node
trait. The auto_node
macro can help you out.