Python django 模块,db() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db()

项目:maas    作者:maas    | 项目源码 | 文件源码
def _ensureConnection(self):
        """Block until the Django database accepts a connection.

        A connection that is already open is closed first. The method then
        calls ``ensure_connection()`` repeatedly, logging every failure and
        sleeping one second between attempts, and closes the connection again
        once an attempt has succeeded.
        """
        from django.db import connection

        # Start from a clean slate: drop a connection that is already open.
        if connection.connection is not None:
            connection.close()

        # Keep trying for as long as it takes.
        connected = False
        while not connected:
            try:
                connection.ensure_connection()
            except Exception:
                log.err(_why=(
                    "Error starting: "
                    "Connection to database cannot be established."))
                time.sleep(1)
            else:
                connected = True

        # The connection was only a probe; close it again.
        connection.close()
项目:SuperPACs    作者:SpencerNorris    | 项目源码 | 文件源码
def uploadDonations(self,donation_list):
        """Store every donation record, skipping inserts that violate integrity.

        Each entry of ``donation_list`` is indexed with the keys
        ``propublica_candidate_id``, ``committee_id``, ``amount``,
        ``unique_id`` and ``support_or_oppose``. The matching
        ``Representative`` and ``SuperPAC`` rows are looked up and a
        ``Donation`` row referencing both is created. An ``IntegrityError``
        raised by the insert is ignored; the original author relied on this
        to skip duplicate donations.
        """
        # Let the database count instead of loading every row just to count.
        print("database congress size:",Representative.objects.count())
        for donation in donation_list:
            rep = Representative.objects.get(propublicaid=donation["propublica_candidate_id"])
            sup = SuperPAC.objects.get(fecid=donation["committee_id"])

            donation_dict = {
                "representative_id": rep.id,
                "superpac_id": sup.id,
                "amount": donation["amount"],
                "uid": donation["unique_id"],
                "support": donation["support_or_oppose"],
            }

            # The exception has to leave the atomic block before it is
            # swallowed: Django decides between commit and rollback from the
            # way the block is exited, so catching inside it would hide the
            # failure from the transaction machinery.
            try:
                with transaction.atomic():
                    Donation.objects.create(**donation_dict)
            except django.db.utils.IntegrityError:
                pass
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def test_undefined(self):
        """``pyamf.Undefined`` maps to ``fields.NOT_PROVIDED`` and back.

        Applying ``pyamf.Undefined`` to ``id`` must leave
        ``fields.NOT_PROVIDED`` on the instance, and encoding an instance
        whose ``id`` is ``fields.NOT_PROVIDED`` must give ``pyamf.Undefined``.
        """
        from django.db import models
        from django.db.models import fields

        class UndefinedClass(models.Model):
            pass

        class_alias = adapter.DjangoClassAlias(UndefinedClass, None)
        instance = UndefinedClass()

        # decoding direction
        class_alias.applyAttributes(instance, {'id': pyamf.Undefined})
        self.assertEqual(instance.id, fields.NOT_PROVIDED)

        # encoding direction
        instance.id = fields.NOT_PROVIDED
        encoded = class_alias.getEncodableAttributes(instance)
        self.assertEqual(encoded, {'id': pyamf.Undefined})
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def test_non_field_prop(self):
        """A read-only property is encoded but is not overwritten on apply."""
        from django.db import models

        class Book(models.Model):
            def _get_number_of_odd_pages(self):
                return 234

            # deliberately read-only: no setter callable is supplied
            numberOfOddPages = property(_get_number_of_odd_pages)

        book_alias = adapter.DjangoClassAlias(Book, 'Book')
        book = Book()

        encoded = book_alias.getEncodableAttributes(book)
        self.assertEqual(encoded, {'numberOfOddPages': 234, 'id': None})

        # push a different value for the property ...
        book_alias.applyAttributes(book, {'numberOfOddPages': 24, 'id': None})

        # ... and make sure it did not stick
        self.assertEqual(book.numberOfOddPages, 234)
项目:blog_django    作者:chnpmy    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:dream_blog    作者:fanlion    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:MxOnline    作者:myTeemo    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:djangoblog    作者:liuhuipy    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:xadmin-markdown-editor    作者:bluenknight    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:django-forcedfields    作者:monotonee    | 项目源码 | 文件源码
def test_db_type(self):
        """
        Check what the field's overridden "db_type" method returns.

        Runs one sub-test per test configuration and database alias.

        """
        for config in test_utils.FC_TEST_CONFIGS:
            kwargs_label = test_utils.create_dict_string(config.kwargs_dict)
            for alias in test_utils.get_db_aliases():
                connection = django.db.connections[alias]
                engine = connection.settings_dict['ENGINE']
                with self.subTest(backend=engine, kwargs=kwargs_label):
                    fixed_char = forcedfields.FixedCharField(**config.kwargs_dict)
                    self.assertEqual(
                        fixed_char.db_type(connection),
                        config.db_type_dict[alias]
                    )
