""" LoRa-Training / lora_pingpong - A simple lora pingpong program for testing reasons Copyright (C) 2024 Benjamin Burkhardt This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ from SX127x import SX127x from machine import Pin, SPI from time import sleep, time from json import loads def blink_led(pin, sleep_time=0.2): pin = Pin(pin, Pin.OUT) pin.value(1) sleep(sleep_time) pin.value(0) def as_initializer(lora, lcd, lcd_connected, interrupt_pin): ping_to_send = True COUNT = 0 TX_POWER_REQ = -1 RSSI_REQ = -1 SNR_REQ = -1 # pin definitions interrupt = None if interrupt_pin: interrupt = Pin(interrupt_pin, Pin.OUT) # the lambda only returns true true if a pin is specified and it is pulled high (btn pressed) interrupted = lambda: interrupt.value() == 1 if interrupt else False receive_led = 26 send_led = 27 while not interrupted(): if ping_to_send: COUNT += 1 msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + '}' if lcd_connected: lcd.move_to(14,0) lcd.putstr(">") lcd.move_to(0,1) lcd.putstr("Waiting for pong") print(f"[LoRaPingPong] Sending ping: {msg}") lora.println(msg) blink_led(send_led) ping_to_send = False if not ping_to_send: print("[LoRaPingPong] Now waiting for a response") time_now = time() while not lora.received_packet(): sleep(0.01) if interrupted(): return # if interrupt btn is pressed, stop listening if (time() - time_now) > 15: # timeout if waiting too long for a response # possible causes of triggering this could be e.g. lost connection print("[LoRaPingPong] Timeout reached. Sending ping again.") if lcd: lcd.move_to(0,1) lcd.putstr("Reached timeout!") break # NOW got a response! blink_led(receive_led) payload = lora.read_payload() if not payload: # if empty because of timeout ping_to_send = True continue try: payload_dict = loads(payload) TX_POWER_REQ = payload_dict["tx_power"] COUNT = payload_dict["count"] RSSI_REQ = payload_dict["rssi_req"] SNR_REQ = payload_dict["snr_req"] except ValueError: print(f"[LoRaPingPong] Got a bad response: {payload};\n[LoRaPingPong] Ignoring this one.") continue # start the next loop cycle except KeyError: print(f"[LoRaPingPong] Some keys are missing in received payload: {payload}") print("[LoRaPingPong] Ignoring this one.") continue # start the next loop cycle if lcd_connected: # print some info about the received signal on the lcd (if connected) lcd.move_to(14,0) lcd.putstr("<") lcd.move_to(0,1) lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16)) lcd.move_to(0,0) lcd.putstr(f"LoRaPingPong [<]") print(f"[LoRaPingPong] Got a response: {payload}") print(f"[LoRaPingPong] Now waiting some time for next ping! Wo-hoo!") # sleep 10 seconds time_now = time() while (time() - time_now) < 10: if interrupted(): return # end ping pong when btn at interrupt_pin is pressed (given as param) ping_to_send = True def as_responder(lora, lcd, lcd_connected, interrupt_pin): to_wait_for_ping = True COUNT = 0 TX_POWER_REQ = -1 RSSI_REQ = -1 SNR_REQ = -1 # pin definitions interrupt = None if interrupt_pin: interrupt = Pin(interrupt_pin, Pin.OUT) # the lambda only returns true true if a pin is specified and it is pulled high (btn pressed) interrupted = lambda: interrupt.value() == 1 if interrupt else False receive_led = 26 send_led = 27 # print waiting for ping at startup (if lcd's connected) if lcd: lcd.move_to(0,1) lcd.putstr("Waiting for ping") while not interrupted(): # run as long the interrupt pin is low (btn not pressed) if to_wait_for_ping: # wait for ping print("[LoRaPingPong] Waiting for a ping (request)") while not lora.received_packet(): sleep(0.01) if interrupted(): return # if interrupt btn is pressed, stop listening # NOW got a response! blink_led(receive_led) payload = lora.read_payload() try: payload_dict = loads(payload) TX_POWER_REQ = payload_dict["tx_power"] COUNT = payload_dict["count"] RSSI_REQ = lora.packet_rssi() SNR_REQ = lora.packet_snr() except ValueError: print(f"[LoRaPingPong] Got a bad ping (request): {payload};\n[LoRaPingPong] Ignoring this one.") continue # start the next loop cycle if lcd_connected: # print some info about the received signal on the lcd (if connected) lcd.move_to(14,0) lcd.putstr("<") lcd.move_to(0,1) lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16)) lcd.move_to(0,0) lcd.putstr(f"LoRaPingPong [<]") print(f"[LoRaPingPong] Got a ping (request): {payload}") print(f"[LoRaPingPong] Now waiting some seconds to send pong! Wo-hoo!") sleep(1) # sleep some seconds to_wait_for_ping = False else: # send pong (response) COUNT += 1 msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + ', "rssi_req": ' + str(RSSI_REQ) + ', "snr_req": ' + str(SNR_REQ) + '}' if lcd_connected: lcd.move_to(14,0) lcd.putstr(">") lcd.move_to(0,1) lcd.putstr("Pong! Wo-hoo! ") sleep(0.5) lcd.move_to(0,1) lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16)) lcd.move_to(0,0) lcd.putstr(f"LoRaPingPong [>]") print(f"[LoRaPingPong] Sending pong (response): {msg}") lora.println(msg) blink_led(send_led) to_wait_for_ping = True """ This function implements a simple LoRa ping-pong showcase. It runs on two devices. Which one sends the "ping" - the first message - and which one responds ("pong") - is set by the parameter "initializer". The communication is as following: - If *initializer* equals True - 1. The function sends an ping string: {"type": "ping", "count": counter, "tx_power": tx_power_req} - 2. It waits a specific time for a response looking like this: {"type": "pong", "tx_power": tx_power_res, "rssi_req": rssi_req, "snr": snr_req} - 3. Start again with step 1 - If *initializer* equals False - 1. It waits for a request looking like this: {"type": "ping", "tx_power": tx_power_req} - 2. The function sends the answer string: {"type": "pong", "tx_power": tx_power_res, "rssi_req": rssi_req, "snr": snr_req} The rssi_req and snr_req items are there for checking the signal quality - 3. Start again with step 1 """ def pingpong(lora, initializer: bool, lcd_connected=True, interrupt_pin=None): print(f"[LoRaPingPong] Starting Ping-Pong (initializer: {initializer})") lcd = None if lcd_connected: from PCF8574 import I2C_LCD from machine import I2C _i2c = I2C(0, sda=Pin(0), scl=Pin(1), freq=400000) lcd = I2C_LCD(_i2c, 0x27, 2, 16) lcd.move_to(0,0) lcd.putstr(f"LoRaPingPong [ ]") # infinite loop running the ping at first if initializer: as_initializer(lora, lcd, lcd_connected, interrupt_pin) else: as_responder(lora, lcd, lcd_connected, interrupt_pin) # initialize LoRa device_spi = SPI(baudrate = 10000000, polarity = 0, phase = 0, bits = 8, firstbit = SPI.MSB, id=0, sck = Pin(2, Pin.OUT, Pin.PULL_DOWN), mosi = Pin(3, Pin.OUT, Pin.PULL_UP), miso = Pin(4, Pin.IN, Pin.PULL_UP)) parameters = { 'frequency': 433E6, 'tx_power_level': 10, 'signal_bandwidth': 125E3, 'spreading_factor': 7, 'coding_rate': 5, 'preamble_length': 8, 'implicit_header': False, 'sync_word': 0x12, 'enable_CRC': False, 'invert_IQ': False, } lora = SX127x(device_spi, pins={"dio_0": 6, "ss": 5, "led": 7}, parameters=parameters) # The run function (for different options; see below the "if __name__ == ..." section) def run_lcd_init(): pingpong(lora, initializer=True, lcd_connected=True, interrupt_pin=8) def run_wo_lcd_init(): pingpong(lora, initializer=True, lcd_connected=False, interrupt_pin=8) def run_lcd_resp(): pingpong(lora, initializer=False, lcd_connected=True, interrupt_pin=8) def run_wo_lcd_resp(): pingpong(lora, initializer=False, lcd_connected=False, interrupt_pin=8) if __name__ == "__main__": #run_lcd_resp() # run as responder (with lcd) #run_wo_lcd_resp() # run as responder (without lcd) run_lcd_init() # run as initializer (with lcd) #run_wo_lcd_init() # run as initializer (without lcd) print("Finished execution")