Python resource 模块,error() 实例源码

我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用resource.error()

项目:selectors2    作者:SethMichaelLarson    | 项目源码 | 文件源码
def test_selector_raises_timeout_error_on_interrupt_over_time(self):
        """An interrupt arriving after the timeout has passed yields ETIMEDOUT.

        The patched select sleeps 0.2s and then raises EINTR, while the
        selector is only given a 0.1s timeout; the test expects an OSError
        whose errno is ETIMEDOUT.
        """
        selectors2._DEFAULT_SELECTOR = None

        fake_sock = mock.Mock()
        fake_sock.fileno.return_value = 1

        def slow_interrupting_select(*args, **kwargs):
            # Outlast the caller's timeout, then imitate a signal interrupt.
            time.sleep(0.2)
            interrupted = OSError()
            interrupted.errno = errno.EINTR
            raise interrupted

        patch_select_module(self, select=slow_interrupting_select)

        sel = self.make_selector()
        sel.register(fake_sock, selectors2.EVENT_READ)

        try:
            sel.select(timeout=0.1)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.ETIMEDOUT)
        else:
            self.fail('Didn\'t raise an OSError')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_handles_closed_on_exception(self):
        """A failing Popen must release its duplicated std handles.

        If CreateProcess exits with an error, the temporary files used for
        stdin, stdout and stderr have to be closable and removable afterwards.
        """
        # One (fd, path) pair each for stdin, stdout and stderr.
        stdin_tmp, stdout_tmp, stderr_tmp = mkstemp(), mkstemp(), mkstemp()
        scratch = (stdin_tmp, stdout_tmp, stderr_tmp)
        try:
            subprocess.Popen(["*"], stdin=stdin_tmp[0], stdout=stdout_tmp[0],
                             stderr=stderr_tmp[0])
        except OSError:
            for fd, path in scratch:
                os.close(fd)
                os.remove(path)
        for _, path in scratch:
            self.assertFalse(os.path.exists(path))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __enter__(self):
        """Save the current core-file limit, then set it to (0, 0).

        Errors while reading or changing the limit are ignored.  On macOS a
        notice is printed when the Crash Reporter ``DialogType`` default
        reads ``developer``.
        """
        if resource is not None:
            try:
                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            except (ValueError, resource.error):
                # Best effort: leave the limit as it is.
                pass

        if sys.platform != 'darwin':
            return

        # Check whether the 'Crash Reporter' on OSX is configured in
        # 'Developer' mode and warn that it will get triggered when it is.
        #
        # This assumes that this context manager is used in tests
        # that might trigger the next manager.
        query = ['/usr/bin/defaults', 'read',
                 'com.apple.CrashReporter', 'DialogType']
        reader = subprocess.Popen(query, stdout=subprocess.PIPE)
        dialog_type = reader.communicate()[0]
        if dialog_type.strip() == b'developer':
            print("this tests triggers the Crash Reporter, "
                  "that is intentional", end='')
            sys.stdout.flush()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_pipe_cloexec(self):
        """Pipe fds of one child must not show up as open in a second child."""
        input_reader = support.findfile("input_reader.py",
                                        subdir="subprocessdata")
        fd_reporter = support.findfile("fd_status.py", subdir="subprocessdata")

        first = subprocess.Popen([sys.executable, input_reader],
                                 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE, close_fds=False)

        self.addCleanup(first.communicate, b'')

        second = subprocess.Popen([sys.executable, fd_reporter],
                                  stdout=subprocess.PIPE, close_fds=False)

        output, error = second.communicate()
        # fd_status.py output is parsed as a comma separated list of fds.
        result_fds = set(int(fd) for fd in output.split(b','))
        unwanted_fds = set([first.stdin.fileno(), first.stdout.fileno(),
                            first.stderr.fileno()])
        leaked = result_fds & unwanted_fds

        self.assertFalse(leaked,
                         "Expected no fds from %r to be open in child, "
                         "found %r" % (unwanted_fds, leaked))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_handles_closed_on_exception(self):
        """A failing Popen must release its duplicated std handles.

        If CreateProcess exits with an error, the temporary files used for
        stdin, stdout and stderr have to be closable and removable afterwards.
        """
        # One (fd, path) pair each for stdin, stdout and stderr.
        stdin_tmp = tempfile.mkstemp()
        stdout_tmp = tempfile.mkstemp()
        stderr_tmp = tempfile.mkstemp()
        scratch = (stdin_tmp, stdout_tmp, stderr_tmp)
        try:
            subprocess.Popen(["*"], stdin=stdin_tmp[0], stdout=stdout_tmp[0],
                             stderr=stderr_tmp[0])
        except OSError:
            for fd, path in scratch:
                os.close(fd)
                os.remove(path)
        for _, path in scratch:
            self.assertFalse(os.path.exists(path))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def __enter__(self):
        """Try to save previous ulimit, then set it to (0, 0)."""
        if resource is not None:
            try:
                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            except (ValueError, resource.error):
                pass

        if sys.platform == 'darwin':
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            value = subprocess.Popen(['/usr/bin/defaults', 'read',
                    'com.apple.CrashReporter', 'DialogType'],
                    stdout=subprocess.PIPE).communicate()[0]
            if value.strip() == b'developer':
                print "this tests triggers the Crash Reporter, that is intentional"
                sys.stdout.flush()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_handles_closed_on_exception(self):
        """A failing Popen must release its duplicated std handles.

        If CreateProcess exits with an error, the temporary files used for
        stdin, stdout and stderr have to be closable and removable afterwards.
        """
        # One (fd, path) pair each for stdin, stdout and stderr.
        stdin_tmp = tempfile.mkstemp()
        stdout_tmp = tempfile.mkstemp()
        stderr_tmp = tempfile.mkstemp()
        scratch = (stdin_tmp, stdout_tmp, stderr_tmp)
        try:
            subprocess.Popen(["*"], stdin=stdin_tmp[0], stdout=stdout_tmp[0],
                             stderr=stderr_tmp[0])
        except OSError:
            for fd, path in scratch:
                os.close(fd)
                os.remove(path)
        for _, path in scratch:
            self.assertFalse(os.path.exists(path))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def __enter__(self):
        """Try to save previous ulimit, then set it to (0, 0)."""
        if resource is not None:
            try:
                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            except (ValueError, resource.error):
                pass

        if sys.platform == 'darwin':
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            value = subprocess.Popen(['/usr/bin/defaults', 'read',
                    'com.apple.CrashReporter', 'DialogType'],
                    stdout=subprocess.PIPE).communicate()[0]
            if value.strip() == b'developer':
                print "this tests triggers the Crash Reporter, that is intentional"
                sys.stdout.flush()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_handles_closed_on_exception(self):
        """A failing Popen must release its duplicated std handles.

        If CreateProcess exits with an error, the temporary files used for
        stdin, stdout and stderr have to be closable and removable afterwards.
        """
        # One (fd, path) pair each for stdin, stdout and stderr.
        stdin_tmp, stdout_tmp, stderr_tmp = mkstemp(), mkstemp(), mkstemp()
        scratch = (stdin_tmp, stdout_tmp, stderr_tmp)
        try:
            subprocess.Popen(["*"], stdin=stdin_tmp[0], stdout=stdout_tmp[0],
                             stderr=stderr_tmp[0])
        except OSError:
            for fd, path in scratch:
                os.close(fd)
                os.remove(path)
        for _, path in scratch:
            self.assertFalse(os.path.exists(path))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def __enter__(self):
        """Save the current core-file limit, then set it to (0, 0).

        Errors while reading or changing the limit are ignored.  On macOS a
        notice is printed when the Crash Reporter ``DialogType`` default
        reads ``developer``.
        """
        if resource is not None:
            try:
                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            except (ValueError, resource.error):
                # Best effort: leave the limit as it is.
                pass

        if sys.platform != 'darwin':
            return

        # Check whether the 'Crash Reporter' on OSX is configured in
        # 'Developer' mode and warn that it will get triggered when it is.
        #
        # This assumes that this context manager is used in tests
        # that might trigger the next manager.
        query = ['/usr/bin/defaults', 'read',
                 'com.apple.CrashReporter', 'DialogType']
        reader = subprocess.Popen(query, stdout=subprocess.PIPE)
        dialog_type = reader.communicate()[0]
        if dialog_type.strip() == b'developer':
            print("this tests triggers the Crash Reporter, "
                  "that is intentional", end='')
            sys.stdout.flush()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_start_new_session(self):
        """A child started with start_new_session=True gets its own pgid.

        This exists for code coverage of calling setsid().  An EPERM error is
        tolerated, depending on the test execution environment, because it
        still indicates that setsid() was called.
        """
        child_code = "import os; print(os.getpgid(os.getpid()))"
        try:
            output = subprocess.check_output(
                    [sys.executable, "-c", child_code],
                    start_new_session=True)
        except OSError as exc:
            if exc.errno != errno.EPERM:
                raise
            # EPERM: nothing further to compare.
            return
        own_pgid = os.getpgid(os.getpid())
        spawned_pgid = int(output)
        self.assertNotEqual(own_pgid, spawned_pgid)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_pipe_cloexec(self):
        """Pipe fds of one child must not show up as open in a second child."""
        input_reader = support.findfile("input_reader.py",
                                        subdir="subprocessdata")
        fd_reporter = support.findfile("fd_status.py", subdir="subprocessdata")

        first = subprocess.Popen([sys.executable, input_reader],
                                 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE, close_fds=False)

        self.addCleanup(first.communicate, b'')

        second = subprocess.Popen([sys.executable, fd_reporter],
                                  stdout=subprocess.PIPE, close_fds=False)

        output, error = second.communicate()
        # fd_status.py output is parsed as a comma separated list of fds.
        result_fds = set(int(fd) for fd in output.split(b','))
        unwanted_fds = set([first.stdin.fileno(), first.stdout.fileno(),
                            first.stderr.fileno()])
        leaked = result_fds & unwanted_fds

        self.assertFalse(leaked,
                         "Expected no fds from %r to be open in child, "
                         "found %r" % (unwanted_fds, leaked))
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_handles_closed_on_exception(self):
        """A failing Popen must release its duplicated std handles.

        If CreateProcess exits with an error, the temporary files used for
        stdin, stdout and stderr have to be closable and removable afterwards.
        """
        # One (fd, path) pair each for stdin, stdout and stderr.
        stdin_tmp, stdout_tmp, stderr_tmp = mkstemp(), mkstemp(), mkstemp()
        scratch = (stdin_tmp, stdout_tmp, stderr_tmp)
        try:
            subprocess.Popen(["*"], stdin=stdin_tmp[0], stdout=stdout_tmp[0],
                             stderr=stderr_tmp[0])
        except OSError:
            for fd, path in scratch:
                os.close(fd)
                os.remove(path)
        for _, path in scratch:
            self.assertFalse(os.path.exists(path))
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def __enter__(self):
        """Try to save previous ulimit, then set it to (0, 0)."""
        if resource is not None:
            try:
                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            except (ValueError, resource.error):
                pass

        if sys.platform == 'darwin':
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            value = subprocess.Popen(['/usr/bin/defaults', 'read',
                    'com.apple.CrashReporter', 'DialogType'],
                    stdout=subprocess.PIPE).communicate()[0]
            if value.strip() == b'developer':
                print "this tests triggers the Crash Reporter, that is intentional"
                sys.stdout.flush()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_handles_closed_on_exception(self):
        """A failing Popen must release its duplicated std handles.

        If CreateProcess exits with an error, the temporary files used for
        stdin, stdout and stderr have to be closable and removable afterwards.
        """
        # One (fd, path) pair each for stdin, stdout and stderr.
        stdin_tmp, stdout_tmp, stderr_tmp = mkstemp(), mkstemp(), mkstemp()
        scratch = (stdin_tmp, stdout_tmp, stderr_tmp)
        try:
            subprocess.Popen(["*"], stdin=stdin_tmp[0], stdout=stdout_tmp[0],
                             stderr=stderr_tmp[0])
        except OSError:
            for fd, path in scratch:
                os.close(fd)
                os.remove(path)
        for _, path in scratch:
            self.assertFalse(os.path.exists(path))
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def __enter__(self):
        """Try to save previous ulimit, then set it to (0, 0)."""
        if resource is not None:
            try:
                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            except (ValueError, resource.error):
                pass

        if sys.platform == 'darwin':
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            value = subprocess.Popen(['/usr/bin/defaults', 'read',
                    'com.apple.CrashReporter', 'DialogType'],
                    stdout=subprocess.PIPE).communicate()[0]
            if value.strip() == b'developer':
                print "this tests triggers the Crash Reporter, that is intentional"
                sys.stdout.flush()
