[dartuino][bootloader] Get Bootloader USB working.
This commit is contained in:
314
tools/moot/mtldr.py
Normal file
314
tools/moot/mtldr.py
Normal file
@@ -0,0 +1,314 @@
|
||||
"""
|
||||
Copyright (c) 2016 Gurjant Kalsi <me@gurjantkalsi.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining
|
||||
a copy of this software and associated documentation files
|
||||
(the "Software"), to deal in the Software without restriction,
|
||||
including without limitation the rights to use, copy, modify, merge,
|
||||
publish, distribute, sublicense, and/or sell copies of the Software,
|
||||
and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be
|
||||
included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
||||
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
||||
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
"""
|
||||
import argparse
|
||||
import binascii
|
||||
import logging
|
||||
import struct
|
||||
import time
|
||||
|
||||
import usb.core
|
||||
import usb.util
|
||||
|
||||
|
||||
class Command:
|
||||
flash = 0x01
|
||||
boot = 0x02
|
||||
devinfo = 0x03
|
||||
|
||||
|
||||
class DataPhaseType:
|
||||
none = 0
|
||||
host_to_device = 1
|
||||
device_to_host = 2
|
||||
|
||||
|
||||
class Retcode:
|
||||
# Normal operation
|
||||
no_error = (0x00)
|
||||
xmit_ready = (0x01)
|
||||
recv_ready = (0x02)
|
||||
|
||||
# Malformed reqeust
|
||||
bad_data_len = (0xAAA0)
|
||||
bad_magic = (0xAAA1)
|
||||
unknown_command = (0xAAA2)
|
||||
|
||||
# Device side system error.
|
||||
sys_image_too_big = (0xFFF1)
|
||||
err_open_sys_flash = (0xFFF2)
|
||||
err_erase_sys_flash = (0xFFF3)
|
||||
err_write_sys_flash = (0xFFF4)
|
||||
cant_find_buildsig = (0xFFF5)
|
||||
|
||||
|
||||
class CommandParam:
|
||||
def __init__(self, data_phase_type):
|
||||
self.data_phase_type = data_phase_type
|
||||
|
||||
VENDOR_ID = 0x9999
|
||||
PRODUCT_ID = 0x9999
|
||||
CLASS_VENDOR_SPECIFIC = 0xFF
|
||||
SUBCLASS_MTLDR_DEBUG = 0x01
|
||||
|
||||
# create logger
|
||||
logger = logging.getLogger('mtldr')
|
||||
logger.setLevel(logging.WARN)
|
||||
|
||||
|
||||
class FindByDeviceClass(object):
|
||||
# Callable object that selects a USB device by a Sub/Device class pair
|
||||
|
||||
def __init__(self, device, subdevice):
|
||||
self._device_class = device
|
||||
self._subdevice_class = subdevice
|
||||
|
||||
def __call__(self, device):
|
||||
for cfg in device:
|
||||
intf = usb.util.find_descriptor(
|
||||
cfg, bInterfaceClass=self._device_class,
|
||||
bInterfaceSubClass=self._subdevice_class)
|
||||
if intf:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class CommandDispatcher:
|
||||
header_struct = struct.Struct("< i i i")
|
||||
header_struct_len = 12 # bytes
|
||||
|
||||
response_struct = struct.Struct("< i i i")
|
||||
response_struct_len = 12 # bytes
|
||||
|
||||
cmd_magic = 0x4d4f4f54
|
||||
resp_magic = 0x52455350
|
||||
|
||||
data_phase_timeout = 10000 # ms
|
||||
|
||||
CommandParams = {
|
||||
Command.flash: CommandParam(DataPhaseType.host_to_device),
|
||||
Command.boot: CommandParam(DataPhaseType.none),
|
||||
Command.devinfo: CommandParam(DataPhaseType.device_to_host),
|
||||
}
|
||||
|
||||
def __init__(self, ep_in, ep_out):
|
||||
self._ep_in = ep_in
|
||||
self._ep_out = ep_out
|
||||
|
||||
def __read_response(self, timeout=None):
|
||||
"""
|
||||
reads a standard response from the USB device.
|
||||
"""
|
||||
if timeout:
|
||||
resp = self._ep_in.read(CommandDispatcher.response_struct_len, timeout=timeout).tostring()
|
||||
else:
|
||||
resp = self._ep_in.read(CommandDispatcher.response_struct_len).tostring()
|
||||
|
||||
logger.debug(
|
||||
("Read %d bytes: " % CommandDispatcher.response_struct_len) +
|
||||
str(binascii.hexlify(resp))
|
||||
)
|
||||
|
||||
resp = CommandDispatcher.response_struct.unpack(resp)
|
||||
if resp[0] != CommandDispatcher.resp_magic:
|
||||
raise("Device responded with an unexpected magic value.")
|
||||
|
||||
logger.debug(
|
||||
("Read Response - retcode = %d, nbytes = %d" % (resp[1], resp[2]))
|
||||
)
|
||||
|
||||
return (resp[1], int(resp[2]))
|
||||
|
||||
def __dispatch_no_data(self, command):
|
||||
"""
|
||||
Dispatches a command that has no data phase.
|
||||
"""
|
||||
logger.debug("Write %d bytes, command = %d" % (0, command))
|
||||
command = CommandDispatcher.header_struct.pack(CommandDispatcher.cmd_magic, command, 0)
|
||||
self._ep_out.write(command)
|
||||
|
||||
retcode, datalen = self.__read_response()
|
||||
assert datalen == 0 # A command with no data can't have a datalen
|
||||
|
||||
return (retcode, list())
|
||||
|
||||
def __dispatch_device_to_host(self, command):
|
||||
"""
|
||||
Dispatches a command that has a device to host data phase.
|
||||
"""
|
||||
logger.debug("Write %d bytes, command = %d" % (0, command))
|
||||
command = CommandDispatcher.header_struct.pack(CommandDispatcher.cmd_magic, command, 0)
|
||||
self._ep_out.write(command)
|
||||
|
||||
retcode, datalen = self.__read_response()
|
||||
if retcode != Retcode.xmit_ready:
|
||||
return (retcode, list())
|
||||
|
||||
logger.debug("Read %d bytes, retcode = %d" % (int(datalen), retcode))
|
||||
resp = self._ep_in.read(int(datalen), timeout=CommandDispatcher.data_phase_timeout)
|
||||
|
||||
retcode, datalen = self.__read_response()
|
||||
|
||||
return (retcode, resp)
|
||||
|
||||
def __dispatch_host_to_device(self, command, data):
|
||||
"""
|
||||
Dispatches a command that has a host to device data phase.
|
||||
"""
|
||||
logger.debug("Write %d bytes, command = %d" % (len(data), command))
|
||||
# Tell the device that we're about to send it data. Also mention how
|
||||
# much data we're about to send.
|
||||
command = CommandDispatcher.header_struct.pack(CommandDispatcher.cmd_magic, command, len(data))
|
||||
self._ep_out.write(command)
|
||||
|
||||
# The device will signal back to us that it's ready to read data.
|
||||
retcode, datalen = self.__read_response(CommandDispatcher.data_phase_timeout)
|
||||
assert datalen == 0
|
||||
|
||||
if retcode != Retcode.recv_ready:
|
||||
# The device experienced an error and is not ready to receive data.
|
||||
return (retcode, list())
|
||||
|
||||
# Write the data back to the device.
|
||||
self._ep_out.write(data, timeout=CommandDispatcher.data_phase_timeout)
|
||||
|
||||
# The device will reply to us to let us know whether or not it received
|
||||
# our data correctly.
|
||||
retcode, datalen = self.__read_response()
|
||||
return (retcode, list())
|
||||
|
||||
def dispatch(self, command, data=None):
|
||||
"""
|
||||
Dispatches a command to the connected USB device.
|
||||
|
||||
A command is composed of a command phase followed by an optional data
|
||||
phase. If the data parameter is specified, dispatch(...) will also
|
||||
attempt to send data to the device.
|
||||
|
||||
Returns a 2-Tuple as follows: (command result, optional data). If the
|
||||
device returned data during the data phase, optional data will contain
|
||||
that data, otherwise it will be None.
|
||||
"""
|
||||
# Make sure the command actually exists.
|
||||
params = CommandDispatcher.CommandParams.get(command)
|
||||
if not params:
|
||||
raise("Command " + str(command) + " does not exist.")
|
||||
|
||||
if params.data_phase_type == DataPhaseType.none:
|
||||
result = self.__dispatch_no_data(command)
|
||||
elif params.data_phase_type == DataPhaseType.host_to_device:
|
||||
result = self.__dispatch_host_to_device(command, data)
|
||||
elif params.data_phase_type == DataPhaseType.device_to_host:
|
||||
result = self.__dispatch_device_to_host(command)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def cmd_devinfo(dispatcher, args):
|
||||
retcode, data = dispatcher.dispatch(Command.devinfo)
|
||||
|
||||
if retcode != Retcode.no_error:
|
||||
print ("Error %d while reading devinfo" % retcode)
|
||||
else:
|
||||
print data.tostring()
|
||||
|
||||
|
||||
def cmd_flash(dispatcher, args):
|
||||
with open(args.bin, 'rb') as file:
|
||||
binary = file.read()
|
||||
|
||||
retcode, data = dispatcher.dispatch(Command.flash, binary)
|
||||
if retcode != Retcode.no_error:
|
||||
print ("Error %d while flashing device" % retcode)
|
||||
|
||||
|
||||
|
||||
def cmd_boot(dispatcher, args):
|
||||
retcode, data = dispatcher.dispatch(Command.boot)
|
||||
if retcode != Retcode.no_error:
|
||||
print ("Error %d while booting device" % retcode)
|
||||
|
||||
|
||||
def main():
|
||||
# Setup the Logger
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(logging.DEBUG)
|
||||
formatter = logging.Formatter(
|
||||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
ch.setFormatter(formatter)
|
||||
logger.addHandler(ch)
|
||||
|
||||
# Setup the argument parser
|
||||
parser = argparse.ArgumentParser(prog='PROG')
|
||||
subparsers = parser.add_subparsers(help='sub-command help')
|
||||
|
||||
devinfo_parser = subparsers.add_parser('devinfo', help='a help')
|
||||
devinfo_parser.set_defaults(func=cmd_devinfo)
|
||||
|
||||
flash_parser = subparsers.add_parser('flash', help='b help')
|
||||
flash_parser.add_argument('bin', help="Path to the LK Binary to flash")
|
||||
flash_parser.set_defaults(func=cmd_flash)
|
||||
|
||||
boot_parser = subparsers.add_parser('boot', help='b help')
|
||||
boot_parser.set_defaults(func=cmd_boot)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
logger.info("Waiting for device (vid=0x%04x, pid=0x%04x, "
|
||||
"class=0x%02x, subclass=0x%02x)" %
|
||||
(VENDOR_ID, PRODUCT_ID, CLASS_VENDOR_SPECIFIC,
|
||||
SUBCLASS_MTLDR_DEBUG))
|
||||
while True:
|
||||
dev = usb.core.find(
|
||||
idVendor=VENDOR_ID,
|
||||
idProduct=PRODUCT_ID,
|
||||
custom_match=FindByDeviceClass(CLASS_VENDOR_SPECIFIC,
|
||||
SUBCLASS_MTLDR_DEBUG))
|
||||
if dev:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
logger.info("Found USB Device!")
|
||||
dev.set_configuration()
|
||||
|
||||
cfg = dev.get_active_configuration()
|
||||
intf = usb.util.find_descriptor(
|
||||
cfg, bInterfaceClass=CLASS_VENDOR_SPECIFIC,
|
||||
bInterfaceSubClass=SUBCLASS_MTLDR_DEBUG)
|
||||
|
||||
ep_out = usb.util.find_descriptor(
|
||||
intf,
|
||||
custom_match=lambda e:
|
||||
usb.util.endpoint_direction(e.bEndpointAddress) ==
|
||||
usb.util.ENDPOINT_OUT)
|
||||
ep_in = usb.util.find_descriptor(
|
||||
intf,
|
||||
custom_match=lambda e:
|
||||
usb.util.endpoint_direction(e.bEndpointAddress) ==
|
||||
usb.util.ENDPOINT_IN)
|
||||
|
||||
dispatcher = CommandDispatcher(ep_in, ep_out)
|
||||
args.func(dispatcher, args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user