Python tempfile 模块,mkstemp() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.mkstemp()

项目:deployfish    作者:caltechads    | 项目源码 | 文件源码
def empty_db(self):
        cmd = [
            "mysqldump",
            "-u%(user)s" % self.db_config,
            "-h%(host)s" % self.db_config,
            "--add_drop-table",
            "--no-data",
            "%(name)s" % self.db_config,
        ]

        tmphandle, tmppath = tempfile.mkstemp(text=True)
        tmpfile = os.fdopen(tmphandle, "w")

        sql_data = subprocess.check_output(cmd, stderr=None).split('\n')
        tmpfile.write("SET FOREIGN_KEY_CHECKS = 0;\n")
        tmpfile.write("use %(name)s;\n" % self.db_config)
        for line in sql_data:
            if line.startswith("DROP"):
                tmpfile.write(line + '\n')
        tmpfile.close()
        self._run_mysql_cmd("source %s" % tmppath)
        os.remove(tmppath)
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def _dump(self, file=None, format=None):
        import tempfile
        suffix = ''
        if format:
            suffix = '.'+format
        if not file:
            f, file = tempfile.mkstemp(suffix)
            os.close(f)

        self.load()
        if not format or format == "PPM":
            self.im.save_ppm(file)
        else:
            if not file.endswith(format):
                file = file + "." + format
            self.save(file, format)
        return file
项目:senf    作者:quodlibet    | 项目源码 | 文件源码
def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """
    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
        text (bool): if the file should be opened in text mode
    Returns:
        Tuple[`int`, `fsnative`]:
            A tuple containing the file descriptor and the file path
    Raises:
        EnvironmentError

    Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative`
    path.
    """

    suffix = fsnative() if suffix is None else path2fsn(suffix)
    prefix = gettempprefix() if prefix is None else path2fsn(prefix)
    dir = gettempdir() if dir is None else path2fsn(dir)

    return tempfile.mkstemp(suffix, prefix, dir, text)
项目:senf    作者:quodlibet    | 项目源码 | 文件源码
def mkdtemp(suffix=None, prefix=None, dir=None):
    """
    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
    Returns:
        `fsnative`: A path to a directory
    Raises:
        EnvironmentError

    Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative` path.
    """

    suffix = fsnative() if suffix is None else path2fsn(suffix)
    prefix = gettempprefix() if prefix is None else path2fsn(prefix)
    dir = gettempdir() if dir is None else path2fsn(dir)

    return tempfile.mkdtemp(suffix, prefix, dir)
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def grab(bbox=None):
    if sys.platform == "darwin":
        f, file = tempfile.mkstemp('.png')
        os.close(f)
        subprocess.call(['screencapture', '-x', file])
        im = Image.open(file)
        im.load()
        os.unlink(file)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origo in lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
            )
    if bbox:
        im = im.crop(bbox)
    return im
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def _openDownloadFile(self, buildId, suffix):
        (tmpFd, tmpName) = mkstemp()
        url = self._makeUrl(buildId, suffix)
        try:
            os.close(tmpFd)
            env = { k:v for (k,v) in os.environ.items() if k in self.__whiteList }
            env["BOB_LOCAL_ARTIFACT"] = tmpName
            env["BOB_REMOTE_ARTIFACT"] = url
            ret = subprocess.call(["/bin/bash", "-ec", self.__downloadCmd],
                stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
                cwd="/tmp", env=env)
            if ret == 0:
                ret = tmpName
                tmpName = None
                return CustomDownloader(ret)
            else:
                raise ArtifactDownloadError("failed (exit {})".format(ret))
        finally:
            if tmpName is not None: os.unlink(tmpName)
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def grab(bbox=None):
    if sys.platform == "darwin":
        fh, filepath = tempfile.mkstemp('.png')
        os.close(fh)
        subprocess.call(['screencapture', '-x', filepath])
        im = Image.open(filepath)
        im.load()
        os.unlink(filepath)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origin lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
            )
    if bbox:
        im = im.crop(bbox)
    return im
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def _dump(self, file=None, format=None):
        import tempfile
        suffix = ''
        if format:
            suffix = '.'+format
        if not file:
            f, file = tempfile.mkstemp(suffix)
            os.close(f)

        self.load()
        if not format or format == "PPM":
            self.im.save_ppm(file)
        else:
            if not file.endswith(format):
                file = file + "." + format
            self.save(file, format)
        return file
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _compile_module_file(template, text, filename, outputpath, module_writer):
    source, lexer = _compile(template, text, filename,
                             generate_magic_comment=True)

    if isinstance(source, compat.text_type):
        source = source.encode(lexer.encoding or 'ascii')

    if module_writer:
        module_writer(source, outputpath)
    else:
        # make tempfiles in the same location as the ultimate
        # location.   this ensures they're on the same filesystem,
        # avoiding synchronization issues.
        (dest, name) = tempfile.mkstemp(dir=os.path.dirname(outputpath))

        os.write(dest, source)
        os.close(dest)
        shutil.move(name, outputpath)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_transfer_with_cli(self):
        test_instances = [
            TestUrlInstance(url="/data1", data=b"data1"),
            TestUrlInstance(url="/data2", data=b"data2")
        ]
        self.httpd.test_instances = test_instances
        try:
            fd, filename = tempfile.mkstemp()
            os.close(fd)
            cmd = [TestRequestHandler.ticket_url, "-O", filename]
            parser = cli.get_htsget_parser()
            args = parser.parse_args(cmd)
            with mock.patch("sys.exit") as mocked_exit:
                cli.run(args)
                mocked_exit.assert_called_once_with(0)
            all_data = b"".join(test_instance.data for test_instance in test_instances)
            with open(filename, "rb") as f:
                self.assertEqual(f.read(), all_data)
        finally:
            os.unlink(filename)
