Python dbus 模块,UInt32() 实例源码

我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用dbus.UInt32()

项目:my-weather-indicator    作者:atareao    | 项目源码 | 文件源码
def unwrap(self, val):
        """Recursively convert a D-Bus value into a plain Python value.

        Byte arrays become strings, arrays/lists/tuples become lists,
        dictionaries become dicts, NetworkManager object paths are turned
        into instances of the class looked up in this module's globals,
        and D-Bus scalars become their builtin equivalents. Anything
        unrecognised is returned unchanged.
        """
        if isinstance(val, dbus.ByteArray):
            return "".join(str(item) for item in val)
        if isinstance(val, (dbus.Array, list, tuple)):
            return [self.unwrap(item) for item in val]
        if isinstance(val, (dbus.Dictionary, dict)):
            return dict(
                (self.unwrap(key), self.unwrap(value))
                for key, value in val.items()
            )
        if isinstance(val, dbus.ObjectPath):
            if val.startswith('/org/freedesktop/NetworkManager/'):
                # The fifth path component names the object family; two of
                # them map onto differently named classes.
                aliases = {
                    'Settings': 'Connection',
                    'Devices': 'Device',
                }
                segment = val.split('/')[4]
                wrapper_cls = globals()[aliases.get(segment, segment)]
                return wrapper_cls(val)
            # Other object paths fall through and are returned as-is below.
        if isinstance(val, (dbus.Signature, dbus.String)):
            return unicode(val)
        if isinstance(val, dbus.Boolean):
            return bool(val)
        integer_types = (dbus.Int16, dbus.UInt16, dbus.Int32,
                         dbus.UInt32, dbus.Int64, dbus.UInt64)
        if isinstance(val, integer_types):
            return int(val)
        if isinstance(val, dbus.Byte):
            return bytes([int(val)])
        return val
项目:ubuntu-cleaner    作者:gerardpuig    | 项目源码 | 文件源码
def _check_permission(self, sender, action):
        '''
        Verify that ``action`` is permitted for ``sender`` and raise
        AccessDeniedException if it is not.

        The caller should use ObtainAuthorization() to get permission.

        :param sender: D-Bus bus name of the caller; when falsy no check
            is performed and the method simply returns.
        :param action: PolicyKit action identifier to check.
        :raises AccessDeniedException: if PolicyKit does not grant the
            action, or if the D-Bus call to PolicyKit fails.
        '''
        if not sender:
            return

        try:
            kit = dbus.SystemBus().get_object(
                'org.freedesktop.PolicyKit1',
                '/org/freedesktop/PolicyKit1/Authority')
            kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')

            (granted, _, details) = kit.CheckAuthorization(
                ('system-bus-name', {'name': sender}),
                action, {}, dbus.UInt32(1), '', timeout=600)

            if not granted:
                raise AccessDeniedException(
                    'Session not authorized by PolicyKit')

        except AccessDeniedException:
            # Re-raise untouched so the handler below never re-wraps it.
            raise

        except dbus.DBusException as ex:
            # "except X, ex" and ex.message are Python-2-only; this form is
            # valid on both Python 2.6+ and Python 3.
            raise AccessDeniedException(ex.get_dbus_message())
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def add_service_type(self, type):
        """Start an Avahi service browser for ``type`` unless one exists.

        The browser is created on this object's interface/protocol/domain,
        its ItemNew and ItemRemove signals are routed to ``service_add``
        and ``service_remove``, and it is stored in ``service_browsers``
        under the key ``(interface, protocol, type, domain)``.
        """
        interface = self.interface
        protocol = self.protocol
        domain = self.domain

        # Nothing to do when this type is already being browsed.
        if self.already_browsing(type):
            return

        ifname = self.siocgifname(interface)
        logger.info("Browsing for services of type '%s' in domain '%s' on %s.%i ..." %
                    (type, domain, ifname, protocol))

        browser_path = self.server.ServiceBrowserNew(
            interface, protocol, type, domain, dbus.UInt32(0))
        browser_proxy = self.system_bus.get_object(avahi.DBUS_NAME, browser_path)
        browser = dbus.Interface(
            browser_proxy, avahi.DBUS_INTERFACE_SERVICE_BROWSER)

        browser.connect_to_signal('ItemNew', self.service_add)
        browser.connect_to_signal('ItemRemove', self.service_remove)

        browser_key = (interface, protocol, type, domain)
        self.service_browsers[browser_key] = browser
