我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用errno.EACCES。
def safe_listdir(path):
    """
    Return the entries of ``path``, or an empty tuple if it cannot be listed.

    A missing path, a path that is not a directory and a permission failure
    are all treated as "nothing to list". Any other ``OSError`` propagates.
    """
    try:
        entries = os.listdir(path)
    except (PermissionError, NotADirectoryError):
        return ()
    except OSError as exc:
        # Python 2 on Windows reports this case through winerror 267
        # instead of through one of the errno values below.
        if (
            exc.errno not in (errno.ENOTDIR, errno.EACCES, errno.ENOENT)
            and getattr(exc, "winerror", None) != 267
        ):
            raise
        return ()
    return entries
def __init__(self, path, factory=None, create=True):
    """Initialize a single-file mailbox.

    The file at ``self._path`` is opened for reading and writing.  If it does
    not exist it is created when ``create`` is true, otherwise
    ``NoSuchMailboxError`` is raised.  If it cannot be opened for writing
    (EACCES or EROFS) it is opened read-only instead.  Any other ``IOError``
    propagates.
    """
    Mailbox.__init__(self, path, factory, create)
    try:
        f = open(self._path, 'rb+')
    except IOError as e:
        if e.errno == errno.ENOENT:
            if create:
                f = open(self._path, 'wb+')
            else:
                raise NoSuchMailboxError(self._path)
        elif e.errno in (errno.EACCES, errno.EROFS):
            # No write permission: fall back to a read-only handle.
            f = open(self._path, 'rb')
        else:
            raise
    self._file = f
    self._toc = None
    self._next_key = 0
    self._pending = False   # No changes require rewriting the file.
    self._locked = False
    self._file_length = None    # Used to record mailbox size
def __init__(self, debug=False, logfile=None):
    """Create the 'VirtualBMC' logger and attach one handler to it.

    :param debug: when true the level is set to DEBUG, otherwise to INFO.
    :param logfile: path handed to ``logging.FileHandler``; when ``None`` a
        ``logging.StreamHandler`` is used instead.
    :raises IOError: if setting up the handler fails with any errno other
        than EACCES.
    """
    logging.Logger.__init__(self, 'VirtualBMC')
    try:
        if logfile is not None:
            self.handler = logging.FileHandler(logfile)
        else:
            self.handler = logging.StreamHandler()

        formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
        self.handler.setFormatter(formatter)
        self.addHandler(self.handler)

        if debug:
            self.setLevel(logging.DEBUG)
        else:
            self.setLevel(logging.INFO)
    except IOError as e:
        # Only a permission failure is tolerated; every other I/O error
        # used to be swallowed silently as well and is now propagated.
        if e.errno != errno.EACCES:
            raise
def live(self):
    """Write one generated value into a randomly chosen file under proc_path.

    Does nothing when ``self.proc_path`` is falsy.  Otherwise a key is picked
    at random from ``self.proc_keys``, a value is obtained from
    ``self.generator.createValue()`` and written to ``proc_path/key``.

    EINVAL and EPERM write failures are ignored.  On ENOENT or EACCES the
    error is reported through ``self.error`` and the key is removed with
    ``self.removeKey``.  Any other ``IOError`` propagates.
    """
    if not self.proc_path:
        return
    self.info("Proc path: %s" % self.proc_path)
    key = choice(self.proc_keys)
    filename = path_join(self.proc_path, key)
    data = self.generator.createValue()
    self.info("Write data in %s: (len=%s) %r" % (filename, len(data), data))
    try:
        # The context manager closes the file even when write() fails.
        with open(filename, 'w') as output:
            output.write(data)
    except IOError as err:
        if err.errno in (EINVAL, EPERM):
            pass
        elif err.errno in (ENOENT, EACCES):
            self.error("Unable to write %s: %s" % (filename, err))
            self.removeKey(key)
        else:
            raise
def readBytes(self, address, size):
    """Read ``size`` bytes at ``address`` through /proc/<pid>/mem.

    The mem file is opened unbuffered on first use and kept in
    ``self.read_mem_file``.  If it cannot be opened, ``self.readBytes`` is
    rebound to ``self._readBytes`` and the call is delegated to it.  The
    failure is logged with ``info`` for EACCES and ``error`` otherwise.

    Raises ProcessError when seeking or reading fails with IOError or
    ValueError.
    """
    debug("Read %s bytes at %s" % (size, formatAddress(address)))
    if not self.read_mem_file:
        filename = '/proc/%u/mem' % self.pid
        try:
            self.read_mem_file = open(filename, 'rb', 0)
        except IOError as err:
            message = "Unable to open %s: fallback to ptrace implementation" % filename
            if err.errno != EACCES:
                error(message)
            else:
                info(message)
            # Switch this instance to the fallback reader for all later calls.
            self.readBytes = self._readBytes
            return self.readBytes(address, size)

    try:
        mem = self.read_mem_file
        mem.seek(address)
        return mem.read(size)
    except (IOError, ValueError) as err:
        raise ProcessError(self, "readBytes(%s, %s) error: %s" % (
            formatAddress(address), size, err))
def get_free_port(port, port_retries):
    """Return a port that could be bound, or None if every candidate failed.

    Candidates come from ``random_ports(port, port_retries + 1)``.  Each one
    is bound on all interfaces, and the port reported by the socket is
    returned for the first bind that succeeds.  A candidate that is already
    in use, or that may not be bound (EACCES / WSAEACCES), is reported with
    ``print`` and skipped.  Any other socket error propagates.
    """
    for port in random_ports(port, port_retries+1):
        s = None
        try:
            s = socket.socket()
            s.bind(('', port))
            return s.getsockname()[1]
        except socket.error as e:
            if e.errno == errno.EADDRINUSE:
                print('The port %i is already in use, trying another port.' % port)
                continue
            elif e.errno in (errno.EACCES, getattr(errno, 'WSAEACCES', errno.EACCES)):
                print("Permission to listen on port %i denied" % port)
                continue
            else:
                raise
        finally:
            # Close the probe socket on every path; it used to leak whenever
            # bind() raised.
            if s is not None:
                s.close()
    return None

# http://stackoverflow.com/questions/13593223/making-sure-a-python-script-with-subprocesses-dies-on-sigint
def set_directory(self, directory):
    """Remember ``directory`` as the place where the downloaded file goes.

    Raises OSError with ENOENT if the path does not exist, ENOTDIR if it is
    not a directory, and EACCES if it is not both writable and searchable.
    """
    if not path.exists(directory):
        problem = (errno.ENOENT, "You see no directory there.")
    elif not path.isdir(directory):
        problem = (
            errno.ENOTDIR,
            "You cannot put a file into something which is not a directory.",
        )
    elif not os.access(directory, os.X_OK | os.W_OK):
        problem = (errno.EACCES, "This directory is too hard to write in to.")
    else:
        self.destDir = directory
        return
    code, message = problem
    raise OSError(code, message, directory)
def _testDetectFilesRemoved(self):
    """Write a temporary 'pluginextra.py' next to twisted.plugins, build a
    plugin cache that includes it, unimport it again, and check that exactly
    one ITestPlugin is then reported.

    The test is skipped when the plugin directory is not writable (EACCES).
    """
    writeFileName = sibpath(plugins.__file__, 'pluginextra.py')
    try:
        # open() replaces the Python 2-only file() builtin.
        wf = open(writeFileName, 'w')
    except IOError as ioe:
        if ioe.errno == errno.EACCES:
            raise unittest.SkipTest(
                "No permission to add things to twisted.plugins")
        else:
            raise
    else:
        try:
            # The context manager closes the file even if write() fails.
            with wf:
                wf.write(begintest)

            # Generate a cache with pluginextra in it.
            list(plugin.getPlugins(plugin.ITestPlugin))

        finally:
            self._unimportPythonModule(
                sys.modules['twisted.plugins.pluginextra'],
                True)
        plgs = list(plugin.getPlugins(plugin.ITestPlugin))
        self.assertEqual(1, len(plgs))