项目:HugoPhotoSwipe    作者:GjjvdBurg    | 项目源码 | 文件源码
def create_thumb_js(self, mode=None, pth=None):
        """ Create the thumbnail using SmartCrop.js """
        if pth is None:
            raise ValueError("path can't be None")

        # save a copy of the image with the correct orientation in a temporary 
        # file
        _, tmpfname = tempfile.mkstemp(suffix='.'+settings.output_format)
        self.original_image.save(tmpfname, quality=95)

        # Load smartcrop and set options
        nwidth, nheight = self.resize_dims(mode)
        logging.info("[%s] SmartCrop.js new dimensions: %ix%i" % (self.name, 
            nwidth, nheight))
        command = [settings.smartcrop_js_path, '--width', str(nwidth), 
                '--height', str(nheight), tmpfname, pth]
        logging.info("[%s] SmartCrop.js running crop command." % self.name)
        check_output(command)

        # remove the temporary file
        os.remove(tmpfname)

        return pth
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def Set(self,key,data):
    path = self._GetPath(key)
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
      os.makedirs(directory)
    if not os.path.isdir(directory):
      raise _FileCacheError('%s exists but is not a directory' % directory)
    temp_fd, temp_path = tempfile.mkstemp()
    temp_fp = os.fdopen(temp_fd, 'w')
    temp_fp.write(data)
    temp_fp.close()
    if not path.startswith(self._root_directory):
      raise _FileCacheError('%s does not appear to live under %s' %
                            (path, self._root_directory))
    if os.path.exists(path):
      os.remove(path)
    os.rename(temp_path, path)
项目:piperine    作者:DNA-and-Natural-Algorithms-Group    | 项目源码 | 文件源码
def setUp(self):
        # CRN used in this test
        self.crn_rxn = 'A + B -> A + D\n'
        # Establish random temporary filenames
        self.tdir = mkdtemp(prefix='piperine_test')
        fid, self.basename = mkstemp(dir=self.tdir)
        os.close(fid)
        endings = ['.crn', '.fixed', '.pil', '.mfe', '{}_strands.txt', '{}.seqs']
        self.filenames = [ self.basename + suf for suf in endings ]
        self.crn, self.fixed, self.pil, self.mfe, self.strands, self.seqs = self.filenames
        fid, self.fixedscore = mkstemp(suffix='_fixed_score.csv', dir=self.tdir)
        os.close(fid)
        fid, self.reportfile = mkstemp(dir=self.tdir)
        os.close(fid)
        # Write CRN to basename.crn
        with open(self.crn, 'w') as f:
            f.write(self.crn_rxn)
        # Modules and module strings for import tests
        proc = subprocess.Popen(['piperine-design {} -n 3 -D -q'.format(self.crn)], stdout=subprocess.PIPE, shell=True)
        (out, err) = proc.communicate()
        self.ef = energetics.energyfuncs(targetdG=7.7)
        self.trans = translation
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0] #unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = self.generator.bld.exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass # anti-virus and indexers can keep the files open -_-
    return ret

