/**
 * Demo entry point: runs a {@code min} aggregation over column {@code f:id} of table {@code t}
 * and prints the outcome on standard output.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
  Configuration conf = HBaseConfiguration.create();
  // Both the client-level and the IPC-level retry settings are set to one.
  conf.setInt("hbase.client.retries.number", 1);
  conf.setInt("ipc.client.connect.max.retries", 1);

  final byte[] tableName = Bytes.toBytes("t");

  Scan scan = new Scan();
  scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("id"));

  final ColumnInterpreter<Long, Long> interpreter = new LongStrColumnInterpreter();
  try {
    AggregationClient aggregationClient = new AggregationClient(conf);
    Long minimum = aggregationClient.min(tableName, interpreter, scan);
    System.out.println("The result is " + minimum);
  } catch (Throwable e) {
    // Failures are only reported through the stack trace.
    e.printStackTrace();
  }
}
/**
 * Asynchronously computes the median for the given scan.
 * <p>
 * The per-region sums are fetched first via {@code sumByRegion}; the actual median lookup is
 * then delegated to {@code findMedian}, which is handed the returned future.
 *
 * @param table the table to run the aggregation against
 * @param ci the column interpreter
 * @param scan the scan describing the range and column
 * @return a future that is completed exceptionally with the error raised by
 *         {@code sumByRegion}, or with a {@link NoSuchElementException} when no per-region
 *         sums were returned; otherwise it is passed on to {@code findMedian}
 */
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
    median(AsyncTable<AdvancedScanResultConsumer> table, ColumnInterpreter<R, S, P, Q, T> ci,
      Scan scan) {
  CompletableFuture<R> result = new CompletableFuture<>();
  sumByRegion(table, ci, scan).whenComplete((regionSums, failure) -> {
    if (failure != null) {
      result.completeExceptionally(failure);
      return;
    }
    if (regionSums.isEmpty()) {
      result.completeExceptionally(new NoSuchElementException());
      return;
    }
    // A new instance of the scan's class is built from the scan itself
    // (presumably a copy, so the caller's scan is left untouched -- TODO confirm).
    findMedian(result, table, ci, ReflectionUtils.newInstance(scan.getClass(), scan), regionSums);
  });
  return result;
}
/**
 * Validates the scan and assembles the protobuf request for a time series aggregation.
 *
 * @param scan the scan to validate and embed in the request
 * @param ci the column interpreter; its canonical class name and, when non-null, its request
 *          data are written into the request
 * @param canFamilyBeAbsent forwarded to {@code validateParameters}
 * @param intervalSeconds the time interval, in seconds, set on the request
 * @param timestampSecondsMin lower key timestamp bound; when {@code null} no range is set
 * @param timestampSecondsMax upper key timestamp bound; only read when
 *          {@code timestampSecondsMin} is non-null (NOTE(review): a {@code null} here is
 *          presumably not supported in that case -- confirm)
 * @param keyFilterPattern key timestamp filter pattern; only read when
 *          {@code timestampSecondsMin} is non-null
 * @return the built request
 * @throws IOException declared for the validation and scan conversion steps
 */
<R, S, P extends Message, Q extends Message, T extends Message> TimeseriesAggregateRequest
    validateArgAndGetPB(Scan scan, ColumnInterpreter<R, S, P, Q, T> ci, boolean canFamilyBeAbsent,
      int intervalSeconds, Integer timestampSecondsMin, Integer timestampSecondsMax,
      String keyFilterPattern) throws IOException {
  validateParameters(scan, canFamilyBeAbsent);

  final TimeseriesAggregateRequest.Builder request = TimeseriesAggregateRequest.newBuilder();
  request.setInterpreterClassName(ci.getClass().getCanonicalName());

  // Interpreter-specific payload is optional.
  final P interpreterData = ci.getRequestData();
  if (interpreterData != null) {
    request.setInterpreterSpecificBytes(interpreterData.toByteString());
  }

  request.setScan(ProtobufUtil.toScan(scan));
  request.setTimeIntervalSeconds(intervalSeconds);

  // The range is only attached when a minimum timestamp was supplied.
  if (timestampSecondsMin != null) {
    request.setRange(TimeseriesRange.newBuilder()
        .setKeyTimestampMin(timestampSecondsMin)
        .setKeyTimestampMax(timestampSecondsMax)
        .setKeyTimestampFilterPattern(keyFilterPattern)
        .build());
  }
  return request.build();
}
/**
 * Converts the per-bucket pairs into a protobuf response.
 * <p>
 * For every map entry one response entry is added: its first part is the serialized proto of
 * the pair's second element (promoted type), its second part is the pair's first element
 * encoded as an 8-byte long.
 *
 * @param results map from bucket key to a {@code Pair<Long, S>}
 * @param ci the column interpreter used to turn the promoted value into a proto message
 * @return the assembled response
 */
@Override
public TimeseriesAggregateProtos.TimeseriesAggregateResponse wrapForTransport(Map results,
    ColumnInterpreter ci) {
  Map<Long, Pair<Long, S>> averages = results;
  TimeseriesAggregateProtos.TimeseriesAggregateResponse.Builder response =
      TimeseriesAggregateProtos.TimeseriesAggregateResponse.newBuilder();

  for (Map.Entry<Long, Pair<Long, S>> bucket : averages.entrySet()) {
    Pair<Long, S> pair = bucket.getValue();

    TimeseriesAggregateProtos.TimeseriesAggregateResponseEntry.Builder value =
        TimeseriesAggregateProtos.TimeseriesAggregateResponseEntry.newBuilder();
    value.addFirstPart(ci.getProtoForPromotedType(pair.getSecond()).toByteString());

    // Absolute write keeps the position at 0, so all 8 bytes are copied below.
    ByteBuffer encodedLong = ByteBuffer.allocate(8).putLong(0, pair.getFirst());
    value.setSecondPart(ByteString.copyFrom(encodedLong));

    response.addEntry(TimeseriesAggregateProtos.TimeseriesAggregateResponseMapEntry.newBuilder()
        .setKey(bucket.getKey())
        .setValue(value.build())
        .build());
  }
  return response.build();
}
/**
 * Adds the value of the given cell to the running sum of every time range that contains
 * {@code timestamp}.
 * <p>
 * Sums are keyed by the minimum timestamp of the matching range. A bucket that has no sum
 * yet starts from {@code null}; a cell whose interpreted value is {@code null} leaves the
 * bucket's sum unchanged (the bucket is still written back to the map).
 *
 * @param results the map of running sums, updated in place
 * @param kv the cell to aggregate
 * @param ci the column interpreter used to read, promote and add values
 * @param columnFamily the column family passed to the interpreter
 * @param timestamp the timestamp used to select the matching time ranges
 * @param timeRanges the candidate time ranges
 * @return the same map instance that was passed in as {@code results}
 * @throws IOException if the interpreter fails to read the cell value
 */
