Python base64 模块,b64decode() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用base64.b64decode()

项目:django-heartbeat    作者:pbs    | 项目源码 | 文件源码
def auth(func):
    """Restrict a view to authorized callers.

    The wrapped view is executed when authentication is disabled in the
    settings returned by ``get_auth()``, when the client IP is accepted by
    ``is_authorized()``, or when the request carries HTTP Basic credentials
    equal to ``HEARTBEAT['auth']``.  Any other request gets a 401 response
    with a ``WWW-Authenticate`` challenge.
    """
    @wraps(func)
    def _decorator(request, *args, **kwargs):
        settings = get_auth()
        if settings.get('disable', False) is True:
            return func(request, *args, **kwargs)
        if 'authorized_ips' in settings:
            ip = get_client_ip(request)
            if is_authorized(ip, settings['authorized_ips']):
                return func(request, *args, **kwargs)
        prepare_credentials(settings)
        header = request.META.get('HTTP_AUTHORIZATION')
        if header:
            # partition() never raises, unlike unpacking the result of
            # split(' ') on a header with zero or several spaces.
            authmeth, _, encoded = header.partition(' ')
            if authmeth.lower() == 'basic':
                try:
                    credentials = base64.b64decode(encoded).decode('utf-8')
                except (TypeError, ValueError):
                    # Bad base64 or bad UTF-8: treat as failed authentication
                    # instead of letting the request crash.
                    credentials = ''
                # Split on the first colon only: the password part may
                # itself contain ':' characters.
                username, colon, password = credentials.partition(':')
                if (colon and
                        username == HEARTBEAT['auth']['username'] and
                        password == HEARTBEAT['auth']['password']):
                    return func(request, *args, **kwargs)

        response = HttpResponse(
            "Authentication failed", status=401)
        response['WWW-Authenticate'] = 'Basic realm="Welcome to 1337"'
        return response
    return _decorator
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def get_encryption():
    """Return the source text of the generated encryption helper module.

    The placeholders ``{0}``, ``{1}`` and ``{2}`` of the template are filled
    with ``st_obf[0]``, ``aes_encoded`` and ``aes_abbrev`` respectively.
    """
    template = '''
import base64
from Crypto import Random
from Crypto.Cipher import AES

abbrev = '{2}'
{0} = base64.b64decode('{1}')

def encrypt(raw):
    iv = Random.new().read( AES.block_size )
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return (base64.b64encode( iv + cipher.encrypt( raw ) ) )

def decrypt(enc):
    enc = base64.b64decode(enc)
    iv = enc[:16]
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return cipher.decrypt( enc[16:] )
'''
    return template.format(st_obf[0], aes_encoded, aes_abbrev)

################################################################################
#                       st_protocol.py stitch_gen variables                    #
################################################################################
项目:docklet    作者:unias    | 项目源码 | 文件源码
def parse_cookie(cookie, securekey):
    """Validate a ``<base64 payload>.<md5 hex digest>`` cookie.

    The part before the first dot is base64-decoded to text.  The cookie is
    accepted when the part after the dot equals the MD5 hex digest of that
    text concatenated with ``securekey``.

    Returns the ``'name'`` entry of the JSON payload for an accepted cookie,
    otherwise ``None``.
    """
    logger.info(">> parse cookie : %s" % cookie)
    parts = cookie.split('.')
    part1 = parts[0]
    part2 = '' if len(parts) < 2 else parts[1]
    try:
        text = str(base64.b64decode(part1.encode('ascii')), encoding='utf-8')
    except Exception:
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit
        # still propagate.
        logger.info("decode cookie failed")
        return None
    logger.info("cookie content : %s" % text)
    try:
        thatpart2 = hashlib.md5((text+securekey).encode('ascii')).hexdigest()
    except UnicodeEncodeError:
        # The payload is caller-controlled; non-ASCII content must be
        # rejected like any other invalid cookie rather than raising.
        logger.info("cookie content is not ascii")
        return None
    logger.info("hash from part1 : %s" % thatpart2)
    logger.info("hash from part2 : %s" % part2)
    if part2 == thatpart2:
        result = json.loads(text)['name']
    else:
        result = None
    logger.info("parse from cookie : %s" % result)
    return result
项目:IotCenter    作者:panjanek    | 项目源码 | 文件源码
def handleDeviceCall(self, deviceId, payload):
        """Process one JSON request received from a device.

        An ``"image"`` entry is base64-decoded and written to the device's
        ``images`` folder under a timestamped file name.  Each entry of
        ``"values"`` is stored through ``self.database.save``.  Finally the
        model's trends are updated under the session lock and the model is
        pushed to the websocket clients as JSON.
        """
        deviceIdHex = binascii.hexlify(deviceId)
        self.logger.debug("Handling request from device {0} with payload {1}".format(deviceIdHex, payload))
        payloadDict = json.loads(payload)
        session = self.service.sessions[deviceId]
        model = DeviceModel().loadFromSession(session, self.deviceConfig, payloadDict)
        if "image" in payloadDict:
            # "type" comes from the device and ends up in a file name, so
            # keep only ASCII letters and digits: a value such as
            # "../../x" must not be able to leave the images folder.
            allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
            requestedType = u"{0}".format(payloadDict.get("type", "jpg"))
            imageType = str("".join(ch for ch in requestedType if ch in allowed)) or "jpg"
            imageData = base64.b64decode(payloadDict["image"])
            fn = "{0}.{1}".format(datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'), imageType)
            imageFile = os.path.join(self.getDeviceFolder(deviceIdHex, "images"), fn)
            self.logger.info("Received {0} bytes of image data from device {1} - saving to {2}".format(len(imageData), deviceIdHex, imageFile))
            # The file is only written, never read back: 'wb' is enough.
            with open(imageFile, 'wb') as f:
                f.write(imageData)
            model.newImageUrl = "/upload/{0}/images/{1}".format(deviceIdHex, fn)
        if "values" in payloadDict:
            for variable, value in payloadDict["values"].items():
                self.database.save(deviceIdHex, variable, session.protocol, session.clientAddr[0], session.lastUpdateTime, value)
        with session.lock:
            model.saveTrends(self.trends)
            model.computeTrends(self.trends)
        self.webServer.websocketSend(model.toJSON())
项目:okta-awscli    作者:jmhale    | 项目源码 | 文件源码
def choose_aws_role(assertion):
        """Let the user pick one AWS role from a base64-encoded SAML assertion.

        Every value of the ``https://aws.amazon.com/SAML/Attributes/Role``
        attribute is split on ``,`` into a ``RoleTuple(principal_arn,
        role_arn)``.  The role names are printed as a 1-based menu and the
        tuple matching the number typed by the user is returned.
        """
        aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role'
        attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'
        roles = []
        role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])
        root = ET.fromstring(base64.b64decode(assertion))
        for saml2attribute in root.iter('{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'):
            if saml2attribute.get('Name') == aws_attribute_role:
                for saml2attributevalue in saml2attribute.iter(attribute_value_urn):
                    roles.append(role_tuple(*saml2attributevalue.text.split(',')))

        for index, role in enumerate(roles, 1):
            role_name = role.role_arn.split('/')[1]
            print("%d: %s" % (index, role_name))
        # input() returns a string on Python 3; convert before doing
        # arithmetic (int() is a no-op on the integer Python 2 returns).
        role_choice = int(input('Please select the AWS role: ')) - 1
        return roles[role_choice]
