Python django.conf.settings 模块,DATABASES 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.conf.settings.DATABASES

项目:zappa-django-utils    作者:Miserlou    | 项目源码 | 文件源码
def handle(self, *args, **options):
        self.stdout.write(self.style.SUCCESS('Starting Schema creation..'))

        dbname = settings.DATABASES['default']['NAME']
        user = settings.DATABASES['default']['USER']
        password = settings.DATABASES['default']['PASSWORD']
        host = settings.DATABASES['default']['HOST']

        con = connect(dbname=dbname, user=user, host=host, password=password)

        self.stdout.write(self.style.SUCCESS('Adding schema {schema} to database {dbname}'
                                             .format(schema=settings.SCHEMA, dbname=dbname)))
        con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

        cur = con.cursor()
        cur.execute('CREATE SCHEMA {schema};'.format(schema=settings.SCHEMA))
        cur.close()

        con.close()

        self.stdout.write(self.style.SUCCESS('All Done!'))
项目:zappa-django-utils    作者:Miserlou    | 项目源码 | 文件源码
def handle(self, *args, **options):
        self.stdout.write(self.style.SUCCESS('Starting Schema deletion..'))

        dbname = settings.DATABASES['default']['NAME']
        user = settings.DATABASES['default']['USER']
        password = settings.DATABASES['default']['PASSWORD']
        host = settings.DATABASES['default']['HOST']

        con = connect(dbname=dbname, user=user, host=host, password=password)

        self.stdout.write(self.style.SUCCESS('Removing schema {schema} from database {dbname}'
                                             .format(schema=settings.SCHEMA, dbname=dbname)))
        con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

        cur = con.cursor()
        cur.execute('DROP SCHEMA {schema} CASCADE;'.format(schema=settings.SCHEMA))
        cur.close()

        con.close()

        self.stdout.write(self.style.SUCCESS('All Done.'))
项目:zappa-django-utils    作者:Miserlou    | 项目源码 | 文件源码
def handle(self, *args, **options):
        self.stdout.write(self.style.SUCCESS('Starting DB creation..'))

        dbname = settings.DATABASES['default']['NAME']
        user = settings.DATABASES['default']['USER']
        password = settings.DATABASES['default']['PASSWORD']
        host = settings.DATABASES['default']['HOST']

        self.stdout.write(self.style.SUCCESS('Connecting to host..'))
        con = connect(dbname='postgres', user=user, host=host, password=password)
        con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

        self.stdout.write(self.style.SUCCESS('Creating database'))
        cur = con.cursor()
        cur.execute('CREATE DATABASE ' + dbname)
        cur.close()

        con.close()

        self.stdout.write(self.style.SUCCESS('All done!'))
项目:sqlibrist    作者:condograde    | 项目源码 | 文件源码
def get_config():
    """
    Gets engine type from Django settings
    """
    DB = settings.DATABASES['default']
    ENGINE = DB.get('ENGINE', '')
    config = {}

    if 'postgresql' in ENGINE \
            or 'psycopg' in ENGINE:
        config['engine'] = ENGINE_POSTGRESQL
    elif 'mysql' in ENGINE:
        config['engine'] = ENGINE_MYSQL

    else:
        raise BadConfig('Django configured with unsupported database engine: '
                        '%s' % DB.get('ENGINE', ''))

    return config
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def get_connection_params(self):
        settings_dict = self.settings_dict
        # None may be used to connect to the default 'postgres' db
        if settings_dict['NAME'] == '':
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        conn_params = {
            'database': settings_dict['NAME'] or 'postgres',
        }
        conn_params.update(settings_dict['OPTIONS'])
        conn_params.pop('isolation_level', None)
        if settings_dict['USER']:
            conn_params['user'] = settings_dict['USER']
        if settings_dict['PASSWORD']:
            conn_params['password'] = force_str(settings_dict['PASSWORD'])
        if settings_dict['HOST']:
            conn_params['host'] = settings_dict['HOST']
        if settings_dict['PORT']:
            conn_params['port'] = settings_dict['PORT']
        return conn_params
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def _nodb_connection(self):
        nodb_connection = super(DatabaseWrapper, self)._nodb_connection
        try:
            nodb_connection.ensure_connection()
        except (DatabaseError, WrappedDatabaseError):
            warnings.warn(
                "Normally Django will use a connection to the 'postgres' database "
                "to avoid running initialization queries against the production "
                "database when it's not needed (for example, when running tests). "
                "Django was unable to create a connection to the 'postgres' database "
                "and will use the default database instead.",
                RuntimeWarning
            )
            settings_dict = self.settings_dict.copy()
            settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
            nodb_connection = self.__class__(
                self.settings_dict.copy(),
                alias=self.alias,
                allow_thread_sharing=False)
        return nodb_connection
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def _switch_to_test_user(self, parameters):
        """
        Oracle doesn't have the concept of separate databases under the same user.
        Thus, we use a separate user (see _create_test_db). This method is used
        to switch to that user. We will need the main user again for clean-up when
        we end testing, so we keep its credentials in SAVED_USER/SAVED_PASSWORD
        entries in the settings dict.
        """
        real_settings = settings.DATABASES[self.connection.alias]
        real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
            self.connection.settings_dict['USER']
        real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
            self.connection.settings_dict['PASSWORD']
        real_test_settings = real_settings['TEST']
        test_settings = self.connection.settings_dict['TEST']
        real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
            self.connection.settings_dict['USER'] = parameters['user']
        real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def __init__(self, db_settings=None, db_backup_name=None):
        """
        Constructor

        Args:
            db_settings (dict): A dict of database settings
            db_backup_name (str): The name that will be given to the backup database
        """
        self.db_settings = db_settings or settings.DATABASES['default']
        self.db_name = self.db_settings['NAME']
        if self.db_name[0:5] != 'test_':
            raise Exception(
                "The test suite is attempting to use the database '{}'."
                "The test database should have a name that begins with 'test_'. Exiting...".format(self.db_name)
            )
        self.db_backup_name = db_backup_name or getattr(settings, 'BACKUP_DB_NAME', self.DEFAULT_BACKUP_DB_NAME)
        self.db_cmd_args = [
            "-h", self.db_settings['HOST'],
            "-p", str(self.db_settings['PORT']),
            "-U", self.db_settings['USER']
        ]
