Python tempfile 模块,TemporaryDirectory() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tempfile.TemporaryDirectory()

项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_zipfile_timestamp():
    """Check that SOURCE_DATE_EPOCH controls the dates stored in the zip.

    Three small files are written into a scratch directory, the directory is
    archived with SOURCE_DATE_EPOCH set (see issue #143), and every archive
    entry is expected to carry the date 1980-01-01.
    """
    # NOTE(review): the project helper temporary_directory() is used in place
    # of tempfile.TemporaryDirectory; the original comment attributes this to
    # missing context-manager support on some Python version - confirm which.
    with temporary_directory() as tempdir:
        for name in ('one', 'two', 'three'):
            target = os.path.join(tempdir, name)
            with codecs.open(target, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')

        base_name = os.path.join(tempdir, 'dummy')
        # The epoch value is meant to map to the earliest representable
        # archive date, 1980-01-01.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            archive_path = wheel.archive.make_wheelfile_inner(
                base_name, tempdir)

        with readable_zipfile(archive_path) as archive:
            for entry in archive.infolist():
                assert entry.date_time[:3] == (1980, 1, 1)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testPrintCopy(self):
    """Check that _PrintCopy leaves 'copy <path>' in the output file."""
    with tempfile.TemporaryDirectory() as workdir:
      path_helper_fake = (
          fake_sqlite_plugin_path_helper.FakeSQLitePluginPathHelper(
              self.template_path, 'test', 'db'))
      out_file = os.path.join(workdir, 'testfile')
      # The output handler writes to out_file, which is read back below.
      handler = output_handler_file.OutputHandlerFile(
          out_file, file_handler.FileHandler())
      scaffolder = sqlite_generator.SQLiteGenerator(
          workdir, 'test', 'test', ['test'], handler, self.plugin_helper,
          path_helper_fake)
      scaffolder._PrintCopy(out_file)  # pylint: disable=protected-access
      written = self._ReadFromFile(out_file)
      wanted = "copy " + out_file
    self.assertEqual(wanted, written)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testPrintEdit(self):
    """Check that _PrintEdit leaves 'edit <path>' in the output file."""
    with tempfile.TemporaryDirectory() as workdir:
      path_helper_fake = (
          fake_sqlite_plugin_path_helper.FakeSQLitePluginPathHelper(
              self.template_path, 'test', 'db'))
      out_file = os.path.join(workdir, 'testfile')
      # The output handler writes to out_file, which is read back below.
      handler = output_handler_file.OutputHandlerFile(
          out_file, file_handler.FileHandler())
      scaffolder = sqlite_generator.SQLiteGenerator(
          workdir, 'test', 'test', ['test'], handler, self.plugin_helper,
          path_helper_fake)
      scaffolder._PrintEdit(out_file)  # pylint: disable=protected-access
      written = self._ReadFromFile(out_file)
      wanted = 'edit ' + out_file
    self.assertEqual(wanted, written)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testPrint(self):
    """Check the combined text _Print produces for seven arguments.

    The expected output is six 'create' entries and one 'copy' entry (for the
    fifth argument), concatenated without separators.
    """
    with tempfile.TemporaryDirectory() as workdir:
      path_helper_fake = (
          fake_sqlite_plugin_path_helper.FakeSQLitePluginPathHelper(
              self.template_path, 'test', 'db'))
      out_file = os.path.join(workdir, 'testfile')
      handler = output_handler_file.OutputHandlerFile(
          out_file, file_handler.FileHandler())
      scaffolder = sqlite_generator.SQLiteGenerator(
          workdir, 'test', 'test', ['test'], handler, self.plugin_helper,
          path_helper_fake)
      # 'test1' .. 'test7'
      names = tuple('test{0}'.format(number) for number in range(1, 8))
      scaffolder._Print(*names)  # pylint: disable=protected-access
      written = self._ReadFromFile(out_file)

    wanted = (
        'create test1'
        'create test2'
        'create test3'
        'create test4'
        'copy test5'
        'create test6'
        'create test7')
    self.assertEqual(wanted, written)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testPluginNameIfExisting(self):
    """Check PluginName when the plugin name is reported as existing.

    The fake helper is built with plugin_exists=True and
    change_bool_after_every_call_plugin_exists=True. The test expects the
    'Plugin exists' prompt in the output file and 'the_plugin' as the result.
    """
    with tempfile.TemporaryDirectory() as workdir:
      prompt_file = os.path.join(workdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(),
          prompt_info='the_plugin', prompt_error='the_plugin')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          plugin_exists=True,
          change_bool_after_every_call_plugin_exists=True,
          valid_name=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      plugin_name = 'the_plugin'
      controller._path = 'somepath'
      returned = controller.PluginName(None, None, plugin_name)
      prompt_text = self._ReadFromFile(prompt_file)

      self.assertEqual('Plugin exists. Choose new Name', prompt_text)
      self.assertEqual(plugin_name, returned)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testCreateSQLQueryModelWithUserInputWithError(self):
    """Check _CreateSQLQueryModelWithUserInput returns None on a query error.

    The fake query execution is built from query data flagged with
    has_error=True.
    """
    failing_execution = fake_sqlite_query_execution.SQLQueryExecution(
        sql_query_data.SQLQueryData(
            has_error=True, error_message="Some Error..."))
    query = 'SELECT createdDate FROM Users ORDER BY createdDate'

    with tempfile.TemporaryDirectory() as workdir:
      prompt_file = os.path.join(workdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(), prompt_info='Contact')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          folder_exists=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      model = controller._CreateSQLQueryModelWithUserInput(
          query, False, failing_execution)
      self.assertIsNone(model)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testSourcePathIfNotExisting(self):
    """Check SourcePath when the folder is reported as missing.

    The fake helper is built with folder_exists=False and
    change_bool_after_every_call_folder_exists=True. The test expects the
    'Folder does not exists' prompt in the output file and the handler's
    prompt_error value as the result.
    """
    with tempfile.TemporaryDirectory() as workdir:
      prompt_file = os.path.join(workdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(),
          prompt_error='the source path')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          folder_exists=False,
          change_bool_after_every_call_folder_exists=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      returned = controller.SourcePath(None, None, 'testpath')
      prompt_text = self._ReadFromFile(prompt_file)

      self.assertEqual(
          'Folder does not exists. Enter correct one', prompt_text)
      self.assertEqual(returned, 'the source path')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testTestPathIfExisting(self):
    """Check TestPath when the database file is reported as existing.

    Expects no prompt output, the path stored on controller._testfile and the
    same path returned.
    """
    with tempfile.TemporaryDirectory() as workdir:
      prompt_file = os.path.join(workdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler())
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          file_exists=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      database_path = os.path.join(
          path_helper.TestDatabasePath(), 'twitter_ios.db')
      returned = controller.TestPath(None, None, database_path)
      prompt_text = self._ReadFromFile(prompt_file)

      self.assertEqual(database_path, controller._testfile)
      self.assertEqual('', prompt_text)
      self.assertEqual(returned, database_path)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testValidatePluginNameIfNotOk(self):
    """Check _ValidatePluginName when the name is reported as invalid.

    The fake helper is built with valid_name=False and
    change_bool_after_every_call_valid_name=True. The test expects the
    invalid-format prompt in the output file and the handler's prompt_error
    value as the result.
    """
    with tempfile.TemporaryDirectory() as workdir:
      prompt_file = os.path.join(workdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(), prompt_error='valid_name')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          valid_name=False, change_bool_after_every_call_valid_name=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      returned = controller._ValidatePluginName("the_wrong_plugin_")
      prompt_text = self._ReadFromFile(prompt_file)

      self.assertEqual(
          'Plugin is not in a valid format. Choose new Name '
          '[plugin_name_...]',
          prompt_text)
      self.assertEqual(returned, 'valid_name')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testValidateRowNameIfNotOk(self):
    """Check _ValidateRowName when the row name is reported as invalid.

    The fake helper is built with valid_row_name=False and
    change_bool_after_every_call_valid_row_name=True. The test expects the
    invalid-format prompt in the output file and the handler's prompt_error
    value as the result.
    """
    with tempfile.TemporaryDirectory() as workdir:
      prompt_file = os.path.join(workdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(),
          prompt_error='TheValidRowName')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          valid_row_name=False,
          change_bool_after_every_call_valid_row_name=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      returned = controller._ValidateRowName("theWrongName")
      prompt_text = self._ReadFromFile(prompt_file)

      self.assertEqual(
          'Row name is not in a valid format. Choose new Name '
          '[RowName...]',
          prompt_text)
      self.assertEqual(returned, 'TheValidRowName')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testValidateTimestampStringIfNotOk(self):
    """Check _ValidateTimestampString when the input is reported as invalid.

    The fake helper is built with valid_comma_separated_string=False and the
    matching change_bool_after_every_call flag. The test expects the
    invalid-format prompt in the output file and the handler's prompt_error
    value as the result.
    """
    with tempfile.TemporaryDirectory() as workdir:
      prompt_file = os.path.join(workdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(),
          prompt_error='this,that,bla')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          valid_comma_separated_string=False,
          change_bool_after_every_call_valid_comma_separated_string=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      returned = controller._ValidateTimestampString("this, that,bla")
      prompt_text = self._ReadFromFile(prompt_file)

      self.assertEqual(
          'Timestamps are not in valid format. Reenter them correctly '
          '[name,name...]',
          prompt_text)
      self.assertEqual(returned, 'this,that,bla')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testValidateColumnStringIfNotOk(self):
    """Check _ValidateColumnString when the input is reported as invalid.

    The fake helper is built with valid_comma_separated_string=False and the
    matching change_bool_after_every_call flag. The test expects the
    invalid-format prompt in the output file and the handler's prompt_error
    value as the result.
    """
    with tempfile.TemporaryDirectory() as workdir:
      prompt_file = os.path.join(workdir, 'testfile')
      pathlib.Path(prompt_file).touch()

      handler = output_handler_file.OutputHandlerFile(
          prompt_file, file_handler.FileHandler(),
          prompt_error='this,that,bla')
      helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
          valid_comma_separated_string=False,
          change_bool_after_every_call_valid_comma_separated_string=True)
      controller = sqlite_controller.SQLiteController(handler, helper)

      returned = controller._ValidateColumnString("this, that,bla")
      prompt_text = self._ReadFromFile(prompt_file)

      self.assertEqual(
          'Column names are not in valid format. Reenter them correctly '
          '[name,name...]',
          prompt_text)
      self.assertEqual(returned, 'this,that,bla')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testGenerateIfNotConfirmed(self):
    """Check that Generate raises SystemExit when confirm=False.

    The output handler is created with confirm=False; the test passes only if
    controller.Generate raises SystemExit.
    """
    # The former trailing assertFalse(template_path) was dropped: it sat after
    # Generate inside the assertRaises block, so it was skipped whenever
    # SystemExit was raised and could not make the test pass otherwise.
    with self.assertRaises(SystemExit):
      with tempfile.TemporaryDirectory() as tmpdir:
        prompt_file = os.path.join(tmpdir, 'testfile')
        pathlib.Path(prompt_file).touch()

        output_handler = output_handler_file.OutputHandlerFile(
            prompt_file, file_handler.FileHandler(), confirm=False)

        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_name=False,
            change_bool_after_every_call_valid_name=True)
        controller = sqlite_controller.SQLiteController(output_handler,
                                                        plugin_helper)
        controller.Generate('not used', 'not used')
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testCopyFile(self):
    """Check CopyFile creates a destination that did not exist beforehand.

    The destination lives in a 'copy' sub-folder of the scratch directory and
    must compare equal to the source afterwards.
    """
    with tempfile.TemporaryDirectory() as workdir:
      src_path = os.path.join(workdir, self.file)
      dst_path = os.path.join(workdir, "copy", self.file)

      with open(src_path, "a") as src:
        src.write("this is test content.")

      copier = file_handler.FileHandler()
      self.assertFalse(os.path.exists(dst_path))
      copier.CopyFile(src_path, dst_path)
      self.assertTrue(os.path.exists(dst_path))
      self.assertTrue(filecmp.cmp(dst_path, src_path))
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testAddContentIfFileExists(self):
    """Check AddContent appends to a file that already has content."""
    chunk = "this is test content. "

    with tempfile.TemporaryDirectory() as workdir:
      target = os.path.join(workdir, self.file)
      with open(target, "a") as out:
        out.write(chunk)

      editor = file_handler.FileHandler()
      self.assertTrue(os.path.exists(target))
      editor.AddContent(target, chunk)
      self.assertTrue(os.path.exists(target))

      with open(target, "r") as written:
        result = written.read()

    # The file is expected to hold the chunk twice: initial write + AddContent.
    self.assertEqual(chunk + chunk, result)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testCreateOrModifyFileWithContentIfFileExists(self):
    """Check CreateOrModifyFileWithContent appends when the file exists."""
    chunk = "this is test content. "

    with tempfile.TemporaryDirectory() as workdir:
      target = os.path.join(workdir, self.file)
      with open(target, "a") as out:
        out.write(chunk)

      editor = file_handler.FileHandler()
      self.assertTrue(os.path.exists(target))
      editor.CreateOrModifyFileWithContent(target, chunk)
      self.assertTrue(os.path.exists(target))

      with open(target, "r") as written:
        result = written.read()

    # Expected content: the initial write followed by the added chunk.
    self.assertEqual(chunk + chunk, result)
