Python random 模块,getrandbits() 实例源码

我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用random.getrandbits()

项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_uint_multi_port(self, x_series_device, seed):
        # Reset the pseudorandom number generator with seed.
        random.seed(seed)

        do_ports = random.sample(
            [d for d in x_series_device.do_ports if d.do_port_width <= 16], 2)

        total_port_width = sum([d.do_port_width for d in do_ports])

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                flatten_channel_string([d.name for d in do_ports]),
                line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

            # Generate random values to test.
            values_to_test = [int(random.getrandbits(total_port_width))
                              for _ in range(10)]

            values_read = []
            for value_to_test in values_to_test:
                task.write(value_to_test)
                time.sleep(0.001)
                values_read.append(task.read())

            assert values_read == values_to_test
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_one_sample_one_line(self, x_series_device, seed):
        # Reset the pseudorandom number generator with seed.
        random.seed(seed)

        do_line = random.choice(x_series_device.do_lines).name

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                do_line, line_grouping=LineGrouping.CHAN_PER_LINE)

            writer = DigitalSingleChannelWriter(task.out_stream)
            reader = DigitalSingleChannelReader(task.in_stream)

            # Generate random values to test.
            values_to_test = [bool(random.getrandbits(1)) for _ in range(10)]

            values_read = []
            for value_to_test in values_to_test:
                writer.write_one_sample_one_line(value_to_test)
                time.sleep(0.001)
                values_read.append(reader.read_one_sample_one_line())

            numpy.testing.assert_array_equal(values_read, values_to_test)
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_one_sample_port_byte(self, x_series_device, seed):
        # Reset the pseudorandom number generator with seed.
        random.seed(seed)

        do_port = random.choice(
            [d for d in x_series_device.do_ports if d.do_port_width <= 8])

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

            # Generate random values to test.
            values_to_test = [int(random.getrandbits(do_port.do_port_width))
                              for _ in range(10)]

            writer = DigitalSingleChannelWriter(task.out_stream)
            reader = DigitalSingleChannelReader(task.in_stream)

            values_read = []
            for value_to_test in values_to_test:
                writer.write_one_sample_port_byte(value_to_test)
                time.sleep(0.001)
                values_read.append(reader.read_one_sample_port_byte())

            numpy.testing.assert_array_equal(values_read, values_to_test)
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_one_sample_port_uint16(self, x_series_device, seed):
        # Reset the pseudorandom number generator with seed.
        random.seed(seed)

        do_port = random.choice(
            [do for do in x_series_device.do_ports if do.do_port_width <= 16])

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

            # Generate random values to test.
            values_to_test = [int(random.getrandbits(do_port.do_port_width))
                              for _ in range(10)]

            writer = DigitalSingleChannelWriter(task.out_stream)
            reader = DigitalSingleChannelReader(task.in_stream)

            values_read = []
            for value_to_test in values_to_test:
                writer.write_one_sample_port_uint16(value_to_test)
                time.sleep(0.001)
                values_read.append(reader.read_one_sample_port_uint16())

            numpy.testing.assert_array_equal(values_read, values_to_test)
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_one_sample_port_uint32(self, x_series_device, seed):
        # Reset the pseudorandom number generator with seed.
        random.seed(seed)

        do_port = random.choice(
            [do for do in x_series_device.do_ports if do.do_port_width <= 32])

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

            # Generate random values to test.
            values_to_test = [int(random.getrandbits(do_port.do_port_width))
                              for _ in range(10)]

            writer = DigitalSingleChannelWriter(task.out_stream)
            reader = DigitalSingleChannelReader(task.in_stream)

            values_read = []
            for value_to_test in values_to_test:
                writer.write_one_sample_port_uint32(value_to_test)
                time.sleep(0.001)
                values_read.append(reader.read_one_sample_port_uint32())

            numpy.testing.assert_array_equal(values_read, values_to_test)
