Python inspect 模块,isgenerator() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用inspect.isgenerator()

项目:easy-py-web-app    作者:ma-ha    | 项目源码 | 文件源码
def default(self, obj):
    """Reduce ``obj`` to a plain structure.

    Objects exposing ``to_json`` are converted through it and processed
    again. Objects with a ``__dict__`` are reduced to a dict of their
    members, leaving out dunder names and anything matched by the inspect
    predicates below. Everything else is returned unchanged.
    """
    if hasattr(obj, "to_json"):
        return self.default(obj.to_json())
    if not hasattr(obj, "__dict__"):
        return obj

    # A member matching any one of these predicates is dropped.
    rejected_by = (
        inspect.isabstract,
        inspect.isbuiltin,
        inspect.isfunction,
        inspect.isgenerator,
        inspect.isgeneratorfunction,
        inspect.ismethod,
        inspect.ismethoddescriptor,
        inspect.isroutine,
    )
    members = {
        name: member
        for name, member in inspect.getmembers(obj)
        if not name.startswith("__")
        and not any(check(member) for check in rejected_by)
    }
    return self.default(members)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_excluding_predicates(self):
    """Pass each inspect predicate and its sample expression to ``self.istest``."""
    check = self.istest
    check(inspect.isbuiltin, 'sys.exit')
    if check_impl_detail():
        check(inspect.isbuiltin, '[].append')

    # (predicate, expression) pairs, checked in exactly this order.
    pairs = (
        (inspect.iscode, 'mod.spam.__code__'),
        (inspect.isframe, 'tb.tb_frame'),
        (inspect.isfunction, 'mod.spam'),
        (inspect.isfunction, 'mod.StupidGit.abuse'),
        (inspect.ismethod, 'git.argue'),
        (inspect.ismodule, 'mod'),
        (inspect.istraceback, 'tb'),
        (inspect.isdatadescriptor, 'collections.defaultdict.default_factory'),
        (inspect.isgenerator, '(x for x in range(2))'),
        (inspect.isgeneratorfunction, 'generator_function_example'),
    )
    for predicate, expression in pairs:
        check(predicate, expression)

    # Descriptor checks depend on which descriptor types ``types`` exposes.
    if not hasattr(types, 'GetSetDescriptorType'):
        self.assertFalse(inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
    else:
        check(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')

    if not hasattr(types, 'MemberDescriptorType'):
        self.assertFalse(inspect.ismemberdescriptor(datetime.timedelta.days))
    else:
        check(inspect.ismemberdescriptor, 'type(lambda: None).__globals__')
项目:imap_tools    作者:ikvk    | 项目源码 | 文件源码
def _uid_str(uid_list: str or [str] or Generator) -> str:
        """
        Prepare list of uid for use in commands: delete/copy/move/seen
        uid_list can be: str, list, tuple, set, fetch generator
        """
        if not uid_list:
            raise MailBox.MailBoxUidParamError('uid_list should not be empty')
        if type(uid_list) is str:
            uid_list = uid_list.split(',')
        if inspect.isgenerator(uid_list):
            uid_list = [msg.uid for msg in uid_list if msg.uid]
        if type(uid_list) not in (list, tuple, set):
            raise MailBox.MailBoxUidParamError('Wrong uid_list type: {}'.format(type(uid_list)))
        for uid in uid_list:
            if type(uid) is not str:
                raise MailBox.MailBoxUidParamError('uid {} is not string'.format(str(uid)))
            if not uid.strip().isdigit():
                raise MailBox.MailBoxUidParamError('Wrong uid: {}'.format(uid))
        return ','.join((i.strip() for i in uid_list))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_excluding_predicates(self):
    """Pass each inspect predicate and its sample expression to ``self.istest``."""
    check = self.istest

    # (predicate, expression) pairs, checked in exactly this order.
    pairs = (
        (inspect.isbuiltin, 'sys.exit'),
        (inspect.isbuiltin, '[].append'),
        (inspect.iscode, 'mod.spam.func_code'),
        (inspect.isframe, 'tb.tb_frame'),
        (inspect.isfunction, 'mod.spam'),
        (inspect.ismethod, 'mod.StupidGit.abuse'),
        (inspect.ismethod, 'git.argue'),
        (inspect.ismodule, 'mod'),
        (inspect.istraceback, 'tb'),
        (inspect.isdatadescriptor, '__builtin__.file.closed'),
        (inspect.isdatadescriptor, '__builtin__.file.softspace'),
        (inspect.isgenerator, '(x for x in xrange(2))'),
        (inspect.isgeneratorfunction, 'generator_function_example'),
    )
    for predicate, expression in pairs:
        check(predicate, expression)

    # Descriptor checks depend on which descriptor types ``types`` exposes.
    if not hasattr(types, 'GetSetDescriptorType'):
        self.assertFalse(inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
    else:
        check(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')

    if not hasattr(types, 'MemberDescriptorType'):
        self.assertFalse(inspect.ismemberdescriptor(datetime.timedelta.days))
    else:
        check(inspect.ismemberdescriptor, 'datetime.timedelta.days')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_excluding_predicates(self):
    """Run ``self.istest`` over a fixed list of inspect predicates and expressions."""
    run = self.istest

    # Order matters only in that it mirrors the sequence of individual checks.
    cases = [
        (inspect.isbuiltin, 'sys.exit'),
        (inspect.isbuiltin, '[].append'),
        (inspect.iscode, 'mod.spam.func_code'),
        (inspect.isframe, 'tb.tb_frame'),
        (inspect.isfunction, 'mod.spam'),
        (inspect.ismethod, 'mod.StupidGit.abuse'),
        (inspect.ismethod, 'git.argue'),
        (inspect.ismodule, 'mod'),
        (inspect.istraceback, 'tb'),
        (inspect.isdatadescriptor, '__builtin__.file.closed'),
        (inspect.isdatadescriptor, '__builtin__.file.softspace'),
        (inspect.isgenerator, '(x for x in xrange(2))'),
        (inspect.isgeneratorfunction, 'generator_function_example'),
    ]
    for predicate, source in cases:
        run(predicate, source)

    # Get/set descriptors: positive check when the type exists, negative otherwise.
    has_getset = hasattr(types, 'GetSetDescriptorType')
    if has_getset:
        run(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')
    else:
        self.assertFalse(inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))

    # Member descriptors: same pattern.
    has_member = hasattr(types, 'MemberDescriptorType')
    if has_member:
        run(inspect.ismemberdescriptor, 'datetime.timedelta.days')
    else:
        self.assertFalse(inspect.ismemberdescriptor(datetime.timedelta.days))
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def delete(self, paths):
    """Delete L{Namespace}s matching C{paths}.

    The system tag values for the description and path of every deleted
    namespace are deleted as well.

    @param paths: A sequence of L{Namespace.path}s.  A generator is
        converted to a C{list} before use.
    @raises NotEmptyError: Raised if the L{Namespace} is not empty.
    @return: A C{list} of C{(objectID, Namespace.path)} 2-tuples
        representing the deleted L{Namespace}s.
    """
    if isgenerator(paths):
        paths = list(paths)

    hasChildNamespaces = getChildNamespaces(paths).any()
    if hasChildNamespaces or getChildTags(paths).any():
        raise NotEmptyError("Can't delete non-empty namespaces.")

    namespaces = getNamespaces(paths=paths)
    deleted = list(namespaces.values(Namespace.objectID, Namespace.path))

    # One (objectID, systemTag) pair per deleted namespace and system tag.
    systemTags = (u'fluiddb/namespaces/description',
                  u'fluiddb/namespaces/path')
    systemValues = []
    for objectID, _ in deleted:
        for systemTag in systemTags:
            systemValues.append((objectID, systemTag))

    if systemValues:
        self._factory.tagValues(self._user).delete(systemValues)
    namespaces.remove()
    return deleted
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def delete(self, values):
    """Delete L{TagValue}s.

    @param values: A sequence of C{(objectID, Tag.path)} 2-tuples to
        delete values for.  A generator is converted to a C{list} first.
    @raise FeatureError: Raised if the given list of values is empty.
    @return: The number of values deleted.
    """
    if isgenerator(values):
        values = list(values)
    if not values:
        raise FeatureError("Can't delete an empty list of tag values.")

    paths = set(path for _, path in values)
    objectIDs = set(objectID for objectID, _ in values)

    # Translate each tag path to its tag ID before removing the values.
    tagIDs = dict(getTags(paths).values(Tag.path, Tag.id))
    idValues = [(objectID, tagIDs[path]) for objectID, path in values]
    removed = getTagValues(idValues).remove()
    if removed:
        touchObjects(objectIDs)
    return removed
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def delete(self, paths):
    """See L{TagAPI.delete}.

    Permissions for deleted L{Tag}s are removed from the cache, and the
    recent-activity caches for the affected objects and for the first
    path component of each path are cleared.
    """
    if isgenerator(paths):
        paths = list(paths)

    # FIXME getObjectIDs is called twice--once here and once in
    # TagAPI.delete.  It would be better if we only did this once, not to
    # mention that this breaks encapsulation by bypassing the model layer
    # and accessing the data layer directly. -jkakar
    affectedObjectIDs = set(getObjectIDs(paths))
    RecentObjectActivityCache().clear(affectedObjectIDs)

    # The text before the first '/' of each path is used as a username.
    owners = set(path.partition('/')[0] for path in paths)
    RecentUserActivityCache().clear(owners)

    PermissionCache().clearTagPermissions(paths)
    return self._api.delete(paths)
项目:fulmar    作者:tylderen    | 项目源码 | 文件源码
def update(self, db_name, coll_name, results, task, query, upsert=True):
    """Write each result dict to ``self.db_conn[db_name][coll_name]``.

    ``results`` may be a generator, a list, or a single value (which is
    wrapped in a one-element list). Every result must be a dict. When
    ``query`` is falsy it is replaced by ``{'taskid': task.get('taskid')}``
    the first time a result is processed. The selected collection is also
    stored on ``self.db_coll``.
    """
    self.db_coll = self.db_conn[db_name][coll_name]

    # Generators and lists are iterated as-is; anything else is one result.
    if not inspect.isgenerator(results) and not isinstance(results, list):
        results = [results]

    for record in results:
        assert isinstance(record, dict), 'result saved to mongodb must be dict.'
        if not query:
            query = {'taskid': task.get('taskid')}
        changes = {
            '$set': record,
            '$setOnInsert': {'updated_at': time.time()},
        }
        self.db_coll.update(query, changes, upsert=upsert)
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def is_closable_iterator(obj):
    """Return True if ``obj`` is a generator, or an iterator whose ``close`` takes no arguments."""

    # Anything that is not an iterator is rejected outright.
    if not is_iterator(obj):
        return False

    # Generators are accepted without looking any further.
    import inspect
    if inspect.isgenerator(obj):
        return True

    # Any other iterator needs a callable ``close`` attribute...
    if not hasattr(obj, 'close') or not callable(obj.close):
        return False

    # ... that can be bound to an empty argument list.
    try:
        inspect.getcallargs(obj.close)
    except TypeError:
        return False
    return True
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_excluding_predicates(self):
    """Pass each inspect predicate and its sample expression to ``self.istest``."""
    check = self.istest
    check(inspect.isbuiltin, 'sys.exit')
    if check_impl_detail():
        check(inspect.isbuiltin, '[].append')

    # (predicate, expression) pairs, checked in exactly this order.
    pairs = (
        (inspect.iscode, 'mod.spam.func_code'),
        (inspect.isframe, 'tb.tb_frame'),
        (inspect.isfunction, 'mod.spam'),
        (inspect.ismethod, 'mod.StupidGit.abuse'),
        (inspect.ismethod, 'git.argue'),
        (inspect.ismodule, 'mod'),
        (inspect.istraceback, 'tb'),
        (inspect.isdatadescriptor, '__builtin__.file.closed'),
        (inspect.isdatadescriptor, '__builtin__.file.softspace'),
        (inspect.isgenerator, '(x for x in xrange(2))'),
        (inspect.isgeneratorfunction, 'generator_function_example'),
    )
    for predicate, expression in pairs:
        check(predicate, expression)

    # Descriptor checks depend on which descriptor types ``types`` exposes.
    if not hasattr(types, 'GetSetDescriptorType'):
        self.assertFalse(inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
    else:
        check(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')

    if not hasattr(types, 'MemberDescriptorType'):
        self.assertFalse(inspect.ismemberdescriptor(type(lambda: None).func_globals))
    else:
        check(inspect.ismemberdescriptor, 'type(lambda: None).func_globals')
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_excluding_predicates(self):
    """Drive ``self.istest`` with a table of inspect predicates and expressions."""
    table = (
        ('isbuiltin', 'sys.exit'),
        ('isbuiltin', '[].append'),
        ('iscode', 'mod.spam.func_code'),
        ('isframe', 'tb.tb_frame'),
        ('isfunction', 'mod.spam'),
        ('ismethod', 'mod.StupidGit.abuse'),
        ('ismethod', 'git.argue'),
        ('ismodule', 'mod'),
        ('istraceback', 'tb'),
        ('isdatadescriptor', '__builtin__.file.closed'),
        ('isdatadescriptor', '__builtin__.file.softspace'),
        ('isgenerator', '(x for x in xrange(2))'),
        ('isgeneratorfunction', 'generator_function_example'),
    )
    # The predicate is looked up on ``inspect`` just before each check.
    for predicate_name, expression in table:
        self.istest(getattr(inspect, predicate_name), expression)

    # Descriptor checks depend on which descriptor types ``types`` exposes.
    if hasattr(types, 'GetSetDescriptorType'):
        self.istest(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')
    else:
        frame_locals = type(tb.tb_frame).f_locals
        self.assertFalse(inspect.isgetsetdescriptor(frame_locals))

    if hasattr(types, 'MemberDescriptorType'):
        self.istest(inspect.ismemberdescriptor, 'datetime.timedelta.days')
    else:
        days = datetime.timedelta.days
        self.assertFalse(inspect.ismemberdescriptor(days))
项目:ga4gh-biosamples    作者:EBISPOT    | 项目源码 | 文件源码
def default(self, obj):
    """Reduce ``obj`` to a plain structure.

    Enum members become their ``name``. Objects with a ``__dict__`` become
    a dict of their members, leaving out dunder names, anything matched by
    the inspect predicates below, values ``self.isempty`` reports as empty,
    and ``None``. Everything else is returned unchanged.
    """
    # NOTE: a ``to_json`` hook was consulted here at some point; it is disabled.
    if isinstance(obj, Enum):
        return obj.name
    if not hasattr(obj, "__dict__"):
        return obj

    rejected_by = (
        inspect.isabstract,
        inspect.isbuiltin,
        inspect.isfunction,
        inspect.isgenerator,
        inspect.isgeneratorfunction,
        inspect.ismethod,
        inspect.ismethoddescriptor,
        inspect.isroutine,
    )

    def keep(name, member):
        # Checks run in a fixed order: name, inspect predicates, emptiness, None.
        if name.startswith("__"):
            return False
        if any(check(member) for check in rejected_by):
            return False
        if self.isempty(member):
            return False
        return member is not None

    members = {
        name: member
        for name, member in inspect.getmembers(obj)
        if keep(name, member)
    }
    return self.default(members)
项目:cess    作者:frnsys    | 项目源码 | 文件源码
def call_agents(self, data):
    """call a method on all agents

    ``data`` supplies ``func`` and optionally ``args``/``kwargs``. If the
    first agent's result is a generator, all results are gathered through
    ``asyncio.gather``. Returns a status dict; on any exception the error
    is logged and a ``'failed'`` status dict is returned instead.
    """
    try:
        request = {'args': [], 'kwargs': {}}
        request.update(data)
        if not self.agents:
            results = []
        else:
            results = []
            for agent in self.agents.values():
                method = getattr(agent, request['func'])
                results.append(method(*request['args'], **request['kwargs']))
            if inspect.isgenerator(results[0]):
                results = yield from asyncio.gather(*results)
        return {'status': 'ok', 'results': results}
    except Exception as e:
        tb = traceback.format_exc()
        logger.exception(e)
        logger.exception(tb)
        return {'status': 'failed', 'exception': e, 'traceback': tb}
项目:cess    作者:frnsys    | 项目源码 | 文件源码
def call_agent(self, data):
    """call a method on an agent and get the result

    ``data`` supplies ``id`` and ``func`` and optionally ``args``/``kwargs``.
    An agent found in ``self.agents`` is called directly (a generator result
    is delegated to with ``yield from``); otherwise the request is sent to
    ``self.arbiter`` with ``cmd`` set to ``'call_agent'``.
    """
    request = {'args': [], 'kwargs': {}}
    request.update(data)
    agent_id = request['id']

    # not held locally: pass request to the arbiter
    if agent_id not in self.agents:
        request['cmd'] = 'call_agent'
        return (yield from self.arbiter.send_recv(request))

    # held locally: call the method directly
    method = getattr(self.agents[agent_id], request['func'])
    outcome = method(*request['args'], **request['kwargs'])
    if inspect.isgenerator(outcome):
        outcome = yield from outcome
    return outcome
项目:plumeria    作者:sk89q    | 项目源码 | 文件源码
def __getattr__(self, item):
    """Fetch ``item`` from ``self.delegate`` and pass results through ``self._wrap``.

    How the attribute is handled depends on what it is:

    * coroutine object: a coroutine is returned that awaits it and wraps
      the result;
    * coroutine function (or object with a truthy ``_is_coroutine``): an
      ``async`` wrapper function is returned;
    * generator function: a generator function wrapping each yielded entry;
    * generator object: a generator wrapping each yielded entry;
    * plain function: a function wrapping the return value;
    * anything else: the wrapped attribute itself.
    """
    attr = getattr(self.delegate, item)

    if inspect.iscoroutine(attr):
        # A coroutine object is not callable, so it is awaited directly
        # rather than being invoked with arguments.
        async def wrapper():
            return self._wrap(await attr)

        return wrapper()

    if inspect.iscoroutinefunction(attr) or (
            hasattr(attr, "_is_coroutine") and attr._is_coroutine):
        async def wrapper(*args, **kwargs):
            return self._wrap(await attr(*args, **kwargs))

        return wrapper

    if inspect.isgeneratorfunction(attr):
        def wrapper(*args, **kwargs):
            for entry in attr(*args, **kwargs):
                yield self._wrap(entry)

        return wrapper

    if inspect.isgenerator(attr):
        # A generator object is not callable either; iterate it as-is.
        def wrapper():
            for entry in attr:
                yield self._wrap(entry)

        return wrapper()

    if inspect.isfunction(attr):
        def wrapper(*args, **kwargs):
            return self._wrap(attr(*args, **kwargs))

        return wrapper

    return self._wrap(attr)
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def is_closable_iterator(obj):
    """Tell whether ``obj`` is an iterator that can be closed by calling ``close()`` with no arguments."""

    # Only iterators are candidates.
    if not is_iterator(obj):
        return False

    # A generator is always reported as closable.
    import inspect
    if inspect.isgenerator(obj):
        return True

    # For a custom iterator, a callable ``close`` attribute must exist...
    has_close = hasattr(obj, 'close') and callable(obj.close)
    if not has_close:
        return False

    # ... and must accept being called with no arguments.
    try:
        inspect.getcallargs(obj.close)
    except TypeError:
        closable = False
    else:
        closable = True
    return closable
项目:data_structures    作者:prawn-cake    | 项目源码 | 文件源码
def test_tree(self):
    """Exercise item assignment, item lookup and membership on ``PrefixTree``."""
    tree = PrefixTree()
    names = ['amy', 'ann', 'anne', 'emma', 'rob', 'roger', 'anna']

    # Setter: every name is stored under itself.
    for name in names:
        tree[name] = name

    # Getter: the lookup result must be a truthy generator.
    matches = tree['ann']
    self.assertTrue(matches)
    self.assertTrue(inspect.isgenerator(matches))
    # expect 'ann', 'anne' and 'anna'
    found = list(matches)
    self.assertEqual(len(found), 3)

    # Containment: a stored name is present, a bare prefix is not.
    self.assertTrue('amy' in tree)
    self.assertFalse('am' in tree)
项目:hbipy    作者:complyue    | 项目源码 | 文件源码
def _send_code(self, code, wire_dir=None):
        if not self.connected:
            raise RuntimeError('HBI wire not connected')

        # convert wire_dir as early as possible, will save conversions in case the code is of complex structure
        if wire_dir is None:
            wire_dir = b''
        elif not isinstance(wire_dir, (bytes, bytearray)):
            wire_dir = str(wire_dir).encode('utf-8')

        # use a generator function to pull code from hierarchy
        def pull_code(container):
            for mc in container:
                if inspect.isgenerator(mc):
                    yield from pull_code(mc)
                else:
                    yield mc

        if inspect.isgenerator(code):
            for c in pull_code(code):
                await self._send_text(c, wire_dir)
        else:
            await self._send_text(code, wire_dir)
项目:kervi    作者:kervi    | 项目源码 | 文件源码
def default(self, obj):
    """Reduce ``obj`` to a plain structure.

    Objects exposing ``to_json`` are converted through it and processed
    again. Objects with a ``__dict__`` are reduced to a dict of their
    members, leaving out dunder names and anything matched by the inspect
    predicates below. Everything else is returned unchanged.
    """
    if hasattr(obj, "to_json"):
        return self.default(obj.to_json())
    if not hasattr(obj, "__dict__"):
        return obj

    # A member matching any one of these predicates is dropped.
    rejected_by = (
        inspect.isabstract,
        inspect.isbuiltin,
        inspect.isfunction,
        inspect.isgenerator,
        inspect.isgeneratorfunction,
        inspect.ismethod,
        inspect.ismethoddescriptor,
        inspect.isroutine,
    )
    members = {
        name: member
        for name, member in inspect.getmembers(obj)
        if not name.startswith("__")
        and not any(check(member) for check in rejected_by)
    }
    return self.default(members)
项目:kervi    作者:kervi    | 项目源码 | 文件源码
def default(self, obj):
    """Turn ``obj`` into a plain value.

    ``to_json`` takes precedence when present. Otherwise an object with a
    ``__dict__`` becomes a dict of its members that are neither dunder
    names nor matched by any of the inspect predicates listed below. Any
    other object is returned as-is.
    """
    if hasattr(obj, "to_json"):
        return self.default(obj.to_json())
    if not hasattr(obj, "__dict__"):
        return obj

    def is_plain(member):
        # True when none of the inspect predicates matches the member.
        checks = (
            inspect.isabstract,
            inspect.isbuiltin,
            inspect.isfunction,
            inspect.isgenerator,
            inspect.isgeneratorfunction,
            inspect.ismethod,
            inspect.ismethoddescriptor,
            inspect.isroutine,
        )
        return not any(check(member) for check in checks)

    plain = {}
    for name, member in inspect.getmembers(obj):
        if name.startswith("__"):
            continue
        if is_plain(member):
            plain[name] = member
    return self.default(plain)
项目:kervi    作者:kervi    | 项目源码 | 文件源码
def default(self, obj):
    """Convert ``obj`` for serialization.

    An object with ``to_json`` is replaced by the result of calling it,
    which is converted in turn. An object with a ``__dict__`` is replaced
    by a dict of its members, skipping dunder names and every member for
    which one of the inspect predicates below is true. Other objects are
    returned untouched.
    """
    if hasattr(obj, "to_json"):
        converted = obj.to_json()
        return self.default(converted)

    if hasattr(obj, "__dict__"):
        skip_if = [
            inspect.isabstract,
            inspect.isbuiltin,
            inspect.isfunction,
            inspect.isgenerator,
            inspect.isgeneratorfunction,
            inspect.ismethod,
            inspect.ismethoddescriptor,
            inspect.isroutine,
        ]
        public = [
            pair for pair in inspect.getmembers(obj)
            if not pair[0].startswith("__")
        ]
        data = dict(
            (name, member)
            for name, member in public
            if not any(test(member) for test in skip_if)
        )
        return self.default(data)

    return obj
项目:vdi-broker    作者:cloudbase    | 项目源码 | 文件源码
def post_process_extensions(self, extensions, resp_obj, request,
                            action_args):
    """Run post-processing for each extension and return the first truthy response.

    A generator extension is resumed with ``resp_obj``; any other extension
    is called with the request, ``resp_obj`` and ``action_args``. A ``Fault``
    raised by an extension is used as that extension's response. Returns
    ``None`` when no extension produces a truthy response.
    """
    for extension in extensions:
        outcome = None
        if not inspect.isgenerator(extension):
            # Regular functions get post-processing...
            try:
                with ResourceExceptionHandler():
                    outcome = extension(req=request, resp_obj=resp_obj,
                                        **action_args)
            except Fault as fault:
                outcome = fault
        else:
            # If it's a generator, run the second half of
            # processing
            try:
                with ResourceExceptionHandler():
                    outcome = extension.send(resp_obj)
            except StopIteration:
                # Normal exit of generator
                continue
            except Fault as fault:
                outcome = fault

        # We had a response...
        if outcome:
            return outcome

    return None
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_as_completed(monkeypatch):
    """``as_completed`` over ten async calls must be a generator yielding ten items."""
    service = RemoteService(DUMMY_SERVICE_URL)
    # Replace ``urllib.request.urlopen`` with the dummy for the duration of the test.
    monkeypatch.setattr(urllib.request, 'urlopen', dummy_urlopen)

    pending = []
    for _ in range(10):
        pending.append(service.call_method_async("test", []))

    completed = as_completed(*pending)

    assert inspect.isgenerator(completed)

    collected = list(completed)
    assert len(collected) == 10
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def isgenerator(o):
    """Return True if ``o`` is a generator function or a generator object.

    An ``UnboundMethod`` is inspected through its ``_func`` attribute.
    """
    target = o._func if isinstance(o, UnboundMethod) else o
    if inspect.isgeneratorfunction(target):
        return True
    return inspect.isgenerator(target)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def isgenerator(func):
    """Return True if ``func.func_code`` has the ``CO_GENERATOR`` flag set.

    Returns False when the attribute lookup raises AttributeError.
    """
    try:
        flagged = func.func_code.co_flags & CO_GENERATOR != 0
    except AttributeError:
        return False
    else:
        return flagged

# Make a function to help check if an exception is derived from BaseException.
# In Python 2.4, we just use Exception instead.
项目:meteos    作者:openstack    | 项目源码 | 文件源码
def post_process_extensions(self, extensions, resp_obj, request,
                            action_args):
    """Run post-processing for each extension and return the first truthy response.

    A generator extension is resumed with ``resp_obj``; any other extension
    is called with the request, ``resp_obj`` and ``action_args``. A ``Fault``
    raised by an extension is used as that extension's response. Returns
    ``None`` when no extension produces a truthy response.
    """
    for extension in extensions:
        outcome = None
        if not inspect.isgenerator(extension):
            # Regular functions get post-processing...
            try:
                with ResourceExceptionHandler():
                    outcome = extension(req=request, resp_obj=resp_obj,
                                        **action_args)
            except exception.VersionNotFoundForAPIMethod:
                # If an attached extension (@wsgi.extends) for the
                # method has no version match it is not an error. We
                # just don't run the extends code
                continue
            except Fault as fault:
                outcome = fault
        else:
            # If it's a generator, run the second half of
            # processing
            try:
                with ResourceExceptionHandler():
                    outcome = extension.send(resp_obj)
            except StopIteration:
                # Normal exit of generator
                continue
            except Fault as fault:
                outcome = fault

        # We had a response...
        if outcome:
            return outcome

    return None
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def underscore_memoization(func):
    """
    Decorator that caches a method's result on the instance.

    The result of the first call is stored as an attribute named after
    the method with a leading underscore; later calls return that
    attribute. A generator result is converted to a list before being
    stored. Writing::

        class A(object):
            @underscore_memoization
            def x(self):
                return 10

    means ``A().x()`` computes ``10`` once and afterwards reads it back
    from the instance attribute ``_x``.
    """
    attribute = '_' + func.__name__

    def wrapper(self):
        try:
            cached = getattr(self, attribute)
        except AttributeError:
            # First call on this instance: compute, materialize, store.
            cached = func(self)
            if inspect.isgenerator(cached):
                cached = list(cached)
            setattr(self, attribute, cached)
        return cached

    return wrapper
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def is_class_instance(obj):
    """Like inspect.* methods.

    True unless ``obj`` is a class, module, builtin, method, method
    descriptor, code object or generator.
    """
    excluded_kinds = (
        inspect.isclass,
        inspect.ismodule,
        inspect.isbuiltin,
        inspect.ismethod,
        inspect.ismethoddescriptor,
        inspect.iscode,
        inspect.isgenerator,
    )
    return not any(matches(obj) for matches in excluded_kinds)
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def memoize_default(default=NO_DEFAULT, evaluator_is_first_arg=False, second_arg_is_evaluator=False):
    """ Memoization decorator that can pre-seed a default to stop recursion.

    Results are stored per decorated function in a ``memoize_cache`` taken
    from the first argument, from the second argument, or from the first
    argument's ``_evaluator``, depending on the flags. Unless ``default``
    is ``NO_DEFAULT``, it is stored under the call's key before the
    function runs, so a recursive call with the same arguments gets the
    default back instead of recursing (think about a = b; b = a).
    Generator results are converted to lists before being cached.
    """
    def func(function):
        def wrapper(obj, *args, **kwargs):
            # Locate the cache that belongs to the evaluator.
            if evaluator_is_first_arg:
                cache = obj.memoize_cache
            elif second_arg_is_evaluator:  # needed for meta classes
                cache = args[0].memoize_cache
            else:
                cache = obj._evaluator.memoize_cache

            try:
                per_function = cache[function]
            except KeyError:
                per_function = cache[function] = {}

            key = (obj, args, frozenset(kwargs.items()))
            if key in per_function:
                return per_function[key]

            if default is not NO_DEFAULT:
                per_function[key] = default
            outcome = function(obj, *args, **kwargs)
            if inspect.isgenerator(outcome):
                outcome = list(outcome)
            per_function[key] = outcome
            return outcome
        return wrapper
    return func
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def underscore_memoization(func):
    """
    Cache the return value of a method in an underscore-prefixed attribute.

    For a method ``x`` the value is kept in ``self._x``: if that attribute
    already exists it is returned, otherwise the method runs, a generator
    result is turned into a list, and the value is stored and returned.
    Usage::

        class A(object):
            @underscore_memoization
            def x(self):
                return 10

    A now has an attribute ``_x`` written by this decorator.
    """
    slot = '_' + func.__name__

    def wrapper(self):
        try:
            value = getattr(self, slot)
        except AttributeError:
            # Nothing stored yet for this instance.
            value = func(self)
            value = list(value) if inspect.isgenerator(value) else value
            setattr(self, slot, value)
        return value

    return wrapper


# for fast_parser, should not be deleted
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def memoize_default(default=NO_DEFAULT, evaluator_is_first_arg=False, second_arg_is_evaluator=False):
    """ A memoization decorator whose main job is to prevent recursion.

    Before the decorated function runs, ``default`` (unless it is
    ``NO_DEFAULT``) is stored as the cached value for the current
    arguments, so a recursive call with the same arguments returns it
    right away (think about a = b; b = a). The cache is the
    ``memoize_cache`` of the first argument, of the second argument, or
    of the first argument's ``_evaluator``, as selected by the flags.
    Generator results are stored as lists.
    """
    def func(function):
        def wrapper(obj, *args, **kwargs):
            if evaluator_is_first_arg:
                owner = obj
            elif second_arg_is_evaluator:  # needed for meta classes
                owner = args[0]
            else:
                owner = obj._evaluator
            cache = owner.memoize_cache

            try:
                entries = cache[function]
            except KeyError:
                entries = {}
                cache[function] = entries

            key = (obj, args, frozenset(kwargs.items()))
            if key in entries:
                return entries[key]

            # Seed the entry first so re-entrant calls find a value.
            if default is not NO_DEFAULT:
                entries[key] = default
            produced = function(obj, *args, **kwargs)
            if inspect.isgenerator(produced):
                produced = list(produced)
            entries[key] = produced
            return produced
        return wrapper
    return func
项目:smartcache    作者:wecatch    | 项目源码 | 文件源码
def _is_iterable(data):
        return isinstance(data, list) or isinstance(data, tuple) or isinstance(data, set) or inspect.isgenerator(data)
项目:novajoin    作者:openstack    | 项目源码 | 文件源码
def call(self, context, method, *args, **kwargs):
    """Call a glance client method.

    ``method`` is looked up on the client controller named by the
    ``controller`` keyword (default ``'images'``) and called with the
    remaining arguments. The call is attempted up to
    ``CONF.glance_num_retries + 1`` times, sleeping one second after each
    retryable failure; when the last attempt fails,
    ``GlanceConnectionFailed`` is raised.
    """
    if self.client is None:
        self.client = self._glance_client(context)

    retryable = (glanceclient.exc.ServiceUnavailable,
                 glanceclient.exc.InvalidEndpoint,
                 glanceclient.exc.CommunicationError)

    max_retries = CONF.glance_num_retries
    if max_retries < 0:
        LOG.warning("Treating negative retries as 0")
        max_retries = 0
    num_attempts = max_retries + 1

    for attempt in range(1, num_attempts + 1):
        client = self._glance_client(context)
        try:
            controller = getattr(client,
                                 kwargs.pop('controller', 'images'))
            outcome = getattr(controller, method)(*args, **kwargs)
            if not inspect.isgenerator(outcome):
                return outcome
            # Convert generator results to a list, so that we can
            # catch any potential exceptions now and retry the call.
            return list(outcome)
        except retryable as e:
            extra = "retrying" if attempt < num_attempts else "done trying"

            LOG.exception("Error contacting glance server "
                          "'%(server)s' for '%(method)s', "
                          "%(extra)s.",
                          {'server': self.api_server,
                           'method': method, 'extra': extra})
            if attempt == num_attempts:
                raise exception.GlanceConnectionFailed(
                    server=str(self.api_server), reason=six.text_type(e))
            time.sleep(1)
项目:0ops.exed    作者:whisperaven    | 项目源码 | 文件源码
def fix_http_content_length():
    """ Reverse the operation done by cherrypy `_be_ie_unfriendly`.

    Replaces the response body with its collapsed, stripped form and sets
    ``Content-Length`` to the length of the collapsed body. Does nothing
    when the body is a generator.
    """
    resp = cherrypy.serving.response
    if inspect.isgenerator(resp.body):  # Dont do this in `stream` mode
        return
    resp.body = resp.collapse_body().strip()
    resp.headers['Content-Length'] = str(len(resp.collapse_body()))
项目:0ops.exed    作者:whisperaven    | 项目源码 | 文件源码
def unescape_response():
    """ Unescape the html body which was escaped by `_cpcompat.escape_html()`.

    Does nothing when the response body is a generator.
    """
    resp = cherrypy.serving.response
    if inspect.isgenerator(resp.body):  # Dont do this in `stream` mode
        return
    unescaped = unescape_html(resp.collapse_body())
    resp.body = six.binary_type(unescaped)
项目:0ops.exed    作者:whisperaven    | 项目源码 | 文件源码
def _json_stream_output(next_handler, *args, **kwargs):
    """Serialize the handler's output as JSON, streaming generator results.

    Sets the response ``Content-Type`` to JSON, calls ``next_handler`` and
    returns either one JSON string or, when the handler returned a
    generator, a generator producing one JSON string per item.
    """
    cherrypy.response.headers['Content-Type'] = "application/json"
    payload = next_handler(*args, **kwargs)
    if not inspect.isgenerator(payload):
        return json.dumps(payload)
    # Stream mode: encode lazily, one item at a time.
    return (json.dumps(item) for item in payload)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def test_filter_all(self, mock_filter_one):
        """filter_all() returns a generator of the objects the mock accepts.

        The injected mock answers True, False, True for the three objects,
        so only the first and third are expected in the output.
        """
        candidates = ['obj1', 'obj2', 'obj3']
        mock_filter_one.side_effect = [True, False, True]
        filter_under_test = base_filters.BaseFilter()
        filtered = filter_under_test.filter_all(candidates, {}, {})
        self.assertTrue(inspect.isgenerator(filtered))
        self.assertEqual(['obj1', 'obj3'], list(filtered))
项目:coriolis    作者:cloudbase    | 项目源码 | 文件源码
def post_process_extensions(self, extensions, resp_obj, request,
                                action_args):
        """Run each post-processing extension; return the first truthy result.

        Generator extensions are resumed by sending them ``resp_obj``; a
        generator that simply finishes is skipped.  Any other extension is
        called with ``req``, ``resp_obj`` and ``action_args`` as keywords.
        A ``Fault`` raised by an extension is used as that extension's
        response.  Returns ``None`` when no extension produced a response.
        """
        for extension in extensions:
            outcome = None
            if not inspect.isgenerator(extension):
                # Plain callable: invoke it for post-processing.
                try:
                    with ResourceExceptionHandler():
                        outcome = extension(req=request, resp_obj=resp_obj,
                                            **action_args)
                except Fault as fault:
                    outcome = fault
            else:
                # Generator: resume it to run the second half of processing.
                try:
                    with ResourceExceptionHandler():
                        outcome = extension.send(resp_obj)
                except StopIteration:
                    # The generator finished normally; nothing to return.
                    continue
                except Fault as fault:
                    outcome = fault

            # The first extension that yields a response wins.
            if outcome:
                return outcome

        return None
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def do(f):
    """Decorate ``f`` so each call returns ``Effect(ChainedIntent(gen))``.

    When ``f`` returns a generator, that generator is used as-is.  Any
    other return value is wrapped in a generator that yields nothing and
    immediately returns the value.
    """

    def _returning(value):
        # The unreachable ``yield`` is what makes this a generator function.
        return value
        yield

    @wraps(f)
    def wrapper(*args, **kwargs):
        outcome = f(*args, **kwargs)
        chained = outcome if inspect.isgenerator(outcome) else _returning(outcome)
        return Effect(ChainedIntent(chained))

    return wrapper
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def run_task(self, module, task, response):
        """
        Process the task, capturing exceptions and logs, and return a
        `ProcessorResult` object.

        When `_run_task` returns a generator, every yielded item is passed
        to `on_result` separately; the `result` handed to `ProcessorResult`
        is then the generator object itself.  Exceptions derived from
        `Exception` are logged and stored in the returned object instead of
        being raised.
        """
        self.logger = logger = module.logger
        result = None
        exception = None
        # Remember the current stdout so it can be restored in `finally`.
        stdout = sys.stdout
        self.task = task
        # A dict response is converted with rebuild_response() before use.
        if isinstance(response, dict):
            response = rebuild_response(response)
        self.response = response
        # task['track']['save'], falling back to {} when 'track' is missing
        # or falsy, or when it has no 'save' entry.
        self.save = (task.get('track') or {}).get('save', {})

        try:
            # Stdout capture is on unless the environment disables it.
            # NOTE(review): ListO presumably appends writes to log_buffer - confirm.
            if self.__env__.get('enable_stdout_capture', True):
                sys.stdout = ListO(module.log_buffer)
            self._reset()
            result = self._run_task(task, response)
            if inspect.isgenerator(result):
                # Dispatch each yielded item to on_result individually.
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)
        except Exception as e:
            logger.exception(e)
            exception = e
        finally:
            # Snapshot the per-run state first: the attributes are cleared
            # just below, but ProcessorResult is built after this block.
            follows = self._follows
            messages = self._messages
            logs = list(module.log_buffer)
            extinfo = self._extinfo
            save = self.save

            # Restore stdout and drop the references to this run's inputs.
            sys.stdout = stdout
            self.task = None
            self.response = None
            self.save = None

        # Empty the buffer in place (slice assignment keeps the same list
        # object); `logs` above already holds a copy of its contents.
        module.log_buffer[:] = []
        return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def underscore_memoization(func):
    """
    Decorator for methods that caches the result on the instance::

        class A(object):
            def x(self):
                try:
                    return self._x
                except AttributeError:
                    self._x = 10
                    return self._x

    Becomes::

        class A(object):
            @underscore_memoization
            def x(self):
                return 10

    A now has an attribute ``_x`` written by this decorator.  The attribute
    name is ``'_'`` followed by the decorated function's ``__name__``.

    If the decorated method returns a generator, it is converted to a
    ``list`` before being stored, so the cached value can be read more
    than once.

    The returned wrapper keeps the metadata (``__name__``, ``__doc__``,
    ...) of ``func``.
    """
    # Function-scope import so the block does not depend on a file-level
    # ``import functools``.
    import functools

    name = '_' + func.__name__

    @functools.wraps(func)
    def wrapper(self):
        try:
            return getattr(self, name)
        except AttributeError:
            # Cache miss.  Compute outside the handler so an error raised
            # by ``func`` is not chained to this unrelated AttributeError.
            pass
        result = func(self)
        if inspect.isgenerator(result):
            result = list(result)
        setattr(self, name, result)
        return result

    return wrapper
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def memoize_default(default=NO_DEFAULT, evaluator_is_first_arg=False, second_arg_is_evaluator=False):
    """Memoization decorator that can pre-seed a default to stop recursion.

    Unlike a plain memoizer, when ``default`` is given it is stored under
    the call's key *before* the wrapped function runs, so a recursive call
    with the same arguments (think ``a = b; b = a``) gets the default back
    instead of recursing.  Avoiding that recursion matters more here than
    any speed gain.

    The cache is looked up as ``memoize_cache`` on the first argument
    (``evaluator_is_first_arg``), on the second argument
    (``second_arg_is_evaluator``), or on ``obj.evaluator`` otherwise.
    Generator results are turned into lists before being cached.
    """
    def func(function):
        def wrapper(obj, *args, **kwargs):
            # Pick the object that carries the shared cache.
            if evaluator_is_first_arg:
                holder = obj
            elif second_arg_is_evaluator:  # needed for meta classes
                holder = args[0]
            else:
                holder = obj.evaluator
            cache = holder.memoize_cache

            # One sub-dictionary per decorated function.
            try:
                per_function = cache[function]
            except KeyError:
                per_function = cache[function] = {}

            lookup_key = (obj, args, frozenset(kwargs.items()))
            if lookup_key in per_function:
                return per_function[lookup_key]

            # Seed the default first so re-entrant calls terminate.
            if default is not NO_DEFAULT:
                per_function[lookup_key] = default
            value = function(obj, *args, **kwargs)
            if inspect.isgenerator(value):
                value = list(value)
            per_function[lookup_key] = value
            return value
        return wrapper
    return func
项目:aetros-cli    作者:aetros    | 项目源码 | 文件源码
def is_generator(obj):
    """Tell whether ``obj`` looks like a generator or iterator.

    True for generator functions, generator objects, and anything that
    exposes a ``next`` or ``__next__`` attribute.  ``None`` is never one.
    """
    import inspect

    if obj is None:
        return False
    if inspect.isgeneratorfunction(obj) or inspect.isgenerator(obj):
        return True
    return hasattr(obj, 'next') or hasattr(obj, '__next__')
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def isgenerator(o):
        """Return True if ``o`` is a generator function or a generator object.

        An ``UnboundMethod`` is unwrapped to its ``_func`` before checking.
        """
        target = o._func if isinstance(o, UnboundMethod) else o
        if inspect.isgeneratorfunction(target):
            return True
        return inspect.isgenerator(target)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def isgenerator(func):
        """Return True if ``func.func_code`` carries the CO_GENERATOR flag.

        Objects without ``func_code`` (or whose code object lacks
        ``co_flags``) are reported as not being generators.
        """
        try:
            code = func.func_code
            return (code.co_flags & CO_GENERATOR) != 0
        except AttributeError:
            return False

# Make a function to help check if an exception is derived from BaseException.
# In Python 2.4, we just use Exception instead.
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_excluding_predicates(self):
        """Run ``self.istest`` for each inspect predicate and its sample.

        Frame and traceback samples come from a deliberately raised
        ``ZeroDivisionError``; the module-level ``tb`` is reset afterwards.
        NOTE(review): ``self.istest`` is defined outside this view -
        presumably it also checks that the other predicates reject the
        sample; confirm against its definition.
        """
        global tb
        for predicate, expression in (
                (inspect.isbuiltin, 'sys.exit'),
                (inspect.isbuiltin, '[].append'),
                (inspect.iscode, 'mod.spam.__code__')):
            self.istest(predicate, expression)
        try:
            1/0
        except:
            tb = sys.exc_info()[2]
            self.istest(inspect.isframe, 'tb.tb_frame')
            self.istest(inspect.istraceback, 'tb')
            if not hasattr(types, 'GetSetDescriptorType'):
                self.assertFalse(inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
            else:
                self.istest(inspect.isgetsetdescriptor,
                            'type(tb.tb_frame).f_locals')
        finally:
            # Drop the traceback along with the frames and locals it keeps alive.
            tb = None
        for predicate, expression in (
                (inspect.isfunction, 'mod.spam'),
                (inspect.isfunction, 'mod.StupidGit.abuse'),
                (inspect.ismethod, 'git.argue'),
                (inspect.ismodule, 'mod'),
                (inspect.isdatadescriptor, 'collections.defaultdict.default_factory'),
                (inspect.isgenerator, '(x for x in range(2))'),
                (inspect.isgeneratorfunction, 'generator_function_example')):
            self.istest(predicate, expression)
        if not hasattr(types, 'MemberDescriptorType'):
            self.assertFalse(inspect.ismemberdescriptor(datetime.timedelta.days))
        else:
            self.istest(inspect.ismemberdescriptor, 'datetime.timedelta.days')
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def delete(self, paths):
        """Remove the L{Tag}s whose paths are listed in C{paths}.

        L{TagValue}s and permissions belonging to the removed L{Tag}s are
        cleaned up by cascading deletes in the database schema.

        @param paths: A sequence of L{Tag.path}s (a generator is accepted
            and is materialized into a C{list} first).
        @return: A C{list} of C{(objectID, Tag.path)} 2-tuples, one for each
            L{Tag} that was removed.
        """
        if isgenerator(paths):
            paths = list(paths)
        matchingTags = getTags(paths=paths)
        removed = list(matchingTags.values(Tag.objectID, Tag.path))

        # Drop the system tag values stored for the removed tags.  Their
        # TagValue's go away through an ON DELETE CASCADE trigger.
        systemTagPaths = [u'fluiddb/tags/description', u'fluiddb/tags/path']
        valuesAPI = self._factory.tagValues(self._user)
        valuesAPI.delete([(objectID, systemPath)
                          for objectID, _ in removed
                          for systemPath in systemTagPaths])

        # Mark every object using one of the given tag paths as touched.
        touchObjects(list(getObjectIDs(paths)))

        matchingTags.remove()
        return removed
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def delete(self, usernames):
        """Remove the L{User}s whose usernames are listed in C{usernames}.

        @param usernames: A sequence of L{User.username}s (a generator is
            accepted and is materialized into a C{list} first).
        @raise FeatureError: Raised if no L{User.username}s are provided.
        @raise UnknownUserError: Raised if one or more usernames don't match
            existing L{User}s.
        @return: A C{list} of C{(objectID, User.username)} 2-tuples, one for
            each L{User} that was removed.
        """
        if isgenerator(usernames):
            usernames = list(usernames)
        if not usernames:
            raise FeatureError('At least one username must be provided.')

        requested = set(usernames)
        matchingUsers = getUsers(usernames=requested)
        missing = requested - set(matchingUsers.values(User.username))
        if missing:
            raise UnknownUserError(list(missing))

        admin = getUser(u'fluiddb')
        removed = list(matchingUsers.values(User.objectID, User.username))
        # FIXME: Deleting a user will leave the permission exception lists
        # containing the user in a corrupt state.
        matchingUsers.remove()

        # Clear the system tag values that described the removed users.
        systemTags = [u'fluiddb/users/username',
                      u'fluiddb/users/name',
                      u'fluiddb/users/email',
                      u'fluiddb/users/role']
        adminValues = self._factory.tagValues(admin)
        adminValues.delete([(objectID, systemTag)
                            for objectID, _ in removed
                            for systemTag in systemTags])
        return removed
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def delete(self, paths):
        """Permission-checked variant; see L{NamespaceAPI.delete}.

        Every path must be allowed for C{Operation.DELETE_NAMESPACE} before
        the call is forwarded to the wrapped API.

        @raise PermissionDeniedError: Raised if the user is not authorized to
            delete a given L{Namespace}.
        """
        if isgenerator(paths):
            # Materialize once: the paths are needed for both the check
            # and the delete call below.
            paths = list(paths)
        required = [(namespacePath, Operation.DELETE_NAMESPACE)
                    for namespacePath in paths]
        denied = checkPermissions(self._user, required)
        if denied:
            raise PermissionDeniedError(self._user.username, denied)

        return self._api.delete(paths)