Python tempfile 模块,mktemp() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.mktemp()

项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_SADParser(self):
        sad = parsers.SADParser.parse("sdr/dom/waveforms/CommandWrapperWithPropertyOverride/CommandWrapper.sad.xml")
        self.assertEqual(sad.get_id(), "DCE:d206ab51-6342-4976-bac3-55e6902f3489")
        self.assertEqual(sad.get_name(), "CommandWrapperWithPropertyOverride")
        self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
        self.assertEqual(len(sad.partitioning.get_componentplacement()), 1)
        self.assertEqual(sad.partitioning.get_componentplacement()[0].componentfileref.refid, "CommandWrapper_592b8bd6-b011-4468-9417-705af45e907b")
        self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].id_, "DCE:8c129782-a6a4-4095-8212-757f01de0c09")
        self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].get_usagename(), "CommandWrapper1")
        self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].componentproperties.get_simpleref()[0].refid, "DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e")
        self.assertEqual(sad.partitioning.get_componentplacement()[0].get_componentinstantiation()[0].componentproperties.get_simpleref()[0].value, "/bin/date")

        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            sad.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "SAD")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def test_get_swift_hash_env(self, mock_config, mock_service_name):
        mock_config.return_value = None
        mock_service_name.return_value = "testsvc"
        tmpfile = tempfile.mktemp()
        swift_context.SWIFT_HASH_FILE = tmpfile
        with mock.patch('lib.swift_context.os.environ.get') as mock_env_get:
            mock_env_get.return_value = str(uuid.uuid4())
            hash_ = swift_context.get_swift_hash()
            mock_env_get.assert_has_calls([
                mock.call('JUJU_MODEL_UUID'),
                mock.call('JUJU_ENV_UUID',
                          mock_env_get.return_value)
            ])

        with open(tmpfile, 'r') as fd:
            self.assertEqual(hash_, fd.read())

        self.assertTrue(mock_config.called)
项目:eClaire    作者:kogan    | 项目源码 | 文件源码
def generate_pdf(card):
    """
    Make a PDF from a card

    :param card: dict from fetcher.py
    :return: Binary PDF buffer
    """
    from eclaire.base import SPECIAL_LABELS

    pdf = FPDF('L', 'mm', (62, 140))
    pdf.set_margins(2.8, 2.8, 2.8)
    pdf.set_auto_page_break(False, margin=0)

    pdf.add_page()

    font = pkg_resources.resource_filename('eclaire', 'font/Clairifont.ttf')
    pdf.add_font('Clairifont', fname=font, uni=True)
    pdf.set_font('Clairifont', size=48)

    pdf.multi_cell(0, 18, txt=card.name.upper(), align='L')

    qrcode = generate_qr_code(card.url)
    qrcode_file = mktemp(suffix='.png', prefix='trello_qr_')
    qrcode.save(qrcode_file)
    pdf.image(qrcode_file, 118, 35, 20, 20)
    os.unlink(qrcode_file)

    # May we never speak of this again.
    pdf.set_fill_color(255, 255, 255)
    pdf.rect(0, 55, 140, 20, 'F')

    pdf.set_font('Clairifont', '', 16)
    pdf.set_y(-4)
    labels = ', '.join([label.name for label in card.labels
                        if label.name not in SPECIAL_LABELS])
    pdf.multi_cell(0, 0, labels, 0, 'R')

    return pdf.output(dest='S')
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test_exit_crash():
    # For each Widget subclass, run a simple python script that creates an
    # instance and then shuts down. The intent is to check for segmentation
    # faults when each script exits.
    tmp = tempfile.mktemp(".py")
    path = os.path.dirname(pg.__file__)

    initArgs = {
        'CheckTable': "[]",
        'ProgressDialog': '"msg"',
        'VerticalLabel': '"msg"',
    }

    for name in dir(pg):
        obj = getattr(pg, name)
        if not isinstance(obj, type) or not issubclass(obj, pg.QtGui.QWidget):
            continue

        print(name)
        argstr = initArgs.get(name, "")
        open(tmp, 'w').write(code.format(path=path, classname=name, args=argstr))
        proc = subprocess.Popen([sys.executable, tmp])
        assert proc.wait() == 0

    os.remove(tmp)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test_exit_crash():
    # For each Widget subclass, run a simple python script that creates an
    # instance and then shuts down. The intent is to check for segmentation
    # faults when each script exits.
    tmp = tempfile.mktemp(".py")
    path = os.path.dirname(pg.__file__)

    initArgs = {
        'CheckTable': "[]",
        'ProgressDialog': '"msg"',
        'VerticalLabel': '"msg"',
    }

    for name in dir(pg):
        obj = getattr(pg, name)
        if not isinstance(obj, type) or not issubclass(obj, pg.QtGui.QWidget):
            continue

        print(name)
        argstr = initArgs.get(name, "")
        open(tmp, 'w').write(code.format(path=path, classname=name, args=argstr))
        proc = subprocess.Popen([sys.executable, tmp])
        assert proc.wait() == 0

    os.remove(tmp)
