This is the fifth part of the discussion of the ExecHashJoin function. It covers the implementation logic of the supporting functions that ExecHashJoin relies on during the HJ_NEED_NEW_BATCH phase; the main one is ExecHashJoinNewBatch.
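As a reminder of where ExecHashJoinNewBatch sits, the HJ_NEED_NEW_BATCH arm of ExecHashJoin's state machine looks roughly like the sketch below (simplified from nodeHashjoin.c; the parallel-hash variant is omitted): when no more batches remain the join is finished, otherwise the state machine goes back to fetching outer tuples.
case HJ_NEED_NEW_BATCH:
    //try to advance to the next batch; if there is none, the hash join is done
    if (!ExecHashJoinNewBatch(node))
        return NULL;    //end of hash join
    node->hj_JoinState = HJ_NEED_NEW_OUTER;
    break;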
JoinState
Base node for Hash/NestLoop/Merge Join.
typedef struct JoinState
{
    PlanState ps;//base PlanState
    JoinType jointype;//join type
    //true if we should skip to the next outer tuple as soon as one inner match is found
    bool single_match;
    //join qualification conditions (in addition to ps.qual)
    ExprState *joinqual;
} JoinState;
HashJoinState
Run-time state structure for Hash Join.
typedef struct HashJoinTupleData *HashJoinTuple;
typedef struct HashJoinTableData *HashJoinTable;
typedef struct HashJoinState
{
    JoinState js;//base JoinState
    ExprState *hashclauses;//hash join clauses
    List *hj_OuterHashKeys;//list of ExprState nodes for the outer hash keys
    List *hj_InnerHashKeys;//list of ExprState nodes for the inner hash keys
    List *hj_HashOperators;//list of operator OIDs for the hash clauses
    HashJoinTable hj_HashTable;//hash table
    uint32 hj_CurHashValue;//current hash value
    int hj_CurBucketNo;//current regular bucket number
    int hj_CurSkewBucketNo;//current skew bucket number
    HashJoinTuple hj_CurTuple;//current tuple
    TupleTableSlot *hj_OuterTupleSlot;//slot for outer relation tuples
    TupleTableSlot *hj_HashTupleSlot;//slot for hash-table (inner) tuples
    TupleTableSlot *hj_NullOuterTupleSlot;//all-NULL outer slot, used for outer joins
    TupleTableSlot *hj_NullInnerTupleSlot;//all-NULL inner slot, used for outer joins
    TupleTableSlot *hj_FirstOuterTupleSlot;//first outer tuple, fetched early to detect an empty outer relation
    int hj_JoinState;//current HJ_* state of the state machine
    bool hj_MatchedOuter;//has the current outer tuple found a match?
    bool hj_OuterNotEmpty;//is the outer relation known to be non-empty?
} HashJoinState;
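The hj_JoinState field holds one of the HJ_* state-machine constants defined at the top of nodeHashjoin.c (shown here for reference; values as in the source tree this series follows). HJ_NEED_NEW_BATCH, the state this part focuses on, is the last of them.
#define HJ_BUILD_HASHTABLE   1
#define HJ_NEED_NEW_OUTER    2
#define HJ_SCAN_BUCKET       3
#define HJ_FILL_OUTER_TUPLE  4
#define HJ_FILL_INNER_TUPLES 5
#define HJ_NEED_NEW_BATCH    6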
HashJoinTable
Hash table data structure.
typedef struct HashJoinTableData
{
    int nbuckets;//number of buckets in the in-memory hash table
    int log2_nbuckets;//its log2 (nbuckets must be a power of 2)
    int nbuckets_original;//number of buckets when the table was first built
    int nbuckets_optimal;//optimal number of buckets (per batch)
    int log2_nbuckets_optimal;//log2(nbuckets_optimal)
    //buckets[i] is the head of the list of tuples in the i'th in-memory bucket
    union
    {
        //the unshared array is per-batch storage, as are all the tuples
        struct HashJoinTupleData **unshared;
        //the shared array is per-query DSA-area storage, as are all the tuples
        dsa_pointer_atomic *shared;
    } buckets;
    bool keepNulls;//true to store unmatchable NULL tuples
    bool skewEnabled;//are we using the skew optimization?
    HashSkewBucket **skewBucket;//hashtable of skew buckets
    int skewBucketLen;//size of the skewBucket array (a power of 2)
    int nSkewBuckets;//number of active skew buckets
    int *skewBucketNums;//array of bucket numbers actually in use
    int nbatch;//number of batches
    int curbatch;//current batch number; 0 during the first pass
    int nbatch_original;//nbatch when the inner scan started
    int nbatch_outstart;//nbatch when the outer scan of the current batch started
    bool growEnabled;//flag to shut off further nbatch increases
    double totalTuples;//number of tuples obtained from the inner plan
    double partialTuples;//number of inner tuples obtained by this backend (Parallel Hash)
    double skewTuples;//number of tuples inserted into skew buckets
    BufFile **innerBatchFile;//buffered virtual temp file per batch (inner side)
    BufFile **outerBatchFile;//buffered virtual temp file per batch (outer side)
    FmgrInfo *outer_hashfunctions;//lookup data for the outer-side hash functions
    FmgrInfo *inner_hashfunctions;//lookup data for the inner-side hash functions
    bool *hashStrict;//is each hash join operator strict?
    Size spaceUsed;//memory space currently used by tuples
    Size spaceAllowed;//upper limit for space used
    Size spacePeak;//peak space used
    Size spaceUsedSkew;//skew hash table's current space usage
    Size spaceAllowedSkew;//upper limit for the skew hash table
    MemoryContext hashCxt;//context for whole-hash-join storage
    MemoryContext batchCxt;//context for this-batch-only storage
    //used for dense allocation of tuples (into linked chunks)
    HashMemoryChunk chunks;
    //shared and private state for Parallel Hash
    HashMemoryChunk current_chunk;//this backend's current chunk
    dsa_area *area;//DSA area to allocate memory from
    ParallelHashJoinState *parallel_state;//shared parallel execution state
    ParallelHashJoinBatchAccessor *batches;//per-batch accessors for Parallel Hash
    dsa_pointer current_chunk_shared;//shared pointer to the start of the current chunk
} HashJoinTableData;
typedef struct HashJoinTableData *HashJoinTable;
HashJoinTupleData
Hash join tuple data.
typedef struct HashJoinTupleData
{
    //link to the next tuple in the same bucket
    union
    {
        struct HashJoinTupleData *unshared;//in-memory pointer (private hash table)
        dsa_pointer shared;//DSA pointer (shared hash table, Parallel Hash)
    } next;
    uint32 hashvalue;//the tuple's hash value
} HashJoinTupleData;
#define HJTUPLE_OVERHEAD MAXALIGN(sizeof(HashJoinTupleData))
#define HJTUPLE_MINTUPLE(hjtup) \
((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
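The two macros describe the layout of a bucket entry: a HashJoinTupleData header (next pointer plus hash value), padded to MAXALIGN, immediately followed by the MinimalTuple payload. The illustrative sketch below (variable names are ours, not from the source) shows how probe code such as ExecScanHashBucket walks a bucket chain using these macros:
//one bucket entry:
//  [ HashJoinTupleData, HJTUPLE_OVERHEAD bytes ][ MinimalTuple, t_len bytes ]
HashJoinTuple hashTuple = hashtable->buckets.unshared[bucketno];//head of the chain
while (hashTuple != NULL)
{
    if (hashTuple->hashvalue == hashvalue)
    {
        MinimalTuple mtup = HJTUPLE_MINTUPLE(hashTuple);//payload right after the header
        //... evaluate the hash clauses against mtup ...
    }
    hashTuple = hashTuple->next.unshared;//next entry in the same bucket
}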
ExecHashJoinNewBatch
Switch to a new hashjoin batch; returns true on success, false when there are no more batches.
//see ExecHashJoin (HJ_NEED_NEW_BATCH state)
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;//hash table
    int         nbatch;//number of batches
    int         curbatch;//current batch number
    BufFile    *innerFile;//temp file for the inner side of the batch
    TupleTableSlot *slot;//tuple slot
    uint32      hashvalue;//hash value
nbatch = hashtable->nbatch;
curbatch = hashtable->curbatch;
if (curbatch > 0)
{
if (hashtable->outerBatchFile[curbatch])
BufFileClose(hashtable->outerBatchFile[curbatch]);
hashtable->outerBatchFile[curbatch] = NULL;
}
else
{
hashtable->skewEnabled = false;
hashtable->skewBucket = NULL;
hashtable->skewBucketNums = NULL;
hashtable->nSkewBuckets = 0;
hashtable->spaceUsedSkew = 0;
}
curbatch++;
while (curbatch < nbatch &&
(hashtable->outerBatchFile[curbatch] == NULL ||
hashtable->innerBatchFile[curbatch] == NULL))
{
if (hashtable->outerBatchFile[curbatch] &&
HJ_FILL_OUTER(hjstate))
break;
if (hashtable->innerBatchFile[curbatch] &&
HJ_FILL_INNER(hjstate))
break;
if (hashtable->innerBatchFile[curbatch] &&
nbatch != hashtable->nbatch_original)
break;
if (hashtable->outerBatchFile[curbatch] &&
nbatch != hashtable->nbatch_outstart)
break;
        //none of the rules above applies, so this batch can be skipped entirely
        //close and release its temp files
if (hashtable->innerBatchFile[curbatch])
BufFileClose(hashtable->innerBatchFile[curbatch]);
hashtable->innerBatchFile[curbatch] = NULL;
if (hashtable->outerBatchFile[curbatch])
BufFileClose(hashtable->outerBatchFile[curbatch]);
hashtable->outerBatchFile[curbatch] = NULL;
        curbatch++;//advance to the next batch
}
if (curbatch >= nbatch)
return false;
    hashtable->curbatch = curbatch;//set the new current batch
    ExecHashTableReset(hashtable);//reset the hash table
    //temp file holding the inner relation's tuples for this batch
    innerFile = hashtable->innerBatchFile[curbatch];
    //if there is a batch file, reload its tuples into the hash table
if (innerFile != NULL)
{
        if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))//rewind innerFile; report an error on failure
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join temporary file: %m")));
        while ((slot = ExecHashJoinGetSavedTuple(hjstate,
                                                 innerFile,
                                                 &hashvalue,
                                                 hjstate->hj_HashTupleSlot)))
{
ExecHashTableInsert(hashtable, slot, hashvalue);
}
BufFileClose(innerFile);
hashtable->innerBatchFile[curbatch] = NULL;
}
if (hashtable->outerBatchFile[curbatch] != NULL)
{
if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join temporary file: %m")));
}
return true;
}
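The skip-batch loop above uses two helper macros from nodeHashjoin.c; as far as the source shows, they simply test whether ExecInitHashJoin set up the corresponding null-filling slot, i.e. whether the join has to emit unmatched outer/inner tuples:
//must we emit unmatched outer tuples (LEFT/FULL/ANTI joins)?
#define HJ_FILL_OUTER(hjstate)  ((hjstate)->hj_NullInnerTupleSlot != NULL)
//must we emit unmatched inner tuples (RIGHT/FULL joins)?
#define HJ_FILL_INNER(hjstate)  ((hjstate)->hj_NullOuterTupleSlot != NULL)
ExecHashJoinGetSavedTuple
Read one saved tuple (hash value + MinimalTuple) back from a batch temp file; returns NULL when the file is exhausted.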
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
BufFile *file,
uint32 *hashvalue,
TupleTableSlot *tupleSlot)
{
uint32 header[2];
size_t nread;
MinimalTuple tuple;
CHECK_FOR_INTERRUPTS();
    nread = BufFileRead(file, (void *) header, sizeof(header));//read the 8-byte header: hash value + tuple length
if (nread == 0)
{
        //end of file: clear the slot and return NULL
ExecClearTuple(tupleSlot);
return NULL;
}
    if (nread != sizeof(header))//short read: report an error
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from hash-join temporary file: %m")));
    //hash value
    *hashvalue = header[0];
    //allocate space for the whole minimal tuple; header[1] is its total length (t_len)
    tuple = (MinimalTuple) palloc(header[1]);
    //tuple length
    tuple->t_len = header[1];
    //read the rest of the tuple; its first uint32 (t_len) was already read as part of the header
nread = BufFileRead(file,
(void *) ((char *) tuple + sizeof(uint32)),
header[1] - sizeof(uint32));
    //short read: report an error
if (nread != header[1] - sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from hash-join temporary file: %m")));
    //store the minimal tuple into the slot
    ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
    return tupleSlot;//return the slot
}
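For reference, the on-disk format read back here (a uint32 hash value followed by the MinimalTuple, whose own first field is its length) is produced by ExecHashJoinSaveTuple. A simplified sketch of the writer side, with the short-write error checks trimmed:
void
ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
                      BufFile **fileptr)
{
    BufFile *file = *fileptr;

    if (file == NULL)
    {
        //first write to this batch file, so create the temp file
        file = BufFileCreateTemp(false);
        *fileptr = file;
    }
    //hash value first, then the whole minimal tuple (t_len bytes)
    BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
    BufFileWrite(file, (void *) tuple, tuple->t_len);
}
ExecHashTableInsert
Insert a tuple (with its precomputed hash value) into the hash table, or, if it belongs to a later batch, save it to that batch's temp file.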
void
ExecHashTableInsert(HashJoinTable hashtable,
TupleTableSlot *slot,
uint32 hashvalue)
{
    bool        shouldFree;//should the fetched minimal tuple be freed afterwards?
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);//get a MinimalTuple from the slot
    int         bucketno;//bucket number
    int         batchno;//batch number
    ExecHashGetBucketAndBatch(hashtable, hashvalue,
                              &bucketno, &batchno);//compute bucket and batch number from the hash value
if (batchno == hashtable->curbatch)
{
        //the batch number matches the hash table's current batch: put the tuple into the in-memory hash table
        HashJoinTuple hashTuple;//hash join tuple
        int         hashTupleSize;//its size
        double      ntuples = (hashtable->totalTuples - hashtable->skewTuples);//number of regular (non-skew) tuples
        //build the HashJoinTuple
        hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;//header overhead plus the minimal tuple length
        hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);//allocate from the dense chunk storage
        //hash value
        hashTuple->hashvalue = hashvalue;
        //copy the minimal tuple right after the HashJoinTupleData header
        memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
        HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
        //push the tuple onto the front of its bucket's chain
hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
hashtable->buckets.unshared[bucketno] = hashTuple;
if (hashtable->nbatch == 1 &&
ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
{
            //only one batch and the tuple count exceeds nbuckets_optimal * NTUP_PER_BUCKET:
            //double the optimal bucket count, guarding against integer overflow
if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
{
hashtable->nbuckets_optimal *= 2;
hashtable->log2_nbuckets_optimal += 1;
}
}
        //account for the space used, and increase the number of batches if we have used too much
hashtable->spaceUsed += hashTupleSize;
        if (hashtable->spaceUsed > hashtable->spacePeak)
            hashtable->spacePeak = hashtable->spaceUsed;//record the new peak space usage
        if (hashtable->spaceUsed +
            hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
            > hashtable->spaceAllowed)
            ExecHashIncreaseNumBatches(hashtable);//exceeded the allowed space: increase the number of batches
}
else
{
        //the tuple belongs to a later batch
Assert(batchno > hashtable->curbatch);
        ExecHashJoinSaveTuple(tuple,
                              hashvalue,
                              &hashtable->innerBatchFile[batchno]);//write the tuple to that batch's temp file
}
    if (shouldFree)//free the fetched minimal tuple if necessary
heap_free_minimal_tuple(tuple);
}
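ExecHashGetBucketAndBatch, called at the top of ExecHashTableInsert, derives both numbers from the hash value alone: the low log2(nbuckets) bits pick the bucket and the next log2(nbatch) bits pick the batch (in the version traced below; newer releases rotate rather than shift the bits). A simplified sketch:
void
ExecHashGetBucketAndBatch(HashJoinTable hashtable,
                          uint32 hashvalue,
                          int *bucketno, int *batchno)
{
    uint32 nbuckets = (uint32) hashtable->nbuckets;
    uint32 nbatch = (uint32) hashtable->nbatch;

    if (nbatch > 1)
    {
        //low bits select the bucket, the next bits select the batch
        *bucketno = hashvalue & (nbuckets - 1);
        *batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
    }
    else
    {
        *bucketno = hashvalue & (nbuckets - 1);
        *batchno = 0;
    }
}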
Set work_mem to a low value (1MB) to force the hash join to spill into multiple batches:
testdb=# set work_mem='1MB';
SET
testdb=# show work_mem;
work_mem
----------
1MB
(1 row)
The test script is as follows:
testdb=# set enable_nestloop=false;
SET
testdb=# set enable_mergejoin=false;
SET
testdb=# explain verbose select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je
testdb(# from t_grxx gr inner join t_jfxx jf
testdb(# on gr.dwbh = dw.dwbh
testdb(# and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
QUERY PLAN
-----------------------------------------------------------------------------------------------
Sort (cost=14828.83..15078.46 rows=99850 width=47)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
Sort Key: dw.dwbh
-> Hash Join (cost=3176.00..6537.55 rows=99850 width=47)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
Hash Cond: ((gr.grbh)::text = (jf.grbh)::text)
-> Hash Join (cost=289.00..2277.61 rows=99850 width=32)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm
Inner Unique: true
Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
-> Seq Scan on public.t_grxx gr (cost=0.00..1726.00 rows=100000 width=16)
Output: gr.dwbh, gr.grbh, gr.xm, gr.xb, gr.nl
-> Hash (cost=164.00..164.00 rows=10000 width=20)
Output: dw.dwmc, dw.dwbh, dw.dwdz
-> Seq Scan on public.t_dwxx dw (cost=0.00..164.00 rows=10000 width=20)
Output: dw.dwmc, dw.dwbh, dw.dwdz
-> Hash (cost=1637.00..1637.00 rows=100000 width=20)
Output: jf.ny, jf.je, jf.grbh
-> Seq Scan on public.t_jfxx jf (cost=0.00..1637.00 rows=100000 width=20)
Output: jf.ny, jf.je, jf.grbh
(20 rows)
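A rough size check explains why this plan spills: the hash table traced below holds the 100000 inner tuples (width 20). Each entry costs HJTUPLE_OVERHEAD (16 bytes after MAXALIGN) plus a roughly 24-byte minimal tuple (t_len = 24 is what gdb shows later), i.e. about 40 bytes per entry, so the build side needs on the order of 4MB of tuple storage before counting the bucket array, several times the 1MB work_mem. The hash join therefore has to split into multiple batches; the gdb session below shows nbatch = 8.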
Start gdb, set a breakpoint, and stop in ExecHashJoinNewBatch:
(gdb) b ExecHashJoinNewBatch
Breakpoint 1 at 0x7031f5: file nodeHashjoin.c, line 943.
(gdb) c
Continuing.
Breakpoint 1, ExecHashJoinNewBatch (hjstate=0x1c40738) at nodeHashjoin.c:943
943 HashJoinTable hashtable = hjstate->hj_HashTable;
Get the number of batches (8) and the current batch (0, i.e. the first batch):
(gdb) n
950 nbatch = hashtable->nbatch;
(gdb)
951 curbatch = hashtable->curbatch;
(gdb)
953 if (curbatch > 0)
(gdb) p nbatch
$5 = 8
(gdb) p curbatch
$6 = 0
curbatch == 0: the first batch has just been processed, so reset the state variables related to the skew optimization:
(gdb) n
971 hashtable->skewEnabled = false;
(gdb)
972 hashtable->skewBucket = NULL;
(gdb)
973 hashtable->skewBucketNums = NULL;
(gdb)
974 hashtable->nSkewBuckets = 0;
(gdb)
975 hashtable->spaceUsedSkew = 0;
(gdb)
995 curbatch++;
This loop skips batches whose outer or inner batch file is empty; in this call neither file is empty:
(gdb) n
996 while (curbatch < nbatch &&
(gdb)
997 (hashtable->outerBatchFile[curbatch] == NULL ||
(gdb) p hashtable->outerBatchFile[curbatch]
$7 = (BufFile *) 0x1d85290
(gdb) p hashtable->outerBatchFile[curbatch]
$8 = (BufFile *) 0x1d85290
Set the current batch and reset the hash table:
(gdb)
1023 if (curbatch >= nbatch)
(gdb)
1026 hashtable->curbatch = curbatch;
(gdb)
1031 ExecHashTableReset(hashtable);
Get the temporary batch file for the inner relation:
(gdb)
1033 innerFile = hashtable->innerBatchFile[curbatch];
(gdb)
1035 if (innerFile != NULL)
(gdb) p innerFile
$9 = (BufFile *) 0x1cc0540
The temp file is not NULL, so read it:
(gdb) n
1037 if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
(gdb)
1042 while ((slot = ExecHashJoinGetSavedTuple(hjstate,
Step into ExecHashJoinGetSavedTuple:
(gdb) step
ExecHashJoinGetSavedTuple (hjstate=0x1c40fd8, file=0x1cc0540, hashvalue=0x7ffeace60824, tupleSlot=0x1c4cc20)
at nodeHashjoin.c:1259
1259 CHECK_FOR_INTERRUPTS();
(gdb)
ExecHashJoinGetSavedTuple -> read the 8-byte header (header is a uint32[2] array: the hash value followed by the tuple length, 8 bytes in total):
(gdb) n
1266 nread = BufFileRead(file, (void *) header, sizeof(header));
(gdb)
1267 if (nread == 0)
(gdb) p nread
$10 = 8
(gdb) n
1272 if (nread != sizeof(header))
(gdb)
ExecHashJoinGetSavedTuple -> get the hash value (1978434688):
(gdb)
1276 *hashvalue = header[0];
(gdb) n
1277 tuple = (MinimalTuple) palloc(header[1]);
(gdb) p *hashvalue
$11 = 1978434688
ExecHashJoinGetSavedTuple -> allocate the tuple and set its length:
(gdb) n
1278 tuple->t_len = header[1];
(gdb)
1281 header[1] - sizeof(uint32));
(gdb) p tuple->t_len
$16 = 24
(gdb) p *tuple
$17 = {t_len = 24, mt_padding = "\177\177\177\177\177\177", t_infomask2 = 32639, t_infomask = 32639, t_hoff = 127 '\177',
t_bits = 0x1c5202f "\177\177\177\177\177\177\177\177\177~\177\177\177\177\177\177\177"}
(gdb)
ExecHashJoinGetSavedTuple -> read the rest of the tuple from the file according to its length:
(gdb) n
1279 nread = BufFileRead(file,
(gdb)
1282 if (nread != header[1] - sizeof(uint32))
(gdb) p header[1]
$18 = 24
(gdb) p sizeof(uint32)
$19 = 4
(gdb) p *tuple
$20 = {t_len = 24, mt_padding = "\000\000\000\000\000", t_infomask2 = 3, t_infomask = 2, t_hoff = 24 '\030',
t_bits = 0x1c5202f ""}
ExecHashJoinGetSavedTuple -> store the tuple into the slot and return:
(gdb) n
1286 return ExecStoreMinimalTuple(tuple, tupleSlot, true);
(gdb)
1287 }
(gdb)
ExecHashJoinNewBatch (hjstate=0x1c40fd8) at nodeHashjoin.c:1051
1051 ExecHashTableInsert(hashtable, slot, hashvalue);
Insert the tuple into the hash table:
(gdb)
1051 ExecHashTableInsert(hashtable, slot, hashvalue);
Step into ExecHashTableInsert:
(gdb) step
ExecHashTableInsert (hashtable=0x1c6e1c0, slot=0x1c4cc20, hashvalue=3757101760) at nodeHash.c:1593
1593 MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
(gdb)
ExecHashTableInsert -> get the batch number and hash bucket number:
(gdb) n
1597 ExecHashGetBucketAndBatch(hashtable, hashvalue,
(gdb)
1603 if (batchno == hashtable->curbatch)
(gdb) p batchno
$21 = 1
(gdb) p bucketno
$22 = 21184
(gdb)
(gdb) p hashtable->curbatch
$23 = 1
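A quick sanity check against ExecHashGetBucketAndBatch (see the sketch earlier): assuming this hash table has nbuckets = 32768 (2^15; the value is not printed in this session) and nbatch = 8, the low 15 bits give 3757101760 & 32767 = 21184 and the next 3 bits give (3757101760 >> 15) & 7 = 114657 & 7 = 1, matching the bucketno and batchno that gdb reports.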
ExecHashTableInsert -> the batch number matches the hash table's current batch, so the tuple is put into the in-memory hash table.
The number of regular (non-skew) tuples is 100000:
(gdb) n
1610 double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
(gdb) n
1613 hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
(gdb) p ntuples
$24 = 100000
ExecHashTableInsert -> build the HashJoinTuple and clear its match flag:
(gdb) n
1614 hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
(gdb)
1616 hashTuple->hashvalue = hashvalue;
(gdb)
1617 memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
(gdb)
1625 HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
(gdb)
ExecHashTableInsert -> the tuple is pushed onto the front of its bucket's chain:
(gdb) n
1628 hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
(gdb)
1629 hashtable->buckets.unshared[bucketno] = hashTuple;
(gdb)
1636 if (hashtable->nbatch == 1 &&
(gdb)
ExecHashTableInsert -> account for the space used, record the peak if needed, and return to ExecHashJoinNewBatch:
(gdb)
1649 hashtable->spaceUsed += hashTupleSize;
(gdb)
...
(gdb)
1667 }
(gdb) n
ExecHashJoinNewBatch (hjstate=0x1c40fd8) at nodeHashjoin.c:1042
1042 while ((slot = ExecHashJoinGetSavedTuple(hjstate,
Loop, reading saved tuples and inserting them into the hash table:
1042 while ((slot = ExecHashJoinGetSavedTuple(hjstate,
(gdb) n
1051 ExecHashTableInsert(hashtable, slot, hashvalue);
...
DONE!