Python contextlib 模块,contextmanager() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用contextlib.contextmanager()

项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def reconfigure(**kwargs):
    """Temporarily apply ``kwargs`` as configuration overrides.

    Generator body intended to be driven as a context manager.  On entry the
    current values of the keys named in ``kwargs`` are remembered and
    ``kwargs`` is applied through ``configure_from_dict``.  On exit the
    namespace is cleared and rebuilt from every key *not* in ``kwargs`` plus
    the remembered values, so an overridden key that had no value before
    entry is absent afterwards.
    """
    conf_namespace = staticconf.config.get_namespace(namespace)

    # Snapshot only the keys that are about to be overridden.
    overridden = {}
    for key, value in conf_namespace.get_config_values().iteritems():
        if key in kwargs:
            overridden[key] = value

    configure_from_dict(kwargs)
    try:
        yield
    finally:
        # Keep whatever else changed inside the block; only the overridden
        # keys are rolled back to their snapshot.
        restored = {}
        for key, value in conf_namespace.get_config_values().iteritems():
            if key not in kwargs:
                restored[key] = value
        restored.update(overridden)
        staticconf.config.get_namespace(namespace).clear()
        configure_from_dict(restored)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def capture_new_data_pipeline_messages(topic):
    """Yield a fetcher for decoded data pipeline messages on ``topic``.

    Wraps ``capture_new_messages(topic)`` and, for as long as that context is
    open, yields a function that pulls raw kafka messages from it and decodes
    each with ``create_from_offset_and_message``.

    Returns:
        Callable[[int], List[Message]]: Function taking one optional
            argument, ``count`` (default 100), which is forwarded unchanged
            to the underlying kafka fetcher; the decoded messages are
            returned as a list in the order received.
    """
    with capture_new_messages(topic) as fetch_raw:
        def get_data_pipeline_messages(count=100):
            decoded = []
            for raw_message in fetch_raw(count):
                decoded.append(create_from_offset_and_message(raw_message))
            return decoded

        yield get_data_pipeline_messages
项目:antenna    作者:mozilla-services    | 项目源码 | 文件源码
def randommock():
    """Return a context-manager factory that pins ``random.random()``.

    The returned callable takes the value to pin; while its context is
    active, ``random.random`` is patched and every call returns that value.
    Nothing is bound by ``as``.

    Usage::

        def test_something(randommock):
            with randommock(0.55):
                # random.random() == 0.55 in here

    """
    @contextlib.contextmanager
    def _randommock(value):
        patcher = mock.patch('random.random')
        with patcher as fake_random:
            fake_random.return_value = value
            yield

    return _randommock
