Python logbook 模块,Logger() 实例源码

我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用logbook.Logger()

项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def analyze(context=None, results=None):
    """Plot algorithm and benchmark period returns on one shared axis.

    Parameters
    ----------
    context : object, optional
        Not used by this function.
    results : object, optional
        Object exposing ``algorithm_period_return`` and
        ``benchmark_period_return``; each must provide a ``plot`` method.
    """
    import matplotlib.pyplot as plt
    import logbook

    # Install a stderr handler for the application before creating the logger.
    stderr_handler = logbook.StderrHandler()
    stderr_handler.push_application()
    log = logbook.Logger('Algorithm')

    figure = plt.figure()
    returns_axis = figure.add_subplot(211)

    algorithm_returns = results.algorithm_period_return
    algorithm_returns.plot(ax=returns_axis, color='blue', legend=u'????')
    returns_axis.set_ylabel(u'??')

    benchmark_returns = results.benchmark_period_return
    benchmark_returns.plot(ax=returns_axis, color='red', legend=u'????')

    plt.show()

# capital_base is the base value of capital
#
项目:combine    作者:llllllllll    | 项目源码 | 文件源码
def _get_logger_for_contextmanager(log):
    """Get the canonical logger from a context manager.

    Parameters
    ----------
    log : Logger or None
        The explicit logger passed to the context manager.

    Returns
    -------
    log : Logger
        The logger to use in the context manager.
    """
    if log is not None:
        return log

    # We need to walk up through the context manager, then through
    # @contextmanager and finally into the top level calling frame.
    return _logger_for_frame(_getframe(3))
项目:DPQ    作者:DiggerPlus    | 项目源码 | 文件源码
def __init__(self, queues, name=None, rv_ttl=500, connection=None):  # noqa
    """Initialise the worker state.

    Parameters
    ----------
    queues : Queue or iterable of Queue
        A single ``Queue`` is wrapped into a one-element list.
    name : str, optional
        Stored as ``self._name``.
    rv_ttl : int, optional
        Stored as ``self.rv_ttl``.
    connection : object, optional
        Connection to use; when None, ``resolve_connection()`` supplies one.
    """
    self.connection = resolve_connection() if connection is None else connection

    self._name = name
    self.queues = [queues] if isinstance(queues, Queue) else queues
    # Validation runs only after ``self.queues`` has been assigned.
    self.validate_queues()

    self.rv_ttl = rv_ttl
    self._state = 'starting'
    self._is_horse = False
    self._horse_pid = 0
    self._stopped = False

    self.log = Logger('worker')
    self.failed_queue = get_failed_queue(connection=self.connection)
项目:GSM-scanner    作者:yosriayed    | 项目源码 | 文件源码
def __init__(self, display_filter=None, only_summaries=False, eventloop=None,
             decryption_key=None, encryption_type='wpa-pwd', output_file=None,
             decode_as=None, tshark_path=None):
    """Initialise the capture object.

    The keyword arguments are stored on the instance under the same names.
    If no event loop is supplied, ``self.setup_eventloop()`` is called.

    Raises
    ------
    UnknownEncyptionStandardException
        If ``encryption_type`` is empty or its lower-cased form is not in
        ``self.SUPPORTED_ENCRYPTION_STANDARDS``.
    """
    self._packets = []
    self.current_packet = 0
    self.display_filter = display_filter
    self.only_summaries = only_summaries
    self.output_file = output_file
    self.running_processes = set()
    self.loaded = False
    self.decode_as = decode_as
    self.log = logbook.Logger(self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL)
    self.tshark_path = tshark_path
    self.debug = False

    self.eventloop = eventloop
    if self.eventloop is None:
        self.setup_eventloop()

    # Guard clause: reject anything that is not a supported standard.
    if not encryption_type or encryption_type.lower() not in self.SUPPORTED_ENCRYPTION_STANDARDS:
        supported = ', '.join(self.SUPPORTED_ENCRYPTION_STANDARDS)
        raise UnknownEncyptionStandardException(
            "Only the following standards are supported: %s." % supported)

    self.encryption = (decryption_key, encryption_type.lower())
