summaryrefslogtreecommitdiff
path: root/bootloader/ota-dfu-python/nrf_ble_dfu_controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'bootloader/ota-dfu-python/nrf_ble_dfu_controller.py')
-rw-r--r--bootloader/ota-dfu-python/nrf_ble_dfu_controller.py263
1 files changed, 263 insertions, 0 deletions
diff --git a/bootloader/ota-dfu-python/nrf_ble_dfu_controller.py b/bootloader/ota-dfu-python/nrf_ble_dfu_controller.py
new file mode 100644
index 00000000..85de15a6
--- /dev/null
+++ b/bootloader/ota-dfu-python/nrf_ble_dfu_controller.py
@@ -0,0 +1,263 @@
+import os
+import pexpect
+import re
+
+from abc import ABCMeta, abstractmethod
+from array import array
+from util import *
+
+verbose = False
+
+class NrfBleDfuController(object, metaclass=ABCMeta):
+ ctrlpt_handle = 0
+ ctrlpt_cccd_handle = 0
+ data_handle = 0
+
+ pkt_receipt_interval = 10
+ pkt_payload_size = 20
+
+ # --------------------------------------------------------------------------
+ # Start the firmware update process
+ # --------------------------------------------------------------------------
+ @abstractmethod
+ def start(self):
+ pass
+
+ # --------------------------------------------------------------------------
+ # Check if the peripheral is running in bootloader (DFU) or application mode
+ # Returns True if the peripheral is in DFU mode
+ # --------------------------------------------------------------------------
+ @abstractmethod
+ def check_DFU_mode(self):
+ pass
+
+ @abstractmethod
+ # --------------------------------------------------------------------------
+ # Switch from application to bootloader (DFU)
+ # --------------------------------------------------------------------------
+ def switch_to_dfu_mode(self):
+ pass
+
+ # --------------------------------------------------------------------------
+ # Parse notification status results
+ # --------------------------------------------------------------------------
+ @abstractmethod
+ def _dfu_parse_notify(self, notify):
+ pass
+
+ # --------------------------------------------------------------------------
+ # Wait for a notification and parse the response
+ # --------------------------------------------------------------------------
+ @abstractmethod
+ def _wait_and_parse_notify(self):
+ pass
+
+ def __init__(self, target_mac, firmware_path, datfile_path):
+ self.target_mac = target_mac
+
+ self.firmware_path = firmware_path
+ self.datfile_path = datfile_path
+
+ self.ble_conn = pexpect.spawn("gatttool -b '%s' -t random --interactive" % target_mac)
+ self.ble_conn.delaybeforesend = 0
+
+ # --------------------------------------------------------------------------
+ # Start the firmware update process
+ # --------------------------------------------------------------------------
+ def start(self):
+ (_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
+ (_, self.data_handle, _) = self._get_handles(self.UUID_PACKET)
+
+ if verbose:
+ print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle))
+ print('Packet handle: 0x%04x' % (self.data_handle))
+
+ # Subscribe to notifications from Control Point characteristic
+ self._enable_notifications(self.ctrlpt_cccd_handle)
+
+ # Set the Packet Receipt Notification interval
+ prn = uint16_to_bytes_le(self.pkt_receipt_interval)
+ self._dfu_send_command(Procedures.SET_PRN, prn)
+
+ self._dfu_send_init()
+
+ self._dfu_send_image()
+
+ # --------------------------------------------------------------------------
+ # Initialize:
+ # Hex: read and convert hexfile into bin_array
+ # Bin: read binfile into bin_array
+ # --------------------------------------------------------------------------
+ def input_setup(self):
+ print("Sending file " + os.path.split(self.firmware_path)[1] + " to " + self.target_mac)
+
+ if self.firmware_path == None:
+ raise Exception("input invalid")
+
+ name, extent = os.path.splitext(self.firmware_path)
+
+ if extent == ".bin":
+ self.bin_array = array('B', open(self.firmware_path, 'rb').read())
+
+ self.image_size = len(self.bin_array)
+ print("Binary imge size: %d" % self.image_size)
+ print("Binary CRC32: %d" % crc32_unsigned(array_to_hex_string(self.bin_array)))
+
+ return
+
+ if extent == ".hex":
+ intelhex = IntelHex(self.firmware_path)
+ self.bin_array = intelhex.tobinarray()
+ self.image_size = len(self.bin_array)
+ print("bin array size: ", self.image_size)
+ return
+
+ raise Exception("input invalid")
+
+ # --------------------------------------------------------------------------
+ # Perform a scan and connect via gatttool.
+ # Will return True if a connection was established, False otherwise
+ # --------------------------------------------------------------------------
+ def scan_and_connect(self, timeout=2):
+ if verbose: print("scan_and_connect")
+
+ print("Connecting to %s" % (self.target_mac))
+
+ try:
+ self.ble_conn.expect('\[LE\]>', timeout=timeout)
+ except pexpect.TIMEOUT as e:
+ return False
+
+ self.ble_conn.sendline('connect')
+
+ try:
+ res = self.ble_conn.expect('.*Connection successful.*', timeout=timeout)
+ except pexpect.TIMEOUT as e:
+ return False
+
+ return True
+
+ # --------------------------------------------------------------------------
+ # Disconnect from the peripheral and close the gatttool connection
+ # --------------------------------------------------------------------------
+ def disconnect(self):
+ self.ble_conn.sendline('exit')
+ self.ble_conn.close()
+
+ def target_mac_increase(self, inc):
+ self.target_mac = uint_to_mac_string(mac_string_to_uint(self.target_mac) + inc)
+
+ # Re-start gatttool with the new address
+ self.disconnect()
+ self.ble_conn = pexpect.spawn("gatttool -b '%s' -t random --interactive" % self.target_mac)
+ self.ble_conn.delaybeforesend = 0
+
+ # --------------------------------------------------------------------------
+ # Fetch handles for a given UUID.
+ # Will return a three-tuple: (char handle, value handle, CCCD handle)
+ # Will raise an exception if the UUID is not found
+ # --------------------------------------------------------------------------
+ def _get_handles(self, uuid):
+ self.ble_conn.before = ""
+ self.ble_conn.sendline('characteristics')
+
+ try:
+ self.ble_conn.expect([uuid], timeout=2)
+ handles = re.findall(b'.*handle: (0x....),.*char value handle: (0x....)', self.ble_conn.before)
+ (handle, value_handle) = handles[-1]
+ except pexpect.TIMEOUT as e:
+ raise Exception("UUID not found: {}".format(uuid))
+
+ return (int(handle, 16), int(value_handle, 16), int(value_handle, 16)+1)
+
+ # --------------------------------------------------------------------------
+ # Wait for notification to arrive.
+ # Example format: "Notification handle = 0x0019 value: 10 01 01"
+ # --------------------------------------------------------------------------
+ def _dfu_wait_for_notify(self):
+ while True:
+ if verbose: print("dfu_wait_for_notify")
+
+ if not self.ble_conn.isalive():
+ print("connection not alive")
+ return None
+
+ try:
+ index = self.ble_conn.expect('Notification handle = .*? \r\n', timeout=30)
+
+ except pexpect.TIMEOUT:
+ #
+ # The gatttool does not report link-lost directly.
+ # The only way found to detect it is monitoring the prompt '[CON]'
+ # and if it goes to '[ ]' this indicates the connection has
+ # been broken.
+ # In order to get a updated prompt string, issue an empty
+ # sendline(''). If it contains the '[ ]' string, then
+ # raise an exception. Otherwise, if not a link-lost condition,
+ # continue to wait.
+ #
+ self.ble_conn.sendline('')
+ string = self.ble_conn.before
+ if '[ ]' in string:
+ print('Connection lost! ')
+ raise Exception('Connection Lost')
+ return None
+
+ if index == 0:
+ after = self.ble_conn.after
+ hxstr = after.split()[3:]
+ handle = int(float.fromhex(hxstr[0].decode('UTF-8')))
+ return hxstr[2:]
+
+ else:
+ print("unexpeced index: {0}".format(index))
+ return None
+
+ # --------------------------------------------------------------------------
+ # Send a procedure + any parameters required
+ # --------------------------------------------------------------------------
+ def _dfu_send_command(self, procedure, params=[]):
+ if verbose: print('_dfu_send_command')
+
+ cmd = 'char-write-req 0x%04x %02x' % (self.ctrlpt_handle, procedure)
+ cmd += array_to_hex_string(params)
+
+ if verbose: print(cmd)
+
+ self.ble_conn.sendline(cmd)
+
+ # Verify that command was successfully written
+ try:
+ res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10)
+ except pexpect.TIMEOUT as e:
+ print("State timeout")
+
+ # --------------------------------------------------------------------------
+ # Send an array of bytes
+ # --------------------------------------------------------------------------
+ def _dfu_send_data(self, data):
+ cmd = 'char-write-cmd 0x%04x' % (self.data_handle)
+ cmd += ' '
+ cmd += array_to_hex_string(data)
+
+ if verbose: print(cmd)
+
+ self.ble_conn.sendline(cmd)
+
+ # --------------------------------------------------------------------------
+ # Enable notifications from the Control Point Handle
+ # --------------------------------------------------------------------------
+ def _enable_notifications(self, cccd_handle):
+ if verbose: print('_enable_notifications')
+
+ cmd = 'char-write-req 0x%04x %s' % (cccd_handle, '0100')
+
+ if verbose: print(cmd)
+
+ self.ble_conn.sendline(cmd)
+
+ # Verify that command was successfully written
+ try:
+ res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10)
+ except pexpect.TIMEOUT as e:
+ print("State timeout")