summaryrefslogtreecommitdiff
path: root/bootloader/ota-dfu-python/ble_legacy_dfu_controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'bootloader/ota-dfu-python/ble_legacy_dfu_controller.py')
-rw-r--r--bootloader/ota-dfu-python/ble_legacy_dfu_controller.py291
1 files changed, 291 insertions, 0 deletions
diff --git a/bootloader/ota-dfu-python/ble_legacy_dfu_controller.py b/bootloader/ota-dfu-python/ble_legacy_dfu_controller.py
new file mode 100644
index 00000000..325b4489
--- /dev/null
+++ b/bootloader/ota-dfu-python/ble_legacy_dfu_controller.py
@@ -0,0 +1,291 @@
+import math
+import pexpect
+import time
+
+from array import array
+from util import *
+
+from nrf_ble_dfu_controller import NrfBleDfuController
+
+verbose = False
+
+class Procedures:
+ START_DFU = 1
+ INITIALIZE_DFU = 2
+ RECEIVE_FIRMWARE_IMAGE = 3
+ VALIDATE_FIRMWARE = 4
+ ACTIVATE_IMAGE_AND_RESET = 5
+ RESET_SYSTEM = 6
+ REPORT_RECEIVED_IMAGE_SIZE = 7
+ PRN_REQUEST = 8
+ RESPONSE = 16
+ PACKET_RECEIPT_NOTIFICATION = 17
+
+ string_map = {
+ START_DFU : "START_DFU",
+ INITIALIZE_DFU : "INITIALIZE_DFU",
+ RECEIVE_FIRMWARE_IMAGE : "RECEIVE_FIRMWARE_IMAGE",
+ VALIDATE_FIRMWARE : "VALIDATE_FIRMWARE",
+ ACTIVATE_IMAGE_AND_RESET : "ACTIVATE_IMAGE_AND_RESET",
+ RESET_SYSTEM : "RESET_SYSTEM",
+ REPORT_RECEIVED_IMAGE_SIZE : "REPORT_RECEIVED_IMAGE_SIZE",
+ PRN_REQUEST : "PACKET_RECEIPT_NOTIFICATION_REQUEST",
+ RESPONSE : "RESPONSE",
+ PACKET_RECEIPT_NOTIFICATION : "PACKET_RECEIPT_NOTIFICATION",
+ }
+
+ @staticmethod
+ def to_string(proc):
+ return Procedures.string_map[proc]
+
+ @staticmethod
+ def from_string(proc_str):
+ return int(proc_str, 16)
+
+class Responses:
+ SUCCESS = 1
+ INVALID_STATE = 2
+ NOT_SUPPORTED = 3
+ DATA_SIZE_EXCEEDS_LIMITS = 4
+ CRC_ERROR = 5
+ OPERATION_FAILED = 6
+
+ string_map = {
+ SUCCESS : "SUCCESS",
+ INVALID_STATE : "INVALID_STATE",
+ NOT_SUPPORTED : "NOT_SUPPORTED",
+ DATA_SIZE_EXCEEDS_LIMITS : "DATA_SIZE_EXCEEDS_LIMITS",
+ CRC_ERROR : "CRC_ERROR",
+ OPERATION_FAILED : "OPERATION_FAILED",
+ }
+
+ @staticmethod
+ def to_string(res):
+ return Responses.string_map[res]
+
+ @staticmethod
+ def from_string(res_str):
+ return int(res_str, 16)
+
+
+class BleDfuControllerLegacy(NrfBleDfuController):
+ # Class constants
+ UUID_CONTROL_POINT = "00001531-1212-efde-1523-785feabcd123"
+ UUID_PACKET = "00001532-1212-efde-1523-785feabcd123"
+ UUID_VERSION = "00001534-1212-efde-1523-785feabcd123"
+
+ # Constructor inherited from abstract base class
+
+ # --------------------------------------------------------------------------
+ # Start the firmware update process
+ # --------------------------------------------------------------------------
+ def start(self, verbose=False):
+ (_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
+ (_, self.data_handle, _) = self._get_handles(self.UUID_PACKET)
+
+ self.pkt_receipt_interval = 10
+
+ if verbose:
+ print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle))
+ print('Packet handle: 0x%04x' % (self.data_handle))
+
+ # Subscribe to notifications from Control Point characteristic
+ if verbose: print("Enabling notifications")
+ self._enable_notifications(self.ctrlpt_cccd_handle)
+
+ # Send 'START DFU' + Application Command
+ if verbose: print("Sending START_DFU")
+ self._dfu_send_command(Procedures.START_DFU, [0x04])
+
+ # Transmit binary image size
+ # Need to pad the byte array with eight zero bytes
+ # (because that's what the bootloader is expecting...)
+ hex_size_array_lsb = uint32_to_bytes_le(len(self.bin_array))
+ zero_pad_array_le(hex_size_array_lsb, 8)
+ self._dfu_send_data(hex_size_array_lsb)
+
+ # Wait for response to Image Size
+ print("Waiting for Image Size notification")
+ self._wait_and_parse_notify()
+
+ # Send 'INIT DFU' + Init Packet Command
+ self._dfu_send_command(Procedures.INITIALIZE_DFU, [0x00])
+
+ # Transmit the Init image (DAT).
+ self._dfu_send_init()
+
+ # Send 'INIT DFU' + Init Packet Complete Command
+ self._dfu_send_command(Procedures.INITIALIZE_DFU, [0x01])
+
+ print("Waiting for INIT DFU notification")
+ # Wait for INIT DFU notification (indicates flash erase completed)
+ self._wait_and_parse_notify()
+
+ # Set the Packet Receipt Notification interval
+ if verbose: print("Setting pkt receipt notification interval")
+ prn = uint16_to_bytes_le(self.pkt_receipt_interval)
+ self._dfu_send_command(Procedures.PRN_REQUEST, prn)
+
+ # Send 'RECEIVE FIRMWARE IMAGE' command to set DFU in firmware receive state.
+ self._dfu_send_command(Procedures.RECEIVE_FIRMWARE_IMAGE)
+
+ # Send bin_array contents as as series of packets (burst mode).
+ # Each segment is pkt_payload_size bytes long.
+ # For every pkt_receipt_interval sends, wait for notification.
+ segment_count = 0
+ segment_total = int(math.ceil(self.image_size/float(self.pkt_payload_size)))
+ time_start = time.time()
+ last_send_time = time.time()
+ print("Begin DFU")
+ for i in range(0, self.image_size, self.pkt_payload_size):
+ segment = self.bin_array[i:i + self.pkt_payload_size]
+ self._dfu_send_data(segment)
+ segment_count += 1
+
+ # print "segment #{} of {}, dt = {}".format(segment_count, segment_total, time.time() - last_send_time)
+ # last_send_time = time.time()
+
+ if (segment_count == segment_total):
+ print_progress(self.image_size, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
+
+ duration = time.time() - time_start
+ print("\nUpload complete in {} minutes and {} seconds".format(int(duration / 60), int(duration % 60)))
+ if verbose: print("segments sent: {}".format(segment_count))
+
+ print("Waiting for DFU complete notification")
+ # Wait for DFU complete notification
+ self._wait_and_parse_notify()
+
+ elif (segment_count % self.pkt_receipt_interval) == 0:
+ (proc, res, pkts) = self._wait_and_parse_notify()
+
+ # TODO: Check pkts == segment_count * pkt_payload_size
+
+ if res != Responses.SUCCESS:
+ raise Exception("bad notification status: {}".format(Responses.to_string(res)))
+
+ print_progress(pkts, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
+
+ # Send Validate Command
+ self._dfu_send_command(Procedures.VALIDATE_FIRMWARE)
+
+ print("Waiting for Firmware Validation notification")
+ # Wait for Firmware Validation notification
+ self._wait_and_parse_notify()
+
+ # Wait a bit for copy on the peer to be finished
+ time.sleep(1)
+
+ # Send Activate and Reset Command
+ print("Activate and reset")
+ self._dfu_send_command(Procedures.ACTIVATE_IMAGE_AND_RESET)
+
+ # --------------------------------------------------------------------------
+ # Check if the peripheral is running in bootloader (DFU) or application mode
+ # Returns True if the peripheral is in DFU mode
+ # --------------------------------------------------------------------------
+ def check_DFU_mode(self):
+ if verbose: print("Checking DFU State...")
+
+ cmd = 'char-read-uuid %s' % self.UUID_VERSION
+
+ if verbose: print(cmd)
+
+ self.ble_conn.sendline(cmd)
+
+ # Skip two rows
+ try:
+ res = self.ble_conn.expect('handle:.*', timeout=10)
+ # res = self.ble_conn.expect('handle:', timeout=10)
+ except pexpect.TIMEOUT as e:
+ print("State timeout")
+ except:
+ pass
+
+ return self.ble_conn.after.find(b'value: 08 00')!=-1
+
+ def switch_to_dfu_mode(self):
+ (_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
+
+ # Enable notifications
+ cmd = 'char-write-req 0x%02x %02x' % (bl_cccd_handle, 1)
+ if verbose: print(cmd)
+ self.ble_conn.sendline(cmd)
+
+ # Reset the board in DFU mode. After reset the board will be disconnected
+ cmd = 'char-write-req 0x%02x 0104' % (bl_value_handle)
+ if verbose: print(cmd)
+ self.ble_conn.sendline(cmd)
+
+ time.sleep(0.5)
+
+ #print "Send 'START DFU' + Application Command"
+ #self._dfu_state_set(0x0104)
+
+ # Reconnect the board.
+ #ret = self.scan_and_connect()
+ #if verbose: print("Connected " + str(ret))
+
+ #return ret
+ return 1
+
+
+ # --------------------------------------------------------------------------
+ # Parse notification status results
+ # --------------------------------------------------------------------------
+ def _dfu_parse_notify(self, notify):
+ if len(notify) < 3:
+ print("notify data length error")
+ return None
+
+ if verbose: print(notify)
+
+ dfu_notify_opcode = Procedures.from_string(notify[0])
+
+ if dfu_notify_opcode == Procedures.RESPONSE:
+
+ dfu_procedure = Procedures.from_string(notify[1])
+ dfu_response = Responses.from_string(notify[2])
+
+ procedure_str = Procedures.to_string(dfu_procedure)
+ response_str = Responses.to_string(dfu_response)
+
+ if verbose: print("opcode: 0x%02x, proc: %s, res: %s" % (dfu_notify_opcode, procedure_str, response_str))
+
+ return (dfu_procedure, dfu_response)
+
+ if dfu_notify_opcode == Procedures.PACKET_RECEIPT_NOTIFICATION:
+ receipt = bytes_to_uint32_le(notify[1:5])
+ return (dfu_notify_opcode, Responses.SUCCESS, receipt)
+
+ # --------------------------------------------------------------------------
+ # Wait for a notification and parse the response
+ # --------------------------------------------------------------------------
+ def _wait_and_parse_notify(self):
+ if verbose: print("Waiting for notification")
+ notify = self._dfu_wait_for_notify()
+
+ if notify is None:
+ raise Exception("No notification received")
+
+ if verbose: print("Parsing notification")
+
+ result = self._dfu_parse_notify(notify)
+ if result[1] != Responses.SUCCESS:
+ raise Exception("Error in {} procedure, reason: {}".format(
+ Procedures.to_string(result[0]),
+ Responses.to_string(result[1])))
+
+ return result
+
+ #--------------------------------------------------------------------------
+ # Send the Init info (*.dat file contents) to peripheral device.
+ #--------------------------------------------------------------------------
+ def _dfu_send_init(self):
+ if verbose: print("dfu_send_init")
+
+ # Open the DAT file and create array of its contents
+ init_bin_array = array('B', open(self.datfile_path, 'rb').read())
+
+ # Transmit Init info
+ self._dfu_send_data(init_bin_array)