Python django.db.models.signals.post_save 模块,disconnect() 实例源码

我们从 Python 开源项目中，提取了以下 42 个代码示例，用于说明如何使用 django.db.models.signals.post_save.disconnect()。

项目:EnglishDiary    作者:jupiny    | 项目源码 | 文件源码
def setUp(self):
        """Prepare an authenticated API client for each test.

        Disconnects the ``post_save_diary`` receiver for ``Diary``, switches
        celery to eager execution, creates a user from the ``TEST_*``
        constants and logs that user in through ``APIClient``.
        """
        # Disable Signals
        post_save.disconnect(receiver=post_save_diary, sender=Diary)

        # Run celery task synchronous
        app.conf.update(CELERY_ALWAYS_EAGER=True)

        self.test_username = TEST_USERNAME
        self.test_password = TEST_PASSWORD
        self.test_email = TEST_EMAIL

        # The same credentials are used for account creation and for login.
        credentials = {
            'username': self.test_username,
            'password': self.test_password,
        }

        # Create a user
        user_manager = get_user_model().objects
        self.user = user_manager.create_user(email=self.test_email, **credentials)

        # Login
        self.client = APIClient()
        self.client.login(**credentials)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_add_vars(self):
        """Check that a ``make_template_vars`` receiver can add a variable.

        The receiver appends a ``test_var`` entry; the test then verifies the
        entry is returned by ``self.get`` and substituted by
        ``replace_template_vars``.
        """
        # add value handler
        def handler(vars_data, **kwarg):
            vars_data.append({
                'name': 'test_var',
                'description': 'Is a test!',
                'value': 'Hello!',
            })

        make_template_vars.connect(handler)
        try:
            # test value
            var_data = self.get('test_var')
            self.assertIsNotNone(var_data)
            self.assertEqual(var_data['value'], 'Hello!')
            self.assertEqual(var_data['description'], 'Is a test!')

            # test replace
            content = replace_template_vars('{{ test_var }}')
            self.assertEqual(content, 'Hello!')
        finally:
            # Disconnect even when an assertion above fails, so the receiver
            # cannot leak into the tests that run afterwards.
            disconnected = make_template_vars.disconnect(handler)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_edit_vars(self):
        """Check that a ``make_template_vars`` receiver can edit a variable.

        The receiver overwrites the value of the ``email`` entry; the test
        verifies the new value through ``self.get`` and
        ``replace_template_vars``.
        """
        # add value handler
        def handler(vars_data, **kwarg):
            var_data = self._search_name(vars_data, 'email')
            self.assertIsNotNone(var_data)
            var_data['value'] = 'Hello Word!'

        make_template_vars.connect(handler)
        try:
            # test value
            var_data = self.get('email')
            self.assertIsNotNone(var_data)
            self.assertEqual(var_data['value'], 'Hello Word!')

            # test replace
            content = replace_template_vars('{{ email }}')
            self.assertEqual(content, 'Hello Word!')
        finally:
            # Disconnect even when an assertion above fails, so the receiver
            # cannot leak into the tests that run afterwards.
            disconnected = make_template_vars.disconnect(handler)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_delete_vars(self):
        """Check that a ``make_template_vars`` receiver can remove a variable.

        The receiver removes the ``email`` entry; the test verifies that
        ``self.get`` no longer finds it and that ``replace_template_vars``
        leaves the placeholder untouched.
        """
        # add value handler
        def handler(vars_data, **kwarg):
            var_data = self._search_name(vars_data, 'email')
            self.assertIsNotNone(var_data)
            vars_data.remove(var_data)

        make_template_vars.connect(handler)
        try:
            # test value
            var_data = self.get('email')
            self.assertIsNone(var_data)

            # test replace
            content = replace_template_vars('{{ email }}')
            self.assertEqual(content, '{{ email }}')
        finally:
            # Disconnect even when an assertion above fails, so the receiver
            # cannot leak into the tests that run afterwards.
            disconnected = make_template_vars.disconnect(handler)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_add_item(self):
        """Check that a ``make_menu`` receiver can add a menu entry.

        The receiver appends a ``test`` entry; the test verifies that
        ``self.get`` returns it with the expected URL.
        """
        # add value handler
        def handler(urls, **kwarg):
            urls.append({
                'name': 'test',
                'url': 'http://google.fr',
            })

        make_menu.connect(handler)
        try:
            # test
            item = self.get('test')
            self.assertIsNotNone(item)
            self.assertEqual(item['url'], 'http://google.fr')
        finally:
            # Disconnect even when an assertion above fails, so the receiver
            # cannot leak into the tests that run afterwards.
            disconnected = make_menu.disconnect(handler)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_attachment_executed(self):
        """Check the ``post_save`` event fired when an attachment tracker is hit.

        A receiver on ``TrackerInfos`` validates the saved instance and raises
        ``SuccessException``; the test expects that exception to surface from
        the GET request on the tracker URL taken from the mail attachment.
        """
        def handler(instance, **kwarg):
            # get post on landing page event
            if instance.target_tracker.key != TRACKER_ATTACHMENT_EXECUTED:
                # ignore other target event
                return

            self.assertEqual(instance.ip, '127.0.0.1')
            self.assertEqual(instance.user_agent, 'Outlook')
            raise SuccessException()

        post_save.connect(handler, sender=TrackerInfos)
        try:
            # call tracker
            self.send_campaign()
            attachment = json.loads(mail.outbox[-1].attachments[0][1].decode())
            tracker_url = attachment['tracker_url']

            # test if handler has call
            with self.assertRaises(SuccessException):
                self.client.defaults['HTTP_USER_AGENT'] = 'Outlook'
                self.client.get(tracker_url)
        finally:
            # Disconnect even when a step above fails, so the receiver cannot
            # leak into the tests that run afterwards.
            disconnected = post_save.disconnect(handler, sender=TrackerInfos)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_email_open(self):
        """Check the ``post_save`` event fired when the e-mail open tracker is hit.

        A receiver on ``TrackerInfos`` validates the saved instance and raises
        ``SuccessException``; the test expects that exception to surface from
        the GET request on the tracker URL extracted from the HTML mail body.
        """
        def handler(instance, **kwarg):
            # get email open event
            if instance.target_tracker.key != TRACKER_EMAIL_OPEN:
                # ignore other target event
                return

            self.assertEqual(instance.ip, '127.0.0.1')
            self.assertEqual(instance.user_agent, 'Outlook')
            raise SuccessException()

        post_save.connect(handler, sender=TrackerInfos)
        try:
            # call tracker
            self.send_campaign()
            mail_html = mail.outbox[-1].alternatives[0][0]
            # the tracker URL is the value of the last src="..." attribute
            tracker_url = mail_html.split('src="')[-1].split('"')[0]

            # test if handler has call
            with self.assertRaises(SuccessException):
                self.client.defaults['HTTP_USER_AGENT'] = 'Outlook'
                self.client.get(tracker_url)
        finally:
            # Disconnect even when a step above fails, so the receiver cannot
            # leak into the tests that run afterwards.
            disconnected = post_save.disconnect(handler, sender=TrackerInfos)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_edit_campaign_report_item(self):
        """Check that a ``make_campaign_report`` receiver can rename a tab.

        The receiver moves the ``Other`` entry of ``tabs_layout`` to ``Test``;
        the test verifies the rendered content shows the ``Test`` tab and no
        longer shows the ``Other`` tab.
        """
        def handler(context, **kwarg):
            context['tabs_layout']['Test'] = context['tabs_layout']. \
                pop('Other')

        make_campaign_report.connect(handler)
        try:
            # test
            content = self.get()
            self.assertIn('<a href="#test" aria-controls="test" role="tab" '
                          'data-toggle="tab">Test</a>', content)
            self.assertIn('id="test"', content)
            self.assertNotIn('<a href="#other" aria-controls="other" '
                             'role="tab" data-toggle="tab">Other</a>', content)
            self.assertNotIn('id="other"', content)
        finally:
            # Disconnect even when an assertion above fails, so the receiver
            # cannot leak into the tests that run afterwards.
            disconnected = make_campaign_report.disconnect(handler)

        # clean
        self.assertTrue(disconnected)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def create_auth(sender, instance, created, **kwargs):
        """
        A hook that run when we create a new network that creates
        an auth user and token that BTSs on the network use to
        authenticate.

        Nothing is done when the instance already has both ``auth_group``
        and ``auth_user``. Otherwise the group and user named
        ``network_<pk>`` are fetched or created, and the instance is saved.
        """
        if not instance.auth_group or not instance.auth_user:
            instance.auth_group, created_group = Group.objects.get_or_create(
                name='network_%s' % instance.pk)
            if created_group:
                assign_perm('view_network', instance.auth_group, instance)

            # The user hook is disconnected while the auth user is created.
            # Reconnect it in ``finally`` so a failure during user/token
            # creation cannot leave it disconnected for the whole process.
            post_save.disconnect(UserProfile.new_user_hook, sender=User)
            try:
                instance.auth_user, created_user = User.objects.get_or_create(
                    username='network_%s' % instance.pk)
                if created_user:
                    Token.objects.create(user=instance.auth_user)
                    instance.auth_group.user_set.add(instance.auth_user)
            finally:
                post_save.connect(UserProfile.new_user_hook, sender=User)
            instance.save()
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_delete_object_is_deleted(self):
        """Deleting a book must make its node lookup raise ``DoesNotExist``.

        The ``pre_delete`` handler is re-registered under a test-only
        dispatch uid for the duration of the test and restored afterwards.
        """
        default_uid = 'chemtrails.signals.handlers.pre_delete_handler'
        test_uid = 'pre_delete_handler.test'

        pre_delete.disconnect(pre_delete_handler, dispatch_uid=default_uid)
        pre_delete.connect(pre_delete_handler, dispatch_uid=test_uid)
        try:
            book = BookFixture(Book).create_one()
            node_class = get_node_class_for_model(Book)
            book_pk = book.pk
            try:
                book.delete()
                node_class.nodes.get(pk=book_pk)
            except node_class.DoesNotExist as exc:
                self.assertEqual(str(exc), "{'pk': %d}" % book_pk)
            else:
                # the lookup succeeded although the book was deleted
                self.fail('Did not raise when trying to get non-existent book node.')
        finally:
            pre_delete.connect(pre_delete_handler, dispatch_uid=default_uid)
            pre_delete.disconnect(pre_delete_handler, dispatch_uid=test_uid)
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_null_foreignkey_is_disconnected(self):
        """Setting a foreign key to ``None`` must drop the node relationship.

        The ``post_save`` handler is re-registered under a test-only dispatch
        uid for the duration of the test and restored afterwards.
        """
        default_uid = 'chemtrails.signals.handlers.post_save_handler'
        test_uid = 'post_save_handler.test'

        post_save.disconnect(post_save_handler, dispatch_uid=default_uid)
        post_save.connect(post_save_handler, dispatch_uid=test_uid)
        try:
            store = StoreFixture(Store, generate_m2m=False).create_one()
            store_nodes = get_node_class_for_model(Store)

            bestseller_node = get_node_class_for_model(Book).nodes.get(pk=store.bestseller.pk)
            self.assertEqual(store.bestseller.pk, bestseller_node.pk)
            self.assertEqual(1, len(store_nodes.nodes.has(bestseller=True)))

            # clear the foreign key and persist the change
            store.bestseller = None
            store.save()

            self.assertEqual(0, len(store_nodes.nodes.has(bestseller=True)))
        finally:
            post_save.connect(post_save_handler, dispatch_uid=default_uid)
            post_save.disconnect(post_save_handler, dispatch_uid=test_uid)
