diff --git a/lora_pingpong.py b/lora_pingpong.py new file mode 100644 index 0000000..8a72e27 --- /dev/null +++ b/lora_pingpong.py @@ -0,0 +1,176 @@ +from SX127x import SX127x +from machine import Pin, SPI +from time import sleep, time_ns +from json import loads + + +def as_initializer(lora, lcd, lcd_connected, interrupt_pin): + ping_to_send = True + COUNT = 0 + TX_POWER_REQ = -1 + RSSI_REQ = -1 + SNR_REQ = -1 + + while True: + if ping_to_send: + COUNT += 1 + msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + '}' + if lcd_connected: + lcd.move_to(14,0) + lcd.putstr(">") + print(f"[LoRaPingPong] Sending ping: {msg}") + lora.println(msg) + ping_to_send = False + if not ping_to_send: + print("[LoRaPingPong] Now waiting for a response") + while not lora.received_packet(): + sleep(0.01) + # TODO: implement a timeout + # NOW got a response! + payload = lora.read_payload() + try: + payload_dict = loads(payload) + TX_POWER_REQ = payload_dict["tx_power"] + COUNT = payload_dict["count"] + RSSI_REQ = payload_dict["rssi_req"] + SNR_REQ = payload_dict["snr_req"] + except ValueError: + print(f"[LoRaPingPong] Got a bad response: {payload};\n[LoRaPingPong] Ignoring this one.") + continue # start the next loop cycle + except KeyError: + print(f"[LoRaPingPong] Some keys are missing in received payload: {payload}") + print("[LoRaPingPong] Ignoring this one.") + continue # start the next loop cycle + if lcd_connected: # print some info about the received signal on the lcd (if connected) + lcd.move_to(14,0) + lcd.putstr("<") + lcd.move_to(0,1) + lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16)) + lcd.move_to(0,0) + lcd.putstr(f"LoRaPingPong [<]") + print(f"[LoRaPingPong] Got a response: {payload}") + print(f"[LoRaPingPong] Now waiting some time for next ping! Wo-hoo!") + sleep(10) # sleep 10 seconds + ping_to_send = True + + +def as_responder(lora, lcd, lcd_connected, interrupt_pin): + to_wait_for_ping = True + COUNT = 0 + TX_POWER_REQ = -1 + RSSI_REQ = -1 + SNR_REQ = -1 + + while True: + if to_wait_for_ping: # wait for ping + print("[LoRaPingPong] Waiting for a ping (request)") + while not lora.received_packet(): + sleep(0.01) + # TODO: implement a timeout + # NOW got a response! + payload = lora.read_payload() + try: + payload_dict = loads(payload) + TX_POWER_REQ = payload_dict["tx_power"] + COUNT = payload_dict["count"] + RSSI_REQ = lora.packet_rssi() + SNR_REQ = lora.packet_snr() + except ValueError: + print(f"[LoRaPingPong] Got a bad ping (request): {payload};\n[LoRaPingPong] Ignoring this one.") + continue # start the next loop cycle + if lcd_connected: # print some info about the received signal on the lcd (if connected) + lcd.move_to(14,0) + lcd.putstr("<") + lcd.move_to(0,1) + lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16)) + lcd.move_to(0,0) + lcd.putstr(f"LoRaPingPong [<]") + print(f"[LoRaPingPong] Got a ping (request): {payload}") + print(f"[LoRaPingPong] Now waiting some seconds to send pong! Wo-hoo!") + sleep(1) # sleep some seconds + to_wait_for_ping = False + else: # send pong (response) + COUNT += 1 + msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + ', "rssi_req": ' + str(RSSI_REQ) + ', "snr_req": ' + str(SNR_REQ) + '}' + if lcd_connected: + lcd.move_to(14,0) + lcd.putstr(">") + lcd.move_to(0,1) + lcd.putstr("Pong! Wo-hoo! ") + sleep(0.5) + lcd.move_to(0,1) + lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16)) + lcd.move_to(0,0) + lcd.putstr(f"LoRaPingPong [>]") + print(f"[LoRaPingPong] Sending pong (response): {msg}") + lora.println(msg) + to_wait_for_ping = True + + +""" +This function implements a simple LoRa ping-pong showcase. It runs on two +devices. Which one sends the "ping" - the first message - and which one +responds ("pong") - is set by the parameter "initializer". +The communication is as following: +- If *initializer* equals True + - 1. The function sends an ping string: + {"type": "ping", "count": counter, "tx_power": tx_power_req} + - 2. It waits a specific time for a response looking like this: + {"type": "pong", "tx_power": tx_power_res, "rssi_req": rssi_req, "snr": snr_req} + - 3. Start again with step 1 +- If *initializer* equals False + - 1. It waits for a request looking like this: + {"type": "ping", "tx_power": tx_power_req} + - 2. The function sends the answer string: + {"type": "pong", "tx_power": tx_power_res, "rssi_req": rssi_req, "snr": snr_req} + The rssi_req and snr_req items are there for checking the signal quality + - 3. Start again with step 1 +""" +def pingpong(lora, initializer: bool, lcd_connected=True, interrupt_pin=None): + print(f"[LoRaPingPong] Starting Ping-Pong (initializer: {initializer})") + + lcd = None + + if lcd_connected: + from PCF8574 import I2C_LCD + from machine import I2C + + _i2c = I2C(0, sda=Pin(0), scl=Pin(1), freq=400000) + lcd = I2C_LCD(_i2c, 0x27, 2, 16) + + lcd.move_to(0,0) + lcd.putstr(f"LoRaPingPong [ ]Waiting for ping") + + # infinite loop running the ping at first + if initializer: + as_initializer(lora, lcd, lcd_connected, interrupt_pin) + else: + as_responder(lora, lcd, lcd_connected, interrupt_pin) + + +device_spi = SPI(baudrate = 10000000, + polarity = 0, phase = 0, bits = 8, firstbit = SPI.MSB, id=0, + sck = Pin(2, Pin.OUT, Pin.PULL_DOWN), + mosi = Pin(3, Pin.OUT, Pin.PULL_UP), + miso = Pin(4, Pin.IN, Pin.PULL_UP)) + + +parameters = { + 'frequency': 433E6, + 'tx_power_level': 10, + 'signal_bandwidth': 125E3, + 'spreading_factor': 7, + 'coding_rate': 5, + 'preamble_length': 8, + 'implicit_header': False, + 'sync_word': 0x12, + 'enable_CRC': False, + 'invert_IQ': False, + } + +def run(): + lora = SX127x(device_spi, pins={"dio_0": 6, "ss": 5, "led": 27}, parameters=parameters) + pingpong(lora, initializer=False, lcd_connected=True) + +if __name__ == "__main__": + run()