Python base58 模块,b58decode_check() 实例源码

我们从 Python 开源项目中提取了以下 11 个代码示例,用于说明如何使用 base58.b58decode_check()。

项目:lightning-integration    作者:cdecker    | 项目源码 | 文件源码
def encode_fallback(fallback, currency):
    """ Build the tagged 'f' field for a fallback address.

    A bech32 address keeps the witness version it carries; a base58check
    address is mapped to version 17 (P2PKH) or 18 (P2SH).  Only the 'bc'
    and 'tb' currencies are handled.
    """
    if currency not in ('bc', 'tb'):
        raise NotImplementedError("Support for currency {} not implemented".format(currency))

    hrp, data = bech32_decode(fallback)
    if hrp:
        # bech32 address: its prefix has to match the requested currency.
        if hrp != currency:
            raise ValueError("Not a bech32 address for this currency")
        version = data[0]
        if version > 16:
            raise ValueError("Invalid witness version {}".format(data[0]))
        program = u5_to_bitarray(data[1:])
    else:
        # Not bech32: decode as base58check and classify by the first byte.
        decoded = base58.b58decode_check(fallback)
        prefix = decoded[0]
        if is_p2pkh(currency, prefix):
            version = 17
        elif is_p2sh(currency, prefix):
            version = 18
        else:
            raise ValueError("Unknown address type for {}".format(currency))
        program = decoded[1:]

    return tagged('f', bitstring.pack("uint:5", version) + program)
项目:DnsE16    作者:pooleja    | 项目源码 | 文件源码
def validate_sig(body, sig_str, pkh):
    """
    Check that ``sig_str`` is a valid signature over ``body`` for ``pkh``.

    Returns True on success; every failure is reported by raising.
    """
    if pkh is None:
        raise PermissionError("pkh not found.")

    # The pkh must survive a base58check decode and have a plausible length.
    base58.b58decode_check(pkh)
    if not 20 <= len(pkh) <= 40:
        raise PermissionError("Invalid pkh")

    try:
        if not sig_str:
            raise PermissionError("X-Bitcoin-Sig header not found.")
        verified = wallet.verify_bitcoin_message(body, sig_str, pkh)
        if not verified:
            raise PermissionError("X-Bitcoin-Sig header not valid.")
    except Exception as failure:
        # Any failure, including the two raised just above, is logged and
        # then reported to the caller under a single message.
        logger.error("Failure: {0}".format(failure))
        raise PermissionError("X-Bitcoin-Sig header validation failed.")

    return True
项目:lightning-payencode    作者:rustyrussell    | 项目源码 | 文件源码
def encode_fallback(fallback, currency):
    """ Encode a fallback address as a tagged 'f' field.

    Accepts bech32 and base58check addresses for the 'bc' and 'tb'
    currencies; any other currency raises NotImplementedError.
    """
    supported = currency == 'bc' or currency == 'tb'
    if not supported:
        raise NotImplementedError("Support for currency {} not implemented".format(currency))

    fbhrp, witness = bech32_decode(fallback)
    if not fbhrp:
        # No bech32 prefix decoded, so treat the string as base58check.
        raw = base58.b58decode_check(fallback)
        lead = raw[0]
        if is_p2pkh(currency, lead):
            wver = 17
        elif is_p2sh(currency, lead):
            wver = 18
        else:
            raise ValueError("Unknown address type for {}".format(currency))
        wprog = raw[1:]
    else:
        if fbhrp != currency:
            raise ValueError("Not a bech32 address for this currency")
        wver = witness[0]
        if wver > 16:
            raise ValueError("Invalid witness version {}".format(witness[0]))
        wprog = u5_to_bitarray(witness[1:])

    version_bits = bitstring.pack("uint:5", wver)
    return tagged('f', version_bits + wprog)
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def from_b58check(private_key):
    """ Decodes a Base58Check encoded private-key.

    Args:
        private_key (str): A Base58Check encoded private key.

    Returns:
        PrivateKey: A PrivateKey object

    Raises:
        ValueError: If the decoded version byte is neither
            PrivateKey.TESTNET_VERSION nor PrivateKey.MAINNET_VERSION.
    """
    b58dec = base58.b58decode_check(private_key)
    version = b58dec[0]
    # Raise explicitly instead of using ``assert``: assertions are removed
    # when Python runs with -O, which would let an unknown version through.
    if version not in (PrivateKey.TESTNET_VERSION,
                       PrivateKey.MAINNET_VERSION):
        raise ValueError(
            "Unsupported private key version byte: {!r}".format(version))

    # Everything after the version byte is the big-endian key integer.
    return PrivateKey(int.from_bytes(b58dec[1:], 'big'))
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def from_b58check(key):
    """ Build an HD key from its Base58Check serialization.

    The decoded payload is expected to follow the format described in:
    https://github.com/bitcoin/bips/blob/master/bip-0032.mediawiki#serialization-format

    Args:
        key (str): A Base58Check encoded key.

    Returns:
        HDPrivateKey or HDPublicKey:
            Whichever kind of key ``HDKey.from_bytes`` reconstructs
            from the decoded payload.
    """
    raw = base58.b58decode_check(key)
    return HDKey.from_bytes(raw)
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def address_to_key_hash(s):
    """ Split a Base58Check-encoded Bitcoin address into its version
    and key hash.

    Args:
        s (bytes): The Bitcoin address to decode

    Returns:
        (version, h160) (tuple): The first decoded byte (the version)
        and the remaining bytes (the RIPEMD-160 hash of the public key).
    """
    payload = base58.b58decode_check(s)
    return payload[0], payload[1:]
