The following 36 code examples, extracted from open-source Python projects, illustrate how to use logbook.Logger().
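For orientation, here is the basic pattern the examples below share, as a minimal sketch (the handler choice and the 'demo' channel name are illustrative, not taken from any of the projects):

import sys
import logbook

# A record is only emitted if a handler is active; push one for the whole app.
logbook.StreamHandler(sys.stdout).push_application()

log = logbook.Logger('demo')  # 'demo' is an arbitrary channel name
log.info('Hello from {}', 'logbook')  # logbook uses str.format-style placeholders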
def analyze(context=None, results=None):
    import matplotlib.pyplot as plt
    import logbook
    logbook.StderrHandler().push_application()
    log = logbook.Logger('Algorithm')

    fig = plt.figure()
    ax1 = fig.add_subplot(211)
    results.algorithm_period_return.plot(ax=ax1, color='blue', legend=u'????')
    ax1.set_ylabel(u'??')
    results.benchmark_period_return.plot(ax=ax1, color='red', legend=u'????')
    plt.show()

# capital_base is the base value of capital
def _get_logger_for_contextmanager(log):
    """Get the canonical logger from a context manager.

    Parameters
    ----------
    log : Logger or None
        The explicit logger passed to the context manager.

    Returns
    -------
    log : Logger
        The logger to use in the context manager.
    """
    if log is not None:
        return log
    # We need to walk up through the context manager, then through
    # @contextmanager and finally into the top level calling frame.
    return _logger_for_frame(_getframe(3))
def __init__(self, queues, name=None, rv_ttl=500, connection=None):  # noqa
    if connection is None:
        connection = resolve_connection()
    self.connection = connection
    if isinstance(queues, Queue):
        queues = [queues]
    self._name = name
    self.queues = queues
    self.validate_queues()
    self.rv_ttl = rv_ttl

    self._state = 'starting'
    self._is_horse = False
    self._horse_pid = 0
    self._stopped = False
    self.log = Logger('worker')
    self.failed_queue = get_failed_queue(connection=self.connection)
def __init__(self, display_filter=None, only_summaries=False, eventloop=None,
             decryption_key=None, encryption_type='wpa-pwd', output_file=None,
             decode_as=None, tshark_path=None):
    self._packets = []
    self.current_packet = 0
    self.display_filter = display_filter
    self.only_summaries = only_summaries
    self.output_file = output_file
    self.running_processes = set()
    self.loaded = False
    self.decode_as = decode_as
    self.log = logbook.Logger(self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL)
    self.tshark_path = tshark_path
    self.debug = False

    self.eventloop = eventloop
    if self.eventloop is None:
        self.setup_eventloop()

    if encryption_type and encryption_type.lower() in self.SUPPORTED_ENCRYPTION_STANDARDS:
        self.encryption = (decryption_key, encryption_type.lower())
    else:
        raise UnknownEncyptionStandardException(
            "Only the following standards are supported: %s."
            % ', '.join(self.SUPPORTED_ENCRYPTION_STANDARDS))
def get_logger(name=None, level=None):
    """
    Return a :class:`logbook.Logger`.

    Arguments:
        - `name`: The name of a specific sub-logger.
        - `level`: A :mod:`logbook` logging level.
    """
    if _logger_convert_level_handler is None:
        _logging_error()
    # At this point, logbook is either defined or an error has been returned
    if level is None:
        level = logbook.NOTSET
    base_name = 'saltyrtc'
    name = base_name if name is None else '.'.join((base_name, name))

    # Create new logger and add to group
    logger = logbook.Logger(name=name, level=level)
    logger_group.add_logger(logger)
    return logger
def register_augments(client: processor.QClient, augments_cfg: list, logger: Logger):
    for augment in augments_cfg:
        if not augment.is_valid():
            logger.warn("Augment '{0}' failed to pass validation"
                        .format(augment))
            continue

        inst = augment.instance
        print(inst)
        processor.register_augment(client, augment.key, inst.activate, logger)
def instrumentation(client: processor.QClient,
                    logger: Logger,
                    interval: int,
                    delta: int,
                    events_count: int,
                    pending_events: int):
    send_heartbeat(client.event, logger, int(interval * 1.5))
    send_timedelta(client.event, logger, delta, interval)
    send_metrics_count(client.event, logger, events_count)
    # Pass the pending count, not the total events count, to the pending metric.
    send_pending_events_count(client.event, logger, pending_events)
async def main_loop(cfg: Config,  # async: the body awaits step(), flush() and sleep()
                    logger: Logger,
                    transport_cls: Generic[T],
                    continue_fn: callable,
                    loop: BaseEventLoop):
    riemann = cfg.riemann
    transport = transport_cls(riemann.host, riemann.port)
    client = processor.QClient(transport)
    agents = create_agents(cfg.agents)
    register_augments(client, cfg.augments, logger)
    executor = cfg.executor_class(max_workers=cfg.executors_count)
    loop.set_default_executor(executor)
    init(agents)
    while True:
        ts = time()
        (done, pending) = await step(client,
                                     agents,
                                     timeout=cfg.interval * 1.5,
                                     loop=loop)
        te = time()
        td = te - ts
        instrumentation(client,
                        logger,
                        cfg.interval,
                        td,
                        len(client.queue.events),
                        len(pending))

        await processor.flush(client, transport, logger)
        if continue_fn():
            await asyncio.sleep(cfg.interval - int(td), loop=loop)
        else:
            logger.info("Stopping Oshino")
            break

    client.on_stop()
def start_loop(cfg: Config, noop=False):
    handlers = []
    handlers.append(StreamHandler(sys.stdout, level=cfg.log_level))
    logger = Logger("Heart")
    logger.info("Initializing Oshino v{0}".format(get_version()))
    logger.info("Running forever in {0} seconds interval. Press Ctrl+C to exit"
                .format(cfg.interval))
    if cfg.sentry_dsn:
        try:
            client = SentryClient(cfg.sentry_dsn)
            handlers.append(SentryHandler(client,
                                          level=logbook.ERROR,
                                          bubble=True))
        except InvalidDsn:
            logger.warn("Invalid Sentry DSN '{0}' provided. Skipping"
                        .format(cfg.sentry_dsn))

    setup = NestedSetup(handlers)
    setup.push_application()

    loop = create_loop()
    try:
        loop.run_until_complete(main_loop(cfg,
                                          logger,
                                          cfg.riemann.transport(noop),
                                          forever,
                                          loop=loop))
    finally:
        loop.close()
def get_logger(self):
    return Logger(self.__class__.__name__)
@contextmanager  # the body yields, so it is used as a with-statement context manager
def log_duration(operation, level='info', log=None):
    """Log the duration of some process.

    Parameters
    ----------
    operation : str
        What is being timed?
    level : str, optional
        The level to log the start and end messages at.
    log : Logger, optional
        The logger object to write to. By default this is the logger
        for the calling frame.
    """
    log = _get_logger_for_contextmanager(log)

    log.log(level.upper(), operation)
    start = datetime.now()
    try:
        yield
    finally:
        now = datetime.now()
        log.log(
            level.upper(),
            'completed {} (completed in {})',
            operation,
            naturaldelta(now - start),
        )
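A usage sketch for log_duration, assuming the definitions above are in scope (the 'loading bars' label and the sleep are illustrative): the operation name is logged on entry, and a humanized duration on exit.

import time
from logbook import Logger, StderrHandler

StderrHandler().push_application()

# Pass an explicit logger so the frame-walking fallback is not exercised.
with log_duration('loading bars', level='info', log=Logger('demo')):
    time.sleep(0.25)  # stand-in for the timed work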
def _logger_for_frame(f):
    """Return the memoized logger object for the given stackframe.

    Parameters
    ----------
    f : frame
        The frame to get the logger for.

    Returns
    -------
    logger : Logger
        The memoized logger object.
    """
    return _mem_logger(f.f_globals['__name__'])
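_mem_logger itself is not part of this extract. A plausible implementation (an assumption, not the original code) memoizes one Logger per module name, so repeated frame lookups reuse the same object:

from functools import lru_cache
from logbook import Logger

@lru_cache(maxsize=None)
def _mem_logger(name):
    # Hypothetical helper: one cached Logger per channel name.
    return Logger(name)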
def init_class_fixtures(cls):
    super(WithLogger, cls).init_class_fixtures()
    cls.log = Logger()
    cls.log_handler = cls.enter_class_context(
        cls.make_log_handler().applicationbound(),
    )
def logbook_logger():
    return logbook.Logger(__name__)
def __init__(self, request):
    self.request = request
    self.build_cache_id = static_cache.build_cache_id

    layout_render = pyramid.renderers.get_renderer('nflpool:templates/shared/_layout.pt')
    impl = layout_render.implementation()
    self.layout = impl.macros['layout']

    log_name = 'Ctrls/' + type(self).__name__.replace("Controller", "")
    self.log = logbook.Logger(log_name)
def get_startup_log():
    return logbook.Logger("App")
def main_work_horse(self, job):
    """This is the entry point of the newly spawned work horse."""
    # After fork()'ing, always ensure we are generating random sequences
    # that are different from the worker.
    random.seed()

    self._is_horse = True
    self.log = Logger('horse')

    success = self.perform_job(job)

    # os._exit() is the way to exit from children after a fork(), in
    # contrast to the regular sys.exit()
    os._exit(int(not success))
def get_logger(name, debug=True):
    logbook.set_datetime_format('local')
    handler = StreamHandler(sys.stdout) if debug else NullHandler()
    handler.push_application()
    return Logger(os.path.basename(name))
def get_logger(name='monkey??????????', file_log=file_stream, level=''):
    """Logger factory function."""
    logbook.set_datetime_format('local')
    ColorizedStderrHandler(bubble=False, level=level).push_thread()
    logbook.TimedRotatingFileHandler(
        os.path.join(LOG_DIR, '%s.log' % name),
        date_format='%Y-%m-%d-%H',
        bubble=True,
        encoding='utf-8').push_thread()
    return logbook.Logger(name)
def rpc_server(socket, protocol, dispatcher):
    log = Logger('rpc_server')
    log.debug('starting up...')
    while True:
        try:
            message = socket.recv_multipart()
        except Exception as e:
            log.warning('Failed to receive message from client, ignoring...')
            log.exception(e)
            continue

        # logbook formats with str.format-style placeholders, not %-style
        log.debug('Received message {0} from {1!r}', message[-1], message[0])

        # assuming protocol is threadsafe and dispatcher is threadsafe, as long
        # as it is immutable
        def handle_client(message):
            try:
                request = protocol.parse_request(message[-1])
            except RPCError as e:
                log.exception(e)
                response = e.error_respond()
            else:
                response = dispatcher.dispatch(request)
                log.debug('Response okay: {0!r}', response)

            # send reply
            message[-1] = response.serialize()
            log.debug('Replying {0} to {1!r}', message[-1], message[0])
            socket.send_multipart(message)

        gevent.spawn(handle_client, message)
def get_logger(name='jiekou', file_log=file_stream, level=''):
    """Logger factory function."""
    logbook.set_datetime_format('local')
    ColorizedStderrHandler(bubble=False, level=level).push_thread()
    logbook.TimedRotatingFileHandler(
        os.path.join(LOG_DIR, '%s.log' % name),
        date_format='%Y-%m-%d-%H',
        bubble=True,
        encoding='utf-8').push_thread()
    return logbook.Logger(name)
def make_logger(channel_name, log_level):
    import appconfigpy

    logger = logbook.Logger(channel_name)
    if log_level == QUIET_LOG_LEVEL:
        logger.disable()
    logger.level = log_level

    ptr.set_log_level(log_level)
    simplesqlite.set_log_level(log_level)
    appconfigpy.set_log_level(log_level)

    return logger
def __init__(self, request):
    self.request = request
    self.build_cache_id = static_cache.build_cache_id

    log_name = 'Ctrls/' + type(self).__name__.replace("Controller", "")
    self.log = logbook.Logger(log_name)
def __init__(self, *args, **kwargs):
    super().__init__(command_prefix=when_mentioned_or(setup_file["discord"]["command_prefix"]),
                     description="A bot for weebs programmed by Recchan")

    # Set a custom user agent for Pixie
    self.http.user_agent = user_agent

    # Logging setup
    redirect_logging()
    StreamHandler(sys.stderr).push_application()
    self.logger = Logger("Pixie")
    self.logger.level = getattr(logbook, setup_file.get("log_level", "INFO"), logbook.INFO)
    logging.root.setLevel(self.logger.level)
def get_logger(logger_name="magine-services"):
    return logbook.Logger(logger_name)
def test_init(self, outputs):
    with patch.dict('sys.modules', **{'yara': MagicMock()}):
        from fibratus.binding.yar import YaraBinding
        with patch('os.path.exists', return_value=True), \
                patch('os.path.isdir', return_value=True), \
                patch('glob.glob', return_value=['silent_banker.yar']), \
                patch('yara.compile') as yara_compile_mock:
            YaraBinding(outputs, Mock(spec_set=Logger),
                        output='amqp', path='C:\\yara-rules')
            yara_compile_mock.assert_called_with(
                os.path.join('C:\\yara-rules', 'silent_banker.yar'))
def test_init_invalid_path(self, outputs):
    with patch.dict('sys.modules', **{'yara': None}):
        from fibratus.binding.yar import YaraBinding
        with patch('os.path.exists', return_value=False), \
                patch('os.path.isdir', return_value=False):
            with pytest.raises(BindingError) as e:
                YaraBinding(outputs, Mock(spec_set=Logger),
                            output='amqp', path='C:\\yara-rules-invalid')
            assert 'C:\\yara-rules-invalid rules path does not exist' in str(e.value)
def test_init_yara_python_not_installed(self, outputs):
    with patch.dict('sys.modules', **{'yara': None}):
        from fibratus.binding.yar import YaraBinding
        with pytest.raises(BindingError) as e:
            YaraBinding(outputs, Mock(spec_set=Logger),
                        output='amqp', path='C:\\yara-rules')
        assert 'yara-python package is not installed' in str(e.value)
def test_run(self, outputs):
    with patch.dict('sys.modules', **{'yara': MagicMock()}):
        from fibratus.binding.yar import YaraBinding
        with patch('os.path.exists', return_value=True), \
                patch('os.path.isdir', return_value=True), \
                patch('glob.glob', return_value=['silent_banker.yar']), \
                patch('yara.compile'):
            yara_binding = YaraBinding(outputs, Mock(spec_set=Logger),
                                       output='amqp', path='C:\\yara-rules')
            yara_binding.run(thread_info=Mock(spec_set=ThreadInfo),
                             kevent=Mock(spec_set=KEvent))
            assert yara_binding._rules.match.called
def logger():
    return Logger('testlogger')
def run(self):
    log = Logger("GAF Bot")
    log.handlers.append(StreamHandler(sys.stdout, bubble=True))
    log.handlers.append(FileHandler("bot/logs/last-run.log", bubble=True, mode="w"))
    self.logger = log
    self.logger.notice("Logging started")
    self.logger.notice("Bot process started")

    with open("bot/config/defaults/default.guildconfig.json") as f:
        self.default_guild_config = json.load(f)
        self.logger.debug("Loaded default guild config")

    self.logger.debug("Connecting to DB")
    self.db_conn = sqlite3.connect("bot/config/guild_configs.db")
    self.logger.notice("DB Connection Established")
    self.db_cursor = self.db_conn.cursor()
    self.db_cursor.execute(
        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='serverSettings'")
    exists = self.db_cursor.fetchone()
    if not exists[0]:
        self.logger.error("No table found in DB! Creating new one now")
        self.db_cursor.execute(
            '''CREATE TABLE serverSettings (id bigint, settings long)''')
        self.logger.debug("Table created")

    self.load_extension("bot.modules.core")
    self.logger.notice("Loaded core module")
    self.logger.notice("Loading other modules")

    # This bar and the time.sleep() stuff is entirely useless. Like,
    # completely. Don't do this. It just looks cool, and that makes me
    # happy, but really this is terrible and a complete waste of time.
    time.sleep(0.5)
    for cog in tqdm.tqdm(self.config["modules"].keys(),
                         desc="Loading modules"):
        self.load_extension(f"bot.modules.{cog.lower()}")
        time.sleep(0.2)
    time.sleep(0.5)

    self.logger.debug("Completed loading modules")
    self.logger.notice("Logging into Discord")
    super().run(self.config["token"], reconnect=True)
def main():
    options = parse_option()

    initialize_log_handler(options.log_level)
    logger = logbook.Logger("pingparsing cli")
    logger.level = options.log_level
    pingparsing.set_log_level(options.log_level)

    output = {}
    if is_use_stdin():
        from concurrent import futures

        pingparsing.set_log_level(options.log_level)
        max_workers = (multiprocessing.cpu_count() * 2
                       if options.max_workers is None
                       else options.max_workers)
        count, deadline = get_ping_param(options)
        logger.debug("max-workers={}, count={}, deadline={}".format(
            max_workers, count, deadline))

        try:
            with futures.ProcessPoolExecutor(max_workers) as executor:
                future_list = []
                for dest_or_file in options.destination_or_file:
                    logger.debug("start {}".format(dest_or_file))
                    future_list.append(executor.submit(
                        parse_ping, logger, dest_or_file, options.interface,
                        count, deadline))

                for future in futures.as_completed(future_list):
                    key, ping_data = future.result()
                    output[key] = ping_data
        finally:
            logger.debug("shutdown ProcessPoolExecutor")
            executor.shutdown()
    else:
        ping_result_text = sys.stdin.read()
        ping_parser = pingparsing.PingParsing()
        ping_parser.parse(ping_result_text)
        output = ping_parser.as_dict()

    if options.indent <= 0:
        print(json.dumps(output))
    else:
        print(json.dumps(output, indent=options.indent))

    return 0