Python tempfile 模块,TemporaryFile() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.TemporaryFile()

项目:MOSFiT    作者:guillochon    | 项目源码 | 文件源码
def speak(text, lang='es'):
    """Text to speech. For funp."""
    try:
        from googletrans import Translator
        from gtts import gTTS
        from pygame import mixer
        from tempfile import TemporaryFile

        translator = Translator()
        tts = gTTS(text=translator.translate(text, dest=lang).text, lang=lang)
        mixer.init()

        sf = TemporaryFile()
        tts.write_to_fp(sf)
        sf.seek(0)
        mixer.music.load(sf)
        mixer.music.play()
    except Exception:
        raise
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_exports(self):
        # There are no surprising symbols in the tempfile module
        dict = tempfile.__dict__

        expected = {
            "NamedTemporaryFile" : 1,
            "TemporaryFile" : 1,
            "mkstemp" : 1,
            "mkdtemp" : 1,
            "mktemp" : 1,
            "TMP_MAX" : 1,
            "gettempprefix" : 1,
            "gettempdir" : 1,
            "tempdir" : 1,
            "template" : 1,
            "SpooledTemporaryFile" : 1,
            "TemporaryDirectory" : 1,
        }

        unexp = []
        for key in dict:
            if key[0] != '_' and key not in expected:
                unexp.append(key)
        self.assertTrue(len(unexp) == 0,
                        "unexpected keys: %s" % unexp)
项目:dabdabrevolution    作者:harryparkdotio    | 项目源码 | 文件源码
def _body(self):
        try:
            read_func = self.environ['wsgi.input'].read
        except KeyError:
            self.environ['wsgi.input'] = BytesIO()
            return self.environ['wsgi.input']
        body_iter = self._iter_chunked if self.chunked else self._iter_body
        body, body_size, is_temp_file = BytesIO(), 0, False
        for part in body_iter(read_func, self.MEMFILE_MAX):
            body.write(part)
            body_size += len(part)
            if not is_temp_file and body_size > self.MEMFILE_MAX:
                body, tmp = TemporaryFile(mode='w+b'), body
                body.write(tmp.getvalue())
                del tmp
                is_temp_file = True
        self.environ['wsgi.input'] = body
        body.seek(0)
        return body
项目:coala-langserver    作者:gaocegege    | 项目源码 | 文件源码
def run_coala_with_specific_file(working_dir, file):
    """Run coala in a specified directory."""
    command = ["coala", "--json", "--find-config", "--files", file]
    stdout_file = tempfile.TemporaryFile()
    kwargs = {"stdout": stdout_file,
              "cwd": working_dir}
    process = subprocess.Popen(command, **kwargs)
    retval = process.wait()
    output_str = None

    if retval == 1:
        stdout_file.seek(0)
        output_str = stdout_file.read().decode("utf-8", "ignore")
        if output_str:
            log("Output =", output_str)
        else:
            log("No results for the file")
    elif retval == 0:
        log("No issues found")
    else:
        log("Exited with:", retval)
    stdout_file.close()
    return output_str
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def bytes2zip(bytes):
    """
    RETURN COMPRESSED BYTES
    """
    if hasattr(bytes, "read"):
        buff = TemporaryFile()
        archive = gzip.GzipFile(fileobj=buff, mode='w')
        for b in bytes:
            archive.write(b)
        archive.close()
        buff.seek(0)
        from pyLibrary.env.big_data import FileString, safe_size
        return FileString(buff)

    buff = BytesIO()
    archive = gzip.GzipFile(fileobj=buff, mode='w')
    archive.write(bytes)
    archive.close()
    return buff.getvalue()
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def __init__(self, stream, length, _shared=None):
        """
        :param stream:  THE STREAM WE WILL GET THE BYTES FROM
        :param length:  THE MAX NUMBER OF BYTES WE ARE EXPECTING
        :param _shared: FOR INTERNAL USE TO SHARE THE BUFFER
        :return:
        """
        self.position = 0
        file_ = TemporaryFile()
        if not _shared:
            self.shared = Data(
                length=length,
                locker=Lock(),
                stream=stream,
                done_read=0,
                file=file_,
                buffer=mmap(file_.fileno(), length)
            )
        else:
            self.shared = _shared

        self.shared.ref_count += 1
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def assert_exception_writes_error_message(self, exception, message):
        parser = cli.get_htsget_parser()
        args = parser.parse_args(["https://some.url"])
        saved_stderr = sys.stderr
        try:
            with tempfile.TemporaryFile("w+") as tmp_stderr:
                sys.stderr = tmp_stderr
                with mock.patch("htsget.get") as mocked_get, \
                        mock.patch("sys.exit") as mocked_exit, \
                        mock.patch("logging.basicConfig"):
                    mocked_get.side_effect = exception
                    cli.run(args)
                tmp_stderr.seek(0)
                stderr = tmp_stderr.read().strip()
                mocked_exit.assert_called_once_with(1)
        finally:
            sys.stderr = saved_stderr
        self.assertTrue(stderr.endswith(message))
