Python os.path 模块,isfile() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.path.isfile()

项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_uu_encoding(self):
        """
        Test the encoding of data; this is nessisary prior to a post
        """

        # First we take a binary file
        binary_filepath = join(self.var_dir, 'joystick.jpg')
        assert isfile(binary_filepath)

        # Initialize Codec
        encoder = CodecUU(work_dir=self.test_dir)

        content = encoder.encode(binary_filepath)

        # We should have gotten an ASCII Content Object
        assert isinstance(content, NNTPAsciiContent) is True

        # We should actually have content associated with out data
        assert len(content) > 0
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_hexdump(self):
        """converts binary content to hexidecimal in a standard
        easy to read format
        """
        # Compare File
        hexdump_file = join(self.var_dir, 'hexdump.txt')
        assert isfile(hexdump_file)

        all_characters = ''.join(map(chr, range(0, 256)))
        with open(hexdump_file, 'r') as fd_in:
            ref_data = fd_in.read()

        # when reading in content, there is a new line appeneded
        # after the last line (even if one isn't otherwise present)
        # rstrip() to just simplify the test by stripping off
        # all trailing whitespace
        assert hexdump(all_characters) == ref_data.rstrip()
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def session(self, reset=False):
        """
        Returns a database session
        """
        if not self._loaded:
            return False

        if not isfile(self.db_path):
            reset = True

        if self._db and reset is True:
            self._db = None

        if self._db is None:
            # Reset our database
            self._db = NNTPGetDatabase(engine=self.engine, reset=reset)

        # Acquire our session
        return self._db.session()
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def session(self, reset=False):
        """
        Returns a database session
        """
        if not self._loaded:
            return False

        if not isfile(self.db_path):
            reset = True

        if self._db and reset is True:
            self._db = None

        if self._db is None:
            # Reset our database
            self._db = NNTPPostDatabase(engine=self.engine, reset=reset)

        # Acquire our session
        return self._db.session()
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def database_reset(ctx):
    """
    Reset's the database based on the current configuration
    """
    logger.info('Resetting database ...')
    ctx['NNTPSettings'].open(reset=True)
    __db_prep(ctx)

    db_path = join(ctx['NNTPSettings'].base_dir, 'cache', 'search')
    logger.debug('Scanning %s for databases...' % db_path)
    with pushd(db_path, create_if_missing=True):
        for entry in listdir(db_path):
            db_file = join(db_path, entry)
            if not isfile(db_file):
                continue

            try:
                unlink(db_file)
                logger.info('Removed %s ...' % entry)
            except:
                logger.warning('Failed to remove %s ...' % entry)
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def dirsize(src):
    """
    Takes a source directory and returns the entire size of all of it's
    content(s) in bytes.

    The function returns None if the size can't be properly calculated.
    """
    if not isdir(src):
        # Nothing to return
        return 0

    try:
        with pushd(src, create_if_missing=False):
            size = sum(getsize(f) for f in listdir('.') if isfile(f))

    except (OSError, IOError):
        return None

    # Return our total size
    return size
项目:Telebackup    作者:LonamiWebs    | 项目源码 | 文件源码
def enumerate_backups_entities():
        """Enumerates the entities of all the available backups"""
        if isdir(Backuper.backups_dir):

            # Look for subdirectories
            for directory in listdir(Backuper.backups_dir):
                entity_file = path.join(Backuper.backups_dir, directory, 'entity.tlo')

                # Ensure the entity.pickle file exists
                if isfile(entity_file):

                    # Load and yield it
                    with open(entity_file, 'rb') as file:
                        with BinaryReader(stream=file) as reader:
                            try:
                                yield reader.tgread_object()
                            except TypeNotFoundError:
                                # Old user, scheme got updated, don't care.
                                pass

    #endregion

    #region Backup exists and deletion
项目:Telebackup    作者:LonamiWebs    | 项目源码 | 文件源码
def backup_propic(self):
        """Backups the profile picture for the given
           entity as the current peer profile picture, returning its path"""

        # Allow multiple versions of the profile picture
        # TODO Maybe this should be another method, because when downloading media... We also have multiple versions
        filename = self.media_handler.get_propic_path(self.entity, allow_multiple=True)
        generic_filename = self.media_handler.get_propic_path(self.entity)
        if filename:  # User may not have a profile picture
            if not isfile(filename):
                # Only download the file if it doesn't exist yet
                self.client.download_profile_photo(self.entity.photo,
                                                   file_path=filename,
                                                   add_extension=False)
                # If we downloaded a new version, copy it to the "default" generic file
                if isfile(generic_filename):
                    remove(generic_filename)
                shutil.copy(filename, generic_filename)

            # The user may not have a profile picture
            return generic_filename
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def storageindex(self):
        #get the filelist
        onlyfiles = [ f for f in listdir(self.indexdata) if isfile(join(self.indexdata,f)) ]
        #read from using pandas
        for f in onlyfiles:
            df = pd.read_csv(self.indexdata+"/"+f)
            s=f.split('.')
            name = s[0][2:8]
            records = json.loads(df.T.to_json()).values()
            for row in records:
                row['date'] = datetime.datetime.strptime(row['date'], "%Y-%m-%d")
            print name
            self.index[name].insert_many(records)



    #storage stock pool into database
