Python itertools 模块,cycle() 实例源码


项目:Python-GUI-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def __init__(self, msShowTimeBetweenSlides=1500):
        """Collect the slide images and prepare the label that displays them.

        :param msShowTimeBetweenSlides: time in milliseconds each slide is
            shown before the next one is scheduled.
        """
        # initialize tkinter super class
        # NOTE(review): no base-class initializer call is visible in this
        # excerpt - confirm it is not missing.

        # time each slide will be shown
        self.showTime = msShowTimeBetweenSlides

        # look for images in the 'Resources' folder next to this module
        chapter_folder = path.realpath(path.dirname(__file__))
        resources_folder = path.join(chapter_folder, 'Resources')
        listOfSlides = [slide for slide in listdir(resources_folder)
                        if slide.endswith(('gif', 'jpg'))]

        # endlessly read in the slides so we can show them on the tkinter Label;
        # load each image by its full path so this does not depend on the
        # process' current working directory
        self.iterableCycle = cycle(
            (ImageTk.PhotoImage(file=path.join(resources_folder, slide)), slide)
            for slide in listOfSlides)

        # create tkinter Label widget which can also display images
        self.slidesLabel = Label(self)

        # create the Frame widget
项目:zellij    作者:nedbat    | 项目源码 | 文件源码
def pieces_under_over(path, segs_to_points, xings):
    """Produce all the pieces of the path, with a bool indicating if each leads to under or over.

    The first piece whose end point has a crossing in `xings` decides the
    phase of the alternating over/under sequence; when no piece ends at a
    known crossing, the sequence starts with "over".

    NOTE(review): the `continue`, `else:` and `break`/`for-else` lines were
    missing from the excerpt and have been reconstructed from the remaining
    structure - confirm against the upstream project.
    """
    pieces = list(path_pieces(path, segs_to_points))
    for i, piece in enumerate(pieces):
        xing = xings.get(piece[-1])
        if xing is None:
            continue
        if xing.under is not None:
            over = (xing.under != path)
        else:
            assert xing.over is not None
            over = (xing.over == path)
        ou = [over, not over]
        if i % 2:
            # An odd index means the alternation is one step out of phase.
            ou = ou[::-1]
        break
    else:
        ou = [True, False]

    yield from zip(pieces, itertools.cycle(ou))
项目:sappho    作者:lily-mayfield    | 项目源码 | 文件源码
def test_jitter(self):
        """Check the y position of a particle after successive jittered steps."""
        # Jitter cycles between +1 & -1 times time delta
        jitter_iter = itertools.cycle((1, -1))

        def jitter(dt):
            return dt * next(jitter_iter)

        physics = particle.PhysicsJitter(y=2, jitter=jitter)
        p = particle.Particle(0, 0)
        # assertEqual is used because the assertEquals alias was removed
        # in Python 3.12.
        self.assertEqual(p.y, 0)

        # Jitter one second; uses jitter value 1 * 1 second * scale 2, +2 to y
        physics(1, p)
        self.assertEqual(p.y, 2)

        # Jitter 2s; uses jitter value -1 * 2 second * scale 2, -4 to y
        physics(2, p)
        self.assertEqual(p.y, -2)

        # Jitter 5s; uses jitter value 1 * 5 second * scale 2, +10 to y
        physics(5, p)
        self.assertEqual(p.y, 8)
项目:django-powerpages    作者:Open-E-WEB    | 项目源码 | 文件源码
def __init__(self, console_or_stream=None, **kwargs):
        """Configure the progress display.

        :param console_or_stream: an existing ``Console`` to reuse, or
            anything else, which is passed to ``Console(...)``.
        :param kwargs: optional overrides; each recognised key falls back to
            the upper-cased class attribute of the same name.
        """
        if isinstance(console_or_stream, Console):
            self.console = console_or_stream
        else:
            # NOTE(review): the `else:` was missing from the excerpt; without
            # it a passed-in Console would always be wrapped again.
            self.console = Console(console_or_stream)

        # Every option follows the same "kwarg or CLASS_CONSTANT" rule.
        for option in ('template', 'progress_template', 'progress_brick',
                       'progress_num_bricks', 'percent_template',
                       'current_template', 'total_template', 'eta_template',
                       'screw_template', 'refresh_every'):
            setattr(self, option,
                    kwargs.get(option, getattr(self, option.upper())))

        self.line_template = self.build_line_template()
        self.start_datetime = None
        self.screw_cycle = itertools.cycle(('|', '/', '-', '\\'))
