Python coloredlogs 模块,install() 实例源码

我们从 Python 开源项目中提取了以下 48 个代码示例,用于说明如何使用 coloredlogs.install()。

项目:Bluecat-CSV-Importer    作者:callmehjelleh    | 项目源码 | 文件源码
def __init__(self, filename, user_input, verbose, address, username, password, configuration, upload):
        """Store the constructor arguments and configure coloured logging.

        Every argument is stored unchanged on the attribute of the same name.
        ``verbose`` additionally selects the log level: DEBUG when truthy,
        WARNING otherwise.
        """
        self.user_input = user_input
        self.verbose = verbose
        self.bam_client = None
        self.logger = logging.getLogger(__name__)
        self.id_list = {}
        self.address = address
        self.username = username
        self.password = password
        self.configuration = configuration
        self.filename = filename
        self.upload = upload

        # coloredlogs.install() takes the layout through ``fmt``; ``format`` is
        # not one of its keywords, so the layout was never applied.
        # ``%(message)s`` is the interpolated text, whereas ``%(msg)s`` is the
        # raw template without its arguments merged in.
        coloredlogs.install(
            logger=self.logger,
            level='DEBUG' if self.verbose else 'WARNING',
            fmt="%(levelname)s:%(message)s",
        )

    # <summary>
    # Main entry point for the program
    # </summary>
    # <param name="export" type="boolean">
    # Determines application execution path (e.g. whether to import or export data to/from the BAM service)
项目:docker-deploy-kubernetes    作者:paralin    | 项目源码 | 文件源码
def __init__(self):
        """Parse the sub-command from ``sys.argv`` and dispatch to the method of that name.

        Prints the help text and exits with status 1 when the instance has no
        attribute named after the requested command.
        """
        parser = argparse.ArgumentParser(
                description="Deploys to kubernetes with azk.io",
                usage='''deploy <command> [<args>]

Commands:
    deploy      Builds and pushes new images, and syncs Kubernetes.
    push        Builds and pushes new images, but doesn't update Kubernetes.
    sync        Creates/updates kubernetes resources, but doesn't build/push anything.
    loadconfig  Just check the kubernetes config and connectivity.
    loadsource  Just check that the project source is mounted properly, and generate proposed Kubernetes resources.
                '''
                )
        parser.add_argument('command', help='Subcommand to run')
        self.target_setup = None
        self.api = None
        # NOTE(review): the whole argv tail is parsed although only ``command``
        # is declared, so any extra argument makes argparse abort - confirm
        # whether ``[<args>]`` in the usage text is meant to be supported.
        args = parser.parse_args(sys.argv[1:])
        coloredlogs.install(level='DEBUG', fmt='%(message)s', isatty=True)
        if not hasattr(self, args.command):
            parser.print_help()
            # sys.exit() instead of the interactive-only ``exit`` helper, which
            # is injected by ``site`` and is absent under ``python -S``.
            sys.exit(1)
        getattr(self, args.command)()
项目:factoriommo-agent    作者:factoriommo    | 项目源码 | 文件源码
def main():
    """Parse the daemon options, set up coloured logging and run FactorioMCd."""
    cli = ArgParser(default_config_files=['/etc/factoriomcd.ini', '~/.factoriomcd.ini'])
    cli.add('-d', '--debug', action='store_true')
    cli.add('-v', '--verbose', action='store_true')

    # Options that only carry a default value, registered in declaration order.
    defaulted_options = (
        ('--log-file', "/opt/factorio/server.out"),
        ('--server-id', "1"),
        ('--rcon-host', "localhost"),
        ('--rcon-password', "asdasd"),
        ('--rcon-port', 31337),
        ('--ws-url', "ws://127.0.0.1:8000/ws_v1/server_callback/1/"),
        ('--ws-password', "asdasd"),
    )
    for flag, default in defaulted_options:
        cli.add(flag, default=default)

    options = cli.parse_args()
    coloredlogs.install(level='DEBUG' if options.verbose else 'INFO')
    if options.verbose:
        logger.debug("FactorioMCd initializing...")

    FactorioMCd(options).run()
项目:almar    作者:scriptotek    | 项目源码 | 文件源码
def configure_logging(config):
    """Apply the dict-based logging ``config`` and add coloured terminal output.

    :param config: dictionary passed unchanged to ``logging.config.dictConfig``
    """
    logging.config.dictConfig(config)

    # By default the install() function installs a handler on the root logger,
    # this means that log messages from your code and log messages from the
    # libraries that you use will all show up on the terminal.
    # The minutes directive is %M; %I is the 12-hour clock hour and would
    # print the hour twice.
    coloredlogs.install(level=logging.DEBUG,
                        fmt='%(asctime)s %(levelname)-8s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')

    # Quieten the HTTP libraries.
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('urllib3.connectionpool').setLevel(logging.WARNING)

    if sys.stdout.isatty():
        colorama.init(autoreset=True)
    else:
        # We're being piped, so skip colors
        colorama.init(strip=True)
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def setup_logger(options):
    """Install coloured logging at DEBUG when ``options.verbose`` is set, else INFO."""
    styles = {
        'warn': {'color': 'yellow'},
        'error': {'color': 'red', 'bold': True},
    }
    coloredlogs.install(
        level='DEBUG' if options.verbose else 'INFO',
        level_styles=styles,
        fmt='%(message)s',
        isatty=True,
    )
    log.debug("Logger set to DEBUG")
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def log_warn_only():
    """Re-install coloured logging at WARNING so gen.generate() log.info
    output is not shown."""
    styles = {
        'warn': {'color': 'yellow'},
        'error': {'color': 'red', 'bold': True},
    }
    coloredlogs.install(level='WARNING', level_styles=styles, fmt='%(message)s', isatty=True)
