#!/usr/local/bin/python3 import sys import serial class bcolors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKCYAN = '\033[96m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' class LGE320: def __init__(self, serial_port, read_chunk_size=2048, read_timeout=0.5): # initialize variables self.chunk_size = read_chunk_size self.init_string = b"\x1b\x1b\x1b\x1b\x01\x01\x01\x01\x76\x03\x30\x30\x62\x00\x62\x00\x72\x65\x00\x00\x01\x00\x77\x01\x01\x09\x31\x31\x33\x31\x31\x38\x36\x32\x01\x01\x01\x01\x63\x03\x36\x00\x76\x03\x30\x31\x62\x00\x62\x00\x72\x65\x00\x00\x07\x00\x75\x01\x01\x01\x01\x01\x63\x14\xcb\x00\x76\x03\x30\x32\x62\x00\x62\x00\x72\x65\x00\x00\x02\x00\x71\x01\x63\x75\x6d\x00\x00\x00\x1b\x1b\x1b\x1b\x1a\x02\x72\x41" # connect to easymeter q3c try: self.device = serial.Serial(port = serial_port, baudrate = 9600, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS, timeout=read_timeout) self.device.close() except serial.serialutil.SerialException: print(bcolors.FAIL + f"Can't connect to Landis+Gyr E320 on Port {serial_port}. Maybe wrong port specified?" + bcolors.ENDC) exit(1) print(bcolors.OKGREEN + f"Connected successfully to Landis+Gyr E320 on Port {serial_port}" + bcolors.ENDC) def _enableSerial(self, enable: bool): if enable: self.device.open() else: self.device.close() def _get_energy_value(self, energy_values): result = {} # result dictionary for energy_value in energy_values: watthours = energy_values[energy_value][::-1][:4][::-1] # get relevant information (last four bytes) # write the watthours into the result dictionary # formula: (((b1*256)+b2)*256+b3)*256+b4 #result[energy_value] = ((( watthours[0]*256 ) + watthours[1] )*256 + watthours[2])*256 + watthours[3] result[energy_value] = int.from_bytes(watthours, "big", signed=True) return result def read(self): self._enableSerial(True) # start communication # wake up easymeter #self.device.write(self.init_string) # read data read_buffer = b"" for _ in range(3): byte_chunk = self.device.read(size=self.chunk_size) read_buffer += byte_chunk # stop communication self._enableSerial(False) read_buffer = read_buffer[read_buffer.find(b'\x1b\x1b\x1b\x1b\x01\x01\x01\x01'):] # grab the sml message #print(read_buffer) #print(read_buffer[read_buffer.find(b'\x01\x00\x10\x07\x00\xFF'):read_buffer.find(b'\x00\x00\x00\x1B\x1B')].hex(" ").upper()) energy_values = { "T1": read_buffer[read_buffer.find(b'\x07\x01\x00\x01\x08\x00\xFF'):read_buffer.find(b'\x01\x77\x07\x01\x00\x02\x08\x00')], # 1.8.0 "T2": read_buffer[read_buffer.find(b'\x07\x01\x00\x02\x08\x00\xFF'):read_buffer.find(b'\x01\x77\x07\x01\x00\x10\x07\x00')], # 2.8.0 "P": read_buffer[read_buffer.find(b'\x07\x01\x00\x10\x07\x00\xFF'):read_buffer.find(b'\x01\x01\x01\x63')], # current power } return self._get_energy_value(energy_values) # report data to influxdb (every 10s) and mqtt (as often as possible; when new data is received) if __name__ == "__main__": # if run from cli (not imported as a module) import os from dotenv import load_dotenv import datetime from time import time, sleep import threading # for reports to mqtt and influxdb import paho.mqtt.client as mqtt from influxdb import InfluxDBClient # load .env file (if existing) and get the env vars load_dotenv() db_host = os.getenv('DBHOST', None) db_port = int(os.getenv('DBPORT', None)) db_user = os.getenv('DBUSER', None) db_pwd = os.getenv('DBPWD', None) db_name = os.getenv('DBNAME', None) mq_broker = os.getenv('MQBROKER', None) mq_port = int(os.getenv('MQPORT', None)) mq_user = os.getenv('MQUSER', None) mq_pwd = os.getenv('MQPWD', None) mq_topic_cur = os.getenv('MQTOPIC_CUR', None) mq_topic_sum = os.getenv('MQTOPIC_SUM', None) port = os.getenv('READER_PORT', None) if not port: port = "/dev/ttyUSB0" # default to /dev/ttyUSB0 if READER_PORT env var is not set, and print a warning print(bcolors.WARNING + f"No serial port specified, trying {port}" + bcolors.ENDC) lge320 = LGE320(port) ## define the clients def on_connect(client, userdata, flags, reason_code, properties): if(reason_code==0): print(f"{bcolors.OKGREEN}Connected successfully to the MQTT broker (on {bcolors.ENDC}{bcolors.OKCYAN}{mq_broker}{bcolors.ENDC}) {bcolors.OKGREEN}with status code {bcolors.ENDC}{bcolors.OKCYAN}{reason_code}{bcolors.ENDC}.") else: print(f"{bcolors.WARNING}Connected unsuccessfully to the MQTT broker (on {bcolors.ENDC}{bcolors.OKCYAN}{mq_broker}{bcolors.ENDC}) {bcolors.WARNING}with status code {bcolors.ENDC}{bcolors.OKCYAN}{reason_code}{bcolors.ENDC}.") def on_publish(client, userdata, mid, reason_codes, properties): #print("Daten erfolgreich veroeffentlicht") # use for debug reasons pass mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2) mqtt_client.on_connect = on_connect mqtt_client.on_publish = on_publish mqtt_client.username_pw_set(mq_user, mq_pwd) mqtt_client.connect(mq_broker, mq_port, 60) mqtt_client.loop_start() influx_client = InfluxDBClient(db_host, db_port, db_user, db_pwd, db_name) ## define the report functions (to be run in a thread) def threaded_mqtt_report(): last_data = data while True: # publish data mqtt_client.publish(mq_topic_cur, "{" + f"\"total_act_power\": {float(data['P'])}" + "}") mqtt_client.publish(mq_topic_sum, "{" + f"\"total_act\": {float(data['T1'])/10}, \"total_act_ret\": {float(data['T2'])/10}" + "}") print(f"{bcolors.OKBLUE}Sending data to {bcolors.BOLD}MQTT.{bcolors.ENDC}") # wait for new data while last_data == data: sleep(0.2) last_data = data def threaded_influx_report(): last_time = time() while True: # write data to influx json_body = [ { "measurement": "power", "tags": { "counter": 'T181' }, "fields": { #"value": float(data['T1'])*10 // alter zähler "value": float(data['T1'])/10 } }, { "measurement": "power", "tags": { "counter": 'T280' }, "fields": { "value": float(data['T2'])/10 } }, { "measurement": "power", "tags": { "counter": 'P' }, "fields": { "value": float(data['P']) } } ] influx_client.write_points(json_body) print(f"{bcolors.OKBLUE}Written data to the {bcolors.BOLD}influx database.{bcolors.ENDC}") # wait 10s (and calculate out the time the reporting took) while (time() - last_time) < 10: sleep(1) last_time = time() # initialize data and threads data = lge320.read() # get data once before starting the threads mqtt_report_thread = threading.Thread(target = threaded_mqtt_report) influx_report_thread = threading.Thread(target = threaded_influx_report) mqtt_report_thread.start() influx_report_thread.start() # get new data from the reader forever while True: data = lge320.read() print(f"{bcolors.OKGREEN}Received new data:{bcolors.ENDC} @{bcolors.OKBLUE}{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{bcolors.ENDC} | {bcolors.OKCYAN}T1.8.0:{bcolors.ENDC} {bcolors.BOLD}{(data['T1'])/10}{bcolors.ENDC} | {bcolors.OKCYAN}T2.8.0:{bcolors.ENDC} {bcolors.BOLD}{(data['T2'])/10}{bcolors.ENDC} | {bcolors.OKCYAN}P:{bcolors.ENDC} {bcolors.BOLD}{data['P']}{bcolors.ENDC}")