我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用atexit.register()。
def connect_to_vcenter(self):
    """Connect to vCenter and return the resulting service instance.

    Connection parameters are read from ``self.configs['main']`` (keys
    ``host``, ``user``, ``password`` and ``port``); ``self.context`` is passed
    as the SSL context. On success ``Disconnect`` is registered with
    ``atexit`` for the new connection.

    Returns:
        The object returned by ``SmartConnect``.

    Raises:
        SystemExit: if the connection attempt raised ``IOError`` or produced
            a falsy result.
    """
    main_cfg = self.configs['main']

    # Pre-bind the result: when SmartConnect raises IOError the name would
    # otherwise be unbound and the check below would fail with
    # UnboundLocalError instead of the intended SystemExit.
    si = None
    try:
        si = SmartConnect(
            host=main_cfg['host'],
            user=main_cfg['user'],
            pwd=main_cfg['password'],
            port=main_cfg['port'],
            sslContext=self.context)
        atexit.register(Disconnect, si)
    except IOError as e:
        logging.error("Could not connect to vcenter." + str(e))

    if not si:
        raise SystemExit("Unable to connect to host with supplied info.")

    return si
def kas(argv):
    """
        Main entry point of kas.

        Parses ``argv``, installs signal and exit handlers, then offers the
        parsed arguments to each registered plugin in turn. The first plugin
        whose ``run()`` returns a truthy value ends the call; if none does,
        the parser's help text is printed.
    """
    create_logger()

    parser = kas_get_argparser()
    args = parser.parse_args(argv)

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    program = os.path.basename(sys.argv[0])
    logging.info('%s %s started', program, __version__)

    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, interruption)
    atexit.register(_atexit_handler)

    # any() stops at the first plugin that reports it handled the arguments,
    # so later plugins are neither instantiated nor run.
    plugins = getattr(kasplugin, 'plugins', [])
    if any(plugin().run(args) for plugin in plugins):
        return

    parser.print_help()
def get_win_certfile():
    """Return the file name of a certificate bundle built by ``wincertstore``.

    Returns ``None`` when ``wincertstore`` cannot be imported. Otherwise a
    ``CertFile`` populated from the ``CA`` and ``ROOT`` stores is created and
    its ``name`` attribute is returned.
    """
    try:
        import wincertstore
    except ImportError:
        return None

    class CertFile(wincertstore.CertFile):
        """CertFile that closes itself at interpreter exit."""

        def __init__(self):
            super(CertFile, self).__init__()
            atexit.register(self.close)

        def close(self):
            # Closing is best-effort: an OSError from the base class is ignored.
            try:
                super(CertFile, self).close()
            except OSError:
                pass

    bundle = CertFile()
    for store_name in ('CA', 'ROOT'):
        bundle.addstore(store_name)
    return bundle.name
def _load_wincerts():
    """Point the module-level ``_WINCERTS`` at a freshly populated CertFile.

    The CertFile is filled from the "CA" and "ROOT" stores and its ``close``
    method is registered to run at interpreter exit.
    """
    global _WINCERTS

    store_file = CertFile()
    for store_name in ("CA", "ROOT"):
        store_file.addstore(store_name)
    atexit.register(store_file.close)

    _WINCERTS = store_file


