Changes from all commits (47 commits)
36cfae2  first commit (rayhhome, Aug 5, 2025)
7f74b6d  core: cascades v0 (#3) (yliang412, Aug 7, 2025)
8716478  setup CLI and connectors skeleton (#4) (yliang412, Aug 8, 2025)
deeac21  setup ci (#5) (yliang412, Aug 8, 2025)
746468c  core: clean up methods on the IR (#6) (yliang412, Aug 11, 2025)
ef1a181  cargo.lock update (rayhhome, Aug 12, 2025)
e1bf28c  lib.rs intercept create external table (rayhhome, Aug 14, 2025)
fa8e974  Merge branch 'main' into catalog_provider (rayhhome, Aug 15, 2025)
762fae7  optclicontext create external table impl attempt (rayhhome, Aug 20, 2025)
f92efb1  Merge remote-tracking branch 'origin' into catalog_provider (rayhhome, Sep 2, 2025)
40274ab  Tentative statistics retriver structs and fetch_table_statistics impl… (rayhhome, Oct 18, 2025)
f2a17dd  Merge branch 'main' of https://github.com/yliang412/optd into catalog… (rayhhome, Oct 18, 2025)
bad8c8f  fetch stats, update stats, and preliminary tests impl (rayhhome, Oct 19, 2025)
af832ba  update dependencies (rayhhome, Oct 19, 2025)
a610854  add some notes to refactoring (yliang412, Oct 20, 2025)
c114adf  fix non unique table name query, use fromIterator, and remove connect… (rayhhome, Oct 22, 2025)
b0e5329  make tests use tempdir (rayhhome, Oct 22, 2025)
a85ad15  attempted update_stats impl (rayhhome, Oct 24, 2025)
ff7ce20  add new snapshot logic (rayhhome, Oct 24, 2025)
1952737  more rubust tests for stats (rayhhome, Oct 28, 2025)
86dcfa1  move tests to directory and add basic stats update (rayhhome, Oct 28, 2025)
c2f31c2  Upgrade fetch and update functions to use adv stats (rayhhome, Oct 30, 2025)
bf988e3  get schema implementation update (rayhhome, Oct 30, 2025)
23192a5  documentation & format (rayhhome, Oct 30, 2025)
dd3f9eb  refactor (yliang412, Oct 31, 2025)
54731f6  fix clippy (yliang412, Oct 31, 2025)
1753080  refactor methods as associated (rayhhome, Nov 4, 2025)
7bf7e27  cleanup (rayhhome, Nov 5, 2025)
d25b78e  modularity refactor + general cleanup (rayhhome, Nov 5, 2025)
83e4cb4  move to connectors (rayhhome, Nov 11, 2025)
c658be8  Merge branch 'statistics_api' of https://github.com/yliang412/optd in… (rayhhome, Nov 11, 2025)
e6e9253  service implementation and test suite (rayhhome, Nov 18, 2025)
ba0bfa0  rename error (rayhhome, Nov 18, 2025)
740e1f3  update lock and remove superfluous main (rayhhome, Nov 18, 2025)
4b579be  catalog implementation and test suite + datafusion version bump (rayhhome, Nov 19, 2025)
85205c4  cargo fmt (rayhhome, Nov 19, 2025)
4bebc8f  bump version of datafusion & cli to 51.0 (rayhhome, Nov 19, 2025)
2103a8a  Merge branch 'version-bump' of https://github.com/yliang412/optd into… (rayhhome, Nov 19, 2025)
4dba0d4  add datafusion features & update gitignore (rayhhome, Nov 19, 2025)
b5709e9  Merge branch 'version-bump' of https://github.com/yliang412/optd into… (rayhhome, Nov 19, 2025)
03ebbbb  refactoring inconsistent design and remove ds_store (rayhhome, Nov 21, 2025)
11e8240  update cli to use catalog service (rayhhome, Nov 24, 2025)
a487782  add cli smoke and integration tests (rayhhome, Nov 25, 2025)
5727ced  remove lib extraneous comment (rayhhome, Nov 25, 2025)
664e55a  Change path naming (rayhhome, Nov 25, 2025)
851be1f  Add more comprehensive Datafusion integration tests (rayhhome, Nov 25, 2025)
f6aa2c7  Merge branch 'main' of https://github.com/yliang412/optd into catalog… (rayhhome, Dec 13, 2025)
1,995 changes: 1,176 additions & 819 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 1 addition & 10 deletions Cargo.toml
@@ -1,20 +1,11 @@
[workspace]
resolver = "2"
members = [
"cli",
"connectors/datafusion",
"optd/catalog",
"optd/core",
"optd/storage",
]
members = ["cli", "connectors/datafusion", "optd/core", "optd/catalog"]

# By default, only compiles the `optd-core` crate.
default-members = ["optd/core"]

[workspace.dependencies]

optd-datafusion = { path = "connectors/datafusion" }

tokio = { version = "1.47", features = ["macros", "rt", "sync"] }
tracing = "0.1"

6 changes: 6 additions & 0 deletions cli/Cargo.toml
@@ -23,3 +23,9 @@ object_store = "0.12.3"
url = "2.5.4"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
tracing = { workspace = true }

futures = "0.3.31"
optd-catalog = { path = "../optd/catalog", version = "0.1" }

[dev-dependencies]
tempfile = "3"
57 changes: 57 additions & 0 deletions cli/smoke_test_cli.sh
@@ -0,0 +1,57 @@
#!/usr/bin/env bash
# CLI smoke test - verifies catalog integration is active

set -e # Exit on error
Copilot AI Dec 13, 2025

The shell script uses set -e but does not use set -u (fail on undefined variables) or set -o pipefail (fail if any command in a pipeline fails). This could mask errors in test execution. Consider adding these flags for more robust error detection.

Suggested change
set -e # Exit on error
set -euo pipefail # Exit on error, undefined variable, or failed pipeline


GREEN='\033[0;32m'
RED='\033[0;31m'
RESET='\033[0m'

echo "=== CLI Smoke Test ==="

# Build
echo "Building..."
cargo build --package optd-cli --quiet
if [ ! -f ./target/debug/optd-cli ]; then
echo -e "${RED}✗ Build failed${RESET}"
exit 1
fi

CLI=./target/debug/optd-cli

# Test 1: Basic functionality
echo "Test 1: Basic query execution"
output=$($CLI -c "SELECT 1 as test;" 2>&1)
if [ $? -eq 0 ] && echo "$output" | grep -q "OptD catalog"; then
echo -e "${GREEN}✓ PASS${RESET} - CLI runs, catalog integration active"
else
echo -e "${RED}✗ FAIL${RESET}"
exit 1
fi

# Test 2: Session persistence (multiple commands)
echo "Test 2: Session state persistence"
output=$($CLI -c "CREATE TABLE t (x INT);" -c "INSERT INTO t VALUES (1);" -c "SELECT * FROM t;" 2>&1)
if [ $? -eq 0 ] && echo "$output" | grep -q "1 row"; then
echo -e "${GREEN}✓ PASS${RESET} - Multiple commands work, session persists"
else
echo -e "${RED}✗ FAIL${RESET}"
exit 1
fi

# Test 3: Metadata path configuration
echo "Test 3: Metadata path environment variable"
TMPDIR_PATH=$(mktemp -d)
export OPTD_METADATA_CATALOG_PATH="$TMPDIR_PATH/test.ducklake"
output=$($CLI -c "SELECT 1;" 2>&1)
unset OPTD_METADATA_CATALOG_PATH
rm -rf "$TMPDIR_PATH"
if echo "$output" | grep -q "Using OptD catalog with metadata path"; then
echo -e "${GREEN}✓ PASS${RESET} - Metadata path recognized"
else
echo -e "${RED}✗ FAIL${RESET}"
exit 1
fi

echo ""
echo -e "${GREEN}✓ All smoke tests passed!${RESET}"
79 changes: 68 additions & 11 deletions cli/src/lib.rs
@@ -1,11 +1,14 @@
use std::sync::Arc;

use datafusion::{
common::{DataFusionError, Result, exec_err, not_impl_err},
datasource::TableProvider,
execution::{SessionStateBuilder, runtime_env::RuntimeEnv},
logical_expr::{CreateExternalTable, LogicalPlanBuilder},
prelude::{DataFrame, SessionConfig, SessionContext},
sql::TableReference,
};
use datafusion_cli::cli_context::CliSessionContext;
use optd_datafusion::{OptdExtensionConfig, SessionStateBuilderOptdExt};
use std::sync::Arc;

pub struct OptdCliSessionContext {
inner: SessionContext,
@@ -39,10 +42,62 @@ impl OptdCliSessionContext {
&self.inner
}

pub fn return_empty_dataframe(&self) -> datafusion::common::Result<DataFrame> {
let plan = datafusion::logical_expr::LogicalPlanBuilder::empty(false).build()?;
pub fn return_empty_dataframe(&self) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(DataFrame::new(self.inner.state(), plan))
}

async fn create_external_table(&self, cmd: &CreateExternalTable) -> Result<DataFrame> {
let exist = self.inner.table_exist(cmd.name.clone())?;

if cmd.temporary {
return not_impl_err!("Temporary tables not supported");
}

if exist {
match cmd.if_not_exists {
true => return self.return_empty_dataframe(),
false => {
return exec_err!("Table '{}' already exists", cmd.name);
}
}
}

let table_provider: Arc<dyn TableProvider> = self.create_custom_table(cmd).await?;
self.register_table(cmd.name.clone(), table_provider)?;

self.return_empty_dataframe()
}
Comment on lines +50 to +70
Copilot AI Dec 13, 2025

Missing documentation for the CreateExternalTable handling. The new create_external_table method adds significant functionality but lacks documentation explaining its purpose, parameters, and behavior. Add docstring comments to clarify the intent and usage.

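A minimal docstring sketch for the new method (our wording, inferred from the behavior shown in the diff; not part of the PR):

/// Handles a `CREATE EXTERNAL TABLE` statement for the wrapped `SessionContext`.
///
/// Temporary tables are rejected as unsupported. If the table already exists,
/// this returns an empty `DataFrame` when `IF NOT EXISTS` was specified and an
/// execution error otherwise. On success, it builds a `TableProvider` through
/// the table factory registered for `cmd.file_type` and registers it under
/// `cmd.name`.
async fn create_external_table(&self, cmd: &CreateExternalTable) -> Result<DataFrame> {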

async fn create_custom_table(
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>> {
let state = self.inner.state_ref().read().clone();
let file_type = cmd.file_type.to_uppercase();
let factory = state
.table_factories()
.get(file_type.as_str())
.ok_or_else(|| {
DataFusionError::Execution(format!("Unable to find factory for {}", cmd.file_type))
})?;
let table = (*factory).create(&state, cmd).await?;
Ok(table)
}

pub fn register_table(
&self,
table_ref: impl Into<TableReference>,
provider: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
let table_ref: TableReference = table_ref.into();
let table = table_ref.table().to_owned();
self.inner
.state_ref()
.read()
.schema_for_ref(table_ref)?
.register_table(table, provider)
}
}

impl CliSessionContext for OptdCliSessionContext {
@@ -72,12 +127,8 @@
plan: datafusion::logical_expr::LogicalPlan,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<
Output = Result<
datafusion::prelude::DataFrame,
datafusion::common::DataFusionError,
>,
> + ::core::marker::Send
dyn ::core::future::Future<Output = Result<DataFrame, DataFusionError>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
@@ -102,8 +153,14 @@
}
_ => (),
}
} else if let datafusion::logical_expr::LogicalPlan::Ddl(ddl) = &plan {
match ddl {
datafusion::logical_expr::DdlStatement::CreateExternalTable(create_table) => {
return self.create_external_table(&create_table).await;
}
_ => (),
}
}

self.inner.execute_logical_plan(plan).await
};

32 changes: 29 additions & 3 deletions cli/src/main.rs
@@ -44,7 +44,9 @@ use datafusion::common::config_err;
use datafusion::config::ConfigOptions;
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};

use optd_catalog::{CatalogService, DuckLakeCatalog};
use optd_cli::OptdCliSessionContext;
use optd_datafusion::OptdCatalogProviderList;

#[derive(Debug, Parser, PartialEq)]
#[clap(author, version, about, long_about= None)]
@@ -214,11 +216,35 @@ async fn main_inner() -> Result<()> {
let cli_ctx = cli_ctx.enable_url_table();
let ctx = cli_ctx.inner();

// Initialize catalog with optional DuckLake catalog service
let catalog_handle = if let Ok(metadata_path) = env::var("OPTD_METADATA_CATALOG_PATH") {
if !args.quiet {
println!("Using OptD catalog with metadata path: {}", metadata_path);
}
let ducklake_catalog = DuckLakeCatalog::try_new(None, Some(&metadata_path))
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let (service, handle) = CatalogService::new(ducklake_catalog);
tokio::spawn(async move { service.run().await });
Some(handle)
} else {
if !args.quiet {
println!("OptD catalog integration enabled (no persistent metadata)");
}
None
};

// Wrap the catalog list with OptdCatalogProviderList
let original_catalog_list = ctx.state().catalog_list().clone();
let optd_catalog_list =
OptdCatalogProviderList::new(original_catalog_list.clone(), catalog_handle);

// install dynamic catalog provider that can register required object stores
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
ctx.state().catalog_list().clone(),
// and wrap it with OptD catalog provider
let dynamic_catalog = Arc::new(DynamicObjectStoreCatalog::new(
Arc::new(optd_catalog_list),
ctx.state_weak_ref(),
)));
));
ctx.register_catalog_list(dynamic_catalog);

// register `parquet_metadata` table function to get metadata from parquet files
ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
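
For context on the `(service, handle)` split used above: `CatalogService::new` hands back a service to spawn and a handle to keep, a common actor-style layout in Tokio. The sketch below illustrates that pattern in isolation; the `Request` enum, `get` method, and channel sizes are hypothetical stand-ins, not the actual optd-catalog API.

use std::collections::HashMap;

use tokio::sync::{mpsc, oneshot};

// Hypothetical request type; the real CatalogService presumably has richer
// variants (fetch/update statistics, schema lookups, ...).
enum Request {
    Get {
        key: String,
        reply: oneshot::Sender<Option<String>>,
    },
}

struct Service {
    rx: mpsc::Receiver<Request>,
    state: HashMap<String, String>,
}

#[derive(Clone)]
struct Handle {
    tx: mpsc::Sender<Request>,
}

impl Service {
    // Mirrors the `CatalogService::new(...) -> (service, handle)` shape.
    fn new() -> (Self, Handle) {
        let (tx, rx) = mpsc::channel(32);
        (Service { rx, state: HashMap::new() }, Handle { tx })
    }

    // The service owns its state and processes requests sequentially, so
    // callers never take a lock; they just await a reply.
    async fn run(mut self) {
        while let Some(req) = self.rx.recv().await {
            match req {
                Request::Get { key, reply } => {
                    let _ = reply.send(self.state.get(&key).cloned());
                }
            }
        }
    }
}

impl Handle {
    async fn get(&self, key: &str) -> Option<String> {
        let (reply, rx) = oneshot::channel();
        self.tx
            .send(Request::Get { key: key.to_owned(), reply })
            .await
            .ok()?;
        rx.await.ok().flatten()
    }
}

// `current_thread` keeps this runnable with the workspace's tokio features
// ("macros", "rt", "sync"); the CLI itself sets up its own runtime.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (service, handle) = Service::new();
    tokio::spawn(service.run()); // same lifecycle as in the diff above
    assert_eq!(handle.get("missing").await, None);
}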