Python io 模块,open() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用io.open()

项目:margy    作者:opentower    | 项目源码 | 文件源码
def add():
    if request.method == 'POST':
        if request.form['submit'] == 'Add':
            addr = request.form['addr'].lstrip().rstrip()
            f = io.open('blastlist.txt', 'a', encoding="utf-8")
            f.write(addr.decode('utf-8') + u'\r\n')
            f.close()
            return render_template('listadded.html',addr=addr)
        elif request.form['submit'] == 'Remove':
            addr = request.form['addr'].lstrip().rstrip()
            f = io.open('blastlist.txt', 'r', encoding="utf-8")
            lines = f.readlines()
            f.close()
            f = io.open('blastlist.txt', 'w', encoding="utf-8")
            for line in lines:
                if addr not in line:
                    f.write(line.decode('utf-8'))
            f.close()
            return render_template('listremoved.html',addr=addr)
项目:lang-reps    作者:chaitanyamalaviya    | 项目源码 | 文件源码
def __iter__(self):
        """
        Read a file where each line is of the form "word1 word2 ..."
        Yields lists of the form [word1, word2, ...]
        """
        if os.path.isdir(self.fname):
            filenames = [os.path.join(self.fname,f) for f in os.listdir(self.fname)]
        else:
            filenames = [self.fname]
        for filename in filenames:
            # with io.open(filename, encoding='utf-8') as f:
            with open(filename) as f:
                doc = f.read()
                for line in doc.split("\n"):
                    #if not line:  continue
                    sent = "".join([ch for ch in line.lower() if ch not in string.punctuation]).strip().split()
                    # sent = [word for word in line.strip().split()]
                    sent = [self.begin] + sent + [self.end]
                    yield sent