项目:Travis-Encrypt    作者:mandeep    | 项目源码 | 文件源码
def test_password_empty_file():
    """Run the CLI against a YAML file that has no password entry yet."""
    cli_args = ['mandeep', 'Travis-Encrypt', 'file.yml']
    runner = CliRunner()
    with runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump({'language': 'python'}, handle)

        outcome = runner.invoke(cli, cli_args, 'SUPER_SECURE_PASSWORD')
        assert not outcome.exception

        with open('file.yml') as handle:
            config = ordered_load(handle)

            # The existing key survives and a decodable secret was added.
            assert 'password' in config
            assert 'secure' in config['password']
            assert config['language'] == 'python'
            assert base64.b64decode(config['password']['secure'])
项目:Travis-Encrypt    作者:mandeep    | 项目源码 | 文件源码
def test_password_nonempty_file():
    """Run the CLI against a YAML file that already holds a password.

    The pre-existing ``password.secure`` entry has to be overwritten while
    the key order of the file is kept."""
    cli_args = ['mandeep', 'Travis-Encrypt', 'file.yml']
    runner = CliRunner()
    with runner.isolated_filesystem():
        stale_secret = {'secure': 'SUPER_INSECURE_PASSWORD'}
        seed = OrderedDict([('language', 'python'),
                            ('dist', 'trusty'),
                            ('password', stale_secret)])

        with open('file.yml', 'w') as handle:
            ordered_dump(seed, handle)

        outcome = runner.invoke(cli, cli_args, 'SUPER_SECURE_PASSWORD')
        assert not outcome.exception

        with open('file.yml') as handle:
            config = ordered_load(handle)

        assert config['language'] == 'python'
        assert config['dist'] == 'trusty'
        assert base64.b64decode(config['password']['secure'])
        assert list(config.keys()) == ['language', 'dist', 'password']
项目:Travis-Encrypt    作者:mandeep    | 项目源码 | 文件源码
def test_deploy_empty_file():
    """Run the CLI with --deploy against a YAML file without a deploy entry."""
    cli_args = ['--deploy', 'mandeep', 'Travis-Encrypt', 'file.yml']
    runner = CliRunner()
    with runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump({'language': 'python'}, handle)

        outcome = runner.invoke(cli, cli_args, 'SUPER_SECURE_PASSWORD')
        assert not outcome.exception

        with open('file.yml') as handle:
            config = ordered_load(handle)

        assert config['language'] == 'python'
        # The secret is stored below deploy.password and must be base64.
        assert base64.b64decode(config['deploy']['password']['secure'])
项目:Travis-Encrypt    作者:mandeep    | 项目源码 | 文件源码
def test_deploy_nonempty_file():
    """Run the CLI with --deploy against a YAML file that has a deploy entry.

    The pre-existing ``deploy.password.secure`` entry has to be overwritten
    while the key order of the file is kept."""
    cli_args = ['--deploy', 'mandeep', 'Travis-Encrypt', 'file.yml']
    runner = CliRunner()
    with runner.isolated_filesystem():
        stale_deploy = {'password': {'secure': 'SUPER_INSECURE_PASSWORD'}}
        seed = OrderedDict([('language', 'python'),
                            ('dist', 'trusty'),
                            ('deploy', stale_deploy)])

        with open('file.yml', 'w') as handle:
            ordered_dump(seed, handle)

        outcome = runner.invoke(cli, cli_args, 'SUPER_SECURE_PASSWORD')
        assert not outcome.exception

        with open('file.yml') as handle:
            config = ordered_load(handle)

        assert config['language'] == 'python'
        assert config['dist'] == 'trusty'
        assert base64.b64decode(config['deploy']['password']['secure'])
        assert list(config.keys()) == ['language', 'dist', 'deploy']
项目:Travis-Encrypt    作者:mandeep    | 项目源码 | 文件源码
def test_environment_variable_nonempty_file():
    """Run the CLI with --env against a YAML file that has an env entry.

    The pre-existing ``env.global.secure`` entry has to be overwritten while
    the key order of the file is kept."""
    cli_args = ['--env', 'mandeep', 'Travis-Encrypt', 'file.yml']
    runner = CliRunner()
    with runner.isolated_filesystem():
        stale_env = {'global': {'secure': 'API_KEY="SUPER_INSECURE_KEY"'}}
        seed = OrderedDict([('language', 'python'),
                            ('dist', 'trusty'),
                            ('env', stale_env)])

        with open('file.yml', 'w') as handle:
            ordered_dump(seed, handle)

        outcome = runner.invoke(cli, cli_args, 'SUPER_SECURE_API_KEY')
        assert not outcome.exception

        with open('file.yml') as handle:
            config = ordered_load(handle)

        assert config['language'] == 'python'
        assert config['dist'] == 'trusty'
        assert base64.b64decode(config['env']['global']['secure'])
        assert list(config.keys()) == ['language', 'dist', 'env']