项目:PlasoScaffolder    作者:ClaudiaSaxer    | 项目源码 | 文件源码
def testAddContentIfFileAndFolderDoesNotExist(self):
    """Check CreateOrModifyFileWithContent when file and folder are missing.

    The target sits in a 'newfolder' sub-directory that is not created by the
    test itself; afterwards the file must exist and hold exactly the content.
    """
    chunk = "this is test content. "

    with tempfile.TemporaryDirectory() as workdir:
      target = os.path.join(os.path.join(workdir, "newfolder"), self.file)
      editor = file_handler.FileHandler()

      self.assertFalse(os.path.exists(target))
      editor.CreateOrModifyFileWithContent(target, chunk)
      self.assertTrue(os.path.exists(target))

      with open(target, "r") as written:
        result = written.read()

    self.assertEqual(chunk, result)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testUserConfigOverrides(self):
        """Check that a user config wins over default.yaml and its includes.

        default.yaml sets FOO=BAR and includes a file setting FOO=BAZ; the
        user config sets FOO=USER, which is the value expected in the end.
        """
        config_files = (
            ("default.yaml",
             "include:\n"
             "    - included\n"
             "environment:\n"
             "    FOO: BAR\n"),
            ("included.yaml",
             "environment:\n"
             "    FOO: BAZ\n"),
            ("user.yaml",
             "environment:\n"
             "    FOO: USER\n"),
        )
        with TemporaryDirectory() as tmp:
            # NOTE(review): the working directory is changed into the
            # temporary directory and not restored in this method -
            # presumably the fixture's tearDown handles that; confirm.
            os.chdir(tmp)
            os.mkdir("recipes")
            for file_name, text in config_files:
                with open(file_name, "w") as f:
                    f.write(text)

            recipes = RecipeSet()
            recipes.setConfigFiles(["user"])
            recipes.parse()

            assert recipes.defaultEnv() == { "FOO":"USER"}
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testChangeRemote(self):
        """Check that a changed remote URL is reflected in the working copy.

        Two SCMs that differ only in the 'remote-bar' URL are applied to the
        same workspace one after the other; after each, the reported remotes
        must be 'origin' plus 'bar' with that SCM's URL.
        """
        urls = ('http://bar.test/baz.git', 'http://bar.test/foo.git')
        # Both SCMs are created up front, before the workspace exists.
        scms = [self.createGitScm({'remote-bar' : url}) for url in urls]

        with tempfile.TemporaryDirectory() as workspace:
            for scm, url in zip(scms, urls):
                remotes = self.callAndGetRemotes(workspace, scm)
                self.assertEqual(remotes, {
                    "origin" : self.repodir,
                    'bar' : url,
                })
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testDirAndFile(self):
        """Hash a directory that contains one sub-directory with one file.

        The digest is pinned to a fixed value because it should stay stable
        in the long run (it might be used for binary artifact matching), and
        hashing the same tree twice must give the same result.
        """
        pinned = binascii.unhexlify(
            "640f516de78fba0b6d2ddde4451000f142d06b0d")

        with TemporaryDirectory() as tmp:
            subdir = os.path.join(tmp, "dir")
            os.mkdir(subdir)
            with open(os.path.join(subdir, "file"), 'wb') as f:
                f.write(b'abc')

            first = hashDirectory(tmp)
            assert len(first) == 20
            assert first == pinned
            second = hashDirectory(tmp)
            assert first == second
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testRewriteFile(self):
        """Rewriting a file with new content must change the directory hash.

        After each hashing run the index file is expected to start with the
        four bytes b'BOB1'.
        """
        with NamedTemporaryFile() as index, TemporaryDirectory() as tmp:
            target = os.path.join(tmp, "foo")
            digests = []
            for payload in (b'abc', b'qwer'):
                with open(target, 'wb') as f:
                    f.write(payload)
                digests.append(hashDirectory(tmp, index.name))

                with open(index.name, "rb") as f:
                    assert f.read(4) == b'BOB1'

            assert digests[0] != digests[1]
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testUploadPackageNoFail(self):
        """With the "nofail" flag, failing uploads must not raise.

        Covers uploadPackage on a minimal workspace and
        uploadLocalLiveBuildId, each called with 0 and 1 as last argument.
        """
        archive = self.__getArchiveInstance({"flags" : ["upload", "download", "nofail"]})
        archive.wantUpload(True)

        with TemporaryDirectory() as tmp:
            # Minimal workspace: an audit file plus a content dir with a file.
            audit = os.path.join(tmp, "audit.json.gz")
            content = os.path.join(tmp, "workspace")
            with open(audit, "wb") as f:
                f.write(b"AUDIT")
            os.mkdir(content)
            with open(os.path.join(content, "data"), "wb") as f:
                f.write(b"DATA")

            # Neither call may raise.
            for last_arg in (0, 1):
                archive.uploadPackage(ERROR_UPLOAD_ARTIFACT, audit, content, last_arg)

        # Live-build-id upload errors must not raise with nofail either.
        for last_arg in (0, 1):
            archive.uploadLocalLiveBuildId(ERROR_UPLOAD_ARTIFACT, b'\x00', last_arg)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testUploadJenkinsNoFail(self):
        """With the "nofail" flag, failing Jenkins upload scripts must not raise.

        Runs the scripts returned by upload() and uploadJenkinsLiveBuildId()
        inside a temporary directory that holds the build-id and artifact.
        """
        archive = self.__getArchiveInstance({"flags" : ["upload", "download", "nofail"]})
        archive.wantUpload(True)

        with TemporaryDirectory() as tmp:
            buildid_path = os.path.join(tmp, "error.buildid")
            with open(buildid_path, "wb") as f:
                f.write(ERROR_UPLOAD_ARTIFACT)
            self.__createArtifactByName(os.path.join(tmp, "result.tgz"))

            # These uploads must not fail even though they do not succeed.
            callJenkinsScript(
                archive.upload(None, "error.buildid", "result.tgz"), tmp)
            callJenkinsScript(
                archive.uploadJenkinsLiveBuildId(None, "error.buildid", "test.buildid"),
                tmp)
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def testInvalidServer(self):
        """Exercise downloads against a server that does not exist.

        Local downloads are called directly and the live-build-id lookup must
        return None; the Jenkins download script is run in a temporary
        workspace.
        """
        archive = SimpleHttpArchive({ 'url' : "https://127.1.2.3:7257" })
        archive.wantDownload(True)
        archive.wantUpload(True)
        build_id = b'\x00'*20

        # Local
        for last_arg in (0, 1):
            archive.downloadPackage(build_id, "unused", "unused", last_arg)
        self.assertEqual(archive.downloadLocalLiveBuildId(build_id, 0), None)

        # Jenkins
        with TemporaryDirectory() as workspace:
            with open(os.path.join(workspace, "test.buildid"), "wb") as f:
                f.write(build_id)
            callJenkinsScript(
                archive.download(None, "test.buildid", "result.tgz"),
                workspace)
