Python codecs 模块,getincrementalencoder() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用codecs.getincrementalencoder()

项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_stateful(self):
        # jisx0213 encoder is stateful for a few codepoints. eg)
        #   U+00E6 => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300 => ABDC

        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
        self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc')

        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')

        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
        self.assertEqual(encoder.encode('', True), b'')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_all(self):
        api = (
            "encode", "decode",
            "register", "CodecInfo", "Codec", "IncrementalEncoder",
            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder", "getincrementalencoder",
            "getincrementaldecoder", "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile",
            "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE",
            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertEqual(sorted(api), sorted(codecs.__all__))
        for api in codecs.__all__:
            getattr(codecs, api)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_stateful(self):
        # jisx0213 encoder is stateful for a few code points. eg)
        #   U+00E6 => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300 => ABDC

        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode(u'\u00e6\u0300'), '\xab\xc4')
        self.assertEqual(encoder.encode(u'\u00e6'), '')
        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xc4')
        self.assertEqual(encoder.encode(u'\u00e6', True), '\xa9\xdc')

        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xdc')

        self.assertEqual(encoder.encode(u'\u00e6'), '')
        self.assertEqual(encoder.encode('', True), '\xa9\xdc')
        self.assertEqual(encoder.encode('', True), '')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_all(self):
        api = (
            "encode", "decode",
            "register", "CodecInfo", "Codec", "IncrementalEncoder",
            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder", "getincrementalencoder",
            "getincrementaldecoder", "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile",
            "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE",
            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertEqual(sorted(api), sorted(codecs.__all__))
        for api in codecs.__all__:
            getattr(codecs, api)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_stateful(self):
        # jisx0213 encoder is stateful for a few code points. eg)
        #   U+00E6 => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300 => ABDC

        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode(u'\u00e6\u0300'), '\xab\xc4')
        self.assertEqual(encoder.encode(u'\u00e6'), '')
        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xc4')
        self.assertEqual(encoder.encode(u'\u00e6', True), '\xa9\xdc')

        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xdc')

        self.assertEqual(encoder.encode(u'\u00e6'), '')
        self.assertEqual(encoder.encode('', True), '\xa9\xdc')
        self.assertEqual(encoder.encode('', True), '')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_stateful(self):
        # jisx0213 encoder is stateful for a few codepoints. eg)
        #   U+00E6 => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300 => ABDC

        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
        self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc')

        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')

        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
        self.assertEqual(encoder.encode('', True), b'')
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_stateful(self):
        # jisx0213 encoder is stateful for a few codepoints. eg)
        #   U+00E6 => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300 => ABDC

        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode(u'\u00e6\u0300'), '\xab\xc4')
        self.assertEqual(encoder.encode(u'\u00e6'), '')
        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xc4')
        self.assertEqual(encoder.encode(u'\u00e6', True), '\xa9\xdc')

        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xdc')

        self.assertEqual(encoder.encode(u'\u00e6'), '')
        self.assertEqual(encoder.encode('', True), '\xa9\xdc')
        self.assertEqual(encoder.encode('', True), '')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_all(self):
        api = (
            "encode", "decode",
            "register", "CodecInfo", "Codec", "IncrementalEncoder",
            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder", "getincrementalencoder",
            "getincrementaldecoder", "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile",
            "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE",
            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertCountEqual(api, codecs.__all__)
        for api in codecs.__all__:
            getattr(codecs, api)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_stateful(self):
        # jisx0213 encoder is stateful for a few codepoints. eg)
        #   U+00E6 => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300 => ABDC

        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode(u'\u00e6\u0300'), '\xab\xc4')
        self.assertEqual(encoder.encode(u'\u00e6'), '')
        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xc4')
        self.assertEqual(encoder.encode(u'\u00e6', True), '\xa9\xdc')

        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xdc')

        self.assertEqual(encoder.encode(u'\u00e6'), '')
        self.assertEqual(encoder.encode('', True), '\xa9\xdc')
        self.assertEqual(encoder.encode('', True), '')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_stateful(self):
        # jisx0213 encoder is stateful for a few codepoints. eg)
        #   U+00E6 => A9DC
        #   U+00E6 U+0300 => ABC4
        #   U+0300 => ABDC

        encoder = codecs.getincrementalencoder('jisx0213')()
        self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
        self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc')

        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')

        self.assertEqual(encoder.encode('\u00e6'), b'')
        self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
        self.assertEqual(encoder.encode('', True), b'')
