Python base64 模块,decodebytes() 实例源码

我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用base64.decodebytes()

项目:Isa-Framework    作者:c0hb1rd    | 项目源码 | 文件源码
def load_local_session(self):

        # ????? Session ?????????????????
        if self.__storage_path__ is not None:

            # ??????????? Session ???????????????? Session ID
            session_path_list = os.listdir(self.__storage_path__)

            # ??????????
            for session_id in session_path_list:
                # ??????????
                path = os.path.join(self.__storage_path__, session_id)

                # ????????
                with open(path, 'rb') as f:
                    content = f.read()

                # ??????? base64 ??
                content = base64.decodebytes(content)

                # ? Session ID ???????????????????
                self.__session_map__[session_id] = json.loads(content.decode())

    # ??????????
项目:lampost_lib    作者:genzgd    | 项目源码 | 文件源码
def _open_server(self):
        self.server = SMTP(params['smtp_server'], params.get("smtp_port", 587))
        self.server.ehlo()
        self.server.starttls()
        if params.get('use_gmail_oauth2'):
            auth_email, access_token = _get_oauth_info()
            auth_string = b'user=' + bytes(params['sender_email_address'],
                                           'ascii') + b'\1auth=Bearer ' + access_token + b'\1\1'
            log.info(auth_string)
            code, msg = self.server.docmd('AUTH', 'XOAUTH2 ' + (base64.b64encode(auth_string)).decode('ascii'))
            log.info("Code {}  Message {}", code, base64.decodebytes(msg))
            if code == 235:
                return True
            code, msg = self.server.docmd('')
            log.info("code {}, Message {}", code, msg)
            return False
        try:
            self.server.login(params['sender_email_address'], params['sender_password'])
            return True
        except SMTPAuthenticationError:
            log.warn("SMTP Password Authentication Failed", ex_info=True)
            return False
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def construct_yaml_binary(self, node):
            # type: (Any) -> Any
            try:
                value = self.construct_scalar(node).encode('ascii')
            except UnicodeEncodeError as exc:
                raise ConstructorError(
                    None, None,
                    "failed to convert base64 data into ascii: %s" % exc,
                    node.start_mark)
            try:
                if hasattr(base64, 'decodebytes'):
                    return base64.decodebytes(value)
                else:
                    return base64.decodestring(value)
            except binascii.Error as exc:
                raise ConstructorError(
                    None, None,
                    "failed to decode base64 data: %s" % exc, node.start_mark)
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def construct_python_bytes(self, node):
            # type: (Any) -> Any
            try:
                value = self.construct_scalar(node).encode('ascii')
            except UnicodeEncodeError as exc:
                raise ConstructorError(
                    None, None,
                    "failed to convert base64 data into ascii: %s" % exc,
                    node.start_mark)
            try:
                if hasattr(base64, 'decodebytes'):
                    return base64.decodebytes(value)
                else:
                    return base64.decodestring(value)
            except binascii.Error as exc:
                raise ConstructorError(
                    None, None,
                    "failed to decode base64 data: %s" % exc, node.start_mark)
项目:odin    作者:imito    | 项目源码 | 文件源码
def load_lre_list():
  """ The header include following column:
  * name: LDC2017E22/data/ara-acm/ar-20031215-034005_0-a.sph
  * lre: {'train17', 'eval15', 'train15', 'dev17', 'eval17'}
  * language: {'ara-arb', 'eng-sas', 'fre-hat', 'zho-wuu',
               'eng-gbr', 'ara-ary', 'eng-usg', 'spa-lac',
               'ara-apc', 'qsl-pol', 'spa-eur', 'fre-waf',
               'zho-cdo', 'qsl-rus', 'spa-car', 'ara-arz',
               'zho-cmn', 'por-brz', 'zho-yue', 'zho-nan',
               'ara-acm'}
  * corpus: {'pcm', 'alaw', 'babel', 'ulaw', 'vast', 'mls14'}
  * duration: {'3', '30', '5', '15', '10', '20', '1000', '25'}

  Note
  ----
  Suggested namming scheme:
    `lre/lang/corpus/dur/base_name`
  """
  link = b'aHR0cHM6Ly9zMy5hbWF6b25hd3MuY29tL2FpLWRhdGFzZXRzL2xyZV9saXN0LnR4dA==\n'
  link = str(base64.decodebytes(link), 'utf-8')
  path = get_file(fname=os.path.basename(link),
                  origin=link,
                  outdir=get_datasetpath(root='~'))
  return np.genfromtxt(fname=path, dtype=str, delimiter=' ',
                       skip_header=1)