项目:qas    作者:kusha    | 项目源码 | 文件源码
def __init__(self, logging_queue=None, debug=False):
        """Create the JSON formatter and, once per instance, the logger.

        :param logging_queue: when not None, an ``InterfaceHandler`` built
            from it is attached to ``self.log`` using the JSON formatter
        :param debug: selects DEBUG instead of INFO for coloredlogs
        """
        self.json_formatter = logging.Formatter(
            '{"time": "%(asctime)s", "level": "%(levelname)s", "data": "%(message)s"}'
        )

        # A logger is already attached: leave it untouched.
        if hasattr(self, 'log'):
            return

        self.log = logging.getLogger(self.__class__.__name__)
        coloredlogs.install(level='DEBUG' if debug else 'INFO')

        # Without a queue there is nothing further to wire up.
        if logging_queue is None:
            return

        queue_handler = InterfaceHandler(logging_queue)
        queue_handler.setFormatter(self.json_formatter)
        self.log.addHandler(queue_handler)
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def __init__(self, ws_root, config=None, ws_name=None, log_level=None):
        """Initialise the workspace from ``config`` or from the defaults.

        ``ws_name`` and ``log_level`` are only applied when no ``config`` is
        supplied, on top of whatever ``load_default_config()`` produced.
        """
        self._ws_root = ws_root
        self._ws_config = config if config else dict()

        if not config:
            self.load_default_config()
            if ws_name:
                self.config['name'] = ws_name
            if log_level:
                self.config['log_level'] = log_level

        coloredlogs.install(level=self.config['log_level'])
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def __init__(self, workspace, preload=False):
        """
        Set up the validator state from the given workspace.

        :param workspace: object providing ``log_level``,
                          ``schemas_local_master`` and ``schemas_remote_master``
        :param preload: when true, ``preload_local_schemas()`` is called at
                        the end of construction
        """
        # Assign parameters
        coloredlogs.install(level=workspace.log_level)
        self._workspace = workspace
        self._schemas_local_master = workspace.schemas_local_master
        self._schemas_remote_master = workspace.schemas_remote_master

        self._schemas = {}

        # Configure location for schemas
        # (runs after the attributes above have been assigned)
        self.config_schema_locations()

        # Keep a library of loaded schemas to avoid re-loading
        self._schemas_library = dict()

        self._error_msg = ''

        # if preload, load local cached schema files
        if preload:
            self.preload_local_schemas()
项目:aem-aws-stack-builder    作者:shinesolutions    | 项目源码 | 文件源码
def get_logger(args, name=_default_logger_name):
    """Configure logging (coloured when possible) and return the named logger.

    :param args: object with ``quiet`` and ``verbose`` attributes, forwarded
        to ``_set_logging_level``
    :param name: name of the logger to return
    """
    try:
        import coloredlogs
        coloredlogs.install(
            isatty=True,
            show_name=False,
            show_severity=False,
            level=logging.NOTSET,
            severity_to_style={'DEBUG': {'color': 'blue'}},
        )
    except Exception:
        # Best-effort fallback to plain logging when coloredlogs is missing or
        # rejects the options. ``Exception`` rather than a bare ``except`` so
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        logging.basicConfig(
            stream=sys.stdout,
            format='%(asctime)s ' + ghn() + ' %(levelname)-8s %(message)s',
            datefmt="%Y-%m-%d %H:%M:%S",
            level=logging.NOTSET,
        )
    _set_logging_level(args.quiet, args.verbose)
    return logging.getLogger(name)
项目:OpenDoor    作者:potapo    | 项目源码 | 文件源码
def log(level):
        """Return a VerboseLogger whose level is set from the level name ``level``."""

        try:
            import logging
            import verboselogs
        except ImportError:
            sys.exit("""You need logging , verboselogs!
                            install it from http://pypi.python.org/pypi
                            or run pip install logging verboselogs""")

        # ``level`` is looked up as an attribute of the logging module.
        verbose_logger = verboselogs.VerboseLogger('')
        verbose_logger.setLevel(getattr(logging, level))
        return verbose_logger
项目:apocalypse    作者:dhoomakethu    | 项目源码 | 文件源码
def configure_logger(
        logger=None,
        log_level='DEBUG',
        no_log=False,
        file_log=False,
        console_log=True,
        log_file=None):
    """Attach console and/or file handlers to ``logger``.

    :param logger: logger to configure; ``get_logger()`` is used when falsy
    :param log_level: level name, upper-cased before use
    :param no_log: when true, only a NullHandler is attached and the level is
        set to ERROR; the remaining options are ignored
    :param file_log: enable the file handler (needs ``log_file``)
    :param console_log: enable console output
    :param log_file: path of the log file
    """
    if not logger:
        logger = get_logger()

    if no_log:
        logger.setLevel(logging.ERROR)
        logger.addHandler(logging.NullHandler())
    else:
        level_name = log_level.upper()
        logger.setLevel(level_name)
        fmt = (
            "%(asctime)s - %(message)s"
        )
        fmtr = formatter()
        if console_log:
            if USE_COLORED_LOGS:
                coloredlogs.install(level=os.environ.get('COLOREDLOGS_LOG_LEVEL', level_name),
                                    fmt=fmt,
                                    field_styles=FIELD_STYLES,
                                    level_styles=LEVEL_STYLES,
                                    overridefmt=LEVEL_FORMATS)
            else:
                sh = logging.StreamHandler()
                sh.setFormatter(fmtr)
                sh.setLevel(level_name)
                logger.addHandler(sh)

        if file_log:
            if log_file is not None:
                func_log = os.path.abspath(log_file)
                # os.mkdir() raised when the directory already existed or when
                # intermediate directories were missing; create it only when
                # needed, parents included.
                log_dir = os.path.dirname(func_log)
                if not os.path.isdir(log_dir):
                    os.makedirs(log_dir)
                fh = logging.FileHandler(func_log)
                fh.setFormatter(fmtr)
                # Upper-cased like the other handlers: setLevel() rejects
                # lower-case level names.
                fh.setLevel(level_name)
                logger.addHandler(fh)
