Python kafka module: KafkaProducer() example source code

We extracted the following code examples from open-source Python projects to illustrate how to use kafka.KafkaProducer().
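
Before the project examples, here is a minimal, self-contained sketch of the basic producer pattern most of them share (the broker address and topic name below are placeholders, not taken from any of the projects):

from kafka import KafkaProducer

# Connect to a broker, send one message, and block until it is acknowledged.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('example-topic', b'hello kafka')
record_metadata = future.get(timeout=10)  # raises KafkaError if delivery failed
producer.flush()
producer.close()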

Project: dcos-kafka-cassandra    Author: RobBagby
def simulate():
    """simulate temperature events for machines"""

    LOGGER.setLevel(APPLICATION_LOGGING_LEVEL)
    LOGGER.info("Starting producer")
    LOGGER.debug('Set Logging Level to ' + APPLICATION_LOGGING_LEVEL)
    LOGGER.debug('Writing to Kafka listening at: ' + KAFKA_URI)

    producer = KafkaProducer(bootstrap_servers=KAFKA_URI)

    last_temperatures = {}

    while True:
        for i in range(PUBLISH_NUMBER_OF_SENSORS):
            sensor = 'sensor' + str(i)
            temperature = _get_temperature(sensor, last_temperatures)
            message = MachineTemperature(sensor, temperature, datetime.datetime.utcnow()).to_json()

            producer.send(SENSOR_TEMPERATURE_TOPIC, str.encode(message), key=sensor.encode())

        LOGGER.info(str(PUBLISH_NUMBER_OF_SENSORS) + " messages published")
        time.sleep(PUBLISH_DELAY_IN_SECONDS)
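
The key argument in the send() call above controls partitioning: kafka-python hashes the key, so all readings from the same sensor land on the same partition. A one-line sketch (topic and payload are placeholders):

producer.send('machine.sensors.temperature', b'{"temperature": 21.5}', key=b'sensor0')  # same key -> same partition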
Project: search-MjoLniR    Author: wikimedia
def __init__(self, brokers, n_workers=5, topic_work=mjolnir.kafka.TOPIC_REQUEST,
                 topic_result=mjolnir.kafka.TOPIC_RESULT, topic_complete=mjolnir.kafka.TOPIC_COMPLETE,
                 max_request_size=4*1024*1024):
        self.brokers = brokers
        self.n_workers = n_workers
        self.topic_work = topic_work
        self.topic_result = topic_result
        self.topic_complete = topic_complete
        # Standard producer for query results
        self.producer = kafka.KafkaProducer(bootstrap_servers=brokers,
                                            max_request_size=max_request_size,
                                            compression_type='gzip',
                                            api_version=mjolnir.kafka.BROKER_VERSION)
        # More reliable producer for reflecting end-run sigils. As this
        # is only used for sigils and not for large messages like ES responses,
        # compression is unnecessary here.
        self.ack_all_producer = kafka.KafkaProducer(bootstrap_servers=brokers,
                                                    acks='all',
                                                    api_version=mjolnir.kafka.BROKER_VERSION)
        # TODO: 10 items? No clue how many is appropriate... 10 seems reasonable
        # enough. We want enough to keep the workers busy, but not so many
        # that the committed offsets are significantly ahead of the work
        # actually being performed.
        self.work_queue = Queue.Queue(10)
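
The two producers above trade throughput against durability. A minimal sketch of the distinction, with a placeholder broker list not taken from this project:

import kafka

brokers = 'localhost:9092'  # placeholder
# acks=1 (the kafka-python default): only the partition leader must acknowledge -- fast.
fast_producer = kafka.KafkaProducer(bootstrap_servers=brokers, compression_type='gzip')
# acks='all': every in-sync replica must acknowledge -- slower, but safest for
# small control messages such as the end-run sigils above.
reliable_producer = kafka.KafkaProducer(bootstrap_servers=brokers, acks='all')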
Project: webhook-shims    Author: vmw-loginsight
def kafka(TOPIC=None):
    # Lazy init of the Kafka producer
    #
    global PRODUCER
    if PRODUCER is None:
        PRODUCER = KafkaProducer(
            bootstrap_servers=KAFKA_BOOSTRAP_SERVERS,
            sasl_mechanism=KAFKA_SASL_MECHANISM,
            sasl_plain_username=KAFKA_USER,
            sasl_plain_password=KAFKA_PASSWORD)
    try:
        future = PRODUCER.send(TOPIC, request.get_data())
        future.get(timeout=60)
        return "OK", 200, None
    except KafkaTimeoutError:
        return "Internal Server Error", 500, None
Project: wsp    Author: wangybnet
def __init__(self, config):
        assert isinstance(config, FetcherConfig), "Wrong configuration"
        log.debug("New fetcher with master_rpc_addr=%s, rpc_addr=%s" % (config.master_rpc_addr, config.fetcher_rpc_addr))
        self._config = config
        self.master_addr = self._config.master_rpc_addr
        if not self.master_addr.startswith("http://"):
            self.master_addr = "http://%s" % self.master_addr
        self._host, self._port = self._config.fetcher_rpc_addr.split(":")
        self._port = int(self._port)
        self._sys_config = self._pull_sys_config_from_master()
        self.isRunning = False
        self.rpcServer = self._create_rpc_server()
        self.producer = KafkaProducer(bootstrap_servers=[self._sys_config.kafka_addr, ])
        self.consumer = KafkaConsumer(bootstrap_servers=[self._sys_config.kafka_addr, ],
                                      auto_offset_reset='earliest',
                                      consumer_timeout_ms=self._sys_config.kafka_consumer_timeout_ms)
        self.downloader = Downloader(clients=self._sys_config.downloader_clients, timeout=self._sys_config.downloader_timeout)
        self._task_manager = TaskManager(self._sys_config, self._config)
        self._reporter_manager = ReporterManager(self._sys_config)
        self._request_task = {}
        self.taskDict = {}
        self._subscribe_lock = threading.Lock()
        # NOTE: the address this Fetcher registers with the Master
        self._addr = None
Project: daas    Author: havron
def est1_kafka(self): # kafka queue not testable?
    fixtureA = {"listing_id":1,"drone": 2, "owner": 2, "description": "please rent myseediestdrone!", "time_posted": "2016-10-24T04:28:48.932Z", "price_per_day": 10.0}
    fixtureB = {"listing_id":2,"drone": 3, "owner": 3, "description": "please rent myforgeddrone!", "time_posted": "2016-10-24T04:28:48.991Z", "price_per_day": 14.0}

    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    producer.send('new-listings-topic', json.dumps(fixtureA).encode('utf-8'))
    producer.send('new-listings-topic', json.dumps(fixtureB).encode('utf-8'))
    print("going to sleep for 30 seconds...")
    time.sleep(30) # allow kafka container to think...
    print("finished sleeping!")

    try:
      consumer = KafkaConsumer('new-listings-topic', group_id='listing-indexer', bootstrap_servers=['kafka:9092'])
      self.assertIsNotNone(consumer)
    except Exception:
      print("consumer not formed")
      return  # avoid a NameError below when the consumer was never created

    for message in consumer:
      m = json.loads((message.value).decode('utf-8'))
      print(m)
Project: pyflume    Author: jiangruocheng
def process_data(self, msg):
        result = 'ok'
        _data = msg['filename'] + ': ' + msg['data']
        self.log.debug(msg['collectors'] + _data)

        producer = KafkaProducer(bootstrap_servers=self.kfk_server)

        future = producer.send(self.topic, _data)
        # Block for 'synchronous' sends
        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # Decide what to do if produce request failed...
            self.log.error(traceback.format_exc())
            result = 'Fail'
        finally:
            producer.close()

        # return record_metadata.topic, record_metadata.partition, record_metadata.offset
        return result,
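
The commented-out return above hints at what a successful synchronous send yields: a RecordMetadata carrying the topic, partition, and offset of the written message. A brief standalone sketch (producer and topic name are placeholders):

metadata = producer.send('flume-topic', b'a log line').get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)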
Project: hh-page-classifier    Author: TeamHG-Memex
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('topic')
    parser.add_argument('filename')
    parser.add_argument('--kafka-host')
    parser.add_argument('--validate', action='store_true')
    args = parser.parse_args()

    if args.validate:
        with open(args.filename, 'rt') as f:
            data = json.dumps(json.load(f)).encode('utf8')
    else:
        with open(args.filename, 'rb') as f:
            data = f.read()

    logging.basicConfig(level=logging.ERROR)
    kafka_kwargs = {}
    if args.kafka_host:
        kafka_kwargs['bootstrap_servers'] = args.kafka_host
    producer = KafkaProducer(
        max_request_size=104857600, **kafka_kwargs)

    producer.send(args.topic, data)
    producer.flush()
    print('Pushed {} bytes to {}'.format(len(data), args.topic))
Project: hh-page-classifier    Author: TeamHG-Memex
def __init__(self, kafka_host=None, model_cls=None, model_kwargs=None,
                 debug=False):
        self.model_cls = model_cls
        self.model_kwargs = model_kwargs or {}
        kafka_kwargs = {}
        if kafka_host is not None:
            kafka_kwargs['bootstrap_servers'] = kafka_host
        self.consumer = KafkaConsumer(
            self.input_topic,
            group_id=self.group_id,
            max_partition_fetch_bytes=self.max_message_size,
            consumer_timeout_ms=100,
            **kafka_kwargs)
        self.producer = KafkaProducer(
            max_request_size=self.max_message_size,
            **kafka_kwargs)
        self.debug = debug
Project: django-logpipe    Author: thelabnyc
def client(self):
        if not self._client:
            kwargs = self._get_client_config()
            self._client = kafka.KafkaProducer(**kwargs)
        return self._client
Project: search-MjoLniR    Author: wikimedia
def _make_producer(brokers):
    return kafka.KafkaProducer(bootstrap_servers=brokers,
                               compression_type='gzip',
                               api_version=mjolnir.kafka.BROKER_VERSION)
Project: broadview-collector    Author: openstack
def getKafkaProducer(self):
        try:
            self._producer = kafka.KafkaProducer(bootstrap_servers=['{}:{}'.format(self._ip_address, self._port)])
        except kafka.errors.NoBrokersAvailable as e:
            LOG.error("BroadViewPublisher: NoBrokersAvailable {}".format(e))
        except:
            LOG.error("Unexpected error: {}".format(sys.exc_info()[0]))
Project: kafka-spark-influx-csv-analysis    Author: bwsw
def __init__(self, delay, topic, servers='localhost:29092', data_file="", random_data=False, file_ip="",
                 continuously=False):
        self._path_to_file = data_file
        self._delay = delay
        self.status = ProducerStatus.Created
        if self._path_to_file:
            self._file = open(data_file, "r")
        self._topic = topic
        self._producer = KafkaProducer(bootstrap_servers=servers)
        self._continuously = continuously
        self._random_data = random_data
        if self._random_data:
            self._random_sflow = GeneratingRandomSFLOW(file_ip)
Project: kafka-paperplane    Author: wsacin
def producer(self):
        if not self._producer:
            self._producer = KafkaProducer(
                    bootstrap_servers=self._bootstrap_servers,
                )
        return self._producer
Project: Stock-Visualizer    Author: saguo
def __init__(self, broker, topic):
        """ Instantiate a producer given broker and topic
        :param broker: ip and port number of broker, e.g. 127.0.0.1:9092
        :param topic: name of the topic
        :return: None
        """
        self.producer = KafkaProducer(bootstrap_servers=broker, value_serializer=lambda v: ujson.dumps(v).encode('utf-8'))
        self.topic = topic
        logger.info("Setup kafka producer at {} with topic {}".format(broker, topic))
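
Because of the value_serializer above, send() can be called with plain Python objects instead of bytes; the producer serializes them to JSON automatically. A one-line sketch, assuming a producer built as above (topic and payload are placeholders):

producer.send('stock-quotes', {'symbol': 'AAPL', 'price': 187.3})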
Project: oa_qian    Author: sunqb
def producer_msg(self):
        try:
            producer = KafkaProducer(bootstrap_servers=[self.serverlist])
            producer.send('oa_qian', (self.msg).encode("utf-8"))
            producer.flush()
            producer.close(timeout=60)
            return "success"
        except Exception as e:
            log.error(e)
            return "error"

# if __name__ == '__main__':
# test code
# mq_s = Mq_s('kafka.sunqb.com:9092', 'sunqingbiao;sun;890897;1')
# mq_s.producer_msg()
Project: maxwell-faker    Author: movio
def produce_to_kafka(schema, args, config):
    topic = config['kafka']['topic']
    producer = KafkaProducer(bootstrap_servers = config['kafka']['brokers'])

    def f_produce(topic, partition, key, value):
        producer.send(topic, key = key, value = value, partition = partition)

    partition_count = 1 + max(producer.partitions_for(topic))
    try:
        bootstrap(f_produce, partition_count, schema, args.database, args.table, config)
    except KeyboardInterrupt:
        sys.exit(1)
    producer.flush()
    producer.close()
Project: maxwell-faker    Author: movio
def produce_to_bruce(schema, args, config):
    topic = config['kafka']['topic']

    if args.partition_count:
        partition_count = args.partition_count
    else:
        print('fetch partition info for topic ' + topic)
        producer = KafkaProducer(bootstrap_servers = config['kafka']['brokers'])
        partition_count = 1 + max(producer.partitions_for(topic))
        producer.close()

    socket = bruce.open_bruce_socket()

    # batching socket send
    buff = []

    def flush_buff():
        for msg in buff:
            socket.sendto(msg, '/var/run/bruce/bruce.socket')
        del buff[:]

    def f_produce(topic, partition, key, value):
        if len(buff) < 1000:
            buff.append(bruce.create_msg(partition, topic, bytes(key), bytes(value)))
        else:
            flush_buff()

    try:
        bootstrap(f_produce, partition_count, schema, args.database, args.table, config)
        flush_buff()
    except KeyboardInterrupt:
        sys.exit(1)
    finally:
        socket.close()
Project: Visor    Author: xuwenyihust
def run(self):
        self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
        super(fake_access_producer, self).run()
Project: Visor    Author: xuwenyihust
def run(self):
        self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
        super(fake_error_producer, self).run()
Project: monasca-analytics    Author: openstack
def sink_dstream(self, dstream):
        if self._producer is None:
            self._producer = kafka.KafkaProducer(
                bootstrap_servers="{0}:{1}".format(self._host, self._port))
        dstream.foreachRDD(self._persist)
Project: StreamingLogisticRegression    Author: keiraqz
def __init__(self):
        self.producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
        self.merchants_list = []
Project: python-agent    Author: sandabuliu
def close(self):
        from kafka import KafkaProducer
        self.inter.set()
        producer = KafkaProducer(bootstrap_servers=self.servers)
        producer.send(self.topic, self.end)
        producer.flush()
Project: Locus    Author: senditya
def publish(topics, message, hostname=None, port_num=None):
    hostname, port_num = insure_host_port(hostname, port_num)
    server = hostname+':'+str(port_num)
    publisher = KafkaProducer(bootstrap_servers=server)
    for topic in topics:
        publisher.send(topic, message.encode('utf-8'))
    publisher.flush()  # ensure delivery before returning
Project: scoring-pipelines    Author: trustedanalytics
def __init__(self, kafka_URI, topic_str):
        self.producer = KafkaProducer(bootstrap_servers=kafka_URI)
        self.topic = topic_str
Project: quorum    Author: Data4Democracy
def get_user_tweets(username):
    producer = KafkaProducer(bootstrap_servers='kafka:9092') 
    consumer = KafkaConsumer('twitterCheckpoint', bootstrap_servers=['kafka:9092']) 
    msgSent = producer.send('twitterUser', username.encode('utf-8'))

    # wait for the msg to be sent back by the producer
    msgReceived = None
    while not msgReceived:
        for msg in consumer:
            msgReceived = msg.value.decode('utf-8')

            if msgReceived==username:
                return
Project: quorum    Author: Data4Democracy
def get_page_posts(page):                                                       
    producer = KafkaProducer(bootstrap_servers='kafka:9092')                    
    consumer = KafkaConsumer('fbCheckpoint', bootstrap_servers=['kafka:9092'])                                                                              
    msgSent = producer.send('fbPage', page.encode('utf-8'))                 

    # wait for the msg to be sent back by the producer
    msgReceived = None                        
    while not msgReceived:                                                      
        for msg in consumer:                                                    
            msgReceived = msg.value.decode('utf-8')                             

            if msgReceived==page:                                               
                return
Project: quorum    Author: Data4Democracy
def get_subreddit_posts(subr):                                                  
    producer = KafkaProducer(bootstrap_servers='kafka:9092')                    
    consumer = KafkaConsumer('redditCheckpoint', bootstrap_servers=['kafka:9092'])                                                                            
    msgSent = producer.send('subreddit', subr.encode('utf-8'))                  

    # wait for the msg to be sent back by the producer
    msgReceived = None                                                          
    while not msgReceived:                                                      
        for msg in consumer:                                                    
            msgReceived = msg.value.decode('utf-8')                             

            if msgReceived==subr:                                               
                return
Project: quorum    Author: Data4Democracy
def get_user_tweets(username):
    # Connect to Kafka
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    # Twitter API
    api = authenticate_api()

    tweets = 0
    need_update = True
    try:
        for page in Cursor(api.user_timeline, screen_name=username, count=200).pages(16):
            for status in page:
                status = status._json
                msg = producer.send('data', json.dumps(format_tweet(status)).encode('utf-8'))
                tweets += 1
                print(tweets)

                with open('test1.jsonl', 'a') as f:
                    f.write(json.dumps(format_tweet(status))+'\n')

            # Flush kafka producer
            producer.flush()      
            # Follow Twitter's Rate limit 
            sleep(2)
    except Exception as e:
        print(e)
        pass

    # Flush kafka producer                                              
    producer.flush()
    return username
Project: quorum    Author: Data4Democracy
def get_reddit_submissions(subreddit):
    # Connect to Kafka
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    # Reddit API
    reddit = authenticate_api()

    submissions = 0
    try:
        for submission in reddit.subreddit(subreddit).new():
            sub = format_submission(submission)
            if submissions > 1000:
                break

            msg = producer.send('data', json.dumps(sub).encode('utf-8'))
            submissions += 1
            print(submissions)
            with open('test.jsonl', 'a') as f:
                f.write(json.dumps(sub)+'\n') 

        # Flush kafka producer
        producer.flush()
    except Exception as e:
        with open('Errors.txt', 'a') as f:
            f.write(str(type(e))+'\n')
            f.write(str(e)+'\n') 

    # Flush kafka producer                                                  
    producer.flush()
    return subreddit
Project: citysense    Author: tzano
def __init__(self, sensor_node_id, kafka_topic='wifi'):
        """
         Wifi Sniffer class to structure/store data from wifi devices. Data is collected using Scapy.

         :param sensor_node_id: node id
         :type sensor_node_id: :py:class:`str`

         :param kafka_topic: name of kafka topic
         :type kafka_topic: :py:class:`str`
         """
        Thread.__init__(self)
        self.sensor_node_id = sensor_node_id
        self.kafka_topic = kafka_topic
        self.kafka_server = '{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)
        self.kafka_producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)])
Project: citysense    Author: tzano
def __init__(self, sensor_node_id, kafka_topic='bluetooth'):
        """
        Bluetooth Sniffer class to structure/store data from bluetooth devices
        :param sensor_node_id: node id
        :type sensor_node_id: :py:class:`str`

        :param kafka_topic: name of kafka topic
        :type kafka_topic: :py:class:`str`

        """
        Thread.__init__(self)
        self.sensor_node_id = sensor_node_id
        self.kafka_topic = kafka_topic
        self.kafka_server = '{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)
        self.kafka_producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)])
Project: citysense    Author: tzano
def __init__(self, sensor_node_id, kafka_topic='environment'):
        """
        Environment Sniffer class to structure/store data from environmental devices. Data is received from Arduino.

        :param sensor_node_id: node id
        :type sensor_node_id: :py:class:`str`

        :param kafka_topic: name of kafka topic
        :type kafka_topic: :py:class:`str`
        """
        Thread.__init__(self)
        self.sensor_node_id = sensor_node_id
        self.kafka_topic = kafka_topic
        self.kafka_server = '{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)
        self.kafka_producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(KAFKA_SERVER, KAFKA_PORT)])
Project: kq    Author: joowani
def __init__(self,
                 hosts='127.0.0.1:9092',
                 topic='default',
                 timeout=None,
                 compression=None,
                 acks=1,
                 retries=0,
                 job_size=1048576,
                 cafile=None,
                 certfile=None,
                 keyfile=None,
                 crlfile=None):
        self._hosts = hosts
        self._topic = topic
        self._timeout = timeout
        self._logger = logging.getLogger('kq')
        self._producer = kafka.KafkaProducer(
            bootstrap_servers=self._hosts,
            compression_type=compression,
            acks=acks,
            retries=retries,
            max_request_size=job_size,
            buffer_memory=max(job_size, 33554432),
            ssl_cafile=cafile,
            ssl_certfile=certfile,
            ssl_keyfile=keyfile,
            ssl_crlfile=crlfile
        )
Project: kq    Author: joowani
def producer(self):
        """Return the Kafka producer object.

        :return: Kafka producer object.
        :rtype: kafka.producer.KafkaProducer
        """
        return self._producer
Project: deb-python-kafka    Author: openstack
def test_kafka_producer_gc_cleanup():
    threads = threading.active_count()
    producer = KafkaProducer(api_version='0.9') # set api_version explicitly to avoid auto-detection
    assert threading.active_count() == threads + 1
    del(producer)
    gc.collect()
    assert threading.active_count() == threads
Project: deb-python-kafka    Author: openstack
def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092')
        self.sent = 0

        while not producer_stop.is_set():
            producer.send('my-topic', self.big_msg)
            self.sent += 1
        producer.flush()
Project: deb-python-kafka    Author: openstack
def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092')

        while True:
            producer.send('my-topic', b"test")
            producer.send('my-topic', b"\xc2Hola, mundo!")
            time.sleep(1)
Project: napalm-logs    Author: napalm-automation
def start(self):
        try:
            self.producer = kafka.KafkaProducer(bootstrap_servers=self.bootstrap_servers)
        except kafka.errors.NoBrokersAvailable as err:
            log.error(err, exc_info=True)
            raise NapalmLogsException(err)
Project: kafka-python-ingest    Author: GISDev01
def run(self):
        producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'),
                                 bootstrap_servers='localhost:9092')

        producer.send('topic3', {'Producer1': 'value1'})
        print('Sent Prod1 Message')
        time.sleep(3)
        self.run()
Project: kafka-python-ingest    Author: GISDev01
def run(self):
        producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'),
                                 bootstrap_servers='localhost:9092')

        producer.send('topic3', {'Producer2': 'value2'})
        print('Sent Prod2 Message')
        time.sleep(2)
        self.run()
Project: kafka-python-ingest    Author: GISDev01
def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092')

        producer.send('topic2', b"Topic2 Test2 Bytes String")
        producer.send('topic2', b"Msg 2!!")
        print('Sent Messages')
        time.sleep(1)
Project: FlaskService    Author: b96705008
def __init__(self, config):
        kafka_host = config.get('kafka', 'bootstrap_servers')

        # producer
        self.pub_topic = config.get('kafka', 'pub_topic')
        self.producer = KafkaProducer(bootstrap_servers = kafka_host)
Project: sparkly    Author: Tubular
def _publish_data(self, data):
        producer = KafkaProducer(bootstrap_servers='kafka.docker',
                                 key_serializer=self.key_serializer,
                                 value_serializer=self.value_serializer)
        for item in data:
            producer.send(self.topic, key=item['key'], value=item['value'])

        producer.flush()
        producer.close()
Project: sparkly    Author: Tubular
def write_data(self, df, host, topic, port):
        producer = KafkaProducer(
            bootstrap_servers=['{}:{}'.format(host, port)],
            key_serializer=pickle.dumps,
            value_serializer=pickle.dumps,
        )
        rows = df.collect()
        for row in rows:
            producer.send(topic, key=row.key, value=row.value)
        producer.flush()
        return len(rows)
Project: frontera-docs-zh_CN    Author: xsren
def _create(self):
        self._producer = KafkaProducer(bootstrap_servers=self._location, retries=5,
                                       compression_type=self._compression)
Project: frontera-docs-zh_CN    Author: xsren
def __init__(self, location, topic_done, partitioner, compression):
        self._location = location
        self._topic_done = topic_done
        self._partitioner = partitioner
        self._compression = compression
        self._producer = KafkaProducer(bootstrap_servers=self._location, partitioner=partitioner, retries=5,
                                       compression_type=self._compression)
Project: kafka_extract    Author: tqlihuiqi
def __init__(self, output, cluster, topic, diskPath=None, avroSchema=None, targetBrokers=None, targetTopic=None, compressType="snappy"):
        """ ???????

        :?? cluster: ??Kafka????
        :?? topic: ??Kafka Topic??

        :?? output: ???? disk/kafka
            :disk:
                :?? diskPath: ??????, ??: None
                :?? avroSchema: ??AVRO????????AVRO Schema???????, ??: None
            :kafka:
                :?? targetBrokers: ????Kafka Broker????, ??: None
                :?? targetTopic: ????Kafka Topic??, ??: None
                :?? compressType: ????Kafka Topic???????, ??: snappy ????("lz4", "snappy", "gzip")
        """

        self.output = output

        if output == "disk":
            if not diskPath or not os.path.isdir(diskPath):
                raise ValueError("Invalid disk path.")

            filename = "%s_%s.data" % (cluster, topic)
            filepath = os.path.join(diskPath, filename)

            if os.path.exists(filepath):
                raise IOError("File already exists.")

            self.handler = open(filepath, "ab+")

            if avroSchema:
                self.avroSchema = eval(avroSchema)
            else:  
                self.avroSchema = None

        elif output == "kafka":
            self.handler = KafkaProducer(bootstrap_servers=targetBrokers.split(","), compression_type=compressType)
            self.targetTopic = targetTopic

        else:
            raise ValueError("Invalid output parameter.")