Introduction

Mosaik is a Rust runtime for building self-organizing, leaderless distributed systems. It is designed for trusted, permissioned networks — environments where all participating nodes are assumed honest, such as L2 blockchain infrastructure operated by a single organization.

What Mosaik Does

When you deploy mosaik-based binaries on arbitrary machines, the network self-organizes: nodes discover each other via gossip, infer the data-flow topology, elect leaders where needed, and converge to a stable operational state. Each node needs only two things to participate:

  1. A network ID — identifies which logical network to join
  2. A bootstrap peer (optional) — the peer ID of any node already on the network (the bootstrap example can be used in production as a universal bootstrap node). Mosaik also provides automatic bootstrap by publishing peer identities and their network ID associations in the Mainline DHT for zero-config discovery.

A secret key is automatically generated on each run, giving the node a unique identity. Specifying a fixed secret key is only recommended for bootstrap nodes that need a stable, well-known peer ID across restarts.

From these minimal inputs, mosaik handles peer discovery, typed pub/sub data streaming, Raft-based consensus groups, and replicated data structures — all automatically.

Design Philosophy

  • Not Byzantine fault tolerant. All members are assumed honest. This simplifies the protocol stack and enables higher throughput compared to BFT systems.
  • Self-organizing. No central coordinator, no manual topology configuration. Nodes find each other and form the right connections.
  • Built on modern networking. Uses iroh for QUIC-based peer-to-peer transport with relay support and hole-punching.
  • Composable primitives. Five subsystems (Network, Discovery, Streams, Groups, Collections) compose to support a wide range of distributed application patterns.

System Overview

┌─────────────────────────────────────────────────┐
│                   Network                       │
│  (QUIC endpoint, identity, protocol routing)    │
├──────────┬──────────┬──────────┬────────────────┤
│ Discovery│ Streams  │  Groups  │  Collections   │
│  gossip, │  typed   │   Raft   │  Map/Vec/Set/  │
│  catalog │ pub/sub  │ consensus│  Register/Once │
│          │          │  groups  │  PriorityQueue │
└──────────┴──────────┴──────────┴────────────────┘
  • Network is the entry point. It manages the QUIC transport, node identity, and composes all subsystems.
  • Discovery uses gossip to maintain a catalog of all peers, their capabilities, and their available streams/groups.
  • Streams provides typed, async pub/sub channels. Any serializable Rust type can be streamed between nodes.
  • Groups implements Raft consensus for clusters of nodes that need shared state and leader election.
  • Collections builds on Groups to offer replicated data structures (Map, Vec, Set, Register, Once, PriorityQueue) that stay synchronized across nodes.

Who Should Read This Book

This book serves two audiences:

  • Application developers building distributed systems with mosaik — start with Getting Started and the Tutorials.
  • Contributors to mosaik itself — the Architecture Deep Dives section covers protocol internals, Raft modifications, and design decisions.

Quick Example

A minimal mosaik node that joins a network and starts streaming data:

use mosaik::*;
use futures::SinkExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let network_id: NetworkId = "my-app".into();
    let network = Network::new(network_id).await?;

    // The node is now online and discoverable
    println!("Node {} is online", network.local().id());

    // Create a typed producer (any serializable type works)
    let producer = network.streams().produce::<String>();

    // Wait for at least one consumer to connect
    producer.when().online().await;

    // Send data
    producer.send("hello, world".to_string()).await?;

    Ok(())
}

Overview

Mosaik is a Rust runtime for building self-organizing, leaderless distributed systems. It targets trusted, permissioned networks — environments where all participating nodes are controlled by the same organization and assumed to be honest.

Key Features

  • Self-organizing — Nodes discover each other via gossip and form the correct topology automatically
  • Typed pub/sub — Stream any serializable Rust type between nodes with backpressure and filtering
  • Raft consensus — Form availability groups with leader election and replicated state machines
  • Replicated collections — Distributed Map, Vec, Set, Register, Once, and PriorityQueue with strong consistency
  • QUIC transport — Built on iroh for modern, encrypted P2P networking
  • Relay support — Nodes behind NAT can communicate via relay servers with automatic hole-punching

Module Map

Mosaik is organized into five composable subsystems:

┌──────────────────────────────────────────────────────┐
│                     Network                          │
│  Entry point. QUIC endpoint, identity, routing.      │
├─────────────┬─────────────┬─────────────┬────────────┤
│  Discovery  │   Streams   │   Groups    │Collections │
│  Gossip &   │  Typed      │  Raft       │ Replicated │
│  peer       │  pub/sub    │  consensus  │ Map, Vec,  │
│  catalog    │  channels   │  groups     │ Set, Reg,  │
│             │             │             │ Once, DEPQ │
└─────────────┴─────────────┴─────────────┴────────────┘

Network

The entry point to the SDK. Creates the QUIC endpoint, manages node identity (derived from a secret key), and composes all subsystems via ALPN-based protocol multiplexing.

Discovery

Gossip-based peer discovery. Maintains a catalog — a synchronized view of all known peers, their capabilities (tags), and their available streams and groups. Uses two complementary protocols: real-time gossip announcements and full catalog synchronization for catch-up.

Streams

Typed, async pub/sub data channels. Any Rust type implementing Serialize + DeserializeOwned + Send + 'static can be streamed. Producers publish data; consumers subscribe. Discovery automatically connects matching producers and consumers across the network.

Groups

Availability groups coordinated by a modified Raft consensus protocol. Nodes sharing a group key form a cluster, elect a leader, and replicate commands through a shared log. Custom state machines define application logic that runs deterministically on all group members.
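Determinism is the key property here: every member applies the same commands in the same order and must arrive at the same state. The toy state machine below illustrates this with plain Rust — the Command and KvMachine types are invented for illustration and are not mosaik's actual StateMachine API:

```rust
use std::collections::BTreeMap;

// A command in the replicated log (illustrative type).
#[derive(Clone, Debug, PartialEq)]
enum Command {
    Put(String, u64),
    Delete(String),
}

// A deterministic state machine: same commands, same order,
// same resulting state — on every group member.
#[derive(Default, Debug, PartialEq)]
struct KvMachine {
    state: BTreeMap<String, u64>,
}

impl KvMachine {
    fn apply(&mut self, cmd: &Command) {
        match cmd {
            Command::Put(k, v) => { self.state.insert(k.clone(), *v); }
            Command::Delete(k) => { self.state.remove(k); }
        }
    }
}

// Replaying the shared log is how any member (or late joiner)
// reconstructs the group state.
fn replay(log: &[Command]) -> KvMachine {
    let mut m = KvMachine::default();
    for cmd in log {
        m.apply(cmd);
    }
    m
}

fn main() {
    let log = vec![
        Command::Put("a".into(), 1),
        Command::Put("b".into(), 2),
        Command::Delete("a".into()),
    ];
    // Two "members" replaying the same log converge to identical state.
    assert_eq!(replay(&log), replay(&log));
}
```

Anything non-deterministic (wall-clock reads, random numbers, iteration over unordered maps) inside `apply` would break this guarantee, which is why state machine logic must depend only on the command and the current state.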

Collections

Higher-level replicated data structures built on Groups. Each collection (Map, Vec, Set, Register, Once, PriorityQueue) creates its own Raft group with a specialized state machine.

Design Decisions

Trusted Network Assumption

Mosaik is not Byzantine fault tolerant. All nodes are assumed to be honest and correctly implementing the protocol. This assumption enables:

  • Simpler consensus (no need for 2/3 supermajority)
  • Higher throughput (fewer message rounds)
  • Simplified state sync (no fraud proofs needed)

This makes mosaik ideal for infrastructure controlled by a single organization, such as L2 chains, internal microservices, or distributed compute clusters.

Built on iroh

Mosaik uses iroh for its networking layer, which provides:

  • QUIC transport — multiplexed, encrypted connections
  • Relay servers — NAT traversal for nodes behind firewalls
  • mDNS — optional local network discovery
  • Endpoint identity — public keys as node identifiers

Installation

Requirements

  • Rust ≥ 1.89 (edition 2024)
  • A Unix-like OS (Linux, macOS) or Windows

Add to Your Project

Add mosaik to your Cargo.toml:

[dependencies]
mosaik = "0.2"

Mosaik pulls in its core dependencies automatically, including:

  • tokio — async runtime (full features)
  • iroh — QUIC-based P2P networking
  • serde — serialization framework
  • futures — Stream and Sink traits

Common Additional Dependencies

Most mosaik applications will also want:

[dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
futures = "0.3"
anyhow = "1"
  • tokio — you need the tokio runtime to run mosaik
  • serde with derive — for #[derive(Serialize, Deserialize)] on your data types
  • futures — for StreamExt / SinkExt traits on consumers and producers

Verify Installation

Create a minimal test to verify everything links correctly:

use mosaik::{Network, NetworkId};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let network = Network::new("test-network".into()).await?;
    println!("Node online: {}", network.local().id());
    Ok(())
}
Run it with:

cargo run

If you see a node ID printed, mosaik is installed and working.

Feature Flags

Mosaik currently does not define any optional feature flags. All functionality is included by default.

Quick Start

This guide walks through a complete example: two nodes that discover each other and exchange typed data through a stream.

Step 1: Create the Network

Every mosaik application starts by creating a Network. The NetworkId ensures only nodes on the same logical network can communicate.

use mosaik::{Network, NetworkId};

let network_id: NetworkId = "my-app".into();
let network = Network::new(network_id).await?;

Network::new() creates a node with default settings:

  • A random secret key (new identity each run — this is the recommended default)
  • Default relay mode (uses iroh’s relay servers for NAT traversal)
  • No bootstrap peers (fine for local testing)
  • No tags

For production use, you’ll want Network::builder() to configure bootstrap peers so the node can find the network. A fixed secret key is only needed for bootstrap nodes that need a stable peer ID — regular nodes work fine with the auto-generated key. See the Network chapter.

Step 2: Define Your Data Type

Streams in mosaik are typed. Any type implementing Serialize + DeserializeOwned + Send + 'static automatically implements the Datum trait and can be streamed:

use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct SensorReading {
    sensor_id: u64,
    temperature: f64,
    timestamp: u64,
}

Step 3: Create a Producer

A producer publishes data for consumers across the network. The stream ID is derived from the type name by default.

let producer = network.streams().produce::<SensorReading>();

// Wait until at least one consumer connects
producer.when().subscribed().await;

Step 4: Send Data

Producer implements the futures::Sink trait. You can use SinkExt::send() for awaitable sends:

use futures::SinkExt;

producer.send(SensorReading {
    sensor_id: 1,
    temperature: 22.5,
    timestamp: 1700000000,
}).await?;

For non-blocking sends, use try_send():

producer.try_send(SensorReading {
    sensor_id: 1,
    temperature: 23.1,
    timestamp: 1700000001,
})?;

Step 5: Create a Consumer (On Another Node)

On a different node (or the same node for testing), create a consumer for the same type:

let other_network = Network::new(network_id).await?;

let mut consumer = other_network.streams().consume::<SensorReading>();

// Wait for subscription to a producer
consumer.when().subscribed().await;

Step 6: Receive Data

Consumer implements futures::Stream. Use StreamExt::next() to receive:

use futures::StreamExt;

while let Some(reading) = consumer.next().await {
    println!("Sensor {}: {}°C", reading.sensor_id, reading.temperature);
}

Or use the direct recv() method:

if let Some(reading) = consumer.recv().await {
    println!("Got reading: {:?}", reading);
}

Step 7: Discovery (Connecting the Nodes)

For nodes to find each other, they need at least one common peer address. In local testing, you can trigger manual discovery:

// On the consumer's network, dial the producer's address
other_network.discovery().sync_with(network.local().addr()).await?;

In production, you’ll configure bootstrap peers:

use mosaik::discovery;

let network = Network::builder(network_id)
    .with_discovery(
        discovery::Config::builder()
            .with_bootstrap(bootstrap_addr)
    )
    .build()
    .await?;

Putting It All Together

Here’s the complete example as two async tasks simulating two nodes:

use futures::{SinkExt, StreamExt};
use mosaik::{Network, NetworkId};
use serde::{Deserialize, Serialize};
use std::pin::pin;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct SensorReading {
    sensor_id: u64,
    temperature: f64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let network_id: NetworkId = "sensor-network".into();

    // Node 1: Producer
    let node1 = Network::new(network_id).await?;
    let producer = node1.streams().produce::<SensorReading>();

    // Node 2: Consumer
    let node2 = Network::new(network_id).await?;
    let mut consumer = node2.streams().consume::<SensorReading>();

    // Connect the nodes
    node2.discovery().dial(node1.local().addr()).await?;

    // Wait for the stream to be established
    producer.when().online().await;

    // Send data
    for i in 0..10 {
        producer.send(SensorReading {
            sensor_id: 1,
            temperature: 20.0 + i as f64 * 0.5,
        }).await?;
    }

    // Receive data
    for _ in 0..10 {
        let reading = consumer.recv().await.unwrap();
        println!("{:?}", reading);
    }

    Ok(())
}

What’s Next

  • Tutorials — Walk through the included examples
  • Streams — Producer/consumer configuration, filtering, backpressure
  • Groups — Raft consensus and replicated state machines
  • Collections — Distributed Map, Vec, Set, Register, Once, PriorityQueue

Architecture

This chapter describes how mosaik’s subsystems fit together, how protocols are multiplexed, and how the lifecycle of a node is managed.

Subsystem Composition

A Network is the top-level entry point. When built, it creates and composes four subsystems:

Network::builder(network_id)
    │
    ├── LocalNode          (QUIC endpoint, identity, lifecycle)
    ├── Discovery          (gossip announcements, catalog sync)
    ├── Streams            (typed pub/sub channels)
    └── Groups             (Raft consensus groups)
            └── Collections  (replicated Map, Vec, Set, Register, Once, PriorityQueue)

Each subsystem is created during Network::builder().build() and installed as a protocol handler on the iroh Router. The subsystems are then accessible via accessor methods:

let network = Network::builder(network_id).build().await?;

let local     = network.local();       // LocalNode
let discovery = network.discovery();   // Discovery
let streams   = network.streams();     // Streams
let groups    = network.groups();      // Groups

All handles are cheap to clone (they wrap Arc internally).

ALPN-Based Protocol Multiplexing

Mosaik multiplexes multiple protocols over a single QUIC endpoint using ALPN (Application-Layer Protocol Negotiation). Each subsystem registers its own ALPN identifier:

Subsystem              ALPN                    Purpose
Discovery (announce)   /mosaik/announce        Real-time gossip broadcasts
Discovery (sync)       /mosaik/catalog-sync    Full catalog exchange
Streams                /mosaik/streams/1.0     Pub/sub data channels
Groups                 /mosaik/groups/1        Raft consensus, bonds, state sync

When a connection arrives, the iroh router inspects the ALPN and dispatches to the correct subsystem handler. This means all subsystems share the same QUIC endpoint and port.
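Conceptually, the dispatch is a match over the ALPN bytes of the incoming connection. The sketch below uses the ALPN strings from the table above; the Subsystem enum and dispatch function are illustrative, not part of the iroh or mosaik API:

```rust
// Illustrative stand-in for the subsystem handlers the router owns.
#[derive(Debug, PartialEq)]
enum Subsystem {
    Announce,
    CatalogSync,
    Streams,
    Groups,
}

// Route an incoming connection to a handler based on its negotiated ALPN.
fn dispatch(alpn: &[u8]) -> Option<Subsystem> {
    match alpn {
        b"/mosaik/announce" => Some(Subsystem::Announce),
        b"/mosaik/catalog-sync" => Some(Subsystem::CatalogSync),
        b"/mosaik/streams/1.0" => Some(Subsystem::Streams),
        b"/mosaik/groups/1" => Some(Subsystem::Groups),
        _ => None, // unknown protocol: the connection is not handled
    }
}

fn main() {
    assert_eq!(dispatch(b"/mosaik/streams/1.0"), Some(Subsystem::Streams));
    assert_eq!(dispatch(b"/other"), None);
}
```

Because ALPN is negotiated during the TLS handshake, routing happens before any application data is exchanged, so each subsystem only ever sees connections intended for it.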

Subsystems implement the ProtocolProvider trait to install their handlers:

// Internal trait — subsystems implement this
trait ProtocolProvider {
    fn install(self, router: &mut Router);
}

The Protocol trait defines the ALPN for typed links:

pub trait Protocol {
    const ALPN: &'static [u8];
}

Builder Pattern

Network uses a builder pattern for configuration:

let network = Network::builder(network_id)
    .with_secret_key(secret_key)           // Stable identity
    .with_relay_mode(RelayMode::Disabled)  // No relay servers
    .with_mdns_discovery(true)             // Local network discovery
    .with_discovery(
        discovery::Config::builder()
            .with_bootstrap(bootstrap_addr)
            .with_tags("my-role")
    )
    .with_streams(
        streams::Config::builder()
            .with_backoff(ExponentialBackoff::default())
    )
    .with_groups(
        groups::Config::builder()
    )
    .build()
    .await?;

For quick prototyping, Network::new(network_id) uses all defaults.

Lifecycle & Shutdown

Mosaik uses tokio_util::sync::CancellationToken for structured lifecycle management. The token lives in LocalNode and propagates shutdown to all subsystems:

Network::drop()
    │
    ├── cancels LocalNode's CancellationToken
    │       │
    │       ├── Discovery workers shut down
    │       ├── Stream producer/consumer workers shut down
    │       ├── Group bond workers + Raft shut down
    │       └── iroh Router shuts down
    │
    └── all resources released

When a Network is dropped, the cancellation token is triggered, and all background tasks gracefully terminate. Each subsystem’s internal tasks are select-looped against the cancellation token, ensuring no orphaned tasks.

Internal Communication Patterns

Mosaik uses several recurring patterns internally:

Watch Channels

Status changes are broadcast via tokio::sync::watch channels. This enables the when() API:

// Wait for a stream producer to come online
producer.when().online().await;

// Wait for a Raft group leader to be elected
group.when().leader_elected().await;

// Wait for a collection to reach a specific version
collection.when().reaches(version).await;

Arc<Inner> Pattern

All public handles (Network, Discovery, Streams, Groups, Group, Producer, Consumer, Map, Vec, Register, Once, etc.) are cheap to clone. They wrap an Arc<Inner> containing the actual state, making them safe to share across tasks.
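The pattern can be sketched in a few lines of plain Rust (Handle and Inner are illustrative names, not mosaik types):

```rust
use std::sync::Arc;

// The shared state lives in Inner; it is allocated once.
#[derive(Debug)]
struct Inner {
    name: String,
}

// The public handle wraps Arc<Inner>, so Clone only bumps a
// reference count — no deep copy of the state.
#[derive(Clone, Debug)]
struct Handle {
    inner: Arc<Inner>,
}

fn main() {
    let h1 = Handle { inner: Arc::new(Inner { name: "network".into() }) };
    let h2 = h1.clone();

    // Both handles point at the same allocation.
    assert_eq!(Arc::strong_count(&h1.inner), 2);
    assert_eq!(h1.inner.name, h2.inner.name);
}
```

This is why it is idiomatic to clone a mosaik handle into each spawned task rather than wrapping it in another layer of Arc or Mutex.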

Task-Per-Connection

Each consumer subscription and each producer-subscriber pair runs in its own tokio task. This avoids head-of-line blocking — a slow consumer doesn’t affect other consumers, and a slow subscriber doesn’t block the producer.

Identity & Networking

Mosaik’s identity system is built on cryptographic keys and content-addressed hashing. Every identifier in the system — networks, peers, streams, groups, collections — is a 32-byte Digest (blake3 hash).

UniqueId: The Universal Identifier

At the core of mosaik’s identity system is Digest — a 32-byte blake3 hash. UniqueId, an alias for Digest, is the general-purpose identifier type used throughout the API:

use mosaik::UniqueId;

// From a string (hashes the string if not valid hex)
let id: UniqueId = "my-network".into();

// From raw bytes
let id = UniqueId::from_bytes([0u8; 32]);

// Random
let id = UniqueId::random();

// Compile-time constants via the unique_id! macro
use mosaik::unique_id;

// From a 64-char hex string (decoded directly):
const HEX_ID: UniqueId = unique_id!(
    "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
);

// From any arbitrary string (blake3-hashed at compile time):
const NAMED_ID: UniqueId = unique_id!("my-stream-name");

// Deterministic derivation
let derived = id.derive("sub-identifier");

All the following types are aliases for Digest:

Type        Alias For   Identifies
UniqueId    Digest      General-purpose unique identifier
NetworkId   UniqueId    A mosaik network (derived from name)
Tag         UniqueId    A capability or role label
StreamId    UniqueId    A data stream (derived from type name)
GroupId     UniqueId    A consensus group (derived from key + config)
StoreId     UniqueId    A replicated collection instance

PeerId: Node Identity

A PeerId is the node’s public key, derived from its secret key. It’s globally unique across all mosaik networks.

use mosaik::{Network, PeerId};
use iroh::SecretKey;

// Random identity (default when using Network::new)
let network = Network::new(network_id).await?;
let my_id: &PeerId = &network.local().id();

// Stable identity via explicit secret key
let secret = SecretKey::generate(&mut rand::rng());
let network = Network::builder(network_id)
    .with_secret_key(secret)
    .build()
    .await?;

For bootstrap nodes and other long-lived infrastructure, you should use a fixed secret key so the node’s PeerId (and therefore its address) remains stable across restarts.

NetworkId: Network Isolation

A NetworkId is a Digest derived from a name string. Nodes can only connect to peers sharing the same NetworkId:

use mosaik::NetworkId;

// These produce the same NetworkId
let id1: NetworkId = "my-app".into();
let id2: NetworkId = "my-app".into();
assert_eq!(id1, id2);

// Different name → different network → can't communicate
let other: NetworkId = "other-app".into();
assert_ne!(id1, other);

The NetworkId also drives automatic peer discovery: nodes sharing the same NetworkId find each other through the Mainline DHT without requiring any hardcoded bootstrap peers. Simply using the same network name is enough for nodes to connect.

Tags: Capability Labels

Tags are Digest values used to describe a node’s role or capabilities:

use mosaik::Tag;

let tag: Tag = "matcher".into();
let another: Tag = "validator".into();

Tags are advertised through the discovery system and can be used to filter which peers a producer accepts or which producers a consumer subscribes to:

// Only accept consumers that have the "authorized" tag
let producer = network.streams().producer::<Order>()
    .accept_if(|peer| peer.tags.contains(&"authorized".into()))
    .build()?;

StreamId: Stream Identity

By default, a StreamId is derived from the Rust type name:

use mosaik::StreamId;

// Automatically derived from the type name
let producer = network.streams().produce::<SensorReading>();

// Or set explicitly
let producer = network.streams().producer::<SensorReading>()
    .with_stream_id("custom-stream-name")
    .build()?;

GroupId: Group Identity

A GroupId is deterministically derived from multiple inputs:

GroupId = hash(
    GroupKey,
    ConsensusConfig,
    StateMachine::signature(),
    StateSync::signature()
)

This ensures that nodes with different configurations, different state machines, or different group secrets cannot accidentally join the same group.
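The derivation can be sketched in plain Rust. Mosaik hashes the real key, config, and signatures with blake3; the sketch below substitutes std's DefaultHasher purely to show the structure and the "any input differs → different group" property:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: stands in for the blake3-based GroupId derivation.
// All four inputs feed the hash, so changing any one of them changes
// the resulting group identity.
fn derive_group_id(
    group_key: &str,
    consensus_config: &str,
    state_machine_sig: &str,
    state_sync_sig: &str,
) -> u64 {
    let mut h = DefaultHasher::new();
    group_key.hash(&mut h);
    consensus_config.hash(&mut h);
    state_machine_sig.hash(&mut h);
    state_sync_sig.hash(&mut h);
    h.finish()
}

fn main() {
    // Same inputs on two nodes → same GroupId.
    let a = derive_group_id("secret-k", "cfg-v1", "OrderBook", "LogReplay");
    let b = derive_group_id("secret-k", "cfg-v1", "OrderBook", "LogReplay");
    assert_eq!(a, b);

    // A differing config → a different group, joined by neither node by accident.
    let c = derive_group_id("secret-k", "cfg-v2", "OrderBook", "LogReplay");
    assert_ne!(a, c);
}
```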

Endpoint Addresses

Nodes are addressed using EndpointAddr from iroh, which encodes the public key and optional relay URL. This is what you pass to bootstrap peers:

let addr = network.local().addr();
// addr contains: PeerId + relay URL + direct addresses

Self-Organization

One of mosaik’s defining features is that nodes self-organize into the correct topology without manual configuration. This chapter explains how that works step by step.

The Self-Organization Loop

When a new node joins the network, the following sequence happens automatically:

1. Bootstrap       Node connects to a known bootstrap peer
                       │
2. Gossip          Announce protocol broadcasts presence
                       │
3. Catalog Sync    Full catalog exchange with bootstrap peer
                       │
4. Discovery       Node learns about all other peers, their
                   tags, streams, and groups
                       │
5. Streams         Consumer discovers matching producers,
                   opens subscriptions automatically
                       │
6. Groups          Node finds peers with matching group keys,
                   forms bonds, joins Raft cluster
                       │
7. Convergence     Network reaches a stable topology where
                   all nodes are connected to the right peers

Step 1: Bootstrap & Gossip

A new node starts with at least one bootstrap peer address. It connects and begins participating in the gossip protocol:

let network = Network::builder(network_id)
    .with_discovery(
        discovery::Config::builder()
            .with_bootstrap(bootstrap_addr)
            .with_tags("matcher")
    )
    .build()
    .await?;

The node immediately:

  • Announces itself via the gossip protocol (/mosaik/announce), broadcasting its PeerEntry (identity, tags, streams, groups)
  • Receives announcements from other peers
  • Triggers a full catalog sync (/mosaik/catalog-sync) with the bootstrap peer to catch up on all known peers

Step 2: Catalog Convergence

The catalog converges through two complementary protocols:

Real-time: Gossip Announcements

Every node periodically re-announces its PeerEntry via iroh-gossip. The announce interval is configurable (default: 15 seconds) with jitter to avoid thundering herds:

announce_interval = 15s
announce_jitter = 0.5  →  actual interval: 7.5s – 22.5s

When a node changes (adds a tag, creates a stream, joins a group), it re-announces immediately.
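Reading the jitter as a factor applied symmetrically around the base interval, the bounds above follow directly. A minimal sketch (jitter_bounds is an illustrative helper, not mosaik API):

```rust
// With jitter j, the actual interval is drawn uniformly from
// [base * (1 - j), base * (1 + j)].
fn jitter_bounds(base_secs: f64, jitter: f64) -> (f64, f64) {
    (base_secs * (1.0 - jitter), base_secs * (1.0 + jitter))
}

fn main() {
    // The defaults from above: 15s base, 0.5 jitter → 7.5s – 22.5s.
    let (lo, hi) = jitter_bounds(15.0, 0.5);
    assert_eq!((lo, hi), (7.5, 22.5));
}
```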

Catch-up: Full Catalog Sync

When a new node connects, it performs a bidirectional catalog sync with its peer. Both nodes exchange their complete catalogs, and entries are merged:

Node A                          Node B
  │                               │
  │── CatalogSyncRequest ───────► │
  │                               │
  │◄── CatalogSyncResponse ──────│
  │                               │
  │  (both merge received         │
  │   entries into local catalog) │

Signed Entries

Each PeerEntry is cryptographically signed by its owner. This proves authenticity — you can trust that a peer entry’s tags, streams, and groups are genuine, even when received via gossip through intermediaries.

Staleness & Purging

Entries that haven’t been updated within the purge_after duration (default: 300 seconds) are considered stale and hidden from the public catalog API. This ensures departed nodes are eventually removed.
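The purge rule reduces to a single comparison. A sketch with an illustrative is_stale helper (mosaik's internal representation may differ):

```rust
use std::time::Duration;

// An entry is stale once the time since its last announcement
// exceeds the configured purge_after duration.
fn is_stale(since_last_update: Duration, purge_after: Duration) -> bool {
    since_last_update > purge_after
}

fn main() {
    let purge_after = Duration::from_secs(300); // the default from above

    // A recently announced peer is visible in the catalog...
    assert!(!is_stale(Duration::from_secs(15), purge_after));

    // ...while a departed peer drops out after the window elapses.
    assert!(is_stale(Duration::from_secs(301), purge_after));
}
```

Note that 300 seconds comfortably covers many missed announce intervals (even at the jittered maximum of 22.5s), so transient network hiccups don't cause healthy peers to be purged.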

Step 3: Automatic Stream Connections

Once discovery populates the catalog, the Streams subsystem automatically connects producers and consumers:

1. Node A creates Producer<Order>
   → Discovery advertises: "I produce stream 'Order'"

2. Node B creates Consumer<Order>
   → Discovery observes: "Node A produces 'Order'"
   → Consumer worker opens subscription to Node A

3. Data flows: Node A ──[Order]──► Node B

This is fully automatic. The consumer’s background worker monitors the catalog for matching producers and establishes connections as they appear.
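The matching step the worker performs can be modeled with plain Rust. The PeerEntry shape here is simplified and illustrative — the real catalog entries are signed and carry tags and groups as well:

```rust
// Simplified catalog entry: a peer and the stream ids it produces.
#[derive(Debug, PartialEq)]
struct PeerEntry {
    peer: &'static str,
    streams: Vec<&'static str>,
}

// The consumer worker's core question: which catalog entries
// advertise the stream id I want to consume?
fn matching_producers<'a>(catalog: &'a [PeerEntry], stream_id: &str) -> Vec<&'a str> {
    catalog
        .iter()
        .filter(|e| e.streams.iter().any(|s| *s == stream_id))
        .map(|e| e.peer)
        .collect()
}

fn main() {
    let catalog = vec![
        PeerEntry { peer: "node-a", streams: vec!["Order"] },
        PeerEntry { peer: "node-b", streams: vec!["Fill"] },
    ];
    // A Consumer<Order> would open a subscription to node-a only.
    assert_eq!(matching_producers(&catalog, "Order"), vec!["node-a"]);
}
```

In the real system this scan re-runs as catalog updates arrive, so producers that appear later are picked up automatically.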

Filtering can restrict which connections form:

// Producer only accepts nodes tagged "authorized"
let producer = network.streams().producer::<Order>()
    .accept_if(|peer| peer.tags.contains(&"authorized".into()))
    .build()?;

// Consumer only subscribes to nodes tagged "primary"
let consumer = network.streams().consumer::<Order>()
    .subscribe_if(|peer| peer.tags.contains(&"primary".into()))
    .build();

Step 4: Automatic Group Formation

Groups form through a similar discovery-driven process:

1. Node A joins group with key K
   → Discovery advertises: "I'm in group G" (where G = hash(K, config, ...))

2. Node B joins group with same key K
   → Discovery observes: "Node A is in group G"
   → Bond worker opens connection to Node A

3. Mutual handshake proves both know key K
   → Bond established

4. Raft consensus begins
   → Leader elected among bonded peers
   → Commands can be replicated

The bond handshake uses HMAC over the TLS session secrets combined with the group key. This proves knowledge of the group secret without transmitting it.

Step 5: Late Joiners

A node joining an already-running network catches up automatically:

Stream Catch-up

Consumers connecting to an active producer receive data from the point of subscription. There’s no historical replay — streams are real-time.

Group Catch-up

A node joining an existing Raft group:

  1. Forms bonds with existing members
  2. Receives the current log from peers (distributed across multiple peers for efficiency)
  3. Applies all log entries to bring its state machine up to date
  4. Begins participating normally (can vote, can become leader)

For collections, catch-up can use either log replay or snapshot sync:

  • Log replay (default): Replays the entire log from the beginning
  • Snapshot sync: Transfers a point-in-time snapshot of the state, avoiding log replay for large states
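Whichever path is used, the late joiner must end up in the same state. A toy model of the equivalence (illustrative types, not mosaik's API):

```rust
use std::collections::BTreeMap;

// Toy replicated state: a map. The log is a sequence of writes.
type State = BTreeMap<String, u64>;

// Log replay: apply every entry from the beginning.
fn replay_log(log: &[(String, u64)]) -> State {
    let mut s = State::new();
    for (k, v) in log {
        s.insert(k.clone(), *v);
    }
    s
}

// Snapshot sync: install a point-in-time copy of the state directly,
// skipping replay entirely.
fn install_snapshot(snapshot: State) -> State {
    snapshot
}

fn main() {
    let log = vec![
        ("a".to_string(), 1),
        ("a".to_string(), 2), // later write overwrites the first
        ("b".to_string(), 3),
    ];
    let replayed = replay_log(&log);
    let snapshotted = install_snapshot(replayed.clone());

    // Both catch-up paths yield the same state; snapshot sync just
    // avoids the O(log length) replay cost for large histories.
    assert_eq!(replayed, snapshotted);
}
```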

Topology Example

Consider a distributed trading system with three roles:

Tags: "trader"          Tags: "matcher"         Tags: "reporter"
┌──────────┐            ┌──────────┐            ┌──────────┐
│  Node 1  │            │  Node 3  │            │  Node 5  │
│ Producer │──[Order]──►│ Consumer │            │ Consumer │
│ <Order>  │            │ <Order>  │            │  <Fill>  │
└──────────┘            │          │            └──────────┘
                        │ Group:   │──[Fill]──►      ▲
┌──────────┐            │ OrderBook│            ┌──────────┐
│  Node 2  │            │ (Raft)   │            │  Node 6  │
│ Producer │──[Order]──►│          │            │ Consumer │
│ <Order>  │            │ Producer │──[Fill]──►│  <Fill>  │
└──────────┘            │  <Fill>  │            └──────────┘
                        └──────────┘
                        ┌──────────┐
                        │  Node 4  │
                        │ (matcher │
                        │  replica)│
                        └──────────┘

All of this topology forms automatically from:

  • Node 1–2: with_tags("trader"), creates Producer<Order>
  • Node 3–4: with_tags("matcher"), creates Consumer<Order>, joins orderbook group, creates Producer<Fill>
  • Node 5–6: with_tags("reporter"), creates Consumer<Fill>

No node needs to know the addresses of any other node except one bootstrap peer.

Building a Bootstrap Node

This tutorial walks through the bootstrap example — a ready-to-use bootstrap node for any mosaik network.

What Is a Bootstrap Node?

A bootstrap node is the first peer that other nodes connect to when joining a network. It serves as the initial discovery point — once a new node connects to a bootstrap peer, the gossip protocol takes over and the joining node learns about all other peers.

Bootstrap nodes are typically:

  • Long-lived — they run continuously
  • Stable identity — they use a fixed secret key so their address doesn’t change across restarts
  • Well-known — their address is configured as a bootstrap peer by other nodes

A bootstrap node doesn’t need any special code — it’s just a regular mosaik node that stays online. The bootstrap example can be used in production as a universal bootstrap node for any mosaik network. The example adds CLI configuration with clap.

Project Setup

The bootstrap example is a single file at examples/bootstrap.rs. It uses these dependencies:

[dependencies]
mosaik = "0.2"
tokio = { version = "1", features = ["full"] }
clap = { version = "4", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = "0.3"
anyhow = "1"

The CLI Interface

The example uses clap derive to define command-line options:

use clap::{ArgAction, Parser};
use mosaik::*;

#[derive(Debug, Parser)]
struct Opts {
    /// The secret key for stable identity across restarts
    #[clap(short, long, env = "MOSAIK_BOOTSTRAP_SECRET")]
    secret: Option<SecretKey>,

    /// The network ID (hex string or seed that gets hashed)
    #[clap(short, long, env = "MOSAIK_BOOTSTRAP_NETWORK_ID")]
    network_id: Option<NetworkId>,

    /// Other bootstrap nodes to connect to on startup
    #[clap(short, long, env = "MOSAIK_BOOTSTRAP_PEERS")]
    peers: Vec<PeerId>,

    /// Tags to advertise (default: "bootstrap")
    #[clap(short, long, default_value = "bootstrap",
           env = "MOSAIK_BOOTSTRAP_TAGS")]
    tags: Vec<Tag>,

    /// Disable relay servers (node must be directly reachable)
    #[clap(long, default_value_t = false)]
    no_relay: bool,

    /// Verbose output (-v debug, -vv trace)
    #[clap(short, action = ArgAction::Count)]
    verbose: u8,

    /// Suppress all output
    #[clap(short, long)]
    quiet: bool,
}

Every option also supports environment variables (MOSAIK_BOOTSTRAP_*), making it easy to configure in containers or systemd services.

Secret Key Handling

The secret key determines the node’s PeerId. For a bootstrap node, a stable identity is essential so that other nodes can reliably find it:

use std::convert::Infallible;

fn parse_secret_key(s: &str) -> Result<SecretKey, Infallible> {
    // Digest::from uses a 64-char hex string directly and hashes
    // any other input with blake3 into a deterministic digest.
    let bytes = Digest::from(s);
    Ok(SecretKey::from_bytes(bytes.as_bytes()))
}

This parser accepts two formats:

  • 64-character hex string — used directly as the secret key bytes
  • Any other string — treated as a seed, hashed with blake3 into a deterministic key

So --secret=my-bootstrap-1 always produces the same key, making deployment reproducible.
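The seed-hashing idea can be sketched without mosaik at all. The toy below derives 32 deterministic key bytes from a seed string, using std's SipHash in place of blake3 (which mosaik actually uses) — the shape of the derivation, not the hash, is the point:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Derive 32 key bytes deterministically from a seed string.
/// Illustrative only: mosaik hashes with blake3; here we stretch
/// std's SipHash output to 32 bytes to show the same shape.
fn derive_key_bytes(seed: &str) -> [u8; 32] {
    let mut out = [0u8; 32];
    for (i, chunk) in out.chunks_mut(8).enumerate() {
        let mut h = DefaultHasher::new();
        seed.hash(&mut h);
        i.hash(&mut h); // domain-separate each 8-byte block
        chunk.copy_from_slice(&h.finish().to_le_bytes());
    }
    out
}

fn main() {
    // Same seed -> same key, so the deployment is reproducible.
    assert_eq!(derive_key_bytes("my-bootstrap-1"), derive_key_bytes("my-bootstrap-1"));
    assert_ne!(derive_key_bytes("my-bootstrap-1"), derive_key_bytes("my-bootstrap-2"));
}
```

Any fixed hash works for this property; blake3 is simply fast and cryptographically strong.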

Building the Network

The main function assembles the Network using the builder:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let opts = Opts::parse();

    let secret = opts.secret.unwrap_or_else(|| {
        tracing::warn!("No secret key provided, generating random key");
        SecretKey::generate(&mut rand::rng())
    });

    let network_id = opts.network_id.unwrap_or_else(|| {
        tracing::warn!("No network id provided, generating random network id");
        NetworkId::random()
    });

    let mut builder = Network::builder(network_id)
        .with_secret_key(secret)
        .with_discovery(
            discovery::Config::builder()
                .with_tags(opts.tags.clone())
                .with_bootstrap(opts.peers.clone()),
        );

    if opts.no_relay {
        builder = builder.with_relay_mode(iroh::RelayMode::Disabled);
    }

    let network = builder.build().await?;

    tracing::info!("Bootstrap node started");
    tracing::info!("Public Id: {}", network.local().id());
    tracing::info!("Network Id: {:?}", network.network_id());

    // Stay alive forever
    core::future::pending::<()>().await;
    Ok(())
}

Key points:

  1. with_secret_key() — sets the stable identity
  2. with_discovery() — configures tags and initial peers to dial
  3. with_relay_mode(Disabled) — optional, for nodes with direct connectivity
  4. core::future::pending() — keeps the process alive (the node runs in background tasks)

Running It

# Basic usage with a seed-based secret
cargo run --example bootstrap -- \
    --network-id=my-network \
    --secret=my-bootstrap-secret

# With environment variables
MOSAIK_BOOTSTRAP_SECRET=my-secret \
MOSAIK_BOOTSTRAP_NETWORK_ID=my-network \
cargo run --example bootstrap

# Multiple bootstrap nodes that know each other
cargo run --example bootstrap -- \
    --network-id=my-network \
    --secret=node-1 \
    --peers=<peer-id-of-node-2>

Using the Bootstrap Node

Other nodes reference the bootstrap node’s address when joining the network:

let network = Network::builder(network_id)
    .with_discovery(
        discovery::Config::builder()
            .with_bootstrap(bootstrap_addr)
    )
    .build()
    .await?;

The joining node connects to the bootstrap peer, performs a full catalog sync, and then discovers all other nodes through gossip. From that point on, the joining node is a full participant — it doesn’t need the bootstrap node for ongoing operation.

Key Takeaways

  1. Bootstrap nodes are just regular nodes — no special server code needed
  2. Stable identity via secret key — ensures the address doesn’t change across restarts
  3. Tags for discoverability — the "bootstrap" tag lets other nodes identify bootstrap peers
  4. Minimal configuration — a secret key and network ID are all that’s required

Building a Distributed Orderbook

This tutorial walks through the orderbook example — a distributed order-matching engine that combines Streams, Groups, and Raft consensus.

Architecture Overview

The orderbook example demonstrates how multiple mosaik subsystems compose to build a realistic distributed system:

         Traders                  Matchers                 Observers
    ┌────────────┐          ┌──────────────────┐      ┌────────────┐
    │  Trader A  │          │    Matcher 0     │      │  Observer  │
    │  Producer  │─[Order]─►│    Consumer      │      │  Consumer  │
    │  <Order>   │          │    <Order>       │      │  <Fill>    │
    └────────────┘          │                  │      └────────────┘
                            │  ┌────────────┐  │           ▲
    ┌────────────┐          │  │  OrderBook │  │           │
    │  Trader B  │          │  │  (Raft SM) │  │──[Fill]───┘
    │  Producer  │─[Order]─►│  │            │  │
    │  <Order>   │          │  └────────────┘  │
    └────────────┘          │    Producer      │
                            │    <Fill>        │
                            └──────────────────┘
                            ┌──────────────────┐
                            │    Matcher 1     │
                            │    (replica)     │
                            └──────────────────┘
                            ┌──────────────────┐
                            │    Matcher 2     │
                            │    (replica)     │
                            └──────────────────┘
  1. Traders produce Order objects via Streams
  2. Matchers (a 3-node Raft group) consume orders, replicate them through consensus, and run a price-time priority matching engine
  3. Fill events are published back as a stream for downstream consumers

Step 1: Define the Domain Types

First, define the types that flow through the system. These types auto-implement the Datum trait for streaming:

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TradingPair {
    pub base: String,
    pub quote: String,
}

impl TradingPair {
    pub fn new(base: &str, quote: &str) -> Self {
        Self { base: base.to_string(), quote: quote.to_string() }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Side { Bid, Ask }

/// Price in hundredths of the quote currency (1 unit = 0.01). e.g., 300_000 = $3000.00
pub type Price = u64;
pub type Quantity = u64;

/// A limit order — streamable via mosaik Streams
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Order {
    pub id: u64,
    pub pair: TradingPair,
    pub side: Side,
    pub price: Price,
    pub quantity: Quantity,
    pub trader: String,
}

/// A fill produced when orders match
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Fill {
    pub bid_order_id: u64,
    pub ask_order_id: u64,
    pub pair: TradingPair,
    pub price: Price,
    pub quantity: Quantity,
}

Because Order and Fill derive Serialize + Deserialize, they automatically implement Datum and can be used with mosaik Streams.
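Since Price is fixed-point in hundredths, rendering it for display is integer division plus a zero-padded remainder. A small helper (not part of the example) illustrates:

```rust
/// Render a fixed-point price stored in hundredths (1 unit = 0.01).
fn format_price(p: u64) -> String {
    format!("{}.{:02}", p / 100, p % 100)
}

fn main() {
    // 300_000 hundredths of USDC renders as "3000.00"
    assert_eq!(format_price(300_000), "3000.00");
    assert_eq!(format_price(299_005), "2990.05");
}
```

Keeping prices as integers also keeps the matching engine deterministic — floating-point arithmetic could diverge across replicas.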

Step 2: Implement the State Machine

The heart of the example is the OrderBook state machine. It implements the StateMachine trait so it can be replicated across all group members via Raft:

use mosaik::groups::{ApplyContext, LogReplaySync, StateMachine};
use mosaik::primitives::UniqueId;
use std::collections::BTreeMap;

/// Commands that mutate the orderbook
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderBookCommand {
    PlaceOrder(Order),
    CancelOrder(u64),
}

/// Read-only queries against the orderbook state
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderBookQuery {
    TopOfBook { pair: TradingPair, depth: usize },
    Fills,
    OrderCount,
}

/// Query results
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderBookQueryResult {
    TopOfBook { bids: Vec<(Price, Quantity)>, asks: Vec<(Price, Quantity)> },
    Fills(Vec<Fill>),
    OrderCount(usize),
}

The StateMachine trait requires four associated types:

  • Command — mutations that get replicated through Raft (must be serializable)
  • Query — read requests (not replicated)
  • QueryResult — responses to queries
  • StateSync — mechanism for catching up new nodes

And three core methods:

impl StateMachine for OrderBook {
    type Command = OrderBookCommand;
    type Query = OrderBookQuery;
    type QueryResult = OrderBookQueryResult;
    type StateSync = LogReplaySync<Self>;

    fn signature(&self) -> UniqueId {
        // Unique identifier for this state machine type.
        // Contributes to GroupId derivation — different state machines
        // produce different GroupIds even with the same key.
        UniqueId::from("orderbook_state_machine")
    }

    fn apply(&mut self, command: Self::Command, _ctx: &dyn ApplyContext) {
        match command {
            OrderBookCommand::PlaceOrder(order) => self.place_order(order),
            OrderBookCommand::CancelOrder(id) => self.cancel_order(id),
        }
    }

    fn query(&self, query: Self::Query) -> Self::QueryResult {
        match query {
            OrderBookQuery::TopOfBook { pair: _, depth } => {
                // Return the top `depth` price levels on each side,
                // summing resting quantity per level (bids highest first).
                // Assumes bids/asks: BTreeMap<Price, Vec<(id, qty, trader)>>.
                OrderBookQueryResult::TopOfBook {
                    bids: self.bids.iter().rev().take(depth)
                        .map(|(&p, lvl)| (p, lvl.iter().map(|(_, q, _)| q).sum()))
                        .collect(),
                    asks: self.asks.iter().take(depth)
                        .map(|(&p, lvl)| (p, lvl.iter().map(|(_, q, _)| q).sum()))
                        .collect(),
                }
            }
            OrderBookQuery::Fills => {
                OrderBookQueryResult::Fills(self.fills.clone())
            }
            OrderBookQuery::OrderCount => {
                let count = self.bids.values()
                    .chain(self.asks.values())
                    .map(Vec::len).sum();
                OrderBookQueryResult::OrderCount(count)
            }
        }
    }

    fn state_sync(&self) -> Self::StateSync {
        LogReplaySync::default()
    }
}

Key points:

  • apply() is called on every node in the same order — this is what Raft guarantees. The matching logic must be deterministic.
  • query() reads local state without going through Raft. With Consistency::Strong, the query is forwarded to the leader.
  • LogReplaySync::default() means new nodes catch up by replaying the entire command log from the beginning.
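The reason log replay suffices for catch-up: a deterministic state machine applied to the same command sequence always reaches the same state. A toy sketch of that invariant (not mosaik's StateMachine API):

```rust
/// Toy deterministic state machine: a running balance.
#[derive(Default, PartialEq, Debug)]
struct Balance(i64);

enum Cmd { Deposit(i64), Withdraw(i64) }

impl Balance {
    fn apply(&mut self, cmd: &Cmd) {
        match cmd {
            Cmd::Deposit(n) => self.0 += n,
            Cmd::Withdraw(n) => self.0 -= n,
        }
    }
}

/// Replay a command log from an empty state -- what a late-joining
/// replica effectively does under LogReplaySync.
fn replay(log: &[Cmd]) -> Balance {
    let mut sm = Balance::default();
    for cmd in log { sm.apply(cmd); }
    sm
}

fn main() {
    let log = [Cmd::Deposit(10), Cmd::Withdraw(3), Cmd::Deposit(5)];
    // Two independent replays converge to the identical state.
    assert_eq!(replay(&log), replay(&log));
    assert_eq!(replay(&log).0, 12);
}
```

This is also why `apply()` must avoid nondeterminism such as wall-clock time or random numbers — two replicas replaying the same log would diverge.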

Step 3: The Matching Engine

The OrderBook implements price-time priority matching. When a new order arrives, it crosses against the opposite side:

impl OrderBook {
    fn match_order(&mut self, order: &Order) -> Quantity {
        let mut remaining = order.quantity;

        match order.side {
            Side::Bid => {
                // Bids match against asks at or below the bid price
                while remaining > 0 {
                    let Some((&ask_price, _)) = self.asks.first_key_value()
                        else { break };
                    if ask_price > order.price { break; }

                    let ask_level = self.asks.get_mut(&ask_price).unwrap();
                    while remaining > 0 && !ask_level.is_empty() {
                        let (ask_id, ask_qty, _) = &mut ask_level[0];
                        let fill_qty = remaining.min(*ask_qty);

                        self.fills.push(Fill {
                            bid_order_id: order.id,
                            ask_order_id: *ask_id,
                            pair: self.pair.clone(),
                            price: ask_price,
                            quantity: fill_qty,
                        });

                        remaining -= fill_qty;
                        *ask_qty -= fill_qty;
                        if *ask_qty == 0 { ask_level.remove(0); }
                    }
                    if ask_level.is_empty() {
                        self.asks.remove(&ask_price);
                    }
                }
            }
            Side::Ask => {
                // Asks match against bids at or above the ask price
                // (mirror logic, iterating bids from highest)
                // ...
            }
        }
        remaining
    }

    fn place_order(&mut self, order: Order) {
        let remaining = self.match_order(&order);
        // Any unfilled quantity rests on the book
        if remaining > 0 {
            let book = match order.side {
                Side::Bid => &mut self.bids,
                Side::Ask => &mut self.asks,
            };
            book.entry(order.price)
                .or_default()
                .push((order.id, remaining, order.trader));
        }
    }
}

Because apply() is called in the same order on every replica (guaranteed by Raft), the matching results are identical across all nodes.

Step 4: Wire It All Together

The main function creates the network, forms the Raft group, and connects streams:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let network_id = NetworkId::random();
    let group_key = GroupKey::random();
    let pair = TradingPair::new("ETH", "USDC");

    // --- 3 matcher nodes forming a Raft group ---
    let matcher0 = Network::new(network_id).await?;
    let matcher1 = Network::new(network_id).await?;
    let matcher2 = Network::new(network_id).await?;

    // Cross-discover all matchers
    discover_all([&matcher0, &matcher1, &matcher2]).await?;

    // Each joins the same group with an OrderBook state machine
    let g0 = matcher0.groups().with_key(group_key)
        .with_state_machine(OrderBook::new(pair.clone()))
        .join();
    let g1 = matcher1.groups().with_key(group_key)
        .with_state_machine(OrderBook::new(pair.clone()))
        .join();
    let g2 = matcher2.groups().with_key(group_key)
        .with_state_machine(OrderBook::new(pair.clone()))
        .join();

    // Wait for consensus to elect a leader
    g0.when().online().await;
    g1.when().online().await;
    g2.when().online().await;

Connecting Traders via Streams

    // --- 2 trader nodes producing orders ---
    let trader_a = Network::new(network_id).await?;
    let trader_b = Network::new(network_id).await?;
    discover_all([&trader_a, &trader_b, &matcher0, &matcher1, &matcher2]).await?;

    // Traders produce Order streams
    let mut orders_a = trader_a.streams().produce::<Order>();
    let mut orders_b = trader_b.streams().produce::<Order>();

    // Matcher consumes orders
    let mut order_consumer = matcher0.streams().consume::<Order>();
    order_consumer.when().subscribed().minimum_of(2).await;

Submitting and Matching Orders

    // Trader A: asks (selling ETH)
    orders_a.send(Order {
        id: 1, pair: pair.clone(), side: Side::Ask,
        price: 300_000, quantity: 10, trader: "alice".into(),
    }).await?;

    // Trader B: bids (buying ETH)
    orders_b.send(Order {
        id: 4, pair: pair.clone(), side: Side::Bid,
        price: 300_500, quantity: 15, trader: "bob".into(),
    }).await?;

    // Consume orders from the stream and execute them through Raft.
    // (The full example submits four orders; two sends are elided above.)
    for _ in 0..4 {
        let order = order_consumer.next().await
            .expect("expected order from stream");
        g0.execute(OrderBookCommand::PlaceOrder(order)).await?;
    }

Querying Results

    // Query fills (strong consistency — goes through leader)
    let result = g0.query(OrderBookQuery::Fills, Consistency::Strong).await?;
    if let OrderBookQueryResult::Fills(fills) = result.result() {
        for fill in fills {
            println!("{fill}");
        }
    }

    // Verify replication to followers
    g1.when().committed().reaches(g0.committed()).await;
    let follower_result = g1.query(
        OrderBookQuery::Fills, Consistency::Weak
    ).await?;

Key patterns demonstrated:

  • execute() — sends a command through Raft and waits for it to be committed on a quorum
  • query(..., Strong) — reads through the leader for linearizable results
  • query(..., Weak) — reads local state (faster, but may be stale)
  • when().committed().reaches(n) — waits for a follower to catch up to a specific commit index

The Helper: Cross-Discovery

The example includes a utility to fully connect all nodes:

async fn discover_all(
    networks: impl IntoIterator<Item = &Network>,
) -> anyhow::Result<()> {
    let networks = networks.into_iter().collect::<Vec<_>>();
    for (i, net_i) in networks.iter().enumerate() {
        for (j, net_j) in networks.iter().enumerate() {
            if i != j {
                net_i.discovery().sync_with(net_j.local().addr()).await?;
            }
        }
    }
    Ok(())
}

In production, this pairwise sync is unnecessary — a single bootstrap peer handles discovery via gossip. This utility is for testing where all nodes start simultaneously.

Running the Example

cd examples/orderbook
cargo run

Expected output:

waiting for matching engine group to come online...
matching engine online, leader: <peer-id>
matcher subscribed to both trader order streams
submitting orders...
received order: alice ASK ETH/USDC@300000 qty=10
received order: alice ASK ETH/USDC@301000 qty=5
received order: bob BID ETH/USDC@299000 qty=8
received order: bob BID ETH/USDC@300500 qty=15
1 fills produced:
  Fill(bid=4, ask=1, ETH/USDC@300000, qty=10)
follower g1 sees 1 fills (consistent with leader)
orderbook example complete

Key Takeaways

  1. Streams for data ingestion — orders flow from traders to the matching engine via typed pub/sub
  2. Raft for consensus — the OrderBook state machine runs on all replicas with identical results because Raft guarantees the same command order
  3. execute() for writes, query() for reads — clean separation between mutations (replicated) and reads (local or forwarded)
  4. Automatic catch-up — new replicas replay the command log to reach the current state
  5. Composability — Streams + Groups + StateMachine combine naturally for real distributed applications

Network

The Network is the primary entry point to the mosaik SDK. It creates the QUIC transport layer, establishes node identity, and composes all subsystems (Discovery, Streams, Groups) into a single cohesive runtime.

Creating a Network

Quick Start

For prototyping, Network::new() creates a node with default settings:

use mosaik::{Network, NetworkId};

let network = Network::new("my-app".into()).await?;

This creates a node with:

  • A random secret key (new identity each run)
  • Default relay mode (iroh’s relay servers for NAT traversal)
  • No bootstrap peers
  • No tags
  • Default configs for all subsystems

Builder Pattern

For production use, the builder provides full control:

use mosaik::{Network, NetworkId, discovery, streams, groups};
use iroh::SecretKey;

let network = Network::builder("my-app".into())
    .with_secret_key(my_secret_key)
    .with_relay_mode(iroh::RelayMode::Disabled)
    .with_mdns_discovery(true)
    .with_addresses(bind_addrs)
    .with_discovery(
        discovery::Config::builder()
            .with_bootstrap(bootstrap_addr)
            .with_tags("my-role")
            .with_purge_after(Duration::from_secs(600))
    )
    .with_streams(
        streams::Config::builder()
    )
    .with_groups(
        groups::Config::builder()
    )
    .build()
    .await?;

Builder Options

Method                | Type                 | Default  | Description
----------------------|----------------------|----------|-------------------------------------
with_secret_key()     | SecretKey            | Random   | Node identity (determines PeerId)
with_relay_mode()     | RelayMode            | Default  | NAT traversal via relay servers
with_mdns_discovery() | bool                 | false    | Local network mDNS peer discovery
with_addresses()      | BTreeSet<SocketAddr> | Empty    | Explicit bind addresses
with_discovery()      | ConfigBuilder        | Defaults | Discovery subsystem configuration
with_streams()        | ConfigBuilder        | Defaults | Streams subsystem configuration
with_groups()         | ConfigBuilder        | Defaults | Groups subsystem configuration

Accessing Subsystems

Once built, the Network provides access to all subsystems:

// Transport & identity
let local = network.local();
let peer_id = network.local().id();
let addr = network.local().addr();
let network_id = network.network_id();

// Subsystems
let discovery = network.discovery();
let streams = network.streams();
let groups = network.groups();

All handles are cheap to clone (Arc<Inner> internally).
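The cheap-clone pattern behind these handles is the usual Arc<Inner> wrapper — a minimal sketch of the idiom, not mosaik's actual types:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical inner state shared by all handle clones.
struct Inner { peers: Mutex<Vec<String>> }

/// Cloning the handle only bumps a refcount; every clone
/// points at the same Inner.
#[derive(Clone)]
struct Handle(Arc<Inner>);

fn main() {
    let h1 = Handle(Arc::new(Inner { peers: Mutex::new(vec![]) }));
    let h2 = h1.clone();
    h1.0.peers.lock().unwrap().push("peer-a".into());
    // The clone observes the mutation made through the original.
    assert_eq!(h2.0.peers.lock().unwrap().len(), 1);
}
```

This is why you can freely pass `network.streams()` or `network.groups()` into spawned tasks without worrying about copying state.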

LocalNode

LocalNode represents the local node’s transport and identity:

let local = network.local();

// Identity
let peer_id = local.id();          // Public key
let secret = local.secret_key();   // Secret key
let network_id = local.network_id();

// Address (for sharing with others)
let addr = local.addr();  // EndpointAddr: public key + relay URL + addrs

// Readiness
local.online().await;  // Blocks until the endpoint is ready

// Low-level access to iroh endpoint
let endpoint = local.endpoint();

Waiting for Readiness

After building, you can wait for the node to be fully online:

network.online().await;

This resolves once the iroh endpoint is ready and all subsystem handlers are installed. The build() method already waits for this, so online() is mainly useful when you have a cloned network handle.

Lifecycle & Shutdown

When a Network is dropped, it cancels the internal CancellationToken, which propagates shutdown to all subsystems:

{
    let network = Network::new(network_id).await?;
    // Node is running...
} // Network dropped here → all tasks cancelled

For long-running services, keep the network handle alive:

let network = Network::new(network_id).await?;
// ... set up streams, groups, etc.
core::future::pending::<()>().await;  // Block forever

Links

Under the hood, all communication happens through Link<P> — a typed, framed, bidirectional QUIC stream:

pub struct Link<P: Protocol> { /* ... */ }

Links provide:

  • Length-delimited framing via LengthDelimitedCodec
  • Serialization via postcard (compact binary format)
  • Type safety via the Protocol trait and generic parameter

// Open a link to a remote peer
let link = Link::<MyProtocol>::open(local, remote_addr).await?;

// Send and receive typed messages
link.send(MyMessage { /* ... */ }).await?;
let response: MyResponse = link.recv().await?;

// Split into independent halves
let (sender, receiver) = link.split();

// Close with a typed reason
link.close(MyCloseReason::Success).await?;

Most users won’t use Link directly — it’s the building block that Streams, Groups, and Discovery use internally.

Error Handling

Network construction can fail with:

Error                | Cause
---------------------|------------------------------------------
MissingNetworkId     | Network ID not provided
Bind(BindError)      | Failed to bind the QUIC endpoint
InvalidAddress       | Invalid socket address in configuration
DiscoveryConfig(...) | Invalid discovery configuration
StreamsConfig(...)   | Invalid streams configuration
GroupsConfig(...)    | Invalid groups configuration

Connection-level errors use typed close reasons:

Code | Name              | Meaning
-----|-------------------|----------------------------------
200  | Success           | Protocol completed normally
204  | GracefulShutdown  | Clean shutdown
100  | InvalidAlpn       | Wrong protocol identifier
101  | DifferentNetwork  | Peer on a different network
102  | Cancelled         | Operation cancelled
400  | ProtocolViolation | Message deserialization failed
401  | UnknownPeer       | Peer not in discovery catalog

Discovery

The Discovery subsystem handles gossip-based peer discovery and catalog synchronization. It maintains a network-wide view of all peers, their capabilities, and their available streams and groups.

Overview

Discovery uses three complementary mechanisms:

Mechanism     | Transport            | Purpose
--------------|----------------------|-----------------------------------------------
DHT Bootstrap | Mainline DHT (pkarr) | Automatic peer discovery via shared NetworkId
Announce      | /mosaik/announce     | Real-time gossip broadcasts via iroh-gossip
Catalog Sync  | /mosaik/catalog-sync | Full bidirectional catalog exchange for catch-up

Nodes sharing the same NetworkId automatically discover each other through the DHT — no hardcoded bootstrap peers are required. Once an initial connection is established, the Announce and Catalog Sync protocols take over for real-time updates.

See the DHT Bootstrap sub-chapter for details on the automatic discovery mechanism.

Configuration

Configure discovery through the Network builder:

use mosaik::{Network, discovery};
use std::time::Duration;

let network = Network::builder(network_id)
    .with_discovery(
        discovery::Config::builder()
            .with_bootstrap(bootstrap_addr)
            .with_tags("matcher")
            .with_tags(["validator", "signer"])  // additive
            .with_purge_after(Duration::from_secs(600))
            .with_announce_interval(Duration::from_secs(10))
            .with_announce_jitter(0.3)
            .with_max_time_drift(Duration::from_secs(5))
            .with_events_backlog(200)
    )
    .build()
    .await?;

Configuration Options

Field                | Default | Description
---------------------|---------|--------------------------------------------------
bootstrap_peers      | []      | Initial peers to connect to on startup
tags                 | []      | Tags to advertise about this node
announce_interval    | 15s     | How often to re-announce via gossip
announce_jitter      | 0.5     | Max jitter factor (0.0–1.0) for announce timing
purge_after          | 300s    | Duration after which stale entries are purged
max_time_drift       | 10s     | Maximum acceptable clock drift between peers
events_backlog       | 100     | Past events retained in event broadcast channel
dht_publish_interval | 300s    | How often to publish to the DHT (None to disable)
dht_poll_interval    | 60s     | How often to poll the DHT for peers (None to disable)

Both with_bootstrap() and with_tags() are additive — calling them multiple times adds to the list.
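Internally, an additive builder method extends a list instead of replacing it. The idiom looks roughly like this — a toy builder, not the real Config type:

```rust
/// Minimal additive builder: each with_tags() call extends the list.
#[derive(Default)]
struct ConfigBuilder { tags: Vec<String> }

impl ConfigBuilder {
    fn with_tags<I, S>(mut self, tags: I) -> Self
    where I: IntoIterator<Item = S>, S: Into<String> {
        self.tags.extend(tags.into_iter().map(Into::into));
        self
    }
}

fn main() {
    let cfg = ConfigBuilder::default()
        .with_tags(["matcher"])
        .with_tags(["validator", "signer"]);
    // Both calls contributed: the lists are concatenated, not replaced.
    assert_eq!(cfg.tags, ["matcher", "validator", "signer"]);
}
```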

Accessing Discovery

let discovery = network.discovery();

The Discovery handle is cheap to clone.

Core API

Catalog Access

// Get current snapshot of all known peers
let catalog = discovery.catalog();
for (peer_id, entry) in catalog.iter() {
    println!("{}: tags={:?}", peer_id, entry.tags);
}

// Watch for catalog changes
let mut watch = discovery.catalog_watch();
loop {
    watch.changed().await?;
    let catalog = watch.borrow();
    println!("Catalog updated: {} peers", catalog.len());
}

Dialing Peers

// Connect to bootstrap peers manually
discovery.dial(bootstrap_addr);

// Dial multiple peers
discovery.dial([addr1, addr2, addr3]);

Manual Sync

// Trigger a full catalog sync with a specific peer
discovery.sync_with(peer_addr).await?;

Managing Tags

// Add tags at runtime
discovery.add_tags("new-role");
discovery.add_tags(["role-a", "role-b"]);

// Remove tags
discovery.remove_tags("old-role");

Changing tags triggers an immediate re-announcement to the network.

Local Entry

// Get this node's signed entry (as others see it)
let my_entry = discovery.me();
println!("I am: {:?}", my_entry);

Unsigned Entries

For testing or manual feeds, you can insert entries that aren’t cryptographically signed:

use mosaik::discovery::PeerEntry;

// Insert an unsigned entry (local-only, not gossiped)
discovery.insert(PeerEntry { /* ... */ });

// Remove a specific peer
discovery.remove(peer_id);

// Clear all unsigned entries
discovery.clear_unsigned();

Injecting Signed Entries

// Feed a signed entry from an external source
let success = discovery.feed(signed_peer_entry);

Events

Subscribe to discovery lifecycle events:

let mut events = discovery.events();
while let Ok(event) = events.recv().await {
    match event {
        Event::PeerDiscovered(entry) => {
            println!("New peer: {}", entry.peer_id());
        }
        Event::PeerUpdated(entry) => {
            println!("Peer updated: {}", entry.peer_id());
        }
        Event::PeerDeparted(peer_id) => {
            println!("Peer left: {}", peer_id);
        }
    }
}

See the Events sub-chapter for details.

How It Works

Announce Protocol

Every node periodically broadcasts its SignedPeerEntry via iroh-gossip. The announce includes:

  • Node identity (PeerId)
  • Network ID
  • Tags
  • Available streams and groups
  • Version (start + update timestamps)

The jitter on the announce interval prevents all nodes from announcing simultaneously. When a node changes its metadata (adds a tag, creates a stream, joins a group), it re-announces immediately.
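One plausible shape for the jitter computation (mosaik's exact formula may differ): stretch the base interval by a random fraction bounded by the jitter factor. The random value is injected here so the sketch stays deterministic:

```rust
use std::time::Duration;

/// Jittered announce delay: base * (1 + r * jitter), with r in [0, 1).
/// `r` would be drawn from an RNG in production.
fn jittered(base: Duration, jitter: f64, r: f64) -> Duration {
    base.mul_f64(1.0 + r * jitter)
}

fn main() {
    let base = Duration::from_secs(15);
    // With the default jitter of 0.5, delays range over [15s, 22.5s).
    assert_eq!(jittered(base, 0.5, 0.0), Duration::from_secs(15));
    assert!(jittered(base, 0.5, 0.999) < Duration::from_secs_f64(22.5));
}
```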

Catalog Sync Protocol

When a new node connects to a peer, they exchange their full catalogs bidirectionally. This ensures a new node quickly learns about all existing peers without waiting for gossip cycles.

Signed vs Unsigned Entries

Property     | Signed                        | Unsigned
-------------|-------------------------------|----------------------
Source       | Created by the peer itself    | Injected locally
Verification | Cryptographic signature       | None
Gossip       | Yes — propagated network-wide | No — local only
Use case     | Normal operation              | Testing, manual feeds

Staleness Detection

Each entry has a two-part version: (start_timestamp, update_timestamp). If update_timestamp falls behind the current time by more than purge_after, the entry is hidden from the public catalog API and eventually removed.
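Staleness is a pure function of the entry's last update time, the current time, and purge_after — roughly (timestamps simplified to seconds since epoch):

```rust
use std::time::Duration;

/// An entry is stale when its last update is older than `purge_after`.
fn is_stale(now_secs: u64, update_secs: u64, purge_after: Duration) -> bool {
    now_secs.saturating_sub(update_secs) > purge_after.as_secs()
}

fn main() {
    let purge = Duration::from_secs(300);
    assert!(!is_stale(1_000, 800, purge)); // updated 200s ago: still fresh
    assert!(is_stale(1_000, 600, purge));  // updated 400s ago: stale
}
```

Note the interplay with max_time_drift: clocks that drift more than the allowed bound would make healthy peers look stale, which is why drift is bounded separately.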

See the Catalog sub-chapter for the full catalog API.

DHT Bootstrap

Mosaik includes an automatic peer discovery mechanism based on the Mainline DHT (the same DHT used by BitTorrent). Nodes sharing the same NetworkId automatically discover each other — no hardcoded bootstrap peers are needed.

How It Works

Each node derives a DHT key from its NetworkId and uses it to both publish its own address and poll for other nodes’ addresses. The general flow is:

  1. Publish — The node resolves the current DHT record for the network key, appends its own address, and publishes the updated record back to the DHT.
  2. Poll — The node periodically reads the DHT record and dials any peers it finds.

Because both publish and poll use the same deterministic key derived from the NetworkId, all nodes in the same network naturally converge on the same record.

Record Structure

Each DHT record is a pkarr signed packet containing DNS resource records. Every peer in the record has:

  • An A record with a unique subdomain to identify the peer
  • A TXT record with a peers=N field indicating how many peers that node currently knows about

Capacity Management

Mainline DHT records are limited to 1000 bytes. To stay within this limit, a maximum of 12 peers are stored per record. When the record is full and a new node needs to publish, the least-connected peer (the one with the lowest peers=N count) is evicted. Ties are broken randomly to avoid deterministic starvation.

Evicted peers are not forgotten — they are dialed directly so they can still join the network through normal gossip-based discovery.
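The eviction rule — the lowest peers=N loses, with random tie-breaks — can be sketched as follows. Names are hypothetical, and the tie-break choice is injected so the sketch stays deterministic:

```rust
/// A record slot: (peer name, peers=N connectivity count).
/// Pick the eviction victim: lowest count, tie broken by `pick`
/// (random in production; injected here for testability).
fn eviction_victim(slots: &[(&str, u32)], pick: impl Fn(usize) -> usize) -> usize {
    let min = slots.iter().map(|&(_, n)| n).min().expect("non-empty record");
    let ties: Vec<usize> = slots.iter().enumerate()
        .filter(|&(_, &(_, n))| n == min)
        .map(|(i, _)| i)
        .collect();
    ties[pick(ties.len())]
}

fn main() {
    let slots = [("a", 3), ("b", 1), ("c", 1), ("d", 5)];
    let victim = eviction_victim(&slots, |len| len - 1); // stand-in "random"
    // Whichever tie wins, the victim always has the minimum count.
    assert_eq!(slots[victim].1, 1);
}
```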

Adaptive Polling

Polling uses an adaptive interval:

  • Aggressive polling (5 seconds) — when the node has no known peers yet, it polls frequently to bootstrap quickly.
  • Relaxed polling (configurable, default 60 seconds) — once the node has discovered at least one peer, polling slows down to reduce DHT load.
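The adaptive choice between the two modes reduces to a one-line decision:

```rust
use std::time::Duration;

/// Poll aggressively until at least one peer is known, then relax.
fn poll_interval(known_peers: usize, relaxed: Duration) -> Duration {
    if known_peers == 0 { Duration::from_secs(5) } else { relaxed }
}

fn main() {
    let relaxed = Duration::from_secs(60); // the configurable default
    assert_eq!(poll_interval(0, relaxed), Duration::from_secs(5));
    assert_eq!(poll_interval(3, relaxed), relaxed);
}
```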

Publish Cycle

Publishing runs on a slower interval (default 5 minutes) and uses compare-and-swap (CAS) to safely update the shared record without overwriting concurrent changes from other nodes. A random startup jitter prevents all nodes from publishing simultaneously.
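The CAS pattern is read-modify-publish with a retry whenever the record changed underneath. A sketch against an in-memory counter standing in for the pkarr packet (the real version tokens come from the DHT record):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Retry read-modify-write until no concurrent writer interferes.
/// The "record" here is a counter standing in for the DHT packet.
fn publish(record: &AtomicU64) {
    loop {
        let seen = record.load(Ordering::Acquire);
        let updated = seen + 1; // stand-in for "append our address"
        if record
            .compare_exchange(seen, updated, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            break; // CAS succeeded: nobody changed the record meanwhile
        }
        // CAS failed: another node published first -- re-read and retry
    }
}

fn main() {
    let record = AtomicU64::new(0);
    for _ in 0..3 { publish(&record); }
    assert_eq!(record.load(Ordering::Acquire), 3);
}
```

The same loop shape guarantees that concurrent publishers never silently drop each other's updates — a losing writer re-reads and rebuilds its record on top of the winner's.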

Configuration

DHT bootstrap is enabled by default. You can tune the intervals or disable it entirely:

use mosaik::{Network, discovery};
use std::time::Duration;

let network = Network::builder(network_id)
    .with_discovery(
        discovery::Config::builder()
            // Customize DHT intervals
            .with_dht_publish_interval(Some(Duration::from_secs(300)))
            .with_dht_poll_interval(Some(Duration::from_secs(60)))
    )
    .build()
    .await?;

To disable automatic bootstrap (e.g., when using only explicit bootstrap peers):

let network = Network::builder(network_id)
    .with_discovery(
        discovery::Config::builder()
            .no_auto_bootstrap()
            .with_bootstrap(bootstrap_addr)  // use explicit peers instead
    )
    .build()
    .await?;

Configuration Options

Field                | Default | Description
---------------------|---------|------------------------------------------------------------
dht_publish_interval | 300s    | How often to publish this node's address to the DHT. None disables publishing.
dht_poll_interval    | 60s     | How often to poll the DHT for new peers. None disables polling. Actual interval is adaptive (5s when no peers are known).

When to Use

DHT bootstrap is ideal for:

  • Decentralized deployments where there is no fixed infrastructure to serve as bootstrap nodes
  • Dynamic environments where nodes come and go frequently
  • Zero-configuration setups where nodes only need to agree on a network name

For networks with stable infrastructure, you can combine DHT bootstrap with explicit bootstrap peers — nodes will use whichever method finds peers first.

Catalog

The Catalog is an immutable snapshot of all discovered peers in the network. It’s thread-safe, cheap to clone, and updated via watch channels.

Getting a Catalog

let catalog = network.discovery().catalog();

Each call returns a snapshot — it won’t change after you’ve obtained it. To observe changes, use the watch channel.

Catalog API

Iterating Peers

let catalog = discovery.catalog();

// Iterate all peers
for (peer_id, entry) in catalog.iter() {
    println!("Peer {}: {:?}", peer_id, entry.tags);
}

// Count peers
println!("Known peers: {}", catalog.len());

Watching for Changes

let mut watch = discovery.catalog_watch();

loop {
    // Wait for catalog to change
    watch.changed().await?;

    // Borrow the latest snapshot
    let catalog = watch.borrow();
    println!("Catalog updated, {} peers", catalog.len());
}

The watch receiver always has the latest value. Multiple calls to changed() may skip intermediate updates if the catalog changes faster than you observe it.

PeerEntry

Each peer in the catalog is represented by a PeerEntry:

pub struct PeerEntry {
    pub network_id: NetworkId,
    pub peer_id: PeerId,
    pub addr: EndpointAddr,
    pub tags: BTreeSet<Tag>,
    pub streams: BTreeSet<StreamId>,
    pub groups: BTreeSet<GroupId>,
    pub version: PeerEntryVersion,
}

Fields

Field | Type | Description
------|------|------------
network_id | NetworkId | Which network this peer belongs to
peer_id | PeerId | The peer’s public key
addr | EndpointAddr | Connection address (public key + relay + direct addrs)
tags | BTreeSet<Tag> | Capability labels (e.g., "matcher", "validator")
streams | BTreeSet<StreamId> | Streams this peer produces
groups | BTreeSet<GroupId> | Groups this peer belongs to
version | PeerEntryVersion | Two-part version for staleness detection

Signed Entries

In practice, most catalog entries are SignedPeerEntry — a PeerEntry with a cryptographic signature proving the owner created it:

let my_entry: SignedPeerEntry = discovery.me();
let peer_id = my_entry.peer_id();

Version & Staleness

Each entry carries a two-part version:

pub struct PeerEntryVersion {
    pub start: Timestamp,   // When the peer first came online
    pub update: Timestamp,  // When the entry was last updated
}

Entries where update is older than purge_after (default: 300s) are considered stale. Stale entries are:

  1. Hidden from the public catalog API
  2. Eventually removed if not refreshed

The periodic gossip re-announce (every ~15s by default) keeps entries fresh. When a node departs gracefully, it broadcasts a departure message; ungraceful departures are detected by staleness.

Using Catalog for Filtering

The catalog is commonly used to filter peers in Streams:

// Producer: only accept consumers with specific tags
let producer = network.streams().producer::<Order>()
    .accept_if(|peer| peer.tags.contains(&"authorized".into()))
    .build()?;

// Consumer: only subscribe to specific producers
let consumer = network.streams().consumer::<Order>()
    .subscribe_if(|peer| peer.tags.contains(&"primary".into()))
    .build();

Internal Structure

The catalog uses im::OrdMap — an immutable, persistent ordered map — for its internal storage. This provides:

  • O(1) cloning — snapshots are effectively free
  • Consistent iteration order — deterministic across nodes
  • Thread safety — immutable snapshots can be shared freely

Updates create new snapshots atomically via tokio::sync::watch::Sender<Catalog>.

Events

The Discovery subsystem emits events when the peer catalog changes. You can subscribe to these events for reactive programming.

Subscribing to Events

let mut events = network.discovery().events();

while let Ok(event) = events.recv().await {
    match event {
        Event::PeerDiscovered(entry) => {
            println!("New peer joined: {}", entry.peer_id());
        }
        Event::PeerUpdated(entry) => {
            println!("Peer updated: {}", entry.peer_id());
        }
        Event::PeerDeparted(peer_id) => {
            println!("Peer departed: {}", peer_id);
        }
    }
}

The events() method returns a tokio::sync::broadcast::Receiver<Event>. Multiple consumers can subscribe independently.

Event Types

PeerDiscovered

Emitted when a previously unknown peer is added to the catalog:

Event::PeerDiscovered(signed_peer_entry)

This fires for both signed entries (from gossip/sync) and unsigned entries (from manual insert()).

PeerUpdated

Emitted when an existing peer’s entry changes:

Event::PeerUpdated(signed_peer_entry)

Common triggers:

  • Peer added or removed tags
  • Peer started or stopped producing a stream
  • Peer joined or left a group
  • Peer’s address changed

PeerDeparted

Emitted when a peer is removed from the catalog:

Event::PeerDeparted(peer_id)

This fires when:

  • A peer’s entry becomes stale and is purged
  • A peer gracefully departs via the departure protocol
  • An unsigned entry is removed via discovery.remove(peer_id)

Backlog

The event channel retains up to events_backlog (default: 100) past events. If a consumer falls behind, older events are dropped and the consumer receives a RecvError::Lagged(n) indicating how many events were missed.

match events.recv().await {
    Ok(event) => { /* handle event */ }
    Err(broadcast::error::RecvError::Lagged(n)) => {
        tracing::warn!("Missed {n} events, consider full catalog re-read");
        let catalog = discovery.catalog();
        // Reconcile from snapshot
    }
    Err(broadcast::error::RecvError::Closed) => break,
}

Catalog Watch vs Events

Choose the right tool:

Use Case | Tool | Why
---------|------|----
React to specific changes | events() | Get individual events with full context
Get current state | catalog() | Snapshot of all peers
React to any change | catalog_watch() | Triggered on every update, borrow latest
Build a UI dashboard | catalog_watch() | Re-render on changes, read full state
Log peer arrivals/departures | events() | Specific events about each peer

Streams

Streams are the primary dataflow primitive in mosaik. They represent typed, asynchronous data channels that connect producers and consumers across a network.

Overview

Producer Node                       Consumer Node
┌─────────────────┐                ┌─────────────────┐
│  Producer<D>    │────QUIC────────│  Consumer<D>    │
│  (Sink impl)    │  /mosaik/      │  (Stream impl)  │
│                 │  streams/1.0   │                 │
└─────────────────┘                └─────────────────┘

A producer announces a stream via Discovery. Consumers discover the producer, open a QUIC connection using the /mosaik/streams/1.0 ALPN, and begin receiving data. The entire lifecycle is automatic — you only create the handles.

The Datum Trait

Every type sent through a stream must implement Datum:

pub trait Datum: Serialize + DeserializeOwned + Send + 'static {
    fn derived_stream_id() -> StreamId {
        core::any::type_name::<Self>().into()
    }
}

Datum has a blanket implementation — any Serialize + DeserializeOwned + Send + 'static type is automatically a Datum. The derived_stream_id() method computes a StreamId (a Digest) from the Rust type name, so each type naturally maps to a unique stream.

#[derive(Serialize, Deserialize)]
struct PriceUpdate {
    symbol: String,
    price: f64,
}

// PriceUpdate is automatically a Datum
// StreamId = blake3("my_crate::PriceUpdate")

Quick Usage

Producing data:

let producer = network.streams().produce::<PriceUpdate>();

// Wait until at least one consumer is connected
producer.when().online().await;

// Send via Sink trait
use futures::SinkExt;
producer.send(PriceUpdate { symbol: "ETH".into(), price: 3200.0 }).await?;

// Or send immediately (non-blocking)
producer.try_send(PriceUpdate { symbol: "BTC".into(), price: 65000.0 })?;

Consuming data:

let mut consumer = network.streams().consume::<PriceUpdate>();

// Wait until connected to at least one producer
consumer.when().online().await;

// Receive via async method
while let Some(update) = consumer.recv().await {
    println!("{}: ${}", update.symbol, update.price);
}

// Or use as a futures::Stream
use futures::StreamExt;
while let Some(update) = consumer.next().await {
    println!("{}: ${}", update.symbol, update.price);
}

Stream Identity

By default, a stream’s identity comes from Datum::derived_stream_id(), which hashes the Rust type name. You can override this with a custom StreamId:

let producer = network.streams()
    .producer::<PriceUpdate>()
    .with_stream_id("custom-price-feed")
    .build()?;

This lets you have multiple distinct streams of the same data type.

Architecture

Streams are built on top of the Discovery and Network subsystems:

  1. Producer creation — the local discovery entry is updated to advertise the stream
  2. Consumer creation — the consumer worker discovers producers via the catalog and opens subscriptions
  3. Subscription — a QUIC bi-directional stream is opened; the consumer sends its Criteria, the producer sends data
  4. Fanout — each consumer gets its own independent sender loop so a slow consumer does not block others
  5. Cleanup — when handles are dropped, underlying tasks are cancelled

Close Reason Codes

When a stream subscription fails, the producer sends structured close reasons:

Code | Name | Meaning
-----|------|--------
10_404 | StreamNotFound | The requested stream does not exist on the producer
10_403 | NotAllowed | The consumer is rejected by the producer’s accept_if predicate
10_509 | NoCapacity | The producer has reached max_consumers
10_413 | TooSlow | The consumer was disconnected for lagging behind

Subsystem Configuration

The Streams config currently has one setting:

Config::builder()
    .with_backoff(ExponentialBackoffBuilder::default()
        .with_max_elapsed_time(Some(Duration::from_secs(300)))
        .build())
    .build()?;

Option | Default | Description
-------|---------|------------
backoff | Exponential (max 5 min) | Default backoff policy for consumer connection retries

Individual producers and consumers can override this via their respective builders.

Producers

A Producer<D> is a handle for sending data of type D to all connected consumers. Multiple Producer handles can exist for the same stream (mpsc pattern) — they share the same underlying fanout sink.

Creating Producers

Simple (default configuration):

let producer = network.streams().produce::<MyDatum>();

If a producer for this datum type already exists, the existing one is returned.

With builder (advanced configuration):

let producer = network.streams()
    .producer::<MyDatum>()
    .accept_if(|peer| peer.tags().contains(&"trusted".into()))
    .online_when(|c| c.minimum_of(2).with_tags("validator"))
    .disconnect_lagging(true)
    .with_buffer_size(2048)
    .with_max_consumers(10)
    .with_stream_id("custom-id")
    .build()?;

Builder Options

Method | Default | Description
-------|---------|------------
accept_if(predicate) | Accept all | Predicate to accept/reject incoming consumer connections
online_when(conditions) | minimum_of(1) | Conditions under which the producer is online
disconnect_lagging(bool) | true | Disconnect consumers that fall behind buffer_size
with_buffer_size(n) | 1024 | Internal channel buffer size
with_max_consumers(n) | usize::MAX | Maximum allowed simultaneous consumers
with_stream_id(id) | D::derived_stream_id() | Custom stream identity
with_undelivered_sink(sender) | None | Capture datums that no consumer matched

Sending Data

Via Sink Trait

Producer<D> implements futures::Sink<D>. The send() method waits for the producer to be online before accepting data:

use futures::SinkExt;

// Blocks until at least one consumer is connected (default online condition)
producer.send(datum).await?;

If the producer is offline, poll_ready will not resolve until the online conditions are met.

Via try_send

For non-blocking sends:

match producer.try_send(datum) {
    Ok(()) => { /* sent to fanout */ }
    Err(Error::Offline(d)) => { /* no consumers, datum returned */ }
    Err(Error::Full(d)) => { /* buffer full, datum returned */ }
    Err(Error::Closed(d)) => { /* producer closed */ }
}

All error variants return the unsent datum so you can retry or inspect it.

Online Conditions

By default, a producer is online when it has at least one connected consumer. Customize this:

// Online when at least 3 validators are subscribed
let producer = network.streams()
    .producer::<MyDatum>()
    .online_when(|c| c.minimum_of(3).with_tags("validator"))
    .build()?;

Check online status at any time:

if producer.is_online() {
    producer.try_send(datum)?;
}

Observing Status

The when() API provides reactive status monitoring:

// Wait until online
producer.when().online().await;

// Wait until offline
producer.when().offline().await;

// Wait until subscribed by at least one peer
producer.when().subscribed().await;

// Wait until subscribed by at least N peers
producer.when().subscribed().minimum_of(3).await;

// Wait until subscribed by peers with specific tags
producer.when().subscribed().with_tags("validator").await;

// Wait until no subscribers
producer.when().unsubscribed().await;

Inspecting Consumers

Iterate over the currently connected consumers:

for info in producer.consumers() {
    println!(
        "Consumer {} connected to stream {} — state: {:?}, stats: {}",
        info.consumer_id(),
        info.stream_id(),
        info.state(),
        info.stats(),
    );
}

Each ChannelInfo provides access to:

  • stream_id() — the stream this subscription is for
  • producer_id() / consumer_id() — the peer IDs
  • peer() — the PeerEntry snapshot at subscription time
  • state()Connecting, Connected, or Terminated
  • state_watcher() — a watch::Receiver<State> for monitoring changes
  • stats()Stats with datums(), bytes(), uptime()
  • is_connected() — shorthand for state() == Connected
  • disconnected() — future that resolves when terminated

Undelivered Sink

Capture datums that did not match any consumer’s criteria:

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

let producer = network.streams()
    .producer::<MyDatum>()
    .with_undelivered_sink(tx)
    .build()?;

// In another task
while let Some(datum) = rx.recv().await {
    tracing::warn!("Datum had no matching consumer: {:?}", datum);
}

Note: In the default configuration (online when ≥ 1 subscriber), undelivered events occur only when connected consumers’ criteria reject the datum. If you customize online_when to allow publishing with zero subscribers, the sink captures all datums sent while nobody is listening.

Builder Errors

build() returns Err(BuilderError::AlreadyExists(existing)) if a producer for the stream ID already exists. The error contains the existing Producer<D> handle so you can use it directly:

match network.streams().producer::<MyDatum>().build() {
    Ok(new) => new,
    Err(BuilderError::AlreadyExists(existing)) => existing,
}

The simple produce() method handles this automatically — returning the existing producer if one exists.

Consumers

A Consumer<D> receives data of type D from remote producers. Each consumer has its own worker task that discovers producers, manages subscriptions, and delivers data.

Creating Consumers

Simple (default configuration):

let mut consumer = network.streams().consume::<MyDatum>();

With builder (advanced configuration):

let mut consumer = network.streams()
    .consumer::<MyDatum>()
    .subscribe_if(|peer| peer.tags().contains(&"primary".into()))
    .with_criteria(Criteria::default())
    .with_stream_id("custom-id")
    .with_backoff(ExponentialBackoffBuilder::default()
        .with_max_elapsed_time(Some(Duration::from_secs(60)))
        .build())
    .build();

Builder Options

Method | Default | Description
-------|---------|------------
subscribe_if(predicate) | Accept all | Filter which producers to subscribe to
with_criteria(criteria) | Criteria::default() | Data selection criteria sent to producers
with_stream_id(id) | D::derived_stream_id() | Custom stream identity
with_backoff(policy) | From Streams config | Backoff policy for reconnection retries

Receiving Data

Via recv()

The primary receiving method — async, blocks until data is available:

while let Some(datum) = consumer.recv().await {
    process(datum);
}
// Returns None when the consumer is closed

Via try_recv()

Non-blocking receive for polling patterns:

match consumer.try_recv() {
    Ok(datum) => process(datum),
    Err(TryRecvError::Empty) => { /* no data available */ }
    Err(TryRecvError::Disconnected) => { /* consumer closed */ }
}

Via Stream Trait

Consumer<D> implements futures::Stream<Item = D>:

use futures::StreamExt;

while let Some(datum) = consumer.next().await {
    process(datum);
}

// Or with combinators
let filtered = consumer
    .filter(|d| futures::future::ready(d.price > 100.0))
    .take(10)
    .collect::<Vec<_>>()
    .await;

Producer Selection

By default, a consumer subscribes to every discovered producer of the same stream ID. Use subscribe_if to be selective:

// Only subscribe to producers tagged as "primary"
let consumer = network.streams()
    .consumer::<PriceUpdate>()
    .subscribe_if(|peer| peer.tags().contains(&"primary".into()))
    .build();

The predicate receives a &PeerEntry and is evaluated each time a new producer is discovered.

Criteria

Criteria are sent to the producer when subscribing. They allow content-based filtering at the source. Currently Criteria is a placeholder that matches everything:

pub struct Criteria {}

impl Criteria {
    pub const fn matches<D: Datum>(&self, _item: &D) -> bool {
        true
    }
}

Backoff & Reconnection

When a connection to a producer fails, the consumer retries with an exponential backoff policy.

Global default (from Streams config): exponential backoff up to 5 minutes.

Per-consumer override:

use backoff::ExponentialBackoffBuilder;

let consumer = network.streams()
    .consumer::<MyDatum>()
    .with_backoff(ExponentialBackoffBuilder::default()
        .with_max_elapsed_time(Some(Duration::from_secs(30)))
        .build())
    .build();

Observing Status

The when() API mirrors the producer side:

// Wait until connected to at least one producer
consumer.when().online().await;

// Equivalently, wait until subscribed to at least one producer
consumer.when().subscribed().await;

// Wait until connected to at least N producers
consumer.when().subscribed().to_at_least(3).await;

// Wait until no producers are connected
consumer.when().unsubscribed().await;

Check online status imperatively:

if consumer.is_online() {
    // consumer has active connections
}

Statistics

Each consumer tracks aggregated stats:

let stats = consumer.stats();
println!("Received {} datums ({} bytes)", stats.datums(), stats.bytes());

if let Some(uptime) = stats.uptime() {
    println!("Connected for {:?}", uptime);
}

Stats provides:

  • datums() — total number of datums received
  • bytes() — total serialized bytes received
  • uptime()Option<Duration> since last connection (None if currently disconnected)

Inspecting Producers

for info in consumer.producers() {
    println!(
        "Connected to producer {} — {:?}, {}",
        info.producer_id(),
        info.state(),
        info.stats(),
    );
}

Each ChannelInfo provides the same API as on the producer side.

Multiple Consumers

Multiple consumers can be created for the same stream. Each gets its own independent copy of the data:

let mut consumer_a = network.streams().consume::<PriceUpdate>();
let mut consumer_b = network.streams().consume::<PriceUpdate>();

// Both receive all PriceUpdate data independently

Lifecycle

When a Consumer handle is dropped, its background worker task is cancelled (via a DropGuard) and all connections to producers are closed. No explicit shutdown is needed.

Status & Conditions

Both producers and consumers expose a reactive status API through When and ChannelConditions. This allows you to write event-driven code that reacts to changes in subscription state.

The When API

Every Producer and Consumer provides a when() method that returns a &When handle:

let producer = network.streams().produce::<MyDatum>();
let when = producer.when();

Online / Offline

// Resolves when the channel is ready (all publishing conditions met)
when.online().await;

// Resolves when the channel goes offline
when.offline().await;

// Check immediately
if when.is_online() { /* ... */ }

For producers, “online” means the online_when conditions are met (default: at least 1 consumer). For consumers, “online” means at least one producer connection is active.

Subscribed / Unsubscribed

// Resolves when at least one peer is connected
when.subscribed().await;

// Resolves when zero peers are connected
when.unsubscribed().await;

ChannelConditions

when().subscribed() returns a ChannelConditions — a composable future that resolves when the subscription state matches your criteria.

Minimum Peers

// Wait for at least 3 connected peers
when.subscribed().minimum_of(3).await;

Tag Filtering

// Wait for peers that have the "validator" tag
when.subscribed().with_tags("validator").await;

// Multiple tags (all must be present on the peer)
when.subscribed().with_tags(["validator", "us-east"]).await;

Custom Predicates

// Wait for a peer matching an arbitrary condition
when.subscribed()
    .with_predicate(|peer: &PeerEntry| peer.tags().len() >= 3)
    .await;

Combining Conditions

Conditions compose naturally:

// At least 2 validators from the us-east region
when.subscribed()
    .minimum_of(2)
    .with_tags("validator")
    .with_predicate(|p| p.tags().contains(&"us-east".into()))
    .await;

Inverse Conditions

Use unmet() to invert a condition:

// Wait until there are fewer than 3 validators
when.subscribed()
    .minimum_of(3)
    .with_tags("validator")
    .unmet()
    .await;

Re-Triggering

ChannelConditions implements Future and can be polled repeatedly. After resolving, it resets and will resolve again when the condition transitions from not met → met. This makes it suitable for use in loops:

let mut condition = when.subscribed().minimum_of(2);
loop {
    (&mut condition).await;
    println!("We now have 2+ subscribers again!");
}

Checking Immediately

let condition = when.subscribed().minimum_of(3);
if condition.is_condition_met() {
    // There are currently 3+ matching peers
}

ChannelConditions also supports comparison with bool:

if when.subscribed().minimum_of(3) == true {
    // equivalent to is_condition_met()
}

ChannelInfo

ChannelInfo represents an individual connection between a consumer and a producer:

pub struct ChannelInfo {
    stream_id: StreamId,
    criteria: Criteria,
    producer_id: PeerId,
    consumer_id: PeerId,
    stats: Arc<Stats>,
    peer: Arc<PeerEntry>,
    state: watch::Receiver<State>,
}

Connection State

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Connecting,   // Handshake in progress
    Connected,    // Active subscription
    Terminated,   // Unrecoverably closed
}

Monitor state changes:

let info: ChannelInfo = /* from producers() or consumers() */;

// Check current state
match info.state() {
    State::Connected => { /* active */ }
    State::Connecting => { /* handshake in progress */ }
    State::Terminated => { /* closed */ }
}

// Watch for state transitions
let mut watcher = info.state_watcher().clone();
while watcher.changed().await.is_ok() {
    println!("State changed to: {:?}", *watcher.borrow());
}

// Wait for disconnection
info.disconnected().await;

Stats

Every channel tracks real-time statistics:

let stats = info.stats();

println!("Datums: {}", stats.datums());   // Total datums transferred
println!("Bytes: {}", stats.bytes());     // Total bytes transferred
println!("Uptime: {:?}", stats.uptime()); // Option<Duration> since connection

// Display formatting included
println!("{}", stats);
// Output: "uptime: 2m 30s, datums: 15432, bytes: 1.23 MB"

Stats implements Display with human-readable formatting using humansize and humantime.

Using Status for Online Conditions

The ChannelConditions type is also used to configure producer online conditions via the builder:

let producer = network.streams()
    .producer::<MyDatum>()
    .online_when(|c| c.minimum_of(2).with_tags("validator"))
    .build()?;

The closure receives a ChannelConditions and returns it with your conditions applied. This is evaluated every time the subscription set changes.

Groups

Availability Groups are clusters of trusted nodes on the same mosaik network that coordinate with each other for load balancing and failover. Members of a group share a secret key, maintain a consistent replicated state through a modified Raft consensus protocol, and stay connected via an all-to-all mesh of persistent bonds.

Overview

          ┌───────────┐
          │  Node A   │
          │ (Leader)  │
          └───┬───┬───┘
          bond/    \bond
             /      \
 ┌───────────┐       ┌───────────┐
 │  Node B   │─bond──│  Node C   │
 │(Follower) │       │(Follower) │
 └───────────┘       └───────────┘

Every pair of group members maintains a persistent bond — an authenticated, bidirectional QUIC connection. Bonds carry Raft consensus messages, heartbeats, and log-sync traffic.

Trust Model

Groups are not Byzantine fault tolerant. All members within a group are assumed to be honest and operated by the same entity. The GroupKey acts as the sole admission control — only nodes that know the key can join.

Quick Start

use mosaik::groups::GroupKey;

// All group members must use the same key
let key = GroupKey::generate();

// Join with default settings (NoOp state machine)
let group = network.groups().with_key(key).join();

// Wait for leader election
let leader = group.when().leader_elected().await;
println!("Leader: {leader}");

// Check local role
if group.is_leader() {
    println!("I am the leader");
}

Group Identity

A GroupId is derived from three components:

  1. Group key — the shared secret (GroupKey)
  2. Consensus configuration — election timeouts, heartbeat intervals, etc.
  3. State machine signature — the state machine’s signature() + state sync signature()

Any divergence in these values across nodes produces a different GroupId, preventing misconfigured nodes from bonding.

// GroupId derivation (internal)
let id = key.secret().hashed()
    .derive(consensus.digest())
    .derive(state_machine.signature())
    .derive(state_machine.state_sync().signature());

Key Types

Type | Description
-----|------------
Groups | Public API gateway — one per Network
GroupBuilder | Typestate builder for configuring and joining a group
Group<M> | Handle for interacting with a joined group
GroupKey | Shared secret for admission control
GroupId | Unique identifier (Digest) derived from key + config + machine
Bond / Bonds | Persistent connections between group members
When | Reactive status API for group state changes
ConsensusConfig | Raft timing parameters

ALPN Protocol

Groups use /mosaik/groups/1 as their ALPN identifier.

Close Reason Codes

Code | Name | Meaning
-----|------|--------
30_400 | InvalidHandshake | Error during handshake decoding
30_404 | GroupNotFound | Group ID not known to acceptor
30_405 | InvalidProof | Authentication proof invalid
30_408 | Timeout | Timed out waiting for response
30_429 | AlreadyBonded | A bond already exists between these peers

Subsystem Configuration

use mosaik::groups::Config;

let config = Config::builder()
    .with_handshake_timeout(Duration::from_secs(2))  // default
    .build()?;

Option | Default | Description
-------|---------|------------
handshake_timeout | 2 seconds | Timeout for bond handshake completion

Joining Groups

The GroupBuilder uses a typestate pattern so that groups are configured correctly at compile time: components can only be set in a valid order, and join() fills in defaults for anything left unset (the NoOp state machine and InMemoryLogStore).

Builder Flow

groups.with_key(key)
  ├── .join()                          // NoOp machine, InMemoryLogStore
  └── .with_state_machine(machine)
        ├── .join()                    // InMemoryLogStore (default)
        ├── .with_consensus_config(c)
        │     └── .join()
        └── .with_log_storage(store)
              └── .join()

Minimal Join (NoOp)

Useful for leader election without application logic:

let group = network.groups().with_key(key).join();

This creates a group with:

  • NoOp state machine (commands are (), queries are ())
  • InMemoryLogStore for storage
  • Default ConsensusConfig

With Custom State Machine

let group = network.groups()
    .with_key(key)
    .with_state_machine(MyStateMachine::new())
    .join();

The state machine must be set before storage, since the storage type depends on the command type.

With Custom Storage

let group = network.groups()
    .with_key(key)
    .with_state_machine(MyStateMachine::new())
    .with_log_storage(MyDurableStore::new())
    .join();

Storage must implement Storage<M::Command>.

GroupKey

A GroupKey is the shared secret that all members must possess:

use mosaik::groups::GroupKey;

// Generate a new random key
let key = GroupKey::generate();

// All members use the same key
// The key can be serialized and distributed securely

ConsensusConfig

All consensus parameters are part of GroupId derivation — every member must use the same values.

use mosaik::groups::ConsensusConfig;

let config = ConsensusConfig::builder()
    .with_heartbeat_interval(Duration::from_millis(500))     // default
    .with_heartbeat_jitter(Duration::from_millis(150))       // default
    .with_max_missed_heartbeats(10)                          // default
    .with_election_timeout(Duration::from_secs(2))           // default
    .with_election_timeout_jitter(Duration::from_millis(500))// default
    .with_bootstrap_delay(Duration::from_secs(3))            // default
    .with_forward_timeout(Duration::from_secs(2))            // default
    .with_query_timeout(Duration::from_secs(2))              // default
    .build()?;

let group = network.groups()
    .with_key(key)
    .with_state_machine(machine)
    .with_consensus_config(config)
    .join();

Parameters

Parameter | Default | Description
----------|---------|------------
heartbeat_interval | 500ms | Interval between bond heartbeat pings
heartbeat_jitter | 150ms | Max random jitter subtracted from heartbeat interval
max_missed_heartbeats | 10 | Missed heartbeats before bond is considered dead
election_timeout | 2s | Base timeout before a follower starts an election
election_timeout_jitter | 500ms | Max random jitter added to election timeout
bootstrap_delay | 3s | Wait time before first election to allow peer discovery
forward_timeout | 2s | Timeout for forwarding commands to the leader
query_timeout | 2s | Timeout for strong-consistency query responses

Leadership Preference

Nodes can deprioritize leadership to prefer being followers:

// 3x longer election timeout (default multiplier)
let config = ConsensusConfig::default().deprioritize_leadership();

// Custom multiplier
let config = ConsensusConfig::default().deprioritize_leadership_by(5);

This multiplies both election_timeout and bootstrap_delay, reducing the chance of becoming leader.

Idempotent Joins

If you join() a group whose GroupId already exists on this node, the existing Group handle is returned. No duplicate worker is spawned.

Lifecycle

When a Group<M> handle is dropped:

  1. Bonds notify peers of the departure
  2. The group’s cancellation token is triggered
  3. The group is removed from the active groups map

State Machines

Every group runs a replicated state machine (RSM) that processes commands and answers queries. The StateMachine trait is the primary extension point for application logic.

The StateMachine Trait

pub trait StateMachine: Sized + Send + Sync + 'static {
    type Command: Command;       // Mutating operations
    type Query: Query;           // Read-only operations
    type QueryResult: QueryResult; // Query responses
    type StateSync: StateSync<Machine = Self>; // Catch-up strategy

    fn signature(&self) -> UniqueId;
    fn apply(&mut self, command: Self::Command, ctx: &dyn ApplyContext);
    fn query(&self, query: Self::Query) -> Self::QueryResult;
    fn state_sync(&self) -> Self::StateSync;

    // Optional overrides
    fn apply_batch(&mut self, commands: impl IntoIterator<Item = Self::Command>, ctx: &dyn ApplyContext) { ... }
    fn leadership_preference(&self) -> LeadershipPreference { LeadershipPreference::Normal }
}

Associated Types

Type | Bound | Purpose
-----|-------|--------
Command | Clone + Send + Serialize + DeserializeOwned + 'static | State-mutating operations replicated via Raft log
Query | Clone + Send + Serialize + DeserializeOwned + 'static | Read-only operations, not replicated
QueryResult | Clone + Send + Serialize + DeserializeOwned + 'static | Responses to queries
StateSync | StateSync<Machine = Self> | How lagging followers catch up

All message types get blanket implementations from StateMachineMessage, so any Clone + Send + Serialize + DeserializeOwned + 'static type qualifies.

Implementing a State Machine

Here is a complete counter example:

use mosaik::groups::*;
use serde::{Serialize, Deserialize};

#[derive(Default)]
struct Counter {
    value: i64,
}

#[derive(Clone, Serialize, Deserialize)]
enum CounterCmd {
    Increment(i64),
    Decrement(i64),
    Reset,
}

#[derive(Clone, Serialize, Deserialize)]
enum CounterQuery {
    Value,
}

impl StateMachine for Counter {
    type Command = CounterCmd;
    type Query = CounterQuery;
    type QueryResult = i64;
    type StateSync = LogReplaySync<Self>;

    fn signature(&self) -> UniqueId {
        UniqueId::from("counter_v1")
    }

    fn apply(&mut self, command: CounterCmd, _ctx: &dyn ApplyContext) {
        match command {
            CounterCmd::Increment(n) => self.value += n,
            CounterCmd::Decrement(n) => self.value -= n,
            CounterCmd::Reset => self.value = 0,
        }
    }

    fn query(&self, query: CounterQuery) -> i64 {
        match query {
            CounterQuery::Value => self.value,
        }
    }

    fn state_sync(&self) -> LogReplaySync<Self> {
        LogReplaySync::default()
    }
}

The signature() Method

Returns a UniqueId that is part of the GroupId derivation. All group members must return the same signature. Different signatures → different GroupId → nodes cannot bond.

Use it to version your state machine:

fn signature(&self) -> UniqueId {
    UniqueId::from("orderbook_matching_engine_v2")
}

ApplyContext

The apply() method receives a &dyn ApplyContext providing deterministic metadata:

pub trait ApplyContext {
    fn committed(&self) -> Cursor;     // Last committed position before this batch
    fn log_position(&self) -> Cursor;  // Last log position
    fn current_term(&self) -> Term;    // Term of the commands being applied
}

Important: The context is safe for deterministic state machines — it never exposes non-deterministic data that could diverge across nodes.
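
To see why determinism matters, here is a plain-Rust sketch (local stand-in types, not the mosaik traits) of the invariant the runtime relies on: replicas that apply the same committed commands in the same order must end up in identical states.

```rust
// Illustrative stand-ins, not the mosaik types: a tiny counter "state
// machine" applied independently on two replicas.
#[derive(Clone, Copy)]
enum Cmd {
    Increment(i64),
    Reset,
}

#[derive(Default, PartialEq, Debug)]
struct Counter {
    value: i64,
}

impl Counter {
    // Deterministic: the result depends only on prior state and the command.
    fn apply(&mut self, cmd: Cmd) {
        match cmd {
            Cmd::Increment(n) => self.value += n,
            Cmd::Reset => self.value = 0,
        }
    }
}

fn main() {
    let log = [Cmd::Increment(5), Cmd::Increment(3), Cmd::Reset, Cmd::Increment(7)];

    let mut replica_a = Counter::default();
    let mut replica_b = Counter::default();
    for cmd in log {
        replica_a.apply(cmd);
        replica_b.apply(cmd);
    }

    // Same log, same order => identical state on every replica.
    assert_eq!(replica_a, replica_b);
    assert_eq!(replica_a.value, 7);
}
```

Anything non-deterministic in apply (wall-clock time, random numbers, local I/O) would break this invariant, which is exactly why ApplyContext only exposes log metadata.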

Batch Apply

For performance, override apply_batch:

fn apply_batch(
    &mut self,
    commands: impl IntoIterator<Item = Self::Command>,
    ctx: &dyn ApplyContext,
) {
    // Apply all commands in a single database transaction
    let tx = self.db.begin();
    for command in commands {
        self.apply_one(&tx, command, ctx);
    }
    tx.commit();
}

The default implementation simply calls apply() for each command sequentially.

NoOp State Machine

For leader-election-only use cases, mosaik provides NoOp:

#[derive(Debug, Default)]
pub struct NoOp;

impl StateMachine for NoOp {
    type Command = ();
    type Query = ();
    type QueryResult = ();
    type StateSync = LogReplaySync<Self>;
    // ...
}

Usage:

let group = network.groups().with_key(key).join(); // implicitly uses NoOp

State Synchronization

The state_sync() method returns a StateSync implementation used when followers need to catch up. For most cases, use LogReplaySync:

fn state_sync(&self) -> LogReplaySync<Self> {
    LogReplaySync::default()
}

LogReplaySync replays committed log entries to bring the follower’s state machine up to date. For advanced use cases (e.g., snapshot-based sync), implement the StateSync trait directly.

See the State Sync deep dive for details.
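
The replay idea can be sketched in a few lines of plain Rust (local stand-in types; mosaik's LogReplaySync handles this internally): a lagging follower re-applies every committed entry past its last applied position.

```rust
// Illustrative sketch of log replay. Types here are local stand-ins,
// not mosaik's: the "log" is a slice of deltas for a counter.
#[derive(Default)]
struct Counter {
    value: i64,
    last_applied: usize, // number of log entries already applied
}

impl Counter {
    fn apply(&mut self, delta: i64) {
        self.value += delta;
    }

    // Replay every committed entry past `last_applied`.
    fn catch_up(&mut self, log: &[i64], committed: usize) {
        for &delta in &log[self.last_applied..committed] {
            self.apply(delta);
        }
        self.last_applied = committed;
    }
}

fn main() {
    let log = [1, 2, 3, 4, 5];

    // Follower applied only the first two entries before falling behind.
    let mut follower = Counter { value: 3, last_applied: 2 };

    // The leader reports that all five entries are committed.
    follower.catch_up(&log, 5);

    assert_eq!(follower.value, 15);
    assert_eq!(follower.last_applied, 5);
}
```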

Leadership Preference

A state machine can declare its preference for assuming leadership within the group by overriding leadership_preference(). This is a per-node setting – different nodes in the same group can return different values without affecting the GroupId.

use mosaik::LeadershipPreference;

fn leadership_preference(&self) -> LeadershipPreference {
    LeadershipPreference::Observer
}

| Variant | Behavior |
|---|---|
| Normal (default) | Standard Raft behavior – participates in elections as a candidate |
| Reluctant { factor: u32 } | Longer election timeouts (multiplied by factor), can still be elected |
| Observer | Never self-nominates as candidate; still votes and replicates log |

The convenience constructor LeadershipPreference::reluctant() creates a Reluctant variant with the default factor of 3.

Liveness warning: If every node in a group returns Observer, no leader can ever be elected and the group will be unable to make progress. Ensure at least one node has Normal or Reluctant preference.

This mechanism is used internally by collection readers, which return Observer so that leadership stays on writer nodes where writes are handled directly rather than being forwarded.

Storage

Commands are persisted in a log via the Storage trait:

pub trait Storage<C: Command>: Send + 'static {
    fn append(&mut self, command: C, term: Term) -> Index;
    fn available(&self) -> RangeInclusive<Index>;
    fn get(&self, index: Index) -> Option<(C, Term)>;
    fn get_range(&self, range: &RangeInclusive<Index>) -> Vec<(Term, Index, C)>;
    fn truncate(&mut self, at: Index);
    fn last(&self) -> Cursor;
    fn term_at(&self, index: Index) -> Option<Term>;
    fn prune_prefix(&mut self, up_to: Index);
    fn reset_to(&mut self, cursor: Cursor);
}

InMemoryLogStore<C> is the default implementation. For durability, implement Storage with a persistent backend (disk, database, etc.).
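
As an illustration of the contract — not mosaik's InMemoryLogStore, and with deliberately simplified signatures (u64 indices and terms, String commands) — a Vec-backed log might look like this:

```rust
// Minimal sketch of a log store in the spirit of the Storage contract.
// Simplified stand-in types; not mosaik's API.
struct VecLog {
    entries: Vec<(String, u64)>, // (command, term); log indices start at 1
}

impl VecLog {
    fn new() -> Self {
        Self { entries: Vec::new() }
    }

    // Append a command, returning its 1-based log index.
    fn append(&mut self, command: String, term: u64) -> u64 {
        self.entries.push((command, term));
        self.entries.len() as u64
    }

    fn get(&self, index: u64) -> Option<&(String, u64)> {
        index.checked_sub(1).and_then(|i| self.entries.get(i as usize))
    }

    // Drop every entry at or after `at` (used when a diverging suffix
    // must be overwritten by the leader's log).
    fn truncate(&mut self, at: u64) {
        self.entries.truncate(at.saturating_sub(1) as usize);
    }

    // (term, index) of the last entry, or (0, 0) for an empty log.
    fn last(&self) -> (u64, u64) {
        self.entries
            .last()
            .map(|&(_, term)| (term, self.entries.len() as u64))
            .unwrap_or((0, 0))
    }
}

fn main() {
    let mut log = VecLog::new();
    assert_eq!(log.append("a".into(), 1), 1);
    assert_eq!(log.append("b".into(), 1), 2);
    assert_eq!(log.append("c".into(), 2), 3);

    assert_eq!(log.last(), (2, 3));
    assert_eq!(log.get(2).map(|(c, _)| c.as_str()), Some("b"));

    // Remove a conflicting suffix starting at index 3.
    log.truncate(3);
    assert_eq!(log.last(), (1, 2));
}
```

A persistent implementation would follow the same shape but write entries through to disk or a database before returning from append.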

Commands & Queries

Once you have a Group<M> handle, you interact with the replicated state machine through commands (writes) and queries (reads).

Commands

Commands mutate state and are replicated to all group members through the Raft log. They are guaranteed to be applied in the same order on every node.

execute — Send and Wait

Sends a command and waits for it to be committed (replicated to a quorum):

let index = group.execute(CounterCmd::Increment(5)).await?;
println!("Command committed at log index {index}");

If the local node is:

  • Leader: replicates to followers, resolves when quorum acknowledges
  • Follower: forwards to the leader, resolves when the leader commits

execute_many — Batch Send and Wait

Send multiple commands atomically:

let range = group.execute_many([
    CounterCmd::Increment(1),
    CounterCmd::Increment(2),
    CounterCmd::Increment(3),
]).await?;
println!("Commands committed at indices {range:?}");

Returns an IndexRange (RangeInclusive<Index>) covering all committed entries.

feed — Fire and Forget

Sends a command without waiting for commitment. Resolves once the leader acknowledges receipt and assigns a log index:

let index = group.feed(CounterCmd::Increment(10)).await?;
println!("Command assigned index {index}, not yet committed");

feed_many — Batch Fire and Forget

let range = group.feed_many([
    CounterCmd::Reset,
    CounterCmd::Increment(100),
]).await?;

Waiting for Commitment After Feed

Combine feed with the cursor watcher:

let range = group.feed_many(commands).await?;

// Wait until all commands are committed
group.when().committed().reaches(range.clone()).await;

Command Errors

pub enum CommandError<M: StateMachine> {
    Offline(Vec<M::Command>),  // Node is offline; commands returned
    NoCommands,                // Empty command list
    GroupTerminated,           // Group is shut down
}

The Offline variant returns the unsent commands so they can be retried:

match group.execute(cmd).await {
    Ok(index) => println!("Committed at {index}"),
    Err(CommandError::Offline(cmds)) => {
        // Save for retry
        group.when().online().await;
        group.execute(cmds.into_iter().next().unwrap()).await?;
    }
    Err(CommandError::GroupTerminated) => {
        panic!("Group is gone");
    }
    Err(CommandError::NoCommands) => unreachable!(),
}

Queries

Queries are read-only operations against the state machine. They are not replicated in the log.

Weak Consistency

Reads from the local node’s state machine. Fast but may return stale data:

let result = group.query(CounterQuery::Value, Consistency::Weak).await?;
println!("Local counter value: {} (at index {})", result.result, result.at_position);

Strong Consistency

Forwards the query to the current leader, guaranteeing linearizable reads:

let result = group.query(CounterQuery::Value, Consistency::Strong).await?;
println!("Leader counter value: {} (at index {})", *result, result.state_position());

CommittedQueryResult

Query results are wrapped in CommittedQueryResult<M>:

pub struct CommittedQueryResult<M: StateMachine> {
    pub result: M::QueryResult,    // The actual result
    pub at_position: Index,        // Log index at query time
}

It implements Deref to M::QueryResult, so you can use it directly:

let result = group.query(CounterQuery::Value, Consistency::Weak).await?;

// Deref to the inner result
let value: i64 = *result;

// Or access explicitly
let position = result.state_position();
let inner: i64 = result.into(); // Consume and get M::QueryResult

Query Errors

pub enum QueryError<M: StateMachine> {
    Offline(M::Query),   // Node offline; query returned
    GroupTerminated,     // Group shut down
}

Ordering Guarantee

Consecutive calls to execute, execute_many, feed, or feed_many on the same Group handle are guaranteed to be processed in the order they were issued.

Common Patterns

Command-Query Separation

// Write path: fire and forget for throughput
group.feed(OrderCmd::Place(order)).await?;

// Read path: weak consistency for speed
let book = group.query(OrderQuery::Snapshot, Consistency::Weak).await?;

// Read path: strong consistency for accuracy
let book = group.query(OrderQuery::Snapshot, Consistency::Strong).await?;

Execute and Verify

let index = group.execute(CounterCmd::Increment(1)).await?;
let result = group.query(CounterQuery::Value, Consistency::Weak).await?;
assert!(result.at_position >= index);

Group Status

The Group<M> handle exposes a rich reactive API for monitoring group state: leadership, log progress, and online status.

Inspecting Current State

// Current leader (if any)
let leader: Option<PeerId> = group.leader();

// Am I the leader?
let is_leader: bool = group.is_leader();

// Group identity
let group_id: &GroupId = group.id();

// Current committed index
let committed: Index = group.committed();

// Current log position (may include uncommitted entries)
let cursor: Cursor = group.log_position();

// Active bonds (peer connections)
let bonds: Bonds = group.bonds();

// Group configuration
let config: &GroupConfig = group.config();

The When API

group.when() returns a &When handle for awaiting state transitions.

Leadership Events

// Wait for any leader to be elected
let leader: PeerId = group.when().leader_elected().await;

// Wait for a different leader (leader change)
let new_leader: PeerId = group.when().leader_changed().await;

// Wait until a specific peer becomes leader
group.when().leader_is(expected_peer_id).await;

// Wait until the local node becomes leader
group.when().is_leader().await;

// Wait until the local node becomes a follower
group.when().is_follower().await;

Online / Offline

A node is online when:

  • It is the leader, or
  • It is a fully synced follower (not catching up, not in an election)

// Wait until ready to process commands
group.when().online().await;

// Wait until the node goes offline
group.when().offline().await;

Log Position Watchers

The CursorWatcher type provides fine-grained observation of log progress:

Committed Index

let committed = group.when().committed();

// Wait until a specific index is committed
committed.reaches(42).await;

// Wait for any forward progress
let new_pos = committed.advanced().await;

// Wait for any change (including backward, e.g., log truncation)
let new_pos = committed.changed().await;

// Wait for regression (rare — happens during partition healing)
let new_pos = committed.reverted().await;

You can also pass an IndexRange from execute_many / feed_many:

let range = group.feed_many(commands).await?;
group.when().committed().reaches(range).await;

Log Position

The same API works for the full log position (including uncommitted entries):

let log = group.when().log();

let new_cursor = log.advanced().await;
let new_cursor = log.changed().await;

Index and Cursor Types

/// A log index (0-based, where 0 is the sentinel "no entry")
pub struct Index(pub u64);

/// A Raft term number
pub struct Term(pub u64);

/// A (term, index) pair identifying a specific log position
pub struct Cursor { pub term: Term, pub index: Index }

Both Index and Term provide:

  • zero(), one() — constants
  • is_zero() — check for sentinel
  • prev(), next() — saturating arithmetic
  • distance(other) — absolute difference
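
A minimal re-implementation of the described semantics (illustrative only, not mosaik's actual type) shows how the sentinel interacts with saturating arithmetic:

```rust
// Illustrative newtype mirroring the documented Index semantics:
// 0 is the "no entry" sentinel and arithmetic saturates.
#[derive(Clone, Copy, PartialEq, Debug)]
struct Index(u64);

impl Index {
    fn zero() -> Self { Index(0) }
    fn is_zero(&self) -> bool { self.0 == 0 }
    fn prev(self) -> Self { Index(self.0.saturating_sub(1)) }
    fn next(self) -> Self { Index(self.0.saturating_add(1)) }
    fn distance(self, other: Self) -> u64 { self.0.abs_diff(other.0) }
}

fn main() {
    // prev() saturates at the sentinel instead of underflowing.
    assert_eq!(Index::zero().prev(), Index::zero());
    assert!(Index::zero().is_zero());

    assert_eq!(Index(3).next(), Index(4));
    assert_eq!(Index(3).distance(Index(10)), 7);
}
```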

Bonds

group.bonds() returns a Bonds handle — a watchable, ordered collection of all active bonds:

let bonds = group.bonds();

// Iterate current bonds
for bond in bonds.iter() {
    println!("Bonded to: {}", bond.peer_id());
}

Bonds carry Raft messages, heartbeats, and sync traffic between group members. See the Bonds deep dive for internals.

Putting It Together

A common pattern for group-aware applications:

let group = network.groups()
    .with_key(key)
    .with_state_machine(MyMachine::new())
    .join();

// Wait until we can serve traffic
group.when().online().await;

if group.is_leader() {
    // Run leader-specific tasks
    run_leader_loop(&group).await;
} else {
    // Run follower-specific tasks
    run_follower_loop(&group).await;
}

// React to leadership changes
loop {
    let new_leader = group.when().leader_changed().await;
    if group.is_leader() {
        // Transition to leader role
    } else {
        // Transition to follower role
    }
}

Collections

The Collections subsystem provides replicated, eventually consistent data structures that feel like local Rust collections but are automatically synchronized across all participating nodes using Raft consensus.

┌──────────────────────────────────────────────────────────────────────────┐
│                   Collections                                            │
│                                                                          │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ┌──────────┐ ┌──────┐ │
│  │   Map    │ │   Vec    │ │   Set    │ │  DEPQ  │ │ Register │ │ Once │ │
│  │ <K, V>   │ │ <T>      │ │ <T>      │ │<P,K,V> │ │  <T>     │ │ <T>  │ │
│  └────┬─────┘ └────┬─────┘ └────┬─────┘ └───┬────┘ └────┬─────┘ └──┬───┘ │
│       └────────────┴────────────┴─┬─────────┴───────────┴──────────┘     │
│                                   │                                      │
│                  ┌────────────────▼───────────────────┐                  │
│                  │    Groups (Raft Consensus)         │                  │
│                  │    One group per collection        │                  │
│                  └────────────────────────────────────┘                  │
└──────────────────────────────────────────────────────────────────────────┘

Each collection instance creates its own Raft consensus group. Different collections (or the same type with different StoreIds) run as independent groups and can span different subsets of the network.

Quick start

use mosaik::collections::{Map, StoreId};

// On every node — using the same StoreId joins the same group
let prices = Map::<String, f64>::writer(&network, StoreId::from("prices"));

// Wait until the collection is online
prices.when().online().await;

// Write (only available on writers)
let version = prices.insert("ETH".into(), 3812.50).await?;

// Wait for the write to be committed
prices.when().reaches(version).await;

// Read (available on both writers and readers)
assert_eq!(prices.get(&"ETH".into()), Some(3812.50));

Writer / Reader split

Every collection type offers two modes, distinguished at the type level using a const-generic boolean:

| Mode | Type alias | Can write? | Leadership priority |
|---|---|---|---|
| Writer | MapWriter<K,V>, VecWriter<T>, RegisterWriter<T>, OnceWriter<T> etc. | Yes | Normal |
| Reader | MapReader<K,V>, VecReader<T>, RegisterReader<T>, OnceReader<T> etc. | No | Deprioritized |

Both modes provide identical read access. Readers automatically use deprioritize_leadership() in their consensus configuration to reduce the chance of being elected leader, since leaders handle write forwarding.

// Writer — can read AND write
let w = Map::<String, u64>::writer(&network, store_id);

// Reader — can only read, lower chance of becoming leader
let r = Map::<String, u64>::reader(&network, store_id);

Available collections

| Collection | Description | Backing structure |
|---|---|---|
| Map<K, V> | Unordered key-value map | im::HashMap (deterministic) |
| Vec<T> | Ordered, index-addressable sequence | im::Vector |
| Set<T> | Unordered set of unique values | im::HashSet (deterministic) |
| Register<T> | Single-value register | Option<T> |
| Once<T> | Write-once register | Option<T> |
| PriorityQueue<P, K, V> | Double-ended priority queue | im::HashMap + im::OrdMap |

Most collections use the im crate for their internal state, which provides O(1) structural sharing — cloning a snapshot of the entire collection is essentially free. This is critical for the snapshot-based state sync mechanism. Register and Once use a plain Option<T> since they hold at most one value.

Trait requirements

Collections use blanket-implemented marker traits to constrain their type parameters:

| Trait | Required bounds | Used by |
|---|---|---|
| Value | Clone + Debug + Serialize + DeserializeOwned + Hash + PartialEq + Eq + Send + Sync + 'static | All element/value types |
| Key | Clone + Serialize + DeserializeOwned + Hash + PartialEq + Eq + Send + Sync + 'static | Map keys, Set elements, PQ keys |
| OrderedKey | Key + Ord | PriorityQueue priorities |

These traits are automatically implemented for any type that satisfies their bounds — you never need to implement them manually.

StoreId and group identity

Each collection derives its Raft group identity from:

  1. A fixed prefix per collection type (e.g., "mosaik_collections_map", "mosaik_collections_once")
  2. The StoreId — a UniqueId you provide at construction time
  3. The Rust type names of the type parameters

This means Map::<String, u64>::writer(&net, id) and Map::<u32, u64>::writer(&net, id) will join different groups even with the same StoreId, because the key type differs. Likewise, a Once<String> and a Register<String> with the same StoreId form separate groups because the prefix differs.
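
The type-name component of the derivation can be observed with std::any::type_name. The helper below is hypothetical and only mimics the shape of the derivation input; the actual derivation lives inside mosaik:

```rust
use std::any::type_name;

// Hypothetical helper: concatenates the same ingredients the docs list
// (prefix, StoreId, type parameter names) to show why two different
// parameterizations can never collide.
fn map_identity_input<K, V>(store_id: &str) -> String {
    format!(
        "mosaik_collections_map/{store_id}/{}/{}",
        type_name::<K>(),
        type_name::<V>()
    )
}

fn main() {
    let a = map_identity_input::<String, u64>("prices");
    let b = map_identity_input::<u32, u64>("prices");

    // Same StoreId, different key type => different group identity input.
    assert_ne!(a, b);
}
```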

Version tracking

All mutating operations return a Version, which wraps the Raft log Index at which the mutation will be committed:

let version: Version = map.insert("key".into(), 42).await?;

// Wait until the mutation is committed and visible
map.when().reaches(version).await;

Error handling

All write operations return Result<Version, Error<T>>:

| Variant | Meaning |
|---|---|
| Error::Offline(T) | The node is temporarily offline. The value that failed is returned for retry. |
| Error::NetworkDown | The network is permanently down. The collection is no longer usable. |

SyncConfig

Collections use a snapshot-based state sync mechanism to bring lagging followers up to date. The SyncConfig controls this behavior:

| Parameter | Default | Description |
|---|---|---|
| fetch_batch_size | 2000 | Items per batch during snapshot transfer |
| snapshot_ttl | 10s | How long a snapshot stays valid after last access |
| snapshot_request_timeout | 15s | Timeout waiting for a SnapshotReady response |
| fetch_timeout | 5s | Timeout per FetchDataResponse |

Important: Different SyncConfig values produce different group signatures. Collections using different configs will not see each other.

use mosaik::collections::{Map, StoreId, SyncConfig};
use std::time::Duration;

let config = SyncConfig::default()
    .with_fetch_batch_size(5000)
    .with_snapshot_ttl(Duration::from_secs(30));

let map = Map::<String, u64>::writer_with_config(&network, store_id, config);

How snapshot sync works

  1. A lagging follower sends a RequestSnapshot to the leader.
  2. The leader wraps the request as a special command and replicates it.
  3. When committed, all peers create a snapshot at the same log position.
  4. The follower fetches snapshot data in batches from available peers.
  5. Once complete, the follower installs the snapshot and replays any buffered commands received during sync.

Because im data structures support O(1) cloning, creating a snapshot is nearly instant regardless of collection size.

Deterministic hashing

Map and Set use BuildHasherDefault<DefaultHasher> (SipHash-1-3 with a fixed zero seed) for their internal im::HashMap / im::HashSet. This ensures that iteration order is identical across all nodes for the same logical state — a requirement for snapshot sync to produce consistent chunked transfers.
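
The underlying property can be demonstrated with std's DefaultHasher, which is also SipHash-1-3 with a fixed zero key when constructed via Default (unlike RandomState, std HashMap's default build hasher, which is randomly seeded per process):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// DefaultHasher::default() uses a fixed zero key, so the same input always
// hashes to the same value — on every node and on every run.
fn fixed_seed_hash<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::default();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Stable within a process...
    assert_eq!(fixed_seed_hash(&"ETH"), fixed_seed_hash(&"ETH"));

    // ...and two independently constructed hashers agree as well,
    // which is what makes cross-node iteration order reproducible.
    assert_eq!(fixed_seed_hash(&42u64), fixed_seed_hash(&42u64));
}
```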

Map

Map<K, V> is a replicated, unordered, eventually consistent key-value map. Internally it is backed by an im::HashMap with a deterministic hasher.

Construction

use mosaik::collections::{Map, StoreId, SyncConfig};

// Writer — can read and write
let map = Map::<String, u64>::writer(&network, StoreId::from("balances"));

// Writer with custom sync config
let map = Map::<String, u64>::writer_with_config(&network, store_id, config);

// Reader — read-only, deprioritized for leadership
let map = Map::<String, u64>::reader(&network, store_id);

// Reader with custom sync config
let map = Map::<String, u64>::reader_with_config(&network, store_id, config);

// Aliases: new() == writer(), new_with_config() == writer_with_config()
let map = Map::<String, u64>::new(&network, store_id);

Read operations

Available on both writers and readers. All reads operate on the local committed state — they are non-blocking and never touch the network.

| Method | Time | Description |
|---|---|---|
| len() -> usize | O(1) | Number of entries |
| is_empty() -> bool | O(1) | Whether the map is empty |
| contains_key(&K) -> bool | O(log n) | Test if a key exists |
| get(&K) -> Option<V> | O(log n) | Get a clone of the value for a key |
| iter() -> impl Iterator<Item = (K, V)> | O(1)* | Iterate over all key-value pairs |
| keys() -> impl Iterator<Item = K> | O(1)* | Iterate over all keys |
| values() -> impl Iterator<Item = V> | O(1)* | Iterate over all values |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |

* Iterator creation is O(1) due to im::HashMap’s structural sharing; full iteration is O(n).

// Read the current state
if let Some(balance) = map.get(&"alice".into()) {
    println!("Alice's balance: {balance}");
}

// Snapshot iteration — takes a structural clone, then iterates
for (key, value) in map.iter() {
    println!("{key}: {value}");
}

Write operations

Only available on MapWriter<K, V>. All writes go through Raft consensus and return the Version at which the mutation will be committed.

| Method | Description |
|---|---|
| insert(K, V) -> Result<Version, Error<(K, V)>> | Insert or update a key-value pair |
| compare_exchange(K, Option<V>, Option<V>) -> Result<Version, Error<(K, Option<V>, Option<V>)>> | Atomic compare-and-swap for a key |
| remove(&K) -> Result<Version, Error<K>> | Remove a key |
| extend(impl IntoIterator<Item = (K, V)>) -> Result<Version, Error<Vec<(K, V)>>> | Batch insert |
| clear() -> Result<Version, Error<()>> | Remove all entries |

// Insert a single entry
let v = map.insert("ETH".into(), 3812).await?;

// Batch insert
let v = map.extend([
    ("BTC".into(), 105_000),
    ("SOL".into(), 178),
]).await?;

// Wait for the batch to commit
map.when().reaches(v).await;

// Remove
map.remove(&"SOL".into()).await?;

// Atomic compare-and-swap: update only if current value matches
let v = map.compare_exchange(
    "ETH".into(),
    Some(3812),   // expected current value
    Some(3900),   // new value
).await?;
map.when().reaches(v).await;

// Compare-and-swap to insert a new key (expected = None)
let v = map.compare_exchange(
    "DOGE".into(),
    None,          // key must not exist
    Some(42),      // value to insert
).await?;

// Compare-and-swap to delete (new = None)
let v = map.compare_exchange(
    "DOGE".into(),
    Some(42),      // expected current value
    None,          // remove the key
).await?;

// Clear everything
map.clear().await?;

Compare-and-swap semantics

compare_exchange atomically checks the value associated with a key and only applies the mutation if it matches the expected parameter:

  • key: The key to operate on.
  • expected: The expected current value (None means the key must not exist).
  • new: The value to write if the expectation holds (None removes the key).

| expected | new | Effect when matched |
|---|---|---|
| None | Some(v) | Insert a new key-value pair |
| Some(old) | Some(new) | Update an existing value |
| Some(old) | None | Remove the key |
| None | None | No-op (key must not exist) |

If the current value does not match expected, the operation is a no-op — it still commits to the Raft log (incrementing the version) but does not mutate the map.
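
The decision table above boils down to a single equality check per key. A plain-Rust stand-in (not mosaik's implementation) makes the four rows concrete:

```rust
// Illustrative per-key compare-and-swap decision, mirroring the table above.
// `current` is the committed value for the key; returns the value after the
// operation (unchanged when the expectation does not hold).
fn compare_exchange<V: PartialEq + Clone>(
    current: &Option<V>,
    expected: &Option<V>,
    new: &Option<V>,
) -> Option<V> {
    if current == expected {
        new.clone() // expectation holds: insert, update, or remove
    } else {
        current.clone() // mismatch: no-op
    }
}

fn main() {
    // Insert a new key (expected = None).
    assert_eq!(compare_exchange(&None, &None, &Some(42)), Some(42));

    // Update an existing value.
    assert_eq!(compare_exchange(&Some(3812), &Some(3812), &Some(3900)), Some(3900));

    // Remove (new = None).
    assert_eq!(compare_exchange(&Some(42), &Some(42), &None), None::<i32>);

    // Mismatch: the operation leaves the value untouched.
    assert_eq!(compare_exchange(&Some(1), &Some(2), &Some(3)), Some(1));
}
```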

Error handling

On failure, the values you attempted to write are returned inside the error so you can retry without re-creating them:

match map.insert("key".into(), expensive_value).await {
    Ok(version) => println!("committed at {version}"),
    Err(Error::Offline((key, value))) => {
        // Node is temporarily offline — retry later
        println!("offline, got {key} and {value} back");
    }
    Err(Error::NetworkDown) => {
        // Permanent failure
    }
}

Status & observation

// Wait until the map is online
map.when().online().await;

// Wait for a specific version
let v = map.insert("x".into(), 1).await?;
map.when().reaches(v).await;

// Wait for any new commit
map.when().updated().await;

// Detect going offline
map.when().offline().await;

Group identity

The group key for a Map<K, V> is derived from:

UniqueId::from("mosaik_collections_map")
    .derive(store_id)
    .derive(type_name::<K>())
    .derive(type_name::<V>())

Two maps with the same StoreId but different key or value types will be in completely separate consensus groups.

Vec

Vec<T> is a replicated, ordered, index-addressable sequence. Internally it is backed by im::Vector<T>, which provides efficient push/pop at both ends and O(log n) random access.

Construction

use mosaik::collections::{Vec, StoreId, SyncConfig};

// Writer — can read and write
let vec = Vec::<String>::writer(&network, StoreId::from("events"));

// Writer with custom sync config
let vec = Vec::<String>::writer_with_config(&network, store_id, config);

// Reader — read-only, deprioritized for leadership
let vec = Vec::<String>::reader(&network, store_id);

// Reader with custom sync config
let vec = Vec::<String>::reader_with_config(&network, store_id, config);

// Aliases: new() == writer(), new_with_config() == writer_with_config()
let vec = Vec::<String>::new(&network, store_id);

Read operations

Available on both writers and readers. Reads operate on the local committed state and never touch the network.

| Method | Time | Description |
|---|---|---|
| len() -> usize | O(1) | Number of elements |
| is_empty() -> bool | O(1) | Whether the vector is empty |
| get(u64) -> Option<T> | O(log n) | Get element at index |
| front() -> Option<T> | O(log n) | First element |
| head() -> Option<T> | O(log n) | Alias for front() |
| back() -> Option<T> | O(log n) | Last element |
| last() -> Option<T> | O(log n) | Alias for back() |
| contains(&T) -> bool | O(n) | Test if a value is in the vector |
| index_of(&T) -> Option<u64> | O(n) | Find the index of a value |
| iter() -> impl Iterator<Item = T> | O(1)* | Iterate over all elements |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |

* Iterator creation is O(1) due to structural sharing; full traversal is O(n).

// Random access
if let Some(event) = vec.get(0) {
    println!("First event: {event}");
}

// Peek at ends
let newest = vec.back();
let oldest = vec.front();

// Linear search
if let Some(idx) = vec.index_of(&"restart".into()) {
    println!("Found restart at index {idx}");
}

Write operations

Only available on VecWriter<T>. All writes go through Raft consensus.

| Method | Time | Description |
|---|---|---|
| push_back(T) -> Result<Version, Error<T>> | O(1)* | Append to end |
| push_front(T) -> Result<Version, Error<T>> | O(1)* | Prepend to start |
| insert(u64, T) -> Result<Version, Error<T>> | O(log n) | Insert at index, shifting elements right |
| extend(impl IntoIterator<Item = T>) -> Result<Version, Error<Vec<T>>> | O(k)* | Batch append |
| pop_back() -> Result<Version, Error<()>> | O(1)* | Remove last element |
| pop_front() -> Result<Version, Error<()>> | O(1)* | Remove first element |
| remove(u64) -> Result<Version, Error<u64>> | O(log n) | Remove at index, shifting elements left |
| swap(u64, u64) -> Result<Version, Error<()>> | O(log n) | Swap two elements |
| truncate(usize) -> Result<Version, Error<()>> | O(log n) | Keep only the first n elements |
| clear() -> Result<Version, Error<()>> | O(1) | Remove all elements |

* Amortized time complexity.

// Append
let v = vec.push_back("event-1".into()).await?;

// Prepend
vec.push_front("event-0".into()).await?;

// Batch append
let v = vec.extend(["a".into(), "b".into(), "c".into()]).await?;

// Random insert
vec.insert(2, "inserted".into()).await?;

// Remove from ends
vec.pop_back().await?;
vec.pop_front().await?;

// Remove at index
vec.remove(0).await?;

// Swap positions
vec.swap(0, 1).await?;

// Truncate and clear
vec.truncate(10).await?;
vec.clear().await?;

Error handling

Writes return the failed value on Error::Offline so you can retry:

match vec.push_back(item).await {
    Ok(version) => {
        vec.when().reaches(version).await;
    }
    Err(Error::Offline(item)) => {
        // Retry with the same item later
    }
    Err(Error::NetworkDown) => {
        // Permanent failure
    }
}

Status & observation

// Wait until online
vec.when().online().await;

// Wait for a specific committed version
let v = vec.push_back("x".into()).await?;
vec.when().reaches(v).await;

// Wait for any update
vec.when().updated().await;

// React to going offline
vec.when().offline().await;

Group identity

The group key for a Vec<T> is derived from:

UniqueId::from("mosaik_collections_vec")
    .derive(store_id)
    .derive(type_name::<T>())

Two vectors with the same StoreId but different element types will be in separate consensus groups.

Set

Set<T> is a replicated, unordered, eventually consistent set of unique values. Internally it is backed by an im::HashSet with a deterministic hasher (SipHash-1-3 with a fixed zero seed), so iteration order is identical across all nodes.

Construction

use mosaik::collections::{Set, StoreId, SyncConfig};

// Writer — can read and write
let set = Set::<String>::writer(&network, StoreId::from("active-peers"));

// Writer with custom sync config
let set = Set::<String>::writer_with_config(&network, store_id, config);

// Reader — read-only, deprioritized for leadership
let set = Set::<String>::reader(&network, store_id);

// Reader with custom sync config
let set = Set::<String>::reader_with_config(&network, store_id, config);

// Aliases: new() == writer(), new_with_config() == writer_with_config()
let set = Set::<String>::new(&network, store_id);

Read operations

Available on both writers and readers.

| Method | Time | Description |
|---|---|---|
| len() -> usize | O(1) | Number of elements |
| is_empty() -> bool | O(1) | Whether the set is empty |
| contains(&T) -> bool | O(log n) | Test membership |
| is_subset(&Set<T, W>) -> bool | O(n) | Test subset relationship |
| iter() -> impl Iterator<Item = T> | O(1)* | Iterate over all elements |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |

* Iterator creation is O(1); full traversal is O(n).

// Membership test
if set.contains(&"node-42".into()) {
    println!("node-42 is active");
}

// Subset check between two sets (can differ in writer/reader mode)
if allowed.is_subset(&active) {
    println!("all allowed nodes are active");
}

// Iteration
for peer in set.iter() {
    println!("active: {peer}");
}

Write operations

Only available on SetWriter<T>.

| Method | Description |
|---|---|
| insert(T) -> Result<Version, Error<T>> | Insert a value (no-op if already present) |
| extend(impl IntoIterator<Item = T>) -> Result<Version, Error<Vec<T>>> | Batch insert |
| remove(&T) -> Result<Version, Error<T>> | Remove a value |
| clear() -> Result<Version, Error<()>> | Remove all elements |

// Insert
let v = set.insert("node-1".into()).await?;

// Batch insert
let v = set.extend(["node-2".into(), "node-3".into()]).await?;
set.when().reaches(v).await;

// Remove
set.remove(&"node-1".into()).await?;

// Clear
set.clear().await?;

Error handling

Writes return the failed value on Error::Offline:

match set.insert(value).await {
    Ok(version) => { /* committed */ }
    Err(Error::Offline(value)) => {
        // Retry later with the same value
    }
    Err(Error::NetworkDown) => {
        // Permanent failure
    }
}

Status & observation

set.when().online().await;

let v = set.insert("x".into()).await?;
set.when().reaches(v).await;

set.when().updated().await;
set.when().offline().await;

Group identity

UniqueId::from("mosaik_collections_set")
    .derive(store_id)
    .derive(type_name::<T>())

Register

Register<T> is a replicated single-value register. It holds at most one value at a time — writing a new value replaces the previous one entirely. This is the distributed equivalent of a tokio::sync::watch channel: all nodes observe the latest value.

Internally the state is simply an Option<T>, making the Register the simplest collection in the mosaik toolkit.

Construction

use mosaik::collections::{Register, StoreId, SyncConfig};

// Writer — can read and write
let reg = Register::<String>::writer(&network, StoreId::from("config"));

// Writer with custom sync config
let reg = Register::<String>::writer_with_config(&network, store_id, config);

// Reader — read-only, deprioritized for leadership
let reg = Register::<String>::reader(&network, store_id);

// Reader with custom sync config
let reg = Register::<String>::reader_with_config(&network, store_id, config);

// Aliases: new() == writer(), new_with_config() == writer_with_config()
let reg = Register::<String>::new(&network, store_id);

Read operations

Available on both writers and readers. Reads operate on the local committed state and never touch the network.

| Method | Time | Description |
| read() -> Option<T> | O(1) | Get the current value |
| get() -> Option<T> | O(1) | Alias for read() |
| is_empty() -> bool | O(1) | Whether the register is empty (no value set) |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |

// Read the current value
if let Some(config) = reg.read() {
    println!("Current config: {config}");
}

// Check if a value has been written
if reg.is_empty() {
    println!("No configuration set yet");
}

Write operations

Only available on RegisterWriter<T>.

| Method | Time | Description |
| write(T) -> Result<Version, Error<T>> | O(1) | Write a value (replaces any existing value) |
| set(T) -> Result<Version, Error<T>> | O(1) | Alias for write() |
| compare_exchange(Option<T>, Option<T>) -> Result<Version, Error<(Option<T>, Option<T>)>> | O(1) | Atomic compare-and-swap |
| clear() -> Result<Version, Error<()>> | O(1) | Remove the stored value |

// Write a value
let v = reg.write("v1".into()).await?;

// Overwrite (replaces previous value)
let v = reg.write("v2".into()).await?;
reg.when().reaches(v).await;

// Using the alias
reg.set("v3".into()).await?;

// Atomic compare-and-swap: only writes if current value matches expected
let v = reg.compare_exchange(Some("v3".into()), Some("v4".into())).await?;
reg.when().reaches(v).await;
assert_eq!(reg.read(), Some("v4".to_string()));

// Compare-and-swap from empty to a value
reg.clear().await?;
let v = reg.compare_exchange(None, Some("first".into())).await?;
reg.when().reaches(v).await;

// Compare-and-swap to remove (set to None)
let v = reg.compare_exchange(Some("first".into()), None).await?;
reg.when().reaches(v).await;
assert!(reg.is_empty());

// Clear back to empty
reg.clear().await?;
assert!(reg.is_empty());

Compare-and-swap semantics

compare_exchange atomically checks the current value of the register and only applies the write if it matches the current parameter. This is useful for optimistic concurrency control — multiple writers can attempt a swap, and only the one whose expectation matches the actual state will succeed.

  • current: The expected current value (None means the register must be empty).
  • new: The value to write if the expectation holds (None clears the register).

If the current value does not match current, the operation is a no-op — it still commits to the Raft log (incrementing the version) but does not change the stored value.
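The check-then-write rule can be modeled as a plain function over the register's `Option<T>` state. This is a sketch only; the real state machine also handles versioning and Raft replication, and `apply_compare_exchange` is a hypothetical name:

```rust
// Sketch of compare-and-swap apply logic over an Option<T> state.
// Hypothetical helper for illustration; not the mosaik implementation.
fn apply_compare_exchange<T: PartialEq>(
    state: &mut Option<T>,
    current: Option<T>,
    new: Option<T>,
) -> bool {
    // Only mutate when the stored value matches the expectation.
    if *state == current {
        *state = new;
        true // swap applied
    } else {
        false // no-op: the log entry still commits, but state is unchanged
    }
}

fn main() {
    let mut reg: Option<String> = None;

    // Empty -> "v1" succeeds because the expectation (None) matches.
    assert!(apply_compare_exchange(&mut reg, None, Some("v1".into())));
    assert_eq!(reg.as_deref(), Some("v1"));

    // Stale expectation: "v0" != "v1", so this is a no-op.
    assert!(!apply_compare_exchange(&mut reg, Some("v0".into()), Some("v2".into())));
    assert_eq!(reg.as_deref(), Some("v1"));

    // "v1" -> None removes the value.
    assert!(apply_compare_exchange(&mut reg, Some("v1".into()), None));
    assert!(reg.is_none());
}
```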

Error handling

Writes return the failed value on Error::Offline so you can retry:

match reg.write(value).await {
    Ok(version) => {
        reg.when().reaches(version).await;
    }
    Err(Error::Offline(value)) => {
        // Retry with the same value later
    }
    Err(Error::NetworkDown) => {
        // Permanent failure
    }
}

Status & observation

// Wait until online
reg.when().online().await;

// Wait for a specific committed version
let v = reg.write("new-config".into()).await?;
reg.when().reaches(v).await;

// Wait for any update
reg.when().updated().await;

// React to going offline
reg.when().offline().await;

Group identity

The group key for a Register<T> is derived from:

UniqueId::from("mosaik_collections_register")
    .derive(store_id)
    .derive(type_name::<T>())

Two registers with the same StoreId but different value types will be in separate consensus groups.

When to use Register

Register is ideal for:

  • Configuration — a single shared config object replicated across the cluster
  • Leader state — the current leader’s address or identity
  • Latest snapshot — the most recent version of a computed result
  • Feature flags — a single boolean or enum toggled cluster-wide

If you need to store multiple values, use Map, Vec, or Set instead.

Once

Once<T> is a replicated write-once register. It holds at most one value — once a value has been set, all subsequent writes are silently ignored by the state machine. This is the distributed equivalent of a tokio::sync::OnceCell.

Unlike Register<T>, which allows overwriting the stored value, Once<T> guarantees that the first accepted write is permanent. This makes it ideal for one-time initialization patterns where you need exactly one value to win across a distributed cluster.

Internally the state is an Option<T>, identical to Register, but the apply logic rejects writes when the value is already Some.

Construction

use mosaik::collections::{Once, StoreId, SyncConfig};

// Writer — can read and write
let once = Once::<String>::writer(&network, StoreId::from("config-seed"));

// Writer with custom sync config
let once = Once::<String>::writer_with_config(&network, store_id, config);

// Reader — read-only, deprioritized for leadership
let once = Once::<String>::reader(&network, store_id);

// Reader with custom sync config
let once = Once::<String>::reader_with_config(&network, store_id, config);

// Aliases: new() == writer(), new_with_config() == writer_with_config()
let once = Once::<String>::new(&network, store_id);

Read operations

Available on both writers and readers. Reads operate on the local committed state and never touch the network.

| Method | Time | Description |
| read() -> Option<T> | O(1) | Get the current value |
| get() -> Option<T> | O(1) | Alias for read() |
| is_empty() -> bool | O(1) | Whether no value has been set yet |
| is_none() -> bool | O(1) | Alias for is_empty() |
| is_some() -> bool | O(1) | Whether the register holds a value |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |

// Read the current value
if let Some(seed) = once.read() {
    println!("Config seed: {seed}");
}

// Check if a value has been written
if once.is_none() {
    println!("No seed set yet");
}

// Equivalent check
assert_eq!(once.is_empty(), once.is_none());
assert_eq!(once.is_some(), !once.is_empty());

Write operations

Only available on OnceWriter<T>.

| Method | Time | Description |
| write(T) -> Result<Version, Error<T>> | O(1) | Set the value (ignored if already set) |
| set(T) -> Result<Version, Error<T>> | O(1) | Alias for write() |

The key behavior: only the first write takes effect. If a value has already been set, subsequent writes are silently ignored by the state machine. The returned Version still advances (the command is committed to the Raft log), but the stored value does not change.

// First write — this value is stored permanently
let v = once.write("genesis".into()).await?;
once.when().reaches(v).await;
assert_eq!(once.read(), Some("genesis".into()));

// Second write — silently ignored
let v2 = once.write("overwrite-attempt".into()).await?;
once.when().reaches(v2).await;
assert_eq!(once.read(), Some("genesis".into())); // still "genesis"

// Using the alias
once.set("another-attempt".into()).await?;
assert_eq!(once.read(), Some("genesis".into())); // unchanged
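The write-once rule above can be modeled as a plain function over the `Option<T>` state. A sketch only; `apply_once` is a hypothetical name, and the real state machine also commits ignored writes to the log:

```rust
// Sketch of the write-once apply rule: the first write wins and
// later writes are ignored. Not mosaik's actual code.
fn apply_once<T>(state: &mut Option<T>, value: T) -> bool {
    if state.is_none() {
        *state = Some(value);
        true // first write accepted
    } else {
        false // already set: the command commits but the state is unchanged
    }
}

fn main() {
    let mut seed: Option<&str> = None;
    assert!(apply_once(&mut seed, "genesis"));
    assert!(!apply_once(&mut seed, "overwrite-attempt"));
    assert_eq!(seed, Some("genesis"));
}
```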

Error handling

Writes return the failed value on Error::Offline so you can retry:

match once.write(value).await {
    Ok(version) => {
        once.when().reaches(version).await;
    }
    Err(Error::Offline(value)) => {
        // Retry with the same value later
    }
    Err(Error::Encoding(value, err)) => {
        // Serialization failed
    }
    Err(Error::NetworkDown) => {
        // Permanent failure
    }
}

Status & observation

// Wait until online
once.when().online().await;

// Wait for a specific committed version
let v = once.write("init".into()).await?;
once.when().reaches(v).await;

// Wait for any update
once.when().updated().await;

// React to going offline
once.when().offline().await;

Group identity

The group key for a Once<T> is derived from:

UniqueId::from("mosaik_collections_once")
    .derive(store_id)
    .derive(type_name::<T>())

A Once<T> and a Register<T> with the same StoreId and type parameter will be in separate consensus groups because the prefix differs ("mosaik_collections_once" vs "mosaik_collections_register").

Once vs Register

| Behavior | Once<T> | Register<T> |
| Write semantics | First write wins; subsequent ignored | Every write replaces the stored value |
| clear() | Not supported | Supported |
| compare_exchange | Not supported | Supported |
| Distributed analog | tokio::sync::OnceCell | tokio::sync::watch |
| State machine sig | "mosaik_collections_once" | "mosaik_collections_register" |

When to use Once

Once is ideal for:

  • One-time initialization — a genesis config, cluster seed, or bootstrap value that should only be set once
  • Distributed elections — the first node to write a value claims a slot
  • Immutable assignments — assigning a resource or identity permanently
  • Configuration anchors — values that must not change after initial deployment

If you need to update the value after setting it, use Register instead.

PriorityQueue

PriorityQueue<P, K, V> is a replicated, eventually consistent double-ended priority queue (DEPQ). Each entry has a priority (P), a unique key (K), and a value (V). It supports efficient access to both the minimum and maximum priority elements, key-based lookups, priority updates, and range removals.

Internally it maintains two indexes:

  • by_key: im::HashMap<K, (P, V)> — O(log n) key lookups
  • by_priority: im::OrdMap<P, im::HashMap<K, V>> — O(log n) min/max and range operations

Both indexes use deterministic hashers for consistent iteration order across nodes.

Construction

use mosaik::collections::{PriorityQueue, StoreId, SyncConfig};

// Writer — can read and write
let pq = PriorityQueue::<u64, String, Order>::writer(
    &network,
    StoreId::from("orderbook"),
);

// Writer with custom sync config
let pq = PriorityQueue::<u64, String, Order>::writer_with_config(
    &network, store_id, config,
);

// Reader — read-only, deprioritized for leadership
let pq = PriorityQueue::<u64, String, Order>::reader(&network, store_id);

// Reader with custom sync config
let pq = PriorityQueue::<u64, String, Order>::reader_with_config(
    &network, store_id, config,
);

// Aliases
let pq = PriorityQueue::<u64, String, Order>::new(&network, store_id);
let pq = PriorityQueue::<u64, String, Order>::new_with_config(
    &network, store_id, config,
);

Read operations

Available on both writers and readers.

| Method | Time | Description |
| len() -> usize | O(1) | Number of entries |
| is_empty() -> bool | O(1) | Whether the queue is empty |
| contains_key(&K) -> bool | O(log n) | Test if a key exists |
| get(&K) -> Option<V> | O(log n) | Get the value for a key |
| get_priority(&K) -> Option<P> | O(log n) | Get the priority for a key |
| get_min() -> Option<(P, K, V)> | O(log n) | Entry with the lowest priority |
| get_max() -> Option<(P, K, V)> | O(log n) | Entry with the highest priority |
| min_priority() -> Option<P> | O(log n) | Lowest priority value |
| max_priority() -> Option<P> | O(log n) | Highest priority value |
| iter() -> impl Iterator<Item = (P, K, V)> | | Ascending priority order (alias for iter_asc) |
| iter_asc() -> impl Iterator<Item = (P, K, V)> | | Ascending priority order |
| iter_desc() -> impl Iterator<Item = (P, K, V)> | | Descending priority order |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |

When multiple entries share the same priority, get_min() and get_max() return an arbitrary entry from that priority bucket.

// Peek at extremes
if let Some((priority, key, value)) = pq.get_min() {
    println!("Best bid: {key} at priority {priority}");
}

// Look up by key
let price = pq.get_priority(&"order-42".into());

// Iterate in order
for (priority, key, value) in pq.iter_desc() {
    println!("{priority}: {key} = {value:?}");
}

Write operations

Only available on PriorityQueueWriter<P, K, V>.

| Method | Description |
| insert(P, K, V) -> Result<Version, Error<(P, K, V)>> | Insert or update an entry |
| extend(impl IntoIterator<Item = (P, K, V)>) -> Result<Version, Error<Vec<(P, K, V)>>> | Batch insert |
| update_priority(&K, P) -> Result<Version, Error<K>> | Change priority of an existing key |
| update_value(&K, V) -> Result<Version, Error<K>> | Change value of an existing key |
| compare_exchange_value(&K, V, Option<V>) -> Result<Version, Error<K>> | Atomic compare-and-swap on value |
| remove(&K) -> Result<Version, Error<K>> | Remove by key |
| remove_range(impl RangeBounds<P>) -> Result<Version, Error<()>> | Remove all entries in a priority range |
| clear() -> Result<Version, Error<()>> | Remove all entries |

If insert is called with a key that already exists, both its priority and value are updated. update_priority and update_value are no-ops if the key doesn’t exist (they still commit to the log).

// Insert
let v = pq.insert(100, "order-1".into(), order).await?;

// Batch insert
let v = pq.extend([
    (100, "order-1".into(), order1),
    (200, "order-2".into(), order2),
]).await?;

// Update just the priority
pq.update_priority(&"order-1".into(), 150).await?;

// Update just the value
pq.update_value(&"order-1".into(), new_order).await?;

// Atomic compare-and-swap on value (priority is preserved)
let v = pq.compare_exchange_value(
    &"order-1".into(),
    order1,        // expected current value
    Some(updated), // new value
).await?;

// Compare-and-swap to remove: expected matches, new is None
let v = pq.compare_exchange_value(
    &"order-1".into(),
    updated,       // expected current value
    None,          // removes the entry
).await?;

// Remove a single entry
pq.remove(&"order-2".into()).await?;

// Remove all entries with priority below 50
pq.remove_range(..50u64).await?;

// Remove entries in a range
pq.remove_range(10..=20).await?;

// Clear everything
pq.clear().await?;

Range syntax

remove_range accepts any RangeBounds<P>, so all standard Rust range syntaxes work:

| Syntax | Meaning |
| ..cutoff | Priorities below cutoff |
| ..=cutoff | Priorities at or below cutoff |
| cutoff.. | Priorities at or above cutoff |
| lo..hi | Priorities in [lo, hi) |
| lo..=hi | Priorities in [lo, hi] |
| .. | All (equivalent to clear()) |

Compare-and-swap semantics

compare_exchange_value atomically checks the value of an existing entry and only applies the mutation if it matches the expected parameter. Unlike compare_exchange on Map and Register, this method operates only on the value — the entry’s priority is always preserved.

  • key: The key of the entry to operate on (must already exist).
  • expected: The expected current value (type V, not Option<V> — the key must exist for the exchange to succeed).
  • new: The replacement value. Some(v) updates the value in-place; None removes the entire entry.

If the key does not exist or its current value does not match expected, the operation is a no-op — it commits to the Raft log but does not change the queue.

Note: The entry’s priority is never changed by compare_exchange_value. To atomically update priorities, use update_priority instead.
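These semantics can be sketched over a plain `HashMap` standing in for the by_key index. `cas_value` is a hypothetical helper for illustration, not the library's internals (which also keep the by_priority index in sync):

```rust
use std::collections::HashMap;

// Sketch of compare_exchange_value semantics over the by_key index.
// Hypothetical helper; mosaik's state machine also updates by_priority.
fn cas_value<K, P, V>(
    by_key: &mut HashMap<K, (P, V)>,
    key: &K,
    expected: &V,
    new: Option<V>,
) -> bool
where
    K: std::hash::Hash + Eq,
    V: PartialEq,
{
    // Missing key or stale expectation: no-op (the command still commits).
    let matches = by_key.get(key).map_or(false, |(_, v)| v == expected);
    if !matches {
        return false;
    }
    match new {
        // Replace the value in place; the priority component is untouched.
        Some(v_new) => by_key.get_mut(key).unwrap().1 = v_new,
        // `None` removes the entire entry.
        None => {
            by_key.remove(key);
        }
    }
    true
}

fn main() {
    let mut q: HashMap<String, (u64, &str)> = HashMap::new();
    q.insert("order-1".to_string(), (100, "open"));

    // Matching expectation updates the value; priority is preserved.
    assert!(cas_value(&mut q, &"order-1".to_string(), &"open", Some("filled")));
    assert_eq!(q["order-1"], (100, "filled"));

    // Stale expectation is a no-op.
    assert!(!cas_value(&mut q, &"order-1".to_string(), &"open", None));

    // Matching expectation with None removes the entry.
    assert!(cas_value(&mut q, &"order-1".to_string(), &"filled", None));
    assert!(q.is_empty());
}
```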

Error handling

Same pattern as other collections — failed values are returned for retry:

match pq.insert(priority, key, value).await {
    Ok(version) => { /* committed */ }
    Err(Error::Offline((priority, key, value))) => {
        // Retry later
    }
    Err(Error::NetworkDown) => {
        // Permanent failure
    }
}

Status & observation

pq.when().online().await;

let v = pq.insert(100, "k".into(), val).await?;
pq.when().reaches(v).await;

pq.when().updated().await;
pq.when().offline().await;

Dual-index architecture

The DEPQ maintains two synchronized indexes:

by_key:      HashMap<K, (P, V)>     ← key lookups, membership tests
by_priority: OrdMap<P, HashMap<K, V>> ← min/max, range ops, ordered iteration

When a key is inserted or updated, both indexes are updated atomically within the state machine’s apply_batch. During snapshot sync, only the by_key index is serialized; the by_priority index is reconstructed on the receiving side during append.

Group identity

UniqueId::from("mosaik_collections_depq")
    .derive(store_id)
    .derive(type_name::<P>())
    .derive(type_name::<K>())
    .derive(type_name::<V>())

Writer/Reader Pattern

All mosaik collections share a common access-control pattern: every collection type is parameterized by a const-generic boolean (IS_WRITER) that determines whether the instance has write access.

                 ┌──────────────────┐
                 │  Collection<T>   │
                 │  const IS_WRITER │
                 └────────┬─────────┘
                          │
              ┌───────────┴───────────┐
              │                       │
       IS_WRITER = true        IS_WRITER = false
       ┌──────────────┐       ┌──────────────┐
       │    Writer    │       │    Reader    │
       │ read + write │       │  read-only   │
       │ normal leader│       │ deprioritized│
       │   priority   │       │   leader     │
       └──────────────┘       └──────────────┘

Type aliases

Each collection provides convenient type aliases:

| Collection | Writer type | Reader type |
| Map<K, V> | MapWriter<K, V> | MapReader<K, V> |
| Vec<T> | VecWriter<T> | VecReader<T> |
| Set<T> | SetWriter<T> | SetReader<T> |
| Register<T> | RegisterWriter<T> | RegisterReader<T> |
| Once<T> | OnceWriter<T> | OnceReader<T> |
| PriorityQueue<P, K, V> | PriorityQueueWriter<P, K, V> | PriorityQueueReader<P, K, V> |

Construction

Every collection offers the same set of constructors:

// Writer (default)
Collection::writer(&network, store_id)
Collection::writer_with_config(&network, store_id, sync_config)

// Convenience aliases for writer
Collection::new(&network, store_id)
Collection::new_with_config(&network, store_id, sync_config)

// Reader
Collection::reader(&network, store_id)
Collection::reader_with_config(&network, store_id, sync_config)

The StoreId and SyncConfig must match between writers and readers for them to join the same consensus group.

How it works

Internally, the const-generic boolean controls two things:

  1. Method availability — Write methods (insert, push_back, remove, etc.) are only implemented for IS_WRITER = true. This is enforced at compile time.

  2. Leadership priority — Readers return a ConsensusConfig with deprioritize_leadership(), which increases their election timeout. This makes it less likely for a reader to become the Raft leader, keeping leadership on writer nodes where write operations are handled directly rather than being forwarded.

// Inside every collection's StateMachine impl:
fn consensus_config(&self) -> Option<ConsensusConfig> {
    (!self.is_writer)
        .then(|| ConsensusConfig::default().deprioritize_leadership())
}

Shared read API

Both writers and readers have identical read access. The read methods are implemented on Collection<T, IS_WRITER> without constraining IS_WRITER:

// Works on both MapWriter and MapReader
impl<K: Key, V: Value, const IS_WRITER: bool> Map<K, V, IS_WRITER> {
    pub fn len(&self) -> usize { ... }
    pub fn get(&self, key: &K) -> Option<V> { ... }
    pub fn iter(&self) -> impl Iterator<Item = (K, V)> { ... }
    pub fn when(&self) -> &When { ... }
    pub fn version(&self) -> Version { ... }
}

Trait requirements

Collection element and key type parameters must satisfy the following trait bounds, each of which has a blanket implementation for any conforming type:

Value

Required for all element and value types:

pub trait Value:
    Clone + Debug + Serialize + DeserializeOwned
    + Hash + PartialEq + Eq + Send + Sync + 'static
{}

// Blanket impl — any conforming type is a Value
impl<T> Value for T where T: Clone + Debug + Serialize + ... {}

Key

Required for map keys, set elements, and priority queue keys:

pub trait Key:
    Clone + Serialize + DeserializeOwned
    + Hash + PartialEq + Eq + Send + Sync + 'static
{}

Note that Key does not require Debug (unlike Value).

OrderedKey

Required for priority queue priorities:

pub trait OrderedKey: Key + Ord {}
impl<T: Key + Ord> OrderedKey for T {}
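Thanks to the blanket impls, you never implement these traits by hand; deriving the constituent traits on your type is enough. A dependency-free sketch of the pattern (the serde bounds are omitted here so it stands alone; the real traits also require Serialize and DeserializeOwned):

```rust
use std::fmt::Debug;
use std::hash::Hash;

// Reduced stand-in for mosaik's Value trait (Serialize/DeserializeOwned
// omitted to keep the sketch dependency-free).
trait Value: Clone + Debug + Hash + PartialEq + Eq + Send + Sync + 'static {}
impl<T> Value for T where T: Clone + Debug + Hash + PartialEq + Eq + Send + Sync + 'static {}

// Deriving the constituent traits is all a user-defined type needs.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct Order {
    id: u64,
    amount: u64,
}

// Compile-time check that a type satisfies the bound.
fn assert_value<T: Value>() {}

fn main() {
    assert_value::<Order>();
    assert_value::<String>();
    assert_value::<u64>();
}
```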

Version

All write operations return Version, which wraps the Raft log Index where the mutation will be committed. Use it with the When API to synchronize:

let version = map.insert("key".into(), value).await?;
map.when().reaches(version).await;
// Now the insert is guaranteed to be committed

Version implements Deref<Target = Index>, PartialOrd, Ord, Display, and Copy.

When (collections)

The collections When is a thin wrapper around the groups When that exposes collection-relevant observers:

| Method | Description |
| online() | Resolves when the collection has joined and synced with the group |
| offline() | Resolves when the collection loses sync or leadership |
| updated() | Resolves when any new state version is committed |
| reaches(Version) | Resolves when committed state reaches at least the given version |

// Typical lifecycle pattern
loop {
    collection.when().online().await;
    println!("online, version = {}", collection.version());

    // ... do work ...

    collection.when().offline().await;
    println!("went offline, waiting to reconnect...");
}

Multiple writers

Multiple nodes can be writers for the same collection simultaneously. All writes are funneled through Raft consensus, so there is no conflict — every write is serialized in the log and applied in the same order on all nodes.

// Node A
let map = Map::<String, u64>::writer(&network, store_id);
map.insert("from-a".into(), 1).await?;

// Node B (same StoreId)
let map = Map::<String, u64>::writer(&network, store_id);
map.insert("from-b".into(), 2).await?;

// Both nodes see both entries after sync

Choosing writer vs. reader

| Use a Writer when… | Use a Reader when… |
| The node needs to modify the collection | The node only observes state |
| You want normal leadership election priority | You want to reduce leadership overhead |
| The node is in the “hot path” for writes | The node is a monitoring/dashboard node |

Raft Consensus

Mosaik’s groups subsystem implements a modified Raft consensus algorithm optimized for dynamic, self-organizing peer sets. This chapter covers the differences from standard Raft and explains the internal implementation.

Standard Raft recap

Raft organizes a cluster into a single leader and multiple followers. The leader accepts client commands, appends them to a replicated log, and only commits entries once a quorum (majority) of nodes has acknowledged them. If the leader fails, an election promotes a new one.

Mosaik’s modifications

1. Non-voting followers (Abstention)

In standard Raft, every follower participates in elections and log replication quorum counts. In mosaik, a follower can abstain from voting:

enum Vote {
    Granted,    // Standard yes vote
    Denied,     // Standard no vote
    Abstained,  // Mosaik-specific: "I'm too far behind to vote"
}

A follower abstains when it detects that it is lagging behind the leader’s log and cannot verify log consistency. Abstaining removes the node from the quorum denominator until it catches up. This prevents stale nodes from blocking progress while still allowing them to receive new entries and rejoin the quorum later.

2. No per-follower tracking on the leader

Standard Raft leaders maintain nextIndex[] and matchIndex[] arrays to track each follower’s log position. Mosaik’s leader does not maintain per-follower state. Instead:

  • Each AppendEntriesResponse includes the follower’s last_log_index.
  • The leader uses these responses to calculate commit progress dynamically.
  • This simplifies the leader and avoids stale state when group membership changes frequently.

3. Dynamic quorum

Because nodes can abstain, the quorum denominator changes at runtime:

effective_quorum = (voting_nodes / 2) + 1

Where voting_nodes = total_bonded_peers - abstaining_peers. This allows the cluster to make progress even when some nodes are syncing or offline, as long as a majority of the voting members agree.
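The formula is small enough to check directly. A sketch, with a hypothetical helper name:

```rust
// Dynamic quorum: abstaining peers are removed from the denominator.
// Hypothetical helper mirroring the formula above.
fn effective_quorum(total_bonded: usize, abstaining: usize) -> usize {
    let voting = total_bonded - abstaining;
    voting / 2 + 1
}

fn main() {
    // 5-node group, everyone voting: classic majority of 3.
    assert_eq!(effective_quorum(5, 0), 3);
    // Two nodes abstain while syncing: 3 voters, quorum drops to 2.
    assert_eq!(effective_quorum(5, 2), 2);
    // A single voter can make progress alone.
    assert_eq!(effective_quorum(1, 0), 1);
}
```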

4. Distributed catch-up (state sync)

When a follower falls too far behind to replay individual log entries, mosaik uses a state sync mechanism rather than the leader shipping log snapshots:

  1. The follower sends a RequestSnapshot to the leader.
  2. The leader wraps it as a command and replicates it through the log.
  3. All peers create a snapshot at the committed position of that command.
  4. The follower fetches snapshot data in batches from multiple peers in parallel, distributing the load.
  5. Once complete, the follower installs the snapshot and replays any buffered commands.

This is fundamentally different from standard Raft’s approach where only the leader sends snapshots.

5. Leadership deprioritization

Nodes can configure longer election timeouts to reduce the probability of becoming leader:

ConsensusConfig::default().deprioritize_leadership()

This is used by collection readers, which prefer to leave leadership to writer nodes.

6. Bootstrap delay

The first term (Term::zero()) adds an extra bootstrap_delay (default 3s) to the election timeout. This gives all nodes time to start, discover each other, and form bonds before the first election fires.

Roles and state transitions

          bootstrap_delay
               │
               ▼
     ┌────────────────┐     election timeout
     │    Follower     │─────────────────────┐
     │  (passive)      │                     │
     └────────┬───────┘                     │
              │ AppendEntries               │
              │ from leader                 ▼
              │                    ┌──────────────┐
              │                    │   Candidate   │
              │                    │ (requesting   │
              │                    │    votes)     │
              │                    └──────┬───────┘
              │                           │ majority
              │                           │ granted
              │                           ▼
              │                    ┌──────────────┐
              └────────────────────│    Leader     │
                 higher term       │ (active,      │
                 received          │  heartbeats)  │
                                   └──────────────┘

Each role has specific responsibilities:

| Role | Key actions |
| Follower | Respond to AppendEntries, vote in elections, forward commands to leader, detect leader failure via election timeout |
| Candidate | Increment term, vote for self, send RequestVote to all peers, transition to Leader on majority or back to Follower on higher term |
| Leader | Accept client commands, replicate log entries, send heartbeats, calculate dynamic quorum, commit entries, respond to forwarded queries |

Message types

| Message | Direction | Purpose |
| AppendEntries | Leader → Followers | Replicate log entries / heartbeat |
| AppendEntriesResponse | Follower → Leader | Acknowledge entries, report last log index, grant/deny/abstain |
| RequestVote | Candidate → All | Request vote for election |
| RequestVoteResponse | All → Candidate | Grant, deny, or abstain |
| Forward::Command | Follower → Leader | Forward client commands |
| Forward::CommandAck | Leader → Follower | Return assigned log indices |
| Forward::Query | Follower → Leader | Forward strong-consistency query |
| Forward::QueryResponse | Leader → Follower | Return query result and position |
| StateSync(...) | Peer ↔ Peer | State sync protocol messages |

Election timing

Elections are controlled by ConsensusConfig:

| Parameter | Default | Purpose |
| heartbeat_interval | 500ms | How often the leader sends heartbeats |
| heartbeat_jitter | 150ms | Random jitter subtracted from the heartbeat interval |
| election_timeout | 2s | Base timeout before a follower starts an election |
| election_timeout_jitter | 500ms | Random jitter added to the election timeout |
| bootstrap_delay | 3s | Extra delay for the very first election (term 0) |
| max_missed_heartbeats | 10 | Bond heartbeats missed before considering a peer dead |

The randomized timeouts ensure that in most cases only one node transitions to candidate at a time, avoiding split votes.
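Using the defaults above, the timeout derivation can be sketched as follows; `r` stands in for a uniform random sample in [0, 1) that the real code would draw from an RNG, and the function name is hypothetical:

```rust
use std::time::Duration;

// Sketch of the election timeout derivation from the defaults above.
// `r` is a stand-in for a uniform random sample in [0, 1).
fn election_timeout(
    base: Duration,
    jitter: Duration,
    r: f64,
    bootstrap: Option<Duration>,
) -> Duration {
    let t = base + jitter.mul_f64(r);
    // Term zero adds the bootstrap delay on top.
    match bootstrap {
        Some(d) => t + d,
        None => t,
    }
}

fn main() {
    let base = Duration::from_secs(2);
    let jitter = Duration::from_millis(500);

    // Steady-state timeout falls in [2s, 2.5s).
    let t = election_timeout(base, jitter, 0.4, None);
    assert_eq!(t, Duration::from_millis(2200));

    // The very first election (term 0) adds bootstrap_delay (3s).
    let t0 = election_timeout(base, jitter, 0.0, Some(Duration::from_secs(3)));
    assert_eq!(t0, Duration::from_secs(5));
}
```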

Command flow

Write path (leader)

Client ──execute()──► Leader
                        │
                        ├─ append to local log
                        ├─ send AppendEntries to followers
                        │
                        │◄── AppendEntriesResponse (majority)
                        │
                        ├─ advance commit index
                        ├─ apply to state machine
                        └─ return Result to client

Write path (follower)

Client ──execute()──► Follower
                         │
                         ├─ Forward::Command to leader
                         │
                         │◄── Forward::CommandAck (assigned index)
                         │
                         │ ... wait for local commit to reach index ...
                         │
                         └─ return Result to client

Read path

  • Weak consistency: Read directly from local state machine (any role).
  • Strong consistency: Forward query to leader, which reads from its always-up-to-date state machine and returns the result with commit position.

Internal types

The implementation is split across several modules:

| Module | Contents |
| raft/mod.rs | Raft<S, M> — top-level driver, delegates to current role |
| raft/role.rs | Role enum (Follower, Candidate, Leader), shared message handling |
| raft/shared.rs | Shared<S, M> — state shared across all roles (storage, state machine, config) |
| raft/leader.rs | Leader-specific logic: heartbeats, replication, dynamic quorum |
| raft/follower.rs | Follower-specific logic: elections, forwarding, catch-up |
| raft/candidate.rs | Candidate-specific logic: vote collection, timeout |
| raft/protocol.rs | Message type definitions |

Bonds

Bonds are mosaik’s mechanism for maintaining a fully-connected mesh of persistent, authenticated connections between all members of a group. Every pair of peers in a group maintains exactly one bond.

Lifecycle

A bond goes through three phases:

1. Establishment

When two peers discover they belong to the same group (via discovery events), one initiates a connection using the /mosaik/groups/1 ALPN protocol.

The handshake uses mutual proof-of-knowledge to verify both sides know the group key before exchanging any group-internal state:

Initiator                              Acceptor
    │                                      │
    ├── HandshakeStart ───────────────────►│
    │   · network_id                       │
    │   · group_id                         │
    │   · proof = blake3(session_secret    │
    │             ⊕ group_key)             │
    │   · known bonds list                 │
    │                                      │
    │                    verify proof       │
    │                                      │
    │◄─── HandshakeEnd ───────────────────┤
    │     · proof (acceptor's)             │
    │     · known bonds list               │
    │                                      │
    │   verify proof                       │
    │                                      │
    ▼   Bond established                   ▼

The proof is computed from the TLS session secret (unique per connection) combined with the group key. This ensures:

  • Both sides know the group key (authentication).
  • The proof cannot be replayed on a different connection (freshness).
  • No group secrets are transmitted in the clear.

If proof verification fails, the connection is dropped immediately.
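The proof construction can be sketched as below. Note the hedges: `hash` is a stand-in for blake3 (std's hasher keeps the sketch dependency-free), the 32-byte sizes and XOR mixing follow the diagram, and none of this is the actual wire format:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-in for blake3 in this sketch.
fn hash(data: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    data.hash(&mut h);
    h.finish()
}

// proof = hash(session_secret XOR group_key), per the handshake above.
fn proof(session_secret: &[u8; 32], group_key: &[u8; 32]) -> u64 {
    let mut mixed = [0u8; 32];
    for i in 0..32 {
        mixed[i] = session_secret[i] ^ group_key[i];
    }
    hash(&mixed)
}

fn main() {
    let group_key = [7u8; 32];
    let session_a = [1u8; 32]; // TLS session secret of connection A
    let session_b = [2u8; 32]; // a different connection

    // Both ends of the same connection derive the same proof.
    assert_eq!(proof(&session_a, &group_key), proof(&session_a, &group_key));

    // A proof captured on connection A does not verify on connection B
    // (freshness: the session secret differs per connection).
    assert_ne!(proof(&session_a, &group_key), proof(&session_b, &group_key));

    // Without the group key, the proof cannot be reproduced.
    assert_ne!(proof(&session_a, &group_key), proof(&session_a, &[0u8; 32]));
}
```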

2. Steady state (BondWorker)

Once established, a bond is managed by a BondWorker that:

  • Sends and receives heartbeats (Ping/Pong) on a configurable interval.
  • Relays Raft messages between peers.
  • Propagates peer entry updates when discovery information changes.
  • Announces new bonds (BondFormed) so peers learn about topology changes.
  • Handles graceful departure when a peer is shutting down.

3. Failure and reconnection

If heartbeats are missed beyond the configured threshold, the bond is considered failed and torn down. Both sides independently detect the failure. When the peer reappears (via discovery), a new bond is established from scratch with a fresh handshake.

BondMessage protocol

After the handshake, all communication over a bond uses the BondMessage enum:

| Variant                    | Direction | Purpose                                |
|----------------------------|-----------|----------------------------------------|
| Ping                       | A → B     | Heartbeat probe                        |
| Pong                       | B → A     | Heartbeat response                     |
| Departure                  | Either    | Graceful shutdown notification         |
| PeerEntryUpdate(PeerEntry) | Either    | Discovery information changed          |
| BondFormed(BondId, PeerId) | Either    | Announce new bond to populate topology |
| Raft(Bytes)                | Either    | Wrapped Raft protocol message          |

All messages are serialized with postcard and framed with LengthDelimitedCodec over a QUIC bidirectional stream (via Link<P>).

Heartbeat mechanism

The Heartbeat struct tracks liveness:

struct Heartbeat {
    tick: Interval,        // fires every `base - jitter` to `base`
    last_recv: Instant,    // last time we received a Ping or Pong
    missed: u64,           // how many consecutive ticks without a response
    max_missed: u64,       // threshold from ConsensusConfig (default: 10)
    alert: Notify,         // signals when threshold is exceeded
}

On each tick:

  1. Check if last_recv was recent enough.
  2. If not, increment missed.
  3. If missed ≥ max_missed, notify the alert future — the bond worker tears down the bond.

Receiving any message from the peer resets the counter:

heartbeat.reset(); // sets missed = 0 and refreshes last_recv

Timing uses jitter to prevent synchronized heartbeat storms. The actual interval for each tick is randomized between base - jitter and base.
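
The miss-counting and jitter logic can be sketched with std alone. The field names mirror the `Heartbeat` struct above, but timing is simplified to an explicit "received since last tick?" flag, and the random fraction is passed in rather than drawn from an RNG:

```rust
use std::time::Duration;

/// Minimal sketch of the miss-counting logic described above.
struct Heartbeat {
    missed: u64,
    max_missed: u64,
}

impl Heartbeat {
    fn new(max_missed: u64) -> Self {
        Self { missed: 0, max_missed }
    }

    /// Called on every tick; returns true when the bond should be torn down.
    fn on_tick(&mut self, received_since_last_tick: bool) -> bool {
        if received_since_last_tick {
            self.missed = 0; // any Ping/Pong resets the counter
        } else {
            self.missed += 1;
        }
        self.missed >= self.max_missed
    }
}

/// Jittered tick interval: somewhere in [base - jitter, base].
/// `fraction` stands in for a random value in [0, 1).
fn tick_interval(base: Duration, jitter: Duration, fraction: f64) -> Duration {
    base - jitter.mul_f64(fraction)
}

fn main() {
    let mut hb = Heartbeat::new(3);
    assert!(!hb.on_tick(false)); // missed = 1
    assert!(!hb.on_tick(true));  // reset to 0
    assert!(!hb.on_tick(false)); // missed = 1
    assert!(!hb.on_tick(false)); // missed = 2
    assert!(hb.on_tick(false));  // missed = 3 → tear down

    let i = tick_interval(Duration::from_millis(500), Duration::from_millis(150), 0.5);
    assert_eq!(i, Duration::from_millis(425));
}
```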

Bond identity

Each bond is identified by a BondId:

type BondId = UniqueId;

The BondId is derived deterministically from both peer identities and the group key, so both sides compute the same identifier independently.
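
One way to achieve order-independent derivation is to sort the two peer identities before hashing. The sketch below uses `DefaultHasher` and strings as stand-ins for blake3 and the real peer/key types:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch of a deterministic bond id. Sorting the two peer ids first
/// makes the result independent of who initiated the connection, so
/// both endpoints compute the same id without coordination.
fn bond_id(peer_a: &str, peer_b: &str, group_key: &str) -> u64 {
    let (lo, hi) = if peer_a <= peer_b { (peer_a, peer_b) } else { (peer_b, peer_a) };
    let mut h = DefaultHasher::new();
    (lo, hi, group_key).hash(&mut h);
    h.finish()
}

fn main() {
    // Both sides arrive at the same identifier independently.
    assert_eq!(bond_id("peer-1", "peer-2", "k"), bond_id("peer-2", "peer-1", "k"));
    // A different group key yields a different bond id.
    assert_ne!(bond_id("peer-1", "peer-2", "k"), bond_id("peer-1", "peer-2", "other"));
}
```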

Topology tracking

Every group maintains a Bonds collection (accessible via Group::bonds() or When::bonds()). This is an immutable set of all currently active bonds in the group, including bonds between other peers (learned via BondFormed messages).

This topology awareness lets each peer know the full mesh state, which is used for:

  • Dynamic quorum calculations in Raft.
  • Determining when enough peers are connected to start elections.
  • Providing the topology snapshot for state sync coordination.
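
For the quorum case, the usual Raft strict-majority formula applies. The sketch below shows only the formula; in mosaik the group size is derived dynamically from the live bonds topology:

```rust
/// Standard strict-majority quorum over the currently-known group size.
fn quorum(group_size: usize) -> usize {
    group_size / 2 + 1
}

fn main() {
    assert_eq!(quorum(3), 2); // 3-node group tolerates 1 failure
    assert_eq!(quorum(4), 3);
    assert_eq!(quorum(5), 3); // 5-node group tolerates 2 failures
}
```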

Connection handling

The Acceptor struct implements the ProtocolHandler trait for the /mosaik/groups/1 ALPN:

Incoming connection
       │
       ▼
  Wrap in Link<BondMessage>
       │
       ▼
  ensure_known_peer()
       │
       ▼
  wait_for_handshake()
  · receive HandshakeStart
  · verify proof
  · send HandshakeEnd
       │
       ▼
  ensure_same_network()
       │
       ▼
  Hand off to Group worker

If any step fails, the connection is dropped cleanly. The group worker then decides whether to accept the bond (it may already have one with that peer) and spawns a BondWorker if accepted.

State Synchronization

When a new peer joins a group or falls too far behind the leader’s log, it needs a snapshot of the current state rather than replaying potentially thousands of log entries. Mosaik implements a distributed state sync mechanism that spreads the load across all group members.

Why not leader-only snapshots?

In standard Raft, the leader ships snapshots to lagging followers. This has two problems:

  1. Leader bottleneck — the leader must serialize and transmit potentially large state to each lagging follower.
  2. Snapshot inconsistency — the snapshot must be taken at a consistent point, which may block the leader.

Mosaik solves both by having all peers participate in snapshot creation and distribution.

The six-step process

1. Follower             2. Leader              3. All peers
   detects lag              wraps request         create snapshot
   ────────────             as log entry          at committed
   sends                    ────────────          position
   RequestSnapshot          replicates it         ──────────────

4. Follower             5. Follower            6. Follower
   discovers which          fetches batches       installs snapshot
   peers have               from multiple         and resumes
   snapshots                peers in parallel     normal operation
   ──────────────           ─────────────────     ──────────────────

Step 1: Detect lag

A follower realizes it is behind when it receives an AppendEntries message referencing a log prefix it does not have. It sends a StateSync message to the leader requesting a snapshot.

Step 2: Replicate the request

The leader wraps the snapshot request as a special log entry and replicates it through the normal Raft consensus path. This ensures all peers see the request at the same log position.

Step 3: Create snapshots

When each peer commits the snapshot request entry, it creates a point-in-time snapshot of its state machine. Because all peers see the request at the same committed index, all snapshots are consistent — they represent the same logical state.

The im crate’s persistent data structures make snapshotting O(1) by sharing structure with the live state (copy-on-write).
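
The copy-on-write idea can be illustrated with std alone. The im crate applies it per tree node, which is what keeps snapshots O(1) even under later mutation; this whole-map sketch with `Arc` shows the principle, not the granularity:

```rust
use std::collections::HashMap;
use std::sync::Arc;

fn main() {
    let mut live: Arc<HashMap<String, u64>> = Arc::new(HashMap::new());
    Arc::make_mut(&mut live).insert("alice".into(), 1);

    // Taking a "snapshot" is just cloning the Arc — O(1), no data copied.
    let snapshot = Arc::clone(&live);

    // Mutating the live state after the snapshot copies on write
    // (here the whole map; in im only the divergent tree nodes).
    // The snapshot keeps observing the old state.
    Arc::make_mut(&mut live).insert("bob".into(), 2);

    assert_eq!(snapshot.len(), 1);
    assert_eq!(live.len(), 2);
}
```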

Step 4: Discover snapshot sources

The follower queries peers to find out which ones have a snapshot available for the requested position. Snapshots have a TTL (default 10 seconds), so they eventually expire if not fetched.

Step 5: Parallel fetching

The follower fetches snapshot data in batches from multiple peers simultaneously:

Follower ──batch request──► Peer A  (items 0..2000)
           batch request──► Peer B  (items 2000..4000)
           batch request──► Peer C  (items 4000..6000)
                   ...

Each batch contains up to fetch_batch_size (default 2000) items. By distributing fetches across peers, the load is balanced and the follower receives data faster.
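
A minimal sketch of such a batch plan, splitting the item range into half-open batches and assigning them to source peers round-robin (the real scheduler in mosaik may assign differently):

```rust
use std::ops::Range;

/// Split `total` snapshot items into batches of `batch_size` and
/// assign each batch to a source peer round-robin.
fn plan_batches(total: usize, batch_size: usize, peers: &[&str]) -> Vec<(String, Range<usize>)> {
    (0..total)
        .step_by(batch_size)
        .enumerate()
        .map(|(i, start)| {
            let end = (start + batch_size).min(total);
            (peers[i % peers.len()].to_string(), start..end)
        })
        .collect()
}

fn main() {
    let plan = plan_batches(5000, 2000, &["peer-a", "peer-b", "peer-c"]);
    assert_eq!(plan.len(), 3);
    assert_eq!(plan[0], ("peer-a".to_string(), 0..2000));
    assert_eq!(plan[1], ("peer-b".to_string(), 2000..4000));
    assert_eq!(plan[2], ("peer-c".to_string(), 4000..5000)); // final partial batch
}
```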

Step 6: Install and resume

Once all batches are received, the follower:

  1. Installs the snapshot as its new state machine state.
  2. Updates its committed cursor to the snapshot position.
  3. Replays any log entries received after the snapshot position.
  4. Resumes normal follower operation (including voting).
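
The install-and-replay sequence can be sketched with simplified stand-in types (a toy append-only state machine in place of mosaik's state machine traits):

```rust
/// Toy follower: `state` stands in for the state machine,
/// `committed` for the applied-log cursor.
struct Follower {
    state: Vec<String>,
    committed: u64,
}

impl Follower {
    fn install_snapshot(&mut self, snapshot: Vec<String>, position: u64, log: &[(u64, String)]) {
        // 1. Replace the state machine state wholesale.
        self.state = snapshot;
        // 2. Move the committed cursor to the snapshot position.
        self.committed = position;
        // 3. Replay entries received after the snapshot position.
        for (index, entry) in log.iter().filter(|(i, _)| *i > position) {
            self.state.push(entry.clone());
            self.committed = *index;
        }
        // 4. Normal follower operation resumes from here.
    }
}

fn main() {
    let mut f = Follower { state: vec![], committed: 0 };
    let log = vec![(101, "post-snap-1".to_string()), (102, "post-snap-2".to_string())];
    f.install_snapshot(vec!["snapshotted".to_string()], 100, &log);
    assert_eq!(f.committed, 102); // cursor past the replayed entries
    assert_eq!(f.state.len(), 3); // snapshot + two replayed entries
}
```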

Traits

SnapshotStateMachine

State machines that support sync must implement:

trait SnapshotStateMachine: StateMachine {
    type Snapshot: Snapshot;

    fn snapshot(&self) -> Self::Snapshot;
    fn install_snapshot(&mut self, items: Vec<SnapshotItem>);
}

  • snapshot() creates a serializable snapshot of the current state.
  • install_snapshot() replaces the state with data received from peers.

Snapshot

The snapshot itself is iterable:

trait Snapshot {
    fn len(&self) -> usize;
    fn into_items(self) -> impl Iterator<Item = SnapshotItem>;
}

Each collection type implements this — for example, MapSnapshot yields key-value pairs as serialized SnapshotItem bytes.

SnapshotItem

struct SnapshotItem {
    key: Bytes,
    value: Bytes,
}

The generic key/value format allows any collection type to participate in the same sync protocol.
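
A sketch of flattening a map-like snapshot into such generic items. Mosaik serializes keys and values with postcard; plain UTF-8 and big-endian bytes stand in here:

```rust
#[derive(Debug, PartialEq)]
struct SnapshotItem {
    key: Vec<u8>,
    value: Vec<u8>,
}

/// Flatten (key, value) entries into opaque byte items — the shape
/// any collection type can feed into the common sync protocol.
fn map_into_items(entries: &[(&str, u64)]) -> Vec<SnapshotItem> {
    entries
        .iter()
        .map(|(k, v)| SnapshotItem {
            key: k.as_bytes().to_vec(),
            value: v.to_be_bytes().to_vec(),
        })
        .collect()
}

fn main() {
    let items = map_into_items(&[("alice", 1), ("bob", 2)]);
    assert_eq!(items.len(), 2);
    assert_eq!(items[0].key, b"alice".to_vec());
}
```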

Configuration

State sync behavior is controlled by SyncConfig:

ParameterDefaultDescription
fetch_batch_size2000Maximum items per batch request
snapshot_ttl10sHow long a snapshot remains available
snapshot_request_timeout15sTimeout for requesting a snapshot from peers
fetch_timeout5sTimeout for each batch fetch operation

Tuning tips

  • Large state: Increase fetch_batch_size to reduce round trips, but watch memory usage.
  • Slow networks: Increase fetch_timeout and snapshot_request_timeout.
  • Fast churn: Increase snapshot_ttl if snapshots expire before followers can fetch them.

Log replay sync

For groups that use Storage (log persistence) instead of in-memory state, a LogReplaySync alternative exists. Instead of shipping snapshots, the joining peer replays log entries from a replay provider — another peer that streams its stored log entries.

This approach is simpler but only works when:

  • The log fits in available storage.
  • Replay is fast enough relative to new entries arriving.

The replay module provides ReplayProvider and ReplaySession types for this path.

Collections and state sync

All built-in collection types (Map, Vec, Set, Register, Once, PriorityQueue) implement SnapshotStateMachine. Their internal state machines produce snapshots:

MapStateMachine  ──snapshot()──► MapSnapshot (HashMap entries)
VecStateMachine  ──snapshot()──► VecSnapshot (indexed entries)
SetStateMachine  ──snapshot()──► SetSnapshot (set members)
RegStateMachine  ──snapshot()──► RegisterSnapshot (optional value)
OnceStateMachine ──snapshot()──► OnceSnapshot (optional value)
DepqStateMachine ──snapshot()──► PriorityQueueSnapshot (by_key + by_priority)

The dual-index structure of PriorityQueue is preserved across sync — both the by_key and by_priority indices are transmitted and reconstructed. Register and Once snapshots are trivial (at most one item), so sync completes in a single batch.

Wire Protocol

All peer-to-peer communication in mosaik flows over QUIC streams, using a consistent framing layer built on Link<P>. This chapter explains the protocol stack from the transport up to application messages.

Protocol stack

┌─────────────────────────────────┐
│  Application Messages           │  BondMessage, Datum, CatalogSync, ...
├─────────────────────────────────┤
│  postcard serialization         │  compact binary encoding (no_std, varint)
├─────────────────────────────────┤
│  LengthDelimitedCodec framing   │  4-byte big-endian length prefix
├─────────────────────────────────┤
│  QUIC bidirectional stream      │  SendStream + RecvStream
├─────────────────────────────────┤
│  iroh / quinn transport         │  QUIC with TLS 1.3, hole-punching
└─────────────────────────────────┘

Link<P>

The Link<P> type is the core abstraction for typed, bidirectional communication over a QUIC stream:

struct Link<P: Protocol> {
    connection: Connection,
    cancel: CancellationToken,
    writer: FramedWrite<SendStream, LengthDelimitedCodec>,
    reader: FramedRead<RecvStream, LengthDelimitedCodec>,
}

The Protocol trait

Every protocol declares its ALPN (Application-Layer Protocol Negotiation) identifier:

trait Protocol: Serialize + DeserializeOwned + Send + 'static {
    const ALPN: &'static [u8];
}

ALPN identifiers are exchanged during the TLS handshake, so peers agree on the protocol before any application data flows. Mosaik uses these ALPNs:

| ALPN                 | Protocol type   | Subsystem           |
|----------------------|-----------------|---------------------|
| /mosaik/announce     | AnnounceMessage | Discovery (gossip)  |
| /mosaik/catalog-sync | CatalogSync     | Discovery (catalog) |
| /mosaik/streams/1.0  | Datum impl      | Streams             |
| /mosaik/groups/1     | BondMessage     | Groups (bonds)      |

Links are created by either connecting to a peer or accepting an incoming connection:

// Outgoing
let link = Link::<BondMessage>::connect(&endpoint, node_addr).await?;

// Incoming (in a ProtocolHandler)
let link = Link::<BondMessage>::accept(connecting).await?;

Sending and receiving

// Send a message (serialized with postcard, length-prefixed)
link.send(BondMessage::Ping).await?;

// Receive a message (read length prefix, deserialize with postcard)
let msg: BondMessage = link.recv().await?;

Under the hood:

  1. send() serializes the message with postcard::to_allocvec().
  2. The resulting bytes are written through FramedWrite which prepends a 4-byte big-endian length prefix.
  3. recv() reads the length prefix from FramedRead, reads exactly that many bytes, and deserializes with postcard::from_bytes().

For concurrent send/receive, a link can be split:

let (writer, reader) = link.split();

// In one task:
writer.send(msg).await?;

// In another task:
let msg = reader.recv().await?;

// Rejoin if needed:
let link = Link::join(writer, reader);

Cancellation

Every link carries a CancellationToken. When cancelled, both send and receive operations return immediately. This is used for graceful shutdown:

link.cancel(); // signals both sides to stop

Wire format

postcard encoding

Postcard is a #[no_std]-compatible binary serialization format based on variable-length integers (varints). It produces very compact output:

  • u8 → 1 byte
  • Small u32 → 1 byte (varint)
  • Enum variant → 1 byte discriminant + payload
  • Strings → varint length + UTF-8 bytes
  • Vec<T> → varint length + elements

This keeps message sizes minimal, which matters for high-frequency heartbeats and Raft messages.
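
For intuition, here is a minimal LEB128-style varint encoder of the family postcard uses for integers and lengths (the exact encoding postcard employs may differ in detail):

```rust
/// Encode an integer as a varint: 7 payload bits per byte,
/// high bit set on every byte except the last.
fn varint(mut n: u64) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        let byte = (n & 0x7f) as u8;
        n >>= 7;
        if n == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80); // more bytes follow
    }
    out
}

fn main() {
    assert_eq!(varint(5).len(), 1);            // small values → 1 byte
    assert_eq!(varint(300), vec![0xac, 0x02]); // 300 → two bytes
    assert_eq!(varint(u64::MAX).len(), 10);    // worst case for u64
}
```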

Framing

Each message on the wire looks like:

┌──────────────┬──────────────────────────┐
│ Length (4B)  │ postcard-encoded payload │
│ big-endian   │                          │
└──────────────┴──────────────────────────┘

The LengthDelimitedCodec from the tokio-util crate handles this automatically. It supports messages up to 2³² - 1 bytes (4 GiB), though in practice mosaik messages are typically under a few kilobytes.
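
What the codec does on the wire can be reproduced with std I/O alone: write a 4-byte big-endian length prefix, then the payload bytes.

```rust
use std::io::{Cursor, Read, Write};

fn write_frame(out: &mut impl Write, payload: &[u8]) -> std::io::Result<()> {
    // 4-byte big-endian length prefix, then the payload itself.
    out.write_all(&(payload.len() as u32).to_be_bytes())?;
    out.write_all(payload)
}

fn read_frame(input: &mut impl Read) -> std::io::Result<Vec<u8>> {
    let mut len = [0u8; 4];
    input.read_exact(&mut len)?;
    let mut payload = vec![0u8; u32::from_be_bytes(len) as usize];
    input.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() -> std::io::Result<()> {
    let mut wire = Vec::new();
    write_frame(&mut wire, b"ping")?;
    assert_eq!(&wire[..4], &[0, 0, 0, 4]); // the length prefix
    let payload = read_frame(&mut Cursor::new(&wire))?;
    assert_eq!(payload, b"ping".to_vec());
    Ok(())
}
```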

QUIC transport

Mosaik uses iroh (built on quinn) for QUIC transport. Key features:

| Feature              | Benefit                                                          |
|----------------------|------------------------------------------------------------------|
| TLS 1.3              | All connections encrypted, session secrets used for bond proofs |
| Multiplexed streams  | Multiple logical channels over one connection                   |
| NAT traversal        | Built-in hole-punching and relay fallback                       |
| Connection migration | Connections survive IP changes                                  |
| mDNS discovery       | Automatic peer discovery on local networks                      |

Bidirectional streams

Each Link<P> uses a single QUIC bidirectional stream. This means:

  • One stream per bond connection.
  • One stream per catalog sync session.
  • One stream per producer-consumer pair.

QUIC’s multiplexing means these streams don’t interfere with each other even when sharing the same underlying UDP connection.

Error handling

Link operations return std::io::Error. Common failure modes:

| Error                 | Cause                                   | Recovery                        |
|-----------------------|-----------------------------------------|---------------------------------|
| Connection closed     | Peer shut down or network failure       | Reconnect via discovery         |
| Deserialization error | Protocol version mismatch or corruption | Drop connection                 |
| Timeout               | Peer unresponsive                       | Heartbeat detection → reconnect |
| Cancelled             | Local shutdown                          | Graceful cleanup                |

Deterministic Hashing

In a distributed replicated system, all peers must arrive at the exact same state after applying the same sequence of operations. Mosaik’s collections use deterministic hashing to guarantee consistent behavior across nodes.

The problem

Rust’s default HashMap and HashSet use RandomState — a hasher seeded with random data at process startup. This means:

  • Iteration order differs between processes.
  • Two nodes applying the same inserts will have different internal layouts.
  • Snapshots of hash-based structures would differ even with identical contents.

For replicated state machines, this is unacceptable.

The solution

Mosaik’s Map and Set collections use a zero-seeded deterministic hasher:

type DeterministicHasher = BuildHasherDefault<DefaultHasher>;

BuildHasherDefault<DefaultHasher> constructs a DefaultHasher (SipHash) with a fixed zero seed on every call. This ensures:

  1. Same input → same hash across all nodes and restarts.
  2. Same insertion order → same internal layout in the hash table.
  3. Snapshots are byte-identical when state is identical.
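
This is the exact strategy from std, so it can be demonstrated directly: two independently constructed `BuildHasherDefault<DefaultHasher>` instances (standing in for "two processes") hash the same input to the same value.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};

type DeterministicHasher = BuildHasherDefault<DefaultHasher>;

fn hash_with(build: &DeterministicHasher, value: &str) -> u64 {
    let mut h = build.build_hasher(); // zero-seeded SipHash every time
    value.hash(&mut h);
    h.finish()
}

fn main() {
    let a = DeterministicHasher::default();
    let b = DeterministicHasher::default(); // "another process"
    assert_eq!(hash_with(&a, "alice"), hash_with(&b, "alice"));
    // RandomState, by contrast, would differ between processes.
}
```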

Where it’s used

Collections

All hash-based collections use the deterministic hasher internally:

  • Map<K, V> uses im::HashMap<K, V, DeterministicHasher>.
  • Set<V> uses im::HashSet<V, DeterministicHasher>.

The im crate’s persistent hash structures accept a custom hasher parameter, which mosaik sets to the zero-seeded variant.

Ordered collections

Vec and PriorityQueue do not use hashing for their primary index:

  • Vec uses im::Vector (a balanced tree indexed by position).
  • PriorityQueue uses:
    • im::HashMap (deterministic hasher) for key → value+priority lookup.
    • im::OrdMap for priority-ordered access (requires Ord, not hashing).

Identity derivation

Beyond collection hashing, determinism matters for identity:

UniqueId

Group and store identities are derived deterministically from their inputs:

// GroupId is derived from key + network
let group_id = UniqueId::new(&key, &network_id);

// StoreId is derived from group + type signature
let store_id = StoreId::new(&group_id, &type_signature);

This uses blake3 hashing, which is inherently deterministic.

Type signatures

Collection state machines compute a signature from their Rust type names:

fn signature() -> Digest {
    Digest::from(
        std::any::type_name::<MapStateMachine<K, V>>()
    )
}

This ensures that two collections are only considered the same store if they have identical key and value types. A Map<String, u64> and a Map<String, i64> will have different StoreId values and will never attempt to sync with each other.
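
The mechanism is observable with `std::any::type_name` directly; the `MapStateMachine` below is a stand-in struct used only to show that the generic parameters appear in the name:

```rust
use std::any::type_name;
use std::marker::PhantomData;

// Stand-in for mosaik's MapStateMachine.
struct MapStateMachine<K, V>(PhantomData<(K, V)>);

fn signature<K, V>() -> &'static str {
    type_name::<MapStateMachine<K, V>>()
}

fn main() {
    // Different value types → different signatures → different StoreIds.
    assert_ne!(signature::<String, u64>(), signature::<String, i64>());
    // Identical types → identical signature, on every node.
    assert_eq!(signature::<String, u64>(), signature::<String, u64>());
}
```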

Snapshot consistency

Deterministic hashing is critical for snapshot-based state sync:

Node A state: { "alice" → 1, "bob" → 2 }
Node B state: { "alice" → 1, "bob" → 2 }

With deterministic hashing:
  Node A snapshot bytes == Node B snapshot bytes  ✓

With random hashing:
  Node A snapshot bytes != Node B snapshot bytes  ✗
  (different internal bucket layout)

When a joining peer fetches snapshot batches from multiple source peers, the items must be compatible. Deterministic hashing ensures all sources produce the same serialized representation for identical logical state.

The im crate

Mosaik uses the im crate for persistent (copy-on-write) data structures. Key properties:

| Property           | Benefit                                                                  |
|--------------------|--------------------------------------------------------------------------|
| Structural sharing | O(1) snapshot via clone() — only divergent nodes are copied             |
| Custom hasher      | Accepts BuildHasherDefault<DefaultHasher> for determinism               |
| Thread-safe clones | Arc-based sharing, safe to snapshot from one task and iterate in another |
| Balanced trees     | OrdMap and Vector use RRB trees with O(log n) operations                |

The O(1) cloning is especially important for state sync — creating a snapshot does not block the state machine from processing new commands.

Trait requirements

The deterministic hashing strategy imposes trait bounds on collection keys and values:

| Trait                        | Required by              | Purpose                                       |
|------------------------------|--------------------------|-----------------------------------------------|
| Hash                         | Map keys, Set values     | Deterministic bucket placement                |
| Eq                           | Map keys, Set values     | Equality comparison for collision resolution  |
| Clone                        | All keys and values      | Structural sharing in im data structures      |
| Serialize + DeserializeOwned | All keys and values      | Snapshot and replication encoding             |
| Send + Sync + 'static        | All keys and values      | Cross-task sharing                            |
| Ord                          | PriorityQueue priorities | Ordered access in OrdMap                      |

These are codified as blanket trait aliases:

// Satisfied by types that are Hash + Eq + Clone + Serialize + DeserializeOwned + Send + Sync + 'static
trait Key {}

// Adds Ord to Key requirements
trait OrderedKey {}

// Clone + Serialize + DeserializeOwned + Send + Sync + 'static (no Hash/Eq)
trait Value {}

Configuration Reference

This chapter lists every configuration struct in mosaik, its fields, default values, and how the pieces fit together.

NetworkBuilder (top-level)

The entry point for creating a mosaik network. All nested configs are accessible through fluent builder methods.

use mosaik::Network;

let network = Network::builder()
    .network_id("my-network")
    .mdns_discovery(true)
    .discovery(|d| d.events_backlog(200))
    .streams(|s| s.with_backoff(my_backoff))
    .groups(|g| g.handshake_timeout(Duration::from_secs(5)))
    .build()
    .await?;

| Field          | Type                   | Default                | Description                                  |
|----------------|------------------------|------------------------|----------------------------------------------|
| network_id     | NetworkId              | required               | Unique identifier for this network           |
| relay_mode     | iroh::RelayMode        | RelayMode::Default     | Relay server mode for NAT/firewall traversal |
| mdns_discovery | bool                   | false                  | Enable mDNS local network peer discovery     |
| addresses      | BTreeSet<SocketAddr>   | empty (all interfaces) | Local bind addresses                         |
| secret_key     | SecretKey              | random                 | Ed25519 key for peer identity                |
| discovery      | DiscoveryConfigBuilder | see below              | Nested discovery config                      |
| streams        | StreamsConfigBuilder   | see below              | Nested streams config                        |
| groups         | GroupsConfigBuilder    | see below              | Nested groups config                         |

Note: secret_key determines the PeerId. If omitted, a random key is generated on each run, giving the node a new identity every time. Specifying a fixed key is only recommended for bootstrap nodes that need a stable, well-known peer ID across restarts.


discovery::Config

Controls peer discovery, gossip, and catalog maintenance.

| Field                     | Type              | Default      | Description                               |
|---------------------------|-------------------|--------------|--------------------------------------------|
| events_backlog            | usize             | 100          | Past events retained in event watchers    |
| bootstrap_peers           | Vec<EndpointAddr> | empty        | Initial peers to connect to on startup    |
| tags                      | Vec<Tag>          | empty        | Tags advertised in local PeerEntry        |
| purge_after               | Duration          | 300s (5 min) | Time before stale peer entries are purged |
| max_time_drift            | Duration          | 10s          | Maximum acceptable timestamp drift        |
| announce_interval         | Duration          | 15s          | Interval between presence announcements   |
| announce_jitter           | f32               | 0.5          | Max jitter factor on announce interval    |
| graceful_departure_window | Duration          | 500ms        | Wait for departure gossip to propagate    |

Builder methods with_bootstrap(peers) and with_tags(tags) accept either a single item or an iterator (via IntoIterOrSingle):

Network::builder()
    .discovery(|d| d
        .with_bootstrap(peer_addr)          // single peer
        .with_tags(["validator", "relay"])   // multiple tags
        .purge_after(Duration::from_secs(600))
    )

streams::Config

Controls stream consumer reconnection behavior.

| Field   | Type           | Default                | Description                                  |
|---------|----------------|------------------------|----------------------------------------------|
| backoff | BackoffFactory | Exponential, 5 min max | Retry policy for consumer stream connections |

Custom backoff example:

use mosaik::streams::backoff::ExponentialBackoff;

Network::builder()
    .streams(|s| s.with_backoff(ExponentialBackoff {
        max_elapsed_time: Some(Duration::from_secs(120)),
        ..Default::default()
    }))

groups::Config

Controls bond establishment.

| Field             | Type     | Default | Description                                  |
|-------------------|----------|---------|----------------------------------------------|
| handshake_timeout | Duration | 2s      | Timeout for bond handshake with remote peers |

ConsensusConfig

Consensus parameters that must be identical across all members of a group. These values are hashed into the GroupId, so any mismatch creates a different group.

| Field                   | Type     | Default | Description                                            |
|-------------------------|----------|---------|--------------------------------------------------------|
| heartbeat_interval      | Duration | 500ms   | Bond heartbeat interval                                |
| heartbeat_jitter        | Duration | 150ms   | Max heartbeat jitter                                   |
| max_missed_heartbeats   | u32      | 10      | Missed heartbeats before bond is considered dead       |
| election_timeout        | Duration | 2s      | Raft election timeout (must exceed heartbeat interval) |
| election_timeout_jitter | Duration | 500ms   | Election timeout randomization                         |
| bootstrap_delay         | Duration | 3s      | Extra delay for the first election (term 0)            |
| forward_timeout         | Duration | 2s      | Timeout for forwarding commands to leader              |
| query_timeout           | Duration | 2s      | Timeout for leader to respond to queries               |
use mosaik::groups::ConsensusConfig;

let config = ConsensusConfig::builder()
    .heartbeat_interval(Duration::from_millis(250))
    .election_timeout(Duration::from_secs(1))
    .build()
    .unwrap();

Leadership deprioritization

let config = ConsensusConfig::default().deprioritize_leadership();

This multiplies the election timeout by a factor, making this node less likely to become leader. Used by collection readers.


SyncConfig (collections)

Controls snapshot-based state synchronization for collections.

| Field                    | Type     | Default | Description                           |
|--------------------------|----------|---------|---------------------------------------|
| fetch_batch_size         | usize    | 2000    | Max items per batch request           |
| snapshot_ttl             | Duration | 10s     | How long a snapshot remains available |
| snapshot_request_timeout | Duration | 15s     | Timeout for requesting a snapshot     |
| fetch_timeout            | Duration | 5s      | Timeout for each batch fetch          |

Configuration hierarchy

NetworkBuilder
├── network_id          (identity)
├── secret_key          (identity)
├── relay_mode          (transport)
├── mdns_discovery      (transport)
├── addresses           (transport)
│
├── discovery::Config
│   ├── bootstrap_peers
│   ├── tags
│   ├── events_backlog
│   ├── purge_after
│   ├── max_time_drift
│   ├── announce_interval
│   └── announce_jitter
│
├── streams::Config
│   └── backoff
│
└── groups::Config
    └── handshake_timeout

Per-group:
ConsensusConfig         (consensus timing, hashed into GroupId)

Per-collection:
SyncConfig              (state sync tuning)

Environment influence

Configuration structs are pure Rust — there is no automatic environment variable parsing. However, test utilities honor:

| Variable          | Used by            | Effect                                      |
|-------------------|--------------------|---------------------------------------------|
| TEST_TRACE        | Test tracing setup | Controls log level (debug/trace/info/etc.)  |
| TEST_TRACE_UNMUTE | Test tracing setup | Set to 1 to show all log output             |
| TIME_FACTOR       | Test time helpers  | Float multiplier for all test durations     |

Error Handling

Mosaik uses typed error enums specific to each subsystem, plus a close-reason system for QUIC connection-level codes. This chapter catalogs every public error type.

Network errors

network::Error — returned by NetworkBuilder::build():

| Variant                             | Description                                 |
|-------------------------------------|---------------------------------------------|
| MissingNetworkId                    | No network_id was set on the builder        |
| Bind(BindError)                     | Failed to bind the QUIC endpoint            |
| InvalidAddress(InvalidSocketAddr)   | An address in addresses is invalid          |
| DiscoveryConfig(ConfigBuilderError) | Discovery config builder failed validation  |
| StreamsConfig(ConfigBuilderError)   | Streams config builder failed validation    |
| GroupsConfig(ConfigBuilderError)    | Groups config builder failed validation     |

Discovery errors

discovery::Error:

| Variant                                            | Description                                |
|----------------------------------------------------|--------------------------------------------|
| InvalidSecretKey(PeerId, PeerId)                   | Secret key does not match expected PeerId  |
| DifferentNetwork { local_network, remote_network } | Remote peer is on a different network      |
| InvalidSignature                                   | Peer entry signature verification failed   |
| GossipJoin(ApiError)                               | Failed to join gossip topic (iroh_gossip)  |
| PeerIdChanged(PeerId, PeerId)                      | Attempted to change the local peer's ID    |
| Link(LinkError)                                    | Transport-level link error                 |
| Other(Box<dyn Error>)                              | Generic boxed error                        |
| Cancelled                                          | Operation was cancelled                    |

Command errors

groups::CommandError<M> — returned by execute(), execute_many(), feed(), feed_many():

| Variant                  | Recoverable? | Description                                             |
|--------------------------|--------------|---------------------------------------------------------|
| Offline(Vec<M::Command>) | Yes          | Node is offline; carries the unsent commands for retry  |
| NoCommands               | No           | Empty command batch was submitted                       |
| GroupTerminated          | No           | The group has been permanently closed                   |

Recovering from Offline

match group.execute(MyCommand::Increment).await {
    Ok(result) => println!("committed: {result:?}"),
    Err(CommandError::Offline(commands)) => {
        // Wait for the group to come online, then retry
        group.when().online().await;
        for cmd in commands {
            group.execute(cmd).await?;
        }
    }
    Err(CommandError::GroupTerminated) => {
        // Permanent failure — stop trying
    }
    Err(CommandError::NoCommands) => unreachable!(),
}

Query errors

groups::QueryError<M> — returned by query():

| Variant           | Recoverable? | Description                                |
|-------------------|--------------|--------------------------------------------|
| Offline(M::Query) | Yes          | Node is offline; carries the unsent query  |
| GroupTerminated   | No           | The group has been permanently closed      |

Collection errors

collections::Error<T> — returned by collection write operations:

| Variant     | Recoverable? | Description                                    |
|-------------|--------------|------------------------------------------------|
| Offline(T)  | Yes          | Node is offline; carries the value for retry   |
| NetworkDown | No           | Network has shut down                          |

The generic T contains the value that failed to be written, enabling retry without re-creating the data:

match map.insert("key".into(), 42).await {
    Ok(prev) => println!("previous: {prev:?}"),
    Err(collections::Error::Offline(value)) => {
        // value == 42, retry later
    }
    Err(collections::Error::NetworkDown) => {
        // permanent failure
    }
}

Producer errors

streams::producer::Error<D> — returned by Sink::send() and try_send():

| Variant           | Description                              |
|-------------------|------------------------------------------|
| Closed(Option<D>) | Producer has been closed                 |
| Full(D)           | Internal buffer is full (back-pressure)  |
| Offline(D)        | No active consumers connected            |

All variants carry the datum back when possible, enabling retry.

Close reasons (QUIC application codes)

Mosaik uses typed close reasons for QUIC ApplicationClose codes. These are generated with the make_close_reason! macro and appear in connection close frames.

Reserved ranges

| Range | Owner               |
|-------|---------------------|
| 0–199 | mosaik internal     |
| 200+  | Application-defined |

Built-in close reasons

| Name              | Code | Description                          |
|-------------------|------|--------------------------------------|
| Success           | 200  | Protocol completed successfully      |
| GracefulShutdown  | 204  | Graceful shutdown                    |
| InvalidAlpn       | 100  | Wrong ALPN protocol                  |
| DifferentNetwork  | 101  | Peer on a different network          |
| Cancelled         | 102  | Operation cancelled                  |
| UnexpectedClose   | 103  | Unexpected connection close          |
| ProtocolViolation | 400  | Protocol message violation           |
| UnknownPeer       | 401  | Peer not found in discovery catalog  |

Group close reasons

| Name             | Code   | Description                   |
|------------------|--------|-------------------------------|
| InvalidHandshake | 30,400 | Handshake decode error        |
| GroupNotFound    | 30,404 | Unknown group ID              |
| InvalidProof     | 30,405 | Invalid authentication proof  |
| Timeout          | 30,408 | Peer response timeout         |
| AlreadyBonded    | 30,429 | Duplicate bond between peers  |

Defining custom close reasons

use mosaik::network::make_close_reason;

// Code must be >= 200
make_close_reason!(MyAppError, 500, "Application-specific error");

Error design patterns

Temporary vs. permanent

Mosaik errors follow a consistent pattern:

  • Temporary errors carry the original data back (e.g., Offline(commands), Full(datum)) so you can retry without data loss.
  • Permanent errors (e.g., GroupTerminated, NetworkDown) indicate the resource is gone and retrying is pointless.

Matching on recoverability

use mosaik::collections::Error;

loop {
    match map.insert("key".into(), value.clone()).await {
        Ok(_) => break,
        Err(Error::Offline(_)) => {
            map.when().online().await;
            continue;
        }
        Err(Error::NetworkDown) => {
            panic!("network is gone");
        }
    }
}

The CloseReason trait

All close reason types implement:

trait CloseReason:
    Error + Into<ApplicationClose> + PartialEq<ApplicationClose>
    + Clone + Send + Sync + 'static
{}

This lets you match connection close frames against typed reasons:

if close_frame == InvalidProof {
    // handle proof failure
}

Primitives Reference

This chapter documents the foundational types and utilities that the rest of mosaik builds on. These live in the primitives module (public types) and internal helpers.

Digest / UniqueId / Tag

The core identifier type — a 32-byte blake3 hash.

use mosaik::Digest;  // re-exported at crate root

Digest, UniqueId, and Tag are all the same type, aliased for clarity in different contexts:

| Alias     | Used for                                   |
|-----------|--------------------------------------------|
| Digest    | General-purpose 32-byte identifier         |
| UniqueId  | Derived identity (groups, stores, bonds)   |
| Tag       | Discovery tags for peer classification     |
| NetworkId | Network identifier (= Digest)              |
| GroupId   | Group identifier (= Digest)                |
| StreamId  | Stream identifier (= Digest)               |
| StoreId   | Collection store identifier (= UniqueId)   |

Construction

// From a string (hashes the string with blake3)
let tag = Digest::from("validator");

// From a hex string (64 chars = direct decode, otherwise hashed)
let id = Digest::from("a1b2c3d4...");

// From numeric types (little-endian encoded, then hashed)
let id = Digest::from_u64(42);

// From multiple parts (concatenated, then hashed)
let id = Digest::from_parts(&["prefix", "suffix"]);

// Derive a child ID deterministically
let child = parent.derive("child-name");

// Random
let id = Digest::random();

// Zero (sentinel value)
let zero = Digest::zero();

Compile-time construction

use mosaik::unique_id;

const MY_ID: UniqueId = unique_id!("a1b2c3d4e5f6...");  // 64-char hex literal

Display

  • Display — short hex (first 5 bytes): a1b2c3d4e5
  • Debug — full 64-character hex string
  • Short<T> wrapper — always shows first 5 bytes
  • Abbreviated<const LEN, T> — shows first..last if longer than LEN

Serialization

  • Human-readable formats (JSON, TOML): hex string
  • Binary formats (postcard): raw 32 bytes

PeerId

type PeerId = iroh::EndpointId;

A peer’s public identity, derived from their Ed25519 public key. This is an iroh type, not a mosaik Digest. It is used in discovery, bonds, and connection authentication.

SecretKey

Re-exported from iroh. An Ed25519 secret key that determines a node’s PeerId. If not provided to NetworkBuilder, a random key is generated automatically — this is the recommended default for regular nodes.

Specifying a fixed secret key is only recommended for bootstrap nodes that need a stable, well-known peer ID across restarts:

use mosaik::SecretKey;

// Auto-generated (default) — new identity each run
let network = Network::builder()
    .network_id("my-network")
    .build().await?;

// Fixed key — stable identity, recommended only for bootstrap nodes
let key = SecretKey::generate(&mut rand::rng());
let bootstrap = Network::builder()
    .network_id("my-network")
    .secret_key(key)
    .build().await?;

Wire encoding

All network messages use postcard — a compact, #[no_std]-compatible binary format using variable-length integers.

| Function                                                        | Description                             |
|-----------------------------------------------------------------|-----------------------------------------|
| serialize<T: Serialize>(&T) -> Bytes                            | Serialize to bytes (panics on failure)  |
| deserialize<T: DeserializeOwned>(impl AsRef<[u8]>) -> Result<T> | Deserialize from bytes                  |

These are internal crate functions. Application code interacts with them indirectly through Link<P> send/receive and collection operations.


Bytes / BytesMut

Re-exported from the bytes crate. Used throughout mosaik for zero-copy byte buffers:

use mosaik::Bytes;

let data: Bytes = serialize(&my_message);

BackoffFactory

A factory type for creating retry backoff strategies:

type BackoffFactory = Arc<
    dyn Fn() -> Box<dyn Backoff + Send + Sync + 'static>
        + Send + Sync + 'static
>;

Used in streams::Config to configure consumer reconnection. The backoff crate is re-exported at mosaik::streams::backoff.


Formatting utilities

Internal helpers for consistent debug output:

Type | Description
Pretty<T> | Pass-through wrapper (for trait integration)
Short<T> | Display first 5 bytes as hex
Abbreviated<const LEN, T> | Show first..last hex if longer than LEN bytes
Redacted<T> | Always prints <redacted>
FmtIter<W, I> | Format iterator elements comma-separated in brackets

IntoIterOrSingle<T>

An ergonomic trait that lets API methods accept either a single item or an iterator:

// Both work:
discovery.with_tags("validator");            // single tag
discovery.with_tags(["validator", "relay"]); // multiple tags

This is implemented via two blanket impls using Variant<0> (single item via Into<T>) and Variant<1> (iterator of Into<T>).


Internal async primitives

These are pub(crate) and not part of the public API, but understanding them helps when reading mosaik’s source code.

UnboundedChannel<T>

A wrapper around tokio::sync::mpsc::unbounded_channel that keeps both the sender and receiver in one struct:

let channel = UnboundedChannel::new();
channel.send(42);
let val = channel.recv().await;

Method | Description
sender() | Get &UnboundedSender<T>
receiver() | Get &mut UnboundedReceiver<T>
send(T) | Send (errors silently ignored)
recv() | Async receive
poll_recv(cx) | Poll-based receive
poll_recv_many(cx, buf, limit) | Batch poll receive
is_empty() / len() | Queue inspection

AsyncWorkQueue<T>

A FuturesUnordered wrapper with a permanently-pending sentinel future so that polling never returns None:

let queue = AsyncWorkQueue::new();
queue.enqueue(async { do_work().await });

// Poll via Stream trait — never completes while empty
while let Some(result) = queue.next().await {
    handle(result);
}

BoxPinFut<T>

Type alias for boxed, pinned, send futures:

type BoxPinFut<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

The InternalFutureExt trait adds a .pin() method to any Future + Send + 'static for ergonomic boxing.

Testing Guide

This chapter covers how to test mosaik applications and how to work with mosaik’s own test infrastructure when contributing.

Test setup

Dependencies

Add these to your [dev-dependencies]:

[dev-dependencies]
mosaik = { version = "0.2" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

Basic test structure

Every mosaik test follows the same pattern:

  1. Create networks with in-process endpoints.
  2. Connect them via sync_with (or mDNS for local tests).
  3. Wait for discovery to propagate.
  4. Exercise the API.
  5. Assert state.

#[tokio::test]
async fn two_nodes_discover_each_other() {
    let net_a = Network::builder()
        .network_id("test")
        .build()
        .await
        .unwrap();

    let net_b = Network::builder()
        .network_id("test")
        .build()
        .await
        .unwrap();

    // Connect the two endpoints directly
    net_a.sync_with(net_b.endpoint_addr()).await.unwrap();

    // Wait for mutual discovery
    let event = net_a.discovery().events().recv().await.unwrap();
    assert!(matches!(event, Event::Discovered { .. }));
}

Connecting test networks

sync_with

The simplest way to connect two test nodes:

net_a.sync_with(net_b.endpoint_addr()).await?;

This synchronizes discovery catalogs between the two nodes, establishing mutual awareness.

Discover all (fan-out)

For multi-node tests, connect all pairs:

async fn discover_all(networks: &[&Network]) {
    let futs: Vec<_> = networks.iter()
        .flat_map(|a| networks.iter().map(move |b| {
            a.sync_with(b.endpoint_addr())
        }))
        .collect();
    futures::future::try_join_all(futs).await.unwrap();
}

Mosaik’s test suite provides this as a utility function.

Time management

TIME_FACTOR environment variable

All test durations are multiplied by TIME_FACTOR (default 1.0). This is useful for running tests on slow CI machines or over high-latency networks:

# Double all timeouts for slow CI
TIME_FACTOR=2.0 cargo test

# 10x for debugging with breakpoints
TIME_FACTOR=10.0 cargo test -- --nocapture

Time helper functions

Function | Description
secs(n) | Duration::from_secs(n) × TIME_FACTOR
millis(n) | Duration::from_millis(n) × TIME_FACTOR
sleep_s(n) | tokio::time::sleep(secs(n))
sleep_ms(n) | tokio::time::sleep(millis(n))
timeout_s(n, fut) | Timeout with location tracking
timeout_ms(n, fut) | Timeout with location tracking

The timeout_* functions use #[track_caller] so timeout errors report the test line number, not the utility function.

Tracing

Automatic initialization

Mosaik’s test suite uses #[ctor::ctor] to initialize tracing before any test runs. Control it with environment variables:

# Enable debug logging
TEST_TRACE=debug cargo test

# Available levels: trace, debug, info, warn, error
TEST_TRACE=trace cargo test -- test_name

# Show all modules (including noisy deps)
TEST_TRACE=debug TEST_TRACE_UNMUTE=1 cargo test

Muted modules

By default, these noisy modules are filtered out:

  • iroh, rustls, igd_next, hickory_*
  • hyper_util, portmapper, reqwest
  • netwatch, mio, acto, swarm_discovery
  • events.net.relay.connected

Set TEST_TRACE_UNMUTE=1 to see their output.

Panic handling

The test harness installs a custom panic hook that:

  1. Logs the panic via tracing::error!.
  2. Calls std::process::abort() to prevent the test framework from masking panics in async contexts.

Testing patterns

Waiting for conditions

Use the When API instead of arbitrary sleeps:

// Wait for a group to come online
group.when().online().await;

// Wait for a collection to reach a version
map.when().updated(|v| v.index() >= 10).await;

// Wait for bonds to form
group.when().bonds(|b| b.len() >= 2).await;

Testing state machines

Test your state machine in isolation before running it in a group:

#[test]
fn state_machine_logic() {
    let mut sm = Counter::default();
    let ctx = ApplyContext {
        index: 1,
        term: 1,
        leader: false,
    };

    let result = sm.apply(CounterCommand::Increment, ctx);
    assert_eq!(result, 0); // returns previous value

    let result = sm.apply(CounterCommand::Increment, ctx);
    assert_eq!(result, 1);
}

Testing collections

#[tokio::test]
async fn replicated_map() {
    let (net_a, net_b) = create_connected_pair().await;

    let writer: MapWriter<String, u64> = net_a.collections()
        .map_writer("my-store")
        .build();

    let reader: MapReader<String, u64> = net_b.collections()
        .map_reader("my-store")
        .build();

    // Write on node A
    writer.insert("key".into(), 42).await.unwrap();

    // Wait for replication on node B
    reader.when().updated(|v| v.index() >= 1).await;

    // Read on node B
    assert_eq!(reader.get(&"key".into()), Some(42));
}

Testing streams

#[tokio::test]
async fn stream_delivery() {
    let (net_a, net_b) = create_connected_pair().await;

    let producer = net_a.streams()
        .producer::<Message>("topic")
        .build();

    let consumer = net_b.streams()
        .consumer::<Message>("topic")
        .build();

    // Wait for consumer to connect
    producer.when().active().await;

    // Send and receive
    producer.send(Message("hello".into())).await.unwrap();
    let msg = consumer.recv().await.unwrap();
    assert_eq!(msg.0, "hello");
}

Polling futures in tests

For testing poll-based logic:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Poll a future exactly once with a no-op waker
fn poll_once<F: Future + Unpin>(f: &mut F) -> Poll<F::Output> {
    let waker = futures::task::noop_waker();
    let mut cx = Context::from_waker(&waker);
    Pin::new(f).poll(&mut cx)
}

CI considerations

  • Set TIME_FACTOR=2.0 or higher for CI environments.
  • Use TEST_TRACE=debug to capture logs on failure.
  • Run tests with --test-threads=1 if you encounter port conflicts.
  • The test suite uses real networking (loopback), so ensure localhost UDP is available.

Project test structure

tests/
├── basic.rs            # Test harness, data types, module declarations
├── collections/        # Map, Vec, Set, DEPQ tests
├── discovery/          # Catalog, departure tests
├── groups/             # Bonds, builder, execute, feed, leader, catchup
├── streams/            # Smoke tests, stats, producer/consumer
└── utils/
    ├── mod.rs          # discover_all helper
    ├── tracing.rs      # Auto-init tracing with ctor
    ├── time.rs         # TIME_FACTOR-aware duration helpers
    └── fut.rs          # poll_once, forever