项目:combine    作者:llllllllll    | 项目源码 | 文件源码
def _get_logger_for_contextmanager(log):
    """Get the canonical logger from a context manager.

    Parameters
    ----------
    log : Logger or None
        The explicit logger passed to the context manager.

    Returns
    -------
    log : Logger
        The logger to use in the context manager.
    """
    if log is not None:
        return log

    # We need to walk up through the context manager, then through
    # @contextmanager and finally into the top level calling frame.
    return _logger_for_frame(_getframe(3))
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_generator_based_context_manager_throw():
    # Checks KI protection on a generator-based context manager, both when
    # the with-block exits normally and when it exits by raising.
    # enable_ki_protection is applied to the generator function first and
    # contextlib.contextmanager wraps the result.
    @contextlib.contextmanager
    @_core.enable_ki_protection
    def protected_manager():
        # Asserted on entry, before the with-body runs.
        assert _core.currently_ki_protected()
        try:
            yield
        finally:
            # Asserted again on the way out, whether or not the body raised.
            assert _core.currently_ki_protected()

    # The body of the with-block itself is asserted to be unprotected.
    with protected_manager():
        assert not _core.currently_ki_protected()

    with pytest.raises(KeyError):
        # This is the one that used to fail
        with protected_manager():
            raise KeyError
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def cross_validation_lock(self):
        """
        Generator body for a context manager that runs a block with
        ``self._cross_validation_lock`` set to True.

        If the lock is already truthy on entry it is left untouched.
        Otherwise it is set to True for the duration of the block and reset
        to False afterwards, even if the block raises.
        """
        already_locked = self._cross_validation_lock
        if not already_locked:
            try:
                self._cross_validation_lock = True
                yield
            finally:
                self._cross_validation_lock = False
            return

        # Lock is held by an enclosing block: nothing to set or reset here.
        yield
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def logged_in():
    """Yield a ``_login(user)`` context-manager factory, then clear tokens.

    ``_login`` pushes an access token for ``user`` onto ``_TOKENS`` (or
    ``None`` when ``user`` is the string ``'NOT_LOGGED_IN'``) and yields the
    user (or ``None``).  ``jwt_user`` on the top of ``ctx_stack`` is reset to
    ``None`` both on entry and on exit.

    After the consumer of this generator is done with ``_login``,
    ``_TOKENS`` is cleared.
    """
    @contextlib.contextmanager
    def _login(user):
        setattr(ctx_stack.top, 'jwt_user', None)
        if isinstance(user, str) and user == 'NOT_LOGGED_IN':
            _TOKENS.append(None)
            res = None
        else:
            _TOKENS.append(
                flask_jwt.create_access_token(identity=user.id, fresh=True)
            )
            res = user

        try:
            yield res
        finally:
            # Always undo the login, even when the with-body raises;
            # otherwise a failing block leaves its token on the stack for
            # whatever code runs next.
            _TOKENS.pop(-1)
            setattr(ctx_stack.top, 'jwt_user', None)

    yield _login

    _TOKENS.clear()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_weak_keys_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_keyed_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_keyed_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                # Yields a new Object built from the popped object's arg,
                # paired with that arg.
                yield Object(v), v
            finally:
                it = None           # should commit all removals
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_weak_values_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_valued_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_valued_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_valued_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                k = objects.pop().arg
                gc.collect()      # just in case
                # Yields the popped object's arg paired with a new Object
                # built from it.
                yield k, Object(k)
            finally:
                it = None           # should commit all removals
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_weak_keys_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_keyed_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
        self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_keyed_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.iteritems())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                # Yields a new Object built from the popped object's arg,
                # paired with that arg.
                yield Object(v), v
            finally:
                it = None           # should commit all removals
                # Explicit collection after the iterator reference is dropped.
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_weak_values_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_valued_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_valued_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
        self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_valued_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.iteritems())
                next(it)
                # Schedule a key/value for removal and recreate it
                k = objects.pop().arg
                gc.collect()      # just in case
                # Yields the popped object's arg paired with a new Object
                # built from it.
                yield k, Object(k)
            finally:
                it = None           # should commit all removals
                # Explicit collection after the iterator reference is dropped.
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_weak_keys_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_keyed_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
        self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_keyed_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.iteritems())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                # Yields a new Object built from the popped object's arg,
                # paired with that arg.
                yield Object(v), v
            finally:
                it = None           # should commit all removals
                # Explicit collection after the iterator reference is dropped.
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_weak_values_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_valued_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_valued_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
        self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_valued_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.iteritems())
                next(it)
                # Schedule a key/value for removal and recreate it
                k = objects.pop().arg
                gc.collect()      # just in case
                # Yields the popped object's arg paired with a new Object
                # built from it.
                yield k, Object(k)
            finally:
                it = None           # should commit all removals
                # Explicit collection after the iterator reference is dropped.
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:LIE    作者:EmbraceLife    | 项目源码 | 文件源码
def redirect_stderr(x):
    """ Redirects stderr to another file-like object.

        Returns a context manager that replaces `sys.stderr` with `x` for the
        duration of the `with` block. Uses `contextlib.redirect_stderr` when
        the running Python provides it and a local stand-in otherwise.

        This is some compatibility code to support Python 3.4.
    """
    if hasattr(contextlib, 'redirect_stderr'):
        result = contextlib.redirect_stderr
    else:
        @contextlib.contextmanager
        def result(x):
            """ Stand-in for Python 3.5's `redirect_stderr`.

                Notes: Non-reentrant, non-threadsafe
            """
            old_stderr = sys.stderr
            sys.stderr = x
            try:
                # Yield the target, as the stdlib implementation does.
                yield x
            finally:
                # Restore even if the body raised; the previous code assigned
                # to the misspelled `sys.stder` and never restored stderr.
                sys.stderr = old_stderr

    return result(x)

###############################################################################
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_weak_keys_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_keyed_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_keyed_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                # Yields a new Object built from the popped object's arg,
                # paired with that arg.
                yield Object(v), v
            finally:
                it = None           # should commit all removals
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_weak_values_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_valued_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_valued_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_valued_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                k = objects.pop().arg
                gc.collect()      # just in case
                # Yields the popped object's arg paired with a new Object
                # built from it.
                yield k, Object(k)
            finally:
                it = None           # should commit all removals
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def setup_with_context_manager(testcase, cm):
    """
    Enter ``cm`` now and register its exit as a cleanup on ``testcase``.

    This lets a context manager span a whole test instead of one ``with``
    block.  Rather than:

    with ctxmgr(a, b, c) as v:
        # v is only usable inside this block

    write:

    def setUp(self):
        self.v = setup_with_context_manager(self, ctxmgr(a, b, c))

    def test_foo(self):
        # self.v is still live here

    Returns whatever ``cm.__enter__()`` returned.  The cleanup calls
    ``cm.__exit__(None, None, None)``, i.e. it always reports a clean exit.
    """
    entered = cm.__enter__()
    no_exception = (None, None, None)
    testcase.addCleanup(cm.__exit__, *no_exception)
    return entered
