Python shutil 模块,rmtree() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用shutil.rmtree()

项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __close_computation(self, client=None, await_async=False, task=None):
        self.__server_locations.clear()
        if self._cur_computation:
            close_tasks = [SysTask(self.__close_node, node, self._cur_computation,
                                   await_async=await_async) for node in self._nodes.itervalues()]
            close_tasks.extend([SysTask(self.__close_node, node, self._cur_computation)
                                for node in self._disabled_nodes.itervalues()
                                if node.status == Scheduler.NodeDiscovered])
            for close_task in close_tasks:
                yield close_task.finish()
        if self.__cur_client_auth:
            computation_path = os.path.join(self.__dest_path, self.__cur_client_auth)
            if os.path.isdir(computation_path):
                shutil.rmtree(computation_path, ignore_errors=True)
        if self._cur_computation and self._cur_computation.status_task:
            self._cur_computation.status_task.send(DispycosStatus(Scheduler.ComputationClosed,
                                                                  id(self._cur_computation)))
        self.__cur_client_auth = self._cur_computation = None
        self.__computation_sched_event.set()
        if client:
            client.send('closed')
        raise StopIteration(0)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __close_computation(self, client=None, await_async=False, task=None):
        self.__server_locations.clear()
        if self._cur_computation:
            close_tasks = [SysTask(self.__close_node, node, self._cur_computation,
                                   await_async=await_async) for node in self._nodes.values()]
            close_tasks.extend([SysTask(self.__close_node, node, self._cur_computation)
                                for node in self._disabled_nodes.values()
                                if node.status == Scheduler.NodeDiscovered])
            for close_task in close_tasks:
                yield close_task.finish()
        if self.__cur_client_auth:
            computation_path = os.path.join(self.__dest_path, self.__cur_client_auth)
            if os.path.isdir(computation_path):
                shutil.rmtree(computation_path, ignore_errors=True)
        if self._cur_computation and self._cur_computation.status_task:
            self._cur_computation.status_task.send(DispycosStatus(Scheduler.ComputationClosed,
                                                                  id(self._cur_computation)))
        self.__cur_client_auth = self._cur_computation = None
        self.__computation_sched_event.set()
        if client:
            client.send('closed')
        raise StopIteration(0)
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def _delete_directory_contents(self, dirpath, filter_func):
        """Delete all files in a directory.

        :param dirpath: path to directory to clear
        :type dirpath: ``unicode`` or ``str``
        :param filter_func function to determine whether a file shall be
            deleted or not.
        :type filter_func ``callable``

        """
        if os.path.exists(dirpath):
            for filename in os.listdir(dirpath):
                if not filter_func(filename):
                    continue
                path = os.path.join(dirpath, filename)
                if os.path.isdir(path):
                    shutil.rmtree(path)
                else:
                    os.unlink(path)
                self.logger.debug('Deleted : %r', path)
