Python SimpleXMLRPCServer 模块,SimpleXMLRPCServer() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用SimpleXMLRPCServer.SimpleXMLRPCServer()

项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def parse_request(self):
        """Parse the HTTP request and authenticate it with HTTP Basic auth.

        The database name is the request path without its leading slash.
        When no database is addressed the request is accepted and
        ``self.tryton`` is set to an anonymous context.  Otherwise the
        ``Authorization`` header must carry Basic credentials accepted by
        ``security.login``; on success ``self.tryton`` holds the returned
        user id and session.

        Returns the base-class parse result when parsing fails or the
        request is accepted, and ``False`` after answering with a 401
        challenge when authentication does not succeed.
        """
        res = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.parse_request(self)
        if not res:
            return res
        database_name = self.path[1:]
        if not database_name:
            self.tryton = {'user': None, 'session': None}
            return res
        try:
            method, up64 = self.headers['Authorization'].split(None, 1)
            if method.strip().lower() == 'basic':
                user, password = base64.decodestring(up64).split(':', 1)
                user_id, session = security.login(database_name, user,
                        password)
                self.tryton = {'user': user_id, 'session': session}
                return res
        except Exception:
            # Deliberate best effort: a missing or malformed header, or any
            # error raised while logging in, is answered with the 401 below.
            pass
        # send_error() would finish the header section and write the error
        # body, so a header sent after it ends up behind the body instead of
        # in the headers.  Build the challenge by hand so that
        # WWW-Authenticate is a real response header.
        self.send_response(401, 'Unauthorized')
        self.send_header("WWW-Authenticate", 'Basic realm="Tryton"')
        self.send_header('Connection', 'close')
        self.send_header('Content-Length', '0')
        self.end_headers()
        # No further request is served on this connection after a refusal.
        self.close_connection = 1
        return False
项目:SameKeyProxy    作者:xzhou    | 项目源码 | 文件源码
def rpcserver(self):
        """
        Run the XML-RPC server used by debugging clients.

        Intended to be launched as a separate thread.  Creates a server on
        localhost:20758, exposes ``self.connect`` as "connect" and serves
        requests until the server stops.  Errors are logged through
        ``self.log`` instead of being propagated.
        """
        try:
            self.log.info("plugin.http.ObjectEditor: starting XML RPC Server")
            server = SimpleXMLRPCServer(addr=("localhost", 20758), logRequests=False, allow_none=1)
            server.register_function(self.connect, "connect")
            server.serve_forever()
        except Exception:
            # Catch Exception rather than using a bare except so that
            # SystemExit and KeyboardInterrupt are not swallowed here.
            self.log.error("plugin.http.ObjectEditor: rpcserver: error "
                           "connecting to remote")
            self.log.error(sys.exc_info())
