Python os 模块,readlink() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.readlink()

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def lookupmodule(self, filename):
        """Helper function for break/clear parsing -- may be overridden.

        lookupmodule() translates (possibly incomplete) file or module name
        into an absolute file name.
        """
        if os.path.isabs(filename) and  os.path.exists(filename):
            return filename
        f = os.path.join(sys.path[0], filename)
        if  os.path.exists(f) and self.canonic(f) == self.mainpyfile:
            return f
        root, ext = os.path.splitext(filename)
        if ext == '':
            filename = filename + '.py'
        if os.path.isabs(filename):
            return filename
        for dirname in sys.path:
            while os.path.islink(dirname):
                dirname = os.readlink(dirname)
            fullname = os.path.join(dirname, filename)
            if os.path.exists(fullname):
                return fullname
        return None
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _resolve_link(path):
    """Internal helper function.  Takes a path and follows symlinks
    until we either arrive at something that isn't a symlink, or
    encounter a path we've seen before (meaning that there's a loop).
    """
    paths_seen = set()
    while islink(path):
        if path in paths_seen:
            # Already seen this path, so we must have a symlink loop
            return None
        paths_seen.add(path)
        # Resolve where the link points to
        resolved = os.readlink(path)
        if not isabs(resolved):
            dir = dirname(path)
            path = normpath(join(dir, resolved))
        else:
            path = normpath(resolved)
    return path
项目:myDotFiles    作者:GuidoFe    | 项目源码 | 文件源码
def execute(self):
        from ranger.container.file import File

        new_path = self.rest(1)
        cf = self.fm.thisfile

        if not new_path:
            return self.fm.notify('Syntax: relink <newpath>', bad=True)

        if not cf.is_link:
            return self.fm.notify('%s is not a symlink!' % cf.relative_path, bad=True)

        if new_path == os.readlink(cf.path):
            return

        try:
            os.remove(cf.path)
            os.symlink(new_path, cf.path)
        except OSError as err:
            self.fm.notify(err)

        self.fm.reset()
        self.fm.thisdir.pointed_obj = cf
        self.fm.thisfile = cf
