Python django.db.connection 模块,vendor() 实例源码


项目:django-scrub-pii    作者:MatthewWilkes    | 项目源码 | 文件源码
def get_updates_for_model(klass):
    global salt
    if salt is None:
        salt = uuid.uuid4()

    fields = []
    sensitive_fields = get_sensitive_fields(klass)
    if not sensitive_fields:
        return None
    for field in sensitive_fields:
        field_object = klass._meta.get_field(field)
        value_method = get_value_method(field_object, connection.vendor)
        data = {'table_name': klass._meta.db_table, 'field_name': field, 'salt': salt}
        fields.append({'field_name': field, 'value_method': value_method.format(**data)})
    assignments = map(lambda x: ASSIGNMENT_TEMPLATE.format(**x), fields)
    assignments = ",".join(assignments)
    query = UPDATE_QUERY_TEMPLATE.format(table_name=klass._meta.db_table, assignments=assignments)
    return query
项目:kolibri    作者:learningequality    | 项目源码 | 文件源码
def activate_pragmas_per_connection(sender, connection, **kwargs):
        Activate SQLite3 PRAGMAs that apply on a per-connection basis. A no-op
        right now, but kept around as infrastructure if we ever want to add
        PRAGMAs in the future.

        if connection.vendor == "sqlite":
            cursor = connection.cursor()

            # Shorten the default WAL autocheckpoint from 1000 pages to 500

            # We don't turn on the following pragmas, because they have negligible
            # performance impact. For reference, here's what we've tested:

            # Don't ensure that the OS has fully flushed
            # our data to disk.
            # cursor.execute("PRAGMA synchronous=OFF;")

            # Store cross-database JOINs in memory.
            # cursor.execute("PRAGMA temp_store=MEMORY;")
项目:kolibri    作者:learningequality    | 项目源码 | 文件源码
def activate_pragmas_on_start():
        Activate a set of PRAGMAs that apply to the database itself,
        and not on a per connection basis.
        from django.db import connection

        if connection.vendor == "sqlite":
            cursor = connection.cursor()

            # WAL's main advantage allows simultaneous reads
            # and writes (vs. the default exclusive write lock)
            # at the cost of a slight penalty to all reads.
项目:django-route    作者:vinayinvicible    | 项目源码 | 文件源码
def test_caching_enabled(admin_client, router, destination):
    # Only sqlite3 logs a begin query within transaction
    atomic_queries = 1 if connection.vendor == 'sqlite' else 0

    with override_settings(ROUTING_CACHE=True):
        with CaptureQueriesContext(connection=connection) as c:
            response = admin_client.get(router.source, follow=True)
            assert response.status_code == 200
            assert_string_equal(response.content, 'destination')
            first = len(c)
            assert first - atomic_queries == 5
            response = admin_client.get(router.source, follow=True)
            assert response.status_code == 200
            assert_string_equal(response.content, 'destination')
            # Should only query for user and session because of condition
            assert len(c) - first - atomic_queries == 2


        with CaptureQueriesContext(connection=connection) as c:
            response = admin_client.get(router.source, follow=True)
            assert response.status_code == 200
            assert_string_equal(response.content, 'home')
            # Only the router query
            assert len(c) == 1
项目:dsmr-reader    作者:dennissiemensma    | 项目源码 | 文件源码
def test_average_consumption_by_hour(self, now_mock):
        """ Test whether timezones are converted properly when grouping hours. """
        now_mock.return_value = timezone.make_aware(timezone.datetime(2016, 1, 25, 12))
            # This should be stored with local timezone, so +1.
            hour_start=timezone.make_aware(timezone.datetime(2016, 1, 1, 12)),
        hour_stat =[0]

        # @see "Trends are always shown in UTC #76", only PostgreSQL can fix this.
        if connection.vendor == 'postgresql':
            # Regression, Django defaults to UTC, so '11' here.
            self.assertEqual(hour_stat['hour_start'], 12)
            return self.skipTest('Test cannot be fixed for backends other than PostgreSQL')
项目:Sentry    作者:NetEaseGame    | 项目源码 | 文件源码
def forwards(self, orm):
        # this is shitty, but we dont care
        if connection.vendor == 'sqlite':

        # ideally we would have done this data migration before this change, but
        # it was an oversight
        if not db.dry_run:
            except Exception as e:
                import traceback; traceback.print_exc()

        # Changing field ''
        db.alter_column('sentry_project', 'team_id','sentry.db.models.fields.FlexibleForeignKey')(to=orm['sentry.Team']))
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def can_migrate(self, connection):
        Return True if the model can/should be migrated on the `connection`.
        `connection` can be either a real connection or a connection alias.
        if self.proxy or self.swapped or not self.managed:
            return False
        if isinstance(connection, six.string_types):
            connection = connections[connection]
        if self.required_db_vendor:
            return self.required_db_vendor == connection.vendor
        if self.required_db_features:
            return all(getattr(connection.features, feat, False)
                       for feat in self.required_db_features)
        return True
