Python tempfile 模块,SpooledTemporaryFile() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 tempfile.SpooledTemporaryFile()。

项目:send-cli    作者:ehuggett    | 项目源码 | 文件源码
def encrypt_file(file, keys=None):
    '''Encrypt file data with the same method as the Send browser/js client.

    Reads ``file`` in CHUNK_SIZE pieces, encrypts each piece with AES-GCM and
    writes the ciphertext, followed by the GCM digest, into a spooled
    temporary file.

    Args:
        file: Binary file object to encrypt; it is closed before returning.
        keys: Object exposing ``encryptKey`` and ``encryptIV``.  When omitted,
            a new ``secretKeys()`` is created for this call.

    Returns:
        The spooled temporary file holding ciphertext + digest, rewound to
        offset 0.
    '''
    # A ``keys=secretKeys()`` default is evaluated once, when the ``def``
    # statement runs, so every call relying on the default would share the
    # same key/IV object.  Build it per call instead.
    if keys is None:
        keys = secretKeys()

    key = keys.encryptKey
    iv = keys.encryptIV
    encData = tempfile.SpooledTemporaryFile(max_size=SPOOL_SIZE, mode='w+b')
    cipher = Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_GCM, iv)

    pbar = progbar(fileSize(file))

    # read() returns b'' at end of file, which terminates the iterator.
    for chunk in iter(lambda: file.read(CHUNK_SIZE), b''):
        encData.write(cipher.encrypt(chunk))
        pbar.update(len(chunk))

    pbar.close()
    # The authentication tag is appended after the ciphertext.
    encData.write(cipher.digest())
    file.close()

    encData.seek(0)
    return encData
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_exports(self):
        """Check that tempfile's public namespace holds only the known names."""
        known = frozenset([
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempdir",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        ])

        # Names starting with an underscore are private and ignored.
        stray = [name for name in tempfile.__dict__
                 if name[0] != '_' and name not in known]
        self.assertTrue(not stray,
                        "unexpected keys: %s" % stray)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_exports(self):
        """Check that tempfile's public namespace holds only the known names."""
        known = frozenset([
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempdir",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
        ])

        # Names starting with an underscore are private and ignored.
        stray = [name for name in tempfile.__dict__
                 if name[0] != '_' and name not in known]
        self.assertTrue(not stray,
                        "unexpected keys: %s" % stray)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_exports(self):
        """Check that tempfile's public namespace holds only the known names."""
        known = frozenset([
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempdir",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
        ])

        # Names starting with an underscore are private and ignored.
        stray = [name for name in tempfile.__dict__
                 if name[0] != '_' and name not in known]
        self.assertTrue(not stray,
                        "unexpected keys: %s" % stray)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_text_mode(self):
        """A text-mode SpooledTemporaryFile reads and writes (Unicode) strings."""
        spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10)
        pieces = (
            "abc\n",
            "def\n",
            "xyzzy\n",
            # Ctrl+Z must not truncate the file.
            "foo\x1abar\n",
        )

        # After every append, the whole content written so far must read back.
        so_far = ""
        for piece in pieces:
            spool.write(piece)
            so_far += piece
            spool.seek(0)
            self.assertEqual(spool.read(), so_far)
项目:send-cli    作者:ehuggett    | 项目源码 | 文件源码
def api_download(service, fileId, authorisation):
    '''Download the file identified by fileId from a Send service.

    The response body is streamed in CHUNK_SIZE pieces into a spooled
    temporary file, which is returned rewound to offset 0.  An HTTP error
    status raises via ``raise_for_status()``.
    '''
    spool = tempfile.SpooledTemporaryFile(max_size=SPOOL_SIZE, mode='w+b')

    token = unpadded_urlsafe_b64encode(authorisation)
    request_headers = {'Authorization' : 'send-v1 ' + token}
    endpoint = service + 'api/download/' + fileId

    response = requests.get(endpoint, headers=request_headers, stream=True)
    response.raise_for_status()

    # The progress bar is sized from the advertised body length.
    expected_bytes = int(response.headers['Content-length'])
    progress = progbar(expected_bytes)
    for piece in response.iter_content(chunk_size=CHUNK_SIZE):
        spool.write(piece)
        progress.update(len(piece))
    progress.close()

    spool.seek(0)
    return spool
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_properties(self):
        """Check mode, name, newlines and encoding before and after rollover."""
        spool = tempfile.SpooledTemporaryFile(max_size=10)

        def assert_text_attrs_missing():
            # A binary spool must not expose the text-only attributes.
            for attr in ('newlines', 'encoding'):
                with self.assertRaises(AttributeError):
                    getattr(spool, attr)

        # Exactly max_size bytes: not rolled over yet.
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNone(spool.name)
        assert_text_attrs_missing()

        # One more byte exceeds max_size and triggers the rollover.
        spool.write(b'x')
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNotNone(spool.name)
        assert_text_attrs_missing()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_properties(self):
        """Check mode, name, newlines and encoding before and after rollover."""
        spool = tempfile.SpooledTemporaryFile(max_size=10)

        def assert_text_attrs_missing():
            # A binary spool must not expose the text-only attributes.
            for attr in ('newlines', 'encoding'):
                with self.assertRaises(AttributeError):
                    getattr(spool, attr)

        # Exactly max_size bytes: not rolled over yet.
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNone(spool.name)
        assert_text_attrs_missing()

        # One more byte exceeds max_size and triggers the rollover.
        spool.write(b'x')
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNotNone(spool.name)
        assert_text_attrs_missing()
项目:autoinjection    作者:ChengWiLL    | 项目源码 | 文件源码
def initCase(switches, count):
    """Prepare global state for one test case.

    Clears the recorded failure details, creates a new temporary output
    directory (with dump/files path templates beneath it), points both
    ``sys.stdout`` and the logger handler's stream at a spooled temporary
    file, overlays ``switches`` on the parsed command-line options and then
    runs the option/initialisation routines.
    """
    _failures.failedItems = []
    _failures.failedParseOn = _failures.failedTraceBack = None

    outputPrefix = "%s%d-" % (MKSTEMP_PREFIX.TESTING, count)
    paths.SQLMAP_OUTPUT_PATH = tempfile.mkdtemp(prefix=outputPrefix)
    # The "%s" component is left as a placeholder in both path templates.
    paths.SQLMAP_DUMP_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "files")

    logger.debug("using output directory '%s' for this test case" % paths.SQLMAP_OUTPUT_PATH)

    captured = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")
    LOGGER_HANDLER.stream = captured
    sys.stdout = captured

    cmdLineOptions = cmdLineParser()

    # Only options the parser already knows about are overridden.
    knownOptions = cmdLineOptions.__dict__
    for name, value in (switches or {}).items():
        if name in knownOptions:
            knownOptions[name] = value

    initOptions(cmdLineOptions, True)
    init()
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def is_available(cls):
        """Report whether this engine can be used.

        Requires the parent check to pass and the ``text2wave`` and
        ``festival`` executables to be found.  ``festival --pipe`` is then run
        with empty stdin, and the result is False when its combined
        stdout/stderr contains 'No default voice found'.
        """
        prerequisites_met = (super(cls, cls).is_available() and
                             diagnose.check_executable('text2wave') and
                             diagnose.check_executable('festival'))
        if not prerequisites_met:
            return False

        logger = logging.getLogger(__name__)
        command = ['festival', '--pipe']
        with tempfile.SpooledTemporaryFile() as out_f:
            with tempfile.SpooledTemporaryFile() as in_f:
                quoted = ' '.join(pipes.quote(part) for part in command)
                logger.debug('Executing %s', quoted)
                # stdout and stderr share one file so both are inspected.
                subprocess.call(command, stdin=in_f, stdout=out_f,
                                stderr=out_f)
                out_f.seek(0)
                captured = out_f.read().strip()
                if captured:
                    logger.debug("Output was: '%s'", captured)
                return ('No default voice found' not in captured)
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def say(self, phrase, *args):
        """Run ``text2wave`` on ``phrase`` and play the resulting wav file.

        ``phrase`` is fed to the command on stdin, its stdout is written to a
        named temporary ``.wav`` file and anything on stderr is logged at
        debug level.  Extra positional arguments are accepted and ignored.
        """
        self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
        command = ['text2wave']
        with tempfile.NamedTemporaryFile(suffix='.wav') as wav_file:
            with tempfile.SpooledTemporaryFile() as stdin_file:
                stdin_file.write(phrase)
                stdin_file.seek(0)
                with tempfile.SpooledTemporaryFile() as stderr_file:
                    quoted = ' '.join(pipes.quote(part) for part in command)
                    self._logger.debug('Executing %s', quoted)
                    subprocess.call(command, stdin=stdin_file,
                                    stdout=wav_file, stderr=stderr_file)
                    stderr_file.seek(0)
                    diagnostics = stderr_file.read()
                    if diagnostics:
                        self._logger.debug("Output was: '%s'", diagnostics)
            # Play while the named file still exists (it is removed on exit).
            self.play(wav_file.name)
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def say(self, phrase, *args):
        """Run ``flite`` on ``phrase`` and play the wav file it is given.

        The command line is ``flite [-voice VOICE] -t PHRASE FILE`` where FILE
        is a freshly created temporary ``.wav`` path (presumably the file
        flite writes its audio to -- confirm against flite's usage).  The
        command's combined stdout/stderr is logged at debug level, then
        ``self.play`` is called on the file.  Extra positional arguments are
        accepted and ignored.

        The temporary file is created with ``delete=False``, so it is removed
        explicitly -- in a ``finally`` clause, so that a failure while running
        the command or playing the file does not leave it behind.
        """
        self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
        cmd = ['flite']
        if self.voice:
            cmd.extend(['-voice', self.voice])
        cmd.extend(['-t', phrase])
        # Only the unique path is needed; the handle is closed right away.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            fname = f.name
        try:
            cmd.append(fname)
            with tempfile.SpooledTemporaryFile() as out_f:
                self._logger.debug('Executing %s',
                                   ' '.join([pipes.quote(arg)
                                             for arg in cmd]))
                subprocess.call(cmd, stdout=out_f, stderr=out_f)
                out_f.seek(0)
                output = out_f.read().strip()
            if output:
                self._logger.debug("Output was: '%s'", output)
            self.play(fname)
        finally:
            os.remove(fname)
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def transcribe(self, fp, mode=None):
        """Run ``julius`` on the audio in ``fp`` and return the recognised texts.

        ``fp`` is passed to the command as stdin.  The command's stdout is
        matched against ``self._pattern``, whose matches are (index, text)
        pairs; the non-empty texts are returned ordered by index.  When
        nothing was recognised the result is ``['']``.  ``mode`` is unused.
        """
        options = ['julius',
                   '-quiet',
                   '-nolog',
                   '-input', 'stdin',
                   '-dfa', self._vocabulary.dfa_file,
                   '-v', self._vocabulary.dict_file,
                   '-h', self._hmmdefs,
                   '-hlist', self._tiedlist,
                   '-forcedict']
        cmd = [str(option) for option in options]
        self._logger.debug('Executing: %r', cmd)
        with tempfile.SpooledTemporaryFile() as out_f:
            # stderr is captured only so that it is discarded.
            with tempfile.SpooledTemporaryFile() as err_f:
                subprocess.call(cmd, stdin=fp, stdout=out_f, stderr=err_f)
            out_f.seek(0)
            raw_output = out_f.read()
            indexed = [(int(index), text) for index, text in
                       self._pattern.findall(raw_output)]
        indexed.sort(key=lambda pair: pair[0])
        transcribed = [text for _, text in indexed if text] or ['']
        self._logger.info('Transcribed: %r', transcribed)
        return transcribed
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_exports(self):
        """Check that tempfile's public namespace holds only the known names."""
        known = frozenset([
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempdir",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        ])

        # Names starting with an underscore are private and ignored.
        stray = [name for name in tempfile.__dict__
                 if name[0] != '_' and name not in known]
        self.assertTrue(not stray,
                        "unexpected keys: %s" % stray)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_properties(self):
        """Check mode, name, newlines and encoding before and after rollover."""
        spool = tempfile.SpooledTemporaryFile(max_size=10)

        def assert_text_attrs_missing():
            # A binary spool must not expose the text-only attributes.
            for attr in ('newlines', 'encoding'):
                with self.assertRaises(AttributeError):
                    getattr(spool, attr)

        # Exactly max_size bytes: not rolled over yet.
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNone(spool.name)
        assert_text_attrs_missing()

        # One more byte exceeds max_size; the reported mode changes with it.
        spool.write(b'x')
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'rb+')
        self.assertIsNotNone(spool.name)
        assert_text_attrs_missing()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_text_newline_and_encoding(self):
        """Check newline/encoding handling of a text spool across rollover."""
        spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                              newline='', encoding='utf-8')
        short_line = "\u039B\r\n"
        long_line = "\u039B" * 20 + "\r\n"

        # Small write: content round-trips unchanged, no rollover yet.
        spool.write(short_line)
        spool.seek(0)
        self.assertEqual(spool.read(), short_line)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        for value in (spool.name, spool.newlines, spool.encoding):
            self.assertIsNone(value)

        # Large write: rolled over, name/newlines/encoding are now populated.
        spool.write(long_line)
        spool.seek(0)
        self.assertEqual(spool.read(), short_line + long_line)
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        for value in (spool.name, spool.newlines):
            self.assertIsNotNone(value)
        self.assertEqual(spool.encoding, 'utf-8')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_truncate_with_size_parameter(self):
        """Exercise truncate() with no size, a smaller size and a larger size."""
        def new_spool():
            # Eight bytes of content, below the ten byte spool limit.
            spool = tempfile.SpooledTemporaryFile(max_size=10)
            spool.write(b'abcdefg\n')
            return spool

        # Truncating at position 0 empties the file without rolling over.
        spool = new_spool()
        spool.seek(0)
        spool.truncate()
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'')

        # Truncating to an explicit smaller size keeps that many bytes.
        spool = new_spool()
        spool.truncate(4)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'abcd')

        # Truncating beyond max_size forces a rollover.
        spool = new_spool()
        spool.truncate(20)
        self.assertTrue(spool._rolled)
        if has_stat:
            self.assertEqual(os.fstat(spool.fileno()).st_size, 20)
