Source code for fastiot.core.serialization

import msgpack


from fastiot.core.data_models import FastIoTData, Msg, MsgCls


def serialize_to_bin(msg_cls: MsgCls, msg: Msg) -> bytes:
    """
    Serialize a msg to binary using msgpack, applying a basic type check.

    If ``msg_cls`` is a subclass of :class:`FastIoTData`, ``msg`` must be an
    instance of ``msg_cls``; it is converted with ``msg.dict()`` before being
    packed. For every other ``msg_cls`` the ``msg`` is handed to msgpack
    unchanged and no type check is performed.

    :param msg_cls: The class the message is expected to be an instance of.
    :param msg: The message to serialize.
    :return: The msgpack-encoded message.
    :raises TypeError: If ``msg_cls`` is a :class:`FastIoTData` subclass and
                       ``msg`` is not an instance of it.
    """
    if issubclass(msg_cls, FastIoTData):
        if not isinstance(msg, msg_cls):
            raise TypeError(
                f"Expected msg to be of type '{msg_cls}', but it is "
                f"of type '{type(msg)}' instead."
            )
        # Data models are packed via their dict representation.
        return msgpack.packb(msg.dict(), datetime=True)

    # Plain values are passed to msgpack as they are.
    return msgpack.packb(msg, datetime=True)
def serialize_from_bin(msg_cls: MsgCls, data: bytes) -> Msg:
    """
    Deserialize a msg from its binary (msgpack) representation.

    The bytes are unpacked with msgpack first. If ``msg_cls`` is a subclass
    of :class:`FastIoTData`, the unpacked value is expanded as keyword
    arguments to construct an instance of ``msg_cls``; otherwise the unpacked
    value is returned as is.

    :param msg_cls: The class of the message to restore.
    :param data: The msgpack-encoded message.
    :return: An instance of ``msg_cls`` for :class:`FastIoTData` subclasses,
             otherwise the raw unpacked value.
    """
    # timestamp=3 asks msgpack to decode Timestamp values into datetime
    # objects (see the msgpack-python unpackb documentation).
    payload = msgpack.unpackb(data, timestamp=3)

    if not issubclass(msg_cls, FastIoTData):
        return payload

    # Rebuild the data model from the unpacked field mapping.
    return msg_cls(**payload)