Python io 模块,TextIOWrapper() 实例源码


项目:quizbot-2017    作者:pycontw    | 项目源码 | 文件源码
def generate_info():
    tickets_archive_path = ROOT_DIR_PATH.joinpath('')
    ensure_data_file(tickets_archive_path, DATA_FILE_INFO['TICKETS_URL'])

    with zipfile.ZipFile(str(tickets_archive_path)) as zf:
        for name in zf.namelist():
            stem, ext = os.path.splitext(name)
            if ext != '.csv':
            with as f:
                # Zipfile only opens file in binary mode, but csv only accepts
                # text files, so we need to wrap this.
                # See <>.
                textfile = io.TextIOWrapper(f, encoding='utf8', newline='')
                for row in csv.DictReader(textfile):
                    yield Registration(row)
项目:django-webpack    作者:csinchok    | 项目源码 | 文件源码
def run(self, *args, **options):
        p_path = os.path.join('/proc', str(os.getppid()), 'cmdline')
        with open(p_path, 'rb') as f:
            p_cmdline ='\x00')

        p = None
        if b'runserver' not in p_cmdline:
            self.stdout.write("Starting webpack-dev-server...")
            p = webpack_dev_server()
            wrapper = io.TextIOWrapper(p.stdout, line_buffering=True)
            first_line = next(wrapper)
            webpack_host = first_line.split()[-1]



        if p:
项目:srctools    作者:TeamSpen210    | 项目源码 | 文件源码
def open_str(self, name: str, encoding='utf8'):
        """Open a file in unicode mode or raise FileNotFoundError.

        The return value is a StringIO in-memory buffer.
        with self:
            # File() calls with the VPK object we need directly.
            if isinstance(name, VPKFile):
                file = name
                    file = self._ref[name]
                except KeyError:
                    raise FileNotFoundError(name)
            # Wrap the data to treat it as bytes, then
            # wrap that to decode and clean up universal newlines.
            return io.TextIOWrapper(io.BytesIO(, encoding)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def popen(cmd, mode="r", buffering=-1):
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess, io
    if mode == "r":
        proc = subprocess.Popen(cmd,
        return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
        proc = subprocess.Popen(cmd,
        return _wrap_close(io.TextIOWrapper(proc.stdin), proc)

# Helper for popen() -- a proxy for a file whose close waits for the process
项目:target-csv    作者:singer-io    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', help='Config file')
    args = parser.parse_args()

    if args.config:
        with open(args.config) as input:
            config = json.load(input)
        config = {}

    if not config.get('disable_collection', False):'Sending version information to ' +
                    'To disable sending anonymous usage data, set ' +
                    'the config parameter "disable_collection" to true')

    input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
    state = persist_lines(config.get('delimiter', ','),
                          config.get('quotechar', '"'),

    logger.debug("Exiting normally")
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    Backport of ``socket.makefile`` from Python 3.5.
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def handle_display_options(self, option_order):
        """If there were any non-global "display-only" options
        (--help-commands or the metadata display options) on the command
        line, display the requested info and return true; else return
        import sys

        if six.PY2 or self.help_commands:
            return _Distribution.handle_display_options(self, option_order)

        # Stdout may be StringIO (e.g. in tests)
        import io
        if not isinstance(sys.stdout, io.TextIOWrapper):
            return _Distribution.handle_display_options(self, option_order)

        # Don't wrap stdout if utf-8 is already the encoding. Provides
        #  workaround for #334.
        if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
            return _Distribution.handle_display_options(self, option_order)

        # Print metadata in UTF-8 no matter the platform
        encoding = sys.stdout.encoding
        errors = sys.stdout.errors
        newline = sys.platform != 'win32' and '\n' or None
        line_buffering = sys.stdout.line_buffering

        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
            return _Distribution.handle_display_options(self, option_order)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), encoding, errors, newline, line_buffering)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def gettext_popen_wrapper(args, os_err_exc_type=CommandError, stdout_encoding="utf-8"):
    Makes sure text obtained from stdout of gettext utilities is Unicode.
    # This both decodes utf-8 and cleans line endings. Simply using
    # popen_wrapper(universal_newlines=True) doesn't properly handle the
    # encoding. This goes back to popen's flaky support for encoding:
    # This is a solution for #23271, #21928.
    # No need to do anything on Python 2 because it's already a byte-string there.
    manual_io_wrapper = six.PY3 and stdout_encoding != DEFAULT_LOCALE_ENCODING

    stdout, stderr, status_code = popen_wrapper(args, os_err_exc_type=os_err_exc_type,
                                                universal_newlines=not manual_io_wrapper)
    if manual_io_wrapper:
        stdout = io.TextIOWrapper(io.BytesIO(stdout), encoding=stdout_encoding).read()
    if six.PY2:
        stdout = stdout.decode(stdout_encoding)
    return stdout, stderr, status_code
