返回顶部
首页 > 资讯 > 数据库 >PostgreSQL 源码解读(97)- 查询语句#79(ExecHashJoin函数#5-H...
  • 714
分享到

PostgreSQL 源码解读(97)- 查询语句#79(ExecHashJoin函数#5-H...

2024-04-02 19:04:59 714人浏览 独家记忆
摘要

本节是ExecHashJoin函数介绍的第五部分,主要介绍了ExecHashJoin中依赖的其他函数的实现逻辑,这些函数在HJ_NEED_NEW_BATCH阶段中使用,主要的函数是

本节是ExecHashJoin函数介绍的第五部分,主要介绍了ExecHashJoin中依赖的其他函数的实现逻辑,这些函数在HJ_NEED_NEW_BATCH阶段中使用,主要的函数是ExecHashJoinNewBatch。

一、数据结构

JoinState
Hash/NestLoop/Merge Join的基类


typedef struct JoinState
{
    PlanState   ps;//基类PlanState
    JoinType    jointype;//连接类型
    //在找到一个匹配inner tuple的时候,如需要跳转到下一个outer tuple,则该值为T
    bool        single_match;   
    //连接条件表达式(除了ps.qual)
    ExprState  *joinqual;       
} JoinState;

HashJoinState
Hash Join运行期状态结构体


typedef struct HashJoinTupleData *HashJoinTuple;
typedef struct HashJoinTableData *HashJoinTable;

typedef struct HashJoinState
{
    JoinState   js;             
    ExprState  *hashclauses;//hash连接条件
    List       *hj_OuterHashKeys;   
    List       *hj_InnerHashKeys;   
    List       *hj_HashOperators;   
    HashJoinTable hj_HashTable;//Hash表
    uint32      hj_CurHashValue;//当前的Hash值
    int         hj_CurBucketNo;//当前的bucket编号
    int         hj_CurSkewBucketNo;//行倾斜bucket编号
    HashJoinTuple hj_CurTuple;//当前元组
    TupleTableSlot *hj_OuterTupleSlot;//outer relation slot
    TupleTableSlot *hj_HashTupleSlot;//Hash tuple slot
    TupleTableSlot *hj_NullOuterTupleSlot;//用于外连接的outer虚拟slot
    TupleTableSlot *hj_NullInnerTupleSlot;//用于外连接的inner虚拟slot
    TupleTableSlot *hj_FirstOuterTupleSlot;//
    int         hj_JoinState;//JoinState状态
    bool        hj_MatchedOuter;//是否匹配
    bool        hj_OuterNotEmpty;//outer relation是否为空
} HashJoinState;

HashJoinTable
Hash表数据结构

typedef struct HashJoinTableData
{
    int         nbuckets;       
    int         log2_nbuckets;  

    int         nbuckets_original;  
    int         nbuckets_optimal;   
    int         log2_nbuckets_optimal;  

    
    //bucket [i]是内存中第i个桶中的元组链表的head item
    uNIOn
    {
        
        //未共享数组是按批处理存储的,所有元组均如此
        struct HashJoinTupleData **unshared;
        
        //共享数组是每个查询的DSA区域,所有元组均如此
        dsa_pointer_atomic *shared;
    }           buckets;

    bool        keepNulls;      

    bool        skewEnabled;    
    HashSkewBucket **skewBucket;    
    int         skewBucketLen;  
    int         nSkewBuckets;   
    int        *skewBucketNums; 

    int         nbatch;         
    int         curbatch;       

    int         nbatch_original;    
    int         nbatch_outstart;    

    bool        growEnabled;    

    double      totalTuples;    
    double      partialTuples;  
    double      skewTuples;     

    
    BufFile   **innerBatchFile; 
    BufFile   **outerBatchFile; 

    
    FmgrInfo   *outer_hashfunctions;    
    FmgrInfo   *inner_hashfunctions;    
    bool       *hashStrict;     

    Size        spaceUsed;      
    Size        spaceAllowed;   
    Size        spacePeak;      
    Size        spaceUsedSkew;  
    Size        spaceAllowedSkew;   

    MemoryContext hashCxt;      
    MemoryContext batchCxt;     

    
    //用于密集分配元组(到链接块中)
    HashMemoryChunk chunks;     

    
    //并行hash使用的共享和私有状态
    HashMemoryChunk current_chunk;  
    dsa_area   *area;           
    ParallelHashJoinState *parallel_state;//并行执行状态
    ParallelHashJoinBatchAccessor *batches;//并行访问器
    dsa_pointer current_chunk_shared;//当前chunk的开始指针
} HashJoinTableData;

