返回顶部
首页 > 资讯 > 数据库 >PostgreSQL 源码解读(199)- 查询#114(排序#7 - inittapes&dumptuples)
  • 686
分享到

PostgreSQL 源码解读(199)- 查询#114(排序#7 - inittapes&dumptuples)

2024-04-02 19:04:59 686人浏览 安东尼
摘要

本节继续介绍排序的实现,主要内容

本节继续介绍排序的实现,主要内容是tuplesort_puttupleslot->puttuple_common调用的inittapes和dumptuples函数.

在内存不能满足排序需求时,使用了Polyphase Merging排序

一、数据结构

Tuplesortstate
Tuplesort操作的私有状态.




typedef enum
{
    //装载元组,在内存限制之内
    TSS_INITIAL,                
    //装载元组到有界堆中
    TSS_BOUNDED,                
    //装载元组,写入到tape中
    TSS_BUILDRUNS,              
    //完全在内存中完成排序
    TSS_SORTEDINMEM,            
    //完成排序,最后在tape上执行排序
    TSS_SORTEDONTAPE,           
    //不落地执行最后的归并
    TSS_FINALMERGE              
} TupSortStatus;

#define MINORDER        6       
#define MAXORDER        500     
#define TAPE_BUFFER_OVERHEAD        BLCKSZ
#define MERGE_BUFFER_SIZE           (BLCKSZ * 32)
typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
                                    Tuplesortstate *state);

struct Tuplesortstate
{
    //状态 : 枚举值详见上面的信息
    TupSortStatus status;       
    //sort key中的列数
    int         nKeys;          
    //调用者需要随机访问?
    bool        randoMaccess;   
    //调用者是否指定了最大返回的元组的数目?
    bool        bounded;        
    //使用有界堆,则返回T
    bool        boundUsed;      
    //如为有界堆,这里存储最大的元组个数
    int         bound;          
    //SortTuple.tuple是否可以设置?
    bool        tuples;         
    //剩余可用内存大小(单位:字节)
    int64       availMem;       
    //允许的内存总大小(单位:字节)
    int64       allowedMem;     
    //tapes个数
    int         maxTapes;       
    //tapes个数 - 1
    int         tapeRange;      
    //主要用于排序数据的内存上下文
    MemoryContext sortcontext;  
    //用于元组数据的sortcontext的子上下文
    MemoryContext tuplecontext; 
    //临时文件中tapes的logtape.c对象
    LogicalTapeSet *tapeset;    
    
    SortTupleComparator comparetup;
    
    void        (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
    
    void        (*writetup) (Tuplesortstate *state, int tapenum,
                             SortTuple *stup);
    
    void        (*readtup) (Tuplesortstate *state, SortTuple *stup,
                            int tapenum, unsigned int len);
    
    //SortTuple结构体数组
    SortTuple  *memtuples;      
    //当前存在的元组数
    int         memtupcount;    
    //memtuples数组的已分配的大小
    int         memtupsize;     
    //memtuples的增长仍在进行中?
    bool        growmemtuples;  
    
    bool        slabAllocatorUsed;
    //slab内存空间的起始位置
    char       *slabMemoryBegin;    
    //slab内存空间的结束位置
    char       *slabMemoryEnd;  
    //链表头
    SlabSlot   *slabFreeHead;   
    
    //在归并期间用于读取输入tapes的缓存大小
    size_t      read_buffer_size;
    
    void       *lastReturnedTuple;
    
    int         currentRun;
    
    
    //活跃的输入run源?
    bool       *mergeactive;    
    
    //Knuth's l
    int         Level;          
    //当前输出tape(Knuth's j)
    int         destTape;       
    //目标斐波拉契run计数(A[])
    int        *tp_fib;         
    //每一个tape上真正runs的编号
    int        *tp_runs;        
    //每一个tape(D[])上虚拟runs的编号
    int        *tp_dummy;       
    //实际的tape编号(TAPE[])
    int        *tp_tapenum;     
    //归并轮中的活动输入tapes编号
    int         activeTapes;    
    
    //已完成输出的实际tape编号
    int         result_tape;    
    //数组编号(仅用于SORTEDINMEM)
    int         current;        
    //是否到达EOF(用于游标)
    bool        eof_reached;    
    
    //markpos_xxx保持已标记的位置,用于标记和存储
    //tape block编号(只用于SORTEDONTAPE)
    long        markpos_block;  
    //存储的"current",或者tape块中的偏移
    int         markpos_offset; 
    //存储的eof_reached
    bool        markpos_eof;    
    
    int         worker;
    Sharedsort *shared;
    int         nParticipants;
    
    TupleDesc   tupDesc;
    //长度nKeys数组
    SortSupport sorTKEys;       
    
    SortSupport onlyKey;
    
    int64       abbrevNext;     
    
    //将用于依赖的索引信息
    IndexInfo  *indexInfo;      
    //解析索引表达式的运行期状态
    EState     *estate;         
    
    //数据表
    Relation    heapRel;        
    //正在创建的index
    Relation    indexRel;       
    
    //这些仅在index_btree下使用
    //如发现重复元组,则提示
    bool        enforceUnique;  
    
    //index_hash情况
    uint32      high_mask;      
    uint32      low_mask;
    uint32      max_buckets;
    
    Oid         datumType;
    
    //需要typelen用于知道如何拷贝Datums.
    int         datumTypeLen;
    
#ifdef TRACE_SORT
    PGRUsage    ru_start;
#endif
};

二、源码解读

inittapes
初始化tape sorting(Polyphase Merging).




static void
inittapes(Tuplesortstate *state, bool mergeruns)
{
    int         maxTapes,//最大tapes
                j;
    Assert(!LEADER(state));
    if (mergeruns)
    {
        
        //计算tapes数 : 归并顺序 + 1
        
        maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
    }
    else
    {
        
        //Worker进程有时可能产生单个run,不需要归并直接输出.
        Assert(WORKER(state));
        maxTapes = MINORDER + 1;
    }
#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "worker %d switching to external sort with %d tapes: %s",
             state->worker, maxTapes, pg_rusage_show(&state->ru_start));
#endif
    
