Python cherrypy 模块,engine() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用cherrypy.engine()

项目:0ops.exed    作者:whisperaven    | 项目源码 | 文件源码
def _update_config(self):
        _cp_config = {
            ## Server Opts ##
            'log.screen': False,
            'log.error_file': "",
            'log.access_file': "",
            'server.socket_host': self._cfg.listen,
            'server.socket_port': self._cfg.listen_port,
            'server.thread_pool': 10,
            'engine.autoreload.on': False,
            'request.show_tracebacks': False,
            'request.show_mismatched_params': False,
            'response.headers.server': APIServer.SERVER_TOKEN,
            ## Custom Tools Opts ##
            'tools.delete_allow_header.on': True,
            'tools.fix_http_content_length.on': True,
            'tools.unescape_response.on': True,
            ## Error Handling Opts ##
            'error_page.default': error_response,
        }
        cherrypy.config.update(_cp_config)
        cherrypy.log.error_log = LOG
项目:arduino-ciao-meteor-ddp-connector    作者:andrea689    | 项目源码 | 文件源码
def start_handler(self):
        """
        Runs at the end of the request processing by calling
        the opened method of the handler.
        """
        request = cherrypy.request
        if not hasattr(request, 'ws_handler'):
            return

        addr = (request.remote.ip, request.remote.port)
        ws_handler = request.ws_handler
        request.ws_handler = None
        delattr(request, 'ws_handler')

        # By doing this we detach the socket from
        # the CherryPy stack avoiding memory leaks
        detach_connection(request.rfile.rfile)

        cherrypy.engine.publish('handle-websocket', ws_handler, addr)
项目:wptagent    作者:WPO-Foundation    | 项目源码 | 文件源码
def start_handler(self):
        """
        Runs at the end of the request processing by calling
        the opened method of the handler.
        """
        request = cherrypy.request
        if not hasattr(request, 'ws_handler'):
            return

        addr = (request.remote.ip, request.remote.port)
        ws_handler = request.ws_handler
        request.ws_handler = None
        delattr(request, 'ws_handler')

        # By doing this we detach the socket from
        # the CherryPy stack avoiding memory leaks
        detach_connection(request.rfile.rfile)

        cherrypy.engine.publish('handle-websocket', ws_handler, addr)
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def merge(base, other):
    """Merge one app config (from a dict, file, or filename) into another.

    If the given config is a filename, it will be appended to
    the list of files to monitor for "autoreload" changes.
    """
    if isinstance(other, text_or_bytes):
        cherrypy.engine.autoreload.files.add(other)

    # Load other into base
    for section, value_map in reprconf.as_dict(other).items():
        if not isinstance(value_map, dict):
            raise ValueError(
                'Application config must include section headers, but the '
                "config you tried to merge doesn't have any sections. "
                'Wrap your config in another dict with paths as section '
                "headers, for example: {'/': config}.")
        base.setdefault(section, {}).update(value_map)
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def _engine_namespace_handler(k, v):
    """Config handler for the "engine" namespace."""
    engine = cherrypy.engine

    if k == 'SIGHUP':
        engine.subscribe('SIGHUP', v)
    elif k == 'SIGTERM':
        engine.subscribe('SIGTERM', v)
    elif '.' in k:
        plugin, attrname = k.split('.', 1)
        plugin = getattr(engine, plugin)
        if attrname == 'on':
            if v and hasattr(getattr(plugin, 'subscribe', None), '__call__'):
                plugin.subscribe()
                return
            elif (
                (not v) and
                hasattr(getattr(plugin, 'unsubscribe', None), '__call__')
            ):
                plugin.unsubscribe()
                return
        setattr(plugin, attrname, v)
    else:
        setattr(engine, k, v)
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def check_site_config_entries_in_app_config(self):
        """Check for mounted Applications that have site-scoped config."""
        for sn, app in iteritems(cherrypy.tree.apps):
            if not isinstance(app, cherrypy.Application):
                continue

            msg = []
            for section, entries in iteritems(app.config):
                if section.startswith('/'):
                    for key, value in iteritems(entries):
                        for n in ('engine.', 'server.', 'tree.', 'checker.'):
                            if key.startswith(n):
                                msg.append('[%s] %s = %s' %
                                           (section, key, value))
            if msg:
                msg.insert(0,
                           'The application mounted at %r contains the '
                           'following config entries, which are only allowed '
                           'in site-wide config. Move them to a [global] '
                           'section and pass them to cherrypy.config.update() '
                           'instead of tree.mount().' % sn)
                warnings.warn(os.linesep.join(msg))
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def _populate_known_types(self):
        b = [x for x in vars(builtins).values()
             if type(x) is type(str)]

        def traverse(obj, namespace):
            for name in dir(obj):
                # Hack for 3.2's warning about body_params
                if name == 'body_params':
                    continue
                vtype = type(getattr(obj, name, None))
                if vtype in b:
                    self.known_config_types[namespace + '.' + name] = vtype

        traverse(cherrypy.request, 'request')
        traverse(cherrypy.response, 'response')
        traverse(cherrypy.server, 'server')
        traverse(cherrypy.engine, 'engine')
        traverse(cherrypy.log, 'log')