typedef struct HashJoinTableData *HashJoinTable;

HashJoinTupleData
Hash连接元组数据







typedef struct HashJoinTupleData
{
    
    //link同一个桶中的下一个元组
    union
    {
        struct HashJoinTupleData *unshared;
        dsa_pointer shared;
    }           next;
    uint32      hashvalue;      
    
}           HashJoinTupleData;

#define HJTUPLE_OVERHEAD  MAXALIGN(sizeof(HashJoinTupleData))
#define HJTUPLE_MINTUPLE(hjtup)  \
    ((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))

二、源码解读

ExecHashJoinNewBatch
切换到新的hashjoin批次,如成功,则返回T;已完成,返回F




//参见ExecHashJoin


//参见ExecHashJoin



static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;//Hash表
    int         nbatch;//批次数
    int         curbatch;//当前批次
    BufFile    *innerFile;//inner relation缓存文件
    TupleTableSlot *slot;//slot
    uint32      hashvalue;//hash值

    nbatch = hashtable->nbatch;
    curbatch = hashtable->curbatch;

    if (curbatch > 0)
    {
        
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
    }
    else                        
    {
        
        hashtable->skewEnabled = false;
        hashtable->skewBucket = NULL;
        hashtable->skewBucketNums = NULL;
        hashtable->nSkewBuckets = 0;
        hashtable->spaceUsedSkew = 0;
    }

    
    curbatch++;
    while (curbatch < nbatch &&
           (hashtable->outerBatchFile[curbatch] == NULL ||
            hashtable->innerBatchFile[curbatch] == NULL))
    {
        if (hashtable->outerBatchFile[curbatch] &&
            HJ_FILL_OUTER(hjstate))
            break;              
        if (hashtable->innerBatchFile[curbatch] &&
            HJ_FILL_INNER(hjstate))
            break;              
        if (hashtable->innerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_original)
            break;              
        if (hashtable->outerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_outstart)
            break;              
        
        
        //均不符合规则1-3,则可以忽略这个批次了
        //释放临时文件
        if (hashtable->innerBatchFile[curbatch])
            BufFileClose(hashtable->innerBatchFile[curbatch]);
        hashtable->innerBatchFile[curbatch] = NULL;
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
        curbatch++;//下一个批次
    }

    if (curbatch >= nbatch)
        return false;           

    hashtable->curbatch = curbatch;//下一个批次

    
    ExecHashTableReset(hashtable);//重置Hash表
    //inner relation文件
    innerFile = hashtable->innerBatchFile[curbatch];
    //批次文件不为NULL
    if (innerFile != NULL)
    {
        if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))//扫描innerFile,不成功,则报错
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file: %m")));

        while ((slot = ExecHashJoinGetSavedTuple(hjstate,
                                                 innerFile,
                                                 &hashvalue,
                                                 hjstate->hj_HashTupleSlot)))//
        {
            
            ExecHashTableInsert(hashtable, slot, hashvalue);
        }

        
        BufFileClose(innerFile);
        hashtable->innerBatchFile[curbatch] = NULL;
    }

    
    if (hashtable->outerBatchFile[curbatch] != NULL)
    {
        if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file: %m")));
    }

    return true;
}



