Python argparse 模块,Namespace() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用argparse.Namespace()

项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def _parse_args(args):
    parser = argparse.ArgumentParser()

    if any([arg == '--version' for arg in args]):
        return argparse.Namespace(version=True)

    parser.add_argument('script', help='Script to run')
    parser.add_argument('target', nargs='?', default='build', help='Target object to build; defaults to \'build\'')

    parser.add_argument('--version', action='store_true', help='Print version info and exit')
    parser.add_argument('--clear', action='store_true', help='Clear output directory')
    parser.add_argument('--clear-cache', action='store_true', help='Clear cache before compiling')

    parser.add_argument('--threads', '-t', type=int, help='Set thread count; defaults to cores*2')
    parser.add_argument('--no-threading', '-nt', action='store_true', help='Disable multithreaded compiling')

    # TODO: Make target '*' instead of '?' so multiple targets could be ran from the same command

    return parser.parse_args(args)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def parse(self, argv):
        """
        Parse arguments.

        :param argv: arguments.
        """
        values = dict()

        self.data = None
        self._errors = list()

        for idx, param in enumerate(self.params):
            try:
                values[param['dest'] or param['name']] = self.parse_parameter(param, argv, idx)
            except ParamException as e:
                self._errors.append(str(e))

        self.data = Namespace(**values)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def list_action_remove(self, player, values, map_dictionary, view, **kwargs):
        # Check permission.
        if not await self.instance.permission_manager.has_permission(player, 'admin:remove_map'):
            await self.instance.chat(
                '$f00You don\'t have the permission to perform this action!',
                player
            )
            return

        # Ask for confirmation.
        cancel = bool(await ask_confirmation(player, 'Are you sure you want to remove the map \'{}\'$z$s from the server?'.format(
            map_dictionary['name']
        ), size='sm'))
        if cancel is True:
            return

        # Simulate command.
        await self.remove_map(player, Namespace(nr=map_dictionary['id']))

        # Reload parent view.
        await view.refresh(player)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def run_with_args(args, parser):
    # type: (argparse.Namespace, argparse.ArgumentParser) -> int
    set_logging_parameters(args, parser)
    start_time = time.time()
    ret = OK
    try:
        if args.profile:
            outline("Profiling...")
            profile("ret = whatstyle(args, parser)", locals(), globals())
        else:
            ret = whatstyle(args, parser)
    except IOError as exc:
        # If the output is piped into a pager like 'less' we get a broken pipe when
        # the pager is quit early and that is ok.
        if exc.errno == errno.EPIPE:
            pass
        elif str(exc) == 'Stream closed':
            pass
        else:
            raise
        if not PY2:
            sys.stderr.close()
    iprint(INFO_TIME, 'Run time: %s seconds' % (time.time() - start_time))
    return ret
项目:PyJFuzz    作者:mseclab    | 项目源码 | 文件源码
def test_start_object(self):
        server = PJFServer(configuration=PJFConfiguration(Namespace(ports={"servers": {"HTTP_PORT": 8080, "HTTPS_PORT": 8443}},
                                                   html=False, level=6, command=["radamsa"], stdin=True,
                                                   json={"a": "test"}, indent=True, strong_fuzz=False, url_encode=False,
                                                   parameters=[], notify=False, debug=False, content_type="text/plain",
                                                                    utf8=False, nologo=True)))
        server.run()
        json_http = urllib2.urlopen("http://127.0.0.1:8080").read()
        try:
            import requests
            requests.packages.urllib3.disable_warnings()
            json_https = requests.get('https://127.0.0.1:8443', verify=False).content
            self.assertTrue(json_https)
        except ImportError:
            pass
        self.assertTrue(json_http)
        server.stop()
