Subscriptions

Usage

Warning

This functionality is considered experimental; the interface may change in future releases

The subscription API can be used to interact with asynchronous Pelion Device Management events.

Currently we support:

  • Device State Changes (registration, deregistration, etc. of devices matching a filter)
  • Resource Value Changes (values changing on resources that match a device and filter)
  • Current Resource Value (requesting the current value of a specific resource on a device)

from mbed_cloud import ConnectAPI
api = ConnectAPI()
notification = api.subscribe(
  api.subscribe.channels.DeviceStateChanges(device_id=1234)
).next().block()
print(notification)

# equivalent to:
channel = api.subscribe.channels.DeviceStateChanges(device_id=1234)
api.subscribe(channel)
observer = channel.observer
async_wrapper = observer.next()
notification = async_wrapper.block()

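# equivalent to (non-blocking: poll until the result is ready):
import time
awaitable = api.subscribe(
  api.subscribe.channels.DeviceStateChanges(device_id=1234)
).next().defer()
while not awaitable.ready():
  time.sleep(0.1)  # short pause so the loop does not spin at full CPU
notification = awaitable.get()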

The subscription system in the Python SDK has the following layers:

  • A Subscription Manager, which may have several registered channels.
    The manager routes inbound notifications to channels.
  • Channels, which request and filter notifications from the cloud, and provide a single observer.
    Channels represent the individual data streams relating to Pelion Device Management functionality.
    They are the main abstraction layer between Pelion Device Management API channels and user code,
    and may contain filters to further control notification behaviour.
  • Observer, an iterable of future results, each of which is an async wrapper.
    Observers are generators: iterables returning potential future results in the order they arrive.
  • Async Wrapper, a placeholder for an event that is yet to occur.
    The async wrapper can be used to obtain:
      • A Python 2/3 AsyncResult object
      • A Python 3 asyncio Future
      • Or block until the result can be returned
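
The ordering behaviour of the observer layer can be illustrated with a small standard-library sketch. This is not the SDK's implementation: ToyObserver is a hypothetical class, concurrent.futures.Future stands in for the async wrapper, and the sketch is single-threaded. It only shows the idea that the n-th promise handed out is fulfilled by the n-th notification, whether the promise or the data comes first.

```python
from collections import deque
from concurrent.futures import Future


class ToyObserver:
    """Hypothetical stand-in for an observer: promise n receives item n."""

    def __init__(self):
        self._waiting = deque()        # promises handed out but not yet fulfilled
        self._notifications = deque()  # data that arrived before it was requested

    def next(self):
        """Return a promise for the next item in the stream."""
        promise = Future()
        if self._notifications:
            promise.set_result(self._notifications.popleft())
        else:
            self._waiting.append(promise)
        return promise

    def notify(self, data):
        """Hand inbound data to the oldest unfulfilled promise, else queue it."""
        if self._waiting:
            self._waiting.popleft().set_result(data)
        else:
            self._notifications.append(data)


observer = ToyObserver()
first, second = observer.next(), observer.next()  # requested before any data exists
observer.notify("n1")
observer.notify("n2")
observer.notify("n3")                             # queued until someone asks for it
results = [first.result(), second.result(), observer.next().result()]
print(results)
```

A real observer also has to be thread-safe, because notifications typically arrive on a different thread from the one consuming them; the sketch leaves that out for brevity.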

Additional channel examples:

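from mbed_cloud import ConnectAPI
from mbed_cloud.subscribe.channels import DeviceStateChanges, ResourceValues

connect_api = ConnectAPI()

# blocking calls with filters
channel = connect_api.subscribe(DeviceStateChanges(device_id=5, state='deregistered'))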
for result in channel:
  print(result.block())

# deferred in python3 (the await must run inside a coroutine, i.e. an `async def` function)
for my_future in connect_api.subscribe(DeviceStateChanges(device_id=None)):
  await my_future

# deferred in python2
for my_async_result in connect_api.subscribe(DeviceStateChanges(device_id=None)):
  print(my_async_result.get())

# resource values channel using wildcards
channel = connect_api.subscribe(ResourceValues(resource_path=['/4/0/1', '/3/*']))
for result in channel:
  print(result.block())
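
To make the wildcard example above concrete, here is a minimal sketch of trailing-wildcard matching. It assumes that a trailing * matches any suffix and that anything else must match exactly; the helper names matches and matches_any are hypothetical, and the SDK's own matching rules may differ.

```python
def matches(pattern, value):
    """Return True if value matches pattern; a trailing '*' matches any suffix."""
    if pattern.endswith('*'):
        return value.startswith(pattern[:-1])
    return value == pattern


def matches_any(patterns, value):
    """Accept a single pattern or a list of patterns."""
    if isinstance(patterns, str):
        patterns = [patterns]
    return any(matches(pattern, value) for pattern in patterns)


paths = ['/4/0/1', '/3/*']
for candidate in ['/3/0/13', '/4/0/1', '/4/0/2']:
    print(candidate, matches_any(paths, candidate))
```

Under these assumed semantics, '/3/*' accepts every resource below /3/, while '/4/0/1' accepts only that exact path.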

Reference

class mbed_cloud.subscribe.channels.DeviceStateChanges(device_id=None, **extra_filters)

Triggers on changes to registration state of devices

Warning

This functionality is considered experimental; the interface may change in future releases

Parameters:
  • device_id – a device identifier
  • extra_filters – additional filters e.g. dict(channel=API_CHANNELS.registrations)
notify(data)

Notify this channel of inbound data

start()

Start the channel

class mbed_cloud.subscribe.channels.ResourceValues(device_id=None, resource_path=None, first_value='on_value_update', **extra_filters)

Triggers when a resource’s value changes

Warning

This functionality is considered experimental; the interface may change in future releases

Parameters:
  • device_id – device identifier (or a list of identifiers), optionally ending with the wildcard *
  • resource_path – resource path (or a list of paths), optionally ending with the wildcard *
  • first_value – mode for adjusting immediacy of subscribed values
  • extra_filters – other local key-value filters
start()

Start the channel

stop()

Stop the channel

class mbed_cloud.subscribe.channels.CurrentResourceValue(device_id, resource_path, **extra_filters)

Triggers on response to a request for a current resource value

Warning

This functionality is considered experimental; the interface may change in future releases

Parameters:
  • device_id – a device identifier
  • resource_path – a resource path
  • extra_filters
start()

Start the channel

class mbed_cloud.subscribe.observer.Observer(filters=None, queue_size=0, once=False, provider=None, timeout=None)

An iterable that manufactures results or promises for a stream of data.

Observer[1..n] gives promises for data stream [1..n] and fulfills them in that order.

Parameters:
  • filters – Additional key-value pairs to whitelist inbound notifications
  • queue_size – sets internal notification queue max length
  • once – only retrieve one item
  • provider
  • timeout
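
As an illustration of the filters parameter, the sketch below whitelists notifications by key-value pairs. It assumes plain equality on every key; passes_filters and the payload dictionaries are hypothetical, and the SDK's actual filter semantics may be richer.

```python
def passes_filters(filters, notification):
    """Return True when every filter key is present with an equal value."""
    return all(
        key in notification and notification[key] == expected
        for key, expected in (filters or {}).items()
    )


# Hypothetical notification payloads; real payloads may carry other keys.
notifications = [
    {'device_id': 5, 'state': 'registered'},
    {'device_id': 5, 'state': 'deregistered'},
    {'device_id': 7, 'state': 'deregistered'},
]
filters = {'device_id': 5, 'state': 'deregistered'}
accepted = [n for n in notifications if passes_filters(filters, n)]
print(accepted)
```
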
add_callback(fn)

Register a callback, triggered when new data arrives

As an alternative to callbacks, consider using Futures or AsyncResults, e.g. from .next().defer()

cancel()

Cancels the observer

No more notifications will be passed on

clear_callbacks()

Remove all callbacks

next()

Next item in sequence (Python 2 compatibility)

notify(data)

Notify this observer that data has arrived

notify_count

Number of notifications received

queue

Direct access to the internal notification queue

remove_callback(fn)

Remove a registered callback

class mbed_cloud.subscribe.async_wrapper.AsyncWrapper(func=None, concurrency_provider=None)

Creates a wrapper for a potentially asynchronous function.

Parameters:
  • func – blocking call, asyncio coroutine or future
  • concurrency_provider – a ThreadPool or an asyncio BaseEventLoop; None selects the default ThreadPool, False selects the default EventLoop
block(*args, **kwargs)

Call the wrapped function, and wait for the result in a blocking fashion

Returns the result of the function call.

Parameters:
  • args
  • kwargs
Returns:

result of function call

defer(*args, **kwargs)

Call the function and immediately return an asynchronous object.

The calling code will need to check for the result at a later time, using one of:

  • In Python 2/3 using ThreadPools: an AsyncResult
    (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult)
  • In Python 3 using asyncio: a Future
    (https://docs.python.org/3/library/asyncio-task.html#future)
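
The two kinds of deferred object can be tried out with the standard library alone. The sketch below does not use the SDK; fetch_value is a hypothetical blocking call standing in for the wrapped function. It shows how an AsyncResult and an asyncio Future are each checked for a result.

```python
import asyncio
from multiprocessing.pool import ThreadPool


def fetch_value():
    """Hypothetical blocking call standing in for the wrapped function."""
    return 42


# Thread pool style: apply_async returns an AsyncResult immediately.
pool = ThreadPool(processes=1)
async_result = pool.apply_async(fetch_value)
async_result.wait(timeout=5)                # or poll with async_result.ready()
thread_value = async_result.get(timeout=5)
pool.close()
pool.join()


# asyncio style: run_in_executor returns a Future that can be awaited.
async def main():
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, fetch_value)
    return await future


asyncio_value = asyncio.run(main())
print(thread_value, asyncio_value)
```
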
Parameters:
  • args
  • kwargs
Returns:

A Channels API module

class mbed_cloud.subscribe.channels.resource_values.FirstValue

Container for ‘first value’ modes

‘First value’ refers to how soon after requesting a subscription the first resource value is fetched.

The choice of mode is a trade-off in terms of performance, number of API calls, and device power usage.

class mbed_cloud.subscribe.channels.resource_values.ChannelIdentifiers

API channels

Internal Pelion Device Management channel identifiers