Python os 模块,fdopen() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.fdopen()

项目:deployfish    作者:caltechads    | 项目源码 | 文件源码
def empty_db(self):
        cmd = [
            "mysqldump",
            "-u%(user)s" % self.db_config,
            "-h%(host)s" % self.db_config,
            "--add_drop-table",
            "--no-data",
            "%(name)s" % self.db_config,
        ]

        tmphandle, tmppath = tempfile.mkstemp(text=True)
        tmpfile = os.fdopen(tmphandle, "w")

        sql_data = subprocess.check_output(cmd, stderr=None).split('\n')
        tmpfile.write("SET FOREIGN_KEY_CHECKS = 0;\n")
        tmpfile.write("use %(name)s;\n" % self.db_config)
        for line in sql_data:
            if line.startswith("DROP"):
                tmpfile.write(line + '\n')
        tmpfile.close()
        self._run_mysql_cmd("source %s" % tmppath)
        os.remove(tmppath)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def main():
        '''
        Run code specifed by data received over pipe
        '''
        assert is_forking(sys.argv)

        handle = int(sys.argv[-1])
        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
        from_parent = os.fdopen(fd, 'rb')

        process.current_process()._inheriting = True
        preparation_data = load(from_parent)
        prepare(preparation_data)
        self = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exitcode = self._bootstrap()
        exit(exitcode)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, cmd, bufsize=-1):
        _cleanup()
        self.cmd = cmd
        p2cread, p2cwrite = os.pipe()
        c2pread, c2pwrite = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            os.dup2(p2cread, 0)
            os.dup2(c2pwrite, 1)
            os.dup2(c2pwrite, 2)
            self._run_child(cmd)
        os.close(p2cread)
        self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
        os.close(c2pwrite)
        self.fromchild = os.fdopen(c2pread, 'r', bufsize)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def Set(self,key,data):
    path = self._GetPath(key)
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
      os.makedirs(directory)
    if not os.path.isdir(directory):
      raise _FileCacheError('%s exists but is not a directory' % directory)
    temp_fd, temp_path = tempfile.mkstemp()
    temp_fp = os.fdopen(temp_fd, 'w')
    temp_fp.write(data)
    temp_fp.close()
    if not path.startswith(self._root_directory):
      raise _FileCacheError('%s does not appear to live under %s' %
                            (path, self._root_directory))
    if os.path.exists(path):
      os.remove(path)
    os.rename(temp_path, path)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    if sys.hexversion > 0x3000000 and not 'b' in m:
        data = data.encode(encoding)
        m += 'b'
    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR
    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)
    f = os.fdopen(fd, m)
    try:
        f.write(data)
    finally:
        f.close()
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def h_file_win32(fname):
    try:
        fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    f = os.fdopen(fd, 'rb')
    m = md5()
    try:
        while fname:
            fname = f.read(200000)
            m.update(fname)
    finally:
        f.close()
    return m.digest()

# always save these
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    if sys.hexversion > 0x3000000 and not 'b' in m:
        data = data.encode(encoding)
        m += 'b'
    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR
    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)
    f = os.fdopen(fd, m)
    try:
        f.write(data)
    finally:
        f.close()
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def h_file_win32(fname):
    try:
        fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    f = os.fdopen(fd, 'rb')
    m = md5()
    try:
        while fname:
            fname = f.read(200000)
            m.update(fname)
    finally:
        f.close()
    return m.digest()

# always save these
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    if sys.hexversion > 0x3000000 and not 'b' in m:
        data = data.encode(encoding)
        m += 'b'
    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR
    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)
    f = os.fdopen(fd, m)
    try:
        f.write(data)
    finally:
        f.close()
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def h_file_win32(fname):
    try:
        fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    f = os.fdopen(fd, 'rb')
    m = md5()
    try:
        while fname:
            fname = f.read(200000)
            m.update(fname)
    finally:
        f.close()
    return m.digest()

