private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits, byte[] startRow, byte[] stopRow) throws IOException, InterruptedException { TableSnapshotInputFormat tsif = new TableSnapshotInputFormat(); List<InputSplit> splits = tsif.getSplits(job); Assert.assertEquals(expectedNumSplits, splits.size()); HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); for (int i = 0; i < splits.size(); i++) { // validate input split InputSplit split = splits.get(i); Assert.assertTrue(split instanceof TableSnapshotRegionSplit); // validate record reader TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class); when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration()); RecordReader<ImmutableBytesWritable, Result> rr = tsif.createRecordReader(split, taskAttemptContext); rr.initialize(split, taskAttemptContext); // validate we can read all the data back while (rr.nextKeyValue()) { byte[] row = rr.getCurrentKey().get(); verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue()); rowTracker.addRow(row); } rr.close(); } // validate all rows are seen rowTracker.validate(); }
private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits, byte[] startRow, byte[] stopRow) throws IOException, InterruptedException { TableSnapshotInputFormat tsif = new TableSnapshotInputFormat(); List<InputSplit> splits = tsif.getSplits(job); Assert.assertEquals(expectedNumSplits, splits.size()); HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); boolean localityEnabled = job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT); for (int i = 0; i < splits.size(); i++) { // validate input split InputSplit split = splits.get(i); Assert.assertTrue(split instanceof TableSnapshotRegionSplit); if (localityEnabled) { Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0); } else { Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0); } // validate record reader TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class); when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration()); RecordReader<ImmutableBytesWritable, Result> rr = tsif.createRecordReader(split, taskAttemptContext); rr.initialize(split, taskAttemptContext); // validate we can read all the data back while (rr.nextKeyValue()) { byte[] row = rr.getCurrentKey().get(); verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue()); rowTracker.addRow(row); } rr.close(); } // validate all rows are seen rowTracker.validate(); }