Python base64 模块,standard_b64decode() 实例源码

我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用base64.standard_b64decode()

项目:cbapi-examples    作者:cbcommunity    | 项目源码 | 文件源码
def main(cb, args):
    """Print the decoded form of Base64-encoded PowerShell command lines.

    Queries ``cb`` for ``powershell.exe`` processes, extracts the argument
    that follows an ``-enc...`` switch in each command line, restores missing
    Base64 padding and prints the decoded text below a link built from
    ``args['server_url']`` and the process id.

    A command line whose argument cannot be decoded no longer aborts the
    whole scan; it is reported with its raw command line instead.

    :param cb: object exposing ``process_search_iter(query)``; every result
        is indexed with ``'cmdline'`` and ``'id'``.
    :param args: mapping providing ``'server_url'``.
    """
    switch_re = re.compile(
        r'\-[eE][nN][cC][oOdDeEcCmMaAnN]*\s([A-Za-z0-9\+/=]+)')
    padded_re = re.compile(r'[a-zA-Z0-9\+/]+={1,2}$')

    for s in cb.process_search_iter('process_name:powershell.exe'):
        if not s['cmdline']:
            continue
        encoded = switch_re.search(s['cmdline'])
        if encoded is None:
            continue

        i = encoded.group(1)
        if not padded_re.search(i):
            # Restore the padding that was left off: a remainder of 3 needs
            # one '=', a remainder of 2 needs two; other lengths are left
            # untouched.
            i += '=' * {3: 1, 2: 2}.get(len(i) % 4, 0)

        try:
            decodedCommand = base64.standard_b64decode(i)
            a = decodedCommand.encode('ascii', 'replace')
            # NUL bytes are dropped before printing (presumably because the
            # payload is UTF-16 text -- confirm).
            print("Powershell Decoded Command\n%s/#analyze/%s/1\n%s\n\n" % (
                args['server_url'], s['id'], a.replace('\0', "")))
        except (TypeError, ValueError):
            # TypeError: invalid Base64 on Python 2.
            # ValueError: covers UnicodeError and binascii.Error.
            print("Powershell Decoded Command\n%s/#analyze/%s/1\nNon-ASCII decoding, encoded form printed to assist more research\n%s\n" % (
                args['server_url'], s['id'], s['cmdline']))
项目:pogom-linux    作者:PokeHunterProject    | 项目源码 | 文件源码
def check_authentication(self, response_dict):
    """Store a newer session ticket from ``response_dict`` on the auth provider.

    Nothing happens unless ``response_dict`` is a dict whose ``'auth_ticket'``
    entry carries an ``'expire_timestamp_ms'`` that the auth provider reports
    as new. In that case the ticket (expiry plus Base64-decoded ``'start'``
    and ``'end'`` values) is handed to the provider and a debug line is logged.
    """
    if not isinstance(response_dict, dict) or 'auth_ticket' not in response_dict:
        return

    auth_ticket = response_dict['auth_ticket']
    if 'expire_timestamp_ms' not in auth_ticket:
        return

    expires_ms = auth_ticket['expire_timestamp_ms']
    if not self._auth_provider.is_new_ticket(expires_ms):
        return

    # Must be sampled before set_ticket() so the log message can tell a
    # replacement from a first ticket.
    had_ticket = self._auth_provider.has_ticket()

    self._auth_provider.set_ticket([
        expires_ms,
        base64.standard_b64decode(auth_ticket['start']),
        base64.standard_b64decode(auth_ticket['end']),
    ])

    now_ms = get_time(ms=True)
    h, m, s = get_format_time_diff(now_ms, expires_ms, True)

    if had_ticket:
        template = 'Replacing old Session Ticket with new one valid for %02d:%02d:%02d hours (%s < %s)'
    else:
        template = 'Received Session Ticket valid for %02d:%02d:%02d hours (%s < %s)'
    self.log.debug(template, h, m, s, now_ms, expires_ms)
项目:PyDoc    作者:shaun-h    | 项目源码 | 文件源码
def __getDownloadedUserContributed(self):
    """Build ``UserContributed`` objects for the installed user-contributed docsets.

    Each row returned by ``InstalledDocsetsByType('usercontributed')`` is read
    by position: 0 = id, 1 = name, 2 = path relative to the current directory,
    4 = Base64 icon data (may be empty), 5 = version, 6 = author name.

    :return: list of populated ``UserContributed`` instances, in row order.
    """
    docsets = []
    for row in DBManager.DBManager().InstalledDocsetsByType('usercontributed'):
        entry = UserContributed()
        entry.name = row[1]
        entry.id = row[0]
        entry.path = os.path.join(os.path.abspath('.'), row[2])

        encoded_icon = str(row[4])
        if encoded_icon == '':
            # No stored icon: fall back to the generic one.
            entry.image = self.__getIconWithName('Other')
        else:
            entry.image = ui.Image.from_data(
                base64.standard_b64decode(encoded_icon))

        entry.authorName = row[6]
        entry.version = row[5]
        docsets.append(entry)
    return docsets
项目:creator-system-test-framework    作者:CreatorDev    | 项目源码 | 文件源码
def Decode(value, type):
    """Decode a Base64 string into a value of the requested Python type.

    :param value: Base64-encoded payload.
    :param type: one of the type objects ``int``, ``float`` or ``str``
        (the parameter name shadows the builtin and is kept for callers).
    :return: for ``int`` the payload unpacked as a native-order 64-bit signed
        integer (``"=q"``), for ``float`` as a 64-bit double (``"=d"``), for
        ``str`` the result of ``str()`` applied to the decoded bytes.
    :raises EncodeException: when ``type`` is none of the supported types.
    """
    data = base64.standard_b64decode(value)

    print("Decoding value %s, type %s" % (str(value), str(type)))
    sys.stdout.flush()

    if type is int:
        return struct.unpack("=q", data)[0]
    if type is float:
        return struct.unpack("=d", data)[0]
    if type is str:
        return str(data)

    # Report the requested type itself. The previous code evaluated
    # ``type(value)``, i.e. it *called* the unsupported type object passed by
    # the caller, which could raise or produce a misleading message.
    raise EncodeException(
        "Unknown type %s with value %s" % (str(type), str(value)))
项目:canister    作者:dagnelies    | 项目源码 | 文件源码
def _buildAuthJWT(config):
    """Create a JWT validation callable from the ``canister.auth_jwt_*`` settings.

    :param config: mapping with ``canister.auth_jwt_client_id``,
        ``canister.auth_jwt_secret`` and optionally
        ``canister.auth_jwt_encoding`` (``clear``, ``base64std`` or
        ``base64url``; defaults to ``clear``, case-insensitive).
    :return: ``None`` when client id or secret is missing, otherwise a
        function ``validate(token)`` returning the result of ``jwt.decode``.
    :raises Exception: when the configured encoding is not recognised.
    """
    client_id = config.get('canister.auth_jwt_client_id', None)
    secret = config.get('canister.auth_jwt_secret', None)
    encoding = config.get('canister.auth_jwt_encoding', 'clear').lower()

    if not (client_id and secret):
        return None

    # Imported lazily so the dependency is only needed when JWT auth is set up.
    import jwt

    secret_decoders = {
        'base64std': base64.standard_b64decode,   # alphabet with + and /
        'base64url': base64.urlsafe_b64decode,    # alphabet with - and _
        'clear': lambda raw: raw,
    }
    if encoding not in secret_decoders:
        raise Exception('Invalid auth_jwt_encoding in config: "%s" (should be "clear", "base64std" or "base64url")' % encoding)
    key = secret_decoders[encoding](secret)

    def validate(token):
        return jwt.decode(token, key, audience=client_id)

    return validate
