diff --git a/include/spock_group.h b/include/spock_group.h index 86a55be1..f6371d02 100644 --- a/include/spock_group.h +++ b/include/spock_group.h @@ -56,12 +56,12 @@ typedef struct DumpCtx /* Hash Key */ -typedef struct SpockGroupKey +typedef struct SpockProgressKey { Oid dbid; Oid node_id; Oid remote_node_id; -} SpockGroupKey; +} SpockProgressKey; /* * Columns for the UI routine get_apply_group_progress. @@ -84,68 +84,77 @@ typedef enum } GroupProgressTupDescColumns; /* - * Logical Replication Progress has made by a group of apply workers. + * Logical Replication Progress made by a group of apply workers. * + * IMPORTANT: The 'key' field must remain the first member of this structure + * because the hash table uses SpockProgressKey as its hash key type. The + * dynahash code accesses the key portion directly, so changing this layout + * would break the hash table operations. + * + * Field descriptions: + * key - Identifies the replication group (dbid, node_id, remote_node_id). + * MUST be first field for hash table compatibility. * remote_commit_ts - the most advanced timestamp of COMMIT commands, already - * applied by the replication group. In fact, an apply worker may finish - * the COMMIT apply if only all other commits with smaller timestamps have - * already been committed by other workers. So, this value tells us about - * the real progress. + * applied by the replication group. In fact, an apply worker may finish + * the COMMIT apply if only all other commits with smaller timestamps have + * already been committed by other workers. So, this value tells us about + * the real progress. * prev_remote_ts - XXX: It seems do nothing at the moment and should - * be considered to be removed. + * be considered to be removed. * remote_commit_lsn - LSN of the COMMIT corresponding to the remote_commit_ts. * remote_insert_lsn - an LSN of the most advanced WAL record written to - * the WAL on the remote side. 
Replication protocol attempts to update it as - * frequently as possible, but it still be a little stale. + * the WAL on the remote side. Replication protocol attempts to update it as + * frequently as possible, but it may still be a little stale. * received_lsn - an LSN of the most advanced WAL record that was received by - * the group. + * the group. * last_updated_ts - timestamp when remote COMMIT command (identified by the - * remote_commit_ts and remote_commit_lsn) was applied locally. - * Spock employs this value to calculate replication_lag. + * remote_commit_ts and remote_commit_lsn) was applied locally. + * Spock employs this value to calculate replication_lag. * updated_by_decode - obsolete value. It was needed to decide on the LR lag - * that seems not needed if we have NULL value for a timestamp column. + * that seems not needed if we have NULL value for a timestamp column. */ typedef struct SpockApplyProgress { - SpockGroupKey key; /* common elements */ - TimestampTz remote_commit_ts; /* committed remote txn ts */ + SpockProgressKey key; /* MUST be first field */ + + TimestampTz remote_commit_ts; /* committed remote txn ts */ /* * Bit of duplication of remote_commit_ts. Serves the same purpose, except * keep the last updated value */ - TimestampTz prev_remote_ts; - XLogRecPtr remote_commit_lsn; /* LSN of remote commit on origin */ - XLogRecPtr remote_insert_lsn; /* origin insert/end LSN reported */ + TimestampTz prev_remote_ts; + XLogRecPtr remote_commit_lsn; /* LSN of remote commit on origin */ + XLogRecPtr remote_insert_lsn; /* origin insert/end LSN reported */ /* * The largest received LSN by the group. It is more or equal to the * remote_commit_lsn. */ - XLogRecPtr received_lsn; + XLogRecPtr received_lsn; - TimestampTz last_updated_ts; /* when we set this */ - bool updated_by_decode; /* set by decode or apply. OBSOLETE. Used + TimestampTz last_updated_ts; /* when we set this */ + bool updated_by_decode; /* set by decode or apply. OBSOLETE. 
Used * in versions <=5.x.x only */ } SpockApplyProgress; /* Hash entry: one per group (stable pointer; not moved by dynahash) */ typedef struct SpockGroupEntry { - SpockGroupKey key; /* hash key */ - SpockApplyProgress progress; - pg_atomic_uint32 nattached; - ConditionVariable prev_processed_cv; + SpockApplyProgress progress; + pg_atomic_uint32 nattached; + ConditionVariable prev_processed_cv; } SpockGroupEntry; /* shmem setup */ extern void spock_group_shmem_request(void); -extern void spock_group_shmem_startup(int napply_groups, bool found); +extern void spock_group_shmem_startup(int napply_groups); SpockGroupEntry *spock_group_attach(Oid dbid, Oid node_id, Oid remote_node_id); void spock_group_detach(void); -bool spock_group_progress_update(const SpockApplyProgress *sap); -void spock_group_progress_update_ptr(SpockGroupEntry *e, const SpockApplyProgress *sap); +extern bool spock_group_progress_update(const SpockApplyProgress *sap); +extern void spock_group_progress_update_ptr(SpockGroupEntry *entry, + const SpockApplyProgress *sap); SpockApplyProgress *apply_worker_get_progress(void); SpockGroupEntry *spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id); @@ -154,7 +163,6 @@ typedef void (*SpockGroupIterCB) (const SpockGroupEntry *e, void *arg); void spock_group_foreach(SpockGroupIterCB cb, void *arg); extern void spock_group_resource_dump(void); -extern void spock_group_resource_load(void); extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags); #endif /* SPOCK_GROUP_H */ diff --git a/include/spock_rmgr.h b/include/spock_rmgr.h index 55341f79..baa11d62 100644 --- a/include/spock_rmgr.h +++ b/include/spock_rmgr.h @@ -47,6 +47,5 @@ extern void spock_rmgr_cleanup(void); /* WAL helpers */ extern XLogRecPtr spock_apply_progress_add_to_wal(const SpockApplyProgress *sap); -extern void spock_group_emit_progress_wal_cb(const SpockGroupEntry *e, void *arg); #endif /* SPOCK_RMGR_H */ diff --git a/include/spock_worker.h b/include/spock_worker.h 
index 1df8fe0a..fe2506ec 100644 --- a/include/spock_worker.h +++ b/include/spock_worker.h @@ -159,7 +159,6 @@ extern void handle_sigterm(SIGNAL_ARGS); extern void spock_subscription_changed(Oid subid, bool kill); extern void spock_worker_shmem_init(void); -extern void spock_shmem_attach(void); extern int spock_worker_register(SpockWorker *worker); extern void spock_worker_attach(int slot, SpockWorkerType type); diff --git a/src/spock_apply.c b/src/spock_apply.c index fb8b5d08..00588e28 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -2703,36 +2703,21 @@ send_feedback(PGconn *conn, XLogRecPtr recvpos, int64 now, bool force) return true; } -/* - * Static value to track local progress. - * - * Follow the CommitTsShmemInit() code as an example how to initialize such data - * types in shared memory. Keep the order according to the SpockApplyProgress - * declaration. - * - * NOTE: we initialize empty timestamp with 0 because of multiple places where - * semantics assumes empty timestamp is 0. 
- */ -static SpockApplyProgress apply_progress = -{ - .remote_commit_ts = 0, - .prev_remote_ts = 0, - .remote_commit_lsn = InvalidXLogRecPtr, - .remote_insert_lsn = InvalidXLogRecPtr, - .received_lsn = InvalidXLogRecPtr, - .last_updated_ts = 0, - .updated_by_decode = false -}; - /* * Update frequently changing statistics of the apply group */ static void UpdateWorkerStats(XLogRecPtr last_received, XLogRecPtr last_inserted) { - apply_progress.received_lsn = last_received; - apply_progress.remote_insert_lsn = last_inserted; - spock_group_progress_update_ptr(MyApplyWorker->apply_group, &apply_progress); + SpockApplyProgress sap = {0}; + + sap.key.dbid = MyDatabaseId; + sap.key.node_id = MySubscription->target->id; + sap.key.remote_node_id = MySubscription->origin->id; + + sap.received_lsn = last_received; + sap.remote_insert_lsn = last_inserted; + spock_group_progress_update_ptr(MyApplyWorker->apply_group, &sap); } /* @@ -2768,12 +2753,6 @@ apply_work(PGconn *streamConn) MemoryContextSwitchTo(MessageContext); - /* Initialize our static progress variable */ - memset(&apply_progress.key, 0, sizeof(SpockGroupKey)); - apply_progress.key.dbid = MyDatabaseId; - apply_progress.key.node_id = MySubscription->target->id; - apply_progress.key.remote_node_id = MySubscription->origin->id; - /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); diff --git a/src/spock_functions.c b/src/spock_functions.c index 34724ab5..55332365 100644 --- a/src/spock_functions.c +++ b/src/spock_functions.c @@ -3477,35 +3477,36 @@ get_apply_group_progress(PG_FUNCTION_ARGS) hash_seq_init(&it, SpockGroupHash); while ((e = (SpockGroupEntry *) hash_seq_search(&it)) != NULL) { - Datum values[_GP_LAST_]; - bool nulls[_GP_LAST_] = {0}; + SpockApplyProgress *sap = &e->progress; + Datum values[_GP_LAST_]; + bool nulls[_GP_LAST_] = {0}; /* * Centralise conversion of local representation of the progress data * to an external representation. 
This is a good place to check * correctness of each value (valid oid and timestamps). */ - Assert(OidIsValid(e->key.dbid) && OidIsValid(e->key.node_id) && - OidIsValid(e->key.remote_node_id)); + Assert(OidIsValid(sap->key.dbid) && OidIsValid(sap->key.node_id) && + OidIsValid(sap->key.remote_node_id)); - values[GP_DBOID] = ObjectIdGetDatum(e->progress.key.dbid); - values[GP_NODE_ID] = ObjectIdGetDatum(e->progress.key.node_id); - values[GP_REMOTE_NODE_ID] = ObjectIdGetDatum(e->progress.key.remote_node_id); + values[GP_DBOID] = ObjectIdGetDatum(sap->key.dbid); + values[GP_NODE_ID] = ObjectIdGetDatum(sap->key.node_id); + values[GP_REMOTE_NODE_ID] = ObjectIdGetDatum(sap->key.remote_node_id); - if (e->progress.remote_commit_ts != 0) + if (sap->remote_commit_ts != 0) { - Assert(IS_VALID_TIMESTAMP(e->progress.remote_commit_ts)); + Assert(IS_VALID_TIMESTAMP(sap->remote_commit_ts)); values[GP_REMOTE_COMMIT_TS] = - TimestampTzGetDatum(e->progress.remote_commit_ts); + TimestampTzGetDatum(sap->remote_commit_ts); } else nulls[GP_REMOTE_COMMIT_TS] = true; - if (e->progress.prev_remote_ts != 0) + if (sap->prev_remote_ts != 0) { - Assert(IS_VALID_TIMESTAMP(e->progress.prev_remote_ts)); + Assert(IS_VALID_TIMESTAMP(sap->prev_remote_ts)); values[GP_PREV_REMOTE_TS] = - TimestampTzGetDatum(e->progress.prev_remote_ts); + TimestampTzGetDatum(sap->prev_remote_ts); } else nulls[GP_PREV_REMOTE_TS] = true; @@ -3515,20 +3516,20 @@ get_apply_group_progress(PG_FUNCTION_ARGS) * and the current LSN. Moreover, LSN=0 physically makes sense. So, * don't introduce NULL value for these LSN fields. 
*/ - values[GP_REMOTE_COMMIT_LSN] = LSNGetDatum(e->progress.remote_commit_lsn); - values[GP_REMOTE_INSERT_LSN] = LSNGetDatum(e->progress.remote_insert_lsn); - values[GP_RECEIVED_LSN] = LSNGetDatum(e->progress.received_lsn); + values[GP_REMOTE_COMMIT_LSN] = LSNGetDatum(sap->remote_commit_lsn); + values[GP_REMOTE_INSERT_LSN] = LSNGetDatum(sap->remote_insert_lsn); + values[GP_RECEIVED_LSN] = LSNGetDatum(sap->received_lsn); - if (e->progress.last_updated_ts != 0) + if (sap->last_updated_ts != 0) { - Assert(IS_VALID_TIMESTAMP(e->progress.last_updated_ts)); + Assert(IS_VALID_TIMESTAMP(sap->last_updated_ts)); values[GP_LAST_UPDATED_TS] = - TimestampTzGetDatum(e->progress.last_updated_ts); + TimestampTzGetDatum(sap->last_updated_ts); } else nulls[GP_LAST_UPDATED_TS] = true; - values[GP_UPDATED_BY_DECODE] = BoolGetDatum(e->progress.updated_by_decode); + values[GP_UPDATED_BY_DECODE] = BoolGetDatum(sap->updated_by_decode); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); } diff --git a/src/spock_group.c b/src/spock_group.c index 41ae2fc0..0955b43c 100644 --- a/src/spock_group.c +++ b/src/spock_group.c @@ -63,6 +63,40 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; HTAB *SpockGroupHash = NULL; +static void spock_group_resource_load(void); + +/* + * Initialize a SpockApplyProgress structure. + * + * The key must already be set (it's the hash key). This function zeros out + * all other fields following PostgreSQL conventions for shared memory + * initialization (see CommitTsShmemInit for similar pattern). + * + * We assign each field explicitly to make clear what is being + * initialized and to avoid fragile pointer arithmetic. 
+ */ +static inline void +init_progress_fields(SpockApplyProgress *progress) +{ + /* Key should already be set by hash_search or caller */ + Assert(OidIsValid(progress->key.dbid)); + Assert(OidIsValid(progress->key.node_id)); + Assert(OidIsValid(progress->key.remote_node_id)); + + /* + * Initialize all non-key fields to zero/invalid values. + * Using 0 for timestamps follows PostgreSQL convention where + * timestamp 0 represents "not set" (see CommitTsShmemInit). + */ + progress->remote_commit_ts = 0; + progress->prev_remote_ts = 0; + progress->remote_commit_lsn = InvalidXLogRecPtr; + progress->remote_insert_lsn = InvalidXLogRecPtr; + progress->received_lsn = InvalidXLogRecPtr; + progress->last_updated_ts = 0; + progress->updated_by_decode = false; +} + /* * spock_group_shmem_request * @@ -111,18 +145,15 @@ spock_group_shmem_request(void) * Initialize shared resources for db-origin management */ void -spock_group_shmem_startup(int napply_groups, bool found) +spock_group_shmem_startup(int napply_groups) { HASHCTL hctl; if (prev_shmem_startup_hook != NULL) prev_shmem_startup_hook(); - if (SpockGroupHash) - return; - MemSet(&hctl, 0, sizeof(hctl)); - hctl.keysize = sizeof(SpockGroupKey); + hctl.keysize = sizeof(SpockProgressKey); hctl.entrysize = sizeof(SpockGroupEntry); hctl.hash = tag_hash; hctl.num_partitions = 16; @@ -138,32 +169,29 @@ spock_group_shmem_startup(int napply_groups, bool found) if (!SpockGroupHash) elog(ERROR, "spock_group_shmem_startup: failed to init group map"); - /* - * If the shared memory structures already existed (found = true), then - * we're a background process attaching to structures created by the - * postmaster. The hash was already seeded from the file during postmaster - * startup, so skip loading. - * - * If found = false, we're the postmaster doing initial setup. Load the - * file to quickly seed the hash, then WAL recovery will run afterward and - * provide authoritative updates. 
- * - * Note: ShmemInitHash() doesn't have a 'found' output parameter like - * ShmemInitStruct(), so we rely on the 'found' status of other Spock - * structures (SpockCtx, etc.) as a proxy since they're all created - * together. - */ - if (found) - return; - - elog(DEBUG1, "spock_group_shmem_startup: loading resource file to seed hash"); spock_group_resource_load(); + + elog(DEBUG1, + "spock_group_shmem_startup: hash initialized with %lu entries from resource file", + hash_get_num_entries(SpockGroupHash)); } -static inline SpockGroupKey +/* + * make_key + * + * Construct a SpockProgressKey from component OIDs. + * + * TODO: Implement custom hash and comparison functions for SpockProgressKey + * to avoid undefined behavior from comparing padding bytes. Currently we + * zero the entire struct to ensure reproducible comparisons, but the C + * standard doesn't guarantee padding byte values are preserved during struct + * assignment. A proper fix would be to provide hash_func and match_func + * callbacks to hash_create() that only compare the actual OID fields. 
+ */ +static inline SpockProgressKey make_key(Oid dbid, Oid node_id, Oid remote_node_id) { - SpockGroupKey k; + SpockProgressKey k; memset(&k, 0, sizeof(k)); k.dbid = dbid; @@ -183,24 +211,27 @@ make_key(Oid dbid, Oid node_id, Oid remote_node_id) SpockGroupEntry * spock_group_attach(Oid dbid, Oid node_id, Oid remote_node_id) { - SpockGroupKey key = make_key(dbid, node_id, remote_node_id); - SpockGroupEntry *e; + SpockProgressKey key = make_key(dbid, node_id, remote_node_id); + SpockGroupEntry *entry; bool found; - e = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, HASH_ENTER, &found); + entry = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, + HASH_ENTER, &found); if (!found) { - /* initialize key values; Other entries will be updated later */ - memset(&e->progress, 0, sizeof(e->progress)); - e->progress.key = e->key; + /* + * New entry: the hash table already copied 'key' into + * entry->progress.key, now initialize the remaining progress fields. + */ + init_progress_fields(&entry->progress); - pg_atomic_init_u32(&e->nattached, 0); - ConditionVariableInit(&e->prev_processed_cv); + pg_atomic_init_u32(&entry->nattached, 0); + ConditionVariableInit(&entry->prev_processed_cv); } - pg_atomic_add_fetch_u32(&e->nattached, 1); + pg_atomic_add_fetch_u32(&entry->nattached, 1); - return e; + return entry; } /* @@ -287,45 +318,66 @@ progress_update_struct(SpockApplyProgress *dest, const SpockApplyProgress *src) * Update the progress snapshot for (dbid,node_id,remote_node_id). * Uses hash_search(HASH_ENTER) for table access, then copies 'sap' into the * entry's progress payload under the gate lock (writers EXCLUSIVE). + * + * Returns: true if the record already existed (updated), false if newly inserted. 
*/ bool spock_group_progress_update(const SpockApplyProgress *sap) { - SpockGroupKey key; - SpockGroupEntry *e; - bool found; + SpockGroupEntry *entry; + bool found; - if (!sap) - return false; + Assert(OidIsValid(sap->key.dbid) && OidIsValid(sap->key.node_id) && + OidIsValid(sap->key.remote_node_id)); + Assert(sap != NULL); if (!SpockGroupHash || !SpockCtx) { - elog(WARNING, "spock_group_progress_update: SpockGroupHash not initialized"); + /* + * This should never happen in normal operation. The shared memory + * structures are initialized during postmaster startup via + * shmem_startup_hook. If we hit this, it likely indicates: + * 1. A bug in initialization ordering, or + * 2. Corruption of shared memory pointers + * + * We return false to allow callers to continue (best-effort recovery), + * but this progress update is lost. + * + * TODO: Add a test case that deliberately calls this function before + * shared memory initialization (e.g., from a backend that loads spock + * extension late) to verify the warning fires and doesn't crash. + */ + elog(WARNING, "SpockGroupHash is not initialized; progress update skipped"); return false; } - key = make_key(sap->key.dbid, sap->key.node_id, sap->key.remote_node_id); - e = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, HASH_ENTER, &found); + /* Potential hash table change needs an exclusive lock */ + LWLockAcquire(SpockCtx->apply_group_master_lock, LW_EXCLUSIVE); + + entry = (SpockGroupEntry *) hash_search(SpockGroupHash, &sap->key, + HASH_ENTER, &found); - if (!found) /* New Entry */ + if (!found) { - /* Initialize key values; Other entries will be updated later */ - memset(&e->progress, 0, sizeof(e->progress)); - e->progress.key = e->key; + /* + * New entry: the hash table already copied sap->key into + * entry->progress.key, now initialize the remaining fields. 
+ */ + init_progress_fields(&entry->progress); - pg_atomic_init_u32(&e->nattached, 0); - ConditionVariableInit(&e->prev_processed_cv); + pg_atomic_init_u32(&entry->nattached, 0); + ConditionVariableInit(&entry->prev_processed_cv); } - LWLockAcquire(SpockCtx->apply_group_master_lock, LW_EXCLUSIVE); - progress_update_struct(&e->progress, sap); + progress_update_struct(&entry->progress, sap); LWLockRelease(SpockCtx->apply_group_master_lock); - return true; + return found; } /* Fast update when you already hold the pointer (apply hot path) */ void -spock_group_progress_update_ptr(SpockGroupEntry *e, const SpockApplyProgress *sap) +spock_group_progress_update_ptr(SpockGroupEntry *e, + const SpockApplyProgress *sap) { Assert(e && sap); LWLockAcquire(SpockCtx->apply_group_master_lock, LW_EXCLUSIVE); @@ -367,7 +419,7 @@ apply_worker_get_progress(void) SpockGroupEntry * spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id) { - SpockGroupKey key = make_key(dbid, node_id, remote_node_id); + SpockProgressKey key = make_key(dbid, node_id, remote_node_id); SpockGroupEntry *e; e = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, HASH_FIND, NULL); @@ -398,12 +450,13 @@ spock_group_foreach(SpockGroupIterCB cb, void *arg) /* emit one record */ static void -dump_one_group_cb(const SpockGroupEntry *e, void *arg) +dump_one_group_cb(const SpockGroupEntry *entry, void *arg) { - DumpCtx *ctx = (DumpCtx *) arg; + DumpCtx *ctx = (DumpCtx *) arg; /* Only the progress payload goes to disk. It already contains the key. 
*/ - write_buf(ctx->fd, &e->progress, sizeof(e->progress), SPOCK_RES_DUMPFILE "(data)"); + write_buf(ctx->fd, &entry->progress, sizeof(SpockApplyProgress), + SPOCK_RES_DUMPFILE "(data)"); ctx->count++; } @@ -419,19 +472,19 @@ dump_one_group_cb(const SpockGroupEntry *e, void *arg) void spock_group_resource_dump(void) { - char pathdir[MAXPGPATH]; - char pathtmp[MAXPGPATH]; - char pathfin[MAXPGPATH]; - int fd = -1; - - SpockResFileHeader hdr = {0}; - DumpCtx dctx = {0}; + char pathdir[MAXPGPATH]; + char pathtmp[MAXPGPATH]; + char pathfin[MAXPGPATH]; + int fd = -1; + SpockResFileHeader hdr = {0}; + DumpCtx dctx = {0}; /* * Safety check: if shared memory isn't initialized, we can't dump. This - * shouldn't happen if spock_checkpoint_hook() called spock_shmem_attach() - * first, but check anyway. + * shouldn't happen but check anyway. + * Do not tolerate it in development. */ + Assert(SpockCtx && SpockGroupHash); if (!SpockCtx || !SpockGroupHash) { elog(WARNING, "spock_group_resource_dump: shared memory not initialized, skipping dump"); @@ -502,14 +555,29 @@ spock_group_resource_dump(void) * version and system_identifier, then update each record via * spock_group_progress_update(). */ -void +static void spock_group_resource_load(void) { - char pathfin[MAXPGPATH]; - int fd; - SpockResFileHeader hdr; + char pathfin[MAXPGPATH]; + int fd; + SpockResFileHeader hdr; - snprintf(pathfin, sizeof(pathfin), "%s/%s/%s", DataDir, SPOCK_RES_DIRNAME, SPOCK_RES_DUMPFILE); + /* + * Check that we are actually inside shmem startup or recovery that + * guarantees we are alone. + */ + Assert(LWLockHeldByMe(AddinShmemInitLock)); + Assert(hash_get_num_entries(SpockGroupHash) == 0); + + snprintf(pathfin, sizeof(pathfin), "%s/%s/%s", + DataDir, SPOCK_RES_DIRNAME, SPOCK_RES_DUMPFILE); + + /* + * Locking note: We're called during shmem startup while holding + * AddinShmemInitLock, so no other process can access SpockGroupHash yet. 
+ * The spock_group_progress_update() calls below will acquire + * apply_group_master_lock according to redo's convention. + */ fd = OpenTransientFile(pathfin, O_RDONLY | PG_BINARY); if (fd < 0) @@ -547,15 +615,26 @@ spock_group_resource_load(void) /* Read each record and upsert */ for (uint32 i = 0; i < hdr.entry_count; i++) { - SpockApplyProgress sap; + SpockApplyProgress rec; + bool ret; - read_buf(fd, &sap, sizeof(sap), SPOCK_RES_DUMPFILE "(data)"); + /* XXX: Do we need any kind of CRC here? */ + read_buf(fd, &rec, sizeof(SpockApplyProgress), + SPOCK_RES_DUMPFILE "(data)"); /* * Note: if ever version is changed in SpockApplyProgress and need * compatibility, it should be translated here. For now, 1:1. */ - (void) spock_group_progress_update(&sap); + ret = spock_group_progress_update(&rec); + /* + * Should never happen in real life, but be tolerant in production as + * much as possible. + */ + Assert(!ret); + if (ret) + elog(WARNING, "restoring the replication state (dbid=%u, node_id=%u, remote_node_id=%u) spock found a duplicate", + rec.key.dbid, rec.key.node_id, rec.key.remote_node_id); } CloseTransientFile(fd); @@ -567,15 +646,6 @@ spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags) if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY)) == 0) return; - /* - * Ensure we're attached to shared memory before accessing it. - * - * spock_shmem_attach() is idempotent - it tracks attachment per process - * and only does the actual work once, so it's safe to call on every - * qualifying checkpoint. 
- */ - spock_shmem_attach(); - /* Dump group progress to resource.dat */ spock_group_resource_dump(); } diff --git a/src/spock_output_plugin.c b/src/spock_output_plugin.c index 68e8526d..65c593f7 100644 --- a/src/spock_output_plugin.c +++ b/src/spock_output_plugin.c @@ -1403,8 +1403,6 @@ spock_output_plugin_shmem_startup(void) /* Get the shared resources */ LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - SpockCtx->slot_group_master_lock = &((GetNamedLWLockTranche("spock_slot_groups")[0]).lock); - SpockCtx->slot_ngroups = nworkers; slot_groups = ShmemInitStruct("spock_slot_groups", spock_output_plugin_shmem_size(nworkers), &found); diff --git a/src/spock_rmgr.c b/src/spock_rmgr.c index 0f035af2..156ad7f3 100644 --- a/src/spock_rmgr.c +++ b/src/spock_rmgr.c @@ -84,14 +84,17 @@ spock_rmgr_redo(XLogReaderState *record) { case SPOCK_RMGR_APPLY_PROGRESS: { - SpockApplyProgress *sap; + SpockApplyProgress *rec; - sap = (SpockApplyProgress *) XLogRecGetData(record); + rec = (SpockApplyProgress *) XLogRecGetData(record); - /* LWLockAcquire(SpockCtx->lock, LW_EXCLUSIVE); */ - - spock_group_progress_update(sap); - /* LWLockRelease(SpockCtx->lock); */ + /* + * During WAL replay, we must acquire locks when accessing + * shared hash tables (per commit c6d76d7 in PostgreSQL core). + * The spock_group_progress_update() function acquires + * apply_group_master_lock internally for us. 
+ */ + (void) spock_group_progress_update(rec); } break; @@ -112,13 +115,17 @@ spock_rmgr_desc(StringInfo buf, XLogReaderState *record) { case SPOCK_RMGR_APPLY_PROGRESS: { - SpockApplyProgress *sap; - - sap = (SpockApplyProgress *) XLogRecGetData(record); - appendStringInfo(buf, "spock apply progress for db %u, node %u, remote_node %u", - sap->key.dbid, - sap->key.node_id, - sap->key.remote_node_id); + SpockApplyProgress *rec; + + rec = (SpockApplyProgress *) XLogRecGetData(record); + appendStringInfo(buf, "spock apply progress for dbid %u, node_id %u, remote_node_id %u; " + "remote_commit_lsn %X/%X, remote_insert_lsn %X/%X, received_lsn %X/%X", + rec->key.dbid, + rec->key.node_id, + rec->key.remote_node_id, + LSN_FORMAT_ARGS(rec->remote_commit_lsn), + LSN_FORMAT_ARGS(rec->remote_insert_lsn), + LSN_FORMAT_ARGS(rec->received_lsn)); } break; case SPOCK_RMGR_SUBTRANS_COMMIT_TS: @@ -149,15 +156,6 @@ spock_rmgr_identify(uint8 info) void spock_rmgr_startup(void) { - /* - * During WAL recovery in the startup process, we need to attach to the - * shared memory structure (SpockGroupHash) that was created by the - * postmaster's shmem_startup_hook. - * - * spock_shmem_attach() handles resetting inherited globals and properly - * re-attaching to shared memory before WAL replay begins. - */ - spock_shmem_attach(); } void @@ -180,7 +178,7 @@ spock_rmgr_cleanup(void) XLogRecPtr spock_apply_progress_add_to_wal(const SpockApplyProgress *sap) { - XLogRecPtr lsn; + XLogRecPtr lsn; Assert(sap != NULL); @@ -188,16 +186,13 @@ spock_apply_progress_add_to_wal(const SpockApplyProgress *sap) XLogRegisterData((char *) sap, sizeof(SpockApplyProgress)); lsn = XLogInsert(SPOCK_RMGR_ID, SPOCK_RMGR_APPLY_PROGRESS); - return lsn; -} + /* + * Force the WAL record to disk immediately. This ensures that progress + * is durably recorded before we update the in-memory state and continue + * processing. 
If we crash after updating memory but before the WAL + * flushes, we could lose progress tracking and replay would be incorrect. + */ + XLogFlush(lsn); -/* - * spock_group_emit_progress_wal_cb - * - * Foreach callback, emit a group's apply-progress to WAL. - */ -void -spock_group_emit_progress_wal_cb(const SpockGroupEntry *e, void *arg) -{ - spock_apply_progress_add_to_wal(&e->progress); + return lsn; } diff --git a/src/spock_sync.c b/src/spock_sync.c index e496a816..834aefd0 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -443,6 +443,11 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) char *last_updated_ts = NULL; char *updated_by_decode = PQgetvalue(originRes, rno, GP_UPDATED_BY_DECODE); + sap.key.dbid = MyDatabaseId; + sap.key.node_id = MySubscription->target->id; + sap.key.remote_node_id = atooid(remote_node_id); + Assert(OidIsValid(sap.key.remote_node_id)); + /* Check: we view only values related to a single database */ Assert(!PQgetisnull(originRes, rno, GP_DBOID)); if (dbid_str == NULL) @@ -452,9 +457,6 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) Assert(strcmp(dbid_str, PQgetvalue(originRes, rno, GP_DBOID)) == 0); } - sap.key.dbid = MyDatabaseId; - sap.key.node_id = MySubscription->target->id; - sap.key.remote_node_id = atooid(remote_node_id); if (!PQgetisnull(originRes, rno, GP_REMOTE_COMMIT_TS)) { remote_commit_ts = PQgetvalue(originRes, rno, GP_REMOTE_COMMIT_TS); @@ -490,8 +492,8 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("transaction apply time precedes its original commit time"), - errdetail("Commit time %s (node ID %d), but it was applied at %s on the replica (node ID %d)", - remote_commit_ts, sap.key.remote_node_id, + errdetail("Commit time %s (node ID %s), but it was applied at %s on the replica (node ID %d)", + remote_commit_ts, remote_node_id, last_updated_ts, MySubscription->origin->id), errhint("usually it means 
that the server's clocks are out of sync."))); @@ -499,7 +501,7 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) sap.updated_by_decode = updated_by_decode[0] == 't', /* Update progress */ - spock_group_progress_update(&sap); + spock_group_progress_update(&sap); elog(LOG, "SPOCK: adjust spock.progress %s->%d to " "remote_commit_ts='%s' " diff --git a/src/spock_worker.c b/src/spock_worker.c index fc8857ac..2415c865 100644 --- a/src/spock_worker.c +++ b/src/spock_worker.c @@ -79,7 +79,7 @@ static void wait_for_worker_startup(SpockWorker *worker, BackgroundWorkerHandle *handle); static void signal_worker_xact_callback(XactEvent event, void *arg); static uint32 spock_ch_stats_hash(const void *key, Size keysize); -static bool spock_shmem_init_internal(int nworkers); +static void spock_shmem_init_internal(int nworkers); void @@ -344,15 +344,6 @@ spock_worker_on_exit(int code, Datum arg) void spock_worker_attach(int slot, SpockWorkerType type) { - /* - * Ensure shared memory is attached before using SpockCtx. - * - * Background workers inherit global variables from the postmaster but the - * pointers may not be valid. Always call spock_shmem_attach() which is - * idempotent and handles proper re-initialization. - */ - spock_shmem_attach(); - Assert(slot >= 0); Assert(slot < SpockCtx->total_workers); @@ -838,9 +829,18 @@ spock_worker_shmem_startup(void) * This is kludge for Windows (Postgres does not define the GUC variable * as PGDLLIMPORT) */ - nworkers = atoi(GetConfigOptionByName("max_worker_processes", NULL, - false)); + nworkers = atoi(GetConfigOptionByName("max_worker_processes", NULL, false)); + + /* + * Reset in case this is a restart within the postmaster. + * The Spock extension must be loaded on startup. In case of a fatal error and + * a subsequent restart, the postmaster cleans up shared memory but local variables + * stay the same. So, we need to clear outdated pointers in advance. 
+ */ SpockCtx = NULL; + SpockHash = NULL; + SpockGroupHash = NULL; + exception_log_ptr = NULL; /* Avoid possible race-conditions when initializing shared memory. */ LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); @@ -850,7 +850,7 @@ spock_worker_shmem_startup(void) * This internally calls spock_group_shmem_startup() to handle group * initialization and file loading. */ - (void) spock_shmem_init_internal(nworkers); + spock_shmem_init_internal(nworkers); LWLockRelease(AddinShmemInitLock); } @@ -878,140 +878,58 @@ spock_ch_stats_hash(const void *key, Size keysize) * This centralizes the logic for setting up all Spock shared memory pointers, * which is used by both the initial startup hook and the attach function. */ -static bool +static void spock_shmem_init_internal(int nworkers) { HASHCTL hctl; bool found; - bool all_found = true; - /* Init signaling context for the various processes. */ - if (!SpockCtx) + SpockCtx = ShmemInitStruct("spock_context", + worker_shmem_size(nworkers, false), &found); + if (!found) { - SpockCtx = ShmemInitStruct("spock_context", - worker_shmem_size(nworkers, false), &found); - if (!SpockCtx) - elog(ERROR, "failed to initialize spock_context"); - - if (!found) - { - /* First time - initialize the structure */ - SpockCtx->lock = &((GetNamedLWLockTranche("spock")[0]).lock); - SpockCtx->apply_group_master_lock = &((GetNamedLWLockTranche(SPOCK_GROUP_TRANCHE_NAME)[0]).lock); - SpockCtx->supervisor = NULL; - SpockCtx->subscriptions_changed = false; - SpockCtx->total_workers = nworkers; - memset(SpockCtx->workers, 0, - sizeof(SpockWorker) * SpockCtx->total_workers); - all_found = false; - } + /* First time - initialize the structure */ + SpockCtx->lock = &((GetNamedLWLockTranche("spock")[0]).lock); + SpockCtx->apply_group_master_lock = &((GetNamedLWLockTranche(SPOCK_GROUP_TRANCHE_NAME)[0]).lock); + SpockCtx->slot_group_master_lock = &((GetNamedLWLockTranche("spock_slot_groups")[0]).lock); + SpockCtx->slot_ngroups = nworkers; + 
SpockCtx->supervisor = NULL; + SpockCtx->subscriptions_changed = false; + SpockCtx->total_workers = nworkers; + memset(SpockCtx->workers, 0, + sizeof(SpockWorker) * SpockCtx->total_workers); } /* * Initialize exception log pointer array. */ - if (!exception_log_ptr) - { - exception_log_ptr = ShmemInitStruct("spock_exception_log_ptr", - worker_shmem_size(nworkers, false), &found); - if (!exception_log_ptr) - elog(ERROR, "failed to initialize spock_exception_log_ptr"); - - if (!found) - { - memset(exception_log_ptr, 0, sizeof(SpockExceptionLog) * nworkers); - all_found = false; - } - } + exception_log_ptr = ShmemInitStruct("spock_exception_log_ptr", + worker_shmem_size(nworkers, false), &found); - /* - * Initialize SpockHash - channel stats hash. - */ - if (!SpockHash) + if (!found) { - memset(&hctl, 0, sizeof(hctl)); - hctl.keysize = sizeof(spockStatsKey); - hctl.entrysize = sizeof(spockStatsEntry); - hctl.hash = spock_ch_stats_hash; - SpockHash = ShmemInitHash("spock channel stats hash", - spock_stats_max_entries, - spock_stats_max_entries, - &hctl, - HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE); - if (!SpockHash) - elog(ERROR, "failed to initialize spock channel stats hash"); + memset(exception_log_ptr, 0, sizeof(SpockExceptionLog) * nworkers); } /* - * Initialize SpockGroupHash via the spock_group module. This handles both - * creation/attachment and file loading if needed. - * - * Note: We pass 'all_found' to indicate whether this is first-time setup - * or attachment to existing structures. - */ - spock_group_shmem_startup(nworkers, all_found); - - return all_found; -} - -/* - * spock_shmem_attach - * - * Attach (or re-attach) to existing shared memory structures. 
- * - * This is called from processes that need to access Spock's shared memory: - * - Startup process (via spock_rmgr_startup) - once per recovery - * - Checkpointer (via spock_checkpoint_hook) - once per process lifetime - * - Background workers (via spock_worker_attach) - once per worker - * - * IMPORTANT: Auxiliary processes (startup, checkpointer) inherit global - * variable values from the postmaster, but these pointers might not be valid - * in their address space. - * - * This function uses a static flag to ensure we only attach once per process, - * avoiding redundant shared memory lookups on subsequent calls. - * - * Unlike spock_worker_shmem_startup(), this doesn't acquire AddinShmemInitLock - * or register hooks - it just looks up and attaches to existing structures. - */ -void -spock_shmem_attach(void) -{ - static bool attached = false; - int nworkers; - - /* If already attached in this process, nothing to do */ - if (attached) - return; - - /* - * Reset globals to NULL to force proper attachment. - * - * This is critical for auxiliary processes (startup, checkpointer) that - * inherit invalid global values from the postmaster. - */ - SpockCtx = NULL; - SpockHash = NULL; - SpockGroupHash = NULL; - exception_log_ptr = NULL; - - /* - * Get max_worker_processes to know the size of structures. + * Initialize SpockHash - channel stats hash. */ - nworkers = atoi(GetConfigOptionByName("max_worker_processes", NULL, false)); - if (nworkers <= 0) - nworkers = 9; + memset(&hctl, 0, sizeof(hctl)); + hctl.keysize = sizeof(spockStatsKey); + hctl.entrysize = sizeof(spockStatsEntry); + hctl.hash = spock_ch_stats_hash; + SpockHash = ShmemInitHash("spock channel stats hash", + spock_stats_max_entries, + spock_stats_max_entries, + &hctl, + HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE); /* - * Attach to all structures using the common initialization logic. 
Returns - * true if structures were found (normal case), false if they had to be - * created (shouldn't happen but harmless). + * Initialize SpockGroupHash via the spock_group module. + * This creates the hash table and loads the resource file to seed it. */ - (void) spock_shmem_init_internal(nworkers); - - /* Mark as attached to avoid redundant work on subsequent calls */ - attached = true; + spock_group_shmem_startup(nworkers); } /* diff --git a/utils/pgindent/typedefs.list b/utils/pgindent/typedefs.list index d05b8a10..e3d4880d 100644 --- a/utils/pgindent/typedefs.list +++ b/utils/pgindent/typedefs.list @@ -4376,7 +4376,7 @@ SpockTupleData SpockSyncStatus SpockResFileHeader DumpCtx -SpockGroupKey +SpockProgressKey SpockApplyProgress SpockGroupEntry SpockExceptionLog @@ -4461,7 +4461,7 @@ SpockTupleData SpockSyncStatus SpockResFileHeader DumpCtx -SpockGroupKey +SpockProgressKey SpockApplyProgress SpockGroupEntry SpockExceptionLog @@ -4532,7 +4532,7 @@ SpockTupleData SpockSyncStatus SpockResFileHeader DumpCtx -SpockGroupKey +SpockProgressKey SpockApplyProgress SpockGroupEntry SpockExceptionLog @@ -4616,7 +4616,7 @@ SpockTupleData SpockSyncStatus SpockResFileHeader DumpCtx -SpockGroupKey +SpockProgressKey SpockApplyProgress SpockGroupEntry SpockExceptionLog