项目:FCN_train    作者:315386775    | 项目源码 | 文件源码
def _match_label_with_color(label, colors, bg_label, bg_color):
    """Return `unique_labels` and `color_cycle` for label array and color list.

    Colors are cycled for normal labels, but the background color should only
    be used for the background.

    NOTE(review): the closing docstring quotes and the body of the
    ``if bg_label in unique_labels`` branch were missing from the excerpt;
    the ``remove`` call is reconstructed from the comment above it.
    """
    # Temporarily set background color; it will be removed later.
    if bg_color is None:
        bg_color = (0, 0, 0)
    bg_color = _rgb_vector([bg_color])

    unique_labels = list(set(label.flat))
    # Ensure that the background label is in front to match call to `chain`.
    if bg_label in unique_labels:
        unique_labels.remove(bg_label)
    unique_labels.insert(0, bg_label)

    # Modify labels and color cycle so background color is used only once.
    color_cycle = itertools.cycle(colors)
    color_cycle = itertools.chain(bg_color, color_cycle)

    return unique_labels, color_cycle
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def __init__(self, pid: int, name: str, parser, fields: [list, str]):
        """
        A class representing an FMSPID and its conversion
        :param pid: The PID
        :param name:  A friendly name of this PID
        :param parser: a function that turns a byte array of at most length 8 into one or more readable values
        :param fields: the friendly name of the outputs of this PID eg. 'RPM'
        """
        # NOTE(review): the attribute assignments and the `else:` below were
        # garbled in the excerpt and have been reconstructed.
        self.pid = pid
        self.name = name
        self.parser = parser
        if isinstance(fields, str):
            # A single field name is normalised to a one-element list.
            self.fields = [fields]
        else:
            self.fields = fields
        # One "<name> (<field>)" label per output field.
        self.fieldnames = ['{} ({})'.format(name, unit) for unit in self.fields]
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def insert_and_validate_list_results(self, reverse, slowdown):
        """
        This utility method will execute submit various statements for execution using the ConcurrentExecutorListResults,
        then invoke a separate thread to execute the callback associated with the futures registered
        for those statements. The parameters will toggle various timing, and ordering changes.
        Finally it will validate that the results were returned in the order they were submitted
        :param reverse: Execute the callbacks in the opposite order that they were submitted
        :param slowdown: Cause intermittent queries to perform slowly
        """
        import time

        our_handler = MockResponseResponseFuture(reverse=reverse)
        mock_session = Mock()
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        mock_session.execute_async.return_value = our_handler

        t = TimedCallableInvoker(our_handler, slowdown=slowdown)
        # NOTE(review): start/stop of the invoker, the wait-loop body and the
        # final ordering check were missing from the excerpt and are
        # reconstructed from the docstring - confirm against upstream.
        t.start()
        results = execute_concurrent(mock_session, statements_and_params)

        # Wait until the helper thread has drained every pending callback.
        while not our_handler.pending_callbacks.empty():
            time.sleep(.01)
        t.stop()
        self.validate_result_ordering(results)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def insert_and_validate_list_generator(self, reverse, slowdown):
        """
        This utility method will execute submit various statements for execution using the ConcurrentExecutorGenResults,
        then invoke a separate thread to execute the callback associated with the futures registered
        for those statements. The parameters will toggle various timing, and ordering changes.
        Finally it will validate that the results were returned in the order they were submitted
        :param reverse: Execute the callbacks in the opposite order that they were submitted
        :param slowdown: Cause intermittent queries to perform slowly
        """
        our_handler = MockResponseResponseFuture(reverse=reverse)
        mock_session = Mock()
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        mock_session.execute_async.return_value = our_handler

        t = TimedCallableInvoker(our_handler, slowdown=slowdown)
        # NOTE(review): the start/try/finally scaffolding and the ordering
        # check were missing from the excerpt (only the indented
        # execute_concurrent line survived) and are reconstructed from the
        # docstring - confirm against upstream.
        t.start()
        try:
            results = execute_concurrent(mock_session, statements_and_params, results_generator=True)
            self.validate_result_ordering(results)
        finally:
            t.stop()
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_wrap_round_robin(self):
        """TokenAwarePolicy over RoundRobinPolicy puts replicas first in the plan."""
        cluster = Mock(spec=Cluster)
        cluster.metadata = Mock(spec=Metadata)
        hosts = [Host(str(i), SimpleConvictionPolicy) for i in range(4)]
        for host in hosts:
            # NOTE(review): the loop body was missing from the excerpt;
            # marking each host as up is reconstructed - confirm upstream.
            host.set_up()

        def get_replicas(keyspace, packed_key):
            # Two consecutive hosts, starting at the index packed in the key.
            index = struct.unpack('>i', packed_key)[0]
            return list(islice(cycle(hosts), index, index + 2))

        cluster.metadata.get_replicas.side_effect = get_replicas

        policy = TokenAwarePolicy(RoundRobinPolicy())
        policy.populate(cluster, hosts)

        for i in range(4):
            query = Statement(routing_key=struct.pack('>i', i), keyspace='keyspace_name')
            qplan = list(policy.make_query_plan(None, query))

            replicas = get_replicas(None, struct.pack('>i', i))
            other = set(h for h in hosts if h not in replicas)
            self.assertEqual(replicas, qplan[:2])
            self.assertEqual(other, set(qplan[2:]))

        # Should use the secondary policy
        for i in range(4):
            qplan = list(policy.make_query_plan())

            self.assertEqual(set(qplan), set(hosts))
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_execute_concurrent(self):
        """Write and read back rows through execute_concurrent for several batch sizes."""
        # NOTE(review): the closing argument of each SimpleStatement(...) call
        # and the body of the first result loop were missing from the
        # excerpt; they are reconstructed - confirm against upstream.
        for num_statements in (0, 1, 2, 7, 10, 99, 100, 101, 199, 200, 201):
            # write
            statement = SimpleStatement(
                "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
                consistency_level=ConsistencyLevel.QUORUM)
            statements = cycle((statement, ))
            parameters = [(i, i) for i in range(num_statements)]

            results = self.execute_concurrent_helper(self.session, list(zip(statements, parameters)))
            self.assertEqual(num_statements, len(results))
            for success, result in results:
                self.assertTrue(success)
                self.assertFalse(result)

            # read
            statement = SimpleStatement(
                "SELECT v FROM test3rf.test WHERE k=%s",
                consistency_level=ConsistencyLevel.QUORUM)
            statements = cycle((statement, ))
            parameters = [(i, ) for i in range(num_statements)]

            results = self.execute_concurrent_helper(self.session, list(zip(statements, parameters)))
            self.assertEqual(num_statements, len(results))
            self.assertEqual([(True, [(i,)]) for i in range(num_statements)], results)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_no_raise_on_first_failure(self):
        """With raise_on_first_error=False a server-side error is returned as a result."""
        # NOTE(review): the closing argument of SimpleStatement(...) was
        # missing from the excerpt and is reconstructed; further assertions
        # may also have been dropped - confirm against upstream.
        statement = SimpleStatement(
            "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
            consistency_level=ConsistencyLevel.QUORUM)
        statements = cycle((statement, ))
        parameters = [(i, i) for i in range(100)]

        # we'll get an error back from the server
        parameters[57] = ('efefef', 'awefawefawef')

        results = execute_concurrent(self.session, list(zip(statements, parameters)), raise_on_first_error=False)
        for i, (success, result) in enumerate(results):
            if i == 57:
                self.assertIsInstance(result, InvalidRequest)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_no_raise_on_first_failure_client_side(self):
        """With raise_on_first_error=False a client-side binding error is returned as a result."""
        # NOTE(review): the closing argument of SimpleStatement(...) was
        # missing from the excerpt and is reconstructed; further assertions
        # may also have been dropped - confirm against upstream.
        statement = SimpleStatement(
            "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
            consistency_level=ConsistencyLevel.QUORUM)
        statements = cycle((statement, ))
        parameters = [(i, i) for i in range(100)]

        # the driver will raise an error when binding the params
        parameters[57] = 1

        results = execute_concurrent(self.session, list(zip(statements, parameters)), raise_on_first_error=False)
        for i, (success, result) in enumerate(results):
            if i == 57:
                self.assertIsInstance(result, TypeError)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def make_query_plan(self, working_keyspace=None, query=None):
        """Yield live hosts: the local DC first (rotated), then remote DCs.

        Local hosts are yielded once each, starting at an offset taken from
        ``self._position`` (which is incremented on every call). For every
        other data center at most ``self.used_hosts_per_remote_dc`` hosts
        are yielded. ``working_keyspace`` and ``query`` are not used here.
        """
        # not thread-safe, but we don't care much about lost increments
        # for the purposes of load balancing
        start = self._position
        self._position += 1

        local_hosts = self._dc_live_hosts.get(self.local_dc, ())
        if local_hosts:
            start = start % len(local_hosts)
        else:
            start = 0
        rotated = islice(cycle(local_hosts), start, start + len(local_hosts))
        for local_host in rotated:
            yield local_host

        # the dict can change, so get candidate DCs iterating over keys of a copy
        remote_dcs = [name for name in self._dc_live_hosts.copy().keys()
                      if name != self.local_dc]
        limit = self.used_hosts_per_remote_dc
        for name in remote_dcs:
            for remote_host in self._dc_live_hosts.get(name, ())[:limit]:
                yield remote_host
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def add_plot_args(parser):
    """Register the plotting options (-d/--data, -p/--prefix, -f/--format) on *parser*.

    NOTE(review): the trailing arguments of the last two ``add_argument``
    calls were missing from the excerpt, so only the closing parentheses are
    restored here; any ``default=`` / ``metavar=`` values they originally
    carried are unknown - confirm against upstream.
    """
    parser.add_argument('-d', '--data',
        help="""Append a PATH to a privcount tallies.json file,
                and the LABEL we should use for the graph legend for this
                set of experimental results""",
        metavar=("PATH", "LABEL"),
        # argparse requires nargs to match the length of a tuple metavar
        nargs=2,
        action=PlotDataAction, dest="experiments")

    parser.add_argument('-p', '--prefix',
        help="a STRING filename prefix for graphs we generate",
        action="store", dest="prefix")

    parser.add_argument('-f', '--format',
        help="""A comma-separated LIST of color/line format strings to cycle to
                matplotlib's plot command (see matplotlib.pyplot.plot)""",
        action="store", dest="lineformats")
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def _copy_from(self, curs, nrecs, srec, copykw):
        """Copy *nrecs* generated rows into ``tcopy`` and verify them.

        Each row is ``<index>\\t<letter repeated srec times>``, with the
        letter cycling through ``string.ascii_letters``.
        """
        f = StringIO()
        for i, c in zip(range(nrecs), cycle(string.ascii_letters)):
            l = c * srec
            f.write("%s\t%s\n" % (i, l))

        # Rewind so the copy reads the buffer from the start rather than
        # from the end-of-write position.
        f.seek(0)
        curs.copy_from(MinimalRead(f), "tcopy", **copykw)

        curs.execute("select count(*) from tcopy")
        self.assertEqual(nrecs, curs.fetchone()[0])

        # NOTE(review): the query parameter was missing from the excerpt;
        # it is reconstructed so that only ids that index into
        # string.ascii_letters are compared below.
        curs.execute("select data from tcopy where id < %s order by id",
                     (len(string.ascii_letters),))
        for i, (l,) in enumerate(curs):
            self.assertEqual(l, string.ascii_letters[i] * srec)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def _copy_from(self, curs, nrecs, srec, copykw):
        """Copy *nrecs* generated rows into ``tcopy`` and verify them.

        Each row is ``<index>\\t<letter repeated srec times>``, with the
        letter cycling through ``string.ascii_letters``. This variant uses
        the Python 2 ``izip``/``xrange`` names.
        """
        f = StringIO()
        for i, c in izip(xrange(nrecs), cycle(string.ascii_letters)):
            l = c * srec
            f.write("%s\t%s\n" % (i, l))

        # Rewind so the copy reads the buffer from the start rather than
        # from the end-of-write position.
        f.seek(0)
        curs.copy_from(MinimalRead(f), "tcopy", **copykw)

        curs.execute("select count(*) from tcopy")
        self.assertEqual(nrecs, curs.fetchone()[0])

        # NOTE(review): the query parameter was missing from the excerpt;
        # it is reconstructed so that only ids that index into
        # string.ascii_letters are compared below.
        curs.execute("select data from tcopy where id < %s order by id",
                     (len(string.ascii_letters),))
        for i, (l,) in enumerate(curs):
            self.assertEqual(l, string.ascii_letters[i] * srec)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testLineBuffering(self):
        """
        Test creating a LineBuffer and feeding it some lines.  The lines should
        build up in its internal buffer for a while and then get spat out to
        the writer.
        """
        # NOTE(review): the statements that advance the iterator between the
        # assertions were missing from the excerpt; they are reconstructed
        # from the trailing comments - confirm against upstream.
        output = []
        input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9']))
        c = pop3._IteratorBuffer(output.extend, input, 6)
        i = iter(c)
        self.assertEqual(output, []) # nothing is buffer
        next(i)
        self.assertEqual(output, []) # '012' is buffered
        next(i)
        self.assertEqual(output, []) # '012345' is buffered
        next(i)
        self.assertEqual(output, ['012', '345', '6']) # nothing is buffered
        for n in range(5):
            next(i)
        self.assertEqual(output, ['012', '345', '6', '7', '8', '9', '012', '345'])
