237 lines
11 KiB
Python
Executable File
237 lines
11 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
"""
|
|
Tasmotonov - A very simple script which allows you to turn on/off (or toggle) multiple tasmota devices specified.
|
|
Copyright (C) 2025 Benjamin Burkhardt
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
"""
|
|
|
|
import argparse # to parse the cli args!
|
|
import requests # needed to access the http endpoints
|
|
import threading # for running the requests simultaneously
|
|
import ipaddress # to validate IP addresses
|
|
from fqdn import FQDN # validate FQDNs
|
|
|
|
version = "v1.2.0"
|
|
|
|
|
|
# some helpers / utils
|
|
def is_ip(string):
|
|
try:
|
|
i = ipaddress.ip_address(string)
|
|
del i
|
|
except ValueError:
|
|
return False
|
|
return True
|
|
|
|
def is_fqdn(string):
|
|
return FQDN(string).is_valid
|
|
|
|
|
|
# logging
|
|
class Logger():
|
|
def __init__(self, verbose=True, enable_stdout=True):
|
|
self.verbose = verbose
|
|
self.enable_stdout = enable_stdout
|
|
self.log_string = "" # string variable where the logged lines are stored
|
|
self.COLORS = {
|
|
'HEADER':'\033[95m',
|
|
'OKBLUE':'\033[94m',
|
|
'OKCYAN':'\033[96m',
|
|
'OKGREEN':'\033[92m',
|
|
'WARNING':'\033[93m',
|
|
'FAIL':'\033[91m',
|
|
'ENDC':'\033[0m',
|
|
'BOLD': '\033[1m',
|
|
'UNDERLINE': '\033[4m'
|
|
}
|
|
|
|
def log(self, to_log, verbose = True, end = '\n'):
|
|
if self.enable_stdout and ((verbose and self.verbose) or not verbose):
|
|
print(to_log, end=end)
|
|
if ((verbose and self.verbose) or not verbose):
|
|
# remove the shell colors
|
|
to_log_color_cleaned = to_log
|
|
for color in list(self.COLORS.values()):
|
|
to_log_color_cleaned = to_log_color_cleaned.replace(color, '')
|
|
self.log_string += to_log_color_cleaned + end
|
|
|
|
def log_error(self, to_log, end='\n'):
|
|
self.log(self.COLORS['FAIL'] + '[ERROR] ' + self.COLORS['ENDC'] + to_log, verbose=False, end=end)
|
|
def log_warning(self, to_log, end='\n'):
|
|
self.log(self.COLORS['WARNING'] + '[WARNING] ' + self.COLORS['ENDC'] + to_log, verbose=False, end=end)
|
|
def log_success(self, to_log, end='\n'):
|
|
self.log(self.COLORS['OKGREEN'] + '[SUCCESS] ' + self.COLORS['ENDC'] + to_log, verbose=False, end=end)
|
|
|
|
|
|
# the tasmotonov runner class
|
|
class TasmotonovRunner:
|
|
def __init__(self, source, data, action, verbose=True, enable_stdout=False):
|
|
if str(source).lower() in ['file', 'inline']:
|
|
self.source = source
|
|
else:
|
|
error_string = 'The source param must be either "file" or "inline" (case-insensitive)'
|
|
logger.log_error(error_string)
|
|
raise ValueError(error_string)
|
|
self.data = data
|
|
if str(action).upper() in ['ON', 'OFF', 'TOGGLE']:
|
|
self.action = action.upper()
|
|
else:
|
|
error_string = 'The action param must be either "on", "off" or "toggle" (case-insensitive)'
|
|
logger.log_error(error_string)
|
|
raise ValueError(error_string)
|
|
if type(verbose) == bool and type(enable_stdout) == bool:
|
|
self.logger = Logger(verbose, enable_stdout)
|
|
else:
|
|
error_string = 'Both verbose and enable_stdout params must be boolean'
|
|
logger.log_error(error_string)
|
|
raise ValueError(error_string)
|
|
|
|
self.logger.log(f'Initialized runner: source={source}, data={data}, action={action}, verbose={verbose}')
|
|
self.set_valid_addresses_and_comments(self.parse_addresses())
|
|
self.logger.log(f'Validated addresses: distilled the following data. Devices added: {self.tasmota_addresses}')
|
|
|
|
def parse_addresses(self):
|
|
# convert the given addresses to a list
|
|
tasmota_addresses_raw = []
|
|
if self.source == 'file':
|
|
try:
|
|
self.logger.log('Now trying to open the given file...', end='')
|
|
with open(self.data, 'r') as file:
|
|
contents = file.read()
|
|
if ',' in contents:
|
|
tasmota_addresses_raw = contents.split(',')
|
|
elif ';' in contents:
|
|
tasmota_addresses_raw = contents.split(';')
|
|
elif '\n' in contents:
|
|
tasmota_addresses_raw = contents.split('\n')
|
|
else:
|
|
tasmota_addresses_raw = [contents]
|
|
self.logger.log('Done.')
|
|
self.logger.log(f'Collected addresses: {tasmota_addresses_raw}')
|
|
except FileNotFoundError as e:
|
|
self.logger.log_error(f'Failed reading addresses. File with the path "{self.data} can\'t be opened because it doesn\'t exist. Exiting.')
|
|
raise e
|
|
|
|
elif self.source == 'inline':
|
|
self.logger.log('Now reading from inline... ', end='')
|
|
if ',' in self.data:
|
|
tasmota_addresses_raw = self.data.split(',')
|
|
elif ';' in self.data:
|
|
tasmota_addresses_raw = self.data.split(';')
|
|
elif '\n' in self.data:
|
|
tasmota_addresses_raw = self.data.split('\n')
|
|
else:
|
|
tasmota_addresses_raw = [self.data]
|
|
self.logger.log('Done.')
|
|
self.logger.log(f'Collected addresses: {tasmota_addresses_raw}')
|
|
return tasmota_addresses_raw
|
|
|
|
def set_valid_addresses_and_comments(self, tasmota_addresses_raw):
|
|
# now check data for consistency and integrity (is it really an ip address or a domain name?)
|
|
# remove the ones that are not valid and log a warning
|
|
tasmota_addresses_validated = {}
|
|
self.logger.log('Now validating given addresses...')
|
|
for address in tasmota_addresses_raw:
|
|
address_split = address.split("#") # split to separate existing comments
|
|
address_cleaned = address_split[0].replace('\n', '').replace(';', '').replace(',', '').replace(' ', '')
|
|
if len(address_split) > 1:
|
|
comment = address.split("#")[-1].lstrip()
|
|
else:
|
|
comment = ""
|
|
if address_cleaned != "":
|
|
if is_ip(address_cleaned) or is_fqdn(address_cleaned):
|
|
tasmota_addresses_validated[address_cleaned] = comment
|
|
else:
|
|
self.logger.log_warning(f'The address "{address_cleaned}" (comment: "{comment}") is neither a valid IP address nor a FQDN / domain name; SKIPPING THIS ONE.')
|
|
|
|
self.tasmota_addresses = tasmota_addresses_validated
|
|
|
|
# some getters
|
|
def get_addresses(self):
|
|
return self.tasmota_addresses
|
|
def get_address(self, index):
|
|
return {list(self.tasmota_addresses.keys())[index]: list(self.tasmota_addresses.values())[index]}
|
|
|
|
def run_custom_action(self, action):
|
|
# and finally turn on or off or toggle the lights!
|
|
def thread_runner(address):
|
|
self.logger.log(f'Running the action "{action}" on the following device: {address}')
|
|
try:
|
|
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(action).lower()}')
|
|
success = True
|
|
except requests.exceptions.ConnectionError:
|
|
self.logger.log_warning(f'Could not connect to "{address}". Skipping...')
|
|
success = False
|
|
if not str(answer.status_code).startswith("20"):
|
|
self.logger.log_warning(f'{address} returned the following non-20x status code: {answer.status_code}. Skipping...')
|
|
success = False
|
|
if success:
|
|
self.logger.log_success(f"Ran action {str(action).upper()} on {self.logger.COLORS['OKCYAN']}{address}{self.logger.COLORS['ENDC']} successfully.")
|
|
|
|
threads = []
|
|
for address in list(self.tasmota_addresses.keys()):
|
|
t = threading.Thread(target=thread_runner, args=(address,))
|
|
threads.append(t)
|
|
t.start()
|
|
# wait for all threads to finish execution
|
|
for t in threads:
|
|
t.join()
|
|
|
|
|
|
def run(self):
|
|
self.run_custom_action(self.action)
|
|
|
|
def run_single_custom_action(self, index, action):
|
|
address = list(self.tasmota_addresses.keys())[index]
|
|
self.logger.log(f'Running the action "{action}" on the following device: {address}')
|
|
try:
|
|
answer = None
|
|
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(action).lower()}')
|
|
success = True
|
|
except requests.exceptions.ConnectionError:
|
|
self.logger.log_warning(f'Could not connect to "{address}". Skipping...')
|
|
success = False
|
|
if answer:
|
|
if answer.status_code != 200:
|
|
self.logger.log_warning(f'{address} returned the following non-200 status code: {answer.status_code}. Skipping...')
|
|
success = False
|
|
else:
|
|
self.logger.log_warning(f'{address} does not work. Skipping...')
|
|
success = False
|
|
if success:
|
|
self.logger.log_success(f"Ran action {str(action).upper()} on {self.logger.COLORS['OKCYAN']}{address}{self.logger.COLORS['ENDC']} successfully. ")
|
|
def run_single(self, index):
|
|
self.run_single_custom_action(index, self.action)
|
|
|
|
# when run as a script in CLI
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser(
|
|
prog='Tasmotonov - simply toggle multiple tasmota lights',
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
description='A very simple script which allows you to turn on/off multiple tasmota devices specified.',
|
|
epilog='Info: if you choose a file as source, this files needs to contain the addresses of the tasmota devices either comma-separated, semicolon-separated, or newline-separated (each entry can have a comment starting with a # (hashtag)!\n\n© Benjamin Burkhardt, 2025')
|
|
|
|
parser.add_argument('source', help='Select either to read the addresses (of the devices) from a "file" or from "inline"', choices=['file', 'inline'])
|
|
parser.add_argument('data', help='Either the path to the file, or a comma-, semicolon- or newline-separated list of tasmota addresses.')
|
|
parser.add_argument('action', help='Select to turn all tasmota devices "on" or "off" or "toggle" (case insensitive)', choices=['on', 'off', 'toggle'])
|
|
parser.add_argument('-v', '--verbose', help='Turn on verbose file output', action='store_true')
|
|
parser.add_argument('--version', action='version', version=f'Tasmotonov.py {version}')
|
|
|
|
args = parser.parse_args() # parse all given arguments
|
|
|
|
runner = TasmotonovRunner(source=args.source, data=args.data, action=args.action, verbose=args.verbose, enable_stdout=True)
|
|
runner.run()
|