import asyncio
import signal
from typing import List, Optional, Type
from fastiot.cli.env import env_cli
from fastiot.core.logger import logging
from fastiot.core.broker_connection import BrokerConnection, NatsBrokerConnection, Subscription
from fastiot.env import env_basic
from fastiot.exceptions.exceptions import ShutdownRequestedInterruption
from fastiot.testlib import populate_test_env
[docs]class FastIoTService:
"""
This is the most base class for all FastIoT Services. Your service must inherit from this class for everything to
work.
"""
[docs] @classmethod
def main(cls, **kwargs):
"""
Entrypoint of the class; this is the method to be started using e.g. a :file:`run.py` like generated when
creating a new service. Do not overwrite, unless you know exactly what you are doing.
:param kwargs: kwargs will be passed to class constructor.
"""
async def run_main():
app = None
async def closed_cb():
# We want to request service shutdown if connection closes
nonlocal app
if app is not None:
await app.request_shutdown("Lost connection to broker")
async def subscription_error_cb(err: Exception):
nonlocal app
if app is not None:
logging.error(err)
await app.request_shutdown("Error raised inside broker subscription")
broker_connection = await NatsBrokerConnection.connect(
closed_cb=closed_cb,
subscription_error_cb=subscription_error_cb
)
try:
app = cls(broker_connection=broker_connection, **kwargs)
await app.run()
finally:
await broker_connection.close()
cls._try_setup_test_env()
asyncio.run(run_main())
@staticmethod
def _try_setup_test_env():
if not env_cli.within_container and env_basic.config_dir == '/etc/fastiot': # Test for default (= not set)
# Some helper for local development: Read in env vars like in unit tests
deployment_name = env_cli.use_local_deployment
logging.info("Service started locally. Population environment with variables from %s",
deployment_name or "integration test")
populate_test_env(deployment_name=deployment_name)
[docs] def __init__(self, broker_connection: Optional[BrokerConnection] = None, **kwargs):
super().__init__(**kwargs)
self.broker_connection = broker_connection
self._shutdown_event = asyncio.Event()
self._subscription_fns = []
self._reply_subscription_fns = []
self._loop_fns = []
self._tasks: List[asyncio.Task] = []
self._subs: List[Subscription] = []
self.service_id: str = env_basic.service_id # Use to separate different services instantiated
self._logger = logging
self._logger.info("Started service %s!", self.__class__.__name__)
def _setup_annotations(self):
# We cannot setup annotations inside __init__ because some services may have properties which rely on the
# __init__ method to finish and would otherwise raise exceptions, e.g. a service that reads the config in the
# __init__ and provides it's values via properties.
self._subscription_fns = []
self._reply_subscription_fns = []
self._loop_fns = []
for name in dir(self):
if name.startswith('__'):
continue
attr = getattr(self, name)
if hasattr(attr, '_fastiot_subject'):
self._subscription_fns.append(attr)
if hasattr(attr, '_fastiot_reply_subject'):
self._reply_subscription_fns.append(attr)
if hasattr(attr, '_fastiot_is_loop'):
self._loop_fns.append(attr)
async def _start(self):
""" Optionally overwrite this method to run any async start commands like ``await self._server.start()``` """
async def _stop(self):
""" Optionally overwrite this method to run any async stop commands like ``await self._server.stop()``` """
[docs] def run_task(self, coro):
"""
Creates an asyncio-Task which is managed by this class. If the execution
of the coroutine raises an exception, they are logged and a service
shutdown is requested. If the task terminates regularly, it is dropped
and the module continuous to run.
The task is awaited after _stop() has been called.
"""
self._tasks.append(
asyncio.create_task(self._exec_task(coro=coro))
)
async def _exec_task(self, coro):
err = None
try:
await coro
except Exception as exception: # pylint: disable=broad-exception-caught
logging.exception("Uncaught exception raised inside task")
err = exception
if err:
await self.request_shutdown("Task failed with an exception", exception=err)
[docs] async def wait_for_shutdown(self, timeout: float = 0.0) -> bool:
"""
Method to wait for service shutdown. This is helpfull if you have a loop
running forever till the service needs to shut down.
Shutdown may occur when some other parts of the service fail like
database connection or broker connection.
Per default, it will wait indefinetly, but you can specify a timeout. If
timeout exceeds, it will not raise a timeout error, but instead return
false. Otherwise, it will return true.
Example:
>>> while await self.wait_for_shutdown(1.0) is False:
>>> print("Still running...")
:param timeout: Specify a time you want to wait for the shutdown. A
value of 0.0 (default) will wait indefinetly.
:returns: Return true if shutdown is requested, false if timeout occured.
"""
if timeout < 0:
raise ValueError("Timeout must be greater or equal zero")
if timeout == 0:
await self._shutdown_event.wait()
return True
result = True
try:
await asyncio.wait_for(
self._shutdown_event.wait(),
timeout=timeout
)
except asyncio.exceptions.TimeoutError:
result = False
return result
[docs] async def run_coro(self, coro):
"""
Waits for coro or raises ShutdownRequestedInterruption if shutdown is
requested.
:param coro: The coroutine to wait for
:returns: The result of the coroutine
"""
do_raise_err = True
async def _wait_and_raise_interruption():
nonlocal do_raise_err
await self.wait_for_shutdown()
if do_raise_err:
raise ShutdownRequestedInterruption()
for coro_completed in asyncio.as_completed([coro, _wait_and_raise_interruption()]):
result = await coro_completed
do_raise_err = False # We don't want error to be raised if coroutine finished successfully.
return result
async def __aenter__(self):
self._shutdown_event.clear()
self._setup_annotations()
self._try_setup_test_env()
if not self.broker_connection:
self.broker_connection = await NatsBrokerConnection.connect()
await self._start()
await self._start_annotated_subs()
await self._start_annotated_loops()
await asyncio.sleep(0.0) # pass control once so that stuff gets initialized
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.request_shutdown()
await self._stop_subs()
await self._stop()
await self._stop_tasks()
return False
[docs] async def run(self):
self._shutdown_event.clear()
loop = asyncio.get_running_loop()
def handler(signum, _):
nonlocal loop
if signum == signal.SIGTERM:
asyncio.run_coroutine_threadsafe(self.request_shutdown(), loop=loop)
signal.signal(signal.SIGTERM, handler)
self._setup_annotations()
await self._start()
await self._start_annotated_subs()
await self._start_annotated_loops()
await self.wait_for_shutdown()
await self._stop_subs()
await self._stop()
await self._stop_tasks()
[docs] async def request_shutdown(self, reason: str = '', exception: Optional[Type[Exception]] = None ):
""" Sets the shutdown request for all loops and tasks in the service to stop """
if not self._shutdown_event.is_set() and (reason or exception):
if reason:
self._logger.info("Initial shutdown requested with reason: %s.", str(reason))
if exception:
self._logger.exception(exception)
self._shutdown_event.set()
async def _start_annotated_loops(self):
for loop_fn in self._loop_fns:
self.run_task(self._loop_task_cb(loop_fn=loop_fn))
async def _start_annotated_subs(self):
for subscription_fn in self._subscription_fns:
sub = await self.broker_connection.subscribe(
subject=subscription_fn._fastiot_subject, # pylint: disable=protected-access
cb=subscription_fn
)
self._subs.append(sub)
for subscription_fn in self._reply_subscription_fns:
sub = await self.broker_connection.subscribe_reply_cb(
subject=subscription_fn._fastiot_reply_subject, # pylint: disable=protected-access
cb=subscription_fn
)
self._subs.append(sub)
async def _loop_task_cb(self, loop_fn):
while True:
awaitable = await loop_fn()
try:
await self.run_coro(awaitable)
except ShutdownRequestedInterruption:
break
except Exception:
break
async def _stop_subs(self):
for sub in self._subs:
await sub.unsubscribe()
self._subs = []
async def _stop_tasks(self):
await asyncio.gather(*self._tasks, return_exceptions=True)
self._tasks = []