其他分享
首页 > 其他分享> > Mybatis数据源结构解析之连接池

Mybatis数据源结构解析之连接池

作者:互联网

对于 ORM 框架而言,数据源的组织是一个非常重要的一部分,这直接影响到框架的性能问题。本文将通过对 MyBatis 框架的数据源结构进行详尽的分析,找出什么时候创建 Connection ,并且深入解析 MyBatis 的连接池。


本章的组织结构:

连接池和线程池

连接池:(降低物理连接损耗)

数据库连接是一项有限的昂贵资源,一个数据库连接对象均对应一个物理数据库连接,每次操作都打开一个物理连接,使用完都关闭连接,这样造成系统的性能低下。 数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接,并将这些连接组成一个连接池,由应用程序动态地对池中的连接进行申请、使用和释放。对于多于连接池中连接数的并发请求,应该在请求队列中排队等待。并且应用程序可以根据池中连接的使用率,动态增加或减少池中的连接数。


线程池:(降低线程创建销毁损耗)

线程池是一次性创建一定数量的线程(应该可以配置初始线程数量的),当用请求过来不用去创建新的线程,直接使用已创建的线程,使用后又放回到线程池中。
避免了频繁创建线程,及销毁线程的系统开销,提高是内存和CPU效率。

相同点:

都是事先准备好资源,避免频繁创建和销毁的代价。

数据源的分类

在Mybatis体系中,分为3DataSource

打开Mybatis源码找到datasource包,可以看到3个子package

MyBatis内部分别定义了实现了java.sql.DataSource接口的UnpooledDataSource,PooledDataSource类来表示UNPOOLED、POOLED类型的数据源。 如下图所示:

JNDI类型的数据源DataSource,则是通过JNDI上下文中取值。

数据源 DataSource 的创建过程

在mybatis的XML配置文件中,使用元素来配置数据源:

<!-- 配置数据源(连接池) -->
<dataSource type="POOLED"> //这里 type 属性的取值就是为POOLED、UNPOOLED、JNDI
  <property name="driver" value="${jdbc.driver}"/>
  <property name="url" value="${jdbc.url}"/>
  <property name="username" value="${jdbc.username}"/>
  <property name="password" value="${jdbc.password}"/>
</dataSource>

MyBatis在初始化时,解析此文件,根据<dataSource>的type属性来创建相应类型的的数据源DataSource,即:

MyBatis创建了DataSource实例后,会将其放到Configuration对象内的Environment对象中, 供以后使用。

注意dataSource 此时只会保存好配置信息.连接池此时并没有创建好连接.只有当程序在调用操作数据库的方法时,才会初始化连接.

DataSource什么时候创建Connection对象

我们需要创建SqlSession对象并需要执行SQL语句时,这时候MyBatis才会去调用dataSource对象来创建java.sql.Connection对象。也就是说,java.sql.Connection对象的创建一直延迟到执行SQL语句的时候。

例子:

  @Test
  public void testMyBatisBuild() throws IOException {
      Reader reader = Resources.getResourceAsReader("mybatis-config.xml");
      SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(reader);
      SqlSession sqlSession = factory.openSession();
      TestMapper mapper = sqlSession.getMapper(TestMapper.class);
      Ttest one = mapper.getOne(1L);//直到这一行,才会去创建一个数据库连接!
      System.out.println(one);
      sqlSession.close();
  }

口说无凭,跟进源码看看他们是在什么时候创建的...

跟进源码,验证Datasource 和Connection对象创建时机

验证Datasource创建时机
@Override
public DataSource getDataSource() {//此方法是UnpooledDataSourceFactory实现DataSourceFactory复写
  System.out.println("创建了数据源");
  System.out.println(dataSource.toString());
  return dataSource;
}

结论:在创建完SqlsessionFactory时,DataSource实例就创建了.


验证Connection创建时机

首先我们先查出现在数据库的所有连接数,在数据库中执行

SELECT * FROM performance_schema.hosts;

返回数据: 显示当前连接数为1,总连接数70

show full processlist; //显示所有的任务列表

返回:当前只有一个查询的连接在运行

重新启动项目,在运行到需要执行实际的sql操作时,可以看到他已经被代理增强了

