Compare commits
5 Commits
e06cc1d4fc
...
9e0703051f
Author | SHA1 | Date | |
---|---|---|---|
9e0703051f | |||
fff3699efe | |||
3999de77d7 | |||
023b29fcae | |||
44b48485be |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,2 +1,3 @@
|
||||
config.ini
|
||||
tasmotonov-gui-config.json
|
||||
__pycache__/
|
||||
|
@ -20,7 +20,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
import tasmotonov
|
||||
import configparser
|
||||
import json
|
||||
from PySide6.QtUiTools import QUiLoader
|
||||
from PySide6.QtWidgets import QApplication,QFileDialog
|
||||
from PySide6.QtCore import QFile, QUrl, QIODevice
|
||||
@ -29,35 +29,41 @@ from PySide6.QtCore import QFile, QUrl, QIODevice
|
||||
filename = ""
|
||||
action = "toggle"
|
||||
|
||||
def load_config(file="config.ini"): # load config from configparser.ConfigParser object
|
||||
to_load = configparser.ConfigParser()
|
||||
to_load.read(file)
|
||||
def load_config(file="tasmotonov-gui-config.json"): # load config from configparser.ConfigParser object
|
||||
try:
|
||||
with open(file, "r") as f:
|
||||
to_load = json.loads(f.read())
|
||||
global filename, action
|
||||
if "DEFAULT" in to_load:
|
||||
if "file" in to_load['DEFAULT']:
|
||||
filename = to_load['DEFAULT']['file']
|
||||
if "file" in to_load:
|
||||
filename = to_load['file']
|
||||
if filename != "":
|
||||
try:
|
||||
tab1_load_file()
|
||||
except FileNotFoundError:
|
||||
tab1_clear()
|
||||
if "inline" in to_load['DEFAULT']:
|
||||
window.tab2_plainTextEdit.setPlainText(to_load['DEFAULT']['inline'])
|
||||
if "action" in to_load['DEFAULT']:
|
||||
a_new = to_load['DEFAULT']['action']
|
||||
if "inline" in to_load:
|
||||
window.tab2_plainTextEdit.setPlainText(to_load['inline'])
|
||||
if "action" in to_load:
|
||||
a_new = to_load['action']
|
||||
if a_new != "":
|
||||
action = a_new
|
||||
# else: no config there yet
|
||||
def save_config(file="config.ini"):
|
||||
if "tab_index" in to_load:
|
||||
tab_index = to_load["tab_index"]
|
||||
if type(tab_index) == int and (0 <= tab_index < window.tabWidget.count()):
|
||||
window.tabWidget.setCurrentIndex(to_load["tab_index"])
|
||||
except FileNotFoundError:
|
||||
pass # no config there yet
|
||||
|
||||
def save_config(file="tasmotonov-gui-config.json"):
|
||||
window.tab3_textBrowser.append("Saving configuration!")
|
||||
global filename, action
|
||||
to_save = configparser.ConfigParser()
|
||||
to_save['DEFAULT'] = {}
|
||||
to_save['DEFAULT']['file'] = str(filename)
|
||||
to_save['DEFAULT']['inline'] = str(window.tab2_plainTextEdit.toPlainText())
|
||||
to_save['DEFAULT']['action'] = str(action)
|
||||
with open(file, 'w') as configfile:
|
||||
to_save.write(configfile)
|
||||
to_save = {}
|
||||
to_save['file'] = str(filename)
|
||||
to_save['inline'] = str(window.tab2_plainTextEdit.toPlainText())
|
||||
to_save['action'] = str(action)
|
||||
to_save['tab_index'] = window.tabWidget.currentIndex()
|
||||
with open(file, 'w') as f:
|
||||
f.write(json.dumps(to_save))
|
||||
|
||||
# tab1 slots
|
||||
def tab1_load_file_pressed():
|
||||
@ -76,7 +82,14 @@ def tab1_load_file():
|
||||
window.tab1_label_help.show()
|
||||
# clear listWidget and add the items
|
||||
window.tab1_listWidget.clear()
|
||||
window.tab1_listWidget.addItems(tasmotonov.TasmotonovRunner("file", filename, action, False, False).get_addresses())
|
||||
addresses = tasmotonov.TasmotonovRunner("file", filename, action, False, False).get_addresses()
|
||||
items_to_add = []
|
||||
for address, comment in addresses.items():
|
||||
if comment == "":
|
||||
items_to_add.append(address)
|
||||
else:
|
||||
items_to_add.append(f"{comment} [{address}]")
|
||||
window.tab1_listWidget.addItems(items_to_add)
|
||||
def tab1_clear():
|
||||
global filename
|
||||
filename = ""
|
||||
@ -107,7 +120,14 @@ def tab2_plainTextEdit_change():
|
||||
content = window.tab2_plainTextEdit.toPlainText()
|
||||
if content != "":
|
||||
tasmotonov_runner = tasmotonov.TasmotonovRunner("inline", content, action, False, False)
|
||||
window.tab2_listWidget.addItems(tasmotonov_runner.get_addresses())
|
||||
addresses = tasmotonov_runner.get_addresses()
|
||||
items_to_add = []
|
||||
for address, comment in addresses.items():
|
||||
if comment == "":
|
||||
items_to_add.append(address)
|
||||
else:
|
||||
items_to_add.append(f"{comment} [{address}]")
|
||||
window.tab2_listWidget.addItems(items_to_add)
|
||||
window.tab3_textBrowser.append(str(tasmotonov_runner.logger.log_string))
|
||||
def tab2_action():
|
||||
content = window.tab2_plainTextEdit.toPlainText()
|
||||
|
103
tasmotonov.py
103
tasmotonov.py
@ -20,10 +20,11 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
import argparse # to parse the cli args!
|
||||
import requests # needed to access the http endpoints
|
||||
import threading # for running the requests simultaneously
|
||||
import ipaddress # to validate IP addresses
|
||||
from fqdn import FQDN # validate FQDNs
|
||||
|
||||
version = "v1.1.0"
|
||||
version = "v1.2.0"
|
||||
|
||||
|
||||
# some helpers / utils
|
||||
@ -99,99 +100,103 @@ class TasmotonovRunner:
|
||||
raise ValueError(error_string)
|
||||
|
||||
self.logger.log(f'Initialized runner: source={source}, data={data}, action={action}, verbose={verbose}')
|
||||
self.tasmota_addresses = self.remove_invalid_addresses(self.clean_addresses(self.parse_addresses())) # returns a list
|
||||
self.set_valid_addresses_and_comments(self.parse_addresses())
|
||||
self.logger.log(f'Validated addresses: distilled the following data. Devices added: {self.tasmota_addresses}')
|
||||
|
||||
def parse_addresses(self):
|
||||
# convert the given adresses to a list
|
||||
tasmota_addresses = []
|
||||
tasmota_addresses_raw = []
|
||||
if self.source == 'file':
|
||||
try:
|
||||
self.logger.log('Now trying to open the given file...', end='')
|
||||
with open(self.data, 'r') as file:
|
||||
contents = file.read()
|
||||
if ';' in contents:
|
||||
tasmota_addresses = contents.split(';')
|
||||
elif ',' in contents:
|
||||
tasmota_addresses = contents.split(',')
|
||||
if ',' in contents:
|
||||
tasmota_addresses_raw = contents.split(',')
|
||||
elif ';' in contents:
|
||||
tasmota_addresses_raw = contents.split(';')
|
||||
elif '\n' in contents:
|
||||
tasmota_addresses = contents.split('\n')
|
||||
elif ' ' in contents:
|
||||
tasmota_addresses = contents.split(' ')
|
||||
tasmota_addresses_raw = contents.split('\n')
|
||||
else:
|
||||
tasmota_addresses = [contents]
|
||||
tasmota_addresses_raw = [contents]
|
||||
self.logger.log('Done.')
|
||||
self.logger.log(f'Collected addresses: {tasmota_addresses}')
|
||||
self.logger.log(f'Collected addresses: {tasmota_addresses_raw}')
|
||||
except FileNotFoundError as e:
|
||||
self.logger.log_error(f'Failed reading addresses. File with the path "{self.data} can\'t be opened because it doesn\'t exist. Exiting.')
|
||||
raise e
|
||||
|
||||
elif self.source == 'inline':
|
||||
self.logger.log('Now reading from inline... ', end='')
|
||||
if ';' in self.data:
|
||||
tasmota_addresses = self.data.split(';')
|
||||
elif ',' in self.data:
|
||||
tasmota_addresses = self.data.split(',')
|
||||
if ',' in self.data:
|
||||
tasmota_addresses_raw = self.data.split(',')
|
||||
elif ';' in self.data:
|
||||
tasmota_addresses_raw = self.data.split(';')
|
||||
elif '\n' in self.data:
|
||||
tasmota_addresses = self.data.split('\n')
|
||||
elif ' ' in self.data:
|
||||
tasmota_addresses = self.data.split(' ')
|
||||
tasmota_addresses_raw = self.data.split('\n')
|
||||
else:
|
||||
tasmota_addresses = [self.data]
|
||||
tasmota_addresses_raw = [self.data]
|
||||
self.logger.log('Done.')
|
||||
self.logger.log(f'Collected addresses: {tasmota_addresses}')
|
||||
self.logger.log(f'Collected addresses: {tasmota_addresses_raw}')
|
||||
return tasmota_addresses_raw
|
||||
|
||||
return tasmota_addresses
|
||||
|
||||
def clean_addresses(self, addresses_raw):
|
||||
# clean them up (e.g. remove newlines if the file has a random newline somewhere
|
||||
tasmota_addresses_cleaned = []
|
||||
for address in addresses_raw:
|
||||
to_add = address.replace('\n', '').replace(';', '').replace(',', '').replace(' ', '')
|
||||
if to_add != '':
|
||||
tasmota_addresses_cleaned.append(to_add)
|
||||
return tasmota_addresses_cleaned
|
||||
|
||||
def remove_invalid_addresses(self, addressse_raw):
|
||||
def set_valid_addresses_and_comments(self, tasmota_addresses_raw):
|
||||
# now check data for consistency and integrity (is it really an ip address or a domain name?)
|
||||
# remove the ones that are not valid and log a warning
|
||||
tasmota_addresses_validated = []
|
||||
tasmota_addresses_validated = {}
|
||||
self.logger.log('Now validating given addresses...')
|
||||
for address in addressse_raw:
|
||||
if is_ip(address) or is_fqdn(address):
|
||||
tasmota_addresses_validated.append(address)
|
||||
for address in tasmota_addresses_raw:
|
||||
address_split = address.split("#") # split to separate existing comments
|
||||
address_cleaned = address_split[0].replace('\n', '').replace(';', '').replace(',', '').replace(' ', '')
|
||||
if len(address_split) > 1:
|
||||
comment = address.split("#")[-1].lstrip()
|
||||
else:
|
||||
self.logger.log_warning(f'The address "{address}" is neither a valid IP address nor a FQDN / domain name; SKIPPING THIS ONE.')
|
||||
self.logger.log('Validated given adddresses.')
|
||||
comment = ""
|
||||
if address_cleaned != "":
|
||||
if is_ip(address_cleaned) or is_fqdn(address_cleaned):
|
||||
tasmota_addresses_validated[address_cleaned] = comment
|
||||
else:
|
||||
self.logger.log_warning(f'The address "{address_cleaned}" (comment: "{comment}") is neither a valid IP address nor a FQDN / domain name; SKIPPING THIS ONE.')
|
||||
|
||||
return tasmota_addresses_validated
|
||||
self.tasmota_addresses = tasmota_addresses_validated
|
||||
|
||||
# some getters
|
||||
def get_addresses(self):
|
||||
return self.tasmota_addresses
|
||||
def get_address(self, index):
|
||||
return self.tasmota_addresses[index]
|
||||
return {list(self.tasmota_addresses.keys())[index]: list(self.tasmota_addresses.values())[index]}
|
||||
|
||||
def run_custom_action(self, action):
|
||||
# and finally turn on or off or toggle the lights!
|
||||
for address in self.tasmota_addresses:
|
||||
self.logger.log('Running the action "{action}" on the following device: {address}')
|
||||
def thread_runner(address):
|
||||
self.logger.log(f'Running the action "{action}" on the following device: {address}')
|
||||
try:
|
||||
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(action).lower()}')
|
||||
success = True
|
||||
except requests.exceptions.ConnectionError:
|
||||
self.logger.log_warning(f'Could not connect to "{address}". Skipping...')
|
||||
success = False
|
||||
if answer.status_code != 200:
|
||||
self.logger.log_warning(f'{address} returned the following non-200 status code: {answer.status_code}. Skipping...')
|
||||
if not str(answer.status_code).startswith("20"):
|
||||
self.logger.log_warning(f'{address} returned the following non-20x status code: {answer.status_code}. Skipping...')
|
||||
success = False
|
||||
if success:
|
||||
self.logger.log_success(f"Ran action {str(action).upper()} on {self.logger.COLORS['OKCYAN']}{address}{self.logger.COLORS['ENDC']} successfully.")
|
||||
|
||||
threads = []
|
||||
for address in list(self.tasmota_addresses.keys()):
|
||||
t = threading.Thread(target=thread_runner, args=(address,))
|
||||
threads.append(t)
|
||||
t.start()
|
||||
# wait for all threads to finish execution
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
|
||||
def run(self):
|
||||
self.run_custom_action(self.action)
|
||||
|
||||
def run_single_custom_action(self, index, action):
|
||||
address = self.tasmota_addresses[index]
|
||||
self.logger.log('Running the action "{action}" on the following device: {address}')
|
||||
address = list(self.tasmota_addresses.keys())[index]
|
||||
self.logger.log(f'Running the action "{action}" on the following device: {address}')
|
||||
try:
|
||||
answer = None
|
||||
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(action).lower()}')
|
||||
@ -217,10 +222,10 @@ if __name__ == '__main__':
|
||||
prog='Tasmotonov - simply toggle multiple tasmota lights',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
description='A very simple script which allows you to turn on/off multiple tasmota devices specified.',
|
||||
epilog='Info: if you choose a file as source, this files needs to contain the addresses of the tasmota devices either comma-separated, semicolon-separated, space-separated, or newline-separated!\n\n© Benjamin Burkhardt, 2025')
|
||||
epilog='Info: if you choose a file as source, this files needs to contain the addresses of the tasmota devices either comma-separated, semicolon-separated, or newline-separated (each entry can have a comment starting with a # (hashtag)!\n\n© Benjamin Burkhardt, 2025')
|
||||
|
||||
parser.add_argument('source', help='Select either to read the adresses (of the devices) from a "file" or from "inline"', choices=['file', 'inline'])
|
||||
parser.add_argument('data', help='Either the path to the file, or a comma-, space- or semicolon-separated list of tasmota adresses.')
|
||||
parser.add_argument('data', help='Either the path to the file, or a comma-, semicolon- or newline-separated list of tasmota adresses.')
|
||||
parser.add_argument('action', help='Select to turn all tasmota devices "on" or "off" or "toggle" (case insensitive)', choices=['on', 'off', 'toggle'])
|
||||
parser.add_argument('-v', '--verbose', help='Turn on verbose file output', action='store_true')
|
||||
parser.add_argument('--version', action='version', version=f'Tasmotonov.py {version}')
|
||||
|
Loading…
x
Reference in New Issue
Block a user