项目:rain    作者:scizzorz    | 项目源码 | 文件源码
def find_rain(src, paths=[]):
  if src[0] == '/':
    paths = ['']
  elif src[0] != '.':
    paths = get_paths() + paths

  for path in paths:
    if isfile(join(path, src) + '.rn'):
      return join(path, src) + '.rn'
    elif isfile(join(path, src)) and src.endswith('.rn'):
      return join(path, src)
    elif isdir(join(path, src)) and isfile(join(path, src, '_pkg.rn')):
      return join(path, src, '_pkg.rn')


# find any file from a string
项目:rain    作者:scizzorz    | 项目源码 | 文件源码
def find_name(src):
  path = os.path.abspath(src)
  path, name = os.path.split(path)
  fname, ext = os.path.splitext(name)

  if fname == '_pkg':
    _, fname = os.path.split(path)

  mname = normalize_name(fname)

  proot = []
  while path and os.path.isfile(join(path, '_pkg.rn')):
    path, name = os.path.split(path)
    proot.insert(0, normalize_name(name))

  if not src.endswith('_pkg.rn'):
    proot.append(mname)

  qname = '.'.join(proot)

  return (qname, mname)
项目:MercrediFiction    作者:Meewan    | 项目源码 | 文件源码
def clean_epub_directory():
    epubs = listdir(config.EPUB_DIRECTORY)
    if len(epubs) <= config.MAX_EPUB:
        return

    epubs.sort()

    number_to_delete = len(epubs) - config.MAX_EPUB + 2
    deleted = 0
    for t in epubs:
        f = path.join(config.EPUB_DIRECTORY, t)
        if not path.isfile(f):
            continue
        if deleted >= number_to_delete:
            break
        try:
            remove(f)
            deleted += 1
        except OSError:
            pass
项目:mechanic    作者:server-mechanic    | 项目源码 | 文件源码
def loadConfig(self, config):
    configFile = config.getConfigFile()
    if not isfile(configFile):
      self.logger.warn("Config file %s does not exist. Using defaults." % configFile)
      return config

    self.logger.debug("Loading config from %s." % configFile)
    loader = ConfigParser.SafeConfigParser(allow_no_value=True)
    loader.add_section("main")
    loader.set("main", "log_file", config.logFile)
    loader.set("main", "migration_dirs", join(config.migrationDirs, ":"))
    loader.set("main", "pre_migration_dirs", join(config.preMigrationDirs, ":"))
    loader.set("main", "post_migration_dirs", join(config.postMigrationDirs, ":"))
    loader.set("main", "state_dir", config.stateDir)
    loader.set("main", "run_dir", config.runDir)
    loader.read(configFile)

    config.logFile = loader.get("main", "log_file")
    config.migrationDirs = split(loader.get("main", "migration_dirs"), ":")
    config.preMigrationDirs = split(loader.get("main", "pre_migration_dirs"), ":")
    config.postMigrationDirs = split(loader.get("main", "post_migration_dirs"), ":")
    config.stateDir = loader.get("main", "state_dir")
    config.runDir = loader.get("main", "run_dir")

    return config
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _record_filter(args, base_dir):
    """
    Save the filter provided
    """
    filter_file = '{}/.filter'.format(base_dir)

    if not isfile(filter_file):
        # do a touch filter_file
        open(filter_file, 'a').close()

    current_filter = {}
    with open(filter_file) as filehandle:
        current_filter = yaml.load(filehandle)
    if current_filter is None:
        current_filter = {}

    pprint.pprint(current_filter)

    # filter a bunch of salt content and the target key before writing
    rec_args = {k: v for k, v in args.items() if k is not 'target' and not
                k.startswith('__')}
    current_filter[args['target']] = rec_args

    with open(filter_file, 'w') as filehandle:
        yaml.dump(current_filter, filehandle, default_flow_style=False)
