Python base64 模块,encodebytes() 实例源码

我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用base64.encodebytes()

项目:jgscm    作者:src-d    | 项目源码 | 文件源码
def _read_file(self, blob, format):
        """Read a non-notebook blob and return a ``(content, format)`` pair.

        blob: instance of :class:`google.cloud.storage.Blob`.
        format:
          "text"   -> content is decoded as UTF-8; an HTTP 400 error is raised
                      when the bytes are not valid UTF-8.
          "base64" -> content is the base64 text of the raw bytes.
          None     -> UTF-8 is attempted first, base64 is the fallback.
        """
        raw = blob.download_as_string()
        wants_text = format == "text"

        if wants_text or format is None:
            try:
                text = raw.decode("utf8")
            except UnicodeError:
                # Only an explicit "text" request turns a decode failure into
                # an error; an unspecified format falls through to base64.
                if wants_text:
                    raise web.HTTPError(
                        400, "%s is not UTF-8 encoded" %
                             self._get_blob_path(blob),
                        reason="bad format",
                    )
            else:
                return text, "text"

        encoded = base64.encodebytes(raw)
        return encoded.decode("ascii"), "base64"
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def _tunnel(sock, host, port, auth):
    """Ask the proxy connected on ``sock`` to open a CONNECT tunnel to host:port.

    auth is an optional (user, password) sequence; when the user part is
    truthy a ``Proxy-Authorization: Basic`` header is added (the password is
    appended only when it is truthy too).

    Returns ``sock`` after the proxy replies with status 200. Raises
    WebSocketProxyException when reading the response headers fails or the
    status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # strip() only trims the ends; if the encoder wraps long output over
        # several lines (base64.encodebytes does), the inner line breaks would
        # split the header, so remove every whitespace character instead.
        encoded_str = "".join(base64encode(auth_str.encode()).decode().split())
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:Isa-Framework    作者:c0hb1rd    | 项目源码 | 文件源码
def storage(self, session_id):
        """Write one session's data to a file named after its session ID.

        The session's key/value pairs (minus any key listed in
        ``self.ignore_key``) are serialised as JSON, base64-encoded and written
        to ``<storage path>/<session_id>``. Nothing is written, and None is
        returned, when no storage path is configured.
        """
        # Without a storage directory there is nowhere to persist to; checking
        # first also avoids os.path.join() raising TypeError on None.
        if self.__storage_path__ is None:
            return

        # The session file lives in the storage directory and is named after
        # the session ID.
        session_path = os.path.join(self.__storage_path__, session_id)

        data = {
            key: v
            for key, v in self.__session_map__[session_id].items()
            if key not in self.ignore_key
        }

        # Serialise before opening the file so that a serialisation failure
        # does not leave a truncated session file behind.
        content = json.dumps(data)

        with open(session_path, 'wb') as f:
            # Stored base64-encoded rather than as plain JSON text.
            f.write(base64.encodebytes(content.encode()))

    # ???????
项目:tk-photoshopcc    作者:shotgunsoftware    | 项目源码 | 文件源码
def _tunnel(sock, host, port, auth):
    """Ask the proxy connected on ``sock`` to open a CONNECT tunnel to host:port.

    auth is an optional (user, password) sequence; when the user part is
    truthy a ``Proxy-Authorization: Basic`` header is added (the password is
    appended only when it is truthy too).

    Returns ``sock`` after the proxy replies with status 200. Raises
    WebSocketProxyException when reading the response headers fails or the
    status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # strip() only trims the ends; if the encoder wraps long output over
        # several lines (base64.encodebytes does), the inner line breaks would
        # split the header, so remove every whitespace character instead.
        encoded_str = "".join(base64encode(auth_str.encode()).decode().split())
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:siphon-cli    作者:getsiphon    | 项目源码 | 文件源码
def _tunnel(sock, host, port, auth):
    """Ask the proxy connected on ``sock`` to open a CONNECT tunnel to host:port.

    auth is an optional (user, password) sequence; when the user part is
    truthy a ``Proxy-Authorization: Basic`` header is added (the password is
    appended only when it is truthy too).

    Returns ``sock`` after the proxy replies with status 200. Raises
    WebSocketProxyException when reading the response headers fails or the
    status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # strip() only trims the ends; if the encoder wraps long output over
        # several lines (base64.encodebytes does), the inner line breaks would
        # split the header, so remove every whitespace character instead.
        encoded_str = "".join(base64encode(auth_str.encode()).decode().split())
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
项目:ieml    作者:IEMLdev    | 项目源码 | 文件源码
def download_comments_terms(username, password):
    """Fetch term comments from TERM_API_COMMENTS using HTTP Basic auth.

    Returns a dict keyed by ``str(script(term["IEML"]["value"]))`` whose values
    hold the term's ``comment`` and ``drupal_nid`` taken from the JSON reply.
    """
    userpass = '%s:%s' % (username, password)
    # b64encode emits a single line. encodebytes()[:-1] only removed the final
    # newline and left inner line breaks in the header for long credentials.
    auth_encoded = base64.b64encode(userpass.encode('ascii')).decode('ascii')

    # NOTE(review): hostname checking and certificate verification are switched
    # off here, so the credentials above travel over an unauthenticated TLS
    # connection. Left as-is to keep behaviour; confirm this is intended.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    req = Request(TERM_API_COMMENTS)
    req.add_header('Authorization', 'Basic %s' % auth_encoded)

    # Close the HTTP response as soon as the body has been read.
    with urlopen(req, context=ctx) as response:
        str_response = response.read().decode('utf-8')
    j = json.loads(str_response)

    return {str(script(_t["IEML"]["value"])) : {
                            "comment": _t["commentaire_sur_terme"],
                            "drupal_nid": _t["Nid"]
    } for _t in j}
项目:FightstickDisplay    作者:calexil    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:cmdchallenge-site    作者:jarv    | 项目源码 | 文件源码
def _tunnel(sock, host, port, auth):
    """Ask the proxy connected on ``sock`` to open a CONNECT tunnel to host:port.

    auth is an optional (user, password) sequence; when the user part is
    truthy a ``Proxy-Authorization: Basic`` header is added (the password is
    appended only when it is truthy too).

    Returns ``sock`` after the proxy replies with status 200. Raises
    WebSocketProxyException when reading the response headers fails or the
    status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # strip() only trims the ends; if the encoder wraps long output over
        # several lines (base64.encodebytes does), the inner line breaks would
        # split the header, so remove every whitespace character instead.
        encoded_str = "".join(base64encode(auth_str.encode()).decode().split())
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
项目:px    作者:genotrance    | 项目源码 | 文件源码
def get_response_sspi(self, challenge=None):
        """Run one SSPI authorize step and return the reply as base64 text.

        challenge: optional base64-encoded challenge (text or bytes); it is
            decoded before being handed to ``self.sspi_client.authorize``.

        Returns the base64 text of the first output buffer, without line
        breaks, or None when authorize() raises ``pywintypes.error`` (the
        traceback is printed to stdout).
        """
        dprint("pywin32 SSPI")
        if challenge:
            # Explicit type check instead of a bare except whose fallback,
            # base64.decodestring, no longer exists on Python 3.9+.
            if not isinstance(challenge, bytes):
                challenge = challenge.encode("utf-8")
            challenge = base64.b64decode(challenge)
        try:
            _, output_buffer = self.sspi_client.authorize(challenge)
        except pywintypes.error:
            traceback.print_exc(file=sys.stdout)
            return None

        response_msg = output_buffer[0].Buffer
        # Same reasoning as above for the removed base64.encodestring fallback.
        if not isinstance(response_msg, bytes):
            response_msg = response_msg.encode("utf-8")
        # b64encode emits a single line, so no newline stripping is needed.
        return base64.b64encode(response_msg).decode("utf-8")
