Python zmq 模块,SNDMORE 实例源码

我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用zmq.SNDMORE

项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def send(self, topic, payload):
        '''Send a message with topic, payload
`
        Topic is a unicode string. It will be sent as utf-8 encoded byte array.
        Payload is a python dict. It will be sent as a msgpack serialized dict.

        If payload has the key '__raw_data__'
        we pop if of the payload and send its raw contents as extra frames
        everything else need to be serializable
        the contents of the iterable in '__raw_data__'
        require exposing the pyhton memoryview interface.
        '''
        if '__raw_data__' not in payload:
            self.socket.send_string(topic, flags=zmq.SNDMORE)
            self.socket.send(serializer.dumps(payload, use_bin_type=True))
        else:
            extra_frames = payload.pop('__raw_data__')
            assert(isinstance(extra_frames, (list, tuple)))
            self.socket.send_string(topic, flags=zmq.SNDMORE)
            self.socket.send(serializer.dumps(payload), flags=zmq.SNDMORE)
            for frame in extra_frames[:-1]:
                self.socket.send(frame, flags=zmq.SNDMORE, copy=True)
            self.socket.send(extra_frames[-1], copy=True)
项目:mflow    作者:datastreaming    | 项目源码 | 文件源码
def send(self, message, send_more=False, block=True, as_json=False):

        flags = 0
        if send_more:
            flags = zmq.SNDMORE
        if not block:
            flags = flags | zmq.NOBLOCK

        try:
            if as_json:
                self.socket.send_json(message, flags)
            else:
                self.socket.send(message, flags, copy=self.zmq_copy, track=self.zmq_track)
        except zmq.Again as e:
            if not block:
                pass
            else:
                raise e
        except zmq.ZMQError as e:
            logger.error(sys.exc_info()[1])
            raise e
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def start_server(self, new_address):
        self.thread_pipe.send_string('Bind', flags=zmq.SNDMORE)
        self.thread_pipe.send_string(new_address)
        response = self.thread_pipe.recv_string()
        msg = self.thread_pipe.recv_string()
        if response == 'Bind OK':
            host, port = msg.split(':')
            self.host = host
            self.port = port
            return

        # fail logic
        logger.error(msg)

        # for service we shut down
        if self.g_pool.app == 'service':
            audio.say("Error: Port already in use.")
            self.notify_all({'subject': 'service_process.should_stop'})
            return

        # for capture we try to bind to a arbitrary port on the first external interface
        else:
            self.thread_pipe.send_string('Bind', flags=zmq.SNDMORE)
            self.thread_pipe.send_string('tcp://*:*')
            response = self.thread_pipe.recv_string()
            msg = self.thread_pipe.recv_string()
            if response == 'Bind OK':
                host, port = msg.split(':')
                self.host = host
                self.port = port
            else:
                logger.error(msg)
                raise Exception("Could not bind to port")
项目:esys-pbi    作者:fsxfreak    | 项目源码 | 文件源码
def thread_loop(self, context, pipe):
        poller = zmq.Poller()
        ipc_pub = zmq_tools.Msg_Dispatcher(context, self.g_pool.ipc_push_url)
        poller.register(pipe, zmq.POLLIN)
        remote_socket = None

        while True:
            items = dict(poller.poll())
            if pipe in items:
                cmd = pipe.recv_string()
                if cmd == 'Exit':
                    break
                elif cmd == 'Bind':
                    new_url = pipe.recv_string()
                    if remote_socket:
                        poller.unregister(remote_socket)
                        remote_socket.close(linger=0)
                    try:
                        remote_socket = context.socket(zmq.REP)
                        remote_socket.bind(new_url)
                    except zmq.ZMQError as e:
                        remote_socket = None
                        pipe.send_string("Error", flags=zmq.SNDMORE)
                        pipe.send_string("Could not bind to Socket: {}. Reason: {}".format(new_url, e))
                    else:
                        pipe.send_string("Bind OK", flags=zmq.SNDMORE)
                        # `.last_endpoint` is already of type `bytes`
                        pipe.send(remote_socket.last_endpoint.replace(b"tcp://", b""))
                        poller.register(remote_socket, zmq.POLLIN)
            if remote_socket in items:
                self.on_recv(remote_socket, ipc_pub)

        self.thread_pipe = None
