diff options
Diffstat (limited to 'bootloader/ota-dfu-python/ble_legacy_dfu_controller.py')
-rw-r--r-- | bootloader/ota-dfu-python/ble_legacy_dfu_controller.py | 291 |
1 files changed, 291 insertions, 0 deletions
diff --git a/bootloader/ota-dfu-python/ble_legacy_dfu_controller.py b/bootloader/ota-dfu-python/ble_legacy_dfu_controller.py new file mode 100644 index 00000000..325b4489 --- /dev/null +++ b/bootloader/ota-dfu-python/ble_legacy_dfu_controller.py @@ -0,0 +1,291 @@ +import math +import pexpect +import time + +from array import array +from util import * + +from nrf_ble_dfu_controller import NrfBleDfuController + +verbose = False + +class Procedures: + START_DFU = 1 + INITIALIZE_DFU = 2 + RECEIVE_FIRMWARE_IMAGE = 3 + VALIDATE_FIRMWARE = 4 + ACTIVATE_IMAGE_AND_RESET = 5 + RESET_SYSTEM = 6 + REPORT_RECEIVED_IMAGE_SIZE = 7 + PRN_REQUEST = 8 + RESPONSE = 16 + PACKET_RECEIPT_NOTIFICATION = 17 + + string_map = { + START_DFU : "START_DFU", + INITIALIZE_DFU : "INITIALIZE_DFU", + RECEIVE_FIRMWARE_IMAGE : "RECEIVE_FIRMWARE_IMAGE", + VALIDATE_FIRMWARE : "VALIDATE_FIRMWARE", + ACTIVATE_IMAGE_AND_RESET : "ACTIVATE_IMAGE_AND_RESET", + RESET_SYSTEM : "RESET_SYSTEM", + REPORT_RECEIVED_IMAGE_SIZE : "REPORT_RECEIVED_IMAGE_SIZE", + PRN_REQUEST : "PACKET_RECEIPT_NOTIFICATION_REQUEST", + RESPONSE : "RESPONSE", + PACKET_RECEIPT_NOTIFICATION : "PACKET_RECEIPT_NOTIFICATION", + } + + @staticmethod + def to_string(proc): + return Procedures.string_map[proc] + + @staticmethod + def from_string(proc_str): + return int(proc_str, 16) + +class Responses: + SUCCESS = 1 + INVALID_STATE = 2 + NOT_SUPPORTED = 3 + DATA_SIZE_EXCEEDS_LIMITS = 4 + CRC_ERROR = 5 + OPERATION_FAILED = 6 + + string_map = { + SUCCESS : "SUCCESS", + INVALID_STATE : "INVALID_STATE", + NOT_SUPPORTED : "NOT_SUPPORTED", + DATA_SIZE_EXCEEDS_LIMITS : "DATA_SIZE_EXCEEDS_LIMITS", + CRC_ERROR : "CRC_ERROR", + OPERATION_FAILED : "OPERATION_FAILED", + } + + @staticmethod + def to_string(res): + return Responses.string_map[res] + + @staticmethod + def from_string(res_str): + return int(res_str, 16) + + +class BleDfuControllerLegacy(NrfBleDfuController): + # Class constants + UUID_CONTROL_POINT = "00001531-1212-efde-1523-785feabcd123" + UUID_PACKET = "00001532-1212-efde-1523-785feabcd123" + UUID_VERSION = "00001534-1212-efde-1523-785feabcd123" + + # Constructor inherited from abstract base class + + # -------------------------------------------------------------------------- + # Start the firmware update process + # -------------------------------------------------------------------------- + def start(self, verbose=False): + (_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT) + (_, self.data_handle, _) = self._get_handles(self.UUID_PACKET) + + self.pkt_receipt_interval = 10 + + if verbose: + print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle)) + print('Packet handle: 0x%04x' % (self.data_handle)) + + # Subscribe to notifications from Control Point characteristic + if verbose: print("Enabling notifications") + self._enable_notifications(self.ctrlpt_cccd_handle) + + # Send 'START DFU' + Application Command + if verbose: print("Sending START_DFU") + self._dfu_send_command(Procedures.START_DFU, [0x04]) + + # Transmit binary image size + # Need to pad the byte array with eight zero bytes + # (because that's what the bootloader is expecting...) + hex_size_array_lsb = uint32_to_bytes_le(len(self.bin_array)) + zero_pad_array_le(hex_size_array_lsb, 8) + self._dfu_send_data(hex_size_array_lsb) + + # Wait for response to Image Size + print("Waiting for Image Size notification") + self._wait_and_parse_notify() + + # Send 'INIT DFU' + Init Packet Command + self._dfu_send_command(Procedures.INITIALIZE_DFU, [0x00]) + + # Transmit the Init image (DAT). + self._dfu_send_init() + + # Send 'INIT DFU' + Init Packet Complete Command + self._dfu_send_command(Procedures.INITIALIZE_DFU, [0x01]) + + print("Waiting for INIT DFU notification") + # Wait for INIT DFU notification (indicates flash erase completed) + self._wait_and_parse_notify() + + # Set the Packet Receipt Notification interval + if verbose: print("Setting pkt receipt notification interval") + prn = uint16_to_bytes_le(self.pkt_receipt_interval) + self._dfu_send_command(Procedures.PRN_REQUEST, prn) + + # Send 'RECEIVE FIRMWARE IMAGE' command to set DFU in firmware receive state. + self._dfu_send_command(Procedures.RECEIVE_FIRMWARE_IMAGE) + + # Send bin_array contents as as series of packets (burst mode). + # Each segment is pkt_payload_size bytes long. + # For every pkt_receipt_interval sends, wait for notification. + segment_count = 0 + segment_total = int(math.ceil(self.image_size/float(self.pkt_payload_size))) + time_start = time.time() + last_send_time = time.time() + print("Begin DFU") + for i in range(0, self.image_size, self.pkt_payload_size): + segment = self.bin_array[i:i + self.pkt_payload_size] + self._dfu_send_data(segment) + segment_count += 1 + + # print "segment #{} of {}, dt = {}".format(segment_count, segment_total, time.time() - last_send_time) + # last_send_time = time.time() + + if (segment_count == segment_total): + print_progress(self.image_size, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50) + + duration = time.time() - time_start + print("\nUpload complete in {} minutes and {} seconds".format(int(duration / 60), int(duration % 60))) + if verbose: print("segments sent: {}".format(segment_count)) + + print("Waiting for DFU complete notification") + # Wait for DFU complete notification + self._wait_and_parse_notify() + + elif (segment_count % self.pkt_receipt_interval) == 0: + (proc, res, pkts) = self._wait_and_parse_notify() + + # TODO: Check pkts == segment_count * pkt_payload_size + + if res != Responses.SUCCESS: + raise Exception("bad notification status: {}".format(Responses.to_string(res))) + + print_progress(pkts, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50) + + # Send Validate Command + self._dfu_send_command(Procedures.VALIDATE_FIRMWARE) + + print("Waiting for Firmware Validation notification") + # Wait for Firmware Validation notification + self._wait_and_parse_notify() + + # Wait a bit for copy on the peer to be finished + time.sleep(1) + + # Send Activate and Reset Command + print("Activate and reset") + self._dfu_send_command(Procedures.ACTIVATE_IMAGE_AND_RESET) + + # -------------------------------------------------------------------------- + # Check if the peripheral is running in bootloader (DFU) or application mode + # Returns True if the peripheral is in DFU mode + # -------------------------------------------------------------------------- + def check_DFU_mode(self): + if verbose: print("Checking DFU State...") + + cmd = 'char-read-uuid %s' % self.UUID_VERSION + + if verbose: print(cmd) + + self.ble_conn.sendline(cmd) + + # Skip two rows + try: + res = self.ble_conn.expect('handle:.*', timeout=10) + # res = self.ble_conn.expect('handle:', timeout=10) + except pexpect.TIMEOUT as e: + print("State timeout") + except: + pass + + return self.ble_conn.after.find(b'value: 08 00')!=-1 + + def switch_to_dfu_mode(self): + (_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT) + + # Enable notifications + cmd = 'char-write-req 0x%02x %02x' % (bl_cccd_handle, 1) + if verbose: print(cmd) + self.ble_conn.sendline(cmd) + + # Reset the board in DFU mode. After reset the board will be disconnected + cmd = 'char-write-req 0x%02x 0104' % (bl_value_handle) + if verbose: print(cmd) + self.ble_conn.sendline(cmd) + + time.sleep(0.5) + + #print "Send 'START DFU' + Application Command" + #self._dfu_state_set(0x0104) + + # Reconnect the board. + #ret = self.scan_and_connect() + #if verbose: print("Connected " + str(ret)) + + #return ret + return 1 + + + # -------------------------------------------------------------------------- + # Parse notification status results + # -------------------------------------------------------------------------- + def _dfu_parse_notify(self, notify): + if len(notify) < 3: + print("notify data length error") + return None + + if verbose: print(notify) + + dfu_notify_opcode = Procedures.from_string(notify[0]) + + if dfu_notify_opcode == Procedures.RESPONSE: + + dfu_procedure = Procedures.from_string(notify[1]) + dfu_response = Responses.from_string(notify[2]) + + procedure_str = Procedures.to_string(dfu_procedure) + response_str = Responses.to_string(dfu_response) + + if verbose: print("opcode: 0x%02x, proc: %s, res: %s" % (dfu_notify_opcode, procedure_str, response_str)) + + return (dfu_procedure, dfu_response) + + if dfu_notify_opcode == Procedures.PACKET_RECEIPT_NOTIFICATION: + receipt = bytes_to_uint32_le(notify[1:5]) + return (dfu_notify_opcode, Responses.SUCCESS, receipt) + + # -------------------------------------------------------------------------- + # Wait for a notification and parse the response + # -------------------------------------------------------------------------- + def _wait_and_parse_notify(self): + if verbose: print("Waiting for notification") + notify = self._dfu_wait_for_notify() + + if notify is None: + raise Exception("No notification received") + + if verbose: print("Parsing notification") + + result = self._dfu_parse_notify(notify) + if result[1] != Responses.SUCCESS: + raise Exception("Error in {} procedure, reason: {}".format( + Procedures.to_string(result[0]), + Responses.to_string(result[1]))) + + return result + + #-------------------------------------------------------------------------- + # Send the Init info (*.dat file contents) to peripheral device. + #-------------------------------------------------------------------------- + def _dfu_send_init(self): + if verbose: print("dfu_send_init") + + # Open the DAT file and create array of its contents + init_bin_array = array('B', open(self.datfile_path, 'rb').read()) + + # Transmit Init info + self._dfu_send_data(init_bin_array) |