Python os.path 模块,getsize() 实例源码


项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def dirsize(src):
    """
    Return the combined size, in bytes, of the regular files found directly
    inside the directory ``src``.

    Only the top level of ``src`` is listed; sub-directories are not
    descended into.

    Returns 0 when ``src`` is not a directory, and None when the size can't
    be properly calculated (an OSError/IOError was raised while reading it).
    """
    if not isdir(src):
        # Nothing to return
        return 0

    try:
        # Work from inside the directory so the bare names returned by
        # listdir('.') resolve correctly.
        with pushd(src, create_if_missing=False):
            size = sum(getsize(f) for f in listdir('.') if isfile(f))

    except (OSError, IOError):
        return None

    # Return our total size
    return size
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def open(self):
        # Opens self.path in binary mode, then stores file metadata and the
        # nomenclature/factory/layout helper objects on the instance.
        # NOTE(review): the prose lines below look like a docstring whose
        # triple-quote delimiters were lost; as shown they are not valid Python.
        Setup the internal structure.

        NB : Call this function before 
        extracting data from a file.
        # NOTE(review): the body of this `if` is missing from this listing.
        if self.file :
        try :
            self.file = open(self.path, 'rb')
        except Exception as e:
            raise Exception("python couldn't open file %s : %s" % (self.path, e))
        # NOTE(review): the three calls below are cut off after the opening
        # parenthesis; presumably they take self.path - confirm against upstream.
        self.file_size = path.getsize(
        self.creation_date = datetime.fromtimestamp(path.getctime(
        self.modification_date = datetime.fromtimestamp(path.getmtime(
        self.nomenclature = self.get_nomenclature()
        self.factory = self.get_factory()
        self.layout = self.create_layout()
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __mmap_ncs_packet_headers(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet headers.

    The file is mapped as rows of 261 little-endian 32-bit words, skipping
    the first 16384 bytes. Reading a standard dtype improves speed, but the
    timestamps need to be rebuilt from the first two words of each row.

    :param filename: file name relative to ``self.sessiondir``
    :return: tuple ``(timestamps, header_u4)``, or None when the file is
             not larger than 16384 bytes
    """
    ncs_path = self.sessiondir + sep + filename
    filesize = getsize(ncs_path)  # in byte
    if filesize <= 16384:
        return None

    # NOTE(review): dtype '<u4' is inferred from the "/ 4" in the shape and
    # from the `header_u4` name - confirm against the upstream source.
    # Floor division keeps the shape integral; true division yields a float
    # on Python 3, which numpy rejects as a dimension.
    data = np.memmap(ncs_path, dtype='<u4',
                     shape=((filesize - 16384) // 4 // 261, 261),
                     mode='r', offset=16384)

    ts = data[:, 0:2]
    # One [1, 2**32] weight row per record: low word + high word * 2**32
    multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data), axis=0)
    timestamps = np.sum(ts * multi, axis=1)
    header_u4 = data[:, 2:5]

    return timestamps, header_u4
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __mmap_ncs_packet_timestamps(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet timestamps.

    The file is mapped as rows of 261 little-endian 32-bit words, skipping
    the first 16384 bytes. Reading a standard dtype improves speed, but the
    timestamps need to be rebuilt from the first two words of each row.

    :param filename: file name relative to ``self.sessiondir``
    :return: array with one timestamp per row, or None when the file is
             not larger than 16384 bytes
    """
    ncs_path = self.sessiondir + sep + filename
    filesize = getsize(ncs_path)  # in byte
    if filesize <= 16384:
        return None

    # NOTE(review): dtype '<u4' is inferred from the "/ 4" in the shape -
    # confirm against the upstream source.
    data = np.memmap(ncs_path, dtype='<u4',
                     shape=((filesize - 16384) // 4 // 261, 261),
                     mode='r', offset=16384)

    ts = data[:, 0:2]
    # One [1, 2**32] weight row per record: low word + high word * 2**32
    multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data), axis=0)
    timestamps = np.sum(ts * multi, axis=1)

    return timestamps
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def open(self):
        # Opens self.path in binary mode, then stores file metadata and the
        # nomenclature/factory/layout helper objects on the instance.
        # NOTE(review): the prose lines below look like a docstring whose
        # triple-quote delimiters were lost; as shown they are not valid Python.
        Setup the internal structure.

        NB : Call this function before 
        extracting data from a file.
        # NOTE(review): the body of this `if` is missing from this listing.
        if self.file :
        try :
            self.file = open(self.path, 'rb')
        except Exception as e:
            raise Exception("python couldn't open file %s : %s" % (self.path, e))
        # NOTE(review): the three calls below are cut off after the opening
        # parenthesis; presumably they take self.path - confirm against upstream.
        self.file_size = path.getsize(
        self.creation_date = datetime.fromtimestamp(path.getctime(
        self.modification_date = datetime.fromtimestamp(path.getmtime(
        self.nomenclature = self.get_nomenclature()
        self.factory = self.get_factory()
        self.layout = self.create_layout()
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __mmap_ncs_packet_headers(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet headers.

    The file is mapped as rows of 261 little-endian 32-bit words, skipping
    the first 16384 bytes. Reading a standard dtype improves speed, but the
    timestamps need to be rebuilt from the first two words of each row.

    :param filename: file name relative to ``self.sessiondir``
    :return: tuple ``(timestamps, header_u4)``, or None when the file is
             not larger than 16384 bytes
    """
    ncs_path = self.sessiondir + sep + filename
    filesize = getsize(ncs_path)  # in byte
    if filesize <= 16384:
        return None

    # NOTE(review): dtype '<u4' is inferred from the "/ 4" in the shape and
    # from the `header_u4` name - confirm against the upstream source.
    # Floor division keeps the shape integral; true division yields a float
    # on Python 3, which numpy rejects as a dimension.
    data = np.memmap(ncs_path, dtype='<u4',
                     shape=((filesize - 16384) // 4 // 261, 261),
                     mode='r', offset=16384)

    ts = data[:, 0:2]
    # One [1, 2**32] weight row per record: low word + high word * 2**32
    multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data), axis=0)
    timestamps = np.sum(ts * multi, axis=1)
    header_u4 = data[:, 2:5]

    return timestamps, header_u4
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __mmap_ncs_packet_timestamps(self, filename):
    """
    Memory map of the Neuralynx .ncs file optimized for extraction of
    data packet timestamps.

    The file is mapped as rows of 261 little-endian 32-bit words, skipping
    the first 16384 bytes. Reading a standard dtype improves speed, but the
    timestamps need to be rebuilt from the first two words of each row.

    :param filename: file name relative to ``self.sessiondir``
    :return: array with one timestamp per row, or None when the file is
             not larger than 16384 bytes
    """
    ncs_path = self.sessiondir + sep + filename
    filesize = getsize(ncs_path)  # in byte
    if filesize <= 16384:
        return None

    # NOTE(review): dtype '<u4' is inferred from the "/ 4" in the shape -
    # confirm against the upstream source.
    data = np.memmap(ncs_path, dtype='<u4',
                     shape=((filesize - 16384) // 4 // 261, 261),
                     mode='r', offset=16384)

    ts = data[:, 0:2]
    # One [1, 2**32] weight row per record: low word + high word * 2**32
    multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data), axis=0)
    timestamps = np.sum(ts * multi, axis=1)

    return timestamps
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __mmap_nev_file(self, filename):
    """
    Memory map the Neuralynx .nev file.

    Records are mapped read-only with the structured dtype below, skipping
    the first 16384 bytes of the file.

    :param filename: file name relative to ``self.sessiondir``
    :return: structured ``np.memmap`` of event records, or None when the
             file is not larger than 16384 bytes
    """
    nev_dtype = np.dtype([
        ('reserved', '<i2'),
        ('system_id', '<i2'),
        ('data_size', '<i2'),
        ('timestamp', '<u8'),
        ('event_id', '<i2'),
        ('ttl_input', '<i2'),
        ('crc_check', '<i2'),
        ('dummy1', '<i2'),
        ('dummy2', '<i2'),
        ('extra', '<i4', (8,)),
        # 'S128' is the same fixed-width byte string as the old 'a128'
        # type code, which is deprecated in NumPy 2.
        ('event_string', 'S128'),
    ])

    nev_path = self.sessiondir + sep + filename
    if getsize(nev_path) > 16384:
        return np.memmap(nev_path, dtype=nev_dtype, mode='r', offset=16384)
    return None
项目:PyGPS    作者:gregstarr    | 项目源码 | 文件源码
def rinexobs(obsfn,writeh5=None,maxtimes=None):
    # Loads observation data either from a raw text file (extension ending in
    # 'o') or from an HDF5 file ('.h5', key 'OBS') and returns it as `data`.
    # NOTE(review): `maxtimes` is not used anywhere in the visible body.
    # NOTE(review): `data` is only bound in the two branches below, so any
    # other extension reaches `return data` with the name unbound.
    stem,ext = splitext(expanduser(obsfn))
    # NOTE(review): ext[-1] raises IndexError when obsfn has no extension.
    if ext[-1].lower() == 'o': #raw text file
        with open(obsfn,'r') as f:
            # NOTE(review): the right-hand side of this assignment is missing
            # from this listing (presumably the file's lines - confirm).
            lines =
            header,version,headlines,obstimes,sats,svset = scan(lines)
            print('{} is a RINEX {} file, {} kB.'.format(obsfn,version,getsize(obsfn)/1000.0))
            data = processBlocks(lines,header,obstimes,svset,headlines,sats)
            # NOTE(review): `t` is not defined in the visible code; a start-time
            # assignment appears to have been dropped.
            print("finished in {0:.2f} seconds".format(time.time()-t))
    #%% save to disk (optional)
        if writeh5:
            h5fn = stem + '.h5'
            print('saving OBS data to {}'.format(h5fn))
            # NOTE(review): no write call is visible after the message above.
    elif ext.lower() == '.h5':
        data = read_hdf(obsfn,key='OBS')
        # NOTE(review): `blocks` is not defined in the visible code.
        print('loaded OBS data from {} to {}'.format(blocks.items[0],blocks.items[-1]))
    return data

# this will scan the document for the header info and for the line on
# which each block starts
项目:ssbio    作者:SBRG    | 项目源码 | 文件源码
def test_write_gff_file(self, seqprop_with_i, tmpdir):
    """Writing the GFF file should leave the features loaded from that file."""
    gff_name = 'test_seqprop_with_i_write_gff_file.gff'
    outpath = tmpdir.join(gff_name).strpath
    seqprop_with_i.write_gff_file(outfile=outpath, force_rerun=True)

    # The file must exist on disk and be non-empty
    assert op.exists(outpath)
    assert op.getsize(outpath) > 0

    # The path attributes must all point at the file just written
    assert seqprop_with_i.feature_path == outpath
    assert seqprop_with_i.feature_file == gff_name
    assert seqprop_with_i.feature_dir == tmpdir

    # Once backed by a file, the features may no longer be reassigned
    with pytest.raises(ValueError):
        seqprop_with_i.features = ['NOFEATURES']
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def clear_cache(force = False):
  """
  If the folder exists, and has more than 5MB of icons in the cache, delete
  it to clear all the icons then recreate it.

  When ``force`` is true the folder is deleted regardless of its size. The
  folder is created if it does not exist when the function finishes.
  """
  from os.path import getsize, join, isfile, exists
  from os import makedirs, listdir
  from sublime import cache_path
  from shutil import rmtree

  # The icon cache path
  icon_path = join(cache_path(), "GutterColor")

  # The maximum amount of space to take up
  limit = 5242880 # 5 MB

  if exists(icon_path):
    # Only regular files directly inside the folder are counted
    size = sum(getsize(join(icon_path, f)) for f in listdir(icon_path) if isfile(join(icon_path, f)))
    if force or (size > limit): rmtree(icon_path)

  if not exists(icon_path): makedirs(icon_path)
项目:sqlalchemy-media    作者:pylover    | 项目源码 | 文件源码
def test_open(self):
        # Puts a sample file into an SSH-backed store, checks the reported
        # length and that the file exists under self.temp_path, then exercises
        # reading and writing through the store.
        store = self.create_ssh_store()
        target_filename = 'sample_text_file1.txt'
        with open(self.sample_text_file1, 'rb') as f:
            length = store.put(target_filename, f)
        self.assertEqual(length, getsize(self.sample_text_file1))
        self.assertTrue(exists(join(self.temp_path, target_filename)))

        # NOTE(review): in the three `with` statements below the call before
        # ", mode=...)" is missing from this listing (presumably an open call on
        # the store - confirm), and the bodies/first assertEqual argument are
        # missing too.
        # Reading
        with, mode='rb') as stored_file, \
                open(self.sample_text_file1, mode='rb') as original_file:

        # Writing
        new_content = b'Some Binary Data'
        with, mode='wb') as stored_file:

        with, mode='rb') as stored_file:
            self.assertEqual(, new_content)
项目:sqlalchemy-media    作者:pylover    | 项目源码 | 文件源码
def test_open(self):
        # Puts a sample file into a FileSystemStore rooted at self.temp_path,
        # checks the reported length and that the file exists, then exercises
        # reading and writing through the store.
        store = FileSystemStore(self.temp_path, self.base_url)
        target_filename = 'test_open/sample_text_file1.txt'
        with open(self.sample_text_file1, 'rb') as f:
            length = store.put(target_filename, f)
        self.assertEqual(length, getsize(self.sample_text_file1))
        self.assertTrue(exists(join(self.temp_path, target_filename)))

        # NOTE(review): in the three `with` statements below the call before
        # ", mode=...)" is missing from this listing (presumably an open call on
        # the store - confirm), and the bodies/first assertEqual argument are
        # missing too.
        # Reading
        with, mode='rb') as stored_file, \
                open(self.sample_text_file1, mode='rb') as original_file:

        # Writing
        new_content = b'Some Binary Data'
        with, mode='wb') as stored_file:

        with, mode='rb') as stored_file:
            self.assertEqual(, new_content)
项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def addService(self, service):
        # Records metadata about `service` on this object: description,
        # creation time, name, channel name, input path, file size and length.
        from os import path
        from enigma import eServiceCenter, iServiceInformation
        from ServiceReference import ServiceReference
        # NOTE(review): `time` is imported but not used in the visible body.
        from time import localtime, time
        self.source = service
        serviceHandler = eServiceCenter.getInstance()
        # NOTE(review): the right-hand side is missing from this listing;
        # presumably it is obtained from serviceHandler - confirm upstream.
        info =
        sDescr = info and info.getInfoString(service, iServiceInformation.sDescription) or ""
        self.DVBdescr = sDescr
        # NOTE(review): the `info and ...` guards tolerate a falsy `info`, but
        # the calls below use `info` unguarded.
        sTimeCreate = info.getInfo(service, iServiceInformation.sTimeCreate)
        if sTimeCreate > 1:
            self.timeCreate = localtime(sTimeCreate)
        serviceref = ServiceReference(info.getInfoString(service, iServiceInformation.sServiceref))
        # `+` binds tighter than `or`, so the fallback is "Title" + sDescr.
        name = info and info.getName(service) or "Title" + sDescr
        self.DVBname = name
        self.DVBchannel = serviceref.getServiceName()
        self.inputfile = service.getPath()
        self.filesize = path.getsize(self.inputfile)
        # The estimate starts out equal to the size of the input file.
        self.estimatedDiskspace = self.filesize
        self.length = info.getLength(service)
项目:better-apidoc    作者:goerz    | 项目源码 | 文件源码
def shall_skip(module, opts):
    # type: (unicode, Any) -> bool
    """Return True when *module* should be left out.

    A module is skipped when it is missing (and implicit namespaces are
    off), when the file holds at most two bytes, or when its file name is
    private and private modules were not requested.
    """
    present = path.exists(module)

    # missing file, and implicit namespaces are not in use
    if not opts.implicit_namespaces and not present:
        return True

    # nothing in the file (or just \n or \r\n)
    if present and path.getsize(module) <= 2:
        return True

    # "private" name, unless private modules were asked for
    filename = path.basename(module)
    is_private = filename != '' and filename.startswith('_')
    return is_private and not opts.includeprivate
项目:cos-python-sdk-v4    作者:tencentyun    | 项目源码 | 文件源码
def get_sha1_by_slice(file_name, slice_size):
        # Builds a list of {"offset", "datalen", "datasha"} dicts, one per
        # slice_size-sized slice of the file, and replaces the last entry's
        # "datasha" with sha1_obj.hexdigest().
        # NOTE(review): the docstring below is never closed in this listing.
        # NOTE(review): the file is opened as `f` but no read from it and no
        # update of sha1_obj is visible inside the loop; those lines appear to
        # have been dropped - confirm upstream.
        # NOTE(review): result[-1] raises IndexError for an empty file, because
        # range(0, 0, slice_size) yields no slices.
        # NOTE(review): the docstring says "slice size in bit", but the value is
        # compared against path.getsize(), which is in bytes.
        """ Get SHA array based on Qcloud Slice Upload Interface

        :param file_name: local file path
        :param slice_size: slice size in bit
        :return: sha array like [{“offset”:0, “datalen”:1024,”datasha”:”aaa”}, {}, {}]
        from os import path

        with open(file_name, 'rb') as f:

            result = []
            file_size = path.getsize(file_name)
            sha1_obj = Sha1Hash()
            for current_offset in range(0, file_size, slice_size):

                data_length = min(slice_size, file_size - current_offset)
                sha1_val = sha1_obj.inner_digest()
                result.append({"offset": current_offset, "datalen": data_length, "datasha": sha1_val})

            result[-1]['datasha'] = sha1_obj.hexdigest()
            return result
项目:rupo    作者:IlyaGusev    | 项目源码 | 文件源码
def tqdm_open(filename, encoding='utf8'):
    """
    Open a text file and yield a line iterator that reports progress
    through a tqdm bar.

    The bar's total is the file size in bytes. It is advanced in steps of
    roughly 1 MiB rather than per line, and once more when the file is
    exhausted.

    NOTE(review): this is a generator that yields exactly once, so it is
    presumably wrapped with ``contextlib.contextmanager`` by a decorator
    outside this snippet - confirm.

    :param filename: path of the file to read
    :param encoding: text encoding used to open the file
    """
    total = getsize(filename)

    def wrapped_line_iterator(fd):
        with tqdm(total=total, unit="B", unit_scale=True, desc=basename(filename), miniters=1) as pb:
            processed_bytes = 0
            for line in fd:
                # NOTE(review): len(line) counts characters, not bytes, so
                # the bar under-reports for multi-byte encodings.
                processed_bytes += len(line)
                if processed_bytes >= 1024 * 1024:
                    # Flush the accumulated amount to the bar before the
                    # counter is reset.
                    pb.update(processed_bytes)
                    processed_bytes = 0
                yield line
            # Report whatever is left below the 1 MiB threshold
            pb.update(processed_bytes)

    with open(filename, encoding=encoding) as fd:
        yield wrapped_line_iterator(fd)
项目:mingwpy    作者:mingwpy    | 项目源码 | 文件源码
def hashsize(path):
  """
  Generate SHA-1 hash + file size string for the given
  filename path. Used to check integrity of downloads.
  Resulting string is space separated 'hash size', for example
  'fbb498a1d3a3a47c8c1ad5425deb46b635fac2eb 2006'.

  :param path: path of the file to hash
  :return: '<sha1 hex digest> <size in bytes>'
  """
  size = getsize(path)
  h = sha1()
  with open(path, 'rb') as source:
    # read in 64k blocks, because some files are too big
    # and free memory is not enough
    for c in iter(lambda: source.read(64*1024), b''):
      h.update(c)
  return '%s %s' % (h.hexdigest(), size)
项目:parglare    作者:igordejanovic    | 项目源码 | 文件源码
def timeit(parser_class, file_name, message, **kwargs):
    # Prints the size of a file from the 'test_inputs' folder next to this
    # module, builds a parser from a grammar file, and prints the elapsed time
    # and speed for the timed section.
    print(message, 'File:', file_name)
    file_name = join(dirname(__file__), 'test_inputs', file_name)
    file_size = getsize(file_name)
    print('File size: {:.2f}'.format(file_size/1000), 'KB')

    this_folder = dirname(__file__)
    # NOTE(review): the grammar file name is an empty string in this listing;
    # it may have been stripped - confirm upstream.
    g = Grammar.from_file(join(this_folder, ''))
    parser = parser_class(g, **kwargs)

    t_start = time.time()
    # NOTE(review): the body of this `with` (presumably the parse call being
    # timed) is missing from this listing.
    with open(file_name) as f:
    t_end = time.time()

    print('Elapsed time: {:.2f}'.format(t_end - t_start), 'sec')
    # NOTE(review): the call below is cut off before its closing arguments.
    print('Speed = {:.2f}'.format(file_size/1000/(t_end - t_start)),
项目:HtmlExtract-Python    作者:xinyi-spark    | 项目源码 | 文件源码
def judge_twice_url(self, save_path, save_type):
        # Inspects save_path/main.html; when its title is 'Page has moved' or
        # the file is at most 3072 bytes, looks up a redirect URL in the page
        # content via self.get_twice_page_url().
        # NOTE(review): the two lines below are a docstring whose delimiters
        # were lost and whose non-ASCII text was replaced by '?' characters.
        ???????????? ?? title ? mian.html????????????????
        1?title??? Page has moved
        page_path = pjoin(save_path, 'main.html')
        url_path = pjoin(save_path, 'url_file')
        with open(page_path, 'r') as f:
            # NOTE(review): right-hand side missing (presumably the file
            # contents - confirm).
            content =
            title = get_title(content)
        if title == 'Page has moved' or getsize(page_path) <= 3072:
            with open(url_path, 'r') as f:
                # NOTE(review): right-hand side missing from this listing.
                url =
            redict_url = self.get_twice_page_url(content)
            # ???????
            if redict_url:
                if redict_url.find('http') != -1:
                        # NOTE(review): the two lines below are the argument
                        # tails of calls whose beginnings are missing.
                        "redict_url: %s" % (redict_url, ))
                        redict_url, save_path, save_type)
项目:enigma2    作者:Openeight    | 项目源码 | 文件源码
def addService(self, service):
        # Records metadata about `service` on this object: description,
        # creation time, name, channel name, input path, file size and length.
        from os import path
        from enigma import eServiceCenter, iServiceInformation
        from ServiceReference import ServiceReference
        # NOTE(review): `time` is imported but not used in the visible body.
        from time import localtime, time
        self.source = service
        serviceHandler = eServiceCenter.getInstance()
        # NOTE(review): the right-hand side is missing from this listing;
        # presumably it is obtained from serviceHandler - confirm upstream.
        info =
        sDescr = info and info.getInfoString(service, iServiceInformation.sDescription) or ""
        self.DVBdescr = sDescr
        # NOTE(review): the `info and ...` guards tolerate a falsy `info`, but
        # the calls below use `info` unguarded.
        sTimeCreate = info.getInfo(service, iServiceInformation.sTimeCreate)
        if sTimeCreate > 1:
            self.timeCreate = localtime(sTimeCreate)
        serviceref = ServiceReference(info.getInfoString(service, iServiceInformation.sServiceref))
        # `+` binds tighter than `or`, so the fallback is "Title" + sDescr.
        name = info and info.getName(service) or "Title" + sDescr
        self.DVBname = name
        self.DVBchannel = serviceref.getServiceName()
        self.inputfile = service.getPath()
        self.filesize = path.getsize(self.inputfile)
        # The estimate starts out equal to the size of the input file.
        self.estimatedDiskspace = self.filesize
        self.length = info.getLength(service)
项目:pbtranscript    作者:PacificBiosciences    | 项目源码 | 文件源码
def as_contigset(fasta_file, xml_file):
    # Wraps fasta_file in a ContigSet. When xml_file is None or equal to
    # fasta_file, a missing or empty FASTA yields an empty ContigSet();
    # otherwise the set is built with generateIndices=True.
    # NOTE(review): xml_file is only compared, never written to, in the visible
    # body.
    if fasta_file == xml_file or xml_file is None:
        if not op.isfile(fasta_file) or op.getsize(fasta_file) == 0:
            return ContigSet()
        return ContigSet(fasta_file)
    file_size = op.getsize(fasta_file)

    fai_file = fasta_file + ".fai"
    # NOTE(review): the body of this `if` is missing from this listing.
    if op.exists(fai_file):

    ds = ContigSet(fasta_file, generateIndices=True)
    if not file_size > 0:
        # NOTE(review): the body of this `with` is missing; for an empty FASTA
        # the .fai file is opened for writing here.
        with open(fai_file, "w") as fai:
    return ds
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """
    Build 'dlink_ftp.dlink.eu_filelist.csv' from the tab-separated listing
    in 'dlink_ftp.dlink.eu_filelist.txt'.

    Each input line holds ``ftp_url<TAB>file_size<TAB>file_date``. The file
    is expected to have been downloaded already to 'output/D-Link/' under
    the last path component of its URL; size, SHA-1 and MD5 are taken from
    that local copy, so the size column of the listing is ignored.
    """
    with open('dlink_ftp.dlink.eu_filelist.csv', 'w') as fout:
        cw = csv.writer(fout, dialect='excel')
        cw.writerow(['ftp_url', 'file_size', 'file_date', 'model', 'file_sha1', 'file_md5'])
        with open('dlink_ftp.dlink.eu_filelist.txt', 'r') as fin:
            for line in fin:
                # Lines read from a file keep their newline, so a plain
                # emptiness test never fires; skip blank lines explicitly
                # instead of failing in the unpacking below.
                if not line.strip():
                    continue
                ftpurl, fsize, fdate = line.split('\t', 2)
                fdate = datetime.fromtimestamp(float(fdate))
                fname = 'output/D-Link/' + ftpurl.split('/')[-1]
                sha1 = getFileSha1(fname)
                md5 = getFileMd5(fname)
                # Size of the local copy replaces the size from the listing
                fsize = path.getsize(fname)
                model = get_model_from_ftp_url(ftpurl)
                cw.writerow([ftpurl, fsize, fdate, model,sha1,md5])
项目:gobble    作者:openspending    | 项目源码 | 文件源码
def filedata(self):
        # Builds per-file metadata (name, length, md5, type) for every resource
        # in `self`, keyed by the resource's descriptor path, plus a similar
        # entry for the descriptor file itself keyed by its base name.
        # NOTE(review): the closing braces of the literals below and the end of
        # the returned dict are missing from this listing.
        filedata = {
            resource.descriptor['path']: {
                'name': resource.descriptor['name'],
                'length': getsize(resource.source),
                'md5': compute_hash(resource.source),
                # Falls back to 'text/<extension>' when no mediatype is declared
                'type': resource.descriptor.get('mediatype', 'text/'+resource.descriptor['path'].split('.')[-1]),
            } for resource in self
        descriptor_file = {
            basename(self.filepath): {
                'length': getsize(self.filepath),
                'md5': compute_hash(self.filepath),
                'type': 'application/octet-stream',
        return {
            'filedata': filedata,
            'metadata': {
项目:Cayenne-Agent    作者:myDevicesIoT    | 项目源码 | 文件源码
def TestDownload(self):
            # Downloads a test file and stores size/elapsed-seconds in
            # self.downloadSpeed; returns True on success and False otherwise.
            # NOTE(review): the `try:` that pairs with the `except` below, and
            # the definitions of download_path, defaultUrl and mb, are not
            # visible in this listing.
            # NOTE(review): right-hand side missing (presumably a timestamp,
            # since `c.total_seconds()` is used below - confirm).
            a =
            info('Excuting regular download test for network speed')
            url = self.config.cloudConfig.DownloadSpeedTestUrl if 'DownloadSpeedTestUrl' in self.config.cloudConfig else defaultUrl
            debug(url + ' ' + download_path)
            request.urlretrieve(url, download_path)
            # NOTE(review): right-hand side missing, as for `a` above.
            b =
            c = b - a
            if path.exists(download_path):
                size = path.getsize(download_path)/mb
                self.downloadSpeed = size/c.total_seconds()
                return True
        except socket_error as serr:
                error ('TestDownload:' + str(serr))
                # NOTE(review): `ret` is assigned but never read in the visible
                # code.
                ret = False
                Daemon.OnFailure('cloud', serr.errno)
            # NOTE(review): the clause header for the line below (likely a
            # generic `except:`) is missing from this listing.
            exception('TestDownload Failed')
        return False
项目:Cayenne-Agent    作者:myDevicesIoT    | 项目源码 | 文件源码
def rotator(source, dest):
        # Log rotation hook; removes old myDevices.log backups and reports any
        # failure by printing it.
        # NOTE(review): the `try:` that pairs with the `except` below and the
        # assignment of `size` are missing from this listing.
        # print('Log rotator, pid:' + str(getpid()))
        if size > 100000000:
            # files larger than 100MB will be deleted
            # NOTE(review): the call before ', "w:bz2")' is missing; the line is
            # at the wrong nesting level for the comment above, so further lines
            # were probably dropped as well.
            tar =, "w:bz2")
        # Remove old myDevices.log backups if they are older than a week. This code can be removed
        #  in later versions if myDevices.log files have been replaced with cayenne.log.
        for old_file in iglob('/var/log/myDevices/myDevices.log*'):
            # 604800 seconds = 7 days
            # NOTE(review): the body of this `if` is missing from this listing.
            if path.getmtime(old_file) + 604800 < time.time(): 
    except Exception as ex:
        print('Log rotator failed with: ' +str(ex))
项目:enigma2    作者:BlackHole    | 项目源码 | 文件源码
def addService(self, service):
        # Records metadata about `service` on this object: description,
        # creation time, name, channel name, input path, file size and length.
        from os import path
        from enigma import eServiceCenter, iServiceInformation
        from ServiceReference import ServiceReference
        from time import localtime

        self.source = service
        serviceHandler = eServiceCenter.getInstance()
        # NOTE(review): the right-hand side is missing from this listing;
        # presumably it is obtained from serviceHandler - confirm upstream.
        info =
        sDescr = info and info.getInfoString(service, iServiceInformation.sDescription) or ""
        self.DVBdescr = sDescr
        # NOTE(review): the `info and ...` guards tolerate a falsy `info`, but
        # the calls below use `info` unguarded.
        sTimeCreate = info.getInfo(service, iServiceInformation.sTimeCreate)
        if sTimeCreate > 1:
            self.timeCreate = localtime(sTimeCreate)
        serviceref = ServiceReference(info.getInfoString(service, iServiceInformation.sServiceref))
        # `+` binds tighter than `or`, so the fallback is "Title" + sDescr.
        name = info and info.getName(service) or "Title" + sDescr
        self.DVBname = name
        self.DVBchannel = serviceref.getServiceName()
        self.inputfile = service.getPath()
        self.filesize = path.getsize(self.inputfile)
        # The estimate starts out equal to the size of the input file.
        self.estimatedDiskspace = self.filesize
        self.length = info.getLength(service)
项目:rapidserv    作者:iogf    | 项目源码 | 文件源码
def locate(self, spin, request):
        # Resolves the requested file by its base name, adds Content-Type and
        # Content-Length headers to `spin`, and hands the file to drop().
        # NOTE(review): the first two arguments of join() are missing from this
        # listing.
        path = join(,, basename(request.path))
        # NOTE(review): the body of this `if` is missing from this listing.
        if not isfile(path):

        # Where we are going to serve files.
        # I might spawn an event like FILE_NOT_FOUND.
        # So, users could use it to send appropriate answers.
        type_file, encoding = guess_type(path)
        default_type = 'application/octet-stream'

        # Falls back to the generic binary type when the type cannot be guessed
        spin.add_header(('Content-Type', type_file if type_file else default_type),
                     ('Content-Length', getsize(path)))

        # On OPEN_FILE_ERR the handler calls lose() on the connection
        xmap(spin, OPEN_FILE_ERR, lambda con, err: lose(con))
        drop(spin, path)
项目:smugmugv2py    作者:adhawkins    | 项目源码 | 文件源码
def upload_image(self, filename, album_uri, caption=None, title=None, keywords=None):
        # Posts an image file to self.UPLOAD_URL with X-Smug-* headers built
        # from the arguments and returns the result of self.raw_post().
        # NOTE(review): the closing brace of `headers` is missing from this
        # listing.
        headers = {
            'User-Agent': self.__user_agent,
            'X-Smug-ResponseType': 'JSON',
            'X-Smug-Version': 'v2',
            # guess_type() returns None as the type for unknown extensions
            'Content-Type': guess_type(filename)[0],
            'X-Smug-AlbumUri': album_uri,
            'X-Smug-FileName': filename,
            'Content-Length': path.getsize(filename),

        # NOTE(review): the bodies of the three `if` statements below are
        # missing (presumably each adds an optional header - confirm).
        if caption is not None:

        if title is not None:

        if keywords is not None:

        with open(filename, "rb") as f:
            # NOTE(review): `data` is not assigned in the visible code
            # (presumably read from `f` - confirm).
            return self.raw_post(self.UPLOAD_URL, data=data, headers=headers)
项目:seaworthy    作者:praekeltfoundation    | 项目源码 | 文件源码
def shall_skip(module, opts):
    # type: (unicode, Any) -> bool
    """Return True when *module* should be left out.

    A module is skipped when it is missing (and implicit namespaces are
    off), when the file holds at most two bytes, or when its file name is
    private and private modules were not requested.
    """
    present = path.exists(module)

    # missing file, and implicit namespaces are not in use
    if not opts.implicit_namespaces and not present:
        return True

    # nothing in the file (or just \n or \r\n)
    if present and path.getsize(module) <= 2:
        return True

    # "private" name, unless private modules were asked for
    filename = path.basename(module)
    is_private = filename != '' and filename.startswith('_')
    return is_private and not opts.includeprivate
项目:hacker-scripts    作者:restran    | 项目源码 | 文件源码
def traverse_dir(path):
    """Walk ``path`` recursively and collect every file and directory.

    A progress line is printed after every 200 entries (files and
    directories together).

    :param path: root directory to walk
    :return: tuple ``(file_dict, dir_dict)``; ``file_dict`` maps each file
             path to its size in bytes, ``dir_dict`` maps each directory
             path to 0
    """
    file_dict = {}
    dir_dict = {}
    # Start at zero so the progress message reports the number of entries
    # actually scanned (it used to run one ahead).
    count = 0
    for root, dirs, files in walk(path):
        for is_file, names in ((False, dirs), (True, files)):
            for name in names:
                abs_p = join(root, name)
                if is_file:
                    file_dict[abs_p] = getsize(abs_p)
                else:
                    dir_dict[abs_p] = 0
                count += 1
                if count % 200 == 0:
                    print('%s files scanned' % count)

    return file_dict, dir_dict
项目:cavedbmanager    作者:masneyb    | 项目源码 | 文件源码
def do_show_bulletin_attachment(request, bulletin_id, localfile, remotefile):
    """Serve a bulletin attachment from disk.

    :param request: the incoming request (unused)
    :param bulletin_id: bulletin used for the permission check
    :param localfile: path of the file to send
    :param remotefile: download name offered to the client; when set and the
                       file is not an image, the response is marked as an
                       attachment
    :return: a FileResponse with Content-Length set
    :raises Http404: when the bulletin is not allowed, the file does not
                     exist, or it cannot be opened
    """
    #pylint: disable=unused-argument
    if not cavedb.perms.is_bulletin_allowed(bulletin_id):
        raise Http404

    if not isfile(localfile):
        raise Http404

    mimetype = guess_type(localfile)[0]
    if mimetype is None:
        mimetype = "application/octet-stream"

    try:
        wrapper = FileWrapper(open(localfile, 'rb'))
        response = FileResponse(wrapper, content_type=mimetype)

        # mimetype always has a value at this point, so only the image test
        # is needed.
        if remotefile and not mimetype.startswith('image'):
            response['Content-Disposition'] = 'attachment; filename=' + remotefile

        response['Content-Length'] = getsize(localfile)
    except IOError:
        print('Cannot find %s\n' % (localfile), file=sys.stderr)
        raise Http404

    return response
项目:tredparse    作者:humanlongevity    | 项目源码 | 文件源码
def getfilesize(filename, ratio=None):
    """Return the size of a file, estimating the uncompressed size for gzip.

    For names not ending in ".gz" this is the size on disk. For ".gz" files
    it is the ISIZE field stored in the last four bytes of the file, which
    is the uncompressed size modulo 2 ** 32.

    :param filename: path of the file
    :param ratio: optional expected compressed/uncompressed size ratio; when
                  given, 2 ** 32 is added to ISIZE until the result reaches
                  ``rawsize / ratio``, to recover sizes that wrapped around
    :return: size in bytes
    """
    rawsize = op.getsize(filename)
    if not filename.endswith(".gz"):
        return rawsize

    import logging
    import struct

    # ISIZE occupies the final four bytes; the context manager makes sure
    # the handle is closed again.
    with open(filename, 'rb') as fo:
        fo.seek(-4, 2)
        r = fo.read()
    size = struct.unpack('<I', r)[0]
    # This is only ISIZE, which is the UNCOMPRESSED modulo 2 ** 32
    if ratio is None:
        return size

    # Heuristic
    heuristicsize = rawsize / ratio
    while size < heuristicsize:
        size += 2 ** 32
    if size > 2 ** 32:
        # NOTE(review): the logging call and its level are reconstructed;
        # only the message text was visible - confirm upstream.
        logging.warning(
            "Gzip file estimated uncompressed size: {0}.".format(size))

    return size
项目:eoj3    作者:ultmaster    | 项目源码 | 文件源码
def file_manager(request):
    # Admin-only view: a POST saves the uploaded file into settings.UPLOAD_DIR;
    # every request then renders 'filemanager.jinja2' with a list of the
    # regular files in that directory (name, modified time, size in K).
    def slugify(text):
        import re
        # Collapses each run of space / " # ! : and slash characters into '_'
        return re.sub(r'[ /"#!:]+', '_', text)

    if not is_admin_or_root(request.user):
        raise PermissionDenied
    if request.method == 'POST':
            # NOTE(review): the `try:` that pairs with the `except` below is
            # missing, and the slugify(...) argument is cut off.
            file = request.FILES['file']
            save_uploaded_file_to(file, settings.UPLOAD_DIR, filename=slugify(
        except Exception as e:
            # Any upload failure is reported as PermissionDenied
            raise PermissionDenied(repr(e))
    return render(request, 'filemanager.jinja2', context={
        'file_list': list(map(lambda x: {
            'name': x,
            # NOTE(review): the expression below ends in a dangling '.'; the
            # method call that followed is missing from this listing.
            'modified_time': datetime.fromtimestamp(path.getmtime(path.join(settings.UPLOAD_DIR, x))).
            'size': str(path.getsize(path.join(settings.UPLOAD_DIR, x)) // 1024) + "K"
        }, filter(lambda x: path.isfile(path.join(settings.UPLOAD_DIR, x)), listdir(settings.UPLOAD_DIR))))
项目:pisi    作者:examachine    | 项目源码 | 文件源码
def dir_size(dir):
    """ Return the summed size of everything below a directory.

    A path that is a plain file yields that file's size. A symlink, whether
    it is the path itself or a file found during the walk, counts as the
    length of its link target string rather than the size of the target."""
    # Summing file sizes is only an approximation of a package's installed
    # size. 'du' (as Debian uses) was considered, but du reports size on
    # disk, which is not what is wanted here.
    from os.path import getsize, islink, isdir, exists
    join = join_path

    # A symlink is never followed; only its own target string is measured
    if islink(dir):
        return long(len(os.readlink(dir)))

    # Not a link and not a directory: this is just a file
    if exists(dir) and not isdir(dir):
        return getsize(dir)

    total = 0
    for root, dirs, files in os.walk(dir):
        for name in files:
            entry = join(root, name)
            if islink(entry):
                total += long(len(os.readlink(entry)))
            else:
                total += getsize(entry)
    return total
项目:bauhaus    作者:dalexander    | 项目源码 | 文件源码
def __init__(self, inputCsv, resolver):
        # Validates and loads the input CSV into self.tbl (an eztable.Table
        # built from the first row as column names and the remaining rows).
        # NOTE(review): `resolver` is not used in the visible body; lines may be
        # missing after the end of this listing.
        if not op.isfile(inputCsv):
            raise ValueError("Missing input file: %s" % inputCsv)
        # Read at most the first 32 bytes to look for a UTF-8 byte order mark
        nbytes = min(32, op.getsize(inputCsv))
        # NOTE(review): this file handle is never explicitly closed.
        raw= open(inputCsv, 'rb').read(nbytes)
        if raw.startswith(codecs.BOM_UTF8):
            raise TableValidationError("Input CSV file is in UTF-8 format. Please convert to ASCII or remove Byte Order Mark (BOM)")
            # NOTE(review): the indentation suggests the `with` block below sat
            # inside a `try:` and the final `raise` inside an `except` clause;
            # both clause headers are missing from this listing.
            with open(inputCsv) as f:
                cr = csv.reader(f)
                allRows = list(cr)
                columnNames, rows = \
                    allRows[0], allRows[1:]
                self.tbl = eztable.Table(columnNames, rows)
            raise TableValidationError("Input CSV file can't be read/parsed:" + str(sys.exc_info()[0]))
项目:berlyne    作者:rugo    | 项目源码 | 文件源码
def _download_wrapped_file(download):
    # Streams the file at download.abspath back as a forced download, or
    # answers with HTTP_NOT_FOUND when the path is missing or is a symlink.
    download_path = download.abspath
    # We do not allow symlinks as downloads for security reasons
    if not path.exists(download_path) or path.islink(download_path):
        return HttpResponse("Download not found", status=HTTP_NOT_FOUND)
    wrapper = FileWrapper(open(download_path, "rb"))
    response = HttpResponse(wrapper, content_type='application/force-download')
    # NOTE(review): the argument of .format( is missing from this listing
    # (presumably the download's file name - confirm).
    response['Content-Disposition'] = 'attachment; filename="{}"'.format(
    response['Content-Length'] = path.getsize(download_path)
    return response
项目:cos-python3-sdk-v4    作者:a270443177    | 项目源码 | 文件源码
def get_sha1_by_slice(file_name, slice_size):
        # Builds a list of {"offset", "datalen", "datasha"} dicts, one per
        # slice_size-sized slice of the file, and replaces the last entry's
        # "datasha" with sha1_obj.hexdigest().
        # NOTE(review): the docstring below is never closed in this listing.
        # NOTE(review): the file is opened as `f` but no read from it and no
        # update of sha1_obj is visible inside the loop; those lines appear to
        # have been dropped - confirm upstream.
        # NOTE(review): result[-1] raises IndexError for an empty file, because
        # range(0, 0, slice_size) yields no slices.
        # NOTE(review): the docstring says "slice size in bit", but the value is
        # compared against path.getsize(), which is in bytes.
        """ Get SHA array based on Qcloud Slice Upload Interface

        :param file_name: local file path
        :param slice_size: slice size in bit
        :return: sha array like [{“offset”:0, “datalen”:1024,”datasha”:”aaa”}, {}, {}]
        from os import path

        with open(file_name, 'rb') as f:

            result = []
            file_size = path.getsize(file_name)
            sha1_obj = Sha1Hash()
            for current_offset in range(0, file_size, slice_size):

                data_length = min(slice_size, file_size - current_offset)
                sha1_val = sha1_obj.inner_digest()
                result.append({"offset": current_offset, "datalen": data_length, "datasha": sha1_val})

            result[-1]['datasha'] = sha1_obj.hexdigest()
            return result
项目:libSigNetSim    作者:vincent-noel    | 项目源码 | 文件源码
def compile(self, nb_procs):
        # Runs `make` for the "lsa" target (or "lsa.mpi" when nb_procs > 1) and
        # reports OPTIM_FAILURE when make exits non-zero or wrote anything to
        # the captured stderr file.

        if nb_procs > 1:
            target = "lsa.mpi"

            # NOTE(review): an `else:` appears to be missing before this line;
            # as shown it would always overwrite the MPI target.
            target = "lsa"

        # NOTE(review): the tuple of format arguments is missing from this
        # listing.
        cmd_comp = "make -f %sMakefile -C %s %s 1>/dev/null" % (

        # stdout/stderr are captured in files inside the temp directory.
        # NOTE(review): the two file objects opened here are never closed
        # explicitly.
        res_comp = call(cmd_comp,
                                stdout=open("%sout_optim_comp" % self.getTempDirectory(),"w"),
                                stderr=open("%serr_optim_comp" % self.getTempDirectory(),"w"),
                                shell=True, preexec_fn=setpgrp, close_fds=True)

        if res_comp != 0 or getsize(self.getTempDirectory() + "err_optim_comp") > 0:
            return self.OPTIM_FAILURE
            # NOTE(review): an `else:` appears to be missing before this line;
            # as shown it is unreachable.
            return self.OPTIM_SUCCESS
项目:pi-dashcam    作者:amshali    | 项目源码 | 文件源码
def Cleanup(delete_dir, delete_threshold, freeup_amount):
  # When free space in delete_dir (as reported by FreeSpaceMB) is below
  # delete_threshold, walks the regular, non-symlink files of the directory
  # from oldest to newest modification time, accumulating their sizes in MB
  # until freeup_amount is reached.
  # NOTE(review): no delete call is visible in this listing even though
  # "Deleted" is printed; the removal line and the body of the final `if`
  # (presumably a break) appear to have been dropped.
  free_space = FreeSpaceMB(delete_dir)
  if free_space < delete_threshold:
    files = [f for f in map(lambda x: join(delete_dir, x), listdir(delete_dir)) \
      if isfile(f) and not islink(f)]
    # Sort files acsending based on their modification time.
    files.sort(key=lambda f: getmtime(f))
    freed = 0.0
    # Delete enough files to free up enough space that macthes freeup_amount
    for f in files:
      # Size of file in MB
      # NOTE(review): with the Python 2 print statement below, `/` on ints is
      # integer division, so files under 1 MB count as 0 - confirm intent.
      f_size = getsize(f) / 1024 / 1024
      print "Deleted ", f
      freed = freed + f_size
      if freed >= freeup_amount:
项目:enigma2-openpli-fulan    作者:Taapat    | 项目源码 | 文件源码
def addService(self, service):
        # Records metadata about `service` on this object: description,
        # creation time, name, channel name, input path, file size and length.
        from os import path
        from enigma import eServiceCenter, iServiceInformation
        from ServiceReference import ServiceReference
        # NOTE(review): `time` is imported but not used in the visible body.
        from time import localtime, time
        self.source = service
        serviceHandler = eServiceCenter.getInstance()
        # NOTE(review): the right-hand side is missing from this listing;
        # presumably it is obtained from serviceHandler - confirm upstream.
        info =
        sDescr = info and info.getInfoString(service, iServiceInformation.sDescription) or ""
        self.DVBdescr = sDescr
        # NOTE(review): the `info and ...` guards tolerate a falsy `info`, but
        # the calls below use `info` unguarded.
        sTimeCreate = info.getInfo(service, iServiceInformation.sTimeCreate)
        if sTimeCreate > 1:
            self.timeCreate = localtime(sTimeCreate)
        serviceref = ServiceReference(info.getInfoString(service, iServiceInformation.sServiceref))
        # `+` binds tighter than `or`, so the fallback is "Title" + sDescr.
        name = info and info.getName(service) or "Title" + sDescr
        self.DVBname = name
        self.DVBchannel = serviceref.getServiceName()
        self.inputfile = service.getPath()
        self.filesize = path.getsize(self.inputfile)
        # The estimate starts out equal to the size of the input file.
        self.estimatedDiskspace = self.filesize
        self.length = info.getLength(service)
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def __len__(self):
    """Return the length of the content in bytes.

    Without a ``filepath`` the content is probably an in-memory stream
    (StringIO/BytesIO): its length is measured by seeking to the end, and
    the previous position is restored afterwards. Without a stream either,
    nothing has been initialised yet and the length is 0. With a
    ``filepath`` the size of that file on disk is returned.

    NOTE(review): the visible source had lost the receiver of the
    ``tell``/``seek`` calls and both ``else:`` lines; they are restored here
    assuming the stream lives in ``self.stream`` -- confirm the attribute.

    Raises:
        OSError: If the size of ``filepath`` cannot be read.
    """
    if not self.filepath:
        if self.stream:
            # Remember where we are so the caller's position is preserved.
            ptr = self.stream.tell()
            # Jump to the end; read the length with tell() because seek()
            # returns None for Python 2 file and StringIO objects.
            self.stream.seek(0, SEEK_END)
            length = self.stream.tell()
            if length != ptr:
                # Return our pointer
                self.stream.seek(ptr, SEEK_SET)
        else:
            # No stream or filepath; nothing has been initialised yet.
            length = 0
    else:
        if self.stream and self._dirty is True:
            # NOTE(review): flushing before clearing the dirty flag is
            # assumed (the line was missing); it makes getsize() see
            # pending writes.
            self.stream.flush()
            self._dirty = False

        # Get the size
        length = getsize(self.filepath)

    return length
项目:logscan    作者:magedu    | 项目源码 | 文件源码
def __init__(self, filename, counter):
    """Set up the state needed to follow ``filename``.

    Args:
        filename: Path of the file to follow; stored as an absolute path.
        counter: Passed through to ``CheckerChain`` together with the queue.

    If the file already exists it is opened and ``offset`` is set to its
    current size; otherwise ``fd`` stays ``None`` and ``offset`` stays 0.
    """
    self.filename = path.abspath(filename)
    self.queue = Queue()
    self.check_chain = CheckerChain(self.queue, counter)
    # NOTE(review): the visible source fused this statement with the line
    # above (`... = CheckerChain(...) = Observer()`, a syntax error); the
    # attribute name `observer` is assumed -- confirm.
    self.observer = Observer()
    self.fd = None
    self.offset = 0
    if path.isfile(self.filename):
        self.fd = open(self.filename)
        # Start at the current end of the file.
        self.offset = path.getsize(self.filename)
项目:logscan    作者:magedu    | 项目源码 | 文件源码
def on_moved(self, event):
    """Handle a move event that involves the watched file.

    Args:
        event: Object with ``src_path`` and ``dest_path`` attributes.

    When the watched file is the source of the move, the open handle is
    closed. When it is the destination and exists as a regular file, it is
    reopened and ``offset`` is set to its current size.
    """
    if path.abspath(event.src_path) == self.filename:
        # NOTE(review): this branch had no body in the visible source;
        # closing the stale handle is the assumed intent. Guarded because
        # `fd` may still be None.
        if self.fd is not None:
            self.fd.close()
    if path.abspath(event.dest_path) == self.filename and path.isfile(self.filename):
        self.fd = open(self.filename)
        self.offset = path.getsize(self.filename)
项目:logscan    作者:magedu    | 项目源码 | 文件源码
def on_created(self, event):
    """Open the watched file when a creation event refers to it.

    Args:
        event: Object with a ``src_path`` attribute.

    Events for other paths, or for a path that is not a regular file, are
    ignored. Otherwise the file is opened and ``offset`` is set to its
    current size.
    """
    created = path.abspath(event.src_path)
    if created != self.filename or not path.isfile(self.filename):
        return
    self.fd = open(self.filename)
    self.offset = path.getsize(self.filename)
项目:Telebackup    作者:LonamiWebs    | 项目源码 | 文件源码
def valid_file_exists(file):
    """Tell whether ``file`` exists and is "valid", i.e. not empty.

    A size of 0 is treated as invalid (it probably failed due to an RPC
    error).

    Returns:
        True if ``file`` is an existing regular file larger than 0 bytes,
        False otherwise.
    """
    if not path.isfile(file):
        return False
    return path.getsize(file) > 0

项目:twentybn-dl    作者:TwentyBN    | 项目源码 | 文件源码
def needs_download(url, filepath):
    """Decide whether ``url`` still has to be downloaded to ``filepath``.

    Args:
        url: Remote location, queried with an HTTP HEAD request.
        filepath: Local target path.

    Returns:
        True if ``filepath`` does not exist, or if the remote
        ``Content-Length`` is larger than the local file size; False
        otherwise.

    Raises:
        KeyError: If the response carries no ``Content-Length`` header.
    """
    if not op.exists(filepath):
        return True

    # The visible source had lost its `else:` lines, which left everything
    # below unreachable and made an existing file yield None.
    response = requests.head(url)
    remote_size = int(response.headers['Content-Length'])
    local_size = op.getsize(filepath)
    return remote_size > local_size
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __mmap_nse_packets(self, filename):
    """
    Memory map of the Neuralynx .nse file optimized for extraction of
    data packet headers.

    Records are read as 56 little-endian 16-bit words, which improves speed,
    but the multi-word fields need to be reconstructed afterwards.

    NOTE(review): the visible source had lost the 16-bit ``dtype`` argument
    and the ``data[:,`` operand for word 3; both are restored from the
    ``/ 2 / 56`` shape arithmetic and the ``2 ** 16`` multipliers -- confirm.

    Args:
        filename: File name relative to ``self.sessiondir``.

    Returns:
        ``(timestamps, channel_id, cell_number, features, data_points)``,
        or ``None`` when the file is not larger than 16384 bytes.
    """
    full_path = self.sessiondir + sep + filename
    filesize = getsize(full_path)  # in byte
    if filesize <= 16384:
        # Nothing stored beyond the 16384-byte offset.
        return None

    # Floor division: a float shape is rejected by memmap on Python 3.
    data = np.memmap(full_path,
                     dtype='<u2',
                     shape=((filesize - 16384) // 2 // 56, 56),
                     mode='r', offset=16384)

    # Widen explicitly before combining words; multiplying uint16 data by
    # 2 ** 16 and larger only worked through legacy value-based promotion.
    words = data[:, :24].astype(np.uint32)
    ts_words = data[:, 0:4].astype(np.uint64)

    # reconstructing original data
    # first 4 ints -> timestamp in microsec
    timestamps = (ts_words[:, 0] + ts_words[:, 1] * 2 ** 16 +
                  ts_words[:, 2] * 2 ** 32 + ts_words[:, 3] * 2 ** 48)
    channel_id = words[:, 4] + words[:, 5] * 2 ** 16
    cell_number = words[:, 6] + words[:, 7] * 2 ** 16
    # Words 8..23 hold eight 32-bit feature values.
    features = [words[:, p] + words[:, p + 1] * 2 ** 16
                for p in range(8, 23, 2)]
    features = np.array(features, dtype='i4')

    data_points = data[:, 24:56].astype('i2')
    del data
    return timestamps, channel_id, cell_number, features, data_points
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __mmap_ncs_data(self, filename):
    """ Memory map of the Neuralynx .ncs file optimized for data
    extraction.

    Each record is read as 522 int16 values; the first 10 values of every
    record (the data packet header) are dropped.

    NOTE(review): the visible source had lost the end of the ``np.memmap``
    call; ``offset=16384`` is restored to match the size check -- confirm.

    Args:
        filename: File name relative to ``self.sessiondir``.

    Returns:
        A 2-D view with 512 samples per record (not flattened), or ``None``
        when the file is not larger than 16384 bytes.
    """
    full_path = self.sessiondir + sep + filename
    if getsize(full_path) <= 16384:
        # Nothing stored beyond the 16384-byte offset.
        return None

    data = np.memmap(full_path,
                     dtype=np.dtype(('i2', (522))), mode='r',
                     offset=16384)
    # removing data packet headers
    return data[:, 10:]
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __mmap_ntt_file(self, filename):
    """ Memory map the Neuralynx .ntt file.

    Args:
        filename: File name relative to ``self.sessiondir``.

    Returns:
        A read-only structured memmap with one element per record (fields
        ``timestamp``, ``sc_number``, ``cell_number``, ``params`` and
        ``data``), or ``None`` when the file is not larger than 16384 bytes.
    """
    # One record: 8 + 4 + 4 + 8 * 4 + 32 * 4 * 2 = 304 bytes.
    ntt_dtype = np.dtype([
        ('timestamp', '<u8'),
        ('sc_number', '<u4'),
        ('cell_number', '<u4'),
        ('params', '<u4', (8,)),
        ('data', '<i2', (32, 4)),
    ])
    full_path = self.sessiondir + sep + filename
    if getsize(full_path) <= 16384:
        # Nothing stored beyond the 16384-byte offset.
        return None
    return np.memmap(full_path,
                     dtype=ntt_dtype, mode='r', offset=16384)