Python contextlib 模块,nested() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用contextlib.nested()

项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:omni    作者:openstack    | 项目源码 | 文件源码
def test_spawn(self):
        self._create_instance()
        self._create_network()
        with contextlib.nested(
            mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
            mock.patch.object(EC2Driver, '_process_network_info'),
            mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
        ) as (mock_image, mock_network, mock_secgrp):
            mock_image.return_value = 'ami-1234abc'
            mock_network.return_value = (self.subnet_id, '192.168.10.5', None,
                                         None)
            mock_secgrp.return_value = []
            self._create_nova_vm()
            fake_instances = self.fake_ec2_conn.get_only_instances()
            self.assertEqual(len(fake_instances), 1)
            inst = fake_instances[0]
            self.assertEqual(inst.vpc_id, self.vpc.id)
            self.assertEqual(self.subnet_id, inst.subnet_id)
            self.assertEqual(inst.tags['Name'], 'fake_instance')
            self.assertEqual(inst.tags['openstack_id'], self.uuid)
            self.assertEqual(inst.image_id, 'ami-1234abc')
            self.assertEqual(inst.region.name, self.region_name)
            self.assertEqual(inst.key_name, 'None')
            self.assertEqual(inst.instance_type, 't2.small')
        self.reset()
项目:omni    作者:openstack    | 项目源码 | 文件源码
def test_spawn_with_key(self):
        self._create_instance(key_name='fake_key', key_data='fake_key_data')
        self._create_network()
        with contextlib.nested(
            mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
            mock.patch.object(EC2Driver, '_process_network_info'),
            mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
        ) as (mock_image, mock_network, mock_secgrp):
            mock_image.return_value = 'ami-1234abc'
            mock_network.return_value = (self.subnet_id, '192.168.10.5', None,
                                         None)
            mock_secgrp.return_value = []
            self._create_nova_vm()
            fake_instances = self.fake_ec2_conn.get_only_instances()
            self.assertEqual(len(fake_instances), 1)
            inst = fake_instances[0]
            self.assertEqual(inst.key_name, 'fake_key')
        self.reset()
项目:omni    作者:openstack    | 项目源码 | 文件源码
def test_destory_instance_terminated_on_aws(self):
        self._create_vm_in_aws_nova()
        fake_instances = self.fake_ec2_conn.get_only_instances()
        self.fake_ec2_conn.stop_instances(instance_ids=[fake_instances[0].id])
        self.fake_ec2_conn.terminate_instances(
            instance_ids=[fake_instances[0].id])
        with contextlib.nested(
            mock.patch.object(boto.ec2.EC2Connection, 'stop_instances'),
            mock.patch.object(boto.ec2.EC2Connection, 'terminate_instances'),
            mock.patch.object(EC2Driver, '_wait_for_state'),
        ) as (fake_stop, fake_terminate, fake_wait):
            self.conn.destroy(self.context, self.instance, None, None)
            fake_stop.assert_not_called()
            fake_terminate.assert_not_called()
            fake_wait.assert_not_called()
        self.reset()
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _stubs(self, network_info, subnet_info, ports_info):
        self.ipam = quark.ipam.QuarkIpamANY()
        with contextlib.nested(
                mock.patch("neutron.common.rpc.get_notifier"),
                mock.patch("neutron.quota.QUOTAS.limit_check")):
            net = network_api.create_network(self.context, network_info)
            mac = {'mac_address_range': dict(cidr="AA:BB:CC")}
            self.context.is_admin = True
            macrng_api.create_mac_address_range(self.context, mac)
            self.context.is_admin = False
            subnet_info['subnet']['network_id'] = net['id']
            sub = subnet_api.create_subnet(self.context, subnet_info)
            ports = []
            for port_info in ports_info:
                port_info['port']['network_id'] = net['id']
                ports.append(port_api.create_port(self.context, port_info))
            yield net, sub, ports
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:ansible-role    作者:mattvonrocketstein    | 项目源码 | 文件源码
def vulture():
    """ try to find dead code paths """
    with api.quiet():
        if not api.local('which vulture').succeeded:
            print 'vulture not found, installing it'
            api.local('pip install vulture')
    ignore_functions_grep = 'egrep -v "{0}"'.format(
        '|'.join(VULTURE_IGNORE_FUNCTIONS))
    excluded = ",".join(VULTURE_EXCLUDE_PATHS)
    excluded_paths = (' --exclude ' + excluded) if excluded else ''
    vulture_cmd = '\n  vulture {pkg_name}{exclude}{pipes}'
    vulture_cmd = vulture_cmd.format(
        pkg_name=PKG_NAME,
        exclude=excluded_paths,
        pipes='|'.join(['', ignore_functions_grep]))
    changedir = api.lcd(os.path.dirname(__file__))
    warn_only = api.settings(warn_only=True)
    be_quit = api.hide('warnings')
    with contextlib.nested(changedir, warn_only, be_quit):
        result = api.local(vulture_cmd, capture=True)
        exit_code = result.return_code
    print result.strip()
    raise SystemExit(exit_code)
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:amira    作者:Yelp    | 项目源码 | 文件源码
def _patch_and_run_amira(
            self, region_name, queue_name, contents, created_objects):
        """Patches all the external dependencies and runs AMIRA."""
        self._results_uploader_mock = MagicMock()

        with nested(
            patch.object(
                S3Handler, '__init__', autospec=True, return_value=None),
            patch.object(
                S3Handler, 'get_contents_as_string', autospec=True,
                side_effect=contents),
            patch.object(
                SqsHandler, '__init__', autospec=True, return_value=None),
            patch.object(
                SqsHandler, 'get_created_objects', autospec=True,
                side_effect=created_objects),
        ) as (
            __,
            self._patched_get_contents_as_string,
            __,
            self._patched_get_created_objects,
        ):
            amira = AMIRA(region_name, queue_name)
            amira.register_results_uploader(self._results_uploader_mock)
            amira.run()
