Python contextlib 模块,ExitStack() 实例源码

我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用contextlib.ExitStack()

项目:allennlp    作者:allenai    | 项目源码 | 文件源码
def _predict(predictors: Dict[str, str]):
    def predict_inner(args: argparse.Namespace) -> None:
        predictor = _get_predictor(args, predictors)
        output_file = None

        if args.silent and not args.output_file:
            print("--silent specified without --output-file.")
            print("Exiting early because no output will be created.")
            sys.exit(0)

        # ExitStack allows us to conditionally context-manage `output_file`, which may or may not exist
        with ExitStack() as stack:
            input_file = stack.enter_context(args.input_file)  # type: ignore
            if args.output_file:
                output_file = stack.enter_context(args.output_file)  # type: ignore

            _run(predictor, input_file, output_file, args.batch_size, not args.silent, args.cuda_device)

    return predict_inner
项目:taskcv-2017-public    作者:VisionLearningGroup    | 项目源码 | 文件源码
def adversarial_discriminator(net, layers, scope='adversary', leaky=False):
    if leaky:
        activation_fn = tflearn.activations.leaky_relu
    else:
        activation_fn = tf.nn.relu
    with ExitStack() as stack:
        stack.enter_context(tf.variable_scope(scope))
        stack.enter_context(
            slim.arg_scope(
                [slim.fully_connected],
                activation_fn=activation_fn,
                weights_regularizer=slim.l2_regularizer(2.5e-5)))
        for dim in layers:
            net = slim.fully_connected(net, dim)
        net = slim.fully_connected(net, 2, activation_fn=None)
    return net
项目:igd-exporter    作者:yrro    | 项目源码 | 文件源码
def search(timeout):
    '''
    Search for devices implementing WANCommonInterfaceConfig on the network.

    Search ends the specified number of seconds after the last result (if any) was received.

    Returns an iterator of root device URLs.
    '''
    with contextlib.ExitStack() as stack:
        sockets = []
        sockets.append(stack.enter_context(socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)))
        sockets.append(stack.enter_context(socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)))

        for s in sockets:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if s.family == socket.AF_INET6:
                s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

            with concurrent.futures.ThreadPoolExecutor(len(sockets)) as ex:
                return itertools.chain.from_iterable(ex.map(lambda s: search_socket(s, timeout, ns['i']), sockets))
项目:senti    作者:stevenxxiu    | 项目源码 | 文件源码
def write_split_emote(cls):
        retweet_re = re.compile(r'RT\s*"?[@?][a-zA-Z0-9_]+:?')
        with open('emote/all.txt', encoding='utf-8') as in_sr, ExitStack() as stack:
            out_srs = {
                name: stack.enter_context(open('emote/class_{}.txt'.format(name), 'w', encoding='utf-8'))
                for name in ['pos', 'neg']
            }
            for i, line in enumerate(in_sr):
                if retweet_re.search(line):
                    continue
                counts = [0, 0, 0]
                for match in cls.emoticon_re.finditer(line):
                    counts[Emoticons.assess_match(match)] += 1
                label = None
                if counts[0] > 0 and counts[1] == 0 and counts[2] == 0:
                    label = 0
                elif counts[0] == 0 and counts[1] == 0 and counts[2] > 0:
                    label = 2
                if label is not None:
                    out_srs[label].write(cls.emoticon_re.sub(' ', line).strip() + '\n')
项目:MUBench    作者:stg-tud    | 项目源码 | 文件源码
def post(url: str, data: object, file_paths: List[str] = None, username: str="", password: str=""):
    request = {
        "url": url,
        "data": json.dumps(data, cls=JSONFloatEncoder)
    }

    if username:
        if not password:
            password = getpass.getpass("Enter password for '{}': ".format(username))
        request["auth"] = (username, password)

    with ExitStack() as es:
        if file_paths:
            request["data"] = {"data": request["data"]}
            request["files"] = [__create_file_tuple(path, es) for path in file_paths]

        response = requests.post(**request)
        response.raise_for_status()
项目:aiotk    作者:AndreLouisCaron    | 项目源码 | 文件源码
def test_exit_stack_exception_propagate():

    h1 = mock.MagicMock()
    h2 = mock.MagicMock()
    v1 = mock.MagicMock()
    v2 = mock.MagicMock()
    error = ValueError('FUUU')

    with pytest.raises(ValueError) as exc:
        with ExitStack() as stack:
            v = stack.enter_context(AutoClose(h1, v=v1))
            assert v is v1
            v = stack.enter_context(AutoClose(h2, v=v2))
            assert v is v2
            raise error
    assert exc.value is error

    h2.close.assert_called_once_with(ValueError, error, mock.ANY)
    h1.close.assert_called_once_with(ValueError, error, mock.ANY)
