diff options
Diffstat (limited to 'bootloader/ota-dfu-python/ble_secure_dfu_controller.py')
-rw-r--r-- | bootloader/ota-dfu-python/ble_secure_dfu_controller.py | 323 |
1 files changed, 323 insertions, 0 deletions
diff --git a/bootloader/ota-dfu-python/ble_secure_dfu_controller.py b/bootloader/ota-dfu-python/ble_secure_dfu_controller.py new file mode 100644 index 00000000..7065458a --- /dev/null +++ b/bootloader/ota-dfu-python/ble_secure_dfu_controller.py @@ -0,0 +1,323 @@ +import math +import pexpect +import time + +from array import array +from util import * + +from nrf_ble_dfu_controller import NrfBleDfuController + +verbose = False + +class Procedures: + CREATE = 0x01 + SET_PRN = 0x02 + CALC_CHECKSUM = 0x03 + EXECUTE = 0x04 + SELECT = 0x06 + RESPONSE = 0x60 + + PARAM_COMMAND = 0x01 + PARAM_DATA = 0x02 + + string_map = { + CREATE : "CREATE", + SET_PRN : "SET_PRN", + CALC_CHECKSUM : "CALC_CHECKSUM", + EXECUTE : "EXECUTE", + SELECT : "SELECT", + RESPONSE : "RESPONSE", + } + + @staticmethod + def to_string(proc): + return Procedures.string_map[proc] + + @staticmethod + def from_string(proc_str): + return int(proc_str, 16) + +class Results: + INVALID_CODE = 0x00 + SUCCESS = 0x01 + OPCODE_NOT_SUPPORTED = 0x02 + INVALID_PARAMETER = 0x03 + INSUFF_RESOURCES = 0x04 + INVALID_OBJECT = 0x05 + UNSUPPORTED_TYPE = 0x07 + OPERATION_NOT_PERMITTED = 0x08 + OPERATION_FAILED = 0x0A + + string_map = { + INVALID_CODE : "INVALID_CODE", + SUCCESS : "SUCCESS", + OPCODE_NOT_SUPPORTED : "OPCODE_NOT_SUPPORTED", + INVALID_PARAMETER : "INVALID_PARAMETER", + INSUFF_RESOURCES : "INSUFFICIENT_RESOURCES", + INVALID_OBJECT : "INVALID_OBJECT", + UNSUPPORTED_TYPE : "UNSUPPORTED_TYPE", + OPERATION_NOT_PERMITTED : "OPERATION_NOT_PERMITTED", + OPERATION_FAILED : "OPERATION_FAILED", + } + + @staticmethod + def to_string(res): + return Results.string_map[res] + + @staticmethod + def from_string(res_str): + return int(res_str, 16) + + +class BleDfuControllerSecure(NrfBleDfuController): + # Class constants + UUID_BUTTONLESS = '8e400001-f315-4f60-9fb8-838830daea50' + UUID_CONTROL_POINT = '8ec90001-f315-4f60-9fb8-838830daea50' + UUID_PACKET = '8ec90002-f315-4f60-9fb8-838830daea50' + + # Constructor inherited from abstract base class + + # -------------------------------------------------------------------------- + # Start the firmware update process + # -------------------------------------------------------------------------- + def start(self): + (_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT) + (_, self.data_handle, _) = self._get_handles(self.UUID_PACKET) + + if verbose: + print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle)) + print('Packet handle: 0x%04x' % (self.data_handle)) + + # Subscribe to notifications from Control Point characteristic + self._enable_notifications(self.ctrlpt_cccd_handle) + + # Set the Packet Receipt Notification interval + prn = uint16_to_bytes_le(self.pkt_receipt_interval) + self._dfu_send_command(Procedures.SET_PRN, prn) + + self._dfu_send_init() + + self._dfu_send_image() + + # -------------------------------------------------------------------------- + # Check if the peripheral is running in bootloader (DFU) or application mode + # Returns True if the peripheral is in DFU mode + # -------------------------------------------------------------------------- + def check_DFU_mode(self): + print("Checking DFU State...") + + self.ble_conn.sendline('characteristics') + + dfu_mode = False + + try: + self.ble_conn.expect([self.UUID_BUTTONLESS], timeout=2) + except pexpect.TIMEOUT as e: + dfu_mode = True + + return dfu_mode + + def switch_to_dfu_mode(self): + (_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_BUTTONLESS) + + self._enable_notifications(bl_cccd_handle) + + # Reset the board in DFU mode. After reset the board will be disconnected + cmd = 'char-write-req 0x%04x 01' % (bl_value_handle) + self.ble_conn.sendline(cmd) + + # Wait some time for board to reboot + time.sleep(0.5) + + # Increase the mac address by one and reconnect + self.target_mac_increase(1) + return self.scan_and_connect() + + # -------------------------------------------------------------------------- + # Parse notification status results + # -------------------------------------------------------------------------- + def _dfu_parse_notify(self, notify): + if len(notify) < 3: + print("notify data length error") + return None + + if verbose: print(notify) + + dfu_notify_opcode = Procedures.from_string(notify[0]) + if dfu_notify_opcode == Procedures.RESPONSE: + + dfu_procedure = Procedures.from_string(notify[1]) + dfu_result = Results.from_string(notify[2]) + + procedure_str = Procedures.to_string(dfu_procedure) + result_str = Results.to_string(dfu_result) + + # if verbose: print "opcode: {0}, proc: {1}, res: {2}".format(dfu_notify_opcode, procedure_str, result_str) + if verbose: print("opcode: 0x%02x, proc: %s, res: %s" % (dfu_notify_opcode, procedure_str, result_str)) + + # Packet Receipt notifications are sent in the exact same format + # as responses to the CALC_CHECKSUM procedure. + if(dfu_procedure == Procedures.CALC_CHECKSUM and dfu_result == Results.SUCCESS): + offset = bytes_to_uint32_le(notify[3:7]) + crc32 = bytes_to_uint32_le(notify[7:11]) + + return (dfu_procedure, dfu_result, offset, crc32) + + elif(dfu_procedure == Procedures.SELECT and dfu_result == Results.SUCCESS): + max_size = bytes_to_uint32_le(notify[3:7]) + offset = bytes_to_uint32_le(notify[7:11]) + crc32 = bytes_to_uint32_le(notify[11:15]) + + return (dfu_procedure, dfu_result, max_size, offset, crc32) + + else: + return (dfu_procedure, dfu_result) + + # -------------------------------------------------------------------------- + # Wait for a notification and parse the response + # -------------------------------------------------------------------------- + def _wait_and_parse_notify(self): + if verbose: print("Waiting for notification") + notify = self._dfu_wait_for_notify() + + if notify is None: + raise Exception("No notification received") + + if verbose: print("Parsing notification") + + result = self._dfu_parse_notify(notify) + if result[1] != Results.SUCCESS: + raise Exception("Error in {} procedure, reason: {}".format( + Procedures.to_string(result[0]), + Results.to_string(result[1]))) + + return result + + # -------------------------------------------------------------------------- + # Send the Init info (*.dat file contents) to peripheral device. + # -------------------------------------------------------------------------- + def _dfu_send_init(self): + if verbose: print("dfu_send_init") + + # Open the DAT file and create array of its contents + init_bin_array = array('B', open(self.datfile_path, 'rb').read()) + init_size = len(init_bin_array) + init_crc = 0; + + # Select command + self._dfu_send_command(Procedures.SELECT, [Procedures.PARAM_COMMAND]); + (proc, res, max_size, offset, crc32) = self._wait_and_parse_notify() + + if offset != init_size or crc32 != init_crc: + if offset == 0 or offset > init_size: + # Create command + self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_COMMAND] + uint32_to_bytes_le(init_size)) + res = self._wait_and_parse_notify() + + segment_count = 0 + segment_total = int(math.ceil(init_size/float(self.pkt_payload_size))) + + for i in range(0, init_size, self.pkt_payload_size): + segment = init_bin_array[i:i + self.pkt_payload_size] + self._dfu_send_data(segment) + segment_count += 1 + + if (segment_count % self.pkt_receipt_interval) == 0: + (proc, res, offset, crc32) = self._wait_and_parse_notify() + + if res != Results.SUCCESS: + raise Exception("bad notification status: {}".format(Results.to_string(res))) + + # Calculate CRC + self._dfu_send_command(Procedures.CALC_CHECKSUM) + self._wait_and_parse_notify() + + # Execute command + self._dfu_send_command(Procedures.EXECUTE) + self._wait_and_parse_notify() + + print("Init packet successfully transfered") + + # -------------------------------------------------------------------------- + # Send the Firmware image to peripheral device. + # -------------------------------------------------------------------------- + def _dfu_send_image(self): + if verbose: print("dfu_send_image") + + # Select Data Object + self._dfu_send_command(Procedures.SELECT, [Procedures.PARAM_DATA]) + (proc, res, max_size, offset, crc32) = self._wait_and_parse_notify() + + # Split the firmware into multiple objects + num_objects = int(math.ceil(self.image_size / float(max_size))) + print("Max object size: %d, num objects: %d, offset: %d, total size: %d" % (max_size, num_objects, offset, self.image_size)) + + time_start = time.time() + last_send_time = time.time() + + obj_offset = (offset/max_size)*max_size + while(obj_offset < self.image_size): + # print "\nSending object {} of {}".format(obj_offset/max_size+1, num_objects) + obj_offset += self._dfu_send_object(obj_offset, max_size) + + # Image uploaded successfully, update the progress bar + print_progress(self.image_size, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50) + + duration = time.time() - time_start + print("\nUpload complete in {} minutes and {} seconds".format(int(duration / 60), int(duration % 60))) + + # -------------------------------------------------------------------------- + # Send a single data object of given size and offset. + # -------------------------------------------------------------------------- + def _dfu_send_object(self, offset, obj_max_size): + if offset != self.image_size: + if offset == 0 or offset >= obj_max_size or crc32 != crc32_unsigned(self.bin_array[0:offset]): + # Create Data Object + size = min(obj_max_size, self.image_size - offset) + self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_DATA] + uint32_to_bytes_le(size)) + self._wait_and_parse_notify() + + segment_count = 0 + segment_total = int(math.ceil(min(obj_max_size, self.image_size-offset)/float(self.pkt_payload_size))) + + segment_begin = offset + segment_end = min(offset+obj_max_size, self.image_size) + + for i in range(segment_begin, segment_end, self.pkt_payload_size): + num_bytes = min(self.pkt_payload_size, segment_end - i) + segment = self.bin_array[i:i + num_bytes] + self._dfu_send_data(segment) + segment_count += 1 + + # print "j: {} i: {}, end: {}, bytes: {}, size: {} segment #{} of {}".format( + # offset, i, segment_end, num_bytes, self.image_size, segment_count, segment_total) + + if (segment_count % self.pkt_receipt_interval) == 0: + try: + (proc, res, offset, crc32) = self._wait_and_parse_notify() + except e: + # Likely no notification received, need to re-transmit object + return 0 + + if res != Results.SUCCESS: + raise Exception("bad notification status: {}".format(Results.to_string(res))) + + if crc32 != crc32_unsigned(self.bin_array[0:offset]): + # Something went wrong, need to re-transmit this object + return 0 + + print_progress(offset, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50) + + # Calculate CRC + self._dfu_send_command(Procedures.CALC_CHECKSUM) + (proc, res, offset, crc32) = self._wait_and_parse_notify() + if(crc32 != crc32_unsigned(self.bin_array[0:offset])): + # Need to re-transmit object + return 0 + + # Execute command + self._dfu_send_command(Procedures.EXECUTE) + self._wait_and_parse_notify() + + # If everything executed correctly, return amount of bytes transfered + return obj_max_size |