Source code for fastiot_core_services.dash.dash_module

import io
from dataclasses import dataclass
from datetime import datetime, timedelta, date

import dash
import dash_bootstrap_components as dbc
import pandas as pd
import plotly.graph_objects as go
from dash import dcc, html
from flask import send_file
from pymongo.errors import ServerSelectionTimeoutError

from fastiot.core import FastIoTService, Subject
from fastiot.core.subject_helper import sanitize_pub_subject_name
from fastiot.db.influxdb_helper_fn import influx_query_wrapper, influx_query
from fastiot.db.mongodb_helper_fn import get_mongodb_client_from_env
from fastiot.env import env_mongodb
from fastiot.exceptions import ServiceError
from fastiot.msg.thing import Thing
from fastiot.util.config_helper import read_config
from fastiot_core_services.dash.env import env_dash
from fastiot_core_services.dash.model.historic_sensor import HistoricSensor
from fastiot_core_services.dash.model.live_sensor import LiveSensor
from fastiot_core_services.dash.utils import ServerThread, thing_series_from_mongodb_data_set, \
    thing_series_from_influxdb_data_set


class DashModule(FastIoTService):
    """FastIoT service that serves a Dash web dashboard for sensor data.

    The dashboards are described by the service configuration (``dashboards``
    list). A dashboard flagged with ``live_data`` plots the messages received
    on the configured subject; every other dashboard plots historic data that
    is queried from MongoDB or InfluxDB for a user-selected date range.
    """

    def __init__(self, **kwargs):
        """Read the configuration and create the Dash app and its server thread.

        :param kwargs: Passed through unchanged to ``FastIoTService``.
        """
        super().__init__(**kwargs)
        self.config = read_config(self)
        self.live_sensor_list = []
        self.historic_sensor_list = []
        self.app = dash.Dash(__name__,
                             external_stylesheets=[dbc.themes.BOOTSTRAP],
                             prevent_initial_callbacks=True)
        self.server = ServerThread(self.app.server)
        self.setup_live_sensors()
        # Date range last selected in a date picker; read by ``download_excel``.
        self.start_datetime = None
        self.end_datetime = None
        # Resolved from the configuration in ``_start``; defined here so the
        # attributes always exist.
        self.initial_start_date = None
        self.initial_end_date = None
        self._mongo_collection = None
        self.subject = Subject(name=sanitize_pub_subject_name(self.config['subject_name']),
                               msg_cls=Thing)

    async def _start(self):
        """ Methods to start once the module is initialized """
        self.server.start()
        self._logger.info('=== Started Dash Service at %s:%s ===', env_dash.dash_host, env_dash.dash_port)
        self.initial_start_date = self.initial_date(self.config.get("initial_start_date"))
        self.initial_end_date = self.initial_date(self.config.get("initial_end_date"))
        self._setup_dash()
        try:
            await self._setup_mongodb()
            self.setup_historic_sensors(self.initial_start_date, self.initial_end_date)
        except ServiceError as service_error:
            self._logger.error('MongoDB Service is not available ! %s', service_error)
        except ServerSelectionTimeoutError as server_selection_timeout:
            self._logger.error('MongoDB Server cannot be selected ! %s', server_selection_timeout)
        # Live data is subscribed to even when the database is unavailable.
        await self.broker_connection.subscribe(subject=self.subject, cb=self._cb_received_data)

    async def _setup_mongodb(self):
        """Open the configured MongoDB collection if any dashboard uses ``mongodb``."""
        configured_databases = [d.get('db') for d in self.config['dashboards']]
        if "mongodb" not in configured_databases:
            return
        client_mongodb = get_mongodb_client_from_env()
        mongodb = client_mongodb.get_database(env_mongodb.name)
        self._mongo_collection = mongodb.get_collection(self.config.get("collection"))

    async def _stop(self):
        """ Methods to call on module shutdown """
        self.server.shutdown()

    def initial_date(self, date_in):
        """Resolve a configured initial date.

        A string is interpreted relative to the current UTC time: ``"now"`` is
        the current time and ``"now-HH:MM:SS"`` subtracts the given offset.
        Any non-string value is returned unchanged.

        :param date_in: Configured value (string or an already resolved date).
        :return: Naive UTC ``datetime`` for strings, otherwise ``date_in``.
        :raises ValueError: If the part after ``-`` is not ``HH:MM:SS``.
        """
        if not isinstance(date_in, str):
            return date_in
        # Kept naive (``utcnow``) on purpose: switching to an aware datetime
        # would change how the value compares with stored timestamps.
        date_out = datetime.utcnow()
        offset = date_in.replace("now", "")
        if "-" in offset:
            parsed = datetime.strptime(offset.replace("-", ""), '%H:%M:%S')
            date_out = date_out - timedelta(hours=parsed.hour,
                                            minutes=parsed.minute,
                                            seconds=parsed.second)
        return date_out

    def setup_live_sensors(self):
        """Create a ``LiveSensor`` for every sensor of every live dashboard."""
        for dashboard in self.config.get("dashboards"):
            if not dashboard.get("live_data"):
                continue
            for sensor in dashboard.get("sensors"):
                self.live_sensor_list.append(
                    LiveSensor(sensor.get("name"),
                               sensor.get("machine"),
                               dashboard.get("customer"),
                               sensor.get("service")))

    def _setup_dash(self):
        """Build the page layout and register one graph callback per dashboard."""
        self.app.title = "Data Dashboard"
        self.app.layout = html.Div(self.setup_html(self.initial_start_date, self.initial_end_date))
        for i_dashboard, dashboard in enumerate(self.config.get("dashboards")):
            graph_id = str(i_dashboard)
            callbacks = GraphCallbacks(module=self, dashboard=dashboard)
            output = dash.dependencies.Output(graph_id, 'figure')
            if dashboard.get("live_data"):
                inputs = [dash.dependencies.Input('refreshing' + graph_id, 'value'),
                          dash.dependencies.Input(graph_id + "interval", 'n_intervals')]
                handler = callbacks.update_graph
            else:
                inputs = [dash.dependencies.Input(graph_id + 'date-picker', 'start_date'),
                          dash.dependencies.Input(graph_id + 'date-picker', 'end_date'),
                          dash.dependencies.Input(graph_id + 'button', 'n_clicks')]
                handler = callbacks.show_graph
            self.app.callback(output, inputs)(handler)
        self.app.server.route("/download_excel/")(self.download_excel)

    def update_graph(self, dashboard, *args, **kwargs):
        """Build one scatter trace per sensor of a live dashboard.

        :param dashboard: Dashboard configuration whose sensors are plotted.
        :return: List of ``go.Scatter`` traces, one per configured sensor.
        """
        traces = []
        for sensor in dashboard.get("sensors"):
            values = []
            timestamps = []
            for live_sensor in self.live_sensor_list:
                if live_sensor.name != sensor.get("name"):
                    continue
                for thing in live_sensor.live_sensors:
                    values.append(thing.value)
                    timestamps.append(thing.timestamp)
            traces.append(go.Scatter(x=timestamps, y=values, name=sensor.get("name")))
        return traces

    def _live_card_elements(self, graph_id, dashboard):
        """Return the controls of a live dashboard card: refresh toggle, graph, timer."""
        return [
            dcc.RadioItems(
                id='refreshing' + graph_id,
                options=[
                    {'label': 'Stop refreshing', 'value': 'stop'},
                    {'label': 'refreshing', 'value': 'start'},
                ],
                value='start',
                labelStyle={'display': 'inline-block'},
            ),
            dcc.Graph(id=graph_id),
            dcc.Interval(
                id=graph_id + "interval",
                interval=dashboard.get("refresh_time"),
                n_intervals=0,
            ),
        ]

    def _historic_card_elements(self, graph_id, start_date, end_date):
        """Return the controls of a historic card: reload, graph, date range, download."""
        return [
            html.Button('Click to reload', id=graph_id + 'button', n_clicks=0),
            dcc.Graph(id=graph_id),
            # NOTE(review): ``date.today()`` is evaluated when the layout is
            # built, so the allowed maximum is not advanced afterwards.
            dcc.DatePickerRange(
                id=graph_id + 'date-picker',
                display_format='DD-MM-YYYY',
                max_date_allowed=date.today(),
                updatemode='bothdates',
                initial_visible_month=date.today(),
                start_date=start_date,
                end_date=end_date,
            ),
            html.A("download excel", href="/download_excel/"),
        ]

    def setup_html(self, start_date, end_date):
        """Create the page content: a navigation bar followed by one card per dashboard.

        :param start_date: Initial start date shown in the historic date pickers.
        :param end_date: Initial end date shown in the historic date pickers.
        :return: List of Dash components (navbar first, then the cards).
        """
        html_cards = []
        html_navbar_elements = []
        for i_dashboard, dashboard in enumerate(self.config.get("dashboards")):
            graph_id = str(i_dashboard)
            html_navbar_elements.append(
                html.A(dbc.Button(dashboard.get("name"),
                                  id=graph_id + "nav_button",
                                  className="ms-1"),
                       href='#card' + graph_id))
            if dashboard.get("live_data"):
                html_card_elements = self._live_card_elements(graph_id, dashboard)
            else:
                html_card_elements = self._historic_card_elements(graph_id, start_date, end_date)
            html_cards.append(
                dbc.Card(
                    [
                        dbc.CardHeader(dashboard.get("name")),
                        dbc.CardBody(html_card_elements),
                    ],
                    color="#a2d4c7",
                    outline=True,
                    className="mb-4",
                    id="card" + graph_id,
                ))

        html_navbar = dbc.Navbar(
            dbc.Row(
                [
                    # Use row and col to control vertical alignment of logo / brand
                    dbc.Col(
                        [
                            html.Img(src="/assets/fraunhofer_logo.png", height="40px", className="mb-3"),
                            dbc.NavbarBrand("Data Dashboard", className="ms-4"),
                        ],
                        className="ps-3",
                    ),
                    dbc.Col(
                        html_navbar_elements,
                        width="auto",
                        align="center",
                    ),
                ],
            ),
            color="#185a47",
            dark=True,
            sticky="top",
        )
        return [html_navbar, *html_cards]

    def setup_historic_sensors(self, start_time: datetime, end_time: datetime):
        """Reload ``historic_sensor_list`` with the data between two points in time.

        The list is cleared first. Every sensor of every non-live dashboard gets
        one ``HistoricSensor`` whose data is read from MongoDB or InfluxDB,
        depending on the dashboard's ``db`` setting.

        :param start_time: Lower bound of the queried time range.
        :param end_time: Upper bound of the queried time range.
        """
        self.historic_sensor_list.clear()
        for dashboard in self.config.get("dashboards"):
            if dashboard.get("live_data"):
                continue
            for sensor in dashboard.get("sensors"):
                historic_sensor = HistoricSensor(sensor.get("name"),
                                                 sensor.get("machine"),
                                                 dashboard.get("customer"),
                                                 sensor.get("service"))
                if "mongodb" in dashboard.get("db"):
                    try:
                        cursor = self._mongo_collection.find({
                            "name": historic_sensor.name,
                            "machine": historic_sensor.machine,
                            'timestamp': {'$gte': start_time, '$lte': end_time}
                        })
                        documents = list(cursor)
                        self._logger.info('got %s results from mongodb', len(documents))
                        historic_sensor.historic_sensor_data = thing_series_from_mongodb_data_set(documents)
                        historic_sensor.historic_sensor_data.remove_until(start_time)
                        historic_sensor.historic_sensor_data.remove_from(end_time)
                    except AttributeError as e:
                        # Raised when ``_mongo_collection`` is still None; the
                        # sensor is kept without data in that case.
                        self._logger.info(
                            'MongoDB Server cannot be connected, thus _mongo_collection is still None. %s', e)
                elif "influxdb" in dashboard.get("db"):
                    query_results = influx_query_wrapper(
                        influx_query,
                        sensor.get("machine"),
                        sensor.get("name"),
                        start_time.isoformat(),
                        end_time.isoformat())
                    historic_sensor.historic_sensor_data = \
                        thing_series_from_influxdb_data_set(query_results.to_json())
                self.historic_sensor_list.append(historic_sensor)

    def download_excel(self, *args, **kwargs):
        """Flask view: export the last selected date range as an Excel file.

        The historic data is reloaded for the range stored by ``show_graph``
        and written to an in-memory workbook.

        :return: The file response, or a ``(message, status)`` tuple when no
                 date range has been selected yet (400) or nothing can be
                 exported (404).
        """
        start, end = self.start_datetime, self.end_datetime
        if not (start and end):
            message = ('Please set the start_datetime and end_datetime in DatePicker first '
                       'to download the excel file')
            self._logger.warning(message)
            # A Flask view must not return None, so answer with an explicit error.
            return message, 400

        self._logger.info("Download excel file from %s to %s", str(start), str(end))
        # ``show_graph`` empties the sensor list after plotting, so the data has
        # to be reloaded before it can be checked or exported.
        self.setup_historic_sensors(start, end)
        if not self.historic_sensor_list:
            message = 'No historic sensors are configured, there is no data to export'
            self._logger.warning(message)
            return message, 404

        df = HistoricSensor.to_df(historic_sensor_list=self.historic_sensor_list)
        buffer = io.BytesIO()
        # The context manager finalises the workbook; ``ExcelWriter.save()``
        # is not available in current pandas releases.
        with pd.ExcelWriter(buffer, engine='xlsxwriter') as excel_writer:
            df.to_excel(excel_writer, sheet_name='labor')
        buffer.seek(0)
        return send_file(buffer, as_attachment=True,
                         download_name=f'{start}-{end} Data.xlsx')

    def show_graph(self, dashboard, start_date_str, end_date_str, *args, **kwargs):
        """Build one scatter trace per sensor of a historic dashboard.

        The selected range is remembered for ``download_excel`` and the
        historic data is reloaded for it.

        :param dashboard: Dashboard configuration whose sensors are plotted.
        :param start_date_str: ISO formatted start date from the date picker.
        :param end_date_str: ISO formatted end date from the date picker.
        :return: List of ``go.Scatter`` traces; empty when a date is missing.
        """
        if not start_date_str or not end_date_str:
            # The date picker reports None for an unset date.
            return []
        start_datetime = datetime.fromisoformat(start_date_str)
        end_datetime = datetime.fromisoformat(end_date_str)
        self.start_datetime = start_datetime
        self.end_datetime = end_datetime
        self.setup_historic_sensors(start_datetime, end_datetime)

        traces = []
        for sensor in dashboard.get("sensors"):
            values = []
            timestamps = []
            for historic_sensor in self.historic_sensor_list:
                is_same_sensor = (historic_sensor.name == sensor.get("name")
                                  and historic_sensor.machine == sensor.get("machine")
                                  and dashboard.get("customer") == historic_sensor.customer
                                  and sensor.get("service") == historic_sensor.service)
                if not is_same_sensor:
                    continue
                if historic_sensor.historic_sensor_data.thing_list:
                    for thing in historic_sensor.historic_sensor_data.thing_list:
                        timestamps.append(thing.timestamp)
                        values.append(thing.value)
            traces.append(go.Scatter(x=timestamps, y=values, name=sensor.get("name")))
        # Drop the loaded data again once the traces are built.
        self.historic_sensor_list = []
        return traces

    async def _cb_received_data(self, subject: str, msg: Thing):
        """Store a received message in every live sensor with the same name.

        :param subject: Subject the message was received on (unused).
        :param msg: Received sensor value.
        """
        self._logger.debug("Received message from sensor %s: %s", msg.name, str(msg))
        # NOTE(review): sensors are matched by name only and across all
        # dashboards, so a name configured more than once stores the message
        # more than once - confirm this is intended.
        for dashboard in self.config.get("dashboards"):
            for sensor in dashboard.get("sensors"):
                if sensor.get("name") != msg.name:
                    continue
                for live_sensor in self.live_sensor_list:
                    if sensor.get("name") == live_sensor.name:
                        live_sensor.live_sensors.append(msg)
                        live_sensor.clean_until(msg.timestamp, dashboard.get("time_shown"))
