本篇内容主要讲解“PostgreSQL中PostgresMain函数的实现逻辑是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中PostgresMain函数的实现逻辑是什么”吧!
PostgresMain函数
void
PostgresMain(int arGC, char *argv[],
const char *dbname,
const char *username)
{
int firstchar;//临时变量,读取输入的Command
StringInfoData input_message;//字符串增强结构体
sigjmp_buf local_sigjmp_buf;//系统变量
volatile bool send_ready_for_query = true;//
bool disable_idle_in_transaction_timeout = false;
if (!IsUnderPostmaster//未初始化?initialized for the bootstrap/standalone case
InitStandaloneProcess(argv[0]);//初始化进程
SetProcessingMode(InitProcessing);//设置进程状态为InitProcessing
if (!IsUnderPostmaster)
InitializeGUCOptions();//初始化GUC参数,GUC=Grand Unified Configuration
process_postgres_switches(argc, argv, PGC_POSTMASTER, &dbname);//解析输入参数
if (dbname == NULL)//输入的dbname为空
{
dbname = username;//设置为用户名
if (dbname == NULL)//如仍为空,报错
ereport(FATAL,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("%s: no database nor user name specified",
progname)));
}
if (!IsUnderPostmaster)
{
if (!SelectConfigFiles(userDoption, progname))//读取配置文件conf/hba文件&定位数据目录
proc_exit(1);
}
if (am_walsender)//wal sender进程?
WalSndSignals();//如果是,则调用WalSndSignals
else//不是wal sender进程
{
pqsignal(SIGHUP, PostgresSigHupHandler);
pqsignal(SIGINT, StatementCancelHandler);
pqsignal(SIGTERM, die);
if (IsUnderPostmaster)
pqsignal(SIGQUIT, quickdie);
else
pqsignal(SIGQUIT, die);
InitializeTimeouts();
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
pqsignal(SIGCHLD, SIG_DFL);
}
pqinitmask();//Initialize BlockSig, UnBlockSig, and StartupBlockSig.
if (IsUnderPostmaster)
{
sigdelset(&BlockSig, SIGQUIT);
}
PG_SETMASK(&BlockSig);
if (!IsUnderPostmaster)
{
checkDataDir();//确认数据库路径OK,使用stat命令
ChangeToDataDir();//切换至数据库路径,使用chdir命令
CreateDataDirLockFile(false);//创建锁定文件,CreateLockFile(DIRECTORY_LOCK_FILE, amPostmaster, "", true, DataDir);
LocalProcessControlFile(false);//Read the control file, set respective GUCs.
InitializeMaxBackends();//Initialize MaxBackends value from config options.
}
BaseInit();//基础的初始化
// initialize a per-process data structure for this backend
#ifdef EXEC_BACKEND
if (!IsUnderPostmaster)
InitProcess();
#else
InitProcess();
#endif
PG_SETMASK(&UnBlockSig);
InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL, false);//Initialize POSTGRES
if (PostmasterContext)
{
MemoryContextDelete(PostmasterContext);
PostmasterContext = NULL;
}
SetProcessingMode(NORMalProcessing);//完成初始化后,设置进程模式为NormalProcessing
BeginReportingGUCOptions();//Report GUC
if (IsUnderPostmaster && Log_disconnections)
on_proc_exit(log_disconnections, 0);//this function adds a callback function to the list of functions invoked by proc_exit()
if (am_walsender)
InitWalSender();//初始化 WAL sender process
process_session_preload_libraries();//加载LIB
if (whereToSendOutput == DestRemote)
{
StringInfoData buf;
pq_beginmessage(&buf, 'K');
pq_sendint32(&buf, (int32) MyProcPid);
pq_sendint32(&buf, (int32) MyCancelkey);
pq_endmessage(&buf);
}
if (whereToSendOutput == DestDebug)
printf("\nPostgreSQL stand-alone backend %s\n", PG_VERSION);
//初始化内存上下文:MessageContext
MessageContext = AllocSetContextCreate(TopMemoryContext,
"MessageContext",
ALLOCSET_DEFAULT_SIZES);
//TODO 传输RowDescription messages?
row_description_context = AllocSetContextCreate(TopMemoryContext,
"RowDescriptionContext",
ALLOCSET_DEFAULT_SIZES);
MemoryContextSwitchTo(row_description_context);
initStringInfo(&row_description_buf);
MemoryContextSwitchTo(TopMemoryContext);
if (!IsUnderPostmaster)
PgStartTime = GetCurrentTimestamp();//记录启动时间
if (sigsetjmp(local_sigjmp_buf, 1) != 0)//
{
error_context_stack = NULL;
HOLD_INTERRUPTS();
disable_all_timeouts(false);
QueryCancelPending = false;
stmt_timeout_active = false;
DoingCommandRead = false;
pq_comm_reset();
EmitErrorReport();
debug_query_string = NULL;
AbortCurrentTransaction();
if (am_walsender)
WalSndErrorCleanup();
PortalErrorCleanup();
SPICleanup();
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
ReplicationSlotCleanup();
jit_reset_after_error();
MemoryContextSwitchTo(TopMemoryContext);
FlushErrorState();
if (doing_extended_query_message)
ignore_till_sync = true;
xact_started = false;
if (pq_is_reading_msg())
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("terminating connection because protocol synchronization was lost")));
RESUME_INTERRUPTS();
}
PG_exception_stack = &local_sigjmp_buf;
if (!ignore_till_sync)
send_ready_for_query = true;
for (;;)//主循环
{
doing_extended_query_message = false;
MemoryContextSwitchTo(MessageContext);//切换至MessageContext
MemoryContextResetAndDeleteChildren(MessageContext);
initStringInfo(&input_message);//初始化输入的信息
InvalidateCatalogSnapshotConditionally();
if (send_ready_for_query)//I am ready!
{
if (IsAbortedTransactionBlockState())
{
set_ps_display("idle in transaction (aborted)", false);
pgstat_report_activity(STATE_IDLEINTRANSACTION_ABORTED, NULL);
if (IdleInTransactionSessionTimeout > 0)
{
disable_idle_in_transaction_timeout = true;
enable_timeout_after(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
IdleInTransactionSessionTimeout);
}
}
else if (IsTransactionOrTransactionBlock())
{
set_ps_display("idle in transaction", false);
pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
if (IdleInTransactionSessionTimeout > 0)
{
disable_idle_in_transaction_timeout = true;
enable_timeout_after(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
IdleInTransactionSessionTimeout);
}
}
else
{
ProcessCompletedNotifies();
pgstat_report_stat(false);
set_ps_display("idle", false);
pgstat_report_activity(STATE_IDLE, NULL);
}
ReadyForQuery(whereToSendOutput);
send_ready_for_query = false;
}
DoingCommandRead = true;
firstchar = ReadCommand(&input_message);//读取命令
CHECK_FOR_INTERRUPTS();
DoingCommandRead = false;
if (disable_idle_in_transaction_timeout)
{
disable_timeout(IDLE_IN_TRANSACTION_SESSION_TIMEOUT, false);
disable_idle_in_transaction_timeout = false;
}
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
if (ignore_till_sync && firstchar != EOF)
continue;
switch (firstchar)
{
case 'Q':
{
const char *query_string;
SetCurrentStatementStartTimestamp();
query_string = pq_getmsgstring(&input_message);//SQL语句
pq_getmsgend(&input_message);
if (am_walsender)
{
if (!exec_replication_command(query_string))
exec_simple_query(query_string);
}
else
exec_simple_query(query_string);//执行SQL语句
send_ready_for_query = true;
}
break;
case 'P':
{
const char *stmt_name;
const char *query_string;
int numParams;
Oid *paramTypes = NULL;
forbidden_in_wal_sender(firstchar);
SetCurrentStatementStartTimestamp();
stmt_name = pq_getmsgstring(&input_message);
query_string = pq_getmsgstring(&input_message);
numParams = pq_getmsgint(&input_message, 2);
if (numParams > 0)
{
int i;
paramTypes = (Oid *) palloc(numParams * sizeof(Oid));
for (i = 0; i < numParams; i++)
paramTypes[i] = pq_getmsgint(&input_message, 4);
}
pq_getmsgend(&input_message);
exec_parse_message(query_string, stmt_name,
paramTypes, numParams);
}
break;
case 'B':
forbidden_in_wal_sender(firstchar);
SetCurrentStatementStartTimestamp();
exec_bind_message(&input_message);
break;
case 'E':
{
const char *portal_name;
int max_rows;
forbidden_in_wal_sender(firstchar);
SetCurrentStatementStartTimestamp();
portal_name = pq_getmsgstring(&input_message);
max_rows = pq_getmsgint(&input_message, 4);
pq_getmsgend(&input_message);
exec_execute_message(portal_name, max_rows);
}
break;
case 'F':
forbidden_in_wal_sender(firstchar);
SetCurrentStatementStartTimestamp();
pgstat_report_activity(STATE_FASTPATH, NULL);
set_ps_display("<FASTPATH>", false);
start_xact_command();
MemoryContextSwitchTo(MessageContext);
HandleFunctionRequest(&input_message);
finish_xact_command();
send_ready_for_query = true;
break;
case 'C':
{
int close_type;
const char *close_target;
forbidden_in_wal_sender(firstchar);
close_type = pq_getmsgbyte(&input_message);
close_target = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
switch (close_type)
{
case 'S':
if (close_target[0] != '\0')
DropPreparedStatement(close_target, false);
else
{
drop_unnamed_stmt();
}
break;
case 'P':
{
Portal portal;
portal = GetPortalByName(close_target);
if (PortalIsValid(portal))
PortalDrop(portal, false);
}
break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid CLOSE message subtype %d",
close_type)));
break;
}
if (whereToSendOutput == DestRemote)
pq_putemptymessage('3');
}
break;
case 'D':
{
int describe_type;
const char *describe_target;
forbidden_in_wal_sender(firstchar);
SetCurrentStatementStartTimestamp();
describe_type = pq_getmsgbyte(&input_message);
describe_target = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
switch (describe_type)
{
case 'S':
exec_describe_statement_message(describe_target);
break;
case 'P':
exec_describe_portal_message(describe_target);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid DESCRIBE message subtype %d",
describe_type)));
break;
}
}
break;
case 'H':
pq_getmsgend(&input_message);
if (whereToSendOutput == DestRemote)
pq_flush();
break;
case 'S':
pq_getmsgend(&input_message);
finish_xact_command();
send_ready_for_query = true;
break;
case 'X':
case EOF:
if (whereToSendOutput == DestRemote)
whereToSendOutput = DestNone;
proc_exit(0);
case 'd':
case 'c':
case 'f':
break;
default:
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid frontend message type %d",
firstchar)));
}
}
}
PostgresMain函数使用的数据结构、宏定义以及依赖的函数等。
数据结构/宏定义
1、StringInfoData
/*
 * StringInfoData -- string buffer descriptor.
 *
 * The gdb session in this article shows a freshly read query message as
 * {data = "...", len = 93, maxlen = 1024, cursor = 0}.
 */
