Python tempfile 模块,_get_candidate_names() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用tempfile._get_candidate_names()

项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def __init__(self, module, rootPupyPath):
        '''
        Store the module handle and precompute the remote and local paths used later.

        module: object exposing ``client.conn.modules`` (remote modules) and ``client.desc``
        rootPupyPath: local root directory under which the bundled PowerShell scripts are looked up
        '''
        self.module = module
        self.rootPupyPath = rootPupyPath
        # os.path as exposed by the client connection, so remote paths are built by the remote side
        remote_os_path = self.module.client.conn.modules['os.path']
        #Constants
        # every backslash is escaped explicitly ("\W" only worked because it is not a known escape)
        self.x86PowershellPath = "syswow64\\WindowsPowerShell\\v1.0\\powershell.exe"
        self.x64PowershellPath = "system32\\WindowsPowerShell\\v1.0\\powershell.exe"
        #Remote paths
        self.remoteTempFolder = remote_os_path.expandvars("%TEMP%")
        self.systemRoot = remote_os_path.expandvars("%SYSTEMROOT%")
        # NOTE(review): "{0}.{1}" combined with a suffix that already starts with a dot yields
        # names such as "abc..txt"; left unchanged here, confirm whether that is intended.
        self.invokeReflectivePEInjectionRemotePath = "{0}.{1}".format(remote_os_path.join(self.remoteTempFolder, next(_get_candidate_names())), '.txt')
        self.mainPowershellScriptRemotePath = "{0}.{1}".format(remote_os_path.join(self.remoteTempFolder, next(_get_candidate_names())), '.ps1')
        self.pupyDLLRemotePath = "{0}.{1}".format(remote_os_path.join(self.remoteTempFolder, next(_get_candidate_names())), '.txt')
        self.invokeBypassUACRemotePath = "{0}.{1}".format(remote_os_path.join(self.remoteTempFolder, next(_get_candidate_names())), '.ps1')
        #Define Local paths
        self.pupyDLLLocalPath = os.path.join(gettempdir(),'dllFile.txt')
        self.mainPowerShellScriptPrivilegedLocalPath = os.path.join(gettempdir(),'mainPowerShellScriptPrivileged.txt')
        self.invokeReflectivePEInjectionLocalPath = os.path.join(self.rootPupyPath,"pupy", "external", "PowerSploit", "CodeExecution", "Invoke-ReflectivePEInjection.ps1")
        self.invokeBypassUACLocalPath = os.path.join(rootPupyPath, "pupy", "external", "Empire", "privesc", "Invoke-BypassUAC.ps1")
        #Others
        self.HKCU = self.module.client.conn.modules['_winreg'].HKEY_CURRENT_USER
        if "64" in self.module.client.desc['proc_arch']:
            self.powershellPath = remote_os_path.join(self.systemRoot, self.x64PowershellPath)
        else:
            # the original assigned a throw-away local here, leaving self.powershellPath unset
            self.powershellPath = remote_os_path.join(self.systemRoot, self.x86PowershellPath)
