Python lzma 模块,open() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用lzma.open()

项目:fem    作者:mlp6    | 项目源码 | 文件源码
def read_header(dispout):
    """ Read header (first 3 words) from disp.dat

    :param dispout: disp.dat filename
    :returns: header (num_nodes, num_dims, num_timesteps)

    """
    import struct

    word_size = 4  # bytes
    if dispout.endswith('.xz'):
        import lzma
        d = lzma.open(dispout, 'rb')
    else:
        d = open(dispout, 'rb')
    num_nodes = struct.unpack('f', d.read(word_size))
    num_dims = struct.unpack('f', d.read(word_size))
    num_timesteps = struct.unpack('f', d.read(word_size))
    header = {'num_nodes': int(num_nodes[0]),
              'num_dims': int(num_dims[0]),
              'num_timesteps': int(num_timesteps[0])}
    return header
项目:fem    作者:mlp6    | 项目源码 | 文件源码
def extract_dt(dyn_file):
    """ extract time step (dt) from dyna input deck

    assumes that input deck is comma-delimited

    :param dyn_file: input.dyn filename
    :returns: dt from input.dyn binary data save parameter

    """
    found_database = False
    with open(dyn_file, 'r') as d:
        for dyn_line in d:
            if found_database:
                line_items = dyn_line.split(',')
                # make sure we're not dealing with a comment
                if '$' in line_items[0]:
                    continue
                else:
                    dt = float(line_items[0])
                    break
            elif '*DATABASE_NODOUT' in dyn_line:
                found_database = True

    return dt
项目:jarbas    作者:datasciencebr    | 项目源码 | 文件源码
def save_companies(self):
        """
        Receives path to the dataset file and create a Company object for
        each row of each file. It creates the related activity when needed.
        """
        skip = ('main_activity', 'secondary_activity')
        keys = tuple(f.name for f in Company._meta.fields if f not in skip)
        with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
            for row in csv.DictReader(file_handler):
                main, secondary = self.save_activities(row)

                filtered = {k: v for k, v in row.items() if k in keys}
                obj = Company.objects.create(**self.serialize(filtered))
                for activity in main:
                    obj.main_activity.add(activity)
                for activity in secondary:
                    obj.secondary_activity.add(activity)
                obj.save()

                self.count += 1
                self.print_count(Company, count=self.count)
项目:C2L2    作者:CoNLL-UD-2017    | 项目源码 | 文件源码
def load_embeddings(self, filename, xz=False):
        if not os.path.isfile(filename):
            print(filename, "does not exist")
            return self

        if xz:
            f = lzma.open(filename, "rt", encoding="utf-8", errors="ignore")
        else:
            f = open(filename, "r")
        found_set = set()
        for line in f:
            l = line.split()
            word = strong_normalize(l[0])
            vec = [float(x) for x in l[1:]]
            if word in self._vocab:
                found_set.add(word)
                self._word_lookup.init_row(self._vocab[word], vec)
        f.close()
        print("Loaded embeddings from", filename)
        print(len(found_set), "hits with vocab size of", len(self._vocab))

        return self
项目:udapi-python    作者:udapi    | 项目源码 | 文件源码
def _token_to_filenames(token):
        if token[0] == '!':
            pattern = token[1:]
            filenames = glob.glob(pattern)
            if not filenames:
                raise RuntimeError('No filenames matched "%s" pattern' % pattern)
        elif token[0] == '@':
            filelist_name = sys.stdin if token == '@-' else token[1:]
            with open(filelist_name) as filelist:
                filenames = [line.rstrip('\n') for line in filelist]
            directory = os.path.dirname(token[1:])
            if directory != '.':
                filenames = [f if f[0] != '/' else directory + '/' + f for f in filenames]
        else:
            filenames = token
        return filenames
