我们从 Python 开源项目中，提取了以下 13 个代码示例，用于说明如何使用 logbook.ERROR。
def filter_keyed_by_status(keys, values_with_status, context=None, level=logbook.ERROR):
    """Pair keys with `(bool, value)` results and keep only the successful ones.

    `keys` and `values_with_status` are walked in lockstep with `zip`, so any
    surplus items in the longer of the two are ignored.  For each pair whose
    status flag is falsy, a message is logged at `level` (including `context`,
    the key, and the second tuple element -- presumably a traceback or failure
    description) and the key is left out of the result.

    Returns a `collections.OrderedDict` mapping each successful key to its
    value, in input order.

    >>> dict(filter_keyed_by_status(['one', 'two'], [(True, 'foo'), (False, 'bar')]))
    {'one': 'foo'}
    """
    retained = collections.OrderedDict()
    for name, (succeeded, payload) in zip(keys, values_with_status):
        if succeeded:
            retained[name] = payload
            continue
        # Failed entries are reported but never abort the remaining items.
        logger.log(level, "[{!s}] failed to retrieve data for '{!s}':\n{!s}",
                   context, name, payload)
    return retained
def start_loop(cfg: Config, noop=False):
    """Configure logging handlers and run `main_loop` until it completes.

    A stdout `StreamHandler` at `cfg.log_level` is always installed.  When
    `cfg.sentry_dsn` is truthy, a `SentryHandler` at `logbook.ERROR` (with
    `bubble=True`) is added as well; an `InvalidDsn` error is logged and the
    Sentry handler is skipped.

    `noop` is forwarded to `cfg.riemann.transport`.  The event loop is closed
    when `main_loop` finishes or raises.
    """
    handlers = [StreamHandler(sys.stdout, level=cfg.log_level)]
    logger = Logger("Heart")

    # NOTE: these messages are emitted before the handlers are pushed below.
    startup_message = "Initializing Oshino v{0}".format(get_version())
    logger.info(startup_message)
    interval_message = ("Running forever in {0} seconds interval. Press Ctrl+C to exit"
                        .format(cfg.interval))
    logger.info(interval_message)

    if cfg.sentry_dsn:
        try:
            sentry_client = SentryClient(cfg.sentry_dsn)
            sentry_handler = SentryHandler(sentry_client,
                                           level=logbook.ERROR,
                                           bubble=True)
            handlers.append(sentry_handler)
        except InvalidDsn:
            logger.warn("Invalid Sentry DSN '{0}' providen. Skipping"
                        .format(cfg.sentry_dsn))

    log_setup = NestedSetup(handlers)
    log_setup.push_application()

    loop = create_loop()
    try:
        # The transport is created inside the ``try`` so that the loop is
        # still closed if building it fails.
        heartbeat = main_loop(cfg,
                              logger,
                              cfg.riemann.transport(noop),
                              forever,
                              loop=loop)
        loop.run_until_complete(heartbeat)
    finally:
        loop.close()
def get_color(self, record):
    """Return the color name to use for `record`, based on `record.level`.

    Levels at or above ``ERROR`` map to ``'red'``, levels at or above
    ``NOTICE`` map to ``'yellow'``, exactly ``DEBUG`` maps to ``'darkteal'``,
    and everything else maps to ``'lightgray'``.
    """
    severity = record.level
    # Checked in order, so the first threshold reached wins.
    thresholds = ((ERROR, 'red'), (NOTICE, 'yellow'))
    for minimum, color in thresholds:
        if severity >= minimum:
            return color
    return 'darkteal' if severity == DEBUG else 'lightgray'
