Compare commits
No commits in common. "231d90ece524a33a5a977eb5e4e856dafade2f3a" and "38bf18b231e6def6295d30781c0676da9f41d585" have entirely different histories.
231d90ece5
...
38bf18b231
15
.env
15
.env
@ -1,15 +0,0 @@
|
|||||||
READER_PORT=/dev/ttyUSB0
|
|
||||||
DBHOST=<IP OF INFLUXDB>
|
|
||||||
DBPORT=<PORT OF INFLUXDB>
|
|
||||||
DBUSER=<USER OF INFLUXDB>
|
|
||||||
DBPWD=<PASSWROD OF INFLUXDB>
|
|
||||||
DBNAME=<DATABASE NAME IN INFLUXDB>
|
|
||||||
TZ=Europe/Berlin
|
|
||||||
MQBROKER=<IP OF THE MQTT BROKER>
|
|
||||||
MQPORT=<PORT OF THE MQTT BROKER>
|
|
||||||
MQUSER=<USER OF THE MQTT BROKER>
|
|
||||||
MQPWD=<PASSWORD OF THE MQTT BROKER>
|
|
||||||
MQTOPIC_CUR=<TOPIC WHERE THE CURRENT STATUS SHALL BE WRITTEN TO>
|
|
||||||
MQTOPIC_SUM=<TOPIC WHERE THE TOTAL STATUS SHALL BE WRITTEN TO>
|
|
||||||
PYTHONUNBUFFERED=1
|
|
||||||
|
|
19
README.md
19
README.md
@ -1,20 +1,3 @@
|
|||||||
# lge320-reader
|
# lge320-reader
|
||||||
|
|
||||||
Read data from the Landis+Gyr E320 electricity meter (or any DIN EN 62056‐21-compatible meter on 9600 Baud)
|
Read data from the Landis+Gyr E320 electricity meter (or any DIN EN 62056‐21-compatible meter on 9600 Baud)
|
||||||
In the `if __name__ == "__main__"` statement, an example data processing is written (as it is in use by me) - sending data to MQTT every second (on each data receive of the Landis+Gyr) and writing data to an influx database every 10s is currently implemented and can be configured using the `.env` file.
|
|
||||||
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
First, you need to configure all the necessary values in the .env file. After configuring, run
|
|
||||||
|
|
||||||
```bash
|
|
||||||
python3 lge320reader.py
|
|
||||||
```
|
|
||||||
|
|
||||||
...to run the script.
|
|
||||||
|
|
||||||
|
|
||||||
## License
|
|
||||||
|
|
||||||
This project is licensed under the terms of the GNU General Public License v3.0 or later, see [COPYING](COPYING).
|
|
196
lge320reader.py
196
lge320reader.py
@ -1,196 +0,0 @@
|
|||||||
#!/usr/local/bin/python3
|
|
||||||
import sys
|
|
||||||
import serial
|
|
||||||
|
|
||||||
class bcolors:
|
|
||||||
HEADER = '\033[95m'
|
|
||||||
OKBLUE = '\033[94m'
|
|
||||||
OKCYAN = '\033[96m'
|
|
||||||
OKGREEN = '\033[92m'
|
|
||||||
WARNING = '\033[93m'
|
|
||||||
FAIL = '\033[91m'
|
|
||||||
ENDC = '\033[0m'
|
|
||||||
BOLD = '\033[1m'
|
|
||||||
UNDERLINE = '\033[4m'
|
|
||||||
|
|
||||||
class LGE320:
|
|
||||||
def __init__(self, serial_port, read_chunk_size=2048, read_timeout=0.5):
|
|
||||||
# initialize variables
|
|
||||||
self.chunk_size = read_chunk_size
|
|
||||||
self.init_string = b"\x1b\x1b\x1b\x1b\x01\x01\x01\x01\x76\x03\x30\x30\x62\x00\x62\x00\x72\x65\x00\x00\x01\x00\x77\x01\x01\x09\x31\x31\x33\x31\x31\x38\x36\x32\x01\x01\x01\x01\x63\x03\x36\x00\x76\x03\x30\x31\x62\x00\x62\x00\x72\x65\x00\x00\x07\x00\x75\x01\x01\x01\x01\x01\x63\x14\xcb\x00\x76\x03\x30\x32\x62\x00\x62\x00\x72\x65\x00\x00\x02\x00\x71\x01\x63\x75\x6d\x00\x00\x00\x1b\x1b\x1b\x1b\x1a\x02\x72\x41"
|
|
||||||
|
|
||||||
# connect to easymeter q3c
|
|
||||||
try:
|
|
||||||
self.device = serial.Serial(port = serial_port, baudrate = 9600, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS, timeout=read_timeout)
|
|
||||||
self.device.close()
|
|
||||||
except serial.serialutil.SerialException:
|
|
||||||
print(bcolors.FAIL + f"Can't connect to Landis+Gyr E320 on Port {serial_port}. Maybe wrong port specified?" + bcolors.ENDC)
|
|
||||||
exit(1)
|
|
||||||
print(bcolors.OKGREEN + f"Connected successfully to Landis+Gyr E320 on Port {serial_port}" + bcolors.ENDC)
|
|
||||||
|
|
||||||
def _enableSerial(self, enable: bool):
|
|
||||||
if enable:
|
|
||||||
self.device.open()
|
|
||||||
else:
|
|
||||||
self.device.close()
|
|
||||||
|
|
||||||
def _get_energy_value(self, energy_values):
|
|
||||||
result = {} # result dictionary
|
|
||||||
for energy_value in energy_values:
|
|
||||||
watthours = energy_values[energy_value][::-1][:4][::-1] # get relevant information (last four bytes)
|
|
||||||
# write the watthours into the result dictionary
|
|
||||||
# formula: (((b1*256)+b2)*256+b3)*256+b4
|
|
||||||
#result[energy_value] = ((( watthours[0]*256 ) + watthours[1] )*256 + watthours[2])*256 + watthours[3]
|
|
||||||
result[energy_value] = int.from_bytes(watthours, "big", signed=True)
|
|
||||||
return result
|
|
||||||
|
|
||||||
def read(self):
|
|
||||||
self._enableSerial(True) # start communication
|
|
||||||
|
|
||||||
# wake up easymeter
|
|
||||||
#self.device.write(self.init_string)
|
|
||||||
|
|
||||||
# read data
|
|
||||||
read_buffer = b""
|
|
||||||
for _ in range(3):
|
|
||||||
byte_chunk = self.device.read(size=self.chunk_size)
|
|
||||||
read_buffer += byte_chunk
|
|
||||||
|
|
||||||
# stop communication
|
|
||||||
self._enableSerial(False)
|
|
||||||
|
|
||||||
read_buffer = read_buffer[read_buffer.find(b'\x1b\x1b\x1b\x1b\x01\x01\x01\x01'):] # grab the sml message
|
|
||||||
#print(read_buffer)
|
|
||||||
#print(read_buffer[read_buffer.find(b'\x01\x00\x10\x07\x00\xFF'):read_buffer.find(b'\x00\x00\x00\x1B\x1B')].hex(" ").upper())
|
|
||||||
|
|
||||||
energy_values = {
|
|
||||||
"T1": read_buffer[read_buffer.find(b'\x07\x01\x00\x01\x08\x00\xFF'):read_buffer.find(b'\x01\x77\x07\x01\x00\x02\x08\x00')], # 1.8.0
|
|
||||||
"T2": read_buffer[read_buffer.find(b'\x07\x01\x00\x02\x08\x00\xFF'):read_buffer.find(b'\x01\x77\x07\x01\x00\x10\x07\x00')], # 2.8.0
|
|
||||||
"P": read_buffer[read_buffer.find(b'\x07\x01\x00\x10\x07\x00\xFF'):read_buffer.find(b'\x01\x01\x01\x63')], # current power
|
|
||||||
}
|
|
||||||
return self._get_energy_value(energy_values)
|
|
||||||
|
|
||||||
|
|
||||||
# report data to influxdb (every 10s) and mqtt (as often as possible; when new data is received)
|
|
||||||
if __name__ == "__main__": # if run from cli (not imported as a module)
|
|
||||||
import os
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
import datetime
|
|
||||||
from time import time, sleep
|
|
||||||
import threading
|
|
||||||
|
|
||||||
# for reports to mqtt and influxdb
|
|
||||||
import paho.mqtt.client as mqtt
|
|
||||||
from influxdb import InfluxDBClient
|
|
||||||
|
|
||||||
# load .env file (if existing) and get the env vars
|
|
||||||
load_dotenv()
|
|
||||||
|
|
||||||
db_host = os.getenv('DBHOST', None)
|
|
||||||
db_port = int(os.getenv('DBPORT', None))
|
|
||||||
db_user = os.getenv('DBUSER', None)
|
|
||||||
db_pwd = os.getenv('DBPWD', None)
|
|
||||||
db_name = os.getenv('DBNAME', None)
|
|
||||||
|
|
||||||
mq_broker = os.getenv('MQBROKER', None)
|
|
||||||
mq_port = int(os.getenv('MQPORT', None))
|
|
||||||
mq_user = os.getenv('MQUSER', None)
|
|
||||||
mq_pwd = os.getenv('MQPWD', None)
|
|
||||||
mq_topic_cur = os.getenv('MQTOPIC_CUR', None)
|
|
||||||
mq_topic_sum = os.getenv('MQTOPIC_SUM', None)
|
|
||||||
|
|
||||||
port = os.getenv('READER_PORT', None)
|
|
||||||
|
|
||||||
if not port:
|
|
||||||
port = "/dev/ttyUSB0" # default to /dev/ttyUSB0 if READER_PORT env var is not set, and print a warning
|
|
||||||
print(bcolors.WARNING + f"No serial port specified, trying {port}" + bcolors.ENDC)
|
|
||||||
lge320 = LGE320(port)
|
|
||||||
|
|
||||||
|
|
||||||
## define the clients
|
|
||||||
def on_connect(client, userdata, flags, reason_code, properties):
|
|
||||||
if(reason_code==0):
|
|
||||||
print(f"{bcolors.OKGREEN}Connected successfully to the MQTT broker (on {bcolors.ENDC}{bcolors.OKCYAN}{mq_broker}{bcolors.ENDC}) {bcolors.OKGREEN}with status code {bcolors.ENDC}{bcolors.OKCYAN}{reason_code}{bcolors.ENDC}.")
|
|
||||||
else:
|
|
||||||
print(f"{bcolors.WARNING}Connected unsuccessfully to the MQTT broker (on {bcolors.ENDC}{bcolors.OKCYAN}{mq_broker}{bcolors.ENDC}) {bcolors.WARNING}with status code {bcolors.ENDC}{bcolors.OKCYAN}{reason_code}{bcolors.ENDC}.")
|
|
||||||
def on_publish(client, userdata, mid, reason_codes, properties):
|
|
||||||
#print("Daten erfolgreich veroeffentlicht") # use for debug reasons
|
|
||||||
pass
|
|
||||||
mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
|
|
||||||
mqtt_client.on_connect = on_connect
|
|
||||||
mqtt_client.on_publish = on_publish
|
|
||||||
mqtt_client.username_pw_set(mq_user, mq_pwd)
|
|
||||||
mqtt_client.connect(mq_broker, mq_port, 60)
|
|
||||||
mqtt_client.loop_start()
|
|
||||||
|
|
||||||
influx_client = InfluxDBClient(db_host, db_port, db_user, db_pwd, db_name)
|
|
||||||
|
|
||||||
## define the report functions (to be run in a thread)
|
|
||||||
def threaded_mqtt_report():
|
|
||||||
last_data = data
|
|
||||||
while True:
|
|
||||||
# publish data
|
|
||||||
mqtt_client.publish(mq_topic_cur, "{" + f"\"total_act_power\": {float(data['P'])}" + "}")
|
|
||||||
mqtt_client.publish(mq_topic_sum, "{" + f"\"total_act\": {float(data['T1'])/10}, \"total_act_ret\": {float(data['T2'])/10}" + "}")
|
|
||||||
print(f"{bcolors.OKBLUE}Sending data to {bcolors.BOLD}MQTT.{bcolors.ENDC}")
|
|
||||||
# wait for new data
|
|
||||||
while last_data == data:
|
|
||||||
sleep(0.2)
|
|
||||||
last_data = data
|
|
||||||
|
|
||||||
def threaded_influx_report():
|
|
||||||
last_time = time()
|
|
||||||
while True:
|
|
||||||
# write data to influx
|
|
||||||
json_body = [
|
|
||||||
{
|
|
||||||
"measurement": "power",
|
|
||||||
"tags": {
|
|
||||||
"counter": 'T181'
|
|
||||||
},
|
|
||||||
"fields": {
|
|
||||||
#"value": float(data['T1'])*10 // alter zähler
|
|
||||||
"value": float(data['T1'])/10
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"measurement": "power",
|
|
||||||
"tags": {
|
|
||||||
"counter": 'T280'
|
|
||||||
},
|
|
||||||
"fields": {
|
|
||||||
"value": float(data['T2'])/10
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"measurement": "power",
|
|
||||||
"tags": {
|
|
||||||
"counter": 'P'
|
|
||||||
},
|
|
||||||
"fields": {
|
|
||||||
"value": float(data['P'])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
influx_client.write_points(json_body)
|
|
||||||
print(f"{bcolors.OKBLUE}Written data to the {bcolors.BOLD}influx database.{bcolors.ENDC}")
|
|
||||||
|
|
||||||
# wait 10s (and calculate out the time the reporting took)
|
|
||||||
while (time() - last_time) < 10:
|
|
||||||
sleep(1)
|
|
||||||
last_time = time()
|
|
||||||
|
|
||||||
# initialize data and threads
|
|
||||||
data = lge320.read() # get data once before starting the threads
|
|
||||||
|
|
||||||
mqtt_report_thread = threading.Thread(target = threaded_mqtt_report)
|
|
||||||
influx_report_thread = threading.Thread(target = threaded_influx_report)
|
|
||||||
mqtt_report_thread.start()
|
|
||||||
influx_report_thread.start()
|
|
||||||
|
|
||||||
# get new data from the reader forever
|
|
||||||
while True:
|
|
||||||
data = lge320.read()
|
|
||||||
print(f"{bcolors.OKGREEN}Received new data:{bcolors.ENDC} @{bcolors.OKBLUE}{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{bcolors.ENDC} | {bcolors.OKCYAN}T1.8.0:{bcolors.ENDC} {bcolors.BOLD}{(data['T1'])/10}{bcolors.ENDC} | {bcolors.OKCYAN}T2.8.0:{bcolors.ENDC} {bcolors.BOLD}{(data['T2'])/10}{bcolors.ENDC} | {bcolors.OKCYAN}P:{bcolors.ENDC} {bcolors.BOLD}{data['P']}{bcolors.ENDC}")
|
|
||||||
|
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user