diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 27892935a..5e82fd0fe 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -20,6 +20,7 @@ set(ICEBERG_INCLUDES "$" set(ICEBERG_SOURCES arrow_c_data_guard_internal.cc catalog/memory/in_memory_catalog.cc + data/writer.cc delete_file_index.cc expression/aggregate.cc expression/binder.cc @@ -144,6 +145,7 @@ add_iceberg_lib(iceberg iceberg_install_all_headers(iceberg) add_subdirectory(catalog) +add_subdirectory(data) add_subdirectory(expression) add_subdirectory(manifest) add_subdirectory(row) diff --git a/src/iceberg/data/CMakeLists.txt b/src/iceberg/data/CMakeLists.txt new file mode 100644 index 000000000..e50b8b989 --- /dev/null +++ b/src/iceberg/data/CMakeLists.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +iceberg_install_all_headers(iceberg/data) diff --git a/src/iceberg/data/writer.cc b/src/iceberg/data/writer.cc new file mode 100644 index 000000000..4a18c8053 --- /dev/null +++ b/src/iceberg/data/writer.cc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/writer.h" + +namespace iceberg { + +// FileWriter is a pure virtual interface class. +// Implementations will be provided in subsequent tasks. + +} // namespace iceberg diff --git a/src/iceberg/data/writer.h b/src/iceberg/data/writer.h new file mode 100644 index 000000000..9876f8cbf --- /dev/null +++ b/src/iceberg/data/writer.h @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/data/writer.h +/// Base interface for Iceberg data file writers. + +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Base interface for data file writers. +/// +/// This interface defines the common operations for writing Iceberg data files, +/// including data files, equality delete files, and position delete files. +/// +/// Typical usage: +/// 1. Create a writer instance (via concrete implementation) +/// 2. Call Write() one or more times to write data +/// 3. Call Close() to finalize the file +/// 4. Call Metadata() to get file metadata (only valid after Close()) +/// +/// \note This interface is not thread-safe. Concurrent calls to Write() +/// from multiple threads on the same instance are not supported. +class ICEBERG_EXPORT FileWriter { + public: + virtual ~FileWriter() = default; + + /// \brief Write a batch of records. + /// + /// \param data Arrow array containing the records to write. + /// \return Status indicating success or failure. + virtual Status Write(ArrowArray* data) = 0; + + /// \brief Get the current number of bytes written. + /// + /// \return Result containing the number of bytes written or an error. + virtual Result Length() const = 0; + + /// \brief Close the writer and finalize the file. + /// + /// \return Status indicating success or failure. + virtual Status Close() = 0; + + /// \brief File metadata for all files produced by the writer. + struct ICEBERG_EXPORT WriteResult { + /// Usually a writer produces a single data or delete file. + /// Position delete writer may produce multiple file-scoped delete files. + /// In the future, multiple files can be produced if file rolling is supported. + std::vector> data_files; + }; + + /// \brief Get file metadata for all files produced by this writer. + /// + /// This method should be called after Close() to retrieve the metadata + /// for all files written by this writer. + /// + /// \return Result containing the write result or an error. + virtual Result Metadata() = 0; +}; + +} // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index b7f23c538..8ae7cfd1e 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -165,6 +165,8 @@ if(ICEBERG_BUILD_BUNDLE) update_properties_test.cc update_sort_order_test.cc) + add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc) + endif() if(ICEBERG_BUILD_REST) diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc new file mode 100644 index 000000000..df7ea9d89 --- /dev/null +++ b/src/iceberg/test/data_writer_test.cc @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/data/writer.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" +#include "iceberg/test/matchers.h" + +namespace iceberg { + +// Mock implementation of FileWriter for testing +class MockFileWriter : public FileWriter { + public: + MockFileWriter() = default; + + Status Write(ArrowArray* data) override { + if (is_closed_) { + return Invalid("Writer is closed"); + } + if (data == nullptr) { + return Invalid("Null data provided"); + } + write_count_++; + // Simulate writing some bytes + bytes_written_ += 1024; + return {}; + } + + Result Length() const override { return bytes_written_; } + + Status Close() override { + if (is_closed_) { + return Invalid("Writer already closed"); + } + is_closed_ = true; + return {}; + } + + Result Metadata() override { + if (!is_closed_) { + return Invalid("Writer must be closed before getting metadata"); + } + + WriteResult result; + auto data_file = std::make_shared(); + data_file->file_path = "/test/data/file.parquet"; + data_file->file_format = FileFormatType::kParquet; + data_file->record_count = write_count_ * 100; + data_file->file_size_in_bytes = bytes_written_; + result.data_files.push_back(data_file); + + return result; + } + + bool is_closed() const { return is_closed_; } + int32_t write_count() const { return write_count_; } + + private: + int64_t bytes_written_ = 0; + bool is_closed_ = false; + int32_t write_count_ = 0; +}; + +TEST(FileWriterTest, BasicWriteOperation) { + MockFileWriter writer; + + // Create a dummy ArrowArray (normally this would contain actual data) + ArrowArray dummy_array = {}; + + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + ASSERT_EQ(writer.write_count(), 1); + + auto length_result = writer.Length(); + ASSERT_THAT(length_result, IsOk()); + ASSERT_EQ(*length_result, 1024); +} + +TEST(FileWriterTest, MultipleWrites) { + MockFileWriter writer; + ArrowArray dummy_array = {}; + + // Write multiple times + for (int i = 0; i < 5; i++) { + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + } + + ASSERT_EQ(writer.write_count(), 5); + + auto length_result = writer.Length(); + ASSERT_THAT(length_result, IsOk()); + ASSERT_EQ(*length_result, 5120); // 5 * 1024 +} + +TEST(FileWriterTest, WriteNullData) { + MockFileWriter writer; + + auto status = writer.Write(nullptr); + ASSERT_THAT(status, HasErrorMessage("Null data provided")); +} + +TEST(FileWriterTest, CloseWriter) { + MockFileWriter writer; + ArrowArray dummy_array = {}; + + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + ASSERT_FALSE(writer.is_closed()); + + ASSERT_THAT(writer.Close(), IsOk()); + ASSERT_TRUE(writer.is_closed()); +} + +TEST(FileWriterTest, DoubleClose) { + MockFileWriter writer; + + ASSERT_THAT(writer.Close(), IsOk()); + auto status = writer.Close(); + ASSERT_THAT(status, HasErrorMessage("Writer already closed")); +} + +TEST(FileWriterTest, WriteAfterClose) { + MockFileWriter writer; + ArrowArray dummy_array = {}; + + ASSERT_THAT(writer.Close(), IsOk()); + + auto status = writer.Write(&dummy_array); + ASSERT_THAT(status, HasErrorMessage("Writer is closed")); +} + +TEST(FileWriterTest, MetadataBeforeClose) { + MockFileWriter writer; + ArrowArray dummy_array = {}; + + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + + auto metadata_result = writer.Metadata(); + ASSERT_THAT(metadata_result, + HasErrorMessage("Writer must be closed before getting metadata")); +} + +TEST(FileWriterTest, MetadataAfterClose) { + MockFileWriter writer; + ArrowArray dummy_array = {}; + + // Write some data + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + + // Close the writer + ASSERT_THAT(writer.Close(), IsOk()); + + // Get metadata + auto metadata_result = writer.Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + + const auto& result = *metadata_result; + ASSERT_EQ(result.data_files.size(), 1); + + const auto& data_file = result.data_files[0]; + ASSERT_EQ(data_file->file_path, "/test/data/file.parquet"); + ASSERT_EQ(data_file->file_format, FileFormatType::kParquet); + ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records + ASSERT_EQ(data_file->file_size_in_bytes, 3072); // 3 * 1024 +} + +TEST(FileWriterTest, WriteResultStructure) { + FileWriter::WriteResult result; + + // Test that WriteResult can hold multiple data files + auto data_file1 = std::make_shared(); + data_file1->file_path = "/test/data/file1.parquet"; + data_file1->record_count = 100; + + auto data_file2 = std::make_shared(); + data_file2->file_path = "/test/data/file2.parquet"; + data_file2->record_count = 200; + + result.data_files.push_back(data_file1); + result.data_files.push_back(data_file2); + + ASSERT_EQ(result.data_files.size(), 2); + ASSERT_EQ(result.data_files[0]->file_path, "/test/data/file1.parquet"); + ASSERT_EQ(result.data_files[0]->record_count, 100); + ASSERT_EQ(result.data_files[1]->file_path, "/test/data/file2.parquet"); + ASSERT_EQ(result.data_files[1]->record_count, 200); +} + +TEST(FileWriterTest, EmptyWriteResult) { + FileWriter::WriteResult result; + ASSERT_EQ(result.data_files.size(), 0); + ASSERT_TRUE(result.data_files.empty()); +} + +} // namespace iceberg