Python hmac 模块,new() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用hmac.new()

项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def add_credentials_options(self):
    """Add credentials to the Options dictionary (if necessary).

    The API username is always stored under ``'api_username'``.  When
    ``self.configuration.secure_auth`` equals ``True`` the request is
    signed: a UTC timestamp is stored under ``'timestamp'`` and the hex
    HMAC-SHA1 of ``username + timestamp + uri`` (keyed with the configured
    password) under ``'signature'``.  Otherwise the password itself is
    stored under ``'api_key'``.
    """
    api_username = self.configuration.username
    api_key = self.configuration.password

    self.options['api_username'] = api_username

    # Equality (rather than truthiness) is kept on purpose so that exactly
    # the same configuration values enable signing as before.
    if self.configuration.secure_auth == True:  # noqa: E712
        timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')

        # A blank domain name collapses to a single separator.
        if self.domain_name.strip() == '':
            separator = '/'
        else:
            separator = '/' + self.domain_name + '/'
        uri = '/' + self.configuration.sub_url + separator + self.service_name

        self.options['timestamp'] = timestamp
        params = ''.join([api_username, timestamp, uri])

        # hmac.new() only accepts bytes-like key/message on Python 3, so
        # text values are encoded first; bytes values pass through as-is.
        if not isinstance(api_key, (bytes, bytearray)):
            api_key = api_key.encode('utf-8')
        if not isinstance(params, (bytes, bytearray)):
            params = params.encode('utf-8')
        self.options['signature'] = hmac.new(api_key, params, digestmod=hashlib.sha1).hexdigest()
    else:
        self.options['api_key'] = api_key
项目:repository-gardener    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def check_signature(header_signature, request_body):
    """Check a request body against its ``X-Hub-Signature`` header value.

    The header must have the form ``sha1=<hexdigest>``.  The expected
    digest is the HMAC-SHA1 of ``request_body`` keyed with
    ``webhook_secret()``.

    Returns:
        True when the digests match.

    Raises:
        ValueError: if the header is empty, is not of the form
            ``<algorithm>=<digest>``, names an algorithm other than
            ``sha1``, or carries a digest that does not match.
    """
    if not header_signature:
        raise ValueError('No X-Hub-Signature header.')

    scheme, claimed_digest = header_signature.split('=')
    if scheme != 'sha1':
        raise ValueError('Unsupported digest algorithm {}.'.format(scheme))

    mac = hmac.new(webhook_secret(), msg=request_body, digestmod=hashlib.sha1)
    computed_digest = mac.hexdigest()

    # Constant-time comparison of the two hex digests.
    if hmac.compare_digest(computed_digest, claimed_digest):
        return True
    raise ValueError('Body digest did not match signature digest')
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def request(self, method, request_uri, headers, content):
    """Add an ``HMACDigest`` authorization header to ``headers``.

    The digest covers the method, URI, a fresh client nonce, the server
    nonce from ``self.challenge`` and the concatenated values of the
    headers returned by ``_get_end2end_headers``.  ``headers`` is modified
    in place; ``content`` is not used.
    """
    signed_names = _get_end2end_headers(headers)
    keylist = "".join("%s " % name for name in signed_names)
    headers_val = "".join(headers[name] for name in signed_names)
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    cnonce = _cnonce()
    snonce = self.challenge['snonce']

    digest_input = "%s:%s:%s:%s:%s" % (method, request_uri, cnonce, snonce, headers_val)
    response = hmac.new(self.key, digest_input, self.hashmod).hexdigest().lower()

    template = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"'
    headers['authorization'] = template % (
        self.credentials[0],
        self.challenge['realm'],
        snonce,
        cnonce,
        request_uri,
        created,
        response,
        keylist,
    )
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def generate_token(key, user_id, action_id='', when=None):
    """Generates a URL-safe token for the given user, action, time tuple.

    The token is the URL-safe base64 encoding of
    ``digest + DELIMITER + when``, where ``digest`` is the HMAC-MD5 (keyed
    with ``key``) of ``user_id``, ``action_id`` and ``when`` separated by
    ``DELIMITER``.

    Args:
        key: secret key to use.
        user_id: the user ID of the authenticated user.
        action_id: a string identifier of the action they requested
                   authorization for.
        when: the time in seconds since the epoch at which the user was
              authorized for this action. If not set (or falsy, e.g. 0)
              the current time is used.

    Returns:
        A string XSRF protection token.
    """
    # Local import keeps this block self-contained.
    import hashlib

    # MD5 was hmac's implicit default digest; since Python 3.8 hmac.new()
    # requires digestmod, so it is named explicitly.  Keeping MD5 keeps
    # newly generated tokens identical to previously issued ones.
    digester = hmac.new(_to_bytes(key, encoding='utf-8'), digestmod=hashlib.md5)
    digester.update(_to_bytes(str(user_id), encoding='utf-8'))
    digester.update(DELIMITER)
    digester.update(_to_bytes(action_id, encoding='utf-8'))
    digester.update(DELIMITER)
    when = _to_bytes(str(when or int(time.time())), encoding='utf-8')
    digester.update(when)
    digest = digester.digest()

    # The timestamp is appended in the clear so a verifier can recompute
    # the digest from the token alone.
    token = base64.urlsafe_b64encode(digest + DELIMITER + when)
    return token
项目:dota2-messenger-platform    作者:nico-arianto    | 项目源码 | 文件源码
def verify_request_signature(func):
    """Decorator that checks the ``x-hub-signature`` header of the request.

    The header is expected to look like ``<digest name>=<hex digest>``.
    The expected digest is the HMAC of the raw request body keyed with
    ``APP_SECRET``.  When the header is present but malformed, names an
    unsupported digest, or does not match, an empty 403 response is
    returned and ``func`` is not called.

    NOTE(review): as in the original, a request *without* the header is
    only logged and still passed to ``func``; confirm that this is the
    intended policy for the decorated routes.
    """
    @wraps(func)
    def decorated(*args, **kwargs):
        signature = request.headers.get('x-hub-signature', None)
        if signature:
            # partition() never raises, unlike indexing the result of
            # split('=') when the header has no '=' at all.
            method, _, signature_hash = signature.partition('=')
            expected_hash = None
            if method and signature_hash:
                try:
                    expected_hash = hmac.new(APP_SECRET, msg=request.get_data(), digestmod=method).hexdigest()
                except ValueError:
                    # Unknown digest name supplied by the client.
                    expected_hash = None
            # Compare as bytes in constant time; bytes also tolerate
            # non-ASCII characters in the client-supplied value.
            if expected_hash is None or not hmac.compare_digest(
                    expected_hash.encode('utf-8'), signature_hash.encode('utf-8')):
                LOGGER.error('Signature was invalid')
                return make_response('', 403)
        else:
            LOGGER.error('Could not validate the signature')
        return func(*args, **kwargs)

    return decorated
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def derive_key(self):
    """Derive the signing key from ``self.secret_key`` and ``self.salt``.

    The strategy is selected by ``self.key_derivation``:

    * ``'concat'``: digest of ``salt + secret_key``
    * ``'django-concat'``: digest of ``salt + b'signer' + secret_key``
    * ``'hmac'``: HMAC of the salt keyed with ``secret_key``
    * ``'none'``: ``secret_key`` unchanged

    Override this method to use a different derivation.  Key derivation
    here is not intended to turn a short password into a strong key; use
    large random secret keys instead.

    Raises:
        TypeError: if ``self.key_derivation`` is not one of the above.
    """
    salt = want_bytes(self.salt)
    method = self.key_derivation

    if method == 'none':
        return self.secret_key

    if method == 'hmac':
        mac = hmac.new(self.secret_key, digestmod=self.digest_method)
        mac.update(salt)
        return mac.digest()

    # The remaining strategies hash a plain concatenation.
    if method == 'concat':
        material = salt + self.secret_key
    elif method == 'django-concat':
        material = salt + b'signer' + self.secret_key
    else:
        raise TypeError('Unknown key derivation method')
    return self.digest_method(material).digest()
