Compare commits

...

2 Commits

Author SHA1 Message Date
231d90ece5 Added more detailed description 2025-06-13 16:11:23 +02:00
692f54562d Added script 2025-06-13 16:05:03 +02:00
4 changed files with 229 additions and 1 deletions

15
.env Normal file
View File

@ -0,0 +1,15 @@
READER_PORT=/dev/ttyUSB0
DBHOST=<IP OF INFLUXDB>
DBPORT=<PORT OF INFLUXDB>
DBUSER=<USER OF INFLUXDB>
DBPWD=<PASSWROD OF INFLUXDB>
DBNAME=<DATABASE NAME IN INFLUXDB>
TZ=Europe/Berlin
MQBROKER=<IP OF THE MQTT BROKER>
MQPORT=<PORT OF THE MQTT BROKER>
MQUSER=<USER OF THE MQTT BROKER>
MQPWD=<PASSWORD OF THE MQTT BROKER>
MQTOPIC_CUR=<TOPIC WHERE THE CURRENT STATUS SHALL BE WRITTEN TO>
MQTOPIC_SUM=<TOPIC WHERE THE TOTAL STATUS SHALL BE WRITTEN TO>
PYTHONUNBUFFERED=1

View File

View File

@ -1,3 +1,20 @@
# lge320-reader
Read data from the Landis+Gyr E320 electricity meter (or any DIN EN 6205621-compatible meter on 9600 Baud)
Read data from the Landis+Gyr E320 electricity meter (or any DIN EN 6205621-compatible meter on 9600 Baud)
In the `if __name__ == "__main__"` statement, an example data processing is written (as it is in use by me) - sending data to MQTT every second (on each data receive of the Landis+Gyr) and writing data to an influx database every 10s is currently implemented and can be configured using the `.env` file.
## Usage
First, you need to configure all the necessary values in the .env file. After configuring, run
```bash
python3 lge320reader.py
```
...to run the script.
## License
This project is licensed under the terms of the GNU General Public License v3.0 or later, see [COPYING](COPYING).

196
lge320reader.py Executable file
View File

