前言 Druid是阿里开源的数据库连接池,是阿里监控系统Dragoon的副产品,提供了强大的可监控性和基于Filter-Chain的可扩展性。 本篇文章将对Druid数据库连接池的连接获取,归还和连接泄漏检测进行分析。分析Druid数据库
Druid是阿里开源的数据库连接池,是阿里监控系统Dragoon的副产品,提供了强大的可监控性和基于Filter-Chain的可扩展性。
本篇文章将对Druid数据库连接池的连接获取,归还和连接泄漏检测进行分析。分析Druid数据库连接池的源码前,需要明确几个概念。
public DruidAbstractDataSource(boolean lockFair) { lock = new ReentrantLock(lockFair); notEmpty = lock.newCondition(); empty = lock.newCondition();}复制代码
Druid版本:1.2.11
DruidDataSource获取连接的入口方法是DruidDataSource#getConnection方法,实现如下。
public DruidPooledConnection getConnection() throws SQLException { // maxWait表示获取连接时最大等待时间,单位毫秒,默认值为-1 return getConnection(maxWait);}public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { // 首次获取连接时触发数据库连接池初始化 init(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(this); return filterChain.dataSource_connect(this, maxWaitMillis); } else { // 直接获取连接 return getConnectionDirect(maxWaitMillis); }}复制代码
DruidDataSource#getConnection方法会调用到DruidDataSource#getConnectionDirect方法来获取连接,实现如下所示。
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws sqlException { int notFullTimeoutRetryCnt = 0; for (; ; ) { DruidPooledConnection poolableConnection; try { // 从连接池拿到连接 poolableConnection = getConnectionInternal(maxWaitMillis); } catch (GetConnectionTimeoutException ex) { // 拿连接时有异常,可以重试 // 重试次数由notFullTimeoutRetryCount指定 if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { notFullTimeoutRetryCnt++; if (LOG.isWarnEnabled()) { LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt); } continue; } throw ex; } // 如果配置了testOnBorrow = true,那么每次拿到连接后,都需要校验这个连接的有效性 if (testOnBorrow) { boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); // 如果连接不可用,则销毁连接,然后重新从池中获取 if (!validate) { if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection."); } discardConnection(poolableConnection.holder); continue; } } else { if (poolableConnection.conn.isClosed()) { discardConnection(poolableConnection.holder); continue; } // 如果配置testOnBorrow = fasle但testWhileIdle = true // 则判断连接空闲时间是否大于等于timeBetweenEvictionRunsMillis // 如果是,则校验连接的有效性 if (testWhileIdle) { final DruidConnectionHolder holder = poolableConnection.holder; long currentTimeMillis = System.currentTimeMillis(); // lastActiveTimeMillis是连接最近一次活跃时间 // 新建连接,归还连接到连接池,都会更新这个时间 long lastActiveTimeMillis = holder.lastActiveTimeMillis; // lastExecTimeMillis是连接最近一次执行时间 // 新建连接,设置连接的事务是否自动提交,记录SQL到事务信息中,都会更新这个时间 long lastExecTimeMillis = holder.lastExecTimeMillis; // lastKeepTimeMillis是连接最近一次保活时间 // 在连接被保活并放回连接池时,会更新这个时间 long lastKeepTimeMillis = holder.lastKeepTimeMillis; // 如果配置checkExecuteTime为true,则最近活跃时间取值为最近执行时间 if (checkExecuteTime && lastExecTimeMillis != lastActiveTimeMillis) { lastActiveTimeMillis = lastExecTimeMillis; } // 如果连接最近一次做的操作是保活,那么最近活跃时间取值为最近保活时间 if (lastKeepTimeMillis > lastActiveTimeMillis) { lastActiveTimeMillis = lasTKEepTimeMillis; } // 计算空闲时间 long idleMillis = currentTimeMillis - lastActiveTimeMillis; // testWhileIdle为true时的判断时间间隔 long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis; if (timeBetweenEvictionRunsMillis <= 0) { // timeBetweenEvictionRunsMillis如果小于等于0,那么重置为60秒 timeBetweenEvictionRunsMillis= DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS; } // 如果空闲时间大于等于timeBetweenEvictionRunsMillis,则执行连接的有效性校验 if (idleMillis >= timeBetweenEvictionRunsMillis || idleMillis < 0 ) { boolean validate = testConnectionInternal(poolableConnection.holder,poolableConnection.conn); if (!validate) { if (LOG.isDebugEnabled()) {LOG.debug("skip not validate connection."); } discardConnection(poolableConnection.holder); continue; } } } } // 如果设置removeAbandoned为true // 则将连接放到activeConnections活跃连接map中 if (removeAbandoned) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); poolableConnection.connectStackTrace = stackTrace; poolableConnection.setConnectedTimeNano(); poolableConnection.traceEnable = true; activeConnectionLock.lock(); try { activeConnections.put(poolableConnection, PRESENT); } finally { activeConnectionLock.unlock(); } } if (!this.defaultAutoCommit) { poolableConnection.setAutoCommit(false); } return poolableConnection; }}复制代码
DruidDataSource#getConnectionDirect方法中会先调用getConnectionInternal() 方法从连接池中拿连接,然后如果开启了testOnBorrow,则校验一下连接的有效性,如果无效则重新调用getConnectionInternal() 方法拿连接,直到拿到的连接通过校验。如果没有开启testOnBorrow但是开启了testWhileIdle,则会判断连接的空闲时间是否大于等于timeBetweenEvictionRunsMillis参数,如果满足则校验一下连接的有效性,若没有通过校验,那么需要重新调用getConnectionInternal() 方法拿连接,直到拿到的连接通过校验或者连接的空闲时间小于timeBetweenEvictionRunsMillis。
下面看一下实际从连接池拿连接的getConnectionInternal() 方法的实现,如下所示。
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException { // 省略 final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); final int maxWaitThreadCount = this.maxWaitThreadCount; DruidConnectionHolder holder; // 在死循环中从连接池拿连接 // 一开始createDirect为false,表示先从池子中拿 for (boolean createDirect = false; ; ) { if (createDirect) { // createDirect为true表示直接创建连接 createStartNanosUpdater.set(this, System.nanoTime()); // creatinGCount为0表示当前没有其它连接正在被创建 if (creatingCountUpdater.compareAndSet(this, 0, 1)) { // 创建物理连接 PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection(); holder = new DruidConnectionHolder(this, pyConnInfo); holder.lastActiveTimeMillis = System.currentTimeMillis(); creatingCountUpdater.decrementAndGet(this); directCreateCountUpdater.incrementAndGet(this); // 省略 boolean discard; lock.lock(); try { // 如果当前正在使用的连接数未达到最大连接数 // 则当前正在使用的连接数加1 // 否则销毁刚刚创建出来的连接 if (activeCount < maxActive) { activeCount++; holder.active = true; if (activeCount > activePeak) {activePeak = activeCount;activePeakTime = System.currentTimeMillis(); } break; } else { discard = true; } } finally { lock.unlock(); } if (discard) { JdbcUtils.close(pyConnInfo.getPhysicalConnection()); } } } // 上锁 try { lock.lockInterruptibly(); } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("interrupt", e); } try { // maxWaitThreadCount表示允许的最大等待连接的应用线程数 // notEmptyWaitThreadCount表示正在等待连接的应用线程数 // 等待连接的应用线程数达到最大值时,抛出异常 if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); } // 发生了致命错误,且设置了致命错误数最大值大于0,且正在使用的连接数大于等于致命错误数最大值 if (onFatalError && onFatalErrorMaxActive > 0 && activeCount >= onFatalErrORMaxActive) { // 拼接异常并抛出 // 省略 throw new SQLException( errorMsg.toString(), lastFatalError); } connectCount++; // 如果配置的创建连接的线程池是一个定时线程池 // 且连接池已经没有可用连接, // 且当前借出的连接数未达到允许的最大连接数 // 且当前没有其它线程(应用线程,创建连接的线程,创建连接的线程池里的线程)在创建连接 // 此时将createDirect置为true,让当前应用线程直接创建连接 if (createScheduler != null && poolingCount == 0 && activeCount < maxActive && creatingCountUpdater.get(this) == 0 && createScheduler instanceof ScheduledThreadPoolExecutor) { ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler; if (executor.getQueue().size() > 0) { createDirect = true; continue; } } if (maxWait > 0) { // 如果设置了等待连接的最大等待时间,则调用pollLast()方法来拿连接 // pollLast()方法执行时如果池中没有连接,则应用线程会在notEmpty上最多等待maxWait的时间 holder = pollLast(nanos); } else { // 调用takeLast()方法拿连接时,如果池中没有连接,则会在notEmpty上一直等待,直到池中有连接 holder = takeLast(); } if (holder != null) { if (holder.discard) { continue; } // 正在使用的连接数加1 activeCount++; holder.active = true; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } } } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException(e.getMessage(), e); } catch (SQLException e) { connectErrorCountUpdater.incrementAndGet(this); throw e; } finally { lock.unlock(); } break; } // 如果拿到的连接为null,说明拿连接时等待超时了 // 此时抛出连接超时异常 if (holder == null) { // 省略 final Throwable createError; try { lock.lock(); // 省略 createError = this.createError; } finally { lock.unlock(); } // 省略 if (createError != null) { throw new GetConnectionTimeoutException(errorMessage, createError); } else { throw new GetConnectionTimeoutException(errorMessage); } } holder.incrementUseCount(); DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder); return poolalbeConnection;}复制代码
getConnectionInternal() 方法中拿到连接的方式有三种,如下所示。
下面最后看一下超时等待拿连接的DruidDataSource#pollLast方法的实现。
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException { long estimate = nanos; for (; ; ) { if (poolingCount == 0) { // 如果池中已经没有连接,则唤醒在empty上等待的创建连接线程来创建连接 emptySignal(); if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException(createError); } // 等待时间耗尽,返回null if (estimate <= 0) { waitNanosLocal.set(nanos - estimate); return null; } // 应用线程即将在下面的notEmpty上等待 // 这里先把等待获取连接的应用线程数加1 notEmptyWaitThreadCount++; if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { notEmptyWaitThreadPeak = notEmptyWaitThreadCount; } try { long startEstimate = estimate; // 应用线程在notEmpty上等待 // 有连接被创建或者被归还时,会唤醒在notEmpty上等待的应用线程 estimate = notEmpty.awaitNanos(estimate); notEmptyWaitCount++; notEmptyWaitNanos += (startEstimate - estimate); if (!enable) { connectErrorCountUpdater.incrementAndGet(this); if (disableException != null) { throw disableException; } throw new DataSourceDisableException(); } } catch (InterruptedException ie) { notEmpty.signal(); notEmptySignalCount++; throw ie; } finally { notEmptyWaitThreadCount--; } if (poolingCount == 0) { if (estimate > 0) { // 若唤醒后池中还是没有连接,且此时等待时间还有剩余 // 则重新在notEmpty上等待 continue; } waitNanosLocal.set(nanos - estimate); return null; } } // poolingCount-- decrementPoolingCount(); // 从池中拿到连接 DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; long waitNanos = nanos - estimate; last.setLastNotEmptyWaitNanos(waitNanos); return last; }}复制代码
Druid 数据库连接池中,每一个物理连接都会被包装成DruidConnectionHolder,在提供给应用线程前,还会将DruidConnectionHolder包装成DruidPooledConnection,类图如下所示。
应用线程中使用连接完毕后,会调用DruidPooledConnection的close() 方法来归还连接,也就是将连接放回连接池。DruidPooledConnection#close方法如下所示。
public void close() throws SQLException { if (this.disable) { return; } DruidConnectionHolder holder = this.holder; if (holder == null) { if (dupCloseLogEnable) { LOG.error("dup close"); } return; } DruidAbstractDataSource dataSource = holder.getDataSource(); // 判断归还连接的线程和获取连接的线程是否是同一个线程 boolean isSameThread = this.getOwnerThread() == Thread.currentThread(); // 如果不是同一个线程,则设置asyncCloseConnectionEnable为true if (!isSameThread) { dataSource.setAsyncCloseConnectionEnable(true); } // 如果开启了removeAbandoned机制 // 或者asyncCloseConnectionEnable为true // 则调用syncClose()方法来归还连接 // syncClose()方法中会先加锁,然后调用recycle()方法来回收连接 if (dataSource.isAsyncCloseConnectionEnable()) { syncClose(); return; } if (!CLOSING_UPDATER.compareAndSet(this, 0, 1)) { return; } try { for (ConnectionEventListener listener : holder.getConnectionEventListeners()) { listener.connectionClosed(new ConnectionEvent(this)); } List filters = dataSource.getProxyFilters(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(dataSource); filterChain.dataSource_recycle(this); } else { // 回收连接 recycle(); } } finally { CLOSING_UPDATER.set(this, 0); } this.disable = true;}复制代码
在DruidPooledConnection#close方法中,会先判断本次归还连接的线程和获取连接的线程是否是同一个线程,如果不是,则先加锁然后再调用recycle() 方法来回收连接,如果是则直接调用recycle() 方法来回收连接。当开启了removeAbandoned机制时,就可能会出现归还连接的线程和获取连接的线程不是同一个线程的情况,这是因为一旦开启了removeAbandoned机制,那么每一个被借出的连接都会被放到activeConnections活跃连接map中,并且在销毁连接的线程DestroyConnectionThread 中会每间隔timeBetweenEvictionRunsMillis的时间就遍历一次activeConnections活跃连接map,一旦有活跃连接被借出的时间大于了removeAbandonedTimeoutMillis,那么销毁连接的线程DestroyConnectionThread就会主动去回收这个连接,以防止连接泄漏。
下面看一下DruidPooledConnection#recycle方法的实现。
public void recycle() throws SQLException { if (this.disable) { return; } DruidConnectionHolder holder = this.holder; if (holder == null) { if (dupCloseLogEnable) { LOG.error("dup close"); } return; } if (!this.abandoned) { DruidAbstractDataSource dataSource = holder.getDataSource(); // 调用DruidAbstractDataSource#recycle回收当前连接 dataSource.recycle(this); } this.holder = null; conn = null; transactionInfo = null; closed = true;}复制代码
在DruidPooledConnection#recycle方法中会调用到DruidDataSource#recycle方法来回收连接。DruidDataSource#recycle方法实现如下所示。
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException { final DruidConnectionHolder holder = pooledConnection.holder; // 省略 final boolean isAutoCommit = holder.underlyingAutoCommit; final boolean isReadOnly = holder.underlyingReadOnly; final boolean testOnReturn = this.testOnReturn; try { // 如果是非自动提交且存在事务 // 则回滚事务 if ((!isAutoCommit) && (!isReadOnly)) { pooledConnection.rollback(); } // 重置连接信息(配置还原为默认值,关闭Statement,清除连接的Warnings等) boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread(); if (!isSameThread) { final ReentrantLock lock = pooledConnection.lock; lock.lock(); try { holder.reset(); } finally { lock.unlock(); } } else { holder.reset(); } // 省略 // 开启了testOnReturn机制,则校验连接有效性 if (testOnReturn) { boolean validate = testConnectionInternal(holder, physicalConnection); // 校验不通过则关闭物理连接 if (!validate) { JdbcUtils.close(physicalConnection); destroyCountUpdater.incrementAndGet(this); lock.lock(); try { if (holder.active) { activeCount--; holder.active = false; } closeCount++; } finally { lock.unlock(); } return; } } // 省略 lock.lock(); try { // 连接即将放回连接池,需要将active设置为false if (holder.active) { activeCount--; holder.active = false; } closeCount++; // 将连接放到connections数组的poolingCount位置 // 然后poolingCount加1 // 然后唤醒在notEmpty上等待连接的一个应用线程 result = putLast(holder, currentTimeMillis); recycleCount++; } finally { lock.unlock(); } if (!result) { JdbcUtils.close(holder.conn); LOG.info("connection recyle failed."); } } catch (Throwable e) { // 省略 }}复制代码
DruidDataSource#recycle方法中会先重置连接信息,即将连接的一些配置重置为默认值,然后关闭连接的Statement和Warnings,如果开启了testOnReturn机制,则还需要校验一下连接的有效性,校验不通过则直接关闭物理连接,最后,将连接放回到connections数组的poolingCount位置,然后唤醒一个在notEmpty上等待连接的应用线程。
Druid数据库连接池提供了removeAbandoned机制来防止连接泄漏。要开启removeAbandoned机制,需要设置如下参数。
参数 | 说明 |
---|---|
removeAbandoned | 发生连接泄漏时,是否需要回收泄漏的连接。默认为false,表示不回收。 |
removeAbandonedTimeoutMillis | 判断发生连接泄漏的超时时间。默认为300秒。 |
logAbandoned | 是否打印堆栈。默认为false,包是不打印。 |
下面将对开启removeAbandoned机制后,如何回收发生了泄漏的连接进行说明。当应用线程从连接池获取到一个连接后,如果开启了removeAbandoned机制,那么会将这个连接放到activeConnections活跃连接map中,对应的方法为DruidDataSource#getConnectionDirect,源码片段如下所示。
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { int notFullTimeoutRetryCnt = 0; for (; ; ) { DruidPooledConnection poolableConnection; // 省略 if (removeAbandoned) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); poolableConnection.connectStackTrace = stackTrace; // 设置connectedTimeNano,用于后续判断连接借出时间是否大于removeAbandonedTimeoutMillis poolableConnection.setConnectedTimeNano(); poolableConnection.traceEnable = true; activeConnectionLock.lock(); try { // 将从连接池获取到的连接放到activeConnections中 activeConnections.put(poolableConnection, PRESENT); } finally { activeConnectionLock.unlock(); } } if (!this.defaultAutoCommit) { poolableConnection.setAutoCommit(false); } return poolableConnection; }}复制代码
又已知Druid数据库连接池有一个销毁连接的线程会每间隔timeBetweenEvictionRunsMillis执行一次DestroyTask#run方法来销毁连接,DestroyTask#run方法如下所示。
public void run() { shrink(true, keepAlive); // 如果开启了removeAbandoned机制 // 则执行removeAbandoned()方法来检测发生了泄漏的连接并回收 if (isRemoveAbandoned()) { removeAbandoned(); }}复制代码
DestroyTask#run方法的最后会判断是否开启了removeAbandoned机制,如果开启了则会执行DruidDataSource#removeAbandoned方法来检测哪些连接发生了泄漏,并主动回收这些连接。DruidDataSource#removeAbandoned方法如下所示。
public int removeAbandoned() { int removeCount = 0; long currrentNanos = System.nanoTime(); List abandonedList = new ArrayList(); activeConnectionLock.lock(); try { Iterator iter = activeConnections.keySet().iterator(); for (; iter.hasNext(); ) { DruidPooledConnection pooledConnection = iter.next(); // 运行中的连接不会被判定为发生了泄漏 if (pooledConnection.isRunning()) { continue; } long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000); // 判断连接借出时间是否达到连接泄漏的超时时间 if (timeMillis >= removeAbandonedTimeoutMillis) { // 将发生了泄漏的连接从activeConnections中移除 iter.remove(); pooledConnection.setTraceEnable(false); // 将发生了泄露的连接添加到abandonedList集合中 abandonedList.add(pooledConnection); } } } finally { activeConnectionLock.unlock(); } if (abandonedList.size() > 0) { // 遍历abandonedList集合 // 主动调用每个发生了泄漏的DruidPooledConnection的close()方法来回收连接 for (DruidPooledConnection pooledConnection : abandonedList) { final ReentrantLock lock = pooledConnection.lock; lock.lock(); try { if (pooledConnection.isDisable()) { continue; } } finally { lock.unlock(); } JdbcUtils.close(pooledConnection); pooledConnection.abandond(); removeAbandonedCount++; removeCount++; // 省略 } } return removeCount;}复制代码
DruidDataSource#removeAbandoned方法中主要完成的事情就是将每个发生了泄漏的连接从activeConnections中移动到abandonedList中,然后遍历abandonedList中的每个连接并调用DruidPooledConnection#close方法,最终完成泄漏连接的回收。
结合搞懂Druid之连接池初始化和搞懂Druid之连接创建和销毁,对Druid的一个基本原理进行如下总结。
Druid数据库连接池中,应用线程向连接池获取连接时,如果池中没有连接,则应用线程会在notEmpty上等待,同时Druid数据库连接池中有一个创建连接的线程,会持续的向连接池创建连接,如果连接池已满,则创建连接的线程会在empty上等待。
当有连接被生产,或者有连接被归还,会唤醒在notEmpty上等待的应用线程,同理有连接被销毁时,会唤醒在empty上等待的生产连接的线程。
Druid数据库连接池中还有一个销毁连接的线程,会每间隔timeBetweenEvictionRunsMillis的时间执行一次DestroyTask任务来销毁连接,这些被销毁的连接可以是存活时间达到最大值的连接,也可以是空闲时间达到指定值的连接。如果还开启了保活机制,那么空闲时间大于keepAliveBetweenTimeMillis的连接都会被校验一次有效性,校验不通过的连接会被销毁。
最后,Druid数据库连接池提供了removeAbandoned机制来防止连接泄漏,当开启了removeAbandoned机制时,每一个被应用线程获取的连接都会被添加到activeConnections活跃连接map中,如果这个连接在应用线程中使用完毕后没有被关闭,那么Druid数据库连接池会从activeConnections中将其识别出来并主动回收。
来源地址:https://blog.csdn.net/BASK2311/article/details/129136983
--结束END--
本文标题: 搞懂Druid之连接获取和归还
本文链接: https://lsjlt.com/news/411540.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0