项目:plugin.video.netflix    作者:asciidisco    | 项目源码 | 文件源码
def __parse_crypto_keys(self, headerdata):
    """Extract the encryption and signing keys from a key-response header.

    Stores the master token, RSA-decrypts (PKCS1 OAEP with ``self.rsa_key``)
    the two Base64-encoded key blobs found under
    ``headerdata['keyresponsedata']['keydata']``, decodes the ``'k'`` member
    of each decrypted JSON document into ``self.encryption_key`` and
    ``self.sign_key``, persists the MSL data and marks the handshake as done.
    """
    key_response = headerdata['keyresponsedata']
    self.__set_master_token(key_response['mastertoken'])

    # Init Decryption
    enc_key = key_response['keydata']['encryptionkey']
    hmac_key = key_response['keydata']['hmackey']
    wrapped_encryption_key = base64.standard_b64decode(enc_key)
    wrapped_sign_key = base64.standard_b64decode(hmac_key)
    rsa = PKCS1_OAEP.new(self.rsa_key)
    parse_json = json.JSONDecoder().decode

    # Decrypt encryption key
    encryption_key_doc = parse_json(rsa.decrypt(wrapped_encryption_key))
    self.encryption_key = base64key_decode(encryption_key_doc['k'])

    # Decrypt sign key
    sign_key_doc = parse_json(rsa.decrypt(wrapped_sign_key))
    self.sign_key = base64key_decode(sign_key_doc['k'])

    self.__save_msl_data()
    self.handshake_performed = True
项目:plugin.video.netflix    作者:asciidisco    | 项目源码 | 文件源码
def __load_msl_data(self):
    """Load stored MSL data, renewing the keys when the token is about to expire.

    Reads ``msl_data.json`` through ``self.load_file``, decodes the stored
    master token and inspects its ``'expiration'`` timestamp. With fewer than
    ten hours left (or already expired) the RSA keys are loaded and a new key
    handshake is performed; otherwise the stored master token, encryption key
    and sign key are installed on ``self``.
    """
    raw_msl_data = self.load_file(
        msl_data_path=self.kodi_helper.msl_data_path,
        filename='msl_data.json')
    msl_data = json.JSONDecoder().decode(raw_msl_data)

    # Check expire date of the token
    stored_token = msl_data['tokens']['mastertoken']
    base_token = base64.standard_b64decode(stored_token['tokendata'])
    master_token = json.JSONDecoder().decode(base_token)
    valid_until = datetime.utcfromtimestamp(int(master_token['expiration']))

    # utcfromtimestamp() yields a naive UTC value, so "now" must be naive UTC
    # as well; mixing it with local datetime.now() skewed the result by the
    # local UTC offset.
    hours_left = (valid_until - datetime.utcnow()).total_seconds() / 60 / 60

    # If the token expires in less than 10 hours or has expired, renew it
    if hours_left < 10:
        self.__load_rsa_keys()
        self.__perform_key_handshake()
        return

    self.__set_master_token(stored_token)
    self.encryption_key = base64.standard_b64decode(msl_data['encryption_key'])
    self.sign_key = base64.standard_b64decode(msl_data['sign_key'])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_b64decode_invalid_chars(self):
    """Check that the Base64 decoders skip characters outside the alphabet."""
    # issue 1466065: Test some invalid characters.
    cases = (
        (b'%3d==', b'\xdd'),
        (b'$3d==', b'\xdd'),
        (b'[==', b''),
        (b'YW]3=', b'am'),
        (b'3{d==', b'\xdd'),
        (b'3d}==', b'\xdd'),
        (b'@@', b''),
        (b'!', b''),
        (b'YWJj\nYWI=', b'abcab'),
    )
    decoders = (
        base64.b64decode,
        base64.standard_b64decode,
        base64.urlsafe_b64decode,
    )
    for encoded, expected in cases:
        for decode in decoders:
            self.assertEqual(decode(encoded), expected)

    # Normal alphabet characters not discarded when alternative given
    expected = b'\xFB\xEF\xBE\xFF\xFF\xFF'
    self.assertEqual(base64.b64decode(b'++[[//]]', b'[]'), expected)
    self.assertEqual(base64.urlsafe_b64decode(b'++--//__'), expected)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_b64decode_invalid_chars(self):
    """Check that the Base64 decoders skip characters outside the alphabet."""
    # issue 1466065: Test some invalid characters.
    cases = (
        (b'%3d==', b'\xdd'),
        (b'$3d==', b'\xdd'),
        (b'[==', b''),
        (b'YW]3=', b'am'),
        (b'3{d==', b'\xdd'),
        (b'3d}==', b'\xdd'),
        (b'@@', b''),
        (b'!', b''),
        (b'YWJj\nYWI=', b'abcab'),
    )
    decoders = (
        base64.b64decode,
        base64.standard_b64decode,
        base64.urlsafe_b64decode,
    )
    for encoded, expected in cases:
        for decode in decoders:
            self.assertEqual(decode(encoded), expected)

    # Normal alphabet characters not discarded when alternative given
    expected = b'\xFB\xEF\xBE\xFF\xFF\xFF'
    self.assertEqual(base64.b64decode(b'++[[//]]', b'[]'), expected)
    self.assertEqual(base64.urlsafe_b64decode(b'++--//__'), expected)
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def addcrypted2(self):
    """Decrypt a posted ``crypted`` link list and add it as a package.

    Reads the ``source``, ``crypted`` and ``jk`` POST fields, obtains the hex
    key by evaluating the ``jk`` script followed by a call to ``f()``,
    AES-CBC-decrypts the Base64 payload with the key doubling as IV, and
    passes the non-empty lines to ``self.add_package``.
    """
    package = self.get_post("source", "ClickAndLoad Package")
    crypted = self.get_post("crypted")
    jk = self.get_post("jk")

    # Spaces are turned back into '+' before unquoting and decoding.
    ciphertext = standard_b64decode(unquote(crypted.replace(" ", "+")))

    # NOTE(review): this evaluates script text taken from the request.
    key = unhexlify(js.eval("%s f()" % jk))

    cipher = AES.new(key, AES.MODE_CBC, key)
    plaintext = cipher.decrypt(ciphertext).replace("\x00", "").replace("\r", "")
    links = [line for line in plaintext.split("\n") if line != ""]

    self.add_package(package, links, 0)
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def addcrypted2(self):
    """Decrypt a posted ``crypted`` link list and add it as a package.

    Reads the ``source``, ``crypted`` and ``jk`` POST fields, obtains the hex
    key by evaluating the ``jk`` script followed by a call to ``f()``,
    AES-CBC-decrypts the Base64 payload with the key doubling as IV, and
    passes the non-empty lines to ``self.add_package``.
    """
    package = self.get_post("source", "ClickAndLoad Package")
    crypted = self.get_post("crypted")
    jk = self.get_post("jk")

    # Spaces are turned back into '+' before unquoting and decoding.
    ciphertext = standard_b64decode(unquote(crypted.replace(" ", "+")))

    # NOTE(review): this evaluates script text taken from the request.
    key = unhexlify(js.eval("%s f()" % jk))

    cipher = AES.new(key, AES.MODE_CBC, key)
    plaintext = cipher.decrypt(ciphertext).replace("\x00", "").replace("\r", "")
    links = [line for line in plaintext.split("\n") if line != ""]

    self.add_package(package, links, 0)