项目:ssbio    作者:SBRG    | 项目源码 | 文件源码
def _smcra_to_str(self, smcra, temp_dir='/tmp/'):
        """
        WHATIF's input are PDB format files.
        Converts a SMCRA object to a PDB formatted string.
        """

        temp_path = tempfile.mktemp( '.pdb', dir=temp_dir )

        io = PDBIO()
        io.set_structure(smcra)
        io.save(temp_path)

        f = open(temp_path, 'r')
        string = f.read()
        f.close()

        os.remove(temp_path)

        return string
项目:rex    作者:shellphish    | 项目源码 | 文件源码
def test_binary(self, enable_randomness=True, times=1, timeout=15):
        """
        Test the binary generated
        """

        # dump the binary code
        pov_binary_filename = tempfile.mktemp(dir='/tmp', prefix='rex-pov-')
        self.dump_binary(filename=pov_binary_filename)
        os.chmod(pov_binary_filename, 0755)

        pov_tester = CGCPovSimulator()
        result = pov_tester.test_binary_pov(
                pov_binary_filename,
                self.crash.binary,
                enable_randomness=enable_randomness,
                timeout=timeout,
                times=times)

        # remove the generated pov
        os.remove(pov_binary_filename)

        return result
项目:rex    作者:shellphish    | 项目源码 | 文件源码
def generate_report(self, register_setters, leakers):

        stat_name = tempfile.mktemp(dir=".", prefix='rex-results-')

        l.info("exploitation report being written to '%s'", stat_name)

        f = open(stat_name, 'w')
        f.write("Binary %s:\n" % os.path.basename(self.crash.project.filename))
        f.write("Register setting exploits:\n")
        for register_setter in register_setters:
            f.write("\t%s\n" % str(register_setter))
        f.write("\n")
        f.write("Leaker exploits:\n")
        for leaker in leakers:
            f.write("\t%s\n" % str(leaker))

        f.close()
项目:integration    作者:mendersoftware    | 项目源码 | 文件源码
def start_docker_compose(clients=1):
    inline_logs = conftest.inline_logs

    docker_compose_cmd("up -d")
    if clients > 1:
        docker_compose_cmd("scale mender-client=%d" % clients)

    if inline_logs:
        docker_compose_cmd("logs -f &")
    else:
        tfile = tempfile.mktemp("mender_testing")
        docker_compose_cmd("logs -f --no-color > %s 2>&1 &" % tfile)
        logger.info("docker-compose log file stored here: %s" % tfile)
        log_files.append(tfile)

    ssh_is_opened()

    common.set_setup_type(common.ST_OneClient)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def tearDown(self):
        """
        Clean up any files or directories created using L{TestCase.mktemp}.
        Subclasses must invoke this method if they override it or the
        cleanup will not occur.
        """
        if self._temporaryFiles is not None:
            for temp in self._temporaryFiles:
                if os.path.isdir(temp):
                    shutil.rmtree(temp)
                elif os.path.exists(temp):
                    os.unlink(temp)
        try:
            _exception_from_error_queue()
        except Error, e:
            if e.args != ([],):
                self.fail("Left over errors in OpenSSL error queue: " + repr(e))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _getBatchOutput(self, f):
        fn = tempfile.mktemp()
        open(fn, 'w').write(f)
        port = self.server.getHost().port
        cmds = ('-p %i -l testuser '
                    '-K unix '
                    '-a '
                    '-v -b %s 127.0.0.1') % (port, fn)
        cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
        log.msg('running %s %s' % (sys.executable, cmds))
        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(sys.path)

        self.server.factory.expectedLoseConnection = 1

        d = getProcessOutputAndValue(sys.executable, cmds, env=env)

        def _cleanup(res):
            os.remove(fn)
            return res

        d.addCallback(lambda res: res[0])
        d.addBoth(_cleanup)

        return d
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def loopbackUNIX(server, client, noisy=True):
    """Run session between server and client protocol instances over UNIX socket."""
    path = tempfile.mktemp()
    from twisted.internet import reactor
    f = policies.WrappingFactory(protocol.Factory())
    serverWrapper = _FireOnClose(f, server)
    f.noisy = noisy
    f.buildProtocol = lambda addr: serverWrapper
    serverPort = reactor.listenUNIX(path, f)
    clientF = LoopbackClientFactory(client)
    clientF.noisy = noisy
    reactor.connectUNIX(path, clientF)
    d = clientF.deferred
    d.addCallback(lambda x: serverWrapper.deferred)
    d.addCallback(lambda x: serverPort.stopListening())
    return d
