Python gi.repository.GLib 模块,PRIORITY_DEFAULT 实例源码

我们从 Python 开源项目中提取了以下 6 个代码示例,用于说明如何使用 gi.repository.GLib.PRIORITY_DEFAULT。

项目:gdebi    作者:linuxmint    | 项目源码 | 文件源码
def _run_lintian(self, filename):
        """Run lintian on ``filename`` and feed its output to the main loop.

        When ``/usr/bin/lintian`` does not exist, a hint is written to the
        lintian output text buffer and nothing is spawned.  Otherwise the
        tool is spawned asynchronously: its stdout and stderr are watched
        with ``self._on_lintian_output`` and its termination is reported to
        ``self._on_lintian_finished``.
        """
        output_buffer = self.textview_lintian_output.get_buffer()
        lintian_binary = "/usr/bin/lintian"
        if not os.path.exists(lintian_binary):
            hint = _("No lintian available.\n"
                     "Please install using sudo apt-get install lintian")
            output_buffer.set_text(hint)
            return

        output_buffer.set_text(_("Running lintian..."))

        # Reset the state collected by the output / finished callbacks.
        self._lintian_output = ""
        self._lintian_exit_status = None
        self._lintian_exit_status_gathered = None

        # DO_NOT_REAP_CHILD keeps the child around so that the child watch
        # registered below can be told about its exit.
        # The stdin slot is unused; it is named _stdin (not "_") so the
        # gettext alias used above is not shadowed.
        child_pid, _stdin, out_fd, err_fd = GLib.spawn_async(
            [lintian_binary, filename],
            flags=GObject.SPAWN_DO_NOT_REAP_CHILD,
            standard_output=True, standard_error=True)

        # Wake up on readable data as well as on error / hang-up.
        wanted = GLib.IOCondition.IN | GLib.IO_ERR | GLib.IO_HUP
        for pipe_fd in (out_fd, err_fd):
            io_channel = GLib.IOChannel(filedes=pipe_fd)
            io_channel.set_flags(GLib.IOFlags.NONBLOCK)
            GLib.io_add_watch(io_channel, GLib.PRIORITY_DEFAULT, wanted,
                              self._on_lintian_output)

        GLib.child_watch_add(GLib.PRIORITY_DEFAULT, child_pid,
                             self._on_lintian_finished)
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def connect(self, connectionInfo, elementListener, onConnected):
        """Connect via the parent transport, then watch the socket with GLib.

        The three arguments are forwarded unchanged to the parent class's
        ``connect``.  Afterwards the file descriptor of ``self._socket`` is
        wrapped in a GLib IO channel and ``self._socket_ready`` is registered
        for the ``GLib.IO_IN`` condition; the id returned by
        ``GLib.io_add_watch`` is stored in ``self._watch_id``.
        """
        super(GLibUnixTransport, self).connect(
            connectionInfo, elementListener, onConnected)

        socket_fd = self._socket.fileno()
        channel = GLib.IOChannel.unix_new(socket_fd)
        watch_id = GLib.io_add_watch(
            channel, GLib.PRIORITY_DEFAULT, GLib.IO_IN, self._socket_ready)
        self._watch_id = watch_id
项目:Tiltometer    作者:djcopley    | 项目源码 | 文件源码
def mainloop_do(callback, *args, **kwargs):
    """Schedule ``callback(*args, **kwargs)`` as a one-shot idle handler.

    The call is registered through ``Gdk.threads_add_idle`` at
    ``GLib.PRIORITY_DEFAULT``; the return value of ``callback`` is ignored.
    """

    def run_once(_user_data):
        # _user_data is the placeholder None passed at registration below.
        callback(*args, **kwargs)
        # Returning False keeps the idle handler from being called again.
        return False

    Gdk.threads_add_idle(GLib.PRIORITY_DEFAULT, run_once, None)