项目:webkvmmgr    作者:crazw    | 项目源码 | 文件源码
def read_vm_file(self, path, uri="qemu:///system"):
    """Read a file from the guest through guest-agent file commands.

    Issues ``guest-file-open``, ``guest-file-read`` (a single read of at most
    1024000 bytes) and ``guest-file-close`` via ``self.EXE`` and Base64-decodes
    the returned ``buf-b64`` field.

    :param path: file path inside the guest. It is inserted into the JSON
        command verbatim, so quotes or backslashes in it are not escaped.
    :param uri: accepted for interface compatibility; not used here.
    :return: the decoded file content, or ``None`` when opening, reading or
        decoding failed (the exception is logged).
    """
    FILE_OPEN_READ = """{"execute":"guest-file-open", "arguments":{"path":"%s","mode":"r"}}"""
    FILE_READ = """{"execute":"guest-file-read", "arguments":{"handle":%s,"count":%d}}"""
    FILE_CLOSE = """{"execute":"guest-file-close", "arguments":{"handle":%s}}"""

    file_handle = None
    try:
        file_handle = self.EXE(FILE_OPEN_READ % path)["return"]
        file_content = self.EXE(FILE_READ % (file_handle, 1024000))["return"]["buf-b64"]
        file_content = base64.standard_b64decode(file_content)
    except Exception as e:
        logger.exception(e)
        return None
    finally:
        # Close only a handle that was actually obtained; previously a failed
        # open led to a close request for the placeholder handle -1.
        if file_handle is not None:
            self.EXE(FILE_CLOSE % file_handle)
    return file_content
项目:cti-pattern-matcher    作者:oasis-open    | 项目源码 | 文件源码
def _process_prop_suffix(prop_name, value):
    """
    Convert a JSON value according to the type suffix of its property name,
    so that later property tests see a value with the right semantics.

    :param prop_name: The JSON property name
    :param value: The JSON property value
    :return: The binary data decoded from hex for names ending in ``_hex``,
        decoded from base64 for names ending in ``_bin``; otherwise value
        itself is returned unchanged.
    """
    suffix_decoders = (
        (u"_hex", binascii.a2b_hex),                # binary type, expressed as hex
        (u"_bin", base64.standard_b64decode),       # binary type, expressed as base64
    )
    for suffix, decode in suffix_decoders:
        if prop_name.endswith(suffix):
            return decode(value)

    return value
项目:pogom    作者:favll    | 项目源码 | 文件源码
def check_authentication(self, response_dict):
    """Store a newer session ticket from ``response_dict`` on the auth provider.

    Nothing happens unless ``response_dict`` is a dict whose ``'auth_ticket'``
    entry carries an ``'expire_timestamp_ms'`` that the auth provider reports
    as new. In that case the ticket (expiry plus Base64-decoded ``'start'``
    and ``'end'`` values) is handed to the provider and a debug line is logged.
    """
    if not isinstance(response_dict, dict) or 'auth_ticket' not in response_dict:
        return

    auth_ticket = response_dict['auth_ticket']
    if 'expire_timestamp_ms' not in auth_ticket:
        return

    expires_ms = auth_ticket['expire_timestamp_ms']
    if not self._auth_provider.is_new_ticket(expires_ms):
        return

    # Must be sampled before set_ticket() so the log message can tell a
    # replacement from a first ticket.
    had_ticket = self._auth_provider.has_ticket()

    self._auth_provider.set_ticket([
        expires_ms,
        base64.standard_b64decode(auth_ticket['start']),
        base64.standard_b64decode(auth_ticket['end']),
    ])

    now_ms = get_time(ms=True)
    h, m, s = get_format_time_diff(now_ms, expires_ms, True)

    if had_ticket:
        template = 'Replacing old Session Ticket with new one valid for %02d:%02d:%02d hours (%s < %s)'
    else:
        template = 'Received Session Ticket valid for %02d:%02d:%02d hours (%s < %s)'
    self.log.debug(template, h, m, s, now_ms, expires_ms)
项目:pokescan    作者:joaoanes    | 项目源码 | 文件源码
def check_authentication(self, response_dict):
    """Store a newer session ticket from ``response_dict`` on the auth provider.

    Nothing happens unless ``response_dict`` is a dict whose ``'auth_ticket'``
    entry carries an ``'expire_timestamp_ms'`` that the auth provider reports
    as new. In that case the ticket (expiry plus Base64-decoded ``'start'``
    and ``'end'`` values) is handed to the provider and a debug line is logged.
    """
    if not isinstance(response_dict, dict) or 'auth_ticket' not in response_dict:
        return

    auth_ticket = response_dict['auth_ticket']
    if 'expire_timestamp_ms' not in auth_ticket:
        return

    expires_ms = auth_ticket['expire_timestamp_ms']
    if not self._auth_provider.is_new_ticket(expires_ms):
        return

    # Must be sampled before set_ticket() so the log message can tell a
    # replacement from a first ticket.
    had_ticket = self._auth_provider.has_ticket()

    self._auth_provider.set_ticket([
        expires_ms,
        base64.standard_b64decode(auth_ticket['start']),
        base64.standard_b64decode(auth_ticket['end']),
    ])

    now_ms = get_time(ms=True)
    h, m, s = get_format_time_diff(now_ms, expires_ms, True)

    if had_ticket:
        template = 'Replacing old Session Ticket with new one valid for %02d:%02d:%02d hours (%s < %s)'
    else:
        template = 'Received Session Ticket valid for %02d:%02d:%02d hours (%s < %s)'
    self.log.debug(template, h, m, s, now_ms, expires_ms)