项目:lang-reps    作者:chaitanyamalaviya    | 项目源码 | 文件源码
def __iter__(self):
        """
        Read a file where each line is of the form "word1 word2 ..."
        Yields lists of the form [word1, word2, ...]
        """
        #jfbbb
    if os.path.isdir(self.fname):
            filenames = [os.path.join(self.fname,f) for f in os.listdir(self.fname)]
        #else:
        #    filenames = [self.fname]

        for langpath in filenames:
            with open(filename) as f:
                doc = f.read()
                for line in doc.split("\n"):
                    #if not line:  continue
                    sent = "".join([ch for ch in line.lower() if ch not in string.punctuation]).strip().split()
                    # sent = [word for word in line.strip().split()]
                    sent = [self.begin] + sent + [self.end]
                    yield sent
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def run_script(self, script_name, namespace):
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            source = open(script_filename).read()
            code = compile(source, script_filename, 'exec')
            exec(code, namespace, namespace)
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            script_code = compile(script_text, script_filename, 'exec')
            exec(script_code, namespace, namespace)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def read_manifest(self):
        """Read the manifest file (named by 'self.manifest') and use it to
        fill in 'self.filelist', the list of files to include in the source
        distribution.
        """
        log.info("reading manifest file '%s'", self.manifest)
        manifest = open(self.manifest, 'rb')
        for line in manifest:
            # The manifest must contain UTF-8. See #303.
            if six.PY3:
                try:
                    line = line.decode('UTF-8')
                except UnicodeDecodeError:
                    log.warn("%r not UTF-8 decodable -- skipping" % line)
                    continue
            # ignore comments and blank lines
            line = line.strip()
            if line.startswith('#') or not line:
                continue
            self.filelist.append(line)
        manifest.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def uninstall_link(self):
        if os.path.exists(self.egg_link):
            log.info("Removing %s (link to %s)", self.egg_link, self.egg_base)
            egg_link_file = open(self.egg_link)
            contents = [line.rstrip() for line in egg_link_file]
            egg_link_file.close()
            if contents not in ([self.egg_path],
                                [self.egg_path, self.setup_path]):
                log.warn("Link points to %s: uninstall aborted", contents)
                return
            if not self.dry_run:
                os.unlink(self.egg_link)
        if not self.dry_run:
            self.update_pth(self.dist)  # remove any .pth link to us
        if self.distribution.scripts:
            # XXX should also check for entry point scripts!
            log.warn("Note: you must uninstall or replace scripts manually!")
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def install_egg_scripts(self, dist):
        if dist is not self.dist:
            # Installing a dependency, so fall back to normal behavior
            return easy_install.install_egg_scripts(self, dist)

        # create wrapper scripts in the script dir, pointing to dist.scripts

        # new-style...
        self.install_wrapper_scripts(dist)

        # ...and old-style
        for script_name in self.distribution.scripts or []:
            script_path = os.path.abspath(convert_path(script_name))
            script_name = os.path.basename(script_path)
            with io.open(script_path) as strm:
                script_text = strm.read()
            self.install_script(dist, script_name, script_text, script_path)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def save(self):
        """Write changed .pth file back to disk"""
        if not self.dirty:
            return

        rel_paths = list(map(self.make_relative, self.paths))
        if rel_paths:
            log.debug("Saving %s", self.filename)
            lines = self._wrap_lines(rel_paths)
            data = '\n'.join(lines) + '\n'

            if os.path.islink(self.filename):
                os.unlink(self.filename)
            with open(self.filename, 'wt') as f:
                f.write(data)

        elif os.path.exists(self.filename):
            log.debug("Deleting empty %s", self.filename)
            os.unlink(self.filename)

        self.dirty = False
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def run_script(self, script_name, namespace):
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            source = open(script_filename).read()
            code = compile(source, script_filename, 'exec')
            exec(code, namespace, namespace)
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            script_code = compile(script_text, script_filename, 'exec')
            exec(script_code, namespace, namespace)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def run_script(self, script_name, namespace):
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            source = open(script_filename).read()
            code = compile(source, script_filename, 'exec')
            exec(code, namespace, namespace)
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            script_code = compile(script_text, script_filename, 'exec')
            exec(script_code, namespace, namespace)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def read_manifest(self):
        """Read the manifest file (named by 'self.manifest') and use it to
        fill in 'self.filelist', the list of files to include in the source
        distribution.
        """
        log.info("reading manifest file '%s'", self.manifest)
        manifest = open(self.manifest, 'rb')
        for line in manifest:
            # The manifest must contain UTF-8. See #303.
            if six.PY3:
                try:
                    line = line.decode('UTF-8')
                except UnicodeDecodeError:
                    log.warn("%r not UTF-8 decodable -- skipping" % line)
                    continue
            # ignore comments and blank lines
            line = line.strip()
            if line.startswith('#') or not line:
                continue
            self.filelist.append(line)
        manifest.close()
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def uninstall_link(self):
        if os.path.exists(self.egg_link):
            log.info("Removing %s (link to %s)", self.egg_link, self.egg_base)
            egg_link_file = open(self.egg_link)
            contents = [line.rstrip() for line in egg_link_file]
            egg_link_file.close()
            if contents not in ([self.egg_path],
                                [self.egg_path, self.setup_path]):
                log.warn("Link points to %s: uninstall aborted", contents)
                return
            if not self.dry_run:
                os.unlink(self.egg_link)
        if not self.dry_run:
            self.update_pth(self.dist)  # remove any .pth link to us
        if self.distribution.scripts:
            # XXX should also check for entry point scripts!
            log.warn("Note: you must uninstall or replace scripts manually!")
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def install_egg_scripts(self, dist):
        if dist is not self.dist:
            # Installing a dependency, so fall back to normal behavior
            return easy_install.install_egg_scripts(self, dist)

        # create wrapper scripts in the script dir, pointing to dist.scripts

        # new-style...
        self.install_wrapper_scripts(dist)

        # ...and old-style
        for script_name in self.distribution.scripts or []:
            script_path = os.path.abspath(convert_path(script_name))
            script_name = os.path.basename(script_path)
            with io.open(script_path) as strm:
                script_text = strm.read()
            self.install_script(dist, script_name, script_text, script_path)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def save(self):
        """Write changed .pth file back to disk"""
        if not self.dirty:
            return

        rel_paths = list(map(self.make_relative, self.paths))
        if rel_paths:
            log.debug("Saving %s", self.filename)
            lines = self._wrap_lines(rel_paths)
            data = '\n'.join(lines) + '\n'

            if os.path.islink(self.filename):
                os.unlink(self.filename)
            with open(self.filename, 'wt') as f:
                f.write(data)

        elif os.path.exists(self.filename):
            log.debug("Deleting empty %s", self.filename)
            os.unlink(self.filename)

        self.dirty = False