项目:python-driver    作者:bblfsh    | 项目源码 | 文件源码
def generate2():
    """
    Call an external Python 2 program to retrieve the AST symbols of that
    language version
    :return:
    """
    import subprocess as sp
    import tempfile, shutil, sys, traceback

    tempdir = tempfile.mkdtemp()
    tempfile = os.path.join(tempdir, "py2_ast_code.py")

    py2_proc_out = ""
    try:
        with open(tempfile, 'w') as py2code:
            py2code.write(generate_str + WRITESYMS_CODE)

        py2_proc_out = sp.check_output(["python2", tempfile]).decode()
    finally:
        try:
            shutil.rmtree(tempdir)
        except:
            print("Warning: error trying to delete the temporal directory:", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
    return set(py2_proc_out.splitlines())
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def ensure_removed(self, path):
        if os.path.exists(path):
            if os.path.isdir(path) and not os.path.islink(path):
                logger.debug('Removing directory tree at %s', path)
                if not self.dry_run:
                    shutil.rmtree(path)
                if self.record:
                    if path in self.dirs_created:
                        self.dirs_created.remove(path)
            else:
                if os.path.islink(path):
                    s = 'link'
                else:
                    s = 'file'
                logger.debug('Removing %s %s', s, path)
                if not self.dry_run:
                    os.remove(path)
                if self.record:
                    if path in self.files_written:
                        self.files_written.remove(path)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_install():
    tempdir = mkdtemp()
    def get_supported():
        return list(wheel.pep425tags.get_supported()) + [('py3', 'none', 'win32')]
    whl = WheelFile(TESTWHEEL, context=get_supported)
    assert whl.supports_current_python(get_supported)
    try:
        locs = {}
        for key in ('purelib', 'platlib', 'scripts', 'headers', 'data'):
            locs[key] = os.path.join(tempdir, key)
            os.mkdir(locs[key])
        whl.install(overrides=locs)
        assert len(os.listdir(locs['purelib'])) == 0
        assert check(locs['platlib'], 'hello.pyd')
        assert check(locs['platlib'], 'hello', 'hello.py')
        assert check(locs['platlib'], 'hello', '__init__.py')
        assert check(locs['data'], 'hello.dat')
        assert check(locs['headers'], 'hello.dat')
        assert check(locs['scripts'], 'hello.sh')
        assert check(locs['platlib'], 'test-1.0.dist-info', 'RECORD')
    finally:
        shutil.rmtree(tempdir)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def run(self):
        self.run_command("egg_info")
        from glob import glob

        for pattern in self.match:
            pattern = self.distribution.get_name() + '*' + pattern
            files = glob(os.path.join(self.dist_dir, pattern))
            files = [(os.path.getmtime(f), f) for f in files]
            files.sort()
            files.reverse()

            log.info("%d file(s) matching %s", len(files), pattern)
            files = files[self.keep:]
            for (t, f) in files:
                log.info("Deleting %s", f)
                if not self.dry_run:
                    if os.path.isdir(f):
                        shutil.rmtree(f)
                    else:
                        os.unlink(f)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def build_and_install(self, setup_script, setup_base):
        args = ['bdist_egg', '--dist-dir']

        dist_dir = tempfile.mkdtemp(
            prefix='egg-dist-tmp-', dir=os.path.dirname(setup_script)
        )
        try:
            self._set_fetcher_options(os.path.dirname(setup_script))
            args.append(dist_dir)

            self.run_setup(setup_script, setup_base, args)
            all_eggs = Environment([dist_dir])
            eggs = []
            for key in all_eggs:
                for dist in all_eggs[key]:
                    eggs.append(self.install_egg(dist.location, setup_base))
            if not eggs and not self.dry_run:
                log.warn("No eggs found in %s (setup script problem?)",
                         dist_dir)
            return eggs
        finally:
            rmtree(dist_dir)
            log.set_verbosity(self.verbose)  # restore our log verbosity
项目:py_find_1st    作者:roebel    | 项目源码 | 文件源码
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
项目:Adafruit_Python_PCA9685    作者:adafruit    | 项目源码 | 文件源码
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
项目:service-fabric-cli    作者:Azure    | 项目源码 | 文件源码
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
        """Upload copies files to non-native store correctly with no
        progress"""
        import shutil
        import tempfile
        temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp())
        temp_src_dir = os.path.dirname(temp_file.name)
        temp_dst_dir = tempfile.mkdtemp()
        shutil_mock = MagicMock()
        shutil_mock.copyfile.return_value = None
        with patch('sfctl.custom_app.shutil', new=shutil_mock):
            sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
            shutil_mock.copyfile.assert_called_once()
        temp_file.close()
        shutil.rmtree(os.path.dirname(temp_file.name))
        shutil.rmtree(temp_dst_dir)