项目:collection    作者:skywind3000    | 项目源码 | 文件源码
def sign_extract(signature):
    """Split a Base64-encoded ``user:timestamp:verify`` signature.

    :param signature: Base64 text; may be empty or ``None``.
    :return: ``(user, ts, verify)`` where ``user`` and ``verify`` are stripped
        of surrounding whitespace, ``verify`` is lower-cased and ``ts`` is the
        timestamp converted with ``long``; ``(None, None, None)`` when the
        signature is empty, is not valid Base64, does not consist of exactly
        three ``:``-separated fields or has a non-numeric timestamp.
    """
    failure = (None, None, None)
    if not signature:
        return failure
    try:
        text = base64.standard_b64decode(signature)
    except (TypeError, ValueError):
        # Invalid Base64: TypeError on Python 2, binascii.Error (a
        # ValueError) on Python 3. Unrelated errors are no longer swallowed.
        return failure

    part = text.split(':')
    if len(part) != 3:
        return failure
    user, ts_text, verify = part

    try:
        ts = long(ts_text)
    except ValueError:
        return failure

    return user.strip('\r\n\t '), ts, verify.strip('\r\n\t ').lower()

# ?????
项目:warrant    作者:capless    | 项目源码 | 文件源码
def process_challenge(self, challenge_parameters):
    """Build the response parameters for a password-verifier challenge.

    Derives the HMAC key via ``self.get_password_authentication_key``, signs
    (HMAC-SHA256) the concatenation of the pool-id part after ``_``, the user
    id, the decoded secret block and the timestamp, and returns the fields
    expected in the challenge response.

    :param challenge_parameters: mapping with ``USER_ID_FOR_SRP``, ``SALT``,
        ``SRP_B`` and ``SECRET_BLOCK``.
    :return: dict with ``TIMESTAMP``, ``USERNAME``,
        ``PASSWORD_CLAIM_SECRET_BLOCK``, ``PASSWORD_CLAIM_SIGNATURE`` and,
        when a client secret is configured, ``SECRET_HASH``.
    """
    user_id_for_srp = challenge_parameters['USER_ID_FOR_SRP']
    salt_hex = challenge_parameters['SALT']
    srp_b_hex = challenge_parameters['SRP_B']
    secret_block_b64 = challenge_parameters['SECRET_BLOCK']

    # re strips leading zero from a day number (required by AWS Cognito)
    unpadded_source = datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y")
    timestamp = re.sub(r" 0(\d) ", r" \1 ", unpadded_source)

    hkdf = self.get_password_authentication_key(
        user_id_for_srp, self.password, hex_to_long(srp_b_hex), salt_hex)
    secret_block_bytes = base64.standard_b64decode(secret_block_b64)

    # Signed message: pool name + user id + secret block + timestamp.
    msg = bytearray(self.pool_id.split('_')[1], 'utf-8')
    msg += bytearray(user_id_for_srp, 'utf-8')
    msg += bytearray(secret_block_bytes)
    msg += bytearray(timestamp, 'utf-8')

    digest = hmac.new(hkdf, msg, digestmod=hashlib.sha256).digest()
    signature_string = base64.standard_b64encode(digest)

    response = {
        'TIMESTAMP': timestamp,
        'USERNAME': user_id_for_srp,
        'PASSWORD_CLAIM_SECRET_BLOCK': secret_block_b64,
        'PASSWORD_CLAIM_SIGNATURE': signature_string.decode('utf-8'),
    }
    if self.client_secret is not None:
        response["SECRET_HASH"] = self.get_secret_hash(
            self.username, self.client_id, self.client_secret)
    return response
项目:pogom-updated    作者:PokeHunterProject    | 项目源码 | 文件源码
def check_authentication(self, response_dict):
    """Store a newer session ticket from ``response_dict`` on the auth provider.

    Nothing happens unless ``response_dict`` is a dict whose ``'auth_ticket'``
    entry carries an ``'expire_timestamp_ms'`` that the auth provider reports
    as new. In that case the ticket (expiry plus Base64-decoded ``'start'``
    and ``'end'`` values) is handed to the provider and a debug line is logged.
    """
    if not isinstance(response_dict, dict) or 'auth_ticket' not in response_dict:
        return

    auth_ticket = response_dict['auth_ticket']
    if 'expire_timestamp_ms' not in auth_ticket:
        return

    expires_ms = auth_ticket['expire_timestamp_ms']
    if not self._auth_provider.is_new_ticket(expires_ms):
        return

    # Must be sampled before set_ticket() so the log message can tell a
    # replacement from a first ticket.
    had_ticket = self._auth_provider.has_ticket()

    self._auth_provider.set_ticket([
        expires_ms,
        base64.standard_b64decode(auth_ticket['start']),
        base64.standard_b64decode(auth_ticket['end']),
    ])

    now_ms = get_time(ms=True)
    h, m, s = get_format_time_diff(now_ms, expires_ms, True)

    if had_ticket:
        template = 'Replacing old Session Ticket with new one valid for %02d:%02d:%02d hours (%s < %s)'
    else:
        template = 'Received Session Ticket valid for %02d:%02d:%02d hours (%s < %s)'
    self.log.debug(template, h, m, s, now_ms, expires_ms)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def authenticate_client(self):
    """Authenticate the request from its ``Authorization`` or ``UUID`` header.

    With an ``Authorization`` header the Base64 payload is decoded as
    ``emailid:machine_name``; the machine name is looked up through
    ``authenticate`` and the client is accepted when the e-mail ids match
    (a ``UUID`` cookie is set when the lookup returned one). Without it, a
    ``UUID`` header is looked up instead and accepted when a UUID string is
    returned.

    A malformed ``Authorization`` header (wrong number of fields, invalid
    Base64, no ``:`` separator) is now rejected instead of raising.

    :return: ``True`` when the client was authenticated, else ``False``.
    """
    if 'Authorization' in self.headers:
        # handle Basic authentication
        try:
            enctype, encstr = self.headers.get('Authorization').split()
            credentials = base64.standard_b64decode(encstr)
        except (TypeError, ValueError):
            # ValueError: header did not split into exactly two fields, or
            # binascii.Error; TypeError: invalid Base64 on Python 2.
            print('Authentication failed')
            return False

        # Split on the first ':' only, so a colon after it no longer breaks
        # the unpacking.
        emailid, separator, machine_name = credentials.partition(':')
        if not separator:
            print('Authentication failed')
            return False

        auth_machine, auth_uuidstr, auth_email = authenticate(machine_name)
        if emailid != auth_email:
            return False

        print("Authenticated")
        # set authentication cookies on client machines
        if auth_uuidstr:
            self.setCookie('UUID', auth_uuidstr)
        return True

    if 'UUID' in self.headers:
        # handle cookie based authentication
        uuid_value = self.headers.get('UUID')
        auth_machine, auth_uuidstr, auth_email = authenticate(uuid_value)
        if auth_uuidstr:
            print("Authenticated")
            return True
        return False

    print('Authentication failed')
    return False
项目:Cloudroid    作者:cyberdb    | 项目源码 | 文件源码
def _to_binary_inst(msg):
    """Best-effort conversion of ``msg`` to a binary string.

    Values whose exact type is listed in ``string_types`` are Base64-decoded;
    anything else is converted through ``str(bytearray(msg))``.

    :return: the converted value, or ``msg`` unchanged when the conversion
        raises.
    """
    if type(msg) in string_types:
        convert = standard_b64decode
    else:
        convert = lambda raw: str(bytearray(raw))

    try:
        return convert(msg)
    except Exception:
        # Deliberately lenient, but no longer a bare ``except`` that would
        # also swallow KeyboardInterrupt and SystemExit.
        return msg
