Python cherrypy 模块,lib() 实例源码

我们从 Python 开源项目中提取了以下 13 个代码示例,用于说明如何使用 cherrypy.lib(注意:cherrypy.lib 是一个子包,而不是可调用的函数)。

项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def default(self, *vpath, **params):
        """Dispatch an XML-RPC call to the exposed handler it names.

        The request is parsed with ``_xmlrpc.process_body()``. The dotted
        method name is resolved one attribute at a time, starting at
        ``self``. The handler result is handed to ``_xmlrpc.respond`` and
        ``cherrypy.serving.response.body`` is returned.

        Raises:
            Exception: if the resolved handler is missing or not exposed.
        """
        rpc_args, rpc_name = _xmlrpc.process_body()

        # Resolve the dotted path; a missing attribute yields None.
        target = self
        for attr in str(rpc_name).split('.'):
            target = getattr(target, attr, None)

        if not (target and getattr(target, 'exposed', False)):
            # https://github.com/cherrypy/cherrypy/issues/533
            # An unknown method should come back as an xmlrpclib.Fault;
            # raising here achieves that (see
            # cherrypy.lib.xmlrpcutil.on_error).
            raise Exception('method "%s" is not supported' % attr)

        result = target(*(vpath + rpc_args), **params)

        tool_settings = cherrypy.serving.request.toolmaps['tools']
        xmlrpc_conf = tool_settings.get('xmlrpc', {})
        _xmlrpc.respond(
            result,
            xmlrpc_conf.get('encoding', 'utf-8'),
            xmlrpc_conf.get('allow_none', 0),
        )
        return cherrypy.serving.response.body
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def default(self, *vpath, **params):
        """Route an XML-RPC request to the exposed attribute it refers to.

        ``_xmlrpc.process_body()`` supplies the call parameters and the
        dotted method name. Each name segment is looked up with ``getattr``
        starting from ``self``. The handler's return value is passed to
        ``_xmlrpc.respond`` before ``cherrypy.serving.response.body`` is
        returned.

        Raises:
            Exception: if no exposed handler was found for the method name.
        """
        call_params, method_name = _xmlrpc.process_body()

        handler = self
        for attr in str(method_name).split('.'):
            # A segment that does not exist turns the handler into None.
            handler = getattr(handler, attr, None)

        is_callable_target = handler and getattr(handler, "exposed", False)
        if not is_callable_target:
            # https://github.com/cherrypy/cherrypy/issues/533
            # Raising makes the client receive an xmlrpclib.Fault; see
            # cherrypy.lib.xmlrpcutil.on_error
            raise Exception('method "%s" is not supported' % attr)

        payload = handler(*(vpath + call_params), **params)

        toolmap = cherrypy.serving.request.toolmaps['tools']
        settings = toolmap.get("xmlrpc", {})
        encoding = settings.get('encoding', 'utf-8')
        allow_none = settings.get('allow_none', 0)
        _xmlrpc.respond(payload, encoding, allow_none)
        return cherrypy.serving.response.body
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def XMLRPCDispatcher(next_dispatcher=Dispatcher()):
    """Return a dispatcher that patches the path before delegating.

    The returned callable rewrites ``path_info`` with
    ``xmlrpcutil.patched_path`` and then calls ``next_dispatcher`` with
    the rewritten value.
    """
    from cherrypy.lib import xmlrpcutil

    def xmlrpc_dispatch(path_info):
        """Patch ``path_info`` and forward it to the wrapped dispatcher."""
        return next_dispatcher(xmlrpcutil.patched_path(path_info))

    return xmlrpc_dispatch
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def hooks_namespace(k, v):
    """Register a bare hook from config on the current request.

    ``k`` is the config key below the hooks namespace and ``v`` the
    configured value (a string/bytes reference, a callable, or a ``Hook``).
    """
    # Only the part before the first dot names the hookpoint, so keys such
    # as "hooks.before_handler.1" can attach several hooks to one point.
    hookpoint, _, _ = k.partition('.')

    hook = v
    if isinstance(hook, text_or_bytes):
        # Textual values are resolved through cherrypy.lib.attributes.
        hook = cherrypy.lib.attributes(hook)
    if not isinstance(hook, Hook):
        hook = Hook(hook)

    cherrypy.serving.request.hooks[hookpoint].append(hook)
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def XMLRPCDispatcher(next_dispatcher=Dispatcher()):
    """Wrap ``next_dispatcher`` so it receives an XML-RPC patched path.

    Returns a callable taking ``path_info``; it passes the result of
    ``xmlrpcutil.patched_path(path_info)`` to ``next_dispatcher`` and
    returns whatever that call returns.
    """
    from cherrypy.lib import xmlrpcutil

    def xmlrpc_dispatch(path_info):
        """Forward the patched form of ``path_info`` to the next dispatcher."""
        patched = xmlrpcutil.patched_path(path_info)
        result = next_dispatcher(patched)
        return result

    return xmlrpc_dispatch
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def hooks_namespace(k, v):
    """Attach a hook declared directly in config to the serving request.

    The value is converted to a ``Hook`` when it is not one already and
    appended to the request's hook list for the hookpoint named by ``k``.
    """
    # The key is split a second time so that several hooks can share one
    # hookpoint per path (e.g. "hooks.before_handler.1"); only the leading
    # segment selects the hookpoint.
    segments = k.split(".", 1)
    hookpoint = segments[0]

    candidate = cherrypy.lib.attributes(v) if isinstance(v, text_or_bytes) else v
    hook = candidate if isinstance(candidate, Hook) else Hook(candidate)

    cherrypy.serving.request.hooks[hookpoint].append(hook)
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def setup_server():
        """Mount a public root with a digest-auth protected ``/digest`` child."""

        class Root:

            @cherrypy.expose
            def index(self):
                return "This is public."

        class DigestProtected:

            @cherrypy.expose
            def index(self):
                greeting = "Hello %s, you've been authorized."
                return greeting % cherrypy.request.login

        # Plain-text username -> password mapping used to derive HA1 values.
        users = {'test': 'test'}
        ha1_lookup = cherrypy.lib.auth_digest.get_ha1_dict_plain(users)

        digest_settings = {
            'tools.auth_digest.on': True,
            'tools.auth_digest.realm': 'localhost',
            'tools.auth_digest.get_ha1': ha1_lookup,
            'tools.auth_digest.key': 'a565c27146791cfb',
            # Note: the debug flag is the string 'True', not a boolean.
            'tools.auth_digest.debug': 'True',
        }

        app_root = Root()
        app_root.digest = DigestProtected()
        cherrypy.tree.mount(app_root, config={'/digest': digest_settings})
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def test_1_Ram_Concurrency(self):
        # Select RamSession via the /set_session_cls page, then run the
        # shared concurrency check.
        session_cls = 'cherrypy.lib.sessions.RamSession'
        self.getPage('/set_session_cls/' + session_cls)
        self._test_Concurrency()
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def test_2_File_Concurrency(self):
        # Select FileSession via the /set_session_cls page, then run the
        # shared concurrency check.
        session_cls = 'cherrypy.lib.sessions.FileSession'
        self.getPage('/set_session_cls/' + session_cls)
        self._test_Concurrency()
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def finalize(self):
        """Convert status, headers and cookies into their output form. (Core)

        Sets ``self.status``, ``self.output_status`` and ``self.header_list``.
        Depending on the status code and ``self.stream`` it may also drop or
        set the Content-Length header and reset ``self.body``.

        Raises:
            cherrypy.HTTPError: 500 when ``httputil.valid_status`` rejects
                ``self.status`` with a ValueError.
        """
        try:
            code, reason, _ = httputil.valid_status(self.status)
        except ValueError as err:
            raise cherrypy.HTTPError(500, err.args[0])

        hdrs = self.headers

        self.status = '%s %s' % (code, reason)
        status_prefix = ntob(str(code), 'ascii') + ntob(' ')
        self.output_status = status_prefix + hdrs.encode(reason)

        if self.stream:
            # Streaming: wsgiserver chunks the response when Content-Length
            # is absent (or explicitly None). lib.static sets C-L to the
            # file's st_size, and that value is left untouched here.
            if dict.get(hdrs, 'Content-Length') is None:
                dict.pop(hdrs, 'Content-Length', None)
        elif code < 200 or code in (204, 205, 304):
            # "All 1xx (informational), 204 (no content),
            # and 304 (not modified) responses MUST NOT
            # include a message-body."
            dict.pop(hdrs, 'Content-Length', None)
            self.body = ntob('')
        elif dict.get(hdrs, 'Content-Length') is None:
            # Non-streamed responses get a Content-Length unless user code
            # already supplied one.
            dict.__setitem__(hdrs, 'Content-Length',
                             len(self.collapse_body()))

        # Flatten the header dict into a list of (name, value) tuples.
        header_list = hdrs.output()
        self.header_list = header_list

        cookie_text = self.cookie.output()
        if cookie_text:
            for entry in cookie_text.split('\r\n'):
                key, val = entry.split(': ', 1)
                if isinstance(key, six.text_type):
                    key = key.encode('ISO-8859-1')
                if isinstance(val, six.text_type):
                    val = hdrs.encode(val)
                header_list.append((key, val))
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def finalize(self):
        """Render status, headers and cookies into ``self.header_list``. (Core)

        Also rewrites ``self.status`` and ``self.output_status``. It adjusts
        the Content-Length header according to ``self.stream`` and the
        status code, and empties ``self.body`` for body-less statuses.

        Raises:
            cherrypy.HTTPError: 500 when ``httputil.valid_status`` raises
                ValueError for ``self.status``.
        """
        try:
            code, reason, _ = httputil.valid_status(self.status)
        except ValueError as exc:
            raise cherrypy.HTTPError(500, exc.args[0])

        header_map = self.headers

        self.status = "%s %s" % (code, reason)
        encoded_code = ntob(str(code), 'ascii')
        self.output_status = encoded_code + ntob(" ") + header_map.encode(reason)

        if self.stream:
            # With no Content-Length (popped or explicitly None) wsgiserver
            # chunks the response. lib.static sets C-L to the file's
            # st_size; a value that is present is kept.
            if dict.get(header_map, 'Content-Length') is None:
                dict.pop(header_map, 'Content-Length', None)
        elif code < 200 or code in (204, 205, 304):
            # "All 1xx (informational), 204 (no content),
            # and 304 (not modified) responses MUST NOT
            # include a message-body."
            dict.pop(header_map, 'Content-Length', None)
            self.body = ntob("")
        elif dict.get(header_map, 'Content-Length') is None:
            # Non-streamed responses should carry a Content-Length; user
            # code may already have set one, in which case it is kept.
            body_bytes = self.collapse_body()
            dict.__setitem__(header_map, 'Content-Length', len(body_bytes))

        # Turn the header dict into a list of tuples.
        out = header_map.output()
        self.header_list = out

        rendered_cookies = self.cookie.output()
        if rendered_cookies:
            for raw in rendered_cookies.split("\n"):
                # Python 2.4 emits cookies joined by LF but 2.5+ by CRLF,
                # so strip a trailing CR left over from the LF split.
                if raw.endswith("\r"):
                    raw = raw[:-1]
                key, val = raw.split(": ", 1)
                if isinstance(key, six.text_type):
                    key = key.encode("ISO-8859-1")
                if isinstance(val, six.text_type):
                    val = header_map.encode(val)
                out.append((key, val))
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def _setup_server(cls, supervisor, conf):
        """Log the test environment and build the base server config.

        ``conf`` may be a config file name (string/bytes), a mapping, or a
        falsy value. The returned mapping is a copy of the resolved config
        with the socket host/port, protocol version and environment taken
        from ``supervisor``. For https, the SSL certificate and key entries
        are added and ``webtest.WebCase.HTTP_CONN`` is switched to
        ``HTTPSConnection``.
        """
        uses_https = supervisor.scheme == "https"
        ssl_note = " (ssl)" if uses_https else ""

        pyver = sys.version.split()[0]
        log.info("Python version used to run this test script: %s" % pyver)
        log.info("CherryPy version: %s" % cherrypy.__version__)
        log.info("HTTP server version: %s%s" % (supervisor.protocol, ssl_note))
        log.info("PID: %s" % os.getpid())

        cherrypy.server.using_apache = supervisor.using_apache
        cherrypy.server.using_wsgi = supervisor.using_wsgi

        if sys.platform.startswith('java'):
            cherrypy.config.update({'server.nodelay': False})

        if isinstance(conf, text_or_bytes):
            # A file name: only its 'global' section is used.
            file_conf = cherrypy.lib.reprconf.Parser().dict_from_file(conf)
            conf = file_conf.get('global', {})
        else:
            conf = conf or {}

        overrides = {
            'server.socket_host': supervisor.host,
            'server.socket_port': supervisor.port,
            'server.protocol_version': supervisor.protocol,
            'environment': "test_suite",
        }
        baseconf = conf.copy()
        baseconf.update(overrides)

        # NOTE(review): the upstream comment at this point describes a lazy
        # "from cherrypy.test import helper" import (kept lazy for coverage
        # and to avoid a second webtest instance in sys.modules); no such
        # import is present in this block.
        if uses_https:
            baseconf['server.ssl_certificate'] = serverpem
            baseconf['server.ssl_private_key'] = serverpem
            webtest.WebCase.HTTP_CONN = HTTPSConnection
        return baseconf
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def test_7_session_cookies(self):
        # Checks that the session cookie carries only 'temp' and 'Path'
        # (no 'expires'), that the id is reused while the cookie is sent
        # back, that a new id is issued when it is not, and that the
        # RamSession cache is empty after waiting.

        def cookie_params():
            # Split the value of the first entry in self.cookies into a
            # {name: value} dict, e.g. "temp=<id>; Path=/".
            return dict([p.strip().split('=')
                         for p in self.cookies[0][1].split(";")])

        expected_names = set(['temp', 'Path'])

        self.getPage('/set_session_cls/cherrypy.lib.sessions.RamSession')
        self.getPage('/clear')
        self.getPage('/session_cookie')
        # grab the cookie ID
        cookie_parts = cookie_params()
        # Assert there is no 'expires' param
        self.assertEqual(set(cookie_parts.keys()), expected_names)
        id1 = cookie_parts['temp']
        self.assertEqual(copykeys(sessions.RamSession.cache), [id1])

        # Send another request in the same "browser session".
        self.getPage('/session_cookie', self.cookies)
        cookie_parts = cookie_params()
        # Assert there is no 'expires' param
        self.assertEqual(set(cookie_parts.keys()), expected_names)
        self.assertBody(id1)
        self.assertEqual(copykeys(sessions.RamSession.cache), [id1])

        # Simulate a browser close by just not sending the cookies
        self.getPage('/session_cookie')
        # grab the cookie ID
        cookie_parts = cookie_params()
        # Assert there is no 'expires' param
        self.assertEqual(set(cookie_parts.keys()), expected_names)
        # Assert a new id has been generated...
        id2 = cookie_parts['temp']
        self.assertNotEqual(id1, id2)
        self.assertEqual(set(sessions.RamSession.cache.keys()),
                         set([id1, id2]))

        # Wait for the session.timeout on both sessions
        time.sleep(2.5)
        cache = copykeys(sessions.RamSession.cache)
        if cache:
            if cache == [id2]:
                self.fail("The second session did not time out.")
            else:
                # fail() accepts a single message argument, so the cache
                # contents must be formatted into the string here; passing
                # them as a second argument raises TypeError instead.
                self.fail("Unknown session id in cache: %r" % (cache,))