项目:Python-GUI-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def slidesCallback(self):
        """Show the next slide and schedule this method to run again.

        Reads ``self.iterableCycle`` (pairs of image object and file name),
        ``self.slidesLabel`` and ``self.showTime``.
        """
        # get next slide from iterable cycle
        currentInstance, nameOfSlide = next(self.iterableCycle)

        # assign next slide to Label widget
        # NOTE(review): this statement and the title update below were
        # missing from the excerpt and are reconstructed from the comments.
        self.slidesLabel.config(image=currentInstance)

        # update Window title with current slide
        self.title(nameOfSlide)

        # recursively repeat the Show
        self.after(self.showTime, self.slidesCallback)

# Start GUI
项目:Python-GUI-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def __init__(self, msShowTimeBetweenSlides=1500):
        """Collect the GIF slides and prepare the label that displays them.

        :param msShowTimeBetweenSlides: time in milliseconds each slide is
            shown before the next one is scheduled.
        """
        # initialize tkinter super class
        # NOTE(review): no base-class initializer call is visible in this
        # excerpt - confirm it is not missing.

        # time each slide will be shown
        self.showTime = msShowTimeBetweenSlides

        # look for images in the 'Resources' folder next to this module
        chapter_folder = path.realpath(path.dirname(__file__))
        resources_folder = path.join(chapter_folder, 'Resources')
        listOfSlides = [slide for slide in listdir(resources_folder) if slide.endswith('gif')]

        # endlessly read in the slides so we can show them on the tkinter Label;
        # load each image by its full path so this does not depend on the
        # process' current working directory
        self.iterableCycle = cycle(
            (PhotoImage(file=path.join(resources_folder, slide)), slide)
            for slide in listOfSlides)

        # create tkinter Label widget which can also display images
        self.slidesLabel = Label(self)

        # create the Frame widget