struct StringInfoData
{
    char       *data;       /* pointer to the buffer contents */
    int         len;        /* presumably the number of bytes in use -- TODO confirm */
    int         maxlen;     /* presumably the allocated size of data -- TODO confirm */
    int         cursor;     /* presumably a read position within data -- TODO confirm */
};

typedef struct StringInfoData StringInfoData;

/* StringInfo is a pointer to a StringInfoData. */
typedef StringInfoData *StringInfo;
2、宏定义
/*
 * PG_SETMASK(mask) installs the signal mask that `mask` points to.
 *
 * Outside WIN32 it is a direct call to sigprocmask(SIG_SETMASK, ...),
 * discarding the previous mask (third argument NULL).
 */
#ifndef WIN32
#define PG_SETMASK(mask) sigprocmask(SIG_SETMASK, mask, NULL)
#else
/*
 * On WIN32 a signal set is a plain int bit mask: PG_SETMASK goes through
 * pqsigsetmask(), and the sigemptyset/sigfillset/sigaddset/sigdelset
 * operations are provided as bit manipulations on that int.
 */
typedef int sigset_t;
extern int pqsigsetmask(int mask);
#define PG_SETMASK(mask) pqsigsetmask(*(mask))
#define sigemptyset(set) (*(set) = 0)
#define sigfillset(set) (*(set) = ~0)
#define sigaddset(set, signum) (*(set) |= (sigmask(signum)))
#define sigdelset(set, signum) (*(set) &= ~(sigmask(signum)))
#endif
3、全局变量
/* Process-wide mode flags; all start out false. */