项目:TerminalView    作者:Wramberg    | 项目源码 | 文件源码
def capture(self, data, term_instance=None):
        """
        Stores *data* as a temporary file and returns that file's object.
        *term_instance* can be used by overrides of this function to make
        adjustments to the terminal emulator after the *data* is captured e.g.
        to make room for an image.
        """
        # Remove the extra \r's that the terminal adds:
        data = data.replace(b'\r\n', b'\n')
        logging.debug("capture() len(data): %s" % len(data))
        # Write the data to disk in a temporary location
        self.file_obj = tempfile.TemporaryFile()
        self.file_obj.write(data)
        self.file_obj.flush()
        # Leave it open
        return self.file_obj
项目:search_google    作者:rrwen    | 项目源码 | 文件源码
def setUp(self):
    file_path = resource_filename(Requirement.parse('search_google'), 'search_google/config.json')
    with open(file_path, 'r') as in_file:
      defaults = json.load(in_file)
    buildargs = {
      'serviceName': 'customsearch',
      'version': 'v1',
      'developerKey': defaults['build_developerKey']
    }
    cseargs = {
      'q': 'google',
      'num': 1,
      'fileType': 'png',
      'cx': defaults['cx']
    }
    self.results = search_google.api.results(buildargs, cseargs)
    tempfile = TemporaryFile()
    self.tempfile = str(tempfile.name)
    tempfile.close()
    self.tempdir = str(TemporaryDirectory().name)