项目:WebHashcat    作者:hegusung    | 项目源码 | 文件源码
def csv_masks(request, hashfile_id):
    """Return the password-mask statistics of a hashfile as a CSV download.

    Looks up the Hashfile (404 when missing), counts cracked entries per
    password mask with a raw SQL query, and sends the (count, mask) rows back
    as a fully quoted CSV attachment named ``<hashfile name>_masks.csv``.
    """
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    # didn't found the correct way in pure django...
    res = Cracked.objects.raw("SELECT id, password_mask, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY password_mask ORDER BY count DESC", [hashfile.id])

    # The buffer is written and then read back, so it needs an update mode
    # ('w+'); a write-only spool is not readable once it has rolled over to
    # disk.  The context manager makes sure it is closed.
    with tempfile.SpooledTemporaryFile(mode='w+') as fp:
        csvfile = csv.writer(fp, quotechar='"', quoting=csv.QUOTE_ALL)
        for item in res:
            csvfile.writerow([item.count, item.password_mask])
        fp.seek(0)   # rewind the file handle

        csvfile_data = fp.read()

    # Debug output: SQL text and timing of the most recent query.
    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    response = HttpResponse(csvfile_data, content_type='application/force-download') # mimetype is replaced by content_type for django 1.7
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name
    return response