########## stupid evil command modification: concatenate the tokens /Fx, /doc, and /x: with the next token
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0] #unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = super(self.__class__, self).exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass # anti-virus and indexers can keep the files open -_-
    return ret
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def get_pkg_dir(self, pkgname, pkgver, subdir):
        pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
        if not os.path.isdir(pkgdir):
            os.makedirs(pkgdir)

        target = os.path.join(pkgdir, subdir)

        if os.path.exists(target):
            return target

        (fd, tmp) = tempfile.mkstemp(dir=pkgdir)
        try:
            os.close(fd)
            self.download_to_file(pkgname, pkgver, subdir, tmp)
            if subdir == REQUIRES:
                os.rename(tmp, target)
            else:
                self.extract_tar(subdir, pkgdir, tmp)
        finally:
            try:
                os.remove(tmp)
            except OSError:
                pass

        return target
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0] #unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = super(self.__class__, self).exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass # anti-virus and indexers can keep the files open -_-
    return ret
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def get_pkg_dir(self, pkgname, pkgver, subdir):
        pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
        if not os.path.isdir(pkgdir):
            os.makedirs(pkgdir)

        target = os.path.join(pkgdir, subdir)

        if os.path.exists(target):
            return target

        (fd, tmp) = tempfile.mkstemp(dir=pkgdir)
        try:
            os.close(fd)
            self.download_to_file(pkgname, pkgver, subdir, tmp)
            if subdir == REQUIRES:
                os.rename(tmp, target)
            else:
                self.extract_tar(subdir, pkgdir, tmp)
        finally:
            try:
                os.remove(tmp)
            except OSError:
                pass

        return target
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0] #unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = self.generator.bld.exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass # anti-virus and indexers can keep the files open -_-
    return ret

########## stupid evil command modification: concatenate the tokens /Fx, /doc, and /x: with the next token
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def get_pkg_dir(self, pkgname, pkgver, subdir):
        pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
        if not os.path.isdir(pkgdir):
            os.makedirs(pkgdir)

        target = os.path.join(pkgdir, subdir)

        if os.path.exists(target):
            return target

        (fd, tmp) = tempfile.mkstemp(dir=pkgdir)
        try:
            os.close(fd)
            self.download_to_file(pkgname, pkgver, subdir, tmp)
            if subdir == REQUIRES:
                os.rename(tmp, target)
            else:
                self.extract_tar(subdir, pkgdir, tmp)
        finally:
            try:
                os.remove(tmp)
            except OSError:
                pass

        return target