项目:bluetool    作者:emlid    | 项目源码 | 文件源码
def RequestPasskey(self, device):
        """Return the passkey for ``device`` as a ``dbus.UInt32``.

        The device must first be trusted via ``self._trust``; the passkey
        is then obtained from ``self._client.request_passkey``. A failed
        trust step or any error while obtaining/parsing the passkey is
        reported and turned into ``_Rejected``.
        """
        print_info("RequestPasskey: {}\n".format(device))

        if self._trust(device):
            dev_info = self._get_device_info(device)

            try:
                raw_passkey = self._client.request_passkey(dev_info)
                passkey = int(raw_passkey)
            except BaseException as error:
                print_error("RequestPasskey: {}\n".format(error))
                raise _Rejected

            return dbus.UInt32(passkey)

        print_error("RequestPasskey: failed to trust\n")
        raise _Rejected
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def publish(self):
        """Announce this service through the Avahi daemon on the system bus.

        Creates a new Avahi entry group, adds one service built from
        ``self.name``, ``self.stype``, ``self.domain``, ``self.host``,
        ``self.port`` and ``self.text``, commits it, and keeps the group
        in ``self.group``.
        """
        system_bus = dbus.SystemBus()

        server_proxy = system_bus.get_object(
            avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER)
        server = dbus.Interface(server_proxy, avahi.DBUS_INTERFACE_SERVER)

        # The server hands back the object path of a fresh entry group.
        group_path = server.EntryGroupNew()
        group_proxy = system_bus.get_object(avahi.DBUS_NAME, group_path)
        entry_group = dbus.Interface(
            group_proxy, avahi.DBUS_INTERFACE_ENTRY_GROUP)

        entry_group.AddService(
            avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
            self.name, self.stype, self.domain, self.host,
            dbus.UInt16(self.port), self.text)
        entry_group.Commit()

        self.group = entry_group
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def publish(self):
        """Announce this service through the Avahi daemon on the system bus.

        Creates a new Avahi entry group, adds one service built from
        ``self.name``, ``self.stype``, ``self.domain``, ``self.host``,
        ``self.port`` and ``self.text``, commits it, and keeps the group
        in ``self.group``.
        """
        system_bus = dbus.SystemBus()

        server_proxy = system_bus.get_object(
            avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER)
        server = dbus.Interface(server_proxy, avahi.DBUS_INTERFACE_SERVER)

        # The server hands back the object path of a fresh entry group.
        group_path = server.EntryGroupNew()
        group_proxy = system_bus.get_object(avahi.DBUS_NAME, group_path)
        entry_group = dbus.Interface(
            group_proxy, avahi.DBUS_INTERFACE_ENTRY_GROUP)

        entry_group.AddService(
            avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
            self.name, self.stype, self.domain, self.host,
            dbus.UInt16(self.port), self.text)
        entry_group.Commit()

        self.group = entry_group
项目:my-weather-indicator    作者:atareao    | 项目源码 | 文件源码
def convert(dbus_obj):
    """Recursively convert a D-Bus typed value into a builtin Python value.

    :param dbus_obj: value to convert.
    :returns: int, bool, float, list, tuple, str or dict equivalent of
        ``dbus_obj`` (containers are converted element by element), or
        ``dbus_obj`` itself when it matches none of the known D-Bus types.
    """
    def _to_list(_obj):
        return list(map(convert, dbus_obj))

    def _to_tuple(_obj):
        return tuple(map(convert, dbus_obj))

    def _to_dict(_obj):
        converted_keys = list(map(convert, dbus_obj.keys()))
        converted_values = list(map(convert, dbus_obj.values()))
        return dict(list(zip(converted_keys, converted_values)))

    # Ordered (converter, D-Bus types) table; the first match wins.
    conversions = (
        (int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
               dbus.UInt16, dbus.UInt32, dbus.UInt64)),
        (bool, (dbus.Boolean, )),
        (float, (dbus.Double, )),
        (_to_list, (dbus.Array, )),
        (_to_tuple, (dbus.Struct, )),
        (str, (dbus.ObjectPath, dbus.Signature, dbus.String)),
        (_to_dict, (dbus.Dictionary, )),
    )

    for to_python, dbus_types in conversions:
        if isinstance(dbus_obj, dbus_types):
            return to_python(dbus_obj)
    return dbus_obj
