Refactor the code to support python2

* Move tuple2str as internal function
* Refactor printf function to support variable parameter(kwargs)
* Packaged getTerminalSize functions
* Add KFlash.log method

Signed-off-by: Huang Rui <vowstar@gmail.com>
master v0.9.0
Huang Rui 2019-07-04 10:48:58 +08:00
parent e58a6a4d64
commit 5c1bffaed7
1 changed files with 202 additions and 205 deletions

407
kflash.py
View File

@ -17,31 +17,24 @@ import json
import re
import os
def tuple2str(t):
ret = ""
for i in t:
ret += i+" "
return ret
g_print_callback = None
def printf(*args, end="\n"):
global g_print_callback
if g_print_callback:
g_print_callback(*args, end=end)
else:
print(*args, end = end)
class KFlash:
PRINT_CALLBACK = None
def __init__(self, print_callback = None):
self.killProcess = False
self.loader = None
global g_print_callback
g_print_callback = print_callback
self.PRINT_CALLBACK = print_callback
@staticmethod
def log(*args, **kwargs):
if KFlash.PRINT_CALLBACK:
KFlash.PRINT_CALLBACK(*args, **kwargs)
else:
print(*args, **kwargs)
def process(self, terminal=True, dev="", baudrate=1500000, board="DEFAULT", sram = False, file="", callback=None, noansi=False):
self.killProcess = False
BASH_TIPS = dict(NORMAL='\033[0m',BOLD='\033[1m',DIM='\033[2m',UNDERLINE='\033[4m',
DEFAULT='\033[0m', RED='\033[31m', YELLOW='\033[33m', GREEN='\033[32m',
BG_DEFAULT='\033[49m', BG_WHITE='\033[107m')
@ -59,6 +52,12 @@ class KFlash:
ISP_FLASH_SECTOR_SIZE = 4096
ISP_FLASH_DATA_FRAME_SIZE = ISP_FLASH_SECTOR_SIZE * 16
def tuple2str(t):
ret = ""
for i in t:
ret += i+" "
return ret
try:
from enum import Enum
except ImportError:
@ -307,10 +306,10 @@ class KFlash:
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
printf('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end = '\r')
KFlash.log('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end = '\r')
# Print New Line on Complete
if iteration == total:
printf()
KFlash.log()
if callback:
fileTypeStr = filename
if prefix == "Downloading ISP:":
@ -390,7 +389,7 @@ class KFlash:
if ISPResponse.ISPOperation(op) == ISPResponse.ISPOperation.ISP_DEBUG_INFO:
text = data[2:].decode()
except ValueError:
printf('Warning: recv unknown op', op)
KFlash.log('Warning: recv unknown op', op)
return (op, reason, text)
@ -438,97 +437,102 @@ class KFlash:
for i in range(0, len(l), n):
yield l[i:i + n]
def getTerminalSize():
import platform
current_os = platform.system()
tuple_xy=None
if current_os == 'Windows':
tuple_xy = _getTerminalSize_windows()
if tuple_xy is None:
tuple_xy = _getTerminalSize_tput()
# needed for window's python in cygwin's xterm!
if current_os == 'Linux' or current_os == 'Darwin' or current_os.startswith('CYGWIN'):
tuple_xy = _getTerminalSize_linux()
if tuple_xy is None:
# Use default value
tuple_xy = (80, 25) # default value
return tuple_xy
class TerminalSize:
@staticmethod
def getTerminalSize():
import platform
current_os = platform.system()
tuple_xy=None
if current_os == 'Windows':
tuple_xy = TerminalSize._getTerminalSize_windows()
if tuple_xy is None:
tuple_xy = TerminalSize._getTerminalSize_tput()
# needed for window's python in cygwin's xterm!
if current_os == 'Linux' or current_os == 'Darwin' or current_os.startswith('CYGWIN'):
tuple_xy = TerminalSize._getTerminalSize_linux()
if tuple_xy is None:
# Use default value
tuple_xy = (80, 25) # default value
return tuple_xy
def _getTerminalSize_windows():
res=None
try:
from ctypes import windll, create_string_buffer
# stdin handle is -10
# stdout handle is -11
# stderr handle is -12
h = windll.kernel32.GetStdHandle(-12)
csbi = create_string_buffer(22)
res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
except:
return None
if res:
import struct
(bufx, bufy, curx, cury, wattr,
left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
sizex = right - left + 1
sizey = bottom - top + 1
return sizex, sizey
else:
return None
def _getTerminalSize_tput():
# get terminal width
# src: http://stackoverflow.com/questions/263890/how-do-i-find-the-width-height-of-a-terminal-window
try:
import subprocess
proc=subprocess.Popen(["tput", "cols"],stdin=subprocess.PIPE,stdout=subprocess.PIPE)
output=proc.communicate(input=None)
cols=int(output[0])
proc=subprocess.Popen(["tput", "lines"],stdin=subprocess.PIPE,stdout=subprocess.PIPE)
output=proc.communicate(input=None)
rows=int(output[0])
return (cols,rows)
except:
return None
def _getTerminalSize_linux():
def ioctl_GWINSZ(fd):
@staticmethod
def _getTerminalSize_windows():
res=None
try:
import fcntl, termios, struct, os
cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,'1234'))
from ctypes import windll, create_string_buffer
# stdin handle is -10
# stdout handle is -11
# stderr handle is -12
h = windll.kernel32.GetStdHandle(-12)
csbi = create_string_buffer(22)
res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
except:
return None
return cr
cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
if not cr:
if res:
import struct
(bufx, bufy, curx, cury, wattr,
left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
sizex = right - left + 1
sizey = bottom - top + 1
return sizex, sizey
else:
return None
@staticmethod
def _getTerminalSize_tput():
# get terminal width
# src: http://stackoverflow.com/questions/263890/how-do-i-find-the-width-height-of-a-terminal-window
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
cr = ioctl_GWINSZ(fd)
os.close(fd)
except:
pass
if not cr:
try:
cr = (os.env['LINES'], os.env['COLUMNS'])
import subprocess
proc=subprocess.Popen(["tput", "cols"],stdin=subprocess.PIPE,stdout=subprocess.PIPE)
output=proc.communicate(input=None)
cols=int(output[0])
proc=subprocess.Popen(["tput", "lines"],stdin=subprocess.PIPE,stdout=subprocess.PIPE)
output=proc.communicate(input=None)
rows=int(output[0])
return (cols,rows)
except:
return None
return int(cr[1]), int(cr[0])
def get_terminal_size(fallback=(100, 24)):
try:
columns, rows = getTerminalSize()
except:
columns, rows = fallback
if not terminal:
columns, rows = (50,1)
return columns, rows
@staticmethod
def _getTerminalSize_linux():
def ioctl_GWINSZ(fd):
try:
import fcntl, termios, struct, os
cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,'1234'))
except:
return None
return cr
cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
if not cr:
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
cr = ioctl_GWINSZ(fd)
os.close(fd)
except:
pass
if not cr:
try:
cr = (os.env['LINES'], os.env['COLUMNS'])
except:
return None
return int(cr[1]), int(cr[0])
@staticmethod
def get_terminal_size(fallback=(100, 24), terminal = False):
try:
columns, rows = TerminalSize.getTerminalSize()
except:
columns, rows = fallback
if not terminal:
columns, rows = (50,1)
return columns, rows
class MAIXLoader:
def change_baudrate(self, baudrate):
printf(INFO_MSG,"Selected Baudrate: ", baudrate, BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Selected Baudrate: ", baudrate, BASH_TIPS['DEFAULT'])
out = struct.pack('III', 0, 4, baudrate)
crc32_checksum = struct.pack('I', binascii.crc32(out) & 0xFFFFFFFF)
out = struct.pack('HH', 0xd6, 0x00) + crc32_checksum + out
@ -538,7 +542,7 @@ class KFlash:
if args.Board == "goE":
if baudrate >= 4500000:
# OPENEC super baudrate
printf(INFO_MSG, "Enable OPENEC super baudrate!!!", BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG, "Enable OPENEC super baudrate!!!", BASH_TIPS['DEFAULT'])
if baudrate == 4500000:
self._port.baudrate = 300
if baudrate == 6000000:
@ -554,9 +558,9 @@ class KFlash:
# rgwan <dv.xw@qq.com>
baudrate = 1500000
if args.Board == "goE" or args.Board == "trainer":
printf(INFO_MSG,"Selected Stage0 Baudrate: ", baudrate, BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Selected Stage0 Baudrate: ", baudrate, BASH_TIPS['DEFAULT'])
# This is for openec, contained ft2232, goE and trainer
printf(INFO_MSG,"FT2232 mode", BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"FT2232 mode", BASH_TIPS['DEFAULT'])
baudrate_stage0 = int(baudrate * 38.6 / 38)
out = struct.pack('III', 0, 4, baudrate_stage0)
crc32_checksum = struct.pack('I', binascii.crc32(out) & 0xFFFFFFFF)
@ -579,13 +583,13 @@ class KFlash:
except TimeoutError:
pass
elif args.Board == "dan" or args.Board == "bit" or args.Board == "kd233":
printf(INFO_MSG,"CH340 mode", BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"CH340 mode", BASH_TIPS['DEFAULT'])
# This is for CH340, contained dan, bit and kd233
baudrate_stage0 = int(baudrate * 38.4 / 38)
# CH340 can not use this method, test failed, take risks at your own risk
else:
# This is for unknown board
printf(WARN_MSG,"Unknown mode", BASH_TIPS['DEFAULT'])
KFlash.log(WARN_MSG,"Unknown mode", BASH_TIPS['DEFAULT'])
def __init__(self, port='/dev/ttyUSB1', baudrate=115200):
# configure the serial connections (the parameters differs on the device you are connecting to)
@ -597,7 +601,7 @@ class KFlash:
bytesize=serial.EIGHTBITS,
timeout=0.1
)
printf(INFO_MSG, "Default baudrate is", baudrate, ", later it may be changed to the value you set.", BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG, "Default baudrate is", baudrate, ", later it may be changed to the value you set.", BASH_TIPS['DEFAULT'])
self._port.isOpen()
self._slip_reader = slip_reader(self._port)
@ -614,7 +618,7 @@ class KFlash:
buf = b'\xc0' \
+ (packet.replace(b'\xdb', b'\xdb\xdd').replace(b'\xc0', b'\xdb\xdc')) \
+ b'\xc0'
#printf('[WRITE]', binascii.hexlify(buf))
#KFlash.log('[WRITE]', binascii.hexlify(buf))
return self._port.write(buf)
def read_loop(self):
@ -622,7 +626,7 @@ class KFlash:
# while self._port.inWaiting() > 0:
# out += self._port.read(1)
# printf(out)
# KFlash.log(out)
while 1:
sys.stdout.write('[RECV] raw data: ')
sys.stdout.write(binascii.hexlify(self._port.read(1)).decode())
@ -673,12 +677,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.1)
#printf('-- RESET to LOW, IO16 to HIGH --')
#KFlash.log('-- RESET to LOW, IO16 to HIGH --')
# Pull reset down and keep 10ms
self._port.setDTR (True)
self._port.setRTS (False)
time.sleep(0.1)
#printf('-- IO16 to LOW, RESET to HIGH --')
#KFlash.log('-- IO16 to LOW, RESET to HIGH --')
# Pull IO16 to low and release reset
self._port.setRTS (True)
self._port.setDTR (False)
@ -687,12 +691,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.1)
#printf('-- RESET to LOW --')
#KFlash.log('-- RESET to LOW --')
# Pull reset down and keep 10ms
self._port.setDTR (True)
self._port.setRTS (False)
time.sleep(0.1)
#printf('-- RESET to HIGH, BOOT --')
#KFlash.log('-- RESET to HIGH, BOOT --')
# Pull IO16 to low and release reset
self._port.setRTS (False)
self._port.setDTR (False)
@ -703,12 +707,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.1)
#printf('-- RESET to LOW, IO16 to HIGH --')
#KFlash.log('-- RESET to LOW, IO16 to HIGH --')
# Pull reset down and keep 10ms
self._port.setDTR (False)
self._port.setRTS (True)
time.sleep(0.1)
#printf('-- IO16 to LOW, RESET to HIGH --')
#KFlash.log('-- IO16 to LOW, RESET to HIGH --')
# Pull IO16 to low and release reset
self._port.setRTS (False)
self._port.setDTR (True)
@ -717,12 +721,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.1)
#printf('-- RESET to LOW --')
#KFlash.log('-- RESET to LOW --')
# Pull reset down and keep 10ms
self._port.setDTR (False)
self._port.setRTS (True)
time.sleep(0.1)
#printf('-- RESET to HIGH, BOOT --')
#KFlash.log('-- RESET to HIGH, BOOT --')
# Pull IO16 to low and release reset
self._port.setRTS (False)
self._port.setDTR (False)
@ -733,12 +737,12 @@ class KFlash:
self._port.setDTR (True) ## output 0
self._port.setRTS (True)
time.sleep(0.01)
#printf('-- RESET to LOW --')
#KFlash.log('-- RESET to LOW --')
# Pull reset down and keep 10ms
self._port.setRTS (False)
self._port.setDTR (True)
time.sleep(0.01)
#printf('-- RESET to HIGH, BOOT --')
#KFlash.log('-- RESET to HIGH, BOOT --')
# Pull IO16 to low and release reset
self._port.setRTS (False)
self._port.setDTR (True)
@ -747,12 +751,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.01)
#printf('-- RESET to LOW --')
#KFlash.log('-- RESET to LOW --')
# Pull reset down and keep 10ms
self._port.setRTS (False)
self._port.setDTR (True)
time.sleep(0.01)
#printf('-- RESET to HIGH, BOOT --')
#KFlash.log('-- RESET to HIGH, BOOT --')
# Pull IO16 to low and release reset
self._port.setRTS (True)
self._port.setDTR (True)
@ -763,12 +767,12 @@ class KFlash:
self._port.setDTR (False)
self._port.setRTS (False)
time.sleep(0.01)
#printf('-- RESET to LOW --')
#KFlash.log('-- RESET to LOW --')
# Pull reset down and keep 10ms
self._port.setRTS (False)
self._port.setDTR (True)
time.sleep(0.01)
#printf('-- RESET to HIGH, BOOT --')
#KFlash.log('-- RESET to HIGH, BOOT --')
# Pull IO16 to low and release reset
self._port.setRTS (False)
self._port.setDTR (False)
@ -778,7 +782,7 @@ class KFlash:
self._port.write(b'\xc0\xc2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xc0')
op, reason, text = ISPResponse.parse(self.recv_one_return())
#printf('MAIX return op:', ISPResponse.ISPOperation(op).name, 'reason:', ISPResponse.ErrorCode(reason).name)
#KFlash.log('MAIX return op:', ISPResponse.ISPOperation(op).name, 'reason:', ISPResponse.ErrorCode(reason).name)
def flash_greeting(self):
@ -794,7 +798,7 @@ class KFlash:
err = (ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Index Error, retrying...",BASH_TIPS['DEFAULT'])
KFlash.log(WARN_MSG,"Index Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
except TimeoutError:
@ -802,7 +806,7 @@ class KFlash:
err = (ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Timeout Error, retrying...",BASH_TIPS['DEFAULT'])
KFlash.log(WARN_MSG,"Timeout Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
except:
@ -810,13 +814,13 @@ class KFlash:
err = (ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Unexcepted Error, retrying...",BASH_TIPS['DEFAULT'])
KFlash.log(WARN_MSG,"Unexcepted Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
# printf('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
# KFlash.log('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
# FlashModeResponse.ErrorCode(reason).name)
if FlashModeResponse.Operation(op) == FlashModeResponse.Operation.ISP_NOP and FlashModeResponse.ErrorCode(reason) == FlashModeResponse.ErrorCode.ISP_RET_OK:
printf(INFO_MSG,"Boot to Flashmode Successfully",BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Boot to Flashmode Successfully",BASH_TIPS['DEFAULT'])
self._port.flushInput()
self._port.flushOutput()
break
@ -825,12 +829,12 @@ class KFlash:
err = (ERROR_MSG,"Failed to Connect to K210's Stub",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Unexcepted Return recevied, retrying...",BASH_TIPS['DEFAULT'])
KFlash.log(WARN_MSG,"Unexcepted Return recevied, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
def boot(self, address=0x80000000):
printf(INFO_MSG,"Booting From " + hex(address),BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Booting From " + hex(address),BASH_TIPS['DEFAULT'])
out = struct.pack('II', address, 0)
@ -841,33 +845,33 @@ class KFlash:
def recv_debug(self):
op, reason, text = ISPResponse.parse(self.recv_one_return())
#printf('[RECV] op:', ISPResponse.ISPOperation(op).name, 'reason:', ISPResponse.ErrorCode(reason).name)
#KFlash.log('[RECV] op:', ISPResponse.ISPOperation(op).name, 'reason:', ISPResponse.ErrorCode(reason).name)
if text:
printf('-' * 30)
printf(text)
printf('-' * 30)
KFlash.log('-' * 30)
KFlash.log(text)
KFlash.log('-' * 30)
if ISPResponse.ErrorCode(reason) not in (ISPResponse.ErrorCode.ISP_RET_DEFAULT, ISPResponse.ErrorCode.ISP_RET_OK):
printf('Failed, retry, errcode=', hex(reason))
KFlash.log('Failed, retry, errcode=', hex(reason))
return False
return True
def flash_recv_debug(self):
op, reason, text = FlashModeResponse.parse(self.recv_one_return())
#printf('[Flash-RECV] op:', FlashModeResponse.Operation(op).name, 'reason:',
#KFlash.log('[Flash-RECV] op:', FlashModeResponse.Operation(op).name, 'reason:',
# FlashModeResponse.ErrorCode(reason).name)
if text:
printf('-' * 30)
printf(text)
printf('-' * 30)
KFlash.log('-' * 30)
KFlash.log(text)
KFlash.log('-' * 30)
if FlashModeResponse.ErrorCode(reason) not in (FlashModeResponse.ErrorCode.ISP_RET_OK, FlashModeResponse.ErrorCode.ISP_RET_OK):
printf('Failed, retry')
KFlash.log('Failed, retry')
return False
return True
def init_flash(self, chip_type):
chip_type = int(chip_type)
printf(INFO_MSG,"Selected Flash: ",("In-Chip", "On-Board")[chip_type],BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Selected Flash: ",("In-Chip", "On-Board")[chip_type],BASH_TIPS['DEFAULT'])
out = struct.pack('II', chip_type, 0)
crc32_checksum = struct.pack('I', binascii.crc32(out) & 0xFFFFFFFF)
out = struct.pack('HH', 0xd7, 0x00) + crc32_checksum + out
@ -884,7 +888,7 @@ class KFlash:
err = (ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Index Error, retrying...",BASH_TIPS['DEFAULT'])
KFlash.log(WARN_MSG,"Index Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
except TimeoutError:
@ -892,7 +896,7 @@ class KFlash:
err = (ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Timeout Error, retrying...",BASH_TIPS['DEFAULT'])
KFlash.log(WARN_MSG,"Timeout Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
except:
@ -900,27 +904,27 @@ class KFlash:
err = (ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Unexcepted Error, retrying...",BASH_TIPS['DEFAULT'])
KFlash.log(WARN_MSG,"Unexcepted Error, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
# printf('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
# KFlash.log('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
# FlashModeResponse.ErrorCode(reason).name)
if FlashModeResponse.Operation(op) == FlashModeResponse.Operation.FLASHMODE_FLASH_INIT and FlashModeResponse.ErrorCode(reason) == FlashModeResponse.ErrorCode.ISP_RET_OK:
printf(INFO_MSG,"Initialization flash Successfully",BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Initialization flash Successfully",BASH_TIPS['DEFAULT'])
break
else:
if retry_count > MAX_RETRY_TIMES:
err = (ERROR_MSG,"Failed to initialize flash",BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
printf(WARN_MSG,"Unexcepted Return recevied, retrying...",BASH_TIPS['DEFAULT'])
KFlash.log(WARN_MSG,"Unexcepted Return recevied, retrying...",BASH_TIPS['DEFAULT'])
time.sleep(0.1)
continue
def flash_dataframe(self, data, address=0x80000000):
DATAFRAME_SIZE = 1024
data_chunks = chunks(data, DATAFRAME_SIZE)
#printf('[DEBUG] flash dataframe | data length:', len(data))
#KFlash.log('[DEBUG] flash dataframe | data length:', len(data))
total_chunk = math.ceil(len(data)/DATAFRAME_SIZE)
time_start = time.time()
@ -928,21 +932,21 @@ class KFlash:
self.checkKillExit()
while 1:
self.checkKillExit()
#printf('[INFO] sending chunk', i, '@address', hex(address), 'chunklen', len(chunk))
#KFlash.log('[INFO] sending chunk', i, '@address', hex(address), 'chunklen', len(chunk))
out = struct.pack('II', address, len(chunk))
crc32_checksum = struct.pack('I', binascii.crc32(out + chunk) & 0xFFFFFFFF)
out = struct.pack('HH', 0xc3, 0x00) + crc32_checksum + out + chunk # op: ISP_MEMORY_WRITE: 0xc3
sent = self.write(out)
#printf('[INFO]', 'sent', sent, 'bytes', 'checksum', binascii.hexlify(crc32_checksum).decode())
#KFlash.log('[INFO]', 'sent', sent, 'bytes', 'checksum', binascii.hexlify(crc32_checksum).decode())
address += len(chunk)
if self.recv_debug():
break
columns, lines = get_terminal_size()
columns, lines = TerminalSize.get_terminal_size((100, 24), terminal)
time_delta = time.time() - time_start
speed = ''
if (time_delta > 1):
@ -962,23 +966,23 @@ class KFlash:
DATAFRAME_SIZE = ISP_FLASH_DATA_FRAME_SIZE
data_chunks = chunks(data, DATAFRAME_SIZE)
#printf('[DEBUG] flash dataframe | data length:', len(data))
#KFlash.log('[DEBUG] flash dataframe | data length:', len(data))
for n, chunk in enumerate(data_chunks):
#printf('[INFO] sending chunk', i, '@address', hex(address))
#KFlash.log('[INFO] sending chunk', i, '@address', hex(address))
out = struct.pack('II', address, len(chunk))
crc32_checksum = struct.pack('I', binascii.crc32(out + chunk) & 0xFFFFFFFF)
out = struct.pack('HH', 0xd4, 0x00) + crc32_checksum + out + chunk
#printf("[$$$$]", binascii.hexlify(out[:32]).decode())
#KFlash.log("[$$$$]", binascii.hexlify(out[:32]).decode())
retry_count = 0
while True:
try:
sent = self.write(out)
#printf('[INFO]', 'sent', sent, 'bytes', 'checksum', crc32_checksum)
#KFlash.log('[INFO]', 'sent', sent, 'bytes', 'checksum', crc32_checksum)
self.flash_recv_debug()
except:
retry_count = retry_count + 1
@ -993,10 +997,10 @@ class KFlash:
def flash_erase(self):
#printf('[DEBUG] erasing spi flash.')
#KFlash.log('[DEBUG] erasing spi flash.')
self._port.write(b'\xc0\xd3\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xc0')
op, reason, text = FlashModeResponse.parse(self.recv_one_return())
#printf('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
#KFlash.log('MAIX return op:', FlashModeResponse.Operation(op).name, 'reason:',
# FlashModeResponse.ErrorCode(reason).name)
def install_flash_bootloader(self, data):
@ -1014,15 +1018,15 @@ class KFlash:
elffile = ELFFile(f)
if elffile['e_entry'] != 0x80000000:
printf(WARN_MSG,"ELF entry is 0x%x instead of 0x80000000" % (elffile['e_entry']), BASH_TIPS['DEFAULT'])
KFlash.log(WARN_MSG,"ELF entry is 0x%x instead of 0x80000000" % (elffile['e_entry']), BASH_TIPS['DEFAULT'])
for segment in elffile.iter_segments():
t = describe_p_type(segment['p_type'])
printf(INFO_MSG, ("Program Header: Size: %d, Virtual Address: 0x%x, Type: %s" % (segment['p_filesz'], segment['p_vaddr'], t)), BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG, ("Program Header: Size: %d, Virtual Address: 0x%x, Type: %s" % (segment['p_filesz'], segment['p_vaddr'], t)), BASH_TIPS['DEFAULT'])
if not (segment['p_vaddr'] & 0x80000000):
continue
if segment['p_filesz']==0 or segment['p_vaddr']==0:
printf("Skipped")
KFlash.log("Skipped")
continue
self.flash_dataframe(segment.data(), segment['p_vaddr'])
@ -1030,7 +1034,7 @@ class KFlash:
# type: (bytes, bytes, int, bool) -> None
# Don't remove above code!
#printf('[DEBUG] flash_firmware DEBUG: aeskey=', aes_key)
#KFlash.log('[DEBUG] flash_firmware DEBUG: aeskey=', aes_key)
if sha256Prefix == True:
# Add header to the firmware
@ -1064,9 +1068,9 @@ class KFlash:
chunk = chunk.ljust(ISP_FLASH_DATA_FRAME_SIZE, b'\x00') # align by size of dataframe
# Download a dataframe
#printf('[INFO]', 'Write firmware data piece')
#KFlash.log('[INFO]', 'Write firmware data piece')
self.dump_to_flash(chunk, address= n * ISP_FLASH_DATA_FRAME_SIZE + address_offset)
columns, lines = get_terminal_size()
columns, lines = TerminalSize.get_terminal_size((100, 24), terminal)
time_delta = time.time() - time_start
speed = ''
if (time_delta > 1):
@ -1082,13 +1086,6 @@ class KFlash:
self._kill_process = False
raise Exception("Cancel")
def printf(self, *args):
if self.print_callback:
self.print_callback(args)
else:
printf(*args)
def open_terminal(reset):
control_signal = '0' if reset else '1'
control_signal_b = not reset
@ -1134,7 +1131,7 @@ class KFlash:
ERROR_MSG = BASH_TIPS['RED']+BASH_TIPS['BOLD']+'[ERROR]'+BASH_TIPS['NORMAL']
WARN_MSG = BASH_TIPS['YELLOW']+BASH_TIPS['BOLD']+'[WARN]'+BASH_TIPS['NORMAL']
INFO_MSG = BASH_TIPS['GREEN']+BASH_TIPS['BOLD']+'[INFO]'+BASH_TIPS['NORMAL']
printf(INFO_MSG,'ANSI colors not used',BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,'ANSI colors not used',BASH_TIPS['DEFAULT'])
manually_set_the_board = False
if args.Board:
@ -1152,7 +1149,7 @@ class KFlash:
_port = list_port_info[0].device
elif len(list_port_info) > 1:
_port = list_port_info[1].device
printf(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
elif args.Board == "trainer":
list_port_info = list(serial.tools.list_ports.grep("0403")) #Take the first one
if(len(list_port_info)==0):
@ -1161,19 +1158,19 @@ class KFlash:
raise Exception(err)
list_port_info.sort()
_port = list_port_info[0].device
printf(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
else:
try:
list_port_info = next(serial.tools.list_ports.grep(VID_LIST_FOR_AUTO_LOOKUP)) #Take the first one within the list
_port = list_port_info.device
printf(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"COM Port Auto Detected, Selected ", _port, BASH_TIPS['DEFAULT'])
except StopIteration:
err = (ERROR_MSG,"No vaild COM Port found in Auto Detect, Check Your Connection or Specify One by"+BASH_TIPS['GREEN']+'`--port/-p`',BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
else:
_port = args.port
printf(INFO_MSG,"COM Port Selected Manually: ", _port, BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"COM Port Selected Manually: ", _port, BASH_TIPS['DEFAULT'])
self.loader = MAIXLoader(port=_port, baudrate=115200)
file_format = ProgramFileFormat.FMT_BINARY
@ -1191,7 +1188,7 @@ class KFlash:
#if file_header.startswith(bytes([0x50, 0x4B])):
if file_header.startswith(b'\x50\x4B'):
if ".kfpkg" != os.path.splitext(args.firmware)[1]:
printf(INFO_MSG, 'Find a zip file, but not with ext .kfpkg:', args.firmware, BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG, 'Find a zip file, but not with ext .kfpkg:', args.firmware, BASH_TIPS['DEFAULT'])
else:
file_format = ProgramFileFormat.FMT_KFPKG
@ -1199,14 +1196,14 @@ class KFlash:
if file_header.startswith(b'\x7f\x45\x4c\x46'):
file_format = ProgramFileFormat.FMT_ELF
if args.sram:
printf(INFO_MSG, 'Find an ELF file:', args.firmware, BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG, 'Find an ELF file:', args.firmware, BASH_TIPS['DEFAULT'])
else:
err = (ERROR_MSG, 'This is an ELF file and cannot be programmed to flash directly:', args.firmware, BASH_TIPS['DEFAULT'] , '\r\nPlease retry:', args.firmware + '.bin', BASH_TIPS['DEFAULT'])
err = tuple2str(err)
raise Exception(err)
# 1. Greeting.
printf(INFO_MSG,"Trying to Enter the ISP Mode...",BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Trying to Enter the ISP Mode...",BASH_TIPS['DEFAULT'])
retry_count = 0
@ -1219,7 +1216,7 @@ class KFlash:
raise Exception(err)
if args.Board == "dan" or args.Board == "bit" or args.Board == "trainer":
try:
printf('.', end='')
KFlash.log('.', end='')
self.loader.reset_to_isp_dan()
self.loader.greeting()
break
@ -1227,7 +1224,7 @@ class KFlash:
pass
elif args.Board == "kd233":
try:
printf('_', end='')
KFlash.log('_', end='')
self.loader.reset_to_isp_kd233()
self.loader.greeting()
break
@ -1235,7 +1232,7 @@ class KFlash:
pass
elif args.Board == "goE":
try:
printf('*', end='')
KFlash.log('*', end='')
self.loader.reset_to_isp_kd233()
self.loader.greeting()
break
@ -1243,7 +1240,7 @@ class KFlash:
pass
elif args.Board == "goD":
try:
printf('#', end='')
KFlash.log('#', end='')
self.loader.reset_to_isp_goD()
self.loader.greeting()
break
@ -1251,43 +1248,43 @@ class KFlash:
pass
else:
try:
printf('.', end='')
KFlash.log('.', end='')
self.loader.reset_to_isp_dan()
self.loader.greeting()
args.Board = "dan"
printf()
printf(INFO_MSG,"Automatically detected dan/bit/trainer",BASH_TIPS['DEFAULT'])
KFlash.log()
KFlash.log(INFO_MSG,"Automatically detected dan/bit/trainer",BASH_TIPS['DEFAULT'])
break
except TimeoutError:
pass
try:
printf('_', end='')
KFlash.log('_', end='')
self.loader.reset_to_isp_kd233()
self.loader.greeting()
args.Board = "kd233"
printf()
printf(INFO_MSG,"Automatically detected goE/kd233",BASH_TIPS['DEFAULT'])
KFlash.log()
KFlash.log(INFO_MSG,"Automatically detected goE/kd233",BASH_TIPS['DEFAULT'])
break
except TimeoutError:
pass
try:
printf('.', end='')
KFlash.log('.', end='')
self.loader.reset_to_isp_goD()
self.loader.greeting()
args.Board = "goD"
printf()
printf(INFO_MSG,"Automatically detected goD",BASH_TIPS['DEFAULT'])
KFlash.log()
KFlash.log(INFO_MSG,"Automatically detected goD",BASH_TIPS['DEFAULT'])
break
except TimeoutError:
pass
try:
# Magic, just repeat, don't remove, it may unstable, don't know why.
printf('_', end='')
KFlash.log('_', end='')
self.loader.reset_to_isp_kd233()
self.loader.greeting()
args.Board = "kd233"
printf()
printf(INFO_MSG,"Automatically detected goE/kd233",BASH_TIPS['DEFAULT'])
KFlash.log()
KFlash.log(INFO_MSG,"Automatically detected goE/kd233",BASH_TIPS['DEFAULT'])
break
except TimeoutError:
pass
@ -1296,8 +1293,8 @@ class KFlash:
# Dangerous, here are dinosaur infested!!!!!
ISP_RECEIVE_TIMEOUT = 3
printf()
printf(INFO_MSG,"Greeting Message Detected, Start Downloading ISP",BASH_TIPS['DEFAULT'])
KFlash.log()
KFlash.log(INFO_MSG,"Greeting Message Detected, Start Downloading ISP",BASH_TIPS['DEFAULT'])
if manually_set_the_board and (not args.Slow):
if (args.baudrate >= 1500000) or args.sram:
@ -1325,7 +1322,7 @@ class KFlash:
# Dangerous, here are dinosaur infested!!!!!
# Don't touch this code unless you know what you are doing
self.loader._port.baudrate = args.baudrate
printf(INFO_MSG,"Boot user code from SRAM", BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Boot user code from SRAM", BASH_TIPS['DEFAULT'])
if(args.terminal == True):
open_terminal(False)
msg = "Burn SRAM OK"
@ -1335,7 +1332,7 @@ class KFlash:
# Don't touch this code unless you know what you are doing
self.loader._port.baudrate = 115200
printf(INFO_MSG,"Wait For 0.1 second for ISP to Boot", BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Wait For 0.1 second for ISP to Boot", BASH_TIPS['DEFAULT'])
time.sleep(0.1)
@ -1343,13 +1340,13 @@ class KFlash:
if args.baudrate != 115200:
self.loader.change_baudrate(args.baudrate)
printf(INFO_MSG,"Baudrate changed, greeting with ISP again ... ", BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Baudrate changed, greeting with ISP again ... ", BASH_TIPS['DEFAULT'])
self.loader.flash_greeting()
self.loader.init_flash(args.flash)
if file_format == ProgramFileFormat.FMT_KFPKG:
printf(INFO_MSG,"Extracting KFPKG ... ", BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Extracting KFPKG ... ", BASH_TIPS['DEFAULT'])
firmware_bin.close()
with tempfile.TemporaryDirectory() as tmpdir:
try:
@ -1366,7 +1363,7 @@ class KFlash:
jsonFlashList = json.loads(sFlashList)
for lBinFiles in jsonFlashList['files']:
self.checkKillExit()
printf(INFO_MSG,"Writing",lBinFiles['bin'],"into","0x%08x"%int(lBinFiles['address'], 0),BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Writing",lBinFiles['bin'],"into","0x%08x"%int(lBinFiles['address'], 0),BASH_TIPS['DEFAULT'])
with open(os.path.join(tmpdir, lBinFiles["bin"]), "rb") as firmware_bin:
self.loader.flash_firmware(firmware_bin.read(), None, int(lBinFiles['address'], 0), lBinFiles['sha256Prefix'], filename=lBinFiles['bin'])
else:
@ -1389,9 +1386,9 @@ class KFlash:
elif args.Board == "goD":
self.loader.reset_to_boot_goD()
else:
printf(WARN_MSG,"Board unknown !! please press reset to boot!!")
KFlash.log(WARN_MSG,"Board unknown !! please press reset to boot!!")
printf(INFO_MSG,"Rebooting...", BASH_TIPS['DEFAULT'])
KFlash.log(INFO_MSG,"Rebooting...", BASH_TIPS['DEFAULT'])
self.loader._port.close()
if(args.terminal == True):
@ -1416,7 +1413,7 @@ def main():
except Exception as e:
if str(e) == "Burn SRAM OK":
sys.exit(0)
print(str(e))
kflash.printf(str(e))
sys.exit(1)
if __name__ == '__main__':