项目:ethereum-bip44-python    作者:michailbrynard    | 项目源码 | 文件源码
def from_b58check(private_key):
    """ Decodes a Base58Check encoded private-key.

    Args:
        private_key (str): A Base58Check encoded private key.

    Returns:
        PrivateKey: A PrivateKey object

    Raises:
        ValueError: If the decoded version byte is neither
            PrivateKey.TESTNET_VERSION nor PrivateKey.MAINNET_VERSION.
    """
    b58dec = base58.b58decode_check(private_key)
    version = b58dec[0]
    # Validate with a real exception: an ``assert`` here would be compiled
    # out under ``python -O`` and silently accept any version byte.
    allowed_versions = (PrivateKey.TESTNET_VERSION,
                        PrivateKey.MAINNET_VERSION)
    if version not in allowed_versions:
        raise ValueError(
            "Unsupported private key version byte: {!r}".format(version))

    # The bytes following the version byte encode the key, big-endian.
    return PrivateKey(int.from_bytes(b58dec[1:], 'big'))
项目:ethereum-bip44-python    作者:michailbrynard    | 项目源码 | 文件源码
def from_b58check(key):
    """ Reconstruct an HD key object from a Base58Check string.

    The encoding must conform to the description in:
    https://github.com/bitcoin/bips/blob/master/bip-0032.mediawiki#serialization-format

    Args:
        key (str): A Base58Check encoded key.

    Returns:
        HDPrivateKey or HDPublicKey:
            The private or public HD key that ``HDKey.from_bytes``
            produces for the serialized data.
    """
    serialized = base58.b58decode_check(key)
    hd_key = HDKey.from_bytes(serialized)
    return hd_key
