Python io 模块,IOBase() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.IOBase()

项目:deployfish    作者:caltechads    | 项目源码 | 文件源码
def __is_or_has_file(self, data):
        """
        Report whether ``data`` is, or wraps, a file-like object.

        When ``data`` exposes a ``file`` attribute, that attribute is inspected
        instead of ``data`` itself.  The check uses the ``file`` builtin when
        the interpreter defines it (Python 2) and falls back to ``io.IOBase``
        when looking up ``file`` raises ``NameError`` (Python 3).

        https://stackoverflow.com/questions/1661262/check-if-object-is-file-like-in-python

        Returns:
            Boolean - True if we have a file-like object
        """
        candidate = getattr(data, 'file', data)

        try:
            file_type = file
        except NameError:
            # 'file' is not a builtin on Python 3.
            from io import IOBase
            file_type = IOBase
        return isinstance(candidate, file_type)
项目:python-mysql-pool    作者:LuciferJack    | 项目源码 | 文件源码
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
        """Execute a prepared MySQL statement

        Every value in ``data`` that is an ``IOBase`` instance is first handed
        to ``cmd_stmt_send_long_data``; the execute packet is then built and
        sent, and the value of ``_handle_binary_result`` is returned.
        """
        parameters = list(parameters)
        long_data_used = {}

        if data:
            for position, _unused in enumerate(parameters):
                value = data[position]
                if not isinstance(value, IOBase):
                    continue
                # Streams without a ``mode`` attribute keep the default flag.
                try:
                    binary = 'b' not in value.mode
                except AttributeError:
                    binary = True
                self.cmd_stmt_send_long_data(statement_id, position, value)
                long_data_used[position] = (binary,)

        execute_packet = self._protocol.make_stmt_execute(
            statement_id, data, tuple(parameters), flags,
            long_data_used, self.charset)
        packet = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
        return self._handle_binary_result(packet)
项目:handfontgen    作者:nixeneko    | 项目源码 | 文件源码
def outputpapertemplate(self, dest, listchar, output=None):
        """Render template pages for ``listchar`` and write or return the PDF.

        Args:
            dest: ``None`` to get the writer back, a ``str`` file path to write
                to (missing parent directories are created), or any other
                object, which is passed straight to ``output.write``.
            listchar: collection of characters still to be laid out; pages are
                produced while it is truthy (presumably
                ``outputtemplateonepage`` consumes it - confirm).
            output: writer to append pages to; a new ``PyPDF2.PdfFileWriter``
                is created when omitted.

        Returns:
            The writer when ``dest`` is ``None``, otherwise ``None``.
        """
        if output is None:
            output = PyPDF2.PdfFileWriter()

        while listchar:
            iopage = self.outputtemplateonepage(listchar)
            page = PyPDF2.PdfFileReader(iopage)
            output.addPage(page.getPage(0))

        if dest is None:
            return output

        if isinstance(dest, str):  # when dest is a file path
            destdir = os.path.dirname(dest)
            if destdir and not os.path.isdir(destdir):
                os.makedirs(destdir)
            with open(dest, "wb") as w:
                output.write(w)
        else:  # when dest is io.IOBase
            output.write(dest)
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def __init__(self, record_type=None, record_name=None, data=None):
        """Initialise a record from explicit fields or by reading a stream.

        When ``record_type`` or ``record_name`` is given, the ``type``,
        ``name`` and ``data`` attributes are assigned directly (``type`` falls
        back to ``'unknown'``).  Otherwise a non-``None`` ``data`` must be a
        ``bytearray``/``str`` (wrapped in ``io.BytesIO``) or an ``io.IOBase``
        instance, which is handed to ``self._read``.

        Raises:
            TypeError: ``data`` is of neither accepted kind.
        """
        self._message_begin = self._message_end = False
        self._type = self._name = self._data = ''

        has_explicit_fields = record_type is not None or record_name is not None
        if has_explicit_fields:
            self.type = 'unknown' if record_type is None else record_type
            if record_name is not None:
                self.name = record_name
            if data is not None:
                self.data = data
            return

        if data is None:
            return
        if isinstance(data, (bytearray, str)):
            data = io.BytesIO(data)
        if not isinstance(data, io.IOBase):
            raise TypeError("invalid data argument type")
        self._read(data)
项目:python-ebml    作者:QBobWatson    | 项目源码 | 文件源码
def __init__(self, f, summary=True):
        """Bind the object to a stream and record the stream's size.

        Args:
         + f: Either a file name or a seekable binary stream.  Anything that
           is not an ``IOBase`` instance is opened with mode ``'rb'``.
         + summary: If True, call self.read_summary().
        """
        super().__init__(0)
        self.stream = f if isinstance(f, IOBase) else open(f, 'rb')

        # Measure the stream by seeking to its end, then rewind to the start.
        self.stream.seek(0, SEEK_END)
        self.stream_size = self.stream.tell()
        self.stream.seek(0, SEEK_SET)

        if summary:
            self.read_summary()
项目:importacsv    作者:rasertux    | 项目源码 | 文件源码
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
        """Execute a prepared MySQL statement

        Every value in ``data`` that is an ``IOBase`` instance is first handed
        to ``cmd_stmt_send_long_data``; the execute packet is then built and
        sent, and the value of ``_handle_binary_result`` is returned.
        """
        parameters = list(parameters)
        long_data_used = {}

        if data:
            for position, _unused in enumerate(parameters):
                value = data[position]
                if not isinstance(value, IOBase):
                    continue
                # Streams without a ``mode`` attribute keep the default flag.
                try:
                    binary = 'b' not in value.mode
                except AttributeError:
                    binary = True
                self.cmd_stmt_send_long_data(statement_id, position, value)
                long_data_used[position] = (binary,)

        execute_packet = self._protocol.make_stmt_execute(
            statement_id, data, tuple(parameters), flags,
            long_data_used, self.charset)
        packet = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
        return self._handle_binary_result(packet)
项目:imgkit    作者:jarrekk    | 项目源码 | 文件源码
def _find_options_in_meta(self, content):
        """Collect options encoded in the HTML meta tags of 'content'

        :param content: str or file-like object - contains HTML to parse.
            ``io.IOBase`` instances and objects whose class is named
            ``StreamReaderWriter`` are read first.

        returns:
          dict: {config option: value} for every meta tag whose name starts
          with ``self.config.meta_tag_prefix``
        """
        is_stream = (isinstance(content, io.IOBase)
                     or content.__class__.__name__ == 'StreamReaderWriter')
        if is_stream:
            content = content.read()

        options = {}
        for tag in re.findall('<meta [^>]*>', content):
            prefix = self.config.meta_tag_prefix
            if not re.search('name=["\']%s' % prefix, tag):
                continue
            option = re.findall('name=["\']%s([^"\']*)' % prefix, tag)[0]
            options[option] = re.findall('content=["\']([^"\']*)', tag)[0]

        return options
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_wrap_non_iobase():
    """wrap_file accepts a duck-typed file and rejects one lacking ``write``."""
    class FakeFile:
        def close(self):  # pragma: no cover
            pass

        def write(self):  # pragma: no cover
            pass

    duck_typed = FakeFile()
    assert not isinstance(duck_typed, io.IOBase)

    # An object that is not an IOBase but has close/write is still wrapped.
    assert isinstance(trio.wrap_file(duck_typed), AsyncIOWrapper)

    # Once ``write`` is removed from the class, wrapping must be refused.
    del FakeFile.write

    with pytest.raises(TypeError):
        trio.wrap_file(FakeFile())
项目:bitpay-brick    作者:javgh    | 项目源码 | 文件源码
def __init__(self, record_type=None, record_name=None, data=None):
        """Initialise a record from explicit fields or by reading a stream.

        When ``record_type`` or ``record_name`` is given, the ``type``,
        ``name`` and ``data`` attributes are assigned directly (``type`` falls
        back to ``'unknown'``).  Otherwise a non-``None`` ``data`` must be a
        ``bytearray``/``str`` (wrapped in ``io.BytesIO``) or an ``io.IOBase``
        instance, which is handed to ``self._read``.

        Raises:
            TypeError: ``data`` is of neither accepted kind.
        """
        self._message_begin = self._message_end = False
        self._type = self._name = self._data = ''

        has_explicit_fields = record_type is not None or record_name is not None
        if has_explicit_fields:
            self.type = 'unknown' if record_type is None else record_type
            if record_name is not None:
                self.name = record_name
            if data is not None:
                self.data = data
            return

        if data is None:
            return
        if isinstance(data, (bytearray, str)):
            data = io.BytesIO(data)
        if not isinstance(data, io.IOBase):
            raise TypeError("invalid data argument type")
        self._read(data)
项目:bandit-ss    作者:zeroSteiner    | 项目源码 | 文件源码
def wrap_file_object(fileobj):
    """Handle differences in Python 2 and 3 around writing bytes.

    Objects that are not ``io.IOBase`` instances (probably Python 2 files,
    which are less picky about text versus bytes) and text streams are
    returned unchanged.  Any other ``io.IOBase`` stream cannot take text, so
    it is wrapped in an ``io.TextIOWrapper`` that does the conversion.
    """
    needs_text_layer = (isinstance(fileobj, io.IOBase)
                        and not isinstance(fileobj, io.TextIOBase))
    return io.TextIOWrapper(fileobj) if needs_text_layer else fileobj