项目:LalkaChat    作者:DeForce    | 项目源码 | 文件源码
def run(self):
        while self.running:
            message = s_queue.get()

            if 'timestamp' not in message:
                message['timestamp'] = datetime.datetime.now().isoformat()

            if message['type'] in HISTORY_TYPES:
                cherrypy.engine.publish('add-history', message)
            elif message['type'] == 'command':
                cherrypy.engine.publish('process-command', message['command'], message)

            if message['type'] == 'system_message' and not self.settings['chat']['keys'].get('show_system_msg', True):
                continue

            log.debug("%s", message)
            self.send_message(message, 'chat')
            self.send_message(message, 'gui')
        log.info("Messaging thread stopping")
项目:LalkaChat    作者:DeForce    | 项目源码 | 文件源码
def __init__(self, host, port, root_folder, **kwargs):
        super(self.__class__, self).__init__()
        self.daemon = True
        self.host = host
        self.port = port
        self.root_folder = root_folder
        self.style_settings = kwargs['style_settings']
        self.modules = kwargs.pop('modules')

        self.root_config = None
        self.css_config = None
        self.gui_root_config = None
        self.gui_css_config = None

        self.rest_config = None

        cherrypy.config.update({'server.socket_port': int(self.port), 'server.socket_host': self.host,
                                'engine.autoreload.on': False})
        self.websocket = WebChatPlugin(cherrypy.engine, self.style_settings)
        self.websocket.subscribe()
        cherrypy.tools.websocket = WebSocketTool()
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def update_install():
        ver = version.Version()

        if not core.UPDATE_STATUS or core.UPDATE_STATUS['status'] != u'behind':
            return

        logging.info(u'Running automatic updater.')

        logging.info(u'Currently {} commits behind. Updating to {}.'.format(
                     core.UPDATE_STATUS['behind_count'], core.UPDATE_STATUS['new_hash']))

        core.UPDATING = True

        logging.info(u'Executing update.')
        update = ver.manager.execute_update()
        core.UPDATING = False

        if not update:
            logging.error(u'Update failed.')

        logging.info(u'Update successful, restarting.')
        cherrypy.engine.restart()
        return
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def merge(base, other):
    """Merge one app config (from a dict, file, or filename) into another.

    If the given config is a filename, it will be appended to
    the list of files to monitor for "autoreload" changes.
    """
    if isinstance(other, text_or_bytes):
        cherrypy.engine.autoreload.files.add(other)

    # Load other into base
    for section, value_map in reprconf.as_dict(other).items():
        if not isinstance(value_map, dict):
            raise ValueError(
                "Application config must include section headers, but the "
                "config you tried to merge doesn't have any sections. "
                "Wrap your config in another dict with paths as section "
                "headers, for example: {'/': config}.")
        base.setdefault(section, {}).update(value_map)
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def _engine_namespace_handler(k, v):
    """Config handler for the "engine" namespace."""
    engine = cherrypy.engine

    if k == 'SIGHUP':
        engine.subscribe('SIGHUP', v)
    elif k == 'SIGTERM':
        engine.subscribe('SIGTERM', v)
    elif "." in k:
        plugin, attrname = k.split(".", 1)
        plugin = getattr(engine, plugin)
        if attrname == 'on':
            if v and hasattr(getattr(plugin, 'subscribe', None), '__call__'):
                plugin.subscribe()
                return
            elif (
                (not v) and
                hasattr(getattr(plugin, 'unsubscribe', None), '__call__')
            ):
                plugin.unsubscribe()
                return
        setattr(plugin, attrname, v)
    else:
        setattr(engine, k, v)
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def test_5_Start_Error(self):
        # If test_3 has not been executed, the server won't be stopped,
        # so we'll have to do it.
        if engine.state != engine.states.EXITING:
            engine.exit()

        # If a process errors during start, it should stop the engine
        # and exit with a non-zero exit code.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
                             wait=True)
        p.write_conf(
            extra="""starterror: True
test_case_name: "test_5_Start_Error"
"""
        )
        p.start(imports='cherrypy.test._test_states_demo')
        if p.exit_code == 0:
            self.fail("Process failed to return nonzero exit code.")
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def check_site_config_entries_in_app_config(self):
        """Check for mounted Applications that have site-scoped config."""
        for sn, app in iteritems(cherrypy.tree.apps):
            if not isinstance(app, cherrypy.Application):
                continue

            msg = []
            for section, entries in iteritems(app.config):
                if section.startswith('/'):
                    for key, value in iteritems(entries):
                        for n in ("engine.", "server.", "tree.", "checker."):
                            if key.startswith(n):
                                msg.append("[%s] %s = %s" %
                                           (section, key, value))
            if msg:
                msg.insert(0,
                           "The application mounted at %r contains the "
                           "following config entries, which are only allowed "
                           "in site-wide config. Move them to a [global] "
                           "section and pass them to cherrypy.config.update() "
                           "instead of tree.mount()." % sn)
                warnings.warn(os.linesep.join(msg))
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def _populate_known_types(self):
        b = [x for x in vars(builtins).values()
             if type(x) is type(str)]

        def traverse(obj, namespace):
            for name in dir(obj):
                # Hack for 3.2's warning about body_params
                if name == 'body_params':
                    continue
                vtype = type(getattr(obj, name, None))
                if vtype in b:
                    self.known_config_types[namespace + "." + name] = vtype

        traverse(cherrypy.request, "request")
        traverse(cherrypy.response, "response")
        traverse(cherrypy.server, "server")
        traverse(cherrypy.engine, "engine")
        traverse(cherrypy.log, "log")