项目:intake    作者:codeforamerica    | 项目源码 | 文件源码
def pg_dump(file_location):
    env = os.environ.copy()
    env.update({  # requires having password set to test
        "PGPASSWORD": settings.DATABASES['default']['PASSWORD']
    })
    pg_dump = [
        'pg_dump',
        '-h%s' % settings.DATABASES[settings.CLIPS_DATABASE_ALIAS]['HOST'],
        '-U%s' % settings.DATABASES['default']['USER'],
        settings.DATABASES['default']['NAME']
    ]
    with Popen(pg_dump, env=env, stdout=PIPE, stderr=STDOUT, bufsize=1
               ) as task, open(file_location, 'wb') as f:
        for line in task.stdout:
            f.write(line)
        return task.wait()
项目:django-schemas    作者:ryannjohnson    | 项目源码 | 文件源码
def dbs_by_environment(environment, write_only=True):
    """
    Retrieve all database aliases that contain the given environment.

    Args:
        environment (str): The environment the databases must contain.
        write_only (Optional[bool]): Exclude any read-only databases.

    Returns:
        Set of aliases.

    """
    possible = set()
    for alias in settings.DATABASES:
        if write_only and is_read_db(alias):
            continue
        if environment in settings.DATABASES[alias]['ENVIRONMENTS']:
            possible.add(alias)
    return possible
项目:DjangoBlog    作者:0daybug    | 项目源码 | 文件源码
def get_connection_params(self):
        settings_dict = self.settings_dict
        # None may be used to connect to the default 'postgres' db
        if settings_dict['NAME'] == '':
            from django.core.exceptions import ImproperlyConfigured
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        conn_params = {
            'database': settings_dict['NAME'] or 'postgres',
        }
        conn_params.update(settings_dict['OPTIONS'])
        conn_params.pop('isolation_level', None)
        if settings_dict['USER']:
            conn_params['user'] = settings_dict['USER']
        if settings_dict['PASSWORD']:
            conn_params['password'] = force_str(settings_dict['PASSWORD'])
        if settings_dict['HOST']:
            conn_params['host'] = settings_dict['HOST']
        if settings_dict['PORT']:
            conn_params['port'] = settings_dict['PORT']
        return conn_params
