Python string 模块,encode() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用string.encode()

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def send_content(self, connection, request_body):
        connection.putheader("Content-Type", "text/xml")

        #optionally encode the request
        if (self.encode_threshold is not None and
            self.encode_threshold < len(request_body) and
            gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    ##
    # Parse response.
    #
    # @param file Stream.
    # @return Response tuple and target method.
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def send_content(self, connection, request_body):
        connection.putheader("Content-Type", "text/xml")

        #optionally encode the request
        if (self.encode_threshold is not None and
            self.encode_threshold < len(request_body) and
            gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    ##
    # Parse response.
    #
    # @param file Stream.
    # @return Response tuple and target method.
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def serve_amfrpc(self, version=0):
        try:
            import pyamf
            import pyamf.remoting.gateway
        except:
            return "pyamf not installed or not in Python sys.path"
        request = current.request
        response = current.response
        if version == 3:
            services = self.amfrpc3_procedures
            base_gateway = pyamf.remoting.gateway.BaseGateway(services)
            pyamf_request = pyamf.remoting.decode(request.body)
        else:
            services = self.amfrpc_procedures
            base_gateway = pyamf.remoting.gateway.BaseGateway(services)
            context = pyamf.get_context(pyamf.AMF0)
            pyamf_request = pyamf.remoting.decode(request.body, context)
        pyamf_response = pyamf.remoting.Envelope(pyamf_request.amfVersion)
        for name, message in pyamf_request:
            pyamf_response[name] = base_gateway.getProcessor(message)(message)
        response.headers['Content-Type'] = pyamf.remoting.CONTENT_TYPE
        if version == 3:
            return pyamf.remoting.encode(pyamf_response).getvalue()
        else:
            return pyamf.remoting.encode(pyamf_response, context).getvalue()
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def serve_amfrpc(self, version=0):
        try:
            import pyamf
            import pyamf.remoting.gateway
        except:
            return "pyamf not installed or not in Python sys.path"
        request = current.request
        response = current.response
        if version == 3:
            services = self.amfrpc3_procedures
            base_gateway = pyamf.remoting.gateway.BaseGateway(services)
            pyamf_request = pyamf.remoting.decode(request.body)
        else:
            services = self.amfrpc_procedures
            base_gateway = pyamf.remoting.gateway.BaseGateway(services)
            context = pyamf.get_context(pyamf.AMF0)
            pyamf_request = pyamf.remoting.decode(request.body, context)
        pyamf_response = pyamf.remoting.Envelope(pyamf_request.amfVersion)
        for name, message in pyamf_request:
            pyamf_response[name] = base_gateway.getProcessor(message)(message)
        response.headers['Content-Type'] = pyamf.remoting.CONTENT_TYPE
        if version == 3:
            return pyamf.remoting.encode(pyamf_response).getvalue()
        else:
            return pyamf.remoting.encode(pyamf_response, context).getvalue()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def send_content(self, connection, request_body):
        connection.putheader("Content-Type", "text/xml")

        #optionally encode the request
        if (self.encode_threshold is not None and
            self.encode_threshold < len(request_body) and
            gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    ##
    # Parse response.
    #
    # @param file Stream.
    # @return Response tuple and target method.
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def send_content(self, connection, request_body):
        connection.putheader("Content-Type", "text/xml")

        #optionally encode the request
        if (self.encode_threshold is not None and
            self.encode_threshold < len(request_body) and
            gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    ##
    # Parse response.
    #
    # @param file Stream.
    # @return Response tuple and target method.
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def send_content(self, connection, request_body):
        connection.putheader("Content-Type", "text/xml")

        #optionally encode the request
        if (self.encode_threshold is not None and
            self.encode_threshold < len(request_body) and
            gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    ##
    # Parse response.
    #
    # @param file Stream.
    # @return Response tuple and target method.
项目:cetusshop    作者:icetusorg    | 项目源码 | 文件源码
def sign(self):
        logger.debug('Enter sign...')


        logger.debug('self.ret:%s' % (self.ret))

        #string = '&'.join(['%s=%s' % (key.lower(), self.ret[key]) for key in sorted(self.ret)])
        #string = 'appid=%s&body=%s&device_info=%s&mch_id=%s&nonce_str=%s' % (self.ret['appid'],self.ret['body'],self.ret['device_info'],self.ret['mch_id'],self.ret['nonce_str'])
        string = 'nima'
        logger.debug('string:' + str(string))
        logger.debug('string:%s' % (string))


        mch_key = System_Config.objects.get(name='wechat_mch_key').val
        logger.debug('mch_key:%s' %(mch_key))
        #stringSignTemp=  '%s&key=%s' % (string,mch_key)
        #logger.debug('stringSignTemp:%s' % (stringSignTemp))

        #signature = hashlib.sha1(string.encode('utf8')).hexdigest()
        #signature = hashlib.md5(stringSignTemp.encode('utf8')).hexdigest().upper()
        #self.other_parameters['sign'] = signature
        #logger.debug('signature:%s' % (signature))
        #return signature
项目:kodi_plugins    作者:pitpompej    | 项目源码 | 文件源码
def playTrack(asin):
    content = trackPostUnicodeGetHLSPage('https://music.amazon.de/dmls/', asin)
    temp_file_path = addonUserDataFolder
    if forceDVDPlayer:
        temp_file_path += "/temp.mp4"
    else:
        temp_file_path += "/temp.m3u8"
    if xbmcvfs.exists(temp_file_path):
        xbmcvfs.delete(temp_file_path)
    m3u_temp_file = xbmcvfs.File(temp_file_path, 'w')
    manifest_match = re.compile('manifest":"(.+?)"',re.DOTALL).findall(content)
    if manifest_match:
        m3u_string = manifest_match[0]
        m3u_string = m3u_string.replace("\\n", os.linesep)
        m3u_temp_file.write(m3u_string.encode("ascii"))
    m3u_temp_file.close()
    play_item = xbmcgui.ListItem(path=temp_file_path)
    play_item = setPlayItemInfo(play_item)
    xbmcplugin.setResolvedUrl(pluginhandle, True, listitem=play_item)
项目:kodi_plugins    作者:pitpompej    | 项目源码 | 文件源码
def search(type):
    keyboard = xbmc.Keyboard('', translation(30015))
    keyboard.doModal()
    if keyboard.isConfirmed() and keyboard.getText():
        search_string = unicode(keyboard.getText(), "utf-8").replace(" ", "+")
        search_string = urllib.quote_plus(search_string.encode("utf8"))
        if siteVersion=="de":
            if type=="albums":
                listAlbums(urlMain+"/s?rh=n%3A5686557031%2Ck%2Cp_n_format_browse-bin%3A180848031&keywords="+search_string+"&ie=UTF8")
            elif type=="songs":
                listSearchedSongs(urlMain+"/s/ref=sr_nr_p_n_format_browse-bi_2?fst=as%3Aoff&rh=n%3A5686557031%2Ck%2Cp_n_format_browse-bin%3A180849031&bbn=5686557031&keywords="+search_string+"&ie=UTF8")
#        elif siteVersion=="com":
#            if type=="movies":
#                listMovies(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D7613704011&field-keywords="+search_string)
#            elif type=="tv":
#                listShows(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D2858778011&field-keywords="+search_string)
#        elif siteVersion=="co.uk":
#            if type=="movies":
#                listMovies(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D3356010031&field-keywords="+search_string)
#            elif type=="tv":
#                listShows(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D3356011031&field-keywords="+search_string)
项目:kodi_plugins    作者:pitpompej    | 项目源码 | 文件源码
def __unquote_to_bytes(string):
    string = string.encode('utf-8')
    bits = string.split(b'%')
    if len(bits) == 1:
        return string
    res = [bits[0]]
    append = res.append
    # Delay the initialization of the table to not waste memory
    # if the function is never called
    global _hextobyte
    if _hextobyte is None:
        _hextobyte = dict((a+b, chr(int(a+b,16))) for a in _hexdig for b in _hexdig)
    for item in bits[1:]:
        try:
            append(_hextobyte[item[:2]])
            append(item[2:])
        except KeyError:
            append(b'%')
            append(item)
    return b''.join(res)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def send_content(self, connection, request_body):
        connection.putheader("Content-Type", "text/xml")

        #optionally encode the request
        if (self.encode_threshold is not None and
            self.encode_threshold < len(request_body) and
            gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    ##
    # Parse response.
    #
    # @param file Stream.
    # @return Response tuple and target method.
项目:slugiot-client    作者:slugiot    | 项目源码 | 文件源码
def serve_amfrpc(self, version=0):
        try:
            import pyamf
            import pyamf.remoting.gateway
        except:
            return "pyamf not installed or not in Python sys.path"
        request = current.request
        response = current.response
        if version == 3:
            services = self.amfrpc3_procedures
            base_gateway = pyamf.remoting.gateway.BaseGateway(services)
            pyamf_request = pyamf.remoting.decode(request.body)
        else:
            services = self.amfrpc_procedures
            base_gateway = pyamf.remoting.gateway.BaseGateway(services)
            context = pyamf.get_context(pyamf.AMF0)
            pyamf_request = pyamf.remoting.decode(request.body, context)
        pyamf_response = pyamf.remoting.Envelope(pyamf_request.amfVersion)
        for name, message in pyamf_request:
            pyamf_response[name] = base_gateway.getProcessor(message)(message)
        response.headers['Content-Type'] = pyamf.remoting.CONTENT_TYPE
        if version == 3:
            return pyamf.remoting.encode(pyamf_response).getvalue()
        else:
            return pyamf.remoting.encode(pyamf_response, context).getvalue()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def send_content(self, connection, request_body):
        connection.putheader("Content-Type", "text/xml")

        #optionally encode the request
        if (self.encode_threshold is not None and
            self.encode_threshold < len(request_body) and
            gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    ##
    # Parse response.
    #
    # @param file Stream.
    # @return Response tuple and target method.
项目:empyrion-python-api    作者:huhlig    | 项目源码 | 文件源码
def send_content(self, connection, request_body):
        connection.putheader("Content-Type", "text/xml")

        #optionally encode the request
        if (self.encode_threshold is not None and
            self.encode_threshold < len(request_body) and
            gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    ##
    # Parse response.
    #
    # @param file Stream.
    # @return Response tuple and target method.
项目:pmatic    作者:LarsMichelsen    | 项目源码 | 文件源码
def send_content(self, connection, request_body):
        connection.putheader("Content-Type", "text/xml")

        #optionally encode the request
        if (self.encode_threshold is not None and
            self.encode_threshold < len(request_body) and
            gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    ##
    # Parse response.
    #
    # @param file Stream.
    # @return Response tuple and target method.
项目:kind2anki    作者:prz3m    | 项目源码 | 文件源码
def send_content(self, connection, request_body):
        connection.putheader("Content-Type", "text/xml")

        #optionally encode the request
        if (self.encode_threshold is not None and
            self.encode_threshold < len(request_body) and
            gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    ##
    # Parse response.
    #
    # @param file Stream.
    # @return Response tuple and target method.
项目:rl-bareos    作者:kjetilho    | 项目源码 | 文件源码
def deunicodise(string, encoding = None, errors = "replace"):
    """
    Convert unicode 'string' to <type str>, by default replacing
    all invalid characters with '?' or raise an exception.
    """

    if not encoding:
        encoding = Config.Config().encoding

    if type(string) != unicode:
        return str(string)
    debug("DeUnicodising %r using %s" % (string, encoding))
    try:
        return string.encode(encoding, errors)
    except UnicodeEncodeError:
        raise UnicodeEncodeError("Conversion from unicode failed: %r" % string)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _stringify(string):
        # convert to 7-bit ascii if possible
        try:
            return string.encode("ascii")
        except UnicodeError:
            return string
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def encode(self, out):
            out.write("<value><boolean>%d</boolean></value>\n" % self.value)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def encode(self, out):
        out.write("<value><dateTime.iso8601>")
        out.write(self.value)
        out.write("</dateTime.iso8601></value>\n")
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def encode(self, out):
        out.write("<value><base64>\n")
        base64.encode(StringIO.StringIO(self.data), out)
        out.write("</base64></value>\n")
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def dump_unicode(self, value, write, escape=escape):
            value = value.encode(self.encoding)
            write("<value><string>")
            write(escape(value))
            write("</string></value>\n")
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def dump_instance(self, value, write):
        # check for special wrappers
        if value.__class__ in WRAPPERS:
            self.write = write
            value.encode(self)
            del self.write
        else:
            # store instance attributes as a struct (really?)
            self.dump_struct(value.__dict__, write)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _stringify(string):
        # convert to 7-bit ascii if possible
        try:
            return string.encode("ascii")
        except UnicodeError:
            return string
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def encode(self, out):
            out.write("<value><boolean>%d</boolean></value>\n" % self.value)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def encode(self, out):
        out.write("<value><dateTime.iso8601>")
        out.write(self.value)
        out.write("</dateTime.iso8601></value>\n")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def encode(self, out):
        out.write("<value><base64>\n")
        base64.encode(StringIO.StringIO(self.data), out)
        out.write("</base64></value>\n")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def dump_unicode(self, value, write, escape=escape):
            value = value.encode(self.encoding)
            write("<value><string>")
            write(escape(value))
            write("</string></value>\n")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def dump_instance(self, value, write):
        # check for special wrappers
        if value.__class__ in WRAPPERS:
            self.write = write
            value.encode(self)
            del self.write
        else:
            # store instance attributes as a struct (really?)
            self.dump_struct(value.__dict__, write)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, uri, transport=None, encoding=None, verbose=0,
                 allow_none=0, use_datetime=0, context=None):
        # establish a "logical" server connection

        if isinstance(uri, unicode):
            uri = uri.encode('ISO-8859-1')

        # get the url
        import urllib
        type, uri = urllib.splittype(uri)
        if type not in ("http", "https"):
            raise IOError, "unsupported XML-RPC protocol"
        self.__host, self.__handler = urllib.splithost(uri)
        if not self.__handler:
            self.__handler = "/RPC2"

        if transport is None:
            if type == "https":
                transport = SafeTransport(use_datetime=use_datetime, context=context)
            else:
                transport = Transport(use_datetime=use_datetime)
        self.__transport = transport

        self.__encoding = encoding
        self.__verbose = verbose
        self.__allow_none = allow_none
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def wx_signature(data):
    string = '&'.join(['%s=%s' % (key.lower(), data[key]) for key in sorted(data) if data[key] is not None and data[key] is not ''])
    return hashlib.sha1(string.encode('utf-8')).hexdigest()
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def wx_sign_for_pay(params):
    content = '&'.join(['%s=%s' % (key, params[key]) for key in sorted(params) if params[key] is not None and params[key] is not ''])
    content += '&key=' + settings.WEIXIN_KEY
    return hashlib.md5(content.encode('utf-8')).hexdigest().upper()
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def _stringify(string):
        # convert to 7-bit ascii if possible
        try:
            return string.encode("ascii")
        except UnicodeError:
            return string
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def encode(self, out):
            out.write("<value><boolean>%d</boolean></value>\n" % self.value)
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def __init__(
            self,
            payload,
            filename=None,
            content_id=None,
            content_type=None,
                encoding='utf-8'):
            if isinstance(payload, str):
                if filename is None:
                    filename = os.path.basename(payload)
                payload = read_file(payload, 'rb')
            else:
                if filename is None:
                    raise Exception('Missing attachment name')
                payload = payload.read()
            filename = filename.encode(encoding)
            if content_type is None:
                content_type = contenttype(filename)
            self.my_filename = filename
            self.my_payload = payload
            MIMEBase.MIMEBase.__init__(self, *content_type.split('/', 1))
            self.set_payload(payload)
            self['Content-Disposition'] = 'attachment; filename="%s"' % filename
            if content_id is not None:
                self['Content-Id'] = '<%s>' % content_id.encode(encoding)
            Encoders.encode_base64(self)
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def jwt_b64e(string):
        if isinstance(string, unicode):
            string = string.encode('utf-8', 'strict')
        return base64.urlsafe_b64encode(string).strip(b'=')
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def jwt_b64d(string):
        """base64 decodes a single bytestring (and is tolerant to getting
        called with a unicode string).
        The result is also a bytestring.
        """
        if isinstance(string, unicode):
            string = string.encode('ascii', 'ignore')
        return base64.urlsafe_b64decode(string + '=' * (-len(string) % 4))
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def generate_token(self, payload):
        secret = self.secret_key
        if self.salt:
            if callable(self.salt):
                secret = "%s$%s" % (secret, self.salt(payload))
            else:
                secret = "%s$%s" % (secret, self.salt)
            if isinstance(secret, unicode):
                secret = secret.encode('ascii', 'ignore')
        b64h = self.cached_b64h
        b64p = self.jwt_b64e(serializers.json(payload))
        jbody = b64h + '.' + b64p
        mauth = hmac.new(key=secret, msg=jbody, digestmod=self.digestmod)
        jsign = self.jwt_b64e(mauth.digest())
        return jbody + '.' + jsign
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def load_token(self, token):
        if isinstance(token, unicode):
            token = token.encode('utf-8', 'strict')
        body, sig = token.rsplit('.', 1)
        b64h, b64b = body.split('.', 1)
        if b64h != self.cached_b64h:
            # header not the same
            raise HTTP(400, u'Invalid JWT Header')
        secret = self.secret_key
        tokend = serializers.loads_json(self.jwt_b64d(b64b))
        if self.salt:
            if callable(self.salt):
                secret = "%s$%s" % (secret, self.salt(tokend))
            else:
                secret = "%s$%s" % (secret, self.salt)
            if isinstance(secret, unicode):
                secret = secret.encode('ascii', 'ignore')
        if not self.verify_signature(body, sig, secret):
            # signature verification failed
            raise HTTP(400, u'Token signature is invalid')
        if self.verify_expiration:
            now = time.mktime(datetime.datetime.utcnow().timetuple())
            if tokend['exp'] + self.leeway < now:
                raise HTTP(400, u'Token is expired')
        if callable(self.before_authorization):
            self.before_authorization(tokend)
        return tokend
项目:Simple-Search-Engine    作者:HoweZZH    | 项目源码 | 文件源码
def getHash(self, string):
        '''return a unique and stable hash value from hashing the website url'''
        m = hashlib.sha1(string.encode('utf-8'))
        return m.hexdigest()
项目:Simple-Search-Engine    作者:HoweZZH    | 项目源码 | 文件源码
def getHash(url):
    '''return a unique and stable hash value from hashing the website url'''
    m = hashlib.sha1(url.encode('utf-8'))
    return m.hexdigest()
项目:GOKU    作者:bingweichen    | 项目源码 | 文件源码
def sign(self):
        string = '&'.join(['%s=%s' % (key.lower(), self.ret[key]) for key in
                           sorted(self.ret)])
        self.ret['signature'] = hashlib.sha1(string.encode('utf-8')).hexdigest()
        return self.ret
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def __init__(
            self,
            payload,
            filename=None,
            content_id=None,
            content_type=None,
                encoding='utf-8'):
            if isinstance(payload, str):
                if filename is None:
                    filename = os.path.basename(payload)
                payload = read_file(payload, 'rb')
            else:
                if filename is None:
                    raise Exception('Missing attachment name')
                payload = payload.read()
            filename = filename.encode(encoding)
            if content_type is None:
                content_type = contenttype(filename)
            self.my_filename = filename
            self.my_payload = payload
            MIMEBase.MIMEBase.__init__(self, *content_type.split('/', 1))
            self.set_payload(payload)
            self['Content-Disposition'] = 'attachment; filename="%s"' % filename
            if content_id is not None:
                self['Content-Id'] = '<%s>' % content_id.encode(encoding)
            Encoders.encode_base64(self)
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def jwt_b64e(string):
        if isinstance(string, unicode):
            string = string.encode('utf-8', 'strict')
        return base64.urlsafe_b64encode(string).strip(b'=')
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def jwt_b64d(string):
        """base64 decodes a single bytestring (and is tolerant to getting
        called with a unicode string).
        The result is also a bytestring.
        """
        if isinstance(string, unicode):
            string = string.encode('ascii', 'ignore')
        return base64.urlsafe_b64decode(string + '=' * (-len(string) % 4))
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def generate_token(self, payload):
        secret = self.secret_key
        if self.salt:
            if callable(self.salt):
                secret = "%s$%s" % (secret, self.salt(payload))
            else:
                secret = "%s$%s" % (secret, self.salt)
            if isinstance(secret, unicode):
                secret = secret.encode('ascii', 'ignore')
        b64h = self.cached_b64h
        b64p = self.jwt_b64e(serializers.json(payload))
        jbody = b64h + '.' + b64p
        mauth = hmac.new(key=secret, msg=jbody, digestmod=self.digestmod)
        jsign = self.jwt_b64e(mauth.digest())
        return jbody + '.' + jsign
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def load_token(self, token):
        if isinstance(token, unicode):
            token = token.encode('utf-8', 'strict')
        body, sig = token.rsplit('.', 1)
        b64h, b64b = body.split('.', 1)
        if b64h != self.cached_b64h:
            # header not the same
            raise HTTP(400, u'Invalid JWT Header')
        secret = self.secret_key
        tokend = serializers.loads_json(self.jwt_b64d(b64b))
        if self.salt:
            if callable(self.salt):
                secret = "%s$%s" % (secret, self.salt(tokend))
            else:
                secret = "%s$%s" % (secret, self.salt)
            if isinstance(secret, unicode):
                secret = secret.encode('ascii', 'ignore')
        if not self.verify_signature(body, sig, secret):
            # signature verification failed
            raise HTTP(400, u'Token signature is invalid')
        if self.verify_expiration:
            now = time.mktime(datetime.datetime.utcnow().timetuple())
            if tokend['exp'] + self.leeway < now:
                raise HTTP(400, u'Token is expired')
        if callable(self.before_authorization):
            self.before_authorization(tokend)
        return tokend
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _stringify(string):
        # convert to 7-bit ascii if possible
        try:
            return string.encode("ascii")
        except UnicodeError:
            return string
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def encode(self, out):
            out.write("<value><boolean>%d</boolean></value>\n" % self.value)