项目:elastalert-ui    作者:steelheaddigital    | 项目源码 | 文件源码
def test_run_rule_calls_garbage_collect(ea):
    start_time = '2014-09-26T00:00:00Z'
    end_time = '2014-09-26T12:00:00Z'
    ea.buffer_time = datetime.timedelta(hours=1)
    ea.run_every = datetime.timedelta(hours=1)
    with contextlib.nested(mock.patch.object(ea.rules[0]['type'], 'garbage_collect'),
                           mock.patch.object(ea, 'run_query')) as (mock_gc, mock_get_hits):
        ea.run_rule(ea.rules[0], ts_to_dt(end_time), ts_to_dt(start_time))

    # Running ElastAlert every hour for 12 hours, we should see self.garbage_collect called 12 times.
    assert mock_gc.call_count == 12

    # The calls should be spaced 1 hour apart
    expected_calls = [ts_to_dt(start_time) + datetime.timedelta(hours=i) for i in range(1, 13)]
    for e in expected_calls:
        mock_gc.assert_any_call(e)
项目:reahl    作者:reahl    | 项目源码 | 文件源码
def __call__(self, f):
        @wrapt.decorator
        def test_with_fixtures(wrapped, instance, args, kwargs):
            fixture_instances = [i for i in list(args) + list(kwargs.values())
                                     if i.__class__ in self.fixture_classes or i.scenario in self.requested_fixtures]
            dependency_ordered_fixtures = self.topological_sort_instances(fixture_instances)

            if six.PY2:
                with contextlib.nested(*list(dependency_ordered_fixtures)):
                    return wrapped(*args, **kwargs)
            else:
                with contextlib.ExitStack() as stack:
                    for fixture in dependency_ordered_fixtures:
                        stack.enter_context(fixture)
                    return wrapped(*args, **kwargs)

        ff = test_with_fixtures(f)
        arg_names = self.fixture_arg_names(ff)
        return pytest.mark.parametrize(','.join(arg_names), self.fixture_permutations())(ff)
