Python errno 模块,EOPNOTSUPP 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 errno.EOPNOTSUPP。

项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def queryWirelessDevice(self,iface):
    """Report whether ``iface`` responds like a wireless network interface.

    Returns False when the optional ``pythonwifi`` package cannot be
    imported, or when querying the access-point address fails with
    EOPNOTSUPP, ENODEV or EPERM.  Any other IOError is printed and the
    interface is still reported as wireless (True), as is a successful
    query.
    """
    try:
        from pythonwifi.iwlibs import Wireless
        import errno
    except ImportError:
        return False

    try:
        # Only the success or failure of the query matters, not its result.
        Wireless(iface).getAPaddr()
    except IOError as err:
        # Read errno/strerror from the exception object instead of
        # tuple-unpacking it in the except clause: that form is Python 2
        # only and fails unless the exception carries exactly two args.
        error_no, error_str = err.errno, err.strerror
        if error_no in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
            return False
        # Same output as the former ``print "error: ",error_no,error_str``.
        print("error:  %s %s" % (error_no, error_str))
        return True
    return True
项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def queryWirelessDevice(self,iface):
    """Report whether ``iface`` responds like a wireless network interface.

    Returns False when the optional ``pythonwifi`` package cannot be
    imported, or when querying the access-point address fails with
    EOPNOTSUPP, ENODEV or EPERM.  Any other IOError is printed and the
    interface is still reported as wireless (True), as is a successful
    query.
    """
    try:
        from pythonwifi.iwlibs import Wireless
        import errno
    except ImportError:
        return False

    try:
        # Only the success or failure of the query matters, not its result.
        Wireless(iface).getAPaddr()
    except IOError as err:
        # Read errno/strerror from the exception object instead of
        # tuple-unpacking it in the except clause: that form is Python 2
        # only and fails unless the exception carries exactly two args.
        error_no, error_str = err.errno, err.strerror
        if error_no in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
            return False
        # Same output as the former ``print "error: ",error_no,error_str``.
        print("error:  %s %s" % (error_no, error_str))
        return True
    return True
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_listen(self, llc, ldl, dlc, bind):
    """Check llc.listen() argument validation and the resulting LISTEN state."""
    # An arbitrary object is not a socket.
    with pytest.raises(nfc.llcp.Error) as not_a_socket:
        llc.listen(object(), 0)
    assert not_a_socket.value.errno == errno.ENOTSOCK

    # Listening on the `ldl` socket is expected to be unsupported.
    with pytest.raises(nfc.llcp.Error) as unsupported:
        llc.listen(ldl, 0)
    assert unsupported.value.errno == errno.EOPNOTSUPP

    # The backlog must be an int ...
    with pytest.raises(TypeError) as bad_type:
        llc.listen(dlc, 0.1)
    assert str(bad_type.value) == "backlog must be int type"

    # ... and must not be negative.
    with pytest.raises(ValueError) as bad_value:
        llc.listen(dlc, -1)
    assert str(bad_value.value) == "backlog can not be negative"

    # Listening must succeed whether or not the socket was bound first.
    if bind:
        llc.bind(dlc)
    llc.listen(dlc, 0)
    assert dlc.state.LISTEN is True
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_accept_connect(self, llc, ldl, dlc, peer_miu, send_miu):
    """Check llc.accept() error handling and acceptance of a dispatched Connect PDU."""
    # An arbitrary object is not a socket.
    with pytest.raises(nfc.llcp.Error) as not_a_socket:
        llc.accept(object())
    assert not_a_socket.value.errno == errno.ENOTSOCK

    # Accepting on the `ldl` socket is expected to be unsupported.
    with pytest.raises(nfc.llcp.Error) as unsupported:
        llc.accept(ldl)
    assert unsupported.value.errno == errno.EOPNOTSUPP

    # Accepting on `dlc` before bind/listen is rejected with EINVAL.
    with pytest.raises(nfc.llcp.Error) as not_listening:
        llc.accept(dlc)
    assert not_listening.value.errno == errno.EINVAL

    # Schedule delivery of a Connect PDU shortly after the timer starts,
    # then bind, listen and accept on the socket.
    connect_pdu = nfc.llcp.pdu.Connect(4, 32, peer_miu)
    delayed_dispatch = threading.Timer(0.01, llc.dispatch, (connect_pdu,))
    delayed_dispatch.start()
    llc.bind(dlc, b'urn:nfc:sn:snep')
    llc.listen(dlc, 0)
    accepted = llc.accept(dlc)

    assert isinstance(accepted, nfc.llcp.tco.DataLinkConnection)
    assert llc.getsockopt(accepted, nfc.llcp.SO_SNDMIU) == send_miu
    assert llc.getpeername(accepted) == 32
    assert llc.getsockname(accepted) == 4