# XXX: Possible future work.
# - Support CRL files? Only supported by CPython >= 2.7.9 and >= 3.4
#   http://bugs.python.org/issue8813
# - OCSP? Not supported by python at all.
#   http://bugs.python.org/issue17123
# - Setting OP_NO_COMPRESSION? The server doesn't yet.
# - Adding an ssl_context keyword argument to MongoClient? This might
#   be useful for sites that have unusual requirements rather than
#   trying to expose every SSLContext option through a keyword/uri
#   parameter.
def interactive(self):
    """Start an interactive console with this object available as ``r``.

    Command history is loaded from ``~/.rudrahistory`` when that file exists
    and is written back at interpreter exit. Tab completion is enabled over
    the console namespace.

    Note: the console namespace is this module's ``globals()`` updated in
    place with the locals of this call, so names bound here (``self``, ``r``
    and the locally imported modules) become module globals.
    """
    utils.set_prompt(ps1="(rudra) ", ps2="... ")

    import os
    import readline
    import rlcompleter
    import atexit

    # expanduser() rather than os.environ["HOME"]: the latter raises KeyError
    # when HOME is not set (for example on Windows).
    histfile = os.path.join(os.path.expanduser("~"), ".rudrahistory")
    if os.path.isfile(histfile):
        readline.read_history_file(histfile)
    atexit.register(readline.write_history_file, histfile)

    r = self
    # Parenthesised single-argument form prints the same text on Python 2
    # and is valid syntax on Python 3.
    print("Use the \"r\" object to analyze files")

    # No local is named ``vars`` any more: it used to be copied into the
    # console namespace and shadowed the builtin vars() there.
    globals().update(locals())
    readline.set_completer(rlcompleter.Completer(globals()).complete)
    readline.parse_and_bind("tab: complete")

    code.interact(banner="", local=globals())
def __init__(self, obj, func, *args, **kwargs):
    """Record ``func(*args, **kwargs)`` as the callback associated with ``obj``.

    The first construction also registers ``self._exitfunc`` with ``atexit``.
    A bookkeeping record holding a weak reference to ``obj`` (with this
    object as the weakref callback) is stored in ``self._registry``.
    """
    if not self._registered_with_atexit:
        # We may register the exit function more than once because
        # of a thread race, but that is harmless
        import atexit
        atexit.register(self._exitfunc)
        finalize._registered_with_atexit = True

    record = self._Info()
    record.weakref = ref(obj, self)
    record.func = func
    record.args = args
    # An empty kwargs dict is stored as None.
    record.kwargs = kwargs or None
    record.atexit = True
    record.index = next(self._index_iter)

    self._registry[self] = record
    finalize._dirty = True
def extract_chunks(files, num_images, out_path):
    """Extract the archive chunks in ``files`` into ``out_path`` with a progress bar.

    ``files`` are concatenated by ``cat`` and piped into ``tar`` (arguments
    ``xvz``), which runs with ``out_path`` as its working directory. The
    progress bar, sized to ``num_images``, advances once per output line that
    ends in ``.jpg``.
    """
    with tqdm(total=num_images, unit='images', ncols=80, unit_scale=True) as pbar:
        process = tar(cat(files, _piped=True), 'xvz', _iter=True, _cwd=out_path)

        def kill():
            # Best-effort cleanup at interpreter exit: the process may already
            # be gone. Only ordinary errors are ignored, so SystemExit and
            # KeyboardInterrupt are no longer swallowed.
            try:
                process.kill()
            except Exception:
                pass

        atexit.register(kill)

        for line in process:
            if line.strip().endswith('.jpg'):
                pbar.update(1)
def get(self, url):
    """Download ``url`` into ``self.base`` with ``wget`` and report the outcome.

    ``wget`` is started in the background (quiet, resuming, three tries) and
    waited for. A handler that kills the process is registered with
    ``atexit`` so it does not outlive the interpreter.

    Returns:
        ``DownloadResult(DOWNLOAD_SUCCESS, url, None)`` on success, or
        ``DownloadResult(DOWNLOAD_FAILURE, url, repr(error))`` if anything
        raised.
    """
    print("Downloading: '{}'".format(url))
    try:
        process = sh.wget('-q', '-c', '--tries=3', url, _cwd=self.base, _bg=True)

        def kill():
            # Best-effort: the process has usually finished by exit time.
            try:
                process.kill()
            except Exception:
                pass

        atexit.register(kill)
        process.wait()
        return DownloadResult(DOWNLOAD_SUCCESS, url, None)
    except Exception as e:
        # Report the caught exception itself. The previous ``self.e`` is not
        # an attribute set anywhere in this method, so the failure path
        # raised AttributeError instead of returning a result.
        return DownloadResult(DOWNLOAD_FAILURE, url, repr(e))