项目:ATLeS    作者:liffiton    | 项目源码 | 文件源码
def download():
    """Zip the files matching the requested tracks and return the archive.

    The ``tracks`` query parameter is a '|'-separated list of track paths.
    For each one, the "-track.csv" part is replaced by a wildcard and every
    match under ``config.TRACKDIR`` is added to the archive under its path
    relative to that directory.  Response headers are set to force a
    download of "data.zip".
    """
    requested = request.query.tracks.split('|')

    # write the archive into a temporary in-memory file-like object
    temp = tempfile.SpooledTemporaryFile()
    archive = zipfile.ZipFile(temp, 'w', zipfile.ZIP_DEFLATED)
    with archive:
        for trackrel in requested:
            pattern = trackrel.replace("-track.csv", "*")
            for match in config.TRACKDIR.glob(pattern):
                source = str(match)
                arcname = str(match.relative_to(config.TRACKDIR))
                archive.write(source, arcname)

    temp.seek(0)

    # force a download; give it a filename and mime type
    for header, value in (
            ('Content-Disposition', 'attachment; filename="data.zip"'),
            ('Content-Type', 'application/zip')):
        response.set_header(header, value)

    # The temp object is not closed here: the garbage collector is relied on
    # to delete it (and hence the file itself) once sending has finished.
    return temp