项目:molmaker    作者:mnmelo    | 项目源码 | 文件源码
def find_exec(name, error='error', extra=""):
    proc = subprocess.Popen(["which", name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    path = proc.communicate()[0]
    if path:
        path = path.strip()
        if os.path.islink(path):
            path = os.readlink(path)
        return path
    else:
        if error in ('error', 'warning'):
            sys.stderr.write("%s: can't find %s in $PATH.\n" % (error, name))
            if extra:
                sys.stderr.write("%s\n" % (extra,))
            if error == 'error':
                sys.exit()
        return False


# Classes
##########
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def lookupmodule(self, filename):
        """Helper function for break/clear parsing -- may be overridden.

        lookupmodule() translates (possibly incomplete) file or module name
        into an absolute file name.
        """
        if os.path.isabs(filename) and  os.path.exists(filename):
            return filename
        f = os.path.join(sys.path[0], filename)
        if  os.path.exists(f) and self.canonic(f) == self.mainpyfile:
            return f
        root, ext = os.path.splitext(filename)
        if ext == '':
            filename = filename + '.py'
        if os.path.isabs(filename):
            return filename
        for dirname in sys.path:
            while os.path.islink(dirname):
                dirname = os.readlink(dirname)
            fullname = os.path.join(dirname, filename)
            if os.path.exists(fullname):
                return fullname
        return None
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def copy(self, target, mode=False):
        """ copy path to target."""
        if self.check(file=1):
            if target.check(dir=1):
                target = target.join(self.basename)
            assert self!=target
            copychunked(self, target)
            if mode:
                copymode(self.strpath, target.strpath)
        else:
            def rec(p):
                return p.check(link=0)
            for x in self.visit(rec=rec):
                relpath = x.relto(self)
                newx = target.join(relpath)
                newx.dirpath().ensure(dir=1)
                if x.check(link=1):
                    newx.mksymlinkto(x.readlink())
                    continue
                elif x.check(file=1):
                    copychunked(x, newx)
                elif x.check(dir=1):
                    newx.ensure(dir=1)
                if mode:
                    copymode(x.strpath, newx.strpath)
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def _getAppDirConfig(self, appname):
        try:
            appconfig = self.app.configuration.get("SplunkDeployment.apps")[appname]
            appdir = appconfig.get("directory")
            if os.path.islink(appdir):
                appdir = os.readlink(appdir)
            if not os.path.isdir(appdir):
                raise AppNotFoundException("The app \"" + appname  + "\" does not exist in \"" + appdir + "\".")
            return [appdir, appconfig] 
        except AppNotFoundException as e:
            raise e
        except:
            pass
        appconfig = self.app.configuration.get("SplunkDeployment.apps.default")
        appdir = os.path.join(
            appconfig.get("directory"),
            appname
        )

        if os.path.islink(appdir):
            appdir = os.readlink(appdir)
        if not os.path.isdir(appdir):
            raise AppNotFoundException("The app \"" + appname  + "\" does not exist in \"" + appdir + "\".")
        return [appdir, appconfig]
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def readlink(path):
    """Wrapper around os.readlink()."""
    assert isinstance(path, basestring), path
    path = os.readlink(path)
    # readlink() might return paths containing null bytes ('\x00')
    # resulting in "TypeError: must be encoded string without NULL
    # bytes, not str" errors when the string is passed to other
    # fs-related functions (os.*, open(), ...).
    # Apparently everything after '\x00' is garbage (we can have
    # ' (deleted)', 'new' and possibly others), see:
    # https://github.com/giampaolo/psutil/issues/717
    path = path.split('\x00')[0]
    # Certain paths have ' (deleted)' appended. Usually this is
    # bogus as the file actually exists. Even if it doesn't we
    # don't care.
    if path.endswith(' (deleted)') and not path_exists_strict(path):
        path = path[:-10]
    return path
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def get_proc_inodes(self, pid):
        inodes = defaultdict(list)
        for fd in os.listdir("%s/%s/fd" % (self._procfs_path, pid)):
            try:
                inode = readlink("%s/%s/fd/%s" % (self._procfs_path, pid, fd))
            except OSError as err:
                # ENOENT == file which is gone in the meantime;
                # os.stat('/proc/%s' % self.pid) will be done later
                # to force NSP (if it's the case)
                if err.errno in (errno.ENOENT, errno.ESRCH):
                    continue
                elif err.errno == errno.EINVAL:
                    # not a link
                    continue
                else:
                    raise
            else:
                if inode.startswith('socket:['):
                    # the process is using a socket
                    inode = inode[8:][:-1]
                    inodes[inode].append((pid, int(fd)))
        return inodes
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def exe(self):
        try:
            return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
        except OSError as err:
            if err.errno in (errno.ENOENT, errno.ESRCH):
                # no such file error; might be raised also if the
                # path actually exists for system processes with
                # low pids (about 0-20)
                if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
                    return ""
                else:
                    if not pid_exists(self.pid):
                        raise NoSuchProcess(self.pid, self._name)
                    else:
                        raise ZombieProcess(self.pid, self._name, self._ppid)
            if err.errno in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            raise
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def cwd(self):
        """Return process current working directory."""
        # sometimes we get an empty string, in which case we turn
        # it into None
        if OPENBSD and self.pid == 0:
            return None  # ...else it would raise EINVAL
        elif NETBSD:
            with wrap_exceptions_procfs(self):
                return os.readlink("/proc/%s/cwd" % self.pid)
        elif hasattr(cext, 'proc_open_files'):
            # FreeBSD < 8 does not support functions based on
            # kinfo_getfile() and kinfo_getvmmap()
            return cext.proc_cwd(self.pid) or None
        else:
            raise NotImplementedError(
                "supported only starting from FreeBSD 8" if
                FREEBSD else "")
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def terminal(self):
        procfs_path = self._procfs_path
        hit_enoent = False
        tty = wrap_exceptions(
            cext.proc_basic_info(self.pid, self._procfs_path)[0])
        if tty != cext.PRNODEV:
            for x in (0, 1, 2, 255):
                try:
                    return os.readlink(
                        '%s/%d/path/%d' % (procfs_path, self.pid, x))
                except OSError as err:
                    if err.errno == errno.ENOENT:
                        hit_enoent = True
                        continue
                    raise
        if hit_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (procfs_path, self.pid))
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def open_files(self):
        retlist = []
        hit_enoent = False
        procfs_path = self._procfs_path
        pathdir = '%s/%d/path' % (procfs_path, self.pid)
        for fd in os.listdir('%s/%d/fd' % (procfs_path, self.pid)):
            path = os.path.join(pathdir, fd)
            if os.path.islink(path):
                try:
                    file = os.readlink(path)
                except OSError as err:
                    # ENOENT == file which is gone in the meantime
                    if err.errno == errno.ENOENT:
                        hit_enoent = True
                        continue
                    raise
                else:
                    if isfile_strict(file):
                        retlist.append(_common.popenfile(file, int(fd)))
        if hit_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (procfs_path, self.pid))
        return retlist
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def readlink(path):
    """Wrapper around os.readlink()."""
    assert isinstance(path, basestring), path
    path = os.readlink(path)
    # readlink() might return paths containing null bytes ('\x00')
    # resulting in "TypeError: must be encoded string without NULL
    # bytes, not str" errors when the string is passed to other
    # fs-related functions (os.*, open(), ...).
    # Apparently everything after '\x00' is garbage (we can have
    # ' (deleted)', 'new' and possibly others), see:
    # https://github.com/giampaolo/psutil/issues/717
    path = path.split('\x00')[0]
    # Certain paths have ' (deleted)' appended. Usually this is
    # bogus as the file actually exists. Even if it doesn't we
    # don't care.
    if path.endswith(' (deleted)') and not path_exists_strict(path):
        path = path[:-10]
    return path
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def get_proc_inodes(self, pid):
        inodes = defaultdict(list)
        for fd in os.listdir("%s/%s/fd" % (self._procfs_path, pid)):
            try:
                inode = readlink("%s/%s/fd/%s" % (self._procfs_path, pid, fd))
            except OSError as err:
                # ENOENT == file which is gone in the meantime;
                # os.stat('/proc/%s' % self.pid) will be done later
                # to force NSP (if it's the case)
                if err.errno in (errno.ENOENT, errno.ESRCH):
                    continue
                elif err.errno == errno.EINVAL:
                    # not a link
                    continue
                else:
                    raise
            else:
                if inode.startswith('socket:['):
                    # the process is using a socket
                    inode = inode[8:][:-1]
                    inodes[inode].append((pid, int(fd)))
        return inodes
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def exe(self):
        try:
            return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
        except OSError as err:
            if err.errno in (errno.ENOENT, errno.ESRCH):
                # no such file error; might be raised also if the
                # path actually exists for system processes with
                # low pids (about 0-20)
                if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
                    return ""
                else:
                    if not pid_exists(self.pid):
                        raise NoSuchProcess(self.pid, self._name)
                    else:
                        raise ZombieProcess(self.pid, self._name, self._ppid)
            if err.errno in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            raise
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def cwd(self):
        """Return process current working directory."""
        # sometimes we get an empty string, in which case we turn
        # it into None
        if OPENBSD and self.pid == 0:
            return None  # ...else it would raise EINVAL
        elif NETBSD:
            with wrap_exceptions_procfs(self):
                return os.readlink("/proc/%s/cwd" % self.pid)
        elif hasattr(cext, 'proc_open_files'):
            # FreeBSD < 8 does not support functions based on
            # kinfo_getfile() and kinfo_getvmmap()
            return cext.proc_cwd(self.pid) or None
        else:
            raise NotImplementedError(
                "supported only starting from FreeBSD 8" if
                FREEBSD else "")
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def terminal(self):
        procfs_path = self._procfs_path
        hit_enoent = False
        tty = wrap_exceptions(
            cext.proc_basic_info(self.pid, self._procfs_path)[0])
        if tty != cext.PRNODEV:
            for x in (0, 1, 2, 255):
                try:
                    return os.readlink(
                        '%s/%d/path/%d' % (procfs_path, self.pid, x))
                except OSError as err:
                    if err.errno == errno.ENOENT:
                        hit_enoent = True
                        continue
                    raise
        if hit_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (procfs_path, self.pid))
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def open_files(self):
        retlist = []
        hit_enoent = False
        procfs_path = self._procfs_path
        pathdir = '%s/%d/path' % (procfs_path, self.pid)
        for fd in os.listdir('%s/%d/fd' % (procfs_path, self.pid)):
            path = os.path.join(pathdir, fd)
            if os.path.islink(path):
                try:
                    file = os.readlink(path)
                except OSError as err:
                    # ENOENT == file which is gone in the meantime
                    if err.errno == errno.ENOENT:
                        hit_enoent = True
                        continue
                    raise
                else:
                    if isfile_strict(file):
                        retlist.append(_common.popenfile(file, int(fd)))
        if hit_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (procfs_path, self.pid))
        return retlist
项目:arch-setup-i3wm    作者:i-PUSH    | 项目源码 | 文件源码
def execute(self):
        from ranger.container.file import File

        new_path = self.rest(1)
        cf = self.fm.thisfile

        if not new_path:
            return self.fm.notify('Syntax: relink <newpath>', bad=True)

        if not cf.is_link:
            return self.fm.notify('%s is not a symlink!' % cf.relative_path, bad=True)

        if new_path == os.readlink(cf.path):
            return

        try:
            os.remove(cf.path)
            os.symlink(new_path, cf.path)
        except OSError as err:
            self.fm.notify(err)

        self.fm.reset()
        self.fm.thisdir.pointed_obj = cf
        self.fm.thisfile = cf
项目:iconograph    作者:robot-tools    | 项目源码 | 文件源码
def _SetCurrent(self, image):
    filename = '%d.iso' % (image['timestamp'])
    path = os.path.join(self._image_dir, filename)
    current_path = os.path.join(self._image_dir, 'current')

    try:
      link = os.readlink(current_path)
      link_path = os.path.join(self._image_dir, link)
      if link_path == path:
        return
    except FileNotFoundError:
      pass

    print('Changing current link to:', filename, flush=True)
    temp_path = tempfile.mktemp(dir=self._image_dir)
    os.symlink(filename, temp_path)
    os.rename(temp_path, current_path)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_function_by_ifname(ifname):
    """Get the function by the interface name

    Given the device name, returns the PCI address of a device
    and returns True if the address is in a physical function.
    """
    dev_path = "/sys/class/net/%s/device" % ifname
    sriov_totalvfs = 0
    if os.path.isdir(dev_path):
        try:
            # sriov_totalvfs contains the maximum possible VFs for this PF
            with open(os.path.join(dev_path, _SRIOV_TOTALVFS)) as fd:
                sriov_totalvfs = int(fd.read())
                return (os.readlink(dev_path).strip("./"),
                        sriov_totalvfs > 0)
        except (IOError, ValueError):
            return os.readlink(dev_path).strip("./"), False
    return None, False
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_vf_num_by_pci_address(pci_addr):
    """Get the VF number based on a VF's pci address

    A VF is associated with an VF number, which ip link command uses to
    configure it. This number can be obtained from the PCI device filesystem.
    """
    VIRTFN_RE = re.compile("virtfn(\d+)")
    virtfns_path = "/sys/bus/pci/devices/%s/physfn/virtfn*" % (pci_addr)
    vf_num = None
    try:
        for vf_path in glob.iglob(virtfns_path):
            if re.search(pci_addr, os.readlink(vf_path)):
                t = VIRTFN_RE.search(vf_path)
                vf_num = t.group(1)
                break
    except Exception:
        pass
    if vf_num is None:
        raise exception.PciDeviceNotFoundById(id=pci_addr)
    return vf_num
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_update_link_to(self):
        for ver in six.moves.range(1, 6):
            for kind in ALL_FOUR:
                self._write_out_kind(kind, ver)
                self.assertEqual(ver, self.test_rc.current_version(kind))
        # pylint: disable=protected-access
        self.test_rc._update_link_to("cert", 3)
        self.test_rc._update_link_to("privkey", 2)
        self.assertEqual(3, self.test_rc.current_version("cert"))
        self.assertEqual(2, self.test_rc.current_version("privkey"))
        self.assertEqual(5, self.test_rc.current_version("chain"))
        self.assertEqual(5, self.test_rc.current_version("fullchain"))
        # Currently we are allowed to update to a version that doesn't exist
        self.test_rc._update_link_to("chain", 3000)
        # However, current_version doesn't allow querying the resulting
        # version (because it's a broken link).
        self.assertEqual(os.path.basename(os.readlink(self.test_rc.chain)),
                         "chain3000.pem")
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def get_link_target(link):
    """Get an absolute path to the target of link.

    :param str link: Path to a symbolic link

    :returns: Absolute path to the target of link
    :rtype: str

    :raises .CertStorageError: If link does not exists.

    """
    try:
        target = os.readlink(link)
    except OSError:
        raise errors.CertStorageError(
            "Expected {0} to be a symlink".format(link))

    if not os.path.isabs(target):
        target = os.path.join(os.path.dirname(link), target)
    return os.path.abspath(target)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def _fix_symlinks(self):
        """Fixes symlinks in the event of an incomplete version update.

        If there is no problem with the current symlinks, this function
        has no effect.

        """
        previous_symlinks = self._previous_symlinks()
        if all(os.path.exists(link[1]) for link in previous_symlinks):
            for kind, previous_link in previous_symlinks:
                current_link = getattr(self, kind)
                if os.path.lexists(current_link):
                    os.unlink(current_link)
                os.symlink(os.readlink(previous_link), current_link)

        for _, link in previous_symlinks:
            if os.path.exists(link):
                os.unlink(link)