项目:DjangoBlog    作者:0daybug    | 项目源码 | 文件源码
def _nodb_connection(self):
        nodb_connection = super(DatabaseWrapper, self)._nodb_connection
        try:
            nodb_connection.ensure_connection()
        except (DatabaseError, WrappedDatabaseError):
            warnings.warn(
                "Normally Django will use a connection to the 'postgres' database "
                "to avoid running initialization queries against the production "
                "database when it's not needed (for example, when running tests). "
                "Django was unable to create a connection to the 'postgres' database "
                "and will use the default database instead.",
                RuntimeWarning
            )
            settings_dict = self.settings_dict.copy()
            settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
            nodb_connection = self.__class__(
                self.settings_dict.copy(),
                alias=self.alias,
                allow_thread_sharing=False)
        return nodb_connection
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def get_connection_params(self):
        settings_dict = self.settings_dict
        # None may be used to connect to the default 'postgres' db
        if settings_dict['NAME'] == '':
            from django.core.exceptions import ImproperlyConfigured
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        conn_params = {
            'database': settings_dict['NAME'] or 'postgres',
        }
        conn_params.update(settings_dict['OPTIONS'])
        conn_params.pop('isolation_level', None)
        if settings_dict['USER']:
            conn_params['user'] = settings_dict['USER']
        if settings_dict['PASSWORD']:
            conn_params['password'] = force_str(settings_dict['PASSWORD'])
        if settings_dict['HOST']:
            conn_params['host'] = settings_dict['HOST']
        if settings_dict['PORT']:
            conn_params['port'] = settings_dict['PORT']
        return conn_params
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def destroy_test_db(self, old_database_name, verbosity=1, keepdb=False):
        """
        Destroy a test database, prompting the user for confirmation if the
        database already exists.
        """
        self.connection.close()
        test_database_name = self.connection.settings_dict['NAME']
        if verbosity >= 1:
            test_db_repr = ''
            action = 'Destroying'
            if verbosity >= 2:
                test_db_repr = " ('%s')" % test_database_name
            if keepdb:
                action = 'Preserving'
            print("%s test database for alias '%s'%s..." % (
                action, self.connection.alias, test_db_repr))

        # if we want to preserve the database
        # skip the actual destroying piece.
        if not keepdb:
            self._destroy_test_db(test_database_name, verbosity)

        # Restore the original database name
        settings.DATABASES[self.connection.alias]["NAME"] = old_database_name
        self.connection.settings_dict["NAME"] = old_database_name
项目:django-green-grove    作者:dreipol    | 项目源码 | 文件源码
def back_up_database(self, backup_storage, temp_backup_path):
        logger.info('Start backing up the database.')
        file_path = '{database}_{timestamp}.dump'.format(
            database=settings.DATABASES['default']['NAME'],
            timestamp=self.timestamp
        )
        temp_file_path = '{backup_path}/{file_path}'.format(backup_path=temp_backup_path, file_path=file_path)

        # Run the `pg_dump` command.
        os.system('pg_dump -h {host} -U {user} {database} > {file_path}'.format(
            host=settings.DATABASES['default']['HOST'],
            user=settings.DATABASES['default']['USER'],
            database=settings.DATABASES['default']['NAME'],
            file_path=temp_file_path
        ))

        # Store the dump file on the backup bucket.
        with open(temp_file_path, 'rb') as database_backup_file:
            target_file_path = '{timestamp}/{path}'.format(timestamp=self.timestamp, path=file_path)
            backup_storage.save(target_file_path, database_backup_file)
            logger.info('Database dump successfully copied to the target storage backend.')
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def get_connection_params(self):
        settings_dict = self.settings_dict
        # None may be used to connect to the default 'postgres' db
        if settings_dict['NAME'] == '':
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        conn_params = {
            'database': settings_dict['NAME'] or 'postgres',
        }
        conn_params.update(settings_dict['OPTIONS'])
        conn_params.pop('isolation_level', None)
        if settings_dict['USER']:
            conn_params['user'] = settings_dict['USER']
        if settings_dict['PASSWORD']:
            conn_params['password'] = force_str(settings_dict['PASSWORD'])
        if settings_dict['HOST']:
            conn_params['host'] = settings_dict['HOST']
        if settings_dict['PORT']:
            conn_params['port'] = settings_dict['PORT']
        return conn_params
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def _nodb_connection(self):
        nodb_connection = super(DatabaseWrapper, self)._nodb_connection
        try:
            nodb_connection.ensure_connection()
        except (Database.DatabaseError, WrappedDatabaseError):
            warnings.warn(
                "Normally Django will use a connection to the 'postgres' database "
                "to avoid running initialization queries against the production "
                "database when it's not needed (for example, when running tests). "
                "Django was unable to create a connection to the 'postgres' database "
                "and will use the default database instead.",
                RuntimeWarning
            )
            settings_dict = self.settings_dict.copy()
            settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
            nodb_connection = self.__class__(
                self.settings_dict.copy(),
                alias=self.alias,
                allow_thread_sharing=False)
        return nodb_connection
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def _switch_to_test_user(self, parameters):
        """
        Oracle doesn't have the concept of separate databases under the same user.
        Thus, we use a separate user (see _create_test_db). This method is used
        to switch to that user. We will need the main user again for clean-up when
        we end testing, so we keep its credentials in SAVED_USER/SAVED_PASSWORD
        entries in the settings dict.
        """
        real_settings = settings.DATABASES[self.connection.alias]
        real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
            self.connection.settings_dict['USER']
        real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
            self.connection.settings_dict['PASSWORD']
        real_test_settings = real_settings['TEST']
        test_settings = self.connection.settings_dict['TEST']
        real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
            self.connection.settings_dict['USER'] = parameters['user']
        real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
