diff --git a/include/spock_group.h b/include/spock_group.h index 1f58e6da..921b6670 100644 --- a/include/spock_group.h +++ b/include/spock_group.h @@ -150,19 +150,16 @@ typedef struct SpockGroupEntry extern void spock_group_shmem_request(void); extern void spock_group_shmem_startup(int napply_groups); -SpockGroupEntry *spock_group_attach(Oid dbid, Oid node_id, Oid remote_node_id); -void spock_group_detach(void); +extern SpockGroupEntry *spock_group_attach(Oid dbid, Oid node_id, + Oid remote_node_id); +extern void spock_group_detach(void); extern bool spock_group_progress_update(const SpockApplyProgress *sap); extern void spock_group_progress_update_ptr(SpockGroupEntry *entry, const SpockApplyProgress *sap); -SpockApplyProgress *apply_worker_get_progress(void); -SpockGroupEntry *spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id); - -/* Iterate all groups */ -typedef void (*SpockGroupIterCB) (const SpockGroupEntry *e, void *arg); -void spock_group_foreach(SpockGroupIterCB cb, void *arg); +extern SpockApplyProgress *apply_worker_get_progress(void); extern void spock_group_resource_dump(void); extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags); +extern void spock_group_progress_update_list(List *lst); #endif /* SPOCK_GROUP_H */ diff --git a/src/spock_group.c b/src/spock_group.c index 02fc6537..ba2a4fda 100644 --- a/src/spock_group.c +++ b/src/spock_group.c @@ -394,37 +394,34 @@ spock_group_progress_update_ptr(SpockGroupEntry *e, /* * apply_worker_get_progress * - * Return a pointer to the current apply worker's progress payload, or NULL + * Return a pointer to the snapshot of the current apply worker's progress. 
*/ SpockApplyProgress * apply_worker_get_progress(void) { + static SpockApplyProgress sap; + Assert(MyApplyWorker != NULL); Assert(MyApplyWorker->apply_group != NULL); if (MyApplyWorker && MyApplyWorker->apply_group) - return &MyApplyWorker->apply_group->progress; + { + LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED); + sap = MyApplyWorker->apply_group->progress; + LWLockRelease(SpockCtx->apply_group_master_lock); + } + else + /* + * Should never happen. In production just send the worker into + * exception behaviour without crash. + */ + elog(ERROR, "apply worker has not been fully initialised yet"); - return NULL; + return &sap; } -/* - * spock_group_lookup - * - * Snapshot-read the progress payload for the specified group. Uses HASH_FIND - * to locate the entry. - * - * Returns entry if found, NULL otherwise. - */ -SpockGroupEntry * -spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id) -{ - SpockGroupKey key = make_key(dbid, node_id, remote_node_id); - SpockGroupEntry *e; - - e = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, HASH_FIND, NULL); - return e; /* may be NULL */ -} +/* Iterate all groups */ +typedef void (*SpockGroupIterCB) (const SpockGroupEntry *e, void *arg); /* * spock_group_foreach @@ -433,7 +430,7 @@ spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id) * Caller selects any gating needed for consistency (e.g., take the gate in * SHARED before calling this if you want a coherent snapshot). 
*/ -void +static void spock_group_foreach(SpockGroupIterCB cb, void *arg) { HASH_SEQ_STATUS it; @@ -649,3 +646,31 @@ spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags) /* Dump group progress to resource.dat */ spock_group_resource_dump(); } + +void +spock_group_progress_update_list(List *lst) +{ + ListCell *lc; + + foreach (lc, lst) + { + SpockApplyProgress *sap = (SpockApplyProgress *) lfirst(lc); + + spock_apply_progress_add_to_wal(sap); + + spock_group_progress_update(sap); + + elog(LOG, "SPOCK: adjust spock.progress %d->%d to " + "remote_commit_ts='%s' " + "remote_commit_lsn=%llX remote_insert_lsn=%llX", + sap->key.remote_node_id, MySubscription->target->id, + timestamptz_to_str(sap->remote_commit_ts), + sap->remote_commit_lsn, sap->remote_insert_lsn); + } + + /* + * Free the list and each object. Be careful here because it is inside + * a memory context that is rarely reset. + */ + list_free_deep(lst); +} diff --git a/src/spock_sync.c b/src/spock_sync.c index 5437d0fa..f5c7cbfe 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -398,14 +398,15 @@ ensure_replication_origin(char *slot_name) } -static void -adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) +static List * +adjust_progress_info(PGconn *origin_conn) { const char *originQuery = "SELECT * FROM spock.progress " "WHERE node_id = %u AND remote_node_id <> %u"; StringInfoData query; PGresult *originRes; + List *resultList = NIL; /* * Select the current content of the origin's spock.progress table where @@ -427,7 +428,10 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) for (rno = 0; rno < PQntuples(originRes); rno++) { - SpockApplyProgress sap; + SpockApplyProgress *sap = + MemoryContextAlloc(CacheMemoryContext, + sizeof(SpockApplyProgress)); + MemoryContext oldctx; /* * Update the remote node's progress entry to what our sync @@ -443,10 +447,10 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) char *last_updated_ts = NULL; char *updated_by_decode 
= PQgetvalue(originRes, rno, GP_UPDATED_BY_DECODE); - sap.key.dbid = MyDatabaseId; - sap.key.node_id = MySubscription->target->id; - sap.key.remote_node_id = atooid(remote_node_id); - Assert(OidIsValid(sap.key.remote_node_id)); + sap->key.dbid = MyDatabaseId; + sap->key.node_id = MySubscription->target->id; + sap->key.remote_node_id = atooid(remote_node_id); + Assert(OidIsValid(sap->key.remote_node_id)); /* Check: we view only values related to a single database */ Assert(!PQgetisnull(originRes, rno, GP_DBOID)); @@ -460,13 +464,13 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) if (!PQgetisnull(originRes, rno, GP_REMOTE_COMMIT_TS)) { remote_commit_ts = PQgetvalue(originRes, rno, GP_REMOTE_COMMIT_TS); - sap.remote_commit_ts = str_to_timestamptz(remote_commit_ts); - Assert(IS_VALID_TIMESTAMP(sap.remote_commit_ts)); + sap->remote_commit_ts = str_to_timestamptz(remote_commit_ts); + Assert(IS_VALID_TIMESTAMP(sap->remote_commit_ts)); } - sap.prev_remote_ts = sap.remote_commit_ts; + sap->prev_remote_ts = sap->remote_commit_ts; - sap.remote_commit_lsn = str_to_lsn(remote_commit_lsn); - sap.remote_insert_lsn = str_to_lsn(remote_insert_lsn); + sap->remote_commit_lsn = str_to_lsn(remote_commit_lsn); + sap->remote_insert_lsn = str_to_lsn(remote_insert_lsn); /* * We don't actually receive a single WAL record - just assume @@ -474,17 +478,16 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) * value in case someone uses tracking data in state monitoring * scripts. 
*/ - sap.received_lsn = str_to_lsn(remote_commit_lsn); + sap->received_lsn = str_to_lsn(remote_commit_lsn); if (!PQgetisnull(originRes, rno, GP_LAST_UPDATED_TS)) { last_updated_ts = PQgetvalue(originRes, rno, GP_LAST_UPDATED_TS); - sap.last_updated_ts = str_to_timestamptz(last_updated_ts); + sap->last_updated_ts = str_to_timestamptz(last_updated_ts); - Assert(IS_VALID_TIMESTAMP(sap.last_updated_ts)); - - if (sap.last_updated_ts < sap.remote_commit_ts) + Assert(IS_VALID_TIMESTAMP(sap->last_updated_ts)); + if (sap->last_updated_ts < sap->remote_commit_ts) /* * Complaining at the end of the sync we shouldn't flood * the log @@ -498,10 +501,11 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) MySubscription->origin->id), errhint("usually it means that the server's clocks are out of sync."))); } - sap.updated_by_decode = updated_by_decode[0] == 't', + sap->updated_by_decode = updated_by_decode[0] == 't', - /* Update progress */ - spock_group_progress_update(&sap); + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + resultList = lappend(resultList, sap); + MemoryContextSwitchTo(oldctx); elog(LOG, "SPOCK: adjust spock.progress %s->%d to " "remote_commit_ts='%s' " @@ -522,6 +526,7 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn) PQclear(originRes); resetStringInfo(&query); + return resultList; } @@ -887,6 +892,7 @@ copy_tables_data(SpockSubscription *sub, const char *origin_dsn, { PGconn *origin_conn; PGconn *target_conn; + List *progress_entries_list = NIL; ListCell *lc; /* Connect to origin node. */ @@ -917,11 +923,21 @@ copy_tables_data(SpockSubscription *sub, const char *origin_dsn, CHECK_FOR_INTERRUPTS(); } - adjust_progress_info(origin_conn, target_conn); + progress_entries_list = adjust_progress_info(origin_conn); /* Finish the transactions and disconnect. */ finish_copy_origin_tx(origin_conn); finish_copy_target_tx(target_conn); + + /* + * Update replication progress. We must do it after commit of the COPY. 
+ * + * NOTE: + * It is not obvious that we need to adjust progress in case of an accidental + * single-table re-sync. But as long as this machinery serves informational + * purposes only, we just follow the original logic. + */ + spock_group_progress_update_list(progress_entries_list); } /* @@ -1005,7 +1021,7 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn, CHECK_FOR_INTERRUPTS(); } - adjust_progress_info(origin_conn, target_conn); + adjust_progress_info(origin_conn); /* Finish the transactions and disconnect. */ finish_copy_origin_tx(origin_conn);