项目:django-statusboard    作者:edigiacomo    | 项目源码 | 文件源码
def test_previous_status(self):
        """Check that ``_status`` differs from ``status`` only after a change.

        A ``post_save`` receiver records whether ``instance._status`` and
        ``instance.status`` differ; the flag is inspected after each save.
        """
        from django.db.models.signals import post_save

        def check_different(sender, instance, **kwargs):
            check_different.is_different = (instance._status != instance.status)

        # None means "receiver has not run yet"
        check_different.is_different = None

        # Connected without a sender, so it fires for every model; it must be
        # disconnected again even when an assertion below fails.
        post_save.connect(check_different)
        try:
            s = Service(name="service", description="test", status=0)
            s.save()
            self.assertFalse(check_different.is_different)
            s.status = 0
            s.save()
            self.assertFalse(check_different.is_different)
            s.status = 1
            s.save()
            self.assertIsNotNone(check_different.is_different)
            self.assertTrue(check_different.is_different)
        finally:
            post_save.disconnect(check_different)
项目:LIMS-Backend    作者:LeafLIMS    | 项目源码 | 文件源码
def _fire_alerts(self):
        """Patch two addresses with the trigger-set signal temporarily enabled.

        ``TriggerSet._fire_triggersets`` is connected to ``post_save`` for the
        duration of the requests and disconnected afterwards.
        """
        # Re-enable signals to enable testing of them
        post_save.connect(receiver=TriggerSet._fire_triggersets, dispatch_uid='Fire Trigger Sets')
        try:
            self._asJaneDoe()
            updated_address = {"city": "London", "country": "England"}
            response = self._client.patch("/addresses/%d/" % self._janeDoeAddress.id,
                                          updated_address, format='json')
            self.assertEqual(response.status_code, status.HTTP_200_OK)
            self._asJoeBloggs()
            updated_address = {"country": "England"}
            response = self._client.patch("/addresses/%d/" % self._joeBloggsAddress.id,
                                          updated_address, format='json')
            self.assertEqual(response.status_code, status.HTTP_200_OK)
        finally:
            # Disable signals to avoid affecting other tests; done in
            # ``finally`` so a failed assertion cannot leave them enabled.
            post_save.disconnect(receiver=TriggerSet._fire_triggersets,
                                 dispatch_uid='Fire Trigger Sets')