项目:AmericansStopSoros    作者:HelloKitty    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
        self.queue = cStringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _get_encoder(self):
        make_encoder = codecs.getincrementalencoder(self._encoding)
        self._encoder = make_encoder(self._errors)
        return self._encoder
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        """Init method."""
        # Redirect output to a queue
        self.queue = cStringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _get_encoder(self):
        make_encoder = codecs.getincrementalencoder(self._encoding)
        self._encoder = make_encoder(self._errors)
        return self._encoder
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def __init__(self, f, encoding, **kwds):
    # Redirect output to a queue
    import cStringIO
    self.queue = cStringIO.StringIO()
    self.writer = csv.writer(self.queue, **kwds)
    self.stream = f
    self.encoding = encoding
    self.encoder = codecs.getincrementalencoder(self.encoding)()
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def set_tx_encoding(self, encoding, errors='replace'):
        self.output_encoding = encoding
        self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
项目:csv_generator    作者:fatboystring    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
            """
            Instantiates the UnicodeWriter instance

            :param f: File like object to write CSV data to
            :param dialect: The dialect for the CSV
            :param encoding: The CSV encoding
            :param kwargs: Keyword args
            """
            self.queue = cStringIO.StringIO()
            self.writer = csv.writer(self.queue, dialect=dialect, **kwargs)
            self.stream = f
            self.encoder = codecs.getincrementalencoder(encoding)()
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def set_tx_encoding(self, encoding, errors='replace'):
        self.output_encoding = encoding
        self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
项目:SWProxy-plugins    作者:lstern    | 项目源码 | 文件源码
def __init__(self, f, fieldnames, dialect=csv.excel, encoding="utf-8", newfile=True, **kwds):
        # Redirect output to a queue
        self.queue = cStringIO.StringIO()
        self.writer = csv.DictWriter(self.queue, fieldnames, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()
        if newfile:
            self.writebom()
项目:acacia_main    作者:AcaciaTrading    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = cStringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()
项目:gcodeplot    作者:arpruss    | 项目源码 | 文件源码
def set_tx_encoding(self, encoding, errors='replace'):
        """set encoding for transmitted data"""
        self.output_encoding = encoding
        self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def check_state_handling_encode(self, encoding, u, s):
        for i in range(len(u)+1):
            d = codecs.getincrementalencoder(encoding)()
            part1 = d.encode(u[:i])
            state = d.getstate()
            d = codecs.getincrementalencoder(encoding)()
            d.setstate(state)
            part2 = d.encode(u[i:], True)
            self.assertEqual(s, part1+part2)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_incremental_encode(self):
        self.assertEqual(
            b"".join(codecs.iterencode("python.org", "idna")),
            b"python.org"
        )
        self.assertEqual(
            b"".join(codecs.iterencode("python.org.", "idna")),
            b"python.org."
        )
        self.assertEqual(
            b"".join(codecs.iterencode("pyth\xf6n.org.", "idna")),
            b"xn--pythn-mua.org."
        )
        self.assertEqual(
            b"".join(codecs.iterencode("pyth\xf6n.org.", "idna")),
            b"xn--pythn-mua.org."
        )

        encoder = codecs.getincrementalencoder("idna")()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org"), b"xn--xample-9ta.")
        self.assertEqual(encoder.encode("", True), b"org")

        encoder.reset()
        self.assertEqual(encoder.encode("\xe4x"), b"")
        self.assertEqual(encoder.encode("ample.org."), b"xn--xample-9ta.org.")
        self.assertEqual(encoder.encode("", True), b"")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def check_newline_decoding(self, decoder, encoding):
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        input = "abc"
        if encoder is not None:
            encoder.reset()
            input = encoder.encode(input)
        self.assertEqual(decoder.decode(input), "abc")
        self.assertEqual(decoder.newlines, None)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_stateless(self):
        # cp949 encoder isn't stateful at all.
        encoder = codecs.getincrementalencoder('cp949')()
        self.assertEqual(encoder.encode('\ud30c\uc774\uc36c \ub9c8\uc744'),
                         b'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode('\u2606\u223c\u2606', True),
                         b'\xa1\xd9\xa1\xad\xa1\xd9')
        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode('', True), b'')
        self.assertEqual(encoder.encode('', False), b'')
        self.assertEqual(encoder.reset(), None)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_issue5640(self):
        encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
        self.assertEqual(encoder.encode('\xff'), b'\\xff')
        self.assertEqual(encoder.encode('\n'), b'\n')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_incrementalencoder(self):
        encoder = codecs.getincrementalencoder(self.encoding)()
        output = b''.join(
            encoder.encode(char)
            for char in self.text)
        self.assertEqual(output, self.expected)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_incrementalencoder_final(self):
        encoder = codecs.getincrementalencoder(self.encoding)()
        last_index = len(self.text) - 1
        output = b''.join(
            encoder.encode(char, index == last_index)
            for index, char in enumerate(self.text))
        self.assertEqual(output, self.expected_reset)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _get_encoder(self):
        make_encoder = codecs.getincrementalencoder(self._encoding)
        self._encoder = make_encoder(self._errors)
        return self._encoder