项目:django_any_backend    作者:primal100    | 项目源码 | 文件源码
def ready(self):
        for db in settings.DATABASES:
            name = db['NAME']
            db_wrapper_class = import_string(db['ENGINE'] + '.base.DatabaseWrapper')
            base_model = getattr(db_wrapper_class, 'base_model', None)
            if base_model:
                models = apps.get_models()
                for model in models:
                    if name == router.db_for_read(model):
                        for k, v in base_model.__dict__:
                            if k == 'objects':
                                model_manager = getattr(model, 'objects', None)
                                if model_manager:
                                    manager_cls = model_manager.__class__
                                    custom_cls = v.__class__
                                    new_manager = type('AnyBackendCustomManager', (custom_cls, manager_cls), {})
                                    setattr(model, 'objects', new_manager())
                                else:
                                    setattr(model, 'objects', v)
                            elif not k.startswith('__'):
                                setattr(model, k ,v)
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def _nodb_connection(self):
        nodb_connection = super(DatabaseWrapper, self)._nodb_connection
        try:
            nodb_connection.ensure_connection()
        except (Database.DatabaseError, WrappedDatabaseError):
            warnings.warn(
                "Normally Django will use a connection to the 'postgres' database "
                "to avoid running initialization queries against the production "
                "database when it's not needed (for example, when running tests). "
                "Django was unable to create a connection to the 'postgres' database "
                "and will use the default database instead.",
                RuntimeWarning
            )
            settings_dict = self.settings_dict.copy()
            settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
            nodb_connection = self.__class__(
                self.settings_dict.copy(),
                alias=self.alias,
                allow_thread_sharing=False)
        return nodb_connection
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def _switch_to_test_user(self, parameters):
        """
        Oracle doesn't have the concept of separate databases under the same user.
        Thus, we use a separate user (see _create_test_db). This method is used
        to switch to that user. We will need the main user again for clean-up when
        we end testing, so we keep its credentials in SAVED_USER/SAVED_PASSWORD
        entries in the settings dict.
        """
        real_settings = settings.DATABASES[self.connection.alias]
        real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
            self.connection.settings_dict['USER']
        real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
            self.connection.settings_dict['PASSWORD']
        real_test_settings = real_settings['TEST']
        test_settings = self.connection.settings_dict['TEST']
        real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
            self.connection.settings_dict['USER'] = parameters['user']
        real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def get_connection_params(self):
        settings_dict = self.settings_dict
        # None may be used to connect to the default 'postgres' db
        if settings_dict['NAME'] == '':
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        conn_params = {
            'database': settings_dict['NAME'] or 'postgres',
        }
        conn_params.update(settings_dict['OPTIONS'])
        conn_params.pop('isolation_level', None)
        if settings_dict['USER']:
            conn_params['user'] = settings_dict['USER']
        if settings_dict['PASSWORD']:
            conn_params['password'] = force_str(settings_dict['PASSWORD'])
        if settings_dict['HOST']:
            conn_params['host'] = settings_dict['HOST']
        if settings_dict['PORT']:
            conn_params['port'] = settings_dict['PORT']
        return conn_params
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def _nodb_connection(self):
        nodb_connection = super(DatabaseWrapper, self)._nodb_connection
        try:
            nodb_connection.ensure_connection()
        except (DatabaseError, WrappedDatabaseError):
            warnings.warn(
                "Normally Django will use a connection to the 'postgres' database "
                "to avoid running initialization queries against the production "
                "database when it's not needed (for example, when running tests). "
                "Django was unable to create a connection to the 'postgres' database "
                "and will use the default database instead.",
                RuntimeWarning
            )
            settings_dict = self.settings_dict.copy()
            settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
            nodb_connection = self.__class__(
                self.settings_dict.copy(),
                alias=self.alias,
                allow_thread_sharing=False)
        return nodb_connection
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def _switch_to_test_user(self, parameters):
        """
        Oracle doesn't have the concept of separate databases under the same user.
        Thus, we use a separate user (see _create_test_db). This method is used
        to switch to that user. We will need the main user again for clean-up when
        we end testing, so we keep its credentials in SAVED_USER/SAVED_PASSWORD
        entries in the settings dict.
        """
        real_settings = settings.DATABASES[self.connection.alias]
        real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
            self.connection.settings_dict['USER']
        real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
            self.connection.settings_dict['PASSWORD']
        real_test_settings = real_settings['TEST']
        test_settings = self.connection.settings_dict['TEST']
        real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
            self.connection.settings_dict['USER'] = parameters['user']
        real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