项目:DenoiseAverage    作者:Pella86    | 项目源码 | 文件源码
def __init__(self, title = None, pathfile = None, debug_mode = True, debug_level = 0):
        self.path_file = pathfile
        self.debug_mode = debug_mode
        self.starttime = time.perf_counter()
        self.nowtime = time.perf_counter()
        self.lastcall = time.perf_counter() 
        self.debug_level = debug_level

        # create file?
#        if not isfile(self.path_file):
#            with open(self.path_file, 'w') as f:
#                f.write("-init log file-")

        if title is not None:
            today = datetime.datetime.now()   
            s = title + " program started the " + today.strftime("%d of %b %Y at %H:%M")
            self.log("=============================================================\n" +
                     s +
                     "\n=============================================================")
项目:mycroft-light    作者:MatthewScholefield    | 项目源码 | 文件源码
def download_model(lang, paths):
    model_folder = join(paths.user_config, 'model')
    model_en_folder = join(model_folder, lang)

    if not isdir(model_folder):
        mkdir(model_folder)
    if not isdir(model_en_folder):
        mkdir(model_en_folder)
        file_name = paths.model_dir + '.tar.gz'
        if not isfile(file_name):
            import urllib.request
            import shutil
            url = 'https://github.com/MatthewScholefield/pocketsphinx-models/raw/master/' + lang + '.tar.gz'
            with urllib.request.urlopen(url) as response, open(file_name, 'wb') as file:
                shutil.copyfileobj(response, file)

        import tarfile
        tar = tarfile.open(file_name)
        tar.extractall(path=model_en_folder)
        tar.close()
项目:mycroft-light    作者:MatthewScholefield    | 项目源码 | 文件源码
def generate(self, name, data):
        """
        Translate the data into different formats
        Depending on the format, this can be accessed different ways
        Args:
            name (IntentName): full intent name
            data (dict): dict containing all data from the skill
        Returns:
            bool: Whether the data could be generated
        """
        for dir_fn in [self.rt.paths.skill_vocab, self.rt.paths.skill_formats]:
            file_name = join(dir_fn(name.skill), name.intent + self._extension)
            if isfile(file_name):
                with open(file_name, 'r') as file:
                    log.debug('Generating', self.__class__.__name__ + '...')
                    self.generate_format(file, data)
                return True
        return False
项目:Gnome-Authenticator    作者:bil-elmoussaoui    | 项目源码 | 文件源码
def create_file(file_path):
    """
        Create a file and create parent folder if missing
    """
    if not (path.isfile(file_path) and path.exists(file_path)):
        dirs = file_path.split("/")
        i = 0
        while i < len(dirs) - 1:
            directory = "/".join(dirs[0:i + 1]).strip()
            if not path.exists(directory) and len(directory) != 0:
                makedirs(directory)
                logging.debug("Creating directory %s " % directory)
            i += 1
        mknod(file_path)
        return True
    else:
        return False
项目:Gnome-Authenticator    作者:bil-elmoussaoui    | 项目源码 | 文件源码
def screenshot_area():
    """
        Screenshot an area of the screen using gnome-screenshot
        used to QR scan
    """
    ink_flag = call(['which', 'gnome-screenshot'], stdout=PIPE, stderr=PIPE)
    if ink_flag == 0:
        file_name = path.join(GLib.get_tmp_dir(), NamedTemporaryFile().name)
        p = Popen(["gnome-screenshot", "-a", "-f", file_name],
                  stdout=PIPE, stderr=PIPE)
        output, error = p.communicate()
        if error:
            error = error.decode("utf-8").split("\n")
            logging.error("\n".join([e for e in error]))
        if not path.isfile(file_name):
            logging.debug("The screenshot was not token")
            return False
        return file_name
    else:
        logging.error(
            "Couldn't find gnome-screenshot, please install it first")
        return False
