Added lora_pingpong.py

This commit is contained in:
BlueFox 2024-03-16 14:21:47 +01:00
parent e254f63f3d
commit 57ce838cf6

176
lora_pingpong.py Normal file
View File

@ -0,0 +1,176 @@
from SX127x import SX127x
from machine import Pin, SPI
from time import sleep, time_ns
from json import loads
def as_initializer(lora, lcd, lcd_connected, interrupt_pin):
ping_to_send = True
COUNT = 0
TX_POWER_REQ = -1
RSSI_REQ = -1
SNR_REQ = -1
while True:
if ping_to_send:
COUNT += 1
msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + '}'
if lcd_connected:
lcd.move_to(14,0)
lcd.putstr(">")
print(f"[LoRaPingPong] Sending ping: {msg}")
lora.println(msg)
ping_to_send = False
if not ping_to_send:
print("[LoRaPingPong] Now waiting for a response")
while not lora.received_packet():
sleep(0.01)
# TODO: implement a timeout
# NOW got a response!
payload = lora.read_payload()
try:
payload_dict = loads(payload)
TX_POWER_REQ = payload_dict["tx_power"]
COUNT = payload_dict["count"]
RSSI_REQ = payload_dict["rssi_req"]
SNR_REQ = payload_dict["snr_req"]
except ValueError:
print(f"[LoRaPingPong] Got a bad response: {payload};\n[LoRaPingPong] Ignoring this one.")
continue # start the next loop cycle
except KeyError:
print(f"[LoRaPingPong] Some keys are missing in received payload: {payload}")
print("[LoRaPingPong] Ignoring this one.")
continue # start the next loop cycle
if lcd_connected: # print some info about the received signal on the lcd (if connected)
lcd.move_to(14,0)
lcd.putstr("<")
lcd.move_to(0,1)
lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16))
lcd.move_to(0,0)
lcd.putstr(f"LoRaPingPong [<]")
print(f"[LoRaPingPong] Got a response: {payload}")
print(f"[LoRaPingPong] Now waiting some time for next ping! Wo-hoo!")
sleep(10) # sleep 10 seconds
ping_to_send = True
def as_responder(lora, lcd, lcd_connected, interrupt_pin):
to_wait_for_ping = True
COUNT = 0
TX_POWER_REQ = -1
RSSI_REQ = -1
SNR_REQ = -1
while True:
if to_wait_for_ping: # wait for ping
print("[LoRaPingPong] Waiting for a ping (request)")
while not lora.received_packet():
sleep(0.01)
# TODO: implement a timeout
# NOW got a response!
payload = lora.read_payload()
try:
payload_dict = loads(payload)
TX_POWER_REQ = payload_dict["tx_power"]
COUNT = payload_dict["count"]
RSSI_REQ = lora.packet_rssi()
SNR_REQ = lora.packet_snr()
except ValueError:
print(f"[LoRaPingPong] Got a bad ping (request): {payload};\n[LoRaPingPong] Ignoring this one.")
continue # start the next loop cycle
if lcd_connected: # print some info about the received signal on the lcd (if connected)
lcd.move_to(14,0)
lcd.putstr("<")
lcd.move_to(0,1)
lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16))
lcd.move_to(0,0)
lcd.putstr(f"LoRaPingPong [<]")
print(f"[LoRaPingPong] Got a ping (request): {payload}")
print(f"[LoRaPingPong] Now waiting some seconds to send pong! Wo-hoo!")
sleep(1) # sleep some seconds
to_wait_for_ping = False
else: # send pong (response)
COUNT += 1
msg = '{"type": "ping", "count": ' + str(COUNT) + ', "tx_power": ' + str(lora._tx_power_level) + ', "rssi_req": ' + str(RSSI_REQ) + ', "snr_req": ' + str(SNR_REQ) + '}'
if lcd_connected:
lcd.move_to(14,0)
lcd.putstr(">")
lcd.move_to(0,1)
lcd.putstr("Pong! Wo-hoo! ")
sleep(0.5)
lcd.move_to(0,1)
lcd.putstr(f"p{TX_POWER_REQ};n{COUNT};r{str(RSSI_REQ)[1:]};s{SNR_REQ}".center(16))
lcd.move_to(0,0)
lcd.putstr(f"LoRaPingPong [>]")
print(f"[LoRaPingPong] Sending pong (response): {msg}")
lora.println(msg)
to_wait_for_ping = True
"""
This function implements a simple LoRa ping-pong showcase. It runs on two
devices. Which one sends the "ping" - the first message - and which one
responds ("pong") - is set by the parameter "initializer".
The communication is as following:
- If *initializer* equals True
- 1. The function sends an ping string:
{"type": "ping", "count": counter, "tx_power": tx_power_req}
- 2. It waits a specific time for a response looking like this:
{"type": "pong", "tx_power": tx_power_res, "rssi_req": rssi_req, "snr": snr_req}
- 3. Start again with step 1
- If *initializer* equals False
- 1. It waits for a request looking like this:
{"type": "ping", "tx_power": tx_power_req}
- 2. The function sends the answer string:
{"type": "pong", "tx_power": tx_power_res, "rssi_req": rssi_req, "snr": snr_req}
The rssi_req and snr_req items are there for checking the signal quality
- 3. Start again with step 1
"""
def pingpong(lora, initializer: bool, lcd_connected=True, interrupt_pin=None):
print(f"[LoRaPingPong] Starting Ping-Pong (initializer: {initializer})")
lcd = None
if lcd_connected:
from PCF8574 import I2C_LCD
from machine import I2C
_i2c = I2C(0, sda=Pin(0), scl=Pin(1), freq=400000)
lcd = I2C_LCD(_i2c, 0x27, 2, 16)
lcd.move_to(0,0)
lcd.putstr(f"LoRaPingPong [ ]Waiting for ping")
# infinite loop running the ping at first
if initializer:
as_initializer(lora, lcd, lcd_connected, interrupt_pin)
else:
as_responder(lora, lcd, lcd_connected, interrupt_pin)
device_spi = SPI(baudrate = 10000000,
polarity = 0, phase = 0, bits = 8, firstbit = SPI.MSB, id=0,
sck = Pin(2, Pin.OUT, Pin.PULL_DOWN),
mosi = Pin(3, Pin.OUT, Pin.PULL_UP),
miso = Pin(4, Pin.IN, Pin.PULL_UP))
parameters = {
'frequency': 433E6,
'tx_power_level': 10,
'signal_bandwidth': 125E3,
'spreading_factor': 7,
'coding_rate': 5,
'preamble_length': 8,
'implicit_header': False,
'sync_word': 0x12,
'enable_CRC': False,
'invert_IQ': False,
}
def run():
lora = SX127x(device_spi, pins={"dio_0": 6, "ss": 5, "led": 27}, parameters=parameters)
pingpong(lora, initializer=False, lcd_connected=True)
if __name__ == "__main__":
run()