from datetime import datetime
from typing import Optional, List
import numpy as np
import pandas as pd
from pydantic import BaseModel
from fastiot.core.time import ensure_tzinfo
from fastiot.msg import Thing
[docs]class ThingSeries(BaseModel):
"""
This class is used to store a list of Things with the same machine and name
"""
dt_start: Optional[datetime]
dt_end: Optional[datetime]
thing_list: List[Thing] = []
[docs] def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.thing_list:
kwargs["dt_start"] = kwargs["thing_list"][0].timestamp
kwargs["dt_end"] = kwargs["thing_list"][-1].timestamp
super().__init__(**kwargs)
[docs] def remove_until(self, timestamp: datetime):
"""
Removes all data until timestamp. It is guaranteed that if this operation finishes successfully, the values
starting from 2nd until the end of the time series will be greater then timestamp. If the time series includes
timestamp when the function is called, it still will after it finishes.
The dt_start attribute will be adjusted accordingly. No timestamps of values will be changed.
:param timestamp: Timestamp used for removal.
"""
timestamp = ensure_tzinfo(timestamp)
if self.thing_list:
while len(self.thing_list) > 1:
if self.thing_list[0].timestamp <= timestamp:
self.thing_list.pop(0)
else:
break
self.dt_start = self.thing_list[0].timestamp
[docs] def remove_from(self, timestamp: datetime):
timestamp = ensure_tzinfo(timestamp)
if self.thing_list:
while len(self.thing_list) > 1:
if self.thing_list[-1].timestamp >= timestamp:
self.thing_list.pop()
else:
break
self.dt_end = self.thing_list[-1].timestamp
[docs]class HistoricSensor:
[docs] def __init__(self, name, machine, customer, service):
self.historic_sensor_data: Optional[ThingSeries] = None
self.name = name
self.machine = machine
self.customer = customer
self.service = service
"""def get_min(self, historic_sensors_list, dashboard):
result = pytz.UTC.localize(datetime.datetime.max)
for sensor in dashboard.get("sensors"):
for historic_sensor in historic_sensors_list:
if historic_sensor.name == sensor.get("name") and historic_sensor.machine == sensor.get("machine"):
try:
if result > historic_sensor.historic_sensor_data.dt_start:
result = historic_sensor.historic_sensor_data.dt_start
except AttributeError:
print(
"Sensor " + sensor.get("name") + " of machine" + sensor.get("machine") + " failed to load")
return 0
return timegm(result.utctimetuple())"""
"""def get_max(self, historic_sensors_list, dashboard):
result = pytz.UTC.localize(datetime.datetime.min)
for sensor in dashboard.get("sensors"):
for historic_sensor in historic_sensors_list:
if historic_sensor.name == sensor.get("name") and historic_sensor.machine == sensor.get("machine"):
try:
if result < historic_sensor.historic_sensor_data.dt_end:
result = historic_sensor.historic_sensor_data.dt_end
except AttributeError:
print(
"Sensor " + sensor.get("name") + " of machine" + sensor.get("machine") + " failed to load")
return 0
return timegm(result.utctimetuple())"""
[docs] @staticmethod
def to_df(historic_sensor_list: List['HistoricSensor']):
historic_sensor_df_list = [
pd.DataFrame(
[(thing.timestamp, thing.value) for thing in historic_sensor.historic_sensor_data.thing_list],
columns=['datetime', historic_sensor.name])
for counter, historic_sensor in enumerate(historic_sensor_list)]
historic_sensors_df = pd.concat(historic_sensor_df_list, axis=0)
_, i = np.unique(historic_sensors_df.columns, return_index=True)
historic_sensors_df = historic_sensors_df.iloc[:, i]
historic_sensors_df['datetime'] = historic_sensors_df['datetime'].dt.tz_localize(None)
historic_sensors_df = historic_sensors_df.set_index('datetime')
return historic_sensors_df