项目:mygene.info    作者:biothings    | 项目源码 | 文件源码
def get_all_species(self):
        import tempfile
        outfile = tempfile.mktemp() + '.txt.gz'
        try:
            self.logger.info('Downloading "species.txt.gz"...')
            out_f = open(outfile, 'wb')
            ftp = FTP(self.__class__.ENSEMBL_FTP_HOST)
            ftp.login()
            species_file = '/pub/release-%s/mysql/ensembl_production_%s/species.txt.gz' % (self.release, self.release)
            ftp.retrbinary("RETR " + species_file, out_f.write)
            out_f.close()
            self.logger.info('Done.')

            #load saved file
            self.logger.info('Parsing "species.txt.gz"...')
            species_li = tab2list(outfile, (1, 2, 7), header=0)   # db_name,common_name,taxid
            species_li = [x[:-1] + [is_int(x[-1]) and int(x[-1]) or None] for x in species_li]
            # as of ensembl 87, there are also mouse strains. keep only the "original" one
            species_li = [s for s in species_li if not s[0].startswith("mus_musculus_")]
            self.logger.info('Done.')
        finally:
            os.remove(outfile)
            pass

        return species_li
项目:rtf2xml    作者:paulhtremblay    | 项目源码 | 文件源码
def __init__(self,
            in_file,
            exception_handler,
            bug_handler,
            copy = None,
            run_level = 1,
            ):
        self.__file = in_file
        self.__bug_handler = bug_handler
        self.__copy = copy
        self.__run_level = run_level
        self.__write_to = tempfile.mktemp()
        self.__exception_handler = exception_handler
        self.__bug_handler = bug_handler
        self.__state = 'outside'
        self.__utf_exp = re.compile(r'&#x(.*?);')
项目:rtf2xml    作者:paulhtremblay    | 项目源码 | 文件源码
def __init__(self,
            in_file,
            bug_handler,
            out_file,
            copy = None,
            orig_file = None,
            run_level = 1,
        ):
        self.__file = in_file
        self.__bug_handler = bug_handler
        self.__copy = copy
        self.__run_level = run_level
        self.__write_to = tempfile.mktemp()
        self.__bracket_count = 0
        self.__ob_count = 0
        self.__cb_count = 0
        self.__pict_count = 0
        self.__in_pict = 0
        self.__already_found_pict = 0
        self.__orig_file = orig_file
        self.__initiate_pict_dict()
        self.__out_file = out_file

        # this is left over
        self.__no_ask = 1
项目:rtf2xml    作者:paulhtremblay    | 项目源码 | 文件源码
def __init__(self, 
            in_file , 
            bug_handler,
            copy = None,
            run_level = 1,
            ):
        self.__file = in_file
        self.__bug_handler = bug_handler
        self.__copy = copy
        self.__write_to = tempfile.mktemp()
        self.__bracket_count=0
        self.__ob_count = 0
        self.__cb_count = 0
        self.__after_asterisk = 0
        self.__delete = 0
        self.__initiate_allow()
        self.__ob = 0
        self.__write_cb = 0
        self.__run_level = run_level
        self.__found_delete = 0
        self.__list = 0
