Python gi.repository.Gio 模块,bus_get_sync() 实例源码

我们从Python开源项目中,提取了以下5个代码示例,用于说明如何使用gi.repository.Gio.bus_get_sync()

项目:rauc-hawkbit    作者:rauc    | 项目源码 | 文件源码
def __init__(self):
        self.logger = logging.getLogger('rauc_hawkbit')
        self.dbus_events = asyncio.Queue()
        loop = asyncio.get_event_loop()
        # handle dbus events in async way
        self.dbus_event_task = loop.create_task(self.handle_dbus_event())
        # holds active subscriptions
        self.signal_subscriptions = []
        # ({interface}, {signal}): {callback}
        self.signal_callbacks = {}
        # ({interface}, {property}): {callback}
        self.property_callbacks = {}

        self.system_bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)

        # always subscribe to property changes by default
        self.new_signal_subscription('org.freedesktop.DBus.Properties',
                                     'PropertiesChanged',
                                     self.property_changed_callback)
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def __init__(self, name, dbus_name, skeleton):
        self._cb_registery = dict()
        self._obj_registery = dict()
        self._dbus_name = build_dbus_name(dbus_name, name)
        self._interface_skeleton_object = skeleton

        self.con = Gio.bus_get_sync(BUS_TYPE, None)

        dbus_path = BASE_DBUS_PATH # build_dbus_path(name)

        logger.debug('Producer ObjectManagerServer dbus: %s, %s', dbus_path, self._dbus_name)
        self._object_manager = Gio.DBusObjectManagerServer(object_path=dbus_path)
        self._object_manager.set_connection(self.con)

        Gio.bus_own_name_on_connection(
            self.con, self._dbus_name, Gio.BusNameOwnerFlags.NONE, None, None)
项目:cozy    作者:geigi    | 项目源码 | 文件源码
def __init__(self, app, ui):
        self.__app = app
        self.__ui = ui
        self.__rating = None
        self.__cozy_id = 0
        self.__metadata = {"mpris:trackid": GLib.Variant(
            "o",
            "/org/mpris/MediaPlayer2/TrackList/NoTrack")}
        self.__track_id = self.__get_media_id(0)
        self.__bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
        Gio.bus_own_name_on_connection(self.__bus,
                                       self.__MPRIS_COZY,
                                       Gio.BusNameOwnerFlags.NONE,
                                       None,
                                       None)
        Server.__init__(self, self.__bus, self.__MPRIS_PATH)

        bus = get_gst_bus()
        bus.connect("message", self.__on_gst_message)

        #Lp().player.connect("current-changed", self.__on_current_changed)
        #Lp().player.connect("seeked", self.__on_seeked)
        #Lp().player.connect("status-changed", self.__on_status_changed)
        #Lp().player.connect("volume-changed", self.__on_volume_changed)
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def __init__(self):
        self.con = Gio.bus_get_sync(Gio.BusType.SESSION, None)

        Gio.bus_own_name_on_connection(
            self.con, 'com.endlessm.EknSubscriptionsDownloader', Gio.BusNameOwnerFlags.NONE, None, None)

        self.con.register_object(
            object_path='/com/endlessm/EknSubscriptionsDownloader',
                                 interface_info=IFACE_INFO, method_call_closure=self._on_method_call)

        self._consumer = Consumer(name=SUBSCRIPTIONS_INSTALLED)
        self._consumer.connect('data', self._on_data)

        # FIXME: Port to GNotification instead
        self._notification = Notify.Notification.new("", "")
项目:display-config    作者:xytovl    | 项目源码 | 文件源码
def __init__(self):
        self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
        self.proxy = Gio.DBusProxy.new_sync(
            self.bus, Gio.DBusProxyFlags.NONE, None,
            'org.gnome.Mutter.DisplayConfig',
            '/org/gnome/Mutter/DisplayConfig',
            'org.gnome.Mutter.DisplayConfig', None)
        self.get_resources()