项目:expyre    作者:lonetwin    | 项目源码 | 文件源码
def open_expiring(filename, at, unless_modified=True, unless_accessed=True, *args):
    """Generator body for a context manager yielding an open file that is
    scheduled for expiry once the block finishes cleanly.

    The file is closed before ``expire_path`` is called.  If the block
    raises (or the generator is closed early) the exception propagates and
    no expiry is scheduled.

    :param str filename: The file to open and schedule for expiry.
    :param at: A datetime object or string as recognized by the `at` command.
    :param bool unless_modified: Forwarded to ``expire_path``; whether expiry
        should be skipped if the path was modified after scheduling.
        Default: True
    :param bool unless_accessed: Forwarded to ``expire_path``; whether expiry
        should be skipped if the path was accessed after scheduling.
        Default: True
    :param *args: Extra positional arguments for the `open` builtin.  They
        follow the two flags positionally, so the flags must be supplied
        explicitly in order to pass any.
    :return: An open file object.
    :rtype: file
    """
    with open(filename, *args) as fd:
        yield fd

    # Only reached when the with-block above completed without an exception.
    expire_path(filename, at, unless_modified, unless_accessed)
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def cross_validation_lock(self):
        """
        Generator body for a context manager that runs a block with
        ``self._cross_validation_lock`` set to True.

        If the lock is already truthy on entry it is left untouched.
        Otherwise it is set to True for the duration of the block and reset
        to False afterwards, even if the block raises.
        """
        already_locked = self._cross_validation_lock
        if not already_locked:
            try:
                self._cross_validation_lock = True
                yield
            finally:
                self._cross_validation_lock = False
            return

        # Lock is held by an enclosing block: nothing to set or reset here.
        yield
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_weak_keys_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_keyed_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
        self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_keyed_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.iteritems())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                # Yields a new Object built from the popped object's arg,
                # paired with that arg.
                yield Object(v), v
            finally:
                it = None           # should commit all removals
                # Explicit collection after the iterator reference is dropped.
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_weak_values_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_valued_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_valued_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
        self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_valued_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.iteritems())
                next(it)
                # Schedule a key/value for removal and recreate it
                k = objects.pop().arg
                gc.collect()      # just in case
                # Yields the popped object's arg paired with a new Object
                # built from it.
                yield k, Object(k)
            finally:
                it = None           # should commit all removals
                # Explicit collection after the iterator reference is dropped.
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_weak_keys_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_keyed_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_keyed_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            # `dict` and `objects` are read from the enclosing method at call
            # time, so the rebinding further down is picked up as well.
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                # Yields a new Object built from the popped object's arg,
                # paired with that arg.
                yield Object(v), v
            finally:
                it = None           # should commit all removals
                # Explicit collection after the iterator reference is dropped.
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
        # Issue #21173: len() fragile when keys are both implicitly and
        # explicitly removed.
        dict, objects = self.make_weak_keyed_dict()
        self.check_weak_del_and_len_while_iterating(dict, testcontext)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_weak_values_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_valued_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_valued_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_valued_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            # `dict` and `objects` are read from the enclosing method at call
            # time, so the rebinding further down is picked up as well.
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                k = objects.pop().arg
                gc.collect()      # just in case
                # Yields the popped object's arg paired with a new Object
                # built from it.
                yield k, Object(k)
            finally:
                it = None           # should commit all removals
                # Explicit collection after the iterator reference is dropped.
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
        # Second scenario on a fresh dict, reusing the same testcontext.
        dict, objects = self.make_weak_valued_dict()
        self.check_weak_del_and_len_while_iterating(dict, testcontext)
