Python hmac 模块,compare_digest() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用hmac.compare_digest()

项目:repository-gardener    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def check_signature(header_signature, request_body):
    """Validate an ``X-Hub-Signature`` header against the request body.

    The header is split on ``=`` into an algorithm name and a hex digest.
    The digest is recomputed as HMAC-SHA1 of ``request_body`` keyed with
    ``webhook_secret()`` and compared using ``hmac.compare_digest``.

    Returns:
        True when the digests match.

    Raises:
        ValueError: if the header is empty, does not split into exactly two
            parts, names an algorithm other than ``sha1``, or carries a
            digest that differs from the computed one.
    """
    if not header_signature:
        raise ValueError('No X-Hub-Signature header.')

    # Tuple unpacking itself raises ValueError unless there is exactly one '='.
    algorithm, signature_digest = header_signature.split('=')
    if algorithm != 'sha1':
        raise ValueError('Unsupported digest algorithm {}.'.format(algorithm))

    mac = hmac.new(webhook_secret(), msg=request_body, digestmod=hashlib.sha1)
    expected_digest = mac.hexdigest()

    if hmac.compare_digest(expected_digest, signature_digest):
        return True
    raise ValueError('Body digest did not match signature digest')
项目:livre-python    作者:HE-Arc    | 项目源码 | 文件源码
def check_mdp(hashed_mdp, utilisateur_mdp, hashage):
    """Check a plain-text password against a stored salted hash.

    Args:
        hashed_mdp: Stored value of the form ``"<hex digest>:<salt>"``.
        utilisateur_mdp: Plain-text password to check.
        hashage: Hash algorithm name, passed to ``hashlib.new``.

    Returns:
        True if the hex digest of ``salt + utilisateur_mdp`` (UTF-8 encoded)
        equals the stored digest, False otherwise.

    """
    stored_digest, salt = hashed_mdp.split(':')
    salted_password = salt + utilisateur_mdp
    hasher = hashlib.new(hashage)
    hasher.update(salted_password.encode('utf-8'))
    computed_digest = hasher.hexdigest()
    return compare_digest(stored_digest, computed_digest)
项目:Facebook_PyBot    作者:hundredeir    | 项目源码 | 文件源码
def _verify(callback, signature, app_secret_key):
    """
    Check that a received callback matches its X-Hub-Signature value.

    :param callback: callback received from facebook, used as the HMAC message
    :param signature: X-hub-signature of the request, ``<method>=<hex digest>``
    :param app_secret_key: facebook app secret key (text, UTF-8 encoded here)
    :return: True if the signature matches, False otherwise

    """
    # Only the digest half of the header is used; the method name is ignored.
    _method, received_digest = signature.split("=")

    # Recompute the HMAC-SHA1 of the callback with the app secret as key and
    # compare it with the digest taken from the request header.
    secret_bytes = app_secret_key.encode("utf-8")
    expected_digest = hmac.new(secret_bytes, callback, "sha1").hexdigest()
    return hmac.compare_digest(received_digest, expected_digest)
项目:arch-security-tracker    作者:archlinux    | 项目源码 | 文件源码
def validate(self):
    """Validate the login form and resolve the matching user.

    Runs ``BaseForm.validate`` first, then looks the user up by name and
    compares the stored password with the hash of the submitted password.
    On success ``self.user`` is set to the user record.

    Returns:
        True on success. False otherwise; an error is appended to the
        password field for bad credentials, or to the username field when
        the account is not active.
    """
    self.user = None
    if not BaseForm.validate(self):
        return False

    def reject_credentials():
        self.password.errors.append(ERROR_INVALID_USERNAME_PASSWORD)
        return False

    account = User.query.filter(User.name == self.username.data).first()
    if not account:
        # Hash and compare against a dummy value even though the result is
        # discarded -- presumably so unknown usernames cost about as much
        # time as known ones.
        compare_digest(dummy_password, hash_password(self.password.data, 'the cake is a lie!'))
        return reject_credentials()

    password_matches = compare_digest(
        account.password, hash_password(self.password.data, account.salt))
    if not password_matches:
        return reject_credentials()

    if not account.active:
        self.username.errors.append(ERROR_ACCOUNT_DISABLED)
        return False

    self.user = account
    return True
项目:arch-security-tracker    作者:archlinux    | 项目源码 | 文件源码
def validate(self):
    """Validate a password-change form for ``current_user``.

    Checks, in order: the base form validation, that the new password does
    not contain the user name, that the repeated password equals the new
    one, and that the current password hashes to the stored value.

    Returns:
        True if every check passes. False on the first failing check, after
        appending the matching error to the affected field.
    """
    if not BaseForm.validate(self):
        return False

    new_password = self.password.data
    if current_user.name in new_password:
        self.password.errors.append(ERROR_PASSWORD_CONTAINS_USERNAME)
        return False

    if new_password != self.password_repeat.data:
        self.password_repeat.errors.append(ERROR_PASSWORD_REPEAT_MISMATCHES)
        return False

    current_matches = compare_digest(
        current_user.password,
        hash_password(self.password_current.data, current_user.salt))
    if not current_matches:
        self.password_current.errors.append(ERROR_PASSWORD_INCORRECT)
        return False

    return True
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def verify(cls, token, timestamp, signature):
    """
    Verify the signature of an event.

    The expected signature is the HMAC-SHA256 hex digest of the timestamp
    followed by the token, keyed with ``settings.MAILGUN_KEY``.

    Args:
        token (str): Randomly generated alpha numeric string with length 50.
        timestamp (int): Number of seconds passed since January 1, 1970
        signature (str): String with hexadecimal digits generate by HMAC algorithm.

    Returns:
        boolean: True if signature is valid; False if it is not or if any
        argument is None
    """
    if timestamp is None or signature is None or token is None:
        return False

    key = bytes(settings.MAILGUN_KEY, 'latin-1')
    message = bytes('{}{}'.format(timestamp, token), 'latin-1')
    expected = hmac.new(key=key, msg=message, digestmod=hashlib.sha256).hexdigest()
    return hmac.compare_digest(signature, expected)