项目:rankedftw    作者:andersroos    | 项目源码 | 文件源码
def __call__(self):
        status = 1
        with log_exception(status=1):
            args = self.parser.parse_args()
            log_args(args)
            config.log_cached()
            logger = getLogger('django')

            with ExitStack() as stack:
                if self._pid_file:
                    stack.enter_context(pid_file(dirname=config.PID_DIR, max_age=self._pid_file_max_age))

                if self._stoppable:
                    self._stoppable_instance = stoppable()
                    stack.enter_context(self._stoppable_instance)

                status = self.run(args, logger) or 0
        sys.exit(status)
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def setUp(self):
        super().setUp()
        self._resources = ExitStack()
        self.addCleanup(self._resources.close)
        # Capture builtin print() output.
        self._stdout = StringIO()
        self._stderr = StringIO()
        self._resources.enter_context(
            patch('argparse._sys.stdout', self._stdout))
        # Capture stderr since this is where argparse will spew to.
        self._resources.enter_context(
            patch('argparse._sys.stderr', self._stderr))
        # Set up a few other useful things for these tests.
        self._resources.enter_context(
            patch('ubuntu_image.__main__.logging.basicConfig'))
        self.model_assertion = resource_filename(
            'ubuntu_image.tests.data', 'model.assertion')
        self.classic_gadget_tree = resource_filename(
            'ubuntu_image.tests.data', 'gadget_tree')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_duplicate_volume_name(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first:
    bootloader: u-boot
    structure:
        - name: one
          type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
  second:
    structure:
        - name: two
          type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
  first:
    structure:
        - name: three
          type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
""")
        self.assertEqual(str(cm.exception), 'Duplicate key: first')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_bad_hybrid_volume_type_2(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef,\
00000000-0000-0000-0000-0000deadbeef
          size: 400M
""")
        self.assertEqual(
            str(cm.exception),
            'Invalid gadget.yaml @ volumes:first-image:structure:0:type')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_filesystem_bad(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          filesystem: zfs
""")
        self.assertEqual(
            str(cm.exception),
            ("Invalid gadget.yaml value 'zfs' @ "
             'volumes:<volume name>:structure:<N>:filesystem'))
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_structure_role_system_data_bad_label(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000feedface
          size: 200
          role: system-data
          filesystem-label: foobar
""")
        self.assertEqual(
            str(cm.exception),
            ('`role: system-data` structure must have an implicit label, '
             "or 'writable': foobar"))
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_structure_type_role_conflict_1(self):
        # type:none means there's no partition, so you can't have a role of
        # system-{boot,data}.
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: ef
          size: 100
          role: mbr
  second-image:
    structure:
        - type: bare
          size: 200
          role: system-boot