项目:enigma2    作者:Openeight    | 项目源码 | 文件源码
def queryWirelessDevice(self,iface):
    """Report whether ``iface`` responds like a wireless network interface.

    Returns False when the optional ``pythonwifi`` package cannot be
    imported, or when querying the access-point address fails with
    EOPNOTSUPP, ENODEV or EPERM.  Any other IOError is printed and the
    interface is still reported as wireless (True), as is a successful
    query.
    """
    try:
        from pythonwifi.iwlibs import Wireless
        import errno
    except ImportError:
        return False

    try:
        # Only the success or failure of the query matters, not its result.
        Wireless(iface).getAPaddr()
    except IOError as err:
        # Read errno/strerror from the exception object instead of
        # tuple-unpacking it in the except clause: that form is Python 2
        # only and fails unless the exception carries exactly two args.
        error_no, error_str = err.errno, err.strerror
        if error_no in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
            return False
        # Same output as the former ``print "error: ",error_no,error_str``.
        print("error:  %s %s" % (error_no, error_str))
        return True
    return True
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_copystat_handles_harmless_chflags_errors(self):
    """copystat() must ignore EOPNOTSUPP/ENOTSUP from os.chflags and nothing else."""
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'file1')
    dst = os.path.join(workdir, 'file2')
    for path in (src, dst):
        self.write_file(path, 'xxx')

    def make_chflags_raiser(code):
        """Build an os.chflags stand-in that always fails with errno ``code``."""
        failure = OSError()

        def _chflags_raiser(path, flags):
            failure.errno = code
            raise failure
        return _chflags_raiser

    saved_chflags = os.chflags
    try:
        # Neither "not supported" code may make copystat() fail.
        for code in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(code)
            shutil.copystat(src, dst)
        # The sum of the two codes serves as an errno that is neither of
        # them; such an error must propagate.
        unrelated_code = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(unrelated_code)
        self.assertRaises(OSError, shutil.copystat, src, dst)
    finally:
        # Always undo the monkeypatch.
        os.chflags = saved_chflags
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test_chflags_regular_file(self, chflags_func, target_file):
    """Check that ``chflags_func`` can set UF_IMMUTABLE on ``target_file``.

    The test is skipped when the call fails with EOPNOTSUPP.  The file's
    original flags are restored afterwards via ``posix.chflags``.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))

    immutable_flags = st.st_flags | stat.UF_IMMUTABLE
    # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    try:
        chflags_func(target_file, immutable_flags)
    except OSError as err:
        if err.errno != errno.EOPNOTSUPP:
            raise
        msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
        self.skipTest(msg)

    try:
        new_st = os.stat(target_file)
        self.assertEqual(immutable_flags, new_st.st_flags)
        try:
            fd = open(target_file, 'w+')
        except IOError as e:
            self.assertEqual(e.errno, errno.EPERM)
        else:
            # The open is not required to fail; when it succeeds, close
            # the handle instead of leaking it.
            fd.close()
    finally:
        # Restore the original flags.
        posix.chflags(target_file, st.st_flags)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_copystat_handles_harmless_chflags_errors(self):
    """copystat() must ignore EOPNOTSUPP/ENOTSUP from os.chflags and nothing else."""
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'file1')
    dst = os.path.join(workdir, 'file2')
    for path in (src, dst):
        self.write_file(path, 'xxx')

    def make_chflags_raiser(code):
        """Build an os.chflags stand-in that always fails with errno ``code``."""
        failure = OSError()

        def _chflags_raiser(path, flags):
            failure.errno = code
            raise failure
        return _chflags_raiser

    saved_chflags = os.chflags
    try:
        # Neither "not supported" code may make copystat() fail.
        for code in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(code)
            shutil.copystat(src, dst)
        # The sum of the two codes serves as an errno that is neither of
        # them; such an error must propagate.
        unrelated_code = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(unrelated_code)
        self.assertRaises(OSError, shutil.copystat, src, dst)
    finally:
        # Always undo the monkeypatch.
        os.chflags = saved_chflags
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test_chflags_regular_file(self, chflags_func, target_file):
    """Check that ``chflags_func`` can set UF_IMMUTABLE on ``target_file``.

    The test is skipped when the call fails with EOPNOTSUPP.  The file's
    original flags are restored afterwards via ``posix.chflags``.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))

    immutable_flags = st.st_flags | stat.UF_IMMUTABLE
    # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    try:
        chflags_func(target_file, immutable_flags)
    except OSError as err:
        if err.errno != errno.EOPNOTSUPP:
            raise
        msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
        self.skipTest(msg)

    try:
        new_st = os.stat(target_file)
        self.assertEqual(immutable_flags, new_st.st_flags)
        try:
            fd = open(target_file, 'w+')
        except IOError as e:
            self.assertEqual(e.errno, errno.EPERM)
        else:
            # The open is not required to fail; when it succeeds, close
            # the handle instead of leaking it.
            fd.close()
    finally:
        # Restore the original flags.
        posix.chflags(target_file, st.st_flags)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_copystat_handles_harmless_chflags_errors(self):
    """copystat() must ignore EOPNOTSUPP/ENOTSUP from os.chflags and nothing else."""
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'file1')
    dst = os.path.join(workdir, 'file2')
    for path in (src, dst):
        write_file(path, 'xxx')

    def make_chflags_raiser(code):
        """Build an os.chflags stand-in that always fails with errno ``code``."""
        failure = OSError()

        def _chflags_raiser(path, flags, *, follow_symlinks=True):
            failure.errno = code
            raise failure
        return _chflags_raiser

    saved_chflags = os.chflags
    try:
        # Neither "not supported" code may make copystat() fail.
        for code in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(code)
            shutil.copystat(src, dst)
        # The sum of the two codes serves as an errno that is neither of
        # them; such an error must propagate.
        unrelated_code = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(unrelated_code)
        self.assertRaises(OSError, shutil.copystat, src, dst)
    finally:
        # Always undo the monkeypatch.
        os.chflags = saved_chflags
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
    """Check that ``chflags_func`` can set UF_IMMUTABLE on ``target_file``.

    Extra keyword arguments are forwarded to ``chflags_func``.  The test
    is skipped when the call fails with EOPNOTSUPP.  The file's original
    flags are restored afterwards via ``posix.chflags``.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))

    # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    flags = st.st_flags | stat.UF_IMMUTABLE
    try:
        chflags_func(target_file, flags, **kwargs)
    except OSError as err:
        if err.errno != errno.EOPNOTSUPP:
            raise
        msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
        self.skipTest(msg)

    try:
        new_st = os.stat(target_file)
        self.assertEqual(flags, new_st.st_flags)
        try:
            fd = open(target_file, 'w+')
        except IOError as e:
            self.assertEqual(e.errno, errno.EPERM)
        else:
            # The open is not required to fail; when it succeeds, close
            # the handle instead of leaking it.
            fd.close()
    finally:
        # Restore the original flags.
        posix.chflags(target_file, st.st_flags)
项目:enigma2    作者:BlackHole    | 项目源码 | 文件源码
def queryWirelessDevice(self,iface):
    """Report whether ``iface`` responds like a wireless network interface.

    Returns False when the optional ``pythonwifi`` package cannot be
    imported, or when querying the access-point address fails with
    EOPNOTSUPP, ENODEV or EPERM.  Any other IOError is printed and the
    interface is still reported as wireless (True), as is a successful
    query.
    """
    try:
        from pythonwifi.iwlibs import Wireless
        import errno
    except ImportError:
        return False

    try:
        # Only the success or failure of the query matters, not its result.
        Wireless(iface).getAPaddr()
    except IOError as err:
        # Read errno/strerror from the exception object instead of
        # tuple-unpacking it in the except clause: that form is Python 2
        # only and fails unless the exception carries exactly two args.
        error_no, error_str = err.errno, err.strerror
        if error_no in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
            return False
        # Same output as the former comma-separated print statement.
        print("[AdapterSetupConfiguration] error:  %s %s" % (error_no, error_str))
        return True
    return True
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_copystat_handles_harmless_chflags_errors(self):
    """copystat() must ignore EOPNOTSUPP/ENOTSUP from os.chflags and nothing else."""
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'file1')
    dst = os.path.join(workdir, 'file2')
    for path in (src, dst):
        self.write_file(path, 'xxx')

    def make_chflags_raiser(code):
        """Build an os.chflags stand-in that always fails with errno ``code``."""
        failure = OSError()

        def _chflags_raiser(path, flags):
            failure.errno = code
            raise failure
        return _chflags_raiser

    saved_chflags = os.chflags
    try:
        # Neither "not supported" code may make copystat() fail.
        for code in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(code)
            shutil.copystat(src, dst)
        # The sum of the two codes serves as an errno that is neither of
        # them; such an error must propagate.
        unrelated_code = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(unrelated_code)
        self.assertRaises(OSError, shutil.copystat, src, dst)
    finally:
        # Always undo the monkeypatch.
        os.chflags = saved_chflags
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _test_chflags_regular_file(self, chflags_func, target_file):
    """Check that ``chflags_func`` can set UF_IMMUTABLE on ``target_file``.

    The test is skipped when the call fails with EOPNOTSUPP.  The file's
    original flags are restored afterwards via ``posix.chflags``.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))

    immutable_flags = st.st_flags | stat.UF_IMMUTABLE
    # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    try:
        chflags_func(target_file, immutable_flags)
    except OSError as err:
        if err.errno != errno.EOPNOTSUPP:
            raise
        msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
        self.skipTest(msg)

    try:
        new_st = os.stat(target_file)
        self.assertEqual(immutable_flags, new_st.st_flags)
        try:
            fd = open(target_file, 'w+')
        except IOError as e:
            self.assertEqual(e.errno, errno.EPERM)
        else:
            # The open is not required to fail; when it succeeds, close
            # the handle instead of leaking it.
            fd.close()
    finally:
        # Restore the original flags.
        posix.chflags(target_file, st.st_flags)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_copystat_handles_harmless_chflags_errors(self):
    """copystat() must ignore EOPNOTSUPP/ENOTSUP from os.chflags and nothing else."""
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'file1')
    dst = os.path.join(workdir, 'file2')
    for path in (src, dst):
        write_file(path, 'xxx')

    def make_chflags_raiser(code):
        """Build an os.chflags stand-in that always fails with errno ``code``."""
        failure = OSError()

        def _chflags_raiser(path, flags, *, follow_symlinks=True):
            failure.errno = code
            raise failure
        return _chflags_raiser

    saved_chflags = os.chflags
    try:
        # Neither "not supported" code may make copystat() fail.
        for code in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(code)
            shutil.copystat(src, dst)
        # The sum of the two codes serves as an errno that is neither of
        # them; such an error must propagate.
        unrelated_code = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(unrelated_code)
        self.assertRaises(OSError, shutil.copystat, src, dst)
    finally:
        # Always undo the monkeypatch.
        os.chflags = saved_chflags
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
    """Check that ``chflags_func`` can set UF_IMMUTABLE on ``target_file``.

    Extra keyword arguments are forwarded to ``chflags_func``.  The test
    is skipped when the call fails with EOPNOTSUPP.  The file's original
    flags are restored afterwards via ``posix.chflags``.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))

    # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    flags = st.st_flags | stat.UF_IMMUTABLE
    try:
        chflags_func(target_file, flags, **kwargs)
    except OSError as err:
        if err.errno != errno.EOPNOTSUPP:
            raise
        msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
        self.skipTest(msg)

    try:
        new_st = os.stat(target_file)
        self.assertEqual(flags, new_st.st_flags)
        try:
            fd = open(target_file, 'w+')
        except OSError as e:
            self.assertEqual(e.errno, errno.EPERM)
        else:
            # The open is not required to fail; when it succeeds, close
            # the handle instead of leaking it.
            fd.close()
    finally:
        # Restore the original flags.
        posix.chflags(target_file, st.st_flags)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_copystat_handles_harmless_chflags_errors(self):
    """copystat() must ignore EOPNOTSUPP/ENOTSUP from os.chflags and nothing else."""
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'file1')
    dst = os.path.join(workdir, 'file2')
    for path in (src, dst):
        self.write_file(path, 'xxx')

    def make_chflags_raiser(code):
        """Build an os.chflags stand-in that always fails with errno ``code``."""
        failure = OSError()

        def _chflags_raiser(path, flags):
            failure.errno = code
            raise failure
        return _chflags_raiser

    saved_chflags = os.chflags
    try:
        # Neither "not supported" code may make copystat() fail.
        for code in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(code)
            shutil.copystat(src, dst)
        # The sum of the two codes serves as an errno that is neither of
        # them; such an error must propagate.
        unrelated_code = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(unrelated_code)
        self.assertRaises(OSError, shutil.copystat, src, dst)
    finally:
        # Always undo the monkeypatch.
        os.chflags = saved_chflags
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def _test_chflags_regular_file(self, chflags_func, target_file):
    """Check that ``chflags_func`` can set UF_IMMUTABLE on ``target_file``.

    The test is skipped when the call fails with EOPNOTSUPP.  The file's
    original flags are restored afterwards via ``posix.chflags``.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))

    immutable_flags = st.st_flags | stat.UF_IMMUTABLE
    # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    try:
        chflags_func(target_file, immutable_flags)
    except OSError as err:
        if err.errno != errno.EOPNOTSUPP:
            raise
        msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
        self.skipTest(msg)

    try:
        new_st = os.stat(target_file)
        self.assertEqual(immutable_flags, new_st.st_flags)
        try:
            fd = open(target_file, 'w+')
        except IOError as e:
            self.assertEqual(e.errno, errno.EPERM)
        else:
            # The open is not required to fail; when it succeeds, close
            # the handle instead of leaking it.
            fd.close()
    finally:
        # Restore the original flags.
        posix.chflags(target_file, st.st_flags)