项目:Eagle    作者:magerx    | 项目源码 | 文件源码
def initCase(switches, count):
    """Prepare global state for one test case.

    Clears the recorded failure details, creates a new temporary output
    directory (with dump/files path templates beneath it), points both
    ``sys.stdout`` and the logger handler's stream at a spooled temporary
    file, overlays ``switches`` on the parsed command-line options and then
    runs the option/initialisation routines.
    """
    Failures.failedItems = []
    Failures.failedParseOn = Failures.failedTraceBack = None

    outputPrefix = "sqlmaptest-%d-" % count
    paths.SQLMAP_OUTPUT_PATH = tempfile.mkdtemp(prefix=outputPrefix)
    # The "%s" component is left as a placeholder in both path templates.
    paths.SQLMAP_DUMP_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "files")

    logger.debug("using output directory '%s' for this test case" % paths.SQLMAP_OUTPUT_PATH)

    captured = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")
    LOGGER_HANDLER.stream = captured
    sys.stdout = captured

    cmdLineOptions = cmdLineParser()

    # Only options the parser already knows about are overridden.
    knownOptions = cmdLineOptions.__dict__
    for name, value in (switches or {}).items():
        if name in knownOptions:
            knownOptions[name] = value

    initOptions(cmdLineOptions, True)
    init()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_exports(self):
        """Check that tempfile's public namespace holds only the known names."""
        known = frozenset([
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempdir",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
        ])

        # Names starting with an underscore are private and ignored.
        stray = [name for name in tempfile.__dict__
                 if name[0] != '_' and name not in known]
        self.assertTrue(not stray,
                        "unexpected keys: %s" % stray)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_properties(self):
        """Check mode, name, newlines and encoding before and after rollover."""
        spool = tempfile.SpooledTemporaryFile(max_size=10)

        def assert_text_attrs_missing():
            # A binary spool must not expose the text-only attributes.
            for attr in ('newlines', 'encoding'):
                with self.assertRaises(AttributeError):
                    getattr(spool, attr)

        # Exactly max_size bytes: not rolled over yet.
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNone(spool.name)
        assert_text_attrs_missing()

        # One more byte exceeds max_size and triggers the rollover.
        spool.write(b'x')
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNotNone(spool.name)
        assert_text_attrs_missing()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_exports(self):
        """Check that tempfile's public namespace holds only the known names."""
        known = frozenset([
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempdir",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        ])

        # Names starting with an underscore are private and ignored.
        stray = [name for name in tempfile.__dict__
                 if name[0] != '_' and name not in known]
        self.assertTrue(not stray,
                        "unexpected keys: %s" % stray)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_properties(self):
        """Check mode, name, newlines and encoding before and after rollover."""
        spool = tempfile.SpooledTemporaryFile(max_size=10)

        def assert_text_attrs_missing():
            # A binary spool must not expose the text-only attributes.
            for attr in ('newlines', 'encoding'):
                with self.assertRaises(AttributeError):
                    getattr(spool, attr)

        # Exactly max_size bytes: not rolled over yet.
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNone(spool.name)
        assert_text_attrs_missing()

        # One more byte exceeds max_size; the reported mode changes with it.
        spool.write(b'x')
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'rb+')
        self.assertIsNotNone(spool.name)
        assert_text_attrs_missing()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_text_newline_and_encoding(self):
        """Check newline/encoding handling of a text spool across rollover."""
        spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                              newline='', encoding='utf-8')
        short_line = "\u039B\r\n"
        long_line = "\u039B" * 20 + "\r\n"

        # Small write: content round-trips unchanged, no rollover yet.
        spool.write(short_line)
        spool.seek(0)
        self.assertEqual(spool.read(), short_line)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        for value in (spool.name, spool.newlines, spool.encoding):
            self.assertIsNone(value)

        # Large write: rolled over, name/newlines/encoding are now populated.
        spool.write(long_line)
        spool.seek(0)
        self.assertEqual(spool.read(), short_line + long_line)
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        for value in (spool.name, spool.newlines):
            self.assertIsNotNone(value)
        self.assertEqual(spool.encoding, 'utf-8')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_truncate_with_size_parameter(self):
        """Exercise truncate() with no size, a smaller size and a larger size."""
        def new_spool():
            # Eight bytes of content, below the ten byte spool limit.
            spool = tempfile.SpooledTemporaryFile(max_size=10)
            spool.write(b'abcdefg\n')
            return spool

        # Truncating at position 0 empties the file without rolling over.
        spool = new_spool()
        spool.seek(0)
        spool.truncate()
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'')

        # Truncating to an explicit smaller size keeps that many bytes.
        spool = new_spool()
        spool.truncate(4)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'abcd')

        # Truncating beyond max_size forces a rollover.
        spool = new_spool()
        spool.truncate(20)
        self.assertTrue(spool._rolled)
        if has_stat:
            self.assertEqual(os.fstat(spool.fileno()).st_size, 20)