项目:px    作者:genotrance    | 项目源码 | 文件源码
def get_response_sspi(self, challenge=None):
        dprint("pywin32 SSPI")
        if challenge:
            try:
                challenge = base64.decodebytes(challenge.encode("utf-8"))
            except:
                challenge = base64.decodestring(challenge)
        output_buffer = None
        try:
            error_msg, output_buffer = self.sspi_client.authorize(challenge)
        except pywintypes.error:
            traceback.print_exc(file=sys.stdout)
            return None

        response_msg = output_buffer[0].Buffer
        try:
            response_msg = base64.encodebytes(response_msg.encode("utf-8"))
        except:
            response_msg = base64.encodestring(response_msg)
        response_msg = response_msg.decode("utf-8").replace('\012', '')
        return response_msg
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def _is_valid_ssh_rsa_public_key(openssh_pubkey):
    # http://stackoverflow.com/questions/2494450/ssh-rsa-public-key-validation-using-a-regular-expression
    # A "good enough" check is to see if the key starts with the correct header.
    import struct
    try:
        from base64 import decodebytes as base64_decode
    except ImportError:
        # deprecated and redirected to decodebytes in Python 3
        from base64 import decodestring as base64_decode
    parts = openssh_pubkey.split()
    if len(parts) < 2:
        return False
    key_type = parts[0]
    key_string = parts[1]

    data = base64_decode(key_string.encode())  # pylint:disable=deprecated-method
    int_len = 4
    str_len = struct.unpack('>I', data[:int_len])[0]  # this should return 7
    return data[int_len:int_len + str_len] == key_type.encode()
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def is_valid_ssh_rsa_public_key(openssh_pubkey):
    # http://stackoverflow.com/questions/2494450/ssh-rsa-public-key-validation-using-a-regular-expression # pylint: disable=line-too-long
    # A "good enough" check is to see if the key starts with the correct header.
    import struct
    try:
        from base64 import decodebytes as base64_decode
    except ImportError:
        # deprecated and redirected to decodebytes in Python 3
        from base64 import decodestring as base64_decode

    parts = openssh_pubkey.split()
    if len(parts) < 2:
        return False
    key_type = parts[0]
    key_string = parts[1]

    data = base64_decode(key_string.encode())  # pylint:disable=deprecated-method
    int_len = 4
    str_len = struct.unpack('>I', data[:int_len])[0]  # this should return 7
    return data[int_len:int_len + str_len] == key_type.encode()
项目:commissaire-http    作者:projectatomic    | 项目源码 | 文件源码
def decode_basic_auth(logger, http_auth):
    """
    Decodes basic auth from the header string.

    :param logger: logging instance
    :type logger: Logger
    :param http_auth: Basic authentication header
    :type http_auth: string
    :returns: tuple -- (username, passphrase) or (None, None) if empty.
    :rtype: tuple
    """
    if http_auth is not None:
        if http_auth.lower().startswith('basic '):
            try:
                decoded = tuple(base64.decodebytes(
                    http_auth[6:].encode('utf-8')).decode().split(':'))
                if logger:
                    logger.debug('Credentials given: %s', decoded)
                return decoded
            except base64.binascii.Error:
                if logger:
                    logger.info(
                        'Bad base64 data sent. Setting to no user/pass.')
    # Default meaning no user or password
    return (None, None)