项目:deb-kazoo    作者:openstack    | 项目源码 | 文件源码
def test_huge_file_descriptor(self):
        """Exercise the handler's select paths with fds of 4000 and above.

        Skipped when epoll is unavailable or the RLIMIT_NOFILE limit cannot
        be set to 4096.  Sockets are opened until one has an fd of at least
        4000; ``h.select`` and ``h._epoll_select`` must cope with them while
        ``h._select`` is expected to raise ValueError.
        """
        from kazoo.handlers.threading import _HAS_EPOLL
        if not _HAS_EPOLL:
            self.skipTest('only run on systems with epoll()')
        import resource
        import socket
        from kazoo.handlers.utils import create_tcp_socket
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
        except (ValueError, resource.error):
            self.skipTest('couldnt raise fd limit high enough')
        fd = 0
        socks = []
        try:
            while fd < 4000:
                sock = create_tcp_socket(socket)
                fd = sock.fileno()
                socks.append(sock)
            h = self._makeOne()
            h.start()
            try:
                h.select(socks, [], [])
                with self.assertRaises(ValueError):
                    h._select(socks, [], [])
                h._epoll_select(socks, [], [])
            finally:
                # Stop the handler even when one of the checks above fails.
                h.stop()
        finally:
            # Release the descriptors instead of leaving thousands of
            # sockets open for the rest of the test run.
            for sock in socks:
                sock.close()