项目:sysl    作者:anz-bank    | 项目源码 | 文件源码
def ViewMethod(w, tname, visibility, out_tname, name, params=(), throws=(),
               override=False):
    """Generator body that emits a method returning a new ``View`` object.

    Opens a ``Method`` block on writer ``w`` whose return type is ``'View'``
    when ``tname == out_tname`` and ``out_tname + '.View'`` otherwise, then
    yields a ``view`` context-manager factory.

    Entering ``view()`` writes the ``return new <type>(model) {`` line and
    yields, inside ``w.indent()``, an ``enumerator_method(src_var=None)``
    context-manager factory wrapping ``EnumeratorMethod``.  After the
    consumer of this generator resumes it, the closing ``};`` template line
    is written, still inside the ``Method`` block.
    """
    if tname == out_tname:
        view_tname = 'View'
    else:
        view_tname = out_tname + '.View'

    method_block = Method(w, visibility, view_tname, name, params, throws,
                          override)
    with method_block:
        @contextlib.contextmanager
        def view():
            w('return new {}(model) {{', view_tname)
            with w.indent():
                @contextlib.contextmanager
                def enumerator_method(src_var=None):
                    # Falsy src_var is passed through unchanged as `source`;
                    # otherwise a (src_var, 'View.this', tname) triple is.
                    source = src_var and (src_var, 'View.this', tname)
                    with EnumeratorMethod(w, 'public', out_tname,
                                          source=source) as enumerator:
                        yield enumerator

                yield enumerator_method

        yield view

        w('}};')
项目:servoshell    作者:paulrouget    | 项目源码 | 文件源码
def check_flake8(file_name, contents):
    """Yield ``(line_num, message)`` for each flake8 finding in *contents*.

    Files whose name does not end in ``.py`` produce no results and do not
    require flake8 to be importable.  For Python files, flake8's
    ``check_code`` is run with stdout captured, and every captured line is
    split on ``":"`` into four fields; the second field (the line number, as
    a string) and the stripped fourth field (the message) are yielded.

    W291 and E501 are ignored because the standard tidy process enforces
    them separately.
    """
    if not file_name.endswith(".py"):
        # Plain return ends the generator.  ``raise StopIteration`` here is
        # turned into RuntimeError on interpreters implementing PEP 479.
        return

    # Imported only once we know there is Python code to check.
    from flake8.main import check_code

    @contextlib.contextmanager
    def stdout_redirect(where):
        # Put back whatever stream was active before, which may itself be a
        # redirection set up by the caller.
        previous = sys.stdout
        sys.stdout = where
        try:
            yield where
        finally:
            sys.stdout = previous

    ignore = {
        "W291",  # trailing whitespace; the standard tidy process will enforce no trailing whitespace
        "E501",  # 80 character line length; the standard tidy process will enforce line length
    }

    output = StringIO.StringIO()
    with stdout_redirect(output):
        check_code(contents, ignore=ignore)
    for error in output.getvalue().splitlines():
        _, line_num, _, message = error.split(":", 3)
        yield line_num, message.strip()
项目:pythonista-scripts    作者:khilnani    | 项目源码 | 文件源码
def z_add_owned_piece(self, p):
        """Add piece ``p`` to ``self.owned_pieces``, then call ``self._update()``.

        The ``#freeze:`` lines are a disabled variant that would unfreeze
        before the add and re-freeze afterwards; they are kept as-is.
        """
        #freeze:wasfrozen = False
        #freeze:if self.isFrozen:
        #freeze:    wasfrozen = True
        #freeze:    self.unfreeze()
        owned = self.owned_pieces
        owned.add(p)
        self._update()
        #freeze:if wasfrozen:
        #freeze:    self.freeze()

    #freeze:def freeze(self):
    #freeze:    self.isFrozen = True
    #freeze:    self.owned_pieces = list(self.owned_pieces)

    #freeze:def unfreeze(self):
    #freeze:    self.isFrozen = False
    #freeze:    self.owned_pieces = set(self.owned_pieces)

    #freeze:@contextlib.contextmanager
    #freeze:def frozen(self):
    #freeze:    self.freeze()
    #freeze:    yield
    #freeze:    self.unfreeze()
项目:pythonista-scripts    作者:khilnani    | 项目源码 | 文件源码
def set_game(self, g):
        """Store ``g`` as ``self.game`` and pass it on to ``self.cfg.set_game``."""
        self.game = g
        config = self.cfg
        config.set_game(g)

    #freeze:def freeze(self):
    #freeze:    """Lock the board in place."""
    #freeze:    self.isfrozen = True
    #freeze:    #self.pieces = list(self.pieces)
    #freeze:    for player in self.players:
    #freeze:        player.freeze()

    #freeze:def unfreeze(self):
    #freeze:    """Unlock the board."""
    #freeze:    self.isfrozen = False
    #freeze:    #self.pieces = set(self.pieces)
    #freeze:    for player in self.players:
    #freeze:        player.unfreeze()

    #freeze:@contextlib.contextmanager
    #freeze:def frozen(self):
    #freeze:    self.freeze()
    #freeze:    yield
    #freeze:    self.unfreeze()
