Java 类org.apache.zookeeper.server.persistence.TxnLog.TxnIterator 实例源码

项目:SecureKeeper    文件:FileTxnLog.java   
/**
 * get the last zxid that was logged in the transaction logs
 * @return the last zxid logged in the transaction logs
 */
public long getLastLoggedZxid() {
    File[] files = getLogFiles(logDir.listFiles(), 0);
    long maxLog=files.length>0?
            Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;

    // if a log file is more recent we must scan it to find
    // the highest zxid
    long zxid = maxLog;
    TxnIterator itr = null;
    try {
        FileTxnLog txn = new FileTxnLog(logDir);
        itr = txn.read(maxLog);
        while (true) {
            if(!itr.next())
                break;
            TxnHeader hdr = itr.getHeader();
            zxid = hdr.getZxid();
        }
    } catch (IOException e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        close(itr);
    }
    return zxid;
}
项目:SecureKeeper    文件:FileTxnLog.java   
/**
 * get the last zxid that was logged in the transaction logs
 * @return the last zxid logged in the transaction logs
 */
public long getLastLoggedZxid() {
    File[] files = getLogFiles(logDir.listFiles(), 0);
    long maxLog=files.length>0?
            Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;

    // if a log file is more recent we must scan it to find
    // the highest zxid
    long zxid = maxLog;
    TxnIterator itr = null;
    try {
        FileTxnLog txn = new FileTxnLog(logDir);
        itr = txn.read(maxLog);
        while (true) {
            if(!itr.next())
                break;
            TxnHeader hdr = itr.getHeader();
            zxid = hdr.getZxid();
        }
    } catch (IOException e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        close(itr);
    }
    return zxid;
}
项目:fuck_zookeeper    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
    iter.close();
    ClientBase.recursiveDelete(tmpdir);
}
项目:https-github.com-apache-zookeeper    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);
    // make sure to snapshot, so that we have something there when
    // truncateLog reloads the db
    snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
    iter.close();
    ClientBase.recursiveDelete(tmpdir);
}
项目:ZooKeeper    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
    iter.close();
    ClientBase.recursiveDelete(tmpdir);
}
项目:StreamProcessingInfrastructure    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
    iter.close();
    ClientBase.recursiveDelete(tmpdir);
}
项目:bigstreams    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    while (true) {
        // iterator points to 
        // the first valid txn when initialized
        hdr = itr.getHeader();
        if (hdr == null) {
            //empty logs 
            return dt.lastProcessedZxid;
        }
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error(highestZxid + "(higestZxid) > "
                    + hdr.getZxid() + "(next log) for type "
                    + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr,dt,sessions, itr.getTxn());
        } catch(KeeperException.NoNodeException e) {
           throw new IOException("Failed to process transaction type: " +
                 hdr.getType() + " error: " + e.getMessage(), e);
        }
        listener.onTxnLoaded(hdr, itr.getTxn());
        if (!itr.next()) 
            break;
    }
    return highestZxid;
}
项目:bigstreams    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
}
项目:bigstreams    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    while (true) {
        // iterator points to 
        // the first valid txn when initialized
        hdr = itr.getHeader();
        if (hdr == null) {
            //empty logs 
            return dt.lastProcessedZxid;
        }
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error(highestZxid + "(higestZxid) > "
                    + hdr.getZxid() + "(next log) for type "
                    + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr,dt,sessions, itr.getTxn());
        } catch(KeeperException.NoNodeException e) {
           throw new IOException("Failed to process transaction type: " +
                 hdr.getType() + " error: " + e.getMessage());
        }
        listener.onTxnLoaded(hdr, itr.getTxn());
        if (!itr.next()) 
            break;
    }
    return highestZxid;
}
项目:zookeeper-src-learning    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    while (true) {
        // iterator points to 
        // the first valid txn when initialized
        hdr = itr.getHeader();
        if (hdr == null) {
            //empty logs 
            return dt.lastProcessedZxid;
        }
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error(highestZxid + "(higestZxid) > "
                    + hdr.getZxid() + "(next log) for type "
                    + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr,dt,sessions, itr.getTxn());
        } catch(KeeperException.NoNodeException e) {
           throw new IOException("Failed to process transaction type: " +
                 hdr.getType() + " error: " + e.getMessage(), e);
        }
        listener.onTxnLoaded(hdr, itr.getTxn());
        if (!itr.next()) 
            break;
    }
    return highestZxid;
}
项目:zookeeper    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
    iter.close();
    ClientBase.recursiveDelete(tmpdir);
}
项目:SecureKeeper    文件:FileTxnLog.java   
private void close(TxnIterator itr) {
    if (itr != null) {
        try {
            itr.close();
        } catch (IOException ioe) {
            LOG.warn("Error closing file iterator", ioe);
        }
    }
}
项目:SecureKeeper    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
    iter.close();
    ClientBase.recursiveDelete(tmpdir);
}
项目:SecureKeeper    文件:FileTxnLog.java   
private void close(TxnIterator itr) {
    if (itr != null) {
        try {
            itr.close();
        } catch (IOException ioe) {
            LOG.warn("Error closing file iterator", ioe);
        }
    }
}
项目:SecureKeeper    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
    iter.close();
    ClientBase.recursiveDelete(tmpdir);
}
项目:StreamBench    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
    iter.close();
    ClientBase.recursiveDelete(tmpdir);
}
项目:ACaZoo    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    while (true) {
        // iterator points to 
        // the first valid txn when initialized
        hdr = itr.getHeader();
        if (hdr == null) {
            //empty logs 
            return dt.lastProcessedZxid;
        }
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error(highestZxid + "(higestZxid) > "
                    + hdr.getZxid() + "(next log) for type "
                    + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr,dt,sessions, itr.getTxn());
        } catch(KeeperException.NoNodeException e) {
           throw new IOException("Failed to process transaction type: " +
                 hdr.getType() + " error: " + e.getMessage(), e);
        }
        listener.onTxnLoaded(hdr, itr.getTxn());
        if (!itr.next()) 
            break;
    }
    return highestZxid;
}
项目:LoadBalanced_zk    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    while (true) {
        // iterator points to 
        // the first valid txn when initialized
        hdr = itr.getHeader();
        if (hdr == null) {
            //empty logs 
            return dt.lastProcessedZxid;
        }
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error(highestZxid + "(higestZxid) > "
                    + hdr.getZxid() + "(next log) for type "
                    + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr,dt,sessions, itr.getTxn());
        } catch(KeeperException.NoNodeException e) {
           throw new IOException("Failed to process transaction type: " +
                 hdr.getType() + " error: " + e.getMessage(), e);
        }
        listener.onTxnLoaded(hdr, itr.getTxn());
        if (!itr.next()) 
            break;
    }
    return highestZxid;
}
项目:LoadBalanced_zk    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
}
项目:LoadBalanced_zk    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    while (true) {
        // iterator points to 
        // the first valid txn when initialized
        hdr = itr.getHeader();
        if (hdr == null) {
            //empty logs 
            return dt.lastProcessedZxid;
        }
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error(highestZxid + "(higestZxid) > "
                    + hdr.getZxid() + "(next log) for type "
                    + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr,dt,sessions, itr.getTxn());
        } catch(KeeperException.NoNodeException e) {
           throw new IOException("Failed to process transaction type: " +
                 hdr.getType() + " error: " + e.getMessage(), e);
        }
        listener.onTxnLoaded(hdr, itr.getTxn());
        if (!itr.next()) 
            break;
    }
    return highestZxid;
}
项目:LoadBalanced_zk    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
}
项目:zookeeper.dsc    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    while (true) {
        // iterator points to 
        // the first valid txn when initialized
        hdr = itr.getHeader();
        if (hdr == null) {
            //empty logs 
            return dt.lastProcessedZxid;
        }
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error(highestZxid + "(higestZxid) > "
                    + hdr.getZxid() + "(next log) for type "
                    + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr,dt,sessions, itr.getTxn());
        } catch(KeeperException.NoNodeException e) {
            throw new IOException("Failed to process transaction type: " +
                    hdr.getType() + " error: " + e.getMessage());
        }
        if (!itr.next()) 
            break;
    }
    return highestZxid;
}
项目:zookeeper-pkg    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    while (true) {
        // iterator points to 
        // the first valid txn when initialized
        hdr = itr.getHeader();
        if (hdr == null) {
            //empty logs 
            return dt.lastProcessedZxid;
        }
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error(highestZxid + "(higestZxid) > "
                    + hdr.getZxid() + "(next log) for type "
                    + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr,dt,sessions, itr.getTxn());
        } catch(KeeperException.NoNodeException e) {
           throw new IOException("Failed to process transaction type: " +
                 hdr.getType() + " error: " + e.getMessage(), e);
        }
        listener.onTxnLoaded(hdr, itr.getTxn());
        if (!itr.next()) 
            break;
    }
    return highestZxid;
}
项目:zookeeper-pkg    文件:TruncateTest.java   
@Test
public void testTruncationStreamReset() throws Exception {
    File tmpdir = ClientBase.createTmpDir();
    FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir);
    ZKDatabase zkdb = new ZKDatabase(snaplog);

    for (int i = 1; i <= 100; i++) {
        append(zkdb, i);
    }

    zkdb.truncateLog(1);

    append(zkdb, 200);

    zkdb.close();

    // verify that the truncation and subsequent append were processed
    // correctly
    FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2"));
    TxnIterator iter = txnlog.read(1);

    TxnHeader hdr = iter.getHeader();
    Record txn = iter.getTxn();
    Assert.assertEquals(1, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);

    iter.next();

    hdr = iter.getHeader();
    txn = iter.getTxn();
    Assert.assertEquals(200, hdr.getZxid());
    Assert.assertTrue(txn instanceof SetDataTxn);
}
项目:fuck_zookeeper    文件:FileTxnSnapLog.java   
/**
 * this function restores[修复] the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to 
            // the first valid txn when initialized
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs 
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                        new Object[] { highestZxid, hdr.getZxid(),
                                hdr.getType() });
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next()) 
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
项目:fuck_zookeeper    文件:LoadFromLogTest.java   
/**
 * test that all transactions from the Log are loaded, and only once
 * @throws Exception an exception might be thrown here
 */