@dataclass
class GraphCallbacks:
    """Bind one dashboard configuration to the graph callbacks of a module.

    Dash invokes the callbacks with the input values only, so the dashboard
    they belong to is carried by this object.
    """

    # Service instance that produces the traces.
    module: "DashModule"
    # Configuration of the dashboard the callbacks are registered for.
    dashboard: dict

    def show_graph(self, *args, **kwargs):
        """Return the figure of a historic dashboard.

        :param args: Callback input values, forwarded to ``module.show_graph``.
        :return: Figure dictionary with ``data`` and ``layout``.
        """
        traces = self.module.show_graph(self.dashboard, *args, **kwargs)
        return {
            'data': traces,
            'layout': go.Layout(title='Historic Data', barmode='stack'),
        }

    def update_graph(self, refresh, *args, **kwargs):
        """Return the figure of a live dashboard.

        :param refresh: Value of the refresh toggle; ``'stop'`` freezes the graph.
        :param args: Remaining callback input values, forwarded to
                     ``module.update_graph``.
        :return: Figure dictionary, or ``dash.no_update`` while refreshing is stopped.
        """
        if refresh == 'stop':
            return dash.no_update
        traces = self.module.update_graph(self.dashboard, *args, **kwargs)
        return {
            'data': traces,
            'layout': go.Layout(title='Live Data', barmode='stack'),
        }
# Script entry point: run the dashboard service.
if __name__ == '__main__':
    DashModule.main()