We extracted the following 25 code examples from open source Python projects to illustrate how django.db.connection.close() is used in practice.
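The recurring pattern across these examples is to discard a connection that may be stale or no longer needed (typically in a long-running thread, worker, or forked process) and let Django open a fresh one on the next query. A minimal sketch of that pattern follows; `run_worker_iteration` and `do_periodic_work` are hypothetical placeholders, not taken from any of the projects below.

from django.db import connection

def run_worker_iteration():
    # Drop any connection inherited from a previous iteration or parent
    # process; Django opens a new one automatically on the next ORM query.
    connection.close()
    do_periodic_work()  # hypothetical: any code that issues ORM queries
    # Close again so the idle worker does not keep a database connection open.
    connection.close()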
def _ensureConnection(self):
    # If connection is already made close it.
    from django.db import connection
    if connection.connection is not None:
        connection.close()
    # Loop forever until a connection can be made.
    while True:
        try:
            connection.ensure_connection()
        except Exception:
            log.err(_why=(
                "Error starting: "
                "Connection to database cannot be established."))
            time.sleep(1)
        else:
            # Connection made, now close it.
            connection.close()
            break
def handle_one(self, check):
    """Send an alert for a single check.

    Return True if an appropriate check was selected and processed.
    Return False if no checks need to be processed.

    """
    # Save the new status. If sendalerts crashes,
    # it won't process this check again.
    check.status = check.get_status()
    check.save()

    tmpl = "\nSending alert, status=%s, code=%s\n"
    self.stdout.write(tmpl % (check.status, check.code))

    errors = check.send_alert()
    for ch, error in errors:
        self.stdout.write("ERROR: %s %s %s\n" % (ch.kind, ch.value, error))

    connection.close()
    return True
def terminate_db_connections():
    """Terminates active connections to the database being used by Django"""
    kill_connection_sql = \
        "SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid();"
    with connection.cursor() as cur:
        cur.execute(kill_connection_sql)
    connection.close()
def job(self) -> None:
    while not self.stop.is_set():
        seconds_to_wait = self.wait_until_next_run()
        if self.stop.wait(timeout=seconds_to_wait):
            return
        if self.settings.auto_wanted.enable:
            self.crawler_logger.info("Starting timed auto wanted.")
            connection.close()
            for provider_name in self.settings.auto_wanted.providers:
                attrs = Attribute.objects.filter(provider__slug=provider_name)
                for wanted_generator in self.settings.provider_context.get_wanted_generators(provider_name):
                    wanted_generator(self.settings, self.crawler_logger, attrs)
        self.update_last_run(django_tz.now())
def job(self) -> None:
    while not self.stop.is_set():
        seconds_to_wait = self.wait_until_next_run()
        if self.stop.wait(timeout=seconds_to_wait):
            return
        if self.settings.autochecker.enable:
            connection.close()
            self.crawler_logger.info("Starting timed auto search")
            current_settings = Settings(load_from_config=self.settings.config)
            current_settings.silent_processing = True
            current_settings.replace_metadata = True
            self.web_queue.enqueue_args_list(['-feed', '-wanted'], override_options=current_settings)
        self.update_last_run(django_tz.now())
def get_values():
    for _ in range(LOOPS):
        # Add `reset_value=1000` to use SELECT + UPDATE instead of INSERT ON CONFLICT.
        get_next_value()
    connection.close()
def test_first_access_with_commit(self):

    def one(output):
        with transaction.atomic():
            output.append(('one', 'begin'))
            value = get_next_value()
            output.append(('one', value))
            time.sleep(0.2)
        output.append(('one', 'commit'))
        connection.close()

    def two(output):
        time.sleep(0.1)
        with transaction.atomic():
            output.append(('two', 'begin'))
            value = get_next_value()
            output.append(('two', value))
        output.append(('two', 'commit'))
        connection.close()

    expected = [
        ('one', 'begin'),
        ('one', 1),
        ('two', 'begin'),
        ('one', 'commit'),
        ('two', 2),
        ('two', 'commit'),
    ]

    self.assertSequence(one, two, expected)
def test_later_access_with_commit(self):
    get_next_value()

    def one(output):
        with transaction.atomic():
            output.append(('one', 'begin'))
            value = get_next_value()
            output.append(('one', value))
            time.sleep(0.2)
        output.append(('one', 'commit'))
        connection.close()

    def two(output):
        time.sleep(0.1)
        with transaction.atomic():
            output.append(('two', 'begin'))
            value = get_next_value()
            output.append(('two', value))
        output.append(('two', 'commit'))
        connection.close()

    expected = [
        ('one', 'begin'),
        ('one', 2),
        ('two', 'begin'),
        ('one', 'commit'),
        ('two', 3),
        ('two', 'commit'),
    ]

    self.assertSequence(one, two, expected)
def test_first_access_with_rollback(self):

    def one(output):
        with transaction.atomic():
            output.append(('one', 'begin'))
            value = get_next_value()
            output.append(('one', value))
            time.sleep(0.2)
            transaction.set_rollback(True)
        output.append(('one', 'rollback'))
        connection.close()

    def two(output):
        time.sleep(0.1)
        with transaction.atomic():
            output.append(('two', 'begin'))
            value = get_next_value()
            output.append(('two', value))
        output.append(('two', 'commit'))
        connection.close()

    expected = [
        ('one', 'begin'),
        ('one', 1),
        ('two', 'begin'),
        ('one', 'rollback'),
        ('two', 1),
        ('two', 'commit'),
    ]

    self.assertSequence(one, two, expected)
def test_later_access_with_rollback(self):
    get_next_value()

    def one(output):
        with transaction.atomic():
            output.append(('one', 'begin'))
            value = get_next_value()
            output.append(('one', value))
            time.sleep(0.2)
            transaction.set_rollback(True)
        output.append(('one', 'rollback'))
        connection.close()

    def two(output):
        time.sleep(0.1)
        with transaction.atomic():
            output.append(('two', 'begin'))
            value = get_next_value()
            output.append(('two', value))
        output.append(('two', 'commit'))
        connection.close()

    expected = [
        ('one', 'begin'),
        ('one', 2),
        ('two', 'begin'),
        ('one', 'rollback'),
        ('two', 2),
        ('two', 'commit'),
    ]

    self.assertSequence(one, two, expected)
def test_later_access_nowait(self):
    get_next_value()

    def one(output):
        with transaction.atomic():
            value = get_next_value()
            output.append(('one', value))
            time.sleep(0.5)
        connection.close()

    def two(output):
        time.sleep(0.1)
        with self.assertRaises(OperationalError):
            with transaction.atomic():
                value = get_next_value(nowait=True)
                output.append(('two', value))  # shouldn't be reached
        output.append(('two', 'exc'))
        connection.close()

    expected = [
        ('one', 2),
        ('two', 'exc'),
    ]

    self.assertSequence(one, two, expected)
def test_first_access_to_different_sequences(self):

    def one(output):
        with transaction.atomic():
            output.append(('one', 'begin'))
            value = get_next_value('one')
            output.append(('one', value))
            time.sleep(0.2)
        output.append(('one', 'commit'))
        connection.close()

    def two(output):
        time.sleep(0.1)
        with transaction.atomic():
            output.append(('two', 'begin'))
            value = get_next_value('two')
            output.append(('two', value))
        output.append(('two', 'commit'))
        connection.close()

    expected = [
        ('one', 'begin'),
        ('one', 1),
        ('two', 'begin'),
        ('two', 1),
        ('two', 'commit'),
        ('one', 'commit'),
    ]

    self.assertSequence(one, two, expected)
def test_later_access_to_different_sequences(self):
    get_next_value('one')
    get_next_value('two')

    def one(output):
        with transaction.atomic():
            output.append(('one', 'begin'))
            value = get_next_value('one')
            output.append(('one', value))
            time.sleep(0.2)
        output.append(('one', 'commit'))
        connection.close()

    def two(output):
        time.sleep(0.1)
        with transaction.atomic():
            output.append(('two', 'begin'))
            value = get_next_value('two')
            output.append(('two', value))
        output.append(('two', 'commit'))
        connection.close()

    expected = [
        ('one', 'begin'),
        ('one', 2),
        ('two', 'begin'),
        ('two', 2),
        ('two', 'commit'),
        ('one', 'commit'),
    ]

    self.assertSequence(one, two, expected)
def check_db_connection():
    from django.db import connection
    if connection.connection:
        # NOTE: (zacky, 2016.MAR.21st) IF CONNECTION IS CLOSED BY BACKEND,
        # CLOSE IT AT DJANGO, WHICH WILL BE SETUP AFTERWARDS.
        if not connection.is_usable():
            connection.close()
def _set_database_name(testing, alias, db_name):
    if not testing:
        return
    connection.close()
    settings.DATABASES[alias]['NAME'] = db_name
    connection.settings_dict['NAME'] = db_name
def headerparserhandler(req):
    """A mod_python headerparserhandler to authenticate and authorize a
    request using NAV.

    It uses NAV's Django authentication and authorization middlewares and
    translates between mod_python and Django requests/responses.

    """
    from mod_python import apache

    req.get_full_path = lambda: req.unparsed_uri
    is_ajax = req.headers_in.get('X-Requested-With', '') == 'XMLHttpRequest'
    req.is_ajax = lambda: is_ajax
    req.COOKIES = _get_cookie_dict(req)

    for mware in (SessionMiddleware, AuthenticationMiddleware,
                  AuthorizationMiddleware):
        response = mware().process_request(req)

    try:
        if response:
            if 'Location' in response:
                req.headers_out['Location'] = response['Location']
            return response.status_code
        else:
            return apache.OK
    finally:
        # ensure we don't leak database connections. it's inefficient, yes, but
        # that's the price you pay for authorizing access to your other crap
        connection.close()
def test_django_model(model):
    connection.close()  # Ensure clean connection
    list(model.objects.all()[:5])
def signalhandler(signum, _):
    """Signal handler to close and reopen log file(s) on HUP and exit on TERM"""
    if signum == signal.SIGHUP:
        logger.info('SIGHUP received; reopening log files.')
        nav.logs.reopen_log_files()
        nav.daemon.redirect_std_fds(
            stderr=nav.logs.get_logfile_from_logger())
        logger.info('Log files reopened.')
    elif signum == signal.SIGTERM:
        logger.warning('SIGTERM received: Shutting down')
        sys.exit(0)
def on_retry(self, exc, task_id, args, kwargs, einfo):
    connection.close()
def on_failure(self, exc, task_id, args, kwargs, einfo):
    connection.close()
def on_success(self, retval, task_id, args, kwargs):
    connection.close()
def maybeCloseDatabaseConnections(self):
    """Close database connections if their use is not permitted."""
    if self.database_use_possible and not self.database_use_permitted:
        from django.db import connection
        connection.close()
def checkDatabaseUse(self):
    """Enforce `database_use_permitted`."""
    if self.database_use_possible and not self.database_use_permitted:
        from django.db import connection
        self.expectThat(
            connection.connection, testtools.matchers.Is(None),
            "Test policy forbids use of the database.")
        connection.close()
def job(self) -> None:
    while not self.stop.is_set():
        seconds_to_wait = self.wait_until_next_run()
        if self.stop.wait(timeout=seconds_to_wait):
            return
        if self.settings.autoupdater.enable:
            current_settings = Settings(load_from_config=self.settings.config)
            current_settings.keep_dl_type = True
            current_settings.silent_processing = True
            current_settings.config['allowed']['replace_metadata'] = 'yes'

            connection.close()

            start_date = django_tz.now() - timedelta(seconds=int(self.timer)) - timedelta(days=self.settings.autoupdater.buffer_back)
            end_date = django_tz.now() - timedelta(days=self.settings.autoupdater.buffer_after)

            to_update_providers = current_settings.autoupdater.providers

            galleries = Gallery.objects.filter(
                posted__gte=start_date,
                posted__lte=end_date,
                provider__in=to_update_providers
            )

            if not galleries:
                self.crawler_logger.info(
                    "No galleries posted from {} to {} need updating. Providers: {}".format(
                        start_date, end_date, ", ".join(to_update_providers)
                    )
                )
            else:
                # Leave only info downloaders, then leave only enabled auto updated providers
                downloaders = current_settings.provider_context.get_downloaders_name_priority(
                    current_settings, filter_name='info')
                downloaders_names = [x[0] for x in downloaders if x[0].replace("_info", "") in to_update_providers]

                current_settings.allow_downloaders_only(downloaders_names, True, True, True)

                url_list = [x.get_link() for x in galleries]

                self.crawler_logger.info(
                    "Starting timed auto updater, updating {} galleries "
                    "posted from {} to {}. Providers: {}".format(
                        len(url_list), start_date, end_date, ", ".join(to_update_providers)
                    )
                )

                url_list.append('--update-mode')

                self.web_queue.enqueue_args_list(url_list, override_options=current_settings)

        self.update_last_run(django_tz.now())
def _process_message(self, peer: Tuple[str, int], mailfrom: str, rcpttos: Sequence[str], data: bytes, *,
                     channel: PatchedSMTPChannel,
                     **kwargs: Any) -> Union[str, None]:
    # we can't import the Domain model before Django has been initialized
    from mailauth.models import Domain

    data = self.add_received_header(peer, data, channel)

    mfdomain = mailfrom.split("@", 1)[1]

    dom = None  # type: Domain
    try:
        dom = Domain.objects.get(name=mfdomain)
    except Domain.DoesNotExist:
        _log.debug("Unknown domain: %s (%s)", mfdomain, mailfrom)
    except OperationalError:
        # this is a hacky hack, but psycopg2 falls over when haproxy closes the connection on us
        _log.info("Database connection closed, Operational Error, retrying")
        from django.db import connection
        connection.close()
        if "retry" in kwargs:
            _log.exception("Database unavailable.")
            return "421 Processing problem. Please try again later."
        else:
            return self.process_message(peer, mailfrom, rcpttos, data, retry=True, **kwargs)

    signed = False
    if dom is not None and dom.dkimkey:
        sig = dkim.sign(data, dom.dkimselector.encode("utf-8"), dom.name.encode("utf-8"),
                        dom.dkimkey.replace("\r\n", "\n").encode("utf-8"))
        data = b"%s%s" % (sig, data)

        try:
            logstr = data.decode('utf-8')
            enc = "utf-8"
        except UnicodeDecodeError:
            logstr = data.decode('latin1')
            enc = "latin1"
        _log.debug("Signed output (%s):\n%s", enc, logstr)
        signed = True

    # now send the mail back to be processed
    _log.info("Relaying %semail from <%s> to <%s>", "DKIM signed " if signed else "", mailfrom, rcpttos)
    self.smtp.sendmail(mailfrom, rcpttos, data)
    return None