500 lines
15 KiB
Python
500 lines
15 KiB
Python
|
"""
Source: https://github.com/lemariva/uPyLoRaWAN/blob/master/sx127x.py
License: Apache-2.0
"""
|
||
|
|
||
|
|
||
|
from time import sleep
|
||
|
from machine import SPI, Pin
|
||
|
import gc
|
||
|
|
||
|
PA_OUTPUT_RFO_PIN = 0
PA_OUTPUT_PA_BOOST_PIN = 1

# registers
REG_FIFO = 0x00
REG_OP_MODE = 0x01
REG_FRF_MSB = 0x06
REG_FRF_MID = 0x07
REG_FRF_LSB = 0x08
REG_PA_CONFIG = 0x09
REG_LNA = 0x0c
REG_FIFO_ADDR_PTR = 0x0d

REG_FIFO_TX_BASE_ADDR = 0x0e
# Value written to REG_FIFO_TX_BASE_ADDR (the original author also left 0x80
# as a commented-out alternative).
FifoTxBaseAddr = 0x00

REG_FIFO_RX_BASE_ADDR = 0x0f
# Value written to REG_FIFO_RX_BASE_ADDR.
FifoRxBaseAddr = 0x00
REG_FIFO_RX_CURRENT_ADDR = 0x10
REG_IRQ_FLAGS_MASK = 0x11
REG_IRQ_FLAGS = 0x12
REG_RX_NB_BYTES = 0x13
REG_PKT_RSSI_VALUE = 0x1a
REG_PKT_SNR_VALUE = 0x1b
REG_MODEM_CONFIG_1 = 0x1d
REG_MODEM_CONFIG_2 = 0x1e
REG_PREAMBLE_MSB = 0x20
REG_PREAMBLE_LSB = 0x21
REG_PAYLOAD_LENGTH = 0x22
REG_FIFO_RX_BYTE_ADDR = 0x25
REG_MODEM_CONFIG_3 = 0x26
REG_RSSI_WIDEBAND = 0x2c
REG_DETECTION_OPTIMIZE = 0x31
REG_DETECTION_THRESHOLD = 0x37
REG_SYNC_WORD = 0x39
REG_DIO_MAPPING_1 = 0x40
REG_VERSION = 0x42

# invert IQ: the *_MASK values are AND-ed to clear a field, the *_ON / *_OFF
# values are then OR-ed in (see SX127x.invert_IQ).
REG_INVERTIQ = 0x33
RFLR_INVERTIQ_RX_MASK = 0xBF
RFLR_INVERTIQ_RX_OFF = 0x00
RFLR_INVERTIQ_RX_ON = 0x40
RFLR_INVERTIQ_TX_MASK = 0xFE
RFLR_INVERTIQ_TX_OFF = 0x01
RFLR_INVERTIQ_TX_ON = 0x00

REG_INVERTIQ2 = 0x3B
RFLR_INVERTIQ2_ON = 0x19
RFLR_INVERTIQ2_OFF = 0x1D

# modes
MODE_LONG_RANGE_MODE = 0x80  # bit 7: 1 => LoRa mode
MODE_SLEEP = 0x00
MODE_STDBY = 0x01
MODE_TX = 0x03
MODE_RX_CONTINUOUS = 0x05
MODE_RX_SINGLE = 0x06

# PA config
PA_BOOST = 0x80

# IRQ masks
IRQ_TX_DONE_MASK = 0x08
IRQ_PAYLOAD_CRC_ERROR_MASK = 0x20
IRQ_RX_DONE_MASK = 0x40
IRQ_RX_TIME_OUT_MASK = 0x80

# Buffer size
MAX_PKT_LENGTH = 255

# When true, the driver prints the chip version and memory statistics.
__DEBUG__ = True


