Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions crates/data-chain/src/primary/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub enum PrimaryCommand {
ConsensusDecided {
/// The height that was decided
height: u64,
/// The Cut that was decided (used to sync positions)
cut: Cut,
},
}

Expand Down Expand Up @@ -140,12 +142,16 @@ impl PrimaryHandle {
/// Notify the Primary that consensus has decided on a height
///
/// This triggers the Primary to advance its state and continue producing cuts.
/// The Cut is passed so that the Primary can sync its position tracking with
/// the authoritative decided state - this ensures validators that missed some
/// CARs during collection still have consistent position tracking.
pub async fn notify_decision(
&self,
height: u64,
cut: Cut,
) -> Result<(), mpsc::error::SendError<PrimaryCommand>> {
self.command_sender
.send(PrimaryCommand::ConsensusDecided { height })
.send(PrimaryCommand::ConsensusDecided { height, cut })
.await
}
}
Expand Down Expand Up @@ -449,13 +455,19 @@ impl Primary {
/// Handle commands from external sources (e.g., node)
async fn handle_command(&mut self, cmd: PrimaryCommand) {
match cmd {
PrimaryCommand::ConsensusDecided { height } => {
PrimaryCommand::ConsensusDecided { height, cut } => {
debug!(
height,
validator = %self.config.validator_id,
cut_cars = cut.cars.len(),
"Received consensus decision notification"
);

// CRITICAL: Sync position tracking from the decided Cut BEFORE advancing state
// This ensures validators that missed some CARs during collection still have
// consistent position tracking for subsequent heights
self.state.sync_positions_from_cut(&cut);

// Advance state to allow producing cuts for the next height
self.state.finalize_height(height);

Expand Down
22 changes: 22 additions & 0 deletions crates/data-chain/src/primary/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::attestation::{AggregatedAttestation, Attestation};
use crate::batch::BatchDigest;
use crate::car::Car;
use crate::cut::Cut;
use cipherbft_types::{Hash, ValidatorId};
use std::collections::HashMap;
use std::time::Instant;
Expand Down Expand Up @@ -215,6 +216,27 @@ impl PrimaryState {
self.last_seen_car_hashes.insert(validator, car_hash);
}

/// Sync position tracking from a decided Cut
///
/// When consensus decides on a Cut, all validators must update their position
/// tracking to reflect the decided state. This is critical because:
/// 1. A validator may not have received all CARs during the collection phase
/// 2. Position validation requires sequential positions (no gaps)
/// 3. Without syncing, future CARs will be rejected with PositionGap errors
///
/// This method updates `last_seen_positions` and `last_seen_car_hashes` for
/// each CAR in the decided Cut if the position is higher than what we've seen.
pub fn sync_positions_from_cut(&mut self, cut: &Cut) {
for (validator, car) in &cut.cars {
let current_pos = self.last_seen_positions.get(validator).copied();
// Only update if the decided position is higher than our current tracking
if current_pos.is_none_or(|p| car.position > p) {
self.last_seen_positions.insert(*validator, car.position);
self.last_seen_car_hashes.insert(*validator, car.hash());
}
}
}

/// Get last seen Car hash for parent_ref validation
pub fn last_seen_car_hash(&self, validator: &ValidatorId) -> Option<&Hash> {
self.last_seen_car_hashes.get(validator)
Expand Down
3 changes: 2 additions & 1 deletion crates/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,8 +1030,9 @@ impl Node {

// Notify Primary that consensus has decided on this height (only when DCL enabled)
// This allows Primary to advance its state and produce cuts for the next height
// CRITICAL: We pass the cut so Primary can sync position tracking from decided CARs
if let Some(ref mut handle) = primary_handle {
if let Err(e) = handle.notify_decision(height.0).await {
if let Err(e) = handle.notify_decision(height.0, cut.clone()).await {
warn!("Failed to notify Primary of consensus decision: {:?}", e);
}
}
Expand Down