Statistics-augmented Catalog Service API and DataFusion connector #16
base: main
Conversation
v0 of the Cascades-style optimizer.

- Exhaustive optimization: an expression and its group return only when the subgraph is optimized.
- Apply enforcer rules and add generated expressions to the memo table.
- Special termination logic is required when the child has the same group and physical requirement as the parent.
- Exhaustive exploration when applying rules: generate all bindings before doing the transform, but only expand based on the specified rule patterns.

Signed-off-by: Yuchen Liang <yuchenl3@andrew.cmu.edu>
## Problem

Some methods were added to the IR as experimental features. We also got feedback from the dev meeting that the rule seems hard to read (or long). We would like to clean up these rough edges.

## Summary of changes

- Eliminate `try_bind_ref_xxx` and use `try_borrow`.
- Add `borrow_raw_parts` so we always refer to `$node_name` instead of `$ref_name`.
- Plumb through property methods to use shorthand.

**_TODO:_** Pattern builder can also be generated by macros.

Signed-off-by: Yuchen Liang <yuchenl3@andrew.cmu.edu>
… catalog_provider
.DS_Store
Outdated
Let's remove this and add `.DS_Store` to `.gitignore`.
Hmm, I've already added that to `.gitignore`, not sure why it sneaked through. Will fix this later today.
connectors/datafusion/src/catalog.rs
Outdated
```rust
#[derive(Debug)]
pub struct OptdSchemaProvider {
    inner: Arc<dyn SchemaProvider>,
    catalog_handle: Option<CatalogServiceHandle>,
```
Why is this an `Option`?
Oops, I think this is an artifact left over from when I was trying to add caching to the catalog service and forgot to remove from `SchemaProvider` or `TableProvider`. Let me fix this.
Pull request overview
This PR introduces a statistics-augmented catalog service API backed by DuckLake and provides DataFusion integration to complete the query optimization pipeline. The implementation adds a thread-safe catalog service using async message passing, wrapper providers for DataFusion integration, and CLI support with optional persistent metadata.
Key Changes:
- Implements `CatalogService` with async request handling via `mpsc` channels, supporting snapshot-based time travel for statistics
- Provides DataFusion connector wrappers (`OptdCatalogProviderList`, `OptdCatalogProvider`, `OptdSchemaProvider`, `OptdTableProvider`) to integrate the catalog service into DataFusion's planning
- Adds CLI integration with optional DuckLake metadata persistence via the `OPTD_METADATA_CATALOG_PATH` environment variable
Reviewed changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated 7 comments.
Summary per file:
| File | Description |
|---|---|
| optd/storage/src/lib.rs | Removed unused DuckLake storage module scaffolding |
| optd/storage/Cargo.toml | Removed unused storage crate |
| optd/catalog/src/lib.rs | Core catalog implementation with DuckDB backend, snapshot management, and statistics versioning |
| optd/catalog/src/service.rs | Async catalog service using tokio channels for thread-safe access |
| optd/catalog/tests/statistics_tests.rs | Comprehensive unit tests for catalog functionality, including edge cases and time-travel |
| optd/catalog/tests/service_tests.rs | Service-level tests covering concurrency, shutdown, and handle cloning |
| optd/catalog/Cargo.toml | Dependencies for DuckDB, tokio, serde, and testing |
| connectors/datafusion/src/catalog.rs | Wrapper providers for DataFusion catalog integration |
| connectors/datafusion/src/table.rs | Table provider wrapper with statistics support |
| connectors/datafusion/src/lib.rs | Public API exports for the connector |
| connectors/datafusion/tests/integration_test.rs | Integration tests for the DataFusion connector with the catalog service |
| connectors/datafusion/Cargo.toml | Dependencies including optd-catalog and async-trait |
| cli/src/main.rs | CLI integration with optional catalog service initialization |
| cli/src/lib.rs | Session context enhancements for external table creation |
| cli/tests/catalog_service_integration.rs | End-to-end integration tests for the CLI with the catalog service |
| cli/smoke_test_cli.sh | Shell script for manual CLI smoke testing |
| cli/Cargo.toml | CLI dependencies including optd-catalog |
| Cargo.toml | Workspace member cleanup removing optd-storage |
```rust
#[allow(dead_code)]
pub struct OptdTable {
    inner: Box<ListingTable>,
    name: String,
    table_reference: TableReference,
}

impl OptdTable {
    pub fn try_new(
        inner: ListingTable,
        name: String,
        table_reference: TableReference,
    ) -> Result<Self> {
        Ok(Self {
            inner: Box::new(inner),
            name,
            table_reference,
        })
    }

    pub fn new_with_inner(
        inner: Box<ListingTable>,
        name: String,
        table_reference: TableReference,
    ) -> Self {
        Self {
            inner,
            name,
            table_reference,
        }
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn table_reference(&self) -> &TableReference {
        &self.table_reference
    }
}
```
Copilot AI (Dec 13, 2025)
Setting `#[allow(dead_code)]` on the entire `OptdTable` struct and its methods suppresses warnings that could indicate unused code. If this is intended for future use, consider documenting why it exists, or remove it if it is not needed in the current implementation.
```rust
fn supports_filters_pushdown(
    &self,
    filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok(vec![
        TableProviderFilterPushDown::Unsupported;
        filters.len()
    ])
}
```
Copilot AI (Dec 13, 2025)
The `supports_filters_pushdown` implementation returns `Unsupported` for all filters. This means filter pushdown optimizations are disabled. If filter pushdown is not yet implemented, add a TODO comment explaining the plan to enable it in the future.
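For example, a sketch of the suggested comment on the existing code (the TODO wording is illustrative, not part of the PR):

```rust
fn supports_filters_pushdown(
    &self,
    filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
    // TODO: enable pushdown (e.g. by delegating to the inner ListingTable,
    // or returning Inexact/Exact per filter) once the optimizer pipeline
    // can consume pushed-down filters.
    Ok(vec![
        TableProviderFilterPushDown::Unsupported;
        filters.len()
    ])
}
```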
```bash
#!/usr/bin/env bash
# CLI smoke test - verifies catalog integration is active

set -e  # Exit on error
```
Copilot AI (Dec 13, 2025)
The shell script uses `set -e` but does not use `set -u` (fail on undefined variables) or `set -o pipefail` (fail if any command in a pipeline fails). This could mask errors in test execution. Consider adding these flags for more robust error detection.
```diff
-set -e  # Exit on error
+set -euo pipefail  # Exit on error, undefined variable, or failed pipeline
```
```rust
/// Max pending requests
const CHANNEL_BUFFER_SIZE: usize = 1000;
```
Copilot AI (Dec 13, 2025)
The channel buffer size is hardcoded to 1000. As noted in the PR description, this might need to be configurable. Consider making `CHANNEL_BUFFER_SIZE` a parameter in the constructor to allow tuning based on expected load, or document why 1000 is the appropriate fixed value.
```diff
 /// Max pending requests
 const CHANNEL_BUFFER_SIZE: usize = 1000;
+// The channel buffer size should be passed as a parameter to the constructor
+// or initialization function to allow tuning based on expected load.
```
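A minimal sketch of that suggestion, assuming the service is built around a `tokio::sync::mpsc` channel; the request type and function name here are illustrative, not the PR's actual API:

```rust
use tokio::sync::mpsc;

// Stand-in for the service's real request enum.
enum CatalogRequest {}

const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1000;

/// Build the request channel with a caller-supplied buffer size,
/// falling back to the previous fixed default.
fn request_channel(
    buffer_size: Option<usize>,
) -> (mpsc::Sender<CatalogRequest>, mpsc::Receiver<CatalogRequest>) {
    mpsc::channel(buffer_size.unwrap_or(DEFAULT_CHANNEL_BUFFER_SIZE))
}
```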
```rust
async fn create_external_table(&self, cmd: &CreateExternalTable) -> Result<DataFrame> {
    let exist = self.inner.table_exist(cmd.name.clone())?;

    if cmd.temporary {
        return not_impl_err!("Temporary tables not supported");
    }

    if exist {
        match cmd.if_not_exists {
            true => return self.return_empty_dataframe(),
            false => {
                return exec_err!("Table '{}' already exists", cmd.name);
            }
        }
    }

    let table_provider: Arc<dyn TableProvider> = self.create_custom_table(cmd).await?;
    self.register_table(cmd.name.clone(), table_provider)?;

    self.return_empty_dataframe()
}
```
Copilot AI (Dec 13, 2025)
Missing documentation for the `CreateExternalTable` handling. The new `create_external_table` method adds significant functionality but lacks documentation explaining its purpose, parameters, and behavior. Add docstring comments to clarify the intent and usage.
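For instance, a doc comment along these lines (the wording is a suggestion derived from the behavior shown above, not the PR's text) would cover the main cases:

```rust
/// Handles a `CREATE EXTERNAL TABLE` command.
///
/// Registers a new table provider for `cmd.name` and returns an empty
/// `DataFrame` on success. If the table already exists, this is a no-op
/// when `IF NOT EXISTS` was specified and an error otherwise. Temporary
/// tables are not supported and return a "not implemented" error.
async fn create_external_table(&self, cmd: &CreateExternalTable) -> Result<DataFrame> {
```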
```rust
        source: duckdb::Error::ExecuteReturnedResults,
    })?;

rx.await.map_err(|_| Error::QueryExecution {
    source: duckdb::Error::ExecuteReturnedResults,
})?
```
Copilot AI (Dec 13, 2025)
The error handling here uses `ExecuteReturnedResults` as a catch-all error for channel communication failures. This is misleading because `ExecuteReturnedResults` is a DuckDB error meant for SQL execution, not channel communication errors. Consider creating a more specific error variant for channel/communication errors.
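A sketch of such a variant, assuming the crate's `Error` enum derives `thiserror::Error` (the variant name is illustrative):

```rust
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// A real SQL execution failure reported by DuckDB.
    #[error("query execution failed: {source}")]
    QueryExecution { source: duckdb::Error },
    /// The service task stopped or dropped the reply channel.
    #[error("catalog service unavailable: request or reply channel closed")]
    ServiceUnavailable,
}
```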
```rust
    /// Get a reference to the underlying DuckLakeCatalog for test setup only.
    /// Only available in test/debug builds and should
    /// only be used for setting up test fixtures.
    #[cfg(any(test, debug_assertions))]
    pub fn catalog_for_setup(&self) -> &DuckLakeCatalog {
        &self.backend
    }
}
```
Copilot AI (Dec 13, 2025)
The method `catalog_for_setup` is only available in debug/test builds, but it returns a reference to the backend, which could enable modifications outside the intended async service pattern. Even though this is test-only, consider returning a read-only interface or documenting that this should only be used before spawning the service.
Statistics-augmented Catalog Service API and DataFusion connector
Augment the Statistics module with a Service wrapper to make it ready for integration and benchmarking. Implement connectors to DataFusion to complete the pipeline. Connect to the DataFusion CLI for ease of use.
Architecture
Optd Catalog (`optd/catalog/`)

The main catalog service module. This package provides a runnable service that acts as a thread-safe, metadata-augmented catalog backed by DuckLake. The code architecture:

- `CatalogService`: Catalog service running in the background. Uses `mpsc` channels to provide serialized access to the encapsulated DuckLake catalog and to process incoming requests (see the sketch below);
- `CatalogServiceHandle`: Handle for sending requests to `CatalogService`; instantiated together with the backend service when creating a new `CatalogService`. The handle can be cloned for multi-threaded use. It currently allows updates to and retrievals from the service for the following information:
  - the current snapshot (`current_snapshot`, `current_snapshot_info`);
  - the current schema (`current_schema`, `current_schema_info`);
  - table statistics (`table_statistics`, `update_table_column_stats`).
- `DuckLakeCatalog`: Wrapper of the DuckLake catalog connection; implements the `Catalog` trait that determines the exposed functionality.

Statistics are versioned: each `update_table_column_stats` call creates a new snapshot with begin-to-end ranges, while supporting time travel and advanced statistics stored as JSON.
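A minimal, self-contained sketch of the request/reply pattern described above (all names and the snapshot payload are illustrative, not the crate's actual types):

```rust
use tokio::sync::{mpsc, oneshot};

// Illustrative request type; the real service has more variants
// (schema info, table statistics, ...).
enum Request {
    CurrentSnapshot { reply: oneshot::Sender<u64> },
}

// The service owns the catalog and processes requests one at a time,
// which is what serializes access without locks.
async fn run_service(mut rx: mpsc::Receiver<Request>) {
    while let Some(req) = rx.recv().await {
        match req {
            Request::CurrentSnapshot { reply } => {
                let _ = reply.send(42); // would query the DuckLake catalog here
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1000);
    tokio::spawn(run_service(rx));

    // A handle is essentially a clonable sender plus a oneshot per reply.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Request::CurrentSnapshot { reply: reply_tx })
        .await
        .expect("service task is running");
    println!("current snapshot: {}", reply_rx.await.unwrap());
}
```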
DataFusion Connector (`connectors/datafusion/src/catalog.rs`, `connectors/datafusion/src/table.rs`)

Integration of the Optd catalog into DataFusion's planning procedure (a wiring sketch follows the list):

- `OptdCatalogProviderList`: Wrapper of DataFusion's catalog list; can take in a `CatalogServiceHandle` to communicate with its corresponding service and pass it down to its subordinate `OptdCatalogProvider`s;
- `OptdCatalogProvider`: Wrapper of DataFusion's catalog; can take in a `CatalogServiceHandle`;
- `OptdSchemaProvider` and `OptdTableProvider`: Simple wrapper structs for schemas and tables.
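A hypothetical wiring sketch; `SessionContext::catalog` and `register_catalog` are standard DataFusion APIs, while the `OptdCatalogProvider::new` signature is an assumption about this PR's constructor:

```rust
use std::sync::Arc;
use datafusion::prelude::SessionContext;

// Wrap DataFusion's default catalog so lookups flow through the
// catalog service; the constructor shape is illustrative.
fn install_optd_catalog(ctx: &SessionContext, handle: CatalogServiceHandle) {
    let default = ctx.catalog("datafusion").expect("default catalog exists");
    let wrapped = Arc::new(OptdCatalogProvider::new(default, handle));
    ctx.register_catalog("datafusion", wrapped);
}
```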
CLI (`cli/src/main.rs`)

Allows default usage of the DataFusion Connector to interact with a background Optd Catalog Service process:

- runs the catalog service on a background task (`tokio`);
- stores DuckLake metadata at `OPTD_METADATA_CATALOG_PATH` or a default location (sketched below).
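A sketch of the path selection described above; only the environment variable name comes from the PR, and the default filename is illustrative:

```rust
use std::env;
use std::path::PathBuf;

// Resolve where the DuckLake metadata database lives.
fn metadata_catalog_path() -> PathBuf {
    env::var("OPTD_METADATA_CATALOG_PATH")
        .map(PathBuf::from)
        .unwrap_or_else(|_| PathBuf::from("optd_metadata.ducklake"))
}
```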
Testing

- Optd Catalog: unit tests for statistics (edge cases, time travel) and service-level tests covering concurrency, shutdown, and handle cloning.
- DataFusion Connector: integration tests exercising the connector together with the catalog service.
- CLI: one end-to-end test against `DuckLakeCatalog`, another for full integration among the Catalog service, the DataFusion Connector, and the CLI. A smoke test script (`cli/smoke_test_cli.sh`) for checking CLI usage in the terminal (also provides intuition for how to use the catalog service).

Next Steps
There are two branching options for next steps:
Note 1: Currently the catalog service can buffer 1000 requests with `mpsc` (fixed). Do we make it a parameter that can be passed into the constructor of `CatalogService`?

Note 2: Should we make `DuckLakeCatalog` public only within the crate?
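If the answer to Note 2 is yes, the change is a one-line visibility tweak (sketch; fields elided):

```rust
// Visible to the rest of optd-catalog but not to downstream crates, which
// would then have to go through CatalogService / CatalogServiceHandle.
pub(crate) struct DuckLakeCatalog {
    // ...fields unchanged
}
```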