Python binascii 模块,b2a_qp() 实例源码

我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用binascii.b2a_qp()

项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def set_bytes_content(msg, data, maintype, subtype, cte='base64',
                     disposition=None, filename=None, cid=None,
                     params=None, headers=None):
    _prepare_set(msg, maintype, subtype, headers)
    if cte == 'base64':
        data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
    elif cte == 'quoted-printable':
        # XXX: quoprimime.body_encode won't encode newline characters in data,
        # so we can't use it.  This means max_line_length is ignored.  Another
        # bug to fix later.  (Note: encoders.quopri is broken on line ends.)
        data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
        data = data.decode('ascii')
    elif cte == '7bit':
        # Make sure it really is only ASCII.  The early warning here seems
        # worth the overhead...if you care write your own content manager :).
        data.encode('ascii')
    elif cte in ('8bit', 'binary'):
        data = data.decode('ascii', 'surrogateescape')
    msg.set_payload(data)
    msg['Content-Transfer-Encoding'] = cte
    _finalize_set(msg, disposition, filename, cid, params)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _apply_content_transfer_encoding(self, stream):
        encoding = self.headers[CONTENT_TRANSFER_ENCODING].lower()
        if encoding == 'base64':
            buffer = bytearray()
            while True:
                if buffer:
                    div, mod = divmod(len(buffer), 3)
                    chunk, buffer = buffer[:div * 3], buffer[div * 3:]
                    if chunk:
                        yield base64.b64encode(chunk)
                chunk = next(stream, None)
                if not chunk:
                    if buffer:
                        yield base64.b64encode(buffer[:])
                    return
                buffer.extend(chunk)
        elif encoding == 'quoted-printable':
            for chunk in stream:
                yield binascii.b2a_qp(chunk)
        elif encoding == 'binary':
            yield from stream
        else:
            raise RuntimeError('unknown content transfer encoding: {}'
                               ''.format(encoding))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def set_bytes_content(msg, data, maintype, subtype, cte='base64',
                     disposition=None, filename=None, cid=None,
                     params=None, headers=None):
    _prepare_set(msg, maintype, subtype, headers)
    if cte == 'base64':
        data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
    elif cte == 'quoted-printable':
        # XXX: quoprimime.body_encode won't encode newline characters in data,
        # so we can't use it.  This means max_line_length is ignored.  Another
        # bug to fix later.  (Note: encoders.quopri is broken on line ends.)
        data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
        data = data.decode('ascii')
    elif cte == '7bit':
        # Make sure it really is only ASCII.  The early warning here seems
        # worth the overhead...if you care write your own content manager :).
        data.encode('ascii')
    elif cte in ('8bit', 'binary'):
        data = data.decode('ascii', 'surrogateescape')
    msg.set_payload(data)
    msg['Content-Transfer-Encoding'] = cte
    _finalize_set(msg, disposition, filename, cid, params)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_qp(self):
        # A test for SF bug 534347 (segfaults without the proper fix)
        try:
            binascii.a2b_qp(b"", **{1:1})
        except TypeError:
            pass
        else:
            self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
        self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
        self.assertEqual(binascii.a2b_qp(b"=="), b"=")
        self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
        self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
        self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
        self.assertEqual(
            binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
            b"=FF\r\n=FF\r\n=FF")
        self.assertEqual(
            binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
            b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")

        self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
        self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
        self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
        self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
                         b'foo=09bar=09\n')

        self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
        self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
        self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def encodestring(s, quotetabs=False, header=False):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs=quotetabs, header=header)
    from io import BytesIO
    infp = BytesIO(s)
    outfp = BytesIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_qp(self):
        # A test for SF bug 534347 (segfaults without the proper fix)
        try:
            binascii.a2b_qp(b"", **{1:1})
        except TypeError:
            pass
        else:
            self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
        self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
        self.assertEqual(binascii.a2b_qp(b"=="), b"=")
        self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
        self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
        self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
        self.assertEqual(
            binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
            b"=FF\r\n=FF\r\n=FF")
        self.assertEqual(
            binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
            b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")

        self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
        self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
        self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
        self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
                         b'foo=09bar=09\n')

        self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
        self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
        self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_qp(self):
        # A test for SF bug 534347 (segfaults without the proper fix)
        try:
            binascii.a2b_qp("", **{1:1})
        except TypeError:
            pass
        else:
            self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
        self.assertEqual(binascii.a2b_qp("= "), "= ")
        self.assertEqual(binascii.a2b_qp("=="), "=")
        self.assertEqual(binascii.a2b_qp("=AX"), "=AX")
        self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
        self.assertEqual(binascii.a2b_qp("=00\r\n=00"), "\x00\r\n\x00")
        self.assertEqual(
            binascii.b2a_qp("\xff\r\n\xff\n\xff"),
            "=FF\r\n=FF\r\n=FF"
        )
        self.assertEqual(
            binascii.b2a_qp("0"*75+"\xff\r\n\xff\r\n\xff"),
            "0"*75+"=\r\n=FF\r\n=FF\r\n=FF"
        )

        self.assertEqual(binascii.b2a_qp('\0\n'), '=00\n')
        self.assertEqual(binascii.b2a_qp('\0\n', quotetabs=True), '=00\n')
        self.assertEqual(binascii.b2a_qp('foo\tbar\t\n'), 'foo\tbar=09\n')
        self.assertEqual(binascii.b2a_qp('foo\tbar\t\n', quotetabs=True), 'foo=09bar=09\n')

        self.assertEqual(binascii.b2a_qp('.'), '=2E')
        self.assertEqual(binascii.b2a_qp('.\n'), '=2E\n')
        self.assertEqual(binascii.b2a_qp('a.\n'), 'a.\n')
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_qp(self):
        binascii.a2b_qp(data=b"", header=False)  # Keyword arguments allowed

        # A test for SF bug 534347 (segfaults without the proper fix)
        try:
            binascii.a2b_qp(b"", **{1:1})
        except TypeError:
            pass
        else:
            self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")

        self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
        self.assertEqual(binascii.a2b_qp(b"=="), b"=")
        self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
        self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
        self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
        self.assertEqual(
            binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
            b"=FF\r\n=FF\r\n=FF")
        self.assertEqual(
            binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
            b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")

        self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
        self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
        self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
        self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
                         b'foo=09bar=09\n')

        self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
        self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
        self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def encodestring(s, quotetabs=False, header=False):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs=quotetabs, header=header)
    from io import BytesIO
    infp = BytesIO(s)
    outfp = BytesIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_qp(self):
        # A test for SF bug 534347 (segfaults without the proper fix)
        try:
            binascii.a2b_qp("", **{1:1})
        except TypeError:
            pass
        else:
            self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
        self.assertEqual(binascii.a2b_qp("= "), "= ")
        self.assertEqual(binascii.a2b_qp("=="), "=")
        self.assertEqual(binascii.a2b_qp("=AX"), "=AX")
        self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
        self.assertEqual(binascii.a2b_qp("=00\r\n=00"), "\x00\r\n\x00")
        self.assertEqual(
            binascii.b2a_qp("\xff\r\n\xff\n\xff"),
            "=FF\r\n=FF\r\n=FF"
        )
        self.assertEqual(
            binascii.b2a_qp("0"*75+"\xff\r\n\xff\r\n\xff"),
            "0"*75+"=\r\n=FF\r\n=FF\r\n=FF"
        )

        self.assertEqual(binascii.b2a_qp('\0\n'), '=00\n')
        self.assertEqual(binascii.b2a_qp('\0\n', quotetabs=True), '=00\n')
        self.assertEqual(binascii.b2a_qp('foo\tbar\t\n'), 'foo\tbar=09\n')
        self.assertEqual(binascii.b2a_qp('foo\tbar\t\n', quotetabs=True), 'foo=09bar=09\n')

        self.assertEqual(binascii.b2a_qp('.'), '=2E')
        self.assertEqual(binascii.b2a_qp('.\n'), '=2E\n')
        self.assertEqual(binascii.b2a_qp('a.\n'), 'a.\n')
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:empyrion-python-api    作者:huhlig    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:pmatic    作者:LarsMichelsen    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_qp(self):
        # A test for SF bug 534347 (segfaults without the proper fix)
        try:
            binascii.a2b_qp(b"", **{1:1})
        except TypeError:
            pass
        else:
            self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
        self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
        self.assertEqual(binascii.a2b_qp(b"=="), b"=")
        self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
        self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
        self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
        self.assertEqual(
            binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
            b"=FF\r\n=FF\r\n=FF")
        self.assertEqual(
            binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
            b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")

        self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
        self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
        self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
        self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
                         b'foo=09bar=09\n')

        self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
        self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
        self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def encodestring(s, quotetabs=False, header=False):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs=quotetabs, header=header)
    from io import BytesIO
    infp = BytesIO(s)
    outfp = BytesIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def encodestring(s, quotetabs = 0, header = 0):
    if b2a_qp is not None:
        return b2a_qp(s, quotetabs = quotetabs, header = header)
    from cStringIO import StringIO
    infp = StringIO(s)
    outfp = StringIO()
    encode(infp, outfp, quotetabs, header)
    return outfp.getvalue()