项目:pytorch-dist    作者:apaszke    | 项目源码 | 文件源码
def __call__(self, test_case):
        module = self.constructor(*self.constructor_args)
        input = self._get_input()

        if self.reference_fn is not None:
            out = test_case._forward(module, input)
            if isinstance(out, Variable):
                out = out.data
            ref_input = self._unpack_input(deepcopy(input))
            expected_out = self.reference_fn(ref_input, test_case._get_parameters(module)[0])
            test_case.assertEqual(out, expected_out)

        # TODO: do this with in-memory files as soon as torch.save will support it
        with TemporaryFile() as f:
            test_case._forward(module, input)
            torch.save(module, f)
            f.seek(0)
            module_copy = torch.load(f)
            test_case.assertEqual(test_case._forward(module, input), test_case._forward(module_copy, input))

        self._do_test(test_case, module, input)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
        """ save targetfd descriptor, and open a new
            temporary file there.  If no tmpfile is
            specified a tempfile.Tempfile() will be opened
            in text mode.
        """
        self.targetfd = targetfd
        if tmpfile is None and targetfd != 0:
            f = tempfile.TemporaryFile('wb+')
            tmpfile = dupfile(f, encoding="UTF-8")
            f.close()
        self.tmpfile = tmpfile
        self._savefd = os.dup(self.targetfd)
        if patchsys:
            self._oldsys = getattr(sys, patchsysdict[targetfd])
        if now:
            self.start()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def writeorg(self, data):
        """ write a string to the original file descriptor
        """
        tempfp = tempfile.TemporaryFile()
        try:
            os.dup2(self._savefd, tempfp.fileno())
            tempfp.write(data)
        finally:
            tempfp.close()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, msg, buffer = None, scheduler = None):
        """Produce this message.

        @param msg: The message I am to produce.
        @type msg: L{IMessage}

        @param buffer: A buffer to hold the message in.  If None, I will
            use a L{tempfile.TemporaryFile}.
        @type buffer: file-like
        """
        self.msg = msg
        if buffer is None:
            buffer = tempfile.TemporaryFile()
        self.buffer = buffer
        if scheduler is None:
            scheduler = iterateInReactor
        self.scheduler = scheduler
        self.write = self.buffer.write
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def pytest_configure(config):
    import py
    if config.option.pastebin == "all":
        tr = config.pluginmanager.getplugin('terminalreporter')
        # if no terminal reporter plugin is present, nothing we can do here;
        # this can happen when this function executes in a slave node
        # when using pytest-xdist, for example
        if tr is not None:
            # pastebin file will be utf-8 encoded binary file
            config._pastebinfile = tempfile.TemporaryFile('w+b')
            oldwrite = tr._tw.write
            def tee_write(s, **kwargs):
                oldwrite(s, **kwargs)
                if py.builtin._istext(s):
                    s = s.encode('utf-8')
                config._pastebinfile.write(s)
            tr._tw.write = tee_write
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, targetfd, tmpfile=None):
        self.targetfd = targetfd
        try:
            self.targetfd_save = os.dup(self.targetfd)
        except OSError:
            self.start = lambda: None
            self.done = lambda: None
        else:
            if targetfd == 0:
                assert not tmpfile, "cannot set tmpfile with stdin"
                tmpfile = open(os.devnull, "r")
                self.syscapture = SysCapture(targetfd)
            else:
                if tmpfile is None:
                    f = TemporaryFile()
                    with f:
                        tmpfile = safe_text_dupfile(f, mode="wb+")
                if targetfd in patchsysdict:
                    self.syscapture = SysCapture(targetfd, tmpfile)
                else:
                    self.syscapture = NoCapture()
            self.tmpfile = tmpfile
            self.tmpfile_fd = tmpfile.fileno()
项目:triage    作者:dssg    | 项目源码 | 文件源码
def save_db_objects(db_engine, db_objects):
    """Saves a collection of SQLAlchemy model objects to the database using a COPY command

    Args:
        db_engine (sqlalchemy.engine)
        db_objects (list) SQLAlchemy model objects, corresponding to a valid table
    """
    with tempfile.TemporaryFile(mode='w+') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        for db_object in db_objects:
            writer.writerow([
                getattr(db_object, col.name)
                for col in db_object.__table__.columns
            ])
        f.seek(0)
        postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
项目:postix    作者:c3cashdesk    | 项目源码 | 文件源码
def get_qr_image(session: CashdeskSession) -> TemporaryFile:
    # TODO: check qr code
    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_H,
        box_size=10,
        border=4,
    )
    tz = timezone.get_current_timezone()
    data = '{end}\tEinnahme\t{total}\tKassensession\t#{pk}\t{supervisor}\t{user}'.format(
        end=session.end.astimezone(tz).strftime('%d.%m.%Y\t%H:%M:%S'),
        total='{0:,.2f}'.format(session.get_cash_transaction_total()).translate(str.maketrans(',.', '.,')),
        pk=session.pk,
        supervisor=session.backoffice_user_after.get_full_name(),
        user=session.user.get_full_name(),
    )
    qr.add_data(data)
    qr.make()

    f = TemporaryFile()
    img = qr.make_image()
    img.save(f)
    return f
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_exec_command_stdout():
    # Regression test for gh-2999 and gh-2915.
    # There are several packages (nose, scipy.weave.inline, Sage inline
    # Fortran) that replace stdout, in which case it doesn't have a fileno
    # method.  This is tested here, with a do-nothing command that fails if the
    # presence of fileno() is assumed in exec_command.

    # The code has a special case for posix systems, so if we are on posix test
    # both that the special case works and that the generic code works.

    # Test posix version:
    with redirect_stdout(StringIO()):
        with redirect_stderr(TemporaryFile()):
            exec_command.exec_command("cd '.'")

    if os.name == 'posix':
        # Test general (non-posix) version:
        with emulate_nonposix():
            with redirect_stdout(StringIO()):
                with redirect_stderr(TemporaryFile()):
                    exec_command.exec_command("cd '.'")