项目:icrawler    作者:hellock    | 项目源码 | 文件源码
def feed(self, url_list, offset=0, max_num=0):
        if isinstance(url_list, str):
            if path.isfile(url_list):
                with open(url_list, 'r') as fin:
                    url_list = [line.rstrip('\n') for line in fin]
            else:
                raise IOError('url list file {} not found'.format(url_list))
        elif not isinstance(url_list, list):
            raise TypeError('"url_list" can only be a filename or a str list')

        if offset < 0 or offset >= len(url_list):
            raise ValueError('"offset" exceed the list length')
        else:
            if max_num > 0:
                end_idx = min(len(url_list), offset + max_num)
            else:
                end_idx = len(url_list)
            for i in range(offset, end_idx):
                url = url_list[i]
                self.out_queue.put(url)
                self.logger.debug('put url to url_queue: {}'.format(url))
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def rev_parse_manifest_path(self, cwd):
        """
        Search parent directories for package.json.

        Starting at the current working directory. Go up one directory
        at a time checking if that directory contains a package.json
        file. If it does, return that directory.

        """

        name = 'package.json'
        manifest_path = path.normpath(path.join(cwd, name))

        bin_path = path.join(cwd, 'node_modules/.bin/')

        if path.isfile(manifest_path) and path.isdir(bin_path):
            return manifest_path

        parent = path.normpath(path.join(cwd, '../'))

        if parent == '/' or parent == cwd:
            return None

        return self.rev_parse_manifest_path(parent)
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def clear_cache(force = False):
  """
  If the folder exists, and has more than 5MB of icons in the cache, delete
  it to clear all the icons then recreate it.
  """
  from os.path import getsize, join, isfile, exists
  from os import makedirs, listdir
  from sublime import cache_path
  from shutil import rmtree

  # The icon cache path
  icon_path = join(cache_path(), "GutterColor")

  # The maximum amount of space to take up
  limit = 5242880 # 5 MB

  if exists(icon_path):
    size = sum(getsize(join(icon_path, f)) for f in listdir(icon_path) if isfile(join(icon_path, f)))
    if force or (size > limit): rmtree(icon_path)

  if not exists(icon_path): makedirs(icon_path)
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def fix_scheme_in_settings(settings_file,current_scheme, new_scheme, regenerate=False):
  """Change the color scheme in the given Settings to a background-corrected one"""
  from os.path import join, normpath, isfile

  settings = load_settings(settings_file)
  settings_scheme = settings.get("color_scheme")
  if current_scheme == settings_scheme:
    new_scheme_path =  join(packages_path(), normpath(new_scheme[len("Packages/"):]))
    if isfile(new_scheme_path) and not regenerate:
      settings.set("color_scheme", new_scheme)
    else:
      generate_scheme_fix(current_scheme, new_scheme_path)
      settings.set("color_scheme", new_scheme)
    save_settings(settings_file)
    return True
  return False
项目:BirdProject    作者:ZlodeiBaal    | 项目源码 | 文件源码
def PrepareDataList(BASE, length):
    List = []
    for M in range(0,min(length,len(BASE))):
        img, text = BASE[M]
        image = misc.imread(img,mode='RGB')
        #image = misc.imresize(image, [227, 227])
        r1 = []
        if isfile(text):
            f = open(text, 'r')
            s = f.readline()
            st = s.split(' ')
            for i in range(0,2):
                r1.append(int(st[i]))
            f.close()
        else: #If there are no txt file - "no bird situation"
            r1.append(0);
            r1.append(0);
        List.append([image,r1])
    return List

# Random test and train list
项目:sublime-simple-import    作者:vinpac    | 项目源码 | 文件源码
def findAllModules(self, project_path):
    modules = []
    if path.isfile(path.join(project_path, 'package.json')):
      packageJsonFile = open(path.join(project_path, 'package.json'))
      try:
        packageJson = json.load(packageJsonFile)
        for key in [
          "dependencies",
          "devDependencies",
          "peerDependencies",
          "optionalDependencies"
        ]:
          if key in packageJson:
            modules += packageJson[key].keys()

      except ValueError:
        SimpleImport.log_error("Failed to load package.json at {0}".format(
          self.project_path
        ))

      # Close file
      packageJsonFile.close()

    return modules
项目:sublime-simple-import    作者:vinpac    | 项目源码 | 文件源码
def getProjectSettings(self):
    SETTINGS_FILE = SimpleImportCommand.SETTINGS_FILE

    if path.isfile(path.join(self.project_path, SETTINGS_FILE)):
      with open(path.join(self.project_path, SETTINGS_FILE)) as raw_json:
        try:
          settings_json = json.load(raw_json)
          if self.interpreter.syntax in settings_json:
            settings = settings_json[self.interpreter.syntax]
            if "$path" in settings:
              settings_on_file = {}
              for match in settings["$path"]:
                if len(match) == 1:
                  settings_on_file.update(match[0])
                else:
                  pattern = '|'.join(match[:-1])
                  if re.search("^{0}".format(pattern), self.view_relpath):
                    settings_on_file.update(match[-1])
              return settings_on_file
            else:
              return settings_json[self.interpreter.syntax]
        except ValueError:
          print("Failed to load .simple-import.json at {0}".format(self.project_path))
