我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 logging.getLevelName()。
def create_logger():
    """Configure message-only logging and return the root logger.

    ``logging.basicConfig`` is asked for a bare ``%(message)s`` format and
    the root logger's threshold is then set to INFO.

    Returns:
        The root ``logging.Logger``.
    """
    logging.basicConfig(format='%(message)s')

    root_logger = logging.getLogger()
    # getLevelName() maps the name 'INFO' to its numeric level.
    info_level = logging.getLevelName('INFO')
    root_logger.setLevel(info_level)
    return root_logger
def _cli_log_message(msg, logger_name=None, level="INFO"):
    """Send one message to Flightlog; intended for command-line use.

    Every call attaches a new ``FlightlogHandler`` to the target logger, so
    calling this more than once in the same process configures duplicate
    handlers and results in duplicate messages.

    Args:
        msg: Text to log. The value ``"-"`` means "read the text from
            stdin". Each non-empty line is logged as its own record.
        logger_name: Name passed to ``logging.getLogger``.
        level: Level name; it is upper-cased before lookup.

    Returns:
        The tuple ``(None, 0)``; the second item is the exit code.

    Raises:
        ValueError: If ``level`` does not resolve to a numeric level.
    """
    target = logging.getLogger(logger_name)

    # For an unknown name getLevelName() returns a "Level <name>" string,
    # which int() rejects.
    numeric_level = logging.getLevelName(level.upper())
    try:
        int(numeric_level)
    except ValueError:
        raise ValueError("level must be one of DEBUG, INFO, WARNING, ERROR, CRITICAL")

    target.addHandler(FlightlogHandler(background=False))
    target.setLevel(numeric_level)

    text = sys.stdin.read() if msg == "-" else msg
    for entry in filter(None, text.splitlines()):
        target.log(numeric_level, entry)

    return None, 0
def test_log_msg():
    """Check that ``log`` writes a DEBUG line starting with the level name."""
    # This test assumes the default message format defined in
    # linchpin/cli/context.py.
    level = logging.DEBUG
    text = 'Test Msg'
    pattern = '^{0}.*{1}'.format(logging.getLevelName(level), text)

    context = LinchpinCliContext()
    context.load_config(config_path)
    context.setup_logging()
    context.log(text, level=level)

    # Only the first line of the log file is checked.
    with open(logfile) as handle:
        first_line = handle.readline()
        assert_regexp_matches(first_line, pattern)
def test_log_debug():
    """Check that ``log_debug`` writes a line starting with 'DEBUG'."""
    level = logging.DEBUG
    text = 'Debug Msg'
    pattern = '^{0}.*{1}'.format(logging.getLevelName(level), text)

    context = LinchpinCliContext()
    context.load_config(config_path)
    context.setup_logging()
    context.log_debug(text)

    # Only the first line of the log file is checked.
    with open(logfile) as handle:
        first_line = handle.readline()
        assert_regexp_matches(first_line, pattern)
def get_log_config(level):
    """Return a logging configuration dictionary for the 'hephaestus' logger.

    The dictionary declares one formatter, one console handler writing to
    ``sys.stdout`` and one non-propagating logger named ``hephaestus``.

    Args:
        level: Value passed through ``logging.getLevelName``; a level name
            is therefore stored as its number and a number as its name.

    Returns:
        dict: The configuration, with ``version`` set to 1.
    """
    formatters = {
        'basicFormatter': {
            'format': '[%(asctime)s %(levelname)s %(threadName)s] %(name)s: %(message)s'
        }
    }
    handlers = {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'basicFormatter',
            'stream': 'ext://sys.stdout'
        }
    }
    loggers = {
        'hephaestus': {
            'level': logging.getLevelName(level),
            'propagate': False,
            'handlers': ['console']
        }
    }
    return {
        'version': 1,
        'formatters': formatters,
        'handlers': handlers,
        'loggers': loggers,
    }
def main(argv):
    """
    MySQL binlog to Google Pub/Sub entry point

    Args:
        argv (list): list of command line arguments
    """
    options = _setup_arg_parser(argv)

    conf_path = options.conf
    if conf_path:
        # Exported through the environment before the publisher is imported.
        os.environ['BINLOG2GPUBSUB_CONF_FILE'] = conf_path

    if not options.logconf:
        logging.basicConfig()
    else:
        logging.config.fileConfig(options.logconf, disable_existing_loggers=False)

    # NOTE(review): the level override is treated as independent of the
    # logconf choice above — confirm against the original indentation.
    if options.loglevel:
        requested = logging.getLevelName(options.loglevel.upper())
        logging.root.setLevel(requested)

    import mysqlbinlog2gpubsub
    mysqlbinlog2gpubsub.start_publishing()
def test_invalid_value(self):
    """
    Sends an invalid value to the handler and checks the single log record
    it produces.
    """
    text = 'Needs more cowbell.'
    details = {'key': 'test', 'value': "(Don't Fear) The Reaper"}

    self.handler.handle_invalid_value(text, False, details)

    # Exactly one record is expected; check the count before indexing.
    self.assertEqual(len(self.logs.records), 1)
    entry = self.logs[0]

    self.assertEqual(entry.msg, text)
    self.assertEqual(entry.context, details)

    # The log message level is set in the handler's initializer.
    self.assertEqual(entry.levelname, getLevelName(WARNING))

    # No exception info for invalid values (by default).
    self.assertIsNone(entry.exc_text)