@Test
public void testLoad() throws Exception {
    // setup a single server cluster
    File tmpDir = ClientBase.createTmpDir();
    ClientBase.setupTestEnv();
    ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
    SyncRequestProcessor.setSnapCount(100);
    final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
    ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
    f.startup(zks);
    Assert.assertTrue("waiting for server being up ",
            ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
    ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);

    // generate some transactions that will get logged
    try {
        for (int i = 0; i< NUM_MESSAGES; i++) {
            zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
    } finally {
        zk.close();
    }
    f.shutdown();
    Assert.assertTrue("waiting for server to shutdown",
            ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));

    // now verify that the FileTxnLog reads every transaction only once
    File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
    FileTxnLog txnLog = new FileTxnLog(logDir);

    TxnIterator itr = txnLog.read(0);
    long expectedZxid = 0;
    long lastZxid = 0;
    TxnHeader hdr;
    do {
        hdr = itr.getHeader();
        expectedZxid++;
        Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid());
        Assert.assertTrue("excepting next transaction. expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid));
        lastZxid = hdr.getZxid();
    }while(itr.next());

    Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS));
    zks.shutdown();
}
项目:https-github.com-apache-zookeeper    文件:FileTxnSnapLog.java   
/**
 * this function restores the server
 * database after reading from the
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    long deserializeResult = snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    boolean trustEmptyDB;
    File initFile = new File(dataDir.getParent(), "initialize");
    if (Files.deleteIfExists(initFile.toPath())) {
        LOG.info("Initialize file found, an empty database will not block voting participation");
        trustEmptyDB = true;
    } else {
        trustEmptyDB = autoCreateDB;
    }
    if (-1L == deserializeResult) {
        /* this means that we couldn't find any snapshot, so we need to
         * initialize an empty database (reported in ZOOKEEPER-2325) */
        if (txnLog.getLastLoggedZxid() != -1) {
            throw new IOException(
                    "No snapshot found, but there are log entries. " +
                    "Something is broken!");
        }

        if (trustEmptyDB) {
            /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
             *       or use Map on save() */
            save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);

            /* return a zxid of 0, since we know the database is empty */
            return 0L;
        } else {
            /* return a zxid of -1, since we are possibly missing data */
            LOG.warn("Unexpected empty data tree, setting zxid to -1");
            dt.lastProcessedZxid = -1L;
            return -1L;
        }
    }
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to
            // the first valid txn when initialized
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(highestZxid) > {}(next log) for type {}",
                        highestZxid, hdr.getZxid(), hdr.getType());
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next())
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
项目:https-github.com-apache-zookeeper    文件:TxnLogProposalIterator.java   
public TxnLogProposalIterator(TxnIterator itr) {
    if (itr != null) {
        this.itr = itr;
        hasNext = (itr.getHeader() != null);
    }
}
项目:https-github.com-apache-zookeeper    文件:ZKDatabase.java   
/**
 * Get proposals from txnlog. Only packet part of proposal is populated.
 *
 * @param startZxid the starting zxid of the proposal
 * @param sizeLimit maximum on-disk size of txnlog to fetch
 *                  0 is unlimited, negative value means disable.
 * @return list of proposal (request part of each proposal is null)
 */