项目:editolido    作者:flyingeek    | 项目源码 | 文件源码
def get_gramet_image_url(url_or_fp):
    """Return the absolute URL of the gramet image found in an HTML page.

    ``url_or_fp`` is either an ``io.IOBase`` object holding the page (the
    scheme and host are then taken from ``OGIMET_URL``) or a URL that is
    fetched with ``requests``.  An empty string is returned when the page is
    empty or contains no matching ``<img>`` tag.
    """
    if isinstance(url_or_fp, io.IOBase):
        # noinspection PyUnresolvedReferences
        data = url_or_fp.read()
        u = urlsplit(OGIMET_URL)
    else:
        u = urlsplit(url_or_fp)
        import requests
        data = requests.get(url_or_fp).text

    if not data:
        return ''
    m = re.search(r'<img src="([^"]+/gramet_[^"]+)"', data)
    if not m:
        return ''
    return "{url.scheme}://{url.netloc}{path}".format(url=u, path=m.group(1))
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def copy_file_data(src_file, dst_file, chunk_size=None):
    """Copy data from one file object to another.

    Arguments:
        src_file (io.IOBase): File open for reading.
        dst_file (io.IOBase): File open for writing.
        chunk_size (int, optional): Number of bytes to copy at
            a time (a falsy value selects ``io.DEFAULT_BUFFER_SIZE``).

    """
    size = chunk_size or io.DEFAULT_BUFFER_SIZE
    read = src_file.read
    write = dst_file.write
    while True:
        chunk = read(size)
        # An empty read (b'' or '') marks the end for binary and text files.
        if not chunk:
            break
        write(chunk)
项目:Remarkable    作者:jamiemcg    | 项目源码 | 文件源码
def _find_options_in_meta(self, content):
        """Collect options encoded in the HTML meta tags of 'content'

        :param content: str or file-like object - contains HTML to parse.
            ``io.IOBase`` instances and objects whose class is named
            ``StreamReaderWriter`` are read first.

        returns:
          dict: {config option: value} for every meta tag whose name starts
          with ``self.configuration.meta_tag_prefix``
        """
        is_stream = (isinstance(content, io.IOBase)
                     or content.__class__.__name__ == 'StreamReaderWriter')
        if is_stream:
            content = content.read()

        options = {}
        for tag in re.findall('<meta [^>]*>', content):
            prefix = self.configuration.meta_tag_prefix
            if not re.search('name=["\']%s' % prefix, tag):
                continue
            option = re.findall('name=["\']%s([^"\']*)' % prefix, tag)[0]
            options[option] = re.findall('content=["\']([^"\']*)', tag)[0]

        return options
项目:wechat-async-sdk    作者:ihjmh    | 项目源码 | 文件源码
def _upload_media_py3(self, media_type, media_file, extension=''):
        """Validate ``media_file`` and POST it to the media upload endpoint.

        An ``io.IOBase`` object with a ``name`` attribute supplies both the
        file name and the extension.  An ``io.BytesIO`` without one needs the
        ``extension`` argument and is sent as ``temp.<extension>``.

        Raises:
            ValueError: unsupported object type or extension not allowed.
        """
        named_stream = isinstance(media_file, io.IOBase) and hasattr(media_file, 'name')
        if not named_stream and not isinstance(media_file, io.BytesIO):
            raise ValueError('Parameter media_file must be io.BufferedIOBase(open a file with \'rb\') or io.BytesIO object.')

        if named_stream:
            extension = media_file.name.split('.')[-1].lower()
            if not is_allowed_extension(extension):
                raise ValueError('Invalid file type.')
            filename = media_file.name
        else:
            extension = extension.lower()
            if not is_allowed_extension(extension):
                raise ValueError('Please provide \'extension\' parameters when the type of \'media_file\' is \'io.BytesIO\'.')
            filename = 'temp.' + extension

        upload_params = {
            'type': media_type,
        }
        upload_files = {
            'media': (filename, media_file, convert_ext_to_mime(extension))
        }
        return self.request.post(
            url='https://api.weixin.qq.com/cgi-bin/media/upload',
            params=upload_params,
            files=upload_files
        )
项目:ivport-v2    作者:ivmech    | 项目源码 | 文件源码
def truncate(self, size=None):
        """
        Resize the stream to the given size in bytes (or the current position
        if size is not specified). This resizing can extend or reduce the
        current file size.  The new file size is returned.

        In prior versions of picamera, truncation also changed the position of
        the stream (because prior versions of these stream classes were
        non-seekable). This functionality is now deprecated; scripts should
        use :meth:`~io.IOBase.seek` and :meth:`truncate` as one would with
        regular :class:`~io.BytesIO` instances.
        """
        if size is not None:
            warnings.warn(
                PiCameraDeprecated(
                    'This method changes the position of the stream to the '
                    'truncated length; this is deprecated functionality and '
                    'you should not rely on it (seek before or after truncate '
                    'to ensure position is consistent)'))
        # Keep the parent's result so the documented return value is honoured.
        new_size = super(PiArrayOutput, self).truncate(size)
        if size is not None:
            # Deprecated behaviour: move the position to the truncated length.
            self.seek(size)
        return new_size