项目:warriorframework    作者:warriorframework    | 项目源码 | 文件源码
def _body(self):
        try:
            read_func = self.environ['wsgi.input'].read
        except KeyError:
            self.environ['wsgi.input'] = BytesIO()
            return self.environ['wsgi.input']
        body_iter = self._iter_chunked if self.chunked else self._iter_body
        body, body_size, is_temp_file = BytesIO(), 0, False
        for part in body_iter(read_func, self.MEMFILE_MAX):
            body.write(part)
            body_size += len(part)
            if not is_temp_file and body_size > self.MEMFILE_MAX:
                body, tmp = TemporaryFile(mode='w+b'), body
                body.write(tmp.getvalue())
                del tmp
                is_temp_file = True
        self.environ['wsgi.input'] = body
        body.seek(0)
        return body
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_dump_as_file(self):
        with open(util.get_data_filename('nginx.conf')) as handle:
            parsed = load(handle)
        parsed[-1][-1].append(UnspacedList([['server'],
                               [['listen', ' ', '443', ' ', 'ssl'],
                                ['server_name', ' ', 'localhost'],
                                ['ssl_certificate', ' ', 'cert.pem'],
                                ['ssl_certificate_key', ' ', 'cert.key'],
                                ['ssl_session_cache', ' ', 'shared:SSL:1m'],
                                ['ssl_session_timeout', ' ', '5m'],
                                ['ssl_ciphers', ' ', 'HIGH:!aNULL:!MD5'],
                                [['location', ' ', '/'],
                                 [['root', ' ', 'html'],
                                  ['index', ' ', 'index.html', ' ', 'index.htm']]]]]))

        with tempfile.TemporaryFile(mode='w+t') as f:
            dump(parsed, f)
            f.seek(0)
            parsed_new = load(f)
        self.assertEqual(parsed, parsed_new)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_comments(self):
        with open(util.get_data_filename('minimalistic_comments.conf')) as handle:
            parsed = load(handle)

        with tempfile.TemporaryFile(mode='w+t') as f:
            dump(parsed, f)
            f.seek(0)
            parsed_new = load(f)

        self.assertEqual(parsed, parsed_new)
        self.assertEqual(parsed_new, [
            ['#', " Use bar.conf when it's a full moon!"],
            ['include', 'foo.conf'],
            ['#', ' Kilroy was here'],
            ['check_status'],
            [['server'],
             [['#', ''],
              ['#', " Don't forget to open up your firewall!"],
              ['#', ''],
              ['listen', '1234'],
              ['#', ' listen 80;']]],
        ])
项目:google_streetview    作者:rrwen    | 项目源码 | 文件源码
def setUp(self):
    file_path = resource_filename(Requirement.parse('google_streetview'), 'google_streetview/config.json')
    with open(file_path, 'r') as in_file:
      defaults = json.load(in_file)
    params = [{
      'size': '600x300', # max 640x640 pixels
      'location': '46.414382,10.013988',
      'heading': '151.78',
      'pitch': '-0.76',
      'key': defaults['key']
    }]
    self.results = google_streetview.api.results(params)
    tempfile = TemporaryFile()
    self.tempfile = str(tempfile.name)
    tempfile.close()
    self.tempdir = str(TemporaryDirectory().name)