项目:cryptogram    作者:xinmingzhang    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:cryptocurrency-trader    作者:TobCar    | 项目源码 | 文件源码
def _tunnel(sock, host, port, auth):
    """Ask the proxy connected on ``sock`` to open a CONNECT tunnel to host:port.

    auth is an optional (user, password) sequence; when the user part is
    truthy a ``Proxy-Authorization: Basic`` header is added (the password is
    appended only when it is truthy too).

    Returns ``sock`` after the proxy replies with status 200. Raises
    WebSocketProxyException when reading the response headers fails or the
    status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # strip() only trims the ends; if the encoder wraps long output over
        # several lines (base64.encodebytes does), the inner line breaks would
        # split the header, so remove every whitespace character instead.
        encoded_str = "".join(base64encode(auth_str.encode()).decode().split())
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def GetAuthHeader(self, http_method, http_url):
        """Generates the Authorization header.

        The form of the secure AuthSub Authorization header is
        Authorization: AuthSub token="token" sigalg="sigalg" data="data" sig="sig"
        and  data represents a string in the form
        data = http_method http_url timestamp nonce

        Args:
          http_method: string HTTP method i.e. operation e.g. GET, POST, PUT, etc.
          http_url: string or atom.url.Url HTTP URL to which request is made.

        Returns:
          dict Header to be sent with every subsequent request after authentication.
        """
        timestamp = int(math.floor(time.time()))
        nonce = '%lu' % random.randrange(1, 2 ** 64)
        data = '%s %s %d %s' % (http_method, str(http_url), timestamp, nonce)
        signature = self.rsa_key.hashAndSign(data)
        # encodebytes() accepts only bytes-like input, so wrapping the
        # signature in str() made this call raise TypeError every time.
        # NOTE(review): a text signature is assumed to map 1:1 onto bytes
        # (latin-1) - confirm against the rsa_key implementation in use.
        if isinstance(signature, str):
            signature = signature.encode('latin-1')
        # Join the wrapped base64 lines so the header value stays on one line,
        # and decode so it is formatted as text rather than as a bytes repr.
        sig = b''.join(encodebytes(bytes(signature)).split()).decode('ascii')
        header = {'Authorization': '%s"%s" data="%s" sig="%s" sigalg="rsa-sha1"' %
                                   (AUTHSUB_AUTH_LABEL, self.token_string, data, sig)}
        return header
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib.parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib.parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:Flask-SocketIO    作者:cutedogspark    | 项目源码 | 文件源码
def _tunnel(sock, host, port, auth):
    """Ask the proxy connected on ``sock`` to open a CONNECT tunnel to host:port.

    auth is an optional (user, password) sequence; when the user part is
    truthy a ``Proxy-Authorization: Basic`` header is added (the password is
    appended only when it is truthy too).

    Returns ``sock`` after the proxy replies with status 200. Raises
    WebSocketProxyException when reading the response headers fails or the
    status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # strip() only trims the ends; if the encoder wraps long output over
        # several lines (base64.encodebytes does), the inner line breaks would
        # split the header, so remove every whitespace character instead.
        encoded_str = "".join(base64encode(auth_str.encode()).decode().split())
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
项目:UMOG    作者:hsab    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:scripts    作者:vulnersCom    | 项目源码 | 文件源码
def _base_set(self, mode, key, value, timeout=None):
        """Store ``value`` under ``key`` in the backing collection.

        mode 'set' upserts the document unconditionally; mode 'add' inserts
        only when no document with that key exists and returns False
        otherwise. Any other mode stores nothing. A falsy ``timeout`` is
        replaced by ``self.default_timeout``.

        Returns True unless an 'add' found the key already present.
        """
        ttl = timeout or self.default_timeout

        # Only UTC here for Mongo auto-purge
        expires = datetime.utcnow() + timedelta(seconds=ttl)

        # The value is pickled and kept as base64 bytes, surrounding
        # whitespace removed.
        payload = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
        encoded = base64.encodebytes(payload).strip()

        selector = {'key': key}
        document = {'key': key, 'data': encoded, 'expires': expires}

        if mode == 'set':
            # 'set' writes the new data whether or not the key exists.
            self._coll.update(selector, document, upsert=True)
        elif mode == 'add':
            if self._coll.find_one(selector):
                return False
            self._coll.insert(document)

        return True
