返回顶部
首页 > 资讯 > 数据库 >PostgreSQL 源码解读(90)- 查询语句#75(ExecHashJoin函数#1)
  • 784
分享到

PostgreSQL 源码解读(90)- 查询语句#75(ExecHashJoin函数#1)

2024-04-02 19:04:59 784人浏览 安东尼
摘要

本节介绍了ExecProcnode的其中一个Real函数(ExecHashJoin)。ExecHashJoin函数实现了Hash Join算法。 一、数据结构 Plan 所有计划节

本节介绍了ExecProcnode的其中一个Real函数(ExecHashJoin)。ExecHashJoin函数实现了Hash Join算法

一、数据结构

Plan
所有计划节点通过将Plan结构作为第一个字段从Plan结构“派生”。这确保了在将节点转换为计划节点时,一切都能正常工作。(在执行器中以通用方式传递时,节点指针经常被转换为Plan *)


/*
 * Plan - the common leading part of every plan node.
 *
 * As the surrounding text explains, concrete plan node structs place a Plan
 * as their first field, so a pointer to any of them may be cast to Plan *.
 * Field notes that are not backed by code in this listing are marked as
 * assumptions.
 */
typedef struct Plan
{
    NodeTag     type;// node type tag

    /* cost values (ExecHashJoinImpl in this listing reads both via ->plan) */
    Cost        startup_cost;   
    Cost        total_cost;     

    /* presumably planner estimates of row count and row width -- TODO confirm */
    double      plan_rows;      
    int         plan_width;     

    /* parallel-query flags; exact semantics are not shown in this listing */
    bool        parallel_aware; 
    bool        parallel_safe;  

    /* fields shared by all plan node types */
    int         plan_node_id;   
    List       *targetlist;     
    List       *qual;           
    struct Plan *lefttree;      /* child plan; presumably the outer input -- confirm */
    struct Plan *righttree; /* child plan; presumably the inner input -- confirm */
    List       *initPlan;       

    /* parameter-id sets; NOTE(review): meaning not visible here -- confirm */
    Bitmapset  *extParam;
    Bitmapset  *allParam;
} Plan;

JoinState
Hash/NestLoop/Merge Join的基类


/*
 * JoinState - state shared by the join node types (per the text above:
 * Hash, NestLoop and Merge joins).  Join-specific state structs embed it
 * as their first member (see HashJoinState.js).
 */
typedef struct JoinState
{
    PlanState   ps;// base "class": PlanState
    JoinType    jointype;// kind of join (ExecHashJoinImpl tests it against JOIN_ANTI)
    // true if the executor should move on to the next outer tuple as soon
    // as one matching inner tuple has been found
    bool        single_match;   
    // join condition expression (in addition to ps.qual)
    ExprState  *joinqual;       
} JoinState;

HashJoinState
Hash Join运行期状态结构体


/* Pointer aliases for types whose definitions are not part of this listing. */
typedef struct HashJoinTupleData *HashJoinTuple;
typedef struct HashJoinTableData *HashJoinTable;

/*
 * HashJoinState - run-time state of a Hash Join node.
 *
 * ExecHashJoinImpl drives a state machine over these fields; the current
 * state is kept in hj_JoinState.
 */
typedef struct HashJoinState
{
    JoinState   js;             /* embedded base state; must stay first */
    ExprState  *hashclauses;// hash join clauses
    List       *hj_OuterHashKeys;   /* presumably the outer-side hash key expressions -- confirm */
    List       *hj_InnerHashKeys;   /* presumably the inner-side hash key expressions -- confirm */
    List       *hj_HashOperators;   /* passed to ExecHashTableCreate() */
    HashJoinTable hj_HashTable;// the hash table (NULL until built)
    uint32      hj_CurHashValue;// hash value of the current outer tuple
    int         hj_CurBucketNo;// bucket number for the current outer tuple
    int         hj_CurSkewBucketNo;// skew bucket number, or INVALID_SKEW_BUCKET_NO
    HashJoinTuple hj_CurTuple;// current hash-table tuple; NULL at the start of a bucket scan
    TupleTableSlot *hj_OuterTupleSlot;// outer relation slot
    TupleTableSlot *hj_HashTupleSlot;// hash tuple slot
    TupleTableSlot *hj_NullOuterTupleSlot;// dummy outer slot for outer joins; non-NULL means HJ_FILL_INNER
    TupleTableSlot *hj_NullInnerTupleSlot;// dummy inner slot for outer joins; non-NULL means HJ_FILL_OUTER
    TupleTableSlot *hj_FirstOuterTupleSlot;// outer tuple fetched before the hash table is built, else NULL
    int         hj_JoinState;// current state machine state (one of the HJ_* constants)
    bool        hj_MatchedOuter;// has the current outer tuple found a match?
    bool        hj_OuterNotEmpty;// true once the outer relation is known to be non-empty
} HashJoinState;

