Compare commits
5 Commits
e06cc1d4fc
...
9e0703051f
Author | SHA1 | Date | |
---|---|---|---|
9e0703051f | |||
fff3699efe | |||
3999de77d7 | |||
023b29fcae | |||
44b48485be |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,2 +1,3 @@
|
|||||||
config.ini
|
config.ini
|
||||||
|
tasmotonov-gui-config.json
|
||||||
__pycache__/
|
__pycache__/
|
||||||
|
@ -20,7 +20,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
import tasmotonov
|
import tasmotonov
|
||||||
import configparser
|
import json
|
||||||
from PySide6.QtUiTools import QUiLoader
|
from PySide6.QtUiTools import QUiLoader
|
||||||
from PySide6.QtWidgets import QApplication,QFileDialog
|
from PySide6.QtWidgets import QApplication,QFileDialog
|
||||||
from PySide6.QtCore import QFile, QUrl, QIODevice
|
from PySide6.QtCore import QFile, QUrl, QIODevice
|
||||||
@ -29,35 +29,41 @@ from PySide6.QtCore import QFile, QUrl, QIODevice
|
|||||||
filename = ""
|
filename = ""
|
||||||
action = "toggle"
|
action = "toggle"
|
||||||
|
|
||||||
def load_config(file="config.ini"): # load config from configparser.ConfigParser object
|
def load_config(file="tasmotonov-gui-config.json"): # load config from configparser.ConfigParser object
|
||||||
to_load = configparser.ConfigParser()
|
try:
|
||||||
to_load.read(file)
|
with open(file, "r") as f:
|
||||||
global filename, action
|
to_load = json.loads(f.read())
|
||||||
if "DEFAULT" in to_load:
|
global filename, action
|
||||||
if "file" in to_load['DEFAULT']:
|
if "file" in to_load:
|
||||||
filename = to_load['DEFAULT']['file']
|
filename = to_load['file']
|
||||||
if filename != "":
|
if filename != "":
|
||||||
try:
|
try:
|
||||||
tab1_load_file()
|
tab1_load_file()
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
tab1_clear()
|
tab1_clear()
|
||||||
if "inline" in to_load['DEFAULT']:
|
if "inline" in to_load:
|
||||||
window.tab2_plainTextEdit.setPlainText(to_load['DEFAULT']['inline'])
|
window.tab2_plainTextEdit.setPlainText(to_load['inline'])
|
||||||
if "action" in to_load['DEFAULT']:
|
if "action" in to_load:
|
||||||
a_new = to_load['DEFAULT']['action']
|
a_new = to_load['action']
|
||||||
if a_new != "":
|
if a_new != "":
|
||||||
action = a_new
|
action = a_new
|
||||||
# else: no config there yet
|
if "tab_index" in to_load:
|
||||||
def save_config(file="config.ini"):
|
tab_index = to_load["tab_index"]
|
||||||
|
if type(tab_index) == int and (0 <= tab_index < window.tabWidget.count()):
|
||||||
|
window.tabWidget.setCurrentIndex(to_load["tab_index"])
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass # no config there yet
|
||||||
|
|
||||||
|
def save_config(file="tasmotonov-gui-config.json"):
|
||||||
window.tab3_textBrowser.append("Saving configuration!")
|
window.tab3_textBrowser.append("Saving configuration!")
|
||||||
global filename, action
|
global filename, action
|
||||||
to_save = configparser.ConfigParser()
|
to_save = {}
|
||||||
to_save['DEFAULT'] = {}
|
to_save['file'] = str(filename)
|
||||||
to_save['DEFAULT']['file'] = str(filename)
|
to_save['inline'] = str(window.tab2_plainTextEdit.toPlainText())
|
||||||
to_save['DEFAULT']['inline'] = str(window.tab2_plainTextEdit.toPlainText())
|
to_save['action'] = str(action)
|
||||||
to_save['DEFAULT']['action'] = str(action)
|
to_save['tab_index'] = window.tabWidget.currentIndex()
|
||||||
with open(file, 'w') as configfile:
|
with open(file, 'w') as f:
|
||||||
to_save.write(configfile)
|
f.write(json.dumps(to_save))
|
||||||
|
|
||||||
# tab1 slots
|
# tab1 slots
|
||||||
def tab1_load_file_pressed():
|
def tab1_load_file_pressed():
|
||||||
@ -76,7 +82,14 @@ def tab1_load_file():
|
|||||||
window.tab1_label_help.show()
|
window.tab1_label_help.show()
|
||||||
# clear listWidget and add the items
|
# clear listWidget and add the items
|
||||||
window.tab1_listWidget.clear()
|
window.tab1_listWidget.clear()
|
||||||
window.tab1_listWidget.addItems(tasmotonov.TasmotonovRunner("file", filename, action, False, False).get_addresses())
|
addresses = tasmotonov.TasmotonovRunner("file", filename, action, False, False).get_addresses()
|
||||||
|
items_to_add = []
|
||||||
|
for address, comment in addresses.items():
|
||||||
|
if comment == "":
|
||||||
|
items_to_add.append(address)
|
||||||
|
else:
|
||||||
|
items_to_add.append(f"{comment} [{address}]")
|
||||||
|
window.tab1_listWidget.addItems(items_to_add)
|
||||||
def tab1_clear():
|
def tab1_clear():
|
||||||
global filename
|
global filename
|
||||||
filename = ""
|
filename = ""
|
||||||
@ -107,7 +120,14 @@ def tab2_plainTextEdit_change():
|
|||||||
content = window.tab2_plainTextEdit.toPlainText()
|
content = window.tab2_plainTextEdit.toPlainText()
|
||||||
if content != "":
|
if content != "":
|
||||||
tasmotonov_runner = tasmotonov.TasmotonovRunner("inline", content, action, False, False)
|
tasmotonov_runner = tasmotonov.TasmotonovRunner("inline", content, action, False, False)
|
||||||
window.tab2_listWidget.addItems(tasmotonov_runner.get_addresses())
|
addresses = tasmotonov_runner.get_addresses()
|
||||||
|
items_to_add = []
|
||||||
|
for address, comment in addresses.items():
|
||||||
|
if comment == "":
|
||||||
|
items_to_add.append(address)
|
||||||
|
else:
|
||||||
|
items_to_add.append(f"{comment} [{address}]")
|
||||||
|
window.tab2_listWidget.addItems(items_to_add)
|
||||||
window.tab3_textBrowser.append(str(tasmotonov_runner.logger.log_string))
|
window.tab3_textBrowser.append(str(tasmotonov_runner.logger.log_string))
|
||||||
def tab2_action():
|
def tab2_action():
|
||||||
content = window.tab2_plainTextEdit.toPlainText()
|
content = window.tab2_plainTextEdit.toPlainText()
|
||||||
|
103
tasmotonov.py
103
tasmotonov.py
@ -20,10 +20,11 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|||||||
|
|
||||||
import argparse # to parse the cli args!
|
import argparse # to parse the cli args!
|
||||||
import requests # needed to access the http endpoints
|
import requests # needed to access the http endpoints
|
||||||
|
import threading # for running the requests simultaneously
|
||||||
import ipaddress # to validate IP addresses
|
import ipaddress # to validate IP addresses
|
||||||
from fqdn import FQDN # validate FQDNs
|
from fqdn import FQDN # validate FQDNs
|
||||||
|
|
||||||
version = "v1.1.0"
|
version = "v1.2.0"
|
||||||
|
|
||||||
|
|
||||||
# some helpers / utils
|
# some helpers / utils
|
||||||
@ -99,99 +100,103 @@ class TasmotonovRunner:
|
|||||||
raise ValueError(error_string)
|
raise ValueError(error_string)
|
||||||
|
|
||||||
self.logger.log(f'Initialized runner: source={source}, data={data}, action={action}, verbose={verbose}')
|
self.logger.log(f'Initialized runner: source={source}, data={data}, action={action}, verbose={verbose}')
|
||||||
self.tasmota_addresses = self.remove_invalid_addresses(self.clean_addresses(self.parse_addresses())) # returns a list
|
self.set_valid_addresses_and_comments(self.parse_addresses())
|
||||||
|
self.logger.log(f'Validated addresses: distilled the following data. Devices added: {self.tasmota_addresses}')
|
||||||
|
|
||||||
def parse_addresses(self):
|
def parse_addresses(self):
|
||||||
# convert the given adresses to a list
|
# convert the given adresses to a list
|
||||||
tasmota_addresses = []
|
tasmota_addresses_raw = []
|
||||||
if self.source == 'file':
|
if self.source == 'file':
|
||||||
try:
|
try:
|
||||||
self.logger.log('Now trying to open the given file...', end='')
|
self.logger.log('Now trying to open the given file...', end='')
|
||||||
with open(self.data, 'r') as file:
|
with open(self.data, 'r') as file:
|
||||||
contents = file.read()
|
contents = file.read()
|
||||||
if ';' in contents:
|
if ',' in contents:
|
||||||
tasmota_addresses = contents.split(';')
|
tasmota_addresses_raw = contents.split(',')
|
||||||
elif ',' in contents:
|
elif ';' in contents:
|
||||||
tasmota_addresses = contents.split(',')
|
tasmota_addresses_raw = contents.split(';')
|
||||||
elif '\n' in contents:
|
elif '\n' in contents:
|
||||||
tasmota_addresses = contents.split('\n')
|
tasmota_addresses_raw = contents.split('\n')
|
||||||
elif ' ' in contents:
|
|
||||||
tasmota_addresses = contents.split(' ')
|
|
||||||
else:
|
else:
|
||||||
tasmota_addresses = [contents]
|
tasmota_addresses_raw = [contents]
|
||||||
self.logger.log('Done.')
|
self.logger.log('Done.')
|
||||||
self.logger.log(f'Collected addresses: {tasmota_addresses}')
|
self.logger.log(f'Collected addresses: {tasmota_addresses_raw}')
|
||||||
except FileNotFoundError as e:
|
except FileNotFoundError as e:
|
||||||
self.logger.log_error(f'Failed reading addresses. File with the path "{self.data} can\'t be opened because it doesn\'t exist. Exiting.')
|
self.logger.log_error(f'Failed reading addresses. File with the path "{self.data} can\'t be opened because it doesn\'t exist. Exiting.')
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
elif self.source == 'inline':
|
elif self.source == 'inline':
|
||||||
self.logger.log('Now reading from inline... ', end='')
|
self.logger.log('Now reading from inline... ', end='')
|
||||||
if ';' in self.data:
|
if ',' in self.data:
|
||||||
tasmota_addresses = self.data.split(';')
|
tasmota_addresses_raw = self.data.split(',')
|
||||||
elif ',' in self.data:
|
elif ';' in self.data:
|
||||||
tasmota_addresses = self.data.split(',')
|
tasmota_addresses_raw = self.data.split(';')
|
||||||
elif '\n' in self.data:
|
elif '\n' in self.data:
|
||||||
tasmota_addresses = self.data.split('\n')
|
tasmota_addresses_raw = self.data.split('\n')
|
||||||
elif ' ' in self.data:
|
|
||||||
tasmota_addresses = self.data.split(' ')
|
|
||||||
else:
|
else:
|
||||||
tasmota_addresses = [self.data]
|
tasmota_addresses_raw = [self.data]
|
||||||
self.logger.log('Done.')
|
self.logger.log('Done.')
|
||||||
self.logger.log(f'Collected addresses: {tasmota_addresses}')
|
self.logger.log(f'Collected addresses: {tasmota_addresses_raw}')
|
||||||
|
return tasmota_addresses_raw
|
||||||
|
|
||||||
return tasmota_addresses
|
def set_valid_addresses_and_comments(self, tasmota_addresses_raw):
|
||||||
|
|
||||||
def clean_addresses(self, addresses_raw):
|
|
||||||
# clean them up (e.g. remove newlines if the file has a random newline somewhere
|
|
||||||
tasmota_addresses_cleaned = []
|
|
||||||
for address in addresses_raw:
|
|
||||||
to_add = address.replace('\n', '').replace(';', '').replace(',', '').replace(' ', '')
|
|
||||||
if to_add != '':
|
|
||||||
tasmota_addresses_cleaned.append(to_add)
|
|
||||||
return tasmota_addresses_cleaned
|
|
||||||
|
|
||||||
def remove_invalid_addresses(self, addressse_raw):
|
|
||||||
# now check data for consistency and integrity (is it really an ip address or a domain name?)
|
# now check data for consistency and integrity (is it really an ip address or a domain name?)
|
||||||
# remove the ones that are not valid and log a warning
|
# remove the ones that are not valid and log a warning
|
||||||
tasmota_addresses_validated = []
|
tasmota_addresses_validated = {}
|
||||||
self.logger.log('Now validating given addresses...')
|
self.logger.log('Now validating given addresses...')
|
||||||
for address in addressse_raw:
|
for address in tasmota_addresses_raw:
|
||||||
if is_ip(address) or is_fqdn(address):
|
address_split = address.split("#") # split to separate existing comments
|
||||||
tasmota_addresses_validated.append(address)
|
address_cleaned = address_split[0].replace('\n', '').replace(';', '').replace(',', '').replace(' ', '')
|
||||||
|
if len(address_split) > 1:
|
||||||
|
comment = address.split("#")[-1].lstrip()
|
||||||
else:
|
else:
|
||||||
self.logger.log_warning(f'The address "{address}" is neither a valid IP address nor a FQDN / domain name; SKIPPING THIS ONE.')
|
comment = ""
|
||||||
self.logger.log('Validated given adddresses.')
|
if address_cleaned != "":
|
||||||
|
if is_ip(address_cleaned) or is_fqdn(address_cleaned):
|
||||||
|
tasmota_addresses_validated[address_cleaned] = comment
|
||||||
|
else:
|
||||||
|
self.logger.log_warning(f'The address "{address_cleaned}" (comment: "{comment}") is neither a valid IP address nor a FQDN / domain name; SKIPPING THIS ONE.')
|
||||||
|
|
||||||
return tasmota_addresses_validated
|
self.tasmota_addresses = tasmota_addresses_validated
|
||||||
|
|
||||||
# some getters
|
# some getters
|
||||||
def get_addresses(self):
|
def get_addresses(self):
|
||||||
return self.tasmota_addresses
|
return self.tasmota_addresses
|
||||||
def get_address(self, index):
|
def get_address(self, index):
|
||||||
return self.tasmota_addresses[index]
|
return {list(self.tasmota_addresses.keys())[index]: list(self.tasmota_addresses.values())[index]}
|
||||||
|
|
||||||
def run_custom_action(self, action):
|
def run_custom_action(self, action):
|
||||||
# and finally turn on or off or toggle the lights!
|
# and finally turn on or off or toggle the lights!
|
||||||
for address in self.tasmota_addresses:
|
def thread_runner(address):
|
||||||
self.logger.log('Running the action "{action}" on the following device: {address}')
|
self.logger.log(f'Running the action "{action}" on the following device: {address}')
|
||||||
try:
|
try:
|
||||||
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(action).lower()}')
|
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(action).lower()}')
|
||||||
success = True
|
success = True
|
||||||
except requests.exceptions.ConnectionError:
|
except requests.exceptions.ConnectionError:
|
||||||
self.logger.log_warning(f'Could not connect to "{address}". Skipping...')
|
self.logger.log_warning(f'Could not connect to "{address}". Skipping...')
|
||||||
success = False
|
success = False
|
||||||
if answer.status_code != 200:
|
if not str(answer.status_code).startswith("20"):
|
||||||
self.logger.log_warning(f'{address} returned the following non-200 status code: {answer.status_code}. Skipping...')
|
self.logger.log_warning(f'{address} returned the following non-20x status code: {answer.status_code}. Skipping...')
|
||||||
success = False
|
success = False
|
||||||
if success:
|
if success:
|
||||||
self.logger.log_success(f"Ran action {str(action).upper()} on {self.logger.COLORS['OKCYAN']}{address}{self.logger.COLORS['ENDC']} successfully.")
|
self.logger.log_success(f"Ran action {str(action).upper()} on {self.logger.COLORS['OKCYAN']}{address}{self.logger.COLORS['ENDC']} successfully.")
|
||||||
|
|
||||||
|
threads = []
|
||||||
|
for address in list(self.tasmota_addresses.keys()):
|
||||||
|
t = threading.Thread(target=thread_runner, args=(address,))
|
||||||
|
threads.append(t)
|
||||||
|
t.start()
|
||||||
|
# wait for all threads to finish execution
|
||||||
|
for t in threads:
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.run_custom_action(self.action)
|
self.run_custom_action(self.action)
|
||||||
|
|
||||||
def run_single_custom_action(self, index, action):
|
def run_single_custom_action(self, index, action):
|
||||||
address = self.tasmota_addresses[index]
|
address = list(self.tasmota_addresses.keys())[index]
|
||||||
self.logger.log('Running the action "{action}" on the following device: {address}')
|
self.logger.log(f'Running the action "{action}" on the following device: {address}')
|
||||||
try:
|
try:
|
||||||
answer = None
|
answer = None
|
||||||
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(action).lower()}')
|
answer = requests.get(f'http://{address}/cm?cmnd=Power%20{str(action).lower()}')
|
||||||
@ -217,10 +222,10 @@ if __name__ == '__main__':
|
|||||||
prog='Tasmotonov - simply toggle multiple tasmota lights',
|
prog='Tasmotonov - simply toggle multiple tasmota lights',
|
||||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
description='A very simple script which allows you to turn on/off multiple tasmota devices specified.',
|
description='A very simple script which allows you to turn on/off multiple tasmota devices specified.',
|
||||||
epilog='Info: if you choose a file as source, this files needs to contain the addresses of the tasmota devices either comma-separated, semicolon-separated, space-separated, or newline-separated!\n\n© Benjamin Burkhardt, 2025')
|
epilog='Info: if you choose a file as source, this files needs to contain the addresses of the tasmota devices either comma-separated, semicolon-separated, or newline-separated (each entry can have a comment starting with a # (hashtag)!\n\n© Benjamin Burkhardt, 2025')
|
||||||
|
|
||||||
parser.add_argument('source', help='Select either to read the adresses (of the devices) from a "file" or from "inline"', choices=['file', 'inline'])
|
parser.add_argument('source', help='Select either to read the adresses (of the devices) from a "file" or from "inline"', choices=['file', 'inline'])
|
||||||
parser.add_argument('data', help='Either the path to the file, or a comma-, space- or semicolon-separated list of tasmota adresses.')
|
parser.add_argument('data', help='Either the path to the file, or a comma-, semicolon- or newline-separated list of tasmota adresses.')
|
||||||
parser.add_argument('action', help='Select to turn all tasmota devices "on" or "off" or "toggle" (case insensitive)', choices=['on', 'off', 'toggle'])
|
parser.add_argument('action', help='Select to turn all tasmota devices "on" or "off" or "toggle" (case insensitive)', choices=['on', 'off', 'toggle'])
|
||||||
parser.add_argument('-v', '--verbose', help='Turn on verbose file output', action='store_true')
|
parser.add_argument('-v', '--verbose', help='Turn on verbose file output', action='store_true')
|
||||||
parser.add_argument('--version', action='version', version=f'Tasmotonov.py {version}')
|
parser.add_argument('--version', action='version', version=f'Tasmotonov.py {version}')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user