diff --git a/crates/data-chain/src/primary/runner.rs b/crates/data-chain/src/primary/runner.rs index 721ce98..579e1ef 100644 --- a/crates/data-chain/src/primary/runner.rs +++ b/crates/data-chain/src/primary/runner.rs @@ -55,6 +55,8 @@ pub enum PrimaryCommand { ConsensusDecided { /// The height that was decided height: u64, + /// The Cut that was decided (used to sync positions) + cut: Cut, }, } @@ -140,12 +142,16 @@ impl PrimaryHandle { /// Notify the Primary that consensus has decided on a height /// /// This triggers the Primary to advance its state and continue producing cuts. + /// The Cut is passed so that the Primary can sync its position tracking with + /// the authoritative decided state - this ensures validators that missed some + /// CARs during collection still have consistent position tracking. pub async fn notify_decision( &self, height: u64, + cut: Cut, ) -> Result<(), mpsc::error::SendError> { self.command_sender - .send(PrimaryCommand::ConsensusDecided { height }) + .send(PrimaryCommand::ConsensusDecided { height, cut }) .await } } @@ -449,13 +455,19 @@ impl Primary { /// Handle commands from external sources (e.g., node) async fn handle_command(&mut self, cmd: PrimaryCommand) { match cmd { - PrimaryCommand::ConsensusDecided { height } => { + PrimaryCommand::ConsensusDecided { height, cut } => { debug!( height, validator = %self.config.validator_id, + cut_cars = cut.cars.len(), "Received consensus decision notification" ); + // CRITICAL: Sync position tracking from the decided Cut BEFORE advancing state + // This ensures validators that missed some CARs during collection still have + // consistent position tracking for subsequent heights + self.state.sync_positions_from_cut(&cut); + // Advance state to allow producing cuts for the next height self.state.finalize_height(height); diff --git a/crates/data-chain/src/primary/state.rs b/crates/data-chain/src/primary/state.rs index d7c646c..1b51218 100644 --- a/crates/data-chain/src/primary/state.rs +++ b/crates/data-chain/src/primary/state.rs @@ -3,6 +3,7 @@ use crate::attestation::{AggregatedAttestation, Attestation}; use crate::batch::BatchDigest; use crate::car::Car; +use crate::cut::Cut; use cipherbft_types::{Hash, ValidatorId}; use std::collections::HashMap; use std::time::Instant; @@ -215,6 +216,27 @@ impl PrimaryState { self.last_seen_car_hashes.insert(validator, car_hash); } + /// Sync position tracking from a decided Cut + /// + /// When consensus decides on a Cut, all validators must update their position + /// tracking to reflect the decided state. This is critical because: + /// 1. A validator may not have received all CARs during the collection phase + /// 2. Position validation requires sequential positions (no gaps) + /// 3. Without syncing, future CARs will be rejected with PositionGap errors + /// + /// This method updates `last_seen_positions` and `last_seen_car_hashes` for + /// each CAR in the decided Cut if the position is higher than what we've seen. + pub fn sync_positions_from_cut(&mut self, cut: &Cut) { + for (validator, car) in &cut.cars { + let current_pos = self.last_seen_positions.get(validator).copied(); + // Only update if the decided position is higher than our current tracking + if current_pos.is_none_or(|p| car.position > p) { + self.last_seen_positions.insert(*validator, car.position); + self.last_seen_car_hashes.insert(*validator, car.hash()); + } + } + } + /// Get last seen Car hash for parent_ref validation pub fn last_seen_car_hash(&self, validator: &ValidatorId) -> Option<&Hash> { self.last_seen_car_hashes.get(validator) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 52bdd7e..2764ed7 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -1030,8 +1030,9 @@ impl Node { // Notify Primary that consensus has decided on this height (only when DCL enabled) // This allows Primary to advance its state and produce cuts for the next height + // CRITICAL: We pass the cut so Primary can sync position tracking from decided CARs if let Some(ref mut handle) = primary_handle { - if let Err(e) = handle.notify_decision(height.0).await { + if let Err(e) = handle.notify_decision(height.0, cut.clone()).await { warn!("Failed to notify Primary of consensus decision: {:?}", e); } }