The following 17 code examples, extracted from open source Python projects, illustrate how to use django.db.models.signals.post_delete().
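Before the extracted examples, here is a minimal sketch of registering a post_delete receiver with Django's @receiver decorator. The Book model and its cover field are placeholders for illustration, not part of any example below.

from django.db.models.signals import post_delete
from django.dispatch import receiver

from myapp.models import Book  # placeholder model for illustration


@receiver(post_delete, sender=Book)
def cleanup_after_book_delete(sender, instance, **kwargs):
    # The database row is already gone when this runs; clean up anything
    # external, e.g. a stored file, passing save=False so no DB write occurs.
    if instance.cover:
        instance.cover.delete(save=False)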
def post_delete_handler(sender, instance, **kwargs):
    """
    Called when row is deleted.
    """
    logger.info(
        'Signal sync on post_delete from django to google fusiontables'
        ' for model: %s and row id: %s',
        sender._meta.db_table, instance.id
    )
    try:
        kft = _get_kft_instance(sender=sender, raw=kwargs.get('raw'))[1]
        kft.delete_rows(sender=sender, row_id=instance._ft_id)
    except SkipException as exc:
        logger.debug("Skip synchronization: %s", exc.args)
    # else:
    #     raise CannotCreateInstanceException(
    #         "Internal error: Cannot create kfusiontables instance."
    #     )
def ready(self):
    from django.db.models import signals
    from devour.django.receivers import produce_post_save, produce_post_delete
    from django.conf import settings

    signal_types = {
        'post_save': produce_post_save,
        'post_delete': produce_post_delete
    }
    if getattr(settings, 'KAFKA_CONFIG', None):
        try:
            excl = settings.KAFKA_CONFIG['producers'].get('exclude') or []
        except KeyError:
            # TODO: Error handling
            excl = []
        # Iterate over a copy so excluded entries can be removed while looping.
        for s, f in list(signal_types.items()):
            if next((True for ex in excl if s == ex), False):
                del signal_types[s]
        for sig in signal_types:
            getattr(signals, sig).connect(signal_types[sig])
def handle(self, *args, **options):
    with mute_signals(post_save, post_delete):
        logger.info('Destroying all people')
        Person.objects.all().delete()
        logger.info('Creating 20 people')
        people = PersonFactory.create_batch(20)
        logger.info('Pushing new people to connected clients')
        SyncConsumer.broadcast('list', people)
        send_notification(
            'info',
            ('<strong>Database Reset!</strong>'
             ' Your data is updated automatically!'
             ' Database intentionally wiped every few minutes.')
        )
def __init__(self, create=True, update=True, delete=True, custom=None):
    from actionslog.receivers import action_log_create, action_log_update, action_log_delete

    self._registry = {}
    self._signals = {}
    if create:
        self._signals[post_save] = action_log_create
    if update:
        self._signals[pre_save] = action_log_update
    if delete:
        self._signals[post_delete] = action_log_delete

    if custom is not None:
        self._signals.update(custom)
def signals(patching):
    signals = Mock(name='signals')
    patching('django.db.models.signals.post_save', signals.post_save)
    patching('django.db.models.signals.post_delete', signals.post_delete)
    patching('django.db.models.signals.m2m_changed', signals.m2m_changed)
    return signals
def delete_code_after_game_delete(sender, instance, **kwargs):
    """
    Cleans remaining file field after its related object is deleted.

    Notes:
        Signal is registered on post_delete to ensure file remains on disk
        in case deleting object from database fails.
    """
    instance.code.delete(save=False)
def delete_code_after_robot_delete(sender, instance, **kwargs):
    """
    Cleans remaining file field after its related object is deleted.

    Notes:
        Signal is registered on post_delete to ensure file remains on disk
        in case deleting object from database fails.
    """
    instance.code.delete(save=False)
def unseed_db():
    """
    Deletes all seed data from the database
    """
    fake_program_ids = (
        Program.objects
        .filter(description__startswith=FAKE_PROGRAM_DESC_PREFIX)
        .values_list('id', flat=True)
    )
    fake_user_ids = (
        User.objects
        .filter(username__startswith=FAKE_USER_USERNAME_PREFIX)
        .values_list('id', flat=True)
    )
    fake_tier_ids = (
        TierProgram.objects
        .filter(program__id__in=fake_program_ids)
        .values_list('tier__id', flat=True)
    )
    fake_final_grade_ids = (
        FinalGrade.objects
        .filter(course_run__course__program__id__in=fake_program_ids)
        .values_list('id', flat=True)
    )
    financial_aid_ids = (
        FinancialAid.objects
        .filter(Q(user_id__in=fake_user_ids) | Q(tier_program__program__id__in=fake_program_ids))
        .values_list('id', flat=True)
    )
    fin_aid_audit_models = [FinancialAidAudit, FinancialAidEmailAudit]

    with mute_signals(post_delete):
        with remove_delete_protection(*fin_aid_audit_models):
            for audit_model in fin_aid_audit_models:
                audit_model.objects.filter(financial_aid__id__in=financial_aid_ids).delete()
        for model_cls in [CachedEnrollment, CachedCertificate, CachedCurrentGrade]:
            model_cls.objects.filter(course_run__course__program__id__in=fake_program_ids).delete()
        Tier.objects.filter(id__in=fake_tier_ids).delete()
        FinalGrade.objects.filter(id__in=fake_final_grade_ids).delete()
        Program.objects.filter(id__in=fake_program_ids).delete()
        User.objects.filter(id__in=fake_user_ids).delete()
def post_delete_scene(sender, instance, *args, **kwargs):
    """Overwrites post_delete method of Scene Model to also delete the
    folder physically on disk.
    """
    if exists(instance.dir()):
        rmtree(instance.dir())
def post_delete_image(sender, instance, *args, **kwargs):
    """Overwrites post_delete method of Image Model to also delete the
    file physically on disk.
    """
    if instance.file_exists():
        remove(instance.file_path())
def teardown(self):
    models_signals.class_prepared.disconnect(self._post_setup)
    signals = (models_signals.post_save, models_signals.post_delete)

    def is_method_of_self(receiver):
        handler = receiver[1]()
        return ismethod(handler) and (getattr(handler, METHOD_SELF, None) == self)

    for signal in signals:
        with signal.lock:
            signal.receivers = [receiver for receiver in signal.receivers
                                if not is_method_of_self(receiver)]
            signal.sender_receivers_cache.clear()
def _post_setup(self, sender, **kwargs):
    model_name = get_model_ct(sender)
    if model_name in self.INDEXED_MODELS:
        models_signals.post_save.connect(self._handle_save, sender=sender)
        models_signals.post_delete.connect(self._handle_delete, sender=sender)
    if model_name in self.SENDER_MAP:
        models_signals.post_save.connect(self._handle_related_save, sender=sender)
        models_signals.post_delete.connect(self._handle_related_delete, sender=sender)
def cache_deleting_key(sender, instance, **kwargs):
    """
    A model's primary key is removed during deletion; this handler will cache
    the primary key on a model instance, so it is available in the
    `_pk_cached` attribute in `post_delete` handlers.
    """
    instance._pk_cached = PrimaryKeyCache(instance.pk)
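The handler above is intended to run on pre_delete so its cached value can be consumed later. A minimal sketch of pairing it with a post_delete receiver could look like the following; the Document sender and the logging receiver are assumptions for illustration, not part of the original project.

from django.db.models.signals import pre_delete, post_delete

# Hypothetical sender; cache the pk before the row is removed.
pre_delete.connect(cache_deleting_key, sender=Document)


def log_deleted_key(sender, instance, **kwargs):
    # Read the value stashed by the pre_delete handler above.
    logger.info('Deleted %s with cached pk %s', sender.__name__, instance._pk_cached)


post_delete.connect(log_deleted_key, sender=Document)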
def line_removing(sender, instance, **kwargs):
    """
    Caches study <-> strain associations prior to deletion of a line and/or
    study so we can remove a study link from ICE if needed during post_delete.
    """
    if check_ice_cannot_proceed():
        return
    instance.pre_delete_study = instance.study
    linked = Q(line__id=instance.pk)
    with transaction.atomic(savepoint=False):
        instance.pre_delete_strain_ids = set(
            edd_models.Strain.objects.filter(linked).values_list('id', flat=True)
        )
def image_feature_detection(sender, instance, **kwargs):
    if getattr(settings, 'WAGTAILIMAGES_FEATURE_DETECTION_ENABLED', False):
        # Make sure the image doesn't already have a focal point
        if not instance.has_focal_point():
            # Set the focal point
            instance.set_focal_point(instance.get_suggested_focal_point())


# Receive the post_delete signal and delete the file associated with the model instance.
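The trailing comment describes a file-cleanup receiver that is not shown in the extract. A minimal sketch of such a receiver could look like the following; the Image sender and the file field name are assumptions, not the project's actual code.

from django.db.models.signals import post_delete
from django.dispatch import receiver


@receiver(post_delete, sender=Image)  # hypothetical sender for illustration
def delete_image_file(sender, instance, **kwargs):
    # Pass save=False: the row is already deleted, so nothing should be updated.
    if instance.file:
        instance.file.delete(save=False)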
def delete_large_file(sender, instance, **kwargs):
    """Call delete on the LargeFile, now that the relation has been removed.

    If this was the only resource file referencing this LargeFile then it
    will be deleted.

    This is done using the `post_delete` signal because only then has the
    relation been removed.
    """
    try:
        largefile = instance.largefile
    except LargeFile.DoesNotExist:
        pass  # Nothing to do.
    else:
        if largefile is not None:
            largefile.delete()
def delete_large_object(sender, instance, **kwargs):
    """Delete the large object when the `LargeFile` is deleted.

    This is done using the `post_delete` signal instead of overriding delete
    on `LargeFile`, so it works correctly for both the model and `QuerySet`.
    """
    if instance.content is not None:
        post_commit_do(delete_large_object_content_later, instance.content)