Source code for fastiot_sample_services.redis_producer.redis_producer_module

import asyncio
import logging
import random
from datetime import datetime

from fastiot.core import FastIoTService, loop, subscribe
from fastiot.db.redis_helper import RedisHelper
from fastiot.msg import Thing, RedisMsg
from fastiot.util.object_helper import parse_object


[docs]class ExampleRedisProducerService(FastIoTService): def __int__(self, **kwargs): super().__init__(**kwargs) self.helper = None async def _start(self): self.helper = RedisHelper(self.broker_connection)
[docs] @loop async def produce(self): sensor_name = 'my_redis_sensor' value = random.randint(20, 30) subject = RedisMsg.get_subject(sensor_name) thing = Thing( name=sensor_name, machine='FastIoT_Example_Machine', value=value, unit="m", timestamp=datetime.utcnow()) await self.helper.send_data(data=thing, subject=subject) return asyncio.sleep(2)
[docs] @subscribe(subject=RedisMsg.get_subject("my_redis_sensor")) async def consume(self, msg: RedisMsg): thing: Thing = parse_object(await self.helper.get_data(msg.id), Thing) logging.info("%s: %s", msg.get_subject().name, str(thing))
if __name__ == '__main__': ExampleRedisProducerService.main()