Python pika 模块,BasicProperties() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pika.BasicProperties()

项目:frontoxy    作者:fabienvauchelles    | 项目源码 | 文件源码
def publish(self, item, priority=0, retry=2):
        body = json.dumps(item)

        try:
            self._channel.basic_publish(exchange=u'',
                                        routing_key=self._queue_name,
                                        body=body,
                                        properties=pika.BasicProperties(
                                            delivery_mode=2,
                                            priority=priority
                                        ))

        except exceptions.ConnectionClosed as err:
            if retry <= 0:
                raise err

            self.open()
            self.publish(item, retry - 1)
项目:aio-pika    作者:mosquito    | 项目源码 | 文件源码
def properties(self) -> BasicProperties:
        """ Build :class:`pika.BasicProperties` object """
        return BasicProperties(
            content_type=self.content_type,
            content_encoding=self.content_encoding,
            headers=self.headers,
            delivery_mode=self.delivery_mode,
            priority=self.priority,
            correlation_id=self.correlation_id,
            reply_to=self.reply_to,
            expiration=str(convert_timestamp(self.expiration * 1000)) if self.expiration else None,
            message_id=self.message_id,
            timestamp=self.timestamp,
            type=self.type,
            user_id=self.user_id,
            app_id=self.app_id
        )
项目:eagle    作者:saga92    | 项目源码 | 文件源码
def send(self, message):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                routing_key = 'eagle',
                properties = pika.BasicProperties(\
                    reply_to = self.callback_queue,
                    correlation_id =self.corr_id,),
                body=message
            )
        for i in xrange(self.timeout):
            if self.response is None:
                self.connection.process_data_events()
            else:
                break
            time.sleep(1)
        return self.response
项目:atlas    作者:johnb30    | 项目源码 | 文件源码
def process_rss(rss_result, message_body, redis_conn, message_queue):
    for result in rss_result:
        page_url = _convert_url(result.url, message_body['website'])

        in_database = _check_redis(page_url, redis_conn)

        message_body['title'] = result.title
        message_body['date'] = result.date
        message_body['url'] = page_url

        to_send = json.dumps(message_body)

        if not in_database:
            message_queue.basic_publish(exchange='',
                                        routing_key='scraper_queue',
                                        body=to_send,
                                        properties=pika.BasicProperties(
                                            delivery_mode=2,))
            #Set the value within redis to expire in 3 days
            redis_conn.setex(page_url, 259200, 1)
        else:
            pass
项目:BrundleFuzz    作者:carlosgprado    | 项目源码 | 文件源码
def on_mutation_request(self, ch, method, props, body):
        """Callback for messages in the 'rpc_mutations_queue'

        They say: "Hey, do you have a mutation for me?"
        """

        # This is the "remote procedure"
        # being called and returning a value
        mutation_obj = self.get_mutation()

        ch.basic_publish(exchange = '',
                         routing_key = props.reply_to,
                         properties = pika.BasicProperties(
                                    correlation_id = props.correlation_id),
                         body = mutation_obj.serialize_me())

        ch.basic_ack(delivery_tag = method.delivery_tag)
项目:BrundleFuzz    作者:carlosgprado    | 项目源码 | 文件源码
def on_evaluation_request(self, ch, method, props, body):
        """Callback for messages in the 'rpc_evaluations_queue'

        They say: "Hey, here are the execution results"
        """

        # This is the "remote procedure"
        # being called and returning a value
        ev_mutation_object = pickle.loads(body)
        self.process_execution_results(ev_mutation_object)

        ch.basic_publish(exchange = '',
                         routing_key = props.reply_to,
                         properties = pika.BasicProperties(
                                    correlation_id = props.correlation_id),
                         body = 'EVALUATION RECEIVED')

        ch.basic_ack(delivery_tag = method.delivery_tag)
