返回顶部
首页 > 资讯 > 数据库 >PostgreSQL中StartLogStreamer分析
  • 523
分享到

PostgreSQL中StartLogStreamer分析

2024-04-02 19:04:59 523人浏览 独家记忆
摘要

本篇内容主要讲解“postgresql中StartLogStreamer分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Postgresql中StartLog

本篇内容主要讲解“postgresql中StartLogStreamer分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Postgresql中StartLogStreamer分析”吧!

本节简单介绍了PostgreSQL的备份工具pg_basebackup源码中实际执行备份逻辑的BaseBackup中对WAL数据进行备份的实现函数StartLogStreamer.

一、数据结构

logstreamer_param
WAL data streamer参数.

typedef struct
{
     ////后台连接
    PGConn     *bgconn;
    //开始位置
    XLogRecPtr  startptr;
    //目录或者tar文件,依赖于使用的模式
    char        xlog[MAXPGPATH];    
    //系统标识符
    char       *sysidentifier;
    //时间线
    int         timeline;
} logstreamer_param;

StreamCtl
接收xlog流数据时的全局参数


typedef struct StreamCtl
{
    //streaming的开始位置
    XLogRecPtr  startpos;       
    //时间线
    TimeLineID  timeline;       
    //系统标识符
    char       *sysidentifier;  
    //standby超时信息
    int         standby_message_timeout;    
    //是否同步(写入时是否马上Flush WAL data)
    bool        synchronous;    
    //在已归档的数据中标记segment为已完成
    bool        mark_done;      
    //刷新到磁盘上以确保数据的一致性状态(是否已刷新到磁盘上)
    bool        do_sync;        
    //在返回T时停止streaming
    stream_stop_callback stream_stop;   
    //如有效,监测该Socket中的输入并检查stream_stop()的返回
    pgsocket    stop_socket;    
    //如何写WAL
    WalWriteMethod *walmethod;  
    //附加到部分接受文件的后缀
    char       *partial_suffix; 
    //使用的replication slot,如无则为NULL
    char       *replication_slot;   
} StreamCtl;

二、源码解读

StartLogStreamer
StartLogStreamer用于在备份时初始化后台进程用于接收WAL.接收进程将创建自己的数据库连接以并行的方式对文件进行streaming复制.


static void
StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
{
    //参数
    logstreamer_param *param;
    uint32      hi,
                lo;//高位/低位
    char        statusdir[MAXPGPATH];
    param = pg_malloc0(sizeof(logstreamer_param));
    param->timeline = timeline;
    param->sysidentifier = sysidentifier;
    
    //转换开始位置(高低位转换)
    if (sscanf(startpos, "%X/%X", &hi, &lo) != 2)
    {
        fprintf(stderr,
                _("%s: could not parse write-ahead log location \"%s\"\n"),
                progname, startpos);
        exit(1);
    }
    //开始位置,转换为64bit的地址
    param->startptr = ((uint64) hi) << 32 | lo;
    
    //按segment取整
    param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz);
#ifndef WIN32
    //WIN32使用的代码
    
    if (pipe(bgpipe) < 0)
    {
        fprintf(stderr,
                _("%s: could not create pipe for background process: %s\n"),
                progname, strerror(errno));
        exit(1);
    }
#endif
    
    //获取第二个连接
    param->bgconn = GetConnection();
    if (!param->bgconn)
        
        exit(1);
    
    //在PG 10,pg_xlog已命名为pg_wal
    snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
             basedir,
             PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
             "pg_xlog" : "pg_wal");
    
    //临时复制slots只在PG10+支持
    if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
        temp_replication_slot = false;
    
    //static char *replication_slot = NULL;
    //static bool temp_replication_slot = true;
    if (temp_replication_slot && !replication_slot)
        //创建replication slot
        replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
    if (temp_replication_slot || create_slot)
    {
        //创建replication slot
        if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
                                   temp_replication_slot, true, true, false))
            exit(1);
        if (verbose)
        {
            //显示诊断信息
            if (temp_replication_slot)
                fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
                        progname, replication_slot);
            else
                fprintf(stderr, _("%s: created replication slot \"%s\"\n"),
                        progname, replication_slot);
        }
    }
    if (fORMat == 'p')
    {
        
        snprintf(statusdir, sizeof(statusdir), "%s/%s/arcHive_status",
                 basedir,
                 PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
                 "pg_xlog" : "pg_wal");
        if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST)
        {
            fprintf(stderr,
                    _("%s: could not create directory \"%s\": %s\n"),
                    progname, statusdir, strerror(errno));
            exit(1);
        }
    }
    
#ifndef WIN32
    //UNIX:fork进程
    bgchild = fork();
    if (bgchild == 0)
    {
        //这是子进程,返回0
        
        //启动新进程
        exit(LogStreamerMain(param));
    }
    else if (bgchild < 0)
    {
        fprintf(stderr, _("%s: could not create background process: %s\n"),
                progname, strerror(errno));
        exit(1);
    }
    
    atexit(kill_bgchild_atexit);
