Connection
Let the nodes communicate asynchronously.
Since these nodes run on electronic devices, we need to transmit data beyond horseback.
Dagrs uses tokio channels to establish connections between nodes. You don't need to know how to create and use tokio channels, because Dagrs encapsulates this complexity and `Graph` creates and manages these channels for you.
Information packet
We call the data passed between nodes an "information packet", just like in Flow-based Programming. Dagrs provides the type `Content` as the information packet.
Create an information packet
To send something, such as the greeting in the following example, first create an information packet:
use dagrs::Content;
let greeting = Content::new("Hey there~");
How `Content` keeps data of different types:
The secret to sending information packets of different types on the same channel is the `Any` trait. When data is encapsulated into a `Content`, it is converted into an `Arc` pointer.
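To make the idea concrete, here is a minimal sketch of type erasure with `Any` and `Arc`, using only the standard library. The `Packet` type and the `roundtrip` function are hypothetical names invented for this illustration; they are not Dagrs's `Content`, and a std `mpsc` channel stands in for the tokio channels Dagrs uses.

```rust
use std::any::Any;
use std::sync::mpsc;
use std::sync::Arc;

// Hypothetical stand-in for a type-erased packet; not Dagrs's `Content`.
#[derive(Clone)]
struct Packet {
    inner: Arc<dyn Any + Send + Sync>,
}

impl Packet {
    // Wraps any sendable value; the concrete type is erased behind `dyn Any`.
    fn new<T: Any + Send + Sync>(value: T) -> Self {
        Packet { inner: Arc::new(value) }
    }

    // Returns a reference to the payload only if it really is a `T`.
    fn get<T: Any>(&self) -> Option<&T> {
        self.inner.downcast_ref::<T>()
    }
}

// Sends a string and a number over the same channel, then describes what arrives.
fn roundtrip() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<Packet>();
    tx.send(Packet::new(String::from("Hey there~"))).unwrap();
    tx.send(Packet::new(42u32)).unwrap();
    drop(tx); // close the channel so the loop below terminates

    let mut seen = Vec::new();
    for packet in rx {
        if let Some(text) = packet.get::<String>() {
            seen.push(format!("text: {text}"));
        } else if let Some(n) = packet.get::<u32>() {
            seen.push(format!("number: {n}"));
        }
    }
    seen
}

fn main() {
    for line in roundtrip() {
        println!("{line}");
    }
}
```

Because the channel only ever sees `Packet`, one channel can carry payloads of unrelated types; the receiver decides which type to ask for.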
Unwrap an information packet
When you need to get the contents of a received packet, use the `get` method:
use dagrs::Content;
// Store a `String` so that the type requested from `get` matches the stored type.
let greeting = Content::new("Hey there~".to_string());
let recv_greeting: &String = greeting.get().unwrap();
assert_eq!(recv_greeting, "Hey there~");
Note:
The `get` method converts the `Content`'s `Arc` pointer to the type you annotated, or the type inferred by the compiler. If the conversion is successful, you will get `Some(T)`, otherwise you will get `None`.
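A common way for such a conversion to fail is asking for a type that is close to, but not exactly, the stored one. The sketch below shows this with the standard library's `Any` downcast alone; `as_string` is a hypothetical helper written for this example and makes no claim about how Dagrs implements `get`.

```rust
use std::any::Any;
use std::sync::Arc;

// Tries to view a type-erased value as a `String`.
fn as_string(value: &Arc<dyn Any + Send + Sync>) -> Option<&String> {
    value.downcast_ref::<String>()
}

fn main() {
    let owned: Arc<dyn Any + Send + Sync> = Arc::new(String::from("Hey there~"));
    let literal: Arc<dyn Any + Send + Sync> = Arc::new("Hey there~"); // a &'static str

    // The stored type is `String`, so the downcast succeeds.
    assert_eq!(as_string(&owned).map(String::as_str), Some("Hey there~"));
    // A string literal is a `&'static str`, not a `String`, so the downcast fails.
    assert!(as_string(&literal).is_none());
    // Asking for the type that was actually stored succeeds.
    assert_eq!(literal.downcast_ref::<&str>(), Some(&"Hey there~"));
}
```

The downcast compares exact types, so no implicit conversion between `&str` and `String` takes place.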
Send & receive information packets
Send an information packet
In Dagrs, each node maintains an `OutChannels` to manage the channels from that node to other nodes. Given the destination node id and the information packet, `OutChannels` sends the packet to the node you want.
Suppose we have two nodes, `a` and `b`, and we want node `a` to send a greeting to `b`. Then we need to put the greeting into the channel from `a` to `b`, as in the following example, which implements node `a`'s action, `GreetingAction`.
use std::sync::Arc;
use async_trait::async_trait;
use dagrs::{Action, Content, EnvVar, InChannels, OutChannels, Output};
const NAME_OF_B: &str = "b";
pub struct GreetingAction;
#[async_trait]
impl Action for GreetingAction {
    async fn run(&self, _: &mut InChannels, out_channel: &OutChannels, env: Arc<EnvVar>) -> Output {
        let greeting = Content::new("Hey there~");
        // Look up b's id by name, then send the greeting on the channel from a to b.
        let b_id = env.get_node_id(NAME_OF_B).unwrap();
        // `send_to` is asynchronous, so it must be awaited.
        out_channel.send_to(&b_id, greeting).await.unwrap();
        Output::Out(None)
    }
}
The `send_to` method is asynchronous. You can also use the method `blocking_send`, which is synchronous.
Receive an information packet
Each node maintains an `InChannels` to manage the channels from other nodes to that node. Given the source node id, `InChannels` tries to receive a packet from the node you want.
Following the same pattern as above, the following example shows how node `b` receives a greeting from node `a` via the implementation of `RecvGreetingAction`.
use std::sync::Arc;
use async_trait::async_trait;
use dagrs::{Action, EnvVar, InChannels, OutChannels, Output};
const NAME_OF_A: &str = "a";
pub struct RecvGreetingAction;
#[async_trait]
impl Action for RecvGreetingAction {
    async fn run(&self, in_channel: &mut InChannels, _: &OutChannels, env: Arc<EnvVar>) -> Output {
        // Look up a's id by name, then wait for the greeting on the channel from a to b.
        let a_id = env.get_node_id(NAME_OF_A).unwrap();
        // `recv_from` is asynchronous, so it must be awaited.
        let greeting = in_channel.recv_from(&a_id).await.unwrap();
        Output::Out(None)
    }
}
The `recv_from` method is asynchronous. You can also use the method `blocking_recv_from`, which is synchronous.
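The pattern behind both sides, a table of channel endpoints keyed by the peer's node id, can be sketched with the standard library alone. Everything here is an assumption made for illustration: `OutChans`, `InChans`, `connect`, and the `usize` node id are hypothetical names, and blocking std `mpsc` channels replace the asynchronous tokio channels that Dagrs uses.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

type NodeId = usize; // hypothetical id type for this sketch

// Outgoing side: one sender per destination node.
struct OutChans(HashMap<NodeId, Sender<String>>);
// Incoming side: one receiver per source node.
struct InChans(HashMap<NodeId, Receiver<String>>);

impl OutChans {
    // Returns false when no channel to `to` exists or the peer has hung up.
    fn send_to(&self, to: NodeId, msg: String) -> bool {
        match self.0.get(&to) {
            Some(tx) => tx.send(msg).is_ok(),
            None => false,
        }
    }
}

impl InChans {
    // Blocks until a message from `from` arrives.
    // Returns None if there is no such channel or it is closed.
    fn recv_from(&self, from: NodeId) -> Option<String> {
        self.0.get(&from)?.recv().ok()
    }
}

// Wires node `a` to node `b` with a single channel, as a graph builder would.
fn connect(a: NodeId, b: NodeId) -> (OutChans, InChans) {
    let (tx, rx) = mpsc::channel();
    (
        OutChans(HashMap::from([(b, tx)])),
        InChans(HashMap::from([(a, rx)])),
    )
}

fn main() {
    let (a_out, b_in) = connect(0, 1);
    assert!(a_out.send_to(1, "Hey there~".to_string()));
    assert_eq!(b_in.recv_from(0).as_deref(), Some("Hey there~"));
    // There is no channel from node 0 to node 7.
    assert!(!a_out.send_to(7, "nobody home".to_string()));
}
```

Keying the sender by destination and the receiver by source lets each node address its peers by id without holding the channel handles directly.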
Lifetime of the channel
Channels are created when the graph is constructed, and the corresponding senders and receivers of these channels are assigned to nodes. Since all good things must come to an end, you can close senders and receivers manually; otherwise they are closed automatically when the graph is deleted.
Close a sender
Let's continue with the above example, where node `a` closes the channel's sender after sending the greeting:
const NAME_OF_B: &str = "b";
#[async_trait]
impl Node for MyNode {
    // Other method implementations...
    async fn run(&mut self, env: Arc<EnvVar>) -> Output {
        // `action` sends the greeting to other nodes; await it so the send completes first
        let output = self.action.run(&mut self.input_channels(), &self.output_channels(), env.clone()).await;
        // Close the sender from a to b once the greeting has been sent
        let b_id = env.get_node_id(NAME_OF_B).unwrap();
        self.output_channels().close(&b_id);
        output
    }
}
After `a` closes the sender, the key-value pair of (`b`'s id, the closed sender) is removed from `a.out_channels`.
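The effect of removing a sender from such a table can be sketched with the standard library. This is only an analogy under stated assumptions: `OutChans`, `close`, and `send_then_close` are hypothetical names for this example, and std `mpsc` channels stand in for the tokio channels used by Dagrs.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};

type NodeId = usize; // hypothetical id type for this sketch

// Hypothetical outgoing-channel table keyed by destination node id.
struct OutChans(HashMap<NodeId, Sender<u32>>);

impl OutChans {
    // Removes the entry for `to`; dropping the sender closes that side of the channel.
    fn close(&mut self, to: NodeId) {
        self.0.remove(&to);
    }
}

// Sends one value to node B, closes the sender, and returns what B observes.
fn send_then_close() -> (Option<u32>, bool, bool) {
    const B: NodeId = 1;
    let (tx, rx) = mpsc::channel();
    let mut out = OutChans(HashMap::from([(B, tx)]));

    out.0[&B].send(1).unwrap();
    out.close(B);

    let first = rx.recv().ok(); // the value sent before closing is still delivered
    let closed = rx.recv().is_err(); // afterwards the receiver sees a closed channel
    let entry_gone = !out.0.contains_key(&B);
    (first, closed, entry_gone)
}

fn main() {
    assert_eq!(send_then_close(), (Some(1), true, true));
}
```

In this sketch, closing the sender does not discard a packet that was already sent; the receiver drains it before it observes the closed channel.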
Close a receiver
When node `b` receives the greeting, it also closes the receiver:
use std::sync::Arc;
use async_trait::async_trait;
use dagrs::{Action, EnvVar, InChannels, OutChannels, Output};
const NAME_OF_A: &str = "a";
pub struct RecvGreetingAction;
#[async_trait]
impl Action for RecvGreetingAction {
    async fn run(&self, in_channel: &mut InChannels, _: &OutChannels, env: Arc<EnvVar>) -> Output {
        let a_id = env.get_node_id(NAME_OF_A).unwrap();
        let greeting = in_channel.recv_from(&a_id).await.unwrap();
        // The greeting has arrived, so the receiver for a is no longer needed.
        in_channel.close(&a_id);
        Output::Out(None)
    }
}
Exception cases in send/receive
Things could go wrong even though they appear perfect.
Error types of send
If a send operation fails, you will get an enum `SendErr`:
| Enum variant of `SendErr` | Description |
|---|---|
| `NoSuchChannel` | No sender corresponding to the given node id. |
| `ClosedChannel(Content)` | The corresponding channel is already closed. Returns the content that failed to be sent. |
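To illustrate how a caller might tell the two cases apart and recover the unsent packet, here is a standard-library sketch. The `SendErr` enum below is a local mirror of the table written for this example, with a `String` payload instead of `Content`; `send_to` and `demo` are hypothetical, and std `mpsc` replaces tokio.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};

// Local mirror of the two variants in the table; the payload is a String here.
#[derive(Debug, PartialEq)]
enum SendErr {
    NoSuchChannel,
    ClosedChannel(String),
}

// Sends `msg` to node `to`, reporting which of the two failures occurred.
fn send_to(
    chans: &HashMap<usize, Sender<String>>,
    to: usize,
    msg: String,
) -> Result<(), SendErr> {
    let tx = chans.get(&to).ok_or(SendErr::NoSuchChannel)?;
    // std's SendError hands the unsent value back, which is forwarded to the caller.
    tx.send(msg).map_err(|e| SendErr::ClosedChannel(e.0))
}

// Exercises a successful send, a send on a closed channel, and a send to an unknown node.
fn demo() -> Vec<Result<(), SendErr>> {
    let (tx, rx) = mpsc::channel();
    let chans = HashMap::from([(1, tx)]);

    let ok = send_to(&chans, 1, "first".into());
    drop(rx); // the receiving node closes its end
    let closed = send_to(&chans, 1, "second".into());
    let missing = send_to(&chans, 9, "third".into());
    vec![ok, closed, missing]
}

fn main() {
    for result in demo() {
        match result {
            Ok(()) => println!("sent"),
            Err(SendErr::NoSuchChannel) => println!("no channel to that node"),
            Err(SendErr::ClosedChannel(unsent)) => println!("channel closed, got back: {unsent}"),
        }
    }
}
```

Returning the packet inside the error lets the caller retry on another channel or log the payload instead of losing it.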
Error types of receive
If a receive operation fails, you will get an enum `RecvErr`:
| Enum variant of `RecvErr` | Description |
|---|---|
| `NoSuchChannel` | No receiver corresponding to the given node id. |
| `Closed` | The corresponding channel is already closed. |
| `Lagged(u64)` | The channel experienced a buffer overflow, causing the receiver to discard a certain number of information packets. |