Python typing module: Set() example source code

We extracted the following 50 code examples from open-source Python projects to illustrate how to use typing.Set().
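
Before the project examples, here is a minimal, self-contained sketch (the function and variable names are illustrative, not taken from any of the projects below) of how typing.Set is used to annotate parameters and return values:

from typing import Set

def unique_tags(tags: Set[str], extra: Set[str]) -> Set[str]:
    """Return the union of two sets of tag names."""
    return tags | extra

# Both parameters and the return value are annotated as sets of str.
print(unique_tags({"python", "typing"}, {"typing", "mypy"}))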

Project: ConfigSpace    Author: automl    | project source | file source
def get_conditions(self) -> List[AbstractCondition]:
        conditions = []
        added_conditions = set()  # type: Set[str]

        # Nodes is a list of nodes
        for source_node in self.get_hyperparameters():
            # This is a list of keys in a dictionary
            # TODO sort the edges by the order of their source_node in the
            # hyperparameter list!
            for target_node in self._children[source_node.name]:
                if target_node not in added_conditions:
                    condition = self._children[source_node.name][target_node]
                    conditions.append(condition)
                    added_conditions.add(target_node)

        return conditions
Project: scheduled-bots    Author: SuLab    | project source | file source
def create_articles(pmids: Set[str], login: object, write: bool = True) -> Dict[str, str]:
    """
    Given a set of pmids, make an article item for each
    :param pmids: set of pmids
    :param login: wdi_core login instance
    :param write: actually perform write
    :return: map pmid -> wdid
    """
    pmid_map = dict()
    for pmid in pmids:
        p = wdi_helpers.PubmedItem(pmid)
        if write:
            try:
                pmid_wdid = p.get_or_create(login)
            except Exception as e:
                print("Error creating article pmid: {}, error: {}".format(pmid, e))
                continue
            pmid_map[pmid] = pmid_wdid
        else:
            pmid_map[pmid] = 'Q1'
    return pmid_map
Project: irisett    Author: beebyte    | project source | file source
def get_contact_dict_for_active_monitor(dbcon: DBConnection, monitor_id: int) -> Dict[str, set]:
    """Get all contact addresses/numbers for a specific active monitor.

    Return: Dict[str, Set[str]] for 'email' and 'phone'.
    """
    ret = {
        'email': set(),
        'phone': set(),
    }  # type: Dict[str, set]

    contacts = await get_all_contacts_for_active_monitor(dbcon, monitor_id)
    for contact in contacts:
        if contact.email:
            ret['email'].add(contact.email)
        if contact.phone:
            ret['phone'].add(contact.phone)
    return ret
Project: EMFT    Author: 132nd-etcher    | project source | file source
def select_channel(
        self,
        versions: typing.Set[CustomVersion],
        update_channel: str = channel.STABLE
    ) -> typing.Union[CustomVersion, None]:
        """
        Selects the latest version that is equal to or higher than "update_channel"

        Args:
            versions: versions to select from
            update_channel: member of :class:`Channel`

        Returns: latest version or None

        """
        LOGGER.debug(f'selecting latest version amongst {len(versions)}; active channel: {update_channel}')
        options = list(self.filter_channel(versions, update_channel))
        if options:
            latest = max(options)
            return latest
        LOGGER.debug('no version passed the test')
        return None
Project: mypyc    Author: JukkaL    | project source | file source
def analyze_must_defined_regs(
        blocks: List[BasicBlock],
        cfg: CFG,
        initial_defined: Set[Register],
        num_regs: int) -> AnalysisResult[Register]:
    """Calculate always defined registers at each CFG location.

    A register is defined if it has a value along all paths from the initial location.
    """
    return run_analysis(blocks=blocks,
                        cfg=cfg,
                        gen_and_kill=MustDefinedVisitor(),
                        initial=initial_defined,
                        backward=False,
                        kind=MUST_ANALYSIS,
                        universe=set([Register(r) for r in range(num_regs)]))
Project: aiopubsub    Author: qntln    | project source | file source
def add_listener(self, keys: Union[Key, Set[Key]], callback: ListenerCallback) -> None:
        '''
        Attach ``callback`` to one or more keys. If more than one key is provided, the callback reads
        all messages from a single queue, and they are guaranteed to arrive in the order they were published.

        :param keys: One key, or a set of keys.
        '''
        keys, sub_id = self._get_listener_subscription(keys, callback)
        for key in keys:
            self.subscribe(key, sub_id)

        async def consumer() -> None:
            key, msg = await self.consume(sub_id)
            callback(key, msg)

        loop = aiopubsub.loop.Loop(consumer, delay = None)
        loop.start()
        self._listeners[sub_id] = Listener(loop, keys)
Project: gfan    Author: hozuki    | project source | file source
def read_all_stop_words() -> Set[str]:
    # Data source: https://wenku.baidu.com/view/7ca26338376baf1ffc4fad6a.html
    with open("data/chinese_stop_words.txt", mode="r", encoding="utf-8") as local_file:
        text_lines = local_file.readlines()
        text_lines = list(x.replace("\n", "") for x in text_lines)

    with open("data/chinese_stop_symbols.txt", mode="r", encoding="utf-8") as local_file:
        symbol_lines = local_file.readlines()
        symbol_lines = list(x.replace("\n", "") for x in symbol_lines)

    public_stop_words = get_stop_words("zh")

    stop_words: Set[str] = set()
    stop_words = stop_words.union(text_lines)
    stop_words = stop_words.union(symbol_lines)
    stop_words = stop_words.union(public_stop_words)

    return stop_words
Project: steem-python    Author: steemit    | project source | file source
def _get_blocks(self, blocks: Union[List[int], Set[int]]):
        """ Fetch multiple blocks from steemd at once.

        Warning:
            This method does not ensure that all blocks are returned, or that the results are ordered.
            You will probably want to use `steemd.get_blocks()` instead. 

        Args:
            blocks (list): A list, or a set of block numbers.

        Returns:
            A generator with results.

        """
        results = self.exec_multi_with_futures('get_block', blocks, max_workers=10)
        return ({**x, 'block_num': int(x['block_id'][:8], base=16)} for x in results if x)
Project: bptc_wallet    Author: ceddie    | project source | file source
def get_majority_vote_in_set_for_event(hashgraph, s: Set[str], x: Event) -> (bool, int):
    """
    Returns the majority vote and the winning amount of stake that a set of witnesses has for another event
    :param hashgraph:
    :param s:
    :param x:
    :return: Tuple containing the majority vote (bool) and the total stake of the majority vote (int)
    """

    stake_for = 0
    stake_against = 0

    for event_id in s:
        event = hashgraph.lookup_table[event_id]
        if x.id in event.votes and event.votes[x.id]:
            stake_for += hashgraph.known_members[event.verify_key].stake
        else:
            stake_against += hashgraph.known_members[event.verify_key].stake

    return Fame.TRUE if stake_for >= stake_against else Fame.FALSE, stake_for if stake_for >= stake_against else stake_against
Project: phasm    Author: AbeelLab    | project source | file source
def superbubble_nodes(g: AssemblyGraph, source: Node,
                      sink: Node) -> Set[Node]:
    """Find all nodes inside a superbubble."""

    queue = deque([source])
    visited = {source, sink}

    while queue:
        current = queue.popleft()

        for neighbour in g.neighbors_iter(current):
            if neighbour not in visited:
                queue.append(neighbour)
                visited.add(neighbour)

    return visited
Project: phasm    Author: AbeelLab    | project source | file source
def __init__(self, ploidy: int, copy_from: 'HaplotypeSet'=None):
        self.ploidy = ploidy

        # Nodes spelling each haplotype
        self.haplotypes = []  # type: List[List[Node]]

        # Also keep a set of reads used for each haplotype, useful for
        # relative likelihood calculation
        self.read_sets = []  # type: List[Set[OrientedRead]]

        if isinstance(copy_from, HaplotypeSet):
            for i in range(ploidy):
                self.haplotypes.append(deque(copy_from.haplotypes[i]))
                self.read_sets.append(set(copy_from.read_sets[i]))
        else:
            for i in range(ploidy):
                self.haplotypes.append(deque())
                self.read_sets.append(set())

        self.log_rl = float('-inf')
        self.from_large_bubble = False
Project: phasm    Author: AbeelLab    | project source | file source
def extend(self, extensions: List[Tuple[Node]],
               ext_read_sets: List[Set[OrientedRead]]) -> 'HaplotypeSet':
        """Extend the haplotype set with a new set of paths."""

        # Make a copy of itself for a new set
        new_set = HaplotypeSet(self.ploidy, copy_from=self)

        for hap_num, (extension, read_set) in enumerate(
                zip(extensions, ext_read_sets)):
            haplotype_nodes = new_set.haplotypes[hap_num]

            # Add the nodes of the extension to each haplotype
            # It's possible that the last node of this haplotype set
            # (which is probably a bubble exit), is also the bubble entrance
            # and thus our start node of our extension.
            if (len(haplotype_nodes) > 0 and
                    haplotype_nodes[-1] == extension[0]):
                haplotype_nodes.extend(extension[1:])
            else:
                haplotype_nodes.extend(extension)

            new_set.read_sets[hap_num].update(read_set)

        return new_set
Project: simone    Author: matheuspb    | project source | file source
def _update_grammar_text(self) -> None:
        """
            "B", {"aB", "bC", "a"} turns into
            "B -> aB | bC | a"
        """
        def transform_production(non_terminal: str, productions: Set[str]):
            return "{} -> {}".format(
                non_terminal, " | ".join(sorted(productions)))

        initial_symbol = self._grammar.initial_symbol()
        productions = self._grammar.productions()

        text = ""

        if initial_symbol in productions:
            text = transform_production(
                initial_symbol, productions[initial_symbol]) + "\n"

        for non_terminal in sorted(set(productions.keys()) - {initial_symbol}):
            text += transform_production(
                non_terminal, productions[non_terminal]) + "\n"

        self.grammarText.setPlainText(text)
Project: simone    Author: matheuspb    | project source | file source
def down(self, visited: FrozenSet[Any]=None) -> Set[Any]:
        """ Returns the set of reachable nodes by going down on this node """
        if visited is None:
            visited = frozenset()

        if self in visited:
            return {self} if self.symbol not in OPERATORS else set()

        visited |= {self}
        if self.symbol == '|':
            return self.left.down(visited) | self.right.down(visited)
        elif self.symbol == '.':
            return self.left.down(visited)
        elif self.symbol == '*' or self.symbol == '?':
            return self.left.down(visited) | self.right.up(visited)
        elif self.symbol == EPSILON:
            return self.right.up(visited)
        return {self}
Project: simone    Author: matheuspb    | project source | file source
def up(self, visited: FrozenSet[Any]=None) -> Set[Any]:
        """ Returns the set of reachable nodes by going up on this node """
        if visited is None:
            visited = frozenset()

        if self.symbol == '|':
            # skip the whole right sub tree
            node = self.right
            while node.symbol == '.' or node.symbol == '|':
                node = node.right
            return node.right.up(visited)
        elif self.symbol == '.':
            return self.right.down(visited)
        elif self.symbol == '*':
            return self.left.down(visited) | self.right.up(visited)
        elif self.symbol == '?':
            return self.right.up(visited)
        else:  # self.symbol == END:
            return {self}
Project: simone    Author: matheuspb    | project source | file source
def remove_state(self, state: str) -> None:
        """ Removes a state """
        # may not remove initial state
        if state != self._initial_state:
            self._states.discard(state)
            self._final_states.discard(state)

            for symbol in self._alphabet:
                # remove useless transitions that come from the removed state
                if (state, symbol) in self._transitions:
                    del self._transitions[state, symbol]

            empty_transitions = set()  # type: Set[Tuple[str, str]]
            for actual_state, next_state in self._transitions.items():
                # remove transitions that go to the removed state
                next_state.discard(state)
                if not next_state:
                    empty_transitions.add(actual_state)

            for transition in empty_transitions:
                del self._transitions[transition]
Project: simone    Author: matheuspb    | project source | file source
def _are_undistinguishable(
            self, state_a: str, state_b: str,
            undistinguishable: Set[FrozenSet[str]]) -> bool:
        """
            State a and b are distinguishable if they go to distinguishable
            states for some input symbol.
        """
        for symbol in self._alphabet:
            transition_a = \
                list(self._transitions.get((state_a, symbol), {""}))[0]
            transition_b = \
                list(self._transitions.get((state_b, symbol), {""}))[0]
            if transition_a != transition_b and \
                    frozenset((transition_a, transition_b)) not in \
                    undistinguishable:
                return False
        return True
Project: simone    Author: matheuspb    | project source | file source
def _determinize_state(self, states_set: Set[str]) -> None:
        """
            For a given set of states, verify whether it already belongs to
            the actual states of the FA. If it does not, add it and insert
            the transitions properly.
        """
        name = "".join(sorted(states_set))
        if name and name not in self._states:
            self.add_state(name)
            if states_set.intersection(self._final_states):
                self._final_states.add(name)
            for symbol in self._alphabet:
                reachable = self._find_reachable(states_set, symbol)
                if reachable:
                    self._transitions[name, symbol] = reachable
                    self._determinize_state(reachable)
Project: simone    Author: matheuspb    | project source | file source
def _has_recursion(self, to_visit: str, visited: Set[str]) -> bool:
        """
            Checks whether the automaton has recursive states, using a
            depth-first search approach.
        """
        if to_visit in visited:
            return True

        visited.add(to_visit)
        reachable = set()  # type: Set[str]

        # Find the reachable through all symbols
        for symbol in self._alphabet:
            reachable.update(self._find_reachable({to_visit}, symbol))

        for state in reachable:
            if self._has_recursion(state, copy.deepcopy(visited)):
                return True

        return False
Project: simone    Author: matheuspb    | project source | file source
def from_regular_grammar(grammar) -> 'NFA':
        """ Converts RegularGrammar to NFA """
        initial_symbol = grammar.initial_symbol()
        productions = grammar.productions()

        states = set(productions.keys()) | {"X"}
        alphabet = set()  # type: Set[str]
        transitions = {}  # type: Dict[Tuple[str, str], Set[str]]
        initial_state = initial_symbol
        final_states = set("X") | \
            ({initial_symbol} if "&" in productions[initial_symbol] else set())

        for non_terminal, prods in productions.items():
            for production in prods:
                if production == "&":
                    continue

                new_transition = "X" if len(production) == 1 else production[1]
                transitions.setdefault(
                    (non_terminal, production[0]), set()).add(new_transition)

                alphabet.add(production[0])

        return NFA(states, alphabet, transitions, initial_state, final_states)
Project: suq    Author: MaxwellBo    | project source | file source
def get_remaining_shared_breaks_this_week(group_members: Set[User]) -> List[Break]:
    """
    Finds this weeks remaining common breaks between a group of users
    """
    # So, the Mypy type checker treats `List` as invariant, meaning we
    # can't give a `List[B]` to a function that expects a `List[A]` if
    # B is a subclass of A.
    # So we have to cast it in to the function...

    # FIXME: Get rid of these casts when Van Rossum figures out how to write a
    #        proper type system
    breaks = cast(List[Event_], get_shared_breaks(group_members))
    now = datetime.now(BRISBANE_TIME_ZONE)

    ### ... and out.
    return cast(List[Break], get_this_weeks_events(now, breaks))


# FIXME: Make 'request_status' an enum: https://docs.python.org/3/library/enum.html
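
The casts above work around the fact that mypy treats List as invariant: a List[Break] is not accepted where a List[Event_] is expected, even though Break subclasses Event_. A minimal sketch (hypothetical classes, not part of the suq code) showing that the covariant Sequence type avoids the cast:

from typing import List, Sequence

class Event_: ...
class Break(Event_): ...

def count_events(events: Sequence[Event_]) -> int:
    # Sequence is covariant in its element type, so a List[Break] is accepted as-is.
    return len(events)

breaks: List[Break] = [Break(), Break()]
print(count_events(breaks))  # OK; a List[Event_] parameter would make mypy reject this call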
Project: holo    Author: TheEnigmaBlade    | project source | file source
def search_show_ids_by_names(self, *names, exact=False) -> Set[Show]:
        shows = set()
        for name in names:
            debug("Searching shows by name: {}".format(name))
            if exact:
                self.q.execute("SELECT show, name FROM ShowNames WHERE name = ?", (name,))
            else:
                self.q.execute("SELECT show, name FROM ShowNames WHERE name = ? COLLATE alphanum", (name,))
            matched = self.q.fetchall()
            for match in matched:
                debug("  Found match: {} | {}".format(match[0], match[1]))
                shows.add(match[0])
        return shows

# Helper methods

## Conversions
Project: rupo    Author: IlyaGusev    | project source | file source
def init_by_vocabulary(self, lemma_counter: Counter, lemma_to_word_forms: Dict[str, Set[WordForm]],
                           lemma_case: Dict[str, LemmaCase]):
        """
        Initialize the vocabulary from preprocessed lemma data.

        :param lemma_counter: Counter over lemmas.
        :param lemma_to_word_forms: mapping from a lemma to the set of all its possible word forms.
        :param lemma_case: mapping from a lemma to the capitalization type shared by all its word forms.
        """
        for i, (lemma, _) in enumerate(tqdm(lemma_counter.most_common(), desc="Init vocabulary")):
            for word_form in lemma_to_word_forms[lemma]:
                word_form.set_case(lemma_case[word_form.lemma])
                self.word_forms.append(word_form)
                self.word_form_indices[word_form] = len(self.word_forms) - 1
                assert self.word_forms[self.word_form_indices[word_form]] == word_form
                self.lemma_indices[word_form] = i + 1  # 0 is reserved for padding.
        assert self.lemma_indices[SEQ_END_WF] == 1
Project: rupo    Author: IlyaGusev    | project source | file source
def __init__(self, language: str="ru", mode: Mode=Mode.GRAPHEMES, raw_dict_path=None, trie_path=None,
                 zalyzniak_dict=ZALYZNYAK_DICT, cmu_dict=CMU_DICT) -> None:
        self.data = pygtrie.Trie()  # type: Dict[str, Set[Stress]]
        self.raw_dict_path = raw_dict_path
        self.trie_path = trie_path
        if language == "ru" and mode == self.Mode.GRAPHEMES:
            self.__init_defaults(RU_GRAPHEME_STRESS_PATH, RU_GRAPHEME_STRESS_TRIE_PATH)
            if not os.path.exists(self.raw_dict_path):
                from rupo.dict.zaliznyak import ZalyzniakDict
                ZalyzniakDict.convert_to_accent_only(zalyzniak_dict, self.raw_dict_path)
        elif mode == self.Mode.PHONEMES and language == "en":
            self.__init_defaults(EN_PHONEME_STRESS_PATH, EN_PHONEME_STRESS_TRIE_PATH)
            if not os.path.exists(self.raw_dict_path):
                CMUDict.convert_to_phoneme_stress(cmu_dict, self.raw_dict_path)
        else:
            assert False
        if not os.path.isfile(self.raw_dict_path):
            raise FileNotFoundError("Dictionary raw file not found.")
        if os.path.isfile(self.trie_path):
            self.load(self.trie_path)
        else:
            self.create(self.raw_dict_path, self.trie_path)
Project: gopythongo    Author: gopythongo    | project source | file source
def _find_default_mounts() -> Set[str]:
    global config_paths
    basepath = os.getcwd()
    miniparser = configargparse.ArgumentParser()
    miniparser.add_argument(*args_for_setting_config_path, dest="config", action="append",
                            default=[])
    args, _ = miniparser.parse_known_args()

    # type: ignore, because mypy doesn't parse add_argument above correctly
    if not args.config:
        args.config = default_config_files

    paths = set()
    paths.add(basepath)
    for cfg in args.config:
        if os.path.isfile(cfg):
            paths.add(os.path.abspath(os.path.dirname(cfg)))
            config_paths.add(os.path.abspath(os.path.dirname(cfg)))
    return paths
Project: PyCOOLC    Author: aalhour    | project source | file source
def __init__(self):
        """
        TODO
        :param program_ast: TODO
        :return: None
        """
        super(PyCoolSemanticAnalyser, self).__init__()

        # Initialize the internal program ast instance.
        self._program_ast = None

        # Classes Map: maps each class name (key: String) to its class instance (value: AST.Class).
        # Dict[AnyStr, AST.Class]
        self._classes_map = dict()

        # Class Inheritance Graph: maps a parent class (key: String) to a unique collection of its 
        #   children classes (value: set).
        # Dict[AnyStr, Set]
        self._inheritance_graph = defaultdict(set)

    # #########################################################################
    #                                PUBLIC                                   #
    # #########################################################################
Project: rcli    Author: contains-io    | project source | file source
def _get_commands(dist  # type: setuptools.dist.Distribution
                  ):
    # type: (...) -> typing.Dict[str, typing.Set[str]]
    """Find all commands belonging to the given distribution.

    Args:
        dist: The Distribution to search for docopt-compatible docstrings that
            can be used to generate command entry points.

    Returns:
        A dictionary containing a mapping of primary commands to sets of
        subcommands.
    """
    py_files = (f for f in setuptools.findall()
                if os.path.splitext(f)[1].lower() == '.py')
    pkg_files = (f for f in py_files if _get_package_name(f) in dist.packages)
    commands = {}  # type: typing.Dict[str, typing.Set[str]]
    for file_name in pkg_files:
        with open(file_name) as py_file:
            module = typing.cast(ast.Module, ast.parse(py_file.read()))
        module_name = _get_module_name(file_name)
        _append_commands(commands, module_name, _get_module_commands(module))
        _append_commands(commands, module_name, _get_class_commands(module))
        _append_commands(commands, module_name, _get_function_commands(module))
    return commands
Project: rcli    Author: contains-io    | project source | file source
def _append_commands(dct,  # type: typing.Dict[str, typing.Set[str]]
                     module_name,  # type: str
                     commands  # type: typing.Iterable[_EntryPoint]
                     ):
    # type: (...) -> None
    """Append entry point strings representing the given Command objects.

    Args:
        dct: The dictionary to append with entry point strings. Each key will
            be a primary command with a value containing a list of entry point
            strings representing a Command.
        module_name: The name of the module in which the command object
            resides.
        commands: A list of Command objects to convert to entry point strings.
    """
    for command in commands:
        entry_point = '{command}{subcommand} = {module}{callable}'.format(
            command=command.command,
            subcommand=(':{}'.format(command.subcommand)
                        if command.subcommand else ''),
            module=module_name,
            callable=(':{}'.format(command.callable)
                      if command.callable else ''),
        )
        dct.setdefault(command.command, set()).add(entry_point)
Project: CodeGra.de    Author: CodeGra-de    | project source | file source
def set_grade(self, new_grade: float, user: User) -> None:
        """Set the grade to the new grade.

        .. note:: This also passes back the grade to LTI if this is necessary
            (see :py:func:`passback_grade`).

        :param new_grade: The new grade to set
        :param user: The user setting the new grade.
        :returns: Nothing
        """
        self._grade = new_grade
        passback = self.assignment.should_passback
        grade = self.grade
        history = GradeHistory(
            is_rubric=self._grade is None and grade is not None,
            grade=-1 if grade is None else grade,
            passed_back=False,
            work=self,
            user=user
        )
        db.session.add(history)
        db.session.flush()
        if passback:
            psef.tasks.passback_grades([self.id])
Project: SNOMEDToOWL    Author: hsolbrig    | project source | file source
def _add_defining_attribute(self, coll: Collection, group: int, rels: Set[RF2Files.Relationship]) -> None:
        if group == 0:
            for rel in rels:
                restr = existential_restriction(self, as_uri(rel.typeId), as_uri(rel.destinationId))
                if rel.typeId in self._context.NEVER_GROUPED:
                    coll.append(restr)
                else:
                    coll.append(role_group(self, restr))
        else:
            if len(rels) > 1:
                # A group whose target is an intersection of subjects + inner restrictions
                target, inner_coll = intersection(self)
                [inner_coll.append(existential_restriction(self, as_uri(rel.typeId), as_uri(rel.destinationId)))
                 for rel in rels]
                coll.append(role_group(self, target))
            else:
                rel = list(rels)[0]
                coll.append(existential_restriction(self, as_uri(rel.typeId), as_uri(rel.destinationId)))
Project: temci    Author: parttimenerd    | project source | file source
def copy_tree_actions(base: str, include_patterns: t.Union[t.List[str], str] = ["**", "**/.*"],
                      exclude_patterns: t.List[str] = None) -> t.List[Action]:
    """
    Actions for all files and directories in the base directory that match the given patterns.
    It's used to copy a whole directory tree.

    :param base: base directory
    :param include_patterns: patterns that match the paths that should be included
    :param exclude_patterns: patterns that match the paths that should be excluded
    :return: list of actions
    """
    paths = matched_paths(base, include_patterns, exclude_patterns)
    files = set()  # type: t.Set[str]
    dirs = set()  # type: t.Set[str]
    ret = []  # type: t.List[Action]
    for path in paths:
        ret.extend(actions_for_dir_path(path, path_acc=dirs))
        if os.path.isfile(path) and path not in files:
            files.add(path)
            ret.append(CopyFile(normalize_path(path)))
    return ret
Project: temci    Author: parttimenerd    | project source | file source
def actions_for_dir_path(path: str, path_acc: t.Set[str] = set()) -> t.List[Action]:
    """
    Returns a list of the actions needed to create a folder and its parent folders.

    :param path:
    :param path_acc: paths already examined
    """
    path = abspath(path)
    typecheck_locals(path=FileName(allow_non_existent=False)|DirName(), create=Bool())
    assert os.path.exists(path)
    if path == "" or path == "~":
        return []
    path = normalize_path(path)
    parts = path.split("/")
    ret = []
    for i in range(2 if parts[0] == "~" else 1, len(parts) + 1 if os.path.isdir(abspath(path)) else len(parts)):
        subpath = "/".join(parts[:i])
        subpath_norm = normalize_path(subpath)
        if subpath_norm in path_acc:
            continue
        ret.append(CreateDir(subpath_norm))
        path_acc.add(subpath_norm)
    return ret
Project: temci    Author: parttimenerd    | project source | file source
def combine(*messages: t.Tuple[t.Optional['StatMessage']]) -> t.List['StatMessage']:
        """
        Combines all message of the same type and with the same parent in the passed list.
        Ignores None entries.

        :param messages: passed list of messages
        :return: new reduced list
        """
        msgs = set([msg for msg in messages if msg is not None])  # type: t.Set['StatMessage']
        something_changed = True
        while something_changed:
            something_changed = False
            merged_pair = None # type: t.Tuple['StatMessage', 'StatMessage']
            for (msg, msg2) in itertools.product(msgs, msgs):
                if msg is not msg2:
                    if msg.parent.eq_except_property(msg2.parent) and type(msg) == type(msg2):
                        merged_pair = (msg, msg2)
                        something_changed = True
                        break
            if something_changed:
                msg, msg2 = merged_pair
                msgs.remove(msg)
                msgs.remove(msg2)
                msgs.add(msg + msg2)
        return list(msgs)
Project: typesentry    Author: h2oai    | project source | file source
def test_typing():
    from typing import Any, List, Set, Dict, Type, Tuple
    assert name_type(Any) == "Any"
    assert name_type(List) == "List"
    assert name_type(List[Any]) == "List"
    assert name_type(List[str]) == "List[str]"
    assert name_type(List[int]) == "List[int]"
    assert name_type(Set) == "Set"
    assert name_type(Set[Any]) == "Set"
    assert name_type(Set[List]) == "Set[List]"
    assert name_type(Dict) == "Dict"
    assert name_type(Dict[Any, Any]) == "Dict"
    assert name_type(Dict[str, int]) == "Dict[str, int]"
    assert name_type(Type) == "Type"
    assert name_type(Type[int]) == "Type[int]"
    assert name_type(Type[MagicType]) == "Type[MagicType]"
    assert name_type(Tuple) == "Tuple"
    assert name_type(Tuple[int]) == "Tuple[int]"
    assert name_type(Tuple[int, str, List]) == "Tuple[int, str, List]"
    assert name_type(Tuple[int, Ellipsis]) == "Tuple[int, ...]"
    assert name_type(Tuple[str, Ellipsis]) == "Tuple[str, ...]"
Project: typesentry    Author: h2oai    | project source | file source
def name(self):
        if self._args is None:
            return "Callable"
        elif self._args[0] is Ellipsis:
            return "Callable[..., %s]" % checker_for_type(self._args[1]).name()
        else:
            return "Callable[[%s], %s]" % (
                ", ".join(checker_for_type(z).name() for z in self._args[:-1]),
                checker_for_type(self._args[-1]).name())



# ------------------------------------------------------------------------------
#
# Set operations with checkers
# ------------------------------------------------------------------------------
Project: BAG_framework    Author: ucb-art    | project source | file source
def __init__(self, master_db, lib_name, params, used_names, **kwargs):
        # type: (MasterDB, str, Dict[str, Any], Set[str], **kwargs) -> None
        self._master_db = master_db
        self._lib_name = lib_name
        self._used_names = used_names

        # set parameters
        params_info = self.get_params_info()
        default_params = self.get_default_param_values()
        self.params = {}
        if params_info is None:
            # compatibility with old schematics generators
            self.params.update(params)
            self._prelim_key = self.to_immutable_id((self._get_qualified_name(), params))
            self._cell_name = None
            self._key = None
        else:
            self.populate_params(params, params_info, default_params, **kwargs)
            # get unique cell name
            self._prelim_key = self.compute_unique_key()
            self.update_master_info()

        self.children = None
        self._finalized = False
Project: nirum-python    Author: spoqa    | project source | file source
def deserialize_abstract_type(cls, data):
    abstract_type_map = {
        typing.Sequence: list,
        typing.List: list,
        typing.Dict: dict,
        typing.Set: set,
        typing.AbstractSet: set,
    }
    cls_origin_type = cls.__origin__
    if cls_origin_type is None:
        cls_origin_type = cls
    iterable_types = {
        typing.Sequence, typing.List, typing.Tuple, typing.Set,
        typing.AbstractSet, typing.Mapping,
    }
    if cls_origin_type in iterable_types:
        return deserialize_iterable_abstract_type(cls, cls_origin_type, data)
    else:
        return abstract_type_map[cls_origin_type](data)
Project: tundra    Author: caiopo    | project source | file source
def hamiltonian_cycle(g: Graph, start: Vertex) -> List[Vertex]:
    path = [start]

    current = start

    visited: Set[Vertex] = set()

    try:
        while len(visited) != g.order:
            visited.add(current)

            (_, nearest) = min(
                (g.weight[current, v], v)
                for v in g.neighbors(current)
                if v not in visited
            )

            path.append(nearest)

            current = nearest
    except ValueError as e:
        if len(path) == g.order:
            return path

    raise HamiltonianCycleNotFound('graph has dead ends')
Project: tundra    Author: caiopo    | project source | file source
def dfs(g: Graph, current: Vertex, condition: Test,
        visited: Set = None) -> Optional[Vertex]:

    visited = visited or set()

    if current in visited:
        return None

    visited.add(current)

    if condition(current):
        return current

    for n in g.neighbors(current):
        v = dfs(g, n, condition, visited)

        if v is not None:
            return v

    return None
Project: tundra    Author: caiopo    | project source | file source
def transitive_closure(
        g: Graph,
        v: Vertex,
        visited: Optional[Set[Vertex]] = None) -> Set[Vertex]:
    """
    Returns a set containing all vertices reachable from v
    """
    visited = visited or set()

    visited.add(v)

    for v_neigh in g.neighbors(v):
        if v_neigh not in visited:
            transitive_closure(g, v_neigh, visited)

    return visited
Project: jsonschema-extractor    Author: toumorokoshi    | project source | file source
def test_typing_extractor_register(typing_extractor):

    def extract_set(extractor, typ):
        subtype = Any
        if typ.__args__ and typ.__args__[0] is not Any:
            subtype = typ.__args__[0]
        return {
            "type": "array",
            "title": "set",
            "items": extractor.extract(extractor, subtype)
        }

    typing_extractor.register(set, extract_set)

    assert typing_extractor.extract(typing_extractor, Set[int]) == {
        "type": "array",
        "title": "set",
        "items": {"type": "integer"}
    }
Project: speccer    Author: bensimner    | project source | file source
def from_type(t):
    '''Converts a type `t` to a Typeable
    '''
    if isinstance(t, Typeable):
        return t

    if isinstance(t, list):
        if len(t) != 1:
            if len(t) < 1:
                reason = 'Missing type parameter'
            else:
                reason = 'Too many type parameters, only homogenous lists allowed'
            msg = 'Can only use literal list alias with a single type, `{}` is invalid: {}'
            raise ValueError(msg.format(repr(t), reason))
        t0 = from_type(t[0]).typ
        return from_type(typing.List[t0])
    elif isinstance(t, set):
        if len(t) != 1:
            if len(t) < 1:
                reason = 'Missing type parameter'
            else:
                reason = 'Too many type parameters, only homogenous sets allowed'
            msg = 'Can only use literal set alias with a single type, `{}` is invalid: {}'
            raise ValueError(msg.format(repr(t), reason))
        t0 = from_type(next(iter(t))).typ
        return from_type(typing.Set[t0])
    elif isinstance(t, tuple):
        args = tuple([from_type(a).typ for a in t])
        return from_type(typing.Tuple[args])

    return _from_typing36(t)
Project: speccer    Author: bensimner    | project source | file source
def test_setint_convert():
    it = typeable.from_type({int})
    assert it.typ is typing.Set[int]
    assert it.origin.typ is typing.Set
    assert it.origin.origin is None
    assert it.origin.args == []
    assert it.arity == 0
Project: ConfigSpace    Author: automl    | project source | file source
def __init__(self, seed: Union[int, None] = None) -> None:
        self._hyperparameters = OrderedDict()  # type: OrderedDict[str, Hyperparameter]
        self._hyperparameter_idx = dict()  # type: Dict[str, int]
        self._idx_to_hyperparameter = dict()  # type: Dict[int, str]

        # Use dictionaries to make sure that we don't accidentally add
        # additional keys to these mappings (which happened with defaultdict()).
        # This once broke auto-sklearn's equal comparison of configuration
        # spaces when _children of one instance contained  all possible
        # hyperparameters as keys and empty dictionaries as values while the
        # other instance not containing these.
        self._children = OrderedDict()   # type: OrderedDict[str, OrderedDict[str, Union[None, AbstractCondition]]]
        self._parents = OrderedDict()   # type: OrderedDict[str, OrderedDict[str, Union[None, AbstractCondition]]]

        # changing this to a normal dict will break sampling because there is
        #  no guarantee that the parent of a condition was evaluated before
        self._conditionals = set()   # type: Set[str]
        self.forbidden_clauses = []  # type: List['AbstractForbiddenComponent']
        self.random = np.random.RandomState(seed)

        self._children['__HPOlib_configuration_space_root__'] = OrderedDict()

        # caching
        self._parent_conditions_of = dict()
        self._child_conditions_of = dict()
        self._parents_of = dict()
        self._children_of = dict()
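
The comment above about defaultdict() refers to its habit of silently inserting a key on first lookup, which can make two otherwise equal mappings compare unequal. A minimal sketch (independent of ConfigSpace) of the difference:

from collections import defaultdict

children_dd = defaultdict(set)
children_plain = {}  # type: dict

_ = children_dd["lr"]          # a read-only looking lookup mutates the defaultdict
print("lr" in children_dd)     # True: the key was added as a side effect

try:
    _ = children_plain["lr"]   # a plain dict raises instead of growing
except KeyError:
    print("lr" in children_plain)  # False: no key was added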
Project: Lyra    Author: caterinaurban    | project source | file source
def __init__(self, nodes: Set[Node], in_node: Node, out_node: Node, edges: Set[Edge]):
        """Control flow graph representation.

        :param nodes: set of nodes of the control flow graph
        :param in_node: entry node of the control flow graph
        :param out_node: exit node of the control flow graph
        :param edges: set of edges of the control flow graph
        """
        self._nodes = {node.identifier: node for node in nodes}
        self._in_node = in_node
        self._out_node = out_node
        self._edges = {(edge.source, edge.target): edge for edge in edges}
Project: Lyra    Author: caterinaurban    | project source | file source
def in_edges(self, node: Node) -> Set[Edge]:
        """Ingoing edges of a given node.

        :param node: given node
        :return: set of ingoing edges of the node
        """
        return {self.edges[(source, target)] for (source, target) in self.edges if target == node}
Project: Lyra    Author: caterinaurban    | project source | file source
def predecessors(self, node: Node) -> Set[Node]:
        """Predecessors of a given node.

        :param node: given node
        :return: set of predecessors of the node
        """
        return {edge.source for edge in self.in_edges(node)}
Project: Lyra    Author: caterinaurban    | project source | file source
def out_edges(self, node: Node) -> Set[Edge]:
        """Outgoing edges of a given node.

        :param node: given node
        :return: set of outgoing edges of the node
        """
        return {self.edges[(source, target)] for (source, target) in self.edges if source == node}
Project: Lyra    Author: caterinaurban    | project source | file source
def successors(self, node: Node) -> Set[Node]:
        """Successors of a given node.

        :param node: given node
        :return: set of successors of the node
        """
        return {edge.target for edge in self.out_edges(node)}
Project: Lyra    Author: caterinaurban    | project source | file source
def result(self, result: Set[Expression]):
        self._result = result