public class JedisConnectionWrapper implements InvocationHandler {
private static final String CLOSE = "close";
private static final String HASH_CODE = "hashCode";
private static final String EQUALS = "equals";
private static final String MULTI = "multi";
private static final String EXEC = "exec";
private JedisClusterConnectionHandler connectionHandler;
static Method METHOD_GETCONNECTION = null;
static Method METHOD_GETCONNECTIONFROMSLOT = null;
static Field FIELD_CONNECTIONHANDLER = null;
private ThreadLocal<Jedis> askConnection = new ThreadLocal<Jedis>();
private int redirections;
static {
METHOD_GETCONNECTION = ReflectionUtils.findMethod(JedisClusterConnectionHandler.class, "getConnection");
METHOD_GETCONNECTION.setAccessible(true);
METHOD_GETCONNECTIONFROMSLOT = ReflectionUtils.findMethod(JedisClusterConnectionHandler.class,
"getConnectionFromSlot", int.class);
METHOD_GETCONNECTIONFROMSLOT.setAccessible(true);
FIELD_CONNECTIONHANDLER = ReflectionUtils.findField(JedisCluster.class, "connectionHandler");
FIELD_CONNECTIONHANDLER.setAccessible(true);
}
@SuppressWarnings("unchecked")
public JedisConnectionWrapper(JedisCluster jedisCluster, int redirections) {
this.redirections = redirections;
connectionHandler = (JedisClusterConnectionHandler) ReflectionUtils.getField(FIELD_CONNECTIONHANDLER,
jedisCluster);
Assert.notNull(connectionHandler, "connectionHandler 获取失败");
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().equals(EQUALS)) {
return (proxy == args[0]);
} else if (method.getName().equals(HASH_CODE)) {
return System.identityHashCode(proxy);
} else if (method.getName().equals(CLOSE)) {
return null;
} else if (method.getName().equals(MULTI)) {
log.warn("集群不支持事务,所以跳过:{}", MULTI);
return null;
} else if (method.getName().equals(EXEC)) {
log.warn("集群不支持事务,所以跳过:{}", EXEC);
return null;
}
Object retVal = runWithRetries(method, args, redirections, false, false);
return retVal;
}
private Object runWithRetries(Method method, Object[] args, int redirections, boolean tryRandomNode, boolean asking) {
if (redirections <= 0) {
throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
}
Map<String, JedisPool> map = connectionHandler.getNodes();
Assert.notEmpty(map, "没有可用的连接资源");
Jedis connection = null;
JedisConnection jconnection = null;
JedisConnection target;
try {
if (asking) {
connection = askConnection.get();
connection.asking();
asking = false;
} else {
connection = (Jedis) METHOD_GETCONNECTION.invoke(connectionHandler);
}
Client client = connection.getClient();
log.trace("redis node 当前值:" + client.getHost() + "------" + client.getPort());
jconnection = new JedisConnection(connection);
target = jconnection;
Object result = method.invoke(target, args);
String commandName = method.getName();
log.debug("exec command:{} @{}:{}", commandName, client.getHost(), client.getPort());
return result;
} catch (Exception e) {
releaseConnection(connection);
connection = null;
try {
Throwable t = e.getCause();
if (e instanceof InvocationTargetException) {
t = ((InvocationTargetException) e).getTargetException();
if (t instanceof NestedRuntimeException) {
t = t.getCause();
}
}
throw t;
} catch (JedisRedirectionException jre) {
releaseConnection(connection);
connection = null;
if (jre instanceof JedisAskDataException) {
asking = true;
HostAndPort node = jre.getTargetNode();
log.trace("redis node 期望值:" + node.getHost() + "---EEEE---" + node.getPort());
askConnection.set(this.connectionHandler.getConnectionFromNode(node));
} else if (jre instanceof JedisMovedDataException) {
this.connectionHandler.renewSlotCache();
asking = true;
HostAndPort node = jre.getTargetNode();
log.trace("redis node 期望值:" + node.getHost() + "---BBBBB--" + node.getPort());
askConnection.set(this.connectionHandler.getConnectionFromNode(node));
} else {
throw new JedisClusterException(jre);
}
return runWithRetries(method, args, redirections - 1, false, asking);
} catch (Throwable t) {
e.printStackTrace();
}
} finally {
releaseConnection(connection);
}
return null;
}