Python bitstring 模块,BitArray() 实例源码


项目:sturdy-palm-tree    作者:dominicgs    | 项目源码 | 文件源码
def rx_stream(self, count=None, secs=None):
        start = time.time()
        while count is None or count > 0:
            buffer =, 64)
            if count is not None:
                count -= 1
            if secs is not None:
                if time.time() >= start+secs:
            pkt = BitArray(bytes=buffer)
            metadata = pkt.unpack('uint:8, uint:8, uint:8, uint:8, uint:32,'
                                  'int:8, int:8, int:8, uint:8')
            metanames = ('pkt_type', 'status', 'channel', 'clkn_high',
                         'clk100ns', 'rssi_max', 'rssi_min', 'rss_avg',
            yield dict(zip(metanames, metadata)), pkt[112:]
项目:python-steamcommunity    作者:DrBomb    | 项目源码 | 文件源码
def generateAuthCode(secret,offset=0):
    secret = b64decode(secret)
    secret = BitArray(bytes=secret,length=len(secret)*8)
    buff = BitArray(8*8)
    timestamp = timeOffset(offset)
    buff[4*8:] = int(timestamp/30)
    auth_hmac =,buff.tobytes(),hashlib.sha1)
    hashed = auth_hmac.digest()
    hashed = BitArray(bytes=hashed,length=len(hashed)*8)
    start = hashed[(19*8):(19*8)+8] & BitArray('0x0f')
    hash_slice = hashed[*8:(*8)+(4*8)]
    fullcode = hash_slice & BitArray("0x7fffffff")
    fullcode =
    chars = '23456789BCDFGHJKMNPQRTVWXY'
    code = ""
    for x in range(5):
        code += chars[int(fullcode % len(chars))]
        fullcode = fullcode/len(chars)
    return code
项目:lsdj-wave-cruncher    作者:iLambda    | 项目源码 | 文件源码
def string(length):
    def make_string_field(parent, **field_options):
        length_in_bits = length * 8

        def encode_string(value):
            if type(value) != bytes:
                value = value.encode('utf-8')

            return BitArray(bytes=value)

        def decode_string(encoded):
            return encoded.bytes

        return BreadField(length_in_bits, encode_string, decode_string,
                          str_format=field_options.get('str_format', None))

    return make_string_field
项目:lsdj-wave-cruncher    作者:iLambda    | 项目源码 | 文件源码
def new(spec, type_name='bread_struct', data=None):
    struct = build_struct(spec, type_name)

    if data is None:
        data = BitArray(bytearray(int(math.ceil(len(struct) / 8.0))))

    if len(struct) > len(data):
        raise ValueError(
            ("Data being parsed isn't long enough; expected at least %d "
             "bits, but data is only %d bits long") %
            (len(struct), len(data)))

    struct._offset = 0

    return struct
项目:dmr_utils    作者:n0mjs710    | 项目源码 | 文件源码
def __encode_voice_header( self, _rx_slot, _sync, _dtype ):
        _src_id = _rx_slot.rf_src
        _dst_id = _rx_slot.dst_id
        _cc =
        # create lc
        lc = '\x00\x00\x00' + _dst_id + _src_id         # PF + Reserved + FLCO + FID + Service Options + Group Address + Source Address
        # encode lc into info
        full_lc_encode = bptc.encode_header_lc(lc)
        _rx_slot.emblc = bptc.encode_emblc(lc)          # save off the emb lc for voice frames B-E
        _rx_slot.emblc[5] = bitarray(32)                # NULL message (F)
        # create slot_type
        slot_type = chr((_cc << 4) | (_dtype & 0x0f))   # data type is Header or Term
        # generate FEC for slot type
        slot_with_fec  = BitArray(uint=golay.encode_2087(slot_type), length=20)
        # construct final frame - info[0:98] + slot_type[0:10] + DMR_DATA_SYNC_MS + slot_type[10:20] + info[98:196]
        frame_bits = full_lc_encode[0:98] + slot_with_fec[0:10] + decode.to_bits(_sync) + slot_with_fec[10:20] + full_lc_encode[98:196]
        return decode.to_bytes(frame_bits)

    # Create a voice header DMR frame