项目:pep8speaks    作者:OrkoHunter    | 项目源码 | 文件源码
def match_webhook_secret(request):
    """Match the webhook secret sent from GitHub.

    The check only runs when the ``OVER_HEROKU`` environment variable is set
    to a non-empty value; otherwise the request is accepted as is.  The
    ``X-Hub-Signature`` header must be ``sha1=<hex digest>`` where the digest
    is the HMAC-SHA1 of ``request.data`` keyed with the
    ``GITHUB_PAYLOAD_SECRET`` environment variable.

    Returns True when the request is accepted.  Calls ``abort(403)`` for a
    missing or mismatching signature and ``abort(501)`` for a non-sha1 one.
    """
    if not os.environ.get("OVER_HEROKU", False):
        return True

    headers = request.headers
    if 'X-Hub-Signature' not in headers or headers.get('X-Hub-Signature') is None:
        abort(403)
    header_signature = headers.get('X-Hub-Signature', None)

    sha_name, signature = header_signature.split('=')
    if sha_name != 'sha1':
        abort(501)

    secret = os.environ["GITHUB_PAYLOAD_SECRET"].encode()
    expected = hmac.new(secret, msg=request.data, digestmod="sha1").hexdigest()
    if not hmac.compare_digest(str(expected), str(signature)):
        abort(403)
    return True
项目:intake    作者:codeforamerica    | 项目源码 | 文件源码
def is_a_valid_mailgun_post(request):
    """
    Check the Mailgun webhook signature carried in the POST data.

    Taken from
        http://mailgun-documentation.readthedocs.io/en/latest/
            user_manual.html#webhooks

    The expected signature is the HMAC-SHA256 hex digest of the posted
    timestamp followed by the posted token, keyed with
    ``settings.MAILGUN_PRIVATE_API_KEY`` (an empty key if the setting is
    absent).

    :param request: Request object; ``POST`` must contain ``token``,
        ``timestamp`` and ``signature``
    :return: True or False if the request was signed by mailgun
    """
    post_data = request.POST
    token = post_data['token']
    timestamp = post_data['timestamp']
    signature = post_data['signature']

    api_key = getattr(settings, 'MAILGUN_PRIVATE_API_KEY', '').encode('utf-8')
    signed_text = '{}{}'.format(timestamp, token).encode('utf-8')
    signer = hmac.new(key=api_key, msg=signed_text, digestmod=hashlib.sha256)
    return hmac.compare_digest(signature, signer.hexdigest())
项目:omnic    作者:michaelpb    | 项目源码 | 文件源码
def check(self, typestring, querydata):
    """Validate the HMAC digest supplied with a query.

    The first ``'url'`` and first ``'digest'`` entries of ``querydata`` are
    used.  The expected digest comes from ``get_hmac_sha1_digest`` called
    with ``singletons.settings.HMAC_SECRET``, the URL and ``typestring``; it
    is compared with the lower-cased supplied digest.

    Raises:
        InvalidQueryDataException: if ``'url'`` or ``'digest'`` is missing.
        InvalidDigestException: if the supplied digest does not match.
    """
    if 'url' not in querydata:
        raise InvalidQueryDataException('URL missing')
    if 'digest' not in querydata:
        raise InvalidQueryDataException('digest missing')

    url_string = querydata['url'][0]
    supplied_digest = querydata['digest'][0]
    correct_digest = get_hmac_sha1_digest(
        singletons.settings.HMAC_SECRET,
        url_string,
        typestring,
    )

    if not hmac.compare_digest(supplied_digest.lower(), correct_digest):
        # Report the digest the client sent, never the expected one: putting
        # the correct digest in the error would hand a valid signature for
        # this URL to whoever triggered the failure.
        raise InvalidDigestException(
            'Incorrect digest supplied: ' + supplied_digest)
项目:great-warrior-bot    作者:candychang    | 项目源码 | 文件源码
def validate_signature(request_body, signature, secret):
    """
    Validate LINE signature on HTTP requests

    The expected signature is the base64 encoding of the HMAC-SHA256 digest
    of ``request_body`` keyed with the UTF-8 encoded ``secret``.

    Args:
    request_body: Body of HTTP request
    signature: Request header 'X-LINE-ChannelSignature'
    secret: Secret used to create digest from request_body

    Returns:
    True if valid, False otherwise (including when any argument is empty)
    """
    if not (request_body and signature and secret):
        return False

    key = secret.encode("utf-8")
    received = signature.encode("utf-8")
    digest = hmac.new(key, request_body, digestmod=hashlib.sha256).digest()
    expected = base64.b64encode(digest)
    return hmac.compare_digest(expected, received)
项目:hitman-old    作者:tmacro    | 项目源码 | 文件源码
def check_token(secret, key, token):
    """Check that ``token`` carries a valid, unexpired signature.

    The token is decoded with ``dec64`` and parsed as JSON; it must contain
    ``'sig'`` and ``'exp'``.  The signature is regenerated with
    ``gen_signature(secret, key, exp=...)`` and compared with ``hmac.compare_digest``.

    Returns:
        True if the signature matches and ``is_expired`` reports the token
        as still valid; False in every other case, including any error
        raised while decoding or checking.
    """
    try:
        data = json.loads(dec64(token))
        _log.debug(data)
        if 'sig' not in data or 'exp' not in data:
            _log.debug('Not enough info in token')
            return False

        gend, _exp = gen_signature(secret, key, exp=data['exp'])
        if not hmac.compare_digest(gend, data['sig']):
            # Reuse the already-decoded payload instead of decoding again.
            _log.debug('Token %s is not valid' % data)
            _log.debug('%s - %s' % (gend, data['sig']))
            return False

        if is_expired(data['exp']):
            # NOTE(review): `h125` may be a typo for `h512` -- confirm.  If
            # the name is undefined, the NameError lands in the handler
            # below and is logged as a decode failure.
            _log.debug('Token %s for %s is expired' % (token, h125(secret)))
            return False
        return True
    except Exception:
        # Best effort by design: a malformed token is reported as invalid.
        # Catch Exception rather than everything so that SystemExit and
        # KeyboardInterrupt still propagate.
        _log.debug('Failed to decode token %s' % token)
    return False
