[Source Code Analysis] Dynomite Distributed Storage Engine: DynoJedisClient (1)
Author: Internet
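0x00 Abstract
Earlier articles in this series introduced the Amazon Dynamo system architecture and Netflix Dynomite.
This article looks at how DynoJedisClient, the Java client for Netflix Dynomite, is implemented. The client is worth studying because it plays a role much like a cluster master: the Java driver exposes several policy interfaces that tune driver behavior, including load balancing, request retries, and node connection management.
Dynomite itself is too large and too low-level for this article, and DynoJedisClient is tightly coupled to it, so we start from the simplest features of DynoJedisClient:
- how it provides the basic function, a database connection pool;
- how it manages node connections;
- how it becomes topology-aware;
- how it balances load;
- how it fails over.
The analysis below is organized around these features.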
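0x01 Background
1.1 Amazon Dynamo
As its business grew, Amazon ran into the scalability and high-availability limits of relational databases. It therefore built a new database based on a KV storage model and named it Dynamo, which adopts a fully distributed, decentralized architecture.
Compared with a traditional relational database such as MySQL, Dynamo has somewhat different design goals. For example, most of Amazon's workloads do not need complex queries, but they do require tolerance of single-node failures, eventual consistency (that is, giving up strong consistency in order to prioritize availability), and strong scalability.
1.2 Netflix Dynomite
Dynomite is Netflix's open-source, general-purpose implementation of Amazon's distributed storage engine Dynamo. It is designed to support not only in-memory K/V databases but also persistent stores such as MySQL, BerkeleyDB, and LevelDB, and it is simple, efficient, and capable of cross-datacenter replication.
Dynomite's ultimate goal is to provide the simple, efficient, cross-datacenter replication that the underlying storage engines do not offer themselves. So far, Dynomite supports Redis and Memcached.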
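0x02 Why Netflix Chose Dynomite
Netflix chose Dynomite for the following reasons:
- It offers performance, multi-datacenter replication, and high availability.
- Dynomite provides sharding and a pluggable data storage engine, allowing vertical and horizontal scaling as data needs grow.
- On top of Redis, Dynomite provides high availability, peer-to-peer replication, and consistency features, which are used to build distributed cluster queues.
- Dyno provides connection pooling for persistent connections.
- Dyno's connection pool can be configured to be topology-aware.
- Failover: Dyno gives the application a specific local rack. A client in us-east-1a connects to Dynomite/Redis nodes in the same zone unless that node is unavailable, in which case the client fails over. This property is used to partition queues by zone.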
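The local-rack preference with failover described above can be sketched in a few lines. This is only an illustration under stated assumptions: RackNode and RackAwareSelector are hypothetical names invented for this sketch and are not Dyno classes, and the real client selects connection pools rather than plain node records.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical node description used only in this sketch; not Dyno's Host class. */
final class RackNode {
    final String name;
    final String rack;
    final boolean up;

    RackNode(String name, String rack, boolean up) {
        this.name = name;
        this.rack = rack;
        this.up = up;
    }
}

/** Hypothetical selector: prefer the local rack, fail over when it has no live node. */
final class RackAwareSelector {
    static Optional<RackNode> select(List<RackNode> nodes, String localRack) {
        Optional<RackNode> local = nodes.stream()
                .filter(n -> n.up && n.rack.equals(localRack))
                .findFirst();
        if (local.isPresent()) {
            return local;
        }
        // No live node in the local rack: fail over to the first live remote node.
        return nodes.stream().filter(n -> n.up).findFirst();
    }
}
```

With one dead and one live node in us-east-1a, the live local node is chosen; if every local node is down, the first live node from another rack is returned.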
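Dynomite itself is too low-level for this article.
So we focus on how DynoJedisClient implements the last three points. These points cannot be fully separated from Dynomite, of course; we simply try to isolate them as far as possible.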
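0x03 Basics
3.1 Data Center
A Data Center is a logical collection of multiple Racks.
A Data Center can be a machine room or the set of devices in one region.
3.2 Rack
A Rack is a logical collection of nodes that are close to each other, for example all the physical machines in one rack. Put simply, it is the cabinet that holds the servers.
How do data centers relate to racks? The relationship can be N:1, 1:N, or M:N.
- N:1: if a few servers are enough for the workload but they must form at least two data centers, several data centers can share one rack. This is not very safe for disaster recovery.
- 1:N: one data center corresponds to one machine room, which contains multiple racks.
- M:N: multiple data centers across multiple machine rooms are spread over multiple racks.
3.3 Rings and Tokens
The data managed by a cluster forms a ring. Each node in the ring is assigned one or more data ranges described by tokens, which determine the node's position in the ring.
A token is a 64-bit integer ID that identifies a partition, in the range -2^63 to 2^63-1. A hash function is applied to the partition key, and the resulting hash value determines which node stores the data.
Tokens also determine the range of data each node stores: a node holds the keys that fall in the half-open interval (previous node's token, this node's token], and all nodes together form a closed ring.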
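The ownership rule above, where a node owns the interval (previous token, own token] and the ring wraps around, can be sketched with a sorted map. This is a minimal illustration, not Dyno's implementation: TokenRing and its method names are hypothetical, and node identities are plain strings.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical token ring: each node owns the keys in (previous token, own token]. */
final class TokenRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(long token, String node) {
        ring.put(token, node);
    }

    /** Returns the node whose range contains the given key token. */
    String ownerOf(long keyToken) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring has no nodes");
        }
        // First node whose token is >= keyToken, so the upper bound is inclusive.
        Map.Entry<Long, String> owner = ring.ceilingEntry(keyToken);
        // Past the largest token: wrap around to the node with the smallest token.
        return (owner != null ? owner : ring.firstEntry()).getValue();
    }
}
```

With nodes at tokens 100, 200, and 300, a key token of 100 belongs to the first node, 101 to the second, and 301 wraps around to the first node again.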
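0x04 Requirements & Approach
Because it hides these details from the upper layers, DynoJedisClient has to handle all of this complexity itself and needs a deep understanding of the system, for example:
- how to maintain connections and provide a connection pool for persistent connections;
- how to maintain the topology;
- how to balance load;
- how to fail over;
- how to retry and discover automatically, for example retrying hosts that went down and discovering other hosts in the cluster;
- how to monitor the state of the underlying racks.
DynoJedisClient's approach is therefore that the Java driver exposes several policy interfaces for tuning driver behavior, including load balancing, request retries, and node connection management.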
0x05 Usage
Sample code:

    public static void main(String[] args) throws IOException {
        final String clusterName = args[0];
        int version = Integer.parseInt(args[1]);
        final DynoQueueDemo demo = new DynoQueueDemo(clusterName, "us-east-1e");

        // Copy every entry of demo.properties into the system properties
        Properties props = new Properties();
        props.load(DynoQueueDemo.class.getResourceAsStream("/demo.properties"));
        for (String name : props.stringPropertyNames()) {
            System.setProperty(name, props.getProperty(name));
        }

        try {
            demo.initWithRemoteClusterFromEurekaUrl(args[0], 8102, false);
            if (version == 1) {
                demo.runSimpleV1Demo(demo.client);
            } else if (version == 2) {
                demo.runSimpleV2QueueDemo(demo.client);
            }
            Thread.sleep(10000);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            demo.stop();
            logger.info("Done");
        }
    }

And the helper functions:

    public void initWithRemoteClusterFromEurekaUrl(final String clusterName, final int port,
                                                   boolean lock) throws Exception {
        initWithRemoteCluster(clusterName, getHostsFromDiscovery(clusterName), port, lock);
    }

    private void initWithRemoteCluster(String clusterName, final List<Host> hosts, final int port,
                                       boolean lock) throws Exception {
        // The host supplier simply returns the hosts found through discovery
        final HostSupplier clusterHostSupplier = () -> hosts;
        if (lock)
            initDynoLockClient(clusterHostSupplier, null, "test", clusterName);
        else
            init(clusterHostSupplier, port, null);
    }

    public void initDynoLockClient(HostSupplier hostSupplier, TokenMapSupplier tokenMapSupplier,
                                   String appName, String clusterName) {
        dynoLockClient = new DynoLockClient.Builder().withApplicationName(appName)
                .withDynomiteClusterName(clusterName)
                .withTimeoutUnit(TimeUnit.MILLISECONDS)
                .withTimeout(10000)
                .withHostSupplier(hostSupplier)
                .withTokenMapSupplier(tokenMapSupplier).build();
    }
0x06 Configuration
DynoJedisClient has the following important configuration classes.
6.1 Default Configuration
ConnectionPoolConfigurationImpl mainly supplies the default configuration.

    public class ConnectionPoolConfigurationImpl implements ConnectionPoolConfiguration {
        // DEFAULTS
        private static final LoadBalancingStrategy DEFAULT_LB_STRATEGY = LoadBalancingStrategy.TokenAware;
        private static final CompressionStrategy DEFAULT_COMPRESSION_STRATEGY = CompressionStrategy.NONE;

        private HostSupplier hostSupplier;
        private TokenMapSupplier tokenSupplier;
        private HostConnectionPoolFactory hostConnectionPoolFactory;
        private HashPartitioner hashPartitioner;
        private LoadBalancingStrategy lbStrategy = DEFAULT_LB_STRATEGY;
        private CompressionStrategy compressionStrategy = DEFAULT_COMPRESSION_STRATEGY;
    }

6.2 Policy Configuration
ArchaiusConnectionPoolConfiguration mainly supplies several policies, covering load balancing, compression, and retries:
- LoadBalancingStrategy parseLBStrategy(String propertyPrefix) parses the load balancing strategy;
- CompressionStrategy parseCompressionStrategy(String propertyPrefix) parses the compression strategy;
- RetryPolicyFactory parseRetryPolicyFactory(String propertyPrefix) parses the retry policy.
In detail:

    public class ArchaiusConnectionPoolConfiguration extends ConnectionPoolConfigurationImpl {
        // ......
        private final LoadBalancingStrategy loadBalanceStrategy;
        private final CompressionStrategy compressionStrategy;
        private final ErrorRateMonitorConfig errorRateConfig;
        private final RetryPolicyFactory retryPolicyFactory;
        private final DynamicBooleanProperty failOnStartupIfNoHosts;
        private final DynamicIntProperty lockVotingSize;
        // ......
    }
0x07 Definition
DynoJedisClient is defined as follows. Its most important member variable is the connection pool, connPool.

    public class DynoJedisClient implements JedisCommands, BinaryJedisCommands, MultiKeyCommands,
            ScriptingCommands, MultiKeyBinaryCommands, DynoJedisCommands {
        private final String appName;
        private final String clusterName;
        private final ConnectionPool<Jedis> connPool;
        private final AtomicReference<DynoJedisPipelineMonitor> pipelineMonitor = new AtomicReference<DynoJedisPipelineMonitor>();
        protected final DynoOPMonitor opMonitor;
        protected final ConnectionPoolMonitor cpMonitor;
    }
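0x08 Logical Connection Pool
Since DynoJedisClient's main job is to manage the connection pool, we start with the logical connection pool, ConnectionPoolImpl.
The connection pool layer abstracts all connection management for the application. Here we can configure everything, such as pool options, the load balancing strategy, the retry policy, or the default consistency level.
ConnectionPoolImpl is the core class. Its main responsibilities are:
- maintain the HostConnectionPool instances created for the hosts obtained from the HostSupplier, forming a collection of HostConnectionPools;
- add and remove hosts as the HostSupplier detects them;
- borrow a Connection from a HostConnectionPool to execute an Operation;
- apply a HostSelectionStrategy when executing an Operation, for example Round Robin or TokenAware;
- track error rates with a health check monitor, which can decide to recycle a HostConnectionPool and to fall back to HostConnectionPools in a remote data center;
- execute operations under a RetryPolicy.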
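Of the selection strategies mentioned above, Round Robin is the simplest to illustrate. The sketch below assumes a fixed list of pools and is not Dyno's HostSelectionStrategy: RoundRobinSelector is a hypothetical class written for this example only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin selector over a fixed list of pools. */
final class RoundRobinSelector<T> {
    private final List<T> pools;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSelector(List<T> pools) {
        if (pools.isEmpty()) {
            throw new IllegalArgumentException("at least one pool is required");
        }
        this.pools = new ArrayList<>(pools); // defensive copy
    }

    /** Returns the next pool in rotation; safe to call from several threads. */
    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), pools.size());
        return pools.get(index);
    }
}
```

A token-aware strategy would instead hash the key and look up the owning node on the ring, so that a request goes straight to the node holding the data.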
The class is defined as follows:

    public class ConnectionPoolImpl<CL> implements ConnectionPool<CL>, TopologyView {
        private final ConcurrentHashMap<Host, HostConnectionPool<CL>> cpMap = new ConcurrentHashMap<Host, HostConnectionPool<CL>>();
        private final ConnectionPoolHealthTracker<CL> cpHealthTracker;
        private final HostConnectionPoolFactory<CL> hostConnPoolFactory;
        private final ConnectionFactory<CL> connFactory;
        private final ConnectionPoolConfiguration cpConfiguration;
        private final ConnectionPoolMonitor cpMonitor;
        private final ScheduledExecutorService idleThreadPool = Executors.newSingleThreadScheduledExecutor();
        private final HostsUpdater hostsUpdater;
        private final ScheduledExecutorService connPoolThreadPool = Executors.newScheduledThreadPool(1);
        private HostSelectionWithFallback<CL> selectionStrategy;
        private Type poolType;
    }

At this point the structure is:

    +------------------------+
    | DynoJedisClient        |
    |                        |       +------------------------+
    |                        |       |                        |
    |   connPool +-----------------> |   ConnectionPoolImpl   |
    |                        |       |                        |
    +------------------------+       +------------------------+
8.1 Startup
The connection pool starts up as follows:
- add a host connection pool for each active host obtained through hostsUpdater;
- start the health check monitor to track error rates;
- schedule a background task that refreshes the host list (first after 15 seconds, then 30 seconds after each run) and applies the changes through updateHosts.
In detail:

    @Override
    public Future<Boolean> start() throws DynoException {
        HostSupplier hostSupplier = cpConfiguration.getHostSupplier();
        HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
        cpMonitor.setHostCount(hostStatus.getHostCount());
        Collection<Host> hostsUp = hostStatus.getActiveHosts();

        final ExecutorService threadPool = Executors.newFixedThreadPool(Math.max(10, hostsUp.size()));
        final List<Future<Void>> futures = new ArrayList<Future<Void>>();

        // Add a connection pool for every host obtained through hostsUpdater
        for (final Host host : hostsUp) {
            // Add host connection pool, but don't init the load balancer yet
            futures.add(threadPool.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    addHost(host, false);
                    return null;
                }
            }));
        }

        // Start the health check monitor to track error rates
        boolean success = started.compareAndSet(false, true);
        if (success) {
            selectionStrategy = initSelectionStrategy();
            cpHealthTracker.start();

            // Periodically refresh the host list and apply additions and removals
            connPoolThreadPool.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
                    cpMonitor.setHostCount(hostStatus.getHostCount());
                    Logger.debug(hostStatus.toString());
                    updateHosts(hostStatus.getActiveHosts(), hostStatus.getInactiveHosts());
                }
            }, 15 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);

            MonitorConsole.getInstance().registerConnectionPool(this);
            registerMonitorConsoleMBean(MonitorConsole.getInstance());
        }
        return getEmptyFutureTask(true);
    }
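The periodic task above hands the active and inactive host lists to updateHosts. The body of updateHosts is not shown in this article, so the following is only a guess at the general shape of such a reconciliation step, with hypothetical names (HostRegistry, activeHosts) and hosts reduced to plain strings.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that reconciles its host set with each refresh result. */
final class HostRegistry {
    private final Set<String> active = ConcurrentHashMap.newKeySet();

    /** Applies one refresh: adds hosts reported up, drops hosts reported down. */
    void updateHosts(Collection<String> hostsUp, Collection<String> hostsDown) {
        for (String host : hostsUp) {
            active.add(host);    // no-op if the host is already tracked
        }
        for (String host : hostsDown) {
            active.remove(host); // no-op if the host was never tracked
        }
    }

    /** Read-only view of the currently tracked hosts. */
    Set<String> activeHosts() {
        return Collections.unmodifiableSet(active);
    }
}
```

Because adding and removing are idempotent here, applying the same refresh result twice leaves the registry unchanged, which suits a task that runs on a fixed schedule.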
8.2 Adding Hosts
During startup, a host is added as follows:
- create a HostConnectionPool for the host;
- put the HostConnectionPool into the cpMap collection;
- add the host and its HostConnectionPool to the selection strategy, selectionStrategy;
- set up the health check monitor for the host.
In detail:

    public boolean addHost(Host host, boolean refreshLoadBalancer) {
        HostConnectionPool<CL> connPool = cpMap.get(host);

        final HostConnectionPool<CL> hostPool = hostConnPoolFactory.createHostConnectionPool(host, this);
        HostConnectionPool<CL> prevPool = cpMap.putIfAbsent(host, hostPool);

        if (prevPool == null) {
            // This is the first time we are adding this pool.
            try {
                int primed = hostPool.primeConnections();
                if (hostPool.isActive()) {
                    if (refreshLoadBalancer) {
                        selectionStrategy.addHost(host, hostPool);
                    }
                    cpHealthTracker.initializePingHealthchecksForPool(hostPool);
                    cpMonitor.hostAdded(host, hostPool);
                } else {
                    // The pool could not become active: undo the registration
                    cpMap.remove(host);
                }
                return primed > 0;
            } catch (DynoException e) {
                cpMap.remove(host);
                return false;
            }
        } else {
            // Added so the excerpt returns on every path: a pool for this host already exists
            return false;
        }
    }
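The key step in addHost is putIfAbsent: when several threads try to register the same host, only one of them sees a null return value and goes on to prime the pool. A minimal sketch of that idiom, using a hypothetical PoolRegistry class and string host names rather than Dyno's types:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical registry showing the putIfAbsent idiom used when adding a host. */
final class PoolRegistry<P> {
    private final ConcurrentHashMap<String, P> pools = new ConcurrentHashMap<>();

    /** Registers a pool for the host; returns true only for the caller that won the race. */
    boolean register(String host, Function<String, P> factory) {
        P candidate = factory.apply(host);
        // putIfAbsent returns null only when no mapping existed before this call.
        return pools.putIfAbsent(host, candidate) == null;
    }

    int size() {
        return pools.size();
    }
}
```

The losing caller has built a candidate it never uses, so the factory should be cheap or the candidate should be discarded cleanly; the excerpt above pays that cost too, since it creates hostPool before calling putIfAbsent.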
8.3 Creating a HostConnectionPool
A HostConnectionPool is created by one of two factory implementations, synchronous or asynchronous:

    private class SyncHostConnectionPoolFactory implements HostConnectionPoolFactory<CL> {
        @Override
        public HostConnectionPool<CL> createHostConnectionPool(Host host, ConnectionPoolImpl<CL> parentPoolImpl) {
            return new HostConnectionPoolImpl<CL>(host, connFactory, cpConfiguration, cpMonitor);
        }
    }

    private class AsyncHostConnectionPoolFactory implements HostConnectionPoolFactory<CL> {
        @Override
        public HostConnectionPool<CL> createHostConnectionPool(Host host, ConnectionPoolImpl<CL> parentPoolImpl) {
            return new SimpleAsyncConnectionPoolImpl<CL>(host, connFactory, cpConfiguration, cpMonitor);
        }
    }
8.4 Execution
The logical connection pool has two execution paths: executeWithRing and executeWithFailover.
executeWithRing is rarely used, so it is not covered in detail.
executeWithFailover obtains a Connection through selectionStrategy and executes the operation on it. If the attempt fails, it retries for as long as the RetryPolicy allows.

    public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op) throws DynoException {
        // startTime and lastException are declared here so the excerpt is self-consistent
        long startTime = System.currentTimeMillis();
        RetryPolicy retry = cpConfiguration.getRetryPolicyFactory().getRetryPolicy();
        retry.begin();
        DynoException lastException = null;

        do {
            Connection<CL> connection = null;
            try {
                connection = selectionStrategy.getConnectionUsingRetryPolicy(op,
                        cpConfiguration.getMaxTimeoutWhenExhausted(), TimeUnit.MILLISECONDS, retry);
                updateConnectionContext(connection.getContext(), connection.getHost());
                OperationResult<R> result = connection.execute(op);

                // Add context to the result from the successful execution
                result.setNode(connection.getHost()).addMetadata(connection.getContext().getAll());
                retry.success();
                cpMonitor.incOperationSuccess(connection.getHost(), System.currentTimeMillis() - startTime);
                return result;
            } catch (DynoException e) {
                // Condensed: the original excerpt omits its catch blocks
                retry.failure(e);
                lastException = e;
            } finally {
                if (connection != null) {
                    if (connection.getLastException() != null
                            && connection.getLastException() instanceof FatalConnectionException) {
                        connection.getParentConnectionPool().recycleConnection(connection);
                        // note - don't increment connection closed metric here;
                        // it's done in closeConnection
                    } else {
                        connection.getContext().reset();
                        connection.getParentConnectionPool().returnConnection(connection);
                    }
                }
            }
        } while (retry.allowRetry());

        throw lastException;
    }
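Stripped of connection handling, the control flow above is a do/while loop that remembers the last failure and rethrows it once the policy refuses another attempt. The sketch below shows that shape with a fixed retry budget. RetryNTimes here is a hypothetical helper written for this illustration (it only borrows the idea of a bounded retry policy), and it uses unchecked exceptions to stay short.

```java
import java.util.function.Supplier;

/** Hypothetical bounded-retry helper mirroring the loop shape of executeWithFailover. */
final class RetryNTimes {
    /** Runs op once, then retries on failure up to maxRetries more times. */
    static <R> R execute(Supplier<R> op, int maxRetries) {
        RuntimeException last = null;
        int attempts = 0;
        do {
            attempts++;
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure; rethrown if the budget runs out
            }
        } while (attempts <= maxRetries);
        throw last;
    }
}
```

In the real method each iteration also asks the selection strategy for a connection again, which is what lets a retry land on a different host.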
At this point the structure is:

                                 +----------------------+
    +-------------------+        | ConnectionPoolImpl   |
    | DynoJedisClient   |        |                      |        +--------------+
    |                   |        |   hostsUpdater +------------> | HostSupplier |
    |                   |        |                      |        +--------------+
    |   connPool +-------------> |                      |
    |                   |        |                      |        +----------------------------+
    |                   |        |   cpMap +-------------------> | [Host, HostConnectionPool] |
    +-------------------+        |                      |        +-------------+--------------+
                                 +----------------------+                      |
                                                                               |
                                                                               v
                                                                  +------------+-------------+
                                                                  |   HostConnectionPool     |
                                                                  +--------------------------+
0x09 Per-Host Connection Pool
HostConnectionPool is the concrete connection pool implementation. It maintains a pool of valid connections for each Host node.
Specifically:
- HostConnectionPool keeps all valid connections in a LinkedBlockingQueue named availableConnections; when a client needs a connection, it takes one from the queue.
- availableConnections is therefore the pool of valid connections.
- Each element of availableConnections is a Connection.
- This Connection (a JedisConnection) is created by JedisConnectionFactory.
- Each JedisConnection in turn holds:
  - a HostConnectionPool (hostPool);
  - a Jedis instance (jedisClient).
The definition:

    public class HostConnectionPoolImpl<CL> implements HostConnectionPool<CL> {
        // The connections available for this connection pool
        private final LinkedBlockingQueue<Connection<CL>> availableConnections = new LinkedBlockingQueue<Connection<CL>>();

        // Private members required by this class
        private final Host host;
        private final ConnectionFactory<CL> connFactory;
        private final ConnectionPoolConfiguration cpConfig;
        private final ConnectionPoolMonitor monitor;

        // states that dictate the behavior of the pool

        // cp not inited is the starting state of the pool. The pool will not allow connections to be borrowed in this state
        private final ConnectionPoolState<CL> cpNotInited = new ConnectionPoolNotInited();
        // cp active is where connections of the pool can be borrowed and returned
        private final ConnectionPoolState<CL> cpActive = new ConnectionPoolActive(this);
        // cp reconnecting is where connections cannot be borrowed and all returning connections will be shutdown
        private final ConnectionPoolState<CL> cpReconnecting = new ConnectionPoolReconnectingOrDown();
        // similar to reconnecting
        private final ConnectionPoolState<CL> cpDown = new ConnectionPoolReconnectingOrDown();

        // The thread safe reference to the pool state
        private final AtomicReference<ConnectionPoolState<CL>> cpState = new AtomicReference<ConnectionPoolState<CL>>(cpNotInited);
    }
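The cpState field above is a small state machine held in an AtomicReference. The sketch below shows the same idea with an enum instead of state objects. PoolLifecycle and its methods are hypothetical and greatly simplified: the real states also carry behavior (borrowing, returning, shutting connections down), which is left out here.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical pool lifecycle: the current state lives in one atomic reference. */
final class PoolLifecycle {
    enum State { NOT_INITED, ACTIVE, RECONNECTING, DOWN }

    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_INITED);

    /** Moves from expected to next; fails if another thread changed the state first. */
    boolean transition(State expected, State next) {
        return state.compareAndSet(expected, next);
    }

    /** Connections may only be borrowed while the pool is active. */
    boolean canBorrow() {
        return state.get() == State.ACTIVE;
    }
}
```

Using compareAndSet means two threads that both try to activate the pool cannot both succeed, so one-time setup work runs exactly once.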
9.1 Creating a Connection
First, how a Connection is created: roughly, it is obtained directly from connFactory, then opened, added to the queue, and recorded in the monitor.

    @Override
    public Connection<CL> createConnection() {
        // Exception handling (the try/catch of the original) is omitted in this excerpt
        Connection<CL> connection;
        if (cpConfig.isConnectToDatastore()) {
            // This is where the connection is actually created
            connection = connFactory.createConnectionWithDataStore(pool);
        } else if (cpConfig.isConnectionPoolConsistencyProvided()) {
            connection = connFactory.createConnectionWithConsistencyLevel(pool,
                    cpConfig.getConnectionPoolConsistency());
        } else {
            connection = connFactory.createConnection(pool);
        }

        connection.open();
        availableConnections.add(connection);

        monitor.incConnectionCreated(host);
        numActiveConnections.incrementAndGet();

        return connection;
    }
9.2 JedisConnectionFactory
The createConnectionWithDataStore function of JedisConnectionFactory performs the actual connection setup. It relies on Jedis, which many readers will already know.
A simplified version of the code:

    public class JedisConnectionFactory implements ConnectionFactory<Jedis> {
        private final OperationMonitor opMonitor;
        private final SSLSocketFactory sslSocketFactory;

        public JedisConnectionFactory(OperationMonitor monitor, SSLSocketFactory sslSocketFactory) {
            this.opMonitor = monitor;
            this.sslSocketFactory = sslSocketFactory;
        }

        @Override
        public Connection<Jedis> createConnectionWithDataStore(HostConnectionPool<Jedis> pool) {
            return new JedisConnection(pool, true);
        }

        // TODO: raghu compose redisconnection with jedisconnection in it
        public class JedisConnection implements Connection<Jedis> {
            private final HostConnectionPool<Jedis> hostPool;
            private final Jedis jedisClient;

            public JedisConnection(HostConnectionPool<Jedis> hostPool, boolean connectDataStore) {
                this.hostPool = hostPool;
                Host host = hostPool.getHost();

                // Connect to the datastore port directly, or to the Dynomite port
                int port = connectDataStore ? host.getDatastorePort() : host.getPort();

                if (sslSocketFactory == null) {
                    JedisShardInfo shardInfo = new JedisShardInfo(host.getHostAddress(), port,
                            hostPool.getConnectionTimeout(), hostPool.getSocketTimeout(),
                            Sharded.DEFAULT_WEIGHT);
                    jedisClient = new Jedis(shardInfo);
                } else {
                    JedisShardInfo shardInfo = new JedisShardInfo(host.getHostAddress(), port,
                            hostPool.getConnectionTimeout(), hostPool.getSocketTimeout(),
                            Sharded.DEFAULT_WEIGHT, true, sslSocketFactory, new SSLParameters(), null);
                    jedisClient = new Jedis(shardInfo);
                }
            }

            @Override
            public HostConnectionPool<Jedis> getParentConnectionPool() {
                return hostPool;
            }

            public Jedis getClient() {
                return jedisClient;
            }
        }
    }
At this point the structure is:

                                 +----------------------+
    +-------------------+        | ConnectionPoolImpl   |
    | DynoJedisClient   |        |                      |        +--------------+
    |                   |        |   hostsUpdater +------------> | HostSupplier |
    |                   |        |                      |        +--------------+
    |   connPool +-------------> |                      |
    |                   |        |                      |        +----------------------------+
    |                   |        |   cpMap +-------------------> | [Host, HostConnectionPool] |
    +-------------------+        |                      |        +-------------+--------------+
                                 +----------------------+                      |
                                                                               |
                                                                               v
    +-------------------------------+        +---------------------------------+--------------+
    | JedisConnectionFactory        |        | HostConnectionPool                             |
    |                               |        |                                                |
    | createConnectionWithDataStore | <------+ connFactory      Host                          |
    | sslSocketFactory              |        |                                                |
    +---------------+---------------+        | LinkedBlockingQueue<Connection<CL>>            |
                    | return                 |     availableConnections                       |
                    v                        +-----------------------+------------------------+
    +---------------+---------------+                                ^
    | JedisConnection               |                                |
    |                               |             return             |
    | HostConnectionPool hostPool   +--------------------------------+
    | Jedis(shardInfo) jedisClient  |
    +-------------------------------+
9.3 Borrowing a Connection
Callers use borrowConnection to obtain a connection; the call also records how long the borrow took.

    @Override
    public Connection<CL> borrowConnection(int duration, TimeUnit unit) {
        // Start recording how long it takes to get the connection - for insight/metrics
        long startTime = System.nanoTime() / 1000;

        Connection<CL> conn = null;
        // wait on the connection pool with a timeout
        // (InterruptedException handling and the timeout check on a null result
        // are omitted in this excerpt)
        conn = availableConnections.poll(duration, unit);

        long delay = System.nanoTime() / 1000 - startTime;
        monitor.incConnectionBorrowed(host, delay);
        return conn;
    }
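The borrow step relies on LinkedBlockingQueue.poll with a timeout, which returns null when nothing becomes available in time. A self-contained sketch of that behavior follows. SimplePool is a hypothetical class: where this sketch returns null on timeout or interruption, the excerpt above leaves that handling out.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical pool that lends connections from a blocking queue. */
final class SimplePool<C> {
    private final LinkedBlockingQueue<C> available = new LinkedBlockingQueue<>();

    /** Puts a connection (back) into the pool. */
    void giveBack(C connection) {
        available.add(connection);
    }

    /** Waits up to the timeout; returns null if no connection became available. */
    C borrow(long timeout, TimeUnit unit) {
        try {
            return available.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return null;
        }
    }
}
```

A bounded wait keeps a caller from blocking forever when every connection of a host is in use, and gives the layer above a chance to retry on another host.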
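0x10 Topology
Topology here mainly means the token ring. Let us review the concept.
In Dynomite, the data managed by the cluster forms a ring. Each node in the ring is assigned one or more data ranges described by tokens, and a token determines a position in the ring.
A token is a 64-bit integer ID that identifies a partition, in the range -2^63 to 2^63-1. A hash function is applied to the partition key, and the resulting hash value determines which node stores the data.
Tokens determine the range of data each node stores: a node holds the keys that fall in the half-open interval (previous node's token, this node's token], and all nodes together form a closed ring.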
10.1 Read-Only View
TopologyView represents a read-only view of the server topology.

    public interface TopologyView {
        /**
         * Retrieves a read-only view of the server topology
         *
         * @return An unmodifiable map of server-id to list of token status
         */
        Map<String, List<TokenPoolTopology.TokenStatus>> getTopologySnapshot();

        /**
         * Returns the token for the given key.
         *
         * @param key The key of the record stored in dynomite
         * @return Long The token that owns the given key
         */
        Long getTokenForKey(String key);
    }

ConnectionPoolImpl implements TopologyView, so ConnectionPoolImpl is itself a TopologyView. Note that getTokenForKey returns null unless the load balancing strategy is TokenAware.

    public class ConnectionPoolImpl<CL> implements ConnectionPool<CL>, TopologyView {

        public TokenPoolTopology getTopology() {
            return selectionStrategy.getTokenPoolTopology();
        }

        @Override
        public Map<String, List<TokenPoolTopology.TokenStatus>> getTopologySnapshot() {
            return Collections.unmodifiableMap(selectionStrategy.getTokenPoolTopology().getAllTokens());
        }

        @Override
        public Long getTokenForKey(String key) {
            if (cpConfiguration
                    .getLoadBalancingStrategy() == ConnectionPoolConfiguration.LoadBalancingStrategy.TokenAware) {
                return selectionStrategy.getTokenForKey(key);
            }
            return null;
        }
    }

Getting the TopologyView in DynoJedisClient simply returns the ConnectionPoolImpl:

    public TopologyView getTopologyView() {
        return this.getConnPool();
    }
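getTopologySnapshot wraps the live map with Collections.unmodifiableMap. It helps to remember what that wrapper does and does not guarantee: callers cannot write through it, but it is a view rather than a copy, so later changes to the underlying map remain visible. The sketch below demonstrates this with a hypothetical TopologyHolder class and tokens reduced to Long values.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical holder exposing its rack-to-tokens map as a read-only view. */
final class TopologyHolder {
    private final Map<String, List<Long>> tokensByRack = new ConcurrentHashMap<>();

    void setRack(String rack, List<Long> tokens) {
        tokensByRack.put(rack, tokens);
    }

    /** Read-only wrapper: rejects writes, but still reflects later updates. */
    Map<String, List<Long>> snapshot() {
        return Collections.unmodifiableMap(tokensByRack);
    }
}
```

Only the outer map is protected here; the lists stored as values stay as mutable as they were when they were put in, so a caller that needs a frozen picture would have to copy them.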
So TopologyView is now added to the diagram:

                                 +----------------------+
    +-------------------+        | ConnectionPoolImpl   |
    | DynoJedisClient   |        |                      |        +--------------+
    |                   |        |   hostsUpdater +------------> | HostSupplier |
    |                   |        |                      |        +--------------+
    |   connPool +-------------> |                      |
    |                   |        |                      |        +----------------------------+
    |   TopologyView +---------> |   cpMap +-------------------> | [Host, HostConnectionPool] |
    +-------------------+        |                      |        +-------------+--------------+
                                 +----------------------+                      |
                                                                               |
                                                                               v
    +-------------------------------+        +---------------------------------+--------------+
    | JedisConnectionFactory        |        | HostConnectionPool                             |
    |                               |        |                                                |
    | createConnectionWithDataStore | <------+ connFactory      Host                          |
    | sslSocketFactory              |        |                                                |
    +---------------+---------------+        | LinkedBlockingQueue<Connection<CL>>            |
                    | return                 |     availableConnections                       |
                    v                        +-----------------------+------------------------+
    +---------------+---------------+                                ^
    | JedisConnection               |                                |
    |                               |             return             |
    | HostConnectionPool hostPool   +--------------------------------+
    | Jedis(shardInfo) jedisClient  |
    +-------------------------------+
10.2 Implementation
TokenPoolTopology is the concrete implementation of the topology.
getTopologySnapshot simply returns map, that is, the TokenStatus entries for every rack. This is the topology.
If you think about it, a topology is just a logical collection of "what currently sits on each rack, and what state each of those things is in".
The definition follows. It has two core members:
- map: the key is the rack and the value is a list, that is, "the token statuses on that rack, collected into a list";
- rackTokenHostMap: the key is the rack and the value is a map, that is, "the token-to-host mapping on that rack".
This gives two different dimensions for working with the tokens.

    public class TokenPoolTopology {
        private final ConcurrentHashMap<String, List<TokenStatus>> map = new ConcurrentHashMap<String, List<TokenStatus>>();
        private final ConcurrentHashMap<String, Map<Long, Host>> rackTokenHostMap = new ConcurrentHashMap<String, Map<Long, Host>>();

        public ConcurrentHashMap<String, List<TokenStatus>> getAllTokens() {
            return map;
        }

        // Index by rack: append the token status to the rack's list
        public void addToken(String rack, Long token, HostConnectionPool<?> hostPool) {
            List<TokenStatus> list = map.get(rack);
            if (list == null) {
                list = new ArrayList<TokenStatus>();
                map.put(rack, list);
            }
            list.add(new TokenStatus(token, hostPool));
        }

        // Index by rack and token: record which host owns the token
        public void addHostToken(String rack, Long token, Host host) {
            Map<Long, Host> tokenHostMap = rackTokenHostMap.get(rack);
            if (tokenHostMap == null) {
                tokenHostMap = new HashMap<>();
                rackTokenHostMap.put(rack, tokenHostMap);
            }
            tokenHostMap.put(token, host);
        }
    }
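The two indexes answer different questions: "which tokens live on this rack?" and "which host owns this token on this rack?". The sketch below builds both with computeIfAbsent and adds the corresponding lookups. RackTopology and its method names are hypothetical, hosts are plain strings, and thread-safe collections are chosen here as an assumption rather than copied from the original class.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical two-index topology: tokens per rack, and token-to-host per rack. */
final class RackTopology {
    private final Map<String, List<Long>> tokensByRack = new ConcurrentHashMap<>();
    private final Map<String, Map<Long, String>> hostByRackAndToken = new ConcurrentHashMap<>();

    /** Records that the host owns the token on the given rack, updating both indexes. */
    void add(String rack, long token, String host) {
        tokensByRack.computeIfAbsent(rack, r -> new CopyOnWriteArrayList<>()).add(token);
        hostByRackAndToken.computeIfAbsent(rack, r -> new ConcurrentHashMap<>()).put(token, host);
    }

    /** All tokens on a rack, or an empty list for an unknown rack. */
    List<Long> tokens(String rack) {
        return tokensByRack.getOrDefault(rack, Collections.<Long>emptyList());
    }

    /** The host owning the token on the rack, or null if there is none. */
    String hostFor(String rack, long token) {
        Map<Long, String> byToken = hostByRackAndToken.get(rack);
        return byToken == null ? null : byToken.get(token);
    }
}
```

computeIfAbsent replaces the get, null check, and put sequence of the original with a single atomic step, which matters if two threads add the first token of the same rack at once.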
10.3 Usage
TokenPoolTopology is used in both ConnectionPoolImpl and HostSelectionWithFallback.
10.3.1 ConnectionPoolImpl
ConnectionPoolImpl either returns the TokenPoolTopology itself for the upper layer to process, or returns all tokens in the TokenPoolTopology as a read-only map:

    public TokenPoolTopology getTopology() {
        return selectionStrategy.getTokenPoolTopology();
    }

    public Map<String, List<TokenPoolTopology.TokenStatus>> getTopologySnapshot() {
        return Collections.unmodifiableMap(selectionStrategy.getTokenPoolTopology().getAllTokens());
    }

10.3.2 HostSelectionWithFallback
HostSelectionWithFallback also holds a TokenPoolTopology, but only for failover/fallback metrics; it does not affect which host connection pool is selected for traffic.

    public class HostSelectionWithFallback<CL> {
        // Represents the *initial* topology from the token supplier. This does not affect selection of a host connection
        // pool for traffic. It only affects metrics such as failover/fallback
        private final AtomicReference<TokenPoolTopology> topology = new AtomicReference<>(null);
    }

HostSelectionWithFallback also uses the host tokens to build or update the TokenPoolTopology.

    /**
     * Create token pool topology from the host tokens
     *
     * @param allHostTokens
     * @return tokenPoolTopology with the host information
     */
    public TokenPoolTopology createTokenPoolTopology(List<HostToken> allHostTokens) {
        TokenPoolTopology topology = new TokenPoolTopology(replicationFactor.get());
        for (HostToken hostToken : allHostTokens) {
            String rack = hostToken.getHost().getRack();
            topology.addHostToken(rack, hostToken.getToken(), hostToken.getHost());
        }
        updateTokenPoolTopology(topology);
        return topology;
    }

This completes the analysis of connection management and topology awareness. The next article continues with automatic discovery and failover.
Source: https://blog.51cto.com/u_15179348/2734049