项目:easypy    作者:weka-io    | 项目源码 | 文件源码
def singleton_contextmanager(func):
    """Wrap generator function ``func`` as a shared, re-entrant context manager.

    The returned object can be entered many times (nested or, guarded by an
    RLock, from several threads).  ``func`` is only entered by the first
    ``__enter__`` and only exited by the matching last ``__exit__``; entries
    in between just adjust a counter.  Every ``__enter__`` returns the value
    ``func`` yielded, so ``with cm as value`` works at any nesting depth.
    """
    class CtxManager():
        def __init__(self, func):
            self.count = 0
            self.func_cm = contextmanager(func)
            self._lock = RLock()

        def __enter__(self):
            with self._lock:
                if self.count == 0:
                    self.ctm = self.func_cm()
                    self.obj = self.ctm.__enter__()
                self.count += 1
                # Previously the entered value was stored but never handed
                # back, so `with cm as x` always bound None.
                return self.obj

        def __exit__(self, *args):
            with self._lock:
                self.count -= 1
                if self.count > 0:
                    return
                # Forward the exception state the with statement reported.
                # sys.exc_info() can name an unrelated exception when a clean
                # exit happens inside an `except` block; it is only used as a
                # fallback for a bare `__exit__()` call.
                self.ctm.__exit__(*(args or sys.exc_info()))
                del self.ctm
                del self.obj

    return CtxManager(func)
项目:easypy    作者:weka-io    | 项目源码 | 文件源码
def singleton_contextmanager_method(func):
    """Method decorator giving each instance one shared context manager.

    On first use for a given instance, ``singleton_contextmanager`` is
    applied to ``func`` bound to that instance and the result is cached on
    the instance under an attribute derived from ``func.__name__``.  Later
    uses re-enter the cached manager.  The module-level attribute lock only
    covers the cache lookup/creation, not the body of the ``with`` block.
    """
    cache_attr = '__singleton_contextmanager_method__' + func.__name__

    # Wrap with a context manager to get proper IPython documentation
    @contextmanager
    @wraps(func)
    def inner(self):
        with _singleton_contextmanager_method_attr_lock:
            try:
                shared_cm = getattr(self, cache_attr)
            except AttributeError:
                bound_func = partial(func, self)
                shared_cm = singleton_contextmanager(bound_func)
                setattr(self, cache_attr, shared_cm)

        # Yields whatever the shared manager's __enter__ returns.
        with shared_cm as entered:
            yield entered

    return inner
项目:keepass-menu    作者:frostidaho    | 项目源码 | 文件源码
def open(filename, **credentials):
    """
    Generator body for a context manager that opens the KeePass file
    `filename`. Use a `password` and/or `keyfile` named argument for
    decryption.

    The file's signature is read to pick a reader class via
    `get_kdb_reader`; the reader is constructed on the open binary stream
    with `credentials` and yielded. The reader's `close()` is called exactly
    once when the block ends, whether normally or by an exception, and the
    underlying stream is closed afterwards. Errors raised before a reader
    exists (opening the file, reading the signature, constructing the
    reader) simply propagate.

    Note: `keyfile` is currently not supported for v3 KeePass files.
    """
    with io.open(filename, 'rb') as stream:
        signature = common.read_signature(stream)
        cls = get_kdb_reader(signature)
        kdb = cls(stream, **credentials)
        try:
            yield kdb
        finally:
            # Single cleanup path: the earlier success-path close plus a
            # bare-except close could call close() twice if the first raised.
            kdb.close()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_weak_keys_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_keyed_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_keyed_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                # Yields a new Object built from the popped object's arg,
                # paired with that arg.
                yield Object(v), v
            finally:
                it = None           # should commit all removals
                # Explicit collection after the iterator reference is dropped.
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_weak_values_destroy_while_iterating(self):
        # NOTE: the local name `dict` is the mapping under test (from
        # make_weak_valued_dict) and shadows the builtin in this method.
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_valued_dict()
        # One helper run per accessor name; the helper is defined elsewhere
        # (presumably it looks the name up on the dict -- not shown here).
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
        # Fresh dict and objects for the mutate-while-iterating check.
        dict, objects = self.make_weak_valued_dict()
        @contextlib.contextmanager
        def testcontext():
            # Keeps a started iterator over the dict alive across the yield.
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                k = objects.pop().arg
                gc.collect()      # just in case
                # Yields the popped object's arg paired with a new Object
                # built from it.
                yield k, Object(k)
            finally:
                it = None           # should commit all removals
                # Explicit collection after the iterator reference is dropped.
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def run_xz_decompress(stdout, stderr=None):
    """Generator body for a context manager around an ``xz --decompress`` process.

    Starts ``xz --decompress --stdout`` with a piped stdin and the given
    ``stdout``/``stderr``, and yields the ``Popen`` object.  On exit the
    process's stdin is closed and the process is waited for.

    Raises:
        OSError: if no process object was created (the ``Popen`` call failed).
        IOError: if xz exits with a non-zero status.
    """
    proc = None
    command = ["xz", "--decompress", "--stdout"]
    try:
        proc = subprocess.Popen(command, stdin=subprocess.PIPE,
                                stdout=stdout, stderr=stderr)
        yield proc
    finally:
        if not proc:
            raise OSError("You must have an 'xz' binary in PATH.")

        # Closing stdin signals end of input; then collect the exit status.
        proc.stdin.close()
        exit_status = proc.wait()

        if exit_status != 0:
            raise IOError("xz --decompress returned %d." % exit_status)