def __init__(self, *args, **kwds):
    """Create the shared-memory buffer for rendered images and start rendering.

    On Windows a tagged anonymous mmap of one page is created; elsewhere a
    named temporary file is mapped. ``self.close`` is registered with
    ``atexit``. The base ``GraphicsView`` is then initialised with ``*args``
    and ``**kwds``, scene changes are connected to ``self.update``, and a
    timer calling ``self.renderView`` is started with a 16 ms interval.
    """
    ## Create shared memory for rendered image
    if sys.platform.startswith('win'):
        # Random 20-letter suffix for the mapping tag.
        self.shmtag = "pyqtgraph_shmem_" + ''.join(
            [chr((random.getrandbits(20) % 25) + 97) for _ in range(20)])
        self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag)  # use anonymous mmap on windows
    else:
        self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
        self.shmFile.write(b'\x00' * (mmap.PAGESIZE + 1))
        # Flush before mapping: the write above may still sit in the file
        # object's buffer, in which case the file on disk is shorter than the
        # length requested from mmap.
        self.shmFile.flush()
        fd = self.shmFile.fileno()
        self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
    atexit.register(self.close)

    GraphicsView.__init__(self, *args, **kwds)
    self.scene().changed.connect(self.update)
    self.img = None
    self.renderTimer = QtCore.QTimer()
    self.renderTimer.timeout.connect(self.renderView)
    self.renderTimer.start(16)
def __init__(self):
    """Initialise loop state and choose a loop dispatcher.

    An instance of ``self._loop_dispatch_class`` is created and validated.
    If that fails for any reason the failure is logged and a
    ``_BusyWaitDispatcher`` is used instead. ``_cleanup`` is registered with
    ``atexit`` and receives a weak reference to this object.
    """
    self._pid = os.getpid()
    self._loop_lock = Lock()
    self._started = False
    self._shutdown = False

    self._thread = None

    self._timers = TimerManager()

    # Pre-bind so the except block can tell whether construction succeeded:
    # if the dispatcher class itself raised, there is nothing to close and
    # an unconditional ``dispatcher.close()`` would raise NameError.
    dispatcher = None
    try:
        dispatcher = self._loop_dispatch_class()
        dispatcher.validate()
        log.debug("Validated loop dispatch with %s", self._loop_dispatch_class)
    except Exception:
        log.exception("Failed validating loop dispatch with %s. Using busy wait execution instead.", self._loop_dispatch_class)
        if dispatcher is not None:
            dispatcher.close()
        dispatcher = _BusyWaitDispatcher()
    self._loop_dispatcher = dispatcher

    atexit.register(partial(_cleanup, weakref.ref(self)))
def init():
    """initialize the midi module
    pygame.midi.init(): return None

    Call this before using the midi module. Calling it again after a
    successful initialisation does nothing.
    """
    global _init, _pypm

    if _init:
        return

    import pygame.pypm
    _pypm = pygame.pypm

    _pypm.Initialize()
    _init = True
    # Registered only on the initialising call.
    atexit.register(quit)
def generate_adhoc_ssl_context():
    """Build an SSL context for the development server from a throwaway key pair.

    The certificate and private key from ``generate_adhoc_ssl_pair()`` are
    written as PEM to two temporary files, which are scheduled for removal at
    interpreter exit, and then loaded through ``load_ssl_context``.
    """
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_fd, cert_path = tempfile.mkstemp()
    pkey_fd, pkey_path = tempfile.mkstemp()

    # Schedule removal before writing, so the files are cleaned up at exit
    # even if a later step fails.
    atexit.register(os.remove, pkey_path)
    atexit.register(os.remove, cert_path)

    pem = crypto.FILETYPE_PEM
    os.write(cert_fd, crypto.dump_certificate(pem, cert))
    os.write(pkey_fd, crypto.dump_privatekey(pem, pkey))
    os.close(cert_fd)
    os.close(pkey_fd)

    return load_ssl_context(cert_path, pkey_path)