项目:udapi-python    作者:udapi    | 项目源码 | 文件源码
def next_filehandle(self):
        """Go to the next file and retrun its filehandle or None (meaning no more files)."""
        filename = self.next_filename()
        if filename is None:
            fhandle = None
        elif filename == '-':
            fhandle = sys.stdin
        else:
            filename_extension = filename.split('.')[-1]
            if filename_extension == 'gz':
                myopen = gzip.open
            elif filename_extension == 'xz':
                myopen = lzma.open
            elif filename_extension == 'bz2':
                myopen = bz2.open
            else:
                myopen = open
            fhandle = myopen(filename, 'rt', encoding=self.encoding)
        self.filehandle = fhandle
        return fhandle
项目:vwoptimize    作者:denik    | 项目源码 | 文件源码
def open_regular_or_compressed(filename):
    if filename is None:
        return sys.stdin

    if hasattr(filename, 'read'):
        fobj = filename
    else:
        f = filename.lower()
        ext = f.rsplit('.', 1)[-1]
        if ext == 'gz':
            import gzip
            fobj = gzip.GzipFile(filename)
        elif ext == 'bz2':
            import bz2
            fobj = bz2.BZ2File(filename)
        elif ext == 'xz':
            import lzma
            fobj = lzma.open(filename)
        else:
            fobj = open(filename)
    return fobj
项目:vwoptimize    作者:denik    | 项目源码 | 文件源码
def _convert_any_to_vw(source, format, output, weights, preprocessor, columnspec, named_labels, remap_label, ignoreheader):
    if named_labels is not None:
        assert not isinstance(named_labels, basestring)
        named_labels = set(named_labels)

    rows_source = open_anything(source, format, ignoreheader=ignoreheader)
    output = open(output, 'wb')

    for row in rows_source:
        try:
            vw_line = convert_row_to_vw(row, columnspec, preprocessor=preprocessor, weights=weights, named_labels=named_labels, remap_label=remap_label)
        except Exception:
            log_always('Failed to parse: %r', row)
            raise
        output.write(vw_line)

    flush_and_close(output)
项目:atropos    作者:jdidion    | 项目源码 | 文件源码
def open_compressed_file(filename, mode):
    """Open a compressed file, determining the compression type based on the
    file name.

    Args:
        filename: The file to open.
        mode: The file open mode.

    Returns:
        The opened file.
    """
    ext = os.path.splitext(filename)
    opener = get_file_opener(ext)
    if not opener:
        raise ValueError("{} is not a recognized compression format")
    return opener(filename, mode)
项目:grocsvs    作者:grocsvs    | 项目源码 | 文件源码
def __init__(self, path, mode='w'):
        self.outfile = open(path, mode)
        self.devnull = open(os.devnull, 'w')
        self.closed = False

        # Setting close_fds to True in the Popen arguments is necessary due to
        # <http://bugs.python.org/issue12786>.
        kwargs = dict(stdin=PIPE, stdout=self.outfile, stderr=self.devnull, close_fds=True)
        try:
            self.process = Popen(['pigz'], **kwargs)
            self.program = 'pigz'
        except OSError as e:
            # binary not found, try regular gzip
            try:
                self.process = Popen(['gzip'], **kwargs)
                self.program = 'gzip'
            except (IOError, OSError) as e:
                self.outfile.close()
                self.devnull.close()
                raise
        except IOError as e:
            self.outfile.close()
            self.devnull.close()
            raise
项目:objection    作者:sensepost    | 项目源码 | 文件源码
def download(self):
        """
            Downloads the latest iOS gadget.

            :return:
        """

        download_url = self._get_download_url()

        # stream the download using requests
        dylib = requests.get(download_url, stream=True)

        # save the requests stream to file
        with open(self.ios_dylib_gadget_archive_path, 'wb') as f:
            click.secho('Downloading iOS dylib to {0}...'.format(self.ios_dylib_gadget_archive_path),
                        fg='green', dim=True)

            shutil.copyfileobj(dylib.raw, f)

        return self
项目:objection    作者:sensepost    | 项目源码 | 文件源码
def set_application_binary(self, binary: str = None) -> None:
        """
            Sets the binary that will be patched.

            If a binary is not defined, the applications Info.plist is parsed
            and the CFBundleIdentifier key read.

            :param binary:
            :return:
        """

        if binary is not None:
            click.secho('Using user provided binary name of: {0}'.format(binary))
            self.app_binary = os.path.join(self.app_folder, binary)

            return

        with open(os.path.join(self.app_folder, 'Info.plist'), 'rb') as f:
            info_plist = plistlib.load(f)

        # print the bundle identifier
        click.secho('Bundle identifier is: {0}'.format(info_plist['CFBundleIdentifier']),
                    fg='green', bold=True)

        self.app_binary = os.path.join(self.app_folder, info_plist['CFBundleExecutable'])