项目:hitman-old    作者:tmacro    | 项目源码 | 文件源码
def check_token(secret, key, token, extra = None):
    """Check that ``token`` carries a valid, unexpired signature.

    The token is decoded with ``dec64`` and parsed as JSON; it must contain
    ``'sig'`` and ``'exp'``.  The signature is regenerated with
    ``gen_signature(secret, key, extra, exp=...)`` and compared with
    ``hmac.compare_digest``.

    Returns:
        True if the signature matches and the token is not expired; False
        otherwise, including when any exception is raised (it is logged).
    """
    try:
        data = json.loads(dec64(token))
        _log.debug(data)
        if not ('sig' in data and 'exp' in data):
            _log.debug('Not enough info in token')
            return False

        gend, _exp = gen_signature(secret, key, extra, exp = data['exp'])
        if not hmac.compare_digest(gend, data['sig']):
            _log.debug('Token %s is not valid'%json.loads(dec64(token)))
            _log.debug('%s - %s'%(gend, data['sig']))
            return False

        if is_expired(data['exp']):
            _log.debug('Token %s for %s is expired'%(token, h512(secret)))
            return False
        return True
    except Exception as e:
        _log.debug('Failed to decode token %s'%token)
        _log.exception(e)
    return False
项目:line-bot-sdk-python    作者:line    | 项目源码 | 文件源码
def validate(self, body, signature):
    """Check signature.

    https://devdocs.line.me/en/#webhook-authentication

    The expected value is the base64 encoding of the HMAC-SHA256 digest of
    the UTF-8 encoded body, keyed with ``self.channel_secret``.

    :param str body: Request body (as text)
    :param str signature: X-Line-Signature value (as text)
    :rtype: bool
    :return: result
    """
    mac = hmac.new(self.channel_secret, body.encode('utf-8'), hashlib.sha256)
    raw_digest = mac.digest()

    received = signature.encode('utf-8')
    expected = base64.b64encode(raw_digest)
    return compare_digest(received, expected)
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def constant_time_string_compare(a, b):
    """Compare two strings without stopping at the first differing character.

    Uses ``hmac.compare_digest``; if that raises ``AttributeError``, falls
    back to XOR-ing the characters of both strings.

    Args:
        a (str): A string to compare
        b (str): A string to compare

    Returns:
        True if the strings are equal, False otherwise.
    """
    try:
        return hmac.compare_digest(a, b)
    except AttributeError:
        pass

    # Manual fallback: different lengths can never match.
    if len(a) != len(b):
        return False

    # OR together the XOR of every character pair; any difference leaves a
    # non-zero bit, and every pair is visited regardless.
    difference = 0
    for char_a, char_b in zip(a, b):
        difference |= ord(char_a) ^ ord(char_b)
    return difference == 0
项目:crypto    作者:erose1337    | 项目源码 | 文件源码
def verify_signature(second_ciphertext, alices_private_key, random_value1, decrypt, random_value_size=32):
    """ usage: verify_signature(response_ciphertext : bytearray,
                                alices_private_key : bytearray,
                                random_value1 : bytearray,
                                decrypt : function,
                                random_value_size=32) => shared secret

        Decrypts the response and splits the plaintext, from the end, into
        the second random value and the echoed first random value.
        Returns the derived shared secret if the echoed value equals
        random_value1, otherwise None. """
    plaintext = decrypt(second_ciphertext, alices_private_key)
    size = random_value_size

    # Layout of the plaintext: <identifier> <random value 1> <random value 2>
    random_value2 = plaintext[-size:]
    echoed_value1 = plaintext[-(2 * size):-size]
    bobs_identifier = plaintext[:-(2 * size)]  # parsed but not used here

    secret = _derive_shared_secret(random_value1, random_value2)
    return secret if hmac.compare_digest(echoed_value1, random_value1) else None
项目:fireq    作者:superdesk    | 项目源码 | 文件源码
async def hook(request):
    """Handle a webhook request signed with ``X-Hub-Signature``.

    The raw body is read and ``get_signature(body)`` is compared with the
    ``X-Hub-Signature`` header.  On mismatch a ``web.HTTPBadRequest`` is
    returned.  Otherwise the JSON body and the request headers (minus the
    signature) are passed to ``get_hook_ctx``; if it returns a truthy
    reference, ``ci(ref)`` is scheduled on the application loop.

    Returns:
        ``web.HTTPBadRequest()`` on a bad signature, else a JSON response
        containing the value returned by ``get_hook_ctx``.
    """
    # The function awaits, so it has to be a coroutine: `await` inside a
    # plain `def` is a SyntaxError.
    raw_body = await request.read()
    signature_ok = hmac.compare_digest(
        get_signature(raw_body),
        request.headers.get('X-Hub-Signature', '')
    )
    if not signature_ok:
        return web.HTTPBadRequest()

    body = await request.json()
    # Drop the signature header whatever its letter case.  The lookup above
    # goes through request.headers, but a plain dict is case-sensitive, so
    # `del headers['X-Hub-Signature']` could raise KeyError.
    headers = {
        name: value
        for name, value in request.headers.items()
        if name.lower() != 'x-hub-signature'
    }
    ref = get_hook_ctx(headers, body, clean=True)
    if ref:
        request.app.loop.create_task(ci(ref))
    return web.json_response(ref)
