返回顶部
首页 > 资讯 > 数据库 >PostgreSQL 源码解读(123)- MVCC#8(提交事务-实际提交过程)
  • 958
分享到

PostgreSQL 源码解读(123)- MVCC#8(提交事务-实际提交过程)

2024-04-02 19:04:59 958人浏览 薄情痞子
摘要

本节介绍了postgresql提

本节介绍了postgresql提交事务的具体实现逻辑,主要解析了函数CommitTransaction->RecordTransactionCommit的实现逻辑。

一、数据结构

TransactionState
事务状态结构体




typedef enum TransState
{
    TRANS_DEFAULT,              
    TRANS_START,                
    TRANS_INPROGRESS,           
    TRANS_COMMIT,               
    TRANS_ABORT,                
    TRANS_PREPARE               
} TransState;

typedef enum TBlockState
{
    
    TBLOCK_DEFAULT,             
    TBLOCK_STARTED,             
    
    TBLOCK_BEGIN,               
    TBLOCK_INPROGRESS,          
    TBLOCK_IMPLICIT_INPROGRESS, 
    TBLOCK_PARALLEL_INPROGRESS, 
    TBLOCK_END,                 
    TBLOCK_ABORT,               
    TBLOCK_ABORT_END,           
    TBLOCK_ABORT_PENDING,       
    TBLOCK_PREPARE,             
    
    TBLOCK_SUBBEGIN,            
    TBLOCK_SUBINPROGRESS,       
    TBLOCK_SUBRELEASE,          
    TBLOCK_SUBCOMMIT,           
    TBLOCK_SUBABORT,            
    TBLOCK_SUBABORT_END,        
    TBLOCK_SUBABORT_PENDING,    
    TBLOCK_SUBRESTART,          
    TBLOCK_SUBABORT_RESTART     
} TBlockState;

typedef struct TransactionStateData
{
    //事务ID
    TransactionId transactionId;    
    //子事务ID
    SubTransactionId subTransactionId;  
    //保存点名称
    char       *name;           
    //保存点级别
    int         savepointLevel; 
    //低级别的事务状态
    TransState  state;          
    //高级别的事务状态
    TBlockState blockState;     
    //事务嵌套深度
    int         nestingLevel;   
    //GUC上下文嵌套深度
    int         gucNestLevel;   
    //事务生命周期上下文
    MemoryContext curTransactionContext;    
    //查询资源
    ResourceOwner curTransactionOwner;  
    //按XID顺序保存的已提交的子事务ID
    TransactionId *childXids;   
    //childXids数组大小
    int         nChildXids;     
    //分配的childXids数组空间
    int         maxChildXids;   
    //上一个CurrentUserId
    Oid         prevUser;       
    //上一个SecurityRestrictionContext
    int         prevSecContext; 
    //上一事务是否只读?
    bool        prevXactReadOnly;   
    //是否处于Recovery?
    bool        startedInRecovery;  
    //XID是否已保存在WAL Record中?
    bool        didLogXid;      
    //Enter/ExitParallelMode计数器
    int         parallelModeLevel;  
    //父事务状态
    struct TransactionStateData *parent;    
} TransactionStateData;
//结构体指针
typedef TransactionStateData *TransactionState;

二、源码解读

RecordTransactionCommit函数,在WAL Record中记录COMMIT Record,返回最新的XID,如果xact没有XID,则返回InvalidTransactionId。.