项目:plugin.video.unofficial9anime    作者:Prometheusx-git    | 项目源码 | 文件源码
def _exec_(self, source):
            source = '''\
            (function() {{
                {0};
                {1};
            }})()'''.format(
                encode_unicode_codepoints(self._source),
                encode_unicode_codepoints(source)
            )
            source = str(source)

            # backward compatibility
            with contextlib.nested(PyV8.JSContext(), PyV8.JSEngine()) as (ctxt, engine):
                js_errors = (PyV8.JSError, IndexError, ReferenceError, SyntaxError, TypeError)
                try:
                    script = engine.compile(source)
                except js_errors as e:
                    raise exceptions.RuntimeError(e)
                try:
                    value = script.run()
                except js_errors as e:
                    raise exceptions.ProgramError(e)
                return self.convert(value)
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:FMoviesPlus.bundle    作者:coder-alpha    | 项目源码 | 文件源码
def exec_(self, source):
            source = '''\
            (function() {{
                {0};
                {1};
            }})()'''.format(
                encode_unicode_codepoints(self._source),
                encode_unicode_codepoints(source)
            )
            source = str(source)

            import PyV8
            import contextlib
            #backward compatibility
            with contextlib.nested(PyV8.JSContext(), PyV8.JSEngine()) as (ctxt, engine):
                js_errors = (PyV8.JSError, IndexError, ReferenceError, SyntaxError, TypeError)
                try:
                    script = engine.compile(source)
                except js_errors as e:
                    raise RuntimeError(e)
                try:
                    value = script.run()
                except js_errors as e:
                    raise ProgramError(e)
                return self.convert(value)
项目:micro-blog    作者:nickChenyx    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:zpy    作者:albertaleksieiev    | 项目源码 | 文件源码
def _exec_(self, source):
            source = '''\
            (function() {{
                {0};
                {1};
            }})()'''.format(
                encode_unicode_codepoints(self._source),
                encode_unicode_codepoints(source)
            )
            source = str(source)

            # backward compatibility
            with contextlib.nested(PyV8.JSContext(), PyV8.JSEngine()) as (ctxt, engine):
                js_errors = (PyV8.JSError, IndexError, ReferenceError, SyntaxError, TypeError)
                try:
                    script = engine.compile(source)
                except js_errors as e:
                    raise exceptions.RuntimeError(e)
                try:
                    value = script.run()
                except js_errors as e:
                    raise exceptions.ProgramError(e)
                return self.convert(value)
项目:python-flask-security    作者:weinbergdavid    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:Lixiang_zhaoxin    作者:hejaxian    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:LIS-Tempest    作者:LIS    | 项目源码 | 文件源码
def test_pkey_calls_paramiko_RSAKey(self):
        with contextlib.nested(
            mock.patch('paramiko.RSAKey.from_private_key'),
            mock.patch('cStringIO.StringIO')) as (rsa_mock, cs_mock):
            cs_mock.return_value = mock.sentinel.csio
            pkey = 'mykey'
            ssh.Client('localhost', 'root', pkey=pkey)
            rsa_mock.assert_called_once_with(mock.sentinel.csio)
            cs_mock.assert_called_once_with('mykey')
            rsa_mock.reset_mock()
            cs_mock.rest_mock()
            pkey = mock.sentinel.pkey
            # Shouldn't call out to load a file from RSAKey, since
            # a sentinel isn't a basestring...
            ssh.Client('localhost', 'root', pkey=pkey)
            rsa_mock.assert_not_called()
            cs_mock.assert_not_called()
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:ngx_status    作者:YoYoAdorkable    | 项目源码 | 文件源码
def nested(*managers):
        exits = []
        vars = []
        exc = (None, None, None)
        try:
            for mgr in managers:
                exit = mgr.__exit__
                enter = mgr.__enter__
                vars.append(enter())
                exits.append(exit)
            yield vars
        except:
            exc = sys.exc_info()
        finally:
            while exits:
                exit = exits.pop()
                try:
                    if exit(*exc):
                        exc = (None, None, None)
                except:
                    exc = sys.exc_info()
            if exc != (None, None, None):
                reraise(exc[0], exc[1], exc[2])
项目:CAL    作者:HPCC-Cloud-Computing    | 项目源码 | 文件源码
def nested(*contexts):
        with contextlib.ExitStack() as stack:
            yield [stack.enter_context(c) for c in contexts]
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def nested(*contexts):
        """
        Reimplementation of nested in python 3.
        """
        with ExitStack() as stack:
            results = [
                stack.enter_context(context)
                for context in contexts
            ]
            yield results
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def create_upload_func(self, ns, definition, path, operation):
        request_schema = definition.request_schema or Schema()
        response_schema = definition.response_schema or Schema()

        @self.add_route(path, operation, ns)
        @wraps(definition.func)
        def upload(**path_data):
            request_data = load_query_string_data(request_schema)

            if not request.files:
                raise BadRequest("No files were uploaded")

            uploads = [
                temporary_upload(name, fileobj)
                for name, fileobj
                in request.files.items()
                if not self.exclude_func(name, fileobj)
            ]
            with nested(*uploads) as files:
                response_data = definition.func(files, **merge_data(path_data, request_data))
                if response_data is None:
                    return "", 204

            return dump_response_data(response_schema, response_data, operation.value.default_code)

        if definition.request_schema:
            upload = qs(definition.request_schema)(upload)
        if definition.response_schema:
            upload = response(definition.response_schema)(upload)
        return upload