/* InitStandaloneProcess() asserts that this flag is false. */
bool IsPostmasterEnvironment = false;
/*
 * When false, PostgresMain() performs the standalone initialization itself
 * (InitStandaloneProcess, InitializeGUCOptions, SelectConfigFiles, ...).
 * The gdb session in this article shows it as true in a backend started
 * for a psql connection.
 */
bool IsUnderPostmaster = false;
/* Set by the -b switch in process_postgres_switches() when `secure`. */
bool IsBinaryUpgrade = false;
/* Makes InitProcess() take its PGPROC from bgworkerFreeProcs. */
bool IsBackgroundWorker = false;
/* Makes PostgresMain() call WalSndSignals() and InitWalSender(). */
bool am_walsender = false;
依赖的函数
1、InitStandaloneProcess
/*
 * InitStandaloneProcess
 *      Basic process setup for a backend that was not started by the
 *      postmaster (PostgresMain calls this when !IsUnderPostmaster).
 *
 * argv0: the program's argv[0], used to locate the executable when
 *        my_exec_path has not been filled in yet.
 */
void
InitStandaloneProcess(const char *argv0)
{
    Assert(!IsPostmasterEnvironment);

    /* Record process identity and start time. */
    MyProcPid = getpid();
    MyStartTime = time(NULL);

    /* Use the process-local latch. */
    InitializeLatchSupport();
    MyLatch = &LocalLatchData;
    InitLatch(MyLatch);

    /* Work out our executable path unless it is already known. */
    if (my_exec_path[0] == '\0' &&
        find_my_exec(argv0, my_exec_path) < 0)
        elog(FATAL, "%s: could not locate my own executable path",
             argv0);

    /* Likewise for the package library path. */
    if (pkglib_path[0] == '\0')
        get_pkglib_path(my_exec_path, pkglib_path);
}
2、InitializeGUCOptions
/*
 * InitializeGUCOptions
 *      Build the GUC variable table and initialize every entry, then
 *      force the three transaction_* options and apply settings taken
 *      from the environment.
 *
 * PostgresMain() calls this only when not running under the postmaster.
 */
void
InitializeGUCOptions(void)
{
    int         i;

    /* Time zone setup comes first, before any GUC is initialized. */
    pg_timezone_initialize();

    /* Build the table of variables, then initialize each one. */
    build_guc_variables();

    for (i = 0; i < num_guc_variables; i++)
    {
        InitializeOneGUCOption(guc_variables[i]);
    }

    guc_dirty = false;

    reporting_enabled = false;

    /* Set these explicitly, with source PGC_S_OVERRIDE. */
    SetConfigOption("transaction_isolation", "default",
                    PGC_POSTMASTER, PGC_S_OVERRIDE);
    SetConfigOption("transaction_read_only", "no",
                    PGC_POSTMASTER, PGC_S_OVERRIDE);
    SetConfigOption("transaction_deferrable", "no",
                    PGC_POSTMASTER, PGC_S_OVERRIDE);

    InitializeGUCOptionsFromEnvironment();
}
3、process_postgres_switches
/*
 * process_postgres_switches
 *      Parse command-line switches for a backend with getopt() and apply
 *      them, mostly by setting the corresponding configuration option.
 *
 * argc, argv: the argument vector to parse.
 * ctx:        GUC context the options are set in.  PGC_POSTMASTER marks
 *             the switches as "secure"; only then are the switches that
 *             set process-level variables (-b, -D, -E, -j, -r, -v) honored.
 * dbname:     if non-NULL and *dbname is NULL, a remaining non-switch
 *             argument is strdup'ed into *dbname.
 *
 * Any invalid or leftover argument is reported with ereport(FATAL).
 * On return, getopt's state (optind, and optreset where available) is
 * reset so that getopt() can be used again.
 */