项目:morpheus    作者:tutorcruncher    | 项目源码 | 文件源码
async def call(self, request):
    """Handle a Mandrill webhook POST.

    The posted ``mandrill_events`` value is authenticated against the
    ``X-Mandrill-Signature`` header.  The expected signature is the base64
    encoded HMAC-SHA1 of the webhook URL, the literal ``mandrill_events``
    and the event data, keyed with ``self.app['webhook_auth_key']``.  Valid
    events are parsed as JSON and passed to
    ``self.sender.update_mandrill_webhooks``.

    Raises:
        HTTPBadRequest: if ``mandrill_events`` is missing or is not valid JSON.
        HTTPForbidden: if the signature does not match.
    """
    # The function awaits, so it has to be a coroutine: `await` inside a
    # plain `def` is a SyntaxError.
    try:
        event_data = (await request.post())['mandrill_events']
    except KeyError:
        raise HTTPBadRequest(text='"mandrill_events" not found in post data')

    signed_text = self.app['mandrill_webhook_url'] + 'mandrill_events' + event_data
    sig_generated = base64.b64encode(
        hmac.new(
            self.app['webhook_auth_key'],
            msg=signed_text.encode(),
            digestmod=hashlib.sha1
        ).digest()
    )
    # A placeholder that can never equal a base64 digest stands in for a
    # missing header, so the comparison below simply fails.
    sig_given = request.headers.get('X-Mandrill-Signature', '<missing>').encode()
    if not hmac.compare_digest(sig_generated, sig_given):
        raise HTTPForbidden(text='invalid signature')

    try:
        events = ujson.loads(event_data)
    except ValueError as e:
        raise HTTPBadRequest(text=f'invalid json data: {e}')

    await self.sender.update_mandrill_webhooks(events)
    return Response(text='message status updated\n')
项目:django-mfa    作者:MicroPyramid    | 项目源码 | 文件源码
def strings_equal(s1, s2):
    """
    Timing-attack resistant string comparison.

    Normal comparison using == will short-circuit on the first mismatching
    character. This avoids that by scanning the whole string, though we
    still reveal to a timing attack whether the strings are the same
    length.

    Both values are converted to text and NFKC-normalized, then compared as
    UTF-8 bytes.  Comparing bytes matters: ``compare_digest`` raises
    TypeError for text containing non-ASCII characters.

    Returns True if the normalized values are equal, False otherwise.
    """
    try:
        text_type = unicode  # Python 2
    except NameError:
        text_type = str  # Python 3

    normalized1 = unicodedata.normalize('NFKC', text_type(s1))
    normalized2 = unicodedata.normalize('NFKC', text_type(s2))
    return compare_digest(normalized1.encode('utf-8'),
                          normalized2.encode('utf-8'))
项目:coretools    作者:iotile    | 项目源码 | 文件源码
def test_hash_only():
    """Verify that hash-only (no key) report signatures are calculated correctly.

    The signature must equal the SHA-256 digest of the data; verification
    must accept the full digest (256 bits) and its first 16 bytes
    (128 bits), and reject an empty signature.
    """
    payload = gen_test_data(100)
    provider = BasicAuthProvider()
    no_key = BasicAuthProvider.NoKey

    signed = provider.sign_report(0, no_key, payload)
    expected = hashlib.sha256(payload).digest()
    assert hmac.compare_digest(expected, signed['signature'])

    for candidate, bit_length in ((expected, 256), (expected[:16], 128)):
        outcome = provider.verify_report(0, no_key, payload, candidate)
        assert outcome['verified']
        assert outcome['bit_length'] == bit_length

    empty = provider.verify_report(0, no_key, payload, bytearray())
    assert not empty['verified']
项目:appengine_multiblog    作者:tstillwell    | 项目源码 | 文件源码
def valid_user(cookie_str):
    """ Returns username after validating the cookie session data

    The cookie must have the form ``<session id>|<hash>`` where the hash
    equals ``cookie_hash(<session id>)``.  If a ``User`` with that
    ``current_session`` exists and its session has not expired, the expiry
    is pushed one hour ahead, the user is saved and its username returned.

    Returns None for a missing or malformed cookie, a bad hash, an unknown
    session or an expired session.
    """
    if cookie_str is None:
        return None
    cookie_parts = cookie_str.split("|")
    if len(cookie_parts) != 2:
        return None

    session_id, cookie_signature = cookie_parts
    if not hmac.compare_digest(str(cookie_signature),
                               str(cookie_hash(session_id))):
        return None

    # Bind the session id as a query parameter instead of interpolating it
    # into the GQL text, so its contents can never change the query.
    user_query = ndb.gql(
        "SELECT * FROM User WHERE current_session = :1", session_id)
    current_user = user_query.get()

    now = datetime.datetime.now()
    if current_user and now < current_user.session_expires:
        current_user.session_expires = now + datetime.timedelta(hours=1)
        current_user.put()
        return current_user.username
    return None
项目:pastamaker    作者:sileht    | 项目源码 | 文件源码
def authentification():
    """Authenticate the current Flask request as a signed webhook.

    The ``X-Hub-Signature`` header must have the form ``sha1=<digest>`` and
    the digest must equal ``utils.compute_hmac(flask.request.data)``.
    Any failure is logged as a warning and aborts the request with 403.
    """
    # Only SHA1 is supported
    header_signature = flask.request.headers.get('X-Hub-Signature')
    if header_signature is None:
        LOG.warning("Webhook without signature")
        flask.abort(403)

    pieces = header_signature.split('=')
    if len(pieces) == 2:
        sha_name, signature = pieces
    else:
        # Not exactly "<name>=<digest>": treated as an unsupported algorithm.
        sha_name, signature = None, None

    if sha_name != 'sha1':
        LOG.warning("Webhook signature malformed")
        flask.abort(403)

    expected_mac = utils.compute_hmac(flask.request.data)
    if not hmac.compare_digest(expected_mac, str(signature)):
        LOG.warning("Webhook signature invalid")
        flask.abort(403)
项目:imouto    作者:Hanaasagi    | 项目源码 | 文件源码
def verify_secure_value(name: str, value: Union[str, bytes], *,
                        secret: str) -> Union[str, None]:
    """Verify a signed value and return its decoded payload.

    ``value`` is converted with ``tob`` and split on ``|`` into exactly
    three fields; the third is the signature, checked against
    ``_generate_signature(secret, name, <first>, <second>)``.

    Returns the base64-decoded first field (through ``touni``) when the
    signature is correct, otherwise None.
    """
    fields = tob(value).split(b"|")
    if len(fields) != 3:
        return None

    encoded_payload, second_field, received_signature = fields
    expected_signature = _generate_signature(
        secret, name, encoded_payload, second_field)

    if hmac.compare_digest(received_signature, expected_signature):
        return touni(base64.b64decode(encoded_payload))
    return None