项目:BrundleFuzz    作者:carlosgprado    | 项目源码 | 文件源码
def poll_mutation_queue(self):
        """
        In this paradigm calling means pushing our message
        to the queue (the callback will take care of it)
        and wait for the response and process it.
        @returns: string, serialized MutationObject (only attributes)
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',   # default exchange
                                   routing_key = 'rpc_mutations_queue',
                                   properties = pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id),
                                   body = 'POLL MUTATION QUEUE')

        self.ae.m_info("[x] Sent mutation queue poll")

        while self.response is None:
            # Waiting for a response
            self.connection.process_data_events()

        return self.response
项目:BrundleFuzz    作者:carlosgprado    | 项目源码 | 文件源码
def send_evaluation(self, mutation_object):
        """
        In this paradigm calling means pushing our message
        to the queue (the callback will take care of it)
        and wait for the response and process it.
        @returns: string, serialized MutationObject (only attributes)
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',   # default exchange
                                   routing_key = 'rpc_evaluations_queue',
                                   properties = pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id),
                                   # This should be a serialized
                                   # evaluation object
                                   body = mutation_object.serialize_me())

        self.ae.m_info("[x] Sent evaluation")

        while self.response is None:
            # Waiting for a response
            self.connection.process_data_events()

        return self.response
项目:BrundleFuzz    作者:carlosgprado    | 项目源码 | 文件源码
def poll_mutation_queue(self):
        """
        In this paradigm calling means pushing our message
        to the queue (the callback will take care of it)
        and wait for the response and process it.
        @returns: string, serialized MutationObject (only attributes)
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',   # default exchange
                                   routing_key = 'rpc_mutations_queue',
                                   properties = pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id),
                                   body = 'POLL MUTATION QUEUE')

        self.ae.m_info("[x] Sent mutation queue poll")

        while self.response is None:
            # Waiting for a response
            self.connection.process_data_events()

        return self.response
项目:salt-terraform-demo    作者:dguitarbite    | 项目源码 | 文件源码
def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        basicProperties = pika.BasicProperties(
            reply_to=self.callback_queue,
            correlation_id=self.corr_id,
        )

        self.channel.basic_publish(exchange='',
                                   routing_key=QUEUE,
                                   properties=basicProperties,
                                   body=str(n))

        while self.response is None:
            self.connection.process_data_events()
        return self.response
项目:salt-terraform-demo    作者:dguitarbite    | 项目源码 | 文件源码
def on_request(ch, method, props, body):

    global REQ_COUNT

    REQ_COUNT += 1
    print(" [x] Listening ... Request Number: %i" % REQ_COUNT)
    body = json.load(StringIO(body))
    operator = body['operator']
    values = body['data']

    print(" [.] mathOps(%s)" % operator)
    response = json.dumps(mathOps(values, operator=operator), 
                                  separators=(',', ':'))
    print(" Output: %s\n" % response)
    basicProperties = pika.BasicProperties(correlation_id=props.correlation_id)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=basicProperties,
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
项目:easy-job    作者:inb-co    | 项目源码 | 文件源码
def run(self, function, args=None, kwargs=None, retry_policy=None, callback=None):
        with self._pool.acquire() as cxn:
            cxn.channel.basic_publish(
                body=self.serialize(
                    {
                        "function": function,
                        "parameters": {
                            "args": args or tuple(),
                            "kwargs": kwargs or {},
                            "retry_policy": retry_policy,
                            "callback": callback
                        }
                    }
                ),
                exchange='',
                routing_key=self.queue_name,
                properties=pika.BasicProperties(
                    delivery_mode=2,
                )
            )
            self.log(logging.DEBUG, "Task received : {}".format(function))
项目:Python_Study    作者:thsheep    | 项目源码 | 文件源码
def request(self, n):
        corr_id = str(uuid.uuid4())
        self.response[corr_id] = None

        #???????????????correlation_id
        self.channel.basic_publish(exchange='',
                             routing_key='compute_queue',
                             properties=pika.BasicProperties(
                               reply_to = self.callback_queue,
                               correlation_id = corr_id,
                                         ),
                 body=str(n))
        #???????
        while self.response[corr_id] is None:
            self.connection.process_data_events()
        return int(self.response[corr_id])
项目:flowder    作者:amir-khakshour    | 项目源码 | 文件源码
def publish(self, message):
        self._message_number_out += 1

        amqp_message_update_meta(message, self.get_meta())
        amqp_msg = amqp_message_encode(message)
        log.debug("Publish message #%s, AMQP message: %s" % (self._message_number_out, amqp_msg))
        properties = BasicProperties(
            app_id=self.app_id,
            content_type='application/json',
            content_encoding='utf-8',
            delivery_mode=2,  # persistent
        )
        try:
            yield self._channel.basic_publish(
                self.exchange_name,
                self.queue_out_routing_key,
                amqp_msg,
                properties=properties,
            )
        except ChannelClosed:
            self.retry_channel()
            self._cached_messages.append(message)
        except AMQPError:
            self.retry_connect()
            self._cached_messages.append(message)
项目:_    作者:zengchunyun    | 项目源码 | 文件源码
def callback(self, ch, method, properties, body):
        """
        ????,????????????rabbitmq???
        :param ch:  ???self.channel
        :param method:
        :param properties:???????????
        :param body:????
        :return:
        """
        before = time.monotonic()  # ?????????????
        exec_cmd = threading.Thread(target=self.exec_call, args=(body,))
        exec_cmd.start()
        exec_cmd.join(self.timeout)
        after = time.monotonic()  # ????????????,????????????
        if (after - before) > self.timeout:  # ????????????????,??????????,???????????
            self.response = bytes("command exec timeout", "utf8")
        print(" [*] Got a task {}".format(str(body, "utf8)")))
        message = {"host": self.id, "data": self.response}
        ch.basic_publish(exchange="",
                         routing_key=properties.reply_to,
                         properties=pika.BasicProperties(
                             correlation_id=properties.correlation_id,),
                         body=bytes(str(message), "utf-8"))
        ch.basic_ack(delivery_tag=method.delivery_tag)
项目:python-logging-rabbitmq    作者:albertomr86    | 项目源码 | 文件源码
def message_worker(self):
        while 1:
            try:
                record, routing_key = self.queue.get()

                if not self.connection or self.connection.is_closed or not self.channel or self.channel.is_closed:
                    self.open_connection()

                self.channel.basic_publish(
                    exchange=self.exchange,
                    routing_key=routing_key,
                    body=self.format(record),
                    properties=pika.BasicProperties(
                        delivery_mode=2
                    )
                )
            except Exception:
                self.channel, self.connection = None, None
                self.handleError(record)
            finally:
                self.queue.task_done()
                if self.close_after_emit:
                    self.close_connection()
项目:Malicious_Domain_Whois    作者:h-j-13    | 项目源码 | 文件源码
def whois_push(**whois_recv_info):
    global channel_whois
    # print 'whois push:', whois_recv_info
    result = ''
    try:
        result = json.dumps(whois_recv_info)
    except UnicodeDecodeError:
        for key in whois_recv_info.keys():
            if type(whois_recv_info[key]) == str:
                whois_recv_info[key] = whois_recv_info[key].decode('latin-1').encode("utf-8")
        result = json.dumps(whois_recv_info)
    if result != '':
        channel_whois.basic_publish(
            exchange='',
            routing_key='whois_queue',
            body=json.dumps(result),
            properties=pika.BasicProperties(
                delivery_mode=2)
        )


# ????com_manage???????whois??????xxx?????????????
项目:Malicious_Domain_Whois    作者:h-j-13    | 项目源码 | 文件源码
def whois_push(**whois_recv_info):
    global channel_whois
    # print 'whois push:', whois_recv_info
    result = ''
    try:
        result = json.dumps(whois_recv_info)
    except UnicodeDecodeError:
        for key in whois_recv_info.keys():
            if type(whois_recv_info[key]) == str:
                whois_recv_info[key] = whois_recv_info[key].decode('latin-1').encode("utf-8")
        result = json.dumps(whois_recv_info)
    if result != '':
        channel_whois.basic_publish(
            exchange='',
            routing_key='whois_queue',
            body=json.dumps(result),
            properties=pika.BasicProperties(
                delivery_mode=2)
        )


# ????com_manage???????whois??????xxx?????????????
项目:pymqant    作者:liangdas    | 项目源码 | 文件源码
def send_task(self):
        while   True:
            if self.send_queue.empty()&self.handle_stoping:
                self.send_stop_evt.set()
                return
            if not self.send_queue.empty():
                callinfo=self.send_queue.get_nowait()
                # ??RPC?????RPC????`rpc_queue`????????`reply_to`?`correlation_id`
                self._channel.basic_publish(exchange=self.Exchange,
                                            routing_key=self.Queue,
                                            properties=pika.BasicProperties(
                                                    reply_to = self.callback_queue,
                                            ),
                                            body=callinfo.body)

            gevent.sleep(0)
项目:EvalAI    作者:Cloud-CV    | 项目源码 | 文件源码
def publish_submission_message(challenge_id, phase_id, submission_id):

    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='evalai_submissions', type='topic')

    # though worker is creating the queue(queue creation is idempotent too)
    # but lets create the queue here again, so that messages dont get missed
    # later on we can apply a check on queue message length to raise some alert
    # this way we will be notified of worker being up or not
    channel.queue_declare(queue='submission_task_queue', durable=True)

    message = {
        'challenge_id': challenge_id,
        'phase_id': phase_id,
        'submission_id': submission_id
    }
    channel.basic_publish(exchange='evalai_submissions',
                          routing_key='submission.*.*',
                          body=json.dumps(message),
                          properties=pika.BasicProperties(delivery_mode=2))    # make message persistent

    print(" [x] Sent %r" % message)
    connection.close()
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def start_consuming(self):
        """Exchange, channel, consumer ready to start listening"""

        # send rpc request
        self.worker_id = None
        self.correlation_id = uuid.uuid4().hex
        self._channel.basic_publish(
            exchange=self.exchange,
            routing_key='%s.worker.%s' % (self.key, self.worker_type),
            properties=pika.BasicProperties(
                reply_to=self.queue,
                correlation_id=self.correlation_id,
                content_type='application/json',
            ),
            body=json.dumps(self.worker_kwargs),
        )
        log.info("%s: sent RPC request, will wait for response.", self.lbl)

        super(_HubTornadoConsumer, self).start_consuming()
项目:mist.api    作者:mistio    | 项目源码 | 文件源码
def send_to_worker(self, action, msg=''):
        if not self.consumer.worker_id:
            raise Exception("Routing key not yet received in RPC response.")
        routing_key = '%s.%s' % (self.consumer.worker_id, action)
        if isinstance(msg, basestring):
            self.consumer._channel.basic_publish(exchange=self.exchange,
                                                 routing_key=routing_key,
                                                 body=msg)
        else:
            self.consumer._channel.basic_publish(
                exchange=self.exchange,
                routing_key=routing_key,
                properties=pika.BasicProperties(
                    content_type='application/json',
                ),
                body=json.dumps(msg),
            )
项目:frontoxy    作者:fabienvauchelles    | 项目源码 | 文件源码
def _publish(self, exchange_name, queue_name, body, priority, retry):
        try:
            self._channel.basic_publish(exchange=exchange_name,
                                        routing_key=queue_name,
                                        body=body,
                                        properties=pika.BasicProperties(
                                            delivery_mode=2,
                                            priority=priority
                                        ))

        except exceptions.ConnectionClosed as err:
            if retry <= 0:
                raise err

            self.open()
            self._publish(exchange_name, queue_name, body, priority, retry - 1)
项目:rebus    作者:airbus-seclab    | 项目源码 | 文件源码
def _send_signal(self, signal_name, args):
        # Send a signal on the exchange
        body = {'signal_name': signal_name, 'args': args}
        body = serializer.dumps(body)
        b = False
        while not b:
            try:
                self.channel.basic_publish(
                    exchange='rebus_signals', routing_key='', body=body,
                    properties=pika.BasicProperties(delivery_mode=2,))
                b = True
            except pika.exceptions.ConnectionClosed:
                log.info("Disconnected (in _send_signal). "
                         "Trying to reconnect...")
                self._reconnect()
                time.sleep(0.5)

    # TODO Check is the key is valid
项目:SolutionGambling    作者:eganwall    | 项目源码 | 文件源码
def publish_message(queue_name, message):
    message_body = json.dumps(message)
    message_id = message['message_id']

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=message_body,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          )
    )
    log_msg = "Published : [queue_name={}] [comment_id={}] [username={}] [comment_body={}]".format(
        queue_name, message['comment_id'], message['username'], message['comment_body'])
    logger.log_info_message(message_id, LogUtilityConstants.message_published_event,
                            'sub_monitor', log_msg)

    connection.close()
项目:Simplechaindb    作者:BUAANLSDE    | 项目源码 | 文件源码
def publish(queue_name,body='Hello World!',exchange=''):
    """Publish the content or message to the queue

        pika.BasicProperties(delivery_mode=2) will make message persistent

    Args:
        queue_name: the mq`s name
        body(str) —— the content will be publish
        exchange:

    """

    channel = get_channel(queue_name)
    if channel:
        channel.basic_publish(exchange=exchange,
                                     routing_key=queue_name,
                                     body=body,properties=pika.BasicProperties(delivery_mode=2))
    # logger.info('ramq publish queue_name: ' + str(queue_name) + ' ,body: \n' + str(body) + '\n')
    # print(" [x] Sent " + body)
项目:python_rabbitmq_multiprocessing_crawl    作者:ghotiv    | 项目源码 | 文件源码
def put_queue_list(self, queue_name=None, message_list=None):
        """put queue to list"""
        if not queue_name:
            return None
        try:
            if not message_list:
                return None
            if isinstance(message_list, dict):
                message_list = [message_list]
            self.__connect()
            self.channel.queue_declare(queue=queue_name, durable=True)
            for message in message_list:
                message = json.dumps(message)
                self.channel.basic_publish(
                    exchange='',
                    routing_key=queue_name,
                    body=message,
                    properties=pika.BasicProperties(delivery_mode=2, ))
            self.connection.close()
        except Exception as e:
            print e
            return None
项目:xiaoxiang-oj    作者:hanfei19910905    | 项目源码 | 文件源码
def call(self, submit_id, result_path, data_path, judge_path):
        rpc_body = encode(submit_id, result_path, data_path, judge_path)
        for i in range(5):
            try:
                app.logger.info("try!! %s %s %s %s" % (submit_id, result_path, data_path, judge_path))
                self.channel.basic_publish(exchange='',
                                           routing_key=self.ch,
                                           properties=pika.BasicProperties(
                                                 delivery_mode=2,
                                                 ),
                                           body=rpc_body)
                return
            except pika.exceptions.ConnectionClosed:
                self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                    host='localhost'))

                self.channel = self.connection.channel()

                self.channel.queue_declare(queue=self.ch, durable=True)

        app.logger.info("local!! %s %s %s %s" % (submit_id, result_path, data_path, judge_path))
        #convert to local judge. that's a sync way!
        from .sandbox_server import SandBoxService
        SandBoxService.local_exec(submit_id, result_path, data_path, judge_path)
项目:ssp-campaigns    作者:bloogrox    | 项目源码 | 文件源码
def publish(self, payload):
        with rmq_pool.acquire() as cxn:
            try:
                cxn.channel.queue_declare(queue=QUEUE_NAME, auto_delete=True)
                cxn.channel.basic_publish(
                    body=json.dumps(payload),
                    exchange='',
                    routing_key=QUEUE_NAME,
                    properties=pika.BasicProperties(
                        content_type='plain/text'
                    )
                )
                subscriber_id = payload['subscriber']['_id']
                logger.info(f"Queue.publish published: {subscriber_id}")
            except Exception as e:
                logger.error(f"Queue.publish exception: {e}")
项目:djangoStatusPanel    作者:okar1    | 项目源码 | 文件源码
def sendRegisterMessage(server,routingKeys):

    exchangeName="qos.service"
    queueName="heartbeatService"
    msgHeaders={"__TypeId__":"com.tecomgroup.qos.communication.message.ServerStarted"}
    msgBody={"originName":None,"serverName":""}

    serverConfig = server.getConfigObject()
    errors=[]
    mqConf = getMqConf(serverConfig['mq'], server.name, errors)

    # raise exception only if all mq's are down, so message sending is impossible
    if mqConf is None:
        raise Exception("sendRegisterMessage error: " + str(errors))

    connection=pika.BlockingConnection(pika.URLParameters(mqConf['amqpUrl']))
    channel = connection.channel()

    channel.exchange_declare(exchange=exchangeName, exchange_type='topic', durable=True)
    channel.queue_declare(queue=queueName, durable=True,arguments={'x-message-ttl':1800000})
    channel.queue_bind(queue=queueName, exchange=exchangeName, routing_key="server.agent.register")

    for key in routingKeys:
        channel.basic_publish(
            exchange=exchangeName,
            routing_key=key,
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
                content_type='application/json',
                content_encoding='UTF-8',
                priority=0,
                expiration="86400000",
                headers=msgHeaders),
            body=json.dumps(msgBody).encode('UTF-8')
        )
    connection.close()
项目:roomfinder    作者:GuillaumeMorini    | 项目源码 | 文件源码
def send_message_to_queue(message):
    global corr_id
    global response
    global connection
    global channel
    global callback_queue

    response=None
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="37.187.22.103",port=2765,heartbeat_interval=30))  
    channel = connection.channel()
    result=channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue
    channel.basic_consume(on_response, no_ack=True,
                                   queue=callback_queue)
    corr_id=str(uuid.uuid4())

    response = None
    corr_id =  str(uuid.uuid4())
    channel.basic_publish(  exchange='',
                            routing_key="rpc_queue",
                            properties=pika.BasicProperties(
                                         reply_to = callback_queue,
                                         correlation_id = corr_id),
                            body=message)

    print(" [x] Sent data to RabbitMQ")   

    while response is None:
        connection.process_data_events()
    print(" [x] Get response from RabbitMQ")   
    return str(response)
项目:roomfinder    作者:GuillaumeMorini    | 项目源码 | 文件源码
def send_message_to_queue(message):
    global corr_id
    global response
    global connection
    global channel
    global callback_queue

    response=None
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq,port=int(rabbitmq_port),heartbeat_interval=30))  
    channel = connection.channel()
    result=channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue
    channel.basic_consume(on_response, no_ack=True,
                                   queue=callback_queue)
    corr_id=str(uuid.uuid4())

    response = None
    corr_id =  str(uuid.uuid4())
    channel.basic_publish(  exchange='',
                            routing_key="rpc_queue",
                            properties=pika.BasicProperties(
                                         reply_to = callback_queue,
                                         correlation_id = corr_id),
                            body=message)

    print(" [x] Sent data to RabbitMQ")   

    while response is None:
        connection.process_data_events()
    print(" [x] Get response from RabbitMQ")   
    print "response: "+str(response)
    return response
项目:eagle    作者:saga92    | 项目源码 | 文件源码
def on_request(self, ch, method, props, body):
        response = self.receive(body)
        ch.basic_publish(exchange='',
                routing_key=props.reply_to,
                properties=pika.BasicProperties(correlation_id = \
                        props.correlation_id),
                body=str(response))
        ch.basic_ack(delivery_tag = method.delivery_tag)
项目:core-python    作者:yidao620c    | 项目源码 | 文件源码
def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
项目:core-python    作者:yidao620c    | 项目源码 | 文件源码
def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
项目:Python_Study    作者:thsheep    | 项目源码 | 文件源码
def request(ch, method, props, body):
    print(" [.] increase(%s)"  % (body,))

    response = increase(int(body))

    #???????????????correlation_id???
    ch.basic_publish(exchange='',
              routing_key=props.reply_to,
              properties=pika.BasicProperties(correlation_id = \
                                          props.correlation_id),
                         body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)
项目:Python_Study    作者:thsheep    | 项目源码 | 文件源码
def request(self, n):
        self.response = None
        #??????????????
        self.channel.basic_publish(exchange='',
                             routing_key='compute_queue',
                             properties=pika.BasicProperties(
                               reply_to =self.callback_queue,
                                         ),
                             body=str(n))
        #???????
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
项目:myRabbit    作者:bsab    | 项目源码 | 文件源码
def publish_message(self):
        """If the class is not stopping, publish a message to RabbitMQ,
        appending a list of deliveries with the message number that was sent.
        This list will be used to check for delivery confirmations in the
        on_delivery_confirmations method.

        Once the message has been sent, schedule another message to be sent.
        The main reason I put scheduling in was just so you can get a good idea
        of how the process is flowing by slowing down and speeding up the
        delivery intervals by changing the PUBLISH_INTERVAL constant in the
        class.

        """
        if self._stopping:
            return

        # controllo che il servizio di acquisizione sia attivo ...
        # if not self._winservice.isRunning():
        #    LOGGER.info('Win Service is not running...')
        #    print 'Win Service is not running...'
        #    return

        message = self.get_message_from_selected_data();
        #print "***************************************"
        #print json.dumps(message, ensure_ascii=False)
        #print "***************************************"

        properties = pika.BasicProperties(app_id='myrabbit_py-publisher',
                                          content_type='application/json')

        self._channel.basic_publish(self.EXCHANGE,
                                    self.ROUTING_KEY,
                                    json.dumps(message, ensure_ascii=False),
                                    properties)

        self._message_number += 1
        self._deliveries.append(self._message_number)
        logger.info('Published message # %i', self._message_number)
        self.schedule_next_message()
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def send(self, body, **kw):
        return self.channel.basic_publish(self.exchange,
                                          self.routing_key,
                                          json.dumps(body),
                                          properties=pika.BasicProperties(
                                             delivery_mode = 2, # make message persistent
                                          ),
                                          **kw)
项目:fmn.sse    作者:fedora-infra    | 项目源码 | 文件源码
def push_message(self, msg):
        self._check_connection()

        if self.channel.basic_publish(exchange=self.exchange,
                                      routing_key=self.exchange + '-' + self.queue_name,
                                      body=msg,
                                      properties=pika.BasicProperties(
                                          delivery_mode=2)):
            print('message sent')
        else:
            print('ERROR: message failed to send')
项目:SkySpyWatch    作者:nstarpost    | 项目源码 | 文件源码
def enqueue_flight_snippet(flight_snippet):
    """Add items from the flight_dictionary to rabbitmq queue as json strings"""
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=flight_snippet,
                          properties=pika.BasicProperties(delivery_mode=2)  # make message persistent
                          )


# this function reads a dictionary of a flight snapshot and returns a different and easier to work with dictionary
项目:dazzar    作者:Philaeux    | 项目源码 | 文件源码
def produce(self, message):
        """Publish a message to add inside the queue.

        Args;
            message: object to add inside the queue.
        """
        self.channel.basic_publish(exchange='',
                                   routing_key='dazzar_jobs',
                                   body=pickle.dumps(message),
                                   properties=pika.BasicProperties(
                                       delivery_mode=2,  # make message persistent
                                   ))
项目:EventMiner    作者:hltcoe    | 项目源码 | 文件源码
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
项目:EventMiner    作者:hltcoe    | 项目源码 | 文件源码
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
项目:EventMiner    作者:hltcoe    | 项目源码 | 文件源码
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
项目:EventMiner    作者:hltcoe    | 项目源码 | 文件源码
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
项目:EventMiner    作者:hltcoe    | 项目源码 | 文件源码
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
项目:EventMiner    作者:hltcoe    | 项目源码 | 文件源码
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
项目:_    作者:zengchunyun    | 项目源码 | 文件源码
def start(self, cmd, routing_key="remote.call"):
        self.response = []  # ??????????
        self.correlation_id = str(uuid.uuid4())
        self.log.info("exec command {}".format(cmd))
        self.log.debug("routing key {}".format(routing_key))
        self.channel.basic_publish(exchange=self.exchange,
                                   routing_key=routing_key,  # ?routing key???????????????routing key???
                                   properties=pika.BasicProperties(
                                       reply_to=self.queue_name,
                                       correlation_id=self.correlation_id
                                   ),
                                   body=cmd)
        before = time.monotonic()  # ??????????
        after_len = 0  # ?????????????
        while True:
            if len(self.response) != after_len:  # ?????????,????????????
                before_len = len(self.response)
            else:
                before_len = after_len  # ????????????????????????????,??????,????????,?????????
                time.sleep(0.4)
            self.connection.process_data_events()  # ??????,?????????????
            if len(self.response) == before_len and before_len:
                break
            after = time.monotonic()  # ?????????????
            if (after - before) > self.timeout:  # ???????16s,?????????
                break
        return self.response  # ??????????
项目:_    作者:zengchunyun    | 项目源码 | 文件源码
def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange="",
                                   routing_key="rpc_queue",
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)