项目:vj4    作者:vijos    | 项目源码 | 文件源码
def main():
  """Configure logging, bind the listening socket, prefork and run the web app.

  Returns 1 when ``options.listen`` uses a scheme other than ``http`` or
  ``unix``.
  """
  log_level = logging.DEBUG if options.debug else logging.INFO
  if not options.syslog:
    coloredlogs.install(level=log_level,
                        fmt='[%(levelname).1s %(asctime)s %(module)s:%(lineno)d] %(message)s',
                        datefmt='%y%m%d %H:%M:%S')
  else:
    syslog.enable_system_logging(level=log_level,
                                 fmt='vj4[%(process)d] %(programname)s %(levelname).1s %(message)s')
  logging.getLogger('sockjs').setLevel(logging.WARNING)
  url = urllib.parse.urlparse(options.listen)
  if url.scheme == 'http':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    host, port_str = url.netloc.rsplit(':', 1)
    sock.bind((host, int(port_str)))
  elif url.scheme == 'unix':
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # Remove a stale socket file so bind() can recreate it.
    try:
      os.remove(url.path)
    except FileNotFoundError:
      pass
    sock.bind(url.path)
  else:
    _logger.error('Invalid listening scheme %s', url.scheme)
    return 1
  for i in range(1, options.prefork):
    pid = os.fork()
    if not pid:
      # Child process: stop forking and go serve.
      break
    else:
      # Pass the pid as an argument so each handler keeps its own child pid.
      # A lambda closing over ``pid`` is late-bound: every handler would see
      # only the last value assigned (0 inside a forked child, which would
      # signal the whole process group).
      atexit.register(os.kill, pid, signal.SIGTERM)
  web.run_app(app.Application(), sock=sock, access_log=None, shutdown_timeout=0)
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def set_log_level(level):
    """Apply ``level`` through coloredlogs first and logging.basicConfig second."""
    for configure in (coloredlogs.install, logging.basicConfig):
        configure(level=level)


# Main menu
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def set_log_level(level):
    """Apply ``level`` through coloredlogs first and logging.basicConfig second."""
    for configure in (coloredlogs.install, logging.basicConfig):
        configure(level=level)
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def set_log_level(level):
    """Apply ``level`` through coloredlogs first and logging.basicConfig second."""
    for configure in (coloredlogs.install, logging.basicConfig):
        configure(level=level)


# Main menu
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def set_colored_log_level():
    """Install coloured logging at verboselogs' SPAM level and rebind the
    module-level ``logger`` to a fresh VerboseLogger named "dev"."""
    global logger
    spam_level = verboselogs.SPAM
    debug_format = conf.LOG_FORMAT_DEBUG
    coloredlogs.install(fmt=debug_format, datefmt="%m%d %H:%M:%S", level=spam_level)
    logging.basicConfig(format=debug_format, level=spam_level)
    logger = verboselogs.VerboseLogger("dev")


# for logger color reset during test
项目:webextaware    作者:cr    | 项目源码 | 文件源码
def main():
    """Run the selected mode and return its exit status.

    Returns 5 when interrupted from the keyboard, the non-zero result of
    ``modes.run`` on failure, and None otherwise.
    """
    args = get_args()

    if args.debug:
        coloredlogs.install(level="DEBUG")

    logger.debug("Command arguments: %s" % args)

    # Set the soft open-file limit to 10000, capped by the hard limit.
    current_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    (_, hard_limit) = current_limit
    wanted_limit = (min(10000, hard_limit), hard_limit)
    logger.debug("Raising open file limit from %s to %s" % (repr(current_limit), repr(wanted_limit)))
    resource.setrlimit(resource.RLIMIT_NOFILE, wanted_limit)

    try:
        result = modes.run(args)
    except KeyboardInterrupt:
        logger.critical("User abort")
        return 5

    if result != 0:
        logger.error("Command failed")
        return result
    return None
项目:ttn-coverage-tracker    作者:ttn-zh    | 项目源码 | 文件源码
def main():
    """Parse the arguments, configure logging and import the new features."""
    args = parse_args()

    loglevel = logging.INFO if args.verbose is None else logging.DEBUG
    logging.basicConfig(level=loglevel)
    # Hand the chosen level to coloredlogs as well: without it the installed
    # handler uses coloredlogs' own default level and the verbose flag has no
    # visible effect on the terminal output.
    coloredlogs.install(level=loglevel)

    import_new_features(args.node_eui, args.exclude)
项目:rpl-attacks    作者:dhondta    | 项目源码 | 文件源码
def set_logging(lvl='info'):
    """Configure logging for ``lvl`` and report whether the level was accepted.

    ``lvl`` is either one of the integer values of ``LOG_LEVELS`` or one of
    its keys. Returns False when it is neither, True otherwise.
    """
    global logger
    already_numeric = isinstance(lvl, int) and lvl in LOG_LEVELS.values()
    if not already_numeric:
        try:
            lvl = LOG_LEVELS[lvl]
        except KeyError:
            return False
    logging.basicConfig(format=LOG_FORMAT, level=lvl)
    if coloredlogs is not None:
        coloredlogs.install(lvl, fmt=LOG_FORMAT)
    if logger is not None:
        logger.setLevel(lvl)
    return True
项目:yajuu    作者:vivescere    | 项目源码 | 文件源码
def __init__(self, media):
        """Create the scraper session, the result containers and a per-class logger.

        :param media: stored unchanged on ``self.media``
        """
        import cfscrape

        self.session = cfscrape.create_scraper()
        self.media = media
        self.links = []
        self.sources = SourceList()

        class_name = self.__class__.__name__
        self.logger = logging.getLogger(__name__ + '.' + class_name)

        # The class name is baked into the format, wrapped in ANSI escapes.
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(logging.Formatter(
            '%(levelname)s - \033[92m{}\033[0m: %(message)s'.format(class_name)
        ))

        # Prevent the messages from being propagated to the root logger
        self.logger.propagate = 0

        self.logger.addHandler(stream_handler)
        coloredlogs.install(level='DEBUG')
项目:chat-roulette-python    作者:ph4r05    | 项目源码 | 文件源码
def client(ip, port, message):
    """Send ``message`` to ``(ip, port)`` over TCP and print the reply.

    :param ip: host to connect to
    :param port: TCP port to connect to
    :param message: payload handed to ``socket.sendall``
    """
    coloredlogs.install()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect() sits inside the try so the socket is also closed when the
        # connection attempt fails.
        sock.connect((ip, port))
        print('sending %s to %s' % (message, (ip, port)))
        sock.sendall(message)
        response = sock.recv(1024)
        # Function-call form: consistent with the print above, and valid on
        # Python 3 as well as Python 2.
        print("Received: {}".format(response))
    finally:
        sock.close()