@ -0,0 +1,196 @@
#!/usr/local/bin/python3
import sys
import serial
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class LGE320:
def __init__(self, serial_port, read_chunk_size=2048, read_timeout=0.5):
# initialize variables
self.chunk_size = read_chunk_size
self.init_string = b"\x1b\x1b\x1b\x1b\x01\x01\x01\x01\x76\x03\x30\x30\x62\x00\x62\x00\x72\x65\x00\x00\x01\x00\x77\x01\x01\x09\x31\x31\x33\x31\x31\x38\x36\x32\x01\x01\x01\x01\x63\x03\x36\x00\x76\x03\x30\x31\x62\x00\x62\x00\x72\x65\x00\x00\x07\x00\x75\x01\x01\x01\x01\x01\x63\x14\xcb\x00\x76\x03\x30\x32\x62\x00\x62\x00\x72\x65\x00\x00\x02\x00\x71\x01\x63\x75\x6d\x00\x00\x00\x1b\x1b\x1b\x1b\x1a\x02\x72\x41"
# connect to easymeter q3c
try:
self.device = serial.Serial(port = serial_port, baudrate = 9600, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS, timeout=read_timeout)
self.device.close()
except serial.serialutil.SerialException:
print(bcolors.FAIL + f"Can't connect to Landis+Gyr E320 on Port {serial_port}. Maybe wrong port specified?" + bcolors.ENDC)
exit(1)
print(bcolors.OKGREEN + f"Connected successfully to Landis+Gyr E320 on Port {serial_port}" + bcolors.ENDC)
def _enableSerial(self, enable: bool):
if enable:
self.device.open()
else:
self.device.close()
def _get_energy_value(self, energy_values):
result = {} # result dictionary
for energy_value in energy_values:
watthours = energy_values[energy_value][::-1][:4][::-1] # get relevant information (last four bytes)
# write the watthours into the result dictionary
# formula: (((b1*256)+b2)*256+b3)*256+b4
#result[energy_value] = ((( watthours[0]*256 ) + watthours[1] )*256 + watthours[2])*256 + watthours[3]
result[energy_value] = int.from_bytes(watthours, "big", signed=True)
return result
def read(self):
self._enableSerial(True) # start communication
# wake up easymeter
#self.device.write(self.init_string)
# read data
read_buffer = b""
for _ in range(3):
byte_chunk = self.device.read(size=self.chunk_size)
read_buffer += byte_chunk
# stop communication
self._enableSerial(False)
read_buffer = read_buffer[read_buffer.find(b'\x1b\x1b\x1b\x1b\x01\x01\x01\x01'):] # grab the sml message
#print(read_buffer)
#print(read_buffer[read_buffer.find(b'\x01\x00\x10\x07\x00\xFF'):read_buffer.find(b'\x00\x00\x00\x1B\x1B')].hex(" ").upper())
energy_values = {
"T1": read_buffer[read_buffer.find(b'\x07\x01\x00\x01\x08\x00\xFF'):read_buffer.find(b'\x01\x77\x07\x01\x00\x02\x08\x00')], # 1.8.0
"T2": read_buffer[read_buffer.find(b'\x07\x01\x00\x02\x08\x00\xFF'):read_buffer.find(b'\x01\x77\x07\x01\x00\x10\x07\x00')], # 2.8.0
"P": read_buffer[read_buffer.find(b'\x07\x01\x00\x10\x07\x00\xFF'):read_buffer.find(b'\x01\x01\x01\x63')], # current power
}
return self._get_energy_value(energy_values)
# report data to influxdb (every 10s) and mqtt (as often as possible; when new data is received)
if __name__ == "__main__": # if run from cli (not imported as a module)
import os
from dotenv import load_dotenv
import datetime
from time import time, sleep
import threading
# for reports to mqtt and influxdb
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
# load .env file (if existing) and get the env vars
load_dotenv()
db_host = os.getenv('DBHOST', None)
db_port = int(os.getenv('DBPORT', None))
db_user = os.getenv('DBUSER', None)
db_pwd = os.getenv('DBPWD', None)
db_name = os.getenv('DBNAME', None)
mq_broker = os.getenv('MQBROKER', None)
mq_port = int(os.getenv('MQPORT', None))
mq_user = os.getenv('MQUSER', None)
mq_pwd = os.getenv('MQPWD', None)
mq_topic_cur = os.getenv('MQTOPIC_CUR', None)
mq_topic_sum = os.getenv('MQTOPIC_SUM', None)
port = os.getenv('READER_PORT', None)
if not port:
port = "/dev/ttyUSB0" # default to /dev/ttyUSB0 if READER_PORT env var is not set, and print a warning
print(bcolors.WARNING + f"No serial port specified, trying {port}" + bcolors.ENDC)
lge320 = LGE320(port)
## define the clients
def on_connect(client, userdata, flags, reason_code, properties):
if(reason_code==0):
print(f"{bcolors.OKGREEN}Connected successfully to the MQTT broker (on {bcolors.ENDC}{bcolors.OKCYAN}{mq_broker}{bcolors.ENDC}) {bcolors.OKGREEN}with status code {bcolors.ENDC}{bcolors.OKCYAN}{reason_code}{bcolors.ENDC}.")
else:
print(f"{bcolors.WARNING}Connected unsuccessfully to the MQTT broker (on {bcolors.ENDC}{bcolors.OKCYAN}{mq_broker}{bcolors.ENDC}) {bcolors.WARNING}with status code {bcolors.ENDC}{bcolors.OKCYAN}{reason_code}{bcolors.ENDC}.")
def on_publish(client, userdata, mid, reason_codes, properties):
#print("Daten erfolgreich veroeffentlicht") # use for debug reasons
pass
mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
mqtt_client.on_connect = on_connect
mqtt_client.on_publish = on_publish
mqtt_client.username_pw_set(mq_user, mq_pwd)
mqtt_client.connect(mq_broker, mq_port, 60)
mqtt_client.loop_start()
influx_client = InfluxDBClient(db_host, db_port, db_user, db_pwd, db_name)
## define the report functions (to be run in a thread)
def threaded_mqtt_report():
last_data = data
while True:
# publish data
mqtt_client.publish(mq_topic_cur, "{" + f"\"total_act_power\": {float(data['P'])}" + "}")
mqtt_client.publish(mq_topic_sum, "{" + f"\"total_act\": {float(data['T1'])/10}, \"total_act_ret\": {float(data['T2'])/10}" + "}")
print(f"{bcolors.OKBLUE}Sending data to {bcolors.BOLD}MQTT.{bcolors.ENDC}")
# wait for new data
while last_data == data:
sleep(0.2)
last_data = data
def threaded_influx_report():
last_time = time()
while True:
# write data to influx
json_body = [
{
"measurement": "power",
"tags": {
"counter": 'T181'
},
"fields": {
#"value": float(data['T1'])*10 // alter zähler
"value": float(data['T1'])/10
}
},
{
"measurement": "power",
"tags": {
"counter": 'T280'
},
"fields": {
"value": float(data['T2'])/10
}
},
{
"measurement": "power",
"tags": {
"counter": 'P'
},
"fields": {
"value": float(data['P'])
}
}
]
influx_client.write_points(json_body)
print(f"{bcolors.OKBLUE}Written data to the {bcolors.BOLD}influx database.{bcolors.ENDC}")
# wait 10s (and calculate out the time the reporting took)
while (time() - last_time) < 10:
sleep(1)
last_time = time()
# initialize data and threads
data = lge320.read() # get data once before starting the threads
mqtt_report_thread = threading.Thread(target = threaded_mqtt_report)
influx_report_thread = threading.Thread(target = threaded_influx_report)
mqtt_report_thread.start()
influx_report_thread.start()
# get new data from the reader forever
while True:
data = lge320.read()
print(f"{bcolors.OKGREEN}Received new data:{bcolors.ENDC} @{bcolors.OKBLUE}{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{bcolors.ENDC} | {bcolors.OKCYAN}T1.8.0:{bcolors.ENDC} {bcolors.BOLD}{(data['T1'])/10}{bcolors.ENDC} | {bcolors.OKCYAN}T2.8.0:{bcolors.ENDC} {bcolors.BOLD}{(data['T2'])/10}{bcolors.ENDC} | {bcolors.OKCYAN}P:{bcolors.ENDC} {bcolors.BOLD}{data['P']}{bcolors.ENDC}")