项目:rdiff-backup    作者:sol1    | 项目源码 | 文件源码
def set_hardlinks(self, testdir):
    """Probe ``testdir`` for hard link support and record it in self.hardlinks.

    self.hardlinks becomes 1 when a hard link can be created and shares
    its inode with the source file, and None otherwise.
    """
    link_origin = testdir.append("hardlinked_file1")
    link_parent = testdir.append("hl")
    link_parent.mkdir()
    link_copy = link_parent.append("hardlinked_file2")
    link_origin.touch()
    try:
        link_copy.hardlink(link_origin.path)
        same_inode = link_origin.getinode() == link_copy.getinode()
        if not same_inode:
            # A "link" with a different inode is treated as unsupported.
            raise IOError(errno.EOPNOTSUPP, "Hard links don't compare")
    except (IOError, OSError, AttributeError):
        # Only warn when hard link preservation has not been switched off.
        if Globals.preserve_hardlinks != 0:
            log.Log("Warning: hard linking not supported by filesystem "
                    "at %s" % (self.root_rp.path,), 3)
        self.hardlinks = None
        return
    self.hardlinks = 1
项目:rdiff-backup    作者:sol1    | 项目源码 | 文件源码
def clear_rp(self, rp):
        """Delete all the extended attributes in rpath

        Attributes are listed and removed through rp.conn.xattr.  An
        attribute whose removal fails with EACCES is logged and skipped.
        If the operation fails with EOPNOTSUPP or EPERM the function
        returns silently; on ENOENT it logs a warning and returns.  Any
        other IOError is re-raised.
        """
        try:
            for name in rp.conn.xattr.listxattr(encode(rp.path),
                                                rp.issym()):
                try:
                    rp.conn.xattr.removexattr(encode(rp.path),
                                            name, rp.issym())
                except IOError, exc:
                    # SELinux attributes cannot be removed, and we don't want
                    # to bail out or be too noisy at low log levels.
                    if exc[0] == errno.EACCES:
                        log.Log("Warning: unable to remove xattr %s from %s"
                            % (name, repr(rp.path)), 7)
                        continue
                    # Re-raised errors leave the loop and are examined
                    # again by the outer handler below.
                    else: raise
        except IOError, exc:
            # Reached for failures of listxattr as well as for removal
            # errors re-raised by the inner handler.
            if exc[0] == errno.EOPNOTSUPP or exc[0] == errno.EPERM:
                return # if not supported, consider empty
            elif exc[0] == errno.ENOENT: # path is bad
                log.Log("Warning: unable to clear xattrs on %s: %s" %
                        (repr(rp.path), exc), 3)
                return
            else: raise