项目:saltyrtc-server-python    作者:saltyrtc    | 项目源码 | 文件源码
def get_logger(name=None, level=None):
    """
    Create a :class:`logbook.Logger` and add it to ``logger_group``.

    Arguments:
        - `name`: The name of a specific sub-logger. It is appended to the
          ``saltyrtc`` base name with a dot; omitted when `None`.
        - `level`: A :mod:`logbook` logging level. Defaults to
          ``logbook.NOTSET`` when `None`.
    """
    if _logger_convert_level_handler is None:
        # Presumably reports that logbook is unavailable; see _logging_error.
        _logging_error()

    effective_level = logbook.NOTSET if level is None else level

    name_parts = ['saltyrtc']
    if name is not None:
        name_parts.append(name)
    full_name = '.'.join(name_parts)

    # Every logger created here is tracked by the shared group.
    created = logbook.Logger(name=full_name, level=effective_level)
    logger_group.add_logger(created)
    return created
项目:oshino    作者:CodersOfTheNight    | 项目源码 | 文件源码
def register_augments(client: processor.QClient,
                      augments_cfg: list,
                      logger: Logger):
    """Register every valid augment from the configuration on ``client``.

    Parameters
    ----------
    client : processor.QClient
        Client the augments are registered on.
    augments_cfg : list
        Augment configuration objects; each must provide ``is_valid()``,
        ``instance`` and ``key``.
    logger : Logger
        Receives a warning for each augment that fails validation and is
        also passed on to ``processor.register_augment``.
    """
    for augment in augments_cfg:
        if not augment.is_valid():
            logger.warn("Augment '{0}' failed to pass validation"
                        .format(augment))
            continue

        inst = augment.instance

        # Report the instance through the logger instead of a stray print().
        logger.debug("Registering augment instance {0}".format(inst))
        processor.register_augment(client, augment.key, inst.activate, logger)
项目:oshino    作者:CodersOfTheNight    | 项目源码 | 文件源码
def instrumentation(client: processor.QClient,
                    logger: Logger,
                    interval: int,
                    delta: int,
                    events_count: int,
                    pending_events: int):
    """Emit the self-monitoring events for one loop iteration.

    Parameters
    ----------
    client : processor.QClient
        Client whose ``event`` callable is handed to each sender.
    logger : Logger
        Logger passed to each sender.
    interval : int
        Loop interval; the heartbeat is sent with ``int(interval * 1.5)``.
    delta : int
        Duration of the last iteration, reported together with ``interval``.
    events_count : int
        Number of queued events.
    pending_events : int
        Number of tasks still pending after the last step.
    """
    send_heartbeat(client.event, logger, int(interval * 1.5))
    send_timedelta(client.event, logger, delta, interval)
    send_metrics_count(client.event, logger, events_count)
    # The pending-events metric must report ``pending_events``; previously
    # ``events_count`` was sent here and the parameter went unused.
    send_pending_events_count(client.event, logger, pending_events)