项目:github-pr-closer    作者:jirutka    | 项目源码 | 文件源码
def verify_signature(secret: str, signature: str, resp_body: BytesIO) -> None:
    """Verify HMAC-SHA1 signature of the given response body.

    The signature is expected to be in format ``sha1=<hex-digest>``; it is
    lower-cased before being parsed and compared.

    Raises:
        InvalidSignatureError: if the signature is malformed, names a type
            other than ``sha1``, or does not match the computed digest.
    """
    try:
        alg, digest = signature.lower().split('=', 1)
    except (ValueError, AttributeError):
        raise InvalidSignatureError('signature is malformed')

    if alg != 'sha1':
        raise InvalidSignatureError("expected type sha1, but got %s" % alg)

    mac = hmac.new(secret.encode('utf-8'),  # type: ignore
                   msg=resp_body.getbuffer(),
                   digestmod=hashlib.sha1)
    if hmac.compare_digest(mac.hexdigest(), digest):
        return
    raise InvalidSignatureError('digests do not match')
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def compare_digest(a, b, _xor_bytes=_xor_bytes):
    """Compare ``a`` and ``b`` by XOR-ing their items pairwise.

    The loop always runs over ``len(b)`` items: when the lengths differ,
    ``b`` is compared with itself and the result starts non-zero, so the
    outcome is False.

    Returns True if both have the same length and every pair XORs to zero.
    """
    same_length = len(a) == len(b)
    left = a if same_length else b
    result = 0 if same_length else 1

    for item_left, item_right in zip(left, b):
        result |= _xor_bytes(item_left, item_right)
    return result == 0
项目:MetaCI    作者:SalesforceFoundation    | 项目源码 | 文件源码
def validate_github_webhook(request):
    """Check the ``X-Hub-Signature`` header of a GitHub webhook request.

    The part of the ``HTTP_X_HUB_SIGNATURE`` value after the first ``=`` is
    compared with the HMAC-SHA1 hex digest of ``request.body`` keyed with
    ``settings.GITHUB_WEBHOOK_SECRET``.

    Returns:
        True if the signature matches; False if it does not, or if the
        header is missing or has no ``=`` separator.
    """
    key = settings.GITHUB_WEBHOOK_SECRET

    # An absent or malformed header is an invalid request, not a crash.
    header = request.META.get('HTTP_X_HUB_SIGNATURE')
    if not header:
        return False
    _algorithm, separator, signature = header.partition('=')
    if not separator:
        return False

    # hmac needs a bytes key; encode a text key on Python 2 and 3 alike.
    if not isinstance(key, bytes):
        key = key.encode()
    mac = hmac.new(key, msg=request.body, digestmod=sha1)
    return hmac.compare_digest(mac.hexdigest(), signature)
项目:aiowing    作者:embali    | 项目源码 | 文件源码
def check_password(self, password=None):
    """Check password

    Hashes ``password`` with ``crypt.crypt`` using the stored hash
    ``self.phash`` as salt and compares the result with ``self.phash``.

    Returns True if they match; False otherwise, including when no
    password is given.
    """
    # The signature allows password=None, but crypt.crypt() rejects it.
    if password is None:
        return False

    candidate = crypt.crypt(password, self.phash)
    # Guard against crypt() producing no hash -- presumably possible on
    # some platforms for an unsupported salt; compare_digest needs str.
    if candidate is None:
        return False
    return hmac.compare_digest(self.phash, candidate)
项目:PTE    作者:pwn2winctf    | 项目源码 | 文件源码
def auth(secret, headers, raw_payload):
    """Authenticate a webhook through its ``X-Hub-Signature`` header.

    The header must equal ``sha1=`` followed by the HMAC-SHA1 hex digest of
    ``raw_payload`` keyed with ``secret``.

    Raises:
        WebhookAuthError: if the received signature does not match.
    """
    received_sig = to_bytes(headers['X-Hub-Signature'])

    hex_digest = hmac.new(secret, raw_payload, hashlib.sha1).hexdigest()
    expected_sig = b'sha1=' + to_bytes(hex_digest)

    if hmac.compare_digest(received_sig, expected_sig):
        return
    raise WebhookAuthError()
项目:PTE    作者:pwn2winctf    | 项目源码 | 文件源码
def auth(secret, headers, raw_payload):
    """Authenticate a webhook through its ``X-Gitlab-Token`` header.

    The token is compared with ``secret``; ``raw_payload`` is not used.

    Raises:
        WebhookAuthError: if the received token differs from the secret.
    """
    received_token = to_bytes(headers['X-Gitlab-Token'])
    if hmac.compare_digest(secret, received_token):
        return
    raise WebhookAuthError()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def bytes_eq(a, b):
    """Compare two ``bytes`` objects with ``hmac.compare_digest``.

    Returns:
        True if ``a`` and ``b`` are equal, False otherwise.

    Raises:
        TypeError: if either argument is not a ``bytes`` instance.
    """
    both_bytes = isinstance(a, bytes) and isinstance(b, bytes)
    if not both_bytes:
        raise TypeError("a and b must be bytes.")
    return hmac.compare_digest(a, b)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def bytes_eq(a, b):
    """Compare two ``bytes`` objects with ``hmac.compare_digest``.

    Returns:
        True if ``a`` and ``b`` are equal, False otherwise.

    Raises:
        TypeError: if either argument is not a ``bytes`` instance.
    """
    both_bytes = isinstance(a, bytes) and isinstance(b, bytes)
    if not both_bytes:
        raise TypeError("a and b must be bytes.")
    return hmac.compare_digest(a, b)
