From bde92b714e0b3d1023d7aae92236e98fb5dcca81 Mon Sep 17 00:00:00 2001 From: Andrew Lee <665282+chairmanlee8@users.noreply.github.com> Date: Thu, 27 Nov 2025 22:05:06 +0700 Subject: [PATCH 1/7] plan --- PLAN.md | 123 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 PLAN.md diff --git a/PLAN.md b/PLAN.md new file mode 100644 index 000000000..cbd7d392d --- /dev/null +++ b/PLAN.md @@ -0,0 +1,123 @@ +# Plan: Add Postgres TCP Keepalives + +## Summary + +Add TCP-level keepalive configuration to `PgReplicationClient` to improve connection reliability, especially for connections through NAT gateways or load balancers that may drop idle connections. + +## Background + +The etl codebase currently uses application-level Standby Status Update messages (~10 seconds) for connection health, but lacks TCP-level keepalives. The `pg_replicate` fork added these settings: +```rust +.keepalives(true) +.keepalives_idle(Duration::from_secs(30)) +.keepalives_interval(Duration::from_secs(30)) +.keepalives_retries(3) +``` + +TCP keepalives provide network-level dead connection detection that complements the existing application-level heartbeats. + +## Files to Modify + +1. `etl-config/src/shared/connection.rs` - Add keepalive configuration struct +2. `etl/src/replication/client.rs` - Apply keepalive settings during connection + +## Implementation Steps + +### Step 1: Add Keepalive Configuration + +Create a new `TcpKeepaliveConfig` struct in `etl-config/src/shared/connection.rs`: + +```rust +/// TCP keepalive configuration for Postgres connections. +/// +/// TCP keepalives provide network-level connection health checking which can +/// detect dead connections faster than application-level heartbeats alone. +/// This is especially useful for connections through NAT gateways or load +/// balancers that may drop idle connections. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TcpKeepaliveConfig { + /// Time in seconds a connection must be idle before sending keepalive probes. + pub idle_secs: u64, + /// Time in seconds between individual keepalive probes. + pub interval_secs: u64, + /// Number of keepalive probes to send before considering the connection dead. + pub retries: u32, +} + +impl Default for TcpKeepaliveConfig { + fn default() -> Self { + Self { + idle_secs: 30, + interval_secs: 30, + retries: 3, + } + } +} +``` + +### Step 2: Add to PgConnectionConfig + +Add the keepalive field to `PgConnectionConfig` as an `Option<TcpKeepaliveConfig>`: + +```rust +pub struct PgConnectionConfig { + // ... existing fields ... + /// TCP keepalive configuration for connection health monitoring. + /// When `None`, TCP keepalives are disabled (default). + pub keepalive: Option<TcpKeepaliveConfig>, +} +``` + +Also add to `PgConnectionConfigWithoutSecrets` for consistency. + +### Step 3: Update IntoConnectOptions Implementation + +Update the `IntoConnectOptions` implementation to apply keepalive settings when present: + +```rust +impl IntoConnectOptions for PgConnectionConfig { + fn without_db(&self) -> TokioPgConnectOptions { + // ... existing setup ... + + if let Some(keepalive) = &self.keepalive { + config + .keepalives(true) + .keepalives_idle(Duration::from_secs(keepalive.idle_secs)) + .keepalives_interval(Duration::from_secs(keepalive.interval_secs)) + .keepalives_retries(keepalive.retries); + } + + // ... rest of method ... + } +} +``` + +### Step 4: Verify Connection Methods + +The `PgReplicationClient::connect_no_tls()` and `connect_tls()` methods in `etl/src/replication/client.rs` use `pg_connection_config.clone().with_db()`, so they will automatically pick up the keepalive settings from the updated `IntoConnectOptions` implementation. + +No changes needed in `client.rs` itself since keepalives are applied in the config conversion. + +## Testing + +1.
Unit test: Verify `TcpKeepaliveConfig::default()` values match expected (30s idle, 30s interval, 3 retries) +2. Integration test: Verify connections work with and without keepalives configured +3. Manual test: Confirm keepalive settings are applied via `pg_stat_activity` or network inspection + +## Rollout Considerations + +- Keepalives are disabled by default (`None`) for backwards compatibility +- To enable, add keepalive config to `PgConnectionConfig`: + ```toml + [source.keepalive] + idle_secs = 30 + interval_secs = 30 + retries = 3 + ``` +- Can use `TcpKeepaliveConfig::default()` for standard settings + +## References + +- `../pg_replicate/FORK_CHANGES.diff` - Original implementation in pg_replicate +- `TODO.md` - Task description +- `AGENTS.md` - Coding guidelines From f00cb7d96baa22a12cd4e8cd142e95267c95ffe4 Mon Sep 17 00:00:00 2001 From: Andrew Lee <665282+chairmanlee8@users.noreply.github.com> Date: Thu, 27 Nov 2025 22:16:21 +0700 Subject: [PATCH 2/7] _ --- etl-api/src/configs/source.rs | 2 + etl-api/src/k8s/http.rs | 1 + ...ts__create_replicator_config_map_json.snap | 2 +- etl-benchmarks/benches/table_copies.rs | 1 + etl-config/src/shared/connection.rs | 43 +++++++++++++++++++ etl-examples/src/main.rs | 1 + etl/src/test_utils/database.rs | 1 + 7 files changed, 50 insertions(+), 1 deletion(-) diff --git a/etl-api/src/configs/source.rs b/etl-api/src/configs/source.rs index a5d15fb35..956637083 100644 --- a/etl-api/src/configs/source.rs +++ b/etl-api/src/configs/source.rs @@ -87,6 +87,7 @@ impl StoredSourceConfig { trusted_root_certs: DEFAULT_TLS_TRUSTED_ROOT_CERTS.to_string(), enabled: DEFAULT_TLS_ENABLED, }, + keepalive: None, } } @@ -98,6 +99,7 @@ impl StoredSourceConfig { username: self.username, password: self.password.map(|p| p.into()), tls, + keepalive: None, } } } diff --git a/etl-api/src/k8s/http.rs b/etl-api/src/k8s/http.rs index d737135a7..049335f5d 100644 --- a/etl-api/src/k8s/http.rs +++ b/etl-api/src/k8s/http.rs @@ -1116,6 +1116,7 @@ mod tests 
{ trusted_root_certs: "".to_string(), enabled: false, }, + keepalive: None, }, batch: BatchConfig { max_size: 10_000, diff --git a/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap b/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap index 10a8d4417..6b75b8948 100644 --- a/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap +++ b/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap @@ -6,7 +6,7 @@ expression: config_map_json "apiVersion": "v1", "data": { "base.json": "", - "prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false}},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4}}" + "prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false},\"keepalive\":null},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4}}" }, "kind": "ConfigMap", "metadata": { diff --git a/etl-benchmarks/benches/table_copies.rs b/etl-benchmarks/benches/table_copies.rs index 7764a8a16..01503d18d 100644 --- a/etl-benchmarks/benches/table_copies.rs +++ b/etl-benchmarks/benches/table_copies.rs @@ -319,6 +319,7 @@ async fn start_pipeline(args: RunArgs) 
-> Result<(), Box> { trusted_root_certs: args.tls_certs, enabled: args.tls_enabled, }, + keepalive: None, }; let store = NotifyingStore::new(); diff --git a/etl-config/src/shared/connection.rs b/etl-config/src/shared/connection.rs index 87399233b..359bf0734 100644 --- a/etl-config/src/shared/connection.rs +++ b/etl-config/src/shared/connection.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use sqlx::postgres::{PgConnectOptions as SqlxConnectOptions, PgSslMode as SqlxSslMode}; @@ -137,6 +139,9 @@ pub struct PgConnectionConfig { pub password: Option<SecretString>, /// TLS configuration for secure connections. pub tls: TlsConfig, + /// TCP keepalive configuration for connection health monitoring. + /// When `None`, TCP keepalives are disabled. + pub keepalive: Option<TcpKeepaliveConfig>, } impl Config for PgConnectionConfig { @@ -158,6 +163,9 @@ pub struct PgConnectionConfigWithoutSecrets { pub username: String, /// TLS configuration for secure connections. pub tls: TlsConfig, + /// TCP keepalive configuration for connection health monitoring. + /// When `None`, TCP keepalives are disabled. + pub keepalive: Option<TcpKeepaliveConfig>, } impl From<PgConnectionConfig> for PgConnectionConfigWithoutSecrets { @@ -168,6 +176,7 @@ impl From<PgConnectionConfig> for PgConnectionConfigWithoutSecrets { name: value.name, username: value.username, tls: value.tls, + keepalive: value.keepalive, } } } @@ -194,6 +203,32 @@ impl TlsConfig { } } +/// TCP keepalive configuration for Postgres connections. +/// +/// TCP keepalives provide network-level connection health checking which can +/// detect dead connections faster than application-level heartbeats alone. +/// This is especially useful for connections through NAT gateways or load +/// balancers that may drop idle connections. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TcpKeepaliveConfig { + /// Time in seconds a connection must be idle before sending keepalive probes.
+ pub idle_secs: u64, + /// Time in seconds between individual keepalive probes. + pub interval_secs: u64, + /// Number of keepalive probes to send before considering the connection dead. + pub retries: u32, +} + +impl Default for TcpKeepaliveConfig { + fn default() -> Self { + Self { + idle_secs: 30, + interval_secs: 30, + retries: 3, + } + } +} + /// Trait for converting configuration to database client connection options. /// /// Provides a common interface for creating connection options for different @@ -273,6 +308,14 @@ impl IntoConnectOptions for PgConnectionConfig { config.password(password.expose_secret()); } + if let Some(keepalive) = &self.keepalive { + config + .keepalives(true) + .keepalives_idle(Duration::from_secs(keepalive.idle_secs)) + .keepalives_interval(Duration::from_secs(keepalive.interval_secs)) + .keepalives_retries(keepalive.retries); + } + config } diff --git a/etl-examples/src/main.rs b/etl-examples/src/main.rs index 37995a044..5eb0345a9 100644 --- a/etl-examples/src/main.rs +++ b/etl-examples/src/main.rs @@ -155,6 +155,7 @@ async fn main_impl() -> Result<(), Box> { trusted_root_certs: String::new(), enabled: false, // Set to true and provide certs for production }, + keepalive: None, }; // Create in-memory store for tracking table replication states and table schemas diff --git a/etl/src/test_utils/database.rs b/etl/src/test_utils/database.rs index 106bd0e2b..33eadf4c0 100644 --- a/etl/src/test_utils/database.rs +++ b/etl/src/test_utils/database.rs @@ -41,6 +41,7 @@ fn local_pg_connection_config() -> PgConnectionConfig { trusted_root_certs: String::new(), enabled: false, }, + keepalive: None, } } From 6627abbeaba536e2d061a5ab212a7ad866597929 Mon Sep 17 00:00:00 2001 From: Andrew Lee <665282+chairmanlee8@users.noreply.github.com> Date: Thu, 27 Nov 2025 22:18:27 +0700 Subject: [PATCH 3/7] _ --- etl-config/src/shared/connection.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/etl-config/src/shared/connection.rs 
b/etl-config/src/shared/connection.rs index 359bf0734..5dc852f57 100644 --- a/etl-config/src/shared/connection.rs +++ b/etl-config/src/shared/connection.rs @@ -141,6 +141,7 @@ pub struct PgConnectionConfig { pub tls: TlsConfig, /// TCP keepalive configuration for connection health monitoring. /// When `None`, TCP keepalives are disabled. + #[serde(default)] pub keepalive: Option<TcpKeepaliveConfig>, } @@ -165,6 +166,7 @@ pub struct PgConnectionConfigWithoutSecrets { pub tls: TlsConfig, /// TCP keepalive configuration for connection health monitoring. /// When `None`, TCP keepalives are disabled. + #[serde(default)] pub keepalive: Option<TcpKeepaliveConfig>, } From aaa7b672dc2c5ee04e070cdefb4bbce19c732050 Mon Sep 17 00:00:00 2001 From: Andrew Lee <126016409+andrew-lee-architect@users.noreply.github.com> Date: Mon, 1 Dec 2025 12:41:01 -0600 Subject: [PATCH 4/7] _ --- PLAN.md | 123 ---------------------------- etl-config/src/shared/connection.rs | 9 +- 2 files changed, 2 insertions(+), 130 deletions(-) delete mode 100644 PLAN.md diff --git a/PLAN.md b/PLAN.md deleted file mode 100644 index cbd7d392d..000000000 --- a/PLAN.md +++ /dev/null @@ -1,123 +0,0 @@ -# Plan: Add Postgres TCP Keepalives - -## Summary - -Add TCP-level keepalive configuration to `PgReplicationClient` to improve connection reliability, especially for connections through NAT gateways or load balancers that may drop idle connections. - -## Background - -The etl codebase currently uses application-level Standby Status Update messages (~10 seconds) for connection health, but lacks TCP-level keepalives. The `pg_replicate` fork added these settings: -```rust -.keepalives(true) -.keepalives_idle(Duration::from_secs(30)) -.keepalives_interval(Duration::from_secs(30)) -.keepalives_retries(3) -``` - -TCP keepalives provide network-level dead connection detection that complements the existing application-level heartbeats. - -## Files to Modify - -1. `etl-config/src/shared/connection.rs` - Add keepalive configuration struct -2.
`etl/src/replication/client.rs` - Apply keepalive settings during connection - -## Implementation Steps - -### Step 1: Add Keepalive Configuration - -Create a new `TcpKeepaliveConfig` struct in `etl-config/src/shared/connection.rs`: - -```rust -/// TCP keepalive configuration for Postgres connections. -/// -/// TCP keepalives provide network-level connection health checking which can -/// detect dead connections faster than application-level heartbeats alone. -/// This is especially useful for connections through NAT gateways or load -/// balancers that may drop idle connections. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TcpKeepaliveConfig { - /// Time in seconds a connection must be idle before sending keepalive probes. - pub idle_secs: u64, - /// Time in seconds between individual keepalive probes. - pub interval_secs: u64, - /// Number of keepalive probes to send before considering the connection dead. - pub retries: u32, -} - -impl Default for TcpKeepaliveConfig { - fn default() -> Self { - Self { - idle_secs: 30, - interval_secs: 30, - retries: 3, - } - } -} -``` - -### Step 2: Add to PgConnectionConfig - -Add the keepalive field to `PgConnectionConfig` as an `Option<TcpKeepaliveConfig>`: - -```rust -pub struct PgConnectionConfig { - // ... existing fields ... - /// TCP keepalive configuration for connection health monitoring. - /// When `None`, TCP keepalives are disabled (default). - pub keepalive: Option<TcpKeepaliveConfig>, -} -``` - -Also add to `PgConnectionConfigWithoutSecrets` for consistency. - -### Step 3: Update IntoConnectOptions Implementation - -Update the `IntoConnectOptions` implementation to apply keepalive settings when present: - -```rust -impl IntoConnectOptions for PgConnectionConfig { - fn without_db(&self) -> TokioPgConnectOptions { - // ... existing setup ...
- - if let Some(keepalive) = &self.keepalive { - config - .keepalives(true) - .keepalives_idle(Duration::from_secs(keepalive.idle_secs)) - .keepalives_interval(Duration::from_secs(keepalive.interval_secs)) - .keepalives_retries(keepalive.retries); - } - - // ... rest of method ... - } -} -``` - -### Step 4: Verify Connection Methods - -The `PgReplicationClient::connect_no_tls()` and `connect_tls()` methods in `etl/src/replication/client.rs` use `pg_connection_config.clone().with_db()`, so they will automatically pick up the keepalive settings from the updated `IntoConnectOptions` implementation. - -No changes needed in `client.rs` itself since keepalives are applied in the config conversion. - -## Testing - -1. Unit test: Verify `TcpKeepaliveConfig::default()` values match expected (30s idle, 30s interval, 3 retries) -2. Integration test: Verify connections work with and without keepalives configured -3. Manual test: Confirm keepalive settings are applied via `pg_stat_activity` or network inspection - -## Rollout Considerations - -- Keepalives are disabled by default (`None`) for backwards compatibility -- To enable, add keepalive config to `PgConnectionConfig`: - ```toml - [source.keepalive] - idle_secs = 30 - interval_secs = 30 - retries = 3 - ``` -- Can use `TcpKeepaliveConfig::default()` for standard settings - -## References - -- `../pg_replicate/FORK_CHANGES.diff` - Original implementation in pg_replicate -- `TODO.md` - Task description -- `AGENTS.md` - Coding guidelines diff --git a/etl-config/src/shared/connection.rs b/etl-config/src/shared/connection.rs index 5dc852f57..af9129deb 100644 --- a/etl-config/src/shared/connection.rs +++ b/etl-config/src/shared/connection.rs @@ -141,7 +141,7 @@ pub struct PgConnectionConfig { pub tls: TlsConfig, /// TCP keepalive configuration for connection health monitoring. /// When `None`, TCP keepalives are disabled. 
- #[serde(default)] + #[serde(default, skip_serializing_if = "Option::is_none")] pub keepalive: Option<TcpKeepaliveConfig>, } @@ -166,7 +166,7 @@ pub struct PgConnectionConfigWithoutSecrets { pub tls: TlsConfig, /// TCP keepalive configuration for connection health monitoring. /// When `None`, TCP keepalives are disabled. - #[serde(default)] + #[serde(default, skip_serializing_if = "Option::is_none")] pub keepalive: Option<TcpKeepaliveConfig>, } @@ -206,11 +206,6 @@ impl TlsConfig { } /// TCP keepalive configuration for Postgres connections. -/// -/// TCP keepalives provide network-level connection health checking which can -/// detect dead connections faster than application-level heartbeats alone. -/// This is especially useful for connections through NAT gateways or load -/// balancers that may drop idle connections. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TcpKeepaliveConfig { /// Time in seconds a connection must be idle before sending keepalive probes. From 0f874487811b4a9015f232eaa6211dc25743cd1f Mon Sep 17 00:00:00 2001 From: Andrew Lee <126016409+andrew-lee-architect@users.noreply.github.com> Date: Mon, 1 Dec 2025 12:46:07 -0600 Subject: [PATCH 5/7] _ --- etl-config/src/shared/connection.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/etl-config/src/shared/connection.rs b/etl-config/src/shared/connection.rs index af9129deb..b86a4381d 100644 --- a/etl-config/src/shared/connection.rs +++ b/etl-config/src/shared/connection.rs @@ -1,8 +1,7 @@ -use std::time::Duration; - use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use sqlx::postgres::{PgConnectOptions as SqlxConnectOptions, PgSslMode as SqlxSslMode}; +use std::time::Duration; use tokio_postgres::{Config as TokioPgConnectOptions, config::SslMode as TokioPgSslMode}; use crate::Config; From b9d9fe1091aff568feac9ef84ae3104f1918929d Mon Sep 17 00:00:00 2001 From: Andrew Lee <126016409+andrew-lee-architect@users.noreply.github.com> Date: Mon, 1 Dec 2025 12:46:32 -0600 Subject:
[PATCH 6/7] _ --- ...pi__k8s__http__tests__create_replicator_config_map_json.snap | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap b/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap index 6b75b8948..10a8d4417 100644 --- a/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap +++ b/etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_replicator_config_map_json.snap @@ -6,7 +6,7 @@ expression: config_map_json "apiVersion": "v1", "data": { "base.json": "", - "prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false},\"keepalive\":null},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4}}" + "prod.json": "{\"destination\":{\"big_query\":{\"project_id\":\"project-id\",\"dataset_id\":\"dataset-id\",\"max_concurrent_streams\":4}},\"pipeline\":{\"id\":42,\"publication_name\":\"all-pub\",\"pg_connection\":{\"host\":\"localhost\",\"port\":5432,\"name\":\"postgres\",\"username\":\"postgres\",\"tls\":{\"trusted_root_certs\":\"\",\"enabled\":false}},\"batch\":{\"max_size\":10000,\"max_fill_ms\":1000},\"table_error_retry_delay_ms\":500,\"table_error_retry_max_attempts\":3,\"max_table_sync_workers\":4}}" }, "kind": "ConfigMap", "metadata": { From 335b5d5271ed040dadb10241a2e8da42945e53e0 Mon Sep 17 00:00:00 2001 From: Andrew Lee <126016409+andrew-lee-architect@users.noreply.github.com> Date: Wed, 3 Dec 2025 13:52:45 -0600 Subject: [PATCH 7/7] fix doctest --- etl/src/lib.rs | 1 + 1 file changed, 1 
insertion(+) diff --git a/etl/src/lib.rs b/etl/src/lib.rs index 01a4dc249..5001eff06 100644 --- a/etl/src/lib.rs +++ b/etl/src/lib.rs @@ -65,6 +65,7 @@ //! username: "postgres".to_string(), //! password: Some("password".to_string().into()), //! tls: TlsConfig { enabled: false, trusted_root_certs: String::new() }, +//! keepalive: None //! }; //! //! // Create memory-based store and destination for testing