项目:wechannel    作者:PegasusWang    | 项目源码 | 文件源码
def logged(class_, use_colore=True):
    """Class decorator that attaches a ``logger`` attribute named after the class.

    :param class_: class receiving the ``logger`` attribute; it is returned
    :param use_colore: when true, coloredlogs is installed at INFO first
    """
    if use_colore:
        coloredlogs.install(level=logging.INFO)
    # Keep the requests library quiet below WARNING.
    logging.getLogger("requests").setLevel(logging.WARNING)
    class_.logger = logging.getLogger(class_.__name__)
    return class_
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def __init__(self, workspace, prj_root, config=None):
        """Bind the project to ``workspace`` and ``prj_root``.

        Uses ``config`` when it is truthy, otherwise loads the default
        project configuration.
        """
        coloredlogs.install(level=workspace.log_level)
        self._workspace = workspace
        self._prj_root = prj_root
        if not config:
            self.load_default_config()
        else:
            self._prj_config = config
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def log_level(self, value):
        """Store ``value`` under ``config['log_level']`` and re-run
        ``coloredlogs.install`` with it."""
        self.config['log_level'] = value
        coloredlogs.install(level=value)
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def __init__(self, workspace=None):
        """
        Initialize the Validator.
        A workspace may be provided for an easy parameter configuration,
        such as location and extension of descriptors, verbosity level, etc.
        When none is given, a ``Workspace('.', log_level='info')`` is created.
        :param workspace: SONATA workspace object
        """
        self._workspace = workspace
        # all three validation stages are enabled by default
        self._syntax = True
        self._integrity = True
        self._topology = True

        # create "virtual" workspace if not provided (don't actually create
        # file structure)
        if not self._workspace:
            self._workspace = Workspace('.', log_level='info')

        # load configurations from workspace
        self._dext = self._workspace.default_descriptor_extension
        self._dpath = '.'
        self._log_level = self._workspace.log_level

        # for package signature validation
        self._pkg_signature = None
        self._pkg_pubkey = None

        # configure logs
        coloredlogs.install(level=self._log_level)

        # descriptors storage
        self._storage = DescriptorStorage()

        # syntax validation
        # (the schema validator is built with preload=True)
        self._schema_validator = SchemaValidator(self._workspace, preload=True)

        # reset event logger
        evtlog.reset()

        self.source_id = None

        self._fwgraphs = dict()
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def configure(self, syntax=None, integrity=None, topology=None,
                  dpath=None, dext=None, debug=None, pkg_signature=None,
                  pkg_pubkey=None):
        """
        Update the validation settings. Only arguments that are not None
        are applied; everything else keeps its current value.
        :param syntax: specifies whether to validate syntax
        :param integrity: specifies whether to validate integrity
        :param topology: specifies whether to validate network topology
        :param dpath: directory to search for function descriptors (VNFDs)
        :param dext: extension of descriptor files (default: 'yml')
        :param debug: True selects the 'debug' log level, False selects 'info'
        :param pkg_signature: String package signature to be validated
        :param pkg_pubkey: String package public key to verify signature
        """
        # plain settings, applied in the same order as before
        plain_settings = (
            ('_syntax', syntax),
            ('_integrity', integrity),
            ('_topology', topology),
            ('_dext', dext),
            ('_dpath', dpath),
        )
        for attribute, value in plain_settings:
            if value is not None:
                setattr(self, attribute, value)

        # only the two boolean singletons change the verbosity
        if debug is True or debug is False:
            level_name = 'debug' if debug is True else 'info'
            self._workspace.log_level = level_name
            coloredlogs.install(level=level_name)

        # package signature material
        for attribute, value in (('_pkg_signature', pkg_signature),
                                 ('_pkg_pubkey', pkg_pubkey)):
            if value is not None:
                setattr(self, attribute, value)
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def __init__(self, args):
        """Record the start time, copy the relevant arguments and set up logging.

        :param args: parsed arguments; ``ped`` is rewritten by joining it onto
            the current working directory
        """
        self.start_time = time.time()
        self.service_experiments = []
        self.function_experiments = []
        self.generated_services = []
        # arguments
        self.args = args
        self.args.ped = os.path.join(os.getcwd(), self.args.ped)
        self.work_dir = self.args.work_dir
        self.output_dir = self.args.output_dir
        # logging setup
        verbosity = "DEBUG" if args.verbose else "INFO"
        coloredlogs.install(level=verbosity)
        LOG.info("SONATA profiling tool initialized")
        LOG.debug("Arguments: %r" % self.args)
项目:ph0neutria    作者:phage-nz    | 项目源码 | 文件源码
def getModuleLogger(moduleName):
    """Return the logger named ``moduleName`` set to INFO, after (re)installing
    coloredlogs at INFO."""
    module_logger = logging.getLogger(moduleName)
    module_logger.setLevel(logging.INFO)
    coloredlogs.install(level='INFO')
    return module_logger
项目:agentzero    作者:gabrielfalcao    | 项目源码 | 文件源码
def run_pipeline():
    """Build the 'example1' pipeline, enqueue 300 generate-file jobs and loop."""
    steps = [GenerateFile, HashFile, RemoveFile]
    pipeline = Pipeline('example1', steps)
    coloredlogs.install(logging.DEBUG)
    for _ in range(300):
        gevent.spawn(
            pipeline.backend.enqueue_job,
            Job.new(dict(job_type='generate-file')),
        )

    pipeline.loop()
项目:agentzero    作者:gabrielfalcao    | 项目源码 | 文件源码
def run_generate_file_step():
    """Create a GenerateFile step, enable DEBUG coloured logging and run its loop."""
    step = GenerateFile()
    coloredlogs.install(logging.DEBUG)
    step.loop()
项目:agentzero    作者:gabrielfalcao    | 项目源码 | 文件源码
def run_hash_file_step():
    """Create a HashFile step, enable DEBUG coloured logging and run its loop."""
    hash_step = HashFile()
    coloredlogs.install(logging.DEBUG)
    hash_step.loop()
项目:agentzero    作者:gabrielfalcao    | 项目源码 | 文件源码
def run_remove_file_step():
    """Create a RemoveFile step, enable DEBUG coloured logging and run its loop."""
    remove_step = RemoveFile()
    coloredlogs.install(logging.DEBUG)
    remove_step.loop()