项目    作者:biothings    | 项目源码 | 文件源码
def anyfile(infile, mode='r', encoding="utf8"):
    return a file handler with the support for gzip/zip comppressed files
    if infile is a two value tuple, then first one is the compressed file;
      the second one is the actual filename in the compressed file.
      e.g., ('', 'aa.txt')

    if isinstance(infile, tuple):
        infile, rawfile = infile[:2]
        rawfile = os.path.splitext(infile)[0]
    filetype = os.path.splitext(infile)[1].lower()
    if filetype == '.gz':
        import gzip
        in_f = io.TextIOWrapper(gzip.GzipFile(infile, 'r'),encoding=encoding)
    elif filetype == '.zip':
        import zipfile
        in_f = io.TextIOWrapper(zipfile.ZipFile(infile, 'r').open(rawfile, 'r'),encoding=encoding)
        in_f = open(infile, mode, encoding=encoding)
    return in_f
项目:srctools    作者:TeamSpen210    | 项目源码 | 文件源码
def open_str(self, name: str, encoding='utf8'):
        """Return a string buffer for a 'file'. 

        This performs universal newlines conversion.
        The encoding argument is ignored for files which are
        originally text.
        # We don't need this, but it should match other filesystems.

            filename, data = self._mapping[self._clean_path(name)]
        except KeyError:
            raise FileNotFoundError(name)
        if isinstance(data, bytes):
            # Decode on the fly, with universal newlines.
            return io.TextIOWrapper(
            # None = universal newlines mode directly.
            # No encoding is needed obviously.
            return io.StringIO(data, newline=None)
项目:atc_alexa    作者:ckuzma    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    Backport of ``socket.makefile`` from Python 3.5.
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def popen(cmd, mode="r", buffering=-1):
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess, io
    if mode == "r":
        proc = subprocess.Popen(cmd,
        return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
        proc = subprocess.Popen(cmd,
        return _wrap_close(io.TextIOWrapper(proc.stdin), proc)

# Helper for popen() -- a proxy for a file whose close waits for the process
项目:jd4    作者:vijos    | 项目源码 | 文件源码
def read_cases(file):
    open = try_open_zip(file)
    if not open:
        open = try_open_tar(file)
        if not open:
            raise FormatError(file, 'not a zip file or tar file')
        config = TextIOWrapper(open('config.ini'),
                               encoding='utf-8', errors='replace')
        return read_legacy_cases(config, open)
    except FileNotFoundError:
        config = open('config.yaml')
        return read_yaml_cases(config, open)
    except FileNotFoundError:
    raise FormatError('config file not found')
项目:flake8-rst-docstrings    作者:peterjc    | 项目源码 | 文件源码
def load_source(self):
        """Load the source for the specified file."""
        if self.filename in self.STDIN_NAMES:
            self.filename = 'stdin'
            if sys.version_info[0] < 3:
                self.source =
                self.source = TextIOWrapper(sys.stdin.buffer,
            # Could be a Python 2.7 StringIO with no context manager, sigh.
            # with tokenize_open(self.filename) as fd:
            #     self.source =
            handle = tokenize_open(self.filename)
            self.source =
项目:AutoMergeTool    作者:xgouchet    | 项目源码 | 文件源码
def __write_merged_imports(self, file: TextIOWrapper, imports: list):
        Sorts and write the given imports to the file, adding a blank line between
        each group
        f -- the file object (needs write permission)
        imports -- the list of imports
        sorted_imports = self.sort_imports(imports)
        previous_group = -1
        for imp in sorted_imports:
            group = self.get_import_group(imp)
            if not (group == previous_group):
                if not (previous_group == -1):
                previous_group = group
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def source_to_unicode(txt, errors='replace', skip_encoding_cookie=True):
    """Converts a bytes string with python source code to unicode.

    Unicode strings are passed through unchanged. Byte strings are checked
    for the python source file encoding cookie to determine encoding.
    txt can be either a bytes buffer or a string containing the source
    if isinstance(txt, str):
        return txt
    if isinstance(txt, bytes):
        buffer = BytesIO(txt)
        buffer = txt
        encoding, _ = detect_encoding(buffer.readline)
    except SyntaxError:
        encoding = "ascii"
    text = TextIOWrapper(buffer, encoding, errors=errors, line_buffering=True)
    text.mode = 'r'
    if skip_encoding_cookie:
        return u"".join(strip_encoding_cookie(text))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def popen(cmd, mode="r", buffering=-1):
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering == None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess, io
    if mode == "r":
        proc = subprocess.Popen(cmd,
        return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
        proc = subprocess.Popen(cmd,
        return _wrap_close(io.TextIOWrapper(proc.stdin), proc)

# Helper for popen() -- a proxy for a file whose close waits for the process
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_fileobj_readlines(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        fobj = self.tar.extractfile(tarinfo)
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertTrue(lines1 == lines2,
                    "fileobj.readlines() failed")
            self.assertTrue(len(lines2) == 114,
                    "fileobj.readlines() failed")
            self.assertTrue(lines2[83] ==
                    "I will gladly admit that Python is not the fastest running scripting language.\n",
                    "fileobj.readlines() failed")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_add(self):
        # Add copies of a sample message
        keys = []
        keys.append(self._box.add(self._template % 0))
        self.assertEqual(len(self._box), 1)
        self.assertEqual(len(self._box), 2)
        self.assertEqual(len(self._box), 3)
        self.assertEqual(len(self._box), 4)
        self.assertEqual(len(self._box), 5)
        self.assertEqual(len(self._box), 6)
        with self.assertWarns(DeprecationWarning):
        self.assertEqual(len(self._box), 7)
        self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
        for i in (1, 2, 3, 4, 5, 6):
项目:py-cloud-compute-cannon    作者:Autodesk    | 项目源码 | 文件源码
def open(self, mode='r', encoding=None):
        """Return file-like object

            mode (str): access mode (only reading modes are supported)
            encoding (str): text decoding method for text access (default: system default)

            io.BytesIO OR io.TextIOWrapper: buffer accessing the file as bytes or characters
        access_type = self._get_access_type(mode)

        if access_type == 't' and encoding is not None and encoding != self.encoded_with:
            warnings.warn('Attempting to decode %s as "%s", but encoding is declared as "%s"'
                          % (self, encoding, self.encoded_with))

        if encoding is None:
            encoding = self.encoded_with

        buffer = io.BytesIO(self._contents)
        if access_type == 'b':
            return buffer
            return io.TextIOWrapper(buffer, encoding=encoding)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def handle_display_options(self, option_order):
        """If there were any non-global "display-only" options
        (--help-commands or the metadata display options) on the command
        line, display the requested info and return true; else return
        import sys

        if six.PY2 or self.help_commands:
            return _Distribution.handle_display_options(self, option_order)

        # Stdout may be StringIO (e.g. in tests)
        import io
        if not isinstance(sys.stdout, io.TextIOWrapper):
            return _Distribution.handle_display_options(self, option_order)

        # Don't wrap stdout if utf-8 is already the encoding. Provides
        #  workaround for #334.
        if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
            return _Distribution.handle_display_options(self, option_order)

        # Print metadata in UTF-8 no matter the platform
        encoding = sys.stdout.encoding
        errors = sys.stdout.errors
        newline = sys.platform != 'win32' and '\n' or None
        line_buffering = sys.stdout.line_buffering

        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
            return _Distribution.handle_display_options(self, option_order)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), encoding, errors, newline, line_buffering)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def open(filename):
    """Open a file in read only mode using the encoding detected by
    buffer = _builtin_open(filename, 'rb')
        encoding, lines = detect_encoding(buffer.readline)
        text = TextIOWrapper(buffer, encoding, line_buffering=True)
        text.mode = 'r'
        return text
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    Backport of ``socket.makefile`` from Python 3.5.
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
项目:googletranslate.popclipext    作者:wizyoung    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    Backport of ``socket.makefile`` from Python 3.5.
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def load(self, file):
            tree = json.load(io.TextIOWrapper(file, encoding='utf8'))
            tree = Audit.SCHEMA.validate(tree)
            self.__artifact = Artifact.fromData(tree["artifact"])
            self.__references = {
                r["artifact-id"] : Artifact.fromData(r) for r in tree["references"]
        except schema.SchemaError as e:
            raise ParseError("Invalid audit record: " + str(e))
        except ValueError as e:
            raise ParseError("Invalid json: " + str(e))
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def save(self, file):
        tree = {
            "artifact" : self.__artifact.dump(),
            "references" : [ a.dump() for a in self.__references.values() ]
            with, 'wb', 6) as gzf:
                json.dump(tree, io.TextIOWrapper(gzf, encoding='utf8'))
        except OSError as e:
            raise BuildError("Cannot write audit: " + str(e))
项目:pyecharts-snapshot    作者:chfw    | 项目源码 | 文件源码
def make_a_snapshot(file_name, output_name, delay=DEFAULT_DELAY):
    file_type = output_name.split('.')[-1]
    pixel_ratio = 2
    shell_flag = False
    if sys.platform == 'win32':
        shell_flag = True
    __actual_delay_in_ms = int(delay * 1000)

    # add shell=True and it works on Windows now.
    proc_params = [
        os.path.join(get_resource_dir('phantomjs'), 'snapshot.js'),
        file_name.replace('\\', '/'),
    proc = subprocess.Popen(
        proc_params, stdout=subprocess.PIPE, shell=shell_flag)
    if PY2:
        content =
        content = content.decode('utf-8')
        content = io.TextIOWrapper(proc.stdout, encoding="utf-8").read()
    content_array = content.split(',')
    if len(content_array) != 2:
        raise Exception("No snapshot taken by phantomjs. " +
                        "Please make sure it is installed " +
                        "and available on your path")
    base64_imagedata = content_array[1]
    imagedata = decode_base64(base64_imagedata.encode('utf-8'))
    if file_type in ['pdf', 'gif']:
        save_as(imagedata, output_name, file_type)
    elif file_type in ['png', 'jpeg']:
        save_as_png(imagedata, output_name)
        raise Exception(NOT_SUPPORTED_FILE_TYPE % file_type)
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    Backport of ``socket.makefile`` from Python 3.5.
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _wrap_reader_for_text(fp, encoding):
    if isinstance(, bytes):
        fp = io.TextIOWrapper(io.BufferedReader(fp), encoding)
    return fp
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _wrap_writer_for_text(fp, encoding):
    except TypeError:
        fp = io.TextIOWrapper(fp, encoding)
    return fp
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def handle_display_options(self, option_order):
        """If there were any non-global "display-only" options
        (--help-commands or the metadata display options) on the command
        line, display the requested info and return true; else return
        import sys

        if sys.version_info < (3,) or self.help_commands:
            return _Distribution.handle_display_options(self, option_order)

        # Stdout may be StringIO (e.g. in tests)
        import io
        if not isinstance(sys.stdout, io.TextIOWrapper):
            return _Distribution.handle_display_options(self, option_order)

        # Don't wrap stdout if utf-8 is already the encoding. Provides
        #  workaround for #334.
        if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
            return _Distribution.handle_display_options(self, option_order)

        # Print metadata in UTF-8 no matter the platform
        encoding = sys.stdout.encoding
        errors = sys.stdout.errors
        newline = sys.platform != 'win32' and '\n' or None
        line_buffering = sys.stdout.line_buffering

        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
            return _Distribution.handle_display_options(self, option_order)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), encoding, errors, newline, line_buffering)