项目:py-secretcrypt    作者:Zemanta    | 项目源码 | 文件源码
def _key():
    """Return the local key, creating and persisting it on first use.

    The key is cached in the module-level ``__key``.  On disk it is stored
    base64-encoded in the file ``key`` inside ``_key_dir()``; the value
    returned is always the decoded (raw) key.
    """
    import errno

    global __key
    if __key:
        return __key

    data_dir = _key_dir()
    key_file = os.path.join(data_dir, 'key')

    if os.path.isfile(key_file):
        with open(key_file, 'rb') as f:
            __key = base64.b64decode(f.read())
            return __key

    # Generate the raw key and encode it only for the on-disk copy.  The
    # process that creates the key must return the same bytes that later
    # processes obtain by decoding the file.
    raw_key = os.urandom(16)
    try:
        os.makedirs(data_dir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
    with open(key_file, 'wb') as f:
        f.write(base64.b64encode(raw_key))
    __key = raw_key
    return __key
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def get(self, result):
        """Extract a cookie from the obfuscated script found in ``result``.

        The base64 blob assigned to ``S`` in ``result`` is decoded, a handful
        of JavaScript constructs are rewritten to Python syntax, and the
        resulting code is executed so that it assigns the local ``cookie``.

        Returns the cookie as ``name=value`` (also stored in ``self.cookie``)
        or ``None`` when any step fails.
        """
        try:
            s = re.compile(r"S\s*=\s*'([^']+)").findall(result)[0]
            s = base64.b64decode(s)
            s = s.replace(' ', '')
            s = re.sub(r'String\.fromCharCode\(([^)]+)\)', r'chr(\1)', s)
            s = re.sub(r'\.slice\((\d+),(\d+)\)', r'[\1:\2]', s)
            s = re.sub(r'\.charAt\(([^)]+)\)', r'[\1]', s)
            s = re.sub(r'\.substr\((\d+),(\d+)\)', r'[\1:\1+\2]', s)
            s = re.sub(r';location.reload\(\);', '', s)
            s = re.sub(r'\n', '', s)
            s = re.sub(r'document\.cookie', 'cookie', s)

            # NOTE(security): this executes code derived from the remote
            # page; a hostile page can run arbitrary Python here.
            # NOTE(review): relies on exec() rebinding the local ``cookie``,
            # which is Python 2 behaviour -- confirm the target interpreter.
            cookie = ''
            exec(s)
            self.cookie = re.compile('([^=]+)=(.*)').findall(cookie)[0]
            self.cookie = '%s=%s' % (self.cookie[0], self.cookie[1])

            return self.cookie
        except Exception:
            # Best effort: any failure means "no cookie available".
            return None
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def resolve(self, url):
        """Resolve ``url`` to the final stream URL when its host is handled.

        ``url`` is returned unchanged when its domain does not occur in the
        base64-decoded ``self.b_link``.  Otherwise ``url`` is expected to be
        ``<address>|<post data>|<header query string>``; the request is
        replayed with the referer's cookie and the last ``http...`` fragment
        following a ``url=`` marker in the response is returned.

        Returns ``None`` when any step fails.
        """
        try:
            b = urlparse.urlparse(url).netloc
            b = re.compile(r'([\w]+[.][\w]+)$').findall(b)[0]

            if b not in base64.b64decode(self.b_link):
                return url

            u, p, h = url.split('|')
            r = urlparse.parse_qs(h)['Referer'][0]
            #u += '&app_id=Exodus'

            c = self.request(r, output='cookie', close=False)
            result = self.request(u, post=p, referer=r, cookie=c)

            url = result.split('url=')
            url = [urllib.unquote_plus(i.strip()) for i in url]
            url = [i for i in url if i.startswith('http')]
            url = url[-1]

            return url
        except Exception:
            # Best effort: callers receive None for anything unresolvable.
            return None
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def get_cainfo(self):
        """Query the CA service for its information.

        Posts to the ``cainfo`` endpoint, including ``caname`` in the body
        only when a CA name is configured.

        Returns: the ``CAChain`` entry of the response, base64-decoded.

        Raises:
            ValueError: when the response is not successful or reports a
                different CA name.

        """
        body_data = {"caname": self._ca_name} if self._ca_name != "" else {}
        response = self._send_ca_post(path="cainfo", json=body_data)
        _logger.debug("Raw response json {0}".format(response))

        accepted = (response['success'] and
                    response['result']['CAName'] == self._ca_name)
        if not accepted:
            message = "get_cainfo failed with errors {0}".format(
                response['errors'])
            raise ValueError(message)
        return base64.b64decode(response['result']['CAChain'])
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_inject_file_with_old_agent(self):
        """inject_file must send one combined "path,contents" blob when the
        agent features are reported as 'injectfile'."""
        # This is an alias, not a copy: the updates below are made on
        # FAKE_ARG_DICT itself.
        arg_dict = FAKE_ARG_DICT
        request_id = arg_dict["id"]
        decoded_path = base64.b64decode(arg_dict["b64_path"])
        decoded_file = base64.b64decode(arg_dict["b64_contents"])
        combined_b64 = base64.b64encode("%s,%s" % (decoded_path, decoded_file))
        self.mock_patch_object(self.agent,
                               '_get_agent_features',
                               'injectfile')
        expected_value = {"name": "injectfile", "value": combined_b64}
        arg_dict["value"] = json.dumps(expected_value)
        arg_dict["path"] = "data/host/%s" % request_id

        self.agent.inject_file(self.agent, FAKE_ARG_DICT)

        self.agent._wait_for_agent.assert_called_once()
        self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                            arg_dict)
        self.agent._get_agent_features.assert_called_once()
