Python gi.repository.GObject 模块,timeout_add_seconds() 实例源码

我们从 Python 开源项目中提取了以下 23 个代码示例,用于说明如何使用 gi.repository.GObject.timeout_add_seconds()。

项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def __init__(self, oneconfviewpickler):
        """Set up the OneConf controller of the installed pane.

        Creates the DbusConnect helper, subscribes to the signals of its
        hosts D-Bus object and schedules the first refreshes on the main
        loop.

        oneconfviewpickler -- stored unchanged on the instance
        """
        LOG.debug("OneConf Handler init")
        super(OneConfHandler, self).__init__()

        # OneConf stuff: map every D-Bus signal to the method handling it
        self.oneconf = DbusConnect()
        hosts_proxy = self.oneconf.hosts_dbus_object
        signal_handlers = (
            ('hostlist_changed', self.refresh_hosts),
            ('packagelist_changed', self._on_store_packagelist_changed),
            ('latestsync_changed', self.on_new_latest_oneconf_sync_timestamp),
        )
        for signal_name, handler in signal_handlers:
            hosts_proxy.connect_to_signal(signal_name, handler)

        self.already_registered_hostids = []
        self.is_current_registered = False
        self.oneconfviewpickler = oneconfviewpickler

        # refresh host list: once as soon as the loop is idle, and fetch the
        # latest sync information after MIN_TIME_WITHOUT_ACTIVITY seconds
        self._refreshing_hosts = False
        GObject.timeout_add_seconds(MIN_TIME_WITHOUT_ACTIVITY,
                                    self.get_latest_oneconf_sync)
        GObject.idle_add(self.refresh_hosts)
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def __init__(self, oneconfviewpickler):
        """Set up the OneConf controller of the installed pane.

        Creates the DbusConnect helper, subscribes to the signals of its
        hosts D-Bus object and schedules the first refreshes on the main
        loop.

        oneconfviewpickler -- stored unchanged on the instance
        """
        LOG.debug("OneConf Handler init")
        super(OneConfHandler, self).__init__()

        # OneConf stuff: map every D-Bus signal to the method handling it
        self.oneconf = DbusConnect()
        hosts_proxy = self.oneconf.hosts_dbus_object
        signal_handlers = (
            ('hostlist_changed', self.refresh_hosts),
            ('packagelist_changed', self._on_store_packagelist_changed),
            ('latestsync_changed', self.on_new_latest_oneconf_sync_timestamp),
        )
        for signal_name, handler in signal_handlers:
            hosts_proxy.connect_to_signal(signal_name, handler)

        self.already_registered_hostids = []
        self.is_current_registered = False
        self.oneconfviewpickler = oneconfviewpickler

        # refresh host list: once as soon as the loop is idle, and fetch the
        # latest sync information after MIN_TIME_WITHOUT_ACTIVITY seconds
        self._refreshing_hosts = False
        GObject.timeout_add_seconds(MIN_TIME_WITHOUT_ACTIVITY,
                                    self.get_latest_oneconf_sync)
        GObject.idle_add(self.refresh_hosts)
项目:handsup    作者:stuartlangridge    | 项目源码 | 文件源码
def main():
    """Create the app indicator with a Quit menu and run the GTK main loop.

    Rebinds the module globals ``indicator`` and ``menu``. ``check_caps``
    is scheduled through a one-second timeout and receives the pubnub
    object returned by ``set_up_pubnub()``.
    """
    global indicator, menu

    indicator = appindicator.Indicator.new(
        APPINDICATOR_ID,
        os.path.abspath('closed.svg'),
        appindicator.IndicatorCategory.SYSTEM_SERVICES)
    indicator.set_status(appindicator.IndicatorStatus.ACTIVE)

    pubnub = set_up_pubnub()

    # The only menu entry: "Quit", which hands the pubnub object to die()
    menu = gtk.Menu()
    quit_item = gtk.MenuItem('Quit')
    quit_item.connect('activate', die, pubnub)
    menu.append(quit_item)
    menu.show_all()

    indicator.set_menu(menu)
    indicator.set_icon(os.path.abspath("closed.svg"))

    notify.init(APPINDICATOR_ID)

    GObject.timeout_add_seconds(1, check_caps, pubnub)
    gtk.main()
项目:sbrick-controller    作者:wintersandroid    | 项目源码 | 文件源码
def set_sbrick(self, sbrick):
        """Attach ``sbrick`` (or detach with None) and update the widget.

        The widget is made sensitive only while an sbrick is attached.
        When one is attached, all values are refreshed at once and
        ``refresh_updating_values`` is scheduled with a 5 second timeout.
        """
        self.sbrick = sbrick
        has_sbrick = sbrick is not None
        self.set_sensitive(has_sbrick)
        if not has_sbrick:
            return
        self.refresh_all_values()
        GObject.timeout_add_seconds(5, self.refresh_updating_values)
