Python kafka module: KafkaConsumer() usage examples

From open-source Python projects, we extracted the following 47 code examples to illustrate how to use kafka.KafkaConsumer().
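
Before the project examples, here is a minimal self-contained sketch of the basic pattern. This is illustrative only: the broker address, topic, and group id are placeholders, not values taken from any project below.

from kafka import KafkaConsumer

# Placeholder broker/topic/group; adjust for your environment.
consumer = KafkaConsumer(
    'my-topic',                               # subscribe at construction time
    bootstrap_servers='localhost:9092',
    group_id='example-group',                 # offsets are committed under this group id
    auto_offset_reset='earliest',             # start from the oldest offset when none is committed
    value_deserializer=lambda v: v.decode('utf-8'),
)

for message in consumer:
    # Each ConsumerRecord exposes topic, partition, offset, key and value.
    print(message.topic, message.partition, message.offset, message.value)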

Project: open-nti    Author: Juniper    | project source | file source
def check_kafka_msg(topic='events', nbr_msg=100):

    ## Collect Messages from Bus
    consumer = KafkaConsumer(
        bootstrap_servers=get_external_ip()+':'+str(KAFKA_BROKER_PORT),
        auto_offset_reset='earliest')

    consumer.subscribe([topic])

    counter = 0
    for message in consumer:
        counter = counter + 1
        if counter == nbr_msg:
            break

    return counter
Project: ALBATROSS    Author: KVSDURGASURESH    | project source | file source
def _python_kafka_consumer(self, topic):
        """
            Populate total message dump into console
        """
        self.topic = topic
        # KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
        KafkaConsumer(auto_offset_reset='latest', enable_auto_commit=False, group_id=None)
        consumer = KafkaConsumer(topic, bootstrap_servers=self.brokers)
        for msg in consumer:
            return msg
Project: sparkly    Author: Tubular    | project source | file source
def test_kafka_fixture(self):
        consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers='kafka.docker:9092',
            key_deserializer=lambda item: json.loads(item.decode('utf-8')),
            value_deserializer=lambda item: json.loads(item.decode('utf-8')),
            auto_offset_reset='earliest',
        )

        actual_data = []
        for i in range(5):
            message = next(consumer)
            data = {'key': message.key, 'value': message.value}
            actual_data.append(data)

        expected_data = self.spark.read.json(
            absolute_path(__file__, 'resources', 'test_fixtures', 'kafka.json')
        )
        self.assertDataFrameEqual(expected_data, actual_data)
Project: search-MjoLniR    Author: wikimedia    | project source | file source
def get_offset_start(brokers, topic=mjolnir.kafka.TOPIC_RESULT):
    """Find the current ending offset for all partitions in topic.

    By calling this prior to producing requests we know all responses come
    after these offsets.
    TODO: This naming doesn't feel right...

    Parameters
    ----------
    brokers : list of str
    topic : str

    Returns
    -------
    list of int
    """
    consumer = kafka.KafkaConsumer(bootstrap_servers=brokers, api_version=mjolnir.kafka.BROKER_VERSION)
    parts = consumer.partitions_for_topic(topic)
    if parts is None:
        return None
    partitions = [kafka.TopicPartition(topic, p) for p in parts]
    consumer.assign(partitions)
    return [consumer.position(p) for p in partitions]
Project: dcos-kafka-cassandra    Author: RobBagby    | project source | file source
def consume():
    """Consumes events from partitionlag topic"""
    LOGGER.setLevel(APPLICATION_LOGGING_LEVEL)
    LOGGER.info("Starting lagreader")
    LOGGER.debug('Set Logging Level to ' + APPLICATION_LOGGING_LEVEL)
    LOGGER.debug('Listening on Kafka at: ' + KAFKA_URI)

    consumer = KafkaConsumer(group_id='lagConsumerGroup', bootstrap_servers=KAFKA_URI)
    consumer.subscribe(topics=['partitionlag'])
    partition_lag_dict = PartitionLagDict()

    last_writetime = datetime.datetime.now()

    for msg in consumer:
        jsonstring = msg.value
        partitionlag = PartitionLag.from_json(jsonstring)
        partition_lag_dict.addPartitionLag(partitionlag)
        LOGGER.debug(str(partitionlag.eventdate) + "  Received partitionlag event: " \
            + "partition: " + str(partitionlag.partition) \
            + " lag: " + str(partitionlag.lag))
        LOGGER.debug(str(datetime.datetime.now()) + ' Received partitionlag: ' \
            + partition_lag_dict.toString())

        last_writetime = _notifylag_conditionally(partition_lag_dict, last_writetime)
Project: kzmonitor    Author: tqlihuiqi    | project source | file source
def getOffsets(self, topic, partitions, group):
        """ ??topic?partition?group, ??offsets?? """

        try:
            # First try the zookeeper-storage API to fetch the offsets.
            # If the group has no offsets stored there, an UnknownTopicOrPartitionError is raised.
            tp = self.client.send_offset_fetch_request(group, [OffsetRequestPayload(topic, p, -1, 1) for p in partitions])
            offsets = {p.partition: p.offset for p in tp}

        except UnknownTopicOrPartitionError:
            # Fall back to the kafka-storage API to fetch the offsets.
            consumer = KafkaConsumer(group_id=group, bootstrap_servers=self.broker, enable_auto_commit=False)
            tp = [TopicPartition(topic, p) for p in partitions]
            consumer.assign(tp)
            offsets = {p.partition: consumer.position(p) for p in tp}

        return offsets
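
The snippet above returns each partition's committed group offset. A closely related quantity is consumer lag, the log end offset minus the group offset. Below is a minimal sketch of computing it with kafka-python's public end_offsets() and committed() calls (kafka-python 1.3+ against Kafka 0.10.1+); the broker, topic, and group names are placeholders.

from kafka import KafkaConsumer, TopicPartition

# Placeholder broker/topic/group; adjust for your environment.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         group_id='my-group',
                         enable_auto_commit=False)
topic = 'my-topic'
tps = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
end = consumer.end_offsets(tps)            # log end offset per partition
for tp in tps:
    committed = consumer.committed(tp)     # last committed offset, or None
    print(tp.partition, end[tp] - (committed or 0))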
Project: oa_qian    Author: sunqb    | project source | file source
def consumer_msg(serverlist):
    consumer = KafkaConsumer('oa_qian',
                             group_id='my-group',
                             bootstrap_servers=[serverlist])
    for message in consumer:
        msg = bytes.decode(message.value)
        msglist = msg.split(";")
        username = msglist[0]
        password = msglist[1]
        key = msglist[2]
        type = msglist[3]
        oa = Attendance(username, password, key)
        if type == '1':
            result = oa.singin()
            print(result)
            if result == 'success':
                print(oa.log_record(username, type))
        else:
            result = oa.singout()
            print(result)
            if result == 'success':
                print(oa.log_record(username, type))
Project: wsp    Author: wangybnet    | project source | file source
def __init__(self, config):
        assert isinstance(config, FetcherConfig), "Wrong configuration"
        log.debug("New fetcher with master_rpc_addr=%s, rpc_addr=%s" % (config.master_rpc_addr, config.fetcher_rpc_addr))
        self._config = config
        self.master_addr = self._config.master_rpc_addr
        if not self.master_addr.startswith("http://"):
            self.master_addr = "http://%s" % self.master_addr
        self._host, self._port = self._config.fetcher_rpc_addr.split(":")
        self._port = int(self._port)
        self._sys_config = self._pull_sys_config_from_master()
        self.isRunning = False
        self.rpcServer = self._create_rpc_server()
        self.producer = KafkaProducer(bootstrap_servers=[self._sys_config.kafka_addr, ])
        self.consumer = KafkaConsumer(bootstrap_servers=[self._sys_config.kafka_addr, ],
                                      auto_offset_reset='earliest',
                                      consumer_timeout_ms=self._sys_config.kafka_consumer_timeout_ms)
        self.downloader = Downloader(clients=self._sys_config.downloader_clients, timeout=self._sys_config.downloader_timeout)
        self._task_manager = TaskManager(self._sys_config, self._config)
        self._reporter_manager = ReporterManager(self._sys_config)
        self._request_task = {}
        self.taskDict = {}
        self._subscribe_lock = threading.Lock()
        # NOTE: the fetcher's address; set when it registers with the master
        self._addr = None
Project: daas    Author: havron    | project source | file source
def est1_kafka(self): # kafka queue not testable?
    fixtureA = {"listing_id":1,"drone": 2, "owner": 2, "description": "please rent myseediestdrone!", "time_posted": "2016-10-24T04:28:48.932Z", "price_per_day": 10.0}
    fixtureB = {"listing_id":2,"drone": 3, "owner": 3, "description": "please rent myforgeddrone!", "time_posted": "2016-10-24T04:28:48.991Z", "price_per_day": 14.0}

    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    producer.send('new-listings-topic', json.dumps(fixtureA).encode('utf-8'))
    producer.send('new-listings-topic', json.dumps(fixtureB).encode('utf-8'))
    print("going to sleep for 30 seconds...")
    time.sleep(30) # allow kafka container to think...
    print("finished sleeping!")

    try:
      consumer = KafkaConsumer('new-listings-topic', group_id='listing-indexer', bootstrap_servers=['kafka:9092'])
      self.assertIsNotNone(consumer)
    except Exception:
      print("consumer not formed")
      #consumer = KafkaConsumer('new-listings-topic', group_id='listing-indexer', bootstrap_servers=['kafka:9092'])
      return  # without a consumer there is nothing to iterate below

    for message in consumer:
      m = json.loads((message.value).decode('utf-8'))
      print(m)
Project: kq    Author: joowani    | project source | file source
def __init__(self,
                 hosts='127.0.0.1:9092',
                 cafile=None,
                 certfile=None,
                 keyfile=None,
                 crlfile=None):
        self._hosts = hosts
        self._consumer = kafka.KafkaConsumer(
            bootstrap_servers=self._hosts,
            ssl_cafile=cafile,
            ssl_certfile=certfile,
            ssl_keyfile=keyfile,
            ssl_crlfile=crlfile,
            consumer_timeout_ms=-1,
            enable_auto_commit=True,
            auto_offset_reset='latest',
        )
Project: deb-python-kafka    Author: openstack    | project source | file source
def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my-topic'])
        self.valid = 0
        self.invalid = 0

        for message in consumer:
            if len(message.value) == msg_size:
                self.valid += 1
            else:
                self.invalid += 1

            if consumer_stop.is_set():
                break

        consumer.close()
Project: kafka-python-ingest    Author: GISDev01    | project source | file source
def main():
    # consumer = KafkaConsumer("topic3", group_id="group1",
    #                          bootstrap_servers='localhost:9092')
    # for message in consumer:
    #     print(message)
    #     print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(message.timestamp / 1000.0)))



    threads = [
        Consumer()
    ]

    for t in threads:
        t.start()
        logger.info('Thread started')

    # logger.info('Sleeping for 100 seconds')
    #time.sleep(100)
Project: sparkly    Author: Tubular    | project source | file source
def test_write_kafka_dataframe(self):
        self.expected_data.write_ext.kafka(
            'kafka.docker',
            self.topic,
            key_serializer=self.json_encoder,
            value_serializer=self.json_encoder,
        )

        consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers='kafka.docker:9092',
            key_deserializer=lambda item: json.loads(item.decode('utf-8')),
            value_deserializer=lambda item: json.loads(item.decode('utf-8')),
            auto_offset_reset='earliest',
        )

        actual_data = []
        for i in range(self.expected_data.count()):
            message = next(consumer)
            data = {'key': message.key, 'value': message.value}
            actual_data.append(data)

        self.assertDataFrameEqual(self.expected_data, actual_data)
Project: frontera-docs-zh_CN    Author: xsren    | project source | file source
def __init__(self, location, topic, group, partition_id):
        self._location = location
        self._group = group
        self._topic = topic
        self._consumer = KafkaConsumer(
            bootstrap_servers=self._location,
            group_id=self._group,
            max_partition_fetch_bytes=10485760,
            consumer_timeout_ms=100,
            client_id="%s-%s" % (self._topic, str(partition_id) if partition_id is not None else "all"),
            request_timeout_ms=120 * 1000,
        )

        if partition_id is not None:
            self._partition_ids = [TopicPartition(self._topic, partition_id)]
            self._consumer.assign(self._partition_ids)
        else:
            self._partition_ids = [TopicPartition(self._topic, pid) for pid in self._consumer.partitions_for_topic(self._topic)]
            self._consumer.subscribe(topics=[self._topic])
            if self._consumer._use_consumer_group():
                self._consumer._coordinator.ensure_coordinator_known()
                self._consumer._coordinator.ensure_active_group()

        self._consumer._update_fetch_positions(self._partition_ids)
        self._start_looping_call()
Project: benchmark-python-client-for-kafka    Author: sucitw    | project source | file source
def python_kafka_consumer_performance(topic=topic):

    print("\n>>> Connect Kafka in {} by kafka-python as consumer". format(bootstrap_servers))

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',  # start from the earliest available offset
        group_id=None  # no consumer group, so no offsets are committed
    )
    msg_consumed_count = 0

    consumer_start = time.time()
    consumer.subscribe([topic])
    for msg in consumer:
        msg_consumed_count += 1

        if msg_consumed_count >= msg_count:
            break

    consumer_timing = time.time() - consumer_start
    consumer.close()
    return consumer_timing
Project: hh-page-classifier    Author: TeamHG-Memex    | project source | file source
def __init__(self, kafka_host=None, model_cls=None, model_kwargs=None,
                 debug=False):
        self.model_cls = model_cls
        self.model_kwargs = model_kwargs or {}
        kafka_kwargs = {}
        if kafka_host is not None:
            kafka_kwargs['bootstrap_servers'] = kafka_host
        self.consumer = KafkaConsumer(
            self.input_topic,
            group_id=self.group_id,
            max_partition_fetch_bytes=self.max_message_size,
            consumer_timeout_ms=100,
            **kafka_kwargs)
        self.producer = KafkaProducer(
            max_request_size=self.max_message_size,
            **kafka_kwargs)
        self.debug = debug
Project: django-logpipe    Author: thelabnyc    | project source | file source
def client(self):
        if not self._client:
            kwargs = self._get_client_config()
            self._client = kafka.KafkaConsumer(**kwargs)
            tps = self._get_topic_partitions()
            self._client.assign(tps)
            backend = get_offset_backend()
            for tp in tps:
                backend.seek(self, tp.topic, tp.partition)
                self._client.committed(tp)
        return self._client
Project: open-nti    Author: Juniper    | project source | file source
def check_kafka_is_running():
    # Verify we can connect to Kafka

    time.sleep(2)
    consumer = KafkaConsumer(bootstrap_servers=get_external_ip()+':'+str(KAFKA_BROKER_PORT),
                             auto_offset_reset='earliest')

    mytopic = consumer.topics()

    return 1
Project: search-MjoLniR    Author: wikimedia    | project source | file source
def _get_result_offsets(self):
        """Get the latest offsets for all partitions in topic"""
        consumer = kafka.KafkaConsumer(bootstrap_servers=self.brokers,
                                       auto_offset_reset='latest',
                                       api_version=mjolnir.kafka.BROKER_VERSION)
        partitions = [kafka.TopicPartition(self.topic_result, p)
                      for p in consumer.partitions_for_topic(self.topic_result)]
        consumer.assign(partitions)
        consumer.seek_to_end()
        offsets = [consumer.position(tp) for tp in partitions]
        consumer.close()
        return offsets
Project: kafka-paperplane    Author: wsacin    | project source | file source
def consumer(self):
        params = {}
        if self._group_id:
            params['group_id'] = self._group_id
        if self._auto_offset_reset:
            params['auto_offset_reset'] = self._auto_offset_reset
        params['bootstrap_servers'] = self._bootstrap_servers

        if not self._consumer:
            self._consumer = KafkaConsumer(**params)
            self._consumer.subscribe(self._receive_from)
        return self._consumer
Project: Stock-Visualizer    Author: saguo    | project source | file source
def __init__(self, broker, topic):
        self.consumer = KafkaConsumer(topic, bootstrap_servers=broker)
Project: Python-Network-Programming-Cookbook-Second-Edition    Author: PacktPublishing    | project source | file source
def main(conf):
    # Enable to topics/feeds
    topics = [
        'openbmp.parsed.router', 'openbmp.parsed.peer', 'openbmp.parsed.collector',
        'openbmp.parsed.bmp_stat', 'openbmp.parsed.unicast_prefix', 'openbmp.parsed.ls_node',
        'openbmp.parsed.ls_link', 'openbmp.parsed.ls_prefix', 'openbmp.parsed.l3vpn'
    ]

    # Read config file
    with open(conf, 'r') as f:
        config_content = yaml.load(f)

    bootstrap_server = config_content['bootstrap_servers']

    try:
        # connect and bind to topics
        print ("Connecting to kafka... takes a minute to load offsets and topics, please wait")
        consumer = kafka.KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_server,
            client_id="dev-testing" + str(time.time()),
            group_id="dev-testing" + str(time.time()),
            enable_auto_commit=True,
            auto_commit_interval_ms=1000,
            auto_offset_reset="largest"
        )

        print ("Now consuming/waiting for messages...")
        for m in consumer:
            process_message(m)

    except kafka.common.KafkaUnavailableError as err:
        print ("Kafka Error: %s" % str(err))

    except KeyboardInterrupt:
        print ("User stop requested")
Project: samza-prometheus-exporter    Author: movio    | project source | file source
def main():
    parser = argparse.ArgumentParser(description='Feed Apache Samza metrics into Prometheus.')
    parser.add_argument('--brokers', metavar='BROKERS', type=str, required=True,
                        help='list of comma-separated kafka brokers: host[:port],host[:port],...')
    parser.add_argument('--port', metavar='PORT', type=int, nargs='?', default=8080,
                        help='port to serve metrics to Prometheus (default: 8080)')
    parser.add_argument('--topic', metavar='TOPIC', type=str, nargs='?', default='samza-metrics',
                        help='name of topic to consume (default: "samza-metrics")')
    parser.add_argument('--from-beginning', action='store_const', const=True,
                        help='consume topic from offset 0')
    parser.add_argument('--include-jobs-regex', metavar='INCLUDE_JOBS_REGEX', type=str, nargs='?', default='.*',
                        help='only include jobs which match the given regex')
    parser.add_argument('--ttl', metavar='GAUGES_TTL', type=int, nargs='?',
                        help='time in seconds after which a metric (or label set) is no longer reported when not updated (default: 60s)')

    args = parser.parse_args()
    brokers = args.brokers.split(',')
    consumer = KafkaConsumer(args.topic, group_id=KAFKA_GROUP_ID, bootstrap_servers=brokers)
    start_http_server(args.port)

    set_gauges_ttl(args.ttl)

    if args.from_beginning:
        consumer.seek_to_beginning()

    start_ttl_watchdog_thread()

    try:
        consume_topic(consumer, args.brokers, re.compile(args.include_jobs_regex))
    except KeyboardInterrupt:
        pass # FIXME : should we close consumer ?

    print('Shutting down')
Project: opsweb    Author: wylok    | project source | file source
def intranet_topic():
    consumer = KafkaConsumer('haproxy_logs',
                             group_id='haproxy_logs',
                             bootstrap_servers=bootstrap_servers)
    for message in consumer:
        Msg = message.value.strip()
        try:
            tt = time.strftime('%Y%m%d', time.localtime())
            H_key = 'haproxy2_topic_%s' % tt
            if Msg:
                val = Msg.split('{')
                if len(val) >= 2:
                    Topic = val[1].split('}')[0]
                    Rtime = val[0].split()[8]
                    Rtime = int(Rtime.split('/')[4])
                    if ':' in Topic:
                        Topic = str(Topic.split(':')[0])
                    if '|' in Topic:
                        Topic = str(Topic.split('|')[0])
                    if '.baihe.com' in Topic:
                        Key = 'haproxy2_logs_%s_%s' % (tt,Topic)
                        Rt_Key = 'Rtime2_%s_%s' % (tt,Topic)
                        for KEY in (H_key,Key,Rt_Key):
                            rc.expire(KEY, 86400)
                        rc.sadd(H_key,Topic)
                        rc.incr(Key)
                        if Rtime:
                            rc.lpush(Rt_Key, Rtime)
        except Exception as e:
            #loging.write()
            continue
    sys.exit()
Project: kafkatos3    Author: ConnectedHomes    | project source | file source
def run_consumer(self):
        '''core consumer code'''
        bootstrap_server = self.config.get('consumer', 'kafka_bootstrap')
        consumer_group = self.config.get('consumer', 'kafka_consumer_group')

        offset_reset = self.config.get(
            'consumer', 'kafka_auto_offset_reset')
        self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_server,\
                                        consumer_timeout_ms=60000,\
                                        group_id=consumer_group,\
                                        auto_offset_reset=offset_reset)
        topic_whitelist = self.config.get(
            'consumer', 'topic_whitelist')
        self.logger.info("Topic list is " + topic_whitelist)

        self.consumer.subscribe(topic_whitelist.split(","), None, self)

        self.logger.info("Consumer " + self.consumer_id +
                         " starting.... " + str(self.consumer.assignment()))

        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

        while not self.shutting_down:
            for message in self.consumer:

                consumer_message = MessageInfo(message.topic, message.partition, message.key,\
                                               message.value, message.offset)
                self.process_message(consumer_message)
                if self.shutting_down:
                    break
            self.check_for_rotation()

        for part in self.partitions:
            self.partitions[part].writer.close()

        self.logger.info("Graceful shutdown of consumer " +
                         str(self.consumer_id) + " successful")
Project: kali    Author: lsenta    | project source | file source
def __init__(self, topic, host=None, decoder=None):
        self.log = log.getChild(self.__class__.__name__)

        if host is not None:
            additional = dict(bootstrap_servers=[host])
        else:
            additional = dict()

        self._consumer = KafkaConsumer(topic, consumer_timeout_ms=KAFKA_CONSUMER_TIMEOUT_MS, **additional)
        self._stream_queue = make_stream_queue(_as_closable_stream(self._consumer))

        self._all = []
        self._decoder = decoder
Project: Locus    Author: senditya    | project source | file source
def poll(topic, partition=0, offset=0, hostname=None, port_num=None, max_timeout=100):
    hostname, port_num = insure_host_port(hostname, port_num)
    server = hostname + ':' + str(port_num)
    topic_partition = TopicPartition(topic, partition)

    consumer = KafkaConsumer(bootstrap_servers=server, group_id=None)
    consumer.assign([topic_partition])
    consumer.seek(topic_partition, offset)
    # poll() returns {TopicPartition: [messages]}; materialize the view so it is indexable
    msgs = list(consumer.poll(max_timeout).values())
    consumer.close()
    if len(msgs) > 0:
        return msgs[0]
    else:
        return {}
Project: scoring-pipelines    Author: trustedanalytics    | project source | file source
def __init__(self, kafka_URI, topic_str):
        consumer = KafkaConsumer(bootstrap_servers=kafka_URI, auto_offset_reset='earliest', enable_auto_commit=False)
        consumer.subscribe([topic_str])
        self.consumer = consumer
Project: quorum    Author: Data4Democracy    | project source | file source
def get_user_tweets(username):
    producer = KafkaProducer(bootstrap_servers='kafka:9092') 
    consumer = KafkaConsumer('twitterCheckpoint', bootstrap_servers=['kafka:9092']) 
    msgSent = producer.send('twitterUser', username.encode('utf-8'))

    # wait for the message to be sent back by the producer
    msgReceived = None
    while not msgReceived:
        for msg in consumer:
            msgReceived = msg.value.decode('utf-8')

            if msgReceived==username:
                return
Project: quorum    Author: Data4Democracy    | project source | file source
def get_page_posts(page):                                                       
    producer = KafkaProducer(bootstrap_servers='kafka:9092')                    
    consumer = KafkaConsumer('fbCheckpoint', bootstrap_servers=['kafka:9092'])                                                                              
    msgSent = producer.send('fbPage', page.encode('utf-8'))                 

    # wait for the message to be sent back by the producer
    msgReceived = None                        
    while not msgReceived:                                                      
        for msg in consumer:                                                    
            msgReceived = msg.value.decode('utf-8')                             

            if msgReceived==page:                                               
                return
Project: quorum    Author: Data4Democracy    | project source | file source
def get_subreddit_posts(subr):                                                  
    producer = KafkaProducer(bootstrap_servers='kafka:9092')                    
    consumer = KafkaConsumer('redditCheckpoint', bootstrap_servers=['kafka:9092'])                                                                            
    msgSent = producer.send('subreddit', subr.encode('utf-8'))                  

    # wait for the message to be sent back by the producer
    msgReceived = None                                                          
    while not msgReceived:                                                      
        for msg in consumer:                                                    
            msgReceived = msg.value.decode('utf-8')                             

            if msgReceived==subr:                                               
                return
Project: openbmp-python-api-message    Author: OpenBMP    | project source | file source
def main():
    # Enable to topics/feeds
    topics = [
        'openbmp.parsed.unicast_prefix'
#        'openbmp.parsed.router', 'openbmp.parsed.peer', 'openbmp.parsed.collector',
#        'openbmp.parsed.bmp_stat', 'openbmp.parsed.unicast_prefix', 'openbmp.parsed.ls_node',
#        'openbmp.parsed.ls_link', 'openbmp.parsed.ls_prefix'
    ]

    # Read config file
    with open('config.yaml', 'r') as f:
        config_content = yaml.load(f)

    bootstrap_server = config_content['bootstrap_servers']

    try:
        # connect and bind to topics
        print ("Connecting to kafka... takes a minute to load offsets and topics, please wait")
        consumer = kafka.KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_server,
            client_id="dev-testing" + str(time.time()),
            group_id="dev-testing" + str(time.time()),
            enable_auto_commit=True,
            auto_commit_interval_ms=1000,
            auto_offset_reset="largest"
        )

        print ("Now consuming/waiting for messages...")
        for m in consumer:
            process_message(m)

    except kafka.common.KafkaUnavailableError as err:
        print(("Kafka Error: %s" % str(err)))

    except KeyboardInterrupt:
        print ("User stop requested")
Project: openbmp-python-api-message    Author: OpenBMP    | project source | file source
def main():
    # Enable to topics/feeds
    topics = [
        'openbmp.parsed.router', 'openbmp.parsed.peer', 'openbmp.parsed.collector',
        'openbmp.parsed.bmp_stat', 'openbmp.parsed.unicast_prefix', 'openbmp.parsed.ls_node',
        'openbmp.parsed.ls_link', 'openbmp.parsed.ls_prefix', 'openbmp.parsed.l3vpn'
    ]

    # Read config file
    with open('config.yaml', 'r') as f:
        config_content = yaml.load(f)

    bootstrap_server = config_content['bootstrap_servers']

    try:
        # connect and bind to topics
        print("Connecting to kafka... takes a minute to load offsets and topics, please wait")
        consumer = kafka.KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_server,
            client_id="dev-testing" + str(time.time()),
            group_id="dev-testing" + str(time.time()),
            enable_auto_commit=True,
            auto_commit_interval_ms=1000,
            auto_offset_reset="largest"
        )

        print("Now consuming/waiting for messages...")
        for m in consumer:
            process_message(m)

    except kafka.common.KafkaUnavailableError as err:
        print("Kafka Error: %s" % str(err))

    except KeyboardInterrupt:
        print("User stop requested")
Project: citysense    Author: tzano    | project source | file source
def __init__(self, mongo_db, topic):
        """
        Fetch data from Kafka and insert it into MongoDB.

        :param mongo_db: mongodb cursor
        :type mongo_db: cursor

        :param topic: name of kafka topic
        :type topic: :py:class:`str`

        """
        Thread.__init__(self)
        self.kafka_consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
        self.mongo_db = mongo_db
        self.topic = topic
Project: kq    Author: joowani    | project source | file source
def __init__(self,
                 hosts='127.0.0.1:9092',
                 topic='default',
                 timeout=None,
                 callback=None,
                 job_size=1048576,
                 cafile=None,
                 certfile=None,
                 keyfile=None,
                 crlfile=None,
                 proc_ttl=5000,
                 offset_policy='latest'):
        self._hosts = hosts
        self._topic = topic
        self._timeout = timeout
        self._callback = callback
        self._pool = None
        self._proc_ttl = proc_ttl
        self._logger = logging.getLogger('kq')
        self._consumer = kafka.KafkaConsumer(
            self._topic,
            group_id=self._topic,
            bootstrap_servers=self._hosts,
            max_partition_fetch_bytes=job_size * 2,
            ssl_cafile=cafile,
            ssl_certfile=certfile,
            ssl_keyfile=keyfile,
            ssl_crlfile=crlfile,
            consumer_timeout_ms=-1,
            enable_auto_commit=False,
            auto_offset_reset=offset_policy,
        )
Project: kq    Author: joowani    | project source | file source
def consumer(self):
        """Return the Kafka consumer object.

        :return: Kafka consumer object.
        :rtype: kafka.consumer.KafkaConsumer
        """
        return self._consumer
Project: deb-python-kafka    Author: openstack    | project source | file source
def test_session_timeout_larger_than_request_timeout_raises(self):
        with self.assertRaises(KafkaConfigurationError):
            KafkaConsumer(bootstrap_servers='localhost:9092', session_timeout_ms=60000, request_timeout_ms=40000)
Project: deb-python-kafka    Author: openstack    | project source | file source
def test_fetch_max_wait_larger_than_request_timeout_raises(self):
        with self.assertRaises(KafkaConfigurationError):
            KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)
Project: deb-python-kafka    Author: openstack    | project source | file source
def test_subscription_copy(self):
        consumer = KafkaConsumer('foo', api_version=(0, 10))
        sub = consumer.subscription()
        assert sub is not consumer.subscription()
        assert sub == set(['foo'])
        sub.add('fizz')
        assert consumer.subscription() == set(['foo'])
Project: napalm-logs    Author: napalm-automation    | project source | file source
def start(self):
        '''
        Startup the kafka consumer.
        '''
        log.debug('Creating the consumer using the bootstrap servers: %s and the group ID: %s',
                  self.bootstrap_servers,
                  self.group_id)
        try:
            self.consumer = kafka.KafkaConsumer(bootstrap_servers=self.bootstrap_servers,
                                                group_id=self.group_id)
        except kafka.errors.NoBrokersAvailable as err:
            log.error(err, exc_info=True)
            raise ListenerException(err)
        log.debug('Subscribing to the %s topic', self.topic)
        self.consumer.subscribe(topics=[self.topic])
Project: kafka-python-ingest    Author: GISDev01    | project source | file source
def run(self):
        consumer = KafkaConsumer(
            'topic2',
            bootstrap_servers='localhost:9092') #,
            #auto_offset_reset='earliest')

        #consumer.subscribe(['topic1'])

        for message in consumer:
            print(message)
            print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(message.timestamp / 1000.0)))
Project: FlaskService    Author: b96705008    | project source | file source
def run(self):
        consumer = KafkaConsumer(bootstrap_servers=self.kafka_host,
                                 group_id=self.group_id,
                                 auto_offset_reset='latest')
        consumer.subscribe(self.sub_topics)

        for message in consumer:
            for s in self.subscribers:
                if s.should_call(message):
                    self.executor.submit(s.call, message)
Project: CerebralCortex-2.0-legacy    Author: MD2Korg    | project source | file source
def __init__(self, CC_obj: object, auto_offset_reset: str = "largest"):
        """
        :param CC_obj: CerebralCortex object that provides the Kafka server configuration
        :param auto_offset_reset: offset reset policy passed through to KafkaConsumer
        """
        self.configuration = CC_obj.configuration
        self.hostIP = self.configuration['kafkaserver']['host']
        self.hostPort = self.configuration['kafkaserver']['port']

        self.consumer = KafkaConsumer(bootstrap_servers=str(self.hostIP)+":"+str(self.hostPort), api_version=(0,10),
                                      auto_offset_reset=auto_offset_reset)
Project: mysql-binlog-replication    Author: nghiaminhle    | project source | file source
def __init__(self):
        self._consumer = KafkaConsumer(
            bootstrap_servers=kafka_bootstrap_server,
            group_id=self._consumer_group_id,
            auto_offset_reset=self._offset_reset,
            enable_auto_commit=True
        )
Project: mysql-binlog-replication    Author: nghiaminhle    | project source | file source
def __init__(self, topics, consumer_group, offset_reset='earliest', auto_commit=False):
        self._consumer_thread = Thread(target=self.do_consume)
        self._consumer_thread.setDaemon(True)

        self._auto_commit = auto_commit
        self._consumer = KafkaConsumer(
                bootstrap_servers=kafka_bootstrap_server,
                group_id=consumer_group,
                auto_offset_reset=offset_reset,
                enable_auto_commit=auto_commit
            )

        self._consumer.subscribe(topics)
Project: plugins    Author: site24x7    | project source | file source
def getConnection(self):
        self.connection=KafkaConsumer(bootstrap_servers=[self.broker+":"+self.port])   # Kafka consumer connection
Project: yelp_kafka    Author: Yelp    | project source | file source
def _acquire(self, partitions):
        if not self.consumer:
            self.consumer = KafkaConsumer(partitions, **self.config)
        else:
            self.consumer.set_topic_partitions(partitions)
        if self.post_rebalance_callback:
            self.post_rebalance_callback(partitions)

    # set_topic_partitions causes a metadata request, which may fail on the
    # first try.
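
The comment above notes that the metadata request triggered by set_topic_partitions may fail on the first try. A generic retry helper along the following lines is the usual workaround; this is a sketch with a hypothetical call_with_retries name, not yelp_kafka's actual code.

import time

def call_with_retries(fn, attempts=3, delay=1.0):
    """Call fn(), retrying on any exception up to `attempts` times."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(delay)

# e.g. call_with_retries(lambda: consumer.set_topic_partitions(partitions))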