项目:ipysankeywidget    作者:ricklupton    | 项目源码 | 文件源码
def save_png(self, filename):
        """Save the diagram to a PNG file.

        The widget must be displayed first before the PNG data is available. To
        display the widget and save an image at the same time, use
        `auto_save_png`.

        Parameters
        ----------
        filename : string
        """
        if self.png:
            data = base64.decodebytes(bytes(self.png, 'ascii'))
            with open(filename, 'wb') as f:
                f.write(data)
        else:
            warnings.warn('No png image available! Try auto_save_png() instead?')
项目:scripts    作者:vulnersCom    | 项目源码 | 文件源码
def get(self, key, default=None, version=None):
        """
            Fetch a given key from the cache. If the key does not exist, return
            default, which itself defaults to None.
        """
        key = self.make_key(key, version)
        self.validate_key(key)

        data = self._coll.find_one({'key': key})
        if not data:
            return default

        unencoded = base64.decodebytes(data['data'])
        unpickled = pickle.loads(unencoded)

        return unpickled
项目:scripts    作者:vulnersCom    | 项目源码 | 文件源码
def get_many(self, keys, version=None):
        """
            Fetch a bunch of keys from the cache. For certain backends (memcached,
            pgsql) this can be *much* faster when fetching multiple values.

            Returns a dict mapping each key in keys to its value. If the given
            key is missing, it will be missing from the response dict.
        """
        out = {}
        parsed_keys = {}

        for key in keys:
            pkey = self.make_key(key, version)
            self.validate_key(pkey)
            parsed_keys[pkey] = key

        data = self._coll.find({'key': {'$in': parsed_keys.keys()}})
        for result in data:
            unencoded = base64.decodebytes(result['data'])
            unpickled = pickle.loads(unencoded)
            out[parsed_keys[result['key']]] = unpickled

        return out
项目:PyWebDAV3    作者:andrewleech    | 项目源码 | 文件源码
def parse_request(self):
        if not six.moves.BaseHTTPServer.BaseHTTPRequestHandler.parse_request(self):
            return False

        if self.DO_AUTH:
            authorization = self.headers.get('Authorization', '')
            if not authorization:
                self.send_autherror(401, b"Authorization Required")
                return False
            scheme, credentials = authorization.split()
            if scheme != 'Basic':
                self.send_error(501)
                return False
            credentials = base64.decodebytes(credentials.encode()).decode()
            user, password = credentials.split(':')
            if not self.get_userinfo(user, password, self.command):
                self.send_autherror(401, b"Authorization Required")
                return False
        return True
项目:skp_edu_docker    作者:TensorMSA    | 项目源码 | 文件源码
def _internal_service_call(self, share_data) :
        try:
            # internal : IMAGE
            print("?????????? ??? ?? ?? ?? ?? ?????????? ")

            temp = {}
            request_type = share_data.get_request_type()
            decode_text = base64.decodebytes(str.encode(share_data.get_request_data()))
            temp['test'] = [InMemoryUploadedFile(io.BytesIO(decode_text), None, 'test.jpg', 'image/jpeg', len(decode_text), None)]
            ml = MultiValueDict(temp)
            # fp = open("/hoya_src_root/nn00004/1/test1.jpg", 'wb')
            # fp.write(decode_text)
            # fp.close()
            # CNN Prediction
            if(request_type == "image"):
                return_val = PredictNetCnn().run('nn00004', None, ml )
                name_tag = {"KYJ" : "???", "KSW" : "???", "LTY" : "???", "LSH" : "???", "PJH" : "???", "KSS" : "???", "PSC" : "???"}
                print("?????????? ??? ?? ?? ?? ?? : " + return_val['test.jpg']['key'][0])
                share_data.set_output_data(name_tag[return_val['test.jpg']['key'][0]] + "?? ??? ????")
            else :
                share_data.set_output_data("??? ?? ??? ????")

            return share_data
        except Exception as e:
            raise Exception(e)