项目:resolwe-bio-py    作者:genialis    | 项目源码 | 文件源码
def test_good_response(self, resolwe_mock, requests_mock, os_mock, open_mock):
        """Run ``Resolwe._download_files`` on mocks and check log/write counts."""
        resolwe_mock.configure_mock(**self.config)
        os_mock.path.isfile.return_value = True

        # When mocking open one wants it to return a "file-like" mock: (spec=io.IOBase)
        # Configure the injected ``open_mock`` argument: it is the object the
        # assertions below inspect.
        open_mock.return_value = MagicMock(spec=io.IOBase)

        requests_mock.get.return_value = MagicMock(ok=True,
                                                   **{'iter_content.return_value': range(3)})

        Resolwe._download_files(resolwe_mock, self.file_list)
        self.assertEqual(resolwe_mock.logger.info.call_count, 3)

        # This assert may seem weird. To check what is happening behind the scenes:
        # print(open_mock.mock_calls)
        self.assertEqual(open_mock.return_value.__enter__.return_value.write.call_count, 6)
        # Why 6? 2 files in self.file_list, each downloads 3 chunks (defined in response mock)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def add_fields(self, *fields):
        """Register several fields at once through ``add_field``.

        Each item may be an ``io.IOBase`` object (its name is guessed with
        ``guess_filename``, default ``'unknown'``), a multidict (its items are
        queued for processing) or a two-element list/tuple of (name, value).

        Raises:
            TypeError: an item matches none of the accepted forms.
        """
        pending = list(fields)

        while pending:
            item = pending.pop(0)

            if isinstance(item, io.IOBase):
                self.add_field(guess_filename(item, 'unknown'), item)
                continue

            if isinstance(item, (MultiDictProxy, MultiDict)):
                # Expand the multidict; its pairs are handled on later passes.
                pending.extend(item.items())
                continue

            if isinstance(item, (list, tuple)) and len(item) == 2:
                name, value = item
                self.add_field(name, value)
                continue

            raise TypeError('Only io.IOBase, multidict and (name, file) '
                            'pairs allowed, use .add_field() for passing '
                            'more complex parameters, got {!r}'
                            .format(item))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, obj, headers=None, *, chunk_size=8192):
        """Store the payload and headers and build the serializer lookup.

        ``headers`` is normalised to a ``CIMultiDict``; default headers are
        then filled in by ``_fill_headers_with_defaults``.  ``_serialize_map``
        maps Python types and (type, subtype) pairs to serializer methods.
        """
        if not isinstance(headers, CIMultiDict):
            # ``None`` becomes an empty multidict; anything else is converted.
            headers = CIMultiDict() if headers is None else CIMultiDict(headers)

        self.obj = obj
        self.headers = headers
        self._chunk_size = chunk_size
        self._fill_headers_with_defaults()

        serializers = {
            bytes: self._serialize_bytes,
            str: self._serialize_str,
            io.IOBase: self._serialize_io,
            MultipartWriter: self._serialize_multipart,
            ('application', 'json'): self._serialize_json,
            ('application', 'x-www-form-urlencoded'): self._serialize_form
        }
        self._serialize_map = serializers
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _guess_content_length(self, obj):
        """Return the number of bytes ``obj`` would contribute, or ``None``.

        ``bytes`` and ``str`` are measured whole; in-memory and file streams
        are measured from their current position.  Text is encoded with the
        charset found in the Content-Type header (``'us-ascii'`` when absent).
        ``None`` is returned for unsupported objects and for streams whose
        size cannot be read through ``os.fstat``.
        """
        def declared_charset():
            *_, params = parse_mimetype(self.headers.get(CONTENT_TYPE))
            return params.get('charset', 'us-ascii')

        if isinstance(obj, bytes):
            return len(obj)
        if isinstance(obj, str):
            charset = declared_charset()
            return len(obj.encode(charset))
        if isinstance(obj, io.StringIO):
            charset = declared_charset()
            return len(obj.getvalue().encode(charset)) - obj.tell()
        if isinstance(obj, io.BytesIO):
            return len(obj.getvalue()) - obj.tell()
        if not isinstance(obj, io.IOBase):
            return None
        try:
            return os.fstat(obj.fileno()).st_size - obj.tell()
        except (AttributeError, OSError):
            return None
项目:TransIP-STACK-API    作者:Paradoxis    | 项目源码 | 文件源码
def upload(self, file, name: str=None, path: str=None) -> StackFile:
        """
        Upload a file to Stack
        :param file: IO pointer or string containing a path to a file
        :param name: Custom name which will be used on the remote server, defaults to file.name
        :param path: Path to upload it to, defaults to current working directory
        :return: Instance of a stack file
        :raises StackException: ``file`` is neither an IO object nor a string
        """
        path = path or self.__cwd

        if isinstance(file, IOBase):
            return self.__upload(file=file, path=path, name=name)

        if not isinstance(file, str):
            raise StackException("File should either be a path to a file on disk or an IO type, got: {}".format(type(file)))

        # A path was given: open it for the duration of the upload only.
        with open(file, "rb") as fd:
            return self.__upload(file=fd, path=path, name=name)
