@Test public void testMetrics() { MetricRegistry metrics = mock(MetricRegistry.class); Client client = mock(Client.class); when(client.getHost()).thenReturn("myhost"); when(jedis.getClient()).thenReturn(client); when(pool.getNumActive()).thenReturn(1); when(pool.getNumIdle()).thenReturn(2); when(pool.getNumWaiters()).thenReturn(3); rf.startMonitoring(metrics); @SuppressWarnings("rawtypes") ArgumentCaptor<Gauge> gauge = ArgumentCaptor.forClass(Gauge.class); verify(metrics).register(eq("com.amadeus.session.redis.myhost.active"), gauge.capture()); verify(metrics).register(eq("com.amadeus.session.redis.myhost.idle"), gauge.capture()); verify(metrics).register(eq("com.amadeus.session.redis.myhost.waiting"), gauge.capture()); assertEquals(1, gauge.getAllValues().get(0).getValue()); assertEquals(2, gauge.getAllValues().get(1).getValue()); assertEquals(3, gauge.getAllValues().get(2).getValue()); }
@Override public Collection<Jedis> getAllShards() { Jedis jedis = new Jedis() { @Override public Client getClient() { return new Client() { @Override public boolean isBroken() { return true; } }; } }; return Collections.singletonList(jedis); }
@Test public void linsert() { long status = jedis.linsert("foo", Client.LIST_POSITION.BEFORE, "bar", "car"); assertEquals(0, status); jedis.lpush("foo", "a"); status = jedis.linsert("foo", Client.LIST_POSITION.AFTER, "a", "b"); assertEquals(2, status); List<String> actual = jedis.lrange("foo", 0, 100); List<String> expected = new ArrayList<String>(); expected.add("a"); expected.add("b"); assertEquals(expected, actual); status = jedis.linsert("foo", Client.LIST_POSITION.BEFORE, "bar", "car"); assertEquals(-1, status); // Binary long bstatus = jedis.linsert(bfoo, Client.LIST_POSITION.BEFORE, bbar, bcar); assertEquals(0, bstatus); jedis.lpush(bfoo, bA); bstatus = jedis.linsert(bfoo, Client.LIST_POSITION.AFTER, bA, bB); assertEquals(2, bstatus); List<byte[]> bactual = jedis.lrange(bfoo, 0, 100); List<byte[]> bexpected = new ArrayList<byte[]>(); bexpected.add(bA); bexpected.add(bB); assertByteArrayListEquals(bexpected, bactual); bstatus = jedis.linsert(bfoo, Client.LIST_POSITION.BEFORE, bbar, bcar); assertEquals(-1, bstatus); }
@Override public Client getClient() { Span span = helper.buildSpan("getClient"); try { return super.getClient(); } catch (Exception e) { onError(e, span); throw e; } finally { span.finish(); } }
private void disconnect() { // hack in to get the private client since disconnect() isn't in the interface Field field = ReflectionUtils.findField(pipeline.getClass(), "client"); field.setAccessible(true); try { Client client = (Client)field.get(pipeline); client.disconnect(); } catch (Exception e) { ReflectionUtils.handleReflectionException(e); } }
/** * 当出现异常的时候概率性自动重连,如果连接池内没有可用连接则关闭连接 */ @Override public void close() { // 获取所有服务端 Collection<Jedis> allShards = shardedJedis.getAllShards(); // 根据reConnectPercent的配置概率自动重连 Random random = new Random(); for (Jedis jedis : allShards) { Client client = jedis.getClient(); // 判断redis是否可以连接 if(client.isBroken()) { if(random.nextInt(100) < reConnectPercent) { // 设置重连超时毫秒值 client.setConnectionTimeout(reConnectTimeout); // 先断开连接 jedis.disconnect(); try { // 尝试重新连接 jedis.connect(); // System.out.println(host+":"+port+"重新连接成功==="); }catch(Exception e1) { // System.out.println(host+":"+port+"重新连接失败==="); } } }else{ // System.out.println(host+":"+port+"连接正常==="); } } }
@Test public void linsert() { long status = jedis.linsert("foo", Client.LIST_POSITION.BEFORE, "bar", "car"); assertEquals(0, status); jedis.lpush("foo", "a"); status = jedis.linsert("foo", Client.LIST_POSITION.AFTER, "a", "b"); assertEquals(2, status); List<String> actual = jedis.lrange("foo", 0, 100); List<String> expected = new ArrayList<String>(); expected.add("a"); expected.add("b"); assertEquals(expected, actual); status = jedis.linsert("foo", Client.LIST_POSITION.BEFORE, "bar", "car"); assertEquals(-1, status); // Binary long bstatus = jedis.linsert(bfoo, Client.LIST_POSITION.BEFORE, bbar, bcar); assertEquals(0, bstatus); jedis.lpush(bfoo, bA); bstatus = jedis.linsert(bfoo, Client.LIST_POSITION.AFTER, bA, bB); assertEquals(2, bstatus); List<byte[]> bactual = jedis.lrange(bfoo, 0, 100); List<byte[]> bexpected = new ArrayList<byte[]>(); bexpected.add(bA); bexpected.add(bB); assertEquals(bexpected, bactual); bstatus = jedis.linsert(bfoo, Client.LIST_POSITION.BEFORE, bbar, bcar); assertEquals(-1, bstatus); }
@SuppressWarnings("static-access") @Override public void execute() { Boolean inUse = RedisConstants.isInUse(); if (!inUse) { return; } Properties is = PropertiesUtil.getProperties("redis"); String host = is.getProperty(database + ".host"); GenericObjectPoolConfig config = new JedisPoolConfig(); // 控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取; // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。 config.setMaxTotal(Integer.parseInt(is.getProperty(database + ".maxActive", MAXACTIVE))); config.setMaxIdle(Integer.parseInt(is.getProperty(database + ".maxIdle", MAXIDLE))); config.setMaxWaitMillis(Long.parseLong(is.getProperty(database + ".maxWait", MAXWAIT))); // 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的; String password = is.getProperty(database + ".password", null); // config.setTestOnBorrow(configurationManager.getRedisTestOnBorrow()); String masterName = is.getProperty(database + ".masterName", "mymaster"); JedisSentinelPoolExt pool = new JedisSentinelPoolExt(masterName, host, config, password, TIMEOUT, 3, true); Redis jedis = new Redis(pool); while (true) { try { Jedis writeJedis = jedis.getTemplate().getWriteJedis(); Client client = writeJedis.getClient(); this.proceedWithPatterns(client, "__key*__:*"); Thread.currentThread().sleep(3000); client.close(); } catch (JedisConnectionException | InterruptedException e) { log.error(e.getMessage(), e); } } }
@Override public void returnConnection(Jedis conn) { Client client = conn.getClient(); if (client.isBroken()) super.invalidateResource(conn); else super.returnResource(conn); }
@Override public void returnConnection(Jedis conn) { Client client = conn.getClient(); if (client.isBroken()) this.invalidateResource(conn); else this.returnResource(conn); }
private void innerSync(List<Object> formatted) { HashSet<Client> clientSet = new HashSet<Client>(); try { for (Client client : clients) { // 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。 // 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了 Object data = generateResponse(client.getOne()).get(); if (null != formatted) { formatted.add(data); } // size相同说明所有的client都已经添加,就不用再调用add方法了 if (clientSet.size() != jedisMap.size()) { clientSet.add(client); } } } catch (JedisRedirectionException jre) { if (jre instanceof JedisMovedDataException) { // if MOVED redirection occurred, rebuilds cluster's slot cache, // recommended by Redis cluster specification refreshCluster(); } throw jre; } finally { if (clientSet.size() != jedisMap.size()) { // 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染 for (Jedis jedis : jedisMap.values()) { if (clientSet.contains(jedis.getClient())) { continue; } flushCachedData(jedis); } } hasDataInBuf = false; close(); } }
@Override protected Client getClient(String key) { Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key)); Client client = jedis.getClient(); clients.add(client); return client; }
public JedisMock(String host, int port) { super(host, port); client = mock(Client.class); // for 'get' command when(client.isInMulti()).thenReturn(false); when(client.getBulkReply()).thenReturn("bar"); when(client.getBinaryBulkReply()).thenReturn("bar".getBytes()); }
@Override public Client getClient() { redis.clients.jedis.Jedis delegate = pool.getResource(); try { return delegate.getClient(); } finally { pool.returnResource(delegate); } }
@Test public void test_json() throws IOException { Settings.Builder builder = ImmutableSettings.builder().put("redis.response.set", "json") .put("redis.response.del", "json"); client().admin().cluster().prepareUpdateSettings().setTransientSettings(builder).get(); JedisPool pool = new JedisPool(new GenericObjectPool.Config(), "localhost", 6379, 1000000); Jedis jedis = pool.getResource(); // test set String setResult = jedis.set("/test/person/1", jsonBuilder().startObject().field("test", "value").endObject().string()); assertThat(setResult, containsString("\"created\":true")); assertThat(setResult, containsString("\"_index\":\"test\"")); assertThat(setResult, containsString("\"_type\":\"person\"")); assertThat(setResult, containsString("\"_id\":\"1\"")); Client jedisClient = jedis.getClient(); // test delete jedisClient.del("/test/person/1"); String deleteResult = jedisClient.getBulkReply(); assertThat(deleteResult, containsString("\"found\":true")); assertThat(deleteResult, containsString("\"_index\":\"test\"")); assertThat(deleteResult, containsString("\"_type\":\"person\"")); assertThat(deleteResult, containsString("\"_id\":\"1\"")); }
private boolean validate(final Object target, final Object[] args) { if (args == null || args.length == 0 || args[0] == null) { if (isDebug) { logger.debug("Invalid arguments. Null or not found args({}).", args); } return false; } if (!(args[0] instanceof Client)) { if (isDebug) { logger.debug("Invalid arguments. Expect Client but args[0]({}).", args[0]); } return false; } if (!(args[0] instanceof EndPointAccessor)) { if (isDebug) { logger.debug("Invalid args[0] object. Need field accessor({}).", EndPointAccessor.class.getName()); } return false; } if (!(target instanceof EndPointAccessor)) { if (isDebug) { logger.debug("Invalid target object. Need field accessor({}).", EndPointAccessor.class.getName()); } return false; } return true; }
@Override public Client getClient() { Client client = jedis.getClient(); System.out.println("client:" + client); return client; }
@Override public Client getClient() { return null; }
@Override protected Client getClient(byte[] key) { return this.getClient(SafeEncoder.encode(key)); }
@Override public Client getClient() { throw new FakeJedisNotImplementedException(); }
@Override protected Client getClient(String key) { throw new FakeJedisNotImplementedException(); }
@Override protected Client getClient(byte[] key) { throw new FakeJedisNotImplementedException(); }
/** * <pre> * 返回格式 * host:port * </pre> */ private static String toServerString(Jedis jedis) { final Client client = jedis.getClient(); return client.getHost() + ':' + client.getPort(); }
/** * @param client * @see redis.clients.jedis.Pipeline#setClient(redis.clients.jedis.Client) */ public void setClient(Client client) { delegate.setClient(client); }
/** * @return * @see redis.clients.jedis.BinaryJedis#getClient() */ public abstract Client getClient();