fastiot.core.broker_connection module

class fastiot.core.broker_connection.Subscription[source]
abstract async unsubscribe()[source]

Cancels the subscription

class fastiot.core.broker_connection.NatsBrokerSubscription(subscription_error_cb=None, **kwargs)[source]
__init__(subscription_error_cb=None, **kwargs)[source]
async unsubscribe()[source]

Cancels the subscription

class fastiot.core.broker_connection.NatsBrokerSubscriptionSubject(subject, cb, **kwargs)[source]
__init__(subject, cb, **kwargs)[source]
async received_nats_msg_cb(nats_msg)[source]
class fastiot.core.broker_connection.NatsBrokerSubscriptionReplySubject(subject, cb, send_reply_fn)[source]
__init__(subject, cb, send_reply_fn)[source]
async received_nats_msg_cb(nats_msg)[source]
class fastiot.core.broker_connection.BrokerConnection(**kwargs)[source]
__init__(**kwargs)[source]
abstract async subscribe(subject, cb)[source]

Subscribe to a subject.

Parameters:
  • subject (Subject) – The subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, None]]) – The callback which is called when a message is received

Return type:

Subscription

async subscribe_msg_queue(subject, msg_queue)[source]

Subscribe to a subject using a message queue instead of a callback. Use this if you prefer querying a msg_queue.

Parameters:
  • subject (Subject) – The subject to subscribe to

  • msg_queue (Queue[Union[FastIoTData, dict]]) – The message queue where received messages are enqueued.

Return type:

Subscription

abstract async subscribe_reply_cb(subject, cb)[source]

Subscribe to a reply subject. It is expected that the message will be answered.

Parameters:
  • subject (ReplySubject) – The reply subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, Union[FastIoTData, dict]]]) – The callback which is called when a request is received

Return type:

Subscription

async publish(subject, msg)[source]

Publishes a message for a subject.

Parameters:
  • subject (Subject) – The subject info to publish to.

  • msg (Union[FastIoTPublish, dict]) – The message.

async request(subject, msg, timeout=30.0)[source]

Send a request on a subject.

Parameters:
  • subject (ReplySubject) – The subject used for sending the request.

  • msg (Union[FastIoTRequest, dict]) – The request

  • timeout (float) – The time to wait for an answer. Raises ErrTimeout if no answer is received in time.

Return type:

Union[FastIoTResponse, dict]

:return The response

run_threadsafe_nowait(coro)[source]

Runs a coroutine on brokers event loop. This method is thread-safe. It can be useful if you want to interact with the broker from another thread.

Parameters:

coro (Coroutine) – The coroutine to run thread-safe on brokers event loop, for example ‘broker_client.publish(…)’

Return type:

Future

run_threadsafe(coro, timeout=0.0)[source]

Runs a coroutine on brokers event loop. This method is thread-safe. It can be useful if you want to interact with the broker from another thread.

Parameters:
  • coro (Coroutine) – The coroutine to run thread-safe on brokers event loop, for example ‘broker_client.publish(…)’

  • timeout (float) – The number of seconds to wait for the result to be done. Raises concurrent.futures.TimeoutError if timeout exceeds. A value of zero means wait forever.

Return type:

Any

Returns:

Returns the result of the coroutine. If the coroutine raised an exception, it is reraised.

publish_sync(subject, msg, timeout=0.0)[source]

Publishes a message for a subject. This method is thread-safe. Under the hood, it uses run_threadsafe.

Parameters:
  • subject (Subject) – The subject info to publish to.

  • msg (Union[FastIoTPublish, dict]) – The message.

  • timeout (float) – The timeout.

publish_sync_nowait(subject, msg)[source]

Publishes a message for a subject. This method is thread-safe. Under the hood, it uses run_threadsafe_nowait.

Parameters:
  • subject (Subject) – The subject info to publish to.

  • msg (Union[FastIoTPublish, dict]) – The message.

Return type:

Future

request_sync(subject, msg, timeout=30.0)[source]

Performs a request on the subject. This method is thread-safe. Under the hood, it uses run_threadsafe.

Please note, that it will only timeout if the request times out. For purposes of simplicity it will wait forever, if the executing thread is occupied too much and the request cannot be scheduled.

Parameters:
  • subject (ReplySubject) – The subject info to publish the request.

  • msg (Union[FastIoTRequest, dict]) – The request message.

  • timeout (float) – The timeout for the broker call.

Return type:

Union[FastIoTResponse, dict]

:return The requested message.

class fastiot.core.broker_connection.NatsBrokerConnection(client, subscription_error_cb=None)[source]
async classmethod connect(closed_cb=None, subscription_error_cb=None)[source]

Connects a nats instance and returns a nats broker connection.

Return type:

NatsBrokerConnection

__init__(client, subscription_error_cb=None)[source]
async close()[source]
async subscribe(subject, cb)[source]

Subscribe to a subject.

Parameters:
  • subject (Subject) – The subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, None]]) – The callback which is called when a message is received

Return type:

Subscription

async subscribe_reply_cb(subject, cb)[source]

Subscribe to a reply subject. It is expected that the message will be answered.

Parameters:
  • subject (ReplySubject) – The reply subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, Union[FastIoTData, dict]]]) – The callback which is called when a request is received

Return type:

Subscription

class fastiot.core.broker_connection.SubscriptionDummy[source]
async unsubscribe()[source]

Cancels the subscription

check_pending_error()[source]
async raise_pending_error()[source]
class fastiot.core.broker_connection.BrokerConnectionDummy(**kwargs)[source]

A dummy broker implementation to mock dependencies.

async subscribe(subject, cb)[source]

Subscribe to a subject.

Parameters:
  • subject (Subject) – The subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, None]]) – The callback which is called when a message is received

Return type:

Subscription

async subscribe_reply_cb(subject, cb)[source]

Subscribe to a reply subject. It is expected that the message will be answered.

Parameters:
  • subject (ReplySubject) – The reply subject to subscribe to

  • cb (Callable[..., Coroutine[None, None, Union[FastIoTData, dict]]]) – The callback which is called when a request is received

Return type:

Subscription