项目:Falco    作者:VCCRI    | 项目源码 | 文件源码
def set_mapper_number(manifest_file):
    """Return the number of lines in a manifest file.

    Args:
        manifest_file: Path of a local file, or a string starting with
            ``s3://`` in the form ``s3://<bucket>/<key>``. In the latter case
            the object is downloaded into a temporary directory first.

    Returns:
        int: Number of lines in the manifest (0 for an empty file).
    """
    if not manifest_file.startswith("s3://"):
        # Context manager closes the handle; the original left it open.
        with open(manifest_file) as manifest:
            return sum(1 for _ in manifest)

    s3 = boto3.resource("s3")

    # Drop surrounding whitespace/slashes and the "s3://" prefix, then split
    # once into bucket and key.
    bucket_name, key_prefix = manifest_file.strip().strip("/")[5:].split("/", 1)

    with tempfile.TemporaryDirectory() as tmpdirname:
        local_copy = tmpdirname + "/manifest"
        s3.meta.client.download_file(bucket_name, key_prefix, local_copy)

        # Close the file before the temporary directory is removed.
        with open(local_copy) as manifest:
            return sum(1 for _ in manifest)
项目:Falco    作者:VCCRI    | 项目源码 | 文件源码
def set_mapper_number(manifest_file):
    """Return the number of lines in a manifest file.

    Args:
        manifest_file: Path of a local file, or a string starting with
            ``s3://`` in the form ``s3://<bucket>/<key>``. In the latter case
            the object is downloaded into a temporary directory first.

    Returns:
        int: Number of lines in the manifest (0 for an empty file).
    """
    if not manifest_file.startswith("s3://"):
        # Context manager closes the handle; the original left it open.
        with open(manifest_file) as manifest:
            return sum(1 for _ in manifest)

    s3_client = boto3.resource("s3")

    # Drop surrounding whitespace/slashes and the "s3://" prefix, then split
    # once into bucket and key.
    bucket_name, key_prefix = manifest_file.strip().strip("/")[5:].split("/", 1)

    with tempfile.TemporaryDirectory() as tmpdirname:
        local_copy = tmpdirname + "/manifest"
        s3_client.Object(bucket_name, key_prefix).download_file(local_copy)

        # Close the file before the temporary directory is removed.
        with open(local_copy) as manifest:
            return sum(1 for _ in manifest)
