Python io 模块,TextIOBase() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.TextIOBase()

项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1 / 0

            def readline(self):
                return 1 / 0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception as e:
            self.assertTrue('ZeroDivisionError' in str(e))
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1 / 0

            def readline(self):
                return 1 / 0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def render_samba_configuration(f: TextIOBase, volume_name: str) -> int:
    """
    Write the samba configuration file out to disk

    :param f: TextIOBase handle to the sambe config file
    :param volume_name: str
    :return: int of bytes written
    """
    bytes_written = 0
    bytes_written += f.write("[{}]\n".format(volume_name))
    bytes_written += f.write(b"path = /mnt/glusterfs\n"
                             b"read only = no\n"
                             b"guest ok = yes\n"
                             b"kernel share modes = no\n"
                             b"kernel oplocks = no\n"
                             b"map archive = no\n"
                             b"map hidden = no\n"
                             b"map read only = no\n"
                             b"map system = no\n"
                             b"store dos attributes = yes\n")
    return bytes_written
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def parse(f: IO[Any]) -> Result:
    """
    Parse a shellscript and return a ShellScript
    :param f: TextIOBase handle to the shellscript file
    :return: Result with Ok or Err
    """
    comments = []
    commands = []
    interpreter = ""

    buf = f.readlines()

    for line in buf:
        trimmed = line.strip()
        if trimmed.startswith("#!"):
            interpreter = trimmed
        elif trimmed.startswith("#"):
            comments.append(str(trimmed))
        else:
            # Skip blank lines
            if trimmed:
                commands.append(str(trimmed))
    return Ok(ShellScript(interpreter=interpreter,
                          comments=comments,
                          commands=commands))
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1 / 0

            def readline(self):
                return 1 / 0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception as e:
            self.assertTrue('ZeroDivisionError' in str(e))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1 / 0

            def readline(self):
                return 1 / 0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:bandit-ss    作者:zeroSteiner    | 项目源码 | 文件源码
def wrap_file_object(fileobj):
    """Handle differences in Python 2 and 3 around writing bytes."""
    # If it's not an instance of IOBase, we're probably using Python 2 and
    # that is less finnicky about writing text versus bytes to a file.
    if not isinstance(fileobj, io.IOBase):
        return fileobj

    # At this point we're using Python 3 and that will mangle text written to
    # a file written in bytes mode. So, let's check if the file can handle
    # text as opposed to bytes.
    if isinstance(fileobj, io.TextIOBase):
        return fileobj

    # Finally, we've determined that the fileobj passed in cannot handle text,
    # so we use TextIOWrapper to handle the conversion for us.
    return io.TextIOWrapper(fileobj)
项目:aws-lambda-redshift-copy    作者:christianhxc    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:RKSV    作者:ztp-at    | 项目源码 | 文件源码
def skipBOM(fd):
    """
    Removes the BOM from UTF-8 files so that we can live in peace.
    :param fd: The file descriptor that may or may not have a BOM at the start.
    :return: The position after the BOM as reported by fd.tell().
    """
    try:
        pos = fd.tell()
    except IOError:
        return 0

    if isinstance(fd, io.TextIOBase):
        fst = fd.read(len(codecs.BOM_UTF8.decode('utf-8')))
        if fst.encode('utf-8') != codecs.BOM_UTF8:
            fd.seek(pos)
    else:
        fst = fd.read(len(codecs.BOM_UTF8))
        if fst != codecs.BOM_UTF8:
            fd.seek(pos)

    return fd.tell()
项目:wps_prov    作者:KNMI    | 项目源码 | 文件源码
def deserialize(self, stream, **kwargs):
        """
        Deserialize from `PROV-XML <http://www.w3.org/TR/prov-xml/>`_
        representation to a :class:`~prov.model.ProvDocument` instance.

        :param stream: Input data.
        """
        if isinstance(stream, io.TextIOBase):
            with io.BytesIO() as buf:
                buf.write(stream.read().encode('utf-8'))
                buf.seek(0, 0)
                xml_doc = etree.parse(buf).getroot()
        else:
            xml_doc = etree.parse(stream).getroot()

        # Remove all comments.
        for c in xml_doc.xpath("//comment()"):
            p = c.getparent()
            p.remove(c)

        document = prov.model.ProvDocument()
        self.deserialize_subtree(xml_doc, document)
        return document