项目:kur    作者:deepgram    | 项目源码 | 文件源码
def redirect_stderr(x):
    """ Redirects stderr to another file-like object.

        Returns a context manager that replaces `sys.stderr` with `x` for the
        duration of the `with` block. Uses `contextlib.redirect_stderr` when
        the running Python provides it and a local stand-in otherwise.

        This is some compatibility code to support Python 3.4.
    """
    if hasattr(contextlib, 'redirect_stderr'):
        result = contextlib.redirect_stderr
    else:
        @contextlib.contextmanager
        def result(x):
            """ Stand-in for Python 3.5's `redirect_stderr`.

                Notes: Non-reentrant, non-threadsafe
            """
            old_stderr = sys.stderr
            sys.stderr = x
            try:
                # Yield the target, as the stdlib implementation does.
                yield x
            finally:
                # Restore even if the body raised; the previous code assigned
                # to the misspelled `sys.stder` and never restored stderr.
                sys.stderr = old_stderr

    return result(x)

###############################################################################
项目:BigBrotherBot-For-UrT43    作者:ptitbigorneau    | 项目源码 | 文件源码
def query2(self, query, bindata=None):
        """
        Variant of the query method meant for use with the python 'with' statement.
        Runs self.query(query, bindata), yields the resulting cursor, and closes
        it (when it is truthy) once execution leaves the 'with' block, including
        when the block raises.
        Example:

        >> with self.console.storage.query2(query) as cursor:
        >>     if not cursor.EOF:
        >>         cursor.getRow()
        >>         ...

        :param query: The query to execute.
        :param bindata: Data to bind to the given query.
        """
        # If the query itself fails there is no cursor to close, so it can
        # run ahead of the try block.
        handle = self.query(query, bindata)
        try:
            yield handle
        finally:
            if handle:
                handle.close()
项目:mysql_streamer    作者:Yelp    | 项目源码 | 文件源码
def reconfigure(ns=namespace, **kwargs):
    """Temporarily apply ``kwargs`` as configuration overrides in ``ns``.

    Generator body intended to be driven as a context manager.  On entry the
    current values of the keys named in ``kwargs`` are remembered and
    ``kwargs`` is loaded with ``staticconf.DictConfiguration``.  On exit the
    namespace is cleared and rebuilt from every key *not* in ``kwargs`` plus
    the remembered values, so an overridden key that had no value before
    entry is absent afterwards.

    Args:
        ns: Namespace of the conf
    """
    conf_namespace = staticconf.config.get_namespace(ns)

    # Snapshot only the keys that are about to be overridden.
    overridden = {}
    for key, value in conf_namespace.get_config_values().iteritems():
        if key in kwargs:
            overridden[key] = value

    staticconf.DictConfiguration(kwargs, namespace=ns)
    try:
        yield
    finally:
        # Keep whatever else changed inside the block; only the overridden
        # keys are rolled back to their snapshot.
        restored = {}
        for key, value in conf_namespace.get_config_values().iteritems():
            if key not in kwargs:
                restored[key] = value
        restored.update(overridden)
        staticconf.config.get_namespace(ns).clear()
        staticconf.DictConfiguration(restored, namespace=ns)