static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                          BufFile *file,
                          uint32 *hashvalue,
                          TupleTableSlot *tupleSlot)
{
    uint32      header[2];
    size_t      nread;
    MinimalTuple tuple;

    
    CHECK_FOR_INTERRUPTS();

    
    nread = BufFileRead(file, (void *) header, sizeof(header));//读取文件
    if (nread == 0)             
    {
        //已读取完毕,返回NULL
        ExecClearTuple(tupleSlot);
        return NULL;
    }
    if (nread != sizeof(header))//读取的大小不等于header的大小,报错
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from hash-join temporary file: %m")));
    //hash值
    *hashvalue = header[0];
    //tuple,分配的内存大小为MinimalTuple结构体大小
    tuple = (MinimalTuple) palloc(header[1]);
    //元组大小
    tuple->t_len = header[1];
    //读取文件
    nread = BufFileRead(file,
                        (void *) ((char *) tuple + sizeof(uint32)),
                        header[1] - sizeof(uint32));
    //读取有误,报错
    if (nread != header[1] - sizeof(uint32))
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from hash-join temporary file: %m")));
    //存储到slot中
    ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
    return tupleSlot;//返回slot
}


 
void
ExecHashTableInsert(HashJoinTable hashtable,
                    TupleTableSlot *slot,
                    uint32 hashvalue)
{
    bool        shouldFree;//是否释放资源
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);//获取一个MinimalTuple
    int         bucketno;//桶号
    int         batchno;//批次号

    ExecHashGetBucketAndBatch(hashtable, hashvalue,
                              &bucketno, &batchno);//获取桶号和批次号

    
    if (batchno == hashtable->curbatch)
    {
        //批次号==hash表的批次号,放到hash表中
        
        HashJoinTuple hashTuple;//hash tuple
        int         hashTupleSize;//大小
        double      ntuples = (hashtable->totalTuples - hashtable->skewTuples);//常规元组数量

        
        //创建HashJoinTuple
        hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;//大小
        hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);//分配存储空间
        //hash值
        hashTuple->hashvalue = hashvalue;
        //拷贝数据
        memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);

        
        HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));

        
        //
        hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
        hashtable->buckets.unshared[bucketno] = hashTuple;

        
        if (hashtable->nbatch == 1 &&
            ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
        {
            //只有1个批次而且元组数大于桶数*每桶的元组数
            
            //确保整数不要溢出
            if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
                hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
            {
                hashtable->nbuckets_optimal *= 2;
                hashtable->log2_nbuckets_optimal += 1;
            }
        }

        
        //声明使用的存储空间,如果使用太多,需要回退
        hashtable->spaceUsed += hashTupleSize;
        if (hashtable->spaceUsed > hashtable->spacePeak)
            hashtable->spacePeak = hashtable->spaceUsed;//超出峰值,则跳转
        if (hashtable->spaceUsed +
            hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
            > hashtable->spaceAllowed)
            ExecHashIncreaseNumBatches(hashtable);//超出允许的空间,则增加批次
    }
    else
    {
        //不在这个批次中
        
        Assert(batchno > hashtable->curbatch);
        ExecHashJoinSaveTuple(tuple,
                              hashvalue,
                              &hashtable->innerBatchFile[batchno]);//存储tuple到临时文件中
    }

    if (shouldFree)//如需要释放空间,则处理之
        heap_free_minimal_tuple(tuple);
}

三、跟踪分析

设置work_mem为较低的值(1MB),便于手工产生批次

testdb=# set work_mem='1MB';
SET
testdb=# show work_mem;
 work_mem 
----------
 1MB
(1 row)

测试脚本如下