# always save these
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:PTE    作者:pwn2winctf    | 项目源码 | 文件源码
def setup_environment():
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()

    ssh_identity = os.path.join(ssh_dir, 'identity')
    with os.fdopen(os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600),
                   'w') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))

    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def convert(cls, report, data):
        "converts the report data to another mimetype if necessary"
        input_format = report.template_extension
        output_format = report.extension or report.template_extension

        if output_format in MIMETYPES:
            return output_format, data

        fd, path = tempfile.mkstemp(suffix=(os.extsep + input_format),
            prefix='trytond_')
        oext = FORMAT2EXT.get(output_format, output_format)
        with os.fdopen(fd, 'wb+') as fp:
            fp.write(data)
        cmd = ['unoconv', '--connection=%s' % config.get('report', 'unoconv'),
            '-f', oext, '--stdout', path]
        try:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            stdoutdata, stderrdata = proc.communicate()
            if proc.wait() != 0:
                raise Exception(stderrdata)
            return oext, stdoutdata
        finally:
            os.remove(path)
项目:no-YOU-talk-to-the-hand    作者:flashashen    | 项目源码 | 文件源码
def test_basic_config():


    fd, path = tempfile.mkstemp()
    f = os.fdopen(fd,'w')
    f.write(yaml.dump(testcfg))
    f.flush()

    cfg = ny.get_config(path)
    ny.write_supervisor_conf()

    config = ConfigParser.ConfigParser()
    config.readfp(open(cfg['supervisor.conf']))

    # from IPython import embed
    # embed()
    print(config.get('program:testtunnel','command'))
    assert 'sshuttle -r 1.1.1.1 2.2.2.2 -x 3.3.3.3' in config.get('program:testtunnel','command')
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
项目:questionnaire    作者:kylebebak    | 项目源码 | 文件源码
def stdout_redirected(to):
    """Lifted from: https://stackoverflow.com/questions/4675728/redirect-stdout-to-a-file-in-python

    This is the only way I've found to redirect stdout with curses. This way the
    output from questionnaire can be piped to another program, without piping
    what's written to the terminal by the prompters.
    """
    stdout = sys.stdout

    stdout_fd = fileno(stdout)
    # copy stdout_fd before it is overwritten
    with os.fdopen(os.dup(stdout_fd), 'wb') as copied:
        stdout.flush()  # flush library buffers that dup2 knows nothing about
        try:
            os.dup2(fileno(to), stdout_fd)  # $ exec >&to
        except ValueError:  # filename
            with open(to, 'wb') as to_file:
                os.dup2(to_file.fileno(), stdout_fd)  # $ exec > to
        try:
            yield stdout  # allow code to be run with the redirected stdout
        finally:
            # restore stdout to its previous value
            stdout.flush()
            os.dup2(copied.fileno(), stdout_fd)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def main():
        '''
        Run code specified by data received over pipe
        '''
        assert is_forking(sys.argv)

        handle = int(sys.argv[-1])
        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
        from_parent = os.fdopen(fd, 'rb')

        process.current_process()._inheriting = True
        preparation_data = load(from_parent)
        prepare(preparation_data)
        self = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exitcode = self._bootstrap()
        exit(exitcode)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, cmd, bufsize=-1):
        _cleanup()
        self.cmd = cmd
        p2cread, p2cwrite = os.pipe()
        c2pread, c2pwrite = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            os.dup2(p2cread, 0)
            os.dup2(c2pwrite, 1)
            os.dup2(c2pwrite, 2)
            self._run_child(cmd)
        os.close(p2cread)
        self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
        os.close(c2pwrite)
        self.fromchild = os.fdopen(c2pread, 'r', bufsize)
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def create_file(self, name, excl=False, mode="wb", **kwargs):
        """Creates a file with the given name in this storage.

        :param name: the name for the new file.
        :param excl: if True, try to open the file in "exclusive" mode.
        :param mode: the mode flags with which to open the file. The default is
            ``"wb"``.
        :return: a :class:`whoosh.filedb.structfile.StructFile` instance.
        """

        if self.readonly:
            raise ReadOnlyError

        path = self._fpath(name)
        if excl:
            flags = os.O_CREAT | os.O_EXCL | os.O_RDWR
            if hasattr(os, "O_BINARY"):
                flags |= os.O_BINARY
            fd = os.open(path, flags)
            fileobj = os.fdopen(fd, mode)
        else:
            fileobj = open(path, mode)

        f = StructFile(fileobj, name=name, **kwargs)
        return f
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_FILE_stored_explicitly():
    ffi = FFI()
    ffi.cdef("int myprintf11(const char *, int); FILE *myfile;")
    lib = ffi.verify("""
        #include <stdio.h>
        FILE *myfile;
        int myprintf11(const char *out, int value) {
            return fprintf(myfile, out, value);
        }
    """)
    import os
    fdr, fdw = os.pipe()
    fw1 = os.fdopen(fdw, 'wb', 256)
    lib.myfile = ffi.cast("FILE *", fw1)
    #
    fw1.write(b"X")
    r = lib.myprintf11(b"hello, %d!\n", ffi.cast("int", 42))
    fw1.close()
    assert r == len("hello, 42!\n")
    #
    result = os.read(fdr, 256)
    os.close(fdr)
    # the 'X' might remain in the user-level buffer of 'fw1' and
    # end up showing up after the 'hello, 42!\n'
    assert result == b"Xhello, 42!\n" or result == b"hello, 42!\nX"
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_ffi_buffer_with_file(self):
        import tempfile, os, array
        fd, filename = tempfile.mkstemp()
        f = os.fdopen(fd, 'r+b')
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        assert f.read() == array.array('i', range(1000)).tostring()
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        f.close()
        os.unlink(filename)
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_ffi_buffer_with_file(self):
        ffi = FFI(backend=self.Backend())
        import tempfile, os, array
        fd, filename = tempfile.mkstemp()
        f = os.fdopen(fd, 'r+b')
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        assert f.read() == array.array('i', range(1000)).tostring()
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        f.close()
        os.unlink(filename)
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
项目:conda-tools    作者:groutr    | 项目源码 | 文件源码
def _decompress_bz2(filename, blocksize=900*1024):
    """
    Decompress .tar.bz2 to .tar on disk (for faster access)

    Use TemporaryFile to guarentee write access.
    """
    if not filename.endswith('.tar.bz2'):
        return filename

    fd, path = mkstemp()
    with os.fdopen(fd, 'wb') as fo:
        with open(filename, 'rb') as fi:
            z = bz2.BZ2Decompressor()

            for block in iter(lambda: fi.read(blocksize), b''):
                fo.write(z.decompress(block))
    return path
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def spawn(argv, stdout=False):
    """Asynchronously run a program. argv[0] is the executable name, which
    must be fully qualified or in the path. If stdout is True, return
    a file object corresponding to the child's standard output; otherwise,
    return the child's process ID.

    argv must be strictly str objects to avoid encoding confusion.
    """

    types = map(type, argv)
    if not (min(types) == max(types) == str):
        raise TypeError("executables and arguments must be str objects")
    logger.debug("Running %r" % " ".join(argv))
    args = GLib.spawn_async(argv=argv, flags=GLib.SpawnFlags.SEARCH_PATH,
                            standard_output=stdout)

    if stdout:
        return os.fdopen(args[2])
    else:
        return args[0]
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def main():
  if sys.platform in ['win32', 'cygwin']:
    return 0

  # This script is called by gclient. gclient opens its hooks subprocesses with
  # (stdout=subprocess.PIPE, stderr=subprocess.STDOUT) and then does custom
  # output processing that breaks printing '\r' characters for single-line
  # updating status messages as printed by curl and wget.
  # Work around this by setting stderr of the update.sh process to stdin (!):
  # gclient doesn't redirect stdin, and while stdin itself is read-only, a
  # dup()ed sys.stdin is writable, try
  #   fd2 = os.dup(sys.stdin.fileno()); os.write(fd2, 'hi')
  # TODO: Fix gclient instead, http://crbug.com/95350
  return subprocess.call(
      [os.path.join(os.path.dirname(__file__), 'update.sh')] +  sys.argv[1:],
      stderr=os.fdopen(
        os.dup(
            sys.stdin.fileno())))
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def close(self):
        """ Implements GVSvgDoc.close() """
        GVDocBase.close(self)

        # Make sure the extension is correct
        if self._filename[-4:] != ".svg":
            self._filename += ".svg"

        # Create a temporary dot file
        (handle, tmp_dot) = tempfile.mkstemp(".gv" )
        dotfile = os.fdopen(handle, "wb")
        dotfile.write(self._dot.getvalue())
        dotfile.close()
        # Generate the SVG file.
        os.system( 'dot -Tsvg:cairo -o"%s" "%s"' % (self._filename, tmp_dot) )

        # Delete the temporary dot file
        os.remove(tmp_dot)