项目:lightstep-tracer-python    作者:lightstep    | 项目源码 | 文件源码
def extract(self, carrier):
    """Rebuild a ``SpanContext`` from a Base64-encoded binary carrier.

    :param carrier: must be exactly a ``bytearray`` holding the
        Base64-encoded serialized ``BinaryCarrier`` message.
    :return: ``SpanContext`` populated from the message's ``basic_ctx``.
    :raises InvalidCarrierException: when ``carrier`` is not a ``bytearray``.
    """
    if type(carrier) is not bytearray:
        raise InvalidCarrierException()

    state = BinaryCarrier()
    state.ParseFromString(bytes(standard_b64decode(carrier)))

    ctx_items = state.basic_ctx.baggage_items
    baggage = {key: ctx_items[key] for key in ctx_items}

    return SpanContext(
        span_id=state.basic_ctx.span_id,
        trace_id=state.basic_ctx.trace_id,
        baggage=baggage,
        sampled=state.basic_ctx.sampled)
项目:dragonchain    作者:dragonchain    | 项目源码 | 文件源码
def _sc_provisioning_helper(self, pl, sc_class, sc_key):
        """
        Decode and execute the smart contract code found in a payload and
        store the resulting ``func`` in ``self.sc_container[sc_class][sc_key]``.

        :param pl: transaction payload to extract sc from
        :param sc_class: type of sc being dealt with (e.g. tsc, ssc, etc.)
        :param sc_key: transaction type
        :return: True when the contract was stored; False when the payload has
            no 'smart_contract' entry, the implementation is empty, or any
            exception occurred (the exception is logged as a warning).
        """
        try:
            if 'smart_contract' in pl:
                sc = pl['smart_contract']
                sc_impl = sc[sc_class]
                if sc_impl:
                    try:
                        sc_impl = base64.standard_b64decode(sc_impl)
                    except TypeError:
                        # re-raised as a generic Exception, which the outer
                        # handler below logs and converts into False
                        raise Exception("The Smart Contract implementation for " + str(sc_key) +
                                        " must be base64 encoded.")

                    func = None
                    # define sc function
                    # SECURITY(review): this executes code taken from the
                    # transaction payload without any sandboxing.
                    # NOTE(review): the executed code is expected to rebind the
                    # local name ``func``; that only takes effect with
                    # Python 2 exec semantics -- confirm target interpreter.
                    exec(sc_impl)
                    # store sc function for this txn type (sc_key)
                    self.sc_container[sc_class][sc_key] = func
                else:
                    logger().warning("No smart contract code provided...")
                    return False
            else:
                return False
        except Exception as ex:
            # any failure (missing key, bad encoding, error raised by the
            # executed code) is reported and turned into a False result
            template = "An exception of type {0} occurred. Arguments:\n{1!r}"
            message = template.format(type(ex).__name__, ex.args)
            logger().warning(message)
            return False
        return True
项目:PyDoc    作者:shaun-h    | 项目源码 | 文件源码
def __getUserContributed(self):
    """Download the user-contributed docset index and wrap each entry.

    Fetches ``<download server url>/<self.jsonServerLocation>``, parses the
    response text with ``ast.literal_eval`` and creates one ``UserContributed``
    object per item of its ``'docsets'`` mapping.

    :return: the entries sorted by lower-cased name.
    """
    url = self.serverManager.getDownloadServer(self.localServer).url
    if url[-1] != '/':
        url += '/'
    url += self.jsonServerLocation

    catalogue = ast.literal_eval(requests.get(url).text)
    fallback_icon = self.__getIconWithName('Other')

    entries = []
    for online_id, info in catalogue['docsets'].items():
        entry = UserContributed()
        entry.name = info['name']
        if 'aliases' in info:
            entry.aliases = info['aliases']
        entry.version = info['version']
        entry.archive = info['archive']
        entry.authorName = info['author']['name']
        if 'icon' in info:
            entry.image = ui.Image.from_data(
                base64.standard_b64decode(info['icon']))
            # Keep the encoded form next to the decoded image.
            entry.imageData = info['icon']
        else:
            entry.image = fallback_icon
        entry.onlineid = online_id
        entry.status = 'online'
        entries.append(entry)

    entries.sort(key=lambda item: item.name.lower())
    return entries
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def VerifyMessageFromAddress(self,addr,message,sig):
        """Check whether ``sig`` is a signature of ``message`` by address ``addr``.

        ``sig`` is Base64 text that decodes to a header byte followed by two
        32-byte integers r and s. Candidate public keys are reconstructed from
        (r, s) and the message hash, and each is converted to an address for
        comparison with ``addr``.

        Returns True when a reconstructed key maps to ``addr``; False when the
        header byte is outside [27, 35) or no candidate matches.
        """
        #Check a signature (r,s) for the message m signed by the Bitcoin 
        # address "addr".

        sign=base64.standard_b64decode(sig)
        # bytes 1..32 hold r, bytes 33..64 hold s (byte 0 is the header)
        (r,s)=(Byte2Int(sign[1:33]),Byte2Int(sign[33:65]))

        # z: double hash of the message after MsgMagic() has been applied
        z=Byte2Int(Hash(Hash(MsgMagic(message),"SHA256"),"SHA256"))        

        # NOTE(review): ord() on an indexed element assumes Python 2 str
        # semantics for the decoded signature -- confirm.
        val=ord(sign[0])
        if val<27 or val>=35:
            return False

        # header values 31..34 select the compressed key form
        if val>=31:
            uncompressed=False
            # NOTE(review): the adjusted val is not read again below.
            val-=4
        else:
            uncompressed=True

        # r is used as the x coordinate; y is derived from
        # y^2 = x^3 + a*x + b (mod p) (Cipolla presumably computes the
        # modular square root -- confirm).
        x=r
        y2=(pow(x,3,self.p) + self.a*x + self.b) % self.p
        y=Cipolla(y2,self.p)

        # try both roots: y on the first pass, p-y on the second
        for _ in range(2):
            kG=EllipticCurvePoint([x,y,1],self.a,self.b,self.p,self.n)  
            mzG=self*((-z)%self.n)
            # candidate public key: (s*kG - z*G) * r^-1 (mod n)
            Q=(kG*s+mzG)*InvMod(r,self.n)

            if self.AddressFromPublicKey(Q,uncompressed)==addr:
                return True

            y=self.p-y

        return False