项目:objection    作者:sensepost    | 项目源码 | 文件源码
def download(self):
        """
            Downloads the latest Android gadget for this
            architecture.

            :return:
        """

        download_url = self._get_download_url()

        # stream the download using requests
        library = requests.get(download_url, stream=True)
        library_destination = self.get_frida_library_path(packed=True)

        # save the requests stream to file
        with open(library_destination, 'wb') as f:
            click.secho('Downloading {0} library to {1}...'.format(self.architecture,
                                                                   library_destination), fg='green', dim=True)

            shutil.copyfileobj(library.raw, f)

        return self
项目:fem    作者:mlp6    | 项目源码 | 文件源码
def open_dispout(dispout):
    """open dispout file for reading

    :param dispout: (str) dispout filename (disp.dat)
    :return: dispout file object
    """
    if dispout.endswith('.xz'):
        import lzma
        dispout = lzma.open(dispout, 'rb')
    else:
        dispout = open(dispout, 'rb')

    return dispout
项目:fem    作者:mlp6    | 项目源码 | 文件源码
def create_dat(nodout="nodout", dispout="disp.dat", legacynodes=False):
    """create binary data file

    :param str nodout: nodout file created by ls-dyna (default="nodout")
    :param str dispout: default = "disp.dat"
    :param boolean legacynodes: node IDs written every timestep (default=False)
    """
    header_written = False
    timestep_read = False
    timestep_count = 0
    writenode = True

    with open(nodout, 'r') as nodout:
        with open_dispout(dispout) as dispout:
            for line in nodout:
                if 'nodal' in line:
                    timestep_read = True
                    timestep_count += 1
                    data = []
                    continue
                if timestep_read is True:
                    if line[0:2] == '\n':  # done reading the time step
                        timestep_read = False
                        # if this was the first time, everything needed to
                        # be read to # get node count for header
                        if not header_written:
                            header = generate_header(data, nodout)
                            write_headers(dispout, header)
                            header_written = True
                            print('Time Step: ', end="", flush=True)
                        if timestep_count > 1 and not legacynodes:
                            writenode = False
                        print("%i, " % timestep_count, end="", flush=True)
                        process_timestep_data(data, dispout, writenode)
                    else:
                        raw_data = parse_line(line)
                        data.append(list(raw_data))

    print("done.", flush=True)

    return 0
项目:fem    作者:mlp6    | 项目源码 | 文件源码
def count_timesteps(outfile):
    """count timesteps written to nodout

    searches for 'time' in lines, and then removes 1 extra entry that occurs
    for t = 0

    grep will be used on linux systems (way faster)

    :param outfile: usually 'nodout'
    :returns: int ts_count

    """
    from sys import platform

    print("Reading number of time steps... ", end="", flush=True)
    if platform == "linux":
        from subprocess import PIPE, Popen
        p = Popen('grep time %s | wc -l' % outfile, shell=True, stdout=PIPE)
        ts_count = int(p.communicate()[0].strip().decode())
    else:
        print("Non-linux OS detected -> using slower python implementation",
              flush=True)
        ts_count = 0
        with open(outfile, 'r') as f:
            for line in f:
                if 'time' in line:
                    ts_count += 1

    ts_count -= 1  # rm extra time count

    print('there are {}.'.format(ts_count), flush=True)

    return ts_count
项目:jarbas    作者:datasciencebr    | 项目源码 | 文件源码
def receipts(self):
        """Returns a Generator with batches of receipts text."""
        print('Loading receipts text dataset…', end='\r')
        with lzma.open(self.path, mode='rt') as file_handler:
            batch = []
            for row in csv.DictReader(file_handler):
                batch.append(self.serialize(row))
                if len(batch) >= self.batch_size:
                    yield batch
                    batch = []
            yield batch