# Install it throughout the distutils
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    Backport of ``socket.makefile`` from Python 3.5.
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
项目:smart-card-removinator    作者:nkinder    | 项目源码 | 文件源码
def __init__(self, port=None):
        """Opens a connection to a Removinator controller on the specified
        serial port.

        If a port is not specified, an attempt will be made to auto-discover
        the Removinator controller.

        :param port: port that the Removinator is connected to
        :type port: str
        :raises: :exc:`ConnectError`
        :ivar last_result: The result output from the last executed command
        :ivar last_response: The response output from the last executed command
        :ivar port: The port that the Removinator is connected to

        self.port = port
        self.last_result = ''
        self.last_response = ''

        # Attempt to discover the Removinator controller if a
        # port was not specified.
        if port is None:
            port = _discover_removinator()

        # Open a connection to the Removinator controller.
            self.connection = serial.Serial(
            self.sio = io.TextIOWrapper(io.BufferedRWPair(self.connection,
        except serial.SerialException as e:
            raise ConnectError('Unable to open connection to Removinator '
                               'controller on port {0}'.format(port))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def readlines(filename):
        """Read the source code."""
            with open(filename, 'rb') as f:
                (coding, lines) = tokenize.detect_encoding(f.readline)
                f = TextIOWrapper(f, coding, line_buffering=True)
                return [l.decode(coding) for l in lines] + f.readlines()
        except (LookupError, SyntaxError, UnicodeError):
            # Fall back if file encoding is improperly declared
            with open(filename, encoding='latin-1') as f:
                return f.readlines()
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def stdin_get_value():
        return TextIOWrapper(sys.stdin.buffer, errors='ignore').read()
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def __init__(self, stream, encoding, errors, **extra):
        self._stream = stream = _FixupStream(stream)
        io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra)

    # The io module is a place where the Python 3 text behavior
    # was forced upon Python 2, so we need to unbreak
    # it to look like Python 2.
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def write(self, x):
            if isinstance(x, str) or is_bytes(x):
                except Exception:
                return self.buffer.write(str(x))
            return io.TextIOWrapper.write(self, x)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def _wrap_reader_for_text(fp, encoding):
    if isinstance(, bytes):
        fp = io.TextIOWrapper(io.BufferedReader(fp), encoding)
    return fp
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def _wrap_writer_for_text(fp, encoding):
    except TypeError:
        fp = io.TextIOWrapper(fp, encoding)
    return fp
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def handle_display_options(self, option_order):
        """If there were any non-global "display-only" options
        (--help-commands or the metadata display options) on the command
        line, display the requested info and return true; else return
        import sys

        if six.PY2 or self.help_commands:
            return _Distribution.handle_display_options(self, option_order)

        # Stdout may be StringIO (e.g. in tests)
        import io
        if not isinstance(sys.stdout, io.TextIOWrapper):
            return _Distribution.handle_display_options(self, option_order)

        # Don't wrap stdout if utf-8 is already the encoding. Provides
        #  workaround for #334.
        if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
            return _Distribution.handle_display_options(self, option_order)

        # Print metadata in UTF-8 no matter the platform
        encoding = sys.stdout.encoding
        errors = sys.stdout.errors
        newline = sys.platform != 'win32' and '\n' or None
        line_buffering = sys.stdout.line_buffering

        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
            return _Distribution.handle_display_options(self, option_order)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), encoding, errors, newline, line_buffering)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def handle_display_options(self, option_order):
        """If there were any non-global "display-only" options
        (--help-commands or the metadata display options) on the command
        line, display the requested info and return true; else return
        import sys

        if PY2 or self.help_commands:
            return _Distribution.handle_display_options(self, option_order)

        # Stdout may be StringIO (e.g. in tests)
        import io
        if not isinstance(sys.stdout, io.TextIOWrapper):
            return _Distribution.handle_display_options(self, option_order)

        # Don't wrap stdout if utf-8 is already the encoding. Provides
        #  workaround for #334.
        if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
            return _Distribution.handle_display_options(self, option_order)

        # Print metadata in UTF-8 no matter the platform
        encoding = sys.stdout.encoding
        errors = sys.stdout.errors
        newline = sys.platform != 'win32' and '\n' or None
        line_buffering = sys.stdout.line_buffering

        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
            return _Distribution.handle_display_options(self, option_order)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), encoding, errors, newline, line_buffering)

# Install it throughout the distutils
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def handle_display_options(self, option_order):
        """If there were any non-global "display-only" options
        (--help-commands or the metadata display options) on the command
        line, display the requested info and return true; else return
        import sys

        if PY2 or self.help_commands:
            return _Distribution.handle_display_options(self, option_order)

        # Stdout may be StringIO (e.g. in tests)
        import io
        if not isinstance(sys.stdout, io.TextIOWrapper):
            return _Distribution.handle_display_options(self, option_order)

        # Don't wrap stdout if utf-8 is already the encoding. Provides
        #  workaround for #334.
        if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
            return _Distribution.handle_display_options(self, option_order)

        # Print metadata in UTF-8 no matter the platform
        encoding = sys.stdout.encoding
        errors = sys.stdout.errors
        newline = sys.platform != 'win32' and '\n' or None
        line_buffering = sys.stdout.line_buffering

        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
            return _Distribution.handle_display_options(self, option_order)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), encoding, errors, newline, line_buffering)

