Python logging 模块,config() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用logging.config()

项目:abe-bootstrap    作者:TryCoin-Team    | 项目源码 | 文件源码
def init(self):
        import DataStore, readconf, logging, sys
        self.conf.update({ "debug": None, "logging": None })
        self.conf.update(DataStore.CONFIG_DEFAULTS)

        args, argv = readconf.parse_argv(self.argv, self.conf, strict=False)
        if argv and argv[0] in ('-h', '--help'):
            print self.usage()
            return None, []

        logging.basicConfig(
            stream=sys.stdout, level=logging.DEBUG, format="%(message)s")
        if args.logging is not None:
            import logging.config as logging_config
            logging_config.dictConfig(args.logging)

        store = DataStore.new(args)

        return store, argv

# Abstract hex-binary conversions for eventual porting to Python 3.
项目:flask    作者:ljxxcaijing    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:concierge    作者:9seconds    | 项目源码 | 文件源码
def configure_logging(debug=False, verbose=True, stderr=True):
    config = copy.deepcopy(LOG_CONFIG)

    for handler in config["handlers"].values():
        if verbose:
            handler["level"] = "INFO"
        if debug:
            handler["level"] = "DEBUG"

    if verbose:
        config["handlers"]["stderr"]["formatter"] = "verbose"
    if debug:
        config["handlers"]["stderr"]["formatter"] = "debug"

    if stderr:
        config["loggers"][LOG_NAMESPACE]["handlers"].append("stderr")

    logging.config.dictConfig(config)
项目:zlktqa    作者:NunchakusHuang    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:PowerMeter-Reader    作者:lucab85    | 项目源码 | 文件源码
def setup_logging(self, default_path=PATH_LOGGING, default_level=logging.INFO, env_key='LOG_CFG'):
        path = default_path
        self.logconf = None
        value = os.getenv(env_key, None)
        if value:
            path = value
        if os.path.exists(path):
            with open(path, 'rt') as f:
                config = json.load(f)
            self.logconf = logging.config.dictConfig(config)
        elif os.path.exists(path.replace("../", "")):
            with open(path.replace("../", ""), 'rt') as f:
                config = json.load(f)
                self._changePath(config["handlers"])
            self.logconf = logging.config.dictConfig(config)
        else:
            print("Configurazione log non trovata (\"%s\"): applico le impostazioni predefinite" % path)
            self.logconf = logging.basicConfig(level=default_level)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:BookCloud    作者:livro-aberto    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:sharkfacts    作者:andrewthetechie    | 项目源码 | 文件源码
def _get_config(key, default_value=None, required=False):
    """
    Gets config from environment variables
    Will return default_value if key is not in environment variables
    :param key: the key of the env variable you are looking for
    :param default_value: value to return if key not in os.environ.
    :param required: if true and key is not set, will raise InvalidConfigException
    :return: os.environ[key] if key in os.environ els default_value
    :exception InvalidConfigException - raised when a required config key is not properly set
    """
    if required and key not in os.environ:
        raise InvalidConfigException("Invalid ENV variable. Please check {0}".format(key))
    to_return = os.environ.get(key, default_value)
    if isinstance(to_return, basestring):
        try:
            to_return = _string_to_bool(to_return)
        except NonBooleanStringException:
            pass
    os.environ[key] = str(to_return)
    return to_return
项目:Round1    作者:general-ai-challenge    | 项目源码 | 文件源码
def setup_logging(
    default_path='logging.ini',
    default_level=logging.INFO,
    env_key='LOG_CFG'
):
    """Setup logging configuration

    """
    path = default_path
    value = os.getenv(env_key, None)
    if value:
        path = value
    if os.path.exists(path):
        logging.config.fileConfig(default_path)
    else:
        logging.basicConfig(level=default_level)