项目:selectors2    作者:SethMichaelLarson    | 项目源码 | 文件源码
def test_above_fd_setsize(self):
        """Register more descriptors than FD_SETSIZE would typically allow.

        A scalable implementation should have no problem with more than
        FD_SETSIZE file descriptors.  Since that value is not known here, the
        soft RLIMIT_NOFILE is raised to the hard RLIMIT_NOFILE ceiling and as
        many socket pairs as that allows (capped at 2 ** 16, minus a margin)
        are registered.
        """
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        if hard == resource.RLIM_INFINITY:
            self.skipTest("RLIMIT_NOFILE is infinite")

        try:  # If we're on a *BSD system, the limit tag is different.
            _, bsd_hard = resource.getrlimit(resource.RLIMIT_OFILE)
            if bsd_hard == resource.RLIM_INFINITY:
                self.skipTest("RLIMIT_OFILE is infinite")
            if bsd_hard < hard:
                hard = bsd_hard

        # NOTE: AttributeError resource.RLIMIT_OFILE is not defined on Mac OS.
        except (OSError, resource.error, AttributeError):
            pass

        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
            self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
                            (soft, hard))
            limit_nofile = min(hard, 2 ** 16)
        # resource.error is listed explicitly, matching the getrlimit handler
        # above: it is only an alias of OSError on newer Python versions.
        except (OSError, ValueError, resource.error):
            limit_nofile = soft

        # Guard against already allocated FDs
        limit_nofile -= 256
        limit_nofile = max(0, limit_nofile)

        s = self.make_selector()

        for i in range(limit_nofile // 2):
            rd, wr = self.make_socketpair()
            s.register(rd, selectors2.EVENT_READ)
            s.register(wr, selectors2.EVENT_WRITE)

        self.assertEqual(limit_nofile // 2, len(s.select()))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __exit__(self, *args):
        """Put the core-file limit back to the value held in ``self.old_limit``."""
        # Nothing to restore without a saved limit or without the resource module.
        if self.old_limit is None or resource is None:
            return
        try:
            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
        except (ValueError, resource.error):
            # Restoring is best effort; failures are ignored.
            pass
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_start_new_session(self):
        """A child started with start_new_session=True gets its own pgid.

        This exists for code coverage of calling setsid().  An EPERM error is
        tolerated, depending on the test execution environment, because it
        still indicates that setsid() was called.
        """
        child_code = "import os; print(os.getpgid(os.getpid()))"
        try:
            output = subprocess.check_output(
                    [sys.executable, "-c", child_code],
                    start_new_session=True)
        except OSError as exc:
            if exc.errno != errno.EPERM:
                raise
            # EPERM: nothing further to compare.
            return
        own_pgid = os.getpgid(os.getpid())
        spawned_pgid = int(output)
        self.assertNotEqual(own_pgid, spawned_pgid)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_wait_when_sigchild_ignored(self):
        """Run sigchild_ignore.py in a child and require exit status 0."""
        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
        script = support.findfile("sigchild_ignore.py",
                                  subdir="subprocessdata")
        child = subprocess.Popen([sys.executable, script],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()
        # The child's stderr is included in the failure message.
        failure_note = ("sigchild_ignore.py exited"
                        " non-zero with this error:\n%s" %
                        stderr.decode('utf8'))
        self.assertEqual(0, child.returncode, failure_note)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def __exit__(self, *args):
        """Put the core-file limit back to the value held in ``self.old_limit``."""
        # Nothing to restore without a saved limit or without the resource module.
        if self.old_limit is None or resource is None:
            return
        try:
            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
        except (ValueError, resource.error):
            # Restoring is best effort; failures are ignored.
            pass
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_preexec_errpipe_does_not_double_close_pipes(self):
        """Issue16140: Don't double close pipes on preexec error."""

        def failing_preexec():
            raise RuntimeError("force the _execute_child() errpipe_data path.")

        argv = [sys.executable, "-c", "pass"]
        # All three standard streams are requested as pipes.
        pipes = dict.fromkeys(("stdin", "stdout", "stderr"), subprocess.PIPE)

        with self.assertRaises(RuntimeError):
            self._TestExecuteChildPopen(
                    self, argv, preexec_fn=failing_preexec, **pipes)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_wait_when_sigchild_ignored(self):
        """Run sigchild_ignore.py in a child and require exit status 0."""
        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
        script = test_support.findfile("sigchild_ignore.py",
                                       subdir="subprocessdata")
        child = subprocess.Popen([sys.executable, script],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()
        # The child's stderr is included in the failure message.
        failure_note = ("sigchild_ignore.py exited"
                        " non-zero with this error:\n%s" % stderr)
        self.assertEqual(0, child.returncode, failure_note)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def __exit__(self, *args):
        """Put the core-file limit back to the value held in ``self.old_limit``."""
        # Nothing to restore without a saved limit or without the resource module.
        if self.old_limit is None or resource is None:
            return
        try:
            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
        except (ValueError, resource.error):
            # Restoring is best effort; failures are ignored.
            pass
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_preexec_errpipe_does_not_double_close_pipes(self):
        """Issue16140: Don't double close pipes on preexec error."""

        def failing_preexec():
            raise RuntimeError("force the _execute_child() errpipe_data path.")

        argv = [sys.executable, "-c", "pass"]
        # All three standard streams are requested as pipes.
        pipes = dict.fromkeys(("stdin", "stdout", "stderr"), subprocess.PIPE)

        with self.assertRaises(RuntimeError):
            self._TestExecuteChildPopen(
                    self, argv, preexec_fn=failing_preexec, **pipes)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_wait_when_sigchild_ignored(self):
        """Run sigchild_ignore.py in a child and require exit status 0."""
        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
        script = test_support.findfile("sigchild_ignore.py",
                                       subdir="subprocessdata")
        child = subprocess.Popen([sys.executable, script],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()
        # The child's stderr is included in the failure message.
        failure_note = ("sigchild_ignore.py exited"
                        " non-zero with this error:\n%s" % stderr)
        self.assertEqual(0, child.returncode, failure_note)
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def raise_limits():
    """Raise the soft RLIMIT_NOFILE limit of this process to its hard limit.

    Returns:
        True when the limit was set; False when the attempt failed with
        ``ValueError`` (logged as an error) or ``resource.error``.

    Raises:
        OSError: logged as critical and re-raised.  On Python versions where
            ``resource.error`` is an alias of ``OSError`` the earlier handler
            wins and False is returned instead.
    """
    import resource
    # Pre-bind so the ValueError handler can always format its message.
    soft = hard = None
    try:
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        info("Current limits, soft and hard : {} {}".format(soft, hard))
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
    except ValueError:
        error("Exceeds limit {}, infinity is {}".format(hard, resource.RLIM_INFINITY))
        # The limit was not raised, so do not report success.
        return False
    except resource.error:
        return False
    except OSError as e:
        critical('You may need to check ulimit parameter: {}'.format(e))
        raise
    return True
项目:PiLL    作者:lofar-astron    | 项目源码 | 文件源码
def _first_config_value(config_path, key):
    """Return the value from the first line of *config_path* mentioning *key*.

    Equivalent to ``grep key file | cut -f2- -d"="`` on the first match:
    everything after the first ``=`` is taken (the whole line when there is
    no ``=``), the trailing newline is dropped and all spaces are removed.

    Raises:
        IOError: if the file cannot be opened.
        KeyError: if no line contains *key*.
    """
    with open(config_path, 'r') as config:
        for line in config:
            if key in line:
                value = line.split('=', 1)[1] if '=' in line else line
                return value.rstrip('\n').replace(' ', '')
    raise KeyError(key)


def create_pipeline_config(working_directory):
    """Write ``<working_directory>/pipeline.cfg`` from the default LOFAR config.

    The default configuration is read from
    ``$LOFARROOT/share/pipeline/pipeline.cfg``.  Its runtime directory,
    working directory, cluster description, log file and XML statistics
    file values are substituted, and a ``[remote]`` section using the local
    method with ``max_per_node`` taken from ``nproc`` is appended.

    Args:
        working_directory: directory that receives the new ``pipeline.cfg``.

    Exits the process with status 1 (after logging an error) when the default
    configuration cannot be located or read, or when ``nproc`` gives no output.
    """
    try:
        default_config = os.environ['LOFARROOT'] + '/share/pipeline/pipeline.cfg'
        # Read the defaults in Python rather than through shell pipelines so
        # that paths with spaces or shell metacharacters are handled safely.
        default_runtime = _first_config_value(default_config, 'runtime_directory')
        default_working = _first_config_value(default_config, 'working_directory')
        default_clusterdesc = _first_config_value(default_config, 'clusterdesc')
        default_logfile = _first_config_value(default_config, 'log_file')
        default_xml = _first_config_value(default_config, 'xml_stat_file')
        # Applied to every line in this order.
        replacements = (
            (default_runtime, working_directory),
            (default_working, '%(runtime_directory)s'),
            (default_clusterdesc, '%(working_directory)s/pipeline.clusterdesc'),
            (default_logfile, '%(runtime_directory)s/%(job_name)s/logs/%(start_time)s/pipeline.log'),
            (default_xml, '%(runtime_directory)s/%(job_name)s/logs/%(start_time)s/statistics.xml'),
        )
        pipeline_cfg = working_directory + '/pipeline.cfg'
        with open(pipeline_cfg, 'w') as outfile:
            with open(default_config, 'r') as infile:
                for line in infile:
                    for old, new in replacements:
                        # Replacing an empty string would insert `new`
                        # between every character of the line.
                        if old:
                            line = line.replace(old, new)
                    outfile.write(line)
    except (IOError, IndexError, KeyError):
        # A tuple is required here: `except IOError or IndexError` only ever
        # caught IOError.  KeyError covers a missing LOFARROOT variable or a
        # missing entry in the default configuration.
        logging.error('LOFAR pipeline configuration not found. Please check your installation.')
        sys.exit(1)

    try:
        with os.popen('nproc') as nproc_output:
            max_per_node = nproc_output.readlines()[0].rstrip('\n')
    except IndexError:
        logging.error('The number of available CPUs could not be determined. Please check your installation of nproc.')
        sys.exit(1)

    # Append directly instead of via `echo ... >> file`, where `[remote]`
    # would be subject to shell glob expansion.
    with open(pipeline_cfg, 'a') as outfile:
        outfile.write('\n[remote]\nmethod = local\nmax_per_node = ' + max_per_node + '\n')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def __exit__(self, *args):
        """Put the core-file limit back to the value held in ``self.old_limit``."""
        # Nothing to restore without a saved limit or without the resource module.
        if self.old_limit is None or resource is None:
            return
        try:
            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
        except (ValueError, resource.error):
            # Restoring is best effort; failures are ignored.
            pass
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_exception_cwd(self):
        """Test error in the child raised in the parent for a bad cwd."""
        expected = self._get_chdir_exception()
        try:
            proc = subprocess.Popen([sys.executable, "-c", ""],
                                    cwd=self._nonexistent_dir)
        except OSError as caught:
            # The chdir failure of the child has to reach the parent
            # with the same errno and strerror as the reference exception.
            self.assertEqual(expected.errno, caught.errno)
            self.assertEqual(expected.strerror, caught.strerror)
        else:
            self.fail("Expected OSError: %s" % expected)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_exception_bad_executable(self):
        """Test error in the child raised in the parent for a bad executable."""
        expected = self._get_chdir_exception()
        try:
            proc = subprocess.Popen([sys.executable, "-c", ""],
                                    executable=self._nonexistent_dir)
        except OSError as caught:
            # The exec failure of the child has to reach the parent
            # with the same errno and strerror as the reference exception.
            self.assertEqual(expected.errno, caught.errno)
            self.assertEqual(expected.strerror, caught.strerror)
        else:
            self.fail("Expected OSError: %s" % expected)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_preexec_errpipe_does_not_double_close_pipes(self):
        """Issue16140: Don't double close pipes on preexec error."""

        def failing_preexec():
            raise RuntimeError("force the _execute_child() errpipe_data path.")

        argv = [sys.executable, "-c", "pass"]
        # All three standard streams are requested as pipes.
        pipes = dict.fromkeys(("stdin", "stdout", "stderr"), subprocess.PIPE)

        with self.assertRaises(RuntimeError):
            self._TestExecuteChildPopen(
                    self, argv, preexec_fn=failing_preexec, **pipes)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_wait_when_sigchild_ignored(self):
        """Run sigchild_ignore.py in a child and require exit status 0."""
        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
        script = support.findfile("sigchild_ignore.py",
                                  subdir="subprocessdata")
        child = subprocess.Popen([sys.executable, script],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()
        # The child's stderr is included in the failure message.
        failure_note = ("sigchild_ignore.py exited"
                        " non-zero with this error:\n%s" %
                        stderr.decode('utf-8'))
        self.assertEqual(0, child.returncode, failure_note)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def __exit__(self, *args):
        """Put the core-file limit back to the value held in ``self.old_limit``."""
        # Nothing to restore without a saved limit or without the resource module.
        if self.old_limit is None or resource is None:
            return
        try:
            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
        except (ValueError, resource.error):
            # Restoring is best effort; failures are ignored.
            pass
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_preexec_errpipe_does_not_double_close_pipes(self):
        """Issue16140: Don't double close pipes on preexec error."""

        def failing_preexec():
            raise RuntimeError("force the _execute_child() errpipe_data path.")

        argv = [sys.executable, "-c", "pass"]
        # All three standard streams are requested as pipes.
        pipes = dict.fromkeys(("stdin", "stdout", "stderr"), subprocess.PIPE)

        with self.assertRaises(RuntimeError):
            self._TestExecuteChildPopen(
                    self, argv, preexec_fn=failing_preexec, **pipes)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_wait_when_sigchild_ignored(self):
        """Run sigchild_ignore.py in a child and require exit status 0."""
        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
        script = test_support.findfile("sigchild_ignore.py",
                                       subdir="subprocessdata")
        child = subprocess.Popen([sys.executable, script],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()
        # The child's stderr is included in the failure message.
        failure_note = ("sigchild_ignore.py exited"
                        " non-zero with this error:\n%s" % stderr)
        self.assertEqual(0, child.returncode, failure_note)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def __exit__(self, *args):
        """Put the core-file limit back to the value held in ``self.old_limit``."""
        # Nothing to restore without a saved limit or without the resource module.
        if self.old_limit is None or resource is None:
            return
        try:
            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
        except (ValueError, resource.error):
            # Restoring is best effort; failures are ignored.
            pass
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_preexec_errpipe_does_not_double_close_pipes(self):
        """Issue16140: Don't double close pipes on preexec error."""

        def failing_preexec():
            raise RuntimeError("force the _execute_child() errpipe_data path.")

        argv = [sys.executable, "-c", "pass"]
        # All three standard streams are requested as pipes.
        pipes = dict.fromkeys(("stdin", "stdout", "stderr"), subprocess.PIPE)

        with self.assertRaises(RuntimeError):
            self._TestExecuteChildPopen(
                    self, argv, preexec_fn=failing_preexec, **pipes)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_wait_when_sigchild_ignored(self):
        """Run sigchild_ignore.py in a child and require exit status 0."""
        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
        script = test_support.findfile("sigchild_ignore.py",
                                       subdir="subprocessdata")
        child = subprocess.Popen([sys.executable, script],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = child.communicate()
        # The child's stderr is included in the failure message.
        failure_note = ("sigchild_ignore.py exited"
                        " non-zero with this error:\n%s" % stderr)
        self.assertEqual(0, child.returncode, failure_note)
项目:selectors2    作者:SethMichaelLarson    | 项目源码 | 文件源码
def test_timeout_is_recalculated_after_interrupt(self):
        """After an EINTR the selector retries with a smaller timeout.

        The patched select raises EINTR on its first call (after sleeping
        0.1s) and reports fd 1 as readable on the next one; the timeout
        passed to the second call must be less than the first.
        """
        selectors2._DEFAULT_SELECTOR = None

        fake_sock = mock.Mock()
        fake_sock.fileno.return_value = 1

        class InterruptingSelect(object):
            """ Stand-in for select: the first call sleeps and is then
            interrupted, every later call returns a readable fd. """
            def __init__(self):
                self.call_count = 0
                self.calls = []

            def select(self, *args, **kwargs):
                # Record each invocation so the timeouts can be compared.
                self.calls.append((args, kwargs))
                self.call_count += 1
                if self.call_count != 1:
                    return [1], [], []
                time.sleep(0.1)
                interrupted = OSError()
                interrupted.errno = errno.EINTR
                raise interrupted

        fake_select = InterruptingSelect()

        patch_select_module(self, select=fake_select.select)

        sel = self.make_selector()
        sel.register(fake_sock, selectors2.EVENT_READ)

        ready = sel.select(timeout=1.0)

        # Make sure the mocked call actually completed correctly.
        self.assertEqual(len(ready), 1)
        self.assertEqual(ready[0][0].fileobj, fake_sock)
        self.assertEqual(ready[0][1], selectors2.EVENT_READ)

        # There should be two calls to the fake select function.
        self.assertEqual(fake_select.call_count, 2)

        # Timeout should be less in the second call.
        # fake_select.calls is [(args, kwargs), (args, kwargs)] where
        # args is ([r], [w], [x], timeout).
        second_timeout = fake_select.calls[1][0][3]
        first_timeout = fake_select.calls[0][0][3]
        self.assertLess(second_timeout, first_timeout)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_close_fds(self):
        """Check which parent fds a child sees for each close_fds/pass_fds mode.

        Opens a pipe plus nine /dev/null descriptors, then runs fd_status.py
        three times: with close_fds=False (all must remain open), with
        close_fds=True (none may remain open), and with close_fds=True plus
        pass_fds (only descriptors listed in pass_fds may remain open).
        """
        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

        fds = os.pipe()
        self.addCleanup(os.close, fds[0])
        self.addCleanup(os.close, fds[1])

        open_fds = set(fds)
        # add a bunch more fds
        for _ in range(9):
            fd = os.open("/dev/null", os.O_RDONLY)
            self.addCleanup(os.close, fd)
            open_fds.add(fd)

        p = subprocess.Popen([sys.executable, fd_status],
                             stdout=subprocess.PIPE, close_fds=False)
        output, ignored = p.communicate()
        remaining_fds = set(map(int, output.split(b',')))

        self.assertEqual(remaining_fds & open_fds, open_fds,
                         "Some fds were closed")

        p = subprocess.Popen([sys.executable, fd_status],
                             stdout=subprocess.PIPE, close_fds=True)
        output, ignored = p.communicate()
        remaining_fds = set(map(int, output.split(b',')))

        self.assertFalse(remaining_fds & open_fds,
                         "Some fds were left open")
        self.assertIn(1, remaining_fds, "Subprocess failed")

        # Keep some of the fd's we opened open in the subprocess.
        # This tests _posixsubprocess.c's proper handling of fds_to_keep.
        fds_to_keep = set(open_fds.pop() for _ in range(8))
        # The kept fds must actually be handed over via pass_fds; with an
        # empty pass_fds the keep path is never exercised.
        p = subprocess.Popen([sys.executable, fd_status],
                             stdout=subprocess.PIPE, close_fds=True,
                             pass_fds=fds_to_keep)
        output, ignored = p.communicate()
        remaining_fds = set(map(int, output.split(b',')))

        # fds_to_keep was popped out of open_fds, so intersecting the two is
        # always empty; check the fds that were NOT passed instead.
        self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
                         "Some fds not in pass_fds were left open")
        self.assertIn(1, remaining_fds, "Subprocess failed")

    # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
    # descriptor of a pipe closed in the parent process is valid in the
    # child process according to fstat(), but the mode of the file
    # descriptor is invalid, and read or write raise an error.