项目:triple-gan    作者:zhenxuan00    | 项目源码 | 文件源码
def build_log_file(cfg):
    """Set up file and coloured console logging, then log every entry of ``cfg``.

    :param cfg: mapping of configuration names to values; ``cfg['outfolder']``
        is the directory the dated log file is written to
    """
    FORMAT = "%(asctime)s;%(levelname)s|%(message)s"
    DATEF = "%H-%M-%S"
    # basicConfig() takes ``format``; ``formatter`` is not a recognised keyword
    # and makes Python 3 raise ValueError.
    logging.basicConfig(format=FORMAT, level=logging.DEBUG)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # One log file per calendar day (month-day) inside the output folder.
    fh = logging.FileHandler(filename=os.path.join(cfg['outfolder'], 'logfile'+time.strftime("%m-%d")+'.log'))
    fh.setLevel(logging.DEBUG)
    formatter = logging.Formatter(FORMAT, "%H:%M:%S")
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    LEVEL_STYLES = dict(
        debug=dict(color='magenta'),
        info=dict(color='green'),
        verbose=dict(),
        warning=dict(color='blue'),
        error=dict(color='yellow'),
        critical=dict(color='red', bold=True))
    coloredlogs.install(level=logging.DEBUG, fmt=FORMAT, datefmt=DATEF, level_styles=LEVEL_STYLES)

    # Dump the configuration sorted by parameter name.
    args_dict = cfg
    sorted_args = sorted(args_dict.items(), key=operator.itemgetter(0))
    logging.info('######################################################')
    logging.info('# --Configurable Parameters In this Model--')
    for name, val in sorted_args:
        logging.info("# " + name + ":\t" + str(get_list_name(val)))
    logging.info('######################################################')
项目:OpenDoor    作者:potapo    | 项目源码 | 文件源码
def info(message, showtime=True, title=True):
        """Log ``message`` at INFO level, with a timestamped layout when
        ``showtime`` equals True. ``title`` is not used."""

        level = 'INFO'
        timestamped = '[%(asctime)s] %(levelname)s : %(message)s'
        layout = timestamped if True == showtime else '%(message)s'
        coloredlogs.install(level=level, fmt=layout)
        Logger.log(level).info(message)
项目:OpenDoor    作者:potapo    | 项目源码 | 文件源码
def critical(message, showtime=True, title=True):
        # type: (object, object, object) -> object
        """Log ``message`` at CRITICAL level and terminate the process.

        The timestamped layout is used when ``showtime`` equals True.
        ``title`` is not used.
        """

        level = 'CRITICAL'
        timestamped = '[%(asctime)s] %(levelname)s : %(message)s'
        layout = timestamped if True == showtime else '%(message)s'
        coloredlogs.install(level=level, fmt=layout)
        # NOTE(review): the return value of logger.critical() is what gets
        # passed to sys.exit(); presumably that is None, i.e. exit status 0 -
        # confirm this is intended for a critical failure.
        sys.exit(Logger.log(level).critical(message))
项目:OpenDoor    作者:potapo    | 项目源码 | 文件源码
def debug(message, showtime=True):
        """Log ``message`` at DEBUG level, with a timestamped layout when
        ``showtime`` equals True."""

        level = 'DEBUG'
        timestamped = '[%(asctime)s] %(levelname)s : %(message)s'
        layout = timestamped if True == showtime else '%(message)s'
        coloredlogs.install(level=level, fmt=layout)
        Logger.log(level).debug(message)
项目:jenskipper    作者:Stupeflix    | 项目源码 | 文件源码
def main(log_level):
    '''
    Pilot Jenkins from the command line.
    '''
    # Install coloured logging at the requested level with a compact
    # "<LEVEL> <message>" layout.
    # NOTE(review): the docstring above is presumably used as CLI help text
    # by a decorator outside this view - confirm before rewording it.
    coloredlogs.install(
        fmt='%(levelname)s %(message)s',
        level=log_level
    )
项目:plural    作者:gabrielfalcao    | 项目源码 | 文件源码
def execute_plural_server():  # pragma: no cover
    """Parse the command line and run a GraphServer until interrupted."""
    cli = argparse.ArgumentParser(
        prog='plural-server',
        description='ZMQ Reply Server that executes queries')
    cli.add_argument('-b', '--reply-bind-addr',
                     default='tcp://*:6000',
                     help='ZMQ address to bind to')
    cli.add_argument('--color',
                     action='store_true',
                     default=True,
                     help='colored logs')
    cli.add_argument('-l', '--log-level',
                     default='DEBUG',
                     choices=('DEBUG', 'INFO', 'WARNING'))

    args = cli.parse_args()
    server = GraphServer()
    if args.color:
        coloredlogs.install(level=args.log_level)

    logging.warning('EXPERIMENTAL FEATURE: server')
    try:
        server.run(args.reply_bind_addr)
    except KeyboardInterrupt:
        server.stop()
        # Emit a carriage return on both streams, then flush them.
        streams = (sys.stdout, sys.stderr)
        for stream in streams:
            stream.write("\r")
        for stream in streams:
            stream.flush()
项目:wechat2instapaper    作者:chanjh    | 项目源码 | 文件源码
def main():
    """Create the bot, enable DEBUG coloured logging and start the bot."""
    webwx = MyWXBot()

    # NOTE(review): ``logger`` is assigned but not used in this function.
    logger = logging.getLogger(__name__)
    # Imported here rather than at module level; logging is installed after
    # the bot has been constructed and before it is started.
    import coloredlogs
    coloredlogs.install(level='DEBUG')


    webwx.start()
