
Conversation

@GaneshPatil7517

Summary

Implements Issue #2540 - Redshift Sink Connector with S3 staging support.

Features

  • ✅ S3 staging with automatic CSV file upload
  • ✅ Redshift COPY command execution via PostgreSQL wire protocol
  • ✅ IAM role authentication (recommended) or access key credentials
  • ✅ Configurable batch size and compression (gzip, lzop, bzip2, zstd)
  • ✅ Automatic table creation with customizable schema
  • ✅ Retry logic with exponential backoff for transient failures
  • ✅ Automatic cleanup of staged S3 files
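
To make the staged-COPY flow listed above concrete, here is a minimal, hedged sketch of issuing a Redshift COPY over the PostgreSQL wire protocol with sqlx. The function name, S3 URI, and role ARN are illustrative placeholders, not the connector's actual code:

```rust
use sqlx::PgPool;

// Hedged sketch (not the PR's code): load one staged CSV object into Redshift
// via a COPY statement sent over the PostgreSQL wire protocol.
async fn copy_from_s3(
    pool: &PgPool,
    table: &str,
    s3_uri: &str,   // e.g. "s3://my-staging-bucket/iggy/batch-000001.csv" (placeholder)
    iam_role: &str, // e.g. "arn:aws:iam::123456789012:role/redshift-copy" (placeholder)
    region: &str,
) -> Result<(), sqlx::Error> {
    // COPY does not accept bind parameters for these clauses, so the statement
    // is assembled as a string; a GZIP/LZOP/BZIP2 clause would be appended when
    // compression is configured.
    let sql = format!(
        "COPY {table} FROM '{s3_uri}' IAM_ROLE '{iam_role}' REGION '{region}' FORMAT AS CSV"
    );
    sqlx::query(&sql).execute(pool).await?;
    Ok(())
}
```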

Configuration Options

| Option | Description | Default |
|--------|-------------|---------|
| `connection_string` | Redshift cluster connection URL | Required |
| `target_table` | Destination table name | Required |
| `iam_role` | IAM role ARN for S3 access | Optional* |
| `s3_bucket` | S3 bucket for staging | Required |
| `s3_region` | AWS region | Required |
| `batch_size` | Messages per batch | `10000` |
| `compression` | COPY compression format | `none` |
| `delete_staged_files` | Auto-cleanup toggle | `true` |
| `auto_create_table` | Create table if missing | `true` |

*Either `iam_role` or `aws_access_key_id` + `aws_secret_access_key` is required.
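
As a rough illustration of the credential rule in the footnote, a configuration struct along these lines could enforce it at validation time; the field names mirror the table above, but the exact struct shape and error type are assumptions, not the PR's code:

```rust
use serde::Deserialize;

// Hedged sketch of the sink configuration; the real struct in the PR may differ.
#[derive(Debug, Deserialize)]
pub struct RedshiftSinkConfig {
    pub connection_string: String,
    pub target_table: String,
    pub s3_bucket: String,
    pub s3_region: String,
    pub iam_role: Option<String>,
    pub aws_access_key_id: Option<String>,
    pub aws_secret_access_key: Option<String>,
    pub batch_size: Option<usize>,         // defaults to 10_000 when absent
    pub compression: Option<String>,       // "gzip" | "lzop" | "bzip2" | "none"
    pub delete_staged_files: Option<bool>, // defaults to true
    pub auto_create_table: Option<bool>,   // defaults to true
}

impl RedshiftSinkConfig {
    /// Either an IAM role or a static access-key pair must be present.
    pub fn validate_credentials(&self) -> Result<(), String> {
        let has_keys =
            self.aws_access_key_id.is_some() && self.aws_secret_access_key.is_some();
        if self.iam_role.is_some() || has_keys {
            Ok(())
        } else {
            Err("either iam_role or aws_access_key_id + aws_secret_access_key is required".into())
        }
    }
}
```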

Testing

  • 14 unit tests passing
  • Clippy clean with `-D warnings`

Closes #2540

- Fix markdown lint issues in README.md (table formatting, blank lines, code fence language)
- Fix trailing newline in Cargo.toml
- Apply TOML formatting via taplo
- Add missing dependencies to DEPENDENCIES.md (rust-s3, rxml, rxml_validation, static_assertions)
@hubcio
Contributor

hubcio commented Jan 14, 2026

Please write a true integration e2e test, similar to the Postgres one.

@GaneshPatil7517
Author

> Please write a true integration e2e test, similar to the Postgres one.

Ok, I'll update.

- Add Redshift sink integration test using PostgreSQL (Redshift-compatible) and LocalStack for S3
- Add s3_endpoint config option to support custom endpoints (LocalStack, MinIO)
- Add path-style S3 access for custom endpoints
- Add localstack feature to testcontainers-modules
- Create test configuration files for Redshift connector
- Add s3_endpoint: None to test_config() in lib.rs (fixes E0063)
- Add endpoint parameter to S3Uploader tests in s3.rs
- Fix formatting for long line in init_s3_uploader()
- Add iggy_connector_redshift_sink to DEPENDENCIES.md
- Add maybe-async, md5, minidom to DEPENDENCIES.md
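
The `s3_endpoint` and path-style changes in the commits above boil down to an addressing choice. This is a hedged, library-free sketch of that choice (the function name and URL shapes are illustrative; the PR itself does this through the rust-s3 crate):

```rust
// Hedged sketch: a custom endpoint (LocalStack/MinIO) implies path-style URLs,
// while plain AWS uses virtual-hosted-style bucket subdomains.
fn staged_object_url(
    s3_endpoint: Option<&str>, // e.g. Some("http://localhost:4566") for LocalStack
    bucket: &str,
    region: &str,
    key: &str,
) -> String {
    match s3_endpoint {
        // Path-style: endpoint/bucket/key — what LocalStack and MinIO expect.
        Some(endpoint) => format!("{endpoint}/{bucket}/{key}"),
        // Virtual-hosted-style: bucket.s3.region.amazonaws.com/key — the AWS default.
        None => format!("https://{bucket}.s3.{region}.amazonaws.com/{key}"),
    }
}
```
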
Copilot AI review requested due to automatic review settings on January 15, 2026, 06:59

Copilot AI left a comment


Pull request overview

This pull request implements a Redshift Sink Connector that loads data from Iggy streams into Amazon Redshift using S3 staging. The connector uploads batches of messages as CSV files to S3, executes Redshift COPY commands to load the data, and optionally cleans up staged files.

Changes:

  • Adds new Redshift sink connector with S3 staging support, IAM/access key authentication, and configurable batching
  • Implements retry logic with exponential backoff and automatic table creation
  • Adds integration tests using PostgreSQL as a Redshift simulator and LocalStack for S3
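
As a rough illustration of the retry-with-exponential-backoff behaviour listed above, a generic helper could look like this; the helper name, delay values, and generic shape are assumptions, not the connector's implementation:

```rust
use std::future::Future;
use std::time::Duration;

// Hedged sketch: retry an async operation with exponential backoff between attempts.
async fn with_retries<T, E, F, Fut>(max_attempts: u32, mut op: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(500);
    let mut attempt = 1;
    loop {
        match op().await {
            Ok(value) => return Ok(value),
            Err(err) if attempt < max_attempts => {
                // Back off exponentially (0.5s, 1s, 2s, ...) before the next attempt.
                let _ = err; // a real connector would log and classify the error here
                tokio::time::sleep(delay).await;
                delay *= 2;
                attempt += 1;
            }
            Err(err) => return Err(err),
        }
    }
}
```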

Reviewed changes

Copilot reviewed 13 out of 14 changed files in this pull request and generated 7 comments.

Summary per file:

| File | Description |
|------|-------------|
| `core/connectors/sinks/redshift_sink/src/lib.rs` | Main connector implementation with batch processing, CSV generation, and COPY command execution |
| `core/connectors/sinks/redshift_sink/src/config.rs` | Configuration structure with validation for credentials, compression formats, and required fields |
| `core/connectors/sinks/redshift_sink/src/s3.rs` | S3 uploader for staging CSV files with path-style and virtual-hosted-style support |
| `core/connectors/sinks/redshift_sink/Cargo.toml` | Package definition with dependencies on sqlx, rust-s3, and chrono |
| `core/connectors/sinks/redshift_sink/README.md` | Comprehensive documentation with configuration reference and examples |
| `core/integration/tests/connectors/redshift/mod.rs` | Integration test setup with PostgreSQL and LocalStack containers |
| `core/integration/tests/connectors/redshift/redshift_sink.rs` | Basic integration test case |
| `Cargo.toml` | Adds redshift_sink to workspace members |
| `DEPENDENCIES.md` | Updates dependency list with new packages |


Comment on lines 198 to 204:

```rust
if self.config.delete_staged_files.unwrap_or(true)
    && let Err(e) = s3_uploader.delete_file(&s3_key).await
{
    warn!("Failed to delete staged file {}: {}", s3_key, e);
}

copy_result?;
```

Copilot AI Jan 15, 2026


The cleanup logic has a critical flaw. If execute_copy fails, the function returns early at line 204 (copy_result?) without attempting to delete the staged S3 file. This will leave orphaned files in S3 when COPY operations fail. The cleanup should occur regardless of the COPY command result, or at minimum should be attempted even on failure to avoid accumulating staging files.

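The pattern this comment (and the follow-up fix commit later in the thread) asks for can be captured in a small self-contained helper. A hedged sketch, not the connector's code; the helper name and generic shape are assumptions:

```rust
use std::future::Future;

/// Run the COPY future, then always run the cleanup future, and only afterwards
/// propagate the COPY outcome; a cleanup failure is reported but never masks the
/// COPY error. Hedged sketch of the suggested fix, not the PR's implementation.
async fn copy_then_cleanup<T, E, C, D>(copy: C, cleanup: D) -> Result<T, E>
where
    C: Future<Output = Result<T, E>>,
    D: Future<Output = Result<(), E>>,
    E: std::fmt::Display,
{
    let copy_result = copy.await;
    if let Err(e) = cleanup.await {
        eprintln!("failed to delete staged file: {e}");
    }
    copy_result
}
```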

```rust
// Validate compression if specified
if let Some(compression) = &self.compression {
    let valid = ["gzip", "lzop", "bzip2", "none", "zstd"];
```

Copilot AI Jan 15, 2026


Redshift COPY command does not support 'zstd' compression format. According to AWS documentation, Redshift COPY only supports GZIP, LZOP, BZIP2, and ZSTD is not listed as a valid compression option. This will cause runtime errors when users specify 'zstd' compression. Remove 'zstd' from the valid compression formats list.

Suggested change:

```diff
-let valid = ["gzip", "lzop", "bzip2", "none", "zstd"];
+let valid = ["gzip", "lzop", "bzip2", "none"];
```

```markdown
| `csv_delimiter` | Char | No | `,` | CSV field delimiter |
| `csv_quote` | Char | No | `"` | CSV quote character |
| `max_errors` | Integer | No | `0` | Max errors before COPY fails |
| `compression` | String | No | `none` | Compression: `gzip`, `lzop`, `bzip2`, `zstd` |
```

Copilot AI Jan 15, 2026


The documentation lists 'zstd' as a supported compression format, but Redshift COPY does not support ZSTD compression. This is inconsistent with AWS Redshift documentation and will mislead users. Remove 'zstd' from the supported compression formats list.

Suggested change:

```diff
-| `compression` | String | No | `none` | Compression: `gzip`, `lzop`, `bzip2`, `zstd` |
+| `compression` | String | No | `none` | Compression: `gzip`, `lzop`, `bzip2` |
```

```rust
// Create the bucket via LocalStack S3 API using path-style URL
let client = reqwest::Client::new();
let create_bucket_url = format!("{s3_endpoint}/{bucket_name}");
let _ = client.put(&create_bucket_url).send().await;
```

Copilot AI Jan 15, 2026


The bucket creation result is silently ignored with let _ =. If bucket creation fails, subsequent tests will fail in confusing ways. The error should be handled explicitly with .expect() to provide clear test failure messages, similar to how other container setup failures are handled in this function.

Suggested change:

```diff
-let _ = client.put(&create_bucket_url).send().await;
+client
+    .put(&create_bucket_url)
+    .send()
+    .await
+    .expect("Failed to create S3 bucket in LocalStack");
```

```rust
};

// Escape quotes in payload
let escaped_payload = payload_str.replace(quote, &format!("{quote}{quote}"));
```

Copilot AI Jan 15, 2026


Creating a new String via format! macro for every message is inefficient. Since quote is always a single character, this can be optimized to use a string literal or a pre-allocated string. Consider using a match statement or caching the escaped quote string.

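One way to act on this suggestion, as a hedged standalone sketch (the function name and batch shape are assumptions, not the PR's code):

```rust
// Hedged sketch: build the doubled-quote string once per batch and reuse it for
// every message, instead of allocating via format! on each row.
fn escape_csv_fields(rows: &[String], quote: char) -> Vec<String> {
    // One small allocation up front, e.g. `""` for the default quote character.
    let doubled: String = [quote, quote].iter().collect();
    rows.iter()
        .map(|payload| payload.replace(quote, &doubled))
        .collect()
}
```
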
Critical fixes:
- Change Rust edition from 2024 to 2021 in Cargo.toml
- Fix S3 cleanup to happen regardless of COPY result (prevents orphaned files)

Moderate fixes:
- Remove zstd from valid compression options (not supported by Redshift)
- Update README to remove zstd from compression list
- Handle bucket creation error in integration tests with expect()
- Log JSON serialization errors instead of silent unwrap_or_default()

Performance:
- Cache escaped quote string to avoid repeated format! allocations

Windows compatibility (for local testing):
- Add #[cfg(unix)] conditionals for Unix-specific code in sender/mod.rs
@spetz
Copy link
Contributor

spetz commented Jan 15, 2026

I'd greatly appreciate some actual human interaction in this PR; otherwise, I'm closing this.

Fixes clippy warning about unused 'runtime' field in test setup struct.
The runtime field is kept for future test expansion.
