本节继续介绍postgresql
本节继续介绍postgresql的后台进程walsender,重点介绍的是调用栈中的exec_replication_command和StartReplication函数.
调用栈如下:
(gdb) bt
#0 0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0,
nevents=1) at latch.c:1048
#2 0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1,
wait_event_info=83886092) at latch.c:1000
#3 0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999,
wait_event_info=83886092) at latch.c:385
#4 0x000000000085405b in WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2229
#5 0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6 0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
at walsender.c:1539
#7 0x00000000008c0170 in PostgresMain (arGC=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
at postgres.c:4178
#8 0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9 0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228
StringInfo
StringInfoData结构体保存关于扩展字符串的相关信息.
typedef struct StringInfoData
{
char *data;
int len;
int maxlen;
int cursor;
} StringInfoData;
typedef StringInfoData *StringInfo;
exec_replication_command
exec_replication_command执行复制命令,如cmd_string被识别为WalSender命令,返回T,否则返回F.
其主要逻辑如下:
1.执行相关初始化和校验
2.切换内存上下文
3.初始化复制扫描器
4.执行事务相关的判断或校验
5.初始化输入输出消息
6.根据命令类型执行相应的命令
6.1命令类型为T_StartReplicationCmd,调用StartReplication
bool
exec_replication_command(const char *cmd_string)
{
int parse_rc;
node *cmd_node;
MemoryContext cmd_context;
MemoryContext old_context;
if (Got_STOPPING)
WalSndSetState(WALSNDSTATE_STOPPING);
if (MyWalSnd->state == WALSNDSTATE_STOPPING)
ereport(ERROR,
(errmsg("cannot execute new commands while WAL sender is in stopping mode")));
SnapBuildClearExportedSnapshot();
//检查中断
CHECK_FOR_INTERRUPTS();
//命令上下文
cmd_context = AllocSetContextCreate(CurrentMemoryContext,
"Replication command context",
ALLOCSET_DEFAULT_SIZES);
old_context = MemoryContextSwitchTo(cmd_context);
//初始化复制扫描器
replication_scanner_init(cmd_string);
parse_rc = replication_yyparse();
if (parse_rc != 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
(errmsg_internal("replication command parser returned %d",
parse_rc))));
cmd_node = replication_parse_result;
if (cmd_node->type != T_sqlCmd)
ereport(log_replication_commands ? LOG : DEBUG1,
(errmsg("received replication command: %s", cmd_string)));
if (!IsTransactionBlock())
SnapBuildClearExportedSnapshot();
if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
ereport(ERROR,
(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
errmsg("current transaction is aborted, "
"commands ignored until end of transaction block")));
CHECK_FOR_INTERRUPTS();
initStringInfo(&output_message);
initStringInfo(&reply_message);
initStringInfo(&tmpbuf);
//向pgstat报告该进程正在运行.
pgstat_report_activity(STATE_RUNNING, NULL);
//根据命令类型执行相应的命令
switch (cmd_node->type)
{
case T_IdentifySystemCmd:
//识别系统
IdentifySystem();
break;
case T_BaseBackupCmd:
//BASE_BACKUP
PreventInTransactionBlock(true, "BASE_BACKUP");
SendBaseBackup((BaseBackupCmd *) cmd_node);
break;
case T_CreateReplicationSlotCmd:
//创建复制slot
CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
break;
case T_DropReplicationSlotCmd:
//删除复制slot
DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
break;
case T_StartReplicationCmd:
//START_REPLICATION
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
PreventInTransactionBlock(true, "START_REPLICATION");
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
StartReplication(cmd);
else
StartLogicalReplication(cmd);
break;
}
case T_TimeLineHistoryCmd:
//构造时间线历史 TIMELINE_HISTORY
PreventInTransactionBlock(true, "TIMELINE_HISTORY");
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
break;
case T_VariableShowStmt:
//
{
DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
VariableShowStmt *n = (VariableShowStmt *) cmd_node;
GetPGVariable(n->name, dest);
}
break;
case T_SQLCmd:
//SQL命令
if (MyDatabaseId == InvalidOid)
ereport(ERROR,
(errmsg("cannot execute SQL commands in WAL sender for physical replication")));
pgstat_report_activity(STATE_IDLE, NULL);
return false;
default:
//其他命令
elog(ERROR, "unrecognized replication command node tag: %u",
cmd_node->type);
}
//执行完毕,回到原来的内存上下文中
MemoryContextSwitchTo(old_context);
MemoryContextDelete(cmd_context);
//命令结束
EndCommand("SELECT", DestRemote);
//报告状态
pgstat_report_activity(STATE_IDLE, NULL);
return true;
}
StartReplication
StartReplication处理START_REPLICATION命令.
其主要逻辑如下:
1.执行相关初始化和校验
2.选择时间线
3.进入COPY模式
3.1设置状态
3.2发送CopyBothResponse消息,启动streaming
3.3初始化相关变量,如共享内存状态等
3.4进入主循环(WalSndLoop)
static void
StartReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
XLogRecPtr FlushPtr;
if (ThisTimeLineID == 0)
//时间线校验
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname, true);
//#define SlotIsLogical ( slot ) (slot->data.database != InvalidOid)
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
(errmsg("cannot use a logical replication slot for physical replication"))));
}
if (am_cascading_walsender)
{
//这也会更新ThisTimeLineID变量
FlushPtr = GetStandbyFlushRecPtr();
}
else
FlushPtr = GetFlushRecPtr();
if (cmd->timeline != 0)
{
XLogRecPtr switchpoint;
sendTimeLine = cmd->timeline;
if (sendTimeLine == ThisTimeLineID)
{
sendTimeLineIsHistoric = false;
sendTimeLineValidUpto = InvalidXLogRecPtr;
}
else
{
List *timeLineHistory;
sendTimeLineIsHistoric = true;
timeLineHistory = readTimeLineHistory(ThisTimeLineID);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
&sendTimeLineNextTLI);
list_free_deep(timeLineHistory);
if (!XLogRecPtrIsInvalid(switchpoint) &&
switchpoint < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
(uint32) (cmd->startpoint >> 32),
(uint32) (cmd->startpoint),
cmd->timeline),
errdetail("This server's history forked from timeline %u at %X/%X.",
cmd->timeline,
(uint32) (switchpoint >> 32),
(uint32) (switchpoint))));
}
sendTimeLineValidUpto = switchpoint;
}
}
else
{
sendTimeLine = ThisTimeLineID;
sendTimeLineValidUpto = InvalidXLogRecPtr;
sendTimeLineIsHistoric = false;
}
streamingDoneSending = streamingDoneReceiving = false;
//如果没有任何东西需要stream,不需要启动COPY命令
if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
{
//设置状态
WalSndSetState(WALSNDSTATE_CATCHUP);
//发送CopyBothResponse消息,启动streaming
pq_beginmessage(&buf, 'W');//W->COPY命令?
pq_sendbyte(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage(&buf);
pq_flush();
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
(uint32) (cmd->startpoint >> 32),
(uint32) (cmd->startpoint),
(uint32) (FlushPtr >> 32),
(uint32) (FlushPtr))));
}
//从请求点开始streaming
sentPtr = cmd->startpoint;
//初始化共享内存状态
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
//walsender主循环,开始复制,激活复制
replication_active = true;
//主循环
WalSndLoop(XLogSendPhysical);
//完结后设置为非活动状态
replication_active = false;
if (got_STOPPING)
proc_exit(0);//退出
//设置状态
WalSndSetState(WALSNDSTATE_STARTUP);
Assert(streamingDoneSending && streamingDoneReceiving);
}
if (cmd->slotname)
ReplicationSlotRelease();
if (sendTimeLineIsHistoric)
{
char startpos_str[8 + 1 + 8 + 1];
DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Datum values[2];
bool nulls[2];
snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
(uint32) (sendTimeLineValidUpto >> 32),
(uint32) sendTimeLineValidUpto);
dest = CreateDestReceiver(DestRemoteSimple);
MemSet(nulls, false, sizeof(nulls));
tupdesc = CreateTemplateTupleDesc(2);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
INT8OID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
TEXTOID, -1, 0);
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
values[1] = CStringGetTextDatum(startpos_str);
do_tup_output(tstate, values, nulls);
end_tup_output(tstate);
}
pq_puttextmessage('C', "START_STREAMING");
}
在主节点上用gdb跟踪postmaster,在PostgresMain上设置断点后启动standby节点,进入断点
[xdb@localhost ~]$ ps -ef|grep postgres
xdb 1339 1 2 14:45 pts/0 00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1339
GNU gdb (GDB) Red Hat Enterprise linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b exec_replication_command
Breakpoint 1 at 0x852fd2: file walsender.c, line 1438.
(gdb) c
Continuing.
[New process 1356]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:1438
1438 if (got_STOPPING)
(gdb)
第一个命令是IDENTIFY_SYSTEM,第二个命令才是需要跟踪的对象START_REPLICATION
(gdb) c
Continuing.
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1438
1438 if (got_STOPPING)
(gdb)
1.执行相关初始化和校验
(gdb) n
1446 if (MyWalSnd->state == WALSNDSTATE_STOPPING)
(gdb)
1454 SnapBuildClearExportedSnapshot();
(gdb) p *MyWalSnd
$1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0,
writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}
(gdb) n
1456 CHECK_FOR_INTERRUPTS();
(gdb)
2.切换内存上下文
(gdb)
1458 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
(gdb)
1461 old_context = MemoryContextSwitchTo(cmd_context);
(gdb)
3.初始化复制扫描器
(gdb)
1463 replication_scanner_init(cmd_string);
(gdb) n
1464 parse_rc = replication_yyparse();
(gdb)
1465 if (parse_rc != 0)
(gdb) p parse_rc
$3 = 0
(gdb)
(gdb) n
1471 cmd_node = replication_parse_result;
(gdb)
(gdb)
1479 if (cmd_node->type != T_SQLCmd)
(gdb) n
1480 ereport(log_replication_commands ? LOG : DEBUG1,
(gdb) p cmd_node
$4 = (Node *) 0x1df4710
(gdb) p *cmd_node
$5 = {type = T_StartReplicationCmd}
(gdb)
4.执行事务相关的判断或校验
(gdb) n
1487 if (!IsTransactionBlock())
(gdb)
1488 SnapBuildClearExportedSnapshot();
(gdb)
1494 if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
(gdb)
1500 CHECK_FOR_INTERRUPTS();
(gdb)
5.初始化输入输出消息
(gdb)
1506 initStringInfo(&output_message);
(gdb)
1507 initStringInfo(&reply_message);
(gdb)
1508 initStringInfo(&tmpbuf);
(gdb)
1511 pgstat_report_activity(STATE_RUNNING, NULL);
6.根据命令类型执行相应的命令
6.1命令类型为T_StartReplicationCmd,调用StartReplication
(gdb) n
1513 switch (cmd_node->type)
(gdb)
1534 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
(gdb)
1536 PreventInTransactionBlock(true, "START_REPLICATION");
(gdb)
1538 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
(gdb)
1539 StartReplication(cmd);
进入StartReplication
1539 StartReplication(cmd);
(gdb) step
StartReplication (cmd=0x1df4710) at walsender.c:532
532 if (ThisTimeLineID == 0)
(gdb)
1.执行相关初始化和校验
(gdb) n
546 if (cmd->slotname)
(gdb)
560 if (am_cascading_walsender)
(gdb)
2.选择时间线
(gdb) n
568 if (cmd->timeline != 0)
(gdb)
572 sendTimeLine = cmd->timeline;
(gdb)
573 if (sendTimeLine == ThisTimeLineID)
(gdb)
575 sendTimeLineIsHistoric = false;
(gdb) p FlushPtr
$9 = 1560397696
(gdb) n
576 sendTimeLineValidUpto = InvalidXLogRecPtr;
(gdb)
634 streamingDoneSending = streamingDoneReceiving = false;
(gdb) p sendTimeLine
$10 = 16
(gdb) p ThisTimeLineID
$11 = 16
(gdb) p *cmd
$12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16,
startpoint = 1560281088, options = 0x0}
(gdb)
3.进入COPY模式
(gdb) n
637 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
(gdb)
3.1设置状态
648 WalSndSetState(WALSNDSTATE_CATCHUP);
(gdb) p sendTimeLineValidUpto
$13 = 0
(gdb) p cmd->startpoint
$14 = 1560281088
(gdb)
3.2发送CopyBothResponse消息,启动streaming
(gdb) n
651 pq_beginmessage(&buf, 'W');
(gdb)
652 pq_sendbyte(&buf, 0);
(gdb)
653 pq_sendint16(&buf, 0);
(gdb)
654 pq_endmessage(&buf);
(gdb) p buf
$15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}
(gdb) p buf->data
$16 = 0x1df53b0 ""
(gdb) x/hb buf->data
0x1df53b0: 0
(gdb) x/32hb buf->data
0x1df53b0: 0 0 0 127 127 127 127 127
0x1df53b8: 127 127 127 127 127 127 127 127
0x1df53c0: 127 127 127 127 127 127 127 127
0x1df53c8: 127 127 127 127 127 127 127 127
(gdb)
3.3初始化相关变量,如共享内存状态等
(gdb) n
655 pq_flush();
(gdb)
661 if (FlushPtr < cmd->startpoint)
(gdb) p FlushPtr
$17 = 1560397696
(gdb) p cmd->startpoint
$18 = 1560281088
(gdb) n
672 sentPtr = cmd->startpoint;
(gdb)
675 SpinLockAcquire(&MyWalSnd->mutex);
(gdb)
676 MyWalSnd->sentPtr = sentPtr;
(gdb)
677 SpinLockRelease(&MyWalSnd->mutex);
(gdb)
679 SyncRepInitConfig();
(gdb)
682 replication_active = true;
3.4进入主循环(WalSndLoop)
(gdb)
684 WalSndLoop(XLogSendPhysical);
(gdb)
DONE!
PG Source Code
--结束END--
本文标题: PostgreSQL 源码解读(154)- 后台进程#6(walsender#2)
本文链接: https://lsjlt.com/news/49827.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0