项目:thorn    作者:robinhood    | 项目源码 | 文件源码
def verify(digest, digest_method, key, message):
    """Verify HMAC digest.

    Signs ``message`` with ``sign(digest_method, key, message)`` and
    compares the result with ``digest``; both sides go through
    ``want_bytes`` first.  Returns True if they are equal.
    """
    expected = want_bytes(sign(digest_method, key, message))
    received = want_bytes(digest)
    return hmac.compare_digest(expected, received)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def constant_time_compare(val1, val2):
    """Return True if the two values are equal, False otherwise.

    Both values are passed through ``force_bytes`` and then compared with
    ``hmac.compare_digest``.
    """
    left = force_bytes(val1)
    right = force_bytes(val2)
    return hmac.compare_digest(left, right)
项目:xmpp-cloud-auth    作者:jsxc    | 项目源码 | 文件源码
def auth_token(self):
    """Check whether ``self.password`` is a valid authentication token.

    The password is translated with ``usersafe_encoding`` and
    base64-decoded into a 23 byte token: a version byte (must be 0), a
    16 byte MAC and a 6 byte header holding a secret id and an expiry
    time.  The MAC must equal the first 16 bytes of the HMAC-SHA256, keyed
    with ``self.secret``, of the version, the header and the JID
    (``username@domain``).

    Returns:
        True if the token is well-formed, not expired (``expiry`` is not
        below ``self.now``) and carries the expected MAC; False otherwise.
    """
    try:
        # Extra '=' characters stand in for any padding the token lacks.
        token = b64decode(self.password.translate(usersafe_encoding) + '=======')
    except Exception:
        # Anything that fails to decode is simply "not a token".  Catch
        # Exception rather than everything so that SystemExit and
        # KeyboardInterrupt still propagate.
        logging.debug('Could not decode token (maybe not a token?)')
        return False

    jid = self.username + '@' + self.domain

    if len(token) != 23:
        logging.debug('Token is too short: %d != 23 (maybe not a token?)' % len(token))
        return False

    (version, mac, header) = unpack('> B 16s 6s', token)
    if version != 0:
        logging.debug('Wrong token version (maybe not a token?)')
        return False

    (secretID, expiry) = unpack('> H I', header)
    if expiry < self.now:
        logging.debug('Token has expired')
        return False

    challenge = pack('> B 6s %ds' % len(jid), version, header, jid)
    response = hmac.new(self.secret, challenge, hashlib.sha256).digest()

    # Only the first 16 bytes of the digest are carried in the token.
    return hmac.compare_digest(mac, response[:16])