项目:django-forcedfields    作者:monotonee    | 项目源码 | 文件源码
def test_insert(self):
        """
        Check that insert operations give the expected results.

        Runs one sub-test per test configuration, insert value and database alias.

        """
        for config in test_utils.FC_TEST_CONFIGS:
            kwargs_label = test_utils.create_dict_string(config.kwargs_dict)
            class_name = test_utils.get_fc_model_class_name(**config.kwargs_dict)
            model = getattr(test_models, class_name)
            for raw_value, stored_value in config.insert_values_dict.items():
                for alias in test_utils.get_db_aliases():
                    engine = django.db.connections[alias].settings_dict['ENGINE']
                    subtest_params = dict(
                        backend=engine,
                        kwargs=kwargs_label,
                        insert_value=raw_value
                    )
                    with self.subTest(**subtest_params):
                        self._test_insert_dict(
                            alias, model, test_utils.FC_FIELD_ATTRNAME, raw_value, stored_value
                        )
项目:django-forcedfields    作者:monotonee    | 项目源码 | 文件源码
def test_insert(self):
        """
        Check that INSERT operations save the correct values.

        Runs one sub-test per test configuration, insert value and database alias.

        """
        for config in test_utils.TS_TEST_CONFIGS:
            kwargs_label = test_utils.create_dict_string(config.kwargs_dict)
            class_name = test_utils.get_ts_model_class_name(**config.kwargs_dict)
            model = getattr(test_models, class_name)
            for raw_value, stored_value in config.insert_values_dict.items():
                for alias in test_utils.get_db_aliases():
                    engine = django.db.connections[alias].settings_dict['ENGINE']
                    subtest_params = dict(
                        backend=engine,
                        kwargs=kwargs_label,
                        insert_value=raw_value
                    )
                    with self.subTest(**subtest_params):
                        self._test_insert_dict(
                            alias, model, test_utils.TS_FIELD_ATTRNAME, raw_value, stored_value
                        )
项目:django-forcedfields    作者:monotonee    | 项目源码 | 文件源码
def test_update(self):
        """
        Check UPDATE behaviour in specific cases, once per database alias.

        The timestamp field value should stay unchanged when only auto_now_add is enabled, and
        should be refreshed automatically when only auto_now_update is enabled.

        This test cannot be isolated from side effects of a broken INSERT operation in the
        timestamp field.

        """
        for db_alias in test_utils.get_db_aliases():
            backend = django.db.connections[db_alias].settings_dict['ENGINE']
            with self.subTest(backend=backend):
                self._test_update_no_auto(db_alias)
                self._test_update_auto(db_alias)