项目:dmr_utils    作者:n0mjs710    | 项目源码 | 文件源码
def send_voice72(self, _rx_slot, _ambe):
        ambe72_1 = BitArray('0x' + ahex(_ambe[0:9]))[0:72]
        ambe72_2 = BitArray('0x' + ahex(_ambe[9:18]))[0:72]
        ambe72_3 = BitArray('0x' + ahex(_ambe[18:27]))[0:72]

        ambe49_1 = ambe_utils.convert72BitTo49BitAMBE(ambe72_1)
        ambe49_2 = ambe_utils.convert72BitTo49BitAMBE(ambe72_2)
        ambe49_3 = ambe_utils.convert72BitTo49BitAMBE(ambe72_3)


        ambe = ambe49_1 + ambe49_2 + ambe49_3
        _frame = self._tempVoice[_rx_slot.vf][:33] + ambe.tobytes() + self._tempVoice[_rx_slot.vf][52:]    # Insert the 3 49 bit AMBE frames
        self.rewriteFrame(_frame, _rx_slot.slot, _rx_slot.dst_id, _rx_slot.rf_src, _rx_slot.repeater_id)
        _rx_slot.vf = (_rx_slot.vf + 1) % 6                         # the voice frame counter which is always mod 6
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getBigEndian(hex_val):

    step0 = "0x" + hex_val
    step1 = BitArray(step0)

    step2 = step1.hex

    step3 = step2[::-1]

    step4 = len(step2)
    for b in range(0,step4,2):
        revString = step3[b:(b+2)]
        string += revString[::-1]

    return string
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getDifficult(bits):
    step0 = getBigEndian(bits)    
    step1 = "0x"+step0
    step2 = BitArray(step1)
    step3 = step2.hex

    first_pair ="0x" + step3[:2]
    firstPair = BitArray(first_pair)

    second_pair ="0x" + step3[2:8]
    secondPair = BitArray(second_pair)

    firstPairVal =
    secondPairVal =

    formula = 2**(8*(firstPairVal -3))
    bitsDec = secondPairVal * formula

    highestDifficulty = BitArray("0x00000000FFFF0000000000000000000000000000000000000000000000000000")
    highDiffDec =

    answer = float(highDiffDec/bitsDec)
    return answer
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeBlockHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getDifficult(bits):
    step0 = getBigEndian(bits)    
    step1 = "0x"+step0
    step2 = BitArray(step1)
    step3 = step2.hex

    first_pair ="0x" + step3[:2]
    firstPair = BitArray(first_pair)

    second_pair ="0x" + step3[2:8]
    secondPair = BitArray(second_pair)

    firstPairVal =
    secondPairVal =

    formula = 2**(8*(firstPairVal -3))
    bitsDec = secondPairVal * formula

    highestDifficulty = BitArray("0x00000000FFFF0000000000000000000000000000000000000000000000000000")
    highDiffDec =

    answer = float(highDiffDec/bitsDec)
    return answer
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeBlockHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeTransHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7

#address must be in decimal form before entering it into base58encode
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getBigEndian(hex_val):

    step0 = "0x" + hex_val
    step1 = BitArray(step0)

    step2 = step1.hex

    step3 = step2[::-1]

    step4 = len(step2)
    for b in range(0,step4,2):
        revString = step3[b:(b+2)]
        string += revString[::-1]

    return string
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeBlockHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeTransHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7

#address must be in decimal form before entering it into base58encode
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getBigEndian(hex_val):

    step0 = "0x" + hex_val
    step1 = BitArray(step0)

    step2 = step1.hex

    step3 = step2[::-1]

    step4 = len(step2)
    for b in range(0,step4,2):
        revString = step3[b:(b+2)]
        string += revString[::-1]

    return string
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getDifficult(bits):
    step0 = getBigEndian(bits)    
    step1 = "0x"+step0
    step2 = BitArray(step1)
    step3 = step2.hex

    first_pair ="0x" + step3[:2]
    firstPair = BitArray(first_pair)

    second_pair ="0x" + step3[2:8]
    secondPair = BitArray(second_pair)

    firstPairVal =
    secondPairVal =

    formula = 2**(8*(firstPairVal -3))
    bitsDec = secondPairVal * formula

    highestDifficulty = BitArray("0x00000000FFFF0000000000000000000000000000000000000000000000000000")
    highDiffDec =

    answer = float(highDiffDec/bitsDec)
    return answer
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeBlockHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeTransHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7