直到此时,连接数还是没有变,说明连接还没有创建,我们接着往下看.

我们按F7进入方法,可以看到,他被代理,,这时候会执行到之前的代理方法中调用invoke方法.这里有一个判断,但是并不成立,于是进入cachedInvoker(method).invoke()方法代理执行一下操作

cachedInvoker(method).invoke()方法

  
    @Override
    public Object invoke(Object proxy, Method method, Object[] args, SqlSession sqlSession) throws Throwable {
      return mapperMethod.execute(sqlSession, args);
    }

继续F7进入方法,由于我们是单条查询select 所以会case进入select块中的selectOne

继续F7

继续F7

通过configuration.getMappedStatement获取MappedStatement

单步步过,F8后,进入executor.query方法

@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
  BoundSql boundSql = ms.getBoundSql(parameterObject);
  CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
  return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

继续走到底,F7进入query方法

此时,会去缓存中查询,这里的缓存是二级缓存对象 ,生命周期是mapper级别的(一级缓存是一个session级别的),因为我们此时是第一次运行程序,所以肯定为Null,这时候会直接去查询,调用delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql)方法,F7进入这个方法

二级缓存没有获取到,又去查询了一级缓存,发现一级缓存也没有,这个时候,才去查数据库

queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);//没有缓存则去db查

F7进入queryFromDatabase方法.看到是一些对一级缓存的操作,我们主要看doQuery方法F7进入它.

可以看到它准备了一个空的Statement

我们F7跟进看一下prepareStatement方法 ,发现他调用了getConnection,哎!有点眼熟了,继续F7进入getConnection()方法,

又是一个getConnection()....继续F7进入transaction.getConnection()方法

又是一个getConnection()方法.判断connection是否为空.为空openConnection()否则直接返回connection;我们F7继续跟进openConnection()方法

  protected void openConnection() throws SQLException {
  if (log.isDebugEnabled()) {
    log.debug("Opening JDBC Connection");
  }
  connection = dataSource.getConnection();//最终获取连接的地方在这句.
  if (level != null) {
    connection.setTransactionIsolation(level.getLevel());//设置隔离等级
  }
  setDesiredAutoCommit(autoCommit);//是否自动提交,默认false,update不会提交到数据库,需要手动commit
}

dataSource.getConnection()执行完,至此一个connection才创建完成.
我们验证一下 在dataSource.getConnection()时打一下断点.

此时数据库中的连接数依然没变 还是1

我们按F8 执行一步

在控制台可以看到connection = com.mysql.jdbc.JDBC4Connection@1500b2f3 实例创建完毕 我们再去数据库中看看连接数

两个连接分别是:

不使用连接池的 UnpooledDataSource

的type属性被配置成了”UNPOOLED”,MyBatis首先会实例化一个UnpooledDataSourceFactory工厂实例,然后通过.getDataSource()方法返回一个UnpooledDataSource实例对象引用,我们假定为dataSource。
使用UnpooledDataSource的getConnection(),每调用一次就会产生一个新的Connection实例对象。

UnPooledDataSource的getConnection()方法实现如下:

/*
UnpooledDataSource的getConnection()实现
*/
public Connection getConnection() throws SQLException
{
  return doGetConnection(username, password);
}

private Connection doGetConnection(String username, String password) throws SQLException
{
  //封装username和password成properties
  Properties props = new Properties();
  if (driverProperties != null)
  {
      props.putAll(driverProperties);
  }
  if (username != null)
  {
      props.setProperty("user", username);
  }
  if (password != null)
  {
      props.setProperty("password", password);
  }
  return doGetConnection(props);
}

/*
*  获取数据连接
*/
private Connection doGetConnection(Properties properties) throws SQLException
{
  //1.初始化驱动
  initializeDriver();
  //2.从DriverManager中获取连接,获取新的Connection对象
  Connection connection = DriverManager.getConnection(url, properties);
  //3.配置connection属性
  configureConnection(connection);
  return connection;
}

UnpooledDataSource会做以下几件事情:

我们每调用一次getConnection()方法,都会通过DriverManager.getConnection()返回新的java.sql.Connection实例,这样当然对于资源是一种浪费,为了防止重复的去创建和销毁连接,于是引入了连接池的概念.

使用了连接池的 PooledDataSource