    //创建tape集合并分配per-tape数据数组
    inittapestate(state, maxTapes);
    state->tapeset =
        LogicalTapeSetCreate(maxTapes, NULL,
                             state->shared ? &state->shared->fileset : NULL,
                             state->worker);
    state->currentRun = 0;
    
    for (j = 0; j < maxTapes; j++)
    {
        state->tp_fib[j] = 1;
        state->tp_runs[j] = 0;
        state->tp_dummy[j] = 1;
        state->tp_tapenum[j] = j;
    }
    state->tp_fib[state->tapeRange] = 0;
    state->tp_dummy[state->tapeRange] = 0;
    state->Level = 1;
    state->destTape = 0;
    //变更状态为TSS_BUILDRUNS
    state->status = TSS_BUILDRUNS;
}

dumptuples
清除memtuples中的元组并写入初始run到tape中




static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
    int         memtupwrite;
    int         i;
    
    if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
        !alltuples)
        return;
    
    Assert(state->status == TSS_BUILDRUNS);
    
    if (state->currentRun == INT_MAX)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot have more than %d runs for an external sort",
                        INT_MAX)));
    state->currentRun++;
#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "worker %d starting quicksort of run %d: %s",
             state->worker, state->currentRun,
             pg_rusage_show(&state->ru_start));
#endif
    
    tuplesort_sort_memtuples(state);
#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "worker %d finished quicksort of run %d: %s",
             state->worker, state->currentRun,
             pg_rusage_show(&state->ru_start));
#endif
    //写入到tape中
    memtupwrite = state->memtupcount;
    for (i = 0; i < memtupwrite; i++)
    {
        WRITETUP(state, state->tp_tapenum[state->destTape],
                 &state->memtuples[i]);
        state->memtupcount--;
    }
    
    MemoryContextReset(state->tuplecontext);
    markrunend(state, state->tp_tapenum[state->destTape]);
    state->tp_runs[state->destTape]++;
    state->tp_dummy[state->destTape]--; 
#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "worker %d finished writing run %d to tape %d: %s",
             state->worker, state->currentRun, state->destTape,
             pg_rusage_show(&state->ru_start));
#endif
    //未完成所有元组的处理,分配新的tape
    if (!alltuples)
        selectnewtape(state);
}

三、跟踪分析

N/A

四、参考资料

Merge sort
Polyphase merge sort
Sorting AlGorithms: Internal and External

您可能感兴趣的文档:

--结束END--

本文标题: PostgreSQL 源码解读(199)- 查询#114(排序#7 - inittapes&dumptuples)

本文链接: https://lsjlt.com/news/49482.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作