项目:j2f    作者:jasper2fork    | 项目源码 | 文件源码
def say(self, phrase):
        """Run ``text2wave`` with the configured voice and return its stdout.

        ``phrase`` is fed to the command on stdin; anything the command
        writes to stderr is logged at debug level.
        """
        command = ['text2wave', '-eval', '(voice_%s)' % self.voice]
        with tempfile.SpooledTemporaryFile() as audio_file:
            with tempfile.SpooledTemporaryFile() as stdin_file:
                stdin_file.write(phrase)
                stdin_file.seek(0)
                with tempfile.SpooledTemporaryFile() as stderr_file:
                    quoted = ' '.join(pipes.quote(part) for part in command)
                    self._logger.debug('Executing %s', quoted)
                    subprocess.call(command, stdin=stdin_file,
                                    stdout=audio_file, stderr=stderr_file)
                    stderr_file.seek(0)
                    diagnostics = stderr_file.read()
                    if diagnostics:
                        self._logger.debug("Output was: '%s'", diagnostics)
            # Rewind so the captured stdout is read from the beginning.
            audio_file.seek(0)
            return audio_file.read()
项目:j2f    作者:jasper2fork    | 项目源码 | 文件源码
def say(self, phrase):
        """Run EXECUTABLE on ``phrase`` and return the bytes of its wav output.

        The command is invoked as
        ``EXECUTABLE -o FILE --file-format=WAVE PHRASE`` with FILE being a
        freshly created temporary ``.wav`` path.  The command's combined
        stdout/stderr is logged at debug level, then FILE is read back in
        binary mode and its content returned.

        The temporary file is created with ``delete=False``, so it is removed
        explicitly -- in a ``finally`` clause, so that a failure while running
        the command or reading the result does not leave it behind.
        """
        # Only the unique path is needed; the handle is closed right away.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            fname = f.name
        try:
            cmd = [EXECUTABLE, '-o', fname,
                               '--file-format=WAVE',
                               str(phrase)]
            self._logger.debug('Executing %s', ' '.join([pipes.quote(arg)
                                                         for arg in cmd]))
            with tempfile.SpooledTemporaryFile() as f:
                subprocess.call(cmd, stdout=f, stderr=f)
                f.seek(0)
                output = f.read()
                if output:
                    self._logger.debug("Output was: '%s'", output)

            with open(fname, 'rb') as f:
                data = f.read()
        finally:
            os.remove(fname)
        return data
项目:j2f    作者:jasper2fork    | 项目源码 | 文件源码
def mp3_to_wave(self, filename):
        """Decode the mp3 at ``filename`` and return it as wav file bytes.

        Frames are read from a ``mad.MadFile`` until it yields ``None`` and
        written through the ``wave`` module into a spooled temporary file,
        whose full content is returned.
        """
        source = mad.MadFile(filename)
        with tempfile.SpooledTemporaryFile() as buf:
            writer = wave.open(buf, mode='wb')
            writer.setframerate(source.samplerate())
            if source.mode() == mad.MODE_SINGLE_CHANNEL:
                channels = 1
            else:
                channels = 2
            writer.setnchannels(channels)
            # A sample width of 4 bytes, i.e. 32 bit audio.
            writer.setsampwidth(4)
            while True:
                frame = source.read()
                if frame is None:
                    break
                writer.writeframes(frame)
            writer.close()
            buf.seek(0)
            return buf.read()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_exports(self):
        """Check that tempfile's public namespace holds only the known names."""
        known = frozenset([
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempdir",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
        ])

        # Names starting with an underscore are private and ignored.
        stray = [name for name in tempfile.__dict__
                 if name[0] != '_' and name not in known]
        self.assertTrue(not stray,
                        "unexpected keys: %s" % stray)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_properties(self):
        """Check mode, name, newlines and encoding before and after rollover."""
        spool = tempfile.SpooledTemporaryFile(max_size=10)

        def assert_text_attrs_missing():
            # A binary spool must not expose the text-only attributes.
            for attr in ('newlines', 'encoding'):
                with self.assertRaises(AttributeError):
                    getattr(spool, attr)

        # Exactly max_size bytes: not rolled over yet.
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNone(spool.name)
        assert_text_attrs_missing()

        # One more byte exceeds max_size and triggers the rollover.
        spool.write(b'x')
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNotNone(spool.name)
        assert_text_attrs_missing()
项目:Helix    作者:3lackrush    | 项目源码 | 文件源码
def initCase(switches, count):
    """Prepare global state for one test case.

    Clears the recorded failure details, creates a new temporary output
    directory (with dump/files path templates beneath it), points both
    ``sys.stdout`` and the logger handler's stream at a spooled temporary
    file, overlays ``switches`` on the parsed command-line options and then
    runs the option/initialisation routines.
    """
    _failures.failedItems = []
    _failures.failedParseOn = _failures.failedTraceBack = None

    outputPrefix = "%s%d-" % (MKSTEMP_PREFIX.TESTING, count)
    paths.SQLMAP_OUTPUT_PATH = tempfile.mkdtemp(prefix=outputPrefix)
    # The "%s" component is left as a placeholder in both path templates.
    paths.SQLMAP_DUMP_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "files")

    logger.debug("using output directory '%s' for this test case" % paths.SQLMAP_OUTPUT_PATH)

    captured = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")
    LOGGER_HANDLER.stream = captured
    sys.stdout = captured

    cmdLineOptions = cmdLineParser()

    # Only options the parser already knows about are overridden.
    knownOptions = cmdLineOptions.__dict__
    for name, value in (switches or {}).items():
        if name in knownOptions:
            knownOptions[name] = value

    initOptions(cmdLineOptions, True)
    init()