项目:cfpp    作者:dcoker    | 项目源码 | 文件源码
def test_kms(self):
        """Check that the values produced by ``cfpp`` can be decrypted by KMS.

        Does nothing unless ``CFPP_RUN_KMS_TESTS`` is set in the environment.
        The value encrypted with a context must decrypt only when that
        context is supplied.
        """
        if "CFPP_RUN_KMS_TESTS" not in os.environ:
            return

        import boto3
        import botocore

        output = subprocess.check_output(["cfpp", "-s", "tests",
                                          "tests/kms_test.template"])
        parsed = json.loads(output)["Parameters"]
        without_context = parsed["EncryptedValue"]["Default"]
        with_context = parsed["EncryptedValueWithContext"]["Default"]

        kms = boto3.client('kms')
        kms.decrypt(CiphertextBlob=base64.b64decode(without_context))
        try:
            # Decode here as well: passing the base64 text itself would be
            # rejected whatever the context, so the check would succeed
            # without testing the missing-context case.
            kms.decrypt(CiphertextBlob=base64.b64decode(with_context))
        except botocore.exceptions.ClientError:
            pass
        else:
            self.fail("expected KMS to fail due to lack of context")

        kms.decrypt(CiphertextBlob=base64.b64decode(with_context),
                    EncryptionContext={"ContextKey": "ContextValue"})
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def unquote(cls, value):
        """Reverse the quoting applied to a cookie value.

        The value is base64-decoded when ``cls.quote_base64`` is set and then
        passed through ``cls.serialization_method.loads`` when a
        serialization method is configured.

        :param value: the value to unquote.
        :raises UnquoteError: whenever any of the steps fails.
        """
        try:
            decoded = base64.b64decode(value) if cls.quote_base64 else value
            serializer = cls.serialization_method
            if serializer is None:
                return decoded
            return serializer.loads(decoded)
        except Exception:
            # Deserializers may raise practically anything, so every failure
            # is reported uniformly as an UnquoteError.
            raise UnquoteError()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def loads(self, value):
        """Deserialize JSON text, expanding single-key tagged objects.

        An object with exactly one key that is one of the tags ``' t'``,
        ``' u'``, ``' b'``, ``' m'`` or ``' d'`` is replaced by the value
        produced by the matching converter; all other objects are returned
        as parsed.
        """
        # Lambdas keep the lookup of the converter names lazy, exactly as in
        # an if/elif chain.
        converters = {
            ' t': lambda payload: tuple(payload),
            ' u': lambda payload: uuid.UUID(payload),
            ' b': lambda payload: b64decode(payload),
            ' m': lambda payload: Markup(payload),
            ' d': lambda payload: parse_date(payload),
        }

        def object_hook(obj):
            if len(obj) != 1:
                return obj
            tag, payload = next(iteritems(obj))
            convert = converters.get(tag)
            return obj if convert is None else convert(payload)

        return json.loads(value, object_hook=object_hook)
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def post(self):
        """Answer ``OK`` to requests carrying a decodable Authorization header.

        Requests without the header, or whose header payload is not valid
        base64, receive a 401 JSON error together with a Basic
        ``WWW-Authenticate`` challenge.
        """
        authenticated = False
        if 'Authorization' in self.request.headers:
            auth = self.request.headers['Authorization']
            try:
                # Skips the first six characters -- presumably the "Basic "
                # scheme prefix.
                base64.b64decode(auth[6:])
            except (TypeError, ValueError):
                # A malformed header is an authentication failure, not a
                # server error.
                authenticated = False
            else:
                # NOTE(review): the decoded credentials are never compared
                # with anything, so every well-formed header is accepted.
                authenticated = True
        if authenticated:
            # NOTE(review): the failure branch writes with self.write();
            # confirm that self.response exists on this handler.
            self.response.out.write('OK')
        else:
            site = GetSite()
            template_values = {}
            template_values['site'] = site
            template_values['message'] = "Authentication required"
            path = os.path.join(os.path.dirname(__file__), 'tpl', 'api')
            t = self.get_template(path,'error.json')
            output = t.render(template_values)
            self.set_status(401, 'Unauthorized')
            self.set_header('Content-type', 'application/json')
            self.set_header('WWW-Authenticate', 'Basic realm="' + site.domain + '"')
            self.write(output)

# Replies
# /api/replies/show.json
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def handshake_cookie_verify(b64_cookie):
        '''
        Check the length of a base-64 encoded privcount cookie before and
        after decoding.
        Return the decoded cookie when both lengths match the expected
        values, otherwise log a warning and return False.
        Raises an exception if the cookie is not correctly padded base64.
        '''
        encoded_length = len(b64_cookie)
        if encoded_length != PrivCountProtocol.COOKIE_B64_BYTES:
            logging.warning(
                "Invalid cookie: wrong encoded length {} expected {}".format(
                    encoded_length, PrivCountProtocol.COOKIE_B64_BYTES))
            return False

        cookie = b64decode(b64_cookie)
        decoded_length = len(cookie)
        if decoded_length == PrivCountProtocol.COOKIE_BYTES:
            return cookie

        logging.warning(
            "Invalid cookie: wrong decoded length {} expected {}".format(
                decoded_length, PrivCountProtocol.COOKIE_BYTES))
        return False
项目:binja_dynamics    作者:nccgroup    | 项目源码 | 文件源码
def decode(self, encoded):
        """ Takes the input from the text box and decodes it using the
        encoding selected in the combobox.

        Returns the decoded value, or None when decoding fails or the
        selected mode is not one of 'raw', 'hex', 'b64' or 'py2'. """
        mode = self._encodings[self._decoder.currentIndex()]
        if mode == 'raw':
            return str(encoded)
        try:
            if mode == 'hex':
                return encoded.decode('hex')
            if mode == 'b64':
                return b64decode(encoded)
            if mode == 'py2':
                # NOTE(security): eval() runs the text as Python code; this
                # is only acceptable while the text is typed by the local
                # user.
                return eval(encoded)
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed.
            log_alert("Failed to decode input")
            return None
        return None
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def unquote(cls, value):
        """Undo the cookie quoting of ``value``.

        Applies base64 decoding when ``cls.quote_base64`` is true, then the
        ``loads`` of ``cls.serialization_method`` when one is set.

        :param value: the value to unquote.
        :raises UnquoteError: if decoding or deserialization fails.
        """
        try:
            result = value
            if cls.quote_base64:
                result = base64.b64decode(result)
            loader = cls.serialization_method
            if loader is not None:
                result = loader.loads(result)
        except Exception:
            # pickle and similar modules can fail with nearly any exception
            # type; all of them are converted into an UnquoteError.
            raise UnquoteError()
        return result
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def loads(self, value):
        """Parse JSON text and rebuild the values stored as tagged objects.

        Only objects with exactly one key are inspected; the recognised tags
        are ``' t'`` (tuple), ``' u'`` (UUID), ``' b'`` (base64 data),
        ``' m'`` (Markup) and ``' d'`` (date).
        """
        def object_hook(obj):
            if len(obj) != 1:
                return obj
            tag, payload = next(iteritems(obj))
            if tag == ' t':
                return tuple(payload)
            if tag == ' u':
                return uuid.UUID(payload)
            if tag == ' b':
                return b64decode(payload)
            if tag == ' m':
                return Markup(payload)
            if tag == ' d':
                return parse_date(payload)
            # Single-key object with an unknown tag: leave it untouched.
            return obj

        return json.loads(value, object_hook=object_hook)
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _generate_assertion(self):
      """Build the signed JWT assertion that is sent with the request.

      The claims are ``aud``, ``scope``, ``iat``, ``exp`` and ``iss`` plus
      anything in ``self.kwargs``; the token is signed with the
      base64-decoded private key.
      """
      issued_at = long(time.time())
      payload = {}
      payload['aud'] = self.token_uri
      payload['scope'] = self.scope
      payload['iat'] = issued_at
      payload['exp'] = (
          issued_at + SignedJwtAssertionCredentials.MAX_TOKEN_LIFETIME_SECS)
      payload['iss'] = self.service_account_name
      payload.update(self.kwargs)
      logger.debug(str(payload))

      key_bytes = base64.b64decode(self.private_key)
      signer = crypt.Signer.from_string(key_bytes, self.private_key_password)
      return crypt.make_signed_jwt(signer, payload)

  # Only used in verify_id_token(), which is always calling to the same URI
  # for the certs.