项目:kolla-kubernetes    作者:openstack    | 项目源码 | 文件源码
def nested(*contexts):
        with contextlib.ExitStack() as stack:
            yield [stack.enter_context(c) for c in contexts]
项目:bifrost    作者:ledatelescope    | 项目源码 | 文件源码
def flatten(self, *args):
        """Flatten a nested tuple/list of tuples/lists"""
        flattened_list = []
        for element in args:
            if isinstance(element, (tuple, list)):
                flattened_list.extend(self.flatten(*element))
            else:
                flattened_list.extend([element])
        return flattened_list
项目:omni    作者:openstack    | 项目源码 | 文件源码
def _create_vm_in_aws_nova(self):
        self._create_instance()
        self._create_network()
        with contextlib.nested(
            mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
            mock.patch.object(EC2Driver, '_process_network_info'),
            mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
        ) as (mock_image, mock_network, mock_secgrp):
            mock_image.return_value = 'ami-1234abc'
            mock_network.return_value = (self.subnet_id, '192.168.10.5', None,
                                         None)
            mock_secgrp.return_value = []
            self._create_nova_vm()
项目:omni    作者:openstack    | 项目源码 | 文件源码
def test_add_ssh_keys_key_exists(self):
        fake_key = 'fake_key'
        fake_key_data = 'abcdefgh'
        self.conn.ec2_conn.import_key_pair(fake_key, fake_key_data)
        with contextlib.nested(
            mock.patch.object(boto.ec2.EC2Connection, 'get_key_pair'),
            mock.patch.object(boto.ec2.EC2Connection, 'import_key_pair'),
        ) as (fake_get, fake_import):
            fake_get.return_value = True
            self.conn._add_ssh_keys(fake_key, fake_key_data)
            fake_get.assert_called_once_with(fake_key)
            fake_import.assert_not_called()
项目:omni    作者:openstack    | 项目源码 | 文件源码
def test_add_ssh_keys_key_absent(self):
        fake_key = 'fake_key'
        fake_key_data = 'abcdefgh'
        with contextlib.nested(
            mock.patch.object(boto.ec2.EC2Connection, 'get_key_pair'),
            mock.patch.object(boto.ec2.EC2Connection, 'import_key_pair'),
        ) as (fake_get, fake_import):
            fake_get.return_value = False
            self.conn._add_ssh_keys(fake_key, fake_key_data)
            fake_get.assert_called_once_with(fake_key)
            fake_import.assert_called_once_with(fake_key, fake_key_data)
项目:omni    作者:openstack    | 项目源码 | 文件源码
def test_spawn_with_network_error(self):
        self._create_instance()
        with contextlib.nested(
            mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
            mock.patch.object(EC2Driver, '_process_network_info'),
            mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
        ) as (mock_image, mock_network, mock_secgrp):
            mock_image.return_value = 'ami-1234abc'
            mock_network.return_value = (None, None, None, None)
            mock_secgrp.return_value = []
            self.assertRaises(exception.BuildAbortException,
                              self._create_nova_vm)
        self.reset()
项目:omni    作者:openstack    | 项目源码 | 文件源码
def test_spawn_with_network_error_from_aws(self):
        self._create_instance()
        with contextlib.nested(
            mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
            mock.patch.object(EC2Driver, '_process_network_info'),
            mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
        ) as (mock_image, mock_network, mock_secgrp):
            mock_image.return_value = 'ami-1234abc'
            mock_network.return_value = (None, '192.168.10.5', None, None)
            mock_secgrp.return_value = []
            self.assertRaises(exception.BuildAbortException,
                              self._create_nova_vm)
        self.reset()
项目:omni    作者:openstack    | 项目源码 | 文件源码
def test_spawn_with_image_error(self):
        self._create_instance()
        self._create_network()
        with contextlib.nested(
            mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
            mock.patch.object(EC2Driver, '_process_network_info'),
            mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
        ) as (mock_image, mock_network, mock_secgrp):
            mock_image.side_effect = exception.BuildAbortException('fake')
            mock_network.return_value = ('subnet-1234abc', '192.168.10.5',
                                         None, None)
            mock_secgrp.return_value = []
            self.assertRaises(exception.BuildAbortException,
                              self._create_nova_vm)
        self.reset()