static TransactionId
RecordTransactionCommit(void)
{
    TransactionId xid = GetTopTransactionIdIfAny();//获取XID
    bool        markXidCommitted = TransactionIdIsValid(xid);//标记
    TransactionId latestXid = InvalidTransactionId;//最后的XID
    int         nrels;
    RelFilenode *rels;
    int         nchildren;
    TransactionId *children;
    int         nmsgs = 0;
    SharedInvalidationMessage *invalMessages = NULL;
    bool        RelcacheInitFileInval = false;
    bool        wrote_xlog;
    
    //为WAL Record的commit record准备数据.
    nrels = smgrGetPendingDeletes(true, &rels);
    nchildren = xactGetCommittedChildren(&children);
    if (XLogStandbyInfoActive())
        nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
                                                     &RelcacheInitFileInval);
    wrote_xlog = (XactLastRecEnd != 0);
    
    if (!markXidCommitted)
    {
        
        if (nrels != 0)
            elog(ERROR, "cannot commit a transaction that deleted files but has no xid");
        
        //没有child XIDs,AssignTransactionId会强制实现此逻辑.
        Assert(nchildren == 0);
        
        if (nmsgs != 0)
        {
            LogStandbyInvalidations(nmsgs, invalMessages,
                                    RelcacheInitFileInval);
            wrote_xlog = true;  
        }
        
        if (!wrote_xlog)
            Goto cleanup;
    }
    else
    {
        bool        replorigin;
        
        replorigin = (replorigin_session_origin != InvalidRepOriginId &&
                      replorigin_session_origin != DoNotReplicateId);
        
        
        //通知bufmgr和smgr准备提交
        BufmgrCommit();
        
        START_CRIT_SECTION();
        MyPgXact->delayChkpt = true;
        SetCurrentTransactionStopTimestamp();
        XactLoGCommitRecord(xactStopTimestamp,
                            nchildren, children, nrels, rels,
                            nmsgs, invalMessages,
                            RelcacheInitFileInval, forceSyncCommit,
                            MyXactFlags,
                            InvalidTransactionId, NULL  );
        if (replorigin)
            
          //为该复制源向前移动LSNs
            replorigin_session_advance(replorigin_session_origin_lsn,
                                       XactLastRecEnd);
        
        if (!replorigin || replorigin_session_origin_timestamp == 0)
            replorigin_session_origin_timestamp = xactStopTimestamp;
        TransactionTreeSetCommitTsData(xid, nchildren, children,
                                       replorigin_session_origin_timestamp,
                                       replorigin_session_origin, false);
    }
    
    if ((wrote_xlog && markXidCommitted &&
         synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
        forceSyncCommit || nrels > 0)
    {
        XLogFlush(XactLastRecEnd);
        
        if (markXidCommitted)
            TransactionIdCommitTree(xid, nchildren, children);
    }
    else
    {
        //异步提交
        
        XLogSetAsyncXactLSN(XactLastRecEnd);
        
        if (markXidCommitted)
            TransactionIdAsyncCommitTree(xid, nchildren, children, XactLastRecEnd);
    }
    
    if (markXidCommitted)
    {
        MyPgXact->delayChkpt = false;
        END_CRIT_SECTION();
    }
    
    //如持有子XIDs,计算最后的latestXid
    latestXid = TransactionIdLatest(xid, nchildren, children);
    
    if (wrote_xlog && markXidCommitted)
        SyncRepWaitForLSN(XactLastRecEnd, true);
    
    //记录最后commit记录的位置
    XactLastCommitEnd = XactLastRecEnd;
    
    //重置XactLastRecEnd直至下个事务写入数据.
    XactLastRecEnd = 0;
cleanup:
    
    //清除本地数据
    if (rels)
        pfree(rels);
    //返回XID
    return latestXid;
}

三、跟踪分析

插入数据,执行commit



10:57:56 (xdb@[local]:5432)testdb=# begin;
BEGIN
10:57:59 (xdb@[local]:5432)testdb=#* insert into t_session1 values(1);
INSERT 0 1
10:58:01 (xdb@[local]:5432)testdb=#* commit;

启动gdb,设置断点



(gdb) b RecordTransactionCommit
Breakpoint 2 at 0x547528: file xact.c, line 1141.
(gdb) c
Continuing.
Breakpoint 2, RecordTransactionCommit () at xact.c:1141
1141        TransactionId xid = GetTopTransactionIdIfAny();
(gdb)

查看调用栈



(gdb) bt
#0  RecordTransactionCommit () at xact.c:1141
#1  0x00000000005483f2 in CommitTransaction () at xact.c:2070
#2  0x0000000000549078 in CommitTransactionCommand () at xact.c:2831
#3  0x00000000008c8ea9 in finish_xact_command () at postgres.c:2523
#4  0x00000000008c6b5d in exec_simple_query (query_string=0x2c97ec8 "commit;") at postgres.c:1170
#5  0x00000000008cae70 in PostgresMain (argc=1, argv=0x2cc3Dc8, dbname=0x2cc3c30 "testdb", username=0x2c94ba8 "xdb")
    at postgres.c:4182
#6  0x000000000082642b in BackendRun (port=0x2cb9c00) at postmaster.c:4361
#7  0x0000000000825b8f in BackendStartup (port=0x2cb9c00) at postmaster.c:4033
#8  0x0000000000821f1c in ServerLoop () at postmaster.c:1706
#9  0x00000000008217b4 in PostmasterMain (argc=1, argv=0x2c92b60) at postmaster.c:1379
#10 0x00000000007488ef in main (argc=1, argv=0x2c92b60) at main.c:228
(gdb)