public Iterator<Proposal> getProposalsFromTxnLog(long startZxid,
                                                 long sizeLimit) {
    if (sizeLimit < 0) {
        LOG.debug("Negative size limit - retrieving proposal via txnlog is disabled");
        return TxnLogProposalIterator.EMPTY_ITERATOR;
    }

    TxnIterator itr = null;
    try {

        itr = snapLog.readTxnLog(startZxid, false);

        // If we cannot guarantee that this is strictly the starting txn
        // after a given zxid, we should fail.
        if ((itr.getHeader() != null)
                && (itr.getHeader().getZxid() > startZxid)) {
            LOG.warn("Unable to find proposals from txnlog for zxid: "
                    + startZxid);
            itr.close();
            return TxnLogProposalIterator.EMPTY_ITERATOR;
        }

        if (sizeLimit > 0) {
            long txnSize = itr.getStorageSize();
            if (txnSize > sizeLimit) {
                LOG.info("Txnlog size: " + txnSize + " exceeds sizeLimit: "
                        + sizeLimit);
                itr.close();
                return TxnLogProposalIterator.EMPTY_ITERATOR;
            }
        }
    } catch (IOException e) {
        LOG.error("Unable to read txnlog from disk", e);
        try {
            if (itr != null) {
                itr.close();
            }
        } catch (IOException ioe) {
            LOG.warn("Error closing file iterator", ioe);
        }
        return TxnLogProposalIterator.EMPTY_ITERATOR;
    }
    return new TxnLogProposalIterator(itr);
}
项目:https-github.com-apache-zookeeper    文件:LoadFromLogTest.java   
/**
 * test that all transactions from the Log are loaded, and only once
 * @throws Exception an exception might be thrown here
 */