项目:jarbas    作者:datasciencebr    | 项目源码 | 文件源码
def suspicions(self):
        """Returns a Generator with batches of suspicions."""
        print('Loading suspicions dataset…', end='\r')
        with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
            batch = []
            for row in csv.DictReader(file_handler):
                batch.append(self.serialize(row))
                if len(batch) >= self.batch_size:
                    yield batch
                    batch = []
            yield batch
项目:jarbas    作者:datasciencebr    | 项目源码 | 文件源码
def reimbursements(self):
        """Returns a Generator with a dict object for each row."""
        with lzma.open(self.path, 'rt') as file_handler:
            yield from DictReader(file_handler)
项目:sarjitsu    作者:distributed-system-analysis    | 项目源码 | 文件源码
def verify_contents(thefile, tgt_hostname=None, callback=None):
    """
    Given a sysstat binary data file verify that it contains a set of well
    formed data values.

    The optional 'tgt_hostname' argument is checked against the file header's
    stored hostname value.

    The optional 'callback' argument, if provided, should be an instance of
    the ContentAction class, where for each magic structure, file header, file
    activity set, record header and record payload read the appropriate method
    will be invoked, with the 'eof' method invoked at the end.

    One of the following exceptions will be raised if a problem is found with
    the file:

        Invalid: The file header or record header metadata values do not make
                 sense in relation to each other

        Corruption: The file appears to be corrupted in some way

        Truncated: The file does not appear to contain all the data as
                   described by the file header or a given record header
    """
    try:
        with lzma.open(thefile, "rb") as fp:
            verify_contents_fp(fp, tgt_hostname, callback)
    except lzma.LZMAError:
        with open(thefile, "rb") as fp:
            verify_contents_fp(fp, tgt_hostname, callback)
项目:sarjitsu    作者:distributed-system-analysis    | 项目源码 | 文件源码
def fetch_fileheader(thefile):
    """
    Fetch the sysstat FileHeader object for the given file path.
    """
    try:
        with lzma.open(thefile, "rb") as fp:
            res = fetch_fileheader_with_fp(fp)
    except lzma.LZMAError:
        with open(thefile, "rb") as fp:
            res = fetch_fileheader_with_fp(fp)
    return res
项目:C2L2    作者:CoNLL-UD-2017    | 项目源码 | 文件源码
def load_vocab(self, filename):
        with open(filename, "rb") as f:
            vocab = pickle.load(f)
        self._load_vocab(vocab)
        return self
项目:C2L2    作者:CoNLL-UD-2017    | 项目源码 | 文件源码
def save_vocab(self, filename):
        with open(filename, "wb") as f:
            pickle.dump(self._fullvocab, f)
        return self
项目:C2L2    作者:CoNLL-UD-2017    | 项目源码 | 文件源码
def save_model(self, filename):
        self.save_vocab(filename + ".vocab")
        with open(filename + ".params", "wb") as f:
            pickle.dump(self._args, f)
        self._model.save(filename + ".model")
        return self
项目:C2L2    作者:CoNLL-UD-2017    | 项目源码 | 文件源码
def load_model(self, filename, **kwargs):
        self.load_vocab(filename + ".vocab")
        with open(filename + ".params", "rb") as f:
            args = pickle.load(f)
            args.update(kwargs)
            self.create_parser(**args)
        self.init_model()
        self._model.load(filename + ".model")
        return self
项目:vwoptimize    作者:denik    | 项目源码 | 文件源码
def write_file(filename, data):
    if isinstance(data, list):
        data = ''.join(data)
    else:
        assert isinstance(data, str), type(data)
    if filename in STDOUT_NAMES:
        sys.stdout.write(data)
    else:
        fobj = open(filename, 'w')
        fobj.write(data)
        flush_and_close(fobj)
项目:vwoptimize    作者:denik    | 项目源码 | 文件源码
def get_num_features(filename):
    counting = False
    count = 0
    for line in open(filename):
        if counting:
            count += 1
        else:
            if line.strip() == ':0':
                counting = True
    return count