项目:smileybot    作者:sillylyn    | 项目源码 | 文件源码
def _tunnel(sock, host, port, auth):
    """Ask the proxy connected on ``sock`` to open a CONNECT tunnel to host:port.

    auth is an optional (user, password) sequence; when the user part is
    truthy a ``Proxy-Authorization: Basic`` header is added (the password is
    appended only when it is truthy too).

    Returns ``sock`` after the proxy replies with status 200. Raises
    WebSocketProxyException when reading the response headers fails or the
    status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # strip() only trims the ends; if the encoder wraps long output over
        # several lines (base64.encodebytes does), the inner line breaks would
        # split the header, so remove every whitespace character instead.
        encoded_str = "".join(base64encode(auth_str.encode()).decode().split())
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
项目:blackmamba    作者:zrzka    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib.parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib.parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:beepboop    作者:nicolehe    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:bitcoinelasticsearch    作者:currentsea    | 项目源码 | 文件源码
def _tunnel(sock, host, port, auth):
    """Ask the proxy connected on ``sock`` to open a CONNECT tunnel to host:port.

    auth is an optional (user, password) sequence; when the user part is
    truthy a ``Proxy-Authorization: Basic`` header is added (the password is
    appended only when it is truthy too).

    Returns ``sock`` after the proxy replies with status 200. Raises
    WebSocketProxyException when reading the response headers fails or the
    status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # strip() only trims the ends; if the encoder wraps long output over
        # several lines (base64.encodebytes does), the inner line breaks would
        # split the header, so remove every whitespace character instead.
        encoded_str = "".join(base64encode(auth_str.encode()).decode().split())
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        # Format with %, not +: concatenating the status onto the message
        # raised TypeError instead of the intended proxy exception.
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
项目:bitcoinelasticsearch    作者:currentsea    | 项目源码 | 文件源码
def _tunnel(sock, host, port, auth):
    """Ask the proxy connected on ``sock`` to open a CONNECT tunnel to host:port.

    auth is an optional (user, password) sequence; when the user part is
    truthy a ``Proxy-Authorization: Basic`` header is added (the password is
    appended only when it is truthy too).

    Returns ``sock`` after the proxy replies with status 200. Raises
    WebSocketProxyException when reading the response headers fails or the
    status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # strip() only trims the ends; if the encoder wraps long output over
        # several lines (base64.encodebytes does), the inner line breaks would
        # split the header, so remove every whitespace character instead.
        encoded_str = "".join(base64encode(auth_str.encode()).decode().split())
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        # Format with %, not +: concatenating the status onto the message
        # raised TypeError instead of the intended proxy exception.
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
项目:deb-python-websocket-client    作者:openstack    | 项目源码 | 文件源码
def _tunnel(sock, host, port, auth):
    """Ask the proxy connected on ``sock`` to open a CONNECT tunnel to host:port.

    auth is an optional (user, password) sequence; when the user part is
    truthy a ``Proxy-Authorization: Basic`` header is added (the password is
    appended only when it is truthy too).

    Returns ``sock`` after the proxy replies with status 200. Raises
    WebSocketProxyException when reading the response headers fails or the
    status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # strip() only trims the ends; if the encoder wraps long output over
        # several lines (base64.encodebytes does), the inner line breaks would
        # split the header, so remove every whitespace character instead.
        encoded_str = "".join(base64encode(auth_str.encode()).decode().split())
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
项目:hackathon    作者:vertica    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def get_host_info(self, host):
        """Split a host specification into ``(host, extra_headers, x509)``.

        ``host`` is either a host string or a ``(host, x509)`` tuple; without a
        tuple ``x509`` is an empty dict. When the host string carries a
        ``user[:password]@`` prefix, that prefix is removed from the returned
        host and turned into an HTTP Basic ``Authorization`` header; otherwise
        the header list is empty.
        """
        if isinstance(host, tuple):
            host, x509 = host
        else:
            x509 = {}

        credentials, host = urllib_parse.splituser(host)
        if not credentials:
            return host, [], x509

        raw = urllib_parse.unquote_to_bytes(credentials)
        # encodebytes wraps its output in lines; join the pieces back together
        # so the header value contains no whitespace.
        token = "".join(base64.encodebytes(raw).decode("utf-8").split())
        return host, [("Authorization", "Basic " + token)], x509

    ##
    # Connect to server.
    #
    # @param host Target host.
    # @return An HTTPConnection object