项目:bob.bio.base    作者:bioidiap    | 项目源码 | 文件源码
def open_compressed(filename, open_flag='r', compression_type='bz2'):
  """Opens a compressed HDF5File with the given opening flags.
  For the 'r' flag, the given compressed file will be extracted to a local space.
  For 'w', an empty HDF5File is created.
  In any case, the opened HDF5File is returned, which needs to be closed using the close_compressed() function.
  """
  # create temporary HDF5 file name
  hdf5_file_name = tempfile.mkstemp('.hdf5', 'bob_')[1]

  if open_flag == 'r':
    # extract the HDF5 file from the given file name into a temporary file name
    tar = tarfile.open(filename, mode="r:" + compression_type)
    memory_file = tar.extractfile(tar.next())
    real_file = open(hdf5_file_name, 'wb')
    real_file.write(memory_file.read())
    del memory_file
    real_file.close()
    tar.close()

  return bob.io.base.HDF5File(hdf5_file_name, open_flag)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server."""
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    os.close(cert_handle)
    os.close(pkey_handle)
    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, path, threaded=True, timeout=None):
        """
        >>> lock = SQLiteLockFile('somefile')
        >>> lock = SQLiteLockFile('somefile', threaded=False)
        """
        LockBase.__init__(self, path, threaded, timeout)
        self.lock_file = unicode(self.lock_file)
        self.unique_name = unicode(self.unique_name)

        if SQLiteLockFile.testdb is None:
            import tempfile
            _fd, testdb = tempfile.mkstemp()
            os.close(_fd)
            os.unlink(testdb)
            del _fd, tempfile
            SQLiteLockFile.testdb = testdb

        import sqlite3
        self.connection = sqlite3.connect(SQLiteLockFile.testdb)

        c = self.connection.cursor()
        try:
            c.execute("create table locks"
                      "("
                      "   lock_file varchar(32),"
                      "   unique_name varchar(32)"
                      ")")
        except sqlite3.OperationalError:
            pass
        else:
            self.connection.commit()
            import atexit
            atexit.register(os.unlink, SQLiteLockFile.testdb)
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def get_tpm_rand_block(size=4096):
    global warned
    randpath = None
    try:
        #make a temp file for the output 
        randfd,randpath = tempfile.mkstemp()
        command = "getrandom -size %d -out %s" % (size,randpath)
        tpm_exec.run(command)

        # read in the quote
        f = open(randpath,"rb")
        rand = f.read()
        f.close()
        os.close(randfd)
    except Exception as e:
        if not warned:
            logger.warn("TPM randomness not available: %s"%e)
            warned=True
        return []
    finally:
        if randpath is not None:
            os.remove(randpath)
    return rand
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def write_key_nvram(key):
    if common.STUB_TPM:
        storage = open("tpm_nvram","wb")
        storage.write(key)
        storage.close()
        return

    owner_pw = tpm_initialize.get_tpm_metadata('owner_pw')
    keyFile = None
    try:
        # write out quote
        keyfd,keypath = tempfile.mkstemp()
        keyFile = open(keypath,"wb")
        keyFile.write(key)
        keyFile.close()
        os.close(keyfd)
        tpm_exec.run("nv_definespace -pwdo %s -in 1 -sz %d -pwdd %s -per 40004"%(owner_pw,common.BOOTSTRAP_KEY_SIZE,owner_pw))
        tpm_exec.run("nv_writevalue -pwdd %s -in 1 -if %s"%(owner_pw,keyFile.name))
    finally:
        if keyFile is not None:
            os.remove(keyFile.name)
    return
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def test_ownerpw(owner_pw,reentry=False):
    tmppath = None
    try:
        #make a temp file for the output 
        _,tmppath = tempfile.mkstemp()
        (output,code) = tpm_exec.run("getpubek -pwdo %s -ok %s"%(owner_pw,tmppath),raiseOnError=False) 
        if code!=tpm_exec.EXIT_SUCESS:
            if len(output)>0 and output[0].startswith("Error Authentication failed (Incorrect Password) from TPM_OwnerReadPubek"):
                return False
            elif len(output)>0 and output[0].startswith("Error Defend lock running from TPM_OwnerReadPubek"):
                if reentry:
                    logger.error("Unable to unlock TPM")
                    return False
                # tpm got locked. lets try to unlock it
                logger.error("TPM is locked from too many invalid owner password attempts, attempting to unlock with password: %s"%owner_pw)
                # i have no idea why, but runnig this twice seems to actually work
                tpm_exec.run("resetlockvalue -pwdo %s"%owner_pw,raiseOnError=False) 
                tpm_exec.run("resetlockvalue -pwdo %s"%owner_pw,raiseOnError=False) 
                return test_ownerpw(owner_pw,True)
            else:
                raise Exception("test ownerpw, getpubek failed with code "+str(code)+": "+str(output))
    finally:
        if tmppath is not None:
            os.remove(tmppath)
    return True
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def get_pub_ek(): # assumes that owner_pw is correct at this point
    owner_pw = get_tpm_metadata('owner_pw')
    tmppath = None
    try:
        #make a temp file for the output 
        tmpfd,tmppath = tempfile.mkstemp()
        (output,code) = tpm_exec.run("getpubek -pwdo %s -ok %s"%(owner_pw,tmppath),raiseOnError=False) # generates pubek.pem
        if code!=tpm_exec.EXIT_SUCESS:
            raise Exception("getpubek failed with code "+str(code)+": "+str(output))

        # read in the output
        f = open(tmppath,"rb")
        ek = f.read()
        f.close()
        os.close(tmpfd)
    finally:
        if tmppath is not None:
            os.remove(tmppath)

    set_tpm_metadata('ek',ek)
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
    """Create temporary file or use existing file.

    This util is needed for creating temporary file with
    specified content, suffix and prefix. If path is not None,
    it will be used for writing content. If the path doesn't
    exist it'll be created.

    :param content: content for temporary file.
    :param path: same as parameter 'dir' for mkstemp
    :param suffix: same as parameter 'suffix' for mkstemp
    :param prefix: same as parameter 'prefix' for mkstemp

    For example: it can be used in database tests for creating
    configuration files.
    """
    if path:
        ensure_tree(path)

    (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
    try:
        os.write(fd, content)
    finally:
        os.close(fd)
    return path
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def _dump(self, file=None, format=None, **options):
        import tempfile
        suffix = ''
        if format:
            suffix = '.'+format
        if not file:
            f, file = tempfile.mkstemp(suffix)
            os.close(f)

        self.load()
        if not format or format == "PPM":
            self.im.save_ppm(file)
        else:
            if not file.endswith(format):
                file = file + "." + format
            self.save(file, format, **options)
        return file
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def convert(cls, report, data):
        "converts the report data to another mimetype if necessary"
        input_format = report.template_extension
        output_format = report.extension or report.template_extension

        if output_format in MIMETYPES:
            return output_format, data

        fd, path = tempfile.mkstemp(suffix=(os.extsep + input_format),
            prefix='trytond_')
        oext = FORMAT2EXT.get(output_format, output_format)
        with os.fdopen(fd, 'wb+') as fp:
            fp.write(data)
        cmd = ['unoconv', '--connection=%s' % config.get('report', 'unoconv'),
            '-f', oext, '--stdout', path]
        try:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            stdoutdata, stderrdata = proc.communicate()
            if proc.wait() != 0:
                raise Exception(stderrdata)
            return oext, stdoutdata
        finally:
            os.remove(path)
项目:no-YOU-talk-to-the-hand    作者:flashashen    | 项目源码 | 文件源码
def test_basic_config():


    fd, path = tempfile.mkstemp()
    f = os.fdopen(fd,'w')
    f.write(yaml.dump(testcfg))
    f.flush()

    cfg = ny.get_config(path)
    ny.write_supervisor_conf()

    config = ConfigParser.ConfigParser()
    config.readfp(open(cfg['supervisor.conf']))

    # from IPython import embed
    # embed()
    print(config.get('program:testtunnel','command'))
    assert 'sshuttle -r 1.1.1.1 2.2.2.2 -x 3.3.3.3' in config.get('program:testtunnel','command')
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        self._createdir()  # Cache dir can be deleted at any time.
        fname = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        fd, tmp_path = tempfile.mkstemp(dir=self._dir)
        renamed = False
        try:
            with io.open(fd, 'wb') as f:
                expiry = self.get_backend_timeout(timeout)
                f.write(pickle.dumps(expiry, -1))
                f.write(zlib.compress(pickle.dumps(value), -1))
            file_move_safe(tmp_path, fname, allow_overwrite=True)
            renamed = True
        finally:
            if not renamed:
                os.remove(tmp_path)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server."""
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    os.close(cert_handle)
    os.close(pkey_handle)
    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def create_temporary_ca_file(anchor_list):
    """
    Concatenate all the certificates (PEM format for the export) in
    'anchor_list' and write the result to file to a temporary file
    using mkstemp() from tempfile module. On success 'filename' is
    returned, None otherwise.

    If you are used to OpenSSL tools, this function builds a CAfile
    that can be used for certificate and CRL check.

    Also see create_temporary_ca_file().
    """
    try:
        f, fname = tempfile.mkstemp()
        for a in anchor_list:
            s = a.output(fmt="PEM")
            l = os.write(f, s)
        os.close(f)
    except:
        return None
    return fname