项目:oacids    作者:openaps    | 项目源码 | 文件源码
def Delay (self, seconds, reply_handler, error_handler):
        """Reply asynchronously once ``seconds`` seconds have elapsed.

        seconds       -- delay passed to ``gobject.timeout_add_seconds``;
                         also the value handed to ``reply_handler``
        reply_handler -- called as ``reply_handler(seconds)`` from the
                         main loop when the timeout fires
        error_handler -- accepted but never invoked by this method
        """
        # Parenthesised single-argument print prints the same text on
        # Python 2 and is valid syntax on Python 3.
        print("Sleeping for %ds" % seconds)

        def _reply():
            reply_handler(seconds)
            # Returning False removes the timeout source, so the reply is
            # sent exactly once whatever reply_handler itself returns.
            return False

        gobject.timeout_add_seconds(seconds, _reply)
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def _ask_and_repair_broken_cache(self):
        """Ask the user whether to repair the broken cache and do so.

        While the main window is not visible yet, the method re-schedules
        itself with a one second timeout and returns. It returns None, so
        each scheduled timeout fires only once.
        """
        # wait until the main window is available
        if not self.window_main.props.visible:
            GObject.timeout_add_seconds(1, self._ask_and_repair_broken_cache)
            return
        if dialogs.confirm_repair_broken_cache(self.window_main,
                                               self.datadir):
            self.backend.fix_broken_depends()
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def initiate_purchase(self, app, iconname, url=None, html=None):
        """
        Start the purchase workflow for an item in the embedded webkit view.

        app, iconname -- stored on the instance
        url           -- if truthy, loaded into the view
        html          -- markup loaded when no url is given; DUMMY_HTML is
                         used when neither url nor html is given

        Returns False (after emitting "terms-of-service-declined") when the
        terms of service are not accepted, True otherwise.
        """
        tos_accepted = self._ask_for_tos_acceptance_if_needed()
        if not tos_accepted:
            self.emit("terms-of-service-declined")
            return False

        self.init_view()
        self.app = app
        self.iconname = iconname

        # Show the loading page and let GTK process pending events before
        # the real content is requested
        self.wk.webkit.load_html_string(self.LOADING_HTML, "file:///")
        self.wk.show()
        while Gtk.events_pending():
            Gtk.main_iteration()

        if url:
            self.wk.webkit.load_uri(url)
        else:
            self.wk.webkit.load_html_string(html or DUMMY_HTML, "file:///")
        self.pack_start(self.wk, True, True, 0)

        # only for debugging
        debug_buy = os.environ.get("SOFTWARE_CENTER_DEBUG_BUY")
        if debug_buy:
            GObject.timeout_add_seconds(1, _generate_events, self)
        return True
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def get_test_window_purchaseview():
    """Build a test window that hosts a PurchaseView and starts a purchase.

    The purchase URL is built from BUY_SOMETHING_HOST unless a URL is
    given as the first command line argument. Returns the Gtk.Window
    after it has been shown.
    """
    from softwarecenter.enums import BUY_SOMETHING_HOST
    url = BUY_SOMETHING_HOST + "/subscriptions/en/ubuntu/maverick/+new/?%s" % (
        urllib.urlencode({
                'archive_id': "mvo/private-test",
                'arch': "i386",
                }))
    # use cmdline if available
    if len(sys.argv) > 1:
        url = sys.argv[1]

    widget = PurchaseView()
    from mock import Mock
    widget.config = Mock()

    win = Gtk.Window()
    win.set_data("view", widget)
    win.add(widget)
    win.set_size_request(600, 500)
    win.set_position(Gtk.WindowPosition.CENTER)
    win.show_all()
    win.connect('destroy', Gtk.main_quit)

    # pass html=DUMMY_HTML instead of url=url to test with static markup
    widget.initiate_purchase(app=None, iconname=None, url=url)

    return win
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def queue_next(self):
        """Schedule ``next_exhibit`` after TIMEOUT_SECONDS seconds.

        ``cleanup_timeout()`` is called first. The new timeout id is stored
        in ``self._timeout`` and returned.
        """
        self.cleanup_timeout()
        source_id = GObject.timeout_add_seconds(self.TIMEOUT_SECONDS,
                                                self.next_exhibit)
        self._timeout = source_id
        return self._timeout
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def get_test_window():
    """Build a test window showing a OneConfViews pane with sample hosts.

    Registers a handful of computers, schedules one more registration
    after five seconds, removes one host again and prints every
    "computer-changed" selection. Returns the Gtk.Window after
    ``show_all()``.
    """
    w = OneConfViews(Gtk.IconTheme.get_default())
    w.show()

    win = Gtk.Window()
    win.set_data("pane", w)
    win.add(w)
    win.set_size_request(400, 600)
    win.connect("destroy", lambda x: Gtk.main_quit())

    # init the view
    w.register_computer("AAAAA", "NameA")
    w.register_computer("ZZZZZ", "NameZ")
    w.register_computer("DDDDD", "NameD")
    w.register_computer("CCCCC", "NameC")
    w.register_computer("", "This computer should be first")
    w.select_first()

    # one more host shows up while the window is already on screen
    GObject.timeout_add_seconds(5, w.register_computer, "EEEEE", "NameE")

    def print_selected_hostid(widget, hostid, hostname):
        # Parenthesised single-argument print gives the same output on
        # Python 2 and is valid syntax on Python 3.
        print("%s selected for %s" % (hostid, hostname))

    w.connect("computer-changed", print_selected_hostid)

    w.remove_computer("DDDDD")
    win.show_all()
    return win
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def _on_apt_finished_stamp_changed(self, monitor, afile, other_file,
        event):
        """File monitor callback: schedule ``self.open`` 10 seconds later.

        Only CHANGES_DONE_HINT events are acted upon. A timeout that is
        still recorded in ``self._timeout_id`` is removed first, so a new
        event restarts the 10 second wait.
        """
        if event == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
            pending_id = self._timeout_id
            if pending_id:
                GObject.source_remove(pending_id)
                self._timeout_id = None
            self._timeout_id = GObject.timeout_add_seconds(10, self.open)
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def clear_token_from_ubuntu_sso_sync(appname):
    """ ask the com.ubuntu.sso service over dbus to clear the credentials
        stored for the given appname, e.g. _("Ubuntu Software Center"),
        and block until it answers (or until 2s have passed)
    """
    from ubuntu_sso import (
        DBUS_BUS_NAME,
        DBUS_CREDENTIALS_IFACE,
        DBUS_CREDENTIALS_PATH,
        )
    # clean
    loop = GObject.MainLoop()
    session_bus = dbus.SessionBus()
    credentials_obj = session_bus.get_object(
        bus_name=DBUS_BUS_NAME,
        object_path=DBUS_CREDENTIALS_PATH,
        follow_name_owner_changes=True)
    proxy = dbus.Interface(object=credentials_obj,
                           dbus_interface=DBUS_CREDENTIALS_IFACE)
    # any of these answers ends the wait
    for signal_name in ("CredentialsCleared",
                        "CredentialsNotFound",
                        "CredentialsError"):
        proxy.connect_to_signal(signal_name, loop.quit)
    proxy.clear_credentials(appname, {})
    # ensure we don't hang forever here
    GObject.timeout_add_seconds(2, loop.quit)
    # run the mainloop until the credentials are clear
    loop.run()
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def __init__(self, db):
        """Store ``db``, look up distro and install backend, start timers.

        Subscribes to the backend's "channels-changed" signal and schedules
        ``_check_for_channel_updates_timer`` with a 60 second timeout.
        """
        self.db = db
        self.distro = get_distro()

        backend = get_install_backend()
        self.backend = backend
        backend.connect("channels-changed",
                        self._remove_no_longer_needed_extra_channels)

        # kick off a background check for changes that may have been made
        # in the channels list
        GObject.timeout_add_seconds(60, self._check_for_channel_updates_timer)

        # extra channels from e.g. external sources
        self.extra_channels = []
        self._logger = LOG

    # external API
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def _ask_and_repair_broken_cache(self):
        """Ask the user whether to repair the broken cache and do so.

        While the main window is not visible yet, the method re-schedules
        itself with a one second timeout and returns. It returns None, so
        each scheduled timeout fires only once.
        """
        # wait until the main window is available
        if not self.window_main.props.visible:
            GObject.timeout_add_seconds(1, self._ask_and_repair_broken_cache)
            return
        if dialogs.confirm_repair_broken_cache(self.window_main,
                                               self.datadir):
            self.backend.fix_broken_depends()
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def initiate_purchase(self, app, iconname, url=None, html=None):
        """
        Start the purchase workflow for an item in the embedded webkit view.

        app, iconname -- stored on the instance
        url           -- if truthy, loaded into the view
        html          -- markup loaded when no url is given; DUMMY_HTML is
                         used when neither url nor html is given

        Returns False (after emitting "terms-of-service-declined") when the
        terms of service are not accepted, True otherwise.
        """
        tos_accepted = self._ask_for_tos_acceptance_if_needed()
        if not tos_accepted:
            self.emit("terms-of-service-declined")
            return False

        self.init_view()
        self.app = app
        self.iconname = iconname

        # Show the loading page and let GTK process pending events before
        # the real content is requested
        self.wk.webkit.load_html_string(self.LOADING_HTML, "file:///")
        self.wk.show()
        while Gtk.events_pending():
            Gtk.main_iteration()

        if url:
            self.wk.webkit.load_uri(url)
        else:
            self.wk.webkit.load_html_string(html or DUMMY_HTML, "file:///")
        self.pack_start(self.wk, True, True, 0)

        # only for debugging
        debug_buy = os.environ.get("SOFTWARE_CENTER_DEBUG_BUY")
        if debug_buy:
            GObject.timeout_add_seconds(1, _generate_events, self)
        return True
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def get_test_window_purchaseview():
    """Build a test window that hosts a PurchaseView and starts a purchase.

    The purchase URL is built from BUY_SOMETHING_HOST unless a URL is
    given as the first command line argument. Returns the Gtk.Window
    after it has been shown.
    """
    from softwarecenter.enums import BUY_SOMETHING_HOST
    url = BUY_SOMETHING_HOST + "/subscriptions/en/ubuntu/maverick/+new/?%s" % (
        urllib.urlencode({
                'archive_id': "mvo/private-test",
                'arch': "i386",
                }))
    # use cmdline if available
    if len(sys.argv) > 1:
        url = sys.argv[1]

    widget = PurchaseView()
    from mock import Mock
    widget.config = Mock()

    win = Gtk.Window()
    win.set_data("view", widget)
    win.add(widget)
    win.set_size_request(600, 500)
    win.set_position(Gtk.WindowPosition.CENTER)
    win.show_all()
    win.connect('destroy', Gtk.main_quit)

    # pass html=DUMMY_HTML instead of url=url to test with static markup
    widget.initiate_purchase(app=None, iconname=None, url=url)

    return win
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def queue_next(self):
        """Schedule ``next_exhibit`` after TIMEOUT_SECONDS seconds.

        ``cleanup_timeout()`` is called first. The new timeout id is stored
        in ``self._timeout`` and returned.
        """
        self.cleanup_timeout()
        source_id = GObject.timeout_add_seconds(self.TIMEOUT_SECONDS,
                                                self.next_exhibit)
        self._timeout = source_id
        return self._timeout
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def get_test_spinner_window():
    """Build a test window whose SpinnerNotebook spins for one second.

    The spinner is shown at once and hidden again from a one second
    timeout. Returns the Gtk.Window after ``show_all()``.
    """
    content = Gtk.Label("foo")
    spinner_notebook = SpinnerNotebook(content, "random msg")

    window = Gtk.Window()
    window.add(spinner_notebook)
    window.set_size_request(600, 500)
    window.set_position(Gtk.WindowPosition.CENTER)
    window.show_all()
    window.connect('destroy', Gtk.main_quit)

    def _hide_spinner():
        # pass hide_spinner()'s result through to the timeout machinery
        return spinner_notebook.hide_spinner()

    spinner_notebook.show_spinner("Loading for 1s ...")
    GObject.timeout_add_seconds(1, _hide_spinner)
    return window
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def _on_apt_finished_stamp_changed(self, monitor, afile, other_file,
        event):
        """File monitor callback: schedule ``self.open`` 10 seconds later.

        Only CHANGES_DONE_HINT events are acted upon. A timeout that is
        still recorded in ``self._timeout_id`` is removed first, so a new
        event restarts the 10 second wait.
        """
        if event == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
            pending_id = self._timeout_id
            if pending_id:
                GObject.source_remove(pending_id)
                self._timeout_id = None
            self._timeout_id = GObject.timeout_add_seconds(10, self.open)
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def clear_token_from_ubuntu_sso_sync(appname):
    """ ask the com.ubuntu.sso service over dbus to clear the credentials
        stored for the given appname, e.g. _("Ubuntu Software Center"),
        and block until it answers (or until 2s have passed)
    """
    from ubuntu_sso import (
        DBUS_BUS_NAME,
        DBUS_CREDENTIALS_IFACE,
        DBUS_CREDENTIALS_PATH,
        )
    # clean
    loop = GObject.MainLoop()
    session_bus = dbus.SessionBus()
    credentials_obj = session_bus.get_object(
        bus_name=DBUS_BUS_NAME,
        object_path=DBUS_CREDENTIALS_PATH,
        follow_name_owner_changes=True)
    proxy = dbus.Interface(object=credentials_obj,
                           dbus_interface=DBUS_CREDENTIALS_IFACE)
    # any of these answers ends the wait
    for signal_name in ("CredentialsCleared",
                        "CredentialsNotFound",
                        "CredentialsError"):
        proxy.connect_to_signal(signal_name, loop.quit)
    proxy.clear_credentials(appname, {})
    # ensure we don't hang forever here
    GObject.timeout_add_seconds(2, loop.quit)
    # run the mainloop until the credentials are clear
    loop.run()
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def __init__(self, db):
        """Store ``db``, look up distro and install backend, start timers.

        Subscribes to the backend's "channels-changed" signal and schedules
        ``_check_for_channel_updates_timer`` with a 60 second timeout.
        """
        self.db = db
        self.distro = get_distro()

        backend = get_install_backend()
        self.backend = backend
        backend.connect("channels-changed",
                        self._remove_no_longer_needed_extra_channels)

        # kick off a background check for changes that may have been made
        # in the channels list
        GObject.timeout_add_seconds(60, self._check_for_channel_updates_timer)

        # extra channels from e.g. external sources
        self.extra_channels = []
        self._logger = LOG

    # external API