项目:Python-GUI-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def slidesCallback(self):
        """Show the next slide and schedule this method to run again.

        Reads ``self.iterableCycle`` (pairs of image object and file name),
        ``self.slidesLabel`` and ``self.showTime``.
        """
        # get next slide from iterable cycle
        currentInstance, nameOfSlide = next(self.iterableCycle)

        # assign next slide to Label widget
        # NOTE(review): this statement and the title update below were
        # missing from the excerpt and are reconstructed from the comments.
        self.slidesLabel.config(image=currentInstance)

        # update Window title with current slide
        self.title(nameOfSlide)

        # recursively repeat the Show
        self.after(self.showTime, self.slidesCallback)

# Start GUI
项目:Python-GUI-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def __init__(self, msShowTimeBetweenSlides=1500):
        """Collect the GIF/JPG slides and prepare the label that displays them.

        :param msShowTimeBetweenSlides: time in milliseconds each slide is
            shown before the next one is scheduled.
        """
        # initialize tkinter super class
        # NOTE(review): no base-class initializer call is visible in this
        # excerpt - confirm it is not missing.

        # time each slide will be shown
        self.showTime = msShowTimeBetweenSlides

        # look for images in the 'Resources' folder next to this module
        # try: .jpeg
        chapter_folder = path.realpath(path.dirname(__file__))
        resources_folder = path.join(chapter_folder, 'Resources')
        listOfSlides = [slide for slide in listdir(resources_folder)
                        if slide.endswith(('gif', 'jpg'))]

        # endlessly read in the slides so we can show them on the tkinter Label;
        # load each image by its full path so this does not depend on the
        # process' current working directory
        self.iterableCycle = cycle(
            (PhotoImage(file=path.join(resources_folder, slide)), slide)
            for slide in listOfSlides)

        # create tkinter Label widget which can also display images
        self.slidesLabel = Label(self)

        # create the Frame widget
项目:novajoin    作者:openstack    | 项目源码 | 文件源码
def get_api_servers():
    """Return iterator of glance api_servers.

    Return iterator of glance api_servers to cycle through the
    list, looping around to the beginning if necessary.

    NOTE(review): the closing docstring quotes and the bodies of the two
    ``if`` blocks were missing from the excerpt; collecting the URLs into
    ``api_servers`` is reconstructed from the surrounding code.
    """
    api_servers = []

    ks = keystone_client.get_client()

    catalog = keystone_client.get_service_catalog(ks)

    image_service = catalog.url_for(service_type='image')
    if image_service:
        api_servers.append(image_service)

    if CONF.glance_api_servers:
        api_servers.extend(CONF.glance_api_servers)

    return itertools.cycle(api_servers)