项目:alexandriadocs    作者:srtab    | 项目源码 | 文件源码
def test_post(self, mmime):
        """Posting an archive must return 201 and create one ``ImportedArchive``.

        ``mmime`` is not used in the body (presumably a mock injected by a
        patch decorator outside this view -- TODO confirm).
        """
        # to avoid files extraction
        # NOTE(review): the receiver is not reconnected in this method;
        # confirm that this is handled elsewhere.
        post_save.disconnect(receiver=ImportedArchive.post_save, sender=ImportedArchive)

        upload = SimpleUploadedFile('docs.tar.gz', b'content')
        payload = {'archive': upload}
        response = self.client.post(self.url, payload, **self.headers)

        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        archive_count = ImportedArchive.objects.count()
        self.assertEqual(archive_count, 1)
项目:alexandriadocs    作者:srtab    | 项目源码 | 文件源码
def teardown(self):
        """Disconnect the save/delete receivers from every registered model."""
        for registered_model in self.register_models:
            post_save.disconnect(receiver=self.handle_save, sender=registered_model)
            post_delete.disconnect(receiver=self.handle_delete, sender=registered_model)
项目:django-audit-tools    作者:PeRDy    | 项目源码 | 文件源码
def unregister(model):
    """Unregister a model to the audit code.

    Disconnects the ``pre_save``, ``post_save`` and ``pre_delete`` receivers
    registered for the model under the dispatch uid ``str(model)``. Any
    exception raised while disconnecting is logged and not propagated.

    :param model: Model to unregister.
    :type model: object
    """
    try:
        pre_save.disconnect(_pre_save, sender=model, dispatch_uid=str(model))
        post_save.disconnect(_post_save, sender=model, dispatch_uid=str(model))
        pre_delete.disconnect(_pre_delete, sender=model, dispatch_uid=str(model))
    except Exception as e:
        # Exceptions have no ``message`` attribute on Python 3, so log the
        # exception itself; ``%s`` formats it with ``str(e)``.
        logger.error("<Unregister> %s", e)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_delete_item(self):
        """Check that a ``make_menu`` receiver can remove a menu entry.

        The receiver removes the ``Campaigns`` entry; the test verifies that
        ``self.get`` no longer finds it.
        """
        # add value handler
        def handler(urls, **kwarg):
            item = self._search_name(urls, 'Campaigns')
            self.assertIsNotNone(item)
            urls.remove(item)

        make_menu.connect(handler)
        try:
            # test
            item = self.get('Campaigns')
            self.assertIsNone(item)
        finally:
            # Disconnect even when an assertion above fails, so the receiver
            # cannot leak into the tests that run afterwards.
            disconnected = make_menu.disconnect(handler)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_attachment_executed_with_data(self):
        """Check the ``post_save`` event fired by a POST on the attachment tracker.

        A receiver on ``TrackerInfos`` validates the saved instance, including
        its ``raw`` payload, and raises ``SuccessException``; the test expects
        that exception to surface from the POST request on the tracker URL.
        """
        def handler(instance, **kwarg):
            # get post on landing page event
            if instance.target_tracker.key != TRACKER_ATTACHMENT_EXECUTED:
                # ignore other target event
                return

            self.assertEqual(instance.ip, '127.0.0.1')
            self.assertEqual(instance.user_agent, 'Thunderbird')
            self.assertEqual(instance.raw, '{"hello": "world!"}')
            raise SuccessException()

        post_save.connect(handler, sender=TrackerInfos)
        try:
            # call tracker
            self.send_campaign()
            attachment = json.loads(mail.outbox[-1].attachments[0][1].decode())
            tracker_url = attachment['tracker_url']

            # test if handler has call
            with self.assertRaises(SuccessException):
                self.client.defaults['HTTP_USER_AGENT'] = 'Thunderbird'
                self.client.post(tracker_url, {'hello': 'world!'})
        finally:
            # Disconnect even when a step above fails, so the receiver cannot
            # leak into the tests that run afterwards.
            disconnected = post_save.disconnect(handler, sender=TrackerInfos)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_email_send(self):
        """Check the ``post_save`` event fired when a campaign e-mail is sent.

        A receiver on ``Tracker`` records a marker ``TrackerInfos`` row when
        it sees the e-mail-send tracker; the test then looks that row up and
        checks it points at the sent campaign.
        """
        def handler(instance, **kwarg):
            # get email open event
            if instance.key != TRACKER_EMAIL_SEND:
                # ignore other target event
                return

            self.assertEqual(instance.value, 'success')

            # add entry on db for set test in success => SuccessException make
            # infinite loop
            TrackerInfos.objects.create(
                target_tracker=instance,
                raw='handler_%s_test_ok' % TRACKER_EMAIL_SEND
            )

        post_save.connect(handler, sender=Tracker)
        try:
            # send mail
            campaign = self.send_campaign()

            # test if handler has call
            raw = 'handler_%s_test_ok' % TRACKER_EMAIL_SEND
            test_result_tracker = TrackerInfos.objects.filter(raw=raw).first()
            self.assertIsNotNone(test_result_tracker)
            tracker = test_result_tracker.target_tracker
            self.assertEqual(campaign.pk, tracker.campaign.pk)
            self.assertEqual(TRACKER_EMAIL_SEND, tracker.key)
        finally:
            # Disconnect even when a step above fails, so the receiver cannot
            # leak into the tests that run afterwards.
            disconnected = post_save.disconnect(handler, sender=Tracker)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_landing_page_post(self):
        """Check the ``post_save`` event fired by a POST on the landing page.

        The landing page is fetched from the URL in the mail body, its form is
        parsed and re-submitted with a ``login`` value; a receiver on
        ``TrackerInfos`` validates the saved instance and raises
        ``SuccessException``, which the test expects from the POST.
        """
        def handler(instance, **kwarg):
            # get post on landing page event
            if instance.target_tracker.key != TRACKER_LANDING_PAGE_POST:
                # ignore other target event
                return

            self.assertEqual(instance.ip, '127.0.0.1')
            self.assertEqual(instance.user_agent, 'Firefox')
            self.assertEqual(instance.referer, 'https://webmail.com')
            self.assertIn('"login": "Admin"', instance.raw)
            raise SuccessException()

        post_save.connect(handler, sender=TrackerInfos)
        try:
            # call tracker
            self.send_campaign()
            tracker_url = mail.outbox[-1].body

            # call landing page
            response = self.client.get(tracker_url)
            self.assertEqual(response.status_code, 200)

            # get form infos
            html = response.content.decode()
            form = BeautifulSoup(html, 'html.parser').find('form')
            post_url = form.get('action')
            post_data = {i.get('name'): i.get('value')
                         for i in form.find_all('input')}
            post_data['login'] = 'Admin'

            # test if handler has call
            with self.assertRaises(SuccessException):
                self.client.defaults['HTTP_USER_AGENT'] = 'Firefox'
                self.client.defaults['HTTP_REFERER'] = 'https://webmail.com'
                self.client.post(post_url, post_data)
        finally:
            # Disconnect even when a step above fails, so the receiver cannot
            # leak into the tests that run afterwards.
            disconnected = post_save.disconnect(handler, sender=TrackerInfos)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_add_campaign_report_item(self):
        """Check that a ``make_campaign_report`` receiver can add a tab.

        The receiver adds an empty ``Test`` entry to ``tabs_layout``; the test
        verifies the rendered content shows the ``Test`` tab.
        """
        def handler(context, **kwarg):
            context['tabs_layout']['Test'] = []

        make_campaign_report.connect(handler)
        try:
            # test
            content = self.get()
            self.assertIn('<a href="#test" aria-controls="test" role="tab" '
                          'data-toggle="tab">Test</a>', content)
            self.assertIn('id="test"', content)
        finally:
            # Disconnect even when an assertion above fails, so the receiver
            # cannot leak into the tests that run afterwards.
            disconnected = make_campaign_report.disconnect(handler)

        # clean
        self.assertTrue(disconnected)
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_delete_campaign_report_item(self):
        """Check that a ``make_campaign_report`` receiver can remove a tab.

        The receiver deletes the ``Other`` entry of ``tabs_layout``; the test
        verifies the rendered content no longer shows the ``Other`` tab.
        """
        def handler(context, **kwarg):
            del(context['tabs_layout']['Other'])

        make_campaign_report.connect(handler)
        try:
            # test
            content = self.get()
            self.assertNotIn('<a href="#other" aria-controls="other" '
                             'role="tab" data-toggle="tab">Other</a>', content)
            self.assertNotIn('id="other"', content)
        finally:
            # Disconnect even when an assertion above fails, so the receiver
            # cannot leak into the tests that run afterwards.
            disconnected = make_campaign_report.disconnect(handler)

        # clean
        self.assertTrue(disconnected)