获取事务ID



(gdb) p xid
$3 = 2411
(gdb)

设置其他变量,markXidCommitted —> True



(gdb) n
1143        TransactionId latestXid = InvalidTransactionId;
(gdb) 
1148        int         nmsgs = 0;
(gdb) 
1149        SharedInvalidationMessage *invalMessages = NULL;
(gdb) 
1150        bool        RelcacheInitFileInval = false;
(gdb) 
1154        nrels = smgrGetPendingDeletes(true, &rels);
(gdb) 
1155        nchildren = xactGetCommittedChildren(&children);
(gdb) 
1156        if (XLogStandbyInfoActive())
(gdb) 
1159        wrote_xlog = (XactLastRecEnd != 0);
(gdb) 
1165        if (!markXidCommitted)
(gdb) p latestXid
$4 = 0
(gdb) p markXidCommitted
$5 = true
(gdb) p nrels
$6 = 0
(gdb) p nchildren
$7 = 0
(gdb) p wrote_xlog
$8 = true
(gdb)

markXidCommitted为T,进入相应的处理逻辑.
开始进入提交关键部分并插入commit XLOG记录。



(gdb) n
1214            replorigin = (replorigin_session_origin != InvalidRepOriginId &&
(gdb) 
1221            BufmgrCommit();
(gdb) p replorigin
$9 = false
(gdb)

进入提交部分,设置当前事务时间戳



(gdb) n
1240            START_CRIT_SECTION();
(gdb) 
1241            MyPgXact->delayChkpt = true;
(gdb) 
1243            SetCurrentTransactionStopTimestamp();
(gdb) p *MyPgXact
$10 = {xid = 2411, xmin = 0, vacuumFlags = 0 '\000', overflowed = false, delayChkpt = true, nxids = 0 '\000'}
(gdb)

插入XLOG



(gdb) n
1245            XactLogCommitRecord(xactStopTimestamp,
(gdb) 
1252            if (replorigin)
(gdb)

设置提交事务数据



(gdb) 
1267            if (!replorigin || replorigin_session_origin_timestamp == 0)
(gdb) 
1268                replorigin_session_origin_timestamp = xactStopTimestamp;
(gdb) 
1270            TransactionTreeSetCommitTsData(xid, nchildren, children,
(gdb) 
1300        if ((wrote_xlog && markXidCommitted &&
(gdb)

同步刷新XLOG



(gdb) 
1301             synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
(gdb) 
1300        if ((wrote_xlog && markXidCommitted &&
(gdb) 
1304            XLogFlush(XactLastRecEnd);
(gdb) 
1309            if (markXidCommitted)
(gdb)

更新CLOG,如果我们在上面已写入了COMMIT WAL Record.



(gdb) 
1310                TransactionIdCommitTree(xid, nchildren, children);
(gdb) 
1309            if (markXidCommitted)
(gdb)

退出提交关键区域



(gdb) 
1340        if (markXidCommitted)
(gdb) 
1342            MyPgXact->delayChkpt = false;
(gdb) 
1343            END_CRIT_SECTION();
(gdb)

计算最后的latestXid



(gdb) 
1347        latestXid = TransactionIdLatest(xid, nchildren, children);
(gdb) n
1358        if (wrote_xlog && markXidCommitted)
(gdb) p latestXid
$11 = 2411
(gdb)

记录最后commit记录的位置



(gdb) n
1359            SyncRepWaitForLSN(XactLastRecEnd, true);
(gdb) 
1362        XactLastCommitEnd = XactLastRecEnd;
(gdb) 
1365        XactLastRecEnd = 0;
(gdb) 
1368        if (rels)
(gdb) 
1371        return latestXid;
(gdb) p XactLastCommitEnd
$12 = 5522364896
(gdb)

返回,完成调用



(gdb) n
1372    }
(gdb) 
CommitTransaction () at xact.c:2087
2087        TRACE_POSTGREsql_TRANSACTION_COMMIT(MyProc->lxid);
(gdb)

DONE!

四、参考资料

How Postgres Makes Transactions Atomic
PG Source Code

您可能感兴趣的文档:

--结束END--

本文标题: PostgreSQL 源码解读(123)- MVCC#8(提交事务-实际提交过程)

本文链接: https://lsjlt.com/news/49124.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作