项目:NebulaSolarDash    作者:toddlerya    | 项目源码 | 文件源码
def _body(self):
        try:
            read_func = self.environ['wsgi.input'].read
        except KeyError:
            self.environ['wsgi.input'] = BytesIO()
            return self.environ['wsgi.input']
        body_iter = self._iter_chunked if self.chunked else self._iter_body
        body, body_size, is_temp_file = BytesIO(), 0, False
        for part in body_iter(read_func, self.MEMFILE_MAX):
            body.write(part)
            body_size += len(part)
            if not is_temp_file and body_size > self.MEMFILE_MAX:
                body, tmp = TemporaryFile(mode='w+b'), body
                body.write(tmp.getvalue())
                del tmp
                is_temp_file = True
        self.environ['wsgi.input'] = body
        body.seek(0)
        return body
项目:bottle_beginner    作者:denzow    | 项目源码 | 文件源码
def _body(self):
        try:
            read_func = self.environ['wsgi.input'].read
        except KeyError:
            self.environ['wsgi.input'] = BytesIO()
            return self.environ['wsgi.input']
        body_iter = self._iter_chunked if self.chunked else self._iter_body
        body, body_size, is_temp_file = BytesIO(), 0, False
        for part in body_iter(read_func, self.MEMFILE_MAX):
            body.write(part)
            body_size += len(part)
            if not is_temp_file and body_size > self.MEMFILE_MAX:
                body, tmp = TemporaryFile(mode='w+b'), body
                body.write(tmp.getvalue())
                del tmp
                is_temp_file = True
        self.environ['wsgi.input'] = body
        body.seek(0)
        return body
项目:Natural-Language-Processing-Python-and-NLTK    作者:PacktPublishing    | 项目源码 | 文件源码
def remove_line(fname, line):
    '''Remove line from file by creating a temporary file containing all lines
    from original file except those matching the given line, then copying the
    temporary file back into the original file, overwriting its contents.
    '''
    with lockfile.FileLock(fname):
        tmp = tempfile.TemporaryFile()
        fp = open(fname, 'rw+')
        # write all lines from orig file, except if matches given line
        for l in fp:
            if l.strip() != line:
                tmp.write(l)

        # reset file pointers so entire files are copied
        fp.seek(0)
        tmp.seek(0)
        # copy tmp into fp, then truncate to remove trailing line(s)
        shutil.copyfileobj(tmp, fp)
        fp.truncate()
        fp.close()
        tmp.close()
项目:fileflow    作者:industrydive    | 项目源码 | 文件源码
def get_read_stream(self, dag_id, task_id, execution_date):
        key_name = self.get_key_name(dag_id, task_id, execution_date)
        key = self.bucket.get_key(key_name)

        if key is not None:
            import tempfile
            temp_file_stream = tempfile.TemporaryFile(mode='w+b')
            key.get_file(temp_file_stream)

            # Stream has been read in and is now at the end
            # So reset it to the start
            temp_file_stream.seek(0)

            return temp_file_stream

        message = \
            'S3 key named {key_name} in bucket {bucket_name} does not exist.'.format(key_name=key_name,
                                                                                     bucket_name=self.bucket_name)
        raise StorageDriverError(message)
项目:MCSManager-fsmodule    作者:Suwings    | 项目源码 | 文件源码
def _body(self):
        try:
            read_func = self.environ['wsgi.input'].read
        except KeyError:
            self.environ['wsgi.input'] = BytesIO()
            return self.environ['wsgi.input']
        body_iter = self._iter_chunked if self.chunked else self._iter_body
        body, body_size, is_temp_file = BytesIO(), 0, False
        for part in body_iter(read_func, self.MEMFILE_MAX):
            body.write(part)
            body_size += len(part)
            if not is_temp_file and body_size > self.MEMFILE_MAX:
                body, tmp = TemporaryFile(mode='w+b'), body
                body.write(tmp.getvalue())
                del tmp
                is_temp_file = True
        self.environ['wsgi.input'] = body
        body.seek(0)
        return body
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def add_here_document(self, interp, name, content, io_number=None):
        if io_number is None:
            io_number = 0

        if name==pyshlex.unquote_wordtree(name):
            content = interp.expand_here_document(('TOKEN', content))

        # Write document content in a temporary file
        tmp = tempfile.TemporaryFile()
        try:
            tmp.write(content)
            tmp.flush()
            tmp.seek(0)
            self._add_descriptor(io_number, FileWrapper('r', tmp))
        except:
            tmp.close()
            raise