项目:fuel-nailgun-extension-iac    作者:openstack    | 项目源码 | 文件源码
def create(self, data):
        if not os.path.exists(const.REPOS_DIR):
            os.mkdir(const.REPOS_DIR)
        repo_path = os.path.join(const.REPOS_DIR, data['repo_name'])
        if os.path.exists(repo_path):
            logger.debug('Repo directory exists. Removing...')
            shutil.rmtree(repo_path)

        user_key = data.get('user_key', '')
        if user_key:
            self._create_key_file(data['repo_name'], user_key)
            os.environ['GIT_SSH'] = self._get_ssh_cmd(data['repo_name'])
        repo = Repo.clone_from(data['git_url'], repo_path)

        instance = super(GitRepo, self).create(data)
        instance.repo = repo
        return instance
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def tearDown(self):
        """clean up the test
        """
        shutil.rmtree(defaults.server_side_storage_path)

        # remove generic_temp_folder
        shutil.rmtree(self.temp_test_data_folder, ignore_errors=True)

        # remove repository
        shutil.rmtree(self.test_repo_path, ignore_errors=True)

        # clean up test database
        # from stalker.db.declarative import Base
        # Base.metadata.drop_all(db.DBSession.connection())
        # db.DBSession.commit()
        db.DBSession.remove()
        testing.tearDown()
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def close(self):
        # TODO: should I do clean shutdown here? Do I have to?
        if self._makefile_refs < 1:
            self._closed = True
            if self.context:
                CoreFoundation.CFRelease(self.context)
                self.context = None
            if self._client_cert_chain:
                CoreFoundation.CFRelease(self._client_cert_chain)
                self._client_cert_chain = None
            if self._keychain:
                Security.SecKeychainDelete(self._keychain)
                CoreFoundation.CFRelease(self._keychain)
                shutil.rmtree(self._keychain_dir)
                self._keychain = self._keychain_dir = None
            return self.socket.close()
        else:
            self._makefile_refs -= 1
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def ensure_removed(self, path):
        if os.path.exists(path):
            if os.path.isdir(path) and not os.path.islink(path):
                logger.debug('Removing directory tree at %s', path)
                if not self.dry_run:
                    shutil.rmtree(path)
                if self.record:
                    if path in self.dirs_created:
                        self.dirs_created.remove(path)
            else:
                if os.path.islink(path):
                    s = 'link'
                else:
                    s = 'file'
                logger.debug('Removing %s %s', s, path)
                if not self.dry_run:
                    os.remove(path)
                if self.record:
                    if path in self.files_written:
                        self.files_written.remove(path)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def build_and_install(self, setup_script, setup_base):
        args = ['bdist_egg', '--dist-dir']

        dist_dir = tempfile.mkdtemp(
            prefix='egg-dist-tmp-', dir=os.path.dirname(setup_script)
        )
        try:
            self._set_fetcher_options(os.path.dirname(setup_script))
            args.append(dist_dir)

            self.run_setup(setup_script, setup_base, args)
            all_eggs = Environment([dist_dir])
            eggs = []
            for key in all_eggs:
                for dist in all_eggs[key]:
                    eggs.append(self.install_egg(dist.location, setup_base))
            if not eggs and not self.dry_run:
                log.warn("No eggs found in %s (setup script problem?)",
                         dist_dir)
            return eggs
        finally:
            rmtree(dist_dir)
            log.set_verbosity(self.verbose)  # restore our log verbosity
项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def clean(args, config):
    """ Main entrypoint for clean """

    lock_file = join(HACKSPORTS_ROOT, "deploy.lock")

    # remove staging directories
    if os.path.isdir(STAGING_ROOT):
        logger.info("Removing the staging directories")
        shutil.rmtree(STAGING_ROOT)

    # remove lock file
    if os.path.isfile(lock_file):
        logger.info("Removing the stale lock file")
        os.remove(lock_file)

    #TODO: potentially perform more cleaning
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def _test_Valgrind(self, valgrind):
        # Clear the device cache to prevent false positives
        deviceCacheDir = os.path.join(scatest.getSdrCache(), ".ExecutableDevice_node", "ExecutableDevice1")
        shutil.rmtree(deviceCacheDir, ignore_errors=True)

        os.environ['VALGRIND'] = valgrind
        try:
            # Checking that the node and device launch as expected
            nb, devMgr = self.launchDeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
        finally:
            del os.environ['VALGRIND']

        self.assertFalse(devMgr is None)
        self.assertEquals(len(devMgr._get_registeredDevices()), 1, msg='device failed to launch with valgrind')
        children = getChildren(nb.pid)
        self.assertEqual(len(children), 1)
        devMgr.shutdown()

        # Check that a valgrind logfile exists
        logfile = os.path.join(deviceCacheDir, 'valgrind.%s.log' % children[0])
        self.assertTrue(os.path.exists(logfile))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def createTestDomain():
    domainName = getTestDomainName()
    print domainName

    domainPath = os.path.join(getSdrPath(), "dom", "domain")
    templatePath = os.path.join(getSdrPath(), "templates", "domain")
    # Create a test domain
    if os.path.isdir(domainPath):
        shutil.rmtree(domainPath)
    # Copy the template over
    shutil.copytree(templatePath, domainPath)
    # Open the DMD file and replace the name, using a very naive method
    dmd = open(os.path.join(domainPath, "DomainManager.dmd.xml"), "r")
    lines = dmd.read()
    dmd.close()
    lines = lines.replace("${DOMAINNAME}", domainName)
    dmd = open(os.path.join(domainPath, "DomainManager.dmd.xml"), "w+")
    dmd.write(lines)
    dmd.close()

    setupDeviceAndDomainMgrPackage()
