fastiot.core package

This module provides the core functionality for FastIoT Services.

Submodules

fastiot.core.broker_connection module

class fastiot.core.broker_connection.Subscription[source]
abstract async unsubscribe()[source]

Cancels the subscription

class fastiot.core.broker_connection.NatsBrokerSubscription(subscription_error_cb=None, **kwargs)[source]
__init__(subscription_error_cb=None, **kwargs)[source]
async unsubscribe()[source]

Cancels the subscription

class fastiot.core.broker_connection.NatsBrokerSubscriptionSubject(subject, cb, **kwargs)[source]
__init__(subject, cb, **kwargs)[source]
async received_nats_msg_cb(nats_msg)[source]
class fastiot.core.broker_connection.NatsBrokerSubscriptionReplySubject(subject, cb, send_reply_fn, **kwargs)[source]
__init__(subject, cb, send_reply_fn, **kwargs)[source]
async received_nats_msg_cb(nats_msg)[source]
class fastiot.core.broker_connection.BrokerConnection(**kwargs)[source]
__init__(**kwargs)[source]
abstract async subscribe(subject, cb)[source]

Subscribe to a subject.

Parameters:
  • subject (Subject) – The subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, None]]) – The callback which is called when a message is received

Return type:

Subscription

async subscribe_msg_queue(subject, msg_queue)[source]

Subscribe to a subject using a message queue instead of a callback. Use this if you prefer to poll a message queue rather than handle each message in a callback.

Parameters:
  • subject (Subject) – The subject to subscribe to

  • msg_queue (Queue[Union[FastIoTData, dict]]) – The message queue where received messages are enqueued.

Return type:

Subscription
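
The queue-based pattern can be pictured as a callback that does nothing but enqueue. The following is a minimal standard-library sketch of that idea, not the framework's implementation: make_enqueue_cb and demo are hypothetical names, and the direct calls to the callback stand in for the broker delivering messages.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


def make_enqueue_cb(msg_queue: "asyncio.Queue[Any]") -> Callable[[Any], Awaitable[None]]:
    """Build a (hypothetical) callback that forwards every message to a queue."""

    async def enqueue(msg: Any) -> None:
        await msg_queue.put(msg)

    return enqueue


async def demo() -> List[Any]:
    msg_queue: "asyncio.Queue[Any]" = asyncio.Queue()
    cb = make_enqueue_cb(msg_queue)

    # Stand-in for the broker invoking the callback on two received messages.
    await cb({"value": 1})
    await cb({"value": 2})

    # The consumer reads from the queue whenever it is ready.
    return [await msg_queue.get(), await msg_queue.get()]
```

Calling asyncio.run(demo()) returns the two messages in the order they were enqueued.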

abstract async subscribe_reply_cb(subject, cb)[source]

Subscribe to a reply subject. It is expected that the message will be answered.

Parameters:
  • subject (ReplySubject) – The reply subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, Union[FastIoTData, dict]]]) – The callback which is called when a request is received

Return type:

Subscription

async publish(subject, msg)[source]

Publishes a message for a subject.

Parameters:
  • subject (Subject) – The subject info to publish to.

  • msg (Union[FastIoTPublish, dict]) – The message.

async request(subject, msg, timeout=30.0)[source]

Send a request on a subject.

Parameters:
  • subject (ReplySubject) – The subject used for sending the request.

  • msg (Union[FastIoTRequest, dict]) – The request

  • timeout (float) – The time in seconds to wait for an answer. Raises ErrTimeout if no answer is received in time.

Return type:

Union[FastIoTResponse, dict]

Returns:

The response

abstract property is_connected: bool

Return the connection status e.g. for health checks

run_threadsafe_nowait(coro)[source]

Runs a coroutine on the broker's event loop. This method is thread-safe. It can be useful if you want to interact with the broker from another thread.

Parameters:

coro (Coroutine) – The coroutine to run thread-safely on the broker's event loop, for example ‘broker_client.publish(…)’

Return type:

Future

run_threadsafe(coro, timeout=0.0)[source]

Runs a coroutine on the broker's event loop. This method is thread-safe. It can be useful if you want to interact with the broker from another thread.

Parameters:
  • coro (Coroutine) – The coroutine to run thread-safely on the broker's event loop, for example ‘broker_client.publish(…)’

  • timeout (float) – The number of seconds to wait for the result. Raises concurrent.futures.TimeoutError if the timeout is exceeded. A value of zero means wait forever.

Return type:

Any

Returns:

Returns the result of the coroutine. If the coroutine raised an exception, it is reraised.
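
The thread-safe behaviour described above matches what the standard library offers through asyncio.run_coroutine_threadsafe. The sketch below assumes that mechanism (the source does not confirm how FastIoT implements it); start_background_loop, run_threadsafe_sketch and add are hypothetical names, and the background loop stands in for the broker's event loop.

```python
import asyncio
import threading
from typing import Any, Coroutine


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Start an event loop in a daemon thread (stand-in for the broker's loop)."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


def run_threadsafe_sketch(
    loop: asyncio.AbstractEventLoop, coro: Coroutine, timeout: float = 0.0
) -> Any:
    """Schedule coro on loop from another thread and block for its result."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    # Documented convention: a timeout of zero means wait forever.
    # result() raises concurrent.futures.TimeoutError on timeout and
    # re-raises any exception raised inside the coroutine.
    return future.result(timeout=None if timeout == 0.0 else timeout)


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0.01)
    return a + b
```

From a worker thread, run_threadsafe_sketch(loop, add(2, 3)) blocks until the coroutine has finished on the loop's thread and then returns its result.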

publish_sync(subject, msg, timeout=0.0)[source]

Publishes a message for a subject. This method is thread-safe. Under the hood, it uses run_threadsafe.

Parameters:
  • subject (Subject) – The subject info to publish to.

  • msg (Union[FastIoTPublish, dict]) – The message.

  • timeout (float) – The timeout.

publish_sync_nowait(subject, msg)[source]

Publishes a message for a subject. This method is thread-safe. Under the hood, it uses run_threadsafe_nowait.

Parameters:
  • subject (Subject) – The subject info to publish to.

  • msg (Union[FastIoTPublish, dict]) – The message.

Return type:

Future

request_sync(subject, msg, timeout=30.0)[source]

Performs a request on the subject. This method is thread-safe. Under the hood, it uses run_threadsafe.

Please note that this call only times out if the request itself times out. For simplicity, it waits forever if the executing thread is too busy for the request to be scheduled.

Parameters:
  • subject (ReplySubject) – The subject info to publish the request.

  • msg (Union[FastIoTRequest, dict]) – The request message.

  • timeout (float) – The timeout for the broker call.

Return type:

Union[FastIoTResponse, dict]

Returns:

The response message.

class fastiot.core.broker_connection.NatsBrokerConnection(client, subscription_error_cb=None)[source]
async classmethod connect(closed_cb=None, subscription_error_cb=None)[source]

Connects to a NATS instance and returns a NATS broker connection.

Return type:

NatsBrokerConnection

__init__(client, subscription_error_cb=None)[source]
async close()[source]
async subscribe(subject, cb)[source]

Subscribe to a subject.

Parameters:
  • subject (Subject) – The subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, None]]) – The callback which is called when a message is received

Return type:

Subscription

async subscribe_reply_cb(subject, cb)[source]

Subscribe to a reply subject. It is expected that the message will be answered.

Parameters:
  • subject (ReplySubject) – The reply subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, Union[FastIoTData, dict]]]) – The callback which is called when a request is received

Return type:

Subscription

property is_connected

Return the connection status e.g. for health checks

class fastiot.core.broker_connection.SubscriptionDummy[source]
async unsubscribe()[source]

Cancels the subscription

check_pending_error()[source]
async raise_pending_error()[source]
class fastiot.core.broker_connection.BrokerConnectionDummy(**kwargs)[source]

A dummy broker implementation to mock dependencies.

property is_connected: bool

Return the connection status e.g. for health checks

async subscribe(subject, cb)[source]

Subscribe to a subject.

Parameters:
  • subject (Subject) – The subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, None]]) – The callback which is called when a message is received

Return type:

Subscription

async subscribe_reply_cb(subject, cb)[source]

Subscribe to a reply subject. It is expected that the message will be answered.

Parameters:
  • subject (ReplySubject) – The reply subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, Union[FastIoTData, dict]]]) – The callback which is called when a request is received

Return type:

Subscription

fastiot.core.core_uuid module

fastiot.core.core_uuid.get_uuid()[source]

This method returns a UUID in project canonical format.

Return type:

str

fastiot.core.data_models module

pydantic model fastiot.core.data_models.FastIoTData[source]

Base model for all data types / data models to be transferred over the broker between the services. This is basically a Pydantic model with additional handling of subjects. Any Pydantic model should work here, as long as it can be serialized using the msgpack library.

The subject is constructed from the model name, e.g. if your data model is called MySpecialModel, a subject v1.my_special_model will be created. If you want more control over the subject name, you may override the method fastiot.core.data_models.FastIoTData.get_subject() in your data model or create a new model based on Pydantic’s pydantic.BaseModel. See Publish, subscribe, request and reply in your service for more details about publish and subscribe.

For your own data models please use fastiot.core.data_models.FastIoTPublish for data that is simply published over the broker. For Request and Response please use fastiot.core.data_models.FastIoTRequest and fastiot.core.data_models.FastIoTResponse.

This class also implements the magic method __setattr__. Because of a known bug in BaseModel, private attributes cannot be set through a @property setter; the overridden method makes this possible.

validator set_timezones  »  all fields[source]
pydantic model fastiot.core.data_models.FastIoTPublish[source]

Base datatype for publishing data. Please refer to Creating your own data types for more information about creating your own data types.

classmethod get_subject(name='')[source]

This method returns the corresponding subject for the data model as fastiot.core.data_models.Subject.

Parameters:

name (str) – The name of the subject. Please pay special attention to this parameter: the default is "". This works well for data models without hierarchies; in this case you simply subscribe to v1.my_special_data_model. If you use many sensors, as in the data model fastiot.msg.thing.Thing, you have to provide a name. Then you can subscribe to v1.thing.my_sensor. If you want to subscribe to all sensors, use * as the name. See more in Publish, subscribe, request and reply in your service.

Return type:

Subject
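
To make the wildcard behaviour concrete, the sketch below mirrors the NATS subject-matching rules, where * matches exactly one token and > matches one or more trailing tokens. subject_matches is a hypothetical helper written for illustration only; it is not part of FastIoT, and the actual matching is done by the broker.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a dot-separated subject matches a NATS-style pattern."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # ">" is only valid as the last token and needs at least one token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without ">", pattern and subject must have the same number of tokens.
    return len(p_tokens) == len(s_tokens)
```

With this helper, the pattern v1.thing.* matches v1.thing.my_sensor but not v1.thing.room.my_sensor, while v1.thing.> matches both.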

pydantic model fastiot.core.data_models.FastIoTResponse[source]

Base datatype for answering requests based on fastiot.core.data_models.FastIoTRequest. Please refer to Creating your own data types for more information about creating your custom data types.

classmethod get_subject()[source]
pydantic model fastiot.core.data_models.FastIoTRequest[source]

Base datatype for handling requests. Please refer to Creating your own data types for more information about creating your own data types.

classmethod get_reply_subject(name='')[source]

This method returns the corresponding reply subject for the data model as fastiot.core.data_models.ReplySubject.

Parameters:

name (str) – The name of the subject. Please pay special attention to this parameter: the default is "". This works well for data models without hierarchies; in this case you simply subscribe to v1.my_special_data_model. If you use many sensors, as in the data model fastiot.msg.thing.Thing, you have to provide a name. Then you can subscribe to v1.thing.my_sensor. If you want to subscribe to all sensors, use * as the name. See more in Publish, subscribe, request and reply in your service.

Return type:

ReplySubject

pydantic model fastiot.core.data_models.Subject[source]

General model to handle subjects for subscriptions within the framework.

field name: str [Required]

Name of the subject; see fastiot.core.data_models.FastIoTData.get_subject() for details about subscription names.

field msg_cls: Type[Union[FastIoTData, dict]] [Required]

Datatype the message will provide.

pydantic model fastiot.core.data_models.ReplySubject[source]

Model for handling subject subscriptions that also have a reply class.

field reply_cls: Type[Union[FastIoTData, dict]] [Required]

Set this to a datatype (not the default None) to expect a reply of that datatype.

make_generic_reply_inbox()[source]
Return type:

Subject

get_reply_inbox(reply_to)[source]
Return type:

Subject

fastiot.core.logger module

fastiot.core.logger.setup_logger(name='root')[source]

This logger is a wrapper around Python's logging module; you can use it as shown in the following example. FASTIOT_LOG_LEVEL must also be set to the desired level; see https://docs.python.org/3/library/logging.html#logging-levels

from fastiot import logging

logging.debug('debug message')
logging.info('info message')

fastiot.core.logger.get_log_config(level=20)[source]

This function sets the logging configuration globally. The console log should have the format: timestamp: LOG_LEVEL module name: Log message. If the log should also be saved to a file, the configuration can be modified by using logging.FileHandler.

Return type:

Dict
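
As an illustration of such a configuration, the sketch below builds a dictionary for logging.config.dictConfig that produces the console format described above and shows one way to add a file handler. The names sketch_log_config and add_file_handler, the exact format string, and the handler keys are assumptions for illustration; the dictionary FastIoT actually returns may differ.

```python
import copy
import logging
import logging.config
from typing import Dict


def sketch_log_config(level: int = logging.INFO) -> Dict:
    """Return a dictConfig-style configuration with a console handler."""
    return {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            # Assumed rendering of "timestamp: LOG_LEVEL module name: Log message".
            "default": {"format": "%(asctime)s: %(levelname)s %(module)s: %(message)s"}
        },
        "handlers": {
            "console": {"class": "logging.StreamHandler", "formatter": "default"}
        },
        "root": {"level": level, "handlers": ["console"]},
    }


def add_file_handler(config: Dict, path: str) -> Dict:
    """Return a copy of config that additionally writes to a file."""
    extended = copy.deepcopy(config)
    extended["handlers"]["file"] = {
        "class": "logging.FileHandler",
        "filename": path,
        "formatter": "default",
    }
    extended["root"]["handlers"].append("file")
    return extended
```

Either dictionary can be applied with logging.config.dictConfig(...); the file is only created once the extended configuration is applied.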

fastiot.core.serialization module

fastiot.core.serialization.serialize_to_bin(msg_cls, msg)[source]

Serializes a msg to binary. It also applies some basic type checks. Please be careful: msgpack will only serialize Python primitive data types. Data types from numpy, for example, cannot be serialized.

Return type:

bytes

fastiot.core.serialization.serialize_from_bin(msg_cls, data)[source]

Deserializes a msg from binary.

Return type:

Union[FastIoTData, dict]

fastiot.core.service module

class fastiot.core.service.FastIoTService(broker_connection=None, **kwargs)[source]

This is the base class for all FastIoT Services. Your service must inherit from this class for everything to work.

classmethod main(**kwargs)[source]

Entry point of the class; this is the method to be started, e.g. from a run.py like the one generated when creating a new service. Do not override it unless you know exactly what you are doing.

Parameters:

kwargs – kwargs will be passed to class constructor.

__init__(broker_connection=None, **kwargs)[source]
run_task(coro)[source]

Creates an asyncio task which is managed by this class. If the execution of the coroutine raises an exception, it is logged and a service shutdown is requested. If the task terminates regularly, it is dropped and the module continues to run.

The task is awaited after _stop() has been called.
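
The task management described above can be sketched with the standard library alone. TaskManagerSketch below is a hypothetical stand-in, not FastIoT code: it tracks tasks in a set, drops them when they finish, and sets a shutdown event when a task fails. Its stop method plays the role of awaiting the remaining tasks after _stop().

```python
import asyncio
import logging
from typing import Coroutine, Set, Tuple


class TaskManagerSketch:
    """Hypothetical manager: failed tasks are logged and request a shutdown."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()
        self.shutdown_requested = asyncio.Event()
        self._logger = logging.getLogger("task_manager_sketch")

    def run_task(self, coro: Coroutine) -> asyncio.Task:
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        # A finished task is dropped in every case.
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self._logger.error("Task failed: %r", exc)
            self.shutdown_requested.set()

    async def stop(self) -> None:
        # Await whatever is still running at shutdown.
        await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo() -> Tuple[bool, bool]:
    manager = TaskManagerSketch()

    async def ok() -> None:
        return None

    async def boom() -> None:
        raise RuntimeError("boom")

    manager.run_task(ok())
    await asyncio.sleep(0.01)
    after_ok = manager.shutdown_requested.is_set()

    manager.run_task(boom())
    await asyncio.sleep(0.01)
    after_boom = manager.shutdown_requested.is_set()

    await manager.stop()
    return after_ok, after_boom
```

In the demo, the regularly finishing task leaves the shutdown flag untouched, while the failing task sets it.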

async wait_for_shutdown(timeout=0.0)[source]

Method to wait for service shutdown. This is helpful if you have a loop running forever until the service needs to shut down.

Shutdown may occur when some other parts of the service fail like database connection or broker connection.

By default, it will wait indefinitely, but you can specify a timeout. If the timeout is exceeded, it does not raise a timeout error but returns False instead. Otherwise, it returns True.

Example:

>>> while await self.wait_for_shutdown(1.0) is False:
...     print("Still running...")

Parameters:

timeout (float) – The time in seconds you want to wait for the shutdown. A value of 0.0 (default) will wait indefinitely.

Return type:

bool

Returns:

Returns True if shutdown is requested, False if the timeout occurred.
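
The return-value contract can be reproduced with an asyncio.Event and asyncio.wait_for. This is a minimal sketch under the assumption that the shutdown request is represented by an event; wait_for_shutdown_sketch and demo are hypothetical names and not the service's actual implementation.

```python
import asyncio


async def wait_for_shutdown_sketch(
    shutdown_event: asyncio.Event, timeout: float = 0.0
) -> bool:
    """Return True once shutdown is requested, False if the timeout elapses first."""
    if timeout == 0.0:
        # Documented convention: 0.0 means wait indefinitely.
        await shutdown_event.wait()
        return True
    try:
        await asyncio.wait_for(shutdown_event.wait(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> int:
    event = asyncio.Event()
    # Simulate another part of the service requesting shutdown shortly after start.
    asyncio.get_running_loop().call_later(0.05, event.set)

    ticks = 0
    while await wait_for_shutdown_sketch(event, 0.01) is False:
        ticks += 1  # periodic work would go here
    return ticks
```

The demo loop performs a few iterations of periodic work and leaves the loop as soon as the event is set.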

async run_coro(coro)[source]

Waits for coro or raises ShutdownRequestedInterruption if shutdown is requested.

Parameters:

coro – The coroutine to wait for

Returns:

The result of the coroutine
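
One way to get this "wait for the coroutine or for shutdown, whichever comes first" behaviour is asyncio.wait with FIRST_COMPLETED. The sketch below assumes an event-based shutdown signal and defines its own local ShutdownRequestedInterruption class, named after the exception mentioned above; it is not the framework's code.

```python
import asyncio
from typing import Any, Coroutine, Tuple


class ShutdownRequestedInterruption(Exception):
    """Local stand-in for the exception named in the documentation."""


async def run_coro_sketch(shutdown_event: asyncio.Event, coro: Coroutine) -> Any:
    work = asyncio.ensure_future(coro)
    stop = asyncio.ensure_future(shutdown_event.wait())
    done, _ = await asyncio.wait({work, stop}, return_when=asyncio.FIRST_COMPLETED)
    if work in done:
        stop.cancel()
        return work.result()
    # Shutdown was requested first: abandon the work and signal the caller.
    work.cancel()
    raise ShutdownRequestedInterruption()


async def demo() -> Tuple[str, str]:
    event = asyncio.Event()
    result = await run_coro_sketch(event, asyncio.sleep(0.01, result="done"))

    event.set()
    try:
        await run_coro_sketch(event, asyncio.sleep(10.0))
    except ShutdownRequestedInterruption:
        return result, "interrupted"
    return result, "not interrupted"
```

The first call completes normally; the second is interrupted at once because the shutdown event is already set.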

async run()[source]
async request_shutdown(reason='', exception=None)[source]

Sets the shutdown request for all loops and tasks in the service to stop

fastiot.core.service_annotations module

Decorator functions to add the basic functionality to FastIoT Services

fastiot.core.service_annotations.subscribe(subject)[source]

Decorator method for methods subscribing to a subject. The decorated method must have either one or two arguments:

  • subscribe_something(message: Type[FastIoTData])

  • subscribe_something(subject_name: str, message: Type[FastIoTData])

See Publish, subscribe, request and reply in your service for more details.

Parameters:

subject (Subject) – The subject (fastiot.core.data_models.Subject) to subscribe.

fastiot.core.service_annotations.reply(subject)[source]

Decorator for methods replying on the specified subject. This works similarly to fastiot.core.service_annotations.subscribe(), but you have to return a message as the reply.

Parameters:

subject (ReplySubject) – The subject to subscribe to for sending replies

fastiot.core.service_annotations.loop(fn)[source]

Decorator function for methods to run continuously. This will basically create a “while True loop” wrapper around the provided function. This is purely syntactic sugar for run_task.

Your method needs to return an awaitable, which is awaited after each loop execution before the next iteration starts. The returned awaitable does not run to completion if a service shutdown is requested, because self.run_coro is used under the hood. The annotated loop function itself, however, is guaranteed to be awaited and not interrupted if a shutdown is requested.

Example:

@loop
async def log_still_running(self):
    self._logger.info("Service is still running. Next message in 10 seconds.")
    return asyncio.sleep(10.0)
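
The semantics of such a decorator can be approximated as follows. This is a hedged sketch only: loop_sketch, DemoService and the shutdown_event attribute are hypothetical, and the real decorator registers the method with the service rather than being awaited directly. It shows the two guarantees described above: the loop body always runs to completion, while the returned pause is cut short on shutdown.

```python
import asyncio
import functools


def loop_sketch(fn):
    """Hypothetical decorator: repeat fn until self.shutdown_event is set."""

    @functools.wraps(fn)
    async def wrapper(self):
        while not self.shutdown_event.is_set():
            # The loop body itself is awaited fully and never interrupted.
            awaitable = await fn(self)
            # The returned awaitable (the pause) may be cut short by shutdown.
            pause = asyncio.ensure_future(awaitable)
            stop = asyncio.ensure_future(self.shutdown_event.wait())
            await asyncio.wait({pause, stop}, return_when=asyncio.FIRST_COMPLETED)
            pause.cancel()
            stop.cancel()

    return wrapper


class DemoService:
    def __init__(self) -> None:
        self.shutdown_event = asyncio.Event()
        self.iterations = 0

    @loop_sketch
    async def tick(self):
        self.iterations += 1
        if self.iterations == 3:
            self.shutdown_event.set()
            return asyncio.sleep(10.0)  # long pause, cut short by the shutdown
        return asyncio.sleep(0.01)
```

Awaiting DemoService().tick() runs the body three times and returns promptly, even though the last pause asks for ten seconds.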

fastiot.core.subject_helper module

fastiot.core.subject_helper.sanitize_pub_subject_name(subject_name)[source]

This function checks whether the subject is in the right format and builds it for you. In the FastIoT framework, the base format for subject_name is “v1.my_message”; this subscribes only at the my_message level. If you want to build a hierarchy for this topic, the subject_name must be “v1.my_topic.*” or “v1.my_topic.>”.

Return type:

str

In summary, it will do the following for you:
  • “MyMessage” -> “v1.my_message”

  • “my_message” -> “v1.my_message”

  • “MyMessage.*” -> “v1.my_message.*”

  • “my_message.*” -> “v1.my_message.*”

  • “v1.MyMessage” -> “v1.my_message”

  • “v1.my_message” -> “v1.my_message”

  • “v1.my_message.*” -> “v1.my_message.*”

If the suffix is a “>” instead of a “*”, it will also be kept.
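
The mappings listed above can be reproduced with a few lines of string handling. sanitize_sketch below is a hypothetical re-implementation written only to illustrate the rules (strip an existing v1 prefix, convert CamelCase to snake_case, keep wildcard tokens, prepend v1); the library function may handle further cases that are not documented here.

```python
import re


def sanitize_sketch(subject_name: str) -> str:
    """Normalize a subject name to the 'v1.snake_case[.wildcard]' form."""
    parts = subject_name.split(".")
    if parts[0] == "v1":
        parts = parts[1:]

    converted = []
    for part in parts:
        if part in ("*", ">"):
            # Wildcard suffixes are kept unchanged.
            converted.append(part)
        else:
            # Insert "_" before every capital letter except the first, then lowercase.
            converted.append(re.sub(r"(?<!^)(?=[A-Z])", "_", part).lower())
    return ".".join(["v1"] + converted)
```

For example, sanitize_sketch("MyMessage.*") yields "v1.my_message.*", and an already normalized name such as "v1.my_message" is returned unchanged.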

fastiot.core.subject_helper.filter_specific_sign(name)[source]
Return type:

str

fastiot.core.time module

fastiot.core.time.get_time_now()[source]
Return type:

datetime

fastiot.core.time.ensure_tzinfo(v)[source]