项目:runcommands    作者:wylee    | 项目源码 | 文件源码
def copy(contents, config=None, destination_dir=False, **kwargs):
    """Copy a temporary file holding *contents* and yield the paths involved.

    This is a generator that yields exactly once.

    Args:
        contents: Text written to the temporary source file.
        config: Config passed to ``copy_file``; defaults to
            ``Config(xyz='123')``.
        destination_dir: When true, the destination is a fresh temporary
            directory; otherwise it is the source path plus ``'.copy'``.
        **kwargs: Forwarded to ``copy_file``.

    Yields:
        tuple: ``(source, destination, path)`` where ``path`` is the value
        returned by ``copy_file``.
    """
    if config is None:
        config = Config(xyz='123')

    # delete=False: the file must outlive this block so it can be copied.
    with NamedTemporaryFile('w', delete=False) as tp:
        tp.write(contents)

    source = tp.name

    # try/finally so the temporary files are removed even when copy_file or
    # the consumer of the yielded value raises; previously they were leaked.
    try:
        if destination_dir:
            with TemporaryDirectory() as destination:
                path = copy_file(config, source, destination, **kwargs)
                yield source, destination, path
        else:
            destination = source + '.copy'
            path = copy_file(config, source, destination, **kwargs)
            try:
                yield source, destination, path
            finally:
                os.remove(path)
    finally:
        os.remove(source)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def test_zipfile_timestamp():
    """Check that SOURCE_DATE_EPOCH controls the dates stored in the zip.

    Three small files are written into a scratch directory, the directory is
    archived with SOURCE_DATE_EPOCH set (see issue #143), and every archive
    entry is expected to carry the date 1980-01-01.
    """
    # NOTE(review): the project helper temporary_directory() is used in place
    # of tempfile.TemporaryDirectory; the original comment attributes this to
    # missing context-manager support on some Python version - confirm which.
    with temporary_directory() as tempdir:
        for name in ('one', 'two', 'three'):
            target = os.path.join(tempdir, name)
            with codecs.open(target, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')

        base_name = os.path.join(tempdir, 'dummy')
        # The epoch value is meant to map to the earliest representable
        # archive date, 1980-01-01.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            archive_path = wheel.archive.make_wheelfile_inner(
                base_name, tempdir)

        with readable_zipfile(archive_path) as archive:
            for entry in archive.infolist():
                assert entry.date_time[:3] == (1980, 1, 1)
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def test_zipfile_timestamp():
    """Check that SOURCE_DATE_EPOCH controls the dates stored in the zip.

    Three small files are written into a scratch directory, the directory is
    archived with SOURCE_DATE_EPOCH set (see issue #143), and every archive
    entry is expected to carry the date 1980-01-01.
    """
    # NOTE(review): the project helper temporary_directory() is used in place
    # of tempfile.TemporaryDirectory; the original comment attributes this to
    # missing context-manager support on some Python version - confirm which.
    with temporary_directory() as tempdir:
        for name in ('one', 'two', 'three'):
            target = os.path.join(tempdir, name)
            with codecs.open(target, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')

        base_name = os.path.join(tempdir, 'dummy')
        # The epoch value is meant to map to the earliest representable
        # archive date, 1980-01-01.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            archive_path = wheel.archive.make_wheelfile_inner(
                base_name, tempdir)

        with readable_zipfile(archive_path) as archive:
            for entry in archive.infolist():
                assert entry.date_time[:3] == (1980, 1, 1)
