From 517db51909f2ee4a396ca6a5ef415a0e6657192a Mon Sep 17 00:00:00 2001 From: BlueFox Date: Sat, 30 Mar 2024 17:10:20 +0000 Subject: [PATCH] Added support for the interrupt_pin parameter and added blinking leds on send/receive --- lora_pingpong.py | 66 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 12 deletions(-) diff --git a/lora_pingpong.py b/lora_pingpong.py index 60f9cf5..4813c9b 100644 --- a/lora_pingpong.py +++ b/lora_pingpong.py @@ -12,9 +12,15 @@ You should have received a copy of the GNU General Public License along with thi from SX127x import SX127x from machine import Pin, SPI -from time import sleep, time_ns +from time import sleep, time from json import loads +def blink_led(pin, sleep_time=0.2): + pin = Pin(pin, Pin.OUT) + + pin.value(1) + sleep(sleep_time) + pin.value(0) def as_initializer(lora, lcd, lcd_connected, interrupt_pin): ping_to_send = True @@ -23,22 +29,36 @@ def as_initializer(lora, lcd, lcd_connected, interrupt_pin): RSSI_REQ = -1 SNR_REQ = -1 - while True: + # pin definitions + interrupt = None + if interrupt_pin: + interrupt = Pin(interrupt_pin, Pin.OUT) + # the lambda only returns true true if a pin is specified and it is pulled high (btn pressed) + interrupted = lambda: interrupt.value() == 1 if interrupt else False + receive_led = 26 + send_led = 27 + + while not interrupted(): if ping_to_send: COUNT += 1 msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + '}' if lcd_connected: lcd.move_to(14,0) lcd.putstr(">") + lcd.move_to(0,1) + lcd.putstr("Waiting for pong") print(f"[LoRaPingPong] Sending ping: {msg}") lora.println(msg) + blink_led(send_led) ping_to_send = False if not ping_to_send: print("[LoRaPingPong] Now waiting for a response") while not lora.received_packet(): sleep(0.01) + if interrupted(): return # if interrupt btn is pressed, stop listening # TODO: implement a timeout # NOW got a response! + blink_led(receive_led) payload = lora.read_payload() try: payload_dict = loads(payload) @@ -62,7 +82,10 @@ def as_initializer(lora, lcd, lcd_connected, interrupt_pin): lcd.putstr(f"LoRaPingPong [<]") print(f"[LoRaPingPong] Got a response: {payload}") print(f"[LoRaPingPong] Now waiting some time for next ping! Wo-hoo!") - sleep(10) # sleep 10 seconds + # sleep 10 seconds + time_now = time() + while (time() - time_now) < 10: + if interrupted(): return # end ping pong when btn at interrupt_pin is pressed (given as param) ping_to_send = True @@ -73,13 +96,29 @@ def as_responder(lora, lcd, lcd_connected, interrupt_pin): RSSI_REQ = -1 SNR_REQ = -1 - while True: + # pin definitions + interrupt = None + if interrupt_pin: + interrupt = Pin(interrupt_pin, Pin.OUT) + # the lambda only returns true true if a pin is specified and it is pulled high (btn pressed) + interrupted = lambda: interrupt.value() == 1 if interrupt else False + receive_led = 26 + send_led = 27 + + # print waiting for ping at startup (if lcd's connected) + if lcd: + lcd.move_to(0,1) + lcd.putstr("Waiting for ping") + + while not interrupted(): # run as long the interrupt pin is low (btn not pressed) if to_wait_for_ping: # wait for ping print("[LoRaPingPong] Waiting for a ping (request)") while not lora.received_packet(): sleep(0.01) + if interrupted(): return # if interrupt btn is pressed, stop listening # TODO: implement a timeout # NOW got a response! + blink_led(receive_led) payload = lora.read_payload() try: payload_dict = loads(payload) @@ -116,6 +155,7 @@ def as_responder(lora, lcd, lcd_connected, interrupt_pin): lcd.putstr(f"LoRaPingPong [>]") print(f"[LoRaPingPong] Sending pong (response): {msg}") lora.println(msg) + blink_led(send_led) to_wait_for_ping = True @@ -151,7 +191,7 @@ def pingpong(lora, initializer: bool, lcd_connected=True, interrupt_pin=None): lcd = I2C_LCD(_i2c, 0x27, 2, 16) lcd.move_to(0,0) - lcd.putstr(f"LoRaPingPong [ ]Waiting for ping") + lcd.putstr(f"LoRaPingPong [ ]") # infinite loop running the ping at first if initializer: @@ -182,22 +222,24 @@ parameters = { 'invert_IQ': False, } -lora = SX127x(device_spi, pins={"dio_0": 6, "ss": 5, "led": 27}, parameters=parameters) +lora = SX127x(device_spi, pins={"dio_0": 6, "ss": 5, "led": 7}, parameters=parameters) # The run function (for different options; see below the "if __name__ == ..." section) def run_lcd_init(): - pingpong(lora, initializer=True, lcd_connected=True) + pingpong(lora, initializer=True, lcd_connected=True, interrupt_pin=8) def run_wo_lcd_init(): - pingpong(lora, initializer=True, lcd_connected=False) + pingpong(lora, initializer=True, lcd_connected=False, interrupt_pin=8) def run_lcd_resp(): - pingpong(lora, initializer=False, lcd_connected=True) + pingpong(lora, initializer=False, lcd_connected=True, interrupt_pin=8) def run_wo_lcd_resp(): - pingpong(lora, initializer=False, lcd_connected=False) + pingpong(lora, initializer=False, lcd_connected=False, interrupt_pin=8) if __name__ == "__main__": #run_lcd_resp() # run as responder (with lcd) #run_wo_lcd_resp() # run as responder (without lcd) - #run_lcd_init() # run as initializer (with lcd) - run_wo_lcd_init() # run as initializer (without lcd) + run_lcd_init() # run as initializer (with lcd) + #run_wo_lcd_init() # run as initializer (without lcd) + + print("Finished execution")