Python kafka 模块,SimpleProducer() 实例源码

我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用kafka.SimpleProducer()

项目:deb-python-kafka    作者:openstack    | 项目源码 | 文件源码
def test_producer_sync_fail_on_error(self):
        error = FailedPayloadsError('failure')
        with patch.object(SimpleClient, 'load_metadata_for_topics'):
            with patch.object(SimpleClient, 'ensure_topic_exists'):
                with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
                    with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):

                        client = SimpleClient(MagicMock())
                        producer = SimpleProducer(client, async=False, sync_fail_on_error=False)

                        # This should not raise
                        (response,) = producer.send_messages('foobar', b'test message')
                        self.assertEqual(response, error)

                        producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
                        with self.assertRaises(FailedPayloadsError):
                            producer.send_messages('foobar', b'test message')
项目:Twitter-and-IMDB-Sentimental-Analytics    作者:abhinandanramesh    | 项目源码 | 文件源码
def __init__(self, api):
        self.api = api
        super(tweepy.StreamListener, self).__init__()
        client = KafkaClient("localhost:9092")
        self.producer = SimpleProducer(client, async = True,
                          batch_send_every_n = 1000,
                          batch_send_every_t = 10)
项目:python-agent    作者:sandabuliu    | 项目源码 | 文件源码
def __init__(self, topic, server, client=None, **kwargs):
        try:
            import kafka
        except ImportError:
            raise OutputError('Lack of kafka module, try to execute `pip install kafka-python>=1.3.1` install it')

        client = client or kafka.SimpleClient
        self._producer = None
        self._topic = topic
        try:
            self._kafka = client(server, **kwargs)
        except Exception, e:
            raise OutputError('kafka client init failed: %s' % e)
        self.producer(kafka.SimpleProducer)
        super(Kafka, self).__init__()
项目:jtyd_python_spider    作者:xtuyaowu    | 项目源码 | 文件源码
def __init__(self, name, host='web14', port=51092, **kwargs):
        QueueBase.QueueBase.__init__(self, name, host, port)
        self.__queue = []
        self.__kafka = KafkaClient('%s:%d' % (host, port))
        self.__producer = SimpleProducer(self.__kafka, async=kwargs.get('async', False))
        self.__producer.client.ensure_topic_exists(self.name)
        self.__consumer = SimpleConsumer(self.__kafka, self.name + '_consumer', self.name, auto_commit_every_n=1)
项目:kafka-utils    作者:Yelp    | 项目源码 | 文件源码
def produce_example_msg(topic, num_messages=1):
    kafka = KafkaToolClient(KAFKA_URL)
    producer = SimpleProducer(kafka)
    for i in range(num_messages):
        try:
            producer.send_messages(topic, b'some message')
        except LeaderNotAvailableError:
            # Sometimes kafka takes a bit longer to assign a leader to a new
            # topic
            time.sleep(10)
            producer.send_messages(topic, b'some message')
项目:deb-python-kafka    作者:openstack    | 项目源码 | 文件源码
def test_topic_message_types(self):
        client = MagicMock()

        def partitions(topic):
            return [0, 1]

        client.get_partition_ids_for_topic = partitions

        producer = SimpleProducer(client, random_start=False)
        topic = b"test-topic"
        producer.send_messages(topic, b'hi')
        assert client.send_produce_request.called
项目:Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka    作者:sridharswamy    | 项目源码 | 文件源码
def __init__(self, api):
        self.api = api
        super(tweepy.StreamListener, self).__init__()
        client = KafkaClient("localhost:9092")
        self.producer = SimpleProducer(client, async = True,
                          batch_send_every_n = 1000,
                          batch_send_every_t = 10)
项目:tsd-helpers    作者:robinhood    | 项目源码 | 文件源码
def make_kafka_producer(kafka_znode):
    kafka_brokers = get_kafka_brokers(kafka_znode)
    kafka_client = KafkaClient(kafka_brokers)
    return SimpleProducer(
        kafka_client,
        async=False,
        req_acks=1,
        random_start=True
    )
项目:log_to_kafka    作者:ShichaoMa    | 项目源码 | 文件源码
def __init__(self, settings):
        self.settings = settings
        self.client = KafkaClient(settings.get("KAFKA_HOSTS"))
        self.producer = SimpleProducer(self.client)
        self.producer.send_messages = failedpayloads_wrapper(
            settings.get("KAFKA_RETRY_TIME", 5))(self.producer.send_messages)
        super(KafkaHandler, self).__init__()