项目:rtf2xml    作者:paulhtremblay    | 项目源码 | 文件源码
def __test_bad(self):
        paths = os.listdir(self.__invalid_dir)
        for path in paths:
            path = os.path.join(self.__invalid_dir, path)
            filename, ext = os.path.splitext(path)
            if ext != '.rtf':
                continue
            if os.path.isdir(path):
                continue
            new_filename = tempfile.mktemp()
            out_path = os.path.join(self.__out_dir, new_filename)
            status, msg = self.__run_script(in_file = path, out_file = out_path)
            if not status:
                info = [path, 'Should have produced an error']
                self.__error.append(info)
            try:
                os.remove(out_path)
            except OSError:
                pass
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def test_smrt_archiver_lasttime():
    tmpfile = tempfile.mktemp()
    archiver = Archiver(dbfile=tmpfile)
    rule = 'test/smrt/rules/archiver.yml'
    feed = 'lasttime'

    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt

        for r, f in s.load_feeds(rule, feed=feed):
            x = list(s.process(r, f))
            assert len(x) > 0

            f = {i.indicator: i.__dict__() for i in x}
            assert f['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt

        for r, f in s.load_feeds(rule, feed=feed):
            x = list(s.process(r, f))
            assert len(x) == 0
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def test_smrt_archiver_firsttime():
    tmpfile = tempfile.mktemp()
    archiver = Archiver(dbfile=tmpfile)
    rule = 'test/smrt/rules/archiver.yml'
    feed = 'firsttime'

    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt

        for r, f in s.load_feeds(rule, feed=feed):
            x = list(s.process(r, f))
            assert len(x) > 0

            f = {i.indicator: i.__dict__() for i in x}
            assert f['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt

        for r, f in s.load_feeds(rule, feed=feed):
            x = list(s.process(r, f))
            assert len(x) == 0
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def test_smrt_archiver_both():
    tmpfile = tempfile.mktemp()
    archiver = Archiver(dbfile=tmpfile)
    rule = 'test/smrt/rules/archiver.yml'
    feed = 'both'

    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt

        for r, f in s.load_feeds(rule, feed=feed):
            x = list(s.process(r, f))
            assert len(x) > 0

            f = {i.indicator: i.__dict__() for i in x}
            assert f['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt

        for r, f in s.load_feeds(rule, feed=feed):
            x = list(s.process(r, f))
            assert len(x) == 0
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def test_smrt_archiver_neither():
    tmpfile = tempfile.mktemp()
    archiver = Archiver(dbfile=tmpfile)
    rule = 'test/smrt/rules/archiver.yml'
    feed = 'neither'

    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt

        for r, f in s.load_feeds(rule, feed=feed):
            x = list(s.process(r, f))
            assert len(x) > 0

            f = {i.indicator: i.__dict__() for i in x}

            assert f['216.243.31.2'].get('lasttime') is None

    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt

        for r, f in s.load_feeds(rule, feed=feed):
            x = list(s.process(r, f))
            assert len(x) == 0
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def test_smrt_archiver_lasttime_clear():
    tmpfile = tempfile.mktemp()
    archiver = Archiver(dbfile=tmpfile)
    rule = 'test/smrt/rules/archiver.yml'
    feed = 'lasttime'

    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt

        for r, f in s.load_feeds(rule, feed=feed):
            x = list(s.process(r, f))
            assert len(x) > 0

            f = {i.indicator: i.__dict__() for i in x}
            assert f['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

    archiver.clear_memcache()
    with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
        assert type(s) is Smrt

        for r, f in s.load_feeds(rule, feed=feed):
            x = list(s.process(r, f))
            assert len(x) == 0
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def get_local_filename(self):
        """ get_local_filename()
        If the filename is an existing file on this filesystem, return
        that. Otherwise a temporary file is created on the local file
        system which can be used by the format to read from or write to.
        """

        if self._uri_type == URI_FILENAME:
            return self._filename
        else:
            # Get filename
            ext = os.path.splitext(self._filename)[1]
            self._filename_local = tempfile.mktemp(ext, 'imageio_')
            # Write stuff to it?
            if self.mode[0] == 'r':
                with open(self._filename_local, 'wb') as file:
                    shutil.copyfileobj(self.get_file(), file)
            return self._filename_local
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_version_2_0_memmap():
    # requires more than 2 byte for header
    dt = [(("%d" % i) * 100, float) for i in range(500)]
    d = np.ones(1000, dtype=dt)
    tf = tempfile.mktemp('', 'mmap', dir=tempdir)

    # 1.0 requested but data cannot be saved this way
    assert_raises(ValueError, format.open_memmap, tf, mode='w+', dtype=d.dtype,
                            shape=d.shape, version=(1, 0))

    ma = format.open_memmap(tf, mode='w+', dtype=d.dtype,
                            shape=d.shape, version=(2, 0))
    ma[...] = d
    del ma

    with warnings.catch_warnings(record=True) as w:
        warnings.filterwarnings('always', '', UserWarning)
        ma = format.open_memmap(tf, mode='w+', dtype=d.dtype,
                                shape=d.shape, version=None)
        assert_(w[0].category is UserWarning)
        ma[...] = d
        del ma

    ma = format.open_memmap(tf, mode='r')
    assert_array_equal(ma, d)
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def testAll(self):
        output_name = tempfile.mktemp("-pycom-test")
        cmd = "cscript //nologo testxslt.js doesnt_matter.xml testxslt.xsl " + output_name
        win32com.test.util.ExecuteShellCommand(cmd, self)
        try:
            f=open(output_name)
            try:
                got = f.read()
                if got != expected_output:
                    print "ERROR: XSLT expected output of %r" % (expected_output,)
                    print "but got %r" % (got,)
            finally:
                f.close()
        finally:
            try:
                os.unlink(output_name)
            except os.error:
                pass
项目:iconograph    作者:robot-tools    | 项目源码 | 文件源码
def _SetCurrent(self, image):
    filename = '%d.iso' % (image['timestamp'])
    path = os.path.join(self._image_dir, filename)
    current_path = os.path.join(self._image_dir, 'current')

    try:
      link = os.readlink(current_path)
      link_path = os.path.join(self._image_dir, link)
      if link_path == path:
        return
    except FileNotFoundError:
      pass

    print('Changing current link to:', filename, flush=True)
    temp_path = tempfile.mktemp(dir=self._image_dir)
    os.symlink(filename, temp_path)
    os.rename(temp_path, current_path)
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def _write_styles_file(self):
        self.styles_xml = tempfile.mktemp()

        try:
            self.f = open(self.styles_xml,"wb")
        except IOError as msg:
            errmsg = "%s\n%s" % (_("Could not create %s") % self.styles_xml, msg)
            raise ReportError(errmsg)
        except:
            raise ReportError(_("Could not create %s") % self.styles_xml)

        self.f = open(self.styles_xml,"w")
        self.f.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        self.f.write(
            '<office:document-styles ' +
                _XMLNS +
                'office:version="1.0"> '
            )
        self.f.write(_STYLES_FONTS)
        self.f.write(_STYLES_STYLES)
        self.f.write(_STYLES_AUTOMATIC)
        self.f.write(_STYLES_MASTER)

        self.f.write('</office:document-styles>\n')
        self.f.close()
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def _write_meta_file(self):
        self.meta_xml = tempfile.mktemp()

        try:
            self.f = open(self.meta_xml,"wb")
        except IOError as msg:
            errmsg = "%s\n%s" % (_("Could not create %s") % self.meta_xml, msg)
            raise ReportError(errmsg)
        except:
            raise ReportError(_("Could not create %s") % self.meta_xml)

        self.f = open(self.meta_xml,"w")

        self.f.write(_META %
                {'program': PROGRAM_NAME,
                 'version': VERSION,
                 'name'   : self.name,
                 'time'   : self.time,
                 }
             )
        self.f.close()
项目:nimo    作者:wolfram2012    | 项目源码 | 文件源码
def __getstate__(self):
        '''This function is neccessary for pickling'''
        # Translate everything but the svm because that cannot be simply pickled.
        state = {}
        for key,value in self.__dict__.iteritems():
            if key == '_model':
                filename = tempfile.mktemp()
                self._model.save(filename)
                buffer = open(filename).read()
                os.remove(filename)
                state[key] = buffer
                continue

            state[key] = value

        return state
项目:nimo    作者:wolfram2012    | 项目源码 | 文件源码
def __getstate__(self):
        '''This function is neccessary for pickling'''
        # Translate everything but the svm because that cannot be simply pickled.
        state = {}
        for key,value in self.__dict__.iteritems():
            if key == '_model':
                filename = tempfile.mktemp()
                self._model.save(filename)
                buffer = open(filename).read()
                os.remove(filename)
                state[key] = buffer
                continue

            state[key] = value

        return state
项目:uncover-ml    作者:GeoscienceAustralia    | 项目源码 | 文件源码
def test_resampling_value(shapefile, bins, samples):
    samples = (samples//bins) * bins
    lonlats, filename = shapefile
    random_filename = tempfile.mktemp() + ".shp"
    resampling.resample_by_magnitude(filename,
                                     random_filename,
                                     target_field='lat',
                                     bins=bins,
                                     output_samples=samples,
                                     bootstrap=True
                                     )
    resampled_sf = shp.Reader(random_filename)
    resampled_shapefields = [f[0] for f in resampled_sf.fields[1:]]

    new_coords, new_val, new_othervals = \
        geoio.load_shapefile(random_filename, 'lat')

    assert 'lat' in resampled_shapefields
    assert 'lon' not in resampled_shapefields
    assert np.all((samples, 2) == new_coords.shape)
    assert new_othervals == {}  # only the target is retained after resampling
项目:uncover-ml    作者:GeoscienceAustralia    | 项目源码 | 文件源码
def test_resampling_spatial(shapefile, rows, cols, samples):
    tiles = rows * cols
    samples = (samples // tiles) * tiles
    lonlats, filename = shapefile
    random_filename = tempfile.mktemp() + ".shp"
    resampling.resample_spatially(filename,
                                  random_filename,
                                  target_field='lat',
                                  rows=rows,
                                  cols=cols,
                                  output_samples=samples,
                                  bootstrap=True
                                  )
    resampled_sf = shp.Reader(random_filename)
    resampled_shapefields = [f[0] for f in resampled_sf.fields[1:]]

    new_coords, new_val, new_othervals = \
        geoio.load_shapefile(random_filename, 'lat')

    assert 'lat' in resampled_shapefields
    assert 'lon' not in resampled_shapefields
    assert np.all((samples, 2) >= new_coords.shape)
    assert new_othervals == {}  # only the target is retained after resampling
项目:uncover-ml    作者:GeoscienceAustralia    | 项目源码 | 文件源码
def test_do_work(self):
        # input_file, mask_file, output_file, resampling, extents, jpeg
        output_file = tempfile.mktemp(suffix='.tif')
        options = Options(resampling='bilinear',
                          extents=self.extents,
                          jpeg=True,
                          reproject=True)
        crop.do_work(input_file=self.std2000_no_mask,
                     output_file=output_file,
                     options=options,
                     mask_file=self.mask)

        # output file was created
        self.assertTrue(exists(output_file))

        # assert jpeg was created
        self.assertTrue(exists(
            join(crop.TMPDIR, basename(output_file).split('.')[0] + '.jpg')))

        os.remove(output_file)
项目:dumb-pypi    作者:chriskuehl    | 项目源码 | 文件源码
def atomic_write(path):
    tmp = tempfile.mktemp(
        prefix='.' + os.path.basename(path),
        dir=os.path.dirname(path),
    )
    try:
        with open(tmp, 'w') as f:
            yield f
    except BaseException:
        os.remove(tmp)
        raise
    else:
        os.rename(tmp, path)


# TODO: at some point there will be so many options we'll want to make a config
# object or similar instead of adding more arguments here
项目:ATX    作者:NetEaseGame    | 项目源码 | 文件源码
def screenshot(self, filename=None):
        """
        Return:
            PIL.Image

        Raises:
            EnvironmentError
        """
        tmpfile = tempfile.mktemp(prefix='atx-screencap-', suffix='.tiff')
        try:
            idevice("screenshot", "--udid", self.udid, tmpfile)
        except subprocess.CalledProcessError as e:
            sys.exit(e.message)

        try:
            image = Image.open(tmpfile)
            image.load()
            if filename:
                image.save(filename)
            return image
        finally:
            if os.path.exists(tmpfile):
                os.unlink(tmpfile)
项目:ATX    作者:NetEaseGame    | 项目源码 | 文件源码
def _adb_screencap(self, scale=1.0):
        """
        capture screen with adb shell screencap
        """
        remote_file = tempfile.mktemp(dir='/data/local/tmp/', prefix='screencap-', suffix='.png')
        local_file = tempfile.mktemp(prefix='atx-screencap-', suffix='.png')
        self.shell('screencap', '-p', remote_file)
        try:
            self.pull(remote_file, local_file)
            image = imutils.open_as_pillow(local_file)
            if scale is not None and scale != 1.0:
                image = image.resize([int(scale * s) for s in image.size], Image.BICUBIC)
            rotation = self.rotation()
            if rotation:
                method = getattr(Image, 'ROTATE_{}'.format(rotation*90))
                image = image.transpose(method)
            return image
        finally:
            self.remove(remote_file)
            os.unlink(local_file)
项目:ATX    作者:NetEaseGame    | 项目源码 | 文件源码
def _adb_minicap(self, scale=1.0):
        """
        capture screen with minicap

        https://github.com/openstf/minicap
        """
        remote_file = tempfile.mktemp(dir='/data/local/tmp/', prefix='minicap-', suffix='.jpg')
        local_file = tempfile.mktemp(prefix='atx-minicap-', suffix='.jpg')
        (w, h, r) = self.display
        params = '{x}x{y}@{rx}x{ry}/{r}'.format(x=w, y=h, rx=int(w*scale), ry=int(h*scale), r=r*90)
        try:
            self.shell('LD_LIBRARY_PATH=/data/local/tmp', self.__minicap, '-s', '-P', params, '>', remote_file)
            self.pull(remote_file, local_file)
            image = imutils.open_as_pillow(local_file)
            return image
        finally:
            self.remove(remote_file)
            os.unlink(local_file)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_blocking_lock_file(self):
        my_file = tempfile.mktemp()
        lock_file = BlockingLockFile(my_file)
        lock_file._obtain_lock()

        # next one waits for the lock
        start = time.time()
        wait_time = 0.1
        wait_lock = BlockingLockFile(my_file, 0.05, wait_time)
        self.failUnlessRaises(IOError, wait_lock._obtain_lock)
        elapsed = time.time() - start
        extra_time = 0.02
        if is_win:
            # for Appveyor
            extra_time *= 6  # NOTE: Indeterministic failures here...
        self.assertLess(elapsed, wait_time + extra_time)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def setUp(self):
        self._dbfile = tempfile.mktemp()
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_SPDParser(self):
        # Verify the input is valid
        spdPath = os.path.abspath("sdr/dom/components/CommandWrapper/CommandWrapper.spd.xml")
        status = self._xmllint(spdPath, "SPD")
        self.assertEqual(status, 0, "Input XML isn't DTD valid")

        spd = parsers.SPDParser.parse(spdPath)
        self.assertEqual(spd.get_id(), "DCE:458872f6-a316-4082-b1eb-ce5704f5c49d")
        self.assertEqual(spd.get_name(), "CommandWrapper")
        self.assertEqual(str(spd.get_author()[0].get_name()[0]), "REDHAWK test author")
        self.assertEqual(spd.get_propertyfile().get_type(), "PRF")
        self.assertEqual(spd.get_propertyfile().get_localfile().get_name(), "CommandWrapper.prf.xml")

        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            spd.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "SPD")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_PRFParser(self):
        prf = parsers.PRFParser.parse("sdr/dom/components/CommandWrapper/CommandWrapper.prf.xml")
        props = {}
        for property in prf.get_simple():
            props[property.get_id()] = property
        for property in prf.get_simplesequence():
            props[property.get_id()] = property
        self.assertEqual(props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"].get_mode(), "readwrite")
        self.assertEqual(props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"].get_name(), "command")
        self.assertEqual(props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"].get_type(), "string")
        self.assertEqual(props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"].get_value(), "/bin/echo")
        self.assertEqual(props["DCE:a4e7b230-1d17-4a86-aeff-ddc6ea3df26e"].get_kind()[0].get_kindtype(), "configure")

        self.assertEqual(props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"].get_mode(), "readwrite")
        self.assertEqual(props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"].get_name(), "args")
        self.assertEqual(props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"].get_type(), "string")
        self.assertEqual(props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"].get_values().get_value()[0], "Hello World")
        self.assertEqual(props["DCE:5d8bfe8d-bc25-4f26-8144-248bc343aa53"].get_kind()[0].get_kindtype(), "configure")

        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            prf.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "PRF")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_SCDParser(self):
        scd = parsers.SCDParser.parse("sdr/dom/components/CommandWrapper/CommandWrapper.scd.xml")
        self.assertEqual(scd.get_corbaversion(), "2.2")
        self.assertEqual(scd.get_componentrepid().get_repid(), "IDL:CF/Resource:1.0")
        self.assertEqual(scd.get_componenttype(), "resource")
        self.assertEqual(scd.get_componentfeatures().get_supportsinterface()[0].get_repid(), "IDL:CF/Resource:1.0")
        self.assertEqual(scd.get_componentfeatures().get_supportsinterface()[0].get_supportsname(), "Resource")
        self.assertEqual(scd.get_interfaces().get_interface()[0].get_name(), "Resource")
        self.assertEqual(scd.get_interfaces().get_interface()[0].get_inheritsinterface()[0].get_repid(), "IDL:CF/LifeCycle:1.0")

        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            scd.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "SCD")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_SADParser_usesdeviceref(self):
        sad = parsers.SADParser.parse("sdr/parser_tests/usesdeviceref.sad.xml")
        self.assertEqual(sad.get_id(), "colloc_usesdev_1")
        self.assertEqual(sad.get_name(), "colloc_usesdev")
        self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
        self.assertEqual(len(sad.partitioning.get_hostcollocation()), 1)
        colloc=sad.partitioning.get_hostcollocation()[0]
        self.assertEqual(len(colloc.get_componentplacement()),1)
        comp_place =colloc.get_componentplacement()[0]
        self.assertEqual(len(comp_place.get_componentinstantiation()),1)
        comp_ci=comp_place.get_componentinstantiation()[0]
        self.assertEqual(comp_ci.id_, "P1_1")
        self.assertEqual(comp_ci.get_usagename(), "P1_1")
        self.assertEqual(len(colloc.get_usesdeviceref()),1)
        udev_ref =colloc.get_usesdeviceref()[0]
        self.assertEqual(udev_ref.refid, "FrontEndTuner_1")

        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            sad.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "SAD")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_SADParser_devicerequires(self):
        sad = parsers.SADParser.parse("sdr/parser_tests/devicerequires.sad.xml")
        self.assertEqual(sad.get_id(), "device_requires_multicolor")
        self.assertEqual(sad.get_name(), "device_requires_multicolor")
        self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
        self.assertEqual(len(sad.partitioning.get_componentplacement()), 2)
        comp_place=sad.partitioning.get_componentplacement()[0]
        comp_in=comp_place.get_componentinstantiation()[0]
        self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
        self.assertEqual(comp_in.id_, "SimpleComponent_Red")
        self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
        self.assertEqual(len(comp_in.devicerequires.get_requires()),2)
        self.assertEqual(comp_in.devicerequires.get_requires()[0].id, "color")
        self.assertEqual(comp_in.devicerequires.get_requires()[0].value, "RED")
        self.assertEqual(comp_in.devicerequires.get_requires()[1].id, "rank")
        self.assertEqual(comp_in.devicerequires.get_requires()[1].value, "15")
        comp_place=sad.partitioning.get_componentplacement()[1]
        comp_in=comp_place.get_componentinstantiation()[0]
        self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
        self.assertEqual(comp_in.id_, "SimpleComponent_Green")
        self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Green")
        self.assertEqual(len(comp_in.devicerequires.get_requires()),1)
        self.assertEqual(comp_in.devicerequires.get_requires()[0].id, "color")
        self.assertEqual(comp_in.devicerequires.get_requires()[0].value, "GREEN")


        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            sad.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "SAD")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_SADParser_loggingconfig(self):
        sad = parsers.SADParser.parse("sdr/parser_tests/loggingconfig.sad.xml")
        self.assertEqual(sad.get_id(), "device_requires_multicolor")
        self.assertEqual(sad.get_name(), "device_requires_multicolor")
        self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
        self.assertEqual(len(sad.partitioning.get_componentplacement()), 2)
        comp_place=sad.partitioning.get_componentplacement()[0]
        comp_in=comp_place.get_componentinstantiation()[0]
        self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
        self.assertEqual(comp_in.id_, "SimpleComponent_Red")
        self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
        self.assertEqual(comp_in.loggingconfig.level, "ERROR")
        self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file")
        comp_place=sad.partitioning.get_componentplacement()[1]
        comp_in=comp_place.get_componentinstantiation()[0]
        self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
        self.assertEqual(comp_in.id_, "SimpleComponent_Green")
        self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Green")
        self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file2")

        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            sad.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "SAD")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_SADParser_affinityconfig(self):
        sad = parsers.SADParser.parse("sdr/parser_tests/affinity.sad.xml")
        self.assertEqual(sad.get_id(), "device_requires_multicolor")
        self.assertEqual(sad.get_name(), "device_requires_multicolor")
        self.assertEqual(len(sad.componentfiles.get_componentfile()), 1)
        self.assertEqual(len(sad.partitioning.get_componentplacement()), 1)
        comp_place=sad.partitioning.get_componentplacement()[0]
        comp_in=comp_place.get_componentinstantiation()[0]
        self.assertEqual(comp_place.componentfileref.refid, "SimpleComponent_SPD_1")
        self.assertEqual(comp_in.id_, "SimpleComponent_Red")
        self.assertEqual(comp_in.get_usagename(), "SimpleComponent_Red")
        self.assertEqual(comp_in.loggingconfig.level, "ERROR")
        self.assertEqual(comp_in.loggingconfig.value, "path/to/my/log/file")
        self.assertEqual(len(comp_in.affinity.get_simpleref()),2)
        self.assertEqual(comp_in.affinity.get_simpleref()[0].refid, "affinity::exec_directive_class")
        self.assertEqual(comp_in.affinity.get_simpleref()[0].value, "socket")
        self.assertEqual(comp_in.affinity.get_simpleref()[1].refid, "affinity::exec_directive_value")
        self.assertEqual(comp_in.affinity.get_simpleref()[1].value, "0")


        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            sad.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "SAD")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_DCDParser_loggingconfig(self):
        dcd = parsers.DCDParser.parse("sdr/parser_tests/loggingconfig.dcd.xml")
        self.assertEqual(dcd.get_id(), "test_GPP_green")
        self.assertEqual(dcd.get_name(), "test_GPP_green")
        self.assertEqual(len(dcd.componentfiles.get_componentfile()), 1)
        self.assertEqual(len(dcd.partitioning.get_componentplacement()), 1)
        gpp=dcd.partitioning.get_componentplacement()[0]
        gpp_ci=gpp.get_componentinstantiation()[0]
        self.assertEqual(gpp.get_componentfileref().get_refid(), "GPP1_file_1")
        self.assertEqual(gpp_ci.get_id(), "test_GPP_green::GPP_1")
        self.assertEqual(gpp_ci.get_usagename(),  "test_GPP_green::GPP_1")
        self.assertEqual(gpp_ci.loggingconfig.level, "ERROR")
        self.assertEqual(gpp_ci.loggingconfig.value, "path/to/my/log/file")

        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            dcd.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "DCD")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_DCDParser_affinity(self):
        dcd = parsers.DCDParser.parse("sdr/parser_tests/affinity.dcd.xml")
        self.assertEqual(dcd.get_id(), "affinity_parse_1")
        self.assertEqual(dcd.get_name(), "test_affinity_node_socket")
        self.assertEqual(len(dcd.componentfiles.get_componentfile()), 1)
        self.assertEqual(len(dcd.partitioning.get_componentplacement()), 1)
        gpp=dcd.partitioning.get_componentplacement()[0]
        gpp_ci=gpp.get_componentinstantiation()[0]
        self.assertEqual(gpp.get_componentfileref().get_refid(), "GPP_File_1")
        self.assertEqual(gpp_ci.get_id(), "test_affinity_node:GPP_1")
        self.assertEqual(gpp_ci.get_usagename(),  "GPP_1")
        self.assertEqual(len(gpp_ci.affinity.get_simpleref()),2)
        self.assertEqual(gpp_ci.affinity.get_simpleref()[0].refid, "affinity::exec_directive_class")
        self.assertEqual(gpp_ci.affinity.get_simpleref()[0].value, "socket")
        self.assertEqual(gpp_ci.affinity.get_simpleref()[1].refid, "affinity::exec_directive_value")
        self.assertEqual(gpp_ci.affinity.get_simpleref()[1].value, "0")

        # Verify that we can write the output and still be DTD valid
        tmpfile = tempfile.mktemp()
        try:
            tmp = open(tmpfile, "w")
            dcd.export(tmp, 0)
            tmp.close()
            status = self._xmllint(tmpfile, "DCD")
            self.assertEqual(status, 0, "Python parser did not emit DTD compliant XML")
        finally:
            try:
                os.remove(tmpfile)
            except OSError:
                pass
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def setUp(self):
        self._dbfile = tempfile.mktemp()
        self._domainBooter_1, self._domainManager_1 = self.launchDomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)
        self._domainBooter_2, self._domainManager_2 = launchDomain(2, self._root)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def load_logging_config_uri(orb, uri, binding=None):
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
    if scheme == "file":
        ossie.utils.log4py.config.fileConfig(path, binding)
    elif scheme == "sca":
        q = dict([x.split("=") for x in query.split("&")])
        try:
            fileSys = orb.string_to_object(q["fs"])
        except KeyError:
            logging.warning("sca URI missing fs query parameter")
        else:
            if fileSys == None:
                logging.warning("Failed to lookup file system")
            else:
                try:
                    t = tempfile.mktemp()

                    tf = open(t, "w+")
                    scaFile = fileSys.open(path, True)
                    fileSize = scaFile.sizeOf()
                    buf = scaFile.read(fileSize)
                    tf.write(buf)
                    tf.close()
                    scaFile.close()

                    ossie.utils.log4py.config.fileConfig(t)
                finally:
                    os.remove(t)
    else:
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")