@Test
public void testLoad() throws Exception {
    final String hostPort = HOST + PortAssignment.unique();
    // setup a single server cluster
    File tmpDir = ClientBase.createTmpDir();
    ClientBase.setupTestEnv();
    ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
    SyncRequestProcessor.setSnapCount(100);
    final int PORT = Integer.parseInt(hostPort.split(":")[1]);
    ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
    f.startup(zks);
    Assert.assertTrue("waiting for server being up ",
            ClientBase.waitForServerUp(hostPort,CONNECTION_TIMEOUT));
    ZooKeeper zk = ClientBase.createZKClient(hostPort);

    // generate some transactions that will get logged
    try {
        for (int i = 0; i< NUM_MESSAGES; i++) {
            zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
    } finally {
        zk.close();
    }
    f.shutdown();
    Assert.assertTrue("waiting for server to shutdown",
            ClientBase.waitForServerDown(hostPort, CONNECTION_TIMEOUT));

    // now verify that the FileTxnLog reads every transaction only once
    File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
    FileTxnLog txnLog = new FileTxnLog(logDir);

    TxnIterator itr = txnLog.read(0);

    // Check that storage space return some value
    FileTxnIterator fileItr = (FileTxnIterator) itr;
    long storageSize = fileItr.getStorageSize();
    LOG.info("Txnlog size: " + storageSize + " bytes");
    Assert.assertTrue("Storage size is greater than zero ",
            (storageSize > 0));

    long expectedZxid = 0;
    long lastZxid = 0;
    TxnHeader hdr;
    do {
        hdr = itr.getHeader();
        expectedZxid++;
        Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid());
        Assert.assertTrue("excepting next transaction. expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid));
        lastZxid = hdr.getZxid();
    }while(itr.next());

    Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS));
    zks.shutdown();
}
项目:https-github.com-apache-zookeeper    文件:LoadFromLogTest.java   
/**
 * test that we fail to load txnlog of a request zxid that is older
 * than what exist on disk
 * @throws Exception an exception might be thrown here
 */
