#!/usr/bin/python3
"""Regulate an inverter's power limit via OpenDTU based on MQTT power readings.

The script subscribes to two MQTT topics (house power and solar power), keeps
a smoothed copy of both readings and periodically computes a new relative
limit (in percent) for the inverter, which it sets through OpenDTU's HTTP API.
All settings are read from environment variables (optionally via a .env file).
"""

import json
import math
import os
from threading import Thread
from time import sleep, time

import paho.mqtt.client as mqtt
import requests
import urllib3  # only for exception handling
from dotenv import load_dotenv


# let's get colorful!
class bcolors:
    """ANSI escape sequences used to colorize the console output."""

    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


# get env vars
load_dotenv()
mq_broker = os.getenv('MQBROKER', None)
# fall back to the standard MQTT port instead of crashing on int(None)
mq_port = int(os.getenv('MQPORT', 1883))
mq_user = os.getenv('MQUSER', None)
mq_pwd = os.getenv('MQPWD', None)
mq_topic_p_house = os.getenv('MQTOPIC_P_HOUSE', None)
mq_topic_p_solar = os.getenv('MQTOPIC_P_SOLAR', None)
maximum_data_ts_deviation = int(os.getenv('MAXIMUM_DATA_TS_DEVIATION', 5))
opendtu_address = os.getenv('OPENDTU_ADDR', None)
opendtu_user = os.getenv('OPENDTU_USER', None)
opendtu_pwd = os.getenv('OPENDTU_PWD', None)
opendtu_inverter_sn = os.getenv('OPENDTU_INVERTER_SN', None)
limit_correction_factor = float(os.getenv('LIMIT_CORRECTION_FACTOR', 1.0))
limit_update_interval = float(os.getenv('LIMIT_UPDATE_INTERVAL', 5))
power_target = int(os.getenv('POWER_TARGET', 50))
power_target_min = float(os.getenv('POWER_TARGET_MIN', 0))
power_target_max = float(os.getenv('POWER_TARGET_MAX', 100))
power_damping_factor = float(os.getenv('POWER_DAMPING_FACTOR', 0.3))
power_limit_change_treshold = float(os.getenv('POWER_LIMIT_CHANGE_TRESHOLD', 0.5))
power_limit_type = int(os.getenv('POWER_LIMIT_TYPE', 1))
dry_run = bool(int(os.getenv('DRY_RUN', 1)))

# Upper bound (seconds) for every HTTP request to OpenDTU; without it a
# stalled connection would block the limit thread forever.
REQUEST_TIMEOUT_S = 10

# some checks for the correctness of supplied data
if power_target_min < 0:
    power_target_min = 0
if power_target_max > 100:
    power_target_max = 100
if power_damping_factor < 0:
    power_damping_factor = 0.0
if power_damping_factor > 1:
    power_damping_factor = 1.0

# create the powers dict (containing the current use) and data variables (for thread sharing)
powers_raw = {"solar": 0, "solar_ts": 0, "house": 0, "house_ts": 0}
powers = {"total": None, "total_house": None, "total_solar": None, "timestamp": 0}


# define mqtt callbacks
def on_connect(client, userdata, flags, reason_code, properties):
    """Report the connection result and subscribe to the configured topics."""
    print(f"{bcolors.OKGREEN}Connected to the MQTT broker ({mq_broker}:{mq_port}) with result code {bcolors.ENDC}{reason_code}{bcolors.OKGREEN}.{bcolors.ENDC}")
    # on_message only reacts to exactly these two topics, so subscribe to them
    # instead of hard-coded wildcard topics that may not match the configuration
    for topic in (mq_topic_p_house, mq_topic_p_solar):
        if topic:
            client.subscribe(topic)


def on_message(client, userdata, msg):
    """Store a smoothed house/solar reading from an incoming MQTT message.

    The house topic carries JSON with a "total_act_power" field; the solar
    topic carries a plain number. Each new reading is averaged with the
    previously stored value and floored to one decimal place. Messages on
    other topics and malformed payloads are ignored.
    """
    # print(f"{msg.topic}: {msg.payload}")  # just for debug / development reasons
    if msg.topic == mq_topic_p_house:
        key = "house"
    elif msg.topic == mq_topic_p_solar:
        key = "solar"
    else:
        return

    try:
        if key == "house":
            reading = float(json.loads(msg.payload)["total_act_power"])
        else:
            reading = float(msg.payload)
        smoothed = math.floor(((powers_raw[key] + reading) / 2) * 10) / 10
    except (ValueError, KeyError, TypeError, OverflowError) as e:
        # a bad payload must not raise inside the MQTT network loop
        print(f"{bcolors.WARNING}Ignoring malformed payload on topic {msg.topic}: {e}{bcolors.ENDC}")
        return

    powers_raw[key + "_ts"] = time()
    powers_raw[key] = smoothed