项目:DNGR-Keras    作者:MdAsifKhan    | 项目源码 | 文件源码
def cluster(data,true_labels,n_clusters=3):
    """Cluster *data* with k-means, plot a 2-D embedding, and return the labels.

    :param data: samples to cluster; ``np.shape(data)[1]`` is used as the
        input dimensionality for ``tsne``.
    :param true_labels: values used to colour the scatter plot.
    :param n_clusters: number of k-means clusters.
    :return: the ``labels_`` array of the fitted KMeans model.
    """
    km = KMeans(init='k-means++', n_clusters=n_clusters, n_init=10)
    # labels_ / cluster_centers_ only exist on a fitted estimator.
    # NOTE(review): the fit call was missing from the excerpt.
    km.fit(data)

    km_means_labels = km.labels_
    km_means_cluster_centers = km.cluster_centers_
    km_means_labels_unique = np.unique(km_means_labels)

    colors_ = cycle(colors.cnames.keys())

    initial_dim = np.shape(data)[1]
    data_2 = tsne(data,2,initial_dim,30)

    plt.figure(figsize=(12, 6))
    plt.scatter(data_2[:,0],data_2[:,1], c=true_labels)
    plt.title('True Labels')

    return km_means_labels
项目:NuGridPy    作者:NuGrid    | 项目源码 | 文件源码
def plot_prof_2(self, mod, species, xlim1, xlim2):
        """
        Plot one species for cycle between xlim1 and xlim2

        mod : string or integer
            Model to plot, same as cycle number.
        species : list
            Which species to plot.
        xlim1, xlim2 : float
            Mass coordinate range.

        """
        # NOTE(review): the docstring quotes and every statement except the
        # plot call were missing from the excerpt. The two data lookups and
        # the axis/legend calls below are reconstructed and must be
        # confirmed against upstream before relying on them.
        mass = self.se.get(mod, 'mass')
        Xspecies = self.se.get(mod, 'yps', species)
        pyl.plot(mass,Xspecies,'-',label=str(mod)+', '+species)
        pyl.xlim(xlim1, xlim2)
        pyl.legend()
项目:pytorch-skipthoughts    作者:kaniblu    | 项目源码 | 文件源码
def chunks(it, n, k):
    """Distribute items of *it* round-robin into groups of *n* lists of *k* items.

    Items are dealt one at a time to ``n`` buffers in turn. When the buffer
    whose turn it is already holds ``k`` items, the whole group of buffers
    is yielded and a fresh group is started. A trailing group is yielded
    only if every one of its buffers is non-empty.

    :param it: iterable of items.
    :param n: number of buffers per yielded group.
    :param k: maximum number of items per buffer.
    """
    buffer = [[] for _ in range(n)]
    buf_it = itertools.cycle(buffer)

    for item in it:
        buf_item = next(buf_it)

        if len(buf_item) == k:
            yield buffer
            buffer = [[] for _ in range(n)]
            buf_it = itertools.cycle(buffer)
            buf_item = next(buf_it)

        # NOTE(review): this append was missing from the excerpt; without it
        # no item ever reaches a buffer.
        buf_item.append(item)

    if all(buffer):
        yield buffer
项目:mauzr    作者:eqrx    | 项目源码 | 文件源码
def _on_graph(self):
        # For every entry of self._cfg, build an argument list `g` from the
        # entry's path, size, labels, time window and limits, and hand out
        # one colour per configured line.
        # NOTE(review): this excerpt ends inside the inner loop - whatever
        # consumes `g` and `color` is not visible here.
        for cfg in self._cfg:
            size = [str(s) for s in cfg["size"]]
            limits = [str(s) for s in cfg["limits"]]
            g = [cfg["path"], "--imgformat", "PNG",
                 "-w", size[0], "-h", size[1],
                 "--vertical-label", cfg["vlabel"], "-t", cfg["title"],
                 # start time: now minus the configured length
                 "-s", str(round(time.time()-cfg["length"])),
                 "-l", limits[0], "-u", limits[1]]
            # Colours repeat once all six have been used.
            color_iterator = itertools.cycle(("ff0000", "00ff00", "0000ff",
                                              "ff00ff", "ffff00", "00ffff"))

            for entry in cfg["lines"]:
                color = next(color_iterator)

项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def __init__(self, key):
        """Prepare a repeating-key XOR function for *key*.

        ``self.__key_xor(s)`` XORs each character of ``s`` with the next
        key character; the key position keeps advancing across calls.
        For a one-character key, ``Crypto.Util.strxor.strxor_c`` is used
        instead when it can be imported.
        """
        key_codes = itertools.cycle([ord(x) for x in key])
        # next(...) instead of the Python-2-only ``.next`` bound method.
        self.__key_gen = lambda: next(key_codes)
        self.__key_xor = lambda s: ''.join(chr(ord(x) ^ self.__key_gen()) for x in s)
        if len(key) == 1:
            # NOTE(review): the `try:` line and the body of the except
            # clause were missing from the excerpt and are reconstructed.
            try:
                from Crypto.Util.strxor import strxor_c
                c = ord(key)
                self.__key_xor = lambda s: strxor_c(s, c)
            except ImportError:
                #logging.debug('Load Crypto.Util.strxor Failed, Use Pure Python Instead.\n')
                pass