项目:oshino    作者:CodersOfTheNight    | 项目源码 | 文件源码
async def main_loop(cfg: Config,
                    logger: Logger,
                    transport_cls: Generic[T],
                    continue_fn: callable,
                    loop: BaseEventLoop):
    """Run the collect/flush cycle until ``continue_fn`` returns false.

    This is a coroutine: the body awaits ``step``, ``processor.flush`` and
    ``asyncio.sleep``, so it has to be declared with ``async def``.

    Parameters
    ----------
    cfg : Config
        Supplies riemann host/port, agents, augments, executor settings and
        the loop interval.
    logger : Logger
        Logger used for instrumentation, flushing and the stop message.
    transport_cls : callable
        Called with the riemann host and port to build the transport.
    continue_fn : callable
        Checked after every iteration; a falsy result stops the loop.
    loop : BaseEventLoop
        Event loop that receives the executor and is passed to ``step``.
    """
    riemann = cfg.riemann
    transport = transport_cls(riemann.host, riemann.port)
    client = processor.QClient(transport)
    agents = create_agents(cfg.agents)
    register_augments(client, cfg.augments, logger)
    executor = cfg.executor_class(max_workers=cfg.executors_count)
    loop.set_default_executor(executor)

    init(agents)

    while True:
        ts = time()
        (done, pending) = await step(client,
                                     agents,
                                     timeout=cfg.interval * 1.5,
                                     loop=loop)

        te = time()
        td = te - ts
        instrumentation(client,
                        logger,
                        cfg.interval,
                        td,
                        len(client.queue.events),
                        len(pending))

        await processor.flush(client, transport, logger)
        if continue_fn():
            # asyncio.sleep no longer accepts ``loop`` (removed in 3.10);
            # inside a coroutine it uses the running loop.
            await asyncio.sleep(cfg.interval - int(td))
        else:
            logger.info("Stopping Oshino")
            break

    client.on_stop()
项目:oshino    作者:CodersOfTheNight    | 项目源码 | 文件源码
def start_loop(cfg: Config, noop=False):
    """Configure log handlers, then run ``main_loop`` to completion.

    Parameters
    ----------
    cfg : Config
        Supplies the log level, interval, optional Sentry DSN and the
        riemann transport factory.
    noop : bool, optional
        Forwarded to ``cfg.riemann.transport``.
    """
    handlers = [StreamHandler(sys.stdout, level=cfg.log_level)]

    logger = Logger("Heart")
    logger.info("Initializing Oshino v{0}".format(get_version()))
    logger.info("Running forever in {0} seconds interval. Press Ctrl+C to exit"
                .format(cfg.interval))

    if cfg.sentry_dsn:
        try:
            sentry = SentryClient(cfg.sentry_dsn)
            sentry_handler = SentryHandler(sentry,
                                           level=logbook.ERROR,
                                           bubble=True)
            handlers.append(sentry_handler)
        except InvalidDsn:
            # A bad DSN only disables Sentry reporting; start-up continues.
            logger.warn("Invalid Sentry DSN '{0}' providen. Skipping"
                        .format(cfg.sentry_dsn))

    NestedSetup(handlers).push_application()

    loop = create_loop()
    try:
        coroutine = main_loop(cfg,
                              logger,
                              cfg.riemann.transport(noop),
                              forever,
                              loop=loop)
        loop.run_until_complete(coroutine)
    finally:
        loop.close()
项目:oshino    作者:CodersOfTheNight    | 项目源码 | 文件源码
def get_logger(self):
    """Return a new ``Logger`` named after this instance's class."""
    class_name = self.__class__.__name__
    return Logger(class_name)
项目:combine    作者:llllllllll    | 项目源码 | 文件源码
def log_duration(operation, level='info', log=None):
    """Log when an operation starts and how long it took once it ends.

    This is a generator with a single ``yield``; the completion message is
    emitted from a ``finally`` clause, so it is logged even if the wrapped
    code raises.

    Parameters
    ----------
    operation : str
        Description of what is being timed.
    level : str, optional
        Level name for both messages; it is upper-cased before use.
    log : Logger, optional
        The logger to write to. By default this is the logger for the
        calling frame.
    """
    # Keep this call directly in this function: the helper resolves the
    # default logger by stack depth.
    log = _get_logger_for_contextmanager(log)
    level_name = level.upper()

    log.log(level_name, operation)
    started_at = datetime.now()
    try:
        yield
    finally:
        elapsed = datetime.now() - started_at
        log.log(
            level_name,
            'completed {} (completed in {})',
            operation,
            naturaldelta(elapsed),
        )