def __get_logbook_logging_level(level_str):
    """Translate a level name into the matching ``logbook`` level constant.

    The name is upper-cased and stripped of surrounding whitespace before it
    is looked up, so ``' debug '`` and ``'DEBUG'`` are equivalent.

    Raises:
        ValueError: if the normalized name is not one of the known levels.
    """
    # logbook levels:
    #   CRITICAL = 15
    #   ERROR    = 14
    #   WARNING  = 13
    #   NOTICE   = 12
    #   INFO     = 11
    #   DEBUG    = 10
    #   TRACE    = 9
    #   NOTSET   = 0
    levels_by_name = {
        'CRITICAL': logbook.CRITICAL,
        'ERROR': logbook.ERROR,
        'WARNING': logbook.WARNING,
        'NOTICE': logbook.NOTICE,
        'INFO': logbook.INFO,
        'DEBUG': logbook.DEBUG,
        'TRACE': logbook.TRACE,
        'NOTSET': logbook.NOTSET,
    }

    normalized = level_str.upper().strip()
    if normalized not in levels_by_name:
        # The message reports the normalized name, not the raw argument.
        raise ValueError("Unknown logbook log level: {}".format(normalized))
    return levels_by_name[normalized]
def __get_logging_method(log_level):
    """Return the bound `logger` method that logs at `log_level`.

    Only DEBUG, INFO, WARNING, ERROR and CRITICAL are mapped.

    Raises:
        ValueError: if `log_level` is any other value.
    """
    dispatch = {
        logbook.DEBUG: logger.debug,
        logbook.INFO: logger.info,
        logbook.WARNING: logger.warning,
        logbook.ERROR: logger.error,
        logbook.CRITICAL: logger.critical,
    }

    if log_level in dispatch:
        return dispatch[log_level]

    raise ValueError("unknown log level: {}".format(log_level))
def filter_by_status(values_with_status, context=None, level=logbook.ERROR):
    """Keep the values of the `(bool, value)` pairs whose flag is truthy.

    Each pair with a falsy flag is logged at `level` (together with `context`
    and the pair's second element -- presumably a traceback or failure
    description) and omitted from the result.

    Returns a `list` of the surviving values in their original order.

    >>> filter_by_status([(True, 'foo'), (False, 'bar')])
    ['foo']
    """
    kept = []
    for succeeded, payload in values_with_status:
        if succeeded:
            kept.append(payload)
            continue
        # Failed entries are reported but never abort the remaining items.
        logger.log(level, "[{!s}] failed to retrieve data:\n{!s}", context, payload)
    return kept
def _get_device_details(self, devices_response):
    """Look up every device listed under ``'items'`` in `devices_response`.

    One `self._get_device_by_id` call is made per item (keyed on the item's
    ``'id'``); a missing ``'items'`` key is treated as an empty list.  The
    calls are gathered in a `defer.DeferredList` with ``consumeErrors=True``
    and post-processed by `stethoscope.api.utils.filter_by_status`, which is
    given this method's name as `context` and `logbook.ERROR` as `level`.

    Returns the `DeferredList`.
    """
    # Debugging aid, left disabled:
    # logger.debug("bitfit devices:\n{!s}", json.dumps(devices_response, indent=2))
    lookups = []
    for entry in devices_response.get('items', []):
        lookups.append(self._get_device_by_id(entry['id']))

    gathered = defer.DeferredList(lookups, consumeErrors=True)

    # Individual lookups shouldn't fail since the ids come from bitfit's own
    # data.  `_getframe()` must be called directly in this method so that
    # `co_name` is this method's name.
    gathered.addCallback(
        stethoscope.api.utils.filter_by_status,
        context=sys._getframe().f_code.co_name,
        level=logbook.ERROR,
    )
    return gathered
def get_devices_by_email(self, email):
    """Fetch the mobile and ChromeOS devices for `email` via worker threads.

    The parent class's `_get_mobile_devices_by_email` and
    `_get_chromeos_devices_by_email` are each run through
    `threads.deferToThread`.  Their results are gathered in a
    `defer.DeferredList` (``consumeErrors=True``), passed through
    `stethoscope.api.utils.filter_by_status` (with this method's name as
    `context` and `logbook.ERROR` as `level`), then flattened with
    `chain.from_iterable` and materialized as a `list`.

    Returns the `DeferredList`.
    """
    parent = super(DeferredGoogleDataSource, self)
    lookups = [
        threads.deferToThread(parent._get_mobile_devices_by_email, email),
        threads.deferToThread(parent._get_chromeos_devices_by_email, email),
    ]
    combined = defer.DeferredList(lookups, consumeErrors=True)

    # `_getframe()` must be called directly in this method so that `co_name`
    # is this method's name.
    combined.addCallback(
        stethoscope.api.utils.filter_by_status,
        context=sys._getframe().f_code.co_name,
        level=logbook.ERROR,
    )
    # Merge the per-source results into one flat list.
    combined.addCallback(chain.from_iterable)
    combined.addCallback(list)
    return combined
