我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.NamedTemporaryFile()。
def import_key(keyid): key = keyid.strip() if (key.startswith('-----BEGIN PGP PUBLIC KEY BLOCK-----') and key.endswith('-----END PGP PUBLIC KEY BLOCK-----')): juju_log("PGP key found (looks like ASCII Armor format)", level=DEBUG) juju_log("Importing ASCII Armor PGP key", level=DEBUG) with tempfile.NamedTemporaryFile() as keyfile: with open(keyfile.name, 'w') as fd: fd.write(key) fd.write("\n") cmd = ['apt-key', 'add', keyfile.name] try: subprocess.check_call(cmd) except subprocess.CalledProcessError: error_out("Error importing PGP key '%s'" % key) else: juju_log("PGP key found (looks like Radix64 format)", level=DEBUG) juju_log("Importing PGP key from keyserver", level=DEBUG) cmd = ['apt-key', 'adv', '--keyserver', 'hkp://keyserver.ubuntu.com:80', '--recv-keys', key] try: subprocess.check_call(cmd) except subprocess.CalledProcessError: error_out("Error importing PGP key '%s'" % key)
def dot2graph(self, dot, format='svg'): # windows ??????????????????? # ?????? NamedTemporaryFile ????? with NamedTemporaryFile(delete=False) as dotfile: dotfile.write(dot) outfile = NamedTemporaryFile(delete=False) os.system('dot -Efontname=sans -Nfontname=sans %s -o%s -T%s' % ( dotfile.name, outfile.name, format)) result = outfile.read() outfile.close() os.unlink(dotfile.name) os.unlink(outfile.name) return result
def test_rs3topng(): """rs3 file is converted to PNG""" png_str = rstviewer.rs3topng(RS3_FILEPATH) temp = tempfile.NamedTemporaryFile(suffix='.png', delete=False) temp.close() rstviewer.rs3topng(RS3_FILEPATH, temp.name) with open(temp.name, 'r') as png_file: assert png_str == png_file.read() os.unlink(temp.name) # generated images might not be 100% identical, probably # because of the font used with open(EXPECTED_PNG1, 'r') as expected_png_file: ident1 = png_str == expected_png_file.read() with open(EXPECTED_PNG2, 'r') as expected_png_file: ident2 = png_str == expected_png_file.read() assert ident1 or ident2
def test_cli_rs3topng(): """conversion to PNG on the commandline""" temp_png = tempfile.NamedTemporaryFile(suffix='.png', delete=False) temp_png.close() # calling `rstviewer -f png input.rs3 output.png` will end the program # with sys.exit(0), so we'll have to catch this here. with pytest.raises(SystemExit) as serr: cli(['-f', 'png', RS3_FILEPATH, temp_png.name]) out, err = pytest.capsys.readouterr() assert err == 0 with open(temp_png.name, 'r') as png_file: png_str = png_file.read() os.unlink(temp_png.name) # generated images might not be 100% identical, probably # because of the font used with open(EXPECTED_PNG1, 'r') as expected_png_file: ident1 = png_str == expected_png_file.read() with open(EXPECTED_PNG2, 'r') as expected_png_file: ident2 = png_str == expected_png_file.read() assert ident1 or ident2
def setUp(self): tempFile = tempfile.NamedTemporaryFile() self.fileServerDir = tempFile.name tempFile.close() os.mkdir(self.fileServerDir) os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'), encoding='base64') signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)), encoding='base64').decode() VERSIONS['signature'] = signature keysFilePath = os.path.join(self.fileServerDir, 'keys.gz') with gzip.open(keysFilePath, 'wb') as keysFile: keysFile.write(json.dumps(KEYS, sort_keys=True)) versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz') with gzip.open(versionsFilePath, 'wb') as versionsFile: versionsFile.write(json.dumps(VERSIONS, sort_keys=True)) os.environ['WXUPDATEDEMO_TESTING'] = 'True' from wxupdatedemo.config import CLIENT_CONFIG self.clientConfig = CLIENT_CONFIG self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
def setUp(self): tempFile = tempfile.NamedTemporaryFile() self.fileServerDir = tempFile.name tempFile.close() os.mkdir(self.fileServerDir) os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'), encoding='base64') signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)), encoding='base64').decode() VERSIONS['signature'] = signature keysFilePath = os.path.join(self.fileServerDir, 'keys.gz') with gzip.open(keysFilePath, 'wb') as keysFile: keysFile.write(json.dumps(KEYS, sort_keys=True)) versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz') with gzip.open(versionsFilePath, 'wb') as versionsFile: versionsFile.write(json.dumps(VERSIONS, sort_keys=True)) os.environ['WXUPDATEDEMO_TESTING'] = 'True' from wxupdatedemo.config import CLIENT_CONFIG self.clientConfig = CLIENT_CONFIG self.clientConfig.PUBLIC_KEY = PUBLIC_KEY self.clientConfig.APP_NAME = APP_NAME
def _atomic_write(filename): path = os.path.dirname(filename) try: file = tempfile.NamedTemporaryFile(delete=False, dir=path, mode="w+") yield file file.flush() os.fsync(file.fileno()) os.rename(file.name, filename) finally: try: os.remove(file.name) except OSError as e: if e.errno == 2: pass else: raise e
def upload_to_fileshare_test(self): #pylint: disable=no-self-use """Upload copies files to non-native store correctly with no progress""" import shutil import tempfile temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp()) temp_src_dir = os.path.dirname(temp_file.name) temp_dst_dir = tempfile.mkdtemp() shutil_mock = MagicMock() shutil_mock.copyfile.return_value = None with patch('sfctl.custom_app.shutil', new=shutil_mock): sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False) shutil_mock.copyfile.assert_called_once() temp_file.close() shutil.rmtree(os.path.dirname(temp_file.name)) shutil.rmtree(temp_dst_dir)
def _create_pdf_pdftk(self): with NamedTemporaryFile( mode='wb+', prefix='geo-pyprint_', delete=True ) as map_image_file: self._map_image.save(map_image_file, 'PNG') map_image_file.flush() # TODO: use the configuration to select the template # TODO: use the configuration to select the name of the key in the template pdfjinja = PdfJinja('pdfjinja-template.pdf') pdfout = pdfjinja(dict(map=map_image_file.name)) with NamedTemporaryFile( mode='wb+', prefix='geo-pyprint_', suffix='.pdf', delete=False ) as output_file: pdfout.write(output_file) output_file.flush() return output_file.name
def filter_region(view, txt, command): try: contents = tempfile.NamedTemporaryFile(suffix='.txt', delete=False) contents.write(txt.encode('utf-8')) contents.close() script = tempfile.NamedTemporaryFile(suffix='.bat', delete=False) script.write(('@echo off\ntype %s | %s' % (contents.name, command)).encode('utf-8')) script.close() p = subprocess.Popen([script.name], stdout=PIPE, stderr=PIPE, startupinfo=get_startup_info()) out, err = p.communicate() return (out or err).decode(get_oem_cp()).replace('\r\n', '\n')[:-1].strip() finally: os.remove(script.name) os.remove(contents.name)
def test_scp(event_loop): async with base.CleanModel() as model: await model.add_machine() await asyncio.wait_for( model.block_until(lambda: model.machines), timeout=240) machine = model.machines['0'] await asyncio.wait_for( model.block_until(lambda: (machine.status == 'running' and machine.agent_status == 'started')), timeout=480) with NamedTemporaryFile() as f: f.write(b'testcontents') f.flush() await machine.scp_to(f.name, 'testfile') with NamedTemporaryFile() as f: await machine.scp_from('testfile', f.name) assert f.read() == b'testcontents'
def test_scp(event_loop): async with base.CleanModel() as model: app = await model.deploy('ubuntu') await asyncio.wait_for( model.block_until(lambda: app.units), timeout=60) unit = app.units[0] await asyncio.wait_for( model.block_until(lambda: unit.machine), timeout=60) machine = unit.machine await asyncio.wait_for( model.block_until(lambda: (machine.status == 'running' and machine.agent_status == 'started')), timeout=480) with NamedTemporaryFile() as f: f.write(b'testcontents') f.flush() await unit.scp_to(f.name, 'testfile') with NamedTemporaryFile() as f: await unit.scp_from('testfile', f.name) assert f.read() == b'testcontents'
def add_local_charm_dir(self, charm_dir, series): """Upload a local charm to the model. This will automatically generate an archive from the charm dir. :param charm_dir: Path to the charm directory :param series: Charm series """ fh = tempfile.NamedTemporaryFile() CharmArchiveGenerator(charm_dir).make_archive(fh.name) with fh: func = partial( self.add_local_charm, fh, series, os.stat(fh.name).st_size) charm_url = await self._connector.loop.run_in_executor(None, func) log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url) return charm_url
def test_load_config_file(self): with NamedTemporaryFile(mode='w+t') as f: f.write("user_a:password_a:role_a,role_b\n") f.write("user_b:password_b:role_b,role_c\n") f.write("user_c:password_c:role_c,role_c\n") f.flush() self.auth.load_from_file(f.name) # Assert user equality. self.assertEqual(self.auth.users, { 'user_a': 'password_a', 'user_b': 'password_b', 'user_c': 'password_c' }) # Assert role equality. self.assertEqual(self.auth.roles, { 'user_a': set(('role_a', 'role_b')), 'user_b': set(('role_b', 'role_c')), 'user_c': set(('role_c', 'role_c')) })
def test_save_config_file(self): self.auth.add_user('user_a', 'password_a') self.auth.add_roles('user_a', ('role_a', 'role_b')) self.auth.add_user('user_b', 'password_b') self.auth.add_roles('user_b', ('role_b', 'role_c')) self.auth.add_user('user_c', 'password_c') self.auth.add_roles('user_c', ('role_c', 'role_c')) with NamedTemporaryFile(mode='w+t') as f: self.auth.save_to_file(f.name) expected = [ "user_a:password_a:role_a,role_b", "user_b:password_b:role_b,role_c", "user_c:password_c:role_c" ] for a, e in zip(f.readlines(), expected): self.assertEqual(a.strip(), e)
def create_temp_parallel_data(sources, targets): """ Creates a temporary TFRecords file. Args: source: List of source sentences target: List of target sentences Returns: A tuple (sources_file, targets_file). """ file_source = tempfile.NamedTemporaryFile() file_target = tempfile.NamedTemporaryFile() file_source.write("\n".join(sources).encode("utf-8")) file_source.flush() file_target.write("\n".join(targets).encode("utf-8")) file_target.flush() return file_source, file_target
def create_temp_tfrecords(sources, targets): """ Creates a temporary TFRecords file. Args: source: List of source sentences target: List of target sentences Returns: A tuple (sources_file, targets_file). """ output_file = tempfile.NamedTemporaryFile() writer = tf.python_io.TFRecordWriter(output_file.name) for source, target in zip(sources, targets): ex = tf.train.Example() #pylint: disable=E1101 ex.features.feature["source"].bytes_list.value.extend( [source.encode("utf-8")]) ex.features.feature["target"].bytes_list.value.extend( [target.encode("utf-8")]) writer.write(ex.SerializeToString()) writer.close() return output_file
def create_temporary_vocab_file(words, counts=None): """ Creates a temporary vocabulary file. Args: words: List of words in the vocabulary Returns: A temporary file object with one word per line """ vocab_file = tempfile.NamedTemporaryFile() if counts is None: for token in words: vocab_file.write((token + "\n").encode("utf-8")) else: for token, count in zip(words, counts): vocab_file.write("{}\t{}\n".format(token, count).encode("utf-8")) vocab_file.flush() return vocab_file
def test_save_svgz_filename(): import gzip qr = segno.make_qr('test') f = tempfile.NamedTemporaryFile('wb', suffix='.svgz', delete=False) f.close() qr.save(f.name) f = open(f.name, mode='rb') expected = b'\x1f\x8b\x08' # gzip magic number val = f.read(len(expected)) f.close() f = gzip.open(f.name) try: content = f.read(6) finally: f.close() os.unlink(f.name) assert expected == val assert b'<?xml ' == content
def test_write_unicode_filename(): qr = segno.make_qr('test') f = tempfile.NamedTemporaryFile('wt', suffix='.svg', delete=False) f.close() title = 'mürrische Mädchen' desc = '?' qr.save(f.name, title=title, desc=desc) f = open(f.name, mode='rb') root = _parse_xml(f) f.seek(0) val = f.read(6) f.close() os.unlink(f.name) assert b'<?xml ' == val assert title == _get_title(root).text assert desc == _get_desc(root).text
def test_infile_outfile(self): with tempfile.NamedTemporaryFile() as infile: infile.write(self.data.encode()) infile.flush() # outfile will get overwritten by tool, so the delete # may not work on some platforms. Do it manually. outfile = tempfile.NamedTemporaryFile() try: self.assertEqual( self.runTool(args=[infile.name, outfile.name]), ''.encode()) with open(outfile.name, 'rb') as f: self.assertEqual(f.read(), self.expect.encode()) finally: outfile.close() if os.path.exists(outfile.name): os.unlink(outfile.name)
def testRewriteFile(self): """Changing the file content should change the hash sum""" with NamedTemporaryFile() as index: with TemporaryDirectory() as tmp: with open(os.path.join(tmp, "foo"), 'wb') as f: f.write(b'abc') sum1 = hashDirectory(tmp, index.name) with open(index.name, "rb") as f: assert f.read(4) == b'BOB1' with open(os.path.join(tmp, "foo"), 'wb') as f: f.write(b'qwer') sum2 = hashDirectory(tmp, index.name) with open(index.name, "rb") as f: assert f.read(4) == b'BOB1' assert sum1 != sum2
def __init__(self, *args, **kwds): ## Create shared memory for rendered image #pg.dbg(namespace={'r': self}) if sys.platform.startswith('win'): self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)]) self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows else: self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_') self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1)) fd = self.shmFile.fileno() self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE) atexit.register(self.close) GraphicsView.__init__(self, *args, **kwds) self.scene().changed.connect(self.update) self.img = None self.renderTimer = QtCore.QTimer() self.renderTimer.timeout.connect(self.renderView) self.renderTimer.start(16)
def test_plotscene(): tempfilename = tempfile.NamedTemporaryFile(suffix='.svg').name print("using %s as a temporary file" % tempfilename) pg.setConfigOption('foreground', (0,0,0)) w = pg.GraphicsWindow() w.show() p1 = w.addPlot() p2 = w.addPlot() p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'}) p1.setXRange(0,5) p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3}) app.processEvents() app.processEvents() ex = pg.exporters.SVGExporter(w.scene()) ex.export(fileName=tempfilename) # clean up after the test is done os.unlink(tempfilename)
def test_css(Chart): """Test css file option""" css = "{{ id }}text { fill: #bedead; }\n" with NamedTemporaryFile('w') as f: f.write(css) f.flush() config = Config() config.css.append('file://' + f.name) chart = Chart(config) chart.add('/', [10, 1, 5]) svg = chart.render().decode('utf-8') assert '#bedead' in svg chart = Chart(css=(_ellipsis, 'file://' + f.name)) chart.add('/', [10, 1, 5]) svg = chart.render().decode('utf-8') assert '#bedead' in svg
def copy(contents, config=None, destination_dir=False, **kwargs): if config is None: config = Config(xyz='123') with NamedTemporaryFile('w', delete=False) as tp: tp.write(contents) source = tp.name if destination_dir: with TemporaryDirectory() as destination: path = copy_file(config, source, destination, **kwargs) yield source, destination, path os.remove(source) else: destination = source + '.copy' path = copy_file(config, source, destination, **kwargs) yield source, destination, path os.remove(source) os.remove(path)
def edit(filename=None, contents=None): editor = get_editor() args = get_editor_args(os.path.basename(os.path.realpath(editor))) args = [editor] + args.split(' ') if filename is None: tmp = tempfile.NamedTemporaryFile() filename = tmp.name if contents is not None: with open(filename, mode='wb') as f: f.write(contents) args += [filename] proc = subprocess.Popen(args, close_fds=True) proc.communicate() with open(filename, mode='rb') as f: return f.read()
def test_simple_case(self): ticket_url = "http://ticket.com" data_url = "http://data.url.com" headers = {"a": "a", "xyz": "ghj"} ticket = {"htsget": { "urls": [{"url": data_url, "headers": headers}]}} data = b"0" * 1024 returned_response = MockedResponse(json.dumps(ticket).encode(), data) with mock.patch("requests.get", return_value=returned_response) as mocked_get: with tempfile.NamedTemporaryFile("wb+") as f: htsget.get(ticket_url, f) f.seek(0) self.assertEqual(f.read(), data) self.assertEqual(mocked_get.call_count, 2) # Note that we only get the arguments for the last call using this method. args, kwargs = mocked_get.call_args self.assertEqual(args[0], data_url) self.assertEqual(kwargs["headers"], headers) self.assertEqual(kwargs["stream"], True)
def test_bearer_token(self): ticket_url = "http://ticket.com" ticket = {"htsget": {"urls": []}} bearer_token = "x" * 1024 returned_response = MockedTicketResponse(json.dumps(ticket).encode()) with mock.patch("requests.get", return_value=returned_response) as mocked_get: with tempfile.NamedTemporaryFile("wb+") as f: htsget.get(ticket_url, f, bearer_token=bearer_token) f.seek(0) self.assertEqual(f.read(), b"") # Because we have no URLs in the returned ticked, it should be called # only once. self.assertEqual(mocked_get.call_count, 1) # Note that we only get the arguments for the last call using this method. args, kwargs = mocked_get.call_args self.assertEqual(args[0], ticket_url) headers = {"Authorization": "Bearer {}".format(bearer_token)} self.assertEqual(kwargs["headers"], headers) self.assertEqual(kwargs["stream"], True)
def test_no_bearer_token(self): ticket_url = "http://ticket.com" ticket = {"htsget": {"urls": []}} returned_response = MockedTicketResponse(json.dumps(ticket).encode()) with mock.patch("requests.get", return_value=returned_response) as mocked_get: with tempfile.NamedTemporaryFile("wb+") as f: htsget.get(ticket_url, f) f.seek(0) self.assertEqual(f.read(), b"") # Because we have no URLs in the returned ticked, it should be called # only once. self.assertEqual(mocked_get.call_count, 1) # Note that we only get the arguments for the last call using this method. args, kwargs = mocked_get.call_args self.assertEqual(args[0], ticket_url) headers = {} self.assertEqual(kwargs["headers"], headers) self.assertEqual(kwargs["stream"], True)
def test_ticket_char_by_char(self): # Tests the streaming code for the ticket response. ticket_url = "http://ticket.com" ticket = {"htsget": {"urls": []}, "padding": "X" * 10} returned_response = MockedTicketResponse( json.dumps(ticket).encode(), char_by_char=True) with mock.patch("requests.get", return_value=returned_response) as mocked_get: with tempfile.NamedTemporaryFile("wb+") as f: htsget.get(ticket_url, f) f.seek(0) self.assertEqual(f.read(), b"") # Because we have no URLs in the returned ticked, it should be called # only once. self.assertEqual(mocked_get.call_count, 1) # Note that we only get the arguments for the last call using this method. args, kwargs = mocked_get.call_args self.assertEqual(args[0], ticket_url) headers = {} self.assertEqual(kwargs["headers"], headers) self.assertEqual(kwargs["stream"], True)
def pyrun(src): """Run python code 'src' in a separate interpreter. Return subprocess exit code. """ if PY3: src = bytes(src, 'ascii') with tempfile.NamedTemporaryFile(suffix='.py', delete=False) as f: f.write(src) f.flush() test_files.append(f.name) code = subprocess.call( [sys.executable, f.name], stdout=None, stderr=None, # creationflags=subprocess.CREATE_NEW_PROCESS_GROUP ) return code
def test_inputs_from_file(self): """Run a task with base64 inputs in a file.""" a = tempfile.NamedTemporaryFile() b = tempfile.NamedTemporaryFile() convert( 'python', {'format': 'object', 'data': (0, 1)}, {'format': 'pickle.base64', 'mode': 'local', 'path': a.name} ) convert( 'python', {'format': 'object', 'data': 2}, {'format': 'pickle.base64', 'mode': 'local', 'path': b.name} ) outputs = self.run_basic_task({ 'a': {'format': 'pickle.base64', 'mode': 'local', 'path': a.name}, 'b': {'format': 'pickle.base64', 'mode': 'local', 'path': b.name} }) self.assertEqual(outputs.get('c'), (0, 1, 0, 1)) self.assertEqual(outputs.get('d'), 4)
def _inline_fetch(spec, **kwargs): taskInput = kwargs.get('task_input', {}) target = taskInput.get('target', 'memory') if target == 'filepath': # Ensure we have a trailing slash tmpDir = os.path.join(kwargs['_tempdir'], '') if 'filename' in taskInput: filename = taskInput['filename'] path = os.path.join(tmpDir, filename) with open(path, 'wb') as out: out.write(spec['data']) else: with tempfile.NamedTemporaryFile( 'wb', prefix=tmpDir, delete=False) as out: out.write(spec['data']) path = out.name return path elif target == 'memory': return spec['data'] else: raise Exception('Invalid fetch target: ' + target)
def test_instantiate(self): try: tmpdir = mkdtemp() tmpfile = NamedTemporaryFile('w+t', dir=tmpdir) data = {'type': 'string'} tmpfile.write(json.dumps(data)) tmpfile.seek(0) # load from string schema = JSONSchema.loads(json.dumps(data)) self.assertEqual(data, schema.attributes) # load from readable object schema = JSONSchema.load(tmpfile) self.assertEqual(data, schema.attributes) # load from file schema = JSONSchema.loadfromfile(tmpfile.name) self.assertEqual(data, schema.attributes) finally: tmpfile.close() rmtree(tmpdir)
def test_read_write(self): # Some smoke tests whether reading, writing, reading alters makes the # configspace incomparable this_file = os.path.abspath(__file__) this_directory = os.path.dirname(this_file) configuration_space_path = os.path.join(this_directory, "..", "test_searchspaces") configuration_space_path = os.path.abspath(configuration_space_path) configuration_space_path = os.path.join(configuration_space_path, "spear-params-mixed.pcs") with open(configuration_space_path) as fh: cs = pcs.read(fh) tf = tempfile.NamedTemporaryFile() name = tf.name tf.close() with open(name, 'w') as fh: pcs_string = pcs.write(cs) fh.write(pcs_string) with open(name, 'r') as fh: pcs_new = pcs.read(fh) self.assertEqual(pcs_new, cs, msg=(pcs_new, cs))
def update_and_hash(arch, contents): ''' Save contents to archive ``arch`` and return the DataAPI's hash value ''' f = tempfile.NamedTemporaryFile(delete=False) try: f.write(contents) f.close() apihash = arch.api.hash_file(f.name)['checksum'] arch.update(f.name) finally: os.remove(f.name) return apihash
def image_to_pdf(self, img, pdf_path=None, **kwargs): """ Convert image to pdf. :param img: image file opened by PIL :param pdf_path: path to save pdf :param kwargs: any parameter accepted by Image.save i.e. quality :return: """ processor = ResizeToFit(width=self.max_size_in_pixels[0], height=self.max_size_in_pixels[1]) img = processor.process(img) # Create a white canvas and paste the image final_img_width = min(img.size[0], self.max_size_in_pixels[0]) final_img_height = min(img.size[1], self.max_size_in_pixels[1]) tmp_image = Image.new("RGB", (final_img_width, final_img_height), "white") margin_left = 0 margin_top = 0 tmp_image.paste(img, (margin_left, margin_top, final_img_width, final_img_height)) # Save the image as .pdf file if not pdf_path: f = NamedTemporaryFile(delete=False) pdf_path = f.name tmp_image.save(pdf_path, "PDF", resolution=100.0, **kwargs) return pdf_path
def prepend(line, path): """ Appends *line* to the _beginning_ of the file at the given *path*. If *line* doesn't end in a newline one will be appended to the end of it. """ if isinstance(line, str): line = line.encode('utf-8') if not line.endswith(b'\n'): line += b'\n' temp = tempfile.NamedTemporaryFile('wb') temp_name = temp.name # We really only need a random path-safe name temp.close() with open(temp_name, 'wb') as temp: temp.write(line) with open(path, 'rb') as r: temp.write(r.read()) # Now replace the original with the modified version shutil.move(temp_name, path)
def _make_zip(self, project, ty): name = self._project_name_latin_encoded(project) json_task_generator = self._respond_json(ty, project.id) if json_task_generator is not None: datafile = tempfile.NamedTemporaryFile() try: datafile.write(json.dumps(json_task_generator)) datafile.flush() zipped_datafile = tempfile.NamedTemporaryFile() try: _zip = self._zip_factory(zipped_datafile.name) _zip.write(datafile.name, secure_filename('%s_%s.json' % (name, ty))) _zip.close() container = "user_%d" % project.owner_id _file = FileStorage(filename=self.download_name(project, ty), stream=zipped_datafile) uploader.upload_file(_file, container=container) finally: zipped_datafile.close() finally: datafile.close()
def _make_zip(self, project, ty): name = self._project_name_latin_encoded(project) csv_task_generator = self._respond_csv(ty, project.id) if csv_task_generator is not None: # TODO: use temp file from csv generation directly datafile = tempfile.NamedTemporaryFile() try: for line in csv_task_generator: datafile.write(str(line)) datafile.flush() csv_task_generator.close() # delete temp csv file zipped_datafile = tempfile.NamedTemporaryFile() try: _zip = self._zip_factory(zipped_datafile.name) _zip.write( datafile.name, secure_filename('%s_%s.csv' % (name, ty))) _zip.close() container = "user_%d" % project.owner_id _file = FileStorage( filename=self.download_name(project, ty), stream=zipped_datafile) uploader.upload_file(_file, container=container) finally: zipped_datafile.close() finally: datafile.close()
def test_UnicodeWriter(self): """Test UnicodeWriter class works.""" tmp = tempfile.NamedTemporaryFile() uw = util.UnicodeWriter(tmp) fake_csv = ['one, two, three, {"i": 1}'] for row in csv.reader(fake_csv): # change it for a dict row[3] = dict(i=1) uw.writerow(row) tmp.seek(0) err_msg = "It should be the same CSV content" with open(tmp.name, 'rb') as f: reader = csv.reader(f) for row in reader: for item in row: assert item in fake_csv[0], err_msg
def set_arguments(arguments, _view): version = get_version(_view).host_version if 'gdb' in version: # Voltron doesn't like commands that aren't UTF-8, but for exploit work we're going to need # arbitary byte support. A named temporary file that we can source commands from is the best # solution I've come up with so far, despite the fact that it's inelegant. with tempfile.NamedTemporaryFile() as tempf: tempf.write('set args ') tempf.write(arguments) tempf.write('\n') tempf.flush() binjatron.custom_request("command", _build_command_dict("source " + tempf.name)) elif 'lldb' in version: with tempfile.NamedTemporaryFile() as tempf: tempf.write('settings set target.run-args ') tempf.write(arguments) tempf.write('\n') tempf.flush() binjatron.custom_request("command", _build_command_dict("command source " + tempf.name))
def gen_sudo_script(role_list, sudo_list): # receive role_list = [role1, role2] sudo_list = [sudo1, sudo2] # return sudo_alias={'NETWORK': '/sbin/ifconfig, /ls'} sudo_user={'user1': ['NETWORK', 'SYSTEM']} sudo_alias = {} sudo_user = {} for sudo in sudo_list: sudo_alias[sudo.name] = sudo.commands for role in role_list: sudo_user[role.name] = ','.join(sudo_alias.keys()) sudo_j2 = get_template('jperm/role_sudo.j2') sudo_content = sudo_j2.render(Context({"sudo_alias": sudo_alias, "sudo_user": sudo_user})) sudo_file = NamedTemporaryFile(delete=False) sudo_file.write(sudo_content) sudo_file.close() return sudo_file.name
def test_prepare_video2(self): video_content, size, duration, thumbnail_content = media.prepare_video( self.TEST_VIDEO_PATH, max_size=(480, 480), min_size=(0, 0)) self.assertEqual(duration, self.TEST_VIDEO_DURATION, 'Duration changed.') self.assertLessEqual(size[0], 480, 'Invalid width.') self.assertLessEqual(size[1], 480, 'Invalid height.') self.assertEqual( 1.0 * size[0] / size[1], 1.0 * self.TEST_VIDEO_SIZE[0] / self.TEST_VIDEO_SIZE[1], 'Aspect ratio changed.') self.assertGreater(len(video_content), 0, 'No video content returned.') self.assertGreater(len(thumbnail_content), 0, 'No thumbnail content returned.') # Save video, thumbnail content and verify attributes video_output = tempfile.NamedTemporaryFile(prefix='ipae_test_', suffix='.mp4', delete=False) video_output.write(video_content) video_output.close() vidclip_output = VideoFileClip(video_output.name) self.assertAlmostEqual(duration, vidclip_output.duration, places=1) self.assertEqual(size[0], vidclip_output.size[0]) self.assertEqual(size[1], vidclip_output.size[1]) im = Image.open(io.BytesIO(thumbnail_content)) self.assertEqual(size[0], im.size[0]) self.assertEqual(size[1], im.size[1])
def test_remote_video(self): video_url = 'https://raw.githubusercontent.com/johndyer/mediaelement-files/master/big_buck_bunny.mp4' video_content, size, duration, thumbnail_content = media.prepare_video( video_url, aspect_ratios=1.0, max_duration=10) self.assertEqual(duration, 10.0, 'Invalid duration.') self.assertEqual(size[0], size[1], 'Invalid width/length.') self.assertGreater(len(video_content), 0, 'No video content returned.') self.assertGreater(len(thumbnail_content), 0, 'No thumbnail content returned.') # Save video, thumbnail content and verify attributes video_output = tempfile.NamedTemporaryFile(prefix='ipae_test_', suffix='.mp4', delete=False) video_output.write(video_content) video_output.close() vidclip_output = VideoFileClip(video_output.name) self.assertAlmostEqual(duration, vidclip_output.duration, places=1) self.assertEqual(size[0], vidclip_output.size[0]) self.assertEqual(size[1], vidclip_output.size[1]) im = Image.open(io.BytesIO(thumbnail_content)) self.assertEqual(size[0], im.size[0]) self.assertEqual(size[1], im.size[1])
def file_editor(content): with tempfile.NamedTemporaryFile(suffix=".roamer") as temp: if sys.version_info[0] == 3: content = content.encode('utf-8') temp.write(content) temp.flush() if EXTRA_EDITOR_COMMAND: exit_code = call([EDITOR, EXTRA_EDITOR_COMMAND, temp.name]) else: exit_code = call(EDITOR.split() + [temp.name]) if exit_code != 0: sys.exit() temp.seek(0) output = temp.read() if sys.version_info[0] == 3: output = output.decode('UTF-8') return output
def screenshot_area(): """ Screenshot an area of the screen using gnome-screenshot used to QR scan """ ink_flag = call(['which', 'gnome-screenshot'], stdout=PIPE, stderr=PIPE) if ink_flag == 0: file_name = path.join(GLib.get_tmp_dir(), NamedTemporaryFile().name) p = Popen(["gnome-screenshot", "-a", "-f", file_name], stdout=PIPE, stderr=PIPE) output, error = p.communicate() if error: error = error.decode("utf-8").split("\n") logging.error("\n".join([e for e in error])) if not path.isfile(file_name): logging.debug("The screenshot was not token") return False return file_name else: logging.error( "Couldn't find gnome-screenshot, please install it first") return False
def relation_set(relation_id=None, relation_settings=None, **kwargs): """Set relation information for the current unit""" relation_settings = relation_settings if relation_settings else {} relation_cmd_line = ['relation-set'] accepts_file = "--file" in subprocess.check_output( relation_cmd_line + ["--help"], universal_newlines=True) if relation_id is not None: relation_cmd_line.extend(('-r', relation_id)) settings = relation_settings.copy() settings.update(kwargs) for key, value in settings.items(): # Force value to be a string: it always should, but some call # sites pass in things like dicts or numbers. if value is not None: settings[key] = "{}".format(value) if accepts_file: # --file was introduced in Juju 1.23.2. Use it by default if # available, since otherwise we'll break if the relation data is # too big. Ideally we should tell relation-set to read the data from # stdin, but that feature is broken in 1.23.2: Bug #1454678. with tempfile.NamedTemporaryFile(delete=False) as settings_file: settings_file.write(yaml.safe_dump(settings).encode("utf-8")) subprocess.check_call( relation_cmd_line + ["--file", settings_file.name]) os.remove(settings_file.name) else: for key, value in settings.items(): if value is None: relation_cmd_line.append('{}='.format(key)) else: relation_cmd_line.append('{}={}'.format(key, value)) subprocess.check_call(relation_cmd_line) # Flush cache of any relation-gets for local unit flush(local_unit())