From 692f54562dc6d4acbcd58fe9860decab300d9ec8 Mon Sep 17 00:00:00 2001 From: BlueFox Date: Fri, 13 Jun 2025 16:05:03 +0200 Subject: [PATCH] Added script --- .env | 15 ++++ lge320reader.py | 196 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 .env create mode 100755 lge320reader.py diff --git a/.env b/.env new file mode 100644 index 0000000..38dc012 --- /dev/null +++ b/.env @@ -0,0 +1,15 @@ +READER_PORT=/dev/ttyUSB0 +DBHOST= +DBPORT= +DBUSER= +DBPWD= +DBNAME= +TZ=Europe/Berlin +MQBROKER= +MQPORT= +MQUSER= +MQPWD= +MQTOPIC_CUR= +MQTOPIC_SUM= +PYTHONUNBUFFERED=1 + diff --git a/lge320reader.py b/lge320reader.py new file mode 100755 index 0000000..da72efd --- /dev/null +++ b/lge320reader.py @@ -0,0 +1,196 @@ +#!/usr/local/bin/python3 +import sys +import serial + +class bcolors: + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKCYAN = '\033[96m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + +class LGE320: + def __init__(self, serial_port, read_chunk_size=2048, read_timeout=0.5): + # initialize variables + self.chunk_size = read_chunk_size + self.init_string = b"\x1b\x1b\x1b\x1b\x01\x01\x01\x01\x76\x03\x30\x30\x62\x00\x62\x00\x72\x65\x00\x00\x01\x00\x77\x01\x01\x09\x31\x31\x33\x31\x31\x38\x36\x32\x01\x01\x01\x01\x63\x03\x36\x00\x76\x03\x30\x31\x62\x00\x62\x00\x72\x65\x00\x00\x07\x00\x75\x01\x01\x01\x01\x01\x63\x14\xcb\x00\x76\x03\x30\x32\x62\x00\x62\x00\x72\x65\x00\x00\x02\x00\x71\x01\x63\x75\x6d\x00\x00\x00\x1b\x1b\x1b\x1b\x1a\x02\x72\x41" + + # connect to easymeter q3c + try: + self.device = serial.Serial(port = serial_port, baudrate = 9600, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS, timeout=read_timeout) + self.device.close() + except serial.serialutil.SerialException: + print(bcolors.FAIL + f"Can't connect to Landis+Gyr E320 on Port {serial_port}. Maybe wrong port specified?" + bcolors.ENDC) + exit(1) + print(bcolors.OKGREEN + f"Connected successfully to Landis+Gyr E320 on Port {serial_port}" + bcolors.ENDC) + + def _enableSerial(self, enable: bool): + if enable: + self.device.open() + else: + self.device.close() + + def _get_energy_value(self, energy_values): + result = {} # result dictionary + for energy_value in energy_values: + watthours = energy_values[energy_value][::-1][:4][::-1] # get relevant information (last four bytes) + # write the watthours into the result dictionary + # formula: (((b1*256)+b2)*256+b3)*256+b4 + #result[energy_value] = ((( watthours[0]*256 ) + watthours[1] )*256 + watthours[2])*256 + watthours[3] + result[energy_value] = int.from_bytes(watthours, "big", signed=True) + return result + + def read(self): + self._enableSerial(True) # start communication + + # wake up easymeter + #self.device.write(self.init_string) + + # read data + read_buffer = b"" + for _ in range(3): + byte_chunk = self.device.read(size=self.chunk_size) + read_buffer += byte_chunk + + # stop communication + self._enableSerial(False) + + read_buffer = read_buffer[read_buffer.find(b'\x1b\x1b\x1b\x1b\x01\x01\x01\x01'):] # grab the sml message + #print(read_buffer) + #print(read_buffer[read_buffer.find(b'\x01\x00\x10\x07\x00\xFF'):read_buffer.find(b'\x00\x00\x00\x1B\x1B')].hex(" ").upper()) + + energy_values = { + "T1": read_buffer[read_buffer.find(b'\x07\x01\x00\x01\x08\x00\xFF'):read_buffer.find(b'\x01\x77\x07\x01\x00\x02\x08\x00')], # 1.8.0 + "T2": read_buffer[read_buffer.find(b'\x07\x01\x00\x02\x08\x00\xFF'):read_buffer.find(b'\x01\x77\x07\x01\x00\x10\x07\x00')], # 2.8.0 + "P": read_buffer[read_buffer.find(b'\x07\x01\x00\x10\x07\x00\xFF'):read_buffer.find(b'\x01\x01\x01\x63')], # current power + } + return self._get_energy_value(energy_values) + + +# report data to influxdb (every 10s) and mqtt (as often as possible; when new data is received) +if __name__ == "__main__": # if run from cli (not imported as a module) + import os + from dotenv import load_dotenv + import datetime + from time import time, sleep + import threading + + # for reports to mqtt and influxdb + import paho.mqtt.client as mqtt + from influxdb import InfluxDBClient + + # load .env file (if existing) and get the env vars + load_dotenv() + + db_host = os.getenv('DBHOST', None) + db_port = int(os.getenv('DBPORT', None)) + db_user = os.getenv('DBUSER', None) + db_pwd = os.getenv('DBPWD', None) + db_name = os.getenv('DBNAME', None) + + mq_broker = os.getenv('MQBROKER', None) + mq_port = int(os.getenv('MQPORT', None)) + mq_user = os.getenv('MQUSER', None) + mq_pwd = os.getenv('MQPWD', None) + mq_topic_cur = os.getenv('MQTOPIC_CUR', None) + mq_topic_sum = os.getenv('MQTOPIC_SUM', None) + + port = os.getenv('READER_PORT', None) + + if not port: + port = "/dev/ttyUSB0" # default to /dev/ttyUSB0 if READER_PORT env var is not set, and print a warning + print(bcolors.WARNING + f"No serial port specified, trying {port}" + bcolors.ENDC) + lge320 = LGE320(port) + + + ## define the clients + def on_connect(client, userdata, flags, reason_code, properties): + if(reason_code==0): + print(f"{bcolors.OKGREEN}Connected successfully to the MQTT broker (on {bcolors.ENDC}{bcolors.OKCYAN}{mq_broker}{bcolors.ENDC}) {bcolors.OKGREEN}with status code {bcolors.ENDC}{bcolors.OKCYAN}{reason_code}{bcolors.ENDC}.") + else: + print(f"{bcolors.WARNING}Connected unsuccessfully to the MQTT broker (on {bcolors.ENDC}{bcolors.OKCYAN}{mq_broker}{bcolors.ENDC}) {bcolors.WARNING}with status code {bcolors.ENDC}{bcolors.OKCYAN}{reason_code}{bcolors.ENDC}.") + def on_publish(client, userdata, mid, reason_codes, properties): + #print("Daten erfolgreich veroeffentlicht") # use for debug reasons + pass + mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2) + mqtt_client.on_connect = on_connect + mqtt_client.on_publish = on_publish + mqtt_client.username_pw_set(mq_user, mq_pwd) + mqtt_client.connect(mq_broker, mq_port, 60) + mqtt_client.loop_start() + + influx_client = InfluxDBClient(db_host, db_port, db_user, db_pwd, db_name) + + ## define the report functions (to be run in a thread) + def threaded_mqtt_report(): + last_data = data + while True: + # publish data + mqtt_client.publish(mq_topic_cur, "{" + f"\"total_act_power\": {float(data['P'])}" + "}") + mqtt_client.publish(mq_topic_sum, "{" + f"\"total_act\": {float(data['T1'])/10}, \"total_act_ret\": {float(data['T2'])/10}" + "}") + print(f"{bcolors.OKBLUE}Sending data to {bcolors.BOLD}MQTT.{bcolors.ENDC}") + # wait for new data + while last_data == data: + sleep(0.2) + last_data = data + + def threaded_influx_report(): + last_time = time() + while True: + # write data to influx + json_body = [ + { + "measurement": "power", + "tags": { + "counter": 'T181' + }, + "fields": { + #"value": float(data['T1'])*10 // alter zähler + "value": float(data['T1'])/10 + } + }, + { + "measurement": "power", + "tags": { + "counter": 'T280' + }, + "fields": { + "value": float(data['T2'])/10 + } + }, + { + "measurement": "power", + "tags": { + "counter": 'P' + }, + "fields": { + "value": float(data['P']) + } + } + ] + influx_client.write_points(json_body) + print(f"{bcolors.OKBLUE}Written data to the {bcolors.BOLD}influx database.{bcolors.ENDC}") + + # wait 10s (and calculate out the time the reporting took) + while (time() - last_time) < 10: + sleep(1) + last_time = time() + + # initialize data and threads + data = lge320.read() # get data once before starting the threads + + mqtt_report_thread = threading.Thread(target = threaded_mqtt_report) + influx_report_thread = threading.Thread(target = threaded_influx_report) + mqtt_report_thread.start() + influx_report_thread.start() + + # get new data from the reader forever + while True: + data = lge320.read() + print(f"{bcolors.OKGREEN}Received new data:{bcolors.ENDC} @{bcolors.OKBLUE}{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{bcolors.ENDC} | {bcolors.OKCYAN}T1.8.0:{bcolors.ENDC} {bcolors.BOLD}{(data['T1'])/10}{bcolors.ENDC} | {bcolors.OKCYAN}T2.8.0:{bcolors.ENDC} {bcolors.BOLD}{(data['T2'])/10}{bcolors.ENDC} | {bcolors.OKCYAN}P:{bcolors.ENDC} {bcolors.BOLD}{data['P']}{bcolors.ENDC}") + +