Python django.test 模块,TransactionTestCase() 实例源码

我们从Python开源项目中,提取了以下5个代码示例,用于说明如何使用django.test.TransactionTestCase()

项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_process_no_sieve(self):
        """
        Tests the process method for a chute with no sieve.
        """
        mock_doc_id = 1

        email = {'Message-ID': 'abc', 'Subject': 'This is an Urgent Alert'}
        doc_obj = DocumentObj(data=email)

        mailchute = MailChute.objects.get(pk=3)
        mailchute.enabled = True
        mailchute.munger.process = Mock(return_value=mock_doc_id)

        doc_id = mailchute.process(doc_obj)

        mailchute.munger.process.assert_called_once_with(doc_obj)
        self.assertEqual(doc_id, mock_doc_id)


# NOTE: use TransactionTestCase to handle threading
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_multiprocess_muzzled(self):
        """
        Tests muzzling when multiple duplicate Alerts are being processed
        concurrently.
        """
        with patch('alerts.models.Alert._format_title',
                   return_value=self.data['content']['subject']):
            incident_num = 20

            alert_count = Alert.objects.count()

            args = [self.doc_obj]

            alert = self.email_wdog.process(*args)

            # check that a new Alert was created
            self.assertEqual(Alert.objects.count(), alert_count + 1)

            # NOTE: we can't use multiprocessing with Mocks,
            # so we have to settle for using threading to mimic concurrency

            threads = []
            for dummy_index in range(incident_num):
                new_thread = threading.Thread(target=self.email_wdog.process,
                                              args=args)
                threads.append(new_thread)
                new_thread.start()

            for thread in threads:
                thread.join()

            # NOTE: we can't check Alert counts because saved Alerts
            # won't be committed in the TransactionTestCase

            # but we can check that the previous Alert was incremented
            alert = Alert.objects.get(pk=alert.pk)
            self.assertEqual(alert.incidents, incident_num + 1)
项目:mes    作者:osess    | 项目源码 | 文件源码
def __call__(self, test_func):
            from django.test import TransactionTestCase
            if isinstance(test_func, type):
                if not issubclass(test_func, TransactionTestCase):
                    raise Exception(
                        "Only subclasses of Django SimpleTestCase "
                        "can be decorated with override_settings")
                original_pre_setup = test_func._pre_setup
                original_post_teardown = test_func._post_teardown

                def _pre_setup(innerself):
                    self.enable()
                    original_pre_setup(innerself)

                def _post_teardown(innerself):
                    original_post_teardown(innerself)
                    self.disable()
                test_func._pre_setup = _pre_setup
                test_func._post_teardown = _post_teardown
                return test_func
            else:
                @wraps(test_func)
                def inner(*args, **kwargs):
                    with self:
                        return test_func(*args, **kwargs)
            return inner
项目:edx-django-release-util    作者:edx    | 项目源码 | 文件源码
def test_run_migrations_success(self):
        """
        Test the migration success path.
        """
        # Using TransactionTestCase sets up the migrations as set up for the test.
        # Reset the release_util migrations to the very beginning - i.e. no tables.
        call_command("migrate", "release_util", "zero", verbosity=0)

        input_yaml = """
        database: 'default',
        migrations:
          - [release_util, 0001_initial]
          - [release_util, 0002_second]
          - [release_util, 0003_third]
        initial_states:
          - [release_util, zero]
        """
        output = [
            {
                'database': 'default',
                'duration': None,
                'failed_migration': None,
                'migration': 'all',
                'output': None,
                'succeeded_migrations': [
                    ['release_util', '0001_initial'],
                    ['release_util', '0002_second'],
                    ['release_util', '0003_third'],
                ],
                'traceback': None,
                'succeeded': True,
            },
        ]

        out_file = tempfile.NamedTemporaryFile(suffix='.yml')
        in_file = tempfile.NamedTemporaryFile(suffix='.yml')
        in_file.write(input_yaml.encode('utf-8'))
        in_file.flush()

        # Check the stdout output against the expected output.
        self._check_command_output(
            cmd='run_migrations',
            cmd_args=(in_file.name,),
            cmd_kwargs={'output_file': out_file.name},
            output=output,
        )
        in_file.close()

        # Check the contents of the output file against the expected output.
        with open(out_file.name, 'r') as f:
            output_yaml = f.read()
        parsed_yaml = yaml.safe_load(output_yaml)
        self.assertTrue(isinstance(parsed_yaml, list))
        parsed_yaml = self._null_certain_fields(parsed_yaml)
        self.assertEqual(yaml.dump(output), yaml.dump(parsed_yaml))
        out_file.close()
项目:edx-django-release-util    作者:edx    | 项目源码 | 文件源码
def test_run_migrations_failure(self, migration_name, migration_output):
        """
        Test the first, second, and last migration failing.
        """
        # Using TransactionTestCase sets up the migrations as set up for the test.
        # Reset the release_util migrations to the very beginning - i.e. no tables.
        call_command("migrate", "release_util", "zero", verbosity=0)

        input_yaml = """
        database: 'default',
        migrations:
          - [release_util, 0001_initial]
          - [release_util, 0002_second]
          - [release_util, 0003_third]
        initial_states:
          - [release_util, zero]
        """
        out_file = tempfile.NamedTemporaryFile(suffix='.yml')
        in_file = tempfile.NamedTemporaryFile(suffix='.yml')
        in_file.write(input_yaml.encode('utf-8'))
        in_file.flush()

        # A bogus class for creating a migration object that will raise a CommandError.
        class MigrationFail(object):
            atomic = False
            def state_forwards(self, app_label, state):
                pass
            def database_forwards(self, app_label, schema_editor, from_state, to_state):
                raise CommandError("Yo")

        # Insert the bogus object into the first operation of a migration.
        current_migration_list = release_util.tests.migrations.test_migrations.__dict__[migration_name].__dict__['Migration'].operations
        current_migration_list.insert(0, MigrationFail())

        try:
            # Check the stdout output.
            self._check_command_output(
                cmd="run_migrations",
                cmd_args=(in_file.name,),
                cmd_kwargs={'output_file': out_file.name},
                output=migration_output,
                err_output="Migration error: Migration failed for app 'release_util' - migration '{}'.".format(migration_name),
                exit_value=1
            )
        finally:
            # Whether the test passes or fails, always pop the failure migration of the list.
            current_migration_list.pop(0)

        in_file.close()
        out_file.close()