@Override
public <T, S, P extends Message, Q extends Message, R extends Message> Map<Long, S> compute(
    Map results, Cell kv, ColumnInterpreter<T, S, P, Q, R> ci, byte[] columnFamily,
    long timestamp, List<TimeRange> timeRanges) throws IOException {
  Map<Long, S> sums = results;
  for (TimeRange range : timeRanges) {
    if (!range.withinTimeRange(timestamp)) {
      continue;
    }
    long bucketStart = range.getMin();
    // A single lookup is enough: an absent bucket and a null sum are treated the same way.
    S sum = sums.get(bucketStart);
    T cellValue = ci.getValue(columnFamily, CellUtil.cloneQualifier(kv), kv);
    if (cellValue != null) {
      sum = ci.add(sum, ci.castToReturnType(cellValue));
    }
    sums.put(bucketStart, sum);
  }
  return sums;
}
/**
 * Converts the per-bucket sums into a protobuf response.
 * <p>
 * Each map entry becomes one response entry whose first part is the serialized proto of the
 * promoted-type sum.
 *
 * @param results map from bucket key to sum
 * @param ci the column interpreter used to turn each sum into a proto message
 * @return the assembled response
 */
@Override
public TimeseriesAggregateProtos.TimeseriesAggregateResponse wrapForTransport(Map results,
    ColumnInterpreter ci) {
  Map<Long, S> sums = results;
  TimeseriesAggregateProtos.TimeseriesAggregateResponse.Builder response =
      TimeseriesAggregateProtos.TimeseriesAggregateResponse.newBuilder();

  for (Map.Entry<Long, S> bucket : sums.entrySet()) {
    TimeseriesAggregateProtos.TimeseriesAggregateResponseEntry.Builder value =
        TimeseriesAggregateProtos.TimeseriesAggregateResponseEntry.newBuilder();
    value.addFirstPart(ci.getProtoForPromotedType(bucket.getValue()).toByteString());

    response.addEntry(TimeseriesAggregateProtos.TimeseriesAggregateResponseMapEntry.newBuilder()
        .setKey(bucket.getKey())
        .setValue(value.build())
        .build());
  }
  return response.build();
}
/**
 * Updates the running minimum of every time range that contains {@code timestamp} with the
 * value of the given cell.
 * <p>
 * Minimums are keyed by the minimum timestamp of the matching range. The cell value replaces
 * the stored minimum when no minimum is stored yet, or when the value is non-null and
 * compares lower than the stored one.
 *
 * @param results the map of running minimums, updated in place
 * @param kv the cell to aggregate
 * @param ci the column interpreter used to read and compare values
 * @param columnFamily the column family passed to the interpreter
 * @param timestamp the timestamp used to select the matching time ranges
 * @param timeRanges the candidate time ranges
 * @return the same map instance that was passed in as {@code results}
 * @throws IOException if the interpreter fails to read the cell value
 */
@Override
public <T, S, P extends Message, Q extends Message, R extends Message> Map<Long, T> compute(
    Map results, Cell kv, ColumnInterpreter<T, S, P, Q, R> ci, byte[] columnFamily,
    long timestamp, List<TimeRange> timeRanges) throws IOException {
  Map<Long, T> minimums = results;
  for (TimeRange range : timeRanges) {
    if (!range.withinTimeRange(timestamp)) {
      continue;
    }
    long bucketStart = range.getMin();
    // A single lookup is enough: an absent bucket and a null minimum are treated the same way.
    T currentMin = minimums.get(bucketStart);
    T cellValue = ci.getValue(columnFamily, CellUtil.cloneQualifier(kv), kv);
    if (currentMin == null || (cellValue != null && ci.compare(cellValue, currentMin) < 0)) {
      currentMin = cellValue;
    }
    minimums.put(bucketStart, currentMin);
  }
  return minimums;
}
/**
 * Converts the per-bucket minimums into a protobuf response.
 * <p>
 * Each map entry becomes one response entry whose first part is the serialized proto of the
 * cell-type minimum.
 *
 * @param results map from bucket key to minimum
 * @param ci the column interpreter used to turn each minimum into a proto message
 * @return the assembled response
 */
