Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions etl-api/src/configs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl StoredSourceConfig {
trusted_root_certs: DEFAULT_TLS_TRUSTED_ROOT_CERTS.to_string(),
enabled: DEFAULT_TLS_ENABLED,
},
keepalive: None,
}
}

Expand All @@ -98,6 +99,7 @@ impl StoredSourceConfig {
username: self.username,
password: self.password.map(|p| p.into()),
tls,
keepalive: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions etl-api/src/k8s/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,7 @@ mod tests {
trusted_root_certs: "".to_string(),
enabled: false,
},
keepalive: None,
},
batch: BatchConfig {
max_size: 10_000,
Expand Down
1 change: 1 addition & 0 deletions etl-benchmarks/benches/table_copies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ async fn start_pipeline(args: RunArgs) -> Result<(), Box<dyn Error>> {
trusted_root_certs: args.tls_certs,
enabled: args.tls_enabled,
},
keepalive: None,
};

let store = NotifyingStore::new();
Expand Down
39 changes: 39 additions & 0 deletions etl-config/src/shared/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use sqlx::postgres::{PgConnectOptions as SqlxConnectOptions, PgSslMode as SqlxSslMode};
use std::time::Duration;
use tokio_postgres::{Config as TokioPgConnectOptions, config::SslMode as TokioPgSslMode};

use crate::Config;
Expand Down Expand Up @@ -137,6 +138,10 @@ pub struct PgConnectionConfig {
pub password: Option<SecretString>,
/// TLS configuration for secure connections.
pub tls: TlsConfig,
/// TCP keepalive configuration for connection health monitoring.
/// When `None`, TCP keepalives are disabled.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub keepalive: Option<TcpKeepaliveConfig>,
}

impl Config for PgConnectionConfig {
Expand All @@ -158,6 +163,10 @@ pub struct PgConnectionConfigWithoutSecrets {
pub username: String,
/// TLS configuration for secure connections.
pub tls: TlsConfig,
/// TCP keepalive configuration for connection health monitoring.
/// When `None`, TCP keepalives are disabled.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub keepalive: Option<TcpKeepaliveConfig>,
}

impl From<PgConnectionConfig> for PgConnectionConfigWithoutSecrets {
Expand All @@ -168,6 +177,7 @@ impl From<PgConnectionConfig> for PgConnectionConfigWithoutSecrets {
name: value.name,
username: value.username,
tls: value.tls,
keepalive: value.keepalive,
}
}
}
Expand All @@ -194,6 +204,27 @@ impl TlsConfig {
}
}

/// TCP keepalive configuration for Postgres connections.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TcpKeepaliveConfig {
/// Time in seconds a connection must be idle before sending keepalive probes.
pub idle_secs: u64,
/// Time in seconds between individual keepalive probes.
pub interval_secs: u64,
/// Number of keepalive probes to send before considering the connection dead.
pub retries: u32,
}

impl Default for TcpKeepaliveConfig {
fn default() -> Self {
Self {
idle_secs: 30,
interval_secs: 30,
retries: 3,
}
}
}

/// Trait for converting configuration to database client connection options.
///
/// Provides a common interface for creating connection options for different
Expand Down Expand Up @@ -273,6 +304,14 @@ impl IntoConnectOptions<TokioPgConnectOptions> for PgConnectionConfig {
config.password(password.expose_secret());
}

if let Some(keepalive) = &self.keepalive {
config
.keepalives(true)
.keepalives_idle(Duration::from_secs(keepalive.idle_secs))
.keepalives_interval(Duration::from_secs(keepalive.interval_secs))
.keepalives_retries(keepalive.retries);
}

config
}

Expand Down
1 change: 1 addition & 0 deletions etl-examples/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
trusted_root_certs: String::new(),
enabled: false, // Set to true and provide certs for production
},
keepalive: None,
};

// Create in-memory store for tracking table replication states and table schemas
Expand Down
1 change: 1 addition & 0 deletions etl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
//! username: "postgres".to_string(),
//! password: Some("password".to_string().into()),
//! tls: TlsConfig { enabled: false, trusted_root_certs: String::new() },
//! keepalive: None
//! };
//!
//! // Create memory-based store and destination for testing
Expand Down
1 change: 1 addition & 0 deletions etl/src/test_utils/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fn local_pg_connection_config() -> PgConnectionConfig {
trusted_root_certs: String::new(),
enabled: false,
},
keepalive: None,
}
}

Expand Down