本篇内容介绍了“postgresql中heap_insert->XLogInsert函数分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情
本篇内容介绍了“postgresql中heap_insert->XLogInsert函数分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
静态变量
进程中全局共享
static XLogRecData *rdatas;
//已使用的入口
static int num_rdatas;
//已分配的空间大小
static int max_rdatas;
//是否调用XLogBeginInsert函数
static bool begininsert_called = false;
宏定义
typedef char* Pointer;//指针
typedef Pointer Page;//Page
#define XLOG_HEAP_INSERT 0x00
typedef uint64 XLogRecPtr;
#define PageGetLSN(page) \
PageXLogRecPtrGet(((PageHeader) (page))->pd_lsn)
#define PageSetLSN(page, lsn) \
PageXLogRecPtrSet(((PageHeader) (page))->pd_lsn, lsn)
//存储压缩会后的块镜像所需要的缓存空间大小
#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
typedef int slock_t;
XLogCtl
XLOG的所有共享内存状态信息
typedef struct XLoGCtlData
{
XLogCtlInsert Insert;//插入控制器
//------ 通过info_lck锁保护
XLogwrtRqst LogwrtRqst;
//Insert->RedoRecPtr最近的拷贝
XLogRecPtr RedoRecPtr;
//最后的checkpoint的nextXID & epoch
uint32 ckptXidEpoch;
TransactionId ckptXid;
//最新异步提交/回滚的LSN
XLogRecPtr asyncXactLSN;
//slot需要的最"老"的LSN
XLogRecPtr replicationSlotMinLSN;
//最后移除/回收的XLOG段
XLogSegNo lastRemovedSegNo;
//---- "伪装"的LSN计数器,用于不需要记录日志的关系.通过ulsn_lck锁保护
XLogRecPtr unloggedLSN;
slock_t ulsn_lck;
//---- 切换后最新的xlog段的时间线和LSN,通过WALWriteLock锁保护
pg_time_t lastSegSwitchTime;
XLogRecPtr lastSegSwitchLSN;
XLogwrtResult LogwrtResult;
XLogRecPtr InitializedUpTo;
//未写入的XLOG pages的缓存
char *pages;
//ptr-s的第一个字节 + XLOG_BLCKSZ
XLogRecPtr *xlblocks;
//已分配的xlog缓冲的索引最高值
int XLogCacheBlck;
TimeLineID ThisTimeLineID;
TimeLineID PrevTimeLineID;
bool SharedRecoveryInProgress;
bool SharedHotStandbyActive;
bool WalWriterSleeping;
Latch recoveryWakeupLatch;
XLogRecPtr lastCheckPointRecPtr;
XLogRecPtr lastCheckPointEndPtr;
CheckPoint lastCheckPoint;
XLogRecPtr lastReplayedEndRecPtr;
TimeLineID lastReplayedTLI;
XLogRecPtr replayEndRecPtr;
TimeLineID replayEndTLI;
//最后的COMMIT/ABORT回放(或正在回放)记录的时间戳
TimestampTz recoveryLastXTime;
TimestampTz currentChunkStartTime;
//是否请求暂停恢复
bool recoveryPause;
XLogRecPtr lastFpwDisableRecPtr;
//锁结构
slock_t info_lck;
} XLogCtlData;
static XLogCtlData *XLogCtl = NULL;
heap_insert
主要实现逻辑是插入元组到堆中,其中存在对WAL(XLog)进行处理的部分.
参见Postgresql 源码解读(104)- WAL#1(Insert & WAL-heap_insert函数#1)
XLogInsert
插入一个具有指定的RMID和info字节的XLOG记录,该记录的主体是先前通过XLogReGISter*调用注册的数据和缓冲区引用。
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info)
{
XLogRecPtr EndPos;//uint64
//在此前,XLogBeginInsert()必须已调用
if (!begininsert_called)
elog(ERROR, "XLogBeginInsert was not called");
if ((info & ~(XLR_RMGR_INFO_MASK |
XLR_SPECIAL_REL_UPDATE |
XLR_CHECK_CONSISTENCY)) != 0)
elog(PANIC, "invalid xlog info mask %02X", info);
TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
XLogResetInsertion();
EndPos = SizeOfXLogLongPHD;
return EndPos;
}
do
{
//循环
XLogRecPtr RedoRecPtr;
bool doPageWrites;
XLogRecPtr fpw_lsn;
XLogRecData *rdt;
GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
&fpw_lsn);
//curinsert_flags类型为uint8
EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
} while (EndPos == InvalidXLogRecPtr);
XLogResetInsertion();
return EndPos;
}
XLogInsertRecord
插入一个由已经构造的数据chunks链表示的XLOG记录。
XLogRecPtr
XLogInsertRecord(XLogRecData *rdata,
XLogRecPtr fpw_lsn,
uint8 flags)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;//XLOG写入控制器
pg_crc32c rdata_crc;//uint32
bool inserted;
XLogRecord *rechdr = (XLogRecord *) rdata->data;
uint8 info = rechdr->xl_info & ~XLR_INFO_MASK;
bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID &&
info == XLOG_SWITCH);
XLogRecPtr StartPos;
XLogRecPtr EndPos;
bool prevDoPageWrites = doPageWrites;
//假定所有的记录头部数据都处于第一个chunk中
Assert(rdata->len >= SizeOfXLogRecord);
//交叉检查
if (!XLogInsertAllowed())
elog(ERROR, "cannot make new WAL entries during recovery");
START_CRIT_SECTION();
if (isLogSwitch)
WALInsertLockAcquireExclusive();
else
WALInsertLockAcquire();
if (RedoRecPtr != Insert->RedoRecPtr)
{
Assert(RedoRecPtr < Insert->RedoRecPtr);
RedoRecPtr = Insert->RedoRecPtr;
}
doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
if (doPageWrites &&
(!prevDoPageWrites ||
(fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr)))
{
WALInsertLockRelease();
END_CRIT_SECTION();
return InvalidXLogRecPtr;
}
if (isLogSwitch)
inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
else
{
ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
&rechdr->xl_prev);
inserted = true;
}
if (inserted)
{
rdata_crc = rechdr->xl_crc;
COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(rdata_crc);
rechdr->xl_crc = rdata_crc;
CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
StartPos, EndPos);
if ((flags & XLOG_MARK_UNIMPORTANT) == 0)
{
int lockno = holdingAllLocks ? 0 : MyLockNo;
WALInsertLocks[lockno].l.lastImportantAt = StartPos;
}
}
else
{
}
WALInsertLockRelease();
MarkCurrentTransactionIdLoggedIfAny();
END_CRIT_SECTION();
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
SpinLockAcquire(&XLogCtl->info_lck);
//预先请求包含新块(s)
if (XLogCtl->LogwrtRqst.Write < EndPos)
XLogCtl->LogwrtRqst.Write = EndPos;
//如有机会,更新本地的结果拷贝
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
}
if (isLogSwitch)
{
TRACE_POSTGRESQL_WAL_SWITCH();
XLogFlush(EndPos);
if (inserted)
{
EndPos = StartPos + SizeOfXLogRecord;
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
uint64 offset = XLogSegmentOffset(EndPos, wal_segment_size);
if (offset == EndPos % XLOG_BLCKSZ)
EndPos += SizeOfXLogLongPHD;
else
EndPos += SizeOfXLogShortPHD;
}
}
}
#ifdef WAL_DEBUG//DEBUG代码
if (XLOG_DEBUG)
{
static XLogReaderState *debug_reader = NULL;
StringInfoData buf;
StringInfoData recordBuf;
char *errORMsg = NULL;
MemoryContext oldCxt;
oldCxt = MemoryContextSwitchTo(walDebugCxt);
initStringInfo(&buf);
appendStringInfo(&buf, "INSERT @ %X/%X: ",
(uint32) (EndPos >> 32), (uint32) EndPos);
initStringInfo(&recordBuf);
for (; rdata != NULL; rdata = rdata->next)
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
if (!debug_reader)
debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
if (!debug_reader)
{
appendStringInfoString(&buf, "error decoding record: out of memory");
}
else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data,
&errormsg))
{
appendStringInfo(&buf, "error decoding record: %s",
errormsg ? errormsg : "no error message");
}
else
{
appendStringInfoString(&buf, " - ");
xlog_outdesc(&buf, debug_reader);
}
elog(LOG, "%s", buf.data);
pfree(buf.data);
pfree(recordBuf.data);
MemoryContextSwitchTo(oldCxt);
}
#endif
ProcLastRecPtr = StartPos;
XactLastRecEnd = EndPos;
return EndPos;
}
测试脚本如下
insert into t_wal_partition(c1,c2,c3) VALUES(0,'HASH0','HAHS0');
启动gdb,设置断点,进入XLogInsert
(gdb) b XLogInsert
Breakpoint 1 at 0x5652d6: file xloginsert.c, line 420.
(gdb) c
Continuing.
Breakpoint 1, XLogInsert (rmid=10 '\n', info=0 '\000') at xloginsert.c:420
420 if (!begininsert_called)
在此前,XLogBeginInsert()必须已调用
420 if (!begininsert_called)
(gdb) n
调用方必须设置rmgr位:XLR_SPECIAL_REL_UPDATE & XLR_CHECK_CONSISTENCY.其余在这里保留使用
427 if ((info & ~(XLR_RMGR_INFO_MASK |
(gdb) n
432 TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
进入循环
(gdb) n
438 if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
(gdb)
457 GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
获取决定是否执行全页写入所需的值
(gdb) p *RedoRecPtr
$1 = 1166604425
(gdb) p doPageWrites
$2 = false
(gdb) n
459 rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
(gdb) p RedoRecPtr
$3 = 5411227832
(gdb) p doPageWrites
$4 = true
获取rdt
(gdb) n
462 EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
(gdb) p *rdt
$5 = {next = 0x2a911b8, data = 0x2a8f460 <incomplete sequence \322>, len = 51}
XLogInsertRecord->调用XLogInsertRecord,进入XLogInsertRecord函数
fpw_lsn=0, flags=1 '\001'
(gdb) step
XLogInsertRecord (rdata=0xf9cc70 <hdr_rdt>, fpw_lsn=0, flags=1 '\001') at xlog.c:970
970 XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogInsertRecord->获取插入管理器
(gdb) n
973 XLogRecord *rechdr = (XLogRecord *) rdata->data;
(gdb) p *Insert
$6 = {insertpos_lck = 0 '\000', CurrBytePos = 5395369608, PrevBytePos = 5395369552, pad = '\000' <repeats 127 times>,
RedoRecPtr = 5411227832, forcePageWrites = false, fullPageWrites = true, exclusiveBackupState = EXCLUSIVE_BACKUP_NONE,
nonExclusiveBackups = 0, lastBackupStart = 0, WALInsertLocks = 0x7fa2523D4100}
XLogInsertRecord->变量赋值
(gdb) n
974 uint8 info = rechdr->xl_info & ~XLR_INFO_MASK;
(gdb)
975 bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID &&
(gdb)
979 bool prevDoPageWrites = doPageWrites;
(gdb)
982 Assert(rdata->len >= SizeOfXLogRecord);
(gdb)
(gdb) p *rechdr
$7 = {xl_tot_len = 210, xl_xid = 1948, xl_prev = 0, xl_info = 0 '\000', xl_rmid = 10 '\n', xl_crc = 3212449170}
(gdb) p info
$8 = 0 '\000'
(gdb) p isLogSwitch
$9 = false
(gdb) p prevDoPageWrites
$10 = true
XLogInsertRecord->执行相关判断,开启CRIT_SECTION,并获取WAL插入锁
(gdb) n
985 if (!XLogInsertAllowed())
(gdb)
1020 START_CRIT_SECTION();
(gdb)
1021 if (isLogSwitch)
(gdb)
1024 WALInsertLockAcquire();
(gdb)
1042 if (RedoRecPtr != Insert->RedoRecPtr)
(gdb)
XLogInsertRecord->执行相关判断,更新doPageWrites
(gdb) p RedoRecPtr
$11 = 5411227832
(gdb) p Insert->RedoRecPtr
$12 = 5411227832
(gdb) n
1047 doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
(gdb)
1049 if (doPageWrites &&
(gdb) p doPageWrites
$13 = true
(gdb) n
1050 (!prevDoPageWrites ||
(gdb)
1049 if (doPageWrites &&
XLogInsertRecord->在WAL预留记录空间.同时会设置xl_prev指针.
(gdb)
1050 (!prevDoPageWrites ||
(gdb)
1066 if (isLogSwitch)
(gdb)
1070 ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
(gdb)
1072 inserted = true;
(gdb) p rechdr->xl_tot_len
$14 = 210
(gdb) p StartPos
$15 = 5411228000
(gdb) p EndPos
$16 = 5411228216
(gdb) p *rechdr->xl_prev
Cannot access memory at address 0x14288c928
(gdb) p rechdr->xl_prev
$17 = 5411227944
(gdb)
XLogInsertRecord->现在xl_prev指针已填充,计算记录头部的CRC
(gdb) n
1075 if (inserted)
(gdb)
1081 rdata_crc = rechdr->xl_crc;
(gdb)
1082 COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
(gdb)
1083 FIN_CRC32C(rdata_crc);
(gdb)
1084 rechdr->xl_crc = rdata_crc;
(gdb)
1090 CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
(gdb) p rdata_crc
$18 = 2310972234
(gdb) p *rechdr
$19 = {xl_tot_len = 210, xl_xid = 1948, xl_prev = 5411227944, xl_info = 0 '\000', xl_rmid = 10 '\n', xl_crc = 2310972234}
(gdb)
XLogInsertRecord->所有的记录数据,包括头部数据已OK,准备插入!拷贝记录到保留空间中.
除非记录被标记为不重要,否则更新当前slot中最后一条重要记录的LSN.
(gdb) n
1098 if ((flags & XLOG_MARK_UNIMPORTANT) == 0)
(gdb)
1100 int lockno = holdingAllLocks ? 0 : MyLockNo;
(gdb)
(gdb) n
1102 WALInsertLocks[lockno].l.lastImportantAt = StartPos;
(gdb)
1117 WALInsertLockRelease();
XLogInsertRecord->全部完成!让其他插入器知道我们已经完成了!
如跨越了page边界,更新共享的LogwrtRqst.Write变量
(gdb)
1117 WALInsertLockRelease();
(gdb) n
1119 MarkCurrentTransactionIdLoggedIfAny();
(gdb)
1121 END_CRIT_SECTION();
(gdb)
1126 if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
(gdb)
1142 if (isLogSwitch)
XLogInsertRecord->更新全局变量,函数返回
(gdb)
1220 ProcLastRecPtr = StartPos;
(gdb)
1221 XactLastRecEnd = EndPos;
(gdb)
1223 return EndPos;
(gdb)
1224 }
返回XLogInsert,重置insertion,返回EndPos,结束
(gdb)
XLogInsert (rmid=10 '\n', info=0 '\000') at xloginsert.c:463
463 } while (EndPos == InvalidXLogRecPtr);
(gdb) n
465 XLogResetInsertion();
(gdb)
467 return EndPos;
(gdb)
468 }
(gdb) p EndPos
$20 = 5411228216
(gdb)
$21 = 5411228216
(gdb) n
heap_insert (relation=0x7fa280616228, tup=0x2b15440, cid=0, options=0, bistate=0x0) at heapam.c:2590
2590 PageSetLSN(page, recptr);
(gdb)
“PostgreSQL中heap_insert->XLogInsert函数分析”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!
--结束END--
本文标题: PostgreSQL中heap_insert->XLogInsert函数分析
本文链接: https://lsjlt.com/news/64732.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0