fastiot.core.broker_connection module¶
- class fastiot.core.broker_connection.NatsBrokerSubscription(subscription_error_cb=None, **kwargs)[source]¶
- class fastiot.core.broker_connection.NatsBrokerSubscriptionReplySubject(subject, cb, send_reply_fn)[source]¶
- class fastiot.core.broker_connection.BrokerConnection(**kwargs)[source]¶
-
- abstract async subscribe(subject, cb)[source]¶
Subscribe to a subject.
- Parameters:
subject (
Subject
) – The subject to subscribe tocb (
Callable
[...
,Coroutine
[None
,None
,None
]]) – The callback which is called when a message is received
- Return type:
- async subscribe_msg_queue(subject, msg_queue)[source]¶
Subscribe to a subject using a message queue instead of a callback. Use this if you prefer querying a msg_queue.
- Parameters:
subject (
Subject
) – The subject to subscribe tomsg_queue (
Queue
[Union
[FastIoTData
,dict
]]) – The message queue where received messages are enqueued.
- Return type:
- abstract async subscribe_reply_cb(subject, cb)[source]¶
Subscribe to a reply subject. It is expected that the message will be answered.
- Parameters:
subject (
ReplySubject
) – The reply subject to subscribe tocb (
Callable
[...
,Coroutine
[None
,None
,Union
[FastIoTData
,dict
]]]) – The callback which is called when a request is received
- Return type:
- async publish(subject, msg)[source]¶
Publishes a message for a subject.
- Parameters:
subject (
Subject
) – The subject info to publish to.msg (
Union
[FastIoTPublish
,dict
]) – The message.
- async request(subject, msg, timeout=30.0)[source]¶
Send a request on a subject.
- Parameters:
subject (
ReplySubject
) – The subject used for sending the request.msg (
Union
[FastIoTRequest
,dict
]) – The requesttimeout (
float
) – The time to wait for an answer. Raises ErrTimeout if no answer is received in time.
- Return type:
Union
[FastIoTResponse
,dict
]
:return The response
- run_threadsafe_nowait(coro)[source]¶
Runs a coroutine on brokers event loop. This method is thread-safe. It can be useful if you want to interact with the broker from another thread.
- Parameters:
coro (
Coroutine
) – The coroutine to run thread-safe on brokers event loop, for example ‘broker_client.publish(…)’- Return type:
Future
- run_threadsafe(coro, timeout=0.0)[source]¶
Runs a coroutine on brokers event loop. This method is thread-safe. It can be useful if you want to interact with the broker from another thread.
- Parameters:
coro (
Coroutine
) – The coroutine to run thread-safe on brokers event loop, for example ‘broker_client.publish(…)’timeout (
float
) – The number of seconds to wait for the result to be done. Raises concurrent.futures.TimeoutError if timeout exceeds. A value of zero means wait forever.
- Return type:
Any
- Returns:
Returns the result of the coroutine. If the coroutine raised an exception, it is reraised.
- publish_sync(subject, msg, timeout=0.0)[source]¶
Publishes a message for a subject. This method is thread-safe. Under the hood, it uses run_threadsafe.
- Parameters:
subject (
Subject
) – The subject info to publish to.msg (
Union
[FastIoTPublish
,dict
]) – The message.timeout (
float
) – The timeout.
- publish_sync_nowait(subject, msg)[source]¶
Publishes a message for a subject. This method is thread-safe. Under the hood, it uses run_threadsafe_nowait.
- Parameters:
subject (
Subject
) – The subject info to publish to.msg (
Union
[FastIoTPublish
,dict
]) – The message.
- Return type:
Future
- request_sync(subject, msg, timeout=30.0)[source]¶
Performs a request on the subject. This method is thread-safe. Under the hood, it uses run_threadsafe.
Please note, that it will only timeout if the request times out. For purposes of simplicity it will wait forever, if the executing thread is occupied too much and the request cannot be scheduled.
- Parameters:
subject (
ReplySubject
) – The subject info to publish the request.msg (
Union
[FastIoTRequest
,dict
]) – The request message.timeout (
float
) – The timeout for the broker call.
- Return type:
Union
[FastIoTResponse
,dict
]
:return The requested message.
- class fastiot.core.broker_connection.NatsBrokerConnection(client, subscription_error_cb=None)[source]¶
- async classmethod connect(closed_cb=None, subscription_error_cb=None)[source]¶
Connects a nats instance and returns a nats broker connection.
- Return type:
- async subscribe(subject, cb)[source]¶
Subscribe to a subject.
- Parameters:
subject (
Subject
) – The subject to subscribe tocb (
Callable
[...
,Coroutine
[None
,None
,None
]]) – The callback which is called when a message is received
- Return type:
- async subscribe_reply_cb(subject, cb)[source]¶
Subscribe to a reply subject. It is expected that the message will be answered.
- Parameters:
subject (
ReplySubject
) – The reply subject to subscribe tocb (
Callable
[...
,Coroutine
[None
,None
,Union
[FastIoTData
,dict
]]]) – The callback which is called when a request is received
- Return type:
- class fastiot.core.broker_connection.BrokerConnectionDummy(**kwargs)[source]¶
A dummy broker implementation to mock dependencies.
- async subscribe(subject, cb)[source]¶
Subscribe to a subject.
- Parameters:
subject (
Subject
) – The subject to subscribe tocb (
Callable
[...
,Coroutine
[None
,None
,None
]]) – The callback which is called when a message is received
- Return type:
- async subscribe_reply_cb(subject, cb)[source]¶
Subscribe to a reply subject. It is expected that the message will be answered.
- Parameters:
subject (
ReplySubject
) – The reply subject to subscribe tocb (
Callable
[...
,Coroutine
[None
,None
,Union
[FastIoTData
,dict
]]]) – The callback which is called when a request is received
- Return type: