Python errno 模块,EBUSY 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 errno.EBUSY。

项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _unsafe_writes(self, src, dest, exception):
      # sadly there are some situations where we cannot ensure atomicity, but only if
      # the user insists and we get the appropriate error we update the file unsafely
      if exception.errno == errno.EBUSY:
          #TODO: issue warning that this is an unsafe operation, but doing it cause user insists
          try:
              try:
                  out_dest = open(dest, 'wb')
                  in_src = open(src, 'rb')
                  shutil.copyfileobj(in_src, out_dest)
              finally: # assuring closed files in 2.4 compatible way
                  if out_dest:
                      out_dest.close()
                  if in_src:
                      in_src.close()
          except (shutil.Error, OSError, IOError):
              e = get_exception()
              self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, e))

      else:
          self.fail_json(msg='Could not replace file: %s to %s: %s' % (src, dest, exception))
项目:SameKeyProxy    作者:xzhou    | 项目源码 | 文件源码
def createGroup(self,groupname):
    """Create the directory that backs a new group.

    The name is passed through ``self.validateName`` and mapped to a
    directory path by ``self.translate``; the directory is created while
    ``self.lock`` is held.

    Raises ``Pyro.errors.NamingError`` when the directory already exists
    (EEXIST/EBUSY), when its parent is missing (ENOENT), or for any other
    ``OSError`` from ``os.mkdir``.
    """
    groupname = self.validateName(groupname)
    dirnam = self.translate(groupname)
    self.lock.acquire()
    try:
        os.mkdir(dirnam)
        self._dosynccall("createGroup", groupname)
        Log.msg('NameServer', 'created group', groupname)
    # "except ... as" is accepted by Python 2.6+ and 3; the comma form is not
    except OSError as x:
        if x.errno in (errno.EEXIST, errno.EBUSY):
            raise Pyro.errors.NamingError('group already exists', groupname)
        elif x.errno == errno.ENOENT:
            raise Pyro.errors.NamingError('(parent)group not found')
        else:
            raise Pyro.errors.NamingError(str(x))
    finally:
        # always release, whether the group was created or an error was raised
        self.lock.release()
项目:modern-paste    作者:LINKIWI    | 项目源码 | 文件源码
def test_scrub_inactive_pastes_error(self):
    """An OSError raised by shutil.rmtree surfaces from scrub_inactive_pastes."""
    pastes = [util.testing.PasteFactory.generate(expiry_time=None) for _ in range(15)]
    # one attachment per paste, then deactivate the first ten
    for paste in pastes:
        util.testing.AttachmentFactory.generate(paste_id=paste.paste_id, file_name='file')
    for inactive in pastes[:10]:
        database.paste.deactivate_paste(inactive.paste_id)

    with mock.patch.object(shutil, 'rmtree') as mock_rmtree:
        mock_rmtree.side_effect = OSError(errno.EBUSY)
        self.assertRaises(OSError, database.paste.scrub_inactive_pastes)

    # NOTE(review): the scrubber is not invoked inside this second patch;
    # only the continued existence of every paste is checked - confirm intent.
    with mock.patch.object(shutil, 'rmtree') as mock_rmtree:
        mock_rmtree.side_effect = OSError(errno.ENOENT)
        for paste in pastes:
            self.assertIsNotNone(database.paste.get_paste_by_id(paste.paste_id))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
    """Call os.sendfile() the way an application is expected to.

    The call is repeated for as long as it fails with EAGAIN or EBUSY; any
    other OSError (ECONNRESET included) propagates to the caller.
    ``headers`` and ``trailers`` are forwarded only when
    ``self.SUPPORT_HEADERS_TRAILERS`` is true.

    Returns whatever os.sendfile() returns.
    """
    retry_errnos = (errno.EAGAIN, errno.EBUSY)
    while True:
        try:
            if not self.SUPPORT_HEADERS_TRAILERS:
                return os.sendfile(sock, file, offset, nbytes)
            return os.sendfile(sock, file, offset, nbytes, headers,
                               trailers)
        except OSError as err:
            # transient condition: go round again; everything else is fatal
            if err.errno not in retry_errnos:
                raise
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
    """Application-style driver around os.sendfile().

    Retries while the call reports EAGAIN or EBUSY and re-raises every other
    OSError unchanged. The header/trailer arguments are passed through only
    if ``self.SUPPORT_HEADERS_TRAILERS`` is set.

    Returns the value produced by os.sendfile().
    """
    while True:
        try:
            if self.SUPPORT_HEADERS_TRAILERS:
                result = os.sendfile(sock, file, offset, nbytes, headers,
                                     trailers)
            else:
                result = os.sendfile(sock, file, offset, nbytes)
            return result
        except OSError as err:
            if err.errno in (errno.EAGAIN, errno.EBUSY):
                # we have to retry send data
                continue
            # disconnected peer or any other failure: let the caller see it
            raise
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
    """Higher-level os.sendfile() helper mirroring typical application use.

    EAGAIN and EBUSY cause the same call to be issued again; all other
    OSErrors, ECONNRESET among them, are re-raised. ``headers``/``trailers``
    are used only when ``self.SUPPORT_HEADERS_TRAILERS`` is true.

    Returns the result of the successful os.sendfile() call.
    """
    transient = (errno.EAGAIN, errno.EBUSY)
    while True:
        try:
            if self.SUPPORT_HEADERS_TRAILERS:
                return os.sendfile(sock, file, offset, nbytes, headers,
                                   trailers)
            return os.sendfile(sock, file, offset, nbytes)
        except OSError as err:
            if err.errno in transient:
                continue  # nothing was sent yet; try again
            raise
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_private_destroy_ebusy_timeout(self):
    """_destroy retries on EBUSY, then gives up and raises the libvirtError.

    The guest's poweroff always fails with EBUSY; the test expects exactly
    three attempts before the error reaches the caller.
    """
    busy_error = fakelibvirt.make_libvirtError(
        fakelibvirt.libvirtError,
        ("Failed to terminate process 26425 with SIGKILL: "
         "Device or resource busy"),
        error_code=fakelibvirt.VIR_ERR_SYSTEM_ERROR,
        int1=errno.EBUSY)

    guest = mock.Mock(libvirt_guest.Guest, id=1)
    guest.poweroff = mock.Mock(side_effect=busy_error)

    instance = objects.Instance(**self.test_instance)
    drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)

    with mock.patch.object(drvr._host, 'get_guest', return_value=guest):
        self.assertRaises(fakelibvirt.libvirtError, drvr._destroy, instance)

    self.assertEqual(3, guest.poweroff.call_count)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_private_destroy_ebusy_multiple_attempt_ok(self):
    """The _destroy retry loop stops as soon as EBUSY is no longer raised.

    poweroff fails once with EBUSY and then succeeds, so exactly two calls
    are expected.
    """
    busy_error = fakelibvirt.make_libvirtError(
        fakelibvirt.libvirtError,
        ("Failed to terminate process 26425 with SIGKILL: "
         "Device or resource busy"),
        error_code=fakelibvirt.VIR_ERR_SYSTEM_ERROR,
        int1=errno.EBUSY)

    guest = mock.Mock(libvirt_guest.Guest, id=1)
    # first call raises, second call returns normally
    guest.poweroff = mock.Mock(side_effect=[busy_error, None])

    shutdown_info = hardware.InstanceInfo(power_state.SHUTDOWN, id=1)
    instance = objects.Instance(**self.test_instance)
    drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)

    guest_patch = mock.patch.object(drvr._host, 'get_guest',
                                    return_value=guest)
    info_patch = mock.patch.object(drvr, 'get_info',
                                   return_value=shutdown_info)
    with guest_patch, info_patch:
        drvr._destroy(instance)

    self.assertEqual(2, guest.poweroff.call_count)
项目:tm-librarian    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def cmd_destroy_shelf(self, cmdict):
    """ Destroy a shelf: zombify its books (mark for zeroing), drop its
        xattrs, then delete the shelf record.
        In (dict)---
            path
            node
        Out (dict) ---
            shelf data
    """
    # errno is preset before the open-count check below
    # (presumably what gets reported if the assertion fires - TODO confirm)
    self.errno = errno.EBUSY
    shelf = self.cmd_get_shelf(cmdict)
    assert not self.db.open_count(shelf), \
        '%s has active opens' % shelf.name
    shelf_books = self._list_shelf_books(shelf)
    xattr_names = self.db.list_xattrs(shelf)
    for book in shelf_books:
        self.db.delete_bos(book)
        self._set_book_alloc(book, TMBook.ALLOC_ZOMBIE)
    for name in xattr_names:
        self.db.remove_xattr(shelf, name)
    return self.db.delete_shelf(shelf, commit=True)
项目:vsphere-storage-for-docker    作者:vmware    | 项目源码 | 文件源码
def load(volpath):
    """
    Read the sidecar metadata file for ``volpath`` and return it as a dict.

    A read that fails with EBUSY is retried after sleeping
    ``vmdk_utils.VMDK_RETRY_SLEEP`` while the retry counter has not passed
    ``vmdk_utils.VMDK_RETRY_COUNT``. Returns None when the file cannot be
    read or its content is not valid JSON.
    """
    meta_file = lib.DiskLib_SidecarMakeFileName(volpath.encode(),
                                                DVOL_KEY.encode())
    vol_name = vmdk_utils.get_volname_from_vmdk_path(volpath)
    retry_count = 0
    while True:
        try:
            with open(meta_file, "r") as fh:
                kv_str = fh.read()
        except IOError as open_error:
            # This is a workaround to the timing/locking with metadata files issue #626
            may_retry = (open_error.errno == errno.EBUSY and
                         retry_count <= vmdk_utils.VMDK_RETRY_COUNT)
            if not may_retry:
                logging.exception("Failed to access %s", meta_file)
                return None
            logging.warning("Meta file %s busy for load(), retrying...", meta_file)
            vmdk_utils.log_volume_lsof(vol_name)
            retry_count += 1
            time.sleep(vmdk_utils.VMDK_RETRY_SLEEP)
        else:
            break

    try:
        decoded = json.loads(kv_str)
    except ValueError:
        logging.exception("load:Failed to decode meta-data for %s", volpath)
        return None
    return decoded
项目:vsphere-storage-for-docker    作者:vmware    | 项目源码 | 文件源码
def save(volpath, kv_dict, key=None, value=None):
    """
    Write ``kv_dict`` as JSON to the sidecar metadata file of ``volpath``.

    When ``key`` is given the current file is read first, and the write is
    refused if it already stores ``key`` with a value different from
    ``value``. EBUSY failures are retried the same way load() retries them.
    Returns True when the file was written, False otherwise.
    """
    meta_file = lib.DiskLib_SidecarMakeFileName(volpath.encode(),
                                                DVOL_KEY.encode())
    kv_str = json.dumps(kv_dict)

    vol_name = vmdk_utils.get_volname_from_vmdk_path(volpath)
    retry_count = 0
    while True:
        try:
            if key:
                with open(meta_file, "r") as rfh:
                    kv_val = rfh.read()
                    try:
                        kv_match = json.loads(kv_val)
                    except ValueError:
                        logging.exception("load:Failed to decode meta-data for %s", volpath)
                        return False

                    # refuse to overwrite when the stored value disagrees
                    if key in kv_match and kv_match[key] != value:
                        return False
            with open(meta_file, "w") as fh:
                fh.write(align_str(kv_str, KV_ALIGN))
        except IOError as open_error:
            # This is a workaround to the timing/locking with metadata files issue #626
            may_retry = (open_error.errno == errno.EBUSY and
                         retry_count <= vmdk_utils.VMDK_RETRY_COUNT)
            if not may_retry:
                logging.exception("Failed to save meta-data for %s", volpath)
                return False
            logging.warning("Meta file %s busy for save(), retrying...", meta_file)
            vmdk_utils.log_volume_lsof(vol_name)
            retry_count += 1
            time.sleep(vmdk_utils.VMDK_RETRY_SLEEP)
        else:
            return True
项目:deb-python-fasteners    作者:openstack    | 项目源码 | 文件源码
def test_bad_acquire(self):
    """acquire() on a BrokenLock built with EBUSY raises threading.ThreadError."""
    # presumably BrokenLock fails its locking call with the given errno - verify
    lock = BrokenLock(os.path.join(self.lock_dir, 'lock'), errno.EBUSY)
    self.assertRaises(threading.ThreadError, lock.acquire)
项目:SameKeyProxy    作者:xzhou    | 项目源码 | 文件源码
def _initdb_1(self):
        # root is not a NamedTree but a directory
        try:
            os.mkdir(self.dbroot)
        except OSError,x:
            if x.errno not in (errno.EEXIST, errno.EBUSY):
                raise
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_init_fail_open(self, usb_context):
    """USB() turns libusb open errors into IOError with the matching errno.

    device.open raises USBErrorAccess, USBErrorBusy and USBErrorNoDevice on
    successive calls; the three constructions must fail with EACCES, EBUSY
    and ENODEV in that order.
    """
    endpoints = [
        self.Endpoint(0x0004, 0x0002),
        self.Endpoint(0x0084, 0x0002),
    ]
    device = self.Device(0x1000, 0x2000, 1, 2, [self.Settings(endpoints)])
    device.open = MagicMock()
    device.open.side_effect = [
        nfc.clf.transport.libusb.USBErrorAccess,
        nfc.clf.transport.libusb.USBErrorBusy,
        nfc.clf.transport.libusb.USBErrorNoDevice,
    ]
    usb_context.return_value.getDeviceList.return_value = [device]

    for expected_errno in (errno.EACCES, errno.EBUSY, errno.ENODEV):
        with pytest.raises(IOError) as excinfo:
            nfc.clf.transport.USB(1, 2)
        assert excinfo.value.errno == expected_errno
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def main(args):
    """Probe the system for contactless devices and report what is found.

    USB devices listed in ``nfc.clf.device.usb_device_map`` are always
    probed; serial devices only when ``args.search_tty`` is set. EACCES and
    EBUSY failures get a dedicated message, other IOErrors are ignored.
    """
    print("This is the %s version of nfcpy run in Python %s\non %s" %
          (nfc.__version__, platform.python_version(), platform.platform()))
    print("I'm now searching your system for contactless devices")

    found = 0
    for vid, pid, bus, dev in nfc.clf.transport.USB.find("usb"):
        if (vid, pid) not in nfc.clf.device.usb_device_map:
            continue  # not a device nfcpy has a driver for
        path = "usb:{0:03d}:{1:03d}".format(bus, dev)
        try:
            clf = nfc.ContactlessFrontend(path)
            print("** found %s" % clf.device)
            clf.close()
            found += 1
        except IOError as error:
            if error.errno == errno.EACCES:
                usb_device_access_denied(bus, dev, vid, pid, path)
            elif error.errno == errno.EBUSY:
                usb_device_found_is_busy(bus, dev, vid, pid, path)

    if not args.search_tty:
        print("I'm not trying serial devices because you haven't told me")
        print("-- add the option '--search-tty' to have me looking")
        print("-- but beware that this may break other serial devs")
    else:
        for dev in nfc.clf.transport.TTY.find("tty")[0]:
            path = "tty:{0}".format(dev[8:])
            try:
                clf = nfc.ContactlessFrontend(path)
                print("** found %s" % clf.device)
                clf.close()
                found += 1
            except IOError as error:
                if error.errno == errno.EACCES:
                    print("access denied for device with path %s" % path)
                elif error.errno == errno.EBUSY:
                    print("the device with path %s is busy" % path)

    if found == 0:
        print("Sorry, but I couldn't find any contactless device")
项目:charm-ceph-osd    作者:openstack    | 项目源码 | 文件源码
def umount(mount_point):
    """Forcibly unmount a directory through libc, falling back to a lazy
    unmount. Intended for broken hard drive mounts which may hang.

    The first call passes flag 1 (MNT_FORCE); if it fails with EBUSY a
    second call passes flag 2 (lazy/detach).

    :param mount_point: str. A String representing the filesystem mount point
    :returns: int. Returns 0 on success. errno otherwise.
    """
    # NOTE(review): the flags are handed to libc ``umount``; glibc documents
    # flags on ``umount2`` - confirm they are honoured here.
    libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)

    # First try to umount with MNT_FORCE
    if libc.umount(mount_point, 1) >= 0:
        return 0

    err = ctypes.get_errno()
    if err != errno.EBUSY:
        return err

    # Busy: detach instead, i.e. lazy umount
    if libc.umount(mount_point, 2) < 0:
        return ctypes.get_errno()
    return 0
项目:charm-ceph-osd    作者:openstack    | 项目源码 | 文件源码
def test_umount_ebusy(self):
    """With errno EBUSY, ceph.umount issues the flag-1 call then the flag-2
    call and returns 0."""
    mount_point = '/some/osd/mount'
    self.ctypes.util.find_library.return_value = 'libc.so.6'
    libc_mock = Mock()
    self.ctypes.CDLL.return_value = libc_mock
    libc_mock.umount.side_effect = umount_busy
    self.ctypes.get_errno.return_value = errno.EBUSY

    ret = ceph.umount(mount_point)
    expected_calls = [
        call.umount(mount_point, 1),
        call.umount(mount_point, 2),
    ]
    libc_mock.assert_has_calls(expected_calls)
    assert ret == 0
