Java 类redis.clients.jedis.JedisPubSub 实例源码

项目:redis_util    文件:MessageConsumerRedisImpl.java   
/**
 * Subscribes to the given channels on a connection obtained from {@code redisConnection}.
 *
 * <p>Nothing is done when {@code channels} is {@code null} or empty. Every received message
 * is printed to standard output and then handed to {@code handleMessage}. Exceptions are
 * printed with {@code printStackTrace()}, and a connection that was obtained is closed
 * afterwards.
 *
 * @param redisConnection Redis connection provider
 * @param channels channels to subscribe to
 */
public MessageConsumerRedisImpl(RedisConnection redisConnection, String[] channels) {
    // No channels requested: there is nothing to subscribe to.
    if (channels == null || channels.length == 0) {
        return;
    }

    Jedis connection = null;
    try {
        connection = redisConnection.getJedis();
        JedisPubSub listener = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive " + message + " from " + channel);
                handleMessage(message);
            }
        };
        connection.subscribe(listener, channels);
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
}
项目:JRediClients    文件:PublishSubscribeCommandsTest.java   
/**
 * Checks that {@code pubsubChannels("test*")}, issued on a second connection, reports only
 * channels this test subscribed to.
 */
@Test
public void pubSubChannels() {
  final List<String> expectedActiveChannels = Arrays
      .asList("testchan1", "testchan2", "testchan3");
  jedis.subscribe(new JedisPubSub() {
    private int count = 0;

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
      count++;
      // Run the check only once all three subscriptions have been confirmed.
      if (count == 3) {
        Jedis otherJedis = createJedis();
        try {
          List<String> activeChannels = otherJedis.pubsubChannels("test*");
          assertTrue(expectedActiveChannels.containsAll(activeChannels));
        } finally {
          // Release the helper connection even when the assertion fails.
          otherJedis.disconnect();
        }
        unsubscribe();
      }
    }
  }, "testchan1", "testchan2", "testchan3");
}
项目:JRediClients    文件:PublishSubscribeCommandsTest.java   
/**
 * Checks that {@code pubsubNumPat()}, issued on a second connection, returns 2 after this
 * test has pattern-subscribed with {@code "test*"} (twice) and {@code "chan*"}.
 */
@Test
public void pubSubNumPat() {
  jedis.psubscribe(new JedisPubSub() {
    private int count = 0;

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
      count++;
      // Run the check only after the third subscription confirmation.
      if (count == 3) {
        Jedis otherJedis = createJedis();
        try {
          Long numPatterns = otherJedis.pubsubNumPat();
          assertEquals(Long.valueOf(2L), numPatterns);
        } finally {
          // Release the helper connection even when the assertion fails.
          otherJedis.disconnect();
        }
        punsubscribe();
      }
    }

  }, "test*", "test*", "chan*");
}
项目:JRediClients    文件:PublishSubscribeCommandsTest.java   
/**
 * Checks that {@code pubsubNumSub}, issued on a second connection, reports {@code "1"} for
 * each of the two channels this test subscribed to.
 */
@Test
public void pubSubNumSub() {
  final Map<String, String> expectedNumSub = new HashMap<String, String>();
  expectedNumSub.put("testchannel2", "1");
  expectedNumSub.put("testchannel1", "1");
  jedis.subscribe(new JedisPubSub() {
    private int count = 0;

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
      count++;
      // Run the check only once both subscriptions have been confirmed.
      if (count == 2) {
        Jedis otherJedis = createJedis();
        try {
          Map<String, String> numSub = otherJedis.pubsubNumSub("testchannel1", "testchannel2");
          assertEquals(expectedNumSub, numSub);
        } finally {
          // Release the helper connection even when the assertion fails.
          otherJedis.disconnect();
        }
        unsubscribe();
      }
    }
  }, "testchannel1", "testchannel2");
}
项目:bitstd    文件:RedisCacheWithCluster.java   
/**
 * Subscribes {@code jedisPubSub} to {@code channel} on every node pool of the cluster, one
 * node after another.
 *
 * <p>NOTE(review): {@code Jedis.subscribe} presumably blocks until the listener
 * unsubscribes, so the next node would only be reached once the previous subscription has
 * ended. Confirm that this is the intended behavior.
 *
 * @param jedisPubSub listener to register
 * @param channel channel to subscribe to
 * @throws RuntimeException wrapping any exception raised while borrowing a connection or
 *     subscribing
 */
