我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用django.db.connection.vendor()。
def get_updates_for_model(klass): global salt if salt is None: salt = uuid.uuid4() fields = [] sensitive_fields = get_sensitive_fields(klass) if not sensitive_fields: return None for field in sensitive_fields: field_object = klass._meta.get_field(field) value_method = get_value_method(field_object, connection.vendor) data = {'table_name': klass._meta.db_table, 'field_name': field, 'salt': salt} fields.append({'field_name': field, 'value_method': value_method.format(**data)}) assignments = map(lambda x: ASSIGNMENT_TEMPLATE.format(**x), fields) assignments = ",".join(assignments) query = UPDATE_QUERY_TEMPLATE.format(table_name=klass._meta.db_table, assignments=assignments) return query
def activate_pragmas_per_connection(sender, connection, **kwargs): """ Activate SQLite3 PRAGMAs that apply on a per-connection basis. A no-op right now, but kept around as infrastructure if we ever want to add PRAGMAs in the future. """ if connection.vendor == "sqlite": cursor = connection.cursor() # Shorten the default WAL autocheckpoint from 1000 pages to 500 cursor.execute(CONNECTION_PRAGMAS) # We don't turn on the following pragmas, because they have negligible # performance impact. For reference, here's what we've tested: # Don't ensure that the OS has fully flushed # our data to disk. # cursor.execute("PRAGMA synchronous=OFF;") # Store cross-database JOINs in memory. # cursor.execute("PRAGMA temp_store=MEMORY;")
def activate_pragmas_on_start(): """ Activate a set of PRAGMAs that apply to the database itself, and not on a per connection basis. :return: """ from django.db import connection if connection.vendor == "sqlite": cursor = connection.cursor() # http://www.sqlite.org/wal.html # WAL's main advantage allows simultaneous reads # and writes (vs. the default exclusive write lock) # at the cost of a slight penalty to all reads. cursor.execute(START_PRAGMAS)
def test_caching_enabled(admin_client, router, destination): # Only sqlite3 logs a begin query within transaction atomic_queries = 1 if connection.vendor == 'sqlite' else 0 with override_settings(ROUTING_CACHE=True): with CaptureQueriesContext(connection=connection) as c: response = admin_client.get(router.source, follow=True) assert response.status_code == 200 assert_string_equal(response.content, 'destination') first = len(c) assert first - atomic_queries == 5 response = admin_client.get(router.source, follow=True) assert response.status_code == 200 assert_string_equal(response.content, 'destination') # Should only query for user and session because of condition assert len(c) - first - atomic_queries == 2 router.delete() with CaptureQueriesContext(connection=connection) as c: response = admin_client.get(router.source, follow=True) assert response.status_code == 200 assert_string_equal(response.content, 'home') # Only the router query assert len(c) == 1
def test_average_consumption_by_hour(self, now_mock): """ Test whether timezones are converted properly when grouping hours. """ now_mock.return_value = timezone.make_aware(timezone.datetime(2016, 1, 25, 12)) HourStatistics.objects.create( # This should be stored with local timezone, so +1. hour_start=timezone.make_aware(timezone.datetime(2016, 1, 1, 12)), electricity1=1, electricity2=0, electricity1_returned=0, electricity2_returned=0, ) hour_stat = dsmr_stats.services.average_consumption_by_hour(max_weeks_ago=4)[0] # @see "Trends are always shown in UTC #76", only PostgreSQL can fix this. if connection.vendor == 'postgresql': # Regression, Django defaults to UTC, so '11' here. self.assertEqual(hour_stat['hour_start'], 12) else: return self.skipTest('Test cannot be fixed for backends other than PostgreSQL')
def forwards(self, orm): # this is shitty, but we dont care if connection.vendor == 'sqlite': transaction.set_autocommit(True) # ideally we would have done this data migration before this change, but # it was an oversight if not db.dry_run: try: self.fix_missing_teams(orm) except Exception as e: import traceback; traceback.print_exc() raise # Changing field 'Project.team' db.alter_column('sentry_project', 'team_id', self.gf('sentry.db.models.fields.FlexibleForeignKey')(to=orm['sentry.Team']))
def can_migrate(self, connection): """ Return True if the model can/should be migrated on the `connection`. `connection` can be either a real connection or a connection alias. """ if self.proxy or self.swapped or not self.managed: return False if isinstance(connection, six.string_types): connection = connections[connection] if self.required_db_vendor: return self.required_db_vendor == connection.vendor if self.required_db_features: return all(getattr(connection.features, feat, False) for feat in self.required_db_features) return True
def test_backend_db(): """Ensure that we are always testing sqlite on fast in memory DB""" from django.db import connection, connections if connection.vendor == "sqlite": assert connections.databases["default"]["NAME"] == ":memory:"
def handle(self, *args, **options): with connection.cursor() as cursor: if connection.vendor == "postgresql": _pg(cursor) return "Created PostgreSQL trigger" if connection.vendor == "mysql": _mysql(cursor) return "Created MySQL trigger" if connection.vendor == "sqlite": _sqlite(cursor) return "Created SQLite trigger"
def get_queryset(self): channel_id = self.kwargs['channel_id'] attempted_mastery_logs = MasteryLog.objects.filter(attemptlogs__isnull=False) query_node = ContentNode.objects.get(pk=self.kwargs['content_node_id']) if self.request.query_params.get('last_active_time'): # Last active time specified datetime_cutoff = parse(self.request.query_params.get('last_active_time')) else: datetime_cutoff = timezone.now() - datetime.timedelta(7) # Set on the kwargs to pass into the serializer self.kwargs['last_active_time'] = datetime_cutoff.isoformat() recent_content_items = ContentSummaryLog.objects.filter_by_topic(query_node).filter( Q(progress__gt=0) | Q(masterylogs__in=attempted_mastery_logs), user__in=list(get_members_or_user(self.kwargs['collection_kind'], self.kwargs['collection_id'])), end_timestamp__gte=datetime_cutoff).values_list('content_id', flat=True) if connection.vendor == 'postgresql': pks_with_unique_content_ids = ContentNode.objects.order_by('content_id').distinct('content_id').filter( channel_id=channel_id, content_id__in=recent_content_items).values_list('pk', flat=True) else: # note from rtibbles: # As good as either I or jamalex could come up with to ensure that we only return # unique content_id'ed ContentNodes from the coach recent report endpoint. # Would have loved to use distinct('content_id'), but unfortunately DISTINCT ON is Postgresql only pks_with_unique_content_ids = ContentNode.objects.filter( channel_id=channel_id, content_id__in=recent_content_items).values('content_id').order_by('lft').annotate( pk=Min('pk')).values_list('pk', flat=True) return ContentNode.objects.filter(pk__in=pks_with_unique_content_ids).order_by('lft')
def get_queryset(self): qs = super(AutocompleteLookup, self).get_queryset() qs = self.get_filtered_queryset(qs) qs = self.get_searched_queryset(qs) if connection.vendor == 'postgresql': ordering = list(self.model._meta.ordering) distinct_columns = [o.lstrip('-') for o in ordering] + [self.model._meta.pk.column] return qs.order_by(*ordering).distinct(*distinct_columns) else: return qs.distinct()
def is_db_postgresql(): return connection.vendor == 'postgresql'
def _update_descendant_lang_url_paths(self, old_page): cursor = connection.cursor() if connection.vendor == 'sqlite': field_update_fmt = "{0} = %s || substr({0}, %s)" elif connection.vendor == 'mysql': field_update_fmt = "{0} = CONCAT(%s, substring({0}, %s))" elif connection.vendor in ('mssql', 'microsoft'): field_update_fmt = "{0} = CONCAT(%s, (SUBSTRING({0}, 0, %s)))" else: field_update_fmt = "{0} = %s || substring({0} from %s)" exec_args = [] update_fields_sql = [] for lang_code in mt_settings.AVAILABLE_LANGUAGES: url_path_attr = build_localized_fieldname('url_path', lang_code) new_url_path = getattr(self, url_path_attr) old_url_path = getattr(old_page, url_path_attr) if new_url_path != old_url_path: update_fields_sql.append(field_update_fmt.format(url_path_attr)) exec_args.append(new_url_path) exec_args.append(len(old_url_path) + 1) if not update_fields_sql: # in case page was moved but parent did not change # nothing has to be updated return update_sql = """ UPDATE wagtailcore_page SET {} WHERE path LIKE %s AND id <> %s """.format(','.join(update_fields_sql)) exec_args.append(self.path + '%') exec_args.append(self.id) cursor.execute(update_sql, exec_args)
def ready(self): """ Performs an DB engine check, as we maintain some engine specific queries. """ if (connection.vendor not in settings.DSMRREADER_SUPPORTED_DB_VENDORS): # pragma: no cover # Temporary for backwards compatibility warnings.showwarning( _( 'Unsupported database engine "{}" active, ' 'some features might not work properly'.format(connection.vendor) ), RuntimeWarning, __file__, 0 )
def average_consumption_by_hour(max_weeks_ago): """ Calculates the average consumption by hour. Measured over all consumption data of the past X months. """ sql_extra = { # Ugly engine check, but still beter than iterating over a hundred thousand items in code. 'postgresql': "date_part('hour', hour_start)", 'sqlite': "strftime('%H', hour_start)", 'mysql': "extract(hour from hour_start)", }[connection.vendor] # Only PostgreSQL supports this builtin. set_time_zone_sql = connection.ops.set_time_zone_sql() if set_time_zone_sql: connection.connection.cursor().execute(set_time_zone_sql, [settings.TIME_ZONE]) # pragma: no cover hour_statistics = HourStatistics.objects.filter( # This greatly helps reducing the queryset size, but also makes it more relevant. hour_start__gt=timezone.now() - timezone.timedelta(weeks=max_weeks_ago) ).extra({ 'hour_start': sql_extra }).values('hour_start').order_by('hour_start').annotate( avg_electricity1=Avg('electricity1'), avg_electricity2=Avg('electricity2'), avg_electricity1_returned=Avg('electricity1_returned'), avg_electricity2_returned=Avg('electricity2_returned'), avg_electricity_merged=Avg(models.F('electricity1') + models.F('electricity2')), avg_electricity_returned_merged=Avg(models.F('electricity1_returned') + models.F('electricity2_returned')), avg_gas=Avg('gas'), ) # Force evaluation, as we want to reset timezone in cursor below. hour_statistics = list(hour_statistics) if set_time_zone_sql: # Prevents "database connection isn't set to UTC" error. connection.connection.cursor().execute(set_time_zone_sql, ['UTC']) # pragma: no cover return hour_statistics
def get_deleted(self, model_class, db=None, model_db=None): """ Returns all the deleted versions for the given model class. The results are returned with the most recent versions first. """ db = db or DEFAULT_DB_ALIAS model_db = model_db or db content_type = ContentType.objects.db_manager(db).get_for_model(model_class) live_pk_queryset = model_class._default_manager.db_manager(model_db).all().values_list("pk", flat=True) versioned_objs = self._get_versions(db).filter( content_type = content_type, ) if has_int_pk(model_class): # If the model and version data are in different databases, decouple the queries. if model_db != db: live_pk_queryset = list(live_pk_queryset.iterator()) # We can do this as a fast, in-database join. deleted_version_pks = versioned_objs.exclude( object_id_int__in = live_pk_queryset ).values_list("object_id_int") else: # This join has to be done as two separate queries. deleted_version_pks = versioned_objs.exclude( object_id__in = list(live_pk_queryset.iterator()) ).values_list("object_id") deleted_version_pks = deleted_version_pks.exclude( type = VERSION_DELETE, ).annotate( latest_pk = Max("pk") ).values_list("latest_pk", flat=True) # HACK: MySQL deals extremely badly with this as a subquery, and can hang infinitely. # TODO: If a version is identified where this bug no longer applies, we can add a version specifier. if connection.vendor == "mysql": deleted_version_pks = list(deleted_version_pks) # Return the deleted versions! return self._get_versions(db).filter(pk__in=deleted_version_pks).order_by("-pk") # Signal receivers.
def set_project_resource(request, path_obj, dir_path, filename): """Loads :cls:`pootle_app.models.Directory` and :cls:`pootle_store.models.Store` models and populates the request object. This is the same as `set_resource` but operates at the project level across all languages. :param path_obj: A :cls:`pootle_project.models.Project` object. :param dir_path: Path relative to the root of `path_obj`. :param filename: Optional filename. """ query_ctx_path = ''.join(['/%/', path_obj.code, '/']) query_pootle_path = query_ctx_path + dir_path obj_directory = getattr(path_obj, 'directory', path_obj) ctx_path = obj_directory.pootle_path resource_path = dir_path pootle_path = ctx_path + dir_path # List of TP paths available for user user_tps = TranslationProject.objects.for_user(request.user) user_tps = user_tps.filter( project__code=path_obj.code, ).values_list('pootle_path', flat=True) user_tps_regex = '^%s' % u'|'.join(list(user_tps)) sql_regex = 'REGEXP' if connection.vendor == 'postgresql': sql_regex = '~' if filename: query_pootle_path = query_pootle_path + filename pootle_path = pootle_path + filename resource_path = resource_path + filename resources = Store.objects.live().extra( where=[ 'pootle_store_store.pootle_path LIKE %s', 'pootle_store_store.pootle_path ' + sql_regex + ' %s', ], params=[query_pootle_path, user_tps_regex] ).select_related('translation_project__language') else: resources = Directory.objects.live().extra( where=[ 'pootle_app_directory.pootle_path LIKE %s', 'pootle_app_directory.pootle_path ' + sql_regex + ' %s', ], params=[query_pootle_path, user_tps_regex] ).select_related('parent') if not resources.exists(): raise Http404 request.store = None request.directory = None request.pootle_path = pootle_path request.resource_obj = ProjectResource(resources, pootle_path) request.resource_path = resource_path request.ctx_obj = path_obj or request.resource_obj request.ctx_path = ctx_path
def create(): """ Creates a backup of the database. Optionally gzipped. """ # Backup file with day name included, for weekly rotation. backup_file = os.path.join(get_backup_directory(), 'dsmrreader-{}-backup-{}.sql'.format( connection.vendor, formats.date_format(timezone.now().date(), 'l') )) # PostgreSQL backup. if connection.vendor == 'postgresql': # pragma: no cover backup_process = subprocess.Popen( [ settings.DSMRREADER_BACKUP_PG_DUMP, '--host={}'.format(settings.DATABASES['default']['HOST']), '--user={}'.format(settings.DATABASES['default']['USER']), settings.DATABASES['default']['NAME'], ], env={ 'PGPASSWORD': settings.DATABASES['default']['PASSWORD'] }, stdout=open(backup_file, 'w') # pragma: no cover ) # MySQL backup. elif connection.vendor == 'mysql': # pragma: no cover backup_process = subprocess.Popen( [ settings.DSMRREADER_BACKUP_MYSQLDUMP, '--compress', '--hex-blob', '--extended-insert', '--quick', '--host', settings.DATABASES['default']['HOST'], '--user', settings.DATABASES['default']['USER'], '--password={}'.format(settings.DATABASES['default']['PASSWORD']), settings.DATABASES['default']['NAME'], ], stdout=open(backup_file, 'w') # pragma: no cover ) # SQLite backup. elif connection.vendor == 'sqlite': # pragma: no cover backup_process = subprocess.Popen( [ settings.DSMRREADER_BACKUP_SQLITE, settings.DATABASES['default']['NAME'], '.dump', ], stdout=open(backup_file, 'w') ) # pragma: no cover else: raise NotImplementedError('Unsupported backup backend: {}'.format(connection.vendor)) # pragma: no cover backup_process.wait() backup_settings = BackupSettings.get_solo() if backup_settings.compress: compress(file_path=backup_file) backup_settings.latest_backup = timezone.now() backup_settings.save()