项目:PyEloqua-Examples    作者:colemanja91    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1/0

            def readline(self):
                return 1/0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception as e:
            self.assertTrue('ZeroDivisionError' in str(e))
项目:maas    作者:maas    | 项目源码 | 文件源码
def _observe_mdns(reader, output: io.TextIOBase, verbose: bool):
    """Process the given `reader` for `avahi-browse` events.

    IO is mostly isolated in this function; the transformation functions
    `_observe_all_in_full` and `_observe_resolver_found` can be tested without
    having to deal with IO.

    :param reader: A context-manager yielding a `io.TextIOBase`.
    """
    if verbose:
        observer = _observe_all_in_full
    else:
        observer = _observe_resolver_found
    with reader as infile:
        events = _extract_mdns_events(infile)
        for event in observer(events):
            print(json.dumps(event), file=output, flush=True)
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def test_copy_from_propagate_error(self):
        class BrokenRead(_base):
            def read(self, size):
                return 1 / 0

            def readline(self):
                return 1 / 0

        curs = self.conn.cursor()
        # It seems we cannot do this, but now at least we propagate the error
        # self.assertRaises(ZeroDivisionError,
        #     curs.copy_from, BrokenRead(), "tcopy")
        try:
            curs.copy_from(BrokenRead(), "tcopy")
        except Exception, e:
            self.assert_('ZeroDivisionError' in str(e))
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1 / 0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1 / 0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:charm-glusterfs    作者:openstack    | 项目源码 | 文件源码
def write(self, f: TextIOBase) -> Result:
        # Write the run control class back out to a file
        bytes_written = 0
        bytes_written += f.write("{}\n".format(self.interpreter))
        bytes_written += f.write("\n".join(self.comments))
        bytes_written += f.write("\n")
        bytes_written += f.write("\n".join(self.commands))
        bytes_written += f.write("\n")
        return Ok(bytes_written)
项目:datatest    作者:shawnbrown    | 项目源码 | 文件源码
def _get_file_object(csvfile, encoding=None):
        if isinstance(csvfile, str):
            assert encoding, 'encoding required for file path'
            return open(csvfile, 'rt', encoding=encoding, newline='')  # <- EXIT!

        if hasattr(csvfile, 'mode'):
            assert 'b' not in csvfile.mode, "File must be open in text mode ('rt')."
        elif issubclass(csvfile.__class__, io.IOBase):
            assert issubclass(csvfile.__class__, io.TextIOBase), ("Stream object must inherit "
                                                                  "from io.TextIOBase.")
        return csvfile
项目:datatest    作者:shawnbrown    | 项目源码 | 文件源码
def _py2_get_file_object(csvfile, encoding):
        if isinstance(csvfile, str):
            return open(csvfile, 'rb')  # <- EXIT!

        if hasattr(csvfile, 'mode'):
            assert 'b' in csvfile.mode, ("When using Python 2, file must "
                                         "be open in binary mode ('rb').")
        elif issubclass(csvfile.__class__, io.IOBase):
            assert not issubclass(csvfile.__class__, io.TextIOBase), ("When using Python 2, "
                                                                      "must use byte stream "
                                                                      "(not text stream).")
        return csvfile
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1 / 0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1/0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1/0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1/0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1/0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1/0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1/0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1/0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_misc(self):
        shell = MockShell()
        f = PseudoOutputFile(shell, 'stdout', 'utf-8')
        self.assertIsInstance(f, io.TextIOBase)
        self.assertEqual(f.encoding, 'utf-8')
        self.assertIsNone(f.errors)
        self.assertIsNone(f.newlines)
        self.assertEqual(f.name, '<stdout>')
        self.assertFalse(f.closed)
        self.assertTrue(f.isatty())
        self.assertFalse(f.readable())
        self.assertTrue(f.writable())
        self.assertFalse(f.seekable())
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_misc(self):
        shell = MockShell()
        f = PseudoInputFile(shell, 'stdin', 'utf-8')
        self.assertIsInstance(f, io.TextIOBase)
        self.assertEqual(f.encoding, 'utf-8')
        self.assertIsNone(f.errors)
        self.assertIsNone(f.newlines)
        self.assertEqual(f.name, '<stdin>')
        self.assertFalse(f.closed)
        self.assertTrue(f.isatty())
        self.assertTrue(f.readable())
        self.assertFalse(f.writable())
        self.assertFalse(f.seekable())
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_misc(self):
        shell = MockShell()
        f = PseudoOutputFile(shell, 'stdout', 'utf-8')
        self.assertIsInstance(f, io.TextIOBase)
        self.assertEqual(f.encoding, 'utf-8')
        self.assertIsNone(f.errors)
        self.assertIsNone(f.newlines)
        self.assertEqual(f.name, '<stdout>')
        self.assertFalse(f.closed)
        self.assertTrue(f.isatty())
        self.assertFalse(f.readable())
        self.assertTrue(f.writable())
        self.assertFalse(f.seekable())
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_misc(self):
        shell = MockShell()
        f = PseudoInputFile(shell, 'stdin', 'utf-8')
        self.assertIsInstance(f, io.TextIOBase)
        self.assertEqual(f.encoding, 'utf-8')
        self.assertIsNone(f.errors)
        self.assertIsNone(f.newlines)
        self.assertEqual(f.name, '<stdin>')
        self.assertFalse(f.closed)
        self.assertTrue(f.isatty())
        self.assertTrue(f.readable())
        self.assertFalse(f.writable())
        self.assertFalse(f.seekable())
