Introduction
Mosaik is a Rust runtime for building self-organizing, leaderless distributed systems. It is designed for trusted, permissioned networks — environments where all participating nodes are assumed honest, such as L2 blockchain infrastructure operated by a single organization.
What Mosaik Does
When you deploy mosaik-based binaries on arbitrary machines, the network self-organizes: nodes discover each other via gossip, infer the data-flow topology, elect leaders where needed, and converge to a stable operational state. Each node needs only two things to participate:
- A network ID — identifies which logical network to join
- (Optionally) A bootstrap peer — the peer ID of any node already on the network (the bootstrap example can be used in production as a universal bootstrap node). Mosaik also provides automatic bootstrap by publishing peer identities and their network-ID associations in the Mainline DHT for zero-config discovery.
A secret key is automatically generated on each run, giving the node a unique identity. Specifying a fixed secret key is only recommended for bootstrap nodes that need a stable, well-known peer ID across restarts.
From these minimal inputs, mosaik handles peer discovery, typed pub/sub data streaming, Raft-based consensus groups, and replicated data structures — all automatically.
Design Philosophy
- Not Byzantine fault tolerant. All members are assumed honest. This simplifies the protocol stack and enables higher throughput compared to BFT systems.
- Self-organizing. No central coordinator, no manual topology configuration. Nodes find each other and form the right connections.
- Built on modern networking. Uses iroh for QUIC-based peer-to-peer transport with relay support and hole-punching.
- Composable primitives. Five subsystems (Network, Discovery, Streams, Groups, Collections) compose to support a wide range of distributed application patterns.
System Overview
┌─────────────────────────────────────────────────┐
│ Network │
│ (QUIC endpoint, identity, protocol routing) │
├──────────┬──────────┬──────────┬────────────────┤
│ Discovery│ Streams │ Groups │ Collections │
│ gossip, │ typed │ Raft │ Map/Vec/Set/ │
│ catalog │ pub/sub │ consensus│ Register/Once │
│ │ │ groups │ PriorityQueue │
└──────────┴──────────┴──────────┴────────────────┘
- Network is the entry point. It manages the QUIC transport, node identity, and composes all subsystems.
- Discovery uses gossip to maintain a catalog of all peers, their capabilities, and their available streams/groups.
- Streams provides typed, async pub/sub channels. Any serializable Rust type can be streamed between nodes.
- Groups implements Raft consensus for clusters of nodes that need shared state and leader election.
- Collections builds on Groups to offer replicated data structures (Map, Vec, Set, Register, Once, PriorityQueue) that stay synchronized across nodes.
Who Should Read This Book
This book serves two audiences:
- Application developers building distributed systems with mosaik — start with Getting Started and the Tutorials.
- Contributors to mosaik itself — the Architecture Deep Dives section covers protocol internals, Raft modifications, and design decisions.
Quick Example
A minimal mosaik node that joins a network and starts streaming data:
use mosaik::*;
use futures::SinkExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let network_id: NetworkId = "my-app".into();
let network = Network::new(network_id).await?;
// The node is now online and discoverable
println!("Node {} is online", network.local().id());
// Create a typed producer (any serializable type works)
let producer = network.streams().produce::<String>();
// Wait for at least one consumer to connect
producer.when().online().await;
// Send data
producer.send("hello, world".to_string()).await?;
Ok(())
}
Overview
Mosaik is a Rust runtime for building self-organizing, leaderless distributed systems. It targets trusted, permissioned networks — environments where all participating nodes are controlled by the same organization and assumed to be honest.
Key Features
| Feature | Description |
|---|---|
| Self-organizing | Nodes discover each other via gossip and form the correct topology automatically |
| Typed pub/sub | Stream any serializable Rust type between nodes with backpressure and filtering |
| Raft consensus | Form availability groups with leader election and replicated state machines |
| Replicated collections | Distributed Map, Vec, Set, Register, Once, and PriorityQueue with strong consistency |
| QUIC transport | Built on iroh for modern, encrypted P2P networking |
| Relay support | Nodes behind NAT can communicate via relay servers with automatic hole-punching |
Module Map
Mosaik is organized into five composable subsystems:
┌──────────────────────────────────────────────────────┐
│ Network │
│ Entry point. QUIC endpoint, identity, routing. │
├─────────────┬─────────────┬─────────────┬────────────┤
│ Discovery │ Streams │ Groups │Collections │
│ Gossip & │ Typed │ Raft │ Replicated │
│ peer │ pub/sub │ consensus │ Map, Vec, │
│ catalog │ channels │ groups │ Set, Reg, │
│ │ │ │ Once, DEPQ │
└─────────────┴─────────────┴─────────────┴────────────┘
Network
The entry point to the SDK. Creates the QUIC endpoint, manages node identity (derived from a secret key), and composes all subsystems via ALPN-based protocol multiplexing.
Discovery
Gossip-based peer discovery. Maintains a catalog — a synchronized view of all known peers, their capabilities (tags), and their available streams and groups. Uses two complementary protocols: real-time gossip announcements and full catalog synchronization for catch-up.
Streams
Typed, async pub/sub data channels. Any Rust type implementing Serialize + DeserializeOwned + Send + 'static can be streamed. Producers publish data; consumers subscribe. Discovery automatically connects matching producers and consumers across the network.
Groups
Availability groups coordinated by a modified Raft consensus protocol. Nodes sharing a group key form a cluster, elect a leader, and replicate commands through a shared log. Custom state machines define application logic that runs deterministically on all group members.
Collections
Higher-level replicated data structures built on Groups. Each collection (Map, Vec, Set, Register, Once, PriorityQueue) creates its own Raft group with a specialized state machine.
Design Decisions
Trusted Network Assumption
Mosaik is not Byzantine fault tolerant. All nodes are assumed to be honest and correctly implementing the protocol. This assumption enables:
- Simpler consensus (no need for 2/3 supermajority)
- Higher throughput (fewer message rounds)
- Simplified state sync (no fraud proofs needed)
This makes mosaik ideal for infrastructure controlled by a single organization, such as L2 chains, internal microservices, or distributed compute clusters.
Built on iroh
Mosaik uses iroh for its networking layer, which provides:
- QUIC transport — multiplexed, encrypted connections
- Relay servers — NAT traversal for nodes behind firewalls
- mDNS — optional local network discovery
- Endpoint identity — public keys as node identifiers
Installation
Requirements
- Rust ≥ 1.89 (edition 2024)
- A Unix-like OS (Linux, macOS) or Windows
Add to Your Project
Add mosaik to your Cargo.toml:
[dependencies]
mosaik = "0.2"
Mosaik pulls in its core dependencies automatically, including:
- tokio — async runtime (full features)
- iroh — QUIC-based P2P networking
- serde — serialization framework
- futures — Stream and Sink traits
Common Additional Dependencies
Most mosaik applications will also want:
[dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
futures = "0.3"
anyhow = "1"
- tokio — you need the tokio runtime to run mosaik
- serde with derive — for #[derive(Serialize, Deserialize)] on your data types
- futures — for StreamExt/SinkExt traits on consumers and producers
Verify Installation
Create a minimal test to verify everything links correctly:
use mosaik::{Network, NetworkId};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let network = Network::new("test-network".into()).await?;
println!("Node online: {}", network.local().id());
Ok(())
}
cargo run
If you see a node ID printed, mosaik is installed and working.
Feature Flags
Mosaik currently does not define any optional feature flags. All functionality is included by default.
Quick Start
This guide walks through a complete example: two nodes that discover each other and exchange typed data through a stream.
Step 1: Create the Network
Every mosaik application starts by creating a Network. The NetworkId ensures only nodes on the same logical network can communicate.
use mosaik::{Network, NetworkId};
let network_id: NetworkId = "my-app".into();
let network = Network::new(network_id).await?;
Network::new() creates a node with default settings:
- A random secret key (new identity each run — this is the recommended default)
- Default relay mode (uses iroh’s relay servers for NAT traversal)
- No bootstrap peers (fine for local testing)
- No tags
For production use, you’ll want Network::builder() to configure bootstrap peers so the node can find the network. A fixed secret key is only needed for bootstrap nodes that need a stable peer ID — regular nodes work fine with the auto-generated key. See the Network chapter.
Step 2: Define Your Data Type
Streams in mosaik are typed. Any type implementing Serialize + DeserializeOwned + Send + 'static automatically implements the Datum trait and can be streamed:
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SensorReading {
sensor_id: u64,
temperature: f64,
timestamp: u64,
}
Step 3: Create a Producer
A producer publishes data for consumers across the network. The stream ID is derived from the type name by default.
let producer = network.streams().produce::<SensorReading>();
// Wait until at least one consumer connects
producer.when().subscribed().await;
Step 4: Send Data
Producer implements the futures::Sink trait. You can use SinkExt::send() for awaitable sends:
use futures::SinkExt;
producer.send(SensorReading {
sensor_id: 1,
temperature: 22.5,
timestamp: 1700000000,
}).await?;
For non-blocking sends, use try_send():
producer.try_send(SensorReading {
sensor_id: 1,
temperature: 23.1,
timestamp: 1700000001,
})?;
Step 5: Create a Consumer (On Another Node)
On a different node (or the same node for testing), create a consumer for the same type:
let other_network = Network::new(network_id).await?;
let mut consumer = other_network.streams().consume::<SensorReading>();
// Wait for subscription to a producer
consumer.when().subscribed().await;
Step 6: Receive Data
Consumer implements futures::Stream. Use StreamExt::next() to receive:
use futures::StreamExt;
while let Some(reading) = consumer.next().await {
println!("Sensor {}: {}°C", reading.sensor_id, reading.temperature);
}
Or use the direct recv() method:
if let Some(reading) = consumer.recv().await {
println!("Got reading: {:?}", reading);
}
Step 7: Discovery (Connecting the Nodes)
For nodes to find each other, they need at least one common peer address. In local testing, you can trigger manual discovery:
// On the consumer's network, dial the producer's address
other_network.discovery().sync_with(network.local().addr()).await?;
In production, you’ll configure bootstrap peers:
use mosaik::discovery;
let network = Network::builder(network_id)
.with_discovery(
discovery::Config::builder()
.with_bootstrap(bootstrap_addr)
)
.build()
.await?;
Putting It All Together
Here’s the complete example as two async tasks simulating two nodes:
use futures::SinkExt;
use mosaik::{Network, NetworkId};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SensorReading {
sensor_id: u64,
temperature: f64,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let network_id: NetworkId = "sensor-network".into();
// Node 1: Producer
let node1 = Network::new(network_id).await?;
let producer = node1.streams().produce::<SensorReading>();
// Node 2: Consumer
let node2 = Network::new(network_id).await?;
let mut consumer = node2.streams().consume::<SensorReading>();
// Connect the nodes
node2.discovery().dial(node1.local().addr()).await?;
// Wait for the stream to be established
producer.when().online().await;
// Send data
for i in 0..10 {
producer.send(SensorReading {
sensor_id: 1,
temperature: 20.0 + i as f64 * 0.5,
}).await?;
}
// Receive data
for _ in 0..10 {
let reading = consumer.recv().await.unwrap();
println!("{:?}", reading);
}
Ok(())
}
What’s Next
- Tutorials — Walk through the included examples
- Streams — Producer/consumer configuration, filtering, backpressure
- Groups — Raft consensus and replicated state machines
- Collections — Distributed Map, Vec, Set, Register, Once, PriorityQueue
Architecture
This chapter describes how mosaik’s subsystems fit together, how protocols are multiplexed, and how the lifecycle of a node is managed.
Subsystem Composition
A Network is the top-level entry point. When built, it creates and composes four subsystems:
Network::builder(network_id)
│
├── LocalNode (QUIC endpoint, identity, lifecycle)
├── Discovery (gossip announcements, catalog sync)
├── Streams (typed pub/sub channels)
└── Groups (Raft consensus groups)
└── Collections (replicated Map, Vec, Set, Register, Once, PriorityQueue)
Each subsystem is created during Network::builder().build() and installed as a protocol handler on the iroh Router. The subsystems are then accessible via accessor methods:
let network = Network::builder(network_id).build().await?;
let local = network.local(); // LocalNode
let discovery = network.discovery(); // Discovery
let streams = network.streams(); // Streams
let groups = network.groups(); // Groups
All handles are cheap to clone (they wrap Arc internally).
ALPN-Based Protocol Multiplexing
Mosaik multiplexes multiple protocols over a single QUIC endpoint using ALPN (Application-Layer Protocol Negotiation). Each subsystem registers its own ALPN identifier:
| Subsystem | ALPN | Purpose |
|---|---|---|
| Discovery (announce) | /mosaik/announce | Real-time gossip broadcasts |
| Discovery (sync) | /mosaik/catalog-sync | Full catalog exchange |
| Streams | /mosaik/streams/1.0 | Pub/sub data channels |
| Groups | /mosaik/groups/1 | Raft consensus, bonds, state sync |
When a connection arrives, the iroh router inspects the ALPN and dispatches to the correct subsystem handler. This means all subsystems share the same QUIC endpoint and port.
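The routing step can be sketched as a plain lookup from ALPN byte string to subsystem handler. This is an illustrative stand-in: in mosaik the dispatch is performed by iroh's Router, not a HashMap.

```rust
use std::collections::HashMap;

/// Simplified stand-in for ALPN-based dispatch: map each ALPN
/// identifier to the name of the subsystem that handles it.
fn alpn_router() -> HashMap<&'static [u8], &'static str> {
    let mut routes: HashMap<&'static [u8], &'static str> = HashMap::new();
    routes.insert(b"/mosaik/announce".as_slice(), "discovery-announce");
    routes.insert(b"/mosaik/catalog-sync".as_slice(), "discovery-sync");
    routes.insert(b"/mosaik/streams/1.0".as_slice(), "streams");
    routes.insert(b"/mosaik/groups/1".as_slice(), "groups");
    routes
}

/// Dispatch an incoming connection by its negotiated ALPN.
fn dispatch<'a>(routes: &HashMap<&'static [u8], &'a str>, alpn: &[u8]) -> Option<&'a str> {
    routes.get(alpn).copied()
}

fn main() {
    let routes = alpn_router();
    // A connection negotiated with the streams ALPN reaches the Streams handler.
    assert_eq!(dispatch(&routes, b"/mosaik/streams/1.0"), Some("streams"));
    // Unknown ALPNs match no handler.
    assert_eq!(dispatch(&routes, b"/mosaik/unknown"), None);
}
```

Because every subsystem registers a distinct ALPN, one QUIC endpoint and one port serve all protocols.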
Subsystems implement the ProtocolProvider trait to install their handlers:
// Internal trait — subsystems implement this
trait ProtocolProvider {
fn install(self, router: &mut Router);
}
The Protocol trait defines the ALPN for typed links:
pub trait Protocol {
const ALPN: &'static [u8];
}
Builder Pattern
Network uses a builder pattern for configuration:
let network = Network::builder(network_id)
.with_secret_key(secret_key) // Stable identity
.with_relay_mode(RelayMode::Disabled) // No relay servers
.with_mdns_discovery(true) // Local network discovery
.with_discovery(
discovery::Config::builder()
.with_bootstrap(bootstrap_addr)
.with_tags("my-role")
)
.with_streams(
streams::Config::builder()
.with_backoff(ExponentialBackoff::default())
)
.with_groups(
groups::Config::builder()
)
.build()
.await?;
For quick prototyping, Network::new(network_id) uses all defaults.
Lifecycle & Shutdown
Mosaik uses tokio_util::sync::CancellationToken for structured lifecycle management. The token lives in LocalNode and propagates shutdown to all subsystems:
Network::drop()
│
├── cancels LocalNode's CancellationToken
│ │
│ ├── Discovery workers shut down
│ ├── Stream producer/consumer workers shut down
│ ├── Group bond workers + Raft shut down
│ └── iroh Router shuts down
│
└── all resources released
When a Network is dropped, the cancellation token is triggered, and all background tasks gracefully terminate. Each subsystem’s internal tasks are select-looped against the cancellation token, ensuring no orphaned tasks.
Internal Communication Patterns
Mosaik uses several recurring patterns internally:
Watch Channels
Status changes are broadcast via tokio::sync::watch channels. This enables the when() API:
// Wait for a stream producer to come online
producer.when().online().await;
// Wait for a Raft group leader to be elected
group.when().leader_elected().await;
// Wait for a collection to reach a specific version
collection.when().reaches(version).await;
Arc<Inner> Pattern
All public handles (Network, Discovery, Streams, Groups, Group, Producer, Consumer, Map, Vec, Register, Once, etc.) are cheap to clone. They wrap an Arc<Inner> containing the actual state, making them safe to share across tasks.
Task-Per-Connection
Each consumer subscription and each producer-subscriber pair runs in its own tokio task. This avoids head-of-line blocking — a slow consumer doesn’t affect other consumers, and a slow subscriber doesn’t block the producer.
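The isolation property can be sketched without any of mosaik's machinery. In this stand-in, std threads and channels play the role of tokio tasks: each subscriber drains its own queue, so a slow subscriber only delays itself.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Fan the items 0..3 out to one worker per subscriber, each with its
/// own queue. (std threads stand in for the tokio tasks mosaik spawns.)
fn fan_out(delays_ms: &[u64]) -> Vec<Vec<u32>> {
    let mut handles = Vec::new();
    for &delay in delays_ms {
        let (tx, rx) = mpsc::channel::<u32>();
        // Producer side: every subscriber gets its own copy of the data.
        for item in 0..3 {
            tx.send(item).unwrap();
        }
        drop(tx);
        // One worker per subscriber — a slow one blocks nobody else.
        handles.push(thread::spawn(move || {
            rx.into_iter()
                .inspect(|_| thread::sleep(Duration::from_millis(delay)))
                .collect::<Vec<u32>>()
        }));
    }
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    // A fast (0 ms) and a slow (20 ms) subscriber both receive everything.
    let results = fan_out(&[0, 20]);
    assert_eq!(results, vec![vec![0, 1, 2], vec![0, 1, 2]]);
}
```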
Identity & Networking
Mosaik’s identity system is built on cryptographic keys and content-addressed hashing. Every identifier in the system — networks, peers, streams, groups, collections — is a 32-byte Digest (blake3 hash).
UniqueId: The Universal Identifier
At the core of mosaik’s identity system is UniqueId — an alias for Digest, the 32-byte blake3 hash that serves as the universal identifier type:
use mosaik::UniqueId;
// From a string (hashes the string if not valid hex)
let id: UniqueId = "my-network".into();
// From raw bytes
let id = UniqueId::from_bytes([0u8; 32]);
// Random
let id = UniqueId::random();
// Compile-time constants via the unique_id! macro
use mosaik::unique_id;
// From a 64-char hex string (decoded directly):
const HEX_ID: UniqueId = unique_id!(
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
);
// From any arbitrary string (blake3-hashed at compile time):
const NAMED_ID: UniqueId = unique_id!("my-stream-name");
// Deterministic derivation
let derived = id.derive("sub-identifier");
All the following types are aliases for Digest:
| Type | Alias For | Identifies |
|---|---|---|
UniqueId | Digest | General-purpose unique identifier |
NetworkId | UniqueId | A mosaik network (derived from name) |
Tag | UniqueId | A capability or role label |
StreamId | UniqueId | A data stream (derived from type name) |
GroupId | UniqueId | A consensus group (derived from key + config) |
StoreId | UniqueId | A replicated collection instance |
PeerId: Node Identity
A PeerId is the node’s public key, derived from its secret key. It’s globally unique across all mosaik networks.
use mosaik::{Network, PeerId};
use iroh::SecretKey;
// Random identity (default when using Network::new)
let network = Network::new(network_id).await?;
let my_id: &PeerId = &network.local().id();
// Stable identity via explicit secret key
let secret = SecretKey::generate(&mut rand::rng());
let network = Network::builder(network_id)
.with_secret_key(secret)
.build()
.await?;
For bootstrap nodes and other long-lived infrastructure, you should use a fixed secret key so the node’s PeerId (and therefore its address) remains stable across restarts.
NetworkId: Network Isolation
A NetworkId is a Digest derived from a name string. Nodes can only connect to peers sharing the same NetworkId:
use mosaik::NetworkId;
// These produce the same NetworkId
let id1: NetworkId = "my-app".into();
let id2: NetworkId = "my-app".into();
assert_eq!(id1, id2);
// Different name → different network → can't communicate
let other: NetworkId = "other-app".into();
assert_ne!(id1, other);
The NetworkId also drives automatic peer discovery: nodes sharing the same NetworkId find each other through the Mainline DHT without requiring any hardcoded bootstrap peers. Simply using the same network name is enough for nodes to connect.
Tags: Capability Labels
Tags are Digest values used to describe a node’s role or capabilities:
use mosaik::Tag;
let tag: Tag = "matcher".into();
let another: Tag = "validator".into();
Tags are advertised through the discovery system and can be used to filter which peers a producer accepts or which producers a consumer subscribes to:
// Only accept consumers that have the "authorized" tag
let producer = network.streams().producer::<Order>()
.accept_if(|peer| peer.tags.contains(&"authorized".into()))
.build()?;
StreamId: Stream Identity
By default, a StreamId is derived from the Rust type name:
use mosaik::StreamId;
// Automatically derived from the type name
let producer = network.streams().produce::<SensorReading>();
// Or set explicitly
let producer = network.streams().producer::<SensorReading>()
.with_stream_id("custom-stream-name")
.build()?;
GroupId: Group Identity
A GroupId is deterministically derived from multiple inputs:
GroupId = hash(
GroupKey,
ConsensusConfig,
StateMachine::signature(),
StateSync::signature()
)
This ensures that nodes with different configurations, different state machines, or different group secrets cannot accidentally join the same group.
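The shape of this guarantee can be sketched by hashing every input that must match, with strings standing in for the real config and signature types and a non-cryptographic hasher standing in for blake3:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch of GroupId derivation: hash every input that two nodes must
/// agree on before they may share a group. (Stand-in types and hasher;
/// mosaik derives a 32-byte blake3 digest.)
fn group_id(group_key: &str, consensus_config: &str, sm_signature: &str, sync_signature: &str) -> u64 {
    let mut h = DefaultHasher::new();
    group_key.hash(&mut h);
    consensus_config.hash(&mut h);
    sm_signature.hash(&mut h);
    sync_signature.hash(&mut h);
    h.finish()
}

fn main() {
    // Identical inputs → identical group.
    assert_eq!(
        group_id("k", "cfg-v1", "counter-sm", "log-replay"),
        group_id("k", "cfg-v1", "counter-sm", "log-replay"),
    );
    // A different state-machine signature yields a different GroupId,
    // so mismatched nodes can never accidentally join the same group.
    assert_ne!(
        group_id("k", "cfg-v1", "counter-sm", "log-replay"),
        group_id("k", "cfg-v1", "kv-sm", "log-replay"),
    );
}
```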
Endpoint Addresses
Nodes are addressed using EndpointAddr from iroh, which encodes the public key and optional relay URL. This is what you pass to bootstrap peers:
let addr = network.local().addr();
// addr contains: PeerId + relay URL + direct addresses
Self-Organization
One of mosaik’s defining features is that nodes self-organize into the correct topology without manual configuration. This chapter explains how that works step by step.
The Self-Organization Loop
When a new node joins the network, the following sequence happens automatically:
1. Bootstrap Node connects to a known bootstrap peer
│
2. Gossip Announce protocol broadcasts presence
│
3. Catalog Sync Full catalog exchange with bootstrap peer
│
4. Discovery Node learns about all other peers, their
tags, streams, and groups
│
5. Streams Consumer discovers matching producers,
opens subscriptions automatically
│
6. Groups Node finds peers with matching group keys,
forms bonds, joins Raft cluster
│
7. Convergence Network reaches a stable topology where
all nodes are connected to the right peers
Step 1: Bootstrap & Gossip
A new node starts with at least one bootstrap peer address. It connects and begins participating in the gossip protocol:
let network = Network::builder(network_id)
.with_discovery(
discovery::Config::builder()
.with_bootstrap(bootstrap_addr)
.with_tags("matcher")
)
.build()
.await?;
The node immediately:
- Announces itself via the gossip protocol (/mosaik/announce), broadcasting its PeerEntry (identity, tags, streams, groups)
- Receives announcements from other peers
- Triggers a full catalog sync (/mosaik/catalog-sync) with the bootstrap peer to catch up on all known peers
Step 2: Catalog Convergence
The catalog converges through two complementary protocols:
Real-time: Gossip Announcements
Every node periodically re-announces its PeerEntry via iroh-gossip. The announce interval is configurable (default: 15 seconds) with jitter to avoid thundering herds:
announce_interval = 15s
announce_jitter = 0.5 → actual interval: 7.5s – 22.5s
When a node changes (adds a tag, creates a stream, joins a group), it re-announces immediately.
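The interval bounds quoted above follow from scaling the base interval by (1 ± jitter); a quick check:

```rust
/// Jittered announce interval: with jitter factor j, the actual
/// interval is drawn uniformly from [base * (1 - j), base * (1 + j)].
fn jitter_bounds(base_secs: f64, jitter: f64) -> (f64, f64) {
    (base_secs * (1.0 - jitter), base_secs * (1.0 + jitter))
}

fn main() {
    // The documented defaults: 15s with jitter 0.5 → 7.5s – 22.5s.
    let (lo, hi) = jitter_bounds(15.0, 0.5);
    assert_eq!((lo, hi), (7.5, 22.5));
    println!("announce interval range: {lo}s – {hi}s");
}
```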
Catch-up: Full Catalog Sync
When a new node connects, it performs a bidirectional catalog sync with its peer. Both nodes exchange their complete catalogs, and entries are merged:
Node A Node B
│ │
│── CatalogSyncRequest ───────► │
│ │
│◄── CatalogSyncResponse ──────│
│ │
│ (both merge received │
│ entries into local catalog) │
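The merge step can be sketched with a last-writer-wins rule, assuming entries carry a last-updated timestamp (illustrative types — real entries are signed PeerEntry values):

```rust
use std::collections::HashMap;

/// Minimal catalog entry: a logical timestamp plus advertised tags.
/// (Illustrative stand-in for mosaik's signed PeerEntry.)
#[derive(Clone, Debug, PartialEq)]
struct Entry {
    updated_at: u64,
    tags: Vec<String>,
}

/// Merge a received catalog into the local one, keeping the newer
/// entry for each peer — the core of bidirectional catalog sync.
fn merge(local: &mut HashMap<String, Entry>, received: HashMap<String, Entry>) {
    for (peer, entry) in received {
        match local.get(&peer) {
            // Our copy is at least as new: keep it.
            Some(existing) if existing.updated_at >= entry.updated_at => {}
            // Newer entry or previously unknown peer: take theirs.
            _ => {
                local.insert(peer, entry);
            }
        }
    }
}

fn main() {
    let mut local = HashMap::from([(
        "peer-a".to_string(),
        Entry { updated_at: 10, tags: vec!["matcher".into()] },
    )]);
    let received = HashMap::from([
        // Newer version of a known peer: replaces the local entry.
        ("peer-a".to_string(), Entry { updated_at: 20, tags: vec!["matcher".into(), "primary".into()] }),
        // Previously unknown peer: added.
        ("peer-b".to_string(), Entry { updated_at: 5, tags: vec![] }),
    ]);
    merge(&mut local, received);
    assert_eq!(local.len(), 2);
    assert_eq!(local["peer-a"].updated_at, 20);
}
```

Because both sides run the same merge, the exchange is symmetric: after one round trip each node holds the union of the two catalogs.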
Signed Entries
Each PeerEntry is cryptographically signed by its owner. This proves authenticity — you can trust that a peer entry’s tags, streams, and groups are genuine, even when received via gossip through intermediaries.
Staleness & Purging
Entries that haven’t been updated within the purge_after duration (default: 300 seconds) are considered stale and hidden from the public catalog API. This ensures departed nodes are eventually removed.
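The staleness rule is a simple age check; a sketch with plain integer timestamps:

```rust
/// Hide entries whose last update is older than `purge_after` seconds.
/// (Sketch of the staleness rule; the documented default is 300s.)
fn visible(now: u64, updated_at: u64, purge_after: u64) -> bool {
    now.saturating_sub(updated_at) < purge_after
}

fn main() {
    let now = 1_000;
    assert!(visible(now, 800, 300)); // updated 200s ago → still visible
    assert!(!visible(now, 600, 300)); // updated 400s ago → stale, hidden
}
```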
Step 3: Automatic Stream Connections
Once discovery populates the catalog, the Streams subsystem automatically connects producers and consumers:
1. Node A creates Producer<Order>
→ Discovery advertises: "I produce stream 'Order'"
2. Node B creates Consumer<Order>
→ Discovery observes: "Node A produces 'Order'"
→ Consumer worker opens subscription to Node A
3. Data flows: Node A ──[Order]──► Node B
This is fully automatic. The consumer’s background worker monitors the catalog for matching producers and establishes connections as they appear.
Filtering can restrict which connections form:
// Producer only accepts nodes tagged "authorized"
let producer = network.streams().producer::<Order>()
.accept_if(|peer| peer.tags.contains(&"authorized".into()))
.build()?;
// Consumer only subscribes to nodes tagged "primary"
let consumer = network.streams().consumer::<Order>()
.subscribe_if(|peer| peer.tags.contains(&"primary".into()))
.build();
Step 4: Automatic Group Formation
Groups form through a similar discovery-driven process:
1. Node A joins group with key K
→ Discovery advertises: "I'm in group G" (where G = hash(K, config, ...))
2. Node B joins group with same key K
→ Discovery observes: "Node A is in group G"
→ Bond worker opens connection to Node A
3. Mutual handshake proves both know key K
→ Bond established
4. Raft consensus begins
→ Leader elected among bonded peers
→ Commands can be replicated
The bond handshake uses HMAC over the TLS session secrets combined with the group key. This proves knowledge of the group secret without transmitting it.
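The shape of that check can be sketched as follows — with std's DefaultHasher loudly standing in for HMAC (a non-cryptographic hash; never use it for real authentication):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch of the bond handshake tag: both sides derive a value from
/// the shared TLS session secret and the group key. Matching tags
/// prove knowledge of the key without ever transmitting it.
/// (DefaultHasher stands in for HMAC here.)
fn bond_tag(session_secret: &[u8], group_key: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    session_secret.hash(&mut h);
    group_key.hash(&mut h);
    h.finish()
}

fn main() {
    let session = b"tls-exporter-bytes";
    // Both peers know the group key → tags match, bond forms.
    assert_eq!(bond_tag(session, b"group-k"), bond_tag(session, b"group-k"));
    // A peer with the wrong key produces a different tag → rejected.
    assert_ne!(bond_tag(session, b"group-k"), bond_tag(session, b"wrong"));
}
```

Binding the tag to the TLS session secret also prevents replaying a captured handshake on a different connection.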
Step 5: Late Joiners
A node joining an already-running network catches up automatically:
Stream Catch-up
Consumers connecting to an active producer receive data from the point of subscription. There’s no historical replay — streams are real-time.
Group Catch-up
A node joining an existing Raft group:
- Forms bonds with existing members
- Receives the current log from peers (distributed across multiple peers for efficiency)
- Applies all log entries to bring its state machine up to date
- Begins participating normally (can vote, can become leader)
For collections, catch-up can use either log replay or snapshot sync:
- Log replay (default): Replays the entire log from the beginning
- Snapshot sync: Transfers a point-in-time snapshot of the state, avoiding log replay for large states
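Why both catch-up paths converge to the same state can be seen with a toy state machine — here a counter whose log entries are increments:

```rust
/// Toy replicated state machine: the state is the sum of all
/// commands applied so far. Full log replay folds the whole log.
fn replay(log: &[i64]) -> i64 {
    log.iter().sum()
}

/// Snapshot sync: start from a point-in-time state and apply only
/// the log suffix after the snapshot — same result, less work.
fn from_snapshot(snapshot_state: i64, suffix: &[i64]) -> i64 {
    snapshot_state + suffix.iter().sum::<i64>()
}

fn main() {
    let log = [5, -2, 7, 1, 4];
    // Full replay from the beginning…
    let full = replay(&log);
    // …versus a snapshot taken after the first three entries.
    let snap = replay(&log[..3]);
    assert_eq!(from_snapshot(snap, &log[3..]), full);
    assert_eq!(full, 15);
}
```

The trade-off is bandwidth versus state size: replay transfers every entry ever written, while a snapshot transfers the current state once plus the short suffix.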
Topology Example
Consider a distributed trading system with three roles:
Tags: "trader" Tags: "matcher" Tags: "reporter"
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Node 1 │ │ Node 3 │ │ Node 5 │
│ Producer │──[Order]──►│ Consumer │ │ Consumer │
│ <Order> │ │ <Order> │ │ <Fill> │
└──────────┘ │ │ └──────────┘
│ Group: │──[Fill]──► ▲
┌──────────┐ │ OrderBook│ ┌──────────┐
│ Node 2 │ │ (Raft) │ │ Node 6 │
│ Producer │──[Order]──►│ │ │ Consumer │
│ <Order> │ │ Producer │──[Fill]──►│ <Fill> │
└──────────┘ │ <Fill> │ └──────────┘
└──────────┘
┌──────────┐
│ Node 4 │
│ (matcher │
│ replica)│
└──────────┘
All of this topology forms automatically from:
- Node 1–2: with_tags("trader"), creates Producer<Order>
- Node 3–4: with_tags("matcher"), creates Consumer<Order>, joins the orderbook group, creates Producer<Fill>
- Node 5–6: with_tags("reporter"), creates Consumer<Fill>
No node needs to know the addresses of any other node except one bootstrap peer.
Building a Bootstrap Node
This tutorial walks through the bootstrap example — a ready-to-use bootstrap node for any mosaik network.
What Is a Bootstrap Node?
A bootstrap node is the first peer that other nodes connect to when joining a network. It serves as the initial discovery point — once a new node connects to a bootstrap peer, the gossip protocol takes over and the joining node learns about all other peers.
Bootstrap nodes are typically:
- Long-lived — they run continuously
- Stable identity — they use a fixed secret key so their address doesn’t change across restarts
- Well-known — their address is configured as a bootstrap peer by other nodes
A bootstrap node doesn’t need any special code — it’s just a regular mosaik node that stays online. The bootstrap example can be used in production as a universal bootstrap node for any mosaik network. The example adds CLI configuration with clap.
Project Setup
The bootstrap example is a single file at examples/bootstrap.rs. It uses these dependencies:
[dependencies]
mosaik = "0.2"
tokio = { version = "1", features = ["full"] }
clap = { version = "4", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = "0.3"
anyhow = "1"
The CLI Interface
The example uses clap derive to define command-line options:
use clap::{ArgAction, Parser};
use mosaik::*;
#[derive(Debug, Parser)]
struct Opts {
/// The secret key for stable identity across restarts
#[clap(short, long, env = "MOSAIK_BOOTSTRAP_SECRET")]
secret: Option<SecretKey>,
/// The network ID (hex string or seed that gets hashed)
#[clap(short, long, env = "MOSAIK_BOOTSTRAP_NETWORK_ID")]
network_id: Option<NetworkId>,
/// Other bootstrap nodes to connect to on startup
#[clap(short, long, env = "MOSAIK_BOOTSTRAP_PEERS")]
peers: Vec<PeerId>,
/// Tags to advertise (default: "bootstrap")
#[clap(short, long, default_value = "bootstrap",
env = "MOSAIK_BOOTSTRAP_TAGS")]
tags: Vec<Tag>,
/// Disable relay servers (node must be directly reachable)
#[clap(long, default_value_t = false)]
no_relay: bool,
/// Verbose output (-v debug, -vv trace)
#[clap(short, action = ArgAction::Count)]
verbose: u8,
/// Suppress all output
#[clap(short, long)]
quiet: bool,
}
Every option also supports environment variables (MOSAIK_BOOTSTRAP_*), making it easy to configure in containers or systemd services.
Secret Key Handling
The secret key determines the node’s PeerId. For a bootstrap node, a stable identity is essential so that other nodes can reliably find it:
fn parse_secret_key(s: &str) -> Result<SecretKey, Infallible> {
let bytes = Digest::from(s);
Ok(SecretKey::from_bytes(bytes.as_bytes()))
}
This parser accepts two formats:
- 64-character hex string — used directly as the secret key bytes
- Any other string — treated as a seed, hashed with blake3 into a deterministic key
So --secret=my-bootstrap-1 always produces the same key, making deployment reproducible.
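The format detection can be sketched as a plain predicate (the actual hex decoding and blake3 hashing are elided):

```rust
/// Sketch of the two accepted secret formats: a 64-character hex
/// string is used directly as key bytes; anything else is treated
/// as a seed and hashed into a deterministic key (blake3 in mosaik;
/// the decoding/hashing itself is omitted here).
fn is_hex_key(s: &str) -> bool {
    s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit())
}

fn main() {
    // Direct key material: 64 hex characters.
    assert!(is_hex_key(&"ab".repeat(32)));
    // Human-readable seed: hashed into a key instead.
    assert!(!is_hex_key("my-bootstrap-1"));
}
```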
Building the Network
The main function assembles the Network using the builder:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let opts = Opts::parse();
let secret = opts.secret.unwrap_or_else(|| {
tracing::warn!("No secret key provided, generating random key");
SecretKey::generate(&mut rand::rng())
});
let network_id = opts.network_id.unwrap_or_else(|| {
tracing::warn!("No network id provided, generating random network id");
NetworkId::random()
});
let mut builder = Network::builder(network_id)
.with_secret_key(secret)
.with_discovery(
discovery::Config::builder()
.with_tags(opts.tags.clone())
.with_bootstrap(opts.peers.clone()),
);
if opts.no_relay {
builder = builder.with_relay_mode(iroh::RelayMode::Disabled);
}
let network = builder.build().await?;
tracing::info!("Bootstrap node started");
tracing::info!("Public Id: {}", network.local().id());
tracing::info!("Network Id: {:?}", network.network_id());
// Stay alive forever
core::future::pending::<()>().await;
Ok(())
}
Key points:
- with_secret_key() — sets the stable identity
- with_discovery() — configures tags and initial peers to dial
- with_relay_mode(Disabled) — optional, for nodes with direct connectivity
- core::future::pending() — keeps the process alive (the node runs in background tasks)
Running It
# Basic usage with a seed-based secret
cargo run --example bootstrap -- \
--network-id=my-network \
--secret=my-bootstrap-secret
# With environment variables
MOSAIK_BOOTSTRAP_SECRET=my-secret \
MOSAIK_BOOTSTRAP_NETWORK_ID=my-network \
cargo run --example bootstrap
# Multiple bootstrap nodes that know each other
cargo run --example bootstrap -- \
--network-id=my-network \
--secret=node-1 \
--peers=<peer-id-of-node-2>
Using the Bootstrap Node
Other nodes reference the bootstrap node’s address when joining the network:
let network = Network::builder(network_id)
.with_discovery(
discovery::Config::builder()
.with_bootstrap(bootstrap_addr)
)
.build()
.await?;
The joining node connects to the bootstrap peer, performs a full catalog sync, and then discovers all other nodes through gossip. From that point on, the joining node is a full participant — it doesn’t need the bootstrap node for ongoing operation.
Key Takeaways
- Bootstrap nodes are just regular nodes — no special server code needed
- Stable identity via secret key — ensures the address doesn’t change across restarts
- Tags for discoverability — the `"bootstrap"` tag lets other nodes identify bootstrap peers
- Minimal configuration — a secret key and network ID are all that’s required
Building a Distributed Orderbook
This tutorial walks through the orderbook example — a distributed order-matching engine that combines Streams, Groups, and Raft consensus.
Architecture Overview
The orderbook example demonstrates how multiple mosaik subsystems compose to build a realistic distributed system:
Traders Matchers Observers
┌────────────┐ ┌──────────────────┐ ┌────────────┐
│ Trader A │ │ Matcher 0 │ │ Observer │
│ Producer │─[Order]─►│ Consumer │ │ Consumer │
│ <Order> │ │ <Order> │ │ <Fill> │
└────────────┘ │ │ └────────────┘
│ ┌────────────┐ │ ▲
┌────────────┐ │ │ OrderBook │ │ │
│ Trader B │ │ │ (Raft SM) │ │──[Fill]───┘
│ Producer │─[Order]─►│ │ │ │
│ <Order> │ │ └────────────┘ │
└────────────┘ │ Producer │
│ <Fill> │
└──────────────────┘
┌──────────────────┐
│ Matcher 1 │
│ (replica) │
└──────────────────┘
┌──────────────────┐
│ Matcher 2 │
│ (replica) │
└──────────────────┘
- Traders produce `Order` objects via Streams
- Matchers (a 3-node Raft group) consume orders, replicate them through consensus, and run a price-time priority matching engine
- Fill events are published back as a stream for downstream consumers
Step 1: Define the Domain Types
First, define the types that flow through the system. These types auto-implement the Datum trait for streaming:
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TradingPair {
pub base: String,
pub quote: String,
}
impl TradingPair {
pub fn new(base: &str, quote: &str) -> Self {
Self { base: base.to_string(), quote: quote.to_string() }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Side { Bid, Ask }
/// Price in hundredths of the quote currency (1 unit = $0.01), e.g. 300_000 = $3000.00
pub type Price = u64;
pub type Quantity = u64;
/// A limit order — streamable via mosaik Streams
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Order {
pub id: u64,
pub pair: TradingPair,
pub side: Side,
pub price: Price,
pub quantity: Quantity,
pub trader: String,
}
/// A fill produced when orders match
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Fill {
pub bid_order_id: u64,
pub ask_order_id: u64,
pub pair: TradingPair,
pub price: Price,
pub quantity: Quantity,
}
Because Order and Fill derive Serialize + Deserialize, they automatically implement Datum and can be used with mosaik Streams.
Step 2: Implement the State Machine
The heart of the example is the OrderBook state machine. It implements the StateMachine trait so it can be replicated across all group members via Raft:
use mosaik::groups::{ApplyContext, LogReplaySync, StateMachine};
use mosaik::primitives::UniqueId;
use std::collections::BTreeMap;
/// Commands that mutate the orderbook
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderBookCommand {
PlaceOrder(Order),
CancelOrder(u64),
}
/// Read-only queries against the orderbook state
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderBookQuery {
TopOfBook { pair: TradingPair, depth: usize },
Fills,
OrderCount,
}
/// Query results
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderBookQueryResult {
TopOfBook { bids: Vec<(Price, Quantity)>, asks: Vec<(Price, Quantity)> },
Fills(Vec<Fill>),
OrderCount(usize),
}
The StateMachine trait requires four associated types:
- `Command` — mutations that get replicated through Raft (must be serializable)
- `Query` — read requests (not replicated)
- `QueryResult` — responses to queries
- `StateSync` — mechanism for catching up new nodes
And four core methods:
impl StateMachine for OrderBook {
type Command = OrderBookCommand;
type Query = OrderBookQuery;
type QueryResult = OrderBookQueryResult;
type StateSync = LogReplaySync<Self>;
fn signature(&self) -> UniqueId {
// Unique identifier for this state machine type.
// Contributes to GroupId derivation — different state machines
// produce different GroupIds even with the same key.
UniqueId::from("orderbook_state_machine")
}
fn apply(&mut self, command: Self::Command, _ctx: &dyn ApplyContext) {
match command {
OrderBookCommand::PlaceOrder(order) => self.place_order(order),
OrderBookCommand::CancelOrder(id) => self.cancel_order(id),
}
}
fn query(&self, query: Self::Query) -> Self::QueryResult {
match query {
OrderBookQuery::TopOfBook { pair: _, depth } => {
// Return top N price levels
// ...
}
OrderBookQuery::Fills => {
OrderBookQueryResult::Fills(self.fills.clone())
}
OrderBookQuery::OrderCount => {
let count = self.bids.values()
.chain(self.asks.values())
.map(Vec::len).sum();
OrderBookQueryResult::OrderCount(count)
}
}
}
fn state_sync(&self) -> Self::StateSync {
LogReplaySync::default()
}
}
Key points:
- `apply()` is called on every node in the same order — this is what Raft guarantees. The matching logic must be deterministic.
- `query()` reads local state without going through Raft. With `Consistency::Strong`, the query is forwarded to the leader.
- `LogReplaySync::default()` means new nodes catch up by replaying the entire command log from the beginning.
Step 3: The Matching Engine
The OrderBook implements price-time priority matching. When a new order arrives, it crosses against the opposite side:
impl OrderBook {
fn match_order(&mut self, order: &Order) -> Quantity {
let mut remaining = order.quantity;
match order.side {
Side::Bid => {
// Bids match against asks at or below the bid price
while remaining > 0 {
let Some((&ask_price, _)) = self.asks.first_key_value()
else { break };
if ask_price > order.price { break; }
let ask_level = self.asks.get_mut(&ask_price).unwrap();
while remaining > 0 && !ask_level.is_empty() {
let (ask_id, ask_qty, _) = &mut ask_level[0];
let fill_qty = remaining.min(*ask_qty);
self.fills.push(Fill {
bid_order_id: order.id,
ask_order_id: *ask_id,
pair: self.pair.clone(),
price: ask_price,
quantity: fill_qty,
});
remaining -= fill_qty;
*ask_qty -= fill_qty;
if *ask_qty == 0 { ask_level.remove(0); }
}
if ask_level.is_empty() {
self.asks.remove(&ask_price);
}
}
}
Side::Ask => {
// Asks match against bids at or above the ask price
// (mirror logic, iterating bids from highest)
// ...
}
}
remaining
}
fn place_order(&mut self, order: Order) {
let remaining = self.match_order(&order);
// Any unfilled quantity rests on the book
if remaining > 0 {
let book = match order.side {
Side::Bid => &mut self.bids,
Side::Ask => &mut self.asks,
};
book.entry(order.price)
.or_default()
.push((order.id, remaining, order.trader));
}
}
}
Because apply() is called in the same order on every replica (guaranteed by Raft), the matching results are identical across all nodes.
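The bid-side loop above can be exercised in isolation. The following self-contained sketch reproduces the same crossing logic on a simplified book (order ids and quantities only, no trader field), showing the price-time priority behavior:

```rust
use std::collections::BTreeMap;

type Price = u64;
type Quantity = u64;

/// Minimal stand-alone version of the bid-side matching loop:
/// a bid crosses resting asks from the lowest price upward,
/// FIFO within each price level.
fn match_bid(
    asks: &mut BTreeMap<Price, Vec<(u64, Quantity)>>, // price -> [(order_id, qty)]
    bid_price: Price,
    mut remaining: Quantity,
) -> Vec<(u64, Price, Quantity)> { // fills as (ask_order_id, price, qty)
    let mut fills = Vec::new();
    while remaining > 0 {
        // Lowest resting ask; stop once the book no longer crosses.
        let Some((&ask_price, _)) = asks.first_key_value() else { break };
        if ask_price > bid_price { break; }
        let level = asks.get_mut(&ask_price).unwrap();
        while remaining > 0 && !level.is_empty() {
            let (ask_id, ask_qty) = &mut level[0];
            let fill_qty = remaining.min(*ask_qty);
            fills.push((*ask_id, ask_price, fill_qty));
            remaining -= fill_qty;
            *ask_qty -= fill_qty;
            if *ask_qty == 0 { level.remove(0); }
        }
        if level.is_empty() { asks.remove(&ask_price); }
    }
    fills
}
```

Feeding this sketch the tutorial's resting ask (id 1, 10 @ 300 000) and crossing bid (300 500, qty 15) produces a single fill of 10 units at 300 000, consistent with the expected output later in the chapter.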
Step 4: Wire It All Together
The main function creates the network, forms the Raft group, and connects streams:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let network_id = NetworkId::random();
let group_key = GroupKey::random();
let pair = TradingPair::new("ETH", "USDC");
// --- 3 matcher nodes forming a Raft group ---
let matcher0 = Network::new(network_id).await?;
let matcher1 = Network::new(network_id).await?;
let matcher2 = Network::new(network_id).await?;
// Cross-discover all matchers
discover_all([&matcher0, &matcher1, &matcher2]).await?;
// Each joins the same group with an OrderBook state machine
let g0 = matcher0.groups().with_key(group_key)
.with_state_machine(OrderBook::new(pair.clone()))
.join();
let g1 = matcher1.groups().with_key(group_key)
.with_state_machine(OrderBook::new(pair.clone()))
.join();
let g2 = matcher2.groups().with_key(group_key)
.with_state_machine(OrderBook::new(pair.clone()))
.join();
// Wait for consensus to elect a leader
g0.when().online().await;
g1.when().online().await;
g2.when().online().await;
Connecting Traders via Streams
// --- 2 trader nodes producing orders ---
let trader_a = Network::new(network_id).await?;
let trader_b = Network::new(network_id).await?;
discover_all([&trader_a, &trader_b, &matcher0, &matcher1, &matcher2]).await?;
// Traders produce Order streams
let mut orders_a = trader_a.streams().produce::<Order>();
let mut orders_b = trader_b.streams().produce::<Order>();
// Matcher consumes orders
let mut order_consumer = matcher0.streams().consume::<Order>();
order_consumer.when().subscribed().minimum_of(2).await;
Submitting and Matching Orders
// Trader A: asks (selling ETH)
orders_a.send(Order {
id: 1, pair: pair.clone(), side: Side::Ask,
price: 300_000, quantity: 10, trader: "alice".into(),
}).await?;
// Trader B: bids (buying ETH)
orders_b.send(Order {
id: 4, pair: pair.clone(), side: Side::Bid,
price: 300_500, quantity: 15, trader: "bob".into(),
}).await?;
// Consume the four submitted orders (two shown above, two elided)
// from the stream and execute each through Raft
for _ in 0..4 {
let order = order_consumer.next().await
.expect("expected order from stream");
g0.execute(OrderBookCommand::PlaceOrder(order)).await?;
}
Querying Results
// Query fills (strong consistency — goes through leader)
let result = g0.query(OrderBookQuery::Fills, Consistency::Strong).await?;
if let OrderBookQueryResult::Fills(fills) = result.result() {
for fill in fills {
println!("{fill}");
}
}
// Verify replication to followers
g1.when().committed().reaches(g0.committed()).await;
let follower_result = g1.query(
OrderBookQuery::Fills, Consistency::Weak
).await?;
Key patterns demonstrated:
- `execute()` — sends a command through Raft and waits for it to be committed on a quorum
- `query(..., Strong)` — reads through the leader for linearizable results
- `query(..., Weak)` — reads local state (faster, but may be stale)
- `when().committed().reaches(n)` — waits for a follower to catch up to a specific commit index
The Helper: Cross-Discovery
The example includes a utility to fully connect all nodes:
async fn discover_all(
networks: impl IntoIterator<Item = &Network>,
) -> anyhow::Result<()> {
let networks = networks.into_iter().collect::<Vec<_>>();
for (i, net_i) in networks.iter().enumerate() {
for (j, net_j) in networks.iter().enumerate() {
if i != j {
net_i.discovery().sync_with(net_j.local().addr()).await?;
}
}
}
Ok(())
}
In production, this pairwise sync is unnecessary — a single bootstrap peer handles discovery via gossip. This utility is for testing where all nodes start simultaneously.
Running the Example
cd examples/orderbook
cargo run
Expected output:
waiting for matching engine group to come online...
matching engine online, leader: <peer-id>
matcher subscribed to both trader order streams
submitting orders...
received order: alice ASK ETH/USDC@300000 qty=10
received order: alice ASK ETH/USDC@301000 qty=5
received order: bob BID ETH/USDC@299000 qty=8
received order: bob BID ETH/USDC@300500 qty=15
1 fills produced:
Fill(bid=4, ask=1, ETH/USDC@300000, qty=10)
follower g1 sees 1 fills (consistent with leader)
orderbook example complete
Key Takeaways
- Streams for data ingestion — orders flow from traders to the matching engine via typed pub/sub
- Raft for consensus — the `OrderBook` state machine runs on all replicas with identical results because Raft guarantees the same command order
- `execute()` for writes, `query()` for reads — clean separation between mutations (replicated) and reads (local or forwarded)
- Automatic catch-up — new replicas replay the command log to reach the current state
- Composability — Streams + Groups + StateMachine combine naturally for real distributed applications
Network
The Network is the primary entry point to the mosaik SDK. It creates the QUIC transport layer, establishes node identity, and composes all subsystems (Discovery, Streams, Groups) into a single cohesive runtime.
Creating a Network
Quick Start
For prototyping, Network::new() creates a node with default settings:
use mosaik::{Network, NetworkId};
let network = Network::new("my-app".into()).await?;
This creates a node with:
- A random secret key (new identity each run)
- Default relay mode (iroh’s relay servers for NAT traversal)
- No bootstrap peers
- No tags
- Default configs for all subsystems
Builder Pattern
For production use, the builder provides full control:
use mosaik::{Network, NetworkId, discovery, streams, groups};
use iroh::SecretKey;
let network = Network::builder("my-app".into())
.with_secret_key(my_secret_key)
.with_relay_mode(iroh::RelayMode::Disabled)
.with_mdns_discovery(true)
.with_addresses(bind_addrs)
.with_discovery(
discovery::Config::builder()
.with_bootstrap(bootstrap_addr)
.with_tags("my-role")
.with_purge_after(Duration::from_secs(600))
)
.with_streams(
streams::Config::builder()
)
.with_groups(
groups::Config::builder()
)
.build()
.await?;
Builder Options
| Method | Type | Default | Description |
|---|---|---|---|
| `with_secret_key()` | `SecretKey` | Random | Node identity (determines `PeerId`) |
| `with_relay_mode()` | `RelayMode` | Default | NAT traversal via relay servers |
| `with_mdns_discovery()` | `bool` | `false` | Local network mDNS peer discovery |
| `with_addresses()` | `BTreeSet<SocketAddr>` | Empty | Explicit bind addresses |
| `with_discovery()` | `ConfigBuilder` | Defaults | Discovery subsystem configuration |
| `with_streams()` | `ConfigBuilder` | Defaults | Streams subsystem configuration |
| `with_groups()` | `ConfigBuilder` | Defaults | Groups subsystem configuration |
Accessing Subsystems
Once built, the Network provides access to all subsystems:
// Transport & identity
let local = network.local();
let peer_id = network.local().id();
let addr = network.local().addr();
let network_id = network.network_id();
// Subsystems
let discovery = network.discovery();
let streams = network.streams();
let groups = network.groups();
All handles are cheap to clone (Arc<Inner> internally).
LocalNode
LocalNode represents the local node’s transport and identity:
let local = network.local();
// Identity
let peer_id = local.id(); // Public key
let secret = local.secret_key(); // Secret key
let network_id = local.network_id();
// Address (for sharing with others)
let addr = local.addr(); // EndpointAddr: public key + relay URL + addrs
// Readiness
local.online().await; // Blocks until the endpoint is ready
// Low-level access to iroh endpoint
let endpoint = local.endpoint();
Waiting for Readiness
After building, you can wait for the node to be fully online:
network.online().await;
This resolves once the iroh endpoint is ready and all subsystem handlers are installed. The build() method already waits for this, so online() is mainly useful when you have a cloned network handle.
Lifecycle & Shutdown
When a Network is dropped, it cancels the internal CancellationToken, which propagates shutdown to all subsystems:
{
let network = Network::new(network_id).await?;
// Node is running...
} // Network dropped here → all tasks cancelled
For long-running services, keep the network handle alive:
let network = Network::new(network_id).await?;
// ... set up streams, groups, etc.
core::future::pending::<()>().await; // Block forever
Link: The Wire Protocol
Under the hood, all communication happens through Link<P> — a typed, framed, bidirectional QUIC stream:
pub struct Link<P: Protocol> { /* ... */ }
Links provide:
- Length-delimited framing via `LengthDelimitedCodec`
- Serialization via `postcard` (a compact binary format)
- Type safety via the `Protocol` trait and generic parameter
// Open a link to a remote peer
let link = Link::<MyProtocol>::open(local, remote_addr).await?;
// Send and receive typed messages
link.send(MyMessage { /* ... */ }).await?;
let response: MyResponse = link.recv().await?;
// Split into independent halves
let (sender, receiver) = link.split();
// Close with a typed reason
link.close(MyCloseReason::Success).await?;
Most users won’t use Link directly — it’s the building block that Streams, Groups, and Discovery use internally.
Error Handling
Network construction can fail with:
| Error | Cause |
|---|---|
| `MissingNetworkId` | Network ID not provided |
| `Bind(BindError)` | Failed to bind the QUIC endpoint |
| `InvalidAddress` | Invalid socket address in configuration |
| `DiscoveryConfig(...)` | Invalid discovery configuration |
| `StreamsConfig(...)` | Invalid streams configuration |
| `GroupsConfig(...)` | Invalid groups configuration |
Connection-level errors use typed close reasons:
| Code | Name | Meaning |
|---|---|---|
| 200 | Success | Protocol completed normally |
| 204 | GracefulShutdown | Clean shutdown |
| 100 | InvalidAlpn | Wrong protocol identifier |
| 101 | DifferentNetwork | Peer on a different network |
| 102 | Cancelled | Operation cancelled |
| 400 | ProtocolViolation | Message deserialization failed |
| 401 | UnknownPeer | Peer not in discovery catalog |
Discovery
The Discovery subsystem handles gossip-based peer discovery and catalog synchronization. It maintains a network-wide view of all peers, their capabilities, and their available streams and groups.
Overview
Discovery uses three complementary mechanisms:
| Mechanism | Transport | Purpose |
|---|---|---|
| DHT Bootstrap | Mainline DHT (pkarr) | Automatic peer discovery via shared NetworkId |
| Announce | /mosaik/announce | Real-time gossip broadcasts via iroh-gossip |
| Catalog Sync | /mosaik/catalog-sync | Full bidirectional catalog exchange for catch-up |
Nodes sharing the same NetworkId automatically discover each other through the DHT — no hardcoded bootstrap peers are required. Once an initial connection is established, the Announce and Catalog Sync protocols take over for real-time updates.
See the DHT Bootstrap sub-chapter for details on the automatic discovery mechanism.
Configuration
Configure discovery through the Network builder:
use mosaik::{Network, discovery};
use std::time::Duration;
let network = Network::builder(network_id)
.with_discovery(
discovery::Config::builder()
.with_bootstrap(bootstrap_addr)
.with_tags("matcher")
.with_tags(["validator", "signer"]) // additive
.with_purge_after(Duration::from_secs(600))
.with_announce_interval(Duration::from_secs(10))
.with_announce_jitter(0.3)
.with_max_time_drift(Duration::from_secs(5))
.with_events_backlog(200)
)
.build()
.await?;
Configuration Options
| Field | Default | Description |
|---|---|---|
| `bootstrap_peers` | `[]` | Initial peers to connect to on startup |
| `tags` | `[]` | Tags to advertise about this node |
| `announce_interval` | 15s | How often to re-announce via gossip |
| `announce_jitter` | 0.5 | Max jitter factor (0.0–1.0) for announce timing |
| `purge_after` | 300s | Duration after which stale entries are purged |
| `max_time_drift` | 10s | Maximum acceptable clock drift between peers |
| `events_backlog` | 100 | Past events retained in event broadcast channel |
| `dht_publish_interval` | 300s | How often to publish to the DHT (`None` to disable) |
| `dht_poll_interval` | 60s | How often to poll the DHT for peers (`None` to disable) |
Both with_bootstrap() and with_tags() are additive — calling them multiple times adds to the list.
Accessing Discovery
let discovery = network.discovery();
The Discovery handle is cheap to clone.
Core API
Catalog Access
// Get current snapshot of all known peers
let catalog = discovery.catalog();
for (peer_id, entry) in catalog.iter() {
println!("{}: tags={:?}", peer_id, entry.tags);
}
// Watch for catalog changes
let mut watch = discovery.catalog_watch();
loop {
watch.changed().await?;
let catalog = watch.borrow();
println!("Catalog updated: {} peers", catalog.len());
}
Dialing Peers
// Connect to bootstrap peers manually
discovery.dial(bootstrap_addr);
// Dial multiple peers
discovery.dial([addr1, addr2, addr3]);
Manual Sync
// Trigger a full catalog sync with a specific peer
discovery.sync_with(peer_addr).await?;
Managing Tags
// Add tags at runtime
discovery.add_tags("new-role");
discovery.add_tags(["role-a", "role-b"]);
// Remove tags
discovery.remove_tags("old-role");
Changing tags triggers an immediate re-announcement to the network.
Local Entry
// Get this node's signed entry (as others see it)
let my_entry = discovery.me();
println!("I am: {:?}", my_entry);
Unsigned Entries
For testing or manual feeds, you can insert entries that aren’t cryptographically signed:
use mosaik::discovery::PeerEntry;
// Insert an unsigned entry (local-only, not gossiped)
discovery.insert(PeerEntry { /* ... */ });
// Remove a specific peer
discovery.remove(peer_id);
// Clear all unsigned entries
discovery.clear_unsigned();
Injecting Signed Entries
// Feed a signed entry from an external source
let success = discovery.feed(signed_peer_entry);
Events
Subscribe to discovery lifecycle events:
let mut events = discovery.events();
while let Ok(event) = events.recv().await {
match event {
Event::PeerDiscovered(entry) => {
println!("New peer: {}", entry.peer_id());
}
Event::PeerUpdated(entry) => {
println!("Peer updated: {}", entry.peer_id());
}
Event::PeerDeparted(peer_id) => {
println!("Peer left: {}", peer_id);
}
}
}
See the Events sub-chapter for details.
How It Works
Announce Protocol
Every node periodically broadcasts its SignedPeerEntry via iroh-gossip. The announce includes:
- Node identity (PeerId)
- Network ID
- Tags
- Available streams and groups
- Version (start + update timestamps)
The jitter on the announce interval prevents all nodes from announcing simultaneously. When a node changes its metadata (adds a tag, creates a stream, joins a group), it re-announces immediately.
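One way to picture the jitter (the exact formula is internal to mosaik, so treat this as an assumption): stretch the base interval by a random factor of up to `announce_jitter`.

```rust
use std::time::Duration;

/// Jittered announce scheduling sketch. `rand_unit` is a uniform
/// sample in [0, 1); callers would draw it from their RNG of choice.
/// The result lands in [base, base * (1 + jitter)], which is enough
/// to de-synchronize nodes that started at the same instant.
fn jittered_interval(base: Duration, jitter: f64, rand_unit: f64) -> Duration {
    base.mul_f64(1.0 + jitter * rand_unit)
}
```

With the defaults (15s interval, 0.5 jitter), announces land anywhere between 15 and 22.5 seconds apart.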
Catalog Sync Protocol
When a new node connects to a peer, they exchange their full catalogs bidirectionally. This ensures a new node quickly learns about all existing peers without waiting for gossip cycles.
Signed vs Unsigned Entries
| Property | Signed | Unsigned |
|---|---|---|
| Source | Created by the peer itself | Injected locally |
| Verification | Cryptographic signature | None |
| Gossip | Yes — propagated network-wide | No — local only |
| Use case | Normal operation | Testing, manual feeds |
Staleness Detection
Each entry has a two-part version: (start_timestamp, update_timestamp). If update_timestamp falls behind the current time by more than purge_after, the entry is hidden from the public catalog API and eventually removed.
See the Catalog sub-chapter for the full catalog API.
DHT Bootstrap
Mosaik includes an automatic peer discovery mechanism based on the Mainline DHT (the same DHT used by BitTorrent). Nodes sharing the same NetworkId automatically discover each other — no hardcoded bootstrap peers are needed.
How It Works
Each node derives a DHT key from its NetworkId and uses it to both publish its own address and poll for other nodes’ addresses. The general flow is:
- Publish — The node resolves the current DHT record for the network key, appends its own address, and publishes the updated record back to the DHT.
- Poll — The node periodically reads the DHT record and dials any peers it finds.
Because both publish and poll use the same deterministic key derived from the NetworkId, all nodes in the same network naturally converge on the same record.
Record Structure
Each DHT record is a pkarr signed packet containing DNS resource records. Every peer in the record has:
- An A record with a unique subdomain to identify the peer
- A TXT record with a `peers=N` field indicating how many peers that node currently knows about
Capacity Management
Mainline DHT records are limited to 1000 bytes. To stay within this limit, a maximum of 12 peers are stored per record. When the record is full and a new node needs to publish, the least-connected peer (the one with the lowest peers=N count) is evicted. Ties are broken randomly to avoid deterministic starvation.
Evicted peers are not forgotten — they are dialed directly so they can still join the network through normal gossip-based discovery.
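The eviction rule can be sketched as a small pure function. The `(peer, peers_count)` list and the caller-supplied tie-break index are illustrative stand-ins for the actual record parsing and RNG:

```rust
/// Pick the eviction victim from a full DHT record: the peer with
/// the lowest `peers=N` count. Among tied candidates, the caller's
/// `tie_break` index (random in practice) selects one, avoiding
/// deterministic starvation of any particular peer.
fn eviction_victim(entries: &[(String, u32)], tie_break: usize) -> Option<usize> {
    // Lowest connection count in the record.
    let min = entries.iter().map(|e| e.1).min()?;
    // All peers at that count are eviction candidates.
    let candidates: Vec<usize> = (0..entries.len())
        .filter(|&i| entries[i].1 == min)
        .collect();
    Some(candidates[tie_break % candidates.len()])
}
```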
Adaptive Polling
Polling uses an adaptive interval:
- Aggressive polling (5 seconds) — when the node has no known peers yet, it polls frequently to bootstrap quickly.
- Relaxed polling (configurable, default 60 seconds) — once the node has discovered at least one peer, polling slows down to reduce DHT load.
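As a sketch, the adaptive interval reduces to a two-branch function (the 5-second aggressive interval is the documented default):

```rust
use std::time::Duration;

/// Adaptive DHT poll interval: poll aggressively until at least one
/// peer is known, then fall back to the configured relaxed interval.
fn poll_interval(known_peers: usize, relaxed: Duration) -> Duration {
    if known_peers == 0 {
        Duration::from_secs(5) // bootstrap quickly
    } else {
        relaxed // reduce DHT load once connected
    }
}
```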
Publish Cycle
Publishing runs on a slower interval (default 5 minutes) and uses compare-and-swap (CAS) to safely update the shared record without overwriting concurrent changes from other nodes. A random startup jitter prevents all nodes from publishing simultaneously.
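A minimal sketch of the CAS publish loop, with the pkarr/Mainline machinery abstracted behind a hypothetical `RecordStore` trait (both the trait and `publish_self` are illustrative, not mosaik API):

```rust
/// Hypothetical store abstraction: `read` returns the record's
/// version and peer list, and `cas` commits only if the version is
/// unchanged since the read.
trait RecordStore {
    fn read(&self) -> (u64, Vec<String>);
    fn cas(&mut self, expected_version: u64, peers: Vec<String>) -> bool;
}

/// Append our own address, retrying when another node wins the race.
fn publish_self(store: &mut dyn RecordStore, me: &str, max_retries: usize) -> bool {
    for _ in 0..max_retries {
        let (version, mut peers) = store.read();
        if !peers.iter().any(|p| p == me) {
            peers.push(me.to_string());
        }
        if store.cas(version, peers) {
            return true; // committed without clobbering concurrent updates
        }
        // Lost the race: loop re-reads the fresher record and retries.
    }
    false
}
```

The retry-on-conflict shape is what makes concurrent publishes from several nodes safe: a loser never overwrites a winner's addition, it just folds its own address into the newer record.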
Configuration
DHT bootstrap is enabled by default. You can tune the intervals or disable it entirely:
use mosaik::{Network, discovery};
use std::time::Duration;
let network = Network::builder(network_id)
.with_discovery(
discovery::Config::builder()
// Customize DHT intervals
.with_dht_publish_interval(Some(Duration::from_secs(300)))
.with_dht_poll_interval(Some(Duration::from_secs(60)))
)
.build()
.await?;
To disable automatic bootstrap (e.g., when using only explicit bootstrap peers):
let network = Network::builder(network_id)
.with_discovery(
discovery::Config::builder()
.no_auto_bootstrap()
.with_bootstrap(bootstrap_addr) // use explicit peers instead
)
.build()
.await?;
Configuration Options
| Field | Default | Description |
|---|---|---|
| `dht_publish_interval` | 300s | How often to publish this node’s address to the DHT. `None` disables publishing. |
| `dht_poll_interval` | 60s | How often to poll the DHT for new peers. `None` disables polling. Actual interval is adaptive (5s when no peers are known). |
When to Use
DHT bootstrap is ideal for:
- Decentralized deployments where there is no fixed infrastructure to serve as bootstrap nodes
- Dynamic environments where nodes come and go frequently
- Zero-configuration setups where nodes only need to agree on a network name
For networks with stable infrastructure, you can combine DHT bootstrap with explicit bootstrap peers — nodes will use whichever method finds peers first.
Catalog
The Catalog is an immutable snapshot of all discovered peers in the network. It’s thread-safe, cheap to clone, and updated via watch channels.
Getting a Catalog
let catalog = network.discovery().catalog();
Each call returns a snapshot — it won’t change after you’ve obtained it. To observe changes, use the watch channel.
Catalog API
Iterating Peers
let catalog = discovery.catalog();
// Iterate all peers
for (peer_id, entry) in catalog.iter() {
println!("Peer {}: {:?}", peer_id, entry.tags);
}
// Count peers
println!("Known peers: {}", catalog.len());
Watching for Changes
let mut watch = discovery.catalog_watch();
loop {
// Wait for catalog to change
watch.changed().await?;
// Borrow the latest snapshot
let catalog = watch.borrow();
println!("Catalog updated, {} peers", catalog.len());
}
The watch receiver always has the latest value. Multiple calls to changed() may skip intermediate updates if the catalog changes faster than you observe it.
PeerEntry
Each peer in the catalog is represented by a PeerEntry:
pub struct PeerEntry {
pub network_id: NetworkId,
pub peer_id: PeerId,
pub addr: EndpointAddr,
pub tags: BTreeSet<Tag>,
pub streams: BTreeSet<StreamId>,
pub groups: BTreeSet<GroupId>,
pub version: PeerEntryVersion,
}
Fields
| Field | Type | Description |
|---|---|---|
| `network_id` | `NetworkId` | Which network this peer belongs to |
| `peer_id` | `PeerId` | The peer’s public key |
| `addr` | `EndpointAddr` | Connection address (public key + relay + direct addrs) |
| `tags` | `BTreeSet<Tag>` | Capability labels (e.g., `"matcher"`, `"validator"`) |
| `streams` | `BTreeSet<StreamId>` | Streams this peer produces |
| `groups` | `BTreeSet<GroupId>` | Groups this peer belongs to |
| `version` | `PeerEntryVersion` | Two-part version for staleness detection |
Signed Entries
In practice, most catalog entries are SignedPeerEntry — a PeerEntry with a cryptographic signature proving the owner created it:
let my_entry: SignedPeerEntry = discovery.me();
let peer_id = my_entry.peer_id();
Version & Staleness
Each entry carries a two-part version:
pub struct PeerEntryVersion {
pub start: Timestamp, // When the peer first came online
pub update: Timestamp, // When the entry was last updated
}
Entries where update is older than purge_after (default: 300s) are considered stale. Stale entries are:
- Hidden from the public catalog API
- Eventually removed if not refreshed
The periodic gossip re-announce (every ~15s by default) keeps entries fresh. When a node departs gracefully, it broadcasts a departure message; ungraceful departures are detected by staleness.
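The staleness rule reduces to a simple age check. A sketch using `SystemTime` (mosaik's internal `Timestamp` type may differ):

```rust
use std::time::{Duration, SystemTime};

/// An entry is stale when its `update` timestamp is older than
/// `purge_after` relative to the current time.
fn is_stale(update: SystemTime, now: SystemTime, purge_after: Duration) -> bool {
    match now.duration_since(update) {
        Ok(age) => age > purge_after,
        // `update` lies in the future (clock drift); treat as fresh.
        Err(_) => false,
    }
}
```

With the defaults, an entry refreshed every ~15s by gossip stays well inside the 300s purge window; roughly twenty missed announces in a row mark a peer stale.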
Using Catalog for Filtering
The catalog is commonly used to filter peers in Streams:
// Producer: only accept consumers with specific tags
let producer = network.streams().producer::<Order>()
.accept_if(|peer| peer.tags.contains(&"authorized".into()))
.build()?;
// Consumer: only subscribe to specific producers
let consumer = network.streams().consumer::<Order>()
.subscribe_if(|peer| peer.tags.contains(&"primary".into()))
.build();
Internal Structure
The catalog uses im::OrdMap — an immutable, persistent ordered map — for its internal storage. This provides:
- O(1) cloning — snapshots are effectively free
- Consistent iteration order — deterministic across nodes
- Thread safety — immutable snapshots can be shared freely
Updates create new snapshots atomically via tokio::sync::watch::Sender<Catalog>.
Events
The Discovery subsystem emits events when the peer catalog changes. You can subscribe to these events for reactive programming.
Subscribing to Events
let mut events = network.discovery().events();
while let Ok(event) = events.recv().await {
match event {
Event::PeerDiscovered(entry) => {
println!("New peer joined: {}", entry.peer_id());
}
Event::PeerUpdated(entry) => {
println!("Peer updated: {}", entry.peer_id());
}
Event::PeerDeparted(peer_id) => {
println!("Peer departed: {}", peer_id);
}
}
}
The events() method returns a tokio::sync::broadcast::Receiver<Event>. Multiple consumers can subscribe independently.
Event Types
PeerDiscovered
Emitted when a previously unknown peer is added to the catalog:
Event::PeerDiscovered(signed_peer_entry)
This fires for both signed entries (from gossip/sync) and unsigned entries (from manual insert()).
PeerUpdated
Emitted when an existing peer’s entry changes:
Event::PeerUpdated(signed_peer_entry)
Common triggers:
- Peer added or removed tags
- Peer started or stopped producing a stream
- Peer joined or left a group
- Peer’s address changed
PeerDeparted
Emitted when a peer is removed from the catalog:
Event::PeerDeparted(peer_id)
This fires when:
- A peer’s entry becomes stale and is purged
- A peer gracefully departs via the departure protocol
- An unsigned entry is removed via `discovery.remove(peer_id)`
Backlog
The event channel retains up to events_backlog (default: 100) past events. If a consumer falls behind, older events are dropped and the consumer receives a RecvError::Lagged(n) indicating how many events were missed.
match events.recv().await {
Ok(event) => { /* handle event */ }
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("Missed {n} events, consider full catalog re-read");
let catalog = discovery.catalog();
// Reconcile from snapshot
}
Err(broadcast::error::RecvError::Closed) => break,
}
Catalog Watch vs Events
Choose the right tool:
| Use Case | Tool | Why |
|---|---|---|
| React to specific changes | events() | Get individual events with full context |
| Get current state | catalog() | Snapshot of all peers |
| React to any change | catalog_watch() | Triggered on every update, borrow latest |
| Build a UI dashboard | catalog_watch() | Re-render on changes, read full state |
| Log peer arrivals/departures | events() | Specific events about each peer |
Streams
Streams are the primary dataflow primitive in mosaik. They represent typed, asynchronous data channels that connect producers and consumers across a network.
Overview
Producer Node Consumer Node
┌─────────────────┐ ┌─────────────────┐
│ Producer<D> │────QUIC────────│ Consumer<D> │
│ (Sink impl) │ /mosaik/ │ (Stream impl) │
│ │ streams/1.0 │ │
└─────────────────┘ └─────────────────┘
A producer announces a stream via Discovery. Consumers discover the producer, open a QUIC connection using the /mosaik/streams/1.0 ALPN, and begin receiving data. The entire lifecycle is automatic — you only create the handles.
The Datum Trait
Every type sent through a stream must implement Datum:
pub trait Datum: Serialize + DeserializeOwned + Send + 'static {
fn derived_stream_id() -> StreamId {
core::any::type_name::<Self>().into()
}
}
Datum is a blanket impl — any Serialize + DeserializeOwned + Send + 'static type is automatically a Datum. The derived_stream_id() method computes a StreamId (a Digest) from the Rust type name, so each type naturally maps to a unique stream.
#[derive(Serialize, Deserialize)]
struct PriceUpdate {
symbol: String,
price: f64,
}
// PriceUpdate is automatically a Datum
// StreamId = blake3("my_crate::PriceUpdate")
Quick Usage
Producing data:
let producer = network.streams().produce::<PriceUpdate>();
// Wait until at least one consumer is connected
producer.when().online().await;
// Send via Sink trait
use futures::SinkExt;
producer.send(PriceUpdate { symbol: "ETH".into(), price: 3200.0 }).await?;
// Or send immediately (non-blocking)
producer.try_send(PriceUpdate { symbol: "BTC".into(), price: 65000.0 })?;
Consuming data:
let mut consumer = network.streams().consume::<PriceUpdate>();
// Wait until connected to at least one producer
consumer.when().online().await;
// Receive via async method
while let Some(update) = consumer.recv().await {
println!("{}: ${}", update.symbol, update.price);
}
// Or use as a futures::Stream
use futures::StreamExt;
while let Some(update) = consumer.next().await {
println!("{}: ${}", update.symbol, update.price);
}
Stream Identity
By default, a stream’s identity comes from Datum::derived_stream_id(), which hashes the Rust type name. You can override this with a custom StreamId:
let producer = network.streams()
.producer::<PriceUpdate>()
.with_stream_id("custom-price-feed")
.build()?;
This lets you have multiple distinct streams of the same data type.
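A consumer finds such a producer by declaring the same custom `StreamId`. A sketch, assuming the consumer builder documented later in this chapter:

```rust
// Producer and consumer pair on the custom ID instead of the type name.
let producer = network.streams()
    .producer::<PriceUpdate>()
    .with_stream_id("custom-price-feed")
    .build()?;

let mut consumer = network.streams()
    .consumer::<PriceUpdate>()
    .with_stream_id("custom-price-feed")
    .build();
```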
Architecture
Streams are built on top of the Discovery and Network subsystems:
- Producer creation — the local discovery entry is updated to advertise the stream
- Consumer creation — the consumer worker discovers producers via the catalog and opens subscriptions
- Subscription — a QUIC bi-directional stream is opened; the consumer sends its Criteria, the producer sends data
- Fanout — each consumer gets its own independent sender loop so a slow consumer does not block others
- Cleanup — when handles are dropped, underlying tasks are cancelled
Close Reason Codes
When a stream subscription fails, the producer sends structured close reasons:
| Code | Name | Meaning |
|---|---|---|
| 10_404 | StreamNotFound | The requested stream does not exist on the producer |
| 10_403 | NotAllowed | The consumer is rejected by the producer’s accept_if predicate |
| 10_509 | NoCapacity | The producer has reached max_consumers |
| 10_413 | TooSlow | The consumer was disconnected for lagging behind |
Subsystem Configuration
The Streams config currently has one setting:
Config::builder()
.with_backoff(ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(Duration::from_secs(300)))
.build())
.build()?;
| Option | Default | Description |
|---|---|---|
| backoff | Exponential (max 5 min) | Default backoff policy for consumer connection retries |
Individual producers and consumers can override this via their respective builders.
Producers
A Producer<D> is a handle for sending data of type D to all connected consumers. Multiple Producer handles can exist for the same stream (mpsc pattern) — they share the same underlying fanout sink.
Creating Producers
Simple (default configuration):
let producer = network.streams().produce::<MyDatum>();
If a producer for this datum type already exists, the existing one is returned.
With builder (advanced configuration):
let producer = network.streams()
.producer::<MyDatum>()
.accept_if(|peer| peer.tags().contains(&"trusted".into()))
.online_when(|c| c.minimum_of(2).with_tags("validator"))
.disconnect_lagging(true)
.with_buffer_size(2048)
.with_max_consumers(10)
.with_stream_id("custom-id")
.build()?;
Builder Options
| Method | Default | Description |
|---|---|---|
| accept_if(predicate) | Accept all | Predicate to accept/reject incoming consumer connections |
| online_when(conditions) | minimum_of(1) | Conditions under which the producer is online |
| disconnect_lagging(bool) | true | Disconnect consumers that fall behind buffer_size |
| with_buffer_size(n) | 1024 | Internal channel buffer size |
| with_max_consumers(n) | usize::MAX | Maximum allowed simultaneous consumers |
| with_stream_id(id) | D::derived_stream_id() | Custom stream identity |
| with_undelivered_sink(sender) | None | Capture datums that no consumer matched |
Sending Data
Via Sink Trait
Producer<D> implements futures::Sink<D>. The send() method waits for the producer to be online before accepting data:
use futures::SinkExt;
// Blocks until at least one consumer is connected (default online condition)
producer.send(datum).await?;
If the producer is offline, poll_ready will not resolve until the online conditions are met.
Via try_send
For non-blocking sends:
match producer.try_send(datum) {
Ok(()) => { /* sent to fanout */ }
Err(Error::Offline(d)) => { /* no consumers, datum returned */ }
Err(Error::Full(d)) => { /* buffer full, datum returned */ }
Err(Error::Closed(d)) => { /* producer closed */ }
}
All error variants return the unsent datum so you can retry or inspect it.
Online Conditions
By default, a producer is online when it has at least one connected consumer. Customize this:
// Online when at least 3 validators are subscribed
let producer = network.streams()
.producer::<MyDatum>()
.online_when(|c| c.minimum_of(3).with_tags("validator"))
.build()?;
Check online status at any time:
if producer.is_online() {
producer.try_send(datum)?;
}
Observing Status
The when() API provides reactive status monitoring:
// Wait until online
producer.when().online().await;
// Wait until offline
producer.when().offline().await;
// Wait until subscribed by at least one peer
producer.when().subscribed().await;
// Wait until subscribed by at least N peers
producer.when().subscribed().minimum_of(3).await;
// Wait until subscribed by peers with specific tags
producer.when().subscribed().with_tags("validator").await;
// Wait until no subscribers
producer.when().unsubscribed().await;
Inspecting Consumers
Iterate over the currently connected consumers:
for info in producer.consumers() {
println!(
"Consumer {} connected to stream {} — state: {:?}, stats: {}",
info.consumer_id(),
info.stream_id(),
info.state(),
info.stats(),
);
}
Each ChannelInfo provides access to:
- stream_id() — the stream this subscription is for
- producer_id() / consumer_id() — the peer IDs
- peer() — the PeerEntry snapshot at subscription time
- state() — Connecting, Connected, or Terminated
- state_watcher() — a watch::Receiver<State> for monitoring changes
- stats() — Stats with datums(), bytes(), uptime()
- is_connected() — shorthand for state() == Connected
- disconnected() — future that resolves when terminated
Undelivered Sink
Capture datums that did not match any consumer’s criteria:
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let producer = network.streams()
.producer::<MyDatum>()
.with_undelivered_sink(tx)
.build()?;
// In another task
while let Some(datum) = rx.recv().await {
tracing::warn!("Datum had no matching consumer: {:?}", datum);
}
Note: In the default configuration (online when ≥ 1 subscriber), undelivered events only occur if connected consumers’ criteria reject the datum. If you customize online_when to allow publishing with zero subscribers, the sink captures all datums sent while nobody is listening.
Builder Errors
build() returns Err(BuilderError::AlreadyExists(existing)) if a producer for the stream ID already exists. The error contains the existing Producer<D> handle so you can use it directly:
match network.streams().producer::<MyDatum>().build() {
Ok(new) => new,
Err(BuilderError::AlreadyExists(existing)) => existing,
}
The simple produce() method handles this automatically — returning the existing producer if one exists.
Consumers
A Consumer<D> receives data of type D from remote producers. Each consumer has its own worker task that discovers producers, manages subscriptions, and delivers data.
Creating Consumers
Simple (default configuration):
let mut consumer = network.streams().consume::<MyDatum>();
With builder (advanced configuration):
let mut consumer = network.streams()
.consumer::<MyDatum>()
.subscribe_if(|peer| peer.tags().contains(&"primary".into()))
.with_criteria(Criteria::default())
.with_stream_id("custom-id")
.with_backoff(ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(Duration::from_secs(60)))
.build())
.build();
Builder Options
| Method | Default | Description |
|---|---|---|
| subscribe_if(predicate) | Accept all | Filter which producers to subscribe to |
| with_criteria(criteria) | Criteria::default() | Data selection criteria sent to producers |
| with_stream_id(id) | D::derived_stream_id() | Custom stream identity |
| with_backoff(policy) | From Streams config | Backoff policy for reconnection retries |
Receiving Data
Via recv()
The primary receiving method — async, blocks until data is available:
while let Some(datum) = consumer.recv().await {
process(datum);
}
// Returns None when the consumer is closed
Via try_recv()
Non-blocking receive for polling patterns:
match consumer.try_recv() {
Ok(datum) => process(datum),
Err(TryRecvError::Empty) => { /* no data available */ }
Err(TryRecvError::Disconnected) => { /* consumer closed */ }
}
Via Stream Trait
Consumer<D> implements futures::Stream<Item = D>:
use futures::StreamExt;
while let Some(datum) = consumer.next().await {
process(datum);
}
// Or with combinators
let filtered = consumer
.filter(|d| futures::future::ready(d.price > 100.0))
.take(10)
.collect::<Vec<_>>()
.await;
Producer Selection
By default, a consumer subscribes to every discovered producer of the same stream ID. Use subscribe_if to be selective:
// Only subscribe to producers tagged as "primary"
let consumer = network.streams()
.consumer::<PriceUpdate>()
.subscribe_if(|peer| peer.tags().contains(&"primary".into()))
.build();
The predicate receives a &PeerEntry and is evaluated each time a new producer is discovered.
Criteria
Criteria are sent to the producer when subscribing. They allow content-based filtering at the source. Currently Criteria is a placeholder that matches everything:
pub struct Criteria {}
impl Criteria {
pub const fn matches<D: Datum>(&self, _item: &D) -> bool {
true
}
}
Backoff & Reconnection
When a connection to a producer fails, the consumer retries with an exponential backoff policy.
Global default (from Streams config): exponential backoff up to 5 minutes.
Per-consumer override:
use backoff::ExponentialBackoffBuilder;
let consumer = network.streams()
.consumer::<MyDatum>()
.with_backoff(ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(Duration::from_secs(30)))
.build())
.build();
Observing Status
The when() API mirrors the producer side:
// Wait until connected to at least one producer
consumer.when().online().await;
// Equivalent: wait until subscribed to at least one producer
consumer.when().subscribed().await;
// Wait until connected to at least N producers
consumer.when().subscribed().to_at_least(3).await;
// Wait until no producers are connected
consumer.when().unsubscribed().await;
Check online status imperatively:
if consumer.is_online() {
// consumer has active connections
}
Statistics
Each consumer tracks aggregated stats:
let stats = consumer.stats();
println!("Received {} datums ({} bytes)", stats.datums(), stats.bytes());
if let Some(uptime) = stats.uptime() {
println!("Connected for {:?}", uptime);
}
Stats provides:
- datums() — total number of datums received
- bytes() — total serialized bytes received
- uptime() — Option<Duration> since the last connection (None if currently disconnected)
Inspecting Producers
for info in consumer.producers() {
println!(
"Connected to producer {} — {:?}, {}",
info.producer_id(),
info.state(),
info.stats(),
);
}
Each ChannelInfo provides the same API as on the producer side.
Multiple Consumers
Multiple consumers can be created for the same stream. Each gets its own independent copy of the data:
let mut consumer_a = network.streams().consume::<PriceUpdate>();
let mut consumer_b = network.streams().consume::<PriceUpdate>();
// Both receive all PriceUpdate data independently
Lifecycle
When a Consumer handle is dropped, its background worker task is cancelled (via a DropGuard) and all connections to producers are closed. No explicit shutdown is needed.
Status & Conditions
Both producers and consumers expose a reactive status API through When and ChannelConditions. This allows you to write event-driven code that reacts to changes in subscription state.
The When API
Every Producer and Consumer provides a when() method that returns a &When handle:
let producer = network.streams().produce::<MyDatum>();
let when = producer.when();
Online / Offline
// Resolves when the channel is ready (all publishing conditions met)
when.online().await;
// Resolves when the channel goes offline
when.offline().await;
// Check immediately
if when.is_online() { /* ... */ }
For producers, “online” means the online_when conditions are met (default: at least 1 consumer). For consumers, “online” means at least one producer connection is active.
Subscribed / Unsubscribed
// Resolves when at least one peer is connected
when.subscribed().await;
// Resolves when zero peers are connected
when.unsubscribed().await;
ChannelConditions
when().subscribed() returns a ChannelConditions — a composable future that resolves when the subscription state matches your criteria.
Minimum Peers
// Wait for at least 3 connected peers
when.subscribed().minimum_of(3).await;
Tag Filtering
// Wait for peers that have the "validator" tag
when.subscribed().with_tags("validator").await;
// Multiple tags (all must be present on the peer)
when.subscribed().with_tags(["validator", "us-east"]).await;
Custom Predicates
// Wait for a peer matching an arbitrary condition
when.subscribed()
.with_predicate(|peer: &PeerEntry| peer.tags().len() >= 3)
.await;
Combining Conditions
Conditions compose naturally:
// At least 2 validators from the us-east region
when.subscribed()
.minimum_of(2)
.with_tags("validator")
.with_predicate(|p| p.tags().contains(&"us-east".into()))
.await;
Inverse Conditions
Use unmet() to invert a condition:
// Wait until there are fewer than 3 validators
when.subscribed()
.minimum_of(3)
.with_tags("validator")
.unmet()
.await;
Re-Triggering
ChannelConditions implements Future and can be polled repeatedly. After resolving, it resets and will resolve again when the condition transitions from not met → met. This makes it suitable for use in loops:
let mut condition = when.subscribed().minimum_of(2);
loop {
(&mut condition).await;
println!("We now have 2+ subscribers again!");
}
Checking Immediately
let condition = when.subscribed().minimum_of(3);
if condition.is_condition_met() {
// There are currently 3+ matching peers
}
ChannelConditions also supports comparison with bool:
if when.subscribed().minimum_of(3) == true {
// equivalent to is_condition_met()
}
ChannelInfo
ChannelInfo represents an individual connection between a consumer and a producer:
pub struct ChannelInfo {
stream_id: StreamId,
criteria: Criteria,
producer_id: PeerId,
consumer_id: PeerId,
stats: Arc<Stats>,
peer: Arc<PeerEntry>,
state: watch::Receiver<State>,
}
Connection State
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
Connecting, // Handshake in progress
Connected, // Active subscription
Terminated, // Unrecoverably closed
}
Monitor state changes:
let info: ChannelInfo = /* from producers() or consumers() */;
// Check current state
match info.state() {
State::Connected => { /* active */ }
State::Connecting => { /* handshake in progress */ }
State::Terminated => { /* closed */ }
}
// Watch for state transitions
let mut watcher = info.state_watcher().clone();
while watcher.changed().await.is_ok() {
println!("State changed to: {:?}", *watcher.borrow());
}
// Wait for disconnection
info.disconnected().await;
Stats
Every channel tracks real-time statistics:
let stats = info.stats();
println!("Datums: {}", stats.datums()); // Total datums transferred
println!("Bytes: {}", stats.bytes()); // Total bytes transferred
println!("Uptime: {:?}", stats.uptime()); // Option<Duration> since connection
// Display formatting included
println!("{}", stats);
// Output: "uptime: 2m 30s, datums: 15432, bytes: 1.23 MB"
Stats implements Display with human-readable formatting using humansize and humantime.
Using Status for Online Conditions
The ChannelConditions type is also used to configure producer online conditions via the builder:
let producer = network.streams()
.producer::<MyDatum>()
.online_when(|c| c.minimum_of(2).with_tags("validator"))
.build()?;
The closure receives a ChannelConditions and returns it with your conditions applied. This is evaluated every time the subscription set changes.
Groups
Availability Groups are clusters of trusted nodes on the same mosaik network that coordinate with each other for load balancing and failover. Members of a group share a secret key, maintain a consistent replicated state through a modified Raft consensus protocol, and stay connected via an all-to-all mesh of persistent bonds.
Overview
```
            ┌──────────┐
            │  Node A  │
            │ (Leader) │
            └──┬────┬──┘
          bond/      \bond
             /        \
  ┌───────────┐      ┌───────────┐
  │  Node B   │─bond─│  Node C   │
  │ (Follower)│      │ (Follower)│
  └───────────┘      └───────────┘
```
Every pair of group members maintains a persistent bond — an authenticated, bidirectional QUIC connection. Bonds carry Raft consensus messages, heartbeats, and log-sync traffic.
Trust Model
Groups are not Byzantine fault tolerant. All members within a group are assumed to be honest and operated by the same entity. The GroupKey acts as the sole admission control — only nodes that know the key can join.
Quick Start
use mosaik::groups::GroupKey;
// All group members must use the same key
let key = GroupKey::generate();
// Join with default settings (NoOp state machine)
let group = network.groups().with_key(key).join();
// Wait for leader election
let leader = group.when().leader_elected().await;
println!("Leader: {leader}");
// Check local role
if group.is_leader() {
println!("I am the leader");
}
Group Identity
A GroupId is derived from three components:
- Group key — the shared secret (GroupKey)
- Consensus configuration — election timeouts, heartbeat intervals, etc.
- State machine signature — the state machine’s signature() plus its state sync signature()
Any divergence in these values across nodes produces a different GroupId, preventing misconfigured nodes from bonding.
// GroupId derivation (internal)
let id = key.secret().hashed()
.derive(consensus.digest())
.derive(state_machine.signature())
.derive(state_machine.state_sync().signature());
Key Types
| Type | Description |
|---|---|
| Groups | Public API gateway — one per Network |
| GroupBuilder | Typestate builder for configuring and joining a group |
| Group<M> | Handle for interacting with a joined group |
| GroupKey | Shared secret for admission control |
| GroupId | Unique identifier (Digest) derived from key + config + machine |
| Bond / Bonds | Persistent connections between group members |
| When | Reactive status API for group state changes |
| ConsensusConfig | Raft timing parameters |
ALPN Protocol
Groups use /mosaik/groups/1 as their ALPN identifier.
Close Reason Codes
| Code | Name | Meaning |
|---|---|---|
| 30_400 | InvalidHandshake | Error during handshake decoding |
| 30_404 | GroupNotFound | Group ID not known to acceptor |
| 30_405 | InvalidProof | Authentication proof invalid |
| 30_408 | Timeout | Timed out waiting for response |
| 30_429 | AlreadyBonded | A bond already exists between these peers |
Subsystem Configuration
use mosaik::groups::Config;
let config = Config::builder()
.with_handshake_timeout(Duration::from_secs(2)) // default
.build()?;
| Option | Default | Description |
|---|---|---|
| handshake_timeout | 2 seconds | Timeout for bond handshake completion |
Joining Groups
The GroupBuilder uses a typestate pattern to ensure groups are configured correctly at compile time. You cannot call join() until both storage and state machine are set (or you use the shorthand join() which defaults to NoOp).
Builder Flow
groups.with_key(key)
├── .join() // NoOp machine, InMemoryLogStore
└── .with_state_machine(machine)
├── .join() // InMemoryLogStore (default)
├── .with_consensus_config(c)
│ └── .join()
└── .with_log_storage(store)
└── .join()
Minimal Join (NoOp)
Useful for leader election without application logic:
let group = network.groups().with_key(key).join();
This creates a group with:
- NoOp state machine (commands are (), queries are ())
- InMemoryLogStore for storage
- Default ConsensusConfig
With Custom State Machine
let group = network.groups()
.with_key(key)
.with_state_machine(MyStateMachine::new())
.join();
The state machine must be set before storage, since the storage type depends on the command type.
With Custom Storage
let group = network.groups()
.with_key(key)
.with_state_machine(MyStateMachine::new())
.with_log_storage(MyDurableStore::new())
.join();
Storage must implement Storage<M::Command>.
GroupKey
A GroupKey is the shared secret that all members must possess:
use mosaik::groups::GroupKey;
// Generate a new random key
let key = GroupKey::generate();
// All members use the same key
// The key can be serialized and distributed securely
ConsensusConfig
All consensus parameters are part of GroupId derivation — every member must use the same values.
use mosaik::groups::ConsensusConfig;
let config = ConsensusConfig::builder()
.with_heartbeat_interval(Duration::from_millis(500)) // default
.with_heartbeat_jitter(Duration::from_millis(150)) // default
.with_max_missed_heartbeats(10) // default
.with_election_timeout(Duration::from_secs(2)) // default
.with_election_timeout_jitter(Duration::from_millis(500))// default
.with_bootstrap_delay(Duration::from_secs(3)) // default
.with_forward_timeout(Duration::from_secs(2)) // default
.with_query_timeout(Duration::from_secs(2)) // default
.build()?;
let group = network.groups()
.with_key(key)
.with_state_machine(machine)
.with_consensus_config(config)
.join();
Parameters
| Parameter | Default | Description |
|---|---|---|
| heartbeat_interval | 500ms | Interval between bond heartbeat pings |
| heartbeat_jitter | 150ms | Max random jitter subtracted from heartbeat interval |
| max_missed_heartbeats | 10 | Missed heartbeats before bond is considered dead |
| election_timeout | 2s | Base timeout before a follower starts an election |
| election_timeout_jitter | 500ms | Max random jitter added to election timeout |
| bootstrap_delay | 3s | Wait time before first election to allow peer discovery |
| forward_timeout | 2s | Timeout for forwarding commands to the leader |
| query_timeout | 2s | Timeout for strong-consistency query responses |
Leadership Preference
Nodes can deprioritize leadership to prefer being followers:
// 3x longer election timeout (default multiplier)
let config = ConsensusConfig::default().deprioritize_leadership();
// Custom multiplier
let config = ConsensusConfig::default().deprioritize_leadership_by(5);
This multiplies both election_timeout and bootstrap_delay, reducing the chance of becoming leader.
Idempotent Joins
If you join() a group whose GroupId already exists on this node, the existing Group handle is returned. No duplicate worker is spawned.
Lifecycle
When a Group<M> handle is dropped:
- Bonds notify peers of the departure
- The group’s cancellation token is triggered
- The group is removed from the active groups map
State Machines
Every group runs a replicated state machine (RSM) that processes commands and answers queries. The StateMachine trait is the primary extension point for application logic.
The StateMachine Trait
pub trait StateMachine: Sized + Send + Sync + 'static {
type Command: Command; // Mutating operations
type Query: Query; // Read-only operations
type QueryResult: QueryResult; // Query responses
type StateSync: StateSync<Machine = Self>; // Catch-up strategy
fn signature(&self) -> UniqueId;
fn apply(&mut self, command: Self::Command, ctx: &dyn ApplyContext);
fn query(&self, query: Self::Query) -> Self::QueryResult;
fn state_sync(&self) -> Self::StateSync;
// Optional overrides
fn apply_batch(&mut self, commands: impl IntoIterator<Item = Self::Command>, ctx: &dyn ApplyContext) { ... }
fn leadership_preference(&self) -> LeadershipPreference { LeadershipPreference::Normal }
}
Associated Types
| Type | Bound | Purpose |
|---|---|---|
| Command | Clone + Send + Serialize + DeserializeOwned + 'static | State-mutating operations replicated via Raft log |
| Query | Clone + Send + Serialize + DeserializeOwned + 'static | Read-only operations, not replicated |
| QueryResult | Clone + Send + Serialize + DeserializeOwned + 'static | Responses to queries |
| StateSync | StateSync<Machine = Self> | How lagging followers catch up |
All message types get blanket implementations from StateMachineMessage, so any Clone + Send + Serialize + DeserializeOwned + 'static type qualifies.
Implementing a State Machine
Here is a complete counter example:
use mosaik::groups::*;
use serde::{Serialize, Deserialize};
#[derive(Default)]
struct Counter {
value: i64,
}
#[derive(Clone, Serialize, Deserialize)]
enum CounterCmd {
Increment(i64),
Decrement(i64),
Reset,
}
#[derive(Clone, Serialize, Deserialize)]
enum CounterQuery {
Value,
}
impl StateMachine for Counter {
type Command = CounterCmd;
type Query = CounterQuery;
type QueryResult = i64;
type StateSync = LogReplaySync<Self>;
fn signature(&self) -> UniqueId {
UniqueId::from("counter_v1")
}
fn apply(&mut self, command: CounterCmd, _ctx: &dyn ApplyContext) {
match command {
CounterCmd::Increment(n) => self.value += n,
CounterCmd::Decrement(n) => self.value -= n,
CounterCmd::Reset => self.value = 0,
}
}
fn query(&self, query: CounterQuery) -> i64 {
match query {
CounterQuery::Value => self.value,
}
}
fn state_sync(&self) -> LogReplaySync<Self> {
LogReplaySync::default()
}
}
The signature() Method
Returns a UniqueId that is part of the GroupId derivation. All group members must return the same signature. Different signatures → different GroupId → nodes cannot bond.
Use it to version your state machine:
fn signature(&self) -> UniqueId {
UniqueId::from("orderbook_matching_engine_v2")
}
ApplyContext
The apply() method receives a &dyn ApplyContext providing deterministic metadata:
pub trait ApplyContext {
fn committed(&self) -> Cursor; // Last committed position before this batch
fn log_position(&self) -> Cursor; // Last log position
fn current_term(&self) -> Term; // Term of the commands being applied
}
Important: The context is safe for deterministic state machines — it never exposes non-deterministic data that could diverge across nodes.
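For example, a state machine can record the position and term of the last applied command so local queries can report how fresh the state is. A sketch reusing the Counter machine from above; the `last_applied` and `last_term` fields are hypothetical additions:

```rust
fn apply(&mut self, command: CounterCmd, ctx: &dyn ApplyContext) {
    match command {
        CounterCmd::Increment(n) => self.value += n,
        CounterCmd::Decrement(n) => self.value -= n,
        CounterCmd::Reset => self.value = 0,
    }
    // Deterministic metadata: every node observes the same values here,
    // so storing them cannot cause state divergence.
    self.last_applied = ctx.log_position(); // hypothetical field
    self.last_term = ctx.current_term();    // hypothetical field
}
```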
Batch Apply
For performance, override apply_batch:
fn apply_batch(
&mut self,
commands: impl IntoIterator<Item = Self::Command>,
ctx: &dyn ApplyContext,
) {
// Apply all commands in a single database transaction
let tx = self.db.begin();
for command in commands {
self.apply_one(&tx, command, ctx);
}
tx.commit();
}
The default implementation simply calls apply() for each command sequentially.
NoOp State Machine
For leader-election-only use cases, mosaik provides NoOp:
#[derive(Debug, Default)]
pub struct NoOp;
impl StateMachine for NoOp {
type Command = ();
type Query = ();
type QueryResult = ();
type StateSync = LogReplaySync<Self>;
// ...
}
Usage:
let group = network.groups().with_key(key).join(); // implicitly uses NoOp
State Synchronization
The state_sync() method returns a StateSync implementation used when followers need to catch up. For most cases, use LogReplaySync:
fn state_sync(&self) -> LogReplaySync<Self> {
LogReplaySync::default()
}
LogReplaySync replays committed log entries to bring the follower’s state machine up to date. For advanced use cases (e.g., snapshot-based sync), implement the StateSync trait directly.
See the State Sync deep dive for details.
Leadership Preference
A state machine can declare its preference for assuming leadership within
the group by overriding leadership_preference(). This is a per-node
setting – different nodes in the same group can return different values
without affecting the GroupId.
use mosaik::LeadershipPreference;
fn leadership_preference(&self) -> LeadershipPreference {
LeadershipPreference::Observer
}
| Variant | Behavior |
|---|---|
| Normal (default) | Standard Raft behavior – participates in elections as a candidate |
| Reluctant { factor: u32 } | Longer election timeouts (multiplied by factor), can still be elected |
| Observer | Never self-nominates as candidate; still votes and replicates log |
The convenience constructor LeadershipPreference::reluctant() creates a
Reluctant variant with the default factor of 3.
Liveness warning: If every node in a group returns
Observer, no leader can ever be elected and the group will be unable to make progress. Ensure at least one node hasNormalorReluctantpreference.
This mechanism is used internally by collection readers,
which return Observer so that leadership stays on writer nodes where
writes are handled directly rather than being forwarded.
Storage
Commands are persisted in a log via the Storage trait:
pub trait Storage<C: Command>: Send + 'static {
fn append(&mut self, command: C, term: Term) -> Index;
fn available(&self) -> RangeInclusive<Index>;
fn get(&self, index: Index) -> Option<(C, Term)>;
fn get_range(&self, range: &RangeInclusive<Index>) -> Vec<(Term, Index, C)>;
fn truncate(&mut self, at: Index);
fn last(&self) -> Cursor;
fn term_at(&self, index: Index) -> Option<Term>;
fn prune_prefix(&mut self, up_to: Index);
fn reset_to(&mut self, cursor: Cursor);
}
InMemoryLogStore<C> is the default implementation. For durability, implement Storage with a persistent backend (disk, database, etc.).
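To illustrate the expected contract, here is a self-contained sketch of a Vec-backed log with prefix pruning. The types (`Index`, `Term`, `Cursor`) and the `truncate` semantics (drop the entry at `at` and everything after it) are simplified local assumptions, not mosaik's definitions, and only a few of the trait's methods are mirrored; this is not a drop-in `Storage` implementation:

```rust
// Stand-in types for illustration; mosaik defines its own Index/Term/Cursor.
type Index = u64; // 1-based log index
type Term = u64;

#[derive(Clone, Copy, Debug, PartialEq)]
struct Cursor { term: Term, index: Index }

#[derive(Default)]
struct VecLogStore<C> {
    offset: Index,            // entries pruned before `entries[0]`
    entries: Vec<(C, Term)>,  // log entries from index `offset + 1`
}

impl<C: Clone> VecLogStore<C> {
    fn append(&mut self, command: C, term: Term) -> Index {
        self.entries.push((command, term));
        self.offset + self.entries.len() as Index
    }
    fn get(&self, index: Index) -> Option<(C, Term)> {
        index.checked_sub(self.offset + 1)
            .and_then(|i| self.entries.get(i as usize))
            .cloned()
    }
    fn truncate(&mut self, at: Index) {
        // Assumed semantics: drop entries at `at` and after (log conflict).
        self.entries.truncate(at.saturating_sub(self.offset + 1) as usize);
    }
    fn last(&self) -> Cursor {
        let index = self.offset + self.entries.len() as Index;
        let term = self.entries.last().map(|(_, t)| *t).unwrap_or(0);
        Cursor { term, index }
    }
    fn prune_prefix(&mut self, up_to: Index) {
        // Compact entries up to and including `up_to` (e.g. after a snapshot).
        let n = up_to.saturating_sub(self.offset).min(self.entries.len() as Index);
        self.entries.drain(..n as usize);
        self.offset += n;
    }
}

fn main() {
    let mut log = VecLogStore::default();
    assert_eq!(log.append("set x=1", 1), 1);
    assert_eq!(log.append("set x=2", 1), 2);
    assert_eq!(log.last(), Cursor { term: 1, index: 2 });
    log.prune_prefix(1);                        // compact the first entry away
    assert_eq!(log.get(2), Some(("set x=2", 1)));
    assert_eq!(log.get(1), None);               // pruned
}
```

A durable backend would follow the same index bookkeeping while writing entries to disk or a database before acknowledging the append.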
Commands & Queries
Once you have a Group<M> handle, you interact with the replicated state machine through commands (writes) and queries (reads).
Commands
Commands mutate state and are replicated to all group members through the Raft log. They are guaranteed to be applied in the same order on every node.
execute — Send and Wait
Sends a command and waits for it to be committed (replicated to a quorum):
let index = group.execute(CounterCmd::Increment(5)).await?;
println!("Command committed at log index {index}");
If the local node is:
- Leader: replicates to followers, resolves when quorum acknowledges
- Follower: forwards to the leader, resolves when the leader commits
execute_many — Batch Send and Wait
Send multiple commands atomically:
let range = group.execute_many([
CounterCmd::Increment(1),
CounterCmd::Increment(2),
CounterCmd::Increment(3),
]).await?;
println!("Commands committed at indices {range:?}");
Returns an IndexRange (RangeInclusive<Index>) covering all committed entries.
feed — Fire and Forget
Sends a command without waiting for commitment. Resolves once the leader acknowledges receipt and assigns a log index:
let index = group.feed(CounterCmd::Increment(10)).await?;
println!("Command assigned index {index}, not yet committed");
feed_many — Batch Fire and Forget
let range = group.feed_many([
CounterCmd::Reset,
CounterCmd::Increment(100),
]).await?;
Waiting for Commitment After Feed
Combine feed with the cursor watcher:
let range = group.feed_many(commands).await?;
// Wait until all commands are committed
group.when().committed().reaches(range.clone()).await;
Command Errors
pub enum CommandError<M: StateMachine> {
Offline(Vec<M::Command>), // Node is offline; commands returned
NoCommands, // Empty command list
GroupTerminated, // Group is shut down
}
The Offline variant returns the unsent commands so they can be retried:
match group.execute(cmd).await {
Ok(index) => println!("Committed at {index}"),
Err(CommandError::Offline(cmds)) => {
// Save for retry
group.when().online().await;
group.execute(cmds.into_iter().next().unwrap()).await?;
}
Err(CommandError::GroupTerminated) => {
panic!("Group is gone");
}
Err(CommandError::NoCommands) => unreachable!(),
}
Queries
Queries are read-only operations against the state machine. They are not replicated in the log.
Weak Consistency
Reads from the local node’s state machine. Fast but may return stale data:
let result = group.query(CounterQuery::Value, Consistency::Weak).await?;
println!("Local counter value: {} (at index {})", result.result, result.at_position);
Strong Consistency
Forwards the query to the current leader, guaranteeing linearizable reads:
let result = group.query(CounterQuery::Value, Consistency::Strong).await?;
println!("Leader counter value: {} (at index {})", *result, result.state_position());
CommittedQueryResult
Query results are wrapped in CommittedQueryResult<M>:
pub struct CommittedQueryResult<M: StateMachine> {
pub result: M::QueryResult, // The actual result
pub at_position: Index, // Log index at query time
}
It implements Deref to M::QueryResult, so you can use it directly:
let result = group.query(CounterQuery::Value, Consistency::Weak).await?;
// Deref to the inner result
let value: i64 = *result;
// Or access explicitly
let position = result.state_position();
let inner: i64 = result.into(); // Consume and get the inner M::QueryResult
Query Errors
pub enum QueryError<M: StateMachine> {
Offline(M::Query), // Node offline; query returned
GroupTerminated, // Group shut down
}
Ordering Guarantee
Consecutive calls to execute, execute_many, feed, or feed_many on the same Group handle are guaranteed to be processed in the order they were issued.
Common Patterns
Command-Query Separation
// Write path: fire and forget for throughput
group.feed(OrderCmd::Place(order)).await?;
// Read path: weak consistency for speed
let book = group.query(OrderQuery::Snapshot, Consistency::Weak).await?;
// Read path: strong consistency for accuracy
let book = group.query(OrderQuery::Snapshot, Consistency::Strong).await?;
Execute and Verify
let index = group.execute(CounterCmd::Increment(1)).await?;
let result = group.query(CounterQuery::Value, Consistency::Weak).await?;
assert!(result.at_position >= index);
Group Status
The Group<M> handle exposes a rich reactive API for monitoring group state: leadership, log progress, and online status.
Inspecting Current State
// Current leader (if any)
let leader: Option<PeerId> = group.leader();
// Am I the leader?
let is_leader: bool = group.is_leader();
// Group identity
let group_id: &GroupId = group.id();
// Current committed index
let committed: Index = group.committed();
// Current log position (may include uncommitted entries)
let cursor: Cursor = group.log_position();
// Active bonds (peer connections)
let bonds: Bonds = group.bonds();
// Group configuration
let config: &GroupConfig = group.config();
The When API
group.when() returns a &When handle for awaiting state transitions.
Leadership Events
// Wait for any leader to be elected
let leader: PeerId = group.when().leader_elected().await;
// Wait for a different leader (leader change)
let new_leader: PeerId = group.when().leader_changed().await;
// Wait until a specific peer becomes leader
group.when().leader_is(expected_peer_id).await;
// Wait until the local node becomes leader
group.when().is_leader().await;
// Wait until the local node becomes a follower
group.when().is_follower().await;
Online / Offline
A node is online when:
- It is the leader, or
- It is a fully synced follower (not catching up, not in an election)
// Wait until ready to process commands
group.when().online().await;
// Wait until the node goes offline
group.when().offline().await;
Log Position Watchers
The CursorWatcher type provides fine-grained observation of log progress:
Committed Index
let committed = group.when().committed();
// Wait until a specific index is committed
committed.reaches(42).await;
// Wait for any forward progress
let new_pos = committed.advanced().await;
// Wait for any change (including backward, e.g., log truncation)
let new_pos = committed.changed().await;
// Wait for regression (rare — happens during partition healing)
let new_pos = committed.reverted().await;
You can also pass an IndexRange from execute_many / feed_many:
let range = group.feed_many(commands).await?;
group.when().committed().reaches(range).await;
Log Position
The same API works for the full log position (including uncommitted entries):
let log = group.when().log();
let new_cursor = log.advanced().await;
let new_cursor = log.changed().await;
Index and Cursor Types
/// A log index (0-based, where 0 is the sentinel "no entry")
pub struct Index(pub u64);
/// A Raft term number
pub struct Term(pub u64);
/// A (term, index) pair identifying a specific log position
pub struct Cursor { pub term: Term, pub index: Index }
Both Index and Term provide:
- zero(), one() — constants
- is_zero() — check for the sentinel
- prev(), next() — saturating arithmetic
- distance(other) — absolute difference
Bonds
group.bonds() returns a Bonds handle — a watchable, ordered collection of all active bonds:
let bonds = group.bonds();
// Iterate current bonds
for bond in bonds.iter() {
println!("Bonded to: {}", bond.peer_id());
}
Bonds carry Raft messages, heartbeats, and sync traffic between group members. See the Bonds deep dive for internals.
Putting It Together
A common pattern for group-aware applications:
let group = network.groups()
.with_key(key)
.with_state_machine(MyMachine::new())
.join();
// Wait until we can serve traffic
group.when().online().await;
if group.is_leader() {
// Run leader-specific tasks
run_leader_loop(&group).await;
} else {
// Run follower-specific tasks
run_follower_loop(&group).await;
}
// React to leadership changes
loop {
let new_leader = group.when().leader_changed().await;
if group.is_leader() {
// Transition to leader role
} else {
// Transition to follower role
}
}
Collections
The Collections subsystem provides replicated, eventually consistent data structures that feel like local Rust collections but are automatically synchronized across all participating nodes using Raft consensus.
┌────────────────────────────────────────────────────────────────────────┐
│ Collections │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ┌──────────┐ ┌──────┐ │
│ │ Map │ │ Vec │ │ Set │ │ DEPQ │ │ Register │ │ Once │ │
│ │ <K, V> │ │ <T> │ │ <T> │ │<P,K,V> │ │ <T> │ │ <T> │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └───┬────┘ └────┬─────┘ └──┬───┘ │
│ └────────────┴────────────┴─┬─────────┴───────────┴──────────┘ │
│ │ │
│ │ │
│ │ │
│ ┌────────────────▼───────────────────┐ │
│ │ Groups (Raft Consensus) │ │
│ │ One group per collection │ │
│ └────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────────┘
Each collection instance creates its own Raft consensus group. Different
collections (or the same type with different StoreIds) run as independent
groups and can span different subsets of the network.
Quick start
use mosaik::collections::{Map, StoreId};
// On every node — using the same StoreId joins the same group
let prices = Map::<String, u64>::writer(&network, StoreId::from("prices"));
// Wait until the collection is online
prices.when().online().await;
// Write (only available on writers)
let version = prices.insert("ETH".into(), 3812).await?;
// Wait for the write to be committed
prices.when().reaches(version).await;
// Read (available on both writers and readers)
assert_eq!(prices.get(&"ETH".into()), Some(3812));
Writer / Reader split
Every collection type offers two modes, distinguished at the type level using a const-generic boolean:
| Mode | Type alias | Can write? | Leadership priority |
|---|---|---|---|
| Writer | MapWriter<K,V>, VecWriter<T>, RegisterWriter<T>, OnceWriter<T> etc. | Yes | Normal |
| Reader | MapReader<K,V>, VecReader<T>, RegisterReader<T>, OnceReader<T> etc. | No | Deprioritized |
Both modes provide identical read access. Readers automatically use
deprioritize_leadership() in their consensus configuration to reduce the
chance of being elected leader, since leaders handle write forwarding.
// Writer — can read AND write
let w = Map::<String, u64>::writer(&network, store_id);
// Reader — can only read, lower chance of becoming leader
let r = Map::<String, u64>::reader(&network, store_id);
Available collections
| Collection | Description | Backing structure |
|---|---|---|
| Map<K, V> | Unordered key-value map | im::HashMap (deterministic) |
| Vec<T> | Ordered, index-addressable sequence | im::Vector |
| Set<T> | Unordered set of unique values | im::HashSet (deterministic) |
| Register<T> | Single-value register | Option<T> |
| Once<T> | Write-once register | Option<T> |
| PriorityQueue<P, K, V> | Double-ended priority queue | im::HashMap + im::OrdMap |
Most collections use the im crate for their internal
state, which provides O(1) structural sharing — cloning a snapshot of the
entire collection is essentially free. This is critical for the snapshot-based
state sync mechanism. Register and Once use a plain Option<T> since they
hold at most one value.
Trait requirements
Collections use blanket-implemented marker traits to constrain their type parameters:
| Trait | Required bounds | Used by |
|---|---|---|
| Value | Clone + Debug + Serialize + DeserializeOwned + Hash + PartialEq + Eq + Send + Sync + 'static | All element/value types |
| Key | Clone + Serialize + DeserializeOwned + Hash + PartialEq + Eq + Send + Sync + 'static | Map keys, Set elements, PQ keys |
| OrderedKey | Key + Ord | PriorityQueue priorities |
These traits are automatically implemented for any type that satisfies their bounds — you never need to implement them manually.
StoreId and group identity
Each collection derives its Raft group identity from:
- A fixed prefix per collection type (e.g., "mosaik_collections_map", "mosaik_collections_once")
- The StoreId — a UniqueId you provide at construction time
- The Rust type names of the type parameters
This means Map::<String, u64>::writer(&net, id) and
Map::<u32, u64>::writer(&net, id) will join different groups even with
the same StoreId, because the key type differs. Likewise, a Once<String>
and a Register<String> with the same StoreId form separate groups because
the prefix differs.
Version tracking
All mutating operations return a Version, which wraps the Raft log Index
at which the mutation will be committed:
let version: Version = map.insert("key".into(), 42).await?;
// Wait until the mutation is committed and visible
map.when().reaches(version).await;
Error handling
All write operations return Result<Version, Error<T>>:
| Variant | Meaning |
|---|---|
| Error::Offline(T) | The node is temporarily offline. The value that failed is returned for retry. |
| Error::NetworkDown | The network is permanently down. The collection is no longer usable. |
SyncConfig
Collections use a snapshot-based state sync mechanism to bring lagging
followers up to date. The SyncConfig controls this behavior:
| Parameter | Default | Description |
|---|---|---|
| fetch_batch_size | 2000 | Items per batch during snapshot transfer |
| snapshot_ttl | 10s | How long a snapshot stays valid after last access |
| snapshot_request_timeout | 15s | Timeout waiting for a SnapshotReady response |
| fetch_timeout | 5s | Timeout per FetchDataResponse |
Important: Different SyncConfig values produce different group signatures. Collections using different configs will not see each other.
use mosaik::collections::{Map, StoreId, SyncConfig};
use std::time::Duration;
let config = SyncConfig::default()
.with_fetch_batch_size(5000)
.with_snapshot_ttl(Duration::from_secs(30));
let map = Map::<String, u64>::writer_with_config(&network, store_id, config);
How snapshot sync works
1. A lagging follower sends a RequestSnapshot to the leader.
2. The leader wraps the request as a special command and replicates it.
3. When committed, all peers create a snapshot at the same log position.
4. The follower fetches snapshot data in batches from available peers.
5. Once complete, the follower installs the snapshot and replays any buffered commands received during sync.
Because im data structures support O(1) cloning, creating a snapshot is
nearly instant regardless of collection size.
Deterministic hashing
Map and Set use BuildHasherDefault<DefaultHasher> (SipHash-1-3 with a fixed
zero seed) for their internal im::HashMap / im::HashSet. This ensures
that iteration order is identical across all nodes for the same logical
state — a requirement for snapshot sync to produce consistent chunked
transfers.
Map
Map<K, V> is a replicated, unordered, eventually consistent key-value map.
Internally it is backed by an im::HashMap with a deterministic hasher.
Construction
use mosaik::collections::{Map, StoreId, SyncConfig};
// Writer — can read and write
let map = Map::<String, u64>::writer(&network, StoreId::from("balances"));
// Writer with custom sync config
let map = Map::<String, u64>::writer_with_config(&network, store_id, config);
// Reader — read-only, deprioritized for leadership
let map = Map::<String, u64>::reader(&network, store_id);
// Reader with custom sync config
let map = Map::<String, u64>::reader_with_config(&network, store_id, config);
// Aliases: new() == writer(), new_with_config() == writer_with_config()
let map = Map::<String, u64>::new(&network, store_id);
Read operations
Available on both writers and readers. All reads operate on the local committed state — they are non-blocking and never touch the network.
| Method | Time | Description |
|---|---|---|
| len() -> usize | O(1) | Number of entries |
| is_empty() -> bool | O(1) | Whether the map is empty |
| contains_key(&K) -> bool | O(log n) | Test if a key exists |
| get(&K) -> Option<V> | O(log n) | Get a clone of the value for a key |
| iter() -> impl Iterator<Item = (K, V)> | O(1)* | Iterate over all key-value pairs |
| keys() -> impl Iterator<Item = K> | O(1)* | Iterate over all keys |
| values() -> impl Iterator<Item = V> | O(1)* | Iterate over all values |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |
* Iterator creation is O(1) due to im::HashMap’s structural sharing; full
iteration is O(n).
// Read the current state
if let Some(balance) = map.get(&"alice".into()) {
println!("Alice's balance: {balance}");
}
// Snapshot iteration — takes a structural clone, then iterates
for (key, value) in map.iter() {
println!("{key}: {value}");
}
Write operations
Only available on MapWriter<K, V>. All writes go through Raft consensus and
return the Version at which the mutation will be committed.
| Method | Description |
|---|---|
| insert(K, V) -> Result<Version, Error<(K, V)>> | Insert or update a key-value pair |
| compare_exchange(K, Option<V>, Option<V>) -> Result<Version, Error<(K, Option<V>, Option<V>)>> | Atomic compare-and-swap for a key |
| remove(&K) -> Result<Version, Error<K>> | Remove a key |
| extend(impl IntoIterator<Item = (K, V)>) -> Result<Version, Error<Vec<(K, V)>>> | Batch insert |
| clear() -> Result<Version, Error<()>> | Remove all entries |
// Insert a single entry
let v = map.insert("ETH".into(), 3812).await?;
// Batch insert
let v = map.extend([
("BTC".into(), 105_000),
("SOL".into(), 178),
]).await?;
// Wait for the batch to commit
map.when().reaches(v).await;
// Remove
map.remove(&"SOL".into()).await?;
// Atomic compare-and-swap: update only if current value matches
let v = map.compare_exchange(
"ETH".into(),
Some(3812), // expected current value
Some(3900), // new value
).await?;
map.when().reaches(v).await;
// Compare-and-swap to insert a new key (expected = None)
let v = map.compare_exchange(
"DOGE".into(),
None, // key must not exist
Some(42), // value to insert
).await?;
// Compare-and-swap to delete (new = None)
let v = map.compare_exchange(
"DOGE".into(),
Some(42), // expected current value
None, // remove the key
).await?;
// Clear everything
map.clear().await?;
Compare-and-swap semantics
compare_exchange atomically checks the value associated with a key and only
applies the mutation if it matches the expected parameter:
- key: The key to operate on.
- expected: The expected current value (None means the key must not exist).
- new: The value to write if the expectation holds (None removes the key).
| expected | new | Effect when matched |
|---|---|---|
| None | Some(v) | Insert a new key-value pair |
| Some(old) | Some(new) | Update an existing value |
| Some(old) | None | Remove the key |
| None | None | No-op (key must not exist) |
If the current value does not match expected, the operation is a no-op —
it still commits to the Raft log (incrementing the version) but does not
mutate the map.
Error handling
On failure, the values you attempted to write are returned inside the error so you can retry without re-creating them:
match map.insert("key".into(), expensive_value).await {
Ok(version) => println!("committed at {version}"),
Err(Error::Offline((key, value))) => {
// Node is temporarily offline — retry later
println!("offline, got {key} and {value} back");
}
Err(Error::NetworkDown) => {
// Permanent failure
}
}
Status & observation
// Wait until the map is online
map.when().online().await;
// Wait for a specific version
let v = map.insert("x".into(), 1).await?;
map.when().reaches(v).await;
// Wait for any new commit
map.when().updated().await;
// Detect going offline
map.when().offline().await;
Group identity
The group key for a Map<K, V> is derived from:
UniqueId::from("mosaik_collections_map")
.derive(store_id)
.derive(type_name::<K>())
.derive(type_name::<V>())
Two maps with the same StoreId but different key or value types will be in
completely separate consensus groups.
Vec
Vec<T> is a replicated, ordered, index-addressable sequence. Internally it
is backed by im::Vector<T>, which provides efficient push/pop at both ends
and O(log n) random access.
Construction
use mosaik::collections::{Vec, StoreId, SyncConfig};
// Writer — can read and write
let vec = Vec::<String>::writer(&network, StoreId::from("events"));
// Writer with custom sync config
let vec = Vec::<String>::writer_with_config(&network, store_id, config);
// Reader — read-only, deprioritized for leadership
let vec = Vec::<String>::reader(&network, store_id);
// Reader with custom sync config
let vec = Vec::<String>::reader_with_config(&network, store_id, config);
// Aliases: new() == writer(), new_with_config() == writer_with_config()
let vec = Vec::<String>::new(&network, store_id);
Read operations
Available on both writers and readers. Reads operate on the local committed state and never touch the network.
| Method | Time | Description |
|---|---|---|
| len() -> usize | O(1) | Number of elements |
| is_empty() -> bool | O(1) | Whether the vector is empty |
| get(u64) -> Option<T> | O(log n) | Get element at index |
| front() -> Option<T> | O(log n) | First element |
| head() -> Option<T> | O(log n) | Alias for front() |
| back() -> Option<T> | O(log n) | Last element |
| last() -> Option<T> | O(log n) | Alias for back() |
| contains(&T) -> bool | O(n) | Test if a value is in the vector |
| index_of(&T) -> Option<u64> | O(n) | Find the index of a value |
| iter() -> impl Iterator<Item = T> | O(1)* | Iterate over all elements |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |
* Iterator creation is O(1) due to structural sharing; full traversal is O(n).
// Random access
if let Some(event) = vec.get(0) {
println!("First event: {event}");
}
// Peek at ends
let newest = vec.back();
let oldest = vec.front();
// Linear search
if let Some(idx) = vec.index_of(&"restart".into()) {
println!("Found restart at index {idx}");
}
Write operations
Only available on VecWriter<T>. All writes go through Raft consensus.
| Method | Time | Description |
|---|---|---|
| push_back(T) -> Result<Version, Error<T>> | O(1)* | Append to end |
| push_front(T) -> Result<Version, Error<T>> | O(1)* | Prepend to start |
| insert(u64, T) -> Result<Version, Error<T>> | O(log n) | Insert at index, shifting elements right |
| extend(impl IntoIterator<Item = T>) -> Result<Version, Error<Vec<T>>> | O(k)* | Batch append |
| pop_back() -> Result<Version, Error<()>> | O(1)* | Remove last element |
| pop_front() -> Result<Version, Error<()>> | O(1)* | Remove first element |
| remove(u64) -> Result<Version, Error<u64>> | O(log n) | Remove at index, shifting elements left |
| swap(u64, u64) -> Result<Version, Error<()>> | O(log n) | Swap two elements |
| truncate(usize) -> Result<Version, Error<()>> | O(log n) | Keep only the first n elements |
| clear() -> Result<Version, Error<()>> | O(1) | Remove all elements |
* Amortized time complexity.
// Append
let v = vec.push_back("event-1".into()).await?;
// Prepend
vec.push_front("event-0".into()).await?;
// Batch append
let v = vec.extend(["a".into(), "b".into(), "c".into()]).await?;
// Random insert
vec.insert(2, "inserted".into()).await?;
// Remove from ends
vec.pop_back().await?;
vec.pop_front().await?;
// Remove at index
vec.remove(0).await?;
// Swap positions
vec.swap(0, 1).await?;
// Truncate and clear
vec.truncate(10).await?;
vec.clear().await?;
Error handling
Writes return the failed value on Error::Offline so you can retry:
match vec.push_back(item).await {
Ok(version) => {
vec.when().reaches(version).await;
}
Err(Error::Offline(item)) => {
// Retry with the same item later
}
Err(Error::NetworkDown) => {
// Permanent failure
}
}
Status & observation
// Wait until online
vec.when().online().await;
// Wait for a specific committed version
let v = vec.push_back("x".into()).await?;
vec.when().reaches(v).await;
// Wait for any update
vec.when().updated().await;
// React to going offline
vec.when().offline().await;
Group identity
The group key for a Vec<T> is derived from:
UniqueId::from("mosaik_collections_vec")
.derive(store_id)
.derive(type_name::<T>())
Two vectors with the same StoreId but different element types will be in
separate consensus groups.
Set
Set<T> is a replicated, unordered, eventually consistent set of unique
values. Internally it is backed by an im::HashSet with a deterministic
hasher (SipHash-1-3 with a fixed zero seed), so iteration order is identical
across all nodes.
Construction
use mosaik::collections::{Set, StoreId, SyncConfig};
// Writer — can read and write
let set = Set::<String>::writer(&network, StoreId::from("active-peers"));
// Writer with custom sync config
let set = Set::<String>::writer_with_config(&network, store_id, config);
// Reader — read-only, deprioritized for leadership
let set = Set::<String>::reader(&network, store_id);
// Reader with custom sync config
let set = Set::<String>::reader_with_config(&network, store_id, config);
// Aliases: new() == writer(), new_with_config() == writer_with_config()
let set = Set::<String>::new(&network, store_id);
Read operations
Available on both writers and readers.
| Method | Time | Description |
|---|---|---|
| len() -> usize | O(1) | Number of elements |
| is_empty() -> bool | O(1) | Whether the set is empty |
| contains(&T) -> bool | O(log n) | Test membership |
| is_subset(&Set<T, W>) -> bool | O(n) | Test subset relationship |
| iter() -> impl Iterator<Item = T> | O(1)* | Iterate over all elements |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |
* Iterator creation is O(1); full traversal is O(n).
// Membership test
if set.contains(&"node-42".into()) {
println!("node-42 is active");
}
// Subset check between two sets (can differ in writer/reader mode)
if allowed.is_subset(&active) {
println!("all allowed nodes are active");
}
// Iteration
for peer in set.iter() {
println!("active: {peer}");
}
Write operations
Only available on SetWriter<T>.
| Method | Description |
|---|---|
| insert(T) -> Result<Version, Error<T>> | Insert a value (no-op if already present) |
| extend(impl IntoIterator<Item = T>) -> Result<Version, Error<Vec<T>>> | Batch insert |
| remove(&T) -> Result<Version, Error<T>> | Remove a value |
| clear() -> Result<Version, Error<()>> | Remove all elements |
// Insert
let v = set.insert("node-1".into()).await?;
// Batch insert
let v = set.extend(["node-2".into(), "node-3".into()]).await?;
set.when().reaches(v).await;
// Remove
set.remove(&"node-1".into()).await?;
// Clear
set.clear().await?;
Error handling
Writes return the failed value on Error::Offline:
match set.insert(value).await {
Ok(version) => { /* committed */ }
Err(Error::Offline(value)) => {
// Retry later with the same value
}
Err(Error::NetworkDown) => {
// Permanent failure
}
}
Status & observation
set.when().online().await;
let v = set.insert("x".into()).await?;
set.when().reaches(v).await;
set.when().updated().await;
set.when().offline().await;
Group identity
UniqueId::from("mosaik_collections_set")
.derive(store_id)
.derive(type_name::<T>())
Register
Register<T> is a replicated single-value register. It holds at most one
value at a time — writing a new value replaces the previous one entirely. This
is the distributed equivalent of a tokio::sync::watch channel: all nodes
observe the latest value.
Internally the state is simply an Option<T>, making the Register the
simplest collection in the mosaik toolkit.
Construction
use mosaik::collections::{Register, StoreId, SyncConfig};
// Writer — can read and write
let reg = Register::<String>::writer(&network, StoreId::from("config"));
// Writer with custom sync config
let reg = Register::<String>::writer_with_config(&network, store_id, config);
// Reader — read-only, deprioritized for leadership
let reg = Register::<String>::reader(&network, store_id);
// Reader with custom sync config
let reg = Register::<String>::reader_with_config(&network, store_id, config);
// Aliases: new() == writer(), new_with_config() == writer_with_config()
let reg = Register::<String>::new(&network, store_id);
Read operations
Available on both writers and readers. Reads operate on the local committed state and never touch the network.
| Method | Time | Description |
|---|---|---|
| read() -> Option<T> | O(1) | Get the current value |
| get() -> Option<T> | O(1) | Alias for read() |
| is_empty() -> bool | O(1) | Whether the register is empty (holds no value) |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |
// Read the current value
if let Some(config) = reg.read() {
println!("Current config: {config}");
}
// Check if a value has been written
if reg.is_empty() {
println!("No configuration set yet");
}
Write operations
Only available on RegisterWriter<T>.
| Method | Time | Description |
|---|---|---|
| write(T) -> Result<Version, Error<T>> | O(1) | Write a value (replaces any existing value) |
| set(T) -> Result<Version, Error<T>> | O(1) | Alias for write() |
| compare_exchange(Option<T>, Option<T>) -> Result<Version, Error<(Option<T>, Option<T>)>> | O(1) | Atomic compare-and-swap |
| clear() -> Result<Version, Error<()>> | O(1) | Remove the stored value |
// Write a value
let v = reg.write("v1".into()).await?;
// Overwrite (replaces previous value)
let v = reg.write("v2".into()).await?;
reg.when().reaches(v).await;
// Using the alias
reg.set("v3".into()).await?;
// Atomic compare-and-swap: only writes if current value matches expected
let v = reg.compare_exchange(Some("v3".into()), Some("v4".into())).await?;
reg.when().reaches(v).await;
assert_eq!(reg.read(), Some("v4".to_string()));
// Compare-and-swap from empty to a value
reg.clear().await?;
let v = reg.compare_exchange(None, Some("first".into())).await?;
reg.when().reaches(v).await;
// Compare-and-swap to remove (set to None)
let v = reg.compare_exchange(Some("first".into()), None).await?;
reg.when().reaches(v).await;
assert!(reg.is_empty());
// Clear back to empty
reg.clear().await?;
assert!(reg.is_empty());
Compare-and-swap semantics
compare_exchange atomically checks the current value of the register and
only applies the write if it matches the current parameter. This is useful
for optimistic concurrency control — multiple writers can attempt a swap, and
only the one whose expectation matches the actual state will succeed.
- current: The expected current value (None means the register must be empty).
- new: The value to write if the expectation holds (None clears the register).
If the current value does not match current, the operation is a no-op —
it still commits to the Raft log (incrementing the version) but does not
change the stored value.
Error handling
Writes return the failed value on Error::Offline so you can retry:
match reg.write(value).await {
Ok(version) => {
reg.when().reaches(version).await;
}
Err(Error::Offline(value)) => {
// Retry with the same value later
}
Err(Error::NetworkDown) => {
// Permanent failure
}
}
Status & observation
// Wait until online
reg.when().online().await;
// Wait for a specific committed version
let v = reg.write("new-config".into()).await?;
reg.when().reaches(v).await;
// Wait for any update
reg.when().updated().await;
// React to going offline
reg.when().offline().await;
Group identity
The group key for a Register<T> is derived from:
UniqueId::from("mosaik_collections_register")
.derive(store_id)
.derive(type_name::<T>())
Two registers with the same StoreId but different value types will be in
separate consensus groups.
When to use Register
Register is ideal for:
- Configuration — a single shared config object replicated across the cluster
- Leader state — the current leader’s address or identity
- Latest snapshot — the most recent version of a computed result
- Feature flags — a single boolean or enum toggled cluster-wide
If you need to store multiple values, use Map, Vec,
or Set instead.
Once
Once<T> is a replicated write-once register. It holds at most one value —
once a value has been set, all subsequent writes are silently ignored by the
state machine. This is the distributed equivalent of a
tokio::sync::OnceCell.
Unlike Register<T>, which allows overwriting the stored value,
Once<T> guarantees that the first accepted write is permanent. This makes it
ideal for one-time initialization patterns where you need exactly one value to
win across a distributed cluster.
Internally the state is an Option<T>, identical to Register, but the
apply logic rejects writes when the value is already Some.
Construction
use mosaik::collections::{Once, StoreId, SyncConfig};
// Writer — can read and write
let once = Once::<String>::writer(&network, StoreId::from("config-seed"));
// Writer with custom sync config
let once = Once::<String>::writer_with_config(&network, store_id, config);
// Reader — read-only, deprioritized for leadership
let once = Once::<String>::reader(&network, store_id);
// Reader with custom sync config
let once = Once::<String>::reader_with_config(&network, store_id, config);
// Aliases: new() == writer(), new_with_config() == writer_with_config()
let once = Once::<String>::new(&network, store_id);
Read operations
Available on both writers and readers. Reads operate on the local committed state and never touch the network.
| Method | Time | Description |
|---|---|---|
| read() -> Option<T> | O(1) | Get the current value |
| get() -> Option<T> | O(1) | Alias for read() |
| is_empty() -> bool | O(1) | Whether the register is still unset |
| is_none() -> bool | O(1) | Alias for is_empty() |
| is_some() -> bool | O(1) | Whether the register holds a value |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |
// Read the current value
if let Some(seed) = once.read() {
println!("Config seed: {seed}");
}
// Check if a value has been written
if once.is_none() {
println!("No seed set yet");
}
// Equivalent check
assert_eq!(once.is_empty(), once.is_none());
assert_eq!(once.is_some(), !once.is_empty());
Write operations
Only available on OnceWriter<T>.
| Method | Time | Description |
|---|---|---|
| write(T) -> Result<Version, Error<T>> | O(1) | Set the value (ignored if already set) |
| set(T) -> Result<Version, Error<T>> | O(1) | Alias for write() |
The key behavior: only the first write takes effect. If a value has already
been set, subsequent writes are silently ignored by the state machine. The
returned Version still advances (the command is committed to the Raft log),
but the stored value does not change.
// First write — this value is stored permanently
let v = once.write("genesis".into()).await?;
once.when().reaches(v).await;
assert_eq!(once.read(), Some("genesis".into()));
// Second write — silently ignored
let v2 = once.write("overwrite-attempt".into()).await?;
once.when().reaches(v2).await;
assert_eq!(once.read(), Some("genesis".into())); // still "genesis"
// Using the alias
once.set("another-attempt".into()).await?;
assert_eq!(once.read(), Some("genesis".into())); // unchanged
Error handling
Writes return the failed value on Error::Offline so you can retry:
match once.write(value).await {
Ok(version) => {
once.when().reaches(version).await;
}
Err(Error::Offline(value)) => {
// Retry with the same value later
}
Err(Error::Encoding(value, err)) => {
// Serialization failed
}
Err(Error::NetworkDown) => {
// Permanent failure
}
}
Status & observation
// Wait until online
once.when().online().await;
// Wait for a specific committed version
let v = once.write("init".into()).await?;
once.when().reaches(v).await;
// Wait for any update
once.when().updated().await;
// React to going offline
once.when().offline().await;
Group identity
The group key for a Once<T> is derived from:
UniqueId::from("mosaik_collections_once")
.derive(store_id)
.derive(type_name::<T>())
A Once<T> and a Register<T> with the same StoreId and type parameter
will be in separate consensus groups because the prefix differs
("mosaik_collections_once" vs "mosaik_collections_register").
Once vs Register
| Behavior | Once<T> | Register<T> |
|---|---|---|
| Write semantics | First write wins; subsequent ignored | Every write replaces the stored value |
| `clear()` | Not supported | Supported |
| `compare_exchange` | Not supported | Supported |
| Distributed analog | `tokio::sync::OnceCell` | `tokio::sync::watch` |
| State machine sig | `"mosaik_collections_once"` | `"mosaik_collections_register"` |
When to use Once
Once is ideal for:
- One-time initialization — a genesis config, cluster seed, or bootstrap value that should only be set once
- Distributed elections — the first node to write a value claims a slot
- Immutable assignments — assigning a resource or identity permanently
- Configuration anchors — values that must not change after initial deployment
If you need to update the value after setting it, use
Register instead.
PriorityQueue
PriorityQueue<P, K, V> is a replicated, eventually consistent double-ended
priority queue (DEPQ). Each entry has a priority (P), a unique key
(K), and a value (V). It supports efficient access to both the minimum
and maximum priority elements, key-based lookups, priority updates, and range
removals.
Internally it maintains two indexes:
- `by_key: im::HashMap<K, (P, V)>` — O(log n) key lookups
- `by_priority: im::OrdMap<P, im::HashMap<K, V>>` — O(log n) min/max and range operations
Both indexes use deterministic hashers for consistent iteration order across nodes.
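A std-only model of the two indexes (using `HashMap`/`BTreeMap` in place of the persistent `im` maps; names are illustrative) shows how inserts keep them in lockstep and how min access works:

```rust
use std::collections::{BTreeMap, HashMap};

/// Illustrative model of the DEPQ's dual indexes.
struct Depq<P: Ord + Copy, K: Eq + std::hash::Hash + Clone, V: Clone> {
    by_key: HashMap<K, (P, V)>,
    by_priority: BTreeMap<P, HashMap<K, V>>,
}

impl<P: Ord + Copy, K: Eq + std::hash::Hash + Clone, V: Clone> Depq<P, K, V> {
    fn new() -> Self {
        Self { by_key: HashMap::new(), by_priority: BTreeMap::new() }
    }

    /// Insert or update: both indexes are updated together.
    fn insert(&mut self, p: P, k: K, v: V) {
        if let Some((old_p, _)) = self.by_key.insert(k.clone(), (p, v.clone())) {
            // Key existed: remove it from its old priority bucket.
            if let Some(bucket) = self.by_priority.get_mut(&old_p) {
                bucket.remove(&k);
                if bucket.is_empty() {
                    self.by_priority.remove(&old_p);
                }
            }
        }
        self.by_priority.entry(p).or_default().insert(k, v);
    }

    /// Entry with the lowest priority (arbitrary within the bucket).
    fn get_min(&self) -> Option<(P, &K, &V)> {
        let (p, bucket) = self.by_priority.iter().next()?;
        let (k, v) = bucket.iter().next()?;
        Some((*p, k, v))
    }
}

fn main() {
    let mut q = Depq::new();
    q.insert(200, "order-2", "sell");
    q.insert(100, "order-1", "buy");
    let (p, k, _) = q.get_min().unwrap();
    assert_eq!((p, *k), (100, "order-1"));
    q.insert(50, "order-2", "sell"); // update moves it between buckets
    assert_eq!(q.get_min().unwrap().0, 50);
}
```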
Construction
use mosaik::collections::{PriorityQueue, StoreId, SyncConfig};
// Writer — can read and write
let pq = PriorityQueue::<u64, String, Order>::writer(
&network,
StoreId::from("orderbook"),
);
// Writer with custom sync config
let pq = PriorityQueue::<u64, String, Order>::writer_with_config(
&network, store_id, config,
);
// Reader — read-only, deprioritized for leadership
let pq = PriorityQueue::<u64, String, Order>::reader(&network, store_id);
// Reader with custom sync config
let pq = PriorityQueue::<u64, String, Order>::reader_with_config(
&network, store_id, config,
);
// Aliases
let pq = PriorityQueue::<u64, String, Order>::new(&network, store_id);
let pq = PriorityQueue::<u64, String, Order>::new_with_config(
&network, store_id, config,
);
Read operations
Available on both writers and readers.
| Method | Time | Description |
|---|---|---|
| `len() -> usize` | O(1) | Number of entries |
| `is_empty() -> bool` | O(1) | Whether the queue is empty |
| `contains_key(&K) -> bool` | O(log n) | Test if a key exists |
| `get(&K) -> Option<V>` | O(log n) | Get the value for a key |
| `get_priority(&K) -> Option<P>` | O(log n) | Get the priority for a key |
| `get_min() -> Option<(P, K, V)>` | O(log n) | Entry with the lowest priority |
| `get_max() -> Option<(P, K, V)>` | O(log n) | Entry with the highest priority |
| `min_priority() -> Option<P>` | O(log n) | Lowest priority value |
| `max_priority() -> Option<P>` | O(log n) | Highest priority value |
| `iter() -> impl Iterator<Item = (P, K, V)>` | — | Ascending priority order (alias for `iter_asc`) |
| `iter_asc() -> impl Iterator<Item = (P, K, V)>` | — | Ascending priority order |
| `iter_desc() -> impl Iterator<Item = (P, K, V)>` | — | Descending priority order |
| `version() -> Version` | O(1) | Current committed state version |
| `when() -> &When` | O(1) | Access the state observer |
When multiple entries share the same priority, get_min() and get_max()
return an arbitrary entry from that priority bucket.
// Peek at extremes
if let Some((priority, key, value)) = pq.get_min() {
println!("Best bid: {key} at priority {priority}");
}
// Look up by key
let price = pq.get_priority(&"order-42".into());
// Iterate in order
for (priority, key, value) in pq.iter_desc() {
println!("{priority}: {key} = {value:?}");
}
Write operations
Only available on PriorityQueueWriter<P, K, V>.
| Method | Description |
|---|---|
| `insert(P, K, V) -> Result<Version, Error<(P, K, V)>>` | Insert or update an entry |
| `extend(impl IntoIterator<Item = (P, K, V)>) -> Result<Version, Error<Vec<(P, K, V)>>>` | Batch insert |
| `update_priority(&K, P) -> Result<Version, Error<K>>` | Change priority of an existing key |
| `update_value(&K, V) -> Result<Version, Error<K>>` | Change value of an existing key |
| `compare_exchange_value(&K, V, Option<V>) -> Result<Version, Error<K>>` | Atomic compare-and-swap on value |
| `remove(&K) -> Result<Version, Error<K>>` | Remove by key |
| `remove_range(impl RangeBounds<P>) -> Result<Version, Error<()>>` | Remove all entries in a priority range |
| `clear() -> Result<Version, Error<()>>` | Remove all entries |
If insert is called with a key that already exists, both its priority and
value are updated. update_priority and update_value are no-ops if the key
doesn’t exist (they still commit to the log).
// Insert
let v = pq.insert(100, "order-1".into(), order).await?;
// Batch insert
let v = pq.extend([
(100, "order-1".into(), order1),
(200, "order-2".into(), order2),
]).await?;
// Update just the priority
pq.update_priority(&"order-1".into(), 150).await?;
// Update just the value
pq.update_value(&"order-1".into(), new_order).await?;
// Atomic compare-and-swap on value (priority is preserved)
let v = pq.compare_exchange_value(
&"order-1".into(),
order1, // expected current value
Some(updated), // new value
).await?;
// Compare-and-swap to remove: expected matches, new is None
let v = pq.compare_exchange_value(
&"order-1".into(),
updated, // expected current value
None, // removes the entry
).await?;
// Remove a single entry
pq.remove(&"order-2".into()).await?;
// Remove all entries with priority below 50
pq.remove_range(..50u64).await?;
// Remove entries in a range
pq.remove_range(10..=20).await?;
// Clear everything
pq.clear().await?;
Range syntax
remove_range accepts any RangeBounds<P>, so all standard Rust range
syntaxes work:
| Syntax | Meaning |
|---|---|
| `..cutoff` | Priorities below `cutoff` |
| `..=cutoff` | Priorities at or below `cutoff` |
| `cutoff..` | Priorities at or above `cutoff` |
| `lo..hi` | Priorities in `[lo, hi)` |
| `lo..=hi` | Priorities in `[lo, hi]` |
| `..` | All (equivalent to `clear()`) |
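As a sketch of how such bounds can drive a removal, the std `RangeBounds::contains` check is enough. This is a local model over a plain `BTreeMap`, not the replicated implementation:

```rust
use std::collections::BTreeMap;
use std::ops::RangeBounds;

/// Std-only model of remove_range: drop every priority bucket whose
/// priority falls inside the given bounds.
fn remove_range<K, R: RangeBounds<u64>>(
    by_priority: &mut BTreeMap<u64, Vec<K>>,
    range: R,
) {
    by_priority.retain(|p, _| !range.contains(p));
}

fn main() {
    let mut q: BTreeMap<u64, Vec<&str>> = BTreeMap::new();
    q.insert(10, vec!["a"]);
    q.insert(20, vec!["b"]);
    q.insert(30, vec!["c"]);
    remove_range(&mut q, ..=20u64); // priorities at or below 20
    assert_eq!(q.keys().copied().collect::<Vec<_>>(), vec![30]);
}
```

Because `remove_range` takes `impl RangeBounds<P>`, all the syntaxes in the table above compile against the same signature.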
Compare-and-swap semantics
compare_exchange_value atomically checks the value of an existing entry and
only applies the mutation if it matches the expected parameter. Unlike
compare_exchange on Map and Register, this method operates only on the
value — the entry’s priority is always preserved.
- `key`: The key of the entry to operate on (must already exist).
- `expected`: The expected current value (type `V`, not `Option<V>` — the key must exist for the exchange to succeed).
- `new`: The replacement value. `Some(v)` updates the value in-place; `None` removes the entire entry.
If the key does not exist or its current value does not match expected, the
operation is a no-op — it commits to the Raft log but does not change the
queue.
Note: The entry’s priority is never changed by
`compare_exchange_value`. To atomically update priorities, use `update_priority` instead.
Error handling
Same pattern as other collections — failed values are returned for retry:
match pq.insert(priority, key, value).await {
Ok(version) => { /* committed */ }
Err(Error::Offline((priority, key, value))) => {
// Retry later
}
Err(Error::NetworkDown) => {
// Permanent failure
}
}
Status & observation
pq.when().online().await;
let v = pq.insert(100, "k".into(), val).await?;
pq.when().reaches(v).await;
pq.when().updated().await;
pq.when().offline().await;
Dual-index architecture
The DEPQ maintains two synchronized indexes:
by_key: HashMap<K, (P, V)> ← key lookups, membership tests
by_priority: OrdMap<P, HashMap<K, V>> ← min/max, range ops, ordered iteration
When a key is inserted or updated, both indexes are updated atomically within
the state machine’s apply_batch. During snapshot sync, only the by_key
index is serialized; the by_priority index is reconstructed on the receiving
side during append.
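The reconstruction step can be sketched with std maps standing in for the `im` types:

```rust
use std::collections::{BTreeMap, HashMap};

/// Rebuild the ordered by_priority index from the serialized by_key
/// index alone, modeling what the receiving side does during append.
fn rebuild_by_priority<P, K, V>(
    by_key: &HashMap<K, (P, V)>,
) -> BTreeMap<P, HashMap<K, V>>
where
    P: Ord + Copy,
    K: Clone + Eq + std::hash::Hash,
    V: Clone,
{
    let mut by_priority: BTreeMap<P, HashMap<K, V>> = BTreeMap::new();
    for (k, (p, v)) in by_key {
        by_priority.entry(*p).or_default().insert(k.clone(), v.clone());
    }
    by_priority
}

fn main() {
    let mut by_key = HashMap::new();
    by_key.insert("a", (10u64, "x"));
    by_key.insert("b", (5u64, "y"));
    by_key.insert("c", (10u64, "z"));
    let by_priority = rebuild_by_priority(&by_key);
    // Priorities come back in order, with same-priority keys bucketed.
    assert_eq!(by_priority.keys().copied().collect::<Vec<_>>(), vec![5, 10]);
    assert_eq!(by_priority[&10].len(), 2);
}
```

Serializing only `by_key` keeps snapshots half the size; the ordered index is derivable, so shipping it would be redundant.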
Group identity
UniqueId::from("mosaik_collections_depq")
.derive(store_id)
.derive(type_name::<P>())
.derive(type_name::<K>())
.derive(type_name::<V>())
Writer/Reader Pattern
All mosaik collections share a common access-control pattern: every collection
type is parameterized by a const-generic boolean (IS_WRITER) that determines
whether the instance has write access.
┌──────────────────┐
│ Collection<T> │
│ const IS_WRITER │
└────────┬─────────┘
│
┌───────────┴───────────┐
│ │
IS_WRITER = true IS_WRITER = false
┌──────────────┐ ┌──────────────┐
│ Writer │ │ Reader │
│ read + write │ │ read-only │
│ normal leader│ │ deprioritized│
│ priority │ │ leader │
└──────────────┘ └──────────────┘
Type aliases
Each collection provides convenient type aliases:
| Collection | Writer type | Reader type |
|---|---|---|
| `Map<K, V>` | `MapWriter<K, V>` | `MapReader<K, V>` |
| `Vec<T>` | `VecWriter<T>` | `VecReader<T>` |
| `Set<T>` | `SetWriter<T>` | `SetReader<T>` |
| `Register<T>` | `RegisterWriter<T>` | `RegisterReader<T>` |
| `Once<T>` | `OnceWriter<T>` | `OnceReader<T>` |
| `PriorityQueue<P, K, V>` | `PriorityQueueWriter<P, K, V>` | `PriorityQueueReader<P, K, V>` |
Construction
Every collection offers the same set of constructors:
// Writer (default)
Collection::writer(&network, store_id)
Collection::writer_with_config(&network, store_id, sync_config)
// Convenience aliases for writer
Collection::new(&network, store_id)
Collection::new_with_config(&network, store_id, sync_config)
// Reader
Collection::reader(&network, store_id)
Collection::reader_with_config(&network, store_id, sync_config)
The StoreId and SyncConfig must match between writers and readers for them
to join the same consensus group.
How it works
Internally, the const-generic boolean controls two things:
1. **Method availability** — Write methods (`insert`, `push_back`, `remove`, etc.) are only implemented for `IS_WRITER = true`. This is enforced at compile time.
2. **Leadership priority** — Readers return a `ConsensusConfig` with `deprioritize_leadership()`, which increases their election timeout. This makes it less likely for a reader to become the Raft leader, keeping leadership on writer nodes where write operations are handled directly rather than being forwarded.
// Inside every collection's StateMachine impl:
fn consensus_config(&self) -> Option<ConsensusConfig> {
(!self.is_writer)
.then(|| ConsensusConfig::default().deprioritize_leadership())
}
Shared read API
Both writers and readers have identical read access. The read methods are
implemented on Collection<T, IS_WRITER> without constraining IS_WRITER:
// Works on both MapWriter and MapReader
impl<K: Key, V: Value, const IS_WRITER: bool> Map<K, V, IS_WRITER> {
pub fn len(&self) -> usize { ... }
pub fn get(&self, key: &K) -> Option<V> { ... }
pub fn iter(&self) -> impl Iterator<Item = (K, V)> { ... }
pub fn when(&self) -> &When { ... }
pub fn version(&self) -> Version { ... }
}
Trait requirements
The type parameters for collection elements must satisfy blanket-implemented trait bounds:
Value
Required for all element and value types:
pub trait Value:
Clone + Debug + Serialize + DeserializeOwned
+ Hash + PartialEq + Eq + Send + Sync + 'static
{}
// Blanket impl — any conforming type is a Value
impl<T> Value for T where T: Clone + Debug + Serialize + ... {}
Key
Required for map keys, set elements, and priority queue keys:
pub trait Key:
Clone + Serialize + DeserializeOwned
+ Hash + PartialEq + Eq + Send + Sync + 'static
{}
Note that Key does not require Debug (unlike Value).
OrderedKey
Required for priority queue priorities:
pub trait OrderedKey: Key + Ord {}
impl<T: Key + Ord> OrderedKey for T {}
Version
All write operations return Version, which wraps the Raft log Index where
the mutation will be committed. Use it with the When API to synchronize:
let version = map.insert("key".into(), value).await?;
map.when().reaches(version).await;
// Now the insert is guaranteed to be committed
Version implements Deref<Target = Index>, PartialOrd, Ord, Display,
and Copy.
When (collections)
The collections When is a thin wrapper around the groups When that exposes
collection-relevant observers:
| Method | Description |
|---|---|
| `online()` | Resolves when the collection has joined and synced with the group |
| `offline()` | Resolves when the collection loses sync or leadership |
| `updated()` | Resolves when any new state version is committed |
| `reaches(Version)` | Resolves when committed state reaches at least the given version |
// Typical lifecycle pattern
loop {
collection.when().online().await;
println!("online, version = {}", collection.version());
// ... do work ...
collection.when().offline().await;
println!("went offline, waiting to reconnect...");
}
Multiple writers
Multiple nodes can be writers for the same collection simultaneously. All writes are funneled through Raft consensus, so there is no conflict — every write is serialized in the log and applied in the same order on all nodes.
// Node A
let map = Map::<String, u64>::writer(&network, store_id);
map.insert("from-a".into(), 1).await?;
// Node B (same StoreId)
let map = Map::<String, u64>::writer(&network, store_id);
map.insert("from-b".into(), 2).await?;
// Both nodes see both entries after sync
Choosing writer vs. reader
| Use a Writer when… | Use a Reader when… |
|---|---|
| The node needs to modify the collection | The node only observes state |
| You want normal leadership election priority | You want to reduce leadership overhead |
| The node is in the “hot path” for writes | The node is a monitoring/dashboard node |
Raft Consensus
Mosaik’s groups subsystem implements a modified Raft consensus algorithm optimized for dynamic, self-organizing peer sets. This chapter covers the differences from standard Raft and explains the internal implementation.
Standard Raft recap
Raft organizes a cluster into a single leader and multiple followers. The leader accepts client commands, appends them to a replicated log, and only commits entries once a quorum (majority) of nodes has acknowledged them. If the leader fails, an election promotes a new one.
Mosaik’s modifications
1. Non-voting followers (Abstention)
In standard Raft, every follower participates in elections and log replication quorum counts. In mosaik, a follower can abstain from voting:
enum Vote {
Granted, // Standard yes vote
Denied, // Standard no vote
Abstained, // Mosaik-specific: "I'm too far behind to vote"
}
A follower abstains when it detects that it is lagging behind the leader’s log and cannot verify log consistency. Abstaining removes the node from the quorum denominator until it catches up. This prevents stale nodes from blocking progress while still allowing them to receive new entries and rejoin the quorum later.
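A hedged sketch of that vote decision follows; the function and parameter names are invented, and the real check lives in the follower's vote handling:

```rust
/// Vote outcomes as in the enum above (derives added for the demo).
#[derive(Debug, PartialEq)]
enum Vote {
    Granted,
    Denied,
    Abstained,
}

/// Decide how to vote. `known_commit_index` is the highest commit
/// index this node has observed; if its own log is behind that, it
/// cannot verify log consistency and abstains rather than voting.
fn decide_vote(
    our_last_index: u64,
    known_commit_index: u64,
    candidate_last_index: u64,
) -> Vote {
    if our_last_index < known_commit_index {
        Vote::Abstained // too far behind to judge the candidate's log
    } else if candidate_last_index >= our_last_index {
        Vote::Granted // candidate's log is at least as complete as ours
    } else {
        Vote::Denied
    }
}

fn main() {
    assert_eq!(decide_vote(5, 10, 12), Vote::Abstained);
    assert_eq!(decide_vote(10, 10, 12), Vote::Granted);
    assert_eq!(decide_vote(10, 10, 8), Vote::Denied);
}
```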
2. No per-follower tracking on the leader
Standard Raft leaders maintain nextIndex[] and matchIndex[] arrays to
track each follower’s log position. Mosaik’s leader does not maintain
per-follower state. Instead:
- Each `AppendEntriesResponse` includes the follower's `last_log_index`.
- The leader uses these responses to calculate commit progress dynamically.
- This simplifies the leader and avoids stale state when group membership changes frequently.
3. Dynamic quorum
Because nodes can abstain, the quorum denominator changes at runtime:
effective_quorum = (voting_nodes / 2) + 1
Where voting_nodes = total_bonded_peers - abstaining_peers. This allows
the cluster to make progress even when some nodes are syncing or offline,
as long as a majority of the voting members agree.
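In code, the calculation is a one-liner:

```rust
/// Dynamic quorum: abstaining peers drop out of the denominator
/// until they catch up and resume voting.
fn effective_quorum(total_bonded_peers: u32, abstaining_peers: u32) -> u32 {
    let voting_nodes = total_bonded_peers - abstaining_peers;
    voting_nodes / 2 + 1
}

fn main() {
    assert_eq!(effective_quorum(5, 0), 3); // classic majority of 5
    assert_eq!(effective_quorum(5, 2), 2); // two nodes syncing: 3 voters, quorum 2
}
```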
4. Distributed catch-up (state sync)
When a follower falls too far behind to replay individual log entries, mosaik uses a state sync mechanism rather than the leader shipping log snapshots:
- The follower sends a `RequestSnapshot` to the leader.
- The leader wraps it as a command and replicates it through the log.
- All peers create a snapshot at the committed position of that command.
- The follower fetches snapshot data in batches from multiple peers in parallel, distributing the load.
- Once complete, the follower installs the snapshot and replays any buffered commands.
This is fundamentally different from standard Raft’s approach where only the leader sends snapshots.
5. Leadership deprioritization
Nodes can configure longer election timeouts to reduce the probability of becoming leader:
ConsensusConfig::default().deprioritize_leadership()
This is used by collection readers, which prefer to leave leadership to writer nodes.
6. Bootstrap delay
The first term (Term::zero()) adds an extra bootstrap_delay (default 3s)
to the election timeout. This gives all nodes time to start, discover each
other, and form bonds before the first election fires.
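The combined timeout can be sketched as follows; parameter names are illustrative, and `jitter_fraction` stands in for the RNG draw:

```rust
use std::time::Duration;

/// Sketch of the effective election timeout: base + random jitter,
/// plus bootstrap_delay only for the very first term (term 0).
fn election_timeout(
    base: Duration,
    max_jitter: Duration,
    bootstrap_delay: Duration,
    term: u64,
    jitter_fraction: f64, // stand-in for a draw in [0, 1]
) -> Duration {
    let jitter = max_jitter.mul_f64(jitter_fraction);
    let bootstrap = if term == 0 { bootstrap_delay } else { Duration::ZERO };
    base + jitter + bootstrap
}

fn main() {
    let base = Duration::from_secs(2);
    let jitter = Duration::from_millis(500);
    let boot = Duration::from_secs(3);
    // First election (term 0) waits the extra bootstrap delay.
    assert_eq!(
        election_timeout(base, jitter, boot, 0, 1.0),
        Duration::from_millis(5500)
    );
    // Later elections use only base + jitter.
    assert_eq!(
        election_timeout(base, jitter, boot, 3, 0.0),
        Duration::from_secs(2)
    );
}
```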
Roles and state transitions
bootstrap_delay
│
▼
┌────────────────┐ election timeout
│ Follower │─────────────────────┐
│ (passive) │ │
└────────┬───────┘ │
│ AppendEntries │
│ from leader ▼
│ ┌──────────────┐
│ │ Candidate │
│ │ (requesting │
│ │ votes) │
│ └──────┬───────┘
│ │ majority
│ │ granted
│ ▼
│ ┌──────────────┐
└────────────────────│ Leader │
higher term │ (active, │
received │ heartbeats) │
└──────────────┘
Each role has specific responsibilities:
| Role | Key actions |
|---|---|
| Follower | Respond to AppendEntries, vote in elections, forward commands to leader, detect leader failure via election timeout |
| Candidate | Increment term, vote for self, send RequestVote to all peers, transition to Leader on majority or back to Follower on higher term |
| Leader | Accept client commands, replicate log entries, send heartbeats, calculate dynamic quorum, commit entries, respond to forwarded queries |
Message types
| Message | Direction | Purpose |
|---|---|---|
| `AppendEntries` | Leader → Followers | Replicate log entries / heartbeat |
| `AppendEntriesResponse` | Follower → Leader | Acknowledge entries, report last log index, grant/deny/abstain |
| `RequestVote` | Candidate → All | Request vote for election |
| `RequestVoteResponse` | All → Candidate | Grant, deny, or abstain |
| `Forward::Command` | Follower → Leader | Forward client commands |
| `Forward::CommandAck` | Leader → Follower | Return assigned log indices |
| `Forward::Query` | Follower → Leader | Forward strong-consistency query |
| `Forward::QueryResponse` | Leader → Follower | Return query result and position |
| `StateSync(...)` | Peer ↔ Peer | State sync protocol messages |
Election timing
Elections are controlled by ConsensusConfig:
| Parameter | Default | Purpose |
|---|---|---|
| `heartbeat_interval` | 500ms | How often the leader sends heartbeats |
| `heartbeat_jitter` | 150ms | Random jitter subtracted from the heartbeat interval |
| `election_timeout` | 2s | Base timeout before a follower starts an election |
| `election_timeout_jitter` | 500ms | Random jitter added to the election timeout |
| `bootstrap_delay` | 3s | Extra delay for the very first election (term 0) |
| `max_missed_heartbeats` | 10 | Bond heartbeats missed before considering a peer dead |
The randomized timeouts ensure that in most cases only one node transitions to candidate at a time, avoiding split votes.
Command flow
Write path (leader)
Client ──execute()──► Leader
│
├─ append to local log
├─ send AppendEntries to followers
│
│◄── AppendEntriesResponse (majority)
│
├─ advance commit index
├─ apply to state machine
└─ return Result to client
Write path (follower)
Client ──execute()──► Follower
│
├─ Forward::Command to leader
│
│◄── Forward::CommandAck (assigned index)
│
│ ... wait for local commit to reach index ...
│
└─ return Result to client
Read path
- Weak consistency: Read directly from local state machine (any role).
- Strong consistency: Forward query to leader, which reads from its always-up-to-date state machine and returns the result with commit position.
Internal types
The implementation is split across several modules:
| Module | Contents |
|---|---|
| `raft/mod.rs` | `Raft<S, M>` — top-level driver, delegates to the current role |
| `raft/role.rs` | `Role` enum (Follower, Candidate, Leader), shared message handling |
| `raft/shared.rs` | `Shared<S, M>` — state shared across all roles (storage, state machine, config) |
| `raft/leader.rs` | Leader-specific logic: heartbeats, replication, dynamic quorum |
| `raft/follower.rs` | Follower-specific logic: elections, forwarding, catch-up |
| `raft/candidate.rs` | Candidate-specific logic: vote collection, timeout |
| `raft/protocol.rs` | Message type definitions |
Bonds
Bonds are mosaik’s mechanism for maintaining a fully-connected mesh of persistent, authenticated connections between all members of a group. Every pair of peers in a group maintains exactly one bond.
Lifecycle
A bond goes through three phases:
1. Establishment
When two peers discover they belong to the same group (via discovery events),
one initiates a connection using the /mosaik/groups/1 ALPN protocol.
The handshake uses mutual proof-of-knowledge to verify both sides know the group key before exchanging any group-internal state:
Initiator Acceptor
│ │
├── HandshakeStart ───────────────────►│
│ · network_id │
│ · group_id │
│ · proof = blake3(session_secret │
│ ⊕ group_key) │
│ · known bonds list │
│ │
│ verify proof │
│ │
│◄─── HandshakeEnd ───────────────────┤
│ · proof (acceptor's) │
│ · known bonds list │
│ │
│ verify proof │
│ │
▼ Bond established ▼
The proof is computed from the TLS session secret (unique per connection) combined with the group key. This ensures:
- Both sides know the group key (authentication).
- The proof cannot be replayed on a different connection (freshness).
- No group secrets are transmitted in the clear.
If proof verification fails, the connection is dropped immediately.
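The shape of the proof check can be sketched as follows. Here `DefaultHasher` stands in for blake3 purely for illustration; it is not cryptographically secure, and the real inputs are the TLS exporter secret and group key bytes:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Model of the proof-of-knowledge value. NOT secure: DefaultHasher
/// is a stand-in for blake3 so the sketch stays std-only.
fn proof(session_secret: &[u8; 32], group_key: &[u8; 32]) -> u64 {
    // XOR binds the proof to this specific TLS session.
    let mixed: Vec<u8> = session_secret
        .iter()
        .zip(group_key)
        .map(|(a, b)| a ^ b)
        .collect();
    let mut h = DefaultHasher::new();
    mixed.hash(&mut h);
    h.finish()
}

fn main() {
    let session = [7u8; 32];
    let key = [42u8; 32];
    // Both sides derive the same proof from the shared inputs...
    assert_eq!(proof(&session, &key), proof(&session, &key));
    // ...but a different session secret yields a different proof, so
    // a captured proof cannot be replayed on another connection.
    assert_ne!(proof(&session, &key), proof(&[8u8; 32], &key));
}
```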
2. Steady state (BondWorker)
Once established, a bond is managed by a BondWorker that:
- Sends and receives heartbeats (Ping/Pong) on a configurable interval.
- Relays Raft messages between peers.
- Propagates peer entry updates when discovery information changes.
- Announces new bonds (`BondFormed`) so peers learn about topology changes.
- Handles graceful departure when a peer is shutting down.
3. Failure and reconnection
If heartbeats are missed beyond the configured threshold, the bond is considered failed and torn down. Both sides independently detect the failure. When the peer reappears (via discovery), a new bond is established from scratch with a fresh handshake.
BondMessage protocol
After the handshake, all communication over a bond uses the BondMessage
enum:
| Variant | Direction | Purpose |
|---|---|---|
| `Ping` | A → B | Heartbeat probe |
| `Pong` | B → A | Heartbeat response |
| `Departure` | Either | Graceful shutdown notification |
| `PeerEntryUpdate(PeerEntry)` | Either | Discovery information changed |
| `BondFormed(BondId, PeerId)` | Either | Announce a new bond to populate topology |
| `Raft(Bytes)` | Either | Wrapped Raft protocol message |
All messages are serialized with postcard and framed with
LengthDelimitedCodec over a QUIC bidirectional stream (via Link<P>).
Heartbeat mechanism
The Heartbeat struct tracks liveness:
struct Heartbeat {
tick: Interval, // fires every `base - jitter` to `base`
last_recv: Instant, // last time we received a Ping or Pong
missed: u64, // how many consecutive ticks without a response
max_missed: u64, // threshold from ConsensusConfig (default: 10)
alert: Notify, // signals when threshold is exceeded
}
On each tick:
- Check if `last_recv` was recent enough.
- If not, increment `missed`.
- If `missed ≥ max_missed`, notify the alert future — the bond worker tears down the bond.
Receiving any message from the peer resets the counter:
heartbeat.reset(); // sets missed = 0 and refreshes last_recv
Timing uses jitter to prevent synchronized heartbeat storms. The actual
interval for each tick is randomized between base - jitter and base.
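The tick/reset logic can be modeled without any async machinery. `Liveness` below is a simplified stand-in for the `Heartbeat` struct; the real version signals a `Notify` instead of returning a bool:

```rust
use std::time::{Duration, Instant};

struct Liveness {
    last_recv: Instant,
    missed: u64,
    max_missed: u64,
}

impl Liveness {
    /// Called on every tick: returns true once the peer is
    /// considered dead and the bond should be torn down.
    fn on_tick(&mut self, now: Instant, interval: Duration) -> bool {
        if now.duration_since(self.last_recv) > interval {
            self.missed += 1;
        }
        self.missed >= self.max_missed
    }

    /// Any message from the peer resets the counter.
    fn reset(&mut self, now: Instant) {
        self.missed = 0;
        self.last_recv = now;
    }
}

fn main() {
    let start = Instant::now();
    let interval = Duration::from_millis(500);
    let mut live = Liveness { last_recv: start, missed: 0, max_missed: 3 };
    // Three silent ticks in a row reach max_missed = 3.
    assert!(!live.on_tick(start + interval * 2, interval));
    assert!(!live.on_tick(start + interval * 4, interval));
    assert!(live.on_tick(start + interval * 6, interval)); // dead
    live.reset(start + interval * 6); // a Ping/Pong clears the counter
    assert_eq!(live.missed, 0);
}
```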
Bond identity
Each bond is identified by a BondId:
type BondId = UniqueId;
The BondId is derived deterministically from both peer identities and the
group key, so both sides compute the same identifier independently.
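One way to get that property is to sort the two peer identities before hashing, so both sides feed identical input into the derivation. A toy sketch, with `DefaultHasher` standing in for the real `UniqueId` derivation (which uses a cryptographic hash):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Order-independent bond identifier sketch: sort the peer ids, then
/// mix in the group key. Illustrative only.
fn bond_id(peer_a: &str, peer_b: &str, group_key: &str) -> u64 {
    let (lo, hi) = if peer_a <= peer_b {
        (peer_a, peer_b)
    } else {
        (peer_b, peer_a)
    };
    let mut h = DefaultHasher::new();
    (lo, hi, group_key).hash(&mut h);
    h.finish()
}

fn main() {
    // Both peers compute the same id regardless of argument order.
    assert_eq!(
        bond_id("peer-1", "peer-2", "g"),
        bond_id("peer-2", "peer-1", "g")
    );
    // A different group key yields a different bond id.
    assert_ne!(
        bond_id("peer-1", "peer-2", "g"),
        bond_id("peer-1", "peer-2", "h")
    );
}
```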
Topology tracking
Every group maintains a Bonds collection (accessible via
Group::bonds() or When::bonds()). This is an immutable set of all
currently active bonds in the group, including bonds between other peers
(learned via BondFormed messages).
This topology awareness lets each peer know the full mesh state, which is used for:
- Dynamic quorum calculations in Raft.
- Determining when enough peers are connected to start elections.
- Providing the topology snapshot for state sync coordination.
Connection handling
The Acceptor struct implements the ProtocolHandler trait for the
/mosaik/groups/1 ALPN:
Incoming connection
│
▼
Wrap in Link<BondMessage>
│
▼
ensure_known_peer()
│
▼
wait_for_handshake()
· receive HandshakeStart
· verify proof
· send HandshakeEnd
│
▼
ensure_same_network()
│
▼
Hand off to Group worker
If any step fails, the connection is dropped cleanly. The group worker then
decides whether to accept the bond (it may already have one with that peer)
and spawns a BondWorker if accepted.
State Synchronization
When a new peer joins a group or falls too far behind the leader’s log, it needs a snapshot of the current state rather than replaying potentially thousands of log entries. Mosaik implements a distributed state sync mechanism that spreads the load across all group members.
Why not leader-only snapshots?
In standard Raft, the leader ships snapshots to lagging followers. This has two problems:
- Leader bottleneck — the leader must serialize and transmit potentially large state to each lagging follower.
- Snapshot inconsistency — the snapshot must be taken at a consistent point, which may block the leader.
Mosaik solves both by having all peers participate in snapshot creation and distribution.
The six-step process
1. Follower 2. Leader 3. All peers
detects lag wraps request create snapshot
──────────── as log entry at committed
sends ──────────── position
RequestSnapshot replicates it ──────────────
4. Follower 5. Follower 6. Follower
discovers which fetches batches installs snapshot
peers have from multiple and resumes
snapshots peers in parallel normal operation
────────────── ───────────────── ──────────────────
Step 1: Detect lag
A follower realizes it is behind when it receives an AppendEntries message
referencing a log prefix it does not have. It sends a StateSync message
to the leader requesting a snapshot.
Step 2: Replicate the request
The leader wraps the snapshot request as a special log entry and replicates it through the normal Raft consensus path. This ensures all peers see the request at the same log position.
Step 3: Create snapshots
When each peer commits the snapshot request entry, it creates a point-in-time snapshot of its state machine. Because all peers see the request at the same committed index, all snapshots are consistent — they represent the same logical state.
The im crate’s persistent data structures make snapshotting O(1) by
sharing structure with the live state (copy-on-write).
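A coarse std-only analogy: cloning an `Arc` captures a snapshot in O(1), and `Arc::make_mut` copies on the next write. (`im`'s sharing is per tree node rather than whole-map, but the observable effect is similar.)

```rust
use std::collections::HashMap;
use std::sync::Arc;

fn main() {
    // Live state behind an Arc: taking a "snapshot" is just an Arc
    // clone, which copies no map data.
    let mut live: Arc<HashMap<&str, u64>> =
        Arc::new(HashMap::from([("a", 1), ("b", 2)]));
    let snapshot = Arc::clone(&live);

    // A later write copies on demand, leaving the snapshot untouched.
    Arc::make_mut(&mut live).insert("c", 3);

    assert_eq!(snapshot.len(), 2); // snapshot still sees the old state
    assert_eq!(live.len(), 3);     // live state moved on
}
```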
Step 4: Discover snapshot sources
The follower queries peers to find out which ones have a snapshot available for the requested position. Snapshots have a TTL (default 10 seconds), so they eventually expire if not fetched.
Step 5: Parallel fetching
The follower fetches snapshot data in batches from multiple peers simultaneously:
Follower ──batch request──► Peer A (items 0..2000)
batch request──► Peer B (items 2000..4000)
batch request──► Peer C (items 4000..6000)
...
Each batch contains up to fetch_batch_size (default 2000) items. By
distributing fetches across peers, the load is balanced and the follower
receives data faster.
Step 6: Install and resume
Once all batches are received, the follower:
- Installs the snapshot as its new state machine state.
- Updates its committed cursor to the snapshot position.
- Replays any log entries received after the snapshot position.
- Resumes normal follower operation (including voting).
Traits
SnapshotStateMachine
State machines that support sync must implement:
trait SnapshotStateMachine: StateMachine {
type Snapshot: Snapshot;
fn snapshot(&self) -> Self::Snapshot;
fn install_snapshot(&mut self, items: Vec<SnapshotItem>);
}
- `snapshot()` creates a serializable snapshot of the current state.
- `install_snapshot()` replaces the state with data received from peers.
Snapshot
The snapshot itself is iterable:
trait Snapshot {
fn len(&self) -> usize;
fn into_items(self) -> impl Iterator<Item = SnapshotItem>;
}
Each collection type implements this — for example, MapSnapshot yields
key-value pairs as serialized SnapshotItem bytes.
SnapshotItem
struct SnapshotItem {
key: Bytes,
value: Bytes,
}
The generic key/value format allows any collection type to participate in the same sync protocol.
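A toy roundtrip shows the idea, with UTF-8 bytes standing in for postcard-serialized data and an invented `into_items`/`install` pair:

```rust
use std::collections::BTreeMap;

/// Model of the generic item (the real struct holds `Bytes`).
struct SnapshotItem {
    key: Vec<u8>,
    value: Vec<u8>,
}

/// Flatten a map into generic items, as a MapSnapshot-style
/// implementation might.
fn into_items(map: &BTreeMap<String, String>) -> Vec<SnapshotItem> {
    map.iter()
        .map(|(k, v)| SnapshotItem {
            key: k.as_bytes().to_vec(),
            value: v.as_bytes().to_vec(),
        })
        .collect()
}

/// Rebuild the map on the receiving side.
fn install(items: Vec<SnapshotItem>) -> BTreeMap<String, String> {
    items
        .into_iter()
        .map(|i| {
            (
                String::from_utf8(i.key).unwrap(),
                String::from_utf8(i.value).unwrap(),
            )
        })
        .collect()
}

fn main() {
    let original = BTreeMap::from([
        ("k1".to_string(), "v1".to_string()),
        ("k2".to_string(), "v2".to_string()),
    ]);
    let restored = install(into_items(&original));
    assert_eq!(original, restored);
}
```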
Configuration
State sync behavior is controlled by SyncConfig:
| Parameter | Default | Description |
|---|---|---|
| `fetch_batch_size` | 2000 | Maximum items per batch request |
| `snapshot_ttl` | 10s | How long a snapshot remains available |
| `snapshot_request_timeout` | 15s | Timeout for requesting a snapshot from peers |
| `fetch_timeout` | 5s | Timeout for each batch fetch operation |
Tuning tips
- Large state: Increase `fetch_batch_size` to reduce round trips, but watch memory usage.
- Slow networks: Increase `fetch_timeout` and `snapshot_request_timeout`.
- Fast churn: Increase `snapshot_ttl` if snapshots expire before followers can fetch them.
Log replay sync
For groups that use Storage (log persistence) instead of in-memory state,
a LogReplaySync alternative exists. Instead of shipping snapshots, the
joining peer replays log entries from a replay provider — another peer
that streams its stored log entries.
This approach is simpler but only works when:
- The log fits in available storage.
- Replay is fast enough relative to new entries arriving.
The replay module provides ReplayProvider and ReplaySession types for
this path.
Collections and state sync
All built-in collection types (Map, Vec, Set, Register, Once,
PriorityQueue) implement SnapshotStateMachine. Their internal state
machines produce snapshots:
MapStateMachine ──snapshot()──► MapSnapshot (HashMap entries)
VecStateMachine ──snapshot()──► VecSnapshot (indexed entries)
SetStateMachine ──snapshot()──► SetSnapshot (set members)
RegStateMachine ──snapshot()──► RegisterSnapshot (optional value)
OnceStateMachine ──snapshot()──► OnceSnapshot (optional value)
DepqStateMachine ──snapshot()──► PriorityQueueSnapshot (by_key + by_priority)
The dual-index structure of PriorityQueue is preserved across sync — both
the by_key and by_priority indices are transmitted and reconstructed.
Register and Once snapshots are trivial (at most one item), so sync
completes in a single batch.
Wire Protocol
All peer-to-peer communication in mosaik flows over QUIC streams, using a
consistent framing layer built on Link<P>. This chapter explains the protocol
stack from the transport up to application messages.
Protocol stack
┌─────────────────────────────────┐
│ Application Messages │ BondMessage, Datum, CatalogSync, ...
├─────────────────────────────────┤
│ postcard serialization │ compact binary encoding (no_std, varint)
├─────────────────────────────────┤
│ LengthDelimitedCodec framing │ 4-byte big-endian length prefix
├─────────────────────────────────┤
│ QUIC bidirectional stream │ SendStream + RecvStream
├─────────────────────────────────┤
│ iroh / quinn transport │ QUIC with TLS 1.3, hole-punching
└─────────────────────────────────┘
Link<P>
The Link<P> type is the core abstraction for typed, bidirectional
communication over a QUIC stream:
struct Link<P: Protocol> {
connection: Connection,
cancel: CancellationToken,
writer: FramedWrite<SendStream, LengthDelimitedCodec>,
reader: FramedRead<RecvStream, LengthDelimitedCodec>,
}
The Protocol trait
Every protocol declares its ALPN (Application-Layer Protocol Negotiation) identifier:
trait Protocol: Serialize + DeserializeOwned + Send + 'static {
const ALPN: &'static [u8];
}
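Implementing the trait for an application type is a one-liner per protocol. A minimal sketch, where ChatMessage and its ALPN string are hypothetical, and the serde supertraits are elided to keep the example dependency-free:

```rust
// Simplified: the real trait also requires Serialize + DeserializeOwned
// + Send + 'static (elided here to keep the sketch dependency-free).
trait Protocol {
    const ALPN: &'static [u8];
}

#[allow(dead_code)]
enum ChatMessage {
    Hello { name: String },
    Text(String),
}

impl Protocol for ChatMessage {
    // Exchanged during the TLS handshake, before any application data flows.
    const ALPN: &'static [u8] = b"/myapp/chat/1";
}

fn main() {
    assert_eq!(ChatMessage::ALPN, b"/myapp/chat/1");
}
```

Because ALPN is a const on the type, two peers using the same message type automatically negotiate the same protocol identifier.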
ALPN identifiers are exchanged during the TLS handshake, so peers agree on the protocol before any application data flows. Mosaik uses these ALPNs:
| ALPN | Protocol type | Subsystem |
|---|---|---|
/mosaik/announce | AnnounceMessage | Discovery (gossip) |
/mosaik/catalog-sync | CatalogSync | Discovery (catalog) |
/mosaik/streams/1.0 | Datum impl | Streams |
/mosaik/groups/1 | BondMessage | Groups (bonds) |
Creating a link
Links are created by either connecting to a peer or accepting an incoming connection:
// Outgoing
let link = Link::<BondMessage>::connect(&endpoint, node_addr).await?;
// Incoming (in a ProtocolHandler)
let link = Link::<BondMessage>::accept(connecting).await?;
Sending and receiving
// Send a message (serialized with postcard, length-prefixed)
link.send(BondMessage::Ping).await?;
// Receive a message (read length prefix, deserialize with postcard)
let msg: BondMessage = link.recv().await?;
Under the hood:
- send() serializes the message with postcard::to_allocvec().
- The resulting bytes are written through FramedWrite, which prepends a 4-byte big-endian length prefix.
- recv() reads the length prefix from FramedRead, reads exactly that many bytes, and deserializes with postcard::from_bytes().
Splitting a link
For concurrent send/receive, a link can be split:
let (writer, reader) = link.split();
// In one task:
writer.send(msg).await?;
// In another task:
let msg = reader.recv().await?;
// Rejoin if needed:
let link = Link::join(writer, reader);
Cancellation
Every link carries a CancellationToken. When cancelled, both send and
receive operations return immediately. This is used for graceful shutdown:
link.cancel(); // signals both sides to stop
Wire format
postcard encoding
Postcard is a #[no_std]-compatible binary
serialization format based on variable-length integers (varints). It
produces very compact output:
- u8 → 1 byte
- Small u32 → 1 byte (varint)
- Enum variant → 1 byte discriminant + payload
- Strings → varint length + UTF-8 bytes
- Vec<T> → varint length + elements
This keeps message sizes minimal, which matters for high-frequency heartbeats and Raft messages.
Framing
Each message on the wire looks like:
┌──────────────┬───────────────────────────────┐
│ Length (4B) │ postcard-encoded payload │
│ big-endian │ │
└──────────────┴───────────────────────────────┘
The LengthDelimitedCodec from the tokio-util crate handles this
automatically. It supports messages up to 2³² - 1 bytes (4 GiB), though in
practice mosaik messages are typically under a few kilobytes.
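The framing layer is simple enough to sketch with std alone. Here frame and unframe are illustrative helpers, not mosaik APIs, showing exactly what the codec puts on the wire:

```rust
// A payload prefixed with a 4-byte big-endian length,
// as LengthDelimitedCodec produces.
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}

// Parse one frame back out; None if the buffer is too short.
fn unframe(buf: &[u8]) -> Option<&[u8]> {
    let len = u32::from_be_bytes(buf.get(..4)?.try_into().ok()?) as usize;
    buf.get(4..4 + len)
}

fn main() {
    let framed = frame(b"hello");
    assert_eq!(&framed[..4], &[0u8, 0, 0, 5]); // length prefix
    assert_eq!(unframe(&framed), Some(&b"hello"[..]));
}
```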
QUIC transport
Mosaik uses iroh (built on quinn) for QUIC transport. Key features:
| Feature | Benefit |
|---|---|
| TLS 1.3 | All connections encrypted, session secrets used for bond proofs |
| Multiplexed streams | Multiple logical channels over one connection |
| NAT traversal | Built-in hole-punching and relay fallback |
| Connection migration | Connections survive IP changes |
| mDNS discovery | Automatic peer discovery on local networks |
Bidirectional streams
Each Link<P> uses a single QUIC bidirectional stream. This means:
- One stream per bond connection.
- One stream per catalog sync session.
- One stream per producer-consumer pair.
QUIC’s multiplexing means these streams don’t interfere with each other even when sharing the same underlying UDP connection.
Error handling
Link operations return std::io::Error. Common failure modes:
| Error | Cause | Recovery |
|---|---|---|
| Connection closed | Peer shut down or network failure | Reconnect via discovery |
| Deserialization error | Protocol version mismatch or corruption | Drop connection |
| Timeout | Peer unresponsive | Heartbeat detection → reconnect |
| Cancelled | Local shutdown | Graceful cleanup |
Deterministic Hashing
In a distributed replicated system, all peers must arrive at the exact same state after applying the same sequence of operations. Mosaik’s collections use deterministic hashing to guarantee consistent behavior across nodes.
The problem
Rust’s default HashMap and HashSet use RandomState — a hasher seeded
with random data at process startup. This means:
- Iteration order differs between processes.
- Two nodes applying the same inserts will have different internal layouts.
- Snapshots of hash-based structures would differ even with identical contents.
For replicated state machines, this is unacceptable.
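The divergence is easy to observe with std alone (hash_with is an illustrative helper):

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};

// Hash `value` with the given hasher-builder.
fn hash_with<T: Hash>(state: &RandomState, value: &T) -> u64 {
    let mut h = state.build_hasher();
    value.hash(&mut h);
    h.finish()
}

fn main() {
    // Each RandomState carries its own random seed, so the same key
    // hashes differently, and the internal bucket layout diverges too.
    let (a, b) = (RandomState::new(), RandomState::new());
    assert_ne!(hash_with(&a, &"alice"), hash_with(&b, &"alice"));
    // Within a single state, hashing stays consistent.
    assert_eq!(hash_with(&a, &"alice"), hash_with(&a, &"alice"));
}
```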
The solution
Mosaik’s Map and Set collections use a zero-seeded deterministic
hasher:
type DeterministicHasher = BuildHasherDefault<DefaultHasher>;
BuildHasherDefault<DefaultHasher> constructs a DefaultHasher (SipHash)
with a fixed zero seed on every call. This ensures:
- Same input → same hash across all nodes and restarts.
- Same insertion order → same internal layout in the hash table.
- Snapshots are byte-identical when state is identical.
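A quick std-only check of this property, using the same zero-seeded BuildHasherDefault<DefaultHasher> (hash_of is an illustrative helper):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};

type DeterministicHasher = BuildHasherDefault<DefaultHasher>;

// Hash a value with a freshly built, zero-seeded hasher.
fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DeterministicHasher::default().build_hasher();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Independently constructed hashers agree: no random seed involved,
    // so the same holds across processes and restarts.
    assert_eq!(hash_of(&"alice"), hash_of(&"alice"));
    assert_ne!(hash_of(&"alice"), hash_of(&"bob"));
}
```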
Where it’s used
Collections
All hash-based collections use the deterministic hasher internally:
- Map<K, V> uses im::HashMap<K, V, DeterministicHasher>.
- Set<V> uses im::HashSet<V, DeterministicHasher>.
The im crate’s persistent hash structures accept a custom hasher parameter,
which mosaik sets to the zero-seeded variant.
Ordered collections
Vec and PriorityQueue do not use hashing for their primary index:
- Vec uses im::Vector (a balanced tree indexed by position).
- PriorityQueue uses:
  - im::HashMap (deterministic hasher) for key → value+priority lookup.
  - im::OrdMap for priority-ordered access (requires Ord, not hashing).
Identity derivation
Beyond collection hashing, determinism matters for identity:
UniqueId
Group and store identities are derived deterministically from their inputs:
// GroupId is derived from key + network
let group_id = UniqueId::new(&key, &network_id);
// StoreId is derived from group + type signature
let store_id = StoreId::new(&group_id, &type_signature);
This uses blake3 hashing, which is inherently deterministic.
Type signatures
Collection state machines compute a signature from their Rust type names:
fn signature() -> Digest {
Digest::from(
std::any::type_name::<MapStateMachine<K, V>>()
)
}
This ensures that two collections are only considered the same store if they
have identical key and value types. A Map<String, u64> and a
Map<String, i64> will have different StoreId values and will never
attempt to sync with each other.
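The key property, that type_name embeds the concrete generic arguments, can be verified with std alone:

```rust
use std::collections::HashMap;

fn main() {
    // type_name includes the full generic arguments, so collections with
    // different key/value types yield distinct signatures. (Mosaik hashes
    // this string with blake3 to form the store's type signature.)
    let a = std::any::type_name::<HashMap<String, u64>>();
    let b = std::any::type_name::<HashMap<String, i64>>();
    assert_ne!(a, b);
}
```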
Snapshot consistency
Deterministic hashing is critical for snapshot-based state sync:
Node A state: { "alice" → 1, "bob" → 2 }
Node B state: { "alice" → 1, "bob" → 2 }
With deterministic hashing:
Node A snapshot bytes == Node B snapshot bytes ✓
With random hashing:
Node A snapshot bytes != Node B snapshot bytes ✗
(different internal bucket layout)
When a joining peer fetches snapshot batches from multiple source peers, the items must be compatible. Deterministic hashing ensures all sources produce the same serialized representation for identical logical state.
The im crate
Mosaik uses the im crate for persistent
(copy-on-write) data structures. Key properties:
| Property | Benefit |
|---|---|
| Structural sharing | O(1) snapshot via clone() — only divergent nodes are copied |
| Custom hasher | Accepts BuildHasherDefault<DefaultHasher> for determinism |
| Thread-safe clones | Arc-based sharing, safe to snapshot from one task and iterate in another |
| Balanced trees | OrdMap and Vector use RRB trees with O(log n) operations |
The O(1) cloning is especially important for state sync — creating a snapshot does not block the state machine from processing new commands.
Trait requirements
The deterministic hashing strategy imposes trait bounds on collection keys and values:
| Trait | Required by | Purpose |
|---|---|---|
Hash | Map keys, Set values | Deterministic bucket placement |
Eq | Map keys, Set values | Equality comparison for collision resolution |
Clone | All keys and values | Structural sharing in im data structures |
Serialize + DeserializeOwned | All keys and values | Snapshot and replication encoding |
Send + Sync + 'static | All keys and values | Cross-task sharing |
Ord | PriorityQueue priorities | Ordered access in OrdMap |
These are codified as blanket trait aliases:
// Satisfied by types that are Hash + Eq + Clone + Serialize + DeserializeOwned + Send + Sync + 'static
trait Key {}
// Adds Ord to Key requirements
trait OrderedKey {}
// Clone + Serialize + DeserializeOwned + Send + Sync + 'static (no Hash/Eq)
trait Value {}
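The blanket-alias pattern itself is easy to sketch. Here the serde bounds are omitted so the example stays dependency-free, and takes_key is an illustrative function:

```rust
use std::hash::Hash;

// Blanket "trait alias": any type satisfying the bounds automatically
// implements Key. (The real definition also requires Serialize +
// DeserializeOwned, omitted here to keep the sketch dependency-free.)
trait Key: Hash + Eq + Clone + Send + Sync + 'static {}
impl<T: Hash + Eq + Clone + Send + Sync + 'static> Key for T {}

// A function that accepts any Key, echoing it back.
fn takes_key<K: Key>(key: K) -> K {
    key
}

fn main() {
    // String and u64 satisfy all bounds, so both work without any
    // explicit `impl Key for ...`.
    assert_eq!(takes_key(String::from("alice")), "alice");
    assert_eq!(takes_key(42u64), 42);
}
```

Users never implement Key manually; satisfying the bounds is enough.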
Configuration Reference
This chapter lists every configuration struct in mosaik, its fields, default values, and how the pieces fit together.
NetworkBuilder (top-level)
The entry point for creating a mosaik network. All nested configs are accessible through fluent builder methods.
use mosaik::Network;
let network = Network::builder()
.network_id("my-network")
.mdns_discovery(true)
.discovery(|d| d.events_backlog(200))
.streams(|s| s.with_backoff(my_backoff))
.groups(|g| g.handshake_timeout(Duration::from_secs(5)))
.build()
.await?;
| Field | Type | Default | Description |
|---|---|---|---|
network_id | NetworkId | required | Unique identifier for this network |
relay_mode | iroh::RelayMode | RelayMode::Default | Relay server mode for NAT/firewall traversal |
mdns_discovery | bool | false | Enable mDNS local network peer discovery |
addresses | BTreeSet<SocketAddr> | empty (all interfaces) | Local bind addresses |
secret_key | SecretKey | random | Ed25519 key for peer identity |
discovery | DiscoveryConfigBuilder | see below | Nested discovery config |
streams | StreamsConfigBuilder | see below | Nested streams config |
groups | GroupsConfigBuilder | see below | Nested groups config |
Note: secret_key determines the PeerId. If omitted, a random key is generated on each run, giving the node a new identity every time. Specifying a fixed key is only recommended for bootstrap nodes that need a stable, well-known peer ID across restarts.
discovery::Config
Controls peer discovery, gossip, and catalog maintenance.
| Field | Type | Default | Description |
|---|---|---|---|
events_backlog | usize | 100 | Past events retained in event watchers |
bootstrap_peers | Vec<EndpointAddr> | empty | Initial peers to connect to on startup |
tags | Vec<Tag> | empty | Tags advertised in local PeerEntry |
purge_after | Duration | 300s (5 min) | Time before stale peer entries are purged |
max_time_drift | Duration | 10s | Maximum acceptable timestamp drift |
announce_interval | Duration | 15s | Interval between presence announcements |
announce_jitter | f32 | 0.5 | Max jitter factor on announce interval |
graceful_departure_window | Duration | 500ms | Wait for departure gossip to propagate |
Builder methods with_bootstrap(peers) and with_tags(tags) accept either a
single item or an iterator (via IntoIterOrSingle):
Network::builder()
.discovery(|d| d
.with_bootstrap(peer_addr) // single peer
.with_tags(["validator", "relay"]) // multiple tags
.purge_after(Duration::from_secs(600))
)
streams::Config
Controls stream consumer reconnection behavior.
| Field | Type | Default | Description |
|---|---|---|---|
backoff | BackoffFactory | Exponential, 5 min max | Retry policy for consumer stream connections |
Custom backoff example:
use mosaik::streams::backoff::ExponentialBackoff;
Network::builder()
.streams(|s| s.with_backoff(ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(120)),
..Default::default()
}))
.build()
.await?;
groups::Config
Controls bond establishment.
| Field | Type | Default | Description |
|---|---|---|---|
handshake_timeout | Duration | 2s | Timeout for bond handshake with remote peers |
ConsensusConfig
Consensus parameters that must be identical across all members of a group.
These values are hashed into the GroupId, so any mismatch creates a different
group.
| Field | Type | Default | Description |
|---|---|---|---|
heartbeat_interval | Duration | 500ms | Bond heartbeat interval |
heartbeat_jitter | Duration | 150ms | Max heartbeat jitter |
max_missed_heartbeats | u32 | 10 | Missed heartbeats before bond is considered dead |
election_timeout | Duration | 2s | Raft election timeout (must exceed heartbeat interval) |
election_timeout_jitter | Duration | 500ms | Election timeout randomization |
bootstrap_delay | Duration | 3s | Extra delay for the first election (term 0) |
forward_timeout | Duration | 2s | Timeout for forwarding commands to leader |
query_timeout | Duration | 2s | Timeout for leader to respond to queries |
use mosaik::groups::ConsensusConfig;
let config = ConsensusConfig::builder()
.heartbeat_interval(Duration::from_millis(250))
.election_timeout(Duration::from_secs(1))
.build()
.unwrap();
Leadership deprioritization
let config = ConsensusConfig::default().deprioritize_leadership();
This multiplies the election timeout by a factor, making this node less likely to become leader. Used by collection readers.
SyncConfig (collections)
Controls snapshot-based state synchronization for collections.
| Field | Type | Default | Description |
|---|---|---|---|
fetch_batch_size | usize | 2000 | Max items per batch request |
snapshot_ttl | Duration | 10s | How long a snapshot remains available |
snapshot_request_timeout | Duration | 15s | Timeout for requesting a snapshot |
fetch_timeout | Duration | 5s | Timeout for each batch fetch |
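Assuming SyncConfig exposes builder-style setters like ConsensusConfig above (the method names here are an assumption, not confirmed API), tuning for large state on a slow network might look like:

```rust
use std::time::Duration;

// Hypothetical sketch: field names come from the table above, and the
// builder methods are assumed to mirror ConsensusConfig's style.
let sync = SyncConfig::builder()
    .fetch_batch_size(5000)                             // fewer round trips for large state
    .fetch_timeout(Duration::from_secs(10))             // headroom for slow links
    .snapshot_request_timeout(Duration::from_secs(30))
    .snapshot_ttl(Duration::from_secs(30))              // survive slow follower fetches
    .build()
    .unwrap();
```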
Configuration hierarchy
NetworkBuilder
├── network_id (identity)
├── secret_key (identity)
├── relay_mode (transport)
├── mdns_discovery (transport)
├── addresses (transport)
│
├── discovery::Config
│ ├── bootstrap_peers
│ ├── tags
│ ├── events_backlog
│ ├── purge_after
│ ├── max_time_drift
│ ├── announce_interval
│   ├── announce_jitter
│   └── graceful_departure_window
│
├── streams::Config
│ └── backoff
│
└── groups::Config
└── handshake_timeout
Per-group:
ConsensusConfig (consensus timing, hashed into GroupId)
Per-collection:
SyncConfig (state sync tuning)
Environment influence
Configuration structs are pure Rust — there is no automatic environment variable parsing. However, test utilities honor:
| Variable | Used by | Effect |
|---|---|---|
TEST_TRACE | Test tracing setup | Controls log level (debug/trace/info/etc.) |
TEST_TRACE_UNMUTE | Test tracing setup | Set to 1 to show all log output |
TIME_FACTOR | Test time helpers | Float multiplier for all test durations |
Error Handling
Mosaik uses typed error enums specific to each subsystem, plus a close-reason system for QUIC connection-level codes. This chapter catalogs every public error type.
Network errors
network::Error — returned by NetworkBuilder::build():
| Variant | Description |
|---|---|
MissingNetworkId | No network_id was set on the builder |
Bind(BindError) | Failed to bind the QUIC endpoint |
InvalidAddress(InvalidSocketAddr) | An address in addresses is invalid |
DiscoveryConfig(ConfigBuilderError) | Discovery config builder failed validation |
StreamsConfig(ConfigBuilderError) | Streams config builder failed validation |
GroupsConfig(ConfigBuilderError) | Groups config builder failed validation |
Discovery errors
discovery::Error:
| Variant | Description |
|---|---|
InvalidSecretKey(PeerId, PeerId) | Secret key does not match expected PeerId |
DifferentNetwork { local_network, remote_network } | Remote peer is on a different network |
InvalidSignature | Peer entry signature verification failed |
GossipJoin(ApiError) | Failed to join gossip topic (iroh_gossip) |
PeerIdChanged(PeerId, PeerId) | Attempted to change the local peer’s ID |
Link(LinkError) | Transport-level link error |
Other(Box<dyn Error>) | Generic boxed error |
Cancelled | Operation was cancelled |
Command errors
groups::CommandError<M> — returned by execute(), execute_many(),
feed(), feed_many():
| Variant | Recoverable? | Description |
|---|---|---|
Offline(Vec<M::Command>) | Yes | Node is offline; carries the unsent commands for retry |
NoCommands | No | Empty command batch was submitted |
GroupTerminated | No | The group has been permanently closed |
Recovering from Offline
match group.execute(MyCommand::Increment).await {
Ok(result) => println!("committed: {result:?}"),
Err(CommandError::Offline(commands)) => {
// Wait for the group to come online, then retry
group.when().online().await;
for cmd in commands {
group.execute(cmd).await?;
}
}
Err(CommandError::GroupTerminated) => {
// Permanent failure — stop trying
}
Err(CommandError::NoCommands) => unreachable!(),
}
Query errors
groups::QueryError<M> — returned by query():
| Variant | Recoverable? | Description |
|---|---|---|
Offline(M::Query) | Yes | Node is offline; carries the unsent query |
GroupTerminated | No | The group has been permanently closed |
Collection errors
collections::Error<T> — returned by collection write operations:
| Variant | Recoverable? | Description |
|---|---|---|
Offline(T) | Yes | Node is offline; carries the value for retry |
NetworkDown | No | Network has shut down |
The generic T contains the value that failed to be written, enabling retry
without re-creating the data:
match map.insert("key".into(), 42).await {
Ok(prev) => println!("previous: {prev:?}"),
Err(collections::Error::Offline(value)) => {
// value == 42, retry later
}
Err(collections::Error::NetworkDown) => {
// permanent failure
}
}
Producer errors
streams::producer::Error<D> — returned by Sink::send() and try_send():
| Variant | Description |
|---|---|
Closed(Option<D>) | Producer has been closed |
Full(D) | Internal buffer is full (back-pressure) |
Offline(D) | No active consumers connected |
All variants carry the datum back when possible, enabling retry.
Close reasons (QUIC application codes)
Mosaik uses typed close reasons for QUIC ApplicationClose codes. These
are generated with the make_close_reason! macro and appear in connection
close frames.
Reserved ranges
| Range | Owner |
|---|---|
| 0–199 | mosaik internal |
| 200+ | Application-defined |
Built-in close reasons
| Name | Code | Description |
|---|---|---|
Success | 200 | Protocol completed successfully |
GracefulShutdown | 204 | Graceful shutdown |
InvalidAlpn | 100 | Wrong ALPN protocol |
DifferentNetwork | 101 | Peer on a different network |
Cancelled | 102 | Operation cancelled |
UnexpectedClose | 103 | Unexpected connection close |
ProtocolViolation | 400 | Protocol message violation |
UnknownPeer | 401 | Peer not found in discovery catalog |
Group close reasons
| Name | Code | Description |
|---|---|---|
InvalidHandshake | 30,400 | Handshake decode error |
GroupNotFound | 30,404 | Unknown group ID |
InvalidProof | 30,405 | Invalid authentication proof |
Timeout | 30,408 | Peer response timeout |
AlreadyBonded | 30,429 | Duplicate bond between peers |
Defining custom close reasons
use mosaik::network::make_close_reason;
// Code must be >= 200
make_close_reason!(MyAppError, 500, "Application-specific error");
Error design patterns
Temporary vs. permanent
Mosaik errors follow a consistent pattern:
- Temporary errors carry the original data back (e.g., Offline(commands), Full(datum)) so you can retry without data loss.
- Permanent errors (e.g., GroupTerminated, NetworkDown) indicate the resource is gone and retrying is pointless.
Matching on recoverability
use mosaik::collections::Error;
loop {
match map.insert("key".into(), value.clone()).await {
Ok(_) => break,
Err(Error::Offline(_)) => {
map.when().online().await;
continue;
}
Err(Error::NetworkDown) => {
panic!("network is gone");
}
}
}
The CloseReason trait
All close reason types implement:
trait CloseReason:
Error + Into<ApplicationClose> + PartialEq<ApplicationClose>
+ Clone + Send + Sync + 'static
{}
This lets you match connection close frames against typed reasons:
if close_frame == InvalidProof {
// handle proof failure
}
Primitives Reference
This chapter documents the foundational types and utilities that the rest of
mosaik builds on. These live in the primitives module (public types) and
internal helpers.
Digest / UniqueId / Tag
The core identifier type — a 32-byte blake3 hash.
use mosaik::Digest; // re-exported at crate root
Digest, UniqueId, and Tag are all the same type, aliased for clarity in
different contexts:
| Alias | Used for |
|---|---|
Digest | General-purpose 32-byte identifier |
UniqueId | Derived identity (groups, stores, bonds) |
Tag | Discovery tags for peer classification |
NetworkId | Network identifier (= Digest) |
GroupId | Group identifier (= Digest) |
StreamId | Stream identifier (= Digest) |
StoreId | Collection store identifier (= UniqueId) |
Construction
// From a string (hashes the string with blake3)
let tag = Digest::from("validator");
// From a hex string (64 chars = direct decode, otherwise hashed)
let id = Digest::from("a1b2c3d4...");
// From numeric types (little-endian encoded, then hashed)
let id = Digest::from_u64(42);
// From multiple parts (concatenated, then hashed)
let id = Digest::from_parts(&["prefix", "suffix"]);
// Derive a child ID deterministically
let child = parent.derive("child-name");
// Random
let id = Digest::random();
// Zero (sentinel value)
let zero = Digest::zero();
Compile-time construction
use mosaik::unique_id;
const MY_ID: UniqueId = unique_id!("a1b2c3d4e5f6..."); // 64-char hex literal
Display
- Display — short hex (first 5 bytes): a1b2c3d4e5
- Debug — full 64-character hex string
- Short<T> wrapper — always shows first 5 bytes
- Abbreviated<const LEN, T> — shows first..last if longer than LEN
Serialization
- Human-readable formats (JSON, TOML): hex string
- Binary formats (postcard): raw 32 bytes
PeerId
type PeerId = iroh::EndpointId;
A peer’s public identity, derived from their Ed25519 public key. This is an
iroh type, not a mosaik Digest. It is used in discovery, bonds, and
connection authentication.
SecretKey
Re-exported from iroh. An Ed25519 secret key that determines a node’s
PeerId. If not provided to NetworkBuilder, a random key is generated
automatically — this is the recommended default for regular nodes.
Specifying a fixed secret key is only recommended for bootstrap nodes that need a stable, well-known peer ID across restarts:
use mosaik::SecretKey;
// Auto-generated (default) — new identity each run
let network = Network::builder()
.network_id("my-network")
.build().await?;
// Fixed key — stable identity, recommended only for bootstrap nodes
let key = SecretKey::generate(&mut rand::rng());
let bootstrap = Network::builder()
.network_id("my-network")
.secret_key(key)
.build().await?;
Wire encoding
All network messages use postcard — a compact, #[no_std]-compatible
binary format using variable-length integers.
| Function | Description |
|---|---|
serialize<T: Serialize>(&T) -> Bytes | Serialize to bytes (panics on failure) |
deserialize<T: DeserializeOwned>(impl AsRef<[u8]>) -> Result<T> | Deserialize from bytes |
These are internal crate functions. Application code interacts with them
indirectly through Link<P> send/receive and collection operations.
Bytes / BytesMut
Re-exported from the bytes crate. Used throughout mosaik for zero-copy
byte buffers:
use mosaik::Bytes;
let data: Bytes = serialize(&my_message);
BackoffFactory
A factory type for creating retry backoff strategies:
type BackoffFactory = Arc<
dyn Fn() -> Box<dyn Backoff + Send + Sync + 'static>
+ Send + Sync + 'static
>;
Used in streams::Config to configure consumer reconnection. The backoff
crate is re-exported at mosaik::streams::backoff.
Formatting utilities
Internal helpers for consistent debug output:
| Type | Description |
|---|---|
Pretty<T> | Pass-through wrapper (for trait integration) |
Short<T> | Display first 5 bytes as hex |
Abbreviated<const LEN, T> | Show first..last hex if longer than LEN bytes |
Redacted<T> | Always prints <redacted> |
FmtIter<W, I> | Format iterator elements comma-separated in brackets |
IntoIterOrSingle<T>
An ergonomic trait that lets API methods accept either a single item or an iterator:
// Both work:
discovery.with_tags("validator"); // single tag
discovery.with_tags(["validator", "relay"]); // multiple tags
This is implemented via two blanket impls using Variant<0> (single item
via Into<T>) and Variant<1> (iterator of Into<T>).
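The trick can be sketched without any dependencies. The trait shape and the into_items method name below are assumptions, not mosaik's actual code; what the sketch demonstrates is how the Variant marker keeps the two blanket impls from overlapping while letting the compiler pick one at the call site:

```rust
// Zero-sized marker distinguishing the two blanket impls.
#[allow(dead_code)]
struct Variant<const N: usize>;

trait IntoIterOrSingle<T, M> {
    fn into_items(self) -> Vec<T>;
}

// Variant<0>: a single value convertible into T.
impl<T, U: Into<T>> IntoIterOrSingle<T, Variant<0>> for U {
    fn into_items(self) -> Vec<T> {
        vec![self.into()]
    }
}

// Variant<1>: an iterator of values convertible into T.
impl<T, U: Into<T>, I: IntoIterator<Item = U>> IntoIterOrSingle<T, Variant<1>> for I {
    fn into_items(self) -> Vec<T> {
        self.into_iter().map(Into::into).collect()
    }
}

// M is inferred: &str only matches Variant<0>, arrays only Variant<1>.
fn with_tags<M>(tags: impl IntoIterOrSingle<String, M>) -> Vec<String> {
    tags.into_items()
}

fn main() {
    assert_eq!(with_tags("validator"), vec!["validator".to_string()]);
    assert_eq!(with_tags(["validator", "relay"]).len(), 2);
}
```

Because the marker appears in the trait's type parameters, the two impls are formally distinct and coherence is satisfied even though both are blanket impls.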
Internal async primitives
These are pub(crate) and not part of the public API, but understanding them
helps when reading mosaik’s source code.
UnboundedChannel<T>
A wrapper around tokio::sync::mpsc::unbounded_channel that keeps both the
sender and receiver in one struct:
let channel = UnboundedChannel::new();
channel.send(42);
let val = channel.recv().await;
| Method | Description |
|---|---|
sender() | Get &UnboundedSender<T> |
receiver() | Get &mut UnboundedReceiver<T> |
send(T) | Send (errors silently ignored) |
recv() | Async receive |
poll_recv(cx) | Poll-based receive |
poll_recv_many(cx, buf, limit) | Batch poll receive |
is_empty() / len() | Queue inspection |
AsyncWorkQueue<T>
A FuturesUnordered wrapper with a permanently-pending sentinel future so
that polling never returns None:
let queue = AsyncWorkQueue::new();
queue.enqueue(async { do_work().await });
// Poll via Stream trait — never completes while empty
while let Some(result) = queue.next().await {
handle(result);
}
BoxPinFut<T>
Type alias for boxed, pinned, send futures:
type BoxPinFut<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
The InternalFutureExt trait adds a .pin() method to any
Future + Send + 'static for ergonomic boxing.
Testing Guide
This chapter covers how to test mosaik applications and how to work with mosaik’s own test infrastructure when contributing.
Test setup
Dependencies
Add these to your [dev-dependencies]:
[dev-dependencies]
mosaik = { version = "0.2" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Basic test structure
Every mosaik test follows the same pattern:
- Create networks with in-process endpoints.
- Connect them via sync_with (or mDNS for local tests).
- Wait for discovery to propagate.
- Exercise the API.
- Assert state.
#[tokio::test]
async fn two_nodes_discover_each_other() {
let net_a = Network::builder()
.network_id("test")
.build()
.await
.unwrap();
let net_b = Network::builder()
.network_id("test")
.build()
.await
.unwrap();
// Connect the two endpoints directly
net_a.sync_with(net_b.endpoint_addr()).await.unwrap();
// Wait for mutual discovery
let event = net_a.discovery().events().recv().await.unwrap();
assert!(matches!(event, Event::Discovered { .. }));
}
Connecting test networks
sync_with
The simplest way to connect two test nodes:
net_a.sync_with(net_b.endpoint_addr()).await?;
This synchronizes discovery catalogs between the two nodes, establishing mutual awareness.
Discover all (fan-out)
For multi-node tests, connect all pairs:
async fn discover_all(networks: &[&Network]) {
let futs: Vec<_> = networks.iter()
.flat_map(|a| networks.iter().map(move |b| {
a.sync_with(b.endpoint_addr())
}))
.collect();
futures::future::try_join_all(futs).await.unwrap();
}
Mosaik’s test suite provides this as a utility function.
Time management
TIME_FACTOR environment variable
All test durations are multiplied by TIME_FACTOR (default 1.0). This is
useful for running tests on slow CI machines or over high-latency networks:
# Double all timeouts for slow CI
TIME_FACTOR=2.0 cargo test
# 10x for debugging with breakpoints
TIME_FACTOR=10.0 cargo test -- --nocapture
Time helper functions
| Function | Description |
|---|---|
secs(n) | Duration::from_secs(n) × TIME_FACTOR |
millis(n) | Duration::from_millis(n) × TIME_FACTOR |
sleep_s(n) | tokio::time::sleep(secs(n)) |
sleep_ms(n) | tokio::time::sleep(millis(n)) |
timeout_s(n, fut) | Timeout with location tracking |
timeout_ms(n, fut) | Timeout with location tracking |
The timeout_* functions use #[track_caller] so timeout errors report the
test line number, not the utility function.
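A sketch of how such scaling helpers can be built; time_factor and scaled_secs are illustrative names, not mosaik's exact code:

```rust
use std::time::Duration;

// Read TIME_FACTOR from the environment; default to 1.0 when unset
// or unparsable.
fn time_factor() -> f64 {
    std::env::var("TIME_FACTOR")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(1.0)
}

// Scale a nominal duration by a factor, e.g. secs(3) under
// TIME_FACTOR=2.0 becomes 6 seconds.
fn scaled_secs(n: u64, factor: f64) -> Duration {
    Duration::from_secs_f64(n as f64 * factor)
}

fn main() {
    assert_eq!(scaled_secs(3, 2.0), Duration::from_secs(6));
    println!("current factor: {}", time_factor());
}
```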
Tracing
Automatic initialization
Mosaik’s test suite uses #[ctor::ctor] to initialize tracing before any test
runs. Control it with environment variables:
# Enable debug logging
TEST_TRACE=debug cargo test
# Available levels: trace, debug, info, warn, error
TEST_TRACE=trace cargo test -- test_name
# Show all modules (including noisy deps)
TEST_TRACE=debug TEST_TRACE_UNMUTE=1 cargo test
Muted modules
By default, these noisy modules are filtered out:
- iroh, rustls, igd_next, hickory_*
- hyper_util, portmapper, reqwest
- netwatch, mio, acto, swarm_discovery
- events.net.relay.connected
Set TEST_TRACE_UNMUTE=1 to see their output.
Panic handling
The test harness installs a custom panic hook that:
- Logs the panic via tracing::error!.
- Calls std::process::abort() to prevent the test framework from masking the panic in async contexts.
Testing patterns
Waiting for conditions
Use the When API instead of arbitrary sleeps:
// Wait for a group to come online
group.when().online().await;
// Wait for a collection to reach a version
map.when().updated(|v| v.index() >= 10).await;
// Wait for bonds to form
group.when().bonds(|b| b.len() >= 2).await;
Testing state machines
Test your state machine in isolation before running it in a group:
#[test]
fn state_machine_logic() {
let mut sm = Counter::default();
let ctx = ApplyContext {
index: 1,
term: 1,
leader: false,
};
let result = sm.apply(CounterCommand::Increment, ctx);
assert_eq!(result, 0); // returns previous value
let result = sm.apply(CounterCommand::Increment, ctx);
assert_eq!(result, 1);
}
Testing collections
#[tokio::test]
async fn replicated_map() {
let (net_a, net_b) = create_connected_pair().await;
let writer: MapWriter<String, u64> = net_a.collections()
.map_writer("my-store")
.build();
let reader: MapReader<String, u64> = net_b.collections()
.map_reader("my-store")
.build();
// Write on node A
writer.insert("key".into(), 42).await.unwrap();
// Wait for replication on node B
reader.when().updated(|v| v.index() >= 1).await;
// Read on node B
assert_eq!(reader.get(&"key".into()), Some(42));
}
Testing streams
#[tokio::test]
async fn stream_delivery() {
let (net_a, net_b) = create_connected_pair().await;
let producer = net_a.streams()
.producer::<Message>("topic")
.build();
let consumer = net_b.streams()
.consumer::<Message>("topic")
.build();
// Wait for consumer to connect
producer.when().active().await;
// Send and receive
producer.send(Message("hello".into())).await.unwrap();
let msg = consumer.recv().await.unwrap();
assert_eq!(msg.0, "hello");
}
Polling futures in tests
For testing poll-based logic:
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
/// Poll a future exactly once with a no-op waker
fn poll_once<F: Future + Unpin>(f: &mut F) -> Poll<F::Output> {
let waker = futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
Pin::new(f).poll(&mut cx)
}
CI considerations
- Set TIME_FACTOR=2.0 or higher for CI environments.
- Use TEST_TRACE=debug to capture logs on failure.
- Run tests with --test-threads=1 if you encounter port conflicts.
- The test suite uses real networking (loopback), so ensure localhost UDP is available.
Project test structure
tests/
├── basic.rs # Test harness, data types, module declarations
├── collections/ # Map, Vec, Set, DEPQ tests
├── discovery/ # Catalog, departure tests
├── groups/ # Bonds, builder, execute, feed, leader, catchup
├── streams/ # Smoke tests, stats, producer/consumer
└── utils/
├── mod.rs # discover_all helper
├── tracing.rs # Auto-init tracing with ctor
├── time.rs # TIME_FACTOR-aware duration helpers
└── fut.rs # poll_once, forever