项目:plugin.video.netflix    作者:asciidisco    | 项目源码 | 文件源码
def __decrypt_payload_chunks(self, payloadchunks):
    """Decrypt, unpad and reassemble a sequence of encrypted payload chunks.

    Every chunk is a JSON document whose ``'payload'`` is a Base64-encoded
    JSON envelope with ``'iv'`` and ``'ciphertext'``. The ciphertext is
    AES-CBC-decrypted with ``self.encryption_key``, unpadded to a JSON document
    whose Base64 ``'data'`` is decoded (and inflated when ``'compressionalgo'``
    is ``'GZIP'``). The concatenated data is parsed as JSON and the Base64
    value at ``[1]['payload']['data']`` is decoded and parsed once more.

    :param payloadchunks: iterable of JSON chunk strings.
    :return: the parsed innermost JSON document.
    """
    parse_json = json.JSONDecoder().decode
    b64decode = base64.standard_b64decode

    pieces = []
    for chunk in payloadchunks:
        encoded_envelope = parse_json(chunk).get('payload')
        envelope = parse_json(b64decode(encoded_envelope))

        # Decrypt the text
        cipher = AES.new(
            self.encryption_key,
            AES.MODE_CBC,
            b64decode(envelope['iv']))
        padded = cipher.decrypt(b64decode(envelope.get('ciphertext')))

        # unpad the plaintext
        body = parse_json(Padding.unpad(padded, 16))

        piece = b64decode(body.get('data'))
        # uncompress data if compressed
        if body.get('compressionalgo') == 'GZIP':
            piece = zlib.decompress(piece, 16 + zlib.MAX_WBITS)
        pieces.append(piece)

    assembled = ''.join(pieces)
    inner_b64 = parse_json(assembled)[1]['payload']['data']
    return parse_json(b64decode(inner_b64))
项目:plugin.video.netflix    作者:asciidisco    | 项目源码 | 文件源码
def __set_master_token(self, master_token):
    """Remember the master token and the sequence number it carries.

    :param master_token: mapping whose ``'tokendata'`` is a Base64-encoded
        JSON document; its ``'sequencenumber'`` member (``None`` if absent)
        is stored as ``self.sequence_number``.
    """
    self.mastertoken = master_token
    base_token = base64.standard_b64decode(master_token['tokendata'])
    # JSONDecoder.decode() needs text on Python 3, so the decoded bytes are
    # turned into text first; on Python 2 this produces the same result.
    decoded_token = json.JSONDecoder().decode(base_token.decode('utf-8'))
    self.sequence_number = decoded_token.get('sequencenumber')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_decode_nonascii_str(self):
    """Every decoder must reject a text string holding a non-ASCII character."""
    non_ascii = 'with non-ascii \xcb'
    decoders = (
        base64.b64decode,
        base64.standard_b64decode,
        base64.urlsafe_b64decode,
        base64.b32decode,
        base64.b16decode,
    )
    for decode in decoders:
        with self.assertRaises(ValueError):
            decode(non_ascii)
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def base64PayloadDecoder(data):
    """Return ``data`` decoded from the standard Base64 alphabet."""
    decoded = base64.standard_b64decode(data)
    return decoded
项目:django-oscar-api-checkout    作者:thelabnyc    | 项目源码 | 文件源码
def _session_unpickle(utfed):
    """Restore an object from Base64 text holding its pickle.

    :param utfed: text string containing the Base64-encoded pickle.
    :return: the unpickled object.
    """
    # NOTE(review): pickle.loads can execute code embedded in its input;
    # confirm this value can never be supplied by an untrusted party.
    return pickle.loads(base64.standard_b64decode(utfed.encode('utf8')))
项目:TigerHost    作者:naphatkrit    | 项目源码 | 文件源码
def wsse_digest(secret, b64_encoded_nonce, timestamp):
    """Return Base64(SHA-256(nonce + timestamp + secret)).

    :param secret: shared secret fed to the hash last.
    :param b64_encoded_nonce: nonce in Base64 form; it is decoded before
        being hashed.
    :param timestamp: timestamp fed to the hash after the nonce.
    :return: the Base64-encoded digest.
    """
    hasher = hashlib.sha256(base64.standard_b64decode(b64_encoded_nonce))
    for part in (timestamp, secret):
        hasher.update(part)
    return base64.standard_b64encode(hasher.digest())
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_decode_nonascii_str(self):
    """Every decoder must reject a text string holding a non-ASCII character."""
    non_ascii = 'with non-ascii \xcb'
    decoders = (
        base64.b64decode,
        base64.standard_b64decode,
        base64.urlsafe_b64decode,
        base64.b32decode,
        base64.b16decode,
        base64.b85decode,
        base64.a85decode,
    )
    for decode in decoders:
        with self.assertRaises(ValueError):
            decode(non_ascii)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_decode_nonascii_str(self):
    """Every decoder must reject a text string holding a non-ASCII character."""
    non_ascii = 'with non-ascii \xcb'
    decoders = (
        base64.b64decode,
        base64.standard_b64decode,
        base64.urlsafe_b64decode,
        base64.b32decode,
        base64.b16decode,
        base64.b85decode,
        base64.a85decode,
    )
    for decode in decoders:
        with self.assertRaises(ValueError):
            decode(non_ascii)
项目:whatcripto    作者:vandalvnl    | 项目源码 | 文件源码
def decipher(self):
    """Return ``self.cipher`` decoded from Base64.

    :return: the decoded data, or an empty string when ``self.cipher``
        cannot be encoded or is not valid Base64.
    """
    try:
        return base64.standard_b64decode(str.encode(self.cipher))
    except Exception:
        # Still lenient, but no longer a bare ``except`` that would also
        # swallow KeyboardInterrupt and SystemExit.
        return ''
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def _apply(self, value):
    """Decode a Base64 byte string, accepting the standard and URL-safe dialects.

    Whitespace is removed, the value is matched against ``self.base64_re``,
    the dialect is detected from the presence of ``_``/``-``, padding is
    normalized and the matching decoder is applied.

    :return: the decoded bytes; ``None`` when the type filter reported
        errors; otherwise the result of ``self._invalid_value`` for values
        that contain invalid characters, mix both dialects or fail to decode.
    """
    value = self._filter(value, Type(binary_type)) # type: binary_type

    if self._has_errors:
        return None

    # Strip out whitespace.
    # Technically, whitespace is not part of the Base64 alphabet,
    # but virtually every implementation allows it.
    value = self.whitespace_re.sub(b'', value)

    # Check for invalid characters.
    # Note that Python 3's b64decode does this for us, but we also
    # have to support Python 2.
    # https://docs.python.org/3/library/base64.html#base64.b64decode
    if not self.base64_re.match(value):
        return self._invalid_value(
            value   = value,
            reason  = self.CODE_INVALID,
        )

    # Check to see if we are working with a URL-safe dialect.
    # https://en.wikipedia.org/wiki/Base64#URL_applications
    url_safe = (b'_' in value) or (b'-' in value)
    if url_safe and ((b'+' in value) or (b'/' in value)):
        # Dialects cannot be mixed.
        return self._invalid_value(
            value   = value,
            reason  = self.CODE_INVALID,
        )

    # Normalize padding.
    # http://stackoverflow.com/a/9807138/
    value = value.rstrip(b'=')
    value += (b'=' * (4 - (len(value) % 4)))

    decode = urlsafe_b64decode if url_safe else standard_b64decode
    try:
        return decode(value)
    except (TypeError, ValueError):
        # Python 2 signals undecodable input with TypeError, Python 3 with
        # binascii.Error (a ValueError subclass); catching only TypeError
        # let the Python 3 error escape instead of being reported.
        return self._invalid_value(value, self.CODE_INVALID, exc_info=True)


