Python os 模块,unlink() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.unlink()

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create a directory"""
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    realpath = os.path.abspath(path)
    path_exists = os.path.exists(realpath)
    if path_exists and force:
        if not os.path.isdir(realpath):
            log("Removing non-directory file {} prior to mkdir()".format(path))
            os.unlink(realpath)
            os.makedirs(realpath, perms)
    elif not path_exists:
        os.makedirs(realpath, perms)
    os.chown(realpath, uid, gid)
    os.chmod(realpath, perms)
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def _clean_check(cmd, target):
    """
    Run the command to download target. If the command fails, clean up before
    re-raising the error.
    """
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        if os.access(target, os.F_OK):
            os.unlink(target)
        raise
项目:easydo-ui    作者:easydo-cn    | 项目源码 | 文件源码
def dot2graph(self, dot, format='svg'):
        # windows ???????????????????
        # ?????? NamedTemporaryFile ?????
        with NamedTemporaryFile(delete=False) as dotfile:
            dotfile.write(dot)
        outfile = NamedTemporaryFile(delete=False)
        os.system('dot -Efontname=sans -Nfontname=sans %s -o%s -T%s' % (
            dotfile.name, outfile.name, format))
        result = outfile.read()
        outfile.close()
        os.unlink(dotfile.name)
        os.unlink(outfile.name)
        return result
项目:rstviewer    作者:arne-cl    | 项目源码 | 文件源码
def test_rs3topng():
    """rs3 file is converted to PNG"""
    png_str = rstviewer.rs3topng(RS3_FILEPATH)

    temp = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    temp.close()

    rstviewer.rs3topng(RS3_FILEPATH, temp.name)
    with open(temp.name, 'r') as png_file:
        assert png_str == png_file.read()
        os.unlink(temp.name)

    # generated images might not be 100% identical, probably
    # because of the font used
    with open(EXPECTED_PNG1, 'r') as expected_png_file:
        ident1 = png_str == expected_png_file.read()

    with open(EXPECTED_PNG2, 'r') as expected_png_file:
        ident2 = png_str == expected_png_file.read()

    assert ident1 or ident2
项目:rstviewer    作者:arne-cl    | 项目源码 | 文件源码
def test_cli_rs3topng():
    """conversion to PNG on the commandline"""
    temp_png = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    temp_png.close()

    # calling `rstviewer -f png input.rs3 output.png` will end the program
    # with sys.exit(0), so we'll have to catch this here.
    with pytest.raises(SystemExit) as serr:
        cli(['-f', 'png', RS3_FILEPATH, temp_png.name])
        out, err = pytest.capsys.readouterr()
        assert err == 0

    with open(temp_png.name, 'r') as png_file:
        png_str = png_file.read()
        os.unlink(temp_png.name)

    # generated images might not be 100% identical, probably
    # because of the font used
    with open(EXPECTED_PNG1, 'r') as expected_png_file:
        ident1 = png_str == expected_png_file.read()

    with open(EXPECTED_PNG2, 'r') as expected_png_file:
        ident2 = png_str == expected_png_file.read()

    assert ident1 or ident2
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def is_running(name):
    """Test whether task is running under ``name``.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``

    """
    pidfile = _pid_file(name)
    if not os.path.exists(pidfile):
        return False

    with open(pidfile, 'rb') as file_obj:
        pid = int(file_obj.read().strip())

    if _process_exists(pid):
        return True

    elif os.path.exists(pidfile):
        os.unlink(pidfile)

    return False
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def _delete_directory_contents(self, dirpath, filter_func):
        """Delete all files in a directory.

        :param dirpath: path to directory to clear
        :type dirpath: ``unicode`` or ``str``
        :param filter_func function to determine whether a file shall be
            deleted or not.
        :type filter_func ``callable``

        """
        if os.path.exists(dirpath):
            for filename in os.listdir(dirpath):
                if not filter_func(filename):
                    continue
                path = os.path.join(dirpath, filename)
                if os.path.isdir(path):
                    shutil.rmtree(path)
                else:
                    os.unlink(path)
                self.logger.debug('Deleted : %r', path)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def _start_kuryr_manage_daemon(self):
        LOG.info("Pool manager started")
        server_address = oslo_cfg.CONF.pool_manager.sock_file
        try:
            os.unlink(server_address)
        except OSError:
            if os.path.exists(server_address):
                raise
        try:
            httpd = UnixDomainHttpServer(server_address, RequestHandler)
            httpd.serve_forever()
        except KeyboardInterrupt:
            pass
        except Exception:
            LOG.exception('Failed to start Pool Manager.')
        httpd.socket.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def run(self):
        self.run_command("egg_info")
        from glob import glob

        for pattern in self.match:
            pattern = self.distribution.get_name() + '*' + pattern
            files = glob(os.path.join(self.dist_dir, pattern))
            files = [(os.path.getmtime(f), f) for f in files]
            files.sort()
            files.reverse()

            log.info("%d file(s) matching %s", len(files), pattern)
            files = files[self.keep:]
            for (t, f) in files:
                log.info("Deleting %s", f)
                if not self.dry_run:
                    if os.path.isdir(f):
                        shutil.rmtree(f)
                    else:
                        os.unlink(f)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def save(self):
        """Write changed .pth file back to disk"""
        if not self.dirty:
            return

        rel_paths = list(map(self.make_relative, self.paths))
        if rel_paths:
            log.debug("Saving %s", self.filename)
            lines = self._wrap_lines(rel_paths)
            data = '\n'.join(lines) + '\n'

            if os.path.islink(self.filename):
                os.unlink(self.filename)
            with open(self.filename, 'wt') as f:
                f.write(data)

        elif os.path.exists(self.filename):
            log.debug("Deleting empty %s", self.filename)
            os.unlink(self.filename)

        self.dirty = False
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def zap_pyfiles(self):
        log.info("Removing .py files from temporary directory")
        for base, dirs, files in walk_egg(self.bdist_dir):
            for name in files:
                path = os.path.join(base, name)

                if name.endswith('.py'):
                    log.debug("Deleting %s", path)
                    os.unlink(path)

                if base.endswith('__pycache__'):
                    path_old = path

                    pattern = r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc'
                    m = re.match(pattern, name)
                    path_new = os.path.join(base, os.pardir, m.group('name') + '.pyc')
                    log.info("Renaming file from [%s] to [%s]" % (path_old, path_new))
                    try:
                        os.remove(path_new)
                    except OSError:
                        pass
                    os.rename(path_old, path_new)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def run(self):
        self.run_command("egg_info")
        from glob import glob

        for pattern in self.match:
            pattern = self.distribution.get_name() + '*' + pattern
            files = glob(os.path.join(self.dist_dir, pattern))
            files = [(os.path.getmtime(f), f) for f in files]
            files.sort()
            files.reverse()

            log.info("%d file(s) matching %s", len(files), pattern)
            files = files[self.keep:]
            for (t, f) in files:
                log.info("Deleting %s", f)
                if not self.dry_run:
                    if os.path.isdir(f):
                        shutil.rmtree(f)
                    else:
                        os.unlink(f)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def save(self):
        """Write changed .pth file back to disk"""
        if not self.dirty:
            return

        rel_paths = list(map(self.make_relative, self.paths))
        if rel_paths:
            log.debug("Saving %s", self.filename)
            lines = self._wrap_lines(rel_paths)
            data = '\n'.join(lines) + '\n'

            if os.path.islink(self.filename):
                os.unlink(self.filename)
            with open(self.filename, 'wt') as f:
                f.write(data)

        elif os.path.exists(self.filename):
            log.debug("Deleting empty %s", self.filename)
            os.unlink(self.filename)

        self.dirty = False
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create a directory"""
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    realpath = os.path.abspath(path)
    path_exists = os.path.exists(realpath)
    if path_exists and force:
        if not os.path.isdir(realpath):
            log("Removing non-directory file {} prior to mkdir()".format(path))
            os.unlink(realpath)
            os.makedirs(realpath, perms)
    elif not path_exists:
        os.makedirs(realpath, perms)
    os.chown(realpath, uid, gid)
    os.chmod(realpath, perms)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create a directory"""
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    realpath = os.path.abspath(path)
    path_exists = os.path.exists(realpath)
    if path_exists and force:
        if not os.path.isdir(realpath):
            log("Removing non-directory file {} prior to mkdir()".format(path))
            os.unlink(realpath)
            os.makedirs(realpath, perms)
    elif not path_exists:
        os.makedirs(realpath, perms)
    os.chown(realpath, uid, gid)
    os.chmod(realpath, perms)
项目:eClaire    作者:kogan    | 项目源码 | 文件源码
def generate_pdf(card):
    """
    Make a PDF from a card

    :param card: dict from fetcher.py
    :return: Binary PDF buffer
    """
    from eclaire.base import SPECIAL_LABELS

    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)

    pdf.add_page()

    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)

    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')

    qrcode = generate_qr_code(card.url)
    qrcode_file = mktemp(suffix='.png', prefix='trello_qr_')
    qrcode.save(qrcode_file)
    pdf.image(qrcode_file, 118, 35, 20, 20)
    os.unlink(qrcode_file)

    # May we never speak of this again.
    pdf.set_fill_color(255, 255, 255)
    pdf.rect(0, 55, 140, 20, 'F')

    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ', '.join([label.name for label in card.labels
                        if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 0, 'R')

    return pdf.output(dest='S')
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def makelink(self, tarinfo, targetpath):
        """Make a (symbolic) link called targetpath. If it cannot be created
          (platform limitation), we try to make a copy of the referenced file
          instead of a link.
        """
        if hasattr(os, "symlink") and hasattr(os, "link"):
            # For systems that support symbolic and hard links.
            if tarinfo.issym():
                if os.path.lexists(targetpath):
                    os.unlink(targetpath)
                os.symlink(tarinfo.linkname, targetpath)
            else:
                # See extract().
                if os.path.exists(tarinfo._link_target):
                    if os.path.lexists(targetpath):
                        os.unlink(targetpath)
                    os.link(tarinfo._link_target, targetpath)
                else:
                    self._extract_member(self._find_link_target(tarinfo), targetpath)
        else:
            try:
                self._extract_member(self._find_link_target(tarinfo), targetpath)
            except KeyError:
                raise ExtractError("unable to resolve link inside archive")
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _find_grail_rc(self):
        import glob
        import pwd
        import socket
        import tempfile
        tempdir = os.path.join(tempfile.gettempdir(),
                               ".grail-unix")
        user = pwd.getpwuid(os.getuid())[0]
        filename = os.path.join(tempdir, user + "-*")
        maybes = glob.glob(filename)
        if not maybes:
            return None
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        for fn in maybes:
            # need to PING each one until we find one that's live
            try:
                s.connect(fn)
            except socket.error:
                # no good; attempt to clean it out, but don't fail:
                try:
                    os.unlink(fn)
                except IOError:
                    pass
            else:
                return s
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, proxies=None, **x509):
        if proxies is None:
            proxies = getproxies()
        assert hasattr(proxies, 'has_key'), "proxies must be a mapping"
        self.proxies = proxies
        self.key_file = x509.get('key_file')
        self.cert_file = x509.get('cert_file')
        self.addheaders = [('User-Agent', self.version)]
        self.__tempfiles = []
        self.__unlink = os.unlink # See cleanup()
        self.tempcache = None
        # Undocumented feature: if you assign {} to tempcache,
        # it is used to cache files retrieved with
        # self.retrieve().  This is not enabled by default
        # since it does not work for changing documents (and I
        # haven't got the logic to check expiration headers
        # yet).
        self.ftpcache = ftpcache
        # Undocumented feature: you can use a different
        # ftp cache by assigning to the .ftpcache member;
        # in case you want logically independent URL openers
        # XXX This is not threadsafe.  Bah.
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def nextfile(self):
        savestdout = self._savestdout
        self._savestdout = 0
        if savestdout:
            sys.stdout = savestdout

        output = self._output
        self._output = 0
        if output:
            output.close()

        file = self._file
        self._file = 0
        if file and not self._isstdin:
            file.close()

        backupfilename = self._backupfilename
        self._backupfilename = 0
        if backupfilename and not self._backup:
            try: os.unlink(backupfilename)
            except OSError: pass

        self._isstdin = False
        self._buffer = []
        self._bufindex = 0
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def close(self,

              remove=os.unlink,error=os.error):

        if self.pipe:
            rc = self.pipe.close()
        else:
            rc = 255
        if self.tmpfile:
            try:
                remove(self.tmpfile)
            except error:
                pass
        return rc

    # Alias
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def putsequences(self, sequences):
        """Write the set of sequences back to the folder."""
        fullname = self.getsequencesfilename()
        f = None
        for key, seq in sequences.iteritems():
            s = IntSet('', ' ')
            s.fromlist(seq)
            if not f: f = open(fullname, 'w')
            f.write('%s: %s\n' % (key, s.tostring()))
        if not f:
            try:
                os.unlink(fullname)
            except os.error:
                pass
        else:
            f.close()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def removemessages(self, list):
        """Remove one or more messages -- may raise os.error."""
        errors = []
        deleted = []
        for n in list:
            path = self.getmessagefilename(n)
            commapath = self.getmessagefilename(',' + str(n))
            try:
                os.unlink(commapath)
            except os.error:
                pass
            try:
                os.rename(path, commapath)
            except os.error, msg:
                errors.append(msg)
            else:
                deleted.append(n)
        if deleted:
            self.removefromallsequences(deleted)
        if errors:
            if len(errors) == 1:
                raise os.error, errors[0]
            else:
                raise os.error, ('multiple errors:', errors)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def copymessage(self, n, tofolder, ton):
        """Copy one message over a specific destination message,
        which may or may not already exist."""
        path = self.getmessagefilename(n)
        # Open it to check that it exists
        f = open(path)
        f.close()
        del f
        topath = tofolder.getmessagefilename(ton)
        backuptopath = tofolder.getmessagefilename(',%d' % ton)
        try:
            os.rename(topath, backuptopath)
        except os.error:
            pass
        ok = 0
        try:
            tofolder.setlast(None)
            shutil.copy2(path, topath)
            ok = 1
        finally:
            if not ok:
                try:
                    os.unlink(topath)
                except os.error:
                    pass
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def pack(self):
        """Re-name messages to eliminate numbering gaps. Invalidates keys."""
        sequences = self.get_sequences()
        prev = 0
        changes = []
        for key in self.iterkeys():
            if key - 1 != prev:
                changes.append((key, prev + 1))
                if hasattr(os, 'link'):
                    os.link(os.path.join(self._path, str(key)),
                            os.path.join(self._path, str(prev + 1)))
                    os.unlink(os.path.join(self._path, str(key)))
                else:
                    os.rename(os.path.join(self._path, str(key)),
                              os.path.join(self._path, str(prev + 1)))
            prev += 1
        self._next_key = prev + 1
        if len(changes) == 0:
            return
        for name, key_list in sequences.items():
            for old, new in changes:
                if old in key_list:
                    key_list[key_list.index(old)] = new
        self.set_sequences(sequences)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _findLib_gcc(name):
        expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
        fdout, ccout = tempfile.mkstemp()
        os.close(fdout)
        cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
              '$CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
        try:
            f = os.popen(cmd)
            try:
                trace = f.read()
            finally:
                rv = f.close()
        finally:
            try:
                os.unlink(ccout)
            except OSError, e:
                if e.errno != errno.ENOENT:
                    raise
        if rv == 10:
            raise OSError, 'gcc or cc command not found'
        res = re.search(expr, trace)
        if not res:
            return None
        return res.group(0)
项目:segno    作者:heuer    | 项目源码 | 文件源码
def test_save_svgz_filename():
    import gzip
    qr = segno.make_qr('test')
    f = tempfile.NamedTemporaryFile('wb', suffix='.svgz', delete=False)
    f.close()
    qr.save(f.name)
    f = open(f.name, mode='rb')
    expected = b'\x1f\x8b\x08'  # gzip magic number
    val = f.read(len(expected))
    f.close()
    f = gzip.open(f.name)
    try:
        content = f.read(6)
    finally:
        f.close()
    os.unlink(f.name)
    assert expected == val
    assert b'<?xml ' == content
项目:segno    作者:heuer    | 项目源码 | 文件源码
def test_write_unicode_filename():
    qr = segno.make_qr('test')
    f = tempfile.NamedTemporaryFile('wt', suffix='.svg', delete=False)
    f.close()
    title = 'mürrische Mädchen'
    desc = '?'
    qr.save(f.name, title=title, desc=desc)
    f = open(f.name, mode='rb')
    root = _parse_xml(f)
    f.seek(0)
    val = f.read(6)
    f.close()
    os.unlink(f.name)
    assert b'<?xml ' == val
    assert title == _get_title(root).text
    assert desc == _get_desc(root).text
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def grab(bbox=None):
    if sys.platform == "darwin":
        f, file = tempfile.mkstemp('.png')
        os.close(f)
        subprocess.call(['screencapture', '-x', file])
        im = Image.open(file)
        im.load()
        os.unlink(file)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origo in lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
            )
    if bbox:
        im = im.crop(bbox)
    return im
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def test_infile_outfile(self):
        with tempfile.NamedTemporaryFile() as infile:
            infile.write(self.data.encode())
            infile.flush()
            # outfile will get overwritten by tool, so the delete
            # may not work on some platforms. Do it manually.
            outfile = tempfile.NamedTemporaryFile()
            try:
                self.assertEqual(
                    self.runTool(args=[infile.name, outfile.name]),
                    ''.encode())
                with open(outfile.name, 'rb') as f:
                    self.assertEqual(f.read(), self.expect.encode())
            finally:
                outfile.close()
                if os.path.exists(outfile.name):
                    os.unlink(outfile.name)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def finalize(self):
        assert (self.__asynchronous == 0) and not self.__dirty
        if self.__buildIdCache is not None:
            self.__buildIdCache.close()
            self.__buildIdCache = None
        if self.__lock:
            try:
                os.unlink(self.__lock)
            except FileNotFoundError:
                from .tty import colorize
                from sys import stderr
                print(colorize("Warning: lock file was deleted while Bob was still running!", "33"),
                    file=stderr)
            except OSError as e:
                from .tty import colorize
                from sys import stderr
                print(colorize("Warning: cannot unlock workspace: "+str(e), "33"),
                    file=stderr)
项目:anita    作者:gson1703    | 项目源码 | 文件源码
def download_file(file, url, optional = False):
    try:
        print "Downloading", url + "...",
        sys.stdout.flush()
        my_urlretrieve(url, file)
        print "OK"
        sys.stdout.flush()
    except IOError, e:
        if optional:
            print "missing but optional, so that's OK"
        else:
            print e
        sys.stdout.flush()
        if os.path.exists(file):
            os.unlink(file)
        raise

# Create a file of the given size, containing NULs, without holes.
项目:anita    作者:gson1703    | 项目源码 | 文件源码
def install(self):
        # This is needed for Xen and noemu, where we get the kernel
        # from the dist rather than the installed image
        self.dist.set_workdir(self.workdir)
        if self.vmm == 'noemu':
            self.dist.download()
            self._install()
        else:
            # Already installed?
            if os.path.exists(self.wd0_path()):
                return
            try:
                self._install()
            except:
                if os.path.exists(self.wd0_path()):
                    os.unlink(self.wd0_path())
                raise

    # Boot the virtual machine (installing it first if it's not
    # installed already).  The vmm_args argument applies when
    # booting, but not when installing.  Does not wait for
    # a login prompt.
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test_plotscene():
    tempfilename = tempfile.NamedTemporaryFile(suffix='.svg').name
    print("using %s as a temporary file" % tempfilename)
    pg.setConfigOption('foreground', (0,0,0))
    w = pg.GraphicsWindow()
    w.show()        
    p1 = w.addPlot()
    p2 = w.addPlot()
    p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
    p1.setXRange(0,5)
    p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
    app.processEvents()
    app.processEvents()

    ex = pg.exporters.SVGExporter(w.scene())
    ex.export(fileName=tempfilename)
    # clean up after the test is done
    os.unlink(tempfilename)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test_plotscene():
    tempfilename = tempfile.NamedTemporaryFile(suffix='.svg').name
    print("using %s as a temporary file" % tempfilename)
    pg.setConfigOption('foreground', (0,0,0))
    w = pg.GraphicsWindow()
    w.show()        
    p1 = w.addPlot()
    p2 = w.addPlot()
    p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
    p1.setXRange(0,5)
    p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
    app.processEvents()
    app.processEvents()

    ex = pg.exporters.SVGExporter(w.scene())
    ex.export(fileName=tempfilename)
    # clean up after the test is done
    os.unlink(tempfilename)
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def is_running(name):
    """
    Test whether task is running under ``name``

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``

    """
    pidfile = _pid_file(name)
    if not os.path.exists(pidfile):
        return False

    with open(pidfile, 'rb') as file_obj:
        pid = int(file_obj.read().strip())

    if _process_exists(pid):
        return True

    elif os.path.exists(pidfile):
        os.unlink(pidfile)

    return False
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def _delete_directory_contents(self, dirpath, filter_func):
        """Delete all files in a directory

        :param dirpath: path to directory to clear
        :type dirpath: ``unicode`` or ``str``
        :param filter_func function to determine whether a file shall be
            deleted or not.
        :type filter_func ``callable``
        """

        if os.path.exists(dirpath):
            for filename in os.listdir(dirpath):
                if not filter_func(filename):
                    continue
                path = os.path.join(dirpath, filename)
                if os.path.isdir(path):
                    shutil.rmtree(path)
                else:
                    os.unlink(path)
                self.logger.debug('Deleted : %r', path)
项目:Gank-Alfred-Workflow    作者:hujiaweibujidao    | 项目源码 | 文件源码
def is_running(name):
    """
    Test whether task is running under ``name``

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``

    """
    pidfile = _pid_file(name)
    if not os.path.exists(pidfile):
        return False

    with open(pidfile, 'rb') as file_obj:
        pid = int(file_obj.read().strip())

    if _process_exists(pid):
        return True

    elif os.path.exists(pidfile):
        os.unlink(pidfile)

    return False
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def service_resume(service_name, init_dir="/etc/init",
                   initd_dir="/etc/init.d"):
    """Resume a system service.

    Reenable starting again at boot. Start the service"""
    upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
    sysv_file = os.path.join(initd_dir, service_name)
    if init_is_systemd():
        service('enable', service_name)
    elif os.path.exists(upstart_file):
        override_path = os.path.join(
            init_dir, '{}.override'.format(service_name))
        if os.path.exists(override_path):
            os.unlink(override_path)
    elif os.path.exists(sysv_file):
        subprocess.check_call(["update-rc.d", service_name, "enable"])
    else:
        raise ValueError(
            "Unable to detect {0} as SystemD, Upstart {1} or"
            " SysV {2}".format(
                service_name, upstart_file, sysv_file))

    started = service_running(service_name)
    if not started:
        started = service_start(service_name)
    return started
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:rstviewer    作者:arne-cl    | 项目源码 | 文件源码
def test_cli_rs3tohtml():
    """conversion to HTML on the commandline"""
    temp_html = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    temp_html.close()

    cli([RS3_FILEPATH, temp_html.name])
    with open(temp_html.name, 'r') as html_file:
        assert EXPECTED_HTML in html_file.read()
        os.unlink(temp_html.name)
项目:rstviewer    作者:arne-cl    | 项目源码 | 文件源码
def rs3topng(rs3_filepath, png_filepath=None):
    """Convert a RS3 file into a PNG image of the RST tree.

    If no output filename is given, the PNG image is returned
    as a string (which is useful for embedding).
    """
    try:
        from selenium import webdriver
        from selenium.common.exceptions import WebDriverException
    except ImportError:
        raise ImportError(
            'Please install selenium: pip install selenium')

    html_str = rs3tohtml(rs3_filepath)

    temp = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    temp.write(html_str.encode('utf8'))
    temp.close()

    try:
        driver = webdriver.PhantomJS()
    except WebDriverException as err:
        raise WebDriverException(
           'Please install phantomjs: http://phantomjs.org/\n' + err.msg)

    driver.get(temp.name)
    os.unlink(temp.name)

    png_str = driver.get_screenshot_as_png()
    if png_filepath:
        with open(png_filepath, 'w') as png_file:
            png_file.write(png_str)
    else:
        return png_str
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def release(self):
        """Release the lock by deleting `self.lockfile`."""
        self._locked = False
        try:
            os.unlink(self.lockfile)
        except (OSError, IOError) as err:  # pragma: no cover
            if err.errno != 2:
                raise err
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def cache_data(self, name, data):
        """Save ``data`` to cache under ``name``.

        If ``data`` is ``None``, the corresponding cache file will be
        deleted.

        :param name: name of datastore
        :param data: data to store. This may be any object supported by
                the cache serializer

        """
        serializer = manager.serializer(self.cache_serializer)

        cache_path = self.cachefile('%s.%s' % (name, self.cache_serializer))

        if data is None:
            if os.path.exists(cache_path):
                os.unlink(cache_path)
                self.logger.debug('Deleted cache file : %s', cache_path)
            return

        with atomic_writer(cache_path, 'wb') as file_obj:
            serializer.dump(data, file_obj)

        self.logger.debug('Cached data saved at : %s', cache_path)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_bearer_token(self, m_cfg, m_get):
        token_content = (
            "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3Nl"
            "cnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc"
            "3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bn"
            "Qvc2VjcmV0Lm5hbWUiOiJkZWZhdWx0LXRva2VuLWh4M3QxIiwia3ViZXJuZXRlcy5"
            "pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImRlZmF1bHQi"
            "LCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51a"
            "WQiOiIxYTkyM2ZmNi00MDkyLTExZTctOTMwYi1mYTE2M2VkY2ViMDUiLCJzdWIiOi"
            "JzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06ZGVmYXVsdCJ9.lzcPef"
            "DQ-uzF5cD-5pLwTKpRvtvvxKB4LX8TLymrPLMTth8WGr1vT6jteJPmLiDZM2C5dZI"
            "iFJpOw4LL1XLullik-ls-CmnTWq97NvlW1cZolC0mNyRz6JcL7gkH8WfUSjLA7x80"
            "ORalanUxtl9-ghMGKCtKIACAgvr5gGT4iznGYQQRx_hKURs4O6Js5vhwNM6UuOKeW"
            "GDDAlhgHMG0u59z3bhiBLl6jbQktZsu8c3diXniQb3sYqYQcGKUm1IQFujyA_ByDb"
            "5GUtCv1BOPL_-IjYtvdJD8ZzQ_UnPFoYQklpDyJLB7_7qCGcfVEQbnSCh907NdKo4"
            "w_8Wkn2y-Tg")
        token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False)
        try:
            m_cfg.kubernetes.token_file = token_file.name
            token_file.write(token_content)
            token_file.close()
            m_cfg.kubernetes.ssl_verify_server_crt = False

            path = '/test'
            client = k8s_client.K8sClient(self.base_url)
            client.get(path)
            headers = {
                'Authorization': 'Bearer {}'.format(token_content)}
            m_get.assert_called_once_with(
                self.base_url + path, cert=(None, None), headers=headers,
                verify=False)
        finally:
            os.unlink(m_cfg.kubernetes.token_file)
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def get_exit_code(self):
        self.process.wait()
        os.unlink(self.tmp_script_filename)
        return self.process.returncode
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def release(self):
        if not self.is_locked():
            raise NotLocked("%s is not locked" % self.path)
        elif not self.i_am_locking():
            raise NotMyLock("%s is locked, but not by me" % self.path)
        os.unlink(self.lock_file)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def release(self):
        if not self.is_locked():
            raise NotLocked("%s is not locked" % self.path)
        elif not os.path.exists(self.unique_name):
            raise NotMyLock("%s is locked, but not by me" % self.path)
        os.unlink(self.unique_name)
        os.rmdir(self.lock_file)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def break_lock(self):
        if os.path.exists(self.lock_file):
            for name in os.listdir(self.lock_file):
                os.unlink(os.path.join(self.lock_file, name))
            os.rmdir(self.lock_file)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def acquire(self, timeout=None):
        try:
            open(self.unique_name, "wb").close()
        except IOError:
            raise LockFailed("failed to create %s" % self.unique_name)

        timeout = timeout if timeout is not None else self.timeout
        end_time = time.time()
        if timeout is not None and timeout > 0:
            end_time += timeout

        while True:
            # Try and create a hard link to it.
            try:
                os.link(self.unique_name, self.lock_file)
            except OSError:
                # Link creation failed.  Maybe we've double-locked?
                nlinks = os.stat(self.unique_name).st_nlink
                if nlinks == 2:
                    # The original link plus the one I created == 2.  We're
                    # good to go.
                    return
                else:
                    # Otherwise the lock creation failed.
                    if timeout is not None and time.time() > end_time:
                        os.unlink(self.unique_name)
                        if timeout > 0:
                            raise LockTimeout("Timeout waiting to acquire"
                                              " lock for %s" %
                                              self.path)
                        else:
                            raise AlreadyLocked("%s is already locked" %
                                                self.path)
                    time.sleep(timeout is not None and timeout / 10 or 0.1)
            else:
                # Link creation succeeded.  We're good to go.
                return