项目:IntegraTI-API    作者:discentes-imd    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def configure_logging(logging_config, logging_settings):
    if not sys.warnoptions:
        # Route warnings through python logging
        logging.captureWarnings(True)
        # RemovedInNextVersionWarning is a subclass of DeprecationWarning which
        # is hidden by default, hence we force the "default" behavior
        warnings.simplefilter("default", RemovedInNextVersionWarning)

    if logging_config:
        # First find the logging configuration function ...
        logging_config_func = import_string(logging_config)

        logging.config.dictConfig(DEFAULT_LOGGING)

        # ... then invoke it with the logging settings
        if logging_settings:
            logging_config_func(logging_settings)
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:bit    作者:codesmart-co    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _applyConfigurationToValues(self, parser, config, values):
        for name, value, filename in config:
            if name in option_blacklist:
                continue
            try:
                self._processConfigValue(name, value, values, parser)
            except NoSuchOptionError, exc:
                self._file_error(
                    "Error reading config file %r: "
                    "no such option %r" % (filename, exc.name),
                    name=name, filename=filename)
            except optparse.OptionValueError, exc:
                msg = str(exc).replace('--' + name, repr(name), 1)
                self._file_error("Error reading config file %r: "
                                 "%s" % (filename, msg),
                                 name=name, filename=filename)
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:microflack_messages    作者:miguelgrinberg    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:sitrep    作者:bittorrent    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:Leics    作者:LeicsFrameWork    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:ud-catalog    作者:aviaryan    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:invalidroutesreporter    作者:pierky    | 项目源码 | 文件源码
def process_alert(self, route):
        recipients = list(set(route["recipient_ids"]))

        logging.debug("Processing alert for {}, recipients {}".format(
            str(route), str(recipients)
        ))

        if "*" in self.data:
            recipients.append("*")

        for recipient_id in recipients:
            if not recipient_id in self.data:
                continue

            recipient = self.data[recipient_id]

            if len(recipient["routes"]) < recipient["config"]["max_routes"]:
                recipient["routes"].append(route)
            else:
                logging.debug("Discarding route {} for {}: buffer full ".format(
                    route["prefix"], recipient_id
                ))
项目:invalidroutesreporter    作者:pierky    | 项目源码 | 文件源码
def _flush_recipient(self, recipient):
        if not isinstance(recipient["config"]["info"]["email"], list):
            email_addresses = [recipient["config"]["info"]["email"]]
        else:
            email_addresses = list(set(recipient["config"]["info"]["email"]))

        logging.info("Sending email to {} ({}) for {}".format(
            recipient["id"],
            ", ".join(email_addresses),
            ", ".join([route["prefix"] for route in recipient["routes"]])
        ))

        data = {
            "id": recipient["id"],
            "from_addr": self.from_addr,
            "subject": self.subject,
            "routes_list": self._format_list_of_routes(recipient["routes"])
        }
        msg = MIMEText(self.template.format(**data))
        msg['Subject'] = self.subject
        msg['From'] = self.from_addr
        msg['To'] = ", ".join(email_addresses)

        self._send_email(self.from_addr, email_addresses, msg.as_string())
项目:invalidroutesreporter    作者:pierky    | 项目源码 | 文件源码
def process_alert(self, route):
        recipients = list(set(route["recipient_ids"]))

        logging.debug("Processing alert for {}, recipients {}".format(
            str(route), str(recipients)
        ))

        if "*" in self.data:
            recipients.append("*")

        for recipient_id in recipients:
            if not recipient_id in self.data:
                continue

            recipient = self.data[recipient_id]

            if len(recipient["routes"]) < recipient["config"]["max_routes"]:
                recipient["routes"].append(route)
            else:
                logging.debug("Discarding route {} for {}: buffer full ".format(
                    route["prefix"], recipient_id
                ))