项目:exchange    作者:boundlessgeo    | 项目源码 | 文件源码
def register_post_save_functions():
    """Connect ``generate_thumbnail`` to ``post_save`` for ``Layer`` and ``Map``.

    Each receiver is disconnected before it is connected, so calling this
    function twice does not register it twice.
    """
    logger.debug('Thumbnail: Registering post_save functions.')
    for model in (Layer, Map):
        # Disconnect first in case this function is called twice
        post_save.disconnect(generate_thumbnail, sender=model)
        post_save.connect(generate_thumbnail, sender=model, weak=False)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def stop(self):
        """Disconnect the ``post_save`` receiver, set ``exit`` and signal ``change_event``."""
        post_save.disconnect(receiver=self._table_changed)

        # raise the exit flag before setting the event
        self.exit = True
        self.change_event.set()
项目:django-knocker    作者:nephila    | 项目源码 | 文件源码
def _disconnect(self):
        """
        Disconnect the ``notify_items`` receiver from the model class of
        this instance, using the ``knocker_<ClassName>`` dispatch uid
        """
        model_class = self.__class__
        uid = 'knocker_{0}'.format(model_class.__name__)
        post_save.disconnect(notify_items, sender=model_class, dispatch_uid=uid)
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_create_new_object_is_synced(self):
        """A newly created book must have a node with related nodes attached.

        The ``post_save`` handler is re-registered under a test-only dispatch
        uid for the duration of the test and restored afterwards.
        """
        default_uid = 'chemtrails.signals.handlers.post_save_handler'
        test_uid = 'post_save_handler.test'

        post_save.disconnect(post_save_handler, dispatch_uid=default_uid)
        post_save.connect(post_save_handler, dispatch_uid=test_uid)
        try:
            book = BookFixture(Book).create_one()
            book_nodes = get_node_class_for_model(Book)

            synced_node = book_nodes.nodes.get(pk=book.pk)
            self.assertEqual(book.pk, synced_node.pk)

            fully_related = book_nodes.nodes.has(authors=True, publisher=True)
            self.assertEqual(1, len(fully_related))
        finally:
            post_save.connect(post_save_handler, dispatch_uid=default_uid)
            post_save.disconnect(post_save_handler, dispatch_uid=test_uid)
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_add(self):
        """Adding an author to a book must create the ``authors`` relationship.

        The ``m2m_changed`` handler is re-registered under a test-only
        dispatch uid for the duration of the test and restored afterwards.
        """
        default_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
        test_uid = 'm2m_changed_handler.test'

        def books_with_authors():
            # number of Book nodes that have an ``authors`` relationship
            return len(get_node_class_for_model(Book).nodes.has(authors=True))

        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=default_uid)
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
        try:
            book = BookFixture(Book, generate_m2m=False, field_values={'authors': []}).create_one()
            self.assertEqual(0, books_with_authors())

            author = AuthorFixture(Author).create_one()
            book.authors.add(author)
            self.assertEqual(1, books_with_authors())
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid=default_uid)
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_clear(self):
        """Clearing a book's authors must remove the relationship on both sides.

        The ``m2m_changed`` handler is re-registered under a test-only
        dispatch uid for the duration of the test and restored afterwards.
        """
        default_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
        test_uid = 'm2m_changed_handler.test'

        def books_with_authors():
            # number of Book nodes that have an ``authors`` relationship
            return len(get_node_class_for_model(Book).nodes.has(authors=True))

        def authors_with_books():
            # number of Author nodes that have a ``book_set`` relationship
            return len(get_node_class_for_model(Author).nodes.has(book_set=True))

        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=default_uid)
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
        try:
            book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
            self.assertEqual(1, books_with_authors())

            book.authors.clear()
            self.assertEqual(0, books_with_authors())
            self.assertEqual(0, authors_with_books())
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid=default_uid)
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_clear_reverse(self):
        """Clearing an author's ``book_set`` must remove the relationship on both sides.

        The ``m2m_changed`` handler is re-registered under a test-only
        dispatch uid for the duration of the test and restored afterwards.
        """
        default_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
        test_uid = 'm2m_changed_handler.test'

        def books_with_authors():
            # number of Book nodes that have an ``authors`` relationship
            return len(get_node_class_for_model(Book).nodes.has(authors=True))

        def authors_with_books():
            # number of Author nodes that have a ``book_set`` relationship
            return len(get_node_class_for_model(Author).nodes.has(book_set=True))

        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=default_uid)
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
        try:
            book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
            self.assertEqual(1, books_with_authors())

            # clear from the reverse side of the relation
            author = book.authors.get()
            author.book_set.clear()
            self.assertEqual(0, books_with_authors())
            self.assertEqual(0, authors_with_books())
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid=default_uid)
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_remove(self):
        """Removing an author from a book must remove the relationship on both sides.

        The ``m2m_changed`` handler is re-registered under a test-only
        dispatch uid for the duration of the test and restored afterwards.
        """
        default_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
        test_uid = 'm2m_changed_handler.test'

        def books_with_authors():
            # number of Book nodes that have an ``authors`` relationship
            return len(get_node_class_for_model(Book).nodes.has(authors=True))

        def authors_with_books():
            # number of Author nodes that have a ``book_set`` relationship
            return len(get_node_class_for_model(Author).nodes.has(book_set=True))

        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=default_uid)
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
        try:
            book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
            self.assertEqual(1, books_with_authors())

            author = book.authors.get()
            book.authors.remove(author)
            self.assertEqual(0, books_with_authors())
            self.assertEqual(0, authors_with_books())
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid=default_uid)
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_remove_reverse(self):
        """Removing a book from an author's ``book_set`` must remove the relationship on both sides.

        The ``m2m_changed`` handler is re-registered under a test-only
        dispatch uid for the duration of the test and restored afterwards.
        """
        default_uid = 'chemtrails.signals.handlers.m2m_changed_handler'
        test_uid = 'm2m_changed_handler.test'

        def books_with_authors():
            # number of Book nodes that have an ``authors`` relationship
            return len(get_node_class_for_model(Book).nodes.has(authors=True))

        def authors_with_books():
            # number of Author nodes that have a ``book_set`` relationship
            return len(get_node_class_for_model(Author).nodes.has(book_set=True))

        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=default_uid)
        m2m_changed.connect(m2m_changed_handler, dispatch_uid=test_uid)
        try:
            book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
            self.assertEqual(1, books_with_authors())

            # remove from the reverse side of the relation
            author = book.authors.get()
            author.book_set.remove(book)
            self.assertEqual(0, books_with_authors())
            self.assertEqual(0, authors_with_books())
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid=default_uid)
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=test_uid)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def tearDown(self):
        """Run the parent teardown, then disconnect ``tag_alert`` from ``Alert`` saves."""
        super(TagAlertTestCase, self).tearDown()
        post_save.disconnect(receiver=tag_alert, sender=Alert)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def tearDown(self):
        """Run the parent teardown, then disconnect ``tag_analysis`` from ``Analysis`` saves."""
        super(TagAnalysisTestCase, self).tearDown()
        post_save.disconnect(receiver=tag_analysis, sender=Analysis)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def tearDown(self):
        """Run the parent teardown, then disconnect ``tag_comment`` from ``Comment`` saves."""
        super(TagCommentTestCase, self).tearDown()
        post_save.disconnect(receiver=tag_comment, sender=Comment)