项目:vj4    作者:vijos    | 项目源码 | 文件源码
def invoke_by_args():
  """Expose every callable in ``_methods`` as an argparse sub-command and run
  the one selected by ``options.leftovers``.

  The result is printed (pretty-printed when ``options.pretty`` is set);
  coroutine results are awaited and async iterables are printed entry by
  entry.
  """
  import argparse
  import asyncio
  import coloredlogs
  import inspect
  import pprint
  coloredlogs.install(fmt='[%(levelname).1s %(asctime)s %(module)s:%(lineno)d] %(message)s',
                      datefmt='%y%m%d %H:%M:%S')
  parser = argparse.ArgumentParser()
  # The chosen sub-command is stored under the empty attribute name so it
  # cannot collide with a method parameter name.
  subparsers = parser.add_subparsers(dest='')
  for name, method in _methods.items():
    subparser = subparsers.add_parser(name)
    argcount = method.__code__.co_argcount
    num_defaults = len(method.__defaults__) if method.__defaults__ else 0
    # Index of the first positional parameter that has a default value.
    argoffset = argcount - num_defaults
    for index, argname in enumerate(method.__code__.co_varnames[:argcount]):
      if index < argoffset:
        # Parameter without default: required, converted via its annotation.
        subparser.add_argument(argname, type=method.__annotations__[argname])
      elif argname in method.__annotations__:
        # Annotated parameter with default: optional positional argument.
        # Parameters with a default but no annotation are not exposed.
        subparser.add_argument(argname,
                               type=method.__annotations__[argname],
                               nargs='?',
                               default=method.__defaults__[index - argoffset])
  args = parser.parse_args(options.leftovers)
  name = getattr(args, '')
  # Drop the sub-command name so vars(args) only holds the method's arguments.
  delattr(args, '')
  if not name:
    parser.print_help()
  else:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(db.init())
    try:
      result = _methods[name](**vars(args))
      if inspect.iscoroutine(result):
        result = loop.run_until_complete(result)
      if options.pretty:
        print_func = pprint.pprint
      else:
        # Plain mode prints nothing for a None result.
        print_func = lambda x: print(x) if x is not None else None
      if hasattr(result, '__aiter__'):
        # Async iterable: drain it on the loop, printing each entry.
        async def aloop():
          async for entry in result:
            print_func(entry)

        loop.run_until_complete(aloop())
      else:
        print_func(result)
    except KeyboardInterrupt:
      pass
    finally:
      # Install an exception handler that ignores whatever the loop reports
      # from here on.
      loop.set_exception_handler(lambda loop, context: None)
项目:aetros-cli    作者:aetros    | 项目源码 | 文件源码
def main(args=None):
    """Dispatch the command line to the matching aetros command class.

    :param args: argument list; defaults to ``sys.argv[1:]``

    Always ends in ``sys.exit``: 0 for ``--version``, 1 for an unknown
    command, otherwise the value returned by the command's ``main``.
    """
    if args is None:
        args = sys.argv[1:]

    cmd_name, cmd_args = parseopts(args)
    if cmd_name == '--version':
        print(aetros.const.__version__)
        sys.exit(0)

    # Command classes are imported only once a command is actually dispatched.
    from aetros.commands.ApiCommand import ApiCommand
    from aetros.commands.PushJobCommand import PushJobCommand
    from aetros.commands.PullJobCommand import PullJobCommand
    from aetros.commands.ServerCommand import ServerCommand
    from aetros.commands.PredictCommand import PredictCommand
    from aetros.commands.PredictionServerCommand import PredictionServerCommand
    from aetros.commands.StartCommand import StartCommand
    from aetros.commands.StartSimpleCommand import StartSimpleCommand
    from aetros.commands.RunCommand import RunCommand
    from aetros.commands.AddCommand import AddCommand
    from aetros.commands.InitCommand import InitCommand
    from aetros.commands.IdCommand import IdCommand
    from aetros.commands.GPUCommand import GPUCommand
    from aetros.commands.AuthenticateCommand import AuthenticateCommand

    commands_dict = {
        'start': StartCommand,
        'start-simple': StartSimpleCommand,
        'authenticate': AuthenticateCommand,
        'predict': PredictCommand,
        'prediction-server': PredictionServerCommand,
        'server': ServerCommand,
        'run': RunCommand,
        'api': ApiCommand,
        'id': IdCommand,
        'push-job': PushJobCommand,
        'pull-job': PullJobCommand,
        'add': AddCommand,
        'init': InitCommand,
        'gpu': GPUCommand,
    }

    if cmd_name not in commands_dict:
        print(("Command %s not found" % (cmd_name,)))
        sys.exit(1)

    debug_requested = '--debug' in args or os.getenv('DEBUG') == '1'
    level = 'DEBUG' if debug_requested else 'INFO'

    # None leaves the terminal detection to coloredlogs; AETROS_ATTY=1 forces it.
    atty = True if '1' == os.getenv('AETROS_ATTY') else None

    import coloredlogs
    logger = logging.getLogger('aetros-'+cmd_name)
    coloredlogs.install(level=level, logger=logger, isatty=atty)
    command = commands_dict[cmd_name](logger)

    sys.exit(command.main(cmd_args))
项目:bridgy    作者:wagoodman    | 项目源码 | 文件源码
def main():
    """Entry point: load the configuration, parse the command line and run
    the handler of every selected sub-command.

    Exits with status 1 when run as root or when a handler raises
    ``utils.UnsupportedPlatform``, and with status 0 after printing the
    version.
    """
    coloredlogs.install(fmt='%(message)s')

    if os.geteuid() == 0:
        logger.error("Do not run this as root")
        sys.exit(1)

    config = cfg.Config()
    config.create()
    config.read()
    config.verify()

    version = 'bridgy %s' % __version__
    args = docopt(__doc__, version=version)

    if not tmux.is_installed():
        if args['--tmux'] or config.dig('ssh', 'tmux'):
            # Logger.warning() is the supported spelling; warn() is a
            # deprecated alias that newer Python versions no longer provide.
            logger.warning("Tmux not installed. Cannot support split screen.")
        args['--tmux'] = False

    if args['-v']:
        coloredlogs.install(fmt='%(message)s', level='DEBUG')

    if args['-d']:
        # A dry run implies verbose output.
        args['-v'] = True
        coloredlogs.install(fmt='%(message)s', level='DEBUG')
        logger.warning("Performing dry run, no actions will be taken.")

    # why doesn't docopt pick up on this?
    if args['-t']:
        args['--tmux'] = True

    if args['--version']:
        logger.info(version)
        sys.exit(0)

    opts = {
        'ssh': ssh_handler,
        'mount': mount_handler,
        'list-mounts': list_mounts_handler,
        'list-inventory': list_inventory_handler,
        'unmount': unmount_handler,
        'update': update_handler,
        'run': run_handler,
    }

    for opt, handler in opts.items():
        if args[opt]:
            try:
                handler(args, config)
            except utils.UnsupportedPlatform as ex:
                # Exceptions have no ``message`` attribute on Python 3 unless
                # the class defines one; fall back to the exception itself.
                logger.error(getattr(ex, 'message', ex))
                sys.exit(1)