def threaded_current_power_calculation():
    """Publish combined power values into `powers` whenever new data arrives.

    Polls `powers_raw` for changes. When the solar and house readings are
    less than `maximum_data_ts_deviation` seconds apart, prints them and
    updates `powers` (total, house, solar and a fresh timestamp).
    """
    while True:
        last_powers = powers_raw.copy()
        while last_powers == powers_raw:
            sleep(0.2)
        if abs(powers_raw["solar_ts"] - powers_raw["house_ts"]) < maximum_data_ts_deviation and powers_raw["solar"] is not None and powers_raw["house"] is not None:
            print(f"{bcolors.OKCYAN}Current total P: {bcolors.BOLD}{(powers_raw['solar'] + powers_raw['house']):.2f}{bcolors.ENDC} | {bcolors.WARNING}Solar P: {bcolors.BOLD}{powers_raw['solar']:.2f}{bcolors.ENDC} | {bcolors.OKBLUE}House P: {bcolors.BOLD}{powers_raw['house']:.2f}{bcolors.ENDC}")
            powers["total"] = powers_raw["solar"] + powers_raw["house"]
            powers["total_house"] = powers_raw["house"]
            powers["total_solar"] = powers_raw["solar"]
            powers["timestamp"] = time()


def _opendtu_url(path):
    """Return the full OpenDTU URL for the given API path."""
    # an unset address yields a schema-less URL, which requests reports as a
    # RequestException that the callers already handle
    return (opendtu_address or "").strip("/") + path


def _clamp_limit(limit):
    """Restrict a limit (percent) to [power_target_min, power_target_max]."""
    return max(min(limit, power_target_max), power_target_min)