@Override
public TimeseriesAggregateProtos.TimeseriesAggregateResponse wrapForTransport(Map results,
    ColumnInterpreter ci) {
  Map<Long, T> minimums = results;
  TimeseriesAggregateProtos.TimeseriesAggregateResponse.Builder response =
      TimeseriesAggregateProtos.TimeseriesAggregateResponse.newBuilder();

  for (Map.Entry<Long, T> bucket : minimums.entrySet()) {
    TimeseriesAggregateProtos.TimeseriesAggregateResponseEntry.Builder value =
        TimeseriesAggregateProtos.TimeseriesAggregateResponseEntry.newBuilder();
    value.addFirstPart(ci.getProtoForCellType(bucket.getValue()).toByteString());

    response.addEntry(TimeseriesAggregateProtos.TimeseriesAggregateResponseMapEntry.newBuilder()
        .setKey(bucket.getKey())
        .setValue(value.build())
        .build());
  }
  return response.build();
}
/**
 * Increments the counter of every time range that contains {@code timestamp}.
 * <p>
 * Counters are keyed by the minimum timestamp of the matching range; a range seen for the
 * first time starts at one. The cell, interpreter and column family are not consulted.
 *
 * @param results the map of running counts, updated in place
 * @param kv not used by this implementation
 * @param ci not used by this implementation
 * @param columnFamily not used by this implementation
 * @param timestamp the timestamp used to select the matching time ranges
 * @param timeRanges the candidate time ranges
 * @return the same map instance that was passed in as {@code results}
 * @throws IOException declared by the overridden method
 */
@Override
public <T, S, P extends Message, Q extends Message, R extends Message> Map compute(Map results,
    Cell kv, ColumnInterpreter<T, S, P, Q, R> ci, byte[] columnFamily, long timestamp,
    List<TimeRange> timeRanges) throws IOException {
  Map<Long, Long> counts = results;
  for (TimeRange range : timeRanges) {
    if (!range.withinTimeRange(timestamp)) {
      continue;
    }
    long bucketStart = range.getMin();
    long updated = counts.containsKey(bucketStart) ? counts.get(bucketStart) + 1 : 1L;
    counts.put(bucketStart, updated);
  }
  return counts;
}
/**
 * Converts the per-bucket counts into a protobuf response.
 * <p>
 * Each map entry becomes one response entry whose first part holds the count encoded with
 * {@code Bytes.toBytes}.
 *
 * @param results map from bucket key to count
 * @param ci not used by this implementation
 * @return the assembled response
 */
@Override
public TimeseriesAggregateProtos.TimeseriesAggregateResponse wrapForTransport(Map results,
    ColumnInterpreter ci) {
  Map<Long, Long> counts = results;
  TimeseriesAggregateProtos.TimeseriesAggregateResponse.Builder response =
      TimeseriesAggregateProtos.TimeseriesAggregateResponse.newBuilder();

  for (Map.Entry<Long, Long> bucket : counts.entrySet()) {
    TimeseriesAggregateProtos.TimeseriesAggregateResponseEntry.Builder value =
        TimeseriesAggregateProtos.TimeseriesAggregateResponseEntry.newBuilder();
    value.addFirstPart(ByteString.copyFrom(Bytes.toBytes(bucket.getValue())));

    response.addEntry(TimeseriesAggregateProtos.TimeseriesAggregateResponseMapEntry.newBuilder()
        .setKey(bucket.getKey())
        .setValue(value.build())
        .build());
  }
  return response.build();
}
/**
 * Updates the running maximum of every time range that contains {@code timestamp} with the
 * value of the given cell.
 * <p>
 * Maximums are keyed by the minimum timestamp of the matching range. The cell value replaces
 * the stored maximum when no maximum is stored yet, or when the value is non-null and
 * compares higher than the stored one.
 *
 * @param results the map of running maximums, updated in place
 * @param kv the cell to aggregate
 * @param ci the column interpreter used to read and compare values
 * @param columnFamily the column family passed to the interpreter
 * @param timestamp the timestamp used to select the matching time ranges
 * @param timeRanges the candidate time ranges
 * @return the same map instance that was passed in as {@code results}
 * @throws IOException if the interpreter fails to read the cell value
 */
@Override
public <T, S, P extends Message, Q extends Message, R extends Message> Map<Long, T> compute(
    Map results, Cell kv, ColumnInterpreter<T, S, P, Q, R> ci, byte[] columnFamily,
    long timestamp, List<TimeRange> timeRanges) throws IOException {
  Map<Long, T> maximums = results;
  for (TimeRange range : timeRanges) {
    if (!range.withinTimeRange(timestamp)) {
      continue;
    }
    long bucketStart = range.getMin();
    // A single lookup is enough: an absent bucket and a null maximum are treated the same way.
    T currentMax = maximums.get(bucketStart);
    T cellValue = ci.getValue(columnFamily, CellUtil.cloneQualifier(kv), kv);
    if (currentMax == null || (cellValue != null && ci.compare(cellValue, currentMax) > 0)) {
      currentMax = cellValue;
    }
    maximums.put(bucketStart, currentMax);
  }
  return maximums;
}
/**
 * Converts the per-bucket maximums into a protobuf response.
 * <p>
 * Each map entry becomes one response entry whose first part is the serialized proto of the
 * cell-type maximum.
 *
 * @param results map from bucket key to maximum
 * @param ci the column interpreter used to turn each maximum into a proto message
 * @return the assembled response
 */
@Override
public TimeseriesAggregateProtos.TimeseriesAggregateResponse wrapForTransport(Map results,
    ColumnInterpreter ci) {
  Map<Long, T> maximums = results;
  TimeseriesAggregateProtos.TimeseriesAggregateResponse.Builder response =
      TimeseriesAggregateProtos.TimeseriesAggregateResponse.newBuilder();

  for (Map.Entry<Long, T> bucket : maximums.entrySet()) {
    TimeseriesAggregateProtos.TimeseriesAggregateResponseEntry.Builder value =
        TimeseriesAggregateProtos.TimeseriesAggregateResponseEntry.newBuilder();
    value.addFirstPart(ci.getProtoForCellType(bucket.getValue()).toByteString());

    response.addEntry(TimeseriesAggregateProtos.TimeseriesAggregateResponseMapEntry.newBuilder()
        .setKey(bucket.getKey())
        .setValue(value.build())
        .build());
  }
  return response.build();
}
/**
 * Returns the maximum value of a column for a given column family over the scanned range.
 * If the qualifier is null, the maximum over all values of the family is returned.
 * <p>
 * Each region is asked for its own maximum through the {@code AggregateService} coprocessor;
 * the per-region answers are then reduced with {@code ci.compare}.
 *
 * @param table the table to query
 * @param ci the column interpreter used to decode and compare values
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the maximum value, or {@code null} when no region reported one
 * @throws Throwable exceptions are propagated as thrown; the caller is expected to handle
 *           them
 */