项目:cozy    作者:geigi    | 项目源码 | 文件源码
def search(self, user_search):
        """
        Run the book, author and reader searches for ``user_search``.

        Each result set is handed to its ``__on_*_search_finished`` handler
        through the default GLib main context.  After every query
        ``self.search_thread_stop`` is checked and the method returns early
        if it is set.  When all three result sets are empty the "nothing"
        page of ``self.search_stack`` is made visible.
        """
        # Handlers are invoked through the default main context rather than
        # being called directly from here.
        context = GLib.MainContext.default()

        def dispatch(handler, payload):
            context.invoke_full(GLib.PRIORITY_DEFAULT, handler, payload)

        found_books = db.search_books(user_search)
        if self.search_thread_stop.is_set():
            return
        dispatch(self.__on_book_search_finished, found_books)

        found_authors = db.search_authors(user_search)
        if self.search_thread_stop.is_set():
            return
        dispatch(self.__on_author_search_finished, found_authors)

        found_readers = db.search_readers(user_search)
        if self.search_thread_stop.is_set():
            return
        dispatch(self.__on_reader_search_finished, found_readers)

        # Same evaluation order as checking readers, authors, books for
        # "< 1" in turn: stops at the first non-empty result set.
        anything_found = (found_readers.count() >= 1
                          or found_authors.count() >= 1
                          or found_books.count() >= 1)
        if not anything_found:
            dispatch(self.search_stack.set_visible_child_name, "nothing")
项目:sleep-inhibit    作者:mssever    | 项目源码 | 文件源码
def on_pct_change(self, button):
        '''Callback for the battery percentage spinner.

        Schedules a one-shot GLib timeout (interval 5000) that reads the
        spinner value from ``button`` and, if it differs both from the last
        value remembered on this object and from the configured
        ``batt_percent``, stores it in the config, saves the settings and
        restarts the inhibitor on ``self.parent``.
        '''
        def change_timeout():
            config = get_config()
            new_value = button.get_value_as_int()
            old_value = self.old_pct_value
            if (new_value != old_value) and (config.batt_percent != new_value):
                # Remember the value under the same attribute that is read
                # above; writing only ``old_value`` left ``old_pct_value``
                # stale, so later comparisons used an outdated baseline.
                self.old_pct_value = new_value
                # NOTE(review): legacy attribute kept in sync in case other
                # code reads it -- confirm and drop if unused.
                self.old_value = new_value
                config.batt_percent = new_value
                config.save_settings()
                self.parent.restart_inhibitor()
            return False # Prevent timeout from repeating
        GLib.timeout_add(priority=GLib.PRIORITY_DEFAULT,
                         interval=5000,
                         function=change_timeout)
项目:x112v4l2    作者:romlok    | 项目源码 | 文件源码
def start_process(self):
        """
            Start the ffmpeg subprocess

            The command comes from ``self.get_process_command()``.  The
            child's stdout and stderr pipes are switched to non-blocking
            mode and watched through GLib; whatever is read from them is
            decoded and passed to ``self.append_process_stdout`` /
            ``self.append_process_stderr``.

            Raises RuntimeError if ``self.process`` is still running.
        """
        if self.process and self.process.poll() is None:
            raise RuntimeError('Refusing to start process when already running')

        cmd = self.get_process_command()
        self.process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdin=subprocess.DEVNULL,
        )
        # Make the pipes non-blocking
        for pipe in (self.process.stdout, self.process.stderr):
            flags = fcntl.fcntl(pipe, fcntl.F_GETFL)
            fcntl.fcntl(pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        # Clear any output from previous incarnations
        self.clear_process_stdout()
        self.clear_process_stderr()

        # Update the UI when there's output from the process
        def output_callback(fd, condition, pipe, func):
            """ Read from pipe, and pass to the given func """
            output = pipe.read()
            if not output:
                # Nothing was read: returning False removes this watch.
                return False
            # Chunks are raw bytes and may contain invalid UTF-8 or end in
            # the middle of a multi-byte character; substitute replacement
            # characters instead of raising inside the main-loop callback.
            func(output.decode('utf-8', errors='replace'))
            # Keep the watch unless hang-up was the only condition reported.
            return condition != GLib.IO_HUP

        watched = (
            (self.process.stdout, self.append_process_stdout),
            (self.process.stderr, self.append_process_stderr),
        )
        for pipe, handler in watched:
            GLib.io_add_watch(
                pipe,
                GLib.PRIORITY_DEFAULT,
                GLib.IO_IN | GLib.IO_HUP,
                output_callback,
                pipe,
                handler,
            )

        # Update the UI
        self.show_process_state()