Source code for fastiot_core_services.opc_ua_reader.machine_monitoring_module

import asyncio
import os
from typing import Any, Dict

from opcua import Client

from fastiot.core import FastIoTService, loop
from fastiot.core.time import get_time_now
from fastiot.msg.thing import Thing
from fastiot_core_services.opc_ua_reader.env import env_opcua, OPCUARetrievalMode
from fastiot_core_services.opc_ua_reader.extract_thing_metadata import extract_thing_metadata_from_csv


[docs]class OPCUAReader(FastIoTService):
[docs] def __init__(self, **kwargs): super().__init__(**kwargs) self._loop = asyncio.get_event_loop() self._thing_queue = asyncio.Queue() self._mainloop_wait_condition = asyncio.Condition() self._data_received_event_for_max_allowed_data_delay = asyncio.Event() self._things: Dict[str, Thing] = {} self._load_config()
def _load_config(self): config_dir = env_opcua.opc_ua_reader_config_dir if os.path.isdir(config_dir): self._logger.info("Importing from csv") things_filename = os.path.join(config_dir, "opc_ua_reader_things.csv") if os.path.isfile(things_filename): for nodeid, thing in extract_thing_metadata_from_csv(things_filename).items(): if nodeid in self._things: self._logger.warning(f'Thing with nodeid "{nodeid}" skipped because it is ' f'already included') else: self._things[nodeid] = thing self._logger.info(f'File "{things_filename}" successfully imported.') else: self._logger.info(f'File "{things_filename}" does not exist. Skipping import of sensors.') else: self._logger.warning(f'Config dir "{config_dir}" does not exist') async def _start(self): if env_opcua.max_allowed_data_delay > 0.0: self._logger.info("Opcua max allowed data delay is set.") else: self._logger.info("Opcua max allowed data delay is not set.") self._setup_opcua_client() if len(self._things) > 0: if env_opcua.retrieval_mode is OPCUARetrievalMode.subscription: self._setup_monitoring_subscriptions() elif env_opcua.retrieval_mode in [OPCUARetrievalMode.polling, OPCUARetrievalMode.polling_always]: self.run_task(self._poll_monitored_node_values()) else: raise NotImplementedError() def _setup_opcua_client(self): self._opcua_client = Client(url=env_opcua.endpoint_url) if env_opcua.security_string: self._opcua_client.set_security_string(env_opcua.security_string) if env_opcua.user: self._opcua_client.set_user(env_opcua.user) if env_opcua.password: self._opcua_client.set_password(env_opcua.password) if env_opcua.application_uri: self._opcua_client.application_uri = env_opcua.application_uri self._opcua_client.connect() self._opcua_client_subscription = None def _setup_monitoring_subscriptions(self): opcua_nodes = [] for nodeid in self._things.keys(): opcua_node = self._opcua_client.get_node(nodeid) opcua_nodes.append(opcua_node) self._opcua_client_subscription = self._opcua_client.create_subscription(0, self) self._opcua_client_subscription.subscribe_data_change(opcua_nodes)
[docs] def datachange_notification(self, node, val, _): thing = self._things[node.nodeid.to_string()] self._apply_changes_to_thing(thing, val)
async def _poll_monitored_node_values(self): while env_opcua.polling_delay == 0.0 or await self.wait_for_shutdown(env_opcua.polling_delay) is False: for nodeid, thing in self._things.items(): opc_node = self._opcua_client.get_node(nodeid) val = opc_node.get_value() self._apply_changes_to_thing(thing, val) if self._shutdown_event.is_set() is True: # if a large amount of sensors is polled; evaluating shutdown # after each poll ensures responsiveness in case of shutdown request break @loop async def _mainloop_cb(self): while self._thing_queue.qsize() > 0: enqueued_thing: Thing = await self._thing_queue.get() self._thing_queue.task_done() await self.broker_connection.publish( subject=enqueued_thing.default_subject, msg=enqueued_thing ) self._data_received_event_for_max_allowed_data_delay.set() async with self._mainloop_wait_condition: await self._mainloop_wait_condition.wait() return asyncio.sleep(0) @loop async def _check_max_allowed_data_delay_cb(self): if not env_opcua.max_allowed_data_delay: return asyncio.Event().wait() self._data_received_event_for_max_allowed_data_delay.clear() try: await asyncio.wait_for( self._data_received_event_for_max_allowed_data_delay.wait(), env_opcua.max_allowed_data_delay ) except asyncio.TimeoutError: self._logger.error("Receiving no data. Writing to opcua error logfile.") error_log_dir = os.path.dirname(env_opcua.opc_ua_reader_error_logfile) if os.path.isdir(error_log_dir) is False: os.makedirs(error_log_dir) with open(env_opcua.opc_ua_reader_error_logfile, "a") as f: f.write(f"{get_time_now()} Receiving no data.\n") return asyncio.sleep(0.0) def _apply_changes_to_thing(self, thing: Thing, new_val): if env_opcua.retrieval_mode == OPCUARetrievalMode.polling and thing.value == new_val: return thing.value = new_val thing.timestamp = get_time_now() self._enqueue_thing(thing=thing) def _enqueue_thing(self, thing: Thing): asyncio.run_coroutine_threadsafe( self._put_into_queue(queue=self._thing_queue, msg=thing.copy(deep=True)), self._loop ) async def _put_into_queue(self, queue: asyncio.Queue, msg: Any): await queue.put(msg) async with self._mainloop_wait_condition: self._mainloop_wait_condition.notify_all() async def _stop(self): self._data_received_event_for_max_allowed_data_delay.set() self._opcua_client.disconnect() async with self._mainloop_wait_condition: self._mainloop_wait_condition.notify_all()