# noinspection SpellCheckingInspection
项目:plugin.video.netflix    作者:asciidisco    | 项目源码 | 文件源码
def __perform_key_handshake(self):
    """Request new crypto keys from the ``manifest`` endpoint.

    Posts a key-request header (entity auth scheme ``NONE`` with the ESN as
    identity). On an HTTP 200 answer without ``'errordata'`` the Base64
    ``'headerdata'`` is decoded and passed to ``__parse_crypto_keys``.

    :return: ``False`` when the answer carries ``'errordata'``; ``None`` in
        every other case, including a failed request (which is logged).
    """
    header = self.__generate_msl_header(
        is_key_request=True,
        is_handshake=True,
        compressionalgo='',
        encrypt=False)
    esn = self.kodi_helper.get_esn()

    request = {
        'entityauthdata': {
            'scheme': 'NONE',
            'authdata': {
                'identity': esn
            }
        },
        'headerdata': base64.standard_b64encode(header),
        'signature': '',
    }
    self.kodi_helper.log(msg='Key Handshake Request:')
    self.kodi_helper.log(msg=json.dumps(request))

    try:
        resp = self.session.post(
            url=self.endpoints['manifest'],
            data=json.dumps(request, sort_keys=True))
    except Exception:
        resp = None
        exc = sys.exc_info()
        self.kodi_helper.log(
            msg='[MSL][POST] Error {} {}'.format(exc[0], exc[1]))

    if resp and resp.status_code == 200:
        resp = resp.json()
        if 'errordata' in resp:
            self.kodi_helper.log(msg='Key Exchange failed')
            self.kodi_helper.log(
                msg=base64.standard_b64decode(resp['errordata']))
            return False
        base_head = base64.standard_b64decode(resp['headerdata'])
        self.__parse_crypto_keys(
            headerdata=json.JSONDecoder().decode(base_head))
    else:
        self.kodi_helper.log(msg='Key Exchange failed')
        # resp is None when the POST itself raised; reading resp.text then
        # used to fail with AttributeError.
        if resp is not None:
            self.kodi_helper.log(msg=resp.text)
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def addcrypted2():
    """Decrypt a posted ``crypted`` link list and hand the links to PYLOAD.

    The hex key is obtained from the ``jk`` form field: by evaluating it with
    the JS engine when one is available, otherwise by pattern-matching a
    ``return '<key>'`` statement or a reversed ``var org = '<key>'`` value.
    The Base64 payload is AES-CBC-decrypted with the key doubling as IV.

    :return: ``"success\\r\\n"`` when the links were added, ``"failed"`` when
        the key could not be decoded, ``"failed can't add"`` when PYLOAD
        rejected the links.
    """
    package = request.forms.get("source", None)
    crypted = request.forms["crypted"]
    jk = request.forms["jk"]

    crypted = standard_b64decode(unquote(crypted.replace(" ", "+")))
    if JS:
        # NOTE(review): this evaluates script text taken from the request.
        jk = JS.eval("%s f()" % jk)
    else:
        try:
            jk = re.findall(r"return ('|\")(.+)('|\")", jk)[0][1]
        except Exception:
            ## Test for some known js functions to decode
            if jk.find("dec") > -1 and jk.find("org") > -1:
                org = re.findall(r"var org = ('|\")([^\"']+)", jk)[0][1]
                jk = org[::-1]
            else:
                print("Could not decrypt key, please install py-spidermonkey or ossp-js")

    try:
        Key = unhexlify(jk)
    except Exception:
        print("Could not decrypt key, please install py-spidermonkey or ossp-js")
        return "failed"

    IV = Key

    obj = AES.new(Key, AES.MODE_CBC, IV)
    decrypted = obj.decrypt(crypted).replace("\x00", "").replace("\r", "")
    result = [link for link in decrypted.split("\n") if link != ""]

    try:
        if package:
            PYLOAD.addPackage(package, result, 0)
        else:
            PYLOAD.generateAndAddPackages(result, 0)
    except Exception:
        # Narrowed from a bare ``except`` so KeyboardInterrupt and SystemExit
        # still propagate.
        return "failed can't add"
    else:
        return "success\r\n"
项目:linkerUniverse    作者:LinkerNetworks    | 项目源码 | 文件源码
def write_package_in_zip(zip_file, path, package):
    """Write packages files in the zip file

    The caller's ``package`` dictionary (including its nested ``resource``
    dictionary) is left unmodified.

    :param zip_file: zip file where the files will get created
    :type zip_file: zipfile.ZipFile
    :param path: path for the package directory. E.g.
                 universe/repo/packages/M/marathon/0
    :type path: pathlib.Path
    :param package: package information dictionary
    :type package: dict
    :rtype: None
    :raises KeyError: if ``package`` has no ``releaseVersion`` key
    """

    # Shallow copy: top-level keys can be popped without touching the
    # caller's dict, but nested dicts are still shared (see `resource`).
    package = package.copy()
    package.pop('releaseVersion')
    package.pop('minDcosReleaseVersion', None)
    package['packagingVersion'] = "2.0"

    resource = package.pop('resource', None)
    if resource:
        # Copy before popping 'cli'; otherwise the caller's nested
        # resource dict would lose its 'cli' entry as a side effect.
        resource = resource.copy()
        cli = resource.pop('cli', None)
        if cli and 'command' not in package:
            print(('WARNING: Removing binary CLI from ({}, {}) without a '
                  'Python CLI').format(package['name'], package['version']))
        zip_file.writestr(
            str(path / 'resource.json'),
            json.dumps(resource))

    marathon_template = package.pop(
        'marathon',
        {}
    ).get(
        'v2AppMustacheTemplate'
    )
    if marathon_template:
        # The template is stored base64-encoded inside the package JSON.
        zip_file.writestr(
            str(path / 'marathon.json.mustache'),
            base64.standard_b64decode(marathon_template))

    config = package.pop('config', None)
    if config:
        zip_file.writestr(
            str(path / 'config.json'),
            json.dumps(config))

    command = package.pop('command', None)
    if command:
        zip_file.writestr(
            str(path / 'command.json'),
            json.dumps(command))

    # Whatever is left after the pops above is the package metadata proper.
    zip_file.writestr(
        str(path / 'package.json'),
        json.dumps(package))