public <R, S, P extends Message, Q extends Message, T extends Message> R max(final Table table,
    final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
  final AggregateRequest request = validateArgAndGetPB(scan, ci, false);

  /** Keeps the largest per-region result seen so far. */
  class MaxCallBack implements Batch.Callback<R> {
    R max = null;

    R getMax() {
      return max;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, R result) {
      boolean replace = max == null || (result != null && ci.compare(max, result) < 0);
      if (replace) {
        max = result;
      }
    }
  }

  final MaxCallBack maxCallback = new MaxCallBack();
  table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
    new Batch.Call<AggregateService, R>() {
      @Override
      public R call(AggregateService instance) throws IOException {
        ServerRpcController controller = new ServerRpcController();
        BlockingRpcCallback<AggregateResponse> done =
            new BlockingRpcCallback<AggregateResponse>();
        instance.getMax(controller, request, done);
        AggregateResponse response = done.get();
        if (controller.failedOnException()) {
          throw controller.getFailedOn();
        }
        if (response.getFirstPartCount() <= 0) {
          return null;
        }
        // NOTE(review): 3 presumably selects the Q type argument of the interpreter -- confirm.
        Q cellProto =
            ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, response.getFirstPart(0));
        return ci.getCellValueFromProto(cellProto);
      }
    }, maxCallback);
  return maxCallback.getMax();
}
/**
 * Returns the row count, obtained by adding up the counts reported by the individual regions.
 * <p>
 * As noted in the original documentation: when the qualifier is null a FirstKeyValueFilter
 * is used to optimise the operation; when a qualifier is provided that filter cannot be used,
 * because it may skip to the next row although the value read is not for the given qualifier,
 * which would leave that row uncounted. That filter handling is not part of this method.
 *
 * @param table the table to query
 * @param ci the column interpreter
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the sum of the per-region row counts
 * @throws Throwable exceptions are propagated as thrown
 */
public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
    final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
    throws Throwable {
  final AggregateRequest request = validateArgAndGetPB(scan, ci, true);

  /** Accumulates the per-region counts. */
  class RowNumCallback implements Batch.Callback<Long> {
    private final AtomicLong total = new AtomicLong(0);

    public long getRowNumCount() {
      return total.get();
    }

    @Override
    public void update(byte[] region, byte[] row, Long result) {
      total.addAndGet(result);
    }
  }

  final RowNumCallback counter = new RowNumCallback();
  table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
    new Batch.Call<AggregateService, Long>() {
      @Override
      public Long call(AggregateService instance) throws IOException {
        ServerRpcController controller = new ServerRpcController();
        BlockingRpcCallback<AggregateResponse> done =
            new BlockingRpcCallback<AggregateResponse>();
        instance.getRowNum(controller, request, done);
        AggregateResponse response = done.get();
        if (controller.failedOnException()) {
          throw controller.getFailedOn();
        }
        byte[] encoded = getBytesFromResponse(response.getFirstPart(0));
        // Copy into an 8-byte buffer and read the long from its start.
        return ByteBuffer.allocate(8).put(encoded).getLong(0);
      }
    }, counter);
  return counter.getRowNumCount();
}
/**
 * Validates the scan and assembles the protobuf request sent to the aggregate coprocessor.
 *
 * @param scan the scan to validate and embed in the request
 * @param ci the column interpreter; its canonical class name and, when non-null, its request
 *          data are written into the request
 * @param canFamilyBeAbsent forwarded to {@code validateParameters}
 * @return the built request
 * @throws IOException declared for the validation and scan conversion steps
 */
<R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
    validateArgAndGetPB(Scan scan, ColumnInterpreter<R, S, P, Q, T> ci, boolean canFamilyBeAbsent)
    throws IOException {
  validateParameters(scan, canFamilyBeAbsent);

  final AggregateRequest.Builder request = AggregateRequest.newBuilder();
  request.setInterpreterClassName(ci.getClass().getCanonicalName());

  // Interpreter-specific payload is optional.
  final P interpreterData = ci.getRequestData();
  if (interpreterData != null) {
    request.setInterpreterSpecificBytes(interpreterData.toByteString());
  }

  request.setScan(ProtobufUtil.toScan(scan));
  return request.build();
}
/**
 * Returns the maximum value of a column for a given column family over the scanned range.
 * If the qualifier is null, the maximum over all values of the family is returned.
 * <p>
 * Each region is asked for its own maximum through the {@code AggregateService} coprocessor;
 * the per-region answers are then reduced with {@code ci.compare}.
 *
 * @param table the table to query
 * @param ci the column interpreter used to decode and compare values
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the maximum value, or {@code null} when no region reported one
 * @throws Throwable exceptions are propagated as thrown; the caller is expected to handle
 *           them
 */