项目:search_google    作者:rrwen    | 项目源码 | 文件源码
def setUp(self):
    """Run one search and prepare temporary path strings for the tests.

    Reads the packaged search_google/config.json for the developer key and
    the cx value, stores the search results on self.results, and stores two
    strings on self.tempfile and self.tempdir.
    """
    config_path = resource_filename(
      Requirement.parse('search_google'), 'search_google/config.json')
    with open(config_path, 'r') as config_file:
      defaults = json.load(config_file)

    self.results = search_google.api.results(
      {
        'serviceName': 'customsearch',
        'version': 'v1',
        'developerKey': defaults['build_developerKey']
      },
      {
        'q': 'google',
        'num': 1,
        'fileType': 'png',
        'cx': defaults['cx']
      })

    # NOTE(review): TemporaryFile().name may be a file descriptor number
    # rather than a path on some platforms - confirm this is what the tests
    # expect.
    scratch = TemporaryFile()
    self.tempfile = str(scratch.name)
    scratch.close()

    # NOTE(review): the TemporaryDirectory object is not kept, so the
    # directory is likely removed once the object is finalized; only its
    # path string is retained - confirm that is intended.
    self.tempdir = str(TemporaryDirectory().name)
项目:activity-browser    作者:LCA-ActivityBrowser    | 项目源码 | 文件源码
def initializePage(self):
        """Start the import according to the wizard's import_type.

        'directory' imports directly and hides the unarchive widgets;
        'archive' unpacks the archive named by the 'archivepath' field; any
        other value shows the download widgets and starts a download thread
        whose completion signal triggers unarchiving.
        """
        import_type = self.wizard.import_type

        if import_type == 'directory':
            self.import_dir()
            self.unarchive_label.hide()
            self.unarchive_progressbar.hide()
            return

        if import_type == 'archive':
            # Kept on self so the temporary directory object stays referenced.
            self.tempdir = tempfile.TemporaryDirectory()
            self.archivepath = self.field('archivepath')
            self.unarchive()
            return

        # Remaining case: download the database archive first.
        self.download_label.setVisible(True)
        self.download_progressbar.setVisible(True)
        self.unarchive_progressbar.setMaximum(1)
        self.tempdir = tempfile.TemporaryDirectory()
        self.archivepath = os.path.join(self.tempdir.name, 'db.7z')
        # Unarchive once the download has completed.
        import_signals.download_complete.connect(self.unarchive)
        self.download_thread = DownloadThread(
            session, self.wizard.db_url, self.tempdir.name)
        import_signals.download_complete.connect(self.download_thread.exit)
        self.download_thread.start()
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_zipfile_timestamp():
    """Check that SOURCE_DATE_EPOCH controls the dates stored in the zip.

    Three small files are written into a scratch directory, the directory is
    archived with SOURCE_DATE_EPOCH set (see issue #143), and every archive
    entry is expected to carry the date 1980-01-01.
    """
    # NOTE(review): the project helper temporary_directory() is used in place
    # of tempfile.TemporaryDirectory; the original comment attributes this to
    # missing context-manager support on some Python version - confirm which.
    with temporary_directory() as tempdir:
        for name in ('one', 'two', 'three'):
            target = os.path.join(tempdir, name)
            with codecs.open(target, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')

        base_name = os.path.join(tempdir, 'dummy')
        # The epoch value is meant to map to the earliest representable
        # archive date, 1980-01-01.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            archive_path = wheel.archive.make_wheelfile_inner(
                base_name, tempdir)

        with readable_zipfile(archive_path) as archive:
            for entry in archive.infolist():
                assert entry.date_time[:3] == (1980, 1, 1)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_spearman(self):
        """Check the files written by alpha_correlation with default method.

        Expects index.html and category-value.jsonp in the output directory,
        with the jsonp mentioning 'Spearman', a sample size of 3 and a data
        entry, and not mentioning 'filtered'.
        """
        alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                              index=['sample1', 'sample2', 'sample3'])
        md = qiime2.Metadata(
            pd.DataFrame({'value': ['1.0', '2.0', '3.0']},
                         index=['sample1', 'sample2', 'sample3']))
        with tempfile.TemporaryDirectory() as output_dir:
            alpha_correlation(output_dir, alpha_div, md)
            index_fp = os.path.join(output_dir, 'index.html')
            self.assertTrue(os.path.exists(index_fp))
            jsonp_fp = os.path.join(output_dir, 'category-value.jsonp')
            self.assertTrue(os.path.exists(jsonp_fp))

            # Read once and close the handle; the original opened the file
            # four times without closing it.
            with open(jsonp_fp) as fh:
                jsonp = fh.read()

            self.assertIn('Spearman', jsonp)
            self.assertIn('"sampleSize": 3', jsonp)
            self.assertIn('"data":', jsonp)
            self.assertNotIn('filtered', jsonp)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_pearson(self):
        """Check the files written by alpha_correlation with method='pearson'.

        Expects index.html and category-value.jsonp in the output directory,
        with the jsonp mentioning 'Pearson', a sample size of 3 and a data
        entry, and not mentioning 'filtered'.
        """
        alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                              index=['sample1', 'sample2', 'sample3'])
        md = qiime2.Metadata(
            pd.DataFrame({'value': ['1.0', '2.0', '3.0']},
                         index=['sample1', 'sample2', 'sample3']))
        with tempfile.TemporaryDirectory() as output_dir:
            alpha_correlation(output_dir, alpha_div, md, method='pearson')
            index_fp = os.path.join(output_dir, 'index.html')
            self.assertTrue(os.path.exists(index_fp))
            jsonp_fp = os.path.join(output_dir, 'category-value.jsonp')
            self.assertTrue(os.path.exists(jsonp_fp))

            # Read once and close the handle; the original opened the file
            # four times without closing it.
            with open(jsonp_fp) as fh:
                jsonp = fh.read()

            self.assertIn('Pearson', jsonp)
            self.assertIn('"sampleSize": 3', jsonp)
            self.assertIn('"data":', jsonp)
            self.assertNotIn('filtered', jsonp)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_alpha_group_significance(self):
        """Check the files written by alpha_group_significance.

        Expects index.html and a jsonp file for the 'a or b' category, and
        both Kruskal-Wallis headings in index.html.
        """
        alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                              index=['sample1', 'sample2', 'sample3'])
        md = qiime2.Metadata(
            pd.DataFrame({'a or b': ['a', 'b', 'b']},
                         index=['sample1', 'sample2', 'sample3']))

        with tempfile.TemporaryDirectory() as output_dir:
            alpha_group_significance(output_dir, alpha_div, md)
            index_fp = os.path.join(output_dir, 'index.html')
            self.assertTrue(os.path.exists(index_fp))
            self.assertTrue(os.path.exists(
                            os.path.join(output_dir,
                                         'category-a%20or%20b.jsonp')))

            # Read once and close the handle; the original opened the file
            # repeatedly without closing it.
            with open(index_fp) as fh:
                index_html = fh.read()

            self.assertIn('Kruskal-Wallis (all groups)', index_html)
            self.assertIn('Kruskal-Wallis (pairwise)', index_html)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_alpha_group_significance_some_numeric(self):
        """Check alpha_group_significance when one metadata column is numeric.

        Expects a jsonp file for the 'a or b' category, no 'bad-value.jsonp'
        file, and index.html mentioning 'not categorical:' and the 'bad'
        column.
        """
        alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                              index=['sample1', 'sample2', 'sample3'])
        md = qiime2.Metadata(
            pd.DataFrame({'a or b': ['a', 'b', 'b'],
                          'bad': ['1.0', '2.0', '3.0']},
                         index=['sample1', 'sample2', 'sample3']))

        with tempfile.TemporaryDirectory() as output_dir:
            alpha_group_significance(output_dir, alpha_div, md)
            index_fp = os.path.join(output_dir, 'index.html')
            self.assertTrue(os.path.exists(index_fp))
            self.assertTrue(os.path.exists(
                            os.path.join(output_dir,
                                         'category-a%20or%20b.jsonp')))
            self.assertFalse(os.path.exists(
                             os.path.join(output_dir,
                                          'bad-value.jsonp')))

            # Read once and close the handle; the original opened the file
            # repeatedly without closing it.
            with open(index_fp) as fh:
                index_html = fh.read()

            self.assertIn('not categorical:', index_html)
            self.assertIn('<strong>bad', index_html)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_alpha_group_significance_one_group_all_unique_values(self):
        """Check the report when a column has a distinct value per sample.

        The 'bad' column ('x', 'y', 'z') must not get a jsonp file, and the
        index page must mention 'number of samples' and name the column.
        """
        alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                              index=['sample1', 'sample2', 'sample3'])
        md = qiime2.Metadata(
            pd.DataFrame({'a or b': ['a', 'b', 'b'],
                          'bad': ['x', 'y', 'z']},
                         index=['sample1', 'sample2', 'sample3']))

        with tempfile.TemporaryDirectory() as output_dir:
            alpha_group_significance(output_dir, alpha_div, md)
            index_fp = os.path.join(output_dir, 'index.html')
            self.assertTrue(os.path.exists(index_fp))
            self.assertTrue(os.path.exists(
                            os.path.join(output_dir,
                                         'category-a%20or%20b.jsonp')))
            self.assertFalse(os.path.exists(
                             os.path.join(output_dir,
                                          'category-bad.jsonp')))
            # Read the page once and close the handle before the temporary
            # directory is removed, instead of leaking one handle per check.
            with open(index_fp) as index_fh:
                index_html = index_fh.read()
            self.assertIn('number of samples', index_html)
            self.assertIn('<strong>bad', index_html)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_alpha_group_significance_one_group_single_value(self):
        """Check the report when a column has the same value for every sample.

        The 'bad' column ('x', 'x', 'x') must not get a jsonp file, and the
        index page must contain 'only a single' and name the column.
        """
        alpha_div = pd.Series([2.0, 4.0, 6.0], name='alpha-div',
                              index=['sample1', 'sample2', 'sample3'])
        md = qiime2.Metadata(
            pd.DataFrame({'a or b': ['a', 'b', 'b'],
                          'bad': ['x', 'x', 'x']},
                         index=['sample1', 'sample2', 'sample3']))

        with tempfile.TemporaryDirectory() as output_dir:
            alpha_group_significance(output_dir, alpha_div, md)
            index_fp = os.path.join(output_dir, 'index.html')
            self.assertTrue(os.path.exists(index_fp))
            self.assertTrue(os.path.exists(
                            os.path.join(output_dir,
                                         'category-a%20or%20b.jsonp')))
            self.assertFalse(os.path.exists(
                             os.path.join(output_dir,
                                          'category-bad.jsonp')))
            # Read the page once and close the handle before the temporary
            # directory is removed, instead of leaking one handle per check.
            with open(index_fp) as index_fh:
                index_html = index_fh.read()
            self.assertIn('only a single', index_html)
            self.assertIn('<strong>bad', index_html)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_alpha_rarefaction_with_phylogeny_and_metadata(self):
        """Run alpha_rarefaction with both a phylogeny and sample metadata.

        The generated index page must exist and mention the 'observed_otus',
        'shannon' and 'faith_pd' metrics.
        """
        t = biom.Table(np.array([[100, 111, 113], [111, 111, 112]]),
                       ['O1', 'O2'],
                       ['S1', 'S2', 'S3'])
        p = skbio.TreeNode.read(io.StringIO(
            '((O1:0.25, O2:0.50):0.25, O3:0.75)root;'))
        md = qiime2.Metadata(
            pd.DataFrame({'pet': ['russ', 'milo', 'peanut']},
                         index=['S1', 'S2', 'S3']))

        with tempfile.TemporaryDirectory() as output_dir:
            alpha_rarefaction(output_dir, t, max_depth=200, phylogeny=p,
                              metadata=md)
            index_fp = os.path.join(output_dir, 'index.html')
            self.assertTrue(os.path.exists(index_fp))
            # Read the page once and close the handle before the temporary
            # directory is removed, instead of leaking one handle per check.
            with open(index_fp) as index_fh:
                index_html = index_fh.read()
            self.assertIn('observed_otus', index_html)
            self.assertIn('shannon', index_html)
            self.assertIn('faith_pd', index_html)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_simple(self):
        """Write one jsonp file from a small frame and check its contents.

        The file 'peanut.jsonp' must be created in the output directory and
        contain the 'load_data' call, the 'columns', 'index' and 'data'
        keys, and both the 'sample-id' and 'shannon' labels.
        """
        d = [[1.04, 1.5, 2., 2.5, 1.18, 2.82, 2.96, 3., 1, 3., 1., 'S1'],
             [1.04, 1.5, 2., 2.5, 1.18, 2.82, 2.96, 3., 1, 3., 1., 'S2'],
             [1.04, 1.5, 2., 2.5, 1.18, 2.82, 2.96, 3., 1, 3., 1., 'S3']]

        data = pd.DataFrame(data=d, columns=['2%', '25%', '50%', '75%', '9%',
                                             '91%', '98%', 'count', 'depth',
                                             'max', 'min', 'sample-id'])

        with tempfile.TemporaryDirectory() as output_dir:
            _alpha_rarefaction_jsonp(output_dir, 'peanut.jsonp', 'shannon',
                                     data, '')

            jsonp_fp = os.path.join(output_dir, 'peanut.jsonp')
            self.assertTrue(os.path.exists(jsonp_fp))
            # Close the handle explicitly so nothing is left open when the
            # temporary directory is cleaned up.
            with open(jsonp_fp) as jsonp_fh:
                jsonp_content = jsonp_fh.read()
            for expected in ('load_data', 'columns', 'index', 'data',
                             'sample-id', 'shannon'):
                self.assertIn(expected, jsonp_content)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_bioenv(self):
        """Run bioenv with one numeric-looking and one text metadata column.

        The index page must mention 'metadata1', report 'metadata2' under the
        'not numerical' note, and contain no 'Warning' text.
        """
        dm = skbio.DistanceMatrix([[0.00, 0.25, 0.25],
                                   [0.25, 0.00, 0.00],
                                   [0.25, 0.00, 0.00]],
                                  ids=['sample1', 'sample2', 'sample3'])
        md = qiime2.Metadata(
            pd.DataFrame([['1.0', 'a'], ['2.0', 'b'], ['3.0', 'c']],
                         index=['sample1', 'sample2', 'sample3'],
                         columns=['metadata1', 'metadata2']))
        with tempfile.TemporaryDirectory() as output_dir:
            bioenv(output_dir, dm, md)
            index_fp = os.path.join(output_dir, 'index.html')
            self.assertTrue(os.path.exists(index_fp))
            # Read the page once and close the handle before the temporary
            # directory is removed, instead of leaking one handle per check.
            with open(index_fp) as index_fh:
                index_html = index_fh.read()
            self.assertIn('metadata1', index_html)

            self.assertIn('not numerical', index_html)
            self.assertIn('<strong>metadata2', index_html)

            self.assertNotIn('Warning', index_html)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_bioenv_extra_metadata(self):
        """Run bioenv when the metadata has a sample absent from the matrix.

        The metadata carries 'sample4', which the distance matrix does not.
        The index page must still mention 'metadata1', report 'metadata2'
        under the 'not numerical' note, and contain no 'Warning' text.
        """
        dm = skbio.DistanceMatrix([[0.00, 0.25, 0.25],
                                   [0.25, 0.00, 0.00],
                                   [0.25, 0.00, 0.00]],
                                  ids=['sample1', 'sample2', 'sample3'])
        md = qiime2.Metadata(
            pd.DataFrame([['1.0', 'a'], ['2.0', 'b'], ['3.0', 'c'],
                          ['4.0', 'd']],
                         index=['sample1', 'sample2', 'sample3', 'sample4'],
                         columns=['metadata1', 'metadata2']))
        with tempfile.TemporaryDirectory() as output_dir:
            bioenv(output_dir, dm, md)
            index_fp = os.path.join(output_dir, 'index.html')
            self.assertTrue(os.path.exists(index_fp))
            # Read the page once and close the handle before the temporary
            # directory is removed, instead of leaking one handle per check.
            with open(index_fp) as index_fh:
                index_html = index_fh.read()
            self.assertIn('metadata1', index_html)

            self.assertIn('not numerical', index_html)
            self.assertIn('<strong>metadata2', index_html)

            self.assertNotIn('Warning', index_html)