#-------------------------------------------------------------------------------
#
# GVSvgzDoc
#
#-------------------------------------------------------------------------------
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def close(self):
        """ Implements GVSvgzDoc.close() """
        GVDocBase.close(self)

        # Make sure the extension is correct
        if self._filename[-5:] != ".svgz":
            self._filename += ".svgz"

        # Create a temporary dot file
        (handle, tmp_dot) = tempfile.mkstemp(".gv" )
        dotfile = os.fdopen(handle, "wb")
        dotfile.write(self._dot.getvalue())
        dotfile.close()
        # Generate the SVGZ file.
        os.system( 'dot -Tsvgz -o"%s" "%s"' % (self._filename, tmp_dot) )

        # Delete the temporary dot file
        os.remove(tmp_dot)

#-------------------------------------------------------------------------------
#
# GVPngDoc
#
#-------------------------------------------------------------------------------
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def close(self):
        """ Implements GVPngDoc.close() """
        GVDocBase.close(self)

        # Make sure the extension is correct
        if self._filename[-4:] != ".png":
            self._filename += ".png"

        # Create a temporary dot file
        (handle, tmp_dot) = tempfile.mkstemp(".gv" )
        dotfile = os.fdopen(handle, "wb")
        dotfile.write(self._dot.getvalue())
        dotfile.close()
        # Generate the PNG file.
        os.system( 'dot -Tpng -o"%s" "%s"' % (self._filename, tmp_dot) )

        # Delete the temporary dot file
        os.remove(tmp_dot)