项目:ropf    作者:kevinkoo001    | 项目源码 | 文件源码
def get_swap_subset(self, subset, other):
        """Return a copy of *subset*, or None if it touches a no-swap subset.

        Alternates between ``self.subsets`` and ``other.subsets`` and stops
        once a full pass leaves ``subset.size`` unchanged.
        """
        # get a copy of subset!
        subset = subset.copy()

        for lifetime in itertools.cycle((self, other)):
            old_size = subset.size
            for sg in lifetime.subsets:
                if sg.intersects(subset):
                    # print "\t", sg, "intersects", subset
                    if sg.no_swap:
                        # print "\tsg is no swap .. bail out"
                        return None
                    # print "\tmerging them!"
                    # NOTE(review): the statement that merges `sg` into
                    # `subset` was missing from the excerpt and its API is
                    # not visible here - it must be restored from upstream.
            if subset.size == old_size:
                # print "\tsize did not change, done!"
                # NOTE(review): this `break` was missing from the excerpt;
                # without it the cycle() loop never terminates.
                break

        return subset
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_lengths():
    """Field lengths read back from the index match the (byte-encoded) lengths written."""
    s = fields.Schema(f1=fields.KEYWORD(stored=True, scorable=True),
                      f2=fields.KEYWORD(stored=True, scorable=True))
    with TempIndex(s, "testlengths") as ix:
        w = ix.writer()
        items = u("ABCDEFG")
        from itertools import cycle, islice
        lengths = [10, 20, 2, 102, 45, 3, 420, 2]
        for length in lengths:
            w.add_document(f2=u(" ").join(islice(cycle(items), length)))
        # NOTE(review): the commit was missing from the excerpt; the
        # documents must be committed before a reader can see them.
        w.commit()

        with ix.reader() as dr:
            # f1 was never written, so every document reports length 0.
            ls1 = [dr.doc_field_length(i, "f1")
                   for i in xrange(0, len(lengths))]
            assert ls1 == [0] * len(lengths)
            # Expected values go through the same length -> byte -> length
            # round trip as the right-hand side spells out.
            ls2 = [dr.doc_field_length(i, "f2")
                   for i in xrange(0, len(lengths))]
            assert ls2 == [byte_to_length(length_to_byte(l)) for l in lengths]
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def main(n = int(sys.argv[1]), n_threads=503, cycle=itertools.cycle):
    """Pass a counter around a ring of generator "threads" until it reaches zero.

    Each worker receives the counter via ``send``, hands back ``n - 1``
    while the value is positive, and otherwise prints its id and finishes,
    which ends the ring.

    NOTE(review): the `else:`, `try:` and `break` lines and the generator
    priming call were garbled in the excerpt and are reconstructed.
    """

    def worker(worker_id):

        n = 1
        while True:
            print(n)
            if n > 0:
                n = (yield (n - 1))
            else:
                print(worker_id)
                # `return` ends the generator; raising StopIteration inside
                # a generator is an error since PEP 479.
                return

    threadRing = [worker(w) for w in range(1, n_threads + 1)]
    for t in threadRing:
        next(t)           # start exec. gen. funcs
    sendFuncRing = [t.send for t in threadRing]   # speed...
    for send in cycle(sendFuncRing):
        try:
            n = send(n)
        except StopIteration:
            break
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_IOBase_finalize(self):
        """An IOBase subclass instance caught in a reference cycle is collected."""
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        import gc

        class MyIO(self.IOBase):
            def close(self):
                # NOTE(review): the method body was missing from the excerpt.
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        # The object only goes away once the cycle collector has run.
        gc.collect()
        self.assertTrue(wr() is None, wr)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def next_phase(self):
        """Return the next entry of ``self.phases``, wrapping around endlessly.

        The cycling iterator is created on first use and cached on the
        instance as ``_phaser``.
        """
        try:
            phaser = self._phaser
        except AttributeError:
            phaser = self._phaser = itertools.cycle(self.phases)
        return next(phaser)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __init__(self, message, file=None, spin_chars="-\\|/",
                 # Empirically, 8 updates/second looks nice
                 min_update_interval_seconds=0.125):
        """Write the message prefix and set up the spinner state.

        :param message: text written before the spinner.
        :param file: stream to write to; defaults to ``sys.stdout``.
        :param spin_chars: characters cycled through by the spinner.
        :param min_update_interval_seconds: passed to ``RateLimiter``.
            NOTE(review): this parameter line was missing from the excerpt;
            the default of 0.125 is derived from the "8 updates/second"
            comment.
        """
        self._message = message
        if file is None:
            file = sys.stdout
        self._file = file
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._finished = False

        self._spin_cycle = itertools.cycle(spin_chars)

        self._file.write(" " * get_indentation() + self._message + " ... ")
        self._width = 0
项目:variational-text-tensorflow    作者:carpedm20    | 项目源码 | 文件源码
def iterator(self, data_type="train"):
    """Return an endless iterator of ``[onehot(sample), sample]`` pairs.

    Samples come from ``self.get_data_from_type(data_type)``; samples equal
    to the empty list are skipped.
    """
    samples = self.get_data_from_type(data_type)
    pairs = ([self.onehot(sample), sample] for sample in samples if sample != [])
    return itertools.cycle(pairs)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def next_phase(self):
        """Return the next entry of ``self.phases``, wrapping around endlessly.

        The cycling iterator is created on first use and cached on the
        instance as ``_phaser``.
        """
        try:
            phaser = self._phaser
        except AttributeError:
            phaser = self._phaser = itertools.cycle(self.phases)
        return next(phaser)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def __init__(self, message, file=None, spin_chars="-\\|/",
                 # Empirically, 8 updates/second looks nice
                 min_update_interval_seconds=0.125):
        """Write the message prefix and set up the spinner state.

        :param message: text written before the spinner.
        :param file: stream to write to; defaults to ``sys.stdout``.
        :param spin_chars: characters cycled through by the spinner.
        :param min_update_interval_seconds: passed to ``RateLimiter``.
            NOTE(review): this parameter line was missing from the excerpt;
            the default of 0.125 is derived from the "8 updates/second"
            comment.
        """
        self._message = message
        if file is None:
            file = sys.stdout
        self._file = file
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._finished = False

        self._spin_cycle = itertools.cycle(spin_chars)

        self._file.write(" " * get_indentation() + self._message + " ... ")
        self._width = 0
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __init__(self, distance=None):
        """Keep *distance* and an endless iterator over ``get_nodes()``."""
        available_nodes = get_nodes()
        self.nodes = itertools.cycle(available_nodes)
        self.distance = distance