项目:mysql_streamer    作者:Yelp    | 项目源码 | 文件源码
def _register_signal_handlers(self):
        """Register the handler SIGUSR2, which will toggle a profiler on and off.
        """
        try:
            signal.signal(signal.SIGINT, self._handle_shutdown_signal)
            signal.signal(signal.SIGTERM, self._handle_shutdown_signal)
            signal.signal(signal.SIGUSR2, self._handle_profiler_signal)
            yield
        finally:
            # Cleanup for the profiler signal handler has to happen here,
            # because signals that are handled don't unwind up the stack in the
            # way that normal methods do.  Any contextmanager or finally
            # statement won't live past the handler function returning.
            signal.signal(signal.SIGUSR2, signal.SIG_DFL)
            if self._profiler_running:
                self._disable_profiler()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_create_without_pause(self):
        # With virt_type 'lxc', _create_domain_and_network must call
        # _create_domain with pause=0 and must not resume the domain.
        self.flags(virt_type='lxc', group='libvirt')

        @contextlib.contextmanager
        def fake_lxc_disk_handler(*args, **kwargs):
            # Stand-in disk handler: enter and leave without doing anything.
            yield

        drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
        instance = objects.Instance(**self.test_instance)

        # The names bound by "as" follow the order of the patches.
        with test.nested(
              mock.patch.object(drvr, '_lxc_disk_handler',
                                side_effect=fake_lxc_disk_handler),
              mock.patch.object(drvr, 'plug_vifs'),
              mock.patch.object(drvr, 'firewall_driver'),
              mock.patch.object(drvr, '_create_domain'),
              mock.patch.object(drvr, 'cleanup')) as (
              _handler, plug_vifs, firewall_driver, create, cleanup):
            domain = drvr._create_domain_and_network(self.context, 'xml',
                                                     instance, None, None)
            # Keyword arguments of the first _create_domain call.
            create_kwargs = create.call_args_list[0][1]
            self.assertEqual(0, create_kwargs['pause'])
            self.assertEqual(0, domain.resume.call_count)
项目:contextlib2    作者:jazzband    | 项目源码 | 文件源码
def test_contextmanager_finally(self):
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            try:
                yield 42
            finally:
                state.append(999)
        with self.assertRaises(ZeroDivisionError):
            with woohoo() as x:
                self.assertEqual(state, [1])
                self.assertEqual(x, 42)
                state.append(x)
                raise ZeroDivisionError()
        self.assertEqual(state, [1, 42, 999])
项目:contextlib2    作者:jazzband    | 项目源码 | 文件源码
def test_contextmanager_except(self):
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            try:
                yield 42
            except ZeroDivisionError as e:
                state.append(e.args[0])
                self.assertEqual(state, [1, 42, 999])
        with woohoo() as x:
            self.assertEqual(state, [1])
            self.assertEqual(x, 42)
            state.append(x)
            raise ZeroDivisionError(999)
        self.assertEqual(state, [1, 42, 999])
项目:contextlib2    作者:jazzband    | 项目源码 | 文件源码
def test_contextmanager_except_pep479(self):
        code = """\
from __future__ import generator_stop
from contextlib import contextmanager
@contextmanager
def woohoo():
    yield
"""
        locals = {}
        exec(code, locals, locals)
        woohoo = locals['woohoo']

        stop_exc = StopIteration('spam')
        try:
            with woohoo():
                raise stop_exc
        except Exception as ex:
            self.assertIs(ex, stop_exc)
        else:
            self.fail('StopIteration was suppressed')
项目:contextlib2    作者:jazzband    | 项目源码 | 文件源码
def test_contextmanager_as_decorator(self):
        @contextmanager
        def woohoo(y):
            state.append(y)
            yield
            state.append(999)

        state = []
        @woohoo(1)
        def test(x):
            self.assertEqual(state, [1])
            state.append(x)
        test('something')
        self.assertEqual(state, [1, 'something', 999])

        # Issue #11647: Ensure the decorated function is 'reusable'
        state = []
        test('something else')
        self.assertEqual(state, [1, 'something else', 999])