testdb=# set enable_nestloop=false;
SET
testdb=# set enable_mergejoin=false;
SET
testdb=# explain verbose select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je 
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je 
testdb(#                         from t_grxx gr inner join t_jfxx jf 
testdb(#                                        on gr.dwbh = dw.dwbh 
testdb(#                                           and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
                                          QUERY PLAN                                           
-----------------------------------------------------------------------------------------------
 Sort  (cost=14828.83..15078.46 rows=99850 width=47)
   Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
   Sort Key: dw.dwbh
   ->  Hash Join  (cost=3176.00..6537.55 rows=99850 width=47)
         Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
         Hash Cond: ((gr.grbh)::text = (jf.grbh)::text)
         ->  Hash Join  (cost=289.00..2277.61 rows=99850 width=32)
               Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm
               Inner Unique: true
               Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
               ->  Seq Scan on public.t_grxx gr  (cost=0.00..1726.00 rows=100000 width=16)
                     Output: gr.dwbh, gr.grbh, gr.xm, gr.xb, gr.nl
               ->  Hash  (cost=164.00..164.00 rows=10000 width=20)
                     Output: dw.dwmc, dw.dwbh, dw.dwdz
                     ->  Seq Scan on public.t_dwxx dw  (cost=0.00..164.00 rows=10000 width=20)
                           Output: dw.dwmc, dw.dwbh, dw.dwdz
         ->  Hash  (cost=1637.00..1637.00 rows=100000 width=20)
               Output: jf.ny, jf.je, jf.grbh
               ->  Seq Scan on public.t_jfxx jf  (cost=0.00..1637.00 rows=100000 width=20)
                     Output: jf.ny, jf.je, jf.grbh
(20 rows)

启动gdb,设置断点,进入ExecHashJoinNewBatch

(gdb) b ExecHashJoinNewBatch
Breakpoint 1 at 0x7031f5: file nodeHashjoin.c, line 943.
(gdb) c
Continuing.

Breakpoint 1, ExecHashJoinNewBatch (hjstate=0x1c40738) at nodeHashjoin.c:943
943     HashJoinTable hashtable = hjstate->hj_HashTable;

获取批次数(8个批次)和当前批次(0,第一个批次)

(gdb) n
950     nbatch = hashtable->nbatch;
(gdb) 
951     curbatch = hashtable->curbatch;
(gdb) 
953     if (curbatch > 0)
(gdb) p nbatch
$5 = 8
(gdb) p curbatch
$6 = 0

curbatch ==0,刚刚完成了第一个批次,重置倾斜优化的相关状态变量

(gdb) n
971         hashtable->skewEnabled = false;
(gdb) 
972         hashtable->skewBucket = NULL;
(gdb) 
973         hashtable->skewBucketNums = NULL;
(gdb) 
974         hashtable->nSkewBuckets = 0;
(gdb) 
975         hashtable->spaceUsedSkew = 0;
(gdb) 
995     curbatch++;

外表为空或内表为空时的优化,本次调用均不为空

(gdb) n
996     while (curbatch < nbatch &&
(gdb) 
997            (hashtable->outerBatchFile[curbatch] == NULL ||
(gdb) p hashtable->outerBatchFile[curbatch]
$7 = (BufFile *) 0x1d85290
(gdb) p hashtable->outerBatchFile[curbatch]
$8 = (BufFile *) 0x1d85290

设置当前批次,重建Hash表

(gdb) 
1023        if (curbatch >= nbatch)
(gdb) 
1026        hashtable->curbatch = curbatch;
(gdb) 
1031        ExecHashTableReset(hashtable);

获取inner relation批次临时文件

(gdb) 
1033        innerFile = hashtable->innerBatchFile[curbatch];
(gdb) 
1035        if (innerFile != NULL)
(gdb) p innerFile
$9 = (BufFile *) 0x1cc0540

临时文件不为NULL,读取文件

(gdb) n
1037            if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
(gdb) 
1042            while ((slot = ExecHashJoinGetSavedTuple(hjstate,

进入函数ExecHashJoinGetSavedTuple

(gdb) step
ExecHashJoinGetSavedTuple (hjstate=0x1c40fd8, file=0x1cc0540, hashvalue=0x7ffeace60824, tupleSlot=0x1c4cc20)
    at nodeHashjoin.c:1259
1259        CHECK_FOR_INTERRUPTS();
(gdb) 

ExecHashJoinGetSavedTuple->读取头部8个字节(header,类型为void *,在64 bit的机器上,大小8个字节)

gdb) n
1266        nread = BufFileRead(file, (void *) header, sizeof(header));
(gdb) 
1267        if (nread == 0)             
(gdb) p nread
$10 = 8
(gdb) n
1272        if (nread != sizeof(header))
(gdb) 

ExecHashJoinGetSavedTuple->获取Hash值(1978434688)

(gdb) 
1276        *hashvalue = header[0];
(gdb) n
1277        tuple = (MinimalTuple) palloc(header[1]);
(gdb) p *hashvalue
$11 = 1978434688

ExecHashJoinGetSavedTuple->获取tuple&元组长度

(gdb) n
1278        tuple->t_len = header[1];
(gdb) 
1281                            header[1] - sizeof(uint32));
(gdb) p tuple->t_len
$16 = 24
(gdb) p *tuple
$17 = {t_len = 24, mt_padding = "\177\177\177\177\177\177", t_infomask2 = 32639, t_infomask = 32639, t_hoff = 127 '\177', 
  t_bits = 0x1c5202f "\177\177\177\177\177\177\177\177\177~\177\177\177\177\177\177\177"}
(gdb) 

ExecHashJoinGetSavedTuple->根据大小读取文件获取元组

(gdb) n
1279        nread = BufFileRead(file,
(gdb) 
1282        if (nread != header[1] - sizeof(uint32))
(gdb) p header[1]
$18 = 24
(gdb) p sizeof(uint32)
$19 = 4
(gdb) p *tuple
$20 = {t_len = 24, mt_padding = "\000\000\000\000\000", t_infomask2 = 3, t_infomask = 2, t_hoff = 24 '\030', 
  t_bits = 0x1c5202f ""}

ExecHashJoinGetSavedTuple->存储到slot中,完成调用

(gdb) n
1286        return ExecStoreMinimalTuple(tuple, tupleSlot, true);
(gdb) 
1287    }
(gdb) 
ExecHashJoinNewBatch (hjstate=0x1c40fd8) at nodeHashjoin.c:1051
1051                ExecHashTableInsert(hashtable, slot, hashvalue);

插入到Hash表中

(gdb) 
1051                ExecHashTableInsert(hashtable, slot, hashvalue);

进入ExecHashTableInsert

(gdb) step
ExecHashTableInsert (hashtable=0x1c6e1c0, slot=0x1c4cc20, hashvalue=3757101760) at nodeHash.c:1593
1593        MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
(gdb) 

ExecHashTableInsert->获取批次号和hash桶号

(gdb) n
1597        ExecHashGetBucketAndBatch(hashtable, hashvalue,
(gdb) 
1603        if (batchno == hashtable->curbatch)
(gdb) p batchno
$21 = 1
(gdb) p bucketno
$22 = 21184
(gdb) 
(gdb) p hashtable->curbatch
$23 = 1

ExecHashTableInsert->批次号与Hash表中的批次号一致,把元组放到Hash表中
常规元组数量=100000

(gdb) n
1610            double      ntuples = (hashtable->totalTuples - hashtable->skewTuples);
(gdb) n
1613            hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
(gdb) p ntuples
$24 = 100000

ExecHashTableInsert->创建HashJoinTuple,重置元组匹配标记

(gdb) n
1614            hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
(gdb) 
1616            hashTuple->hashvalue = hashvalue;
(gdb) 
1617            memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
(gdb) 
1625            HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
(gdb) 

ExecHashTableInsert->元组放在Hash表桶链表的前面

(gdb) n
1628            hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
(gdb) 
1629            hashtable->buckets.unshared[bucketno] = hashTuple;
(gdb) 
1636            if (hashtable->nbatch == 1 &&
(gdb) 

ExecHashTableInsert->调整或记录Hash表内存使用的峰值并返回,回到ExecHashJoinNewBatch

(gdb) 
1649            hashtable->spaceUsed += hashTupleSize;
(gdb) 
...
(gdb) 
1667    }
(gdb) n
ExecHashJoinNewBatch (hjstate=0x1c40fd8) at nodeHashjoin.c:1042
1042            while ((slot = ExecHashJoinGetSavedTuple(hjstate,

循环插入到Hash表中

1042            while ((slot = ExecHashJoinGetSavedTuple(hjstate,
(gdb) n
1051                ExecHashTableInsert(hashtable, slot, hashvalue);
...

DONE!

四、参考资料

Hash Joins: Past, Present and Future/PGCon 2017
A Look at How Postgres Executes a Tiny Join - Part 1
A Look at How Postgres Executes a Tiny Join - Part 2
Assignment 2 Symmetric Hash Join

您可能感兴趣的文档:

--结束END--

本文标题: PostgreSQL 源码解读(97)- 查询语句#79(ExecHashJoin函数#5-H...

本文链接: https://lsjlt.com/news/49260.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作