项目:gym-malware    作者:endgameinc    | 项目源码 | 文件源码
def upx_unpack(self, seed=None):
        """Try to unpack ``self.bytez`` with ``upx -d`` and return the resulting bytes.

        ``self.bytez`` is replaced only when upx exits with status 0; otherwise it
        is returned unchanged.  ``seed`` is accepted but not used by this method.
        Both temporary files are removed even if writing or launching upx fails.
        """
        # mkstemp creates and reserves the file, unlike a bare candidate name
        fd, tmpfilename = tempfile.mkstemp()
        unpacked = tmpfilename + '_unpacked'
        try:
            # dump bytez to the temporary file
            with os.fdopen(fd, 'wb') as outfile:
                outfile.write(self.bytez)

            with open(os.devnull, 'w') as DEVNULL:
                retcode = subprocess.call(
                    ['upx', tmpfilename, '-d', '-o', unpacked], stdout=DEVNULL, stderr=DEVNULL)

            if retcode == 0:  # successfully unpacked
                with open(unpacked, 'rb') as result:
                    self.bytez = result.read()
        finally:
            for leftover in (tmpfilename, unpacked):
                try:
                    os.unlink(leftover)
                except OSError:
                    # the output file only exists when upx produced it
                    pass

        return self.bytez
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def test_validate_file_or_dict(self):
        # a file below the home directory has to be found through its '~/' spelling
        tilde_path = '~/' + next(tempfile._get_candidate_names())
        expanded_path = os.path.expanduser(tilde_path)
        with open(expanded_path, 'w') as handle:
            handle.write('{"prop":"val"}')

        # the JSON content of that file must be loaded correctly
        try:
            loaded = validate_file_or_dict(tilde_path)
            self.assertEqual(loaded['prop'], "val")
        finally:
            os.remove(expanded_path)

        # inline payloads containing '~' must come back untouched,
        # first with double quotes, then with single quotes
        for payload in ('{"~d": "~/haha"}', "{'~d': '~/haha'}"):
            parsed = validate_file_or_dict(payload)
            self.assertEqual(parsed['~d'], '~/haha')
项目:ChainConsumer    作者:Samreay    | 项目源码 | 文件源码
def test_file_loading1(self):
        # Save the first 1000 samples as text, load the file as a chain and check the summary.
        data = self.data[:1000]
        directory = tempfile._get_default_tempdir()
        filename = os.path.join(directory, next(tempfile._get_candidate_names()) + ".txt")
        try:
            np.savetxt(filename, data)
            consumer = ChainConsumer()
            consumer.add_chain(filename)
            summary = consumer.analysis.get_summary()
            actual = np.array(list(summary.values())[0])
            assert np.abs(actual[1] - 5.0) < 0.5
        finally:
            # the file used to be left behind in the temp directory after every run
            if os.path.exists(filename):
                os.remove(filename)
项目:ChainConsumer    作者:Samreay    | 项目源码 | 文件源码
def test_file_loading2(self):
        # Save the first 1000 samples as .npy, load the file as a chain and check the summary.
        data = self.data[:1000]
        directory = tempfile._get_default_tempdir()
        filename = os.path.join(directory, next(tempfile._get_candidate_names()) + ".npy")
        try:
            np.save(filename, data)
            consumer = ChainConsumer()
            consumer.add_chain(filename)
            summary = consumer.analysis.get_summary()
            actual = np.array(list(summary.values())[0])
            assert np.abs(actual[1] - 5.0) < 0.5
        finally:
            # the file used to be left behind in the temp directory after every run
            if os.path.exists(filename):
                os.remove(filename)
项目:iCount    作者:tomazc    | 项目源码 | 文件源码
def get_temp_file_name(tmp_dir=None, extension=''):
    """Return a candidate path for a temporary file.

    The path is only generated; no file is created or reserved.  ``tmp_dir``
    falls back to the default temporary directory when empty or ``None``.
    ``extension`` (without the leading dot) is appended as ``.<extension>``;
    an empty or ``None`` extension leaves the name without a trailing dot.
    """
    tmp_name = next(tempfile._get_candidate_names())
    if not tmp_dir:
        tmp_dir = tempfile._get_default_tempdir()
    # truthiness test: the default '' used to produce names ending in a bare '.'
    if extension:
        tmp_name = tmp_name + '.' + extension
    return os.path.join(tmp_dir, tmp_name)