项目:underthesea    作者:magizbox    | 项目源码 | 文件源码
def download_component(component_name):
    try:
        component = [component for component in components if
                     component["name"] == component_name][0]
        try:
            folder = dirname(dirname(__file__))
            file_name = join(folder, join(*component["destination"]))
            if isfile(file_name):
                print(
                "Component '{}' is already existed.".format(component["name"]))
            else:
                print("Start download component '{}'".format(component["name"]))
                print(file_name)
                download_file(component["url"], file_name)
                print("Finish download compoent '{}'".format(component["name"]))
        except Exception as e:
            print(e)
            print("Cannot download component '{}'".format(component["name"]))
    except:
        message = "Error: Component with name '{}' does not exist.".format(
            component_name)
        print(message)
项目:PyBackup    作者:FRReinert    | 项目源码 | 文件源码
def clean_list(self):
        '''
        Manage the list of items
        to backup
        '''

        # Case <backup_itens> is empty 
        if self.backup_itens == []:
            msg = "After version 0.0.4 <backup_itens> cannot be empty"
            self.log.update_log(msg)
            raise BaseException(msg) from None

        # Add items
        for item in self.backup_itens:
            if path.isfile(path.abspath(item)) or path.isdir(path.abspath(item)):
                self.backup_list.append(path.abspath(item))
            else:
                log.update_log("Invalid item. It'll be wiped from backup list: <%s>" % item, 'INFO')
项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def renameCB(self, newname):
        if newname and newname != 'bootname' and newname != self.oldname:
            if not path.exists('/boot/%s' %newname) and path.isfile('/boot/%s' %self.oldname):
                ret = system("mv -fn '/boot/%s' '/boot/%s'" %(self.oldname,newname))
                if ret:
                    self.session.open(MessageBox, _('Rename failed!'), MessageBox.TYPE_ERROR)
                else:
                    bootname = self.readlineFile('/boot/bootname').split('=')
                    if len(bootname) == 2 and bootname[1] == self.oldname:
                        self.writeFile('/boot/bootname', '%s=%s' %(bootname[0],newname))
                        self.getCurrent()
                        return
                    elif self.bootname == self.oldname:
                        self.getCurrent()
                        return
                    self.list[self.selection] = newname
                    self["config"].setText(_("Select Image: %s") %newname)
            else:
                if not path.exists('/boot/%s' %self.oldname):
                    self.getCurrent()
                    txt = _("File not found - rename failed!")
                else:
                    txt = _("Name already exists - rename failed!")
                self.session.open(MessageBox, txt, MessageBox.TYPE_ERROR)
项目:ceph-medic    作者:ceph    | 项目源码 | 文件源码
def get_host_file(_path=None):
    """
    If ``_path`` is passed in, return that (makes it easier for the caller if
    it can pass a ``None``) otherwise look into the most common locations for
    a host file and if found, return that.
    """
    if _path:
        return _path
    # if no path is passed onto us try and look in the cwd for a hosts file
    if os.path.isfile('hosts'):
        logger.info(
            'found and loaded the hosts file from the current working directory: %s',
            os.getcwd()
        )
        return path.abspath('hosts')

    # if that is not the case, try for /etc/ansible/hosts
    if path.isfile('/etc/ansible/hosts'):
        return '/etc/ansible/hosts'
    logger.warning('unable to find an Ansible hosts file to work with')
    logger.warning('tried locations: %s, %s', os.getcwd(), '/etc/ansible/hosts')
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def _generate_sparse_format_file(self, feature_file):
        sparse_file = insert_modifier_in_filename(feature_file, 'sparse_format')
        if path.isfile(sparse_file):
            self.logger.info("Re-using previously generated sparse format file: %s" % sparse_file)
        else:
            self.logger.info('Generating a sparse version of the feature file (zeros replaced with empty columns '
                             'which the ranker knows how to deal with)')
            temp_file = get_temp_file(sparse_file)
            with smart_file_open(temp_file, 'w') as outfile:
                writer = csv.writer(outfile)
                with smart_file_open(feature_file) as infile:
                    reader = csv.reader(infile)
                    for row in reader:
                        writer.writerow(row[:1] + row[2:])
            move(temp_file, sparse_file)
        self.logger.info('Done generating file: %s' % sparse_file)

        return self._get_file_size(sparse_file), sparse_file
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def _drop_answer_id_col_from_feature_file(self, train_file_location):
        file_without_aid = insert_modifier_in_filename(train_file_location, 'no_aid')
        if path.isfile(file_without_aid):
            self.logger.info('Found a previously generated version of the training file without answer id column, '
                             're-using it: %s' % file_without_aid)
        else:
            self.logger.info('Generating a version of the feature file without answer id (which is what ranker'
                             ' training expects')
            temp_file = get_temp_file(file_without_aid)
            with smart_file_open(temp_file, 'w') as outfile:
                writer = csv.writer(outfile)
                with smart_file_open(train_file_location) as infile:
                    reader = csv.reader(infile)
                    for row in reader:
                        writer.writerow(row[:1] + row[2:])
            move(temp_file, file_without_aid)
            self.logger.info('Done generating file: %s' % file_without_aid)
        return file_without_aid
