Building a Distributed Orderbook
This tutorial walks through the orderbook example — a distributed order-matching engine that combines Streams, Groups, and Raft consensus.
Architecture Overview
The orderbook example demonstrates how multiple mosaik subsystems compose to build a realistic distributed system:
   Traders                    Matchers             Observers
┌────────────┐          ┌──────────────────┐     ┌────────────┐
│  Trader A  │          │    Matcher 0     │     │  Observer  │
│  Producer  │─[Order]─►│    Consumer      │     │  Consumer  │
│  <Order>   │          │    <Order>       │     │  <Fill>    │
└────────────┘          │                  │     └────────────┘
                        │  ┌────────────┐  │           ▲
┌────────────┐          │  │ OrderBook  │  │           │
│  Trader B  │          │  │ (Raft SM)  │  │──[Fill]───┘
│  Producer  │─[Order]─►│  │            │  │
│  <Order>   │          │  └────────────┘  │
└────────────┘          │    Producer      │
                        │    <Fill>        │
                        └──────────────────┘
                        ┌──────────────────┐
                        │    Matcher 1     │
                        │    (replica)     │
                        └──────────────────┘
                        ┌──────────────────┐
                        │    Matcher 2     │
                        │    (replica)     │
                        └──────────────────┘
- Traders produce Order objects via Streams
- Matchers (a 3-node Raft group) consume orders, replicate them through consensus, and run a price-time priority matching engine
- Fill events are published back as a stream for downstream consumers
Step 1: Define the Domain Types
First, define the types that flow through the system. These types auto-implement the Datum trait for streaming:
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TradingPair {
pub base: String,
pub quote: String,
}
impl TradingPair {
pub fn new(base: &str, quote: &str) -> Self {
Self { base: base.to_string(), quote: quote.to_string() }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Side { Bid, Ask }
/// Price in hundredths of the quote currency (1 unit = 0.01), e.g. 300_000 = $3000.00
pub type Price = u64;
pub type Quantity = u64;
/// A limit order — streamable via mosaik Streams
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Order {
pub id: u64,
pub pair: TradingPair,
pub side: Side,
pub price: Price,
pub quantity: Quantity,
pub trader: String,
}
/// A fill produced when orders match
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Fill {
pub bid_order_id: u64,
pub ask_order_id: u64,
pub pair: TradingPair,
pub price: Price,
pub quantity: Quantity,
}
Because Order and Fill derive Serialize + Deserialize, they automatically implement Datum and can be used with mosaik Streams.
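The Price comment above fixes one unit at 0.01 of the quote currency, so 300_000 reads as 3000.00. A minimal sketch of that conversion, using integer arithmetic only; format_price is a hypothetical helper for illustration and is not part of the orderbook example:

```rust
/// Price in hundredths of the quote currency, as in the tutorial's `Price` alias.
pub type Price = u64;

/// Hypothetical helper (not part of the orderbook example): renders a
/// fixed-point price with two decimal places without going through floats.
pub fn format_price(price: Price) -> String {
    format!("{}.{:02}", price / 100, price % 100)
}
```

Keeping prices as integers avoids floating-point rounding, which matters once every replica has to compute exactly the same result.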
Step 2: Implement the State Machine
The heart of the example is the OrderBook state machine. It implements the StateMachine trait so it can be replicated across all group members via Raft:
use mosaik::groups::{ApplyContext, LogReplaySync, StateMachine};
use mosaik::primitives::UniqueId;
use std::collections::BTreeMap;
/// Commands that mutate the orderbook
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderBookCommand {
PlaceOrder(Order),
CancelOrder(u64),
}
/// Read-only queries against the orderbook state
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderBookQuery {
TopOfBook { pair: TradingPair, depth: usize },
Fills,
OrderCount,
}
/// Query results
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderBookQueryResult {
TopOfBook { bids: Vec<(Price, Quantity)>, asks: Vec<(Price, Quantity)> },
Fills(Vec<Fill>),
OrderCount(usize),
}
The StateMachine trait requires four associated types:
- Command: mutations that get replicated through Raft (must be serializable)
- Query: read requests (not replicated)
- QueryResult: responses to queries
- StateSync: mechanism for catching up new nodes
The implementation below provides four methods:
impl StateMachine for OrderBook {
type Command = OrderBookCommand;
type Query = OrderBookQuery;
type QueryResult = OrderBookQueryResult;
type StateSync = LogReplaySync<Self>;
fn signature(&self) -> UniqueId {
// Unique identifier for this state machine type.
// Contributes to GroupId derivation — different state machines
// produce different GroupIds even with the same key.
UniqueId::from("orderbook_state_machine")
}
fn apply(&mut self, command: Self::Command, _ctx: &dyn ApplyContext) {
match command {
OrderBookCommand::PlaceOrder(order) => self.place_order(order),
OrderBookCommand::CancelOrder(id) => self.cancel_order(id),
}
}
fn query(&self, query: Self::Query) -> Self::QueryResult {
match query {
OrderBookQuery::TopOfBook { pair: _, depth } => {
// Return top N price levels
// ...
}
OrderBookQuery::Fills => {
OrderBookQueryResult::Fills(self.fills.clone())
}
OrderBookQuery::OrderCount => {
let count = self.bids.values()
.chain(self.asks.values())
.map(Vec::len).sum();
OrderBookQueryResult::OrderCount(count)
}
}
}
fn state_sync(&self) -> Self::StateSync {
LogReplaySync::default()
}
}
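The TopOfBook arm above is elided. A minimal sketch of how such a query could aggregate price levels, assuming each side of the book is a BTreeMap from Price to a Vec of (order id, remaining quantity, trader) tuples, which is the shape place_order pushes onto the book. The names Resting, BookSide, level_totals and top_of_book are hypothetical:

```rust
use std::collections::BTreeMap;

pub type Price = u64;
pub type Quantity = u64;

/// One resting order: (order id, remaining quantity, trader).
type Resting = (u64, Quantity, String);
/// One side of the book, keyed by price (assumed layout).
type BookSide = BTreeMap<Price, Vec<Resting>>;

/// Takes the first `depth` levels from the iterator and sums the resting
/// quantity at each of them.
fn level_totals<'a>(
    levels: impl Iterator<Item = (&'a Price, &'a Vec<Resting>)>,
    depth: usize,
) -> Vec<(Price, Quantity)> {
    levels
        .take(depth)
        .map(|(price, orders)| {
            let total = orders.iter().map(|(_, qty, _)| *qty).sum::<Quantity>();
            (*price, total)
        })
        .collect()
}

/// Hypothetical helper: best `depth` levels per side. Bids are listed from
/// the highest price down, asks from the lowest price up.
pub fn top_of_book(
    bids: &BookSide,
    asks: &BookSide,
    depth: usize,
) -> (Vec<(Price, Quantity)>, Vec<(Price, Quantity)>) {
    (
        level_totals(bids.iter().rev(), depth),
        level_totals(asks.iter(), depth),
    )
}
```

Because BTreeMap iterates in key order, the best ask is the first entry and the best bid is the last, so neither side needs sorting at query time.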
Key points:
- apply() is called on every node in the same order, which is what Raft guarantees. The matching logic must therefore be deterministic.
- query() is not replicated through Raft. With Consistency::Weak it reads the local replica's state; with Consistency::Strong the query is forwarded to the leader.
- LogReplaySync::default() means new nodes catch up by replaying the entire command log from the beginning.
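The link between deterministic apply() and log replay can be shown with a toy model. This is a sketch of the idea only, not mosaik's LogReplaySync: Command and Book below are simplified, hypothetical stand-ins that track open quantities and do no matching.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for `OrderBookCommand` (hypothetical).
#[derive(Debug, Clone)]
pub enum Command {
    Place { id: u64, quantity: u64 },
    Cancel(u64),
}

/// Toy replicated state: open order id -> quantity.
#[derive(Debug, Default, PartialEq)]
pub struct Book {
    open: BTreeMap<u64, u64>,
}

impl Book {
    /// Deterministic transition: the result depends only on the current
    /// state and the command, never on clocks, randomness or I/O.
    pub fn apply(&mut self, command: &Command) {
        match command {
            Command::Place { id, quantity } => {
                self.open.insert(*id, *quantity);
            }
            Command::Cancel(id) => {
                self.open.remove(id);
            }
        }
    }

    /// Rebuilds state from an empty book by applying the log in order,
    /// which is what a late-joining replica does conceptually.
    pub fn replay(log: &[Command]) -> Self {
        let mut book = Book::default();
        for command in log {
            book.apply(command);
        }
        book
    }
}
```

A replica that applied the commands one by one as they were committed and a replica that replays the same log later end up with equal state, provided apply() stays deterministic.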
Step 3: The Matching Engine
The OrderBook implements price-time priority matching. When a new order arrives, it crosses against the opposite side:
impl OrderBook {
fn match_order(&mut self, order: &Order) -> Quantity {
let mut remaining = order.quantity;
match order.side {
Side::Bid => {
// Bids match against asks at or below the bid price
while remaining > 0 {
let Some((&ask_price, _)) = self.asks.first_key_value()
else { break };
if ask_price > order.price { break; }
let ask_level = self.asks.get_mut(&ask_price).unwrap();
while remaining > 0 && !ask_level.is_empty() {
let (ask_id, ask_qty, _) = &mut ask_level[0];
let fill_qty = remaining.min(*ask_qty);
self.fills.push(Fill {
bid_order_id: order.id,
ask_order_id: *ask_id,
pair: self.pair.clone(),
price: ask_price,
quantity: fill_qty,
});
remaining -= fill_qty;
*ask_qty -= fill_qty;
if *ask_qty == 0 { ask_level.remove(0); }
}
if ask_level.is_empty() {
self.asks.remove(&ask_price);
}
}
}
Side::Ask => {
// Asks match against bids at or above the ask price
// (mirror logic, iterating bids from highest)
// ...
}
}
remaining
}
fn place_order(&mut self, order: Order) {
let remaining = self.match_order(&order);
// Any unfilled quantity rests on the book
if remaining > 0 {
let book = match order.side {
Side::Bid => &mut self.bids,
Side::Ask => &mut self.asks,
};
book.entry(order.price)
.or_default()
.push((order.id, remaining, order.trader));
}
}
}
Because apply() is called in the same order on every replica (guaranteed by Raft), the matching results are identical across all nodes.
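The Side::Ask branch is elided in the listing above. A minimal sketch of the mirrored logic, written as a free function so it stands alone: it walks bids from the highest price down and fills at the resting bid's price, mirroring how the bid branch fills at the resting ask's price. The function name match_ask, the Bids alias and the trimmed-down Fill (without the pair field) are assumptions made for this illustration:

```rust
use std::collections::BTreeMap;

pub type Price = u64;
pub type Quantity = u64;

/// Trimmed-down fill; the tutorial's `Fill` also carries the trading pair.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fill {
    pub bid_order_id: u64,
    pub ask_order_id: u64,
    pub price: Price,
    pub quantity: Quantity,
}

/// Resting bids: price -> (order id, remaining quantity, trader), in arrival order.
pub type Bids = BTreeMap<Price, Vec<(u64, Quantity, String)>>;

/// Crosses an incoming ask against resting bids, best (highest) price first.
/// Returns the quantity left unfilled.
pub fn match_ask(
    bids: &mut Bids,
    fills: &mut Vec<Fill>,
    ask_id: u64,
    ask_price: Price,
    quantity: Quantity,
) -> Quantity {
    let mut remaining = quantity;
    while remaining > 0 {
        // The best bid is the largest key in the map.
        let Some((&bid_price, _)) = bids.last_key_value() else { break };
        if bid_price < ask_price {
            break; // no remaining bid reaches the ask price
        }
        let level = bids.get_mut(&bid_price).unwrap();
        // Within a level, the oldest order (index 0) fills first.
        while remaining > 0 && !level.is_empty() {
            let (bid_id, bid_qty, _) = &mut level[0];
            let fill_qty = remaining.min(*bid_qty);
            fills.push(Fill {
                bid_order_id: *bid_id,
                ask_order_id: ask_id,
                price: bid_price, // the resting order sets the price
                quantity: fill_qty,
            });
            remaining -= fill_qty;
            *bid_qty -= fill_qty;
            if *bid_qty == 0 {
                level.remove(0);
            }
        }
        if level.is_empty() {
            bids.remove(&bid_price);
        }
    }
    remaining
}
```

For example, an ask for 20 units at 300_000 against a book holding bids of 15 at 300_500 and 8 at 299_000 fills 15 at 300_500 and leaves 5 unfilled, because the 299_000 bid does not reach the ask price.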
Step 4: Wire It All Together
The main function creates the network, forms the Raft group, and connects streams:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let network_id = NetworkId::random();
let group_key = GroupKey::random();
let pair = TradingPair::new("ETH", "USDC");
// --- 3 matcher nodes forming a Raft group ---
let matcher0 = Network::new(network_id).await?;
let matcher1 = Network::new(network_id).await?;
let matcher2 = Network::new(network_id).await?;
// Cross-discover all matchers
discover_all([&matcher0, &matcher1, &matcher2]).await?;
// Each joins the same group with an OrderBook state machine
let g0 = matcher0.groups().with_key(group_key)
.with_state_machine(OrderBook::new(pair.clone()))
.join();
let g1 = matcher1.groups().with_key(group_key)
.with_state_machine(OrderBook::new(pair.clone()))
.join();
let g2 = matcher2.groups().with_key(group_key)
.with_state_machine(OrderBook::new(pair.clone()))
.join();
// Wait for consensus to elect a leader
g0.when().online().await;
g1.when().online().await;
g2.when().online().await;
Connecting Traders via Streams
// --- 2 trader nodes producing orders ---
let trader_a = Network::new(network_id).await?;
let trader_b = Network::new(network_id).await?;
discover_all([&trader_a, &trader_b, &matcher0, &matcher1, &matcher2]).await?;
// Traders produce Order streams
let mut orders_a = trader_a.streams().produce::<Order>();
let mut orders_b = trader_b.streams().produce::<Order>();
// Matcher consumes orders
let mut order_consumer = matcher0.streams().consume::<Order>();
order_consumer.when().subscribed().minimum_of(2).await;
Submitting and Matching Orders
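// Trader A: asks (selling ETH)
orders_a.send(Order {
id: 1, pair: pair.clone(), side: Side::Ask,
price: 300_000, quantity: 10, trader: "alice".into(),
}).await?;
orders_a.send(Order {
id: 2, pair: pair.clone(), side: Side::Ask,
price: 301_000, quantity: 5, trader: "alice".into(),
}).await?;
// Trader B: bids (buying ETH)
orders_b.send(Order {
id: 3, pair: pair.clone(), side: Side::Bid,
price: 299_000, quantity: 8, trader: "bob".into(),
}).await?;
orders_b.send(Order {
id: 4, pair: pair.clone(), side: Side::Bid,
price: 300_500, quantity: 15, trader: "bob".into(),
}).await?;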
// Consume orders from stream and execute through Raft
for _ in 0..4 {
let order = order_consumer.next().await
.expect("expected order from stream");
g0.execute(OrderBookCommand::PlaceOrder(order)).await?;
}
Querying Results
// Query fills (strong consistency — goes through leader)
let result = g0.query(OrderBookQuery::Fills, Consistency::Strong).await?;
if let OrderBookQueryResult::Fills(fills) = result.result() {
for fill in fills {
println!("{fill}");
}
}
// Verify replication to followers
g1.when().committed().reaches(g0.committed()).await;
let follower_result = g1.query(
OrderBookQuery::Fills, Consistency::Weak
).await?;
Key patterns demonstrated:
- execute(): sends a command through Raft and waits for it to be committed on a quorum
- query(..., Strong): reads through the leader for linearizable results
- query(..., Weak): reads local state (faster, but may be stale)
- when().committed().reaches(n): waits for a follower to catch up to a specific commit index
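What "committed on a quorum" means for the 3-node group can be made concrete with the general Raft majority rule. This is a sketch of that rule, not of any mosaik API; quorum and tolerated_failures are hypothetical helper names:

```rust
/// Smallest majority of an `n`-member Raft group.
pub fn quorum(n: usize) -> usize {
    n / 2 + 1
}

/// Number of members that can be unreachable while a majority remains.
pub fn tolerated_failures(n: usize) -> usize {
    n.saturating_sub(quorum(n))
}
```

With three matchers the quorum is two, so a command can commit while one matcher is down or lagging. That is also why the example waits for g1 to reach g0's commit index before reading from it with Consistency::Weak.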
The Helper: Cross-Discovery
The example includes a utility to fully connect all nodes:
async fn discover_all(
networks: impl IntoIterator<Item = &Network>,
) -> anyhow::Result<()> {
let networks = networks.into_iter().collect::<Vec<_>>();
for (i, net_i) in networks.iter().enumerate() {
for (j, net_j) in networks.iter().enumerate() {
if i != j {
net_i.discovery().sync_with(net_j.local().addr()).await?;
}
}
}
Ok(())
}
In production, this pairwise sync is unnecessary — a single bootstrap peer handles discovery via gossip. This utility is for testing where all nodes start simultaneously.
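The cost of the pairwise approach grows quadratically, which is one reason it is limited to tests. A small sketch that enumerates the same ordered pairs as the nested loops in discover_all; sync_pairs is a hypothetical name used only for this illustration:

```rust
/// Ordered (i, j) index pairs with i != j, mirroring the nested loops in
/// `discover_all`: each pair corresponds to one `sync_with` call.
pub fn sync_pairs(n: usize) -> Vec<(usize, usize)> {
    (0..n)
        .flat_map(|i| (0..n).filter(move |&j| j != i).map(move |j| (i, j)))
        .collect()
}
```

For the three matchers this yields 6 pairs, and for the five nodes in the second discover_all call it yields 20, that is n * (n - 1) sync calls for n nodes.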
Running the Example
cd examples/orderbook
cargo run
Expected output:
waiting for matching engine group to come online...
matching engine online, leader: <peer-id>
matcher subscribed to both trader order streams
submitting orders...
received order: alice ASK ETH/USDC@300000 qty=10
received order: alice ASK ETH/USDC@301000 qty=5
received order: bob BID ETH/USDC@299000 qty=8
received order: bob BID ETH/USDC@300500 qty=15
1 fills produced:
Fill(bid=4, ask=1, ETH/USDC@300000, qty=10)
follower g1 sees 1 fills (consistent with leader)
orderbook example complete
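The fill line in this output comes from println!("{fill}"), which needs a Display implementation that the tutorial does not show. A sketch of implementations that would produce that format, reconstructed from the output line alone, so the example's actual code may differ. The structs are repeated without their serde derives to keep the block self-contained:

```rust
use std::fmt;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TradingPair {
    pub base: String,
    pub quote: String,
}

#[derive(Debug, Clone)]
pub struct Fill {
    pub bid_order_id: u64,
    pub ask_order_id: u64,
    pub pair: TradingPair,
    pub price: u64,
    pub quantity: u64,
}

/// Renders a pair as BASE/QUOTE, e.g. ETH/USDC.
impl fmt::Display for TradingPair {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.base, self.quote)
    }
}

/// Renders a fill in the format seen in the expected output (assumed).
impl fmt::Display for Fill {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Fill(bid={}, ask={}, {}@{}, qty={})",
            self.bid_order_id, self.ask_order_id, self.pair, self.price, self.quantity
        )
    }
}
```

The single fill is consistent with the four orders: only bob's bid at 300500 crosses alice's ask at 300000, it trades 10 units at the resting ask price, and the remaining 5 units of the bid rest on the book.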
Key Takeaways
- Streams for data ingestion: orders flow from traders to the matching engine via typed pub/sub
- Raft for consensus: the OrderBook state machine runs on all replicas with identical results because Raft guarantees the same command order
- execute() for writes, query() for reads: clean separation between mutations (replicated) and reads (local or forwarded)
- Automatic catch-up: new replicas replay the command log to reach the current state
- Composability: Streams + Groups + StateMachine combine naturally for real distributed applications