void
process_postgres_switches(int argc, char *argv[], GucContext ctx,
                          const char **dbname)
{
    bool        secure = (ctx == PGC_POSTMASTER);
    int         errs = 0;
    GucSource   gucsource;
    int         flag;

    if (secure)
    {
        gucsource = PGC_S_ARGV;

        /* Skip a leading "--single" so getopt() does not see it. */
        if (argc > 1 && strcmp(argv[1], "--single") == 0)
        {
            argv++;
            argc--;
        }
    }
    else
    {
        gucsource = PGC_S_CLIENT;
    }

#ifdef HAVE_INT_OPTERR
    /* Keep getopt() from printing its own error messages. */
    opterr = 0;
#endif

    while ((flag = getopt(argc, argv, "B:bc:C:D:d:EeFf:h:ijk:lN:nOo:Pp:r:S:sTt:v:W:-:")) != -1)
    {
        switch (flag)
        {
            case 'B':
                SetConfigOption("shared_buffers", optarg, ctx, gucsource);
                break;

            case 'b':
                if (secure)
                    IsBinaryUpgrade = true;
                break;

            case 'C':
                /* accepted, no action here */
                break;

            case 'D':
                if (secure)
                    userDoption = strdup(optarg);
                break;

            case 'd':
                set_debug_options(atoi(optarg), ctx, gucsource);
                break;

            case 'E':
                if (secure)
                    EchoQuery = true;
                break;

            case 'e':
                SetConfigOption("datestyle", "euro", ctx, gucsource);
                break;

            case 'F':
                SetConfigOption("fsync", "false", ctx, gucsource);
                break;

            case 'f':
                if (!set_plan_disabling_options(optarg, ctx, gucsource))
                    errs++;
                break;

            case 'h':
                SetConfigOption("listen_addresses", optarg, ctx, gucsource);
                break;

            case 'i':
                SetConfigOption("listen_addresses", "*", ctx, gucsource);
                break;

            case 'j':
                if (secure)
                    UseSemiNewlineNewline = true;
                break;

            case 'k':
                SetConfigOption("unix_socket_directories", optarg, ctx, gucsource);
                break;

            case 'l':
                SetConfigOption("ssl", "true", ctx, gucsource);
                break;

            case 'N':
                SetConfigOption("max_connections", optarg, ctx, gucsource);
                break;

            case 'n':
                /* accepted, no action here */
                break;

            case 'O':
                SetConfigOption("allow_system_table_mods", "true", ctx, gucsource);
                break;

            case 'o':
                /* always treated as an error here */
                errs++;
                break;

            case 'P':
                SetConfigOption("ignore_system_indexes", "true", ctx, gucsource);
                break;

            case 'p':
                SetConfigOption("port", optarg, ctx, gucsource);
                break;

            case 'r':
                if (secure)
                    strlcpy(OutputFileName, optarg, MAXPGPATH);
                break;

            case 'S':
                SetConfigOption("work_mem", optarg, ctx, gucsource);
                break;

            case 's':
                SetConfigOption("log_statement_stats", "true", ctx, gucsource);
                break;

            case 'T':
                /* accepted, no action here */
                break;

            case 't':
                {
                    /* Map the argument to the name of a statistics option. */
                    const char *tmp = get_stats_option_name(optarg);

                    if (tmp)
                        SetConfigOption(tmp, "true", ctx, gucsource);
                    else
                        errs++;
                    break;
                }

            case 'v':
                if (secure)
                    FrontendProtocol = (ProtocolVersion) atoi(optarg);
                break;

            case 'W':
                SetConfigOption("post_auth_delay", optarg, ctx, gucsource);
                break;

            case 'c':
            case '-':
                {
                    /* "-c name=value" or "--name=value": set any option. */
                    char       *name,
                               *value;

                    ParseLongOption(optarg, &name, &value);
                    if (!value)
                    {
                        if (flag == '-')
                            ereport(ERROR,
                                    (errcode(ERRCODE_SYNTAX_ERROR),
                                     errmsg("--%s requires a value",
                                            optarg)));
                        else
                            ereport(ERROR,
                                    (errcode(ERRCODE_SYNTAX_ERROR),
                                     errmsg("-c %s requires a value",
                                            optarg)));
                    }
                    SetConfigOption(name, value, ctx, gucsource);
                    free(name);
                    free(value);    /* free(NULL) is a no-op, no guard needed */
                    break;
                }

            default:
                errs++;
                break;
        }

        /* Stop at the first bad switch so optind still points near it. */
        if (errs)
            break;
    }

    /* An optional remaining argument is the database name. */
    if (!errs && dbname && *dbname == NULL && argc - optind >= 1)
        *dbname = strdup(argv[optind++]);

    if (errs || argc != optind)
    {
        /* After a switch error, back up to the offending argument. */
        if (errs)
            optind--;

        if (IsUnderPostmaster)
            ereport(FATAL,
                    (errcode(ERRCODE_SYNTAX_ERROR),
                     errmsg("invalid command-line argument for server process: %s", argv[optind]),
                     errhint("Try \"%s --help\" for more information.", progname)));
        else
            ereport(FATAL,
                    (errcode(ERRCODE_SYNTAX_ERROR),
                     errmsg("%s: invalid command-line argument: %s",
                            progname, argv[optind]),
                     errhint("Try \"%s --help\" for more information.", progname)));
    }

    /* Reset getopt() state for any later use. */
    optind = 1;
#ifdef HAVE_INT_OPTRESET
    optreset = 1;
#endif
}
4、SelectConfigFiles
/*
 * SelectConfigFiles
 *      Determine the locations of the server configuration file, the data
 *      directory, and the hba and ident files, and read the configuration
 *      file.
 *
 * userDoption: value of the -D switch, or NULL to fall back to the PGDATA
 *              environment variable.
 * progname:    program name used in error messages.
 *
 * Returns true on success.  On failure a message is written with
 * write_stderr() and false is returned; PostgresMain() then calls
 * proc_exit(1).
 */
