add external call API( for kflash_gui repo)

pull/28/head
Neucrack 2019-05-10 17:14:24 +08:00
parent 8705777490
commit 9305ac7306
1 changed files with 291 additions and 190 deletions

481
kflash.py
View File

@ -17,9 +17,31 @@ import json
import re
import os
class KFlash:
def process(self):
def tuple2str(t):
ret = ""
for i in t:
ret += i+" "
return ret
g_print_callback = None
def printf(*args, end="\n"):
global g_print_callback
if g_print_callback:
g_print_callback(*args, end=end)
else:
print(*args, end = end)
class KFlash:
def __init__(self, print_callback = None):
self.killProcess = False
self.loader = None
global g_print_callback
g_print_callback = print_callback
def process(self, terminal=True, dev="", baudrate=1500000, board="DEFAULT", sram = False, file="", callback=None, noansi=False):
self.killProcess = False
BASH_TIPS = dict(NORMAL='\033[0m',BOLD='\033[1m',DIM='\033[2m',UNDERLINE='\033[4m',
DEFAULT='\033[0m', RED='\033[31m', YELLOW='\033[33m', GREEN='\033[32m',
BG_DEFAULT='\033[49m', BG_WHITE='\033[107m')
@ -40,15 +62,16 @@ class KFlash:
try:
from enum import Enum
except ImportError:
print(ERROR_MSG,'enum34 must be installed, run '+BASH_TIPS['GREEN']+'`' + ('pip', 'pip3')[sys.version_info > (3, 0)] + ' install enum34`',BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,'enum34 must be installed, run '+BASH_TIPS['GREEN']+'`' + ('pip', 'pip3')[sys.version_info > (3, 0)] + ' install enum34`',BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
try:
import serial
import serial.tools.list_ports
except ImportError:
print(ERROR_MSG,'PySerial must be installed, run '+BASH_TIPS['GREEN']+'`' + ('pip', 'pip3')[sys.version_info > (3, 0)] + ' install pyserial`',BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,'PySerial must be installed, run '+BASH_TIPS['GREEN']+'`' + ('pip', 'pip3')[sys.version_info > (3, 0)] + ' install pyserial`',BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
class TimeoutError(Exception): pass
@ -269,7 +292,7 @@ class KFlash:
ISP_PROG = binascii.unhexlify(ISP_PROG)
ISP_PROG = zlib.decompress(ISP_PROG)
def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '='):
def printProgressBar (iteration, total, prefix = '', suffix = '', filename = '', decimals = 1, length = 100, fill = '='):
"""
Call in a loop to create terminal progress bar
@params:
@ -284,10 +307,17 @@ class KFlash:
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end = '\r')
printf('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end = '\r')
# Print New Line on Complete
if iteration == total:
print()
printf()
if callback:
fileTypeStr = filename
if prefix == "Downloading ISP:":
fileTypeStr = "ISP"
elif prefix == "Programming BIN:" and fileTypeStr == "":
fileTypeStr = "BIN"
callback(fileTypeStr, iteration, total, suffix)
def slip_reader(port):
partial_packet = None
@ -360,7 +390,7 @@ class KFlash:
if ISPResponse.ISPOperation(op) == ISPResponse.ISPOperation.ISP_DEBUG_INFO:
text = data[2:].decode()
except ValueError:
print('Warning: recv unknown op', op)
printf('Warning: recv unknown op', op)
return (op, reason, text)
@ -497,7 +527,7 @@ class KFlash:
class MAIXLoader:
def change_baudrate(self, baudrate):
print(INFO_MSG,"Selected Baudrate: ", baudrate, BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"Selected Baudrate: ", baudrate, BASH_TIPS['DEFAULT'])
out = struct.pack('III', 0, 4, baudrate)
crc32_checksum = struct.pack('I', binascii.crc32(out) & 0xFFFFFFFF)
out = struct.pack('HH', 0xd6, 0x00) + crc32_checksum + out
@ -507,7 +537,7 @@ class KFlash:
if args.Board == "goE":
if baudrate >= 4500000:
# OPENEC super baudrate
print(INFO_MSG, "Enable OPENEC super baudrate!!!", BASH_TIPS['DEFAULT'])
printf(INFO_MSG, "Enable OPENEC super baudrate!!!", BASH_TIPS['DEFAULT'])
if baudrate == 4500000:
self._port.baudrate = 300
if baudrate == 6000000:
@ -523,9 +553,9 @@ class KFlash:
# rgwan <dv.xw@qq.com>
baudrate = 1500000
if args.Board == "goE" or args.Board == "trainer":
print(INFO_MSG,"Selected Stage0 Baudrate: ", baudrate, BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"Selected Stage0 Baudrate: ", baudrate, BASH_TIPS['DEFAULT'])
# This is for openec, contained ft2232, goE and trainer
print(INFO_MSG,"FT2232 mode", BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"FT2232 mode", BASH_TIPS['DEFAULT'])
baudrate_stage0 = int(baudrate * 38.6 / 38)
out = struct.pack('III', 0, 4, baudrate_stage0)
crc32_checksum = struct.pack('I', binascii.crc32(out) & 0xFFFFFFFF)
@ -536,23 +566,25 @@ class KFlash:
retry_count = 0
while 1:
self.checkKillExit()
retry_count = retry_count + 1
if retry_count > 3:
print("\n" + ERROR_MSG,'Fast mode failed, please use slow mode by add parameter ' + BASH_TIPS['GREEN'] + '--Slow', BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,'Fast mode failed, please use slow mode by add parameter ' + BASH_TIPS['GREEN'] + '--Slow', BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
try:
loader.greeting()
self.greeting()
break
except TimeoutError:
pass
elif args.Board == "dan" or args.Board == "bit" or args.Board == "kd233":
print(INFO_MSG,"CH340 mode", BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"CH340 mode", BASH_TIPS['DEFAULT'])
# This is for CH340, contained dan, bit and kd233
baudrate_stage0 = int(baudrate * 38.4 / 38)
# CH340 can not use this method, test failed, take risks at your own risk
else:
# This is for unknown board
print(WARN_MSG,"Unknown mode", BASH_TIPS['DEFAULT'])
printf(WARN_MSG,"Unknown mode", BASH_TIPS['DEFAULT'])
def __init__(self, port='/dev/ttyUSB1', baudrate=115200):
# configure the serial connections (the parameters differs on the device you are connecting to)
@ -564,10 +596,11 @@ class KFlash:
bytesize=serial.EIGHTBITS,
timeout=0.1
)
print(INFO_MSG, "Default baudrate is", baudrate, ", later it may be changed to the value you set.", BASH_TIPS['DEFAULT'])
printf(INFO_MSG, "Default baudrate is", baudrate, ", later it may be changed to the value you set.", BASH_TIPS['DEFAULT'])
self._port.isOpen()
self._slip_reader = slip_reader(self._port)
self._kill_process = False
""" Read a SLIP packet from the serial port """
@ -580,7 +613,7 @@ class KFlash:
buf = b'\xc0' \
+ (packet.replace(b'\xdb', b'\xdb\xdd').replace(b'\xc0', b'\xdb\xdc')) \
+ b'\xc0'
#print('[WRITE]', binascii.hexlify(buf))
#printf('[WRITE]', binascii.hexlify(buf))
return self._port.write(buf)
def read_loop(self):
@ -588,7 +621,7 @@ class KFlash:
# while self._port.inWaiting() > 0:
# out += self._port.read(1)
# print(out)
# printf(out)
while 1:
sys.stdout.write('[RECV] raw data: ')
sys.stdout.write(binascii.hexlify(self._port.read(1)).decode())
@ -639,12 +672,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.1)
#print('-- RESET to LOW, IO16 to HIGH --')
#printf('-- RESET to LOW, IO16 to HIGH --')
# Pull reset down and keep 10ms
self._port.setDTR (True)
self._port.setRTS (False)
time.sleep(0.1)
#print('-- IO16 to LOW, RESET to HIGH --')
#printf('-- IO16 to LOW, RESET to HIGH --')
# Pull IO16 to low and release reset
self._port.setRTS (True)
self._port.setDTR (False)
@ -653,12 +686,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.1)
#print('-- RESET to LOW --')
#printf('-- RESET to LOW --')
# Pull reset down and keep 10ms
self._port.setDTR (True)
self._port.setRTS (False)
time.sleep(0.1)
#print('-- RESET to HIGH, BOOT --')
#printf('-- RESET to HIGH, BOOT --')
# Pull IO16 to low and release reset
self._port.setRTS (False)
self._port.setDTR (False)
@ -669,12 +702,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.1)
#print('-- RESET to LOW, IO16 to HIGH --')
#printf('-- RESET to LOW, IO16 to HIGH --')
# Pull reset down and keep 10ms
self._port.setDTR (False)
self._port.setRTS (True)
time.sleep(0.1)
#print('-- IO16 to LOW, RESET to HIGH --')
#printf('-- IO16 to LOW, RESET to HIGH --')
# Pull IO16 to low and release reset
self._port.setRTS (False)
self._port.setDTR (True)
@ -683,12 +716,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.1)
#print('-- RESET to LOW --')
#printf('-- RESET to LOW --')
# Pull reset down and keep 10ms
self._port.setDTR (False)
self._port.setRTS (True)
time.sleep(0.1)
#print('-- RESET to HIGH, BOOT --')
#printf('-- RESET to HIGH, BOOT --')
# Pull IO16 to low and release reset
self._port.setRTS (False)
self._port.setDTR (False)
@ -699,12 +732,12 @@ class KFlash:
self._port.setDTR (True) ## output 0
self._port.setRTS (True)
time.sleep(0.01)
#print('-- RESET to LOW --')
#printf('-- RESET to LOW --')
# Pull reset down and keep 10ms
self._port.setRTS (False)
self._port.setDTR (True)
time.sleep(0.01)
#print('-- RESET to HIGH, BOOT --')
#printf('-- RESET to HIGH, BOOT --')
# Pull IO16 to low and release reset
self._port.setRTS (False)
self._port.setDTR (True)
@ -713,12 +746,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.01)
#print('-- RESET to LOW --')
#printf('-- RESET to LOW --')
# Pull reset down and keep 10ms
self._port.setRTS (False)
self._port.setDTR (True)
time.sleep(0.01)
#print('-- RESET to HIGH, BOOT --')
#printf('-- RESET to HIGH, BOOT --')
# Pull IO16 to low and release reset
self._port.setRTS (True)
self._port.setDTR (True)
@ -729,12 +762,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.01)
#print('-- RESET to LOW --')
#printf('-- RESET to LOW --')
# Pull reset down and keep 10ms
self._port.setRTS (False)
self._port.setDTR (True)
time.sleep(0.01)
#print('-- RESET to HIGH, BOOT --')
#printf('-- RESET to HIGH, BOOT --')
# Pull IO16 to low and release reset
self._port.setRTS (False)
self._port.setDTR (False)
@ -744,54 +777,59 @@ class KFlash:
self._port.write(b'\xc0\xc2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xc0')
op, reason, text = ISPResponse.parse(self.recv_one_return())
#print('MAIX return op:', ISPResponse.ISPOperation(op).name, 'reason:', ISPResponse.ErrorCode(reason).name)
#printf('MAIX return op:', ISPResponse.ISPOperation(op).name, 'reason:', ISPResponse.ErrorCode(reason).name)
def flash_greeting(self):
retry_count = 0
while 1:
self.checkKillExit()
self._port.write(b'\xc0\xd2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xc0')
retry_count = retry_count + 1
try:
op, reason, text = FlashModeResponse.parse(self.recv_one_return())
except IndexError:
if retry_count > MAX_RETRY_TIMES:
print(ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
sys.exit(1)
print(WARN_MSG,"Index Error, retrying...",BASH_TIPS['DEFAULT'])
err = (ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Index Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
except TimeoutError:
if retry_count > MAX_RETRY_TIMES:
print(ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
sys.exit(1)
print(WARN_MSG,"Timeout Error, retrying...",BASH_TIPS['DEFAULT'])
err = (ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Timeout Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
except:
if retry_count > MAX_RETRY_TIMES:
print(ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
sys.exit(1)
print(WARN_MSG,"Unexcepted Error, retrying...",BASH_TIPS['DEFAULT'])
err = (ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Unexcepted Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
# print('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
# printf('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
# FlashModeResponse.ErrorCode(reason).name)
if FlashModeResponse.Operation(op) == FlashModeResponse.Operation.ISP_NOP and FlashModeResponse.ErrorCode(reason) == FlashModeResponse.ErrorCode.ISP_RET_OK:
print(INFO_MSG,"Boot to Flashmode Successfully",BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"Boot to Flashmode Successfully",BASH_TIPS['DEFAULT'])
self._port.flushInput()
self._port.flushOutput()
break
else:
if retry_count > MAX_RETRY_TIMES:
print(ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
sys.exit(1)
print(WARN_MSG,"Unexcepted Return recevied, retrying...",BASH_TIPS['DEFAULT'])
err = (ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Unexcepted Return recevied, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
def boot(self, address=0x80000000):
print(INFO_MSG,"Booting From " + hex(address),BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"Booting From " + hex(address),BASH_TIPS['DEFAULT'])
out = struct.pack('II', address, 0)
@ -802,94 +840,101 @@ class KFlash:
def recv_debug(self):
op, reason, text = ISPResponse.parse(self.recv_one_return())
#print('[RECV] op:', ISPResponse.ISPOperation(op).name, 'reason:', ISPResponse.ErrorCode(reason).name)
#printf('[RECV] op:', ISPResponse.ISPOperation(op).name, 'reason:', ISPResponse.ErrorCode(reason).name)
if text:
print('-' * 30)
print(text)
print('-' * 30)
printf('-' * 30)
printf(text)
printf('-' * 30)
if ISPResponse.ErrorCode(reason) not in (ISPResponse.ErrorCode.ISP_RET_DEFAULT, ISPResponse.ErrorCode.ISP_RET_OK):
print('Failed, retry, errcode=', hex(reason))
printf('Failed, retry, errcode=', hex(reason))
return False
return True
def flash_recv_debug(self):
op, reason, text = FlashModeResponse.parse(self.recv_one_return())
#print('[Flash-RECV] op:', FlashModeResponse.Operation(op).name, 'reason:',
#printf('[Flash-RECV] op:', FlashModeResponse.Operation(op).name, 'reason:',
# FlashModeResponse.ErrorCode(reason).name)
if text:
print('-' * 30)
print(text)
print('-' * 30)
printf('-' * 30)
printf(text)
printf('-' * 30)
if FlashModeResponse.ErrorCode(reason) not in (FlashModeResponse.ErrorCode.ISP_RET_OK, FlashModeResponse.ErrorCode.ISP_RET_OK):
print('Failed, retry')
printf('Failed, retry')
return False
return True
def init_flash(self, chip_type):
chip_type = int(chip_type)
print(INFO_MSG,"Selected Flash: ",("In-Chip", "On-Board")[chip_type],BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"Selected Flash: ",("In-Chip", "On-Board")[chip_type],BASH_TIPS['DEFAULT'])
out = struct.pack('II', chip_type, 0)
crc32_checksum = struct.pack('I', binascii.crc32(out) & 0xFFFFFFFF)
out = struct.pack('HH', 0xd7, 0x00) + crc32_checksum + out
'''Retry when it have error'''
retry_count = 0
while 1:
self.checkKillExit()
sent = self.write(out)
retry_count = retry_count + 1
try:
op, reason, text = FlashModeResponse.parse(self.recv_one_return())
except IndexError:
if retry_count > MAX_RETRY_TIMES:
print(ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
sys.exit(1)
print(WARN_MSG,"Index Error, retrying...",BASH_TIPS['DEFAULT'])
err = (ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Index Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
except TimeoutError:
if retry_count > MAX_RETRY_TIMES:
print(ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
sys.exit(1)
print(WARN_MSG,"Timeout Error, retrying...",BASH_TIPS['DEFAULT'])
err = (ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Timeout Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
except:
if retry_count > MAX_RETRY_TIMES:
print(ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
sys.exit(1)
print(WARN_MSG,"Unexcepted Error, retrying...",BASH_TIPS['DEFAULT'])
err = (ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Unexcepted Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
# print('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
# printf('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
# FlashModeResponse.ErrorCode(reason).name)
if FlashModeResponse.Operation(op) == FlashModeResponse.Operation.FLASHMODE_FLASH_INIT and FlashModeResponse.ErrorCode(reason) == FlashModeResponse.ErrorCode.ISP_RET_OK:
print(INFO_MSG,"Initialization flash Successfully",BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"Initialization flash Successfully",BASH_TIPS['DEFAULT'])
break
else:
if retry_count > MAX_RETRY_TIMES:
print(ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
sys.exit(1)
print(WARN_MSG,"Unexcepted Return recevied, retrying...",BASH_TIPS['DEFAULT'])
err = (ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Unexcepted Return recevied, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
def flash_dataframe(self, data, address=0x80000000):
DATAFRAME_SIZE = 1024
data_chunks = chunks(data, DATAFRAME_SIZE)
#print('[DEBUG] flash dataframe | data length:', len(data))
#printf('[DEBUG] flash dataframe | data length:', len(data))
total_chunk = math.ceil(len(data)/DATAFRAME_SIZE)
time_start = time.time()
for n, chunk in enumerate(data_chunks):
self.checkKillExit()
while 1:
#print('[INFO] sending chunk', i, '@address', hex(address), 'chunklen', len(chunk))
self.checkKillExit()
#printf('[INFO] sending chunk', i, '@address', hex(address), 'chunklen', len(chunk))
out = struct.pack('II', address, len(chunk))
crc32_checksum = struct.pack('I', binascii.crc32(out + chunk) & 0xFFFFFFFF)
out = struct.pack('HH', 0xc3, 0x00) + crc32_checksum + out + chunk # op: ISP_MEMORY_WRITE: 0xc3
sent = self.write(out)
#print('[INFO]', 'sent', sent, 'bytes', 'checksum', binascii.hexlify(crc32_checksum).decode())
#printf('[INFO]', 'sent', sent, 'bytes', 'checksum', binascii.hexlify(crc32_checksum).decode())
address += len(chunk)
@ -916,29 +961,30 @@ class KFlash:
DATAFRAME_SIZE = ISP_FLASH_DATA_FRAME_SIZE
data_chunks = chunks(data, DATAFRAME_SIZE)
#print('[DEBUG] flash dataframe | data length:', len(data))
#printf('[DEBUG] flash dataframe | data length:', len(data))
for n, chunk in enumerate(data_chunks):
#print('[INFO] sending chunk', i, '@address', hex(address))
#printf('[INFO] sending chunk', i, '@address', hex(address))
out = struct.pack('II', address, len(chunk))
crc32_checksum = struct.pack('I', binascii.crc32(out + chunk) & 0xFFFFFFFF)
out = struct.pack('HH', 0xd4, 0x00) + crc32_checksum + out + chunk
#print("[$$$$]", binascii.hexlify(out[:32]).decode())
#printf("[$$$$]", binascii.hexlify(out[:32]).decode())
retry_count = 0
while True:
try:
sent = self.write(out)
#print('[INFO]', 'sent', sent, 'bytes', 'checksum', crc32_checksum)
#printf('[INFO]', 'sent', sent, 'bytes', 'checksum', crc32_checksum)
self.flash_recv_debug()
except:
retry_count = retry_count + 1
if retry_count > MAX_RETRY_TIMES:
print(ERROR_MSG,"Error Count Exceeded, Stop Trying",BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,"Error Count Exceeded, Stop Trying",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
continue
break
address += len(chunk)
@ -946,10 +992,10 @@ class KFlash:
def flash_erase(self):
#print('[DEBUG] erasing spi flash.')
#printf('[DEBUG] erasing spi flash.')
self._port.write(b'\xc0\xd3\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xc0')
op, reason, text = FlashModeResponse.parse(self.recv_one_return())
#print('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
#printf('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
# FlashModeResponse.ErrorCode(reason).name)
def install_flash_bootloader(self, data):
@ -961,28 +1007,29 @@ class KFlash:
from elftools.elf.elffile import ELFFile
from elftools.elf.descriptions import describe_p_type
except ImportError:
print(ERROR_MSG,'pyelftools must be installed, run '+BASH_TIPS['GREEN']+'`' + ('pip', 'pip3')[sys.version_info > (3, 0)] + ' install pyelftools`',BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,'pyelftools must be installed, run '+BASH_TIPS['GREEN']+'`' + ('pip', 'pip3')[sys.version_info > (3, 0)] + ' install pyelftools`',BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
elffile = ELFFile(f)
if elffile['e_entry'] != 0x80000000:
print(WARN_MSG,"ELF entry is 0x%x instead of 0x80000000" % (elffile['e_entry']), BASH_TIPS['DEFAULT'])
printf(WARN_MSG,"ELF entry is 0x%x instead of 0x80000000" % (elffile['e_entry']), BASH_TIPS['DEFAULT'])
for segment in elffile.iter_segments():
t = describe_p_type(segment['p_type'])
print(INFO_MSG, ("Program Header: Size: %d, Virtual Address: 0x%x, Type: %s" % (segment['p_filesz'], segment['p_vaddr'], t)), BASH_TIPS['DEFAULT'])
printf(INFO_MSG, ("Program Header: Size: %d, Virtual Address: 0x%x, Type: %s" % (segment['p_filesz'], segment['p_vaddr'], t)), BASH_TIPS['DEFAULT'])
if not (segment['p_vaddr'] & 0x80000000):
continue
if segment['p_filesz']==0 or segment['p_vaddr']==0:
print("Skipped")
printf("Skipped")
continue
self.flash_dataframe(segment.data(), segment['p_vaddr'])
def flash_firmware(self, firmware_bin, aes_key = None, address_offset = 0, sha256Prefix = True):
def flash_firmware(self, firmware_bin, aes_key = None, address_offset = 0, sha256Prefix = True, filename = ""):
# type: (bytes, bytes, int, bool) -> None
# Don't remove above code!
#print('[DEBUG] flash_firmware DEBUG: aeskey=', aes_key)
#printf('[DEBUG] flash_firmware DEBUG: aeskey=', aes_key)
if sha256Prefix == True:
# Add header to the firmware
@ -1012,17 +1059,34 @@ class KFlash:
time_start = time.time()
for n, chunk in enumerate(data_chunks):
self.checkKillExit()
chunk = chunk.ljust(ISP_FLASH_DATA_FRAME_SIZE, b'\x00') # align by size of dataframe
# Download a dataframe
#print('[INFO]', 'Write firmware data piece')
#printf('[INFO]', 'Write firmware data piece')
self.dump_to_flash(chunk, address= n * ISP_FLASH_DATA_FRAME_SIZE + address_offset)
columns, lines = get_terminal_size()
time_delta = time.time() - time_start
speed = ''
if (time_delta > 1):
speed = str(int((n + 1) * ISP_FLASH_DATA_FRAME_SIZE / 1024.0 / time_delta)) + 'kiB/s'
printProgressBar(n+1, total_chunk, prefix = 'Programming BIN:', suffix = speed, length = columns - 35)
printProgressBar(n+1, total_chunk, prefix = 'Programming BIN:', filename=filename, suffix = speed, length = columns - 35)
def kill(self):
self._kill_process = True
def checkKillExit(self):
if self._kill_process:
self._port.close()
self._kill_process = False
raise Exception("Cancel")
def printf(self, *args):
if self.print_callback:
self.print_callback(args)
else:
printf(*args)
def open_terminal(reset):
control_signal = '0' if reset else '1'
@ -1033,7 +1097,7 @@ class KFlash:
sys.argv = [sys.argv[0], _port, '115200', '--dtr='+control_signal, '--rts='+control_signal, '--filter=direct']
serial.tools.miniterm.main(default_port=_port, default_baudrate=115200, default_dtr=control_signal_b, default_rts=control_signal_b)
sys.exit(0)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", help="COM Port", default="DEFAULT")
parser.add_argument("-f", "--flash", help="SPI Flash type, 0 for SPI3, 1 for SPI0", default=1)
@ -1047,10 +1111,20 @@ class KFlash:
parser.add_argument("-B", "--Board",required=False, type=str, help="Select dev board, e.g. kd233, dan, bit, goD, goE or trainer")
parser.add_argument("-S", "--Slow",required=False, help="Slow download mode", default=False)
parser.add_argument("firmware", help="firmware bin path")
if terminal:
parser.add_argument("firmware", help="firmware bin path")
args = parser.parse_args()
# udpate args for none terminal call
if not terminal:
args.port = dev
args.baudrate = baudrate
args.noansi = noansi
args.sram = sram
args.Board = board
args.firmware = file
if (args.noansi == True):
BASH_TIPS = dict(NORMAL='',BOLD='',DIM='',UNDERLINE='',
DEFAULT='', RED='', YELLOW='', GREEN='',
@ -1058,7 +1132,7 @@ class KFlash:
ERROR_MSG = BASH_TIPS['RED']+BASH_TIPS['BOLD']+'[ERROR]'+BASH_TIPS['NORMAL']
WARN_MSG = BASH_TIPS['YELLOW']+BASH_TIPS['BOLD']+'[WARN]'+BASH_TIPS['NORMAL']
INFO_MSG = BASH_TIPS['GREEN']+BASH_TIPS['BOLD']+'[INFO]'+BASH_TIPS['NORMAL']
print(INFO_MSG,'ANSI colors not used',BASH_TIPS['DEFAULT'])
printf(INFO_MSG,'ANSI colors not used',BASH_TIPS['DEFAULT'])
manually_set_the_board = False
if args.Board:
@ -1068,50 +1142,54 @@ class KFlash:
if args.Board == "goE":
list_port_info = list(serial.tools.list_ports.grep("0403")) #Take the second one
if len(list_port_info) == 0:
print(ERROR_MSG,"No vaild COM Port found in Auto Detect, Check Your Connection or Specify One by"+BASH_TIPS['GREEN']+'`--port/-p`',BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,"No vaild COM Port found in Auto Detect, Check Your Connection or Specify One by"+BASH_TIPS['GREEN']+'`--port/-p`',BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
list_port_info.sort()
if len(list_port_info) == 1:
_port = list_port_info[0].device
elif len(list_port_info) > 1:
_port = list_port_info[1].device
print(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
elif args.Board == "trainer":
list_port_info = list(serial.tools.list_ports.grep("0403")) #Take the first one
if(len(list_port_info)==0):
print(ERROR_MSG,"No vaild COM Port found in Auto Detect, Check Your Connection or Specify One by"+BASH_TIPS['GREEN']+'`--port/-p`',BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,"No vaild COM Port found in Auto Detect, Check Your Connection or Specify One by"+BASH_TIPS['GREEN']+'`--port/-p`',BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
list_port_info.sort()
_port = list_port_info[0].device
print(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
else:
try:
list_port_info = next(serial.tools.list_ports.grep(VID_LIST_FOR_AUTO_LOOKUP)) #Take the first one within the list
_port = list_port_info.device
print(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
except StopIteration:
print(ERROR_MSG,"No vaild COM Port found in Auto Detect, Check Your Connection or Specify One by"+BASH_TIPS['GREEN']+'`--port/-p`',BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,"No vaild COM Port found in Auto Detect, Check Your Connection or Specify One by"+BASH_TIPS['GREEN']+'`--port/-p`',BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
else:
_port = args.port
print(INFO_MSG,"COM Port Selected Manually: ", _port, BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"COM Port Selected Manually: ", _port, BASH_TIPS['DEFAULT'])
loader = MAIXLoader(port=_port, baudrate=115200)
self.loader = MAIXLoader(port=_port, baudrate=115200)
file_format = ProgramFileFormat.FMT_BINARY
# 0. Check firmware
try:
firmware_bin = open(args.firmware, 'rb')
except FileNotFoundError:
print(ERROR_MSG,'Unable to find the firmware at ', args.firmware, BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,'Unable to find the firmware at ', args.firmware, BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
with open(args.firmware, 'rb') as f:
file_header = f.read(4)
#if file_header.startswith(bytes([0x50, 0x4B])):
if file_header.startswith(b'\x50\x4B'):
if ".kfpkg" != os.path.splitext(args.firmware)[1]:
print(INFO_MSG, 'Find a zip file, but not with ext .kfpkg:', args.firmware, BASH_TIPS['DEFAULT'])
printf(INFO_MSG, 'Find a zip file, but not with ext .kfpkg:', args.firmware, BASH_TIPS['DEFAULT'])
else:
file_format = ProgramFileFormat.FMT_KFPKG
@ -1119,93 +1197,95 @@ class KFlash:
if file_header.startswith(b'\x7f\x45\x4c\x46'):
file_format = ProgramFileFormat.FMT_ELF
if args.sram:
print(INFO_MSG, 'Find an ELF file:', args.firmware, BASH_TIPS['DEFAULT'])
printf(INFO_MSG, 'Find an ELF file:', args.firmware, BASH_TIPS['DEFAULT'])
else:
print(ERROR_MSG, 'This is an ELF file and cannot be programmed to flash directly:', args.firmware, BASH_TIPS['DEFAULT'])
print(ERROR_MSG, 'Please retry:', args.firmware + '.bin', BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG, 'This is an ELF file and cannot be programmed to flash directly:', args.firmware, BASH_TIPS['DEFAULT'] , '\r\nPlease retry:', args.firmware + '.bin', BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
# 1. Greeting.
print(INFO_MSG,"Trying to Enter the ISP Mode...",BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"Trying to Enter the ISP Mode...",BASH_TIPS['DEFAULT'])
retry_count = 0
while 1:
self.checkKillExit()
retry_count = retry_count + 1
if retry_count > 15:
print("\n" + ERROR_MSG,"No vaild Kendryte K210 found in Auto Detect, Check Your Connection or Specify One by"+BASH_TIPS['GREEN']+'`-p '+('/dev/ttyUSB0', 'COM3')[sys.platform == 'win32']+'`',BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,"No vaild Kendryte K210 found in Auto Detect, Check Your Connection or Specify One by"+BASH_TIPS['GREEN']+'`-p '+('/dev/ttyUSB0', 'COM3')[sys.platform == 'win32']+'`',BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
if args.Board == "dan" or args.Board == "bit" or args.Board == "trainer":
try:
print('.', end='')
loader.reset_to_isp_dan()
loader.greeting()
printf('.', end='')
self.loader.reset_to_isp_dan()
self.loader.greeting()
break
except TimeoutError:
pass
elif args.Board == "kd233":
try:
print('_', end='')
loader.reset_to_isp_kd233()
loader.greeting()
printf('_', end='')
self.loader.reset_to_isp_kd233()
self.loader.greeting()
break
except TimeoutError:
pass
elif args.Board == "goE":
try:
print('*', end='')
loader.reset_to_isp_kd233()
loader.greeting()
printf('*', end='')
self.loader.reset_to_isp_kd233()
self.loader.greeting()
break
except TimeoutError:
pass
elif args.Board == "goD":
try:
print('#', end='')
loader.reset_to_isp_goD()
loader.greeting()
printf('#', end='')
self.loader.reset_to_isp_goD()
self.loader.greeting()
break
except TimeoutError:
pass
else:
try:
print('.', end='')
loader.reset_to_isp_dan()
loader.greeting()
printf('.', end='')
self.loader.reset_to_isp_dan()
self.loader.greeting()
args.Board = "dan"
print()
print(INFO_MSG,"Automatically detected dan/bit/trainer",BASH_TIPS['DEFAULT'])
printf()
printf(INFO_MSG,"Automatically detected dan/bit/trainer",BASH_TIPS['DEFAULT'])
break
except TimeoutError:
pass
try:
print('_', end='')
loader.reset_to_isp_kd233()
loader.greeting()
printf('_', end='')
self.loader.reset_to_isp_kd233()
self.loader.greeting()
args.Board = "kd233"
print()
print(INFO_MSG,"Automatically detected goE/kd233",BASH_TIPS['DEFAULT'])
printf()
printf(INFO_MSG,"Automatically detected goE/kd233",BASH_TIPS['DEFAULT'])
break
except TimeoutError:
pass
try:
print('.', end='')
loader.reset_to_isp_goD()
loader.greeting()
printf('.', end='')
self.loader.reset_to_isp_goD()
self.loader.greeting()
args.Board = "goD"
print()
print(INFO_MSG,"Automatically detected goD",BASH_TIPS['DEFAULT'])
printf()
printf(INFO_MSG,"Automatically detected goD",BASH_TIPS['DEFAULT'])
break
except TimeoutError:
pass
try:
# Magic, just repeat, don't remove, it may unstable, don't know why.
print('_', end='')
loader.reset_to_isp_kd233()
loader.greeting()
printf('_', end='')
self.loader.reset_to_isp_kd233()
self.loader.greeting()
args.Board = "kd233"
print()
print(INFO_MSG,"Automatically detected goE/kd233",BASH_TIPS['DEFAULT'])
printf()
printf(INFO_MSG,"Automatically detected goE/kd233",BASH_TIPS['DEFAULT'])
break
except TimeoutError:
pass
@ -1214,107 +1294,128 @@ class KFlash:
# Dangerous, here are dinosaur infested!!!!!
ISP_RECEIVE_TIMEOUT = 3
print()
print(INFO_MSG,"Greeting Message Detected, Start Downloading ISP",BASH_TIPS['DEFAULT'])
printf()
printf(INFO_MSG,"Greeting Message Detected, Start Downloading ISP",BASH_TIPS['DEFAULT'])
if manually_set_the_board and (not args.Slow):
if (args.baudrate >= 1500000) or args.sram:
loader.change_baudrate_stage0(args.baudrate)
self.loader.change_baudrate_stage0(args.baudrate)
# 2. download bootloader and firmware
if args.sram:
if file_format == ProgramFileFormat.FMT_KFPKG:
print(ERROR_MSG, "Unable to load kfpkg to SRAM")
sys.exit(1)
err = (ERROR_MSG, "Unable to load kfpkg to SRAM")
err = tuple2str(err)
raise Exception(err)
elif file_format == ProgramFileFormat.FMT_ELF:
loader.load_elf_to_sram(firmware_bin)
self.loader.load_elf_to_sram(firmware_bin)
else:
loader.install_flash_bootloader(firmware_bin.read())
self.loader.install_flash_bootloader(firmware_bin.read())
else:
# install bootloader at 0x80000000
isp_loader = open(args.bootloader, 'rb').read() if args.bootloader else ISP_PROG
loader.install_flash_bootloader(isp_loader)
self.loader.install_flash_bootloader(isp_loader)
# Boot the code from SRAM
loader.boot()
self.loader.boot()
if args.sram:
# Dangerous, here are dinosaur infested!!!!!
# Don't touch this code unless you know what you are doing
loader._port.baudrate = args.baudrate
print(INFO_MSG,"Boot user code from SRAM", BASH_TIPS['DEFAULT'])
self.loader._port.baudrate = args.baudrate
printf(INFO_MSG,"Boot user code from SRAM", BASH_TIPS['DEFAULT'])
if(args.terminal == True):
open_terminal(False)
exit(0)
msg = "Burn SRAM OK"
raise Exception(msg)
# Dangerous, here are dinosaur infested!!!!!
# Don't touch this code unless you know what you are doing
loader._port.baudrate = 115200
self.loader._port.baudrate = 115200
print(INFO_MSG,"Wait For 0.1 second for ISP to Boot", BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"Wait For 0.1 second for ISP to Boot", BASH_TIPS['DEFAULT'])
time.sleep(0.1)
loader.flash_greeting()
self.loader.flash_greeting()
if args.baudrate != 115200:
loader.change_baudrate(args.baudrate)
print(INFO_MSG,"Baudrate changed, greeting with ISP again ... ", BASH_TIPS['DEFAULT'])
loader.flash_greeting()
self.loader.change_baudrate(args.baudrate)
printf(INFO_MSG,"Baudrate changed, greeting with ISP again ... ", BASH_TIPS['DEFAULT'])
self.loader.flash_greeting()
loader.init_flash(args.flash)
self.loader.init_flash(args.flash)
if file_format == ProgramFileFormat.FMT_KFPKG:
print(INFO_MSG,"Extracting KFPKG ... ", BASH_TIPS['DEFAULT'])
printf(INFO_MSG,"Extracting KFPKG ... ", BASH_TIPS['DEFAULT'])
firmware_bin.close()
with tempfile.TemporaryDirectory() as tmpdir:
try:
with zipfile.ZipFile(args.firmware) as zf:
zf.extractall(tmpdir)
except zipfile.BadZipFile:
print(ERROR_MSG,'Unable to Decompress the kfpkg, your file might be corrupted.',BASH_TIPS['DEFAULT'])
sys.exit(1)
err = (ERROR_MSG,'Unable to Decompress the kfpkg, your file might be corrupted.',BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
fFlashList = open(os.path.join(tmpdir, 'flash-list.json'), "r")
sFlashList = re.sub(r'"address": (.*),', r'"address": "\1",', fFlashList.read()) #Pack the Hex Number in json into str
fFlashList.close()
jsonFlashList = json.loads(sFlashList)
for lBinFiles in jsonFlashList['files']:
print(INFO_MSG,"Writing",lBinFiles['bin'],"into","0x%08x"%int(lBinFiles['address'], 0),BASH_TIPS['DEFAULT'])
firmware_bin = open(os.path.join(tmpdir, lBinFiles["bin"]), "rb")
loader.flash_firmware(firmware_bin.read(), None, int(lBinFiles['address'], 0), lBinFiles['sha256Prefix'])
firmware_bin.close()
self.checkKillExit()
printf(INFO_MSG,"Writing",lBinFiles['bin'],"into","0x%08x"%int(lBinFiles['address'], 0),BASH_TIPS['DEFAULT'])
with open(os.path.join(tmpdir, lBinFiles["bin"]), "rb") as firmware_bin:
self.loader.flash_firmware(firmware_bin.read(), None, int(lBinFiles['address'], 0), lBinFiles['sha256Prefix'], filename=lBinFiles['bin'])
else:
if args.key:
aes_key = binascii.a2b_hex(args.key)
if len(aes_key) != 16:
raise ValueError('AES key must by 16 bytes')
loader.flash_firmware(firmware_bin.read(), aes_key=aes_key)
self.loader.flash_firmware(firmware_bin.read(), aes_key=aes_key)
else:
loader.flash_firmware(firmware_bin.read())
self.loader.flash_firmware(firmware_bin.read())
# 3. boot
if args.Board == "dan" or args.Board == "bit" or args.Board == "trainer":
loader.reset_to_boot_dan()
self.loader.reset_to_boot_dan()
elif args.Board == "kd233":
loader.reset_to_boot_kd233()
self.loader.reset_to_boot_kd233()
elif args.Board == "goE":
loader.reset_to_boot_maixgo()
self.loader.reset_to_boot_maixgo()
elif args.Board == "goD":
loader.reset_to_boot_goD()
self.loader.reset_to_boot_goD()
else:
print(WARN_MSG,"Board unknown !! please press reset to boot!!")
printf(WARN_MSG,"Board unknown !! please press reset to boot!!")
print(INFO_MSG,"Rebooting...", BASH_TIPS['DEFAULT'])
loader._port.close()
printf(INFO_MSG,"Rebooting...", BASH_TIPS['DEFAULT'])
self.loader._port.close()
if(args.terminal == True):
open_terminal(True)
def kill(self):
if self.loader:
self.loader.kill()
self.killProcess = True
def checkKillExit(self):
if self.killProcess:
if self.loader:
self.loader._port.close()
raise Exception("Cancel")
def main():
kflash = KFlash()
kflash.process()
try:
kflash.process()
except Exception as e:
if str(e) == "Burn SRAM OK":
sys.exit(0)
print(str(e))
sys.exit(1)
if __name__ == '__main__':
main()