项目:SDK-python    作者:RecastAI    | 项目源码 | 文件源码
def analyse_file(self, filename, token=None, language=None):
    """Send a voice file to the request endpoint and wrap the results.

    ``filename`` is either an ``io.IOBase`` object, used as is, or a path
    that is opened in binary mode and closed again once the request has
    finished.  ``token`` and ``language`` fall back to the instance
    attributes of the same name.

    Raises:
      RecastError: no token is available, or the response status is not OK.
    """
    token = token or self.token
    if token is None:
      raise RecastError("Token is missing")

    language = language or self.language

    opened_here = not isinstance(filename, io.IOBase)
    voice = open(filename, 'rb') if opened_here else filename

    try:
      body = {'voice': voice}
      if language is not None:
        body['language'] = language

      response = requests.post(
        Utils.REQUEST_ENDPOINT,
        files=body,
        headers={'Authorization': "Token {}".format(token)}
      )
    finally:
      # Only close handles created here; caller-supplied streams stay open.
      if opened_here:
        voice.close()

    if response.status_code != requests.codes.ok:
      raise RecastError(response.json().get('message', ''))

    return Response(response.json()['results'])
项目:cyaron    作者:luogu-dev    | 项目源码 | 文件源码
def close(self):
        """Delete the IO object and close the input file and the output file"""
        if self.__closed:
            # avoid double close
            return

        # On posix a file can be removed while a process still has it open:
        # it becomes invisible to others but the descriptor stays valid, so
        # removing the temp files before closing them avoids a race.  On nt
        # the removal fails with OSError, so it is retried after closing.
        try:
            self.__del_files()
        except OSError:
            deleted = False
        else:
            deleted = True

        for attribute in ('input_file', 'output_file'):
            handle = getattr(self, attribute)
            if isinstance(handle, IOBase):
                handle.close()

        if not deleted:
            self.__del_files()
        self.__closed = True
项目:aws-encryption-sdk-python    作者:awslabs    | 项目源码 | 文件源码
def __new__(cls, **kwargs):
        """Patch for abstractmethod-like enforcement in io.IOBase grandchildren.

        Instantiation is refused unless ``cls`` provides callable
        ``_read_bytes`` and ``_prep_message`` attributes and a
        ``_config_class`` attribute.  The new instance receives its ``config``
        (taken from the ``config`` keyword when it already is a
        ``_config_class`` instance, otherwise built from the remaining keyword
        arguments) and its initial bookkeeping attributes.

        Raises:
            TypeError: ``cls`` lacks one of the required attributes.
        """
        if (
                not (hasattr(cls, '_read_bytes') and callable(cls._read_bytes))
                # Check _prep_message itself, not _read_bytes a second time.
                or not (hasattr(cls, '_prep_message') and callable(cls._prep_message))
                or not hasattr(cls, '_config_class')
        ):
            raise TypeError("Can't instantiate abstract class {}".format(cls.__name__))

        instance = super(_EncryptionStream, cls).__new__(cls)

        config = kwargs.pop('config', None)
        if not isinstance(config, instance._config_class):  # pylint: disable=protected-access
            config = instance._config_class(**kwargs)  # pylint: disable=protected-access
        instance.config = config

        instance.bytes_read = 0
        instance.output_buffer = b''
        instance._message_prepped = False  # pylint: disable=protected-access
        instance.source_stream = instance.config.source
        instance._stream_length = instance.config.source_length  # pylint: disable=protected-access

        return instance
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def deserialize(data):
    """Turn a stream, ``bytes`` or ``str`` into parsed data.

    Streams are rewound when they support seeking and then read completely;
    ``bytes`` are decoded as UTF-8.  Text is parsed as JSON (objects become
    ``collections.OrderedDict``) and, if that fails, as YAML.  Any other
    value is returned unchanged.
    """
    if isinstance(data, IOBase):
        try:
            data.seek(0)
        except UnsupportedOperation:
            pass
        reader = data.readall if hasattr(data, 'readall') else data.read
        data = reader()

    if isinstance(data, bytes):
        data = data.decode('utf-8')

    if not isinstance(data, str):
        return data

    try:
        return json.loads(data, object_hook=collections.OrderedDict)
    except json.JSONDecodeError:
        # NOTE(review): yaml.load without an explicit Loader can construct
        # arbitrary objects - confirm the input is trusted or use safe_load.
        return yaml.load(data)
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
        """Execute a prepared MySQL statement

        Every value in ``data`` that is an ``IOBase`` instance is first handed
        to ``cmd_stmt_send_long_data``; the execute packet is then built and
        sent, and the value of ``_handle_binary_result`` is returned.
        """
        parameters = list(parameters)
        long_data_used = {}

        if data:
            for position, _unused in enumerate(parameters):
                value = data[position]
                if not isinstance(value, IOBase):
                    continue
                # Streams without a ``mode`` attribute keep the default flag.
                try:
                    binary = 'b' not in value.mode
                except AttributeError:
                    binary = True
                self.cmd_stmt_send_long_data(statement_id, position, value)
                long_data_used[position] = (binary,)

        execute_packet = self._protocol.make_stmt_execute(
            statement_id, data, tuple(parameters), flags,
            long_data_used, self.charset)
        packet = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
        return self._handle_binary_result(packet)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def is_file(ob):
    """Return True when ``ob`` is an instance of ``io.IOBase``."""
    return isinstance(ob, io.IOBase)
项目:mbin    作者:fanglab    | 项目源码 | 文件源码
def _is_filelike_object(f):
    """Return True when ``f`` is a ``file`` (Python 2) or ``io.IOBase`` object."""
    try:
        accepted = (file, io.IOBase)
    except NameError:
        # 'file' is not a class in python3
        accepted = io.IOBase
    return isinstance(f, accepted)