项目:charm-ceph-mon    作者:openstack    | 项目源码 | 文件源码
def umount(mount_point):
    """Unmount a mounted directory forcibly via libc; meant for broken hard
    drive mounts which may hang.

    A forced attempt (flag 1) is made first. If that reports EBUSY, a lazy
    unmount (flag 2) is attempted.

    :param mount_point: str. A String representing the filesystem mount point
    :returns: int. Returns 0 on success. errno otherwise.
    """
    # NOTE(review): glibc documents the flags argument on ``umount2``, not
    # ``umount`` - confirm the flags below take effect.
    libc_path = ctypes.util.find_library("c")
    libc = ctypes.CDLL(libc_path, use_errno=True)

    forced = libc.umount(mount_point, 1)  # MNT_FORCE
    if forced >= 0:
        return 0

    err = ctypes.get_errno()
    if err == errno.EBUSY:
        lazy = libc.umount(mount_point, 2)  # detach, i.e. lazy umount
        return ctypes.get_errno() if lazy < 0 else 0
    return err
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the OSS audio device cannot be opened.

    An open failure with EACCES, ENOENT, ENODEV or EBUSY is reported as a
    skipped test; any other failure is re-raised.
    """
    skip_errnos = (errno.EACCES, errno.ENOENT,
                   errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        if msg.args[0] not in skip_errnos:
            raise
        raise unittest.SkipTest(msg)
    dsp.close()
    support.run_unittest(__name__)
项目:Solfege    作者:RannyeriDev    | 项目源码 | 文件源码
def display_sound_init_error_message(self, e):
    """Show an error dialog that matches why sound initialisation failed.

    Dispatches on the exception: SoundInitException, ImportError, an errno
    of EACCES, an errno of EBUSY, and finally a generic fallback message.
    """
    def decoded():
        # text of the exception, decoded with the locale's preferred encoding
        return str(e).decode(locale.getpreferredencoding(), 'replace')

    if isinstance(e, soundcard.SoundInitException):
        solfege.win.display_error_message("""%s""" % decoded())
        return

    if isinstance(e, ImportError):
        solfege.win.display_error_message2(str(e), _("You should configure sound from the preferences window, and try to use an external midi player. Or try to recompile the program and check for error messages to see why the module is not built."))
        return

    if getattr(e, 'errno', None) == errno.EACCES:
        template = ("The sound init failed: %s\n"
                    "The errno EACCES indicates that you don't have write "
                    "permission to the device.")
    elif getattr(e, 'errno', None) == errno.EBUSY:
        template = ("The sound init failed: %s\n"
                    "It seems like some other program is using the device. You "
                    "should try to quit that other program and restart Solfege.")
    else:
        template = ("The sound init failed: %s\n"
                    "You should configure sound from the 'Sound' page of "
                    "the preferences window.\n\n"
                    "It is also possible that the OS sound setup is incorrect.")
    solfege.win.display_error_message(template % decoded())
项目:charm-ceph    作者:openstack    | 项目源码 | 文件源码
def umount(mount_point):
    """Force-unmount a directory using libc, with a lazy unmount as fallback.
    Used for broken hard drive mounts which may hang.

    If the forced call (flag 1) returns EBUSY, a lazy unmount (flag 2) is
    tried.

    :param mount_point: str. A String representing the filesystem mount point
    :returns: int. Returns 0 on success. errno otherwise.
    """
    # NOTE(review): flags are passed to libc ``umount``; glibc documents them
    # on ``umount2`` - verify the intended flags are applied.
    libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)

    # First try to umount with MNT_FORCE
    ret = libc.umount(mount_point, 1)
    if not ret < 0:
        return 0

    first_error = ctypes.get_errno()
    if first_error != errno.EBUSY:
        return first_error

    # Still busy: detach (lazy umount)
    ret = libc.umount(mount_point, 2)
    if ret < 0:
        return ctypes.get_errno()
    return 0
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_main():
    """Run LinuxAudioDevTests unless the audio device cannot be opened.

    The device is opened once up front. A failure with EACCES, ENOENT,
    ENODEV or EBUSY raises ``unittest.SkipTest``; any other
    ``linuxaudiodev.error`` is re-raised.
    """
    # "except ... as" is valid on Python 2.6+ and 3; the comma form is
    # rejected by Python 3.
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    run_unittest(LinuxAudioDevTests)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the OSS audio device cannot be opened.

    A failed open with EACCES, ENOENT, ENODEV or EBUSY raises
    ``unittest.SkipTest``; any other error is re-raised.
    """
    # "except ... as" is valid on Python 2.6+ and 3; the comma form is
    # rejected by Python 3.
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    test_support.run_unittest(__name__)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_main():
    """Open the audio device as a probe, then run LinuxAudioDevTests.

    If the probe fails with EACCES, ENOENT, ENODEV or EBUSY the run is
    skipped via ``unittest.SkipTest``; other ``linuxaudiodev.error``
    failures propagate.
    """
    # The "as" spelling parses on Python 2.6+ and 3; "except X, name" is a
    # SyntaxError on Python 3.
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    run_unittest(LinuxAudioDevTests)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_main():
    """Open the OSS device as a probe, then run this module's tests.

    If the probe fails with EACCES, ENOENT, ENODEV or EBUSY the run is
    skipped via ``unittest.SkipTest``; other failures propagate.
    """
    # The "as" spelling parses on Python 2.6+ and 3; "except X, name" is a
    # SyntaxError on Python 3.
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    test_support.run_unittest(__name__)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_flags(self):
    """os.sendfile() with flags=SF_NODISKIO may only fail with EBUSY or EAGAIN."""
    tolerated = (errno.EBUSY, errno.EAGAIN)
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as err:
        if err.errno in tolerated:
            return
        raise
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_main():
    """Probe the OSS audio device, then run this module's tests.

    When the probe fails with EACCES, ENOENT, ENODEV or EBUSY the run is
    skipped; every other failure is re-raised.
    """
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        device_unusable = msg.args[0] in (errno.EACCES, errno.ENOENT,
                                          errno.ENODEV, errno.EBUSY)
        if device_unusable:
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    support.run_unittest(__name__)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_main():
    """Run LinuxAudioDevTests, skipping when the audio device is unusable.

    "Unusable" means the initial open failed with EACCES, ENOENT, ENODEV or
    EBUSY; any other ``linuxaudiodev.error`` is re-raised.
    """
    # Python 3 rejects "except X, name"; "except X as name" is accepted by
    # Python 2.6+ as well.
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    run_unittest(LinuxAudioDevTests)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests, skipping when the OSS device is unusable.

    "Unusable" means the initial open failed with EACCES, ENOENT, ENODEV or
    EBUSY; any other error is re-raised.
    """
    # Python 3 rejects "except X, name"; "except X as name" is accepted by
    # Python 2.6+ as well.
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    test_support.run_unittest(__name__)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_flags(self):
    """Passing flags=SF_NODISKIO to os.sendfile() either succeeds or fails
    with EBUSY/EAGAIN; any other OSError fails the test."""
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as err:
        acceptable = err.errno in (errno.EBUSY, errno.EAGAIN)
        if not acceptable:
            raise
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the OSS audio device is unusable.

    The device is opened once as a probe. EACCES, ENOENT, ENODEV and EBUSY
    turn into a skip; all other errors are re-raised.
    """
    skip_on = (errno.EACCES, errno.ENOENT,
               errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, OSError) as msg:
        if msg.args[0] not in skip_on:
            raise
        raise unittest.SkipTest(msg)
    dsp.close()
    support.run_unittest(__name__)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_main():
    """Run LinuxAudioDevTests after checking the audio device opens.

    An open failure with EACCES, ENOENT, ENODEV or EBUSY raises
    ``unittest.SkipTest``; other ``linuxaudiodev.error`` failures are
    re-raised unchanged.
    """
    # Written with "as" so the block also parses on Python 3
    # (the comma form is Python 2 only).
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    run_unittest(LinuxAudioDevTests)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests after checking the OSS device opens.

    An open failure with EACCES, ENOENT, ENODEV or EBUSY raises
    ``unittest.SkipTest``; other failures are re-raised unchanged.
    """
    # Written with "as" so the block also parses on Python 3
    # (the comma form is Python 2 only).
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    test_support.run_unittest(__name__)
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def initiate_sendfile(self):
    """Send the next chunk with sendfile() and update the transfer state.

    Retryable errors (and EBUSY) return silently; disconnect errors close
    the channel. Any other OSError raises ``_GiveUpOnSendfile`` if nothing
    has been sent yet, otherwise it is re-raised.
    """
    try:
        sent = sendfile(self._fileno, self._filefd, self._offset,
                        self.ac_out_buffer_size)
    except OSError as err:
        code = err.errno
        if code in _ERRNOS_RETRY or code == errno.EBUSY:
            return
        if code in _ERRNOS_DISCONNECTED:
            self.handle_close()
            return
        if self.tot_bytes_sent != 0:
            raise
        logger.warning(
            "sendfile() failed; falling back on using plain send")
        raise _GiveUpOnSendfile

    if sent == 0:
        # this signals the channel that the transfer is completed
        self.discard_buffers()
        self.handle_close()
        return
    self._offset += sent
    self.tot_bytes_sent += sent

    # --- utility methods
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_flags(self):
    """Exercise os.sendfile() with flags=SF_NODISKIO; only EBUSY and EAGAIN
    failures are tolerated."""
    allowed_errnos = (errno.EBUSY, errno.EAGAIN)
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as err:
        if err.errno not in allowed_errnos:
            raise
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_main():
    """Check the OSS audio device opens, then run this module's tests.

    Open failures with EACCES, ENOENT, ENODEV or EBUSY raise
    ``unittest.SkipTest``; anything else propagates.
    """
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, OSError) as msg:
        code = msg.args[0]
        if code in (errno.EACCES, errno.ENOENT,
                    errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    support.run_unittest(__name__)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_write_config_errors_if_unexpected_exception(self):
    """write_config lets an EBUSY IOError from atomic_write propagate."""
    dnsconfig = DNSConfig()
    busy_error = IOError(errno.EBUSY, factory.make_string())
    failing_write = Mock(side_effect=busy_error)
    self.patch(config, 'atomic_write', failing_write)
    self.assertRaises(IOError, dnsconfig.write_config)
项目:intellij-micropython    作者:vlasovskikh    | 项目源码 | 文件源码
def connect_miniterm(port):
    """Open the serial ``port`` and return a Miniterm attached to it.

    On ``serial.SerialException`` a hint matching the errno (ENOENT, EBUSY,
    EACCES, or a generic fallback) is written to stderr, the user is asked
    to press ENTER, and the process exits with status 1.
    """
    try:
        ser = serial.Serial(port, BAUDRATE, parity=PARITY, rtscts=False,
                            xonxoff=False)
        return Miniterm(ser, echo=False)
    except serial.SerialException as e:
        if e.errno == errno.ENOENT:
            sys.stderr.write(
                "Device %r not found. Check your "
                "MicroPython device path settings.\n" % port)
        elif e.errno == errno.EBUSY:
            # Device is busy. Explain what to do.
            sys.stderr.write(
                "Found the device, but it is busy. "
                "Wait up to 20 seconds, or "
                "press the reset button on the "
                "back of the device next to the yellow light; "
                "then try again.\n"
            )
        elif e.errno == errno.EACCES:
            # write() only accepts text, so the exception is formatted
            # explicitly; each hint ends with a newline so they do not run
            # together on one line.
            sys.stderr.write("Found the device, but could not connect.\n")
            sys.stderr.write("%s\n" % e)
            sys.stderr.write('On linux, try adding yourself to the "dialout" group\n')
            sys.stderr.write('sudo usermod -a -G dialout <your-username>\n')
        else:
            # Try to be as helpful as possible.
            sys.stderr.write("Found the device, but could not connect via" +
                             " port %r: %s\n" % (port, e))
            sys.stderr.write("I'm not sure what to suggest. :-(\n")
        input("Press ENTER to continue")
        sys.exit(1)
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def open(self, usb_bus, dev_adr):
    """Open the USB device at ``dev_adr`` on ``usb_bus`` and claim interface 0.

    The first bulk IN and first bulk OUT endpoint of the device's first
    setting are stored in ``self.usb_inp`` / ``self.usb_out``; the opened
    handle is stored in ``self.usb_dev``.

    Raises IOError with ENODEV when the device, its settings or the bulk
    endpoints are missing, and with EACCES, EBUSY or ENODEV when libusb
    reports the corresponding error while opening or claiming.
    """
    self.usb_dev = None
    self.usb_out = None
    self.usb_inp = None

    for dev in self.context.getDeviceList(skip_on_error=True):
        if ((dev.getBusNumber() == usb_bus and
             dev.getDeviceAddress() == dev_adr)):
            break
    else:
        # the loop ran to completion without finding a match
        log.error("no device {0} on bus {1}".format(dev_adr, usb_bus))
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    try:
        # builtin next() works on Python 2.6+ and 3, where the iterator
        # method .next() no longer exists
        first_setting = next(dev.iterSettings())
    except StopIteration:
        log.error("no usb configuration settings, please replug device")
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    def transfer_type(x):
        return x & libusb.TRANSFER_TYPE_MASK

    def endpoint_dir(x):
        return x & libusb.ENDPOINT_DIR_MASK

    # keep only the first bulk endpoint seen in each direction
    for endpoint in first_setting.iterEndpoints():
        ep_addr = endpoint.getAddress()
        ep_attr = endpoint.getAttributes()
        if transfer_type(ep_attr) == libusb.TRANSFER_TYPE_BULK:
            if endpoint_dir(ep_addr) == libusb.ENDPOINT_IN:
                if not self.usb_inp:
                    self.usb_inp = endpoint
            if endpoint_dir(ep_addr) == libusb.ENDPOINT_OUT:
                if not self.usb_out:
                    self.usb_out = endpoint

    if not (self.usb_inp and self.usb_out):
        log.error("no bulk endpoints for read and write")
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    try:
        # workaround the PN533's buggy USB implementation
        self._manufacturer_name = dev.getManufacturer()
        self._product_name = dev.getProduct()
    except libusb.USBErrorIO:
        self._manufacturer_name = None
        self._product_name = None

    try:
        self.usb_dev = dev.open()
        self.usb_dev.claimInterface(0)
    except libusb.USBErrorAccess:
        raise IOError(errno.EACCES, os.strerror(errno.EACCES))
    except libusb.USBErrorBusy:
        raise IOError(errno.EBUSY, os.strerror(errno.EBUSY))
    except libusb.USBErrorNoDevice:
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
    """Play ``data`` on the OSS device and check how long the write takes.

    The test is skipped when the device cannot be opened (EACCES, ENOENT,
    ENODEV, EBUSY). It also checks that the ``closed``, ``name`` and
    ``mode`` attributes are read-only, and that playback time is within
    10% of the duration computed from the sample parameters.
    """
    try:
        dsp = ossaudiodev.open('w')
    except IOError as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise

    # at least check that these methods can be invoked
    for method_name in ('bufsize', 'obufcount', 'obuffree', 'getptr', 'fileno'):
        getattr(dsp, method_name)()

    # Make sure the read-only attributes work.
    self.assertFalse(dsp.closed)
    self.assertEqual(dsp.name, "/dev/dsp")
    self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

    # And make sure they're really read-only.
    for attr in ('closed', 'name', 'mode'):
        try:
            setattr(dsp, attr, 42)
        except (TypeError, AttributeError):
            continue
        self.fail("dsp.%s not read-only" % attr)

    # Compute expected running time of sound sample (in seconds).
    bytes_per_sample = ssize/8
    expected_time = float(len(data)) / bytes_per_sample / nchannels / rate

    # set parameters based on .au file headers
    dsp.setparameters(AFMT_S16_NE, nchannels, rate)
    self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
    started = time.time()
    dsp.write(data)
    dsp.close()
    finished = time.time()
    elapsed_time = finished - started

    percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
    self.assertTrue(percent_diff <= 10.0,
                    "elapsed time (%s) > 10%% off of expected time (%s)" %
                    (elapsed_time, expected_time))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Play *data* on the OSS audio device and verify the playback time.

        data      -- raw sound sample handed to ``dsp.write``
        rate      -- sample rate passed to ``setparameters``
        ssize     -- sample size; floor-divided by 8 when computing the
                     duration (presumably a size in bits -- confirm)
        nchannels -- channel count passed to ``setparameters``

        The test is skipped when the device cannot be opened because of
        EACCES, ENOENT, ENODEV or EBUSY; any other open error propagates.
        """
        try:
            dsp = ossaudiodev.open('w')
        # "except ... as" is accepted by Python 2.6+ and by Python 3; the
        # older "except IOError, msg" spelling is a SyntaxError on Python 3.
        except IOError as msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.  Either exception type
        # counts as a refusal: which one is raised depends on the
        # interpreter version.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except (TypeError, AttributeError):
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize//8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        # The sample is expected to last about 3.51 seconds.
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        # Report both durations so a timing failure can be diagnosed.
        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Play *data* on the OSS audio device and verify the playback time.

        data      -- raw sound sample handed to ``dsp.write``
        rate      -- sample rate passed to ``setparameters``
        ssize     -- sample size; floor-divided by 8 when computing the
                     duration (presumably a size in bits -- confirm)
        nchannels -- channel count passed to ``setparameters``

        The test is skipped when the device cannot be opened because of
        EACCES, ENOENT, ENODEV or EBUSY; any other open error propagates.
        """
        try:
            dsp = ossaudiodev.open('w')
        # "except ... as" is accepted by Python 2.6+ and by Python 3; the
        # older "except IOError, msg" spelling is a SyntaxError on Python 3.
        except IOError as msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.  Either exception type
        # counts as a refusal: which one is raised depends on the
        # interpreter version.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except (TypeError, AttributeError):
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize//8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        # The sample is expected to last about 3.51 seconds.
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        # Report both durations so a timing failure can be diagnosed.
        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
项目:mock    作者:rpm-software-management    | 项目源码 | 文件源码
def rmtree(path, selinux=False, exclude=()):
    """Remove the directory tree rooted at *path*, tolerating common failures.

    A variant of shutil.rmtree that ignores no-such-file-or-directory
    errors, tries harder when it hits immutable files, and can leave
    selected paths in place.

    path    -- directory to remove; must not be a symbolic link
    selinux -- when true, an EPERM/EACCES failure triggers one
               ``chattr -R -i`` on *path* followed by another attempt
    exclude -- collection of full paths that must not be removed; a
               directory left non-empty because of them is kept silently

    Raises OSError when *path* is a symbolic link, or when removal still
    fails after the recovery attempts described above (an EBUSY failure is
    retried once after a two second pause).
    """
    # Imported here so the function does not rely on a module-level import.
    import subprocess

    if os.path.islink(path):
        raise OSError("Cannot call rmtree on a symbolic link")
    if path in exclude:
        return
    try_again = True
    retries = 0
    failed_to_handle = False
    failed_filename = None
    while try_again:
        try_again = False
        try:
            names = os.listdir(path)
            for name in names:
                fullname = os.path.join(path, name)
                if fullname not in exclude:
                    try:
                        mode = os.lstat(fullname).st_mode
                    except OSError:
                        # Could not stat the entry: treat it as a non-directory
                        # and let os.remove report the real problem.
                        mode = 0
                    if stat.S_ISDIR(mode):
                        try:
                            rmtree(fullname, selinux=selinux, exclude=exclude)
                        except OSError as e:
                            if e.errno in (errno.EPERM, errno.EACCES, errno.EBUSY):
                                # we already tried handling this on lower level and failed,
                                # there's no point in trying again now
                                failed_to_handle = True
                            raise
                    else:
                        os.remove(fullname)
            os.rmdir(path)
        except OSError as e:
            if failed_to_handle:
                raise
            if e.errno == errno.ENOENT:  # no such file or directory
                pass
            elif exclude and e.errno == errno.ENOTEMPTY:  # there's something excluded left
                pass
            elif selinux and (e.errno == errno.EPERM or e.errno == errno.EACCES):
                try_again = True
                # Failing twice on the same file means chattr did not help.
                if failed_filename == e.filename:
                    raise
                failed_filename = e.filename
                # Pass the path as a separate argv entry instead of splicing
                # it into a shell command line: paths containing spaces or
                # shell metacharacters are then handled correctly.
                try:
                    subprocess.call(["chattr", "-R", "-i", path])
                except OSError:
                    # chattr could not be started; stay best-effort like the
                    # shell invocation was and let the retry surface the
                    # original failure.
                    pass
            elif e.errno == errno.EBUSY:
                retries += 1
                if retries > 1:
                    raise
                try_again = True
                getLog().debug("retrying failed tree remove after sleeping a bit")
                time.sleep(2)
            else:
                raise
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Write *data* to the OSS audio device and check how long it takes.

        data      -- raw sound sample handed to ``dsp.write``
        rate      -- sample rate passed to ``setparameters``
        ssize     -- sample size; divided by 8 when computing the duration
                     (presumably a size in bits -- confirm against caller)
        nchannels -- channel count passed to ``setparameters``

        The test is skipped when the device cannot be opened because of
        EACCES, ENOENT, ENODEV or EBUSY; any other open error propagates.
        """
        skippable = (errno.EACCES, errno.ENOENT,
                     errno.ENODEV, errno.EBUSY)
        try:
            device = ossaudiodev.open('w')
        except IOError as err:
            if err.args[0] not in skippable:
                raise
            raise unittest.SkipTest(err)

        # Smoke test: these methods only have to be callable.
        for method_name in ('bufsize', 'obufcount', 'obuffree',
                            'getptr', 'fileno'):
            getattr(device, method_name)()

        # The informational attributes must report the expected values...
        self.assertFalse(device.closed)
        self.assertEqual(device.name, "/dev/dsp")
        self.assertEqual(device.mode, "w", "bad dsp.mode: %r" % device.mode)

        # ...and must refuse assignment.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(device, attr, 42)
            except (TypeError, AttributeError):
                continue
            self.fail("dsp.%s not read-only" % attr)

        # Expected running time of the sound sample, in seconds.
        bytes_per_sample = ssize/8
        expected_time = float(len(data)) / bytes_per_sample / nchannels / rate

        # Configure the device from the .au file header values.
        dsp_format = AFMT_S16_NE
        device.setparameters(dsp_format, nchannels, rate)
        # The sample is expected to last about 3.51 seconds.
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)

        # Time the write together with the close.
        started = time.time()
        device.write(data)
        device.close()
        elapsed_time = time.time() - started

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        failure_text = ("elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
        self.assertTrue(percent_diff <= 10.0, failure_text)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def set_memory_hardlimit(cgrp, limit):
    """Set both memory hard-limit values of cgroup *cgrp* to *limit*.

    Two values are written: ``memory.limit_in_bytes`` and
    ``memory.memsw.limit_in_bytes``.  The order in which they have to be
    written depends on whether the limit is being lowered or raised, so the
    "lowering" order is attempted first and, if it is rejected with EINVAL,
    the "raising" order is attempted next.

    Raises TreadmillCgroupError when a write fails with EBUSY (the cgroup
    already uses more memory than the requested limit).  Any other IOError,
    including EINVAL from the last order tried, is re-raised unchanged.
    """
    write_orders = (
        # Lowering the limit: memory first, then memory+swap.
        ('memory.limit_in_bytes', 'memory.memsw.limit_in_bytes'),
        # Raising the limit: memory+swap first, then memory.
        ('memory.memsw.limit_in_bytes', 'memory.limit_in_bytes'),
    )
    final_attempt = len(write_orders) - 1

    for attempt, knobs in enumerate(write_orders):
        try:
            for knob in knobs:
                cgroups.set_value('memory', cgrp, knob, limit)
            return

        except IOError as err:
            if err.errno == errno.EBUSY:
                # Resource busy: the cgroup is already above the limit we
                # are trying to set.
                raise TreadmillCgroupError('Unable to set hard limit to %d. '
                                           'Cgroup %r memory over limit.' %
                                           (limit, cgrp))

            if err.errno == errno.EINVAL and attempt < final_attempt:
                # Wrong order for this change; another order is left to try.
                continue

            # Nothing left to try, or an unrelated failure.
            raise
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Play *data* on the OSS audio device and verify the playback time.

        data      -- raw sound sample handed to ``dsp.write``
        rate      -- sample rate passed to ``setparameters``
        ssize     -- sample size; floor-divided by 8 when computing the
                     duration (presumably a size in bits -- confirm)
        nchannels -- channel count passed to ``setparameters``

        The test is skipped when the device cannot be opened because of
        EACCES, ENOENT, ENODEV or EBUSY; any other open error propagates.
        """
        try:
            dsp = ossaudiodev.open('w')
        # "except ... as" is accepted by Python 2.6+ and by Python 3; the
        # older "except IOError, msg" spelling is a SyntaxError on Python 3.
        except IOError as msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.  Either exception type
        # counts as a refusal: which one is raised depends on the
        # interpreter version.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except (TypeError, AttributeError):
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize//8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        # The sample is expected to last about 3.51 seconds.
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        # Report both durations so a timing failure can be diagnosed.
        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Write *data* to the OSS audio device and check how long it takes.

        data      -- raw sound sample handed to ``dsp.write``
        rate      -- sample rate passed to ``setparameters``
        ssize     -- sample size; divided by 8 when computing the duration
                     (presumably a size in bits -- confirm against caller)
        nchannels -- channel count passed to ``setparameters``

        The test is skipped when the device cannot be opened because of
        EACCES, ENOENT, ENODEV or EBUSY; any other open error propagates.
        """
        skippable = (errno.EACCES, errno.ENOENT,
                     errno.ENODEV, errno.EBUSY)
        try:
            device = ossaudiodev.open('w')
        except OSError as err:
            if err.args[0] not in skippable:
                raise
            raise unittest.SkipTest(err)

        # Smoke test: these methods only have to be callable.
        for method_name in ('bufsize', 'obufcount', 'obuffree',
                            'getptr', 'fileno'):
            getattr(device, method_name)()

        # The informational attributes must report the expected values...
        self.assertFalse(device.closed)
        self.assertEqual(device.name, "/dev/dsp")
        self.assertEqual(device.mode, "w", "bad dsp.mode: %r" % device.mode)

        # ...and must refuse assignment.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(device, attr, 42)
            except (TypeError, AttributeError):
                continue
            self.fail("dsp.%s not read-only" % attr)

        # Expected running time of the sound sample, in seconds.
        bytes_per_sample = ssize/8
        expected_time = float(len(data)) / bytes_per_sample / nchannels / rate

        # Configure the device from the .au file header values.
        dsp_format = AFMT_S16_NE
        device.setparameters(dsp_format, nchannels, rate)
        # The sample is expected to last about 3.51 seconds.
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)

        # Time the write together with the close.
        started = time.time()
        device.write(data)
        device.close()
        elapsed_time = time.time() - started

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        failure_text = ("elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
        self.assertTrue(percent_diff <= 10.0, failure_text)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Play *data* on the OSS audio device and verify the playback time.

        data      -- raw sound sample handed to ``dsp.write``
        rate      -- sample rate passed to ``setparameters``
        ssize     -- sample size; floor-divided by 8 when computing the
                     duration (presumably a size in bits -- confirm)
        nchannels -- channel count passed to ``setparameters``

        The test is skipped when the device cannot be opened because of
        EACCES, ENOENT, ENODEV or EBUSY; any other open error propagates.
        """
        try:
            dsp = ossaudiodev.open('w')
        # "except ... as" is accepted by Python 2.6+ and by Python 3; the
        # older "except IOError, msg" spelling is a SyntaxError on Python 3.
        except IOError as msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.  Either exception type
        # counts as a refusal: which one is raised depends on the
        # interpreter version.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except (TypeError, AttributeError):
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize//8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        # The sample is expected to last about 3.51 seconds.
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        # Report both durations so a timing failure can be diagnosed.
        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Write *data* to the OSS audio device and check how long it takes.

        data      -- raw sound sample handed to ``dsp.write``
        rate      -- sample rate passed to ``setparameters``
        ssize     -- sample size; divided by 8 when computing the duration
                     (presumably a size in bits -- confirm against caller)
        nchannels -- channel count passed to ``setparameters``

        The test is skipped when the device cannot be opened because of
        EACCES, ENOENT, ENODEV or EBUSY; any other open error propagates.
        """
        skippable = (errno.EACCES, errno.ENOENT,
                     errno.ENODEV, errno.EBUSY)
        try:
            device = ossaudiodev.open('w')
        except OSError as err:
            if err.args[0] not in skippable:
                raise
            raise unittest.SkipTest(err)

        # Smoke test: these methods only have to be callable.
        for method_name in ('bufsize', 'obufcount', 'obuffree',
                            'getptr', 'fileno'):
            getattr(device, method_name)()

        # The informational attributes must report the expected values...
        self.assertFalse(device.closed)
        self.assertEqual(device.name, "/dev/dsp")
        self.assertEqual(device.mode, "w", "bad dsp.mode: %r" % device.mode)

        # ...and must refuse assignment.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(device, attr, 42)
            except (TypeError, AttributeError):
                continue
            self.fail("dsp.%s not read-only" % attr)

        # Expected running time of the sound sample, in seconds.
        bytes_per_sample = ssize/8
        expected_time = float(len(data)) / bytes_per_sample / nchannels / rate

        # Configure the device from the .au file header values.
        dsp_format = AFMT_S16_NE
        device.setparameters(dsp_format, nchannels, rate)
        # The sample is expected to last about 3.51 seconds.
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)

        # Time the write together with the close.
        started = time.time()
        device.write(data)
        device.close()
        elapsed_time = time.time() - started

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        failure_text = ("elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
        self.assertTrue(percent_diff <= 10.0, failure_text)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def remove(self, pkgplan):
                """Remove this action's installed directory from the image.

                The path comes from get_installed_path() applied to the
                image root of 'pkgplan'.  os.rmdir() is attempted and its
                failures are handled by errno:

                  ENOENT             ignored (nothing to remove)
                  EEXIST, ENOTEMPTY  directory not empty; contents are
                                     handed to pkgplan.salvage()
                  ENOTDIR            path is no longer a directory; it is
                                     handed to pkgplan.salvage()
                  EBUSY              raises apx.ActionExecutionError unless
                                     the action has the "implicit"
                                     attribute, in which case it is ignored
                  EACCES             ignored
                  anything else      re-raised
                """
                path = self.get_installed_path(pkgplan.image.get_root())
                try:
                        os.rmdir(path)
                except OSError as e:
                        if e.errno == errno.ENOENT:
                                # Already gone; nothing to do.
                                pass
                        elif e.errno in (errno.EEXIST, errno.ENOTEMPTY):
                                # Cannot remove directory since it's
                                # not empty.
                                pkgplan.salvage(path)
                        elif e.errno == errno.ENOTDIR:
                                # Either the user or another package has changed
                                # this directory into a link or file.  Salvage
                                # what's there and drive on.
                                pkgplan.salvage(path)
                        elif e.errno == errno.EBUSY and os.path.ismount(path):
                                # User has replaced directory with mountpoint,
                                # or a package has been poorly implemented.
                                # Implicit directories are skipped silently.
                                if not self.attrs.get("implicit"):
                                        err_txt = _("Unable to remove {0}; it is "
                                            "in use as a mountpoint. To "
                                            "continue, please unmount the "
                                            "filesystem at the target "
                                            "location and try again.").format(
                                            path)
                                        raise apx.ActionExecutionError(self,
                                            details=err_txt, error=e,
                                            fmri=pkgplan.origin_fmri) 
                        elif e.errno == errno.EBUSY:
                                # os.path.ismount() is broken for lofs
                                # filesystems, so give a more generic
                                # error.
                                # Implicit directories are skipped silently.
                                if not self.attrs.get("implicit"):
                                        err_txt = _("Unable to remove {0}; it "
                                            "is in use by the system, another "
                                            "process, or as a "
                                            "mountpoint.").format(path)
                                        raise apx.ActionExecutionError(self,
                                            details=err_txt, error=e,
                                            fmri=pkgplan.origin_fmri)
                        # EACCES is tolerated; every other errno propagates.
                        elif e.errno != errno.EACCES: # this happens on Windows
                                raise