Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Quick Start

This guide walks through a complete example: two nodes that discover each other and exchange typed data through a stream.

Step 1: Create the Network

Every mosaik application starts by creating a Network. The NetworkId ensures only nodes on the same logical network can communicate.

use mosaik::{Network, NetworkId};

let network_id: NetworkId = "my-app".into();
let network = Network::new(network_id).await?;

Network::new() creates a node with default settings:

  • A random secret key (new identity each run — this is the recommended default)
  • Default relay mode (uses iroh’s relay servers for NAT traversal)
  • No bootstrap peers (fine for local testing)
  • No tags

For production use, you’ll want Network::builder() to configure bootstrap peers so the node can find the network. A fixed secret key is only needed for bootstrap nodes that need a stable peer ID — regular nodes work fine with the auto-generated key. See the Network chapter.

Step 2: Define Your Data Type

Streams in mosaik are typed. Any type implementing Serialize + DeserializeOwned + Send + 'static automatically implements the Datum trait and can be streamed:

use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct SensorReading {
    sensor_id: u64,
    temperature: f64,
    timestamp: u64,
}

Step 3: Create a Producer

A producer publishes data for consumers across the network. The stream ID is derived from the type name by default.

let producer = network.streams().produce::<SensorReading>();

// Wait until at least one consumer connects
producer.when().subscribed().await;

Step 4: Send Data

Producer implements the futures::Sink trait. You can use SinkExt::send() for awaitable sends:

use futures::SinkExt;

producer.send(SensorReading {
    sensor_id: 1,
    temperature: 22.5,
    timestamp: 1700000000,
}).await?;

For non-blocking sends, use try_send():

producer.try_send(SensorReading {
    sensor_id: 1,
    temperature: 23.1,
    timestamp: 1700000001,
})?;

Step 5: Create a Consumer (On Another Node)

On a different node (or the same node for testing), create a consumer for the same type:

let other_network = Network::new(network_id).await?;

let mut consumer = other_network.streams().consume::<SensorReading>();

// Wait for subscription to a producer
consumer.when().subscribed().await;

Step 6: Receive Data

Consumer implements futures::Stream. Use StreamExt::next() to receive:

use futures::StreamExt;

while let Some(reading) = consumer.next().await {
    println!("Sensor {}: {}°C", reading.sensor_id, reading.temperature);
}

Or use the direct recv() method:

if let Some(reading) = consumer.recv().await {
    println!("Got reading: {:?}", reading);
}

Step 7: Discovery (Connecting the Nodes)

For nodes to find each other, they need at least one common peer address. In local testing, you can trigger manual discovery:

// On the consumer's network, dial the producer's address
other_network.discovery().sync_with(network.local().addr()).await?;

In production, you’ll configure bootstrap peers:

use mosaik::discovery;

let network = Network::builder(network_id)
    .with_discovery(
        discovery::Config::builder()
            .with_bootstrap(bootstrap_addr)
    )
    .build()
    .await?;

Putting It All Together

Here’s the complete example as two async tasks simulating two nodes:

use futures::{SinkExt, StreamExt};
use mosaik::{Network, NetworkId};
use serde::{Deserialize, Serialize};
use std::pin::pin;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct SensorReading {
    sensor_id: u64,
    temperature: f64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let network_id: NetworkId = "sensor-network".into();

    // Node 1: Producer
    let node1 = Network::new(network_id).await?;
    let producer = node1.streams().produce::<SensorReading>();

    // Node 2: Consumer
    let node2 = Network::new(network_id).await?;
    let mut consumer = node2.streams().consume::<SensorReading>();

    // Connect the nodes
    node2.discovery().dial(node1.local().addr()).await?;

    // Wait for the stream to be established
    producer.when().online().await;

    // Send data
    for i in 0..10 {
        producer.send(SensorReading {
            sensor_id: 1,
            temperature: 20.0 + i as f64 * 0.5,
        }).await?;
    }

    // Receive data
    for _ in 0..10 {
        let reading = consumer.recv().await.unwrap();
        println!("{:?}", reading);
    }

    Ok(())
}

What’s Next

  • Tutorials — Walk through the included examples
  • Streams — Producer/consumer configuration, filtering, backpressure
  • Groups — Raft consensus and replicated state machines
  • Collections — Distributed Map, Vec, Set, Register, Once, PriorityQueue