public void subscribe(JedisPubSub jedisPubSub, String channel) {
    Collection<JedisPool> poolCollection = jedisCluster.getClusterNodes().values();

    for (JedisPool jedisPool : poolCollection) {
        Jedis jedis = null;

        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(jedisPubSub, channel);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // getResource() may have failed, in which case there is nothing to hand back.
            if (jedis != null) {
                jedisPool.returnResourceObject(jedis);
            }
        }
    }

}
项目:ninja-vertx-standalone    文件:PubSub.java   
/**
 * Starts a background thread named {@code vertx-jedis-pubsub} that pattern-subscribes to
 * {@code "*"} and republishes every received message on the Vert.x event bus, using the
 * Redis channel name as the event-bus address.
 *
 * <p>The subscription is re-established in a loop to support reconnecting after a dropped
 * connection. After a failure the thread waits one second before retrying. Each connection
 * is closed before the next attempt, and the thread stops if it is interrupted while
 * waiting.
 */
public void startSubscribe() {
    new Thread(() -> {
        while (true) {
            Jedis jedisSub = null;
            try {
                jedisSub = this.createJedis();
                jedisSub.psubscribe(new JedisPubSub() {
                    @Override
                    public void onPMessage(String pattern, String channel, String message) {
                        vertx.eventBus().publish(channel, message);
                    }
                }, "*");
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt status and stop retrying instead of
                    // swallowing the request to terminate.
                    Thread.currentThread().interrupt();
                    return;
                }
            } finally {
                // Without this, every reconnect attempt would leave a connection behind.
                if (jedisSub != null) {
                    try {
                        jedisSub.close();
                    } catch (Exception closeFailure) {
                        closeFailure.printStackTrace();
                    }
                }
            }
        }
    }, "vertx-jedis-pubsub").start();
}
项目:miniature-queue    文件:JedisServerImplementation.java   
/**
 * Installs the {@code subscriptions} listener, which passes each incoming message (decoded
 * through {@code toStringCodex}) to every action registered for its channel in
 * {@code actionMap}. A warning is logged whenever an action returns {@code false}.
 */
private void createSubscriptionListener() {
    subscriptions = new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
            final Collection<Function<byte[], Boolean>> handlers = actionMap.get(channel);
            // No actions registered for this channel: nothing to dispatch.
            if (handlers == null) {
                return;
            }
            for (final Function<byte[], Boolean> handler : handlers) {
                final boolean acknowledged = handler.apply(toStringCodex.decode(message));
                if (!acknowledged) {
                    log.warning("attempt to not ack " + channel + " however this is not supported");
                }
            }
        }
    };
}
项目:Liudao    文件:ConfigService.java   
/**
 * Starts a background thread that subscribes to {@code channel} through {@code redisDao}
 * and calls {@code updateCache()} whenever a message arrives on that channel, then calls
 * {@code updateCache()} once on the calling thread.
 */
@PostConstruct
public void init() {
    final JedisPubSub listener = new JedisPubSub() {
        @Override
        public void onMessage(String pchannel, String message) {
            logger.info("redis通知,channel={},message={}", pchannel, message);
            // Only notifications on the configured channel trigger a refresh.
            if (!channel.equals(pchannel)) {
                return;
            }
            updateCache();
        }
    };

    final Thread subscriber = new Thread() {
        @Override
        public void run() {
            redisDao.subscribe(listener, channel);
        }
    };
    subscriber.start();

    updateCache();
}
项目:Liudao    文件:BlackListService.java   
/**
 * Starts a background thread that subscribes to {@code channel} through {@code redisDao}
 * and calls {@code updateCache()} whenever a message arrives on that channel, then calls
 * {@code updateCache()} once on the calling thread.
 */
@PostConstruct
public void init() {
    final JedisPubSub listener = new JedisPubSub() {
        @Override
        public void onMessage(String pchannel, String message) {
            logger.info("redis通知,channel={},message={}", pchannel, message);
            // Only notifications on the configured channel trigger a refresh.
            if (!channel.equals(pchannel)) {
                return;
            }
            updateCache();
        }
    };

    final Thread subscriber = new Thread() {
        @Override
        public void run() {
            redisDao.subscribe(listener, channel);
        }
    };
    subscriber.start();

    updateCache();
}
项目:SimpleAPI    文件:RedisSubscriber.java   
/**
 * Creates a listener that forwards every message to {@code RedisSubscriber.this.onMessage},
 * subscribes it to {@code channel} on a background thread named {@code subscriberThread},
 * and registers an on-disable hook that unsubscribes it.
 *
 * @return the listener that was subscribed
 */
private JedisPubSub setupPubSub() {
    final JedisPubSub jedisPubSub = new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
            RedisSubscriber.this.onMessage(channel, message);
        }
    };
    new Thread(new Runnable() {
        @Override
        public void run() {
            // try-with-resources closes the borrowed connection even when subscribe() fails;
            // previously it was never handed back.
            try (Jedis jedis = SimpleAPI.getInstance().getManager(RedisManager.class).getRedisPool().getResource()) {
                jedis.subscribe(jedisPubSub, channel);
                jedis.quit();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }, "subscriberThread").start();
    SimpleAPI.getInstance().addOnDisable(() -> jedisPubSub.unsubscribe());
    return jedisPubSub;
}
项目:x-pipe    文件:StandaloneStatMonitor.java   
/**
 * Pattern-subscribes {@code slave} to the pattern built by {@code generateURL} from the
 * master's address and records {@code TRUE} for {@code slaveRedis} in
 * {@code redisStatCheckResult} on every received message.
 *
 * <p>An uncaught-exception handler is installed on the current thread only. It logs the
 * error, records {@code FALSE} for {@code slaveRedis} and closes {@code slave}. The handler
 * was previously installed through the static {@code setDefaultUncaughtExceptionHandler},
 * which replaces the handler for every thread in the JVM.
 */
@Override
public void run() {
    Thread.currentThread().setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread arg0, Throwable arg1) {
            logger.error("[error]{}:{}",slaveRedis.getIp(), slaveRedis.getPort(), arg1);
            Cat.logError(arg1);
            redisStatCheckResult.put(slaveRedis, Boolean.FALSE);
            if (null != slave) {
                slave.close();
            }
        }
    });

    logger.debug("[Psubscribe]{}:{}", slaveRedis.getIp(), slaveRedis.getPort());
    slave.psubscribe(new JedisPubSub() {
        @Override
        public void onPMessage(String pattern, String channel, String msg) {
            logger.debug("[OnPMessage]{}:{}", slaveRedis.getIp(), slaveRedis.getPort());
            redisStatCheckResult.put(slaveRedis, Boolean.TRUE);
        }
    }, generateURL(masterRedis.getIp(), masterRedis.getPort()));
}
项目:smart-cache    文件:JedisTemplate.java   
/**
 * Subscribes the listener to the given channels, delegating to {@code jedisCluster} when
 * {@code cluster} is set and to {@code jedisOperator} otherwise.
 *
 * @param jedisPubSub listener to register
 * @param channels channels to subscribe to
 */
public void subscribe(final JedisPubSub jedisPubSub, final String... channels) {
    if (!cluster) {
        jedisOperator.subscribe(jedisPubSub, channels);
        return;
    }
    jedisCluster.subscribe(jedisPubSub, channels);
}
项目:smart-cache    文件:JedisOperator.java   
/**
 * Subscribes the listener to the given channels on the {@code Jedis} instance supplied by
 * {@code execute}.
 *
 * @param jedisPubSub listener to register
 * @param channels channels to subscribe to
 */
public void subscribe(final JedisPubSub jedisPubSub, final String... channels) {
    final JedisExecutor<Void> subscribeTask = new JedisExecutor<Void>() {
        @Override
        Void doInJedis(Jedis connection) {
            connection.subscribe(jedisPubSub, channels);
            return null;
        }
    };
    execute(subscribeTask);
}
项目:java-redis-client    文件:TracingJedisCluster.java   
/**
 * Delegates to the superclass {@code subscribe} inside a span named {@code "subscribe"}
 * tagged with the channel list. Failures are reported through {@code onError} and rethrown;
 * the span is finished in every case.
 */
@Override
public void subscribe(JedisPubSub jedisPubSub, String... channels) {
  final Span subscribeSpan = helper.buildSpan("subscribe");
  final String channelList = Arrays.toString(channels);
  subscribeSpan.setTag("channels", channelList);
  try {
    super.subscribe(jedisPubSub, channels);
  } catch (Exception failure) {
    onError(failure, subscribeSpan);
    throw failure;
  } finally {
    subscribeSpan.finish();
  }
}
项目:java-redis-client    文件:TracingJedisCluster.java   
/**
 * Delegates to the superclass {@code psubscribe} inside a span named {@code "psubscribe"}
 * tagged with the pattern list. Failures are reported through {@code onError} and rethrown;
 * the span is finished in every case.
 */
@Override
public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
  final Span psubscribeSpan = helper.buildSpan("psubscribe");
  final String patternList = Arrays.toString(patterns);
  psubscribeSpan.setTag("patterns", patternList);
  try {
    super.psubscribe(jedisPubSub, patterns);
  } catch (Exception failure) {
    onError(failure, psubscribeSpan);
    throw failure;
  } finally {
    psubscribeSpan.finish();
  }
}
项目:java-redis-client    文件:TracingJedis.java   
/**
 * Delegates to the superclass {@code subscribe} inside a span named {@code "subscribe"}
 * tagged with the channel list. Failures are reported through {@code onError} and rethrown;
 * the span is finished in every case.
 */
@Override
public void subscribe(JedisPubSub jedisPubSub, String... channels) {
  final Span subscribeSpan = helper.buildSpan("subscribe");
  final String channelList = Arrays.toString(channels);
  subscribeSpan.setTag("channels", channelList);
  try {
    super.subscribe(jedisPubSub, channels);
  } catch (Exception failure) {
    onError(failure, subscribeSpan);
    throw failure;
  } finally {
    subscribeSpan.finish();
  }
}
项目:java-redis-client    文件:TracingJedis.java   
/**
 * Delegates to the superclass {@code psubscribe} inside a span named {@code "psubscribe"}
 * tagged with the pattern list. Failures are reported through {@code onError} and rethrown;
 * the span is finished in every case.
 */
@Override
public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
  final Span psubscribeSpan = helper.buildSpan("psubscribe");
  final String patternList = Arrays.toString(patterns);
  psubscribeSpan.setTag("patterns", patternList);
  try {
    super.psubscribe(jedisPubSub, patterns);
  } catch (Exception failure) {
    onError(failure, psubscribeSpan);
    throw failure;
  } finally {
    psubscribeSpan.finish();
  }
}
项目:bitstd    文件:RedisCacheWithoutCluster.java   
/**
 * Subscribes {@code jedisPubSub} to {@code channel} on a connection borrowed from
 * {@code pool} and hands the connection back afterwards.
 *
 * @param jedisPubSub listener to register
 * @param channel channel to subscribe to
 * @throws RuntimeException wrapping any exception raised while borrowing a connection or
 *     subscribing
 */
public void subscribe(JedisPubSub jedisPubSub, String channel) {
    Jedis jedis = null;

    try {
        jedis = pool.getResource();
        jedis.subscribe(jedisPubSub, channel);
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        // getResource() may have failed, in which case there is nothing to hand back.
        if (jedis != null) {
            pool.returnResourceObject(jedis);
        }
    }

}
项目:Liudao    文件:RedisDao.java   
/**
 * Subscribes the listener to one or more channels.
 *
 * <p>The subscription is issued on the native connection, cast to {@code Jedis}, inside a
 * {@code redisTemplate.execute} callback.
 *
 * @param jedisPubSub listener to register
 * @param channels channels to subscribe to
 */
public void subscribe(final JedisPubSub jedisPubSub, final String... channels) {
    final RedisCallback<Object> subscribeCallback = new RedisCallback<Object>() {
        @Override
        public Object doInRedis(RedisConnection connection) throws DataAccessException {
            final Jedis nativeJedis = (Jedis) connection.getNativeConnection();
            nativeJedis.subscribe(jedisPubSub, channels);
            return null;
        }
    };
    redisTemplate.execute(subscribeCallback);
}
项目:codePay    文件:JedisUtil.java   
/**
 * Subscribes {@code subscriber} to {@code channel} on a connection obtained from
 * {@code getJedis()}.
 *
 * <p>The connection is closed when the subscription ends or fails. With
 * try-with-resources, a failure while closing is attached to the primary exception as a
 * suppressed exception instead of replacing it.
 *
 * @param subscriber listener to register
 * @param channel channel to subscribe to
 */
public void subscribe(JedisPubSub subscriber, String channel) {
    // Obtain a jedis instance; it is closed automatically when the block exits.
    try (Jedis jedis = getJedis()) {
        jedis.subscribe(subscriber, channel);
    }
}
项目:leopard    文件:RedisImpl.java   
/**
 * Pattern-subscribes the listener on the {@code Jedis} instance supplied by
 * {@code execute}.
 */
@Override
public void psubscribe(final JedisPubSub jedisPubSub, final String... patterns) {
    final Invoker patternSubscription = new Invoker() {
        @Override
        public Object execute(Jedis connection) {
            connection.psubscribe(jedisPubSub, patterns);
            return null;
        }
    };
    this.execute(patternSubscription);
}
项目:leopard    文件:RedisImpl.java   
/**
 * Subscribes the listener to the given channels on the {@code Jedis} instance supplied by
 * {@code execute}.
 */
@Override
public void subscribe(final JedisPubSub jedisPubSub, final String... channels) {
    final Invoker channelSubscription = new Invoker() {
        @Override
        public Object execute(Jedis connection) {
            connection.subscribe(jedisPubSub, channels);
            return null;
        }
    };
    this.execute(channelSubscription);
}
项目:leopard    文件:RedisMemoryImpl.java   
/**
 * Delivers {@code message} to the listener registered for {@code channel} in
 * {@code channelMap}, if there is one.
 *
 * @param channel channel to publish on
 * @param message message payload
 * @return the number of listeners that received the message: {@code 1} when a listener is
 *     registered for the channel, {@code 0} otherwise
 */
@Override
public Long publish(String channel, String message) {
    JedisPubSub jedisPubSub = this.channelMap.get(channel);
    if (jedisPubSub == null) {
        // Nobody received the message, so the receiver count is zero.
        return 0L;
    }
    jedisPubSub.onMessage(channel, message);
    return 1L;
}
项目:leopard    文件:RedisMemoryImplTest.java   
/**
 * Checks that a message published on a channel is delivered to the listener subscribed to
 * that channel.
 */
@Test
public void publish() {
    final String channel = "channel";
    final String message = "message";
    final JedisPubSub listener = Mockito.mock(JedisPubSub.class);

    redis.subscribe(listener, channel);
    redis.publish(channel, message);

    Mockito.verify(listener).onMessage(channel, message);
}
项目:jahhan    文件:JedisSentinelPoolExt.java   
/**
 * Listens on the Sentinel channels {@code "-sdown"} and {@code "+slave"} for as long as
 * {@code running} is set, reconnecting after connection failures.
 *
 * <p>For a message with more than seven space-separated fields, the host (field 2) and
 * port (field 3) are passed to {@code insertReadPool}. Other messages are logged as
 * invalid. The connection is closed at the end of every loop iteration so that reconnects
 * do not leave connections behind.
 */
public void run() {
    running.set(true);

    while (running.get()) {

        j = new Jedis(host, port);

        try {
            j.subscribe(new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    log.info("Sentinel " + host + ":" + port + " published: " + channel + " " + message + ".");

                    String[] switchMasterMsg = message.split(" ");

                    if (switchMasterMsg.length > 7) {
                        String slaveHost = switchMasterMsg[2];
                        int slavePort = Integer.parseInt(switchMasterMsg[3]);
                        HostAndPort hap = new HostAndPort(slaveHost, slavePort);
                        insertReadPool(hap);
                    } else {
                        log.warn("Invalid message received on Sentinel " + host + ":" + port + " on channel "
                                + channel + ": " + message);
                    }
                }
            }, "-sdown", "+slave");
        } catch (JedisConnectionException e) {
            runningSleep(running.get(), host, port, subscribeRetryWaitTimeMillis);
        } finally {
            // Release this iteration's connection before a new one is created.
            j.close();
        }
    }
}
项目:jahhan    文件:JedisSentinelPoolExt.java   
/**
 * Listens on the Sentinel channel {@code "+sdown"} for as long as {@code running} is set,
 * reconnecting after connection failures.
 *
 * <p>For a message with more than seven space-separated fields, the host (field 2) and
 * port (field 3) are passed to {@code removeFromReadPool}. Four-field messages are ignored
 * silently, and any other length is logged as invalid. The connection is closed at the end
 * of every loop iteration so that reconnects do not leave connections behind.
 */
public void run() {

    running.set(true);

    while (running.get()) {

        j = new Jedis(host, port);

        try {
            j.subscribe(new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    log.info("Sentinel " + host + ":" + port + " published: " + channel + " " + message + ".");

                    String[] switchMasterMsg = message.split(" ");

                    if (switchMasterMsg.length > 7) {
                        String slaveHost = switchMasterMsg[2];
                        int slavePort = Integer.parseInt(switchMasterMsg[3]);
                        HostAndPort hap = new HostAndPort(slaveHost, slavePort);
                        removeFromReadPool(hap);
                    } else if (switchMasterMsg.length != 4) {
                        // A +sdown message about a master has exactly four fields; it is
                        // not an error, so only other lengths are reported.
                        log.error("Invalid message received on Sentinel " + host + ":" + port
                                + " on channel +sdown: " + message);
                    }
                }
            }, "+sdown");
        } catch (JedisConnectionException e) {
            runningSleep(running.get(), host, port, subscribeRetryWaitTimeMillis);
        } finally {
            // Release this iteration's connection before a new one is created.
            j.close();
        }
    }
}
项目:confitura2015    文件:RedisTopicTest.java   
/**
 * Builds a listener that appends every received message to {@code consumedMessages}
 * before delegating to the default {@code onMessage} handling.
 *
 * @return the new listener
 */
private JedisPubSub jedisConsume() {
    final JedisPubSub recordingListener = new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
            consumedMessages.add(message);
            super.onMessage(channel, message);
        }
    };
    return recordingListener;
}
项目:leopard-data    文件:RedisImpl.java   
/**
 * Pattern-subscribes the listener on the {@code Jedis} instance supplied by
 * {@code execute}.
 */
@Override
public void psubscribe(final JedisPubSub jedisPubSub, final String... patterns) {
    final Invoker patternSubscription = new Invoker() {
        @Override
        public Object execute(Jedis connection) {
            connection.psubscribe(jedisPubSub, patterns);
            return null;
        }
    };
    this.execute(patternSubscription);
}
项目:leopard-data    文件:RedisImpl.java   
/**
 * Subscribes the listener to the given channels on the {@code Jedis} instance supplied by
 * {@code execute}.
 */
@Override
public void subscribe(final JedisPubSub jedisPubSub, final String... channels) {
    final Invoker channelSubscription = new Invoker() {
        @Override
        public Object execute(Jedis connection) {
            connection.subscribe(jedisPubSub, channels);
            return null;
        }
    };
    this.execute(channelSubscription);
}
项目:leopard-data    文件:RedisMemoryImpl.java   
/**
 * Delivers {@code message} to the listener registered for {@code channel} in
 * {@code channelMap}, if there is one.
 *
 * @param channel channel to publish on
 * @param message message payload
 * @return the number of listeners that received the message: {@code 1} when a listener is
 *     registered for the channel, {@code 0} otherwise
 */
@Override
public Long publish(String channel, String message) {
    JedisPubSub jedisPubSub = this.channelMap.get(channel);
    if (jedisPubSub == null) {
        // Nobody received the message, so the receiver count is zero.
        return 0L;
    }
    jedisPubSub.onMessage(channel, message);
    return 1L;
}
项目:leopard-data    文件:RedisMemoryImplTest.java   
/**
 * Checks that a message published on a channel is delivered to the listener subscribed to
 * that channel.
 */
@Test
public void publish() {
    final String channel = "channel";
    final String message = "message";
    final JedisPubSub listener = Mockito.mock(JedisPubSub.class);

    redis.subscribe(listener, channel);
    redis.publish(channel, message);

    Mockito.verify(listener).onMessage(channel, message);
}
项目:gameserver    文件:JedisAdapter.java   
/**
 * Subscribes the listener to the given channels on a connection borrowed from
 * {@code pool}; the connection is returned to the pool when the call ends, whether it
 * completes normally or throws.
 */
@Override
public void subscribe(JedisPubSub jedisPubSub, String... channels) {
    final redis.clients.jedis.Jedis borrowed = pool.getResource();
    try {
        borrowed.subscribe(jedisPubSub, channels);
    } finally {
        pool.returnResource(borrowed);
    }
}
项目:gameserver    文件:JedisAdapter.java   
/**
 * Pattern-subscribes the listener on a connection borrowed from {@code pool}; the
 * connection is returned to the pool when the call ends, whether it completes normally or
 * throws.
 */
@Override
public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
    final redis.clients.jedis.Jedis borrowed = pool.getResource();
    try {
        borrowed.psubscribe(jedisPubSub, patterns);
    } finally {
        pool.returnResource(borrowed);
    }
}
项目:unique-web    文件:RedisUtil.java   
/**
 * Subscribes the listener to a single channel.
 *
 * <p>The subscription is issued on the shard that {@code getShard(channel)} selects.
 *
 * @param jedisPubSub listener to register
 * @param channel channel to subscribe to
 */
public void subscribe(final JedisPubSub jedisPubSub, final String channel) {
    final Executor<Object> subscribeTask = new Executor<Object>(shardedJedisPool) {

        @Override
        Object execute() {
            final Jedis shard = jedis.getShard(channel);
            // Note: subscribe is a blocking operation, because the current thread has to
            // poll Redis for responses and then invoke the subscriber.
            shard.subscribe(jedisPubSub, channel);
            return null;
        }
    };
    subscribeTask.getResult();
}
项目:nighthawk    文件:JaRedisSentinelPool.java   
/**
 * Listens on the Sentinel channel {@code "+switch-master"} for as long as {@code running}
 * is set, reconnecting after connection failures.
 *
 * <p>When a message names this pool's {@code masterName} in field 0, the pool is
 * re-initialised with the host and port from fields 3 and 4. Messages with fewer than five
 * fields are logged as invalid. The connection is closed at the end of every loop
 * iteration.
 */
public void run() {
    running.set(true);
    while (running.get()) {
        j = new Jedis(host, port);
        try {
            // double check that it is not being shutdown
            if (!running.get()) {
                break;
            }
            j.subscribe(new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");

                    String[] switchMasterMsg = message.split(" ");

                    // Fields 3 and 4 are both read below, so at least five fields are
                    // required; a four-field message would otherwise throw
                    // ArrayIndexOutOfBoundsException.
                    if (switchMasterMsg.length > 4) {

                        if (masterName.equals(switchMasterMsg[0])) {
                            initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
                        } else {
                            log.fine("Ignoring message on +switch-master for master name "
                                    + switchMasterMsg[0] + ", our master name is " + masterName);
                        }

                    } else {
                        log.severe("Invalid message received on Sentinel " + host + ":" + port
                                + " on channel +switch-master: " + message);
                    }
                }
            }, "+switch-master");

        } catch (JedisConnectionException e) {
            if (running.get()) {
                // Report the configured wait time rather than a hard-coded value.
                log.log(Level.SEVERE, "Lost connection to Sentinel at " + host + ":" + port
                        + ". Sleeping " + subscribeRetryWaitTimeMillis + "ms and retrying.", e);
                try {
                    Thread.sleep(subscribeRetryWaitTimeMillis);
                } catch (InterruptedException e1) {
                    log.log(Level.SEVERE, "Sleep interrupted: ", e1);
                }
            } else {
                log.fine("Unsubscribing from Sentinel at " + host + ":" + port);
            }
        } finally {
            j.close();
        }
    }
}
项目:JRediClients    文件:PublishSubscribeCommandsTest.java   
/**
 * Expects {@code unsubscribe()} on a listener that was never subscribed to fail with
 * {@code JedisConnectionException}.
 */
@Test(expected = JedisConnectionException.class)
public void unsubscribeWhenNotSusbscribed() throws InterruptedException {
  final JedisPubSub neverSubscribed = new JedisPubSub() {
  };
  neverSubscribed.unsubscribe();
}
项目:JRediClients    文件:PublishSubscribeCommandsTest.java   
/**
 * Expects a deliberately slow subscriber to fail with {@code JedisConnectionException}
 * while a second connection publishes large messages to the same channel.
 *
 * <p>A publisher thread sends up to ten large strings on channel {@code "foo"}; the
 * subscriber sleeps in {@code onMessage} for each message it receives. The publisher is
 * told to stop and is joined in the {@code finally} block.
 */
@Test(expected = JedisConnectionException.class)
public void handleClientOutputBufferLimitForSubscribeTooSlow() throws InterruptedException {
  final Jedis j = createJedis();
  // Set in the finally block below to make the publisher loop stop early.
  final AtomicBoolean exit = new AtomicBoolean(false);

  final Thread t = new Thread(new Runnable() {
    public void run() {
      try {

        // The jedis1 server config is already set to
        // client-output-buffer-limit pubsub 256k 128k 5
        // which means that if the subscriber falls behind by more than 256k, or by
        // more than 128k continuously for 5 sec,
        // redis disconnects the subscriber

        // we publish over 100M of data to create a situation that exceeds
        // client-output-buffer-limit
        String veryLargeString = makeLargeString(10485760);

        // 10M * 10 = 100M
        for (int i = 0; i < 10 && !exit.get(); i++) {
          j.publish("foo", veryLargeString);
        }

        j.disconnect();
      } catch (Exception ex) {
        // Publisher failures are ignored; the test outcome is decided by the subscriber.
      }
    }
  });
  t.start();
  try {
    jedis.subscribe(new JedisPubSub() {
      public void onMessage(String channel, String message) {
        try {
          // wait 100 ms per message to slow down the subscriber so that
          // client-output-buffer is exceeded
          // System.out.println("channel - " + channel +
          // " / message - " + message);
          Thread.sleep(100);
        } catch (Exception e) {
          // The sleep failed: wait for the publisher to finish, then fail the test.
          try {
            t.join();
          } catch (InterruptedException e1) {
            // Ignored; the test is failed just below anyway.
          }

          fail(e.getMessage());
        }
      }
    }, "foo");
  } finally {
    // exit the publisher thread. if exception is thrown, thread might
    // still keep publishing things.
    exit.set(true);
    if (t.isAlive()) {
      t.join();
    }
  }
}
项目:bitstd    文件:RedisEmptyCache.java   
/**
 * No-op: this implementation ignores both the listener and the channel.
 *
 * @param jedisPubSub listener (unused)
 * @param channel channel (unused)
 */
public void subscribe(JedisPubSub jedisPubSub, String channel) {
}