/**
 * {@inheritDoc}
 *
 * <p>Issues {@code sadd} through the cluster client. When the call fails with a
 * {@link JedisClusterMaxRedirectionsException} or a {@link JedisConnectionException} the
 * failure is logged together with the attempt number; if fewer than {@code numRetries}
 * attempts have been made, {@code waitforFailover()} is invoked and the command is tried again.
 *
 * @throws JedisClusterMaxRedirectionsException if the last permitted attempt fails with it
 * @throws JedisConnectionException if the last permitted attempt fails with it
 */
@Override
public void sAdd(String key, String value) {
    int tries = 0;
    while (true) {
        tries++;
        try {
            cluster.sadd(key, value);
            return;
        } catch (JedisClusterMaxRedirectionsException | JedisConnectionException ex) {
            log.error(RedisConstants.CONN_FAILED_RETRY_MSG + tries);
            // ">=" instead of "==": with numRetries < 1 an equality check never matches, and
            // the failure would be swallowed instead of being reported to the caller.
            if (tries >= numRetries) {
                throw ex;
            }
            waitforFailover();
        }
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Reads the set through {@code smembers} on the cluster client. When the call fails with a
 * {@link JedisClusterMaxRedirectionsException} or a {@link JedisConnectionException} the
 * failure is logged together with the attempt number; if fewer than {@code numRetries}
 * attempts have been made, {@code waitforFailover()} is invoked and the command is tried again.
 *
 * @return whatever {@code smembers} returned on the first successful attempt
 * @throws JedisClusterMaxRedirectionsException if the last permitted attempt fails with it
 * @throws JedisConnectionException if the last permitted attempt fails with it
 */
@Override
public Set<String> sMembers(String key) {
    int tries = 0;
    while (true) {
        tries++;
        try {
            return cluster.smembers(key);
        } catch (JedisClusterMaxRedirectionsException | JedisConnectionException ex) {
            log.error(RedisConstants.CONN_FAILED_RETRY_MSG + tries);
            // ">=" instead of "==": with numRetries < 1 an equality check never matches, and
            // the method would hand back null as if the lookup had succeeded.
            if (tries >= numRetries) {
                throw ex;
            }
            waitforFailover();
        }
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Serializes {@code data} with {@code SerializationUtil.serialize} and stores it through
 * {@code zadd} with the given score. When the call fails with a
 * {@link JedisClusterMaxRedirectionsException} or a {@link JedisConnectionException} the
 * failure is logged together with the attempt number; if fewer than {@code numRetries}
 * attempts have been made, {@code waitforFailover()} is invoked and the command is tried again.
 *
 * @throws JedisClusterMaxRedirectionsException if the last permitted attempt fails with it
 * @throws JedisConnectionException if the last permitted attempt fails with it
 */
@Override
public void zAdd(String key, Object data, double score) {
    int tries = 0;
    while (true) {
        tries++;
        try {
            // Serialization stays inside the retried section, so it is redone on every attempt.
            cluster.zadd(key, score, SerializationUtil.serialize(data));
            return;
        } catch (JedisClusterMaxRedirectionsException | JedisConnectionException ex) {
            log.error(RedisConstants.CONN_FAILED_RETRY_MSG + tries);
            // ">=" instead of "==": with numRetries < 1 an equality check never matches, and
            // the failure would be swallowed instead of being reported to the caller.
            if (tries >= numRetries) {
                throw ex;
            }
            waitforFailover();
        }
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Looks up the score of the serialized {@code data} through {@code zscore}; the member is
 * reported as present when that score is not {@code null}. When the call fails with a
 * {@link JedisClusterMaxRedirectionsException} or a {@link JedisConnectionException} the
 * failure is logged together with the attempt number; if fewer than {@code numRetries}
 * attempts have been made, {@code waitforFailover()} is invoked and the command is tried again.
 *
 * @return {@code true} when {@code zscore} yields a non-null score, {@code false} otherwise
 * @throws JedisClusterMaxRedirectionsException if the last permitted attempt fails with it
 * @throws JedisConnectionException if the last permitted attempt fails with it
 */
@Override
public Boolean zExists(String key, Object data) {
    int tries = 0;
    while (true) {
        tries++;
        try {
            return cluster.zscore(key, SerializationUtil.serialize(data)) != null;
        } catch (JedisClusterMaxRedirectionsException | JedisConnectionException ex) {
            log.error(RedisConstants.CONN_FAILED_RETRY_MSG + tries);
            // ">=" instead of "==": with numRetries < 1 an equality check never matches, and
            // the method would answer "false" even though the lookup never succeeded.
            if (tries >= numRetries) {
                throw ex;
            }
            waitforFailover();
        }
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Removes the serialized {@code data} through {@code zrem}. When the call fails with a
 * {@link JedisClusterMaxRedirectionsException} or a {@link JedisConnectionException} the
 * failure is logged together with the attempt number; if fewer than {@code numRetries}
 * attempts have been made, {@code waitforFailover()} is invoked and the command is tried again.
 *
 * @return whatever {@code zrem} returned on the first successful attempt
 * @throws JedisClusterMaxRedirectionsException if the last permitted attempt fails with it
 * @throws JedisConnectionException if the last permitted attempt fails with it
 */
@Override
public Long zRemove(String key, Object... data) {
    int tries = 0;
    while (true) {
        tries++;
        try {
            // NOTE(review): the whole varargs array goes to serialize(...) in a single call.
            // Confirm that SerializationUtil handles an Object[] member by member; otherwise
            // the array itself is serialized as one value and no member will match.
            return cluster.zrem(key, SerializationUtil.serialize(data));
        } catch (JedisClusterMaxRedirectionsException | JedisConnectionException ex) {
            log.error(RedisConstants.CONN_FAILED_RETRY_MSG + tries);
            // ">=" instead of "==": with numRetries < 1 an equality check never matches, and
            // the method would hand back null as if the removal had been attempted successfully.
            if (tries >= numRetries) {
                throw ex;
            }
            waitforFailover();
        }
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Fetches members through {@code zrangeByScore} with the given score bounds, offset and
 * count, then passes the result to {@code SerializationUtil.deSerialize}. When the Redis call
 * fails with a {@link JedisClusterMaxRedirectionsException} or a
 * {@link JedisConnectionException} the failure is logged together with the attempt number; if
 * fewer than {@code numRetries} attempts have been made, {@code waitforFailover()} is invoked
 * and the command is tried again.
 *
 * @return the deserialized form of the members returned by the first successful attempt
 * @throws JedisClusterMaxRedirectionsException if the last permitted attempt fails with it
 * @throws JedisConnectionException if the last permitted attempt fails with it
 */
@Override
public Set<Object> zGet(String key, double minScore, double maxScore, int offset, int count) {
    Set<String> rawMembers;
    int tries = 0;
    while (true) {
        tries++;
        try {
            rawMembers = cluster.zrangeByScore(key, minScore, maxScore, offset, count);
            break;
        } catch (JedisClusterMaxRedirectionsException | JedisConnectionException ex) {
            log.error(RedisConstants.CONN_FAILED_RETRY_MSG + tries);
            // ">=" instead of "==": with numRetries < 1 an equality check never matches, and
            // a null result would be passed on to deSerialize after a failed lookup.
            if (tries >= numRetries) {
                throw ex;
            }
            waitforFailover();
        }
    }
    // Deserialization happens outside the retried section: only the Redis call is retried.
    return SerializationUtil.deSerialize(rawMembers);
}
/**
 * {@inheritDoc}
 *
 * <p>Reads the score of the serialized {@code data} through {@code zscore}. When the call
 * fails with a {@link JedisClusterMaxRedirectionsException} or a
 * {@link JedisConnectionException} the failure is logged together with the attempt number; if
 * fewer than {@code numRetries} attempts have been made, {@code waitforFailover()} is invoked
 * and the command is tried again.
 *
 * @return whatever {@code zscore} returned on the first successful attempt
 * @throws JedisClusterMaxRedirectionsException if the last permitted attempt fails with it
 * @throws JedisConnectionException if the last permitted attempt fails with it
 */
@Override
public Double zScore(String key, Object data) {
    int tries = 0;
    while (true) {
        tries++;
        try {
            return cluster.zscore(key, SerializationUtil.serialize(data));
        } catch (JedisClusterMaxRedirectionsException | JedisConnectionException ex) {
            log.error(RedisConstants.CONN_FAILED_RETRY_MSG + tries);
            // ">=" instead of "==": with numRetries < 1 an equality check never matches, and
            // a failed lookup would be indistinguishable from "member has no score" (null).
            if (tries >= numRetries) {
                throw ex;
            }
            waitforFailover();
        }
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Counts members through {@code zcount} using the score range
 * {@code 0 .. Double.MAX_VALUE}. When the call fails with a
 * {@link JedisClusterMaxRedirectionsException} or a {@link JedisConnectionException} the
 * failure is logged together with the attempt number; if fewer than {@code numRetries}
 * attempts have been made, {@code waitforFailover()} is invoked and the command is tried again.
 *
 * @return whatever {@code zcount} returned on the first successful attempt
 * @throws JedisClusterMaxRedirectionsException if the last permitted attempt fails with it
 * @throws JedisConnectionException if the last permitted attempt fails with it
 */
@Override
public Long zCount(String key) {
    int tries = 0;
    while (true) {
        tries++;
        try {
            // NOTE(review): the lower bound is 0, so members stored with a negative score are
            // outside the counted range - confirm that scores are never negative here.
            return cluster.zcount(key, 0, Double.MAX_VALUE);
        } catch (JedisClusterMaxRedirectionsException | JedisConnectionException ex) {
            log.error(RedisConstants.CONN_FAILED_RETRY_MSG + tries);
            // ">=" instead of "==": with numRetries < 1 an equality check never matches, and
            // the method would hand back null as if the count had been read.
            if (tries >= numRetries) {
                throw ex;
            }
            waitforFailover();
        }
    }
}
@Test(expected = JedisClusterMaxRedirectionsException.class) public void testRedisClusterMaxRedirections() { Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); JedisCluster jc = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_REDIRECTIONS, "cluster", DEFAULT_CONFIG); int slot51 = JedisClusterCRC16.getSlot("51"); // This will cause an infinite redirection loop node2.clusterSetSlotMigrating(slot51, JedisClusterTestUtil.getNodeId(node3.clusterNodes())); jc.set("51", "foo"); }
@Test(expected = JedisClusterMaxRedirectionsException.class, timeout = DEFAULT_TIMEOUT) public void testReturnConnectionOnRedirection() { Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379)); JedisPoolConfig config = DEFAULT_CONFIG; config.setMaxTotal(1); JedisCluster jc = new JedisCluster(jedisClusterNode, 0, 2, DEFAULT_REDIRECTIONS, "cluster", config); // This will cause an infinite redirection between node 2 and 3 node3.clusterSetSlotMigrating(15363, JedisClusterTestUtil.getNodeId(node2.clusterNodes())); jc.get("e"); }
private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) { if (attempts <= 0) { throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisNoReachableClusterNodeException jnrcne) { throw jnrcne; } catch (JedisConnectionException jce) { // release current connection before recursion releaseConnection(connection); connection = null; if (attempts <= 1) { // We need this because if node is not reachable anymore - we need to finally // initiate slots renewing, // or we can stuck with cluster state without one node in opposite case. // But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed // request. 
// TODO make tracking of successful/unsuccessful operations for node - do // renewing only // if there were no successful responses from this node last few seconds this.connectionHandler.renewSlotCache(); // no more redirections left, throw original exception, not // JedisClusterMaxRedirectionsException, because it's not MOVED situation throw jce; } return runWithRetries(key, attempts - 1, tryRandomNode, asking); } catch (JedisRedirectionException jre) { // if MOVED redirection occurred, if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(connection); } // release current connection before recursion or renewing releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { } else { throw new JedisClusterException(jre); } return runWithRetries(key, attempts - 1, false, asking); } finally { releaseConnection(connection); } }
private T runWithRetries(String key, int redirections, boolean tryRandomNode, boolean asking) { if (redirections <= 0) { throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisConnectionException jce) { if (tryRandomNode) { // maybe all connection is down throw jce; } // release current connection before recursion releaseConnection(connection); connection = null; // retry with random connection return runWithRetries(key, redirections - 1, true, asking); } catch (JedisRedirectionException jre) { // release current connection before recursion or renewing releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(); } else { throw new JedisClusterException(jre); } return runWithRetries(key, redirections - 1, false, asking); } finally { releaseConnection(connection); } }
private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) { if (redirections <= 0) { JedisClusterMaxRedirectionsException exception = new JedisClusterMaxRedirectionsException("Too many Cluster redirections? key=" + SafeEncoder.encode(key)); //收集 UsefulDataCollector.collectException(exception, "", System.currentTimeMillis(), ClientExceptionType.REDIS_CLUSTER); throw exception; } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisConnectionException jce) { if (tryRandomNode) { // maybe all connection is down throw jce; } // release current connection before recursion releaseConnection(connection); connection = null; // retry with random connection return runWithRetries(key, redirections - 1, true, asking); } catch (JedisRedirectionException jre) { // if MOVED redirection occurred, if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(connection); } // release current connection before recursion or renewing releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { } else { throw new JedisClusterException(jre); } return runWithRetries(key, redirections - 1, false, asking); } finally { releaseConnection(connection); } }
private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) { if (redirections <= 0) { throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisConnectionException jce) { if (tryRandomNode) { // maybe all connection is down throw jce; } // release current connection before recursion releaseConnection(connection); connection = null; // retry with random connection return runWithRetries(key, redirections - 1, true, asking); } catch (JedisRedirectionException jre) { // if MOVED redirection occurred, if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(connection); } // release current connection before recursion or renewing releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { } else { throw new JedisClusterException(jre); } return runWithRetries(key, redirections - 1, false, asking); } finally { releaseConnection(connection); } }