#-------------------------------------------------------------------------------
#
# GVJpegDoc
#
#-------------------------------------------------------------------------------
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def close(self):
        """ Implements GVGifDoc.close() """
        GVDocBase.close(self)

        # Make sure the extension is correct
        if self._filename[-4:] != ".gif":
            self._filename += ".gif"

        # Create a temporary dot file
        (handle, tmp_dot) = tempfile.mkstemp(".gv" )
        dotfile = os.fdopen(handle, "wb")
        dotfile.write(self._dot.getvalue())
        dotfile.close()
        # Generate the GIF file.
        os.system( 'dot -Tgif -o"%s" "%s"' % (self._filename, tmp_dot) )

        # Delete the temporary dot file
        os.remove(tmp_dot)

#-------------------------------------------------------------------------------
#
# GVPdfGvDoc
#
#-------------------------------------------------------------------------------
项目:arithmancer    作者:google    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = self.default_timeout
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            f = os.fdopen(fd, 'wb')
            try:
                pickle.dump(int(time() + timeout), f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            finally:
                f.close()
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            pass
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file.

    :param str path: Path to a file.
    :param str mode: Same os `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.

    """
    # pylint: disable=star-args
    open_args = () if chmod is None else (chmod,)
    fdopen_args = () if buffering is None else (buffering,)
    return os.fdopen(
        os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args),
        mode, *fdopen_args)
项目:vyper    作者:admiralobvious    | 项目源码 | 文件源码
def _init_dirs(self):
        test_dirs = ['a a', 'b', 'c\c', 'D_']
        config = 'improbable'

        root = tempfile.mkdtemp()

        def cleanup():
            try:
                os.removedirs(root)
            except (FileNotFoundError, OSError):
                pass

        os.chdir(root)

        for dir_ in test_dirs:
            os.mkdir(dir_, 0o0750)

            f = '{0}.toml'.format(config)
            flags = os.O_WRONLY | os.O_CREAT
            rel_path = '{0}/{1}'.format(dir_, f)
            abs_file_path = os.path.join(root, rel_path)
            with os.fdopen(os.open(abs_file_path, flags, 0o0640), 'w') as fp:
                fp.write("key = \"value is {0}\"\n".format(dir_))

        return root, config, cleanup
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = self.default_timeout
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(int(time() + timeout), f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True