91 lines
4.1 KiB
Python
Executable File
91 lines
4.1 KiB
Python
Executable File
#!/usr/bin/python3
|
|
import sys
|
|
import serial
|
|
import datetime
|
|
from time import time,sleep
|
|
|
|
class bcolors:
|
|
HEADER = '\033[95m'
|
|
OKBLUE = '\033[94m'
|
|
OKCYAN = '\033[96m'
|
|
OKGREEN = '\033[92m'
|
|
WARNING = '\033[93m'
|
|
FAIL = '\033[91m'
|
|
ENDC = '\033[0m'
|
|
BOLD = '\033[1m'
|
|
UNDERLINE = '\033[4m'
|
|
|
|
|
|
class Q3C():
|
|
def __init__(self, serial_port, read_chunk_size=2048, read_timeout=0.5):
|
|
# initialize variables
|
|
self.chunk_size = read_chunk_size
|
|
self.init_string = b"\x1b\x1b\x1b\x1b\x01\x01\x01\x01\x76\x03\x30\x30\x62\x00\x62\x00\x72\x65\x00\x00\x01\x00\x77\x01\x01\x09\x31\x31\x33\x31\x31\x38\x36\x32\x01\x01\x01\x01\x63\x03\x36\x00\x76\x03\x30\x31\x62\x00\x62\x00\x72\x65\x00\x00\x07\x00\x75\x01\x01\x01\x01\x01\x63\x14\xcb\x00\x76\x03\x30\x32\x62\x00\x62\x00\x72\x65\x00\x00\x02\x00\x71\x01\x63\x75\x6d\x00\x00\x00\x1b\x1b\x1b\x1b\x1a\x02\x72\x41"
|
|
|
|
# connect to easymeter q3c
|
|
try:
|
|
self.device = serial.Serial(port = serial_port, baudrate = 9600, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS, timeout=read_timeout)
|
|
self.device.close()
|
|
except serial.serialutil.SerialException:
|
|
print(bcolors.FAIL + f"Can't connect to Easymeter Q3C on Port {serial_port}. Maybe wrong port specified?" + bcolors.ENDC)
|
|
exit(1)
|
|
print(bcolors.OKGREEN + f"Connected succesfully to Easymeter Q3C on Port {serial_port}" + bcolors.ENDC)
|
|
|
|
def _enableSerial(self, enable: bool):
|
|
if enable:
|
|
self.device.open()
|
|
else:
|
|
self.device.close()
|
|
|
|
def _get_energy_value(self, energy_values):
|
|
result = {} # result dictionary
|
|
for energy_value in energy_values:
|
|
watthours = energy_values[energy_value][::-1][:4][::-1] # get relevant information (last four bytes)
|
|
# write the watthours into the result dictionary
|
|
# formula: (((b1*256)+b2)*256+b3)*256+b4
|
|
result[energy_value] = ((( watthours[0]*256 ) + watthours[1] )*256 + watthours[2])*256 + watthours[3]
|
|
return result
|
|
|
|
def read(self):
|
|
self._enableSerial(True) # start communication
|
|
|
|
# wake up easymeter
|
|
self.device.write(self.init_string)
|
|
|
|
# read data
|
|
read_buffer = b""
|
|
for _ in range(3):
|
|
byte_chunk = self.device.read(size=self.chunk_size)
|
|
read_buffer += byte_chunk
|
|
|
|
# stop communication
|
|
self._enableSerial(False)
|
|
|
|
read_buffer = read_buffer[read_buffer.find(b'\x1b\x1b\x1b\x1b\x01\x01\x01\x01'):] # grab the sml message
|
|
energy_values = {
|
|
"T0": read_buffer[read_buffer.find(b'\x01\x08\x00'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x01')], # from t1.8.0 to 1.8.1
|
|
"T1": read_buffer[read_buffer.find(b'\x01\x08\x01'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x02')], # from t1.8.1 to 1.8.2
|
|
"T2": read_buffer[read_buffer.find(b'\x01\x08\x02'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x03')], # from t1.8.2 to 1.8.3
|
|
"T3": read_buffer[read_buffer.find(b'\x01\x08\x03'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x04')], # from t1.8.3 to 1.8.4
|
|
"T4": read_buffer[read_buffer.find(b'\x01\x08\x04'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x05')], # from t1.8.4 to 1.8.5
|
|
"T5": read_buffer[read_buffer.find(b'\x01\x08\x05'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x06')], # from t1.8.5 to 1.8.6
|
|
"T6": read_buffer[read_buffer.find(b'\x01\x08\x06'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x07')], # from t1.8.6 to 1.8.7
|
|
"T7": read_buffer[read_buffer.find(b'\x01\x08\x07'):read_buffer.find(b'\x01\x77\x07\x01\x01\x01\x08\x08')], # from t1.8.7 to 1.8.8
|
|
}
|
|
|
|
return self._get_energy_value(energy_values)
|
|
|
|
|
|
if __name__ == "__main__": # read q3c data
|
|
if(len(sys.argv) != 2):
|
|
port = "/dev/ttyUSB0"
|
|
print(bcolors.WARNING + f"No serial port specified, trying {port}" + bcolors.ENDC)
|
|
else:
|
|
port = sys.argv[1]
|
|
device = Q3C(port)
|
|
|
|
data = device.read()
|
|
print(data)
|
|
|
|
|