项目:bitio    作者:whaleygeek    | 项目源码 | 文件源码
def set_tx_encoding(self, encoding, errors='replace'):
        """set encoding for transmitted data"""
        self.output_encoding = encoding
        self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
项目:microbit-serial    作者:martinohanlon    | 项目源码 | 文件源码
def set_tx_encoding(self, encoding, errors='replace'):
        self.output_encoding = encoding
        self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", errors='replace', **kwds):
            # Redirect output to a queue
            self.queue = cStringIO.StringIO()
            self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
            self.stream = f
            encoder_cls = codecs.getincrementalencoder(encoding)
            self.encoder = encoder_cls(errors=errors)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_incremental_encode(self):
        self.assertEqual(
            "".join(codecs.iterencode(u"python.org", "idna")),
            "python.org"
        )
        self.assertEqual(
            "".join(codecs.iterencode(u"python.org.", "idna")),
            "python.org."
        )
        self.assertEqual(
            "".join(codecs.iterencode(u"pyth\xf6n.org.", "idna")),
            "xn--pythn-mua.org."
        )
        self.assertEqual(
            "".join(codecs.iterencode(u"pyth\xf6n.org.", "idna")),
            "xn--pythn-mua.org."
        )

        encoder = codecs.getincrementalencoder("idna")()
        self.assertEqual(encoder.encode(u"\xe4x"), "")
        self.assertEqual(encoder.encode(u"ample.org"), "xn--xample-9ta.")
        self.assertEqual(encoder.encode(u"", True), "org")

        encoder.reset()
        self.assertEqual(encoder.encode(u"\xe4x"), "")
        self.assertEqual(encoder.encode(u"ample.org."), "xn--xample-9ta.org.")
        self.assertEqual(encoder.encode(u"", True), "")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def check_newline_decoding(self, decoder, encoding):
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(b))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        input = "abc"
        if encoder is not None:
            encoder.reset()
            input = encoder.encode(input)
        self.assertEqual(decoder.decode(input), "abc")
        self.assertEqual(decoder.newlines, None)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_stateless(self):
        # cp949 encoder isn't stateful at all.
        encoder = codecs.getincrementalencoder('cp949')()
        self.assertEqual(encoder.encode(u'\ud30c\uc774\uc36c \ub9c8\uc744'),
                         '\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode(u'\u2606\u223c\u2606', True),
                         '\xa1\xd9\xa1\xad\xa1\xd9')
        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode(u'', True), '')
        self.assertEqual(encoder.encode(u'', False), '')
        self.assertEqual(encoder.reset(), None)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_issue5640(self):
        encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
        self.assertEqual(encoder.encode(u'\xff'), b'\\xff')
        self.assertEqual(encoder.encode(u'\n'), b'\n')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_incrementalencoder(self):
        encoder = codecs.getincrementalencoder(self.encoding)()
        output = b''.join(
            encoder.encode(char)
            for char in self.text)
        self.assertEqual(output, self.expected)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_incrementalencoder_final(self):
        encoder = codecs.getincrementalencoder(self.encoding)()
        last_index = len(self.text) - 1
        output = b''.join(
            encoder.encode(char, index == last_index)
            for index, char in enumerate(self.text))
        self.assertEqual(output, self.expected_reset)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _get_encoder(self):
        make_encoder = codecs.getincrementalencoder(self._encoding)
        self._encoder = make_encoder(self._errors)
        return self._encoder
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_incremental_encode(self):
        self.assertEqual(
            "".join(codecs.iterencode(u"python.org", "idna")),
            "python.org"
        )
        self.assertEqual(
            "".join(codecs.iterencode(u"python.org.", "idna")),
            "python.org."
        )
        self.assertEqual(
            "".join(codecs.iterencode(u"pyth\xf6n.org.", "idna")),
            "xn--pythn-mua.org."
        )
        self.assertEqual(
            "".join(codecs.iterencode(u"pyth\xf6n.org.", "idna")),
            "xn--pythn-mua.org."
        )

        encoder = codecs.getincrementalencoder("idna")()
        self.assertEqual(encoder.encode(u"\xe4x"), "")
        self.assertEqual(encoder.encode(u"ample.org"), "xn--xample-9ta.")
        self.assertEqual(encoder.encode(u"", True), "org")

        encoder.reset()
        self.assertEqual(encoder.encode(u"\xe4x"), "")
        self.assertEqual(encoder.encode(u"ample.org."), "xn--xample-9ta.org.")
        self.assertEqual(encoder.encode(u"", True), "")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def check_newline_decoding(self, decoder, encoding):
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(b))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        input = "abc"
        if encoder is not None:
            encoder.reset()
            input = encoder.encode(input)
        self.assertEqual(decoder.decode(input), "abc")
        self.assertEqual(decoder.newlines, None)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_stateless(self):
        # cp949 encoder isn't stateful at all.
        encoder = codecs.getincrementalencoder('cp949')()
        self.assertEqual(encoder.encode(u'\ud30c\uc774\uc36c \ub9c8\uc744'),
                         '\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode(u'\u2606\u223c\u2606', True),
                         '\xa1\xd9\xa1\xad\xa1\xd9')
        self.assertEqual(encoder.reset(), None)
        self.assertEqual(encoder.encode(u'', True), '')
        self.assertEqual(encoder.encode(u'', False), '')
        self.assertEqual(encoder.reset(), None)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_issue5640(self):
        encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
        self.assertEqual(encoder.encode(u'\xff'), b'\\xff')
        self.assertEqual(encoder.encode(u'\n'), b'\n')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_incrementalencoder(self):
        encoder = codecs.getincrementalencoder(self.encoding)()
        output = b''.join(
            encoder.encode(char)
            for char in self.text)
        self.assertEqual(output, self.expected)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_incrementalencoder_final(self):
        encoder = codecs.getincrementalencoder(self.encoding)()
        last_index = len(self.text) - 1
        output = b''.join(
            encoder.encode(char, index == last_index)
            for index, char in enumerate(self.text))
        self.assertEqual(output, self.expected_reset)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _get_encoder(self):
        make_encoder = codecs.getincrementalencoder(self._encoding)
        self._encoder = make_encoder(self._errors)
        return self._encoder
项目:gpo_tools    作者:rbshaffer    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = io.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()
项目:gpo_tools    作者:rbshaffer    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = io.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()
项目:gpo_tools    作者:rbshaffer    | 项目源码 | 文件源码
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = io.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _get_encoder(self):
        make_encoder = codecs.getincrementalencoder(self._encoding)
        self._encoder = make_encoder(self._errors)
        return self._encoder