项目:iCount    作者:tomazc    | 项目源码 | 文件源码
def get_temp_file_name(tmp_dir=None, extension=''):
    """Return a candidate path for a temporary file.

    The path is only generated; no file is created or reserved.  When
    ``tmp_dir`` is ``None`` it defaults to ``iCount.TMP_ROOT``; if the result
    is still empty, the default temporary directory is used.  ``extension``
    (without the leading dot) is appended as ``.<extension>``; an empty or
    ``None`` extension leaves the name without a trailing dot.
    """
    if tmp_dir is None:
        tmp_dir = iCount.TMP_ROOT
    # pylint: disable=protected-access
    tmp_name = next(tempfile._get_candidate_names())
    if not tmp_dir:
        # pylint: disable=protected-access
        tmp_dir = tempfile._get_default_tempdir()
    # truthiness test: the default '' used to produce names ending in a bare '.'
    if extension:
        tmp_name = tmp_name + '.' + extension
    return os.path.join(tmp_dir, tmp_name)
项目:defcon-workshop    作者:devsecops    | 项目源码 | 文件源码
def process(self, fuzzresult):
        """Run cutycapt on ``fuzzresult.url`` and report the PNG path via ``self.add_result``."""
        temp_name = next(tempfile._get_candidate_names())
        default_tmp_dir = tempfile._get_default_tempdir()

        filename = os.path.join(default_tmp_dir, temp_name + ".png")

        # The argument list is passed to the program without a shell, so the URL
        # must not be shell-quoted: quoting would embed literal quote characters
        # in any URL containing characters such as '?' or '&'.
        subprocess.call(['cutycapt', '--url=%s' % fuzzresult.url, '--out=%s' % filename])
        self.add_result("Screnshot taken, output at %s" % filename)
项目:gym-malware    作者:endgameinc    | 项目源码 | 文件源码
def upx_pack(self, seed=None):
        """Pack ``self.bytez`` with upx using randomly chosen options and return the bytes.

        ``seed`` seeds the global ``random`` module before the options are drawn.
        ``self.bytez`` is replaced only when upx exits with status 0; otherwise it
        is returned unchanged.  Both temporary files are removed even if writing
        or launching upx fails.
        """
        # tested with UPX 3.91
        random.seed(seed)
        # mkstemp creates and reserves the file, unlike a bare candidate name
        fd, tmpfilename = tempfile.mkstemp()
        packed = tmpfilename + '_packed'
        try:
            # dump bytez to the temporary file
            with os.fdopen(fd, 'wb') as outfile:
                outfile.write(self.bytez)

            options = ['--force', '--overlay=copy']
            compression_level = random.randint(1, 9)
            options += ['-{}'.format(compression_level)]
            # --exact
            # compression levels -1 to -9
            # --overlay=copy [default]

            # optional things:
            # --compress-exports=0/1
            # --compress-icons=0/1/2/3
            # --compress-resources=0/1
            # --strip-relocs=0/1
            options += ['--compress-exports={}'.format(random.randint(0, 1))]
            options += ['--compress-icons={}'.format(random.randint(0, 3))]
            options += ['--compress-resources={}'.format(random.randint(0, 1))]
            options += ['--strip-relocs={}'.format(random.randint(0, 1))]

            with open(os.devnull, 'w') as DEVNULL:
                retcode = subprocess.call(
                    ['upx'] + options + [tmpfilename, '-o', packed], stdout=DEVNULL, stderr=DEVNULL)

            if retcode == 0:  # successfully packed
                with open(packed, 'rb') as infile:
                    self.bytez = infile.read()
        finally:
            for leftover in (tmpfilename, packed):
                try:
                    os.unlink(leftover)
                except OSError:
                    # the output file only exists when upx produced it
                    pass

        return self.bytez
