Publish, subscribe, request and reply in your service¶
The whole framework is based on messages, s. Architecture Overview for details.
Subjects in nats and FastIoT¶
The whole messing is based on Subjects you subscribe to or publish messages on. For details how this is handled within nats.io please refer to their official documentation at https://docs.nats.io/nats-concepts/subjects
For FastIoT this results in two basic options:
You may use many (hierarchical) subjects if you send the same data type for multiple e.g. sensors. This is the case for the data type
fastiot.msg.thing.Thing
, where many services will send their sensor data to many other services reading them. This allows to subscribe to a single subject (e.g. sensor) as well as to many subjects using wildcards (s. nats.io documentation for using wildcards).You may use on subject for one data type. Especially if you have custom data types with well defined sources and sinks this is an option.
FastIoT only brings very basic data types defined in fastiot.msg
coming with it’s own subjects. If you want to
define custom data models for your projects please refer to Creating your own data types.
Subscribing to an existing subject¶
To subscribe to an subject you just have to add a method to your service and decorate it with an @subscribe
.
fastiot.core.service_annotations.subscribe()
needs to have a fastiot.core.data_models.Subject
passed. The
easiest way to get the subject for a datamodel like Thing
is to use its method get_subject
which will
automatically create a subject for the datamodel. See fastiot.core.data_models.FastIoTData.get_subject()
for more
details.
from fastiot.core import FastIoTService, Subject, subscribe
from fastiot.msg.thing import Thing
class MyService(FastIoTService):
@subscribe(subject=Thing.get_subject('*'))
async def consume(self, topic: str, msg: Thing):
""" Do something with the msg here! """
In this case a wildcard was used to receive all messages on the subject Thing.*
. This will work e.g. with
Thing.sensor_1
but not with Thing.machine.other_sensor
. To also subscribe everything below Thing
you can use
the wildcard >
like in Thing.>
. This will subscribe everything below Thing
, so machine
but also
other_machine
.
To subscribe to a non-hierarchical subject you can leave out the argument '*'
. This will then subscribe Thing
.
A working example can be found in the consumer example service: fastiot_sample_services.consumer.consumer_module.ExampleConsumerService
Defining own subjects¶
Instead of relying on the subject defined by data model you can also create you own subject:
@subscribe(subject=Subject(name='my_subject',
msg_cls=MyDataModel))
The message received will then be parsed into your MyDataModel
. You can also specify dict
as a msg_class
to
receive the message as plain dictionary. We strongly recommend to use Pydantic data models and not work with
dictionaries.
Publishing data¶
Publishing data works on the same principle as receiving with a subscription regarding topics.
The following code will initiate a forever running loop and publish a message with the value 42 on the topic
Thing.my_sensor
with the current time. If you are not familiar with async please pay extra attention not to forget
the await
before .
For a fully working example you may also consult fastiot_sample_services.producer.producer_module.ExampleProducerService
import asyncio
from datetime import datetime
from fastiot.core import FastIoTService, Subject, loop
from fastiot.core.core_uuid import get_uuid
from fastiot.msg.thing import Thing
class MyProducer(FastIoTService):
@loop
async def send_something(self):
sensor_name = 'my_sensor'
subject = Thing.get_subject(sensor_name)
await self.broker_connection.publish(
subject=subject,
msg=Thing(
name=sensor_name,
machine='MyMachine', measurement_id=get_uuid(),
value=42, timestamp=datetime.utcnow()
)
)
return asyncio.sleep(2)
If you want to send larger Files you can use
fastiot.db.redis_helper.RedisHelper
.
As an example for an implementation
you can use
fastiot_sample_services.redis_producer.redis_producer_module.ExampleRedisProducerService
Request and Reply¶
Sometimes you may not kind of “shout” the data over the broker (publish) but make sure it arrives and you get a certain response.
Therefore your responder needs to return a message at the end:
@reply(ReplySubject(name="reply",
msg_cls=Thing,
reply_cls=Thing))
async def pong(self, topic: str, msg: Thing) -> Thing:
return msg
This code will simply receive a Thing
and return the same Thing
as response.
The request for this looks similiar to a typical publish. We define an request message (here a ping_msg
), define a
subject (reply
) and send an request.
async def request(self):
ping_msg = Thing(machine='SomeMachine', name="RequestSensor", value=42, timestamp=datetime.now())
subject = ReplySubject(name="reply", msg_cls=Thing, reply_cls=Thing)
pong_msg: Thing = await self.broker_connection.request(subject=subject, msg=request, timeout=10)
All methods like request
and reply
can be imported from fastiot.core
.