项目:combine    作者:llllllllll    | 项目源码 | 文件源码
def _logger_for_frame(f):
    """Look up the logger belonging to the module a stack frame runs in.

    Parameters
    ----------
    f : frame
        Stack frame whose globals provide the module ``__name__``.

    Returns
    -------
    logger : Logger
        Whatever ``_mem_logger`` returns for that module name.
    """
    module_name = f.f_globals['__name__']
    return _mem_logger(module_name)
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def init_class_fixtures(cls):
    """Attach a logger and an application-bound log handler to the class.

    Calls the parent ``init_class_fixtures`` first, then stores a fresh
    ``Logger`` on ``cls.log`` and the entered handler context on
    ``cls.log_handler``.
    """
    super(WithLogger, cls).init_class_fixtures()
    cls.log = Logger()

    bound_handler = cls.make_log_handler().applicationbound()
    cls.log_handler = cls.enter_class_context(bound_handler)
项目:apm-agent-python    作者:elastic    | 项目源码 | 文件源码
def logbook_logger():
    """Return a ``logbook.Logger`` named after the current module."""
    module_logger = logbook.Logger(__name__)
    return module_logger
项目:nflpool    作者:prcutler    | 项目源码 | 文件源码
def __init__(self, request):
    """Store the request, the shared layout macro and a per-class logger.

    The logger channel is ``Ctrls/`` followed by the class name with every
    occurrence of ``Controller`` removed.
    """
    self.request = request
    self.build_cache_id = static_cache.build_cache_id

    renderer = pyramid.renderers.get_renderer('nflpool:templates/shared/_layout.pt')
    template = renderer.implementation()
    self.layout = template.macros['layout']

    short_name = type(self).__name__.replace("Controller", "")
    self.log = logbook.Logger('Ctrls/' + short_name)
项目:nflpool    作者:prcutler    | 项目源码 | 文件源码
def get_startup_log():
    """Return a ``logbook.Logger`` for the ``App`` channel."""
    startup_log = logbook.Logger("App")
    return startup_log
项目:DPQ    作者:DiggerPlus    | 项目源码 | 文件源码
def main_work_horse(self, job):
    """This is the entry point of the newly spawned work horse.

    Performs ``job`` and terminates the process with exit status 0 when
    ``perform_job`` returns a truthy value, 1 otherwise. Never returns.
    """
    # After fork()'ing, re-seed so the random sequences generated here
    # differ from the worker's.
    random.seed()
    self._is_horse = True
    self.log = Logger('horse')

    exit_status = 0 if self.perform_job(job) else 1

    # os._exit() is the way to exit from children after a fork(), in
    # contrast to the regular sys.exit()
    os._exit(exit_status)
项目:OdooQuant    作者:haogefeifei    | 项目源码 | 文件源码
def get_logger(name, debug=True):
    """Push a handler onto the application stack and return a named logger.

    Parameters
    ----------
    name : str
        Path-like name; only its basename becomes the logger channel.
    debug : bool, optional
        When true, records go to stdout through a ``StreamHandler``;
        otherwise a ``NullHandler`` is installed.
    """
    logbook.set_datetime_format('local')

    if debug:
        handler = StreamHandler(sys.stdout)
    else:
        handler = NullHandler()
    handler.push_application()

    channel = os.path.basename(name)
    return Logger(channel)
项目:python_tk_adb    作者:liwanlei    | 项目源码 | 文件源码
def get_logger(name='monkey??????????', file_log=file_stream, level=''):
    """Logger factory: install stderr and rotating-file handlers.

    Parameters
    ----------
    name : str
        Logger channel; also the stem of the log file ``<name>.log`` created
        under ``LOG_DIR``.
    file_log : object
        Not used by this function.
    level : str or int
        Level for the colorized stderr handler. An empty value (the default)
        falls back to ``logbook.NOTSET``.

    Returns
    -------
    logbook.Logger
        A logger for ``name``. Both handlers are pushed onto the current
        thread's handler stack on every call.
    """
    logbook.set_datetime_format('local')

    # '' is not a level name logbook can look up; without this fallback the
    # default call fails while constructing the handler.
    handler_level = level or logbook.NOTSET

    ColorizedStderrHandler(bubble=False, level=handler_level).push_thread()
    logbook.TimedRotatingFileHandler(
        os.path.join(LOG_DIR, '%s.log' % name),
        date_format='%Y-%m-%d-%H', bubble=True, encoding='utf-8').push_thread()
    return logbook.Logger(name)