def get_win_certfile():
    """Return the file name of the cached wincertstore bundle, creating it once.

    The bundle is kept in the module-level ``_wincerts``. Returns ``None``
    when no bundle is cached yet and ``wincertstore`` cannot be imported.
    """
    global _wincerts

    if _wincerts is None:
        try:
            from wincertstore import CertFile
        except ImportError:
            return None

        class MyCertFile(CertFile):
            """CertFile pre-loaded with stores/certs and closed at exit."""

            def __init__(self, stores=(), certs=()):
                CertFile.__init__(self)
                for store_name in stores:
                    self.addstore(store_name)
                self.addcerts(certs)
                atexit.register(self.close)

        _wincerts = MyCertFile(stores=['CA', 'ROOT'])

    return _wincerts.name
def set_roidb(self, roidb):
    """Set the roidb to be used by this layer during training.

    Stores ``roidb`` and reshuffles the roidb indices. When
    ``cfg.TRAIN.USE_PREFETCH`` is set, a ``BlobFetcher`` is started with a
    queue of size 10 and an ``atexit`` handler is registered that terminates
    and joins it.
    """
    self._roidb = roidb
    self._shuffle_roidb_inds()
    if cfg.TRAIN.USE_PREFETCH:
        self._blob_queue = Queue(10)
        self._prefetch_process = BlobFetcher(self._blob_queue,
                                             self._roidb,
                                             self._num_classes)
        self._prefetch_process.start()

        # Bind the process started by THIS call. Reading it through ``self``
        # at exit time would make every registered handler act on the most
        # recent fetcher and leave fetchers from earlier calls running.
        fetcher = self._prefetch_process

        # Terminate the child process when the parent exits
        def cleanup():
            # Parenthesised form: same output on Python 2, valid on Python 3.
            print('Terminating BlobFetcher')
            fetcher.terminate()
            fetcher.join()

        import atexit
        atexit.register(cleanup)
def mounted_at(dev='', loopback=''):
    """Return the mount point of ``dev`` or ``loopback``, mounting it if needed.

    The output of ``df`` is searched first. For ``dev``, a line starting with
    the device path is looked for; for ``loopback``, a mount point under
    ``/lib/live/`` ending in the loopback file's base name. If nothing
    matches, a directory ``/tmp/mbootuz-<pid>-<name>`` is created, the
    device or loopback file is mounted there, and ``cleanup`` is registered
    with ``atexit`` for that directory.

    Args:
        dev: device path; takes precedence over ``loopback`` when non-empty.
        loopback: path of a loopback file.

    Returns:
        The mount point path.

    Raises:
        SystemExit: if neither argument is given or the mount command fails.
    """
    # universal_newlines=True yields text on Python 3 as well, where the
    # default bytes output cannot be searched with the str patterns below.
    df = subprocess.check_output(['df'], universal_newlines=True)

    if dev:
        fn = dev[dev.rfind('/') + 1:]
        dev_or_loop = dev
        # re.escape: the path is data, not a pattern ('.', '+' etc. must
        # match literally).
        m = re.search('^' + re.escape(dev) + r'\s.*\s(\S+)$', df,
                      flags=re.MULTILINE)
    elif loopback:
        dev_or_loop = loopback
        fn = loopback[loopback.rfind('/') + 1:]
        m = re.search(r'\s(/lib/live/\S*' + re.escape(fn) + ')$', df,
                      flags=re.MULTILINE)
    else:
        sys.exit('mounted_at() needs at least one arg')

    if m:
        return m.group(1)

    target_mp = '/tmp/mbootuz-' + str(os.getpid()) + '-' + fn
    subprocess.call(['mkdir', target_mp])
    try:
        subprocess.check_output(['mount', dev_or_loop, target_mp],
                                universal_newlines=True)
    except subprocess.CalledProcessError as e:
        # Do not leave the empty directory behind when the mount fails.
        subprocess.call(['rmdir', target_mp])
        sys.exit('mount failure [' + e.output + '], mbootuz aborted')
    atexit.register(cleanup, target_mp)
    return target_mp