项目:rdiff-backup    作者:sol1    | 项目源码 | 文件源码
def write_to_rp(self, rp):
        """Write extended attributes to rpath rp"""
        self.clear_rp(rp)
        for (name, value) in self.attr_dict.iteritems():
            try:
                rp.conn.xattr.setxattr(encode(rp.path), name,
                                        value, 0, rp.issym())
            except IOError, exc:
                # Mac and Linux attributes have different namespaces, so
                # fail gracefully if can't call setxattr
                if exc[0] in (errno.EOPNOTSUPP, errno.EPERM, errno.EACCES,
                        errno.ENOENT, errno.EINVAL):
                    log.Log("Warning: unable to write xattr %s to %s"
                            % (name, repr(rp.path)), 6)
                    continue
                else: raise
项目:rdiff-backup    作者:sol1    | 项目源码 | 文件源码
def set_rp_acl(rp, entry_list = None, default_entry_list = None,
               map_names = 1):
    """Set given rp with ACL that acl_text defines.  rp should be local"""
    assert rp.conn is Globals.local_connection
    if entry_list: acl = list_to_acl(entry_list, map_names)
    else: acl = posix1e.ACL()

    try:
        acl.applyto(encode(rp.path))
    except IOError, exc:
        if exc[0] == errno.EOPNOTSUPP:
            log.Log("Warning: unable to set ACL on %s: %s" % 
                    (repr(rp.path), exc), 4)
            return
        else: raise

    if rp.isdir():
        if default_entry_list:
            def_acl = list_to_acl(default_entry_list, map_names)
        else: def_acl = posix1e.ACL()
        def_acl.applyto(encode(rp.path), posix1e.ACL_TYPE_DEFAULT)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_copystat_handles_harmless_chflags_errors(self):
    """copystat() must ignore EOPNOTSUPP/ENOTSUP from os.chflags and nothing else."""
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'file1')
    dst = os.path.join(workdir, 'file2')
    for path in (src, dst):
        write_file(path, 'xxx')

    def make_chflags_raiser(code):
        """Build an os.chflags stand-in that always fails with errno ``code``."""
        failure = OSError()

        def _chflags_raiser(path, flags, *, follow_symlinks=True):
            failure.errno = code
            raise failure
        return _chflags_raiser

    saved_chflags = os.chflags
    try:
        # Neither "not supported" code may make copystat() fail.
        for code in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(code)
            shutil.copystat(src, dst)
        # The sum of the two codes serves as an errno that is neither of
        # them; such an error must propagate.
        unrelated_code = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(unrelated_code)
        self.assertRaises(OSError, shutil.copystat, src, dst)
    finally:
        # Always undo the monkeypatch.
        os.chflags = saved_chflags
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
    """Check that ``chflags_func`` can set UF_IMMUTABLE on ``target_file``.

    Extra keyword arguments are forwarded to ``chflags_func``.  The test
    is skipped when the call fails with EOPNOTSUPP.  The file's original
    flags are restored afterwards via ``posix.chflags``.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))

    # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    flags = st.st_flags | stat.UF_IMMUTABLE
    try:
        chflags_func(target_file, flags, **kwargs)
    except OSError as err:
        if err.errno != errno.EOPNOTSUPP:
            raise
        msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
        self.skipTest(msg)

    try:
        new_st = os.stat(target_file)
        self.assertEqual(flags, new_st.st_flags)
        try:
            fd = open(target_file, 'w+')
        except OSError as e:
            self.assertEqual(e.errno, errno.EPERM)
        else:
            # The open is not required to fail; when it succeeds, close
            # the handle instead of leaking it.
            fd.close()
    finally:
        # Restore the original flags.
        posix.chflags(target_file, st.st_flags)
项目:rbd2qcow2    作者:socketpair    | 项目源码 | 文件源码
def do_fallocate(filename: str, size: int):
    """Call fallocate() with FALLOC_FL_KEEP_SIZE on ``filename`` for ``size``.

    The file is opened read/write and always closed again.  If the call
    fails with EOPNOTSUPP a warning is logged and the step is skipped;
    any other OSError is re-raised.
    """
    log.debug('Fallocating %s for size %d.', filename, size)
    handle = os.open(filename, os.O_RDWR)
    try:
        # os.posix_fallocate is deliberately not used here because it:
        # 1. does not support FALLOC_FL_KEEP_SIZE
        # 2. has bug: http://bugs.python.org/issue31106
        # 3. will emulate instead of raising of error if operation is not
        #    supported (see libc docs on posix_fallocate)
        fallocate(handle, FALLOC_FL_KEEP_SIZE, 0, size)
    except OSError as exc:
        if exc.errno == errno.EOPNOTSUPP:
            log.warning('Fallocate is not supported on your FS. Skipped.')
        else:
            raise
    finally:
        os.close(handle)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        # ``except ... as`` is accepted by Python 2.6+ and Python 3 alike,
        # unlike the comma form it replaces.
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:LFISuite    作者:D35m0nd142    | 项目源码 | 文件源码
def bind(self, *pos, **kw):
    """
    Bind the socket, setting up the proxy's UDP relay when required.

    Sockets without a proxy type, or that are not datagram sockets, are
    bound directly.  Otherwise the socket is bound locally, a UDP
    ASSOCIATE request is sent to the proxy over a separate connection,
    and the socket is connected to the relay port the proxy returned.
    Raises socket.error with EINVAL when a proxy connection already
    exists, and with EOPNOTSUPP when the proxy type is not SOCKS5.
    """
    proxy_type, proxy_addr, proxy_port, rdns, username, password = self.proxy
    needs_udp_relay = proxy_type and self.type == socket.SOCK_DGRAM
    if not needs_udp_relay:
        return _orig_socket.bind(self, *pos, **kw)

    if self._proxyconn:
        raise socket.error(EINVAL, "Socket already bound to an address")
    if proxy_type != SOCKS5:
        raise socket.error(EOPNOTSUPP,
                           "UDP only supported by SOCKS5 proxy type")
    super(socksocket, self).bind(*pos, **kw)

    # Need to specify actual local port because
    # some relays drop packets if a port of zero is specified.
    # Avoid specifying host address in case of NAT though.
    _, local_port = self.getsockname()
    association_target = ("0", local_port)

    self._proxyconn = _orig_socket()
    proxy_endpoint = self._proxy_addr()
    self._proxyconn.connect(proxy_endpoint)

    udp_associate_cmd = b"\x03"
    _, relay_endpoint = self._SOCKS5_request(
        self._proxyconn, udp_associate_cmd, association_target)

    # The relay is most likely on the same host as the SOCKS proxy,
    # but some proxies return a private IP address (10.x.y.z)
    relay_host, _ = proxy_endpoint
    _, relay_port = relay_endpoint
    super(socksocket, self).connect((relay_host, relay_port))
    super(socksocket, self).settimeout(self._timeout)
    self.proxy_sockname = ("0.0.0.0", 0)  # Unknown
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is copied only when the platform provides the matching
    ``os`` function.  A failure to copy the file flags is ignored when
    the error is EOPNOTSUPP or ENOTSUP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # "Not supported" may be reported as either EOPNOTSUPP or
            # ENOTSUP, which are distinct values on some platforms, so
            # accept whichever of them this platform defines.
            harmless = [getattr(errno, name)
                        for name in ('EOPNOTSUPP', 'ENOTSUP')
                        if hasattr(errno, name)]
            if why.errno not in harmless:
                raise
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def bind(self, *pos, **kw):
    """
    Bind the socket, setting up the proxy's UDP relay when required.

    Sockets without a proxy type, or that are not datagram sockets, are
    bound directly.  Otherwise the socket is bound locally, a UDP
    ASSOCIATE request is sent to the proxy over a separate connection,
    and the socket is connected to the relay port the proxy returned.
    Raises socket.error with EINVAL when a proxy connection already
    exists, and with EOPNOTSUPP when the proxy type is not SOCKS5.
    """
    proxy_type, proxy_addr, proxy_port, rdns, username, password = self.proxy
    needs_udp_relay = proxy_type and self.type == socket.SOCK_DGRAM
    if not needs_udp_relay:
        return _orig_socket.bind(self, *pos, **kw)

    if self._proxyconn:
        raise socket.error(EINVAL, "Socket already bound to an address")
    if proxy_type != SOCKS5:
        raise socket.error(EOPNOTSUPP,
                           "UDP only supported by SOCKS5 proxy type")
    _BaseSocket.bind(self, *pos, **kw)

    # Need to specify actual local port because
    # some relays drop packets if a port of zero is specified.
    # Avoid specifying host address in case of NAT though.
    _, local_port = self.getsockname()
    association_target = ("0", local_port)

    self._proxyconn = _orig_socket()
    proxy_endpoint = self._proxy_addr()
    self._proxyconn.connect(proxy_endpoint)

    udp_associate_cmd = b"\x03"
    _, relay_endpoint = self._socks5_request(
        self._proxyconn, udp_associate_cmd, association_target)

    # The relay is most likely on the same host as the SOCKS proxy,
    # but some proxies return a private IP address (10.x.y.z)
    relay_host, _ = proxy_endpoint
    _, relay_port = relay_endpoint
    _BaseSocket.connect(self, (relay_host, relay_port))
    self.proxy_sockname = ("0.0.0.0", 0)  # Unknown
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each attribute is copied only if the running platform's ``os`` module
    offers the function needed to set it (``utime``, ``chmod``, ``chflags``);
    flags additionally require ``st_flags`` on the stat result of ``src``.

    An OSError from ``os.chflags`` is ignored when its errno is EOPNOTSUPP
    or ENOTSUP; any other OSError propagates to the caller.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Either errno can signal "flags not supported here", and not
            # every platform's errno module defines both names, so collect
            # whichever exist before deciding whether to swallow the error.
            unsupported = [getattr(errno, name)
                           for name in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, name)]
            if why.errno not in unsupported:
                raise
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each attribute is copied only if the running platform's ``os`` module
    offers the function needed to set it (``utime``, ``chmod``, ``chflags``);
    flags additionally require ``st_flags`` on the stat result of ``src``.

    An OSError from ``os.chflags`` is ignored when its errno is EOPNOTSUPP
    or ENOTSUP; any other OSError propagates to the caller.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Either errno can signal "flags not supported here", and not
            # every platform's errno module defines both names, so collect
            # whichever exist before deciding whether to swallow the error.
            unsupported = [getattr(errno, name)
                           for name in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, name)]
            if why.errno not in unsupported:
                raise
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each attribute is copied only if the running platform's ``os`` module
    offers the function needed to set it (``utime``, ``chmod``, ``chflags``);
    flags additionally require ``st_flags`` on the stat result of ``src``.

    An OSError from ``os.chflags`` is ignored when its errno is EOPNOTSUPP
    or ENOTSUP; any other OSError propagates to the caller.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Either errno can signal "flags not supported here", and not
            # every platform's errno module defines both names, so collect
            # whichever exist before deciding whether to swallow the error.
            unsupported = [getattr(errno, name)
                           for name in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, name)]
            if why.errno not in unsupported:
                raise
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each attribute is copied only if the running platform's ``os`` module
    offers the function needed to set it (``utime``, ``chmod``, ``chflags``);
    flags additionally require ``st_flags`` on the stat result of ``src``.

    An OSError from ``os.chflags`` is ignored when its errno is EOPNOTSUPP
    or ENOTSUP; any other OSError propagates to the caller.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Either errno can signal "flags not supported here", and not
            # every platform's errno module defines both names, so collect
            # whichever exist before deciding whether to swallow the error.
            unsupported = [getattr(errno, name)
                           for name in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, name)]
            if why.errno not in unsupported:
                raise
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each attribute is copied only if the running platform's ``os`` module
    offers the function needed to set it (``utime``, ``chmod``, ``chflags``);
    flags additionally require ``st_flags`` on the stat result of ``src``.

    An OSError from ``os.chflags`` is ignored when its errno is EOPNOTSUPP
    or ENOTSUP; any other OSError propagates to the caller.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Either errno can signal "flags not supported here", and not
            # every platform's errno module defines both names, so collect
            # whichever exist before deciding whether to swallow the error.
            unsupported = [getattr(errno, name)
                           for name in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, name)]
            if why.errno not in unsupported:
                raise
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def bind(self, *pos, **kw):
        """
        Bind the socket, arranging a SOCKS5 UDP relay when one is required.

        When no proxy type is configured, or the socket is not a datagram
        socket, the call is passed straight to the plain socket bind and its
        result is returned.  Otherwise the socket is bound locally, a second
        socket is connected to the proxy to issue a UDP ASSOCIATE request,
        and this socket is connected to the proxy host on the relay port
        returned by that request.

        Raises socket.error with EINVAL if a proxy connection already exists,
        or with EOPNOTSUPP if the configured proxy type is not SOCKS5.
        """
        proxy_type, _addr, _port, _rdns, _user, _secret = self.proxy
        if not proxy_type or self.type != socket.SOCK_DGRAM:
            # Nothing to relay: behave exactly like an ordinary bind.
            return _orig_socket.bind(self, *pos, **kw)

        if self._proxyconn:
            raise socket.error(EINVAL, "Socket already bound to an address")
        if proxy_type != SOCKS5:
            raise socket.error(EOPNOTSUPP,
                               "UDP only supported by SOCKS5 proxy type")
        _BaseSocket.bind(self, *pos, **kw)

        # Tell the relay the real local port (some relays drop packets when
        # port zero is given) but leave the host unspecified in case of NAT.
        _local_host, local_port = self.getsockname()

        self._proxyconn = _orig_socket()
        proxy_endpoint = self._proxy_addr()
        self._proxyconn.connect(proxy_endpoint)

        associate_cmd = b"\x03"
        _, relay_addr = self._SOCKS5_request(
            self._proxyconn, associate_cmd, ("0", local_port))

        # The relay usually lives on the proxy's own host, and some proxies
        # report a private address (10.x.y.z), so reuse the proxy host and
        # take only the port from the relay address.
        proxy_host, _ = proxy_endpoint
        _, relay_port = relay_addr
        _BaseSocket.connect(self, (proxy_host, relay_port))
        self.proxy_sockname = ("0.0.0.0", 0)  # Unknown
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each attribute is copied only if the running platform's ``os`` module
    offers the function needed to set it (``utime``, ``chmod``, ``chflags``);
    flags additionally require ``st_flags`` on the stat result of ``src``.

    An OSError from ``os.chflags`` is ignored when its errno is EOPNOTSUPP
    or ENOTSUP; any other OSError propagates to the caller.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Either errno can signal "flags not supported here", and not
            # every platform's errno module defines both names, so collect
            # whichever exist before deciding whether to swallow the error.
            unsupported = [getattr(errno, name)
                           for name in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, name)]
            if why.errno not in unsupported:
                raise
项目:MailFail    作者:m0rtem    | 项目源码 | 文件源码
def bind(self, *pos, **kw):
        """
        Bind the socket, arranging a SOCKS5 UDP relay when one is required.

        When no proxy type is configured, or the socket is not a datagram
        socket, the call is passed straight to the plain socket bind and its
        result is returned.  Otherwise the socket is bound locally, a second
        socket is connected to the proxy to issue a UDP ASSOCIATE request,
        and this socket is connected to the proxy host on the relay port
        returned by that request.

        Raises socket.error with EINVAL if a proxy connection already exists,
        or with EOPNOTSUPP if the configured proxy type is not SOCKS5.
        """
        proxy_type, _addr, _port, _rdns, _user, _secret = self.proxy
        if not proxy_type or self.type != socket.SOCK_DGRAM:
            # Nothing to relay: behave exactly like an ordinary bind.
            return _orig_socket.bind(self, *pos, **kw)

        if self._proxyconn:
            raise socket.error(EINVAL, "Socket already bound to an address")
        if proxy_type != SOCKS5:
            raise socket.error(EOPNOTSUPP,
                               "UDP only supported by SOCKS5 proxy type")
        _BaseSocket.bind(self, *pos, **kw)

        # Tell the relay the real local port (some relays drop packets when
        # port zero is given) but leave the host unspecified in case of NAT.
        _local_host, local_port = self.getsockname()

        self._proxyconn = _orig_socket()
        proxy_endpoint = self._proxy_addr()
        self._proxyconn.connect(proxy_endpoint)

        associate_cmd = b"\x03"
        _, relay_addr = self._SOCKS5_request(
            self._proxyconn, associate_cmd, ("0", local_port))

        # The relay usually lives on the proxy's own host, and some proxies
        # report a private address (10.x.y.z), so reuse the proxy host and
        # take only the port from the relay address.
        proxy_host, _ = proxy_endpoint
        _, relay_port = relay_addr
        _BaseSocket.connect(self, (proxy_host, relay_port))
        self.proxy_sockname = ("0.0.0.0", 0)  # Unknown
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each attribute is copied only if the running platform's ``os`` module
    offers the function needed to set it (``utime``, ``chmod``, ``chflags``);
    flags additionally require ``st_flags`` on the stat result of ``src``.

    An OSError from ``os.chflags`` is ignored when its errno is EOPNOTSUPP
    or ENOTSUP; any other OSError propagates to the caller.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Either errno can signal "flags not supported here", and not
            # every platform's errno module defines both names, so collect
            # whichever exist before deciding whether to swallow the error.
            unsupported = [getattr(errno, name)
                           for name in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, name)]
            if why.errno not in unsupported:
                raise
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each attribute is copied only if the running platform's ``os`` module
    offers the function needed to set it (``utime``, ``chmod``, ``chflags``);
    flags additionally require ``st_flags`` on the stat result of ``src``.

    An OSError from ``os.chflags`` is ignored when its errno is EOPNOTSUPP
    or ENOTSUP; any other OSError propagates to the caller.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Either errno can signal "flags not supported here", and not
            # every platform's errno module defines both names, so collect
            # whichever exist before deciding whether to swallow the error.
            unsupported = [getattr(errno, name)
                           for name in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, name)]
            if why.errno not in unsupported:
                raise
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each attribute is copied only if the running platform's ``os`` module
    offers the function needed to set it (``utime``, ``chmod``, ``chflags``);
    flags additionally require ``st_flags`` on the stat result of ``src``.

    An OSError from ``os.chflags`` is ignored when its errno is EOPNOTSUPP
    or ENOTSUP; any other OSError propagates to the caller.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # Either errno can signal "flags not supported here", and not
            # every platform's errno module defines both names, so collect
            # whichever exist before deciding whether to swallow the error.
            unsupported = [getattr(errno, name)
                           for name in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, name)]
            if why.errno not in unsupported:
                raise
项目:Tinychat-Bot-Minimal--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def bind(self, *pos, **kw):
        """
        Bind the socket, arranging a SOCKS5 UDP relay when one is required.

        When no proxy type is configured, or the socket is not a datagram
        socket, the call is passed straight to the plain socket bind and its
        result is returned.  Otherwise the socket is bound locally, a second
        socket is connected to the proxy to issue a UDP ASSOCIATE request,
        and this socket is connected to the proxy host on the relay port
        returned by that request.

        Raises socket.error with EINVAL if a proxy connection already exists,
        or with EOPNOTSUPP if the configured proxy type is not SOCKS5.
        """
        proxy_type, _addr, _port, _rdns, _user, _secret = self.proxy
        if not proxy_type or self.type != socket.SOCK_DGRAM:
            # Nothing to relay: behave exactly like an ordinary bind.
            return _orig_socket.bind(self, *pos, **kw)

        if self._proxyconn:
            raise socket.error(EINVAL, "Socket already bound to an address")
        if proxy_type != SOCKS5:
            raise socket.error(EOPNOTSUPP,
                               "UDP only supported by SOCKS5 proxy type")
        _BaseSocket.bind(self, *pos, **kw)

        # Tell the relay the real local port (some relays drop packets when
        # port zero is given) but leave the host unspecified in case of NAT.
        _local_host, local_port = self.getsockname()

        self._proxyconn = _orig_socket()
        proxy_endpoint = self._proxy_addr()
        self._proxyconn.connect(proxy_endpoint)

        associate_cmd = b"\x03"
        _, relay_addr = self._socks5_request(
            self._proxyconn, associate_cmd, ("0", local_port))

        # The relay usually lives on the proxy's own host, and some proxies
        # report a private address (10.x.y.z), so reuse the proxy host and
        # take only the port from the relay address.
        proxy_host, _ = proxy_endpoint
        _, relay_port = relay_addr
        _BaseSocket.connect(self, (proxy_host, relay_port))
        self.proxy_sockname = ("0.0.0.0", 0)  # Unknown