二、源码解读

ExecHashJoin函数实现了Hash Join算法,实际实现的函数是ExecHashJoinImpl.
ExecHashJoinImpl函数把Hash Join划分为多个阶段/状态(有限状态机),当前状态保存在HashJoinState->hj_JoinState字段中,这些状态分别为HJ_BUILD_HASHTABLE/HJ_NEED_NEW_OUTER/HJ_SCAN_BUCKET/HJ_FILL_OUTER_TUPLE/HJ_FILL_INNER_TUPLES/HJ_NEED_NEW_BATCH.
HJ_BUILD_HASHTABLE:创建Hash表;
HJ_NEED_NEW_OUTER:获取outer relation的下一个元组,计算其连接键的hash值,并据此确定对应的bucket和批次(batch);若该元组不属于当前批次,则保存到相应的批次文件中,否则进入HJ_SCAN_BUCKET;
HJ_SCAN_BUCKET:扫描bucket,匹配的tuple返回
HJ_FILL_OUTER_TUPLE:当前outer元组已没有更多可匹配的inner元组,因此检查是否需要发出一个虚拟的外连接元组(inner一侧用NULL填充)。
HJ_FILL_INNER_TUPLES:已完成一个批处理,但做的是右外连接/完全连接,填充虚拟连接元组
HJ_NEED_NEW_BATCH:开启下一批次
注意:在work_mem不足以装下Hash Table时,分批执行.扫描outer relation时,按hash值计算出的批次号不是当前批次的outer tuple会被保存到对应的批次文件中(hashtable->outerBatchFile[batchno]),等处理到该批次时再读取,以避免多次扫描outer relation.


/*
 * True when the node has a dummy outer slot, i.e. unmatched inner tuples
 * must be emitted (the HJ_FILL_INNER_TUPLES path of ExecHashJoinImpl).
 * This listing repeats the identical definition further down, next to
 * HJ_FILL_OUTER.
 */
#define HJ_FILL_INNER(hjstate)  ((hjstate)->hj_NullOuterTupleSlot != NULL)


/*
 * ExecHashJoin - non-parallel entry point for the hash join node.
 *
 * pstate: the hash join's state node, passed as a generic PlanState *.
 * Returns whatever ExecHashJoinImpl returns: a result tuple slot, or NULL
 * when the join has no more tuples to produce.
 */
static TupleTableSlot *         
ExecHashJoin(PlanState *pstate)
{
    /* the literal "false" selects the non-parallel code paths of the Impl */
    return ExecHashJoinImpl(pstate, false);
}


/*
 * States of the hash join state machine, stored in
 * HashJoinState.hj_JoinState and dispatched on by ExecHashJoinImpl.
 */
#define HJ_BUILD_HASHTABLE      1
#define HJ_NEED_NEW_OUTER       2
#define HJ_SCAN_BUCKET          3
#define HJ_FILL_OUTER_TUPLE     4
#define HJ_FILL_INNER_TUPLES    5
#define HJ_NEED_NEW_BATCH       6


/*
 * True if unmatched outer tuples must be emitted: the node has a dummy
 * inner slot to pair them with (used in the HJ_FILL_OUTER_TUPLE state).
 */
#define HJ_FILL_OUTER(hjstate)  ((hjstate)->hj_NullInnerTupleSlot != NULL)
/*
 * True if unmatched inner tuples must be emitted: the node has a dummy
 * outer slot to pair them with (used in the HJ_FILL_INNER_TUPLES state).
 */
#define HJ_FILL_INNER(hjstate)  ((hjstate)->hj_NullOuterTupleSlot != NULL)

/*
 * Forward declarations of file-local helpers.  Their definitions are not
 * part of this listing; ExecHashJoinImpl calls the OuterGetTuple and
 * NewBatch routines in serial or parallel flavour depending on its
 * "parallel" argument.
 */
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
                          HashJoinState *hjstate,
                          uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                  HashJoinState *hjstate,
                                  uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                          BufFile *file,
                          uint32 *hashvalue,
                          TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);