bool
SelectConfigFiles(const char *userDoption, const char *progname)
{
    char       *configdir;
    char       *fname;
    struct stat stat_buf;

    /* The configuration directory comes from -D, else from $PGDATA. */
    if (userDoption)
        configdir = make_absolute_path(userDoption);
    else
        configdir = make_absolute_path(getenv("PGDATA"));

    if (configdir && stat(configdir, &stat_buf) != 0)
    {
        /* Save errno: write_stderr() may change it before we test it. */
        int         save_errno = errno;

        write_stderr("%s: could not access directory \"%s\": %s\n",
                     progname,
                     configdir,
                     strerror(save_errno));
        if (save_errno == ENOENT)
            write_stderr("Run initdb or pg_basebackup to initialize a PostgreSQL data directory.\n");
        free(configdir);
        return false;
    }

    /*
     * Configuration file: an explicit ConfigFileName wins, otherwise it is
     * CONFIG_FILENAME inside configdir.
     */
    if (ConfigFileName)
        fname = make_absolute_path(ConfigFileName);
    else if (configdir)
    {
        /* +2: one byte for the '/' separator, one for the terminator. */
        fname = guc_malloc(FATAL,
                           strlen(configdir) + strlen(CONFIG_FILENAME) + 2);
        sprintf(fname, "%s/%s", configdir, CONFIG_FILENAME);
    }
    else
    {
        /* configdir is NULL here, so there is nothing to free. */
        write_stderr("%s does not know where to find the server configuration file.\n"
                     "You must specify the --config-file or -D invocation "
                     "option or set the PGDATA environment variable.\n",
                     progname);
        return false;
    }

    /* ConfigFileName is used below, after this option has been set. */
    SetConfigOption("config_file", fname, PGC_POSTMASTER, PGC_S_OVERRIDE);
    free(fname);

    if (stat(ConfigFileName, &stat_buf) != 0)
    {
        write_stderr("%s: could not access the server configuration file \"%s\": %s\n",
                     progname, ConfigFileName, strerror(errno));
        free(configdir);
        return false;
    }

    /* First pass over the configuration file. */
    ProcessConfigFile(PGC_POSTMASTER);

    /* Data directory: the data_directory setting wins, else configdir. */
    if (data_directory)
        SetDataDir(data_directory);
    else if (configdir)
        SetDataDir(configdir);
    else
    {
        write_stderr("%s does not know where to find the database system data.\n"
                     "This can be specified as \"data_directory\" in \"%s\", "
                     "or by the -D invocation option, or by the "
                     "PGDATA environment variable.\n",
                     progname, ConfigFileName);
        return false;
    }

    SetConfigOption("data_directory", DataDir, PGC_POSTMASTER, PGC_S_OVERRIDE);

    /* Second pass, now that the data directory is known. */
    ProcessConfigFile(PGC_POSTMASTER);

    pg_timezone_abbrev_initialize();

    /* hba file: explicit HbaFileName, else HBA_FILENAME inside configdir. */
    if (HbaFileName)
        fname = make_absolute_path(HbaFileName);
    else if (configdir)
    {
        fname = guc_malloc(FATAL,
                           strlen(configdir) + strlen(HBA_FILENAME) + 2);
        sprintf(fname, "%s/%s", configdir, HBA_FILENAME);
    }
    else
    {
        write_stderr("%s does not know where to find the \"hba\" configuration file.\n"
                     "This can be specified as \"hba_file\" in \"%s\", "
                     "or by the -D invocation option, or by the "
                     "PGDATA environment variable.\n",
                     progname, ConfigFileName);
        return false;
    }
    SetConfigOption("hba_file", fname, PGC_POSTMASTER, PGC_S_OVERRIDE);
    free(fname);

    /* ident file: same scheme as the hba file. */
    if (IdentFileName)
        fname = make_absolute_path(IdentFileName);
    else if (configdir)
    {
        fname = guc_malloc(FATAL,
                           strlen(configdir) + strlen(IDENT_FILENAME) + 2);
        sprintf(fname, "%s/%s", configdir, IDENT_FILENAME);
    }
    else
    {
        write_stderr("%s does not know where to find the \"ident\" configuration file.\n"
                     "This can be specified as \"ident_file\" in \"%s\", "
                     "or by the -D invocation option, or by the "
                     "PGDATA environment variable.\n",
                     progname, ConfigFileName);
        return false;
    }
    SetConfigOption("ident_file", fname, PGC_POSTMASTER, PGC_S_OVERRIDE);
    free(fname);

    free(configdir);

    return true;
}
5、pqinitmask
/*
 * pqinitmask
 *      Initialize the three signal sets BlockSig, UnBlockSig and
 *      StartupBlockSig.
 *
 * UnBlockSig ends up empty.  BlockSig and StartupBlockSig start out full;
 * the signals listed below are then removed from them, each one only if
 * the platform defines it.
 */
void
pqinitmask(void)
{
/* Remove one signal from both BlockSig and StartupBlockSig. */
#define PQ_DEL_FROM_BOTH(signo) \
    do { \
        sigdelset(&BlockSig, (signo)); \
        sigdelset(&StartupBlockSig, (signo)); \
    } while (0)

    sigemptyset(&UnBlockSig);

    sigfillset(&BlockSig);
    sigfillset(&StartupBlockSig);

    /* Signals removed from both sets. */
#ifdef SIGTRAP
    PQ_DEL_FROM_BOTH(SIGTRAP);
#endif
#ifdef SIGABRT
    PQ_DEL_FROM_BOTH(SIGABRT);
#endif
#ifdef SIGILL
    PQ_DEL_FROM_BOTH(SIGILL);
#endif
#ifdef SIGFPE
    PQ_DEL_FROM_BOTH(SIGFPE);
#endif
#ifdef SIGSEGV
    PQ_DEL_FROM_BOTH(SIGSEGV);
#endif
#ifdef SIGBUS
    PQ_DEL_FROM_BOTH(SIGBUS);
#endif
#ifdef SIGSYS
    PQ_DEL_FROM_BOTH(SIGSYS);
#endif
#ifdef SIGCONT
    PQ_DEL_FROM_BOTH(SIGCONT);
#endif

    /* Signals removed from StartupBlockSig only. */
#ifdef SIGQUIT
    sigdelset(&StartupBlockSig, SIGQUIT);
#endif
#ifdef SIGTERM
    sigdelset(&StartupBlockSig, SIGTERM);
#endif
#ifdef SIGALRM
    sigdelset(&StartupBlockSig, SIGALRM);
#endif

#undef PQ_DEL_FROM_BOTH
}
6、BaseInit
/*
 * BaseInit
 *      Run the basic subsystem initialization steps, in this fixed order.
 *      PostgresMain() calls it before InitProcess().
 */
void
BaseInit(void)
{
InitCommunication();
DebugFileOpen();
/* NOTE(review): by their names, the calls below set up file access, */
/* the storage manager and buffer pool access -- confirm in the callees. */
InitFileAccess();
smgrinit();
InitBufferPoolAccess();
}
7、InitProcess
/*
 * InitProcess
 *      Claim a PGPROC entry for this process from one of the ProcGlobal
 *      free lists and initialize its fields.
 *
 * Reports PANIC if ProcGlobal is not set up, ERROR if this process
 * already has a PGPROC (MyProc != NULL), and FATAL ("too many clients")
 * if the chosen free list is empty.
 */