项目:synergy-service    作者:openstack    | 项目源码 | 文件源码
def _is_file(f):
        """Return True when ``f`` is an instance of ``io.IOBase``."""
        return isinstance(f, io.IOBase)
项目:aiosparql    作者:aio-libs    | 项目源码 | 文件源码
async def put(self, data: Union[bytes, IOBase], *, format: str,
                  graph: Optional[IRI] = None):
        """Issue a ``PUT`` CRUD request carrying ``data``.

        ``format`` is sent as the content type and ``graph`` is forwarded to
        ``_crud_request``.  The response status is checked with
        ``raise_for_status``; nothing is returned.
        """
        # ``async with`` is only valid inside a coroutine function.
        async with self._crud_request("PUT", graph=graph, data=data,
                                      content_type=format) as resp:
            resp.raise_for_status()
项目:aiosparql    作者:aio-libs    | 项目源码 | 文件源码
async def post(self, data: Union[bytes, IOBase], *, format: str,
                   graph: Optional[IRI] = None):
        """Issue a ``POST`` CRUD request carrying ``data``.

        ``format`` is sent as the content type and ``graph`` is forwarded to
        ``_crud_request``.  The response status is checked with
        ``raise_for_status``; nothing is returned.
        """
        # ``async with`` is only valid inside a coroutine function.
        async with self._crud_request("POST", graph=graph, data=data,
                                      content_type=format) as resp:
            resp.raise_for_status()
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def __iter__(self):  # type: ignore
        # Until https://github.com/python/typing/issues/11
        # there's no good way to tell mypy about custom
        # iterators that subclass io.IOBase.
        """Let this class act as an iterator by returning the instance itself."""
        return self
项目:uniq    作者:CiscoDevNet    | 项目源码 | 文件源码
def sanitize_for_serialization(obj):
        """
        Sanitize an object for Request.

        If obj is None, return None.
        If obj is str, int, float, bool, io.IOBase or tuple, return it directly.
        If obj is list, sanitize each element in the list.
        If obj is datetime.datetime, datetime.date convert to string in iso8601 format.
        If obj is dict, return a dict with every value sanitized.
        Otherwise obj is treated as a swagger model and its properties dict
        is sanitized the same way.
        """
        if obj is None:
            return None
        if isinstance(obj, (str, int, float, bool, io.IOBase, tuple)):
            return obj
        if isinstance(obj, list):
            return [NbClientManager.sanitize_for_serialization(sub_obj) for sub_obj in obj]
        if isinstance(obj, (datetime.datetime, datetime.date)):
            return obj.isoformat()

        if isinstance(obj, dict):
            obj_dict = obj
        else:
            # Convert the model to a dict, leaving out the `swaggerTypes` and
            # `attributeMap` attributes and every attribute whose value is
            # None.  Attribute names are mapped to the json keys of the model
            # definition.
            skipped = ('swaggerTypes', 'attributeMap')
            obj_dict = {obj.attributeMap[key]: val
                        for key, val in obj.__dict__.items()
                        if key not in skipped and val is not None}
        return {key: NbClientManager.sanitize_for_serialization(val)
                for (key, val) in obj_dict.items()}
项目:heaviside    作者:jhuapl-boss    | 项目源码 | 文件源码
def read(obj):
    """Context manager for reading data from multiple sources as a file object

    Args:
        obj (string|Path|file object): Data to read / read from
                                  If obj is a Path object, it is opened with
                                     obj.open() and closed again on exit
                                  If obj is a string, a StringIO named
                                     '<string>' is created so the data can be
                                     read like a file object
                                  If obj is a file object, it is passed
                                     through and left open

    Returns:
        file object: File handle containing data

    Raises:
        Exception: obj is of none of the supported types
    """
    try:  # Python 2 compatibility
        text_types = (str, unicode)
    except NameError:
        text_types = (str,)

    owns_handle = isinstance(obj, Path)
    if owns_handle:
        fh = obj.open()
    elif isinstance(obj, text_types):
        fh = StringIO(obj)
        fh.name = '<string>'
    elif isinstance(obj, IOBase):
        fh = obj
    else:
        raise Exception("Unknown input type {}".format(type(obj).__name__))

    try:
        yield fh
    finally:
        # Only handles opened here are closed.
        if owns_handle:
            fh.close()
项目:scratchdir    作者:ahawker    | 项目源码 | 文件源码
def is_file_like_obj(obj):
    """
    Helper function to check that the given object has an attribute for every
    public name (one without a leading underscore) found on the
    :class:`~io.IOBase` abstract class.
    """
    public_names = (name for name in dir(io.IOBase) if not name.startswith('_'))
    for name in public_names:
        if not hasattr(obj, name):
            return False
    return True