#address must be in decimal form before entering it into base58encode
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getBigEndian(hex_val):

    step0 = "0x" + hex_val
    step1 = BitArray(step0)

    step2 = step1.hex

    step3 = step2[::-1]

    step4 = len(step2)
    for b in range(0,step4,2):
        revString = step3[b:(b+2)]
        string += revString[::-1]

    return string
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getDifficult(bits):
    step0 = getBigEndian(bits)    
    step1 = "0x"+step0
    step2 = BitArray(step1)
    step3 = step2.hex

    first_pair ="0x" + step3[:2]
    firstPair = BitArray(first_pair)

    second_pair ="0x" + step3[2:8]
    secondPair = BitArray(second_pair)

    firstPairVal =
    secondPairVal =

    formula = 2**(8*(firstPairVal -3))
    bitsDec = secondPairVal * formula

    highestDifficulty = BitArray("0x00000000FFFF0000000000000000000000000000000000000000000000000000")
    highDiffDec =

    answer = round(highDiffDec/bitsDec,2)
    return answer
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeBlockHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeTransHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7

#address must be in decimal form before entering it into base58encode
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getBigEndian(hex_val):

    step0 = "0x" + hex_val
    step1 = BitArray(step0)

    step2 = step1.hex

    step3 = step2[::-1]

    step4 = len(step2)
    for b in range(0,step4,2):
        revString = step3[b:(b+2)]
        string += revString[::-1]

    return string
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getDifficult(bits):
    step0 = getBigEndian(bits)    
    step1 = "0x"+step0
    step2 = BitArray(step1)
    step3 = step2.hex

    first_pair ="0x" + step3[:2]
    firstPair = BitArray(first_pair)

    second_pair ="0x" + step3[2:8]
    secondPair = BitArray(second_pair)

    firstPairVal =
    secondPairVal =

    formula = 2**(8*(firstPairVal -3))
    bitsDec = secondPairVal * formula

    highestDifficulty = BitArray("0x00000000FFFF0000000000000000000000000000000000000000000000000000")
    highDiffDec =

    answer = float(highDiffDec/bitsDec)
    return answer
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeBlockHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getDifficult(bits):
    step0 = getBigEndian(bits)    
    step1 = "0x"+step0
    step2 = BitArray(step1)
    step3 = step2.hex

    first_pair ="0x" + step3[:2]
    firstPair = BitArray(first_pair)

    second_pair ="0x" + step3[2:8]
    secondPair = BitArray(second_pair)

    firstPairVal =
    secondPairVal =

    formula = 2**(8*(firstPairVal -3))
    bitsDec = secondPairVal * formula

    highestDifficulty = BitArray("0x00000000FFFF0000000000000000000000000000000000000000000000000000")
    highDiffDec =

    answer = float(highDiffDec/bitsDec)
    return answer
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeBlockHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeTransHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7