项目:run-google-java-format    作者:plume-lib    | 项目源码 | 文件源码
def temporary_file_name():
    """Return a path under ``temp_dir`` whose last component is a fresh candidate name."""
    candidate = next(tempfile._get_candidate_names())
    return os.path.join(temp_dir, candidate)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        obj = tempfile._get_candidate_names()
        self.assertIsInstance(obj, tempfile._RandomNameSequence)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_same_thing(self):
        # _get_candidate_names always returns the same object
        a = tempfile._get_candidate_names()
        b = tempfile._get_candidate_names()

        self.assertTrue(a is b)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        obj = tempfile._get_candidate_names()
        self.assertIsInstance(obj, tempfile._RandomNameSequence)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_same_thing(self):
        # _get_candidate_names always returns the same object
        a = tempfile._get_candidate_names()
        b = tempfile._get_candidate_names()

        self.assertTrue(a is b)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _mock_candidate_names(*names):
    """Swap ``tempfile._get_candidate_names`` for a stub iterating over exactly *names*.

    Returns whatever ``support.swap_attr`` returns for that replacement.
    """
    def fixed_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names', fixed_names)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        obj = tempfile._get_candidate_names()
        self.assertIsInstance(obj, tempfile._RandomNameSequence)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_same_thing(self):
        # _get_candidate_names always returns the same object
        a = tempfile._get_candidate_names()
        b = tempfile._get_candidate_names()

        self.assertTrue(a is b)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _mock_candidate_names(*names):
    """Swap ``tempfile._get_candidate_names`` for a stub iterating over exactly *names*.

    Returns whatever ``support.swap_attr`` returns for that replacement.
    """
    def fixed_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names', fixed_names)
项目:f5-automation-workflows-multicloud    作者:f5devcentral    | 项目源码 | 文件源码
def merge(self, verify=True):
        """Upload, move and merge a temporary file on the device, then remove it.

        In check mode nothing is sent to the device and ``True`` is returned.
        Otherwise the value returned by ``merge_on_device`` is returned;
        ``verify`` is passed through to it unchanged.
        """
        if self.client.check_mode:
            return True

        temp_name = next(tempfile._get_candidate_names())
        remote_path = "/var/config/rest/downloads/{0}".format(temp_name)
        temp_path = '/tmp/' + temp_name

        self.upload_to_device(temp_name)
        self.move_on_device(remote_path)
        try:
            return self.merge_on_device(
                remote_path=temp_path, verify=verify
            )
        finally:
            # clean up even when the merge raises, so failed runs do not leave files in /tmp
            self.remove_temporary_file(remote_path=temp_path)
项目:ansible_f5    作者:mcgonagle    | 项目源码 | 文件源码
def merge(self, verify=True):
        """Upload, move and merge a temporary file on the device, then remove it.

        In check mode nothing is sent to the device and ``True`` is returned.
        Otherwise the value returned by ``merge_on_device`` is returned;
        ``verify`` is passed through to it unchanged.
        """
        if self.client.check_mode:
            return True

        temp_name = next(tempfile._get_candidate_names())
        remote_path = "/var/config/rest/downloads/{0}".format(temp_name)
        temp_path = '/tmp/' + temp_name

        self.upload_to_device(temp_name)
        self.move_on_device(remote_path)
        try:
            return self.merge_on_device(
                remote_path=temp_path, verify=verify
            )
        finally:
            # clean up even when the merge raises, so failed runs do not leave files in /tmp
            self.remove_temporary_file(remote_path=temp_path)
项目:ansible_f5    作者:mcgonagle    | 项目源码 | 文件源码
def src(self):
        """Return the stored ``src`` value, or a freshly generated candidate name when it is None."""
        configured = self._values['src']
        if configured is None:
            return next(tempfile._get_candidate_names())
        return configured