项目:scratchdir    作者:ahawker    | 项目源码 | 文件源码
def test_scratch_file_supports_file_obj_interface(active_scratch_dir, method_name):
    """
    Assert that the method of :class:`~scratchdir.ScratchDir` named by
    ``method_name`` returns an object implementing, at least, the
    :class:`~io.IOBase` interface.
    """
    factory = getattr(active_scratch_dir, method_name)
    produced = factory()
    assert is_file_like_obj(produced)
项目:pyChomikBox    作者:JuniorJPDJ    | 项目源码 | 文件源码
def __init__(self, file, start):
        """Wrap ``file`` as a stream that begins ``start`` bytes in.

        Args:
            file: object providing ``read``, ``tell`` and ``seek``.
            start: integer offset; ``len`` is the total length minus it.

        The asserts reject unsuitable arguments with ``AssertionError``.
        """
        assert hasattr(file, 'read') and hasattr(file, 'tell') and hasattr(file, 'seek')
        assert isinstance(start, int)

        self.file, self.start = file, start
        self.total_len = total_len(file)
        self.len = self.total_len - start

        io.IOBase.__init__(self)

        # Best-effort rewind: failures are ignored, but KeyboardInterrupt and
        # SystemExit are no longer swallowed as they were by a bare except.
        try:
            self.seek(0)
        except Exception:
            pass
项目:plone.server    作者:plone    | 项目源码 | 文件源码
def store(self, data, blob):
        """Hand the file behind ``data`` over to ``blob``.

        ``data`` must be an ``io.IOBase`` instance.  When it has a ``name``
        attribute that is not ``None``, ``blob.consumeFile`` is called with
        that name; otherwise nothing is done.

        Raises:
            NotStorable: ``data`` is not an ``io.IOBase`` instance.
        """
        if isinstance(data, io.IOBase):
            filename = getattr(data, 'name', None)
            if filename is None:
                return
            blob.consumeFile(filename)
            return
        raise NotStorable('Could not store data (not of "file").')
项目:imgkit    作者:jarrekk    | 项目源码 | 文件源码
def isFile(self, path=None):
        """Tell whether ``path`` (or, without it, this source) is a file.

        With a truthy ``path`` the answer is whether it is a stream object;
        otherwise it is whether ``'file'`` occurs in ``self.type``.
        """
        if not path:
            return 'file' in self.type
        if isinstance(path, io.IOBase):
            return True
        # dirty hack to detect files opened with the codecs module (they are
        # of 'instance' type when an encoding is specified)
        return path.__class__.__name__ == 'StreamReaderWriter'
项目:trio    作者:python-trio    | 项目源码 | 文件源码
async def aclose(self):
        """Like :meth:`io.IOBase.close`, but async.

        This is also shielded from cancellation; if a cancellation scope is
        cancelled, the wrapped file object will still be safely closed.

        """

        # ensure the underlying file is closed during cancellation
        with _core.open_cancel_scope(shield=True):
            await trio.run_sync_in_worker_thread(self._wrapped.close)

        await _core.checkpoint_if_cancelled()
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _is_file(f):
        """Return True when ``f`` is an instance of ``io.IOBase``."""
        return isinstance(f, io.IOBase)
项目:python-matchlightsdk    作者:TerbiumLabs    | 项目源码 | 文件源码
def test_feed_download_output(mock_open, connection, feed, start_time,
                              end_time, feed_download_url, feed_report_csv):
    """Verifies feed download writing to a file."""
    mock_open.return_value = mock.MagicMock(spec=io.IOBase)

    # "prepare" endpoint: answers with a feed response id.
    prepare_url = '{}/feed/{}/prepare'.format(
        matchlight.MATCHLIGHT_API_URL_V2,
        feed.name)
    httpretty.register_uri(
        httpretty.POST,
        prepare_url,
        body=json.dumps({'feed_response_id': 1}),
        content_type='application/json', status=200)

    # "link" endpoint: reports "pending" once, then "ready" with the URL.
    link_url = '{}/feed/{}/link'.format(
        matchlight.MATCHLIGHT_API_URL_V2,
        feed.name)
    pending_response = httpretty.Response(
        body=json.dumps({'status': 'pending'}),
        content_type='application/json',
        status=200)
    ready_response = httpretty.Response(
        body=json.dumps({
            'status': 'ready',
            'url': feed_download_url,
        }),
        content_type='application/json',
        status=200)
    httpretty.register_uri(
        httpretty.POST,
        link_url,
        responses=[pending_response, ready_response],
    )

    # Download endpoint: serves the CSV report as UTF-8 bytes.
    body = '\n'.join(feed_report_csv).encode('utf-8')
    httpretty.register_uri(
        httpretty.GET, feed_download_url,
        content_type='text/csv',
        body=body)

    connection.feeds.download(
        feed, start_time, end_time, save_path='/tmp/output')

    file_handle = mock_open.return_value.__enter__.return_value
    file_handle.write.assert_called_once_with(body)
项目:pyfan    作者:raptorz    | 项目源码 | 文件源码
def isIOBase(obj):
        """Return True when ``obj`` is an instance of ``IOBase``."""
        return isinstance(obj, IOBase)