#address must be in decimal form before entering it into base58encode
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeBlockHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getBigEndian(hex_val):

    step0 = "0x" + hex_val
    step1 = BitArray(step0)

    step2 = step1.hex

    step3 = step2[::-1]

    step4 = len(step2)
    for b in range(0,step4,2):
        revString = step3[b:(b+2)]
        string += revString[::-1]

    return string
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getDifficult(bits):
    step0 = getBigEndian(bits)    
    step1 = "0x"+step0
    step2 = BitArray(step1)
    step3 = step2.hex

    first_pair ="0x" + step3[:2]
    firstPair = BitArray(first_pair)

    second_pair ="0x" + step3[2:8]
    secondPair = BitArray(second_pair)

    firstPairVal =
    secondPairVal =

    formula = 2**(8*(firstPairVal -3))
    bitsDec = secondPairVal * formula

    highestDifficulty = BitArray("0x00000000FFFF0000000000000000000000000000000000000000000000000000")
    highDiffDec =

    answer = round(highDiffDec/bitsDec,2)
    return answer
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getBigEndian(hex_val):

    step0 = "0x" + hex_val
    step1 = BitArray(step0)

    step2 = step1.hex

    step3 = step2[::-1]

    step4 = len(step2)
    for b in range(0,step4,2):
        revString = step3[b:(b+2)]
        string += revString[::-1]

    return string
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def getDifficult(bits):
    step0 = getBigEndian(bits)    
    step1 = "0x"+step0
    step2 = BitArray(step1)
    step3 = step2.hex

    first_pair ="0x" + step3[:2]
    firstPair = BitArray(first_pair)

    second_pair ="0x" + step3[2:8]
    secondPair = BitArray(second_pair)

    firstPairVal =
    secondPairVal =

    formula = 2**(8*(firstPairVal -3))
    bitsDec = secondPairVal * formula

    highestDifficulty = BitArray("0x00000000FFFF0000000000000000000000000000000000000000000000000000")
    highDiffDec =

    answer = float(highDiffDec/bitsDec)
    return answer
项目:NuExplorerOS    作者:JetJet13    | 项目源码 | 文件源码
def computeTransHash(rawHash):

    initial_step = "0x" + rawHash
    primary_step = BitArray(initial_step)
    step0 = primary_step.bytes

    step1 = hashlib.sha256(step0)
    step2 = step1.digest()

    step3 = hashlib.sha256(step2)
    step4 = step3.hexdigest()

    step5 = "0x" + step4 
    step6 = BitArray(step5)

    step7 = hexHashReverser(step6)
    return step7

#address must be in decimal form before entering it into base58encode
项目:python-pythorrent    作者:yamatt    | 项目源码 | 文件源码
def recv_bitfield(self, length):
        Received only at the start of a connection when a peer wants to
        tell you all the pieces it has in a very compact form.
        :param length: The size of the payload, a number of bits
            representing the number of pieces
        payload = self.recv(length)
        bits = BitArray(bytes=payload)
        for i in range(len(self.torrent.pieces)):
            sha, piece = self.torrent.pieces.items()[i]
            piece = self.PIECE(sha, self, have=bits[i])
            self.pieces[sha] = piece