项目:FreeDiscovery    作者:FreeDiscovery    | 项目源码 | 文件源码
def _load_erdm_ground_truth(outdir):
    """A helper function to load Legal TREC 2009 data"""
    with open(os.path.join(outdir, 'seed_relevant.txt'), 'rt') as fh:
        relevant_files = [el.strip() for el in fh.readlines()]

    with open(os.path.join(outdir, 'seed_non_relevant.txt'), 'rt') as fh:
        non_relevant_files = [el.strip() for el in fh.readlines()]

    if platform.system() == 'Windows':
        relevant_files = [el.replace('/', '\\') for el in relevant_files]
        non_relevant_files = [el.replace('/', '\\') for el in non_relevant_files]
    return non_relevant_files, relevant_files
项目:atropos    作者:jdidion    | 项目源码 | 文件源码
def __init__(self, path, mode='w'):
        self.name = path
        self.outfile = open(path, mode)
        self.devnull = open(os.devnull, 'w')
        self.closed = False
        try:
            # Setting close_fds to True is necessary due to
            # http://bugs.python.org/issue12786
            self.process = Popen(
                [get_program_path('gzip')], stdin=PIPE, stdout=self.outfile,
                stderr=self.devnull, close_fds=True)
        except IOError:
            self.outfile.close()
            self.devnull.close()
            raise
项目:atropos    作者:jdidion    | 项目源码 | 文件源码
def open_gzip_file(filename, mode, use_system=True):
    """Open a gzip file, preferring the system gzip program if `use_system`
    is True, falling back to the gzip python library.

    Args:
        mode: The file open mode.
        use_system: Whether to try to use the system gzip program.
    """
    if use_system:
        try:
            if 'r' in mode:
                gzfile = GzipReader(filename)
            else:
                gzfile = GzipWriter(filename)
            if 't' in mode:
                gzfile = io.TextIOWrapper(gzfile)
            return gzfile
        except:
            pass

    gzfile = gzip.open(filename, mode)
    if 'b' in mode:
        if 'r' in mode:
            gzfile = io.BufferedReader(gzfile)
        else:
            gzfile = io.BufferedWriter(gzfile)
    return gzfile
项目:atropos    作者:jdidion    | 项目源码 | 文件源码
def open_lzma_file(filename, mode, **kwargs):
    """Open a LZMA (xz) file.
    """
    return lzma.open(filename, mode)
项目:rnn-morpheme-analyzer    作者:mitaki28    | 项目源码 | 文件源码
def load(filename):
    with lzma.open(filename, 'rb') as dataset:
        while True:
            try:
                yield pickle.load(dataset)
            except EOFError:
                break
项目:rnn-morpheme-analyzer    作者:mitaki28    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser(
        description='dataset generator'
    )
    parser.add_argument(
        '-p', '--possibility',
        type=float,
        default=0.9,
        help='possibility to add train dataset'
    )
    parser.add_argument(
        'source',
        help='path to mecab-processed corpus (xz compressed)'
    )
    parser.add_argument(
        'train',
        help='path for writing training dataset (xz compressed)'
    )
    parser.add_argument(
        'test',
        help='path for writing testing dataset (xz compressed)'
    )
    args = parser.parse_args()
    with lzma.open(args.source, 'rt') as source,\
         lzma.open(args.train, 'wb') as train,\
         lzma.open(args.test, 'wb') as test:
            separate(source, args.possibility, train, test)
项目:serenata-toolbox    作者:datasciencebr    | 项目源码 | 文件源码
def test_translate_csv_with_reimbursement_with_net_value_with_decimal_comma(self):
        csv_with_decimal_comma = os.path.join(self.fixtures_path, 'Ano-with-decimal-comma.csv')
        path_with_decimal_point = os.path.join(self.fixtures_path, 
             'reimbursements-with-decimal-point.csv')
        with open(path_with_decimal_point, 'r') as csv_expected:
            expected = csv_expected.read()

        xz_path = Dataset('')._translate_file(csv_with_decimal_comma)
        with lzma.open(xz_path) as xz_file:
            output = xz_file.read().decode('utf-8')
        self.assertEqual(output, expected)