def __init__(self, path, threaded=True, timeout=None):
    """
    >>> lock = SQLiteLockFile('somefile')
    >>> lock = SQLiteLockFile('somefile', threaded=False)
    """
    LockBase.__init__(self, path, threaded, timeout)
    self.lock_file = unicode(self.lock_file)
    self.unique_name = unicode(self.unique_name)

    # The database path is shared by all instances and chosen once: take a
    # fresh temporary file name, then remove the file itself so that
    # sqlite3.connect() below creates the database at that path.
    if SQLiteLockFile.testdb is None:
        import tempfile
        handle, db_path = tempfile.mkstemp()
        os.close(handle)
        os.unlink(db_path)
        SQLiteLockFile.testdb = db_path

    import sqlite3
    self.connection = sqlite3.connect(SQLiteLockFile.testdb)

    cursor = self.connection.cursor()
    try:
        cursor.execute("create table locks"
                       "("
                       " lock_file varchar(32),"
                       " unique_name varchar(32)"
                       ")")
    except sqlite3.OperationalError:
        # Table creation failed (presumably because it already exists):
        # nothing to commit and no exit handler to add.
        pass
    else:
        self.connection.commit()
        import atexit
        atexit.register(os.unlink, SQLiteLockFile.testdb)
def _setup_window_style(self, info):
    """Apply the managed chrome settings (border / title removal) to a window.

    ``info`` must contain ``'hwnd'``; a missing ``'title'`` entry is filled
    in. For managed windows the original size and style are remembered and
    an ``atexit`` handler is registered to restore them. The requested style
    changes are then applied and the window is redrawn.
    """
    if 'title' not in info:
        info['title'] = window__get_title(info['hwnd'])

    is_managed, remove_border, remove_title = self._get_managed_chrome_details(info)
    if not is_managed:
        return

    hwnd = info['hwnd']
    orig_size = window__border_rectangle(hwnd)
    orig_style = window__get_style(hwnd)
    self.__hwnd_restore_state[hwnd] = (orig_size, orig_style)

    # Always, always restore window state at exit. This ensures it.
    atexit.register(_restore_window_state, hwnd, orig_size, orig_style)

    style_data = {}
    if remove_title:
        style_data['border'] = False
        style_data['dialog-frame'] = False
    if remove_border:
        style_data['size-border'] = False

    if style_data:
        try:
            window__set_style(hwnd, style_data)
        except OSError as e:
            self._log_debug("Problem setting style for {0}".format(info['class']), e)
        window__redraw(hwnd)
def hook_keyboard(callback):
    """Install a low-level keyboard hook that forwards key events to ``callback``.

    ``callback`` is called with ``(key_code, rparam & 0xFFFFFFFF)``. The hook
    is scheduled for removal at interpreter exit.

    Returns:
        The handle returned by ``SetWindowsHookExA``.
    """
    hook_handle = None

    def low_level_callback(code, rparam, lparam):
        # Whatever happens in the user callback, the event is always passed
        # on to the next hook; every exception from the body is discarded.
        try:
            key_code = 0xFFFFFFFF & lparam[0]  # key code
            callback(key_code, rparam & 0xFFFFFFFF)
        except BaseException:
            pass
        return CallNextHookEx(hook_handle, code, rparam, lparam)

    # The really big problem is the callback_pointer.
    # Once it goes out of scope, it is destroyed
    prototype = CFUNCTYPE(c_int, c_int, wintypes.HINSTANCE, POINTER(c_void_p))
    callback_pointer = prototype(low_level_callback)
    __CALLBACK_POINTERS.append(callback_pointer)

    hook_handle = SetWindowsHookExA(WH_KEYBOARD_LL, callback_pointer, GetModuleHandleA(None), 0)
    atexit.register(UnhookWindowsHookEx, hook_handle)
    return hook_handle