项目:django-gitlab-boilerplate    作者:motius    | 项目源码 | 文件源码
def handle(self, *args, **options):
        try:
            connection = connections[options['database']]
            cursor = connection.cursor()
            database_settings = settings.DATABASES[options['database']]
        except ConnectionDoesNotExist:
            raise CommandError('Database "%s" does not exist in settings' % options['database'])

        if connection.vendor == 'sqlite':
            print("Deleting database %s" % database_settings['NAME'])
            os.remove(database_settings['NAME'])
        elif connection.vendor == 'mysql':
            print("Dropping database %s" % database_settings['NAME'])
            cursor.execute("DROP DATABASE `%s`;" % database_settings['NAME'])

            print("Creating database %s" % database_settings['NAME'])
            cursor.execute("CREATE DATABASE `%s` CHARACTER SET utf8;" % database_settings['NAME'])
            # Should fix some "MySQL has gone away issues"
            cursor.execute("SET GLOBAL max_allowed_packet=32*1024*1024;")
        elif connection.vendor == 'postgresql':
            print("Dropping and recreating schema public")
            cursor.execute("DROP schema public CASCADE; CREATE schema public")
        else:
            raise CommandError('Database vendor not supported')
项目:edd    作者:JBEI    | 项目源码 | 文件源码
def line_strain_changed(sender, instance, action, reverse, model, pk_set, using, **kwargs):
    """
    Handles changes to the Line <-> Strain relationship caused by adding/removing/changing the
    strain associated with a single line in a study. Detects changes that indicate a need to push
    changes across to ICE for the (ICE part -> EDD study) link stored in ICE.
    """
    # only care about changes in the forward direction, Line -> Strain
    if reverse or check_ice_cannot_proceed():
        return
    # only execute these signals if using a non-testing database
    if using in settings.DATABASES:
        action_function = {
            'post_add': strain_added,
            'pre_remove': strain_removing,
            'post_remove': strain_removed,
        }.get(action, None)
        if action_function:
            action_function(instance, pk_set)


# ----- helper functions -----
项目:python-ormsqlite    作者:vallemrv    | 项目源码 | 文件源码
def modify_row(self, decoder):
        modulo = importlib.import_module(self.db+".models")
        clase_mane = decoder["tb"]
        for s in dir(modulo):
            if s.lower() == decoder["tb"]:
                clase_mane = s
        class_model = getattr(modulo, clase_mane)
        if "condition" in decoder:
            db_name = settings.DATABASES["default"]["NAME"]
            row_id = Model(db_name=db_name, table_name=self.db+"_"+decoder['tb'])
            row_id.load_first_by_query(**decoder["condition"])
            decoder["fields"]["id"] = row_id.id
        if 'id' in decoder["fields"]:
            try:
                row = class_model.objects.get(pk=decoder['fields']['id'])
            except:
                row = class_model()
        else:
            row = class_model()
        for k, v in decoder["fields"].items():
            setattr(row, k, v)
        return row
项目:django-daiquiri    作者:aipescience    | 项目源码 | 文件源码
def __init__(self):

        self.database_key = 'data'
        self.database_config = settings.DATABASES['data']

        try:
            database_adapter_class = import_class(settings.ADAPTER_DATABASE)
            self.database = database_adapter_class(self.database_key, self.database_config)
        except AttributeError:

            if self.database_config['ENGINE'] == 'django.db.backends.mysql':
                self.database = MySQLAdapter(self.database_key, self.database_config)
            else:
                raise Exception('No suitable database adapter found.')

        try:
            download_adapter_class = import_class(settings.ADAPTER_DOWNLOAD)
            self.download = download_adapter_class(self.database_key, self.database_config)
        except AttributeError:

            if self.database_config['ENGINE'] == 'django.db.backends.mysql':
                self.download = MysqldumpAdapter(self.database_key, self.database_config)
            else:
                raise Exception('No suitable download adapter found.')
项目:django-next-train    作者:bitpixdigital    | 项目源码 | 文件源码
def get_connection_params(self):
        settings_dict = self.settings_dict
        # None may be used to connect to the default 'postgres' db
        if settings_dict['NAME'] == '':
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        conn_params = {
            'database': settings_dict['NAME'] or 'postgres',
        }
        conn_params.update(settings_dict['OPTIONS'])
        conn_params.pop('isolation_level', None)
        if settings_dict['USER']:
            conn_params['user'] = settings_dict['USER']
        if settings_dict['PASSWORD']:
            conn_params['password'] = force_str(settings_dict['PASSWORD'])
        if settings_dict['HOST']:
            conn_params['host'] = settings_dict['HOST']
        if settings_dict['PORT']:
            conn_params['port'] = settings_dict['PORT']
        return conn_params
