import logging
import re
import time
from typing import List, Dict
import pymongo
from fastiot.core import FastIoTService, Subject
from fastiot.core.subject_helper import sanitize_pub_subject_name, filter_specific_sign, MSG_FORMAT_VERSION, \
WILDCARD_SAME_LEVEL, HIERARCHY
from fastiot.core.time import get_time_now
from fastiot.env import env_basic, env_mongodb
from fastiot.msg.custom_db_data_type_conversion import to_mongo_data, from_mongo_data
from fastiot.msg.hist import HistObjectReq, HistObjectResp
from fastiot_core_services.object_storage.config_model import ObjectStorageConfig, SubscriptionConfig
from fastiot_core_services.object_storage.mongodb_handler import MongoDBHandler
from fastiot_core_services.object_storage.object_storage_helper_fn import build_query_dict
class ObjectStorageService(FastIoTService):
    """Persist broker messages in MongoDB and answer historic-object queries.

    Every subject listed under ``subscriptions`` in the service configuration is subscribed to.
    Received messages are converted with :func:`to_mongo_data` and then either inserted into the
    configured collection or upserted when ``enable_overwriting`` is set. For each subscription a
    reply subject is registered as well, so clients can fetch stored objects via
    :class:`HistObjectReq`.
    """

    def __init__(self, **kwargs):
        """Set up the MongoDB handler, load the service configuration and create search indices.

        :raises RuntimeError: if the configuration contains no subscriptions.
        """
        super().__init__(**kwargs)
        self._mongodb_handler = MongoDBHandler()
        self.database = self._mongodb_handler.get_database(env_mongodb.name)
        self.service_config = ObjectStorageConfig.from_service(self)
        if not self.service_config.subscriptions:
            self._logger.error('No subscriptions configured in configuration file for object storage. Aborting.')
            # NOTE(review): the delay presumably throttles container restart loops — confirm.
            time.sleep(10)
            raise RuntimeError('No subscriptions configured for object storage.')
        self._create_index()

    def _create_index(self):
        """Create the MongoDB indices configured under ``search_index``.

        Each collection maps to a list of index definitions. A definition is one of:

        - a single field name, which gives an ascending single-field index;
        - a list of field names, which gives a compound index;
        - a comma-separated string of field names (deprecated), which is treated like the list form.

        In compound indices every field is ascending except ``_timestamp``, which is descending.
        Indices are named ``index_<n>`` after their position in the collection's list.
        """
        for collection, index_config in self.service_config.search_index.items():
            for index_nr, index_spec in enumerate(index_config):
                # Only strings can be comma-separated; a list is already a compound definition.
                if isinstance(index_spec, str) and "," in index_spec:
                    self._logger.warning(
                        "Using a list seperated by ',' is deprecated. Please convert your configuration "
                        "to a proper YAML list.")
                    index_spec = [field_name.strip() for field_name in index_spec.split(",")]

                is_compound = isinstance(index_spec, list)
                if is_compound:
                    # Later `_timestamp` values are the more relevant query results, so sort them first.
                    index = [(field_name,
                              pymongo.DESCENDING if field_name == '_timestamp' else pymongo.ASCENDING)
                             for field_name in index_spec]
                else:
                    index = [(index_spec, pymongo.ASCENDING)]

                self._mongodb_handler.create_index(collection=self.database[collection],
                                                   index=index, index_name=f"index_{index_nr}")
                if is_compound:
                    self._logger.debug("Created compound index: '%s'", index)

    async def _start(self):
        """Subscribe to every configured data subject and to its reply subject.

        If a subscription has no ``reply_subject_name``, the data subject name is reused
        (deprecated). The reply name is passed through :func:`filter_specific_sign` and written
        back onto the subscription config, where :meth:`_cb_reply_hist_object` looks it up.
        """
        for subject_name, subscription_config in self.service_config.subscriptions.items():
            subject = Subject(name=sanitize_pub_subject_name(subject_name), msg_cls=dict)
            await self.broker_connection.subscribe(subject=subject, cb=self._cb_receive_data)

            if not subscription_config.reply_subject_name:
                self._logger.warning("Please set `reply_subject_name` in your configuration.\n"
                                     "Using `subject_name` for receiving and sending is deprecating.")
                subscription_config.reply_subject_name = subject_name
            subscription_config.reply_subject_name = filter_specific_sign(subscription_config.reply_subject_name)

            reply_subject = HistObjectReq.get_reply_subject(name=subscription_config.reply_subject_name)
            await self.broker_connection.subscribe_reply_cb(subject=reply_subject, cb=self._cb_reply_hist_object)

    async def _cb_receive_data(self, subject_name: str, msg: dict):
        """Store a message received on one of the configured data subjects.

        :param subject_name: Full subject the message arrived on.
        :param msg: Message payload. Its ``timestamp`` entry is used if present; otherwise the
            current time is used.
        :raises RuntimeError: if ``subject_name`` matches no configured subscription.
        """
        subscription_config = self._find_matching_subject(subject_name)
        self._logger.debug("Received message %s", msg)

        # Thing messages carry their own timestamp; other message types may not.
        timestamp = msg['timestamp'] if 'timestamp' in msg else get_time_now()
        mongo_data = to_mongo_data(timestamp=timestamp, subject_name=subject_name, msg=msg)
        self._logger.debug("Converted Mongo data is %s", mongo_data)

        # NOTE(review): pymongo calls are synchronous and block the event loop while they run.
        if subscription_config.enable_overwriting:
            # Upsert so that only the latest document per identifying key set is kept.
            self._overwrite_data(mongo_data, subscription_config)
        else:
            self.database[subscription_config.collection].insert_one(mongo_data)

    @staticmethod
    def _subject_pattern(subscription_name: str) -> str:
        """Translate a configured subscription name into a regex for full (versioned) subject names."""
        # Escape the whole name first so literal regex metacharacters cannot alter the pattern.
        # Then turn the escaped wildcard tokens back into regex constructs.
        pattern = re.escape(subscription_name)
        pattern = pattern.replace(re.escape(WILDCARD_SAME_LEVEL), r"[^\.]*")
        pattern = pattern.replace(re.escape(HIERARCHY), r".*")
        return re.escape(MSG_FORMAT_VERSION) + r'\.' + pattern

    def _find_matching_subject(self, subject_name: str) -> SubscriptionConfig:
        """Return the subscription config whose subject pattern matches ``subject_name``.

        With exactly one subscription configured, it is returned without checking. Only
        configured subjects are subscribed to, so every message belongs to it.

        Otherwise each configured name is turned into a regex, which must match the whole subject
        including the ``MSG_FORMAT_VERSION`` prefix:

        - ``WILDCARD_SAME_LEVEL`` matches text within one level;
        - ``HIERARCHY`` matches any remainder.

        :raises RuntimeError: if no configured subscription matches.
        """
        subscriptions = self.service_config.subscriptions
        if len(subscriptions) == 1:  # Quick fix for most tasks
            return next(iter(subscriptions.values()))

        for subscription_name, subscription_config in subscriptions.items():
            if re.fullmatch(self._subject_pattern(subscription_name), subject_name):
                return subscription_config

        raise RuntimeError(f"Could not find any configured subject to match the message received via subject "
                           f"`{subject_name}`")

    def _overwrite_data(self, mongo_data, subscription_config: SubscriptionConfig):
        """Upsert ``mongo_data`` into the subscription's collection.

        The target document is identified by the fields listed in ``identify_object_with``. All
        other fields of ``mongo_data`` are written with ``$set``. A new document is inserted if
        none matches.

        :raises KeyError: if ``mongo_data`` lacks one of the identifying fields.
        """
        primary_keys = subscription_config.identify_object_with
        collection = self.database[subscription_config.collection]

        query = {key_name: mongo_data[key_name] for key_name in primary_keys}
        update = {'$set': {field: value for field, value in mongo_data.items() if field not in primary_keys}}

        # explain() and count_documents() are extra round trips to the DB; only pay for them when debugging.
        if env_basic.log_level <= logging.DEBUG:
            self._logger.debug("Index used by MongoDB: %s", collection.find(query).explain())
            self._logger.debug('found a document to update' if collection.count_documents(query) > 0
                               else 'inserting the document')

        collection.update_one(filter=query, update=update, upsert=True)

    async def _cb_reply_hist_object(self, subject: str, hist_object_req: HistObjectReq) -> HistObjectResp:
        """Answer a historic-object request from the collection of the matching subscription.

        :param subject: Reply subject the request arrived on; used to find the subscription.
        :param hist_object_req: Request that is turned into a MongoDB query; its ``limit`` caps the results.
        :return: Response with the converted documents, or with ``error_msg`` set if the query
            returned nothing.
        :raises RuntimeError: if no subscription's reply subject equals ``subject``.
        """
        # Skip configs whose reply name is not set yet (e.g. while `_start` is still subscribing).
        sub_config = next((config for config in self.service_config.subscriptions.values()
                           if config.reply_subject_name
                           and HistObjectReq.get_reply_subject(config.reply_subject_name).name == subject),
                          None)
        if sub_config is None:
            raise RuntimeError(f"Could not find any configured reply subject matching `{subject}`")

        self._logger.debug("Received request on subject %s with message %s", subject, hist_object_req)
        query_dict = build_query_dict(hist_object_req=hist_object_req)
        query_results = self._query_db(subscription_config=sub_config,
                                       query_dict=query_dict, limit_nr=hist_object_req.limit)
        values = [from_mongo_data(result) for result in query_results]

        if not values:
            return HistObjectResp(
                error_msg='No query results from Mongodb, please check Connection or query',
                values=values)
        return HistObjectResp(values=values)

    def _query_db(self, subscription_config: SubscriptionConfig, query_dict: Dict, limit_nr: int) -> List:
        """Return at most ``limit_nr`` documents matching ``query_dict`` from the subscription's collection.

        pymongo treats a limit of 0 as "no limit".
        """
        collection = self.database[subscription_config.collection]
        return list(collection.find(query_dict).limit(limit_nr))
if __name__ == '__main__':
    # Script entry point: run with verbose DEBUG output by default. To reduce the noise, lower the
    # level here. To let the `FASTIOT_LOG_LEVEL` environment variable decide, drop the
    # `basicConfig` call entirely.
    _script_log_level = logging.DEBUG
    logging.basicConfig(level=_script_log_level)
    ObjectStorageService.main()