项目:python-qpass    作者:xolox    | 项目源码 | 文件源码
def main():
    """Command line interface for the ``qpass`` program."""
    # Initialize logging to the terminal.
    coloredlogs.install()
    # Prepare for command line argument parsing.
    # Showing the matching entry is the default action; the clipboard is used
    # when is_clipboard_supported() says so.
    action = show_matching_entry
    program_opts = dict()
    show_opts = dict(use_clipboard=is_clipboard_supported())
    # Parse the command line arguments.
    try:
        options, arguments = getopt.gnu_getopt(sys.argv[1:], 'elnp:vqh', [
            'edit', 'list', 'no-clipboard', 'password-store=',
            'verbose', 'quiet', 'help',
        ])
        for option, value in options:
            if option in ('-e', '--edit'):
                action = edit_matching_entry
            elif option in ('-l', '--list'):
                action = list_matching_entries
            elif option in ('-n', '--no-clipboard'):
                show_opts['use_clipboard'] = False
            elif option in ('-p', '--password-store'):
                # The option may be repeated; every store is collected.
                stores = program_opts.setdefault('stores', [])
                stores.append(PasswordStore(directory=value))
            elif option in ('-v', '--verbose'):
                coloredlogs.increase_verbosity()
            elif option in ('-q', '--quiet'):
                coloredlogs.decrease_verbosity()
            elif option in ('-h', '--help'):
                usage(__doc__)
                return
            else:
                raise Exception("Unhandled option! (programming error)")
        # Every action except listing needs at least one positional argument.
        if not (arguments or action == list_matching_entries):
            usage(__doc__)
            return
    except Exception as e:
        # Parsing problems are reported and end the program with status 1.
        warning("Error: %s", e)
        sys.exit(1)
    # Execute the requested action.
    try:
        # show_opts only applies to the default "show" action.
        action(QuickPass(**program_opts), arguments,
               **(show_opts if action == show_matching_entry else {}))
    except PasswordStoreError as e:
        # Known issues don't get a traceback.
        logger.error("%s", e)
        sys.exit(1)
    except KeyboardInterrupt:
        # If the user interrupted an interactive prompt they most likely did so
        # intentionally, so there's no point in generating more output here.
        sys.exit(1)
项目:axibot    作者:storborg    | 项目源码 | 文件源码
def main(args=None):
    """Command line entry point: parse arguments and run a sub-command.

    Builds the argument parser with the ``plan``, ``plot``, ``info``,
    ``server`` and ``manual`` sub-commands, configures logging according to
    ``--verbose`` and dispatches to the handler registered for the selected
    sub-command.

    :param args: Full argument vector, with the program name at index 0
        (it is skipped before parsing). When omitted, the current value of
        ``sys.argv`` is used.
    :returns: The return value of the selected sub-command handler, or
        ``None`` after printing the help text when no sub-command was given.
    """
    if args is None:
        # Look up sys.argv at call time. A ``args=sys.argv`` default is
        # evaluated once, when the function is defined, and would miss any
        # later rebinding of sys.argv.
        args = sys.argv

    p = argparse.ArgumentParser(description='Print with the AxiDraw.')
    p.add_argument('--verbose', action='store_true')
    p.add_argument('--mock', action='store_true')
    # No sub-command selected -> ``function`` stays None and help is printed.
    p.set_defaults(function=None)

    subparsers = p.add_subparsers(help='sub-command help')

    p_plan = subparsers.add_parser(
        'plan', help='Plan the plotting actions for an SVG file.')
    p_plan.add_argument('filename')
    p_plan.add_argument('--out')
    p_plan.add_argument('--overwrite', action='store_true')
    p_plan.set_defaults(function=plan)

    p_plot = subparsers.add_parser(
        'plot', help='Plot an SVG file directly.')
    p_plot.add_argument('filename')
    p_plot.set_defaults(function=plot)

    p_info = subparsers.add_parser(
        'info', help='Print information about an SVG file.')
    p_info.add_argument('filename')
    p_info.set_defaults(function=info)

    p_server = subparsers.add_parser(
        'server', help='Run a server for remote plotting.')
    p_server.add_argument('--port', type=int, default=8888)
    p_server.set_defaults(function=server)

    p_manual = subparsers.add_parser(
        'manual', help='Manual control shell.')
    p_manual.add_argument('cmd', nargs='*')
    p_manual.set_defaults(function=manual)

    # Unrecognised arguments are tolerated; the leftovers are not used here.
    opts, args = p.parse_known_args(args[1:])

    # NOTE(review): ``coloredlogs`` is presumably set to a falsy value when
    # the optional import failed -- confirm at the top of the file.
    if coloredlogs:
        coloredlogs.install(level='DEBUG' if opts.verbose else 'INFO')
    else:
        logging.basicConfig(level=logging.DEBUG if opts.verbose else
                            logging.INFO)

    if opts.function:
        return opts.function(opts)
    else:
        p.print_help()
项目:chat-roulette-python    作者:ph4r05    | 项目源码 | 文件源码
def app_main(self):
        """Parse the command line and run the command loop.

        The parsed options are stored on ``self.args`` and the
        ``--non-interactive`` flag on ``self.noninteractive``. While the
        command loop runs, ``sys.argv`` is replaced by the program name
        followed by the positional commands (plus ``quit`` in non-interactive
        mode); the original ``sys.argv`` is restored afterwards, even when
        the command loop raises.

        In non-interactive mode the process exits with ``self.last_result``
        as its status once the command loop has finished.
        """
        # Backup original arguments for later parsing
        args_src = sys.argv

        # Parse our argument list
        parser = argparse.ArgumentParser(description='Chat roulette python server')
        parser.add_argument('-n', '--non-interactive', dest='noninteractive', action='store_const', const=True,
                            help='non-interactive mode of operation, command line only')
        parser.add_argument('-r', '--attempts', dest='attempts', type=int, default=3,
                            help='number of attempts in non-interactive mode')
        parser.add_argument('-l', '--pid-lock', dest='pidlock', type=int, default=-1,
                            help='number of attempts for pidlock acquire')
        parser.add_argument('--debug', dest='debug', action='store_const', const=True,
                            help='enables debug mode')
        parser.add_argument('--verbose', dest='verbose', action='store_const', const=True,
                            help='enables verbose mode')
        parser.add_argument('--force', dest='force', action='store_const', const=True, default=False,
                            help='forces some action')

        parser.add_argument('commands', nargs=argparse.ZERO_OR_MORE, default=[],
                            help='commands to process')

        self.args = parser.parse_args(args=args_src[1:])
        self.noninteractive = self.args.noninteractive

        # Fixing cmd2 arg parsing: hand the command loop only the program
        # name and the positional commands, not our own option flags.
        forwarded_argv = [args_src[0]]
        forwarded_argv.extend(self.args.commands)

        # Terminate after execution is over on the non-interactive mode
        if self.noninteractive:
            forwarded_argv.append('quit')

        sys.argv = forwarded_argv
        try:
            if self.args.debug:
                coloredlogs.install(level=logging.DEBUG)

            self.cmdloop()
        finally:
            # Always put the caller's argument list back, including when the
            # command loop (or the logging setup) raises.
            sys.argv = args_src

        # Noninteractive - return the last result from the operation (for scripts)
        if self.noninteractive:
            sys.exit(self.last_result)