项目:q2-diversity    作者:qiime2    | 项目源码 | 文件源码
def test_bioenv_zero_variance_column(self):
        """Run bioenv when one metadata column is constant across samples.

        'metadata2' is '2.0' for every sample.  The index page must mention
        'metadata1', report 'metadata2' under the 'no variance' note, and
        contain no 'Warning' text.
        """
        dm = skbio.DistanceMatrix([[0.00, 0.25, 0.25],
                                   [0.25, 0.00, 0.00],
                                   [0.25, 0.00, 0.00]],
                                  ids=['sample1', 'sample2', 'sample3'])
        md = qiime2.Metadata(
            pd.DataFrame([['1.0', '2.0'], ['2.0', '2.0'], ['3.0', '2.0']],
                         index=['sample1', 'sample2', 'sample3'],
                         columns=['metadata1', 'metadata2']))
        with tempfile.TemporaryDirectory() as output_dir:
            bioenv(output_dir, dm, md)
            index_fp = os.path.join(output_dir, 'index.html')
            # Read the page once and close the handle before the temporary
            # directory is removed, instead of leaking one handle per check.
            with open(index_fp) as index_fh:
                index_html = index_fh.read()
            self.assertIn('metadata1', index_html)

            self.assertIn('no variance', index_html)
            self.assertIn('<strong>metadata2', index_html)

            self.assertNotIn('Warning', index_html)