项目:autoscan    作者:b01u    | 项目源码 | 文件源码
def initCase(switches, count):
    """Prepare global state for one test case.

    Clears the recorded failure details, creates a new temporary output
    directory (with dump/files path templates beneath it), points both
    ``sys.stdout`` and the logger handler's stream at a spooled temporary
    file, overlays ``switches`` on the parsed command-line options and then
    runs the option/initialisation routines.
    """
    Failures.failedItems = []
    Failures.failedParseOn = Failures.failedTraceBack = None

    outputPrefix = "sqlmaptest-%d-" % count
    paths.SQLMAP_OUTPUT_PATH = tempfile.mkdtemp(prefix=outputPrefix)
    # The "%s" component is left as a placeholder in both path templates.
    paths.SQLMAP_DUMP_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "dump")
    paths.SQLMAP_FILES_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "files")

    logger.debug("using output directory '%s' for this test case" % paths.SQLMAP_OUTPUT_PATH)

    captured = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")
    LOGGER_HANDLER.stream = captured
    sys.stdout = captured

    cmdLineOptions = cmdLineParser()

    # Only options the parser already knows about are overridden.
    knownOptions = cmdLineOptions.__dict__
    for name, value in (switches or {}).items():
        if name in knownOptions:
            knownOptions[name] = value

    initOptions(cmdLineOptions, True)
    init()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_exports(self):
        """Check that tempfile's public namespace holds only the known names."""
        known = frozenset([
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempdir",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        ])

        # Names starting with an underscore are private and ignored.
        stray = [name for name in tempfile.__dict__
                 if name[0] != '_' and name not in known]
        self.assertTrue(not stray,
                        "unexpected keys: %s" % stray)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_properties(self):
        """Check mode, name, newlines and encoding before and after rollover."""
        spool = tempfile.SpooledTemporaryFile(max_size=10)

        def assert_text_attrs_missing():
            # A binary spool must not expose the text-only attributes.
            for attr in ('newlines', 'encoding'):
                with self.assertRaises(AttributeError):
                    getattr(spool, attr)

        # Exactly max_size bytes: not rolled over yet.
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNone(spool.name)
        assert_text_attrs_missing()

        # One more byte exceeds max_size; the reported mode changes with it.
        spool.write(b'x')
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'rb+')
        self.assertIsNotNone(spool.name)
        assert_text_attrs_missing()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_text_newline_and_encoding(self):
        """Check newline/encoding handling of a text spool across rollover."""
        spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                              newline='', encoding='utf-8')
        short_line = "\u039B\r\n"
        long_line = "\u039B" * 20 + "\r\n"

        # Small write: content round-trips unchanged, no rollover yet.
        spool.write(short_line)
        spool.seek(0)
        self.assertEqual(spool.read(), short_line)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        for value in (spool.name, spool.newlines, spool.encoding):
            self.assertIsNone(value)

        # Large write: rolled over, name/newlines/encoding are now populated.
        spool.write(long_line)
        spool.seek(0)
        self.assertEqual(spool.read(), short_line + long_line)
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        for value in (spool.name, spool.newlines):
            self.assertIsNotNone(value)
        self.assertEqual(spool.encoding, 'utf-8')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_truncate_with_size_parameter(self):
        """Exercise truncate() with no size, a smaller size and a larger size."""
        def new_spool():
            # Eight bytes of content, below the ten byte spool limit.
            spool = tempfile.SpooledTemporaryFile(max_size=10)
            spool.write(b'abcdefg\n')
            return spool

        # Truncating at position 0 empties the file without rolling over.
        spool = new_spool()
        spool.seek(0)
        spool.truncate()
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'')

        # Truncating to an explicit smaller size keeps that many bytes.
        spool = new_spool()
        spool.truncate(4)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'abcd')

        # Truncating beyond max_size forces a rollover.
        spool = new_spool()
        spool.truncate(20)
        self.assertTrue(spool._rolled)
        if has_stat:
            self.assertEqual(os.fstat(spool.fileno()).st_size, 20)
项目:linuxacademy-dl    作者:vassim    | 项目源码 | 文件源码
def decrypt(self):
        """Decrypt ``self.chunk_stream`` with AES-CBC into a spooled temp file.

        The stream is read ``1024 * AES.block_size`` bytes at a time, always
        one read ahead, so that the final chunk can be recognised (the read
        after it returns nothing) and passed through
        ``self.pkcs7_reverse_padded_chunk``.  Returns the spooled file
        rewound to offset 0.
        """
        plain = SpooledTemporaryFile(
            max_size=self.POOL_SIZE,
            mode='wb+'
        )
        cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
        read_size = 1024 * AES.block_size

        # The first iteration decrypts this empty chunk while priming the
        # look-ahead buffer.
        lookahead = ''
        while True:
            current = lookahead
            lookahead = self.chunk_stream.read(read_size)

            current = cipher.decrypt(current)

            is_last = len(lookahead) == 0
            if is_last:
                current = self.pkcs7_reverse_padded_chunk(current)
            if current:
                plain.write(current)
            if is_last:
                break

        plain.seek(0)
        return plain
项目:skorch    作者:dnouri    | 项目源码 | 文件源码
def __setstate__(self, state):
        """Restore instance state, reloading the torch-serialised attributes.

        Every attribute named in ``self.cuda_dependent_attributes_`` that is
        present in ``state`` holds a serialised dump; it is written to a
        spooled temporary file and loaded back with ``torch.load``.  When the
        state asks for CUDA but none is available, storages are mapped to the
        CPU, a ``DeviceWarning`` is issued and ``use_cuda`` is switched off.
        """
        cuda_unavailable = False
        for attr in self.cuda_dependent_attributes_:
            if attr in state:
                payload = state.pop(attr)
                with tempfile.SpooledTemporaryFile() as buf:
                    buf.write(payload)
                    buf.seek(0)
                    load_kwargs = {}
                    if state['use_cuda'] and not torch.cuda.is_available():
                        cuda_unavailable = True
                        # Keep each storage where it is deserialised (CPU).
                        load_kwargs['map_location'] = (
                            lambda storage, loc: storage)
                    restored = torch.load(buf, **load_kwargs)
                state[attr] = restored

        if cuda_unavailable:
            warnings.warn(
                "Model configured to use CUDA but no CUDA devices "
                "available. Loading on CPU instead.",
                DeviceWarning)
            state['use_cuda'] = False

        self.__dict__.update(state)
