Python genericpath 模块,exists() 实例源码


def declaration_path(name):
    """Return the path to an included declaration"""
    from os.path import dirname, join, exists
    import  metatabdecl
    from metatab.exc import IncludeError

    d = dirname(metatabdecl.__file__)

    path = join(d, name)

    if not exists(path):
        path = join(d, name + '.csv')

    if not exists(path):
        raise IncludeError("No local declaration file for name '{}' ".format(name))

    return path

# From
def make_dir_structure(base_dir):
    """Make the build directory structure. """

    def maybe_makedir(*args):

        p = join(base_dir, *args)

        if exists(p) and not isdir(p):
            raise IOError("File '{}' exists but is not a directory ".format(p))

        if not exists(p):

def check_atlas(atlas):
    """Validation of atlas input."""

    # when its a name for pre-defined atlas
    if isinstance(atlas, str):
        if not pexists(atlas): # just a name
            atlas = atlas.lower()
            if atlas not in parcellate.atlas_list:
                raise ValueError('Invalid choice of atlas. Accepted : {}'.format(parcellate.atlas_list))
        elif os.path.isdir(atlas): # cortical atlas in Freesurfer org
            if not parcellate.check_atlas_annot_exist(atlas):
                raise ValueError('Given atlas folder does not contain Freesurfer label annot files. '
                                 'Needed : given_atlas_dir/label/?h.aparc.annot')
        elif pexists(atlas): # may be a volumetric atlas?
                atlas = nibabel.load(atlas)
                raise ValueError('Unable to read the provided image volume. Must be a nifti 2d volume, readable by nibabel.')
            raise ValueError('Unable to decipher or use the given atlas.')
        raise NotImplementedError('Atlas must be a string, providing a name or path to Freesurfer folder or a 3D nifti volume.')

    return atlas
def make_output_path_graph(out_dir, subject, str_prefixes):
    "Constructs path to save a multigraph to disk."

    if out_dir is not None:
        # get outpath returned from hiwenet, based on dist name and all other parameters
        # choose out_dir name  based on dist name and all other parameters
        out_subject_dir = pjoin(out_dir, subject)
        if not pexists(out_subject_dir):

        if isinstance(str_prefixes, str):
            str_prefixes = [str_prefixes, ]

        out_file_name = '{}_graynet.graphml'.format('_'.join(str_prefixes))
        out_weights_path = pjoin(out_subject_dir, out_file_name)
        out_weights_path = None

    return out_weights_path
def check_subjects(subjects_info):
    "Ensure subjects are provided and their data exist."

    if isinstance(subjects_info, str):
        if not pexists(subjects_info):
            raise IOError('path to subject list does not exist: {}'.format(subjects_info))
        subjects_list = np.genfromtxt(subjects_info, dtype=str)
    elif isinstance(subjects_info, collections.Iterable):
        if len(subjects_info) < 1:
            raise ValueError('Empty subject list.')
        subjects_list = subjects_info
        raise ValueError('Invalid value provided for subject list. \n '
                         'Must be a list of paths, or path to file containing list of paths, one for each subject.')

    subject_id_list = np.atleast_1d(subjects_list)
    num_subjects = subject_id_list.size
    if num_subjects < 1:
        raise ValueError('Input subject list is empty.')

    num_digits_id_size = len(str(num_subjects))
    max_id_width = max(map(len, subject_id_list))

    return subject_id_list, num_subjects, max_id_width, num_digits_id_size
def check_params_single_edge(base_features, in_dir, atlas, smoothing_param, node_size, out_dir, return_results):


    if atlas.lower() not in parcellate.atlas_list:
        raise ValueError('Invalid atlas choice. Use one of {}'.format(parcellate.atlas_list))

    if not pexists(in_dir):
        raise IOError('Input directory at {} does not exist.'.format(in_dir))

    if out_dir is None and return_results is False:
        raise ValueError('Results are neither saved to disk or being received when returned.\n'
                         'Specify out_dir (not None) or make return_results=True')

    if out_dir is not None and not pexists(out_dir):

    # no checks on subdivison size yet, as its not implemented