项目:my-weather-indicator    作者:atareao    | 项目源码 | 文件源码
def addr_to_dbus(addr, family):
        """Encode a textual IP address as a D-Bus value.

        For ``socket.AF_INET`` the packed address is unpacked with the
        native ``'I'`` struct format and wrapped in ``dbus.UInt32``; for
        any other family the packed bytes are wrapped in
        ``dbus.ByteArray``.
        """
        packed = socket.inet_pton(family, addr)
        if family != socket.AF_INET:
            return dbus.ByteArray(packed)
        (as_uint, ) = struct.unpack('I', packed)
        return dbus.UInt32(as_uint)
项目:my-weather-indicator    作者:atareao    | 项目源码 | 文件源码
def mask_to_dbus(mask):
        """Wrap ``mask`` in a ``dbus.UInt32`` and return it."""
        dbus_mask = dbus.UInt32(mask)
        return dbus_mask
项目:cpu-g    作者:atareao    | 项目源码 | 文件源码
def convert(dbus_obj):
    """Recursively convert a D-Bus typed value into a builtin Python value.

    :param dbus_obj: value to convert.
    :returns: int, bool, float, list, tuple, str or dict equivalent of
        ``dbus_obj`` (containers are converted element by element), or
        ``dbus_obj`` itself when it matches none of the known D-Bus types.
    """
    def _to_list(_obj):
        return list(map(convert, dbus_obj))

    def _to_tuple(_obj):
        return tuple(map(convert, dbus_obj))

    def _to_dict(_obj):
        converted_keys = map(convert, dbus_obj.keys())
        converted_values = map(convert, dbus_obj.values())
        return dict(zip(converted_keys, converted_values))

    # Ordered (converter, D-Bus types) table; the first match wins.
    conversions = (
        (int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
               dbus.UInt16, dbus.UInt32, dbus.UInt64)),
        (bool, (dbus.Boolean, )),
        (float, (dbus.Double, )),
        (_to_list, (dbus.Array, )),
        (_to_tuple, (dbus.Struct, )),
        (str, (dbus.ObjectPath, dbus.Signature, dbus.String)),
        (_to_dict, (dbus.Dictionary, )),
    )

    for to_python, dbus_types in conversions:
        if isinstance(dbus_obj, dbus_types):
            return to_python(dbus_obj)
    return dbus_obj
项目:python-eduvpn-client    作者:eduvpn    | 项目源码 | 文件源码
def base_to_python(val):
        """Recursively convert a D-Bus value into a plain Python value.

        Byte arrays become strings, sequences become lists, dictionaries
        become dicts, and D-Bus scalars become builtin values. Object
        paths equal to the ``object_path`` of NetworkManager, Settings or
        AgentManager return that object; other NetworkManager paths are
        wrapped in the class found in this module's globals (``None`` if
        the object has vanished); the root path ``'/'`` yields ``None``.
        Anything else is returned unchanged.
        """
        if isinstance(val, dbus.ByteArray):
            return "".join(str(item) for item in val)
        if isinstance(val, (dbus.Array, list, tuple)):
            return [fixups.base_to_python(item) for item in val]
        if isinstance(val, (dbus.Dictionary, dict)):
            return dict(
                (fixups.base_to_python(key), fixups.base_to_python(value))
                for key, value in val.items()
            )
        if isinstance(val, dbus.ObjectPath):
            for known in (NetworkManager, Settings, AgentManager):
                if val == known.object_path:
                    return known
            if val.startswith('/org/freedesktop/NetworkManager/'):
                # The fifth path component names the object family; two of
                # them map onto differently named classes.
                aliases = {
                    'Settings': 'Connection',
                    'Devices': 'Device',
                }
                segment = val.split('/')[4]
                classname = aliases.get(segment, segment)
                try:
                    return globals()[classname](val)
                except ObjectVanished:
                    return None
            if val == '/':
                return None
            # Any other object path falls through and is returned as-is.
        if isinstance(val, (dbus.Signature, dbus.String)):
            return six.text_type(val)
        if isinstance(val, dbus.Boolean):
            return bool(val)
        integer_types = (dbus.Int16, dbus.UInt16, dbus.Int32,
                         dbus.UInt32, dbus.Int64, dbus.UInt64)
        if isinstance(val, integer_types):
            return int(val)
        if isinstance(val, dbus.Byte):
            return six.int2byte(int(val))
        return val