项目:qgis_resources_sharing    作者:akbargumbira    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
        """WSGI entry point that transparently gunzips gzip-encoded bodies.

        Requests whose ``HTTP_CONTENT_ENCODING`` is not exactly ``'gzip'`` are
        passed to ``self.app`` untouched.  Otherwise ``environ['wsgi.input']``
        is replaced by a ``gzip.GzipFile`` reading from the original body, and
        the ``HTTP_CONTENT_ENCODING`` and ``CONTENT_LENGTH`` entries are
        removed before delegating to ``self.app``.
        """
        if environ.get('HTTP_CONTENT_ENCODING', '') != 'gzip':
            return self.app(environ, start_response)

        raw_body = environ['wsgi.input']
        try:
            # Probe whether the input stream supports positioning.
            raw_body.tell()
        except (AttributeError, IOError, NotImplementedError):
            # The gzip implementation in the standard library of Python 2.x
            # needs working '.seek()' and '.tell()' on its input stream, so
            # copy the body into a temporary file that provides them.
            seekable_body = tempfile.SpooledTemporaryFile(16 * 1024 * 1024)
            shutil.copyfileobj(raw_body, seekable_body)
            seekable_body.seek(0)
        else:
            seekable_body = raw_body

        environ['wsgi.input'] = gzip.GzipFile(
            filename=None, fileobj=seekable_body, mode='r')
        del environ['HTTP_CONTENT_ENCODING']
        # The compressed length no longer describes the body the app reads.
        environ.pop('CONTENT_LENGTH', None)

        return self.app(environ, start_response)
项目:wsgiprox    作者:webrecorder    | 项目源码 | 文件源码
def buffer_iter(cls, orig_iter, buff_size=65536):
        """Drain ``orig_iter`` into a spooled buffer and report its total size.

        All items of ``orig_iter`` are written to a ``SpooledTemporaryFile``
        whose in-memory threshold is ``buff_size``.

        :param orig_iter: iterable of chunks accepted by the file's ``write``
        :param buff_size: spool threshold and the size of each replayed read
        :return: ``(content_length_str, iterator)`` -- the summed chunk length
            as a string, and a generator replaying the buffered data in reads
            of at most ``buff_size``.
        """
        spool = SpooledTemporaryFile(buff_size)
        total = 0

        for piece in orig_iter:
            total += len(piece)
            spool.write(piece)

        spool.seek(0)

        def read_iter():
            # Replay the buffered content until a read comes back empty.
            piece = spool.read(buff_size)
            while piece:
                yield piece
                piece = spool.read(buff_size)

        return str(total), read_iter()


# ============================================================================
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def getFile(self, site, inner_path):
        """Download ``inner_path`` of ``site`` part by part into a buffer.

        Delegates to ``self.streamFile`` when ``config.stream_downloads`` is
        set and the connection's handshake reports a ``rev`` above 310.
        Otherwise "getFile" requests are issued repeatedly, each starting at
        the ``location`` reported by the previous response, until ``location``
        equals ``size``.

        :return: the buffer rewound to offset 0 (a ``SpooledTemporaryFile`` if
            ``config.use_tempfiles`` is set, else a ``StringIO``), or ``False``
            when a response is empty or carries no ``body``.
        """
        # Use streamFile if client supports it
        if config.stream_downloads and self.connection:
            handshake = self.connection.handshake
            if handshake and handshake["rev"] > 310:
                return self.streamFile(site, inner_path)

        offset = 0
        buff = (
            tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
            if config.use_tempfiles else StringIO()
        )

        started = time.time()
        while True:  # One request per part (512k parts per the original note)
            res = self.request(
                "getFile",
                {"site": site, "inner_path": inner_path, "location": offset})

            if not res or "body" not in res:  # Error
                return False

            buff.write(res["body"])
            res["body"] = None  # Drop the payload reference to save memory
            offset = res["location"]
            if offset == res["size"]:  # End of file
                break

        received = res["location"]
        self.download_bytes += received
        self.download_time += (time.time() - started)
        if self.site:
            settings = self.site.settings
            settings["bytes_recv"] = settings.get("bytes_recv", 0) + received
        buff.seek(0)
        return buff

    # Download file out of msgpack context to save memory and cpu
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def streamFile(self, site, inner_path):
        """Download ``inner_path`` of ``site`` via repeated "streamFile" requests.

        Each request passes the buffer as ``stream_to`` and starts at the
        ``location`` reported by the previous response; the loop ends once
        ``location`` equals ``size``.
        NOTE(review): presumably ``self.request`` writes each part's payload
        into ``stream_to`` -- confirm against its implementation.

        :return: the buffer rewound to offset 0 (a ``SpooledTemporaryFile`` if
            ``config.use_tempfiles`` is set, else a ``StringIO``), or ``False``
            when a request yields a falsy response.
        """
        location = 0
        if config.use_tempfiles:
            buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
        else:
            buff = StringIO()

        s = time.time()
        while True:  # One request per part (512k parts per the original note)
            res = self.request(
                "streamFile",
                {"site": site, "inner_path": inner_path, "location": location},
                stream_to=buff)

            if not res:  # Error
                self.log("Invalid response: %s" % res)
                return False

            if res["location"] == res["size"]:  # End of file
                break
            location = res["location"]

        self.download_bytes += res["location"]
        self.download_time += (time.time() - s)
        # Same guard as getFile: only update per-site statistics when this
        # object actually has a site attached.
        if self.site:
            self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"]
        buff.seek(0)
        return buff

    # Send a ping request