#else                           
    //WIN32:创建线程
    bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
    if (bgchild == 0)
    {
        fprintf(stderr, _("%s: could not create background thread: %s\n"),
                progname, strerror(errno));
        exit(1);
    }
#endif
}

LogStreamerMain
WAL流复制主函数,用于fork后的子进程调用

static int
LogStreamerMain(logstreamer_param *param)
{
    StreamCtl   stream;//接收xlog流数据时的全局参数
    in_log_streamer = true;
    //初始化StreamCtl结构体
    MemSet(&stream, 0, sizeof(stream));
    stream.startpos = param->startptr;
    stream.timeline = param->timeline;
    stream.sysidentifier = param->sysidentifier;
    stream.stream_stop = reached_end_position;
#ifndef WIN32
    stream.stop_socket = bgpipe[0];
#else
    stream.stop_socket = PGINVALID_SOCKET;
#endif
    stream.standby_message_timeout = standby_message_timeout;
    stream.synchronous = false;
    stream.do_sync = do_sync;
    stream.mark_done = true;
    stream.partial_suffix = NULL;
    stream.replication_slot = replication_slot;
    if (format == 'p')
        stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
    else
        stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
    //接收数据
    if (!ReceiveXlogStream(param->bgconn, &stream))
        
        return 1;
    if (!stream.walmethod->finish())
    {
        fprintf(stderr,
                _("%s: could not finish writing WAL files: %s\n"),
                progname, strerror(errno));
        return 1;
    }
    //结束连接
    PQfinish(param->bgconn);
    //普通文件格式
    if (format == 'p')
        FreeWalDirectoryMethod();
    else
        FreeWalTarMethod();
    //是否内存
    pg_free(stream.walmethod);
    return 0;
}

三、跟踪分析

备份命令

pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

启动gdb跟踪

[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise linux 7.6.1-110.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <Http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and Redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /appdb/atlasdb/pg11.2/bin/pg_basebackup...done.
(gdb) b StartLogStreamer
Breakpoint 1 at 0x403e6b: file pg_basebackup.c, line 555.
(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
(gdb) r
Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
PassWord: 
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/57000060 on timeline 16
pg_basebackup: starting background WAL receiver
Breakpoint 1, StartLogStreamer (startpos=0x7fffffffdf60 "0/57000060", timeline=16, 
    sysidentifier=0x61f1a0 "6666964067616600474") at pg_basebackup.c:555
555     param = pg_malloc0(sizeof(logstreamer_param));
(gdb)

输入参数
startpos=0x7fffffffdf60 “0/57000060”,
timeline=16,
sysidentifier=0x61f1a0 “6666964067616600474”
构造参数

(gdb) n
556     param->timeline = timeline;
(gdb) 
557     param->sysidentifier = sysidentifier;
(gdb) 
560     if (sscanf(startpos, "%X/%X", &hi, &lo) != 2)
(gdb) 
567     param->startptr = ((uint64) hi) << 32 | lo;
(gdb) p hi
$1 = 0
(gdb) p lo
$2 = 1459617888
(gdb) n
569     param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz);
(gdb) n
573     if (pipe(bgpipe) < 0)
(gdb) p *param
$3 = {bgconn = 0x0, startptr = 1459617792, xlog = '\000' <repeats 1023 times>, 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb)

建立连接,创建replication slot

(gdb) n
583     param->bgconn = GetConnection();
(gdb) 
584     if (!param->bgconn)
(gdb) 
591              PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
(gdb) 
589     snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
(gdb) 
595     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
(gdb) 
601     if (temp_replication_slot && !replication_slot)
(gdb) 
602         replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
(gdb) 
603     if (temp_replication_slot || create_slot)
(gdb) 
605         if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
(gdb) 
609         if (verbose)
(gdb) 
611             if (temp_replication_slot)
(gdb) 
612                 fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
(gdb) 
pg_basebackup: created temporary replication slot "pg_basebackup_59378"
620     if (format == 'p')
(gdb) 
(gdb) n
630                  PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
(gdb) 
628         snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",

创建备份目录

(gdb) 
633         if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST)
(gdb) p *param
$4 = {bgconn = 0x62a280, startptr = 1459617792, xlog = "/data/backup/pg_wal", '\000' <repeats 1004 times>, 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb) n
647     bgchild = fork();
(gdb) 
#############
[xdb@localhost backup]$ ls
pg_wal

fork进程,父进程返回子进程的PID

(gdb) n
647     bgchild = fork();
(gdb) n
Detaching after fork from child process 43001.
648     if (bgchild == 0)
(gdb) p bgchild
$5 = 43001
(gdb)

子进程(PID=43001)

[xdb@localhost backup]$ ps -ef|grep 43001
xdb      43001 42820  1 11:54 pts/1    00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[xdb@localhost backup]$ ps -ef|grep 192.168.26.25
xdb      42820 42756  0 11:48 pts/1    00:00:00 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
xdb      43001 42820  0 11:54 pts/1    00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

完成调用

