feat: introduce MemWAL writer #5709
base: main
Thanks for the fast turnaround! I will take a look tonight. Meanwhile, I think this code path deserves a benchmark. Can you add one?
Implements a Memory Write-Ahead Log (MemWAL) system for Lance with:
- LSM-tree based write path with WAL, MemTable, and SST layers
- Support for maintaining BTree, IVF-PQ, and FTS indexes during writes
- Configurable durable/nondurable and sync/async index modes
- Benchmark showing 19+ Melem/s throughput with batch size 100

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
4fae180 to eb46adc
rust/lance-io/src/object_store.rs
Outdated
/// (e.g., another writer already wrote the same WAL entry).
///
/// Returns `Err` with `AlreadyExists` if the destination file exists.
pub async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
Why do we need rename_if_not_exists and copy_if_not_exists? We can fulfill the rename operation with existing object_store APIs; see how we do that in commit.rs for the atomic manifest commit on local storage.
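To illustrate the semantics under discussion (publish a file only if the destination does not exist yet), here is a minimal standard-library sketch for a local filesystem. It is not the code from commit.rs or from the object_store crate; `publish_if_not_exists` and the file names are hypothetical. It relies on `std::fs::hard_link` failing with `AlreadyExists` when the destination is already taken.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical helper: write `contents` to `tmp`, then link it to `dest`.
/// `hard_link` refuses to overwrite, so only one of two racing writers
/// can publish the same WAL entry; the loser gets `AlreadyExists`.
fn publish_if_not_exists(tmp: &Path, dest: &Path, contents: &[u8]) -> io::Result<()> {
    let mut file = fs::File::create(tmp)?;
    file.write_all(contents)?;
    file.sync_all()?;
    let linked = fs::hard_link(tmp, dest);
    // Remove the temp name either way; on success the data lives on under `dest`.
    let _ = fs::remove_file(tmp);
    linked
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join(format!("memwal_sketch_{}", std::process::id()));
    fs::create_dir_all(&dir)?;
    let dest = dir.join("1.wal");
    let _ = fs::remove_file(&dest);

    publish_if_not_exists(&dir.join("a.tmp"), &dest, b"first")?;
    let second = publish_if_not_exists(&dir.join("b.tmp"), &dest, b"second");
    assert_eq!(second.unwrap_err().kind(), io::ErrorKind::AlreadyExists);
    assert_eq!(fs::read(&dest)?, b"first");
    fs::remove_dir_all(&dir)
}
```

The second writer sees an error instead of silently replacing the first writer's entry, which is the property the doc comment on `rename_if_not_exists` describes.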
if pk_fields.is_empty() {
    return Err(Error::invalid_input(
        "MemWAL requires a primary key on the dataset. \
         Define a primary key using the 'lance-schema:unenforced-primary-key' field metadata.",
nit: Arrow field metadata
#[derive(Debug, Clone, Default)]
pub struct MemWalConfig {
    /// Region specification for partitioning writes.
    pub region_specs: Vec<RegionSpec>,
this should just take a single RegionSpec to begin with, and it should be optional (you can create a MemWAL index without region spec).
We can in the future add APIs like add_region_spec (we can add a TODO for those, don't need to add now)
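A minimal sketch of the suggested shape, assuming the field is renamed to `region_spec` as the later commit summary states. The `RegionSpec` body here is a hypothetical stand-in, since its real fields are not shown in this review.

```rust
/// Hypothetical stand-in for the PR's `RegionSpec`; the real fields are not shown here.
#[derive(Debug, Clone, PartialEq)]
pub struct RegionSpec {
    pub spec_id: u32,
}

#[derive(Debug, Clone, Default)]
pub struct MemWalConfig {
    /// Optional region specification for partitioning writes.
    /// `None` means the MemWAL index is created without a region spec.
    pub region_spec: Option<RegionSpec>,
    // TODO: APIs such as `add_region_spec` could be added later.
}

fn main() {
    // Default config: no region spec at all.
    let plain = MemWalConfig::default();
    assert!(plain.region_spec.is_none());

    // Config with a single region spec.
    let partitioned = MemWalConfig {
        region_spec: Some(RegionSpec { spec_id: 1 }),
    };
    assert_eq!(partitioned.region_spec.unwrap().spec_id, 1);
}
```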
///
/// This opens the vector index and extracts the IVF model and product
/// quantizer needed for in-memory index maintenance.
async fn load_vector_index_config(
can these just be functions, not methods within Dataset?
/// When false:
/// - Index updates are deferred
/// - New data may not appear in index-accelerated queries immediately
pub indexed_writes: bool,
This name is confusing, because it can be read as indexed vs. not indexed. What about sync_indexed_write?
})?;

// Best-effort update version hint
self.write_version_hint(version).await;
this should log a warning if failed
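One way to keep the hint update best-effort while still surfacing failures is sketched below. The fallible `write_version_hint` here is a synchronous, hypothetical stand-in for the PR's async method (its `fail` parameter only exists to simulate an error), and `eprintln!` stands in for whatever logging macro the project uses.

```rust
use std::io;

/// Hypothetical stand-in for the hint write; `fail` simulates an I/O error.
fn write_version_hint(version: u64, fail: bool) -> io::Result<()> {
    if fail {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            format!("cannot write hint for v{version}"),
        ));
    }
    Ok(())
}

/// Best-effort update: the error is never propagated, but it is reported.
/// Returns whether the hint was written so callers and tests can observe it.
fn update_version_hint_best_effort(version: u64, fail: bool) -> bool {
    match write_version_hint(version, fail) {
        Ok(()) => true,
        Err(err) => {
            // A real implementation would likely use a logging macro here.
            eprintln!("warning: failed to update version hint to {version}: {err}");
            false
        }
    }
}

fn main() {
    assert!(update_version_hint_best_effort(5, false));
    // The failure is logged, and the commit path carries on.
    assert!(!update_version_hint_best_effort(6, true));
}
```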
    ..Default::default()
};

self.object_store
this should also differentiate between local and cloud, since local will use a rename to ensure atomicity?
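The local vs. cloud distinction could be expressed as a small strategy selection, sketched here. The enum, the function name, and the rule "no scheme or `file://` means local" are all assumptions made for illustration; the PR may decide this differently.

```rust
#[derive(Debug, PartialEq)]
enum ManifestWriteStrategy {
    /// Local storage: write to a temporary path, then rename into place.
    RenameIntoPlace,
    /// Cloud object storage: one conditional PUT that fails if the object exists.
    PutIfNotExists,
}

/// Hypothetical selector based on the URI scheme.
fn strategy_for(uri: &str) -> ManifestWriteStrategy {
    match uri.split_once("://") {
        // Assumption: a bare path or a `file://` URI is a local filesystem.
        None | Some(("file", _)) => ManifestWriteStrategy::RenameIntoPlace,
        Some(_) => ManifestWriteStrategy::PutIfNotExists,
    }
}

fn main() {
    assert_eq!(strategy_for("/data/ds.lance"), ManifestWriteStrategy::RenameIntoPlace);
    assert_eq!(strategy_for("file:///data/ds.lance"), ManifestWriteStrategy::RenameIntoPlace);
    assert_eq!(strategy_for("s3://bucket/ds.lance"), ManifestWriteStrategy::PutIfNotExists);
}
```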
}

// Parallel scan forward with batches of HEAD requests
let batch_size = 8;
this should be a config in region writer config
batch size can just be 2 as default
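The forward scan with a configurable batch size can be sketched as follows. This is a simplified, synchronous illustration: `exists` stands in for a HEAD request, the probes in each batch would be issued concurrently in a real writer, and the sketch assumes versions are contiguous and that the starting hint itself exists. The function name is hypothetical.

```rust
/// Find the latest existing version at or after `hint`, probing
/// `batch_size` candidate versions per round.
fn latest_version(hint: u64, batch_size: u64, exists: impl Fn(u64) -> bool) -> u64 {
    assert!(batch_size > 0, "batch size must be positive");
    let mut latest = hint;
    loop {
        // One round of probes; a real writer would run these in parallel.
        let hits: Vec<bool> = (1..=batch_size).map(|i| exists(latest + i)).collect();
        // Versions are assumed contiguous, so stop counting at the first miss.
        let found = hits.iter().take_while(|hit| **hit).count() as u64;
        latest += found;
        if found < batch_size {
            return latest;
        }
    }
}

fn main() {
    // Pretend versions 1 through 7 exist in the object store.
    let exists = |v: u64| v <= 7;
    assert_eq!(latest_version(3, 2, exists), 7);
    assert_eq!(latest_version(3, 8, exists), 7);
    assert_eq!(latest_version(7, 2, exists), 7);
}
```

A smaller batch costs fewer wasted probes when the hint is fresh, while a larger batch catches up in fewer rounds when the hint is stale, which is why exposing it as a writer config is reasonable.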
- Remove unnecessary rename_if_not_exists/copy_if_not_exists wrappers
- Change MemWalConfig.region_specs to optional single region_spec
- Move load_vector_index_config to standalone function
- Rename indexed_writes to sync_indexed_write for clarity
- Add warning log comment for manifest update failures
- Differentiate local vs cloud manifest writes (rename vs PUT-IF-NOT-EXISTS)
- Make manifest_scan_batch_size configurable with default 2

Co-Authored-By: Jack Ye <yezhaoqin@gmail.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
All review comments have been addressed in commit cda38c0:
Replace Vec<MemTableFragment> with an actual Lance in-memory Dataset for queryable storage. Each write now:
1. Writes RecordBatch to in-memory Dataset (creates new version)
2. Appends to WAL buffer for durability
3. Updates in-memory indexes

Key changes:
- memtable.rs: Use Dataset instead of Vec<MemTableFragment>
  - insert() is now async and writes to memory:// Dataset
  - Added scan_batches() method to read from Dataset
  - Simplified WAL flush tracking with HashSet<usize>
- writer.rs: Update put() to handle async insert
  - Clone batch for WAL before async insert
  - Remove separate indexes field from WriterState
- flush.rs: Read from Dataset via scan_batches()
- write.rs: Remove MemTableFragment export

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Eliminate batch cloning by using the in-memory Dataset as single source of truth. The WAL buffer now tracks fragment IDs only (no batch data), and scans fragments from the Dataset during flush.

Changes:
- WAL buffer: Remove batch storage, track fragment IDs as cursor
- MemTable: Add scan_fragments_by_ids() for scanning specific fragments
- Writer: Remove batch cloning, pass MemTable reference to WAL flush

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
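The "fragment IDs as cursor" idea can be illustrated with a toy model. This is a sketch under simplifying assumptions, not the PR's implementation: rows are plain strings instead of Arrow batches, the `MemTable` is a `BTreeMap` rather than a Lance Dataset, and every type and method body here is hypothetical apart from the `scan_fragments_by_ids` name taken from the commit message.

```rust
use std::collections::BTreeMap;

/// Toy in-memory table: fragment ID -> rows. It owns the only copy of the data.
#[derive(Default)]
struct MemTable {
    fragments: BTreeMap<u64, Vec<String>>,
    next_id: u64,
}

impl MemTable {
    /// Store a batch and return the fragment ID it was assigned.
    fn insert(&mut self, rows: Vec<String>) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.fragments.insert(id, rows);
        id
    }

    /// Scan only the requested fragments, in the order given.
    fn scan_fragments_by_ids<'a>(
        &'a self,
        ids: &'a [u64],
    ) -> impl Iterator<Item = &'a Vec<String>> + 'a {
        ids.iter().filter_map(move |id| self.fragments.get(id))
    }
}

/// WAL buffer that holds fragment IDs only, never batch data.
#[derive(Default)]
struct WalBuffer {
    pending: Vec<u64>,
}

impl WalBuffer {
    fn track(&mut self, fragment_id: u64) {
        self.pending.push(fragment_id);
    }

    /// Drain the pending IDs and read their rows from the MemTable at flush time.
    fn flush(&mut self, memtable: &MemTable) -> Vec<String> {
        let ids = std::mem::take(&mut self.pending);
        let rows: Vec<String> = memtable
            .scan_fragments_by_ids(&ids)
            .flatten()
            .cloned()
            .collect();
        rows
    }
}

fn main() {
    let mut memtable = MemTable::default();
    let mut wal = WalBuffer::default();

    let first = memtable.insert(vec!["a".to_string(), "b".to_string()]);
    wal.track(first);
    let second = memtable.insert(vec!["c".to_string()]);
    wal.track(second);

    assert_eq!(wal.flush(&memtable), vec!["a", "b", "c"]);
    // Nothing is pending after a flush.
    assert!(wal.flush(&memtable).is_empty());
}
```

The write path never copies a batch into the WAL buffer; data is materialized once, at flush time, from the single source of truth.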
Replace batch collection with direct streaming from the in-memory Dataset scanner. Instead of collecting all batches into a Vec and then creating a RecordBatchIterator, we now:
1. Get DatasetRecordBatchStream from dataset.scan().try_into_stream()
2. Convert to SendableRecordBatchStream (implements StreamingWriteSource)
3. Pass directly to InsertBuilder.execute_stream()

This eliminates the intermediate Vec<RecordBatch> allocation and reduces memory pressure during flush operations.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- WAL entries are now stored as Lance datasets instead of Arrow IPC streams
- Schema: LargeBinary (file_bytes) + Binary (fragment_bytes) with writer_epoch metadata
- Enables direct file byte copying from in-memory store (no re-encoding)
- Simplified flush logic by removing temp file + rename approach
- Added WalEntryData::read() for WAL replay

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add FragmentStore for O(1) append operations (avoids manifest growth)
- Replace BTreeMemIndex with SkipListMemIndex using crossbeam-skiplist
- Add TTL-based Dataset caching in MemTable for eventual consistency
- Store original RecordBatches for efficient Dataset reconstruction

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
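As a rough illustration of TTL-based caching with eventual consistency, the sketch below rebuilds a cached value only once it is older than a time-to-live, so readers may briefly see a stale snapshot. `TtlCache` and its methods are hypothetical names; the PR's actual cache around the Dataset is not shown in this conversation.

```rust
use std::time::{Duration, Instant};

/// Hypothetical cache holding one value plus the time it was built.
struct TtlCache<T> {
    ttl: Duration,
    entry: Option<(Instant, T)>,
    rebuilds: usize,
}

impl<T: Clone> TtlCache<T> {
    fn new(ttl: Duration) -> Self {
        Self { ttl, entry: None, rebuilds: 0 }
    }

    /// Return the cached value while it is younger than `ttl`;
    /// otherwise call `rebuild` and cache the fresh result.
    fn get_or_rebuild(&mut self, now: Instant, rebuild: impl FnOnce() -> T) -> T {
        if let Some((built_at, value)) = &self.entry {
            if now.duration_since(*built_at) < self.ttl {
                return value.clone();
            }
        }
        let value = rebuild();
        self.entry = Some((now, value.clone()));
        self.rebuilds += 1;
        value
    }
}

fn main() {
    let start = Instant::now();
    let mut cache = TtlCache::new(Duration::from_millis(50));

    assert_eq!(cache.get_or_rebuild(start, || 1), 1);
    // Within the TTL the stale snapshot is served and `rebuild` is not called.
    assert_eq!(cache.get_or_rebuild(start + Duration::from_millis(10), || 2), 1);
    // After the TTL expires the value is rebuilt.
    assert_eq!(cache.get_or_rebuild(start + Duration::from_millis(60), || 3), 3);
    assert_eq!(cache.rebuilds, 2);
}
```

Passing `now` explicitly keeps the sketch deterministic; a real cache would typically read the clock itself.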
Based on the draft shared by @jackye1995, cleaned up and published for review. I also added a custom fix: RegionWriter uses Arc<EpochGuard> instead of EpochGuard to avoid unnecessarily incrementing the epoch.

Some of the code belongs to the reader but is hard to separate out in this PR; I marked it as dead code for now.
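The reasoning behind sharing the guard through an `Arc` can be sketched like this. The `EpochGuard` below is a hypothetical model in which every claim bumps a shared counter; the real guard's behavior is not shown in this conversation, so treat this only as an illustration of why sharing one claimed guard avoids extra increments.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical guard: claiming one increments the shared epoch counter.
struct EpochGuard {
    epoch: u64,
}

impl EpochGuard {
    fn claim(counter: &AtomicU64) -> Self {
        Self { epoch: counter.fetch_add(1, Ordering::SeqCst) + 1 }
    }
}

/// The writer holds a shared handle instead of claiming its own guard.
struct RegionWriter {
    guard: Arc<EpochGuard>,
}

impl RegionWriter {
    fn epoch(&self) -> u64 {
        self.guard.epoch
    }
}

fn main() {
    let counter = AtomicU64::new(0);

    // Claim once, then share the same guard with every component that needs it.
    let guard = Arc::new(EpochGuard::claim(&counter));
    let writer_a = RegionWriter { guard: Arc::clone(&guard) };
    let writer_b = RegionWriter { guard: Arc::clone(&guard) };

    assert_eq!(writer_a.epoch(), 1);
    assert_eq!(writer_b.epoch(), 1);
    // Cloning the Arc did not touch the counter.
    assert_eq!(counter.load(Ordering::SeqCst), 1);

    // Claiming a second guard, by contrast, advances the epoch.
    let other = EpochGuard::claim(&counter);
    assert_eq!(other.epoch, 2);
}
```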