""")
        self.assertEqual(
            str(cm.exception),
            'Invalid gadget.yaml: structure role/type conflict')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_structure_type_role_conflict_2(self):
        # type:none means there's no partition, so you can't have a role of
        # system-{boot,data}.
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: ef
          size: 100
          role: mbr
  second-image:
    structure:
        - type: bare
          size: 200
          role: system-data
""")
        self.assertEqual(
            str(cm.exception),
            'Invalid gadget.yaml: structure role/type conflict')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_structure_mbr_conflicting_id(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: ef
          role: mbr
          size: 100
          id: 00000000-0000-0000-0000-0000deadbeef
""")
        self.assertEqual(
            str(cm.exception),
            'mbr structures must not specify partition id')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_structure_mbr_conflicting_filesystem(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: ef
          role: mbr
          size: 100
          filesystem: ext4
""")
        self.assertEqual(
            str(cm.exception),
            'mbr structures must not specify a file system')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_special_type_mbr_and_role(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: mbr
          role: mbr
          size: 100
""")
        self.assertEqual(
            str(cm.exception),
            'Type mbr and role fields assigned at the same time, please use '
            'the mbr role instead')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_special_type_mbr_and_filesystem(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: mbr
          size: 100
          filesystem: ext4
""")
        self.assertEqual(
            str(cm.exception),
            'mbr structures must not specify a file system')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_bad_schema(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: bad
    bootloader: u-boot
    structure:
        - type: ef
          size: 400M
""")
        self.assertEqual(
            str(cm.exception),
            "Invalid gadget.yaml value 'bad' @ volumes:<volume name>:schema")
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_mbr_with_bogus_type(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: mbr
    bootloader: u-boot
    structure:
        - type: 801
          size: 400M
""")
        self.assertEqual(
            str(cm.exception),
            'Invalid gadget.yaml @ volumes:first-image:structure:0:type')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_bad_bootloader(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boat
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
""")
        self.assertEqual(
            str(cm.exception),
            ("Invalid gadget.yaml value 'u-boat' @ "
             'volumes:<volume name>:bootloader'))
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_missing_bootloader_multiple_volumes(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: gpt
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
  second-image:
    schema: gpt
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
  third-image:
    schema: gpt
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
""")
        self.assertEqual(str(cm.exception), 'No bootloader structure named')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_bad_volume_id(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    id: 3g
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
""")
        self.assertEqual(str(cm.exception),
                         'Invalid gadget.yaml @ volumes:first-image:id')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_bad_integer_volume_id(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    id: 801
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
""")
        self.assertEqual(str(cm.exception),
                         'Invalid gadget.yaml @ volumes:first-image:id')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_disallow_hybrid_volume_id(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          id: 80,00000000-0000-0000-0000-0000deadbeef
""")
        self.assertEqual(
            str(cm.exception),
            'Invalid gadget.yaml @ volumes:first-image:structure:0:id')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_offset_write_relative_syntax_error(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          offset-write: some_label%2112
""")
        self.assertEqual(
            str(cm.exception),
            ('Invalid gadget.yaml @ '
             'volumes:first-image:structure:0:offset-write'))
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_offset_write_larger_than_32bit(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          offset-write: 8G
""")
        self.assertEqual(
            str(cm.exception),
            ('Invalid gadget.yaml @ '
             'volumes:first-image:structure:0:offset-write'))
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_offset_write_is_4G(self):
        # 4GiB is just outside 32 bits.
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          offset-write: 4G
""")
        self.assertEqual(
            str(cm.exception),
            ('Invalid gadget.yaml @ '
             'volumes:first-image:structure:0:offset-write'))
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_bad_hybrid_volume_type_2(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef,\
00000000-0000-0000-0000-0000deadbeef
          size: 400M
""")
        self.assertEqual(
            str(cm.exception),
            'Invalid gadget.yaml @ volumes:first-image:structure:0:type')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_volume_filesystem_bad(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          filesystem: zfs
""")
        self.assertEqual(
            str(cm.exception),
            ("Invalid gadget.yaml value 'zfs' @ "
             'volumes:<volume name>:structure:<N>:filesystem'))
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_content_spec_b_offset_write_is_4G(self):
        # 4GiB is just outside 32 bits.
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          content:
          - image: foo.img
            offset-write: 4G