public <R, S, P extends Message, Q extends Message, T extends Message> R max(final Table table,
    final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
  final AggregateRequest request = validateArgAndGetPB(scan, ci, false);

  /** Keeps the largest per-region result seen so far. */
  class MaxCallBack implements Batch.Callback<R> {
    R max = null;

    R getMax() {
      return max;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, R result) {
      boolean replace = max == null || (result != null && ci.compare(max, result) < 0);
      if (replace) {
        max = result;
      }
    }
  }

  final MaxCallBack maxCallback = new MaxCallBack();
  table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
    new Batch.Call<AggregateService, R>() {
      @Override
      public R call(AggregateService instance) throws IOException {
        ServerRpcController controller = new ServerRpcController();
        BlockingRpcCallback<AggregateResponse> done =
            new BlockingRpcCallback<AggregateResponse>();
        instance.getMax(controller, request, done);
        AggregateResponse response = done.get();
        if (controller.failedOnException()) {
          throw controller.getFailedOn();
        }
        if (response.getFirstPartCount() <= 0) {
          return null;
        }
        // NOTE(review): 3 presumably selects the Q type argument of the interpreter -- confirm.
        Q cellProto =
            ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, response.getFirstPart(0));
        return ci.getCellValueFromProto(cellProto);
      }
    }, maxCallback);
  return maxCallback.getMax();
}
/**
 * Returns the row count, obtained by adding up the counts reported by the individual regions.
 * <p>
 * As noted in the original documentation: when the qualifier is null a FirstKeyValueFilter
 * is used to optimise the operation; when a qualifier is provided that filter cannot be used,
 * because it may skip to the next row although the value read is not for the given qualifier,
 * which would leave that row uncounted. That filter handling is not part of this method.
 *
 * @param table the table to query
 * @param ci the column interpreter
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the sum of the per-region row counts
 * @throws Throwable exceptions are propagated as thrown
 */
public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
    final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
    throws Throwable {
  final AggregateRequest request = validateArgAndGetPB(scan, ci, true);

  /** Accumulates the per-region counts. */
  class RowNumCallback implements Batch.Callback<Long> {
    private final AtomicLong total = new AtomicLong(0);

    public long getRowNumCount() {
      return total.get();
    }

    @Override
    public void update(byte[] region, byte[] row, Long result) {
      total.addAndGet(result);
    }
  }

  final RowNumCallback counter = new RowNumCallback();
  table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
    new Batch.Call<AggregateService, Long>() {
      @Override
      public Long call(AggregateService instance) throws IOException {
        ServerRpcController controller = new ServerRpcController();
        BlockingRpcCallback<AggregateResponse> done =
            new BlockingRpcCallback<AggregateResponse>();
        instance.getRowNum(controller, request, done);
        AggregateResponse response = done.get();
        if (controller.failedOnException()) {
          throw controller.getFailedOn();
        }
        byte[] encoded = getBytesFromResponse(response.getFirstPart(0));
        // Copy into an 8-byte buffer and read the long from its start.
        return ByteBuffer.allocate(8).put(encoded).getLong(0);
      }
    }, counter);
  return counter.getRowNumCount();
}
/**
 * Validates the scan and assembles the protobuf request sent to the aggregate coprocessor.
 *
 * @param scan the scan to validate and embed in the request
 * @param ci the column interpreter; its canonical class name and, when non-null, its request
 *          data are written into the request
 * @param canFamilyBeAbsent forwarded to {@code validateParameters}
 * @return the built request
 * @throws IOException declared for the validation and scan conversion steps
 */
<R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
    validateArgAndGetPB(Scan scan, ColumnInterpreter<R, S, P, Q, T> ci, boolean canFamilyBeAbsent)
    throws IOException {
  validateParameters(scan, canFamilyBeAbsent);

  final AggregateRequest.Builder request = AggregateRequest.newBuilder();
  request.setInterpreterClassName(ci.getClass().getCanonicalName());

  // Interpreter-specific payload is optional.
  final P interpreterData = ci.getRequestData();
  if (interpreterData != null) {
    request.setInterpreterSpecificBytes(interpreterData.toByteString());
  }

  request.setScan(ProtobufUtil.toScan(scan));
  return request.build();
}
/**
 * Returns the maximum value of a column for a given column family over the scanned range.
 * If the qualifier is null, the maximum over all values of the family is returned.
 * <p>
 * A table handle is opened for the call and closed again before returning; per-region
 * maximums are obtained through {@code AggregateProtocol} and reduced with
 * {@code ci.compare}.
 *
 * @param tableName the name of the table to query
 * @param ci the column interpreter used to compare values
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the maximum value, or {@code null} when no region reported one
 * @throws Throwable exceptions are propagated as thrown; the caller is expected to handle
 *           them
 */
public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci, final Scan scan)
    throws Throwable {
  validateParameters(scan);

  /** Keeps the largest per-region result seen so far. */
  class MaxCallBack implements Batch.Callback<R> {
    R max = null;

    R getMax() {
      return max;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, R result) {
      boolean replace = max == null || (result != null && ci.compare(max, result) < 0);
      if (replace) {
        max = result;
      }
    }
  }

  final MaxCallBack maxCallback = new MaxCallBack();
  // If opening the table fails there is nothing to close, so it happens outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan.getStopRow(),
      new Batch.Call<AggregateProtocol, R>() {
        @Override
        public R call(AggregateProtocol instance) throws IOException {
          return instance.getMax(ci, scan);
        }
      }, maxCallback);
  } finally {
    table.close();
  }
  return maxCallback.getMax();
}
/**
 * Returns the minimum value of a column for a given column family over the scanned range.
 * If the qualifier is null, the minimum over all values of the family is returned.
 * <p>
 * A table handle is opened for the call and closed again before returning; per-region
 * minimums are obtained through {@code AggregateProtocol} and reduced with
 * {@code ci.compare}. The final value is logged at debug level.
 *
 * @param tableName the name of the table to query
 * @param ci the column interpreter used to compare values
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the minimum value, or {@code null} when no region reported one
 * @throws Throwable exceptions are propagated as thrown
 */