项目:ansible_f5    作者:mcgonagle    | 项目源码 | 文件源码
def merge(self, verify=True):
        """Upload, move and merge a temporary file on the device, then remove it.

        In check mode nothing is sent to the device and ``True`` is returned.
        Otherwise the value returned by ``merge_on_device`` is returned;
        ``verify`` is passed through to it unchanged.
        """
        if self.client.check_mode:
            return True

        temp_name = next(tempfile._get_candidate_names())
        remote_path = "/var/config/rest/downloads/{0}".format(temp_name)
        temp_path = '/tmp/' + temp_name

        self.upload_to_device(temp_name)
        self.move_on_device(remote_path)
        try:
            return self.merge_on_device(
                remote_path=temp_path, verify=verify
            )
        finally:
            # clean up even when the merge raises, so failed runs do not leave files in /tmp
            self.remove_temporary_file(remote_path=temp_path)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        obj = tempfile._get_candidate_names()
        self.assertIsInstance(obj, tempfile._RandomNameSequence)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_same_thing(self):
        # _get_candidate_names always returns the same object
        a = tempfile._get_candidate_names()
        b = tempfile._get_candidate_names()

        self.assertTrue(a is b)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _mock_candidate_names(*names):
    """Swap ``tempfile._get_candidate_names`` for a stub iterating over exactly *names*.

    Returns whatever ``support.swap_attr`` returns for that replacement.
    """
    def fixed_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names', fixed_names)
项目:CElegansBehaviour    作者:ChristophKirst    | 项目源码 | 文件源码
def py_func(func, inp, Tout, name=None, grad=None):
  """Call tf.py_func with `grad` registered as the gradient override for "PyFunc"."""
  # a random suffix gives each registration its own name
  gradient_name = 'PyFuncGrad%s' % next(tempfile._get_candidate_names())
  tf.RegisterGradient(gradient_name)(grad)
  graph = tf.get_default_graph()
  overrides = {"PyFunc": gradient_name}
  with graph.gradient_override_map(overrides):
    return tf.py_func(func, inp, Tout, name=name)