项目:tecken    作者:mozilla-services    | 项目源码 | 文件源码
def run():
    """Download every URL listed in URLS and zip the results.

    Blank lines and lines starting with '#' in URLS are ignored.  The files
    are fetched into a temporary directory by download_all() and each one
    that has a local path is written, deflated, into
    'symbols-for-systemtests.zip' in the current working directory.
    """
    urls = []
    for raw_line in URLS.strip().splitlines():
        candidate = raw_line.strip()
        if not candidate or candidate.startswith('#'):
            continue
        urls.append(candidate)

    with tempfile.TemporaryDirectory(prefix='symbols') as tmpdirname:
        downloaded = download_all(urls, tmpdirname)
        save_filepath = 'symbols-for-systemtests.zip'
        # Running totals over every download, including those without a file.
        total_time_took = 0.0
        total_size = 0
        with zipfile.ZipFile(save_filepath, mode='w') as archive:
            for uri, outcome in downloaded.items():
                fullpath, time_took, size = outcome
                total_time_took += time_took
                total_size += size
                if not fullpath:
                    continue
                # The archive member name is the URI with 'v1/' removed.
                member_name = uri.replace('v1/', '')
                assert os.path.isfile(fullpath)
                archive.write(
                    fullpath,
                    arcname=member_name,
                    compress_type=zipfile.ZIP_DEFLATED,
                )