def hook_keyboard():
    """Install a keyboard hook that prints each key event, then pump messages.

    Every event is printed as the hexadecimal key code followed by the
    hexadecimal event code. The hook is scheduled for removal at interpreter
    exit, and the call then hands over to ``funcs.shell__pump_messages``.
    """
    def handler(key_code, event_code):
        print("{0} {1}".format(hex(key_code), hex(event_code)))

    hook_handle = None

    def low_level_callback(code, rparam, lparam):
        # The event is always forwarded to the next hook; every exception
        # from the body is discarded.
        try:
            key_code = 0xFFFFFFFF & lparam[0]  # key code
            handler(key_code, rparam & 0xFFFFFFFF)
        except BaseException:
            pass
        return CallNextHookEx(hook_handle, code, rparam, lparam)

    prototype = CFUNCTYPE(c_int, c_int, wintypes.HINSTANCE, POINTER(c_void_p))
    callback_pointer = prototype(low_level_callback)

    hook_handle = SetWindowsHookExA(WH_KEYBOARD_LL, callback_pointer, GetModuleHandleA(None), 0)
    atexit.register(UnhookWindowsHookEx, hook_handle)

    def on_exit():
        pass

    funcs.shell__pump_messages(on_exit)
def __init__(self, fn, skip=0, filename=None, immediate=False, dirs=False, sort=None, entries=40):
    """Create a profiler for the function ``fn``.

    The constructor arguments are stored on the instance unchanged, except
    ``sort``: a falsy value becomes ``('cumulative', 'time', 'calls')`` and a
    single string is wrapped in a one-element tuple. Statistics are reset
    and ``self.atexit`` is registered to run when the program terminates.
    """
    sort_keys = sort if sort else ('cumulative', 'time', 'calls')
    if isinstance(sort_keys, str):
        sort_keys = (sort_keys, )

    self.fn = fn
    self.skip = skip
    self.filename = filename
    self.immediate = immediate
    self.dirs = dirs
    self.sort = sort_keys
    self.entries = entries

    self.reset_stats()
    atexit.register(self.atexit)
def autoCompletion(sqlShell=False, osShell=False):
    """Enable tab completion and persistent history for an interactive shell.

    Does nothing when readline is not available. With ``sqlShell`` the
    completer is built from ``queriesForAutoCompletion()``; with ``osShell``
    it offers a fixed set of operating system commands. When neither flag is
    set, no completer is installed. In every case tab is bound to
    completion, history is loaded, and ``saveHistory`` is registered with
    ``atexit``.
    """
    # First of all we check if the readline is available, by default
    # it is not in Python default installation on Windows
    if not readline.haveReadline:
        return

    # Pre-bind: with both flags False (the defaults) no branch below assigns
    # a completer, and using it used to raise UnboundLocalError.
    completer = None

    if sqlShell:
        completer = CompleterNG(queriesForAutoCompletion())
    elif osShell:
        # TODO: add more operating system commands; differentiate commands
        # based on future operating system fingerprint
        completer = CompleterNG({
            "id": None,
            "ifconfig": None,
            "ls": None,
            "netstat -natu": None,
            "pwd": None,
            "uname": None,
            "whoami": None,
        })

    if completer is not None:
        readline.set_completer(completer.complete)
    readline.parse_and_bind("tab: complete")

    loadHistory()
    atexit.register(saveHistory)
def register(self, name, serializer):
    """Register ``serializer`` object under ``name``.

    Raises :class:`AttributeError` if ``serializer`` is invalid.

    .. note::

        ``name`` will be used as the file extension of the saved files.

    :param name: Name to register ``serializer`` under
    :type name: ``unicode`` or ``str``
    :param serializer: object with ``load()`` and ``dump()`` methods

    """
    # Basic validation: looking each attribute up raises AttributeError when
    # it is missing, before anything is stored.
    for required in ('load', 'dump'):
        getattr(serializer, required)

    self._serializers[name] = serializer
