From 2a8607d4672dfa73631a30f3a309fc2ce0d5144b Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Tue, 23 Dec 2025 15:30:04 +0100 Subject: [PATCH 1/2] Remove duplicate key field from SpockGroupEntry structure The SpockGroupEntry structure contained a redundant key field that duplicated the key already embedded in its progress member (SpockGroupEntry.key vs SpockGroupEntry.progress.key). This redundancy violated the DRY principle and created potential for inconsistency if the two keys ever diverged. This commit removes the duplicate key field from SpockGroupEntry, relying solely on the key embedded in SpockApplyProgress. The hash table already uses SpockProgressKey (the first field of SpockApplyProgress) as the hash key, making the separate SpockGroupEntry.key field unnecessary. Changes: 1. Structural cleanup: - Removed SpockGroupEntry.key field (the duplicate) - Kept SpockApplyProgress.key as the single source of truth - Renamed SpockGroupKey to SpockProgressKey for clarity 2. Code quality improvements: - Created init_progress_fields() helper for safe initialization - Replaced fragile pointer arithmetic with explicit field initialization - Follows PostgreSQL conventions (similar to CommitTsShmemInit) - Added comprehensive documentation explaining hash table design 3. Documentation and comments: - Added header explaining Group Registry architecture - Clarified that SpockApplyProgress.key MUST be first field - Documented persistence strategy (WAL + file snapshot) - Improved TODO comments to be actionable This is primarily a refactoring that improves code quality and reduces redundancy, but it is not purely cosmetic: spock_group_progress_update() now acquires apply_group_master_lock before touching the hash table, returns whether the entry already existed (previously it always returned true on success), and asserts on a NULL argument instead of returning false; spock_apply_progress_add_to_wal() now calls XLogFlush() on the inserted record. 
--- include/spock_group.h | 66 +++++++----- include/spock_rmgr.h | 1 - src/spock_apply.c | 39 ++----- src/spock_functions.c | 41 +++---- src/spock_group.c | 201 ++++++++++++++++++++++++++--------- src/spock_rmgr.c | 54 +++++----- src/spock_sync.c | 14 +-- utils/pgindent/typedefs.list | 8 +- 8 files changed, 257 insertions(+), 167 deletions(-) diff --git a/include/spock_group.h b/include/spock_group.h index 86a55be1..57633760 100644 --- a/include/spock_group.h +++ b/include/spock_group.h @@ -56,12 +56,12 @@ typedef struct DumpCtx /* Hash Key */ -typedef struct SpockGroupKey +typedef struct SpockProgressKey { Oid dbid; Oid node_id; Oid remote_node_id; -} SpockGroupKey; +} SpockProgressKey; /* * Columns for the UI routine get_apply_group_progress. @@ -84,58 +84,66 @@ typedef enum } GroupProgressTupDescColumns; /* - * Logical Replication Progress has made by a group of apply workers. + * Logical Replication Progress made by a group of apply workers. * + * IMPORTANT: The 'key' field must remain the first member of this structure + * because the hash table uses SpockProgressKey as its hash key type. The + * dynahash code accesses the key portion directly, so changing this layout + * would break the hash table operations. + * + * Field descriptions: + * key - Identifies the replication group (dbid, node_id, remote_node_id). + * MUST be first field for hash table compatibility. * remote_commit_ts - the most advanced timestamp of COMMIT commands, already - * applied by the replication group. In fact, an apply worker may finish - * the COMMIT apply if only all other commits with smaller timestamps have - * already been committed by other workers. So, this value tells us about - * the real progress. + * applied by the replication group. In fact, an apply worker may finish + * the COMMIT apply if only all other commits with smaller timestamps have + * already been committed by other workers. So, this value tells us about + * the real progress. 
* prev_remote_ts - XXX: It seems do nothing at the moment and should - * be considered to be removed. + * be considered to be removed. * remote_commit_lsn - LSN of the COMMIT corresponding to the remote_commit_ts. * remote_insert_lsn - an LSN of the most advanced WAL record written to - * the WAL on the remote side. Replication protocol attempts to update it as - * frequently as possible, but it still be a little stale. + * the WAL on the remote side. Replication protocol attempts to update it as + * frequently as possible, but it may still be a little stale. * received_lsn - an LSN of the most advanced WAL record that was received by - * the group. + * the group. * last_updated_ts - timestamp when remote COMMIT command (identified by the - * remote_commit_ts and remote_commit_lsn) was applied locally. - * Spock employs this value to calculate replication_lag. + * remote_commit_ts and remote_commit_lsn) was applied locally. + * Spock employs this value to calculate replication_lag. * updated_by_decode - obsolete value. It was needed to decide on the LR lag - * that seems not needed if we have NULL value for a timestamp column. + * that seems not needed if we have NULL value for a timestamp column. */ typedef struct SpockApplyProgress { - SpockGroupKey key; /* common elements */ - TimestampTz remote_commit_ts; /* committed remote txn ts */ + SpockProgressKey key; /* MUST be first field */ + + TimestampTz remote_commit_ts; /* committed remote txn ts */ /* * Bit of duplication of remote_commit_ts. Serves the same purpose, except * keep the last updated value */ - TimestampTz prev_remote_ts; - XLogRecPtr remote_commit_lsn; /* LSN of remote commit on origin */ - XLogRecPtr remote_insert_lsn; /* origin insert/end LSN reported */ + TimestampTz prev_remote_ts; + XLogRecPtr remote_commit_lsn; /* LSN of remote commit on origin */ + XLogRecPtr remote_insert_lsn; /* origin insert/end LSN reported */ /* * The largest received LSN by the group. 
It is more or equal to the * remote_commit_lsn. */ - XLogRecPtr received_lsn; + XLogRecPtr received_lsn; - TimestampTz last_updated_ts; /* when we set this */ - bool updated_by_decode; /* set by decode or apply. OBSOLETE. Used + TimestampTz last_updated_ts; /* when we set this */ + bool updated_by_decode; /* set by decode or apply. OBSOLETE. Used * in versions <=5.x.x only */ } SpockApplyProgress; /* Hash entry: one per group (stable pointer; not moved by dynahash) */ typedef struct SpockGroupEntry { - SpockGroupKey key; /* hash key */ - SpockApplyProgress progress; - pg_atomic_uint32 nattached; - ConditionVariable prev_processed_cv; + SpockApplyProgress progress; + pg_atomic_uint32 nattached; + ConditionVariable prev_processed_cv; } SpockGroupEntry; /* shmem setup */ @@ -144,8 +152,9 @@ extern void spock_group_shmem_startup(int napply_groups, bool found); SpockGroupEntry *spock_group_attach(Oid dbid, Oid node_id, Oid remote_node_id); void spock_group_detach(void); -bool spock_group_progress_update(const SpockApplyProgress *sap); -void spock_group_progress_update_ptr(SpockGroupEntry *e, const SpockApplyProgress *sap); +extern bool spock_group_progress_update(const SpockApplyProgress *sap); +extern void spock_group_progress_update_ptr(SpockGroupEntry *entry, + const SpockApplyProgress *sap); SpockApplyProgress *apply_worker_get_progress(void); SpockGroupEntry *spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id); @@ -154,7 +163,6 @@ typedef void (*SpockGroupIterCB) (const SpockGroupEntry *e, void *arg); void spock_group_foreach(SpockGroupIterCB cb, void *arg); extern void spock_group_resource_dump(void); -extern void spock_group_resource_load(void); extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags); #endif /* SPOCK_GROUP_H */ diff --git a/include/spock_rmgr.h b/include/spock_rmgr.h index 55341f79..baa11d62 100644 --- a/include/spock_rmgr.h +++ b/include/spock_rmgr.h @@ -47,6 +47,5 @@ extern void spock_rmgr_cleanup(void); /* WAL helpers 
*/ extern XLogRecPtr spock_apply_progress_add_to_wal(const SpockApplyProgress *sap); -extern void spock_group_emit_progress_wal_cb(const SpockGroupEntry *e, void *arg); #endif /* SPOCK_RMGR_H */ diff --git a/src/spock_apply.c b/src/spock_apply.c index fb8b5d08..00588e28 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -2703,36 +2703,21 @@ send_feedback(PGconn *conn, XLogRecPtr recvpos, int64 now, bool force) return true; } -/* - * Static value to track local progress. - * - * Follow the CommitTsShmemInit() code as an example how to initialize such data - * types in shared memory. Keep the order according to the SpockApplyProgress - * declaration. - * - * NOTE: we initialize empty timestamp with 0 because of multiple places where - * semantics assumes empty timestamp is 0. - */ -static SpockApplyProgress apply_progress = -{ - .remote_commit_ts = 0, - .prev_remote_ts = 0, - .remote_commit_lsn = InvalidXLogRecPtr, - .remote_insert_lsn = InvalidXLogRecPtr, - .received_lsn = InvalidXLogRecPtr, - .last_updated_ts = 0, - .updated_by_decode = false -}; - /* * Update frequently changing statistics of the apply group */ static void UpdateWorkerStats(XLogRecPtr last_received, XLogRecPtr last_inserted) { - apply_progress.received_lsn = last_received; - apply_progress.remote_insert_lsn = last_inserted; - spock_group_progress_update_ptr(MyApplyWorker->apply_group, &apply_progress); + SpockApplyProgress sap = {0}; + + sap.key.dbid = MyDatabaseId; + sap.key.node_id = MySubscription->target->id; + sap.key.remote_node_id = MySubscription->origin->id; + + sap.received_lsn = last_received; + sap.remote_insert_lsn = last_inserted; + spock_group_progress_update_ptr(MyApplyWorker->apply_group, &sap); } /* @@ -2768,12 +2753,6 @@ apply_work(PGconn *streamConn) MemoryContextSwitchTo(MessageContext); - /* Initialize our static progress variable */ - memset(&apply_progress.key, 0, sizeof(SpockGroupKey)); - apply_progress.key.dbid = MyDatabaseId; - apply_progress.key.node_id = 
MySubscription->target->id; - apply_progress.key.remote_node_id = MySubscription->origin->id; - /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); diff --git a/src/spock_functions.c b/src/spock_functions.c index 34724ab5..55332365 100644 --- a/src/spock_functions.c +++ b/src/spock_functions.c @@ -3477,35 +3477,36 @@ get_apply_group_progress(PG_FUNCTION_ARGS) hash_seq_init(&it, SpockGroupHash); while ((e = (SpockGroupEntry *) hash_seq_search(&it)) != NULL) { - Datum values[_GP_LAST_]; - bool nulls[_GP_LAST_] = {0}; + SpockApplyProgress *sap = &e->progress; + Datum values[_GP_LAST_]; + bool nulls[_GP_LAST_] = {0}; /* * Centralise conversion of local representation of the progress data * to an external representation. This is a good place to check * correctness of each value (valid oid and timestamps). */ - Assert(OidIsValid(e->key.dbid) && OidIsValid(e->key.node_id) && - OidIsValid(e->key.remote_node_id)); + Assert(OidIsValid(sap->key.dbid) && OidIsValid(sap->key.node_id) && + OidIsValid(sap->key.remote_node_id)); - values[GP_DBOID] = ObjectIdGetDatum(e->progress.key.dbid); - values[GP_NODE_ID] = ObjectIdGetDatum(e->progress.key.node_id); - values[GP_REMOTE_NODE_ID] = ObjectIdGetDatum(e->progress.key.remote_node_id); + values[GP_DBOID] = ObjectIdGetDatum(sap->key.dbid); + values[GP_NODE_ID] = ObjectIdGetDatum(sap->key.node_id); + values[GP_REMOTE_NODE_ID] = ObjectIdGetDatum(sap->key.remote_node_id); - if (e->progress.remote_commit_ts != 0) + if (sap->remote_commit_ts != 0) { - Assert(IS_VALID_TIMESTAMP(e->progress.remote_commit_ts)); + Assert(IS_VALID_TIMESTAMP(sap->remote_commit_ts)); values[GP_REMOTE_COMMIT_TS] = - TimestampTzGetDatum(e->progress.remote_commit_ts); + TimestampTzGetDatum(sap->remote_commit_ts); } else nulls[GP_REMOTE_COMMIT_TS] = true; - if (e->progress.prev_remote_ts != 0) + if (sap->prev_remote_ts != 0) { - Assert(IS_VALID_TIMESTAMP(e->progress.prev_remote_ts)); + Assert(IS_VALID_TIMESTAMP(sap->prev_remote_ts)); 
values[GP_PREV_REMOTE_TS] = - TimestampTzGetDatum(e->progress.prev_remote_ts); + TimestampTzGetDatum(sap->prev_remote_ts); } else nulls[GP_PREV_REMOTE_TS] = true; @@ -3515,20 +3516,20 @@ get_apply_group_progress(PG_FUNCTION_ARGS) * and the current LSN. Moreover, LSN=0 physically makes sense. So, * don't introduce NULL value for these LSN fields. */ - values[GP_REMOTE_COMMIT_LSN] = LSNGetDatum(e->progress.remote_commit_lsn); - values[GP_REMOTE_INSERT_LSN] = LSNGetDatum(e->progress.remote_insert_lsn); - values[GP_RECEIVED_LSN] = LSNGetDatum(e->progress.received_lsn); + values[GP_REMOTE_COMMIT_LSN] = LSNGetDatum(sap->remote_commit_lsn); + values[GP_REMOTE_INSERT_LSN] = LSNGetDatum(sap->remote_insert_lsn); + values[GP_RECEIVED_LSN] = LSNGetDatum(sap->received_lsn); - if (e->progress.last_updated_ts != 0) + if (sap->last_updated_ts != 0) { - Assert(IS_VALID_TIMESTAMP(e->progress.last_updated_ts)); + Assert(IS_VALID_TIMESTAMP(sap->last_updated_ts)); values[GP_LAST_UPDATED_TS] = - TimestampTzGetDatum(e->progress.last_updated_ts); + TimestampTzGetDatum(sap->last_updated_ts); } else nulls[GP_LAST_UPDATED_TS] = true; - values[GP_UPDATED_BY_DECODE] = BoolGetDatum(e->progress.updated_by_decode); + values[GP_UPDATED_BY_DECODE] = BoolGetDatum(sap->updated_by_decode); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); } diff --git a/src/spock_group.c b/src/spock_group.c index 41ae2fc0..347a7f67 100644 --- a/src/spock_group.c +++ b/src/spock_group.c @@ -63,6 +63,40 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; HTAB *SpockGroupHash = NULL; +static void spock_group_resource_load(void); + +/* + * Initialize a SpockApplyProgress structure. + * + * The key must already be set (it's the hash key). This function zeros out + * all other fields following PostgreSQL conventions for shared memory + * initialization (see CommitTsShmemInit for similar pattern). 
+ * + * Each non-key field is assigned explicitly so that it is clear what is + * being initialized and no pointer arithmetic past the key is needed. + */ +static inline void +init_progress_fields(SpockApplyProgress *progress) +{ + /* Key should already be set by hash_search or caller */ + Assert(OidIsValid(progress->key.dbid)); + Assert(OidIsValid(progress->key.node_id)); + Assert(OidIsValid(progress->key.remote_node_id)); + + /* + * Initialize all non-key fields to zero/invalid values. + * Using 0 for timestamps follows PostgreSQL convention where + * timestamp 0 represents "not set" (see CommitTsShmemInit). + */ + progress->remote_commit_ts = 0; + progress->prev_remote_ts = 0; + progress->remote_commit_lsn = InvalidXLogRecPtr; + progress->remote_insert_lsn = InvalidXLogRecPtr; + progress->received_lsn = InvalidXLogRecPtr; + progress->last_updated_ts = 0; + progress->updated_by_decode = false; +} + /* * spock_group_shmem_request * @@ -122,7 +156,7 @@ spock_group_shmem_startup(int napply_groups, bool found) return; MemSet(&hctl, 0, sizeof(hctl)); - hctl.keysize = sizeof(SpockGroupKey); + hctl.keysize = sizeof(SpockProgressKey); hctl.entrysize = sizeof(SpockGroupEntry); hctl.hash = tag_hash; hctl.num_partitions = 16; @@ -160,10 +194,22 @@ spock_group_shmem_startup(int napply_groups, bool found) spock_group_resource_load(); } -static inline SpockGroupKey +/* + * make_key + * + * Construct a SpockProgressKey from component OIDs. + * + * TODO: Implement custom hash and comparison functions for SpockProgressKey + * to avoid undefined behavior from comparing padding bytes. Currently we + * zero the entire struct to ensure reproducible comparisons, but the C + * standard doesn't guarantee padding byte values are preserved during struct + * assignment. A proper fix would be to provide hash_func and match_func + * callbacks to hash_create() that only compare the actual OID fields. 
+ */ +static inline SpockProgressKey make_key(Oid dbid, Oid node_id, Oid remote_node_id) { - SpockGroupKey k; + SpockProgressKey k; memset(&k, 0, sizeof(k)); k.dbid = dbid; @@ -183,24 +229,27 @@ make_key(Oid dbid, Oid node_id, Oid remote_node_id) SpockGroupEntry * spock_group_attach(Oid dbid, Oid node_id, Oid remote_node_id) { - SpockGroupKey key = make_key(dbid, node_id, remote_node_id); - SpockGroupEntry *e; + SpockProgressKey key = make_key(dbid, node_id, remote_node_id); + SpockGroupEntry *entry; bool found; - e = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, HASH_ENTER, &found); + entry = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, + HASH_ENTER, &found); if (!found) { - /* initialize key values; Other entries will be updated later */ - memset(&e->progress, 0, sizeof(e->progress)); - e->progress.key = e->key; + /* + * New entry: the hash table already copied 'key' into + * entry->progress.key, now initialize the remaining progress fields. + */ + init_progress_fields(&entry->progress); - pg_atomic_init_u32(&e->nattached, 0); - ConditionVariableInit(&e->prev_processed_cv); + pg_atomic_init_u32(&entry->nattached, 0); + ConditionVariableInit(&entry->prev_processed_cv); } - pg_atomic_add_fetch_u32(&e->nattached, 1); + pg_atomic_add_fetch_u32(&entry->nattached, 1); - return e; + return entry; } /* @@ -287,45 +336,66 @@ progress_update_struct(SpockApplyProgress *dest, const SpockApplyProgress *src) * Update the progress snapshot for (dbid,node_id,remote_node_id). * Uses hash_search(HASH_ENTER) for table access, then copies 'sap' into the * entry's progress payload under the gate lock (writers EXCLUSIVE). + * + * Returns: true if the record already existed (updated), false if newly inserted. 
*/ bool spock_group_progress_update(const SpockApplyProgress *sap) { - SpockGroupKey key; - SpockGroupEntry *e; - bool found; + SpockGroupEntry *entry; + bool found; - if (!sap) - return false; + Assert(OidIsValid(sap->key.dbid) && OidIsValid(sap->key.node_id) && + OidIsValid(sap->key.remote_node_id)); + Assert(sap != NULL); if (!SpockGroupHash || !SpockCtx) { - elog(WARNING, "spock_group_progress_update: SpockGroupHash not initialized"); + /* + * This should never happen in normal operation. The shared memory + * structures are initialized during postmaster startup via + * shmem_startup_hook. If we hit this, it likely indicates: + * 1. A bug in initialization ordering, or + * 2. Corruption of shared memory pointers + * + * We return false to allow callers to continue (best-effort recovery), + * but this progress update is lost. + * + * TODO: Add a test case that deliberately calls this function before + * shared memory initialization (e.g., from a backend that loads spock + * extension late) to verify the warning fires and doesn't crash. + */ + elog(WARNING, "SpockGroupHash is not initialized; progress update skipped"); return false; } - key = make_key(sap->key.dbid, sap->key.node_id, sap->key.remote_node_id); - e = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, HASH_ENTER, &found); + /* Potential hash table change needs an exclusive lock */ + LWLockAcquire(SpockCtx->apply_group_master_lock, LW_EXCLUSIVE); + + entry = (SpockGroupEntry *) hash_search(SpockGroupHash, &sap->key, + HASH_ENTER, &found); - if (!found) /* New Entry */ + if (!found) { - /* Initialize key values; Other entries will be updated later */ - memset(&e->progress, 0, sizeof(e->progress)); - e->progress.key = e->key; + /* + * New entry: the hash table already copied sap->key into + * entry->progress.key, now initialize the remaining fields. 
+ */ + init_progress_fields(&entry->progress); - pg_atomic_init_u32(&e->nattached, 0); - ConditionVariableInit(&e->prev_processed_cv); + pg_atomic_init_u32(&entry->nattached, 0); + ConditionVariableInit(&entry->prev_processed_cv); } - LWLockAcquire(SpockCtx->apply_group_master_lock, LW_EXCLUSIVE); - progress_update_struct(&e->progress, sap); + progress_update_struct(&entry->progress, sap); LWLockRelease(SpockCtx->apply_group_master_lock); - return true; + return found; } /* Fast update when you already hold the pointer (apply hot path) */ void -spock_group_progress_update_ptr(SpockGroupEntry *e, const SpockApplyProgress *sap) +spock_group_progress_update_ptr(SpockGroupEntry *e, + const SpockApplyProgress *sap) { Assert(e && sap); LWLockAcquire(SpockCtx->apply_group_master_lock, LW_EXCLUSIVE); @@ -367,7 +437,7 @@ apply_worker_get_progress(void) SpockGroupEntry * spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id) { - SpockGroupKey key = make_key(dbid, node_id, remote_node_id); + SpockProgressKey key = make_key(dbid, node_id, remote_node_id); SpockGroupEntry *e; e = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, HASH_FIND, NULL); @@ -398,12 +468,13 @@ spock_group_foreach(SpockGroupIterCB cb, void *arg) /* emit one record */ static void -dump_one_group_cb(const SpockGroupEntry *e, void *arg) +dump_one_group_cb(const SpockGroupEntry *entry, void *arg) { - DumpCtx *ctx = (DumpCtx *) arg; + DumpCtx *ctx = (DumpCtx *) arg; /* Only the progress payload goes to disk. It already contains the key. 
*/ - write_buf(ctx->fd, &e->progress, sizeof(e->progress), SPOCK_RES_DUMPFILE "(data)"); + write_buf(ctx->fd, &entry->progress, sizeof(SpockApplyProgress), + SPOCK_RES_DUMPFILE "(data)"); ctx->count++; } @@ -419,19 +490,19 @@ dump_one_group_cb(const SpockGroupEntry *e, void *arg) void spock_group_resource_dump(void) { - char pathdir[MAXPGPATH]; - char pathtmp[MAXPGPATH]; - char pathfin[MAXPGPATH]; - int fd = -1; - - SpockResFileHeader hdr = {0}; - DumpCtx dctx = {0}; + char pathdir[MAXPGPATH]; + char pathtmp[MAXPGPATH]; + char pathfin[MAXPGPATH]; + int fd = -1; + SpockResFileHeader hdr = {0}; + DumpCtx dctx = {0}; /* * Safety check: if shared memory isn't initialized, we can't dump. This - * shouldn't happen if spock_checkpoint_hook() called spock_shmem_attach() - * first, but check anyway. + * shouldn't happen but check anyway. + * Do not tolerate it in development. */ + Assert(SpockCtx && SpockGroupHash); if (!SpockCtx || !SpockGroupHash) { elog(WARNING, "spock_group_resource_dump: shared memory not initialized, skipping dump"); @@ -502,14 +573,29 @@ spock_group_resource_dump(void) * version and system_identifier, then update each record via * spock_group_progress_update(). */ -void +static void spock_group_resource_load(void) { - char pathfin[MAXPGPATH]; - int fd; - SpockResFileHeader hdr; + char pathfin[MAXPGPATH]; + int fd; + SpockResFileHeader hdr; + + /* + * Check that we are actually inside shmem startup or recovery that + * guarantees we are alone. + */ + Assert(LWLockHeldByMe(AddinShmemInitLock)); + Assert(hash_get_num_entries(SpockGroupHash) == 0); - snprintf(pathfin, sizeof(pathfin), "%s/%s/%s", DataDir, SPOCK_RES_DIRNAME, SPOCK_RES_DUMPFILE); + snprintf(pathfin, sizeof(pathfin), "%s/%s/%s", + DataDir, SPOCK_RES_DIRNAME, SPOCK_RES_DUMPFILE); + + /* + * Locking note: We're called during shmem startup while holding + * AddinShmemInitLock, so no other process can access SpockGroupHash yet. 
+ * The spock_group_progress_update() calls below will acquire + * apply_group_master_lock according to redo's convention. + */ fd = OpenTransientFile(pathfin, O_RDONLY | PG_BINARY); if (fd < 0) @@ -547,15 +633,26 @@ spock_group_resource_load(void) /* Read each record and upsert */ for (uint32 i = 0; i < hdr.entry_count; i++) { - SpockApplyProgress sap; + SpockApplyProgress rec; + bool ret; - read_buf(fd, &sap, sizeof(sap), SPOCK_RES_DUMPFILE "(data)"); + /* XXX: Do we need any kind of CRC here? */ + read_buf(fd, &rec, sizeof(SpockApplyProgress), + SPOCK_RES_DUMPFILE "(data)"); /* * Note: if ever version is changed in SpockApplyProgress and need * compatibility, it should be translated here. For now, 1:1. */ - (void) spock_group_progress_update(&sap); + ret = spock_group_progress_update(&rec); + /* + * Should never happen in real life, but be tolerant in production as + * much as possible. + */ + Assert(!ret); + if (ret) + elog(WARNING, "restoring the replication state (dbid=%u, node_id=%u, remote_node_id=%u) spock found a duplicate", + rec.key.dbid, rec.key.node_id, rec.key.remote_node_id); } CloseTransientFile(fd); diff --git a/src/spock_rmgr.c b/src/spock_rmgr.c index 0f035af2..281d5cc5 100644 --- a/src/spock_rmgr.c +++ b/src/spock_rmgr.c @@ -84,14 +84,17 @@ spock_rmgr_redo(XLogReaderState *record) { case SPOCK_RMGR_APPLY_PROGRESS: { - SpockApplyProgress *sap; + SpockApplyProgress *rec; - sap = (SpockApplyProgress *) XLogRecGetData(record); + rec = (SpockApplyProgress *) XLogRecGetData(record); - /* LWLockAcquire(SpockCtx->lock, LW_EXCLUSIVE); */ - - spock_group_progress_update(sap); - /* LWLockRelease(SpockCtx->lock); */ + /* + * During WAL replay, we must acquire locks when accessing + * shared hash tables (per commit c6d76d7 in PostgreSQL core). + * The spock_group_progress_update() function acquires + * apply_group_master_lock internally for us. 
+ */ + (void) spock_group_progress_update(rec); } break; @@ -112,13 +115,17 @@ spock_rmgr_desc(StringInfo buf, XLogReaderState *record) { case SPOCK_RMGR_APPLY_PROGRESS: { - SpockApplyProgress *sap; - - sap = (SpockApplyProgress *) XLogRecGetData(record); - appendStringInfo(buf, "spock apply progress for db %u, node %u, remote_node %u", - sap->key.dbid, - sap->key.node_id, - sap->key.remote_node_id); + SpockApplyProgress *rec; + + rec = (SpockApplyProgress *) XLogRecGetData(record); + appendStringInfo(buf, "spock apply progress for dbid %u, node_id %u, remote_node_id %u; " + "remote_commit_lsn %X/%X, remote_insert_lsn %X/%X, received_lsn %X/%X", + rec->key.dbid, + rec->key.node_id, + rec->key.remote_node_id, + LSN_FORMAT_ARGS(rec->remote_commit_lsn), + LSN_FORMAT_ARGS(rec->remote_insert_lsn), + LSN_FORMAT_ARGS(rec->received_lsn)); } break; case SPOCK_RMGR_SUBTRANS_COMMIT_TS: @@ -180,7 +187,7 @@ spock_rmgr_cleanup(void) XLogRecPtr spock_apply_progress_add_to_wal(const SpockApplyProgress *sap) { - XLogRecPtr lsn; + XLogRecPtr lsn; Assert(sap != NULL); @@ -188,16 +195,13 @@ spock_apply_progress_add_to_wal(const SpockApplyProgress *sap) XLogRegisterData((char *) sap, sizeof(SpockApplyProgress)); lsn = XLogInsert(SPOCK_RMGR_ID, SPOCK_RMGR_APPLY_PROGRESS); - return lsn; -} + /* + * Force the WAL record to disk immediately. This ensures that progress + * is durably recorded before we update the in-memory state and continue + * processing. If we crash after updating memory but before the WAL + * flushes, we could lose progress tracking and replay would be incorrect. + */ + XLogFlush(lsn); -/* - * spock_group_emit_progress_wal_cb - * - * Foreach callback, emit a group's apply-progress to WAL. 
- */ -void -spock_group_emit_progress_wal_cb(const SpockGroupEntry *e, void *arg) -{ - spock_apply_progress_add_to_wal(&e->progress); + return lsn; } diff --git a/src/spock_sync.c b/src/spock_sync.c index e496a816..834aefd0 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -443,6 +443,11 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) char *last_updated_ts = NULL; char *updated_by_decode = PQgetvalue(originRes, rno, GP_UPDATED_BY_DECODE); + sap.key.dbid = MyDatabaseId; + sap.key.node_id = MySubscription->target->id; + sap.key.remote_node_id = atooid(remote_node_id); + Assert(OidIsValid(sap.key.remote_node_id)); + /* Check: we view only values related to a single database */ Assert(!PQgetisnull(originRes, rno, GP_DBOID)); if (dbid_str == NULL) @@ -452,9 +457,6 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) Assert(strcmp(dbid_str, PQgetvalue(originRes, rno, GP_DBOID)) == 0); } - sap.key.dbid = MyDatabaseId; - sap.key.node_id = MySubscription->target->id; - sap.key.remote_node_id = atooid(remote_node_id); if (!PQgetisnull(originRes, rno, GP_REMOTE_COMMIT_TS)) { remote_commit_ts = PQgetvalue(originRes, rno, GP_REMOTE_COMMIT_TS); @@ -490,8 +492,8 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("transaction apply time precedes its original commit time"), - errdetail("Commit time %s (node ID %d), but it was applied at %s on the replica (node ID %d)", - remote_commit_ts, sap.key.remote_node_id, + errdetail("Commit time %s (node ID %s), but it was applied at %s on the replica (node ID %d)", + remote_commit_ts, remote_node_id, last_updated_ts, MySubscription->origin->id), errhint("usually it means that the server's clocks are out of sync."))); @@ -499,7 +501,7 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) sap.updated_by_decode = updated_by_decode[0] == 't', /* Update progress */ - spock_group_progress_update(&sap); + 
spock_group_progress_update(&sap); elog(LOG, "SPOCK: adjust spock.progress %s->%d to " "remote_commit_ts='%s' " diff --git a/utils/pgindent/typedefs.list b/utils/pgindent/typedefs.list index d05b8a10..e3d4880d 100644 --- a/utils/pgindent/typedefs.list +++ b/utils/pgindent/typedefs.list @@ -4376,7 +4376,7 @@ SpockTupleData SpockSyncStatus SpockResFileHeader DumpCtx -SpockGroupKey +SpockProgressKey SpockApplyProgress SpockGroupEntry SpockExceptionLog @@ -4461,7 +4461,7 @@ SpockTupleData SpockSyncStatus SpockResFileHeader DumpCtx -SpockGroupKey +SpockProgressKey SpockApplyProgress SpockGroupEntry SpockExceptionLog @@ -4532,7 +4532,7 @@ SpockTupleData SpockSyncStatus SpockResFileHeader DumpCtx -SpockGroupKey +SpockProgressKey SpockApplyProgress SpockGroupEntry SpockExceptionLog @@ -4616,7 +4616,7 @@ SpockTupleData SpockSyncStatus SpockResFileHeader DumpCtx -SpockGroupKey +SpockProgressKey SpockApplyProgress SpockGroupEntry SpockExceptionLog From 497ea5db13332b2a20c73dc06ba20dafbe586b02 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Mon, 29 Dec 2025 16:37:29 +0100 Subject: [PATCH 2/2] Remove unnecessary spock_shmem_attach routine Being loaded via shared_preload_libraries, Spock always initializes its shared memory segment and hash tables in the postmaster process during shmem_startup_hook. The shmem_startup_hook is called exactly once per postmaster lifetime while holding AddinShmemInitLock. There is no case where the postmaster does not call shmem_startup_hook() after allocating shared memory. Also, there is no case where the postmaster performs any operations requiring Spock's shared memory before initialization. During recovery, the evidence that shared memory is already initialized and operable is the fact that redo operations can acquire locks and access the hash tables. This commit removes the redundant spock_shmem_attach() routine and its calls from checkpoint hook, WAL redo startup, and bgworker initialization. 
This simplifies the code by eliminating the attach/detach complexity. Changes: 1. Removed spock_shmem_attach() function entirely - Startup process, checkpointer, and bgworkers no longer need to "attach" - All processes can directly access shared memory after postmaster init - Removed the static 'attached' flag tracking per-process attachment 2. Simplified spock_group_shmem_startup() - Removed 'found' parameter (always creating new structures) - Removed conditional file loading (always load on initialization) - The function is only called once via shmem_startup_hook, so checks for existing structures are unnecessary 3. Simplified spock_shmem_init_internal() - Removed 'found' return value and tracking - Removed conditional initialization (if (!SpockCtx), etc.) - ShmemInit* functions are safely called once under AddinShmemInitLock - Changed return type from bool to void 4. Code quality improvements - Updated comments to reflect simplified initialization model - Improved DEBUG log message clarity - Removed outdated references to 'all_found' parameter Benefits: - Eliminates ~70 lines of unnecessary attachment logic - Makes initialization flow clearer and easier to understand - Removes per-process attachment tracking overhead - Prevents potential bugs from missed spock_shmem_attach() calls The functional behavior is unchanged; all processes still have access to Spock's shared memory structures after postmaster startup. 
--- include/spock_group.h | 2 +- include/spock_worker.h | 1 - src/spock_group.c | 37 ++------- src/spock_output_plugin.c | 2 - src/spock_rmgr.c | 9 -- src/spock_worker.c | 170 ++++++++++---------------------------- 6 files changed, 50 insertions(+), 171 deletions(-) diff --git a/include/spock_group.h b/include/spock_group.h index 57633760..f6371d02 100644 --- a/include/spock_group.h +++ b/include/spock_group.h @@ -148,7 +148,7 @@ typedef struct SpockGroupEntry /* shmem setup */ extern void spock_group_shmem_request(void); -extern void spock_group_shmem_startup(int napply_groups, bool found); +extern void spock_group_shmem_startup(int napply_groups); SpockGroupEntry *spock_group_attach(Oid dbid, Oid node_id, Oid remote_node_id); void spock_group_detach(void); diff --git a/include/spock_worker.h b/include/spock_worker.h index 1df8fe0a..fe2506ec 100644 --- a/include/spock_worker.h +++ b/include/spock_worker.h @@ -159,7 +159,6 @@ extern void handle_sigterm(SIGNAL_ARGS); extern void spock_subscription_changed(Oid subid, bool kill); extern void spock_worker_shmem_init(void); -extern void spock_shmem_attach(void); extern int spock_worker_register(SpockWorker *worker); extern void spock_worker_attach(int slot, SpockWorkerType type); diff --git a/src/spock_group.c b/src/spock_group.c index 347a7f67..0955b43c 100644 --- a/src/spock_group.c +++ b/src/spock_group.c @@ -145,16 +145,13 @@ spock_group_shmem_request(void) * Initialize shared resources for db-origin management */ void -spock_group_shmem_startup(int napply_groups, bool found) +spock_group_shmem_startup(int napply_groups) { HASHCTL hctl; if (prev_shmem_startup_hook != NULL) prev_shmem_startup_hook(); - if (SpockGroupHash) - return; - MemSet(&hctl, 0, sizeof(hctl)); hctl.keysize = sizeof(SpockProgressKey); hctl.entrysize = sizeof(SpockGroupEntry); @@ -172,26 +169,11 @@ spock_group_shmem_startup(int napply_groups, bool found) if (!SpockGroupHash) elog(ERROR, "spock_group_shmem_startup: failed to init group map"); - /* 
- * If the shared memory structures already existed (found = true), then - * we're a background process attaching to structures created by the - * postmaster. The hash was already seeded from the file during postmaster - * startup, so skip loading. - * - * If found = false, we're the postmaster doing initial setup. Load the - * file to quickly seed the hash, then WAL recovery will run afterward and - * provide authoritative updates. - * - * Note: ShmemInitHash() doesn't have a 'found' output parameter like - * ShmemInitStruct(), so we rely on the 'found' status of other Spock - * structures (SpockCtx, etc.) as a proxy since they're all created - * together. - */ - if (found) - return; - - elog(DEBUG1, "spock_group_shmem_startup: loading resource file to seed hash"); spock_group_resource_load(); + + elog(DEBUG1, + "spock_group_shmem_startup: hash initialized with %lu entries from resource file", + hash_get_num_entries(SpockGroupHash)); } /* @@ -664,15 +646,6 @@ spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags) if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY)) == 0) return; - /* - * Ensure we're attached to shared memory before accessing it. - * - * spock_shmem_attach() is idempotent - it tracks attachment per process - * and only does the actual work once, so it's safe to call on every - * qualifying checkpoint. 
- */ - spock_shmem_attach(); - /* Dump group progress to resource.dat */ spock_group_resource_dump(); } diff --git a/src/spock_output_plugin.c b/src/spock_output_plugin.c index 68e8526d..65c593f7 100644 --- a/src/spock_output_plugin.c +++ b/src/spock_output_plugin.c @@ -1403,8 +1403,6 @@ spock_output_plugin_shmem_startup(void) /* Get the shared resources */ LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - SpockCtx->slot_group_master_lock = &((GetNamedLWLockTranche("spock_slot_groups")[0]).lock); - SpockCtx->slot_ngroups = nworkers; slot_groups = ShmemInitStruct("spock_slot_groups", spock_output_plugin_shmem_size(nworkers), &found); diff --git a/src/spock_rmgr.c b/src/spock_rmgr.c index 281d5cc5..156ad7f3 100644 --- a/src/spock_rmgr.c +++ b/src/spock_rmgr.c @@ -156,15 +156,6 @@ spock_rmgr_identify(uint8 info) void spock_rmgr_startup(void) { - /* - * During WAL recovery in the startup process, we need to attach to the - * shared memory structure (SpockGroupHash) that was created by the - * postmaster's shmem_startup_hook. - * - * spock_shmem_attach() handles resetting inherited globals and properly - * re-attaching to shared memory before WAL replay begins. - */ - spock_shmem_attach(); } void diff --git a/src/spock_worker.c b/src/spock_worker.c index fc8857ac..2415c865 100644 --- a/src/spock_worker.c +++ b/src/spock_worker.c @@ -79,7 +79,7 @@ static void wait_for_worker_startup(SpockWorker *worker, BackgroundWorkerHandle *handle); static void signal_worker_xact_callback(XactEvent event, void *arg); static uint32 spock_ch_stats_hash(const void *key, Size keysize); -static bool spock_shmem_init_internal(int nworkers); +static void spock_shmem_init_internal(int nworkers); void @@ -344,15 +344,6 @@ spock_worker_on_exit(int code, Datum arg) void spock_worker_attach(int slot, SpockWorkerType type) { - /* - * Ensure shared memory is attached before using SpockCtx. 
- * - * Background workers inherit global variables from the postmaster but the - * pointers may not be valid. Always call spock_shmem_attach() which is - * idempotent and handles proper re-initialization. - */ - spock_shmem_attach(); - Assert(slot >= 0); Assert(slot < SpockCtx->total_workers); @@ -838,9 +829,18 @@ spock_worker_shmem_startup(void) * This is kludge for Windows (Postgres does not define the GUC variable * as PGDLLIMPORT) */ - nworkers = atoi(GetConfigOptionByName("max_worker_processes", NULL, - false)); + nworkers = atoi(GetConfigOptionByName("max_worker_processes", NULL, false)); + + /* + * Reset in case this is a restart within the postmaster. The Spock + * extension must be loaded on startup. After a fatal error and restart, + * the postmaster cleans up shared memory, but these globals keep their + * old values, so we must clear the stale pointers in advance. + */ SpockCtx = NULL; + SpockHash = NULL; + SpockGroupHash = NULL; + exception_log_ptr = NULL; /* Avoid possible race-conditions when initializing shared memory. */ LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); @@ -850,7 +850,7 @@ spock_worker_shmem_startup(void) * This internally calls spock_group_shmem_startup() to handle group * initialization and file loading. */ - (void) spock_shmem_init_internal(nworkers); + spock_shmem_init_internal(nworkers); LWLockRelease(AddinShmemInitLock); } @@ -878,140 +878,58 @@ spock_ch_stats_hash(const void *key, Size keysize) * This centralizes the logic for setting up all Spock shared memory pointers, * which is used by both the initial startup hook and the attach function. */ -static bool +static void spock_shmem_init_internal(int nworkers) { HASHCTL hctl; bool found; - bool all_found = true; - /* Init signaling context for the various processes.
*/ - if (!SpockCtx) + SpockCtx = ShmemInitStruct("spock_context", + worker_shmem_size(nworkers, false), &found); + if (!found) { - SpockCtx = ShmemInitStruct("spock_context", - worker_shmem_size(nworkers, false), &found); - if (!SpockCtx) - elog(ERROR, "failed to initialize spock_context"); - - if (!found) - { - /* First time - initialize the structure */ - SpockCtx->lock = &((GetNamedLWLockTranche("spock")[0]).lock); - SpockCtx->apply_group_master_lock = &((GetNamedLWLockTranche(SPOCK_GROUP_TRANCHE_NAME)[0]).lock); - SpockCtx->supervisor = NULL; - SpockCtx->subscriptions_changed = false; - SpockCtx->total_workers = nworkers; - memset(SpockCtx->workers, 0, - sizeof(SpockWorker) * SpockCtx->total_workers); - all_found = false; - } + /* First time - initialize the structure */ + SpockCtx->lock = &((GetNamedLWLockTranche("spock")[0]).lock); + SpockCtx->apply_group_master_lock = &((GetNamedLWLockTranche(SPOCK_GROUP_TRANCHE_NAME)[0]).lock); + SpockCtx->slot_group_master_lock = &((GetNamedLWLockTranche("spock_slot_groups")[0]).lock); + SpockCtx->slot_ngroups = nworkers; + SpockCtx->supervisor = NULL; + SpockCtx->subscriptions_changed = false; + SpockCtx->total_workers = nworkers; + memset(SpockCtx->workers, 0, + sizeof(SpockWorker) * SpockCtx->total_workers); } /* * Initialize exception log pointer array. */ - if (!exception_log_ptr) - { - exception_log_ptr = ShmemInitStruct("spock_exception_log_ptr", - worker_shmem_size(nworkers, false), &found); - if (!exception_log_ptr) - elog(ERROR, "failed to initialize spock_exception_log_ptr"); - - if (!found) - { - memset(exception_log_ptr, 0, sizeof(SpockExceptionLog) * nworkers); - all_found = false; - } - } + exception_log_ptr = ShmemInitStruct("spock_exception_log_ptr", + worker_shmem_size(nworkers, false), &found); - /* - * Initialize SpockHash - channel stats hash. 
- */ - if (!SpockHash) + if (!found) { - memset(&hctl, 0, sizeof(hctl)); - hctl.keysize = sizeof(spockStatsKey); - hctl.entrysize = sizeof(spockStatsEntry); - hctl.hash = spock_ch_stats_hash; - SpockHash = ShmemInitHash("spock channel stats hash", - spock_stats_max_entries, - spock_stats_max_entries, - &hctl, - HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE); - if (!SpockHash) - elog(ERROR, "failed to initialize spock channel stats hash"); + memset(exception_log_ptr, 0, sizeof(SpockExceptionLog) * nworkers); } /* - * Initialize SpockGroupHash via the spock_group module. This handles both - * creation/attachment and file loading if needed. - * - * Note: We pass 'all_found' to indicate whether this is first-time setup - * or attachment to existing structures. - */ - spock_group_shmem_startup(nworkers, all_found); - - return all_found; -} - -/* - * spock_shmem_attach - * - * Attach (or re-attach) to existing shared memory structures. - * - * This is called from processes that need to access Spock's shared memory: - * - Startup process (via spock_rmgr_startup) - once per recovery - * - Checkpointer (via spock_checkpoint_hook) - once per process lifetime - * - Background workers (via spock_worker_attach) - once per worker - * - * IMPORTANT: Auxiliary processes (startup, checkpointer) inherit global - * variable values from the postmaster, but these pointers might not be valid - * in their address space. - * - * This function uses a static flag to ensure we only attach once per process, - * avoiding redundant shared memory lookups on subsequent calls. - * - * Unlike spock_worker_shmem_startup(), this doesn't acquire AddinShmemInitLock - * or register hooks - it just looks up and attaches to existing structures. - */ -void -spock_shmem_attach(void) -{ - static bool attached = false; - int nworkers; - - /* If already attached in this process, nothing to do */ - if (attached) - return; - - /* - * Reset globals to NULL to force proper attachment. 
- * - * This is critical for auxiliary processes (startup, checkpointer) that - * inherit invalid global values from the postmaster. - */ - SpockCtx = NULL; - SpockHash = NULL; - SpockGroupHash = NULL; - exception_log_ptr = NULL; - - /* - * Get max_worker_processes to know the size of structures. + * Initialize SpockHash - channel stats hash. */ - nworkers = atoi(GetConfigOptionByName("max_worker_processes", NULL, false)); - if (nworkers <= 0) - nworkers = 9; + memset(&hctl, 0, sizeof(hctl)); + hctl.keysize = sizeof(spockStatsKey); + hctl.entrysize = sizeof(spockStatsEntry); + hctl.hash = spock_ch_stats_hash; + SpockHash = ShmemInitHash("spock channel stats hash", + spock_stats_max_entries, + spock_stats_max_entries, + &hctl, + HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE); /* - * Attach to all structures using the common initialization logic. Returns - * true if structures were found (normal case), false if they had to be - * created (shouldn't happen but harmless). + * Initialize SpockGroupHash via the spock_group module. + * This creates the hash table and loads the resource file to seed it. */ - (void) spock_shmem_init_internal(nworkers); - - /* Mark as attached to avoid redundant work on subsequent calls */ - attached = true; + spock_group_shmem_startup(nworkers); } /*