项目:bitcoin_in_a_nutshell    作者:pavlovdog    | 项目源码 | 文件源码
def coinbaseMessage(previous_output, receiver_address, my_address, private_key):
    """Build, sign and serialize a one-input, one-output transaction.

    NOTE: this is Python 2 code. It relies on the ``"hex"`` codec of
    ``str.encode`` / ``str.decode`` and on concatenating ``str`` with the
    byte strings returned by ``struct.pack``.

    Args:
        previous_output: Hex string; it is hex-decoded and byte-reversed to
            form the input's outpoint hash.
        receiver_address: Base58Check address whose payload (minus the first
            byte) is embedded in the output script.
        my_address: Base58Check address whose payload (minus the first byte)
            is embedded in the script used while signing.
        private_key: Hex string of the raw signing key on the SECP256k1 curve.

    Returns:
        The serialized transaction as a byte string.
    """
    # Drop the leading byte of each decoded address and keep the rest as hex.
    receiver_hashed_pubkey= base58.b58decode_check(receiver_address)[1:].encode("hex")
    my_hashed_pubkey = base58.b58decode_check(my_address)[1:].encode("hex")

    # Fixed header/trailer fields, each a little-endian 32-bit integer.
    version = struct.pack("<L", 1)
    lock_time = struct.pack("<L", 0)
    hash_code = struct.pack("<L", 1)

    # Transaction input (exactly one).
    tx_in_count = struct.pack("<B", 1)
    tx_in = {}
    # The hex string is decoded and then byte-reversed.
    tx_in["outpoint_hash"] = previous_output.decode('hex')[::-1]
    # The outpoint index is hard-coded to 4294967295 (0xffffffff).
    tx_in["outpoint_index"] = struct.pack("<L", 4294967295)
    # NOTE(review): 76a914<hash>88ac looks like the standard pay-to-pubkey-hash
    # script template - confirm.
    tx_in["script"] = ("76a914%s88ac" % my_hashed_pubkey).decode("hex")
    tx_in["script_bytes"] = struct.pack("<B", (len(tx_in["script"])))
    tx_in["sequence"] = "ffffffff".decode("hex")

    # Transaction output (exactly one).
    tx_out_count = struct.pack("<B", 1)

    tx_out = {}
    # The output value is hard-coded.
    # NOTE(review): the product is a float handed to an integer "<Q" format;
    # verify the packed amount is the intended one.
    tx_out["value"]= struct.pack("<Q", 100000000 * 12.50033629)
    tx_out["pk_script"]= ("76a914%s88ac" % receiver_hashed_pubkey).decode("hex")
    tx_out["pk_script_bytes"]= struct.pack("<B", (len(tx_out["pk_script"])))

    # Serialization that gets hashed and signed; note it ends with hash_code.
    tx_to_sign = (version + tx_in_count + tx_in["outpoint_hash"] + tx_in["outpoint_index"] +
                  tx_in["script_bytes"] + tx_in["script"] + tx_in["sequence"] + tx_out_count +
                  tx_out["value"] + tx_out["pk_script_bytes"] + tx_out["pk_script"] + lock_time + hash_code)

    # Sign the double-SHA256 digest of the serialization; signature is DER-encoded.
    hashed_raw_tx = hashlib.sha256(hashlib.sha256(tx_to_sign).digest()).digest()
    sk = ecdsa.SigningKey.from_string(private_key.decode("hex"), curve = ecdsa.SECP256k1)
    vk = sk.verifying_key
    # Byte 0x04 is prepended to the raw verifying key.
    # NOTE(review): presumably the uncompressed-public-key marker - confirm.
    public_key = ('\04' + vk.to_string()).encode("hex")
    sign = sk.sign_digest(hashed_raw_tx, sigencode=ecdsa.util.sigencode_der)

    # Signature script: signature, byte 0x01, public-key length, public key.
    # NOTE(review): the 0x01 byte presumably mirrors hash_code above - confirm.
    sigscript = sign + "\01" + struct.pack("<B", len(public_key.decode("hex"))) + public_key.decode("hex")

    # Final serialization: the placeholder input script is replaced by two
    # length bytes plus sigscript, and hash_code is not appended.
    real_tx = (version + tx_in_count + tx_in["outpoint_hash"] + tx_in["outpoint_index"] +
    struct.pack("<B", (len(sigscript) + 1)) + struct.pack("<B", len(sign) + 1) + sigscript +
    tx_in["sequence"] + tx_out_count + tx_out["value"] + tx_out["pk_script_bytes"] + tx_out["pk_script"] + lock_time)

    return real_tx
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def _flush(client, wallet, machine_auth, amount=None, payout_address=None, silent=False,
           to_primary=False):
    """ Push the current off-chain buffer out to the blockchain.

    Args:
        client (two1.server.rest_client.TwentyOneRestClient): an object for
            sending authenticated requests to the TwentyOne backend.
        wallet (two1.wallet.Wallet): a user's wallet instance.
        machine_auth: passed through to the wallet-selection and
            confirmation helpers.
        amount (int): The amount to be flushed. Should be more than 10k.
        payout_address (string): The address to flush the Bitcoins to.
        silent (boolean): If True, disables the confirmation prompt.
        to_primary (boolean): If True, flushes to the primary wallet.

    Raises:
        ServerRequestError: if the server reports an error that is not one
            of the cases logged for the user below.
    """
    # A caller-supplied address must at least decode as Base58Check.
    if payout_address:
        try:
            base58.b58decode_check(payout_address)
        except ValueError:
            logger.error(uxstring.UxString.flush_invalid_address)
            return

    # Decide which wallet, and which of its addresses, receives the flush.
    wallets = client.list_wallets()
    target_address, target_name = _select_flush_wallet(
        client, machine_auth, payout_address, wallets, to_primary)

    if not silent:
        # With no explicit payout_address the flush counts as local.
        confirmed = _show_confirmation(
            machine_auth, amount, wallets, target_address, target_name,
            to_primary, payout_address is None)
        if not confirmed:
            return

    # Ask the backend to flush and report the outcome.
    try:
        response = client.flush_earnings(amount=amount, payout_address=target_address)
        if response.ok:
            logger.info(uxstring.UxString.flush_success.format(target_address))
    except exceptions.ServerRequestError as ex:
        status = ex.status_code
        if status == 401:
            logger.info(ex.message)
        elif status == 400 and ex.data.get("error") == "TO500":
            logger.info(uxstring.UxString.flush_not_enough_earnings.format(amount), fg="red")
        elif status == 403 and ex.data.get("error") == "social_account_required_to_flush":
            logger.info("You must connect a social account with your 21.co account before you can flush.", fg="red")
        else:
            raise ex