def __init__(self, dsn: str, loop, prefix: str = "aiotasks"):
    """Initialise the three base classes and arrange for ``stop`` at exit.

    The subscribe and delay bases receive ``dsn``, ``prefix`` and ``loop``;
    ``AsyncTaskBase`` receives ``dsn`` and ``loop`` only.
    """
    for redis_base in (AsyncTaskSubscribeRedis, AsyncTaskDelayRedis):
        redis_base.__init__(self, dsn=dsn, prefix=prefix, loop=loop)
    AsyncTaskBase.__init__(self, dsn=dsn, loop=loop)

    # This line is necessary to close redis connections
    atexit.register(self.stop)
def setup(self):
    """ Prepare readline for the interpreter.

    Creates the history file if it does not exist, loads it, applies the
    configured history length and arranges for the history to be written
    back at exit. Then installs ``self.complete`` as the completer.

    :return:
    """
    history_path = self.history_file

    if not os.path.exists(history_path):
        # Create an empty history file so the read below has a file to open.
        with open(history_path, 'a+'):
            pass

    readline.read_history_file(history_path)
    readline.set_history_length(self.history_length)
    atexit.register(readline.write_history_file, history_path)

    readline.parse_and_bind('set enable-keypad on')

    readline.set_completer(self.complete)
    readline.set_completer_delims(' \t\n;')
    readline.parse_and_bind("tab: complete")
def __init__(self, filename, directory = "", header = [], delimiter='\t', clearFile=False, padding=0):
    """Set up a file-backed record store at ``directory/filename``.

    Each entry of ``header`` is wrapped in square brackets and left-aligned
    to ``padding`` characters. ``directory`` is created when it is non-empty
    and missing. If the target file already exists its lines are loaded into
    memory. ``self._rewrite(clearFile)`` is then called and ``self.flush`` is
    registered to run at interpreter exit.
    """
    self._header = ["[{:<{padding}}]".format(title, padding=padding) for title in header]
    self._filebacked = []
    self._buffer = []
    self._delimiter = delimiter
    self._file_path = os.path.join(directory, filename)
    self._padding = padding

    needs_directory = bool(directory) and not os.path.isdir(directory)
    if needs_directory:
        os.makedirs(directory)

    if os.path.isfile(self._file_path):
        with open(self._file_path, 'r', encoding='utf8') as existing:
            self._filebacked = existing.readlines()

    self._rewrite(clearFile)
    atexit.register(self.flush)
def init(autoreset=False, convert=None, strip=None, wrap=True):
    """Replace ``sys.stdout`` / ``sys.stderr`` with wrapped streams.

    A stream that is ``None`` is left alone and its wrapped counterpart is
    set to ``None``. On the first call ``reset_all`` is registered with
    ``atexit``.

    Raises:
        ValueError: if ``wrap`` is false while any of ``autoreset``,
            ``convert`` or ``strip`` is truthy.
    """
    global wrapped_stdout, wrapped_stderr
    global atexit_done

    if not wrap and (autoreset or convert or strip):
        raise ValueError('wrap=False conflicts with any other arg=True')

    if sys.stdout is None:
        wrapped_stdout = None
    else:
        wrapped_stdout = wrap_stream(orig_stdout, convert, strip, autoreset, wrap)
        sys.stdout = wrapped_stdout

    if sys.stderr is None:
        wrapped_stderr = None
    else:
        wrapped_stderr = wrap_stream(orig_stderr, convert, strip, autoreset, wrap)
        sys.stderr = wrapped_stderr

    if not atexit_done:
        atexit.register(reset_all)
        atexit_done = True
def setup():
    """Set up the display.

    Returns ``True`` immediately if setup has already been done. Otherwise
    creates the module-level ``display``, starts, clears and refreshes it,
    registers ``_exit`` to run at interpreter exit, marks setup as done, and
    returns ``None``.
    """
    global _is_setup, display

    if _is_setup:
        return True

    display = AlphaNum4(i2c=bus)
    display.begin()
    # Clear, then push the blank state out to the device.
    display.clear()
    display.show()

    atexit.register(_exit)
    _is_setup = True
