Python asyncio 模块,DatagramTransport() 实例源码

我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用asyncio.DatagramTransport()

项目:pyaqara    作者:javefang    | 项目源码 | 文件源码
def test_unicast():
    """test_unicast"""
    mock_transport = DatagramTransport()
    mock_transport.sendto = MagicMock()
    mock_protocol = protocol.AqaraProtocol()
    mock_protocol.transport = mock_transport
    test_data = {"cmd": "read"}
    test_addr = "10.10.10.10"

    mock_protocol.unicast(test_addr, test_data)

    test_data_encoded = json.dumps(test_data).encode('utf-8')
    mock_transport.sendto.assert_called_with(test_data_encoded, (test_addr, GATEWAY_PORT))
项目:pyaqara    作者:javefang    | 项目源码 | 文件源码
def test_broadcast():
    """test_broadcast"""
    mock_transport = DatagramTransport()
    mock_transport.sendto = MagicMock()
    mock_protocol = protocol.AqaraProtocol()
    mock_protocol.transport = mock_transport
    test_data = {"cmd": "read"}

    mock_protocol.broadcast(test_data)

    test_data_encoded = json.dumps(test_data).encode('utf-8')
    mock_transport.sendto.assert_called_with(test_data_encoded, (MCAST_ADDR, MCAST_PORT))
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_dgram_not_implemented(self):
        transport = asyncio.DatagramTransport()

        self.assertRaises(NotImplementedError, transport.sendto, 'data')
        self.assertRaises(NotImplementedError, transport.abort)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_dgram_not_implemented(self):
        transport = asyncio.DatagramTransport()

        self.assertRaises(NotImplementedError, transport.sendto, 'data')
        self.assertRaises(NotImplementedError, transport.abort)
项目:bit-torrent    作者:borzunov    | 项目源码 | 文件源码
def connection_made(self, transport: asyncio.DatagramTransport):
        pass
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_dgram_not_implemented(self):
        transport = asyncio.DatagramTransport()

        self.assertRaises(NotImplementedError, transport.sendto, 'data')
        self.assertRaises(NotImplementedError, transport.abort)