项目:coralillo    作者:getfleety    | 项目源码 | 文件源码
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise.

    Both values are passed through ``force_bytes`` and then compared with
    ``hmac.compare_digest``.
    """
    left = force_bytes(val1)
    right = force_bytes(val2)
    return hmac.compare_digest(left, right)
项目:private_url_shortener    作者:mya-systems    | 项目源码 | 文件源码
def get_redirect_url(self, pk_encoded, sig):
    """Resolve a shortened URL to the address the client is redirected to.

    ``pk_encoded`` is converted from base 62 to base 10 and used to look up
    a ``ShortenedURL``; ``sig`` must match the stored signature.

    Returns:
        The stored URL for a valid, unexpired request; the configured
        "expired" URL for a valid but expired one; the configured
        "invalid" URL for an unknown key or a wrong signature.  Each case
        is logged through the matching ``log_*`` method.

    Raises:
        Http404: if the resulting URL is empty.
    """
    pk = base(pk_encoded, 62, 10, string=True)

    try:
        short_url = ShortenedURL.objects.get(pk=pk)
    except ShortenedURL.DoesNotExist:
        short_url = None

    if not short_url:
        result = settings.PRIVATE_SHORTENER_REDIRECT_INVALID_URL
        self.log_invalid_pk(pk_encoded)
    elif not hmac.compare_digest(short_url.sig, sig):
        result = settings.PRIVATE_SHORTENER_REDIRECT_INVALID_URL
        self.log_invalid_sig(short_url, sig)
    elif short_url.is_expired:
        result = settings.PRIVATE_SHORTENER_REDIRECT_EXPIRED_URL
        self.log_expired_url(short_url)
    else:
        result = short_url.url

    self.log_result(result)

    if not result:
        raise Http404
    return result
项目:private_url_shortener    作者:mya-systems    | 项目源码 | 文件源码
def has_permission(self, request, view):
    """Allow the request if it carries the configured API secret key.

    Access is always granted when
    ``settings.PRIVATE_SHORTENER_API_SECRET_KEY`` is None.  Otherwise the
    ``HTTP_AUTHORIZATION`` value (empty string if absent) must equal the key.
    """
    secret_key = settings.PRIVATE_SHORTENER_API_SECRET_KEY
    if secret_key is None:
        return True

    supplied_key = request.META.get('HTTP_AUTHORIZATION', '')
    return bool(hmac.compare_digest(secret_key, supplied_key))
项目:aws-lambda-github-webhook    作者:chuyskywalker    | 项目源码 | 文件源码
def validate_secret(header_signature, msg):
    """Validate a webhook signature header of the form ``sha1=<hex digest>``.

    The digest is compared with the HMAC-SHA1 of ``msg`` keyed with the
    contents of ``ghsecret.txt``, read from the parent of this module's
    directory.

    Returns:
        True if the signature matches.  False if the header is missing, is
        not a ``sha1=...`` value or does not match; each case is logged.
    """
    if header_signature is None:
        logger.error("Header signature missing")
        return False

    # partition() never raises, so a header without '=' is rejected here
    # instead of crashing on tuple unpacking.
    sha_name, separator, signature = header_signature.partition('=')
    if not separator or sha_name != 'sha1':
        logger.error("Header signature not signed with sha1")
        return False

    # Read the secret only once the cheap header checks have passed.
    secret_path = os.path.join(os.path.dirname(__file__), "..", "ghsecret.txt")
    with open(secret_path) as f:
        secret = f.read().strip()

    # HMAC requires the key to be bytes, but data is string
    # NOTE(review): str(secret) is only a bytes key on Python 2 -- confirm
    # the target runtime.
    mac = hmac.new(str(secret), msg=msg, digestmod=hashlib.sha1)

    # Get ours vs. theirs
    expected = str(mac.hexdigest())
    received = str(signature)

    # Timing attack secure comparison
    matches = hmac.compare_digest(expected, received)

    if not matches:
        # Log only what the client sent: the expected digest is a valid
        # signature for this payload and must not end up in the logs.
        logger.error("Header signature ({}) does not match expected value".format(received))

    return matches
项目:django-github-webhook    作者:fladi    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
    """Handle a GitHub webhook POST and dispatch it to an event handler.

    The request must carry ``X-Hub-Signature`` (``sha1=<hex digest>``, the
    HMAC-SHA1 of the body keyed with ``self.get_secret()``) and
    ``X-GitHub-Event``.  The event must be listed in
    ``self.get_allowed_events()`` and a method of that name must exist on
    the view; it is called with the decoded JSON payload, the request and
    any extra arguments.

    Returns:
        A ``JsonResponse`` wrapping the handler's return value, or an
        ``HttpResponseBadRequest`` describing the first failed check.

    Raises:
        ImproperlyConfigured: if no secret is configured.
    """
    secret = self.get_secret()
    if not secret:
        raise ImproperlyConfigured('GitHub webhook secret ist not defined.')
    if 'HTTP_X_HUB_SIGNATURE' not in request.META:
        return HttpResponseBadRequest('Request does not contain X-HUB-SIGNATURE header')
    if 'HTTP_X_GITHUB_EVENT' not in request.META:
        return HttpResponseBadRequest('Request does not contain X-GITHUB-EVENT header')

    # partition() never raises: a header without '=' becomes a 400 response
    # instead of an unhandled ValueError.
    digest_name, separator, signature = request.META['HTTP_X_HUB_SIGNATURE'].partition('=')
    if not separator:
        return HttpResponseBadRequest('Invalid X-HUB-SIGNATURE header found')
    if digest_name != 'sha1':
        return HttpResponseBadRequest('Unsupported X-HUB-SIGNATURE digest mode found: {}'.format(digest_name))

    mac = hmac.new(
        secret.encode('utf-8'),
        msg=request.body,
        digestmod=hashlib.sha1
    )
    if not hmac.compare_digest(mac.hexdigest(), signature):
        return HttpResponseBadRequest('Invalid X-HUB-SIGNATURE header found')

    event = request.META['HTTP_X_GITHUB_EVENT']
    if event not in self.get_allowed_events():
        return HttpResponseBadRequest('Unsupported X-GITHUB-EVENT header found: {}'.format(event))
    handler = getattr(self, event, None)
    if not handler:
        return HttpResponseBadRequest('Unsupported X-GITHUB-EVENT header found: {}'.format(event))

    payload = json.loads(request.body.decode('utf-8'))
    response = handler(payload, request, *args, **kwargs)
    return JsonResponse(response)
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def compare(a, b):
    """ Compares two strings and not vulnerable to timing attacks

    Uses ``hmac.compare_digest`` when ``HAVE_COMPARE_DIGEST`` is true.
    Otherwise XORs every character of ``b`` with the character of ``a`` at
    the same position (wrapping around ``a``), seeded with the XOR of the
    two lengths so that different lengths never compare equal.

    Returns True if the strings are equal, False otherwise.
    """
    if HAVE_COMPARE_DIGEST:
        return hmac.compare_digest(a, b)

    # An empty `a` cannot be indexed modulo its length (division by zero);
    # it only equals an empty `b`.
    if len(a) == 0:
        return len(b) == 0

    result = len(a) ^ len(b)
    for index, char_b in enumerate(b):
        result |= ord(a[index % len(a)]) ^ ord(char_b)
    return result == 0
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def webhook_verify_signature(self, data, signature):
    """Verify the content with signature

    The expected signature is the HMAC-SHA1 hex digest of ``data`` keyed
    with the UTF-8 encoded ``self.webhooks_secret``.

    :param data: Request data to be verified
    :param signature: The signature of data
    :type signature: str
    :return: If the content is verified
    :rtype: bool
    """
    key = self.webhooks_secret.encode('utf-8')
    expected = hmac.new(key, data, hashlib.sha1).hexdigest()
    return hmac.compare_digest(expected, signature)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def bytes_eq(a, b):
    """Compare two ``bytes`` objects with ``hmac.compare_digest``.

    Returns:
        True if ``a`` and ``b`` are equal, False otherwise.

    Raises:
        TypeError: if either argument is not a ``bytes`` instance.
    """
    both_bytes = isinstance(a, bytes) and isinstance(b, bytes)
    if not both_bytes:
        raise TypeError("a and b must be bytes.")
    return hmac.compare_digest(a, b)
项目:pyhomekit    作者:henridwyer    | 项目源码 | 文件源码
def m4_receive_srp_verify_response(self,
                                       parsed_ktlvs: Dict[str, bytes]) -> None:
    """Verify accessory's proof.

    The ``kTLVType_State`` entry must decode to 4.  The received
    ``kTLVType_Proof`` is stored (decoded) in ``self.M2`` and compared with
    ``H(self.A, self.M1, self.K)``.

    Raises:
        ValueError: if the state is not 4 or the proof does not match.
    """
    state = from_bytes(parsed_ktlvs['kTLVType_State'], False)
    if state != 4:
        raise ValueError(
            "Received wrong message for M4 {}".format(parsed_ktlvs))

    received_proof = parsed_ktlvs['kTLVType_Proof']
    self.M2 = from_bytes(received_proof)

    expected_proof = to_bytes(H(self.A, self.M1, self.K))
    if not compare_digest(expected_proof, received_proof):
        raise ValueError("Authentication failed - invalid prood received.")
项目:windseed    作者:embali    | 项目源码 | 文件源码
def check_password(self, password=None):
    """
    Check password

    Hashes ``password`` with ``crypt.crypt`` using the stored hash
    ``self.phash`` as salt and compares the result with ``self.phash``.

    Returns True if they match; False otherwise, including when no
    password is given.
    """
    # The signature allows password=None, but crypt.crypt() rejects it.
    if password is None:
        return False

    candidate = crypt.crypt(password, self.phash)
    # Guard against crypt() producing no hash -- presumably possible on
    # some platforms for an unsupported salt; compare_digest needs str.
    if candidate is None:
        return False
    return hmac.compare_digest(self.phash, candidate)
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def compare_digest(s1, s2):
    """Return True if ``s1`` equals ``s2``.

    Delegates to ``hmac.compare_digest`` so that the comparison does not
    stop at the first differing character.  Falls back to ``==`` when that
    function is unavailable or rejects the arguments (mixed types,
    non-ASCII text), so the result is the same as a plain comparison.
    """
    import hmac  # local import keeps this helper self-contained

    try:
        return hmac.compare_digest(s1, s2)
    except (AttributeError, TypeError):
        return s1 == s2

# websocket supported version.
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def _validate(headers, key, subprotocols):
    """Validate the response headers of a WebSocket handshake.

    Checks, in order: every header listed in ``_HEADERS_TO_CHECK`` has the
    expected value (case-insensitively); when ``subprotocols`` is given,
    the ``sec-websocket-protocol`` header names one of them; and the
    ``sec-websocket-accept`` header equals the base64 encoded SHA-1 of
    ``key`` plus the fixed handshake GUID (compared lower-cased).

    Returns:
        ``(True, subproto)`` on success, where ``subproto`` is the
        lower-cased negotiated subprotocol or None; ``(False, None)``
        otherwise.
    """
    subproto = None
    for name, expected in _HEADERS_TO_CHECK.items():
        actual = headers.get(name, None)
        if not actual or expected != actual.lower():
            return False, None

    if subprotocols:
        # The header may be absent: test for that before calling lower(),
        # otherwise a missing header raises AttributeError on None.
        subproto = headers.get("sec-websocket-protocol", None)
        if not subproto or subproto.lower() not in [s.lower() for s in subprotocols]:
            error("Invalid subprotocol: " + str(subprotocols))
            return False, None
        subproto = subproto.lower()

    result = headers.get("sec-websocket-accept", None)
    if not result:
        return False, None
    result = result.lower()

    if isinstance(result, six.text_type):
        result = result.encode('utf-8')

    value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8')
    hashed = base64encode(hashlib.sha1(value).digest()).strip().lower()

    if compare_digest(hashed, result):
        return True, subproto
    return False, None
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def compare_digest(s1, s2):
    """Return True if ``s1`` equals ``s2``.

    Delegates to ``hmac.compare_digest`` so that the comparison does not
    stop at the first differing character.  Falls back to ``==`` when that
    function is unavailable or rejects the arguments (mixed types,
    non-ASCII text), so the result is the same as a plain comparison.
    """
    import hmac  # local import keeps this helper self-contained

    try:
        return hmac.compare_digest(s1, s2)
    except (AttributeError, TypeError):
        return s1 == s2

# websocket supported version.
项目:Treehacks    作者:andrewsy97    | 项目源码 | 文件源码
def _validate(headers, key, subprotocols):
    """Validate the response headers of a WebSocket handshake.

    Checks, in order: every header listed in ``_HEADERS_TO_CHECK`` has the
    expected value (case-insensitively); when ``subprotocols`` is given,
    the ``sec-websocket-protocol`` header names one of them; and the
    ``sec-websocket-accept`` header equals the base64 encoded SHA-1 of
    ``key`` plus the fixed handshake GUID (compared lower-cased).

    Returns:
        ``(True, subproto)`` on success, where ``subproto`` is the
        lower-cased negotiated subprotocol or None; ``(False, None)``
        otherwise.
    """
    subproto = None
    for name, expected in _HEADERS_TO_CHECK.items():
        actual = headers.get(name, None)
        if not actual or expected != actual.lower():
            return False, None

    if subprotocols:
        # The header may be absent: test for that before calling lower(),
        # otherwise a missing header raises AttributeError on None.
        subproto = headers.get("sec-websocket-protocol", None)
        if not subproto or subproto.lower() not in [s.lower() for s in subprotocols]:
            error("Invalid subprotocol: " + str(subprotocols))
            return False, None
        subproto = subproto.lower()

    result = headers.get("sec-websocket-accept", None)
    if not result:
        return False, None
    result = result.lower()

    if isinstance(result, six.text_type):
        result = result.encode('utf-8')

    value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8')
    hashed = base64encode(hashlib.sha1(value).digest()).strip().lower()

    if compare_digest(hashed, result):
        return True, subproto
    return False, None
项目:sample-platform    作者:CCExtractor    | 项目源码 | 文件源码
def compare_digest(a, b):
    """
    ** From Django source **
    Compare two strings character by character without stopping at the
    first difference.

    Returns True if a and b are equal.  Strings of different length are
    rejected immediately with False.
    """
    if len(a) != len(b):
        return False

    # Any differing character pair leaves at least one bit set.
    mismatch = 0
    for char_a, char_b in zip(a, b):
        mismatch |= ord(char_a) ^ ord(char_b)
    return mismatch == 0
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def bytes_eq(a, b):
    """Compare two ``bytes`` objects with ``hmac.compare_digest``.

    Returns:
        True if ``a`` and ``b`` are equal, False otherwise.

    Raises:
        TypeError: if either argument is not a ``bytes`` instance.
    """
    both_bytes = isinstance(a, bytes) and isinstance(b, bytes)
    if not both_bytes:
        raise TypeError("a and b must be bytes.")
    return hmac.compare_digest(a, b)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def bytes_eq(a, b):
    """Compare two ``bytes`` objects with ``hmac.compare_digest``.

    Returns:
        True if ``a`` and ``b`` are equal, False otherwise.

    Raises:
        TypeError: if either argument is not a ``bytes`` instance.
    """
    both_bytes = isinstance(a, bytes) and isinstance(b, bytes)
    if not both_bytes:
        raise TypeError("a and b must be bytes.")
    return hmac.compare_digest(a, b)