项目:omni    作者:openstack    | 项目源码 | 文件源码
def test_destroy_instance_not_found(self):
        self._create_instance()
        with contextlib.nested(
            mock.patch.object(boto.ec2.EC2Connection, 'stop_instances'),
            mock.patch.object(boto.ec2.EC2Connection, 'terminate_instances'),
            mock.patch.object(EC2Driver, '_wait_for_state'),
        ) as (fake_stop, fake_terminate, fake_wait):
            self.conn.destroy(self.context, self.instance, None, None)
            fake_stop.assert_not_called()
            fake_terminate.assert_not_called()
            fake_wait.assert_not_called()
        self.reset()
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _stubs(self, network_info, subnet_info, port_info):
        self.ipam = quark.ipam.QuarkIpamANY()
        with contextlib.nested(
                mock.patch("neutron.common.rpc.get_notifier"),
                mock.patch("neutron.quota.QUOTAS.limit_check")):
            net = network_api.create_network(self.context, network_info)
            mac = {'mac_address_range': dict(cidr="AA:BB:CC")}
            self.context.is_admin = True
            macrng_api.create_mac_address_range(self.context, mac)
            self.context.is_admin = False
            subnet_info['subnet']['network_id'] = net['id']
            port_info['port']['network_id'] = net['id']
            sub = subnet_api.create_subnet(self.context, subnet_info)
            port = port_api.create_port(self.context, port_info)
            yield net, sub, port
项目:ngas    作者:ICRAR    | 项目源码 | 文件源码
def unzip(infile, outfile):
    with contextlib.nested(gzip.open(infile, 'rb'), open(outfile, 'w')) as (gz, out):
        for data in iter(functools.partial(gz.read, 1024), ''):
            out.write(data)


###########################################################################
# END: Utility functions
###########################################################################
项目:lang2program    作者:kelvinguu    | 项目源码 | 文件源码
def read_files(*file_paths):
    files = []
    for i, p in enumerate(file_paths):
        if p:
            files.append(open(p, mode="r"))
            print 'Opened:', p
        else:
            files.append(EmptyFile())
            print 'WARNING: no path provided for file {} in list.'.format(i)

    with contextlib.nested(*files) as entered_files:
        for lines in izip(*entered_files):
            yield lines
项目:lang2program    作者:kelvinguu    | 项目源码 | 文件源码
def read_files(*file_paths):
    files = []
    for i, p in enumerate(file_paths):
        if p:
            files.append(open(p, mode="r"))
            print 'Opened:', p
        else:
            files.append(EmptyFile())
            print 'WARNING: no path provided for file {} in list.'.format(i)

    with contextlib.nested(*files) as entered_files:
        for lines in izip(*entered_files):
            yield lines
项目:Benchmark    作者:SihyeongPark    | 项目源码 | 文件源码
def makeSummaryFiles(iniName):
   os.chdir(dirs['tmp'])

   datacsv = 'filtered_measurements.csv'
   ndatacsv = 'fastest_measurements.csv'
   bulkdatacsv = 'all_measurements.csv'

   with nested( open( join(datacsv), 'w'),
                open( join(ndatacsv), 'w')) as (df,ndf):

      try:
         #bdf = bz2.BZ2File( join(bulkdatacsv), 'w')
         bdf = open( join(bulkdatacsv), 'w')

         writeHeader(df); writeHeader(ndf); writeHeader(bdf)
         walkDatFiles(df,ndf,bdf)

      except (OSError,IOError), err:
         if logger: logger.error(err)
      finally:
         bdf.close()

   try:
      if dirs['dat_sweep']:
         copy2( join(datacsv), dirs['dat_sweep'])
         copy2( join(ndatacsv), dirs['dat_sweep'])
         copy2( join(bulkdatacsv), dirs['dat_sweep'])
         if logger: logger.info('copy %s files to %s','*.csv',dirs['dat_sweep'])

   except (OSError,IOError), err:
      if logger: logger.error(err)