项目:dist_hyperas    作者:osh    | 项目源码 | 文件源码
def send_json(self, identity, obj):
        self.stream.send( identity, zmq.SNDMORE )
        self.stream.send_json(obj)
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def send_supvisors_status(self, status):
        """ This method sends a serialized form of the supvisors status
        through the socket. """
        self.logger.trace('send SupvisorsStatus {}'.format(status))
        self.socket.send_string(EventHeaders.SUPVISORS, zmq.SNDMORE)
        self.socket.send_json(status.serial())
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def send_address_status(self, status):
        """ This method sends a serialized form of the address status
        through the socket. """
        self.logger.trace('send RemoteStatus {}'.format(status))
        self.socket.send_string(EventHeaders.ADDRESS, zmq.SNDMORE)
        self.socket.send_json(status.serial())
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def send_application_status(self, status):
        """ This method sends a serialized form of the application status
        through the socket. """
        self.logger.trace('send ApplicationStatus {}'.format(status))
        self.socket.send_string(EventHeaders.APPLICATION, zmq.SNDMORE)
        self.socket.send_json(status.serial())
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def send_process_event(self, address, event):
        """ This method sends a process event through the socket. """
        # build the event before it is sent
        evt = event.copy()
        evt['address'] = address
        self.logger.trace('send Process Event {}'.format(evt))
        self.socket.send_string(EventHeaders.PROCESS_EVENT, zmq.SNDMORE)
        self.socket.send_json(evt)
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def send_process_status(self, status):
        """ This method sends a serialized form of the process status
        through the socket. """
        self.logger.trace('send Process Status {}'.format(status))
        self.socket.send_string(EventHeaders.PROCESS_STATUS, zmq.SNDMORE)
        self.socket.send_json(status.serial())
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
        """send a sequence of buffers as a multipart message

        The zmq.SNDMORE flag is added to all msg parts before the last.

        Parameters
        ----------
        msg_parts : iterable
            A sequence of objects to send as a multipart message. Each element
            can be any sendable object (Frame, bytes, buffer-providers)
        flags : int, optional
            SNDMORE is handled automatically for frames before the last.
        copy : bool, optional
            Should the frame(s) be sent in a copying or non-copying manner.
        track : bool, optional
            Should the frame(s) be tracked for notification that ZMQ has
            finished with it (ignored if copy=True).

        Returns
        -------
        None : if copy or not track
        MessageTracker : if track and not copy
            a MessageTracker object, whose `pending` property will
            be True until the last send is completed.
        """
        for msg in msg_parts[:-1]:
            self.send(msg, SNDMORE|flags, copy=copy, track=track)
        # Send the last part without the extra SNDMORE flag.
        return self.send(msg_parts[-1], flags, copy=copy, track=track)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
        """send a sequence of buffers as a multipart message

        The zmq.SNDMORE flag is added to all msg parts before the last.

        Parameters
        ----------
        msg_parts : iterable
            A sequence of objects to send as a multipart message. Each element
            can be any sendable object (Frame, bytes, buffer-providers)
        flags : int, optional
            SNDMORE is handled automatically for frames before the last.
        copy : bool, optional
            Should the frame(s) be sent in a copying or non-copying manner.
        track : bool, optional
            Should the frame(s) be tracked for notification that ZMQ has
            finished with it (ignored if copy=True).

        Returns
        -------
        None : if copy or not track
        MessageTracker : if track and not copy
            a MessageTracker object, whose `pending` property will
            be True until the last send is completed.
        """
        for msg in msg_parts[:-1]:
            self.send(msg, SNDMORE|flags, copy=copy, track=track)
        # Send the last part without the extra SNDMORE flag.
        return self.send(msg_parts[-1], flags, copy=copy, track=track)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
        """send a sequence of buffers as a multipart message

        The zmq.SNDMORE flag is added to all msg parts before the last.

        Parameters
        ----------
        msg_parts : iterable
            A sequence of objects to send as a multipart message. Each element
            can be any sendable object (Frame, bytes, buffer-providers)
        flags : int, optional
            SNDMORE is handled automatically for frames before the last.
        copy : bool, optional
            Should the frame(s) be sent in a copying or non-copying manner.
        track : bool, optional
            Should the frame(s) be tracked for notification that ZMQ has
            finished with it (ignored if copy=True).

        Returns
        -------
        None : if copy or not track
        MessageTracker : if track and not copy
            a MessageTracker object, whose `pending` property will
            be True until the last send is completed.
        """
        for msg in msg_parts[:-1]:
            self.send(msg, SNDMORE|flags, copy=copy, track=track)
        # Send the last part without the extra SNDMORE flag.
        return self.send(msg_parts[-1], flags, copy=copy, track=track)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
        """send a sequence of buffers as a multipart message

        The zmq.SNDMORE flag is added to all msg parts before the last.

        Parameters
        ----------
        msg_parts : iterable
            A sequence of objects to send as a multipart message. Each element
            can be any sendable object (Frame, bytes, buffer-providers)
        flags : int, optional
            SNDMORE is handled automatically for frames before the last.
        copy : bool, optional
            Should the frame(s) be sent in a copying or non-copying manner.
        track : bool, optional
            Should the frame(s) be tracked for notification that ZMQ has
            finished with it (ignored if copy=True).

        Returns
        -------
        None : if copy or not track
        MessageTracker : if track and not copy
            a MessageTracker object, whose `pending` property will
            be True until the last send is completed.
        """
        for msg in msg_parts[:-1]:
            self.send(msg, SNDMORE|flags, copy=copy, track=track)
        # Send the last part without the extra SNDMORE flag.
        return self.send(msg_parts[-1], flags, copy=copy, track=track)
