Python codecs 模块,StreamReaderWriter() 实例源码

我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用codecs.StreamReaderWriter()

项目:linter    作者:ethz-asl    | 项目源码 | 文件源码
def main():
  filenames = ParseArguments(sys.argv[1:])

  # Change stderr to write with replacement characters so we don't die
  # if we try to print something containing non-ASCII characters.
  sys.stderr = codecs.StreamReaderWriter(sys.stderr,
                                         codecs.getreader('utf8'),
                                         codecs.getwriter('utf8'),
                                         'replace')

  _cpplint_state.ResetErrorCounts()
  for filename in filenames:
    ProcessFile(filename, _cpplint_state.verbose_level)
  _cpplint_state.PrintErrorCounts()

  sys.exit(_cpplint_state.error_count > 0)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_all(self):
        api = (
            "encode", "decode",
            "register", "CodecInfo", "Codec", "IncrementalEncoder",
            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder", "getincrementalencoder",
            "getincrementaldecoder", "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile",
            "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE",
            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertEqual(sorted(api), sorted(codecs.__all__))
        for api in codecs.__all__:
            getattr(codecs, api)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_all(self):
        api = (
            "encode", "decode",
            "register", "CodecInfo", "Codec", "IncrementalEncoder",
            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder", "getincrementalencoder",
            "getincrementaldecoder", "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile",
            "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE",
            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertEqual(sorted(api), sorted(codecs.__all__))
        for api in codecs.__all__:
            getattr(codecs, api)
项目:chalktalk_docs    作者:loremIpsum1771    | 项目源码 | 文件源码
def read_with_encoding(self, filename, document, codec_info, encoding):
        f = None
        try:
            f = codecs.StreamReaderWriter(open(filename, 'rb'), codec_info[2],
                                          codec_info[3], 'strict')
            lines = f.readlines()
            lines = dedent_lines(lines, self.options.get('dedent'))
            return lines
        except (IOError, OSError):
            return [document.reporter.warning(
                'Include file %r not found or reading it failed' % filename,
                line=self.lineno)]
        except UnicodeError:
            return [document.reporter.warning(
                'Encoding %r used for reading included file %r seems to '
                'be wrong, try giving an :encoding: option' %
                (encoding, filename))]
        finally:
            if f is not None:
                f.close()
项目:logging2    作者:vforgione    | 项目源码 | 文件源码
def __init__(
            self,
            file_path: str,
            mode: Optional[str] = 'a',
            encoding: Optional[str] = 'utf8',
            errors: Optional[str] = 'strict',
            buffering: Optional[int] = 1,
            name: Optional[str] = None,
            level: Optional[LogLevel] = None
    ):
        """Instantiates a new ``FileHandler``

        :param file_path: the path (full or relative) to the log file
        :param mode: the file mode
        :param encoding: the file encoding
        :param errors: how should errors be handled
        :param buffering: should the line be buffered
        :param name: the name of the handler
        :param level: the minimum level of verbosity/priority of the messages this will log
        """
        self.fh: StreamReaderWriter = codecs.open(
            file_path, mode=mode, encoding=encoding, errors=errors,
            buffering=buffering)
        super().__init__(name=name, level=level)
        self.encoding: str = encoding
项目:LeetCode    作者:YJL33    | 项目源码 | 文件源码
def main():
  filenames = ParseArguments(sys.argv[1:])

  # Change stderr to write with replacement characters so we don't die
  # if we try to print something containing non-ASCII characters.
  sys.stderr = codecs.StreamReaderWriter(sys.stderr,
                                         codecs.getreader('utf8'),
                                         codecs.getwriter('utf8'),
                                         'replace')

  _cpplint_state.ResetErrorCounts()
  for filename in filenames:
    ProcessFile(filename, _cpplint_state.verbose_level)
  _cpplint_state.PrintErrorCounts()

  sys.exit(_cpplint_state.error_count > 0)
项目:OpenSky_BL    作者:fishpepper    | 项目源码 | 文件源码
def main():
  filenames = ParseArguments(sys.argv[1:])

  # Change stderr to write with replacement characters so we don't die
  # if we try to print something containing non-ASCII characters.
  sys.stderr = codecs.StreamReaderWriter(sys.stderr,
                                         codecs.getreader('utf8'),
                                         codecs.getwriter('utf8'),
                                         'replace')

  _cpplint_state.ResetErrorCounts()
  for filename in filenames:
    ProcessFile(filename, _cpplint_state.verbose_level)
  _cpplint_state.PrintErrorCounts()

  sys.exit(_cpplint_state.error_count > 0)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_all(self):
        api = (
            "encode", "decode",
            "register", "CodecInfo", "Codec", "IncrementalEncoder",
            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder", "getincrementalencoder",
            "getincrementaldecoder", "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile",
            "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE",
            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertCountEqual(api, codecs.__all__)
        for api in codecs.__all__:
            getattr(codecs, api)
项目:Chromium_DepotTools    作者:p07r0457    | 项目源码 | 文件源码
def main():
  filenames = ParseArguments(sys.argv[1:])

  # Change stderr to write with replacement characters so we don't die
  # if we try to print something containing non-ASCII characters.
  sys.stderr = codecs.StreamReaderWriter(sys.stderr,
                                         codecs.getreader('utf8'),
                                         codecs.getwriter('utf8'),
                                         'replace')

  _cpplint_state.ResetErrorCounts()
  for filename in filenames:
    ProcessFile(filename, _cpplint_state.verbose_level)
  _cpplint_state.PrintErrorCounts()

  sys.exit(_cpplint_state.error_count > 0)
项目:node-gn    作者:Shouqun    | 项目源码 | 文件源码
def main():
  filenames = ParseArguments(sys.argv[1:])

  # Change stderr to write with replacement characters so we don't die
  # if we try to print something containing non-ASCII characters.
  sys.stderr = codecs.StreamReaderWriter(sys.stderr,
                                         codecs.getreader('utf8'),
                                         codecs.getwriter('utf8'),
                                         'replace')

  _cpplint_state.ResetErrorCounts()
  for filename in filenames:
    ProcessFile(filename, _cpplint_state.verbose_level)
  _cpplint_state.PrintErrorCounts()

  sys.exit(_cpplint_state.error_count > 0)
项目:depot_tools    作者:webrtc-uwp    | 项目源码 | 文件源码
def main():
  filenames = ParseArguments(sys.argv[1:])

  # Change stderr to write with replacement characters so we don't die
  # if we try to print something containing non-ASCII characters.
  sys.stderr = codecs.StreamReaderWriter(sys.stderr,
                                         codecs.getreader('utf8'),
                                         codecs.getwriter('utf8'),
                                         'replace')

  _cpplint_state.ResetErrorCounts()
  for filename in filenames:
    ProcessFile(filename, _cpplint_state.verbose_level)
  _cpplint_state.PrintErrorCounts()

  sys.exit(_cpplint_state.error_count > 0)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def readline(self):
    "Read a line from elyxer.file"
    self.current = self.file.readline()
    if not isinstance(self.file, codecs.StreamReaderWriter):
      self.current = self.current.decode('utf-8')
    if len(self.current) == 0:
      self.depleted = True
    self.current = self.current.rstrip('\n\r')
    self.linenumber += 1
    self.mustread = False
    Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
    if self.linenumber % 1000 == 0:
      Trace.message('Parsing')
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def readline(self):
    "Read a line from elyxer.file"
    self.current = self.file.readline()
    if not isinstance(self.file, codecs.StreamReaderWriter):
      self.current = self.current.decode('utf-8')
    if len(self.current) == 0:
      self.depleted = True
    self.current = self.current.rstrip('\n\r')
    self.linenumber += 1
    self.mustread = False
    Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
    if self.linenumber % 1000 == 0:
      Trace.message('Parsing')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_streamreaderwriter(self):
        f = io.BytesIO(b"\xc3\xbc")
        info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(f, info.streamreader,
                                       info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), "\xfc")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_streamreaderwriter(self):
        f = StringIO.StringIO("\xc3\xbc")
        info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(f, info.streamreader,
                                       info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), u"\xfc")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_streamreaderwriter(self):
        f = StringIO.StringIO("\xc3\xbc")
        info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(f, info.streamreader,
                                       info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), u"\xfc")