项目:scriptcwl    作者:NLeSC    | 项目源码 | 文件源码
def validate(self, inline=False):
        """Validate workflow object.

        This method currently validates the workflow object with the use of
        cwltool. It writes the workflow to a tmp CWL file, reads it, validates
        it and removes the tmp file again. By default, the workflow is written
        to file using absolute paths to the steps. Optionally, the steps can be
        saved inline.
        """
        # define tmpfile
        (fd, tmpfile) = tempfile.mkstemp()
        os.close(fd)
        try:
            # save workflow object to tmpfile,
            # do not recursively call validate function
            self.save(tmpfile, inline=inline, validate=False, relative=False,
                      wd=False)
            # load workflow from tmpfile
            document_loader, processobj, metadata, uri = load_cwl(tmpfile)
        finally:
            # cleanup tmpfile
            os.remove(tmpfile)
项目:scriptcwl    作者:NLeSC    | 项目源码 | 文件源码
def _pack(self, fname, encoding):
        """Save workflow with ``--pack`` option

        This means that al tools and subworkflows are included in the workflow
        file that is created. A packed workflow cannot be loaded and used in
        scriptcwl.
        """
        (fd, tmpfile) = tempfile.mkstemp()
        os.close(fd)
        try:
            self.save(tmpfile, validate=False, wd=False, inline=False,
                      relative=False, pack=False)
            document_loader, processobj, metadata, uri = load_cwl(tmpfile)
        finally:
            # cleanup tmpfile
            os.remove(tmpfile)

        with codecs.open(fname, 'wb', encoding=encoding) as f:
            f.write(print_pack(document_loader, processobj, uri, metadata))
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        self._createdir()  # Cache dir can be deleted at any time.
        fname = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        fd, tmp_path = tempfile.mkstemp(dir=self._dir)
        renamed = False
        try:
            with io.open(fd, 'wb') as f:
                expiry = self.get_backend_timeout(timeout)
                f.write(pickle.dumps(expiry, pickle.HIGHEST_PROTOCOL))
                f.write(zlib.compress(pickle.dumps(value, pickle.HIGHEST_PROTOCOL)))
            file_move_safe(tmp_path, fname, allow_overwrite=True)
            renamed = True
        finally:
            if not renamed:
                os.remove(tmp_path)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def create_temporary_ca_file(anchor_list):
    """
    Concatenate all the certificates (PEM format for the export) in
    'anchor_list' and write the result to file to a temporary file
    using mkstemp() from tempfile module. On success 'filename' is
    returned, None otherwise.

    If you are used to OpenSSL tools, this function builds a CAfile
    that can be used for certificate and CRL check.

    Also see create_temporary_ca_file().
    """
    try:
        f, fname = tempfile.mkstemp()
        for a in anchor_list:
            s = a.output(fmt="PEM")
            l = os.write(f, s)
        os.close(f)
    except:
        return None
    return fname
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _findLib_gcc(name):
        expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
        fdout, ccout = tempfile.mkstemp()
        os.close(fdout)
        cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
              'LANG=C LC_ALL=C $CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
        try:
            f = os.popen(cmd)
            try:
                trace = f.read()
            finally:
                rv = f.close()
        finally:
            try:
                os.unlink(ccout)
            except OSError, e:
                if e.errno != errno.ENOENT:
                    raise
        if rv == 10:
            raise OSError, 'gcc or cc command not found'
        res = re.search(expr, trace)
        if not res:
            return None
        return res.group(0)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def mktemp(self, *args, **kwds):
        """create temp file that's cleaned up at end of test"""
        self.require_writeable_filesystem()
        fd, path = tempfile.mkstemp(*args, **kwds)
        os.close(fd)
        queue = self._mktemp_queue
        if queue is None:
            queue = self._mktemp_queue = []
            def cleaner():
                for path in queue:
                    if os.path.exists(path):
                        os.remove(path)
                del queue[:]
            self.addCleanup(cleaner)
        queue.append(path)
        return path
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        timeout = self._normalize_timeout(timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server."""
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    os.close(cert_handle)
    os.close(pkey_handle)
    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def set(self, key, value, timeout=None):
        timeout = self._normalize_timeout(timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server."""
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    os.close(cert_handle)
    os.close(pkey_handle)
    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_ffi_buffer_with_file(self):
        import tempfile, os, array
        fd, filename = tempfile.mkstemp()
        f = os.fdopen(fd, 'r+b')
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        assert f.read() == array.array('i', range(1000)).tostring()
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        f.close()
        os.unlink(filename)
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_ffi_buffer_with_file(self):
        ffi = FFI(backend=self.Backend())
        import tempfile, os, array
        fd, filename = tempfile.mkstemp()
        f = os.fdopen(fd, 'r+b')
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        assert f.read() == array.array('i', range(1000)).tostring()
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        f.close()
        os.unlink(filename)
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def __init__(self, executable_path, port=0, service_args=None, log_path=None):
        """
        Creates a new instance of the Service

        :Args:
         - executable_path : Path to PhantomJS binary
         - port : Port the service is running on
         - service_args : A List of other command line options to pass to PhantomJS
         - log_path: Path for PhantomJS service to log to
        """
        self.service_args= service_args
        if self.service_args is None:
            self.service_args = []
        else:
            self.service_args=service_args[:]
        if not log_path:
            log_path = "ghostdriver.log"
        if not self._args_contain("--cookies-file="):
            self._cookie_temp_file = tempfile.mkstemp()[1]
            self.service_args.append("--cookies-file=" + self._cookie_temp_file)
        else:
            self._cookie_temp_file = None

        service.Service.__init__(self, executable_path, port=port, log_file=open(log_path, 'w'))