项目:infraview    作者:a-dekker    | 项目源码 | 文件源码
def packet_socket():
    '''
    Function to return a list of pids and process names utilizing packet sockets.
    '''

    packetcontent = _packetload()
    packetresult = []
    for line in packetcontent:
        line_array = _remove_empty(line.split(' '))
        inode = line_array[8].rstrip()
        pid = _get_pid_of_inode(inode)
        try:
            exe = os.readlink('/proc/' + pid + '/exe')
        except BaseException:
            exe = None

        nline = [pid, exe]
        packetresult.append(nline)
    return packetresult
项目:ncbi-genome-download    作者:kblin    | 项目源码 | 文件源码
def need_to_create_symlink(directory, checksums, filetype, symlink_path):
    """Check if we need to create a symlink for an existing file"""
    # If we don't have a symlink path, we don't need to create a symlink
    if symlink_path is None:
        return False

    pattern = EFormats.get_content(filetype)
    filename, _ = get_name_and_checksum(checksums, pattern)
    full_filename = os.path.join(directory, filename)
    symlink_name = os.path.join(symlink_path, filename)

    if os.path.islink(symlink_name):
        existing_link = os.readlink(symlink_name)
        if full_filename == existing_link:
            return False

    return True
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def is_git_dir(d):
    """ This is taken from the git setup.c:is_git_directory
    function.

    @throws WorkTreeRepositoryUnsupported if it sees a worktree directory. It's quite hacky to do that here,
            but at least clearly indicates that we don't support it.
            There is the unlikely danger to throw if we see directories which just look like a worktree dir,
            but are none."""
    if osp.isdir(d):
        if osp.isdir(osp.join(d, 'objects')) and osp.isdir(osp.join(d, 'refs')):
            headref = osp.join(d, 'HEAD')
            return osp.isfile(headref) or \
                (osp.islink(headref) and
                 os.readlink(headref).startswith('refs'))
        elif (osp.isfile(osp.join(d, 'gitdir')) and
              osp.isfile(osp.join(d, 'commondir')) and
              osp.isfile(osp.join(d, 'gitfile'))):
            raise WorkTreeRepositoryUnsupported(d)
    return False
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def lookupmodule(self, filename):
        """Helper function for break/clear parsing -- may be overridden.

        lookupmodule() translates (possibly incomplete) file or module name
        into an absolute file name.
        """
        if os.path.isabs(filename) and  os.path.exists(filename):
            return filename
        f = os.path.join(sys.path[0], filename)
        if  os.path.exists(f) and self.canonic(f) == self.mainpyfile:
            return f
        root, ext = os.path.splitext(filename)
        if ext == '':
            filename = filename + '.py'
        if os.path.isabs(filename):
            return filename
        for dirname in sys.path:
            while os.path.islink(dirname):
                dirname = os.readlink(dirname)
            fullname = os.path.join(dirname, filename)
            if os.path.exists(fullname):
                return fullname
        return None
项目:MyPythonLib    作者:BillWang139967    | 项目源码 | 文件源码
def _read_fstab(self, line, **params):
        if not line or line.startswith('#'): return
        fields = line.split()
        dev = fields[0]
        config = {
            'dev':    fields[0],
            'mount':  fields[1],
            'fstype': fields[2],
        }
        if dev.startswith('/dev/'):
            try:
                devlink = os.readlink(dev)
                dev = os.path.abspath(os.path.join(os.path.dirname(devlink), dev))
            except:
                pass
            dev = dev.replace('/dev/', '')
            if dev == params['devname']:
                return config
        elif dev.startswith('UUID='):
            uuid = dev.replace('UUID=', '')
            partinfo = si.Server.partinfo(devname=params['devname'])
            if partinfo['uuid'] == uuid:
                return config
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def lookupmodule(self, filename):
        """Helper function for break/clear parsing -- may be overridden.

        lookupmodule() translates (possibly incomplete) file or module name
        into an absolute file name.
        """
        if os.path.isabs(filename) and  os.path.exists(filename):
            return filename
        f = os.path.join(sys.path[0], filename)
        if  os.path.exists(f) and self.canonic(f) == self.mainpyfile:
            return f
        root, ext = os.path.splitext(filename)
        if ext == '':
            filename = filename + '.py'
        if os.path.isabs(filename):
            return filename
        for dirname in sys.path:
            while os.path.islink(dirname):
                dirname = os.readlink(dirname)
            fullname = os.path.join(dirname, filename)
            if os.path.exists(fullname):
                return fullname
        return None
项目:aurora    作者:carnby    | 项目源码 | 文件源码
def load_models(self):
        now = datetime.now()
        self.rec_candidates = []

        if self.last_model_update is None or (now - self.last_model_update).days >= 1:
            print('loading model', now)
            model_path = '{0}/it-topics'.format(settings.PORTRAIT_FOLDER)
            lda_filename = os.readlink('{0}/current_lda_model.gensim'.format(model_path))

            self.lda_model = gensim.models.ldamulticore.LdaMulticore.load(lda_filename)
            self.topic_graph = nx.read_gpickle('{0}/current_topic_graph.nx'.format(model_path))

            with gzip.open('{0}/current_candidates.json.gz'.format(model_path), 'rt') as f:
                self.rec_candidates = json.load(f)
                print('loaded', len(self.rec_candidates), 'candidates')

            self.last_model_update = datetime.now()
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def readlink(path):
    """Wrapper around os.readlink()."""
    assert isinstance(path, basestring), path
    path = os.readlink(path)
    # readlink() might return paths containing null bytes ('\x00')
    # resulting in "TypeError: must be encoded string without NULL
    # bytes, not str" errors when the string is passed to other
    # fs-related functions (os.*, open(), ...).
    # Apparently everything after '\x00' is garbage (we can have
    # ' (deleted)', 'new' and possibly others), see:
    # https://github.com/giampaolo/psutil/issues/717
    path = path.split('\x00')[0]
    # Certain paths have ' (deleted)' appended. Usually this is
    # bogus as the file actually exists. Even if it doesn't we
    # don't care.
    if path.endswith(' (deleted)') and not path_exists_strict(path):
        path = path[:-10]
    return path
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def get_proc_inodes(self, pid):
        inodes = defaultdict(list)
        for fd in os.listdir("%s/%s/fd" % (self._procfs_path, pid)):
            try:
                inode = readlink("%s/%s/fd/%s" % (self._procfs_path, pid, fd))
            except OSError as err:
                # ENOENT == file which is gone in the meantime;
                # os.stat('/proc/%s' % self.pid) will be done later
                # to force NSP (if it's the case)
                if err.errno in (errno.ENOENT, errno.ESRCH):
                    continue
                elif err.errno == errno.EINVAL:
                    # not a link
                    continue
                else:
                    raise
            else:
                if inode.startswith('socket:['):
                    # the process is using a socket
                    inode = inode[8:][:-1]
                    inodes[inode].append((pid, int(fd)))
        return inodes
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def exe(self):
        try:
            return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
        except OSError as err:
            if err.errno in (errno.ENOENT, errno.ESRCH):
                # no such file error; might be raised also if the
                # path actually exists for system processes with
                # low pids (about 0-20)
                if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
                    return ""
                else:
                    if not pid_exists(self.pid):
                        raise NoSuchProcess(self.pid, self._name)
                    else:
                        raise ZombieProcess(self.pid, self._name, self._ppid)
            if err.errno in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            raise
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def cwd(self):
        """Return process current working directory."""
        # sometimes we get an empty string, in which case we turn
        # it into None
        if OPENBSD and self.pid == 0:
            return None  # ...else it would raise EINVAL
        elif NETBSD:
            with wrap_exceptions_procfs(self):
                return os.readlink("/proc/%s/cwd" % self.pid)
        elif hasattr(cext, 'proc_open_files'):
            # FreeBSD < 8 does not support functions based on
            # kinfo_getfile() and kinfo_getvmmap()
            return cext.proc_cwd(self.pid) or None
        else:
            raise NotImplementedError(
                "supported only starting from FreeBSD 8" if
                FREEBSD else "")
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def terminal(self):
        procfs_path = self._procfs_path
        hit_enoent = False
        tty = wrap_exceptions(
            cext.proc_basic_info(self.pid, self._procfs_path)[0])
        if tty != cext.PRNODEV:
            for x in (0, 1, 2, 255):
                try:
                    return os.readlink(
                        '%s/%d/path/%d' % (procfs_path, self.pid, x))
                except OSError as err:
                    if err.errno == errno.ENOENT:
                        hit_enoent = True
                        continue
                    raise
        if hit_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (procfs_path, self.pid))
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def open_files(self):
        retlist = []
        hit_enoent = False
        procfs_path = self._procfs_path
        pathdir = '%s/%d/path' % (procfs_path, self.pid)
        for fd in os.listdir('%s/%d/fd' % (procfs_path, self.pid)):
            path = os.path.join(pathdir, fd)
            if os.path.islink(path):
                try:
                    file = os.readlink(path)
                except OSError as err:
                    # ENOENT == file which is gone in the meantime
                    if err.errno == errno.ENOENT:
                        hit_enoent = True
                        continue
                    raise
                else:
                    if isfile_strict(file):
                        retlist.append(_common.popenfile(file, int(fd)))
        if hit_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (procfs_path, self.pid))
        return retlist
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def readlink(path):
    """Wrapper around os.readlink()."""
    assert isinstance(path, basestring), path
    path = os.readlink(path)
    # readlink() might return paths containing null bytes ('\x00')
    # resulting in "TypeError: must be encoded string without NULL
    # bytes, not str" errors when the string is passed to other
    # fs-related functions (os.*, open(), ...).
    # Apparently everything after '\x00' is garbage (we can have
    # ' (deleted)', 'new' and possibly others), see:
    # https://github.com/giampaolo/psutil/issues/717
    path = path.split('\x00')[0]
    # Certain paths have ' (deleted)' appended. Usually this is
    # bogus as the file actually exists. Even if it doesn't we
    # don't care.
    if path.endswith(' (deleted)') and not path_exists_strict(path):
        path = path[:-10]
    return path
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def get_proc_inodes(self, pid):
        inodes = defaultdict(list)
        for fd in os.listdir("%s/%s/fd" % (self._procfs_path, pid)):
            try:
                inode = readlink("%s/%s/fd/%s" % (self._procfs_path, pid, fd))
            except OSError as err:
                # ENOENT == file which is gone in the meantime;
                # os.stat('/proc/%s' % self.pid) will be done later
                # to force NSP (if it's the case)
                if err.errno in (errno.ENOENT, errno.ESRCH):
                    continue
                elif err.errno == errno.EINVAL:
                    # not a link
                    continue
                else:
                    raise
            else:
                if inode.startswith('socket:['):
                    # the process is using a socket
                    inode = inode[8:][:-1]
                    inodes[inode].append((pid, int(fd)))
        return inodes
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def exe(self):
        try:
            return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
        except OSError as err:
            if err.errno in (errno.ENOENT, errno.ESRCH):
                # no such file error; might be raised also if the
                # path actually exists for system processes with
                # low pids (about 0-20)
                if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)):
                    return ""
                else:
                    if not pid_exists(self.pid):
                        raise NoSuchProcess(self.pid, self._name)
                    else:
                        raise ZombieProcess(self.pid, self._name, self._ppid)
            if err.errno in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            raise
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def cwd(self):
        """Return process current working directory."""
        # sometimes we get an empty string, in which case we turn
        # it into None
        if OPENBSD and self.pid == 0:
            return None  # ...else it would raise EINVAL
        elif NETBSD:
            with wrap_exceptions_procfs(self):
                return os.readlink("/proc/%s/cwd" % self.pid)
        elif hasattr(cext, 'proc_open_files'):
            # FreeBSD < 8 does not support functions based on
            # kinfo_getfile() and kinfo_getvmmap()
            return cext.proc_cwd(self.pid) or None
        else:
            raise NotImplementedError(
                "supported only starting from FreeBSD 8" if
                FREEBSD else "")
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def terminal(self):
        procfs_path = self._procfs_path
        hit_enoent = False
        tty = wrap_exceptions(
            cext.proc_basic_info(self.pid, self._procfs_path)[0])
        if tty != cext.PRNODEV:
            for x in (0, 1, 2, 255):
                try:
                    return os.readlink(
                        '%s/%d/path/%d' % (procfs_path, self.pid, x))
                except OSError as err:
                    if err.errno == errno.ENOENT:
                        hit_enoent = True
                        continue
                    raise
        if hit_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (procfs_path, self.pid))
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def open_files(self):
        retlist = []
        hit_enoent = False
        procfs_path = self._procfs_path
        pathdir = '%s/%d/path' % (procfs_path, self.pid)
        for fd in os.listdir('%s/%d/fd' % (procfs_path, self.pid)):
            path = os.path.join(pathdir, fd)
            if os.path.islink(path):
                try:
                    file = os.readlink(path)
                except OSError as err:
                    # ENOENT == file which is gone in the meantime
                    if err.errno == errno.ENOENT:
                        hit_enoent = True
                        continue
                    raise
                else:
                    if isfile_strict(file):
                        retlist.append(_common.popenfile(file, int(fd)))
        if hit_enoent:
            # raise NSP if the process disappeared on us
            os.stat('%s/%s' % (procfs_path, self.pid))
        return retlist
项目:threatdetectionservice    作者:flyballlabs    | 项目源码 | 文件源码
def copyfile(src, dest, symlink=True):
    if not os.path.exists(src):
        # Some bad symlink in the src
        logger.warn('Cannot find file %s (bad symlink)', src)
        return
    if os.path.exists(dest):
        logger.debug('File %s already exists', dest)
        return
    if not os.path.exists(os.path.dirname(dest)):
        logger.info('Creating parent directories for %s' % os.path.dirname(dest))
        os.makedirs(os.path.dirname(dest))
    if not os.path.islink(src):
        srcpath = os.path.abspath(src)
    else:
        srcpath = os.readlink(src)
    if symlink and hasattr(os, 'symlink') and not is_win:
        logger.info('Symlinking %s', dest)
        try:
            os.symlink(srcpath, dest)
        except (OSError, NotImplementedError):
            logger.info('Symlinking failed, copying to %s', dest)
            copyfileordir(src, dest)
    else:
        logger.info('Copying to %s', dest)
        copyfileordir(src, dest)
项目:kostka    作者:pixers    | 项目源码 | 文件源码
def mount_lowerdirs(self):
        # First, build a dependency graph in order to avoid duplicate entries
        dependencies = {}

        def dependency_path(dep):
            container = self.__class__(dep['imageName'])
            path = dep.get('path', os.readlink(os.path.join(container.path, 'overlay.fs')))
            return os.path.join(dep['imageName'], path)

        pending_deps = set(map(dependency_path, self.dependencies))
        while len(pending_deps) > 0:
            path = pending_deps.pop()
            name = path.split('/')[-2]
            if name not in dependencies:
                dependencies[path] = set(map(dependency_path, self.__class__(name).dependencies))
                pending_deps |= dependencies[path]

        # Then sort it topologically. The list is reversed, because overlayfs
        # will check the mounts in order they are given, so the base fs has to
        # be the last one.
        dependencies = reversed(list(toposort_flatten(dependencies)))
        return ':'.join(os.path.join(self.metadata_dir, dep) for dep in dependencies)
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def recursive_set_attributes(module, b_path, follow, file_args):
    changed = False
    for b_root, b_dirs, b_files in os.walk(b_path):
        for b_fsobj in b_dirs + b_files:
            b_fsname = os.path.join(b_root, b_fsobj)
            if not os.path.islink(b_fsname):
                tmp_file_args = file_args.copy()
                tmp_file_args['path'] = to_native(b_fsname, errors='surrogate_or_strict')
                changed |= module.set_fs_attributes_if_different(tmp_file_args, changed)
            else:
                tmp_file_args = file_args.copy()
                tmp_file_args['path'] = to_native(b_fsname, errors='surrogate_or_strict')
                changed |= module.set_fs_attributes_if_different(tmp_file_args, changed)
                if follow:
                    b_fsname = os.path.join(b_root, os.readlink(b_fsname))
                    if os.path.isdir(b_fsname):
                        changed |= recursive_set_attributes(module, b_fsname, follow, file_args)
                    tmp_file_args = file_args.copy()
                    tmp_file_args['path'] = to_native(b_fsname, errors='surrogate_or_strict')
                    changed |= module.set_fs_attributes_if_different(tmp_file_args, changed)
    return changed
项目:build-calibre    作者:kovidgoyal    | 项目源码 | 文件源码
def lcopy(src, dst, no_hardlinks=False):
    try:
        if issymlink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, dst)
            return True
        else:
            if no_hardlinks:
                shutil.copy(src, dst)
            else:
                hardlink(src, dst)
            return False
    except EnvironmentError as err:
        if err.errno == errno.EEXIST:
            os.unlink(dst)
            return lcopy(src, dst)
        raise
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def get_file_contents(code: models.File) -> bytes:
    """Get the contents of the given :class:`.models.File`.

    :param code: The file object to read.
    :returns: The contents of the file with newlines.
    """
    if code.is_directory:
        raise APIException(
            'Cannot display this file as it is a directory.',
            f'The selected file with id {code.id} is a directory.',
            APICodes.OBJECT_WRONG_TYPE, 400
        )

    filename = code.get_diskname()
    if os.path.islink(filename):
        raise APIException(
            f'This file is a symlink to `{os.readlink(filename)}`.',
            'The file {} is a symlink'.format(code.id), APICodes.INVALID_STATE,
            410
        )
    with open(filename, 'rb') as codefile:
        return codefile.read()