# =============================
# sweep .code & .log
# =============================
项目:felix    作者:axbaretto    | 项目源码 | 文件源码
def test_interface_initially_up(self):
        combined_id = WloadEndpointId("host_id", "orchestrator_id",
                                      "workload_id", "endpoint_id")
        ip_type = futils.IPV4
        local_ep = self.create_endpoint(combined_id, ip_type)

        ips = ["1.2.3.4"]
        iface = "tapabcdef"
        data = {
            'state': "active",
            'endpoint': "endpoint_id",
            'mac': stub_utils.get_mac(),
            'name': iface,
            'ipv4_nets': ips,
            'profile_ids': ["prof1"]
        }

        # We can only get on_interface_update calls after the first
        # on_endpoint_update, so trigger that.
        with nested(
                mock.patch('calico.felix.devices.set_routes'),
                mock.patch('calico.felix.devices.configure_interface_ipv4'),
                mock.patch('calico.felix.devices.interface_up'),
                mock.patch('calico.felix.devices.interface_exists'),
        ) as [m_set_routes, m_conf, m_iface_up, m_iface_exists]:
                m_iface_up.return_value = True
                m_iface_exists.return_value = True
                local_ep.on_endpoint_update(data, async=True)
                self.step_actor(local_ep)
                self.assertEqual(local_ep._mac, data['mac'])
                self.assertTrue(m_conf.called)
                self.assertTrue(local_ep._device_in_sync)
                # Interface is up so we should get the routes striaght away.
                m_set_routes.assert_called_once_with(ip_type,
                                                     set(ips),
                                                     iface,
                                                     data['mac'],
                                                     reset_arp=True)
项目:felix    作者:axbaretto    | 项目源码 | 文件源码
def test_interface_goes_down_removes_routes(self):
        combined_id = WloadEndpointId("host_id", "orchestrator_id",
                                      "workload_id", "endpoint_id")
        ip_type = futils.IPV4
        local_ep = self.create_endpoint(combined_id, ip_type)

        ips = ["1.2.3.4"]
        iface = "tapabcdef"
        data = {
            'state': "active",
            'endpoint': "endpoint_id",
            'mac': stub_utils.get_mac(),
            'name': iface,
            'ipv4_nets': ips,
            'profile_ids': ["prof1"]
        }

        # We can only get on_interface_update calls after the first
        # on_endpoint_update, so trigger that.
        with nested(
                mock.patch('calico.felix.devices.set_routes'),
                mock.patch('calico.felix.devices.configure_interface_ipv4'),
                mock.patch('calico.felix.devices.interface_up'),
                mock.patch('calico.felix.devices.interface_exists'),
        ) as [m_set_routes, m_conf, m_iface_up, m_iface_exists]:
                m_iface_up.return_value = True
                m_iface_exists.return_value = True
                local_ep.on_endpoint_update(data, async=True)
                self.step_actor(local_ep)
                self.assertTrue(m_conf.called)  # Check we did the "UP" path.

        # Now pretend send an interface update.
        with mock.patch('calico.felix.devices.set_routes') as m_set_routes:
            local_ep.on_interface_update(False, async=True)
            self.step_actor(local_ep)
            m_set_routes.assert_called_once_with(ip_type,
                                                 set(),
                                                 iface,
                                                 None)
            self.assertTrue(local_ep._device_in_sync)
项目:felix    作者:axbaretto    | 项目源码 | 文件源码
def test_configure_interface_ipv6_mainline(self):
        """
        Test that configure_interface_ipv6_mainline
            - opens and writes to the /proc system to enable proxy NDP on the
              interface.
            - calls ip -6 neigh to set up the proxy targets.

        Mainline test has two proxy targets.
        """
        m_open = mock.mock_open()
        rc = futils.CommandOutput("", "")
        if_name = "tap3e5a2b34222"
        proxy_target = "2001::3:4"

        open_patch = mock.patch('__builtin__.open', m_open, create=True)
        m_check_call = mock.patch('calico.felix.futils.check_call',
                                  return_value=rc)

        with nested(open_patch, m_check_call) as (_, m_check_call):
            devices.configure_interface_ipv6(if_name, proxy_target)
            calls = [mock.call('/proc/sys/net/ipv6/conf/%s/proxy_ndp' %
                               if_name,
                               'wb'),
                     M_ENTER,
                     mock.call().write('1'),
                     M_CLEAN_EXIT]
            m_open.assert_has_calls(calls)
            ip_calls = [mock.call(["ip", "-6", "neigh", "add", "proxy",
                                   str(proxy_target), "dev", if_name])]
            m_check_call.assert_has_calls(ip_calls)