项目:ryu-lagopus-ext    作者:lagopus    | 项目源码 | 文件源码
def rpc_server(socket, protocol, dispatcher):
    """Receive multipart messages forever and answer each in a greenlet.

    Parameters
    ----------
    socket : object
        Must provide ``recv_multipart()`` and ``send_multipart()``.
    protocol : object
        Parses the last frame of a message via ``parse_request``.
    dispatcher : object
        Turns a parsed request into a response via ``dispatch``.
    """
    log = Logger('rpc_server')

    # assuming protocol is threadsafe and dispatcher is threadsafe, as long
    # as it is immutable
    def handle_client(frames):
        try:
            request = protocol.parse_request(frames[-1])
        except RPCError as err:
            log.exception(err)
            reply = err.error_respond()
        else:
            reply = dispatcher.dispatch(request)
            log.debug('Response okay: %r', reply)

        # send reply: the last frame is overwritten with the serialized
        # response, the remaining frames are sent back unchanged
        frames[-1] = reply.serialize()
        log.debug('Replying %s to %r', frames[-1], frames[0])
        socket.send_multipart(frames)

    log.debug('starting up...')
    while True:
        try:
            incoming = socket.recv_multipart()
        except Exception as err:
            log.warning('Failed to receive message from client, ignoring...')
            log.exception(err)
            continue

        log.debug('Received message %s from %r', incoming[-1], incoming[0])
        gevent.spawn(handle_client, incoming)
项目:jiekou-python3    作者:liwanlei    | 项目源码 | 文件源码
def get_logger(name='jiekou', file_log=file_stream, level=''):
    """Logger factory: install stderr and rotating-file handlers.

    Parameters
    ----------
    name : str
        Logger channel; also the stem of the log file ``<name>.log`` created
        under ``LOG_DIR``.
    file_log : object
        Not used by this function.
    level : str or int
        Level for the colorized stderr handler. An empty value (the default)
        falls back to ``logbook.NOTSET``.

    Returns
    -------
    logbook.Logger
        A logger for ``name``. Both handlers are pushed onto the current
        thread's handler stack on every call.
    """
    logbook.set_datetime_format('local')

    # '' is not a level name logbook can look up; without this fallback the
    # default call fails while constructing the handler.
    handler_level = level or logbook.NOTSET

    ColorizedStderrHandler(bubble=False, level=handler_level).push_thread()
    logbook.TimedRotatingFileHandler(
        os.path.join(LOG_DIR, '%s.log' % name),
        date_format='%Y-%m-%d-%H', bubble=True, encoding='utf-8').push_thread()
    return logbook.Logger(name)
项目:sqlitebiter    作者:thombashi    | 项目源码 | 文件源码
def make_logger(channel_name, log_level):
    """Create the channel logger and propagate the level to dependencies.

    Parameters
    ----------
    channel_name : str
        Channel name for the new ``logbook.Logger``.
    log_level : object
        Level assigned to the logger and passed to ``set_log_level`` of
        ``ptr``, ``simplesqlite`` and ``appconfigpy``. When it equals
        ``QUIET_LOG_LEVEL`` the logger is disabled first.
    """
    import appconfigpy

    channel_logger = logbook.Logger(channel_name)
    if log_level == QUIET_LOG_LEVEL:
        channel_logger.disable()
    channel_logger.level = log_level

    # Same call, same order as before: ptr, simplesqlite, appconfigpy.
    for module in (ptr, simplesqlite, appconfigpy):
        module.set_log_level(log_level)

    return channel_logger