/*
 * ExecHashJoinImpl - the hash join state machine.
 *
 * pstate:   the HashJoinState node, passed as a generic PlanState *.
 * parallel: true to use the parallel-aware helper routines; ExecHashJoin
 *           passes false.
 *
 * Each call resumes from node->hj_JoinState and loops through the HJ_*
 * states until it can return a projected result tuple.  It returns NULL
 * when the join is finished: the prefetched outer relation is empty, the
 * hash table is empty and no outer fill is needed, or there is no further
 * batch to process.
 *
 * Several case arms deliberately fall through into the next one; these
 * spots are marked with FALL THRU comments.
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    HashJoinState *node = castNode(HashJoinState, pstate);
    PlanState  *outerNode;
    HashState  *hashNode;
    ExprState  *joinqual;
    ExprState  *otherqual;
    ExprContext *econtext;
    HashJoinTable hashtable;
    TupleTableSlot *outerTupleSlot;
    uint32      hashvalue;
    int         batchno;
    ParallelHashJoinState *parallel_state;

    /* pull frequently used information out of the HashJoin node */
    joinqual = node->js.joinqual;
    otherqual = node->js.ps.qual;
    hashNode = (HashState *) innerPlanState(node);
    outerNode = outerPlanState(node);
    hashtable = node->hj_HashTable;
    econtext = node->js.ps.ps_ExprContext;
    parallel_state = hashNode->parallel_state;

    /* reset the node's expression context before producing the next tuple */
    ResetExprContext(econtext);

    /* run the state machine until a tuple (or NULL) can be returned */
    for (;;)
    {
        /* checked once per state transition so the loop stays interruptible */
        CHECK_FOR_INTERRUPTS();

        switch (node->hj_JoinState)
        {
            case HJ_BUILD_HASHTABLE://-->HJ_BUILD_HASHTABLE phase

                /* first call for this node: no hash table may exist yet */
                Assert(hashtable == NULL);

                /*
                 * Decide whether to fetch one outer tuple before building
                 * the hash table, so an empty outer side can end the join
                 * without building the table at all.
                 */
                if (HJ_FILL_INNER(node))
                {
                    
                    // there is no way to avoid building the hash table
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (parallel)
                {
                    /* parallel mode: no outer tuple is prefetched here */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (HJ_FILL_OUTER(node) ||
                         (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
                          !node->hj_OuterNotEmpty))
                {
                    node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
                    if (TupIsNull(node->hj_FirstOuterTupleSlot))
                    {
                        node->hj_OuterNotEmpty = false;
                        return NULL;
                    }
                    else
                        node->hj_OuterNotEmpty = true;
                }
                else
                    node->hj_FirstOuterTupleSlot = NULL;

                /* create the hash table; keep-nulls flag follows HJ_FILL_INNER */
                hashtable = ExecHashTableCreate(hashNode,
                                                node->hj_HashOperators,
                                                HJ_FILL_INNER(node));
                node->hj_HashTable = hashtable;

                /* hand the table to the Hash node and execute that node */
                hashNode->hashtable = hashtable;
                (void) MultiExecProcNode((PlanState *) hashNode);

                /* empty hash table and no outer fill needed: nothing can join */
                if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
                    return NULL;

                /* record the batch count as of the start of the outer scan */
                hashtable->nbatch_outstart = hashtable->nbatch;

                
                node->hj_OuterNotEmpty = false;// reset OuterNotEmpty to false

                if (parallel)
                {
                    // parallel execution enabled
                    Barrier    *build_barrier;

                    build_barrier = &parallel_state->build_barrier;
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
                           BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
                    {
                        /* multi-batch: partition the outer side, then wait at the barrier */
                        if (hashtable->nbatch > 1)
                            ExecParallelHashJoinPartitionOuter(node);
                        BarrierArriveAndWait(build_barrier,
                                             WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
                    }
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);

                    
                    // each backend worker has to pick a batch to work on
                    hashtable->curbatch = -1;
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;

                    continue;// next loop iteration
                }
                else
                    // non-parallel execution: set the hj_JoinState state
                    node->hj_JoinState = HJ_NEED_NEW_OUTER;

                /* FALL THRU into HJ_NEED_NEW_OUTER */

            case HJ_NEED_NEW_OUTER://-->HJ_NEED_NEW_OUTER phase

                /* fetch the next outer tuple together with its hash value */
                if (parallel)
                    outerTupleSlot =
                        ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                          &hashvalue);// parallel execution
                else
                    outerTupleSlot =
                        ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);// serial execution

                if (TupIsNull(outerTupleSlot))
                {
                    // outerTupleSlot is NULL
                    
                    // end of this batch, or maybe of the whole join
                    if (HJ_FILL_INNER(node))// hj_NullOuterTupleSlot != NULL
                    {
                        
                        // set up to emit unmatched inner tuples, NULL-padded (outer join)
                        ExecPrepHashTableForUnmatched(node);
                        node->hj_JoinState = HJ_FILL_INNER_TUPLES;
                    }
                    else
                        node->hj_JoinState = HJ_NEED_NEW_BATCH;// move on to the next batch
                    continue;
                }
                // set up state for the new outer tuple
                econtext->ecxt_outertuple = outerTupleSlot;
                node->hj_MatchedOuter = false;

                /* remember the hash value and locate the bucket to scan */
                node->hj_CurHashValue = hashvalue;
                // compute the bucket number and batch number for this hash value
                ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                          &node->hj_CurBucketNo, &batchno);
                // skew optimisation lookup (for values that occur very often)
                node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
                                                                 hashvalue);
                node->hj_CurTuple = NULL;

                /*
                 * A tuple that belongs to a different batch and is not in a
                 * skew bucket cannot be joined now; save it for later.
                 */
                if (batchno != hashtable->curbatch &&
                    node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                {
                    /* only reached in serial mode, and only for later batches */
                    Assert(parallel_state == NULL);
                    Assert(batchno > hashtable->curbatch);
                    ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
                                          hashvalue,
                                          &hashtable->outerBatchFile[batchno]);

                    
                    // loop around, staying in the HJ_NEED_NEW_OUTER state
                    continue;
                }

                
                // this phase is done: switch to the HJ_SCAN_BUCKET state
                node->hj_JoinState = HJ_SCAN_BUCKET;

                /* FALL THRU into HJ_SCAN_BUCKET */

            case HJ_SCAN_BUCKET://-->HJ_SCAN_BUCKET phase

                /* look for the next candidate inner tuple in the current bucket */
                if (parallel)
                {
                    // parallel execution
                    if (!ExecParallelScanHashBucket(node, econtext))
                    {
                        
                        // no match left: check for possible outer-join fill, switch to HJ_FILL_OUTER_TUPLE
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }
                else
                {
                    // non-parallel execution
                    if (!ExecScanHashBucket(node, econtext))
                    {
                        
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;// same as above
                        continue;
                    }
                }

                /* the candidate joins only if joinqual (when present) passes */
                if (joinqual == NULL || ExecQual(joinqual, econtext))
                {
                    node->hj_MatchedOuter = true;
                    HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));

                    
                    // anti join: matched tuples must not be returned
                    if (node->js.jointype == JOIN_ANTI)
                    {
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;
                        continue;
                    }

                    /* one match is enough: advance to the next outer tuple afterwards */
                    if (node->js.single_match)
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);// project and return the joined tuple
                    else
                        InstrCountFiltered2(node, 1);// rejected by the other quals
                }
                else
                    InstrCountFiltered1(node, 1);// rejected by the join quals
                break;

            case HJ_FILL_OUTER_TUPLE://-->HJ_FILL_OUTER_TUPLE phase

                /* whether or not a tuple is emitted here, the next state is NEED_NEW_OUTER */
                node->hj_JoinState = HJ_NEED_NEW_OUTER;// switch state to HJ_NEED_NEW_OUTER

                if (!node->hj_MatchedOuter &&
                    HJ_FILL_OUTER(node))
                {
                    /* pair the unmatched outer tuple with the dummy inner slot */
                    econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);// project and return
                    else
                        InstrCountFiltered2(node, 1);
                }
                break;

            case HJ_FILL_INNER_TUPLES://-->HJ_FILL_INNER_TUPLES phase

                /* emit hash-table tuples that were never matched, one per call */
                if (!ExecScanHashTableForUnmatched(node, econtext))
                {
                    
                    // no more unmatched tuples: switch to HJ_NEED_NEW_BATCH (start the next batch)
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                /* pair the unmatched inner tuple with the dummy outer slot */
                econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

                if (otherqual == NULL || ExecQual(otherqual, econtext))
                    return ExecProject(node->js.ps.ps_ProjInfo);
                else
                    InstrCountFiltered2(node, 1);
                break;

            case HJ_NEED_NEW_BATCH://-->HJ_NEED_NEW_BATCH phase

                /* try to advance to the next batch; if there is none, the join is done */
                if (parallel)
                {
                    // parallel execution
                    if (!ExecParallelHashJoinNewBatch(node))
                        return NULL;    /* no further batch */
                }
                else
                {
                    // non-parallel execution
                    if (!ExecHashJoinNewBatch(node))
                        return NULL;    /* no further batch */
                }
                node->hj_JoinState = HJ_NEED_NEW_OUTER;// switch state
                break;

            default:// invalid JoinState
                elog(ERROR, "unrecognized hashjoin state: %d",
                     (int) node->hj_JoinState);
        }
    }
}

