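我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 tempfile.mktemp()。注意：tempfile.mktemp() 只返回一个文件名而不创建文件，存在竞态条件，官方文档已将其标记为弃用；新代码应优先使用 tempfile.mkstemp()、tempfile.mkdtemp() 或 tempfile.NamedTemporaryFile()。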
def test_SADParser(self):
    """Parse the CommandWrapperWithPropertyOverride SAD and round-trip it.

    Asserts the parsed identifiers, the single component file and placement,
    and the overridden simple property, then exports the document to a
    scratch file and checks it with ``self._xmllint``.
    """
    sad = parsers.SADParser.parse("sdr/dom/waveforms/CommandWrapperWithPropertyOverride/CommandWrapper.sad.xml")
    self.assertEqual(sad.get_id(), "DCE:d206ab51-6342-4976-bac3-55e6902f3489")
    self.assertEqual(sad.get_name(), "CommandWrapperWithPropertyOverride")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 1)

    placement = sad.partitioning.get_componentplacement()[0]
    self.assertEqual(placement.componentfileref.refid, "CommandWrapper_592b8bd6-b011-4468-9417-705af45e907b")

    instantiation = placement.get_componentinstantiation()[0]
    self.assertEqual(instantiation.id_, "DCE:8c129782-a6a4-4095-8212-757f01de0c09")
    self.assertEqual(instantiation.get_usagename(), "CommandWrapper1")

    override = instantiation.componentproperties.get_simpleref()[0]
    self.assertEqual(override.refid, "DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e")
    self.assertEqual(override.value, "/bin/date")

    # Verify that we can write the output and still be DTD valid.
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even when export() raises.
        with os.fdopen(fd, "w") as tmp:
            sad.export(tmp, 0)
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_get_swift_hash_env(self, mock_config, mock_service_name):
    """Check ``get_swift_hash`` when no hash is configured.

    The environment lookup is patched to return a random UUID; the test
    asserts which variables were queried and that the returned hash equals
    the content written to ``swift_context.SWIFT_HASH_FILE``.
    """
    import os
    import shutil

    mock_config.return_value = None
    mock_service_name.return_value = "testsvc"

    # A private directory gives a path that does not exist yet (as the
    # original mktemp() did) without the name-guessing race, and lets the
    # test clean up after itself.
    workdir = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, workdir, True)
    tmpfile = os.path.join(workdir, 'swift-hash')

    # Restore the module-level setting so later tests are not left pointing
    # at a path that this test deletes.
    self.addCleanup(setattr, swift_context, 'SWIFT_HASH_FILE',
                    swift_context.SWIFT_HASH_FILE)
    swift_context.SWIFT_HASH_FILE = tmpfile

    with mock.patch('lib.swift_context.os.environ.get') as mock_env_get:
        mock_env_get.return_value = str(uuid.uuid4())
        hash_ = swift_context.get_swift_hash()
        mock_env_get.assert_has_calls([
            mock.call('JUJU_MODEL_UUID'),
            mock.call('JUJU_ENV_UUID', mock_env_get.return_value)
        ])

    with open(tmpfile, 'r') as fd:
        self.assertEqual(hash_, fd.read())

    self.assertTrue(mock_config.called)
def generate_pdf(card):
    """
    Make a PDF from a card

    :param card: dict from fetcher.py
    :return: Binary PDF buffer
    """
    from tempfile import mkstemp

    from eclaire.base import SPECIAL_LABELS

    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)

    pdf.add_page()

    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)

    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')

    qrcode = generate_qr_code(card.url)
    # mkstemp() creates the file atomically (mktemp() only guesses a free
    # name); the descriptor is closed because the image is saved by path.
    handle, qrcode_file = mkstemp(suffix='.png', prefix='trello_qr_')
    os.close(handle)
    try:
        qrcode.save(qrcode_file)
        pdf.image(qrcode_file, 118, 35, 20, 20)
    finally:
        # Remove the scratch image even when saving or embedding fails.
        os.unlink(qrcode_file)

    # May we never speak of this again.
    pdf.set_fill_color(255, 255, 255)
    pdf.rect(0, 55, 140, 20, 'F')

    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ', '.join([label.name for label in card.labels
                        if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 0, 'R')

    return pdf.output(dest='S')
def test_exit_crash():
    """Check that every ``pg`` widget class can be created and torn down.

    For each QWidget subclass exposed by ``pg``, the ``code`` template is
    rendered to a scratch script and run in a child interpreter; a non-zero
    exit status fails the test.
    """
    # For each Widget subclass, run a simple python script that creates an
    # instance and then shuts down. The intent is to check for segmentation
    # faults when each script exits.
    handle, tmp = tempfile.mkstemp(".py")
    os.close(handle)
    path = os.path.dirname(pg.__file__)

    initArgs = {
        'CheckTable': "[]",
        'ProgressDialog': '"msg"',
        'VerticalLabel': '"msg"',
    }

    try:
        for name in dir(pg):
            obj = getattr(pg, name)
            if not isinstance(obj, type) or not issubclass(obj, pg.QtGui.QWidget):
                continue

            print(name)
            argstr = initArgs.get(name, "")
            # Close the script before launching the child so its content is
            # flushed and no handle is leaked per iteration.
            with open(tmp, 'w') as script:
                script.write(code.format(path=path, classname=name, args=argstr))
            proc = subprocess.Popen([sys.executable, tmp])
            assert proc.wait() == 0
    finally:
        # Remove the script even when one of the child processes fails.
        os.remove(tmp)
def _smcra_to_str(self, smcra, temp_dir='/tmp/'):
    """
    WHATIF's input are PDB format files.
    Converts a SMCRA object to a PDB formatted string.

    :param smcra: structure object handed to ``PDBIO.set_structure``.
    :param temp_dir: directory in which the scratch PDB file is created.
    :return: the text that ``PDBIO`` wrote for the structure.
    """
    # mkstemp() creates the file atomically; PDBIO writes by path, so the
    # descriptor itself is not needed.
    handle, temp_path = tempfile.mkstemp('.pdb', dir=temp_dir)
    os.close(handle)
    try:
        io = PDBIO()
        io.set_structure(smcra)
        io.save(temp_path)

        with open(temp_path, 'r') as pdb_file:
            string = pdb_file.read()
    finally:
        # Do not leave the scratch file behind if saving or reading fails.
        os.remove(temp_path)

    return string
def test_binary(self, enable_randomness=True, times=1, timeout=15):
    """
    Test the binary generated

    :param enable_randomness: forwarded to ``test_binary_pov``.
    :param times: forwarded to ``test_binary_pov``.
    :param timeout: forwarded to ``test_binary_pov``.
    :return: whatever ``CGCPovSimulator.test_binary_pov`` returns.
    """
    # dump the binary code; mkstemp() reserves the path atomically
    handle, pov_binary_filename = tempfile.mkstemp(dir='/tmp', prefix='rex-pov-')
    os.close(handle)
    try:
        self.dump_binary(filename=pov_binary_filename)
        # 0o755 is accepted by Python 2.6+ and 3; the bare 0755 literal is a
        # syntax error on Python 3.
        os.chmod(pov_binary_filename, 0o755)

        pov_tester = CGCPovSimulator()
        result = pov_tester.test_binary_pov(
            pov_binary_filename,
            self.crash.binary,
            enable_randomness=enable_randomness,
            timeout=timeout,
            times=times)
    finally:
        # remove the generated pov, also when the simulator raises
        os.remove(pov_binary_filename)

    return result
def generate_report(self, register_setters, leakers):
    """Write a plain-text exploitation report into the current directory.

    The report is kept on disk; its path is logged.

    :param register_setters: iterable of exploits listed under
        "Register setting exploits"; each is rendered with ``str``.
    :param leakers: iterable of exploits listed under "Leaker exploits".
    """
    # mkstemp() creates the report file atomically instead of guessing a
    # free name. NOTE(review): the file is created owner-only (0600) and the
    # returned path is absolute -- confirm nothing relies on the old
    # umask-default permissions or the relative "./..." name.
    handle, stat_name = tempfile.mkstemp(dir=".", prefix='rex-results-')

    l.info("exploitation report being written to '%s'", stat_name)

    # The with-block closes the report even if rendering an exploit raises.
    with os.fdopen(handle, 'w') as f:
        f.write("Binary %s:\n" % os.path.basename(self.crash.project.filename))
        f.write("Register setting exploits:\n")
        for register_setter in register_setters:
            f.write("\t%s\n" % str(register_setter))
        f.write("\n")
        f.write("Leaker exploits:\n")
        for leaker in leakers:
            f.write("\t%s\n" % str(leaker))
def start_docker_compose(clients=1):
    """Bring the docker-compose environment up and start log collection.

    :param clients: number of ``mender-client`` containers; values above 1
        trigger a ``scale`` command.
    """
    inline_logs = conftest.inline_logs

    docker_compose_cmd("up -d")

    if clients > 1:
        docker_compose_cmd("scale mender-client=%d" % clients)

    if inline_logs:
        docker_compose_cmd("logs -f &")
    else:
        # Create the log file up front instead of handing a guessed,
        # not-yet-existing name (mktemp) to a shell redirect. The file is
        # kept on purpose: its path is recorded in log_files.
        log_handle = tempfile.NamedTemporaryFile(suffix="mender_testing",
                                                 delete=False)
        log_handle.close()
        tfile = log_handle.name

        docker_compose_cmd("logs -f --no-color > %s 2>&1 &" % tfile)
        logger.info("docker-compose log file stored here: %s", tfile)
        log_files.append(tfile)

    ssh_is_opened()

    common.set_setup_type(common.ST_OneClient)
def tearDown(self):
    """
    Clean up any files or directories created using L{TestCase.mktemp}.
    Subclasses must invoke this method if they override it or the
    cleanup will not occur.

    After the cleanup the OpenSSL error queue is drained; the test fails
    if anything was left in it.
    """
    if self._temporaryFiles is not None:
        for temp in self._temporaryFiles:
            if os.path.isdir(temp):
                shutil.rmtree(temp)
            elif os.path.exists(temp):
                os.unlink(temp)
    try:
        _exception_from_error_queue()
    # "except X as e" parses on Python 2.6+ and 3; the older
    # "except X, e" form is a syntax error on Python 3.
    except Error as e:
        # An Error carrying an empty list means the queue was empty.
        if e.args != ([],):
            self.fail("Left over errors in OpenSSL error queue: " + repr(e))
def _getBatchOutput(self, f):
    """Run cftp in batch mode with the given commands.

    :param f: text of the batch file handed to cftp via ``-b``.
    :return: a Deferred firing with the first element of the
        ``getProcessOutputAndValue`` result; the batch file is removed
        whether the run succeeds or fails.
    """
    # mkstemp() creates the batch file atomically; try/finally (rather than
    # a with-statement) keeps the block usable on old interpreters.
    fd, fn = tempfile.mkstemp()
    batch = os.fdopen(fd, 'w')
    try:
        batch.write(f)
    finally:
        # Close explicitly so the content is flushed before cftp reads it.
        batch.close()

    port = self.server.getHost().port
    cmds = ('-p %i -l testuser '
            '-K unix '
            '-a '
            '-v -b %s 127.0.0.1') % (port, fn)
    cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
    log.msg('running %s %s' % (sys.executable, cmds))
    env = os.environ.copy()
    env['PYTHONPATH'] = os.pathsep.join(sys.path)

    self.server.factory.expectedLoseConnection = 1

    d = getProcessOutputAndValue(sys.executable, cmds, env=env)

    def _cleanup(res):
        os.remove(fn)
        return res

    d.addCallback(lambda res: res[0])
    d.addBoth(_cleanup)

    return d
def loopbackUNIX(server, client, noisy=True):
    """Run session between server and client protocol instances over UNIX socket."""
    # The path is only a name here: listenUNIX creates the socket itself.
    socket_path = tempfile.mktemp()
    from twisted.internet import reactor

    wrapping_factory = policies.WrappingFactory(protocol.Factory())
    server_wrapper = _FireOnClose(wrapping_factory, server)
    wrapping_factory.noisy = noisy

    def _build_protocol(addr):
        # Every connection is served by the single wrapped server protocol.
        return server_wrapper

    wrapping_factory.buildProtocol = _build_protocol
    listening_port = reactor.listenUNIX(socket_path, wrapping_factory)

    client_factory = LoopbackClientFactory(client)
    client_factory.noisy = noisy
    reactor.connectUNIX(socket_path, client_factory)

    def _wait_for_server(_):
        return server_wrapper.deferred

    def _stop_listening(_):
        return listening_port.stopListening()

    finished = client_factory.deferred
    finished.addCallback(_wait_for_server)
    finished.addCallback(_stop_listening)
    return finished
def get_all_species(self):
    """Download and parse Ensembl's ``species.txt.gz`` for ``self.release``.

    :return: list of ``[db_name, common_name, taxid]`` rows, where taxid is
        an ``int`` or ``None`` when the column is not an integer; rows whose
        db_name starts with ``mus_musculus_`` are dropped.
    """
    import tempfile

    # mkstemp() creates the download target atomically and keeps the
    # ".txt.gz" suffix of the original file name.
    fd, outfile = tempfile.mkstemp(suffix='.txt.gz')
    try:
        self.logger.info('Downloading "species.txt.gz"...')
        species_file = '/pub/release-%s/mysql/ensembl_production_%s/species.txt.gz' % (self.release, self.release)
        # Both the local file and the FTP connection are released even when
        # the transfer fails half-way.
        with os.fdopen(fd, 'wb') as out_f:
            ftp = FTP(self.__class__.ENSEMBL_FTP_HOST)
            try:
                ftp.login()
                ftp.retrbinary("RETR " + species_file, out_f.write)
            finally:
                ftp.close()
        self.logger.info('Done.')

        # load saved file
        self.logger.info('Parsing "species.txt.gz"...')
        species_li = tab2list(outfile, (1, 2, 7), header=0)   # db_name,common_name,taxid
        # A conditional expression is used instead of "a and b or None",
        # which would turn a legitimate taxid of 0 into None.
        species_li = [x[:-1] + [int(x[-1]) if is_int(x[-1]) else None]
                      for x in species_li]
        # as of ensembl 87, there are also mouse strains. keep only the "original" one
        species_li = [s for s in species_li if not s[0].startswith("mus_musculus_")]
        self.logger.info('Done.')
    finally:
        os.remove(outfile)

    return species_li
def __init__(self, in_file, exception_handler, bug_handler, copy=None, run_level=1):
    """Store the settings and reserve a scratch output file.

    :param in_file: stored as the file this object works on.
    :param exception_handler: stored for later use.
    :param bug_handler: stored for later use.
    :param copy: stored as-is (default ``None``).
    :param run_level: stored as-is (default ``1``).
    """
    self.__file = in_file
    self.__bug_handler = bug_handler
    self.__copy = copy
    self.__run_level = run_level
    # Create the scratch file now instead of only guessing a free name with
    # mktemp(); only the path is kept and the file is reopened by path later.
    # NOTE(review): assumes later code opens this path for writing.
    scratch = tempfile.NamedTemporaryFile(delete=False)
    scratch.close()
    self.__write_to = scratch.name
    self.__exception_handler = exception_handler
    self.__state = 'outside'
    # Matches hexadecimal character references such as "&#x201C;".
    self.__utf_exp = re.compile(r'&#x(.*?);')
def __init__(self, in_file, bug_handler, out_file, copy=None, orig_file=None, run_level=1):
    """Store the settings, reset the counters and reserve a scratch file.

    :param in_file: stored as the file this object works on.
    :param bug_handler: stored for later use.
    :param out_file: stored as-is (the original comment calls it a leftover).
    :param copy: stored as-is (default ``None``).
    :param orig_file: stored as-is (default ``None``).
    :param run_level: stored as-is (default ``1``).
    """
    self.__file = in_file
    self.__bug_handler = bug_handler
    self.__copy = copy
    self.__run_level = run_level
    # Create the scratch file now instead of only guessing a free name with
    # mktemp(); only the path is kept and the file is reopened by path later.
    # NOTE(review): assumes later code opens this path for writing.
    scratch = tempfile.NamedTemporaryFile(delete=False)
    scratch.close()
    self.__write_to = scratch.name
    self.__bracket_count = 0
    self.__ob_count = 0
    self.__cb_count = 0
    self.__pict_count = 0
    self.__in_pict = 0
    self.__already_found_pict = 0
    self.__orig_file = orig_file
    # Kept at its original position: the helper may read attributes set above.
    self.__initiate_pict_dict()
    self.__out_file = out_file
    # this is left over
    self.__no_ask = 1
def __init__(self, in_file, bug_handler, copy=None, run_level=1):
    """Store the settings, reset the state flags and reserve a scratch file.

    :param in_file: stored as the file this object works on.
    :param bug_handler: stored for later use.
    :param copy: stored as-is (default ``None``).
    :param run_level: stored as-is (default ``1``).
    """
    self.__file = in_file
    self.__bug_handler = bug_handler
    self.__copy = copy
    # Create the scratch file now instead of only guessing a free name with
    # mktemp(); only the path is kept and the file is reopened by path later.
    # NOTE(review): assumes later code opens this path for writing.
    scratch = tempfile.NamedTemporaryFile(delete=False)
    scratch.close()
    self.__write_to = scratch.name
    self.__bracket_count = 0
    self.__ob_count = 0
    self.__cb_count = 0
    self.__after_asterisk = 0
    self.__delete = 0
    # Kept at its original position: the helper may read attributes set above.
    self.__initiate_allow()
    self.__ob = 0
    self.__write_cb = 0
    self.__run_level = run_level
    self.__found_delete = 0
    self.__list = 0
def __test_bad(self):
    """Run the script on every ``.rtf`` file in the invalid-input directory.

    Each file is converted into a scratch output file. When
    ``__run_script`` reports a falsy status, the input path is recorded in
    ``self.__error`` with the note 'Should have produced an error'.
    """
    for name in os.listdir(self.__invalid_dir):
        path = os.path.join(self.__invalid_dir, name)
        ext = os.path.splitext(path)[1]
        if ext != '.rtf':
            continue
        if os.path.isdir(path):
            continue
        # The original joined self.__out_dir with the absolute path returned
        # by mktemp(); os.path.join discards everything before an absolute
        # component, so the output always landed in the system temp dir.
        # That location is kept, but the file is now created atomically.
        # NOTE(review): if the output was meant to live in self.__out_dir,
        # pass dir=self.__out_dir here instead.
        handle, out_path = tempfile.mkstemp()
        os.close(handle)
        try:
            status, msg = self.__run_script(in_file=path, out_file=out_path)
            if not status:
                info = [path, 'Should have produced an error']
                self.__error.append(info)
        finally:
            try:
                os.remove(out_path)
            except OSError:
                pass
def test_smrt_archiver_lasttime():
    """Process the 'lasttime' feed twice with one archiver.

    The first pass must yield indicators with the expected ``lasttime``;
    the second pass must yield nothing.
    """
    import os
    import shutil

    # A private directory gives a database path that does not exist yet (as
    # mktemp() did) without the name race, and makes cleanup trivial.
    workdir = tempfile.mkdtemp()
    tmpfile = os.path.join(workdir, 'archiver.db')
    try:
        archiver = Archiver(dbfile=tmpfile)
        rule = 'test/smrt/rules/archiver.yml'
        feed = 'lasttime'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) > 0

                by_indicator = {i.indicator: i.__dict__() for i in x}
                assert by_indicator['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) == 0
    finally:
        # Best effort: the archiver may still hold the database open.
        shutil.rmtree(workdir, ignore_errors=True)
def test_smrt_archiver_firsttime():
    """Process the 'firsttime' feed twice with one archiver.

    The first pass must yield indicators with the expected ``lasttime``;
    the second pass must yield nothing.
    """
    import os
    import shutil

    # A private directory gives a database path that does not exist yet (as
    # mktemp() did) without the name race, and makes cleanup trivial.
    workdir = tempfile.mkdtemp()
    tmpfile = os.path.join(workdir, 'archiver.db')
    try:
        archiver = Archiver(dbfile=tmpfile)
        rule = 'test/smrt/rules/archiver.yml'
        feed = 'firsttime'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) > 0

                by_indicator = {i.indicator: i.__dict__() for i in x}
                assert by_indicator['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) == 0
    finally:
        # Best effort: the archiver may still hold the database open.
        shutil.rmtree(workdir, ignore_errors=True)
def test_smrt_archiver_both():
    """Process the 'both' feed twice with one archiver.

    The first pass must yield indicators with the expected ``lasttime``;
    the second pass must yield nothing.
    """
    import os
    import shutil

    # A private directory gives a database path that does not exist yet (as
    # mktemp() did) without the name race, and makes cleanup trivial.
    workdir = tempfile.mkdtemp()
    tmpfile = os.path.join(workdir, 'archiver.db')
    try:
        archiver = Archiver(dbfile=tmpfile)
        rule = 'test/smrt/rules/archiver.yml'
        feed = 'both'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) > 0

                by_indicator = {i.indicator: i.__dict__() for i in x}
                assert by_indicator['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) == 0
    finally:
        # Best effort: the archiver may still hold the database open.
        shutil.rmtree(workdir, ignore_errors=True)
def test_smrt_archiver_neither():
    """Process the 'neither' feed twice with one archiver.

    The first pass must yield indicators that carry no ``lasttime``;
    the second pass must yield nothing.
    """
    import os
    import shutil

    # A private directory gives a database path that does not exist yet (as
    # mktemp() did) without the name race, and makes cleanup trivial.
    workdir = tempfile.mkdtemp()
    tmpfile = os.path.join(workdir, 'archiver.db')
    try:
        archiver = Archiver(dbfile=tmpfile)
        rule = 'test/smrt/rules/archiver.yml'
        feed = 'neither'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) > 0

                by_indicator = {i.indicator: i.__dict__() for i in x}
                assert by_indicator['216.243.31.2'].get('lasttime') is None

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) == 0
    finally:
        # Best effort: the archiver may still hold the database open.
        shutil.rmtree(workdir, ignore_errors=True)
def test_smrt_archiver_lasttime_clear():
    """Process the 'lasttime' feed twice, clearing the memcache in between.

    The first pass must yield indicators with the expected ``lasttime``;
    after ``archiver.clear_memcache()`` the second pass must still yield
    nothing.
    """
    import os
    import shutil

    # A private directory gives a database path that does not exist yet (as
    # mktemp() did) without the name race, and makes cleanup trivial.
    workdir = tempfile.mkdtemp()
    tmpfile = os.path.join(workdir, 'archiver.db')
    try:
        archiver = Archiver(dbfile=tmpfile)
        rule = 'test/smrt/rules/archiver.yml'
        feed = 'lasttime'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) > 0

                by_indicator = {i.indicator: i.__dict__() for i in x}
                assert by_indicator['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

        archiver.clear_memcache()

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) == 0
    finally:
        # Best effort: the archiver may still hold the database open.
        shutil.rmtree(workdir, ignore_errors=True)
def get_local_filename(self):
    """ get_local_filename()

    If the filename is an existing file on this filesystem, return
    that. Otherwise a temporary file is created on the local file
    system which can be used by the format to read from or write to.
    """
    if self._uri_type == URI_FILENAME:
        return self._filename

    # Get filename; mkstemp() creates the file atomically, and the
    # descriptor is closed because the file is used by path from here on.
    ext = os.path.splitext(self._filename)[1]
    handle, self._filename_local = tempfile.mkstemp(ext, 'imageio_')
    os.close(handle)
    # Write stuff to it?
    if self.mode[0] == 'r':
        with open(self._filename_local, 'wb') as local_file:
            shutil.copyfileobj(self.get_file(), local_file)
    return self._filename_local
def test_version_2_0_memmap():
    """Check ``open_memmap`` with a header too large for format 1.0.

    Requesting version (1, 0) must raise ``ValueError``; version (2, 0)
    must round-trip; ``version=None`` must emit a ``UserWarning`` and still
    round-trip.
    """
    # requires more than 2 byte for header
    dt = [(("%d" % i) * 100, float) for i in range(500)]
    d = np.ones(1000, dtype=dt)
    # Create the file atomically instead of guessing a free name with
    # mktemp(); every open below uses 'w+' or 'r', so an existing empty
    # file is fine.
    scratch = tempfile.NamedTemporaryFile(suffix='', prefix='mmap',
                                          dir=tempdir, delete=False)
    scratch.close()
    tf = scratch.name

    # 1.0 requested but data cannot be saved this way
    assert_raises(ValueError, format.open_memmap, tf, mode='w+', dtype=d.dtype,
                  shape=d.shape, version=(1, 0))

    ma = format.open_memmap(tf, mode='w+', dtype=d.dtype,
                            shape=d.shape, version=(2, 0))
    ma[...] = d
    del ma

    with warnings.catch_warnings(record=True) as w:
        warnings.filterwarnings('always', '', UserWarning)
        ma = format.open_memmap(tf, mode='w+', dtype=d.dtype,
                                shape=d.shape, version=None)
        assert_(w[0].category is UserWarning)
        ma[...] = d
        del ma

    ma = format.open_memmap(tf, mode='r')
    assert_array_equal(ma, d)
def testAll(self):
    """Run testxslt.js through cscript and compare what it wrote.

    A mismatch with ``expected_output`` is reported on stdout; the output
    file is removed afterwards on a best-effort basis.
    """
    output_name = tempfile.mktemp("-pycom-test")
    command_line = ("cscript //nologo testxslt.js doesnt_matter.xml testxslt.xsl "
                    + output_name)
    win32com.test.util.ExecuteShellCommand(command_line, self)
    try:
        result_file = open(output_name)
        try:
            actual = result_file.read()
            if actual != expected_output:
                # Single-argument print() behaves the same on Python 2 and 3.
                print("ERROR: XSLT expected output of %r" % (expected_output,))
                print("but got %r" % (actual,))
        finally:
            result_file.close()
    finally:
        try:
            os.unlink(output_name)
        except os.error:
            pass
def _SetCurrent(self, image):
    """Point the ``current`` symlink in the image directory at an image.

    The link target is ``<timestamp>.iso`` built from ``image['timestamp']``.
    Nothing happens when the link already resolves to that file.
    """
    image_dir = self._image_dir
    filename = '%d.iso' % (image['timestamp'])
    wanted_path = os.path.join(image_dir, filename)
    current_path = os.path.join(image_dir, 'current')

    try:
        existing_target = os.readlink(current_path)
    except FileNotFoundError:
        # No link yet: fall through and create it.
        pass
    else:
        if os.path.join(image_dir, existing_target) == wanted_path:
            return

    print('Changing current link to:', filename, flush=True)
    # Create the link under a scratch name, then rename it over the old one
    # so the link is swapped in a single step.
    staging_link = tempfile.mktemp(dir=image_dir)
    os.symlink(filename, staging_link)
    os.rename(staging_link, current_path)
def _write_styles_file(self):
    """Write the document-styles XML to a scratch file.

    The path is stored in ``self.styles_xml`` and the (closed) file object
    in ``self.f``.

    :raises ReportError: when the scratch file cannot be created.
    """
    # NOTE(review): mktemp() only picks a name and is race-prone; switching
    # to mkstemp() needs the "Could not create %s" message reworked, since
    # no path exists when creation fails.
    self.styles_xml = tempfile.mktemp()

    # Open once, in text mode. The original opened the file in "wb" first
    # and then again in "w", leaking the first handle.
    try:
        self.f = open(self.styles_xml, "w")
    except IOError as msg:
        errmsg = "%s\n%s" % (_("Could not create %s") % self.styles_xml, msg)
        raise ReportError(errmsg)
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt and SystemExit
        # are no longer turned into a ReportError.
        raise ReportError(_("Could not create %s") % self.styles_xml)

    try:
        self.f.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        self.f.write(
            '<office:document-styles ' +
            _XMLNS +
            'office:version="1.0"> '
            )
        self.f.write(_STYLES_FONTS)
        self.f.write(_STYLES_STYLES)
        self.f.write(_STYLES_AUTOMATIC)
        self.f.write(_STYLES_MASTER)
        self.f.write('</office:document-styles>\n')
    finally:
        # Close the file even when one of the writes fails.
        self.f.close()
def _write_meta_file(self):
    """Write the document meta XML to a scratch file.

    The path is stored in ``self.meta_xml`` and the (closed) file object in
    ``self.f``. The content is the ``_META`` template filled with the
    program name, version, ``self.name`` and ``self.time``.

    :raises ReportError: when the scratch file cannot be created.
    """
    # NOTE(review): mktemp() only picks a name and is race-prone; switching
    # to mkstemp() needs the "Could not create %s" message reworked, since
    # no path exists when creation fails.
    self.meta_xml = tempfile.mktemp()

    # Open once, in text mode. The original opened the file in "wb" first
    # and then again in "w", leaking the first handle.
    try:
        self.f = open(self.meta_xml, "w")
    except IOError as msg:
        errmsg = "%s\n%s" % (_("Could not create %s") % self.meta_xml, msg)
        raise ReportError(errmsg)
    except Exception:
        # Narrowed from a bare except so KeyboardInterrupt and SystemExit
        # are no longer turned into a ReportError.
        raise ReportError(_("Could not create %s") % self.meta_xml)

    try:
        self.f.write(_META %
                     {'program': PROGRAM_NAME,
                      'version': VERSION,
                      'name': self.name,
                      'time': self.time,
                     }
                    )
    finally:
        # Close the file even when the write fails.
        self.f.close()
def __getstate__(self):
    '''This function is neccessary for pickling.

    :return: a copy of the instance dictionary in which ``_model`` is
        replaced by the text the model writes via ``save(filename)``.
    '''
    # Translate everything but the svm because that cannot be simply pickled.
    state = {}
    # items() works on Python 2 and 3; iteritems() is Python 2 only.
    for key, value in self.__dict__.items():
        if key == '_model':
            # mkstemp() creates the file atomically; the model writes by
            # path, so the descriptor itself is not needed.
            handle, filename = tempfile.mkstemp()
            os.close(handle)
            try:
                self._model.save(filename)
                with open(filename) as saved:
                    state[key] = saved.read()
            finally:
                # Remove the scratch file even when save() or read() fails.
                os.remove(filename)
            continue

        state[key] = value

    return state
def test_resampling_value(shapefile, bins, samples):
    """Resample a shapefile by target magnitude and inspect the result.

    The output must keep only the 'lat' target field and contain exactly
    ``samples`` points (rounded down to a multiple of ``bins``).
    """
    import os
    import shutil

    samples = (samples//bins) * bins
    lonlats, filename = shapefile
    # A private directory avoids the mktemp() name race and lets every file
    # the shapefile writer produces be removed afterwards.
    workdir = tempfile.mkdtemp()
    random_filename = os.path.join(workdir, 'resampled.shp')
    try:
        resampling.resample_by_magnitude(filename,
                                         random_filename,
                                         target_field='lat',
                                         bins=bins,
                                         output_samples=samples,
                                         bootstrap=True
                                         )
        resampled_sf = shp.Reader(random_filename)
        # The first entry of .fields is skipped, as in the original test.
        resampled_shapefields = [f[0] for f in resampled_sf.fields[1:]]

        new_coords, new_val, new_othervals = \
            geoio.load_shapefile(random_filename, 'lat')

        assert 'lat' in resampled_shapefields
        assert 'lon' not in resampled_shapefields
        assert np.all((samples, 2) == new_coords.shape)
        assert new_othervals == {}  # only the target is retained after resampling
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
def test_resampling_spatial(shapefile, rows, cols, samples):
    """Resample a shapefile over a rows x cols tiling and inspect the result.

    The output must keep only the 'lat' target field and contain at most
    ``samples`` points (rounded down to a multiple of the tile count).
    """
    import os
    import shutil

    tiles = rows * cols
    samples = (samples // tiles) * tiles
    lonlats, filename = shapefile
    # A private directory avoids the mktemp() name race and lets every file
    # the shapefile writer produces be removed afterwards.
    workdir = tempfile.mkdtemp()
    random_filename = os.path.join(workdir, 'resampled.shp')
    try:
        resampling.resample_spatially(filename,
                                      random_filename,
                                      target_field='lat',
                                      rows=rows,
                                      cols=cols,
                                      output_samples=samples,
                                      bootstrap=True
                                      )
        resampled_sf = shp.Reader(random_filename)
        # The first entry of .fields is skipped, as in the original test.
        resampled_shapefields = [f[0] for f in resampled_sf.fields[1:]]

        new_coords, new_val, new_othervals = \
            geoio.load_shapefile(random_filename, 'lat')

        assert 'lat' in resampled_shapefields
        assert 'lon' not in resampled_shapefields
        assert np.all((samples, 2) >= new_coords.shape)
        assert new_othervals == {}  # only the target is retained after resampling
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
def test_do_work(self):
    """Run ``crop.do_work`` with JPEG output enabled.

    Asserts that both the requested output file and a ``.jpg`` with the
    same stem under ``crop.TMPDIR`` exist afterwards.
    """
    import shutil

    # input_file, mask_file, output_file, resampling, extents, jpeg
    # The output path must not exist yet, so a private directory is used
    # instead of a pre-created file. The directory's unique name doubles as
    # the file stem so the derived .jpg name stays unique as well.
    workdir = tempfile.mkdtemp()
    output_file = join(workdir, basename(workdir) + '.tif')
    try:
        options = Options(resampling='bilinear',
                          extents=self.extents,
                          jpeg=True,
                          reproject=True)
        crop.do_work(input_file=self.std2000_no_mask,
                     output_file=output_file,
                     options=options,
                     mask_file=self.mask)

        # output file was created
        self.assertTrue(exists(output_file))

        # assert jpeg was created
        self.assertTrue(exists(
            join(crop.TMPDIR, basename(output_file).split('.')[0] + '.jpg')))
    finally:
        # Clean up even when an assertion fails.
        shutil.rmtree(workdir, ignore_errors=True)
def atomic_write(path):
    """Yield a text file whose content replaces ``path`` on success.

    Data is written to a hidden sibling of ``path``. When the generator is
    resumed normally the sibling is renamed over ``path``; when an exception
    is thrown in, the sibling is deleted and the exception propagates.

    :param path: destination file name.
    """
    tmp = tempfile.mktemp(
        prefix='.' + os.path.basename(path),
        dir=os.path.dirname(path),
    )
    # O_EXCL makes creation fail if someone planted a file or symlink at the
    # guessed name, closing the race a plain open() leaves after mktemp().
    # Mode 0o666 keeps the umask-derived permissions open() would give.
    # O_BINARY (Windows only) stops the C runtime translating newlines a
    # second time.
    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, 'O_BINARY', 0)
    fd = os.open(tmp, flags, 0o666)
    # Creation happens outside the try block, so a failed creation no longer
    # triggers a misleading os.remove() error.
    try:
        with os.fdopen(fd, 'w') as f:
            yield f
    except BaseException:
        os.remove(tmp)
        raise
    else:
        os.rename(tmp, path)


# TODO: at some point there will be so many options we'll want to make a config
# object or similar instead of adding more arguments here
def screenshot(self, filename=None):
    """
    Take a screenshot of the device identified by ``self.udid``.

    Args:
        filename: optional path; when given the image is also saved there.

    Return:
        PIL.Image

    Raises:
        EnvironmentError
    """
    # mkstemp() creates the capture file atomically; the external tool
    # writes to it by path.
    handle, tmpfile = tempfile.mkstemp(prefix='atx-screencap-', suffix='.tiff')
    os.close(handle)
    try:
        try:
            idevice("screenshot", "--udid", self.udid, tmpfile)
        except subprocess.CalledProcessError as e:
            # str(e) works on Python 2 and 3; e.message does not exist on
            # Python 3. NOTE(review): exiting the interpreter from library
            # code contradicts the documented EnvironmentError -- kept so
            # existing callers see no change.
            sys.exit(str(e))

        image = Image.open(tmpfile)
        # Force the pixel data to be read before the file is deleted.
        image.load()
        if filename:
            image.save(filename)
        return image
    finally:
        if os.path.exists(tmpfile):
            os.unlink(tmpfile)
def _adb_screencap(self, scale=1.0):
    """
    capture screen with adb shell screencap

    :param scale: resize factor; ``None`` or ``1.0`` leaves the size as is.
    :return: the captured image, rotated by ``self.rotation()`` quarter
        turns when that value is non-zero.
    """
    # Only a unique *name* is needed for the device-side path, so mktemp()
    # is used purely as a name generator here.
    remote_file = tempfile.mktemp(dir='/data/local/tmp/', prefix='screencap-', suffix='.png')
    self.shell('screencap', '-p', remote_file)

    # The local file is created atomically; created after the capture so a
    # failing shell call leaves nothing behind.
    handle, local_file = tempfile.mkstemp(prefix='atx-screencap-', suffix='.png')
    os.close(handle)
    try:
        self.pull(remote_file, local_file)
        image = imutils.open_as_pillow(local_file)
        if scale is not None and scale != 1.0:
            image = image.resize([int(scale * s) for s in image.size], Image.BICUBIC)
        rotation = self.rotation()
        if rotation:
            method = getattr(Image, 'ROTATE_{}'.format(rotation*90))
            image = image.transpose(method)
        return image
    finally:
        self.remove(remote_file)
        os.unlink(local_file)
def _adb_minicap(self, scale=1.0):
    """
    capture screen with minicap

    https://github.com/openstf/minicap

    :param scale: factor applied to the display width and height to get the
        requested capture size.
    :return: the captured image as opened by ``imutils.open_as_pillow``.
    """
    # Only a unique *name* is needed for the device-side path, so mktemp()
    # is used purely as a name generator here.
    remote_file = tempfile.mktemp(dir='/data/local/tmp/', prefix='minicap-', suffix='.jpg')
    (w, h, r) = self.display
    params = '{x}x{y}@{rx}x{ry}/{r}'.format(x=w, y=h, rx=int(w*scale), ry=int(h*scale), r=r*90)

    # The local file is created atomically, so the unlink in the finally
    # clause cannot fail on a missing file and mask the real error.
    handle, local_file = tempfile.mkstemp(prefix='atx-minicap-', suffix='.jpg')
    os.close(handle)
    try:
        self.shell('LD_LIBRARY_PATH=/data/local/tmp', self.__minicap, '-s', '-P', params, '>', remote_file)
        self.pull(remote_file, local_file)
        image = imutils.open_as_pillow(local_file)
        return image
    finally:
        self.remove(remote_file)
        os.unlink(local_file)
def test_blocking_lock_file(self):
    """Check that a second blocking lock gives up after its wait time.

    One lock is taken on a scratch path; a second ``BlockingLockFile`` on
    the same path must raise ``IOError`` and do so within ``wait_time``
    plus a small tolerance.
    """
    # Only a path is needed here; the test itself never creates this file.
    my_file = tempfile.mktemp()
    lock_file = BlockingLockFile(my_file)
    lock_file._obtain_lock()

    # next one waits for the lock
    start = time.time()
    wait_time = 0.1
    wait_lock = BlockingLockFile(my_file, 0.05, wait_time)
    # assertRaises replaces the failUnlessRaises alias, which was removed
    # in Python 3.12.
    self.assertRaises(IOError, wait_lock._obtain_lock)
    elapsed = time.time() - start
    extra_time = 0.02
    if is_win:
        # for Appveyor
        extra_time *= 6  # NOTE: Indeterministic failures here...
    self.assertLess(elapsed, wait_time + extra_time)
def setUp(self):
    """Pick a scratch path and store it in ``self._dbfile``."""
    # NOTE(review): tempfile.mktemp() only returns a name and is race-prone;
    # presumably the consumer creates the file itself -- confirm it accepts
    # a pre-existing file before switching to mkstemp().
    self._dbfile = tempfile.mktemp()
def test_SPDParser(self):
    """Validate, parse and round-trip the CommandWrapper SPD.

    The input is first checked with ``self._xmllint``; the parsed id, name,
    author and property file are asserted; finally the document is exported
    to a scratch file that must validate as well.
    """
    # Verify the input is valid
    spdPath = os.path.abspath("sdr/dom/components/CommandWrapper/CommandWrapper.spd.xml")
    status = self._xmllint(spdPath, "SPD")
    self.assertEqual(status, 0, "Input XML isn't DTD valid")

    spd = parsers.SPDParser.parse(spdPath)
    self.assertEqual(spd.get_id(), "DCE:458872f6-a316-4082-b1eb-ce5704f5c49d")
    self.assertEqual(spd.get_name(), "CommandWrapper")
    self.assertEqual(str(spd.get_author()[0].get_name()[0]), "REDHAWK test author")
    self.assertEqual(spd.get_propertyfile().get_type(), "PRF")
    self.assertEqual(spd.get_propertyfile().get_localfile().get_name(), "CommandWrapper.prf.xml")

    # Verify that we can write the output and still be DTD valid.
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even when export() raises.
        with os.fdopen(fd, "w") as tmp:
            spd.export(tmp, 0)
        status = self._xmllint(tmpfile, "SPD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_PRFParser(self):
    """Parse and round-trip the CommandWrapper PRF.

    Simple and simple-sequence properties are indexed by id; mode, name,
    type, value and kind of the two known properties are asserted; finally
    the document is exported to a scratch file that must validate.
    """
    prf = parsers.PRFParser.parse("sdr/dom/components/CommandWrapper/CommandWrapper.prf.xml")
    props = {}
    # "prop" avoids shadowing the builtin "property".
    for prop in prf.get_simple():
        props[prop.get_id()] = prop
    for prop in prf.get_simplesequence():
        props[prop.get_id()] = prop

    command = props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"]
    self.assertEqual(command.get_mode(), "readwrite")
    self.assertEqual(command.get_name(), "command")
    self.assertEqual(command.get_type(), "string")
    self.assertEqual(command.get_value(), "/bin/echo")
    self.assertEqual(command.get_kind()[0].get_kindtype(), "configure")

    args = props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"]
    self.assertEqual(args.get_mode(), "readwrite")
    self.assertEqual(args.get_name(), "args")
    self.assertEqual(args.get_type(), "string")
    self.assertEqual(args.get_values().get_value()[0], "Hello World")
    self.assertEqual(args.get_kind()[0].get_kindtype(), "configure")

    # Verify that we can write the output and still be DTD valid.
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even when export() raises.
        with os.fdopen(fd, "w") as tmp:
            prf.export(tmp, 0)
        status = self._xmllint(tmpfile, "PRF")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SCDParser(self):
    """Parse and round-trip the CommandWrapper SCD.

    Asserts the CORBA version, repository id, component type, the first
    supported interface and the first declared interface, then exports the
    document to a scratch file that must validate.
    """
    scd = parsers.SCDParser.parse("sdr/dom/components/CommandWrapper/CommandWrapper.scd.xml")
    self.assertEqual(scd.get_corbaversion(), "2.2")
    self.assertEqual(scd.get_componentrepid().get_repid(), "IDL:CF/Resource:1.0")
    self.assertEqual(scd.get_componenttype(), "resource")

    supported = scd.get_componentfeatures().get_supportsinterface()[0]
    self.assertEqual(supported.get_repid(), "IDL:CF/Resource:1.0")
    self.assertEqual(supported.get_supportsname(), "Resource")

    interface = scd.get_interfaces().get_interface()[0]
    self.assertEqual(interface.get_name(), "Resource")
    self.assertEqual(interface.get_inheritsinterface()[0].get_repid(), "IDL:CF/LifeCycle:1.0")

    # Verify that we can write the output and still be DTD valid.
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even when export() raises.
        with os.fdopen(fd, "w") as tmp:
            scd.export(tmp, 0)
        status = self._xmllint(tmpfile, "SCD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_usesdeviceref(self):
    """Parse and round-trip a SAD whose host collocation uses a device ref.

    Asserts the single collocated component instantiation and the single
    ``usesdeviceref``, then exports the document to a scratch file that
    must validate.
    """
    sad = parsers.SADParser.parse("sdr/parser_tests/usesdeviceref.sad.xml")
    self.assertEqual(sad.get_id(), "colloc_usesdev_1")
    self.assertEqual(sad.get_name(), "colloc_usesdev")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_hostcollocation()), 1)

    colloc = sad.partitioning.get_hostcollocation()[0]
    self.assertEqual(len(colloc.get_componentplacement()), 1)
    comp_place = colloc.get_componentplacement()[0]
    self.assertEqual(len(comp_place.get_componentinstantiation()), 1)
    comp_ci = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_ci.id_, "P1_1")
    self.assertEqual(comp_ci.get_usagename(), "P1_1")

    self.assertEqual(len(colloc.get_usesdeviceref()), 1)
    udev_ref = colloc.get_usesdeviceref()[0]
    self.assertEqual(udev_ref.refid, "FrontEndTuner_1")

    # Verify that we can write the output and still be DTD valid.
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even when export() raises.
        with os.fdopen(fd, "w") as tmp:
            sad.export(tmp, 0)
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_devicerequires(self):
    """Parse and round-trip a SAD with ``devicerequires`` entries.

    Asserts the two component placements (Red with color and rank, Green
    with color only), then exports the document to a scratch file that must
    validate.
    """
    sad = parsers.SADParser.parse("sdr/parser_tests/devicerequires.sad.xml")
    self.assertEqual(sad.get_id(), "device_requires_multicolor")
    self.assertEqual(sad.get_name(), "device_requires_multicolor")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 2)

    comp_place = sad.partitioning.get_componentplacement()[0]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Red")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
    self.assertEqual(len(comp_in.devicerequires.get_requires()), 2)
    self.assertEqual(comp_in.devicerequires.get_requires()[0].id, "color")
    self.assertEqual(comp_in.devicerequires.get_requires()[0].value, "RED")
    self.assertEqual(comp_in.devicerequires.get_requires()[1].id, "rank")
    self.assertEqual(comp_in.devicerequires.get_requires()[1].value, "15")

    comp_place = sad.partitioning.get_componentplacement()[1]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Green")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Green")
    self.assertEqual(len(comp_in.devicerequires.get_requires()), 1)
    self.assertEqual(comp_in.devicerequires.get_requires()[0].id, "color")
    self.assertEqual(comp_in.devicerequires.get_requires()[0].value, "GREEN")

    # Verify that we can write the output and still be DTD valid.
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even when export() raises.
        with os.fdopen(fd, "w") as tmp:
            sad.export(tmp, 0)
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_loggingconfig(self):
    """Parse and round-trip a SAD with per-component logging configuration.

    Asserts the logging level/value of the Red component and the logging
    value of the Green component, then exports the document to a scratch
    file that must validate.
    """
    sad = parsers.SADParser.parse("sdr/parser_tests/loggingconfig.sad.xml")
    self.assertEqual(sad.get_id(), "device_requires_multicolor")
    self.assertEqual(sad.get_name(), "device_requires_multicolor")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 2)

    comp_place = sad.partitioning.get_componentplacement()[0]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Red")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
    self.assertEqual(comp_in.loggingconfig.level, "ERROR")
    self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file")

    comp_place = sad.partitioning.get_componentplacement()[1]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Green")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Green")
    self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file2")

    # Verify that we can write the output and still be DTD valid.
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even when export() raises.
        with os.fdopen(fd, "w") as tmp:
            sad.export(tmp, 0)
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_SADParser_affinityconfig(self):
    """Parse and round-trip a SAD with an affinity section.

    Asserts the single component placement, its logging configuration and
    its two affinity simple references, then exports the document to a
    scratch file that must validate.
    """
    sad = parsers.SADParser.parse("sdr/parser_tests/affinity.sad.xml")
    self.assertEqual(sad.get_id(), "device_requires_multicolor")
    self.assertEqual(sad.get_name(), "device_requires_multicolor")
    self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(sad.partitioning.get_componentplacement()), 1)

    comp_place = sad.partitioning.get_componentplacement()[0]
    comp_in = comp_place.get_componentinstantiation()[0]
    self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
    self.assertEqual(comp_in.id_, "SimpleComponent_Red")
    self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
    self.assertEqual(comp_in.loggingconfig.level, "ERROR")
    self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file")
    self.assertEqual(len(comp_in.affinity.get_simpleref()), 2)
    self.assertEqual(comp_in.affinity.get_simpleref()[0].refid, "affinity::exec_directive_class")
    self.assertEqual(comp_in.affinity.get_simpleref()[0].value, "socket")
    self.assertEqual(comp_in.affinity.get_simpleref()[1].refid, "affinity::exec_directive_value")
    self.assertEqual(comp_in.affinity.get_simpleref()[1].value, "0")

    # Verify that we can write the output and still be DTD valid.
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even when export() raises.
        with os.fdopen(fd, "w") as tmp:
            sad.export(tmp, 0)
        status = self._xmllint(tmpfile, "SAD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_DCDParser_loggingconfig(self):
    """Parse and round-trip a DCD with a logging configuration.

    Asserts the single GPP placement and its logging level/value, then
    exports the document to a scratch file that must validate.
    """
    dcd = parsers.DCDParser.parse("sdr/parser_tests/loggingconfig.dcd.xml")
    self.assertEqual(dcd.get_id(), "test_GPP_green")
    self.assertEqual(dcd.get_name(), "test_GPP_green")
    self.assertEqual(len(dcd.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(dcd.partitioning.get_componentplacement()), 1)

    gpp = dcd.partitioning.get_componentplacement()[0]
    gpp_ci = gpp.get_componentinstantiation()[0]
    self.assertEqual(gpp.get_componentfileref().get_refid(), "GPP1_file_1")
    self.assertEqual(gpp_ci.get_id(), "test_GPP_green::GPP_1")
    self.assertEqual(gpp_ci.get_usagename(), "test_GPP_green::GPP_1")
    self.assertEqual(gpp_ci.loggingconfig.level, "ERROR")
    self.assertEqual(gpp_ci.loggingconfig.value, "path/to/my/log/file")

    # Verify that we can write the output and still be DTD valid.
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even when export() raises.
        with os.fdopen(fd, "w") as tmp:
            dcd.export(tmp, 0)
        status = self._xmllint(tmpfile, "DCD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def test_DCDParser_affinity(self):
    """Parse and round-trip a DCD with an affinity section.

    Asserts the single GPP placement and its two affinity simple
    references, then exports the document to a scratch file that must
    validate.
    """
    dcd = parsers.DCDParser.parse("sdr/parser_tests/affinity.dcd.xml")
    self.assertEqual(dcd.get_id(), "affinity_parse_1")
    self.assertEqual(dcd.get_name(), "test_affinity_node_socket")
    self.assertEqual(len(dcd.componentfiles.get_componentfile()), 1)
    self.assertEqual(len(dcd.partitioning.get_componentplacement()), 1)

    gpp = dcd.partitioning.get_componentplacement()[0]
    gpp_ci = gpp.get_componentinstantiation()[0]
    self.assertEqual(gpp.get_componentfileref().get_refid(), "GPP_File_1")
    self.assertEqual(gpp_ci.get_id(), "test_affinity_node:GPP_1")
    self.assertEqual(gpp_ci.get_usagename(), "GPP_1")
    self.assertEqual(len(gpp_ci.affinity.get_simpleref()), 2)
    self.assertEqual(gpp_ci.affinity.get_simpleref()[0].refid, "affinity::exec_directive_class")
    self.assertEqual(gpp_ci.affinity.get_simpleref()[0].value, "socket")
    self.assertEqual(gpp_ci.affinity.get_simpleref()[1].refid, "affinity::exec_directive_value")
    self.assertEqual(gpp_ci.affinity.get_simpleref()[1].value, "0")

    # Verify that we can write the output and still be DTD valid.
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    fd, tmpfile = tempfile.mkstemp()
    try:
        # The with-block closes the handle even when export() raises.
        with os.fdopen(fd, "w") as tmp:
            dcd.export(tmp, 0)
        status = self._xmllint(tmpfile, "DCD")
        self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
    finally:
        try:
            os.remove(tmpfile)
        except OSError:
            pass
def setUp(self):
    """Launch two domain managers for the test.

    The first is started through ``self.launchDomainManager`` on a fixed
    endpoint with ``self._dbfile`` as its ``dbURI``; the second through the
    module-level ``launchDomain`` helper.
    """
    # NOTE(review): tempfile.mktemp() only returns a name and is race-prone;
    # presumably the domain manager creates the file itself -- confirm it
    # accepts a pre-existing file before switching to mkstemp().
    self._dbfile = tempfile.mktemp()
    self._domainBooter_1, self._domainManager_1 = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)
    self._domainBooter_2, self._domainManager_2 = launchDomain(2, self._root)
def load_logging_config_uri(orb, uri, binding=None):
    """Load a log4py configuration from a ``file:`` or ``sca:`` URI.

    :param orb: object whose ``string_to_object`` resolves the ``fs`` query
        parameter of an ``sca:`` URI into a file system reference.
    :param uri: configuration location. ``file:`` URIs are read directly;
        ``sca:`` URIs are fetched through the referenced file system into a
        scratch file first. Any other scheme only logs a warning.
    :param binding: forwarded to ``fileConfig`` for ``file:`` URIs.
    """
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
    if scheme == "file":
        ossie.utils.log4py.config.fileConfig(path, binding)
    elif scheme == "sca":
        # Split on the first "=" only and skip fragments without one, so an
        # empty or malformed query reaches the "missing fs" warning below
        # instead of raising ValueError from dict().
        q = dict(pair.split("=", 1) for pair in query.split("&") if "=" in pair)
        try:
            fileSys = orb.string_to_object(q["fs"])
        except KeyError:
            logging.warning("sca URI missing fs query parameter")
        else:
            if fileSys is None:
                logging.warning("Failed to lookup file system")
            else:
                # The scratch file is created atomically *before* the try
                # block, so the cleanup below always has a real file.
                fd, t = tempfile.mkstemp()
                try:
                    tf = os.fdopen(fd, "w+")
                    try:
                        scaFile = fileSys.open(path, True)
                        try:
                            fileSize = scaFile.sizeOf()
                            buf = scaFile.read(fileSize)
                        finally:
                            scaFile.close()
                        tf.write(buf)
                    finally:
                        tf.close()

                    # NOTE(review): unlike the "file" branch, binding is not
                    # passed here -- confirm whether that is intentional.
                    ossie.utils.log4py.config.fileConfig(t)
                finally:
                    os.remove(t)
    else:
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")