easymeter_q3c/q3c_logger.py
2022-08-20 17:08:21 +02:00

91 lines
4.1 KiB
Python
Executable File

#!/usr/bin/python3
import sys
import serial
import datetime
from time import time,sleep
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class Q3C():
def __init__(self, serial_port, read_chunk_size=2048, read_timeout=0.5):
# initialize variables
self.chunk_size = read_chunk_size
self.init_string = b"\x1b\x1b\x1b\x1b\x01\x01\x01\x01\x76\x03\x30\x30\x62\x00\x62\x00\x72\x65\x00\x00\x01\x00\x77\x01\x01\x09\x31\x31\x33\x31\x31\x38\x36\x32\x01\x01\x01\x01\x63\x03\x36\x00\x76\x03\x30\x31\x62\x00\x62\x00\x72\x65\x00\x00\x07\x00\x75\x01\x01\x01\x01\x01\x63\x14\xcb\x00\x76\x03\x30\x32\x62\x00\x62\x00\x72\x65\x00\x00\x02\x00\x71\x01\x63\x75\x6d\x00\x00\x00\x1b\x1b\x1b\x1b\x1a\x02\x72\x41"
# connect to easymeter q3c
try:
self.device = serial.Serial(port = serial_port, baudrate = 9600, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS, timeout=read_timeout)
self.device.close()
except serial.serialutil.SerialException:
print(bcolors.FAIL + f"Can't connect to Easymeter Q3C on Port {serial_port}. Maybe wrong port specified?" + bcolors.ENDC)
exit(1)
print(bcolors.OKGREEN + f"Connected succesfully to Easymeter Q3C on Port {serial_port}" + bcolors.ENDC)
def _enableSerial(self, enable: bool):
if enable:
self.device.open()
else:
self.device.close()
def _get_energy_value(self, energy_values):
result = {} # result dictionary
for energy_value in energy_values:
watthours = energy_values[energy_value][::-1][:4][::-1] # get relevant information (last four bytes)
# write the watthours into the result dictionary
# formula: (((b1*256)+b2)*256+b3)*256+b4
result[energy_value] = ((( watthours[0]*256 ) + watthours[1] )*256 + watthours[2])*256 + watthours[3]
return result
def read(self):
self._enableSerial(True) # start communication
# wake up easymeter
self.device.write(self.init_string)
# read data
read_buffer = b""
for _ in range(3):
byte_chunk = self.device.read(size=self.chunk_size)
read_buffer += byte_chunk
# stop communication
self._enableSerial(False)
read_buffer = read_buffer[read_buffer.find(b'\x1b\x1b\x1b\x1b\x01\x01\x01\x01'):] # grab the sml message
energy_values = {
"T0": read_buffer[read_buffer.find(b'\x01\x08\x00'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x01')], # from t1.8.0 to 1.8.1
"T1": read_buffer[read_buffer.find(b'\x01\x08\x01'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x02')], # from t1.8.1 to 1.8.2
"T2": read_buffer[read_buffer.find(b'\x01\x08\x02'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x03')], # from t1.8.2 to 1.8.3
"T3": read_buffer[read_buffer.find(b'\x01\x08\x03'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x04')], # from t1.8.3 to 1.8.4
"T4": read_buffer[read_buffer.find(b'\x01\x08\x04'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x05')], # from t1.8.4 to 1.8.5
"T5": read_buffer[read_buffer.find(b'\x01\x08\x05'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x06')], # from t1.8.5 to 1.8.6
"T6": read_buffer[read_buffer.find(b'\x01\x08\x06'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x07')], # from t1.8.6 to 1.8.7
"T7": read_buffer[read_buffer.find(b'\x01\x08\x07'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x08')], # from t1.8.7 to 1.8.8
}
return self._get_energy_value(energy_values)
if __name__ == "__main__": # read q3c data
if(len(sys.argv) != 2):
port = "/dev/ttyUSB0"
print(bcolors.WARNING + f"No serial port specified, trying {port}" + bcolors.ENDC)
else:
port = sys.argv[1]
device = Q3C(port)
data = device.read()
print(data)