From fc7c50b9930b3a4172188c1a591f47777acae37f Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 30 Nov 2025 22:02:43 +0200 Subject: [PATCH 01/15] Move address prefix index to the txstore db It makes more sense there, since it doesn't depend on any of the data added to the txstore in the first `add` stage. And it needs to be there, for the followup commit that assumes all entries in the history db can be safely deleted when undoing blocks. --- doc/schema.md | 6 +++--- src/new_index/schema.rs | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/doc/schema.md b/doc/schema.md index 4875cb4df..a04a312c9 100644 --- a/doc/schema.md +++ b/doc/schema.md @@ -33,9 +33,10 @@ Each transaction results in the following new rows: * `"C{txid}{confirmed-blockhash}" → ""` (a list of blockhashes where `txid` was seen to be confirmed) -Each output results in the following new row: +Each output results in the following new rows: * `"O{txid}{vout}" → "{scriptpubkey}{value}"` + * `"a{funding-address-str}" → ""` (for prefix address search, only saved when `--address-search` is enabled) When the indexer is synced up to the tip of the chain, the hash of the tip is saved as following: @@ -43,10 +44,9 @@ When the indexer is synced up to the tip of the chain, the hash of the tip is sa ### `history` -Each funding output (except for provably unspendable ones when `--index-unspendables` is not enabled) results in the following new rows (`H` is for history, `F` is for funding): +Each funding output (except for provably unspendable ones when `--index-unspendables` is not enabled) results in the following new row (`H` is for history, `F` is for funding): * `"H{funding-scripthash}{funding-height}F{funding-txid:vout}{value}" → ""` - * `"a{funding-address-str}" → ""` (for prefix address search, only saved when `--address-search` is enabled) Each spending input (except the coinbase) results in the following new rows (`S` is for spending): diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 7fb74e825..773431ca3 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -746,7 +746,7 @@ impl ChainQuery { pub fn address_search(&self, prefix: &str, limit: usize) -> Vec { let _timer_scan = self.start_timer("address_search"); self.store - .history_db + .txstore_db .iter_scan(&addr_search_filter(prefix)) .take(limit) .map(|row| std::str::from_utf8(&row.key[1..]).unwrap().to_string()) @@ -1026,6 +1026,12 @@ fn add_transaction( if is_spendable(txo) { rows.push(TxOutRow::new(&txid, txo_index, txo).into_row()); } + + if iconfig.address_search { + if let Some(row) = addr_search_row(&txo.script_pubkey, iconfig.network) { + rows.push(row); + } + } } } @@ -1109,12 +1115,6 @@ fn index_transaction( }), ); rows.push(history.into_row()); - - if iconfig.address_search { - if let Some(row) = addr_search_row(&txo.script_pubkey, iconfig.network) { - rows.push(row); - } - } } } for (txi_index, txi) in tx.input.iter().enumerate() { From 205317671a361817fa705179bdb8b76449a0fdd0 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 30 Nov 2025 22:02:43 +0200 Subject: [PATCH 02/15] Handle reorgs by undoing history DB entries created by stale blocks Prior to this change, history index entries created by stale blocks would remain in the history DB and only get discarded at read time. This change explicitly removes history entries when a reorg occurs, so we can assume all indexed entries correspond to blocks currently still part of the best chain. 
This enables optimizing some db lookups (in the followup commits), since readers no longer need to account for stale entries. (Note schema.md was only corrected to match the existing schema, 'D' rows were already being kept for both the history and txstore dbs.) --- doc/schema.md | 6 ++- src/elements/asset.rs | 2 +- src/new_index/db.rs | 16 +++++- src/new_index/schema.rs | 115 ++++++++++++++++++++++++++++++++++------ src/util/block.rs | 46 +++++++++++----- 5 files changed, 153 insertions(+), 32 deletions(-) diff --git a/doc/schema.md b/doc/schema.md index a04a312c9..721418ba0 100644 --- a/doc/schema.md +++ b/doc/schema.md @@ -25,7 +25,7 @@ Each block results in the following new rows: * `"M{blockhash}" → "{metadata}"` (block weight, size and number of txs) - * `"D{blockhash}" → ""` (signifies the block is done processing) + * `"D{blockhash}" → ""` (signifies the block was added) Each transaction results in the following new rows: @@ -54,6 +54,10 @@ Each spending input (except the coinbase) results in the following new rows (`S` * `"S{funding-txid:vout}{spending-txid:vin}" → ""` +Each block results in the following new row: + + * `"D{blockhash}" → ""` (signifies the block was indexed) + #### Elements only Assets (re)issuances results in the following new rows (only for user-issued assets): diff --git a/src/elements/asset.rs b/src/elements/asset.rs index 726431b54..aa36c782e 100644 --- a/src/elements/asset.rs +++ b/src/elements/asset.rs @@ -509,7 +509,7 @@ where // save updated stats to cache if let Some(lastblock) = lastblock { - chain.store().cache_db().write( + chain.store().cache_db().write_rows( vec![asset_cache_row(asset_id, &newstats, &lastblock)], DBFlush::Enable, ); diff --git a/src/new_index/db.rs b/src/new_index/db.rs index e889aad63..5888e8d20 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -170,7 +170,7 @@ impl DB { } } - pub fn write(&self, mut rows: Vec, flush: DBFlush) { + pub fn write_rows(&self, mut rows: Vec, flush: DBFlush) { log::trace!( "writing {} rows to {:?}, flush={:?}", rows.len(), @@ -182,6 +182,20 @@ impl DB { for row in rows { batch.put(&row.key, &row.value); } + self.write_batch(batch, flush) + } + + pub fn delete_rows(&self, mut rows: Vec, flush: DBFlush) { + log::trace!("deleting {} rows from {:?}", rows.len(), self.db,); + rows.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + let mut batch = rocksdb::WriteBatch::default(); + for row in rows { + batch.delete(&row.key); + } + self.write_batch(batch, flush) + } + + fn write_batch(&self, batch: rocksdb::WriteBatch, flush: DBFlush) { let do_flush = match flush { DBFlush::Enable => true, DBFlush::Disable => false, diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 773431ca3..7464e3daa 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -75,8 +75,21 @@ impl Store { cache_db.start_stats_exporter(Arc::clone(&db_metrics), "cache_db"); let headers = if let Some(tip_hash) = txstore_db.get(b"t") { - let tip_hash = deserialize(&tip_hash).expect("invalid chain tip in `t`"); + let mut tip_hash = deserialize(&tip_hash).expect("invalid chain tip in `t`"); let headers_map = load_blockheaders(&txstore_db); + + // Move the tip back until we reach a block that is indexed in the history db. + // It is possible for the tip recorded under the db "t" key to be un-indexed if electrs + // shuts down during reorg handling. 
Normally this wouldn't matter because the non-indexed + // block would be stale, but it could matter if the chain later re-orged back to + // include the previously stale block because more blocks were built on top of it. + // Without this, the stale-then-not-stale block(s) would not get re-indexed correctly. + while !indexed_blockhashes.contains(&tip_hash) { + tip_hash = headers_map + .get(&tip_hash) + .expect("invalid header chain") + .prev_blockhash; + } debug!( "{} headers were loaded, tip at {:?}", headers_map.len(), @@ -259,22 +272,56 @@ impl Indexer { db.enable_auto_compaction(); } - fn get_new_headers(&self, daemon: &Daemon, tip: &BlockHash) -> Result> { - let headers = self.store.indexed_headers.read().unwrap(); - let new_headers = daemon.get_new_headers(&headers, &tip)?; - let result = headers.order(new_headers); - - if let Some(tip) = result.last() { - info!("{:?} ({} left to index)", tip, result.len()); - }; - Ok(result) + fn get_new_headers( + &self, + daemon: &Daemon, + tip: &BlockHash, + ) -> Result<(Vec, Option)> { + let indexed_headers = self.store.indexed_headers.read().unwrap(); + let raw_new_headers = daemon.get_new_headers(&indexed_headers, &tip)?; + let (new_headers, reorged_since) = indexed_headers.preprocess(raw_new_headers); + + if let Some(tip) = new_headers.last() { + info!("{:?} ({} left to index)", tip, new_headers.len()); + } + Ok((new_headers, reorged_since)) } pub fn update(&mut self, daemon: &Daemon) -> Result { let daemon = daemon.reconnect()?; let tip = daemon.getbestblockhash()?; - let new_headers = self.get_new_headers(&daemon, &tip)?; + let (new_headers, reorged_since) = self.get_new_headers(&daemon, &tip)?; + + // Handle reorgs by undoing the reorged (stale) blocks first + if let Some(reorged_since) = reorged_since { + // Remove reorged headers from the in-memory HeaderList. + // This will also immediately invalidate all the history db entries originating from those blocks + // (even before the rows are deleted below), since they reference block heights that will no longer exist. + // This ensures consistency - it is not possible for blocks to be available (e.g. in GET /blocks/tip or /block/:hash) + // without the corresponding history entries for these blocks (e.g. in GET /address/:address/txs), or vice-versa. + let reorged_headers = self + .store + .indexed_headers + .write() + .unwrap() + .pop(reorged_since); + // The chain tip will temporarily drop to the common ancestor (at height reorged_since-1), + // until the new headers are `append()`ed (below). + + info!( + "processing reorg of depth {} since height {}", + reorged_headers.len(), + reorged_since, + ); + + // Fetch the reorged blocks, then undo their history index db rows. + // The txstore db rows are kept for reorged blocks/transactions. + start_fetcher(self.from, &daemon, reorged_headers)? 
+ .map(|blocks| self.undo_index(&blocks)); + } + + // Add new blocks to the txstore db let to_add = self.headers_to_add(&new_headers); debug!( "adding transactions from {} blocks using {:?}", @@ -284,6 +331,7 @@ impl Indexer { start_fetcher(self.from, &daemon, to_add)?.map(|blocks| self.add(&blocks)); self.start_auto_compactions(&self.store.txstore_db); + // Index new blocks to the history db let to_index = self.headers_to_index(&new_headers); debug!( "indexing history from {} blocks using {:?}", @@ -301,12 +349,14 @@ impl Indexer { self.flush = DBFlush::Enable; } - // update the synced tip *after* the new data is flushed to disk + // Update the synced tip after all db writes are flushed debug!("updating synced tip to {:?}", tip); self.store.txstore_db.put_sync(b"t", &serialize(&tip)); + // Finally, append the new headers to the in-memory HeaderList. + // This will make both the headers and the history entries visible in the public APIs, consistently with each-other. let mut headers = self.store.indexed_headers.write().unwrap(); - headers.apply(new_headers); + headers.append(new_headers); assert_eq!(tip, *headers.tip()); if let FetchFrom::BlkFiles = self.from { @@ -326,7 +376,7 @@ impl Indexer { }; { let _timer = self.start_timer("add_write"); - self.store.txstore_db.write(rows, self.flush); + self.store.txstore_db.write_rows(rows, self.flush); } self.store @@ -337,6 +387,37 @@ impl Indexer { } fn index(&self, blocks: &[BlockEntry]) { + self.store + .history_db + .write_rows(self._index(blocks), self.flush); + + let mut indexed_blockhashes = self.store.indexed_blockhashes.write().unwrap(); + indexed_blockhashes.extend(blocks.iter().map(|b| b.entry.hash())); + } + + // Undo the history db entries previously written for the given blocks (that were reorged). + // This includes the TxHistory, TxEdge and BlockDone rows ('H', 'S' and 'D'), + // as well as the Elements history rows ('I' and 'i'). + // + // This does *not* remove any txstore db entries, which are intentionally kept + // even for reorged blocks. + fn undo_index(&self, blocks: &[BlockEntry]) { + self.store + .history_db + .delete_rows(self._index(blocks), self.flush); + // Note this doesn't actually "undo" the rows - the keys are simply deleted, and won't get + // reverted back to their prior value (if there was one). It is expected that the history db + // keys created by blocks are always unique and impossible to already exist from a prior block. + // This is true for all history keys (which always include the height or txid), but for example + // not true for the address prefix search index (in the txstore). 
+ + let mut indexed_blockhashes = self.store.indexed_blockhashes.write().unwrap(); + for block in blocks { + indexed_blockhashes.remove(block.entry.hash()); + } + } + + fn _index(&self, blocks: &[BlockEntry]) -> Vec { let previous_txos_map = { let _timer = self.start_timer("index_lookup"); lookup_txos(&self.store.txstore_db, get_previous_txos(blocks)).unwrap() @@ -353,7 +434,7 @@ impl Indexer { } index_blocks(blocks, &previous_txos_map, &self.iconfig) }; - self.store.history_db.write(rows, self.flush); + rows } pub fn fetch_from(&mut self, from: FetchFrom) { @@ -563,7 +644,7 @@ impl ChainQuery { // save updated utxo set to cache if let Some(lastblock) = lastblock { if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE { - self.store.cache_db.write( + self.store.cache_db.write_rows( vec![UtxoCacheRow::new(scripthash, &newutxos, &lastblock).into_row()], DBFlush::Enable, ); @@ -666,7 +747,7 @@ impl ChainQuery { // save updated stats to cache if let Some(lastblock) = lastblock { if newstats.funded_txo_count + newstats.spent_txo_count > MIN_HISTORY_ITEMS_TO_CACHE { - self.store.cache_db.write( + self.store.cache_db.write_rows( vec![StatsCacheRow::new(scripthash, &newstats, &lastblock).into_row()], DBFlush::Enable, ); diff --git a/src/util/block.rs b/src/util/block.rs index 5dac63bcf..7bbada26e 100644 --- a/src/util/block.rs +++ b/src/util/block.rs @@ -128,12 +128,17 @@ impl HeaderList { ); let mut headers = HeaderList::empty(); - headers.apply(headers.order(headers_chain)); + headers.append(headers.preprocess(headers_chain).0); headers } + /// Pre-process the given `BlockHeader`s to verify they connect to the chain and to + /// transform them into `HeaderEntry`s with heights and hashes - but without saving them. + /// If the headers trigger a reorg, the `reorged_since` height is returned too. + /// Actually applying the headers requires to first pop() the reorged blocks (if any), + /// then append() the new ones. #[trace] - pub fn order(&self, new_headers: Vec) -> Vec { + pub fn preprocess(&self, new_headers: Vec) -> (Vec, Option) { // header[i] -> header[i-1] (i.e. header.last() is the tip) struct HashedHeader { blockhash: BlockHash, @@ -152,7 +157,7 @@ impl HeaderList { } let prev_blockhash = match hashed_headers.first() { Some(h) => h.header.prev_blockhash, - None => return vec![], // hashed_headers is empty + None => return (vec![], None), // hashed_headers is empty }; let new_height: usize = if prev_blockhash == *DEFAULT_BLOCKHASH { 0 @@ -162,18 +167,38 @@ impl HeaderList { .height() + 1 }; - (new_height..) + let header_entries = (new_height..) .zip(hashed_headers.into_iter()) .map(|(height, hashed_header)| HeaderEntry { height, hash: hashed_header.blockhash, header: hashed_header.header, }) - .collect() + .collect(); + let reorged_since = (new_height < self.len()).then_some(new_height); + (header_entries, reorged_since) } + /// Pop off reorged blocks since (including) the given height and return them. #[trace] - pub fn apply(&mut self, new_headers: Vec) { + pub fn pop(&mut self, since_height: usize) -> Vec { + let reorged_headers = self.headers.split_off(since_height); + + for header in &reorged_headers { + self.heights.remove(header.hash()); + } + self.tip = self + .headers + .last() + .map(|h| *h.hash()) + .unwrap_or_else(|| *DEFAULT_BLOCKHASH); + + reorged_headers + } + + /// Append new headers. Expected to always extend the tip (stale blocks must be removed first) + #[trace] + pub fn append(&mut self, new_headers: Vec) { // new_headers[i] -> new_headers[i - 1] (i.e. 
new_headers.last() is the tip) for i in 1..new_headers.len() { assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height()); @@ -200,7 +225,7 @@ impl HeaderList { new_headers.len(), new_height ); - let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries + assert_eq!(new_height, self.headers.len()); for new_header in new_headers { let height = new_header.height(); assert_eq!(height, self.headers.len()); @@ -214,11 +239,8 @@ impl HeaderList { pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> { let height = self.heights.get(blockhash)?; let header = self.headers.get(*height)?; - if *blockhash == *header.hash() { - Some(header) - } else { - None - } + assert_eq!(header.hash(), blockhash); + Some(header) } #[trace] From 1668f08410378124c1ba9abfb745b91b16a953fb Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 30 Nov 2025 22:02:43 +0200 Subject: [PATCH 03/15] Optimize address TxHistory lookups for tx history, stats and UTXOs Iterating history db entries now involves a single sequential db scan (plus reads into the in-memory HeaderList), without the per-tx random access db reads that were previously needed to verify confirmation status. --- src/elements/asset.rs | 9 +++++---- src/new_index/schema.rs | 32 ++++++++++++++++++++------------ 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/elements/asset.rs b/src/elements/asset.rs index aa36c782e..a19969fb3 100644 --- a/src/elements/asset.rs +++ b/src/elements/asset.rs @@ -13,7 +13,7 @@ use crate::elements::registry::{AssetMeta, AssetRegistry}; use crate::errors::*; use crate::new_index::schema::{TxHistoryInfo, TxHistoryKey, TxHistoryRow}; use crate::new_index::{db::DBFlush, ChainQuery, DBRow, Mempool, Query}; -use crate::util::{bincode, full_hash, Bytes, FullHash, TransactionStatus, TxInput}; +use crate::util::{bincode, full_hash, BlockId, Bytes, FullHash, TransactionStatus, TxInput}; lazy_static! { pub static ref NATIVE_ASSET_ID: AssetId = @@ -526,13 +526,14 @@ fn chain_asset_stats_delta( start_height: usize, apply_fn: AssetStatApplyFn, ) -> (T, Option) { + let headers = chain.store().headers(); let history_iter = chain .history_iter_scan(b'I', &asset_id.into_inner()[..], start_height) .map(TxHistoryRow::from_row) .filter_map(|history| { - chain - .tx_confirming_block(&history.get_txid()) - .map(|blockid| (history, blockid)) + // skip over entries that point to non-existing heights (may happen during reorg handling) + let header = headers.header_by_height(history.key.confirmed_height as usize)?; + Some((history, BlockId::from(header))) }); let mut stats = init_stats; diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 7464e3daa..e87da0d37 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -576,13 +576,15 @@ impl ChainQuery { limit: usize, ) -> Vec<(Transaction, BlockId)> { let _timer_scan = self.start_timer("history"); + let headers = self.store.indexed_headers.read().unwrap(); let txs_conf = self .history_iter_scan_reverse(code, hash) - .map(|row| TxHistoryRow::from_row(row).get_txid()) + .map(TxHistoryRow::from_row) + .map(|row| (row.get_txid(), row.key.confirmed_height as usize)) // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? 
.unique() // TODO seek directly to last seen tx without reading earlier rows - .skip_while(|txid| { + .skip_while(|(txid, _)| { // skip until we reach the last_seen_txid last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid) }) @@ -590,9 +592,11 @@ impl ChainQuery { Some(_) => 1, // skip the last_seen_txid itself None => 0, }) - .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) + // skip over entries that point to non-existing heights (may happen during reorg handling) + .filter_map(|(txid, height)| Some((txid, headers.header_by_height(height)?.into()))) .take(limit) .collect::>(); + drop(headers); self.lookup_txns(&txs_conf) .expect("failed looking up txs in history index") @@ -609,10 +613,13 @@ impl ChainQuery { fn _history_txids(&self, code: u8, hash: &[u8], limit: usize) -> Vec<(Txid, BlockId)> { let _timer = self.start_timer("history_txids"); + let headers = self.store.indexed_headers.read().unwrap(); self.history_iter_scan(code, hash, 0) - .map(|row| TxHistoryRow::from_row(row).get_txid()) + .map(TxHistoryRow::from_row) + .map(|row| (row.get_txid(), row.key.confirmed_height as usize)) .unique() - .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) + // skip over entries that point to non-existing heights (may happen during reorg handling) + .filter_map(|(txid, height)| Some((txid, headers.header_by_height(height)?.into()))) .take(limit) .collect() } @@ -686,12 +693,14 @@ impl ChainQuery { limit: usize, ) -> Result<(UtxoMap, Option, usize)> { let _timer = self.start_timer("utxo_delta"); + let headers = self.store.indexed_headers.read().unwrap(); let history_iter = self .history_iter_scan(b'H', scripthash, start_height) .map(TxHistoryRow::from_row) + // skip over entries that point to non-existing heights (may happen during reorg handling) .filter_map(|history| { - self.tx_confirming_block(&history.get_txid()) - .map(|b| (history, b)) + let header = headers.header_by_height(history.key.confirmed_height as usize)?; + Some((history, BlockId::from(header))) }); let mut utxos = init_utxos; @@ -764,15 +773,14 @@ impl ChainQuery { start_height: usize, ) -> (ScriptStats, Option) { let _timer = self.start_timer("stats_delta"); // TODO: measure also the number of txns processed. 
+ let headers = self.store.indexed_headers.read().unwrap(); let history_iter = self .history_iter_scan(b'H', scripthash, start_height) .map(TxHistoryRow::from_row) + // skip over entries that point to non-existing heights (may happen during reorg handling) .filter_map(|history| { - self.tx_confirming_block(&history.get_txid()) - // drop history entries that were previously confirmed in a re-orged block and later - // confirmed again at a different height - .filter(|blockid| blockid.height == history.key.confirmed_height as usize) - .map(|blockid| (history, blockid)) + let header = headers.header_by_height(history.key.confirmed_height as usize)?; + Some((history, BlockId::from(header))) }); let mut stats = init_stats; From f4a47902ca5f1f999d31702c3a53ac36adda1c6a Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 30 Nov 2025 22:02:43 +0200 Subject: [PATCH 04/15] Add tests for reorg scenarios --- tests/common.rs | 12 +++ tests/rest.rs | 216 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 228 insertions(+) diff --git a/tests/common.rs b/tests/common.rs index 5fb995d2d..ff8dbcdc9 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -276,6 +276,18 @@ impl TestRunner { } } +// Make the RpcApi methods available directly on TestRunner, +// without having to go through the node_client() getter +impl bitcoincore_rpc::RpcApi for TestRunner { + fn call serde::de::Deserialize<'a>>( + &self, + cmd: &str, + args: &[serde_json::Value], + ) -> bitcoincore_rpc::Result { + self.node_client().call(cmd, args) + } +} + pub fn init_rest_tester() -> Result<(rest::Handle, net::SocketAddr, TestRunner)> { let tester = TestRunner::new()?; let rest_server = rest::start(Arc::clone(&tester.config), Arc::clone(&tester.query)); diff --git a/tests/rest.rs b/tests/rest.rs index 382ad16fd..1410658b6 100644 --- a/tests/rest.rs +++ b/tests/rest.rs @@ -2,6 +2,9 @@ use bitcoind::bitcoincore_rpc::RpcApi; use serde_json::Value; use std::collections::HashSet; +#[cfg(not(feature = "liquid"))] +use {bitcoin::Amount, serde_json::from_value}; + use electrs::chain::Txid; pub mod common; @@ -207,6 +210,219 @@ fn test_rest() -> Result<()> { let status = empty_package_resp.status(); assert_eq!(status, 400); + // Reorg handling tests + #[cfg(not(feature = "liquid"))] + { + let get_conf_height = |txid| -> Result> { + Ok(get_json(&format!("/tx/{}/status", txid))?["block_height"].as_u64()) + }; + let get_chain_stats = |addr| -> Result { + Ok(get_json(&format!("/address/{}", addr))?["chain_stats"].take()) + }; + let get_chain_txs = |addr| -> Result> { + Ok(from_value(get_json(&format!( + "/address/{}/txs/chain", + addr + ))?)?) 
+ }; + let get_outspend = |outpoint: &bitcoin::OutPoint| -> Result { + get_json(&format!("/tx/{}/outspend/{}", outpoint.txid, outpoint.vout)) + }; + + let init_height = tester.node_client().get_block_count()?; + + let address = tester.newaddress()?; + let miner_address = tester.newaddress()?; + + let txid_a = tester.send(&address, Amount::from_sat(100000))?; + let txid_b = tester.send(&address, Amount::from_sat(200000))?; + let txid_c = tester.send(&address, Amount::from_sat(500000))?; + + let tx_a = tester.get_raw_transaction(&txid_a, None)?; + let tx_b = tester.get_raw_transaction(&txid_b, None)?; + let tx_c = tester.get_raw_transaction(&txid_c, None)?; + + // Confirm tx_a, tx_b and tx_c + let blockhash_1 = tester.mine()?; + + assert_eq!( + get_plain("/blocks/tip/height")?, + (init_height + 1).to_string() + ); + assert_eq!(get_plain("/blocks/tip/hash")?, blockhash_1.to_string()); + assert_eq!(get_conf_height(&txid_a)?, Some(init_height + 1)); + assert_eq!(get_conf_height(&txid_b)?, Some(init_height + 1)); + assert_eq!(get_conf_height(&txid_c)?, Some(init_height + 1)); + assert_eq!( + get_chain_stats(&address)?["funded_txo_sum"].as_u64(), + Some(800000) + ); + assert_eq!(get_chain_txs(&address)?.len(), 3); + + let c_outspend = get_outspend(&tx_c.input[0].previous_output)?; + assert_eq!( + c_outspend["txid"].as_str(), + Some(txid_c.to_string().as_str()) + ); + assert_eq!( + c_outspend["status"]["block_height"].as_u64(), + Some(init_height + 1) + ); + + // Reorg the last block, re-confirm tx_a at the same height + tester.invalidate_block(&blockhash_1)?; + tester.call::( + "generateblock", + &[ + miner_address.to_string().into(), + [txid_a.to_string()].into(), + ], + )?; + // Re-confirm tx_b at a different height + tester.call::( + "generateblock", + &[ + miner_address.to_string().into(), + [txid_b.to_string()].into(), + ], + )?; + // Don't re-confirm tx_c at all + + let blockhash_2 = tester.get_best_block_hash()?; + + tester.sync()?; + + assert_eq!( + get_plain("/blocks/tip/height")?, + (init_height + 2).to_string() + ); + assert_eq!(get_plain("/blocks/tip/hash")?, blockhash_2.to_string()); + + // Test address stats (GET /address/:address) + assert_eq!( + get_chain_stats(&address)?["funded_txo_sum"].as_u64(), + Some(300000) + ); + + // Test address history (GET /address/:address/txs/chain) + let addr_txs = get_chain_txs(&address)?; + assert_eq!(addr_txs.len(), 2); + assert_eq!( + addr_txs[0]["txid"].as_str(), + Some(txid_b.to_string().as_str()) + ); + assert_eq!( + addr_txs[0]["status"]["block_height"].as_u64(), + Some(init_height + 2) + ); + assert_eq!( + addr_txs[1]["txid"].as_str(), + Some(txid_a.to_string().as_str()) + ); + assert_eq!( + addr_txs[1]["status"]["block_height"].as_u64(), + Some(init_height + 1) + ); + + // Test transaction status lookup (GET /tx/:txid/status) + assert_eq!(get_conf_height(&txid_a)?, Some(init_height + 1)); + assert_eq!(get_conf_height(&txid_b)?, Some(init_height + 2)); + assert_eq!(get_conf_height(&txid_c)?, None); + + // Test spend edge lookup (GET /tx/:txid/outspend/:vout) + let a_spends = get_outspend(&tx_a.input[0].previous_output)?; + assert_eq!(a_spends["txid"].as_str(), Some(txid_a.to_string().as_str())); + assert_eq!( + a_spends["status"]["block_height"].as_u64(), + Some(init_height + 1) + ); + let b_spends = get_outspend(&tx_b.input[0].previous_output)?; + assert_eq!(b_spends["txid"].as_str(), Some(txid_b.to_string().as_str())); + assert_eq!( + b_spends["status"]["block_height"].as_u64(), + Some(init_height + 2) + ); + let c_spends = 
get_outspend(&tx_c.input[0].previous_output)?; + assert_eq!(c_spends["status"]["confirmed"].as_bool(), Some(false)); + + // Test a deeper reorg, all the way back to exclude tx_b + tester.generate_to_address(15, &address)?; + tester.sync()?; + tester.invalidate_block(&blockhash_2)?; + + for _ in 0..20 { + // Mine some empty blocks, intentionally without tx_b + tester.call::( + "generateblock", + &[miner_address.to_string().into(), Vec::::new().into()], + )?; + } + tester.sync()?; + + assert_eq!( + get_plain("/blocks/tip/height")?, + (init_height + 21).to_string() + ); + assert_eq!( + get_plain("/blocks/tip/hash")?, + tester.get_best_block_hash()?.to_string() + ); + + assert_eq!( + get_chain_stats(&address)?["funded_txo_sum"].as_u64(), + Some(100000) + ); + + let addr_txs = get_chain_txs(&address)?; + assert_eq!(addr_txs.len(), 1); + assert_eq!( + addr_txs[0]["txid"].as_str(), + Some(txid_a.to_string().as_str()) + ); + assert_eq!( + addr_txs[0]["status"]["block_height"].as_u64(), + Some(init_height + 1) + ); + + assert_eq!(get_conf_height(&txid_a)?, Some(init_height + 1)); + assert_eq!(get_conf_height(&txid_b)?, None); + assert_eq!(get_conf_height(&txid_c)?, None); + + let a_spends = get_outspend(&tx_a.input[0].previous_output)?; + assert_eq!( + a_spends["status"]["block_height"].as_u64(), + Some(init_height + 1) + ); + let b_spends = get_outspend(&tx_b.input[0].previous_output)?; + assert_eq!(b_spends["spent"].as_bool(), Some(false)); + let c_spends = get_outspend(&tx_b.input[0].previous_output)?; + assert_eq!(c_spends["spent"].as_bool(), Some(false)); + + // Reorg everything back to genesis + tester.invalidate_block(&tester.get_block_hash(1)?)?; + tester.call::( + "generateblock", + &[miner_address.to_string().into(), Vec::::new().into()], + )?; + tester.sync()?; + + assert_eq!(get_plain("/blocks/tip/height")?, 1.to_string()); + assert_eq!( + get_chain_stats(&address)?["funded_txo_sum"].as_u64(), + Some(0) + ); + assert_eq!(get_chain_txs(&address)?.len(), 0); + assert_eq!(get_conf_height(&txid_a)?, None); + assert_eq!(get_conf_height(&txid_b)?, None); + assert_eq!(get_conf_height(&txid_c)?, None); + let a_spends = get_outspend(&tx_a.input[0].previous_output)?; + assert_eq!(a_spends["spent"].as_bool(), Some(false)); + + // Mine some blocks so that the followup tests have some coins to play with + tester.generate_to_address(101, &miner_address)?; + tester.sync()?; + } + // bitcoin 28.0 only tests - submitpackage #[cfg(all(not(feature = "liquid"), feature = "bitcoind_28_0"))] { From d687b4667973ddfac001613d03942dcd520a9201 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 30 Nov 2025 22:02:43 +0200 Subject: [PATCH 05/15] Fix reorg crash recovery when there are >100 reorged blocks --- src/new_index/schema.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index e87da0d37..de7b6ff76 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -300,7 +300,7 @@ impl Indexer { // (even before the rows are deleted below), since they reference block heights that will no longer exist. // This ensures consistency - it is not possible for blocks to be available (e.g. in GET /blocks/tip or /block/:hash) // without the corresponding history entries for these blocks (e.g. in GET /address/:address/txs), or vice-versa. 
- let reorged_headers = self + let mut reorged_headers = self .store .indexed_headers .write() @@ -315,6 +315,12 @@ impl Indexer { reorged_since, ); + // Reorged blocks are undone in chunks of 100, processed in serial, each as an atomic batch. + // Reverse them so that chunks closest to the chain tip are processed first, + // which is necessary to properly recover from crashes during reorg handling. + // Also see the comment under `Store::open()`. + reorged_headers.reverse(); + // Fetch the reorged blocks, then undo their history index db rows. // The txstore db rows are kept for reorged blocks/transactions. start_fetcher(self.from, &daemon, reorged_headers)? From 57c9bc10cfb68fe1536fe0c7d82fc302d58a4832 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 30 Nov 2025 22:02:43 +0200 Subject: [PATCH 06/15] Optimize the TxConf confirmations index Changed from an index of `txid -> Set` to `txid -> blockheight` - Instead of a list of blocks seen to include the txid (including stale blocks), map the txid directly to the single block that confirmed it and is still part of the best chain. - Identify blocks by their height instead of their hash. Previously it was necessary to keep the hash to ensure it is still part of the best chain, but now we can assume that it is. - Move the index from the txstore db to the history db, so that its entries will get undone during reorgs. --- doc/schema.md | 8 +++-- src/new_index/schema.rs | 65 +++++++++++++++++------------------------ 2 files changed, 32 insertions(+), 41 deletions(-) diff --git a/doc/schema.md b/doc/schema.md index 721418ba0..30d994f39 100644 --- a/doc/schema.md +++ b/doc/schema.md @@ -27,12 +27,10 @@ Each block results in the following new rows: * `"D{blockhash}" → ""` (signifies the block was added) -Each transaction results in the following new rows: +Each transaction results in the following new row: * `"T{txid}" → "{serialized-transaction}"` - * `"C{txid}{confirmed-blockhash}" → ""` (a list of blockhashes where `txid` was seen to be confirmed) - Each output results in the following new rows: * `"O{txid}{vout}" → "{scriptpubkey}{value}"` @@ -44,6 +42,10 @@ When the indexer is synced up to the tip of the chain, the hash of the tip is sa ### `history` +Each transaction results in the following new row: + + * `"C{txid}" → "{confirmed-height}"` + Each funding output (except for provably unspendable ones when `--index-unspendables` is not enabled) results in the following new row (`H` is for history, `F` is for funding): * `"H{funding-scripthash}{funding-height}F{funding-txid:vout}{value}" → ""` diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index de7b6ff76..ab9b1a561 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -18,6 +18,7 @@ use elements::{ }; use std::collections::{BTreeSet, HashMap, HashSet}; +use std::convert::TryInto; use std::path::Path; use std::sync::{Arc, RwLock}; @@ -402,7 +403,7 @@ impl Indexer { } // Undo the history db entries previously written for the given blocks (that were reorged). - // This includes the TxHistory, TxEdge and BlockDone rows ('H', 'S' and 'D'), + // This includes the TxHistory, TxEdge, TxConf and BlockDone rows ('H', 'S', 'C' and 'D'), // as well as the Elements history rows ('I' and 'i'). 
// // This does *not* remove any txstore db entries, which are intentionally kept @@ -989,18 +990,11 @@ impl ChainQuery { pub fn tx_confirming_block(&self, txid: &Txid) -> Option { let _timer = self.start_timer("tx_confirming_block"); + let row_value = self.store.history_db.get(&TxConfRow::key(txid))?; + let height = TxConfRow::height_from_val(&row_value); let headers = self.store.indexed_headers.read().unwrap(); - self.store - .txstore_db - .iter_scan(&TxConfRow::filter(&txid[..])) - .map(TxConfRow::from_row) - // header_by_blockhash only returns blocks that are part of the best chain, - // or None for orphaned blocks. - .filter_map(|conf| { - headers.header_by_blockhash(&deserialize(&conf.key.blockhash).unwrap()) - }) - .next() - .map(BlockId::from) + // skip entries that point to non-existing heights (may happen during reorg handling) + Some(headers.header_by_height(height as usize)?.into()) } pub fn get_block_status(&self, hash: &BlockHash) -> BlockStatus { @@ -1074,7 +1068,6 @@ fn load_blockheaders(db: &DB) -> HashMap { fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec { // persist individual transactions: // T{txid} → {rawtx} - // C{txid}{blockhash}{height} → // O{txid}{index} → {txout} // persist block headers', block txids' and metadata rows: // B{blockhash} → {header} @@ -1087,7 +1080,7 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec = b.block.txdata.iter().map(|tx| tx.compute_txid()).collect(); for (tx, txid) in b.block.txdata.iter().zip(txids.iter()) { - add_transaction(*txid, tx, blockhash, &mut rows, iconfig); + add_transaction(*txid, tx, &mut rows, iconfig); } if !iconfig.light_mode { @@ -1103,15 +1096,7 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec, - iconfig: &IndexerConfig, -) { - rows.push(TxConfRow::new(txid, blockhash).into_row()); - +fn add_transaction(txid: Txid, tx: &Transaction, rows: &mut Vec, iconfig: &IndexerConfig) { if !iconfig.light_mode { rows.push(TxRow::new(txid, tx).into_row()); } @@ -1192,12 +1177,17 @@ fn index_transaction( rows: &mut Vec, iconfig: &IndexerConfig, ) { + let txid = full_hash(&tx.compute_txid()[..]); + + // persist tx confirmation row: + // C{txid} → "{block_height}" + rows.push(TxConfRow::new(txid, confirmed_height).into_row()); + // persist history index: // H{funding-scripthash}{funding-height}F{funding-txid:vout} → "" // H{funding-scripthash}{spending-height}S{spending-txid:vin}{funding-txid:vout} → "" // persist "edges" for fast is-this-TXO-spent check // S{funding-txid:vout}{spending-txid:vin} → "" - let txid = full_hash(&tx.compute_txid()[..]); for (txo_index, txo) in tx.output.iter().enumerate() { if is_spendable(txo) || iconfig.index_unspendables { let history = TxHistoryRow::new( @@ -1316,40 +1306,39 @@ impl TxRow { struct TxConfKey { code: u8, txid: FullHash, - blockhash: FullHash, } struct TxConfRow { key: TxConfKey, + value: u32, // the confirmation height } impl TxConfRow { - fn new(txid: Txid, blockhash: FullHash) -> TxConfRow { + fn new(txid: FullHash, height: u32) -> TxConfRow { let txid = full_hash(&txid[..]); TxConfRow { - key: TxConfKey { - code: b'C', - txid, - blockhash, - }, + key: TxConfKey { code: b'C', txid }, + value: height, } } - fn filter(prefix: &[u8]) -> Bytes { - [b"C", prefix].concat() + fn key(txid: &Txid) -> Bytes { + bincode::serialize_little(&TxConfKey { + code: b'C', + txid: full_hash(&txid[..]), + }) + .unwrap() } fn into_row(self) -> DBRow { DBRow { key: bincode::serialize_little(&self.key).unwrap(), - value: 
vec![], + value: self.value.to_le_bytes().to_vec(), } } - fn from_row(row: DBRow) -> Self { - TxConfRow { - key: bincode::deserialize_little(&row.key).expect("failed to parse TxConfKey"), - } + fn height_from_val(val: &[u8]) -> u32 { + u32::from_le_bytes(val.try_into().expect("invalid TxConf value")) } } From 469429ddf2f61855b52a4fac5fe13e3d6c75fb6b Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 30 Nov 2025 22:02:43 +0200 Subject: [PATCH 07/15] Optimize the TxEdge spending index Changed from an index of `funding_txid:vout -> Set` to `funding_txid:vout -> spending_txid:vin||spending_height` - Instead of a list of inputs seen to spend the outpoint, map the outpoint directly to the single spending input that is still part of the best chain. - Keep the height of the spending transaction, too. This reduces the number of db reads per spend lookup from 2 to 1. --- doc/schema.md | 2 +- src/new_index/schema.rs | 47 +++++++++++++++++++++++------------------ 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/doc/schema.md b/doc/schema.md index 30d994f39..d9dabf089 100644 --- a/doc/schema.md +++ b/doc/schema.md @@ -54,7 +54,7 @@ Each spending input (except the coinbase) results in the following new rows (`S` * `"H{funding-scripthash}{spending-height}S{spending-txid:vin}{funding-txid:vout}{value}" → ""` - * `"S{funding-txid:vout}{spending-txid:vin}" → ""` + * `"S{funding-txid:vout}" → "{spending-txid:vin}{spending-height}"` Each block results in the following new row: diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index ab9b1a561..4d1d78df3 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -974,18 +974,15 @@ impl ChainQuery { pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { let _timer = self.start_timer("lookup_spend"); - self.store - .history_db - .iter_scan(&TxEdgeRow::filter(&outpoint)) - .map(TxEdgeRow::from_row) - .find_map(|edge| { - let txid: Txid = deserialize(&edge.key.spending_txid).unwrap(); - self.tx_confirming_block(&txid).map(|b| SpendingInput { - txid, - vin: edge.key.spending_vin as u32, - confirmed: Some(b), - }) - }) + let edge = TxEdgeValue::from_bytes(&self.store.history_db.get(&TxEdgeRow::key(outpoint))?); + let headers = self.store.indexed_headers.read().unwrap(); + // skip entries that point to non-existing heights (may happen during reorg handling) + let header = headers.header_by_height(edge.spending_height as usize)?; + Some(SpendingInput { + txid: deserialize(&edge.spending_txid).expect("failed to parse Txid"), + vin: edge.spending_vin as u32, + confirmed: Some(header.into()), + }) } pub fn tx_confirming_block(&self, txid: &Txid) -> Option { @@ -1228,6 +1225,7 @@ fn index_transaction( txi.previous_output.vout as u16, txid, txi_index as u16, + confirmed_height, ); rows.push(edge.into_row()); } @@ -1585,12 +1583,18 @@ struct TxEdgeKey { code: u8, funding_txid: FullHash, funding_vout: u16, +} + +#[derive(Serialize, Deserialize)] +struct TxEdgeValue { spending_txid: FullHash, spending_vin: u16, + spending_height: u32, } struct TxEdgeRow { key: TxEdgeKey, + value: TxEdgeValue, } impl TxEdgeRow { @@ -1599,19 +1603,22 @@ impl TxEdgeRow { funding_vout: u16, spending_txid: FullHash, spending_vin: u16, + spending_height: u32, ) -> Self { let key = TxEdgeKey { code: b'S', funding_txid, funding_vout, + }; + let value = TxEdgeValue { spending_txid, spending_vin, + spending_height, }; - TxEdgeRow { key } + TxEdgeRow { key, value } } - fn filter(outpoint: &OutPoint) -> Bytes { - // TODO build key without using bincode? 
[ b"S", &outpoint.txid[..], outpoint.vout?? ].concat() + fn key(outpoint: &OutPoint) -> Bytes { bincode::serialize_little(&(b'S', full_hash(&outpoint.txid[..]), outpoint.vout as u16)) .unwrap() } @@ -1619,14 +1626,14 @@ impl TxEdgeRow { fn into_row(self) -> DBRow { DBRow { key: bincode::serialize_little(&self.key).unwrap(), - value: vec![], + value: bincode::serialize_little(&self.value).unwrap(), } } +} - fn from_row(row: DBRow) -> Self { - TxEdgeRow { - key: bincode::deserialize_little(&row.key).expect("failed to deserialize TxEdgeKey"), - } +impl TxEdgeValue { + fn from_bytes(bytes: &[u8]) -> Self { + bincode::deserialize_little(bytes).expect("invalid TxEdgeValue") } } From 2433a4b8d28f8e4a8374b55530049007320167d3 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 30 Nov 2025 22:02:43 +0200 Subject: [PATCH 08/15] Implement multi-outpoint TxEdge lookup using MultiGet Now possible with the V2 schema, since the exact TxEdge row key can be derived from the funding_txid:vout alone (previously the key also included the spending_txid, requiring a prefix scan for each lookup). --- src/new_index/query.rs | 25 +++++++++++++++++-------- src/new_index/schema.rs | 24 ++++++++++++++++++++++++ src/rest.rs | 2 +- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 03c5d201f..712fed330 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -1,5 +1,3 @@ -use rayon::prelude::*; - use std::collections::{BTreeSet, HashMap}; use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; @@ -153,18 +151,29 @@ impl Query { } #[trace] - pub fn lookup_tx_spends(&self, tx: Transaction) -> Vec> { + pub fn lookup_tx_spends(&self, tx: &Transaction) -> Vec> { let txid = tx.compute_txid(); + let outpoints = tx + .output + .iter() + .enumerate() + .filter(|(_, txout)| is_spendable(txout)) + .map(|(vout, _)| OutPoint::new(txid, vout as u32)) + .collect::>(); + // First fetch all confirmed spends using a MultiGet operation, + // then fall back to the mempool for any outpoints not spent on-chain + let mut chain_spends = self.chain.lookup_spends(outpoints); + let mempool = self.mempool(); tx.output - .par_iter() + .iter() .enumerate() .map(|(vout, txout)| { if is_spendable(txout) { - self.lookup_spend(&OutPoint { - txid, - vout: vout as u32, - }) + let outpoint = OutPoint::new(txid, vout as u32); + chain_spends + .remove(&outpoint) + .or_else(|| mempool.lookup_spend(&outpoint)) } else { None } diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 4d1d78df3..e255a0dcf 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -985,6 +985,30 @@ impl ChainQuery { }) } + pub fn lookup_spends(&self, outpoints: BTreeSet) -> HashMap { + let _timer = self.start_timer("lookup_spends"); + let headers = self.store.indexed_headers.read().unwrap(); + self.store + .history_db + .multi_get(outpoints.iter().map(TxEdgeRow::key)) + .into_iter() + .zip(outpoints) + .filter_map(|(edge_val, outpoint)| { + let edge = TxEdgeValue::from_bytes(&edge_val.unwrap()?); + // skip over entries that point to non-existing heights (may happen during reorg handling) + let header = headers.header_by_height(edge.spending_height as usize)?; + Some(( + outpoint, + SpendingInput { + txid: deserialize(&edge.spending_txid).expect("failed to parse Txid"), + vin: edge.spending_vin as u32, + confirmed: Some(header.into()), + }, + )) + }) + .collect() + } + pub fn tx_confirming_block(&self, txid: &Txid) -> Option { let _timer = 
self.start_timer("tx_confirming_block"); let row_value = self.store.history_db.get(&TxConfRow::key(txid))?; diff --git a/src/rest.rs b/src/rest.rs index cefc49b7c..ece761b5a 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -996,7 +996,7 @@ fn handle_request( .lookup_txn(&hash) .ok_or_else(|| HttpError::not_found("Transaction not found".to_string()))?; let spends: Vec = query - .lookup_tx_spends(tx) + .lookup_tx_spends(&tx) .into_iter() .map(|spend| spend.map_or_else(SpendingValue::default, SpendingValue::from)) .collect(); From 308e5a0096b7b55458b98a69c4171290cc4e723e Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Mon, 1 Dec 2025 00:38:03 +0200 Subject: [PATCH 09/15] Implement multi-transaction TxConf lookup using MultiGet Now possible with the V2 schema, since the exact TxConf row key can be derived from the txid alone (previously the key also included the block, requiring a prefix scan for each lookup). This isn't used anywhere yet, but will be used in a followup commit for the DB migration script (and could potentially be used for a new public API endpoint). Exposed as a standalone function so that it can be used directly with a `DB`, without having to construct the full `ChainQuery` with a `Daemon`. --- src/new_index/schema.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index e255a0dcf..d6e1d37ec 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -1018,6 +1018,10 @@ impl ChainQuery { Some(headers.header_by_height(height as usize)?.into()) } + pub fn lookup_confirmations(&self, txids: BTreeSet) -> HashMap { + lookup_confirmations(&self.store.history_db, txids) + } + pub fn get_block_status(&self, hash: &BlockHash) -> BlockStatus { // TODO differentiate orphaned and non-existing blocks? telling them apart requires // an additional db read. 
@@ -1170,6 +1174,18 @@ fn lookup_txo(txstore_db: &DB, outpoint: &OutPoint) -> Option { .map(|val| deserialize(&val).expect("failed to parse TxOut")) } +pub fn lookup_confirmations(history_db: &DB, txids: BTreeSet) -> HashMap { + history_db + .multi_get(txids.iter().map(TxConfRow::key)) + .into_iter() + .zip(txids) + .filter_map(|(res, txid)| { + let confirmation_height = u32::from_le_bytes(res.unwrap()?.try_into().unwrap()); + Some((txid, confirmation_height)) + }) + .collect() +} + fn index_blocks( block_entries: &[BlockEntry], previous_txos_map: &HashMap, From 7beaaba81640a55d687cc3abefca5aa278141e03 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 30 Nov 2025 22:02:43 +0200 Subject: [PATCH 10/15] Bump DB version, add DB migration script --- src/bin/db-migrate-v1-to-v2.rs | 286 ++++++++++++++++++++++++++++++++ src/bin/electrs.rs | 2 +- src/bin/popular-scripts.rs | 2 +- src/bin/tx-fingerprint-stats.rs | 2 +- src/new_index/db.rs | 31 ++-- src/new_index/schema.rs | 37 +++-- tests/common.rs | 2 +- 7 files changed, 327 insertions(+), 35 deletions(-) create mode 100644 src/bin/db-migrate-v1-to-v2.rs diff --git a/src/bin/db-migrate-v1-to-v2.rs b/src/bin/db-migrate-v1-to-v2.rs new file mode 100644 index 000000000..a32f9ebfe --- /dev/null +++ b/src/bin/db-migrate-v1-to-v2.rs @@ -0,0 +1,286 @@ +use std::collections::BTreeSet; +use std::convert::TryInto; +use std::str; + +use itertools::Itertools; +use log::{debug, info, trace}; +use rocksdb::WriteBatch; + +use bitcoin::hashes::Hash; + +use electrs::chain::{BlockHash, Txid}; +use electrs::new_index::db::DBFlush; +use electrs::new_index::schema::{ + lookup_confirmations, FullHash, Store, TxConfRow as V2TxConfRow, TxEdgeRow as V2TxEdgeRow, + TxHistoryKey, +}; +use electrs::util::bincode::{deserialize_big, deserialize_little, serialize_little}; +use electrs::{config::Config, metrics::Metrics}; + +const FROM_DB_VERSION: u32 = 1; +const TO_DB_VERSION: u32 = 2; + +const BATCH_SIZE: usize = 15000; +const PROGRESS_EVERY: usize = BATCH_SIZE * 50; + +// For Elements-based chains the 'I' asset history index is migrated too +#[cfg(not(feature = "liquid"))] +const HISTORY_PREFIXES: [u8; 1] = [b'H']; +#[cfg(feature = "liquid")] +const HISTORY_PREFIXES: [u8; 2] = [b'H', b'I']; + +fn main() { + let config = Config::from_args(); + let metrics = Metrics::new(config.monitoring_addr); + let store = Store::open(&config, &metrics, false); + + let txstore_db = store.txstore_db(); + let history_db = store.history_db(); + let cache_db = store.cache_db(); + let headers = store.headers(); + + // Check the DB version under `V` matches the expected version + for db in [txstore_db, history_db, cache_db] { + let ver_bytes = db.get(b"V").expect("missing DB version"); + let ver: u32 = deserialize_little(&ver_bytes[0..4]).unwrap(); + assert_eq!(ver, FROM_DB_VERSION, "unexpected DB version {}", ver); + } + + // Utility to log progress once every PROGRESS_EVERY ticks + let mut tick = 0usize; + macro_rules! progress { + ($($arg:tt)+) => {{ + tick = tick.wrapping_add(1); + if tick % PROGRESS_EVERY == 0 { + debug!($($arg)+); + } + }}; + } + + // 1. 
Migrate the address prefix search index + // Moved as-is from the history db to the txstore db + info!("[1/4] migrating address prefix search index..."); + let address_iter = history_db.iter_scan(b"a"); + for chunk in &address_iter.chunks(BATCH_SIZE) { + let mut batch = WriteBatch::default(); + for row in chunk { + progress!("[1/4] at {}", str::from_utf8(&row.key[1..]).unwrap()); + batch.put(row.key, row.value); + } + // Write batches without flushing (sync and WAL disabled) + trace!("[1/4] writing batch of {} ops", batch.len()); + txstore_db.write_batch(batch, DBFlush::Disable); + } + // Flush the txstore db, only then delete the original rows from the history db + info!("[1/4] flushing V2 address index to txstore db"); + txstore_db.flush(); + info!("[1/4] deleting V1 address index from history db"); + history_db.delete_range(b"a", b"b", DBFlush::Enable); + + // 2. Migrate the TxConf transaction confirmation index + // - Moved from the txstore db to the history db + // - Changed from a set of blocks seen to include the tx to a single block (that is part of the best chain) + // - Changed from the block hash to the block height + // - Entries originating from stale blocks are removed + // Steps 3/4 depend on this index getting migrated first + info!("[2/4] migrating TxConf index..."); + let txconf_iter = txstore_db.iter_scan(b"C"); + for chunk in &txconf_iter.chunks(BATCH_SIZE) { + let mut batch = WriteBatch::default(); + for v1_row in chunk { + let v1_txconf: V1TxConfKey = + deserialize_little(&v1_row.key).expect("invalid TxConfKey"); + let blockhash = BlockHash::from_byte_array(v1_txconf.blockhash); + if let Some(header) = headers.header_by_blockhash(&blockhash) { + // The blockhash is still part of the best chain, use its height to construct the V2 row + let v2_row = V2TxConfRow::new(v1_txconf.txid, header.height() as u32).into_row(); + batch.put(v2_row.key, v2_row.value); + } else { + // The transaction was reorged, don't write the V2 entry + // trace!("[2/4] skipping reorged TxConf for {}", Txid::from_byte_array(txconf.txid)); + } + progress!( + "[2/4] migrating TxConf index ~{:.2}%", + est_hash_progress(&v1_txconf.txid) + ); + } + // Write batches without flushing (sync and WAL disabled) + trace!("[2/4] writing batch of {} ops", batch.len()); + history_db.write_batch(batch, DBFlush::Disable); + } + // Flush the history db, only then delete the original rows from the txstore db + info!("[2/4] flushing V2 TxConf to history db"); + history_db.flush(); + info!("[2/4] deleting V1 TxConf from txstore db"); + txstore_db.delete_range(b"C", b"D", DBFlush::Enable); + + // 3. Migrate the TxEdge spending index + // - Changed from a set of inputs seen to spend the outpoint to a single spending input (that is part of the best chain) + // - Keep the height of the spending tx + // - Entries originating from stale blocks are removed + info!("[3/4] migrating TxEdge index..."); + let txedge_iter = history_db.iter_scan(b"S"); + for chunk in &txedge_iter.chunks(BATCH_SIZE) { + let mut v1_edges = Vec::with_capacity(BATCH_SIZE); + let mut spending_txids = BTreeSet::new(); + for v1_row in chunk { + if let Ok(v1_edge) = deserialize_little::(&v1_row.key) { + spending_txids.insert(Txid::from_byte_array(v1_edge.spending_txid)); + v1_edges.push((v1_edge, v1_row.key)); + } + // Rows with keys that cannot be deserialized into V1TxEdgeKey are assumed to already be upgraded, and skipped + // This is necessary to properly recover if the migration stops halfway through. 
+ } + + // Lookup the confirmation status for the entire chunk using a MultiGet operation + let confirmations = lookup_confirmations(history_db, spending_txids); + + let mut batch = WriteBatch::default(); + for (v1_edge, v1_db_key) in v1_edges { + let spending_txid = Txid::from_byte_array(v1_edge.spending_txid); + + // Remove the old V1 entry. V2 entries use a different key. + batch.delete(v1_db_key); + + if let Some(spending_height) = confirmations.get(&spending_txid) { + // Re-add the V2 entry if it is still part of the best chain + let v2_row = V2TxEdgeRow::new( + v1_edge.funding_txid, + v1_edge.funding_vout, + v1_edge.spending_txid, + v1_edge.spending_vin, + *spending_height, // now with the height included + ) + .into_row(); + batch.put(v2_row.key, v2_row.value); + } else { + // The spending transaction was reorged, don't write the V2 entry + //trace!("[3/4] skipping reorged TxEdge for {}", spending_txid); + } + + progress!( + "[3/4] migrating TxEdge index ~{:.2}%", + est_hash_progress(&v1_edge.funding_txid) + ); + } + // Write batches without flushing (sync and WAL disabled) + trace!("[3/4] writing batch of {} ops", batch.len()); + history_db.write_batch(batch, DBFlush::Disable); + } + info!("[3/4] flushing V2 TxEdge index to history db"); + history_db.flush(); + + // 4. Migrate the TxHistory index + // Entries originating from stale blocks are removed, with no other changes + info!("[4/4] migrating TxHistory index..."); + for prefix in HISTORY_PREFIXES { + let txhistory_iter = history_db.iter_scan(&[prefix]); + info!("[4/4] migrating TxHistory index {}", prefix as char); + for chunk in &txhistory_iter.chunks(BATCH_SIZE) { + let mut history_entries = Vec::with_capacity(BATCH_SIZE); + let mut history_txids = BTreeSet::new(); + for row in chunk { + let hist: TxHistoryKey = deserialize_big(&row.key).expect("invalid TxHistoryKey"); + history_txids.insert(hist.txinfo.get_txid()); + history_entries.push((hist, row.key)); + } + + // Lookup the confirmation status for the entire chunk using a MultiGet operation + let confirmations = lookup_confirmations(history_db, history_txids); + + let mut batch = WriteBatch::default(); + for (hist, db_key) in history_entries { + let hist_txid = hist.txinfo.get_txid(); + if confirmations.get(&hist_txid) != Some(&hist.confirmed_height) { + // The history entry originated from a stale block, remove it + batch.delete(db_key); + // trace!("[4/4] removing reorged TxHistory for {}", hist.txinfo.get_txid()); + } + progress!( + "[4/4] migrating TxHistory index {} ~{:.2}%", + prefix as char, + est_hash_progress(&hist.hash) + ); + } + // Write batches without flushing (sync and WAL disabled) + trace!("[4/4] writing batch of {} deletions", batch.len()); + if !batch.is_empty() { + history_db.write_batch(batch, DBFlush::Disable); + } + } + } + info!("[4/4] flushing TxHistory deletions to history db"); + history_db.flush(); + + // Update the DB version under `V` + let ver_bytes = serialize_little(&(TO_DB_VERSION, config.light_mode)).unwrap(); + for db in [txstore_db, history_db, cache_db] { + db.put_sync(b"V", &ver_bytes); + } + + // Compact everything once at the end + txstore_db.full_compaction(); + history_db.full_compaction(); +} + +// Estimates progress using the first 4 bytes, relying on RocksDB's lexicographic key ordering and uniform hash distribution +fn est_hash_progress(hash: &FullHash) -> f32 { + u32::from_be_bytes(hash[0..4].try_into().unwrap()) as f32 / u32::MAX as f32 * 100f32 +} + +#[derive(Debug, serde::Deserialize)] +struct V1TxConfKey { + 
#[allow(dead_code)] + code: u8, + txid: FullHash, + blockhash: FullHash, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct V1TxEdgeKey { + code: u8, + funding_txid: FullHash, + funding_vout: u16, + spending_txid: FullHash, + spending_vin: u16, +} + +/* +use bitcoin::hex::DisplayHex; + +fn dump_db(db: &DB, label: &str, prefix: &[u8]) { + debug!("dumping {}", label); + for item in db.iter_scan(prefix) { + trace!( + "[{}] {} => {}", + label, + fmt_key(&item.key), + &item.value.to_lower_hex_string() + ); + } +} + +fn debug_batch(batch: &WriteBatch, label: &'static str) { + debug!("batch {} with {} ops", label, batch.len()); + batch.iterate(&mut WriteBatchLogIterator(label)); +} + +struct WriteBatchLogIterator(&'static str); +impl rocksdb::WriteBatchIterator for WriteBatchLogIterator { + fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>) { + trace!( + "[batch {}] PUT {} => {}", + self.0, + fmt_key(&key), + value.to_lower_hex_string() + ); + } + fn delete(&mut self, key: Box<[u8]>) { + trace!("[batch {}] DELETE {}", self.0, fmt_key(&key)); + } +} + +fn fmt_key(key: &[u8]) -> String { + format!("{}-{}", key[0] as char, &key[1..].to_lower_hex_string()) +} +*/ diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 59f957ae5..f8178fbf7 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -68,7 +68,7 @@ fn run_server(config: Arc, salt_rwlock: Arc>) -> Result<( signal.clone(), &metrics, )?); - let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics)); + let store = Arc::new(Store::open(&config, &metrics, true)); let mut indexer = Indexer::open( Arc::clone(&store), fetch_from(&config, &store), diff --git a/src/bin/popular-scripts.rs b/src/bin/popular-scripts.rs index a7b245817..6ad39f667 100644 --- a/src/bin/popular-scripts.rs +++ b/src/bin/popular-scripts.rs @@ -8,7 +8,7 @@ use electrs::{ fn main() { let config = Config::from_args(); let metrics = Metrics::new(config.monitoring_addr); - let store = Store::open(&config.db_path.join("newindex"), &config, &metrics); + let store = Store::open(&config, &metrics, true); let mut iter = store.history_db().raw_iterator(); iter.seek(b"H"); diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 83b3f213a..f96c7e7e4 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -24,7 +24,7 @@ fn main() { let signal = Waiter::start(crossbeam_channel::never()); let config = Config::from_args(); let metrics = Metrics::new(config.monitoring_addr); - let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics)); + let store = Arc::new(Store::open(&config, &metrics, true)); let metrics = Metrics::new(config.monitoring_addr); metrics.start(); diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 5888e8d20..d1d62dd3b 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -11,7 +11,7 @@ use crate::config::Config; use crate::new_index::db_metrics::RocksDbMetrics; use crate::util::{bincode, spawn_thread, Bytes}; -static DB_VERSION: u32 = 1; +static DB_VERSION: u32 = 2; #[derive(Debug, Eq, PartialEq)] pub struct DBRow { @@ -87,7 +87,7 @@ pub enum DBFlush { } impl DB { - pub fn open(path: &Path, config: &Config) -> DB { + pub fn open(path: &Path, config: &Config, verify_compat: bool) -> DB { debug!("opening DB at {:?}", path); let mut db_opts = rocksdb::Options::default(); db_opts.create_if_missing(true); @@ -119,7 +119,9 @@ impl DB { let db = DB { db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open 
RocksDB")) }; - db.verify_compatibility(config); + if verify_compat { + db.verify_compatibility(config); + } db } @@ -195,7 +197,7 @@ impl DB { self.write_batch(batch, flush) } - fn write_batch(&self, batch: rocksdb::WriteBatch, flush: DBFlush) { + pub fn write_batch(&self, batch: rocksdb::WriteBatch, flush: DBFlush) { let do_flush = match flush { DBFlush::Enable => true, DBFlush::Disable => false, @@ -232,21 +234,20 @@ impl DB { self.db.multi_get(keys) } + /// Remove database entries in the range [from, to) + pub fn delete_range>(&self, from: K, to: K, flush: DBFlush) { + let mut batch = rocksdb::WriteBatch::default(); + batch.delete_range(from, to); + self.write_batch(batch, flush); + } + fn verify_compatibility(&self, config: &Config) { - let mut compatibility_bytes = bincode::serialize_little(&DB_VERSION).unwrap(); - - if config.light_mode { - // append a byte to indicate light_mode is enabled. - // we're not letting bincode serialize this so that the compatiblity bytes won't change - // (and require a reindex) when light_mode is disabled. this should be chagned the next - // time we bump DB_VERSION and require a re-index anyway. - compatibility_bytes.push(1); - } + let compatibility_bytes = bincode::serialize_little(&(DB_VERSION, config.light_mode)).unwrap(); match self.get(b"V") { None => self.put(b"V", &compatibility_bytes), - Some(ref x) if x != &compatibility_bytes => { - panic!("Incompatible database found. Please reindex.") + Some(x) if x != compatibility_bytes => { + panic!("Incompatible database found. Please reindex or migrate.") } Some(_) => (), } diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index d6e1d37ec..f79277980 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -19,8 +19,7 @@ use elements::{ use std::collections::{BTreeSet, HashMap, HashSet}; use std::convert::TryInto; -use std::path::Path; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, RwLock, RwLockReadGuard}; use crate::{chain::{ BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value, @@ -59,16 +58,18 @@ pub struct Store { } impl Store { - pub fn open(path: &Path, config: &Config, metrics: &Metrics) -> Self { - let txstore_db = DB::open(&path.join("txstore"), config); + pub fn open(config: &Config, metrics: &Metrics, verify_compat: bool) -> Self { + let path = config.db_path.join("newindex"); + + let txstore_db = DB::open(&path.join("txstore"), config, verify_compat); let added_blockhashes = load_blockhashes(&txstore_db, &BlockRow::done_filter()); debug!("{} blocks were added", added_blockhashes.len()); - let history_db = DB::open(&path.join("history"), config); + let history_db = DB::open(&path.join("history"), config, verify_compat); let indexed_blockhashes = load_blockhashes(&history_db, &BlockRow::done_filter()); debug!("{} blocks were indexed", indexed_blockhashes.len()); - let cache_db = DB::open(&path.join("cache"), config); + let cache_db = DB::open(&path.join("cache"), config, verify_compat); let db_metrics = Arc::new(RocksDbMetrics::new(&metrics)); txstore_db.start_stats_exporter(Arc::clone(&db_metrics), "txstore_db"); @@ -123,6 +124,10 @@ impl Store { &self.cache_db } + pub fn headers(&self) -> RwLockReadGuard { + self.indexed_headers.read().unwrap() + } + pub fn done_initial_sync(&self) -> bool { self.txstore_db.get(b"t").is_some() } @@ -1341,18 +1346,18 @@ impl TxRow { } #[derive(Serialize, Deserialize)] -struct TxConfKey { +pub struct TxConfKey { code: u8, txid: FullHash, } -struct TxConfRow { +pub struct TxConfRow { key: 
TxConfKey, value: u32, // the confirmation height } impl TxConfRow { - fn new(txid: FullHash, height: u32) -> TxConfRow { + pub fn new(txid: FullHash, height: u32) -> TxConfRow { let txid = full_hash(&txid[..]); TxConfRow { key: TxConfKey { code: b'C', txid }, @@ -1360,7 +1365,7 @@ impl TxConfRow { } } - fn key(txid: &Txid) -> Bytes { + pub fn key(txid: &Txid) -> Bytes { bincode::serialize_little(&TxConfKey { code: b'C', txid: full_hash(&txid[..]), @@ -1368,7 +1373,7 @@ impl TxConfRow { .unwrap() } - fn into_row(self) -> DBRow { + pub fn into_row(self) -> DBRow { DBRow { key: bincode::serialize_little(&self.key).unwrap(), value: self.value.to_le_bytes().to_vec(), @@ -1619,26 +1624,26 @@ impl TxHistoryInfo { } #[derive(Serialize, Deserialize)] -struct TxEdgeKey { +pub struct TxEdgeKey { code: u8, funding_txid: FullHash, funding_vout: u16, } #[derive(Serialize, Deserialize)] -struct TxEdgeValue { +pub struct TxEdgeValue { spending_txid: FullHash, spending_vin: u16, spending_height: u32, } -struct TxEdgeRow { +pub struct TxEdgeRow { key: TxEdgeKey, value: TxEdgeValue, } impl TxEdgeRow { - fn new( + pub fn new( funding_txid: FullHash, funding_vout: u16, spending_txid: FullHash, @@ -1663,7 +1668,7 @@ impl TxEdgeRow { .unwrap() } - fn into_row(self) -> DBRow { + pub fn into_row(self) -> DBRow { DBRow { key: bincode::serialize_little(&self.key).unwrap(), value: bincode::serialize_little(&self.value).unwrap(), diff --git a/tests/common.rs b/tests/common.rs index ff8dbcdc9..3662920a1 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -144,7 +144,7 @@ impl TestRunner { &metrics, )?); - let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics)); + let store = Arc::new(Store::open(&config, &metrics, true)); let fetch_from = if !env::var("JSONRPC_IMPORT").is_ok() && !cfg!(feature = "liquid") { // run the initial indexing from the blk files then switch to using the jsonrpc, From d94af92fb064fab47c4fb91cf6eba42537f22351 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 30 Nov 2025 22:02:43 +0200 Subject: [PATCH 11/15] Implement multi-transaction TxRaw lookup using MultiGet - Change lookup_txns to use MultiGet - Use lookup_txns for block transactions and reconstruction too (GET /block/:hash/txs and GET /block/:hash/raw) (This was already possible with the V1 schema, but related to and builds upon the other V2 changes.) Plus some related changes: - Remove expensive sanity check assertion in lookup_txn (involved txid computation and wasn't really necessary) - Add test for raw block reconstruction --- src/new_index/schema.rs | 96 ++++++++++++++++++++++++++++++----------- src/rest.rs | 45 +++++++------------ tests/rest.rs | 23 +++++----- 3 files changed, 98 insertions(+), 66 deletions(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index f79277980..b532bd8f4 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -494,6 +494,28 @@ impl ChainQuery { } } + pub fn get_block_txs( + &self, + hash: &BlockHash, + start_index: usize, + limit: usize, + ) -> Result> { + let txids = self.get_block_txids(hash).chain_err(|| "block not found")?; + ensure!(start_index < txids.len(), "start index out of range"); + + let txids_with_blockhash = txids + .into_iter() + .skip(start_index) + .take(limit) + .map(|txid| (txid, *hash)) + .collect::>(); + + self.lookup_txns(&txids_with_blockhash) + + // XXX use getblock in lightmode? 
a single RPC call, but would fetch all txs to get one page + // self.daemon.getblock(hash)?.txdata.into_iter().skip(start_index).take(limit).collect() + } + pub fn get_block_meta(&self, hash: &BlockHash) -> Option { let _timer = self.start_timer("get_block_meta"); @@ -519,17 +541,19 @@ impl ChainQuery { let entry = self.header_by_hash(hash)?; let meta = self.get_block_meta(hash)?; let txids = self.get_block_txids(hash)?; + let txids_with_blockhash: Vec<_> = + txids.into_iter().map(|txid| (txid, *hash)).collect(); + let raw_txs = self.lookup_raw_txns(&txids_with_blockhash).ok()?; // TODO avoid hiding all errors as None, return a Result // Reconstruct the raw block using the header and txids, // as let mut raw = Vec::with_capacity(meta.size as usize); raw.append(&mut serialize(entry.header())); - raw.append(&mut serialize(&VarInt(txids.len() as u64))); + raw.append(&mut serialize(&VarInt(raw_txs.len() as u64))); - for txid in txids { - // we don't need to provide the blockhash because we know we're not in light mode - raw.append(&mut self.lookup_raw_txn(&txid, None)?); + for mut raw_tx in raw_txs { + raw.append(&mut raw_tx); } Some(raw) @@ -589,7 +613,7 @@ impl ChainQuery { ) -> Vec<(Transaction, BlockId)> { let _timer_scan = self.start_timer("history"); let headers = self.store.indexed_headers.read().unwrap(); - let txs_conf = self + let history_iter = self .history_iter_scan_reverse(code, hash) .map(TxHistoryRow::from_row) .map(|row| (row.get_txid(), row.key.confirmed_height as usize)) @@ -605,16 +629,22 @@ impl ChainQuery { None => 0, }) // skip over entries that point to non-existing heights (may happen during reorg handling) - .filter_map(|(txid, height)| Some((txid, headers.header_by_height(height)?.into()))) - .take(limit) - .collect::>(); + .filter_map(|(txid, height)| Some((txid, headers.header_by_height(height)?))) + .take(limit); + + let mut txids_with_blockhash = Vec::with_capacity(limit); + let mut blockids = Vec::with_capacity(limit); + for (txid, header) in history_iter { + txids_with_blockhash.push((txid, *header.hash())); + blockids.push(BlockId::from(header)); + } drop(headers); - self.lookup_txns(&txs_conf) + self.lookup_txns(&txids_with_blockhash) .expect("failed looking up txs in history index") .into_iter() - .zip(txs_conf) - .map(|(tx, (_, blockid))| (tx, blockid)) + .zip(blockids) + .map(|(tx, blockid)| (tx, blockid)) .collect() } @@ -926,26 +956,40 @@ impl ChainQuery { .clone() } - // TODO: can we pass txids as a "generic iterable"? - // TODO: should also use a custom ThreadPoolBuilder? - pub fn lookup_txns(&self, txids: &[(Txid, BlockId)]) -> Result> { + pub fn lookup_txns(&self, txids: &[(Txid, BlockHash)]) -> Result> { let _timer = self.start_timer("lookup_txns"); - txids - .par_iter() - .map(|(txid, blockid)| { - self.lookup_txn(txid, Some(&blockid.hash)) - .chain_err(|| "missing tx") - }) - .collect::>>() + Ok(self + .lookup_raw_txns(txids)? 
+ .into_iter() + .map(|rawtx| deserialize(&rawtx).expect("failed to parse Transaction")) + .collect()) } pub fn lookup_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option { let _timer = self.start_timer("lookup_txn"); - self.lookup_raw_txn(txid, blockhash).map(|rawtx| { - let txn: Transaction = deserialize(&rawtx).expect("failed to parse Transaction"); - assert_eq!(*txid, txn.compute_txid()); - txn - }) + let rawtx = self.lookup_raw_txn(txid, blockhash)?; + Some(deserialize(&rawtx).expect("failed to parse Transaction")) + } + + pub fn lookup_raw_txns(&self, txids: &[(Txid, BlockHash)]) -> Result> { + let _timer = self.start_timer("lookup_raw_txns"); + if self.light_mode { + txids + .par_iter() + .map(|(txid, blockhash)| { + self.lookup_raw_txn(txid, Some(blockhash)) + .chain_err(|| "missing tx") + }) + .collect() + } else { + let keys = txids.iter().map(|(txid, _)| TxRow::key(&txid[..])); + self.store + .txstore_db + .multi_get(keys) + .into_iter() + .map(|val| val.unwrap().chain_err(|| "missing tx")) + .collect() + } } pub fn lookup_raw_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option { diff --git a/src/rest.rs b/src/rest.rs index ece761b5a..f991351bb 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -718,41 +718,28 @@ fn handle_request( } (&Method::GET, Some(&"block"), Some(hash), Some(&"txs"), start_index, None) => { let hash = BlockHash::from_str(hash)?; - let txids = query - .chain() - .get_block_txids(&hash) - .ok_or_else(|| HttpError::not_found("Block not found".to_string()))?; - let start_index = start_index .map_or(0u32, |el| el.parse().unwrap_or(0)) .max(0u32) as usize; - if start_index >= txids.len() { - bail!(HttpError::not_found("start index out of range".to_string())); - } else if start_index % CHAIN_TXS_PER_PAGE != 0 { - bail!(HttpError::from(format!( - "start index must be a multipication of {}", - CHAIN_TXS_PER_PAGE - ))); - } - // blockid_by_hash() only returns the BlockId for non-orphaned blocks, - // or None for orphaned - let confirmed_blockid = query.chain().blockid_by_hash(&hash); + ensure!( + start_index % CHAIN_TXS_PER_PAGE == 0, + "start index must be a multipication of {}", + CHAIN_TXS_PER_PAGE + ); + + // The BlockId would not be available for stale blocks + let blockid = query.chain().blockid_by_hash(&hash); - let txs = txids - .iter() - .skip(start_index) - .take(CHAIN_TXS_PER_PAGE) - .map(|txid| { - query - .lookup_txn(&txid) - .map(|tx| (tx, confirmed_blockid.clone())) - .ok_or_else(|| "missing tx".to_string()) - }) - .collect::)>, _>>()?; + let txs = query + .chain() + .get_block_txs(&hash, start_index, CHAIN_TXS_PER_PAGE)? + .into_iter() + .map(|tx| (tx, blockid)) + .collect(); - // XXX orphraned blocks alway get TTL_SHORT - let ttl = ttl_by_depth(confirmed_blockid.map(|b| b.height), query); + // XXX stale blocks alway get TTL_SHORT + let ttl = ttl_by_depth(blockid.map(|b| b.height), query); json_response(prepare_txs(txs, query, config), ttl) } diff --git a/tests/rest.rs b/tests/rest.rs index 1410658b6..420464d16 100644 --- a/tests/rest.rs +++ b/tests/rest.rs @@ -1,3 +1,4 @@ +use bitcoin::hex::FromHex; use bitcoind::bitcoincore_rpc::RpcApi; use serde_json::Value; use std::collections::HashSet; @@ -15,17 +16,9 @@ use common::Result; fn test_rest() -> Result<()> { let (rest_handle, rest_addr, mut tester) = common::init_rest_tester().unwrap(); - let get_json = |path: &str| -> Result { - Ok(ureq::get(&format!("http://{}{}", rest_addr, path)) - .call()? - .into_json::()?) 
-    };
-
-    let get_plain = |path: &str| -> Result<String> {
-        Ok(ureq::get(&format!("http://{}{}", rest_addr, path))
-            .call()?
-            .into_string()?)
-    };
+    let get = |path: &str| ureq::get(&format!("http://{}{}", rest_addr, path)).call();
+    let get_json = |path: &str| -> Result<Value> { Ok(get(path)?.into_json::<Value>()?) };
+    let get_plain = |path: &str| -> Result<String> { Ok(get(path)?.into_string()?) };
 
     // Send transaction and confirm it
     let addr1 = tester.newaddress()?;
@@ -141,6 +134,14 @@ fn test_rest() -> Result<()> {
     );
     assert_eq!(res["tx_count"].as_u64(), Some(2));
 
+    // Test GET /block/:hash/raw
+    let mut res = get(&format!("/block/{}/raw", blockhash))?.into_reader();
+    let mut rest_rawblock = Vec::new();
+    res.read_to_end(&mut rest_rawblock).unwrap();
+    let node_hexblock = // uses low-level call() to support Elements
+        tester.call::<String>("getblock", &[blockhash.to_string().into(), 0.into()])?;
+    assert_eq!(rest_rawblock, Vec::from_hex(&node_hexblock).unwrap());
+
     // Test GET /block/:hash/txs
     let res = get_json(&format!("/block/{}/txs", blockhash))?;
     let block_txs = res.as_array().expect("list of txs");

From 0f2acc75a475c5d5bd9e06dd3046203278bd780f Mon Sep 17 00:00:00 2001
From: Nadav Ivgi
Date: Sun, 30 Nov 2025 22:02:43 +0200
Subject: [PATCH 12/15] Optimize ScanIterator to avoid unnecessary copying
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously each key/value read during iteration was getting duplicated
😮

(This doesn't strictly belong to the PR it's included in, but it will
greatly benefit the DB migration script.)
---
 src/new_index/db.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/new_index/db.rs b/src/new_index/db.rs
index d1d62dd3b..dc37a483e 100644
--- a/src/new_index/db.rs
+++ b/src/new_index/db.rs
@@ -38,8 +38,8 @@ impl<'a> Iterator for ScanIterator<'a> {
             return None;
         }
         Some(DBRow {
-            key: key.to_vec(),
-            value: value.to_vec(),
+            key: key.into_vec(),
+            value: value.into_vec(),
         })
     }
 }

From dafcd6dc9e477fe28b530e70471eff2cada2ba81 Mon Sep 17 00:00:00 2001
From: Nadav Ivgi
Date: Tue, 2 Dec 2025 05:34:53 +0200
Subject: [PATCH 13/15] Fix lookup_confirmations() to account for stale
 entries

Also adds HeaderList::best_height() to help avoid off-by-one errors for
the chain length vs tip height (like the one I initially made when
implementing this >.<), and to make getting the tip height of an empty
HeaderList an explicit error (previously it would overflow and return
usize::MAX).
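
As an illustrative aside (not part of this patch), here is a minimal standalone
sketch of the off-by-one and the unsigned wrap-around the message refers to;
it uses plain `usize` arithmetic only, not the actual `HeaderList` type:

    fn main() {
        // A chain of three headers spans heights 0..=2, so the tip height is len - 1.
        let len: usize = 3;
        let best_height = len.checked_sub(1).expect("best_height() on empty HeaderList");
        assert_eq!(best_height, 2);

        // On an empty list, a bare `len - 1` would wrap around in release builds,
        // silently yielding usize::MAX instead of an explicit error.
        let empty_len: usize = 0;
        assert_eq!(empty_len.wrapping_sub(1), usize::MAX);
    }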
--- src/bin/db-migrate-v1-to-v2.rs | 5 +++-- src/new_index/schema.rs | 16 +++++++++++----- src/util/block.rs | 9 ++++++++- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/bin/db-migrate-v1-to-v2.rs b/src/bin/db-migrate-v1-to-v2.rs index a32f9ebfe..9289f1d77 100644 --- a/src/bin/db-migrate-v1-to-v2.rs +++ b/src/bin/db-migrate-v1-to-v2.rs @@ -38,6 +38,7 @@ fn main() { let history_db = store.history_db(); let cache_db = store.cache_db(); let headers = store.headers(); + let tip_height = headers.best_height() as u32; // Check the DB version under `V` matches the expected version for db in [txstore_db, history_db, cache_db] { @@ -133,7 +134,7 @@ fn main() { } // Lookup the confirmation status for the entire chunk using a MultiGet operation - let confirmations = lookup_confirmations(history_db, spending_txids); + let confirmations = lookup_confirmations(history_db, tip_height, spending_txids); let mut batch = WriteBatch::default(); for (v1_edge, v1_db_key) in v1_edges { @@ -186,7 +187,7 @@ fn main() { } // Lookup the confirmation status for the entire chunk using a MultiGet operation - let confirmations = lookup_confirmations(history_db, history_txids); + let confirmations = lookup_confirmations(history_db, tip_height, history_txids); let mut batch = WriteBatch::default(); for (hist, db_key) in history_entries { diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index b532bd8f4..fccd9ff10 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -375,7 +375,7 @@ impl Indexer { self.from = FetchFrom::Bitcoind; } - self.tip_metric.set(headers.len() as i64 - 1); + self.tip_metric.set(headers.best_height() as i64); Ok(tip) } @@ -940,8 +940,9 @@ impl ChainQuery { .map(BlockId::from) } + /// Get the chain tip height. Panics if called on an empty HeaderList. pub fn best_height(&self) -> usize { - self.store.indexed_headers.read().unwrap().len() - 1 + self.store.indexed_headers.read().unwrap().best_height() } pub fn best_hash(&self) -> BlockHash { @@ -1068,7 +1069,7 @@ impl ChainQuery { } pub fn lookup_confirmations(&self, txids: BTreeSet) -> HashMap { - lookup_confirmations(&self.store.history_db, txids) + lookup_confirmations(&self.store.history_db, self.best_height() as u32, txids) } pub fn get_block_status(&self, hash: &BlockHash) -> BlockStatus { @@ -1223,14 +1224,19 @@ fn lookup_txo(txstore_db: &DB, outpoint: &OutPoint) -> Option { .map(|val| deserialize(&val).expect("failed to parse TxOut")) } -pub fn lookup_confirmations(history_db: &DB, txids: BTreeSet) -> HashMap { +pub fn lookup_confirmations( + history_db: &DB, + tip_height: u32, + txids: BTreeSet, +) -> HashMap { history_db .multi_get(txids.iter().map(TxConfRow::key)) .into_iter() .zip(txids) .filter_map(|(res, txid)| { let confirmation_height = u32::from_le_bytes(res.unwrap()?.try_into().unwrap()); - Some((txid, confirmation_height)) + // skip over entries that point to non-existing heights (may happen during reorg handling) + (confirmation_height <= tip_height).then_some((txid, confirmation_height)) }) .collect() } diff --git a/src/util/block.rs b/src/util/block.rs index 7bbada26e..4b4e8da82 100644 --- a/src/util/block.rs +++ b/src/util/block.rs @@ -270,6 +270,13 @@ impl HeaderList { self.headers.len() } + /// Get the chain tip height. Panics if called on an empty HeaderList. 
+ pub fn best_height(&self) -> usize { + self.len() + .checked_sub(1) + .expect("best_height() on empty HeaderList") + } + pub fn is_empty(&self) -> bool { self.headers.is_empty() } @@ -284,7 +291,7 @@ impl HeaderList { // Matches bitcoind's behaviour: bitcoin-cli getblock `bitcoin-cli getblockhash 0` | jq '.time == .mediantime' if height == 0 { self.headers.get(0).unwrap().header.time - } else if height > self.len() - 1 { + } else if height > self.best_height() { 0 } else { let mut timestamps = (height.saturating_sub(MTP_SPAN - 1)..=height) From 4c72b9fe4c04634a5ab397079ceb859ae27ee125 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Thu, 4 Dec 2025 01:43:59 +0200 Subject: [PATCH 14/15] Fix handling of reorgs that shorten the chain Prior to this fix, `Indexer::update()` would panic on the `assert_eq!(tip, *headers.tip())` assertion when handling reorgs that shorten the existing chain without adding any blocks to replace them. This should not normally happen, but might due to manual `invalidateblock`. For example, this will reproduce the panic: `bitcoin-cli invalidateblock $(bitcoin-cli getbestblockhash)` --- src/new_index/schema.rs | 4 +- src/util/block.rs | 86 +++++++++++++++++++++-------------------- tests/rest.rs | 14 ++++--- 3 files changed, 56 insertions(+), 48 deletions(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index fccd9ff10..9f5f96ff1 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -284,8 +284,8 @@ impl Indexer { tip: &BlockHash, ) -> Result<(Vec, Option)> { let indexed_headers = self.store.indexed_headers.read().unwrap(); - let raw_new_headers = daemon.get_new_headers(&indexed_headers, &tip)?; - let (new_headers, reorged_since) = indexed_headers.preprocess(raw_new_headers); + let raw_new_headers = daemon.get_new_headers(&indexed_headers, tip)?; + let (new_headers, reorged_since) = indexed_headers.preprocess(raw_new_headers, tip); if let Some(tip) = new_headers.last() { info!("{:?} ({} left to index)", tip, new_headers.len()); diff --git a/src/util/block.rs b/src/util/block.rs index 4b4e8da82..c5c2f7c5a 100644 --- a/src/util/block.rs +++ b/src/util/block.rs @@ -2,9 +2,9 @@ use crate::chain::{BlockHash, BlockHeader}; use crate::errors::*; use crate::new_index::BlockEntry; +use itertools::Itertools; use std::collections::HashMap; use std::fmt; -use std::iter::FromIterator; use std::slice; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime as DateTime; @@ -128,7 +128,7 @@ impl HeaderList { ); let mut headers = HeaderList::empty(); - headers.append(headers.preprocess(headers_chain).0); + headers.append(headers.preprocess(headers_chain, &tip_hash).0); headers } @@ -138,43 +138,50 @@ impl HeaderList { /// Actually applying the headers requires to first pop() the reorged blocks (if any), /// then append() the new ones. #[trace] - pub fn preprocess(&self, new_headers: Vec) -> (Vec, Option) { + pub fn preprocess( + &self, + new_headers: Vec, + new_tip: &BlockHash, + ) -> (Vec, Option) { // header[i] -> header[i-1] (i.e. 
header.last() is the tip) - struct HashedHeader { - blockhash: BlockHash, - header: BlockHeader, - } - let hashed_headers = - Vec::::from_iter(new_headers.into_iter().map(|header| HashedHeader { - blockhash: header.block_hash(), - header, - })); - for i in 1..hashed_headers.len() { - assert_eq!( - hashed_headers[i].header.prev_blockhash, - hashed_headers[i - 1].blockhash - ); - } - let prev_blockhash = match hashed_headers.first() { - Some(h) => h.header.prev_blockhash, - None => return (vec![], None), // hashed_headers is empty - }; - let new_height: usize = if prev_blockhash == *DEFAULT_BLOCKHASH { - 0 + let (new_height, header_entries) = if !new_headers.is_empty() { + let hashed_headers = new_headers + .into_iter() + .map(|h| (h.block_hash(), h)) + .collect::>(); + for ((curr_blockhash, _), (_, next_header)) in hashed_headers.iter().tuple_windows() { + assert_eq!(*curr_blockhash, next_header.prev_blockhash); + } + assert_eq!(hashed_headers.last().unwrap().0, *new_tip); + + let prev_blockhash = &hashed_headers.first().unwrap().1.prev_blockhash; + let new_height = if *prev_blockhash == *DEFAULT_BLOCKHASH { + 0 + } else { + self.header_by_blockhash(prev_blockhash) + .expect("headers do not connect") + .height() + + 1 + }; + let header_entries = (new_height..) + .zip(hashed_headers) + .map(|(height, (hash, header))| HeaderEntry { + height, + hash, + header, + }) + .collect(); + (new_height, header_entries) } else { - self.header_by_blockhash(&prev_blockhash) - .unwrap_or_else(|| panic!("{} is not part of the blockchain", prev_blockhash)) + // No new headers, but the new tip could potentially shorten the chain (or be a no-op if it matches the existing tip) + // This should not normally happen, but might due to manual `invalidateblock` + let new_height = self + .header_by_blockhash(new_tip) + .expect("new tip not in chain") .height() - + 1 + + 1; + (new_height, vec![]) }; - let header_entries = (new_height..) - .zip(hashed_headers.into_iter()) - .map(|(height, hashed_header)| HeaderEntry { - height, - hash: hashed_header.blockhash, - header: hashed_header.header, - }) - .collect(); let reorged_since = (new_height < self.len()).then_some(new_height); (header_entries, reorged_since) } @@ -200,12 +207,9 @@ impl HeaderList { #[trace] pub fn append(&mut self, new_headers: Vec) { // new_headers[i] -> new_headers[i - 1] (i.e. 
new_headers.last() is the tip) - for i in 1..new_headers.len() { - assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height()); - assert_eq!( - *new_headers[i - 1].hash(), - new_headers[i].header().prev_blockhash - ); + for (curr_header, next_header) in new_headers.iter().tuple_windows() { + assert_eq!(curr_header.height() + 1, next_header.height()); + assert_eq!(*curr_header.hash(), next_header.header().prev_blockhash); } let new_height = match new_headers.first() { Some(entry) => { diff --git a/tests/rest.rs b/tests/rest.rs index 420464d16..8cb7a4583 100644 --- a/tests/rest.rs +++ b/tests/rest.rs @@ -399,15 +399,19 @@ fn test_rest() -> Result<()> { let c_spends = get_outspend(&tx_b.input[0].previous_output)?; assert_eq!(c_spends["spent"].as_bool(), Some(false)); + // Invalidate the tip with no replacement, shortening the chain by one block + tester.invalidate_block(&tester.get_best_block_hash()?)?; + tester.sync()?; + assert_eq!( + get_plain("/blocks/tip/height")?, + (init_height + 20).to_string() + ); + // Reorg everything back to genesis tester.invalidate_block(&tester.get_block_hash(1)?)?; - tester.call::( - "generateblock", - &[miner_address.to_string().into(), Vec::::new().into()], - )?; tester.sync()?; - assert_eq!(get_plain("/blocks/tip/height")?, 1.to_string()); + assert_eq!(get_plain("/blocks/tip/height")?, 0.to_string()); assert_eq!( get_chain_stats(&address)?["funded_txo_sum"].as_u64(), Some(0) From 416cb4c112c9f8a72bfad085b27deffab49c964b Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Fri, 5 Dec 2025 05:17:29 +0200 Subject: [PATCH 15/15] Update comment to be more accurate Following https://github.com/Blockstream/electrs/pull/174#discussion_r2589966745 --- src/elements/asset.rs | 2 +- src/new_index/schema.rs | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/elements/asset.rs b/src/elements/asset.rs index a19969fb3..149ebd703 100644 --- a/src/elements/asset.rs +++ b/src/elements/asset.rs @@ -531,7 +531,7 @@ fn chain_asset_stats_delta( .history_iter_scan(b'I', &asset_id.into_inner()[..], start_height) .map(TxHistoryRow::from_row) .filter_map(|history| { - // skip over entries that point to non-existing heights (may happen during reorg handling) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) let header = headers.header_by_height(history.key.confirmed_height as usize)?; Some((history, BlockId::from(header))) }); diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 9f5f96ff1..e35d15c9b 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -628,7 +628,7 @@ impl ChainQuery { Some(_) => 1, // skip the last_seen_txid itself None => 0, }) - // skip over entries that point to non-existing heights (may happen during reorg handling) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) .filter_map(|(txid, height)| Some((txid, headers.header_by_height(height)?))) .take(limit); @@ -660,7 +660,7 @@ impl ChainQuery { .map(TxHistoryRow::from_row) .map(|row| (row.get_txid(), row.key.confirmed_height as usize)) .unique() - // skip over entries that point to non-existing heights (may happen during reorg handling) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) .filter_map(|(txid, height)| Some((txid, headers.header_by_height(height)?.into()))) .take(limit) .collect() @@ -739,7 +739,7 @@ impl ChainQuery { let history_iter = 
self .history_iter_scan(b'H', scripthash, start_height) .map(TxHistoryRow::from_row) - // skip over entries that point to non-existing heights (may happen during reorg handling) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) .filter_map(|history| { let header = headers.header_by_height(history.key.confirmed_height as usize)?; Some((history, BlockId::from(header))) @@ -819,7 +819,7 @@ impl ChainQuery { let history_iter = self .history_iter_scan(b'H', scripthash, start_height) .map(TxHistoryRow::from_row) - // skip over entries that point to non-existing heights (may happen during reorg handling) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) .filter_map(|history| { let header = headers.header_by_height(history.key.confirmed_height as usize)?; Some((history, BlockId::from(header))) @@ -1026,7 +1026,7 @@ impl ChainQuery { let _timer = self.start_timer("lookup_spend"); let edge = TxEdgeValue::from_bytes(&self.store.history_db.get(&TxEdgeRow::key(outpoint))?); let headers = self.store.indexed_headers.read().unwrap(); - // skip entries that point to non-existing heights (may happen during reorg handling) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) let header = headers.header_by_height(edge.spending_height as usize)?; Some(SpendingInput { txid: deserialize(&edge.spending_txid).expect("failed to parse Txid"), @@ -1045,7 +1045,7 @@ impl ChainQuery { .zip(outpoints) .filter_map(|(edge_val, outpoint)| { let edge = TxEdgeValue::from_bytes(&edge_val.unwrap()?); - // skip over entries that point to non-existing heights (may happen during reorg handling) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) let header = headers.header_by_height(edge.spending_height as usize)?; Some(( outpoint, @@ -1064,7 +1064,7 @@ impl ChainQuery { let row_value = self.store.history_db.get(&TxConfRow::key(txid))?; let height = TxConfRow::height_from_val(&row_value); let headers = self.store.indexed_headers.read().unwrap(); - // skip entries that point to non-existing heights (may happen during reorg handling) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) Some(headers.header_by_height(height as usize)?.into()) } @@ -1235,7 +1235,7 @@ pub fn lookup_confirmations( .zip(txids) .filter_map(|(res, txid)| { let confirmation_height = u32::from_le_bytes(res.unwrap()?.try_into().unwrap()); - // skip over entries that point to non-existing heights (may happen during reorg handling) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) (confirmation_height <= tip_height).then_some((txid, confirmation_height)) }) .collect()