项目:paperwork-backend    作者:openpaperwork    | 项目源码 | 文件源码
def load(self, config):
        try:
            value = config.get(self.section, self.token)
            value = value.strip()
            if value != "None":
                try:
                    value = base64.decodebytes(value.encode("utf-8")).decode(
                        'utf-8')
                except Exception as exc:
                    logger.warning(
                        "Failed to decode work dir path ({})".format(value),
                        exc_info=exc
                    )
                value = FS.safe(value)
            else:
                value = None
            self.value = value
            return
        except (configparser.NoOptionError, configparser.NoSectionError):
            pass
        self.value = self.default_value_func()
项目:maas    作者:maas    | 项目源码 | 文件源码
def _retrieve_content(self, compression, encoding, content):
        """Extract the content of the sent file."""
        # Select the appropriate decompressor.
        if compression is None:
            decompress = lambda s: s
        elif compression == 'bzip2':
            decompress = bz2.decompress
        else:
            raise ValueError('Invalid compression: %s' % compression)

        # Select the appropriate decoder.
        if encoding == 'base64':
            decode = base64.decodebytes
        else:
            raise ValueError('Invalid encoding: %s' % encoding)

        return decompress(decode(content.encode("ascii")))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def base64_decode(input, errors='strict'):
    assert errors == 'strict'
    return (base64.decodebytes(input), len(input))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def decode(self, input, final=False):
        assert self.errors == 'strict'
        return base64.decodebytes(input)
项目:QUANTAXIS    作者:yutiansut    | 项目源码 | 文件源码
def decrypt(self, source):
        decrypter = self._cipher.decryptor()
        source = urllib.parse.unquote(source)
        source = base64.decodebytes(source.encode("utf-8"))
        data_bytes = decrypter.update(source) + decrypter.finalize()
        return data_bytes.rstrip(b"\x00").decode(self._encoding)
项目:petronia    作者:groboclown    | 项目源码 | 文件源码
def construct_yaml_binary(self, node):
        try:
            value = self.construct_scalar(node).encode('ascii')
        except UnicodeEncodeError as exc:
            raise ConstructorError(None, None,
                    "failed to convert base64 data into ascii: %s" % exc,
                    node.start_mark)
        try:
            if hasattr(base64, 'decodebytes'):
                return base64.decodebytes(value)
            else:
                return base64.decodestring(value)
        except binascii.Error as exc:
            raise ConstructorError(None, None,
                    "failed to decode base64 data: %s" % exc, node.start_mark)
项目:petronia    作者:groboclown    | 项目源码 | 文件源码
def construct_python_bytes(self, node):
        try:
            value = self.construct_scalar(node).encode('ascii')
        except UnicodeEncodeError as exc:
            raise ConstructorError(None, None,
                    "failed to convert base64 data into ascii: %s" % exc,
                    node.start_mark)
        try:
            if hasattr(base64, 'decodebytes'):
                return base64.decodebytes(value)
            else:
                return base64.decodestring(value)
        except binascii.Error as exc:
            raise ConstructorError(None, None,
                    "failed to decode base64 data: %s" % exc, node.start_mark)
项目:jgscm    作者:src-d    | 项目源码 | 文件源码
def test_get_base64(self):
        bucket = self.bucket
        blob = bucket.blob("test.pickle")
        obj = {"one": 1, "two": [2, 3]}
        blob.upload_from_string(pickle.dumps(obj))
        model = self.contents_manager.get(
            self.path("test.pickle"), format="base64")
        self.assertEqual(model["type"], "file")
        self.assertEqual(model["mimetype"], "application/octet-stream")
        self.assertEqual(model["format"], "base64")
        content = model["content"]
        self.assertIsInstance(content, str)
        bd = base64.decodebytes(content.encode())
        self.assertEqual(obj, pickle.loads(bd))