项目:edx-enterprise    作者:edx    | 项目源码 | 文件源码
def _disconnect_user_post_save_for_migrations(self, sender, **kwargs):  # pylint: disable=unused-argument
        """
        Handle pre_migrate signal - disconnect User post_save handler.

        The receiver is identified by ``USER_POST_SAVE_DISPATCH_UID`` and the
        sender is ``self.auth_user_model``.
        """
        from django.db.models.signals import post_save

        user_model = self.auth_user_model
        post_save.disconnect(dispatch_uid=USER_POST_SAVE_DISPATCH_UID, sender=user_model)
项目:tissuelab    作者:VirtualPlants    | 项目源码 | 文件源码
def tearDown(self):
        """Disconnect this test's ``post_save`` listener from ``User`` saves."""
        post_save.disconnect(receiver=self.post_save_listener, sender=User)
项目:mes    作者:osess    | 项目源码 | 文件源码
def unregister(self, model):
        """Removes a model from version control.

        Raises ``RegistrationError`` when the model is not registered.
        """
        if self.is_registered(model):
            del self._registered_models[model]
            post_save.disconnect(self._post_save_receiver, sender=model)
            pre_delete.disconnect(self._pre_delete_receiver, sender=model)
            return

        message = "{model} has not been registered with django-reversion".format(model=model)
        raise RegistrationError(message)