要理解连接池,首先要了解它对于connection的容器,它使用PoolState容器来管理所有的conncetion


在PoolState中,它将connection分为两种状态,空闲状态(idle)活动状态(active),他们分别被存储到PoolState容器内的idleConnectionsactiveConnections两个ArrayList中

从连接池中获取一个连接对象的过程

下面让我们看一下PooledDataSource 的popConnection方法获取Connection对象的实现:

private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;

    while (conn == null) {
      synchronized (state) {//给state对象加锁
        if (!state.idleConnections.isEmpty()) {//如果空闲列表不空,就从空闲列表中拿connection
          // Pool has available connection
          conn = state.idleConnections.remove(0);//拿出空闲列表中的第一个,去验证连接是否还有效
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // 空闲连接池中没有可用的连接,就来看看活跃连接列表中是否有..先判断活动连接总数 是否小于 最大可用的活动连接数
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // 如果连接数小于list.size 直接创建新的连接.
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // 此时连接数也满了,不能创建新的连接. 找到最老的那个,检查它是否过期
            //计算它的校验时间,如果校验时间大于连接池规定的最大校验时间,则认为它已经过期了
            // 利用这个PoolConnection内部的realConnection重新生成一个PooledConnection
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // 可以要求过期这个连接.
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              state.activeConnections.remove(oldestActiveConnection);
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happened.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not interrupt current executing thread and give current thread a
                     chance to join the next competition for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              //如果不能释放,则必须等待
              // Must wait
              try {
                if (!countedWait) {
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait);
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) {
          // ping to server and check the connection is valid or not
          if (conn.isValid()) {//去验证连接是否还有效.
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }

如上所示,对于PooledDataSource的getConnection()方法内,先是调用类PooledDataSource的popConnection()方法返回了一个PooledConnection对象,然后调用了PooledConnection的getProxyConnection()来返回Connection对象。

复用连接的过程

如果我们使用了连接池,我们在用完了Connection对象时,需要将它放在连接池中,该怎样做呢? 如果让我们来想的话,应该是通过代理Connection对象,在调用close时,并不真正关闭,而是丢到管理连接的容器中去. 要验证这个想法 那么 来看看Mybatis帮我们怎么实现复用连接的.

class PooledConnection implements InvocationHandler {

  //......
  //所创建它的datasource引用
  private PooledDataSource dataSource;
  //真正的Connection对象
  private Connection realConnection;
  //代理自己的代理Connection
  private Connection proxyConnection;

  //......
}

PooledConenction实现了InvocationHandler接口,并且,proxyConnection对象也是根据这个它来生成的代理对象:

public PooledConnection(Connection connection, PooledDataSource dataSource) {
    this.hashCode = connection.hashCode();
    this.realConnection = connection;//真实连接
    this.dataSource = dataSource;
    this.createdTimestamp = System.currentTimeMillis();
    this.lastUsedTimestamp = System.currentTimeMillis();
    this.valid = true;
    this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
  }

实际上,我们调用PooledDataSource的getConnection()方法返回的就是这个proxyConnection对象。
当我们调用此proxyConnection对象上的任何方法时,都会调用PooledConnection对象内invoke()方法。
让我们看一下PooledConnection类中的invoke()方法定义:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    //当调用关闭的时候,回收此Connection到PooledDataSource中
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      dataSource.pushConnection(this);
      return null;
    } else {
      try {
        if (!Object.class.equals(method.getDeclaringClass())) {
          checkConnection();
        }
        return method.invoke(realConnection, args);
      } catch (Throwable t) {
        throw ExceptionUtil.unwrapThrowable(t);
      }
    }
  }

结论:当我们使用了pooledDataSource.getConnection()返回的Connection对象的close()方法时,不会调用真正Connection的close()方法,而是将此Connection对象放到连接池中。调用dataSource.pushConnection(this)实现

protected void pushConnection(PooledConnection conn) throws SQLException {

    synchronized (state) {
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          state.notifyAll();
        } else {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          conn.invalidate();
        }
      } else {
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
  }

关注公众号:java宝典
a

标签:state,数据源,Connection,PooledConnection,connection,连接池,Mybatis,conn
来源: https://www.cnblogs.com/java-bible/p/14066834.html