PriorityQueue

PriorityQueue<P, K, V> is a replicated, eventually consistent double-ended priority queue (DEPQ). Each entry has a priority (P), a unique key (K), and a value (V). It supports efficient access to both the minimum and maximum priority elements, key-based lookups, priority updates, and range removals.

Internally it maintains two indexes:

  • by_key: im::HashMap<K, (P, V)> — O(log n) key lookups
  • by_priority: im::OrdMap<P, im::HashMap<K, V>> — O(log n) min/max and range operations

Both indexes use deterministic hashers for consistent iteration order across nodes.
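
To make the two-index layout concrete, here is a minimal single-node sketch. It substitutes the standard library's HashMap and BTreeMap for im::HashMap and im::OrdMap, and fixes P = u64, K = String, V = String. The type LocalDepq and its methods are hypothetical illustrations, not mosaik's implementation; replication and deterministic hashing are left out.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical single-node model of the two indexes (not mosaik's code).
#[derive(Default)]
struct LocalDepq {
    by_key: HashMap<String, (u64, String)>,
    by_priority: BTreeMap<u64, HashMap<String, String>>,
}

impl LocalDepq {
    /// Insert or update: an existing key is first unlinked from its old bucket,
    /// so a key never appears under two priorities.
    fn insert(&mut self, priority: u64, key: &str, value: &str) {
        self.unlink(key);
        self.by_key
            .insert(key.to_string(), (priority, value.to_string()));
        self.by_priority
            .entry(priority)
            .or_default()
            .insert(key.to_string(), value.to_string());
    }

    /// Remove a key from both indexes, dropping its bucket if it becomes empty.
    fn unlink(&mut self, key: &str) -> Option<(u64, String)> {
        let (priority, value) = self.by_key.remove(key)?;
        let now_empty = match self.by_priority.get_mut(&priority) {
            Some(bucket) => {
                bucket.remove(key);
                bucket.is_empty()
            }
            None => false,
        };
        if now_empty {
            self.by_priority.remove(&priority);
        }
        Some((priority, value))
    }

    /// Lowest priority: the first key of the ordered index.
    fn min_priority(&self) -> Option<u64> {
        self.by_priority.keys().next().copied()
    }

    /// Highest priority: the last key of the ordered index.
    fn max_priority(&self) -> Option<u64> {
        self.by_priority.keys().next_back().copied()
    }
}
```

Dropping empty buckets keeps the first and last keys of the ordered index equal to the true minimum and maximum, which is what makes both ends cheap to read in this model.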

Construction

use mosaik::collections::{PriorityQueue, StoreId, SyncConfig};

// Writer — can read and write
let pq = PriorityQueue::<u64, String, Order>::writer(
    &network,
    StoreId::from("orderbook"),
);

// Writer with custom sync config
let pq = PriorityQueue::<u64, String, Order>::writer_with_config(
    &network, store_id, config,
);

// Reader — read-only, deprioritized for leadership
let pq = PriorityQueue::<u64, String, Order>::reader(&network, store_id);

// Reader with custom sync config
let pq = PriorityQueue::<u64, String, Order>::reader_with_config(
    &network, store_id, config,
);

// Aliases
let pq = PriorityQueue::<u64, String, Order>::new(&network, store_id);
let pq = PriorityQueue::<u64, String, Order>::new_with_config(
    &network, store_id, config,
);

Read operations

Available on both writers and readers.

| Method | Time | Description |
| --- | --- | --- |
| len() -> usize | O(1) | Number of entries |
| is_empty() -> bool | O(1) | Whether the queue is empty |
| contains_key(&K) -> bool | O(log n) | Test if a key exists |
| get(&K) -> Option<V> | O(log n) | Get the value for a key |
| get_priority(&K) -> Option<P> | O(log n) | Get the priority for a key |
| get_min() -> Option<(P, K, V)> | O(log n) | Entry with the lowest priority |
| get_max() -> Option<(P, K, V)> | O(log n) | Entry with the highest priority |
| min_priority() -> Option<P> | O(log n) | Lowest priority value |
| max_priority() -> Option<P> | O(log n) | Highest priority value |
| iter() -> impl Iterator<Item = (P, K, V)> | | Ascending priority order (alias for iter_asc) |
| iter_asc() -> impl Iterator<Item = (P, K, V)> | | Ascending priority order |
| iter_desc() -> impl Iterator<Item = (P, K, V)> | | Descending priority order |
| version() -> Version | O(1) | Current committed state version |
| when() -> &When | O(1) | Access the state observer |

When multiple entries share the same priority, get_min() and get_max() return an arbitrary entry from that priority bucket.

// Peek at extremes
if let Some((priority, key, value)) = pq.get_min() {
    println!("Best bid: {key} at priority {priority}");
}

// Look up by key
let price = pq.get_priority(&"order-42".into());

// Iterate in order
for (priority, key, value) in pq.iter_desc() {
    println!("{priority}: {key} = {value:?}");
}

Write operations

Only available on PriorityQueueWriter<P, K, V>.

| Method | Description |
| --- | --- |
| insert(P, K, V) -> Result<Version, Error<(P, K, V)>> | Insert or update an entry |
| extend(impl IntoIterator<Item = (P, K, V)>) -> Result<Version, Error<Vec<(P, K, V)>>> | Batch insert |
| update_priority(&K, P) -> Result<Version, Error<K>> | Change priority of an existing key |
| update_value(&K, V) -> Result<Version, Error<K>> | Change value of an existing key |
| compare_exchange_value(&K, V, Option<V>) -> Result<Version, Error<K>> | Atomic compare-and-swap on value |
| remove(&K) -> Result<Version, Error<K>> | Remove by key |
| remove_range(impl RangeBounds<P>) -> Result<Version, Error<()>> | Remove all entries in a priority range |
| clear() -> Result<Version, Error<()>> | Remove all entries |

If insert is called with a key that already exists, both its priority and value are updated. update_priority and update_value are no-ops if the key doesn’t exist (they still commit to the log).

// Insert
let v = pq.insert(100, "order-1".into(), order).await?;

// Batch insert
let v = pq.extend([
    (100, "order-1".into(), order1),
    (200, "order-2".into(), order2),
]).await?;

// Update just the priority
pq.update_priority(&"order-1".into(), 150).await?;

// Update just the value
pq.update_value(&"order-1".into(), new_order).await?;

// Atomic compare-and-swap on value (priority is preserved)
let v = pq.compare_exchange_value(
    &"order-1".into(),
    order1,        // expected current value
    Some(updated), // new value
).await?;

// Compare-and-swap to remove: expected matches, new is None
let v = pq.compare_exchange_value(
    &"order-1".into(),
    updated,       // expected current value
    None,          // removes the entry
).await?;

// Remove a single entry
pq.remove(&"order-2".into()).await?;

// Remove all entries with priority below 50
pq.remove_range(..50u64).await?;

// Remove entries in a range
pq.remove_range(10..=20).await?;

// Clear everything
pq.clear().await?;

Range syntax

remove_range accepts any RangeBounds<P>, so all standard Rust range syntaxes work:

| Syntax | Meaning |
| --- | --- |
| ..cutoff | Priorities below cutoff |
| ..=cutoff | Priorities at or below cutoff |
| cutoff.. | Priorities at or above cutoff |
| lo..hi | Priorities in [lo, hi) |
| lo..=hi | Priorities in [lo, hi] |
| .. | All (equivalent to clear()) |
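
The bound semantics in the table can be checked locally with the standard library's RangeBounds trait. The sketch below is a standalone analogue over a plain BTreeMap, not the replicated queue. The names remove_range_local and sample are hypothetical, and each priority maps to a single key for brevity.

```rust
use std::collections::BTreeMap;
use std::ops::RangeBounds;

/// Hypothetical local analogue of `remove_range` over an ordered map of
/// priority -> key. Returns how many entries were removed.
fn remove_range_local<R: RangeBounds<u64>>(
    by_priority: &mut BTreeMap<u64, &'static str>,
    range: R,
) -> usize {
    let before = by_priority.len();
    // Keep only the entries whose priority falls outside the range.
    by_priority.retain(|priority, _| !range.contains(priority));
    before - by_priority.len()
}

/// Four entries with priorities 5, 10, 20 and 50.
fn sample() -> BTreeMap<u64, &'static str> {
    BTreeMap::from([(5, "a"), (10, "b"), (20, "c"), (50, "d")])
}
```

With the sample data, ..10u64 removes only priority 5, ..=10u64 removes 5 and 10, and .. removes everything.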

Compare-and-swap semantics

compare_exchange_value atomically checks the value of an existing entry and only applies the mutation if it matches the expected parameter. Unlike compare_exchange on Map and Register, this method operates only on the value — the entry’s priority is always preserved.

  • key: The key of the entry to operate on (must already exist).
  • expected: The expected current value (type V, not Option<V> — the key must exist for the exchange to succeed).
  • new: The replacement value. Some(v) updates the value in-place; None removes the entire entry.

If the key does not exist or its current value does not match expected, the operation is a no-op — it commits to the Raft log but does not change the queue.

Note: The entry’s priority is never changed by compare_exchange_value. To atomically update priorities, use update_priority instead.
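
The rules above can be modeled on a plain map to see each outcome. This is a sketch of the documented semantics only, assuming K = String, V = String and P = u64. The function compare_exchange_value_local is hypothetical and does not model the Raft log or the by_priority index.

```rust
use std::collections::HashMap;

/// Hypothetical local model of the documented rules; `by_key` maps
/// key -> (priority, value). Returns true if the exchange was applied.
fn compare_exchange_value_local(
    by_key: &mut HashMap<String, (u64, String)>,
    key: &str,
    expected: &str,
    new: Option<&str>,
) -> bool {
    // Missing key or mismatched current value: no-op.
    match by_key.get(key) {
        Some((_, current)) if current.as_str() == expected => {}
        _ => return false,
    }
    match new {
        // Some(v): replace the value in place; the priority is untouched.
        Some(v) => {
            if let Some(entry) = by_key.get_mut(key) {
                entry.1 = v.to_string();
            }
        }
        // None: remove the entire entry.
        None => {
            by_key.remove(key);
        }
    }
    true
}
```

In the real queue the no-op cases still return a committed Version, so a caller that needs to know whether the exchange took effect has to read the entry back afterwards.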

Error handling

Same pattern as other collections — failed values are returned for retry:

match pq.insert(priority, key, value).await {
    Ok(version) => { /* committed */ }
    Err(Error::Offline((priority, key, value))) => {
        // Retry later
    }
    Err(Error::NetworkDown) => {
        // Permanent failure
    }
}

Status & observation

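// Wait until the queue is online
pq.when().online().await;

// Wait until the local state reaches the version returned by a write
let v = pq.insert(100, "k".into(), val).await?;
pq.when().reaches(v).await;

// Wait for the next state update
pq.when().updated().await;

// Wait until the queue goes offline
pq.when().offline().await;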

Dual-index architecture

The DEPQ maintains two synchronized indexes:

by_key:      HashMap<K, (P, V)>     ← key lookups, membership tests
by_priority: OrdMap<P, HashMap<K, V>> ← min/max, range ops, ordered iteration

When a key is inserted or updated, both indexes are updated atomically within the state machine’s apply_batch. During snapshot sync, only the by_key index is serialized; the by_priority index is reconstructed on the receiving side during append.
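
Because every entry in by_key already carries its priority, the priority index is fully derivable from it, which is why the snapshot does not need to ship both. A minimal sketch of that reconstruction is shown here with standard-library maps. The helper rebuild_by_priority is hypothetical and fixes P = u64, K = String, V = String; the actual append path may differ.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical helper: derive the priority index from the key index.
fn rebuild_by_priority(
    by_key: &HashMap<String, (u64, String)>,
) -> BTreeMap<u64, HashMap<String, String>> {
    let mut by_priority: BTreeMap<u64, HashMap<String, String>> = BTreeMap::new();
    for (key, (priority, value)) in by_key {
        // Group entries into one bucket per distinct priority.
        by_priority
            .entry(*priority)
            .or_default()
            .insert(key.clone(), value.clone());
    }
    by_priority
}
```

The trade-off in this model is a smaller snapshot in exchange for one pass over all entries on the receiving side.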

Group identity

UniqueId::from("mosaik_collections_depq")
    .derive(store_id)
    .derive(type_name::<P>())
    .derive(type_name::<K>())
    .derive(type_name::<V>())