Python sqlalchemy.event 模块,Events() 实例源码

我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用sqlalchemy.event.Events()

项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _listen(cls, target, identifier, fn, propagate=True):
    """Register ``fn`` on the instrumentation factory, filtered by class.

    The listener actually registered is an inner function that resolves a
    weak reference to ``target.class_`` and forwards the event to ``fn``
    only when the class the event fired for matches it.

    :param cls: the events class this hook is invoked on (unused here).
    :param target: object exposing a ``class_`` attribute; only a weak
        reference to that class is kept.
    :param identifier: name of the event being listened for.
    :param fn: callable invoked as ``fn(target_cls, *arg)``.
    :param propagate: when true, ``fn`` fires for the referenced class and
        its subclasses; when false, only for exactly that class.
    """

    def listen(target_cls, *arg):
        listen_cls = target()
        # The weak reference may already be dead.  Without this guard,
        # ``issubclass(target_cls, None)`` raises TypeError in the
        # propagate branch instead of quietly ignoring the event.
        if listen_cls is None:
            return None
        if propagate and issubclass(target_cls, listen_cls):
            return fn(target_cls, *arg)
        elif not propagate and target_cls is listen_cls:
            return fn(target_cls, *arg)

    def remove(ref):
        # Weakref callback: unregister the listener once the class is gone.
        event.Events._remove(orm.instrumentation._instrumentation_factory,
                             identifier, listen)

    # Rebind ``target`` to the weak reference; ``listen`` reads it lazily
    # through its closure, so it sees the weakref rather than the original.
    target = weakref.ref(target.class_, remove)
    event.Events._listen(orm.instrumentation._instrumentation_factory,
                         identifier, listen)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.  The result
    of the base call is discarded; this function returns ``None``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    """
    event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.  The result
    of the base call is discarded; this function returns ``None``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    """
    event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _listen(cls, target, identifier, fn, raw=False, propagate=False):
    """Register ``fn`` for ``identifier`` on ``target``, optionally unwrapping state.

    :param cls: the events class this hook is invoked on (unused here).
    :param target: object the listener is attached to; when ``propagate``
        is set it must provide ``subclass_managers(True)``.
    :param identifier: name of the event being listened for.
    :param fn: the user listener.
    :param raw: when false, the first positional argument handed to the
        listener is replaced by the result of calling its ``obj()`` method
        before ``fn`` is invoked; when true, ``fn`` is registered as-is.
    :param propagate: when true, the same listener is additionally
        registered on every manager yielded by
        ``target.subclass_managers(True)``.
    """
    listener = fn
    if not raw:
        def wrap(state, *arg, **kw):
            # Hand the listener ``state.obj()`` in place of the state itself.
            return fn(state.obj(), *arg, **kw)
        listener = wrap

    event.Events._listen(target, identifier, listener, propagate=propagate)
    if not propagate:
        return

    for sub_manager in target.subclass_managers(True):
        event.Events._listen(sub_manager, identifier, listener, True)
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _listen(cls, target, identifier, fn,
            raw=False, retval=False, propagate=False):
    """Register ``fn`` for ``identifier``, adapting arguments and return value.

    Unless both ``raw`` and ``retval`` are true, ``fn`` is wrapped:

    * with ``raw`` false, the positional argument that corresponds to the
      ``target`` parameter of the event method ``getattr(cls, identifier)``
      is replaced by the result of calling its ``obj()`` method;
    * with ``retval`` false, the listener's return value is discarded and
      ``orm.interfaces.EXT_CONTINUE`` is returned instead.

    :param cls: events class whose method named ``identifier`` is inspected
        to find the position of the ``target`` argument.
    :param target: object to listen on; with ``propagate`` it must expose
        ``self_and_descendants``.
    :param identifier: name of the event being listened for.
    :param fn: the user listener.
    :param raw: pass the ``target`` argument through without calling ``obj()``.
    :param retval: honour the listener's return value.
    :param propagate: register on every item of
        ``target.self_and_descendants`` rather than on ``target`` alone.
    """
    if not raw or not retval:
        if not raw:
            meth = getattr(cls, identifier)
            # ``inspect.getargspec`` was removed in Python 3.11.  Prefer
            # ``getfullargspec`` where it exists and fall back to the
            # legacy function on interpreters that lack it.  ``or`` is used
            # instead of a ``getattr`` default so the legacy attribute is
            # only looked up when it is actually needed.
            argspec_of = (getattr(inspect, 'getfullargspec', None)
                          or inspect.getargspec)
            try:
                # -1 because the first inspected name (``self``) is not
                # part of the arguments the listener receives.
                target_index = argspec_of(meth)[0].index('target') - 1
            except ValueError:
                # The event method has no ``target`` parameter.
                target_index = None

        wrapped_fn = fn

        def wrap(*arg, **kw):
            if not raw and target_index is not None:
                arg = list(arg)
                arg[target_index] = arg[target_index].obj()
            if not retval:
                wrapped_fn(*arg, **kw)
                return orm.interfaces.EXT_CONTINUE
            else:
                return wrapped_fn(*arg, **kw)
        fn = wrap

    if propagate:
        for mapper in target.self_and_descendants:
            event.Events._listen(mapper, identifier, fn, propagate=True)
    else:
        event.Events._listen(target, identifier, fn)
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _listen(cls, target, identifier, fn, active_history=False,
            raw=False, retval=False,
            propagate=False):
    """Register ``fn`` for ``identifier`` on ``target``.

    Unless both ``raw`` and ``retval`` are true, ``fn`` is wrapped so that
    the first argument is replaced by the result of its ``obj()`` method
    (when ``raw`` is false) and the incoming ``value`` is returned in place
    of the listener's own result (when ``retval`` is false).

    :param cls: the events class this hook is invoked on (unused here).
    :param target: object to listen on; must expose ``dispatch`` when
        ``active_history`` is set, and ``class_`` / ``key`` when
        ``propagate`` is set.
    :param identifier: name of the event being listened for.
    :param fn: the user listener, called as ``fn(target, value, *arg)``.
    :param active_history: when true, sets
        ``target.dispatch._active_history`` to ``True``.
    :param raw: pass the first argument through without calling ``obj()``.
    :param retval: honour the listener's return value.
    :param propagate: also register on ``mgr[target.key]`` for every
        manager yielded by ``subclass_managers(True)`` of the manager of
        ``target.class_``.
    """
    if active_history:
        target.dispatch._active_history = True

    # TODO: for removal, need to package the identity
    # of the wrapper with the original function.

    listener = fn
    if not (raw and retval):
        def wrap(target, value, *arg):
            subject = target if raw else target.obj()
            outcome = fn(subject, value, *arg)
            # Without ``retval`` the listener's result is ignored and the
            # value is passed through unchanged.
            return outcome if retval else value
        listener = wrap

    event.Events._listen(target, identifier, listener, propagate)

    if not propagate:
        return

    manager = orm.instrumentation.manager_of_class(target.class_)
    for sub_manager in manager.subclass_managers(True):
        event.Events._listen(sub_manager[target.key], identifier,
                             listener, True)
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _listen(cls, target, identifier, fn, retval=False):
    """Register ``fn`` for ``identifier`` on ``target``.

    ``target._has_events`` is set to ``True`` before anything else happens,
    including before the ``retval`` validation below.

    When ``retval`` is false and the event is ``'before_execute'`` or
    ``'before_cursor_execute'``, ``fn`` is wrapped so that its return value
    is ignored and the incoming arguments are handed back unchanged.

    :param cls: the events class this hook is invoked on (unused here).
    :param target: object to listen on.
    :param identifier: name of the event being listened for.
    :param fn: the user listener.
    :param retval: honour the listener's return value; only accepted for
        the two events named above.
    :raises exc.ArgumentError: if ``retval`` is true for any other event.
    """
    target._has_events = True

    listener = fn
    if retval:
        if identifier not in ('before_execute', 'before_cursor_execute'):
            raise exc.ArgumentError(
                    "Only the 'before_execute' and "
                    "'before_cursor_execute' engine "
                    "event listeners accept the 'retval=True' "
                    "argument.")
    elif identifier == 'before_execute':
        def wrap_before_execute(conn, clauseelement,
                                multiparams, params):
            fn(conn, clauseelement, multiparams, params)
            return clauseelement, multiparams, params
        listener = wrap_before_execute
    elif identifier == 'before_cursor_execute':
        def wrap_before_cursor_execute(conn, cursor, statement,
                                       parameters, context, executemany):
            fn(conn, cursor, statement,
               parameters, context, executemany)
            return statement, parameters
        listener = wrap_before_cursor_execute

    event.Events._listen(target, identifier, listener)
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:micro-blog    作者:nickChenyx    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:python-flask-security    作者:weinbergdavid    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:Lixiang_zhaoxin    作者:hejaxian    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result
项目:ngx_status    作者:YoYoAdorkable    | 项目源码 | 文件源码
def _set_dispatch(cls, dispatch_cls):
    """Install ``dispatch_cls`` through the base ``Events`` hook.

    Delegates to ``event.Events._set_dispatch`` and then sets the
    ``_active_history`` flag on ``dispatch_cls`` to ``False``.

    :param cls: the events class the dispatcher is being set on.
    :param dispatch_cls: the dispatcher class to install.
    :return: whatever the base ``_set_dispatch`` call returned.
    """
    base_result = event.Events._set_dispatch(cls, dispatch_cls)
    # Start with the flag switched off on the dispatcher class.
    dispatch_cls._active_history = False
    return base_result