#!/usr/bin/python3
"""Read the energy registers 1.8.0 to 1.8.7 of an Easymeter Q3C over a serial port.

Run as a script with the serial port as the only argument; without exactly
one argument the port defaults to /dev/ttyUSB0. The decoded registers are
printed as a dictionary with the keys "T0" to "T7".
"""
import datetime
import sys
from time import time, sleep

try:
    import serial
except ImportError:
    # pyserial is only needed to talk to a real meter. Keeping the module
    # importable without it lets the parsing helpers be used on captured
    # data; Q3C.__init__ raises the ImportError instead.
    serial = None


class bcolors:
    """ANSI escape sequences used to colour the console output."""

    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


class Q3C():
    """Serial reader for the energy registers of an Easymeter Q3C."""

    # Byte sequence that marks the beginning of the SML message in the reply.
    _SML_START = b'\x1b\x1b\x1b\x1b\x01\x01\x01\x01'
    # Bytes that precede the register number in the marker of the *next*
    # register entry; that marker is used as the end of the current entry.
    _NEXT_ENTRY_PREFIX = b'\x01\x77\x07\x01\x01\x01\x08'
    # Registers 1.8.0 .. 1.8.7 are reported as "T0" .. "T7".
    _REGISTER_COUNT = 8

    def __init__(self, serial_port, read_chunk_size=2048, read_timeout=0.5):
        """Check that the meter's serial port can be opened.

        The port is opened once (9600 baud, 8N1) and closed again right away;
        read() re-opens it for every query.

        Args:
            serial_port: Name of the serial device, e.g. "/dev/ttyUSB0".
            read_chunk_size: Maximum number of bytes requested per read call.
            read_timeout: Timeout in seconds for each read call.

        Raises:
            ImportError: If pyserial is not installed.
            SystemExit: With exit code 1 if the port cannot be opened.
        """
        # initialize variables
        self.chunk_size = read_chunk_size
        # Request sent to the meter before reading (see read()).
        self.init_string = (
            b"\x1b\x1b\x1b\x1b\x01\x01\x01\x01"
            b"\x76\x03\x30\x30\x62\x00\x62\x00\x72\x65\x00\x00\x01\x00\x77\x01\x01\x09\x31\x31\x33\x31\x31\x38\x36\x32\x01\x01\x01\x01\x63\x03\x36\x00"
            b"\x76\x03\x30\x31\x62\x00\x62\x00\x72\x65\x00\x00\x07\x00\x75\x01\x01\x01\x01\x01\x63\x14\xcb\x00"
            b"\x76\x03\x30\x32\x62\x00\x62\x00\x72\x65\x00\x00\x02\x00\x71\x01\x63\x75\x6d\x00"
            b"\x00\x00\x1b\x1b\x1b\x1b\x1a\x02\x72\x41"
        )

        if serial is None:
            raise ImportError("pyserial is required to connect to the Easymeter Q3C")

        # connect to easymeter q3c
        try:
            self.device = serial.Serial(
                port=serial_port,
                baudrate=9600,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                timeout=read_timeout,
            )
            self.device.close()
        except serial.SerialException:
            print(bcolors.FAIL + f"Can't connect to Easymeter Q3C on Port {serial_port}. Maybe wrong port specified?" + bcolors.ENDC)
            # sys.exit instead of the bare exit(): the latter is injected by
            # the site module and is not guaranteed to exist.
            sys.exit(1)

        print(bcolors.OKGREEN + f"Connected succesfully to Easymeter Q3C on Port {serial_port}" + bcolors.ENDC)

    def _enableSerial(self, enable: bool):
        """Open the serial port when *enable* is true, close it otherwise."""
        if enable:
            self.device.open()
        else:
            self.device.close()

    def _get_energy_value(self, energy_values):
        """Decode the trailing four bytes of every register slice.

        Args:
            energy_values: Mapping of register name to the raw bytes of that
                register entry, or None when the entry was not found.

        Returns:
            Mapping of register name to the last four bytes interpreted as a
            big-endian unsigned integer (presumably watt-hours), or None when
            fewer than four bytes are available.
        """
        result = {}  # result dictionary
        for name, raw in energy_values.items():
            # get relevant information (last four bytes)
            tail = raw[-4:] if raw is not None else b""
            # same as (((b1*256)+b2)*256+b3)*256+b4
            result[name] = int.from_bytes(tail, "big") if len(tail) == 4 else None
        return result

    @staticmethod
    def _extract_registers(read_buffer):
        """Cut the raw bytes of the registers 1.8.0 .. 1.8.7 out of a reply.

        Args:
            read_buffer: Bytes received from the meter.

        Returns:
            Dictionary with the keys "T0" .. "T7". Each value is the slice
            from the register's own marker up to the marker of the following
            register, or None when either marker (or the SML start sequence)
            is missing from the buffer.
        """
        slices = {f"T{index}": None for index in range(Q3C._REGISTER_COUNT)}

        # grab the sml message
        message_start = read_buffer.find(Q3C._SML_START)
        if message_start == -1:
            return slices
        message = read_buffer[message_start:]

        for index in range(Q3C._REGISTER_COUNT):
            start = message.find(bytes((0x01, 0x08, index)))
            if start == -1:
                continue
            # The entry ends where the next register (1.8.<index + 1>) begins.
            # Search only behind the start so the slice can never be inverted.
            end = message.find(Q3C._NEXT_ENTRY_PREFIX + bytes((index + 1,)), start)
            if end == -1:
                continue
            slices[f"T{index}"] = message[start:end]
        return slices

    def read(self):
        """Query the meter once and return the decoded energy registers.

        Returns:
            Dictionary with the keys "T0" .. "T7"; a value is None when the
            corresponding register could not be located in the reply.
        """
        # start communication
        self._enableSerial(True)
        try:
            # wake up easymeter
            self.device.write(self.init_string)
            # read data: three reads, each bounded by the configured timeout
            chunks = [self.device.read(size=self.chunk_size) for _ in range(3)]
        finally:
            # stop communication, even if writing or reading failed
            self._enableSerial(False)

        return self._get_energy_value(self._extract_registers(b"".join(chunks)))


if __name__ == "__main__":
    # read q3c data
    if len(sys.argv) != 2:
        port = "/dev/ttyUSB0"
        print(bcolors.WARNING + f"No serial port specified, trying {port}" + bcolors.ENDC)
    else:
        port = sys.argv[1]

    device = Q3C(port)
    data = device.read()
    print(data)