Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Status & Conditions

Both producers and consumers expose a reactive status API through When and ChannelConditions. This allows you to write event-driven code that reacts to changes in subscription state.

The When API

Every Producer and Consumer provides a when() method that returns a &When handle:

let producer = network.streams().produce::<MyDatum>();
let when = producer.when();

Online / Offline

// Resolves when the channel is ready (all publishing conditions met)
when.online().await;

// Resolves when the channel goes offline
when.offline().await;

// Check immediately
if when.is_online() { /* ... */ }

For producers, “online” means the online_when conditions are met (default: at least 1 consumer). For consumers, “online” means at least one producer connection is active.

Subscribed / Unsubscribed

// Resolves when at least one peer is connected
when.subscribed().await;

// Resolves when zero peers are connected
when.unsubscribed().await;

ChannelConditions

when().subscribed() returns a ChannelConditions — a composable future that resolves when the subscription state matches your criteria.

Minimum Peers

// Wait for at least 3 connected peers
when.subscribed().minimum_of(3).await;

Tag Filtering

// Wait for peers that have the "validator" tag
when.subscribed().with_tags("validator").await;

// Multiple tags (all must be present on the peer)
when.subscribed().with_tags(["validator", "us-east"]).await;

Custom Predicates

// Wait for a peer matching an arbitrary condition
when.subscribed()
    .with_predicate(|peer: &PeerEntry| peer.tags().len() >= 3)
    .await;

Combining Conditions

Conditions compose naturally:

// At least 2 validators from the us-east region
when.subscribed()
    .minimum_of(2)
    .with_tags("validator")
    .with_predicate(|p| p.tags().contains(&"us-east".into()))
    .await;

Inverse Conditions

Use unmet() to invert a condition:

// Wait until there are fewer than 3 validators
when.subscribed()
    .minimum_of(3)
    .with_tags("validator")
    .unmet()
    .await;

Re-Triggering

ChannelConditions implements Future and can be polled repeatedly. After resolving, it resets and will resolve again when the condition transitions from not met → met. This makes it suitable for use in loops:

let mut condition = when.subscribed().minimum_of(2);
loop {
    (&mut condition).await;
    println!("We now have 2+ subscribers again!");
}

Checking Immediately

let condition = when.subscribed().minimum_of(3);
if condition.is_condition_met() {
    // There are currently 3+ matching peers
}

ChannelConditions also supports comparison with bool:

if when.subscribed().minimum_of(3) == true {
    // equivalent to is_condition_met()
}

ChannelInfo

ChannelInfo represents an individual connection between a consumer and a producer:

pub struct ChannelInfo {
    stream_id: StreamId,
    criteria: Criteria,
    producer_id: PeerId,
    consumer_id: PeerId,
    stats: Arc<Stats>,
    peer: Arc<PeerEntry>,
    state: watch::Receiver<State>,
}

Connection State

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Connecting,   // Handshake in progress
    Connected,    // Active subscription
    Terminated,   // Unrecoverably closed
}

Monitor state changes:

let info: ChannelInfo = /* from producers() or consumers() */;

// Check current state
match info.state() {
    State::Connected => { /* active */ }
    State::Connecting => { /* handshake in progress */ }
    State::Terminated => { /* closed */ }
}

// Watch for state transitions
let mut watcher = info.state_watcher().clone();
while watcher.changed().await.is_ok() {
    println!("State changed to: {:?}", *watcher.borrow());
}

// Wait for disconnection
info.disconnected().await;

Stats

Every channel tracks real-time statistics:

let stats = info.stats();

println!("Datums: {}", stats.datums());   // Total datums transferred
println!("Bytes: {}", stats.bytes());     // Total bytes transferred
println!("Uptime: {:?}", stats.uptime()); // Option<Duration> since connection

// Display formatting included
println!("{}", stats);
// Output: "uptime: 2m 30s, datums: 15432, bytes: 1.23 MB"

Stats implements Display with human-readable formatting using humansize and humantime.

Using Status for Online Conditions

The ChannelConditions type is also used to configure producer online conditions via the builder:

let producer = network.streams()
    .producer::<MyDatum>()
    .online_when(|c| c.minimum_of(2).with_tags("validator"))
    .build()?;

The closure receives a ChannelConditions and returns it with your conditions applied. This is evaluated every time the subscription set changes.