项目:albemarle    作者:SeanTater    | 项目源码 | 文件源码
def glove_():
    vecs = np.memmap("glovesmall.arr", np.float32).reshape((-1, 300))
    words = open("glovewords.txt").read().splitlines()
    return dict(zip(words, vecs))
项目:albemarle    作者:SeanTater    | 项目源码 | 文件源码
def germanw2v_():
    vecs = np.memmap("german.vecbin", np.float32).reshape((-1, 300))
    words = open("german.words").read().splitlines()
    return dict(zip(words, vecs))
项目:albemarle    作者:SeanTater    | 项目源码 | 文件源码
def get_book(name, language):
    book = lzma.open('../data/{}-common.vpl.xz'.format(name), 'rt').read().splitlines()
    book = [
        [ language.get(w, veczero) for w in words(l)]
        + ([veczero] * (n_steps - len(words(l))))
        for l in book ]
    lens = np.array([len(l) for l in book], dtype=np.int32)

    for verse in book:
        assert len(verse) <= n_steps, "n_steps should be at least {}".format(len(verse))
    return (book, lens)
项目:albemarle    作者:SeanTater    | 项目源码 | 文件源码
def get_book(name, language):
    book = lzma.open('../data/{}-common.vpl.xz'.format(name), 'rt').read().splitlines()
    book = [
        [ language(w, 0) for w in l]
        + ([veczero] * (n_steps - len(l)))
        for l in book ]
    for verse in book:
        assert len(verse) <= n_steps, "n_steps should be at least {}".format(len(verse))
    return book
项目:albemarle    作者:SeanTater    | 项目源码 | 文件源码
def get_book(name, language):
    book = lzma.open('../data/{}-common.vpl.xz'.format(name), 'rt').read().splitlines()
    book = [
        [ language(w) for w in l]
        + ([language(' ')] * (n_steps - len(l)))
        for l in book ]
    lens = np.array([len(l) for l in book], dtype=np.int32)

    for verse in book:
        assert len(verse) <= n_steps, "n_steps should be at least {}".format(len(verse))
    return (book, lens)
项目:fastq-and-furious    作者:lgautier    | 项目源码 | 文件源码
def benchmark_screed(fn):
    import screed
    total_seq = int(0)
    t0 = time.time()
    it = screed.open(fn)
    for i, e in enumerate(it):
        total_seq += len(e.sequence)
        if i % REFRESH_RATE == 0:
            t1 = time.time()
            print('\r%.2fMB/s' % (total_seq/(1E6)/(t1-t0)), end='', flush=True)
    print()
    print('%i entries' % (i+1))
项目:fastq-and-furious    作者:lgautier    | 项目源码 | 文件源码
def _opener(filename):
    if filename.endswith('.gz'):
        import gzip
        return gzip.open
    elif filename.endswith('.bz2'):
        import bz2
        return bz2.open
    elif filename.endswith('.lzma'):
        import lzma
        return lzma.open
    else:
        return open
项目:fastq-and-furious    作者:lgautier    | 项目源码 | 文件源码
def _screed_iter(fn):
    import screed
    it = screed.open(fn)
    for i, e in enumerate(it):
        yield (i, e.name.encode('ascii'), str(e.sequence).encode('ascii'))
项目:fastq-and-furious    作者:lgautier    | 项目源码 | 文件源码
def _ngs_plumbing_iter(fn, mode, buffering):
    import ngs_plumbing.fastq
    openfunc = _opener(fn)
    with open(fn, mode, buffering = buffering) as f:
        with openfunc(f) as fh: 
            it = ngs_plumbing.fastq.read_fastq(fh)
            for i, e in enumerate(it):
                yield (i, e.header[1:], e.sequence)
项目:fastq-and-furious    作者:lgautier    | 项目源码 | 文件源码
def _fastqandfurious_iter(fn, mode, buffering):
    from fastqandfurious import fastqandfurious
    bufsize = int(5E4)
    openfunc = _opener(fn)
    with open(fn, mode, buffering = buffering) as f:
        with openfunc(f) as fh: 
            it = fastqandfurious.readfastq_iter(fh, bufsize)
            for i, e in enumerate(it):
                yield (i, e.header, e.sequence)