""")
        # XXX https://github.com/alecthomas/voluptuous/issues/239
        front, colon, end = str(cm.exception).rpartition(':')
        self.assertEqual(
            front,
            'Invalid gadget.yaml @ volumes:first-image:structure:0:content:0')
        self.assertIn(end, ['offset-write', 'image'])
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_wrong_content_1(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          filesystem: none
          content:
          - source: subdir/
            target: /
""")
        self.assertEqual(str(cm.exception),
                         'filesystem: none missing image file name')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_wrong_content_2(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 400M
          filesystem: ext4
          content:
          - image: foo.img
""")
        self.assertEqual(str(cm.exception),
                         'filesystem: vfat|ext4 missing source/target')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_mbr_structure_not_at_offset_zero_explicit(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - role: mbr
          type: 00000000-0000-0000-0000-0000deadbeef
          size: 446
          offset: 10
""")
        self.assertEqual(str(cm.exception),
                         'mbr structure must start at offset 0')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_mbr_structure_not_at_offset_zero_implicit(self):
        with ExitStack() as resources:
            cm = resources.enter_context(
                self.assertRaises(GadgetSpecificationError))
            parse("""\
volumes:
  first-image:
    bootloader: u-boot
    structure:
        - type: 00000000-0000-0000-0000-0000deadbeef
          size: 2M
        - role: mbr
          type: 00000000-0000-0000-0000-0000deadbeef
          size: 446
""")
        self.assertEqual(str(cm.exception),
                         'mbr structure must start at offset 0')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_mkfs_ext4(self):
        with ExitStack() as resources:
            tmpdir = resources.enter_context(TemporaryDirectory())
            results_dir = os.path.join(tmpdir, 'results')
            mock = MountMocker(results_dir)
            resources.enter_context(
                patch('ubuntu_image.helpers.run', mock.run))
            # Create a temporary directory and populate it with some stuff.
            contents_dir = resources.enter_context(TemporaryDirectory())
            with open(os.path.join(contents_dir, 'a.dat'), 'wb') as fp:
                fp.write(b'01234')
            with open(os.path.join(contents_dir, 'b.dat'), 'wb') as fp:
                fp.write(b'56789')
            # And a fake image file.
            img_file = resources.enter_context(NamedTemporaryFile())
            mkfs_ext4(img_file, contents_dir)
            # Two files were put in the "mountpoint" directory, but because of
            # above, we have to check them in the results copy.
            with open(os.path.join(mock.results_dir, 'a.dat'), 'rb') as fp:
                self.assertEqual(fp.read(), b'01234')
            with open(os.path.join(mock.results_dir, 'b.dat'), 'rb') as fp:
                self.assertEqual(fp.read(), b'56789')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_mkfs_ext4_no_contents(self):
        with ExitStack() as resources:
            tmpdir = resources.enter_context(TemporaryDirectory())
            results_dir = os.path.join(tmpdir, 'results')
            mock = MountMocker(results_dir)
            resources.enter_context(
                patch('ubuntu_image.helpers.run', mock.run))
            # Create a temporary directory, but this time without contents.
            contents_dir = resources.enter_context(TemporaryDirectory())
            # And a fake image file.
            img_file = resources.enter_context(NamedTemporaryFile())
            mkfs_ext4(img_file, contents_dir)
            # Because there were no contents, the `sudo cp` was never called,
            # the mock's shutil.copytree() was also never called, therefore
            # the results_dir was never created.
            self.assertFalse(os.path.exists(results_dir))
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_mkfs_ext4_preserve_ownership(self):
        with ExitStack() as resources:
            tmpdir = resources.enter_context(TemporaryDirectory())
            results_dir = os.path.join(tmpdir, 'results')
            mock = MountMocker(results_dir)
            resources.enter_context(
                patch('ubuntu_image.helpers.run', mock.run))
            # Create a temporary directory and populate it with some stuff.
            contents_dir = resources.enter_context(TemporaryDirectory())
            with open(os.path.join(contents_dir, 'a.dat'), 'wb') as fp:
                fp.write(b'01234')
            # And a fake image file.
            img_file = resources.enter_context(NamedTemporaryFile())
            mkfs_ext4(img_file, contents_dir, preserve_ownership=True)
            with open(os.path.join(mock.results_dir, 'a.dat'), 'rb') as fp:
                self.assertEqual(fp.read(), b'01234')
            self.assertTrue(mock.preserves_ownership)
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_hook_fired(self):
        with ExitStack() as resources:
            hooksdir = resources.enter_context(TemporaryDirectory())
            hookfile = os.path.join(hooksdir, 'test-hook')
            resultfile = os.path.join(hooksdir, 'result')
            env = {'UBUNTU_IMAGE_TEST_ENV': 'true'}
            with open(hookfile, 'w') as fp:
                fp.write("""\
#!/bin/sh
echo -n "$UBUNTU_IMAGE_TEST_ENV" >>{}
""".format(resultfile))
            os.chmod(hookfile, 0o744)
            manager = HookManager([hooksdir])
            manager.fire('test-hook', env)
            # Check if the script ran once as expected.
            self.assertTrue(os.path.exists(resultfile))
            with open(resultfile, 'r') as fp:
                self.assertEqual(fp.read(), 'true')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def test_hook_error(self):
        with ExitStack() as resources:
            hooksdir = resources.enter_context(TemporaryDirectory())
            hookfile = os.path.join(hooksdir, 'test-hook')
            with open(hookfile, 'w') as fp:
                fp.write(dedent("""\
                                #!/bin/sh
                                echo -n "error" 1>&2
                                exit 1
                                """))
            os.chmod(hookfile, 0o744)
            manager = HookManager([hooksdir])
            # Check if hook script failures are properly reported
            with self.assertRaises(HookError) as cm:
                manager.fire('test-hook')
            self.assertEqual(cm.exception.hook_name, 'test-hook')
            self.assertEqual(cm.exception.hook_path, hookfile)
            self.assertEqual(cm.exception.hook_retcode, 1)
            self.assertEqual(cm.exception.hook_stderr, 'error')
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def atomic(dst, encoding='utf-8'):
    """Open a temporary file for writing using the given encoding.

    The context manager returns an open file object, into which you can write
    text or bytes depending on the encoding it was opened with.  Upon exit,
    the temporary file is moved atomically to the destination.  If an
    exception occurs, the temporary file is removed.

    :param dst: The path name of the target file.
    :param encoding: The encoding to use for the open file.  If None, then
        file is opened in binary mode.
    """
    directory = os.path.dirname(dst)
    mode = 'wb' if encoding is None else 'wt'
    with ExitStack() as resources:
        fp = resources.enter_context(NamedTemporaryFile(
            mode=mode, encoding=encoding, dir=directory, delete=False))
        yield fp
        os.rename(fp.name, dst)
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def update_changelog(repo, series, version):
    # Update d/changelog.
    with ExitStack() as resources:
        debian_changelog = os.path.join(
            repo.working_dir, 'debian', 'changelog')
        infp = resources.enter_context(
            open(debian_changelog, 'r', encoding='utf-8'))
        outfp = resources.enter_context(atomic(debian_changelog))
        changelog = Changelog(infp)
        # Currently, master is always Zesty.
        changelog.distributions = series
        series_version = {
            'bionic': '18.04',
            'artful': '17.10',
            'zesty': '17.04',
            'xenial': '16.04',
            }[series]
        new_version = '{}+{}ubuntu1'.format(version, series_version)
        changelog.version = new_version
        changelog.write_to_open_file(outfp)
    return new_version
