197 lines
8.5 KiB
Python
Executable File
197 lines
8.5 KiB
Python
Executable File
#!/usr/local/bin/python3
|
|
import sys
|
|
import serial
|
|
|
|
class bcolors:
    """ANSI escape sequences used to colour and style terminal output.

    Wrap text as ``bcolors.<STYLE> + text + bcolors.ENDC``; ``ENDC`` resets
    the terminal back to its default attributes.
    """

    # foreground colours
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'

    # reset all attributes
    ENDC = '\033[0m'

    # text attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
|
|
|
|
class LGE320:
    """Reader for a Landis+Gyr E320 meter attached to a serial port.

    The port is probed once on construction and is afterwards only kept open
    for the duration of each :meth:`read` call.
    """

    # Byte sequence used to locate the beginning of an SML message.
    _SML_START = b'\x1b\x1b\x1b\x1b\x01\x01\x01\x01'

    # (result key, start marker, end marker) for every value that is extracted.
    # The value is taken from the last four bytes between the two markers.
    _REGISTERS = (
        ("T1", b'\x07\x01\x00\x01\x08\x00\xFF', b'\x01\x77\x07\x01\x00\x02\x08\x00'),  # 1.8.0
        ("T2", b'\x07\x01\x00\x02\x08\x00\xFF', b'\x01\x77\x07\x01\x00\x10\x07\x00'),  # 2.8.0
        ("P", b'\x07\x01\x00\x10\x07\x00\xFF', b'\x01\x01\x01\x63'),  # current power
    )

    def __init__(self, serial_port, read_chunk_size=2048, read_timeout=0.5):
        """Probe the serial port and remember the read settings.

        Args:
            serial_port: Device name of the serial port (e.g. ``/dev/ttyUSB0``).
            read_chunk_size: Maximum number of bytes requested per serial read.
            read_timeout: Timeout in seconds handed to the serial port.

        The port is configured for 9600 baud, 8 data bits, no parity and one
        stop bit. If it cannot be opened, an error is printed and the process
        exits with status 1.
        """
        self.chunk_size = read_chunk_size
        # Wake-up sequence; kept for reference, read() does not send it.
        self.init_string = b"\x1b\x1b\x1b\x1b\x01\x01\x01\x01\x76\x03\x30\x30\x62\x00\x62\x00\x72\x65\x00\x00\x01\x00\x77\x01\x01\x09\x31\x31\x33\x31\x31\x38\x36\x32\x01\x01\x01\x01\x63\x03\x36\x00\x76\x03\x30\x31\x62\x00\x62\x00\x72\x65\x00\x00\x07\x00\x75\x01\x01\x01\x01\x01\x63\x14\xcb\x00\x76\x03\x30\x32\x62\x00\x62\x00\x72\x65\x00\x00\x02\x00\x71\x01\x63\x75\x6d\x00\x00\x00\x1b\x1b\x1b\x1b\x1a\x02\x72\x41"

        # connect to the meter once to verify the port, then release it again
        try:
            self.device = serial.Serial(
                port=serial_port,
                baudrate=9600,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                timeout=read_timeout,
            )
            self.device.close()
        except serial.serialutil.SerialException:
            print(bcolors.FAIL + f"Can't connect to Landis+Gyr E320 on Port {serial_port}. Maybe wrong port specified?" + bcolors.ENDC)
            # sys.exit instead of the site-provided exit(), which is not
            # guaranteed to exist in every interpreter setup
            sys.exit(1)
        print(bcolors.OKGREEN + f"Connected successfully to Landis+Gyr E320 on Port {serial_port}" + bcolors.ENDC)

    def _enableSerial(self, enable: bool):
        """Open the serial port if *enable* is true, otherwise close it."""
        if enable:
            self.device.open()
        else:
            self.device.close()

    def _get_energy_value(self, energy_values):
        """Decode the raw byte slices into integers.

        Args:
            energy_values: Mapping of result key to a bytes slice whose last
                four bytes hold the value.

        Returns:
            dict mapping every key to the big-endian signed integer built from
            the last four bytes of its slice (0 for an empty slice).
        """
        return {
            name: int.from_bytes(raw[-4:], "big", signed=True)
            for name, raw in energy_values.items()
        }

    @classmethod
    def _extract_registers(cls, read_buffer):
        """Cut the raw slice of every known register out of *read_buffer*.

        Returns a dict with the keys ``T1``, ``T2`` and ``P``. A register whose
        start or end marker cannot be found maps to ``b""`` (which decodes to
        0) instead of a slice built from a ``-1`` index.
        """
        message_start = read_buffer.find(cls._SML_START)
        # without a message start nothing can be trusted -> treat as empty
        message = read_buffer[message_start:] if message_start >= 0 else b""

        slices = {}
        for name, start_marker, end_marker in cls._REGISTERS:
            start = message.find(start_marker)
            end = -1
            if start >= 0:
                # look for the end marker only behind the start marker so an
                # earlier occurrence of the same bytes cannot empty the slice
                end = message.find(end_marker, start + len(start_marker))
            slices[name] = message[start:end] if end >= 0 else b""
        return slices

    def read(self):
        """Read from the meter and return the decoded values.

        Opens the port, performs three reads of up to ``chunk_size`` bytes
        each, closes the port again (also when a read fails) and parses the
        first SML message found in the received data.

        Returns:
            dict with the integer values for ``T1`` (1.8.0), ``T2`` (2.8.0)
            and ``P`` (current power); no scaling is applied here. Values that
            could not be located in the received data are 0.
        """
        self._enableSerial(True)  # start communication
        try:
            # NOTE: self.init_string is not written to the device here.
            chunks = [self.device.read(size=self.chunk_size) for _ in range(3)]
        finally:
            # stop communication, even if reading raised
            self._enableSerial(False)

        return self._get_energy_value(self._extract_registers(b"".join(chunks)))