def _level_write(self, level, str_format, *args):
    """Format one log line and pass it to ``self._write``.

    Records whose ``level`` is below ``self._level`` are dropped. The
    message is ``str_format % args`` when ``args`` is non-empty, otherwise
    ``str_format`` unchanged, and is then run through ``strutils.decode``.

    Args:
        level: Numeric level of the record.
        str_format: Message or %-style format string.
        *args: Values interpolated into ``str_format``.
    """
    if level < self._level:
        return

    text = str_format
    if args:
        text = str_format % args
    text = strutils.decode(text)

    # Entry 2 of the stack is two frames above this method; its file name
    # and line number are what gets reported.
    caller = inspect.stack()[2]
    source_file, source_line = caller[1], caller[2]

    raw_name = logging.getLevelName(level)
    fields = {
        # Timestamp trimmed from microseconds to milliseconds.
        'asctime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
        'name': self._name,
        'filename': os.path.basename(source_file),
        'lineno': source_line,
        'message': text,
        # Use the alias for the level name when one is registered.
        'levelname': Logger.__alias.get(raw_name, raw_name),
    }
    self._write(u'{asctime} {levelname:<5s} [{name}:{lineno:>4}] {message}'.format(**fields))
def init(cls):
    """Create ``cls.logger`` and configure it from ``JUBAKIT_LOG_LEVEL``.

    When the variable is unset or empty, a null handler is attached and the
    threshold is set to CRITICAL. Otherwise the value is compared with the
    names of the five standard levels; a match is passed to
    ``setup_logger``, and anything else falls back to INFO with a warning.
    """
    cls.logger = logging.getLogger(_LOGGER_NAME)
    log = cls.logger

    requested = os.environ.get('JUBAKIT_LOG_LEVEL', None)
    if not requested:
        # Suppress printing logs by default.
        log.addHandler(cls._NullHandler())
        log.setLevel(CRITICAL)
        return

    # Set the logger up from the environment variable.
    candidates = (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    matched = next(
        (lvl for lvl in candidates if logging.getLevelName(lvl) == requested),
        None)
    if matched is not None:
        setup_logger(matched)
        return

    setup_logger(INFO)
    log.warning('invalid JUBAKIT_LOG_LEVEL (%s) specified; continue with INFO', requested)
def _level_write(self, level, str_format, *args):
    """Format one log line and pass it, UTF-8 encoded, to ``self._write``.

    Records whose ``level`` is below ``self._level`` are dropped. The
    message is ``str_format % args`` when ``args`` is non-empty, otherwise
    ``str_format`` unchanged, and is then run through ``strutils.decode``.

    Args:
        level: Numeric level of the record.
        str_format: Message or %-style format string.
        *args: Values interpolated into ``str_format``.
    """
    if level < self._level:
        return

    text = str_format
    if args:
        text = str_format % args
    text = strutils.decode(text)

    # Entry 2 of the stack is two frames above this method; its file name
    # and line number are what gets reported.
    caller = inspect.stack()[2]
    source_file, source_line = caller[1], caller[2]

    raw_name = logging.getLevelName(level)
    fields = {
        # Timestamp trimmed from microseconds to milliseconds.
        'asctime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
        'name': self._name,
        'filename': os.path.basename(source_file),
        'lineno': source_line,
        'message': text,
        # Use the alias for the level name when one is registered.
        'levelname': Logger.__alias.get(raw_name, raw_name),
    }
    rendered = u'{asctime} {levelname:<5s} [{name}:{lineno:>4}] {message}'.format(**fields)
    self._write(strutils.encode(rendered, 'utf-8'))
def setup_logging(args_obj):
    """Set the root logger's level and attach a detailed stream handler.

    Args:
        args_obj: Object with a ``verbose`` flag and a ``log_level`` name.
            ``verbose`` forces DEBUG; otherwise ``log_level`` is
            upper-cased and resolved with ``logging.getLevelName``.
    """
    if not args_obj.verbose:
        threshold = logging.getLevelName(args_obj.log_level.upper())
    else:
        threshold = logging.DEBUG

    layout = logging.Formatter(
        fmt='%(asctime)-15s - %(levelname)s - %(name)s'
            '[line:%(lineno)d thread:%(threadName)s(%(thread)d) '
            'process:%(processName)s(%(process)d)]'
            ' - %(message)s'
    )

    root = logging.getLogger()
    root.setLevel(threshold)

    # Each call adds another handler to the root logger.
    console = logging.StreamHandler()
    console.setFormatter(layout)
    root.addHandler(console)
def test_get_level_name(self):
    """Test getLevelName maps level names to numbers and numbers to names."""
    # NOTE(flaper87): Bug #1517
    name_to_number = (
        ('NOTSET', 0),
        ('DEBUG', 10),
        ('INFO', 20),
        ('WARN', 30),
        ('WARNING', 30),
        ('ERROR', 40),
        ('CRITICAL', 50),
    )
    for name, number in name_to_number:
        self.assertEqual(logging.getLevelName(name), number)

    number_to_name = (
        (0, 'NOTSET'),
        (10, 'DEBUG'),
        (20, 'INFO'),
        (30, 'WARNING'),
        (40, 'ERROR'),
        (50, 'CRITICAL'),
    )
    for number, name in number_to_name:
        self.assertEqual(logging.getLevelName(number), name)
def log_formatter(line):
    """Split a ' / '-separated log line into a dictionary of fields.

    Two layouts are accepted:

    * four fields — the level, process and message are taken from
      positions 1, 2 and 3; no timestamp is emitted and ``device_id`` is
      ``None``;
    * six fields — the timestamp (parsed with ``date_parse``), level,
      process, device id and message are taken from positions 0, 1, 3, 4
      and 5.

    The level field is passed through ``logging.getLevelName``.

    Raises:
        RuntimeError: If the line has any other number of fields.
    """
    fields = line.split(' / ')
    count = len(fields)

    if count not in (4, 6):
        raise RuntimeError("The logs in the log file are of an unknown format, cannot continue. "
                           "Line: {}".format(line))

    if count == 4:
        _, level_name, process, message = fields
        return {
            "level": logging.getLevelName(level_name),
            "message": message,
            "process": process,
            "device_id": None,
        }

    # Other parsing options:
    # http://stackoverflow.com/questions/466345/converting-string-into-datetime
    stamp, level_name, _, process, device, message = fields
    return {
        "timestamp": date_parse(stamp),
        "level": logging.getLevelName(level_name),
        "message": message,
        "process": process,
        "device_id": device,
    }
def _has_streamhandler(logger, level=None, fmt=LOG_FORMAT, stream=DEFAULT_STREAM):
    """Report whether ``logger`` already has a matching StreamHandler.

    True is returned only for a StreamHandler that exactly matches the
    requested stream, level and format. Other StreamHandlers are assumed to
    have been added for a different purpose and are ignored.

    Args:
        logger: Logger whose ``handlers`` list is inspected.
        level: Required handler level; a string is converted with
            ``logging.getLevelName`` before comparing.
        fmt: Required value of the handler formatter's ``_fmt``.
        stream: Required stream object, compared by identity.
    """
    # Handler levels are compared numerically, so convert a level name first.
    if isinstance(level, basestring):
        level = logging.getLevelName(level)

    def _matches(candidate):
        return (
            isinstance(candidate, logging.StreamHandler)
            and candidate.stream is stream
            and candidate.level == level
            and bool(candidate.formatter)
            and candidate.formatter._fmt == fmt
        )

    return any(_matches(candidate) for candidate in logger.handlers)
def main():
    """Configure logging, connect to the log database and start the service.

    All settings come from ``options()``: the log level and format, the
    database servers, replica name, database and collection, and the
    address the service is given.
    """
    settings = options()

    level = logging.getLevelName(settings.log_level)
    logging.basicConfig(level=level, format=settings.log_format)

    connection = get_connection(settings.log_service.db.servers,
                                settings.log_service.db.replica_name)
    collection = get_collection(settings.log_service.db.name,
                                settings.log_service.db.collection,
                                connection)

    service = AgentLogService(settings.log_service.bind_address, collection)
    LOG.info('Starting logging service on {}'.format(
        settings.log_service.bind_address))
    service.start()
def main():
    """Run the inventory server on a ZMQ asyncio event loop.

    Logging is configured from the inventory configuration, a
    ``ZMQEventLoop`` is installed as the current asyncio loop and the
    server's ``start()`` coroutine is run to completion. A
    ``KeyboardInterrupt`` kills the server. In every case the tasks still
    known to the loop are awaited and the loop is closed.
    """
    config = get_inventory_configuration()
    logging.basicConfig(level=logging.getLevelName(config.log_level),
                        format=config.log_format)

    loop = zmq.asyncio.ZMQEventLoop()
    loop.set_debug(config.asyncio_debug)
    asyncio.set_event_loop(loop)

    s = InventoryServer(bind_address=config.inventory.bind_address,
                        config=config)
    try:
        loop.run_until_complete(s.start())
    except KeyboardInterrupt:
        log.info('Stopping service')
        s.kill()
    finally:
        # asyncio.Task.all_tasks() was deprecated in Python 3.7 and removed
        # in 3.9; use the module-level asyncio.all_tasks() when it exists
        # and fall back to the old classmethod on older interpreters.
        all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
        pending = all_tasks(loop=loop)
        loop.run_until_complete(asyncio.gather(*pending))
        loop.close()
def _init_logger(verbosity):
    """(Re)configure the module-level ``conda_mirror`` logger.

    Existing handlers are removed, then a single stream handler with a
    ``LEVEL: message`` format is attached. The chosen level is applied to
    both the logger and the handler and is announced on stdout.

    Args:
        verbosity: 0 selects ERROR, 1 WARNING, 2 INFO and 3 DEBUG. Any
            other value falls back to DEBUG.
    """
    # set up the logger
    global logger
    logger = logging.getLogger('conda_mirror')
    logmap = {0: logging.ERROR,
              1: logging.WARNING,
              2: logging.INFO,
              3: logging.DEBUG}
    # The fallback must be a real level: the previous default, the string
    # '3', is not a level name and made setLevel() raise.
    loglevel = logmap.get(verbosity, logging.DEBUG)

    # clear all handlers -- iterate over a copy, because removeHandler()
    # mutates logger.handlers and would otherwise skip entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    logger.setLevel(loglevel)
    format_string = '%(levelname)s: %(message)s'
    formatter = logging.Formatter(fmt=format_string)
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(loglevel)
    stream_handler.setFormatter(fmt=formatter)
    logger.addHandler(stream_handler)

    # Report the level actually applied; indexing logmap with an unknown
    # verbosity would raise KeyError.
    print("Log level set to %s" % logging.getLevelName(loglevel),
          file=sys.stdout)
def get_configuration():
    """
    get a configuration (snapshot) that can be used to call configure

    snapshot = get_configuration()
    configure(**snapshot)

    The snapshot's ``config_string`` is a comma-separated list of
    ``name:level`` pairs, starting with the root logger (empty name).
    """
    root = getLogger()

    pairs = ['%s:%s' % ('', logging.getLevelName(root.level))]
    for name, candidate in root.manager.loggerDict.items():
        # Entries without a ``level`` attribute are skipped.
        if hasattr(candidate, 'level'):
            pairs.append('%s:%s' % (name, logging.getLevelName(candidate.level)))

    return dict(config_string=','.join(pairs),
                log_json=SLogger.manager.log_json)
def initialize_logger(name, log_level):
    """Return the logger called ``name`` with its level set.

    logger = initialize_logger(name, log_level)

    Parameters
    ----------
    name : str
        name of the logger
    log_level :
        level name or number; it is passed through
        ``logging.getLevelName`` before being applied

    Returns
    -------
    logging.Logger
        a logger set with the name and level specified
    """
    resolved = logging.getLevelName(log_level)
    logging.basicConfig()

    named_logger = logging.getLogger(name)
    named_logger.setLevel(level=resolved)
    return named_logger
def main():
    """Parse ``-c/--config``, configure logging and start the service.

    The logging format and level are read from the ``logging`` section of
    the parsed configuration. The service is started with one monitor
    thread and one server thread.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config')
    cli = parser.parse_args()

    powergslb.system.parse_config(cli.config)
    settings = powergslb.system.get_config()

    log_format = settings.get('logging', 'format')
    log_level = logging.getLevelName(settings.get('logging', 'level'))
    logging.basicConfig(format=log_format, level=log_level)

    threads = [
        powergslb.monitor.MonitorThread(name='Monitor'),
        powergslb.server.ServerThread(name='Server'),
    ]
    powergslb.system.SystemService(threads).start()
def configure_logging(verbosity: int, logPath: str, isDaemon=False):
    """Configure the root logger and add a file handler.

    Args:
        verbosity: 0 selects ERROR, 1 WARNING, 2 INFO; anything else
            DEBUG. Ignored when the module-level ``DEBUG`` flag is set,
            which forces DEBUG.
        logPath: Log file path ('~' is expanded). When empty, a file in
            the system temp directory is used.
        isDaemon: Selects the default file name when ``logPath`` is empty.
    """
    root = logging.getLogger()

    if not logPath:
        defaultName = 'i3configger-daemon.log' if isDaemon else 'i3configger.log'
        logPath = Path(tempfile.gettempdir()) / defaultName
    else:
        logPath = Path(logPath).expanduser()

    if DEBUG:
        print('logging to %s' % logPath)
        levelName = 'DEBUG'
    else:
        levelName = {0: 'ERROR', 1: 'WARNING', 2: 'INFO'}.get(verbosity, 'DEBUG')
    level = logging.getLevelName(levelName)

    fmt = ('%(asctime)s %(name)s:%(funcName)s:%(lineno)s '
           '%(levelname)s: %(message)s')

    # basicConfig is only called while the root logger has no handlers.
    if not root.handlers:
        logging.basicConfig(format=fmt, level=level)

    toFile = logging.FileHandler(logPath)
    toFile.setFormatter(logging.Formatter(fmt))
    toFile.setLevel(level)
    root.addHandler(toFile)
def get_logger(name, level=INFO, fac=SysLogHandler.LOG_LOCAL1):
    """Return a syslog-backed logger named after the last part of ``name``.

    Before the logger is built, every level listed in ``LOG_TRANS`` that
    has no recorded ``old`` name gets its current name saved there and is
    renamed to the entry's ``new`` name.

    Args:
        name: Dotted name; only the part after the last '.' is used.
        level: Threshold applied to the returned logger.
        fac: Facility, converted with ``parse_fac``.

    Returns:
        The logger, with an added ``success(msg)`` attribute that logs at
        ``LOG_SUCCES``.
    """
    global LOG_TRANS
    for number in LOG_TRANS:
        names = LOG_TRANS[number]
        if not names['old']:
            names['old'] = logging.getLevelName(number)
            logging.addLevelName(number, names['new'])

    layout = F('[%(name)s.%(funcName)s]: %(message)s')
    log = logging.getLogger('%s' % name.rsplit('.', 1)[-1])

    syslog = SysLogHandler(address='/dev/log', facility=parse_fac(fac))
    syslog.setFormatter(layout)
    log.addHandler(syslog)

    log.setLevel(level)

    def _success(msg):
        return log.log(LOG_SUCCES, msg)

    log.success = _success
    return log
def create_log(self, log_path=None, log_level='DEBUG'):
    """Create a log file for debug output.

    Args:
        log_path: Path to log file. If None or empty the log path name will
            be the command line invocation name (argv[0]) with a '.log'
            suffix in the user's home directory.
        log_level: Log level: 'DEBUG', 'INFO', 'WARNING', 'ERROR', or
            'CRITICAL'. Default is 'DEBUG'. It is upper-cased before use.
    """
    if not log_path:
        log_dir = os.path.expanduser('~')
        # Use only the base name of argv[0]: when the program is invoked by
        # path, joining the raw value would place the log outside the home
        # directory (an absolute argv[0] makes os.path.join discard it).
        invoked_as = os.path.basename(sys.argv[0])
        log_path = os.path.join(log_dir, invoked_as + '.log')
    log_path = os.path.expanduser(log_path)

    # filemode='w' truncates an existing log file.
    logging.basicConfig(filename=os.path.abspath(log_path),
                        filemode='w',
                        level=log_level.upper())

    logger = logging.getLogger(__name__)
    logger.info('Log started %s, level=%s',
                datetime.datetime.now(),
                logging.getLevelName(logger.getEffectiveLevel()))
def log_to_stderr(level, formatter=_LOG_FORMATTER, handler=logging.StreamHandler):
    """Setup logging or set logging level to STDERR.

    The handler is kept in the module-level ``_STDERR_HANDLER``. When that
    global is already an instance of exactly ``handler``, only its level is
    changed; otherwise a new handler is created and added to the root
    logger.

    Args:
        level: a logging level, like logging.INFO
        formatter: a logging.Formatter object
        handler: logging.StreamHandler (this argument is for testing)
    """
    global _STDERR_HANDLER
    threshold = get_loglevel(level)

    if type(_STDERR_HANDLER) is not handler:
        _STDERR_HANDLER = handler(stream=sys.stderr)
        _STDERR_HANDLER.setLevel(threshold)
        _STDERR_HANDLER.setFormatter(formatter)
        logging.getLogger('').addHandler(_STDERR_HANDLER)
    else:
        _STDERR_HANDLER.setLevel(threshold)

    # NOTE(review): assumed to run on both paths — confirm against the
    # original indentation.
    logging.debug('Setting logging at level=%s',
                  logging.getLevelName(threshold))
def log_to_file(filename, level=INFO, formatter=_LOG_FORMATTER, filemode=APPEND, handler=logging.FileHandler):
    """Setup logging or set logging level to file.

    The handler is kept in the module-level ``_FILE_HANDLER``. When that
    global is already an instance of exactly ``handler``, only its level is
    changed; otherwise a new handler is created and added to the root
    logger.

    Args:
        filename: string of path/file to write logs
        level: a logging level, like logging.INFO
        formatter: a logging.Formatter object
        filemode: a mode of writing, like app.APPEND or app.CLOBBER
        handler: logging.FileHandler (this argument is for testing)
    """
    global _FILE_HANDLER
    threshold = get_loglevel(level)

    if type(_FILE_HANDLER) is not handler:
        _FILE_HANDLER = handler(filename=filename, mode=filemode)
        _FILE_HANDLER.setLevel(threshold)
        _FILE_HANDLER.setFormatter(formatter)
        logging.getLogger('').addHandler(_FILE_HANDLER)
    else:
        _FILE_HANDLER.setLevel(threshold)

    # NOTE(review): assumed to run on both paths — confirm against the
    # original indentation.
    logging.info("Logging to file %s [mode='%s', level=%s]",
                 os.path.abspath(filename),
                 filemode,
                 logging.getLevelName(threshold))
def setup_logger(args):
    """Configure the root logger from the command-line verbosity.

    ``TechPreviewWarning`` and ``SeenWarning`` are always ignored. The root
    level is ``30 - args.loglevel * 10`` and a stderr handler is attached.
    When WARNING is not enabled at the resulting level, all output from the
    ``warnings`` module is suppressed as well.

    Returns:
        The root logger.
    """
    for category in (TechPreviewWarning, SeenWarning):
        warnings.filterwarnings("ignore", category=category)

    # Set up logging according to command-line verbosity.
    root = logging.getLogger()
    root.setLevel(int(30 - (args.loglevel * 10)))

    console = logging.StreamHandler(sys.stderr)
    console.setFormatter(
        logging.Formatter(u'%(asctime)s %(name)s %(levelname)s: %(message)s'))
    root.addHandler(console)

    current = logging.getLevelName(root.getEffectiveLevel())
    root.info("Set logging level to {0}".format(current))

    # If log level is reduced, disable emissions from the `warnings` module.
    if not root.isEnabledFor(logging.WARNING):
        warnings.simplefilter("ignore")
    return root
def checkConfig(self):
    """Validate the required settings and apply the configured log level.

    The interface, domain, realm and honeytoken username must all be set,
    and ``log_level`` (case-insensitive) must be one of the six standard
    level names. On success the root logger is set to that level.

    Raises:
        ConfigError: For the first missing setting, or for an invalid log
            level.
    """
    required = (
        (self.interface, "You must configure an interface"),
        (self.domain, "You must configure a domain"),
        (self.realm, "You must configure a realm"),
        (self.honey_username, "You must configure a honeytoken username"),
    )
    for value, complaint in required:
        if not value:
            raise ConfigError(complaint)

    level_name = self.log_level.upper()
    if level_name not in {"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"}:
        raise ConfigError("Invalid setting for log level")

    logging.getLogger().setLevel(logging.getLevelName(level_name))
def add_file_handler(self, path, name, level=None):
    """Add a midnight-rotating file handler for ``<path>/<name>.<level>.log``.

    The level part of the file name is the level's name, or ``DEFAULT``
    when ``level`` is None. Nothing happens when a handler for the same
    file name is already registered in ``self.file_handlers``.

    Args:
        path: Directory for the log file; made absolute.
        name: Base name of the log file.
        level: Optional level applied to the new handler.
    """
    if level is None:
        level_label = 'DEFAULT'
    else:
        level_label = logging.getLevelName(level)

    filename = '{path}/{name}.{level}.log'.format(
        path=os.path.abspath(path), name=name, level=level_label)

    # NOTE(review): everything below is treated as applying only to a file
    # name not seen before — confirm against the original indentation.
    if filename in self.file_handlers:
        return

    from logging.handlers import TimedRotatingFileHandler
    rotating = TimedRotatingFileHandler(filename, when="midnight", backupCount=7)
    self.file_handlers[filename] = rotating
    if level is not None:
        rotating.setLevel(level)
    self.add_handler(rotating)
def make_logging_level_names_consistent():
    """Rename the standard library's logging levels to match Twisted's.

    Twisted's new logging system in `twisted.logger` that is.
    """
    special_names = {
        # When the logging level is not known in Twisted it's rendered as
        # a hyphen. This is not a common occurrence with `logging` but we
        # cater for it anyway.
        logging.NOTSET: "-",
        # "Warning" is more consistent with the other level names than
        # "warn", so there is a fault in Twisted here. However it's easier
        # to change the `logging` module to match Twisted than vice-versa.
        logging.WARNING: "warn",
    }
    # Iterate over a snapshot of the registered levels; addLevelName()
    # writes to the mapping being walked.
    for number in list(logging._levelToName):
        if number in special_names:
            renamed = special_names[number]
        else:
            # Twisted's level names are all lower-case.
            renamed = logging.getLevelName(number).lower()
        # For a preexisting level this will _replace_ the name.
        logging.addLevelName(number, renamed)
def fetch_url(cls, session, msites, platform_id, purpose):
    """Actual method to do fetch url action.

    One crawl is scheduled for every spider description produced by
    ``build_spiders_iter`` for every site; the crawler process is then
    started.

    Parameters
    ----------
    session : object
        passed to every spider as the ``session`` keyword argument.
    msites : list
        a list of Site model class, contains info to build spiders.
    platform_id : int
        id of platform, bind fetched url with this id.
    purpose : {'update', 'archive'}
        indicate which url to fetch.
    """
    crawl_settings = Settings(cls.conf['crawl']['scrapy'])
    crawl_settings.set('ITEM_PIPELINES',
                       {'hoaxy.crawl.pipelines.UrlPipeline': 300})
    crawler = CrawlerProcess(crawl_settings)

    # Apply the configured level to the 'scrapy' logger.
    scrapy_level = cls.conf['logging']['loggers']['scrapy']['level']
    logging.getLogger('scrapy').setLevel(logging.getLevelName(scrapy_level))

    for site in msites:
        for spec in build_spiders_iter(site, purpose):
            spec['kwargs']['session'] = session
            spec['kwargs']['platform_id'] = platform_id
            crawler.crawl(spec['cls'], *spec['args'], **spec['kwargs'])
    crawler.start()
def fetch_html(cls, session, url_tuples):
    """Actual method to do fetch html action.

    Parameters
    ----------
    session : object
        a SQLAlchemy session object.
    url_tuples : list
        a list of url tuple (id, raw, status_code).
    """
    crawl_settings = Settings(cls.conf['crawl']['scrapy'])
    crawl_settings.set('ITEM_PIPELINES',
                       {'hoaxy.crawl.pipelines.HtmlPipeline': 300})
    crawler = CrawlerProcess(crawl_settings)

    # Apply the configured level to the 'scrapy' logger.
    scrapy_level = cls.conf['logging']['loggers']['scrapy']['level']
    logging.getLogger('scrapy').setLevel(logging.getLevelName(scrapy_level))

    # The count is logged at WARNING level.
    logger.warning('Number of url to fetch html is: %s', len(url_tuples))

    spider_options = dict(
        session=session,
        url_tuples=url_tuples,
        excluded_domains=cls.conf['crawl']['excluded_domains'],
    )
    crawler.crawl(HtmlSpider, **spider_options)
    crawler.start()
def parse_article(cls, session, url_tuples):
    """Actual method to do parse to article action.

    Parameters
    ----------
    session : object
        a SQLAlchemy session object.
    url_tuples : list
        a list of url tuple (id, created_at, date_published,
        canonical, site_id)
    """
    crawl_settings = Settings(cls.conf['crawl']['scrapy'])
    crawl_settings.set('ITEM_PIPELINES',
                       {'hoaxy.crawl.pipelines.ArticlePipeline': 300})
    crawler = CrawlerProcess(crawl_settings)

    # Apply the configured level to the 'scrapy' logger.
    scrapy_level = cls.conf['logging']['loggers']['scrapy']['level']
    logging.getLogger('scrapy').setLevel(logging.getLevelName(scrapy_level))

    logger.info('Number of url to parse is: %s', len(url_tuples))

    spider_options = dict(
        session=session,
        url_tuples=url_tuples,
        api_key=cls.conf['crawl']['article_parser']['webparser_api_key'],
    )
    crawler.crawl(ArticleParserSpider, **spider_options)
    crawler.start()
def __init__(self, config):
    """Store the parser configuration and set the module logger's level.

    Args:
        config: Object providing ``dirs``, ``files`` and ``debug`` (a
            level name, upper-cased before lookup).
    """
    # NOTE(review): super(self.__class__, self) recurses without end as
    # soon as this class is subclassed; naming the class explicitly avoids
    # that, but the class name is not visible from here.
    super(self.__class__, self).__init__()

    self.dirs = config.dirs
    self.files = config.files
    self.parsing_dirs = config.dirs is not None
    self.config = config

    requested = config.debug.upper()
    log.setLevel(logging.getLevelName(requested))

    # mininet version stores a SeismicStatistics object to hold stats
    # extracted from results
    self.stats = None

    # This will be set when parsing files to determine the type of
    # experimental results file we're working with. Can be either
    # 'mininet' or 'networkx'
    self.experiment_type = None
def __init__(self, virtapi, read_only=False):
    """Set up the driver: client module, firewall driver, cache, client.

    The ``ironicclient`` package is imported on first use and kept in the
    module-level ``ironic`` global. ``read_only`` is accepted but not used
    in this method.
    """
    super(IronicDriver, self).__init__(virtapi)

    global ironic
    if ironic is None:
        ironic = importutils.import_module('ironicclient')
        # NOTE(deva): work around a lack of symbols in the current version.
        fallbacks = (
            ('exc', 'ironicclient.exc'),
            ('client', 'ironicclient.client'),
        )
        for attribute, module_name in fallbacks:
            if not hasattr(ironic, attribute):
                setattr(ironic, attribute,
                        importutils.import_module(module_name))

    self.firewall_driver = firewall.load_driver(
        default='nova.virt.firewall.NoopFirewallDriver')
    self.node_cache = {}
    self.node_cache_time = 0

    # Apply the configured level to the 'ironicclient' logger, if one is set.
    configured_level = CONF.ironic.client_log_level
    if configured_level:
        client_logger = py_logging.getLogger('ironicclient')
        client_logger.setLevel(py_logging.getLevelName(configured_level))

    self.ironicclient = client_wrapper.IronicClientWrapper()
def parse_expressions(self, expressions):
    """
    Parse a list of logger matching expressions of the form
    <regex>=<log-level>. Place the compiled regex's and levels in the
    expressions attribute.

    Args:
        expressions: Newline-separated expressions. Each line is handled
            on its own; a line that cannot be parsed is reported through
            ``self.logger`` and skipped, and parsing continues.
    """
    lines = expressions.split('\n')
    for line in lines:
        try:
            # Use the right split so we can have '='s in the regex
            regex, level = line.rsplit('=', 1)
            pattern = re.compile(regex)
            results = (pattern, logging.getLevelName(level.upper()))
            self.logger.log(
                TraceLogger.TRACE,
                'Appending %s:%s to logger level expressions' % (
                    results[0], results[1]))
            self.expressions.append(results)
        # "except Exception, ex" is Python-2-only syntax; the "as" form is
        # accepted by Python 2.6+ and Python 3.
        except Exception as ex:
            self.logger.\
                error('Parser error in log configuration file: %s' % (
                    line))
            self.logger.exception(ex)
def setlogdir(logdir):
    '''Set up file and console logging next to ``logdir``.

    The INFO and WARNING level names are wrapped with ``print_style``
    (green and red). A DEBUG-level log file named ``log.log`` is written,
    truncating any existing one, in the directory part of ``logdir``, and
    an INFO-level console handler is added to the root logger.
    '''
    # set log color
    for number, colour in ((logging.INFO, 'green'), (logging.WARNING, 'red')):
        styled = print_style('%s', fore=colour) % logging.getLevelName(number)
        logging.addLevelName(number, styled)

    log_file = os.path.join(os.path.dirname(logdir), 'log.log')
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(name)s[line:%(lineno)d] %(levelname)s %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S',
        filename=log_file,
        filemode='w')

    to_console = logging.StreamHandler()
    to_console.setLevel(logging.INFO)
    to_console.setFormatter(
        logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s'))
    logging.getLogger('').addHandler(to_console)
def logging_level(self):
    """ **Syntax:** logging_level=[CRITICAL|ERROR|WARNING|INFO|DEBUG|NOTSET]

    **Description:** Sets the threshold for the logger of this command
    invocation. Logging messages less severe than `logging_level` will be
    ignored.

    """
    # Report the effective level of this command's logger by name.
    effective = self._logger.getEffectiveLevel()
    return getLevelName(effective)
def _build_event_data(record):
    """
    Build an event data dictionary from the specified log record for
    submission to Seq.

    Three cases are distinguished, in this order: the record carries
    positional ``args``; the record is a ``StructuredLogRecord``; anything
    else.

    :param record: The LogRecord.
    :type record: StructuredLogRecord
    :return: A dictionary containing event data representing the log record.
    :rtype: dict
    """
    if record.args:
        # Standard (unnamed) format arguments: each one is stored under its
        # 0-based index, used as the property name.
        properties = get_global_log_properties(record.name)
        for position, value in enumerate(record.args or []):
            properties[str(position)] = value
        return {
            "Timestamp": _get_local_timestamp(record),
            "Level": logging.getLevelName(record.levelno),
            "MessageTemplate": record.getMessage(),
            "Properties": properties
        }

    if isinstance(record, StructuredLogRecord):
        # Named format arguments (and, therefore, log event properties).
        return {
            "Timestamp": _get_local_timestamp(record),
            "Level": logging.getLevelName(record.levelno),
            "MessageTemplate": record.msg,
            "Properties": record.log_props
        }

    # No format arguments; interpret message as-is.
    return {
        "Timestamp": _get_local_timestamp(record),
        "Level": logging.getLevelName(record.levelno),
        "MessageTemplate": record.getMessage(),
        "Properties": _global_log_props
    }
def setup_logger(self, level=logging.INFO):
    """Configure global log settings.

    The root logger is stored in ``self.logger`` and set to ``level``.
    When the root logger has no handlers yet, a stderr stream handler with
    a timestamped format is attached.

    Args:
        level: Numeric level or level name. A number is stored in
            ``self.level`` as its name; a name is stored unchanged.
    """
    # Previously only an int was stored, so a level passed as a name was
    # ignored and ``self.level`` could be left unset.
    if isinstance(level, int):
        self.level = logging.getLevelName(level)
    else:
        self.level = level

    self.logger = logging.getLogger()
    self.logger.setLevel(self.level)

    if not self.logger.handlers:
        ch = logging.StreamHandler(stream=sys.stderr)
        logformat = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        formatter = logging.Formatter(logformat)
        ch.setFormatter(formatter)
        self.logger.addHandler(ch)
def get_log_level():
    """Return the numeric level named by the ``LOG_LEVEL`` variable.

    The value is upper-cased before lookup. When the variable is unset or
    does not resolve to a numeric level, ``DEFAULT_LEVEL`` is returned.
    """
    requested = os.environ.get('LOG_LEVEL', '').upper()
    # getLevelName() returns an int only for a registered level name.
    resolved = logging.getLevelName(requested)
    return resolved if isinstance(resolved, int) else DEFAULT_LEVEL
def logLevels():
    """Return the names of the levels 0, 10, ... up to ``logging.CRITICAL``.

    Returns:
        list: One name per multiple of ten, in ascending order.
    """
    # range() instead of xrange(): xrange does not exist on Python 3, and
    # range works on both major versions.
    return [logging.getLevelName(n)
            for n in range(0, logging.CRITICAL + 1, 10)]
def _log(self, level, msg, event=()):
    """Log ``msg`` with an event attached as the record's ``event`` extra.

    The event is built from ``event`` and extended with the current GMT
    time (``logtime``), the message (``logmsg``) and the level name
    (``loglevel``).

    Returns:
        Whatever ``self._logger.log`` returns.
    """
    base_event = events.Event(event)
    stamp = time.strftime("%Y-%m-%d %H:%M:%SZ", time.gmtime())
    enriched = base_event.union({
        "logtime": stamp,
        "logmsg": msg,
        "loglevel": logging.getLevelName(level)
    })
    return self._logger.log(level, msg, extra={"event": enriched})
def get_config(config='~/.nyttth/config.yml'):
    """Load the YAML configuration once and cache it in the global ``cfg``.

    When ``cfg`` is already populated it is returned unchanged. Otherwise
    the file is read. If it does not exist the user is asked whether a
    sample configuration should be written, and the process then exits in
    either case.

    After loading, an optional ``log_level`` entry is applied to the module
    logger and two derived entries are added: ``basedir`` (the directory of
    the config file) and ``supervisor.conf`` (``supervisord.conf`` inside
    that directory).

    Args:
        config: Path to the configuration file; '~' is expanded.

    Returns:
        dict: The cached configuration.
    """
    global cfg
    if cfg:
        return cfg

    cfgpath = os.path.expanduser(config)
    log.debug('reading config from {}'.format(cfgpath))
    cfg = dict()
    if os.path.isfile(cfgpath):
        with open(cfgpath, 'r') as stream:
            # NOTE(review): yaml.load() without an explicit Loader can
            # construct arbitrary Python objects and is rejected by recent
            # PyYAML releases; consider yaml.safe_load() here.
            # An empty file parses to None; fall back to an empty mapping
            # so the lookups below keep working.
            cfg = yaml.load(stream) or dict()
    else:
        # Single-argument print(...) calls behave the same on Python 2
        # and Python 3.
        print('config not found at {}. Create y/n?'.format(cfgpath))
        if propmt_yn():
            import errno
            try:
                os.makedirs(os.path.dirname(cfgpath))
            except OSError as e:
                # An already existing directory is fine.
                if e.errno != errno.EEXIST:
                    raise
            with open(cfgpath, 'w') as cfg_file:
                cfg_file.write(SAMPLE_CONFIG)
            print('Sample configuration has been written to {}.\n You will need to edit '.format(cfgpath) +
                  'this configuration with real values from your networking environment. Exiting.')
        else:
            print('Exiting')
        exit()

    if 'log_level' in cfg:
        log.setLevel(logging.getLevelName(cfg['log_level']))
    cfg['basedir'] = os.path.dirname(cfgpath)
    cfg['supervisor.conf'] = os.path.join(cfg['basedir'], 'supervisord.conf')
    return cfg
def __new__(cls, value):
    """Return the cached instance for ``value``, creating it on first use.

    Instances are kept in ``cls._level_instances``, so each distinct value
    maps to a single object. A new instance is an ``int`` carrying the
    level's name in its ``name`` attribute.
    """
    cache = cls._level_instances
    try:
        return cache[value]
    except KeyError:
        pass

    created = int.__new__(cls, value)
    created.name = logging.getLevelName(value)
    cache[value] = created
    return created
def test_log_state():
    """Check that ``log_state`` writes a line in the STATE format."""
    level_name = logging.getLevelName(logging.DEBUG)
    text = '{0}: State Msg'.format(level_name)
    pattern = '^{0}.*STATE - {1}'.format(level_name, text)

    context = LinchpinCliContext()
    context.load_config(config_path)
    context.setup_logging()
    context.log_state(text)

    # Only the first line of the log file is checked.
    with open(logfile) as handle:
        first_line = handle.readline()
        assert_regexp_matches(first_line, pattern)
def test_log_info():
    """Check that ``log_info`` writes a line starting with 'INFO'."""
    level = logging.INFO
    text = 'Info Msg'
    pattern = '^{0}.*{1}'.format(logging.getLevelName(level), text)

    context = LinchpinCliContext()
    context.load_config(config_path)
    context.setup_logging()
    context.log_info(text)

    # Only the first line of the log file is checked.
    with open(logfile) as handle:
        first_line = handle.readline()
        assert_regexp_matches(first_line, pattern)