@Test
public void testLoadFailure() throws Exception {
    final String hostPort = HOST + PortAssignment.unique();
    // setup a single server cluster
    File tmpDir = ClientBase.createTmpDir();
    ClientBase.setupTestEnv();
    ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
    // So we have at least 4 logs
    SyncRequestProcessor.setSnapCount(50);
    final int PORT = Integer.parseInt(hostPort.split(":")[1]);
    ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
    f.startup(zks);
    Assert.assertTrue("waiting for server being up ",
            ClientBase.waitForServerUp(hostPort,CONNECTION_TIMEOUT));
    ZooKeeper zk = ClientBase.createZKClient(hostPort);

    // generate some transactions that will get logged
    try {
        for (int i = 0; i< NUM_MESSAGES; i++) {
            zk.create("/data-", new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT_SEQUENTIAL);
        }
    } finally {
        zk.close();
    }
    f.shutdown();
    Assert.assertTrue("waiting for server to shutdown",
            ClientBase.waitForServerDown(hostPort, CONNECTION_TIMEOUT));

    File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
    File[] logFiles = FileTxnLog.getLogFiles(logDir.listFiles(), 0);
    // Verify that we have at least 4 txnlog
    Assert.assertTrue(logFiles.length > 4);
    // Delete the first log file, so we will fail to read it back from disk
    Assert.assertTrue("delete the first log file", logFiles[0].delete());

    // Find zxid for the second log
    long secondStartZxid = Util.getZxidFromName(logFiles[1].getName(), "log");

    FileTxnLog txnLog = new FileTxnLog(logDir);
    TxnIterator itr = txnLog.read(1, false);

    // Oldest log is already remove, so this should point to the start of
    // of zxid on the second log
    Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid());

    itr = txnLog.read(secondStartZxid, false);
    Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid());
    Assert.assertTrue(itr.next());

    // Trying to get a second txn on second txnlog give us the
    // the start of second log, since the first one is removed
    long nextZxid = itr.getHeader().getZxid();

    itr = txnLog.read(nextZxid, false);
    Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid());

    // Trying to get a first txn on the third give us the
    // the start of second log, since the first one is removed
    long thirdStartZxid = Util.getZxidFromName(logFiles[2].getName(), "log");
    itr = txnLog.read(thirdStartZxid, false);
    Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid());
    Assert.assertTrue(itr.next());

    nextZxid = itr.getHeader().getZxid();
    itr = txnLog.read(nextZxid, false);
    Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid());

}
项目:ZooKeeper    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to 
            // the first valid txn when initialized
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs 
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                        new Object[] { highestZxid, hdr.getZxid(),
                                hdr.getType() });
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next()) 
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
项目:ZooKeeper    文件:LoadFromLogTest.java   
/**
 * test that all transactions from the Log are loaded, and only once
 * @throws Exception an exception might be thrown here
 */