项目:FXTest    作者:liwanlei    | 项目源码 | 文件源码
def get_logger(name='jiekou', file_log=file_stream, level=''):
    """Logger factory: install stderr and rotating-file handlers.

    Parameters
    ----------
    name : str
        Logger channel; also the stem of the log file ``<name>.log`` created
        under ``LOG_DIR``.
    file_log : object
        Not used by this function.
    level : str or int
        Level for the colorized stderr handler. An empty value (the default)
        falls back to ``logbook.NOTSET``.

    Returns
    -------
    logbook.Logger
        A logger for ``name``. Both handlers are pushed onto the current
        thread's handler stack on every call.
    """
    logbook.set_datetime_format('local')

    # '' is not a level name logbook can look up; without this fallback the
    # default call fails while constructing the handler.
    handler_level = level or logbook.NOTSET

    ColorizedStderrHandler(bubble=False, level=handler_level).push_thread()
    logbook.TimedRotatingFileHandler(
        os.path.join(LOG_DIR, '%s.log' % name),
        date_format='%Y-%m-%d-%H', bubble=True, encoding='utf-8').push_thread()
    return logbook.Logger(name)
项目:cookiecutter-pyramid-talk-python-starter    作者:mikeckennedy    | 项目源码 | 文件源码
def __init__(self, request):
    """Store the request, the cache-id builder and a per-class logger.

    The logger channel is ``Ctrls/`` followed by the class name with every
    occurrence of ``Controller`` removed.
    """
    self.request = request
    self.build_cache_id = static_cache.build_cache_id

    short_name = type(self).__name__.replace("Controller", "")
    self.log = logbook.Logger('Ctrls/' + short_name)
项目:cookiecutter-pyramid-talk-python-starter    作者:mikeckennedy    | 项目源码 | 文件源码
def get_startup_log():
    """Return a ``logbook.Logger`` for the ``App`` channel."""
    startup_log = logbook.Logger("App")
    return startup_log
项目:cookiecutter-course    作者:mikeckennedy    | 项目源码 | 文件源码
def __init__(self, request):
    """Store the request, the cache-id builder and a per-class logger.

    The logger channel is ``Ctrls/`` followed by the class name with every
    occurrence of ``Controller`` removed.
    """
    self.request = request
    self.build_cache_id = static_cache.build_cache_id

    short_name = type(self).__name__.replace("Controller", "")
    self.log = logbook.Logger('Ctrls/' + short_name)
项目:cookiecutter-course    作者:mikeckennedy    | 项目源码 | 文件源码
def get_startup_log():
    """Return a ``logbook.Logger`` for the ``App`` channel."""
    startup_log = logbook.Logger("App")
    return startup_log
项目:Pixie    作者:GetRektByMe    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
    """Initialise the bot and its logging.

    ``*args`` and ``**kwargs`` are accepted but not forwarded to the parent
    initialiser; the command prefix and description are fixed here.
    """
    prefix = when_mentioned_or(setup_file["discord"]["command_prefix"])
    super().__init__(command_prefix=prefix,
                     description="A bot for weebs programmed by Recchan")

    # Set a custom user agent for Pixie
    self.http.user_agent = user_agent

    # Logging setup: stdlib logging is redirected into logbook and records
    # are written to stderr.
    redirect_logging()
    StreamHandler(sys.stderr).push_application()
    self.logger = Logger("Pixie")

    # Unknown level names in the setup file fall back to INFO.
    level_name = setup_file.get("log_level", "INFO")
    self.logger.level = getattr(logbook, level_name, logbook.INFO)
    logging.root.setLevel(self.logger.level)
项目:exconf    作者:maginetv    | 项目源码 | 文件源码
def get_logger(logger_name="magine-services"):
    """Return a ``logbook.Logger`` for ``logger_name``."""
    named_logger = logbook.Logger(logger_name)
    return named_logger