项目:NoWannaNoCry    作者:dolang    | 项目源码 | 文件源码
def cli_args():
    """Parse the command line arguments.

    :return: The parsed arguments.
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--check', action='store_true',
                        help="check if the system is vulnerable to WCry")
    parser.add_argument('-m', '--mitigate', action='store_true',
                        help="mitigate the system's vulnerability by disabling the"
                             " SMBv1 protocol, if necessary; implies --check")
    parser.add_argument('-f', '--fix', action='store_true')
    parser.add_argument('--download-directory',
                        help="Optionally specify a directory where the Microsoft"
                             " KB update is saved when using --fix")
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)
    # else:
    return parser.parse_args()
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def register_command(handler: Callable[[argparse.Namespace], None],
                     main_parser: Optional[ArgParserType]=None,
                    ) -> Callable[[argparse.Namespace], None]:
    if main_parser is None:
        main_parser = global_argparser
    if id(main_parser) not in _subparsers:
        subparsers = main_parser.add_subparsers(title='commands',
                                                dest='command')
        _subparsers[id(main_parser)] = subparsers
    else:
        subparsers = _subparsers[id(main_parser)]

    @functools.wraps(handler)
    def wrapped(args):
        handler(args)

    doc_summary = handler.__doc__.split('\n\n')[0]
    inner_parser = subparsers.add_parser(handler.__name__.replace('_', '-'),
                                         description=handler.__doc__,
                                         help=doc_summary)
    inner_parser.set_defaults(function=wrapped)
    wrapped.register_command = functools.partial(register_command,
                                                 main_parser=inner_parser)
    wrapped.add_argument = inner_parser.add_argument
    return wrapped
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def __call__(
            self,
            parser,  # type: argparse.ArgumentParser
            namespace,  # type: argparse.Namespace
            values,  # type: Union[ARGPARSE_TEXT, Sequence[Any], None]
            option_string=None  # type: Optional[ARGPARSE_TEXT]
    ):
        # type: (...) -> None
        """Checks to make sure that the destination is empty before writing.

        :raises parser.error: if destination is already set
        """
        if getattr(namespace, self.dest) is not None:  # type: ignore # typeshed doesn't know about Action.dest yet?
            parser.error('{} argument may not be specified more than once'.format(option_string))
            return
        setattr(namespace, self.dest, values)  # type: ignore # typeshed doesn't know about Action.dest yet?
项目:Static-UPnP    作者:nigelb    | 项目源码 | 文件源码
def start(self):
        self.setup_sockets()
        import StaticUPnP_Settings
        permissions = Namespace(**StaticUPnP_Settings.permissions)
        print(permissions)
        if permissions.drop_permissions:
            self.drop_privileges(permissions.user, permissions.group)

        self.running = Value(ctypes.c_int, 1)
        self.queue = Queue()
        self.reciever_thread = Process(target=self.socket_handler, args=(self.queue, self.running))
        self.reciever_thread.start()
        self.schedule_thread = Process(target=self.schedule_handler, args=(self.running,))
        self.schedule_thread.start()
        self.response_thread = Process(target=self.response_handler, args=(self.queue, self.running))
        self.response_thread.start()
项目:Static-UPnP    作者:nigelb    | 项目源码 | 文件源码
def get_interface_addresses(logger):
    import StaticUPnP_Settings
    interface_config = Namespace(**StaticUPnP_Settings.interfaces)
    ip_addresses = StaticUPnP_Settings.ip_addresses
    if len(ip_addresses) == 0:
        import netifaces
        ifs = netifaces.interfaces()
        if len(interface_config.include) > 0:
            ifs = interface_config.include
        if len(interface_config.exclude) > 0:
            for iface in interface_config.exclude:
                ifs.remove(iface)

        for i in ifs:
            addrs = netifaces.ifaddresses(i)
            if netifaces.AF_INET in addrs:
                for addr in addrs[netifaces.AF_INET]:
                    ip_addresses.append(addr['addr'])
                    logger.info("Regestering multicast on %s: %s"%(i, addr['addr']))
    return ip_addresses
项目:kudubot    作者:namboy94    | 项目源码 | 文件源码
def parse_args() -> argparse.Namespace:  # pragma: no cover
    """
    Parses the Command Line Arguments using argparse

    :return: The parsed arguments
    """

    parser = argparse.ArgumentParser()
    parser.add_argument("connection", help="The Type of Connection to use")

    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Activates verbose output")
    parser.add_argument("-d", "--debug", action="store_true",
                        help="Activates debug-level logging output")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disables all text output")
    parser.add_argument("-c", "--config",
                        default=os.path.join(os.path.expanduser("~"),
                                             ".kudubot"),
                        help="Overrides the configuration directory location")

    return parser.parse_args()
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_init_2(tmpdir):
    "it should open sam file if provided"

    make_bam(tmpdir.strpath, """
             123456789_123456789_
        r1 + ...........
        r1 -       ......*....
        r2 +    .........*.
        r2 -        .....*.......
    """)

    o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath, gdna=None, output=None)

    init(o)

    assert isinstance(o.cfdna, AlignmentFile)
    assert o.gdna == None
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_init_3(tmpdir):
    "it should generate a proper output file name if not provided"

    make_bam(tmpdir.strpath, """
             123456789_123456789_
        r1 + ...........
        r1 -       ......*....
        r2 +    .........*.
        r2 -        .....*.......
    """)

    o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath, gdna=None, output=None)

    init(o)

    assert o.output != None
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_aggregate_reads_3():
    "it should ignore when 3+ reads share the same name"

    o = Namespace(verbos=False, qual=20, mismatch_limit=-1)

    reads = (
        ("r1", 'A', 60, 2, 11, -1, 2, 9,  False, True),
        ("r1", 'C', 60, 2, 11, -1, 2, -9, True, True),
        ("r1", 'C', 60, 2, 11, -1, 2, 9,  False,  True),
        ("r2", 'C', 60, 2, 11, -1, 0, 0,  True,  False)
    )

    unique_pairs, unique_single, _, nerror, *_ = aggregate_reads(o, reads)

    assert len(unique_pairs) == 0
    assert len(unique_single) == 1
    assert nerror == 3
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_aggregate_reads_4():
    "it should ignore when base in overlap area inconsistent between two reads"

    o = Namespace(verbos=False, qual=20, mismatch_limit=-1)

    reads = (
        ("r1", 'A', 60, 2, 11, -1, 4, 11,  False, True),
        ("r1", 'C', 60, 4, 13, -1, 2, -11, True, True),
        ("r2", 'C', 60, 3, 12, -1, 5, 11,  False, True),
        ("r2", 'C', 60, 5, 14, -1, 3, -11, True, True)
    )

    unique_pairs, unique_single, *_, ninconsis = aggregate_reads(o, reads)

    assert len(unique_pairs) == 1
    assert ninconsis == 2
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_aggregate_reads_5():
    "it should drop reads that has too much mismatch"

    o = Namespace(verbos=False, qual=20, mismatch_limit=2)

    reads = (
        ("r1", 'C', 60, 2, 11, 1, 4, 11,  False, True),
        ("r1", 'C', 60, 4, 13, 1, 2, -11, True, True),
        ("r2", 'C', 60, 3, 12, 3, 5, 11,  False, True),
        ("r2", 'C', 60, 5, 14, 1, 3, -11, True, True),
        ("r3", 'C', 60, 6, 14, 3, 0, 0, True, False),
        ("r4", 'C', 60, 7, 14, 1, 0, 0, True, False)
    )

    unique_pairs, unique_single, *_, nlowq, ninconsis = aggregate_reads(o, reads)

    assert len(unique_pairs) == 1
    assert len(unique_single) == 1
    assert nlowq == 3
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_count_different_type_1():
    "it should NOT count dna that more than 10% reads have different bases"

    o = Namespace(verbos=False, allow_inconsist=False)

    pair = {
        ('ref', 2, True): [
            ('A', 60),
            ('T', 60),
            ('T', 60)
        ],
        ('ref', 3, False): [
            ('A', 60),
            ('T', 60),
            ('T', 60)
        ]
    }

    mor, mnr, msr, oor, onr, osr, moa, mna, msa, ooa, ona, osa, inconsis = count_different_type(o, pair, {}, 'T', 'A')

    assert inconsis == 6
    assert sum((mor, mnr, msr, oor, onr, osr, moa, mna, msa, ooa, ona, osa)) == 0
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_get_reads_1(tmpdir):
    "it should get all but only the reads that covers the given position"

    make_bam(tmpdir.strpath, """
             123456789_123456789_12
        r1 + ...........
        r1 -      ......*....
        r2 +   .........*.
        r2 -       .....*.......
        r3 +       ...........
        r3 -            ....*......
        r4 +       ...........
        r4 -            ...........
             123456789_123456789_12
    """)

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    assert sum( 1 for _ in get_reads(o, sam, 'ref', '4') )  == 2
    assert sum( 1 for _ in get_reads(o, sam, 'ref', '12') ) == 7
    assert sum( 1 for _ in get_reads(o, sam, 'ref', '20') ) == 2
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_get_reads_2(tmpdir):
    "it should read properties correctly"

    make_bam(tmpdir.strpath, """
             123456789_123
        r1 + ...*.......
        r1 -   .*.........
    """)

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    r = next(get_reads(o, sam, 'ref', '4'))

    assert r[0] == "r1" # name
    assert r[3] == 0 # 0-based pos
    assert r[4] == 11 # length
    assert r[5] == -1 # mismatch, not caculated
    assert r[6] == 2 # mate pos
    assert r[7] == 13 # template length
    assert r[8] == False # is_reverse
    assert r[9] == True # paired and mapped
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_pad_softclip_2(tmpdir):
    "it should ignore more than two reads which share the same name"

    make_bam(tmpdir.strpath, """
        r1 + __.*.......
        r1 -   .*.......__
        r1 -   .*.......__
        r2 +   .*.......__
        r2 -   .*.......__
    """)

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    adjusted_pos = pad_softclip(sam)

    assert sum(1 for startpos, length in adjusted_pos.values() if startpos != -1) == 1
项目:MrBam    作者:OpenGene    | 项目源码 | 文件源码
def test_pad_softclip_3(tmpdir):
    "it should pad softclipped bases"

    make_bam(tmpdir.strpath, """
             123456789_123
        r1 + __.*.......
        r1 -   .*.........
        r2 - ...*.......
        r2 +   .*.......__
    """)

    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)

    adjusted_pos = pad_softclip(sam)

    assert adjusted_pos["r1"] == (0, 13) # 0-based position
    assert adjusted_pos["r2"] == (0, 13)
项目:webhook2lambda2sqs    作者:jantman    | 项目源码 | 文件源码
def get_api_id(config, args):
    """
    Get the API ID from Terraform, or from AWS if that fails.

    :param config: configuration
    :type config: :py:class:`~.Config`
    :param args: command line arguments
    :type args: :py:class:`argparse.Namespace`
    :return: API Gateway ID
    :rtype: str
    """
    try:
        logger.debug('Trying to get Terraform rest_api_id output')
        runner = TerraformRunner(config, args.tf_path)
        outputs = runner._get_outputs()
        depl_id = outputs['rest_api_id']
        logger.debug("Terraform rest_api_id output: '%s'", depl_id)
    except Exception:
        logger.info('Unable to find API rest_api_id from Terraform state;'
                    ' querying AWS.', exc_info=1)
        aws = AWSInfo(config)
        depl_id = aws.get_api_id()
        logger.debug("AWS API ID: '%s'", depl_id)
    return depl_id
项目:allennlp    作者:allenai    | 项目源码 | 文件源码
def _predict(predictors: Dict[str, str]):
    def predict_inner(args: argparse.Namespace) -> None:
        predictor = _get_predictor(args, predictors)
        output_file = None

        if args.silent and not args.output_file:
            print("--silent specified without --output-file.")
            print("Exiting early because no output will be created.")
            sys.exit(0)

        # ExitStack allows us to conditionally context-manage `output_file`, which may or may not exist
        with ExitStack() as stack:
            input_file = stack.enter_context(args.input_file)  # type: ignore
            if args.output_file:
                output_file = stack.enter_context(args.output_file)  # type: ignore

            _run(predictor, input_file, output_file, args.batch_size, not args.silent, args.cuda_device)

    return predict_inner
项目:azure-cli-shell    作者:Azure    | 项目源码 | 文件源码
def get_parsed_args(self, comp_words):
        """ gets the parsed args from a patched parser """
        active_parsers = self._patch_argument_parser()

        parsed_args = argparse.Namespace()

        self.completing = True
        if USING_PYTHON2:
            # Python 2 argparse only properly works with byte strings.
            comp_words = [ensure_bytes(word) for word in comp_words]

        try:
            stderr = sys.stderr
            sys.stderr = io.open(os.devnull, "w")

            active_parsers[0].parse_known_args(comp_words, namespace=parsed_args)

            sys.stderr.close()
            sys.stderr = stderr
        except BaseException:
            pass

        self.completing = False
        return parsed_args
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def test_with_config(self, is_file, open_):
        # Create our stand-in config file.
        config_file = textwrap.dedent(u"""\
        ---
        local_paths:
          reporoot: ~/Code
        publish: local
        """)
        is_file.return_value = True
        open_.return_value = io.StringIO(config_file)

        # Get the config and test the result.
        flags = Namespace(user_config='/bogus/file.yaml', command='not_init')
        user_config = main.read_user_config(flags)
        assert user_config == {
            'local_paths': {'reporoot': '~/Code'},
            'publish': 'local',
        }
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def test_with_ssh_repo(self, login):
        # Set up test data to return when we attempt to make the
        # pull request.
        gh = mock.MagicMock(spec=github3.github.GitHub)
        login.return_value = gh
        url = 'https://github.com/me/repo/pulls/1/'
        gh.repository().create_pull.return_value = Namespace(html_url=url)

        # Run the task.
        task = github.CreateGitHubPullRequest()
        pr = task.execute(**self.task_kwargs)

        # Assert we got the correct result.
        assert pr.html_url == url

        # Assert that the correct methods were called.
        login.assert_called_once_with('lukesneeringer', '1335020400')
        gh.repository.assert_called_with('me', 'repo')
        gh.repository().create_pull.assert_called_once_with(
            base='master',
            body='This pull request was generated by artman. '
                 'Please review it thoroughly before merging.',
            head='pubsub-python-v1',
            title='Python GAPIC: Pubsub v1',
        )
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def test_with_http_url(self, login):
        # Set up test data to return when we attempt to make the
        # pull request.
        gh = mock.MagicMock(spec=github3.github.GitHub)
        login.return_value = gh
        url = 'https://github.com/me/repo/pulls/1/'
        gh.repository().create_pull.return_value = Namespace(html_url=url)

        # Run the task.
        task = github.CreateGitHubPullRequest()
        pr = task.execute(**dict(self.task_kwargs, git_repo={
            'location': 'https://github/me/repo/',
        }))

        # Assert we got the correct result.
        assert pr.html_url == url

        # Assert that the correct repository method was still called.
        gh.repository.assert_called_with('me', 'repo')
项目:ParlAI    作者:facebookresearch    | 项目源码 | 文件源码
def print_args(self):
        """Print out all the arguments in this parser."""
        if not self.opt:
            self.parse_args(print_args=False)
        values = {}
        for key, value in self.opt.items():
            values[str(key)] = str(value)
        for group in self._action_groups:
            group_dict = {
                a.dest: getattr(self.args, a.dest, None)
                for a in group._group_actions
            }
            namespace = argparse.Namespace(**group_dict)
            count = 0
            for key in namespace.__dict__:
                if key in values:
                    if count == 0:
                        print('[ ' + group.title + ': ] ')
                    count += 1
                    print('[  ' + key + ': ' + values[key] + ' ]')
项目:sequana    作者:sequana    | 项目源码 | 文件源码
def _test_standalone_sequana(qtbot, tmpdir):
    wkdir = TemporaryDirectory()
    inputdir = os.path.realpath(
            sequana_data("Hm2_GTGAAA_L005_R1_001.fastq.gz")).rsplit(os.sep,1)[0]

    # Standalone for sequana given a wkdir and pipeline and input_dir
    args = Namespace(pipeline="quality_control", wkdir=wkdir.name,
                input_directory=inputdir)
    widget = sequana_gui.SequanaGUI(ipython=False, user_options=args)
    qtbot.addWidget(widget)
    assert widget.mode == "sequana"
    widget.force = True
    widget.save_project()

    widget.click_run()
    count = 0
    while widget.process.state() and count < 5:
        time.sleep(0.5)
        count+=0.5
    widget.click_stop()
    time.sleep(1)
项目:python-mysql-binlog-pubsub    作者:tarzanjw    | 项目源码 | 文件源码
def _setup_arg_parser(argv):
    """ Parse arguments from command line
    Args:
        argv list: by default it's command line arguments

    Returns:
        argparse.Namespace: parsed argument
    """
    parser = argparse.ArgumentParser(
        description='MySQL binlog to Google Cloud Pub/Sub')
    parser.add_argument('conf',
                        help='configuration file for publishing')
    parser.add_argument('--loglevel', '-l', default=None,
                        help='log level for root')

    if os.path.isfile('logging.ini'):
        _log_file = 'logging.ini'
    else:
        _log_file = None

    parser.add_argument('--logconf', default=_log_file,
                        help='INI file log configuration')
    args = parser.parse_args(argv)
    return args
项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def RunTestsCommand(args):
  """Checks test type and dispatches to the appropriate function.

  Args:
    args: argparse.Namespace object.

  Returns:
    Integer indicated exit code.

  Raises:
    Exception: Unknown command name passed in, or an exception from an
        individual test runner.
  """
  command = args.command

  ProcessCommonOptions(args)
  logging.info('command: %s', ' '.join(sys.argv))
  if args.enable_platform_mode or command in _DEFAULT_PLATFORM_MODE_TESTS:
    return RunTestsInPlatformMode(args)

  if command == 'python':
    return _RunPythonTests(args)
  else:
    raise Exception('Unknown test type.')
项目:clinner    作者:PeRDy    | 项目源码 | 文件源码
def __init__(self, args=None, parse_args=True):
        self.cli = CLI()
        self.args, self.unknown_args = argparse.Namespace(), []
        if parse_args:
            self.args, self.unknown_args = self.parse_arguments(args=args)
            self.settings = self.args.settings or os.environ.get('CLINNER_SETTINGS')

            # Inject parameters related to current stage as environment variables
            self.inject()

            # Load settings
            settings.build_from_module(self.args.settings)

            # Apply quiet mode
            if self.args.quiet:
                self.cli.disable()
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def add_domains(args_or_config, domains):
    """Registers new domains to be used during the current client run.

    Domains are not added to the list of requested domains if they have
    already been registered.

    :param args_or_config: parsed command line arguments
    :type args_or_config: argparse.Namespace or
        configuration.NamespaceConfig
    :param str domain: one or more comma separated domains

    :returns: domains after they have been normalized and validated
    :rtype: `list` of `str`

    """
    validated_domains = []
    for domain in domains.split(","):
        domain = util.enforce_domain_sanity(domain.strip())
        validated_domains.append(domain)
        if domain not in args_or_config.domains:
            args_or_config.domains.append(domain)

    return validated_domains
项目:linode-cli    作者:linode    | 项目源码 | 文件源码
def update(namespace, username=None):
    """
    This updates a Namespace (as returned by ArgumentParser) with config values
    if they aren't present in the Namespace already.
    """
    if not os.path.isfile(_get_config_path()):
        return namespace

    if not username:
        username = "DEFAULT"

    conf = _get_config()

    if not username in conf:
        print("User {} is not configured.".format(username))
        sys.exit(1)

    return update_namespace(namespace, conf[username])
项目:python-tempestconf    作者:redhat-openstack    | 项目源码 | 文件源码
def test_remove_values_having_hyphen(self):
        api_exts = "dvr, l3-flavors, rbac-policies, project-id"
        remove_exts = ["dvr", "project-id"]
        args = Namespace(
            remove={
                "network-feature-enabled.api-extensions": remove_exts
            }
        )
        self.conf = self._get_conf("v2.0", "v3")
        self.conf.set("network-feature-enabled", "api-extensions", api_exts)
        self.conf.remove_values(args)
        conf_exts = self.conf.get("network-feature-enabled", "api-extensions")
        conf_exts = conf_exts.split(',')
        for ext in api_exts.split(','):
            if ext in remove_exts:
                self.assertFalse(ext in conf_exts)
            else:
                self.assertTrue(ext in conf_exts)
项目:AutoMergeTool    作者:xgouchet    | 项目源码 | 文件源码
def parse_arguments(args: list) -> Namespace:
    """
    Parses the arguments passed on invocation in a dict and return it
    """
    parser = ArgumentParser(description="A tool to combine multiple merge tools")

    parser.add_argument('-b', '--base', required=True)
    parser.add_argument('-l', '--local', required=True)
    parser.add_argument('-r', '--remote', required=True)
    parser.add_argument('-m', '--merged', required=True)

    # convert to absolute path
    parsed_arg = parser.parse_args(args)

    parsed_arg.base = os.path.abspath(parsed_arg.base)
    parsed_arg.local = os.path.abspath(parsed_arg.local)
    parsed_arg.remote = os.path.abspath(parsed_arg.remote)
    parsed_arg.merged = os.path.abspath(parsed_arg.merged)
    return parsed_arg
项目:AutoMergeTool    作者:xgouchet    | 项目源码 | 文件源码
def merge(config: RawConfigParser,
          args: Namespace,
          launcher: ToolsLauncher,
          analyser: ConflictedFileAnalyser) -> int:
    """
    Handle the merge tools chain for the given argument
    config -- the current amt configuration
    args -- the arguments with the base, local, remote and merged file names
    launcher -- the launcher helper
    """
    if not (config.has_option(SECT_AMT, OPT_TOOLS)):
        raise RuntimeError('Missing the {0}.{1} configuration'.format(SECT_AMT, OPT_TOOLS))

    tools = config.get(SECT_AMT, OPT_TOOLS).split(';')
    merge_result = ERROR_NO_TOOL

    for tool in tools:
        merge_result = merge_with_tool(tool, config, args, launcher, analyser)
        if merge_result == 0:
            return 0

    print(" [AMT] ? Sorry, it seems we can't solve it this time")

    return merge_result
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def setUp(self):
        options.cfg = argparse.Namespace()
        options.cfg.corpus = None
        options.cfg.current_server = "MockConnection"
        options.cfg.MODE = QUERY_MODE_TOKENS
        options.cfg.stopword_list = []
        options.cfg.context_mode = CONTEXT_NONE
        options.cfg.drop_on_na = True
        options.cfg.drop_duplicates = True
        options.cfg.benchmark = False
        options.cfg.verbose = False

        self.Session = Session()
        self.Session.Resource = BaseResource()

        self.manager = Manager()
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def setUp(self):
        self.maxDiff = None
        options.cfg = argparse.Namespace()
        options.cfg.number_of_tokens = 0
        options.cfg.limit_matches = False
        options.cfg.regexp = False
        options.cfg.query_case_sensitive = False
        options.get_configuration_type = lambda: SQL_MYSQL
        options.get_resource = _monkeypatch_get_resource
        self.Session = MockOptions()
        self.Session.Resource = self.resource
        self.Session.Lexicon = None
        self.Session.Corpus = None

        self.link = coquery.links.Link(
                        self.resource.name, "corpus_word",
                        self.external.name, "word_label",
                        join="LEFT JOIN")
        options.cfg.current_server = "Default"
        options.cfg.table_links = {}
        options.cfg.table_links[options.cfg.current_server] = [self.link]
项目:coquery    作者:gkunter    | 项目源码 | 文件源码
def setUp(self):
        self.maxDiff = None
        options.cfg = argparse.Namespace()
        options.cfg.number_of_tokens = 0
        options.cfg.limit_matches = False
        options.cfg.regexp = False
        options.cfg.query_case_sensitive = False
        options.get_configuration_type = lambda: SQL_MYSQL
        options.get_resource = _monkeypatch_get_resource
        self.Session = MockOptions()
        self.Session.Resource = self.resource
        self.Session.Lexicon = None
        self.Session.Corpus = None

        self.link = coquery.links.Link(
                        self.resource.name, "word_label",
                        self.external.name, "word_label",
                        join="LEFT JOIN")
        options.cfg.current_server = "Default"
        options.cfg.table_links = {}
        options.cfg.table_links[options.cfg.current_server] = [self.link]
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        self.server = mock.Mock()
        self.flow = mock.Mock()
        self.storage = mock.Mock()
        self.credentials = mock.Mock()

        self.flow.step1_get_authorize_url.return_value = (
            'http://example.com/auth')
        self.flow.step2_exchange.return_value = self.credentials

        self.flags = argparse.Namespace(
            noauth_local_webserver=True, logging_level='INFO')
        self.server_flags = argparse.Namespace(
            noauth_local_webserver=False,
            logging_level='INFO',
            auth_host_port=[8080, ],
            auth_host_name='localhost')
项目:CSB    作者:csb-toolbox    | 项目源码 | 文件源码
def parse(self, args):
        """
        Parse the command line arguments.

        @param args: the list of user-provided command line arguments --
                     normally sys.argv[1:]
        @type args: tuple of str        

        @return: an object initialized with the parsed arguments
        @rtype: argparse.Namespace
        """
        try:
            return self.parser.parse_args(args)
        except SystemExit as se:
            if se.code > 0:
                raise AppExit('Bad command line', ExitCodes.USAGE_ERROR)
            else:
                raise AppExit(code=ExitCodes.CLEAN)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """Min Mapping Quality Zero constructor."""
        # This needs to happen first, because threshold is initialised here.
        super(MQ0FFilter, self).__init__(args)

        # Change the threshold to custom gq value.
        self.threshold = self._default_threshold
        if isinstance(args, argparse.Namespace):
            self.threshold = args.mq_score
        elif isinstance(args, dict):
            try:
                self.threshold = float(args.get(self.parameter))
            except (TypeError, ValueError):
                logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
                logging.error("This parameter requires to be a float!")
                raise Exception("Could not create MQ0F filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """Min Mapping Quality Zero constructor."""
        # This needs to happen first, because threshold is initialised here.
        super(MQ0Filter, self).__init__(args)

        # Change the threshold to custom gq value.
        self.threshold = self._default_threshold
        if isinstance(args, argparse.Namespace):
            self.threshold = args.mq_score
        elif isinstance(args, dict):
            try:
                self.threshold = float(args.get(self.parameter))
            except (TypeError, ValueError):
                logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
                logging.error("This parameter requires to be a float!")
                raise Exception("Could not create MQ0 filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """AD Ratio constructor."""
        # This needs to happen first, because threshold is initialised here.
        super(DP4Filter, self).__init__(args)

        # Change the threshold to custom dp value.
        self.threshold = self._default_threshold
        if isinstance(args, argparse.Namespace):
            self.threshold = args.ad_ratio
        elif isinstance(args, dict):
            try:
                self.threshold = float(args.get(self.parameter))
            except (TypeError, ValueError):
                logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
                logging.error("This parameter requires to be a float!")
                raise Exception("Could not create DP4 filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """Min Mapping Quality constructor."""
        # This needs to happen first, because threshold is initialised here.
        super(MQFilter, self).__init__(args)

        # Change the threshold to custom gq value.
        self.threshold = self._default_threshold
        if isinstance(args, argparse.Namespace):
            self.threshold = args.mq_score
        elif isinstance(args, dict):
            try:
                self.threshold = int(args.get(self.parameter))
            except (TypeError, ValueError):
                logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
                logging.error("This parameter requires to be an integer!")
                raise Exception("Could not create MQ filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """Min Depth constructor."""
        # This needs to happen first, because threshold is initialised here.
        super(GQFilter, self).__init__(args)

        # Change the threshold to custom gq value.
        self.threshold = self._default_threshold
        if isinstance(args, argparse.Namespace):
            self.threshold = args.gq_score
        elif isinstance(args, dict):
            try:
                self.threshold = int(args.get(self.parameter))
            except (TypeError, ValueError):
                logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
                logging.error("This parameter requires to be an integer!")
                raise Exception("Could not create GQ filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """Min Depth constructor."""
        # This needs to happen first, because threshold is initialised here.
        super(QualFilter, self).__init__(args)

        # Change the threshold to custom gq value.
        self.threshold = self._default_threshold
        if isinstance(args, argparse.Namespace):
            self.threshold = args.gq_score
        elif isinstance(args, dict):
            try:
                self.threshold = float(args.get(self.parameter))
            except (TypeError, ValueError):
                logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
                logging.error("This parameter requires to be a float!")
                raise Exception("Could not create QUAL filter from parameters: %s" % args)
项目:PHEnix    作者:phe-bioinformatics    | 项目源码 | 文件源码
def __init__(self, args):
        """Min Depth constructor."""
        # This needs to happen first, because threshold is initialised here.
        super(UncallableGTFilter, self).__init__(args)

        # Change the threshold to custom gq value.
        self.threshold = self._default_threshold
        if isinstance(args, argparse.Namespace):
            self.threshold = args.gq_score
        elif isinstance(args, dict):
            try:
                self.threshold = str(args.get(self.parameter))
            except (TypeError, ValueError):
                logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
                logging.error("This parameter requires to be a string!")
                self.threshold = None
项目:open-project-linter    作者:OpenNewsLabs    | 项目源码 | 文件源码
def parse_linter_args():
    """Parse command-line arguments and set up the command-line
    interface and help.

    Defaults to current working directory for the directory arg and the
    copy of rules.py in the directory this module is in for the rules arg.

    Returns
    -------
    argparse.Namespace
        object subclass with arguments as attributes for each given argument
    """
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument('-d', '--directory', help="The local path to your repository's base directory. Defaults to the current working directory.",
       default=os.getcwd()
    )
    parser.add_argument('-r', '--rules', help='The path to the rules configuration file, a YAML file containing the rules you would like to check for. Defaults to path/to/openlinter/rules.yml.',
        default=os.path.join(get_current_script_dir(), 'rules.yml')
    )
    parser.add_argument('-v', '--version', action='version', version='1.0.1')
    return parser.parse_args()