Example source code for the Java class org.apache.hadoop.hbase.mapreduce.TableSplit
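This page collects example usages of the Java class org.apache.hadoop.hbase.mapreduce.TableSplit from several open-source projects. A TableSplit describes the slice of an HBase table that one map task reads: the table name, the start and end row keys of the range, and the preferred region location. Before the project snippets, here is a minimal orientation sketch (not taken from any of the projects below) showing how a TableSplit is constructed and inspected; the table name, row keys, and hostname are illustrative, and the byte[]-based constructor matches the older HBase API that most of the snippets on this page use.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;

public class TableSplitDemo {
    public static void main(String[] args) {
        // A split covering [start of table, "row-5000"), served by one region.
        TableSplit split = new TableSplit(
                Bytes.toBytes("demo_table"),            // table name (illustrative)
                HConstants.EMPTY_BYTE_ARRAY,            // start row; empty means start of table
                Bytes.toBytes("row-5000"),              // end row (exclusive)
                "regionserver-1.example.com");          // preferred location (illustrative)

        System.out.println(Bytes.toString(split.getTableName()));
        System.out.println(Bytes.toStringBinary(split.getStartRow()));
        System.out.println(Bytes.toStringBinary(split.getEndRow()));
        System.out.println(split.getRegionLocation());
    }
}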

Project: themis    File: ThemisTableInputFormat.java
@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException {
  if (getHTable() == null) {
    throw new IOException("Cannot create a record reader because of a"
        + " previous error. Please look at the previous logs lines from"
        + " the task's full log for more details.");
  }
  TableSplit tSplit = (TableSplit) split;
  ThemisTableRecordReader trr = this.themisTableRecordReader;
  if (trr == null) {
    trr = new ThemisTableRecordReader();
  }
  Scan sc = new Scan(this.getScan());
  sc.setStartRow(tSplit.getStartRow());
  sc.setStopRow(tSplit.getEndRow());
  trr.setScan(sc);
  trr.setConf(getHTable().getConfiguration());
  trr.setTableName(getHTable().getTableName());
  try {
    trr.initialize(tSplit, context);
  } catch (InterruptedException e) {
    throw new InterruptedIOException(e.getMessage());
  }
  return trr;
}
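The method above follows the usual TableInputFormatBase pattern: copy the job-wide Scan, clamp it to the row range carried by the TableSplit, and hand the narrowed Scan to the record reader. A minimal sketch of just that step, with a hypothetical helper name (setStartRow/setStopRow are the pre-2.x setters used above):

// Hypothetical helper: clone the template Scan and restrict it to one split's range.
private static Scan restrictToSplit(Scan jobScan, TableSplit split) throws IOException {
    Scan sc = new Scan(jobScan);             // copy, so the job-wide Scan stays untouched
    sc.setStartRow(split.getStartRow());     // the split's boundaries come from the region
    sc.setStopRow(split.getEndRow());
    return sc;
}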
Project: themis    File: MultiThemisTableInputFormat.java
@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
    InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  TableSplit tSplit = (TableSplit) split;

  if (tSplit.getTableName() == null) {
    throw new IOException("Cannot create a record reader because of a"
        + " previous error. Please look at the previous logs lines from"
        + " the task's full log for more details.");
  }
  ThemisTableRecordReader trr = this.themisTableRecordReader;
  // if no table record reader was provided use default
  if (trr == null) {
    trr = new ThemisTableRecordReader();
  }
  Scan sc = tSplit.getScan();
  sc.setStartRow(tSplit.getStartRow());
  sc.setStopRow(tSplit.getEndRow());
  trr.setScan(sc);
  trr.setConf(context.getConfiguration());
  trr.setTableName(tSplit.getTableName());
  trr.initialize(split, context);
  return trr;
}
Project: spork-streaming    File: HBaseTableInputFormat.java
@Override
public List<InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context)
throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    ListIterator<InputSplit> splitIter = splits.listIterator();
    while (splitIter.hasNext()) {
        TableSplit split = (TableSplit) splitIter.next();
        byte[] startKey = split.getStartRow();
        byte[] endKey = split.getEndRow();
        // Skip if the region doesn't satisfy configured options.
        if ((skipRegion(CompareOp.LESS, startKey, lt_)) ||
                (skipRegion(CompareOp.GREATER, endKey, gt_)) ||
                (skipRegion(CompareOp.GREATER, endKey, gte_)) ||
                (skipRegion(CompareOp.LESS_OR_EQUAL, startKey, lte_)) )  {
            splitIter.remove();
        }
    }
    return splits;
}
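The skipRegion helper referenced above is not shown on this page. Its job is to drop regions that cannot contain any row satisfying the configured row-key bounds (lt_, gt_, gte_, lte_ hold the bound keys, or null when unset). The sketch below only captures the effect the calling code relies on and is not the actual Pig implementation; it assumes the bounds are plain byte[] keys and uses HBase's Bytes and CompareFilter.CompareOp:

// Sketch only: skip the region when its boundary key cannot satisfy the bound.
private boolean skipRegion(CompareOp op, byte[] regionKey, byte[] bound) {
    if (bound == null || regionKey.length == 0) {
        return false;                        // bound not configured, or open-ended region boundary
    }
    int cmp = Bytes.compareTo(regionKey, bound);
    boolean satisfies;
    switch (op) {
        case LESS:             satisfies = cmp < 0;  break;
        case LESS_OR_EQUAL:    satisfies = cmp <= 0; break;
        case GREATER:          satisfies = cmp > 0;  break;
        case GREATER_OR_EQUAL: satisfies = cmp >= 0; break;
        default:               satisfies = true;
    }
    return !satisfies;                       // e.g. skip when startKey >= lt_ for a "less than" bound
}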
Project: spork    File: HBaseTableInputFormat.java
@Override
public List<InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context)
throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    ListIterator<InputSplit> splitIter = splits.listIterator();
    while (splitIter.hasNext()) {
        TableSplit split = (TableSplit) splitIter.next();
        byte[] startKey = split.getStartRow();
        byte[] endKey = split.getEndRow();
        // Skip if the region doesn't satisfy configured options.
        if ((skipRegion(CompareOp.LESS, startKey, lt_)) ||
                (skipRegion(CompareOp.GREATER, endKey, gt_)) ||
                (skipRegion(CompareOp.GREATER, endKey, gte_)) ||
                (skipRegion(CompareOp.LESS_OR_EQUAL, startKey, lte_)) )  {
            splitIter.remove();
        }
    }
    return splits;
}
Project: PonIC    File: HBaseTableInputFormat.java
@Override
public List<InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context)
throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    ListIterator<InputSplit> splitIter = splits.listIterator();
    while (splitIter.hasNext()) {
        TableSplit split = (TableSplit) splitIter.next();
        byte[] startKey = split.getStartRow();
        byte[] endKey = split.getEndRow();
        // Skip if the region doesn't satisfy configured options.
        if ((skipRegion(CompareOp.LESS, startKey, lt_)) ||
                (skipRegion(CompareOp.GREATER, endKey, gt_)) ||
                (skipRegion(CompareOp.GREATER, endKey, gte_)) ||
                (skipRegion(CompareOp.LESS_OR_EQUAL, startKey, lte_)) )  {
            splitIter.remove();
        }
    }
    return splits;
}
Project: spliceengine    File: SMRecordReaderImplIT.java
@Test
public void completeMemstoreScan() throws Exception{
    List<String> names = new ArrayList<String>();
    names.add("COL1");
    names.add("COL2");              
    config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName()+".A", names).base64Encode());
    SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
    String tableName = sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName()+".A");
    HTable htable = new HTable(config,tableName);       
    Scan scan = new Scan();
    rr.setHTable(htable);
    rr.setScan(scan);
    SMSplit tableSplit = new SMSplit(new TableSplit(Bytes.toBytes(tableName), scan.getStartRow(),scan.getStopRow(),"sdfsdf"));
    rr.initialize(tableSplit, null);
    int i = 0;
    while (rr.nextKeyValue()) {
        i++;
        Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
        Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
        Assert.assertNotNull("Current Key is null", rr.getCurrentKey());            
    }
    Assert.assertEquals("incorrect results returned",1000,i);
}
Project: spliceengine    File: SMRecordReaderImplIT.java
@Test
public void emptyMemstoreScan() throws Exception{
    List<String> names = new ArrayList<String>();
    names.add("COL1");
    names.add("COL2");              
    config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName()+".D", names).base64Encode());
    SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
    String tableName = sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName()+".D");
    HTable htable = new HTable(config,tableName);       
    Scan scan = new Scan();
    rr.setHTable(htable);
    rr.setScan(scan);
    SMSplit tableSplit = new SMSplit(new TableSplit(Bytes.toBytes(tableName), scan.getStartRow(),scan.getStopRow(),"sdfsdf"));
    rr.initialize(tableSplit, null);
    int i = 0;
    while (rr.nextKeyValue()) {
        i++;
        Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
        Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
        Assert.assertNotNull("Current Key is null", rr.getCurrentKey());            
    }
    Assert.assertEquals("incorrect results returned",1000,i);
}
Project: spliceengine    File: SMRecordReaderImplIT.java
@Test
public void singleRegionScanWithOneStoreFileAndMemstore() throws Exception{
    List<String> names = new ArrayList<String>();
    names.add("COL1");
    names.add("COL2");              
    config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName()+".B", names).base64Encode());
    SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
    String tableName = sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName()+".B");
    HTable htable = new HTable(config,tableName);       
    Scan scan = new Scan();
    rr.setHTable(htable);
    rr.setScan(scan);
    SMSplit tableSplit = new SMSplit(new TableSplit(Bytes.toBytes(tableName), scan.getStartRow(),scan.getStopRow(),"sdfsdf"));
    rr.initialize(tableSplit, null);
    int i = 0;
    while (rr.nextKeyValue()) {
        i++;
        Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
        Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
        Assert.assertNotNull("Current Key is null", rr.getCurrentKey());            
    }
    Assert.assertEquals("incorrect results returned",1000,i);
}
Project: spliceengine    File: SMRecordReaderImplIT.java
@Test
public void twoRegionsWithMemstores() throws Exception{
    List<String> names = new ArrayList<String>();
    names.add("COL1");
    names.add("COL2");              
    config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName()+".C", names).base64Encode());
    SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
    String tableName = sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName()+".C");
    HTable htable = new HTable(config,tableName);       
    Scan scan = new Scan();
    rr.setHTable(htable);
    rr.setScan(scan);
    SMSplit tableSplit = new SMSplit(new TableSplit(Bytes.toBytes(tableName), scan.getStartRow(),scan.getStopRow(),"sdfsdf"));
    rr.initialize(tableSplit, null);
    int i = 0;
    while (rr.nextKeyValue()) {
        i++;
        Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
        Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
        Assert.assertNotNull("Current Key is null", rr.getCurrentKey());            
    }
    Assert.assertEquals("incorrect results returned",10000,i);
}
Project: spliceengine    File: SMRecordReaderImplIT.java
@Test
public void testScanAfterMajorCompaction() throws Exception{
    List<String> names = new ArrayList<String>();
    names.add("COL1");
    names.add("COL2");              
    config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName()+".E", names).base64Encode());
    SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
    String tableName = sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName()+".E");
    HTable htable = new HTable(config,tableName);       
    Scan scan = new Scan();
    rr.setHTable(htable);
    rr.setScan(scan);
    SMSplit tableSplit = new SMSplit(new TableSplit(Bytes.toBytes(tableName), scan.getStartRow(),scan.getStopRow(),"sdfsdf"));
    rr.initialize(tableSplit, null);
    int i = 0;
    while (rr.nextKeyValue()) {
        i++;
        Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
        Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
        Assert.assertNotNull("Current Key is null", rr.getCurrentKey());            
    }
    Assert.assertEquals("incorrect results returned",5000,i);
}
Project: sedge    File: HBaseTableInputFormat.java
@Override
public List<InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context)
throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    ListIterator<InputSplit> splitIter = splits.listIterator();
    while (splitIter.hasNext()) {
        TableSplit split = (TableSplit) splitIter.next();
        byte[] startKey = split.getStartRow();
        byte[] endKey = split.getEndRow();
        // Skip if the region doesn't satisfy configured options.
        if ((skipRegion(CompareOp.LESS, startKey, lt_)) ||
                (skipRegion(CompareOp.GREATER, endKey, gt_)) ||
                (skipRegion(CompareOp.GREATER, endKey, gte_)) ||
                (skipRegion(CompareOp.LESS_OR_EQUAL, startKey, lte_)) )  {
            splitIter.remove();
        }
    }
    return splits;
}
Project: hgraphdb    File: TableInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (isMock()) {
        if (table == null) {
            initialize(context);
        }
        List<InputSplit> splits = new ArrayList<>(1);
        TableSplit split = new TableSplit(getTable().getName(), getScan(),
                HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, "", 0);
        splits.add(split);
        return splits;
    } else {
        return super.getSplits(context);
    }
}
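hgraphdb's mock mode fabricates a single split spanning the whole (in-memory) table using the newer six-argument constructor (TableName, Scan, start row, end row, location, length). For context, a TableInputFormat like this is normally attached to a job through the stock HBase helper; a minimal sketch with illustrative class and table names:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class ScanJobSetup {
    // Identity mapper: each map() call receives one row of the TableSplit being read.
    static class RowMapper extends TableMapper<ImmutableBytesWritable, Result> {
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "table-split-demo");
        job.setJarByClass(ScanJobSetup.class);
        Scan scan = new Scan();              // full-table scan; one TableSplit per region
        TableMapReduceUtil.initTableMapperJob(
                "demo_table", scan, RowMapper.class,
                ImmutableBytesWritable.class, Result.class, job);
        job.setNumReduceTasks(0);            // map-only job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}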
Project: RStore    File: TableInputFormatBase.java
/**
 * Calculates the splits for the given table and scan and adds them to the
 * supplied list. One split is created per region that overlaps the scan's
 * row range.
 *
 * @param tTable
 *            The table to compute splits for.
 * @param tScan
 *            The scan whose start and stop rows bound the splits.
 * @param splits
 *            The list the resulting splits are added to.
 * @throws IOException
 *             When creating the list of splits fails.
 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
 */
void setSplitsForTable(HTable tTable, Scan tScan, List<InputSplit> splits)
        throws IOException {
    if (tTable == null) {
        throw new IOException("No table was provided.");
    }
    Pair<byte[][], byte[][]> keys = tTable.getStartEndKeys();
    if (keys == null || keys.getFirst() == null
            || keys.getFirst().length == 0) {
        throw new IOException("Expecting at least one region.");
    }
    int count = 0;
    for (int i = 0; i < keys.getFirst().length; i++) {
        if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
            continue;
        }
        String regionLocation = tTable.getRegionLocation(keys.getFirst()[i]).getHostname();
        byte[] startRow = tScan.getStartRow();
        byte[] stopRow = tScan.getStopRow();
        // determine if the given start and stop keys fall into the region
        if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes
                .compareTo(startRow, keys.getSecond()[i]) < 0)
                && (stopRow.length == 0 || Bytes.compareTo(stopRow,
                        keys.getFirst()[i]) > 0)) {
            byte[] splitStart = startRow.length == 0
                    || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys
                    .getFirst()[i] : startRow;
            byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo(
                    keys.getSecond()[i], stopRow) <= 0)
                    && keys.getSecond()[i].length > 0 ? keys.getSecond()[i]
                    : stopRow;
            InputSplit split = new TableSplit(tTable.getTableName(),
                    splitStart, splitStop, regionLocation);
            splits.add(split);
            if (LOG.isDebugEnabled())
                LOG.debug("getSplits: split -> " + (count++) + " -> "
                        + split);
        }
    }
}
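The boundary arithmetic above intersects the scan's row range with each region's key range: a region contributes a split only if the two ranges overlap, and the split's start and stop rows are the tighter of the two bounds on each side, with an empty key meaning open-ended. The same logic as a hypothetical standalone helper:

// Hypothetical helper restating the logic above: the split covers the overlap
// of the scan range with the region range; an empty key is open-ended.
static byte[][] intersectRanges(byte[] scanStart, byte[] scanStop,
                                byte[] regionStart, byte[] regionEnd) {
    byte[] splitStart = (scanStart.length == 0
            || Bytes.compareTo(regionStart, scanStart) >= 0) ? regionStart : scanStart;
    byte[] splitStop = ((scanStop.length == 0
            || Bytes.compareTo(regionEnd, scanStop) <= 0)
            && regionEnd.length > 0) ? regionEnd : scanStop;
    return new byte[][] { splitStart, splitStop };
}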
Project: spork    File: HBaseStorage.java
@Override
public WritableComparable<TableSplit> getSplitComparable(InputSplit split) throws IOException {
    if (split instanceof TableSplit) {
        return new TableSplitComparable((TableSplit) split);
    } else {
        throw new RuntimeException("LoadFunc expected split of type TableSplit but was " + split.getClass().getName());
    }
}
Project: HGraph    File: TableInputFormat.java
private void checkAndPatchRegionEndKey(LinkedList<InputSplit> newSplits, byte[] regionEndKey) {
  if (Arrays.equals(HConstants.EMPTY_BYTE_ARRAY, regionEndKey)) {
    TableSplit tmpSplit = (TableSplit) newSplits.removeLast();
    newSplits.add(new TableSplit(tmpSplit.getTableName(), tmpSplit.getStartRow(),
        regionEndKey, tmpSplit.getLocations()[0]));
  }
}
Project: spliceengine    File: HBasePartitioner.java
@Override
public void initialize() {
    List<Partition> partitions = ((SparkDataSet) dataSet).rdd.partitions();
    tableSplits = new ArrayList<>(partitions.size());
    for (Partition p : partitions) {
        NewHadoopPartition nhp = (NewHadoopPartition) p;
        SMSplit sms = (SMSplit) nhp.serializableHadoopSplit().value();
        TableSplit ts = sms.getSplit();
        if (ts.getStartRow() != null && Bytes.equals(ts.getStartRow(),ts.getEndRow()) && ts.getStartRow().length > 0) {
            // this would be an empty partition, with the same start and end key, so don't add it
            continue;
        }
        tableSplits.add(ts);
    }
}
Project: spliceengine    File: SMRecordReaderImpl.java
public void init(Configuration config, InputSplit split) throws IOException, InterruptedException { 
    if (LOG.isDebugEnabled())
        SpliceLogUtils.debug(LOG, "init");
    TaskContext.get().addTaskFailureListener(this);
    String tableScannerAsString = config.get(MRConstants.SPLICE_SCAN_INFO);
    if (tableScannerAsString == null)
        throw new IOException("splice scan info was not serialized to task, failing");
    byte[] scanStartKey = null;
    byte[] scanStopKey = null;
    try {
        builder = TableScannerBuilder.getTableScannerBuilderFromBase64String(tableScannerAsString);
        if (LOG.isTraceEnabled())
            SpliceLogUtils.trace(LOG, "config loaded builder=%s", builder);
        TableSplit tSplit = ((SMSplit) split).getSplit();
        DataScan scan = builder.getScan();
        scanStartKey = scan.getStartKey();
        scanStopKey = scan.getStopKey();
        if (Bytes.startComparator.compare(scanStartKey, tSplit.getStartRow()) < 0) {
            // the split itself is more restrictive
            scan.startKey(tSplit.getStartRow());
        }
        if (Bytes.endComparator.compare(scanStopKey, tSplit.getEndRow()) > 0) {
            // the split itself is more restrictive
            scan.stopKey(tSplit.getEndRow());
        }
        setScan(((HScan) scan).unwrapDelegate());
        // TODO (wjk): this seems weird (added with DB-4483)
        this.statisticsRun = AbstractSMInputFormat.oneSplitPerRegion(config);
        restart(scan.getStartKey());
    } catch (IOException ioe) {
        LOG.error(String.format("Received exception with scan %s, original start key %s, original stop key %s, split %s",
                scan, Bytes.toStringBinary(scanStartKey), Bytes.toStringBinary(scanStopKey), split), ioe);
        throw ioe;
    } catch (StandardException e) {
        throw new IOException(e);
    }
}
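Here the builder's scan range is tightened to the split's range using Splice Machine's own Bytes.startComparator and Bytes.endComparator (from its Bytes utility class, not HBase's), because empty keys need special treatment: an empty start key means minus infinity and an empty stop key means plus infinity, so a plain lexicographic compare would mis-order stop keys. An illustrative stop-key comparison (hypothetical helper, not the Splice Machine code):

// An empty stop key means "to the end of the table", so it must compare
// greater than any non-empty key; plain Bytes.compareTo would put it first.
static int compareStopKeys(byte[] a, byte[] b) {
    if (a.length == 0) return (b.length == 0) ? 0 : 1;   // empty acts as +infinity
    if (b.length == 0) return -1;
    return org.apache.hadoop.hbase.util.Bytes.compareTo(a, b);
}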
Project: spliceengine    File: AbstractSMInputFormat.java
private List<InputSplit> toSMSplits (List<Partition> splits) throws IOException {
    List<InputSplit> sMSplits = Lists.newArrayList();
    HBaseTableInfoFactory infoFactory = HBaseTableInfoFactory.getInstance(HConfiguration.getConfiguration());
    for(Partition split:splits) {
        SMSplit smSplit = new SMSplit(
                new TableSplit(
                        infoFactory.getTableInfo(split.getTableName()),
                        split.getStartKey(),
                        split.getEndKey(),
                        split.owningServer().getHostname()));
        sMSplits.add(smSplit);
    }
    return sMSplits;
}
Project: ditb    File: VerifyReplication.java
/**
 * Map method that compares every scanned row with the equivalent from
 * a distant cluster.
 * @param row  The current table row key.
 * @param value  The columns.
 * @param context  The current context.
 * @throws IOException When something is broken with the data.
 */
@Override
public void map(ImmutableBytesWritable row, final Result value,
                Context context)
    throws IOException {
  if (replicatedScanner == null) {
    Configuration conf = context.getConfiguration();
    final Scan scan = new Scan();
    scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
    long startTime = conf.getLong(NAME + ".startTime", 0);
    long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
    String families = conf.get(NAME + ".families", null);
    if(families != null) {
      String[] fams = families.split(",");
      for(String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }
    scan.setTimeRange(startTime, endTime);
    int versions = conf.getInt(NAME+".versions", -1);
    LOG.info("Setting number of version inside map as: " + versions);
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }

    final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
    HConnectionManager.execute(new HConnectable<Void>(conf) {
      @Override
      public Void connect(HConnection conn) throws IOException {
        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
            zkClusterKey, PEER_CONFIG_PREFIX);

        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        replicatedTable = new HTable(peerConf, tableName);
        scan.setStartRow(value.getRow());
        scan.setStopRow(tableSplit.getEndRow());
        replicatedScanner = replicatedTable.getScanner(scan);
        return null;
      }
    });
    currentCompareRowInPeerTable = replicatedScanner.next();
  }
  while (true) {
    if (currentCompareRowInPeerTable == null) {
      // reach the region end of peer table, row only in source table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
      break;
    }
    int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
    if (rowCmpRet == 0) {
      // rowkey is same, need to compare the content of the row
      try {
        Result.compareResults(value, currentCompareRowInPeerTable);
        context.getCounter(Counters.GOODROWS).increment(1);
      } catch (Exception e) {
        logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
        LOG.error("Exception while comparing row : " + e);
      }
      currentCompareRowInPeerTable = replicatedScanner.next();
      break;
    } else if (rowCmpRet < 0) {
      // row only exists in source table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
      break;
    } else {
      // row only exists in peer table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
        currentCompareRowInPeerTable);
      currentCompareRowInPeerTable = replicatedScanner.next();
    }
  }
}
Project: pbase    File: VerifyReplication.java
/**
 * Map method that compares every scanned row with the equivalent from
 * a distant cluster.
 * @param row  The current table row key.
 * @param value  The columns.
 * @param context  The current context.
 * @throws IOException When something is broken with the data.
 */
@Override
public void map(ImmutableBytesWritable row, final Result value,
                Context context)
    throws IOException {
  if (replicatedScanner == null) {
    Configuration conf = context.getConfiguration();
    final Scan scan = new Scan();
    scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
    long startTime = conf.getLong(NAME + ".startTime", 0);
    long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
    String families = conf.get(NAME + ".families", null);
    if(families != null) {
      String[] fams = families.split(",");
      for(String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }
    scan.setTimeRange(startTime, endTime);
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }

    final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
    HConnectionManager.execute(new HConnectable<Void>(conf) {
      @Override
      public Void connect(HConnection conn) throws IOException {
        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.create(conf);
        ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);

        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        // TODO: This HTable doesn't get closed.  Fix!
        Table replicatedTable = new HTable(peerConf, tableName);
        scan.setStartRow(value.getRow());
        scan.setStopRow(tableSplit.getEndRow());
        replicatedScanner = replicatedTable.getScanner(scan);
        return null;
      }
    });
    currentCompareRowInPeerTable = replicatedScanner.next();
  }
  while (true) {
    if (currentCompareRowInPeerTable == null) {
      // reach the region end of peer table, row only in source table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
      break;
    }
    int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
    if (rowCmpRet == 0) {
      // rowkey is same, need to compare the content of the row
      try {
        Result.compareResults(value, currentCompareRowInPeerTable);
        context.getCounter(Counters.GOODROWS).increment(1);
      } catch (Exception e) {
        logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
      }
      currentCompareRowInPeerTable = replicatedScanner.next();
      break;
    } else if (rowCmpRet < 0) {
      // row only exists in source table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
      break;
    } else {
      // row only exists in peer table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
        currentCompareRowInPeerTable);
      currentCompareRowInPeerTable = replicatedScanner.next();
    }
  }
}
Project: themis    File: TestThemisMapReduce.java
@Override
public void setup(Context context) {
  tableName = ((TableSplit)context.getInputSplit()).getTableName();
}
Project: spork    File: TableSplitComparable.java
public TableSplitComparable() {
    tsplit = new TableSplit();
}
Project: spork    File: TableSplitComparable.java
public TableSplitComparable(TableSplit tsplit) {
    this.tsplit = tsplit;
}
Project: spork    File: TableSplitComparable.java
@Override
public int compareTo(TableSplit split) {
    return tsplit.compareTo((TableSplit) split);
}
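Taken together, the three TableSplitComparable snippets give Pig a Writable wrapper whose ordering delegates to TableSplit.compareTo, which orders splits by start row (newer HBase versions compare the table name first). An illustrative use; the table name, rows, and hosts are made up:

TableSplit splitA = new TableSplit(Bytes.toBytes("demo_table"),
        Bytes.toBytes("row-0000"), Bytes.toBytes("row-5000"), "host-a");
TableSplit splitB = new TableSplit(Bytes.toBytes("demo_table"),
        Bytes.toBytes("row-5000"), Bytes.toBytes("row-9999"), "host-b");
TableSplitComparable wrapped = new TableSplitComparable(splitA);
System.out.println(wrapped.compareTo(splitB));    // negative: splitA's start row orders first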
Project: spliceengine    File: SMHiveSplit.java
public TableSplit getSplit() {
  return this.split.getSplit();
}
Project: spliceengine    File: SMSplit.java
public SMSplit() throws IOException{
  super(FSUtils.getRootDir(HConfiguration.unwrapDelegate()), 0, 0,null);
  split = new TableSplit();
}
Project: spliceengine    File: SMSplit.java
public SMSplit(TableSplit split) throws IOException{
  super(FSUtils.getRootDir(HConfiguration.unwrapDelegate()), 0, 0, null);
  this.split = split;
}
Project: spliceengine    File: SMSplit.java
public TableSplit getSplit() {
  return this.split;
}