Python typing module: NamedTuple() usage examples

The code examples below, extracted from open-source Python projects, show how to use typing.NamedTuple().
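
For reference, here is a minimal self-contained sketch of the two spellings of typing.NamedTuple, the class-based syntax and the functional syntax; the Point and Employee names are illustrative and do not come from the projects below.

from typing import NamedTuple

# Class-based syntax: fields are declared as annotations, defaults are allowed.
class Point(NamedTuple):
    x: int
    y: int = 0

# Functional syntax: the same kind of type built from a (name, type) field list.
Employee = NamedTuple('Employee', [('name', str), ('id', int)])

p = Point(3)                      # y falls back to its default
e = Employee(name='guido', id=1)
assert p == (3, 0) and e.name == 'guido'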

Project: AutoTriageBot | Author: salesforce
def test_suggestPayout():
    MockedReportWrapper = NamedTuple('MockedReportWrapper', [('getReportBody', Callable),
                                                             ('getReportWeakness', Callable),
                                                             ('getVulnDomains', Callable)])
    MockedReportWrapperXSS = MockedReportWrapper(getReportBody=lambda: 'XSS',
                                                 getReportWeakness=lambda: 'XSS',
                                                 getVulnDomains=lambda: [])
    assert payout.suggestPayout(MockedReportWrapperXSS) == config.payoutDB['xss']['average']
    for vulnType in config.payoutDB:
        for domain in config.payoutDB[vulnType]:
            MockedReportWrapperVuln = MockedReportWrapper(getReportBody=lambda: vulnType,
                                                          getReportWeakness=lambda: vulnType,
                                                          getVulnDomains=lambda: [domain])
            assert payout.suggestPayout(MockedReportWrapperVuln) == config.payoutDB[vulnType][domain]
    MockedReportWrapperNone = MockedReportWrapper(getReportBody=lambda: '',
                                                  getReportWeakness=lambda: '',
                                                  getVulnDomains=lambda: [])
    assert payout.suggestPayout(MockedReportWrapperNone) is None
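
The test above depends on the payout and config modules of AutoTriageBot. A minimal self-contained sketch of the same mocking trick, a functional NamedTuple whose fields hold callables, with purely illustrative names:

from typing import Callable, NamedTuple

# Lightweight stand-in for a report object: each field is a callable,
# so a lambda can be plugged in per test case (names are illustrative).
MockReport = NamedTuple('MockReport', [('getReportBody', Callable),
                                       ('getReportWeakness', Callable)])

def classify(report) -> str:
    # Hypothetical consumer that only needs the accessor methods.
    return report.getReportBody().lower()

mock = MockReport(getReportBody=lambda: 'XSS', getReportWeakness=lambda: 'XSS')
assert classify(mock) == 'xss'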
Project: neuralmonkey | Author: ufal
def __init__(self,
                 name: str,
                 parent_decoder: Decoder,
                 beam_size: int,
                 length_normalization: float,
                 max_steps: int = None,
                 save_checkpoint: str = None,
                 load_checkpoint: str = None) -> None:
        check_argument_types()
        ModelPart.__init__(self, name, save_checkpoint, load_checkpoint)

        self.parent_decoder = parent_decoder
        self._beam_size = beam_size
        self._length_normalization = length_normalization

        # In the (n+1)th step, outputs of length n will be collected
        # and the (n+1)th step of the decoder (which is discarded) will be executed
        if max_steps is None:
            max_steps = parent_decoder.max_output_len
        self._max_steps = tf.constant(max_steps + 1)
        self.max_output_len = max_steps

        # Feedables
        self._search_state = None  # type: SearchState
        self._decoder_state = None  # type: NamedTuple

        # Output
        self.outputs = self._decoding_loop()
Project: aiohttp_json_api | Author: vovanbo
def cte_constructor(cls, where=None, limit=None, offset=None, name=None) -> CTE:
    query = cls.Options.db_table.select()
    if where is not None:
        query = query.where(where)
    if limit is not None:
        query = query.limit(limit)
    if offset is not None:
        query = query.offset(offset)

    name = name or '{}_cte'.format(inflection.tableize(cls.__name__))
    return query.cte(name=name)


# With NamedTuple we can't use mixins in the usual way,
# so we call classmethod() directly instead
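
Presumably the function above is then attached to the schema class with a direct classmethod() call. A minimal sketch of that pattern, with illustrative names, since recent Python versions reject mixing a NamedTuple base with an ordinary mixin base:

from typing import NamedTuple

class Coord(NamedTuple):
    x: int
    y: int

def _origin(cls):
    return cls(0, 0)

# Something like `class Coord(OriginMixin, NamedTuple)` is not supported
# (recent Pythons raise TypeError), so the helper is bolted on afterwards.
Coord.origin = classmethod(_origin)

assert Coord.origin() == Coord(0, 0)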
Project: eventsourcing_helpers | Author: fyndiq
def test_to_message_from_dto(self):
        """
        Test that we can serialize a DTO to a message.
        """
        fields = [('id', None)]
        FooEvent = message_factory(NamedTuple('FooEvent', fields))
        dto = FooEvent(id=1)
        message = to_message_from_dto(dto)

        assert message['class'] == 'FooEvent'
        assert message['data']['id'] == 1
Project: eventsourcing_helpers | Author: fyndiq
def setup_method(self):
        self.data = {'id': 1, 'foo': 'bar', 'baz': None}
        fields = [(k, None) for k in self.data.keys()]
        self.namedtuple = NamedTuple('FooEvent', fields)
        self.message = message_factory(self.namedtuple)(**self.data)
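
message_factory comes from eventsourcing_helpers; the NamedTuple part on its own, building the field list from runtime data as the setup above does, might look like this self-contained sketch (names are illustrative):

from typing import NamedTuple

data = {'id': 1, 'foo': 'bar'}
# Derive the field list from the payload keys; using the value types here
# instead of None, as the test above does, is just one option.
FooEvent = NamedTuple('FooEvent', [(k, type(v)) for k, v in data.items()])

event = FooEvent(**data)
assert event.id == 1 and event.foo == 'bar'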
Project: aztk | Author: Azure
def execute(args: typing.NamedTuple):
    if args.global_flag:
        create_directory(constants.GLOBAL_INIT_DIRECTORY_DEST)
    else:
        create_directory(constants.LOCAL_INIT_DIRECTORY_DEST)
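
The aztk commands in this listing all share the execute(args: typing.NamedTuple) signature. A minimal self-contained sketch of that calling convention, with Args as a purely illustrative stand-in for the parsed command-line arguments:

import typing
from typing import NamedTuple

class Args(NamedTuple):           # illustrative stand-in for parsed CLI args
    global_flag: bool = False

def execute(args: typing.NamedTuple) -> None:
    # The annotation documents intent; at runtime any object exposing the
    # expected attributes would work just as well.
    target = 'global' if args.global_flag else 'local'
    print('creating the', target, 'init directory')

execute(Args(global_flag=True))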
Project: aztk | Author: Azure
def execute(args: typing.NamedTuple):
    spark_client = load_spark_client()

    if args.tail:
        utils.stream_logs(client=spark_client, cluster_id=args.cluster_id, application_name=args.app_name)
    else:
        app_logs = spark_client.get_application_log(cluster_id=args.cluster_id, application_name=args.app_name)
        print(app_logs.log)
Project: aztk | Author: Azure
def execute(_: typing.NamedTuple):
    spark_client = load_spark_client()
    clusters = spark_client.list_clusters()
    utils.print_clusters(clusters)
Project: aztk | Author: Azure
def execute(args: typing.NamedTuple):
    actions = {}

    actions[ClusterAction.create] = cluster_create.execute
    actions[ClusterAction.add_user] = cluster_add_user.execute
    actions[ClusterAction.delete] = cluster_delete.execute
    actions[ClusterAction.get] = cluster_get.execute
    actions[ClusterAction.list] = cluster_list.execute
    actions[ClusterAction.ssh] = cluster_ssh.execute
    actions[ClusterAction.submit] = cluster_submit.execute
    actions[ClusterAction.app_logs] = cluster_app_logs.execute

    func = actions[args.cluster_action]
    func(args)
Project: aztk | Author: Azure
def execute(args: typing.NamedTuple):
    actions = dict(
        cluster=cluster.execute,
        init=init.execute
    )
    func = actions[args.action]
    func(args)
Project: aztk | Author: Azure
def execute(args: typing.NamedTuple):
    spark_client = load_spark_client()
    cluster_id = args.cluster_id
    cluster = spark_client.get_cluster(cluster_id)
    utils.print_cluster(spark_client, cluster)
Project: aztk | Author: Azure
def execute(args: typing.NamedTuple):
    spark_client = load_spark_client()

    log.info('-------------------------------------------')
    log.info('spark cluster id:    {}'.format(args.cluster_id))
    log.info('username:            {}'.format(args.username))
    log.info('-------------------------------------------')

    if args.ssh_key:
        ssh_key = args.ssh_key
    else:
        ssh_key = spark_client.secrets_config.ssh_pub_key

    ssh_key, password = utils.get_ssh_key_or_prompt(ssh_key, args.username, args.password, spark_client.secrets_config)

    spark_client.create_user(
        cluster_id=args.cluster_id,
        username=args.username,
        password=password,
        ssh_key=ssh_key
    )

    if password:
        log.info('password:            %s', '*' * len(password))
    elif ssh_key:
        log.info('ssh public key:      %s', ssh_key)

    log.info('-------------------------------------------')
Project: aztk | Author: Azure
def parse_common_args(args: NamedTuple):
    if args.verbose:
        logger.setup_logging(True)
        log.debug("Verbose logging enabled")
    else:
        logger.setup_logging(False)
Project: aztk | Author: Azure
def run_software(args: NamedTuple):
    softwares = {}
    softwares[aztk.models.Software.spark] = spark.execute

    func = softwares[args.software]
    func(args)
Project: aztk | Author: Azure
def execute(args: typing.NamedTuple):
    spark_client = load_spark_client()
    ssh_conf = SshConfig()

    ssh_conf.merge(
        cluster_id=args.cluster_id,
        username=args.username,
        job_ui_port=args.jobui,
        job_history_ui_port=args.jobhistoryui,
        web_ui_port=args.webui,
        jupyter_port=args.jupyter,
        name_node_ui_port=args.namenodeui,
        rstudio_server_port=args.rstudioserver,
        host=args.host,
        connect=args.connect
    )

    http_prefix = 'http://localhost:'
    log.info("-------------------------------------------")
    log.info("spark cluster id:    %s", ssh_conf.cluster_id)
    log.info("open webui:          %s%s", http_prefix, ssh_conf.web_ui_port)
    log.info("open jobui:          %s%s", http_prefix, ssh_conf.job_ui_port)
    log.info("open jobhistoryui:   %s%s", http_prefix, ssh_conf.job_history_ui_port)
    log.info("open jupyter:        %s%s", http_prefix, ssh_conf.jupyter_port)
    log.info("open namenodeui:     %s%s", http_prefix, ssh_conf.name_node_ui_port)
    log.info("open rstudio server: %s%s", http_prefix, ssh_conf.rstudio_server_port)
    log.info("ssh username:        %s", ssh_conf.username)
    log.info("connect:             %s", ssh_conf.connect)
    log.info("-------------------------------------------")

    # get ssh command
    try:
        ssh_cmd = utils.ssh_in_master(
            client=spark_client,
            cluster_id=ssh_conf.cluster_id,
            webui=ssh_conf.web_ui_port,
            jobui=ssh_conf.job_ui_port,
            jobhistoryui=ssh_conf.job_history_ui_port,
            namenodeui=ssh_conf.name_node_ui_port,
            jupyter=ssh_conf.jupyter_port,
            rstudioserver=ssh_conf.rstudio_server_port,
            username=ssh_conf.username,
            host=ssh_conf.host,
            connect=ssh_conf.connect)

        if not ssh_conf.connect:
            log.info("")
            log.info("Use the following command to connect to your spark head node:")
            log.info("\t%s", ssh_cmd)

    except batch_error.BatchErrorException as e:
        if e.error.code == "PoolNotFound":
            raise aztk.error.AztkError("The cluster you are trying to connect to does not exist.")
        else:
            raise