项目:python-eduvpn-client    作者:eduvpn    | 项目源码 | 文件源码
def addr_to_dbus(addr, family):
        """Encode a textual IP address as a D-Bus value.

        For ``socket.AF_INET`` the packed address is unpacked with the
        native ``'I'`` struct format and wrapped in ``dbus.UInt32``; for
        any other family the packed bytes are wrapped in
        ``dbus.ByteArray``.
        """
        packed = socket.inet_pton(family, addr)
        if family != socket.AF_INET:
            return dbus.ByteArray(packed)
        (as_uint, ) = struct.unpack('I', packed)
        return dbus.UInt32(as_uint)
项目:python-eduvpn-client    作者:eduvpn    | 项目源码 | 文件源码
def mask_to_dbus(mask):
        """Wrap ``mask`` in a ``dbus.UInt32`` and return it."""
        dbus_mask = dbus.UInt32(mask)
        return dbus_mask
项目:pywificontrol    作者:emlid    | 项目源码 | 文件源码
def set_ap_scan(self, value):
        """Set the "ApScan" property to ``value`` as a ``dbus.UInt32``.

        Returns whatever the private property setter returns.
        """
        setter = self.__set_property
        return setter("ApScan", dbus.UInt32(value))
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def service_add(self, interface, protocol, name, type, domain, flags):
        """Handle a newly discovered service and ask Avahi to resolve it.

        Services whose ``flags`` include ``avahi.LOOKUP_RESULT_LOCAL`` are
        dropped. For all others ``ResolveService`` is called
        asynchronously: results go to ``self.service_resolved`` and
        errors to ``logger.error``.
        """
        # Log "<ifname>.<protocol>", matching add_service_type; the
        # original passed the LOOKUP_RESULT_LOCAL constant here by mistake.
        logger.debug("Found service '%s' of type '%s:%s' in domain '%s' on %s.%i." %
                     (name, type, flags, domain, self.siocgifname(interface), protocol))

        # this check is for local services
        if flags & avahi.LOOKUP_RESULT_LOCAL:
            logger.debug('Dropping local service')
            return

        self.server.ResolveService(
            interface, protocol, name, type, domain,
            avahi.PROTO_INET, dbus.UInt32(0),
            reply_handler=self.service_resolved,
            error_handler=logger.error)
项目:bluetool    作者:emlid    | 项目源码 | 文件源码
def run(self):
        """Make the adapter discoverable/pairable and run the main loop.

        After configuring the adapter an ``Agent`` is created and stored
        in ``self.agent``. If registration fails the object is shut down
        instead of entering the main loop.
        """
        _bluetooth.make_discoverable(True, self.timeout)
        for prop_name, prop_value in (("Pairable", dbus.Boolean(1)),
                                      ("PairableTimeout", dbus.UInt32(0))):
            _bluetooth.set_adapter_property(prop_name, prop_value)
        self.agent = Agent(self.client_class, self.timeout, self.path)

        if self._register():
            self._mainloop.run()
        else:
            self.shutdown()
项目:bluetool    作者:emlid    | 项目源码 | 文件源码
def make_discoverable(self, value=True, timeout=180):
        """Set the adapter's Discoverable flag and DiscoverableTimeout.

        Each property is written only when its current value differs from
        the requested one.

        :param value: desired Discoverable state (converted with ``int``).
        :param timeout: desired DiscoverableTimeout (converted with ``int``).
        :returns: ``True`` on success, ``False`` if the adapter cannot be
            found or a D-Bus error occurs (the error is printed).
        """
        adapter_iface = "org.bluez.Adapter1"

        try:
            adapter = bluezutils.find_adapter()
        except (bluezutils.BluezUtilError,
                dbus.exceptions.DBusException) as error:
            print_error(str(error) + "\n")
            return False

        try:
            adapter_obj = self._bus.get_object(
                "org.bluez", adapter.object_path)
            props = dbus.Interface(
                adapter_obj, "org.freedesktop.DBus.Properties")

            timeout = int(timeout)
            value = int(value)

            current_timeout = int(
                props.Get(adapter_iface, "DiscoverableTimeout"))
            if current_timeout != timeout:
                props.Set(adapter_iface, "DiscoverableTimeout",
                          dbus.UInt32(timeout))

            current_value = int(props.Get(adapter_iface, "Discoverable"))
            if current_value != value:
                props.Set(adapter_iface, "Discoverable",
                          dbus.Boolean(value))
        except dbus.exceptions.DBusException as error:
            print_error(str(error) + "\n")
            return False

        return True
