Subscriptions¶
Usage¶
Warning
This functionality is considered experimental; the interface may change in future releases
The subscription api can be used to interact with asynchronous Pelion Device Management events.
Currently we support:
- Device State Changes (registration, deregistration etc of devices matching a filter)
- Resource Value Changes (values changing on resources that match a device and filter)
- Current Resource Value (requesting the current value of a specific resource on a device)
from mbed_cloud import ConnectAPI
api = ConnectAPI()
notification = api.subscribe(
api.subscribe.channels.DeviceStateChanges(device_id=1234)
).next().block()
print(notification)
# equivalent to:
channel = api.subscribe.channels.DeviceStateChanges(device_id=1234)
api.subscribe(channel)
observer = channel.observer
async_wrapper = observer.next()
notification = async_wrapper.block()
# equivalent to:
awaitable = api.subscribe(
api.subscribe.channels.DeviceStateChanges(device_id=1234)
).next().defer()
while True:
if awaitable.ready():
notification = awaitable.get()
break
The subscription system in the Python SDK has the following layers:
- A Python 2/3 AsyncResult object
- A Python 3 asyncio Future
- Or block until the result can be returned
Additional channel examples:
# blocking calls with filters
channel = connect_api.subscribe(DeviceStateChanges(device_id=5, state='deregistered'))
for result in channel:
print(result.block())
# deferred in python3
for my_future in connect_api.subscribe(DeviceStateChanges(device_id=None)):
await my_future
# deferred in python2
for my_async_result in connect_api.subscribe(DeviceStateChanges(device_id=None)):
print(my_async_result.get())
# resource values channel using wildcards
channel = connect_api.subscribe(ResourceValues(resource_path=['/4/0/1', '/3/*']))
for result in channel:
print(result.block())
Reference¶
-
class
mbed_cloud.subscribe.channels.
DeviceStateChanges
(device_id=None, **extra_filters)¶ Triggers on changes to registration state of devices
Warning
This functionality is considered experimental; the interface may change in future releases
Parameters: - device_id – a device identifier
- extra_filters – additional filters e.g. dict(channel=API_CHANNELS.registrations)
-
notify
(data)¶ Notify this channel of inbound data
-
start
()¶ Start the channel
-
class
mbed_cloud.subscribe.channels.
ResourceValues
(device_id=None, resource_path=None, first_value=’on_value_update’, **extra_filters)¶ Triggers when a resource’s value changes
Warning
This functionality is considered experimental; the interface may change in future releases
Parameters: - device_id – device identifier, (or list thereof), optionally ending with wildcard *
- resource_path – resource path, (or list thereof), optionally ending with wildcard *
- first_value – mode for adjusting immediacy of subscribed values
- extra_filters – other local key-value filters
-
start
()¶ Start the channel
-
stop
()¶ Stop the channel
-
class
mbed_cloud.subscribe.channels.
CurrentResourceValue
(device_id, resource_path, **extra_filters)¶ Triggers on response to a request for a current resource value
Warning
This functionality is considered experimental; the interface may change in future releases
Parameters: - device_id – a device identifier
- resource_path – a resource path
- extra_filters –
-
start
()¶ Start the channel
-
class
mbed_cloud.subscribe.observer.
Observer
(filters=None, queue_size=0, once=False, provider=None, timeout=None)¶ An iterable that manufactures results or promises for a stream of data Observer[1..n] gives promises for data stream [1..n] and fulfills them in that order
Parameters: - filters – Additional key-value pairs to whitelist inbound notifications
- queue_size – sets internal notification queue max length
- once – only retrieve one item
- provider –
- timeout –
-
add_callback
(fn)¶ Register a callback, triggered when new data arrives
As an alternative to callbacks, consider use of Futures or AsyncResults e.g. from .next().defer()
-
cancel
()¶ Cancels the observer
No more notifications will be passed on
-
clear_callbacks
()¶ Remove all callbacks
-
next
()¶ Next item in sequence (Python 2 compatibility)
-
notify
(data)¶ Notify this observer that data has arrived
-
notify_count
¶ Number of notifications received
-
queue
¶ Direct access to the internal notification queue
-
remove_callback
(fn)¶ Remove a registered callback
-
class
mbed_cloud.subscribe.async_wrapper.
AsyncWrapper
(func=None, concurrency_provider=None)¶ Creates a wrapper for a potentially asynchronous function.
Parameters: - func – blocking call, asyncio coroutine or future
- concurrency_provider – ThreadPool or asyncio BaseEventLoop or None: default ThreadPool or False: default EventLoop
-
block
(*args, **kwargs)¶ Call the wrapped function, and wait for the result in a blocking fashion
Returns the result of the function call.
Parameters: - args –
- kwargs –
Returns: result of function call
-
defer
(*args, **kwargs)¶ Call the function and immediately return an asynchronous object.
The calling code will need to check for the result at a later time using:
- In Python 2/3 using ThreadPools - an AsyncResult
- (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult)
- In Python 3 using Asyncio - a Future
- (https://docs.python.org/3/library/asyncio-task.html#future)
Parameters: - args –
- kwargs –
Returns:
A Channels API module
-
class
mbed_cloud.subscribe.channels.resource_values.
FirstValue
¶ Container for ‘first value’ modes
‘First value’ refers to how soon after requesting a subscription the first resource value is fetched.
There is a tradeoff in terms of performance/api calls/device power usage
-
class
mbed_cloud.subscribe.channels.resource_values.
ChannelIdentifiers
¶ API channels
Internal Pelion Device Management channel identifiers