项目:datatest    作者:shawnbrown    | 项目源码 | 文件源码
def _get_file_object(csvfile, encoding=None):
        """Return a text-mode file object for ``csvfile``.

        A ``str`` is treated as a path and opened in text mode with the given
        ``encoding`` (which is then required).  Any other object is returned
        unchanged after asserting that it is not a binary file or stream.
        """
        if isinstance(csvfile, str):
            assert encoding, 'encoding required for file path'
            return open(csvfile, 'rt', encoding=encoding, newline='')  # <- EXIT!

        if hasattr(csvfile, 'mode'):
            is_text_mode = 'b' not in csvfile.mode
            assert is_text_mode, "File must be open in text mode ('rt')."
        else:
            stream_class = csvfile.__class__
            if issubclass(stream_class, io.IOBase):
                assert issubclass(stream_class, io.TextIOBase), ("Stream object must inherit "
                                                                 "from io.TextIOBase.")
        return csvfile
项目:datatest    作者:shawnbrown    | 项目源码 | 文件源码
def _py2_get_file_object(csvfile, encoding):
        """Return a binary-mode file object for ``csvfile`` (Python 2 variant).

        A ``str`` is treated as a path and opened with mode ``'rb'``.  Any
        other object is returned unchanged after asserting that it is not a
        text file or stream.  ``encoding`` is not used here.
        """
        if isinstance(csvfile, str):
            return open(csvfile, 'rb')  # <- EXIT!

        if hasattr(csvfile, 'mode'):
            is_binary_mode = 'b' in csvfile.mode
            assert is_binary_mode, ("When using Python 2, file must "
                                    "be open in binary mode ('rb').")
        else:
            stream_class = csvfile.__class__
            if issubclass(stream_class, io.IOBase):
                assert not issubclass(stream_class, io.TextIOBase), ("When using Python 2, "
                                                                     "must use byte stream "
                                                                     "(not text stream).")
        return csvfile
项目:datatest    作者:shawnbrown    | 项目源码 | 文件源码
def from_csv(cls, file, encoding=None, **fmtparams):
        """Create a DataSource from a CSV *file* (a path or file-like
        object)::

            source = datatest.DataSource.from_csv('mydata.csv')

        If *file* is an iterable of files, data will be loaded and
        aligned by column name::

            files = ['mydata1.csv', 'mydata2.csv']
            source = datatest.DataSource.from_csv(files)
        """
        # A single path or stream is handled as a one-element list.
        if isinstance(file, string_types) or isinstance(file, IOBase):
            sources = [file]
        else:
            sources = file

        new_cls = cls.__new__(cls)
        temptable = _from_csv(sources, encoding, **fmtparams)
        new_cls._connection = temptable.connection
        new_cls._table = temptable.name

        # Rebuild a constructor-like string describing how this was created.
        shown_file = repr(sources[0]) if len(sources) == 1 else repr(sources)
        shown_encoding = ', {0!r}'.format(encoding) if encoding else ''
        shown_params = ', **{0!r}'.format(fmtparams) if fmtparams else ''
        new_cls._repr_string = '{0}.from_csv({1}{2}{3})'.format(
            new_cls.__class__.__name__,
            shown_file,
            shown_encoding,
            shown_params,
        )

        return new_cls
项目:django-bench-runner    作者:scuml    | 项目源码 | 文件源码
def _is_file(f):
        """Return True when ``f`` is an instance of ``io.IOBase``."""
        return isinstance(f, io.IOBase)
项目:taf    作者:taf3    | 项目源码 | 文件源码
def file_output(file_object):
    """
    Writes strings to a file, making sure there's a newline at the end

    Generator-based coroutine: each value sent in is stripped of trailing
    ``NEWLINE`` characters, terminated with one ``NEWLINE`` and written out.
    A file opened here from a name is closed when the generator is closed or
    fails; a file object supplied by the caller is left open.

    :param:

     - `file_object`: opened, writable file or name of file to open
    """
    from io import IOBase
    opened_here = not isinstance(file_object, IOBase)
    if opened_here:
        file_object = open(file_object, WRITEABLE)
    try:
        while True:
            line = (yield)
            line = line.rstrip(NEWLINE) + NEWLINE
            file_object.write(line)
    finally:
        # Reached on generator close (GeneratorExit) or on any error.
        if opened_here:
            file_object.close()
项目:HackInTheNorth-PYRAG    作者:npcoder2k14    | 项目源码 | 文件源码
def _is_file(f):
        """Return True when ``f`` is an instance of ``io.IOBase``."""
        return isinstance(f, io.IOBase)
项目:HackInTheNorth-PYRAG    作者:npcoder2k14    | 项目源码 | 文件源码
def _is_file(f):
        """Return True when ``f`` is an instance of ``io.IOBase``."""
        return isinstance(f, io.IOBase)
项目:nml-andythenorth    作者:andythenorth    | 项目源码 | 文件源码
def assemble_file(self, real_file):
        """
        File is about to be closed, last chance to append data.

        The whole content of the ``self.file`` buffer is written to
        ``real_file``.

        @param real_file: Actual output stream.
        @type  real_file: C{io.IOBase}
        """
        emit = real_file.write
        emit(self.file.getvalue())