项目:eduDjango    作者:yuzhou6    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:Django-IMOOC-Shop    作者:LBruse    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:StudyOnline    作者:yipwinghong    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def test_base_manager(self):
        """Each model class reports the expected ``_base_manager`` type and model."""
        def show_base_manager(model):
            # "<repr of manager type> <repr of the manager's model>"
            return "{0} {1}".format(
                repr(type(model._base_manager)),
                repr(model._base_manager.model)
            )

        expectations = (
            (PlainA, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainA'>"),
            (PlainB, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainB'>"),
            (PlainC, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainC'>"),
            (Model2A, "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2A'>"),
            (Model2B, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.Model2B'>"),
            (Model2C, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.Model2C'>"),
            (One2OneRelatingModel, "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.One2OneRelatingModel'>"),
            (One2OneRelatingModelDerived, "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.One2OneRelatingModelDerived'>"),
        )
        for model_class, expected in expectations:
            self.assertEqual(show_base_manager(model_class), expected)
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def test_instance_default_manager(self):
        """Each model instance reports the expected ``_default_manager`` type and model."""
        def show_default_manager(instance):
            # "<repr of manager type> <repr of the manager's model>"
            return "{0} {1}".format(
                repr(type(instance._default_manager)),
                repr(instance._default_manager.model)
            )

        # All instances are built first, in the same order as they are checked.
        instances = [
            PlainA(field1='C1'),
            PlainB(field2='C1'),
            PlainC(field3='C1'),
            Model2A(field1='C1'),
            Model2B(field2='C1'),
            Model2C(field3='C1'),
        ]
        expected_descriptions = [
            "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainA'>",
            "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainB'>",
            "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainC'>",
            "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2A'>",
            "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2B'>",
            "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2C'>",
        ]
        for instance, expected in zip(instances, expected_descriptions):
            self.assertEqual(show_default_manager(instance), expected)
项目:xadmin_python3    作者:mahongquan    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def django_debug_cleanup():
    """Empties Django's list of logged queries.

    With DJANGO_DEBUG set to true, Django keeps every generated SQL query in
    a list that grows without bound.  Short-lived processes can live with
    that; daemons cannot.  The queries may be interesting in the short term,
    but in the long term the ever-growing list is useless and harmful.

    Call this every now and then from each thread that has Django database
    access, since the queries list is kept in thread-local data.

    When the list is non-empty, its contents are written to the debug log
    before being reset, and a garbage collection run is triggered afterwards.

    """
    logged = len(django.db.connection.queries)
    if not logged:
        return

    total_runtime = sum_django_queries_runtime()
    current = threading.current_thread()
    _logger.debug(
        "Thread %s/%s: Removing %d logged Django queries "
        "(total time %.03f):\n%s",
        current.ident, current.name,
        logged, total_runtime,
        pformat(django.db.connection.queries),
    )
    django.db.reset_queries()
    gc.collect()
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def reset_connection_on_interface_error(func):
    """Decorator that resets the current thread's Django database connection
    when the wrapped function raises an exception that looks like a
    connection reset.

    On such an exception a warning is logged, the connection object held by
    ``django.db.connection`` is set to None, and ``ResetDBConnectionError``
    is raised with the original error as its second argument. Without an
    exception the wrapped function's return value is passed through.

    """
    @wraps(func)
    def _reset(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except (InterfaceError, OperationalError,
                DjangoInterfaceError, DjangoOperationalError) as error:
            current = threading.current_thread()
            _logger.warning(
                "it appears this thread's database connection was "
                "dropped, resetting it now - you may see further "
                "errors about this until the situation is fully "
                "resolved for all threads "
                "(this thread is '%s', error was '%s')",
                current.name, error,
            )
            django.db.connection.connection = None
            raise ResetDBConnectionError("The database connection was reset",
                                         error)
        else:
            return result

    return _reset
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def purge_old_job_log_entries():
    """
    Purges old job log entries from the ipdevpoll_job_log db table.

    For every netbox/job_name combination only the 100 rows with the most
    recent end_time are kept; everything ranked lower is deleted.
    """
    purge_sql = """
        WITH ranked AS (SELECT id, rank()
                        OVER (PARTITION BY netboxid, job_name
                              ORDER BY end_time DESC)
                        FROM ipdevpoll_job_log)
        DELETE FROM ipdevpoll_job_log USING ranked
              WHERE ipdevpoll_job_log.id = ranked.id AND rank > 100;
        """
    django.db.connection.cursor().execute(purge_sql)
项目:Django-shop    作者:poetries    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def validate_folder(self):
        """Validates whether a folder can be created.

        Performs two types of validation:
        1. Checks if a DB entry is present.
        2. Checks if a physical folder exists in the system.

        Raises ``ValidationError`` with code ``'db'`` when an ``ImageFolder``
        with the same title already exists under the same parent (or among
        the parent-less folders when ``self.folder`` is empty), and with code
        ``'os'`` when the target directory already exists below
        ``settings.MEDIA_ROOT``.
        """
        # Directory name: the transliterated title with every character that
        # is still non-ASCII replaced by an underscore.
        unicoded_title = "".join((i if ord(i) < 128 else '_') for i in unidecode(self.title))
        parent_folder = self.folder

        # The two cases only differ in the sibling lookup and the base directory.
        if parent_folder:
            siblings = ImageFolder.objects.filter(folder=parent_folder, title=self.title)
            base_dir = parent_folder.path
        else:
            siblings = ImageFolder.objects.filter(folder__isnull=True, title=self.title)
            base_dir = IMAGES_FOLDER_NAME

        # exists() asks the database whether any row matches instead of
        # counting all of them.
        if siblings.exists():
            raise ValidationError("Folder exists in the DB!", code='db')

        folder_path = os.path.join(settings.MEDIA_ROOT, base_dir, unicoded_title)
        if os.path.isdir(folder_path):
            raise ValidationError("Folder exists in the OS!", code='os')
项目:MoocOnline    作者:My-captain    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:wger-lycan-clan    作者:andela    | 项目源码 | 文件源码
def database_exists():
    """Detect if the database exists.

    Returns True when counting the ``User`` rows works and False when that
    raises ``DatabaseError``. On ``ImproperlyConfigured`` a message is
    printed and ``sys.exit(0)`` is called.
    """

    # These imports need the settings module while they are being imported,
    # so they cannot live at module level.
    from django.db import DatabaseError
    from django.core.exceptions import ImproperlyConfigured
    from wger.manager.models import User

    try:
        # TODO: Use another model, the User could be deactivated
        User.objects.count()
    except ImproperlyConfigured:
        print("Your settings file seems broken")
        sys.exit(0)
    except DatabaseError:
        return False
    else:
        return True
项目:followme    作者:wzqnls    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:mxonline    作者:huwei86    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:finem_imperii    作者:jardiacaj    | 项目源码 | 文件源码
def safe_move(battle_contubernium_in_turn: BattleContuberniumInTurn, target_distance_function):
    """Move the contubernium one step along a path towards its target.

    Nothing happens when ``target_distance_function`` does not report a
    distance greater than 0 for the current coordinates, or when
    ``find_path`` returns a path with fewer than two entries. Otherwise the
    contubernium is flagged as moved this turn, placed on the second entry of
    the path (``path[1]``) and saved. An ``IntegrityError`` raised by the
    save is ignored.
    """
    turn = battle_contubernium_in_turn.battle_turn

    def tile_availability_test(coords: Coordinates):
        # A tile is available when no contubernium is found at that position.
        return turn.get_contubernium_in_position(coords) is None

    if not target_distance_function(battle_contubernium_in_turn.coordinates()) > 0:
        return

    path = find_path(battle_contubernium_in_turn, target_distance_function, tile_availability_test)
    if len(path) <= 1:
        return

    battle_contubernium_in_turn.moved_this_turn = True
    battle_contubernium_in_turn.x_pos = path[1].x
    battle_contubernium_in_turn.z_pos = path[1].z
    #TODO WARNING: HORRIBLE HACK STARTS HERE
    #(to avoid unique constraint errors when contubs overlap for some reason)
    # The exception must leave the atomic block before it is swallowed, so
    # that Django sees the failure and rolls the block back; catching it
    # inside the block would hide the error from the transaction machinery.
    try:
        with transaction.atomic():
            battle_contubernium_in_turn.save()
    except django.db.utils.IntegrityError:
        pass
项目:mes    作者:osess    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field. Only the first element of what
    ``get_field_by_name`` returns is kept.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field_by_name(name)[0])
    return resolved
项目:Charlotte    作者:LiZoRN    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:imooc-django    作者:zaxlct    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:django-postgres-extra    作者:SectorLabs    | 项目源码 | 文件源码
def update(self, **fields):
        """Updates all rows that match the filter.

        Runs the update through ``PostgresReturningUpdateCompiler``, clears
        the result cache, sends ``signals.update`` once per returned row
        (with ``pk`` taken from the row's first column) and returns the
        number of rows, like the original ``update(..)`` does.
        """

        # mark this as a write and derive the update query from the current one
        self._for_write = True
        update_query = self.query.clone(UpdateQuery)
        update_query._annotations = None
        update_query.add_update_values(fields)

        # compile against the connection this queryset is bound to
        compiler = PostgresReturningUpdateCompiler(
            update_query, django.db.connections[self.db], self.db)

        # run it
        with transaction.atomic(using=self.db, savepoint=False):
            updated_rows = compiler.execute_sql(CURSOR)
        self._result_cache = None

        # one signal per row that came back
        for updated_row in updated_rows:
            signals.update.send(self.model, pk=updated_row[0])

        return len(updated_rows)
项目:django-postgres-extra    作者:SectorLabs    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Initializes a new instance of :see:PostgresManager.

        Raises ``ImproperlyConfigured`` when the ``ENGINE`` of the default
        database does not contain ``psqlextra``. Otherwise hooks
        ``_on_model_save`` and ``_on_model_delete`` into Django's
        ``post_save`` and ``pre_delete`` signals for this manager's model.
        """

        super(PostgresManager, self).__init__(*args, **kwargs)

        # refuse to go on unless our own back-end is configured
        engine = settings.DATABASES['default']['ENGINE']
        if 'psqlextra' not in engine:
            message = (
                '\'%s\' is not the \'psqlextra.backend\'. '
                'django-postgres-extra cannot function without '
                'the \'psqlextra.backend\'. Set DATABASES.ENGINE.'
            ) % engine
            raise ImproperlyConfigured(message)

        # listen to django's signals so that our own can be triggered
        model_signals = django.db.models.signals
        model_signals.post_save.connect(
            self._on_model_save, sender=self.model, weak=False)
        model_signals.pre_delete.connect(
            self._on_model_delete, sender=self.model, weak=False)

        self._signals_connected = True
项目:django-simple-api    作者:Myagi    | 项目源码 | 文件源码
def to_fields(qs, fieldnames):
    """Yield, for each name in ``fieldnames``, the field its lookup path ends on.

    Every name is split on ``'__'`` and followed part by part, starting at
    ``qs.model``. A part is first looked up with ``_meta.get_field``; when
    that raises ``FieldDoesNotExist`` the model's related objects are
    searched for one whose accessor name equals the part. After each part
    the walk continues on the model the resolved field points to.

    Raises:
        FieldDoesNotExist: when a part matches neither a field nor a related
            object's accessor name. Previously this case silently reused the
            field resolved for an earlier part or name (or failed with
            ``UnboundLocalError`` when there was none).
    """
    for fieldname in fieldnames:
        model = qs.model
        for fieldname_part in fieldname.split('__'):
            try:
                field = model._meta.get_field(fieldname_part)
            except django.db.models.fields.FieldDoesNotExist:
                rels = model._meta.get_all_related_objects_with_model()
                for relobj, _ in rels:
                    if relobj.get_accessor_name() == fieldname_part:
                        field = relobj.field
                        model = field.model
                        break
                else:
                    # Nothing matched: surface the original lookup error
                    # instead of yielding a stale field.
                    raise
            else:
                if getattr(field, "one_to_many", False) or getattr(field, "one_to_one", False):
                    model = field.related_model
                elif field.get_internal_type() in ('ForeignKey', 'OneToOneField', 'ManyToManyField'):
                    model = field.rel.to
        yield field
项目:muxueonline    作者:124608760    | 项目源码 | 文件源码
def get_fields_from_path(model, path):
    """Resolve a lookup path into the list of fields it passes through.

    The path is split on ``LOOKUP_SEP`` and each piece is looked up on the
    model reached so far: the first piece on ``model`` itself, every later
    piece on the model that ``get_model_from_relation`` returns for the
    previously resolved field.

    e.g. (ModelX, "user__groups__name") -> [
        <django.db.models.fields.related.ForeignKey object at 0x...>,
        <django.db.models.fields.related.ManyToManyField object at 0x...>,
        <django.db.models.fields.CharField object at 0x...>,
    ]
    """
    resolved = []
    owner = model
    for name in path.split(LOOKUP_SEP):
        if resolved:
            # Hop across the relation that the previous field represents.
            owner = get_model_from_relation(resolved[-1])
        resolved.append(owner._meta.get_field(name))
    return resolved
项目:SuperPACs    作者:SpencerNorris    | 项目源码 | 文件源码
def uploadRepresentatives(self, congress_list):
        """Create a ``Representative`` row for every House and Senate member.

        ``congress_list`` is indexed as
        ``congress_list[<chamber>]['results'][0]['members']`` for the
        ``"house"`` and ``"senate"`` chambers. House members are stored with
        chamber ``"H"`` (including their district); senators with ``"S"``.
        Rows that raise ``IntegrityError`` on creation are silently skipped.
        """
        house_members = congress_list["house"]['results'][0]['members']
        for member in house_members:
            record = {
                # personal details
                "propublicaid": member['id'],
                "first_name": member['first_name'],
                "last_name": member['last_name'],
                # office details
                "district": member['district'],
                "state": member['state'],
                "party": member['party'],
                "chamber": "H",
            }

            # Each insert gets its own atomic block so a duplicate only
            # discards that one insert (original note: needed because of a
            # Django 1.5/1.6 transaction bug).
            with transaction.atomic():
                try:
                    Representative.objects.create(**record)
                except django.db.utils.IntegrityError:
                    pass

        senate_members = congress_list["senate"]['results'][0]['members']
        for member in senate_members:
            record = {
                # personal details
                "propublicaid": member['id'],
                "first_name": member['first_name'],
                "last_name": member['last_name'],
                # office details
                "state": member['state'],
                "party": member['party'],
                "chamber": "S",
            }

            # Same duplicate-tolerant insert as for House members.
            with transaction.atomic():
                try:
                    Representative.objects.create(**record)
                except django.db.utils.IntegrityError:
                    pass
项目:SuperPACs    作者:SpencerNorris    | 项目源码 | 文件源码
def uploadSuperPACs(self, superpac_list):
        """Create a ``SuperPAC`` row for each entry of ``superpac_list``.

        Every entry must provide ``"name"`` and ``"committee_id"``; the
        latter is stored as ``fecid``. Rows that raise ``IntegrityError`` on
        creation are silently skipped.
        """
        for entry in superpac_list:
            record = {
                "name": entry["name"],
                "fecid": entry["committee_id"],
            }

            # Each insert gets its own atomic block so a duplicate only
            # discards that one insert (original note: needed because of a
            # Django 1.5/1.6 transaction bug).
            with transaction.atomic():
                try:
                    SuperPAC.objects.create(**record)
                except django.db.utils.IntegrityError:
                    pass
项目:dj-paypal    作者:HearthSim    | 项目源码 | 文件源码
def check_migrations():
    """Exit with an error when ``APP_NAME`` has changes lacking a migration.

    For every alias in ``settings.DATABASES`` a ``MigrationExecutor`` is
    built and ``MigrationAutodetector`` compares the loader's project state
    with the state derived from ``apps``. The app labels reported as changed
    are collected across all databases. The process exits via ``sys.exit``
    if the executor cannot be created (``OperationalError``) or if
    ``APP_NAME`` is among the changed apps; otherwise a success line is
    printed.
    """
    from django.db.migrations.autodetector import MigrationAutodetector
    from django.db.migrations.executor import MigrationExecutor
    from django.db.migrations.state import ProjectState

    print("Checking {} migrations...".format(APP_NAME))

    apps_with_changes = set()
    for alias in settings.DATABASES:
        try:
            executor = MigrationExecutor(connections[alias])
        except OperationalError as exc:
            sys.exit(
                "Unable to check migrations due to database: {}".format(exc)
            )

        detector = MigrationAutodetector(
            executor.loader.project_state(),
            ProjectState.from_apps(apps),
        )
        # changes() is keyed by app label; only the labels are of interest.
        detected = detector.changes(graph=executor.loader.graph)
        apps_with_changes.update(detected.keys())

    if APP_NAME not in apps_with_changes:
        print("All migration files present.")
        return

    sys.exit(
        "A migration file is missing. Please run "
        "`python makemigrations.py` to generate it."
    )
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def test_NOT_PROVIDED(self):
        """``fields.NOT_PROVIDED`` encodes to ``'\\x06'`` in AMF0 and ``'\\x00'`` in AMF3."""
        from django.db.models import fields

        sentinel = fields.NOT_PROVIDED

        # AMF0: go through the high-level encode() helper.
        amf0_stream = pyamf.encode(sentinel, encoding=pyamf.AMF0)
        self.assertEqual(amf0_stream.getvalue(), '\x06')

        # AMF3: write the element through an explicit encoder instance.
        amf3_encoder = pyamf.get_encoder(pyamf.AMF3)
        amf3_encoder.writeElement(sentinel)
        self.assertEqual(amf3_encoder.stream.getvalue(), '\x00')
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def test_properties(self):
        """
        A ``property`` defined on a model shows up among the encodable
        attributes produced by ``DjangoClassAlias``.

        See #764
        """
        from django.db import models

        class Foob(models.Model):
            def _get_days(self):
                return 1

            def _set_days(self, val):
                assert 1 == val

            days = property(_get_days, _set_days)

        class_alias = adapter.DjangoClassAlias(Foob, 'Bar')
        instance = Foob()

        self.assertEqual(instance.days, 1)

        encodable = class_alias.getEncodableAttributes(instance)
        self.assertEqual(encodable, {'days': 1, 'id': None})

        # Finally apply an attribute dict that omits 'days'; nothing is
        # asserted afterwards, so the test only requires that this not raise.
        class_alias.applyAttributes(instance, {'id': None})
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def check_migrations(self):
        """
        Write a notice to ``self.stdout`` when unapplied migrations exist.

        A ``MigrationExecutor`` is built for the ``DEFAULT_DB_ALIAS``
        connection and asked for the plan needed to reach the leaf nodes of
        the migration graph. Nothing is written when that plan is empty or
        when ``ImproperlyConfigured`` is raised; a short explanation is
        written when ``MigrationSchemaMissing`` is raised.
        """
        from django.db.migrations.executor import MigrationExecutor

        try:
            executor = MigrationExecutor(connections[DEFAULT_DB_ALIAS])
        except ImproperlyConfigured:
            # No databases are configured (or the dummy one)
            return
        except MigrationSchemaMissing:
            schema_notice = (
                "\nNot checking migrations as it is not possible to access/create the django_migrations table."
            )
            self.stdout.write(self.style.NOTICE(schema_notice))
            return

        leaf_nodes = executor.loader.graph.leaf_nodes()
        plan = executor.migration_plan(leaf_nodes)
        if not plan:
            return

        # Plan entries unpack as (migration, backwards); only the distinct
        # app labels are reported, in sorted order.
        pending_apps = sorted({migration.app_label for migration, _backwards in plan})
        details = {
            "unpplied_migration_count": len(plan),
            "apps_waiting_migration": ", ".join(pending_apps),
        }
        warning = (
            "\nYou have %(unpplied_migration_count)s unapplied migration(s). "
            "Your project may not work properly until you apply the "
            "migrations for app(s): %(apps_waiting_migration)s."
        ) % details
        self.stdout.write(self.style.NOTICE(warning))
        self.stdout.write(self.style.NOTICE("Run 'python manage.py migrate' to apply them.\n"))
项目:django-modeltrans    作者:zostera    | 项目源码 | 文件源码
def test_order_by_lower(self):
        """Ordering by ``Lower()`` works on plain and translated title fields.

        Two blogs are created in one category: title ``'A'`` with
        ``title_nl='c'`` and title ``'a'`` with ``title_nl='b'``. Every
        ordering exercised below is expected to return ``'a'`` before ``'A'``.
        """
        from django.db.models.functions import Lower

        c = Category.objects.create(name='test')
        Blog.objects.create(title='A', title_nl='c', category=c)
        Blog.objects.create(title='a', title_nl='b', category=c)

        filtered = Blog.objects.filter(category=c)

        # Plain ordering on title is expected to give 'a' before 'A'.
        # NOTE(review): this presumably depends on the test database's
        # collation -- confirm.
        qs = filtered.order_by('title', 'title_nl')
        self.assertEqual(key(qs, 'title'), ['a', 'A'])

        # Lower('title') ties because lower('A') == lower('a'), so title_nl
        # decides: 'b' (title 'a') sorts before 'c' (title 'A').
        qs = filtered.order_by(Lower('title'), 'title_nl')
        self.assertEqual(key(qs, 'title'), ['a', 'A'])

        # applying lower to title_nl should not matter since it is not the same letter
        qs = filtered.order_by(Lower('title_nl'))
        self.assertEqual(key(qs, 'title'), ['a', 'A'])

        # should be the same as previous: under the 'nl' override the
        # title_i18n lookup is ordered like title_nl above.
        with override('nl'):
            qs = filtered.order_by(Lower('title_i18n'))
            self.assertEqual(key(qs, 'title'), ['a', 'A'])
项目:django-modeltrans    作者:zostera    | 项目源码 | 文件源码
def test_values_kwarg_lower(self):
        """``values()`` with ``Lower('category__name')`` matches the ``category__name_en`` form.

        Both querysets expose the expression under the ``lower_name`` key and
        are compared as lists.
        """
        from django.db.models.functions import Lower

        qs1 = Blog.objects.values(lower_name=Lower('category__name'))
        qs2 = Blog.objects.values(lower_name=Lower('category__name_en'))
        # assertEqual rather than the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(list(qs1), list(qs2))
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def check_migrations(self):
        """
        Write a notice to ``self.stdout`` when unapplied migrations exist.

        A ``MigrationExecutor`` is built for the ``DEFAULT_DB_ALIAS``
        connection and asked for the plan needed to reach the leaf nodes of
        the migration graph. Nothing is written when that plan is empty or
        when ``ImproperlyConfigured`` is raised; a short explanation is
        written when ``MigrationSchemaMissing`` is raised.
        """
        from django.db.migrations.executor import MigrationExecutor

        try:
            executor = MigrationExecutor(connections[DEFAULT_DB_ALIAS])
        except ImproperlyConfigured:
            # No databases are configured (or the dummy one)
            return
        except MigrationSchemaMissing:
            schema_notice = (
                "\nNot checking migrations as it is not possible to access/create the django_migrations table."
            )
            self.stdout.write(self.style.NOTICE(schema_notice))
            return

        leaf_nodes = executor.loader.graph.leaf_nodes()
        plan = executor.migration_plan(leaf_nodes)
        if not plan:
            return

        # Plan entries unpack as (migration, backwards); only the distinct
        # app labels are reported, in sorted order.
        pending_apps = sorted({migration.app_label for migration, _backwards in plan})
        details = {
            "unpplied_migration_count": len(plan),
            "apps_waiting_migration": ", ".join(pending_apps),
        }
        warning = (
            "\nYou have %(unpplied_migration_count)s unapplied migration(s). "
            "Your project may not work properly until you apply the "
            "migrations for app(s): %(apps_waiting_migration)s."
        ) % details
        self.stdout.write(self.style.NOTICE(warning))
        self.stdout.write(self.style.NOTICE("Run 'python manage.py migrate' to apply them.\n"))
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def check_migrations(self):
        """
        Write a notice to ``self.stdout`` when unapplied migrations exist.

        A ``MigrationExecutor`` is built for the ``DEFAULT_DB_ALIAS``
        connection and asked for the plan needed to reach the leaf nodes of
        the migration graph. Nothing is written when that plan is empty or
        when ``ImproperlyConfigured`` is raised; a short explanation is
        written when ``MigrationSchemaMissing`` is raised.
        """
        from django.db.migrations.executor import MigrationExecutor

        try:
            executor = MigrationExecutor(connections[DEFAULT_DB_ALIAS])
        except ImproperlyConfigured:
            # No databases are configured (or the dummy one)
            return
        except MigrationSchemaMissing:
            schema_notice = (
                "\nNot checking migrations as it is not possible to access/create the django_migrations table."
            )
            self.stdout.write(self.style.NOTICE(schema_notice))
            return

        leaf_nodes = executor.loader.graph.leaf_nodes()
        plan = executor.migration_plan(leaf_nodes)
        if not plan:
            return

        # Plan entries unpack as (migration, backwards); only the distinct
        # app labels are reported, in sorted order.
        pending_apps = sorted({migration.app_label for migration, _backwards in plan})
        details = {
            "unpplied_migration_count": len(plan),
            "apps_waiting_migration": ", ".join(pending_apps),
        }
        warning = (
            "\nYou have %(unpplied_migration_count)s unapplied migration(s). "
            "Your project may not work properly until you apply the "
            "migrations for app(s): %(apps_waiting_migration)s."
        ) % details
        self.stdout.write(self.style.NOTICE(warning))
        self.stdout.write(self.style.NOTICE("Run 'python manage.py migrate' to apply them.\n"))
项目:django-cte-forest    作者:matthiask    | 项目源码 | 文件源码
def aggregate(self, *args, **kwargs):
        """
        Compute aggregates over the current queryset and return the result
        of ``query.get_aggregation``.

        Positional expressions are registered under their own
        ``default_alias``; keyword expressions keep the alias they were
        passed with. Raises ``NotImplementedError`` when the query has
        distinct fields, and ``TypeError`` when a positional expression has
        no usable ``default_alias`` or an expression is not an aggregate.
        """
        if self.query.distinct_fields:
            raise NotImplementedError("aggregate() + distinct(fields) not implemented.")

        for expression in args:
            # default_alias may itself raise TypeError, so it is read inside
            # try/except instead of being probed with hasattr (which would
            # swallow the TypeError on PY2).
            try:
                alias = expression.default_alias
            except (AttributeError, TypeError):
                raise TypeError("Complex aggregates require an alias")
            kwargs[alias] = expression

        # Django < 2.0 copies the query via clone(); later versions use chain().
        copy_query = self.query.clone if django.VERSION < (2, 0) else self.query.chain
        query = copy_query(CTEAggregateQuery)

        for alias, aggregate_expr in kwargs.items():
            query.add_annotation(aggregate_expr, alias, is_summary=True)
            annotation = query.annotations[alias]
            if not annotation.contains_aggregate:
                raise TypeError("%s is not an aggregate expression" % alias)
        return query.get_aggregation(self.db, kwargs.keys())
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def distinct(queryset, base):
    """Return the distinct rows of ``queryset``.

    When the engine configured for ``queryset.db`` is the Oracle backend,
    the result is instead built by filtering ``base`` on the set of primary
    keys found in ``queryset``; every other engine uses
    ``queryset.distinct()``.
    """
    engine = settings.DATABASES[queryset.db]["ENGINE"]
    if engine != "django.db.backends.oracle":
        return queryset.distinct()

    # distinct analogue for Oracle users
    primary_keys = set(queryset.values_list('pk', flat=True))
    return base.filter(pk__in=primary_keys)


# Obtaining manager instances and names from model options differs after 1.10.
项目:django-seven    作者:iwoca    | 项目源码 | 文件源码
def managed_transaction(func):
        """ Decorator that commits after ``func`` returns and rolls back if it raises.

            The wrapped call runs under ``transaction.commit_manually``. Any
            ``Exception`` triggers ``transaction.rollback()`` and is re-raised;
            otherwise ``transaction.commit()`` runs and the function's return
            value is passed through.

            It's used instead of django.db.transaction.commit_on_success in cases where reporting exceptions is necessary
            as commit_on_success swallows exceptions (per the original author's note).
        """
        def _inner(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except Exception:
                transaction.rollback()
                raise
            # Only reached when func completed without raising.
            transaction.commit()
            return result

        # Same stacking as decorating with @wraps(func) above
        # @transaction.commit_manually.
        return wraps(func)(transaction.commit_manually(_inner))