项目:0ops.exed    作者:whisperaven    | 项目源码 | 文件源码
def run(self, daemon=False):
        if daemon:
            daemon = cherrypy.process.plugins.Daemonizer(cherrypy.engine)
            daemon.subscribe()
        if self._cfg.pid_file:
            pid = cherrypy.process.plugins.PIDFile(cherrypy.engine, self._cfg.pid_file)
            pid.subscribe()
        cherrypy.engine.start()
        cherrypy.engine.block()
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def __init__(self, bus: Bus, settings_module: str = 'settings',
                 wsgi_http_logger: type = HTTPLogger,
                 local_settings: Settings = None) -> None:
        """ CherryPy engine plugin to configure and mount
        the Django application onto the CherryPy server.
        """
        plugins.SimplePlugin.__init__(self, bus)
        self.settings_module = settings_module
        self.wsgi_http_logger = wsgi_http_logger
        self.crawler_settings = local_settings
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def start(self) -> None:
        """ When the bus starts, the plugin is also started
        and we load the Django application. We then mount it on
        the CherryPy engine for serving as a WSGI application.
        We let CherryPy serve the application's static files.
        """
        from frontend import settings
        settings.crawler_settings = self.crawler_settings

        os.environ['DJANGO_SETTINGS_MODULE'] = self.settings_module

        from django.core.wsgi import get_wsgi_application

        def corsstaticdir(
                section: str, dir: str, root: str = '', match: str = '',
                content_types: None = None, index: str = '', debug: bool = False
        ) -> bool:
            cherrypy.response.headers['Access-Control-Allow-Origin'] = '*'
            return static.staticdir(section, dir, root, match, content_types, index, debug)

        cherrypy.tree.graft(self.wsgi_http_logger(get_wsgi_application(), self.crawler_settings))

        settings.WORKERS.start_workers(settings.CRAWLER_SETTINGS)

        tool = cherrypy._cptools.HandlerTool(corsstaticdir)
        static_handler = tool.handler(
            section="/",
            dir=os.path.split(settings.STATIC_ROOT)[1],
            root=os.path.abspath(os.path.split(settings.STATIC_ROOT)[0])
        )
        cherrypy.tree.mount(static_handler, settings.STATIC_URL)

        media_handler = tool.handler(
            section="/",
            dir=os.path.split(settings.MEDIA_ROOT)[1],
            root=os.path.abspath(os.path.split(settings.MEDIA_ROOT)[0])
        )
        cherrypy.tree.mount(media_handler, settings.MEDIA_URL)
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def reset(self, token=None, username=None):
        if cherrypy.engine.publish('token-verify', token, username).pop():
            cherrypy.session['token'] = token
            cherrypy.session['username'] = username
            return {'ok': True}
        else:
            return {'ok': False}
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def change(self, **params):
        engine = cherrypy.engine
        if cherrypy.session.get('auth', False):
            user = cherrypy.session['user']
            oldpasswd = cherrypy.request.params.get('oldpassword')
            newpasswd = cherrypy.request.params.get('newpassword')

            try:
                user.change_password(oldpasswd, newpasswd)

                return {'ok': True}
            except InvalidCredentials:
                return {
                    'ok': False,
                    'error': 'Current password invalid.'
                }
            except UserModelException:
                return {
                    'ok': False,
                    'error': 'Unknown system error.  Contact your Systems Administrator.'
                }

        elif cherrypy.session.get('token', False):
            cherrypy.session['user'] = User(cherrypy.session['username'])
            newpassword = cherrypy.request.params.get('newpassword')

            try:
                cherrypy.session['user'].set_password(newpassword)
                return {'ok': True}
            except UserModelException:
                return {
                    'ok': False,
                    'error': 'Unable to change your password. Try again later.'
                }
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def setUpModule():
    cherrypy.tree.mount(Root(), '/')
    cherrypy.engine.start()
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def tearDownModule():
    cherrypy.engine.exit()
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def setUpClass(cls):
        cls.mockldap = MockLdap(LDAPTestDirectory.directory)
        cherrypy.engine.stop()
        cherrypy.engine.ldap = LDAPEnginePlugin(
            bus=cherrypy.engine,
            uri="ldap://localhost/",
            base_dn="ou=example,o=test",
            bind_dn="cn=admin,ou=example,o=test",
            bind_pw="ldaptest",
            tls=True,
            no_verify=True)
        cherrypy.engine.ldap.subscribe()
        cherrypy.engine.start()
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def tearDownClass(cls):
        cherrypy.engine.ldap.unsubscribe()
        del cherrypy.engine.ldap
        del cls.mockldap
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_bad_base_dn(self):
        with self.assertRaises(cherrypy.process.wspbus.ChannelFailures):
            cherrypy.engine.publish('ldap-auth', 'alice,ou==bad,,', 'alicepw')
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_auth(self):
        auth = cherrypy.engine.publish('ldap-auth', 'alice', 'alicepw').pop()
        self.assertEqual(auth, True)
        with self.assertRaises(cherrypy.process.wspbus.ChannelFailures):
            cherrypy.engine.publish('ldap-auth', 'alice', 'bobpw')
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_get_user_by_uid(self):
        alice = cherrypy.engine.publish('ldap-user-by-uid', 'alice').pop()
        self.assertEqual(alice['uid'], 'alice')
        nouser = cherrypy.engine.publish('ldap-user-by-uid', 'nouser').pop()
        self.assertEqual(nouser, {})
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_set_password(self):
        username = 'bob'
        cherrypy.engine.publish('ldap-set-password', username, self.hashes)
        bob = cherrypy.engine.publish('ldap-user-by-uid', 'bob').pop()
        self.assertEqual(self.hashes['userPassword'] in bob['userPassword'], True)
        self.assertEqual(self.hashes['sambaNTPassword'] in bob['sambaNTPassword'], True)
        self.assertEqual(cherrypy.engine.publish('ldap-auth', 'bob', 'changeme'), [True])
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_change_password(self):
        cherrypy.engine.publish('ldap-change-password', 'bob', 'bobpw', self.hashes)
        bob = cherrypy.engine.publish('ldap-user-by-uid', 'bob').pop()
        self.assertEqual(self.hashes['userPassword'] in bob['userPassword'], True)
        self.assertEqual(self.hashes['sambaNTPassword'] in bob['sambaNTPassword'], True)
        self.assertEqual(cherrypy.engine.publish('ldap-auth', 'bob', 'changeme'), [True])
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_change_passwd_bad_creds(self):
        with self.assertRaises(cherrypy.process.wspbus.ChannelFailures):
            cherrypy.engine.publish('ldap-change-password', 'bob', 'alicepw', self.hashes)
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_change_passwd_bad_dn(self):
        with self.assertRaises(cherrypy.process.wspbus.ChannelFailures):
            cherrypy.engine.publish('ldap-change-password', 'bob,,', 'alicepw', self.hashes)
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_add_sshpubkey(self):
        # key validation happens at the model level; using a short key for brevity
        sshpubkey = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAo1d01QraTlMVSsbxNrRFi9wrf+M7Q== fakekey'
        cherrypy.engine.publish('ldap-add-key', 'bob', sshpubkey)
        bob = cherrypy.engine.publish('ldap-user-by-uid', 'bob').pop()
        self.assertEqual(sshpubkey in bob['sshPublicKey'], True)
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def setUpClass(cls):
        cherrypy.engine.stop()
        cherrypy.engine.passwd = PasswordEnginePlugin(cherrypy.engine)
        cherrypy.engine.passwd.subscribe()
        cherrypy.engine.start()
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def tearDownClass(cls):
        cherrypy.engine.passwd.unsubscribe()
        del cherrypy.engine.passwd
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_plugin(self, mock_random):
        print cherrypy.engine.publish
        hashes = cherrypy.engine.publish('password-hash', 'changeme').pop()
        self.assertIsInstance(hashes, dict)
        self.assertEqual(hashes['userPassword'], '{SSHA}6QBuyak4WbsUzcqUKx0yB74RFUFvDbys')
        self.assertEqual(hashes['sambaNTPassword'], '6597D9FE8469E21D840E2CBFF8D43C8B')
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def setUpClass(cls):
        cherrypy.engine.stop()
        cherrypy.engine.token = TokenEnginePlugin(cherrypy.engine, secret="testsecret")
        cherrypy.engine.token.subscribe()
        cherrypy.engine.start()
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def tearDownClass(cls):
        cherrypy.engine.token.unsubscribe()
        del cherrypy.engine.token
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_valid_token(self, mock_epoch):
        valid = cherrypy.engine.publish('token-verify', self.valid_token, self.data).pop()
        self.assertEqual(valid, True)
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_invalid_token(self, mock_epoch):
        invalid = cherrypy.engine.publish('token-verify', self.invalid_token, self.data).pop()
        self.assertEqual(invalid, False)
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_invalid_data(self, mock_epoch):
        invalid = cherrypy.engine.publish('token-verify', self.valid_token, 'baddata').pop()
        self.assertEqual(invalid, False)
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def test_expired_token(self, mock_epoch=15999.0):
        invalid = cherrypy.engine.publish('token-verify', self.valid_token, self.data).pop()
        self.assertEqual(invalid, False)
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def run(self):
        engine = cherrypy.engine

        if hasattr(engine, "signal_handler"):
            engine.signal_handler.subscribe()

        if hasattr(engine, "console_control_handler"):
            engine.console_control_handler.subscribe()

        engine.start()
        engine.block()
