# Source code for fastiot.db.influxdb_helper_fn

import asyncio
import logging
import sys

from fastiot.env.env import env_influxdb
from fastiot.exceptions import ServiceError


class Client:
    """Singleton holder for the shared asynchronous InfluxDB client."""

    # Cached client instance; ``None`` means no client has been stored yet.
    client = None
async def get_async_influxdb_client_from_env():
    """
    Return the shared asynchronous InfluxDB client, connecting on first use.

    The connection is configured through environment variables. Set them if
    you want to use your own settings instead of the defaults:
    :envvar:`FASTIOT_INFLUX_DB_HOST`, :envvar:`FASTIOT_INFLUX_DB_PORT`,
    :envvar:`FASTIOT_INFLUX_DB_TOKEN`

    After setting up the InfluxDB Server, the data in the database can also be
    visualized with a browser at ``http://<host>:<port>``.
    Default username: *influx_db_admin* and password: *mf9ZXfeLKuaL3HL7w*.
    These defaults can be changed through :envvar:`FASTIOT_INFLUX_DB_USER`
    and :envvar:`FASTIOT_INFLUX_DB_PASSWORD`.

    >>> influxdb_client = await get_async_influxdb_client_from_env()

    :return: The client cached in ``Client.client``; it is created (and
             cached) by this call if no client was stored before.
    """
    cached = Client.client
    if cached is not None:
        # A client was already stored; hand it out again.
        return cached

    Client.client = await create_async_influxdb_client_from_env()
    return Client.client
async def get_new_async_influx_client_from_env():
    """
    Establish a fresh database connection and store it as the shared client.

    In contrast to :meth:`get_async_influxdb_client_from_env`, an already
    cached client is never reused: every call creates a new client, overwrites
    ``Client.client`` with it and returns it. This seems to be necessary in
    some test cases.

    Note that a previously stored client is only replaced, not closed, by
    this function.

    :return: The newly created client (also stored in ``Client.client``).
    """
    fresh_client = await create_async_influxdb_client_from_env()
    Client.client = fresh_client
    return fresh_client
async def create_async_influxdb_client_from_env():
    """
    Create an asynchronous InfluxDB client and wait until the server answers.

    The client is built from ``env_influxdb`` (host, port, token and
    organisation). The server is pinged up to 50 times with a pause of 0.25 s
    after each failed attempt. Clients belonging to failed attempts are closed
    before the next try so that no HTTP sessions are leaked while waiting for
    the server to come up.

    If the optional InfluxDB dependencies cannot be imported, an error is
    logged and the process exits with status code 5.

    :return: A client whose ``ping()`` succeeded.
    :raises ServiceError: If no attempt succeeded.
    """
    try:
        # pylint: disable=import-outside-toplevel
        # Imported lazily because the InfluxDB packages are an optional extra.
        from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
        from influxdb_client.client.exceptions import InfluxDBError
        from aiohttp.client_exceptions import ClientError
    except (ImportError, ModuleNotFoundError):
        logging.error("You have to manually install `fastiot[influxdb]` or `influxdb-client[async]>=1.30,<2` using "
                      "your `pyproject.toml` to make use of this helper.")
        sys.exit(5)

    sleep_time = 0.25
    num_tries = 50

    for attempt in range(1, num_tries + 1):
        client = None
        connected = False
        try:
            client = InfluxDBClientAsync(
                url=f"http://{env_influxdb.host}:{env_influxdb.port}",
                token=env_influxdb.token,
                org=env_influxdb.organisation,
                timeout=15 * 1000  # 15 s; the client library takes milliseconds
            )
            connected = bool(await client.ping())
            if connected:
                logging.info('Connected to InfluxDB Server!')
                return client
        except (InfluxDBError, ClientError) as error:
            # Expected while the server is still starting; retry below.
            logging.debug("InfluxDB connection attempt %d/%d failed: %s", attempt, num_tries, error)
        finally:
            # Release the session of every client that is not handed out,
            # including when an unexpected exception propagates.
            if client is not None and not connected:
                await client.close()

        await asyncio.sleep(sleep_time)

    raise ServiceError("Could not connect to InfluxDB")
async def influx_query(machine, name, start_time, end_time):
    """
    Query the "value" field of one measurement of one machine from InfluxDB.

    A dedicated client is created for the query and always closed afterwards,
    even if the query fails. The client is deliberately not stored in
    ``Client.client``: it is closed at the end of this function, and a closed
    client must not be handed out to later users of the singleton.

    :param machine: Value the ``machine`` tag must match.
    :param name: Value ``_measurement`` must match.
    :param start_time: Start of the queried range; ``Z`` is appended to it in
                       the query, so presumably a timestamp without time zone
                       suffix is expected (verify against callers).
    :param end_time: End of the queried range; handled like ``start_time``.
    :return: The result of ``query_api().query()`` for the built Flux query.
    """
    client = await create_async_influxdb_client_from_env()

    # NOTE(review): the arguments are interpolated into the Flux query without
    # escaping; only pass trusted values.
    query = (
        f'from(bucket: "{env_influxdb.bucket}")'
        f'|> range(start: {start_time}Z, stop: {end_time}Z)'
        '|> group(columns: ["time"])'
        '|> sort(columns: ["_time"])'
        f'|> filter(fn: (r) => r["machine"] == "{machine}")'
        '|> filter(fn: (r) => r["_field"] == "value")'
        f'|> filter(fn: (r) => r["_measurement"] == "{name}")'
    )

    try:
        return await client.query_api().query(org=env_influxdb.organisation, query=query)
    finally:
        # Close the session on success and on failure alike.
        await client.close()
def influx_query_wrapper(coro, *args, **kwargs):
    """
    Run an asynchronous function to completion from synchronous code.

    ``asyncio.run`` starts a new event loop for the call, so this wrapper must
    not be used from code that is already running inside an event loop.

    :param coro: Coroutine function to call, e.g. :meth:`influx_query`.
    :param args: Positional arguments forwarded to ``coro``.
    :param kwargs: Keyword arguments forwarded to ``coro``.
    :return: Whatever the awaited coroutine returns.
    """
    return asyncio.run(coro(*args, **kwargs))