@Test public void testLogging() throws InterruptedException { for (int i = 0; i<1000; ++i) { logger.info("message"+i); } final KafkaStream<byte[], byte[]> log = kafka.createClient().createMessageStreamsByFilter(new Whitelist("logs"),1).get(0); final ConsumerIterator<byte[], byte[]> iterator = log.iterator(); for (int i=0; i<1000; ++i) { final String messageFromKafka = new String(iterator.next().message(), UTF8); assertThat(messageFromKafka, Matchers.equalTo("message"+i)); } }
/** * 启动 MessageReceiver,开始监听topic消息 */ @Override public void start() { if (consumer == null) { //sync init synchronized (lock) { init(); } } String topicString = buildTopicsString(); Whitelist topicFilter = new Whitelist(topicString); List<KafkaStream<byte[], byte[]>> streamList = consumer.createMessageStreamsByFilter(topicFilter, partitions); if (org.apache.commons.collections.CollectionUtils.isEmpty(streamList)) try { TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e) { log.warn(e.getMessage(), e); } processStreamsByTopic(topicString, streamList); }
public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws UnknownHostException { mConfig = config; mOffsetTracker = offsetTracker; mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig()); if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) { throw new RuntimeException("Topic filter and blacklist cannot be both specified."); } TopicFilter topicFilter = !mConfig.getKafkaTopicBlacklist().isEmpty()? new Blacklist(mConfig.getKafkaTopicBlacklist()): new Whitelist(mConfig.getKafkaTopicFilter()); LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter); List<KafkaStream<byte[], byte[]>> streams = mConsumerConnector.createMessageStreamsByFilter(topicFilter); KafkaStream<byte[], byte[]> stream = streams.get(0); mIterator = stream.iterator(); mLastAccessTime = new HashMap<TopicPartition, Long>(); StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId()); mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds(); mCheckMessagesPerSecond = mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads(); mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass()); }
private static List<String> getTopics(ZkClient zkClient, TopicCommandOptions opts) { String topicsSpec = opts.options.valueOf(opts.topicOpt); final Whitelist topicsFilter = new Whitelist(topicsSpec); Set<String> allTopics = ZkUtils.getAllTopics(zkClient); final List<String> result = Lists.newArrayList(); Utils.foreach(allTopics, new Callable1<String>() { @Override public void apply(String topic) { if (topicsFilter.isTopicAllowed(topic)) result.add(topic); } }); Collections.sort(result); return result; }
@Test public void testLogging() throws InterruptedException { final Logger logger = loggerContext.getLogger("ROOT"); unit.start(); assertTrue("appender is started", unit.isStarted()); for (int i = 0; i<1000; ++i) { final LoggingEvent loggingEvent = new LoggingEvent("a.b.c.d", logger, Level.INFO, "message"+i, null, new Object[0]); unit.append(loggingEvent); } final Properties consumerProperties = new Properties(); consumerProperties.put("metadata.broker.list", kafka.getBrokerList()); consumerProperties.put("group.id", "simple-consumer-" + new Random().nextInt()); consumerProperties.put("auto.commit.enable","false"); consumerProperties.put("auto.offset.reset","smallest"); consumerProperties.put("zookeeper.connect", kafka.getZookeeperConnection()); final kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(consumerProperties); final ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); final KafkaStream<byte[], byte[]> log = javaConsumerConnector.createMessageStreamsByFilter(new Whitelist("logs"),1).get(0); final ConsumerIterator<byte[], byte[]> iterator = log.iterator(); for (int i=0; i<1000; ++i) { final String messageFromKafka = new String(iterator.next().message(), UTF8); assertThat(messageFromKafka, Matchers.equalTo("message"+i)); } }
private void init() { // register kafka offset lag metrics, one Gauge is for per consumer level granularity MetricRegistry registry = Metrics.getRegistry(); try { fetchedMsgCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-msgFetchRate"); failedToIngestCounter = registry.meter("kafkaIngesterConsumer." + this.getName() + "-failedToIngest"); kafkaOffsetLagGauge = registry.register("kafkaIngesterConsumer." + this.getName() + "-kafkaOffsetLag", new JmxAttributeGauge( new ObjectName(maxLagMetricName), "Value")); } catch (MalformedObjectNameException | IllegalArgumentException e) { logger.error("Register failure for metrics of KafkaIngesterConsumer", e); } TopicFilter topicFilter = new Whitelist(AuditConfig.AUDIT_TOPIC_NAME); logger.info("{}: Topic filter is {}", getName(), AuditConfig.AUDIT_TOPIC_NAME); this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig()); KafkaStream<byte[], byte[]> stream = consumer.createMessageStreamsByFilter(topicFilter, 1).get(0); iterator = stream.iterator(); logger.info("KafkaIngesterConsumer thread {} is initialized successfully", getName()); if (AuditConfig.INGESTER_ENABLE_DEDUP) { deduplicator = new Deduplicator(threadId, AuditConfig.INGESTER_REDIS_HOST, AuditConfig.INGESTER_REDIS_PORT, AuditConfig.INGESTER_REDIS_KEY_TTL_SEC, AuditConfig.INGESTER_DUP_HOST_PREFIX, AuditConfig.INGESTER_HOSTS_WITH_DUP); deduplicator.open(); } else { deduplicator = null; } }
public String pattern() { if (topicFilter instanceof Whitelist) return whiteListPattern; else if (topicFilter instanceof Blacklist) return blackListPattern; else throw new KafkaZKException("Invalid topicFilter."); }
@Test public void testCodahaleKafkaMetricsReporter() { registry = new MetricRegistry(); registry.counter("test_counter").inc(); kafkaReporter = KafkaReporter.builder(registry, kafkaConnect, topic, schemaRegistry).build(); // ObjectMapper mapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.SECONDS, // TimeUnit.SECONDS, // false)); // StringWriter r = new StringWriter(); // try { // mapper.writeValue(r, registry); // } catch (IOException e) { // e.printStackTrace(); // } kafkaReporter.report(); Properties props = new Properties(); props.put("zookeeper.connect", zkConnect); props.put("group.id", UUID.randomUUID().toString()); props.put("auto.offset.reset", "smallest"); props.put("zookeeper.session.timeout.ms", "30000"); props.put("consumer.timeout.ms", "30000"); props.put("schema.registry.url", schemaRegistry); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); KafkaStream<String, Object> messageStream = consumer.createMessageStreamsByFilter(new Whitelist(topic), 1, new StringDecoder(null), new KafkaAvroDecoder(new VerifiableProperties(props))).get(0); GenericRecord message = (GenericRecord) messageStream.iterator().next().message(); assertNotNull(message); }
@Test public void testTopicReporter() { MetricsRegistry registry = new MetricsRegistry(); Counter counter = registry.newCounter(KafkaReporterTest.class, "test-counter"); counter.inc(); Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnect); producerProps.put("schema.registry.url", schemaRegistry); KafkaReporter reporter = new KafkaReporter(registry, producerProps, topic); reporter.start(1, TimeUnit.SECONDS); Properties props = new Properties(); props.put("zookeeper.connect", zkConnect); props.put("group.id", UUID.randomUUID().toString()); props.put("auto.offset.reset", "smallest"); props.put("zookeeper.session.timeout.ms", "30000"); props.put("consumer.timeout.ms", "30000"); props.put("schema.registry.url", schemaRegistry); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); KafkaStream<String, Object> messageStream = consumer.createMessageStreamsByFilter(new Whitelist(topic), 1, new StringDecoder(null), new KafkaAvroDecoder(new VerifiableProperties(props))).get(0); GenericRecord message = (GenericRecord) messageStream.iterator().next().message(); assertNotNull(message); reporter.shutdown(); }
public static TopicFilter anyOf( String...topics) { StringJoiner joiner = new StringJoiner(","); for ( String topic : topics ) { joiner.add(topic); } return new Whitelist(joiner.toString()); }
public synchronized void start() { Config conf= Config.getInstance(); Properties props = new Properties(); props.put("zookeeper.connect",conf .get("kafka.servers")); props.put("group.id", conf.get("kafka.groupid")); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig( props)); String topics = ""; for (String s : callbacks.keySet()) { topics += "," + s; } if (topics.length() > 0) { topics = topics.substring(1); Decoder<String> sd = new StringDecoder(null); List<KafkaStream<String, String>> streams = consumer .createMessageStreamsByFilter( new Whitelist(topics), Integer.parseInt(conf.get( "kafka.threads")), sd, sd); if (streams != null) { ExecutorService tph = ThreadPoolHelper.getInstance() .getSchPool(); for (KafkaStream<String, String> stream : streams) { tph.submit(new CallbackThread(callbacks, stream)); } } } }
/** * Default ctor * @param group The group object this writer belongs to * @param threadID The ID of the thread, an index from 0 to max int * @param topics The topic list to subscribe to */ public KafkaRpcPluginThread(final KafkaRpcPluginGroup group, final int threadID, final String topics) { if (topics == null || topics.isEmpty()) { throw new IllegalArgumentException("Missing topics"); } if (threadID < 0) { throw new IllegalArgumentException("Cannot have a negative thread ID: " + threadID); } if (group.getParent().getTSDB() == null) { throw new IllegalArgumentException("Missing TSDB in the group"); } if (group.getRateLimiter() == null) { throw new IllegalArgumentException("Missing rate limiter in the group"); } if (group.getGroupID() == null || group.getGroupID().isEmpty()) { throw new IllegalArgumentException("Missing group ID"); } if (group.getParent().getHost() == null || group.getParent().getHost().isEmpty()) { throw new IllegalArgumentException("Missing host name"); } namespace_counters = group.getParent().getNamespaceCounters(); track_metric_prefix = group.getParent().trackMetricPrefix(); this.thread_id = threadID; this.group = group; this.tsdb = group.getParent().getTSDB(); this.rate_limiter = group.getRateLimiter(); this.consumer_type = group.getConsumerType(); thread_running.set(false); topic_filter = new Whitelist(topics); consumer_id = threadID + "_" + group.getParent().getHost(); if (consumer_type == TsdbConsumerType.REQUEUE_RAW) { if (group.getParent().getConfig().hasProperty( KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + "requeueDelay")) { requeue_delay = group.getParent().getConfig().getLong( KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + "requeueDelay"); } else { requeue_delay = KafkaRpcPluginConfig.DEFAULT_REQUEUE_DELAY_MS; } } else { requeue_delay = 0; } deserializer = group.getDeserializer(); }
@SuppressWarnings("unchecked") public static TopicCount constructTopicCount(ZKConnector<?> zkClient, String group, String consumerId) { KafkaZKData.ZKGroupDirs dirs = new KafkaZKData.ZKGroupDirs(group); String subscriptionPattern = null; Map<String, Integer> topMap = null; try { String topicCountString = zkClient.readData(dirs.consumerRegistryDir() + "/" + consumerId); ObjectMapper mapper = new ObjectMapper(); TypeReference<Map<String, Object>> typeMap = new TypeReference<Map<String, Object>>() { }; Map<String, Object> jsonObj = mapper.reader(typeMap).readValue( topicCountString); if (jsonObj == null) throw new KafkaZKException("error constructing TopicCount : " + topicCountString); Object pattern = jsonObj.get("pattern"); if (pattern == null) throw new KafkaZKException("error constructing TopicCount : " + topicCountString); subscriptionPattern = (String) pattern; Object sub = jsonObj.get("subscription"); if (sub == null) throw new KafkaZKException("error constructing TopicCount : " + topicCountString); topMap = (Map<String, Integer>) sub; } catch (Throwable t) { throw new KafkaZKException(t); } boolean hasWhiteList = whiteListPattern.equals(subscriptionPattern); boolean hasBlackList = blackListPattern.equals(subscriptionPattern); if (topMap.isEmpty() || !(hasWhiteList || hasBlackList)) { return new StaticTopicCount(consumerId, topMap); } else { String regex = null; Integer numStreams = -1; for (Entry<String, Integer> entity : topMap.entrySet()) { regex = entity.getKey(); numStreams = entity.getValue(); break; } TopicFilter filter = hasWhiteList ? new Whitelist(regex) : new Blacklist(regex); return new WildcardTopicCount(zkClient, consumerId, filter, numStreams); } }
public static TopicFilter of( String topic ) { return new Whitelist(topic); }