项目:PTE    作者:pwn2winctf    | 项目源码 | 文件源码
def proof_open(team, proof):
    """Verify a base64-encoded proof and return the challenge it names.

    The challenge id is read from the decoded proof after its first 128
    bytes.  The proof is then opened with the challenge's ``pk`` and the
    result with the team's ``sign_pk``; the id recovered that way must equal
    the claimed one.

    Raises ValueError('invalid proof') when the two ids differ.
    """
    assert isinstance(proof, bytes)
    signed = b64decode(proof)

    claimed_chall_id = signed[128:].decode('utf-8')
    claimed_chall = Challenge(claimed_chall_id)

    # Both keys are looked up before any signature is opened.
    challenge_key = claimed_chall['pk']
    team_key = team['sign_pk']

    inner = pysodium.crypto_sign_open(signed, challenge_key)
    verified_id = pysodium.crypto_sign_open(inner, team_key).decode('utf-8')

    if verified_id == claimed_chall_id:
        return claimed_chall
    raise ValueError('invalid proof')
项目:PTE    作者:pwn2winctf    | 项目源码 | 文件源码
def setup_environment():
    """Prepare the process environment for running git over ssh.

    Appends ``$LAMBDA_TASK_ROOT/bin`` to ``PATH`` and exports it as
    ``GIT_EXEC_PATH``.  Writes the base64-decoded ``SSH_IDENTITY`` variable
    to an ``identity`` file (created with mode 0600) in a fresh temporary
    directory, generates an ssh ``config`` file next to it and points
    ``GIT_SSH_COMMAND`` at that config.
    """
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()

    ssh_identity = os.path.join(ssh_dir, 'identity')
    # b64decode() returns bytes, so the file has to be opened in binary
    # mode; a text-mode handle rejects bytes on Python 3.
    with os.fdopen(os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600),
                   'wb') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))

    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
