Source code for fastiot.db.mongodb_helper_fn

import sys
from typing import Dict

from fastiot import logging
from fastiot.env import env_mongodb
from fastiot.exceptions import ServiceError
from fastiot.msg.time_series_msg import TimeSeriesData


[docs]class MongoClientWrapper:
[docs] def __init__(self, db_host: str, db_port: int, db_user: str = None, db_password: str = None, db_auth_source: str = None, db_compression: str = None): """ Constructor for a customer mongo client. Please note, that it will also set the feature compatibility version to the current mongodb version which may cause the database to be harder to downgrade. *Note:* You have to manually install ``pymongo>=4.1,<5`` using your :file:`requirements.txt` to make use of this helper. Database clients are not automatically installed to keep the containers smaller. """ try: # pylint: disable=import-outside-toplevel from bson.binary import UUID_SUBTYPE from bson.codec_options import CodecOptions from pymongo import MongoClient from pymongo.errors import ConnectionFailure except (ImportError, ModuleNotFoundError): logging.error("You have to manually install " "`pymongo>=4.1,<5` or `fastiot[mongo]` using your `pyproject.toml` to make use of " "this helper.") sys.exit(5) mongo_client_kwargs = { "host": db_host, "port": db_port, "username": db_user, "password": db_password } if db_auth_source is not None: mongo_client_kwargs["authSource"] = db_auth_source if db_compression is not None: mongo_client_kwargs["compressors"] = db_compression if db_compression == "zlib": mongo_client_kwargs["zlibCompressionLevel"] = 9 self._client = MongoClient(**mongo_client_kwargs) try: self._client.admin.command('ping') logging.info("Connection to database established") except ConnectionFailure: logging.exception("Connecting to database failed") raise ServiceError("Database is not available") self.__set_version_compatibility_to_current_version() self._codec_options = CodecOptions(uuid_representation=UUID_SUBTYPE)
def __set_version_compatibility_to_current_version(self): version_array = self._client.server_info()['versionArray'] version = f"{version_array[0]}.{version_array[1]}" self._client.admin.command({"setFeatureCompatibilityVersion": version})
[docs] def get_client(self): return self._client
[docs]def get_mongodb_client_from_env(): """ For connecting Mongodb, the environment variables can be set, if you want to use your own settings instead of default: :envvar:`FASTIOT_MONGO_DB_HOST`, :envvar:`FASTIOT_MONGO_DB_PORT`, :envvar:`FASTIOT_MONGO_DB_USER`, :envvar:`FASTIOT_MONGO_DB_PASSWORD`, :envvar:`FASTIOT_MONGO_DB_AUTH_SOURCE`, :envvar:`FASTIOT_MONGO_DB_NAME` >>> mongo_client = get_mongodb_client_from_env() """ db_client = MongoClientWrapper( db_host=env_mongodb.host, db_port=env_mongodb.port, db_user=env_mongodb.user, db_password=env_mongodb.password, db_auth_source=env_mongodb.auth_source ) return db_client.get_client()
[docs]def time_series_data_to_mongodb_data_set(time_series_data: TimeSeriesData) -> Dict: data_set = { '_id': time_series_data.id, 'name': time_series_data.name, 'service_id': time_series_data.service_id, 'measurement_id': time_series_data.measurement_id, 'dt_start': time_series_data.dt_start, 'dt_end': time_series_data.dt_end, 'modified_at': time_series_data.modified_at, 'values': time_series_data.values } return data_set
[docs]def time_series_data_from_mongodb_data_set(data_set: Dict) -> TimeSeriesData: time_series_data = TimeSeriesData( id=data_set['_id'], name=data_set['name'], service_id=data_set['service_id'], measurement_id=data_set['measurement_id'], dt_start=data_set['dt_start'], dt_end=data_set['dt_end'], modified_at=data_set['modified_at'], values=data_set['value'] ) return time_series_data