项目:zing    作者:evernote    | 项目源码 | 文件源码
def test_backend_db():
    """Ensure that we are always testing sqlite on fast in memory DB"""
    from django.db import connection, connections

    if connection.vendor == "sqlite":
        assert connections.databases["default"]["NAME"] == ":memory:"
项目:healthchecks_asgards    作者:andela    | 项目源码 | 文件源码
def handle(self, *args, **options):
        with connection.cursor() as cursor:
            if connection.vendor == "postgresql":
                return "Created PostgreSQL trigger"
            if connection.vendor == "mysql":
                return "Created MySQL trigger"
            if connection.vendor == "sqlite":
                return "Created SQLite trigger"
项目:kolibri    作者:learningequality    | 项目源码 | 文件源码
def get_queryset(self):
        channel_id = self.kwargs['channel_id']
        attempted_mastery_logs = MasteryLog.objects.filter(attemptlogs__isnull=False)
        query_node = ContentNode.objects.get(pk=self.kwargs['content_node_id'])
        if self.request.query_params.get('last_active_time'):
            # Last active time specified
            datetime_cutoff = parse(self.request.query_params.get('last_active_time'))
            datetime_cutoff = - datetime.timedelta(7)
        # Set on the kwargs to pass into the serializer
        self.kwargs['last_active_time'] = datetime_cutoff.isoformat()
        recent_content_items = ContentSummaryLog.objects.filter_by_topic(query_node).filter(
            Q(progress__gt=0) | Q(masterylogs__in=attempted_mastery_logs),
            user__in=list(get_members_or_user(self.kwargs['collection_kind'], self.kwargs['collection_id'])),
            end_timestamp__gte=datetime_cutoff).values_list('content_id', flat=True)
        if connection.vendor == 'postgresql':
            pks_with_unique_content_ids = ContentNode.objects.order_by('content_id').distinct('content_id').filter(
                channel_id=channel_id, content_id__in=recent_content_items).values_list('pk', flat=True)
            # note from rtibbles:
            # As good as either I or jamalex could come up with to ensure that we only return
            # unique content_id'ed ContentNodes from the coach recent report endpoint.
            # Would have loved to use distinct('content_id'), but unfortunately DISTINCT ON is Postgresql only
            pks_with_unique_content_ids = ContentNode.objects.filter(
                channel_id=channel_id, content_id__in=recent_content_items).values('content_id').order_by('lft').annotate(
                pk=Min('pk')).values_list('pk', flat=True)
        return ContentNode.objects.filter(pk__in=pks_with_unique_content_ids).order_by('lft')
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def can_migrate(self, connection):
        Return True if the model can/should be migrated on the `connection`.
        `connection` can be either a real connection or a connection alias.
        if self.proxy or self.swapped or not self.managed:
            return False
        if isinstance(connection, six.string_types):
            connection = connections[connection]
        if self.required_db_vendor:
            return self.required_db_vendor == connection.vendor
        if self.required_db_features:
            return all(getattr(connection.features, feat, False)
                       for feat in self.required_db_features)
        return True
项目:django-clubhouse    作者:chazmead    | 项目源码 | 文件源码
def get_queryset(self):
        qs = super(AutocompleteLookup, self).get_queryset()
        qs = self.get_filtered_queryset(qs)
        qs = self.get_searched_queryset(qs)
        if connection.vendor == 'postgresql':
            ordering = list(self.model._meta.ordering)
            distinct_columns = [o.lstrip('-') for o in ordering] + []
            return qs.order_by(*ordering).distinct(*distinct_columns)
            return qs.distinct()
项目:django-jsonfield-compat    作者:kbussell    | 项目源码 | 文件源码
def is_db_postgresql():
    return connection.vendor == 'postgresql'
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def can_migrate(self, connection):
        Return True if the model can/should be migrated on the `connection`.
        `connection` can be either a real connection or a connection alias.
        if self.proxy or self.swapped or not self.managed:
            return False
        if isinstance(connection, six.string_types):
            connection = connections[connection]
        if self.required_db_vendor:
            return self.required_db_vendor == connection.vendor
        if self.required_db_features:
            return all(getattr(connection.features, feat, False)
                       for feat in self.required_db_features)
        return True
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def can_migrate(self, connection):
        Return True if the model can/should be migrated on the `connection`.
        `connection` can be either a real connection or a connection alias.
        if self.proxy or self.swapped or not self.managed:
            return False
        if isinstance(connection, six.string_types):
            connection = connections[connection]
        if self.required_db_vendor:
            return self.required_db_vendor == connection.vendor
        if self.required_db_features:
            return all(getattr(connection.features, feat, False)
                       for feat in self.required_db_features)
        return True
