返回顶部
首页 > 资讯 > 数据库 >PostgreSQL 源码解读(4)- 插入数据#3(heap_insert)
  • 776
分享到

PostgreSQL 源码解读(4)- 插入数据#3(heap_insert)

2024-04-02 19:04:59 776人浏览 薄情痞子
摘要

本文简单介绍了PG插入数据部分的源码,这是第三部分,主要内容包括heap_insert函数的实现逻辑,该函数在源文件heapam.c中。 一、基础信息 heap_insert使用的

本文简单介绍了PG插入数据部分的源码,这是第三部分,主要内容包括heap_insert函数的实现逻辑,该函数在源文件heapam.c中。

一、基础信息

heap_insert使用的数据结构、宏定义以及依赖的函数等。
数据结构/宏定义

1、CommandId
32bit无符号整型
/* Identifies a command within a transaction (32-bit unsigned); heap_insert stores it as the new tuple's cmin. */
typedef uint32 CommandId;

2、options
整型,标记bits
 
 /* Bit flags for heap_insert()'s `options` argument; they may be OR-ed together. */
 #define HEAP_INSERT_SKIP_WAL    0x0001 /* do not write a WAL record for the insert */
 #define HEAP_INSERT_SKIP_FSM    0x0002 /* do not consult the free space map; extend the relation instead */
 #define HEAP_INSERT_FROZEN      0x0004 /* mark the new tuple's xmin as frozen immediately */
 #define HEAP_INSERT_SPECULATIVE 0x0008 /* speculative insertion (flagged in the WAL record) */

3、BulkInsertState
批量插入状态指针
  
 /*
  * State carried across a series of bulk inserts (used by ReadBufferBI):
  * the buffer access strategy used for reads and the buffer currently kept
  * pinned as the insert target.
  */
 typedef struct BulkInsertStateData
 {
     BufferAccessStrategy strategy;  /* strategy passed to ReadBufferExtended */
     Buffer      current_buf;    /* buffer pinned by this state, or InvalidBuffer */
 }     BulkInsertStateData;
 
typedef struct BulkInsertStateData *BulkInsertState;

4、TransactionId
32bit无符号整型
 /* 32-bit unsigned transaction identifiers; heap_insert stores the current TransactionId as the tuple's xmin. */
 typedef uint32 TransactionId;
 typedef uint32 LocalTransactionId;
 typedef uint32 SubTransactionId;

5、xl_heap_insert
 /*
  * Main data of an XLOG_HEAP_INSERT WAL record (heap_insert registers it
  * with XLogRegisterData).
  */
 typedef struct xl_heap_insert
 {
     OffsetNumber offnum;        /* line-pointer offset of the inserted tuple */
     uint8       flags;          /* XLH_INSERT_* flag bits */
 
     /* xl_heap_header and the tuple data are registered separately as block 0 data */
 } xl_heap_insert;
 
 /*
  * Number of bytes of xl_heap_insert written to WAL: up to and including
  * `flags`, so trailing struct padding is not logged.  This is the name
  * heap_insert uses.
  */
 #define SizeOfHeapInsert    (offsetof(xl_heap_insert, flags) + sizeof(uint8))
 /* Previous (misspelled) name, kept so existing references still compile. */
 #define SizeOfHeapinsert    SizeOfHeapInsert

6、xl_heap_header
 /*
  * Tuple-header fields that heap_insert copies into the WAL record.  The
  * tuple bytes after the fixed SizeofHeapTupleHeader are logged separately,
  * so the rest of the fixed header is not stored.
  */
 typedef struct xl_heap_header
 {
     uint16      t_infomask2;
     uint16      t_infomask;
     uint8       t_hoff;
 } xl_heap_header;
 
 /* Logged size: up to and including t_hoff, excluding any trailing padding. */
 #define SizeOfHeapHeader    (offsetof(xl_heap_header, t_hoff) + sizeof(uint8))

7、XLogRecPtr
64bit无符号长整型
 /* 64-bit WAL position (LSN); XLogInsert returns one and PageSetLSN stores it in the page header. */
 typedef uint64 XLogRecPtr;

依赖的函数
1、heap_prepare_insert


/*
 * heap_prepare_insert - prepare `tup` for insertion into `relation`.
 *
 * Stamps the tuple header with transaction information: transaction bits
 * reset, xmax invalid, xmin = xid (optionally frozen), cmin = cid.  Assigns
 * an OID if the relation has OIDs and the tuple has none, and records the
 * owning table's OID in t_tableOid.
 *
 * Returns `tup` itself or, for plain tables and materialized views whose
 * tuple has external attributes or exceeds TOAST_TUPLE_THRESHOLD, the result
 * of toast_insert_or_update().  heap_insert frees a returned tuple that
 * differs from `tup`.
 */
static HeapTuple
heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
                    CommandId cid, int options)
{
    /* Inserting from a parallel worker is not allowed. */
    if (IsParallelWorker())
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                 errmsg("cannot insert tuples in a parallel worker")));

    /* OID handling for relations created WITH OIDS. */
    if (relation->rd_rel->relhasoids)
    {
#ifdef NOT_USED
        /* Compiled out: would check that the tuple has room for an OID. */
        Assert(tup->t_data->t_infomask & HEAP_HASOID);
#endif

        /* Keep a caller-supplied OID; otherwise allocate a new one. */
        if (!OidIsValid(HeapTupleGetOid(tup)))
            HeapTupleSetOid(tup, GetNewOid(relation));
    }
    else
    {
        /* A relation without OIDs must not receive a tuple carrying one. */
        Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
    }

    /* Reset transaction-status bits of t_infomask (HEAP_XACT_MASK = 0xFFF0). */
    tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
    /* Reset transaction-related bits of t_infomask2 (HEAP2_XACT_MASK = 0xE000). */
    tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
    /* Newly inserted tuple: xmax is not valid. */
    tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
    /* xmin = the inserting transaction. */
    HeapTupleHeaderSetXmin(tup->t_data, xid);
    /* HEAP_INSERT_FROZEN: the caller asked for xmin to be marked frozen right away. */
    if (options & HEAP_INSERT_FROZEN)
        HeapTupleHeaderSetXminFrozen(tup->t_data);
    /* cmin = id of the inserting command. */
    HeapTupleHeaderSetCmin(tup->t_data, cid);
    /* xmax = 0. */
    HeapTupleHeaderSetXmax(tup->t_data, 0); 
    /* Record the OID of the table this tuple belongs to. */
    tup->t_tableOid = RelationGetRelid(relation);

    /*
     * Only plain tables and materialized views are toasted here.  Any other
     * relkind gets the tuple back unchanged.
     */
    if (relation->rd_rel->relkind != RELKIND_RELATION &&
        relation->rd_rel->relkind != RELKIND_MATVIEW)
    {
        /* Such tuples must not contain external (toasted) attributes. */
        Assert(!HeapTupleHasExternal(tup));
        return tup;
    }
    /* External attributes or above TOAST_TUPLE_THRESHOLD: let the toaster build the tuple to store. */
    else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
        return toast_insert_or_update(relation, tup, NULL, options);
    else
        return tup;
}

2、RelationGetBufferForTuple
稍长,请耐心阅读,如能读懂,必有收获


/*
 * RelationGetBufferForTuple
 *      Return a pinned, exclusive-locked buffer of `relation` whose page has
 *      room for MAXALIGN(len) bytes plus the free space the relation's
 *      fillfactor asks to keep.
 *
 * otherBuffer:    for UPDATE, the caller-pinned buffer holding the old tuple.
 *                 It is also returned exclusive-locked.  Pass InvalidBuffer
 *                 for a plain insert.  It must not be combined with bistate.
 * options:        HEAP_INSERT_SKIP_FSM disables the free space map search.
 * bistate:        bulk-insert state, or NULL.
 * vmbuffer, vmbuffer_other:
 *                 receive visibility-map page pins for the target page and
 *                 for otherBuffer's page (visibilitymap_pin /
 *                 GetVisibilityMapPins).
 *
 * Candidate pages, in order:
 *   1. the bulk-insert buffer or the relation's cached target block;
 *   2. pages suggested by the FSM, falling back to the relation's last page;
 *   3. a newly added page.
 * When two buffers must be locked, the lower block number is always locked
 * first, so every backend takes the pair in the same order.
 *
 * Raises an error if len exceeds MaxHeapTupleSize.
 */
Buffer
RelationGetBufferForTuple(Relation relation, Size len,
                          Buffer otherBuffer, int options,
                          BulkInsertState bistate,
                          Buffer *vmbuffer, Buffer *vmbuffer_other)
{
    bool        use_fsm = !(options & HEAP_INSERT_SKIP_FSM);   /* may we search the FSM? */
    Buffer      buffer = InvalidBuffer;
    Page        page;
    Size        pageFreeSpace = 0,  /* free space on the candidate page */
                saveFreeSpace = 0;  /* space fillfactor wants left free */
    BlockNumber targetBlock,        /* candidate block */
                otherBlock;         /* block of otherBuffer, if any */
    bool        needLock;           /* must we take the relation extension lock? */

    len = MAXALIGN(len);        /* account for alignment padding */

    /* An UPDATE's otherBuffer cannot be combined with bulk-insert state. */
    Assert(otherBuffer == InvalidBuffer || !bistate);

    /* Reject tuples that can never fit on a page. */
    if (len > MaxHeapTupleSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("row is too big: size %zu, maximum size %zu",
                        len, MaxHeapTupleSize)));

    /* Space the relation's fillfactor asks us to leave free on each page. */
    saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
                                                   HEAP_DEFAULT_FILLFACTOR);

    /* UPDATE: remember the old tuple's block for lock ordering. */
    if (otherBuffer != InvalidBuffer)
        otherBlock = BufferGetBlockNumber(otherBuffer);
    else
        otherBlock = InvalidBlockNumber;

    /* Choose the first candidate block. */
    if (len + saveFreeSpace > MaxHeapTupleSize)
    {
        /*
         * Not even an empty page can honour fillfactor for this tuple, so
         * skip the FSM and go straight to extending the relation.
         */
        targetBlock = InvalidBlockNumber;
        use_fsm = false;
    }
    else if (bistate && bistate->current_buf != InvalidBuffer)
        targetBlock = BufferGetBlockNumber(bistate->current_buf);  /* bulk insert: reuse current page */
    else
        targetBlock = RelationGetTargetBlock(relation);  /* block cached by an earlier insert */

    if (targetBlock == InvalidBlockNumber && use_fsm)
    {
        /* No cached target: ask the FSM for a page with len + saveFreeSpace free. */
        targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);

        /*
         * If the FSM has nothing, try the relation's last page (if any)
         * before resorting to extension.
         */
        if (targetBlock == InvalidBlockNumber)
        {
            BlockNumber nblocks = RelationGetNumberOfBlocks(relation);

            if (nblocks > 0)
                targetBlock = nblocks - 1;
        }
    }

loop:
    while (targetBlock != InvalidBlockNumber)
    {
        /*
         * Pin and exclusive-lock the target block, plus otherBuffer if given,
         * locking the lower-numbered block first.  If the heap page looks
         * all-visible, pin the matching visibility-map page before taking the
         * buffer lock.
         */
        if (otherBuffer == InvalidBuffer)
        {
            /* Plain insert: only one buffer is involved. */
            buffer = ReadBufferBI(relation, targetBlock, bistate);
            if (PageIsAllVisible(BufferGetPage(buffer)))
                visibilitymap_pin(relation, targetBlock, vmbuffer);
            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
        }
        else if (otherBlock == targetBlock)
        {
            /* UPDATE onto the old tuple's own page: a single lock suffices. */
            buffer = otherBuffer;
            if (PageIsAllVisible(BufferGetPage(buffer)))
                visibilitymap_pin(relation, targetBlock, vmbuffer);
            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
        }
        else if (otherBlock < targetBlock)
        {
            /* UPDATE, old tuple's page is lower: lock it first. */
            buffer = ReadBuffer(relation, targetBlock);
            if (PageIsAllVisible(BufferGetPage(buffer)))
                visibilitymap_pin(relation, targetBlock, vmbuffer);
            LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
        }
        else
        {
            /* UPDATE, target page is lower: lock it first. */
            buffer = ReadBuffer(relation, targetBlock);
            if (PageIsAllVisible(BufferGetPage(buffer)))
                visibilitymap_pin(relation, targetBlock, vmbuffer);
            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
            LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
        }

        /*
         * The all-visible checks above ran before the locks were taken.
         * GetVisibilityMapPins makes sure the needed VM pins are held now.
         * Buffers are passed in ascending buffer-number order.
         */
        if (otherBuffer == InvalidBuffer || buffer <= otherBuffer)
            GetVisibilityMapPins(relation, buffer, otherBuffer,
                                 targetBlock, otherBlock, vmbuffer,
                                 vmbuffer_other);
        else
            GetVisibilityMapPins(relation, otherBuffer, buffer,
                                 otherBlock, targetBlock, vmbuffer_other,
                                 vmbuffer);

        /* Enough room on this page?  Then we are done. */
        page = BufferGetPage(buffer);
        pageFreeSpace = PageGetHeapFreeSpace(page);
        if (len + saveFreeSpace <= pageFreeSpace)
        {
            /* Also make it the target of future inserts. */
            RelationSetTargetBlock(relation, targetBlock);
            return buffer;
        }

        /*
         * Not enough space: drop our lock(s).  Also drop the pin on the
         * target page, unless it is otherBuffer, whose pin the caller owns.
         */
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        if (otherBuffer == InvalidBuffer)
            ReleaseBuffer(buffer);
        else if (otherBlock != targetBlock)
        {
            LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
            ReleaseBuffer(buffer);
        }

        /* Without the FSM there is nothing else to try: go extend. */
        if (!use_fsm)
            break;

        /*
         * Report this page's real free space to the FSM and ask it for
         * another candidate.  InvalidBlockNumber terminates the loop.
         */
        targetBlock = RecordAndGetPageWithFreeSpace(relation,
                                                    targetBlock,
                                                    pageFreeSpace,
                                                    len + saveFreeSpace);
    }

    /* No existing page will do: extend the relation. */

    /*
     * Backend-local relations (RELATION_IS_LOCAL, e.g. temporary tables or
     * tables created in this transaction) need no extension lock.
     */
    needLock = !RELATION_IS_LOCAL(relation);

    if (needLock)
    {
        if (!use_fsm)
            LockRelationForExtension(relation, ExclusiveLock);
        else if (!ConditionalLockRelationForExtension(relation, ExclusiveLock))
        {
            /* Could not get the lock immediately; wait for it. */
            LockRelationForExtension(relation, ExclusiveLock);

            /*
             * While we waited, another backend may have extended the
             * relation and recorded free space; ask the FSM again.
             */
            targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);

            /* Space appeared: release the lock and retry the candidate loop. */
            if (targetBlock != InvalidBlockNumber)
            {
                UnlockRelationForExtension(relation, ExclusiveLock);
                goto loop;
            }

            /* The lock was contended: bulk-extend via RelationAddExtraBlocks. */
            RelationAddExtraBlocks(relation, bistate);
        }
    }

    /* In every case, add one new page for our own tuple. */
    buffer = ReadBufferBI(relation, P_NEW, bistate);

    /*
     * otherBuffer's block is necessarily lower than the page just added, so
     * locking it first keeps the lower-block-first ordering.
     */
    if (otherBuffer != InvalidBuffer)
        LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);

    /* Now lock the new page. */
    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

    /* The new page is locked; release the extension lock. */
    if (needLock)
        UnlockRelationForExtension(relation, ExclusiveLock);

    page = BufferGetPage(buffer);

    /* A freshly added page should be uninitialized; never overwrite anything else. */
    if (!PageIsNew(page))
        elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
             BufferGetBlockNumber(buffer),
             RelationGetRelationName(relation));

    PageInit(page, BufferGetPageSize(buffer), 0);

    if (len > PageGetHeapFreeSpace(page))
    {
        /* Should be impossible given the MaxHeapTupleSize check at the top. */
        elog(PANIC, "tuple is too big: size %zu", len);
    }

    /* Remember the new page as the target for future inserts. */
    RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));
    return buffer;
}

//-------------------------------------------------------------------------------

/*
 * ReadBufferBI
 *      ReadBuffer variant that honours a bulk-insert state.
 *
 * With bistate == NULL this is exactly ReadBuffer(relation, targetBlock).
 * Otherwise the block is read with bistate->strategy, and bistate keeps an
 * extra pin of its own on the returned buffer (current_buf).  A later call
 * for the same block can then return it without another read.  In every case
 * the caller receives one pin that it must release itself.
 */
static Buffer
ReadBufferBI(Relation relation, BlockNumber targetBlock,
             BulkInsertState bistate)
{
    Buffer      buffer;

    /* Not a bulk insert: exactly like ReadBuffer. */
    if (!bistate)
        return ReadBuffer(relation, targetBlock);

    /* Bulk-insert mode from here on. */

    /* Requested block already pinned by bistate: add a pin for the caller and return it. */
    if (bistate->current_buf != InvalidBuffer)
    {
        if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
        {
            IncrBufferRefCount(bistate->current_buf);
            return bistate->current_buf;
        }
        /* Different block: drop bistate's pin on the previous buffer. */
        ReleaseBuffer(bistate->current_buf);
        bistate->current_buf = InvalidBuffer;
    }

    /* Read the block through the bulk-insert buffer access strategy. */
    buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
                                RBM_NORMAL, bistate->strategy);

    /* Take an extra pin for bistate and remember the buffer for next time. */
    IncrBufferRefCount(buffer);
    bistate->current_buf = buffer;

    return buffer;
}

 
 /*
  * ReadBuffer -- shorthand for ReadBufferExtended on the main fork, in
  * RBM_NORMAL mode, with no special buffer access strategy.
  */
 Buffer
 ReadBuffer(Relation reln, BlockNumber blockNum)
 {
     const ForkNumber fork = MAIN_FORKNUM;
     BufferAccessStrategy no_strategy = NULL;
     Buffer      result;
 
     result = ReadBufferExtended(reln, fork, blockNum, RBM_NORMAL, no_strategy);
     return result;
 }

 /*
  * Selects which physical fork (separate file) of a relation is accessed.
  * ReadBuffer always uses MAIN_FORKNUM.
  */
 typedef enum ForkNumber
 {
     InvalidForkNumber = -1,
     MAIN_FORKNUM = 0,       /* the relation's main data */
     FSM_FORKNUM,            /* free space map */
     VISIBILITYMAP_FORKNUM,  /* visibility map */
     INIT_FORKNUM            /* init fork (see the storage layout page below) */
 
     
 } ForkNumber; 
// Reference: https://www.postgresql.org/docs/11/static/storage-file-layout.html

 
 /*
  * ReadBufferExtended -- obtain a buffer for block blockNum of fork forkNum
  * of reln, using the given read mode and buffer access strategy.
  *
  * Opens the relation's storage-manager handle if needed.  Refuses to touch
  * other sessions' temporary tables.  Counts a buffer read and, when the page
  * was already present, a buffer hit in the relation's statistics.
  */
 Buffer
 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
                    ReadBufferMode mode, BufferAccessStrategy strategy)
 {
     Buffer      result;
     bool        found_in_pool;
 
     /* Make sure reln->rd_smgr is open before it is used below. */
     RelationOpenSmgr(reln);
 
     /* Temporary tables belonging to other sessions are off limits. */
     if (RELATION_IS_OTHER_TEMP(reln))
         ereport(ERROR,
                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                  errmsg("cannot access temporary tables of other sessions")));
 
     pgstat_count_buffer_read(reln);
 
     result = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
                                forkNum, blockNum, mode, strategy,
                                &found_in_pool);
 
     /* Record a hit only when ReadBuffer_common reported one. */
     if (found_in_pool)
         pgstat_count_buffer_hit(reln);
 
     return result;
 }
 

3、CheckForSerializableConflictIn
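在SERIALIZABLE隔离级别下,检查本次写操作是否与其他可串行化事务持有的谓词锁(predicate lock)产生读写冲突,依次检查元组、页面和整个表三个粒度。heap_insert调用时传入的tuple为NULL、buffer为InvalidBuffer,因此只做表级检查。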


 /*
  * CheckForSerializableConflictIn
  *      Called before writing to `relation`.  Checks predicate locks held by
  *      other serializable transactions for a read/write conflict, at three
  *      levels in turn:
  *        - the tuple (when `tuple` is non-NULL);
  *        - the page (when `buffer` is valid);
  *        - the whole relation (always).
  *
  * Returns at once when SerializationNeededForWrite() says no check is
  * required.  Raises a serialization failure if the current serializable
  * transaction is already doomed; otherwise sets MyXactDidWrite.
  */
 void
 CheckForSerializableConflictIn(Relation relation, HeapTuple tuple,
                                Buffer buffer)
 {
     PREDICATELOCKTARGETTAG tag;
 
     if (!SerializationNeededForWrite(relation))
         return;                 /* nothing to check for this write */
 
     /* Our own transaction was already marked to fail: report it now. */
     if (SxactIsDoomed(MySerializableXact))
         ereport(ERROR,
                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                  errmsg("could not serialize access due to read/write dependencies among transactions"),
                  errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict in checking."),
                  errhint("The transaction might succeed if retried.")));
 
     /* Remember that this transaction has written something. */
     MyXactDidWrite = true;
 
     /* Tuple-level target, when the caller names a specific tuple. */
     if (tuple)
     {
         SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
                                          relation->rd_node.dbNode,
                                          relation->rd_id,
                                          ItemPointerGetBlockNumber(&tuple->t_self),
                                          ItemPointerGetOffsetNumber(&tuple->t_self));
         CheckTargetForConflictsIn(&tag);
     }
 
     /* Page-level target, when a buffer is supplied. */
     if (BufferIsValid(buffer))
     {
         SET_PREDICATELOCKTARGETTAG_PAGE(tag,
                                         relation->rd_node.dbNode,
                                         relation->rd_id,
                                         BufferGetBlockNumber(buffer));
         CheckTargetForConflictsIn(&tag);
     }
 
     /* Relation-level target is always checked. */
     SET_PREDICATELOCKTARGETTAG_RELATION(tag,
                                         relation->rd_node.dbNode,
                                         relation->rd_id);
     CheckTargetForConflictsIn(&tag);
 }

4、START_CRIT_SECTION

/*
 * Critical-section counter.  START_CRIT_SECTION() just increments it.
 * heap_insert wraps the page modification and its WAL record between
 * START_CRIT_SECTION() and END_CRIT_SECTION().
 */
extern PGDLLIMPORT volatile uint32 CritSectionCount; 
#define START_CRIT_SECTION()  (CritSectionCount++)

5、PageIsAllVisible

通过位操作判断Page是否All Visible
/* Nonzero when PD_ALL_VISIBLE is set in the page header's pd_flags (yields the masked bit, not 0/1). */
#define PageIsAllVisible(page) \
     (((PageHeader) (page))->pd_flags & PD_ALL_VISIBLE)

6、PageClearAllVisible

通过位操作清除All Visible标记
 /* Clear PD_ALL_VISIBLE in the page header's pd_flags; the visibility-map bit must be cleared separately (visibilitymap_clear). */
 #define PageClearAllVisible(page) \
  (((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)

7、visibilitymap_clear

/*
 * visibilitymap_clear - clear the `flags` bits of heap block heapBlk in the
 * visibility map.
 *
 * `buf` must be a valid buffer holding the visibility-map page that covers
 * heapBlk; otherwise an error is raised.  heap_insert passes the vmbuffer
 * pinned for it by RelationGetBufferForTuple.  The VM page is exclusive-locked
 * only while the bits are tested and cleared, and is marked dirty only if
 * something actually changed.
 *
 * Returns true if any of the requested bits were set (and are now cleared).
 */
 
 bool
 visibilitymap_clear(Relation rel, BlockNumber heapBlk, Buffer buf, uint8 flags)
 {
     /* Where heapBlk's bits live: VM page, byte within that page, bit offset within the byte. */
     BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
     int         mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
     int         mapOffset = HEAPBLK_TO_OFFSET(heapBlk);
     uint8       mask = flags << mapOffset;
     char       *map;
     bool        cleared = false;
 
     /* At least one valid visibility-map bit must be requested. */
     Assert(flags & VISIBILITYMAP_VALID_BITS);
 
 #ifdef TRACE_VISIBILITYMAP
     elog(DEBUG1, "vm_clear %s %d", RelationGetRelationName(rel), heapBlk);
 #endif
      if (!BufferIsValid(buf) || BufferGetBlockNumber(buf) != mapBlock)
         elog(ERROR, "wrong buffer passed to visibilitymap_clear");
 
     LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
     map = PageGetContents(BufferGetPage(buf));
 
     /* Modify and dirty the page only if a requested bit is actually set. */
     if (map[mapByte] & mask)
     {
         map[mapByte] &= ~mask;
          MarkBufferDirty(buf);
         cleared = true;
     }
      LockBuffer(buf, BUFFER_LOCK_UNLOCK);
      return cleared;
 }

8、MarkBufferDirty

/*
 * MarkBufferDirty - mark `buffer` dirty (its page must later be written back
 * to the data file).
 *
 * The caller must hold a pin and the exclusive content lock; both are
 * asserted for shared buffers.  Local buffers are handed to
 * MarkLocalBufferDirty.  For shared buffers, BM_DIRTY | BM_JUST_DIRTIED are
 * set in the descriptor's atomic state word with a compare-and-swap retry
 * loop, first waiting if the header is locked (BM_LOCKED).
 *
 * If the buffer was not dirty before, VacuumPageDirty and
 * pgBufferUsage.shared_blks_dirtied are incremented.  When VacuumCostActive,
 * VacuumCostPageDirty is added to the vacuum cost balance.
 *
 * Errors out with "bad buffer ID" for an invalid buffer.
 */
 
 void
 MarkBufferDirty(Buffer buffer)
 {
     BufferDesc *bufHdr;
     uint32      buf_state;
     uint32      old_buf_state;
 
     if (!BufferIsValid(buffer))
         elog(ERROR, "bad buffer ID: %d", buffer);
 
     /* Local buffers are handled by MarkLocalBufferDirty. */
     if (BufferIsLocal(buffer))
     {
         MarkLocalBufferDirty(buffer);
         return;
     }
 
     /* Buffer ids are 1-based; the descriptor lookup is 0-based. */
     bufHdr = GetBufferDescriptor(buffer - 1);
 
     Assert(BufferIsPinned(buffer));
     Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
                                 LW_EXCLUSIVE));
 
     /* Set the dirty bits, retrying until the compare-and-swap succeeds. */
     old_buf_state = pg_atomic_read_u32(&bufHdr->state);
     for (;;)
     {
         /* Wait out a concurrent holder of the buffer-header lock. */
         if (old_buf_state & BM_LOCKED)
             old_buf_state = WaitBufHdrUnlocked(bufHdr);
 
         buf_state = old_buf_state;
 
         Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
         buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
 
         if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
                                            buf_state))
             break;
     }
 
     /* Count the page as newly dirtied only on a clean -> dirty transition. */
     if (!(old_buf_state & BM_DIRTY))
     {
         VacuumPageDirty++;
         pgBufferUsage.shared_blks_dirtied++;
         if (VacuumCostActive)
             VacuumCostBalance += VacuumCostPageDirty;
     }
 }

9、RelationNeedsWAL

判断是否为持久化(permanent)表:仅当relpersistence为RELPERSISTENCE_PERMANENT时为真;临时表等非持久化表不需要写WAL日志(heap_insert仅在此宏为真时记录WAL)
 
 /* True only for permanent relations (relpersistence == RELPERSISTENCE_PERMANENT); heap_insert writes WAL only when this holds. */
 #define RelationNeedsWAL(relation) \
     ((relation)->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT)

10、RelationIsAccessibleInLogicalDecoding

 
 /*
  * True when logical decoding needs to read this relation as a catalog.
  * All of the following must hold:
  *   - logical WAL information is active (XLogLogicalInfoActive);
  *   - the relation is WAL-logged (RelationNeedsWAL);
  *   - it is a system catalog, or RelationIsUsedAsCatalogTable() is true.
  * heap_insert then also logs the tuple's command ids (log_heap_new_cid).
  *
  * Each continuation backslash must be the last character on its line.  A
  * trailing "// comment" after "\" ends the macro early and leaves the
  * following lines as stray code.
  */
 #define RelationIsAccessibleInLogicalDecoding(relation) \
     (XLogLogicalInfoActive() && \
      RelationNeedsWAL(relation) && \
      (IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))

11、log_heap_new_cid

 
 /*
  * log_heap_new_cid
  *      Emit an XLOG_HEAP2_NEW_CID record describing the command ids of
  *      `tup`.  heap_insert calls this when
  *      RelationIsAccessibleInLogicalDecoding(relation) holds.
  *
  * The record carries the top-level transaction id, the relation's
  * rd_node, the tuple's TID and its cmin / cmax / combo command id.
  * Returns the LSN returned by XLogInsert.
  */
 static XLogRecPtr
 log_heap_new_cid(Relation relation, HeapTuple tup)
 {
     xl_heap_new_cid xlrec;
 
     XLogRecPtr  recptr;
     HeapTupleHeader hdr = tup->t_data;
 
     Assert(ItemPointerIsValid(&tup->t_self));
     Assert(tup->t_tableOid != InvalidOid);
 
     xlrec.top_xid = GetTopTransactionId();
     xlrec.target_node = relation->rd_node;
     xlrec.target_tid = tup->t_self;
 
     /*
      * Combo CID in the header: log the resolved cmin and cmax as well as
      * the raw combo id.
      */
     if (hdr->t_infomask & HEAP_COMBOCID)
     {
         Assert(!(hdr->t_infomask & HEAP_XMAX_INVALID));
         Assert(!HeapTupleHeaderXminInvalid(hdr));
         xlrec.cmin = HeapTupleHeaderGetCmin(hdr);
         xlrec.cmax = HeapTupleHeaderGetCmax(hdr);
         xlrec.combocid = HeapTupleHeaderGetRawCommandId(hdr);
     }
     /* No combo CID: the single raw command id is either cmin or cmax. */
     else
     {
         /* xmax invalid or lock-only: the raw id is the inserting command's cmin. */
         if (hdr->t_infomask & HEAP_XMAX_INVALID ||
             HEAP_XMAX_IS_LOCKED_ONLY(hdr->t_infomask))
         {
             xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr);
             xlrec.cmax = InvalidCommandId;
         }
         /* Otherwise the raw id belongs to the command that set xmax. */
         else
         {
             xlrec.cmin = InvalidCommandId;
             xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr);
 
         }
         xlrec.combocid = InvalidCommandId;
     }
 
     /* Only main data is registered; no buffer is attached to this record. */
     XLogBeginInsert();
     XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid);
 
     recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID);
 
     return recptr;
 }

12、RelationIsLogicallyLogged
判断数据表的变更是否需要支持逻辑解码(逻辑复制):当逻辑WAL信息已开启、表需要写WAL且不是系统表时为真;此时heap_insert会在WAL记录中写入完整的新元组,供后续的日志解析使用

 
 /*
  * True when the relation's changes must carry enough data for logical
  * decoding.  All of the following must hold:
  *   - logical WAL information is active;
  *   - the relation is WAL-logged;
  *   - it is not a system catalog.
  * heap_insert then keeps the new tuple in the WAL record
  * (XLH_INSERT_CONTAINS_NEW_TUPLE, REGBUF_KEEP_DATA).
  */
 #define RelationIsLogicallyLogged(relation) \
     (XLogLogicalInfoActive() && \
      RelationNeedsWAL(relation) && \
      !IsCatalogRelation(relation))
 

13、XLog*

XLogBeginInsert
XLogRegisterData
XLogRegisterBuffer
XLogRegisterBufData
XLogSetRecordFlags
XLogInsert

14、PageSetLSN
设置PageHeader的LSN(先前已解析)

 /* Store WAL position `lsn` in the page header's pd_lsn (heap_insert passes the LSN returned by XLogInsert). */
 #define PageSetLSN(page, lsn) \
  PageXLogRecPtrSet(((PageHeader) (page))->pd_lsn, lsn)

15、END_CRIT_SECTION

 /* Leave a critical section: assert one is open, then decrement CritSectionCount (do/while(0) so it acts as one statement). */
 #define END_CRIT_SECTION() \
 do { \
     Assert(CritSectionCount > 0); \
     CritSectionCount--; \
 } while(0)

16、UnlockReleaseBuffer
释放Buffer的内容锁,随后Unpin该Buffer(等价于LockBuffer(buffer, BUFFER_LOCK_UNLOCK)后再调用ReleaseBuffer)

 
 /*
  * UnlockReleaseBuffer -- release the content lock on `buffer`, then drop
  * the caller's pin on it (LockBuffer(..., BUFFER_LOCK_UNLOCK) followed by
  * ReleaseBuffer).
  */
 void
 UnlockReleaseBuffer(Buffer buffer)
 {
     const Buffer target = buffer;
 
     LockBuffer(target, BUFFER_LOCK_UNLOCK);     /* lock first ... */
     ReleaseBuffer(target);                      /* ... then the pin */
 }

17、ReleaseBuffer
Unpin Buffer:本地Buffer直接将LocalRefCount减一,共享Buffer则调用UnpinBuffer;不再被任何进程Pin住的Buffer才可能被换出并复用

 
 /*
  * ReleaseBuffer -- drop one pin on `buffer`.
  *
  * Shared buffers are unpinned through their descriptor.  For local buffers,
  * the resource owner forgets the buffer and the local reference count is
  * decremented.  Errors out with "bad buffer ID" for an invalid buffer.
  */
 void
 ReleaseBuffer(Buffer buffer)
 {
     if (!BufferIsValid(buffer))
         elog(ERROR, "bad buffer ID: %d", buffer);
 
     if (!BufferIsLocal(buffer))
     {
         /* Shared buffer: ids are 1-based, the descriptor lookup is 0-based. */
         UnpinBuffer(GetBufferDescriptor(buffer - 1), true);
     }
     else
     {
         /* Local buffer ids are negative; map them to a 0-based slot. */
         int         slot = -buffer - 1;
 
         ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
         Assert(LocalRefCount[slot] > 0);
         LocalRefCount[slot]--;
     }
 }

18、CacheInvalidateHeapTuple
为系统表(catalog)中被插入、更新或删除的元组登记缓存失效(invalidation)消息,使相关的catcache/relcache条目失效;对普通用户表、TOAST表以及bootstrap模式直接返回,不做处理。

 
 /*
  * CacheInvalidateHeapTuple
  *      Register the cache invalidations implied by a change to `tuple` in a
  *      system catalog.  `newtuple` is presumably the new version for an
  *      UPDATE; heap_insert passes NULL.
  *
  * Does nothing in bootstrap mode, for non-catalog relations, or for TOAST
  * relations.  Otherwise it registers either:
  *   - a snapshot invalidation, for catalogs whose changes only invalidate
  *     snapshots; or
  *   - catcache invalidations.
  * For pg_class, pg_attribute and pg_index tuples it also registers a
  * relcache invalidation of the relation the tuple describes.
  */
 void
 CacheInvalidateHeapTuple(Relation relation,
                          HeapTuple tuple,
                          HeapTuple newtuple)
 {
     Oid         tupleRelId;
     Oid         databaseId;
     Oid         relationId;
 
     /* No invalidation work while bootstrapping. */
     if (IsBootstrapProcessingMode())
         return;
 
     /* Only system catalogs feed the caches. */
     if (!IsCatalogRelation(relation))
         return;
 
     /* TOAST relations are skipped as well. */
     if (IsToastRelation(relation))
         return;
 
     /* Set up invalidation state before registering anything. */
     PrepareInvalidationState();
 
     /*
      * Snapshot-only catalogs get a snapshot invalidation, scoped to the
      * current database unless the catalog is shared (InvalidOid).  All
      * other catalogs get catcache invalidations for tuple/newtuple.
      */
     tupleRelId = RelationGetRelid(relation);
     if (RelationInvalidatesSnapshotsOnly(tupleRelId))
     {
         databaseId = IsSharedRelation(tupleRelId) ? InvalidOid : MyDatabaseId;
         RegisterSnapshotInvalidation(databaseId, tupleRelId);
     }
     else
         PrepareToInvalidateCacheTuple(relation, tuple, newtuple,
                                       RegisterCatcacheInvalidation);
 
     /*
      * pg_class, pg_attribute and pg_index rows describe some relation:
      * work out which one so its relcache entry can be invalidated.
      */
     if (tupleRelId == RelationRelationId)
     {
         Form_pg_class classtup = (Form_pg_class) GETSTRUCT(tuple);
 
         /* pg_class row: the described relation is the row's own OID. */
         relationId = HeapTupleGetOid(tuple);
         if (classtup->relisshared)
             databaseId = InvalidOid;
         else
             databaseId = MyDatabaseId;
     }
     else if (tupleRelId == AttributeRelationId)
     {
         Form_pg_attribute atttup = (Form_pg_attribute) GETSTRUCT(tuple);
 
         relationId = atttup->attrelid;
 
         /* Always scoped to MyDatabaseId, even for attributes of shared relations. */
         databaseId = MyDatabaseId;
     }
     else if (tupleRelId == IndexRelationId)
     {
         Form_pg_index indextup = (Form_pg_index) GETSTRUCT(tuple);
 
         /* pg_index row: invalidate the index's own relcache entry. */
         relationId = indextup->indexrelid;
         databaseId = MyDatabaseId;
     }
     else
         return;
 
     RegisterRelcacheInvalidation(databaseId, relationId);
 }

19、heap_freetuple
释放内存

 
 /*
  * heap_freetuple -- free a HeapTuple with a single pfree() of the struct
  * pointer.  t_data is not freed separately here.
  */
 void
 heap_freetuple(HeapTuple htup)
 {
     void       *chunk = htup;
 
     pfree(chunk);
 }

二、源码解读

heap_insert函数本身不复杂,最为复杂的子函数RelationGetBufferForTuple已在上一小节解析


/*
 * heap_insert - insert tuple `tup` into `relation`.
 *
 * cid:      command id stored as the new tuple's cmin.
 * options:  HEAP_INSERT_* bits (skip WAL, skip FSM, frozen, speculative).
 * bistate:  bulk-insert state, or NULL.
 *
 * Steps:
 *   1. Prepare the tuple header (heap_prepare_insert, which may return a
 *      toasted copy).
 *   2. Get a pinned, locked buffer with enough room
 *      (RelationGetBufferForTuple).
 *   3. Check for serializable conflicts.
 *   4. Inside a critical section: place the tuple, clear the page's
 *      all-visible flag and VM bits if set, mark the buffer dirty and write
 *      the WAL record.
 *   5. Release the buffers, register catalog cache invalidation and count
 *      the insert in the statistics.
 *
 * On return tup->t_self holds the new tuple's location (copied back when a
 * separate copy was inserted).  Returns HeapTupleGetOid(tup).
 */
Oid
heap_insert(Relation relation, HeapTuple tup, CommandId cid,
            int options, BulkInsertState bistate)
{
    TransactionId xid = GetCurrentTransactionId(); /* becomes the tuple's xmin */
    HeapTuple   heaptup;    /* tuple actually stored (tup or a toasted copy) */
    Buffer      buffer;     /* heap page receiving the tuple */
    Buffer      vmbuffer = InvalidBuffer;   /* visibility-map page, if pinned */
    bool        all_visible_cleared = false;    /* did we clear PD_ALL_VISIBLE? */

    /* Stamp header fields (xmin, cmin, infomask bits) and toast if necessary. */
    heaptup = heap_prepare_insert(relation, tup, xid, cid, options);

    /* Find a pinned, exclusive-locked page with room; may also pin the VM page. */
    buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
                                       InvalidBuffer, options, bistate,
                                       &vmbuffer, NULL);

    /* Serializable-conflict check (relation level only: no tuple/buffer passed). */
    CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);

    /* Page change and its WAL record happen inside a critical section. */
    START_CRIT_SECTION();
    /* Place the tuple on the page; sets heaptup->t_self. */
    RelationPutHeapTuple(relation, buffer, heaptup,
                         (options & HEAP_INSERT_SPECULATIVE) != 0);
    /* The page is no longer all-visible: clear both the page flag and the VM bits. */
    if (PageIsAllVisible(BufferGetPage(buffer)))
    {
        all_visible_cleared = true;
        PageClearAllVisible(BufferGetPage(buffer));
        visibilitymap_clear(relation,
                            ItemPointerGetBlockNumber(&(heaptup->t_self)),
                            vmbuffer, VISIBILITYMAP_VALID_BITS);
    }

    /* The page was modified. */
    MarkBufferDirty(buffer);

    /* WAL-log the insert unless skipped or the relation is not WAL-logged. */
    if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
    {
        xl_heap_insert xlrec;
        xl_heap_header xlhdr;
        XLogRecPtr  recptr;
        Page        page = BufferGetPage(buffer);
        uint8       info = XLOG_HEAP_INSERT;
        int         bufflags = 0;

        /* Catalog visible to logical decoding: also log the command ids. */
        if (RelationIsAccessibleInLogicalDecoding(relation))
            log_heap_new_cid(relation, heaptup);

        /*
         * The tuple is the first and only one on the page: mark the record so
         * the page can be re-initialized (XLOG_HEAP_INIT_PAGE, REGBUF_WILL_INIT).
         */
        if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
            PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
        {
            info |= XLOG_HEAP_INIT_PAGE;
            bufflags |= REGBUF_WILL_INIT;
        }

        xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
        xlrec.flags = 0;
        if (all_visible_cleared)
            xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED;
        if (options & HEAP_INSERT_SPECULATIVE)
            xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
        Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));

        /* Logical decoding needs the full new tuple kept in the record. */
        if (RelationIsLogicallyLogged(relation))
        {
            xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
            bufflags |= REGBUF_KEEP_DATA;
        }

        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);

        xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
        xlhdr.t_infomask = heaptup->t_data->t_infomask;
        xlhdr.t_hoff = heaptup->t_data->t_hoff;

        /* Block 0 data: the short header, then the tuple bytes after the fixed header. */
        XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
        XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
        
        XLogRegisterBufData(0,
                            (char *) heaptup->t_data + SizeofHeapTupleHeader,
                            heaptup->t_len - SizeofHeapTupleHeader);

        /* Include the replication origin in the record. */
        XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

        recptr = XLogInsert(RM_HEAP_ID, info);

        PageSetLSN(page, recptr);
    }
    /* End of the critical section. */
    END_CRIT_SECTION();
    /* Unlock and unpin the heap buffer; unpin the VM buffer if one was pinned. */
    UnlockReleaseBuffer(buffer);
    if (vmbuffer != InvalidBuffer)
        ReleaseBuffer(vmbuffer);

    /* Catalog tuple: register cache invalidation (no-op for ordinary tables). */
    CacheInvalidateHeapTuple(relation, heaptup, NULL);

    /* Count the insert in the relation's statistics. */
    pgstat_count_heap_insert(relation, 1);

    /* heaptup is a separate (toasted) copy: copy t_self back to the caller's tuple and free the copy. */
    if (heaptup != tup)
    {
        tup->t_self = heaptup->t_self;
        heap_freetuple(heaptup);
    }

    return HeapTupleGetOid(tup);
}

三、跟踪分析

插入一条记录,使用gdb进行跟踪分析:

-- 这次启动事务
testdb=# begin;
BEGIN
testdb=# select pg_backend_pid();
 pg_backend_pid 
----------------
           1556
(1 row)
testdb=# insert into t_insert values(11,'11','11','11');
(挂起)

启动gdb:

[root@localhost ~]# gdb -p 1556
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
...
(gdb) b heap_insert
Breakpoint 1 at 0x4c343c: file heapam.c, line 2444.
#输入参数:
(gdb) p *relation
$1 = {rd_node = {spcNode = 1663, dbNode = 16477, relNode = 26731}, rd_smgr = 0x0, rd_refcnt = 1, rd_backend = -1, rd_islocaltemp = false, rd_isnailed = false, rd_isvalid = true, 
  rd_indexvalid = 0 '\000', rd_statvalid = false, rd_createSubid = 0, rd_newRelfilenodeSubid = 0, rd_rel = 0x7f5fdd1771f0, rd_att = 0x7f5fdd177300, rd_id = 26731, rd_lockInfo = {lockRelId = {
      relId = 26731, dbId = 16477}}, rd_rules = 0x0, rd_rulescxt = 0x0, trigdesc = 0x0, rd_rsdesc = 0x0, rd_fkeylist = 0x0, rd_fkeyvalid = false, rd_partkeycxt = 0x0, rd_partkey = 0x0, rd_pdcxt = 0x0, 
  rd_partdesc = 0x0, rd_partcheck = 0x0, rd_indexlist = 0x0, rd_oidindex = 0, rd_pkindex = 0, rd_replidindex = 0, rd_statlist = 0x0, rd_indexattr = 0x0, rd_projindexattr = 0x0, rd_keyattr = 0x0, 
  rd_pkattr = 0x0, rd_idattr = 0x0, rd_projidx = 0x0, rd_pubactions = 0x0, rd_options = 0x0, rd_index = 0x0, rd_indextuple = 0x0, rd_amhandler = 0, rd_indexcxt = 0x0, rd_amroutine = 0x0, 
  rd_opfamily = 0x0, rd_opcintype = 0x0, rd_support = 0x0, rd_supportinfo = 0x0, rd_indoption = 0x0, rd_indexprs = 0x0, rd_indpred = 0x0, rd_exclops = 0x0, rd_exclprocs = 0x0, rd_exclstrats = 0x0, 
  rd_amcache = 0x0, rd_indcollation = 0x0, rd_fdwroutine = 0x0, rd_toastoid = 0, pgstat_info = 0x146f9b8}
(gdb) p *tup
$2 = {t_len = 61, t_self = {ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, t_tableOid = 26731, t_data = 0x14b19f8}
(gdb) p *(tup->t_data)
$3 = {t_choice = {t_heap = {t_xmin = 244, t_xmax = 4294967295, t_field3 = {t_cid = 2249, t_xvac = 2249}}, t_datum = {datum_len_ = 244, datum_typmod = -1, datum_typeid = 2249}}, t_ctid = {ip_blkid = {
      bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, t_infomask2 = 4, t_infomask = 2, t_hoff = 24 '\030', t_bits = 0x14b1a0f ""}
(gdb) p *(tup->t_data->t_bits)
$4 = 0 '\000'
(gdb) p cid
$5 = 0
(gdb) p options
$6 = 0
(gdb) p bistate
$7 = (BulkInsertState) 0x0
(gdb) next
2447        Buffer      vmbuffer = InvalidBuffer;
(gdb) p xid
$8 = 1612859
(gdb) next
2448        bool        all_visible_cleared = false;
(gdb) 
2457        heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
(gdb) 
2463        buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
(gdb) p *heaptup
$9 = {t_len = 61, t_self = {ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, t_tableOid = 26731, t_data = 0x14b19f8}
(gdb) next
2482        CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
(gdb) p buffer
$10 = 185
(gdb) next
2485        START_CRIT_SECTION();
(gdb) 
2488                             (options & HEAP_INSERT_SPECULATIVE) != 0);
(gdb) 
2487        RelationPutHeapTuple(relation, buffer, heaptup,
(gdb) 
2490        if (PageIsAllVisible(BufferGetPage(buffer)))
(gdb) 
2510        MarkBufferDirty(buffer);
(gdb) p buffer
$11 = 185
(gdb) next
2513        if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
(gdb) 
2518            Page        page = BufferGetPage(buffer);
(gdb) 
2519            uint8       info = XLOG_HEAP_INSERT;
(gdb) p *page
$12 = 1 '\001'
(gdb) p *(PageHeader)page
$13 = {pd_lsn = {xlogid = 1, xrecoff = 3677481952}, pd_checksum = 0, pd_flags = 0, pd_lower = 64, pd_upper = 7552, pd_special = 8192, pd_pagesize_version = 8196, pd_prune_xid = 0, 
  pd_linp = 0x7f5fc5409318}
(gdb) next
2520            int         bufflags = 0;
(gdb) 
2526            if (RelationIsAccessibleInLogicalDecoding(relation))
(gdb) 
2534            if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
(gdb) 
2541            xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
(gdb) 
2542            xlrec.flags = 0;
(gdb) 
2543            if (all_visible_cleared)
(gdb) 
2545            if (options & HEAP_INSERT_SPECULATIVE)
(gdb) 
2554            if (RelationIsLogicallyLogged(relation))
(gdb) 
2560            XLogBeginInsert();
(gdb) 
2561            XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);
(gdb) p xlrec
$14 = {offnum = 10, flags = 0 '\000'}
(gdb) next
2563            xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
(gdb) 
2564            xlhdr.t_infomask = heaptup->t_data->t_infomask;
(gdb) 
2565            xlhdr.t_hoff = heaptup->t_data->t_hoff;
(gdb) 
2572            XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
(gdb) 
2573            XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
(gdb) 
2577                                heaptup->t_len - SizeofHeapTupleHeader);
(gdb) 
2575            XLogRegisterBufData(0,
(gdb) 
2576                                (char *) heaptup->t_data + SizeofHeapTupleHeader,
(gdb) 
2575            XLogRegisterBufData(0,
(gdb) 
2580            XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
(gdb) 
2582            recptr = XLogInsert(RM_HEAP_ID, info);
(gdb) 
2584            PageSetLSN(page, recptr);
(gdb) 
2587        END_CRIT_SECTION();
(gdb) 
2589        UnlockReleaseBuffer(buffer);
(gdb) 
2590        if (vmbuffer != InvalidBuffer)
(gdb) 
2599        CacheInvalidateHeapTuple(relation, heaptup, NULL);
(gdb) 
2602        pgstat_count_heap_insert(relation, 1);
(gdb) 
2608        if (heaptup != tup)
(gdb) 
2614        return HeapTupleGetOid(tup);
(gdb) p *tup
$15 = {t_len = 61, t_self = {ip_blkid = {bi_hi = 0, bi_lo = 0}, ip_posid = 10}, t_tableOid = 26731, t_data = 0x14b19f8}
(gdb) p *(tup->t_data)
$16 = {t_choice = {t_heap = {t_xmin = 1612859, t_xmax = 0, t_field3 = {t_cid = 0, t_xvac = 0}}, t_datum = {datum_len_ = 1612859, datum_typmod = 0, datum_typeid = 0}}, t_ctid = {ip_blkid = {
      bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, t_infomask2 = 4, t_infomask = 2050, t_hoff = 24 '\030', t_bits = 0x14b1a0f ""}
(gdb)
(gdb) n
2615    }
(gdb) n
#done!
ExecInsert (mtstate=0x14b0c10, slot=0x14b1250, planSlot=0x14b1250, estate=0x14b08c0, canSetTag=true) at nodeModifyTable.c:534
534             if (resultRelInfo->ri_NumIndices > 0) 

四、小结

1、简单的反面是复杂:插入一行数据,涉及缓冲区管理(在PG中还需要考虑死锁)、日志处理等一系列的细节,原理/理论是简单的,但要在工程上实现得漂亮,不容易!程序猿们,加油吧!
2、NoSQL是“简单”的,RDBMS是“复杂”的:NoSQL不需要考虑事务,简化了日志处理,实现逻辑相对简单;RDBMS需要考虑A/B/C/D...,权衡了各种利弊,值得深入学习

您可能感兴趣的文档:

--结束END--

本文标题: PostgreSQL 源码解读(4)- 插入数据#3(heap_insert)

本文链接: https://lsjlt.com/news/46452.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作