(gdb) n
653     else if (bgchild < 0)
(gdb) 
672 }
(gdb) 
BaseBackup () at pg_basebackup.c:1937
1937        for (i = 0; i < PQntuples(res); i++)
(gdb)

pg_wal目录中的数据

[xdb@localhost backup]$ ls -l ./pg_wal/
total 16388
-rw-------. 1 xdb xdb 16777216 Mar 18 11:54 000000100000000000000057
-rw-------. 1 xdb xdb      217 Mar 18 11:54 00000010.history
drwx------. 2 xdb xdb       35 Mar 18 11:54 archive_status
[xdb@localhost backup]$

到此,相信大家对“PostgreSQL中StartLogStreamer分析”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

您可能感兴趣的文档:

--结束END--

本文标题: PostgreSQL中StartLogStreamer分析

本文链接: https://lsjlt.com/news/64181.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • PostgreSQL中StartLogStreamer分析
    本篇内容主要讲解“PostgreSQL中StartLogStreamer分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中StartLog...
    99+
    2024-04-02
  • PostgreSQL中REDO point分析
    这篇文章主要介绍“PostgreSQL中REDO point分析”,在日常操作中,相信很多人在PostgreSQL中REDO point分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望...
    99+
    2024-04-02
  • PostgreSQL中vacuum过程分析
    本篇内容主要讲解“PostgreSQL中vacuum过程分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中vacuum过程分析”吧!一、数...
    99+
    2024-04-02
  • 分析PostgreSQL中的tuple locks
    这篇文章主要介绍“分析PostgreSQL中的tuple locks”,在日常操作中,相信很多人在分析PostgreSQL中的tuple locks问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方...
    99+
    2024-04-02
  • PostgreSQL中hash_inner_and_outer函数分析
    这篇文章主要讲解了“PostgreSQL中hash_inner_and_outer函数分析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“PostgreSQL...
    99+
    2024-04-02
  • PostgreSQL中sort_inner_and_outer函数分析
    这篇文章主要讲解了“PostgreSQL中sort_inner_and_outer函数分析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“PostgreSQL...
    99+
    2024-04-02
  • PostgreSQL中match_unsorted_outer函数分析
    这篇文章主要讲解了“PostgreSQL中match_unsorted_outer函数分析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“PostgreSQL...
    99+
    2024-04-02
  • PostgreSQL中make_rel_from_joinlist函数分析
    这篇文章主要介绍“PostgreSQL中make_rel_from_joinlist函数分析”,在日常操作中,相信很多人在PostgreSQL中make_rel_from_joinlist函数分析问题上存在...
    99+
    2024-04-02
  • PostgreSQL中Old Master节点分析
    本篇内容介绍了“PostgreSQL中Old Master节点分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有...
    99+
    2024-04-02
  • PostgreSQL中的ProcessRepliesIfAny函数分析
    本篇内容主要讲解“PostgreSQL中的ProcessRepliesIfAny函数分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中的P...
    99+
    2024-04-02
  • PostgreSQL中vacuum主流程分析
    本篇内容介绍了“PostgreSQL中vacuum主流程分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!...
    99+
    2024-04-02
  • 分析PostgreSQL中的synchronous_commit参数
    本篇内容主要讲解“分析PostgreSQL中的synchronous_commit参数”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“分析PostgreSQL中的...
    99+
    2024-04-02
  • PostgreSQL中PlannedStmt的跟踪分析
    这篇文章主要为大家展示了“PostgreSQL中PlannedStmt的跟踪分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“PostgreSQL中Planne...
    99+
    2024-04-02
  • PostgreSQL中的Multi Version Heap Tuple分析
    这篇文章主要介绍“PostgreSQL中的Multi Version Heap Tuple分析”,在日常操作中,相信很多人在PostgreSQL中的Multi Version Heap Tuple分析问题上...
    99+
    2024-04-02
  • 分析PostgreSQL中的distinct和group by
    本篇内容介绍了“分析PostgreSQL中的distinct和group by”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅...
    99+
    2024-04-02
  • 分析PostgreSQL CreateFunction中的interpret_function_parameter_list函数
    这篇文章主要介绍“分析PostgreSQL CreateFunction中的interpret_function_parameter_list函数”,在日常操作中,相信很多人在分析PostgreSQL Cr...
    99+
    2024-04-02
  • 分析PostgreSQL CreateFunction中的ProcedureCreate函数
    本篇内容介绍了“分析PostgreSQL CreateFunction中的ProcedureCreate函数”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何...
    99+
    2024-04-02
  • 分析PostgreSQL中的Prepare Transaction特性
    本篇内容介绍了“分析PostgreSQL中的Prepare Transaction特性”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大...
    99+
    2024-04-02
  • 分析PostgreSQL中的大表连接
    这篇文章主要介绍“分析PostgreSQL中的大表连接”,在日常操作中,相信很多人在分析PostgreSQL中的大表连接问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”分析Po...
    99+
    2024-04-02
  • PostgreSQL中WAL文件结构分析
    本篇内容介绍了“PostgreSQL中WAL文件结构分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!WA...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作