项目:archlinux-amdgpu    作者:corngood    | 项目源码 | 文件源码
def hashFile(file):
    block = 64 * 1024
    hash = hashlib.sha256()
    with open(file, 'rb') as f:
        buf = f.read(block)
        while len(buf) > 0:
            hash.update(buf)
            buf = f.read(block)
    return hash.hexdigest()
项目:objection    作者:sensepost    | 项目源码 | 文件源码
def unpack(self):
        """
            Unpacks a downloaded .xz gadget.

            :return:
        """

        click.secho('Unpacking {0}...'.format(self.ios_dylib_gadget_archive_path), dim=True)

        with lzma.open(self.ios_dylib_gadget_archive_path) as f:
            with open(self.ios_dylib_gadget_path, 'wb') as g:
                g.write(f.read())

        return self
项目:objection    作者:sensepost    | 项目源码 | 文件源码
def unpack(self):
        """
            Unpacks a downloaded .xz gadget.

            :return:
        """

        click.secho('Unpacking {0}...'.format(self.get_frida_library_path(packed=True)), dim=True)

        with lzma.open(self.get_frida_library_path(packed=True)) as f:
            with open(self.get_frida_library_path(), 'wb') as g:
                g.write(f.read())

        return self
项目:nmtpy    作者:lium-lst    | 项目源码 | 文件源码
def get_temp_file(suffix="", name=None, delete=False):
    """Creates a temporary file under /tmp."""
    if name:
        name = os.path.join("/tmp", name)
        t = open(name, "w")
        cleanup.register_tmp_file(name)
    else:
        _suffix = "_nmtpy_%d" % os.getpid()
        if suffix != "":
            _suffix += suffix

        t = tempfile.NamedTemporaryFile(suffix=_suffix, delete=delete)
        cleanup.register_tmp_file(t.name)
    return t
项目:nmtpy    作者:lium-lst    | 项目源码 | 文件源码
def fopen(filename, mode=None):
    """GZ/BZ2/XZ-aware file opening function."""
    # NOTE: Mode is not used but kept for not breaking iterators.
    if filename.endswith('.gz'):
        return gzip.open(filename, 'rt')
    elif filename.endswith('.bz2'):
        return bz2.open(filename, 'rt')
    elif filename.endswith(('.xz', '.lzma')):
        return lzma.open(filename, 'rt')
    else:
        # Plain text
        return open(filename, 'r')
项目:vwoptimize    作者:denik    | 项目源码 | 文件源码
def split_file(source, nfolds=None, ignoreheader=False, importance=0, minfoldsize=10000):
    if nfolds is None:
        nfolds = 10

    if isinstance(source, basestring):
        ext = get_real_ext(source)
    else:
        ext = 'xxx'

    if hasattr(source, 'seek'):
        source.seek(0)

    # XXX already have examples_count
    total_lines = 0
    for line in open_regular_or_compressed(source):
        total_lines += 1

    if hasattr(source, 'seek'):
        source.seek(0)

    source = open_regular_or_compressed(source)

    if ignoreheader:
        source.next()
        total_lines -= 1

    foldsize = int(math.ceil(total_lines / float(nfolds)))
    foldsize = max(foldsize, minfoldsize)
    nfolds = int(math.ceil(total_lines / float(foldsize)))

    folds = []

    current_fold = -1
    count = foldsize
    current_fileobj = None
    total_count = 0
    for line in source:
        if count >= foldsize:
            if current_fileobj is not None:
                flush_and_close(current_fileobj)
                current_fileobj = None
            current_fold += 1
            if current_fold >= nfolds:
                break
            fname = get_temp_filename('fold%s.%s' % (current_fold, ext))
            current_fileobj = open(fname, 'w')
            count = 0
            folds.append(fname)
        current_fileobj.write(line)
        count += 1
        total_count += 1

    if current_fileobj is not None:
        flush_and_close(current_fileobj)

    if total_count != total_lines:
        sys.exit('internal error: total_count=%r total_lines=%r source=%r' % (total_count, total_lines, source))

    return folds, total_lines