项目:oracle-imagecopy-backup    作者:unibet    | 项目源码 | 文件源码
def rman(self, finalscript):
        self._setenv()
        debug("RMAN execution starts")
        BackupLogger.close()
        starttime = datetime.now()
        with TemporaryFile() as f:
            p = Popen([os.path.join(self.oraclehome, 'bin', 'rman'), "log", BackupLogger.logfile, "append"], stdout=f, stderr=f, stdin=PIPE)
            # Send the script to RMAN
            p.communicate(input=finalscript)
        endtime = datetime.now()
        BackupLogger.init()
        debug("RMAN execution time %s" % (endtime-starttime))
        # If RMAN exists with any code except 0, then there was some error
        if p.returncode != 0:
            error("RMAN execution failed with code %d" % p.returncode)
            raise Exception('rman', "RMAN exited with code %d" % p.returncode)
        else:
            debug("RMAN execution successful")
项目:oracle-imagecopy-backup    作者:unibet    | 项目源码 | 文件源码
def sqlplus(self, finalscript, silent=False):
        self._setenv()
        with TemporaryFile() as f:
            args = [os.path.join(self.oraclehome, 'bin', 'sqlplus')]
            if silent:
                args.append('-S')
            args.append('/nolog')
            debug("SQL*Plus execution starts")
            BackupLogger.close()
            p = Popen(args, stdout=f, stderr=f, stdin=PIPE)
            p.communicate(input=finalscript)
            BackupLogger.init()
            if p.returncode != 0:
                error("SQL*Plus exited with code %d" % p.returncode)
                raise Exception('sqlplus', "sqlplus exited with code %d" % p.returncode)
            else:
                debug("SQL*Plus execution successful")
            if silent:
                f.seek(0,0)
                return f.read()
项目:oracle-imagecopy-backup    作者:unibet    | 项目源码 | 文件源码
def sqlldr(self, login, finalscript):
        self._setenv()
        debug("SQLLDR execution starts")
        f1 = mkstemp(suffix=".ctl")
        ftmp = os.fdopen(f1[0], "w")
        ftmp.write(finalscript)
        ftmp.close()
        f2 = mkstemp(suffix=".log")
        os.close(f2[0])
        with TemporaryFile() as f:
            p = Popen([os.path.join(self.oraclehome, 'bin', 'sqlldr'), login, "control=%s" % f1[1], "log=%s" % f2[1], "errors=0", "silent=all"], stdout=f, stderr=None, stdin=None)
            p.communicate()
            if p.returncode != 0:
                error("SQLLDR exited with code %d" % p.returncode)
                raise Exception('sqlldr', "sqlldr exited with code %d" % p.returncode)
            else:
                debug("SQLLDR execution successful")
        os.unlink(f1[1])
        os.unlink(f2[1])
项目:bx-python    作者:bxlab    | 项目源码 | 文件源码
def setUp(self):
        tree = ArrayTree(10000, 10) # max value of 10000, each block has 10 numbers
        for i in range(5000):
            tree[i] = i

        # Insert extra copies to test frequency
        for i in range(3000):
            tree[i] = i

        tree.set_range(5000, 9001, 100)
        tree.root.build_summary()

        d = {'test': tree}
        f = tempfile.TemporaryFile()
        FileArrayTreeDict.dict_to_file( d, f )
        f.seek(0)
        self.filearraytreedict = FileArrayTreeDict(f)
        self.filearraytree = self.filearraytreedict['test']
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_has_no_name(self):
        # TemporaryFile creates files with no names (on this system)
        dir = tempfile.mkdtemp()
        f = tempfile.TemporaryFile(dir=dir)
        f.write(b'blat')

        # Sneaky: because this file has no name, it should not prevent
        # us from removing the directory it was created in.
        try:
            os.rmdir(dir)
        except:
            ei = sys.exc_info()
            # cleanup
            f.close()
            os.rmdir(dir)
            self.failOnException("rmdir", ei)