项目:newrelic-cli    作者:NativeInstruments    | 项目源码 | 文件源码
def get_monitor_script(self, name):
        """Return the base64-decoded script text of the monitor called ``name``.

        Raises ItemNotFoundError when either the monitor or its script cannot
        be found.
        """
        # The script endpoint is addressed by monitor id, so the monitor
        # object has to be looked up first.
        monitor = self.get_monitor_by_name(name)
        if not monitor:
            raise ItemNotFoundError('Monitor {} not found'.format(name))

        url = '{}/v3/monitors/{}/script'.format(self.base_url, monitor['id'])
        try:
            r = self._get(url,
                          headers=self.default_headers,
                          timeout=self.timeout)
        except ItemNotFoundError:
            # Re-raise with a message that names the script, not the URL.
            raise ItemNotFoundError(
                'Script for monitor {} not found'.format(name))
        return base64.b64decode(r.json()['scriptText'])
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def prepare_v(instance):
    """Return the JSON message carrying V encrypted for the instance.

    A non-empty ``instance['b64_encrypted_V']`` is reused as is.  Otherwise
    ``instance['v']`` is base64-decoded, RSA-encrypted with the instance's
    public key, base64-encoded again and cached back into ``instance``.
    """
    # be very careful printing K, U, or V as they leak in logs stored on unprotected disks
    if common.DEVELOP_IN_ECLIPSE:
        logger.debug("b64_V (non encrypted): " + instance['v'])

    if instance.get('b64_encrypted_V', "") == "":
        # nothing cached yet: encrypt V with the public key
        pubkey = crypto.rsa_import_pubkey(instance['public_key'])
        plain_v = str(base64.b64decode(instance['v']))
        b64_encrypted_V = base64.b64encode(crypto.rsa_encrypt(pubkey, plain_v))
        instance['b64_encrypted_V'] = b64_encrypted_V
    else:
        b64_encrypted_V = instance['b64_encrypted_V']
        logger.debug("Re-using cached encrypted V")

    logger.debug("b64_encrypted_V:" + b64_encrypted_V)
    return json.dumps({'encrypted_key': b64_encrypted_V})
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def parse_data_uri(data_uri):
    """Decode the base64 payload of each line of ``data_uri``.

    Every line is expected to contain a comma; the text following the first
    comma is base64-decoded.

    Returns the list of decoded payloads (lines whose payload cannot be
    decoded are skipped), or ``None`` when ``data_uri`` is ``None`` or any
    line contains no comma.
    """
    if data_uri is None:
        return None

    data = []

    for uri in data_uri.split("\n"):
        fpos = uri.find(",")
        if fpos == -1:
            return None

        try:
            # Start after the comma: the separator is not part of the
            # payload and should not depend on the decoder ignoring it.
            data.append(base64.b64decode(uri[fpos + 1:]))
        except (TypeError, ValueError):
            # skip bad data (binascii.Error is a ValueError on Python 3,
            # Python 2 raises TypeError)
            continue

    return data
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def activate_group(uuid,keyblob):
    """Send a group-activate command and return the key from the reply.

    Returns ``common.TEST_AES_REG_KEY`` without any further work when
    ``common.STUB_TPM`` is set.  Otherwise ``keyblob`` is base64-decoded
    (it must yield exactly 256 bytes) and sent with the group number of
    ``uuid``.  The reply is split into an ``(algId, encScheme, size)``
    header and the key body, whose length must equal ``size``.
    """
    if common.STUB_TPM:
        return common.TEST_AES_REG_KEY

    group_id = get_group_num(uuid)
    priv_ca = base64.b64decode(keyblob)
    assert len(priv_ca) == 256
    logger.debug('Activating group number %d', group_id)

    request = struct.pack('>II', group_id, 256) + priv_ca
    reply = vtpm_cmd(VTPM_ORD_GROUP_ACTIVATE, request)
    header, key = unpack('>IHH', reply)
    algId, encScheme, size = header
    logger.info('Received Key. AlgID: 0x%x, encScheme: 0x%x, size: 0x%x',
                algId, encScheme, size)
    # NOTE(review): this writes the received key to the log at INFO level;
    # confirm that this is acceptable for deployments.
    logger.info('Key: %r', key)
    assert size == len(key)
    return key
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def db_ssl(rdata, ctxt, ssl_dir):
    """Write the database SSL material of ``rdata`` below ``ssl_dir``.

    ``ssl_ca``, ``ssl_cert`` and ``ssl_key`` are base64-decoded and stored
    as ``db-client.ca``, ``db-client.cert`` and ``db-client.key``; the
    resulting paths are recorded in ``ctxt`` as ``database_ssl_ca``,
    ``database_ssl_cert`` and ``database_ssl_key``.

    When a CA is supplied but ``ssl_dir`` is empty, a message is logged and
    ``ctxt`` is returned untouched.  Returns ``ctxt``.
    """
    # b64decode() returns bytes, so every file is opened in binary mode; a
    # text-mode handle rejects bytes on Python 3.
    if 'ssl_ca' in rdata and ssl_dir:
        ca_path = os.path.join(ssl_dir, 'db-client.ca')
        with open(ca_path, 'wb') as fh:
            fh.write(b64decode(rdata['ssl_ca']))

        ctxt['database_ssl_ca'] = ca_path
    elif 'ssl_ca' in rdata:
        log("Charm not setup for ssl support but ssl ca found", level=INFO)
        return ctxt

    if 'ssl_cert' in rdata:
        cert_path = os.path.join(
            ssl_dir, 'db-client.cert')
        if not os.path.exists(cert_path):
            log("Waiting 1m for ssl client cert validity", level=INFO)
            time.sleep(60)

        with open(cert_path, 'wb') as fh:
            fh.write(b64decode(rdata['ssl_cert']))

        ctxt['database_ssl_cert'] = cert_path
        key_path = os.path.join(ssl_dir, 'db-client.key')
        with open(key_path, 'wb') as fh:
            fh.write(b64decode(rdata['ssl_key']))

        ctxt['database_ssl_key'] = key_path

    return ctxt
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def configure_cert(self, cn=None):
        """Write the certificate and key obtained from ``get_cert(cn)``.

        Both are base64-decoded and stored below
        ``/etc/apache2/ssl/<service_namespace>`` as ``cert`` and ``key``, or
        ``cert_<cn>`` and ``key_<cn>`` when ``cn`` is given.
        """
        ssl_dir = os.path.join('/etc/apache2/ssl/', self.service_namespace)
        mkdir(path=ssl_dir)
        cert, key = get_cert(cn)

        # File names carry the common name as a suffix only when one is set.
        suffix = '_{}'.format(cn) if cn else ''
        cert_path = os.path.join(ssl_dir, 'cert' + suffix)
        write_file(path=cert_path, content=b64decode(cert))
        key_path = os.path.join(ssl_dir, 'key' + suffix)
        write_file(path=key_path, content=b64decode(key))
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def configure_ca(self):
        """Install the CA certificate from ``get_ca_cert()``, if there is one.

        The certificate is base64-decoded before being handed to
        ``install_ca_cert``.
        """
        encoded_ca = get_ca_cert()
        if not encoded_ca:
            return
        install_ca_cert(b64decode(encoded_ca))
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def decrypt(enc, aes_key=secret):
    """Decrypt a base64-encoded AES-CFB message.

    The first 16 decoded bytes are used as the IV, the rest is the
    ciphertext.  ``aes_key`` defaults to the module-level ``secret``.
    """
    raw = base64.b64decode(enc)
    iv, ciphertext = raw[:16], raw[16:]
    return AES.new(aes_key, AES.MODE_CFB, iv).decrypt(ciphertext)
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def add_bind_server(BHOST,BPORT):
    """Return the source text of the ``bind_server``/``halt_bind_server``
    methods.

    ``BHOST`` and ``BPORT`` are inserted, in that order, into the two
    ``base64.b64decode("...")`` calls of the generated code.
    """
    template = '''
    def bind_server(self):
        client_socket=None
        self.stop_bind_server = False
        # if no target is defined, we listen on all interfaces
        if dbg:
            print 'creating server'
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        target = base64.b64decode("{}")
        port = int(base64.b64decode("{}"))
        server.bind((target,port))
        server.listen(5)
        while True:
            if self.stop_bind_server:
                break
            server.settimeout(5)
            try:
                client_socket, addr = server.accept()
                server.settimeout(None)
                client_socket.settimeout(None)
            except Exception as e:
                if dbg:
                    print e
                client_socket=None
                pass
            if client_socket:
                if not self.connected:
                    self.connected = True
                    client_handler(client_socket)
                    self.connected = False
                else:
                    send(client_socket,"[!] Another stitch shell has already been established.\\n")
                    client_socket.close()
            client_socket=None
        server.close()

    def halt_bind_server(self):
        self.stop_bind_server = True\n\n'''
    return template.format(BHOST, BPORT)
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def add_listen_server(LHOST,LPORT):
    """Return the source text of the ``listen_server``/``halt_listen_server``
    methods.

    ``LHOST`` and ``LPORT`` are inserted, in that order, into the two
    ``base64.b64decode("...")`` calls of the generated code.
    """
    template = '''
    def listen_server(self):
        self.stop_listen_server  = False
        while True:
            if self.stop_listen_server :
                break
            while self.connected:
                sleep(5)
                pass
            if dbg:
                print 'trying to connect'
            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            client_socket.settimeout(5)
            target = base64.b64decode("{}")
            port = int(base64.b64decode("{}"))
            try:
                client_socket.connect((target,port))
                client_socket.settimeout(300)
                if not self.connected:
                    self.connected = True
                    client_handler(client_socket)
                    self.connected = False
                else:
                    send(client_socket,"[!] Another stitch shell has already been established.\\n")
                    client_socket.close()
            except Exception as e:
                if dbg:
                    print e
                client_socket.close()

    def halt_listen_server(self):
        self.stop_listen_server = True\n\n'''
    return template.format(LHOST, LPORT)
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def get_requirements():
    """Return the block of import statements placed in the generated code."""
    import_block = '''
import os
import re
import sys
import math
import time
import socket
import base64
import shutil
import ctypes
import socket
import struct
import zipfile
import datetime
import requests
import StringIO
import platform
import threading
import subprocess
from Crypto import Random
from Crypto.Cipher import AES
from mss import ScreenshotError
from time import strftime, sleep
from contextlib import contextmanager
from base64 import b64decode as INFO
from zlib import decompress as SEC

from st_utils import *
from st_protocol import *
from st_encryption import *
'''
    return import_block
