Python kafka module: TopicPartition() example source code

We have extracted the following 24 code examples from open source Python projects to illustrate how to use kafka.TopicPartition().

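Before the project examples, here is a minimal, self-contained sketch of the basic usage pattern (the broker address and topic name below are placeholders, not taken from any of the projects):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
# A TopicPartition is an immutable (topic, partition) pair addressing one partition.
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])           # manual assignment, no consumer-group rebalancing
consumer.seek_to_beginning(tp)
print(consumer.position(tp))    # offset of the next record that would be fetched
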
Project: frontera-docs-zh_CN    Author: xsren
def get(self):
        topic_partitions = self._client.cluster.partitions_for_topic(self.topic)
        if not topic_partitions:
            future = self._client.cluster.request_update()
            log.info("No partitions available, performing metadata update.")
            self._client.poll(future=future)
            return {}
        partitions = [TopicPartition(self.topic, partition_id) for partition_id in topic_partitions]
        offsets = self.offsets(partitions, -1)
        committed = self.fetch_committed_offsets(partitions)
        lags = {}
        for tp, offset in six.iteritems(offsets):
            commit_offset = committed[tp] if tp in committed else 0
            numerical = commit_offset if isinstance(commit_offset, int) else commit_offset.offset
            lag = offset - numerical
            pid = tp.partition if isinstance(tp, TopicPartition) else tp
            log.debug("Lag for %s (%s): %s, %s, %s", self.topic, pid, offset, commit_offset, lag)
            lags[pid] = lag
        return lags
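On kafka-python 1.3 and newer, the same per-partition lag can be computed with the public end_offsets() API rather than the internal offsets()/fetch_committed_offsets() helpers used above; a sketch under that assumption (broker, topic, and group names are placeholders):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='my-group')
parts = [TopicPartition('my-topic', p)
         for p in consumer.partitions_for_topic('my-topic')]
end_offsets = consumer.end_offsets(parts)    # latest offset per partition
for tp in parts:
    committed = consumer.committed(tp) or 0  # committed() returns None if nothing was ever committed
    print(tp.partition, end_offsets[tp] - committed)
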
Project: search-MjoLniR    Author: wikimedia
def get_offset_start(brokers, topic=mjolnir.kafka.TOPIC_RESULT):
    """Find the current ending offset for all partitions in topic.

    By calling this prior to producing requests we know all responses come
    after these offsets.
    TODO: This naming doesn't feel right...

    Parameters
    ----------
    brokers : list of str
    topic : str

    Returns
    -------
    list of int
    """
    consumer = kafka.KafkaConsumer(bootstrap_servers=brokers, api_version=mjolnir.kafka.BROKER_VERSION)
    parts = consumer.partitions_for_topic(topic)
    if parts is None:
        return None
    partitions = [kafka.TopicPartition(topic, p) for p in parts]
    consumer.assign(partitions)
    return [consumer.position(p) for p in partitions]
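Note on the design: the consumer never seeks, so position() resolves each partition from the committed offset if one exists, and otherwise falls back to the auto_offset_reset policy ('latest' by default in kafka-python), which is why this returns the current end offsets despite never calling seek_to_end().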
Project: frontera-docs-zh_CN    Author: xsren
def __init__(self, location, topic, group, partition_id):
        self._location = location
        self._group = group
        self._topic = topic
        self._consumer = KafkaConsumer(
            bootstrap_servers=self._location,
            group_id=self._group,
            max_partition_fetch_bytes=10485760,
            consumer_timeout_ms=100,
            client_id="%s-%s" % (self._topic, str(partition_id) if partition_id is not None else "all"),
            request_timeout_ms=120 * 1000,
        )

        if partition_id is not None:
            self._partition_ids = [TopicPartition(self._topic, partition_id)]
            self._consumer.assign(self._partition_ids)
        else:
            self._partition_ids = [TopicPartition(self._topic, pid) for pid in self._consumer.partitions_for_topic(self._topic)]
            self._consumer.subscribe(topics=[self._topic])
            if self._consumer._use_consumer_group():
                self._consumer._coordinator.ensure_coordinator_known()
                self._consumer._coordinator.ensure_active_group()

        self._consumer._update_fetch_positions(self._partition_ids)
        self._start_looping_call()
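Two details worth noting here: assign() and subscribe() are mutually exclusive ways of attaching a consumer to partitions (manual assignment versus group-managed rebalancing), and the underscore-prefixed members (_use_consumer_group, _coordinator, _update_fetch_positions) are private kafka-python internals that can change between releases.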
Project: django-logpipe    Author: thelabnyc
def seek(self, consumer, topic, partition):
        KafkaOffset = apps.get_model(app_label='logpipe', model_name='KafkaOffset')
        tp = kafka.TopicPartition(topic=topic, partition=partition)
        try:
            obj = KafkaOffset.objects.get(topic=topic, partition=partition)
            logger.debug('Seeking to offset "%s" on topic "%s", partition "%s"' % (obj.offset, topic, partition))
            consumer.client.seek(tp, obj.offset)
        except KafkaOffset.DoesNotExist:
            logger.debug('Seeking to beginning of topic "%s", partition "%s"' % (topic, partition))
            consumer.client.seek_to_beginning(tp)
Project: django-logpipe    Author: thelabnyc
def _get_topic_partitions(self):
        p = []
        partitions = self.client.partitions_for_topic(self.topic_name)
        if not partitions:
            raise MissingTopicError('Could not find topic %s. Does it exist?' % self.topic_name)
        for partition in partitions:
            tp = kafka.TopicPartition(self.topic_name, partition=partition)
            p.append(tp)
        return p
Project: robotframework-KafkaLibrary    Author: s4int
def assign_to_topic_partition(self, topic_partition=None):
        """Assign a list of TopicPartitions to this consumer.

        - ``topic_partition`` (`TopicPartition` or list of `TopicPartition`): Assignment for this instance.
        """

        if isinstance(topic_partition, TopicPartition):
            topic_partition = [topic_partition]
        if not self._is_assigned(topic_partition):
            self.consumer.assign(topic_partition)
Project: robotframework-KafkaLibrary    Author: s4int
def get_position(self, topic_partition=None):
        """Return offset of the next record that will be fetched.

        - ``topic_partition`` (TopicPartition): Partition to check
        """

        if isinstance(topic_partition, TopicPartition):
            return self.consumer.position(topic_partition)
        else:
            raise TypeError("topic_partition must be of type TopicPartition, create it with Create TopicPartition keyword.")
Project: robotframework-KafkaLibrary    Author: s4int
def seek(self, offset, topic_partition=None):
        """Manually specify the fetch offset for a TopicPartition.

        - ``offset``: Message offset in partition
        - ``topic_partition`` (`TopicPartition`): Partition for seek operation
        """

        if isinstance(topic_partition, TopicPartition):
            self.consumer.seek(topic_partition, offset=offset)
        else:
            raise TypeError("topic_partition must be of type TopicPartition, create it with Create TopicPartition keyword.")
Project: robotframework-KafkaLibrary    Author: s4int
def seek_to_beginning(self, topic_partition=None):
        """Seek to the oldest available offset for partitions.

        - ``topic_partition`` (`TopicPartition`): Partition to rewind to its oldest available offset.
        """

        if isinstance(topic_partition, TopicPartition):
            self.consumer.seek_to_beginning(topic_partition)
        else:
            raise TypeError("topic_partition must be of type TopicPartition, create it with Create TopicPartition keyword.")
Project: robotframework-KafkaLibrary    Author: s4int
def get_number_of_messages_in_topicpartition(self, topic_partition=None):
        """Return number of messages in TopicPartition.

        - ``topic_partition`` (list of TopicPartition)
        """

        if isinstance(topic_partition, TopicPartition):
            topic_partition = [topic_partition]

        number_of_messages = 0
        assignment = self.consumer.assignment()

        self.consumer.unsubscribe()
        for partition in topic_partition:
            if not isinstance(partition, TopicPartition):
                raise TypeError("topic_partition must be of type TopicPartition, create it with Create TopicPartition keyword.")

            self.assign_to_topic_partition(partition)

            self.consumer.seek_to_end(partition)
            end = self.consumer.position(partition)
            self.consumer.seek_to_beginning(partition)
            start = self.consumer.position(partition)
            number_of_messages += end - start

        self.consumer.unsubscribe()
        self.consumer.assign(assignment)
        return number_of_messages
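Keep in mind that Kafka offsets are not necessarily dense: log compaction (and, on newer brokers, transaction markers) can leave gaps, so end - start is best read as an upper bound on the number of fetchable messages.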
Project: robotframework-KafkaLibrary    Author: s4int
def commit(self, offsets=None):
        """Commit offsets to kafka, blocking until success or error.

        - ``offsets`` (dict): `{TopicPartition: OffsetAndMetadata}` dict to commit with
          the configured group_id. Defaults to currently consumed offsets for all subscribed partitions.
        """

        self.consumer.commit(offsets)
Project: robotframework-KafkaLibrary    Author: s4int
def committed(self, topic_partition):
        """Returns the last committed offset for the given partition, or None if there was no prior commit.

        - ``topic_partition`` (`TopicPartition`): The partition to check.
        """
        return self.consumer.committed(topic_partition)
Project: robotframework-KafkaLibrary    Author: s4int
def create_topicpartition(self, topic, partition):
        """Create TopicPartition object

        - ``topic``: kafka topic name
        - ``partition``: topic partition number
        """

        return TopicPartition(topic=topic, partition=partition)
Project: search-MjoLniR    Author: wikimedia
def _get_result_offsets(self):
        """Get the latest offsets for all partitions in topic"""
        consumer = kafka.KafkaConsumer(bootstrap_servers=self.brokers,
                                       auto_offset_reset='latest',
                                       api_version=mjolnir.kafka.BROKER_VERSION)
        partitions = [kafka.TopicPartition(self.topic_result, p)
                      for p in consumer.partitions_for_topic(self.topic_result)]
        consumer.assign(partitions)
        consumer.seek_to_end()
        offsets = [consumer.position(tp) for tp in partitions]
        consumer.close()
        return offsets
Project: Locus    Author: senditya
def poll(topic, partition=0, offset=0, hostname=None, port_num=None, max_timeout=100):
    hostname, port_num = insure_host_port(hostname, port_num)
    server = hostname + ':' + str(port_num)
    topic_partition = TopicPartition(topic, partition)

    consumer = KafkaConsumer(bootstrap_servers=server, group_id=None)
    consumer.assign([topic_partition])
    consumer.seek(topic_partition, offset)
    msgs = list(consumer.poll(max_timeout).values())  # dict_values cannot be indexed
    consumer.close()
    if len(msgs) > 0:
        return msgs[0]
    else:
        return {}
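consumer.poll() returns a {TopicPartition: [ConsumerRecord, ...]} mapping, so with a single assigned partition this function hands back that partition's record list, or an empty dict when nothing arrives within max_timeout milliseconds.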
Project: kq    Author: joowani
def info(self):
        """Print the offset information for all topics and partitions."""
        print('Offsets per Topic:')
        for topic in self._consumer.topics():
            print('\nTopic {}:\n'.format(topic))
            partitions = self._consumer.partitions_for_topic(topic)
            if partitions is None:  # pragma: no cover
                print('    Polling failed (please try again)')
                continue
            for partition in partitions:
                topic_partition = kafka.TopicPartition(topic, partition)
                self._consumer.assign([topic_partition])
                offset = self._consumer.position(topic_partition)
                print('    Partition {:<3}: {}'.format(partition, offset))
Project: frontera-docs-zh_CN    Author: xsren
def offsets(self, partitions, timestamp):
        """Fetch a single offset before the given timestamp for the set of partitions.

        Blocks until offset is obtained, or a non-retriable exception is raised

        Arguments:
            partitions (iterable of TopicPartition): the partitions that need their offsets fetched.
            timestamp (int): timestamp for fetching offset. -1 for the latest
                available, -2 for the earliest available. Otherwise timestamp
                is treated as epoch seconds.

        Returns:
            dict: TopicPartition and message offsets
        """
        while True:
            offsets = {}
            ok = True
            for future in self._send_offset_request(partitions, timestamp):
                self._client.poll(future=future)

                if future.succeeded():
                    for tp, offset in future.value:
                        offsets[tp] = offset
                    continue

                if not future.retriable():
                    raise future.exception  # pylint: disable-msg=raising-bad-type

                if future.exception.invalid_metadata:
                    refresh_future = self._client.cluster.request_update()
                    self._client.poll(future=refresh_future, sleep=True)
                    ok = False
                    break
            if ok:
                return offsets
Project: frontera-docs-zh_CN    Author: xsren
def _send_offset_request(self, partitions, timestamp):
        """Fetch a single offset before the given timestamp for the partition.

        Arguments:
            partitions (iterable of TopicPartition): partitions that need their offsets fetched
            timestamp (int): timestamp for fetching offset

        Returns:
            list of Future: resolves to the corresponding offset
        """
        topic = partitions[0].topic
        nodes_per_partitions = {}
        for partition in partitions:
            node_id = self._client.cluster.leader_for_partition(partition)
            if node_id is None:
                log.debug("Partition %s is unknown for fetching offset,"
                          " wait for metadata refresh", partition)
                return Future().failure(Errors.StaleMetadata(partition))
            elif node_id == -1:
                log.debug("Leader for partition %s unavailable for fetching offset,"
                          " wait for metadata refresh", partition)
                return Future().failure(Errors.LeaderNotAvailableError(partition))
            nodes_per_partitions.setdefault(node_id, []).append(partition)

        # Client returns a future that only fails on network issues
        # so create a separate future and attach a callback to update it
        # based on response error codes
        futures = []
        for node_id, node_partitions in six.iteritems(nodes_per_partitions):
            request = OffsetRequest[0](
                -1, [(topic, [(partition.partition, timestamp, 1) for partition in node_partitions])]
            )
            future_request = Future()
            _f = self._client.send(node_id, request)
            _f.add_callback(self._handle_offset_response, node_partitions, future_request)
            # Bind the current future as a default argument; a bare closure over
            # future_request would resolve to the last loop iteration's future.
            _f.add_errback(lambda e, f=future_request: f.failure(e))
            futures.append(future_request)
        return futures
Project: frontera-docs-zh_CN    Author: xsren
def _handle_offset_response(self, partitions, future, response):
        """Callback for the response of the list offset call above.

        Arguments:
            partitions (list of TopicPartition): The partitions that were fetched
            future (Future): the future to update based on response
            response (OffsetResponse): response from the server

        Raises:
            AssertionError: if response does not match partition
        """
        topic, partition_info = response.topics[0]
        assert len(response.topics) == 1, (
            'OffsetResponse should only be for a single topic')
        partition_ids = set([part.partition for part in partitions])
        result = []
        for pi in partition_info:
            part, error_code, offsets = pi
            assert topic == partitions[0].topic and part in partition_ids, (
                'OffsetResponse partition does not match OffsetRequest partition')
            error_type = Errors.for_code(error_code)
            if error_type is Errors.NoError:
                assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
                log.debug("Fetched offset %s for partition %d", offsets[0], part)
                result.append((TopicPartition(topic, part), offsets[0]))
            elif error_type in (Errors.NotLeaderForPartitionError,
                                Errors.UnknownTopicOrPartitionError):
                log.debug("Attempt to fetch offsets for partitions %s failed due"
                          " to obsolete leadership information, retrying.",
                          str(partitions))
                future.failure(error_type(partitions))
                return  # a Future may only be resolved once
            else:
                log.warning("Attempt to fetch offsets for partitions %s failed due to:"
                            " %s", partitions, error_type)
                future.failure(error_type(partitions))
                return
        future.success(result)
Project: frontera-docs-zh_CN    Author: xsren
def fetch_committed_offsets(self, partitions):
        """Fetch the current committed offsets for specified partitions

        Arguments:
            partitions (list of TopicPartition): partitions to fetch

        Returns:
            dict: {TopicPartition: OffsetAndMetadata}
        """
        if not partitions:
            return {}

        while True:
            self._ensure_coordinator_known()

            # contact coordinator to fetch committed offsets
            future = self._send_offset_fetch_request(partitions)
            self._client.poll(future=future)

            if future.succeeded():
                return future.value

            if not future.retriable():
                raise future.exception  # pylint: disable-msg=raising-bad-type

            time.sleep(self.config['retry_backoff_ms'] / 1000.0)
Project: mysql-binlog-replication    Author: nghiaminhle
def fetch_lastest_checkpoint(self) -> BinLogMetadata:
        partition = TopicPartition(self._topic, 0)
        self._consumer.assign([partition])
        self._consumer.seek_to_end()
        offset = self._consumer.position(partition)
        if offset is None or offset == 0:
            return None
        self._consumer.seek(partition, offset-1)
        for message in self._consumer:
            metadata_attributes = json.loads(message.value.decode())
            metadata = BinLogMetadata(log_pos=metadata_attributes['pos'], log_file=metadata_attributes['file'])
            return metadata

        return None
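The seek_to_end() / seek(partition, offset - 1) sequence is the usual kafka-python idiom for peeking at the most recent record in a partition; iterating the consumer and returning from the first message simply reads that single record.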
Project: robotframework-KafkaLibrary    Author: s4int
def seek_to_end(self, topic_partition=None):
        """Seek to the most recent available offset for partitions.

        - ``topic_partition`` (`TopicPartition`): Partition to advance to its most recent available offset.
        """

        if isinstance(topic_partition, TopicPartition):
            self.consumer.seek_to_end(topic_partition)
        else:
            raise TypeError("topic_partition must be of type TopicPartition, create it with Create TopicPartition keyword.")
Project: search-MjoLniR    Author: wikimedia
def get_offset_end(brokers, run_id, num_end_sigils, topic=mjolnir.kafka.TOPIC_COMPLETE):
    """ Find the offset of the last message of our run

    The 'end run' message gets reflected, by the client running on relforge,
    back into TOPIC_COMPLETE into all partitions. This reads those partitions
    and looks for the ending offset of all partitions based on that reflected
    message

    Parameters
    ----------
    brokers : list of str
    run_id : str
        Unique identifier for this run
    num_end_sigils : int
        The number of unique end run sigils to expect. This should be the number of partitions
        of the topic requests were produced to.
    topic : str, optional
        Topic to look for end run messages in

    Returns
    -------
    list of ints
        The offset of the end run message for all partitions
    """
    consumer = kafka.KafkaConsumer(bootstrap_servers=brokers,
                                   # The topic we are reading from is very low volume,
                                   # containing only reflected end run sigils. To make
                                   # sure we don't miss one start at the beginning.
                                   auto_offset_reset='earliest',
                                   value_deserializer=json.loads,
                                   api_version=mjolnir.kafka.BROKER_VERSION)
    parts = consumer.partitions_for_topic(topic)
    if parts is None:
        raise RuntimeError("topic %s missing" % topic)

    partitions = [kafka.TopicPartition(topic, p) for p in parts]
    consumer.assign(partitions)
    # Tracks the maximum reported offset in the response topic
    offsets_end = [-1] * num_end_sigils
    # Tracks the sigils that have been seen for the request topics.
    # Uses a set in case duplicate messages are sent somehow, to ensure
    # we see a message for all expected partitions
    seen_sigils = set()
    for message in consumer:
        if 'run_id' in message.value and message.value['run_id'] == run_id and 'complete' in message.value:
            print('found sigil for run %s and partition %d' % (message.value['run_id'], message.value['partition']))
            for partition, offset in enumerate(message.value['offsets']):
                offsets_end[partition] = max(offsets_end[partition], offset)
            seen_sigils.add(message.value['partition'])
            # Keep reading until all sigils have been reflected.
            if len(seen_sigils) >= num_end_sigils:
                consumer.close()
                return offsets_end
    consumer.close()
    raise RuntimeError("Finished consuming, but %d partitions remain" % (len(partitions) - len(seen_sigils)))
Project: frontera-docs-zh_CN    Author: xsren
def _send_offset_fetch_request(self, partitions):
        """Fetch the committed offsets for a set of partitions.

        This is a non-blocking call. The returned future can be polled to get
        the actual offsets returned from the broker.

        Arguments:
            partitions (list of TopicPartition): the partitions to fetch

        Returns:
            Future: resolves to dict of offsets: {TopicPartition: int}
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
        if not partitions:
            return Future().success({})

        elif self._coordinator_unknown():
            return Future().failure(Errors.GroupCoordinatorNotAvailableError)

        node_id = self._coordinator_id

        # Verify node is ready
        if not self._client.ready(node_id):
            log.debug("Node %s not ready -- failing offset fetch request",
                      node_id)
            return Future().failure(Errors.NodeNotReadyError)

        log.debug("Group %s fetching committed offsets for partitions: %s",
                  self.group_id, partitions)
        # construct the request
        topic_partitions = collections.defaultdict(set)
        for tp in partitions:
            topic_partitions[tp.topic].add(tp.partition)

        if self.config['api_version'] >= (0, 8, 2):
            request = OffsetFetchRequest[1](
                self.group_id,
                list(topic_partitions.items())
            )
        else:
            request = OffsetFetchRequest[0](
                self.group_id,
                list(topic_partitions.items())
            )

        # send the request with a callback
        future = Future()
        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_offset_fetch_response, future)
        _f.add_errback(self._failed_request, node_id, request, future)
        return future