Python gobject 模块,MainLoop() 实例源码

我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用gobject.MainLoop()

项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, device_id=None):
        """Default initialiser.

        1. Initialises the program loop using ``GObject``.
        2. Registers the Application on the D-Bus.
        3. Initialises the list of services offered by the application.

        :param device_id: Forwarded unchanged to ``adapter.Adapter``.
            Presumably identifies the local Bluetooth adapter to use, with
            ``None`` selecting a default -- TODO confirm against ``Adapter``.
        """
        # Initialise the loop that the application runs in
        GObject.threads_init()
        dbus.mainloop.glib.threads_init()
        # Make the GLib main loop the default for D-Bus connections created
        # afterwards (the SystemBus below is opened after this call).
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.mainloop = GObject.MainLoop()

        # Initialise the D-Bus path and register it
        self.bus = dbus.SystemBus()
        # id(self) is appended so each instance gets its own object path.
        self.path = '/ukBaz/bluezero/application{}'.format(id(self))
        self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
        dbus.service.Object.__init__(self, self.bus_name, self.path)

        # Initialise services within the application
        self.services = []

        # Adapter wrapper built from the caller-supplied device_id.
        self.dongle = adapter.Adapter(device_id)
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, adapter_addr, device_addr):
        """Default initialiser.

        Creates an object for the specified remote Bluetooth device, reached
        through the specified local adapter.

        :param adapter_addr: Address of the local Bluetooth adapter.
        :param device_addr: Address of the remote Bluetooth device.
        """
        self.bus = dbus.SystemBus()
        # NOTE(review): the GLib main loop is made the D-Bus default only
        # after the SystemBus above has been created -- confirm this order
        # is intended.
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.mainloop = GObject.MainLoop()

        # Resolve the D-Bus object path for this adapter/device pair.
        device_path = dbus_tools.get_dbus_path(adapter_addr, device_addr)

        self.remote_device_path = device_path
        # Proxy object for the remote device on the BlueZ service.
        self.remote_device_obj = self.bus.get_object(
            constants.BLUEZ_SERVICE_NAME,
            self.remote_device_path)
        # Interface for the device's own methods.
        self.remote_device_methods = dbus.Interface(
            self.remote_device_obj,
            constants.DEVICE_INTERFACE)
        # Interface for the standard D-Bus properties of the same object.
        self.remote_device_props = dbus.Interface(self.remote_device_obj,
                                                  dbus.PROPERTIES_IFACE)
项目:BMW-RPi-iBUS    作者:KLUSEK    | 项目源码 | 文件源码
def main():
    """Start the Bluetooth and IBUS services and block in the GObject main loop.

    The IBUS service is started on a daemon thread. The calling thread then
    runs a ``GObject.MainLoop`` until it is interrupted or fails, after which
    ``shutdown()`` is called and the process exits with status 0.
    """
    global bluetooth
    global ibus

    bluetooth = bt_.BluetoothService(onBluetoothConnected, onPlayerChanged)

    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)

    # Daemon thread: it must not keep the process alive once main() exits.
    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except (KeyboardInterrupt, SystemExit):
        # Normal ways to stop the program; fall through to the shutdown path.
        pass
    except Exception as exc:
        # Report the actual failure instead of hiding it behind a bare except.
        print("Unable to run the gobject main loop: %s" % (exc,))

    print("")
    shutdown()
    sys.exit(0)
项目:BMW-RPi-iBUS    作者:KLUSEK    | 项目源码 | 文件源码
def main():
    """Start the IBUS service and block in the GObject main loop.

    The IBUS service is started on a daemon thread. The calling thread then
    runs a ``GObject.MainLoop`` until it is interrupted or fails, after which
    ``shutdown()`` is called and the process exits with status 0.
    """
    global ibus

    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)

    # Daemon thread: it must not keep the process alive once main() exits.
    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except (KeyboardInterrupt, SystemExit):
        # Normal ways to stop the program; fall through to the shutdown path.
        pass
    except Exception as exc:
        # Report the actual failure instead of hiding it behind a bare except.
        print("Unable to run the gobject main loop: %s" % (exc,))

    print("")
    shutdown()
    sys.exit(0)