项目:zellij    作者:nedbat    | 项目源码 | 文件源码
def test_segment_sort_along(p1, p2, tvals):
    """Points placed along a segment are returned in order by sort_along."""
    # Get rid of pathological cases.
    assume(p1.distance(p2) > 0.001)

    tvals = [t / 100 for t in tvals]
    fuzz = [1e-10, -1e-10]
    points = [along_the_way(p1, p2, t) for t in tvals]
    # Nudge the points alternately by +/- a tiny amount.
    points = [Point(x+f, y+f) for (x, y), f in zip(points, itertools.cycle(fuzz))]

    # Calculate the smallest distance between any pair of points.  If we get
    # the wrong answer from sort_along, then the total distance will be off by
    # at least twice this.
    min_gap = min(q1.distance(q2) for q1, q2 in all_pairs(points + [p1, p2]))

    seg = Segment(p1, p2)
    spoints = seg.sort_along(points)

    assert len(spoints) == len(points)
    assert all(pt in points for pt in spoints)

    original = Point(*p1).distance(Point(*p2))
    # NOTE(review): the last term and the closing parenthesis were missing
    # from the excerpt; the final leg (last sorted point to p2) is
    # reconstructed to complete the path length.
    total = (
        Point(*p1).distance(Point(*spoints[0])) +
        sum(Point(*p).distance(Point(*q)) for p, q in adjacent_pairs(spoints)) +
        Point(*spoints[-1]).distance(Point(*p2))
    )
    # The total distance will be wrong by at least 2*min_gap if it is wrong.
    assert total - original < 2 * min_gap

# Bounds
项目:FCN_train    作者:315386775    | 项目源码 | 文件源码
def test_bg_and_color_cycle():
    """The background label gets bg_color; the other labels cycle through colors."""
    palette = [(1, 0, 0), (0, 0, 1)]
    background = (0, 0, 0)
    dummy_image = np.zeros((1, 10))
    labels = np.arange(10).reshape(1, -1)

    rgb = label2rgb(labels, image=dummy_image, bg_label=0,
                    bg_color=background, colors=palette, alpha=1)

    # First pixel carries label 0, i.e. the background.
    assert_close(rgb[0, 0], background)
    expected_colors = itertools.cycle(palette)
    for pixel in rgb[0, 1:]:
        assert_close(pixel, next(expected_colors))
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def __call__(self, bs):
        """Decode *bs* with ``self.dtype`` and label each resulting value.

        :param bs: raw input passed to ``self.dtype``.
        :return: dict mapping ``"<name> (<unit>)"`` to each decoded value,
            pairing values with ``self.dtype.units`` in order.
        """
        vals = self.dtype(bs)
        if type(vals) not in (tuple, list):
            # A single value is treated as a one-element result.
            vals = (vals,)
        # NOTE(review): the excerpt zipped against cycle([]), which is always
        # empty and made this return {}; the label source was evidently lost
        # and is assumed to be self.name - confirm against upstream.
        return {'{} ({})'.format(self.name, unit): val
                for val, unit in zip(vals, self.dtype.units)}
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def __call__(self, msg):
        """Parse *msg* with ``self.parser`` and label each resulting value.

        :param msg: raw input passed to ``self.parser``.
        :return: dict mapping ``"<name> (<field>)"`` to each parsed value,
            pairing values with ``self.fields`` in order.
        """
        parsed = self.parser(msg)
        # if parsed is None:
        #     return None
        if type(parsed) not in [tuple, list]:
            # A single value is treated as a one-element result.
            parsed = (parsed,)
        # NOTE(review): the excerpt zipped against cycle([]), which is always
        # empty and made this return {}; the label source was evidently lost
        # and is assumed to be self.name - confirm against upstream.
        return {'{} ({})'.format(self.name, unit): val
                for val, unit in zip(parsed, self.fields)}
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def __call__(self, msg):
        """Parse *msg* with ``self.parser`` and label each resulting value.

        :param msg: raw input passed to ``self.parser``.
        :return: dict mapping ``"<name> (<field>)"`` to each parsed value,
            pairing values with ``self.fields`` in order.
        """
        parsed = self.parser(msg)
        if type(parsed) not in [tuple, list]:
            # A single value is treated as a one-element result.
            parsed = (parsed,)
        # NOTE(review): the excerpt zipped against cycle([]), which is always
        # empty and made this return {}; the label source was evidently lost
        # and is assumed to be self.name - confirm against upstream.
        return {'{} ({})'.format(self.name, unit): val
                for val, unit in zip(parsed, self.fields)}
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def test_send():
    """Generate dummy speed/rpm/soc rows in a loop until interrupted."""
    btl = BluetoothLogger(fields=["speed", "rpm", "soc"], password=password)

    # generate some data and send it
    x = range(1000)
    y = map(lambda v: sin(v * pi / 45) * 5000 + 5000, x)
    speeds = cycle(y)
    print("Sending dummy data")
    while 1:
        # NOTE(review): the `try:` line, the statement(s) that hand `row` to
        # `btl`, and the except body were missing from the excerpt. Only the
        # control flow is restored here; as written `row` is built but never
        # sent - restore the send call from upstream.
        try:
            row = map(str, [round(next(speeds), 2), 5000, 50])
        except KeyboardInterrupt:
            break