三、跟踪分析

测试脚本如下

testdb=# explain verbose select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je 
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je 
testdb(#                         from t_grxx gr inner join t_jfxx jf 
testdb(#                                        on gr.dwbh = dw.dwbh 
testdb(#                                           and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
                                          QUERY PLAN                                           
-----------------------------------------------------------------------------------------------
 Sort  (cost=14828.83..15078.46 rows=99850 width=47)
   Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
   Sort Key: dw.dwbh
   ->  Hash Join  (cost=3176.00..6537.55 rows=99850 width=47)
         Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
         Hash Cond: ((gr.grbh)::text = (jf.grbh)::text)
         ->  Hash Join  (cost=289.00..2277.61 rows=99850 width=32)
               Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm
               Inner Unique: true
               Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
               ->  Seq Scan on public.t_grxx gr  (cost=0.00..1726.00 rows=100000 width=16)
                     Output: gr.dwbh, gr.grbh, gr.xm, gr.xb, gr.nl
               ->  Hash  (cost=164.00..164.00 rows=10000 width=20)
                     Output: dw.dwmc, dw.dwbh, dw.dwdz
                     ->  Seq Scan on public.t_dwxx dw  (cost=0.00..164.00 rows=10000 width=20)
                           Output: dw.dwmc, dw.dwbh, dw.dwdz
         ->  Hash  (cost=1637.00..1637.00 rows=100000 width=20)
               Output: jf.ny, jf.je, jf.grbh
               ->  Seq Scan on public.t_jfxx jf  (cost=0.00..1637.00 rows=100000 width=20)
                     Output: jf.ny, jf.je, jf.grbh
(20 rows)

启动gdb,设置断点,进入ExecHashJoin

(gdb) b ExecHashJoin
Breakpoint 1 at 0x70292e: file nodeHashjoin.c, line 565.
(gdb) c
Continuing.

Breakpoint 1, ExecHashJoin (pstate=0x2ee1a88) at nodeHashjoin.c:565
565     return ExecHashJoinImpl(pstate, false);

继续执行,进入第2个Hash Join,即t_grxx & t_dwxx的连接

(gdb) n

Breakpoint 1, ExecHashJoin (pstate=0x2ee1d98) at nodeHashjoin.c:565
565     return ExecHashJoinImpl(pstate, false);

查看输入参数,ExecProcNode=ExecProcNodeReal=ExecHashJoin

(gdb) p *pstate
$8 = {type = T_HashJoinState, plan = 0x2faaff8, state = 0x2ee1758, ExecProcNode = 0x70291d <ExecHashJoin>, 
  ExecProcNodeReal = 0x70291d <ExecHashJoin>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0, 
  qual = 0x0, lefttree = 0x2ee2070, righttree = 0x2ee2918, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, 
  ps_ResultTupleSlot = 0x2f20d98, ps_ExprContext = 0x2ee1fb0, ps_ProjInfo = 0x2ee3550, scandesc = 0x0}
(gdb) 

pstate的lefttree对应的是SeqScan,righttree对应的是Hash,即左树(outer relation)为t_grxx的顺序扫描运算生成的relation,右树(inner relation)为t_dwxx的顺序扫描运算生成的relation(在此relation上创建Hash Table)

(gdb) p *pstate->lefttree
$6 = {type = T_SeqScanState, plan = 0x2fa8ff0, state = 0x2ee1758, ExecProcNode = 0x6e4bde <ExecProcNodeFirst>, 
  ExecProcNodeReal = 0x71578d <ExecSeqScan>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0, 
  qual = 0x0, lefttree = 0x0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, 
  ps_ResultTupleSlot = 0x2ee27d8, ps_ExprContext = 0x2ee2188, ps_ProjInfo = 0x0, scandesc = 0x7f0710d02bd0}
(gdb) p *pstate->righttree
$9 = {type = T_HashState, plan = 0x2faaf60, state = 0x2ee1758, ExecProcNode = 0x6e4bde <ExecProcNodeFirst>, 
  ExecProcNodeReal = 0x6fc015 <ExecHash>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0, 
  qual = 0x0, lefttree = 0x2ee2af0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, 
  ps_ResultTupleSlot = 0x2ee3278, ps_ExprContext = 0x2ee2a30, ps_ProjInfo = 0x0, scandesc = 0x0}

进入ExecHashJoinImpl函数

(gdb) step
ExecHashJoinImpl (pstate=0x2ee1d98, parallel=false) at nodeHashjoin.c:167
167     HashJoinState *node = castNode(HashJoinState, pstate);

赋值,查看HashJoinState等变量值

(gdb) n
182     joinqual = node->js.joinqual;
(gdb) n
183     otherqual = node->js.ps.qual;
(gdb) 
184     hashNode = (HashState *) innerPlanState(node);
(gdb) 
185     outerNode = outerPlanState(node);
(gdb) 
186     hashtable = node->hj_HashTable;
(gdb) 
187     econtext = node->js.ps.ps_ExprContext;
(gdb) 
188     parallel_state = hashNode->parallel_state;
(gdb) 
194     ResetExprContext(econtext);
(gdb) p *node
$10 = {js = {ps = {type = T_HashJoinState, plan = 0x2faaff8, state = 0x2ee1758, ExecProcNode = 0x70291d <ExecHashJoin>, 
      ExecProcNodeReal = 0x70291d <ExecHashJoin>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0, 
      qual = 0x0, lefttree = 0x2ee2070, righttree = 0x2ee2918, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, 
      ps_ResultTupleSlot = 0x2f20d98, ps_ExprContext = 0x2ee1fb0, ps_ProjInfo = 0x2ee3550, scandesc = 0x0}, 
    jointype = JOIN_INNER, single_match = true, joinqual = 0x0}, hashclauses = 0x2f21430, hj_OuterHashKeys = 0x2f22230, 
  hj_InnerHashKeys = 0x2f22740, hj_HashOperators = 0x2f227a0, hj_HashTable = 0x0, hj_CurHashValue = 0, hj_CurBucketNo = 0, 
  hj_CurSkewBucketNo = -1, hj_CurTuple = 0x0, hj_OuterTupleSlot = 0x2f212f0, hj_HashTupleSlot = 0x2ee3278, 
  hj_NullOuterTupleSlot = 0x0, hj_NullInnerTupleSlot = 0x0, hj_FirstOuterTupleSlot = 0x0, hj_JoinState = 1, 
  hj_MatchedOuter = false, hj_OuterNotEmpty = false}
(gdb) p *otherqual
Cannot access memory at address 0x0
(gdb) p *hashNode
$11 = {ps = {type = T_HashState, plan = 0x2faaf60, state = 0x2ee1758, ExecProcNode = 0x6e4bde <ExecProcNodeFirst>, 
    ExecProcNodeReal = 0x6fc015 <ExecHash>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0, 
    qual = 0x0, lefttree = 0x2ee2af0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, 
    ps_ResultTupleSlot = 0x2ee3278, ps_ExprContext = 0x2ee2a30, ps_ProjInfo = 0x0, scandesc = 0x0}, hashtable = 0x0, 
  hashkeys = 0x2f22740, shared_info = 0x0, hinstrument = 0x0, parallel_state = 0x0}
(gdb) p *hashtable
Cannot access memory at address 0x0
(gdb) p parallel_state
$12 = (ParallelHashJoinState *) 0x0
(gdb)   

进入HJ_BUILD_HASHTABLE处理逻辑,创建Hash表

(gdb) p node->hj_JoinState
$13 = 1

HJ_BUILD_HASHTABLE->执行相关判断,本例为内连接,因此不存在FILL_OUTER等情况

(gdb) n
216                 Assert(hashtable == NULL);
(gdb) 
241                 if (HJ_FILL_INNER(node))
(gdb) 
246                 else if (parallel)
(gdb) 
258                 else if (HJ_FILL_OUTER(node) ||
(gdb) 
259                          (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
(gdb) 

HJ_BUILD_HASHTABLE->outer node的启动成本低于创建Hash表的总成本,而且尚未确认outer relation非空(node->hj_OuterNotEmpty初始为false),那么先尝试获取outer relation的第一个元组:如为NULL,说明outer relation为空,可直接返回NULL而无需创建Hash表;否则设置node->hj_OuterNotEmpty标记为T

258                 else if (HJ_FILL_OUTER(node) ||
(gdb) 
260                           !node->hj_OuterNotEmpty))
(gdb) 
259                          (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
(gdb) 
262                     node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
(gdb) 
263                     if (TupIsNull(node->hj_FirstOuterTupleSlot))
(gdb) 
269                         node->hj_OuterNotEmpty = true;

HJ_BUILD_HASHTABLE->创建Hash Table

(gdb) n
263                     if (TupIsNull(node->hj_FirstOuterTupleSlot))
(gdb) 
281                                                 HJ_FILL_INNER(node));
(gdb) 
279                 hashtable = ExecHashTableCreate(hashNode,
(gdb) 

HJ_BUILD_HASHTABLE->Hash Table(HashJoinTable结构体)的内存结构
bucket数量为16384(16K),取对数结果为14(即log2_nbuckets/log2_nbuckets_optimal的结果值)
skewEnabled为F,没有启用倾斜优化

(gdb) p *hashtable
$14 = {nbuckets = 16384, log2_nbuckets = 14, nbuckets_original = 16384, nbuckets_optimal = 16384, 
  log2_nbuckets_optimal = 14, buckets = {unshared = 0x2fb1260, shared = 0x2fb1260}, keepNulls = false, skewEnabled = false, 
  skewBucket = 0x0, skewBucketLen = 0, nSkewBuckets = 0, skewBucketNums = 0x0, nbatch = 1, curbatch = 0, 
  nbatch_original = 1, nbatch_outstart = 1, growEnabled = true, totalTuples = 0, partialTuples = 0, skewTuples = 0, 
  innerBatchFile = 0x0, outerBatchFile = 0x0, outer_hashfunctions = 0x3053b68, inner_hashfunctions = 0x3053bc0, 
  hashStrict = 0x3053c18, spaceUsed = 0, spaceAllowed = 16777216, spacePeak = 0, spaceUsedSkew = 0, 
  spaceAllowedSkew = 335544, hashCxt = 0x3053a50, batchCxt = 0x2f8b170, chunks = 0x0, current_chunk = 0x0, area = 0x0, 
  parallel_state = 0x0, batches = 0x0, current_chunk_shared = 9187201950435737471}

HJ_BUILD_HASHTABLE->使用的Hash函数

(gdb) p *hashtable->inner_hashfunctions
$15 = {fn_addr = 0x4c8a0a <hashtext>, fn_oid = 400, fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002', 
  fn_extra = 0x0, fn_mcxt = 0x3053a50, fn_expr = 0x0}
(gdb) p *hashtable->outer_hashfunctions
$16 = {fn_addr = 0x4c8a0a <hashtext>, fn_oid = 400, fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002', 
  fn_extra = 0x0, fn_mcxt = 0x3053a50, fn_expr = 0x0}

HJ_BUILD_HASHTABLE->赋值,并执行此Hash Node节点,结果总元组数为10000

(gdb) n
289                 hashNode->hashtable = hashtable;
(gdb) 
290                 (void) MultiExecProcNode((PlanState *) hashNode);
(gdb) 
297                 if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
(gdb) p hashtable->totalTuples 
$18 = 10000

HJ_BUILD_HASHTABLE->批次数为1,只需要执行1个批次即可

(gdb) n
304                 hashtable->nbatch_outstart = hashtable->nbatch;
(gdb) p hashtable->nbatch
$19 = 1

HJ_BUILD_HASHTABLE->重置OuterNotEmpty为F

(gdb) n
311                 node->hj_OuterNotEmpty = false;
(gdb) 
313                 if (parallel)

HJ_BUILD_HASHTABLE->非并行执行,切换状态为HJ_NEED_NEW_OUTER

(gdb) 
313                 if (parallel)
(gdb) n
340                     node->hj_JoinState = HJ_NEED_NEW_OUTER;

HJ_NEED_NEW_OUTER->获取(执行ExecHashJoinOuterGetTuple)下一个outer relation的一个元组

349                 if (parallel)
(gdb) n
354                     outerTupleSlot =
(gdb) 
357                 if (TupIsNull(outerTupleSlot))
(gdb) p *outerTupleSlot
$20 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = true, 
  tts_tuple = 0x2f88300, tts_tupleDescriptor = 0x7f0710d02bd0, tts_mcxt = 0x2ee1640, tts_buffer = 507, tts_nvalid = 1, 
  tts_values = 0x2ee22a8, tts_isnull = 0x2ee22d0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {
        bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 2, tts_fixedTupleDescriptor = true}

HJ_NEED_NEW_OUTER->设置相关变量

(gdb) n
371                 econtext->ecxt_outertuple = outerTupleSlot;
(gdb) 
372                 node->hj_MatchedOuter = false;
(gdb) 
378                 node->hj_CurHashValue = hashvalue;
(gdb) 
379                 ExecHashGetBucketAndBatch(hashtable, hashvalue,
(gdb) p hashvalue
$21 = 2324234220
(gdb) n
381                 node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
(gdb) 
383                 node->hj_CurTuple = NULL;
(gdb) p *node
$22 = {js = {ps = {type = T_HashJoinState, plan = 0x2faaff8, state = 0x2ee1758, ExecProcNode = 0x70291d <ExecHashJoin>, 
      ExecProcNodeReal = 0x70291d <ExecHashJoin>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0, 
      qual = 0x0, lefttree = 0x2ee2070, righttree = 0x2ee2918, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, 
      ps_ResultTupleSlot = 0x2f20d98, ps_ExprContext = 0x2ee1fb0, ps_ProjInfo = 0x2ee3550, scandesc = 0x0}, 
    jointype = JOIN_INNER, single_match = true, joinqual = 0x0}, hashclauses = 0x2f21430, hj_OuterHashKeys = 0x2f22230, 
  hj_InnerHashKeys = 0x2f22740, hj_HashOperators = 0x2f227a0, hj_HashTable = 0x2f88ee8, hj_CurHashValue = 2324234220, 
  hj_CurBucketNo = 16364, hj_CurSkewBucketNo = -1, hj_CurTuple = 0x0, hj_OuterTupleSlot = 0x2f212f0, 
  hj_HashTupleSlot = 0x2ee3278, hj_NullOuterTupleSlot = 0x0, hj_NullInnerTupleSlot = 0x0, hj_FirstOuterTupleSlot = 0x0, 
  hj_JoinState = 2, hj_MatchedOuter = false, hj_OuterNotEmpty = true}
(gdb) p *econtext
$25 = {type = T_ExprContext, ecxt_scantuple = 0x0, ecxt_innertuple = 0x0, ecxt_outertuple = 0x2ee2248, 
  ecxt_per_query_memory = 0x2ee1640, ecxt_per_tuple_memory = 0x2f710c0, ecxt_param_exec_vals = 0x0, 
  ecxt_param_list_info = 0x0, ecxt_aggvalues = 0x0, ecxt_aggnulls = 0x0, caseValue_datum = 0, caseValue_isNull = true, 
  domainValue_datum = 0, domainValue_isNull = true, ecxt_estate = 0x2ee1758, ecxt_callbacks = 0x0}
(gdb) p *node->hj_HashTupleSlot
$26 = {type = T_TupleTableSlot, tts_isempty = true, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false, 
  tts_tuple = 0x0, tts_tupleDescriptor = 0x2ee3060, tts_mcxt = 0x2ee1640, tts_buffer = 0, tts_nvalid = 0, 
  tts_values = 0x2ee32d8, tts_isnull = 0x2ee32f0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {
        bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}  

HJ_NEED_NEW_OUTER->切换状态为HJ_SCAN_BUCKET,开始扫描Hash Table

(gdb) n
407                 node->hj_JoinState = HJ_SCAN_BUCKET;
(gdb) 

HJ_SCAN_BUCKET->不匹配,切换状态为HJ_FILL_OUTER_TUPLE

(gdb) 
416                 if (parallel)
(gdb) n
427                     if (!ExecScanHashBucket(node, econtext))
(gdb) 
430                         node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
(gdb) 
431                         continue;
(gdb) 

HJ_FILL_OUTER_TUPLE->切换状态为HJ_NEED_NEW_OUTER
不管是否获得/发出一个元组,下一个状态是NEED_NEW_OUTER

209         switch (node->hj_JoinState)
(gdb) 
483                 node->hj_JoinState = HJ_NEED_NEW_OUTER;

HJ_FILL_OUTER_TUPLE->由于不是外连接,无需FILL,回到HJ_NEED_NEW_OUTER处理逻辑

(gdb) n
485                 if (!node->hj_MatchedOuter &&
(gdb) 
486                     HJ_FILL_OUTER(node))
(gdb) 
485                 if (!node->hj_MatchedOuter &&
(gdb) 
549     }
(gdb) 

HJ_SCAN_BUCKET->在SCAN_BUCKET成功扫描的位置设置断点

(gdb) b nodeHashjoin.c:441
Breakpoint 3 at 0x7025c3: file nodeHashjoin.c, line 441.
(gdb) c
Continuing.
Breakpoint 3, ExecHashJoinImpl (pstate=0x2ee1d98, parallel=false) at nodeHashjoin.c:447
447                 if (joinqual == NULL || ExecQual(joinqual, econtext))

HJ_SCAN_BUCKET->存在匹配的元组,设置相关标记

(gdb) n
449                     node->hj_MatchedOuter = true;
(gdb) 
450                     HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
(gdb) 
453                     if (node->js.jointype == JOIN_ANTI)
(gdb) n
464                     if (node->js.single_match)
(gdb) 
465                         node->hj_JoinState = HJ_NEED_NEW_OUTER;
(gdb) 

HJ_SCAN_BUCKET->执行投影操作并返回

467                     if (otherqual == NULL || ExecQual(otherqual, econtext))
(gdb) 
468                         return ExecProject(node->js.ps.ps_ProjInfo);
(gdb) 

总的来说,Hash Join的实现是创建inner relation的Hash Table,然后获取outer relation的元组,如匹配则执行投影操作返回相应的元组,除了创建HT外,其他步骤不断的变换状态执行,直至满足Portal要求的元组数量为止.

四、参考资料

Hash Joins: Past, Present and Future/PGCon 2017
A Look at How Postgres Executes a Tiny Join - Part 1
A Look at How Postgres Executes a Tiny Join - Part 2
Assignment 2 Symmetric Hash Join

您可能感兴趣的文档:

--结束END--

本文标题: PostgreSQL 源码解读(90)- 查询语句#75(ExecHashJoin函数#1)

本文链接: https://lsjlt.com/news/48548.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作