项目:neo-python    作者:CityOfZion    | 项目源码 | 文件源码
def PrivateKeyFromNEP2(nep2_key, passphrase):
    """
    Gets the private key from a NEP-2 encrypted private key

    Args:
        nep2_key (str): The nep-2 encrypted private key
        passphrase (str): The password to decrypt the private key with, as unicode string

    Returns:
        bytes: The private key

    Raises:
        ValueError: if nep2_key is missing or not 58 characters long, if it
            fails the Base58Check decode, or if the passphrase is wrong.
    """
    # Take the length defensively so that a None key produces the
    # ValueError below instead of a TypeError from len(None).
    key_length = len(nep2_key) if nep2_key else 0
    if key_length != 58:
        raise ValueError('Please provide a nep2_key with a length of 58 bytes (LEN: {0:d})'.format(key_length))

    ADDRESS_HASH_SIZE = 4
    ADDRESS_HASH_OFFSET = len(NEP_FLAG) + len(NEP_HEADER)

    try:
        decoded_key = base58.b58decode_check(nep2_key)
    except Exception as e:
        # Keep the underlying decode failure attached for debugging.
        raise ValueError("Invalid nep2_key") from e

    # The address hash sits right after the flag and header bytes; the
    # encrypted key material is the last 32 bytes.
    address_hash = decoded_key[ADDRESS_HASH_OFFSET:ADDRESS_HASH_OFFSET + ADDRESS_HASH_SIZE]
    encrypted = decoded_key[-32:]

    # Derive the key material from the NFC-normalized passphrase, using the
    # address hash as the scrypt salt.
    pwd_normalized = bytes(unicodedata.normalize('NFC', passphrase), 'utf-8')
    derived = scrypt.hash(pwd_normalized, address_hash,
                          N=SCRYPT_ITERATIONS,
                          r=SCRYPT_BLOCKSIZE,
                          p=SCRYPT_PARALLEL_FACTOR,
                          buflen=SCRYPT_KEY_LEN_BYTES)

    # First 32 bytes are the XOR mask, the remainder is the AES key.
    derived1 = derived[:32]
    derived2 = derived[32:]

    cipher = AES.new(derived2, AES.MODE_ECB)
    decrypted = cipher.decrypt(encrypted)
    private_key = xor_bytes(decrypted, derived1)

    # Now check that the address hashes match. If they don't, the password was wrong.
    kp_new = KeyPair(priv_key=private_key)
    kp_new_address = kp_new.GetAddress()
    first_digest = hashlib.sha256(kp_new_address.encode('utf-8')).digest()
    second_digest = hashlib.sha256(first_digest).digest()
    kp_new_address_hash = second_digest[:ADDRESS_HASH_SIZE]
    if kp_new_address_hash != address_hash:
        raise ValueError("Wrong passphrase")

    return private_key