Java 类org.apache.zookeeper.recipes.queue.DistributedQueue 实例源码

项目:zookeeper    文件:DistributedQueueTest.java   
@Test
@Concurrency(count = 100)
public void test() {
    setCallable(new RunnableCallable() {

        @Override
        public RunnableFuture run(int i) throws Exception {
            DistributedQueue queue = new DistributedQueue(ZooKeeperGetter.getZooKeeper(), "/queue", ZooDefs.Ids.OPEN_ACL_UNSAFE);
            String data = "data " + i;
            queue.offer(data.getBytes());
            LOG.info("queue offer: " + data);
            Thread.sleep(new Random().nextInt(10) * 1000);
            LOG.info("queue poll: " + new String(queue.poll()));
            return RunnableFuture.DONE;
        }

    });
}
项目:NeverwinterDP-Commons    文件:DistributedQueueTest.java   
@Test
public void testOffer1() throws Exception {
    String dir = "/testOffer1";
    String testString = "Hello World";
    final int num_clients = 1;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    queueHandles[0].offer(testString.getBytes());

    byte dequeuedBytes[] = queueHandles[0].remove();
    Assert.assertEquals(new String(dequeuedBytes), testString);
}
项目:NeverwinterDP-Commons    文件:DistributedQueueTest.java   
@Test
public void testOffer2() throws Exception {
    String dir = "/testOffer2";
    String testString = "Hello World";
    final int num_clients = 2;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    queueHandles[0].offer(testString.getBytes());

    byte dequeuedBytes[] = queueHandles[1].remove();
    Assert.assertEquals(new String(dequeuedBytes), testString);
}
项目:NeverwinterDP-Commons    文件:DistributedQueueTest.java   
@Test
public void testTake1() throws Exception {
    String dir = "/testTake1";
    String testString = "Hello World";
    final int num_clients = 1;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    queueHandles[0].offer(testString.getBytes());

    byte dequeuedBytes[] = queueHandles[0].take();
    Assert.assertEquals(new String(dequeuedBytes), testString);
}
项目:NeverwinterDP-Commons    文件:DistributedQueueTest.java   
@Test
public void testRemove1() throws Exception{
    String dir = "/testRemove1";
    String testString = "Hello World";
    final int num_clients = 1;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    try{
        queueHandles[0].remove();
    }catch(NoSuchElementException e){
        return;
    }
    Assert.assertTrue(false);
}
项目:NeverwinterDP-Commons    文件:DistributedQueueTest.java   
public void createNremoveMtest(String dir,int n,int m) throws Exception{
    String testString = "Hello World";
    final int num_clients = 2;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    for(int i=0; i< n; i++){
        String offerString = testString + i;
        queueHandles[0].offer(offerString.getBytes());
    }

    byte data[] = null;
    for(int i=0; i<m; i++){
        data=queueHandles[1].remove();
    }
    Assert.assertEquals(new String(data), testString+(m-1));
}
项目:NeverwinterDP-Commons    文件:DistributedQueueTest.java   
public void createNremoveMelementTest(String dir,int n,int m) throws Exception{
    String testString = "Hello World";
    final int num_clients = 2;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    for(int i=0; i< n; i++){
        String offerString = testString + i;
        queueHandles[0].offer(offerString.getBytes());
    }

    byte data[] = null;
    for(int i=0; i<m; i++){
        data=queueHandles[1].remove();
    }
    Assert.assertEquals(new String(queueHandles[1].element()), testString+m);
}
项目:zookeeper.dsc    文件:DistributedQueueTest.java   
public void testOffer1() throws Exception {
    String dir = "/testOffer1";
    String testString = "Hello World";
    final int num_clients = 1;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    queueHandles[0].offer(testString.getBytes());

    byte dequeuedBytes[] = queueHandles[0].remove();
    assertEquals(new String(dequeuedBytes), testString);
}
项目:zookeeper.dsc    文件:DistributedQueueTest.java   
public void testOffer2() throws Exception {
    String dir = "/testOffer2";
    String testString = "Hello World";
    final int num_clients = 2;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    queueHandles[0].offer(testString.getBytes());

    byte dequeuedBytes[] = queueHandles[1].remove();
    assertEquals(new String(dequeuedBytes), testString);
}
项目:zookeeper.dsc    文件:DistributedQueueTest.java   
public void testTake1() throws Exception {
    String dir = "/testTake1";
    String testString = "Hello World";
    final int num_clients = 1;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    queueHandles[0].offer(testString.getBytes());

    byte dequeuedBytes[] = queueHandles[0].take();
    assertEquals(new String(dequeuedBytes), testString);
}
项目:zookeeper.dsc    文件:DistributedQueueTest.java   
public void testRemove1() throws Exception{
    String dir = "/testRemove1";
    String testString = "Hello World";
    final int num_clients = 1;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    try{
        queueHandles[0].remove();
    }catch(NoSuchElementException e){
        return;
    }
    assertTrue(false);
}
项目:zookeeper.dsc    文件:DistributedQueueTest.java   
public void createNremoveMtest(String dir,int n,int m) throws Exception{
    String testString = "Hello World";
    final int num_clients = 2;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    for(int i=0; i< n; i++){
        String offerString = testString + i;
        queueHandles[0].offer(offerString.getBytes());
    }

    byte data[] = null;
    for(int i=0; i<m; i++){
        data=queueHandles[1].remove();
    }
    assertEquals(new String(data), testString+(m-1));
}
项目:zookeeper.dsc    文件:DistributedQueueTest.java   
public void createNremoveMelementTest(String dir,int n,int m) throws Exception{
    String testString = "Hello World";
    final int num_clients = 2;
    ZooKeeper clients[] = new ZooKeeper[num_clients];
    DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
    for(int i=0; i < clients.length; i++){
        clients[i] = createClient();
        queueHandles[i] = new DistributedQueue(clients[i], dir, null);
    }

    for(int i=0; i< n; i++){
        String offerString = testString + i;
        queueHandles[0].offer(offerString.getBytes());
    }

    byte data[] = null;
    for(int i=0; i<m; i++){
        data=queueHandles[1].remove();
    }
    assertEquals(new String(queueHandles[1].element()), testString+m);
}