项目:dmarc-tester    作者:usnistgov    | 项目源码 | 文件源码
def printchar(char):
  """Useful debugging function for milter developers."""
  print ('char: %s [qp=%s][hex=%s][base64=%s]' %
         (char, binascii.b2a_qp(char), binascii.b2a_hex(char),
          binascii.b2a_base64(char)))
#end def printchar(char).
项目:dmarc-tester    作者:usnistgov    | 项目源码 | 文件源码
def __send_response(self, response):
      """Send data down the milter socket.

      Args:
        response: The data to send.
      """
      #logging.debug('  >>> %s', binascii.b2a_qp(response[0]))
      self.push(struct.pack('!I', len(response)))
      self.push(response)
项目:dmarc-tester    作者:usnistgov    | 项目源码 | 文件源码
def read_milter_data(self):

      """ Callback from asynchat once we have read the milter packet length
      worth of bytes on the socket and it is accumulated in our input buffer
      (which is the milter command + data to send to the dispatcher). """

      import binascii
      inbuff = "".join(self.__input)
      self.__input = []
      if not inbuff.startswith("B"):
        if self.chmilt == "Receiver":
          logging.debug(' read: %s %s', self.chmilt, binascii.b2a_qp(inbuff))
      try:
        response = self.__milter_dispatcher.Dispatch(inbuff)
        #logging.debug(' >>> resp: [%s]', response)
        if type(response) == list:
          for r in response:
            self.__send_response(r)
        elif response:
          self.__send_response(response)

        # rinse and repeat :)
        self.found_terminator = self.read_packetlen
        self.set_terminator(MILTER_LEN_BYTES)
      except kimpf.PpyMilterCloseConnection, e:
        self.close()


  #end class ConnectionHandler(asynchat.async_chat).