项目:odoo-brasil    作者:Trust-Code    | 项目源码 | 文件源码
def _find_attachment_ids_email(self):
        """Return the parent's attachment ids, plus a rendered Danfe PDF for model '009'.

        For records whose ``model`` is '009', ``self.url_danfe`` is rendered to a
        temporary file with wkhtmltopdf, stored as an ``ir.attachment`` linked to
        the invoice, and its id is appended to the list.

        Raises UserError when wkhtmltopdf exits with a code other than 0 or 1.
        """
        atts = super(InvoiceEletronic, self)._find_attachment_ids_email()
        attachment_obj = self.env['ir.attachment']
        # ('009') is just the string '009', which made this a substring test
        # (and a TypeError for non-string values); compare against a real tuple.
        if self.model not in ('009',):
            return atts

        tmp = tempfile._get_default_tempdir()
        temp_name = os.path.join(tmp, next(tempfile._get_candidate_names()))

        command_args = ["--dpi", "84", str(self.url_danfe), temp_name]
        wkhtmltopdf = [_get_wkhtmltopdf_bin()] + command_args
        tmpDanfe = None
        try:
            process = subprocess.Popen(wkhtmltopdf, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
            out, err = process.communicate()
            if process.returncode not in [0, 1]:
                raise UserError(_('Wkhtmltopdf failed (error code: %s). '
                                  'Message: %s') % (str(process.returncode), err))
            # a PDF is binary data, so it must not go through text-mode decoding
            with open(temp_name, 'rb') as f:
                tmpDanfe = f.read()
        finally:
            # remove the output file on the failure paths as well
            if os.path.exists(temp_name):
                try:
                    os.unlink(temp_name)
                except (OSError, IOError):
                    _logger.error('Error when trying to remove file %s' % temp_name)

        if tmpDanfe:
            danfe_id = attachment_obj.create(dict(
                name="Danfe-%08d.pdf" % self.numero,
                datas_fname="Danfe-%08d.pdf" % self.numero,
                datas=base64.b64encode(tmpDanfe),
                mimetype='application/pdf',
                res_model='account.invoice',
                res_id=self.invoice_id.id,
            ))
            atts.append(danfe_id.id)
        return atts
项目:wfuzz    作者:gwen001    | 项目源码 | 文件源码
def process(self, fuzzresult):
        """Run cutycapt on ``fuzzresult.url`` and report the PNG path via ``self.add_result``."""
        temp_name = next(tempfile._get_candidate_names())
        default_tmp_dir = tempfile._get_default_tempdir()

        filename = os.path.join(default_tmp_dir, temp_name + ".png")

        # The argument list is passed to the program without a shell, so the URL
        # must not be shell-quoted: quoting would embed literal quote characters
        # in any URL containing characters such as '?' or '&'.
        subprocess.call(['cutycapt', '--url=%s' % fuzzresult.url, '--out=%s' % filename])
        self.add_result("Screnshot taken, output at %s" % filename)
项目:curso-aws-bigdata-ai    作者:paradigmadigital    | 项目源码 | 文件源码
def write_s3(bucket, key, rekognition_faces_response_json, rekognition_faces_response_csv):
    """Upload the CSV and JSON payloads to ``csv/<key>.csv`` and ``json/<key>.json`` in `bucket`.

    Each payload is first written to a file under /tmp and then uploaded with
    ``s3client.upload_fileobj``.  The local file is removed afterwards, also
    when the write or the upload fails.
    """
    import os  # local import: this block cannot assume `os` is imported at module level

    print('inside write s3 function (bucket: {}, key: {}, json: {}, csv: {})'
          .format(bucket, key, rekognition_faces_response_json, rekognition_faces_response_csv))

    tmp_filename = 'tmp{}'.format(next(tempfile._get_candidate_names()))
    print('tmp filename: {}'.format(tmp_filename))

    # same order as before: csv first, then json
    payloads = (
        ('csv', rekognition_faces_response_csv),
        ('json', rekognition_faces_response_json),
    )
    for extension, payload in payloads:
        local_path = '/tmp/{}.{}'.format(tmp_filename, extension)
        try:
            with open(local_path, 'w') as f:
                f.write(payload)

            # upload_fileobj reads bytes, so the file is reopened in binary mode
            with open(local_path, 'rb') as f:
                s3client.upload_fileobj(
                    f, Bucket=bucket, Key=extension + '/' + key + '.' + extension)
        finally:
            if os.path.exists(local_path):
                os.remove(local_path)

# --------------- Main handler ------------------
项目:DeepFormants    作者:MLSpeech    | 项目源码 | 文件源码
def generate_tmp_filename(extension):
    """Build "<default temp dir>/<candidate name>.<extension>"; the file itself is not created."""
    directory = tempfile._get_default_tempdir()
    stem = next(tempfile._get_candidate_names())
    return "".join((directory, "/", stem, ".", extension))
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        obj = tempfile._get_candidate_names()
        self.assertIsInstance(obj, tempfile._RandomNameSequence)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_same_thing(self):
        # _get_candidate_names always returns the same object
        a = tempfile._get_candidate_names()
        b = tempfile._get_candidate_names()

        self.assertTrue(a is b)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _mock_candidate_names(*names):
    """Swap ``tempfile._get_candidate_names`` for a stub iterating over exactly *names*.

    Returns whatever ``support.swap_attr`` returns for that replacement.
    """
    def fixed_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names', fixed_names)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        obj = tempfile._get_candidate_names()
        self.assertIsInstance(obj, tempfile._RandomNameSequence)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_same_thing(self):
        # _get_candidate_names always returns the same object
        a = tempfile._get_candidate_names()
        b = tempfile._get_candidate_names()

        self.assertTrue(a is b)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _mock_candidate_names(*names):
    """Swap ``tempfile._get_candidate_names`` for a stub iterating over exactly *names*.

    Returns whatever ``support.swap_attr`` returns for that replacement.
    """
    def fixed_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names', fixed_names)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        obj = tempfile._get_candidate_names()
        self.assertIsInstance(obj, tempfile._RandomNameSequence)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_same_thing(self):
        # _get_candidate_names always returns the same object
        a = tempfile._get_candidate_names()
        b = tempfile._get_candidate_names()

        self.assertTrue(a is b)
项目:pyvips    作者:jcupitt    | 项目源码 | 文件源码
def temp_filename(directory, suffix):
    """Return a path in `directory` made of a random candidate name followed by `suffix`."""
    leaf = next(tempfile._get_candidate_names()) + suffix
    return os.path.join(directory, leaf)


# run a 2-ary function on two things -- loop over elements pairwise if the
# things are lists
项目:eggsnspam    作者:wayfair    | 项目源码 | 文件源码
def setUp(self):
        # Pick a random file name for the health-check status file and register it
        # in the app config before the parent setUp runs.
        status_file = next(tempfile._get_candidate_names())
        self.tmp_status_file = status_file
        self.app.config['HEALTHCHECK_STATUS_FILE'] = status_file
        super(HealthViewTestCaseMixin, self).setUp()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        obj = tempfile._get_candidate_names()
        self.assertIsInstance(obj, tempfile._RandomNameSequence)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_same_thing(self):
        # _get_candidate_names always returns the same object
        a = tempfile._get_candidate_names()
        b = tempfile._get_candidate_names()

        self.assertTrue(a is b)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _mock_candidate_names(*names):
    """Swap ``tempfile._get_candidate_names`` for a stub iterating over exactly *names*.

    Returns whatever ``support.swap_attr`` returns for that replacement.
    """
    def fixed_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names', fixed_names)
项目:tulen    作者:detorto    | 项目源码 | 文件源码
def get_new_file_name():
    """Endlessly yield the global counter ``ci`` as a string, advancing it modulo ``max_file_i``."""
#    return tempfile._get_candidate_names()
    global ci
    while True:
        ci = (ci + 1) % max_file_i
        yield str(ci)
项目:tulen    作者:detorto    | 项目源码 | 文件源码
def get_images_urls(soup):
    """Return the first ``img_url`` query value of every gallery/carousel link found in `soup`."""
    anchors = soup.find_all("a", {"class": "gallery__item carousel__item"})
    return [
        urlparse.parse_qs(urlparse.urlparse(anchor.get("href")).query)['img_url'][0]
        for anchor in anchors
    ]

#temp_name = next(tempfile._get_candidate_names())
项目:tulen    作者:detorto    | 项目源码 | 文件源码
def getCaptchaImage(self, soup):
        """Download the captcha image referenced in `soup` and return the local file name.

        Also stores the values of the ``key`` and ``retpath`` inputs on the
        instance as ``last_key`` and ``last_retpath``.
        """
        i = soup.find("img", { "class" : "form__captcha" })
        url= i.get("src")

        # parenthesised so the line also parses on Python 3; prints the same on Python 2
        print(soup.find_all("input"))
        self.last_key = soup.find("input",{"name":"key"})["value"]
        self.last_retpath = soup.find("input",{"name":"retpath"})["value"]
        fname = "./files/"+next(tempfile._get_candidate_names())+".jpg"
        download_image(url, fname)
        return fname
项目:infrared    作者:redhat-openstack    | 项目源码 | 文件源码
def _copy_outside_keys(self):
        """Copy key from out of the workspace into one

        Every ``ansible_ssh_private_key_file=/...`` path found in the inventory
        is copied into ``self.path`` under a randomised name, and the inventory
        is rewritten in place to point at the copy below ``self.path_placeholder``.
        """
        paths_map = {}
        real_inv = os.path.join(self.path, os.readlink(self.inventory))

        # with inplace=True, whatever is printed inside the loop replaces the file's content
        for line in fileinput.input(real_inv, inplace=True):
            key_defs = re.findall(r"ansible_ssh_private_key_file=\/\S+", line)
            for key_def in key_defs:
                path = key_def.split("=")[-1]
                paths_map.setdefault(path, path)

            new_line = line.strip()
            # iterate over a snapshot: entries are re-pointed inside the loop
            # (items() also works on both Python 2 and 3, unlike iteritems())
            for mapped_orig, mapped_new in list(paths_map.items()):
                if mapped_orig == mapped_new:
                    # path not copied yet: copy it once and remember the new location
                    keyfilename = os.path.basename(mapped_orig)
                    rand_part = next(tempfile._get_candidate_names())
                    new_fname = "{}-{}".format(keyfilename, rand_part)

                    shutil.copy2(mapped_orig, os.path.join(
                                 self.path, new_fname))
                    paths_map[mapped_orig] = os.path.join(
                        self.path_placeholder, new_fname)
                    new_fname = paths_map[mapped_orig]
                else:
                    new_fname = mapped_new

                # literal replacement: a path is plain text, and characters such as
                # '.' or '+' must not be interpreted as regular-expression syntax
                new_line = new_line.replace(mapped_orig, new_fname)

            print(new_line)
项目:DeepPhoneticToolsTutorial    作者:MLSpeech    | 项目源码 | 文件源码
def generate_tmp_filename(extension):
    """Build "<default temp dir>/<candidate name>.<extension>"; the file itself is not created."""
    directory = tempfile._get_default_tempdir()
    stem = next(tempfile._get_candidate_names())
    return "".join((directory, "/", stem, ".", extension))
项目:visdom    作者:facebookresearch    | 项目源码 | 文件源码
def video(self, tensor=None, videofile=None, win=None, env=None, opts=None):
        """
        This function plays a video. It takes as input the filename of the video
        or a `LxHxWxC` tensor containing all the frames of the video. The function
        does not support any plot-specific `options`.

        When a tensor is given it is first encoded to a Theora `.ogv` file in the
        temporary directory; the video file is then embedded as base64 in an
        HTML `<video>` tag and passed to `self.text`, whose result is returned.
        """
        opts = {} if opts is None else opts
        opts['fps'] = opts.get('fps', 25)
        _assert_opts(opts)
        assert tensor is not None or videofile is not None, \
            'should specify video tensor or file'

        if tensor is not None:
            import cv2
            import os
            import tempfile
            assert tensor.ndim == 4, 'video should be in 4D tensor'
            # use the platform's temp directory instead of a hard-coded /tmp
            videofile = os.path.join(
                tempfile.gettempdir(),
                '%s.ogv' % next(tempfile._get_candidate_names())
            )
            if cv2.__version__.startswith('2'):  # OpenCV 2
                fourcc = cv2.cv.CV_FOURCC(
                    chr(ord('T')),
                    chr(ord('H')),
                    chr(ord('E')),
                    chr(ord('O'))
                )
            else:  # OpenCV 3 and later (previously only '3' was handled, leaving fourcc unset)
                fourcc = cv2.VideoWriter_fourcc(
                    chr(ord('T')),
                    chr(ord('H')),
                    chr(ord('E')),
                    chr(ord('O'))
                )
            # VideoWriter takes the frame size as (width, height); frames are HxWxC
            writer = cv2.VideoWriter(
                videofile,
                fourcc,
                opts.get('fps'),
                (tensor.shape[2], tensor.shape[1])
            )
            assert writer.isOpened(), 'video writer could not be opened'
            for i in range(tensor.shape[0]):
                writer.write(tensor[i, :, :, :])
            writer.release()
            writer = None

        extension = videofile.split(".")[-1].lower()
        mimetypes = dict(mp4='mp4', ogv='ogg', avi='avi', webm='webm')
        mimetype = mimetypes.get(extension)
        assert mimetype is not None, 'unknown video type: %s' % extension

        bytestr = loadfile(videofile)
        videodata = """
            <video controls>
                <source type="video/%s" src="data:video/%s;base64,%s">
                Your browser does not support the video tag.
            </video>
        """ % (mimetype, mimetype, base64.b64encode(bytestr).decode('utf-8'))
        return self.text(text=videodata, win=win, env=env, opts=opts)