项目:fibratus    作者:rabbitstack    | 项目源码 | 文件源码
def test_init(self, outputs):
    """Constructing the binding compiles the rule file found by glob."""
    # A stand-in ``yara`` module is installed before the binding is imported.
    with patch.dict('sys.modules', yara=MagicMock()):
        from fibratus.binding.yar import YaraBinding

        rules_dir = 'C:\\yara-rules'
        rule_file = 'silent_banker.yar'
        with patch('os.path.exists', return_value=True), \
                patch('os.path.isdir', return_value=True), \
                patch('glob.glob', return_value=[rule_file]), \
                patch('yara.compile') as yara_compile_mock:
            YaraBinding(outputs,
                        Mock(spec_set=Logger), output='amqp', path=rules_dir)
            expected_path = os.path.join(rules_dir, rule_file)
            yara_compile_mock.assert_called_with(expected_path)
项目:fibratus    作者:rabbitstack    | 项目源码 | 文件源码
def test_init_invalid_path(self, outputs):
    """Constructing the binding with a missing rules path raises BindingError."""
    with patch.dict('sys.modules', **{
        'yara': None,
    }):
        from fibratus.binding.yar import YaraBinding
        with patch('os.path.exists', return_value=False), \
                patch('os.path.isdir', return_value=False):
            with pytest.raises(BindingError) as e:
                YaraBinding(outputs,
                            Mock(spec_set=Logger), output='amqp', path='C:\\yara-rules-invalid')
            # The message check must follow the ``raises`` block: statements
            # after the raising call inside it never execute.
            # NOTE(review): ``yara`` is patched to None here as well, so
            # confirm the binding reports the path error and not the missing
            # package.
            assert 'C:\\yara-rules-invalid rules path does not exist' in str(e.value)
项目:fibratus    作者:rabbitstack    | 项目源码 | 文件源码
def test_init_yara_python_not_installed(self, outputs):
    """Constructing the binding without the yara package raises BindingError."""
    with patch.dict('sys.modules', **{
        'yara': None,
    }):
        from fibratus.binding.yar import YaraBinding
        with pytest.raises(BindingError) as e:
            YaraBinding(outputs,
                        Mock(spec_set=Logger), output='amqp', path='C:\\yara-rules')
        # The message check must follow the ``raises`` block: statements
        # after the raising call inside it never execute.
        assert 'yara-python package is not installed' in str(e.value)
项目:fibratus    作者:rabbitstack    | 项目源码 | 文件源码
def test_run(self, outputs):
    """Running the binding invokes ``match`` on the compiled rules."""
    # A stand-in ``yara`` module is installed before the binding is imported.
    with patch.dict('sys.modules', yara=MagicMock()):
        from fibratus.binding.yar import YaraBinding

        with patch('os.path.exists', return_value=True), \
                patch('os.path.isdir', return_value=True), \
                patch('glob.glob', return_value=['silent_banker.yar']), \
                patch('yara.compile'):
            binding = YaraBinding(outputs,
                                  Mock(spec_set=Logger), output='amqp', path='C:\\yara-rules')
            thread_info = Mock(spec_set=ThreadInfo)
            kevent = Mock(spec_set=KEvent)
            binding.run(thread_info=thread_info, kevent=kevent)
            assert binding._rules.match.called
项目:python-logbook-systemd    作者:vivienm    | 项目源码 | 文件源码
def logger():
    """Return a ``Logger`` for the ``testlogger`` channel."""
    test_logger = Logger('testlogger')
    return test_logger