项目:Kiwi    作者:kiwitcms    | 项目源码 | 文件源码
def _disconnect_signals():
    """ used in testing: disconnect the plan watcher receivers from TestPlan """
    watched = (
        (post_save, plan_watchers.on_plan_save),
        (pre_delete, plan_watchers.on_plan_delete),
        (pre_save, plan_watchers.pre_save_clean),
    )
    for sig, receiver in watched:
        sig.disconnect(receiver, TestPlan)
项目:Kiwi    作者:kiwitcms    | 项目源码 | 文件源码
def _disconnect_signals():
    """ used in testing: disconnect the case watcher receivers from TestCase """
    watched = (
        (post_save, case_watchers.on_case_save),
        (post_delete, case_watchers.on_case_delete),
        (pre_save, case_watchers.pre_save_clean),
    )
    for sig, receiver in watched:
        sig.disconnect(receiver, TestCase)
项目:django-telegram-bot    作者:jlmadurga    | 项目源码 | 文件源码
def set_api(sender, instance, **kwargs):
    """Configure the Telegram API client and webhook for a saved bot instance.

    A fresh ``BotAPI`` is created from ``instance.token`` and the webhook is
    set: to the site's HTTPS webhook URL when the instance is enabled, or to
    ``None`` otherwise. When ``instance.user_api`` is empty, the bot's own
    account data is fetched with ``getMe()``, stored as a ``User`` and
    attached to the instance, which is then saved again.

    :param sender: model class passed by the signal; used as the sender when
        disconnecting and reconnecting this receiver.
    :param instance: the bot instance being configured.
    """
    #  Always reset the _bot instance after save, in case the token changes.
    instance._bot = BotAPI(instance.token)

    # set webhook
    url = None
    cert = None
    if instance.enabled:
        webhook = reverse('telegrambot:webhook', kwargs={'token': instance.token})
        from django.contrib.sites.models import Site
        current_site = Site.objects.get_current()
        if instance.https_port is None:
            url = 'https://' + current_site.domain + webhook
        else:
            url = 'https://' + current_site.domain + ':' + str(instance.https_port) + webhook
    if instance.ssl_certificate:
        instance.ssl_certificate.open()
        cert = instance.ssl_certificate

    instance._bot.setWebhook(webhook_url=url,
                             certificate=cert)
    logger.info("Success: Webhook url %s for bot %s set", url, instance)

    #  complete  Bot instance with api data
    if not instance.user_api:
        bot_api = instance._bot.getMe()

        botdict = bot_api.to_dict()
        # keep only the API fields that exist on the User model
        modelfields = [f.name for f in User._meta.get_fields()]
        params = {k: botdict[k] for k in botdict.keys() if k in modelfields}
        user_api, _ = User.objects.get_or_create(**params)
        instance.user_api = user_api

        # Prevent signal recursion, and save. Reconnect in ``finally`` so a
        # failing save cannot leave this receiver disconnected.
        post_save.disconnect(set_api, sender=sender)
        try:
            instance.save()
        finally:
            post_save.connect(set_api, sender=sender)

        logger.info("Success: Bot api info for bot %s set", instance)