项目:arduino-ciao-meteor-ddp-connector    作者:andrea689    | 项目源码 | 文件源码
def cleanup(self):
        """
        Terminate all connections and clear the pool. Executed when the engine stops.
        """
        self.manager.close_all()
        self.manager.stop()
        self.manager.join()
项目:of    作者:OptimalBPM    | 项目源码 | 文件源码
def start_server():
    """
        Initialize and start the server.
    """
    if hasattr(cherrypy.engine, 'subscribe'):  # CherryPy >= 3.1
        pass
    else:
        raise Exception("This application requires CherryPy >= 3.1 or higher.")
        # cherrypy.engine.on_stop_engine_list.append(_save_data)

    # Some global configuration

    cherrypy.config.update({"server.ssl_certificate": os.path.join(os.path.dirname(__file__), "../cert.pem"),
                            "server.ssl_private_key": os.path.join(os.path.dirname(__file__), "../privkey.pem")})

    # cherrypy.config.update(config=os.path.join(script_dir, "config.ini"))
    app = cherrypy.tree.mount(Server(), '/',
                              {'/':
                                  {
                                      "tools.staticdir.root": os.path.abspath(
                                          os.path.join(os.path.dirname(__file__), "static")),
                                      "tools.decode.on": True,
                                      "tools.trailing_slash.on": True,
                                      "tools.staticdir.on": True,
                                      "tools.staticdir.index": "index.html",
                                      "tools.staticdir.dir": ""
                                  }
                              })

    if hasattr(cherrypy.engine, "signal_handler"):
        cherrypy.engine.signal_handler.subscribe()
    if hasattr(cherrypy.engine, "console_control_handler"):
        cherrypy.engine.console_control_handler.subscribe()
    cherrypy.engine.start()
    cherrypy.engine.block()