@Test
public void testLoad() throws Exception {
    // setup a single server cluster
    File tmpDir = ClientBase.createTmpDir();
    ClientBase.setupTestEnv();
    ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
    SyncRequestProcessor.setSnapCount(100);
    final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
    ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
    f.startup(zks);
    Assert.assertTrue("waiting for server being up ",
            ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
    ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);

    // generate some transactions that will get logged
    try {
        for (int i = 0; i< NUM_MESSAGES; i++) {
            zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
    } finally {
        zk.close();
    }
    f.shutdown();
    Assert.assertTrue("waiting for server to shutdown",
            ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));

    // now verify that the FileTxnLog reads every transaction only once
    File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
    FileTxnLog txnLog = new FileTxnLog(logDir);

    TxnIterator itr = txnLog.read(0);
    long expectedZxid = 0;
    long lastZxid = 0;
    TxnHeader hdr;
    do {
        hdr = itr.getHeader();
        expectedZxid++;
        Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid());
        Assert.assertTrue("excepting next transaction. expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid));
        lastZxid = hdr.getZxid();
    }while(itr.next());

    Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS));
    zks.shutdown();
}
项目:StreamProcessingInfrastructure    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to 
            // the first valid txn when initialized
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs 
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                        new Object[] { highestZxid, hdr.getZxid(),
                                hdr.getType() });
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next()) 
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
项目:StreamProcessingInfrastructure    文件:LoadFromLogTest.java   
/**
 * test that all transactions from the Log are loaded, and only once
 * @throws Exception an exception might be thrown here
 */
@Test
public void testLoad() throws Exception {
    // setup a single server cluster
    File tmpDir = ClientBase.createTmpDir();
    ClientBase.setupTestEnv();
    ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
    SyncRequestProcessor.setSnapCount(100);
    final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
    ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
    f.startup(zks);
    Assert.assertTrue("waiting for server being up ",
            ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
    ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);

    // generate some transactions that will get logged
    try {
        for (int i = 0; i< NUM_MESSAGES; i++) {
            zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
    } finally {
        zk.close();
    }
    f.shutdown();
    Assert.assertTrue("waiting for server to shutdown",
            ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));

    // now verify that the FileTxnLog reads every transaction only once
    File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
    FileTxnLog txnLog = new FileTxnLog(logDir);

    TxnIterator itr = txnLog.read(0);
    long expectedZxid = 0;
    long lastZxid = 0;
    TxnHeader hdr;
    do {
        hdr = itr.getHeader();
        expectedZxid++;
        Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid());
        Assert.assertTrue("excepting next transaction. expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid));
        lastZxid = hdr.getZxid();
    }while(itr.next());

    Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS));
    zks.shutdown();
}
项目:bigstreams    文件:LoadFromLogTest.java   
/**
 * test that all transactions from the Log are loaded, and only once
 * @throws Exception an exception might be thrown here
 */