项目:seq2seq    作者:google    | 项目源码 | 文件源码
def write_parallel_text(sources, targets, output_prefix):
  """
  Writes two files where each line corresponds to one example
    - [output_prefix].sources.txt
    - [output_prefix].targets.txt

  Args:
    sources: Iterator of source strings
    targets: Iterator of target strings
    output_prefix: Prefix for the output file
  """
  source_filename = os.path.abspath(os.path.join(output_prefix, "sources.txt"))
  target_filename = os.path.abspath(os.path.join(output_prefix, "targets.txt"))

  with io.open(source_filename, "w", encoding='utf8') as source_file:
    for record in sources:
      source_file.write(record + "\n")
  print("Wrote {}".format(source_filename))

  with io.open(target_filename, "w", encoding='utf8') as target_file:
    for record in targets:
      target_file.write(record + "\n")
  print("Wrote {}".format(target_filename))
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def pngValidator(path=None, data=None, fileObj=None):
    """
    Version 3+.

    This checks the signature of the image data.
    """
    assert path is not None or data is not None or fileObj is not None
    if path is not None:
        with open(path, "rb") as f:
            signature = f.read(8)
    elif data is not None:
        signature = data[:8]
    elif fileObj is not None:
        pos = fileObj.tell()
        signature = fileObj.read(8)
        fileObj.seek(pos)
    if signature != pngSignature:
        return False, "Image does not begin with the PNG signature."
    return True, None

# -------------------
# layercontents.plist
# -------------------
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def testInvalidLayerContentsFormat(self):
        # bogus
        self.makeUFO()
        path = os.path.join(self.ufoPath, "layercontents.plist")
        os.remove(path)
        with open(path, "w") as f:
            f.write("test")
        reader = UFOReader(self.ufoPath)
        self.assertRaises(UFOLibError, reader.getGlyphSet)
        # dict
        self.makeUFO()
        path = os.path.join(self.ufoPath, "layercontents.plist")
        os.remove(path)
        layerContents = {
            "public.default" : "glyphs",
            "layer 1" : "glyphs.layer 1",
            "layer 2" : "glyphs.layer 2",
        }
        with open(path, "wb") as f:
            writePlist(layerContents, f)
        reader = UFOReader(self.ufoPath)
        self.assertRaises(UFOLibError, reader.getGlyphSet)

    # layer contents invalid name format
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def testInvalidLayerContentsFormat(self):
        # bogus
        self.makeUFO()
        path = os.path.join(self.ufoPath, "layercontents.plist")
        os.remove(path)
        with open(path, "w") as f:
            f.write("test")
        self.assertRaises(UFOLibError, UFOWriter, self.ufoPath)
        # dict
        self.makeUFO()
        path = os.path.join(self.ufoPath, "layercontents.plist")
        os.remove(path)
        layerContents = {
            "public.default" : "glyphs",
            "layer 1" : "glyphs.layer 1",
            "layer 2" : "glyphs.layer 2",
        }
        with open(path, "wb") as f:
            writePlist(layerContents, f)
        self.assertRaises(UFOLibError, UFOWriter, self.ufoPath)

    # __init__: layer contents invalid name format
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def testGetGlyphSets(self):
        self.makeUFO()
        # hack contents.plist
        path = os.path.join(self.ufoPath, "glyphs.layer 1", "contents.plist")
        with open(path, "wb") as f:
            writePlist(dict(b="a.glif"), f)
        path = os.path.join(self.ufoPath, "glyphs.layer 2", "contents.plist")
        with open(path, "wb") as f:
            writePlist(dict(c="a.glif"), f)
        # now test
        writer = UFOWriter(self.ufoPath)
        # default
        expected = ["a"]
        result = list(writer.getGlyphSet().keys())
        self.assertEqual(expected, result)
        # layer 1
        expected = ["b"]
        result = list(writer.getGlyphSet("layer 1", defaultLayer=False).keys())
        self.assertEqual(expected, result)
        # layer 2
        expected = ["c"]
        result = list(writer.getGlyphSet("layer 2", defaultLayer=False).keys())
        self.assertEqual(expected, result)

    # make a new font with two layers
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def testNewFontThreeLayers(self):
        self.clearUFO()
        writer = UFOWriter(self.ufoPath)
        writer.getGlyphSet("layer 1", defaultLayer=False)
        writer.getGlyphSet()
        writer.getGlyphSet("layer 2", defaultLayer=False)
        writer.writeLayerContents(["layer 1", "public.default", "layer 2"])
        # directories
        path = os.path.join(self.ufoPath, "glyphs")
        exists = os.path.exists(path)
        self.assertEqual(True, exists)
        path = os.path.join(self.ufoPath, "glyphs.layer 1")
        exists = os.path.exists(path)
        self.assertEqual(True, exists)
        path = os.path.join(self.ufoPath, "glyphs.layer 2")
        exists = os.path.exists(path)
        self.assertEqual(True, exists)
        # layer contents
        path = os.path.join(self.ufoPath, "layercontents.plist")
        with open(path, "rb") as f:
            result = readPlist(f)
        expected = [["layer 1", "glyphs.layer 1"], ["public.default", "glyphs"], ["layer 2", "glyphs.layer 2"]]
        self.assertEqual(expected, result)

    # add a layer to an existing font
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def testRenameLayer(self):
        self.makeUFO()
        writer = UFOWriter(self.ufoPath)
        writer.renameGlyphSet("layer 1", "layer 3")
        writer.writeLayerContents(["public.default", "layer 3", "layer 2"])
        # directories
        path = os.path.join(self.ufoPath, "glyphs")
        exists = os.path.exists(path)
        self.assertEqual(True, exists)
        path = os.path.join(self.ufoPath, "glyphs.layer 1")
        exists = os.path.exists(path)
        self.assertEqual(False, exists)
        path = os.path.join(self.ufoPath, "glyphs.layer 2")
        exists = os.path.exists(path)
        self.assertEqual(True, exists)
        path = os.path.join(self.ufoPath, "glyphs.layer 3")
        exists = os.path.exists(path)
        self.assertEqual(True, exists)
        # layer contents
        path = os.path.join(self.ufoPath, "layercontents.plist")
        with open(path, "rb") as f:
            result = readPlist(f)
        expected = [['public.default', 'glyphs'], ['layer 3', 'glyphs.layer 3'], ['layer 2', 'glyphs.layer 2']]
        self.assertEqual(expected, result)
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def testRemoveLayer(self):
        self.makeUFO()
        writer = UFOWriter(self.ufoPath)
        writer.deleteGlyphSet("layer 1")
        writer.writeLayerContents(["public.default", "layer 2"])
        # directories
        path = os.path.join(self.ufoPath, "glyphs")
        exists = os.path.exists(path)
        self.assertEqual(True, exists)
        path = os.path.join(self.ufoPath, "glyphs.layer 1")
        exists = os.path.exists(path)
        self.assertEqual(False, exists)
        path = os.path.join(self.ufoPath, "glyphs.layer 2")
        exists = os.path.exists(path)
        self.assertEqual(True, exists)
        # layer contents
        path = os.path.join(self.ufoPath, "layercontents.plist")
        with open(path, "rb") as f:
            result = readPlist(f)
        expected = [["public.default", "glyphs"], ["layer 2", "glyphs.layer 2"]]
        self.assertEqual(expected, result)

    # remove default layer
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def testRemoveDefaultLayer(self):
        self.makeUFO()
        writer = UFOWriter(self.ufoPath)
        writer.deleteGlyphSet("public.default")
        # directories
        path = os.path.join(self.ufoPath, "glyphs")
        exists = os.path.exists(path)
        self.assertEqual(False, exists)
        path = os.path.join(self.ufoPath, "glyphs.layer 1")
        exists = os.path.exists(path)
        self.assertEqual(True, exists)
        path = os.path.join(self.ufoPath, "glyphs.layer 2")
        exists = os.path.exists(path)
        self.assertEqual(True, exists)
        # layer contents
        path = os.path.join(self.ufoPath, "layercontents.plist")
        with open(path, "rb") as f:
            result = readPlist(f)
        expected = [["layer 1", "glyphs.layer 1"], ["layer 2", "glyphs.layer 2"]]
        self.assertEqual(expected, result)

    # remove unknown layer
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def testWrite(self):
        writer = UFOWriter(self.dstDir, formatVersion=2)
        writer.setKerningGroupConversionRenameMaps(self.downConversionMapping)
        writer.writeKerning(self.kerning)
        writer.writeGroups(self.groups)
        # test groups
        path = os.path.join(self.dstDir, "groups.plist")
        with open(path, "rb") as f:
            writtenGroups = readPlist(f)
        self.assertEqual(writtenGroups, self.expectedWrittenGroups)
        # test kerning
        path = os.path.join(self.dstDir, "kerning.plist")
        with open(path, "rb") as f:
            writtenKerning = readPlist(f)
        self.assertEqual(writtenKerning, self.expectedWrittenKerning)
        self.tearDownUFO()
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def writeLayerInfo(self, info):
        if self.ufoFormatVersion < 3:
            raise GlifLibError("layerinfo.plist is not allowed in UFO %d." % self.ufoFormatVersion)
        # gather data
        infoData = {}
        for attr in list(layerInfoVersion3ValueData.keys()):
            if hasattr(info, attr):
                try:
                    value = getattr(info, attr)
                except AttributeError:
                    raise GlifLibError("The supplied info object does not support getting a necessary attribute (%s)." % attr)
                if value is None or (attr == 'lib' and not value):
                    continue
                infoData[attr] = value
        # validate
        infoData = validateLayerInfoVersion3Data(infoData)
        # write file
        path = os.path.join(self.dirName, LAYERINFO_FILENAME)
        with open(path, "wb") as f:
            writePlist(infoData, f)

    # read caching
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def readBytesFromPath(self, path, encoding=None):
        """
        Returns the bytes in the file at the given path.
        The path must be relative to the UFO path.
        Returns None if the file does not exist.
        An encoding may be passed if needed.
        """
        fullPath = os.path.join(self._path, path)
        if not self._checkForFile(fullPath):
            return None
        if os.path.isdir(fullPath):
            raise UFOLibError("%s is a directory." % path)
        if encoding:
            f = open(fullPath, encoding=encoding)
        else:
            f = open(fullPath, "rb", encoding=encoding)
        data = f.read()
        f.close()
        return data
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def getReadFileForPath(self, path, encoding=None):
        """
        Returns a file (or file-like) object for the
        file at the given path. The path must be relative
        to the UFO path. Returns None if the file does not exist.
        An encoding may be passed if needed.

        Note: The caller is responsible for closing the open file.
        """
        fullPath = os.path.join(self._path, path)
        if not self._checkForFile(fullPath):
            return None
        if os.path.isdir(fullPath):
            raise UFOLibError("%s is a directory." % path)
        if encoding:
            f = open(fullPath, "rb", encoding=encoding)
        else:
            f = open(fullPath, "r")
        return f
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def writeFileAtomically(text, path, encoding="utf-8"):
    """
    Write text into a file at path. Do this sort of atomically
    making it harder to cause corrupt files. This also checks to see
    if text matches the text that is already in the file at path.
    If so, the file is not rewritten so that the modification date
    is preserved. An encoding may be passed if needed.
    """
    if os.path.exists(path):
        with open(path, "r", encoding=encoding) as f:
            oldText = f.read()
        if text == oldText:
            return
        # if the text is empty, remove the existing file
        if not text:
            os.remove(path)
    if text:
        with open(path, "w", encoding=encoding) as f:
            f.write(text)
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def consume(self, doc, payload):
    """
    Write text to target directory, using a combination of filename and file
    ID as path.

    :param doc: Document object.
    :param payload: File pointer beloning to document.
    :type doc: ``gransk.core.document.Document``
    :type payload: ``file``
    """
    new_filename = '%s-%s' % \
      (doc.docid[0:8], document.secure_path(os.path.basename(doc.path)))

    if not os.path.exists(self.root):
      os.makedirs(self.root)

    new_path = os.path.join(self.root, new_filename)

    with io.open(new_path, 'w', encoding='utf-8') as out:
      out.write(doc.text)

    doc.meta['text_file'] = new_path
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def __call__(self, value):

        if value is None:
            return value

        path = unicode(value)

        if not os.path.isabs(path):
            path = os.path.join(self.directory, path)

        try:
            value = open(path, self.mode) if self.buffering is None else open(path, self.mode, self.buffering)
        except IOError as error:
            raise ValueError('Cannot open {0} with mode={1} and buffering={2}: {3}'.format(
                value, self.mode, self.buffering, error))

        return value
项目:scientific-paper-summarisation    作者:EdCo95    | 项目源码 | 文件源码
def downloadAllJournalArticles(skip = False, dontskip = ""):
# download journal articles from a list of journal names
    if dontskip != "":
        skip = True
    f = io.open("../compsci_journals.txt")  # all Elsevier CS journal names
    for l in f:
        if len(l) > 2:
            j = l.strip("\n")
            # skip the ones for which there are already folders, we assume for those downloading has finished
            if skip == True:
                if j.lower().replace(" ", "_") in os.listdir("../elsevier_papers_xml"):
                    if not j == dontskip:
                        print("Skipping journal:", j)
                        continue
            print("Downloading articles for journal:", j)
            jurl = getJournalURL(j)
            downloadArticles("../elsevier_papers_xml/" + j.lower().replace(" ", "_") + "/", jurl)
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def run_script(self, script_name, namespace):
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            source = open(script_filename).read()
            code = compile(source, script_filename, 'exec')
            exec(code, namespace, namespace)
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            script_code = compile(script_text, script_filename, 'exec')
            exec(script_code, namespace, namespace)
项目:margy    作者:opentower    | 项目源码 | 文件源码
def whitelist():
    with io.open('static/whitelist.txt', 'r', encoding="utf-8") as f: #gets the contents of whitelist.txt so they can be displayed
        data = f.read().replace('@', ' [at] ').replace('.', ' [dot] ')
    return render_template('whitelist.html',data=data)
项目:margy    作者:opentower    | 项目源码 | 文件源码
def tail():
    if request.method == 'POST':
        fi = request.form['file']
        if os.path.isfile(fi):
            n = int(request.form['n'])
            le = io.open(fi, 'r', encoding='utf-8')
            taildata = le.read()[-n:]
            le.close()
        else:
            taildata = "No such file."
        return render_template('tail.html',taildata=taildata)
项目:margy    作者:opentower    | 项目源码 | 文件源码
def wladd():
    if request.method == 'POST':
        addr = request.form['addr'].lstrip().rstrip()
        f = io.open('static/whitelist.txt', 'a', encoding="utf-8")
        f.write(addr.decode('utf-8') + u'\r\n')
        f.close()
        return render_template('wladd.html')
项目:margy    作者:opentower    | 项目源码 | 文件源码
def unsub():
    if request.method == 'POST':
        addr = request.form['addr'].lstrip().rstrip()
        f = io.open('unsubscribers.txt', 'a', encoding="utf-8")
        f.write(addr.decode('utf-8') + u'\r\n')
        f.close()
        f = io.open('static/whitelist.txt', 'r', encoding="utf-8")
        lines = f.readlines()
        f.close()
        f = io.open('static/whitelist.txt', 'w', encoding="utf-8")
        for line in lines:
            if addr not in line:
                f.write(line.decode('utf-8'))
        f.close()
        return render_template('unsubscribed.html',addr=addr)
项目:lang-reps    作者:chaitanyamalaviya    | 项目源码 | 文件源码
def save(self, filename):
        info_dict = {
            "tokens":self.tokens,
            "strings":self.strings,
            "s2t":dict(self.s2t),
            "i2t":dict(self.i2t),
            "unk":self.unk,
            "START_TOK":self.START_TOK,
            "END_TOK":self.END_TOK
        }
        with open(filename, "w") as f: pickle.dump(info_dict, f)
项目:lang-reps    作者:chaitanyamalaviya    | 项目源码 | 文件源码
def load(cls, filename):
        with open(filename, "r") as f:
            info_dict = pickle.load(f)
            v = Vocab()
            v.tokens = info_dict["tokens"]
            v.strings = info_dict["strings"]
            v.unk = info_dict["unk"]
            v.START_TOK = info_dict["START_TOK"]
            v.END_TOK = info_dict["END_TOK"]
            defaultf = (lambda :v.unk) if (v.unk is not None) else Token.not_found
            v.s2t = defaultdict(defaultf, info_dict["s2t"])
            v.i2t = defaultdict(defaultf, info_dict["i2t"])
            return v
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def parse_init():
    with open(os.path.join(HERE, PKG_NAME, '__init__.py')) as f:
        file_data = f.read()
    return [regex.search(file_data).group(2) for regex in
            (AUTHOR, DOCSTRING, VERSION)]
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def read(*filenames, **kwargs):
    encoding = kwargs.get('encoding', 'utf-8')
    sep = kwargs.get('sep', '\n')
    buf = []
    for filename in filenames:
        with io.open(filename, encoding=encoding) as f:
            buf.append(f.read())
    return sep.join(buf)