项目:nmbs-realtime-feed    作者:datamindedbe    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1 / 0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        return sys.stdout

    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out

    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
项目:opwen-webapp    作者:ascoderu    | 项目源码 | 文件源码
def _workspace(cls) -> TextIOBase:
        return NamedTemporaryFile()
项目:opwen-webapp    作者:ascoderu    | 项目源码 | 文件源码
def _open(cls, fileobj: BytesIO, mode: str='rb') -> TextIOBase:
        return GzipFile(fileobj=fileobj, mode=mode)
项目:opwen-webapp    作者:ascoderu    | 项目源码 | 文件源码
def _download_to_stream(self, blobname: str, container: str,
                            stream: TextIOBase) -> bool:

        try:
            self._azure_client.get_blob_to_stream(container, blobname, stream)
        except AzureMissingResourceHttpError:
            return False
        else:
            return True
项目:opwen-webapp    作者:ascoderu    | 项目源码 | 文件源码
def _upload_from_stream(self, blobname: str, stream: TextIOBase):
        self._azure_client.create_blob_from_stream(self._container,
                                                   blobname, stream)
项目:opwen-webapp    作者:ascoderu    | 项目源码 | 文件源码
def _upload_from_stream(self, blobname: str, stream: TextIOBase):
        upload_directory = path.join(getenv('AZURE_ROOT'), self._container)
        makedirs(upload_directory, exist_ok=True)
        local_filename = path.join(upload_directory, blobname)

        with open(local_filename, 'wb') as fobj:
            copyfileobj(stream, fobj)
项目:aws-lambda-redshift-copy    作者:christianhxc    | 项目源码 | 文件源码
def test_copy_to_propagate_error(self):
        class BrokenWrite(_base):
            def write(self, data):
                return 1/0

        curs = self.conn.cursor()
        curs.execute("insert into tcopy values (10, 'hi')")
        self.assertRaises(ZeroDivisionError,
            curs.copy_to, BrokenWrite(), "tcopy")
项目:odin    作者:imito    | 项目源码 | 文件源码
def is_fileobj(f):
  """ Check if an object `f` is intance of FileIO object created
  by `open()`"""
  return isinstance(f, io.TextIOBase) or \
      isinstance(f, io.BufferedIOBase) or \
      isinstance(f, io.RawIOBase) or \
      isinstance(f, io.IOBase)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_open_common(self):
        p = self.cls(BASE)
        with (p / 'fileA').open('r') as f:
            self.assertIsInstance(f, io.TextIOBase)
            self.assertEqual(f.read(), "this is file A\n")
        with (p / 'fileA').open('rb') as f:
            self.assertIsInstance(f, io.BufferedIOBase)
            self.assertEqual(f.read().strip(), b"this is file A")
        with (p / 'fileA').open('rb', buffering=0) as f:
            self.assertIsInstance(f, io.RawIOBase)
            self.assertEqual(f.read().strip(), b"this is file A")