项目:wagtail-translation    作者:skirsdeda    | 项目源码 | 文件源码
def _update_descendant_lang_url_paths(self, old_page):
    cursor = connection.cursor()
    if connection.vendor == 'sqlite':
        field_update_fmt = "{0} = %s || substr({0}, %s)"
    elif connection.vendor == 'mysql':
        field_update_fmt = "{0} = CONCAT(%s, substring({0}, %s))"
    elif connection.vendor in ('mssql', 'microsoft'):
        field_update_fmt = "{0} = CONCAT(%s, (SUBSTRING({0}, 0, %s)))"
        field_update_fmt = "{0} = %s || substring({0} from %s)"

    exec_args = []
    update_fields_sql = []
    for lang_code in mt_settings.AVAILABLE_LANGUAGES:
        url_path_attr = build_localized_fieldname('url_path', lang_code)
        new_url_path = getattr(self, url_path_attr)
        old_url_path = getattr(old_page, url_path_attr)
        if new_url_path != old_url_path:
            exec_args.append(len(old_url_path) + 1)

    if not update_fields_sql:
        # in case page was moved but parent did not change
        # nothing has to be updated

    update_sql = """
    UPDATE wagtailcore_page
    SET {} WHERE path LIKE %s AND id <> %s
    exec_args.append(self.path + '%')
    cursor.execute(update_sql, exec_args)
项目:dsmr-reader    作者:dennissiemensma    | 项目源码 | 文件源码
def ready(self):
        """ Performs an DB engine check, as we maintain some engine specific queries. """
        if (connection.vendor not in settings.DSMRREADER_SUPPORTED_DB_VENDORS):  # pragma: no cover
            # Temporary for backwards compatibility
                    'Unsupported database engine "{}" active, '
                    'some features might not work properly'.format(connection.vendor)
                RuntimeWarning, __file__, 0
项目:dsmr-reader    作者:dennissiemensma    | 项目源码 | 文件源码
def average_consumption_by_hour(max_weeks_ago):
    """ Calculates the average consumption by hour. Measured over all consumption data of the past X months. """
    sql_extra = {
        # Ugly engine check, but still beter than iterating over a hundred thousand items in code.
        'postgresql': "date_part('hour', hour_start)",
        'sqlite': "strftime('%H', hour_start)",
        'mysql': "extract(hour from hour_start)",

    # Only PostgreSQL supports this builtin.
    set_time_zone_sql = connection.ops.set_time_zone_sql()

    if set_time_zone_sql:
        connection.connection.cursor().execute(set_time_zone_sql, [settings.TIME_ZONE])  # pragma: no cover

    hour_statistics = HourStatistics.objects.filter(
        # This greatly helps reducing the queryset size, but also makes it more relevant. - timezone.timedelta(weeks=max_weeks_ago)
        'hour_start': sql_extra
        avg_electricity_merged=Avg(models.F('electricity1') + models.F('electricity2')),
        avg_electricity_returned_merged=Avg(models.F('electricity1_returned') + models.F('electricity2_returned')),
    # Force evaluation, as we want to reset timezone in cursor below.
    hour_statistics = list(hour_statistics)

    if set_time_zone_sql:
        # Prevents "database connection isn't set to UTC" error.
        connection.connection.cursor().execute(set_time_zone_sql, ['UTC'])  # pragma: no cover

    return hour_statistics
项目:mes    作者:osess    | 项目源码 | 文件源码
def get_deleted(self, model_class, db=None, model_db=None):
        Returns all the deleted versions for the given model class.

        The results are returned with the most recent versions first.
        db = db or DEFAULT_DB_ALIAS
        model_db = model_db or db
        content_type = ContentType.objects.db_manager(db).get_for_model(model_class)
        live_pk_queryset = model_class._default_manager.db_manager(model_db).all().values_list("pk", flat=True)
        versioned_objs = self._get_versions(db).filter(
            content_type = content_type,
        if has_int_pk(model_class):
            # If the model and version data are in different databases, decouple the queries.
            if model_db != db:
                live_pk_queryset = list(live_pk_queryset.iterator())
            # We can do this as a fast, in-database join.
            deleted_version_pks = versioned_objs.exclude(
                object_id_int__in = live_pk_queryset
            # This join has to be done as two separate queries.
            deleted_version_pks = versioned_objs.exclude(
                object_id__in = list(live_pk_queryset.iterator())
        deleted_version_pks = deleted_version_pks.exclude(
            type = VERSION_DELETE,
            latest_pk = Max("pk")
        ).values_list("latest_pk", flat=True)
        # HACK: MySQL deals extremely badly with this as a subquery, and can hang infinitely.
        # TODO: If a version is identified where this bug no longer applies, we can add a version specifier.
        if connection.vendor == "mysql":
            deleted_version_pks = list(deleted_version_pks)
        # Return the deleted versions!
        return self._get_versions(db).filter(pk__in=deleted_version_pks).order_by("-pk")

    # Signal receivers.
项目:django-next-train    作者:bitpixdigital    | 项目源码 | 文件源码
def can_migrate(self, connection):
        Return True if the model can/should be migrated on the `connection`.
        `connection` can be either a real connection or a connection alias.
        if self.proxy or self.swapped or not self.managed:
            return False
        if isinstance(connection, six.string_types):
            connection = connections[connection]
        if self.required_db_vendor:
            return self.required_db_vendor == connection.vendor
        if self.required_db_features:
            return all(getattr(connection.features, feat, False)
                       for feat in self.required_db_features)
        return True
项目:LatinSounds_AppEnviaMail    作者:G3ek-aR    | 项目源码 | 文件源码
def can_migrate(self, connection):
        Return True if the model can/should be migrated on the `connection`.
        `connection` can be either a real connection or a connection alias.
        if self.proxy or self.swapped or not self.managed:
            return False
        if isinstance(connection, six.string_types):
            connection = connections[connection]
        if self.required_db_vendor:
            return self.required_db_vendor == connection.vendor
        if self.required_db_features:
            return all(getattr(connection.features, feat, False)
                       for feat in self.required_db_features)
        return True
项目:zing    作者:evernote    | 项目源码 | 文件源码
def set_project_resource(request, path_obj, dir_path, filename):
    """Loads :cls:`pootle_app.models.Directory` and
    :cls:`pootle_store.models.Store` models and populates the
    request object.

    This is the same as `set_resource` but operates at the project level
    across all languages.

    :param path_obj: A :cls:`pootle_project.models.Project` object.
    :param dir_path: Path relative to the root of `path_obj`.
    :param filename: Optional filename.
    query_ctx_path = ''.join(['/%/', path_obj.code, '/'])
    query_pootle_path = query_ctx_path + dir_path

    obj_directory = getattr(path_obj, 'directory', path_obj)
    ctx_path = obj_directory.pootle_path
    resource_path = dir_path
    pootle_path = ctx_path + dir_path

    # List of TP paths available for user
    user_tps = TranslationProject.objects.for_user(request.user)
    user_tps = user_tps.filter(
    ).values_list('pootle_path', flat=True)
    user_tps_regex = '^%s' % u'|'.join(list(user_tps))
    sql_regex = 'REGEXP'
    if connection.vendor == 'postgresql':
        sql_regex = '~'

    if filename:
        query_pootle_path = query_pootle_path + filename
        pootle_path = pootle_path + filename
        resource_path = resource_path + filename

        resources =
                'pootle_store_store.pootle_path LIKE %s',
                'pootle_store_store.pootle_path ' + sql_regex + ' %s',
            ], params=[query_pootle_path, user_tps_regex]
        resources =
                'pootle_app_directory.pootle_path LIKE %s',
                'pootle_app_directory.pootle_path ' + sql_regex + ' %s',
            ], params=[query_pootle_path, user_tps_regex]

    if not resources.exists():
        raise Http404 = None = None
    request.pootle_path = pootle_path

    request.resource_obj = ProjectResource(resources, pootle_path)
    request.resource_path = resource_path
    request.ctx_obj = path_obj or request.resource_obj
    request.ctx_path = ctx_path