def _fetch_limit_status():
    """Return OpenDTU's limit status as a dict, or None if it is unavailable."""
    try:
        response = requests.get(_opendtu_url("/api/limit/status"), auth=(opendtu_user, opendtu_pwd), timeout=REQUEST_TIMEOUT_S)
        status = response.json()
    except (requests.exceptions.RequestException, urllib3.exceptions.HTTPError, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions where the
        # JSON decode error is not a RequestException
        print(f"{bcolors.FAIL}Some error occured while trying to reach out to OpenDTU to get latest data about the inverter limit status. Skipping for now.{bcolors.ENDC}\n==== START OF EXCEPTION ====\n{e}\n==== END OF EXCEPTION ====")
        return None
    if not isinstance(status, dict):
        print(f"{bcolors.FAIL}OpenDTU returned an unexpected limit status. Skipping for now.{bcolors.ENDC}\nOpenDTUs response:\n{status}")
        return None
    return status


def _send_limit(new_dampened_limit, cur_limit, new_ideal_limit):
    """Send the new limit to OpenDTU and print the outcome."""
    # compact separators keep the request body identical to the hand-built string
    data_to_send = "data=" + json.dumps(
        {"serial": opendtu_inverter_sn, "limit_type": power_limit_type, "limit_value": new_dampened_limit},
        separators=(",", ":"),
    )
    print(f"Setting new limit via the API: {bcolors.OKBLUE}{bcolors.BOLD}{str(new_dampened_limit)}%{bcolors.ENDC} ({bcolors.OKGREEN}previously: {bcolors.BOLD}{cur_limit}%{bcolors.ENDC} | {bcolors.WARNING}currently targeting: {bcolors.BOLD}{new_ideal_limit}%{bcolors.ENDC})... ", end="")
    try:
        r = requests.post(_opendtu_url("/api/limit/config"), data=data_to_send, headers={'Content-Type': 'text/plain'}, auth=(opendtu_user, opendtu_pwd), timeout=REQUEST_TIMEOUT_S)
    except (requests.exceptions.RequestException, urllib3.exceptions.HTTPError) as e:
        # report inside the handler: the exception name is unbound once the
        # except clause is left
        print(f"{bcolors.FAIL}Unsuccessful :( The http request failed somehow. Printing traceback.{bcolors.ENDC}\n{e}")
        return

    try:
        reply = json.loads(r.text)
    except ValueError:
        reply = None

    if isinstance(reply, dict) and "type" in reply and "code" in reply:
        if reply["type"] == "success":
            print(f"{bcolors.OKGREEN}Success!{bcolors.ENDC}")
        else:
            print(f"{bcolors.FAIL}Unsuccessful :( Code: {bcolors.BOLD}{reply['code']} ({reply['type']}){bcolors.ENDC}")
            print(r.text)  # keep for debug reasons
    else:
        print(f"{bcolors.FAIL}Failed. Got some weird response from OpenDTU while trying to set the new limit. Maybe the OpenDTU API has changed?{bcolors.ENDC}\nOpenDTUs response: (needs to be in JSON and contain the keys 'code' and 'type')\n{r.text}")


def _update_limit(inverter):
    """Compute the next limit from one inverter status entry and apply it."""
    try:
        limit_set_status = inverter["limit_set_status"]
        max_power = inverter["max_power"]
        cur_limit = inverter["limit_relative"]
    except KeyError as e:
        print(f"{bcolors.FAIL}OpenDTU's limit status is missing the field {e}. Skipping it for now.{bcolors.ENDC}")
        return

    if max_power == 0:
        print(f"{bcolors.FAIL}OpenDTU is reporting strange values for the inverter's maximum power output. Skipping it for now.{bcolors.ENDC}")
        return

    total = powers["total"]
    total_house = powers["total_house"]
    # total_house is written right after total, so it may briefly still be None
    if limit_set_status != "Ok" or total is None or total_house is None:
        return

    # calculate percentage to set the limit to
    # (* 1000 / 10 is * 100 for the percentage, floored to one decimal place)
    new_ideal_limit = _clamp_limit(math.floor(((total - power_target) / max_power) * limit_correction_factor * 1000) / 10)

    diff_cur_new_limit = cur_limit - new_ideal_limit
    if abs(diff_cur_new_limit) < power_limit_change_treshold:
        return

    # dampen only when not giving away energy to the provider
    if total_house < 0:
        new_dampened_limit = new_ideal_limit
    else:
        new_dampened_limit = cur_limit - (diff_cur_new_limit * (1 - power_damping_factor))
    new_dampened_limit = _clamp_limit(math.floor(new_dampened_limit * 10) / 10)

    if dry_run:
        print(f"{bcolors.WARNING}Now the new limit would be set via the API (but DRY_RUN is either not specified or True): {str(new_dampened_limit)}%{bcolors.ENDC}")
        return

    _send_limit(new_dampened_limit, cur_limit, new_ideal_limit)


def threaded_solar_power_limit_setting():
    """Periodically adjust the inverter limit through the OpenDTU API.

    Each round fetches the current limit status, waits until `powers` holds
    data that is at most one second old, updates the limit if needed and then
    waits until `limit_update_interval` seconds have passed since the last
    round. If OpenDTU cannot be reached the round is retried after a short
    pause.
    """
    last_time = time()
    while True:
        # Get current openDTU current limit status
        status = _fetch_limit_status()
        if status is None:
            sleep(0.2)  # wait some time (to avoid cpu overload on continuous unavailability of the service)
            continue  # skip this loop

        while time() - powers["timestamp"] > 1:  # wait until recent data is available
            sleep(0.2)

        inverter = status.get(opendtu_inverter_sn)
        if isinstance(inverter, dict):
            _update_limit(inverter)
        else:
            print(f"{bcolors.FAIL}OpenDTU did not report a limit status for the inverter with serial {opendtu_inverter_sn}. Skipping it for now.{bcolors.ENDC}")

        while (time() - last_time) < limit_update_interval:
            sleep(0.5)
        last_time = time()


# initialize the mqtt client
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.username_pw_set(mq_user, mq_pwd)
mqttc.connect(mq_broker, mq_port, 60)
mqttc.loop_start()

power_calc_thread = Thread(target=threaded_current_power_calculation)
power_calc_thread.start()

solar_power_limit_set_thread = Thread(target=threaded_solar_power_limit_setting)
solar_power_limit_set_thread.start()