2024-03-16 13:47:33 +00:00
|
|
|
"""
|
|
|
|
LoRa-Training / lora_pingpong - A simple lora pingpong program for testing reasons
|
|
|
|
Copyright (C) 2024 Benjamin Burkhardt
|
|
|
|
|
|
|
|
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
|
|
|
|
|
|
|
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
|
|
|
|
|
|
|
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
2024-03-16 13:21:47 +00:00
|
|
|
from SX127x import SX127x
|
|
|
|
from machine import Pin, SPI
|
2024-03-30 17:10:20 +00:00
|
|
|
from time import sleep, time
|
2024-03-16 13:21:47 +00:00
|
|
|
from json import loads
|
|
|
|
|
2024-03-30 17:10:20 +00:00
|
|
|
def blink_led(pin, sleep_time=0.2):
|
|
|
|
pin = Pin(pin, Pin.OUT)
|
|
|
|
|
|
|
|
pin.value(1)
|
|
|
|
sleep(sleep_time)
|
|
|
|
pin.value(0)
|
2024-03-16 13:21:47 +00:00
|
|
|
|
|
|
|
def as_initializer(lora, lcd, lcd_connected, interrupt_pin):
|
|
|
|
ping_to_send = True
|
|
|
|
COUNT = 0
|
|
|
|
TX_POWER_REQ = -1
|
|
|
|
RSSI_REQ = -1
|
|
|
|
SNR_REQ = -1
|
|
|
|
|
2024-03-30 17:10:20 +00:00
|
|
|
# pin definitions
|
|
|
|
interrupt = None
|
|
|
|
if interrupt_pin:
|
|
|
|
interrupt = Pin(interrupt_pin, Pin.OUT)
|
|
|
|
# the lambda only returns true true if a pin is specified and it is pulled high (btn pressed)
|
|
|
|
interrupted = lambda: interrupt.value() == 1 if interrupt else False
|
|
|
|
receive_led = 26
|
|
|
|
send_led = 27
|
|
|
|
|
|
|
|
while not interrupted():
|
2024-03-16 13:21:47 +00:00
|
|
|
if ping_to_send:
|
|
|
|
COUNT += 1
|
|
|
|
msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + '}'
|
|
|
|
if lcd_connected:
|
|
|
|
lcd.move_to(14,0)
|
|
|
|
lcd.putstr(">")
|
2024-03-30 17:10:20 +00:00
|
|
|
lcd.move_to(0,1)
|
|
|
|
lcd.putstr("Waiting for pong")
|
2024-03-16 13:21:47 +00:00
|
|
|
print(f"[LoRaPingPong] Sending ping: {msg}")
|
|
|
|
lora.println(msg)
|
2024-03-30 17:10:20 +00:00
|
|
|
blink_led(send_led)
|
2024-03-16 13:21:47 +00:00
|
|
|
ping_to_send = False
|
|
|
|
if not ping_to_send:
|
|
|
|
print("[LoRaPingPong] Now waiting for a response")
|
2024-03-30 17:25:47 +00:00
|
|
|
time_now = time()
|
2024-03-16 13:21:47 +00:00
|
|
|
while not lora.received_packet():
|
|
|
|
sleep(0.01)
|
2024-03-30 17:10:20 +00:00
|
|
|
if interrupted(): return # if interrupt btn is pressed, stop listening
|
2024-03-30 17:25:47 +00:00
|
|
|
if (time() - time_now) > 15:
|
|
|
|
# timeout if waiting too long for a response
|
|
|
|
# possible causes of triggering this could be e.g. lost connection
|
|
|
|
print("[LoRaPingPong] Timeout reached. Sending ping again.")
|
|
|
|
if lcd:
|
|
|
|
lcd.move_to(0,1)
|
|
|
|
lcd.putstr("Reached timeout!")
|
|
|
|
break
|
|
|
|
|
2024-03-16 13:21:47 +00:00
|
|
|
payload = lora.read_payload()
|
2024-03-30 17:25:47 +00:00
|
|
|
|
|
|
|
if not payload: # if empty because of timeout
|
|
|
|
ping_to_send = True
|
|
|
|
continue
|
|
|
|
|
2024-03-30 17:31:28 +00:00
|
|
|
# NOW got a response!
|
|
|
|
blink_led(receive_led)
|
|
|
|
|
2024-03-16 13:21:47 +00:00
|
|
|
try:
|
|
|
|
payload_dict = loads(payload)
|
|
|
|
TX_POWER_REQ = payload_dict["tx_power"]
|
|
|
|
COUNT = payload_dict["count"]
|
|
|
|
RSSI_REQ = payload_dict["rssi_req"]
|
|
|
|
SNR_REQ = payload_dict["snr_req"]
|
|
|
|
except ValueError:
|
|
|
|
print(f"[LoRaPingPong] Got a bad response: {payload};\n[LoRaPingPong] Ignoring this one.")
|
|
|
|
continue # start the next loop cycle
|
|
|
|
except KeyError:
|
|
|
|
print(f"[LoRaPingPong] Some keys are missing in received payload: {payload}")
|
|
|
|
print("[LoRaPingPong] Ignoring this one.")
|
|
|
|
continue # start the next loop cycle
|
|
|
|
if lcd_connected: # print some info about the received signal on the lcd (if connected)
|
|
|
|
lcd.move_to(14,0)
|
|
|
|
lcd.putstr("<")
|
|
|
|
lcd.move_to(0,1)
|
|
|
|
lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16))
|
|
|
|
lcd.move_to(0,0)
|
|
|
|
lcd.putstr(f"LoRaPingPong [<]")
|
|
|
|
print(f"[LoRaPingPong] Got a response: {payload}")
|
|
|
|
print(f"[LoRaPingPong] Now waiting some time for next ping! Wo-hoo!")
|
2024-03-30 17:10:20 +00:00
|
|
|
# sleep 10 seconds
|
|
|
|
time_now = time()
|
|
|
|
while (time() - time_now) < 10:
|
|
|
|
if interrupted(): return # end ping pong when btn at interrupt_pin is pressed (given as param)
|
2024-03-16 13:21:47 +00:00
|
|
|
ping_to_send = True
|
|
|
|
|
|
|
|
|
|
|
|
def as_responder(lora, lcd, lcd_connected, interrupt_pin):
|
|
|
|
to_wait_for_ping = True
|
|
|
|
COUNT = 0
|
|
|
|
TX_POWER_REQ = -1
|
|
|
|
RSSI_REQ = -1
|
|
|
|
SNR_REQ = -1
|
|
|
|
|
2024-03-30 17:10:20 +00:00
|
|
|
# pin definitions
|
|
|
|
interrupt = None
|
|
|
|
if interrupt_pin:
|
|
|
|
interrupt = Pin(interrupt_pin, Pin.OUT)
|
|
|
|
# the lambda only returns true true if a pin is specified and it is pulled high (btn pressed)
|
|
|
|
interrupted = lambda: interrupt.value() == 1 if interrupt else False
|
|
|
|
receive_led = 26
|
|
|
|
send_led = 27
|
|
|
|
|
|
|
|
# print waiting for ping at startup (if lcd's connected)
|
|
|
|
if lcd:
|
|
|
|
lcd.move_to(0,1)
|
|
|
|
lcd.putstr("Waiting for ping")
|
|
|
|
|
|
|
|
while not interrupted(): # run as long the interrupt pin is low (btn not pressed)
|
2024-03-16 13:21:47 +00:00
|
|
|
if to_wait_for_ping: # wait for ping
|
|
|
|
print("[LoRaPingPong] Waiting for a ping (request)")
|
|
|
|
while not lora.received_packet():
|
|
|
|
sleep(0.01)
|
2024-03-30 17:10:20 +00:00
|
|
|
if interrupted(): return # if interrupt btn is pressed, stop listening
|
2024-03-16 13:21:47 +00:00
|
|
|
# NOW got a response!
|
2024-03-30 17:10:20 +00:00
|
|
|
blink_led(receive_led)
|
2024-03-16 13:21:47 +00:00
|
|
|
payload = lora.read_payload()
|
|
|
|
try:
|
|
|
|
payload_dict = loads(payload)
|
|
|
|
TX_POWER_REQ = payload_dict["tx_power"]
|
|
|
|
COUNT = payload_dict["count"]
|
|
|
|
RSSI_REQ = lora.packet_rssi()
|
|
|
|
SNR_REQ = lora.packet_snr()
|
|
|
|
except ValueError:
|
|
|
|
print(f"[LoRaPingPong] Got a bad ping (request): {payload};\n[LoRaPingPong] Ignoring this one.")
|
|
|
|
continue # start the next loop cycle
|
|
|
|
if lcd_connected: # print some info about the received signal on the lcd (if connected)
|
|
|
|
lcd.move_to(14,0)
|
|
|
|
lcd.putstr("<")
|
|
|
|
lcd.move_to(0,1)
|
|
|
|
lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16))
|
|
|
|
lcd.move_to(0,0)
|
|
|
|
lcd.putstr(f"LoRaPingPong [<]")
|
|
|
|
print(f"[LoRaPingPong] Got a ping (request): {payload}")
|
|
|
|
print(f"[LoRaPingPong] Now waiting some seconds to send pong! Wo-hoo!")
|
|
|
|
sleep(1) # sleep some seconds
|
|
|
|
to_wait_for_ping = False
|
|
|
|
else: # send pong (response)
|
|
|
|
COUNT += 1
|
|
|
|
msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + ', "rssi_req": ' + str(RSSI_REQ) + ', "snr_req": ' + str(SNR_REQ) + '}'
|
|
|
|
if lcd_connected:
|
|
|
|
lcd.move_to(14,0)
|
|
|
|
lcd.putstr(">")
|
|
|
|
lcd.move_to(0,1)
|
|
|
|
lcd.putstr("Pong! Wo-hoo! ")
|
|
|
|
sleep(0.5)
|
|
|
|
lcd.move_to(0,1)
|
|
|
|
lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16))
|
|
|
|
lcd.move_to(0,0)
|
|
|
|
lcd.putstr(f"LoRaPingPong [>]")
|
|
|
|
print(f"[LoRaPingPong] Sending pong (response): {msg}")
|
|
|
|
lora.println(msg)
|
2024-03-30 17:10:20 +00:00
|
|
|
blink_led(send_led)
|
2024-03-16 13:21:47 +00:00
|
|
|
to_wait_for_ping = True
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
This function implements a simple LoRa ping-pong showcase. It runs on two
|
|
|
|
devices. Which one sends the "ping" - the first message - and which one
|
|
|
|
responds ("pong") - is set by the parameter "initializer".
|
|
|
|
The communication is as following:
|
|
|
|
- If *initializer* equals True
|
|
|
|
- 1. The function sends an ping string:
|
|
|
|
{"type": "ping", "count": counter, "tx_power": tx_power_req}
|
|
|
|
- 2. It waits a specific time for a response looking like this:
|
|
|
|
{"type": "pong", "tx_power": tx_power_res, "rssi_req": rssi_req, "snr": snr_req}
|
|
|
|
- 3. Start again with step 1
|
|
|
|
- If *initializer* equals False
|
|
|
|
- 1. It waits for a request looking like this:
|
|
|
|
{"type": "ping", "tx_power": tx_power_req}
|
|
|
|
- 2. The function sends the answer string:
|
|
|
|
{"type": "pong", "tx_power": tx_power_res, "rssi_req": rssi_req, "snr": snr_req}
|
|
|
|
The rssi_req and snr_req items are there for checking the signal quality
|
|
|
|
- 3. Start again with step 1
|
|
|
|
"""
|
|
|
|
def pingpong(lora, initializer: bool, lcd_connected=True, interrupt_pin=None):
|
|
|
|
print(f"[LoRaPingPong] Starting Ping-Pong (initializer: {initializer})")
|
|
|
|
|
|
|
|
lcd = None
|
|
|
|
|
|
|
|
if lcd_connected:
|
|
|
|
from PCF8574 import I2C_LCD
|
|
|
|
from machine import I2C
|
|
|
|
|
|
|
|
_i2c = I2C(0, sda=Pin(0), scl=Pin(1), freq=400000)
|
|
|
|
lcd = I2C_LCD(_i2c, 0x27, 2, 16)
|
|
|
|
|
|
|
|
lcd.move_to(0,0)
|
2024-03-30 17:10:20 +00:00
|
|
|
lcd.putstr(f"LoRaPingPong [ ]")
|
2024-03-16 13:21:47 +00:00
|
|
|
|
|
|
|
# infinite loop running the ping at first
|
|
|
|
if initializer:
|
|
|
|
as_initializer(lora, lcd, lcd_connected, interrupt_pin)
|
|
|
|
else:
|
|
|
|
as_responder(lora, lcd, lcd_connected, interrupt_pin)
|
|
|
|
|
|
|
|
|
2024-03-16 13:37:21 +00:00
|
|
|
# initialize LoRa
|
|
|
|
|
2024-03-16 13:21:47 +00:00
|
|
|
device_spi = SPI(baudrate = 10000000,
|
|
|
|
polarity = 0, phase = 0, bits = 8, firstbit = SPI.MSB, id=0,
|
|
|
|
sck = Pin(2, Pin.OUT, Pin.PULL_DOWN),
|
|
|
|
mosi = Pin(3, Pin.OUT, Pin.PULL_UP),
|
|
|
|
miso = Pin(4, Pin.IN, Pin.PULL_UP))
|
|
|
|
|
|
|
|
|
|
|
|
parameters = {
|
|
|
|
'frequency': 433E6,
|
|
|
|
'tx_power_level': 10,
|
|
|
|
'signal_bandwidth': 125E3,
|
|
|
|
'spreading_factor': 7,
|
|
|
|
'coding_rate': 5,
|
|
|
|
'preamble_length': 8,
|
|
|
|
'implicit_header': False,
|
|
|
|
'sync_word': 0x12,
|
|
|
|
'enable_CRC': False,
|
|
|
|
'invert_IQ': False,
|
|
|
|
}
|
|
|
|
|
2024-03-30 17:10:20 +00:00
|
|
|
lora = SX127x(device_spi, pins={"dio_0": 6, "ss": 5, "led": 7}, parameters=parameters)
|
2024-03-16 13:37:21 +00:00
|
|
|
|
|
|
|
|
|
|
|
# The run function (for different options; see below the "if __name__ == ..." section)
|
|
|
|
|
|
|
|
def run_lcd_init():
|
2024-03-30 17:10:20 +00:00
|
|
|
pingpong(lora, initializer=True, lcd_connected=True, interrupt_pin=8)
|
2024-03-16 13:37:21 +00:00
|
|
|
def run_wo_lcd_init():
|
2024-03-30 17:10:20 +00:00
|
|
|
pingpong(lora, initializer=True, lcd_connected=False, interrupt_pin=8)
|
2024-03-16 13:37:21 +00:00
|
|
|
def run_lcd_resp():
|
2024-03-30 17:10:20 +00:00
|
|
|
pingpong(lora, initializer=False, lcd_connected=True, interrupt_pin=8)
|
2024-03-16 13:37:21 +00:00
|
|
|
def run_wo_lcd_resp():
|
2024-03-30 17:10:20 +00:00
|
|
|
pingpong(lora, initializer=False, lcd_connected=False, interrupt_pin=8)
|
2024-03-16 13:21:47 +00:00
|
|
|
|
|
|
|
if __name__ == "__main__":
|
2024-03-16 13:37:21 +00:00
|
|
|
#run_lcd_resp() # run as responder (with lcd)
|
|
|
|
#run_wo_lcd_resp() # run as responder (without lcd)
|
2024-03-30 17:10:20 +00:00
|
|
|
run_lcd_init() # run as initializer (with lcd)
|
|
|
|
#run_wo_lcd_init() # run as initializer (without lcd)
|
|
|
|
|
|
|
|
print("Finished execution")
|