项目:senic-hub    作者:getsenic    | 项目源码 | 文件源码
def run(self):
        """
        Register the advertisement and services on D-Bus and run the main loop.

        Returns immediately when a main loop is already stored on this
        object. Otherwise blocks until the loop ends, then unregisters and
        clears the stored loop.
        """
        if self._main_loop:
            # A loop is already recorded for this object; nothing to start.
            return

        self._main_loop = GObject.MainLoop()
        self._disconnect_all()
        self._register()
        logger.info("--- Mainloop started ---")

        try:
            self._main_loop.run()
        except KeyboardInterrupt:
            # Ctrl-C is a legitimate way to stop; cleanup happens in finally.
            pass
        except Exception as err:
            logger.error(err)
        finally:
            logger.info("--- Mainloop finished ---")
            self._unregister()
            self._main_loop.quit()
            # Clearing the attribute allows a later run() call to start again.
            self._main_loop = None
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, useGtk=True):
        """Set up the reactor and choose which event-loop primitives it drives.

        Stores four callables (pending, iteration, crash, run) taken either
        from the glib main context/loop or from ``gtk``.

        :param useGtk: When true (the default) the ``gtk`` functions are
            always used. When false, the glib functions are used provided
            ``gobject`` reports a pygtk version of at least 2.3.91.
        """
        self.context = gobject.main_context_default()
        self.loop = gobject.MainLoop()
        posixbase.PosixReactorBase.__init__(self)
        # pre 2.3.91 the glib iteration and mainloop functions didn't release
        # global interpreter lock, thus breaking thread and signal support.
        if (hasattr(gobject, "pygtk_version") and gobject.pygtk_version >= (2, 3, 91)
            and not useGtk):
            # Plain glib path: drive the default context and our own loop.
            self.__pending = self.context.pending
            self.__iteration = self.context.iteration
            self.__crash = self.loop.quit
            self.__run = self.loop.run
        else:
            # gtk is imported lazily so it is only required on this path.
            import gtk
            self.__pending = gtk.events_pending
            self.__iteration = gtk.main_iteration
            self.__crash = _our_mainquit
            self.__run = gtk.main

    # The input_add function in pygtk1 checks for objects with a
    # 'fileno' method and, if present, uses the result of that method
    # as the input source. The pygtk2 input_add does not do this. The
    # function below replicates the pygtk1 functionality.

    # In addition, pygtk maps gtk.input_add to _gobject.io_add_watch, and
    # g_io_add_watch() takes different condition bitfields than
    # gtk_input_add(). We use g_io_add_watch() here in case pygtk fixes this
    # bug.
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def main():
    """Parse the adapter name, start advertising and the GATT server, then run.

    Accepts an optional ``-a`` / ``--adapter-name`` command-line argument
    (default: empty string) and passes it, with the system bus and the main
    loop, to both ``advertising_main`` and ``gatt_server_main``. Blocks in
    the main loop afterwards.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-a', '--adapter-name', type=str, help='Adapter name', default='')
    adapter_name = arg_parser.parse_args().adapter_name

    # Install the GLib main loop as the D-Bus default before opening the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    loop = GObject.MainLoop()

    advertising.advertising_main(loop, system_bus, adapter_name)
    gatt_server.gatt_server_main(loop, system_bus, adapter_name)
    loop.run()
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def __init__(self):
    """Create the main loop and look up the default BlueZ adapter over D-Bus.

    Stores the main loop in ``self.mainloop`` and two interfaces on the
    default adapter object: ``self.adapter`` (``org.bluez.Adapter``) and
    ``self.oob_adapter`` (``org.bluez.OutOfBand``).
    """
    # Install the GLib main loop as the D-Bus default before opening the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.mainloop = gobject.MainLoop()
    bus = dbus.SystemBus()
    # Ask the BlueZ manager at the root path for the default adapter's path.
    proxy = bus.get_object("org.bluez", "/")
    manager = dbus.Interface(proxy, "org.bluez.Manager")
    adapter_path = manager.DefaultAdapter()
    # Both interfaces below are views onto the same adapter proxy object.
    proxy = bus.get_object("org.bluez", adapter_path)
    self.adapter = dbus.Interface(proxy, "org.bluez.Adapter")
    self.oob_adapter = dbus.Interface(proxy, "org.bluez.OutOfBand")
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def __init__(self):
    """Create the main loop and look up the default BlueZ adapter over D-Bus.

    Stores the main loop in ``self.mainloop`` and two interfaces on the
    default adapter object: ``self.adapter`` (``org.bluez.Adapter``) and
    ``self.oob_adapter`` (``org.bluez.OutOfBand``).
    """
    # NOTE(review): no DBusGMainLoop(set_as_default=True) call here;
    # presumably the default D-Bus main loop is installed elsewhere -- confirm.
    self.mainloop = gobject.MainLoop()
    bus = dbus.SystemBus()
    # Ask the BlueZ manager at the root path for the default adapter's path.
    proxy = bus.get_object("org.bluez", "/")
    manager = dbus.Interface(proxy, "org.bluez.Manager")
    adapter_path = manager.DefaultAdapter()
    # Both interfaces below are views onto the same adapter proxy object.
    proxy = bus.get_object("org.bluez", adapter_path)
    self.adapter = dbus.Interface(proxy, "org.bluez.Adapter")
    self.oob_adapter = dbus.Interface(proxy, "org.bluez.OutOfBand")
项目:Steam-Supervisor-for-Linux    作者:prozacgod    | 项目源码 | 文件源码
def run(self):
    """Export this object on the session bus and block in the main loop.

    Claims the bus name ``com.prozacville.steam_monitor``, registers this
    object at ``/com/prozacville/steam_monitor``, and runs a
    ``gobject.MainLoop`` stored in ``self._loop``. Prints a message before
    the loop starts and after it returns.
    """
    # Install the GLib main loop as the D-Bus default before opening the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus_name = dbus.service.BusName("com.prozacville.steam_monitor", dbus.SessionBus())
    dbus.service.Object.__init__(self, bus_name, "/com/prozacville/steam_monitor")

    self._loop = gobject.MainLoop()
    # Parenthesised single-argument print behaves the same on Python 2 and 3.
    print("Service running...")
    self._loop.run()
    print("Service stopped")
项目:pywificontrol    作者:emlid    | 项目源码 | 文件源码
def __init__(self):
        """Set up the D-Bus connection, main loop and initial Wi-Fi state.

        Starts with an empty callback registry, the state set to
        ``self.OFF_STATE`` and no current SSID.
        """
        # Install the GLib main loop as the D-Bus default before opening the bus.
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus()
        self._mainloop = GObject.MainLoop()

        self.wifi_manager = WiFiControl()

        # Starts empty. Presumably maps a state/event to its handlers -- TODO
        # confirm against the code that fills it.
        self.callbacks = {}

        self.current_state = self.OFF_STATE
        self.current_ssid = None

        self.reconnect_worker = DaemonTreeObj(WORKER_NAME)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def __init__(self, useGtk=True):
        """Set up the reactor and choose which event-loop primitives it drives.

        Stores four callables (pending, iteration, crash, run) taken either
        from the glib main context/loop or from ``gtk``.

        :param useGtk: When true (the default) the ``gtk`` functions are
            always used. When false, the glib functions are used provided
            ``gobject`` reports a pygtk version of at least 2.3.91.
        """
        self.context = gobject.main_context_default()
        self.loop = gobject.MainLoop()
        posixbase.PosixReactorBase.__init__(self)
        # pre 2.3.91 the glib iteration and mainloop functions didn't release
        # global interpreter lock, thus breaking thread and signal support.
        if (hasattr(gobject, "pygtk_version") and gobject.pygtk_version >= (2, 3, 91)
            and not useGtk):
            # Plain glib path: drive the default context and our own loop.
            self.__pending = self.context.pending
            self.__iteration = self.context.iteration
            self.__crash = self.loop.quit
            self.__run = self.loop.run
        else:
            # gtk is imported lazily so it is only required on this path.
            import gtk
            self.__pending = gtk.events_pending
            self.__iteration = gtk.main_iteration
            self.__crash = _our_mainquit
            self.__run = gtk.main

    # The input_add function in pygtk1 checks for objects with a
    # 'fileno' method and, if present, uses the result of that method
    # as the input source. The pygtk2 input_add does not do this. The
    # function below replicates the pygtk1 functionality.

    # In addition, pygtk maps gtk.input_add to _gobject.io_add_watch, and
    # g_io_add_watch() takes different condition bitfields than
    # gtk_input_add(). We use g_io_add_watch() here in case pygtk fixes this
    # bug.
项目:bluetool    作者:emlid    | 项目源码 | 文件源码
def __init__(
            self, client_class, timeout=180, capability="KeyboardDisplay",
            path="/org/bluez/my_bluetooth_agent"):
        """Store the agent configuration and prepare the D-Bus main loop.

        :param client_class: Stored as ``self.client_class``; presumably the
            class used to handle agent requests -- TODO confirm with callers.
        :param timeout: Stored as ``self.timeout`` (default 180; presumably
            seconds -- TODO confirm).
        :param capability: Stored as ``self.capability``; presumably the
            BlueZ agent I/O capability string -- TODO confirm.
        :param path: Stored as ``self.path``; presumably the D-Bus object
            path for the agent -- TODO confirm.
        """
        # Install the GLib main loop as the D-Bus default before opening the bus.
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.client_class = client_class
        self.timeout = timeout
        self.capability = capability
        self.path = path
        self._bus = dbus.SystemBus()
        self._mainloop = GObject.MainLoop()
        # Start with discoverability switched off.
        _bluetooth.make_discoverable(False)
项目:bluetool    作者:emlid    | 项目源码 | 文件源码
def __init__(self, tcp_port_in=8043, tcp_port_out=None, channel=1):
        """Create the serial-port helper and export this object on the system bus.

        :param tcp_port_in: Stored as ``self.tcp_port_in`` (default 8043).
        :param tcp_port_out: Stored as ``self.tcp_port_out`` (default None).
        :param channel: Passed to ``SerialPort``; presumably the RFCOMM
            channel number -- TODO confirm.
        """
        self._spp = SerialPort(channel)
        # Install the GLib main loop as the D-Bus default before opening the bus.
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        # This object is exported at the serial-port helper's profile path.
        dbus.service.Object.__init__(
            self, dbus.SystemBus(), self._spp.profile_path)
        self.tcp_port_in = tcp_port_in
        self.tcp_port_out = tcp_port_out
        self._mainloop = GObject.MainLoop()
项目:Bluetooth-Low-Energy-LED-Matrix    作者:WIStudent    | 项目源码 | 文件源码
def main():
    """Register the LED GATT application and advertisement, then run the loop.

    Sets the module-level ``mainloop`` and ``display`` globals. Both
    registrations are issued with reply/error handlers before the loop is
    started. On Ctrl-C the display is cleared and written out before
    returning.
    """
    global mainloop, display

    # Install the GLib main loop as the D-Bus default before opening the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    # Manager interfaces used for the two registrations below.
    gatt_manager = get_service_manager(system_bus)
    advertising_manager = get_ad_manager(system_bus)

    # GATT application backed by the display.
    display = setup_display()
    led_app = LedApplication(system_bus, display)

    # Advertisement with index 0.
    led_advertisement = LedAdvertisement(system_bus, 0)

    mainloop = GObject.MainLoop()

    gatt_manager.RegisterApplication(
        led_app.get_path(), {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb)

    advertising_manager.RegisterAdvertisement(
        led_advertisement.get_path(), {},
        reply_handler=register_ad_cb,
        error_handler=register_ad_error_cb)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        # Leave the display blank when the user interrupts the program.
        display.clear()
        display.write_display()
项目:CodeLabs    作者:TheIoTLearningInitiative    | 项目源码 | 文件源码
def bluetoothConnection():
    """Register a Bluetooth profile with BlueZ and block in the main loop.

    Creates a ``Profile`` at ``/foo/bar/profile``, registers it with the
    ``org.bluez.ProfileManager1`` interface under UUID ``1101`` with
    ``AutoConnect`` disabled, then runs a ``GObject.MainLoop``.
    """
    # Install the GLib main loop as the D-Bus default before opening the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    bluez_obj = system_bus.get_object(BUS_NAME, "/org/bluez")
    manager = dbus.Interface(bluez_obj, "org.bluez.ProfileManager1")

    path = "/foo/bar/profile"
    options = {"AutoConnect": False}
    # NOTE(review): "1101" is presumably the Serial Port Profile short UUID.
    uuid = "1101"

    # Bound to a local so the object stays referenced while the loop runs.
    profile = Profile(system_bus, path)
    manager.RegisterProfile(path, uuid, options)

    loop = GObject.MainLoop()
    loop.run()
项目:CodeLabs    作者:TheIoTLearningInitiative    | 项目源码 | 文件源码
def bluetoothConnection():
    """Register a Bluetooth profile with BlueZ and block in the main loop.

    Creates a ``Profile`` at ``/foo/bar/profile``, registers it with the
    ``org.bluez.ProfileManager1`` interface under UUID ``1101`` with
    ``AutoConnect`` disabled, then runs a ``GObject.MainLoop``. When the
    loop is stopped by KeyboardInterrupt or SystemExit, ``closeThreads()``
    is called.
    """
    # Install the GLib main loop as the D-Bus default before opening the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    bluez_obj = system_bus.get_object(BUS_NAME, "/org/bluez")
    manager = dbus.Interface(bluez_obj, "org.bluez.ProfileManager1")

    path = "/foo/bar/profile"
    options = {"AutoConnect": False}
    # NOTE(review): "1101" is presumably the Serial Port Profile short UUID.
    uuid = "1101"

    # Bound to a local so the object stays referenced while the loop runs.
    profile = Profile(system_bus, path)
    manager.RegisterProfile(path, uuid, options)

    loop = GObject.MainLoop()
    try:
        loop.run()
    except (KeyboardInterrupt, SystemExit):
        closeThreads()
项目:CodeLabs    作者:TheIoTLearningInitiative    | 项目源码 | 文件源码
def bluetoothConnection():
    """Register a Bluetooth profile with BlueZ and block in the main loop.

    Creates a ``Profile`` at ``/foo/bar/profile``, registers it with the
    ``org.bluez.ProfileManager1`` interface under UUID ``1101`` with
    ``AutoConnect`` disabled, then runs a ``GObject.MainLoop``.
    """
    # Install the GLib main loop as the D-Bus default before opening the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    bluez_obj = system_bus.get_object(BUS_NAME, "/org/bluez")
    manager = dbus.Interface(bluez_obj, "org.bluez.ProfileManager1")

    path = "/foo/bar/profile"
    options = {"AutoConnect": False}
    # NOTE(review): "1101" is presumably the Serial Port Profile short UUID.
    uuid = "1101"

    # Bound to a local so the object stays referenced while the loop runs.
    profile = Profile(system_bus, path)
    manager.RegisterProfile(path, uuid, options)

    loop = GObject.MainLoop()
    loop.run()
项目:senic-hub    作者:getsenic    | 项目源码 | 文件源码
def _listen_for_wifi_state_changes(self):
        """Track NetworkManager state and update ``self._wifi_status``.

        Evaluates the current NetworkManager state once, then subscribes to
        further state changes. This method does not start a main loop
        itself; see the note at the end of the body.
        """
        # Install the GLib main loop as the D-Bus default.
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

        def print_status(status, nm_state):
            # Log the new status; the SSID is included only while
            # connecting or connected.
            if status in (WifiConnectionState.CONNECTING, WifiConnectionState.CONNECTED):
                logger.info("Wifi status changed: %s (%d) to %s" % (status, nm_state, self._current_ssid))
            else:
                logger.info("Wifi status changed: %s (%d)" % (status, nm_state))

        def on_state_changed(nm_instance, nm_state, **kwargs):
            # Map the numeric NetworkManager state onto a WifiConnectionState.
            # nm_instance and kwargs are accepted but not used.

            if nm_state >= NetworkManager.NM_STATE_CONNECTED_GLOBAL:
                new_status = WifiConnectionState.CONNECTED
            elif nm_state > NetworkManager.NM_STATE_DISCONNECTING:
                new_status = WifiConnectionState.CONNECTING
            else:
                new_status = WifiConnectionState.DISCONNECTED

            # The SSID is refreshed on every event, even if the status is unchanged.
            self._update_current_ssid()

            # Only log and notify when the status actually changes.
            if new_status == self._wifi_status:
                return

            print_status(new_status, nm_state)
            self._wifi_status = new_status
            self._on_wifi_status_changed()

        # check initial status:
        initial_state = NetworkManager.NetworkManager.State
        on_state_changed(None, initial_state)

        # listen for changes:
        NetworkManager.NetworkManager.OnStateChanged(on_state_changed)
        logger.debug("Start listening to network status changes")
        # Attention: a GObject.MainLoop() is required for this to work
        # in this case it is started by the BLE Peripheral object
项目:senic-hub    作者:getsenic    | 项目源码 | 文件源码
def run(self):
        """Report the current NetworkManager state, subscribe to changes, and block.

        Calls ``self._on_state_changed`` once with the current state,
        registers it for later state changes, then runs a
        ``GObject.MainLoop``.
        """
        # Install the GLib main loop as the D-Bus default.
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

        # check initial status:
        state = NetworkManager.NetworkManager.State
        # None is passed where the change callback receives its first argument.
        self._on_state_changed(None, state)

        # listen for changes:
        NetworkManager.NetworkManager.OnStateChanged(self._on_state_changed)
        logger.debug("Start listening to network status changes")
        # Blocks here; the loop is local, so nothing in this method stops it.
        loop = GObject.MainLoop()
        loop.run()
项目:rebus    作者:airbus-seclab    | 项目源码 | 文件源码
def run(cls, store, master_options):
        """Run the bus master on the D-Bus session bus until interrupted.

        Creates the service object at ``/bus`` under the bus name
        ``com.airbus.rebus.bus`` and runs a ``gobject.MainLoop``. On the
        first KeyboardInterrupt/SystemExit, if clients are still registered,
        the scheduler is shut down, agents are asked to exit, state is
        stored, and the loop is re-entered to wait for them. A second
        interrupt stops waiting. Storage state is stored again before
        returning.

        :param store: Storage object; ``store.STORES_INTSTATE`` and
            ``store.store_state()`` are used here, and it is passed to the
            service constructor.
        :param master_options: Not used in this method.
        """
        gobject.threads_init()
        dbus.glib.init_threads()
        DBusGMainLoop(set_as_default=True)

        bus = dbus.SessionBus()
        # NOTE(review): `name` is never read; presumably it must stay
        # referenced so the bus name remains owned -- confirm before removing.
        name = dbus.service.BusName("com.airbus.rebus.bus", bus)
        svc = cls(bus, "/bus", store)

        svc.mainloop = gobject.MainLoop()
        log.info("Entering main loop.")
        try:
            svc.mainloop.run()
        except (KeyboardInterrupt, SystemExit):
            if len(svc.clients) > 0:
                log.info("Trying to stop all agents properly. Press Ctrl-C "
                         "again to stop.")
                # stop scheduler
                svc.sched.shutdown()
                # ask slave agents to shutdown nicely & save internal state
                # next(iter(...)) picks one example key on both Python 2
                # lists and Python 3 dict views; keys()[0] only works on 2.
                log.info("Expecting %u more agents to exit (ex. %s)",
                         len(svc.clients), next(iter(svc.clients.keys())))
                svc.bus_exit(store.STORES_INTSTATE)
                store.store_state()
                try:
                    # Re-enter the loop to wait for agents to exit.
                    svc.mainloop.run()
                except (KeyboardInterrupt, SystemExit):
                    if len(svc.clients) > 0:
                        log.info(
                            "Not all agents have stopped, exiting nonetheless")
        log.info("Stopping storage...")
        store.store_state()
项目:rebus    作者:airbus-seclab    | 项目源码 | 文件源码
def run_agents(self):
        """Run the agent, then serve the main loop unless the agent overrides run().

        After ``run_and_catch_exc()``, an agent whose class overrides
        ``Agent.run`` is unregistered and the method returns. Otherwise a
        ``gobject.MainLoop`` runs until interrupted, after which signal
        receivers are removed, the agent is unregistered and its internal
        state is saved.
        """
        self.agent.run_and_catch_exc()
        if self.agent.__class__.run != Agent.run:
            # the run() method has been overridden - agent will run on his own
            # then quit
            self.iface.unregister(self.agent_id)
            return
        log.info("Entering agent loop")
        self.loop = gobject.MainLoop()
        try:
            self.loop.run()
        except (KeyboardInterrupt, SystemExit):
            # Release every lock the agent still holds before stopping.
            for args in self.agent.held_locks:
                self.agent.unlock(*args)
            self.loop.quit()
        # Clean up signals - useful for tests, where one process runs several
        # agents successively
        self.bus.remove_signal_receiver(self.broadcast_wrapper,
                                        dbus_interface="com.airbus.rebus.bus",
                                        signal_name="new_descriptor")
        self.bus.remove_signal_receiver(self.targeted_wrapper,
                                        dbus_interface="com.airbus.rebus.bus",
                                        signal_name="targeted_descriptor")
        self.bus.remove_signal_receiver(self.bus_exit_handler,
                                        dbus_interface="com.airbus.rebus.bus",
                                        signal_name="bus_exit")
        self.iface.unregister(self.agent_id)
        self.agent.save_internal_state()

    # DBus specific functions
项目:phony    作者:littlecraft    | 项目源码 | 文件源码
def main_loop(self):
    """Create and return a new ``gobject.MainLoop`` (a fresh one per call)."""
    loop = gobject.MainLoop()
    return loop
项目:phony    作者:littlecraft    | 项目源码 | 文件源码
def run(self):
    """
    Starts phony service which manages device pairing and setting
    up of hands-free profile services.  This function never returns.
    """
    bus = phony.base.ipc.BusProvider()

    # Find the first audio card that provides
    # audio input and output mixers.
    # NOTE(review): -1 presumably asks Alsa to auto-select a card -- confirm.
    audio_card_index = -1

    # All four components are context managers, so they are torn down in
    # reverse order if the block is ever left.
    with phony.bluetooth.adapters.Bluez5(bus) as adapter, \
         phony.bluetooth.profiles.handsfree.Ofono(bus) as hfp, \
         phony.audio.alsa.Alsa(card_index=audio_card_index) as audio, \
         phony.headset.HandsFreeHeadset(bus, adapter, hfp, audio) as hs:

      # Register to receive some bluetooth events
      hs.on_device_connected(self.device_connected)
      hs.on_incoming_call(self.incoming_call)
      hs.on_call_began(self.call_began)
      hs.on_call_ended(self.call_ended)

      hs.start('MyBluetoothHeadset', pincode='1234')
      hs.enable_pairability(timeout=30)

      # Stored on the instance for use by other methods.
      self._hs = hs

      # Wait forever
      gobject.MainLoop().run()

  #
  # Call these from your event handlers
  #
项目:Alexa_MMDAgent    作者:jianmliu    | 项目源码 | 文件源码
def __init__(self, service):
        """Initialise the base class and remember the service.

        :param service: Stored on the instance for later use; its type is
            not visible here.
        """
        super(MainLoop, self).__init__()
        self.__service = service
项目:Alexa_MMDAgent    作者:jianmliu    | 项目源码 | 文件源码
def run(self):
        """Watch file descriptor 0 for input/hang-up, then run the base loop."""
        # fd 0 is standard input; self.__stdin_cb is called when it becomes
        # readable or is hung up.
        gobject.io_add_watch(0, gobject.IO_IN | gobject.IO_HUP, self.__stdin_cb)
        super(MainLoop, self).run()
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def __init__(self):

            self.main_loop = gobject.MainLoop()            

            # Create a window with a horizontal scale.
            self.wnd = gtk.Window()
            self.wnd.set_default_size(640, 480)
            self.wnd.set_title('Have fun with the transparency slider')

            hscale = gtk.HScale()
            hscale.set_digits(0)
            hscale.set_increments(1, 10)
            hscale.set_range(0, 100)
            hscale.set_value(100)

            hscale.connect('value_changed', self.set_window_alpha)

            self.wnd.add(hscale)

            # Note: gtk window must be realized before installing extensions.
            self.wnd.realize()
            self.wnd.show_all()

            self.win32ext = GTKWin32Ext(self.wnd)

            self.win32ext.add_notify_icon()

            # GTK menus from the notify icon!
            menu = gtk.Menu()

            menu_item = gtk.MenuItem('Baloons!')
            menu_item.connect_object('activate', self.menu_cb, self.wnd)
            menu.append(menu_item)

            menu_item = gtk.MenuItem('Fadeout Window')
            menu_item.connect('activate', self.fadeoutwindow)
            menu.append(menu_item)

            menu_item = gtk.MenuItem('Window Disappeared?')
            menu_item.connect('activate', self.fadeinwindow)
            menu.append(menu_item)

            menu.show_all()
            self.win32ext.notify_icon.menu = menu

            # Set up the callback messages
            self.win32ext.message_map({
                WM_TRAYMESSAGE: self.on_notifyicon_activity
                })