项目:dsmr-reader    作者:dennissiemensma    | 项目源码 | 文件源码
def create():
    """ Creates a backup of the database. Optionally gzipped. """
    # Backup file with day name included, for weekly rotation.
    backup_file = os.path.join(get_backup_directory(), 'dsmrreader-{}-backup-{}.sql'.format(
        connection.vendor, formats.date_format(, 'l')

    # PostgreSQL backup.
    if connection.vendor == 'postgresql':  # pragma: no cover
        backup_process = subprocess.Popen(
            ], env={
                'PGPASSWORD': settings.DATABASES['default']['PASSWORD']
            stdout=open(backup_file, 'w')  # pragma: no cover
    # MySQL backup.
    elif connection.vendor == 'mysql':  # pragma: no cover
        backup_process = subprocess.Popen(
                '--host', settings.DATABASES['default']['HOST'],
                '--user', settings.DATABASES['default']['USER'],
            stdout=open(backup_file, 'w')  # pragma: no cover
    # SQLite backup.
    elif connection.vendor == 'sqlite':  # pragma: no cover
        backup_process = subprocess.Popen(
            stdout=open(backup_file, 'w')
        )   # pragma: no cover
        raise NotImplementedError('Unsupported backup backend: {}'.format(connection.vendor))  # pragma: no cover

    backup_settings = BackupSettings.get_solo()

    if backup_settings.compress:

    backup_settings.latest_backup =