项目:reahl    作者:reahl    | 项目源码 | 文件源码
def __call__(self, f):
        @wrapt.decorator
        def test_with_fixtures(wrapped, instance, args, kwargs):
            fixture_instances = [i for i in list(args) + list(kwargs.values())
                                     if i.__class__ in self.fixture_classes or i.scenario in self.requested_fixtures]
            dependency_ordered_fixtures = self.topological_sort_instances(fixture_instances)

            if six.PY2:
                with contextlib.nested(*list(dependency_ordered_fixtures)):
                    return wrapped(*args, **kwargs)
            else:
                with contextlib.ExitStack() as stack:
                    for fixture in dependency_ordered_fixtures:
                        stack.enter_context(fixture)
                    return wrapped(*args, **kwargs)

        ff = test_with_fixtures(f)
        arg_names = self.fixture_arg_names(ff)
        return pytest.mark.parametrize(','.join(arg_names), self.fixture_permutations())(ff)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_create_snapshot(self):
        raw_traces = [(5, (('a.py', 2),))]

        with contextlib.ExitStack() as stack:
            stack.enter_context(patch.object(tracemalloc, 'is_tracing',
                                             return_value=True))
            stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
                                             return_value=5))
            stack.enter_context(patch.object(tracemalloc, '_get_traces',
                                             return_value=raw_traces))

            snapshot = tracemalloc.take_snapshot()
            self.assertEqual(snapshot.traceback_limit, 5)
            self.assertEqual(len(snapshot.traces), 1)
            trace = snapshot.traces[0]
            self.assertEqual(trace.size, 5)
            self.assertEqual(len(trace.traceback), 1)
            self.assertEqual(trace.traceback[0].filename, 'a.py')
            self.assertEqual(trace.traceback[0].lineno, 2)
项目:aiosmtpd    作者:aio-libs    | 项目源码 | 文件源码
def setUp(self):
        old_log_level = log.getEffectiveLevel()
        self.addCleanup(log.setLevel, old_log_level)
        self.resources = ExitStack()
        # Create a new event loop, and arrange for that loop to end almost
        # immediately.  This will allow the calls to main() in these tests to
        # also exit almost immediately.  Otherwise, the foreground test
        # process will hang.
        #
        # I think this introduces a race condition.  It depends on whether the
        # call_later() can possibly run before the run_forever() does, or could
        # cause it to not complete all its tasks.  In that case, you'd likely
        # get an error or warning on stderr, which may or may not cause the
        # test to fail.  I've only seen this happen once and don't have enough
        # information to know for sure.
        default_loop = asyncio.get_event_loop()
        loop = asyncio.new_event_loop()
        loop.call_later(0.1, loop.stop)
        self.resources.callback(asyncio.set_event_loop, default_loop)
        asyncio.set_event_loop(loop)
        self.addCleanup(self.resources.close)
项目:aiosmtpd    作者:aio-libs    | 项目源码 | 文件源码
def test_exception_handler_exception(self):
        handler = ErroringErrorHandler()
        controller = ErrorController(handler)
        controller.start()
        self.addCleanup(controller.stop)
        with ExitStack() as resources:
            # Suppress logging to the console during the tests.  Depending on
            # timing, the exception may or may not be logged.
            resources.enter_context(patch('aiosmtpd.smtp.log.exception'))
            client = resources.enter_context(
                SMTP(controller.hostname, controller.port))
            code, response = client.helo('example.com')
        self.assertEqual(code, 500)
        self.assertEqual(response,
                         b'Error: (ValueError) ErroringErrorHandler test')
        self.assertIsInstance(handler.error, ValueError)