public <R, S> R min(final byte[] tableName, final ColumnInterpreter<R, S> ci, final Scan scan)
    throws Throwable {
  validateParameters(scan);

  /** Keeps the smallest per-region result seen so far. */
  class MinCallBack implements Batch.Callback<R> {
    private R min = null;

    public R getMinimum() {
      return min;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, R result) {
      boolean replace = min == null || (result != null && ci.compare(result, min) < 0);
      if (replace) {
        min = result;
      }
    }
  }

  final MinCallBack minCallback = new MinCallBack();
  // If opening the table fails there is nothing to close, so it happens outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan.getStopRow(),
      new Batch.Call<AggregateProtocol, R>() {
        @Override
        public R call(AggregateProtocol instance) throws IOException {
          return instance.getMin(ci, scan);
        }
      }, minCallback);
  } finally {
    table.close();
  }
  final R minimum = minCallback.getMinimum();
  log.debug("Min fom all regions is: " + minimum);
  return minimum;
}
/**
 * Returns the row count, obtained by adding up the counts reported by the individual regions.
 * <p>
 * As noted in the original documentation: when the qualifier is null a FirstKeyValueFilter
 * is used to optimise the operation; when a qualifier is provided that filter cannot be used,
 * because it may skip to the next row although the value read is not for the given qualifier,
 * which would leave that row uncounted. That filter handling is not part of this method.
 * <p>
 * A table handle is opened for the call and closed again before returning.
 *
 * @param tableName the name of the table to query
 * @param ci the column interpreter
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the sum of the per-region row counts
 * @throws Throwable exceptions are propagated as thrown
 */
public <R, S> long rowCount(final byte[] tableName, final ColumnInterpreter<R, S> ci,
    final Scan scan) throws Throwable {
  validateParameters(scan);

  /** Accumulates the per-region counts. */
  class RowNumCallback implements Batch.Callback<Long> {
    private final AtomicLong total = new AtomicLong(0);

    public long getRowNumCount() {
      return total.get();
    }

    @Override
    public void update(byte[] region, byte[] row, Long result) {
      total.addAndGet(result);
    }
  }

  final RowNumCallback counter = new RowNumCallback();
  // If opening the table fails there is nothing to close, so it happens outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan.getStopRow(),
      new Batch.Call<AggregateProtocol, Long>() {
        @Override
        public Long call(AggregateProtocol instance) throws IOException {
          return instance.getRowNum(ci, scan);
        }
      }, counter);
  } finally {
    table.close();
  }
  return counter.getRowNumCount();
}
/**
 * Sums up the values returned from the individual regions. If the qualifier is null, all
 * column qualifiers of the given family are summed.
 * <p>
 * A table handle is opened for the call and closed again before returning; per-region sums
 * are obtained through {@code AggregateProtocol} and combined with {@code ci.add}.
 *
 * @param tableName the name of the table to query
 * @param ci the column interpreter used to add the per-region sums
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the combined sum; {@code null} when no region result was received
 * @throws Throwable exceptions are propagated as thrown
 */
public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci, final Scan scan)
    throws Throwable {
  validateParameters(scan);

  /** Folds every per-region sum into a running total. */
  class SumCallBack implements Batch.Callback<S> {
    S sumVal = null;

    public S getSumResult() {
      return sumVal;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, S result) {
      sumVal = ci.add(sumVal, result);
    }
  }

  final SumCallBack sumCallback = new SumCallBack();
  // If opening the table fails there is nothing to close, so it happens outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan.getStopRow(),
      new Batch.Call<AggregateProtocol, S>() {
        @Override
        public S call(AggregateProtocol instance) throws IOException {
          return instance.getSum(ci, scan);
        }
      }, sumCallback);
  } finally {
    table.close();
  }
  return sumCallback.getSumResult();
}
/**
 * Collects the inputs needed to compute an average: the global sum and the global row count.
 * Both are obtained by combining the region-level sum and row count returned by every
 * region.
 * <p>
 * A table handle is opened for the call and closed again before returning.
 *
 * @param tableName the name of the table to query
 * @param ci the column interpreter used to add the per-region sums
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return a pair of (combined sum, total row count)
 * @throws Throwable exceptions are propagated as thrown
 */
private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,
    final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
  validateParameters(scan);

  /** Folds every per-region (sum, count) pair into running totals. */
  class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
    S sum = null;
    long rowCount = 0L;

    public Pair<S, Long> getAvgArgs() {
      return new Pair<S, Long>(sum, rowCount);
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
      sum = ci.add(sum, result.getFirst());
      rowCount += result.getSecond();
    }
  }

  final AvgCallBack avgCallback = new AvgCallBack();
  // If opening the table fails there is nothing to close, so it happens outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan.getStopRow(),
      new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
        @Override
        public Pair<S, Long> call(AggregateProtocol instance) throws IOException {
          return instance.getAvg(ci, scan);
        }
      }, avgCallback);
  } finally {
    table.close();
  }
  return avgCallback.getAvgArgs();
}
/**
 * Returns the maximum value of a column for a given column family over the scanned range.
 * If the qualifier is null, the maximum over all values of the family is returned.
 * <p>
 * Each region is asked for its own maximum through the {@code AggregateService} coprocessor;
 * the per-region answers are then reduced with {@code ci.compare}.
 *
 * @param table the table to query
 * @param ci the column interpreter used to decode and compare values
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the maximum value, or {@code null} when no region reported one
 * @throws Throwable exceptions are propagated as thrown; the caller is expected to handle
 *           them
 */
public <R, S, P extends Message, Q extends Message, T extends Message> R max(final Table table,
    final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
  final AggregateRequest request = validateArgAndGetPB(scan, ci, false);

  /** Keeps the largest per-region result seen so far. */
  class MaxCallBack implements Batch.Callback<R> {
    R max = null;

    R getMax() {
      return max;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, R result) {
      boolean replace = max == null || (result != null && ci.compare(max, result) < 0);
      if (replace) {
        max = result;
      }
    }
  }

  final MaxCallBack maxCallback = new MaxCallBack();
  table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
    new Batch.Call<AggregateService, R>() {
      @Override
      public R call(AggregateService instance) throws IOException {
        ServerRpcController controller = new ServerRpcController();
        BlockingRpcCallback<AggregateResponse> done =
            new BlockingRpcCallback<AggregateResponse>();
        instance.getMax(controller, request, done);
        AggregateResponse response = done.get();
        if (controller.failedOnException()) {
          throw controller.getFailedOn();
        }
        if (response.getFirstPartCount() <= 0) {
          return null;
        }
        // NOTE(review): 3 presumably selects the Q type argument of the interpreter -- confirm.
        Q cellProto =
            ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, response.getFirstPart(0));
        return ci.getCellValueFromProto(cellProto);
      }
    }, maxCallback);
  return maxCallback.getMax();
}
/**
 * Returns the row count, obtained by adding up the counts reported by the individual regions.
 * <p>
 * As noted in the original documentation: when the qualifier is null a FirstKeyValueFilter
 * is used to optimise the operation; when a qualifier is provided that filter cannot be used,
 * because it may skip to the next row although the value read is not for the given qualifier,
 * which would leave that row uncounted. That filter handling is not part of this method.
 *
 * @param table the table to query
 * @param ci the column interpreter
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the sum of the per-region row counts
 * @throws Throwable exceptions are propagated as thrown
 */