|
|
|
|
|
|
# report data to influxdb (every 10s) and mqtt (as often as possible; when new data is received)
if __name__ == "__main__":  # if run from cli (not imported as a module)
    import datetime
    import json
    import os
    import threading
    from time import time, sleep

    from dotenv import load_dotenv

    # for reports to mqtt and influxdb
    import paho.mqtt.client as mqtt
    from influxdb import InfluxDBClient

    def _port_from_env(name, default):
        """Return env var *name* as int, or *default* (with a warning) if unset/empty."""
        raw = os.getenv(name)
        if not raw:
            # int(None) would otherwise abort with an unhelpful TypeError
            print(bcolors.WARNING + f"{name} is not set, trying default port {default}" + bcolors.ENDC)
            return default
        return int(raw)

    # load .env file (if existing) and get the env vars
    load_dotenv()

    db_host = os.getenv('DBHOST', None)
    db_port = _port_from_env('DBPORT', 8086)
    db_user = os.getenv('DBUSER', None)
    db_pwd = os.getenv('DBPWD', None)
    db_name = os.getenv('DBNAME', None)

    mq_broker = os.getenv('MQBROKER', None)
    mq_port = _port_from_env('MQPORT', 1883)
    mq_user = os.getenv('MQUSER', None)
    mq_pwd = os.getenv('MQPWD', None)
    mq_topic_cur = os.getenv('MQTOPIC_CUR', None)
    mq_topic_sum = os.getenv('MQTOPIC_SUM', None)

    port = os.getenv('READER_PORT', None)

    if not port:
        port = "/dev/ttyUSB0"  # default to /dev/ttyUSB0 if READER_PORT env var is not set, and print a warning
        print(bcolors.WARNING + f"No serial port specified, trying {port}" + bcolors.ENDC)
    lge320 = LGE320(port)

    ## define the clients
    def on_connect(client, userdata, flags, reason_code, properties):
        """Report the outcome of the MQTT connection attempt on the console."""
        if reason_code == 0:
            print(f"{bcolors.OKGREEN}Connected successfully to the MQTT broker (on {bcolors.ENDC}{bcolors.OKCYAN}{mq_broker}{bcolors.ENDC}) {bcolors.OKGREEN}with status code {bcolors.ENDC}{bcolors.OKCYAN}{reason_code}{bcolors.ENDC}.")
        else:
            print(f"{bcolors.WARNING}Connected unsuccessfully to the MQTT broker (on {bcolors.ENDC}{bcolors.OKCYAN}{mq_broker}{bcolors.ENDC}) {bcolors.WARNING}with status code {bcolors.ENDC}{bcolors.OKCYAN}{reason_code}{bcolors.ENDC}.")

    def on_publish(client, userdata, mid, reason_codes, properties):
        """No-op publish callback; add a print here when debugging."""
        pass

    mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_publish = on_publish
    mqtt_client.username_pw_set(mq_user, mq_pwd)
    mqtt_client.connect(mq_broker, mq_port, 60)
    mqtt_client.loop_start()

    influx_client = InfluxDBClient(db_host, db_port, db_user, db_pwd, db_name)

    ## define the report functions (to be run in a thread)
    def threaded_mqtt_report():
        """Publish the latest reading to MQTT whenever its values change."""
        while True:
            # take one snapshot so both messages and the change check below
            # refer to the same reading
            current = data

            # publish data
            mqtt_client.publish(mq_topic_cur, json.dumps({"total_act_power": float(current['P'])}))
            mqtt_client.publish(mq_topic_sum, json.dumps({
                "total_act": float(current['T1'])/10,
                "total_act_ret": float(current['T2'])/10,
            }))
            print(f"{bcolors.OKBLUE}Sending data to {bcolors.BOLD}MQTT.{bcolors.ENDC}")

            # wait for new data
            while data == current:
                sleep(0.2)

    def threaded_influx_report():
        """Write the latest reading to InfluxDB roughly every 10 seconds."""
        last_time = time()
        while True:
            current = data  # one consistent snapshot per write

            # write data to influx
            # (the old meter needed float(data['T1'])*10 for T181 instead)
            json_body = [
                {
                    "measurement": "power",
                    "tags": {"counter": counter},
                    "fields": {"value": value},
                }
                for counter, value in (
                    ('T181', float(current['T1'])/10),
                    ('T280', float(current['T2'])/10),
                    ('P', float(current['P'])),
                )
            ]
            try:
                influx_client.write_points(json_body)
            except Exception as err:
                # thread boundary: a failed write must not end the reporter
                # for good, so log it and retry on the next cycle
                print(f"{bcolors.FAIL}Writing to the influx database failed: {err}{bcolors.ENDC}")
            else:
                print(f"{bcolors.OKBLUE}Written data to the {bcolors.BOLD}influx database.{bcolors.ENDC}")

            # wait 10s (and calculate out the time the reporting took)
            while (time() - last_time) < 10:
                sleep(1)
            last_time = time()

    # initialize data and threads
    data = lge320.read()  # get data once before starting the threads

    # daemon threads: when the read loop below ends (error or Ctrl+C) the
    # process exits instead of reporting the last reading forever
    mqtt_report_thread = threading.Thread(target=threaded_mqtt_report, daemon=True)
    influx_report_thread = threading.Thread(target=threaded_influx_report, daemon=True)
    mqtt_report_thread.start()
    influx_report_thread.start()

    # get new data from the reader forever
    while True:
        data = lge320.read()
        print(f"{bcolors.OKGREEN}Received new data:{bcolors.ENDC} @{bcolors.OKBLUE}{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{bcolors.ENDC} | {bcolors.OKCYAN}T1.8.0:{bcolors.ENDC} {bcolors.BOLD}{(data['T1'])/10}{bcolors.ENDC} | {bcolors.OKCYAN}T2.8.0:{bcolors.ENDC} {bcolors.BOLD}{(data['T2'])/10}{bcolors.ENDC} | {bcolors.OKCYAN}P:{bcolors.ENDC} {bcolors.BOLD}{data['P']}{bcolors.ENDC}")
|
|
|
|
|