Source code for fastiot_sample_services.fastapi.fastapi_module

import asyncio
import logging
import os

from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
from starlette.middleware.cors import CORSMiddleware

from fastiot.core import FastIoTService, subscribe
from fastiot.msg.thing import Thing
from fastiot_sample_services.fastapi.env import env_fastapi
from fastiot_sample_services.fastapi.model import Request, Response
from fastiot_sample_services.fastapi.uvicorn_server import UvicornAsyncServer


[docs]class FastAPIModule(FastIoTService):
[docs] def __init__(self, **kwargs): super().__init__(**kwargs) self.app = FastAPI() self._register_routes() self.server = UvicornAsyncServer(self.app, port=env_fastapi.fastapi_port) self.message_received = asyncio.Event() self.last_msg = None
# We have an event trigger whenever a message arrives via nats # This will than trigger the websocket to send some data def _register_routes(self): self.app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) self.app.websocket("/ws")(self.serve_websocket) self.app.get("/get_some_data")(self._handle_get) self.app.post("/post_some_data")(self._handle_post) self.app.mount("/", StaticFiles(directory=os.path.join(os.path.dirname(__file__), "vue", "dist"), html=True), name="static") # This will serve static files created with a vue.js framework. async def _start(self): """ Methods to start once the module is initialized """ await self.server.up() async def _stop(self): """ Methods to call on module shutdown """ await self.server.down() @subscribe(subject=Thing.get_subject('*')) async def _on_data_received(self, _, msg: Thing): """ Callback whenever some new machine data arrives """ self.last_msg = msg self.message_received.set() def _handle_get(self): """ Simple method to reply to a get request """ return {"hello_world": "Good morning!", "last_message": self.last_msg} def _handle_post(self, message: Request): """ Simple handling of Post Request the = Body(...) is needed as we don’t use pydantic classes, s. https://fastapi.tiangolo.com/tutorial/body-multiple-params/#embed-a-single-body-parameter for more details """ mean = sum(message.req_value) / len(message.req_value) return Response(resp_value=mean)
[docs] async def serve_websocket(self, websocket: WebSocket): """ Demonstration of using websockets You need to send one char first before all new arriving machine data will be sent to the client https://fastapi.tiangolo.com/advanced/websockets/ """ await websocket.accept() await websocket.send_text("Hello world!") await websocket.send_text("Send any char to start!") received = await websocket.receive_text() logging.info("Received data: %s", received) while await self.wait_for_shutdown(): await self.message_received.wait() await websocket.send_text(f"Received message from sensor {self.last_msg.name}: {str(self.last_msg.value)}") self.message_received.clear()