项目:invalidroutesreporter    作者:pierky    | 项目源码 | 文件源码
def _flush_recipient(self, recipient):
        if not isinstance(recipient["config"]["info"]["email"], list):
            email_addresses = [recipient["config"]["info"]["email"]]
        else:
            email_addresses = list(set(recipient["config"]["info"]["email"]))

        logging.info("Sending email to {} ({}) for {}".format(
            recipient["id"],
            ", ".join(email_addresses),
            ", ".join([route["prefix"] for route in recipient["routes"]])
        ))

        data = {
            "id": recipient["id"],
            "from_addr": self.from_addr,
            "subject": self.subject,
            "routes_list": self._format_list_of_routes(recipient["routes"])
        }
        msg = MIMEText(self.template.format(**data))
        msg['Subject'] = self.subject
        msg['From'] = self.from_addr
        msg['To'] = ", ".join(email_addresses)

        self._send_email(self.from_addr, email_addresses, msg.as_string())
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:akamatsu    作者:rmed    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:flask-microservices-users    作者:realpython    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:podigger    作者:perna    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def setup_logging(verbosity_level, save_debug_log):

    logging.captureWarnings(True)

    # if config['logging']['config_file']:
    #     # Logging config from file must be read before other handlers are
    #     # added. If not, the other handlers will have no effect.
    #     try:
    #         path = config['logging']['config_file']
    #         logging.config.fileConfig(path, disable_existing_loggers=False)
    #     except Exception as e:
    #         # Catch everything as logging does not specify what can go wrong.
    #         logger.error('Loading logging config %r failed. %s', path, e)

    setup_console_logging(verbosity_level)
    if save_debug_log:
        print('Here we would call setup_debug_logging_to_file(config)')
        # setup_debug_logging_to_file(config)

    _delayed_handler.release()
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def setup_logger():
    from colorlog import ColoredFormatter
    from gettext import gettext as _  # noqa

    try:
        """Return a logging obj with a default ColoredFormatter."""
        formatter = ColoredFormatter(
            "%(asctime)s %(name)-12s (%(threadName)-9s) %(log_color)s%(levelname)-8s%(reset)s (%(funcName)-5s) %(message_log_color)s%(message)s",  # noqa
            datefmt=None,
            reset=True,
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'bold_red',
                'TRACE': 'purple'
            },
            secondary_log_colors={
                'message': {
                    'ERROR': 'red',
                    'CRITICAL': 'red',
                    'DEBUG': 'yellow',
                    'INFO': 'yellow,bg_blue'
                }
            },
            style='%'
        )

        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        logging.getLogger('').addHandler(handler)
        logging.root.setLevel(logging.DEBUG)
    except ImportError:
        # No color available, use default config
        logging.basicConfig(format='%(levelname)s: %(message)s')
        logging.warn("Disabling color, you really want to install colorlog.")
