Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 13 additions & 20 deletions samples/Z0DAN/zodan.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2142,7 +2142,7 @@ CREATE OR REPLACE PROCEDURE spock.check_commit_timestamp_and_advance_slot(
) LANGUAGE plpgsql AS $$
DECLARE
rec RECORD;
commit_ts timestamp;
start_point record;
slot_name text;
dbname text;
remotesql text;
Expand All @@ -2159,16 +2159,18 @@ BEGIN
FOR rec IN SELECT * FROM temp_spock_nodes WHERE node_name != src_node_name AND node_name != new_node_name LOOP
-- Check commit timestamp for lag from "other" node to new node
BEGIN
remotesql := format('SELECT commit_timestamp FROM spock.lag_tracker WHERE origin_name = %L AND receiver_name = %L',
rec.node_name, new_node_name);
remotesql := format('SELECT commit_lsn, commit_timestamp
FROM spock.lag_tracker
WHERE origin_name = %L AND receiver_name = %L',
rec.node_name, new_node_name);
IF verb THEN
RAISE NOTICE ' Remote SQL for commit timestamp check: %', remotesql;
END IF;

SELECT * FROM dblink(new_node_dsn, remotesql) AS t(ts timestamp) INTO commit_ts;
SELECT * FROM dblink(new_node_dsn, remotesql) AS t(lsn pg_lsn, ts timestamp) INTO start_point;

IF commit_ts IS NOT NULL THEN
RAISE NOTICE ' OK: %', rpad('Found commit timestamp for ' || rec.node_name || '->' || new_node_name || ': ' || commit_ts, 120, ' ');
IF start_point.ts IS NOT NULL THEN
RAISE NOTICE ' OK: %', rpad('Found commit timestamp for ' || rec.node_name || '->' || new_node_name || ': ' || start_point.ts, 120, ' ');
ELSE
RAISE NOTICE ' - %', rpad('No commit timestamp found for ' || rec.node_name || '->' || new_node_name, 120, ' ');
CONTINUE;
Expand Down Expand Up @@ -2205,7 +2207,6 @@ BEGIN

DECLARE
current_lsn pg_lsn;
target_lsn pg_lsn;
BEGIN
SELECT * FROM dblink(rec.dsn, remotesql) AS t(lsn pg_lsn) INTO current_lsn;

Expand All @@ -2214,31 +2215,23 @@ BEGIN
CONTINUE;
END IF;

-- Get target LSN from commit timestamp
remotesql := format('SELECT spock.get_lsn_from_commit_ts(%L, %L::timestamp)', slot_name, commit_ts);
IF verb THEN
RAISE NOTICE ' Remote SQL for LSN lookup: %', remotesql;
END IF;

SELECT * FROM dblink(rec.dsn, remotesql) AS t(lsn pg_lsn) INTO target_lsn;

IF target_lsn IS NULL OR target_lsn <= current_lsn THEN
RAISE NOTICE ' - Slot % already at or beyond target LSN (current: %, target: %)', slot_name, current_lsn, target_lsn;
IF start_point.lsn IS NULL OR start_point.lsn <= current_lsn THEN
RAISE NOTICE ' - Slot % already at or beyond target LSN (current: %, target: %)', slot_name, current_lsn, start_point.lsn;
CONTINUE;
END IF;

-- Advance the slot
remotesql := format('SELECT pg_replication_slot_advance(%L, %L::pg_lsn)', slot_name, target_lsn);
remotesql := format('SELECT pg_replication_slot_advance(%L, %L::pg_lsn)', slot_name, start_point.lsn);
IF verb THEN
RAISE NOTICE ' Remote SQL for slot advancement: %', remotesql;
END IF;

PERFORM * FROM dblink(rec.dsn, remotesql) AS t(result text);
RAISE NOTICE ' OK: %', rpad('Advanced slot ' || slot_name || ' from ' || current_lsn || ' to ' || target_lsn, 120, ' ');
RAISE NOTICE ' OK: %', rpad('Advanced slot ' || slot_name || ' from ' || current_lsn || ' to ' || start_point.lsn, 120, ' ');
END;
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE ' ✗ %', rpad('Advancing slot ' || slot_name || ' to timestamp ' || commit_ts || ' (error: ' || SQLERRM || ')', 120, ' ');
RAISE NOTICE ' ✗ %', rpad('Advancing slot ' || slot_name || ' to timestamp ' || start_point.ts || ' (error: ' || SQLERRM || ')', 120, ' ');
-- Continue with other nodes even if this one fails
END;
END LOOP;
Expand Down