Python eventlet 模块,hubs 子模块实例源码

我们从 Python 开源项目中,提取了以下 32 个代码示例,用于说明如何使用 eventlet.hubs 模块。

项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_schedule(self):
        """Schedule a zero-delay global call that aborts the hub and verify
        that the call ran and the hub is no longer running afterwards."""
        hub = hubs.get_hub()
        # A runloop left over from an earlier test on this thread would
        # interfere with this one, so stop it first.
        if hub.running:
            hub.abort()
            eventlet.sleep(0)

        called = []

        def record_and_abort():
            # Same tuple the one-line callback would produce.
            return (called.append(True), hub.abort())

        # Only a zero-delay timer is scheduled. Aborting while a far-future
        # timer is still pending is (per the original notes) handled by
        # pyevent but not by the other hubs, so that variant is not exercised.
        hubs.get_hub().schedule_call_global(0, record_and_abort)
        hub.default_sleep = lambda: 0.0
        hub.switch()
        assert called
        assert not hub.running
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_sleep(self):
        """Verify that eventlet.sleep(DELAY) lasts at least 90% of DELAY,
        both before and after a scheduled call raises inside the mainloop."""

        def timed_sleep():
            # Wall-clock seconds spent in one eventlet.sleep(DELAY).
            began = time.time()
            eventlet.sleep(DELAY)
            return time.time() - began

        def fail():
            1 // 0

        delay = timed_sleep()
        assert delay >= DELAY * 0.9, \
            'sleep returned after %s seconds (was scheduled for %s)' % (
                delay, DELAY)

        # even if there was an error in the mainloop, the hub should
        # continue to work
        hubs.get_hub().schedule_call_global(0, fail)

        delay = timed_sleep()
        assert delay >= DELAY * 0.9, \
            'sleep returned after %s seconds (was scheduled for %s)' % (
                delay, DELAY)
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_exceptionleaks(self):
        """Check that an exception being handled inside one greenthread is
        not visible through sys.exc_info() in the spawning greenthread."""
        # tests expected behaviour with all versions of greenlet
        def test_gt(sem):
            try:
                raise KeyError()
            except KeyError:
                # Still inside the except block: wake the waiter and switch
                # to the hub while the KeyError is being handled.
                sem.release()
                hubs.get_hub().switch()

        # semaphores for controlling execution order
        sem = eventlet.Semaphore()
        sem.acquire()
        g = eventlet.spawn(test_gt, sem)
        try:
            # NOTE(review): presumably blocks until test_gt calls release()
            # (assumes the semaphore's initial value is 1 -- confirm).
            sem.acquire()
            assert sys.exc_info()[0] is None
        finally:
            # test_gt is parked in hub.switch(); always clean it up.
            g.kill()
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_kill(self):
        """Killing a greenthread after the hub runloop has died must not
        return straight to the hub greenlet's parent, and must not leave a
        redundant timer scheduled."""
        hub = hubs.get_hub()

        def dummyproc():
            hub.switch()

        victim = eventlet.spawn(dummyproc)
        eventlet.sleep(0)  # give dummyproc a chance to run

        hub_greenlet = hub.greenlet
        assert hub_greenlet.parent == eventlet.greenthread.getcurrent()
        # Throw into the hub greenlet; the exception is expected to surface
        # in this greenlet.
        self.assertRaises(
            KeyboardInterrupt, hub_greenlet.throw, KeyboardInterrupt())

        # Killing dummyproc schedules a timer that returns execution to this
        # greenlet before the exception is thrown into dummyproc. Execution
        # should come back here via that timer, not by propagation from the
        # terminating greenlet.
        victim.kill()
        timeout_exc = self.CustomException()
        with eventlet.Timeout(0.5, timeout_exc):
            # No timer should remain that switches back to this greenlet, so
            # hub.switch() is expected to block until the timeout fires.
            self.assertRaises(self.CustomException, hub.switch)
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_parent(self):
        """A terminating greenthread whose parent was an earlier, now-defunct
        hub greenlet must return execution to the hub runloop rather than to
        the hub greenlet's parent."""
        hub = hubs.get_hub()

        def dummyproc():
            pass

        worker = eventlet.spawn(dummyproc)

        hub_greenlet = hub.greenlet
        assert hub_greenlet.parent == eventlet.greenthread.getcurrent()
        self.assertRaises(
            KeyboardInterrupt, hub_greenlet.throw, KeyboardInterrupt())

        # dummyproc must not have completed yet
        assert not worker.dead
        timeout_exc = self.CustomException()
        with eventlet.Timeout(0.5, timeout_exc):
            # Switching to the hub lets dummyproc finish. Execution should
            # then go back to the runloop and not to this greenlet, so
            # hub.switch() is expected to block until the timeout fires.
            self.assertRaises(self.CustomException, hub.switch)
        # sanity check: dummyproc has completed by now
        assert worker.dead
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def verify_hub_empty():
    """Assert that the current hub has no registered readers or writers.

    Only reader and writer listeners are checked; pending timers are not
    inspected.

    Raises:
        AssertionError: if any reader or writer is still registered. The
            message describes every remaining listener and the counts.
    """
    from eventlet import hubs

    def format_listener(listener):
        # Describe a listener together with the greenlet that owns it.
        return 'Listener %r for greenlet %r with run callback %r' % (
            listener, listener.greenlet, getattr(listener.greenlet, 'run', None))

    hub = hubs.get_hub()
    readers = hub.get_readers()
    writers = hub.get_writers()
    num_readers = len(readers)
    num_writers = len(writers)
    assert num_readers == 0 and num_writers == 0, \
        "Readers: %s (%d) Writers: %s (%d)" % (
            ', '.join(map(format_listener, readers)), num_readers,
            ', '.join(map(format_listener, writers)), num_writers,
        )
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:tabmaster    作者:NicolasMinghetti    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:infiblog    作者:RajuKoushik    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:logo-gen    作者:jellene4eva    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:compatify    作者:hatooku    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def patch(self):
        """Select an eventlet hub, monkey-patch the standard library, then
        apply the sendfile patch."""
        # No argument: let eventlet choose its default hub.
        hubs.use_hub()
        # os=False leaves the ``os`` module unpatched.
        eventlet.monkey_patch(os=False)
        # NOTE(review): patch_sendfile is defined outside this snippet;
        # its exact effect cannot be confirmed from here.
        patch_sendfile()
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_cancel_immediate(self):
        """Schedule 2000 far-future timers, cancelling each one right away,
        and check the hub's timer bookkeeping stays bounded.

        After every cancel the hub's ``timers_canceled`` counter must not
        exceed the current timer count plus one. At the end, at most 1000
        timers beyond the starting count may remain and at most 1000 may be
        recorded as canceled.
        NOTE(review): presumably this exercises the hub's pruning of
        canceled timers -- confirm against the hub implementation.
        """
        hub = hubs.get_hub()
        stimers = hub.get_timers_count()
        for _ in six.moves.range(2000):
            t = hubs.get_hub().schedule_call_global(60, noop)
            t.cancel()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
        # there should be fewer than 1000 new timers and canceled
        self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
        self.assert_less_than_equal(hub.timers_canceled, 1000)
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_cancel_accumulated(self):
        """Schedule 2000 far-future timers, yielding to the hub before each
        cancel, and check the hub's timer bookkeeping stays bounded.

        Both before and after every cancel the hub's ``timers_canceled``
        counter must not exceed the current timer count plus one. At the
        end, at most 1000 timers beyond the starting count may remain and at
        most 1000 may be recorded as canceled.
        """
        hub = hubs.get_hub()
        stimers = hub.get_timers_count()
        for _ in six.moves.range(2000):
            t = hubs.get_hub().schedule_call_global(60, noop)
            # Yield so the hub gets to run before the timer is cancelled.
            eventlet.sleep()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
            t.cancel()
            # hub.timers is passed as an extra argument to the assertion
            # helper (presumably for its failure message -- confirm).
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1, hub.timers)
        # there should be fewer than 1000 new timers and canceled
        self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
        self.assert_less_than_equal(hub.timers_canceled, 1000)
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_cancel_proportion(self):
        """Cancel only one third of the scheduled timers and check that the
        hub does not clean the canceled ones out, then cancel the rest.

        Each of 1000 rounds schedules three far-future timers and cancels
        one. Afterwards the timer count must be at least 3000 and exactly
        1000 timers must be recorded as canceled. The remaining timers are
        then cancelled one by one, checking the canceled count never exceeds
        the timer count.
        """
        # if fewer than half the pending timers are canceled, it should
        # not clean them out
        hub = hubs.get_hub()
        uncanceled_timers = []
        stimers = hub.get_timers_count()
        for _ in six.moves.range(1000):
            # 2/3rds of new timers are uncanceled
            t = hubs.get_hub().schedule_call_global(60, noop)
            t2 = hubs.get_hub().schedule_call_global(60, noop)
            t3 = hubs.get_hub().schedule_call_global(60, noop)
            eventlet.sleep()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
            t.cancel()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count() + 1)
            uncanceled_timers.append(t2)
            uncanceled_timers.append(t3)
        # 3000 new timers, plus a few extras
        self.assert_less_than_equal(stimers + 3000,
                                    stimers + hub.get_timers_count())
        self.assertEqual(hub.timers_canceled, 1000)
        for t in uncanceled_timers:
            t.cancel()
            self.assert_less_than_equal(hub.timers_canceled,
                                        hub.get_timers_count())
        # Give the hub one more turn after everything has been cancelled.
        eventlet.sleep()
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_local(self):
        """A local call scheduled from a short-lived greenthread must not
        fire: the list it would pop from has to stay untouched."""
        items = [1]
        schedule_local = hubs.get_hub().schedule_call_local
        # The spawned greenthread only schedules ``items.pop`` and returns.
        eventlet.spawn(schedule_local, DELAY, items.pop)
        eventlet.sleep(0)
        # Wait well past the scheduled delay before checking.
        eventlet.sleep(DELAY * 2)
        assert items == [1], items
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_ordering(self):
        """Global calls must fire in order of delay, and calls with equal
        delay must fire in the order they were scheduled."""
        fired = []
        # (delay, value) pairs, in scheduling order.
        plan = ((DELAY * 2, 3), (DELAY, 1), (DELAY, 2))
        for delay, value in plan:
            hubs.get_hub().schedule_call_global(delay, fired.append, value)
        # Keep yielding until every scheduled call has run.
        while len(fired) < 3:
            eventlet.sleep(DELAY)
        self.assertEqual(fired, [1, 2, 3])
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_debug_listeners(self):
        """Smoke test: switching listener debugging on and then off again
        on the current hub must not raise."""
        for enabled in (True, False):
            hubs.get_hub().set_debug_listeners(enabled)
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_timer_exceptions(self):
        """Smoke test: switching timer-exception reporting on and then off
        again on the current hub must not raise."""
        for enabled in (True, False):
            hubs.get_hub().set_timer_exceptions(enabled)
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_kqueue_unsupported(self):
        """Run a helper script in a subprocess that removes ``select.kqueue``
        and makes epoll imports fail, then check that picking the default
        hub still succeeds and that a kqueue import was attempted."""
        # https://github.com/eventlet/eventlet/issues/38
        # get_hub on windows broken by kqueue
        # The script below is runtime data. It wraps builtins.__import__ so
        # that any module whose name contains 'epoll' raises ImportError and
        # any name containing 'kqueue' is reported on stdout before the real
        # import proceeds.
        module_source = r'''
from __future__ import print_function

# Simulate absence of kqueue even on platforms that support it.
import select
try:
    del select.kqueue
except AttributeError:
    pass

from eventlet.support.six.moves import builtins

original_import = builtins.__import__

def fail_import(name, *args, **kwargs):
    if 'epoll' in name:
        raise ImportError('disabled for test')
    if 'kqueue' in name:
        print('kqueue tried')
    return original_import(name, *args, **kwargs)

builtins.__import__ = fail_import


import eventlet.hubs
eventlet.hubs.get_default_hub()
print('ok')
'''
        # NOTE(review): write_to_tempfile / launch_subprocess are helpers
        # defined outside this snippet; presumably they write 'newmod.py'
        # and return its captured output -- confirm against the test base.
        self.write_to_tempfile('newmod', module_source)
        output, _ = self.launch_subprocess('newmod.py')
        # Exactly one kqueue attempt, followed by a successful finish.
        self.assertEqual(output, 'kqueue tried\nok\n')
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def using_pyevent(_f):
    """Return True when the current hub's class lives in a module whose
    name contains 'pyevent'. The ``_f`` argument is ignored."""
    from eventlet.hubs import get_hub

    hub_module_name = type(get_hub()).__module__
    return 'pyevent' in hub_module_name
项目:django-next-train    作者:bitpixdigital    | 项目源码 | 文件源码
def patch(self):
        hubs.use_hub()
        eventlet.monkey_patch(os=False)
        patch_sendfile()
项目:Lixiang_zhaoxin    作者:hejaxian    | 项目源码 | 文件源码
def patch(self):
        hubs.use_hub()
        eventlet.monkey_patch(os=False)
        patch_sendfile()
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def patch(self):
        hubs.use_hub()
        eventlet.monkey_patch(os=False)
        patch_sendfile()