项目:toxxmlrpc    作者:merlink01    | 项目源码 | 文件源码
def on_friend_lossless_packet_callback(self,friendId,message_type,message):
        """Collect one fragment of a lossless packet into ``self.queue_recv``.

        The first character of ``message`` is the block number:

        * ``0`` starts a new job holding the rest of the message,
        * ``1`` appends the rest of the message to the newest job,
        * ``2`` marks the newest job as done and base64-decodes its data
          when ``message_type`` is 181.

        Raises IOError for any other block number.
        """
        logger.debug('Got Lossless: Friend:%s Type:%s Message:%s'%(friendId,message_type,len(message)))

        self.queue_recv_lock.acquire()
        try:
            # Everything that can raise (int(), b64decode, IOError) runs
            # inside the try so the lock is always released.
            block_number = int(message[:1])

            job_exists = any(entry['friendId'] == friendId
                             for entry in self.queue_recv)

            # NOTE(review): job_exists matches any job of this friend while
            # the updates below always target the newest job in the queue --
            # confirm fragments of different friends cannot interleave.
            if block_number == 0:
                self.queue_recv.append({'data':message[1:],'done':False,'message_type':message_type,'friendId':friendId})
            elif block_number == 1:
                if job_exists:
                    self.queue_recv[-1]['data'] += message[1:]
                else:
                    logger.info('Got Bad Fragment')
            elif block_number == 2:
                if job_exists:
                    self.queue_recv[-1]['done'] = True
                    if message_type == 181:
                        self.queue_recv[-1]['data'] = base64.b64decode(self.queue_recv[-1]['data'])
                else:
                    logger.info('Got Bad Fragment')
            else:
                logger.info('Got Unknown Fragment')
                raise IOError
        finally:
            self.queue_recv_lock.release()
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def decode_private_key(self, encoded):
        """
        Based on spotnab, this is the gzipped version of the key
        with base64 applied to it.  We decode it and load it.

        Returns True when the decoded key was accepted by ``self.load``.
        Returns False when ``encoded`` is not valid base64, when the decoded
        bytes are not a readable gzip stream, when the decompressed key is
        empty, or when ``self.load`` reports failure.

        """

        fileobj = StringIO()
        try:
            fileobj.write(b64decode(encoded))
        except (TypeError, ValueError):
            # Python 2 reports malformed base64 as TypeError; Python 3 raises
            # binascii.Error, which is a ValueError subclass.
            return False

        # Plain ``0`` instead of the Python-2-only ``0L`` literal.
        fileobj.seek(0, SEEK_SET)
        private_key = None
        try:
            with GzipFile(fileobj=fileobj, mode="rb") as f:
                private_key = f.read()
        except (IOError, EOFError):
            # Not a gzip stream, or a truncated one.
            return False

        if not private_key:
            return False

        # We were successful
        if not self.load(private_key=private_key):
            return False

        return True
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def decode_public_key(self, encoded):
        """
        Based on spotnab, this is the gzipped version of the key
        with base64 applied to it.  We decode it and load it.

        On success the parsed key is stored in ``self.public_key`` and True
        is returned.  Returns False when ``encoded`` is not valid base64,
        when the decoded bytes are not a readable gzip stream, or when the
        PEM loader rejects the content.  ``self.public_key`` is reset to
        None once the base64 step has succeeded.

        """
        fileobj = StringIO()
        try:
            fileobj.write(b64decode(encoded))
        except (TypeError, ValueError):
            # Python 2 reports malformed base64 as TypeError; Python 3 raises
            # binascii.Error, which is a ValueError subclass.
            return False

        # Plain ``0`` instead of the Python-2-only ``0L`` literal.
        fileobj.seek(0, SEEK_SET)
        self.public_key = None
        try:
            with GzipFile(fileobj=fileobj, mode="rb") as f:
                pem_data = f.read()
        except (IOError, EOFError):
            # Not a gzip stream, or a truncated one.
            return False

        try:
            self.public_key = serialization.load_pem_public_key(
                pem_data,
                backend=default_backend()
            )
        except ValueError:
            # Could not decrypt content
            return False

        if not self.public_key:
            return False

        return True
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def decrypt(enc):
    """Decrypt a base64-encoded AES-CBC message.

    The decoded input is split into its first 16 bytes, used as the IV, and
    the remaining bytes, which are decrypted with ``KEY`` and passed
    through ``unpad``.
    """
    raw = base64.b64decode(enc)
    iv, body = raw[:16], raw[16:]
    return unpad(AES.new(KEY, AES.MODE_CBC, iv).decrypt(body))
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def decrypt(enc):
    """Decrypt a base64-encoded AES-CBC message.

    The decoded input is split into its first 16 bytes, used as the IV, and
    the remaining bytes, which are decrypted with ``KEY`` and passed
    through ``unpad``.
    """
    raw = base64.b64decode(enc)
    iv, body = raw[:16], raw[16:]
    return unpad(AES.new(KEY, AES.MODE_CBC, iv).decrypt(body))
