diff --git a/Cargo.lock b/Cargo.lock index 135d7bd946..ea55972f70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -889,6 +889,22 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "attohttpc" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e2cdb6d5ed835199484bb92bb8b3edd526effe995c61732580439c1a67e2e9" +dependencies = [ + "base64 0.22.1", + "http 1.4.0", + "log", + "rustls", + "serde", + "serde_json", + "url", + "webpki-roots 1.0.4", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -904,6 +920,23 @@ dependencies = [ "cc", ] +[[package]] +name = "aws-creds" +version = "0.39.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3b85155d265df828f84e53886ed9e427aed979dd8a39f5b8b2162c77e142d7" +dependencies = [ + "attohttpc", + "home", + "log", + "quick-xml 0.38.4", + "rust-ini", + "serde", + "thiserror 2.0.17", + "time", + "url", +] + [[package]] name = "aws-lc-rs" version = "1.15.2" @@ -926,6 +959,15 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-region" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "838b36c8dc927b6db1b6c6b8f5d05865f2213550b9e83bf92fa99ed6525472c0" +dependencies = [ + "thiserror 2.0.17", +] + [[package]] name = "axum" version = "0.8.8" @@ -1531,6 +1573,15 @@ dependencies = [ "thiserror 2.0.17", ] +[[package]] +name = "castaway" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.2.51" @@ -1724,6 +1775,19 @@ dependencies = [ "unicode-width 0.2.2", ] +[[package]] +name = "compact_str" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "ryu", + "static_assertions", +] + [[package]] name = "compio" version = "0.17.0" @@ -4836,6 +4900,25 @@ dependencies = [ "uuid", ] +[[package]] +name = "iggy_connector_redshift_sink" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "dashmap", + "futures", + "iggy_connector_sdk", + "once_cell", + "rust-s3", + "serde", + "simd-json", + "sqlx", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "iggy_connector_sdk" version = "0.1.1-edge.1" @@ -5674,6 +5757,17 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "maybe-async" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "md-5" version = "0.10.6" @@ -5684,6 +5778,12 @@ dependencies = [ "digest", ] +[[package]] +name = "md5" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" + [[package]] name = "memchr" version = "2.7.6" @@ -5759,6 +5859,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "minidom" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e394a0e3c7ccc2daea3dffabe82f09857b6b510cb25af87d54bf3e910ac1642d" +dependencies = [ + "rxml", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -7765,6 +7874,41 @@ dependencies = [ "ordered-multimap", ] +[[package]] +name = "rust-s3" +version = "0.37.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4af74047374528b627109d579ce86b23ccf6ffba7ff363c807126c1aff69e1bb" +dependencies = [ + "async-trait", + "aws-creds", + "aws-region", + "base64 0.22.1", + "bytes", + "cfg-if", + "futures-util", + "hex", + "hmac", + "http 1.4.0", + "log", + "maybe-async", + "md5", + "minidom", + "percent-encoding", + "quick-xml 0.38.4", + "reqwest", + "serde", + "serde_derive", + "serde_json", + "sha2", + "sysinfo 0.37.2", + "thiserror 2.0.17", + "time", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "rust_decimal" version = "1.39.0" @@ -7910,6 +8054,25 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "rxml" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc94b580d0f5a6b7a2d604e597513d3c673154b52ddeccd1d5c32360d945ee" +dependencies = [ + "bytes", + "rxml_validation", +] + +[[package]] +name = "rxml_validation" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826e80413b9a35e9d33217b3dcac04cf95f6559d15944b93887a08be5496c4a4" +dependencies = [ + "compact_str", +] + [[package]] name = "ryu" version = "1.0.22" @@ -8823,6 +8986,12 @@ dependencies = [ "toml 0.8.23", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stringprep" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index e5e370d6d2..9bbae3ec93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "core/connectors/sinks/iceberg_sink", "core/connectors/sinks/postgres_sink", "core/connectors/sinks/quickwit_sink", + "core/connectors/sinks/redshift_sink", "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", "core/connectors/sources/postgres_source", diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 1468dcbb0d..9096680e8f 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -407,6 +407,7 @@ iggy_connector_postgres_sink: 0.1.0, "Apache-2.0", iggy_connector_postgres_source: 0.1.0, "Apache-2.0", iggy_connector_quickwit_sink: 0.1.0, "Apache-2.0", iggy_connector_random_source: 0.1.0, "Apache-2.0", +iggy_connector_redshift_sink: 0.1.0, "Apache-2.0", iggy_connector_sdk: 0.1.1-edge.1, "Apache-2.0", iggy_connector_stdout_sink: 0.1.0, "Apache-2.0", iggy_examples: 0.0.5, "Apache-2.0", @@ -489,7 +490,9 @@ macro_rules_attribute: 0.1.3, "MIT", macro_rules_attribute-proc_macro: 0.1.3, "MIT", matchers: 0.2.0, "MIT", matchit: 0.8.4, "BSD-3-Clause AND MIT", +maybe-async: 0.2.10, "MIT", md-5: 0.10.6, "Apache-2.0 OR MIT", +md5: 0.8.0, "Apache-2.0 OR MIT", memchr: 2.7.6, "MIT OR Unlicense", message_bus: 0.1.0, "Apache-2.0", metadata: 0.1.0, "Apache-2.0", @@ -499,6 +502,7 @@ mimalloc: 0.1.48, "MIT", mime: 0.3.17, "Apache-2.0 OR MIT", mime_guess: 2.0.5, "MIT", minimal-lexical: 0.2.1, "Apache-2.0 OR MIT", +minidom: 0.16.0, "MPL-2.0", miniz_oxide: 0.8.9, "Apache-2.0 OR MIT OR Zlib", mio: 1.1.1, "MIT", mockall: 0.14.0, "Apache-2.0 OR MIT", @@ -679,6 +683,7 @@ 
rust-embed: 8.9.0, "MIT", rust-embed-impl: 8.9.0, "MIT", rust-embed-utils: 8.9.0, "MIT", rust-ini: 0.21.3, "MIT", +rust-s3: 0.37.1, "MIT", rust_decimal: 1.39.0, "MIT", rustc-hash: 2.1.1, "Apache-2.0 OR MIT", rustc_version: 0.4.1, "Apache-2.0 OR MIT", @@ -692,6 +697,8 @@ rustls-platform-verifier: 0.6.2, "Apache-2.0 OR MIT", rustls-platform-verifier-android: 0.1.1, "Apache-2.0 OR MIT", rustls-webpki: 0.103.8, "ISC", rustversion: 1.0.22, "Apache-2.0 OR MIT", +rxml: 0.11.1, "MIT", +rxml_validation: 0.11.0, "MIT", ryu: 1.0.22, "Apache-2.0 OR BSL-1.0", same-file: 1.0.6, "MIT OR Unlicense", scc: 2.4.0, "Apache-2.0", @@ -764,6 +771,7 @@ sqlx-sqlite: 0.8.6, "Apache-2.0 OR MIT", sse-stream: 0.2.1, "Apache-2.0 OR MIT", stable_deref_trait: 1.2.1, "Apache-2.0 OR MIT", static-toml: 1.3.0, "MIT", +static_assertions: 1.1.0, "Apache-2.0 OR MIT", stringprep: 0.1.5, "Apache-2.0 OR MIT", strsim: 0.11.1, "MIT", structmeta: 0.3.0, "Apache-2.0 OR MIT", diff --git a/core/common/src/sender/mod.rs b/core/common/src/sender/mod.rs index 27c98d8900..d1bb565e83 100644 --- a/core/common/src/sender/mod.rs +++ b/core/common/src/sender/mod.rs @@ -37,8 +37,11 @@ use compio::net::TcpStream; use compio_quic::{RecvStream, SendStream}; use compio_tls::TlsStream; use std::future::Future; +#[cfg(unix)] use std::os::fd::{AsFd, OwnedFd}; -use tracing::{debug, error}; +use tracing::debug; +#[cfg(unix)] +use tracing::error; macro_rules! forward_async_methods { ( @@ -117,6 +120,7 @@ impl SenderKind { Self::WebSocketTls(stream) } + #[cfg(unix)] pub fn take_and_migrate_tcp(&mut self) -> Option { match self { SenderKind::Tcp(tcp_sender) => { @@ -137,6 +141,12 @@ impl SenderKind { } } + #[cfg(not(unix))] + pub fn take_and_migrate_tcp(&mut self) -> Option<()> { + // Socket migration is not supported on non-Unix platforms + None + } + forward_async_methods! { async fn read(&mut self, buffer: B) -> (Result<(), IggyError>, B); async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>; diff --git a/core/connectors/sinks/redshift_sink/Cargo.toml b/core/connectors/sinks/redshift_sink/Cargo.toml new file mode 100644 index 0000000000..d4b21cbc87 --- /dev/null +++ b/core/connectors/sinks/redshift_sink/Cargo.toml @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iggy_connector_redshift_sink" +version = "0.1.0" +description = "Iggy Redshift sink connector for loading stream messages into Amazon Redshift via S3 staging" +edition = "2021" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "redshift", "sink", "aws"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "README.md" + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell", "futures", "simd-json", "prost"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +chrono = { workspace = true } +dashmap = { workspace = true } +futures = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +rust-s3 = { workspace = true } +serde = { workspace = true } +simd-json = { workspace = true } +sqlx = { version = "0.8", features = [ + "runtime-tokio-rustls", + "postgres", + "chrono", +] } +tokio = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } diff --git a/core/connectors/sinks/redshift_sink/README.md b/core/connectors/sinks/redshift_sink/README.md new file mode 100644 index 0000000000..cc49d6442a --- /dev/null +++ b/core/connectors/sinks/redshift_sink/README.md @@ -0,0 +1,235 @@ +# Apache Iggy - Redshift Sink Connector + +A sink connector that loads data from Iggy streams into Amazon Redshift using the S3 staging method. This is the recommended approach for high-volume data loading into Redshift. + +## Overview + +The Redshift Sink Connector: + +1. **Buffers** incoming messages into batches +2. **Uploads** batches as CSV files to S3 +3. **Executes** Redshift COPY command to load data from S3 +4. **Cleans up** staged S3 files after successful load + +This approach leverages Redshift's massively parallel processing (MPP) architecture for efficient bulk loading. 
+ +## Prerequisites + +- Amazon Redshift cluster with network access +- S3 bucket for staging files +- AWS credentials with appropriate permissions: + - S3: `s3:PutObject`, `s3:GetObject`, `s3:DeleteObject` on the staging bucket + - Redshift: `COPY` permission on the target table + +## Configuration + +Create a connector configuration file (e.g., `redshift.toml`): + +```toml +type = "sink" +key = "redshift" +enabled = true +version = 0 +name = "Redshift Sink" +path = "target/release/libiggy_connector_redshift_sink" +plugin_config_format = "toml" + +[[streams]] +stream = "events" +topics = ["user_actions"] +schema = "json" +batch_length = 10000 +poll_interval = "100ms" +consumer_group = "redshift_sink" + +[plugin_config] +# Redshift connection (PostgreSQL wire protocol) +connection_string = "postgres://admin:password@my-cluster.region.redshift.amazonaws.com:5439/mydb" +target_table = "public.events" + +# S3 staging configuration +s3_bucket = "my-staging-bucket" +s3_prefix = "redshift/staging/" +s3_region = "us-east-1" + +# AWS authentication - use either IAM role (preferred) or access keys +iam_role = "arn:aws:iam::123456789012:role/RedshiftS3Access" + +# Or use access keys instead of IAM role: +# aws_access_key_id = "AKIAIOSFODNN7EXAMPLE" +# aws_secret_access_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + +# Batching settings +batch_size = 10000 +flush_interval_ms = 30000 + +# CSV format options +csv_delimiter = "," +csv_quote = "\"" + +# COPY command options +max_errors = 10 +# compression = "gzip" + +# Cleanup and reliability +delete_staged_files = true +max_retries = 3 +retry_delay_ms = 1000 + +# Database settings +max_connections = 5 +auto_create_table = false + +# Metadata columns (adds iggy_offset, iggy_timestamp, etc.) +include_metadata = false +``` + +## Configuration Reference + +| Property | Type | Required | Default | Description | +| -------- | ---- | -------- | ------- | ----------- | +| `connection_string` | String | Yes | - | Redshift connection string in PostgreSQL format | +| `target_table` | String | Yes | - | Target table name (can include schema) | +| `s3_bucket` | String | Yes | - | S3 bucket for staging CSV files | +| `s3_region` | String | Yes | - | AWS region for the S3 bucket | +| `s3_prefix` | String | No | `""` | S3 key prefix for staged files | +| `iam_role` | String | No* | - | IAM role ARN for Redshift to access S3 | +| `aws_access_key_id` | String | No* | - | AWS access key ID | +| `aws_secret_access_key` | String | No* | - | AWS secret access key | +| `batch_size` | Integer | No | `10000` | Messages per batch before S3 upload | +| `flush_interval_ms` | Integer | No | `30000` | Max wait time before flushing partial batch | +| `csv_delimiter` | Char | No | `,` | CSV field delimiter | +| `csv_quote` | Char | No | `"` | CSV quote character | +| `max_errors` | Integer | No | `0` | Max errors before COPY fails | +| `compression` | String | No | `none` | Compression: `gzip`, `lzop`, `bzip2` | +| `delete_staged_files` | Boolean | No | `true` | Delete S3 files after successful COPY | +| `max_connections` | Integer | No | `5` | Max Redshift connections | +| `max_retries` | Integer | No | `3` | Max retry attempts for failures | +| `retry_delay_ms` | Integer | No | `1000` | Initial retry delay (exponential backoff) | +| `include_metadata` | Boolean | No | `false` | Include Iggy metadata columns | +| `auto_create_table` | Boolean | No | `false` | Auto-create table if not exists | + +*Either `iam_role` or both `aws_access_key_id` and `aws_secret_access_key` must be 
provided. + +## Table Schema + +When `auto_create_table` is enabled, the connector creates a table with this schema: + +```sql +CREATE TABLE IF NOT EXISTS ( + id VARCHAR(40) PRIMARY KEY, + payload VARCHAR(MAX), + -- When include_metadata = true: + iggy_offset BIGINT, + iggy_timestamp TIMESTAMP, + iggy_stream VARCHAR(256), + iggy_topic VARCHAR(256), + iggy_partition_id INTEGER, + -- + created_at TIMESTAMP DEFAULT GETDATE() +); +``` + +For production use, pre-create your table with appropriate column types, sort keys, and distribution style. + +## IAM Role Setup + +For IAM role authentication (recommended), create a role with this trust policy: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "redshift.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] +} +``` + +And attach a policy with S3 access: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:GetObjectVersion", + "s3:GetBucketLocation", + "s3:ListBucket" + ], + "Resource": [ + "arn:aws:s3:::my-staging-bucket", + "arn:aws:s3:::my-staging-bucket/*" + ] + } + ] +} +``` + +Then associate the role with your Redshift cluster. + +## Performance Tuning + +### Batch Size + +- **Small batches** (1,000-5,000): Lower latency, more COPY operations +- **Large batches** (50,000-100,000): Higher throughput, more memory usage +- Recommended starting point: `10,000` + +### Compression + +Enable compression for large payloads to reduce S3 transfer time: + +```toml +compression = "gzip" +``` + +### Parallelism + +Increase `batch_length` in stream config to process more messages per poll: + +```toml +[[streams]] +batch_length = 50000 +``` + +## Error Handling + +The connector implements retry logic with exponential backoff for transient failures: + +- **S3 upload failures**: Retried up to `max_retries` times +- **COPY command failures**: Retried with backoff, failed rows logged +- **Cleanup failures**: Logged as warnings, do not block processing + +Use `max_errors` to control COPY behavior: + +- `0`: Fail on first error (strict mode) +- `N`: Allow up to N errors per COPY operation + +## Monitoring + +The connector logs statistics on close: + +```text +Closing Redshift sink connector ID: 1. Stats: 150000 messages processed, 15 batches loaded, 0 errors +``` + +Monitor these metrics to track connector health. + +## Limitations + +- Payload must be convertible to string (JSON, text, or raw bytes) +- Table must exist unless `auto_create_table` is enabled +- Currently supports CSV format only (Parquet planned) + +## License + +Licensed under the Apache License, Version 2.0. diff --git a/core/connectors/sinks/redshift_sink/src/config.rs b/core/connectors/sinks/redshift_sink/src/config.rs new file mode 100644 index 0000000000..5a6c8adfcc --- /dev/null +++ b/core/connectors/sinks/redshift_sink/src/config.rs @@ -0,0 +1,243 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy_connector_sdk::Error; +use serde::{Deserialize, Serialize}; + +/// Configuration for the Redshift Sink Connector. +/// +/// This connector loads data from Iggy streams into Amazon Redshift using S3 staging, +/// which is the recommended approach for high-volume data loading. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RedshiftSinkConfig { + /// Redshift connection string in PostgreSQL format. + /// Example: `postgres://user:password@cluster.region.redshift.amazonaws.com:5439/database` + pub connection_string: String, + + /// Target table name in Redshift. Can include schema prefix. + /// Example: `public.events` or `analytics.user_actions` + pub target_table: String, + + /// IAM role ARN for Redshift to access S3. Preferred over access keys. + /// Example: `arn:aws:iam::123456789012:role/RedshiftS3Access` + pub iam_role: Option, + + /// S3 bucket name for staging CSV files before COPY. + pub s3_bucket: String, + + /// S3 key prefix for staged files. Defaults to empty string. + /// Example: `staging/redshift/` + pub s3_prefix: Option, + + /// AWS region for the S3 bucket. + /// Example: `us-east-1` + pub s3_region: String, + + /// Custom S3 endpoint URL for testing with LocalStack or MinIO. + /// If not specified, uses the default AWS S3 endpoint. + /// Example: `http://localhost:4566` + pub s3_endpoint: Option, + + /// AWS access key ID. Required if IAM role is not specified. + pub aws_access_key_id: Option, + + /// AWS secret access key. Required if IAM role is not specified. + pub aws_secret_access_key: Option, + + /// Number of messages to batch before uploading to S3 and executing COPY. + /// Defaults to 10000. + pub batch_size: Option, + + /// Maximum time in milliseconds to wait before flushing a partial batch. + /// Defaults to 30000 (30 seconds). + pub flush_interval_ms: Option, + + /// CSV field delimiter character. Defaults to `,`. + pub csv_delimiter: Option, + + /// CSV quote character for escaping. Defaults to `"`. + pub csv_quote: Option, + + /// Number of header rows to skip. Defaults to 0. + pub ignore_header: Option, + + /// Maximum number of errors allowed before COPY fails. Defaults to 0. + pub max_errors: Option, + + /// Compression format for staged files: `gzip`, `lzop`, `bzip2`, or `none`. + pub compression: Option, + + /// Whether to delete staged S3 files after successful COPY. Defaults to true. + pub delete_staged_files: Option, + + /// Maximum number of database connections. Defaults to 5. + pub max_connections: Option, + + /// Maximum number of retry attempts for transient failures. Defaults to 3. + pub max_retries: Option, + + /// Initial delay in milliseconds between retries. Uses exponential backoff. + /// Defaults to 1000. + pub retry_delay_ms: Option, + + /// Whether to include Iggy metadata columns (offset, timestamp, stream, topic, partition). + /// Defaults to false. + pub include_metadata: Option, + + /// Whether to auto-create the target table if it doesn't exist. Defaults to false. 
+ pub auto_create_table: Option, +} + +impl RedshiftSinkConfig { + /// Validates the configuration and returns an error if invalid. + pub fn validate(&self) -> Result<(), Error> { + if self.connection_string.is_empty() { + return Err(Error::InvalidConfig); + } + + if self.target_table.is_empty() { + return Err(Error::InvalidConfig); + } + + if self.s3_bucket.is_empty() { + return Err(Error::InvalidConfig); + } + + if self.s3_region.is_empty() { + return Err(Error::InvalidConfig); + } + + // Validate AWS credentials: either IAM role or access keys must be provided + let has_iam_role = self.iam_role.as_ref().is_some_and(|r| !r.is_empty()); + let has_access_key = self + .aws_access_key_id + .as_ref() + .is_some_and(|k| !k.is_empty()); + let has_secret_key = self + .aws_secret_access_key + .as_ref() + .is_some_and(|s| !s.is_empty()); + + if !(has_iam_role || (has_access_key && has_secret_key)) { + return Err(Error::InvalidConfig); + } + + // If using access keys, both must be provided + if (has_access_key && !has_secret_key) || (!has_access_key && has_secret_key) { + return Err(Error::InvalidConfig); + } + + // Validate compression if specified + if let Some(compression) = &self.compression { + let valid = ["gzip", "lzop", "bzip2", "none"]; + if !valid.contains(&compression.to_lowercase().as_str()) { + return Err(Error::InvalidConfig); + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn valid_config() -> RedshiftSinkConfig { + RedshiftSinkConfig { + connection_string: "postgres://user:pass@host:5439/db".to_string(), + target_table: "test_table".to_string(), + iam_role: Some("arn:aws:iam::123:role/Test".to_string()), + s3_bucket: "bucket".to_string(), + s3_prefix: None, + s3_region: "us-east-1".to_string(), + s3_endpoint: None, + aws_access_key_id: None, + aws_secret_access_key: None, + batch_size: None, + flush_interval_ms: None, + csv_delimiter: None, + csv_quote: None, + ignore_header: None, + max_errors: None, + compression: None, + delete_staged_files: None, + max_connections: None, + max_retries: None, + retry_delay_ms: None, + include_metadata: None, + auto_create_table: None, + } + } + + #[test] + fn test_valid_config_with_iam_role() { + let config = valid_config(); + assert!(config.validate().is_ok()); + } + + #[test] + fn test_valid_config_with_access_keys() { + let mut config = valid_config(); + config.iam_role = None; + config.aws_access_key_id = Some("AKIAIOSFODNN7EXAMPLE".to_string()); + config.aws_secret_access_key = Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string()); + assert!(config.validate().is_ok()); + } + + #[test] + fn test_invalid_empty_connection_string() { + let mut config = valid_config(); + config.connection_string = String::new(); + assert!(config.validate().is_err()); + } + + #[test] + fn test_invalid_empty_table() { + let mut config = valid_config(); + config.target_table = String::new(); + assert!(config.validate().is_err()); + } + + #[test] + fn test_invalid_empty_bucket() { + let mut config = valid_config(); + config.s3_bucket = String::new(); + assert!(config.validate().is_err()); + } + + #[test] + fn test_invalid_compression() { + let mut config = valid_config(); + config.compression = Some("invalid".to_string()); + assert!(config.validate().is_err()); + } + + #[test] + fn test_valid_compression_options() { + for comp in ["gzip", "GZIP", "lzop", "bzip2", "none"] { + let mut config = valid_config(); + config.compression = Some(comp.to_string()); + assert!( + config.validate().is_ok(), + "compression '{}' should be valid", + 
comp + ); + } + } +} diff --git a/core/connectors/sinks/redshift_sink/src/lib.rs b/core/connectors/sinks/redshift_sink/src/lib.rs new file mode 100644 index 0000000000..9c87ae47a7 --- /dev/null +++ b/core/connectors/sinks/redshift_sink/src/lib.rs @@ -0,0 +1,497 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +mod config; +mod s3; + +use async_trait::async_trait; +use config::RedshiftSinkConfig; +use iggy_connector_sdk::{ + sink_connector, ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, +}; +use s3::S3Uploader; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; +use std::time::Duration; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +sink_connector!(RedshiftSink); + +#[derive(Debug)] +pub struct RedshiftSink { + id: u32, + config: RedshiftSinkConfig, + pool: Option>, + s3_uploader: Option, + state: Mutex, +} + +#[derive(Debug, Default)] +struct SinkState { + messages_processed: u64, + batches_loaded: u64, + load_errors: u64, +} + +impl RedshiftSink { + pub fn new(id: u32, config: RedshiftSinkConfig) -> Self { + RedshiftSink { + id, + config, + pool: None, + s3_uploader: None, + state: Mutex::new(SinkState::default()), + } + } + + async fn connect_redshift(&mut self) -> Result<(), Error> { + let max_connections = self.config.max_connections.unwrap_or(5); + let redacted = self + .config + .connection_string + .chars() + .take(20) + .collect::(); + + info!( + "Connecting to Redshift with max {} connections, connection: {}...", + max_connections, redacted + ); + + let pool = PgPoolOptions::new() + .max_connections(max_connections) + .acquire_timeout(Duration::from_secs(30)) + .connect(&self.config.connection_string) + .await + .map_err(|e| Error::InitError(format!("Failed to connect to Redshift: {e}")))?; + + sqlx::query("SELECT 1") + .execute(&pool) + .await + .map_err(|e| Error::InitError(format!("Redshift connectivity test failed: {e}")))?; + + self.pool = Some(pool); + info!("Connected to Redshift cluster"); + Ok(()) + } + + fn init_s3_uploader(&mut self) -> Result<(), Error> { + let uploader = S3Uploader::new( + &self.config.s3_bucket, + self.config.s3_prefix.as_deref().unwrap_or(""), + &self.config.s3_region, + self.config.aws_access_key_id.as_deref(), + self.config.aws_secret_access_key.as_deref(), + self.config.s3_endpoint.as_deref(), + )?; + self.s3_uploader = Some(uploader); + info!( + "Initialized S3 uploader for bucket: {}, region: {}{}", + self.config.s3_bucket, + self.config.s3_region, + self.config + .s3_endpoint + .as_ref() + .map_or(String::new(), |e| format!(", endpoint: {}", e)) + ); + Ok(()) + } + + async fn ensure_table_exists(&self) -> Result<(), Error> { + if !self.config.auto_create_table.unwrap_or(false) { + return Ok(()); + } + + let pool = self + .pool + .as_ref() + 
.ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + let table_name = &self.config.target_table; + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut sql = format!( + "CREATE TABLE IF NOT EXISTS {table_name} ( + id VARCHAR(40) PRIMARY KEY, + payload VARCHAR(MAX)" + ); + + if include_metadata { + sql.push_str( + ", + iggy_offset BIGINT, + iggy_timestamp TIMESTAMP, + iggy_stream VARCHAR(256), + iggy_topic VARCHAR(256), + iggy_partition_id INTEGER", + ); + } + + sql.push_str( + ", + created_at TIMESTAMP DEFAULT GETDATE() + )", + ); + + sqlx::query(&sql) + .execute(pool) + .await + .map_err(|e| Error::InitError(format!("Failed to create table '{table_name}': {e}")))?; + + info!("Ensured table '{}' exists in Redshift", table_name); + Ok(()) + } + + async fn process_batch( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<(), Error> { + if messages.is_empty() { + return Ok(()); + } + + let s3_uploader = self + .s3_uploader + .as_ref() + .ok_or_else(|| Error::InitError("S3 uploader not initialized".to_string()))?; + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + // Convert messages to CSV + let csv_data = self.messages_to_csv(topic_metadata, messages_metadata, messages)?; + + // Upload to S3 + let s3_key = s3_uploader.upload_csv(&csv_data).await?; + let s3_path = format!("s3://{}/{}", self.config.s3_bucket, s3_key); + + info!( + "Uploaded {} messages ({} bytes) to {}", + messages.len(), + csv_data.len(), + s3_path + ); + + // Execute COPY command + let copy_result = self.execute_copy(pool, &s3_path).await; + + // Cleanup S3 file if configured - always attempt cleanup regardless of COPY result + if self.config.delete_staged_files.unwrap_or(true) { + if let Err(e) = s3_uploader.delete_file(&s3_key).await { + warn!("Failed to delete staged file {}: {}", s3_key, e); + } + } + + // Return COPY result after cleanup + copy_result?; + + let mut state = self.state.lock().await; + state.messages_processed += messages.len() as u64; + state.batches_loaded += 1; + + info!( + "Redshift sink ID: {} loaded {} messages to table '{}' (total: {}, batches: {})", + self.id, + messages.len(), + self.config.target_table, + state.messages_processed, + state.batches_loaded + ); + + Ok(()) + } + + fn messages_to_csv( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result, Error> { + let delimiter = self.config.csv_delimiter.unwrap_or(','); + let quote = self.config.csv_quote.unwrap_or('"'); + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut csv_output = Vec::new(); + // Pre-allocate the escaped quote string for performance + let escaped_quote = format!("{quote}{quote}"); + + for message in messages { + let payload_str = match &message.payload { + Payload::Json(value) => simd_json::to_string(value).unwrap_or_else(|e| { + warn!( + "Failed to serialize JSON payload for message {}: {}", + message.id, e + ); + String::new() + }), + Payload::Text(text) => text.clone(), + Payload::Raw(bytes) => String::from_utf8_lossy(bytes).to_string(), + _ => { + let bytes = message.payload.clone().try_into_vec().map_err(|e| { + error!("Failed to convert payload: {}", e); + Error::InvalidRecord + })?; + String::from_utf8_lossy(&bytes).to_string() + } + }; + + // Escape quotes in payload + let escaped_payload = payload_str.replace(quote, 
&escaped_quote); + + let mut row = format!( + "{}{delim}{quote}{payload}{quote}", + message.id, + delim = delimiter, + payload = escaped_payload + ); + + if include_metadata { + // `message.timestamp` is in microseconds. Preserve microsecond precision + // by converting to seconds and nanoseconds for `from_timestamp`. + let timestamp_micros = message.timestamp; + let timestamp_secs = (timestamp_micros / 1_000_000) as i64; + let timestamp_nanos = ((timestamp_micros % 1_000_000) as u32) * 1_000; + let timestamp = chrono::DateTime::from_timestamp(timestamp_secs, timestamp_nanos) + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S%.6f").to_string()) + .unwrap_or_default(); + + row.push_str(&format!( + "{delim}{offset}{delim}{ts}{delim}{quote}{stream}{quote}{delim}{quote}{topic}{quote}{delim}{partition}", + delim = delimiter, + offset = message.offset, + ts = timestamp, + stream = topic_metadata.stream, + topic = topic_metadata.topic, + partition = messages_metadata.partition_id + )); + } + + row.push('\n'); + csv_output.extend_from_slice(row.as_bytes()); + } + + Ok(csv_output) + } + + async fn execute_copy(&self, pool: &Pool, s3_path: &str) -> Result<(), Error> { + let table = &self.config.target_table; + let delimiter = self.config.csv_delimiter.unwrap_or(','); + let quote = self.config.csv_quote.unwrap_or('"'); + let max_errors = self.config.max_errors.unwrap_or(0); + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let columns = if include_metadata { + "(id, payload, iggy_offset, iggy_timestamp, iggy_stream, iggy_topic, iggy_partition_id)" + } else { + "(id, payload)" + }; + + let credentials = if let Some(iam_role) = &self.config.iam_role { + format!("IAM_ROLE '{}'", iam_role) + } else if let (Some(key_id), Some(secret)) = ( + &self.config.aws_access_key_id, + &self.config.aws_secret_access_key, + ) { + format!("ACCESS_KEY_ID '{}' SECRET_ACCESS_KEY '{}'", key_id, secret) + } else { + return Err(Error::InitError( + "Either IAM role or AWS credentials must be provided".to_string(), + )); + }; + + let compression = self + .config + .compression + .as_deref() + .map(|c| format!("{} ", c.to_uppercase())) + .unwrap_or_default(); + + let copy_sql = format!( + "COPY {table} {columns} + FROM '{s3_path}' + {credentials} + {compression}FORMAT AS CSV + DELIMITER '{delimiter}' + QUOTE '{quote}' + MAXERROR {max_errors} + REGION '{region}'", + region = self.config.s3_region + ); + + let max_retries = self.config.max_retries.unwrap_or(3); + let retry_delay = self.config.retry_delay_ms.unwrap_or(1000); + + for attempt in 0..=max_retries { + match sqlx::query(©_sql).execute(pool).await { + Ok(_) => return Ok(()), + Err(e) if attempt < max_retries => { + let delay = retry_delay * 2u64.pow(attempt); + warn!( + "COPY command failed (attempt {}/{}): {}. Retrying in {}ms...", + attempt + 1, + max_retries + 1, + e, + delay + ); + tokio::time::sleep(Duration::from_millis(delay)).await; + } + Err(e) => { + error!( + "COPY command failed after {} attempts: {}", + max_retries + 1, + e + ); + let mut state = self.state.lock().await; + state.load_errors += 1; + return Err(Error::Storage(format!("COPY command failed: {e}"))); + } + } + } + + Ok(()) + } +} + +#[async_trait] +impl Sink for RedshiftSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Redshift sink connector ID: {}. 
Target: {}, S3 bucket: {}", + self.id, self.config.target_table, self.config.s3_bucket + ); + + self.config.validate()?; + self.init_s3_uploader()?; + self.connect_redshift().await?; + self.ensure_table_exists().await?; + + info!( + "Redshift sink connector ID: {} initialized successfully", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + let batch_size = self.config.batch_size.unwrap_or(10000) as usize; + + for chunk in messages.chunks(batch_size) { + if let Err(e) = self + .process_batch(topic_metadata, &messages_metadata, chunk) + .await + { + error!( + "Failed to process batch for table '{}': {}", + self.config.target_table, e + ); + return Err(e); + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result<(), Error> { + let state = self.state.lock().await; + info!( + "Closing Redshift sink connector ID: {}. Stats: {} messages processed, {} batches loaded, {} errors", + self.id, state.messages_processed, state.batches_loaded, state.load_errors + ); + + if let Some(pool) = self.pool.take() { + pool.close().await; + } + + info!("Redshift sink connector ID: {} closed", self.id); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_config() -> RedshiftSinkConfig { + RedshiftSinkConfig { + connection_string: "postgres://user:pass@localhost:5439/dev".to_string(), + target_table: "test_table".to_string(), + iam_role: Some("arn:aws:iam::123456789:role/RedshiftS3Access".to_string()), + s3_bucket: "test-bucket".to_string(), + s3_prefix: Some("staging/".to_string()), + s3_region: "us-east-1".to_string(), + s3_endpoint: None, + aws_access_key_id: None, + aws_secret_access_key: None, + batch_size: Some(1000), + flush_interval_ms: None, + csv_delimiter: Some(','), + csv_quote: Some('"'), + ignore_header: None, + max_errors: Some(10), + compression: None, + delete_staged_files: Some(true), + max_connections: Some(5), + max_retries: Some(3), + retry_delay_ms: Some(1000), + include_metadata: Some(false), + auto_create_table: Some(false), + } + } + + #[test] + fn test_config_validation_valid() { + let config = test_config(); + assert!(config.validate().is_ok()); + } + + #[test] + fn test_config_validation_missing_credentials() { + let mut config = test_config(); + config.iam_role = None; + config.aws_access_key_id = None; + config.aws_secret_access_key = None; + assert!(config.validate().is_err()); + } + + #[test] + fn test_config_validation_partial_credentials() { + let mut config = test_config(); + config.iam_role = None; + config.aws_access_key_id = Some("AKIAIOSFODNN7EXAMPLE".to_string()); + config.aws_secret_access_key = None; + assert!(config.validate().is_err()); + } + + #[test] + fn test_sink_creation() { + let config = test_config(); + let sink = RedshiftSink::new(1, config); + assert_eq!(sink.id, 1); + assert!(sink.pool.is_none()); + assert!(sink.s3_uploader.is_none()); + } +} diff --git a/core/connectors/sinks/redshift_sink/src/s3.rs b/core/connectors/sinks/redshift_sink/src/s3.rs new file mode 100644 index 0000000000..ed7aa82a6e --- /dev/null +++ b/core/connectors/sinks/redshift_sink/src/s3.rs @@ -0,0 +1,200 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy_connector_sdk::Error; +use s3::bucket::Bucket; +use s3::creds::Credentials; +use s3::region::Region; +use tracing::{error, info}; +use uuid::Uuid; + +/// S3 uploader for staging CSV files before Redshift COPY. +#[derive(Debug)] +pub struct S3Uploader { + bucket: Box, + prefix: String, +} + +impl S3Uploader { + /// Creates a new S3 uploader with the specified configuration. + pub fn new( + bucket_name: &str, + prefix: &str, + region: &str, + access_key_id: Option<&str>, + secret_access_key: Option<&str>, + endpoint: Option<&str>, + ) -> Result { + let region = match endpoint { + Some(ep) => Region::Custom { + region: region.to_string(), + endpoint: ep.to_string(), + }, + None => Region::Custom { + region: region.to_string(), + endpoint: format!("https://s3.{}.amazonaws.com", region), + }, + }; + + let credentials = match (access_key_id, secret_access_key) { + (Some(key_id), Some(secret)) => { + Credentials::new(Some(key_id), Some(secret), None, None, None).map_err(|e| { + error!("Failed to create S3 credentials: {}", e); + Error::InitError(format!("Invalid AWS credentials: {e}")) + })? + } + _ => { + // Use default credential chain (environment variables, instance profile, etc.) + Credentials::default().map_err(|e| { + error!("Failed to load default S3 credentials: {}", e); + Error::InitError(format!("Failed to load AWS credentials: {e}")) + })? + } + }; + + let mut bucket = Bucket::new(bucket_name, region, credentials).map_err(|e| { + error!("Failed to create S3 bucket client: {}", e); + Error::InitError(format!("Failed to initialize S3 bucket: {e}")) + })?; + + // Use path-style access for custom endpoints (LocalStack, MinIO, etc.) + if endpoint.is_some() { + bucket = bucket.with_path_style(); + } + + let prefix = prefix.trim_end_matches('/').to_string(); + + Ok(S3Uploader { bucket, prefix }) + } + + /// Uploads CSV data to S3 and returns the S3 key. + pub async fn upload_csv(&self, data: &[u8]) -> Result { + let file_id = Uuid::new_v4(); + let key = if self.prefix.is_empty() { + format!("{}.csv", file_id) + } else { + format!("{}/{}.csv", self.prefix, file_id) + }; + + let response = self.bucket.put_object(&key, data).await.map_err(|e| { + error!("Failed to upload to S3 key '{}': {}", key, e); + Error::Storage(format!("S3 upload failed: {e}")) + })?; + + if response.status_code() != 200 { + error!( + "S3 upload returned status {}: {}", + response.status_code(), + String::from_utf8_lossy(response.as_slice()) + ); + return Err(Error::Storage(format!( + "S3 upload failed with status {}", + response.status_code() + ))); + } + + info!( + "Uploaded {} bytes to s3://{}/{}", + data.len(), + self.bucket.name(), + key + ); + Ok(key) + } + + /// Deletes a file from S3 by key. 
+ pub async fn delete_file(&self, key: &str) -> Result<(), Error> { + let response = self.bucket.delete_object(key).await.map_err(|e| { + error!("Failed to delete S3 object '{}': {}", key, e); + Error::Storage(format!("S3 delete failed: {e}")) + })?; + + if response.status_code() != 204 && response.status_code() != 200 { + error!( + "S3 delete returned unexpected status {}: {}", + response.status_code(), + String::from_utf8_lossy(response.as_slice()) + ); + return Err(Error::Storage(format!( + "S3 delete failed with status {}", + response.status_code() + ))); + } + + info!("Deleted s3://{}/{}", self.bucket.name(), key); + Ok(()) + } + + /// Checks if the bucket is accessible by performing a HEAD request. + #[allow(dead_code)] + pub async fn check_connectivity(&self) -> Result<(), Error> { + self.bucket.head_object("/").await.map_err(|e| { + error!("S3 connectivity check failed: {}", e); + Error::Connection(format!("Cannot access S3 bucket: {e}")) + })?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_s3_uploader_creation_with_credentials() { + let result = S3Uploader::new( + "test-bucket", + "prefix/", + "us-east-1", + Some("AKIAIOSFODNN7EXAMPLE"), + Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"), + None, + ); + assert!(result.is_ok()); + } + + #[test] + fn test_prefix_normalization() { + let uploader = S3Uploader::new( + "test-bucket", + "staging/redshift/", + "us-east-1", + Some("AKIAIOSFODNN7EXAMPLE"), + Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"), + None, + ) + .unwrap(); + + assert_eq!(uploader.prefix, "staging/redshift"); + } + + #[test] + fn test_empty_prefix() { + let uploader = S3Uploader::new( + "test-bucket", + "", + "us-east-1", + Some("AKIAIOSFODNN7EXAMPLE"), + Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"), + None, + ) + .unwrap(); + + assert_eq!(uploader.prefix, ""); + } +} diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index e30ef758c7..0fd6a9f88e 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -62,7 +62,7 @@ strum = { workspace = true } strum_macros = { workspace = true } tempfile = { workspace = true } test-case = { workspace = true } -testcontainers-modules = { version = "0.14.0", features = ["postgres"] } +testcontainers-modules = { version = "0.14.0", features = ["postgres", "localstack"] } tokio = { workspace = true } twox-hash = { workspace = true } uuid = { workspace = true } diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index 146de4d4bf..38bfc58ebf 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -30,6 +30,7 @@ use std::collections::HashMap; mod http_config_provider; mod postgres; mod random; +mod redshift; const DEFAULT_TEST_STREAM: &str = "test_stream"; const DEFAULT_TEST_TOPIC: &str = "test_topic"; diff --git a/core/integration/tests/connectors/redshift/config.toml b/core/integration/tests/connectors/redshift/config.toml new file mode 100644 index 0000000000..b24c908294 --- /dev/null +++ b/core/integration/tests/connectors/redshift/config.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "tests/connectors/redshift/connectors_config" diff --git a/core/integration/tests/connectors/redshift/connectors_config/redshift.toml b/core/integration/tests/connectors/redshift/connectors_config/redshift.toml new file mode 100644 index 0000000000..57f3e4f98e --- /dev/null +++ b/core/integration/tests/connectors/redshift/connectors_config/redshift.toml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "redshift" +enabled = true +version = 0 +name = "Redshift sink" +path = "../../target/debug/libiggy_connector_redshift_sink" + +[[streams]] +stream = "test_stream" +topics = ["test_topic"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "test" + +[plugin_config] +connection_string = "" +target_table = "iggy_messages" +s3_bucket = "iggy-redshift-staging" +s3_region = "us-east-1" +s3_prefix = "staging/" +s3_endpoint = "" +batch_size = 100 +max_connections = 5 +auto_create_table = true +include_metadata = true +delete_staged_files = true diff --git a/core/integration/tests/connectors/redshift/mod.rs b/core/integration/tests/connectors/redshift/mod.rs new file mode 100644 index 0000000000..c318e0a424 --- /dev/null +++ b/core/integration/tests/connectors/redshift/mod.rs @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::connectors::{ConnectorsRuntime, IggySetup, setup_runtime}; +use std::collections::HashMap; +use testcontainers_modules::{ + localstack::LocalStack, + postgres, + testcontainers::{ContainerAsync, runners::AsyncRunner}, +}; + +mod redshift_sink; + +/// Holds the test containers to keep them alive during tests. +struct RedshiftTestContainers { + _postgres: ContainerAsync, + _localstack: ContainerAsync, +} + +/// Setup result containing both runtime and containers. +#[allow(dead_code)] +struct RedshiftTestSetup { + runtime: ConnectorsRuntime, + _containers: RedshiftTestContainers, +} + +async fn setup() -> RedshiftTestSetup { + // Start PostgreSQL container (simulating Redshift as they share the same wire protocol) + let postgres_container = postgres::Postgres::default() + .start() + .await + .expect("Failed to start Postgres (Redshift simulator)"); + let postgres_port = postgres_container + .get_host_port_ipv4(5432) + .await + .expect("Failed to get Postgres port"); + + // Start LocalStack for S3 + let localstack_container = LocalStack::default() + .start() + .await + .expect("Failed to start LocalStack"); + let localstack_port = localstack_container + .get_host_port_ipv4(4566) + .await + .expect("Failed to get LocalStack port"); + + // Create S3 bucket using LocalStack S3 API + let s3_endpoint = format!("http://localhost:{localstack_port}"); + let bucket_name = "iggy-redshift-staging"; + + // Create the bucket via LocalStack S3 API using path-style URL + let client = reqwest::Client::new(); + let create_bucket_url = format!("{s3_endpoint}/{bucket_name}"); + client + .put(&create_bucket_url) + .send() + .await + .expect("Failed to create S3 bucket in LocalStack"); + + let mut envs = HashMap::new(); + let iggy_setup = IggySetup::default(); + + // Redshift connection (using PostgreSQL as simulator) + let connection_string = format!("postgres://postgres:postgres@localhost:{postgres_port}"); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_CONNECTION_STRING".to_owned(), + connection_string, + ); + + // S3 configuration for staging + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_S3_BUCKET".to_owned(), + bucket_name.to_owned(), + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_S3_REGION".to_owned(), + "us-east-1".to_owned(), + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_S3_ENDPOINT".to_owned(), + s3_endpoint, + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_AWS_ACCESS_KEY_ID".to_owned(), + "test".to_owned(), + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_AWS_SECRET_ACCESS_KEY".to_owned(), + "test".to_owned(), + ); + + // Stream configuration + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_STREAMS_0_STREAM".to_owned(), + iggy_setup.stream.to_owned(), + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_STREAMS_0_TOPICS_0".to_owned(), + iggy_setup.topic.to_owned(), + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_STREAMS_0_SCHEMA".to_owned(), + "json".to_owned(), + ); + + let mut runtime = setup_runtime(); + runtime + .init("redshift/config.toml", Some(envs), iggy_setup) + .await; + + RedshiftTestSetup { + runtime, + _containers: RedshiftTestContainers { + _postgres: postgres_container, + _localstack: localstack_container, + }, + } +} diff --git a/core/integration/tests/connectors/redshift/redshift_sink.rs b/core/integration/tests/connectors/redshift/redshift_sink.rs new file mode 100644 index 0000000000..05174974a5 --- /dev/null +++ b/core/integration/tests/connectors/redshift/redshift_sink.rs @@ -0,0 +1,24 @@ +/* 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::connectors::redshift::setup; + +#[tokio::test] +async fn given_valid_configuration_redshift_sink_connector_should_start() { + let _setup = setup().await; +}