项目:creator-system-test-framework    作者:CreatorDev    | 项目源码 | 文件源码
def startXMLRPCServer(rpcInstance, address, port, logFilename):
    """Serve ``rpcInstance`` over XML-RPC on ``(address, port)``.

    A logger created from ``logFilename`` is attached to ``rpcInstance``
    before the server starts.  Requests are handled one at a time until a
    KeyboardInterrupt arrives (which is swallowed) or another exception
    propagates; in every case "Exiting" is printed and the server closed.
    """
    rpcInstance.logger = createLogger(logFilename)
    endpoint = (address, port)
    server = VerboseFaultXMLRPCServer(endpoint, LoggingSimpleXMLRPCRequestHandler, allow_none=True)
    print("SimpleXMLRPCServer Started. Listening on %s:%d..." % endpoint)
    server.register_instance(rpcInstance, True)
    try:
        # Handle requests one by one instead of calling serve_forever().
        while True:
            server.handle_request()
    except KeyboardInterrupt:
        pass
    finally:
        print("Exiting")
        server.server_close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_basic(self):
        """Smoke-test a successful call with traceback headers switched on."""
        server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer

        # The flag has to be off by default.
        self.assertEqual(server_cls._send_traceback_header, False)

        # Turn traceback reporting on.
        server_cls._send_traceback_header = True

        # A call that is not expected to fail.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Non-blocking socket 'unavailable' errors are tolerated.
            if is_unavailable_exception(err):
                return
            # Protocol error: put the response headers into the test output.
            self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_fail_with_info(self):
        """Server errors must come back with exception/traceback headers when the flag is set."""
        # Install the broken message class on the request handler.
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Ask the server to send exception/traceback information back.
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Nothing to check for 'unavailable' socket errors or for
            # errors that carry no response headers.
            if is_unavailable_exception(err) or not hasattr(err, "headers"):
                return
            # The error details are expected in the response headers.
            response_headers = err.headers
            self.assertEqual(response_headers.get("x-exception"),
                             "invalid literal for int() with base 10: 'I am broken'")
            self.assertTrue(response_headers.get("x-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_basic(self):
        """Smoke-test a successful call with traceback headers switched on."""
        server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer

        # The flag has to be off by default.
        self.assertEqual(server_cls._send_traceback_header, False)

        # Turn traceback reporting on.
        server_cls._send_traceback_header = True

        # A call that is not expected to fail.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Non-blocking socket 'unavailable' errors are tolerated.
            if is_unavailable_exception(err):
                return
            # Protocol error: put the response headers into the test output.
            self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_fail_with_info(self):
        """Server errors must come back with exception/traceback headers when the flag is set."""
        # Install the broken message class on the request handler.
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Ask the server to send exception/traceback information back.
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Nothing to check for 'unavailable' socket errors or for
            # errors that carry no response headers.
            if is_unavailable_exception(err) or not hasattr(err, "headers"):
                return
            # The error details are expected in the response headers.
            response_headers = err.headers
            self.assertEqual(response_headers.get("x-exception"),
                             "invalid literal for int() with base 10: 'I am broken'")
            self.assertTrue(response_headers.get("x-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')
项目:android_malware_detection    作者:congyuandong    | 项目源码 | 文件源码
def main():
    """Expose the wrapper functions and an IDAWrapper instance over XML-RPC.

    Calls autoWait() and ScreenEA() first, then serves on localhost:9000.
    qexit(0) is reached only once serve_forever() returns.
    """
    autoWait()
    ScreenEA()  # result is not used

    rpc = SimpleXMLRPCServer(("localhost", 9000))
    exported = (
        (is_connected, "is_connected"),
        (wrapper_get_raw, "get_raw"),
        (wrapper_get_function, "get_function"),
        (wrapper_Heads, "Heads"),
        (wrapper_Functions, "Functions"),
    )
    for func, public_name in exported:
        rpc.register_function(func, public_name)

    rpc.register_instance(IDAWrapper())
    rpc.register_function(wrapper_quit, "quit")

    rpc.serve_forever()
    qexit(0)
项目:android_malware_detection    作者:congyuandong    | 项目源码 | 文件源码
def main():
    """Expose the wrapper functions and an IDAWrapper instance over XML-RPC.

    Calls autoWait() and ScreenEA() first, then serves on localhost:9000.
    qexit(0) is reached only once serve_forever() returns.
    """
    autoWait()
    ScreenEA()  # result is not used

    rpc = SimpleXMLRPCServer(("localhost", 9000))
    exported = (
        (is_connected, "is_connected"),
        (wrapper_get_raw, "get_raw"),
        (wrapper_get_function, "get_function"),
        (wrapper_Heads, "Heads"),
        (wrapper_Functions, "Functions"),
    )
    for func, public_name in exported:
        rpc.register_function(func, public_name)

    rpc.register_instance(IDAWrapper())
    rpc.register_function(wrapper_quit, "quit")

    rpc.serve_forever()
    qexit(0)
项目:QT4i    作者:Tencent    | 项目源码 | 文件源码
def __init__(self, addr):
        """Initialise the XML-RPC server on ``addr`` and register its methods.

        addr: ``(address, port)`` pair; it is passed to the base server and
        also unpacked into ``self.address`` and ``self.port``.
        """
        SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(
            self, addr=addr, logRequests=False, allow_none=True, encoding='UTF-8')
        self.address, self.port = addr
        self._start_work_time = datetime.datetime.fromtimestamp(time.time())
        self._last_interactive_time = None
        self.register_introspection_functions()
        # Each of these bound methods is exported under its own name.
        for exported_name in (
                'get_start_work_time',
                'get_last_interactive_time',
                'get_pid',
                'get_log',
                'get_binary_data',
                'send_binary_data',
                'is_working',
                'shutdown_driver'):
            self.register_function(getattr(self, exported_name), exported_name)
        self.register_package()
        print_msg('Server(%s:%s) - Started' % (self.address, self.port))
项目:QT4i    作者:Tencent    | 项目源码 | 文件源码
def _dispatch(self, method, params):
        """Dispatch an RPC call through the base class with logging.

        The request is logged before dispatch.  A failing call is logged
        with its traceback and re-raised.  On success the result is logged
        (replaced by a placeholder for methods listed in
        ``XMLRPCServer.LOG_FILTERED_METHODS``), the last-interaction time is
        updated and the result returned.
        """
        print_msg('--- --- --- --- --- ---')
        print_msg('%s <<< %s' % (method, params))
        try:
            outcome = SimpleXMLRPCServer.SimpleXMLRPCServer._dispatch(self, method, params)
        except Exception as err:
            print_err('%s - Error: %s' % (method, err))
            print_err('%s - Traceback:\n%s' % (method, traceback.format_exc()))
            raise
        if method in XMLRPCServer.LOG_FILTERED_METHODS:
            summary = '%s >>> Binary Data' % method
        else:
            summary = '%s >>> %s' % (method, outcome)
        print_msg(summary)
        self._last_interactive_time = datetime.datetime.fromtimestamp(time.time())
        return outcome
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_basic(self):
        """Smoke-test a successful call with traceback headers switched on."""
        server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer

        # The flag has to be off by default.
        self.assertEqual(server_cls._send_traceback_header, False)

        # Turn traceback reporting on.
        server_cls._send_traceback_header = True

        # A call that is not expected to fail.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Non-blocking socket 'unavailable' errors are tolerated.
            if is_unavailable_exception(err):
                return
            # Protocol error: put the response headers into the test output.
            self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_fail_with_info(self):
        """Server errors must come back with exception/traceback headers when the flag is set."""
        # Install the broken message class on the request handler.
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Ask the server to send exception/traceback information back.
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Nothing to check for 'unavailable' socket errors or for
            # errors that carry no response headers.
            if is_unavailable_exception(err) or not hasattr(err, "headers"):
                return
            # The error details are expected in the response headers.
            response_headers = err.headers
            self.assertEqual(response_headers.get("x-exception"),
                             "invalid literal for int() with base 10: 'I am broken'")
            self.assertTrue(response_headers.get("x-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_basic(self):
        """Smoke-test a successful call with traceback headers switched on."""
        server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer

        # The flag has to be off by default.
        self.assertEqual(server_cls._send_traceback_header, False)

        # Turn traceback reporting on.
        server_cls._send_traceback_header = True

        # A call that is not expected to fail.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Non-blocking socket 'unavailable' errors are tolerated.
            if is_unavailable_exception(err):
                return
            # Protocol error: put the response headers into the test output.
            self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_fail_with_info(self):
        """Server errors must come back with exception/traceback headers when the flag is set."""
        # Install the broken message class on the request handler.
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Ask the server to send exception/traceback information back.
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Nothing to check for 'unavailable' socket errors or for
            # errors that carry no response headers.
            if is_unavailable_exception(err) or not hasattr(err, "headers"):
                return
            # The error details are expected in the response headers.
            response_headers = err.headers
            self.assertEqual(response_headers.get("x-exception"),
                             "invalid literal for int() with base 10: 'I am broken'")
            self.assertTrue(response_headers.get("x-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')
项目:DroidWatcher    作者:suemi994    | 项目源码 | 文件源码
def main():
    """Expose the wrapper functions and an IDAWrapper instance over XML-RPC.

    Calls autoWait() and ScreenEA() first, then serves on localhost:9000.
    qexit(0) is reached only once serve_forever() returns.
    """
    autoWait()
    ScreenEA()  # result is not used

    rpc = SimpleXMLRPCServer(("localhost", 9000))
    exported = (
        (is_connected, "is_connected"),
        (wrapper_get_raw, "get_raw"),
        (wrapper_get_function, "get_function"),
        (wrapper_Heads, "Heads"),
        (wrapper_Functions, "Functions"),
    )
    for func, public_name in exported:
        rpc.register_function(func, public_name)

    rpc.register_instance(IDAWrapper())
    rpc.register_function(wrapper_quit, "quit")

    rpc.serve_forever()
    qexit(0)
项目:DroidWatcher    作者:suemi994    | 项目源码 | 文件源码
def main():
    """Expose the wrapper functions and an IDAWrapper instance over XML-RPC.

    Calls autoWait() and ScreenEA() first, then serves on localhost:9000.
    qexit(0) is reached only once serve_forever() returns.
    """
    autoWait()
    ScreenEA()  # result is not used

    rpc = SimpleXMLRPCServer(("localhost", 9000))
    exported = (
        (is_connected, "is_connected"),
        (wrapper_get_raw, "get_raw"),
        (wrapper_get_function, "get_function"),
        (wrapper_Heads, "Heads"),
        (wrapper_Functions, "Functions"),
    )
    for func, public_name in exported:
        rpc.register_function(func, public_name)

    rpc.register_instance(IDAWrapper())
    rpc.register_function(wrapper_quit, "quit")

    rpc.serve_forever()
    qexit(0)
项目:Immunity-py    作者:JohnTroony    | 项目源码 | 文件源码
def setupXMLRPCSocket(self):
                """
                Listen for XML-RPC requests on a socket.

                Does nothing when ``self.XMLRPCport`` is 0.  Otherwise a
                server is created on 0.0.0.0 at that port, a listener
                object is registered with it and a listener thread is
                started; both are kept on ``self.listener`` and
                ``self.listener_thread``.
                """
                import SimpleXMLRPCServer
                import threading
                bind_host = "0.0.0.0"
                port = self.XMLRPCport
                if port == 0:
                    return
                endpoint = (bind_host, port)
                try:
                    rpc_server = SimpleXMLRPCServer.SimpleXMLRPCServer(endpoint, allow_none=True)
                except TypeError:
                    # The allow_none keyword was rejected; retry without it.
                    print("2.4 Python did not allow allow_none=True!")
                    rpc_server = SimpleXMLRPCServer.SimpleXMLRPCServer(endpoint)
                self.log("Set up XMLRPC Socket on %s port %d" % endpoint)
                handler = listener_instance(self)
                rpc_server.register_instance(handler)
                # Run the server in its own thread.
                worker = listener_thread(rpc_server, handler)
                worker.start()
                self.listener_thread = worker
                self.listener = handler
项目:Immunity-py    作者:JohnTroony    | 项目源码 | 文件源码
def setupXMLRPCSocket(self):
                """
                Listen for XML-RPC requests on a socket.

                Does nothing when ``self.XMLRPCport`` is 0.  Otherwise a
                server is created on 0.0.0.0 at that port, a listener
                object is registered with it and a listener thread is
                started; both are kept on ``self.listener`` and
                ``self.listener_thread``.
                """
                import SimpleXMLRPCServer
                import threading
                bind_host = "0.0.0.0"
                port = self.XMLRPCport
                if port == 0:
                    return
                endpoint = (bind_host, port)
                try:
                    rpc_server = SimpleXMLRPCServer.SimpleXMLRPCServer(endpoint, allow_none=True)
                except TypeError:
                    # The allow_none keyword was rejected; retry without it.
                    print("2.4 Python did not allow allow_none=True!")
                    rpc_server = SimpleXMLRPCServer.SimpleXMLRPCServer(endpoint)
                self.log("Set up XMLRPC Socket on %s port %d" % endpoint)
                handler = listener_instance(self)
                rpc_server.register_instance(handler)
                # Run the server in its own thread.
                worker = listener_thread(rpc_server, handler)
                worker.start()
                self.listener_thread = worker
                self.listener = handler
项目:ravel    作者:ravel-net    | 项目源码 | 文件源码
def __init__(self, host, port, consumer=None):
        """Create the RPC server and expose ``_client_send`` as "client_send".

           host: the hostname or IP address of the RPC server
           port: the port for the RPC server
           consumer: the consuming object for received messages"""
        self.host, self.port, self.consumer = host, port, consumer
        rpc = SimpleXMLRPCServer((host, port), logRequests=False, allow_none=True)
        self.server = rpc
        rpc.register_function(self._client_send, "client_send")
        # No message has been stored yet.
        self.msg = None
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def __init__(self, server_address, HandlerClass, logRequests=1):
        """Start with an empty handler set, then run the base-class initialiser."""
        # Populated elsewhere; created before the base initialiser runs.
        self.handlers = set()
        base_init = SimpleXMLRPCServer.SimpleXMLRPCServer.__init__
        base_init(self, server_address, HandlerClass, logRequests)
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def server_bind(self):
        """Enable SO_REUSEADDR and SO_KEEPALIVE on the socket, then bind it."""
        for option in (socket.SO_REUSEADDR, socket.SO_KEEPALIVE):
            self.socket.setsockopt(socket.SOL_SOCKET, option, 1)
        SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def server_close(self):
        """Close the server, then shut down the request of every tracked handler."""
        SimpleXMLRPCServer.SimpleXMLRPCServer.server_close(self)
        # Work on a copy so the loop does not iterate the live collection.
        snapshot = self.handlers.copy()
        for active in snapshot:
            self.shutdown_request(active.request)
项目:krafters    作者:GianlucaBortoli    | 项目源码 | 文件源码
def run(self):
        """Serve ``self.pso_append`` as "append_entry" on 127.0.0.1:PSO_APPEND_PORT."""
        endpoint = ('127.0.0.1', PSO_APPEND_PORT)
        rpc = SimpleXMLRPCServer(endpoint, allow_none=True)
        rpc.register_function(self.pso_append, "append_entry")
        rpc.serve_forever()
项目:krafters    作者:GianlucaBortoli    | 项目源码 | 文件源码
def run(self):
        """Serve ``self.messenger.paxos_append`` as "append_entry" on 127.0.0.1:PAXOS_APPEND_PORT."""
        endpoint = ('127.0.0.1', PAXOS_APPEND_PORT)
        rpc = SimpleXMLRPCServer(endpoint, allow_none=True)
        rpc.register_function(self.messenger.paxos_append, "append_entry")
        rpc.serve_forever()
项目:zsky    作者:wenguonideshou    | 项目源码 | 文件源码
def rpc_server():
    """Serve ``announce`` over XML-RPC on localhost:8004 with request logging off."""
    listen_on = ('localhost', 8004)
    server = SimpleXMLRPCServer.SimpleXMLRPCServer(listen_on, logRequests=False)
    server.register_function(announce, 'announce')
    print('Starting xml rpc server...')
    server.serve_forever()
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def __init__(self, addr,
                 requestHandler=SecureXMLRPCRequestHandler,
                 logRequests=1):
        """
        Set up the dispatcher state and initialise via SecureTCPServer.

        Per the original author this mirrors SimpleXMLRPCServer.__init__,
        except that SecureTCPServer.__init__ is called in place of the
        plain TCPServer.__init__.
        """
        # Start with no registered functions and no registered instance.
        self.funcs, self.instance = {}, None
        self.logRequests = logRequests
        SecureTCPServer.__init__(self, addr, requestHandler)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def run(self):
        """Serve ``self.simulator`` over XML-RPC on localhost:SIMULATOR_PORT."""
        rpc = SimpleXMLRPCServer(('localhost', SIMULATOR_PORT), allow_none = True)
        # Keep the server on the instance as well.
        self.server = rpc
        rpc.register_instance(self.simulator)

        log.info("Listening on %s" % SIMULATOR_PORT)
        rpc.serve_forever()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def setUp(self):
        """Enable traceback headers and start the server thread."""
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        ready = threading.Event()
        self.evt = ready
        # The server thread that handles requests gets the event as its first argument.
        worker = threading.Thread(
            target=self.threadFunc,
            args=(ready, self.request_count, self.requestHandler))
        worker.start()

        # Wait up to 10 seconds for the server to be ready, then reset the
        # event so it can be waited on again.
        ready.wait(10)
        ready.clear()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def tearDown(self):
        """Wait for the server thread, then turn traceback headers off again."""
        # Give the server thread up to 10 seconds to terminate.
        self.evt.wait(10)

        server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
        server_cls._send_traceback_header = False

# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer.  This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_introspection4(self):
        """system.methodSignature can be called even though signatures are unsupported."""
        # SimpleXMLRPCServer has no signature support; only check that the
        # call goes through and yields the fixed reply.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            signature = proxy.system.methodSignature('div')
            self.assertEqual(signature, 'signatures not supported')
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Non-blocking socket 'unavailable' errors are tolerated.
            if is_unavailable_exception(err):
                return
            # Protocol error: put the response headers into the test output.
            self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_dotted_attribute(self):
        """resolve_dotted_attribute refuses private names and resolves public ones."""
        resolve = SimpleXMLRPCServer.resolve_dotted_attribute

        # Private methods are not allowed, so this lookup raises AttributeError.
        self.assertRaises(AttributeError, resolve, str, '__add')

        self.assertTrue(resolve(str, 'title'))
        # Send a request via test_simple1 so the test finishes quickly
        # instead of waiting for the socket timeout.
        self.test_simple1()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_fail_no_info(self):
        """Server errors must not carry exception/traceback headers in this case."""
        # Install the broken message class on the request handler.
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Nothing to check for 'unavailable' socket errors or for
            # errors that carry no response headers.
            if is_unavailable_exception(err) or not hasattr(err, "headers"):
                return
            # Neither server-side error header should have been sent back.
            response_headers = err.headers
            self.assertTrue(response_headers.get("X-exception") is None)
            self.assertTrue(response_headers.get("X-traceback") is None)
        else:
            self.fail('ProtocolError not raised')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def setUp(self):
        """Create a fresh CGI XML-RPC request handler as ``self.cgi``."""
        handler = SimpleXMLRPCServer.CGIXMLRPCRequestHandler()
        self.cgi = handler
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def setUp(self):
        """Enable traceback headers and start the server thread."""
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        ready = threading.Event()
        self.evt = ready
        # The server thread that handles requests gets the event as its first argument.
        worker = threading.Thread(
            target=self.threadFunc,
            args=(ready, self.request_count, self.requestHandler))
        worker.start()

        # Wait up to 10 seconds for the server to be ready, then reset the
        # event so it can be waited on again.
        ready.wait(10)
        ready.clear()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def tearDown(self):
        """Wait for the server thread, then turn traceback headers off again."""
        # Give the server thread up to 10 seconds to terminate.
        self.evt.wait(10)

        server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
        server_cls._send_traceback_header = False

# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer.  This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_introspection4(self):
        """system.methodSignature can be called even though signatures are unsupported."""
        # SimpleXMLRPCServer has no signature support; only check that the
        # call goes through and yields the fixed reply.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            signature = proxy.system.methodSignature('div')
            self.assertEqual(signature, 'signatures not supported')
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Non-blocking socket 'unavailable' errors are tolerated.
            if is_unavailable_exception(err):
                return
            # Protocol error: put the response headers into the test output.
            self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_dotted_attribute(self):
        """resolve_dotted_attribute refuses private names and resolves public ones."""
        resolve = SimpleXMLRPCServer.resolve_dotted_attribute

        # Private methods are not allowed, so this lookup raises AttributeError.
        self.assertRaises(AttributeError, resolve, str, '__add')

        self.assertTrue(resolve(str, 'title'))
        # Send a request via test_simple1 so the test finishes quickly
        # instead of waiting for the socket timeout.
        self.test_simple1()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_fail_no_info(self):
        """Server errors must not carry exception/traceback headers in this case."""
        # Install the broken message class on the request handler.
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Nothing to check for 'unavailable' socket errors or for
            # errors that carry no response headers.
            if is_unavailable_exception(err) or not hasattr(err, "headers"):
                return
            # Neither server-side error header should have been sent back.
            response_headers = err.headers
            self.assertTrue(response_headers.get("X-exception") is None)
            self.assertTrue(response_headers.get("X-traceback") is None)
        else:
            self.fail('ProtocolError not raised')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def setUp(self):
        """Create a fresh CGI XML-RPC request handler as ``self.cgi``."""
        handler = SimpleXMLRPCServer.CGIXMLRPCRequestHandler()
        self.cgi = handler
项目:2PhaseCommit    作者:aysebilgegunduz    | 项目源码 | 文件源码
def main():
    """Run a Participant as an XML-RPC server on localhost.

    ``sys.argv[1]`` is the port and ``sys.argv[2]`` the participant id.
    Any exception raised along the way is caught and its ``args`` printed.
    """
    try:
        listen_port = int(sys.argv[1])  # e.g. 5001
        participant_id = str(sys.argv[2])  # e.g. "1"
        rpc = SimpleXMLRPCServer(("localhost", listen_port))
        print("listen")
        # The participant is named 'participant' followed by its id.
        member = Participant('participant' + participant_id, listen_port)
        rpc.register_instance(member)
        rpc.serve_forever()
    except Exception as e:
        print(e.args)
项目:ssbc    作者:banwagong-news    | 项目源码 | 文件源码
def rpc_server():
    """Serve ``announce`` over XML-RPC on localhost:8004 with request logging off."""
    listen_on = ('localhost', 8004)
    server = SimpleXMLRPCServer.SimpleXMLRPCServer(listen_on, logRequests=False)
    server.register_function(announce, 'announce')
    print('Starting xml rpc server...')
    server.serve_forever()
项目:pyactor    作者:pedrotgn    | 项目源码 | 文件源码
def __init__(self, addr):
        """Initialise the thread and create its XML-RPC server.

        addr: ``(ip, port)`` pair the server is created on; it is also
        kept unchanged as ``self.addr``.
        """
        threading.Thread.__init__(self)
        host, port_number = addr
        self.addr = addr

        server_options = {
            'requestHandler': RequestHandler,
            'logRequests': False,
            'allow_none': True,
        }
        # Introspection functions are intentionally not registered here.
        self.server = SimpleXMLRPCServer((host, port_number), **server_options)
项目:gw_trade    作者:stormtrader    | 项目源码 | 文件源码
def init_server(self):
        """Create the XML-RPC server on localhost:8000 and register its methods."""
        self.server = SimpleXMLRPCServer(("localhost", 8000), allow_none=True)
        self.server.register_function(self.is_even, "is_even")
        # These ``self.db`` methods are exported under their own names.
        for db_method in ("add_condition_order",
                          "get_todo_orders",
                          "get_all_orders",
                          "cancel_cond_order"):
            self.server.register_function(getattr(self.db, db_method), db_method)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def setUp(self):
        """Enable traceback headers and start the server thread."""
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        ready = threading.Event()
        self.evt = ready
        # The server thread that handles requests gets the event as its first argument.
        worker = threading.Thread(
            target=self.threadFunc,
            args=(ready, self.request_count, self.requestHandler))
        worker.start()

        # Wait up to 10 seconds for the server to be ready, then reset the
        # event so it can be waited on again.
        ready.wait(10)
        ready.clear()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def tearDown(self):
        """Wait for the server thread, then turn traceback headers off again."""
        # Collect garbage first so that the active connections get closed,
        # then give the server thread up to 10 seconds to terminate.
        test_support.gc_collect()
        self.evt.wait(10)

        server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
        server_cls._send_traceback_header = False

# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer.  This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_introspection4(self):
        """system.methodSignature can be called even though signatures are unsupported."""
        # SimpleXMLRPCServer has no signature support; only check that the
        # call goes through and yields the fixed reply.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            signature = proxy.system.methodSignature('div')
            self.assertEqual(signature, 'signatures not supported')
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Non-blocking socket 'unavailable' errors are tolerated.
            if is_unavailable_exception(err):
                return
            # Protocol error: put the response headers into the test output.
            self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_dotted_attribute(self):
        """resolve_dotted_attribute refuses private names and resolves public ones."""
        resolve = SimpleXMLRPCServer.resolve_dotted_attribute

        # Private methods are not allowed, so this lookup raises AttributeError.
        self.assertRaises(AttributeError, resolve, str, '__add')

        self.assertTrue(resolve(str, 'title'))
        # Send a request via test_simple1 so the test finishes quickly
        # instead of waiting for the socket timeout.
        self.test_simple1()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_fail_no_info(self):
        """Server errors must not carry exception/traceback headers in this case."""
        # Install the broken message class on the request handler.
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, socket.error) as err:
            # Nothing to check for 'unavailable' socket errors or for
            # errors that carry no response headers.
            if is_unavailable_exception(err) or not hasattr(err, "headers"):
                return
            # Neither server-side error header should have been sent back.
            response_headers = err.headers
            self.assertTrue(response_headers.get("X-exception") is None)
            self.assertTrue(response_headers.get("X-traceback") is None)
        else:
            self.fail('ProtocolError not raised')
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def setUp(self):
        """Create a fresh CGI XML-RPC request handler as ``self.cgi``."""
        handler = SimpleXMLRPCServer.CGIXMLRPCRequestHandler()
        self.cgi = handler