Added support for the interrupt_pin parameter and added blinking leds on send/receive

This commit is contained in:
BlueFox 2024-03-30 17:10:20 +00:00
parent 22ea013871
commit 517db51909

View File

@ -12,9 +12,15 @@ You should have received a copy of the GNU General Public License along with thi
from SX127x import SX127x from SX127x import SX127x
from machine import Pin, SPI from machine import Pin, SPI
from time import sleep, time_ns from time import sleep, time
from json import loads from json import loads
def blink_led(pin, sleep_time=0.2):
pin = Pin(pin, Pin.OUT)
pin.value(1)
sleep(sleep_time)
pin.value(0)
def as_initializer(lora, lcd, lcd_connected, interrupt_pin): def as_initializer(lora, lcd, lcd_connected, interrupt_pin):
ping_to_send = True ping_to_send = True
@ -23,22 +29,36 @@ def as_initializer(lora, lcd, lcd_connected, interrupt_pin):
RSSI_REQ = -1 RSSI_REQ = -1
SNR_REQ = -1 SNR_REQ = -1
while True: # pin definitions
interrupt = None
if interrupt_pin:
interrupt = Pin(interrupt_pin, Pin.OUT)
# the lambda only returns true true if a pin is specified and it is pulled high (btn pressed)
interrupted = lambda: interrupt.value() == 1 if interrupt else False
receive_led = 26
send_led = 27
while not interrupted():
if ping_to_send: if ping_to_send:
COUNT += 1 COUNT += 1
msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + '}' msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + '}'
if lcd_connected: if lcd_connected:
lcd.move_to(14,0) lcd.move_to(14,0)
lcd.putstr(">") lcd.putstr(">")
lcd.move_to(0,1)
lcd.putstr("Waiting for pong")
print(f"[LoRaPingPong] Sending ping: {msg}") print(f"[LoRaPingPong] Sending ping: {msg}")
lora.println(msg) lora.println(msg)
blink_led(send_led)
ping_to_send = False ping_to_send = False
if not ping_to_send: if not ping_to_send:
print("[LoRaPingPong] Now waiting for a response") print("[LoRaPingPong] Now waiting for a response")
while not lora.received_packet(): while not lora.received_packet():
sleep(0.01) sleep(0.01)
if interrupted(): return # if interrupt btn is pressed, stop listening
# TODO: implement a timeout # TODO: implement a timeout
# NOW got a response! # NOW got a response!
blink_led(receive_led)
payload = lora.read_payload() payload = lora.read_payload()
try: try:
payload_dict = loads(payload) payload_dict = loads(payload)
@ -62,7 +82,10 @@ def as_initializer(lora, lcd, lcd_connected, interrupt_pin):
lcd.putstr(f"LoRaPingPong [<]") lcd.putstr(f"LoRaPingPong [<]")
print(f"[LoRaPingPong] Got a response: {payload}") print(f"[LoRaPingPong] Got a response: {payload}")
print(f"[LoRaPingPong] Now waiting some time for next ping! Wo-hoo!") print(f"[LoRaPingPong] Now waiting some time for next ping! Wo-hoo!")
sleep(10) # sleep 10 seconds # sleep 10 seconds
time_now = time()
while (time() - time_now) < 10:
if interrupted(): return # end ping pong when btn at interrupt_pin is pressed (given as param)
ping_to_send = True ping_to_send = True
@ -73,13 +96,29 @@ def as_responder(lora, lcd, lcd_connected, interrupt_pin):
RSSI_REQ = -1 RSSI_REQ = -1
SNR_REQ = -1 SNR_REQ = -1
while True: # pin definitions
interrupt = None
if interrupt_pin:
interrupt = Pin(interrupt_pin, Pin.OUT)
# the lambda only returns true true if a pin is specified and it is pulled high (btn pressed)
interrupted = lambda: interrupt.value() == 1 if interrupt else False
receive_led = 26
send_led = 27
# print waiting for ping at startup (if lcd's connected)
if lcd:
lcd.move_to(0,1)
lcd.putstr("Waiting for ping")
while not interrupted(): # run as long the interrupt pin is low (btn not pressed)
if to_wait_for_ping: # wait for ping if to_wait_for_ping: # wait for ping
print("[LoRaPingPong] Waiting for a ping (request)") print("[LoRaPingPong] Waiting for a ping (request)")
while not lora.received_packet(): while not lora.received_packet():
sleep(0.01) sleep(0.01)
if interrupted(): return # if interrupt btn is pressed, stop listening
# TODO: implement a timeout # TODO: implement a timeout
# NOW got a response! # NOW got a response!
blink_led(receive_led)
payload = lora.read_payload() payload = lora.read_payload()
try: try:
payload_dict = loads(payload) payload_dict = loads(payload)
@ -116,6 +155,7 @@ def as_responder(lora, lcd, lcd_connected, interrupt_pin):
lcd.putstr(f"LoRaPingPong [>]") lcd.putstr(f"LoRaPingPong [>]")
print(f"[LoRaPingPong] Sending pong (response): {msg}") print(f"[LoRaPingPong] Sending pong (response): {msg}")
lora.println(msg) lora.println(msg)
blink_led(send_led)
to_wait_for_ping = True to_wait_for_ping = True
@ -151,7 +191,7 @@ def pingpong(lora, initializer: bool, lcd_connected=True, interrupt_pin=None):
lcd = I2C_LCD(_i2c, 0x27, 2, 16) lcd = I2C_LCD(_i2c, 0x27, 2, 16)
lcd.move_to(0,0) lcd.move_to(0,0)
lcd.putstr(f"LoRaPingPong [ ]Waiting for ping") lcd.putstr(f"LoRaPingPong [ ]")
# infinite loop running the ping at first # infinite loop running the ping at first
if initializer: if initializer:
@ -182,22 +222,24 @@ parameters = {
'invert_IQ': False, 'invert_IQ': False,
} }
lora = SX127x(device_spi, pins={"dio_0": 6, "ss": 5, "led": 27}, parameters=parameters) lora = SX127x(device_spi, pins={"dio_0": 6, "ss": 5, "led": 7}, parameters=parameters)
# The run function (for different options; see below the "if __name__ == ..." section) # The run function (for different options; see below the "if __name__ == ..." section)
def run_lcd_init(): def run_lcd_init():
pingpong(lora, initializer=True, lcd_connected=True) pingpong(lora, initializer=True, lcd_connected=True, interrupt_pin=8)
def run_wo_lcd_init(): def run_wo_lcd_init():
pingpong(lora, initializer=True, lcd_connected=False) pingpong(lora, initializer=True, lcd_connected=False, interrupt_pin=8)
def run_lcd_resp(): def run_lcd_resp():
pingpong(lora, initializer=False, lcd_connected=True) pingpong(lora, initializer=False, lcd_connected=True, interrupt_pin=8)
def run_wo_lcd_resp(): def run_wo_lcd_resp():
pingpong(lora, initializer=False, lcd_connected=False) pingpong(lora, initializer=False, lcd_connected=False, interrupt_pin=8)
if __name__ == "__main__": if __name__ == "__main__":
#run_lcd_resp() # run as responder (with lcd) #run_lcd_resp() # run as responder (with lcd)
#run_wo_lcd_resp() # run as responder (without lcd) #run_wo_lcd_resp() # run as responder (without lcd)
#run_lcd_init() # run as initializer (with lcd) run_lcd_init() # run as initializer (with lcd)
run_wo_lcd_init() # run as initializer (without lcd) #run_wo_lcd_init() # run as initializer (without lcd)
print("Finished execution")