Connection

Let the nodes communicate asynchronously.

Since these nodes run on electronic devices, we need a way to carry data between them that is faster than horseback.

Dagrs uses tokio channels to establish connections between nodes. You don't need to know how to create and use tokio channels, because Dagrs has encapsulated this mess and Graph will help you create and manage these channels.

Information packet

We call the data passed between nodes "information packets", just as in Flow-based Programming. Dagrs provides the type Content to serve as the information packet.

Create an information packet

To send something, such as the greeting in the following example, first create an information packet.

use dagrs::Content;

let greeting = Content::new("Hey there~");

How `Content` holds data of different types:

The secret to sending information packets of different types on the same channel is the Any trait. When data is wrapped in a Content, it is placed behind an Arc pointer and its concrete type is erased through Any.
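
The idea can be sketched with the standard library alone. This is a minimal illustration, not Dagrs's actual implementation: `Packet` and `round_trip` are hypothetical names, and `std::sync::mpsc` stands in for the tokio channels that Dagrs uses.

```rust
use std::any::Any;
use std::sync::mpsc;
use std::sync::Arc;

/// Hypothetical stand-in for `Content`: a type-erased, cheaply clonable value.
#[derive(Clone)]
struct Packet {
    inner: Arc<dyn Any + Send + Sync>,
}

impl Packet {
    /// Wraps any thread-safe `'static` value and erases its concrete type.
    fn new<T: Any + Send + Sync>(value: T) -> Self {
        Packet { inner: Arc::new(value) }
    }

    /// Returns `Some(&T)` only if the stored value really is a `T`.
    fn get<T: Any>(&self) -> Option<&T> {
        self.inner.downcast_ref::<T>()
    }
}

/// Sends a `String` and a `u32` over one channel and reads both back.
fn round_trip() -> (String, u32) {
    let (tx, rx) = mpsc::channel::<Packet>();
    tx.send(Packet::new(String::from("Hey there~"))).unwrap();
    tx.send(Packet::new(42u32)).unwrap();

    let first = rx.recv().unwrap();
    let second = rx.recv().unwrap();
    (
        first.get::<String>().unwrap().clone(),
        *second.get::<u32>().unwrap(),
    )
}
```

Because the channel only ever sees `Packet`, the receiver decides which concrete type to ask for. Asking for the wrong type yields `None` rather than a panic.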

Unwrap an information packet

When you need to get the contents of the received packet, use the get method:

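use dagrs::Content;

// Store an owned String so that the type requested from `get` matches the stored type.
let greeting = Content::new("Hey there~".to_string());
let recv_greeting: &String = greeting.get().unwrap();
assert_eq!(recv_greeting, "Hey there~");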

Note:

The get method tries to convert the Content's Arc pointer to the type you annotate, or to the type the compiler infers. If the conversion succeeds, you get Some containing the data; otherwise you get None.

Send & Receive information packet

Send an information packet

In Dagrs, each node maintains an OutChannels to manage the channels from that node to other nodes. Given a destination node id and an information packet, OutChannels sends the packet to that node.

Suppose we have two nodes, a and b, and we want node a to send a greeting to b. We need to put the greeting into the channel from a to b, as in the following example, which implements node a's action, GreetingAction.

use std::sync::Arc;

use async_trait::async_trait;
use dagrs::{Action, Content, EnvVar, InChannels, OutChannels, Output};

const NAME_OF_B: &str = "b";

pub struct GreetingAction;
#[async_trait]
impl Action for GreetingAction {
    async fn run(&self, _: &mut InChannels, out_channel: &OutChannels, env: Arc<EnvVar>) -> Output {
        let greeting = Content::new("Hey there~");
        // Look up b's id by name, then send the greeting over the channel from a to b.
        let b_id = env.get_node_id(NAME_OF_B).unwrap();
        out_channel.send_to(&b_id, greeting).await.unwrap();

        Output::Out(None)
    }
}

The send_to method is asynchronous. You can also use the method blocking_send, which is synchronous.
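
The routing-by-id idea behind OutChannels can be sketched with the standard library. This is an illustration under stated assumptions, not Dagrs's code: `Outbox`, `NodeId`, and `connect` are hypothetical names, packets are plain `String`s, and `std::sync::mpsc` stands in for tokio channels.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical node id; Dagrs defines its own id type.
type NodeId = usize;

/// Hypothetical stand-in for `OutChannels`: one sender per destination node.
struct Outbox {
    senders: HashMap<NodeId, Sender<String>>,
}

impl Outbox {
    fn new() -> Self {
        Outbox { senders: HashMap::new() }
    }

    /// Opens a channel to `dest` and hands back its receiving end.
    fn connect(&mut self, dest: NodeId) -> Receiver<String> {
        let (tx, rx) = channel();
        self.senders.insert(dest, tx);
        rx
    }

    /// Looks up the sender for `dest` and pushes the packet into it.
    /// Returns `false` if there is no channel to `dest` or it is closed.
    fn send_to(&self, dest: NodeId, packet: &str) -> bool {
        match self.senders.get(&dest) {
            Some(tx) => tx.send(packet.to_string()).is_ok(),
            None => false,
        }
    }
}
```

Keying the senders by destination id means a node only names the node it wants to reach; which channel carries the packet stays an internal detail.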

Receive an information packet

Each node maintains an InChannels to manage the channels from other nodes to that node. Given a source node id, InChannels tries to receive a packet from that node.

Following the same pattern as above, the next example shows how node b receives a greeting from node a through the implementation of RecvGreetingAction.

use std::sync::Arc;

use async_trait::async_trait;
use dagrs::{Action, EnvVar, InChannels, OutChannels, Output};

const NAME_OF_A: &str = "a";

pub struct RecvGreetingAction;
#[async_trait]
impl Action for RecvGreetingAction {
    async fn run(&self, in_channel: &mut InChannels, _: &OutChannels, env: Arc<EnvVar>) -> Output {
        // Look up a's id by name, then wait for the greeting on the channel from a to b.
        let a_id = env.get_node_id(NAME_OF_A).unwrap();
        let greeting = in_channel.recv_from(&a_id).await.unwrap();

        Output::Out(None)
    }
}

The recv_from method is asynchronous. You can also use the method blocking_recv_from, which is synchronous.

Lifetime of the channel

Channels are created when the graph is constructed, and the senders and receivers of these channels are assigned to the corresponding nodes. Since all good things must come to an end, you can close senders and receivers manually; otherwise they are closed automatically when the graph is dropped.

Close a sender

Let's continue with the above example, where node a closes the channel's sender after sending the greeting:

const NAME_OF_B: &str = "b";
#[async_trait]
impl Node for MyNode {
    // Other method implementations...
    async fn run(&mut self, env: Arc<EnvVar>) -> Output {
        // `action` sends the greeting to other nodes
        let output = self.action.run(&mut self.input_channels(), &self.output_channels(), env.clone()).await;
        let b_id = env.get_node_id(NAME_OF_B).unwrap();
        self.output_channels().close(&b_id);
        output
    }
}

After a closes the sender, the key-value pair of (b's id, the closed sender) is removed from a.out_channels.
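
What closing a sender looks like from both ends can be sketched with the standard library. The assumptions here are that "closing" means removing the map entry and thereby dropping the only sender, and that `std::sync::mpsc` behaves closely enough to the tokio channels Dagrs uses; `close_after_send` and the plain `HashMap` are hypothetical stand-ins.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, RecvError, Sender};

/// Sends one packet to node `1`, closes the sender, and reports what each side sees.
/// Returns (sender still registered?, first receive, second receive).
fn close_after_send() -> (bool, Result<String, RecvError>, Result<String, RecvError>) {
    let mut out_channels: HashMap<usize, Sender<String>> = HashMap::new();
    let (tx, rx) = channel();
    out_channels.insert(1, tx);

    out_channels[&1].send("Hey there~".to_string()).unwrap();

    // Closing: remove the (id, sender) entry, which drops the only sender.
    out_channels.remove(&1);

    // The packet sent before closing is still delivered.
    let first = rx.recv();
    // Once the buffer is empty, the receiver learns that the sender is gone.
    let second = rx.recv();

    (out_channels.contains_key(&1), first, second)
}
```

In this sketch the order matters: a packet sent before the close is not lost, and the receiver only sees the disconnection after draining what was already buffered.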

Close a receiver

When node b receives the greeting, it also closes the receiver:

use std::sync::Arc;

use async_trait::async_trait;
use dagrs::{Action, EnvVar, InChannels, OutChannels, Output};

const NAME_OF_A: &str = "a";

pub struct RecvGreetingAction;
#[async_trait]
impl Action for RecvGreetingAction {
    async fn run(&self, in_channel: &mut InChannels, _: &OutChannels, env: Arc<EnvVar>) -> Output {
        let a_id = env.get_node_id(NAME_OF_A).unwrap();
        let greeting = in_channel.recv_from(&a_id).await.unwrap();

        // The greeting has arrived, so the receiver for a is no longer needed.
        in_channel.close(&a_id);
        Output::Out(None)
    }
}

Exception cases in send/receive

Things could go wrong even though they appear perfect.

Error types of send

If a send operation fails, you will get an enum SendErr:

Enum Variant of SendErr    Description
NoSuchChannel              No sender corresponding to the given node id.
ClosedChannel(Content)     The corresponding channel is closed already. Returns the content that failed to be sent.
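
A caller can match on these variants to decide what to do with an undelivered packet. The sketch below mirrors the two variants locally instead of using Dagrs's own type: `String` stands in for Content, `std::sync::mpsc` for the tokio channel, and `try_send` and `describe` are hypothetical helpers.

```rust
use std::sync::mpsc::{channel, Sender};

/// Local mirror of the two variants listed above; `String` stands in for `Content`.
#[derive(Debug, PartialEq)]
enum SendErr {
    NoSuchChannel,
    ClosedChannel(String),
}

/// Tries to send; on a closed channel the payload comes back inside the error.
fn try_send(sender: Option<&Sender<String>>, packet: String) -> Result<(), SendErr> {
    let tx = sender.ok_or(SendErr::NoSuchChannel)?;
    tx.send(packet).map_err(|e| SendErr::ClosedChannel(e.0))
}

/// Turns the outcome into a log line, keeping the undelivered payload when there is one.
fn describe(result: Result<(), SendErr>) -> String {
    match result {
        Ok(()) => "sent".to_string(),
        Err(SendErr::NoSuchChannel) => "no channel to that node".to_string(),
        Err(SendErr::ClosedChannel(packet)) => format!("channel closed, kept {packet:?}"),
    }
}
```

Returning the payload inside the closed-channel variant lets the caller reroute or log the packet instead of losing it.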

Error types of receive

If a receive operation fails, you will get an enum RecvErr:

Enum Variant of RecvErr    Description
NoSuchChannel              No receiver corresponding to the given node id.
Closed                     The corresponding channel is closed already.
Lagged(u64)                The channel experienced a buffer overflow, causing the receiver to discard a certain number of information packets.
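
One way a receive loop might treat these variants is sketched below. It mirrors the enum locally and works on a prepared list of results, so no Dagrs or tokio types are involved. `drain` is a hypothetical helper, and treating Lagged as recoverable while Closed and NoSuchChannel end the loop is an assumption about sensible handling, not documented Dagrs behavior.

```rust
/// Local mirror of the variants listed above.
#[derive(Debug, PartialEq)]
enum RecvErr {
    NoSuchChannel,
    Closed,
    Lagged(u64),
}

/// Walks through a sequence of receive results and returns
/// (packets delivered, packets skipped because of lag).
fn drain(results: Vec<Result<&str, RecvErr>>) -> (usize, u64) {
    let mut delivered = 0;
    let mut skipped = 0;
    for result in results {
        match result {
            Ok(_) => delivered += 1,
            // Some packets were discarded; record how many and keep receiving.
            Err(RecvErr::Lagged(n)) => skipped += n,
            // Nothing more can arrive from this source: stop receiving.
            Err(RecvErr::Closed) | Err(RecvErr::NoSuchChannel) => break,
        }
    }
    (delivered, skipped)
}
```

For example, the sequence `Ok("a")`, `Lagged(3)`, `Ok("b")`, `Closed` yields two delivered packets and three skipped ones, and anything after `Closed` is never read.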