项目:run_lambda    作者:ethantkoenig    | 项目源码 | 文件源码
def random_context():
        result = {}
        if random.getrandbits(1):
            result["function_name"] = RunLambdaCliTest.random_string()
        if random.getrandbits(1):
            result["function_version"] = RunLambdaCliTest.random_string()
        if random.getrandbits(1):
            result["invoked_function_arn"] = RunLambdaCliTest.random_string()
        if random.getrandbits(1):
            result["memory_limit_in_mb"] = str(random.randint(100, 200))
        if random.getrandbits(1):
            result["aws_request_id"] = RunLambdaCliTest.random_string()
        if random.getrandbits(1):
            result["log_group_name"] = RunLambdaCliTest.random_string()
        if random.getrandbits(1):
            result["log_stream_name"] = RunLambdaCliTest.random_string()
        if random.getrandbits(1):
            result["identity"] = RunLambdaCliTest.random_identity()
        if random.getrandbits(1):
            result["client_context"] = RunLambdaCliTest.random_client_context()
        return result
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, *args, **kwds):
        ## Create shared memory for rendered image
        #pg.dbg(namespace={'r': self})
        if sys.platform.startswith('win'):
            self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
            self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
        else:
            self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
            self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
            fd = self.shmFile.fileno()
            self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
        atexit.register(self.close)

        GraphicsView.__init__(self, *args, **kwds)
        self.scene().changed.connect(self.update)
        self.img = None
        self.renderTimer = QtCore.QTimer()
        self.renderTimer.timeout.connect(self.renderView)
        self.renderTimer.start(16)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, *args, **kwds):
        ## Create shared memory for rendered image
        #pg.dbg(namespace={'r': self})
        if sys.platform.startswith('win'):
            self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
            self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
        else:
            self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
            self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
            fd = self.shmFile.fileno()
            self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
        atexit.register(self.close)

        GraphicsView.__init__(self, *args, **kwds)
        self.scene().changed.connect(self.update)
        self.img = None
        self.renderTimer = QtCore.QTimer()
        self.renderTimer.timeout.connect(self.renderView)
        self.renderTimer.start(16)
项目:Round1    作者:general-ai-challenge    | 项目源码 | 文件源码
def main():
    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string(str('hello'))

    message = '00101110'
    cnt = 0
    while True:
        reward = socket.recv()  # 1 or 0, or '-1' for None
        print(reward)
        msg_in = socket.recv()
        print(msg_in)

        # think...
        msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
        if cnt % 2 == 0:
            msg_out = str(message[cnt % 8])
        socket.send(msg_out)
        cnt = cnt + 1
项目:Round1    作者:general-ai-challenge    | 项目源码 | 文件源码
def gen_pseudo_word(self, L=None):
        if not L:
            L = random.randint(1, 8)
        # generate one word that we hadn't used before
        while True:
            if self.readable:
                # alternating between vowels and consonants, sampled with repl.
                _choice, _range = random.choice, range(int(math.ceil(L / 2)))
                v = [_choice(self.V) for i in _range]
                c = [_choice(self.C) for i in _range]
                zipped = zip(v, c) if random.getrandbits(1) else zip(c, v)
                pseudo_word = ''.join([a for b in zipped for a in b])[:L]
            else:
                pseudo_word = ''.join(random.sample(
                                      string.ascii_lowercase, L))
            if pseudo_word not in self.inv_word_mapping:
                return pseudo_word
项目:quokka_ng    作者:rochacbruno    | 项目源码 | 文件源码
def dated_path(obj, file_data):
    try:
        prefix = getattr(obj, 'model_name')
    except BaseException:
        prefix = "undefined"

    parts = op.splitext(file_data.filename)
    rand = random.getrandbits(16)
    filename = u"{name}_{rand}{ext}".format(
        rand=rand, name=parts[0], ext=parts[1]
    )
    filename = secure_filename(filename)
    today = date.today()
    path = u"{prefix}/{t.year}/{t.month}/{filename}".format(
        prefix=prefix, t=today, filename=filename
    )
    return path
项目:quokka_ng    作者:rochacbruno    | 项目源码 | 文件源码
def action_clone_item(self, ids):
        if len(ids) > 1:
            flash(
                "You can select only one item for this action",
                'error'
            )
            return

        model = current_app.db.get_with_content(_id=ids[0])
        clone = deepcopy(model)
        del clone['_id']
        clone['slug'] = f'{clone["slug"]}-{random.getrandbits(32)}'
        clone['_isclone'] = True
        self._on_model_change(None, clone, True)
        self.coll.insert(clone)
        self.after_model_change(None, clone, True)
        return redirect(url_for('.edit_view', id=clone['_id']))