项目:jgscm    作者:src-d    | 项目源码 | 文件源码
def _save_file(self, path, content, format):
        """Uploads content of a generic file to GCS.
        :param: path blob path.
        :param: content file contents string.
        :param: format the description of the input format, can be either
                "text" or "base64".
        :return: created :class:`google.cloud.storage.Blob`.
        """
        bucket_name, bucket_path = self._parse_path(path)
        bucket = self._get_bucket(bucket_name, throw=True)

        if format not in {"text", "base64"}:
            raise web.HTTPError(
                400,
                u"Must specify format of file contents as \"text\" or "
                u"\"base64\"",
            )
        try:
            if format == "text":
                bcontent = content.encode("utf8")
            else:
                b64_bytes = content.encode("ascii")
                bcontent = base64.decodebytes(b64_bytes)
        except Exception as e:
            raise web.HTTPError(
                400, u"Encoding error saving %s: %s" % (path, e)
            )
        blob = bucket.blob(bucket_path)
        blob.upload_from_string(bcontent)
        return blob
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def base64_decode(input, errors='strict'):
    assert errors == 'strict'
    return (base64.decodebytes(input), len(input))
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def decode(self, input, final=False):
        assert self.errors == 'strict'
        return base64.decodebytes(input)
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def testNonce(self):
        """ WebSocket key should be a random 16-byte nonce.
        """
        key = _create_sec_websocket_key()
        nonce = base64decode(key.encode("utf-8"))
        self.assertEqual(16, len(nonce))
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def testNonce(self):
        """ WebSocket key should be a random 16-byte nonce.
        """
        key = _create_sec_websocket_key()
        nonce = base64decode(key.encode("utf-8"))
        self.assertEqual(16, len(nonce))
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def load_from_base64(self, b64_msg):
        '''Deserializes a base64-encoded YAML message'''
        return self.load_from_yaml(base64.decodebytes(b64_msg))
项目:aliyun-log-python-sdk    作者:aliyun    | 项目源码 | 文件源码
def base64_decodestring(s):
    if six.PY2:
        return base64.decodestring(s)
    else:
        if isinstance(s, str):
            s = s.encode('utf8')
        return base64.decodebytes(s).decode('utf8')
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def open_data(self, url, data=None):
        """Use "data" URL."""
        if not isinstance(url, str):
            raise URLError('data error: proxy support for data protocol currently not implemented')
        # ignore POSTed data
        #
        # syntax of data URLs:
        # dataurl   := "data:" [ mediatype ] [ ";base64" ] "," data
        # mediatype := [ type "/" subtype ] *( ";" parameter )
        # data      := *urlchar
        # parameter := attribute "=" value
        try:
            [type, data] = url.split(',', 1)
        except ValueError:
            raise IOError('data error', 'bad data URL')
        if not type:
            type = 'text/plain;charset=US-ASCII'
        semi = type.rfind(';')
        if semi >= 0 and '=' not in type[semi:]:
            encoding = type[semi+1:]
            type = type[:semi]
        else:
            encoding = ''
        msg = []
        msg.append('Date: %s'%time.strftime('%a, %d %b %Y %H:%M:%S GMT',
                                            time.gmtime(time.time())))
        msg.append('Content-type: %s' % type)
        if encoding == 'base64':
            # XXX is this encoding/decoding ok?
            data = base64.decodebytes(data.encode('ascii')).decode('latin-1')
        else:
            data = unquote(data)
        msg.append('Content-Length: %d' % len(data))
        msg.append('')
        msg.append(data)
        msg = '\n'.join(msg)
        headers = email.message_from_string(msg)
        f = io.StringIO(msg)
        #f.fileno = None     # needed for addinfourl
        return addinfourl(f, headers, url)
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def decode(self, data):
        self.data = base64.decodebytes(data)
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def getparser(use_datetime=False, use_builtin_types=False):
    """getparser() -> parser, unmarshaller

    Create an instance of the fastest available parser, and attach it
    to an unmarshalling object.  Return both objects.
    """
    if FastParser and FastUnmarshaller:
        if use_builtin_types:
            mkdatetime = _datetime_type
            mkbytes = base64.decodebytes
        elif use_datetime:
            mkdatetime = _datetime_type
            mkbytes = _binary
        else:
            mkdatetime = _datetime
            mkbytes = _binary
        target = FastUnmarshaller(True, False, mkbytes, mkdatetime, Fault)
        parser = FastParser(target)
    else:
        target = Unmarshaller(use_datetime=use_datetime, use_builtin_types=use_builtin_types)
        if FastParser:
            parser = FastParser(target)
        else:
            parser = ExpatParser(target)
    return parser, target