项目:GAFBot    作者:DiNitride    | 项目源码 | 文件源码
def run(self):
        """Set up logging, config and storage, load modules, then start the bot.

        In order: attaches stdout and file handlers to a new logger, loads
        the default guild config from JSON, opens the SQLite database and
        creates the ``serverSettings`` table if it is missing, loads the core
        extension followed by every module listed in ``self.config["modules"]``,
        and finally delegates to the parent ``run`` with the configured token.
        """

        log = Logger("GAF Bot")
        # Handlers are attached to this logger directly; the log file is
        # opened with mode "w", so it is overwritten on every run.
        log.handlers.append(StreamHandler(sys.stdout, bubble=True))
        log.handlers.append(FileHandler("bot/logs/last-run.log", bubble=True, mode="w"))
        self.logger = log
        self.logger.notice("Logging started")
        self.logger.notice("Bot process started")

        with open("bot/config/defaults/default.guildconfig.json") as f:
            self.default_guild_config = json.load(f)
            self.logger.debug("Loaded default guild config")

        self.logger.debug("Connecting to DB")
        self.db_conn = sqlite3.connect("bot/config/guild_configs.db")
        self.logger.notice("DB Connection Established")
        self.db_cursor = self.db_conn.cursor()
        # count(*) is 0 when the serverSettings table does not exist yet.
        self.db_cursor.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='serverSettings'")
        exists = self.db_cursor.fetchone()
        if not exists[0]:
            self.logger.error("No table found in DB! Creating new one now")
            self.db_cursor.execute('''CREATE TABLE serverSettings (id bigint, settings long)''')
            self.logger.debug("Table created")

        self.load_extension("bot.modules.core")
        self.logger.notice("Loaded core module")

        self.logger.notice("Loading other modules")
        # This bar and the time.sleep() stuff is entirely useless
        # Like completely
        # Don't do this
        # It just looks cool and that makes me happy but really this is terrible
        # and a complete waste of time
        time.sleep(0.5)
        # Each key of config["modules"] is lower-cased to form the extension
        # path bot.modules.<name>.
        for cog in tqdm.tqdm(self.config["modules"].keys(),
                             desc="Loading modules"
                             ):
            self.load_extension(f"bot.modules.{cog.lower()}")
            time.sleep(0.2)
        time.sleep(0.5)
        self.logger.debug("Completed loading modules")

        self.logger.notice("Logging into Discord")
        super().run(self.config["token"], reconnect=True)
项目:pingparsing    作者:thombashi    | 项目源码 | 文件源码
def main():
    """Command line entry point: collect ping statistics and print them as JSON.

    When ``is_use_stdin()`` is true, every entry of
    ``options.destination_or_file`` is submitted to ``parse_ping`` in a
    process pool and the results are keyed by the value each call returns.
    Otherwise the ping output is read from standard input and parsed
    directly.

    NOTE(review): the branch taken when ``is_use_stdin()`` is true does not
    read stdin, which looks inverted relative to the helper's name; confirm
    against its definition.

    Returns
    -------
    int
        Always 0.
    """
    options = parse_option()

    initialize_log_handler(options.log_level)

    logger = logbook.Logger("pingparsing cli")
    logger.level = options.log_level
    pingparsing.set_log_level(options.log_level)

    output = {}
    if is_use_stdin():
        from concurrent import futures

        max_workers = (multiprocessing.cpu_count() * 2
                       if options.max_workers is None else options.max_workers)
        count, deadline = get_ping_param(options)
        logger.debug("max-workers={}, count={}, deadline={}".format(
            max_workers, count, deadline))

        try:
            with futures.ProcessPoolExecutor(max_workers) as executor:
                future_list = []
                for dest_or_file in options.destination_or_file:
                    logger.debug("start {}".format(dest_or_file))
                    future_list.append(executor.submit(
                        parse_ping, logger, dest_or_file, options.interface,
                        count, deadline))

                for future in futures.as_completed(future_list):
                    key, ping_data = future.result()
                    output[key] = ping_data
        finally:
            # The ``with`` block already shuts the pool down. Calling
            # executor.shutdown() here was redundant and raised NameError
            # when the pool constructor itself failed, hiding the real
            # error. Only the trace message is kept.
            logger.debug("shutdown ProcessPoolExecutor")
    else:
        ping_result_text = sys.stdin.read()
        ping_parser = pingparsing.PingParsing()
        ping_parser.parse(ping_result_text)
        output = ping_parser.as_dict()

    # A non-positive indent means compact single-line JSON.
    if options.indent <= 0:
        print(json.dumps(output))
    else:
        print(json.dumps(output, indent=options.indent))

    return 0