public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
    final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
    throws Throwable {
  final AggregateRequest request = validateArgAndGetPB(scan, ci, true);

  /** Accumulates the per-region counts. */
  class RowNumCallback implements Batch.Callback<Long> {
    private final AtomicLong total = new AtomicLong(0);

    public long getRowNumCount() {
      return total.get();
    }

    @Override
    public void update(byte[] region, byte[] row, Long result) {
      total.addAndGet(result);
    }
  }

  final RowNumCallback counter = new RowNumCallback();
  table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
    new Batch.Call<AggregateService, Long>() {
      @Override
      public Long call(AggregateService instance) throws IOException {
        ServerRpcController controller = new ServerRpcController();
        BlockingRpcCallback<AggregateResponse> done =
            new BlockingRpcCallback<AggregateResponse>();
        instance.getRowNum(controller, request, done);
        AggregateResponse response = done.get();
        if (controller.failedOnException()) {
          throw controller.getFailedOn();
        }
        byte[] encoded = getBytesFromResponse(response.getFirstPart(0));
        // Copy into an 8-byte buffer and read the long from its start.
        return ByteBuffer.allocate(8).put(encoded).getLong(0);
      }
    }, counter);
  return counter.getRowNumCount();
}
/**
 * Returns the maximum value of a column for a given column family over the scanned range.
 * If the qualifier is null, the maximum over all values of the family is returned.
 * <p>
 * Opens a table handle for {@code tableName}, delegates to the table-based {@code max}
 * overload and closes the handle before returning.
 *
 * @param tableName the name of the table to query
 * @param ci the column interpreter
 * @param scan the scan describing the range and column
 * @return the maximum value as computed by the delegate
 * @throws Throwable exceptions are propagated as thrown; the caller is expected to handle
 *           them
 */
public <R, S, P extends Message, Q extends Message, T extends Message> R max(
    final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
    throws Throwable {
  // If opening the table fails there is nothing to close, so it happens outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    return max(table, ci, scan);
  } finally {
    table.close();
  }
}
/**
 * Returns the maximum value of a column for a given column family over the scanned range.
 * If the qualifier is null, the maximum over all values of the family is returned.
 * <p>
 * Each region is asked for its own maximum through the {@code AggregateService} coprocessor;
 * the per-region answers are then reduced with {@code ci.compare}.
 *
 * @param table the table to query
 * @param ci the column interpreter used to decode and compare values
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the maximum value, or {@code null} when no region reported one
 * @throws Throwable exceptions are propagated as thrown; the caller is expected to handle
 *           them
 */
public <R, S, P extends Message, Q extends Message, T extends Message> R max(final HTable table,
    final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
  final AggregateRequest request = validateArgAndGetPB(scan, ci, false);

  /** Keeps the largest per-region result seen so far. */
  class MaxCallBack implements Batch.Callback<R> {
    R max = null;

    R getMax() {
      return max;
    }

    @Override
    public synchronized void update(byte[] region, byte[] row, R result) {
      boolean replace = max == null || (result != null && ci.compare(max, result) < 0);
      if (replace) {
        max = result;
      }
    }
  }

  final MaxCallBack maxCallback = new MaxCallBack();
  table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
    new Batch.Call<AggregateService, R>() {
      @Override
      public R call(AggregateService instance) throws IOException {
        ServerRpcController controller = new ServerRpcController();
        BlockingRpcCallback<AggregateResponse> done =
            new BlockingRpcCallback<AggregateResponse>();
        instance.getMax(controller, request, done);
        AggregateResponse response = done.get();
        if (controller.failedOnException()) {
          throw controller.getFailedOn();
        }
        if (response.getFirstPartCount() <= 0) {
          return null;
        }
        // NOTE(review): 3 presumably selects the Q type argument of the interpreter -- confirm.
        Q cellProto =
            ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, response.getFirstPart(0));
        return ci.getCellValueFromProto(cellProto);
      }
    }, maxCallback);
  return maxCallback.getMax();
}
/**
 * Returns the minimum value of a column for a given column family over the scanned range.
 * If the qualifier is null, the minimum over all values of the family is returned.
 * <p>
 * Opens a table handle for {@code tableName}, delegates to the table-based {@code min}
 * overload and closes the handle before returning.
 *
 * @param tableName the name of the table to query
 * @param ci the column interpreter
 * @param scan the scan describing the range and column
 * @return the minimum value as computed by the delegate
 * @throws Throwable exceptions are propagated as thrown
 */
public <R, S, P extends Message, Q extends Message, T extends Message> R min(
    final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
    throws Throwable {
  // If opening the table fails there is nothing to close, so it happens outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    return min(table, ci, scan);
  } finally {
    table.close();
  }
}
/**
 * Returns the row count, obtained by adding up the counts reported by the individual regions.
 * <p>
 * As noted in the original documentation: when the qualifier is null a FirstKeyValueFilter
 * is used to optimise the operation; when a qualifier is provided that filter cannot be used,
 * because it may skip to the next row although the value read is not for the given qualifier,
 * which would leave that row uncounted. That filter handling is not part of this method.
 * <p>
 * Opens a table handle for {@code tableName}, delegates to the table-based {@code rowCount}
 * overload and closes the handle before returning.
 *
 * @param tableName the name of the table to query
 * @param ci the column interpreter
 * @param scan the scan describing the range and column
 * @return the row count as computed by the delegate
 * @throws Throwable exceptions are propagated as thrown
 */