项目:django-next-train    作者:bitpixdigital    | 项目源码 | 文件源码
def _nodb_connection(self):
        nodb_connection = super(DatabaseWrapper, self)._nodb_connection
        try:
            nodb_connection.ensure_connection()
        except (DatabaseError, WrappedDatabaseError):
            warnings.warn(
                "Normally Django will use a connection to the 'postgres' database "
                "to avoid running initialization queries against the production "
                "database when it's not needed (for example, when running tests). "
                "Django was unable to create a connection to the 'postgres' database "
                "and will use the default database instead.",
                RuntimeWarning
            )
            settings_dict = self.settings_dict.copy()
            settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
            nodb_connection = self.__class__(
                self.settings_dict.copy(),
                alias=self.alias,
                allow_thread_sharing=False)
        return nodb_connection
项目:django-next-train    作者:bitpixdigital    | 项目源码 | 文件源码
def _switch_to_test_user(self, parameters):
        """
        Oracle doesn't have the concept of separate databases under the same user.
        Thus, we use a separate user (see _create_test_db). This method is used
        to switch to that user. We will need the main user again for clean-up when
        we end testing, so we keep its credentials in SAVED_USER/SAVED_PASSWORD
        entries in the settings dict.
        """
        real_settings = settings.DATABASES[self.connection.alias]
        real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
            self.connection.settings_dict['USER']
        real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
            self.connection.settings_dict['PASSWORD']
        real_test_settings = real_settings['TEST']
        test_settings = self.connection.settings_dict['TEST']
        real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
            self.connection.settings_dict['USER'] = parameters['user']
        real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
项目:LatinSounds_AppEnviaMail    作者:G3ek-aR    | 项目源码 | 文件源码
def get_connection_params(self):
        settings_dict = self.settings_dict
        # None may be used to connect to the default 'postgres' db
        if settings_dict['NAME'] == '':
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        conn_params = {
            'database': settings_dict['NAME'] or 'postgres',
        }
        conn_params.update(settings_dict['OPTIONS'])
        conn_params.pop('isolation_level', None)
        if settings_dict['USER']:
            conn_params['user'] = settings_dict['USER']
        if settings_dict['PASSWORD']:
            conn_params['password'] = force_str(settings_dict['PASSWORD'])
        if settings_dict['HOST']:
            conn_params['host'] = settings_dict['HOST']
        if settings_dict['PORT']:
            conn_params['port'] = settings_dict['PORT']
        return conn_params
项目:LatinSounds_AppEnviaMail    作者:G3ek-aR    | 项目源码 | 文件源码
def _nodb_connection(self):
        nodb_connection = super(DatabaseWrapper, self)._nodb_connection
        try:
            nodb_connection.ensure_connection()
        except (Database.DatabaseError, WrappedDatabaseError):
            warnings.warn(
                "Normally Django will use a connection to the 'postgres' database "
                "to avoid running initialization queries against the production "
                "database when it's not needed (for example, when running tests). "
                "Django was unable to create a connection to the 'postgres' database "
                "and will use the default database instead.",
                RuntimeWarning
            )
            settings_dict = self.settings_dict.copy()
            settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
            nodb_connection = self.__class__(
                self.settings_dict.copy(),
                alias=self.alias,
                allow_thread_sharing=False)
        return nodb_connection
项目:LatinSounds_AppEnviaMail    作者:G3ek-aR    | 项目源码 | 文件源码
def _switch_to_test_user(self, parameters):
        """
        Oracle doesn't have the concept of separate databases under the same user.
        Thus, we use a separate user (see _create_test_db). This method is used
        to switch to that user. We will need the main user again for clean-up when
        we end testing, so we keep its credentials in SAVED_USER/SAVED_PASSWORD
        entries in the settings dict.
        """
        real_settings = settings.DATABASES[self.connection.alias]
        real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
            self.connection.settings_dict['USER']
        real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
            self.connection.settings_dict['PASSWORD']
        real_test_settings = real_settings['TEST']
        test_settings = self.connection.settings_dict['TEST']
        real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
            self.connection.settings_dict['USER'] = parameters['user']
        real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']
项目:maas    作者:maas    | 项目源码 | 文件源码
def _extract_django16_south_maas19(cls):
        """Extract the django16, south, and MAAS 1.9 source code in to a temp
        path."""
        path_to_tarball = cls._path_to_django16_south_maas19()
        tempdir = tempfile.mkdtemp(prefix='maas-upgrade-')
        subprocess.check_call([
            "tar", "zxf", path_to_tarball, "-C", tempdir])

        settings_json = os.path.join(tempdir, "maas19settings.json")
        with open(settings_json, "w", encoding="utf-8") as fd:
            fd.write(json.dumps({"DATABASES": settings.DATABASES}))

        script_path = os.path.join(tempdir, "migrate.py")
        with open(script_path, "wb") as fp:
            fp.write(MAAS_UPGRADE_SCRIPT.encode("utf-8"))
        return tempdir, script_path