项目:myproject    作者:dengliangshi    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:1ibrary-gzhu    作者:1ibrary    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:storperf    作者:opnfv    | 项目源码 | 文件源码
def setup_logging(default_path='logging.json',
                  default_level=logging.INFO, env_key='LOG_CFG'):
    """Setup logging configuration
    """

    path = default_path
    value = os.getenv(env_key, None)
    if value:
        path = value
    if os.path.exists(path):
        with open(path, 'rt') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    else:
        logging.basicConfig(level=default_level)

    socketHandler = logging.handlers.DatagramHandler(
        'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT)
    rootLogger = logging.getLogger('')
    rootLogger.addHandler(socketHandler)
项目:python-mysql-binlog-pubsub    作者:tarzanjw    | 项目源码 | 文件源码
def main(argv):
    """ MySQL binlog to Google Pub/Sub entry point
    Args:
        argv (list): list of command line arguments
    """
    args = _setup_arg_parser(argv)
    conf_file = args.conf

    if conf_file:
        os.environ['BINLOG2GPUBSUB_CONF_FILE'] = conf_file

    if args.logconf:
        logging.config.fileConfig(args.logconf, disable_existing_loggers=False)
    else:
        logging.basicConfig()

    if args.loglevel:
        logging.root.setLevel(logging.getLevelName(args.loglevel.upper()))

    import mysqlbinlog2gpubsub
    mysqlbinlog2gpubsub.start_publishing()
项目:as_mais_lidas    作者:nandopedrosa    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:JmilkFan-s-Blog    作者:JmilkFan    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:blitznet    作者:dvornikita    | 项目源码 | 文件源码
def extract_batch(dataset, config):
    with tf.device("/cpu:0"):
        bboxer = PriorBoxGrid(config)
        data_provider = slim.dataset_data_provider.DatasetDataProvider(
            dataset, num_readers=2,
            common_queue_capacity=512, common_queue_min=32)
        if args.segment:
            im, bbox, gt, seg = data_provider.get(['image', 'object/bbox', 'object/label',
                                                   'image/segmentation'])
        else:
            im, bbox, gt = data_provider.get(['image', 'object/bbox', 'object/label'])
            seg = tf.expand_dims(tf.zeros(tf.shape(im)[:2]), 2)
        im = tf.to_float(im)/255
        bbox = yxyx_to_xywh(tf.clip_by_value(bbox, 0.0, 1.0))

        im, bbox, gt, seg = data_augmentation(im, bbox, gt, seg, config)
        inds, cats, refine = bboxer.encode_gt_tf(bbox, gt)

        return tf.train.shuffle_batch([im, inds, refine, cats, seg],
                                      args.batch_size, 2048, 64, num_threads=4)
项目:league    作者:massgo    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:flask_tutorial    作者:python-sorocaba    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
项目:baka    作者:baka-framework    | 项目源码 | 文件源码
def __init__(self, pathname, **settings):
        """initial config for singleton baka framework

        :param import_name: the name of the application package
        :param settings: *optional dict settings for pyramid configuration
        """
        self.import_name = pathname
        self.settings = settings
        self.__include = {}
        self.__trafaret = trafaret_yaml

        # Only set up a default log handler if the
        # end-user application didn't set anything up.
        if not (logging.root.handlers and log.level == logging.NOTSET and settings.get('LOGGING')):
            formatter = logging.Formatter(logging_format)
            handler = logging.StreamHandler()
            handler.setFormatter(formatter)
            log.addHandler(handler)
            log.setLevel(logging.INFO)
项目:baka    作者:baka-framework    | 项目源码 | 文件源码
def resource(self, path, **kwargs):
        def decorator(wrapped, depth=1):
            route_name = kwargs.pop("route_name", None)
            route_name = route_name or wrapped.__name__
            route_name = kwargs.pop("name", route_name)
            wrapped.route_name = route_name

            def callback(scanner, name, cls):
                config = scanner.config.with_package(info.module)
                config.add_route(route_name, path, factory=cls)
                config.add_view(default_options_view, route_name=route_name,
                                request_method='OPTIONS', permission=NO_PERMISSION_REQUIRED)
                config.add_view(unsupported_method_view, route_name=route_name, renderer='json')

            for method in METHODS:
                setattr(wrapped, method, type('ViewDecorator%s' % method,
                                              (ViewDecorator, object),
                                              {'request_method': method,
                                               'state': wrapped,
                                               'kwargs': kwargs}))
            info = venusian.attach(wrapped, callback, 'pyramid', depth=depth)
            return wrapped

        return decorator
项目:baka    作者:baka-framework    | 项目源码 | 文件源码
def run(self, host=None, port=None, **options):
        """ application runner server for development stage. not for production.

        :param host: url host application server
        :param port: number of port
        :param options: dict options for werkzeug wsgi server
        """
        settings = self.config.get_settings()
        _host = '127.0.0.1'
        _port = 5000

        host = host or _host
        port = int(port or _port)
        options.setdefault('use_reloader', settings.get('debug_all'))
        options.setdefault('use_debugger', settings.get('debug_all'))

        from werkzeug.serving import run_simple
        run_simple(host, port, self.config.make_wsgi_app(), **options)
项目:SlackJira    作者:Rastii    | 项目源码 | 文件源码
def _logging_config(config_parser, disable_existing_loggers=False):
    """
    Helper that allows us to use an existing ConfigParser object to load logging
    configurations instead of a filename.

    Note: this code is essentially copy pasta from `logging.config.fileConfig` except
    we skip loading the file.
    """
    formatters = logging.config._create_formatters(config_parser)

    # critical section
    logging._acquireLock()
    try:
        logging._handlers.clear()
        del logging._handlerList[:]
        # Handlers add themselves to logging._handlers
        handlers = logging.config._install_handlers(config_parser, formatters)
        logging.config._install_loggers(config_parser, handlers, disable_existing_loggers)
    finally:
        logging._releaseLock()
项目:SlackJira    作者:Rastii    | 项目源码 | 文件源码
def main(args=None):
    args = get_parser_args(args)
    _logging_config(args.config)
    logger = logging.getLogger(__name__)

    logger.info("Loading configurations")
    slackbot_config = resources.SlackBotConfig.from_config(args.config)

    # Since we can't inject the settings into the bot, let's load all the settings
    # into the module
    slackbot_config.load_into_settings_module(slackbot.settings)
    # Load the config into the settings...
    # TODO: PR to be able to inject settings instead of auto magically loading them from a module
    slackbot.settings.SLACK_JIRA_CONF = args.config

    logger.info("Starting slackbot")
    bot = slackbot.bot.Bot()
    bot.run()
项目:core-python    作者:yidao620c    | 项目源码 | 文件源码
def my_log():
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        handlers=[logging.FileHandler('message.log', 'a', 'utf-8')])
    # ?????_?????__
    _log = logging.getLogger('app.' + __name__)
    host = '10.0.0.175'
    port = 8080
    # ??? 'xxxx' % (aa, bb)????????
    _log.error('error to connect to %s:%d', host, port)
    _log.addFilter(FilterFunc('foo'))  # ?????foo()???????
    lgg = logging.getLogger('app.network.client')
    lgg.propagate = False  # ??????
    lgg.error('do you see me?')  # ????????
    lgg.setLevel(logging.CRITICAL)
    lgg.error('now you see me?')
    logging.disable(logging.DEBUG)  # ????????
    # ??log??????main?????????
    config.fileConfig('applogcfg.ini')