class SX127x:
    """Driver for an SX127x radio operated in LoRa mode over SPI.

    The constructor verifies the chip version, applies the radio
    configuration and leaves the chip in standby. Packets are sent with
    ``println`` (or ``begin_packet`` / ``write`` / ``end_packet``) and
    received either by polling ``received_packet`` or through a callback
    registered with ``on_receive``.
    """

    default_parameters = {
        'frequency': 868E6,
        'tx_power_level': 2,
        'signal_bandwidth': 125E3,
        'spreading_factor': 8,
        'coding_rate': 5,
        'preamble_length': 8,
        'implicit_header': False,
        'sync_word': 0x12,
        'enable_CRC': False,
        'invert_IQ': False,
    }

    # Bandwidths in Hz; the tuple index is the code written to the upper
    # nibble of REG_MODEM_CONFIG_1. Code 9 is also the fallback for any
    # request above the largest entry.
    _BANDWIDTH_BINS = (7.8E3, 10.4E3, 15.6E3, 20.8E3, 31.25E3,
                       41.7E3, 62.5E3, 125E3, 250E3, 500E3)

    def __init__(self,
                 spi,
                 pins,
                 parameters=default_parameters):
        """Configure the radio and leave it in standby.

        Args:
            spi: SPI bus object providing ``write`` and ``write_readinto``.
            pins: dict of pin ids; the optional keys ``"dio_0"``, ``"ss"``
                and ``"led"`` are looked up.
            parameters: dict overriding entries of ``default_parameters``;
                missing keys fall back to the defaults.

        Raises:
            Exception: if the version register does not read 0x12.
        """
        self._spi = spi
        self._pins = pins

        # Work on a private copy so that neither the shared class-level
        # defaults nor the caller's dict are mutated later (invert_IQ writes
        # into this dict), and so that partial dicts are accepted.
        merged = dict(self.default_parameters)
        if parameters:
            merged.update(parameters)
        self._parameters = merged

        self._lock = False
        self._on_receive = None
        self._implicit_header_mode = None

        # setting pins; every attribute always exists so that later checks
        # cannot raise AttributeError when a pin was not supplied.
        self._pin_rx_done = None
        self._pin_ss = None
        self._led_status = None
        if "dio_0" in self._pins:
            self._pin_rx_done = Pin(self._pins["dio_0"], Pin.IN)
        if "ss" in self._pins:
            self._pin_ss = Pin(self._pins["ss"], Pin.OUT)
        if "led" in self._pins:
            self._led_status = Pin(self._pins["led"], Pin.OUT)

        # check hardware version: retry up to 5 times while the register
        # still reads 0.
        version = 0
        for _ in range(5):
            version = self.read_register(REG_VERSION)
            if version != 0:
                break
        if version != 0x12:
            raise Exception('Invalid version.')

        if __DEBUG__:
            print("SX version: {}".format(version))

        # put in LoRa and sleep mode
        self.sleep()

        # config
        self.set_frequency(self._parameters['frequency'])
        self.set_signal_bandwidth(self._parameters['signal_bandwidth'])

        # set LNA boost
        self.write_register(REG_LNA, self.read_register(REG_LNA) | 0x03)

        # set auto AGC
        self.write_register(REG_MODEM_CONFIG_3, 0x04)

        self.set_tx_power(self._parameters['tx_power_level'])
        self.implicit_header_mode(self._parameters['implicit_header'])
        self.set_spreading_factor(self._parameters['spreading_factor'])
        self.set_coding_rate(self._parameters['coding_rate'])
        self.set_preamble_length(self._parameters['preamble_length'])
        self.set_sync_word(self._parameters['sync_word'])
        self.enable_CRC(self._parameters['enable_CRC'])
        self.invert_IQ(self._parameters["invert_IQ"])

        # set LowDataRateOptimize flag if symbol time > 16ms
        # (default disable on reset)
        bw_hz = self._parameters["signal_bandwidth"]
        if bw_hz < 10:
            # A raw bandwidth code was given; convert it to Hz first.
            bw_hz = self._BANDWIDTH_BINS[self._bandwidth_code(bw_hz)]
        # Use the same clamping as set_spreading_factor().
        sf = min(max(self._parameters["spreading_factor"], 6), 12)
        symbol_time_ms = 1000 * (2 ** sf) / bw_hz
        if symbol_time_ms > 16:
            self.write_register(
                REG_MODEM_CONFIG_3,
                self.read_register(REG_MODEM_CONFIG_3) | 0x08
            )

        # set base addresses
        self.write_register(REG_FIFO_TX_BASE_ADDR, FifoTxBaseAddr)
        self.write_register(REG_FIFO_RX_BASE_ADDR, FifoRxBaseAddr)

        self.standby()

    def begin_packet(self, implicit_header_mode=False):
        """Enter standby and reset the FIFO pointer and payload length."""
        self.standby()
        self.implicit_header_mode(implicit_header_mode)

        # reset FIFO address and payload length
        self.write_register(REG_FIFO_ADDR_PTR, FifoTxBaseAddr)
        self.write_register(REG_PAYLOAD_LENGTH, 0)

    def end_packet(self):
        """Switch to TX mode and block until the TX-done flag is raised."""
        # put in TX mode
        self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_TX)

        # wait for TX done, standby automatically on TX_DONE
        # NOTE(review): this busy-wait has no timeout and will spin forever
        # if the flag is never raised.
        while (self.read_register(REG_IRQ_FLAGS) & IRQ_TX_DONE_MASK) == 0:
            pass

        # clear IRQ's
        self.write_register(REG_IRQ_FLAGS, IRQ_TX_DONE_MASK)

        self.collect_garbage()

    def write(self, buffer):
        """Append bytes from ``buffer`` to the FIFO.

        Only as many bytes as still fit in a MAX_PKT_LENGTH packet are
        written.

        Returns:
            The number of bytes actually written.
        """
        current_length = self.read_register(REG_PAYLOAD_LENGTH)

        # check size
        room = MAX_PKT_LENGTH - FifoTxBaseAddr - current_length
        size = min(len(buffer), room)

        # write data
        for i in range(size):
            self.write_register(REG_FIFO, buffer[i])

        # update length
        self.write_register(REG_PAYLOAD_LENGTH, current_length + size)
        return size

    def set_lock(self, lock=False):
        """Set the ``_lock`` flag (nothing in this class reads it back)."""
        self._lock = lock

    def println(self, msg, implicit_header=False):
        """Transmit ``msg`` as one packet and block until it has been sent.

        Args:
            msg: ``str`` (encoded with ``str.encode()``) or a bytes-like
                object, which is sent unchanged.
            implicit_header: passed on to ``begin_packet``.
        """
        # Bytes-like input is sent as-is; previously it left ``message``
        # unbound and raised.
        message = msg.encode() if isinstance(msg, str) else msg

        self.set_lock(True)  # lock and begin writing.
        try:
            self.begin_packet(implicit_header)
            self.write(message)
            self.end_packet()
        finally:
            self.set_lock(False)  # unlock even if the transfer failed

        self.collect_garbage()

    def get_irq_flags(self):
        """Read REG_IRQ_FLAGS, write the value back, and return it."""
        irq_flags = self.read_register(REG_IRQ_FLAGS)
        self.write_register(REG_IRQ_FLAGS, irq_flags)
        return irq_flags

    def packet_rssi(self):
        """Return the packet RSSI register value minus the band offset."""
        rssi = self.read_register(REG_PKT_RSSI_VALUE)
        # Offset 164 applies to the low-frequency port (below 525 MHz),
        # 157 to the high-frequency port. The former 868 MHz threshold
        # applied the LF offset to HF channels such as 867.1 MHz.
        return rssi - (164 if self._frequency < 525E6 else 157)

    def packet_snr(self):
        """Return the packet SNR: the signed register value times 0.25."""
        # The register is two's complement, so it must be read as signed.
        snr = self.read_register(REG_PKT_SNR_VALUE, signed=True)
        return snr * 0.25

    def standby(self):
        """Put the chip in LoRa standby mode."""
        self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_STDBY)

    def sleep(self):
        """Put the chip in LoRa sleep mode."""
        self.write_register(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_SLEEP)

    def set_tx_power(self, level, outputPin=PA_OUTPUT_PA_BOOST_PIN):
        """Set the transmit power level.

        ``level`` is clamped to 0..14 for ``PA_OUTPUT_RFO_PIN`` and to 2..17
        for any other ``outputPin`` value (PA boost).
        """
        self._tx_power_level = level

        if outputPin == PA_OUTPUT_RFO_PIN:
            # RFO
            level = min(max(level, 0), 14)
            self.write_register(REG_PA_CONFIG, 0x70 | level)
        else:
            # PA BOOST
            level = min(max(level, 2), 17)
            self.write_register(REG_PA_CONFIG, PA_BOOST | (level - 2))

    def set_frequency(self, frequency):
        """Program the carrier frequency (Hz) into the three FRF registers."""
        self._frequency = frequency

        # Integer division keeps the result exact even where floats are
        # single precision.
        freq_reg = ((int(frequency) << 19) // 32000000) & 0xFFFFFF

        self.write_register(REG_FRF_MSB, (freq_reg & 0xFF0000) >> 16)
        self.write_register(REG_FRF_MID, (freq_reg & 0xFF00) >> 8)
        self.write_register(REG_FRF_LSB, (freq_reg & 0xFF))

    def set_spreading_factor(self, sf):
        """Set the spreading factor, clamped to 6..12."""
        sf = min(max(sf, 6), 12)
        # SF6 uses different detection settings than SF7..12.
        self.write_register(REG_DETECTION_OPTIMIZE, 0xc5 if sf == 6 else 0xc3)
        self.write_register(REG_DETECTION_THRESHOLD, 0x0c if sf == 6 else 0x0a)
        self.write_register(
            REG_MODEM_CONFIG_2,
            (self.read_register(REG_MODEM_CONFIG_2) & 0x0f) | ((sf << 4) & 0xf0)
        )

    def _bandwidth_code(self, sbw):
        """Map a bandwidth in Hz, or a raw code below 10, to a code 0..9."""
        if sbw < 10:
            return min(max(int(sbw), 0), 9)
        for code, limit in enumerate(self._BANDWIDTH_BINS):
            if sbw <= limit:
                return code
        return 9

    def set_signal_bandwidth(self, sbw):
        """Set the signal bandwidth.

        ``sbw`` is either a bandwidth in Hz (rounded up to the next supported
        value) or, when below 10, the raw bandwidth code.
        """
        bw = self._bandwidth_code(sbw)
        self.write_register(
            REG_MODEM_CONFIG_1,
            (self.read_register(REG_MODEM_CONFIG_1) & 0x0f) | (bw << 4)
        )

    def set_coding_rate(self, denominator):
        """Set the coding rate from its denominator, clamped to 5..8."""
        denominator = min(max(denominator, 5), 8)
        cr = denominator - 4
        self.write_register(
            REG_MODEM_CONFIG_1,
            (self.read_register(REG_MODEM_CONFIG_1) & 0xf1) | (cr << 1)
        )

    def set_preamble_length(self, length):
        """Write the 16-bit preamble length to the MSB/LSB registers."""
        self.write_register(REG_PREAMBLE_MSB, (length >> 8) & 0xff)
        self.write_register(REG_PREAMBLE_LSB, (length >> 0) & 0xff)

    def enable_CRC(self, enable_CRC=False):
        """Set or clear bit 2 (0x04) of REG_MODEM_CONFIG_2."""
        modem_config_2 = self.read_register(REG_MODEM_CONFIG_2)
        if enable_CRC:
            config = modem_config_2 | 0x04
        else:
            config = modem_config_2 & 0xfb
        self.write_register(REG_MODEM_CONFIG_2, config)

    def invert_IQ(self, invert_IQ):
        """Enable or disable IQ inversion and record the choice."""
        # Same key as in default_parameters (was misspelled "invertIQ").
        self._parameters["invert_IQ"] = invert_IQ

        # Clear both the RX and TX inversion fields before setting them.
        cleared = (
            self.read_register(REG_INVERTIQ)
            & RFLR_INVERTIQ_TX_MASK
            & RFLR_INVERTIQ_RX_MASK
        )
        if invert_IQ:
            self.write_register(
                REG_INVERTIQ,
                cleared | RFLR_INVERTIQ_RX_ON | RFLR_INVERTIQ_TX_ON,
            )
            self.write_register(REG_INVERTIQ2, RFLR_INVERTIQ2_ON)
        else:
            self.write_register(
                REG_INVERTIQ,
                cleared | RFLR_INVERTIQ_RX_OFF | RFLR_INVERTIQ_TX_OFF,
            )
            self.write_register(REG_INVERTIQ2, RFLR_INVERTIQ2_OFF)

    def set_sync_word(self, sw):
        """Write the sync word register."""
        self.write_register(REG_SYNC_WORD, sw)

    def set_channel(self, parameters):
        """Enter standby and apply the channel settings in ``parameters``.

        Only the keys ``"frequency"``, ``"invert_IQ"`` and
        ``"tx_power_level"`` are applied; other keys are ignored.
        """
        self.standby()
        setters = {
            "frequency": self.set_frequency,
            "invert_IQ": self.invert_IQ,
            "tx_power_level": self.set_tx_power,
        }
        for key in parameters:
            setter = setters.get(key)
            if setter is not None:
                setter(parameters[key])

    def dump_registers(self):
        """Print registers 0..127, four per line."""
        for i in range(128):
            print("0x{:02X}: {:02X}".format(i, self.read_register(i)), end="")
            if (i + 1) % 4 == 0:
                print()
            else:
                print(" | ", end="")

    def implicit_header_mode(self, implicit_header_mode=False):
        """Select implicit (True) or explicit (False) header mode."""
        # set value only if different.
        if self._implicit_header_mode != implicit_header_mode:
            self._implicit_header_mode = implicit_header_mode
            modem_config_1 = self.read_register(REG_MODEM_CONFIG_1)
            if implicit_header_mode:
                config = modem_config_1 | 0x01
            else:
                config = modem_config_1 & 0xfe
            self.write_register(REG_MODEM_CONFIG_1, config)

    def receive(self, size=0):
        """Enter continuous receive mode.

        A ``size`` above 0 selects implicit header mode with that payload
        length; otherwise explicit header mode is used.
        """
        self.implicit_header_mode(size > 0)
        if size > 0:
            self.write_register(REG_PAYLOAD_LENGTH, size & 0xff)

        # The last packet always starts at FIFO_RX_CURRENT_ADDR
        # no need to reset FIFO_ADDR_PTR
        self.write_register(
            REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_CONTINUOUS
        )

    def on_receive(self, callback):
        """Register ``callback(radio, payload)`` for received packets.

        With a ``dio_0`` pin configured, a rising-edge interrupt is attached
        to it; a false ``callback`` detaches the interrupt again.
        """
        self._on_receive = callback

        pin = self._pin_rx_done
        if pin is None:
            return

        if callback:
            self.write_register(REG_DIO_MAPPING_1, 0x00)
            pin.irq(trigger=Pin.IRQ_RISING, handler=self.handle_on_receive)
        elif hasattr(pin, "detach_irq"):
            # Pin wrappers that offer detach_irq() keep working as before.
            pin.detach_irq()
        else:
            pin.irq(handler=None)

    def _packet_ready(self, irq_flags):
        """Return True if RX-done is set without CRC error or RX timeout."""
        # Other flags may be set alongside RX-done on a good packet, so an
        # exact comparison with IRQ_RX_DONE_MASK would reject it.
        errors = IRQ_RX_TIME_OUT_MASK | IRQ_PAYLOAD_CRC_ERROR_MASK
        return bool(irq_flags & IRQ_RX_DONE_MASK) and not (irq_flags & errors)

    def _rearm_single_rx(self):
        """Reset the FIFO pointer and enter single RX mode if not in it."""
        single_rx = MODE_LONG_RANGE_MODE | MODE_RX_SINGLE
        if self.read_register(REG_OP_MODE) != single_rx:
            # no packet received.
            # reset FIFO address / # enter single RX mode
            self.write_register(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
            self.write_register(REG_OP_MODE, single_rx)

    def handle_on_receive(self, event_source):
        """Interrupt handler: deliver a received payload to the callback.

        ``event_source`` is not used. Always returns True.
        """
        self.set_lock(True)  # lock while the packet is handled
        try:
            irq_flags = self.get_irq_flags()

            if self._packet_ready(irq_flags):
                # automatically standby when RX_DONE
                if self._on_receive:
                    payload = self.read_payload()
                    self._on_receive(self, payload)
            else:
                self._rearm_single_rx()
        finally:
            self.set_lock(False)  # unlock in any case.

        self.collect_garbage()
        return True

    def received_packet(self, size=0):
        """Poll for a received packet.

        A ``size`` above 0 selects implicit header mode with that payload
        length.

        Returns:
            True if a packet is ready to be read with ``read_payload``,
            otherwise False (after re-arming single RX mode if needed).
        """
        irq_flags = self.get_irq_flags()

        self.implicit_header_mode(size > 0)
        if size > 0:
            self.write_register(REG_PAYLOAD_LENGTH, size & 0xff)

        if self._packet_ready(irq_flags):
            # automatically standby when RX_DONE
            return True

        self._rearm_single_rx()
        return False

    def read_payload(self):
        """Read the last received packet from the FIFO and return it as bytes."""
        # set FIFO address to current RX address
        self.write_register(
            REG_FIFO_ADDR_PTR,
            self.read_register(REG_FIFO_RX_CURRENT_ADDR)
        )

        # read packet length
        if self._implicit_header_mode:
            packet_length = self.read_register(REG_PAYLOAD_LENGTH)
        else:
            packet_length = self.read_register(REG_RX_NB_BYTES)

        payload = bytearray(packet_length)
        for i in range(packet_length):
            payload[i] = self.read_register(REG_FIFO)

        self.collect_garbage()
        return bytes(payload)

    def read_register(self, address, byteorder='big', signed=False):
        """Read one register and return its value as an int.

        With ``signed=True`` the byte is interpreted as two's complement.
        """
        response = self.transfer(address & 0x7f)
        value = int.from_bytes(response, byteorder)
        if signed and value > 0x7f:
            value -= 0x100
        return value

    def write_register(self, address, value):
        """Write ``value`` to a register (address bit 7 marks a write)."""
        self.transfer(address | 0x80, value)

    def transfer(self, address, value=0x00):
        """Send ``address`` then ``value`` over SPI; return the reply byte.

        Returns:
            A 1-byte ``bytearray`` read while ``value`` was clocked out.
        """
        response = bytearray(1)
        select = self._pin_ss

        if select is not None:
            select.value(0)
        try:
            self._spi.write(bytes([address]))
            self._spi.write_readinto(bytes([value]), response)
        finally:
            # Always release the select line, even if the SPI call failed.
            if select is not None:
                select.value(1)

        return response

    def blink_led(self, times=1, on_seconds=0.1, off_seconds=0.1):
        """Blink the status LED ``times`` times; no-op without a LED pin."""
        led = self._led_status
        if not led:
            return
        for _ in range(times):
            led.value(True)
            sleep(on_seconds)
            led.value(False)
            sleep(off_seconds)

    def collect_garbage(self):
        """Run the garbage collector and, in debug mode, print memory stats."""
        gc.collect()
        # gc.mem_free()/gc.mem_alloc() do not exist on every Python
        # implementation; skip the report where they are missing.
        if __DEBUG__ and hasattr(gc, 'mem_free'):
            print('[Memory - free: {} allocated: {}]'.format(gc.mem_free(), gc.mem_alloc()))
|