项目:Main    作者:N-BodyPhysicsSimulator    | 项目源码 | 文件源码
def str_is_existing_path(path: str) -> str:
    """
    >>> import tempfile

    >>> with tempfile.TemporaryFile() as f:
    ...     str_is_existing_file(f.name) == f.name
    True

    >>> with tempfile.TemporaryDirectory() as path:
    ...     str_is_existing_path(path) == path
    True

    >>> str_is_existing_path('')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file or directory.

    >>> str_is_existing_path('/non/existing/dir')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file or directory.
    """
    if isfile(path) or isdir(path):
        return path
    else:
        raise ArgumentTypeError("Given path is not an existing file or directory.")
项目:Main    作者:N-BodyPhysicsSimulator    | 项目源码 | 文件源码
def str_is_existing_file(path: str) -> str:
    """
    >>> import tempfile

    >>> with tempfile.TemporaryFile() as f:
    ...     str_is_existing_file(f.name) == f.name
    True

    >>> str_is_existing_file('/home')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file.

    >>> str_is_existing_file('')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file.

    >>> str_is_existing_file('/non/existing/file')
    Traceback (most recent call last):
    argparse.ArgumentTypeError: Given path is not an existing file.
    """
    if isfile(path):
        return path
    else:
        raise ArgumentTypeError("Given path is not an existing file.")
项目:embeddings    作者:vzhong    | 项目源码 | 文件源码
def ensure_file(name, url=None, force=False, logger=logging.getLogger(), postprocess=None):
        """
        Ensures that the file requested exists in the cache, downloading it if it does not exist.

        Args:
            name (str): name of the file.
            url (str): url to download the file from, if it doesn't exist.
            force (bool): whether to force the download, regardless of the existence of the file.
            logger (logging.Logger): logger to log results.
            postprocess (function): a function that, if given, will be applied after the file is downloaded. The function has the signature ``f(fname)``

        Returns:
            str: file name of the downloaded file.

        """
        fname = Embedding.path(name)
        if not path.isfile(fname) or force:
            if url:
                logger.critical('Downloading from {} to {}'.format(url, fname))
                Embedding.download_file(url, fname)
                if postprocess:
                    postprocess(fname)
            else:
                raise Exception('{} does not exist!'.format(fname))
        return fname
项目:PSPNet-Keras-tensorflow    作者:Vladkryvoruchko    | 项目源码 | 文件源码
def __init__(self, nb_classes, resnet_layers, input_shape, weights):
        """Instanciate a PSPNet."""
        self.input_shape = input_shape
        json_path = join("weights", "keras", weights + ".json")
        h5_path = join("weights", "keras", weights + ".h5")
        if isfile(json_path) and isfile(h5_path):
            print("Keras model & weights found, loading...")
            with open(json_path, 'r') as file_handle:
                self.model = model_from_json(file_handle.read())
            self.model.load_weights(h5_path)
        else:
            print("No Keras model & weights found, import from npy weights.")
            self.model = layers.build_pspnet(nb_classes=nb_classes,
                                             resnet_layers=resnet_layers,
                                             input_shape=self.input_shape)
            self.set_npy_weights(weights)
项目:xgovctf    作者:alphagov    | 项目源码 | 文件源码
def run_makefile(make_directory):
    """
    Runs a makefile in a given directory.

    Args:
        make_directory: directory where the Makefile is located.
    """

    make_path = path.join(make_directory, "Makefile")

    if not path.isfile(make_path):
        raise InternalException(make_path + " does not exist.")

    shell = spur.LocalShell()

    try:
        shell.run(["make", "-C", make_directory])
    except Exception as e:
        raise InternalException(str(e))
项目:xgovctf    作者:alphagov    | 项目源码 | 文件源码
def analyze_problems():
    """
    Checks the sanity of inserted problems.
    Includes weightmap and grader verification.

    Returns:
        A list of error strings describing the problems.
    """

    grader_missing_error = "{}: Missing grader at '{}'."
    unknown_weightmap_pid = "{}: Has weightmap entry '{}' which does not exist."

    problems = get_all_problems()

    errors = []

    for problem in problems:
        if not isfile(join(grader_base_path, problem["grader"])):
            errors.append(grader_missing_error.format(problem["name"], problem["grader"]))

        for pid in problem["weightmap"].keys():
            if safe_fail(get_problem, pid=pid) is None:
                errors.append(unknown_weightmap_pid.format(problem["name"], pid))
    return errors