@Test
public void testLoad() throws Exception {
    // setup a single server cluster
    File tmpDir = ClientBase.createTmpDir();
    ClientBase.setupTestEnv();
    ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
    SyncRequestProcessor.setSnapCount(100);
    final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
    ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
    f.startup(zks);
    Assert.assertTrue("waiting for server being up ",
            ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
    ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);

    // generate some transactions that will get logged
    try {
        for (int i = 0; i< NUM_MESSAGES; i++) {
            zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
    } finally {
        zk.close();
    }
    f.shutdown();
    Assert.assertTrue("waiting for server to shutdown",
            ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));

    // now verify that the FileTxnLog reads every transaction only once
    File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
    FileTxnLog txnLog = new FileTxnLog(logDir);

    TxnIterator itr = txnLog.read(0);
    long expectedZxid = 0;
    long lastZxid = 0;
    TxnHeader hdr;
    do {
        hdr = itr.getHeader();
        expectedZxid++;
        Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid());
        Assert.assertTrue("excepting next transaction. expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid));
        lastZxid = hdr.getZxid();
    }while(itr.next());

    Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS));
}
项目:bigstreams    文件:LoadFromLogTest.java   
/**
    * test that all transactions from the Log are loaded, and only once
    * @throws Exception an exception might be thrown here
    */
   @Test
   public void testLoad() throws Exception {
       // setup a single server cluster
       File tmpDir = ClientBase.createTmpDir();
       ClientBase.setupTestEnv();
       ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
       SyncRequestProcessor.setSnapCount(100);
       final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
       ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
       f.startup(zks);
       Assert.assertTrue("waiting for server being up ",
               ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
       ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);

       // generate some transactions that will get logged
       try {
           for (int i = 0; i< NUM_MESSAGES; i++) {
               zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
                       CreateMode.PERSISTENT);
           }
       } finally {
           zk.close();
       }
       f.shutdown();
       Assert.assertTrue("waiting for server to shutdown",
               ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));

       // now verify that the FileTxnLog reads every transaction only once
File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
FileTxnLog txnLog = new FileTxnLog(logDir);

       TxnIterator itr = txnLog.read(0);
       long expectedZxid = 0;
       long lastZxid = 0;
       TxnHeader hdr;
       do {
           hdr = itr.getHeader();
           expectedZxid++;
           Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid());
           Assert.assertTrue("excepting next transaction. expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid));
           lastZxid = hdr.getZxid();
       }while(itr.next());

       Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS));
   }
项目:zookeeper    文件:FileTxnSnapLog.java   
/**
 * this function restores the server 
 * database after reading from the 
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the 
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, 
        PlayBackListener listener) throws IOException {
    // 内存快照文件反序列化为内存数据库
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to 
            // the first valid txn when initialized

            // 事务头
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs 
                return dt.lastProcessedZxid;
            }

            // 比较内存快照的zxid和事务日志的zxid, 得到大的zxid
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                        new Object[] { highestZxid, hdr.getZxid(),
                                hdr.getType() });
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                // 从事务日志恢复内存数据库
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            // 触发事务加载监听事件
            listener.onTxnLoaded(hdr, itr.getTxn());
            // 处理下一条事务日志
            if (!itr.next()) 
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
项目:zookeeper    文件:LoadFromLogTest.java   
/**
 * test that all transactions from the Log are loaded, and only once
 * @throws Exception an exception might be thrown here
 */
@Test
public void testLoad() throws Exception {
    // setup a single server cluster
    File tmpDir = ClientBase.createTmpDir();
    ClientBase.setupTestEnv();
    ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
    SyncRequestProcessor.setSnapCount(100);
    final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
    ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
    f.startup(zks);
    Assert.assertTrue("waiting for server being up ",
            ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
    ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);

    // generate some transactions that will get logged
    try {
        for (int i = 0; i< NUM_MESSAGES; i++) {
            zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
    } finally {
        zk.close();
    }
    f.shutdown();
    Assert.assertTrue("waiting for server to shutdown",
            ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));

    // now verify that the FileTxnLog reads every transaction only once
    File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
    FileTxnLog txnLog = new FileTxnLog(logDir);

    TxnIterator itr = txnLog.read(0);
    long expectedZxid = 0;
    long lastZxid = 0;
    TxnHeader hdr;
    do {
        hdr = itr.getHeader();
        expectedZxid++;
        Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid());
        Assert.assertTrue("excepting next transaction. expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid));
        lastZxid = hdr.getZxid();
    }while(itr.next());

    Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS));
    zks.shutdown();
}
项目:SecureKeeper    文件:FileTxnSnapLog.java   
/**
 * this function restores the server
 * database after reading from the
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to
            // the first valid txn when initialized
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                        new Object[] { highestZxid, hdr.getZxid(),
                                hdr.getType() });
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next())
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}