项目:distributional_perspective_on_RL    作者:Kiwoo    | 项目源码 | 文件源码
def _demo():
    info("hi")
    debug("shouldn't appear")
    set_level(DEBUG)
    debug("should appear")
    dir = "/tmp/testlogging"
    if os.path.exists(dir):
        shutil.rmtree(dir)
    with session(dir=dir):
        record_tabular("a", 3)
        record_tabular("b", 2.5)
        dump_tabular()
        record_tabular("b", -2.5)
        record_tabular("a", 5.5)
        dump_tabular()
        info("^^^ should see a = 5.5")

    record_tabular("b", -2.5)
    dump_tabular()

    record_tabular("a", "longasslongasslongasslongasslongasslongassvalue")
    dump_tabular()
项目:acbs    作者:AOSC-Dev    | 项目源码 | 文件源码
def start_ab3(tmp_dir_loc, repo_dir, pkg_info, rm_abdir=False):
    start_time = int(time.time())
    os.chdir(tmp_dir_loc)
    if not copy_abd(tmp_dir_loc, repo_dir, pkg_info):
        return False
    # For logging support: ptyprocess.PtyProcessUnicode.spawn(['autobuild'])
    shadow_defines_loc = os.path.abspath(os.path.curdir)
    if not parser_pass_through(pkg_info, shadow_defines_loc):
        return False
    try:
        subprocess.check_call(['autobuild'])
    except:
        return False
    time_span = int(time.time()) - start_time
    print('>>>>>>>>>>>>>>>>>> Time for building\033[36m {} \033[0m:\033[36m {} \033[0mseconds'.format(
        pkg_info['NAME'], time_span))
    if rm_abdir is True:
        shutil.rmtree(os.path.abspath(os.path.curdir) + '/autobuild/')
    # Will get better display later
    return True
项目:qqmbr    作者:ischurov    | 项目源码 | 文件源码
def build(**args):
    freezer = Freezer(app)
    if args.get('base_url'):
        app.config['FREEZER_BASE_URL'] = args.get('base_url')
    app.config['mathjax_node'] = args.get('node_mathjax', False)
    app.config['MATHJAX_WHOLEBOOK'] = args.get('node_mathjax', False)
    app.config['FREEZER_DESTINATION'] = os.path.join(curdir, "build")
    app.config['freeze'] = True
    freezer.freeze()

    if args.get('copy_mathjax'):

        mathjax_postfix = os.path.join('assets', "js", "mathjax")
        mathjax_from = os.path.join(scriptdir, mathjax_postfix)
        mathjax_to = os.path.join(curdir, "build", mathjax_postfix)

        try:
            shutil.rmtree(mathjax_to)
        except FileNotFoundError:
            pass

        shutil.copytree(mathjax_from, mathjax_to)