#end class AsyncPpyMilterServer(asyncore.dispatcher).

#SN July 13, 2015
#Bring configuration processing inside here, instead of in commons.
项目:CTF-chal-code    作者:zwhubuntu    | 项目源码 | 文件源码
def  base64code(s,d):
    global b64str
    global f
    if(d==len(b64str)):
        f.write(binascii.b2a_qp(base64.b64decode(s))+'\n')
    else:
        base64code(s+b64str[d],d+1)
        if b64str[d].isalpha():  
           base64code(s+b64str[d].lower(),d+1)
项目:CTF-chal-code    作者:zwhubuntu    | 项目源码 | 文件源码
def  base64code(s,d):
    global b64str
    global f
    if(d==len(b64str)):
        f.write(binascii.b2a_qp(base64.b64decode(s))+'\n')
    else:
        base64code(s+b64str[d],d+1)
        if b64str[d].isalpha():  
            base64code(s+b64str[d].lower(),d+1)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def encode(input, output, quotetabs, header=False):
    """Read 'input', apply quoted-printable encoding, and write to 'output'.

    'input' and 'output' are binary file objects. The 'quotetabs' flag
    indicates whether embedded tabs and spaces should be quoted. Note that
    line-ending tabs and spaces are always encoded, as per RFC 1521.
    The 'header' flag indicates whether we are encoding spaces as _ as per RFC
    1522."""

    if b2a_qp is not None:
        data = input.read()
        odata = b2a_qp(data, quotetabs=quotetabs, header=header)
        output.write(odata)
        return

    def write(s, output=output, lineEnd=b'\n'):
        # RFC 1521 requires that the line ending in a space or tab must have
        # that trailing character encoded.
        if s and s[-1:] in b' \t':
            output.write(s[:-1] + quote(s[-1:]) + lineEnd)
        elif s == b'.':
            output.write(quote(s) + lineEnd)
        else:
            output.write(s + lineEnd)

    prevline = None
    while 1:
        line = input.readline()
        if not line:
            break
        outline = []
        # Strip off any readline induced trailing newline
        stripped = b''
        if line[-1:] == b'\n':
            line = line[:-1]
            stripped = b'\n'
        # Calculate the un-length-limited encoded line
        for c in line:
            c = bytes((c,))
            if needsquoting(c, quotetabs, header):
                c = quote(c)
            if header and c == b' ':
                outline.append(b'_')
            else:
                outline.append(c)
        # First, write out the previous line
        if prevline is not None:
            write(prevline)
        # Now see if we need any soft line breaks because of RFC-imposed
        # length limitations.  Then do the thisline->prevline dance.
        thisline = EMPTYSTRING.join(outline)
        while len(thisline) > MAXLINESIZE:
            # Don't forget to include the soft line break `=' sign in the
            # length calculation!
            write(thisline[:MAXLINESIZE-1], lineEnd=b'=\n')
            thisline = thisline[MAXLINESIZE-1:]
        # Write out the current line
        prevline = thisline
    # Write out the last line, without a trailing newline
    if prevline is not None:
        write(prevline, lineEnd=stripped)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def encode(input, output, quotetabs, header=False):
    """Read 'input', apply quoted-printable encoding, and write to 'output'.

    'input' and 'output' are binary file objects. The 'quotetabs' flag
    indicates whether embedded tabs and spaces should be quoted. Note that
    line-ending tabs and spaces are always encoded, as per RFC 1521.
    The 'header' flag indicates whether we are encoding spaces as _ as per RFC
    1522."""

    if b2a_qp is not None:
        data = input.read()
        odata = b2a_qp(data, quotetabs=quotetabs, header=header)
        output.write(odata)
        return

    def write(s, output=output, lineEnd=b'\n'):
        # RFC 1521 requires that the line ending in a space or tab must have
        # that trailing character encoded.
        if s and s[-1:] in b' \t':
            output.write(s[:-1] + quote(s[-1:]) + lineEnd)
        elif s == b'.':
            output.write(quote(s) + lineEnd)
        else:
            output.write(s + lineEnd)

    prevline = None
    while 1:
        line = input.readline()
        if not line:
            break
        outline = []
        # Strip off any readline induced trailing newline
        stripped = b''
        if line[-1:] == b'\n':
            line = line[:-1]
            stripped = b'\n'
        # Calculate the un-length-limited encoded line
        for c in line:
            c = bytes((c,))
            if needsquoting(c, quotetabs, header):
                c = quote(c)
            if header and c == b' ':
                outline.append(b'_')
            else:
                outline.append(c)
        # First, write out the previous line
        if prevline is not None:
            write(prevline)
        # Now see if we need any soft line breaks because of RFC-imposed
        # length limitations.  Then do the thisline->prevline dance.
        thisline = EMPTYSTRING.join(outline)
        while len(thisline) > MAXLINESIZE:
            # Don't forget to include the soft line break `=' sign in the
            # length calculation!
            write(thisline[:MAXLINESIZE-1], lineEnd=b'=\n')
            thisline = thisline[MAXLINESIZE-1:]
        # Write out the current line
        prevline = thisline
    # Write out the last line, without a trailing newline
    if prevline is not None:
        write(prevline, lineEnd=stripped)