public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws UnknownHostException { mConfig = config; mOffsetTracker = offsetTracker; mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig()); if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) { throw new RuntimeException("Topic filter and blacklist cannot be both specified."); } TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty()? new Blacklist(mConfig.getKafkaTopicBlacklist()): new Whitelist(mConfig.getKafkaTopicFilter()); LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter); List<KafkaStream<byte[], byte[]>> streams = mConsumerConnector.createMessageStreamsByFilter(topicFilter); KafkaStream<byte[], byte[]> stream = streams.get(0); mIterator = stream.iterator(); mLastAccessTime = new HashMap<TopicPartition, Long>(); StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId()); mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds(); mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads(); mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass()); }
public String pattern() { if (topicFilter instanceof Whitelist) return whiteListPattern; else if (topicFilter instanceof Blacklist) return blackListPattern; else throw new KafkaZKException("Invalid topicFilter."); }
public static TopicFilter noneOf( String...topics) { StringJoiner joiner = new StringJoiner(","); for ( String topic : topics ) { joiner.add(topic); } return new Blacklist(joiner.toString()); }
@SuppressWarnings("unchecked") public static TopicCount constructTopicCount(ZKConnector<?> zkClient, String group, String consumerId) { KafkaZKData.ZKGroupDirs dirs = new KafkaZKData.ZKGroupDirs(group); String subscriptionPattern = null; Map<String, Integer> topMap = null; try { String topicCountString = zkClient.readData(dirs.consumerRegistryDir() + "/" + consumerId); ObjectMapper mapper = new ObjectMapper(); TypeReference<Map<String, Object>> typeMap = new TypeReference<Map<String, Object>>() { }; Map<String, Object> jsonObj = mapper.reader(typeMap).readValue( topicCountString); if (jsonObj == null) throw new KafkaZKException("error constructing TopicCount : " + topicCountString); Object pattern = jsonObj.get("pattern"); if (pattern == null) throw new KafkaZKException("error constructing TopicCount : " + topicCountString); subscriptionPattern = (String) pattern; Object sub = jsonObj.get("subscription"); if (sub == null) throw new KafkaZKException("error constructing TopicCount : " + topicCountString); topMap = (Map<String, Integer>) sub; } catch (Throwable t) { throw new KafkaZKException(t); } boolean hasWhiteList = whiteListPattern.equals(subscriptionPattern); boolean hasBlackList = blackListPattern.equals(subscriptionPattern); if (topMap.isEmpty() || !(hasWhiteList || hasBlackList)) { return new StaticTopicCount(consumerId, topMap); } else { String regex = null; Integer numStreams = -1; for (Entry<String, Integer> entity : topMap.entrySet()) { regex = entity.getKey(); numStreams = entity.getValue(); break; } TopicFilter filter = hasWhiteList ? new Whitelist(regex) : new Blacklist(regex); return new WildcardTopicCount(zkClient, consumerId, filter, numStreams); } }