void
InitProcess(void)
{
PGPROC *volatile *procgloballist;
/* Sanity checks: ProcGlobal must exist and we must not own a PGPROC yet. */
if (ProcGlobal == NULL)
elog(PANIC, "proc header uninitialized");
if (MyProc != NULL)
elog(ERROR, "you already exist");
/* Pick the free list matching the kind of process. */
if (IsAnyAutoVacuumProcess())
procgloballist = &ProcGlobal->autovacFreeProcs;
else if (IsBackgroundWorker)
procgloballist = &ProcGlobal->bgworkerFreeProcs;
else
procgloballist = &ProcGlobal->freeProcs;
/* Pop the head of that list while holding ProcStructLock. */
SpinLockAcquire(ProcStructLock);
set_spins_per_delay(ProcGlobal->spins_per_delay);
MyProc = *procgloballist;
if (MyProc != NULL)
{
*procgloballist = (PGPROC *) MyProc->links.next;
SpinLockRelease(ProcStructLock);
}
else
{
/* List is empty: release the lock before reporting the failure. */
SpinLockRelease(ProcStructLock);
ereport(FATAL,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("sorry, too many clients already")));
}
/* The matching PGXACT entry is found via the PGPROC's pgprocno. */
MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno];
/* The entry must record the list it was taken from. */
Assert(MyProc->procgloballist == procgloballist);
if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess())
MarkPostmasterChildActive();
/* Reset the fields of the claimed PGPROC/PGXACT to their initial values. */
SHMQueueElemInit(&(MyProc->links));
MyProc->waitStatus = STATUS_OK;
MyProc->lxid = InvalidLocalTransactionId;
MyProc->fpVXIDLock = false;
MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
MyPgXact->xid = InvalidTransactionId;
MyPgXact->xmin = InvalidTransactionId;
MyProc->pid = MyProcPid;
/* backendId, databaseId and roleId start out invalid. */
MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid;
MyProc->isBackgroundWorker = IsBackgroundWorker;
MyPgXact->delayChkpt = false;
MyPgXact->vacuumFlags = 0;
/* Flag autovacuum workers in vacuumFlags. */
if (IsAutoVacuumWorkerProcess())
MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
MyProc->lwWaiting = false;
MyProc->lwWaitMode = 0;
MyProc->waitLock = NULL;
MyProc->waitProcLock = NULL;
#ifdef USE_ASSERT_CHECKING
{
int i;
/* Every per-partition myProcLocks queue is expected to be empty. */
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
}
#endif
MyProc->recoveryConflictPending = false;
/* Synchronous-replication wait state. */
MyProc->waitLSN = 0;
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
SHMQueueElemInit(&(MyProc->syncRepLinks));
/* procArrayGroup* fields. */
MyProc->procArrayGroupMember = false;
MyProc->procArrayGroupMemberXid = InvalidTransactionId;
pg_atomic_init_u32(&MyProc->procArrayGroupNext, INVALID_PGPROCNO);
/* Lock-group fields are expected to be clear already. */
Assert(MyProc->lockGroupLeader == NULL);
Assert(dlist_is_empty(&MyProc->lockGroupMembers));
MyProc->wait_event_info = 0;
/* clogGroup* fields. */
MyProc->clogGroupMember = false;
MyProc->clogGroupMemberXid = InvalidTransactionId;
MyProc->clogGroupMemberXidStatus = TRANSACTION_STATUS_IN_PROGRESS;
MyProc->clogGroupMemberPage = -1;
MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
pg_atomic_init_u32(&MyProc->clogGroupNext, INVALID_PGPROCNO);
/* Take ownership of the PGPROC's latch and switch over to it. */
OwnLatch(&MyProc->procLatch);
SwitchToSharedLatch();
PGSemaphoreReset(MyProc->sem);
/* Register ProcKill as a shared-memory exit callback. */
on_shmem_exit(ProcKill, 0);
InitLWLockAccess();
InitDeadLockChecking();
}
8、InitPostgres
/*
 * InitPostgres
 *      Initialize this backend for work in a particular database.
 *
 * in_dbname:   name of the database to connect to, or NULL.
 * dboid:       OID of the database, used only when in_dbname is NULL.
 * username:    name of the role to run as, or NULL.
 * useroid:     OID of the role, passed to InitializeSessionUserId() together
 *              with username.
 * out_dbname:  if not NULL and the database was selected by dboid, the
 *              database name is copied here with strcpy(); the caller must
 *              supply a large enough buffer (the local name buffer is
 *              NAMEDATALEN bytes).
 * override_allow_connections: passed through to CheckMyDatabase().
 *
 * If neither in_dbname nor a valid dboid is given (and this is not
 * bootstrap mode), the function returns early without selecting a
 * database.  It also returns early for the autovacuum launcher and for a
 * WAL sender that is not a database WAL sender.
 */