项目:almond-nnparser    作者:Stanford-Mobisocial-IoT-Lab    | 项目源码 | 文件源码
def download_glove(glove):
    if os.path.exists(glove):
        return

    print('Downloading glove...')
    with tempfile.TemporaryFile() as tmp:
        with urllib.request.urlopen('http://nlp.stanford.edu/data/glove.42B.300d.zip') as res:
            shutil.copyfileobj(res, tmp)
        with zipfile.ZipFile(tmp, 'r') as glove_zip:
            glove_zip.extract('glove.42B.300d.txt', path=os.path.dirname(glove))
    print('Done')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def test_FileExistsIfTrue(self):
    """ test the method that checks if the file exists """
    with tempfile.TemporaryDirectory() as tmpdir:
      with tempfile.TemporaryFile(dir=tmpdir) as fp:
        actual = self.helper.FileExists(fp.name)
    self.assertTrue(actual)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def make_file(self, binary=None):
        """Overridable: return a readable & writable file.

        The file will be used as follows:
        - data is written to it
        - seek(0)
        - data is read from it

        The 'binary' argument is unused -- the file is always opened
        in binary mode.

        This version opens a temporary file for reading and writing,
        and immediately deletes (unlinks) it.  The trick (on Unix!) is
        that the file can still be used, but it can't be opened by
        another process, and it will automatically be deleted when it
        is closed or when the current process terminates.

        If you want a more permanent file, you derive a class which
        overrides this method.  If you want a visible temporary file
        that is nevertheless automatically deleted when the script
        terminates, try defining a __del__ method in a derived class
        which unlinks the temporary files you have created.

        """
        import tempfile
        return tempfile.TemporaryFile("w+b")