项目:wptagent    作者:WPO-Foundation    | 项目源码 | 文件源码
def cleanup(self):
        """
        Terminate all connections and clear the pool. Executed when the engine stops.
        """
        self.manager.close_all()
        self.manager.stop()
        self.manager.join()
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def run(self):
        wsgi_app_https = ReverseProxied(ProxyFix(wsgi.WSGIPathInfoDispatcher({'/': app})))
        wsgi_app_http = ReverseProxied(ProxyFix(wsgi.WSGIPathInfoDispatcher({'/': adapter_app})))
        cherrypy.server.unsubscribe()
        cherrypy.config.update({'environment': 'production'})

        bind_addr = (self.config['listen'], self.config['port'])

        server_https = wsgi.WSGIServer(bind_addr=bind_addr,
                                       wsgi_app=wsgi_app_https)
        server_https.ssl_adapter = http_helpers.ssl_adapter(self.config['certificate'],
                                                            self.config['private_key'])
        ServerAdapter(cherrypy.engine, server_https).subscribe()
        logger.debug('WSGIServer starting... uid: %s, listen: %s:%s',
                     os.getuid(), bind_addr[0], bind_addr[1])

        for route in http_helpers.list_routes(app):
            logger.debug(route)

        if self.adapter_config['enabled']:
            bind_addr = (self.adapter_config['listen'], self.adapter_config['port'])
            server_adapter = wsgi.WSGIServer(bind_addr=bind_addr,
                                             wsgi_app=wsgi_app_http)
            ServerAdapter(cherrypy.engine, server_adapter).subscribe()
            logger.debug('WSGIServer starting... uid: %s, listen: %s:%s',
                         os.getuid(), bind_addr[0], bind_addr[1])

            for route in http_helpers.list_routes(adapter_app):
                logger.debug(route)

        else:
            logger.debug('Adapter server is disabled')

        try:
            cherrypy.engine.start()
            cherrypy.engine.wait(states.EXITING)
        except KeyboardInterrupt:
            logger.warning('Stopping xivo-ctid-ng: KeyboardInterrupt')
            cherrypy.engine.exit()
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def stop(self):
        cherrypy.engine.exit()
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def join(self):
        if cherrypy.engine.state == states.EXITING:
            cherrypy.engine.block()
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def load(self):
        """Copy stored session data into this session instance."""
        data = self._load()
        # data is either None or a tuple (session_data, expiration_time)
        if data is None or data[1] < self.now():
            if self.debug:
                cherrypy.log('Expired session %r, flushing data.' % self.id,
                             'TOOLS.SESSIONS')
            self._data = {}
        else:
            if self.debug:
                cherrypy.log('Data loaded for session %r.' % self.id,
                             'TOOLS.SESSIONS')
            self._data = data[0]
        self.loaded = True

        # Stick the clean_thread in the class, not the instance.
        # The instances are created and destroyed per-request.
        cls = self.__class__
        if self.clean_freq and not cls.clean_thread:
            # clean_up is an instancemethod and not a classmethod,
            # so that tool config can be accessed inside the method.
            t = cherrypy.process.plugins.Monitor(
                cherrypy.engine, self.clean_up, self.clean_freq * 60,
                name='Session cleanup')
            t.subscribe()
            cls.clean_thread = t
            t.start()
            if self.debug:
                cherrypy.log('Started cleanup thread.', 'TOOLS.SESSIONS')