/** * * @param redisConnection redis 连接类 * @param channels 订阅的频道列表 */ public MessageConsumerRedisImpl(RedisConnection redisConnection, String[] channels) { Jedis jedis = null; try { if (channels != null && channels.length > 0) { jedis = redisConnection.getJedis(); jedis.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { System.out.println("receive " + message + " from " + channel); handleMessage(message); } }, channels); } } catch (Exception e) { e.printStackTrace(); } finally { if (jedis != null) { jedis.close(); } } }
@Test public void pubSubChannels() { final List<String> expectedActiveChannels = Arrays .asList("testchan1", "testchan2", "testchan3"); jedis.subscribe(new JedisPubSub() { private int count = 0; @Override public void onSubscribe(String channel, int subscribedChannels) { count++; // All channels are subscribed if (count == 3) { Jedis otherJedis = createJedis(); List<String> activeChannels = otherJedis.pubsubChannels("test*"); assertTrue(expectedActiveChannels.containsAll(activeChannels)); unsubscribe(); } } }, "testchan1", "testchan2", "testchan3"); }
@Test public void pubSubNumPat() { jedis.psubscribe(new JedisPubSub() { private int count = 0; @Override public void onPSubscribe(String pattern, int subscribedChannels) { count++; if (count == 3) { Jedis otherJedis = createJedis(); Long numPatterns = otherJedis.pubsubNumPat(); assertEquals(new Long(2l), numPatterns); punsubscribe(); } } }, "test*", "test*", "chan*"); }
@Test public void pubSubNumSub() { final Map<String, String> expectedNumSub = new HashMap<String, String>(); expectedNumSub.put("testchannel2", "1"); expectedNumSub.put("testchannel1", "1"); jedis.subscribe(new JedisPubSub() { private int count = 0; @Override public void onSubscribe(String channel, int subscribedChannels) { count++; if (count == 2) { Jedis otherJedis = createJedis(); Map<String, String> numSub = otherJedis.pubsubNumSub("testchannel1", "testchannel2"); assertEquals(expectedNumSub, numSub); unsubscribe(); } } }, "testchannel1", "testchannel2"); }
public void subscribe(JedisPubSub jedisPubSub, String channel) { Collection<JedisPool> poolCollection = jedisCluster.getClusterNodes().values(); Iterator iterator = poolCollection.iterator(); while(iterator.hasNext()) { JedisPool jedisPool = (JedisPool)iterator.next(); Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.subscribe(jedisPubSub, new String[]{channel}); } catch (Exception var11) { throw new RuntimeException(var11); } finally { jedisPool.returnResourceObject(jedis); } } }
public void startSubscribe() { // 支持短线重连 new Thread(() -> { while (true) { try { Jedis jedisSub = this.createJedis(); jedisSub.psubscribe(new JedisPubSub() { public void onPMessage(String pattern, String channel, String message) { vertx.eventBus().publish(channel, message); } }, "*"); } catch (Exception e) { e.printStackTrace(); try { Thread.sleep(1000l); } catch (InterruptedException e1) { e1.printStackTrace(); } } } }, "vertx-jedis-pubsub").start(); }
private void createSubscriptionListener() { subscriptions = new JedisPubSub() { @Override public void onMessage(String channel, String message) { Collection<Function<byte[], Boolean>> currentActions = actionMap.get(channel); if(currentActions != null) { for(Function<byte[], Boolean> currentAction : currentActions) { if(!currentAction.apply(toStringCodex.decode(message))) { log.warning("attempt to not ack " + channel + " however this is not supported"); } } } } }; }
@PostConstruct public void init() { new Thread() { @Override public void run() { redisDao.subscribe(new JedisPubSub() { @Override public void onMessage(String pchannel, String message) { logger.info("redis通知,channel={},message={}", pchannel, message); if (channel.equals(pchannel)) { updateCache(); } } }, channel); } }.start(); updateCache(); }
private JedisPubSub setupPubSub() { final JedisPubSub jedisPubSub = new JedisPubSub() { @Override public void onMessage(String channel, String message) { RedisSubscriber.this.onMessage(channel, message); } }; new Thread(new Runnable() { @Override public void run() { try { Jedis jedis = SimpleAPI.getInstance().getManager(RedisManager.class).getRedisPool().getResource(); jedis.subscribe(jedisPubSub, channel); jedis.quit(); } catch (Exception e) { e.printStackTrace(); } } }, "subscriberThread").start(); SimpleAPI.getInstance().addOnDisable(() -> jedisPubSub.unsubscribe()); return jedisPubSub; }
@SuppressWarnings("static-access") @Override public void run() { Thread.currentThread().setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread arg0, Throwable arg1) { logger.error("[error]{}:{}",slaveRedis.getIp(), slaveRedis.getPort(), arg1); Cat.logError(arg1); redisStatCheckResult.put(slaveRedis, Boolean.FALSE); if (null != slave) { slave.close(); } } }); logger.debug("[Psubscribe]{}:{}", slaveRedis.getIp(), slaveRedis.getPort()); slave.psubscribe(new JedisPubSub() { @Override public void onPMessage(String pattern, String channel, String msg) { logger.debug("[OnPMessage]{}:{}", slaveRedis.getIp(), slaveRedis.getPort()); redisStatCheckResult.put(slaveRedis, Boolean.TRUE); } }, generateURL(masterRedis.getIp(), masterRedis.getPort())); }
public void subscribe(final JedisPubSub jedisPubSub, final String... channels) { if (cluster) { jedisCluster.subscribe(jedisPubSub, channels); } else { jedisOperator.subscribe(jedisPubSub, channels); } }
public void subscribe(final JedisPubSub jedisPubSub, final String... channels) { execute(new JedisExecutor<Void>() { @Override Void doInJedis(Jedis jedis) { jedis.subscribe(jedisPubSub, channels); return null; } }); }
@Override public void subscribe(JedisPubSub jedisPubSub, String... channels) { Span span = helper.buildSpan("subscribe"); span.setTag("channels", Arrays.toString(channels)); try { super.subscribe(jedisPubSub, channels); } catch (Exception e) { onError(e, span); throw e; } finally { span.finish(); } }
@Override public void psubscribe(JedisPubSub jedisPubSub, String... patterns) { Span span = helper.buildSpan("psubscribe"); span.setTag("patterns", Arrays.toString(patterns)); try { super.psubscribe(jedisPubSub, patterns); } catch (Exception e) { onError(e, span); throw e; } finally { span.finish(); } }
public void subscribe(JedisPubSub jedisPubSub, String channel) { Jedis jedis = null; try { jedis = pool.getResource(); jedis.subscribe(jedisPubSub, new String[]{channel}); } catch (Exception var8) { throw new RuntimeException(var8); } finally { pool.returnResourceObject(jedis); } }
/** * 订阅给定的一个或多个频道的信息 * * @param jedisPubSub * @param channels */ public void subscribe(final JedisPubSub jedisPubSub, final String... channels) { redisTemplate.execute(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { ((Jedis) connection.getNativeConnection()).subscribe(jedisPubSub, channels); return null; } }); }
/** * 订阅消息<br> */ public void subscribe(JedisPubSub subscriber, String channel) { // 从池中获取一个jedis实例 Jedis jedis = null; try { jedis = getJedis(); jedis.subscribe(subscriber, channel); } finally { // 还会到连接池 if (null != jedis) { jedis.close(); } } }
@Override public void psubscribe(final JedisPubSub jedisPubSub, final String... patterns) { this.execute(new Invoker() { @Override public Object execute(Jedis jedis) { jedis.psubscribe(jedisPubSub, patterns); return null; } }); }
@Override public void subscribe(final JedisPubSub jedisPubSub, final String... channels) { this.execute(new Invoker() { @Override public Object execute(Jedis jedis) { jedis.subscribe(jedisPubSub, channels); return null; } }); }
@Override public Long publish(String channel, String message) { JedisPubSub jedisPubSub = this.channelMap.get(channel); if (jedisPubSub != null) { jedisPubSub.onMessage(channel, message); } return 1L; }
@Test public void publish() { // Assert.noimpl(redis).publish("channel", "message"); JedisPubSub jedisPubSub = Mockito.mock(JedisPubSub.class); redis.subscribe(jedisPubSub, "channel"); redis.publish("channel", "message"); Mockito.verify(jedisPubSub).onMessage("channel", "message"); }
public void run() { running.set(true); while (running.get()) { j = new Jedis(host, port); try { j.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { log.info("Sentinel " + host + ":" + port + " published: " + channel + " " + message + "."); String[] switchMasterMsg = message.split(" "); if (switchMasterMsg.length > 7) { String slaveHost = switchMasterMsg[2]; int slavePort = Integer.valueOf(switchMasterMsg[3]); HostAndPort hap = new HostAndPort(slaveHost, slavePort); insertReadPool(hap); } else { log.warn("Invalid message received on Sentinel " + host + ":" + port + " on channel " + channel + ": " + message); } } }, "-sdown", "+slave"); } catch (JedisConnectionException e) { runningSleep(running.get(), host, port, subscribeRetryWaitTimeMillis); } } }
public void run() { running.set(true); while (running.get()) { j = new Jedis(host, port); try { j.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { log.info("Sentinel " + host + ":" + port + " published: " + channel + " " + message + "."); String[] switchMasterMsg = message.split(" "); if (switchMasterMsg.length > 7) { String slaveHost = switchMasterMsg[2]; int slavePort = Integer.valueOf(switchMasterMsg[3]); HostAndPort hap = new HostAndPort(slaveHost, slavePort); removeFromReadPool(hap); } else if (switchMasterMsg.length != 4) { // master +sdown length=4 log.error("Invalid message received on Sentinel " + host + ":" + port + " on channel +sdown: " + message); } } }, "+sdown"); } catch (JedisConnectionException e) { runningSleep(running.get(), host, port, subscribeRetryWaitTimeMillis); } } }
private JedisPubSub jedisConsume() { return new JedisPubSub() { @Override public void onMessage(String channel, String message) { consumedMessages.add(message); super.onMessage(channel, message); } }; }
@Override public void subscribe(JedisPubSub jedisPubSub, String... channels) { redis.clients.jedis.Jedis delegate = pool.getResource(); try { delegate.subscribe(jedisPubSub, channels); } finally { pool.returnResource(delegate); } }
@Override public void psubscribe(JedisPubSub jedisPubSub, String... patterns) { redis.clients.jedis.Jedis delegate = pool.getResource(); try { delegate.psubscribe(jedisPubSub, patterns); } finally { pool.returnResource(delegate); } }
/** * 订阅给定的一个频道的信息。 * * @param jedisPubSub 监听器 * @param channel 频道 */ public void subscribe(final JedisPubSub jedisPubSub, final String channel) { new Executor<Object>(shardedJedisPool) { @Override Object execute() { Jedis _jedis = jedis.getShard(channel); // 注意subscribe是一个阻塞操作,因为当前线程要轮询Redis的响应然后调用subscribe _jedis.subscribe(jedisPubSub, channel); return null; } }.getResult(); }
public void run() { running.set(true); while (running.get()) { j = new Jedis(host, port); try { // double check that it is not being shutdown if (!running.get()) { break; } j.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { log.fine("Sentinel " + host + ":" + port + " published: " + message + "."); String[] switchMasterMsg = message.split(" "); if (switchMasterMsg.length > 3) { if (masterName.equals(switchMasterMsg[0])) { initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); } else { log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our master name is " + masterName); } } else { log.severe("Invalid message received on Sentinel " + host + ":" + port + " on channel +switch-master: " + message); } } }, "+switch-master"); } catch (JedisConnectionException e) { if (running.get()) { log.log(Level.SEVERE, "Lost connection to Sentinel at " + host + ":" + port + ". Sleeping 5000ms and retrying.", e); try { Thread.sleep(subscribeRetryWaitTimeMillis); } catch (InterruptedException e1) { log.log(Level.SEVERE, "Sleep interrupted: ", e1); } } else { log.fine("Unsubscribing from Sentinel at " + host + ":" + port); } } finally { j.close(); } } }
@Test(expected = JedisConnectionException.class) public void unsubscribeWhenNotSusbscribed() throws InterruptedException { JedisPubSub pubsub = new JedisPubSub() { }; pubsub.unsubscribe(); }
@Test(expected = JedisConnectionException.class) public void handleClientOutputBufferLimitForSubscribeTooSlow() throws InterruptedException { final Jedis j = createJedis(); final AtomicBoolean exit = new AtomicBoolean(false); final Thread t = new Thread(new Runnable() { public void run() { try { // we already set jedis1 config to // client-output-buffer-limit pubsub 256k 128k 5 // it means if subscriber delayed to receive over 256k or // 128k continuously 5 sec, // redis disconnects subscriber // we publish over 100M data for making situation for exceed // client-output-buffer-limit String veryLargeString = makeLargeString(10485760); // 10M * 10 = 100M for (int i = 0; i < 10 && !exit.get(); i++) { j.publish("foo", veryLargeString); } j.disconnect(); } catch (Exception ex) { } } }); t.start(); try { jedis.subscribe(new JedisPubSub() { public void onMessage(String channel, String message) { try { // wait 0.5 secs to slow down subscribe and // client-output-buffer exceed // System.out.println("channel - " + channel + // " / message - " + message); Thread.sleep(100); } catch (Exception e) { try { t.join(); } catch (InterruptedException e1) { } fail(e.getMessage()); } } }, "foo"); } finally { // exit the publisher thread. if exception is thrown, thread might // still keep publishing things. exit.set(true); if (t.isAlive()) { t.join(); } } }
public void subscribe(JedisPubSub jedisPubSub, String channel) { }