项目:python-pythorrent    作者:yamatt    | 项目源码 | 文件源码
def send_bitfield(self):
        Tell the peer all the pieces you have in a really compact form.
        # the pieces are compacted in sequence in to bits
        field = BitArray(
                lambda (sha, piece): sha == piece.digest,
        self.send_payload(5, field.tobytes())
项目:fdxread    作者:lkarsten    | 项目源码 | 文件源码
def checklength(pdu, speclen):
    "pdu is hex encoded, 4 bits per char."
    assert type(pdu) == str
    assert speclen is None or isinstance(speclen, int)

    assert len(pdu) % 2 == 0

    if speclen is not None:
        if len(pdu)/2 != speclen:
            raise DataError("mtype=0x%s: Incorrect length %s (got %s) body: %s"
                            % (pdu[:3*2], speclen, len(pdu)/2, pdu[3*2:]))
    return BitArray(hex=pdu[3*2:-1*2])
项目:fdxread    作者:lkarsten    | 项目源码 | 文件源码
def intdecoder(body, width=8, signed=False):
    assert type(body) == BitArray
    assert width % 8 == 0
    assert width in [8, 16]  # for now, due to fmt.
    fmt = "%03i"
    if width == 16:
        fmt = "%05i"

    s = []
    for idx in range(0, body.len, width):
        value = body[idx:idx+width]
        s += [fmt % (value.intle if signed else value.uintle)]
    return [("ints", " ".join(s)), ('strbody', body.hex)]
项目:PyHIDParser    作者:NZSmartie    | 项目源码 | 文件源码
def pack(self):
        values = _BitArray(self.count*self.size)
        for i in range(self.count):
            offset = i * self.size
                values[offset:offset + self.size] = int(self.physical_range.scale_to(self.logical_range, self._values[i]))
            except ArithmeticError:
                # If the value is outside of the physical range, and NULLs are allowed, then do not modify the value
                if not self.flags & ReportFlags.NULL_STATE:
        return values
项目:PyHIDParser    作者:NZSmartie    | 项目源码 | 文件源码
def serialize(self, report_type: ReportType) -> bytes:
        data = _BitArray()
        for item in self.items:
            if isinstance(item, Report):
                if item.report_type is not report_type:
        return data
项目:tagberry    作者:csailer    | 项目源码 | 文件源码
def toHex(self):
        '''Returns a hex representation of the EPCNumber'''   
        #h = BitArray("0b%s" % self._bits)
        h = hex(int(self._bits,2))
        return h[2:].upper()
项目:tagberry    作者:csailer    | 项目源码 | 文件源码
def toHex(self):
        '''Returns a hex representation of the GDTI-113. Need to make the bits divisble by 4'''   
        h = BitArray("0b%s%s" % ("0",self._bits))
        return h.hex[2:].upper()
项目:tagberry    作者:csailer    | 项目源码 | 文件源码
def getBits(self):
        if(self._fieldValue!=None and self._bitLength!=None and self._bitLength>0):
            ui = BitArray(uint = Conversion().uint32(self._fieldValue),length=self._bitLength)
            return ui.bin[2:]
            return "0".zfill(self._bitLength)
项目:lightning-integration    作者:cdecker    | 项目源码 | 文件源码
def u5_to_bitarray(arr):
    ret = bitstring.BitArray()
    for a in arr:
        ret += bitstring.pack("uint:5", a)
    return ret
项目:lightning-integration    作者:cdecker    | 项目源码 | 文件源码
def tagged_bytes(char, l):
    return tagged(char, bitstring.BitArray(l))

# Discard trailing bits, convert to bytes.
项目:blobxfer    作者:Azure    | 项目源码 | 文件源码
def __init__(self, src_ase, dst_ase, block_list, options, resume_mgr):
        # type: (Descriptior,,
        #, list,
        #        blobxfer.models.options.SyncCopy,
        #        blobxfer.operations.resume.SyncCopyResumeManager) -> None
        """Ctor for Descriptor
        :param Descriptor self: this
        :param src_ase:
            source Azure Storage Entity
        :param dst_ase:
            destination Azure Storage Entity
        :param list block_list: source blob block list
        :param blobxfer.models.options.SyncCopy options: synccopy options
        :param blobxfer.operations.resume.SyncCopyResumeManager resume_mgr:
            synccopy resume manager
        self._offset = 0
        self._chunk_num = 0
        self._finalized = False
        self._meta_lock = threading.Lock()
        self._resume_mgr = resume_mgr
        self._src_ase = src_ase
        self._dst_ase = dst_ase
        self._src_block_list = block_list
        self._chunk_size = self._compute_chunk_size()
        # calculate the total number of ops required for transfer
        self._total_chunks = self._compute_total_chunks(self._chunk_size)
        self._outstanding_ops = self._total_chunks
        if blobxfer.util.is_not_empty(self._dst_ase.replica_targets):
            self._outstanding_ops *= len(self._dst_ase.replica_targets) + 1
        if self._resume_mgr:
            self._completed_chunks = bitstring.BitArray(
            self._replica_counters = {}
项目:dht-spider    作者:neoql    | 项目源码 | 文件源码
def __init__(self, node_id):
        self.node_id = BitArray(node_id)
        self.root = Bucket(self.node_id)
        self.__nodes = {}
项目:dht-spider    作者:neoql    | 项目源码 | 文件源码
def get_neighbor(self, info_hash):
        info_hash = BitArray(info_hash)
        return self.root.get_neighbor(info_hash)[0]
项目:dht-spider    作者:neoql    | 项目源码 | 文件源码
def __init__(self, node_id, addr):
        self.nid = BitArray(bytes=node_id)
        self.addr = addr
        self.dubious = False
        self.is_bad = False
        self.last_fresh = time.time()