项目:azure-docs-sdk-python    作者:MicrosoftDocs    | 项目源码 | 文件源码
def rewrite_yml(data):
    with io.open('toc.yml', 'w', encoding='utf8') as outfile:
        yaml.dump(data, outfile, default_flow_style=False, allow_unicode=True)
项目:GELUs    作者:hendrycks    | 项目源码 | 文件源码
def embeddings_to_dict(filename):
    '''
    :param filename: the file name of the word embeddings | file is assumed
    to follow this format: "word[tab]dimension 1[space]dimension 2[space]...[space]dimension 50"
    :return: a dictionary with keys that are words and values that are the embedding of a word
    '''
    with io.open(filename, 'r', encoding='utf-8') as f:
        word_vecs = {}
        for line in f:
            line = line.strip('\n').split()
            word_vecs[line[0]] = np.array([float(s) for s in line[1:]])

    return word_vecs
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def load_test_fixture(fixture_path):
    path = os.path.dirname(os.path.abspath(__file__))
    fixture_file = open(path + '/' + fixture_path)
    input = fixture_file.read()
    fixture_file.close()
    sys.stdin = StringIO(input)
    sys.stdout = StringIO()
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def load_data(file: str):
    with io.open(os.path.join(__abspath__, 'test_data', file)) as afile:
        input_str = afile.read().replace('PATH', os.path.join(__abspath__, 'test_data'))
    sys.stdin = io.StringIO(input_str)
    sys.stdout = io.StringIO()
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def run(path, quiet=False):
    """
    Downloads all available hash files to a given path.

    :param path: Path to download directory
    :param quiet: If set to True, no progressbar is displayed
    """
    if os.path.isdir(path):
        session = requests.Session()
        session.headers = {'User-agent': 'Mozilla/5.0 Chrome/57.0.2987.110'}
        max_num = max(list(map(int, re.sub(r'[\<\>]',
                                           '',
                                           '\n'.join(re.findall(r'\>[1-9][0-9]{2}\<',
                                                                session.get('https://virusshare.com/hashes.4n6').text
                                                                )
                                                     )
                                           ).split('\n')
                               )
                           )
                      )
        if not quiet:
            p = progressbar.ProgressBar(max_value=max_num)
        for i in range(max_num):
            filename = str(i).zfill(3) + '.md5'
            if os.path.exists(os.path.join(path, filename)):
                continue
            if not quiet:
                p.update(i)
            url = URL + filename
            head = session.head(url)
            if head.status_code == 200:
                body = session.get(url, stream=True)
                with io.open(os.path.join(path, str(i).zfill(3) + '.md5'), mode='wb') as afile:
                    for chunk in body.iter_content(chunk_size=1024):
                        afile.write(b'' + chunk)
                body.close()
    else:
        print('Given path is not a directory.')
        sys.exit(1)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def run(self):
        searchhash = ''
        if self.data_type == 'hash':
            searchhash = self.getData()
            if len(searchhash) != 32:
                self.report({'isonvs': 'unknown',
                             'hash': searchhash})
        elif self.data_type == 'file':
            filepath = self.getParam('file')
            hasher = hashlib.md5()
            with io.open(filepath, mode='rb') as afile:
                for chunk in iter(lambda: afile.read(65536), b''):
                    hasher.update(chunk)
            searchhash = hasher.hexdigest()
        else:
            self.error('Unsupported data type.')

        # Read files
        for file in self.filelist:
            filepath = os.path.join(self.path, file)
            if not os.path.isfile(filepath):
                continue
            with io.open(filepath, 'r') as afile:
                for line in afile:
                    # Skipping comments
                    if line[0] == '#':
                        continue
                    if searchhash.lower() in line:
                        self.report({'isonvs': True,
                                     'md5': searchhash})
        self.report({'isonvs': False,
                     'md5': searchhash})
项目:cemu    作者:hugsy    | 项目源码 | 文件源码
def readme():
    import io
    with io.open('README.rst', "r", encoding="utf-8") as f:
        long_description = f.read()

    return long_description
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def run_script(self, script_name, namespace):
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError("No script named %r" % script_name)
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            source = open(script_filename).read()
            code = compile(source, script_filename, 'exec')
            exec(code, namespace, namespace)
        else:
            from linecache import cache
            cache[script_filename] = (
                len(script_text), 0, script_text.split('\n'), script_filename
            )
            script_code = compile(script_text, script_filename, 'exec')
            exec(script_code, namespace, namespace)