diff --git a/solarcontrol.py b/solarcontrol.py index 17a0801..e05a274 100755 --- a/solarcontrol.py +++ b/solarcontrol.py @@ -9,6 +9,20 @@ from threading import Thread import requests import math + +# let's get colorful! +class bcolors: + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKCYAN = '\033[96m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + + # get env vars load_dotenv() mq_broker = os.getenv('MQBROKER', None) @@ -45,11 +59,11 @@ powers = {"total": None, "total_house": None, "total_solar": None, "timestamp": # define mqtt callbacks def on_connect(client, userdata, flags, reason_code, properties): - print(f"Connected with result code {reason_code}") + print(f"{bcolors.OKGREEN}Connected with result code {bcolors.ENDC}{reason_code}") client.subscribe("lge320/#") client.subscribe("solar/ac/#") def on_message(client, userdata, msg): - #print(f"{msg.topic}: {msg.payload}") + #print(f"{msg.topic}: {msg.payload}") # just for debug / development reasons if msg.topic == mq_topic_p_house: powers_raw["house_ts"] = time() powers_raw["house"] = math.floor(((powers_raw["house"] + float(json.loads(msg.payload)["total_act_power"])) / 2)*10)/10 @@ -73,7 +87,7 @@ def threaded_current_power_calculation(): while last_powers == powers_raw: sleep(0.2) if abs(powers_raw["solar_ts"] - powers_raw["house_ts"]) < maximum_data_ts_deviation and powers_raw["solar"] != None and powers_raw["house"] != None: - print(f"Current total P: {powers_raw['solar'] + powers_raw['house']} | Solar P: {powers_raw['solar']} | House P: {powers_raw['house']}") + print(f"{bcolors.OKCYAN}Current total P: {bcolors.BOLD}{(powers_raw['solar'] + powers_raw['house']):.2f}{bcolors.ENDC} | {bcolors.WARNING}Solar P: {bcolors.BOLD}{powers_raw['solar']:.2f}{bcolors.ENDC} | {bcolors.OKBLUE}House P: {bcolors.BOLD}{powers_raw['house']:.2f}{bcolors.ENDC}") powers["total"] = powers_raw["solar"] + powers_raw["house"] powers["total_house"] = powers_raw["house"] powers["total_solar"] = powers_raw["solar"] @@ -114,15 +128,15 @@ def threaded_solar_power_limit_setting(): if not dry_run: data_to_send = 'data={"serial":"'+opendtu_inverter_sn+'","limit_type":'+str(power_limit_type)+',"limit_value":'+str(new_dampened_limit)+'}' r = requests.post(opendtu_address.strip("/") + "/api/limit/config", data=data_to_send, headers={'Content-Type':'text/plain'}, auth=(opendtu_user, opendtu_pwd)) - print("Setting new limit over the API: " + str(new_dampened_limit) + f"% (previously: {cur_limit}% | currently targeting: {new_ideal_limit}%)... ", end="") + print(f"Setting new limit over the API: {bcolors.OKBLUE}{bcolors.BOLD}{str(new_dampened_limit)}%{bcolors.ENDC} ({bcolors.OKGREEN}previously: {bcolors.BOLD}{cur_limit}%{bcolors.ENDC} | {bcolors.WARNING}currently targeting: {bcolors.BOLD}{new_ideal_limit}%{bcolors.ENDC})... ", end="") #print(f"Current limit: {cur_limit}, new ideal limit: {new_ideal_limit}, new dampened limit: {new_dampened_limit}") if json.loads(r.text)["type"] == "success": - print("Success!") + print(bcolors.OKGREEN+"Success!"+bcolors.ENDC) else: - print(f"Unsuccessful :( Code: {json.loads(r.text)['code']} ({json.loads(r.text)['type']})") + print(f"{bcolors.FAIL}Unsuccessful :( Code: {bcolors.BOLD}{json.loads(r.text)['code']} ({json.loads(r.text)['type']}){bcolors.ENDC}") print(r.text) # keep and uncomment for debug reasons else: - print("Now the new limit would be set over the API (but DRY_RUN is either not specified or True): " + str(new_limit) + "%") + print(f"{bcolors.WARNING}Now the new limit would be set over the API (but DRY_RUN is either not specified or True): {str(new_limit)}%") while (time() - last_time) < limit_update_interval: