Java 类com.rabbitmq.client.DefaultConsumer 实例源码

项目:zipkin-reporter-java    文件:RabbitMQSenderTest.java   
private byte[] readMessage() throws Exception {
  final CountDownLatch countDown = new CountDownLatch(1);
  final AtomicReference<byte[]> result = new AtomicReference<>();

  Channel channel = sender.get().createChannel();
  try {
    channel.basicConsume(sender.queue(), true, new DefaultConsumer(channel) {
      @Override public void handleDelivery(String consumerTag, Envelope envelope,
          AMQP.BasicProperties properties, byte[] body) throws IOException {
        result.set(body);
        countDown.countDown();
      }
    });
    countDown.await(5, TimeUnit.SECONDS);
  } finally {
    channel.close();
  }
  return result.get();
}
项目:june.mq    文件:ReceiveLogsDirect2.java   
/**
 * @param args
 * @throws IOException
 * @throws TimeoutException
 * @date 2017年7月13日 下午2:57:32
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 声明交换器
    channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct");
    // 获取匿名队列名称
    String queueName = channel.queueDeclare().getQueue();
    // 根据路由关键字进行多重绑定
    for (String severity : routingKeys2) {
        channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, severity);
        System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME_ROUTING + ", queue:" + queueName
                + ", BindRoutingKey:" + severity);
    }
    System.out.println("ReceiveLogsDirect2 Waiting for messages");

    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws UnsupportedEncodingException {
            String message = new String(body, "UTF-8");
            System.out.println("ReceiveLogsDirect2 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
        }
    };
    channel.basicConsume(queueName, true, consumer);

}
项目:june.mq    文件:ReceiveLogsDirect1.java   
/**
 * @param args
 * @throws TimeoutException
 * @throws IOException
 * @date 2017年7月13日 下午2:53:18
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 声明交换器
    channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct");
    // 获取匿名队列名称
    String queueName = channel.queueDeclare().getQueue();

    // 根据路由关键字进行绑定
    for (String routingKey : routingKeys1) {
        channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, routingKey);
        System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME_ROUTING + "," + " queue:" + queueName
                + ", BindRoutingKey:" + routingKey);
    }
    System.out.println("ReceiveLogsDirect1  Waiting for messages");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
        }
    };
    channel.basicConsume(queueName, true, consumer);

}
项目:june.mq    文件:Customer.java   
/**
 * @param args
 * @throws TimeoutException
 * @throws IOException
 * @date 2017年7月11日 下午5:32:45
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置RabbitMQ地址
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    // 创建一个新的连接
    Connection connection = factory.newConnection();
    // 创建一个通道
    Channel channel = connection.createChannel();
    // 声明要关注的队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println("Customer Waiting Received messages");
    // DefaultConsumer类实现了Consumer接口,通过传入一个频道,
    // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
    Consumer consumer = new DefaultConsumer(channel) {
        //envelope主要存放生产者相关信息(比如交换机、路由key等)
        //body是消息实体
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Customer Received '" + message + "'");
        }
    };
    // 自动回复队列应答 -- RabbitMQ中的消息确认机制
    channel.basicConsume(QUEUE_NAME, true, consumer);
}
项目:june.mq    文件:ReceiveLogs1.java   
/**
 * @param args
 * @throws TimeoutException
 * @throws IOException
 * @date 2017年7月13日 下午2:40:52
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    // 产生一个随机的队列名称
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");// 对队列进行绑定

    System.out.println("ReceiveLogs1 Waiting for messages");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("ReceiveLogs1 Received '" + message + "'");
        }
    };
    channel.basicConsume(queueName, true, consumer);// 队列会自动删除
}
项目:june.mq    文件:ReceiveLogsTopic2.java   
/**
     * @param args
     * @throws TimeoutException 
     * @throws IOException 
     * @date 2017年7月13日 下午3:08:40
     * @writer junehappylove
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setPort(port);
        factory.setVirtualHost(virtualHost);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
//      声明一个匹配模式的交换器
        channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic");
        String queueName = channel.queueDeclare().getQueue();
        // 路由关键字
        String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
//      绑定路由关键字
        for (String bindingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, bindingKey);
            System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME_TOPIC+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
        }

        System.out.println("ReceiveLogsTopic2 Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws UnsupportedEncodingException  {
                String message = new String(body, "UTF-8");
                System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
项目:june.mq    文件:ReceiveLogsTopic1.java   
/**
 * @param args
 * @throws IOException
 * @throws TimeoutException
 * @date 2017年7月13日 下午3:06:20
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明一个匹配模式的交换机
    channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic");
    String queueName = channel.queueDeclare().getQueue();
    // 路由关键字
    String[] routingKeys = new String[] { "*.orange.*" };
    // 绑定路由
    for (String routingKey : routingKeys) {
        channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, routingKey);
        System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME_TOPIC + ", queue:" + queueName
                + ", BindRoutingKey:" + routingKey);
    }
    System.out.println("ReceiveLogsTopic1 Waiting for messages");

    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
        }
    };
    channel.basicConsume(queueName, true, consumer);

}
项目:discord-bot-gateway    文件:GatewayClient.java   
/**
 * Configures the rabbitmq queues and the {@code shard-<id>-receive} consumer.
 *
 * Subclasses that want to change how messages are sent/received can override this method to disable the default implementation.
 *
 * @throws IOException if there's an error on {@link Channel#queueDeclare(String, boolean, boolean, boolean, Map)} or {@link Channel#basicConsume(String, boolean, Consumer)}.
 */
protected void configure() throws IOException {
    channel.queueDeclare("shard-" + shardId + "-send", true, false, false, null);
    channel.queueDeclare("shard-" + shardId + "-receive", true, false, false, null);
    channel.queueDeclare("shard-" + shardId + "-available", true, false, false, null);
    channel.queueDeclare("shard-" + shardId + "-unavailable", true, false, false, null);
    channel.basicConsume("shard-" + shardId + "-receive", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            ClientWebSocketClient c = client.get();
            if(c == null) return;
            JSONObject obj = new JSONObject(new String(body, StandardCharsets.UTF_8));
            if(obj.has("t") && obj.getString("t").equals("gateway-ping-update")) {
                c.getJDA().setPing(obj.getJSONObject("d").getLong("ping"));
                return;
            }
            if(enableRawGatewayEvent)
                c.getJDA().getEventManager().handle(new RawGatewayEvent(c.getJDA(), new JSONObject(obj.toString())));
            c.handleEvent(obj);
        }
    });
}
项目:mumu-rabbitmq    文件:RabbitMQQuickStart.java   
/**
 * 接受消息
 */
public void receiveQuickstartMessage() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    AMQP.Queue.DeclareOk declareOk = channel.getChannel().queueDeclare(QUICKSTART_QUEUE_NAME, false, false, false, null);
    System.out.println("等待接受队列【" + QUICKSTART_QUEUE_NAME + "】消息");
    //建立一个消费者 监听消息的接受
    Consumer consumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("接受消息:" + message);
        }
    };
    channel.getChannel().basicConsume(QUICKSTART_QUEUE_NAME, true, consumer);
    //channel.close();
}
项目:mumu-rabbitmq    文件:RabbitMQRPC.java   
/**
 * 调用接口
 *
 * @param message 消息内容 不能为空
 */
public void client(String message) throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    String replyQueueName = channel.getChannel().queueDeclare().getQueue();
    String corrId = UUID.randomUUID().toString();

    AMQP.BasicProperties props = new AMQP.BasicProperties
            .Builder()
            .correlationId(corrId)
            .replyTo(replyQueueName)
            .build();
    System.out.println("rpc客户端发送消息:" + message);
    channel.getChannel().basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
    channel.getChannel().basicConsume(replyQueueName, true, new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            if (properties.getCorrelationId().equals(corrId)) {
                System.out.println("rpc客户端收到结果:" + new String(body, "UTF-8") + "\n");
            }
        }
    });
}
项目:mumu-rabbitmq    文件:RabbitMQPubsub.java   
/**
 * 接受消息
 */
public void receivePubsubMessage() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.getChannel().queueDeclare().getQueue();
    channel.getChannel().queueBind(queueName, EXCHANGE_NAME, "");
    System.out.println("等待接受pusub订阅【" + EXCHANGE_NAME + "】消息");
    System.out.println("选择队列:"+queueName);
    Consumer consumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("接受消息:" + message + "'");
        }
    };
    channel.getChannel().basicConsume(queueName, true, consumer);
}
项目:mumu-rabbitmq    文件:RabbitMQTopic.java   
/**
 * 接受主题消息
 */
public void receiveTopicMessage() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().exchangeDeclare(TOPIC, "topic");
    String queueName = channel.getChannel().queueDeclare().getQueue();
    channel.getChannel().queueBind(queueName, TOPIC, "bindingKey");
    System.out.println("等待接受topic主题【" + TOPIC + "】消息");
    System.out.println("选择队列:" + queueName);
    Consumer consumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("接受消息:" + envelope.getRoutingKey() + "':'" + message + "'");
        }
    };
    channel.getChannel().basicConsume(queueName, true, consumer);
}
项目:uavstack    文件:RabbitMQRecv.java   
@POST
@Path("rabbitmqRecv")
public void send() throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setHost("127.0.0.1");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DefaultConsumer consumer = new DefaultConsumer(channel) {

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {

            System.out.println(properties.getHeaders());
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);
}
项目:GabrielBot    文件:Gateway.java   
GabrielGateway(int shardId, Channel channel) throws IOException {
    super(shardId, channel);
    channel.queueDeclare("shard-" + shardId + "-getping", false, false, false, null);
    channel.queueDeclare("shard-" + shardId + "-getping-response", false, false, false, null);
    channel.basicConsume("shard-" + shardId + "-getping", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            channel.basicPublish("", "shard-" + shardId + "-getping-response", null, body);
        }
    });
}
项目:GabrielBot    文件:GatewayInfo.java   
public static synchronized void init(Channel channel) throws IOException, TimeoutException {
    if(current != null) {
        throw new IllegalStateException("Already started");
    }
    current = new GatewayInfo("unknown", "unknown", -1, -1, -1, -1, -1, -1, -1);
    channel.queueDeclare("gateway-info", false, false, false, null);
    channel.basicConsume("gateway-info", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            JSONObject object = new JSONObject(new String(body, StandardCharsets.UTF_8));
            JSONObject ram = object.getJSONObject("ram");
            try {
                current = new GatewayInfo(
                        object.getString("version"),
                        object.getString("jda-version"),
                        object.getDouble("cpu-usage"),
                        object.getInt("thread-count"),
                        object.getLong("uptime"),
                        ram.getLong("used"),
                        ram.getLong("free"),
                        ram.getLong("total"),
                        ram.getLong("max")
                );
            } catch(JSONException e) {
                GabrielBot.LOGGER.error("Error creating GatewayInfo: " + e.getMessage());
            }
        }
    });
}
项目:GabrielBot    文件:GabrielGatewayClient.java   
public GabrielGatewayClient(int shardId, Channel channel) throws IOException {
    super(shardId, channel, true);
    channel.queueDeclare("shard-" + shardId + "-getping", false, false, false, null);
    channel.queueDeclare("shard-" + shardId + "-getping-response", false, false, false, null);
    channel.basicConsume("shard-" + shardId + "-getping-response", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            long now = System.currentTimeMillis();
            ping = now - Longs.fromByteArray(body);
        }
    });
    calculatePing();
    PING_CALCULATOR.scheduleAtFixedRate(this::calculatePing, 30, 30, TimeUnit.SECONDS);
}
项目:flowing-retail-old    文件:RabbitMqConsumer.java   
protected void connect() throws Exception {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  channel = connection.createChannel();

  String queueName = "flowing-retail-" + name;
  channel.queueDeclare(queueName, true, false, false, null);
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); // publish/subscribe model
  channel.queueBind(queueName, EXCHANGE_NAME, "*");

  System.out.println(" [*] Waiting for messages.");

  Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      String message = new String(body, "UTF-8");
      System.out.println(" [x] Received '" + message + "'");
      eventHandler.handleEvent(message);
    }
  };
  channel.basicConsume(queueName, true, consumer);
}
项目:rabbitmq-resource-adapter    文件:ConnectorTestCase.java   
@Test
public void testConnectionFactory() throws Exception {
    Assert.assertNotNull(connectionFactory1);
    Assert.assertNotNull(queue);

    RabbitmqConnection connection = connectionFactory1.getConnection();
    Assert.assertNotNull(connection);
    String queueName = "testing";
    Channel channel = connection.createChannel();
    channel.queueDeclare(queueName, false, false, false, null);
    String message = "Hello World!";

    final CountDownLatch counter = new CountDownLatch(1);
    Consumer consume = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                BasicProperties properties, byte[] body) throws IOException {
            Assert.assertEquals("Hello World!", new String(body));
            counter.countDown();
        }
    };

    channel.basicConsume(queueName, true, consume);
    channel.basicPublish("", queueName, null, message.getBytes());
    counter.await(10, TimeUnit.SECONDS);
    Assert.assertEquals(0, counter.getCount());
    channel.close();

}
项目:rabbitmq-resource-adapter    文件:RabbitmqMessageHandler.java   
public void setup() throws Exception {
    if (log.isTraceEnabled()) {
        log.trace("setup()");
    }
    this.consumer = new DefaultConsumer(this.channel) {

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                BasicProperties properties, byte[] body) throws IOException {
            MessageEndpoint localEndpoint;
            try {
                localEndpoint = endpointFactory.createEndpoint(null);
                RabbitmqBytesMessage m = new RabbitmqBytesMessage(consumerTag,envelope,properties,body);
                onMessage(localEndpoint, m);
            } catch (UnavailableException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    };
    if ("javax.jms.Queue".equals(this.spec.getDestinationType())) {
        RabbitmqAdminQueueImpl queue = Util.lookup(new InitialContext(), this.spec.getDestination(), RabbitmqAdminQueueImpl.class);
        this.channel.basicConsume(queue.getDestinationAddress(),true, consumer);
    }
}
项目:zipkin-reporter-java    文件:RabbitMQSenderBenchmarks.java   
@Override protected Sender createSender() throws Exception {
  RabbitMQSender result = RabbitMQSender.newBuilder()
      .queue("zipkin-jmh")
      .addresses("localhost:5672").build();

  CheckResult check = result.check();
  if (!check.ok()) {
    throw new AssumptionViolatedException(check.error().getMessage(), check.error());
  }

  channel = result.get().createChannel();
  channel.queueDelete(result.queue());
  channel.queueDeclare(result.queue(), false, true, true, null);

  Thread.sleep(500L);

  new Thread(() -> {
    try {
      channel.basicConsume(result.queue(), true, new DefaultConsumer(channel));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }).start();

  return result;
}
项目:Mache    文件:RabbitMQEventConsumer.java   
@Override
public void beginSubscriptionThread() throws InterruptedException, IOException {
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                                Envelope envelope,
                                                AMQP.BasicProperties properties,
                                                byte[] body)
            throws IOException {
            long deliveryTag = envelope.getDeliveryTag();

            LOG.info("[RabbitMQEventConsumer {}] Received Message: {}",
                Thread.currentThread().getId(), new String(body));

            Gson gson = new Gson();
            @SuppressWarnings("unchecked")
            final CoordinationEntryEvent<K> event = gson.fromJson(new String(body), CoordinationEntryEvent.class);
            routeEventToListeners(event);

            channel.basicAck(deliveryTag, false);
        }
    };
    consumerTag = channel.basicConsume(queueName, false, consumer);
}
项目:rabbitmq-resource-adapter    文件:RabbitmqMessageHandler.java   
public void setup() throws Exception {
    if (log.isTraceEnabled()) {
        log.trace("setup()");
    }
    this.consumer = new DefaultConsumer(this.channel) {

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                BasicProperties properties, byte[] body) throws IOException {
            MessageEndpoint localEndpoint;
            try {
                localEndpoint = endpointFactory.createEndpoint(null);
                RabbitmqBytesMessage m = new RabbitmqBytesMessage(consumerTag,envelope,properties,body);
                onMessage(localEndpoint, m);
            } catch (UnavailableException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    };
    if ("javax.jms.Queue".equals(this.spec.getDestinationType())) {
        RabbitmqAdminQueueImpl queue = Util.lookup(new InitialContext(), this.spec.getDestination(), RabbitmqAdminQueueImpl.class);
        this.channel.basicConsume(queue.getDestinationAddress(),true, consumer);
    }
}
项目:oneview-sdk-java    文件:MessageBusClient.java   
public Channel addMsmbHandler(String routingKey, final MsmbMessageHandler handler) {
    Channel channel;

    try {
        channel = this.connection.createChannel();
        String queue = channel.queueDeclare().getQueue();

        channel.queueBind(queue, MSMB_EXCHANGE, routingKey);

        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                MsmbMessage msmbMessage = MessageBusClient.this.adaptor.buildResource(message, MsmbMessage.class);

                handler.handleMessage(msmbMessage);
            }
        });
    } catch (IOException e) {
        throw new SDKMessageBusException(SDKErrorEnum.messageBusConnectionError,
                "Could not subscribe to Metric Streaming Message Bus", e);
    }
    return channel;
}
项目:pinpoint    文件:RabbitMQConsumeInterceptor.java   
@Override
protected void doInAfterTrace(SpanRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
    DefaultConsumer consumer = (DefaultConsumer) target;
    Connection connection = consumer.getChannel().getConnection();
    Envelope envelope = (Envelope) args[1];
    AMQP.BasicProperties properties = (AMQP.BasicProperties) args[2];
    byte[] body = (byte[]) args[3];

    String exchange=envelope.getExchange();
    if (exchange == null || exchange.equals(""))
        exchange = "unknown";

    recorder.recordApi(methodDescriptor);
    recorder.recordAttribute(RabbitMQConstants.RABBITMQ_ROUTINGKEY_ANNOTATION_KEY, envelope.getRoutingKey());
    recorder.recordRemoteAddress(connection.getAddress().getHostAddress() + ":" + connection.getPort());

    if (throwable != null) {
        recorder.recordException(throwable);
    }
}
项目:june.mq    文件:Work1.java   
/**
 * @param args
 * @throws IOException
 * @throws TimeoutException
 * @date 2017年7月11日 下午5:55:38
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    final ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println("Worker1  Waiting for messages");

    // 每次从队列获取的数量
    //channel.basicQos(1);保证一次只分发一个 
    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Worker1  Received '" + message + "'");
            try {
                //throw new Exception();
                doWork(message);
            } catch (Exception e) {
                channel.abort();
            } finally {
                System.out.println("Worker1 Done");
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    //autoAck是否自动回复,
    //如果为true的话,每次生产者只要发送信息就会从内存中删除,
    //那么如果消费者程序异常退出,那么就无法获取数据,
    //我们当然是不希望出现这样的情况,所以才去手动回复,
    //每当消费者收到并处理信息然后在通知生成者。
    //最后从队列中删除这条信息。
    //如果消费者异常退出,如果还有其他消费者,
    //那么就会把队列中的消息发送给其他消费者,
    //如果没有,等消费者启动时候再次发送。
    boolean autoAck = false;
    // 消息消费完成确认
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

}
项目:june.mq    文件:Work2.java   
/**
 * @param args
 * @throws IOException
 * @throws TimeoutException
 * @date 2017年7月11日 下午5:55:38
 * @writer junehappylove
 */
public static void main(String[] args) throws IOException, TimeoutException {
    final ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println("Worker2 Waiting for messages");

    // 每次从队列获取的数量
    //channel.basicQos(1);保证一次只分发一个 
    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Worker2 Received '" + message + "'");
            try {
                //throw new Exception();
                doWork(message);
            } catch (Exception e) {
                channel.abort();
            } finally {
                System.out.println("Worker2 Done");
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    //autoAck是否自动回复,
    //如果为true的话,每次生产者只要发送信息就会从内存中删除,
    //那么如果消费者程序异常退出,那么就无法获取数据,
    //我们当然是不希望出现这样的情况,所以才去手动回复,
    //每当消费者收到并处理信息然后在通知生成者。
    //最后从队列中删除这条信息。
    //如果消费者异常退出,如果还有其他消费者,
    //那么就会把队列中的消息发送给其他消费者,
    //如果没有,等消费者启动时候再次发送。
    boolean autoAck = false;
    // 消息消费完成确认
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

}
项目:PepSIIrup-2017    文件:RabbitClient.java   
/**
 * Method to exchange a message to another service
 * @param data
 * @param routingKey
 * @return
 */
public String rabbitRPCRoutingKeyExchange(byte[] data, String routingKey){
    this.corrId = UUID.randomUUID().toString();
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(this.corrId).replyTo(replyQueueName).build();     
    try {
        channel.basicPublish(this.exchange, routingKey, props, data);
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (properties.getCorrelationId().equals(corrId)) {
                    boolean b = response.offer(new String(body, ENCODE));
                    Log
                    .forContext("responseStatus",b)
                    .forContext("Service", "web-service")
                    .information("rabbit message handled status ");
                }
            }
        });
        return response.take();
    } catch (Exception e) {
        Log
        .forContext("MemberName", "rabbitRPCRoutingKeyExchange")
        .forContext("Service", "web-service")
        .error(e,"Exception");
    }
    return null;
}
项目:mumu-rabbitmq    文件:RabbitMQRPC.java   
/**
 * 服务端开启服务
 */
public void service() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    channel.getChannel().basicQos(1);
    System.out.println("等待rpc客户端连接...");

    Consumer consumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(properties.getCorrelationId())
                    .build();
            String response = "";
            try {
                String message = new String(body, "UTF-8");
                System.out.println("服务端接受到消息:" + message);
                response = message + UUID.randomUUID().toString();
            } catch (RuntimeException e) {
                e.printStackTrace();
            } finally {
                channel.getChannel().basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                channel.getChannel().basicAck(envelope.getDeliveryTag(), false);
                System.out.println("服务端将处理结果:" + response + ",返回客户单\n");
            }
        }
    };
    channel.getChannel().basicConsume(RPC_QUEUE_NAME, false, consumer);
}
项目:mumu-rabbitmq    文件:RabbitMQWorkQueue.java   
/**
 * 接受工作队列消息
 */
public void receiveWorkQueueMessage() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().queueDeclare(WORK_QUEUE_NAME, true, false, false, null);
    channel.getChannel().basicQos(1);
    System.out.println("等待接受workQueue队列【"+WORK_QUEUE_NAME+"】消息");
    //建立一个消费者 监听消息的接受
    DefaultConsumer consumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("接受workQueue消息:" + message);
            try {
                for (char ch : message.toCharArray()) {
                    if (ch == '.') {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException _ignored) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            } finally {
                getChannel().basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    channel.getChannel().basicConsume(WORK_QUEUE_NAME, false, consumer);
}
项目:RabbitMQ-gateway    文件:RabbitListener.java   
private Consumer createConsumer(Channel channel) {
  final Consumer consumer = new DefaultConsumer(channel) {

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
      incomingMessagesSubject.onNext(body);
    }
  };
  return consumer;
}
项目:apple-jms    文件:BaseMessageConsumer.java   
protected void init() throws IOException {
    Consumer consumer = new DefaultConsumer(channel) {  
           @Override  
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)  
                   throws IOException {
            processByteMessage(body);
           }  
       };  
       channel.basicConsume(topic, true, consumer);  
}
项目:VoltDB    文件:ExportRabbitMQVerifier.java   
private Consumer createConsumer(final Channel channel)
{
    return new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body)
                throws IOException
        {
            long deliveryTag = envelope.getDeliveryTag();

            String row[] = ExportOnServerVerifier.RoughCSVTokenizer.tokenize(new String(body, Charsets.UTF_8));
            if (row.length != 29) {
                return;
            }
            ExportOnServerVerifier.ValidationErr err = null;
            try {
                err = ExportOnServerVerifier.verifyRow(row);
            } catch (ExportOnServerVerifier.ValidationErr validationErr) {
                validationErr.printStackTrace();
            }
            if (err != null) {
                System.out.println("ERROR in validation: " + err.toString());
            }

            if (++m_verifiedRows % VALIDATION_REPORT_INTERVAL == 0) {
                System.out.println("Verified " + m_verifiedRows + " rows.");
            }

            channel.basicAck(deliveryTag, false);
        }
    };
}
项目:cukes    文件:MessageService.java   
@SneakyThrows({IOException.class, InterruptedException.class})
public MessageWrapper receiveMessage(String queue, int timeoutInSeconds) {
    Channel channel = connectionService.getChannel();
    BlockingQueue<MessageWrapper> result = new ArrayBlockingQueue<MessageWrapper>(1);
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String response = new String(body);
            MessageWrapper messageWrapper = new MessageWrapper(response, properties);
            result.add(messageWrapper);
        }
    });
    return result.poll(timeoutInSeconds, TimeUnit.SECONDS);
}
项目:rabbitmq-resource-adapter    文件:ConnectorTestCase.java   
@Test
public void testConnectionFactory() throws Exception {
    Assert.assertNotNull(connectionFactory1);
    Assert.assertNotNull(queue);

    RabbitmqConnection connection = connectionFactory1.getConnection();
    Assert.assertNotNull(connection);
    String queueName = "testing";
    Channel channel = connection.createChannel();
    channel.queueDeclare(queueName, false, false, false, null);
    String message = "Hello World!";

    final CountDownLatch counter = new CountDownLatch(1);
    Consumer consume = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                BasicProperties properties, byte[] body) throws IOException {
            Assert.assertEquals("Hello World!", new String(body));
            counter.countDown();
        }
    };

    channel.basicConsume(queueName, true, consume);
    channel.basicPublish("", queueName, null, message.getBytes());
    counter.await(10, TimeUnit.SECONDS);
    Assert.assertEquals(0, counter.getCount());
    channel.close();

}
项目:oneview-sdk-java    文件:MessageBusClient.java   
public <T extends BaseModelResource> Channel addScmbTypedHandler(String routingKey,
        final ScmbTypedMessageHandler<T> handler) {

    Channel channel;

    try {
        channel = this.connection.createChannel();
        String queue = channel.queueDeclare().getQueue();

        channel.queueBind(queue, SCMB_EXCHANGE, routingKey);

        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @SuppressWarnings("unchecked")
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                ScmbTypedMessage<T> scmbMessage = (ScmbTypedMessage<T>) MessageBusClient.this.adaptor.buildResource(
                        message, handler.typeToken().getType());

                handler.handleMessage(scmbMessage);
            }
        });
    } catch (IOException e) {
        throw new SDKMessageBusException(SDKErrorEnum.messageBusConnectionError,
                "Could not subscribe to State-Changed Message Bus", e);
    }
    return channel;
}
项目:oneview-sdk-java    文件:MessageBusClient.java   
public Channel addScmbHandler(String routingKey, final ScmbMessageHandler handler) {
    Channel channel;

    try {
        channel = this.connection.createChannel();
        String queue = channel.queueDeclare().getQueue();

        channel.queueBind(queue, SCMB_EXCHANGE, routingKey);

        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @SuppressWarnings("unchecked")
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                ScmbMessage scmbMessage = MessageBusClient.this.adaptor.buildResource(message, ScmbMessage.class);

                handler.handleMessage(scmbMessage);
            }
        });
    } catch (IOException e) {
        throw new SDKMessageBusException(SDKErrorEnum.messageBusConnectionError,
                "Could not subscribe to State-Changed Message Bus", e);
    }
    return channel;
}
项目:priority-consumer-examples    文件:HiLoPriorityConsumers.java   
private static DefaultConsumer instantiateConsumer(final Channel ch1, final String priority) {
    return new DefaultConsumer(ch1) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("[" + priority + " priority] Consumed " + new String(body, "UTF-8"));
        }
    };
}
项目:Rabbitqueue    文件:MainActivity.java   
void subscribe() {
    subscribeThread = new Thread(new Runnable() {

        Connection connection = null;

        @Override
        public void run() {
            try {
                ConnectionFactory factory;
                factory = new ConnectionFactory();
                setupConnectionFactory(factory);
                connection = factory.newConnection();
                Log.d("debug", "connection success");
                Channel channel = connection.createChannel();
                channel.basicQos(1);
                AMQP.Queue.DeclareOk q = channel.queueDeclare();
                channel.queueBind(q.getQueue(), "logs", "");
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                            throws IOException {
                        String message = new String(body, "UTF-8");
                        Log.d("TEST", " [x] Received '" + message + "'");
                        try {
                            JSONObject jsonObject = new JSONObject(message);
                            //dataList 변수에 vel_raw, vel_max, dis_raw, dis_max 정보가 담기게 된다.
                            //저장한 값을 그래프에 넣어준다.
                            dataList = parseJsonFromDataSet(jsonObject);
                            float[] velRaw = dataList.getVelRaw();
                            float velMax = dataList.getVelMax();
                            float[] disRaw = dataList.getDisRaw();
                            float disMax = dataList.getDisMax();

                            ArrayList<ChartItem> chartItems = new ArrayList<>();
                            chartItems.add(new LineChartItem(generateDataLine(velRaw, "vel_raw"), getApplicationContext(),velMax));
                            chartItems.add(new LineChartItem(generateDataLine(disRaw, "dis_raw"), getApplicationContext(),disMax));

                            ChartDataAdapter cda = new ChartDataAdapter(getApplicationContext(), chartItems);
                            lv.setAdapter(cda);
                        } catch (IOException | JSONException | ParseException e) {
                            e.printStackTrace();
                        }
                    }
                };
                channel.basicConsume(q.getQueue(), true, consumer);

            } catch (Exception e1) {
                Log.d("", "Connection broken: " + e1.getClass().getName());
                try {
                    Thread.sleep(4000); //sleep and then try again
                } catch (InterruptedException e) {
                    ;
                }
            }

        }
    });
    subscribeThread.start();
}
项目:java-rabbitmq-client    文件:TracingTest.java   
@Test
public void basicConsume() throws Exception {
  String exchangeName = "basicConsumeExchange";
  String queueName = "basicConsumeQueue";
  String routingKey = "#";

  channel.exchangeDeclare(exchangeName, "direct", true);
  channel.queueDeclare(queueName, true, false, false, null);
  channel.queueBind(queueName, exchangeName, routingKey);

  byte[] messageBodyBytes = "Hello, world!".getBytes();

  channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

  final CountDownLatch latch = new CountDownLatch(1);
  channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag,
        Envelope envelope,
        AMQP.BasicProperties properties,
        byte[] body)
        throws IOException {
      long deliveryTag = envelope.getDeliveryTag();
      channel.basicAck(deliveryTag, false);
      latch.countDown();
    }
  });

  latch.await(30, TimeUnit.SECONDS);

  List<MockSpan> finishedSpans = mockTracer.finishedSpans();
  int tries = 10;
  while (tries > 0 && finishedSpans.size() < 2) {
    TimeUnit.SECONDS.sleep(1L);
    finishedSpans = mockTracer.finishedSpans();
    tries--;
  }

  assertEquals(2, finishedSpans.size());
  checkSpans(finishedSpans);
  assertNull(mockTracer.activeSpan());
}
项目:osc-core    文件:RabbitMQClient.java   
/**
 *
 * This method will create a Connection with a channel to the provided server and Will provide a consumer to consume
 * Notifications sent on the subscribed Queue
 *
 * @throws IOException
 *
 */
public void connect() throws IOException {
    ConnectionFactory factory = initConnectionFactory();
    while (true) {
        try {
            if (this.connection == null || !this.connection.isOpen()) {
                this.connection = factory.newConnection();
            }
            if (this.channel == null || !this.channel.isOpen()) {
                this.channel = this.connection.createChannel();
            }
            initChannel(this.channel);

            final DefaultConsumer consumer = new DefaultConsumer(this.channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    receiveMessage(new String(body));
                }

            };
            this.consumerTag = this.channel.basicConsume(this.queue, true, consumer);
            log.info("Successfully connected to RabbitMQ Server :- " + this.serverIP);
            return;
        } catch (Exception e) {
            // Ignore connection timeout and retry until you connect...
            log.error("Failed to connect to Rabbit MQ server '" + this.serverIP + "' Error:" + e.getMessage());
            generateConnectionFailureAlert();
            try {
                Thread.sleep(RECONNECT_DELAY);
            } catch (InterruptedException ex) {
                log.error("RabbitMQ connect was interrupted! '" + this.serverIP + "' Error:" + e.getMessage());
                return;
            }
        }
    }
}