项目:xgovctf    作者:alphagov    | 项目源码 | 文件源码
def get_generator(pid):
    """
    Gets a handle on a problem generator module.

    Args:
        pid: the problem pid
    Returns:
        The loaded module
    """

    generator_path = get_generator_path(pid)

    if not path.isfile(generator_path):
        raise InternalException("Could not find {}.".format(generator_path))

    return imp.load_source(generator_path[:-3], generator_path)
项目:QuitStore    作者:AKSW    | 项目源码 | 文件源码
def getgraphsfromdir(self, path=None):
        """Get the files that are part of the repository (tracked or not).

        Returns:
            A list of filepathes.
        """
        if path is None:
            path = self.getRepoPath()
        files = [f for f in listdir(path) if isfile(join(path, f))]
        graphfiles = {}
        for file in files:
            format = guess_format(file)
            if format is not None:
                graphfiles[file] = format

        return graphfiles
项目:conda-tools    作者:groutr    | 项目源码 | 文件源码
def _parse(self):
        """
        parse the history file and return a list of
        tuples(datetime strings, set of distributions/diffs, comments)
        """
        res = []
        if not isfile(self.path):
            return res
        sep_pat = re.compile(r'==>\s*(.+?)\s*<==')
        with open(self.path, 'r') as f:
            lines = f.read().splitlines()
        for line in lines:
            line = line.strip()
            if not line:
                continue
            m = sep_pat.match(line)
            if m:
                res.append((m.group(1), set(), []))
            elif line.startswith('#'):
                res[-1][2].append(line)
            else:
                res[-1][1].add(line)
        return res
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def delete(self, filename=''):
        """Deletes given file or directory. If no filename is passed, current
        directory is removed.
        """
        self._raise_if_none()
        fn = path_join(self.path, filename)

        try:
            if isfile(fn):
                remove(fn)
            else:
                removedirs(fn)
        except OSError as why:
            if why.errno == errno.ENOENT:
                pass
            else:
                raise why
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def delete(self, filename=''):
        """Deletes given file or directory. If no filename is passed, current
        directory is removed.
        """
        self._raise_if_none()
        fn = path_join(self.path, filename)

        try:
            if isfile(fn):
                remove(fn)
            else:
                removedirs(fn)
        except OSError as why:
            if why.errno == errno.ENOENT:
                pass
            else:
                raise why
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_decoding_uuenc_single_part(self):
        """
        Decodes a single UUEncoded message
        """
        # Input File
        encoded_filepath = join(self.var_dir, 'uuencoded.tax.jpg.msg')
        assert isfile(encoded_filepath)

        # Compare File
        decoded_filepath = join(self.var_dir, 'uudecoded.tax.jpg')
        assert isfile(decoded_filepath)

        # Initialize Codec
        ud_py = CodecUU(work_dir=self.test_dir)

        # Read data and decode it
        with open(encoded_filepath, 'r') as fd_in:
            article = ud_py.decode(fd_in)

        # our content should be valid
        assert isinstance(article, NNTPBinaryContent)

        # Verify the actual article itself reports itself
        # as being okay (structurally)
        assert article.is_valid() is True

        with open(decoded_filepath, 'r') as fd_in:
            decoded = fd_in.read()

        # Compare our processed content with the expected results
        assert decoded == article.getvalue()
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_partial_download(self):
        """
        Test the handling of a download that is explicitly ordered to abort
        after only some content is retrieved.  A way of 'peeking' if you will.
        """

        # Input File
        encoded_filepath = join(self.var_dir, 'uuencoded.tax.jpg.msg')
        assert isfile(encoded_filepath)

        # Compare File
        decoded_filepath = join(self.var_dir, 'uudecoded.tax.jpg')
        assert isfile(decoded_filepath)

        # Initialize Codec (restrict content to be no larger then 10 bytes)
        ud_py = CodecUU(work_dir=self.test_dir, max_bytes=10)

        # Read data and decode it
        with open(encoded_filepath, 'r') as fd_in:
            article = ud_py.decode(fd_in)

        # our content should be valid
        assert isinstance(article, NNTPBinaryContent)

        # Our article should not be considered valid on an
        # early exit
        assert article.is_valid() is False

        with open(decoded_filepath, 'r') as fd_in:
            decoded = fd_in.read()

        # Compare our processed content with the expected results
        length = len(article.getvalue())

        # Even though we have't decoded all of our content, we're
        # still the same as the expected result up to what has been
        # processed.
        assert decoded[0:length] == article.getvalue()
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_nzbfile_generation(self):
        """
        Tests the creation of NZB Files
        """
        nzbfile = join(self.tmp_dir, 'test.nzbfile.nzb')
        payload = join(self.var_dir, 'uudecoded.tax.jpg')
        assert isfile(nzbfile) is False
        # Create our NZB Object
        nzbobj = NNTPnzb()

        # create a fake article
        segpost = NNTPSegmentedPost(basename(payload))
        content = NNTPBinaryContent(payload)

        article = NNTPArticle('testfile', groups='newsreap.is.awesome')

        # Note that our nzb object segment tracker is not marked as being
        # complete. This flag gets toggled when we add segments manually to
        # our nzb object or if we parse an NZB-File
        assert(nzbobj._segments_loaded is None)

        # Add our Content to the article
        article.add(content)
        # now add our article to the NZBFile
        segpost.add(article)
        # now add our Segmented Post to the NZBFile
        nzbobj.add(segpost)

        # Since .add() was called, this will be set to True now
        assert(nzbobj._segments_loaded is True)

        # Store our file
        assert nzbobj.save(nzbfile) is True
        assert isfile(nzbfile) is True
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_bad_files(self):
        """
        Test different variations of bad file inputs
        """
        # No parameters should create a file
        nzbfile = join(self.var_dir, 'missing.file.nzb')
        assert not isfile(nzbfile)

        nzbobj = NNTPnzb(nzbfile=nzbfile)
        assert nzbobj.is_valid() is False
        assert nzbobj.gid() is None

        # Test Length
        assert len(nzbobj) == 0
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_decoding_01(self):
        """
        Open a stream to a file we can read for decoding; This test
        specifically focuses on var/group.list
        """

        # Initialize Codec
        ch_py = CodecGroups()

        encoded_filepath = join(self.var_dir, 'group.list')
        assert isfile(encoded_filepath)

        # Read data and decode it
        with open(encoded_filepath, 'r') as fd_in:
            # This module always returns 'True' expecting more
            # but content can be retrieved at any time
            assert ch_py.decode(fd_in) is True

        # This is where the value is stored
        assert isinstance(ch_py.decoded, NNTPMetaContent)
        assert isinstance(ch_py.decoded.content, list)

        # The number of lines in group.list parsed should all be valid
        assert len(ch_py.decoded.content) == ch_py._total_lines

        # Test our reset
        ch_py.reset()

        assert isinstance(ch_py.decoded, NNTPMetaContent)
        assert isinstance(ch_py.decoded.content, list)
        assert len(ch_py.decoded.content) == 0
        assert len(ch_py.decoded.content) == ch_py._total_lines
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_decoding_03(self):
        """
        Open a stream to a file we can read for decoding; This test
        specifically focuses on var/headers.test03.msg
        """

        # Initialize Codec
        ch_py = CodecHeader()

        encoded_filepath = join(self.var_dir, 'headers.test03.msg')
        assert isfile(encoded_filepath)

        # Read data and decode it
        with open(encoded_filepath, 'r') as fd_in:
            # Decodes content and stops when complete
            assert isinstance(ch_py.decode(fd_in), NNTPHeader)

            # Read in the white space since it is actually the first line
            # after the end of headers delimiter
            assert fd_in.readline().strip() == 'First Line without spaces'

        #print '\n'.join(["assert ch_py['%s'] == '%s'" % (k, v) \
        #                 for k, v in ch_py.items()])

        assert len(ch_py) == 10

        # with the 10 lines processed, our line_count
        # should be set to 10
        assert ch_py._lines == 10

        # assert False
        assert ch_py.is_valid() == True
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_is_valid(self):
        """
        Tests different key combinations that would cause the different
        return types from is_valid()
        """

        # Initialize Codec
        ch_py = CodecHeader()

        encoded_filepath = join(self.var_dir, 'headers.test03.msg')
        assert isfile(encoded_filepath)

        # We haven't done any processing yet
        assert ch_py.is_valid() is None

        # Populate ourselves with some keys
        with open(encoded_filepath, 'r') as fd_in:
            # Decodes content and stops when complete
            assert isinstance(ch_py.decode(fd_in), NNTPHeader)

        # keys should be good!
        assert ch_py.is_valid() is True

        for k in ( 'DMCA', 'Removed', 'Cancelled', 'Blocked' ):
            # Intentially create a bad key:
            ch_py['X-%s' % k] = 'True'

            # We should fail now
            assert ch_py.is_valid() is False

            # it will become valid again once we clear the key
            del ch_py['X-%s' % k]
            assert ch_py.is_valid() is True