# Install it throughout the distutils
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def handle_display_options(self, option_order):
        """If there were any non-global "display-only" options
        (--help-commands or the metadata display options) on the command
        line, display the requested info and return true; else return
        import sys

        if six.PY2 or self.help_commands:
            return _Distribution.handle_display_options(self, option_order)

        # Stdout may be StringIO (e.g. in tests)
        import io
        if not isinstance(sys.stdout, io.TextIOWrapper):
            return _Distribution.handle_display_options(self, option_order)

        # Don't wrap stdout if utf-8 is already the encoding. Provides
        #  workaround for #334.
        if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
            return _Distribution.handle_display_options(self, option_order)

        # Print metadata in UTF-8 no matter the platform
        encoding = sys.stdout.encoding
        errors = sys.stdout.errors
        newline = sys.platform != 'win32' and '\n' or None
        line_buffering = sys.stdout.line_buffering

        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
            return _Distribution.handle_display_options(self, option_order)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), encoding, errors, newline, line_buffering)

# Install it throughout the distutils
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def handle_display_options(self, option_order):
        """If there were any non-global "display-only" options
        (--help-commands or the metadata display options) on the command
        line, display the requested info and return true; else return
        import sys

        if six.PY2 or self.help_commands:
            return _Distribution.handle_display_options(self, option_order)

        # Stdout may be StringIO (e.g. in tests)
        import io
        if not isinstance(sys.stdout, io.TextIOWrapper):
            return _Distribution.handle_display_options(self, option_order)

        # Don't wrap stdout if utf-8 is already the encoding. Provides
        #  workaround for #334.
        if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
            return _Distribution.handle_display_options(self, option_order)

        # Print metadata in UTF-8 no matter the platform
        encoding = sys.stdout.encoding
        errors = sys.stdout.errors
        newline = sys.platform != 'win32' and '\n' or None
        line_buffering = sys.stdout.line_buffering

        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
            return _Distribution.handle_display_options(self, option_order)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), encoding, errors, newline, line_buffering)