项目:django-postgres-extra    作者:SectorLabs    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Initializes a new instance of :see:PostgresManager."""

        super(PostgresManager, self).__init__(*args, **kwargs)

        # make sure our back-end is set and refuse to proceed
        # if it's not set
        db_backend = settings.DATABASES['default']['ENGINE']
        if 'psqlextra' not in db_backend:
            raise ImproperlyConfigured((
                '\'%s\' is not the \'psqlextra.backend\'. '
                'django-postgres-extra cannot function without '
                'the \'psqlextra.backend\'. Set DATABASES.ENGINE.'
            ) % db_backend)

        # hook into django signals to then trigger our own

        django.db.models.signals.post_save.connect(
            self._on_model_save, sender=self.model, weak=False)

        django.db.models.signals.pre_delete.connect(
            self._on_model_delete, sender=self.model, weak=False)

        self._signals_connected = True
项目:django-wechat-api    作者:crazy-canux    | 项目源码 | 文件源码
def get_connection_params(self):
        settings_dict = self.settings_dict
        # None may be used to connect to the default 'postgres' db
        if settings_dict['NAME'] == '':
            from django.core.exceptions import ImproperlyConfigured
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        conn_params = {
            'database': settings_dict['NAME'] or 'postgres',
        }
        conn_params.update(settings_dict['OPTIONS'])
        conn_params.pop('isolation_level', None)
        if settings_dict['USER']:
            conn_params['user'] = settings_dict['USER']
        if settings_dict['PASSWORD']:
            conn_params['password'] = force_str(settings_dict['PASSWORD'])
        if settings_dict['HOST']:
            conn_params['host'] = settings_dict['HOST']
        if settings_dict['PORT']:
            conn_params['port'] = settings_dict['PORT']
        return conn_params
项目:django-wechat-api    作者:crazy-canux    | 项目源码 | 文件源码
def _nodb_connection(self):
        nodb_connection = super(DatabaseWrapper, self)._nodb_connection
        try:
            nodb_connection.ensure_connection()
        except (DatabaseError, WrappedDatabaseError):
            warnings.warn(
                "Normally Django will use a connection to the 'postgres' database "
                "to avoid running initialization queries against the production "
                "database when it's not needed (for example, when running tests). "
                "Django was unable to create a connection to the 'postgres' database "
                "and will use the default database instead.",
                RuntimeWarning
            )
            settings_dict = self.settings_dict.copy()
            settings_dict['NAME'] = settings.DATABASES[DEFAULT_DB_ALIAS]['NAME']
            nodb_connection = self.__class__(
                self.settings_dict.copy(),
                alias=self.alias,
                allow_thread_sharing=False)
        return nodb_connection
项目:django-wechat-api    作者:crazy-canux    | 项目源码 | 文件源码
def destroy_test_db(self, old_database_name, verbosity=1, keepdb=False):
        """
        Destroy a test database, prompting the user for confirmation if the
        database already exists.
        """
        self.connection.close()
        test_database_name = self.connection.settings_dict['NAME']
        if verbosity >= 1:
            test_db_repr = ''
            action = 'Destroying'
            if verbosity >= 2:
                test_db_repr = " ('%s')" % test_database_name
            if keepdb:
                action = 'Preserving'
            print("%s test database for alias '%s'%s..." % (
                action, self.connection.alias, test_db_repr))

        # if we want to preserve the database
        # skip the actual destroying piece.
        if not keepdb:
            self._destroy_test_db(test_database_name, verbosity)

        # Restore the original database name
        settings.DATABASES[self.connection.alias]["NAME"] = old_database_name
        self.connection.settings_dict["NAME"] = old_database_name
项目:thorn    作者:robinhood    | 项目源码 | 文件源码
def django_db_setup():
    settings.DATABASES['default']['name'] = os.path.join(
        settings.BASE_DIR, 'db.sqlite3')
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def _maindb_connection(self):
        """
        This is analogous to other backends' `_nodb_connection` property,
        which allows access to an "administrative" connection which can
        be used to manage the test databases.
        For Oracle, the only connection that can be used for that purpose
        is the main (non-test) connection.
        """
        settings_dict = settings.DATABASES[self.connection.alias]
        user = settings_dict.get('SAVED_USER') or settings_dict['USER']
        password = settings_dict.get('SAVED_PASSWORD') or settings_dict['PASSWORD']
        settings_dict = settings_dict.copy()
        settings_dict.update(USER=user, PASSWORD=password)
        DatabaseWrapper = type(self.connection)
        return DatabaseWrapper(settings_dict, alias=self.connection.alias)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def get_connection_params(self):
        settings_dict = self.settings_dict
        if not settings_dict['NAME']:
            from django.core.exceptions import ImproperlyConfigured
            raise ImproperlyConfigured(
                "settings.DATABASES is improperly configured. "
                "Please supply the NAME value.")
        kwargs = {
            'database': settings_dict['NAME'],
            'detect_types': Database.PARSE_DECLTYPES | Database.PARSE_COLNAMES,
        }
        kwargs.update(settings_dict['OPTIONS'])
        # Always allow the underlying SQLite connection to be shareable
        # between multiple threads. The safe-guarding will be handled at a
        # higher level by the `BaseDatabaseWrapper.allow_thread_sharing`
        # property. This is necessary as the shareability is disabled by
        # default in pysqlite and it cannot be changed once a connection is
        # opened.
        if 'check_same_thread' in kwargs and kwargs['check_same_thread']:
            warnings.warn(
                'The `check_same_thread` option was provided and set to '
                'True. It will be overridden with False. Use the '
                '`DatabaseWrapper.allow_thread_sharing` property instead '
                'for controlling thread shareability.',
                RuntimeWarning
            )
        kwargs.update({'check_same_thread': False})
        if self.features.can_share_in_memory_db:
            kwargs.update({'uri': True})
        return kwargs
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def destroy_test_db(self, old_database_name=None, verbosity=1, keepdb=False, number=None):
        """
        Destroy a test database, prompting the user for confirmation if the
        database already exists.
        """
        self.connection.close()
        if number is None:
            test_database_name = self.connection.settings_dict['NAME']
        else:
            test_database_name = self.get_test_db_clone_settings(number)['NAME']

        if verbosity >= 1:
            action = 'Destroying'
            if keepdb:
                action = 'Preserving'
            print("%s test database for alias %s..." % (
                action,
                self._get_database_display_str(verbosity, test_database_name),
            ))

        # if we want to preserve the database
        # skip the actual destroying piece.
        if not keepdb:
            self._destroy_test_db(test_database_name, verbosity)

        # Restore the original database name
        if old_database_name is not None:
            settings.DATABASES[self.connection.alias]["NAME"] = old_database_name
            self.connection.settings_dict["NAME"] = old_database_name
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def test_db_signature(self):
        """
        Returns a tuple with elements of self.connection.settings_dict (a
        DATABASES setting value) that uniquely identify a database
        accordingly to the RDBMS particularities.
        """
        settings_dict = self.connection.settings_dict
        return (
            settings_dict['HOST'],
            settings_dict['PORT'],
            settings_dict['ENGINE'],
            settings_dict['NAME']
        )
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def __init__(self, databases=None):
        """
        databases is an optional dictionary of database definitions (structured
        like settings.DATABASES).
        """
        self._databases = databases
        self._connections = local()
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def databases(self):
        if self._databases is None:
            self._databases = settings.DATABASES
        if self._databases == {}:
            self._databases = {
                DEFAULT_DB_ALIAS: {
                    'ENGINE': 'django.db.backends.dummy',
                },
            }
        if self._databases[DEFAULT_DB_ALIAS] == {}:
            self._databases[DEFAULT_DB_ALIAS]['ENGINE'] = 'django.db.backends.dummy'

        if DEFAULT_DB_ALIAS not in self._databases:
            raise ImproperlyConfigured("You must define a '%s' database" % DEFAULT_DB_ALIAS)
        return self._databases
项目:django-celery-results    作者:celery    | 项目源码 | 文件源码
def current_engine(self):
        try:
            return settings.DATABASES[self.db]['ENGINE']
        except AttributeError:
            return settings.DATABASE_ENGINE
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def distinct(queryset, base):
    if settings.DATABASES[queryset.db]["ENGINE"] == "django.db.backends.oracle":
        # distinct analogue for Oracle users
        return base.filter(pk__in=set(queryset.values_list('pk', flat=True)))
    return queryset.distinct()


# Obtaining manager instances and names from model options differs after 1.10.
项目:django-vitals    作者:LCOGT    | 项目源码 | 文件源码
def check(self):
        for db in settings.DATABASES:
            try:
                connections[db].introspection.table_names()
            except Exception as exc:
                self.add_error('Could not connect to {}: {}'.format(db, exc))
项目:intake    作者:codeforamerica    | 项目源码 | 文件源码
def pg_load(file_location):
    env = os.environ.copy()
    env.update({  # requires having password set to test
        "PGPASSWORD": settings.DATABASES['default']['PASSWORD']
    })
    load = [
        'psql',
        '-h%s' % settings.DATABASES['default']['HOST'],
        '-U%s' % settings.DATABASES['default']['USER'],
        '-d%s' % settings.DATABASES['default']['NAME'],
        '-f%s' % file_location,
    ]
    task = Popen(load, env=env)
    return task.wait()