# TODO: Serialize and activate thia action
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    This function follows from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    It works like uuid.uuid4 except that tries to use os.urandom() if possible
    and it XORs the output with the tokens uniquely associated with this machine.
    """
    rand_longs = (random.getrandbits(64), random.getrandbits(64))
    if HAVE_URANDOM:
        urand_longs = _struct_2_long_long.unpack(fast_urandom16())
        byte_s = _struct_2_long_long.pack(rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
                                          rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
    else:
        byte_s = _struct_2_long_long.pack(rand_longs[0] ^ ctokens[0],
                                          rand_longs[1] ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
项目:core-python    作者:yidao620c    | 项目源码 | 文件源码
def main():
    random.seed()
    # ????
    print(random.getrandbits(3))
    print(random.randint(200, 800))
    print(2, 400, 2)
    # ????
    seq = range(1, 10)
    print(random.choice(seq))
    print(random.sample(seq, 4))
    a = list(seq)
    random.shuffle(a)
    print(a)

    # ??
    print(random.random())  # [0.0, 1.0)???????
    print(random.uniform(2.1, 4.99))  # ??????????

    pass
项目:resolver    作者:Imperium-Software    | 项目源码 | 文件源码
def __init__(self, length=0, defined=False, value=None):

        """ Creates a bit string of a certain length, using a certain underlying
        implementation.  """

        self.length = length
        self.fitness = 0
        self.isCacheValid = False
        self.defined = bitarray(length)
        self.data = bitarray(length)

        if defined:
            self.defined = None
        else:
            self.defined.setall(False)

        if value is not None:
            self.data = [bool(X) for X in value]

        for i in range(1, length+1):
            if bool(random.getrandbits(1)):
                self.flip(i)
项目:wsproto    作者:python-hyper    | 项目源码 | 文件源码
def test_wrong_method(self):
        test_host = 'frob.nitz'
        test_path = '/fnord'

        ws = WSConnection(SERVER)

        nonce = bytes(random.getrandbits(8) for x in range(0, 16))
        nonce = base64.b64encode(nonce)

        request = b'POST ' + test_path.encode('ascii') + b' HTTP/1.1\r\n'
        request += b'Host: ' + test_host.encode('ascii') + b'\r\n'
        request += b'Connection: Upgrade\r\n'
        request += b'Upgrade: WebSocket\r\n'
        request += b'Sec-WebSocket-Version: 13\r\n'
        request += b'Sec-WebSocket-Key: ' + nonce + b'\r\n'
        request += b'\r\n'

        ws.receive_bytes(request)
        event = next(ws.events())
        assert isinstance(event, ConnectionFailed)
项目:wsproto    作者:python-hyper    | 项目源码 | 文件源码
def test_bad_connection(self):
        test_host = 'frob.nitz'
        test_path = '/fnord'

        ws = WSConnection(SERVER)

        nonce = bytes(random.getrandbits(8) for x in range(0, 16))
        nonce = base64.b64encode(nonce)

        request = b'GET ' + test_path.encode('ascii') + b' HTTP/1.1\r\n'
        request += b'Host: ' + test_host.encode('ascii') + b'\r\n'
        request += b'Connection: Zoinks\r\n'
        request += b'Upgrade: WebSocket\r\n'
        request += b'Sec-WebSocket-Version: 13\r\n'
        request += b'Sec-WebSocket-Key: ' + nonce + b'\r\n'
        request += b'\r\n'

        ws.receive_bytes(request)
        event = next(ws.events())
        assert isinstance(event, ConnectionFailed)
项目:wsproto    作者:python-hyper    | 项目源码 | 文件源码
def test_bad_upgrade(self):
        test_host = 'frob.nitz'
        test_path = '/fnord'

        ws = WSConnection(SERVER)

        nonce = bytes(random.getrandbits(8) for x in range(0, 16))
        nonce = base64.b64encode(nonce)

        request = b'GET ' + test_path.encode('ascii') + b' HTTP/1.1\r\n'
        request += b'Host: ' + test_host.encode('ascii') + b'\r\n'
        request += b'Connection: Upgrade\r\n'
        request += b'Upgrade: WebPocket\r\n'
        request += b'Sec-WebSocket-Version: 13\r\n'
        request += b'Sec-WebSocket-Key: ' + nonce + b'\r\n'
        request += b'\r\n'

        ws.receive_bytes(request)
        event = next(ws.events())
        assert isinstance(event, ConnectionFailed)
项目:wsproto    作者:python-hyper    | 项目源码 | 文件源码
def test_missing_version(self):
        test_host = 'frob.nitz'
        test_path = '/fnord'

        ws = WSConnection(SERVER)

        nonce = bytes(random.getrandbits(8) for x in range(0, 16))
        nonce = base64.b64encode(nonce)

        request = b'GET ' + test_path.encode('ascii') + b' HTTP/1.1\r\n'
        request += b'Host: ' + test_host.encode('ascii') + b'\r\n'
        request += b'Connection: Upgrade\r\n'
        request += b'Upgrade: WebSocket\r\n'
        request += b'Sec-WebSocket-Key: ' + nonce + b'\r\n'
        request += b'\r\n'

        ws.receive_bytes(request)
        event = next(ws.events())
        assert isinstance(event, ConnectionFailed)
项目:wsproto    作者:python-hyper    | 项目源码 | 文件源码
def test_accept_wrong_subprotocol(self):
        test_host = 'frob.nitz'
        test_path = '/fnord'

        ws = WSConnection(SERVER)

        nonce = bytes(random.getrandbits(8) for x in range(0, 16))
        nonce = base64.b64encode(nonce)

        request = b'GET ' + test_path.encode('ascii') + b' HTTP/1.1\r\n'
        request += b'Host: ' + test_host.encode('ascii') + b'\r\n'
        request += b'Connection: Upgrade\r\n'
        request += b'Upgrade: WebSocket\r\n'
        request += b'Sec-WebSocket-Version: 13\r\n'
        request += b'Sec-WebSocket-Key: ' + nonce + b'\r\n'
        request += b'Sec-WebSocket-Protocol: one, two\r\n'
        request += b'\r\n'

        ws.receive_bytes(request)
        event = next(ws.events())
        assert isinstance(event, ConnectionRequested)
        assert event.proposed_subprotocols == ['one', 'two']

        with pytest.raises(ValueError):
            ws.accept(event, 'three')
项目:modern-paste    作者:LINKIWI    | 项目源码 | 文件源码
def generate(
        cls,
        user_id=lambda: random.getrandbits(16),
        contents=lambda: random_alphanumeric_string(length=8192),
        expiry_time=lambda: int(time.time() + random.getrandbits(16)),
        title=lambda: random_alphanumeric_string(),
        language=lambda: random.choice(['python', 'css', 'javascript', 'text']),
        password=lambda: random_alphanumeric_string(),
        is_api_post=lambda: False,
    ):
        return database.paste.create_new_paste(
            contents=cls.random_or_specified_value(contents),
            user_id=cls.random_or_specified_value(user_id),
            expiry_time=cls.random_or_specified_value(expiry_time),
            title=cls.random_or_specified_value(title),
            language=cls.random_or_specified_value(language),
            password=cls.random_or_specified_value(password),
            is_api_post=cls.random_or_specified_value(is_api_post),
        )
项目:modern-paste    作者:LINKIWI    | 项目源码 | 文件源码
def generate(
        cls,
        paste_id=lambda: random.getrandbits(16),
        file_name=lambda: random_alphanumeric_string(),
        file_size=lambda: random.getrandbits(16),
        mime_type=lambda: 'image/png',
        file_data=lambda: random_alphanumeric_string(8192)
    ):
        with mock.patch.object(database.attachment, '_store_attachment_file'):
            return database.attachment.create_new_attachment(
                paste_id=cls.random_or_specified_value(paste_id),
                file_name=cls.random_or_specified_value(file_name),
                file_size=cls.random_or_specified_value(file_size),
                mime_type=cls.random_or_specified_value(mime_type),
                file_data=cls.random_or_specified_value(file_data),
            )
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def random_marker():
    """ A random marker used to identify a pytest run.

    Some tests will spawn a private chain, the private chain will be one or
    more ethereum nodes on a new subprocesss. These nodes may fail to start on
    concurrent test runs, mostly because of port number conflicts, but even
    though the test fails to start its private chain it may run interacting
    with the geth process from a different test run! This leads to
    unreasonable test errors.

    This fixture creates a random marker used to distinguish pytest runs and
    avoid test failures. Note this could fail for other reasons and fail to
    detect unwanted interations if the user sets the PYTHONHASHSEED to the same
    value.
    """
    random_hex = hex(random.getrandbits(100))

    # strip the leading 0x and trailing L
    return random_hex[2:-1]
项目:freehand-zxcalculus-recognition    作者:emmacaort    | 项目源码 | 文件源码
def circlepoint(c,r):
    """Generate a point [x,y] lying on a circle by the circle equation.

    Args:
        c (list): The position of circle centre.
        r (float): The radius of the circle.

    Returns:
        A point lies on a circle. The point position is random.
    """
    x = random.uniform(-r,r)  # Randomly find the x position.
    negative = bool(random.getrandbits(1))  # Randomly set whether the point is on the positive y side or negative side.
    y = math.sqrt(r**2-x**2)  # The equation of the circle.
    if negative:
        y = -y
    return [x+c[0],y+c[1]]
项目:twittershade    作者:nicolavic98    | 项目源码 | 文件源码
def encode_params(self, base_url, method, params):
        params = params.copy()

        if self.token:
            params['oauth_token'] = self.token

        params['oauth_consumer_key'] = self.consumer_key
        params['oauth_signature_method'] = 'HMAC-SHA1'
        params['oauth_version'] = '1.0'
        params['oauth_timestamp'] = str(int(time()))
        params['oauth_nonce'] = str(getrandbits(64))

        enc_params = urlencode_noplus(sorted(params.items()))

        key = self.consumer_secret + "&" + urllib_parse.quote(self.token_secret, safe='~')

        message = '&'.join(
            urllib_parse.quote(i, safe='~') for i in [method.upper(), base_url, enc_params])

        signature = (base64.b64encode(hmac.new(
                    key.encode('ascii'), message.encode('ascii'), hashlib.sha1)
                                      .digest()))
        return enc_params + "&" + "oauth_signature=" + urllib_parse.quote(signature, safe='~')
项目:goose-search    作者:rezemika    | 项目源码 | 文件源码
def gouv_api_csv(csv):
    output_csv = ''
    for line in csv.splitlines()[1:]:
        output_csv += line
        # Always fills the data of the result used for tests.
        if bool(random.getrandbits(1)) or "-21.9419851,64.14602" in line: #"64,1460200,-21,9419851" in line:
            output_csv += ',' + ','.join([
                str(fake.latitude()),  # "result_latitude"
                str(fake.longitude()),  # "result_longitude"
                fake.address().replace('\n', ' ').replace(',', ' '),  # "result_label"
                str(random.randint(0, 300)),  # "result_distance"
                "housenumber",  # "result_type"
                "44109_XXXX_984eec",  # "result_id"
                fake.building_number(),  # "result_housenumber"
                fake.street_name(),  # "result_name", the name of the street.
                '',  # "result_street", empty in most case.
                fake.postcode(),  # "result_postcode"
                fake.city(),  # "result_city"
                '"' + fake.postcode()[:2] + ' ' + fake.department()[1] + ', ' + fake.region() + '"',  # "result_context"
                fake.postcode()[:2] + str(random.randint(100, 300)),  # "result_citycode"
            ])
        else:
            output_csv += ',,,,,,,,,,,,,'
        output_csv += '\n'
    return output_csv
项目:goose-search    作者:rezemika    | 项目源码 | 文件源码
def result_properties():
    properties = {}
    properties["access"] = "yes"
    if bool(random.getrandbits(1)):
        properties["name"] = fake.company()
    if bool(random.getrandbits(1)):
        properties["phone"] = fake.phone_number()
    if bool(random.getrandbits(1)):
        properties["fee"] = "yes"
    elif bool(random.getrandbits(1)):
        properties["fee"] = "no"
    if bool(random.getrandbits(1)):
        properties["opening_hours"] = "Mo-Su 09:00-19:00"
    elif bool(random.getrandbits(1)):
        properties["opening_hours"] = "Mo-Th 12:30-16:30"
    return properties
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    This function follows from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    It works like uuid.uuid4 except that tries to use os.urandom() if possible
    and it XORs the output with the tokens uniquely associated with this machine.
    """
    rand_longs = (random.getrandbits(64), random.getrandbits(64))
    if HAVE_URANDOM:
        urand_longs = _struct_2_long_long.unpack(fast_urandom16())
        byte_s = _struct_2_long_long.pack(rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
                                          rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
    else:
        byte_s = _struct_2_long_long.pack(rand_longs[0] ^ ctokens[0],
                                          rand_longs[1] ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    This function follows from the following discussion:
    http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09

    It works like uuid.uuid4 except that tries to use os.urandom() if possible
    and it XORs the output with the tokens uniquely associated with this machine.
    """
    rand_longs = (random.getrandbits(64), random.getrandbits(64))
    if HAVE_URANDOM:
        urand_longs = struct.unpack('=QQ', fast_urandom16())
        byte_s = struct.pack('=QQ',
                             rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
                             rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
    else:
        byte_s = struct.pack('=QQ',
                             rand_longs[0] ^ ctokens[0],
                             rand_longs[1] ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
项目:persist-queue    作者:peter-wangxu    | 项目源码 | 文件源码
def test_random_read_write(self):
        """Test random read/write"""

        q = Queue(self.path)
        n = 0
        for i in range(1000):
            if random.random() < 0.5:
                if n > 0:
                    q.get_nowait()
                    q.task_done()
                    n -= 1
                else:
                    with self.assertRaises(Empty):
                        q.get_nowait()
            else:
                q.put('var%d' % random.getrandbits(16))
                n += 1
项目:qiskit-sdk-py    作者:QISKit    | 项目源码 | 文件源码
def run(self, q_job):
        """Run circuits in q_job"""
        # Generating a string id for the job
        job_id = str(uuid.uuid4())
        result_list = []
        qobj = q_job.qobj
        self._sim = Simulator(gate_fusion=True)
        if 'seed' in qobj['config']:
            self._seed = qobj['config']['seed']
            self._sim._simulator = CppSim(self._seed)
        else:
            self._seed = random.getrandbits(32)
        self._shots = qobj['config']['shots']
        for circuit in qobj['circuits']:
            result_list.append(self.run_circuit(circuit))
        return Result({'job_id': job_id, 'result': result_list,
                       'status': 'COMPLETED'},
                      qobj)
项目:pyASA    作者:xpac1985    | 项目源码 | 文件源码
def test_append_rules(self, asa):
        rules = []
        for i in range(1, 351):
            protocol = choice(["tcp", "udp"])
            if bool(getrandbits(1)):
                src = IPAddress(randint(0, 4294967295))
            else:
                src = IPNetwork(f"{IPAddress(randint(0, 4294967295))}/{randint(0, 31)}").cidr
            if bool(getrandbits(1)):
                dst = IPAddress(randint(0, 4294967295))
            else:
                dst = IPNetwork(f"{IPAddress(randint(0, 4294967295))}/{randint(0, 31)}").cidr
            dst_port = randint(1, 65535)
            src_comp = choice([comp for comp in ServiceComparator])
            dst_comp = choice([comp for comp in ServiceComparator])
            rule = RuleTCPUDP(protocol=protocol, src=src, dst=dst, src_port=i, dst_port=dst_port,
                              src_comp=src_comp, dst_comp=dst_comp)
            rules.append(rule)
        asa.acl.append_rules(settings.test_acl, rules)
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    This function follows from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    It works like uuid.uuid4 except that tries to use os.urandom() if possible
    and it XORs the output with the tokens uniquely associated with this machine.
    """
    rand_longs = (random.getrandbits(64), random.getrandbits(64))
    if HAVE_URANDOM:
        urand_longs = _struct_2_long_long.unpack(fast_urandom16())
        byte_s = _struct_2_long_long.pack(rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
                                          rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
    else:
        byte_s = _struct_2_long_long.pack(rand_longs[0] ^ ctokens[0],
                                          rand_longs[1] ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def run(self):
        if not self.coordinator.enabled:
            return

        log.debug("Coordinator thread for %s starting: %s" %
                  (self.coordinator.filesystem, threading.current_thread()))

        while not self._stopping.is_set():
            # Keep agents busy
            self.generate_actions_for_agents()

            # Stack up waiting actions
            if bool(random.getrandbits(1)):
                self.unassigned_requests.put(self.get_random_action())

            self.cull_completed_requests()

            # TODO: Very infrequently, randomly cancel an agent action

            self._stopping.wait(HSM_COORDINATOR_LOOP_INTERVAL)
项目:tensorflow_yolo2    作者:wenxichen    | 项目源码 | 文件源码
def image_read(self, imname):
        image = misc.imread(imname, mode='RGB').astype(np.float)
        r,c,ch = image.shape
        if r < 299 or c < 299:
            # TODO: check too small images
            # print "##too small!!"
            image = misc.imresize(image, (299, 299, 3))
        elif r > 299 or c > 299:
            image = image[(r-299)/2 : (r-299)/2 + 299, (c-299)/2 : (c-299)/2 + 299, :]
        # print r, c, image.shape
        assert image.shape == (299, 299, 3)
        image = (image / 255.0) * 2.0 - 1.0
        if self.random_noise:
            add_noise = bool(random.getrandbits(1))
            if add_noise:
                eps = random.choice([4.0, 8.0, 12.0, 16.0]) / 255.0 * 2.0
                noise_image = image + eps * np.random.choice([-1, 1], (299,299,3))
                image = np.clip(noise_image, -1.0, 1.0)
        return image
项目:OKUSON    作者:frankluebeck    | 项目源码 | 文件源码
def AdminLogin(req,onlyhead):
    global currentcookie
    passwd = req.query.get('passwd',[''])[0]
    salt = Config.conf['AdministratorPassword'][:2]
    passwd = crypt.crypt(passwd,salt)
    if passwd != Config.conf['AdministratorPassword']:
        return Delegate('/errors/wrongpasswd.html',req,onlyhead)

    random.seed(os.urandom(200));
    currentcookie = str(random.getrandbits(200))
    handler = EH_Generic_class()
    handler.iamadmin = 1
    (header,content) = Site['/adminmenu.html'].getresult(req,onlyhead,handler)
    header['Set-Cookie'] = 'OKUSON='+currentcookie+ \
         ';Path=/;Max-Age=3600;Version=1'
         # Max-Age is one hour
    #header['Location'] = '/adminmenu.html'
    # Taken out to please opera, which does not get the cookie for the
    # login with this header. Max.
    return (header,content)
项目:epi    作者:jakubtuchol    | 项目源码 | 文件源码
def uniform_random(lower_bound, upper_bound):
    """
    Question 5.10: Generates uniform random number
    within lower and upper bounds, inclusive
    """
    num_outcomes = upper_bound - lower_bound + 1
    result = None

    while True:
        result = 0
        i = 0
        while 1 << i < num_outcomes:
            result = (result << 1) | getrandbits(1)
            i += 1
        if result < num_outcomes:
            break
    return result + lower_bound
项目:rekall-agent-server    作者:rekall-innovations    | 项目源码 | 文件源码
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    This function follows from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    It works like uuid.uuid4 except that tries to use os.urandom() if possible
    and it XORs the output with the tokens uniquely associated with this machine.
    """
    rand_longs = (random.getrandbits(64), random.getrandbits(64))
    if HAVE_URANDOM:
        urand_longs = _struct_2_long_long.unpack(fast_urandom16())
        byte_s = _struct_2_long_long.pack(rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
                                          rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
    else:
        byte_s = _struct_2_long_long.pack(rand_longs[0] ^ ctokens[0],
                                          rand_longs[1] ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
项目:pelisalacarta-ce    作者:pelisalacarta-ce    | 项目源码 | 文件源码
def generateChallengeResponseV2(password, user, server_challenge, server_info, domain = '', client_challenge = None):
    client_timestamp = '\0' * 8

    if not client_challenge:
        client_challenge = ''
        for i in range(0, 8):
            client_challenge += chr(random.getrandbits(8))
    assert len(client_challenge) == 8

    d = MD4()
    d.update(password.encode('UTF-16LE'))
    ntlm_hash = d.digest()   # The NT password hash
    response_key = hmac.new(ntlm_hash, (user.upper() + domain).encode('UTF-16LE')).digest()  # The NTLMv2 password hash. In [MS-NLMP], this is the result of NTOWFv2 and LMOWFv2 functions
    temp = client_timestamp + client_challenge + domain.encode('UTF-16LE') + server_info

    nt_challenge_response = hmac.new(response_key, server_challenge + temp).digest()
    lm_challenge_response = hmac.new(response_key, server_challenge + client_challenge).digest() + client_challenge
    session_key = hmac.new(response_key, nt_challenge_response).digest()

    return nt_challenge_response, lm_challenge_response, session_key


################
# NTLMv1 Methods
################
项目:pelisalacarta-ce    作者:pelisalacarta-ce    | 项目源码 | 文件源码
def __init__(self, url, mode = "r"):
        import random
        try:
          import xbmc
        except:
          xbmc = None
        self.url = url
        self.remote, self.share, self.path = path = connect(url)
        self.mode = mode
        self.binary = False
        self.canread = False
        self.canwrite = False
        self.closed = True
        self.size = 0
        self.pos = 0
        if xbmc:
          self.tmp_path = os.path.join(xbmc.translatePath("special://temp/"), "%08x" % (random.getrandbits(32)))
        else:
          self.tmp_path = os.path.join(os.getenv("TEMP") or os.getenv("TMP") or os.getenv("TMPDIR"), "%08x" % (random.getrandbits(32)))
        self.tmp_file = None

        self.__get_mode__()
项目:pelisalacarta-ce    作者:pelisalacarta-ce    | 项目源码 | 文件源码
def dialog_select(self, heading, list): 
      ID = "%032x" %(random.getrandbits(128))
      response = '<?xml version="1.0" encoding="UTF-8" ?>\n'
      response +='<rss version="2.0" xmlns:dc="http://purl.org/dc/elements/1.1/">\n'
      response +='<channel>\n'
      response +='<link>/rss</link>\n'
      response +='<title>'+heading+'</title>\n'
      for option in list:
        response += '<item>\n'
        response += '<title>'+option +'</title>\n'
        response += '<image>%s</image>\n'
        response += '<link>http://'+self.controller.host+'/data/'+threading.current_thread().name +'/'+ID+'/'+str(list.index(option))+'</link>\n'
        response += '</item>\n\n'

      response += '</channel>\n'
      response += '</rss>\n'
      self.controller.send_data(response)

      self.handler.server.shutdown_request(self.handler.request)
      while not self.controller.get_data(ID):
        continue

      return  int(self.controller.get_data(ID))
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_bool_1_chan_1_samp(self, x_series_device, seed):
        # Reset the pseudorandom number generator with seed.
        random.seed(seed)

        do_line = random.choice(x_series_device.do_lines).name

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                do_line, line_grouping=LineGrouping.CHAN_PER_LINE)

            # Generate random values to test.
            values_to_test = [bool(random.getrandbits(1)) for _ in range(10)]

            values_read = []
            for value_to_test in values_to_test:
                task.write(value_to_test)
                time.sleep(0.001)
                values_read.append(task.read())

            assert values_read == values_to_test

            # Verify setting number_of_samples_per_channel (even to 1)
            # returns a list.
            value_read = task.read(number_of_samples_per_channel=1)
            assert isinstance(value_read, list)
            assert len(value_read) == 1
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_bool_n_chan_1_samp(self, x_series_device, seed):
        # Reset the pseudorandom number generator with seed.
        random.seed(seed)

        number_of_channels = random.randint(2, len(x_series_device.do_lines))
        do_lines = random.sample(x_series_device.do_lines, number_of_channels)

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                flatten_channel_string([d.name for d in do_lines]),
                line_grouping=LineGrouping.CHAN_PER_LINE)

            # Generate random values to test.
            values_to_test = [bool(random.getrandbits(1)) for _ in
                              range(number_of_channels)]

            task.write(values_to_test)
            time.sleep(0.001)
            values_read = task.read()

            assert values_read == values_to_test

            # Verify setting number_of_samples_per_channel (even to 1)
            # returns a list of lists.
            value_read = task.read(number_of_samples_per_channel=1)
            assert isinstance(value_read, list)
            assert isinstance(value_read[0], list)
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_uint_1_chan_1_samp(self, x_series_device, seed):
        # Reset the pseudorandom number generator with seed.
        random.seed(seed)

        do_port = random.choice(x_series_device.do_ports)

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

            # Generate random values to test.
            values_to_test = [int(random.getrandbits(do_port.do_port_width))
                              for _ in range(10)]

            values_read = []
            for value_to_test in values_to_test:
                task.write(value_to_test)
                time.sleep(0.001)
                values_read.append(task.read())

            assert values_read == values_to_test

            # Verify setting number_of_samples_per_channel (even to 1)
            # returns a list.
            value_read = task.read(number_of_samples_per_channel=1)
            assert isinstance(value_read, list)
            assert len(value_read) == 1
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_many_sample_port_byte(self, x_series_device, seed):
        # Reset the pseudorandom number generator with seed.
        random.seed(seed)

        number_of_samples = random.randint(2, 20)
        do_port = random.choice(
            [d for d in x_series_device.do_ports if d.do_port_width <= 8])

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

            # Generate random values to test.
            values_to_test = numpy.array(
                [int(random.getrandbits(do_port.do_port_width))
                 for _ in range(number_of_samples)], dtype=numpy.uint8)

            writer = DigitalSingleChannelWriter(task.out_stream)
            reader = DigitalSingleChannelReader(task.in_stream)

            task.start()

            writer.write_many_sample_port_byte(values_to_test)
            time.sleep(0.001)

            # Since we're writing to and reading from ONLY the digital
            # output lines, we can't use sample clocks to correlate the
            # read and write sampling times. Thus, we essentially read
            # the last value written multiple times.
            values_read = numpy.zeros(number_of_samples, dtype=numpy.uint8)
            reader.read_many_sample_port_byte(
                values_read, number_of_samples_per_channel=number_of_samples)

            expected_values = [
                values_to_test[-1] for _ in range(number_of_samples)]
            numpy.testing.assert_array_equal(values_read, expected_values)