项目:senic-hub    作者:getsenic    | 项目源码 | 文件源码
def discoverable_timeout(self, value):
        """Write ``value`` to the adapter's DiscoverableTimeout property as a ``dbus.UInt32``."""
        timeout = dbus.UInt32(value)
        self._adapter_props.Set(
            'org.bluez.Adapter1', 'DiscoverableTimeout', timeout)
项目:phony    作者:littlecraft    | 项目源码 | 文件源码
def RequestPasskey(self, device):
    """Log the request and return the stored passcode as a ``dbus.UInt32``."""
    message = "RequestPasskey (%s)" % (device)
    self.log().info(message)
    passkey = dbus.UInt32(self.__passcode)
    return passkey
项目:phony    作者:littlecraft    | 项目源码 | 文件源码
def enable_pairability(self, timeout=0):
    """Turn on Discoverable and Pairable and set both timeouts.

    ``timeout`` is written to PairableTimeout and DiscoverableTimeout as
    a ``dbus.UInt32``.
    """
    for flag_name in ('Discoverable', 'Pairable'):
        self._set_property(flag_name, True)
    for timeout_name in ('PairableTimeout', 'DiscoverableTimeout'):
        self._set_property(timeout_name, dbus.UInt32(timeout))
项目:phony    作者:littlecraft    | 项目源码 | 文件源码
def disable_pairability(self):
    """Turn off Discoverable and Pairable and reset both timeouts.

    PairableTimeout is set to 0 and DiscoverableTimeout to 180. This is
    best effort: ordinary errors from the property writes are ignored and
    the remaining writes are skipped.
    """
    try:
        self._set_property('Discoverable', False)
        self._set_property('Pairable', False)
        self._set_property('PairableTimeout', dbus.UInt32(0))
        self._set_property('DiscoverableTimeout', dbus.UInt32(180))
    except Exception:
        # Deliberately ignored, but only ordinary errors: a bare "except"
        # would also swallow KeyboardInterrupt and SystemExit.
        pass
项目:phony    作者:littlecraft    | 项目源码 | 文件源码
def RequestPasskey(self, device):
    """Log the request at debug level and return the stored passcode as a ``dbus.UInt32``."""
    message = "RequestPasskey (%s)" % (device)
    self.log().debug(message)
    passkey = dbus.UInt32(self._passcode)
    return passkey
项目:dell-recovery    作者:dell    | 项目源码 | 文件源码
def _check_polkit_privilege(self, sender, conn, privilege):
        '''Verify that sender has a given PolicyKit privilege.

        sender is the sender's (private) D-BUS name, such as ":1:42"
        (sender_keyword in @dbus.service.methods). conn is
        the dbus.Connection object (connection_keyword in
        @dbus.service.methods). privilege is the PolicyKit privilege string.

        Returns normally when the caller is privileged, when the call was
        made locally (sender and conn both None), or when enforce_polkit
        is off; otherwise raises PermissionDeniedByPolicy. D-BUS errors
        other than ServiceUnknown are propagated.
        '''
        # Local calls (not through D-BUS) are never restricted; neither
        # are calls made while enforcement is disabled (e.g. for testing
        # on the session bus).
        called_locally = sender is None and conn is None
        if called_locally or not self.enforce_polkit:
            return

        # get peer PID
        if self.dbus_info is None:
            bus_object = conn.get_object(
                'org.freedesktop.DBus', '/org/freedesktop/DBus/Bus', False)
            self.dbus_info = dbus.Interface(
                bus_object, 'org.freedesktop.DBus')
        pid = self.dbus_info.GetConnectionUnixProcessID(sender)

        # query PolicyKit
        if self.polkit is None:
            authority = dbus.SystemBus().get_object(
                'org.freedesktop.PolicyKit1',
                '/org/freedesktop/PolicyKit1/Authority', False)
            self.polkit = dbus.Interface(
                authority, 'org.freedesktop.PolicyKit1.Authority')
        try:
            # we don't need is_challenge return here, since we call with
            # AllowUserInteraction
            (is_auth, unused, details) = self.polkit.CheckAuthorization(
                ('unix-process', {
                    'pid': dbus.UInt32(pid, variant_level=1),
                    'start-time': dbus.UInt64(0, variant_level=1),
                }),
                privilege, {'': ''}, dbus.UInt32(1), '', timeout=600)
        except dbus.DBusException as msg:
            service_unknown = 'org.freedesktop.DBus.Error.ServiceUnknown'
            if msg.get_dbus_name() != service_unknown:
                raise
            # polkitd timed out, connect again
            self.polkit = None
            return self._check_polkit_privilege(sender, conn, privilege)

        if is_auth:
            return

        logging.debug('_check_polkit_privilege: sender %s on connection %s pid %i is not authorized for %s: %s',
                sender, conn, pid, privilege, str(details))
        raise PermissionDeniedByPolicy(privilege)

    #
    # Internal API for calling from Handlers (not exported through D-BUS)
    #
