/**
 * Wires up the cache synchroniser and starts a dedicated thread named
 * {@code RedisPubSubSync.Subscribe} that subscribes to
 * {@code Cache.CACHE_STORE_SYNC}.
 *
 * <p>Each incoming message is deserialized into a {@code Command}. The command
 * is logged at debug level and handed to the {@code CacheSyncHandler} when its
 * {@code src} is non-null and differs from {@code Cache.ID}, or when its
 * {@code oper} equals {@code Command.OPT_FETCH}. All other messages, and
 * messages that deserialize to {@code null}, are dropped.
 *
 * @param cacheTemplate template that supplies the Jedis template used for the
 *                      subscription and for deserializing messages
 */
public RedisPubSubSync(final CacheTemplate cacheTemplate) {
    this.cacheTemplate = cacheTemplate;
    this.cacheSyncHandler = new CacheSyncHandler(this.cacheTemplate);
    this.jedisTemplate = cacheTemplate.getJedisTemplate();

    final Runnable subscribeTask = new Runnable() {
        @Override
        public void run() {
            jedisTemplate.subscribe(new BinaryJedisPubSub() {
                @Override
                public void onMessage(byte[] channel, byte[] message) {
                    Command cmd = jedisTemplate.deserializeVal(message);
                    if (cmd == null) {
                        return;
                    }
                    boolean fromOtherSource = cmd.src != null && !cmd.src.equals(Cache.ID);
                    if (!fromOtherSource && cmd.oper != Command.OPT_FETCH) {
                        return;
                    }
                    logger.debug("recieve from " + cmd.src + " > " + cmd.toString());
                    cacheSyncHandler.handle(cmd);
                }
            }, Cache.CACHE_STORE_SYNC);
        }
    };
    new Thread(subscribeTask, "RedisPubSubSync.Subscribe").start();
}
/**
 * Builds the two-level cache: creates the Ehcache and Redis cache
 * implementations, registers this instance as the Ehcache event listener,
 * assigns a fresh UUID as the client id, and subscribes to the
 * {@code channel} field on the Redis client.
 *
 * <p>Incoming pub/sub payloads are deserialized with the configured serializer
 * and forwarded to {@code onMessage(String, Object)} of this instance.
 */