项目:farfetchd    作者:isislovecruft    | 项目源码 | 文件源码
def getHMACFunc(key, hex=True):
    """Build a function that computes the HMAC of its input using **key**.

    :param key: The HMAC key, passed to :func:`hmac.new` together with
        the module-level ``DIGESTMOD``.
    :param bool hex: If True, the returned function produces a hex-encoded
        digest; otherwise it produces the raw digest.
    :rtype: callable
    :returns: A function which can be used to generate HMACs.
    """
    base_mac = hmac.new(key, digestmod=DIGESTMOD)

    def hmac_fn(value):
        # Work on a copy so the keyed base object is never updated and
        # can be reused for every call.
        mac = base_mac.copy()
        mac.update(value)
        return mac.hexdigest() if hex else mac.digest()

    return hmac_fn
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def fetch_request_token(self, oauth_request):
    """Handle a request_token request and return the request token.

    If ``_get_token`` finds a token on the request it is returned as is.
    If it raises ``OAuthError``, the request's version, consumer and
    (optional) callback are looked up, the signature is checked without a
    token, and a new request token is fetched from ``self.data_store``.
    """
    try:
        # Get the request token for authorization.
        request_token = self._get_token(oauth_request, 'request')
    except OAuthError:
        # No token required for the initial token request.
        # The version lookup result is unused; the call itself is kept.
        self._get_version(oauth_request)
        consumer = self._get_consumer(oauth_request)
        try:
            callback = self.get_callback(oauth_request)
        except OAuthError:
            # 1.0, no callback specified.
            callback = None
        self._check_signature(oauth_request, consumer, None)
        # Fetch a new token.
        request_token = self.data_store.fetch_request_token(consumer, callback)
    return request_token
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def client_udp_pre_encrypt(self, buf):
    """Encrypt and frame an outgoing client UDP payload.

    On first use the user id/key are taken from ``protocol_param`` when it
    has the form ``<id>:<secret>``; otherwise a random 4-byte id and the
    server key are used.

    The returned packet is laid out as::

        rc4(buf) + random padding + authdata (3 bytes)
                 + masked uid (4 bytes) + checksum (1 byte)

    where the uid is XOR-masked with the first 4 bytes of the HMAC of
    ``authdata`` under the server key, and the checksum is the first byte
    of the HMAC of everything before it under the user key.
    """
    if self.user_key is None:
        protocol_param = to_bytes(self.server_info.protocol_param)
        if b':' in protocol_param:
            try:
                # The separator must be bytes: on Python 3,
                # bytes.split(':') raises TypeError, which used to be
                # swallowed below and silently discarded the configured
                # user key.
                items = protocol_param.split(b':')
                user_key = self.hashfunc(items[1]).digest()
                user_id = struct.pack('<I', int(items[0]))
            except Exception:
                # Best effort: an unusable parameter falls back to the
                # random id / server key below.
                pass
            else:
                # Assign both together so a bad id cannot leave a custom
                # key paired with a missing id.
                self.user_key = user_key
                self.user_id = user_id
        if self.user_key is None:
            self.user_id = os.urandom(4)
            self.user_key = self.server_info.key
    authdata = os.urandom(3)
    mac_key = self.server_info.key
    md5data = hmac.new(mac_key, authdata, self.hashfunc).digest()
    uid = struct.unpack('<I', self.user_id)[0] ^ struct.unpack('<I', md5data[:4])[0]
    uid = struct.pack('<I', uid)
    rand_len = self.udp_rnd_data_len(md5data, self.random_client)
    encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(self.user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4')
    out_buf = encryptor.encrypt(buf)
    buf = out_buf + os.urandom(rand_len) + authdata + uid
    return buf + hmac.new(self.user_key, buf, self.hashfunc).digest()[:1]
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def server_udp_pre_encrypt(self, buf, uid):
    """Encrypt and frame an outgoing server UDP payload for ``uid``.

    The key is the user's key when ``uid`` is a known user; otherwise the
    server key (when no users are configured) or ``recv_iv``.

    The returned packet is ``rc4(buf) + random padding + authdata
    (7 bytes) + checksum (1 byte)``, where the checksum is the first byte
    of the HMAC of everything before it under the selected key.
    """
    users = self.server_info.users
    if uid in users:
        user_key = users[uid]
    elif not users:
        user_key = self.server_info.key
    else:
        user_key = self.server_info.recv_iv

    authdata = os.urandom(7)
    md5data = hmac.new(self.server_info.key, authdata, self.hashfunc).digest()
    rand_len = self.udp_rnd_data_len(md5data, self.random_server)

    # The RC4 key is the base64 of the user key followed by the base64 of
    # the per-packet digest.
    rc4_key = to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data))
    encryptor = encrypt.Encryptor(rc4_key, 'rc4')

    packet = encryptor.encrypt(buf) + os.urandom(rand_len) + authdata
    checksum = hmac.new(user_key, packet, self.hashfunc).digest()[:1]
    return packet + checksum
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def server_udp_post_decrypt(self, buf):
    """Verify and decrypt an incoming client UDP packet.

    The trailer of ``buf`` is read as authdata (``buf[-8:-5]``), a masked
    4-byte uid (``buf[-5:-1]``) and a 1-byte checksum (``buf[-1:]``).

    Returns:
        ``(payload, uid)`` on success, where ``uid`` is None when the uid
        is not a known user; ``(b'', None)`` when the checksum does not
        match.
    """
    md5data = hmac.new(self.server_info.key, buf[-8:-5], self.hashfunc).digest()

    # Unmask the uid with the first four bytes of the authdata digest.
    masked_uid, = struct.unpack('<I', buf[-5:-1])
    mask, = struct.unpack('<I', md5data[:4])
    uid = struct.pack('<I', masked_uid ^ mask)

    users = self.server_info.users
    if uid in users:
        user_key = users[uid]
    else:
        uid = None
        user_key = self.server_info.recv_iv if users else self.server_info.key

    checksum = hmac.new(user_key, buf[:-1], self.hashfunc).digest()[:1]
    if checksum != buf[-1:]:
        return (b'', None)

    rand_len = self.udp_rnd_data_len(md5data, self.random_client)
    rc4_key = to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data))
    encryptor = encrypt.Encryptor(rc4_key, 'rc4')
    # Strip the 8-byte trailer and the random padding before decrypting.
    payload = encryptor.decrypt(buf[:-8 - rand_len])
    return (payload, uid)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def derive_key(self):
    """Derive the signing key from ``self.secret_key`` and ``self.salt``.

    The strategy is selected by ``self.key_derivation``:

    * ``'concat'``: digest of ``salt + secret_key``
    * ``'django-concat'``: digest of ``salt + b'signer' + secret_key``
    * ``'hmac'``: HMAC of the salt keyed with ``secret_key``
    * ``'none'``: ``secret_key`` unchanged

    Override this method to use a different derivation.  Key derivation
    here is not intended to turn a short password into a strong key; use
    large random secret keys instead.

    Raises:
        TypeError: if ``self.key_derivation`` is not one of the above.
    """
    salt = want_bytes(self.salt)
    method = self.key_derivation

    if method == 'none':
        return self.secret_key

    if method == 'hmac':
        mac = hmac.new(self.secret_key, digestmod=self.digest_method)
        mac.update(salt)
        return mac.digest()

    # The remaining strategies hash a plain concatenation.
    if method == 'concat':
        material = salt + self.secret_key
    elif method == 'django-concat':
        material = salt + b'signer' + self.secret_key
    else:
        raise TypeError('Unknown key derivation method')
    return self.digest_method(material).digest()
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def request(self, method, request_uri, headers, content):
    """Add an ``HMACDigest`` authorization header to ``headers``.

    The digest covers the method, URI, a fresh client nonce, the server
    nonce from ``self.challenge`` and the concatenated values of the
    headers returned by ``_get_end2end_headers``.  ``headers`` is modified
    in place; ``content`` is not used.
    """
    signed_names = _get_end2end_headers(headers)
    keylist = "".join("%s " % name for name in signed_names)
    headers_val = "".join(headers[name] for name in signed_names)
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    cnonce = _cnonce()
    snonce = self.challenge['snonce']

    digest_input = "%s:%s:%s:%s:%s" % (method, request_uri, cnonce, snonce, headers_val)
    response = hmac.new(self.key, digest_input, self.hashmod).hexdigest().lower()

    template = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"'
    headers['authorization'] = template % (
        self.credentials[0],
        self.challenge['realm'],
        snonce,
        cnonce,
        request_uri,
        created,
        response,
        keylist,
    )
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_get_user_future(self, access_token, callback):
        """Subclasses must override this to get basic information about the
        user.

        Should return a `.Future` whose result is a dictionary
        containing information about the user, which may have been
        retrieved by using ``access_token`` to make a request to the
        service.

        The access token will be added to the returned dictionary to make
        the result of `get_authenticated_user`.

        For backwards compatibility, the callback-based ``_oauth_get_user``
        method is also supported.
        """
        # By default, call the old-style _oauth_get_user, but new code
        # should override this method instead.
        self._oauth_get_user(access_token, callback)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_signature(consumer_token, method, url, parameters={}, token=None):
    """Compute the HMAC-SHA1 OAuth signature for a request.

    The signature base string is the escaped upper-cased method, the
    normalized URL (lower-cased scheme and host plus path) and the sorted,
    escaped parameters, joined with ``&``.  The key is the consumer secret
    and the token secret (empty when no token is given) joined with ``&``.

    Returns the base64-encoded digest without the trailing newline.

    See http://oauth.net/core/1.0/#signing_process
    """
    scheme, netloc, path = urlparse.urlparse(url)[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    base_elems = [
        method.upper(),
        normalized_url,
        "&".join("%s=%s" % (name, _oauth_escape(str(value)))
                 for name, value in sorted(parameters.items())),
    ]
    base_string = "&".join(_oauth_escape(elem) for elem in base_elems)

    token_secret = token["secret"] if token else ""
    key = b"&".join([escape.utf8(consumer_token["secret"]),
                     escape.utf8(token_secret)])

    mac = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    # b2a_base64 appends a newline; drop it.
    return binascii.b2a_base64(mac.digest())[:-1]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth10a_signature(consumer_token, method, url, parameters={}, token=None):
    """Compute the HMAC-SHA1 OAuth 1.0a signature for a request.

    The signature base string is built from the upper-cased method, the
    normalized URL and the sorted, escaped parameters.  The key is the
    URL-quoted consumer secret and URL-quoted token secret (empty when no
    token is given) joined with ``&``.

    Returns the base64-encoded digest without the trailing newline.

    See http://oauth.net/core/1.0a/#signing_process
    """
    scheme, netloc, path = urlparse.urlparse(url)[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    base_elems = [
        method.upper(),
        normalized_url,
        "&".join("%s=%s" % (name, _oauth_escape(str(value)))
                 for name, value in sorted(parameters.items())),
    ]
    base_string = "&".join(_oauth_escape(elem) for elem in base_elems)

    consumer_part = escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~'))
    token_part = escape.utf8(urllib_parse.quote(token["secret"], safe='~') if token else "")
    key = b"&".join([consumer_part, token_part])

    mac = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    # b2a_base64 appends a newline; drop it.
    return binascii.b2a_base64(mac.digest())[:-1]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def create_template_loader(self, template_path):
    """Return a template loader for ``template_path``.

    Subclasses may override this.  If the application settings contain a
    ``template_loader`` entry, that object is returned unchanged.
    Otherwise a directory-based ``template.Loader`` is created, forwarding
    the ``autoescape`` and ``template_whitespace`` settings when present.
    """
    settings = self.application.settings
    if "template_loader" in settings:
        return settings["template_loader"]

    # (setting name, Loader keyword) pairs.  A setting is forwarded only
    # when the user supplied it, because autoescape=None means "no
    # escaping" and must not be passed by default.
    forwarded = (
        ("autoescape", "autoescape"),
        ("template_whitespace", "whitespace"),
    )
    kwargs = {keyword: settings[name]
              for name, keyword in forwarded if name in settings}
    return template.Loader(template_path, **kwargs)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _get_raw_xsrf_token(self):
        """Read or generate the xsrf token in its raw form.

        The raw_xsrf_token is a tuple containing:

        * version: the version of the cookie from which this token was read,
          or None if we generated a new token in this request.
        * token: the raw token data; random (non-ascii) bytes.
        * timestamp: the time this token was generated (will not be accurate
          for version 1 cookies)
        """
        if not hasattr(self, '_raw_xsrf_token'):
            cookie = self.get_cookie("_xsrf")
            if cookie:
                version, token, timestamp = self._decode_xsrf_token(cookie)
            else:
                version, token, timestamp = None, None, None
            if token is None:
                version = None
                token = os.urandom(16)
                timestamp = time.time()
            self._raw_xsrf_token = (version, token, timestamp)
        return self._raw_xsrf_token
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_get_user_future(self, access_token, callback):
        """Subclasses must override this to get basic information about the
        user.

        Should return a `.Future` whose result is a dictionary
        containing information about the user, which may have been
        retrieved by using ``access_token`` to make a request to the
        service.

        The access token will be added to the returned dictionary to make
        the result of `get_authenticated_user`.

        For backwards compatibility, the callback-based ``_oauth_get_user``
        method is also supported.
        """
        # By default, call the old-style _oauth_get_user, but new code
        # should override this method instead.
        self._oauth_get_user(access_token, callback)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_signature(consumer_token, method, url, parameters={}, token=None):
    """Compute the HMAC-SHA1 OAuth signature for a request.

    The signature base string is the escaped upper-cased method, the
    normalized URL (lower-cased scheme and host plus path) and the sorted,
    escaped parameters, joined with ``&``.  The key is the consumer secret
    and the token secret (empty when no token is given) joined with ``&``.

    Returns the base64-encoded digest without the trailing newline.

    See http://oauth.net/core/1.0/#signing_process
    """
    scheme, netloc, path = urlparse.urlparse(url)[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    base_elems = [
        method.upper(),
        normalized_url,
        "&".join("%s=%s" % (name, _oauth_escape(str(value)))
                 for name, value in sorted(parameters.items())),
    ]
    base_string = "&".join(_oauth_escape(elem) for elem in base_elems)

    token_secret = token["secret"] if token else ""
    key = b"&".join([escape.utf8(consumer_token["secret"]),
                     escape.utf8(token_secret)])

    mac = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    # b2a_base64 appends a newline; drop it.
    return binascii.b2a_base64(mac.digest())[:-1]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def create_template_loader(self, template_path):
    """Return a template loader for ``template_path``.

    Subclasses may override this.  If the application settings contain a
    ``template_loader`` entry, that object is returned unchanged.
    Otherwise a directory-based ``template.Loader`` is created, forwarding
    the ``autoescape`` and ``template_whitespace`` settings when present.
    """
    settings = self.application.settings
    if "template_loader" in settings:
        return settings["template_loader"]

    # (setting name, Loader keyword) pairs.  A setting is forwarded only
    # when the user supplied it, because autoescape=None means "no
    # escaping" and must not be passed by default.
    forwarded = (
        ("autoescape", "autoescape"),
        ("template_whitespace", "whitespace"),
    )
    kwargs = {keyword: settings[name]
              for name, keyword in forwarded if name in settings}
    return template.Loader(template_path, **kwargs)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _get_raw_xsrf_token(self):
        """Read or generate the xsrf token in its raw form.

        The raw_xsrf_token is a tuple containing:

        * version: the version of the cookie from which this token was read,
          or None if we generated a new token in this request.
        * token: the raw token data; random (non-ascii) bytes.
        * timestamp: the time this token was generated (will not be accurate
          for version 1 cookies)
        """
        if not hasattr(self, '_raw_xsrf_token'):
            cookie = self.get_cookie("_xsrf")
            if cookie:
                version, token, timestamp = self._decode_xsrf_token(cookie)
            else:
                version, token, timestamp = None, None, None
            if token is None:
                version = None
                token = os.urandom(16)
                timestamp = time.time()
            self._raw_xsrf_token = (version, token, timestamp)
        return self._raw_xsrf_token
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_get_user_future(self, access_token, callback):
        """Subclasses must override this to get basic information about the
        user.

        Should return a `.Future` whose result is a dictionary
        containing information about the user, which may have been
        retrieved by using ``access_token`` to make a request to the
        service.

        The access token will be added to the returned dictionary to make
        the result of `get_authenticated_user`.

        For backwards compatibility, the callback-based ``_oauth_get_user``
        method is also supported.
        """
        # By default, call the old-style _oauth_get_user, but new code
        # should override this method instead.
        self._oauth_get_user(access_token, callback)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_signature(consumer_token, method, url, parameters={}, token=None):
    """Compute the HMAC-SHA1 OAuth signature for a request.

    The signature base string is the escaped upper-cased method, the
    normalized URL (lower-cased scheme and host plus path) and the sorted,
    escaped parameters, joined with ``&``.  The key is the consumer secret
    and the token secret (empty when no token is given) joined with ``&``.

    Returns the base64-encoded digest without the trailing newline.

    See http://oauth.net/core/1.0/#signing_process
    """
    scheme, netloc, path = urlparse.urlparse(url)[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    base_elems = [
        method.upper(),
        normalized_url,
        "&".join("%s=%s" % (name, _oauth_escape(str(value)))
                 for name, value in sorted(parameters.items())),
    ]
    base_string = "&".join(_oauth_escape(elem) for elem in base_elems)

    token_secret = token["secret"] if token else ""
    key = b"&".join([escape.utf8(consumer_token["secret"]),
                     escape.utf8(token_secret)])

    mac = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    # b2a_base64 appends a newline; drop it.
    return binascii.b2a_base64(mac.digest())[:-1]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth10a_signature(consumer_token, method, url, parameters={}, token=None):
    """Compute the HMAC-SHA1 OAuth 1.0a signature for a request.

    The signature base string is built from the upper-cased method, the
    normalized URL and the sorted, escaped parameters.  The key is the
    URL-quoted consumer secret and URL-quoted token secret (empty when no
    token is given) joined with ``&``.

    Returns the base64-encoded digest without the trailing newline.

    See http://oauth.net/core/1.0a/#signing_process
    """
    scheme, netloc, path = urlparse.urlparse(url)[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    base_elems = [
        method.upper(),
        normalized_url,
        "&".join("%s=%s" % (name, _oauth_escape(str(value)))
                 for name, value in sorted(parameters.items())),
    ]
    base_string = "&".join(_oauth_escape(elem) for elem in base_elems)

    consumer_part = escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~'))
    token_part = escape.utf8(urllib_parse.quote(token["secret"], safe='~') if token else "")
    key = b"&".join([consumer_part, token_part])

    mac = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    # b2a_base64 appends a newline; drop it.
    return binascii.b2a_base64(mac.digest())[:-1]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def create_template_loader(self, template_path):
    """Return a template loader for ``template_path``.

    Subclasses may override this.  If the application settings contain a
    ``template_loader`` entry, that object is returned unchanged.
    Otherwise a directory-based ``template.Loader`` is created, forwarding
    the ``autoescape`` and ``template_whitespace`` settings when present.
    """
    settings = self.application.settings
    if "template_loader" in settings:
        return settings["template_loader"]

    # (setting name, Loader keyword) pairs.  A setting is forwarded only
    # when the user supplied it, because autoescape=None means "no
    # escaping" and must not be passed by default.
    forwarded = (
        ("autoescape", "autoescape"),
        ("template_whitespace", "whitespace"),
    )
    kwargs = {keyword: settings[name]
              for name, keyword in forwarded if name in settings}
    return template.Loader(template_path, **kwargs)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _get_raw_xsrf_token(self):
        """Read or generate the xsrf token in its raw form.

        The raw_xsrf_token is a tuple containing:

        * version: the version of the cookie from which this token was read,
          or None if we generated a new token in this request.
        * token: the raw token data; random (non-ascii) bytes.
        * timestamp: the time this token was generated (will not be accurate
          for version 1 cookies)
        """
        if not hasattr(self, '_raw_xsrf_token'):
            cookie = self.get_cookie("_xsrf")
            if cookie:
                version, token, timestamp = self._decode_xsrf_token(cookie)
            else:
                version, token, timestamp = None, None, None
            if token is None:
                version = None
                token = os.urandom(16)
                timestamp = time.time()
            self._raw_xsrf_token = (version, token, timestamp)
        return self._raw_xsrf_token
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def salted_hmac(key_salt, value, secret=None):
    """
    Build the HMAC-SHA1 of 'value' under a key derived from key_salt and a
    secret (which defaults to settings.SECRET_KEY).

    The HMAC object itself is returned; callers pick the digest form.

    A different key_salt should be passed in for every application of HMAC.
    """
    if secret is None:
        secret = settings.SECRET_KEY

    salt_bytes = force_bytes(key_salt)
    secret_bytes = force_bytes(secret)

    # Derive the working key by hashing salt + secret with SHA1.  hmac
    # would do the same hashing by itself only for keys longer than the
    # hash block size; hashing here ensures it *always* happens.
    derived_key = hashlib.sha1(salt_bytes + secret_bytes).digest()

    return hmac.new(derived_key, msg=force_bytes(value), digestmod=hashlib.sha1)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def derive_key(self):
    """Derive the signing key from ``self.secret_key`` and ``self.salt``.

    The strategy is selected by ``self.key_derivation``:

    * ``'concat'``: digest of ``salt + secret_key``
    * ``'django-concat'``: digest of ``salt + b'signer' + secret_key``
    * ``'hmac'``: HMAC of the salt keyed with ``secret_key``
    * ``'none'``: ``secret_key`` unchanged

    Override this method to use a different derivation.  Key derivation
    here is not intended to turn a short password into a strong key; use
    large random secret keys instead.

    Raises:
        TypeError: if ``self.key_derivation`` is not one of the above.
    """
    salt = want_bytes(self.salt)
    method = self.key_derivation

    if method == 'none':
        return self.secret_key

    if method == 'hmac':
        mac = hmac.new(self.secret_key, digestmod=self.digest_method)
        mac.update(salt)
        return mac.digest()

    # The remaining strategies hash a plain concatenation.
    if method == 'concat':
        material = salt + self.secret_key
    elif method == 'django-concat':
        material = salt + b'signer' + self.secret_key
    else:
        raise TypeError('Unknown key derivation method')
    return self.digest_method(material).digest()
项目:aws-security-automation    作者:awslabs    | 项目源码 | 文件源码
def store_mfa(user, seed, region, account):
    """Store an MFA seed for ``user`` as an SSM SecureString parameter.

    The parameter is named ``mfa-<user>`` and encrypted with the KMS key
    behind the ``alias/MFAUser`` alias.  After a successful write,
    ``mfa_store_policy(user, region, account)`` is called.

    Returns:
        The ``put_parameter`` response on success, or the string
        ``"Fail"`` if storing the seed or applying the policy raised.
    """
    ssm = boto3.client('ssm')
    kms = boto3.client('kms')

    # Resolve the alias to the key ARN used to encrypt the parameter.
    key_arn = kms.describe_key(KeyId='alias/MFAUser')['KeyMetadata']['Arn']

    try:
        result = ssm.put_parameter(
            Name='mfa-' + user,
            Description='MFA token seed',
            Value=seed,
            Type='SecureString',
            KeyId=key_arn,
            Overwrite=True,
        )
        mfa_store_policy(user, region, account)
        print("Token stored in Parameter Store")
    except Exception:
        print("Failed to store seed. You will need to retrieve it from the used log DDB or create a new token manually.")
        result = "Fail"
    return result
项目:aws-security-automation    作者:awslabs    | 项目源码 | 文件源码
def deleteUser(userName, SN):
    """Best-effort removal of a user and its virtual MFA device.

    Three independent steps are attempted in order: deactivate the MFA
    device, delete the virtual MFA device, delete the user.  A failure in
    one step is reported on stdout and does not stop the following steps.

    Args:
        userName: name of the user to delete.
        SN: serial number of the user's MFA device.

    Returns:
        Always True.
    """
    # ``except Exception`` (rather than a bare ``except``) keeps the
    # best-effort behaviour while letting KeyboardInterrupt and SystemExit
    # propagate.
    try:
        client.deactivate_mfa_device(
            UserName=userName,
            SerialNumber=SN
        )
        print("MFA device deactivated, trying to delete device.")
    except Exception:
        print("Unable to deactivate MFA token. Could be that it's not created.")
    try:
        client.delete_virtual_mfa_device(
            SerialNumber=SN
        )
        print("MFA device deleted, trying to delete new user.")
    except Exception:
        print("Unable to delete MFA token. Could be that it's not created.")
    try:
        client.delete_user(
            UserName=userName
        )
        print("User deleted")
    except Exception:
        print("Unable to delete user: " + userName)
    return True
项目:aws-security-automation    作者:awslabs    | 项目源码 | 文件源码
def generate_token(seed, timestamp=None):
    """Compute a 6-digit time-based one-time password (HMAC-SHA1, 30 s step).

    Args:
        seed: base32-encoded shared secret (case-insensitive).
        timestamp: seconds since the epoch to compute the token for.
            Defaults to the current time.

    Returns:
        int: the token value in the range 0..999999.  Leading zeros are
        not preserved; zero-pad to six digits when displaying.
    """
    if timestamp is None:
        timestamp = time.time()

    key = base64.b32decode(seed, True)
    counter = struct.pack(">Q", int(timestamp // 30))

    # bytearray indexing yields ints on both Python 2 and 3, unlike
    # ord(digest[19]), which raises TypeError on Python 3.
    digest = bytearray(hmac.new(key, counter, hashlib.sha1).digest())

    # Dynamic truncation: the low nibble of the last byte selects a
    # 4-byte window whose top bit is masked off.
    offset = digest[19] & 0xf
    window = bytes(digest[offset:offset + 4])
    code = struct.unpack(">I", window)[0] & 0x7fffffff
    return code % 10 ** 6
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def build_hmac_sha1(pkt, pw = '\0' * 20, ip4l = [], ip6l = []):
    """Compute the HMAC-SHA1 for the CARP layer of ``pkt``.

    The digest covers, in order: the byte 0x21, the CARP ``vhid`` as one
    unsigned byte, and the packed IPv4 addresses from ``ip4l`` sorted in
    ascending order.  ``ip6l`` is currently not included.

    Args:
        pkt: packet to inspect.
        pw: HMAC key; text keys are converted byte-for-byte (latin-1).
        ip4l: IPv4 addresses in a form accepted by ``inet_aton``.
        ip6l: unused.

    Returns:
        The raw digest, or None if ``pkt`` has no CARP layer.
    """
    if not pkt.haslayer(CARP):
        return None

    p = pkt[CARP]

    # hmac requires a bytes key on Python 3; latin-1 maps each character
    # of a text key to the byte with the same value, so the default key is
    # 20 zero bytes on every Python version.
    if not isinstance(pw, (bytes, bytearray)):
        pw = pw.encode('latin-1')
    h = hmac.new(pw, digestmod = hashlib.sha1)
    # XXX: this is a dirty hack. it needs to pack version and type into a single 8bit field
    h.update(b'\x21')
    # XXX: mac addy if different from special link layer. comes before vhid
    h.update(struct.pack('!B', p.vhid))

    # sort ips from smallest to largest (by packed form)
    for packed in sorted(inet_aton(i) for i in ip4l):
        h.update(packed)

    # XXX: do ip6l sorting

    return h.digest()
项目:Multi_User_Blog    作者:Nshmais    | 项目源码 | 文件源码
def post(self):
    """
        Create a new post and redirect to its page.

        Anonymous visitors are redirected to ``/blog``.  When the subject
        or the content is missing, the form is rendered again with an
        error message instead.
    """
    if not self.user:
        return self.redirect('/blog')

    subject = self.request.get('subject')
    content = self.request.get('content')

    if not (subject and content):
        self.render("newpost.html", subject=subject,
                    content=content, error="subject and content, please!")
        return

    new_post = Post(parent=blog_key(), user_id=self.user.key().id(),
                    subject=subject, content=content)
    new_post.put()
    self.redirect('/blog/%s' % str(new_post.key().id()))
项目:instagram_private_api    作者:ping    | 项目源码 | 文件源码
def gen_user_breadcrumb(size):
    """
    Used in comments posting.

    Builds the text ``"<size> <elapsed> <count> <dt>"`` and returns two
    newline-terminated lines: the base64 HMAC-SHA256 of that text, then
    the base64 of the text itself.

    :param size: length of the comment text (expected to be an int)
    :return: the breadcrumb string
    """
    key = 'iN4$aGr0m'
    dt = int(time.time() * 1000)

    # typing time elapsed
    time_elapsed = randint(500, 1500) + size * randint(500, 1500)

    # Floor division keeps the count an integer on Python 3 as well;
    # true division would render it as e.g. "3.3333333333333335".
    text_change_event_count = max(1, size // randint(3, 5))

    data = '{size!s} {elapsed!s} {count!s} {dt!s}'.format(**{
        'size': size, 'elapsed': time_elapsed, 'count': text_change_event_count, 'dt': dt
    })
    data_bytes = data.encode('ascii')
    signature = hmac.new(key.encode('ascii'), data_bytes, digestmod=hashlib.sha256).digest()

    # b64encode returns bytes; decode so the result contains the base64
    # text itself rather than a "b'...'" repr on Python 3.
    return '{0!s}\n{1!s}\n'.format(
        base64.b64encode(signature).decode('ascii'),
        base64.b64encode(data_bytes).decode('ascii'))
项目:Bella    作者:Trietptm-on-Security    | 项目源码 | 文件源码
def kciCloudHelper(iCloudKey):
    """Decrypt the iCloud account token plist and return a text summary.

    The base64-encoded `iCloudKey` is decoded and run through HMAC-MD5 with a
    fixed key string; the hex form of that digest is passed to `openssl` as
    the AES-128-CBC key. The decrypted bytes are parsed as a property list
    and four fields from it are formatted into the returned string.
    """
    #this function is tailored to keychaindump. Takes an iCloud key, and returns tokens
    msg = base64.b64decode(iCloudKey)
    key = "t9s\"lx^awe.580Gj%'ld+0LG<#9xa?>vb)-fkwb92[}"
    hashed = hmac.new(key, msg, digestmod=hashlib.md5).digest()
    hexedKey = binascii.hexlify(hashed)
    # IV is sixteen ASCII '0' characters, handed to openssl's -iv option.
    IV = 16 * '0'
    mme_token_file = glob("/Users/%s/Library/Application Support/iCloud/Accounts/*" % get_bella_user()) #this doesnt need to be globber bc only current user's info can be decrypted
    # Keep the last match whose final path component parses as an integer.
    # The loop iterates the original glob list even though the name is
    # rebound inside it; if nothing matches, mme_token_file stays a list.
    for x in mme_token_file:
        try:
            int(x.split("/")[-1])
            mme_token_file = x
        except ValueError:
            continue
    send_msg("\t%sDecrypting token plist\n\t    [%s]\n" % (blue_star, mme_token_file), False)
    # NOTE(review): the path is interpolated into a shell command string
    # (shell=True); a path containing a single quote would break the quoting.
    decryptedBinary = subprocess.check_output("openssl enc -d -aes-128-cbc -iv '%s' -K %s < '%s'" % (IV, hexedKey, mme_token_file), shell=True)
    from Foundation import NSData, NSPropertyListSerialization
    binToPlist = NSData.dataWithBytes_length_(decryptedBinary, len(decryptedBinary))
    # Element [0] of the returned tuple is used as the parsed plist.
    token_plist = NSPropertyListSerialization.propertyListWithData_options_format_error_(binToPlist, 0, None, None)[0]
    tokz = "[%s | %s]\n" % (token_plist["appleAccountInfo"]["primaryEmail"], token_plist["appleAccountInfo"]["fullName"])
    tokz += "%s:%s\n" % (token_plist["appleAccountInfo"]["dsPrsID"], token_plist["tokens"]["mmeAuthToken"])
    return tokz
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def request(self, method, request_uri, headers, content):
        """Sign the request by adding an HMACDigest 'authorization' header.

        The digest covers the method, URI, client nonce, server nonce and
        the concatenated values of the end-to-end headers. `headers` is
        modified in place; `content` is not used.
        """
        signed_names = _get_end2end_headers(headers)
        keylist = "".join("%s " % name for name in signed_names)
        headers_val = "".join(headers[name] for name in signed_names)
        created = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime())
        cnonce = _cnonce()
        snonce = self.challenge['snonce']

        signing_input = "%s:%s:%s:%s:%s" % (method, request_uri, cnonce, snonce, headers_val)
        mac = hmac.new(self.key, signing_input, self.hashmod)
        response = mac.hexdigest().lower()

        header_fields = (
            self.credentials[0],
            self.challenge['realm'],
            self.challenge['snonce'],
            cnonce,
            request_uri,
            created,
            response,
            keylist,
        )
        headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % header_fields
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def auth_sub_string_from_body(http_body):
  """Pulls the AuthSub token value out of an HTTP response body.

  Intended for reading the new session token from the server's reply to a
  request that upgrades a single use AuthSub token.

  Args:
    http_body: str The response from the server which contains the AuthSub
        key, one "Name=value" entry per line.

  Returns:
    The text following 'Token=' on the first line that starts with that
    prefix, or None when no line does.
  """
  prefix = 'Token='
  # Lazily yield the value part of every matching line; only the first one
  # is ever consumed.
  token_values = (line[len(prefix):]
                  for line in http_body.splitlines()
                  if line.startswith(prefix))
  return next(token_values, None)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def generate_request_for_access_token(
    request_token, auth_server_url=ACCESS_TOKEN_URL):
  """Builds the HTTP request that exchanges a request token for an access token.

  The request token must already have been authorized by the user. See the
  documentation on OAuth with Google Data for more details:
  http://code.google.com/apis/accounts/docs/OAuth.html#AccessToken

  Args:
    request_token: An OAuthHmacToken or OAuthRsaToken which the user has
        approved using their browser.
    auth_server_url: (optional) The URL at which the OAuth access token is
        requested. Defaults to ACCESS_TOKEN_URL.

  Returns:
    The result of request_token.modify_request() applied to a new, empty
    POST HttpRequest aimed at auth_server_url.
  """
  access_request = atom.http_core.HttpRequest(auth_server_url, 'POST')
  # The POST carries no body, so the length is declared as zero.
  access_request.headers['Content-Length'] = '0'
  signed_request = request_token.modify_request(access_request)
  return signed_request
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def upgrade_to_access_token(request_token, server_response_body):
  """Turns an authorized request token into an access token, in place.

  The token and token secret parsed from the server's response replace the
  ones on request_token, its state is switched to ACCESS_TOKEN, and the
  `next` and `verifier` fields are cleared.

  Args:
    request_token: An OAuth token which this function modifies to allow it
        to be used as an access token.
    server_response_body: str The server's response to the upgrade request.
        This should contain the new token and token_secret which are used
        to sign subsequent requests.

  Returns:
    The same token object which was passed in.
  """
  new_token, new_secret = oauth_token_info_from_body(server_response_body)
  request_token.token = new_token
  request_token.token_secret = new_secret
  request_token.auth_state = ACCESS_TOKEN
  # Chained assignment stores left to right: `next` first, then `verifier`.
  request_token.next = request_token.verifier = None
  return request_token
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def generate_token(key, user_id, action_id='', when=None):
    """Generates a URL-safe token for the given user, action, time tuple.

    The token is the URL-safe base64 encoding of
    ``HMAC-MD5(key, user_id DELIMITER action_id DELIMITER when)`` followed by
    DELIMITER and the timestamp.

    Args:
        key: secret key to use.
        user_id: the user ID of the authenticated user.
        action_id: a string identifier of the action they requested
                   authorization for.
        when: the time in seconds since the epoch at which the user was
              authorized for this action. If not set (or falsy) the current
              time is used.

    Returns:
        A string XSRF protection token.
    """
    # Imported locally so this block does not rely on a module-level import.
    import hashlib

    # hmac.new() used to default to MD5 when digestmod was omitted; since
    # Python 3.8 the argument is mandatory and omitting it raises TypeError.
    # Naming MD5 explicitly keeps tokens identical to previously issued ones.
    digester = hmac.new(_helpers._to_bytes(key, encoding='utf-8'),
                        digestmod=hashlib.md5)
    digester.update(_helpers._to_bytes(str(user_id), encoding='utf-8'))
    digester.update(DELIMITER)
    digester.update(_helpers._to_bytes(action_id, encoding='utf-8'))
    digester.update(DELIMITER)
    when = _helpers._to_bytes(str(when or int(time.time())), encoding='utf-8')
    digester.update(when)
    digest = digester.digest()

    token = base64.urlsafe_b64encode(digest + DELIMITER + when)
    return token
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def generate_csrf_token(self, csrf_context):
        """Return the CSRF token for this form as ``<expires>##<hexdigest>``.

        A random per-session value is created under the 'csrf' key of the
        session if absent, and copied onto ``self.csrf_token.csrf_key``. The
        digest is HMAC-SHA1 over that value (plus the expiry stamp when
        TIME_LIMIT is set), keyed with SECRET_KEY.

        Raises Exception when SECRET_KEY is None and TypeError when
        `csrf_context` is None.
        """
        if self.SECRET_KEY is None:
            raise Exception('must set SECRET_KEY in a subclass of this form for it to work')
        if csrf_context is None:
            raise TypeError('Must provide a session-like object as csrf context')

        # Accept either an object exposing `.session` or the session itself.
        store = getattr(csrf_context, 'session', csrf_context)
        if 'csrf' not in store:
            store['csrf'] = sha1(os.urandom(64)).hexdigest()

        session_key = store['csrf']
        self.csrf_token.csrf_key = session_key

        if self.TIME_LIMIT:
            deadline = datetime.now() + self.TIME_LIMIT
            expires = deadline.strftime(self.TIME_FORMAT)
            signed_text = '%s%s' % (session_key, expires)
        else:
            expires, signed_text = '', session_key

        digest = hmac.new(self.SECRET_KEY, signed_text.encode('utf8'), digestmod=sha1).hexdigest()
        return '%s##%s' % (expires, digest)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def validate_csrf_token(self, field):
        """Check a submitted ``<expires>##<hexdigest>`` CSRF token.

        The digest is recomputed with HMAC-SHA1 over ``field.csrf_key`` plus
        the submitted expiry stamp, keyed with SECRET_KEY, and compared with
        the submitted digest. When TIME_LIMIT is set the expiry stamp is
        also compared, as text, against the current time.

        Raises ValidationError when the token is missing, does not match,
        or has expired.
        """
        if not field.data or '##' not in field.data:
            raise ValidationError(field.gettext('CSRF token missing'))

        # Split on the first separator only: a submitted value containing
        # more than one '##' must fail validation below, not crash with an
        # unpacking ValueError.
        expires, hmac_csrf = field.data.split('##', 1)

        check_val = (field.csrf_key + expires).encode('utf8')

        hmac_compare = hmac.new(self.SECRET_KEY, check_val, digestmod=sha1)
        if hmac_compare.hexdigest() != hmac_csrf:
            raise ValidationError(field.gettext('CSRF failed'))

        if self.TIME_LIMIT:
            # Timestamps are compared as formatted strings.
            now_formatted = datetime.now().strftime(self.TIME_FORMAT)
            if now_formatted > expires:
                raise ValidationError(field.gettext('CSRF token expired'))
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def validate_csrf_token(self, form, field):
        """Verify the ``<expires>##<hexdigest>`` token submitted in `field`.

        The expected digest is HMAC-SHA1 over the session's 'csrf' value
        plus the submitted expiry stamp, keyed with ``form_meta.csrf_secret``.
        When ``self.time_limit`` is set, the expiry stamp is compared as
        text with the formatted current time. `form` is not used.

        Raises ValidationError when the token is missing, does not match,
        or has expired.
        """
        meta = self.form_meta
        if not field.data or '##' not in field.data:
            raise ValidationError(field.gettext('CSRF token missing'))

        expires, claimed_digest = field.data.split('##', 1)

        signed_bytes = (self.session['csrf'] + expires).encode('utf8')
        expected_digest = hmac.new(meta.csrf_secret, signed_bytes, digestmod=sha1).hexdigest()
        if expected_digest != claimed_digest:
            raise ValidationError(field.gettext('CSRF failed'))

        # Expiry is checked only after the signature has been accepted.
        if self.time_limit and self.now().strftime(self.TIME_FORMAT) > expires:
            raise ValidationError(field.gettext('CSRF token expired'))
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def derive_key(self):
        """Derive the signing key from the secret key and the salt.

        The strategy is chosen by ``self.key_derivation``: 'concat',
        'django-concat', 'hmac' or 'none'; any other value raises TypeError.
        Override this method to use a different derivation. Note that this
        derivation is not meant to strengthen a short password; use large
        random secret keys instead.
        """
        salt = want_bytes(self.salt)
        method = self.key_derivation

        if method == 'concat':
            return self.digest_method(salt + self.secret_key).digest()

        if method == 'django-concat':
            material = salt + b'signer' + self.secret_key
            return self.digest_method(material).digest()

        if method == 'hmac':
            # Passing the salt as the initial message is the same as
            # calling update(salt) on a fresh HMAC object.
            return hmac.new(self.secret_key, msg=salt,
                            digestmod=self.digest_method).digest()

        if method == 'none':
            return self.secret_key

        raise TypeError('Unknown key derivation method')
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def generate_csrf_token(self, csrf_context):
        """Produce this form's CSRF token in the form ``<expires>##<hexdigest>``.

        The session's 'csrf' entry is filled with a random value when
        missing and mirrored onto ``self.csrf_token.csrf_key``. The digest
        is HMAC-SHA1, keyed with SECRET_KEY, over that entry followed by
        the expiry stamp (the stamp is empty when TIME_LIMIT is not set).

        Raises Exception when SECRET_KEY is None and TypeError when
        `csrf_context` is None.
        """
        if self.SECRET_KEY is None:
            raise Exception('must set SECRET_KEY in a subclass of this form for it to work')
        if csrf_context is None:
            raise TypeError('Must provide a session-like object as csrf context')

        # The context may wrap the session or be the session itself.
        store = getattr(csrf_context, 'session', csrf_context)
        if 'csrf' not in store:
            store['csrf'] = sha1(os.urandom(64)).hexdigest()

        session_key = store['csrf']
        self.csrf_token.csrf_key = session_key

        if self.TIME_LIMIT:
            deadline = datetime.now() + self.TIME_LIMIT
            expires = deadline.strftime(self.TIME_FORMAT)
            signed_text = '%s%s' % (session_key, expires)
        else:
            expires, signed_text = '', session_key

        digest = hmac.new(self.SECRET_KEY, signed_text.encode('utf8'), digestmod=sha1).hexdigest()
        return '%s##%s' % (expires, digest)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def validate_csrf_token(self, field):
        """Validate a submitted CSRF token of the form ``<expires>##<hexdigest>``.

        Recomputes HMAC-SHA1 over ``field.csrf_key`` plus the submitted
        expiry stamp, keyed with SECRET_KEY, and compares it with the
        submitted digest. With TIME_LIMIT set, the expiry stamp is then
        compared, as text, with the formatted current time.

        Raises ValidationError when the token is missing, does not match,
        or has expired.
        """
        if not field.data or '##' not in field.data:
            raise ValidationError(field.gettext('CSRF token missing'))

        # maxsplit=1 so that a value with several '##' separators is
        # rejected as a bad token rather than raising ValueError on
        # unpacking.
        expires, hmac_csrf = field.data.split('##', 1)

        check_val = (field.csrf_key + expires).encode('utf8')

        hmac_compare = hmac.new(self.SECRET_KEY, check_val, digestmod=sha1)
        if hmac_compare.hexdigest() != hmac_csrf:
            raise ValidationError(field.gettext('CSRF failed'))

        if self.TIME_LIMIT:
            # Both sides are formatted timestamps compared as strings.
            now_formatted = datetime.now().strftime(self.TIME_FORMAT)
            if now_formatted > expires:
                raise ValidationError(field.gettext('CSRF token expired'))
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def generate_csrf_token(self, csrf_token_field):
        """Return the session-bound CSRF token as ``<expires>##<hexdigest>``.

        A random 'csrf' entry is stored in ``self.session`` when absent. The
        digest is HMAC-SHA1, keyed with ``form_meta.csrf_secret``, over that
        entry followed by the expiry stamp (empty when ``self.time_limit``
        is not set). `csrf_token_field` is not used.

        Raises Exception when the secret is None and TypeError when the
        meta's csrf_context is None.
        """
        meta = self.form_meta
        if meta.csrf_secret is None:
            raise Exception('must set `csrf_secret` on class Meta for SessionCSRF to work')
        if meta.csrf_context is None:
            raise TypeError('Must provide a session-like object as csrf context')

        store = self.session
        if 'csrf' not in store:
            store['csrf'] = sha1(os.urandom(64)).hexdigest()

        if self.time_limit:
            deadline = self.now() + self.time_limit
            expires = deadline.strftime(self.TIME_FORMAT)
            signed_text = '%s%s' % (store['csrf'], expires)
        else:
            expires, signed_text = '', store['csrf']

        digest = hmac.new(meta.csrf_secret, signed_text.encode('utf8'), digestmod=sha1).hexdigest()
        return '%s##%s' % (expires, digest)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def validate_csrf_token(self, form, field):
        """Check the ``<expires>##<hexdigest>`` token carried by `field`.

        The digest is recomputed with HMAC-SHA1, keyed with
        ``form_meta.csrf_secret``, over the session's 'csrf' value plus the
        submitted expiry stamp. If ``self.time_limit`` is set, the stamp is
        also compared as text with the formatted current time. `form` is
        not used.

        Raises ValidationError when the token is missing, does not match,
        or has expired.
        """
        meta = self.form_meta
        if not field.data or '##' not in field.data:
            raise ValidationError(field.gettext('CSRF token missing'))

        expires, claimed_digest = field.data.split('##', 1)

        signed_bytes = (self.session['csrf'] + expires).encode('utf8')
        expected_digest = hmac.new(meta.csrf_secret, signed_bytes, digestmod=sha1).hexdigest()
        if expected_digest != claimed_digest:
            raise ValidationError(field.gettext('CSRF failed'))

        # The expiry check runs only once the signature has matched.
        if self.time_limit and self.now().strftime(self.TIME_FORMAT) > expires:
            raise ValidationError(field.gettext('CSRF token expired'))
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def derive_key(self):
        """Compute the key used for signing from the secret key and salt.

        ``self.key_derivation`` selects the scheme: 'concat',
        'django-concat', 'hmac' or 'none'; anything else raises TypeError.
        Subclasses may override this to change the derivation. It is not
        intended to turn a short password into a strong key; use large
        random secret keys instead.
        """
        salt = want_bytes(self.salt)
        method = self.key_derivation

        if method == 'concat':
            return self.digest_method(salt + self.secret_key).digest()

        if method == 'django-concat':
            material = salt + b'signer' + self.secret_key
            return self.digest_method(material).digest()

        if method == 'hmac':
            # Supplying the salt as `msg` is the same as update(salt) on a
            # freshly created HMAC object.
            return hmac.new(self.secret_key, msg=salt,
                            digestmod=self.digest_method).digest()

        if method == 'none':
            return self.secret_key

        raise TypeError('Unknown key derivation method')