项目:aws-consolidated-admin    作者:awslabs    | 项目源码 | 文件源码
def decrypt(ciphertext):
    """Base64-decode ``ciphertext``, pass it to ``kms.decrypt`` as the
    ``CiphertextBlob`` and return the ``'Plaintext'`` item of the result."""
    blob = base64.b64decode(ciphertext)
    response = kms.decrypt(CiphertextBlob=blob)
    return response['Plaintext']
项目:aws-consolidated-admin    作者:awslabs    | 项目源码 | 文件源码
def decrypt(ciphertext):
    """Base64-decode ``ciphertext``, pass it to ``kms.decrypt`` as the
    ``CiphertextBlob`` and return the ``'Plaintext'`` item of the result."""
    blob = base64.b64decode(ciphertext)
    response = kms.decrypt(CiphertextBlob=blob)
    return response['Plaintext']
项目:aws-consolidated-admin    作者:awslabs    | 项目源码 | 文件源码
def decrypt(ciphertext):
    """Base64-decode ``ciphertext``, pass it to ``kms.decrypt`` as the
    ``CiphertextBlob`` and return the ``'Plaintext'`` item of the result."""
    blob = base64.b64decode(ciphertext)
    response = kms.decrypt(CiphertextBlob=blob)
    return response['Plaintext']
项目:aws-consolidated-admin    作者:awslabs    | 项目源码 | 文件源码
def decrypt(ciphertext):
    """Base64-decode ``ciphertext``, pass it to ``kms.decrypt`` as the
    ``CiphertextBlob`` and return the ``'Plaintext'`` item of the result."""
    blob = base64.b64decode(ciphertext)
    response = kms.decrypt(CiphertextBlob=blob)
    return response['Plaintext']
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _b64_decode_bytes(b):
    """Encode the text ``b`` as ASCII and return its base64-decoded bytes."""
    ascii_payload = b.encode("ascii")
    return base64.b64decode(ascii_payload)
项目:sipxecs-voicemail-transcription    作者:andrewsauder    | 项目源码 | 文件源码
def fn_extract_wav(piece):
    """Save the base64 encoded wav data in this email piece and
        return the file path and name as a string

    Everything from the first blank line of ``piece`` onwards is treated as
    the base64 payload: each line is stripped, the lines are joined and the
    result is decoded.  The bytes are written to a randomly named ``.wav``
    file under ``GLV_TMP_PATH_TO_SAVE_VOICEMAIL``, which is created when
    missing.
    """
    glog('Extracting wav from piece')

    # parse this piece of the email text to merge the
    #  base64 encoded string into one line and then
    #  decode the string into binary data
    capture = False
    lines = []
    for line in piece.split('\n'):
        stripped = line.strip()
        if not stripped:
            # The first blank line switches capturing on for good.
            capture = True
        if capture:
            lines.append(stripped)

    b64_data = ''.join(lines)
    data = base64.b64decode(b64_data)

    rs_string = random_string(20)

    # Create the directory without a check-then-create race: attempt the
    # creation and tolerate only "it already exists as a directory".
    try:
        os.makedirs(GLV_TMP_PATH_TO_SAVE_VOICEMAIL)
    except OSError:
        if not os.path.isdir(GLV_TMP_PATH_TO_SAVE_VOICEMAIL):
            raise

    # NOTE(review): plain concatenation is kept as in the original, so the
    # configured path presumably ends with a path separator - confirm.
    rs_path = GLV_TMP_PATH_TO_SAVE_VOICEMAIL + rs_string + '.wav'
    glog("save wav file to " + rs_path)
    with open(rs_path, 'wb') as f:
        f.write(data)

    # return temporary file path and name
    return rs_path
项目:DCPanel    作者:vladgr    | 项目源码 | 文件源码
def decrypt(text):
    """Return the original string for a base64-encoded, AES-CFB-encrypted text.

    The key and IV are read from ``settings.CRYPT_KEY`` and
    ``settings.CRYPT_IV``.
    """
    ciphertext = base64.b64decode(text.encode())
    cipher = AES.new(settings.CRYPT_KEY, AES.MODE_CFB, settings.CRYPT_IV)
    plaintext = cipher.decrypt(ciphertext)
    return plaintext.decode()
项目:ios-xr-grpc-python    作者:cisco-grpc-connection-libs    | 项目源码 | 文件源码
def _ConvertScalarFieldValue(value, field, require_str=False):
  """Convert a single scalar field value.

  Dispatches on ``field.cpp_type``: integer, float and bool types go to
  their ``_Convert*`` helpers, bytes fields are base64-decoded, other
  strings are returned unchanged after a surrogate check, and enum values
  are resolved by name first and by number second.

  Args:
    value: A scalar value to convert the scalar field value.
    field: The descriptor of the field to convert.
    require_str: If True, the field value must be a str.
  Returns:
    The converted scalar field value (None when the field's cpp_type
    matches none of the handled kinds).
  Raises:
    ParseError: In case of convert problems.
  """
  cpp_type = field.cpp_type

  if cpp_type in _INT_TYPES:
    return _ConvertInteger(value)

  if cpp_type in _FLOAT_TYPES:
    return _ConvertFloat(value)

  if cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL:
    return _ConvertBool(value, require_str)

  if cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING:
    if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
      return base64.b64decode(value)
    # Checking for unpaired surrogates appears to be unreliable,
    # depending on the specific Python version, so we check manually.
    if _UNPAIRED_SURROGATE_PATTERN.search(value):
      raise ParseError('Unpaired surrogate')
    return value

  if cpp_type != descriptor.FieldDescriptor.CPPTYPE_ENUM:
    return None

  # Enum: look the value up by name, then fall back to its number.
  resolved = field.enum_type.values_by_name.get(value, None)
  if resolved is None:
    try:
      as_number = int(value)
      resolved = field.enum_type.values_by_number.get(as_number, None)
    except ValueError:
      raise ParseError('Invalid enum value {0} for enum type {1}.'.format(
          value, field.enum_type.full_name))
    if resolved is None:
      raise ParseError('Invalid enum value {0} for enum type {1}.'.format(
          value, field.enum_type.full_name))
  return resolved.number