public JbootEhredisCacheImpl() {
    this.ehcacheImpl = new JbootEhcacheImpl();
    this.ehcacheImpl.setCacheEventListener(this);

    this.redisCacheImpl = new JbootRedisCacheImpl();
    this.clientId = StringUtils.uuid();
    this.serializer = Jboot.me().getSerializer();
    this.redis = redisCacheImpl.getRedis();

    final BinaryJedisPubSub forwarder = new BinaryJedisPubSub() {
        @Override
        public void onMessage(byte[] rawChannel, byte[] rawMessage) {
            String channelName = (String) serializer.deserialize(rawChannel);
            JbootEhredisCacheImpl.this.onMessage(channelName, serializer.deserialize(rawMessage));
        }
    };
    this.redis.subscribe(forwarder, serializer.serialize(channel));
}
@Override protected ClosureExSet createClosureExSet(final Object event) { ClosureExSet set = super.createClosureExSet(event); new Thread(new Runnable() { @Override public void run() { // 这里会阻塞 // TODO 是否需要把 BinaryJedisPubSub 缓存?做unsubscribe操作 jedisCluster.subscribe(new BinaryJedisPubSub() { @Override public void onMessage(byte[] channel, byte[] message) { Object evt = deserialize(channel); Object[] args = deserialize(message); // 这里一定要call父类的方法 RedisEventBus.super.doFireEvent(evt, args); } }, serialize(event)); } }, "RedisSubscriber" + threadIndex.incrementAndGet() + "-thread-").start(); return set; }
/**
 * Subscribes {@code jedisPubSub} to the given channels, routing the call to the
 * cluster client when {@code cluster} is set and to the single-node operator
 * otherwise. Channel names are converted with
 * {@code convertObjectArrayToByteArray_serializeKey} before being passed on.
 *
 * @param jedisPubSub listener that receives the published messages
 * @param channels    channel names to subscribe to
 */
public void subscribe(final BinaryJedisPubSub jedisPubSub, final String... channels) {
    if (!cluster) {
        jedisOperator.subscribe(jedisPubSub, convertObjectArrayToByteArray_serializeKey(channels));
        return;
    }
    jedisCluster.subscribe(jedisPubSub, convertObjectArrayToByteArray_serializeKey(channels));
}
/**
 * Subscribes {@code jedisPubSub} to the given channels on a {@code Jedis}
 * instance supplied by {@code execute}.
 *
 * @param jedisPubSub listener that receives the published messages
 * @param channels    channel names as raw bytes
 */
public void subscribe(final BinaryJedisPubSub jedisPubSub, final byte[]... channels) {
    final JedisExecutor<Void> subscribeAction = new JedisExecutor<Void>() {
        @Override
        Void doInJedis(Jedis jedis) {
            jedis.subscribe(jedisPubSub, channels);
            return null;
        }
    };
    execute(subscribeAction);
}
/**
 * Initialises the Redis-backed message queue.
 *
 * <p>Steps, in order: initialise the channel set; pick the Redis client (the
 * MQ-specific configuration when {@code isConfigOk()} is true, otherwise the
 * application-wide client); fail if no client is available; subscribe to all
 * channels, forwarding each message to {@code notifyListeners}; finally start
 * the dequeue thread, which runs this instance.
 *
 * @throws JbootIllegalConfigException if no Redis client could be obtained
 */
public JbootRedismqImpl() {
    initChannels();

    JbootmqRedisConfig mqRedisConfig = Jboot.config(JbootmqRedisConfig.class);
    redis = mqRedisConfig.isConfigOk()
            ? JbootRedisManager.me().getRedis(mqRedisConfig)
            : Jboot.me().getRedis();

    if (redis == null) {
        throw new JbootIllegalConfigException("can not use redis mq (redis mq is default), "
                + "please config jboot.redis.host=yourhost and check your jboot.properties, "
                + "or use other mq component. ");
    }

    Object[] subscribedChannels = this.channels.toArray();
    BinaryJedisPubSub dispatcher = new BinaryJedisPubSub() {
        @Override
        public void onMessage(byte[] rawChannel, byte[] rawMessage) {
            notifyListeners(redis.bytesToKey(rawChannel), Jboot.me().getSerializer().deserialize(rawMessage));
        }
    };
    redis.subscribe(dispatcher, redis.keysToBytesArray(subscribedChannels));

    dequeueThread = new Thread(this);
    dequeueThread.start();
}
/**
 * Traced variant of {@code subscribe}: opens a span named {@code "subscribe"},
 * tags it with the channels, delegates to the superclass, and finishes the
 * span when the call returns or throws.
 *
 * @param jedisPubSub listener that receives the published messages
 * @param channels    channel names as raw bytes
 */
@Override
public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
    Span span = helper.buildSpan("subscribe");
    // Arrays.toString on a byte[][] only prints each inner array's identity
    // string (e.g. "[B@1b6d3586"); deepToString renders the actual byte values.
    span.setTag("channels", Arrays.deepToString(channels));
    try {
        super.subscribe(jedisPubSub, channels);
    } catch (Exception e) {
        // Record the failure on the span, then let the caller see the original exception.
        onError(e, span);
        throw e;
    } finally {
        span.finish();
    }
}
/**
 * Traced variant of {@code psubscribe}: opens a span named
 * {@code "psubscribe"}, tags it with the patterns, delegates to the
 * superclass, and finishes the span when the call returns or throws.
 *
 * @param jedisPubSub listener that receives the published messages
 * @param patterns    channel patterns as raw bytes
 */
@Override
public void psubscribe(BinaryJedisPubSub jedisPubSub, byte[]... patterns) {
    Span span = helper.buildSpan("psubscribe");
    // Arrays.toString on a byte[][] only prints each inner array's identity
    // string (e.g. "[B@1b6d3586"); deepToString renders the actual byte values.
    span.setTag("patterns", Arrays.deepToString(patterns));
    try {
        super.psubscribe(jedisPubSub, patterns);
    } catch (Exception e) {
        // Record the failure on the span, then let the caller see the original exception.
        onError(e, span);
        throw e;
    } finally {
        span.finish();
    }
}
/**
 * Verifies that {@code psubscribe} on the facade links a
 * {@code BinaryJedisPubSub} to the facade listener and passes that same
 * instance, together with the encoded pattern, to {@code jedis.psubscribe}.
 */
@Test
public void testPsubscribe() {
    // Arrange
    final String pattern = "test";
    final BinaryJedisPubSub linkedListener = mock(BinaryJedisPubSub.class);
    final RedisFacade.RedisPubSub facadeListener = mock(RedisFacade.RedisPubSub.class);
    when(facadeListener.getLinked()).thenReturn(linkedListener);
    final ArgumentCaptor<BinaryJedisPubSub> linkCaptor = ArgumentCaptor.forClass(BinaryJedisPubSub.class);

    // Act
    rf.psubscribe(facadeListener, pattern);

    // Assert
    verify(facadeListener).link(linkCaptor.capture());
    final BinaryJedisPubSub linked = linkCaptor.getValue();
    verify(jedis).psubscribe(linked, SafeEncoder.encode(pattern));
}
/**
 * Verifies that {@code psubscribe} on the facade links a
 * {@code BinaryJedisPubSub} to the facade listener and passes that same
 * instance, together with the encoded pattern, to
 * {@code jedisCluster.psubscribe}.
 */
@Test
public void testPsubscribe() {
    // Arrange
    final String pattern = "test";
    final BinaryJedisPubSub linkedListener = mock(BinaryJedisPubSub.class);
    final RedisFacade.RedisPubSub facadeListener = mock(RedisFacade.RedisPubSub.class);
    when(facadeListener.getLinked()).thenReturn(linkedListener);
    final ArgumentCaptor<BinaryJedisPubSub> linkCaptor = ArgumentCaptor.forClass(BinaryJedisPubSub.class);

    // Act
    rf.psubscribe(facadeListener, pattern);

    // Assert
    verify(facadeListener).link(linkCaptor.capture());
    final BinaryJedisPubSub linked = linkCaptor.getValue();
    verify(jedisCluster).psubscribe(linked, SafeEncoder.encode(pattern));
}
/**
 * Borrows a {@code Jedis} connection from the pool, subscribes
 * {@code jedisPubSub} to the given channels on it, and hands the connection
 * back to the pool when the call returns or throws.
 *
 * <p>NOTE(review): the connection goes back through {@code returnResource}
 * even when {@code subscribe} throws; confirm that is the intended handling
 * for a failed connection.
 *
 * @param jedisPubSub listener that receives the published messages
 * @param channels    channel names as raw bytes
 */
@Override
public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
    final redis.clients.jedis.Jedis connection = pool.getResource();
    try {
        connection.subscribe(jedisPubSub, channels);
    } finally {
        pool.returnResource(connection);
    }
}
/**
 * Borrows a {@code Jedis} connection from the pool, pattern-subscribes
 * {@code jedisPubSub} to the given patterns on it, and hands the connection
 * back to the pool when the call returns or throws.
 *
 * <p>NOTE(review): the connection goes back through {@code returnResource}
 * even when {@code psubscribe} throws; confirm that is the intended handling
 * for a failed connection.
 *
 * @param jedisPubSub listener that receives the published messages
 * @param patterns    channel patterns as raw bytes
 */
@Override
public void psubscribe(BinaryJedisPubSub jedisPubSub, byte[]... patterns) {
    final redis.clients.jedis.Jedis connection = pool.getResource();
    try {
        connection.psubscribe(jedisPubSub, patterns);
    } finally {
        pool.returnResource(connection);
    }
}
/**
 * Removes the pattern subscription {@code pattern} from the
 * {@code BinaryJedisPubSub} linked to {@code listener}.
 *
 * @param listener facade listener whose linked object is cast to
 *                 {@code BinaryJedisPubSub}
 * @param pattern  pattern to unsubscribe from, as raw bytes
 */
@Override
public void punsubscribe(final RedisPubSub listener, byte[] pattern) {
    final BinaryJedisPubSub linked = (BinaryJedisPubSub) listener.getLinked();
    linked.punsubscribe(pattern);
}
/**
 * Empty implementation: returns immediately without using either argument.
 *
 * <p>NOTE(review): no subscription is made here; confirm that this no-op is
 * intentional for this implementation.
 *
 * @param jedisPubSub ignored
 * @param patterns    ignored
 */
@Override public void psubscribe(BinaryJedisPubSub jedisPubSub, byte[]... patterns) { }
/**
 * Builds a pub/sub listener whose on-message callback is this object's
 * {@code onMessage} method.
 *
 * @return a new {@code JedisHelper.BinaryCallBackPubSub} with the callback
 *         registered
 */
private BinaryJedisPubSub createResultListener() {
    final JedisHelper.BinaryCallBackPubSub resultPubSub = new JedisHelper.BinaryCallBackPubSub();
    resultPubSub.addOnMessageListener(this::onMessage);
    return resultPubSub;
}
/**
 * Builds a pub/sub listener whose on-message callback is this object's
 * {@code onCommandMessage} method, and stores it in {@code listener} before
 * the callback is registered.
 *
 * @return the newly created {@code JedisHelper.BinaryCallBackPubSub}
 */
private BinaryJedisPubSub createCommandListener() {
    final JedisHelper.BinaryCallBackPubSub commandPubSub = new JedisHelper.BinaryCallBackPubSub();
    listener = commandPubSub;
    commandPubSub.addOnMessageListener(this::onCommandMessage);
    return commandPubSub;
}
/**
 * Not implemented: always throws.
 *
 * @param jedisPubSub unused
 * @param channels    unused
 * @throws FakeJedisNotImplementedException on every call
 */
@Override public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) { throw new FakeJedisNotImplementedException(); }
/**
 * Not implemented: always throws.
 *
 * @param jedisPubSub unused
 * @param patterns    unused
 * @throws FakeJedisNotImplementedException on every call
 */
@Override public void psubscribe(BinaryJedisPubSub jedisPubSub, byte[]... patterns) { throw new FakeJedisNotImplementedException(); }
/**
 * Delegates unchanged to the superclass {@code subscribe}.
 *
 * @param jedisPubSub listener passed through to the superclass
 * @param channels    channel names as raw bytes, passed through to the superclass
 */
@Override public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) { super.subscribe(jedisPubSub, channels); }
/**
 * Delegates unchanged to the superclass {@code psubscribe}.
 *
 * @param jedisPubSub listener passed through to the superclass
 * @param patterns    channel patterns as raw bytes, passed through to the superclass
 */