public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
    final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
    throws Throwable {
  // If opening the table fails there is nothing to close, so it happens outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    return rowCount(table, ci, scan);
  } finally {
    table.close();
  }
}
/**
 * Returns the row count, obtained by adding up the counts reported by the individual regions.
 * <p>
 * As noted in the original documentation: when the qualifier is null a FirstKeyValueFilter
 * is used to optimise the operation; when a qualifier is provided that filter cannot be used,
 * because it may skip to the next row although the value read is not for the given qualifier,
 * which would leave that row uncounted. That filter handling is not part of this method.
 *
 * @param table the table to query
 * @param ci the column interpreter
 * @param scan the scan; its start and stop rows bound the coprocessor call
 * @return the sum of the per-region row counts
 * @throws Throwable exceptions are propagated as thrown
 */
public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
    final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
    throws Throwable {
  final AggregateRequest request = validateArgAndGetPB(scan, ci, true);

  /** Accumulates the per-region counts. */
  class RowNumCallback implements Batch.Callback<Long> {
    private final AtomicLong total = new AtomicLong(0);

    public long getRowNumCount() {
      return total.get();
    }

    @Override
    public void update(byte[] region, byte[] row, Long result) {
      total.addAndGet(result);
    }
  }

  final RowNumCallback counter = new RowNumCallback();
  table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
    new Batch.Call<AggregateService, Long>() {
      @Override
      public Long call(AggregateService instance) throws IOException {
        ServerRpcController controller = new ServerRpcController();
        BlockingRpcCallback<AggregateResponse> done =
            new BlockingRpcCallback<AggregateResponse>();
        instance.getRowNum(controller, request, done);
        AggregateResponse response = done.get();
        if (controller.failedOnException()) {
          throw controller.getFailedOn();
        }
        byte[] encoded = getBytesFromResponse(response.getFirstPart(0));
        // Copy into an 8-byte buffer and read the long from its start.
        return ByteBuffer.allocate(8).put(encoded).getLong(0);
      }
    }, counter);
  return counter.getRowNumCount();
}
/**
 * Sums up the values returned from the various regions of the named table.
 * In case the qualifier is null, the summation covers all the column
 * qualifiers in the given family.
 * <p>
 * An {@link HTable} is opened for {@code tableName} using this client's
 * configuration, the work is delegated to the {@code sum} overload that
 * takes the table, and the table is closed before returning.
 *
 * @param tableName name of the table to aggregate over
 * @param ci column interpreter passed through to the delegate
 * @param scan scan describing the rows/columns to consider
 * @return the sum, of type {@code S}
 * @throws Throwable if opening the table, the delegate call, or closing the
 *           table fails
 */
public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
    final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
    throws Throwable {
  // If the constructor throws there is nothing to close, so it sits outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    return sum(table, ci, scan);
  } finally {
    table.close();
  }
}
/**
 * Gathers the inputs needed to compute an average for the named table: the
 * sum and the row count fetched from all the corresponding regions. The
 * approach is to build a global sum of the region-level sums and row counts,
 * from which the caller then computes the average.
 * <p>
 * An {@link HTable} is opened for {@code tableName} using this client's
 * configuration, the work is delegated to the {@code getAvgArgs} overload
 * that takes the table, and the table is closed before returning.
 *
 * @param tableName name of the table to aggregate over
 * @param ci column interpreter passed through to the delegate
 * @param scan scan describing the rows/columns to consider
 * @return the pair produced by the table-based overload
 * @throws Throwable if opening the table, the delegate call, or closing the
 *           table fails
 */
private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
    final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
    throws Throwable {
  // If the constructor throws there is nothing to close, so it sits outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    return getAvgArgs(table, ci, scan);
  } finally {
    table.close();
  }
}
/**
 * Client-side handle for calling the std method for a given cf-cq
 * combination on the named table. The extra call layer exists because the
 * result must be a decimal value regardless of what the column interpreter
 * says, so the result is returned as a {@code double}.
 * <p>
 * An {@link HTable} is opened for {@code tableName} using this client's
 * configuration, the work is delegated to the {@code std} overload that
 * takes the table, and the table is closed before returning.
 *
 * @param tableName name of the table to aggregate over
 * @param ci column interpreter passed through to the delegate
 * @param scan scan describing the rows/columns to consider
 * @return the std value as a double
 * @throws Throwable if opening the table, the delegate call, or closing the
 *           table fails
 */
public <R, S, P extends Message, Q extends Message, T extends Message> double std(
    final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan)
    throws Throwable {
  // If the constructor throws there is nothing to close, so it sits outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    return std(table, ci, scan);
  } finally {
    table.close();
  }
}
/**
 * Client-side handle for calling the median method for a given cf-cq
 * combination on the named table.
 * <p>
 * An {@link HTable} is opened for {@code tableName} using this client's
 * configuration, the work is delegated to the {@code median} overload that
 * takes the table, and the table is closed before returning.
 *
 * @param tableName name of the table to aggregate over
 * @param ci column interpreter passed through to the delegate
 * @param scan scan describing the rows/columns to consider
 * @return the median, of type {@code R}
 * @throws Throwable if opening the table, the delegate call, or closing the
 *           table fails
 */
public <R, S, P extends Message, Q extends Message, T extends Message> R median(
    final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan)
    throws Throwable {
  // If the constructor throws there is nothing to close, so it sits outside the try.
  final HTable table = new HTable(conf, tableName);
  try {
    return median(table, ci, scan);
  } finally {
    table.close();
  }
}