def __init__(self):
    '''Creates a KBHit object that you can call to do various keyboard things.

    Nothing is done when ``os.name`` is ``'nt'``. Otherwise the current
    terminal settings of stdin are saved, canonical mode and echo are
    switched off, and ``self.set_normal_term`` is registered to run at exit.
    '''
    if os.name == 'nt':
        return

    # Save the terminal settings
    self.fd = sys.stdin.fileno()
    self.new_term = termios.tcgetattr(self.fd)
    self.old_term = termios.tcgetattr(self.fd)

    # New terminal setting unbuffered
    self.new_term[3] &= ~(termios.ICANON | termios.ECHO)
    termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)

    # Support normal-terminal reset at exit
    atexit.register(self.set_normal_term)
def input_file(self):
    """Return the binary file object for ``self.path``, opening it on first use.

    The opened file is cached in ``self._input_file`` and scheduled to be
    closed at interpreter exit.

    Returns:
        The open file object, or ``None`` when opening failed for a reason
        other than missing permission (the open is retried on the next
        access).

    Raises:
        SystemExit: when opening fails with "Permission denied", after a
            hint has been printed.
    """
    if self._input_file is None:
        try:
            opened = open(self.path, 'rb')
        except IOError as e:
            if e.strerror == 'Permission denied':
                print('Permission denied ({}). You must be sudo to access global events.'.format(self.path))
                exit()
            # Any other open failure stays non-fatal, as before.
            return None
        self._input_file = opened

        def try_close():
            # Best-effort close at exit. The call parentheses matter: the
            # former bare ``.close`` attribute access never closed anything.
            try:
                opened.close()
            except Exception:
                pass

        # Registered only for a file that was actually opened.
        atexit.register(try_close)
    return self._input_file
def create_vm_powerstate_filter(pc, from_node):
    """
    Create a filter spec to listen to VM power state changes.

    The filter is created on the property collector ``pc`` starting at
    ``from_node``, and its ``Destroy`` method is registered with ``atexit``.

    Returns ``None`` on success, or an error message string if creating the
    filter failed.
    """
    collector = vmodl.query.PropertyCollector

    filter_spec = collector.FilterSpec()
    object_spec = collector.ObjectSpec(obj=from_node,
                                       selectSet=vm_folder_traversal())
    filter_spec.objectSet.append(object_spec)

    # Add the property specs
    property_spec = collector.PropertySpec(type=vim.VirtualMachine, all=False)
    property_spec.pathSet.append(VM_POWERSTATE)
    filter_spec.propSet.append(property_spec)

    error = None
    try:
        pc_filter = pc.CreateFilter(filter_spec, True)
        atexit.register(pc_filter.Destroy)
    except Exception as e:
        error = "Problem creating PropertyCollector filter: {}".format(str(e))
        logging.error(error)
    return error
def connectLocalSi():
    '''
    Initialize a connection to the local SI.

    Does nothing when the module-level ``_service_instance`` is already set.
    If connecting fails, the exception is logged and the function returns
    without setting anything up.
    '''
    global _service_instance

    if _service_instance:
        return

    try:
        logging.info("Connecting to the local Service Instance as 'dcui' ")

        # Connect to local server as user "dcui" since this is the Admin that does not lose its
        # Admin permissions even when the host is in lockdown mode. User "dcui" does not have a
        # password - it is used by local application DCUI (Direct Console User Interface)
        # Version must be set to access newer features, such as VSAN.
        _service_instance = pyVim.connect.Connect(
            host='localhost',
            user='dcui',
            version=newestVersions.Get('vim'))
    except Exception:
        logging.exception("Failed to create the local Service Instance as 'dcui', continuing... : ")
        return

    # set our ID in context to be used in request - so we'll see it in logs
    request_context = VmomiSupport.GetRequestContext()
    request_context["realUser"] = 'dvolplug'
    atexit.register(pyVim.connect.Disconnect, _service_instance)