def check_params_multiedge(base_feature_list, input_dir, atlas, smoothing_param,
                           node_size, out_dir, return_results):
    """Validation of parameters and appropriate type casting if necessary."""


    if atlas.lower() not in parcellate.atlas_list:
        raise ValueError('Invalid atlas choice. Use one of {}'.format(parcellate.atlas_list))

    if not pexists(input_dir):
        raise IOError('Input directory at {} does not exist.'.format(input_dir))

    if out_dir is None and return_results is False:
        raise ValueError('Results are neither saved to disk or being received when returned.\n'
                         'Specify out_dir (not None) or make return_results=True')

    if out_dir is not None and not pexists(out_dir):

    # no checks on subdivison size yet, as its not implemented

def add_image(self, base_path, file):
        if not os.path.exists(file):
            file = os.path.join(base_path, file)
        texture = Image(source=file).texture
        print('loading', file)
        texture.mag_filter = 'nearest'
        if texture is None:
            sys.exit('failed to locate image file %r' % file)

        id = self.firstgid
        th = self.tile_height + self.spacing
        tw = self.tile_width + self.spacing
        for j in range(texture.height // th):
            for i in range(texture.width // tw):
                x = (i * tw) + self.margin
                # convert the y coordinate to OpenGL (0 at bottom of texture)
                y = texture.height - ((j + 1) * th)
                tile = texture.get_region(x, y, self.tile_width, self.tile_height)
                self.tiles.append(Tile(id, tile, self))
                id += 1
def save(config, is_system=False):
        Save configuration ``config``.
        location = SYSTEM_CONFIG if is_system else RUNTIME_CONFIG
  "Saving config: " + location)
            dir = location.replace("/jarbas_runtime.conf", "").replace(
                "/jarbas.conf", "")
            if not exists(dir):
                loc_config = load_commented_json(location)
                loc_config = {}

            with open(location, 'w') as f:
                config = loc_config.update(config)
                json.dump(config, f)
        except Exception as e:
def new_metatab_file(mt_file, template):
    template = template if template else 'metatab'

    if not exists(mt_file):
        doc = make_metatab_file(template)


        return True


        return False
def ensure_dir(path):
    if path and not exists(path):
def get_eval_dir(self):
        answer_dir = join(self.dir, "answers")
        if not exists(answer_dir):
        return answer_dir
def get_best_weights(self):
        if exists(self.best_weight_dir):
            return tf.train.latest_checkpoint(self.best_weight_dir)
        return None
def restore_checkpoint(self, sess, var_list=None, load_ema=True):
        Restores either the best weights or the most recent checkpoint, assuming the correct
        variables have already been added to the tf default graph e.g., .get_prediction()
        has been called the model stored in `self`.
        Automatically detects if EMA weights exists, and if they do loads them instead
        checkpoint = self.get_best_weights()
        if checkpoint is None:
            print("Loading most recent checkpoint")
            checkpoint = self.get_latest_checkpoint()
            print("Loading best weights")

        if load_ema:
            if var_list is None:
                # Same default used by `Saver`
                var_list = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES) + \

            # Automatically check if there are EMA variables, if so use those
            reader = tf.train.NewCheckpointReader(checkpoint)
            ema = tf.train.ExponentialMovingAverage(0)
            ema_names = {ema.average_name(x): x for x in var_list
                         if reader.has_tensor(ema.average_name(x))}
            if len(ema_names) > 0:
                print("Found EMA weights, loading them")
                ema_vars = set(x for x in ema_names.values())
                var_list = { v for v in var_list if v not in ema_vars}

        saver = tf.train.Saver(var_list)
        saver.restore(sess, checkpoint)
def reseed(self):
        # this will only work if app has permission to modify seed.db, for example NOT IOS.
        # PS) hopefully it's obvious that this will only effect new games based off of new seed.db.
        # all_tmx_files_to_db will just update seed.db if it exists, but for now, just wipe the slate clean to be sure
        if exists('seed.db'):
            print('deleting seed.db, existing...')
        Notification(message='Reseed complete!').open()