void
InitPostgres(const char *in_dbname, Oid dboid, const char *username,
             Oid useroid, char *out_dbname, bool override_allow_connections)
{
    bool        bootstrap = IsBootstrapProcessingMode();
    bool        am_superuser;
    char       *fullpath;
    char        dbname[NAMEDATALEN];

    elog(DEBUG3, "InitPostgres");

    InitProcessPhase2();

    /*
     * MyBackendId is reset here and validated right after the call below,
     * so SharedInvalBackendInit() is expected to assign it.
     */
    MyBackendId = InvalidBackendId;

    SharedInvalBackendInit(false);

    if (MyBackendId > MaxBackends || MyBackendId <= 0)
        elog(FATAL, "bad backend ID: %d", MyBackendId);

    ProcSignalInit(MyBackendId);

    /* Register the timeout handlers used by regular backends. */
    if (!bootstrap)
    {
        RegisterTimeout(DEADLOCK_TIMEOUT, CheckDeadLockAlert);
        RegisterTimeout(STATEMENT_TIMEOUT, StatementTimeoutHandler);
        RegisterTimeout(LOCK_TIMEOUT, LockTimeoutHandler);
        RegisterTimeout(IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
                        IdleInTransactionSessionTimeoutHandler);
    }

    InitBufferPoolBackend();

    if (IsUnderPostmaster)
    {
        /* Result deliberately ignored; called for its side effects only. */
        (void) RecoveryInProgress();
    }
    else
    {
        /* Standalone: start up WAL here and arrange to shut it down on exit. */
        CreateAuxProcessResourceOwner();

        StartupXLOG();
        ReleaseAuxProcessResources(true);
        CurrentResourceOwner = NULL;

        on_shmem_exit(ShutdownXLOG, 0);
    }

    /* Initialize the caches and the portal manager. */
    RelationCacheInitialize();
    InitCatalogCache();
    InitPlanCache();

    EnablePortalManager();

    if (!bootstrap)
        pgstat_initialize();

    RelationCacheInitializePhase2();

    /* Register ShutdownPostgres to run before shared-memory exit. */
    before_shmem_exit(ShutdownPostgres, 0);

    /* The autovacuum launcher needs nothing beyond this point. */
    if (IsAutoVacuumLauncherProcess())
    {
        pgstat_bestart();
        return;
    }

    /* The remaining setup runs inside a transaction. */
    if (!bootstrap)
    {
        SetCurrentStatementStartTimestamp();
        StartTransactionCommand();

        XactIsoLevel = XACT_READ_COMMITTED;

        (void) GetTransactionSnapshot();
    }

    /*
     * Establish the session user.  Bootstrap, autovacuum workers and
     * standalone backends run as superuser without authentication.
     */
    if (bootstrap || IsAutoVacuumWorkerProcess())
    {
        InitializeSessionUserIdStandalone();
        am_superuser = true;
    }
    else if (!IsUnderPostmaster)
    {
        InitializeSessionUserIdStandalone();
        am_superuser = true;
        if (!ThereIsAtLeastOneRole())
            ereport(WARNING,
                    (errcode(ERRCODE_UNDEFINED_OBJECT),
                     errmsg("no roles are defined in this database system"),
                     errhint("You should immediately run CREATE USER \"%s\" SUPERUSER;.",
                             username != NULL ? username : "postgres")));
    }
    else if (IsBackgroundWorker)
    {
        /* A background worker with no role specified runs as superuser. */
        if (username == NULL && !OidIsValid(useroid))
        {
            InitializeSessionUserIdStandalone();
            am_superuser = true;
        }
        else
        {
            InitializeSessionUserId(username, useroid);
            am_superuser = superuser();
        }
    }
    else
    {
        /* Normal client connection: authenticate first. */
        Assert(MyProcPort != NULL);
        PerformAuthentication(MyProcPort);
        InitializeSessionUserId(username, useroid);
        am_superuser = superuser();
    }

    /* In CAC_WAITBACKUP state only superuser, non-walsender connections pass. */
    if ((!am_superuser || am_walsender) &&
        MyProcPort != NULL &&
        MyProcPort->canAcceptConnections == CAC_WAITBACKUP)
    {
        if (am_walsender)
            ereport(FATAL,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("new replication connections are not allowed during database shutdown")));
        else
            ereport(FATAL,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("must be superuser to connect during database shutdown")));
    }

    /* Binary upgrade mode is restricted to superusers. */
    if (IsBinaryUpgrade && !am_superuser)
    {
        ereport(FATAL,
                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                 errmsg("must be superuser to connect in binary upgrade mode")));
    }

    /* Enforce the reserved connection slots. */
    if ((!am_superuser || am_walsender) &&
        ReservedBackends > 0 &&
        !HaveNFreeProcs(ReservedBackends))
        ereport(FATAL,
                (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                 errmsg("remaining connection slots are reserved for non-replication superuser connections")));

    /* A WAL sender must be a superuser or hold the replication attribute. */
    if (am_walsender)
    {
        Assert(!bootstrap);

        if (!superuser() && !has_rolreplication(GetUserId()))
            ereport(FATAL,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("must be superuser or replication role to start walsender")));
    }

    /* A WAL sender without a database connection is finished here. */
    if (am_walsender && !am_db_walsender)
    {
        if (MyProcPort != NULL)
            process_startup_options(MyProcPort, am_superuser);

        /* PostAuthDelay is in seconds; pg_usleep() takes microseconds. */
        if (PostAuthDelay > 0)
            pg_usleep(PostAuthDelay * 1000000L);

        InitializeClientEncoding();

        pgstat_bestart();

        CommitTransactionCommand();

        return;
    }

    /* Determine the database OID and default tablespace. */
    if (bootstrap)
    {
        MyDatabaseId = TemplateDbOid;
        MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
    }
    else if (in_dbname != NULL)
    {
        HeapTuple   tuple;
        Form_pg_database dbform;

        tuple = GetDatabaseTuple(in_dbname);
        if (!HeapTupleIsValid(tuple))
            ereport(FATAL,
                    (errcode(ERRCODE_UNDEFINED_DATABASE),
                     errmsg("database \"%s\" does not exist", in_dbname)));
        dbform = (Form_pg_database) GETSTRUCT(tuple);
        MyDatabaseId = HeapTupleGetOid(tuple);
        MyDatabaseTableSpace = dbform->dattablespace;
        strlcpy(dbname, in_dbname, sizeof(dbname));
    }
    else if (OidIsValid(dboid))
    {
        HeapTuple   tuple;
        Form_pg_database dbform;

        tuple = GetDatabaseTupleByOid(dboid);
        if (!HeapTupleIsValid(tuple))
            ereport(FATAL,
                    (errcode(ERRCODE_UNDEFINED_DATABASE),
                     errmsg("database %u does not exist", dboid)));
        dbform = (Form_pg_database) GETSTRUCT(tuple);
        MyDatabaseId = HeapTupleGetOid(tuple);
        MyDatabaseTableSpace = dbform->dattablespace;
        Assert(MyDatabaseId == dboid);
        strlcpy(dbname, NameStr(dbform->datname), sizeof(dbname));

        /* Hand the name back to the caller (unbounded copy, see header). */
        if (out_dbname)
            strcpy(out_dbname, dbname);
    }
    else
    {
        /* No database requested: finish up without selecting one. */
        if (!bootstrap)
        {
            pgstat_bestart();
            CommitTransactionCommand();
        }
        return;
    }

    /* Lock the database object before re-checking it below. */
    if (!bootstrap)
        LockSharedObject(DatabaseRelationId, MyDatabaseId, 0,
                         RowExclusiveLock);

    MyProc->databaseId = MyDatabaseId;

    InvalidateCatalogSnapshot();

    /*
     * Look the database up again by name, now that we hold the lock, to
     * detect a concurrent drop or rename.
     */
    if (!bootstrap)
    {
        HeapTuple   tuple;

        tuple = GetDatabaseTuple(dbname);
        if (!HeapTupleIsValid(tuple) ||
            MyDatabaseId != HeapTupleGetOid(tuple) ||
            MyDatabaseTableSpace != ((Form_pg_database) GETSTRUCT(tuple))->dattablespace)
            ereport(FATAL,
                    (errcode(ERRCODE_UNDEFINED_DATABASE),
                     errmsg("database \"%s\" does not exist", dbname),
                     errdetail("It seems to have just been dropped or renamed.")));
    }

    fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);

    /* Verify that the database directory exists and has the right version. */
    if (!bootstrap)
    {
        if (access(fullpath, F_OK) == -1)
        {
            if (errno == ENOENT)
                ereport(FATAL,
                        (errcode(ERRCODE_UNDEFINED_DATABASE),
                         errmsg("database \"%s\" does not exist",
                                dbname),
                         errdetail("The database subdirectory \"%s\" is missing.",
                                   fullpath)));
            else
                ereport(FATAL,
                        (errcode_for_file_access(),
                         errmsg("could not access directory \"%s\": %m",
                                fullpath)));
        }

        ValidatePgVersion(fullpath);
    }

    SetDatabasePath(fullpath);

    RelationCacheInitializePhase3();

    initialize_acl();

    if (!bootstrap)
        CheckMyDatabase(dbname, am_superuser, override_allow_connections);

    /* Apply options from the connection's startup packet. */
    if (MyProcPort != NULL)
        process_startup_options(MyProcPort, am_superuser);

    /* Apply per-database and per-user settings. */
    process_settings(MyDatabaseId, GetSessionUserId());

    /* PostAuthDelay is in seconds; pg_usleep() takes microseconds. */
    if (PostAuthDelay > 0)
        pg_usleep(PostAuthDelay * 1000000L);

    InitializeSearchPath();

    InitializeClientEncoding();

    InitializeSession();

    if (!bootstrap)
        pgstat_bestart();

    /* Close the transaction started above. */
    if (!bootstrap)
        CommitTransactionCommand();
}
插入测试数据:
testdb=# -- 获取pid
testdb=# select pg_backend_pid();
pg_backend_pid
----------------
1893
(1 row)
testdb=# -- 插入1行
testdb=# insert into t_insert values(23,'I am PostgresMain','I am PostgresMain','I am PostgresMain');
testdb=# -- 插入1行
insert into t_insert values(23,'I am PostgresMain','I am PostgresMain','I am PostgresMain');
(挂起)
启动gdb,跟踪调试:
[root@localhost ~]# gdb -p 1893
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
...
#断点设置在循环中
(gdb) b postgres.c:4013
Breakpoint 1 at 0x850d26: file postgres.c, line 4013.
...
(gdb) p input_message
$7 = {data = 0x1508ef0 "insert into t_insert values(23,'I am PostgresMain','I am PostgresMain','I am PostgresMain');", len = 93, maxlen = 1024, cursor = 0}
(gdb) n
...
4135 switch (firstchar)
(gdb)
4142 SetCurrentStatementStartTimestamp();
(gdb) p firstchar
$8 = 81
...
(gdb) finish
Run till exit from #0 PostgresMain (argc=1, argv=0x1532aa8, dbname=0x1532990 "testdb", username=0x1532978 "xdb") at postgres.c:4020
#DONE!
使用gdb跟踪postgres进程启动过程:
[xdb@localhost ~]$ gdb postgres
...
(gdb) set follow-fork-mode child
(gdb) start
Temporary breakpoint 1 at 0x6f1735: file main.c, line 62.
Starting program: /appdb/xdb/bin/postgres
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Temporary breakpoint 1, main (argc=1, argv=0x7fffffffe538) at main.c:62
62 bool do_check_root = true;
(gdb) b PostgresMain
Breakpoint 2 at 0x8507bb: file postgres.c, line 3631.
(gdb) del 1
No breakpoint number 1.
...
(gdb) attach 3028
Attaching to program: /appdb/xdb/bin/postgres, process 3028
...
#连接DB
[xdb@localhost ~]$ psql -d testdb
#回到gdb
#finish 直至进入postgres.c中的PostgresMain
(gdb) finish
Run till exit from #0 ServerLoop () at postmaster.c:1704
[New process 3042]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7ffff7feb740 (LWP 3042)]
Breakpoint 2, PostgresMain (argc=1, argv=0xf28ac8, dbname=0xf289b0 "testdb", username=0xf28998 "xdb") at postgres.c:3631
3631 volatile bool send_ready_for_query = true;
(gdb) next
3632 bool disable_idle_in_transaction_timeout = false;
(gdb)
3635 if (!IsUnderPostmaster)
(gdb) p IsUnderPostmaster
$3 = true
(gdb) p dbname
$4 = 0xf289b0 "testdb"
(gdb) p username
$5 = 0xf28998 "xdb"
...
3845 MessageContext = AllocSetContextCreate(TopMemoryContext,
(gdb)
3855 row_description_context = AllocSetContextCreate(TopMemoryContext,
(gdb)
3858 MemoryContextSwitchTo(row_description_context);
(gdb)
3859 initStringInfo(&row_description_buf);
(gdb)
3860 MemoryContextSwitchTo(TopMemoryContext);
(gdb)
3865 if (!IsUnderPostmaster)
(gdb)
3890 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
(gdb) p *MessageContext
$6 = {type = T_AllocSetContext, isReset = true, allowInCritSection = false, methods = 0xb8c720 <AllocSetMethods>, parent = 0xef9b90, firstchild = 0x0, prevchild = 0xf74c80, nextchild = 0xfabb50,
name = 0xb4e87c "MessageContext", ident = 0x0, reset_cbs = 0x0}
...
(gdb) n
4090 DoingCommandRead = true;
(gdb)
4095 firstchar = ReadCommand(&input_message);
(gdb)
4106 CHECK_FOR_INTERRUPTS();
(gdb)
4107 DoingCommandRead = false;
(gdb) p firstchar
$8 = 81
(gdb) p input_message
$9 = {data = 0xeff010 "insert into t_insert values(24,'I am PostgresMain','I am PostgresMain','I am PostgresMain');", len = 93, maxlen = 1024, cursor = 0}
(gdb) n
...
#DONE!
到此,相信大家对“PostgreSQL中PostgresMain函数的实现逻辑是什么”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
--结束END--
本文标题: PostgreSQL中PostgresMain函数的实现逻辑是什么
本文链接: https://lsjlt.com/news/65054.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0