/**
 * Creates a {@link SimpleConsumer} for the broker whose registration is stored in ZooKeeper under
 * {@code ZkUtils.BrokerIdsPath() + "/" + brokerId}.
 *
 * @param brokerId id of the broker to connect to
 * @return a new consumer, or {@code null} if the broker is not registered, its registration lacks a usable
 *         host/port, or reading/parsing the registration fails (each case is logged)
 */
private SimpleConsumer createSimpleConsumer(Integer brokerId) {
    try {
        String brokerInfo = zkClient.readData(ZkUtils.BrokerIdsPath() + "/" + brokerId, true);
        if (brokerInfo == null) {
            log.error("Broker id %d does not exist", brokerId);
            return null;
        }
        Map<String, Object> map = Resources.jsonMapper.readValue(
            brokerInfo, new TypeReference<Map<String, Object>>() {
            }
        );
        Object host = map.get("host");
        Object port = map.get("port");
        // Validate explicitly instead of relying on an NPE/ClassCastException being swallowed below,
        // which would be reported as a misleading "could not parse" error.
        if (!(host instanceof String) || !(port instanceof Number)) {
            log.error("Broker[%d] registration has no usable host/port: %s", brokerId, brokerInfo);
            return null;
        }
        // Accept any JSON number type for the port (the mapper may not always yield an Integer).
        return new SimpleConsumer((String) host, ((Number) port).intValue(), 10000, 100000, "KafkaConsumerInfos");
    }
    catch (Exception e) {
        log.error(e, "Could not parse broker[%d] info", brokerId);
        return null;
    }
}
private long getTopicLogSize(String topic, int pid) { Option<Object> o = ZkUtils.getLeaderForPartition(zkClient, topic, pid); if (o.isEmpty() || o.get() == null) { log.error("No broker for partition %s - %s", topic, pid); return 0; } Integer leaderId = Int.unbox(o.get()); SimpleConsumer consumer = consumerMap.get(leaderId); if (consumer == null) { consumer = createSimpleConsumer(leaderId); } // createSimpleConsumer may fail. if (consumer == null) { return 0; } consumerMap.put(leaderId, consumer); TopicAndPartition topicAndPartition = new TopicAndPartition(topic, pid); PartitionOffsetRequestInfo requestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1); OffsetRequest request = new OffsetRequest( new Map1<TopicAndPartition, PartitionOffsetRequestInfo>(topicAndPartition, requestInfo), 0, Request.OrdinaryConsumerId() ); OffsetResponse response = consumer.getOffsetsBefore(request); PartitionOffsetsResponse offsetsResponse = response.partitionErrorAndOffsets().get(topicAndPartition).get(); return scala.Long.unbox(offsetsResponse.offsets().head()); }
/**
 * Describes every broker that currently has a cached consumer.
 *
 * @return one {@code BrokerInfo} per entry of {@code consumerMap}, carrying the broker id and the consumer's
 *         host and port
 */
private List<BrokerInfo> getBrokerInfos() {
    final List<BrokerInfo> brokers = Lists.newArrayListWithExpectedSize(consumerMap.size());
    for (Map.Entry<Integer, SimpleConsumer> cached : consumerMap.entrySet()) {
        final SimpleConsumer connection = cached.getValue();
        final BrokerInfo descriptor = new BrokerInfo();
        descriptor.id = cached.getKey();
        descriptor.host = connection.host();
        descriptor.port = connection.port();
        brokers.add(descriptor);
    }
    return brokers;
}
/**
 * Closes every cached {@link SimpleConsumer} and then the ZooKeeper client.
 *
 * <p>The ZooKeeper client is closed in a {@code finally} block so it is released even if closing one of the
 * consumers throws.
 */
@Override
public void close() {
    try {
        for (SimpleConsumer consumer : consumerMap.values()) {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
    finally {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}
/** * Gets all of the input splits for the {@code topic}, filtering out any {@link InputSplit}s already consumed by the * {@code group}. * * @param conf * the job configuration. * @param topic * the topic. * @param group * the consumer group. * @return input splits for the job. * @throws IOException */ List<InputSplit> getInputSplits(final Configuration conf, final String topic, final String group) throws IOException { final List<InputSplit> splits = Lists.newArrayList(); final ZkUtils zk = getZk(conf); final Map<Broker, SimpleConsumer> consumers = Maps.newHashMap(); try { for (final Partition partition : zk.getPartitions(topic)) { // cache the consumer connections - each partition will make use of each broker consumer final Broker broker = partition.getBroker(); if (!consumers.containsKey(broker)) { consumers.put(broker, getConsumer(broker)); } // grab all valid offsets final List<Long> offsets = getOffsets(consumers.get(broker), topic, partition.getPartId(), zk.getLastCommit(group, partition), getIncludeOffsetsAfterTimestamp(conf), getMaxSplitsPerPartition(conf)); for (int i = 0; i < offsets.size() - 1; i++) { // ( offsets in descending order ) final long start = offsets.get(i + 1); final long end = offsets.get(i); // since the offsets are in descending order, the first offset in the list is the largest offset for // the current partition. This split will be in charge of committing the offset for this partition. final boolean partitionCommitter = (i == 0); final InputSplit split = new KafkaInputSplit(partition, start, end, partitionCommitter); LOG.debug("Created input split: " + split); splits.add(split); } } } finally { // close resources IOUtils.closeQuietly(zk); for (final SimpleConsumer consumer : consumers.values()) { consumer.close(); } } return splits; }
@VisibleForTesting List<Long> getOffsets(final SimpleConsumer consumer, final String topic, final int partitionNum, final long lastCommit, final long asOfTime, final int maxSplitsPerPartition) { // all offsets that exist for this partition (in descending order) final long[] allOffsets = consumer.getOffsetsBefore(topic, partitionNum, OffsetRequest.LatestTime(), Integer.MAX_VALUE); // this gets us an offset that is strictly before 'asOfTime', or zero if none exist before that time final long[] offsetsBeforeAsOf = consumer.getOffsetsBefore(topic, partitionNum, asOfTime, 1); final long includeAfter = offsetsBeforeAsOf.length == 1 ? offsetsBeforeAsOf[0] : 0; // note that the offsets are in descending order List<Long> result = Lists.newArrayList(); for (final long offset : allOffsets) { if (offset > lastCommit && offset > includeAfter) { result.add(offset); } else { // we add "lastCommit" iff it is after "includeAfter" if (lastCommit > includeAfter) { result.add(lastCommit); } // we can break out of loop here bc offsets are in desc order, and we've hit the latest one to include break; } } // to get maxSplitsPerPartition number of splits, you need (maxSplitsPerPartition + 1) number of offsets. if (result.size() - 1 > maxSplitsPerPartition) { result = result.subList(result.size() - maxSplitsPerPartition - 1, result.size()); } LOG.debug(String.format("Offsets for %s:%d:%d = %s", consumer.host(), consumer.port(), partitionNum, result)); return result; }
/**
 * Creates the consumer connection to {@code sourceBroker}, the metric identifiers and stats derived from it, and
 * the fetch request builder configured with this fetcher's client id, replica id, max wait and min bytes.
 */
protected void init() {
    simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId);
    // String.format is static in Java: "fmt".format(a, b) would use `a` (the host) as the format string and
    // silently drop the port, so the format string must be passed explicitly.
    brokerInfo = String.format("host_%s-port_%s", sourceBroker.host, sourceBroker.port);
    metricId = new ClientIdAndBroker(clientId, brokerInfo);
    fetcherStats = new FetcherStats(metricId);
    fetcherLagStats = new FetcherLagStats(metricId);
    fetchRequestBuilder = new FetchRequestBuilder()
        .clientId(clientId)
        .replicaId(fetcherBrokerId)
        .maxWait(maxWait)
        .minBytes(minBytes);
}
/**
 * Opens a consumer connection to {@code broker} using the default socket timeout and buffer size.
 *
 * @param broker the broker to connect to
 * @return a new, caller-owned consumer
 */
@VisibleForTesting
SimpleConsumer getConsumer(final Broker broker) {
    final String host = broker.getHost();
    return new SimpleConsumer(host, broker.getPort(), DEFAULT_SOCKET_TIMEOUT_MS, DEFAULT_BUFFER_SIZE_BYTES);
}
/**
 * Opens a consumer connection to the broker hosting {@code split}'s partition, using the socket timeout and
 * buffer size configured in {@code conf}.
 *
 * @param split the split whose partition's broker should be contacted
 * @param conf  the job configuration
 * @return a new, caller-owned consumer
 */
@VisibleForTesting
SimpleConsumer getConsumer(final KafkaInputSplit split, final Configuration conf) {
    final Broker target = split.getPartition().getBroker();
    final int socketTimeoutMs = getKafkaSocketTimeoutMs(conf);
    final int bufferSizeBytes = getKafkaBufferSizeBytes(conf);
    return new SimpleConsumer(target.getHost(), target.getPort(), socketTimeoutMs, bufferSizeBytes);
}
@Test public void testGetInputSplits() throws Exception { final KafkaInputFormat inputFormat = spy(new KafkaInputFormat()); final SimpleConsumer mockConsumer = mock(SimpleConsumer.class); final ZkUtils mockZk = mock(ZkUtils.class); final Configuration mockConf = new Configuration(false); final Broker broker = new Broker("127.0.0.1", 9092, 1); doReturn(mockConsumer).when(inputFormat).getConsumer(broker); doReturn(mockZk).when(inputFormat).getZk(mockConf); doReturn(Lists.newArrayList(20l, 10l)).when(inputFormat).getOffsets(mockConsumer, "topic", 0, -1, 0, Integer.MAX_VALUE); doReturn(Lists.newArrayList(30l, 20l, 0l)).when(inputFormat).getOffsets(mockConsumer, "topic", 1, 10, 0, Integer.MAX_VALUE); final Partition p1 = new Partition("topic", 0, broker); final Partition p2 = new Partition("topic", 1, broker); when(mockZk.getPartitions("topic")).thenReturn(Lists.newArrayList(p1, p2)); when(mockZk.getBroker(1)).thenReturn(broker); when(mockZk.getLastCommit("group", p1)).thenReturn(-1l); when(mockZk.getLastCommit("group", p2)).thenReturn(10l); final List<InputSplit> result = inputFormat.getInputSplits(mockConf, "topic", "group"); // assert the contents of each split Assert.assertEquals(3, result.size()); final KafkaInputSplit split1 = (KafkaInputSplit) result.get(0); final Broker broker1 = split1.getPartition().getBroker(); assertEquals(broker, broker1); assertEquals("127.0.0.1", broker1.getHost()); assertEquals(9092, broker1.getPort()); assertEquals(1, broker1.getId()); assertEquals("1-0", split1.getPartition().getBrokerPartition()); assertEquals(0, split1.getPartition().getPartId()); assertEquals(10l, split1.getStartOffset()); assertEquals(20l, split1.getEndOffset()); assertEquals("topic", split1.getPartition().getTopic()); final KafkaInputSplit split2 = (KafkaInputSplit) result.get(1); assertEquals(20l, split2.getStartOffset()); assertEquals(30l, split2.getEndOffset()); assertEquals("1-1", split2.getPartition().getBrokerPartition()); final KafkaInputSplit split3 = 
(KafkaInputSplit) result.get(2); assertEquals(0l, split3.getStartOffset()); assertEquals(20l, split3.getEndOffset()); assertEquals("1-1", split3.getPartition().getBrokerPartition()); // verify one and only one call to getConsumer - should get the cached consumer second time around verify(inputFormat, times(1)).getConsumer(broker); verify(inputFormat, times(1)).getConsumer(any(Broker.class)); // verify the closeable components are closed verify(mockConsumer, times(1)).close(); verify(mockZk, times(1)).close(); }
@Test public void testGetOffsets() throws Exception { final SimpleConsumer consumer = mock(SimpleConsumer.class); final long[] offsets = { 101, 91, 81, 71, 61, 51, 41, 31, 21, 11 }; when(consumer.getOffsetsBefore("topic", 1, OffsetRequest.LatestTime(), Integer.MAX_VALUE)).thenReturn(offsets); when(consumer.getOffsetsBefore("topic", 1, 0, 1)).thenReturn(new long[] {}); final KafkaInputFormat inputFormat = new KafkaInputFormat(); // case 0: get everything (-1 last commit, 0 asOfTime, as many partitions as possible) -> all offsets long[] expected = offsets; List<Long> actual = inputFormat.getOffsets(consumer, "topic", 1, -1, 0, Integer.MAX_VALUE); compareArrayContents(offsets, actual); // case 1: lastCommit of 52 -> we should only get back the first 5 offsets + the lastCommit final int lastCommit = 52; expected = new long[6]; System.arraycopy(offsets, 0, expected, 0, 6); expected[5] = lastCommit; actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, 0, Integer.MAX_VALUE); compareArrayContents(expected, actual); // case 2: lastCommit of 52, asOfTime 51 -> still include last offsets final int asOfTime = 999; when(consumer.getOffsetsBefore("topic", 1, asOfTime, 1)).thenReturn(new long[] { 51 }); actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, Integer.MAX_VALUE); compareArrayContents(expected, actual); // case 3: lastCommit of 52, asOfTime 52 -> don't include last offsets when(consumer.getOffsetsBefore("topic", 1, asOfTime, 1)).thenReturn(new long[] { 52 }); expected = Arrays.copyOfRange(offsets, 0, 5); actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, Integer.MAX_VALUE); compareArrayContents(expected, actual); // case 4: maxSplitsPerPartition == number of commits (5) -> should include all 5 offsets actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 5); compareArrayContents(expected, actual); // case 5: maxSplitsPerPartition = number of commits - 1 (4) -> should STILL include all 5 
offsets actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 4); compareArrayContents(expected, actual); // case 6: maxSplitsPerPartition = number of commits - 2 (3) -> should exclude the first (largest) offset actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 3); expected = Arrays.copyOfRange(offsets, 1, 5); compareArrayContents(expected, actual); // case 7: maxSplitsPerPartition = 1 -> should include just 2 commits actual = inputFormat.getOffsets(consumer, "topic", 1, lastCommit, asOfTime, 1); expected = Arrays.copyOfRange(offsets, 3, 5); compareArrayContents(expected, actual); }