项目:tn2    作者:hsoft    | 项目源码 | 文件源码
def handle(self, *app_labels, **options):
        """Load the initial fixtures unless project data already exists.

        Returns early when any ``Project`` exists. Otherwise the
        ``discussioncomment_was_saved`` receiver is disconnected from
        ``DiscussionComment`` saves and each fixture is loaded with
        ``loaddata``.

        Raises ``AssertionError`` when the receiver could not be
        disconnected.
        """
        if Project.objects.exists():
            print("We already have data. Skip load_initial_data.")
            return

        # Keep the disconnect out of an ``assert`` statement: asserts are
        # stripped under ``python -O``, which would silently skip the
        # disconnect itself and load the fixtures with the receiver active.
        disconnected = post_save.disconnect(
            sender=DiscussionComment, dispatch_uid='discussioncomment_was_saved')
        if not disconnected:
            raise AssertionError(
                "post_save receiver 'discussioncomment_was_saved' was not connected")

        fixtures = (
            'pages',
            'emails',
            'articlecategory',
            'patterncategory',
            'sample_data',
        )
        for fixture in fixtures:
            call_command('loaddata', fixture, app_label='tn2app')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_recursive_connect(self):
        """Check which relationships ``recursive_connect`` creates per depth.

        The ``post_save`` and ``m2m_changed`` handlers are disconnected while
        the test runs. For depths 0, 1 and 2 all relationships are deleted,
        the book node is saved and connected recursively, and the resulting
        relationship counts are asserted.
        """
        post_save_uid = 'chemtrails.signals.handlers.post_save_handler'
        m2m_changed_uid = 'chemtrails.signals.handlers.m2m_changed_handler'

        def related_count(model, relation_name):
            # number of nodes of ``model`` that have the named relationship
            nodes = get_node_class_for_model(model).nodes
            return len(nodes.has(**{relation_name: True}))

        # (expected count, model, relationship) checked after depth 1
        depth_one_expectations = (
            (0, Book, 'store_set'),
            (0, Store, 'books'),
            (0, Book, 'bestseller_stores'),
            (0, Store, 'bestseller'),
            (1, Book, 'publisher'),
            (1, Publisher, 'book_set'),
            (1, Book, 'authors'),
            (1, Author, 'book_set'),
            (0, Author, 'user'),
            (0, User, 'author'),
            (1, Book, 'tags'),
            (0, Tag, 'content_type'),
        )
        # (expected count, model, relationship) checked after depth 2
        depth_two_expectations = (
            (1, Author, 'user'),
            (1, User, 'author'),
            (1, Tag, 'content_type'),
            (1, ContentType, 'content_type_set_for_tag'),
        )

        post_save.disconnect(post_save_handler, dispatch_uid=post_save_uid)
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid=m2m_changed_uid)
        try:
            book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
            for depth in range(3):
                # Delete all relationships
                db.cypher_query('MATCH (n)-[r]-() WHERE n.type = "ModelNode" DELETE r')
                book_node = get_node_for_object(book).save()
                book_node.recursive_connect(depth)

                if depth == 0:
                    # Max depth 0 means that no recursion should occur, and no connections
                    # can be made, because the connected objects might not exist.
                    relation_names = book_node.defined_properties(aliases=False, properties=False).keys()
                    for relation_name in relation_names:
                        relation = getattr(book_node, relation_name)
                        try:
                            self.assertEqual(len(relation.all()), 0)
                        except CardinalityViolation:
                            # Will raise CardinalityViolation for nodes which has a single
                            # required relationship
                            continue

                elif depth == 1:
                    for expected, model, relation_name in depth_one_expectations:
                        self.assertEqual(expected, related_count(model, relation_name))

                elif depth == 2:
                    for expected, model, relation_name in depth_two_expectations:
                        self.assertEqual(expected, related_count(model, relation_name))
        finally:
            post_save.connect(post_save_handler, dispatch_uid=post_save_uid)
            m2m_changed.connect(m2m_changed_handler, dispatch_uid=m2m_changed_uid)