项目:pokemon-go-status    作者:sousatg    | 项目源码 | 文件源码
def change_app_icon(self):
        """Apply the icon from ``get_icon()`` to the indicator.

        Afterwards ``update_server_status_icon`` is scheduled with a
        60 second timeout.
        """
        indicator = self.indicator
        indicator.set_icon(self.get_icon())
        gobject.timeout_add_seconds(60, self.update_server_status_icon)
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def start_session(self, host, port, session, username, password, wait):
        """ Start a session using qtnx

        Writes the qtnx configuration file below ~/.qtnx, starts the
        progress countdown and launches the qtnx binary.

        host, port         -- server to connect to; written into the
                              configuration file
        session            -- session name; "/" is replaced by "_" wherever
                              it is used as part of a file or profile name
        username, password -- passed to qtnx as command line arguments
        wait               -- False: spawn qtnx in the background and watch
                              for its exit; anything else: block until
                              qtnx has finished
        """

        self.state = "connecting"
        config_dir = os.path.expanduser('~/.qtnx')
        if not os.path.exists(config_dir):
            os.mkdir(config_dir)

        # One name is shared by the configuration file, its WL_NAME entry
        # and the profile argument passed to qtnx.
        session_name = "%s-%s-%s" % (host, port, session.replace("/", "_"))

        # Generate qtnx's configuration file. The path is assembled after
        # "~" has been expanded, so a "%" in the home directory cannot be
        # mistaken for a format specifier.
        filename = "%s/%s.nxml" % (config_dir, session_name)
        config = self.NXML_TEMPLATE
        config = config.replace("WL_NAME", session_name)
        config = config.replace("WL_SERVER", host)
        config = config.replace("WL_PORT", str(port))
        config = config.replace("WL_COMMAND", "weblive-session %s" % session)
        # "with" closes the file even if the write fails
        with open(filename, "w+") as nxml:
            nxml.write(config)

        # Prepare qtnx call
        cmd = [self.BINARY_PATH,
               session_name,
               username,
               password]

        def qtnx_countdown():
            """ Send progress events every two seconds """

            if self.helper_progress == 10:
                self.state = "connected"
                self.emit("connected", False)
                # returning False stops the timeout
                return False
            else:
                self.emit("progress", self.helper_progress * 10)
                self.helper_progress += 1
                return True

        def qtnx_start_timer():
            """ As we don't have a way of knowing the connection
                status, we countdown from 20s
            """

            self.helper_progress = 0
            # emit the first progress event right away, the rest on a timer
            qtnx_countdown()
            GObject.timeout_add_seconds(2, qtnx_countdown)

        qtnx_start_timer()

        # Compared against False explicitly so that, as before, only
        # False/0 selects background mode (None still blocks).
        if wait == False:
            # Start in the background and attach a watch for when it exits
            (self.helper_pid, stdin, stdout, stderr) = GObject.spawn_async(
                cmd, standard_input=True, standard_output=True,
                standard_error=True, flags=GObject.SPAWN_DO_NOT_REAP_CHILD)
            GObject.child_watch_add(self.helper_pid, self._on_qtnx_exit,
                filename)
        else:
            # Start it and wait till it finishes
            p = subprocess.Popen(cmd)
            p.wait()