项目:chalktalk_docs    作者:loremIpsum1771    | 项目源码 | 文件源码
def readline(self):
    "Read a line from elyxer.file"
    self.current = self.file.readline()
    if not isinstance(self.file, codecs.StreamReaderWriter):
      self.current = self.current.decode('utf-8')
    if len(self.current) == 0:
      self.depleted = True
    self.current = self.current.rstrip('\n\r')
    self.linenumber += 1
    self.mustread = False
    Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
    if self.linenumber % 1000 == 0:
      Trace.message('Parsing')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_streamreaderwriter(self):
        f = io.BytesIO(b"\xc3\xbc")
        info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(f, info.streamreader,
                                       info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), "\xfc")
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def read_with_encoding(self, filename, document, codec_info, encoding):
        global cache

        f = None
        try:
            if not self.arguments[0] in cache:
                f = codecs.StreamReaderWriter(urllib2.urlopen(self.arguments[0]), codec_info[2],
                                              codec_info[3], 'strict')
                lines = f.readlines()
                cache[self.arguments[0]] = lines
            else:
                lines = cache[self.arguments[0]]

            lines = dedent_lines(lines, self.options.get('dedent'))
            return lines
        except (IOError, OSError, urllib2.URLError):
            return [document.reporter.warning(
                'Include file %r not found or reading it failed' % self.arguments[0],
                line=self.lineno)]
        except UnicodeError:
            return [document.reporter.warning(
                'Encoding %r used for reading included file %r seems to '
                'be wrong, try giving an :encoding: option' %
                (encoding, self.arguments[0]))]
        finally:
            if f is not None:
                f.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_streamreaderwriter(self):
        f = StringIO.StringIO("\xc3\xbc")
        info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(f, info.streamreader,
                                       info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), u"\xfc")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_open(self):
        self.addCleanup(support.unlink, support.TESTFN)
        for mode in ('w', 'r', 'r+', 'w+', 'a', 'a+'):
            with self.subTest(mode), \
                    codecs.open(support.TESTFN, mode, 'ascii') as file:
                self.assertIsInstance(file, codecs.StreamReaderWriter)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_streamreaderwriter(self):
        f = io.BytesIO(b"\xc3\xbc")
        info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(f, info.streamreader,
                                       info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), "\xfc")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        return sys.stdout

    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out

    if isinstance(out, (codecs.StreamWriter, codecs.StreamReaderWriter)):
        # use a codecs stream writer as is
        return out

    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_streamreaderwriter(self):
        f = StringIO.StringIO("\xc3\xbc")
        info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(f, info.streamreader,
                                       info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), u"\xfc")