项目:BackManager    作者:linuxyan    | 项目源码 | 文件源码
def archive_context(filename):
    """
    Unzip filename to a temporary directory, set to the cwd.

    The unzipped target is cleaned up after.
    """
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with ContextualZipFile(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
项目:VEP_TMScripts    作者:uwgraphics    | 项目源码 | 文件源码
def createDir(name, force=False):
    if os.path.exists(name):
        if force:
            shutil.rmtree(name)
        else:
            response = raw_input('%s already exists. Do you wish to overwrite it? (y/n) ' % name)
            if response.lower() == 'y' or response.lower() == 'yes':
                shutil.rmtree(name)
            elif response.lower() == 'n' or response.lower() == 'no':
                print 'Modeler aborted.'
                exit(0)
            else:
                print 'Response not understood.'
                print 'Modeler aborted.'
                exit(1)
    os.mkdir(name)
项目:VEP_TMScripts    作者:uwgraphics    | 项目源码 | 文件源码
def createDir(name, force=False):
    if os.path.exists(name):
        if force:
            shutil.rmtree(name)
        else:
            response = raw_input('%s already exists. Do you wish to overwrite it? (y/n) ' % name)
            if response.lower() == 'y' or response.lower() == 'yes':
                shutil.rmtree(name)
            elif response.lower() == 'n' or response.lower() == 'no':
                print 'Modeler aborted.'
                exit(0)
            else:
                print 'Response not understood.'
                print 'Modeler aborted.'
                exit(1)
    os.mkdir(name)
项目:openedoo    作者:openedoo    | 项目源码 | 文件源码
def delete_module(self,name):
        try:
            file = open("{directory}/route.py".format(directory=BASE),"r+")
            readfile = file.readlines()
            file.seek(0)
            #delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
            for line in readfile:
                if str(name) not in line:
                    file.writelines(line)
            file.truncate()
            file.close()

            shutil.rmtree('{dir_file}/modules/{name}'.format(dir_file=BASE_DIR,name=name))

            file = open("{directory}/tables.py".format(directory=BASE),"r+")
            readfile = file.readlines()
            file.seek(0)
            #delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
            for line in readfile:
                if str(name) not in line:
                    file.writelines(line)
            file.truncate()
            file.close()
        except Exception as e:
            print e
项目:openedoo    作者:openedoo    | 项目源码 | 文件源码
def delete_module(self,name):
        try:
            file = open("{directory}/route.py".format(directory=BASE),"r+")
            readfile = file.readlines()
            file.seek(0)
            #delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
            for line in readfile:
                if str(name) not in line:
                    file.writelines(line)
            file.truncate()
            file.close()

            shutil.rmtree('{dir_file}/modules/{name}'.format(dir_file=BASE_DIR,name=name))

            file = open("{directory}/tables.py".format(directory=BASE),"r+")
            readfile = file.readlines()
            file.seek(0)
            #delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
            for line in readfile:
                if str(name) not in line:
                    file.writelines(line)
            file.truncate()
            file.close()
        except Exception as e:
            print e
项目:openedoo    作者:openedoo    | 项目源码 | 文件源码
def delete_module(self,name):
        try:
            file = open("{directory}/route.py".format(directory=BASE),"r+")
            readfile = file.readlines()
            file.seek(0)
            #delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
            for line in readfile:
                if str(name) not in line:
                    file.writelines(line)
            file.truncate()
            file.close()

            shutil.rmtree('{dir_file}/modules/{name}'.format(dir_file=BASE_DIR,name=name))

            file = open("{directory}/tables.py".format(directory=BASE),"r+")
            readfile = file.readlines()
            file.seek(0)
            #delete = ("\n \nfrom openedoo.{module} import {module}".format(module=name))
            for line in readfile:
                if str(name) not in line:
                    file.writelines(line)
            file.truncate()
            file.close()
        except Exception as e:
            print e
项目:Adafruit_Python_ADS1x15    作者:adafruit    | 项目源码 | 文件源码
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
项目:googletranslate.popclipext    作者:wizyoung    | 项目源码 | 文件源码
def close(self):
        # TODO: should I do clean shutdown here? Do I have to?
        if self._makefile_refs < 1:
            self._closed = True
            if self.context:
                CoreFoundation.CFRelease(self.context)
                self.context = None
            if self._client_cert_chain:
                CoreFoundation.CFRelease(self._client_cert_chain)
                self._client_cert_chain = None
            if self._keychain:
                Security.SecKeychainDelete(self._keychain)
                CoreFoundation.CFRelease(self._keychain)
                shutil.rmtree(self._keychain_dir)
                self._keychain = self._keychain_dir = None
            return self.socket.close()
        else:
            self._makefile_refs -= 1
项目:ccu_and_eccu_publish    作者:gaofubin    | 项目源码 | 文件源码
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with ContextualZipFile(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
项目:shift-detect    作者:paolodedios    | 项目源码 | 文件源码
def clean_project_files(path_or_glob, logger) :
    """
    Resolve file name references and ensure they are properly deleted
    """
    if "*" in path_or_glob :
        files_to_clean = glob.glob(path_or_glob)
    else :
        files_to_clean = [os.path.expanduser(path_or_glob)]

    for file_to_clean in files_to_clean :
        if not os.path.exists(file_to_clean) :
            continue

        if os.path.isdir(file_to_clean) :
            logger.info("Removing directory {}".format(file_to_clean))
            shutil.rmtree(file_to_clean)
        else :
            logger.info("Removing file {}".format(file_to_clean))
            os.remove(file_to_clean)
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def cleanup_dir(tmpdir, keep_data_files=False, ignore_errors=False):
    if keep_data_files: return
    #Remove our tmpdir, but don't fail the test if it doesn't remove
    try:
        shutil.rmtree(tmpdir, ignore_errors=ignore_errors)
    except OSError as oe:
        error = ""
        if oe.errno: error = "%s: " % oe.errno
        if oe.strerror: error += oe.strerror
        if oe.filename: error += " (filename: %s)" % oe.filename
        log.warning("Unable to remove powstream temporary directory %s due to error reported by OS: %s" % (tmpdir, error))
    except:
        log.warning("Unable to remove powstream temporary directory %s: %s" % (tmpdir, sys.exc_info()[0]))

##
# Called by signal handlers to clean-up then exit
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def test_get_picture(self):
    picture_root = os.path.join(
        MockRunMod.load_config(None)[helper.DATA_ROOT], '..', 'pictures')

    try:
      shutil.rmtree(picture_root)
    except:
      pass

    try:
      os.makedirs(picture_root)
    except:
      pass

    with open(os.path.join(picture_root, 'test.jpg'), 'wb') as out:
      out.write(b'abcde')

    rv = self.app.get('/picture?name=test.jpg&mediatype=image/jpeg')

    expected = b'abcde'
    actual = rv.data

    self.assertEqual(expected, actual)
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def test_simple(self):
    mock_pipeline = test_helper.get_mock_pipeline([])

    data_root = os.path.join('local_data', 'unittests')

    if os.path.exists(data_root):
      shutil.rmtree(data_root)

    _copy = copy_file.Subscriber(mock_pipeline)
    _copy.setup({
        helper.DATA_ROOT: data_root,
        'workers': 1,
        'tag': 'default',
        helper.COPY_EXT: ['xyz']
    })

    _copy.consume(document.get_document('mock.xyz'), BytesIO(b'juba.'))
    _copy.consume(document.get_document('ignore.doc'), BytesIO(b'mock'))

    expected = ['39bbf948-mock.xyz']

    actual = os.listdir(os.path.join(data_root, 'files', 'xyz'))

    self.assertEqual(expected, actual)
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def test_simple(self):
    mock_pipeline = test_helper.get_mock_pipeline([])

    data_root = os.path.join('local_data', 'unittests')

    if os.path.exists(data_root):
      shutil.rmtree(data_root)

    _store_text = store_text.Subscriber(mock_pipeline)
    _store_text.setup({
        helper.DATA_ROOT: data_root,
        'workers': 1
    })

    doc = document.get_document('mock')
    doc.text = 'mock-mock-mock'

    _store_text.consume(doc, None)

    expected = 'local_data/unittests/text/17404a59-mock'
    actual = doc.meta['text_file']

    self.assertEquals(expected, actual)
项目:tensorboard    作者:dmlc    | 项目源码 | 文件源码
def test_event_logging():
    logdir = './experiment/'
    summary_writer = FileWriter(logdir)
    scalar_value = 1.0
    s = scalar('test_scalar', scalar_value)
    summary_writer.add_summary(s, global_step=1)
    summary_writer.close()
    assert os.path.isdir(logdir)
    assert len(os.listdir(logdir)) == 1

    summary_writer = FileWriter(logdir)
    scalar_value = 1.0
    s = scalar('test_scalar', scalar_value)
    summary_writer.add_summary(s, global_step=1)
    summary_writer.close()
    assert os.path.isdir(logdir)
    assert len(os.listdir(logdir)) == 2

    # clean up.
    shutil.rmtree(logdir)
项目:foxbms-setup    作者:foxBMS    | 项目源码 | 文件源码
def clean(mcu_switch=None, supress_output=False):
    cmd = TOOLCHAIN_BASIC_CONFIGURE +  ' '
    if mcu_switch is None:
        sphinx_build_dir = os.path.join('build', 'sphinx')
        if os.path.isdir(sphinx_build_dir):
            shutil.rmtree(sphinx_build_dir)
            print "Successfully removed sphinx documentation"
        else:
            print 'Nothing to clean...'
        return
    elif mcu_switch == '-p' or mcu_switch == '-s' or  mcu_switch == '-b' :
        cmd += ' ' + mcu_switch + ' ' + 'clean'
    else:
        print 'Invalid clean argument: \'{}\''.format(mcu_switch)
        sys.exit(1)
    start_process(cmd, supress_output)
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def neo4j_test_ws_dir(datafiles):
    return datafiles


# @pytest.fixture(scope="session")
# def workspace(request, data_directory):
#     wsconf_file = data_directory.join("workspace.yaml")
#     temp_root = tempfile.mkdtemp()
#     ws = Workspace("saapy-test-ws",
#                    temp_root,
#                    "saapy-test-ws",
#                    configuration_text=wsconf_file.read_text("utf-8"))
#
#     def fin():
#         shutil.rmtree(temp_root)
#
#     request.addfinalizer(fin)
#     return ws  # provide the fixture value
项目:Adafruit_Python_MCP4725    作者:adafruit    | 项目源码 | 文件源码
def archive_context(filename):
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield

    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
项目:packagecore    作者:BytePackager    | 项目源码 | 文件源码
def stop(self):
        print("Cleaning up self %s." % self.getName())
        # kill the running self
        _uncheckedDockerCommand(["kill", self.getName()])

        # remove self
        _uncheckedDockerCommand(["rm", self.getName()])

        # remove image -- later may be a preference to keep it
        #_uncheckedDockerCommand(["rmi", self.getImageName()])

        # remove shared directory
        try:
            if "BP_LEAVE_FILES" in os.environ:
                pass
            else:
                shutil.rmtree(self.getSharedDir())
        except OSError:
            print("Warning: failed to remove shared directory.", file=sys.stderr)
项目:sketch-components    作者:ibhubs    | 项目源码 | 文件源码
def compare_component_output(self, input_path, expected_output_path):
        rendering_engine = self.get_rendering_engine()
        temp_dir = tempfile.gettempdir()
        output_dir = os.path.join(temp_dir, str(uuid.uuid4()))
        process_sketch_archive(zip_path=input_path, compress_zip=False,
                               output_path=output_dir, engine=rendering_engine)
        self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        shutil.rmtree(output_dir)
        storage.clear()
        output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
        process_sketch_archive(zip_path=input_path, compress_zip=True,
                               output_path=output_zip, engine=rendering_engine)
        z = zipfile.ZipFile(output_zip)
        z.extractall(output_dir)
        self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
        shutil.rmtree(output_dir)
        os.remove(output_zip)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        self.__class__._instance = self
        self._nodes = {}
        self._disabled_nodes = {}
        self._avail_nodes = set()
        self._nodes_avail = pycos.Event()
        self._nodes_avail.clear()
        self._shared = False

        self._cur_computation = None
        self.__cur_client_auth = None
        self.__cur_node_allocations = []
        self.__pulse_interval = kwargs.pop('pulse_interval', MaxPulseInterval)
        self.__ping_interval = kwargs.pop('ping_interval', 0)
        self.__zombie_period = kwargs.pop('zombie_period', 100 * MaxPulseInterval)
        self._node_port = kwargs.pop('dispycosnode_port', 51351)
        self.__server_locations = set()
        self.__job_scheduler_task = None

        kwargs['name'] = 'dispycos_scheduler'
        clean = kwargs.pop('clean', False)
        nodes = kwargs.pop('nodes', [])
        self.pycos = pycos.Pycos.instance(**kwargs)
        self.__dest_path = os.path.join(self.pycos.dest_path, 'dispycos', 'dispycosscheduler')
        if clean:
            shutil.rmtree(self.__dest_path)
        self.pycos.dest_path = self.__dest_path

        self.__computation_sched_event = pycos.Event()
        self.__computation_scheduler_task = SysTask(self.__computation_scheduler_proc, nodes)
        self.__client_task = SysTask(self.__client_proc)
        self.__timer_task = SysTask(self.__timer_proc)
        Scheduler.__status_task = self.__status_task = SysTask(self.__status_proc)
        self.__client_task.register('dispycos_scheduler')
        self.pycos.discover_peers(port=self._node_port)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        self.__class__._instance = self
        self._nodes = {}
        self._disabled_nodes = {}
        self._avail_nodes = set()
        self._nodes_avail = pycos.Event()
        self._nodes_avail.clear()
        self._shared = False

        self._cur_computation = None
        self.__cur_client_auth = None
        self.__cur_node_allocations = []
        self.__pulse_interval = kwargs.pop('pulse_interval', MaxPulseInterval)
        self.__ping_interval = kwargs.pop('ping_interval', 0)
        self.__zombie_period = kwargs.pop('zombie_period', 100 * MaxPulseInterval)
        self._node_port = kwargs.pop('dispycosnode_port', 51351)
        self.__server_locations = set()
        self.__job_scheduler_task = None

        kwargs['name'] = 'dispycos_scheduler'
        clean = kwargs.pop('clean', False)
        nodes = kwargs.pop('nodes', [])
        self.pycos = pycos.Pycos.instance(**kwargs)
        self.__dest_path = os.path.join(self.pycos.dest_path, 'dispycos', 'dispycosscheduler')
        if clean:
            shutil.rmtree(self.__dest_path)
        self.pycos.dest_path = self.__dest_path

        self.__computation_sched_event = pycos.Event()
        self.__computation_scheduler_task = SysTask(self.__computation_scheduler_proc, nodes)
        self.__client_task = SysTask(self.__client_proc)
        self.__timer_task = SysTask(self.__timer_proc)
        Scheduler.__status_task = self.__status_task = SysTask(self.__status_proc)
        self.__client_task.register('dispycos_scheduler')
        self.pycos.discover_peers(port=self._node_port)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def sync_directory(src, dest, opts=None):
    if os.path.exists(dest):
        logging.debug('Removing existing directory: %s' % dest)
        shutil.rmtree(dest)
    logging.info('Syncing directory: %s -> %s.' % (src, dest))

    shutil.copytree(src, dest, ignore=get_filter(opts))
    ensure_init(dest)
项目:AutoML5    作者:djajetic    | 项目源码 | 文件源码
def rmdir(d):
    ''' Remove an existingdirectory'''
    if os.path.exists(d):
        shutil.rmtree(d)
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def gen_makeself(conf_dir,alias):
    mkself_tmp = os.path.join(conf_dir,'tmp')
    conf_mkself = os.path.join(conf_dir,'Installers')
    if not os.path.exists(conf_mkself):
        os.makedirs(conf_mkself)
    if not os.path.exists(mkself_tmp):
        os.makedirs(mkself_tmp)
    if sys.platform.startswith('darwin'):
        alias_app = os.path.join(conf_dir,'{}.app'.format(alias))
        if os.path.exists(alias_app):
            run_command('cp -R {} {}'.format(alias_app,mkself_tmp))
            gen_osx_plist(alias,mkself_tmp)
            gen_st_setup(alias,mkself_tmp)
            mkself_installer = 'bash "{}" "{}" "{}/{}_Installer" "Stitch" bash st_setup.sh'.format(mkself_exe, mkself_tmp, conf_mkself,alias)
            st_log.info(mkself_installer)
            st_log.info(run_command(mkself_installer))
            shutil.rmtree(mkself_tmp)
    else:
        binry_dir = os.path.join(conf_dir,'Binaries')
        alias_dir = os.path.join(binry_dir, alias)
        if os.path.exists(alias_dir):
            run_command('cp -R {} {}'.format(alias_dir,mkself_tmp))
            gen_lnx_daemon(alias,mkself_tmp)
            gen_st_setup(alias,mkself_tmp)
            mkself_installer = 'bash "{}" "{}" "{}/{}_Installer" "Stitch" bash st_setup.sh'.format(mkself_exe, mkself_tmp, conf_mkself,alias)
            st_log.info(mkself_installer)
            st_log.info(run_command(mkself_installer))
            shutil.rmtree(mkself_tmp)
项目:pyupdater-wx-demo    作者:wettenhj    | 项目源码 | 文件源码
def tearDown(self):
        """
        Destroy the app
        """
        if self.app:
            self.app.frame.Hide()
            self.app.OnCloseFrame(wx.PyEvent())
            self.app.frame.Destroy()
        del os.environ['PYUPDATER_FILESERVER_DIR']
        del os.environ['WXUPDATEDEMO_TESTING']
        shutil.rmtree(self.fileServerDir)
项目:pyupdater-wx-demo    作者:wettenhj    | 项目源码 | 文件源码
def tearDown(self):
        """
        Destroy the app
        """
        if self.app:
            self.app.frame.Hide()
            self.app.OnCloseFrame(wx.PyEvent())
            self.app.frame.Destroy()
        del os.environ['PYUPDATER_FILESERVER_DIR']
        del os.environ['WXUPDATEDEMO_TESTING']
        shutil.rmtree(self.fileServerDir)