def _mkstemp_inner(dir, pre, suf, flags):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.

    Tries up to TMP_MAX candidate names of the form ``pre + name + suf``
    inside ``dir``.  The first one that can be opened with ``flags`` and
    mode 0600 is returned as ``(fd, absolute_path)`` after ``_set_cloexec``
    has been applied to the descriptor.

    Raises IOError(EEXIST) when every candidate was taken.  Any OSError
    other than a name collision propagates unchanged.
    """
    names = _get_candidate_names()

    for _ in range(TMP_MAX):
        name = next(names)
        candidate = _os.path.join(dir, pre + name + suf)
        try:
            fd = _os.open(candidate, flags, 0o600)
            _set_cloexec(fd)
            return (fd, _os.path.abspath(candidate))
        except OSError as e:
            if e.errno == _errno.EEXIST:
                continue    # try again
            if _os.name == 'nt' and e.errno == _errno.EACCES:
                # On windows, when a directory with the chosen name already
                # exists, EACCES error code is returned instead of EEXIST.
                continue
            raise

    raise IOError(_errno.EEXIST, "No usable temporary file name found")

# User visible interfaces.
def __init__(self, path, factory=None, create=True):
    """Initialize a single-file mailbox.

    The file at ``self._path`` is opened for reading and writing.  If it does
    not exist it is created when ``create`` is true, otherwise
    ``NoSuchMailboxError`` is raised.  If it cannot be opened for writing
    (EACCES or EROFS) it is opened read-only instead.  Any other ``IOError``
    propagates.
    """
    Mailbox.__init__(self, path, factory, create)
    try:
        f = open(self._path, 'rb+')
    except IOError as e:
        if e.errno == errno.ENOENT:
            if create:
                f = open(self._path, 'wb+')
            else:
                raise NoSuchMailboxError(self._path)
        elif e.errno in (errno.EACCES, errno.EROFS):
            # No write permission: fall back to a read-only handle.
            f = open(self._path, 'rb')
        else:
            raise
    self._file = f
    self._toc = None
    self._next_key = 0
    self._pending = False       # No changes require rewriting the file.
    self._pending_sync = False  # No need to sync the file
    self._locked = False
    self._file_length = None    # Used to record mailbox size
def acquire(self, blocking=False):
    """Try to take an exclusive ``flock`` on ``self.filename``.

    The file is opened (and created if needed) and its descriptor stored in
    ``self.fd``.  With ``blocking`` false the lock request is non-blocking.

    Returns True and sets ``self.locked`` when the lock was obtained.
    Returns False, with ``self.fd`` reset to None, when the lock is held
    elsewhere (EAGAIN or EACCES).  Any other ``IOError`` is re-raised after
    the descriptor has been closed.
    """
    import fcntl  # @UnresolvedImport

    flags = os.O_CREAT | os.O_WRONLY
    self.fd = os.open(self.filename, flags)
    mode = fcntl.LOCK_EX
    if not blocking:
        mode |= fcntl.LOCK_NB
    try:
        fcntl.flock(self.fd, mode)
    except IOError as e:
        # Release the descriptor on every failure path; it used to leak
        # when the error was re-raised.
        os.close(self.fd)
        self.fd = None
        if e.errno not in (errno.EAGAIN, errno.EACCES):
            raise
        return False
    self.locked = True
    return True
def acquire(self, blocking=False):
    """Try to lock the first byte of ``self.filename`` with ``msvcrt.locking``.

    The file is opened (and created if needed) and its descriptor stored in
    ``self.fd``.  ``LK_LOCK`` is used when ``blocking`` is true, otherwise
    ``LK_NBLCK``.

    Returns True when the lock was obtained.  Returns False, with
    ``self.fd`` reset to None, on EAGAIN, EACCES or EDEADLK.  Any other
    ``IOError`` is re-raised after the descriptor has been closed.
    """
    import msvcrt  # @UnresolvedImport

    flags = os.O_CREAT | os.O_WRONLY
    mode = msvcrt.LK_NBLCK
    if blocking:
        mode = msvcrt.LK_LOCK
    self.fd = os.open(self.filename, flags)
    try:
        msvcrt.locking(self.fd, mode, 1)
    except IOError as e:
        # Release the descriptor on every failure path; it used to leak
        # when the error was re-raised.
        os.close(self.fd)
        self.fd = None
        if e.errno not in (errno.EAGAIN, errno.EACCES, errno.EDEADLK):
            raise
        return False
    return True
def _try_acquire(self, blocking, watch):
    """Make one attempt at ``self.trylock()``.

    Returns True on success.  When the lock is busy (EACCES / EAGAIN) it
    returns False if the caller is non-blocking or ``watch`` has expired,
    and raises ``_utils.RetryAgain`` otherwise.  Any other ``IOError`` is
    converted into ``threading.ThreadError``.
    """
    try:
        self.trylock()
    except IOError as exc:
        if exc.errno not in (errno.EACCES, errno.EAGAIN):
            details = {'path': self.path, 'exception': exc}
            raise threading.ThreadError(
                "Unable to acquire lock on `%(path)s` due to %(exception)s"
                % details)
        if blocking and not watch.expired():
            raise _utils.RetryAgain()
        return False
    return True
def get_all_inodes(self):
    """Merge the results of ``self.get_proc_inodes`` for every pid.

    Processes that vanished (ENOENT, ESRCH) or that may not be inspected
    (EPERM, EACCES) are skipped.  Any other ``OSError`` propagates.
    """
    skippable = (errno.ENOENT, errno.ESRCH, errno.EPERM, errno.EACCES)
    collected = {}
    for pid in pids():
        try:
            collected.update(self.get_proc_inodes(pid))
        except OSError as err:
            # Access-denied errors are expected in bulk for an unprivileged
            # user, and ENOENT just means the PID disappeared on us; the
            # original author notes the connection is then simply reported
            # with PID and fd set to None, as netstat -an and lsof do.
            if err.errno in skippable:
                continue
            raise
    return collected
def wrap_exceptions(fun):
    """Decorator mapping low-level OSError / IOError failures of a method
    onto NoSuchProcess and AccessDenied.
    """
    gone_codes = (errno.ENOENT, errno.ESRCH)
    denied_codes = (errno.EPERM, errno.EACCES)

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except EnvironmentError as err:
            code = err.errno
            # open() fails with ENOENT and read() can fail with ESRCH when
            # the process has gone away in the meantime.
            if code in gone_codes:
                raise NoSuchProcess(self.pid, self._name)
            elif code in denied_codes:
                raise AccessDenied(self.pid, self._name)
            raise
    return wrapper
def exe(self):
    """Return the target of the process's ``exe`` link under the procfs path.

    When the link cannot be read with ENOENT / ESRCH: returns "" if the
    process directory itself still exists, otherwise raises NoSuchProcess
    or ZombieProcess depending on ``pid_exists``.  EPERM / EACCES become
    AccessDenied.  Any other ``OSError`` propagates.
    """
    proc_dir = "%s/%s" % (self._procfs_path, self.pid)
    try:
        return readlink(proc_dir + "/exe")
    except OSError as err:
        code = err.errno
        if code in (errno.ENOENT, errno.ESRCH):
            # The original author notes this error may also be raised for
            # system processes with low pids (about 0-20) even though the
            # path actually exists.
            if os.path.lexists(proc_dir):
                return ""
            if pid_exists(self.pid):
                raise ZombieProcess(self.pid, self._name, self._ppid)
            raise NoSuchProcess(self.pid, self._name)
        if code in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        raise
def wrap_exceptions(fun):
    """Decorator mapping OSError failures of a method onto NoSuchProcess,
    ZombieProcess and AccessDenied.
    """
    denied_codes = (errno.EPERM, errno.EACCES)

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except OSError as err:
            code = err.errno
            if code == errno.ESRCH:
                # ESRCH for a pid that still exists is reported as a zombie.
                if pid_exists(self.pid):
                    raise ZombieProcess(self.pid, self._name, self._ppid)
                raise NoSuchProcess(self.pid, self._name)
            if code in denied_codes:
                raise AccessDenied(self.pid, self._name)
            raise
    return wrapper
def _send_signal(self, sig):
    """Deliver ``sig`` to ``self.pid`` with ``os.kill``.

    Raises ValueError for PID 0.  ESRCH becomes ZombieProcess (on OpenBSD,
    when the pid still exists) or NoSuchProcess, in which case ``self._gone``
    is set.  EPERM / EACCES become AccessDenied.  Any other ``OSError``
    propagates.
    """
    if self.pid == 0:
        # see "man 2 kill"
        message = (
            "preventing sending signal to process with PID 0 as it "
            "would affect every process in the process group of the "
            "calling process (os.getpid()) instead of PID 0"
        )
        raise ValueError(message)
    try:
        os.kill(self.pid, sig)
    except OSError as err:
        code = err.errno
        if code == errno.ESRCH:
            # We do this because os.kill() lies in case of
            # zombie processes.
            if OPENBSD and pid_exists(self.pid):
                raise ZombieProcess(self.pid, self._name, self._ppid)
            self._gone = True
            raise NoSuchProcess(self.pid, self._name)
        if code in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        raise
def wrap_exceptions(fun):
    """Call callable into a try/except clause and translate ENOENT,
    EACCES and EPERM in NoSuchProcess or AccessDenied exceptions.

    ENOENT / ESRCH become NoSuchProcess, or ZombieProcess when the pid
    still exists.  EPERM / EACCES become AccessDenied.  Any other
    EnvironmentError propagates unchanged.  The wrapper keeps the name
    and docstring of ``fun``.
    """
    import functools

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except EnvironmentError as err:
            # ENOENT (no such file or directory) gets raised on open().
            # ESRCH (no such process) can get raised on read() if
            # process is gone in meantime.
            if err.errno in (errno.ENOENT, errno.ESRCH):
                if not pid_exists(self.pid):
                    raise NoSuchProcess(self.pid, self._name)
                else:
                    raise ZombieProcess(self.pid, self._name, self._ppid)
            if err.errno in (errno.EPERM, errno.EACCES):
                raise AccessDenied(self.pid, self._name)
            raise
    return wrapper
def __init__(self, debug=False, logfile=None):
    """Create the 'VirtualBMC' logger and attach one handler to it.

    :param debug: when true the level is set to DEBUG, otherwise to INFO.
    :param logfile: path handed to ``logging.FileHandler``; when ``None`` a
        ``logging.StreamHandler`` is used instead.
    :raises IOError: if setting up the handler fails with any errno other
        than EACCES.
    """
    logging.Logger.__init__(self, 'VirtualBMC')
    try:
        if logfile is not None:
            self.handler = logging.FileHandler(logfile)
        else:
            self.handler = logging.StreamHandler()

        formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
        self.handler.setFormatter(formatter)
        self.addHandler(self.handler)

        if debug:
            self.setLevel(logging.DEBUG)
        else:
            self.setLevel(logging.INFO)
    except IOError as e:
        # Only a permission failure is tolerated; every other I/O error
        # used to be swallowed silently as well and is now propagated.
        if e.errno != errno.EACCES:
            raise
def convert_errno(e):
    """
    Map an errno value (as found on an C{OSError} or C{IOError}) onto the
    matching SFTP result code, for use when server code traps an exception
    and has to report a result.

    @param e: an errno code, as from C{OSError.errno}.
    @type e: int
    @return: an SFTP error code like L{SFTP_NO_SUCH_FILE}.
    @rtype: int
    """
    if e == errno.EACCES:
        # permission denied
        return SFTP_PERMISSION_DENIED
    if e in (errno.ENOENT, errno.ENOTDIR):
        # no such file
        return SFTP_NO_SUCH_FILE
    return SFTP_FAILURE
def _convert_status(self, msg):
    """
    Read a status code and text from ``msg`` and raise for error statuses.

    SFTP_OK returns silently; SFTP_EOF raises EOFError; every other code
    raises IOError, carrying ENOENT or EACCES for the "no such file" and
    "permission denied" statuses.
    """
    code = msg.get_int()
    text = msg.get_string()
    if code == SFTP_OK:
        return
    if code == SFTP_EOF:
        raise EOFError(text)
    # clever idea from john a. meinel: map the error codes to errno
    if code == SFTP_NO_SUCH_FILE:
        raise IOError(errno.ENOENT, text)
    if code == SFTP_PERMISSION_DENIED:
        raise IOError(errno.EACCES, text)
    raise IOError(text)
def _try_except_permissionerror_iter(try_iter, except_iter):
    """Yield everything from ``try_iter()``; if that fails with a permission
    error, continue with everything from ``except_iter(exc)``.

    On Python 3.3+ a permission error is ``PermissionError``; on older
    versions it is an ``EnvironmentError`` whose errno is EPERM or EACCES.
    """
    if sys.version_info < (3, 3):
        try:
            for item in try_iter():
                yield item
        except EnvironmentError as exc:
            if exc.errno in (EPERM, EACCES):
                for item in except_iter(exc):
                    yield item
            else:
                raise
    else:
        try:
            for item in try_iter():
                yield item
        except PermissionError as exc:
            for item in except_iter(exc):
                yield item
def test_bind_by_addr(self, llc, raw, ldl, dlc):
    """Check the errno reported by ``llc.bind`` for explicit addresses."""
    def bind_errno(sock, addr):
        # Bind must fail with nfc.llcp.Error; hand back its errno.
        with pytest.raises(nfc.llcp.Error) as excinfo:
            llc.bind(sock, addr)
        return excinfo.value.errno

    llc.bind(raw, 16)
    assert llc.getsockname(raw) == 16
    # Binding the other sockets to the address the raw socket took fails.
    for sock in (ldl, dlc):
        assert bind_errno(sock, 16) == errno.EACCES

    llc.bind(ldl, 63)
    assert llc.getsockname(ldl) == 63
    assert bind_errno(dlc, 63) == errno.EADDRINUSE
    for addr in (64, -1):
        assert bind_errno(dlc, addr) == errno.EFAULT

    llc.bind(dlc, 62)
    assert llc.getsockname(dlc) == 62
def test_selector_error(self):
    """An OSError(EACCES) raised by the alarm handler during ``select()``
    must reach the caller as that same OSError.
    """
    selector = self.make_selector()
    rd, wr = self.make_socketpair()
    selector.register(rd, selectors2.EVENT_READ)

    def alarm_exception(*args):
        failure = OSError()
        failure.errno = errno.EACCES
        raise failure

    self.set_alarm(SHORT_SELECT, alarm_exception)

    caught = None
    try:
        selector.select(LONG_SELECT)
    except OSError as e:
        caught = e
    except Exception as e:
        self.fail("Raised incorrect exception: " + str(e))
    if caught is None:
        self.fail("select() didn't raise OSError")
    self.assertEqual(caught.errno, errno.EACCES)

# Test ensures that _syscall_wrapper properly raises the
# exception that is raised from an interrupt handler.
def send(self, req, **kwargs):
    """Open ``req.url`` with ``urlopen`` and wrap the outcome in a
    ``requests.Response``.

    The status is ``ok`` on success, ``forbidden`` for EACCES, ``not_found``
    for ENOENT and ``bad_request`` for every other IOError / OSError.  A
    'content-length' header on the opened resource is copied to the response.
    """
    response = requests.Response()
    try:
        response.raw = urlopen(req.url)
        length = response.raw.headers.get('content-length')
        if length is not None:
            response.headers['Content-Length'] = length
    except IOError as e:
        if e.errno == errno.EACCES:
            status = requests.codes.forbidden
        elif e.errno == errno.ENOENT:
            status = requests.codes.not_found
        else:
            status = requests.codes.bad_request
        response.status_code = status
    except OSError:
        response.status_code = requests.codes.bad_request
    else:
        response.status_code = requests.codes.ok
    return response
def test_non_retryable_exception_handling(self):
    """
    Tests a resumable upload that fails with a non-retryable exception
    """
    # The progress callback raises OSError(EACCES) and the upload is
    # expected to let it propagate.
    harness = CallbackTestHarness(
        exception=OSError(errno.EACCES, 'Permission denied'))
    res_upload_handler = ResumableUploadHandler(num_retries=1)
    small_src_file_as_string, small_src_file = self.make_small_file()
    small_src_file.seek(0)
    dst_key = self._MakeKey(set_contents=False)
    try:
        dst_key.set_contents_from_file(
            small_src_file, cb=harness.call,
            res_upload_handler=res_upload_handler)
        self.fail('Did not get expected OSError')
    except OSError as e:
        # Ensure the error was re-raised.
        self.assertEqual(e.errno, errno.EACCES)
def test_non_retryable_exception_handling(self):
    """
    Tests resumable download that fails with a non-retryable exception
    """
    # The progress callback raises OSError(EACCES) and the download is
    # expected to let it propagate.
    harness = CallbackTestHarness(
        exception=OSError(errno.EACCES, 'Permission denied'))
    res_download_handler = ResumableDownloadHandler(num_retries=1)
    dst_fp = self.make_dst_fp()
    small_src_key_as_string, small_src_key = self.make_small_key()
    try:
        small_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        self.fail('Did not get expected OSError')
    except OSError as e:
        # Ensure the error was re-raised.
        self.assertEqual(e.errno, errno.EACCES)
def test_get_ioerror_slow_path(cache):
    """Call ``cache.get`` while the disk's ``fetch`` raises IOError(EACCES),
    with the eviction policy set to least-recently-used.
    """
    cache.reset('eviction_policy', 'least-recently-used')
    cache.set(0, 0)

    denied = IOError()
    denied.errno = errno.EACCES

    fake_disk = mock.Mock()
    fake_disk.put = mock.Mock(side_effect=[(0, True)])
    fake_disk.fetch = mock.Mock(side_effect=denied)

    with mock.patch.object(cache, '_disk', fake_disk):
        cache.get(0)
def test_pop_ioerror_eacces(cache):
    """Call ``cache.pop`` while the disk's ``fetch`` raises IOError(EACCES)."""
    assert cache.set(0, 0)

    denied = IOError()
    denied.errno = errno.EACCES

    fake_disk = mock.Mock()
    fake_disk.put = mock.Mock(side_effect=[(0, True)])
    fake_disk.fetch = mock.Mock(side_effect=denied)

    with mock.patch.object(cache, '_disk', fake_disk):
        cache.pop(0)