项目:django-logpipe    作者:thelabnyc    | 项目源码 | 文件源码
def run(self):
        # Iterates over self.consumers endlessly via itertools.cycle.
        # NOTE(review): the loop body is missing from this excerpt, so what
        # is done with each consumer cannot be determined from here.
        for consumer in itertools.cycle(self.consumers):
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_first_failure(self):
        """With raise_on_first_error=True a server-side error is raised to the caller."""
        statements = cycle(("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)", ))
        parameters = [(i, i) for i in range(100)]

        # we'll get an error back from the server
        parameters[57] = ('efefef', 'awefawefawef')

        # NOTE(review): the head of this call was missing from the excerpt
        # (only the argument line survived); assertRaises with
        # InvalidRequest is reconstructed - confirm against upstream.
        self.assertRaises(
            InvalidRequest,
            execute_concurrent, self.session, list(zip(statements, parameters)), raise_on_first_error=True)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_paging(self):
        """All 100 rows come back for every fetch size and statement kind."""
        insert_cql = "INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"
        select_cql = "SELECT * FROM test3rf.test"

        writes = [(insert_cql, (key, )) for key in range(100)]
        execute_concurrent(self.session, writes)

        prepared = self.session.prepare(select_cql)

        for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
            self.session.default_fetch_size = fetch_size

            # plain query string
            rows = list(self.session.execute(select_cql))
            self.assertEqual(100, len(rows))

            # SimpleStatement
            statement = SimpleStatement(select_cql)
            rows = list(self.session.execute(statement))
            self.assertEqual(100, len(rows))

            # prepared statement
            rows = list(self.session.execute(prepared))
            self.assertEqual(100, len(rows))
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_paging_state(self):
        """
        Test to validate paging state api
        @since 3.7.0
        @jira_ticket PYTHON-200
        @expected_result paging state should returned should be accurate, and allow for queries to be resumed.

        @test_category queries
        """
        # NOTE(review): the docstring quotes, the paging loop header and the
        # statements that collect rows were missing from the excerpt; they
        # are reconstructed from the remaining code - confirm upstream.
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        execute_concurrent(self.session, list(statements_and_params))

        list_all_results = []
        self.session.default_fetch_size = 3

        result_set = self.session.execute("SELECT * FROM test3rf.test")
        while result_set.has_more_pages:
            for row in result_set.current_rows:
                # A row seen on an earlier page must not show up again.
                self.assertNotIn(row, list_all_results)
            list_all_results.extend(result_set.current_rows)
            # Resume the query from where the current page ended.
            page_state = result_set.paging_state
            result_set = self.session.execute("SELECT * FROM test3rf.test", paging_state=page_state)

        if(len(result_set.current_rows) > 0):
            list_all_results.extend(result_set.current_rows)
        self.assertEqual(len(list_all_results), 100)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_paging_verify_writes(self):
        """Every written key/value is read back for each fetch size and statement kind."""
        statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
                                    [(i, ) for i in range(100)])
        execute_concurrent(self.session, statements_and_params)

        prepared = self.session.prepare("SELECT * FROM test3rf.test")

        for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
            self.session.default_fetch_size = fetch_size

            # The same check is run for a plain string, a SimpleStatement
            # and a prepared statement.
            queries = ("SELECT * FROM test3rf.test",
                       SimpleStatement("SELECT * FROM test3rf.test"),
                       prepared)
            for query in queries:
                results = self.session.execute(query)
                result_array = set()
                result_set = set()
                for result in results:
                    # NOTE(review): the loop bodies were missing from the
                    # excerpt; collecting the k and v columns is
                    # reconstructed from the assertions below.
                    result_array.add(result.k)
                    result_set.add(result.v)

                self.assertEqual(set(range(100)), result_array)
                self.assertEqual(set([0]), result_set)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_async_paging(self):
        """All 100 rows come back via execute_async for every fetch size and statement kind."""
        insert_cql = "INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"
        select_cql = "SELECT * FROM test3rf.test"

        writes = [(insert_cql, (key, )) for key in range(100)]
        execute_concurrent(self.session, writes)

        prepared = self.session.prepare(select_cql)

        for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
            self.session.default_fetch_size = fetch_size

            # plain query string
            rows = list(self.session.execute_async(select_cql).result())
            self.assertEqual(100, len(rows))

            # SimpleStatement
            statement = SimpleStatement(select_cql)
            rows = list(self.session.execute_async(statement).result())
            self.assertEqual(100, len(rows))

            # prepared statement
            rows = list(self.session.execute_async(prepared).result())
            self.assertEqual(100, len(rows))
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_concurrent_with_paging(self):
        """Ten concurrent prepared selects each return all 100 rows for every fetch size."""
        insert_cql = "INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"
        writes = [(insert_cql, (key, )) for key in range(100)]
        execute_concurrent(self.session, writes)

        prepared = self.session.prepare("SELECT * FROM test3rf.test")
        no_params = [None] * 10

        for fetch_size in (2, 3, 7, 10, 99, 100, 101, 10000):
            self.session.default_fetch_size = fetch_size
            outcomes = execute_concurrent_with_args(self.session, prepared, no_params)
            self.assertEqual(10, len(outcomes))
            for _success, rows in outcomes:
                self.assertEqual(100, len(list(rows)))