项目:blackmamba    作者:zrzka    | 项目源码 | 文件源码
def readline(self):
    "Read a line from elyxer.file"
    self.current = self.file.readline()
    if not isinstance(self.file, codecs.StreamReaderWriter):
      self.current = self.current.decode('utf-8')
    if len(self.current) == 0:
      self.depleted = True
    self.current = self.current.rstrip('\n\r')
    self.linenumber += 1
    self.mustread = False
    Trace.prefix = 'Line ' + str(self.linenumber) + ': '
    if self.linenumber % 1000 == 0:
      Trace.message('Parsing')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_streamreaderwriter(self):
        f = io.BytesIO(b"\xc3\xbc")
        info = codecs.lookup("utf-8")
        with codecs.StreamReaderWriter(f, info.streamreader,
                                       info.streamwriter, 'strict') as srw:
            self.assertEqual(srw.read(), "\xfc")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        return sys.stdout

    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out

    if isinstance(out, (codecs.StreamWriter, codecs.StreamReaderWriter)):
        # use a codecs stream writer as is
        return out

    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
项目:RST-vscode    作者:tht13    | 项目源码 | 文件源码
def readline(self):
    "Read a line from elyxer.file"
    self.current = self.file.readline()
    if not isinstance(self.file, codecs.StreamReaderWriter):
      self.current = self.current.decode('utf-8')
    if len(self.current) == 0:
      self.depleted = True
    self.current = self.current.rstrip('\n\r')
    self.linenumber += 1
    self.mustread = False
    Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
    if self.linenumber % 1000 == 0:
      Trace.message('Parsing')
项目:tf_aws_ecs_instance_draining_on_scale_in    作者:terraform-community-modules    | 项目源码 | 文件源码
def readline(self):
    "Read a line from elyxer.file"
    self.current = self.file.readline()
    if not isinstance(self.file, codecs.StreamReaderWriter):
      self.current = self.current.decode('utf-8')
    if len(self.current) == 0:
      self.depleted = True
    self.current = self.current.rstrip('\n\r')
    self.linenumber += 1
    self.mustread = False
    Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
    if self.linenumber % 1000 == 0:
      Trace.message('Parsing')
项目:StylisticFingerprinting    作者:robertyuyang    | 项目源码 | 文件源码
def main():
  filenames = ParseArguments(sys.argv[1:])

  # Change stderr to write with replacement characters so we don't die
  # if we try to print something containing non-ASCII characters.
  sys.stderr = codecs.StreamReaderWriter(sys.stderr,
                                         codecs.getreader('utf8'),
                                         codecs.getwriter('utf8'),
                                         'replace')

  _cpplint_state.ResetErrorCounts()

  f = open(_output_file, 'w')

  title = 'path,dir,'
  for i in range(1, 16):
    title += 'PFM%d' % i + ',' 
  for i in range(1, 21):
    title += 'PRM%d' % i + ',' 
  for i in range(1, 26):
    title += 'PLM%d' % i + ',' 
  f.write(title) 

  for filename in filenames:
    #added by Robert
    global _current_filenmae
    _current_filenmae = filename
    print 'current file name: ' + filename
    global _stat
    _stat = _Stat()
    #added by Robert End
    ProcessFile(filename, _cpplint_state.verbose_level, [ExtraCheckLine])
    #added by Roebrt
    global _output_file
    _stat.WriteFile(filename, f)
    #added by Robert end  

  f.close()

  _cpplint_state.PrintErrorCounts()

  _stat.PrintValidFiles() #added by Robert



  sys.exit(_cpplint_state.error_count > 0)