/**
 * Adapts an HBase {@link FuzzyRowFilter} into a {@link RowFilter} whose interleave holds one
 * filter per fuzzy pair, each produced by {@code createSingleRowFilter}.
 *
 * <p>When the filter carries no pairs, {@code ALL_VALUES_FILTER} is returned instead.
 *
 * @param context adapter context (not consulted here)
 * @param filter the fuzzy row filter to translate
 * @return the adapted row filter
 * @throws IOException if the fuzzy pairs cannot be extracted or a single-row filter cannot be built
 * @throws IllegalArgumentException if the two byte arrays of any pair differ in length
 */
@Override
public RowFilter adapt(FilterAdapterContext context, FuzzyRowFilter filter)
    throws IOException {
  List<Pair<byte[], byte[]>> fuzzyPairs = extractFuzzyRowFilterPairs(filter);
  if (fuzzyPairs.isEmpty()) {
    return ALL_VALUES_FILTER;
  }
  Interleave.Builder alternatives = Interleave.newBuilder();
  for (Pair<byte[], byte[]> fuzzyPair : fuzzyPairs) {
    byte[] firstBytes = fuzzyPair.getFirst();
    byte[] secondBytes = fuzzyPair.getSecond();
    // Both halves of a pair are consumed position-by-position, so their sizes must agree.
    Preconditions.checkArgument(
        firstBytes.length == secondBytes.length,
        "Fuzzy info and match mask must have the same length");
    alternatives.addFilters(createSingleRowFilter(firstBytes, secondBytes));
  }
  return RowFilter.newBuilder().setInterleave(alternatives).build();
}
public static void applyFuzzyFilter(Scan scan, List<org.apache.kylin.common.util.Pair<byte[], byte[]>> fuzzyKeys) { if (fuzzyKeys != null && fuzzyKeys.size() > 0) { FuzzyRowFilter rowFilter = new FuzzyRowFilter(convertToHBasePair(fuzzyKeys)); Filter filter = scan.getFilter(); if (filter != null) { // may have existed InclusiveStopFilter, see buildScan FilterList filterList = new FilterList(); filterList.addFilter(filter); filterList.addFilter(rowFilter); scan.setFilter(filterList); } else { scan.setFilter(rowFilter); } } }
private void applyFuzzyFilter(Scan scan, HBaseKeyRange keyRange) { List<Pair<byte[], byte[]>> fuzzyKeys = keyRange.getFuzzyKeys(); if (fuzzyKeys != null && fuzzyKeys.size() > 0) { FuzzyRowFilter rowFilter = new FuzzyRowFilter(fuzzyKeys); Filter filter = scan.getFilter(); if (filter != null) { // may have existed InclusiveStopFilter, see buildScan FilterList filterList = new FilterList(); filterList.addFilter(filter); filterList.addFilter(rowFilter); scan.setFilter(filterList); } else { scan.setFilter(rowFilter); } } }
private static List<Pair<byte[], byte[]>> extractFuzzyRowFilterPairs(FuzzyRowFilter filter) throws IOException { // TODO: Change FuzzyRowFilter to expose fuzzyKeysData. FilterProtos.FuzzyRowFilter filterProto = FilterProtos.FuzzyRowFilter.parseFrom(filter.toByteArray()); List<Pair<byte[], byte[]>> result = new ArrayList<>(filterProto.getFuzzyKeysDataCount()); for (BytesBytesPair protoPair : filterProto.getFuzzyKeysDataList()) { result.add( new Pair<>( protoPair.getFirst().toByteArray(), protoPair.getSecond().toByteArray())); } return result; }
/**
 * Verifies that each fuzzy pair is adapted into a row-key regex filter, and that the resulting
 * filters appear in the same order under a single interleave.
 */
@Test
public void fuzzyKeysAreTranslatedToRegularExpressions() throws IOException {
  List<Pair<byte[], byte[]>> fuzzyPairs = ImmutableList.<Pair<byte[], byte[]>>of(
      new Pair<>(new byte[] {0, 0, 0, 0}, Bytes.toBytes("abcd")),
      new Pair<>(new byte[] {0, 0, 1, 0}, Bytes.toBytes(".fgh")),
      new Pair<>(new byte[] {1, 1, 1, 1}, Bytes.toBytes("ijkl")));
  RowFilter actual = adapter.adapt(context, new FuzzyRowFilter(fuzzyPairs));

  // One expected regex per pair, in the same order as fuzzyPairs.
  String[] expectedRegexes = {"abcd", "\\.f\\Ch", "\\C\\C\\C\\C"};
  Interleave.Builder expectedInterleave = Interleave.newBuilder();
  for (String regex : expectedRegexes) {
    expectedInterleave.addFilters(
        RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex)));
  }
  RowFilter expected = RowFilter.newBuilder().setInterleave(expectedInterleave).build();

  Assert.assertEquals(expected, actual);
}
/**
 * Builds a scan over the rows for {@code cluster} and {@code path} whose row keys lie between
 * the encoded run ids of {@code starttime} and {@code endtime}.
 *
 * <p>The start and stop rows are {@code Long.toString(getEncodedRunId(t))} followed by the
 * suffix {@code SEP + cluster + SEP + cleanseToken(path)}. A {@link FuzzyRowFilter} is also
 * installed. Its key is {@code INVERTED_TIMESTAMP_FUZZY_INFO} followed by the same suffix, and
 * its mask has 1 in the first {@code NUM_CHARS_INVERTED_TIMESTAMP} positions and 0 elsewhere.
 * The mask is sized from the key bytes so that the two always have equal length.
 *
 * @param starttime lower time bound; its encoded run id prefixes the start row
 * @param endtime upper time bound; its encoded run id prefixes the stop row
 * @param cluster cluster name embedded in the row-key suffix
 * @param path path embedded, after {@code StringUtil.cleanseToken}, in the row-key suffix
 * @return the configured scan
 * @throws IOException propagated from scan creation or {@code Scan.toJSON()}
 */
Scan GenerateScanFuzzy(long starttime, long endtime, String cluster, String path)
    throws IOException {
  Scan scan = createScanWithAllColumns();

  String rowKeySuffix =
      HdfsConstants.SEP + cluster + HdfsConstants.SEP + StringUtil.cleanseToken(path);
  String rowKey = HdfsConstants.INVERTED_TIMESTAMP_FUZZY_INFO + rowKeySuffix;

  // Size the mask from the key bytes actually handed to FuzzyRowFilter, which needs key and
  // mask of equal length. Bytes.toBytesBinary collapses "\xNN" escapes, so the String length
  // of the suffix is not a reliable byte count.
  byte[] fuzzyKey = Bytes.toBytesBinary(rowKey);
  // New arrays are zero-filled; only the leading timestamp positions need to be set to 1.
  byte[] fuzzyInfo = new byte[fuzzyKey.length];
  int wildcardLength = Math.min(HdfsConstants.NUM_CHARS_INVERTED_TIMESTAMP, fuzzyInfo.length);
  Arrays.fill(fuzzyInfo, 0, wildcardLength, (byte) 1);

  @SuppressWarnings("unchecked") // generic varargs array created by Arrays.asList
  FuzzyRowFilter rowFilter =
      new FuzzyRowFilter(Arrays.asList(new Pair<byte[], byte[]>(fuzzyKey, fuzzyInfo)));
  scan.setFilter(rowFilter);

  // Encode each bound once so the logged ids are exactly the ones used for the scan bounds.
  long encodedStart = getEncodedRunId(starttime);
  long encodedEnd = getEncodedRunId(endtime);
  String minStartKey = Long.toString(encodedStart);
  String maxEndKey = Long.toString(encodedEnd);
  LOG.info(starttime + " " + encodedStart + " min " + minStartKey + " " + endtime + " "
      + maxEndKey + " " + encodedEnd);
  // NOTE(review): start/stop rows use Bytes.toBytes (UTF-8) while the fuzzy key uses
  // Bytes.toBytesBinary; these encodings differ for non-ASCII cluster/path values — confirm.
  scan.setStartRow(Bytes.toBytes(minStartKey + rowKeySuffix));
  scan.setStopRow(Bytes.toBytes(maxEndKey + rowKeySuffix));
  LOG.info(" scan: " + scan.toJSON());
  return scan;
}
/**
 * Reports whether {@code filter} can be adapted.
 *
 * <p>Every {@link FuzzyRowFilter} is reported as supported; neither argument is inspected.
 *
 * @param context adapter context (unused)
 * @param filter the fuzzy row filter (unused)
 * @return always {@code FilterSupportStatus.SUPPORTED}
 */
@Override
public FilterSupportStatus isFilterSupported(
    FilterAdapterContext context, FuzzyRowFilter filter) {
  // No filter shape is rejected up front; the pair-length check happens in adapt().
  final FilterSupportStatus status = FilterSupportStatus.SUPPORTED;
  return status;
}