半个PostgreSQL DBA,热衷于数据库相关的技术。我的ppt分享https://pan.baidu.com/s/1eRQsdAa https://github.com/chenhuajun https://chenhuajun.github.io
分类: Mysql/postgreSQL
2018-07-30 01:27:24
订阅端执行CREATE SUBSCRIPTION后,在后台进行表数据同步。 每个表的数据同步状态记录在pg_subscription_rel.srsubstate中,一共有4种状态码。
从执行CREATE SUBSCRIPTION开始订阅端的相关处理流程概述如下:
订阅表进入同步状态(状态码是's'或'r')后,发布端的变更都会通过消息通知订阅端; 订阅端apply worker按照订阅消息的接收顺序(即发布端事务提交顺序)对每个表apply变更,并反馈apply位置,用于监视复制延迟。
通过调试,确认发布端发生更新时,发送给订阅端的数据包。
insert into tbx3 values(100);
发布端修改订阅表时,在事务提交时,发布端依次发送下面的消息到订阅端
insert into tbx10 values(100);
发布端产生了和订阅表无关的修改,在事务提交时,发布端依次发送下面的消息到订阅端
错误日志示例:
2018-07-28 20:11:56.018 UTC [470] ERROR: duplicate key value violates unique constraint "tbx3_pkey" 2018-07-28 20:11:56.018 UTC [470] DETAIL: Key (id)=(2) already exists. 2018-07-28 20:11:56.022 UTC [47] LOG: worker process: logical replication worker for subscription 74283 (PID 470) exited with exit code 1 2018-07-28 20:12:01.029 UTC [471] LOG: logical replication apply worker for subscription "sub_shard" has started 2018-07-28 20:12:01.049 UTC [471] ERROR: duplicate key value violates unique constraint "tbx3_pkey" 2018-07-28 20:12:01.049 UTC [471] DETAIL: Key (id)=(2) already exists. 2018-07-28 20:12:01.058 UTC [47] LOG: worker process: logical replication worker for subscription 74283 (PID 471) exited with exit code 1 2018-07-28 20:12:06.070 UTC [472] LOG: logical replication apply worker for subscription "sub_shard" has started 2018-07-28 20:12:06.089 UTC [472] ERROR: duplicate key value violates unique constraint "tbx3_pkey" 2018-07-28 20:12:06.089 UTC [472] DETAIL: Key (id)=(2) already exists.
CREATE PUBLICATION CreatePublication() CatalogTupleInsert(rel, tup); // 在pg_publication系统表中插入此发布信息 PublicationAddTables(puboid, rels, true, NULL);// publication_add_relation() check_publication_add_relation();// 检查表类型,不支持的表报错。只支持普通表('r'),且不是unlogged表和临时表 CatalogTupleInsert(rel, tup); // 在pg_publication_rel系统表中插入发布和表的映射
CREATE SUBSCRIPTION CreateSubscription() CatalogTupleInsert(rel, tup); //在pg_subscription系统表中插入此订阅信息 replorigin_create(originname); //在pg_replication_origin系统表中插入此订阅对应的复制源 foreach(lc, tables) // 设置每个表的pg_subscription_rel.srsubstate table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; // ★★★1 如果拷贝数据,设置每个表的pg_subscription_rel.srsubstate='i' SetSubscriptionRelState(subid, relid, table_state,InvalidXLogRecPtr, false); walrcv_create_slot(wrconn, slotname, false,CRS_NOEXPORT_SNAPSHOT, &lsn); ApplyLauncherWakeupAtCommit(); //唤醒logical replication launcher进程
ApplyLauncherMain() sublist = get_subscription_list(); //从pg_subscription获取订阅列表 foreach(lc, sublist) logicalrep_worker_launch(..., InvalidOid); // 对enabled且没有创建worker的订阅创建apply worker。apply worker如果已超过max_logical_replication_workers(默认4)报错 RegisterDynamicBackgroundWorker(&bgw, &bgw_handle);// 注册后台工作进程,入口函数为"ApplyWorkerMain"
ApplyWorkerMain replorigin_session_setup(originid); // 从共享内存中查找并设置复制源,如果不存在使用新的,复制源名称为pg_${订阅OID}。 origin_startpos = replorigin_session_get_progress(false);// 获取复制源的remote_lsn walrcv_connect(MySubscription->conninfo, true, MySubscription->name,&err); // 连接到发布端 walrcv_startstreaming(wrconn, &options); // 开始流复制 LogicalRepApplyLoop(origin_startpos); // Apply进程主循环 for(;;) len = walrcv_receive(wrconn, &buf, &fd); if (c == 'w') // 'w'消息的处理 UpdateWorkerStats(last_received, send_time, false); // 更新worker统计信息(last_lsn,last_send_time,last_recv_time) apply_dispatch(&s); // 分发逻辑复制命令 switch (action) case 'B': /* BEGIN */ apply_handle_begin(s); case 'C': /* COMMIT */ apply_handle_commit(s); if (IsTransactionState() && !am_tablesync_worker()) // 当发布端的事务更新不涉及订阅表时,仍会发送B和C消息,此时不在事务中,跳过下面操作 replorigin_session_origin_lsn = commit_data.end_lsn; // 更新复制源状态,确保apply worker crash时可以找到正确的开始位置 replorigin_session_origin_timestamp = commit_data.committime; CommitTransactionCommand(); // 提交事务 pgstat_report_stat(false); // 更新统计信息 process_syncing_tables(commit_data.end_lsn); // 对处于同步中的表,协调sync worker和apply worker进程同步状态 process_syncing_tables_for_apply(current_lsn); GetSubscriptionNotReadyRelations(MySubscription->oid); // 从pg_subscription_rel中获取订阅中所有非ready状态的表。 foreach(lc, table_states) // 处理每个非ready状态的表 if (rstate->state == SUBREL_STATE_SYNCDONE) { if (current_lsn >= rstate->lsn) { rstate->state = SUBREL_STATE_READY; //处理第一个事务后,从syncdone->ready状态,但这个事务不需要和这个表相关。 rstate->lsn = current_lsn; SetSubscriptionRelState(MyLogicalRepWorker->subid, // 更新pg_subscription_rel rstate->relid, rstate->state, rstate->lsn, true); } } else { syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, rstate->relid, false); if (syncworker) { /* Found one, update our copy of its state */ rstate->state = syncworker->relstate; rstate->lsn = syncworker->relstate_lsn; if (rstate->state == SUBREL_STATE_SYNCWAIT) { /* * Sync worker is waiting for apply. Tell sync worker it * can catchup now. 
*/ syncworker->relstate = SUBREL_STATE_CATCHUP; // ★★★3 SUBREL_STATE_SYNCWAIT -> SUBREL_STATE_CATCHUP syncworker->relstate_lsn = Max(syncworker->relstate_lsn, current_lsn); } /* If we told worker to catch up, wait for it. */ if (rstate->state == SUBREL_STATE_SYNCWAIT) { /* Signal the sync worker, as it may be waiting for us. */ if (syncworker->proc) logicalrep_worker_wakeup_ptr(syncworker); wait_for_relation_state_change(rstate->relid, SUBREL_STATE_SYNCDONE); // 等待sync worker将表的同步状态设置为SUBREL_STATE_SYNCDONE } } else { /* * If there is no sync worker for this table yet, count * running sync workers for this subscription, while we have * the lock. */ logicalrep_worker_launch(MyLogicalRepWorker->dbid, // 如果这个表没有对应的sync worker,且sync worker数未超过max_sync_workers_per_subscription,启动一个。 MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, rstate->relid); } else if (c == 'k') // 'k'消息的处理 send_feedback(last_received, reply_requested, false); // 向发布端发送反馈 UpdateWorkerStats(last_received, timestamp, true); // 更新worker统计信息(last_lsn,last_send_time,last_recv_time,reply_lsn,send_time) case 'I': /* INSERT */ apply_handle_insert(s); relid = logicalrep_read_insert(s, &newtup); if (!should_apply_changes_for_rel(rel))return; if (am_tablesync_worker()) return MyLogicalRepWorker->relid == rel->localreloid; // 对sync worker,只apply其负责同步的表 else return (rel->state == SUBREL_STATE_READY || // 对apply worker, 同步状态为SUBREL_STATE_SYNCDONE时,只同步syncdone位置之后的wal (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); ExecSimpleRelationInsert(estate, remoteslot); // 插入记录 ExecBRInsertTriggers(estate, resultRelInfo, slot); // 处理BEFORE ROW INSERT Triggers simple_heap_insert(rel, tuple); ExecARInsertTriggers(estate, resultRelInfo, tuple,recheckIndexes, NULL); // 处理AFTER ROW INSERT Triggers AfterTriggerEndQuery(estate); // 处理 queued AFTER triggers ... send_feedback(last_received, false, false);//没有新的消息要处理,向发布端发送位置反馈 process_syncing_tables(last_received);//如果不在事务块里,同步表状态
ApplyWorkerMain() //apply worker和sync worker使用相同的入口函数 LogicalRepSyncTableStart(&origin_startpos); GetSubscriptionRelState(MyLogicalRepWorker->subid,MyLogicalRepWorker->relid,&relstate_lsn, true);// 从pg_subscription_rel中获取订阅的复制lsn walrcv_connect(MySubscription->conninfo, true, slotname, &err); switch (MyLogicalRepWorker->relstate) { case SUBREL_STATE_INIT: case SUBREL_STATE_DATASYNC: { MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn, true); res = walrcv_exec(wrconn, // 开始事务 "BEGIN READ ONLY ISOLATION LEVEL " "REPEATABLE READ", 0, NULL); walrcv_create_slot(wrconn, slotname, true, // 使用快照创建临时复制槽,并记录快照位置。 CRS_USE_SNAPSHOT, origin_startpos); copy_table(rel); // copy表数据 walrcv_exec(wrconn, "COMMIT", 0, NULL); MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; // ★★★2 更新表同步状态为SUBREL_STATE_SYNCWAIT MyLogicalRepWorker->relstate_lsn = *origin_startpos; wait_for_worker_state_change(SUBREL_STATE_CATCHUP); // 等待apply worker将状态变更为SUBREL_STATE_CATCHUP if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn) // 如果sync worker落后于apply worker,sync worker跳过此步继续apply WAL; { /* * Update the new state in catalog. No need to bother * with the shmem state as we are exiting for good. 
*/ SetSubscriptionRelState(MyLogicalRepWorker->subid, // ★★★4 把同步状态从SUBREL_STATE_CATCHUP更新到SUBREL_STATE_SYNCDONE并退出 MyLogicalRepWorker->relid, SUBREL_STATE_SYNCDONE, *origin_startpos, true); finish_sync_worker(); } break; } case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: finish_sync_worker(); break; } options.startpoint = origin_startpos; walrcv_startstreaming(wrconn, &options);// 开始流复制,以同步快照位置作为流的开始位置 LogicalRepApplyLoop(origin_startpos); // Apply进程主循环 for(;;) len = walrcv_receive(wrconn, &buf, &fd); UpdateWorkerStats(last_received, send_time, false); // 更新worker统计信息(last_lsn,last_send_time,last_recv_time) apply_dispatch(&s); // 分发逻辑复制命令 switch (action) case 'B': /* BEGIN */ apply_handle_begin(s); case 'C': /* COMMIT */ apply_handle_commit(s); process_syncing_tables(commit_data.end_lsn); // 对处于同步中的表,协调sync worker和apply worker进程同步状态 process_syncing_tables_for_sync(current_lsn); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && current_lsn >= MyLogicalRepWorker->relstate_lsn) { TimeLineID tli; MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; // ★★★4 把同步状态从SUBREL_STATE_CATCHUP更新到SUBREL_STATE_SYNCDONE MyLogicalRepWorker->relstate_lsn = current_lsn; SpinLockRelease(&MyLogicalRepWorker->relmutex); SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn, true); walrcv_endstreaming(wrconn, &tli); finish_sync_worker(); } case 'I': /* INSERT */ apply_handle_insert(s);