项目:pynutmeg    作者:kitizz    | 项目源码 | 文件源码
def _publish(self, msg, binary_data):
        # Check socketlock
        self.socket_lock.acquire()
        try:
            # Inject task ID (thread safe in here)
            task = Task(self, self.task_count)
            self.tasks[self.task_count] = task
            msg['id'] = self.task_count
            self.task_count += 1

            # Send message
            # print("Sending:", "Nutmeg")
            self.pubsock.send(b"Nutmeg", flags=zmq.SNDMORE)
            # print("Sending:", msg)
            self.pubsock.send_json(msg, flags=zmq.SNDMORE)
            # Then data
            for data in binary_data:
                # print("Sending binary")
                self.pubsock.send(data, flags=zmq.SNDMORE, copy=True)

            # Makes code nicer just simply having a "null message"
            self.pubsock.send(b'')

            return task

        except IOError:
            raise

        finally:
            self.socket_lock.release()
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def send_changes(socket, discussion, changeset):
    order = next(_counter)
    socket.send(str(discussion).encode('ascii'), zmq.SNDMORE)
    socket.send(str(order).encode('ascii'), zmq.SNDMORE)
    socket.send_json(changeset)
    log.debug("sent %d %s %s " % (order, discussion, changeset))
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
        """send a sequence of buffers as a multipart message

        The zmq.SNDMORE flag is added to all msg parts before the last.

        Parameters
        ----------
        msg_parts : iterable
            A sequence of objects to send as a multipart message. Each element
            can be any sendable object (Frame, bytes, buffer-providers)
        flags : int, optional
            SNDMORE is handled automatically for frames before the last.
        copy : bool, optional
            Should the frame(s) be sent in a copying or non-copying manner.
        track : bool, optional
            Should the frame(s) be tracked for notification that ZMQ has
            finished with it (ignored if copy=True).

        Returns
        -------
        None : if copy or not track
        MessageTracker : if track and not copy
            a MessageTracker object, whose `pending` property will
            be True until the last send is completed.
        """
        # typecheck parts before sending:
        for i,msg in enumerate(msg_parts):
            if isinstance(msg, (zmq.Frame, bytes, _buffer_type)):
                continue
            try:
                _buffer_type(msg)
            except Exception as e:
                rmsg = repr(msg)
                if len(rmsg) > 32:
                    rmsg = rmsg[:32] + '...'
                raise TypeError(
                    "Frame %i (%s) does not support the buffer interface." % (
                    i, rmsg,
                ))
        for msg in msg_parts[:-1]:
            self.send(msg, SNDMORE|flags, copy=copy, track=track)
        # Send the last part without the extra SNDMORE flag.
        return self.send(msg_parts[-1], flags, copy=copy, track=track)