Changed tastmotonov.py's structure to make it object-oriented and importable from python3
This commit is contained in:
parent
f5f132914b
commit
5c9cdfd78e
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
config.ini
|
||||
__pycache__/
|
224
tasmotonov.py
224
tasmotonov.py
@ -18,29 +18,15 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import sys # for exiting with the correct exit code
|
||||
import argparse # to parse the cli args!
|
||||
import requests # needed to access the http endpoints
|
||||
import ipaddress # to validate IP addresses
|
||||
from fqdn import FQDN # validate FQDNs
|
||||
|
||||
version = "v1.0.0"
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='Tasmotonov - simply toggle multiple tasmota lights',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
description='A very simple script which allows you to turn on/off multiple tasmota devices specified.',
|
||||
epilog='Info: if you choose a file as source, this files needs to contain the addresses of the tasmota devices either comma-separated, semicolon-separated, or newline-separated!\n\n© Benjamin Burkhardt, 2025')
|
||||
|
||||
parser.add_argument('source', help='Select either to read the adresses (of the devices) from a "file" or from "inline"', choices=['file', 'inline'])
|
||||
parser.add_argument('data', help='Either the path to the file, or a comma- or semicolon-separated list of tasmota adresses.')
|
||||
parser.add_argument('action', help='Select to turn all tasmota devices "on" or "off" or "toggle" (case insensitive)', choices=['on', 'off', 'toggle'])
|
||||
parser.add_argument('-v', '--verbose', help='Turn on verbose file output', action='store_true')
|
||||
parser.add_argument('--version', action='version', version=f'Tasmotonov.py {version}')
|
||||
version = "v1.1.0"
|
||||
|
||||
|
||||
# some helpers / utils
|
||||
|
||||
def is_ip(string):
|
||||
try:
|
||||
i = ipaddress.ip_address(string)
|
||||
@ -54,41 +40,74 @@ def is_fqdn(string):
|
||||
|
||||
|
||||
# logging
|
||||
class Logger():
|
||||
def __init__(self, verbose=True, enable_stdout=True):
|
||||
self.verbose = verbose
|
||||
self.enable_stdout = enable_stdout
|
||||
self.log_string = "" # string variable where the logged lines are stored
|
||||
self.COLORS = {
|
||||
'HEADER':'\033[95m',
|
||||
'OKBLUE':'\033[94m',
|
||||
'OKCYAN':'\033[96m',
|
||||
'OKGREEN':'\033[92m',
|
||||
'WARNING':'\033[93m',
|
||||
'FAIL':'\033[91m',
|
||||
'ENDC':'\033[0m',
|
||||
'BOLD': '\033[1m',
|
||||
'UNDERLINE': '\033[4m'
|
||||
}
|
||||
|
||||
class COLORS:
|
||||
HEADER = '\033[95m'
|
||||
OKBLUE = '\033[94m'
|
||||
OKCYAN = '\033[96m'
|
||||
OKGREEN = '\033[92m'
|
||||
WARNING = '\033[93m'
|
||||
FAIL = '\033[91m'
|
||||
ENDC = '\033[0m'
|
||||
BOLD = '\033[1m'
|
||||
UNDERLINE = '\033[4m'
|
||||
|
||||
def log(to_log, verbose = True, end = '\n'):
|
||||
if (verbose and args.verbose) or not verbose:
|
||||
def log(self, to_log, verbose = True, end = '\n'):
|
||||
if self.enable_stdout and ((verbose and self.verbose) or not verbose):
|
||||
print(to_log, end=end)
|
||||
def log_error(to_log, end='\n'):
|
||||
log(COLORS.FAIL + '[ERROR] ' + COLORS.ENDC + to_log, verbose=False, end=end)
|
||||
def log_warning(to_log, end='\n'):
|
||||
log(COLORS.WARNING + '[WARNING] ' + COLORS.ENDC + to_log, verbose=False, end=end)
|
||||
def log_success(to_log, end='\n'):
|
||||
log(COLORS.OKGREEN + '[SUCCESS] ' + COLORS.ENDC + to_log, verbose=False, end=end)
|
||||
if ((verbose and self.verbose) or not verbose):
|
||||
# remove the shell colors
|
||||
to_log_color_cleaned = to_log
|
||||
for color in list(self.COLORS.values()):
|
||||
to_log_color_cleaned = to_log_color_cleaned.replace(color, '')
|
||||
self.log_string += to_log_color_cleaned + end
|
||||
|
||||
def log_error(self, to_log, end='\n'):
|
||||
self.log(self.COLORS['FAIL'] + '[ERROR] ' + self.COLORS['ENDC'] + to_log, verbose=False, end=end)
|
||||
def log_warning(self, to_log, end='\n'):
|
||||
self.log(self.COLORS['WARNING'] + '[WARNING] ' + self.COLORS['ENDC'] + to_log, verbose=False, end=end)
|
||||
def log_success(self, to_log, end='\n'):
|
||||
self.log(self.COLORS['OKGREEN'] + '[SUCCESS] ' + self.COLORS['ENDC'] + to_log, verbose=False, end=end)
|
||||
|
||||
|
||||
# the real program
|
||||
# the tasmotonov runner class
|
||||
class TasmotonovRunner:
|
||||
def __init__(self, source, data, action, verbose=True, enable_stdout=False):
|
||||
if str(source).lower() in ['file', 'inline']:
|
||||
self.source = source
|
||||
else:
|
||||
error_string = 'The source param must be either "file" or "inline" (case-insensitive)'
|
||||
logger.log_error(error_string)
|
||||
raise ValueError(error_string)
|
||||
self.data = data
|
||||
if str(action).upper() in ['ON', 'OFF', 'TOGGLE']:
|
||||
self.action = action.upper()
|
||||
else:
|
||||
error_string = 'The action param must be either "on", "off" or "toggle" (case-insensitive)'
|
||||
logger.log_error(error_string)
|
||||
raise ValueError(error_string)
|
||||
if type(verbose) == bool and type(enable_stdout) == bool:
|
||||
self.logger = Logger(verbose, enable_stdout)
|
||||
else:
|
||||
error_string = 'Both verbose and enable_stdout params must be boolean'
|
||||
logger.log_error(error_string)
|
||||
raise ValueError(error_string)
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = parser.parse_args() # parse all given arguments
|
||||
log(f'Parsed args: {args}')
|
||||
self.logger.log(f'Initialized runner: source={source}, data={data}, action={action}, verbose={verbose}')
|
||||
self.tasmota_addresses = self.remove_invalid_addresses(self.clean_addresses(self.parse_addresses())) # returns a list
|
||||
|
||||
def parse_addresses(self):
|
||||
# convert the given adresses to a list
|
||||
tasmota_addresses = []
|
||||
if args.source == 'file':
|
||||
if self.source == 'file':
|
||||
try:
|
||||
log('Now trying to open the given file...', end='')
|
||||
with open(args.data, 'r') as file:
|
||||
self.logger.log('Now trying to open the given file...', end='')
|
||||
with open(self.data, 'r') as file:
|
||||
contents = file.read()
|
||||
if ';' in contents:
|
||||
tasmota_addresses = contents.split(';')
|
||||
@ -96,62 +115,117 @@ if __name__ == '__main__':
|
||||
tasmota_addresses = contents.split(',')
|
||||
elif '\n' in contents:
|
||||
tasmota_addresses = contents.split('\n')
|
||||
elif ' ' in contents:
|
||||
tasmota_addresses = contents.split(' ')
|
||||
else:
|
||||
tasmota_addresses = [contents]
|
||||
log('Done.')
|
||||
log(f'Collected addresses: {tasmota_addresses}')
|
||||
except FileNotFoundError:
|
||||
log_error(f'Failed reading addresses. File with the path "{args.data} can\'t be opened because it doesn\'t exist. Exiting.')
|
||||
sys.exit(1)
|
||||
self.logger.log('Done.')
|
||||
self.logger.log(f'Collected addresses: {tasmota_addresses}')
|
||||
except FileNotFoundError as e:
|
||||
self.logger.log_error(f'Failed reading addresses. File with the path "{self.data} can\'t be opened because it doesn\'t exist. Exiting.')
|
||||
raise e
|
||||
|
||||
elif args.source == 'inline':
|
||||
log('Now reading from inline... ', end='')
|
||||
if ';' in args.data:
|
||||
tasmota_addresses = args.data.split(';')
|
||||
elif ',' in args.data:
|
||||
tasmota_addresses = args.data.split(',')
|
||||
elif '\n' in args.data:
|
||||
tasmota_addresses = args.data.split('\n')
|
||||
elif self.source == 'inline':
|
||||
self.logger.log('Now reading from inline... ', end='')
|
||||
if ';' in self.data:
|
||||
tasmota_addresses = self.data.split(';')
|
||||
elif ',' in self.data:
|
||||
tasmota_addresses = self.data.split(',')
|
||||
elif '\n' in self.data:
|
||||
tasmota_addresses = self.data.split('\n')
|
||||
elif ' ' in self.data:
|
||||
tasmota_addresses = self.data.split(' ')
|
||||
else:
|
||||
tasmota_addresses = [args.data]
|
||||
log('Done.')
|
||||
log(f'Collected addresses: {tasmota_addresses}')
|
||||
|
||||
else:
|
||||
log_error(f'You need to specify either "file" or "inline" as the data source for addresses!')
|
||||
sys.exit(1)
|
||||
tasmota_addresses = [self.data]
|
||||
self.logger.log('Done.')
|
||||
self.logger.log(f'Collected addresses: {tasmota_addresses}')
|
||||
|
||||
return tasmota_addresses
|
||||
|
||||
def clean_addresses(self, addresses_raw):
|
||||
# clean them up (e.g. remove newlines if the file has a random newline somewhere
|
||||
tasmota_addresses_cleaned = []
|
||||
for address in tasmota_addresses:
|
||||
if address != '':
|
||||
tasmota_addresses_cleaned.append(address.replace('\n', '').replace(';', '').replace(',', '').replace(' ', ''))
|
||||
for address in addresses_raw:
|
||||
to_add = address.replace('\n', '').replace(';', '').replace(',', '').replace(' ', '')
|
||||
if to_add != '':
|
||||
tasmota_addresses_cleaned.append(to_add)
|
||||
return tasmota_addresses_cleaned
|
||||
|
||||
def remove_invalid_addresses(self, addressse_raw):
|
||||
# now check data for consistency and integrity (is it really an ip address or a domain name?)
|
||||
# remove the ones that are not valid and log a warning
|
||||
tasmota_addresses_validated = []
|
||||
log('Now validating given addresses...')
|
||||
for address in tasmota_addresses_cleaned:
|
||||
self.logger.log('Now validating given addresses...')
|
||||
for address in addressse_raw:
|
||||
if is_ip(address) or is_fqdn(address):
|
||||
tasmota_addresses_validated.append(address)
|
||||
else:
|
||||
log_warning(f'The address "{address}" is neither a valid IP address nor a FQDN / domain name; SKIPPING THIS ONE.')
|
||||
log('Validated given adddresses.')
|
||||
self.logger.log_warning(f'The address "{address}" is neither a valid IP address nor a FQDN / domain name; SKIPPING THIS ONE.')
|
||||
self.logger.log('Validated given adddresses.')
|
||||
|
||||
return tasmota_addresses_validated
|
||||
|
||||
# some getters
|
||||
def get_addresses(self):
|
||||
return self.tasmota_addresses
|
||||
def get_address(self, index):
|
||||
return self.tasmota_addresses[index]
|
||||
|
||||
def run_custom_action(self, action):
|
||||
# and finally turn on or off or toggle the lights!
|
||||
for address in tasmota_addresses_validated:
|
||||
log('Running the action "{args.action}" on the following device: {address}')
|
||||
for address in self.tasmota_addresses:
|
||||
self.logger.log('Running the action "{action}" on the following device: {address}')
|
||||
try:
|
||||
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(args.action).lower()}')
|
||||
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(action).lower()}')
|
||||
success = True
|
||||
except requests.exceptions.ConnectionError:
|
||||
log_warning(f'Could not connect to "{address}". Skipping...')
|
||||
self.logger.log_warning(f'Could not connect to "{address}". Skipping...')
|
||||
success = False
|
||||
if answer.status_code != 200:
|
||||
log_warning(f'{address} returned the following non-200 status code: {answer.status_code}. Skipping...')
|
||||
self.logger.log_warning(f'{address} returned the following non-200 status code: {answer.status_code}. Skipping...')
|
||||
success = False
|
||||
if success:
|
||||
log_success(f"Ran action {str(args.action).upper()} on {COLORS.OKCYAN}{address}{COLORS.ENDC} successfully.")
|
||||
self.logger.log_success(f"Ran action {str(action).upper()} on {self.logger.COLORS['OKCYAN']}{address}{self.logger.COLORS['ENDC']} successfully.")
|
||||
def run(self):
|
||||
self.run_custom_action(self.action)
|
||||
|
||||
sys.exit(0)
|
||||
def run_single_custom_action(self, index, action):
|
||||
address = self.tasmota_addresses[index]
|
||||
self.logger.log('Running the action "{action}" on the following device: {address}')
|
||||
try:
|
||||
answer = None
|
||||
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(action).lower()}')
|
||||
success = True
|
||||
except requests.exceptions.ConnectionError:
|
||||
self.logger.log_warning(f'Could not connect to "{address}". Skipping...')
|
||||
success = False
|
||||
if answer:
|
||||
if answer.status_code != 200:
|
||||
self.logger.log_warning(f'{address} returned the following non-200 status code: {answer.status_code}. Skipping...')
|
||||
success = False
|
||||
else:
|
||||
self.logger.log_warning(f'{address} does not work. Skipping...')
|
||||
success = False
|
||||
if success:
|
||||
self.logger.log_success(f"Ran action {str(action).upper()} on {self.logger.COLORS['OKCYAN']}{address}{self.logger.COLORS['ENDC']} successfully. ")
|
||||
def run_single(self, index):
|
||||
self.run_single_custom_action(index, self.action)
|
||||
|
||||
# when run as a script in CLI
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='Tasmotonov - simply toggle multiple tasmota lights',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
description='A very simple script which allows you to turn on/off multiple tasmota devices specified.',
|
||||
epilog='Info: if you choose a file as source, this files needs to contain the addresses of the tasmota devices either comma-separated, semicolon-separated, space-separated, or newline-separated!\n\n© Benjamin Burkhardt, 2025')
|
||||
|
||||
parser.add_argument('source', help='Select either to read the adresses (of the devices) from a "file" or from "inline"', choices=['file', 'inline'])
|
||||
parser.add_argument('data', help='Either the path to the file, or a comma-, space- or semicolon-separated list of tasmota adresses.')
|
||||
parser.add_argument('action', help='Select to turn all tasmota devices "on" or "off" or "toggle" (case insensitive)', choices=['on', 'off', 'toggle'])
|
||||
parser.add_argument('-v', '--verbose', help='Turn on verbose file output', action='store_true')
|
||||
parser.add_argument('--version', action='version', version=f'Tasmotonov.py {version}')
|
||||
|
||||
args = parser.parse_args() # parse all given arguments
|
||||
|
||||
runner = TasmotonovRunner(source=args.source, data=args.data, action=args.action, verbose=args.verbose, enable_stdout=True)
|
||||
runner.run()
|
||||
|
Loading…
x
Reference in New Issue
Block a user