# Backwards Compatibility Classes
# ===============================
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def __radd__(self, other):
        new_file = TemporaryFile()
        new_file.write(other)
        self.file.seek(0)
        for l in self.file:
            new_file.write(l)
        new_file.seek(0)
        return FileString(new_file)
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def safe_size(source):
    """
    READ THE source UP TO SOME LIMIT, THEN COPY TO A FILE IF TOO BIG
    RETURN A str() OR A FileString()
    """

    if source is None:
        return None

    total_bytes = 0
    bytes = []
    b = source.read(MIN_READ_SIZE)
    while b:
        total_bytes += len(b)
        bytes.append(b)
        if total_bytes > MAX_STRING_SIZE:
            try:
                data = FileString(TemporaryFile())
                for bb in bytes:
                    data.write(bb)
                del bytes
                del bb
                b = source.read(MIN_READ_SIZE)
                while b:
                    total_bytes += len(b)
                    data.write(b)
                    b = source.read(MIN_READ_SIZE)
                data.seek(0)
                Log.note("Using file of size {{length}} instead of str()",  length= total_bytes)

                return data
            except Exception as e:
                Log.error("Could not write file > {{num}} bytes",  num= total_bytes, cause=e)
        b = source.read(MIN_READ_SIZE)

    data = b"".join(bytes)
    del bytes
    return data
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def __radd__(self, other):
        new_file = TemporaryFile()
        new_file.write(other)
        self.file.seek(0)
        for l in self.file:
            new_file.write(l)
        new_file.seek(0)
        return FileString(new_file)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __init__(self, archiver, url):
        self.archiver = archiver
        self.tmp = TemporaryFile()
        self.url = url
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_write_read_string(self):
        with tempfile.TemporaryFile() as f:
            value = u'test'
            write_string(f, value)
            f.seek(0)
            self.assertEqual(read_string(f), value)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_write_read_longstring(self):
        with tempfile.TemporaryFile() as f:
            value = u'test'
            write_longstring(f, value)
            f.seek(0)
            self.assertEqual(read_longstring(f), value)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_write_read_stringmap(self):
        with tempfile.TemporaryFile() as f:
            value = {'key': 'value'}
            write_stringmap(f, value)
            f.seek(0)
            self.assertEqual(read_stringmap(f), value)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_write_read_inet(self):
        with tempfile.TemporaryFile() as f:
            value = ('192.168.1.1', 9042)
            write_inet(f, value)
            f.seek(0)
            self.assertEqual(read_inet(f), value)

        with tempfile.TemporaryFile() as f:
            value = ('2001:db8:0:f101::1', 9042)
            write_inet(f, value)
            f.seek(0)
            self.assertEqual(read_inet(f), value)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def default_stream_factory(total_content_length, filename, content_type,
                           content_length=None):
    """The stream factory that is used per default."""
    if total_content_length > 1024 * 500:
        return TemporaryFile('wb+')
    return BytesIO()
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_cli_error(self):
        cmd = [
            sys.executable, "htsget_dev.py", TestRequestHandler.ticket_url + "XXX",
            "-O", self.output_file]
        with tempfile.TemporaryFile("wb+") as stderr, \
                tempfile.TemporaryFile("wb+") as stdout:
            ret = subprocess.call(cmd, stderr=stderr, stdout=stdout)
            self.assertEqual(ret, 1)
            stderr.seek(0)
            stdout.seek(0)
            self.assertGreater(len(stderr.read()), 0)
            self.assertEqual(len(stdout.read()), 0)
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_bad_scheme(self):
        with tempfile.TemporaryFile("w+") as temp_file:
            for bad_scheme in ["htt://as", "file:///home", "ftp://x.y/sdf"]:
                ticket = get_ticket(urls=[
                    get_http_ticket("http://a.b"),
                    get_http_ticket("htp")])
                dm = StoringUrlsDownloadManager(ticket, temp_file)
                self.assertRaises(ValueError, dm.run)
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_basic_http_parsing(self):
        headers = {"a": "b", "b": "c"}
        ticket = get_ticket(urls=[get_http_ticket(EXAMPLE_URL, headers)])
        with tempfile.TemporaryFile("w+") as temp_file:
            dm = StoringUrlsDownloadManager(ticket, temp_file)
            dm.run()
        self.assertEqual(dm.stored_urls[0], (EXAMPLE_URL, headers))
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_basic_data_uri_parsing(self):
        data_uri = "data:application/vnd.ga4gh.bam;base64,SGVsbG8sIFdvcmxkIQ=="
        ticket = get_ticket(urls=[get_data_uri_ticket(data_uri)])
        with tempfile.TemporaryFile("w+") as temp_file:
            dm = StoringUrlsDownloadManager(ticket, temp_file)
            dm.run()
        self.assertEqual(dm.stored_urls[0], urlparse(data_uri))
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_num_retries(self):
        ticket = get_ticket(urls=[get_http_ticket(EXAMPLE_URL)])
        with tempfile.TemporaryFile("w+") as temp_file:
            for num_retries in range(10):
                with mock.patch("time.sleep") as mock_sleep, \
                        mock.patch("logging.warning") as mock_warning:
                    dm = RetryCountDownloadManager(
                        ticket, temp_file, max_retries=num_retries)
                    self.assertEqual(dm.max_retries, num_retries)
                    self.assertRaises(exceptions.RetryableError, dm.run)
                    self.assertEqual(dm.attempt_counts[EXAMPLE_URL], num_retries + 1)
                    self.assertEqual(mock_sleep.call_count, num_retries)
                    self.assertEqual(mock_warning.call_count, num_retries)