def fromxml(cls, tag, tilemap, firstgid=None):
        if 'source' in tag.attrib:
            firstgid = int(tag.attrib['firstgid'])
            path = tag.attrib['source']
            if not os.path.exists(path):
                path = os.path.join(tilemap.file_path, path)
            with open(path) as f:
                tileset = ElementTree.fromstring(
            return cls.fromxml(tileset, firstgid)

        name = tag.attrib['name']
        if firstgid is None:
            firstgid = int(tag.attrib['firstgid'])
        tile_width = int(tag.attrib['tilewidth'])
        tile_height = int(tag.attrib['tileheight'])
        spacing = int(tag.get('spacing', 0))
        margin = int(tag.get('margin', 0))

        tileset = cls(name, tile_width, tile_height, firstgid,
                      spacing, margin)

        for c in tag.getchildren():
            if c.tag == "image":
                # create a tileset
                tileset.add_image(tilemap.file_path, c.attrib['source'])
            elif c.tag == 'tile':
                gid = tileset.firstgid + int(c.attrib['id'])
        return tileset
def _create_if_not_exists(self):
        if exists(self._file_path):
        dist_file = join(dirname(__file__), '..', 'resources', 'config-dist.json')
        copy(dist_file, self._file_path)
def __load(config, location):
        if exists(location) and isfile(location):
                    config, load_commented_json(location))
                LOG.debug("Configuration '%s' loaded" % location)
            except Exception, e:
                LOG.error("Error loading configuration '%s'" % location)
            LOG.debug("Configuration '%s' not found" % location)
        return config
def download(self, url):

        working_dir = self.working_dir if self.working_dir else ''

        r = Resource()

        # For local files, don't download, just reference in place.
        if url.scheme == 'file':
            r.cache_path = Url(url.resource_url).path
            r.download_time = None

            # Many places the file may exist
            locations = {  # What a mess ...
                abspath(join(working_dir, r.cache_path)),

            for l in locations:
                if exists(l):
                    r.sys_path = l
                raise DownloadError(("File resource does not exist. Found none of:"
                                     "\n{}\n\nWorking dir = {}\ncache_path={}\nspec_path={}")
                                    .format('\n'.join(locations), working_dir, r.cache_path, url.path))

            # Not a local file, so actually need to download it.
                r.cache_path, r.download_time = self._download_with_lock(url.resource_url)
            except AccessError as e:
                # Try again, using a URL that we may have configured an account for. This is
                # primarily S3 urls, with Boto or AWS credential
                    r.cache_path, r.download_time = self._download_with_lock(url.auth_resource_url)
                except AttributeError:
                    raise e

            r.sys_path = self.cache.getsyspath(r.cache_path)

        return r
def _download(self, url, cache_path):
        import requests

        def copy_callback(read, total):
            if self.callback:
                self.callback('copy_file', read, total)

        if self.callback:
            self.callback('download', url, 0)

        if url.startswith('s3:'):

            from appurl.url import Url

            s3url = Url(url)

                with, 'wb') as f:
            except Exception as e:
                raise DownloadError("Failed to fetch S3 url '{}': {}".format(url, e))

        elif url.startswith('ftp:'):
            from contextlib import closing

            with closing(urlopen(url)) as fin:

                with, 'wb') as fout:

                    read_len = 16 * 1024
                    total_len = 0
                    while 1:
                        buf =
                        if not buf:
                        total_len += len(buf)

                        if self.callback:
                            copy_callback(len(buf), total_len)


                r = requests.get(url, stream=True)
            except SSLError as e:
                raise DownloadError("Failed to GET {}: {} ".format(url, e))

            # Requests will auto decode gzip responses, but not when streaming. This following
            # monkey patch is recommended by a core developer at
            if r.headers.get('content-encoding') == 'gzip':
       = functools.partial(, decode_content=True)

            with, 'wb') as f:
                copy_file_or_flo(r.raw, f, cb=copy_callback)

            assert self.cache.exists(cache_path)