##
# Convert a Python tuple or a Fault instance to an XML-RPC packet.
#
# @def dumps(params, **options)
# @param params A tuple or Fault instance.
# @keyparam methodname If given, create a methodCall request for
#     this method name.
# @keyparam methodresponse If given, create a methodResponse packet.
#     If used with a tuple, the tuple must be a singleton (that is,
#     it must contain exactly one element).
# @keyparam encoding The packet encoding.
# @return A string containing marshalled data.
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def from_jsonb64(msg: str):
    return base64.decodebytes(msg.encode())


# TODO: monkeypatch is ugly but I'm in a hurry...
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def set_payout_address(client, wallet):
    """ Set a new address from the HD wallet for payouts.

    Note that is set server-side on a per-account basis. Thus, in the
    case where a single user has different wallets on different
    machines, all mining proceeds on all machines are sent to this
    address.

    Args:
        client (TwentyOneRestClient): rest client used for communication with the backend api
        wallet (two1.wallet.Wallet): a user's wallet instance

    Returns:
        bytes: extra nonce 1 which is required for computing the coinbase transaction
        int: the size in bytes of the extra nonce 2
        int: reward amount given upon successful solution found
    """
    payout_address = wallet.current_address
    auth_resp = client.account_payout_address_post(payout_address)

    user_info = json.loads(auth_resp.text)
    enonce1_base64 = user_info["enonce1"]
    enonce1 = base64.decodebytes(enonce1_base64.encode())
    enonce2_size = user_info["enonce2_size"]
    reward = user_info["reward"]

    return enonce1, enonce2_size, reward
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def get_work(client):
    """ Get work from the pool using the rest client.

    Args:
        client (TwentyOneRestClient): rest client used for communication with the backend api

    Returns:
        WorkNotification: a Swirl work notification message
    """
    try:
        response = client.get_work()
    except exceptions.ServerRequestError as e:
        profile_url = "{}/{}".format(two1.TWO1_WWW_HOST, client.username)
        profile_cta = uxstring.UxString.mining_profile_call_to_action.format(profile_url)
        err_string = None
        if e.status_code == 403 and e.data.get("error") == "TO201":
            err_string = uxstring.UxString.Error.suspended_account
        elif e.status_code == 403 and e.data.get("error") == "TO501":
            err_string = uxstring.UxString.monthly_mining_limit_reached
        elif e.status_code == 403 and e.data.get("error") == "TO502":
            err_string = uxstring.UxString.lifetime_earn_limit_reached
        elif e.status_code == 403 and e.data.get("error") == "TO503":
            err_string = uxstring.UxString.no_earn_allocations.format(two1.TWO1_WWW_HOST,
                                                                      client.username)
        elif e.status_code == 404:
            if bitcoin_computer.has_mining_chip():
                err_string = uxstring.UxString.monthly_mining_limit_reached
            else:
                err_string = uxstring.UxString.earn_limit_reached

        if err_string:
            raise exceptions.MiningDisabledError("{}\n\n{}".format(err_string, profile_cta))
        else:
            raise e

    msg_factory = message_factory.SwirlMessageFactory()
    msg = base64.decodebytes(response.content)
    work = msg_factory.read_object(msg)
    return work