项目:sonic-py-swsssdk    作者:Azure    | 项目源码 | 文件源码
def setup_logging(config_file_path, log_level=logging.INFO):
    """
    Logging configuration helper.

    :param config_file_path: file path to logging configuration file.
    https://docs.python.org/3/library/logging.config.html#object-connections
    :param log_level: defaults to logging.INFO
    :return: None - access the logger by name as described in the config--or the "root" logger as a backup.
    """
    try:
        with open(config_file_path, 'rt') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    except (ValueError, IOError, OSError):
        # json.JSONDecodeError is throwable in Python3.5+ -- subclass of ValueError
        logging.basicConfig(log_level=log_level)
        logging.root.exception(
            "Could not load specified logging configuration '{}'. Verify the filepath exists and is compliant with: "
            "[https://docs.python.org/3/library/logging.config.html#object-connections]".format(config_file_path))
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def _load_config():
    # Fills the global CONFIG dictionary using default and custom config
    # Returns an error if the custom config is invalid
    global CONFIG
    try:
        cfg = _load_default_config()
        custom_cfg = _load_custom_config()
        if custom_cfg:
            CONFIG = _merge(cfg, custom_cfg)
        else:
            CONFIG = cfg
    except yaml.YAMLError as exc:
        # Try to point to the line that threw an error
        if hasattr(exc, 'problem_mark'):
            mark = exc.problem_mark
            return 'Error in YAML at position: ({}:{})'.format(mark.line + 1,
                                                               mark.column + 1)
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def get(entity, param):
    """
    Returns the configuration value belonging to a specified entity
    (e.g. neo4j) and parameter (e.g. host).

    :param entity: The configuration entity
    :param param: The configuration parameter
    :return: The configuration value
    :raises ValueError if a requested parameter is not configured
    """
    try:
        value = _get_config()[entity][param]
        LOGGER.debug('Found config: {}:{} => {}'.format(entity, param, value))
        return _get_config()[entity][param]
    except KeyError:
        # Should _never_ happen in production!
        msg = 'Parameter {} is not present for entity {}!'.format(param,
                                                                  entity)
        LOGGER.critical(msg)
        raise ValueError(msg)
项目:python-aws-ecr-deployer    作者:filc    | 项目源码 | 文件源码
def _setup_requests(app):

    def _init_request():
        session = request.environ['beaker.session']
        session.save()

        _setup_connector(
            app=current_app,
            app_config=current_app.config,
            session=session
        )

    @app.before_request
    def before_request():
        init_request = _init_request()
        return init_request
项目:webspider    作者:GuozhuHe    | 项目源码 | 文件源码
def make_web_app():
    logging.config.dictConfig(config.LOGGING_CONFIG)

    settings = {
        'debug': constants.DEBUG,
        'template_path': os.path.join(
            os.path.dirname(__file__), "web", "templates"
        ),
        'static_path': os.path.join(
            os.path.dirname(__file__), 'web', 'static'
        ),
        'default_handler_class ': BaseHandler
    }

    app = tornado.web.Application(url_handlers, **settings)
    return app
项目:Simpleblog    作者:Blackyukun    | 项目源码 | 文件源码
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()