项目:tecken    作者:mozilla-services    | 项目源码 | 文件源码
def make_tempdir(prefix=None, suffix=None):
    """Build a decorator that hands the wrapped function a temporary directory.

    The path of a freshly created temporary directory is appended as the
    last positional argument of every call; the directory is removed once
    the call returns or raises.  ``prefix`` and ``suffix`` are forwarded to
    ``TemporaryDirectory``.

    Usage::

        @make_tempdir()
        def some_function(arg1, arg2, tempdir, kwargs1='one'):
            assert os.path.isdir(tempdir)
            ...
    """

    def decorator(func):

        @wraps(func)
        def inner(*args, **kwargs):
            scratch = TemporaryDirectory(prefix=prefix, suffix=suffix)
            with scratch as tempdir_path:
                # The directory path goes after the caller's positionals.
                positional = args + (tempdir_path,)
                return func(*positional, **kwargs)

        return inner

    return decorator
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def test_zipfile_timestamp():
    # The SOURCE_DATE_EPOCH environment variable is expected to control the
    # timestamp recorded for each entry of the generated zip.  See issue #143.
    # The project's own temporary_directory() helper supplies the scratch
    # directory here.
    with temporary_directory() as tempdir:
        # Populate the directory with three small UTF-8 text files.
        for name in ('one', 'two', 'three'):
            target = os.path.join(tempdir, name)
            with codecs.open(target, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')
        zip_base_name = os.path.join(tempdir, 'dummy')
        # This epoch value falls on 1980-01-01, the date asserted below.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            zip_filename = wheel.archive.make_wheelfile_inner(
                zip_base_name, tempdir)
        with readable_zipfile(zip_filename) as zf:
            for entry in zf.infolist():
                year_month_day = entry.date_time[:3]
                assert year_month_day == (1980, 1, 1)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def test_zipfile_timestamp():
    # The SOURCE_DATE_EPOCH environment variable is expected to control the
    # timestamp recorded for each entry of the generated zip.  See issue #143.
    # The project's own temporary_directory() helper supplies the scratch
    # directory here.
    with temporary_directory() as tempdir:
        # Populate the directory with three small UTF-8 text files.
        for name in ('one', 'two', 'three'):
            target = os.path.join(tempdir, name)
            with codecs.open(target, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')
        zip_base_name = os.path.join(tempdir, 'dummy')
        # This epoch value falls on 1980-01-01, the date asserted below.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            zip_filename = wheel.archive.make_wheelfile_inner(
                zip_base_name, tempdir)
        with readable_zipfile(zip_filename) as zf:
            for entry in zf.infolist():
                year_month_day = entry.date_time[:3]
                assert year_month_day == (1980, 1, 1)
项目:triage    作者:dssg    | 项目源码 | 文件源码
def test_build_error(experiment_class):
    """A RuntimeError raised by build_matrices must escape the experiment call."""
    with testing.postgresql.Postgresql() as postgresql:
        db_engine = create_engine(postgresql.url())
        ensure_db(db_engine)

        with TemporaryDirectory() as temp_dir:
            experiment_kwargs = dict(
                config=sample_config(),
                db_engine=db_engine,
                model_storage_class=FSModelStorageEngine,
                project_path=os.path.join(temp_dir, 'inspections'),
            )
            experiment = experiment_class(**experiment_kwargs)

            # Replace build_matrices with a mock that always raises.
            failing_build = mock.patch.object(
                experiment, 'build_matrices',
                side_effect=RuntimeError('boom!'))
            with failing_build, pytest.raises(RuntimeError):
                experiment()
项目:triage    作者:dssg    | 项目源码 | 文件源码
def test_build_error_cleanup_timeout(_clean_up_mock, experiment_class):
    """A cleanup timeout must surface while keeping the original build error.

    The experiment is created with a 0.02 cleanup timeout and a
    build_matrices mock that raises RuntimeError.  Calling it must raise
    TimeoutError whose ``__context__`` is that RuntimeError.
    """
    # NOTE(review): _clean_up_mock is unused in the body; presumably it is
    # injected by a patch decorator that is not visible here -- confirm.
    with testing.postgresql.Postgresql() as postgresql:
        db_engine = create_engine(postgresql.url())
        ensure_db(db_engine)

        with TemporaryDirectory() as temp_dir:
            experiment_kwargs = dict(
                config=sample_config(),
                db_engine=db_engine,
                model_storage_class=FSModelStorageEngine,
                project_path=os.path.join(temp_dir, 'inspections'),
                cleanup_timeout=0.02,  # Set short timeout
            )
            experiment = experiment_class(**experiment_kwargs)

            failing_build = mock.patch.object(
                experiment, 'build_matrices',
                side_effect=RuntimeError('boom!'))
            with failing_build as build_mock, \
                    pytest.raises(TimeoutError) as exc_info:
                experiment()

    # The TimeoutError is raised last, but the earlier build error stays
    # reachable through __context__ and shows up in a standard traceback.
    raised = exc_info.value
    assert raised.__context__ is build_mock.side_effect
项目:puresec-cli    作者:puresec    | 项目源码 | 文件源码
def get_function_root(self, name):
        """Return the directory holding the extracted package of function *name*.

        On first use a temporary directory is created and kept on
        ``self.functions_output``.  Each function's zip, looked up in
        ``self.serverless_package`` under the name returned by
        ``self._get_function_package_name(name)``, is extracted into its own
        sub-directory; a sub-directory that already exists is returned
        without extracting again.

        Raises:
            SystemExit: with code 2 when the function's zip is missing or is
                not a valid zip file (an error is reported through eprint
                first).
        """
        if not hasattr(self, 'functions_output'):
            # TemporaryDirectory's first positional parameter is ``suffix``,
            # so the label has to be passed by keyword to act as a prefix.
            self.functions_output = TemporaryDirectory(
                prefix="puresec-serverless-functions-")

        package_name = self._get_function_package_name(name)
        function_root = os.path.join(self.functions_output.name, package_name)
        if os.path.exists(function_root):
            return function_root

        archive_path = os.path.join(self.serverless_package,
                                    "{}.zip".format(package_name))
        try:
            # Named ``archive`` rather than ``zipfile`` so the local does not
            # shadow the standard-library module name.
            archive = ZipFile(archive_path, 'r')
        except FileNotFoundError:
            eprint("error: serverless package did not create a function zip for '{}'", name)
            raise SystemExit(2)
        except BadZipFile:
            eprint("error: serverless package did not create a valid function zip for '{}'", name)
            raise SystemExit(2)

        with archive:
            archive.extractall(function_root)
        return function_root