项目:scratchdir    作者:ahawker    | 项目源码 | 文件源码
def spooled(self, max_size: int = 0, mode: str = 'w+b', buffering: int = -1,
                encoding: typing.Optional[str] = None, newline: typing.Optional[str] = None,
                suffix: typing.Optional[str] = DEFAULT_SUFFIX, prefix: typing.Optional[str] = DEFAULT_PREFIX,
                dir: typing.Optional[str] = None) -> typing.IO:
        """
        Create a new spooled temporary file within the scratch dir.

        The result is a :class:`~tempfile.SpooledTemporaryFile`: data is held in an in-memory
        :class:`StringIO`/:class:`BytesIO` buffer and transparently moved to a file on disk once it grows
        beyond ``max_size``.

        With the default ``max_size`` of ``0`` the file never rolls over to disk because of its size.

        :param max_size: (Optional) max size before the in-memory buffer rolls over to disk
        :type max_size: :class:`~int`
        :param mode: (Optional) mode to open the file with
        :type mode: :class:`~str`
        :param buffering: (Optional) size of the file buffer
        :type buffering: :class:`~int`
        :param encoding: (Optional) encoding to open the file with
        :type encoding: :class:`~str` or :class:`~NoneType`
        :param newline: (Optional) newline argument to open the file with
        :type newline: :class:`~str` or :class:`~NoneType`
        :param suffix: (Optional) filename suffix
        :type suffix: :class:`~str` or :class:`~NoneType`
        :param prefix: (Optional) filename prefix
        :type prefix: :class:`~str` or :class:`~NoneType`
        :param dir: (Optional) directory for the file; it is resolved through :meth:`join` before use
        :type dir: :class:`~str` or :class:`~NoneType`
        :return: SpooledTemporaryFile instance
        :rtype: :class:`~tempfile.SpooledTemporaryFile`
        """
        target_dir = self.join(dir)
        return tempfile.SpooledTemporaryFile(
            max_size=max_size,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            newline=newline,
            suffix=suffix,
            prefix=prefix,
            dir=target_dir,
        )
项目:warcio    作者:webrecorder    | 项目源码 | 文件源码
def _create_temp_file(cls):
        """Return a new spooled temporary file that rolls over to disk past 512 KiB."""
        spool_threshold = 512 * 1024
        return tempfile.SpooledTemporaryFile(max_size=spool_threshold)


# ============================================================================
项目:pymsc    作者:jam1garner    | 项目源码 | 文件源码
def readFromBytes(self, b, headerEndianess='>'):
        """Parse in-memory data by routing it through ``self.readFromFile``.

        ``b`` is written to a binary spooled temporary file, the file is
        rewound, and it is passed to ``self.readFromFile`` together with
        ``headerEndianess``.  The temporary file is closed on exit.
        """
        with tempfile.SpooledTemporaryFile(mode='w+b') as spool:
            spool.write(b)
            spool.seek(0)
            self.readFromFile(spool, headerEndianess)
项目:archive-Holmes-Totem-Service-Library    作者:HolmesProcessing    | 项目源码 | 文件源码
def __enter__(self):
        """
        Open the temporary file and return it.

        The file starts out in memory and is moved to the filesystem
        automatically once it outgrows ``self.max_size``.  It is also kept on
        the instance as ``self.file``.
        """
        spool = tempfile.SpooledTemporaryFile(max_size=self.max_size)
        self.file = spool
        return spool
项目:open-ledger    作者:creativecommons    | 项目源码 | 文件源码
def generate_hash(self):
        """Return the perceptual hash for this image -- currently always ``None``.

        Hash generation is switched off: computing it means fetching the image,
        spooling it to a temporary file and hashing the result, which is slow.
        """
        # Disabled on purpose (too slow); callers get no hash.
        return None
#        req = requests.get(self.url)
#        if req.status_code == 200:
#            buff = tempfile.SpooledTemporaryFile(max_size=1e9)
#            downloaded = 0
#            filesize = int(req.headers.get('content-length', 1000))  # Set a default length for the test client
#            for chunk in req.iter_content():
#                downloaded += len(chunk)
#                buff.write(chunk)
#            buff.seek(0)
#            im = PillowImage.open(io.BytesIO(buff.read()))
#            return str(imagehash.average_hash(im))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def do_create(self, max_size=0, dir=None, pre="", suf=""):
        """Create a ``SpooledTemporaryFile`` for a test and return it.

        :param max_size: spool threshold handed to ``SpooledTemporaryFile``
        :param dir: directory for the file; defaults to ``tempfile.gettempdir()``
        :param pre: filename prefix
        :param suf: filename suffix
        :return: the newly created ``SpooledTemporaryFile``

        Any ordinary exception raised during construction is reported through
        ``self.failOnException``.
        """
        if dir is None:
            dir = tempfile.gettempdir()
        try:
            spooled = tempfile.SpooledTemporaryFile(
                max_size=max_size, dir=dir, prefix=pre, suffix=suf)
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must propagate instead of becoming a test failure.
            self.failOnException("SpooledTemporaryFile")

        return spooled
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_basic(self):
        # SpooledTemporaryFile can create files
        f = self.do_create()
        self.assertFalse(f._rolled)
        f = self.do_create(max_size=100, pre="a", suf=".txt")
        self.assertFalse(f._rolled)