@Override public void psubscribe(BinaryJedisPubSub jedisPubSub, byte[]... patterns) { super.psubscribe(jedisPubSub, patterns); }
/**
 * Subscribe.
 *
 * @param binaryListener binary pub/sub listener to register
 * @param channels       one or more channels, each given as raw bytes
 */
public void subscribe(BinaryJedisPubSub binaryListener, final byte[]... channels);
/**
 * Abstract subscribe operation; the behaviour is supplied by subclasses.
 * The signature corresponds to the Jedis method referenced below.
 *
 * @param jedisPubSub binary pub/sub listener
 * @param channels    one or more channels, each given as raw bytes
 * @see redis.clients.jedis.BinaryJedis#subscribe(redis.clients.jedis.BinaryJedisPubSub,
 *      byte[][])
 */
public abstract void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels);
/**
 * Abstract pattern-subscribe operation; the behaviour is supplied by
 * subclasses. The signature corresponds to the Jedis method referenced below.
 *
 * @param jedisPubSub binary pub/sub listener
 * @param patterns    one or more channel patterns, each given as raw bytes
 * @see redis.clients.jedis.BinaryJedis#psubscribe(redis.clients.jedis.BinaryJedisPubSub,
 *      byte[][])
 */
public abstract void psubscribe(BinaryJedisPubSub jedisPubSub, byte[]... patterns);
/**
 * Subscribe operation taking a binary pub/sub listener and one or more
 * channels as raw bytes.
 *
 * <p>NOTE(review): presumably mirrors the Jedis binary {@code subscribe} call;
 * confirm against the implementations.
 *
 * @param jedisPubSub binary pub/sub listener
 * @param channels    one or more channels, each given as raw bytes
 */
void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels);
/**
 * Pattern-subscribe operation taking a binary pub/sub listener and one or more
 * channel patterns as raw bytes.
 *
 * <p>NOTE(review): presumably mirrors the Jedis binary {@code psubscribe}
 * call; confirm against the implementations.
 *
 * @param jedisPubSub binary pub/sub listener
 * @param patterns    one or more channel patterns, each given as raw bytes
 */
void psubscribe(BinaryJedisPubSub jedisPubSub, byte[]... patterns);