项目:BMW-RPi-iBUS    作者:KLUSEK    | 项目源码 | 文件源码
def __init__(self, onBluetoothConnected_callback, onPlayerChanged_callback):
        """Connect to the system bus, configure the adapter and register an agent.

        Stores the two callbacks, powers on the Bluetooth adapter, sets
        its alias, makes it pairable and discoverable with both timeouts
        set to 0, registers an agent at ``/test/agent`` with the BlueZ
        AgentManager, subscribes to PropertiesChanged signals and finally
        requests that the agent become the default.

        :param onBluetoothConnected_callback: stored on the instance.
        :param onPlayerChanged_callback: stored on the instance.
        :raises Exception: whatever obtaining the system bus raised; the
            error is printed first and then re-raised.
        """
        self.onBluetoothConnected_callback = onBluetoothConnected_callback
        self.onPlayerChanged_callback = onPlayerChanged_callback

        # Get the system bus
        try:
            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
            self.bus = dbus.SystemBus()
        except Exception as ex:
            # Format the exception itself: ``ex.message`` does not exist on
            # Python 3 exceptions.
            print("Unable to get the system dbus: '{0}'. Exiting. Is dbus running?".format(ex))
            # __init__ must not return a value ("return False" raises
            # TypeError at instantiation), so propagate the real error.
            raise

        adapter_path = bluezutils.find_adapter(None).object_path
        adapter = dbus.Interface(
            self.bus.get_object(bluezutils.SERVICE_NAME, adapter_path),
            "org.freedesktop.DBus.Properties")

        # Power on adapter
        adapter.Set(bluezutils.ADAPTER_INTERFACE, "Powered", dbus.Boolean(1))
        # Set name to display in available connections
        adapter.Set(bluezutils.ADAPTER_INTERFACE, "Alias", bluezutils.BT_DEVICE_NAME)
        # Set pairable on
        adapter.Set(bluezutils.ADAPTER_INTERFACE, "Pairable", dbus.Boolean(1))
        # Set discoverable on
        adapter.Set(bluezutils.ADAPTER_INTERFACE, "Discoverable", dbus.Boolean(1))
        # Set discoverable timeout to 0
        adapter.Set(bluezutils.ADAPTER_INTERFACE, "DiscoverableTimeout", dbus.UInt32(0))
        # Set pairable timeout to 0
        adapter.Set(bluezutils.ADAPTER_INTERFACE, "PairableTimeout", dbus.UInt32(0))

        bluezutils.show_adapter_info()

        self.path = "/test/agent"
        # NOTE(review): the local is otherwise unused; presumably the
        # constructor exports the agent on the bus - confirm in bluezutils.
        agent = bluezutils.Agent(self.bus, self.path)

        obj = self.bus.get_object(bluezutils.SERVICE_NAME, "/org/bluez")
        self.manager = dbus.Interface(obj, "org.bluez.AgentManager1")
        self.manager.RegisterAgent(self.path, "NoInputNoOutput")

        print("Bluetooth AgentManager registered")

        # listen for signal of remove adapter
#        self.bus.add_signal_receiver(self.interfaces_removed, bus_name=bluezutils.SERVICE_NAME, dbus_interface="org.freedesktop.DBus.ObjectManager", signal_name="InterfacesRemoved")

        # listen for signals on the Bluez bus
        self.bus.add_signal_receiver(
            self.device_property_changed,
            bus_name=bluezutils.SERVICE_NAME,
            signal_name="PropertiesChanged",
            path_keyword="device_path",
            interface_keyword="interface")

        # listen for signal of changing properties
        self.bus.add_signal_receiver(
            self.player_changed,
            bus_name=bluezutils.SERVICE_NAME,
            dbus_interface="org.freedesktop.DBus.Properties",
            signal_name="PropertiesChanged",
            path_keyword="path")

        print("Signals receiver registered")

        self.manager.RequestDefaultAgent(self.path)