项目:lib9    作者:Jumpscale    | 项目源码 | 文件源码
def write(self, fd, bytes):
        """
        Send a block of bytes to be written to an open file descriptor (one
        opened in a writing mode).

        The block is base64-encoded and passed, with the descriptor, to the
        client's 'filesystem.write' command.

        :param fd: file descriptor
        :param bytes: bytes block to write
        :return: whatever the client's json() call returns

        :note: don't overkill the node with large byte chunks, also for large file upload check the upload method.
        """
        encoded_block = base64.encodebytes(bytes).decode()
        payload = {'fd': fd, 'block': encoded_block}
        return self._client.json('filesystem.write', payload)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def base64_encode(input, errors='strict'):
    """Return ``(base64 bytes of input, number of input bytes consumed)``.

    Only the 'strict' error mode is accepted (checked with an assert).
    """
    assert errors == 'strict'
    encoded = base64.encodebytes(input)
    consumed = len(input)
    return (encoded, consumed)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def encode(self, input, final=False):
        """Return the base64 encoding of ``input``.

        Only the 'strict' error mode is accepted (checked with an assert);
        ``final`` is not used.
        """
        assert self.errors == 'strict'
        encoded = base64.encodebytes(input)
        return encoded
项目:QUANTAXIS    作者:yutiansut    | 项目源码 | 文件源码
def encrypt(self, source_obj):
        """Serialise, pad, encrypt and URL-quote ``source_obj``.

        The object is dumped to JSON, encoded with ``self._encoding``,
        padded with NUL bytes to a multiple of 16, run through an encryptor
        from ``self._cipher``, base64-encoded and finally percent-quoted.
        """
        encryptor = self._cipher.encryptor()
        payload = json.dumps(source_obj).encode(self._encoding)
        # Always between 1 and 16, so an already aligned payload still gains
        # one full block of NUL padding.
        pad_len = 16 - (len(payload) % 16)
        payload += b'\x00' * pad_len
        ciphertext = encryptor.update(payload) + encryptor.finalize()
        return urllib.parse.quote(base64.encodebytes(ciphertext))
