Python codecs 模块,EncodedFile() 实例源码

我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用codecs.EncodedFile()

项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_all(self):
        api = (
            "encode", "decode",
            "register", "CodecInfo", "Codec", "IncrementalEncoder",
            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder", "getincrementalencoder",
            "getincrementaldecoder", "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile",
            "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE",
            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertEqual(sorted(api), sorted(codecs.__all__))
        for api in codecs.__all__:
            getattr(codecs, api)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_all(self):
        api = (
            "encode", "decode",
            "register", "CodecInfo", "Codec", "IncrementalEncoder",
            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder", "getincrementalencoder",
            "getincrementaldecoder", "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile",
            "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE",
            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertEqual(sorted(api), sorted(codecs.__all__))
        for api in codecs.__all__:
            getattr(codecs, api)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_all(self):
        api = (
            "encode", "decode",
            "register", "CodecInfo", "Codec", "IncrementalEncoder",
            "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder", "getincrementalencoder",
            "getincrementaldecoder", "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile",
            "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE",
            "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertCountEqual(api, codecs.__all__)
        for api in codecs.__all__:
            getattr(codecs, api)
项目:OverrideAudit    作者:OdatNurd    | 项目源码 | 文件源码
def _get_packed_pkg_file_contents(self, override_file):
        """Read the packed copy of an override from the sublime-package file.

        Returns a ``(lines, display_path, mtime)`` tuple, where ``lines`` is
        the list of decoded text lines, ``display_path`` is the result of
        ``_fixPath`` on the "Shipped Packages"/"Installed Packages" path and
        ``mtime`` is the zip entry timestamp as "YYYY-MM-DD HH:MM:SS".

        Returns None, after printing a diagnostic, when the entry cannot be
        found or its contents cannot be decoded as UTF-8.
        """
        try:
            with zipfile.ZipFile(self.package_file()) as zFile:
                info = find_zip_entry(zFile, override_file)
                # Open with plain "r": the "U" flag was removed from
                # ZipFile.open() in Python 3.6 and raises ValueError there.
                # The TextIOWrapper below already translates newlines.
                recoded = codecs.EncodedFile(zFile.open(info, mode="r"), "utf-8")
                content = io.TextIOWrapper(recoded, encoding="utf-8").readlines()

                source = "Shipped Packages"
                if self.installed_path is not None:
                    source = "Installed Packages"

                source = os.path.join(source, self.name, override_file)
                mtime = datetime(*info.date_time).strftime("%Y-%m-%d %H:%M:%S")

                return (content, _fixPath(source), mtime)

        except (KeyError, FileNotFoundError):
            print("Error loading %s:%s; cannot find file in sublime-package" %
                  (self.package_file(), override_file))
            return None

        except UnicodeDecodeError:
            print("Error loading %s:%s; unable to decode file contents" %
                  (self.package_file(), override_file))
            return None
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def load(self, file):
      """Load properties from an open file stream.

      The stream is wrapped in a latin-1 recoder, read line by line and
      handed to ``self._parse``. The recoder (and with it the underlying
      stream) is closed even when reading fails; read errors propagate to
      the caller unchanged.
      """
      unicodeFile = codecs.EncodedFile(file, 'latin-1')
      try:
         lines = unicodeFile.readlines()
      finally:
         # Close on success and on failure alike.
         unicodeFile.close()
      self._parse(lines)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def _store(self, outFile, header=""):
      unicodeOutFile = codecs.EncodedFile(outFile, 'latin-1')
      unicodeOutFile.write(header)
      unicodeOutFile.write(self.getAsString())
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_recoding(self):
        f = io.BytesIO()
        f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
        f2.write("a")
        f2.close()
        # Python used to crash on this at exit because of a refcount
        # bug in _codecsmodule.c

# From RFC 3492
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_basic(self):
        f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
        ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
        self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')

        f = io.BytesIO()
        ef = codecs.EncodedFile(f, 'utf-8', 'latin1')
        ef.write(b'\xc3\xbc')
        self.assertEqual(f.getvalue(), b'\xfc')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_encodedfile(self):
        f = io.BytesIO(b"\xc3\xbc")
        with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
            self.assertEqual(ef.read(), b"\xfc")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_recoding(self):
        """Write through a unicode_internal/utf-8 recoder and close it.

        Regression check: Python used to crash on this at exit because of
        a refcount bug in _codecsmodule.c.
        """
        sink = StringIO.StringIO()
        recoder = codecs.EncodedFile(sink, "unicode_internal", "utf-8")
        recoder.write(u"a")
        recoder.close()

# From RFC 3492
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_basic(self):
        """Exercise EncodedFile in both directions.

        Reading recodes the UTF-8 file content to UTF-16-LE data; writing
        recodes UTF-8 data to Latin-1 file content.
        """
        utf8_source = StringIO.StringIO('\xed\x95\x9c\n\xea\xb8\x80')
        reading = codecs.EncodedFile(utf8_source, 'utf-16-le', 'utf-8')
        self.assertEqual(reading.read(), '\\\xd5\n\x00\x00\xae')

        latin1_sink = StringIO.StringIO()
        writing = codecs.EncodedFile(latin1_sink, 'utf-8', 'latin1')
        writing.write('\xc3\xbc')
        self.assertEqual(latin1_sink.getvalue(), '\xfc')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_encodedfile(self):
        """EncodedFile works as a context manager and recodes on read."""
        utf8_source = StringIO.StringIO("\xc3\xbc")
        with codecs.EncodedFile(utf8_source, "latin-1", "utf-8") as recoder:
            recoded = recoder.read()
            self.assertEqual(recoded, "\xfc")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_recoding(self):
        """Write through a unicode_internal/utf-8 recoder and close it.

        Regression check: Python used to crash on this at exit because of
        a refcount bug in _codecsmodule.c.
        """
        sink = StringIO.StringIO()
        recoder = codecs.EncodedFile(sink, "unicode_internal", "utf-8")
        recoder.write(u"a")
        recoder.close()

# From RFC 3492
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_basic(self):
        """Exercise EncodedFile in both directions.

        Reading recodes the UTF-8 file content to UTF-16-LE data; writing
        recodes UTF-8 data to Latin-1 file content.
        """
        utf8_source = StringIO.StringIO('\xed\x95\x9c\n\xea\xb8\x80')
        reading = codecs.EncodedFile(utf8_source, 'utf-16-le', 'utf-8')
        self.assertEqual(reading.read(), '\\\xd5\n\x00\x00\xae')

        latin1_sink = StringIO.StringIO()
        writing = codecs.EncodedFile(latin1_sink, 'utf-8', 'latin1')
        writing.write('\xc3\xbc')
        self.assertEqual(latin1_sink.getvalue(), '\xfc')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_encodedfile(self):
        """EncodedFile works as a context manager and recodes on read."""
        utf8_source = StringIO.StringIO("\xc3\xbc")
        with codecs.EncodedFile(utf8_source, "latin-1", "utf-8") as recoder:
            recoded = recoder.read()
            self.assertEqual(recoded, "\xfc")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_recoding(self):
        f = io.BytesIO()
        f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
        f2.write("a")
        f2.close()
        # Python used to crash on this at exit because of a refcount
        # bug in _codecsmodule.c

# From RFC 3492
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_basic(self):
        f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
        ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
        self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')

        f = io.BytesIO()
        ef = codecs.EncodedFile(f, 'utf-8', 'latin-1')
        ef.write(b'\xc3\xbc')
        self.assertEqual(f.getvalue(), b'\xfc')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_encodedfile(self):
        f = io.BytesIO(b"\xc3\xbc")
        with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
            self.assertEqual(ef.read(), b"\xfc")
项目:knowledge_linker    作者:glciampaglia    | 项目源码 | 文件源码
def itertriples(path):
    '''
    Iterate over an N-triples file, yielding each triple as a 3-tuple.

    Lines starting with '#' are skipped. The stream is closed once the
    iteration finishes.

    Parameters
    ----------
    path : string or file
        path to the N-triples file (opened with GzipFile when it ends in
        '.gz'), or an already open file object.
    '''
    if isinstance(path, file):
        raw = path
    elif path.endswith('.gz'):
        raw = GzipFile(path)
    else:
        raw = open(path)
    with closing(EncodedFile(raw, 'utf-8')) as ntfile:
        for line in ntfile:
            if not line.startswith('#'):
                # drop the trailing newline and the terminating dot
                body = line.strip().strip('.').strip()
                # Only the first two spaces are safe separators: the last
                # field may itself contain whitespace, so str.split cannot
                # be used here.
                first = body.find(' ')
                second = body.find(' ', first + 1)
                yield body[:first], body[first + 1:second], body[second + 1:]
项目:knowledge_linker    作者:glciampaglia    | 项目源码 | 文件源码
def main():
    """Command-line entry point.

    Parses the arguments, loads the namespaces, makes stdout a UTF-8
    recoder, prepares the destination directory and runs the two passes
    over the N-Triples file, reporting timing on stderr. A broken pipe
    ends the program with exit status 0.
    """
    global namespaces
    parser = ArgumentParser(description=__doc__)
    parser.add_argument('ns_file', metavar='namespaces',
                        help='tab-separated list of namespace codes')
    parser.add_argument('nt_file', metavar='ntriples', help='N-Triples file')
    parser.add_argument('-p', '--properties', action='store_true',
                        help='print properties')
    parser.add_argument('-D', '--destination', help='destination path')
    args = parser.parse_args()
    print
    print ('WARNING: the n-triples file must be already sorted by source,'
           ' destination!')
    print
    namespaces = NodesIndex.readns(args.ns_file)
    sys.stdout = EncodedFile(sys.stdout, 'utf-8')
    # Expand the destination path; create it when missing and refuse to
    # continue when it exists but is not a directory.
    destination = os.path.expanduser(os.path.expandvars(args.destination))
    args.destination = destination
    if not os.path.exists(destination):
        os.mkdir(destination)
        print >> sys.stderr, 'info: created {}'.format(destination)
    elif not os.path.isdir(destination):
        print >> sys.stderr, 'error: not a directory: {}'.format(destination)
        sys.exit(1)
    try:
        started = time()
        vertexmap, num_triples = _first_pass(args.nt_file, args.properties,
                                             args.destination)
        _second_pass(args.nt_file, vertexmap, num_triples, args.properties,
                     args.destination)
        elapsed = time() - started
        etime = timedelta(seconds=round(elapsed))
        speed = num_triples / elapsed
        report = 'info: {:d} triples processed in {} ({:.2f} triple/s)'
        print >> sys.stderr, report.format(num_triples, etime, speed)
    except IOError as e:
        if e.errno != errno.EPIPE:
            raise
        # broken pipe
        sys.exit(0)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_recoding(self):
        """Write through a unicode_internal/utf-8 recoder and close it.

        Regression check: Python used to crash on this at exit because of
        a refcount bug in _codecsmodule.c.
        """
        sink = StringIO.StringIO()
        recoder = codecs.EncodedFile(sink, "unicode_internal", "utf-8")
        recoder.write(u"a")
        recoder.close()

# From RFC 3492
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_basic(self):
        """Exercise EncodedFile in both directions.

        Reading recodes the UTF-8 file content to UTF-16-LE data; writing
        recodes UTF-8 data to Latin-1 file content.
        """
        utf8_source = StringIO.StringIO('\xed\x95\x9c\n\xea\xb8\x80')
        reading = codecs.EncodedFile(utf8_source, 'utf-16-le', 'utf-8')
        self.assertEqual(reading.read(), '\\\xd5\n\x00\x00\xae')

        latin1_sink = StringIO.StringIO()
        writing = codecs.EncodedFile(latin1_sink, 'utf-8', 'latin1')
        writing.write('\xc3\xbc')
        self.assertEqual(latin1_sink.getvalue(), '\xfc')
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_encodedfile(self):
        """EncodedFile works as a context manager and recodes on read."""
        utf8_source = StringIO.StringIO("\xc3\xbc")
        with codecs.EncodedFile(utf8_source, "latin-1", "utf-8") as recoder:
            recoded = recoder.read()
            self.assertEqual(recoded, "\xfc")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_recoding(self):
        f = io.BytesIO()
        f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
        f2.write("a")
        f2.close()
        # Python used to crash on this at exit because of a refcount
        # bug in _codecsmodule.c

        self.assertTrue(f.closed)

# From RFC 3492
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_basic(self):
        f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
        ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
        self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')

        f = io.BytesIO()
        ef = codecs.EncodedFile(f, 'utf-8', 'latin-1')
        ef.write(b'\xc3\xbc')
        self.assertEqual(f.getvalue(), b'\xfc')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_encodedfile(self):
        f = io.BytesIO(b"\xc3\xbc")
        with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
            self.assertEqual(ef.read(), b"\xfc")
        self.assertTrue(f.closed)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_recoding(self):
        """Write through a unicode_internal/utf-8 recoder and close it.

        Regression check: Python used to crash on this at exit because of
        a refcount bug in _codecsmodule.c.
        """
        sink = StringIO.StringIO()
        recoder = codecs.EncodedFile(sink, "unicode_internal", "utf-8")
        recoder.write(u"a")
        recoder.close()

# From RFC 3492
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_basic(self):
        """Exercise EncodedFile in both directions.

        Reading recodes the UTF-8 file content to UTF-16-LE data; writing
        recodes UTF-8 data to Latin-1 file content.
        """
        utf8_source = StringIO.StringIO('\xed\x95\x9c\n\xea\xb8\x80')
        reading = codecs.EncodedFile(utf8_source, 'utf-16-le', 'utf-8')
        self.assertEqual(reading.read(), '\\\xd5\n\x00\x00\xae')

        latin1_sink = StringIO.StringIO()
        writing = codecs.EncodedFile(latin1_sink, 'utf-8', 'latin1')
        writing.write('\xc3\xbc')
        self.assertEqual(latin1_sink.getvalue(), '\xfc')
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_encodedfile(self):
        """EncodedFile works as a context manager and recodes on read."""
        utf8_source = StringIO.StringIO("\xc3\xbc")
        with codecs.EncodedFile(utf8_source, "latin-1", "utf-8") as recoder:
            recoded = recoder.read()
            self.assertEqual(recoded, "\xfc")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_recoding(self):
        f = io.BytesIO()
        f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
        f2.write("a")
        f2.close()
        # Python used to crash on this at exit because of a refcount
        # bug in _codecsmodule.c

# From RFC 3492
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_basic(self):
        f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
        ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
        self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')

        f = io.BytesIO()
        ef = codecs.EncodedFile(f, 'utf-8', 'latin-1')
        ef.write(b'\xc3\xbc')
        self.assertEqual(f.getvalue(), b'\xfc')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_encodedfile(self):
        f = io.BytesIO(b"\xc3\xbc")
        with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
            self.assertEqual(ef.read(), b"\xfc")
项目:django-bom    作者:mpkasp    | 项目源码 | 文件源码
def part_upload_bom(request, part_id):
    """Attach subparts to a part from an uploaded CSV bill of materials.

    On POST with a valid ``FileForm``, the first CSV row is used as the
    header. Each later row that has both ``part_number`` and ``quantity``
    columns creates a ``Subpart`` linking the given part to the first
    matching ``Part``. Rows whose part number matches no part produce an
    info message and are skipped.

    Redirects to the 'error' view when the part does not exist, the form is
    invalid, or a row references the part itself. Otherwise redirects to
    the referring page, falling back to 'home'.
    """
    try:
        part = Part.objects.get(id=part_id)
    except ObjectDoesNotExist:
        messages.error(request, "No part found with given part_id.")
        return HttpResponseRedirect(reverse('error'))

    if request.method == 'POST':
        form = FileForm(request.POST, request.FILES)
        if form.is_valid():
            csvfile = request.FILES['file']
            dialect = csv.Sniffer().sniff(csvfile.readline())
            # Re-open the upload after the sniffing read above.
            csvfile.open()
            # The explicit delimiter overrides whatever the sniffed dialect
            # detected.
            reader = csv.reader(
                codecs.EncodedFile(
                    csvfile,
                    "utf-8"),
                delimiter=',',
                dialect=dialect)
            # Built-in next() instead of the Python-2-only reader.next().
            headers = next(reader)
            # Subpart.objects.filter(assembly_part=part).delete()

            for row in reader:
                # zip() stops at the shorter sequence, so a row with more
                # cells than the header no longer raises IndexError.
                partData = dict(zip(headers, row))
                if 'part_number' in partData and 'quantity' in partData:
                    civ = full_part_number_to_broken_part(
                        partData['part_number'])
                    subparts = Part.objects.filter(
                        number_class=civ['class'],
                        number_item=civ['item'],
                        number_variation=civ['variation'])

                    if len(subparts) == 0:
                        messages.info(
                            request, "Subpart: {} doesn't exist".format(
                                partData['part_number']))
                        continue

                    subpart = subparts[0]
                    count = partData['quantity']
                    if part == subpart:
                        messages.error(
                            request, "Recursive part association: a part cant be a subpart of itsself")
                        return HttpResponseRedirect(reverse('error'))

                    sp = Subpart(
                        assembly_part=part,
                        assembly_subpart=subpart,
                        count=count)
                    sp.save()
        else:
            messages.error(
                request,
                "File form not valid: {}".format(
                    form.errors))
            return HttpResponseRedirect(reverse('error'))

    return HttpResponseRedirect(request.META.get('HTTP_REFERER', reverse('home')))
项目:macos-st-packages    作者:zce    | 项目源码 | 文件源码
def run(self, edit):
        """Re-encode the view's file on disk from UTF-8 to its target encoding.

        The target is the view's 'force_encoding' setting, falling back to
        'origin_encoding'. When there is none, or it is 'UTF-8', the file's
        encoding-cache entry is dropped and nothing else happens.

        Otherwise the folded regions and viewport position are stored in
        the view settings, the file content is recoded into a temporary
        file, and that file replaces the original unless 'lazy_reload' is
        set. With 'lazy_reload' the temporary file only receives the
        original's modification time. An unknown encoding or unencodable
        content shows an error dialog and aborts.
        """
        view = self.view
        encoding = view.settings().get('force_encoding')
        if not encoding:
            encoding = view.settings().get('origin_encoding')
        file_name = view.file_name()
        if not encoding or encoding == 'UTF-8':
            encoding_cache.pop(file_name)
            return
        # remember current folded regions
        regions = [[x.a, x.b] for x in view.folded_regions()]
        if regions:
            view.settings().set('folded_regions', regions)
        vp = view.viewport_position()
        view.settings().set('viewport_position', [vp[0], vp[1]])
        try:
            # The context manager closes the source file on every path.
            with open(file_name, 'rb') as fp:
                contents = codecs.EncodedFile(fp, encoding, 'UTF-8').read()
        except (LookupError, UnicodeEncodeError) as e:
            sublime.error_message(u'Can not convert file encoding of {0} to {1}, it was saved as UTF-8 instead:\n\n{2}'.format
                    (os.path.basename(file_name), encoding, e))
            return
        # write content to temporary file; the handle is released even if
        # the write fails
        tmp_name = os.path.join(TMP_DIR, get_temp_name(file_name))
        with open(tmp_name, 'wb') as fp:
            fp.write(contents)
        if not get_setting(view, 'lazy_reload'):
            # os.rename has "Invalid cross-device link" issue
            os.chmod(tmp_name, os.stat(file_name)[0])
            shutil.move(tmp_name, file_name)
        else:
            # copy the timestamp from original file
            mtime = os.path.getmtime(file_name)
            os.utime(tmp_name, (mtime, mtime))
        encoding_cache.set(file_name, encoding)
        view.settings().set('prevent_detect', True)
        sublime.status_message('UTF8 -> {0}'.format(encoding))
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def update_publisher(self, pub):
                """Updates the configuration information for the publisher
                defined by the provided Publisher object.

                The data is written to a temporary file created next to
                "pub.p5i" and then renamed over it.  When "pub.p5i" already
                exists, its mode and (where permitted) its ownership are
                applied to the temporary file first.

                Raises RepositoryMirrorError, RepositoryReadOnlyError or
                RepositoryUnsupportedOperationError when the repository
                cannot be modified; EACCES and EROFS failures are raised as
                apx.PermissionsException and
                apx.ReadOnlyFileSystemException respectively.
                """

                if self.mirror:
                        raise RepositoryMirrorError()
                if self.read_only:
                        raise RepositoryReadOnlyError()
                if not self.root:
                        raise RepositoryUnsupportedOperationError()

                p5ipath = os.path.join(self.root, "pub.p5i")
                fn = None
                # Descriptor from mkstemp() while no file object owns it.
                fd = None
                try:
                        dirname = os.path.dirname(p5ipath)
                        fd, fn = tempfile.mkstemp(dir=dirname)

                        st = None
                        try:
                                st = os.stat(p5ipath)
                        except OSError as e:
                                if e.errno != errno.ENOENT:
                                        raise

                        if st:
                                os.fchmod(fd, stat.S_IMODE(st.st_mode))
                                try:
                                        portable.chown(fn, st.st_uid, st.st_gid)
                                except OSError as e:
                                        if e.errno != errno.EPERM:
                                                raise
                        else:
                                os.fchmod(fd, misc.PKG_FILE_MODE)

                        if six.PY2:
                                f = os.fdopen(fd, "wb")
                                # The file object owns the descriptor now.
                                fd = None
                                with f:
                                        with codecs.EncodedFile(f, "utf-8") as ef:
                                                p5i.write(ef, [pub])
                        else:
                                # we use simplejson.dump() in p5i.write(),
                                # simplejson module will produce str objects
                                # in Python 3, therefore fp.write()
                                # must support str input.

                                fp = open(fd, "w", encoding="utf-8")
                                # The file object owns the descriptor now.
                                fd = None
                                with fp:
                                        p5i.write(fp, [pub])
                        portable.rename(fn, p5ipath)
                except EnvironmentError as e:
                        if e.errno == errno.EACCES:
                                raise apx.PermissionsException(e.filename)
                        elif e.errno == errno.EROFS:
                                raise apx.ReadOnlyFileSystemException(
                                    e.filename)
                        raise
                finally:
                        if fd is not None:
                                # A failure before a file object took over
                                # would otherwise leak the descriptor.
                                try:
                                        os.close(fd)
                                except OSError:
                                        # Best-effort cleanup; do not mask
                                        # the original error.
                                        pass
                        if fn and os.path.exists(fn):
                                os.unlink(fn)