def _get_devices_by_id(self, device_ids):
    """Look up each id in `device_ids` with `self._get_device_by_id`.

    The lookups are gathered in a `defer.DeferredList` with
    ``consumeErrors=True`` and post-processed by
    `stethoscope.api.utils.filter_by_status`, which is given this method's
    name as `context` and `logbook.ERROR` as `level`.

    Returns the `DeferredList`.
    """
    lookups = []
    for identifier in device_ids:
        lookups.append(self._get_device_by_id(identifier))

    gathered = defer.DeferredList(lookups, consumeErrors=True)

    # Working off JAMF's own data, so individual lookups shouldn't fail.
    # `_getframe()` must be called directly in this method so that `co_name`
    # is this method's name.
    gathered.addCallback(
        stethoscope.api.utils.filter_by_status,
        context=sys._getframe().f_code.co_name,
        level=logbook.ERROR,
    )
    return gathered
def _get_logging_level(verbosity):
    """Map a verbosity number (1-7) to a ``logbook`` level constant.

    1 is the quietest (``CRITICAL``) and 7 the most verbose (``TRACE``).

    Raises:
        KeyError: if `verbosity` is not one of 1 through 7.
    """
    # noinspection PyPackageRequirements
    import logbook

    # Ordered from least to most verbose; position 1 is CRITICAL.
    ordered_levels = (
        logbook.CRITICAL,
        logbook.ERROR,
        logbook.WARNING,
        logbook.NOTICE,
        logbook.INFO,
        logbook.DEBUG,
        logbook.TRACE,
    )
    levels_by_verbosity = dict(enumerate(ordered_levels, start=1))
    return levels_by_verbosity[verbosity]
def main():
    """Entry point: delete tc rules for the device named in the CLI options.

    Returns:
        `errno.EINVAL` when `verify_network_interface` raises
        `NetworkInterfaceNotFoundError`; otherwise the value returned by
        `tc.delete_all_tc()` or `tc.delete_tc()`.
    """
    options = parse_option()

    initialize_cli(options)

    if is_execute_tc_command(options.tc_command_output):
        check_tc_command_installation()
        is_delete_all = options.is_delete_all
    else:
        # Commands are not executed in this mode: subprocess calls become
        # dry runs and deletion of all rules is forced.
        # NOTE(review): nesting reconstructed from a flattened source --
        # confirm that set_logger(False) belongs to this branch.
        subprocrunner.SubprocessRunner.default_is_dry_run = True
        is_delete_all = True
        set_logger(False)

    try:
        verify_network_interface(options.device)
    except NetworkInterfaceNotFoundError as e:
        logger.error("{:s}: {}".format(e.__class__.__name__, e))
        return errno.EINVAL

    # Start from an empty history so that only the commands issued below
    # are reported.
    subprocrunner.SubprocessRunner.clear_history()
    tc = create_tc_obj(options)
    if options.log_level == logbook.INFO:
        # At INFO level, restrict subprocrunner's own logging to errors.
        subprocrunner.set_log_level(logbook.ERROR)
    normalize_tc_value(tc)

    return_code = 0
    if is_delete_all:
        return_code = tc.delete_all_tc()
    else:
        return_code = tc.delete_tc()

    command_history = "\n".join(tc.get_command_history())

    if options.tc_command_output == TcCommandOutput.STDOUT:
        # Print the recorded commands to stdout.
        print(command_history)
        return return_code
    elif options.tc_command_output == TcCommandOutput.SCRIPT:
        # Hand the recorded commands to write_tc_script, with the device
        # name as the filename suffix.
        set_logger(True)
        write_tc_script(
            Tc.Command.TCDEL, command_history, filename_suffix=options.device)
        return return_code

    logger.debug("command history\n{}".format(command_history))

    return return_code