Python typing module: TextIO() example source code

The following 22 code examples, extracted from open-source Python projects, illustrate how to use typing.TextIO().
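
As a minimal orientation before the examples: typing.TextIO annotates an already-opened text-mode stream, such as the object returned by open() in text mode. A small self-contained sketch (the file name example.txt is illustrative):

from typing import TextIO

def count_lines(f: TextIO) -> int:
    # Consume the stream and count its lines.
    return sum(1 for _ in f)

with open("example.txt", encoding="utf-8") as f:
    print(count_lines(f))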

Project: phasm | Author: AbeelLab
def _get_read_alignments(f: TextIO, reads: ReadMapping) -> AlignmentsT:
    logger.info("Pass 2 of alignments GFA2 file to import all pairwise local "
                "alignments...")
    read_alignments = defaultdict(dict)

    la_iter = map(gfa.gfa2_line_to_la(reads),
                  (l for l in f if l.startswith('E')))

    for la in la_iter:
        a_read, b_read = la.get_oriented_reads()
        read_alignments[a_read][b_read] = la
        read_alignments[b_read][a_read] = la.switch()

    logger.info("Done.")

    return read_alignments
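
A hedged usage sketch: the "Pass 2" log message implies a second pass over the same stream, so the reads mapping is presumably built first (e.g. with gfa2_parse_segments, shown below) and the file rewound; the file name is illustrative.

with open("alignments.gfa2") as f:
    reads = gfa2_parse_segments(f)   # pass 1: collect segments
    f.seek(0)                        # rewind for pass 2
    alignments = _get_read_alignments(f, reads)
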
Project: gopythongo | Author: gopythongo
def create_file_in_config_folder(self, filename: str, mode: int=None) -> TextIO:
        """
        :param filename: the name of the file in the generated config folder
        :param mode: pass an ``int`` here if you want to modify the file's mode (it will be umasked)
        :return: an open file (``TextIO``) object that the *caller must call `.close()` on*
        """
        if os.path.isfile(filename):
            raise InvalidArgumentException("Call create_file_in_config_folder with a filename, not a path")

        self.ensure_config_folder()
        f = cast(TextIO, io.open(os.path.join(self.configfolder, filename), mode="wt", encoding="utf-8"))

        if mode:
            os.chmod(os.path.join(self.configfolder, filename), get_umasked_mode(mode))

        return f
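
Per the docstring, the caller owns the returned handle and must close it. A hedged usage sketch (builder and the file name are illustrative):

f = builder.create_file_in_config_folder("app.conf", mode=0o640)
try:
    f.write("[main]\nkey = value\n")
finally:
    f.close()   # the caller, not the method, closes the handle
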
Project: wifimitm | Author: mvondracek
def get_personalized_dictionaries(target: WirelessAccessPoint) -> List[TextIO]:
    """
    Create and return dictionaries personalized using available AP details.
    :type target: WirelessAccessPoint
    :param target: targeted AP

    :rtype: List[TextIO]
    :return: list of opened personalized dictionaries
    """
    dictionaries = []
    if re.match(r'^UPC\d{7}$', target.essid):
        t = pipes.Template()
        t.prepend('upc_keys {} {}'.format(target.essid, '24'), '.-')
        t.append('grep "  -> WPA2 phrase for "', '--')
        t.append('sed "s/^  -> WPA2 phrase for \S* = \'\(.*\)\'$/\\1/"', '--')
        d = t.open('dictionary-pipeline', 'r')
        dictionaries.append(d)

    return dictionaries
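
The returned handles are already open for reading. A hedged usage sketch (ap and try_key are illustrative placeholders):

for dictionary in get_personalized_dictionaries(ap):
    for phrase in dictionary:
        try_key(phrase.strip())   # hypothetical consumer of candidate passphrases
    dictionary.close()
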
Project: mccurse | Author: khardix
def fetch_complete(self) -> TextIO:
        """Provide complete feed contents.

        Returns:
            Text stream of the complete feed contents; it should be used
            in a with-statement so that it is closed afterwards.

        Raises:
            requests.HTTPError: When an HTTP error occurs when fetching feed.
        """

        session = default_new_session(self.session)

        resp = session.get(self.complete_url)
        resp.raise_for_status()

        with self._decode_contents(resp.content) as text:
            yield text
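
Since the method body yields, it is presumably decorated with contextlib.contextmanager in the full source; per its docstring, it is meant for a with-statement (feed and process are illustrative):

with feed.fetch_complete() as text:
    for line in text:
        process(line)   # hypothetical consumer
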
Project: mccurse | Author: khardix
def load(cls: Type['ModPack'], stream: TextIO) -> 'ModPack':
        """Load mod-pack data from a file stream.

        Keyword arguments:
            stream: The text stream to load the data from.

        Returns:
            Loaded mod-pack.
        """

        validator = cerberus.Validator(cerberus.schema_registry.get('pack'))

        if not validator.validate(yaml.load(stream)):
            msg = _('Modpack file contains invalid data'), validator.errors
            raise exceptions.InvalidStream(*msg)
        else:
            data = validator.document
            return cls(
                game=data['game'],
                path=Path(data['files']['path']),
                mods=OrderedDict((d.mod.id, d) for d in data['files']['mods']),
                dependencies=OrderedDict((d.mod.id, d) for d in data['files']['dependencies']),
            )
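
A usage sketch (the file name is illustrative):

with open("modpack.yaml", encoding="utf-8") as stream:
    pack = ModPack.load(stream)
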
Project: allennlp | Author: allenai
def __init__(self, filename: str, terminal: TextIO) -> None:
        self.terminal = terminal
        parent_directory = os.path.dirname(filename)
        os.makedirs(parent_directory, exist_ok=True)
        self.log = open(filename, 'a')
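
This initializer tees logging to both the terminal and a file. In the full class, write() and flush() methods presumably forward to both streams; a sketch under that assumption, not the project's verbatim code:

def write(self, message: str) -> None:
        # Forward each message to the terminal and the log file.
        self.terminal.write(message)
        self.log.write(message)

def flush(self) -> None:
        self.terminal.flush()
        self.log.flush()
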
Project: phasm | Author: AbeelLab
def gfa2_parse_segments(f: TextIO) -> ReadMapping:
    read_iter = map(gfa2_segment_to_read, (l for l in f if l.startswith('S')))
    return {r.id: r for r in read_iter}
Project: phasm | Author: AbeelLab
def write_graph(f: TextIO, g: AssemblyGraph, version=2, **kwargs):
    if version == 1:
        return gfa1_write_graph(f, g, **kwargs)
    else:
        return gfa2_write_graph(f, g, **kwargs)
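
A usage sketch of the version dispatch (the graph g and the file name are illustrative):

with open("assembly.gfa", "w") as f:
    write_graph(f, g, version=1)   # writes GFA1; any other version writes GFA2
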
Project: phasm | Author: AbeelLab
def gfa1_write_graph(f: TextIO, g: AssemblyGraph,
                     with_orig_segments: SegmentMapping=None):
    f.write(gfa_header("1.0"))

    segments = set()
    for n in g.nodes_iter():
        segment_name = str(n)[:-1]
        if segment_name in segments:
            continue

        segments.add(segment_name)

        if with_orig_segments and segment_name in with_orig_segments:
            n = with_orig_segments[segment_name]

        parts = ["S", segment_name, "*", "LN:i:{}".format(n.length)]
        f.write(gfa_line(*parts))

    for u, v, d in g.edges_iter(data=True):
        uid = str(u)[:-1]
        u_strand = str(u)[-1:]
        vid = str(v)[:-1]
        v_strand = str(v)[-1:]

        # Fake CIGAR string just indicating overlap length
        overlap = str(d['overlap_len'])+"M"

        parts = ["L", uid, u_strand, vid, v_strand, overlap]
        f.write(gfa_line(*parts))
Project: smartchangelog | Author: ngouzy
def set_commit_editmsg(msg: str) -> Iterator[TextIO]:
    filename = 'COMMIT_EDITMSG'
    with open(filename, mode='w') as f:
        f.write(msg)
    try:
        yield cast(TextIO, f)
    finally:
        if os.path.isfile(filename):
            os.remove(filename)
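
Given the yield, this function is presumably wrapped with contextlib.contextmanager in the full source, so the message file exists inside the with-block and is removed afterwards. Note the yielded handle is already closed; typically only its name is useful:

with set_commit_editmsg("feat: add changelog support") as f:
    assert os.path.isfile(f.name)
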
Project: arsenic | Author: HDE
async def subprocess_based_service(cmd: List[str],
                                   service_url: str,
                                   log_file: TextIO) -> WebDriver:
    check_event_loop()
    closers = []
    try:
        if log_file is os.devnull:
            log_file = DEVNULL
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=log_file,
            stderr=log_file,
        )
        closers.append(partial(stop_process, process))
        session = ClientSession()
        closers.append(session.close)
        count = 0
        while True:
            try:
                await tasked(session.request(
                    url=service_url + '/status',
                    method='GET'
                ))
                break
            except:
                # TODO: make this better
                count += 1
                if count > 30:
                    raise Exception('not starting?')
                await asyncio.sleep(0.5)
        return WebDriver(
            Connection(session, service_url),
            closers,
        )
    except:
        for closer in reversed(closers):
            await closer()
        raise
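
Being a coroutine, it must be awaited from a running event loop. A hedged sketch (the driver command, port and URL are illustrative):

async def main() -> None:
    with open("driver.log", "w") as log:
        driver = await subprocess_based_service(
            ["geckodriver", "--port=4444"],
            "http://localhost:4444",
            log,
        )
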
Project: wifimitm | Author: mvondracek
def update(self, print_stream: Optional[TextIO]=None, print_prefix: Optional[str]='MITMf 1> '):
        """
        Update state of running process from process' feedback.
        Read new output from stdout and stderr, check if process is alive.
        :type print_stream: Optional[TextIO]
        :param print_stream: Print information about HTTP traffic from MITMf's stdout to provided stream.
        :type print_prefix: Optional[str]
        :param print_prefix: Prepend the provided string at the beginning of every line printed to `print_stream`.
        """
        super().update()
        # Is process running? State would be changed after reading stdout and stderr.
        self.poll()

        # check every added line in stdout
        if self.stdout_r and not self.stdout_r.closed:
            for line in self.stdout_r:
                if self.state == self.State.STARTED and line == '|_ SMB server online\n':
                    self.state = self.State.SPOOFING

                elif self.state == self.State.SPOOFING and line != '\n':
                    if print_stream:
                        print(print_prefix + line, end='', file=print_stream)

        # check every added line in stderr
        if self.stderr_r and not self.stderr_r.closed:
            for line in self.stderr_r:
                if ' * Running on http://127.0.0.1:9999/ (Press CTRL+C to quit)\n' == line:
                    continue
                # NOTE: stderr should be now empty
                logger.warning("Unexpected stderr of 'mitmf': '{}'. {}".format(line, str(self)))

        # Change state if the process was not running at the time of the poll() call at the beginning of this method.
        # NOTE: The process' poll() needs to be called at the beginning of this method and returncode checked at the
        # end to ensure all feedback (stdout and stderr) is read and states are changed accordingly.
        # If the process exited, its state is not changed immediately: all available feedback is read first, and only
        # then is the state changed to self.State.TERMINATED. State, flags, stats and other attributes can still be
        # updated while reading the remaining feedback even though the process has exited.
        if self.returncode is not None:
            self.state = self.State.TERMINATED
Project: websauna | Author: websauna
def resolve(self, include_file: str, fpname: str) -> t.TextIO:
        """Resolve include_file and return a readable file like object.

        :param include_file: File to be included.
        :param fpname: Main configuration filename.
        :return: Return a readable file-like object for the specified resource.
        """
        parts = urlparse(include_file)
        if parts.scheme not in _VALID_SCHEMAS_:
            raise exc.InvalidResourceScheme(
                "Supported resources: {resources}. Got {include} in {fpname}".format(
                    resources=', '.join(_VALID_SCHEMAS_),
                    include=include_file,
                    fpname=fpname
                )
            )

        package = parts.netloc
        args = package.split('.') + [parts.path.lstrip('/')]
        path = os.path.join(*args)

        req = pkg_resources.Requirement.parse(package)

        if not _resource_manager.resource_exists(req, path):
            raise exc.NonExistingInclude(
                "Could not find {include}".format(include=include_file)
            )

        config_source = _resource_manager.resource_stream(req, path)
        return config_source
Project: unidump | Author: Codepoints
def __init__(self, linelength: int = None, encoding: str = None,
                 lineformat: str = None, output: TextIO = None) -> None:
        if linelength is not None:
            self.linelength = linelength
        if encoding is not None:
            self.encoding = encoding
        if lineformat is not None:
            self.lineformat = lineformat
        if output is not None:
            self.output = output
Project: mccurse | Author: khardix
def extract_deps(fd: TextIO) -> Generator[str, None, None]:
    """Extract dependencies from file."""
    yield from (
        line for line in fd
        if line and not line.startswith(('#', 'git+'))
    )
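
A usage sketch against a pip-style requirements file (the file name is illustrative):

with open("requirements.txt") as fd:
    dependencies = [line.strip() for line in extract_deps(fd)]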


Project: mccurse | Author: khardix
def _decode_contents(feed: bytes) -> TextIO:
        """Decode the provided data from bz2 to text.

        The :arg:`feed` is assumed to be bz2-compressed text data in utf-8
        encoding.

        Keyword arguments:
            feed: The data to be decoded.

        Returns: Decoded text stream.
        """

        with BytesIO(feed) as compressed, \
                bz2.open(compressed, mode='rt', encoding='utf-8') as stream:
            yield stream
Project: mccurse | Author: khardix
def dump(self: 'ModPack', stream: TextIO) -> None:
        """Serialize self to a file stream.

        Keyword arguments:
            stream: The text stream to serialize into.
        """

        data = OrderedDict()
        data['game'] = self.game
        data['files'] = OrderedDict()
        data['files']['path'] = str(self.path)
        data['files']['mods'] = list(self.mods.values())
        data['files']['dependencies'] = list(self.dependencies.values())

        yaml.dump(data, stream)
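
The counterpart to load() shown earlier; a usage sketch (file name illustrative):

with open("modpack.yaml", "w", encoding="utf-8") as stream:
    pack.dump(stream)
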
Project: mccurse | Author: khardix
def dump(self, file: TextIO) -> None:
        """Store credentials for future use.

        Keyword arguments:
            file: Open YAML text stream to write to.
        """

        yaml.dump(attr.asdict(self), file)
Project: libiocage | Author: iocage
def map_input(self, data: typing.TextIO) -> typing.Dict[str, typing.Any]:
        result = json.load(data)  # type: typing.Dict[str, typing.Any]
        return result
Project: libiocage | Author: iocage
def map_input(self, data: typing.TextIO) -> typing.Dict[str, typing.Any]:
        result = ucl.load(data.read())  # type: typing.Dict[str, typing.Any]
        result["legacy"] = True
        return result
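
Both map_input() variants take an open text stream; a usage sketch for the JSON flavor (mapper and the file name are illustrative):

with open("config.json") as data:
    config = mapper.map_input(data)
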
Project: allennlp | Author: allenai
def write_to_conll_eval_file(prediction_file: TextIO,
                             gold_file: TextIO,
                             verb_index: Optional[int],
                             sentence: List[str],
                             prediction: List[str],
                             gold_labels: List[str]):
    """
    Prints predicate argument predictions and gold labels for a single verbal
    predicate in a sentence to two provided file references.

    Parameters
    ----------
    prediction_file : TextIO, required.
        A file reference to print predictions to.
    gold_file : TextIO, required.
        A file reference to print gold labels to.
    verb_index : Optional[int], required.
        The index of the verbal predicate in the sentence which
        the gold labels are the arguments for, or None if the sentence
        contains no verbal predicate.
    sentence : List[str], required.
        The word tokens.
    prediction : List[str], required.
        The predicted BIO labels.
    gold_labels : List[str], required.
        The gold BIO labels.
    """
    verb_only_sentence = ["-"] * len(sentence)
    if verb_index is not None:
        verb_only_sentence[verb_index] = sentence[verb_index]

    conll_format_predictions = convert_bio_tags_to_conll_format(prediction)
    conll_format_gold_labels = convert_bio_tags_to_conll_format(gold_labels)

    for word, predicted, gold in zip(verb_only_sentence,
                                     conll_format_predictions,
                                     conll_format_gold_labels):
        prediction_file.write(word.ljust(15))
        prediction_file.write(predicted.rjust(15) + "\n")
        gold_file.write(word.ljust(15))
        gold_file.write(gold.rjust(15) + "\n")
    prediction_file.write("\n")
    gold_file.write("\n")
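
A usage sketch with a toy sentence (the BIO labels are illustrative):

with open("predictions.txt", "w") as pred, open("gold.txt", "w") as gold:
    write_to_conll_eval_file(pred, gold,
                             verb_index=1,
                             sentence=["Cats", "chase", "mice"],
                             prediction=["B-ARG0", "B-V", "B-ARG1"],
                             gold_labels=["B-ARG0", "B-V", "B-ARG1"])
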
Project: phasm | Author: AbeelLab
def gfa2_parse_segments_with_fragments(f: TextIO):
    """Parse all segments and fragments from a GFA2 file and store them in a
    dict.

    In PHASM fragments are used to denote merged reads."""

    segments = {}
    fragments = defaultdict(list)
    for line in f:
        if not line.startswith('S') and not line.startswith('F'):
            continue

        parts = line.strip().split('\t')

        line_type = parts[0].strip()
        segment_name = parts[1].strip()
        if line_type == 'S':
            segments[segment_name] = line

        if line_type == 'F':
            fragments[segment_name].append(gfa2_parse_fragment(line))

    reads = {}
    for segment, line in segments.items():
        if segment not in fragments:
            reads[segment] = gfa2_segment_to_read(line)
        else:
            read = gfa2_segment_to_read(line)
            length = len(read)
            fragment_reads = []
            prefix_lengths = []

            for fragment_info in sorted(fragments[segment],
                                        key=lambda elem: elem[2][0]):
                _, external_id, segment_range, fragment_range = fragment_info
                fragment_length = fragment_range[1] - fragment_range[0]
                fragment_reads.append(external_id)
                prefix_lengths.append(fragment_length)

            prefix_lengths.pop()
            reads[segment] = MergedFragment(read.id, length, fragment_reads,
                                            prefix_lengths)

    return reads
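
A usage sketch (the file name is illustrative):

with open("assembly.gfa2") as f:
    reads = gfa2_parse_segments_with_fragments(f)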