项目:noscrapy    作者:hwms    | 项目源码 | 文件源码
def download_image_base64(self, url):
        """Fetch ``url`` with requests and return the body as base64 bytes."""
        image_bytes = requests.get(url).content
        return base64.encodebytes(image_bytes)
项目:petronia    作者:groboclown    | 项目源码 | 文件源码
def represent_binary(self, data):
        """Represent binary ``data`` as a YAML binary scalar in literal style.

        The data is base64-encoded with ``base64.encodebytes`` when the module
        provides it, and with ``base64.encodestring`` otherwise.
        """
        encoder = getattr(base64, 'encodebytes', None)
        if encoder is None:
            encoder = base64.encodestring
        text = encoder(data).decode('ascii')
        return self.represent_scalar('tag:yaml.org,2002:binary', text, style='|')
项目:ripe-atlas-scripts    作者:farrokhi    | 项目源码 | 文件源码
def base64_encodebytes(data):
    """Base64-encode ``data`` with whichever encoder this base64 module has.

    ``base64.encodebytes`` is used when present, ``base64.encodestring``
    otherwise.
    """
    if hasattr(base64, "encodebytes"):
        encoder = base64.encodebytes
    else:
        encoder = base64.encodestring
    return encoder(data)
项目:jgscm    作者:src-d    | 项目源码 | 文件源码
def test_save_file(self):
        """Saving "text" and "base64" file models stores the expected bytes.

        Each saved blob is deleted again whether or not the content check
        passes.
        """
        # Plain text model: the blob must hold the UTF-8 bytes of the content.
        text_model = {
            "type": "file",
            "content": "blah-blah-blah",
            "format": "text"
        }
        self.contents_manager.save(text_model, self.path("test.txt"))
        text_blob = self.bucket.blob("test.txt")
        self.assertTrue(text_blob.exists())
        try:
            self.assertEqual(text_blob.download_as_string(), b"blah-blah-blah")
        finally:
            text_blob.delete()

        # Base64 model: the blob must hold the decoded pickle bytes.
        obj = {"one": 1, "two": [2, 3]}
        binary_model = {
            "type": "file",
            "content": base64.encodebytes(pickle.dumps(obj)).decode("ascii"),
            "format": "base64"
        }
        self.contents_manager.save(binary_model, self.path("test.pickle"))
        pickle_blob = self.bucket.blob("test.pickle")
        self.assertTrue(pickle_blob.exists())
        try:
            self.assertEqual(pickle_blob.download_as_string(), pickle.dumps(obj))
        finally:
            pickle_blob.delete()
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def sign_sha1(body, pri_key):
    '''
    Sign the SHA1 hash of body with pri_key (PKCS#1 v1.5) and return the
    signature as base64 bytes.

    body: binary
    pri_key: binary
    '''
    signer = PKCS1_v1_5.new(RSA.importKey(pri_key))
    digest = SHA.new(body)
    signature = signer.sign(digest)
    return base64.encodebytes(signature)
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def base64_encode(input, errors='strict'):
    """Return ``(base64 bytes of input, number of input bytes consumed)``.

    Only the 'strict' error mode is accepted (checked with an assert).
    """
    assert errors == 'strict'
    encoded = base64.encodebytes(input)
    consumed = len(input)
    return (encoded, consumed)
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def encode(self, input, final=False):
        """Return the base64 encoding of ``input``.

        Only the 'strict' error mode is accepted (checked with an assert);
        ``final`` is not used.
        """
        assert self.errors == 'strict'
        encoded = base64.encodebytes(input)
        return encoded
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def encode_image(cls, bpy_image):
        """Save ``bpy_image`` to a temporary PNG path and return it as base64 text.

        The image's ``filepath_raw`` is pointed at a file inside a temporary
        directory, ``save()`` is called, and the saved bytes are read back and
        base64-encoded. The temporary directory is removed before returning.
        """
        import tempfile
        import base64
        with tempfile.TemporaryDirectory() as directory:
            filename = directory + "/i.png"
            bpy_image.filepath_raw = filename
            bpy_image.save()
            # Close the file explicitly: a handle left open can stop the
            # temporary directory from being removed on some platforms.
            with open(filename, "rb") as image_file:
                raw = image_file.read()
        return base64.encodebytes(raw).decode('ascii')
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def _validate(headers, key, subprotocols):
    """Check the server's handshake response headers.

    headers: mapping of response header names to values.
    key: the Sec-WebSocket-Key value sent in the request.
    subprotocols: subprotocol names offered by the client (may be empty).

    Returns ``(True, subproto)`` when every header in _HEADERS_TO_CHECK has
    the expected value, the negotiated subprotocol (if any were offered) is
    one of ``subprotocols``, and ``sec-websocket-accept`` matches the digest
    derived from ``key``. Returns ``(False, None)`` otherwise.
    """
    subproto = None
    for k, v in _HEADERS_TO_CHECK.items():
        r = headers.get(k, None)
        if not r:
            return False, None
        r = r.lower()
        if v != r:
            return False, None

    if subprotocols:
        # Lower-case only after the presence check: calling .lower() on a
        # missing header raised AttributeError instead of failing validation.
        subproto = headers.get("sec-websocket-protocol", None)
        if subproto:
            subproto = subproto.lower()
        if not subproto or subproto not in [s.lower() for s in subprotocols]:
            error("Invalid subprotocol: " + str(subprotocols))
            return False, None

    result = headers.get("sec-websocket-accept", None)
    if not result:
        return False, None
    result = result.lower()

    if isinstance(result, six.text_type):
        result = result.encode('utf-8')

    # Expected accept value: base64 of SHA-1(key + fixed GUID), compared
    # case-insensitively.
    value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8')
    hashed = base64encode(hashlib.sha1(value).digest()).strip().lower()
    success = compare_digest(hashed, result)

    if success:
        return True, subproto
    else:
        return False, None
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def _create_sec_websocket_key():
    """Return 16 random bytes, base64-encoded, as stripped UTF-8 text."""
    nonce = os.urandom(16)
    encoded = base64encode(nonce)
    return encoded.decode('utf-8').strip()
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def _validate(headers, key, subprotocols):
    """Check the server's handshake response headers.

    headers: mapping of response header names to values.
    key: the Sec-WebSocket-Key value sent in the request.
    subprotocols: subprotocol names offered by the client (may be empty).

    Returns ``(True, subproto)`` when every header in _HEADERS_TO_CHECK has
    the expected value, the negotiated subprotocol (if any were offered) is
    one of ``subprotocols``, and ``sec-websocket-accept`` matches the digest
    derived from ``key``. Returns ``(False, None)`` otherwise.
    """
    subproto = None
    for k, v in _HEADERS_TO_CHECK.items():
        r = headers.get(k, None)
        if not r:
            return False, None
        r = r.lower()
        if v != r:
            return False, None

    if subprotocols:
        # Lower-case only after the presence check: calling .lower() on a
        # missing header raised AttributeError instead of failing validation.
        subproto = headers.get("sec-websocket-protocol", None)
        if subproto:
            subproto = subproto.lower()
        if not subproto or subproto not in [s.lower() for s in subprotocols]:
            error("Invalid subprotocol: " + str(subprotocols))
            return False, None

    result = headers.get("sec-websocket-accept", None)
    if not result:
        return False, None
    result = result.lower()

    if isinstance(result, six.text_type):
        result = result.encode('utf-8')

    # Expected accept value: base64 of SHA-1(key + fixed GUID), compared
    # case-insensitively.
    value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8')
    hashed = base64encode(hashlib.sha1(value).digest()).strip().lower()
    success = compare_digest(hashed, result)

    if success:
        return True, subproto
    else:
        return False, None
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def _create_sec_websocket_key():
    """Return 16 random bytes, base64-encoded, as stripped UTF-8 text."""
    nonce = os.urandom(16)
    encoded = base64encode(nonce)
    return encoded.decode('utf-8').strip()
项目:aliyun-log-python-sdk    作者:aliyun    | 项目源码 | 文件源码
def base64_encodestring(s):
    """Base64-encode ``s`` on both Python 2 and Python 3.

    On Python 2 the result of ``base64.encodestring`` is returned unchanged.
    On Python 3 a ``str`` input is first encoded as UTF-8 and the base64
    output is returned as text.
    """
    if six.PY2:
        return base64.encodestring(s)

    data = s.encode('utf8') if isinstance(s, str) else s
    return base64.encodebytes(data).decode('utf8')
项目:twittershade    作者:nicolavic98    | 项目源码 | 文件源码
def generate_headers(self):
        """Return the HTTP Basic ``Authorization`` header as a bytes dict.

        The credentials are ``self.username`` and ``self.password`` joined
        with a colon, UTF-8 encoded and base64-encoded.
        """
        credentials = ("%s:%s" % (self.username, self.password)).encode('utf8')
        # encodebytes wraps its output in lines; strip(b'\n') removed only the
        # trailing newline and left line breaks inside the header value for
        # long credentials.
        token = encodebytes(credentials).replace(b'\n', b'')
        return {b"Authorization": b"Basic " + token}