项目:wechannel    作者:PegasusWang    | 项目源码 | 文件源码
def retry(retries=CONFIG.CRAWLER.RETRY or 3, sleep=CONFIG.CRAWLER.SLEEP,
          changeip=False):
    """Decorator factory that retries a request function.

    The decorated function is expected to return an object exposing a
    ``status_code`` attribute. It is called up to ``retries`` times:

    * status 200, or one of 301, 302, 404, 500: stop and return the response;
    * any other status (e.g. 401 Unauthorized, 403 Forbidden,
      408 Request Timeout, 429 Too Many Requests, 503 Service Unavailable):
      optionally call ``change_ip()`` and try again;
    * ``Timeout``: optionally sleep, then try again;
    * ``TooManyRedirects``: give up immediately;
    * any other exception: try again.

    See also the ``retrying`` package (``pip install retrying``,
    https://github.com/rholder/retrying) and
    http://mp.weixin.qq.com/s?__biz=MzAwMDU1MTE1OQ==&mid=2653547274&idx=1&sn=52e5037b163146c1656eedce2da1ecd8&scene=1&srcid=0527MEXhNRZATtlTPhinD5Re#rd

    :param retries: maximum number of attempts.
    :param sleep: base delay in seconds after a ``Timeout``; a random extra
        1-10 seconds is added. ``None`` disables the delay.
    :param changeip: when true, call ``change_ip()`` before retrying a
        response whose status is not handled above.
    :returns: a decorator. The wrapped function returns the last response
        obtained, or ``None`` if the last attempt raised or no attempt was
        made.
    """
    def _retry(func):
        @wraps(func)
        def _wrapper(*args, **kwargs):
            # Pre-bind the result so that ``retries <= 0`` yields None
            # instead of an UnboundLocalError at the final return.
            response = None
            index = 0
            while index < retries:
                index += 1
                try:
                    response = func(*args, **kwargs)
                    if response.status_code in (301, 302, 404, 500):
                        print('status_code', response.status_code)
                        break
                    elif response.status_code != 200:
                        print(response.status_code)
                        if changeip:
                            change_ip()
                        continue
                    else:
                        break
                except Timeout:
                    response = None
                    if sleep is not None:
                        time.sleep(sleep + randint(1, 10))
                    continue
                except TooManyRedirects:
                    response = None
                    break
                except Exception:
                    # Deliberate best effort: any other failure discards the
                    # partial result and moves on to the next attempt.
                    response = None

            return response
        return _wrapper
    return _retry
项目:openadms-node    作者:dabamos    | 项目源码 | 文件源码
def setup_logging(is_quiet: bool = False,
                  is_debug: bool = False,
                  verbosity: int = 6,
                  log_file: str = 'openadms.log') -> None:
    """Sets up the logger and logging handlers.

    Installs the additional ``verboselogs`` levels, attaches a rotating file
    handler and a colorized console handler to the ``root`` logger, and adds
    a ``RootFilter`` to every handler of ``logging.root``.

    Args:
        is_quiet: Disable console output (console level is raised to 100).
        is_debug: Print debug messages (console/root level DEBUG instead of
            INFO).
        verbosity: Verbosity level of the log file (1 - 9, from CRITICAL
            down to SPAM). Values outside that range fall back to INFO, the
            level of verbosity 6.
        log_file: Path of the log file.
    """
    # Enable verbose logging.
    verboselogs.install()

    # Basic logging configuration.
    console_level = logging.DEBUG if is_debug else logging.INFO
    root.setLevel(console_level)

    fmt = '%(asctime)s - %(levelname)8s - %(name)26s - %(message)s'
    date_fmt = '%Y-%m-%dT%H:%M:%S'
    formatter = logging.Formatter(fmt, date_fmt)

    # File handler: translate the 1 - 9 verbosity scale into logging levels.
    verbosity_levels = {
        1: logging.CRITICAL,
        2: logging.ERROR,
        3: verboselogs.SUCCESS,
        4: logging.WARNING,
        5: verboselogs.NOTICE,
        6: logging.INFO,
        7: verboselogs.VERBOSE,
        8: logging.DEBUG,
        9: verboselogs.SPAM,
    }
    # The fallback has to be a logging level, not a verbosity index: a bare
    # ``6`` would be used as numeric level 6 rather than as INFO.
    file_level = verbosity_levels.get(verbosity, logging.INFO)

    fh = logging.handlers.RotatingFileHandler(log_file,
                                              maxBytes=LOG_FILE_MAX_SIZE,
                                              backupCount=LOG_FILE_BACKUP_COUNT,
                                              encoding='utf8')
    fh.setLevel(file_level)
    fh.setFormatter(formatter)
    root.addHandler(fh)

    if is_quiet:
        # Silence logger output.
        console_level = 100

    # Colorized output of log messages.
    coloredlogs.install(level=console_level,
                        fmt=fmt,
                        datefmt=date_fmt,
                        logger=root)

    # Add filter to log handlers, to exclude log messages from HBMQTT.
    for handler in logging.root.handlers:
        handler.addFilter(RootFilter())