项目:snowflake-connector-python    作者:snowflakedb    | 项目源码 | 文件源码
def encrypt_file(encryption_material, in_filename,
                     chunk_size=AES.block_size * 4 * 1024, tmp_dir=None):
        """
        Encrypt a file into a newly created temporary file.

        A random IV and a random file key (as long as the decoded query
        stage master key) are generated.  The file content is encrypted
        with AES-CBC under the file key; the file key itself is encrypted
        with AES-ECB under the master key and returned in the metadata.

        :param encryption_material: encryption material
        :param in_filename: input file name
        :param chunk_size: read chunk size
        :param tmp_dir: temporary directory, optional
        :return: tuple of (encryption metadata, encrypted temp file name)
        """
        log = getLogger(__name__)
        master_key = base64.standard_b64decode(
            encryption_material.query_stage_master_key)
        key_size = len(master_key)
        log.debug(u'key_size = %s', key_size)

        # Per-file random material for the data cipher.
        block_size = AES.block_size
        iv_data = SnowflakeEncryptionUtil.get_secure_random(block_size)
        file_key = SnowflakeEncryptionUtil.get_secure_random(key_size)
        data_cipher = AES.new(key=file_key, mode=AES.MODE_CBC, IV=iv_data)

        out_fd, out_path = tempfile.mkstemp(
            text=False, dir=tmp_dir,
            prefix=os.path.basename(in_filename) + "#")
        log.debug(u'unencrypted file: %s, temp file: %s, tmp_dir: %s',
                  in_filename, out_path, tmp_dir)

        padded = False
        with open(in_filename, u'rb') as infile, \
                os.fdopen(out_fd, u'wb') as outfile:
            # read() returns an empty bytes object at end of file.
            for chunk in iter(lambda: infile.read(chunk_size), b''):
                if len(chunk) % block_size != 0:
                    # Short chunk: pad it up to a whole block.
                    chunk = PKCS5_PAD(chunk, block_size)
                    padded = True
                outfile.write(data_cipher.encrypt(chunk))
            if not padded:
                # Nothing was padded, so append one full block of padding
                # bytes, each holding the block size as its value.
                padding_block = block_size * chr(block_size).encode(UTF8)
                outfile.write(data_cipher.encrypt(padding_block))

        # encrypt key with QRMK
        key_cipher = AES.new(key=master_key, mode=AES.MODE_ECB)
        enc_kek = key_cipher.encrypt(PKCS5_PAD(file_key, block_size))

        mat_desc = MaterialDescriptor(
            smk_id=encryption_material.smk_id,
            query_id=encryption_material.query_id,
            key_size=key_size * 8)
        encoded_key = base64.b64encode(enc_kek).decode('utf-8')
        encoded_iv = base64.b64encode(iv_data).decode('utf-8')
        metadata = EncryptionMetadata(
            key=encoded_key,
            iv=encoded_iv,
            matdesc=matdesc_to_unicode(mat_desc),
        )
        return (metadata, out_path)
项目:snowflake-connector-python    作者:snowflakedb    | 项目源码 | 文件源码
def decrypt_file(metadata, encryption_material, in_filename,
                     chunk_size=AES.block_size * 4 * 1024, tmp_dir=None):
        """
        Decrypt a file into a newly created temporary file.

        The file key stored in ``metadata.key`` is decrypted with AES-ECB
        under the decoded query stage master key, then used with
        ``metadata.iv`` to decrypt the file content with AES-CBC.

        :param metadata: metadata input
        :param encryption_material: encryption material
        :param in_filename: input file name
        :param chunk_size: read chunk size
        :param tmp_dir: temporary directory, optional
        :return: a decrypted file name
        """
        log = getLogger(__name__)
        wrapped_key_b64, iv_b64 = metadata.key, metadata.iv
        master_key = base64.standard_b64decode(
            encryption_material.query_stage_master_key)
        wrapped_key = base64.standard_b64decode(wrapped_key_b64)
        iv_bytes = base64.standard_b64decode(iv_b64)

        # Recover the per-file key, then build the cipher for the content.
        key_cipher = AES.new(key=master_key, mode=AES.MODE_ECB)
        file_key = PKCS5_UNPAD(key_cipher.decrypt(wrapped_key))
        data_cipher = AES.new(key=file_key, mode=AES.MODE_CBC, IV=iv_bytes)

        out_fd, out_path = tempfile.mkstemp(
            text=False, dir=tmp_dir,
            prefix=os.path.basename(in_filename) + "#")
        log.info(u'encrypted file: %s, tmp file: %s',
                 in_filename, out_path)

        plain_size = 0
        last_plain = None
        with open(in_filename, u'rb') as infile, \
                os.fdopen(out_fd, u'wb') as outfile:
            # read() returns an empty bytes object at end of file.
            for chunk in iter(lambda: infile.read(chunk_size), b''):
                plain_size += len(chunk)
                last_plain = data_cipher.decrypt(chunk)
                outfile.write(last_plain)
            if last_plain is not None:
                # Cut off the trailing bytes reported by PKCS5_OFFSET for
                # the final chunk (presumably the padding length).
                plain_size -= PKCS5_OFFSET(last_plain)
            outfile.truncate(plain_size)
        return out_path
项目:dcos-rabbitmq    作者:sheepkiller    | 项目源码 | 文件源码
def _unpack_stub_universe_json(self, scratchdir, stub_universe_file):
        stub_universe_json = json.loads(stub_universe_file.read().decode('utf-8'), object_pairs_hook=collections.OrderedDict)

        # put package files into a subdir of scratchdir: avoids conflicts with reuse of scratchdir elsewhere
        pkgdir = os.path.join(scratchdir, 'stub-universe-{}'.format(self._pkg_name))
        os.makedirs(pkgdir)

        if 'packages' not in stub_universe_json:
            raise Exception('Expected "packages" key in root of stub universe JSON: {}'.format(
                self._stub_universe_url))
        if len(stub_universe_json['packages']) != 1:
            raise Exception('Expected single "packages" entry in stub universe JSON: {}'.format(
                self._stub_universe_url))

        # note: we delete elements from package_json they're unpacked, as package_json is itself written to a file
        package_json = stub_universe_json['packages'][0]

        def extract_json_file(package_dict, name):
            file_dict = package_dict.get(name)
            if file_dict is not None:
                del package_dict[name]
                # ensure that the file has a trailing newline (json.dump() doesn't!)
                with open(os.path.join(pkgdir, name + '.json'), 'w') as fileref:
                    content = json.dumps(file_dict, indent=2)
                    fileref.write(content)
                    if not content.endswith('\n'):
                        fileref.write('\n')
        extract_json_file(package_json, 'command')
        extract_json_file(package_json, 'config')
        extract_json_file(package_json, 'resource')

        marathon_json = package_json.get('marathon', {}).get('v2AppMustacheTemplate')
        if marathon_json is not None:
            del package_json['marathon']
            with open(os.path.join(pkgdir, 'marathon.json.mustache'), 'w') as marathon_file:
                marathon_file.write(base64.standard_b64decode(marathon_json).decode())

        if 'releaseVersion' in package_json:
            del package_json['releaseVersion']

        with open(os.path.join(pkgdir, 'package.json'), 'w') as package_file:
            json.dump(package_json, package_file, indent=2)

        return pkgdir