项目:dit    作者:filipelbc    | 项目源码 | 文件源码
def begin():
    if _isatty:
        global _pager
        global _file
        _pager = subprocess.Popen(['less', '-F', '-R', '-S', '-X', '-K'],
                                  stdin=subprocess.PIPE, stdout=sys.stdout)
        _file = io.TextIOWrapper(_pager.stdin, 'UTF-8')
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    Backport of ``socket.makefile`` from Python 3.5.
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
项目:pheweb    作者:statgen    | 项目源码 | 文件源码
def read_gzip(filepath):
    # handles buffering # TODO: profile whether this is fast.
    with, 'rb') as f: # leave in binary mode (default), let TextIOWrapper decode
        with io.BufferedReader(f, buffer_size=2**18) as g: # 256KB buffer
            with io.TextIOWrapper(g) as h: # bytes -> unicode
                yield h
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def _wrap_reader_for_text(fp, encoding):
    if isinstance(, bytes):
        fp = io.TextIOWrapper(io.BufferedReader(fp), encoding)
    return fp
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def _wrap_writer_for_text(fp, encoding):
    except TypeError:
        fp = io.TextIOWrapper(fp, encoding)
    return fp
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def handle_display_options(self, option_order):
        """If there were any non-global "display-only" options
        (--help-commands or the metadata display options) on the command
        line, display the requested info and return true; else return
        import sys

        if six.PY2 or self.help_commands:
            return _Distribution.handle_display_options(self, option_order)

        # Stdout may be StringIO (e.g. in tests)
        import io
        if not isinstance(sys.stdout, io.TextIOWrapper):
            return _Distribution.handle_display_options(self, option_order)

        # Don't wrap stdout if utf-8 is already the encoding. Provides
        #  workaround for #334.
        if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
            return _Distribution.handle_display_options(self, option_order)

        # Print metadata in UTF-8 no matter the platform
        encoding = sys.stdout.encoding
        errors = sys.stdout.errors
        newline = sys.platform != 'win32' and '\n' or None
        line_buffering = sys.stdout.line_buffering

        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
            return _Distribution.handle_display_options(self, option_order)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), encoding, errors, newline, line_buffering)

# Install it throughout the distutils