import datetime
from fastiot import logging
from fastiot.core import FastIoTService, subscribe, reply
from fastiot.db.influxdb_helper_fn import get_async_influxdb_client_from_env
from fastiot.env.env import env_influxdb
from fastiot.msg.hist import HistObjectReq, HistObjectResp
from fastiot.msg.thing import Thing
from fastiot_core_services.time_series.env import time_series_env as env
[docs]class TimeSeriesService(FastIoTService):
[docs] def __init__(self, **kwargs):
super().__init__(**kwargs)
self.msg_counter_ = 0
self.client = None
async def _start(self):
self.client = await get_async_influxdb_client_from_env()
async def _stop(self):
await self.client.close()
[docs] @subscribe(subject=Thing.get_subject(env.subscribe_subject))
async def consume(self, msg: Thing):
data = [{"measurement": str(msg.name),
"tags": {"machine": str(msg.machine),
"unit": str(msg.unit)},
"fields": {"value": msg.value},
"time": msg.timestamp
}]
await self.client.write_api().write(bucket=env_influxdb.bucket, org=env_influxdb.organisation,
record=data, precision='ms')
self.msg_counter_ = self.msg_counter_ + 1
if self.msg_counter_ >= 10:
self.msg_counter_ = 0
logging.info("10 datasets written")
[docs] @reply(HistObjectReq.get_reply_subject(name=env.request_subject))
async def reply(self, request: HistObjectReq):
query = await self.generate_query(request)
results: list = []
tables = await self.client.query_api().query(query, org=env_influxdb.organisation)
for table in tables:
for row in table:
results.append({"machine": row.values.get("machine"),
"sensor": row.get_measurement(),
"value": row.get_value(),
"unit": row.values.get("unit"),
"timestamp": row.get_time(),
})
if len(results) > 0:
return HistObjectResp(values=results)
logging.debug("No data found. Returning error code 1.")
return HistObjectResp(values=results, error_msg="no data found", error_code=1)
[docs] @staticmethod
async def generate_query(request: HistObjectReq) -> str:
if request.raw_query and isinstance(request.raw_query, str):
return request.raw_query
query: str = f'from(bucket: "{env_influxdb.bucket}")'
if request.dt_start is not None:
query = query + f'|> range(start: {str(request.dt_start.timestamp()).split(".", maxsplit=1)[0]}'
if request.dt_end is not None:
query = query + f', stop: {str(request.dt_end.timestamp()).split(".", maxsplit=1)[0]}'
query = query + ')'
else:
start = datetime.datetime.utcnow() - datetime.timedelta(days=30)
query = query + "|> range(start:" + str(str(start.timestamp()).split(".", maxsplit=1)[0]) + ")"
if request.sensor is not None:
query = query + f'|> filter(fn: (r) => r["_measurement"] == "{request.sensor}")'
if request.machine is not None:
query = query + f'|>filter(fn: (r) => r["machine"] =="{request.machine}")'
query = query + f'|> limit(n: {request.limit})' \
'|> group(columns:["time"])' \
'|> sort(columns: ["_time"])'
return query
if __name__ == '__main__':
TimeSeriesService.main()