# Detailed exception chaining checks only make sense on Python 3
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def cross_validation_lock(self):
        """
        Generator backing a contextmanager that runs a block with the cross
        validation lock held.

        If the lock is not set on entry it is set to True for the block and
        reset to False afterwards, even when the block raises.  If it is
        already set, the block simply runs and the lock is left untouched.
        """
        if not self._cross_validation_lock:
            try:
                self._cross_validation_lock = True
                yield
            finally:
                self._cross_validation_lock = False
        else:
            # Already locked by an outer block: that block owns the release.
            yield
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def setup_with_context_manager(testcase, cm):
    """Enter a context manager for the lifetime of a test case.

    Given a context manager normally used as::

        with ctxmgr(a, b, c) as v:
            # do something with v

    calling this function from ``setUp`` enters the manager immediately and
    registers its ``__exit__`` (called with ``None, None, None``) as a
    cleanup on the test case::

        def setUp(self):
            self.v = setup_with_context_manager(self, ctxmgr(a, b, c))

        def test_foo(self):
            # do something with self.v

    Returns whatever ``cm.__enter__()`` returned.
    """
    entered = cm.__enter__()
    # Register the exit only after a successful enter.
    testcase.addCleanup(cm.__exit__, None, None, None)
    return entered
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def contextmanager(fn):
        """Stand-in decorator: replace ``fn`` with a function that always fails.

        The returned function keeps ``fn``'s ``__name__`` and raises
        RuntimeError on every call, whatever the arguments.
        """
        def oops(*args, **kw):
            message = ("Python 2.5 or above is required to use "
                       "context managers.")
            raise RuntimeError(message)

        oops.__name__ = fn.__name__
        return oops
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def observation(observe, notify):
    """Generator body for a 'with'-statement helper around Observation.

    An Observation is built from ``observe`` and ``notify`` and yielded to
    the caller; it is closed when the generator is resumed or finalized,
    including when the block is left through an exception.
    """

    watcher = Observation(observe, notify)
    try:
        yield watcher
    finally:
        watcher.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def named_context(self, name):
        """Build a context-manager factory that tracks ``name``.

        The returned callable creates a context manager which appends
        ``name`` to ``self.named_contexts`` on entry and, on exit, pops the
        last entry and asserts that it equals ``name``.
        """
        @contextlib.contextmanager
        def context():
            self.named_contexts.append(name)
            try:
                yield
            finally:
                # The entry popped on exit must be the one pushed on entry.
                popped = self.named_contexts.pop()
                self.assertEqual(popped, name)
        return context
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def named_context(self, name):
        """Build a context-manager factory that tracks ``name``.

        The returned callable creates a context manager which appends
        ``name`` to ``self.named_contexts`` on entry and, on exit, pops the
        last entry and asserts that it equals ``name``.
        """
        @contextlib.contextmanager
        def context():
            self.named_contexts.append(name)
            try:
                yield
            finally:
                # The entry popped on exit must be the one pushed on entry.
                popped = self.named_contexts.pop()
                self.assertEqual(popped, name)
        return context
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def named_context(self, name):
        """Build a context-manager factory that tracks ``name``.

        The returned callable creates a context manager which appends
        ``name`` to ``self.named_contexts`` on entry and, on exit, pops the
        last entry and asserts that it equals ``name``.
        """
        @contextlib.contextmanager
        def context():
            self.named_contexts.append(name)
            try:
                yield
            finally:
                # The entry popped on exit must be the one pushed on entry.
                popped = self.named_contexts.pop()
                self.assertEqual(popped, name)
        return context
项目:sc-controller    作者:kozec    | 项目源码 | 文件源码
def _validContext(func):
        """
        Decorator that makes ``func`` hold a context reference while it runs.

        Generator functions get a generator wrapper that re-yields every value
        ``func`` produces; any other callable gets a plain wrapper that returns
        ``func``'s result.  In both cases ``func`` is only invoked when
        ``self.__context_p`` is truthy once the reference has been taken;
        otherwise the wrapper yields nothing / returns None.  The wrapper
        carries ``func``'s metadata via ``functools.update_wrapper``.
        """
        # Defined inside USBContext so we can access "self.__*".
        @contextlib.contextmanager
        def refcount(self):
            # Entry: take one reference, opening the context first if needed.
            # NOTE(review): __context_cond is used as a lock and notified
            # below, so it looks like a threading.Condition — confirm.
            with self.__context_cond:
                if not self.__context_p and self.__auto_open:
                    # BBB
                    # No context yet and auto-open is enabled: warn that this
                    # implicit path is deprecated, then open the context.
                    warnings.warn(
                        'Use "with USBContext() as context:" for safer cleanup'
                        ' on interpreter shutdown. See also USBContext.open().',
                        DeprecationWarning,
                    )
                    self.open()
                self.__context_refcount += 1
            try:
                yield
            finally:
                # Exit: drop the reference, and wake every waiter on the
                # condition once the count reaches zero.
                with self.__context_cond:
                    self.__context_refcount -= 1
                    if not self.__context_refcount:
                        self.__context_cond.notifyAll()
        if inspect.isgeneratorfunction(func):
            # Generator variant: the reference is held for as long as the
            # wrapper generator is being iterated.
            def wrapper(self, *args, **kw):
                with refcount(self):
                    if self.__context_p:
                        # pylint: disable=not-callable
                        for value in func(self, *args, **kw):
                            # pylint: enable=not-callable
                            yield value
        else:
            # Plain variant: the reference is held only for the call itself.
            def wrapper(self, *args, **kw):
                with refcount(self):
                    if self.__context_p:
                        # pylint: disable=not-callable
                        return func(self, *args, **kw)
                        # pylint: enable=not-callable
        functools.update_wrapper(wrapper, func)
        return wrapper
    # pylint: enable=no-self-argument,protected-access