ukBaz Notes

A place to write longer form notes and how-to's!

Beacon Scanner

Introduction

In part 1 of this series of posts we looked at the basics of working with D-Bus and BlueZ. In this part we take on the tricky topic of scanning for nearby devices. A typical example of an application for this is to scan for nearby BLE Beacons.

Bluetooth Low Energy (BLE) Beacon is a device that only transmits small amounts of data and usually can not be connected to. iBeacon and Eddystone are two of the more well known formats. The small amount of data is stored in either the service data or manufacturer data property depending on the beacon.

BlueZ D-Bus Object Creation Signal

The core of this functionality from PyGObject is the DBusObjectManagerClient functionality.

Once an instance of this type has been created, you can connect to the Gio.DBusObjectManager object-added signal. This enables having callbacks that are run everytime a new BlueZ object is added.

In the example below a class is created to inherit from GObject.GObject so that custom signals can be created for when specific BlueZ objects are created. In this case it is an adapter and a device being added. This isn’t the only way of doing this. It could be that the _on_object_added could call the new_device_hndlr function directly. In this example it slightly complicates things but if it was part of a bigger library and there was a need for multiple listners then it could be more useful.

Filtering of objects

As the device is never connected to there is no need to store the device in the BlueZ cache so the example removes the devices once it has read the property information in the advertisement.

This is a good thing to do for beacons for a couple of reasons. First is that the one the device is in the cache it is the g-properties-changed signal on the device proxy that needs to be monitored. Secondly, once the device is in the cache the Linux kernel does some filtering of that device to ensure it doesn’t spam the system with notifications. Typically, for beacon scanning applications it is desirable to see every advertisement from the beacon.

Signals and Variants

To make the device-added signal includes all the device properties as a dictionary. In D-Bus the values in the dictionary need to be converted into variant types. This is done with the create_variant function. Rather than guess the D-Bus type from the Python type, this function looks up the property type.

import logging
from gi.repository import Gio, GLib, GObject

# DBus Information
bus_type = Gio.BusType.SYSTEM
BLUEZ_NAME = 'org.bluez'
ADAPTER_PATH = '/org/bluez/hci0'
PROP_IFACE = 'org.freedesktop.DBus.Properties'
ADAPTER_IFACE = 'org.bluez.Adapter1'
DEVICE_IFACE = 'org.bluez.Device1'

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('scan_app')


def create_variant(py_data):
    """
    convert python native data types to D-Bus variant types by looking up
    their type expected for that key.
    """
    type_lookup = {'Address': 's',
                   'AddressType': 's',
                   'Name': 's',
                   'Icon': 's',
                   'Class': 'u',
                   'Appearance': 'q',
                   'Alias': 's',
                   'Paired': 'b',
                   'Trusted': 'b',
                   'Blocked': 'b',
                   'LegacyPairing': 'b',
                   'RSSI': 'n',
                   'Connected': 'b',
                   'UUIDs': 'as',
                   'Adapter': 'o',
                   'ManufacturerData': 'a{qay}',
                   'ServiceData': 'a{say}',
                   'TxPower': 'n',
                   'ServicesResolved': 'b',
                   'WakeAllowed': 'b',
                   'Modalias': 's',
                   'AdvertisingFlags': 'ay',
                   'AdvertisingData': 'a{yay}',
                   'Powered': 'b',
                   'Discoverable': 'b',
                   'Pairable': 'b',
                   'PairableTimeout': 'u',
                   'DiscoverableTimeout': 'u',
                   'Discovering': 'b',
                   'Roles': 'as',
                   'ExperimentalFeatures': 'as',
                   }
    if py_data is None:
        return GLib.Variant('a{sv}', {})
    for k, v in py_data.items():
        py_data[k] = GLib.Variant(type_lookup[k], v)
    return GLib.Variant('a{sv}', py_data)