项目:morango    作者:learningequality    | 项目源码 | 文件源码
def verify(self, message, signature):

        # assume we're dealing with a base64 encoded signature
        signature = b64decode(signature.encode())

        message = self.ensure_bytes(message)

        # ensure we have a public key we can use use to verify
        if not self._public_key:
            raise Exception("Key object does not have public key defined, and thus cannot be used to verify.")

        return self._verify(message, signature)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def base64_decode(input, errors='strict'):
    assert errors == 'strict'
    return (base64.decodebytes(input), len(input))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def decode(self, input, final=False):
        assert self.errors == 'strict'
        return base64.decodebytes(input)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        if not os.path.isdir(LOCALEDIR):
            os.makedirs(LOCALEDIR)
        with open(MOFILE, 'wb') as fp:
            fp.write(base64.decodebytes(GNU_MO_DATA))
        with open(UMOFILE, 'wb') as fp:
            fp.write(base64.decodebytes(UMO_DATA))
        with open(MMOFILE, 'wb') as fp:
            fp.write(base64.decodebytes(MMO_DATA))
        self.env = support.EnvironmentVarGuard()
        self.env['LANGUAGE'] = 'xx'
        gettext._translations.clear()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_encoding(self):
        payload = self._au.get_payload()
        self.assertEqual(base64.decodebytes(bytes(payload, 'ascii')),
                self._audiodata)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def open_data(self, url, data=None):
        """Use "data" URL."""
        if not isinstance(url, str):
            raise URLError('data error', 'proxy support for data protocol currently not implemented')
        # ignore POSTed data
        #
        # syntax of data URLs:
        # dataurl   := "data:" [ mediatype ] [ ";base64" ] "," data
        # mediatype := [ type "/" subtype ] *( ";" parameter )
        # data      := *urlchar
        # parameter := attribute "=" value
        try:
            [type, data] = url.split(',', 1)
        except ValueError:
            raise IOError('data error', 'bad data URL')
        if not type:
            type = 'text/plain;charset=US-ASCII'
        semi = type.rfind(';')
        if semi >= 0 and '=' not in type[semi:]:
            encoding = type[semi+1:]
            type = type[:semi]
        else:
            encoding = ''
        msg = []
        msg.append('Date: %s'%time.strftime('%a, %d %b %Y %H:%M:%S GMT',
                                            time.gmtime(time.time())))
        msg.append('Content-type: %s' % type)
        if encoding == 'base64':
            # XXX is this encoding/decoding ok?
            data = base64.decodebytes(data.encode('ascii')).decode('latin1')
        else:
            data = unquote(data)
        msg.append('Content-Length: %d' % len(data))
        msg.append('')
        msg.append(data)
        msg = '\n'.join(msg)
        headers = email.message_from_string(msg)
        f = io.StringIO(msg)
        #f.fileno = None     # needed for addinfourl
        return addinfourl(f, headers, url)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def decode(self, data):
        self.data = base64.decodebytes(data)
项目:analytics-platform-ops    作者:ministryofjustice    | 项目源码 | 文件源码
def construct_yaml_binary(self, node):
        try:
            value = self.construct_scalar(node).encode('ascii')
        except UnicodeEncodeError as exc:
            raise ConstructorError(None, None,
                    "failed to convert base64 data into ascii: %s" % exc,
                    node.start_mark)
        try:
            if hasattr(base64, 'decodebytes'):
                return base64.decodebytes(value)
            else:
                return base64.decodestring(value)
        except binascii.Error as exc:
            raise ConstructorError(None, None,
                    "failed to decode base64 data: %s" % exc, node.start_mark)
项目:analytics-platform-ops    作者:ministryofjustice    | 项目源码 | 文件源码
def construct_python_bytes(self, node):
        try:
            value = self.construct_scalar(node).encode('ascii')
        except UnicodeEncodeError as exc:
            raise ConstructorError(None, None,
                    "failed to convert base64 data into ascii: %s" % exc,
                    node.start_mark)
        try:
            if hasattr(base64, 'decodebytes'):
                return base64.decodebytes(value)
            else:
                return base64.decodestring(value)
        except binascii.Error as exc:
            raise ConstructorError(None, None,
                    "failed to decode base64 data: %s" % exc, node.start_mark)
项目:news-for-good    作者:thecodinghub    | 项目源码 | 文件源码
def base64_decode(input, errors='strict'):
    assert errors == 'strict'
    return (base64.decodebytes(input), len(input))
项目:news-for-good    作者:thecodinghub    | 项目源码 | 文件源码
def decode(self, input, final=False):
        assert self.errors == 'strict'
        return base64.decodebytes(input)