class BluezObjectManager(GObject.GObject):
    __gsignals__ = {
        'adapter-added': (GObject.SignalFlags.NO_HOOKS, None,
                          (str, GObject.TYPE_VARIANT)),
        'adapter-removed': (GObject.SignalFlags.NO_HOOKS, None, (str,)),
        'device-added': (GObject.SignalFlags.NO_HOOKS, None,
                         (str, GObject.TYPE_VARIANT)),
        'device-removed': (GObject.SignalFlags.NO_HOOKS, None, (str,)),
    }

    def __init__(self) -> None:
        super().__init__()
        self._object_manager = Gio.DBusObjectManagerClient.new_for_bus_sync(
            bus_type,
            Gio.DBusObjectManagerClientFlags.DO_NOT_AUTO_START,
            BLUEZ_NAME,
            '/', None, None, None)

        self._object_manager.connect("object-added", self._on_object_added)
        self._object_manager.connect("object-removed", self._on_object_removed)

    def _on_object_added(self,
                         _object_manager: Gio.DBusObjectManager,
                         dbus_object: Gio.DBusObject) -> None:
        object_path = dbus_object.get_object_path()
        ifaces = [iface.get_interface_name()
                  for iface in dbus_object.get_interfaces()]
        if ADAPTER_IFACE in ifaces:
            prop_proxy = dbus_object.get_interface(PROP_IFACE)
            adapter_props = prop_proxy.GetAll('(s)', ADAPTER_IFACE)
            logger.debug('adapter-added %s', object_path, adapter_props)
            self.emit('adapter-added', object_path, adapter_props)
        elif DEVICE_IFACE in ifaces:
            prop_proxy = dbus_object.get_interface(PROP_IFACE)
            dev_props = prop_proxy.GetAll('(s)', DEVICE_IFACE)
            logger.debug('device-added %s : %s', object_path, dev_props)
            self.emit('device-added', object_path, create_variant(dev_props))

    def _on_object_removed(self,
                           _object_manager: Gio.DBusObjectManager,
                           dbus_object: Gio.DBusObject) -> None:
        object_path = dbus_object.get_object_path()
        ifaces = [iface.get_interface_name()
                  for iface in dbus_object.get_interfaces()]
        if ADAPTER_IFACE in ifaces:
            logger.debug('adapter-removed %s : %s', object_path)
            self.emit('adapter-removed', object_path)
        elif DEVICE_IFACE in ifaces:
            logger.debug('device-removed: %s', object_path)
            self.emit('device-removed', object_path)


def bluez_proxy(object_path, interface):
    """Return a BlueZ proxy object for the given D-Bus information"""
    return Gio.DBusProxy.new_for_bus_sync(
        bus_type=bus_type,
        flags=Gio.DBusProxyFlags.NONE,
        info=None,
        name=BLUEZ_NAME,
        object_path=object_path,
        interface_name=interface,
        cancellable=None)


def new_device_hndlr(proxy: BluezObjectManager,
                     object_path: str,
                     device_props: GObject.TYPE_VARIANT) -> None:
    """Event handler for New device has been detected with scan"""
    props = device_props.unpack()
    logger.debug('New Device Handler: %s : %s', object_path, props)
    address = props.get('Address')
    if address:
        print(f'Device with address {address} found. '
              f'Removing from BlueZ cache')
    # adapter.RemoveDevice('(o)', GLib.Variant.new_object_path(object_path))
    adapter.RemoveDevice('(o)', object_path)


def stop_scan():
    """Stop scanning for new devices and quit event loop"""
    logger.info('Stopping Discovery')
    adapter.StopDiscovery()
    mainloop.quit()
    return False


if __name__ == '__main__':
    # setup dbus
    mngr = BluezObjectManager()
    adapter = bluez_proxy(ADAPTER_PATH, ADAPTER_IFACE)
    # Link device-added event to callback function
    mngr.connect('device-added', new_device_hndlr)
    # Start discovering (scanning) for devices
    adapter.StartDiscovery()

    # Enable eventloop for notifications
    mainloop = GLib.MainLoop()
    # Create a timed event to call the stop_scan function
    GLib.timeout_add_seconds(interval=20, function=stop_scan)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        mainloop.quit()
        adapter.StopDiscovery()

© Copyright 2022, Barry Byford.

first published: 2022 January 29

last updated: 2022 January 29

Creative Commons Licence
This work is licensed under a Creative Commons Attribution 4.0 International License.