From a982fb7664fdbe5b1013c061d897038860c1068e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20=C3=98ye=20Amundsen?= Date: Thu, 9 May 2024 08:12:24 +0200 Subject: [PATCH] scripts: sdfw: add scripts for communicating with SDFW MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ref: NCSDK-27391 Signed-off-by: Håkon Øye Amundsen --- scripts/sdfw/conn.py | 82 ++++++++++++ scripts/sdfw/ctrlap.py | 139 +++++++++++++++++++++ scripts/sdfw/jlink_connector.py | 105 ++++++++++++++++ scripts/sdfw/main.py | 140 +++++++++++++++++++++ scripts/sdfw/requirements.txt | 1 + scripts/sdfw/sdfw_adac_cmd.py | 185 ++++++++++++++++++++++++++++ scripts/sdfw/sdfw_update_service.py | 120 ++++++++++++++++++ scripts/sdfw/ssf_ctrlap.py | 94 ++++++++++++++ 8 files changed, 866 insertions(+) create mode 100644 scripts/sdfw/conn.py create mode 100644 scripts/sdfw/ctrlap.py create mode 100644 scripts/sdfw/jlink_connector.py create mode 100644 scripts/sdfw/main.py create mode 100644 scripts/sdfw/requirements.txt create mode 100644 scripts/sdfw/sdfw_adac_cmd.py create mode 100644 scripts/sdfw/sdfw_update_service.py create mode 100644 scripts/sdfw/ssf_ctrlap.py diff --git a/scripts/sdfw/conn.py b/scripts/sdfw/conn.py new file mode 100644 index 000000000000..3eae314879d6 --- /dev/null +++ b/scripts/sdfw/conn.py @@ -0,0 +1,82 @@ +# +# Copyright (c) 2024 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause +# + +""" +Module for communicating with the SDFW using the ADAC protocol over the CTRLAP interface +""" + +from __future__ import annotations + +import struct +from typing import List + +from sdfw_adac_cmd import AdacResponse, AdacRequest, AdacStatus +from ctrlap import Ctrlap + +BYTES_PER_WORD = 4 + + +class Adac: + """SDFW ADAC connector.""" + + def __init__( + self, + ctrlap: Ctrlap, + ) -> None: + """ + :param ctrlap: Used for performing ctrlap operations + """ + self.ctrlap = ctrlap + + def request(self, req: AdacRequest) -> AdacResponse: + """ + Issue an ADAC request and read the response. + + :param req: The ADAC request to execute. + :returns: ADAC response. + """ + self.ctrlap.wait_for_ready() + self._write_request(req) + + self.ctrlap.wait_for_ready() + rsp = self._read_response() + + return rsp + + def _write_request(self, req: AdacRequest) -> None: + """ + Write an ADAC request to CTRL-AP + + :param req: ADAC request. + """ + for word in _to_word_chunks(req.to_bytes()): + self.ctrlap.write(word) + + def _read_response(self) -> AdacResponse: + """ + Read a whole ADAC response over CTRL-AP + + :return: ADAC response. + """ + _reserved, status = struct.unpack(" List[bytes]: + if len(data) % BYTES_PER_WORD != 0: + raise ValueError( + f"data of length {len(data)} is not aligned to {BYTES_PER_WORD} bytes" + ) + + return [data[i: i + BYTES_PER_WORD] for i in range(0, len(data), BYTES_PER_WORD)] diff --git a/scripts/sdfw/ctrlap.py b/scripts/sdfw/ctrlap.py new file mode 100644 index 000000000000..bc5f1bf79191 --- /dev/null +++ b/scripts/sdfw/ctrlap.py @@ -0,0 +1,139 @@ +# +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause +# + +""" +Module exposing CTRLAP functionality +""" + +from argparse import ArgumentParser +from enum import IntEnum +from time import sleep, time + +from jlink_connector import JLinkConnector + +ACCESS_PORT_CTRLAP = 4 + + +class ReadyStatus(IntEnum): + """Enum for READY status CTRLAP register""" + + READY = 0 + NOT_READY = 1 + + +class CtrlapAddresses(IntEnum): + """Enum for addresses used in the CTRLAP""" + + READY = 0x4 + TXDATA = 0x10 + RXDATA = 0x18 + RXSTATUS = 0x1C + TXSTATUS = 0x14 + + +class MailboxStatus(IntEnum): + """Enum for mailbox Rx/Tx status CTRLAP register""" + + NO_DATA_PENDING = 0 + DATA_PENDING = 1 + + +class Ctrlap: + """Class for performing CTRLAP operations""" + + def __init__(self, **kwargs) -> None: + """ + :param kwargs: dict with wait_time and timeout. + Use 'add_arguments(...)' function from this class to add required arguments to + the argument parser. + """ + self.connector = JLinkConnector(auto_connect=False, **kwargs) + self.wait_time = kwargs["wait_time"] + self.timeout = kwargs["timeout"] + + def wait_for_ready(self) -> None: + """Wait until the CTRLAP READY register is ready. + + :raises Exception: If waiting times out. + """ + + self._block_on_ctrlap_status(CtrlapAddresses.READY, ReadyStatus.NOT_READY) + + def read(self) -> bytes: + """ + Receive a word from CTRLAP by reading the RX register. Will block until there is + data pending. + + :return: The value of CTRLAP.RX + """ + + self._block_on_ctrlap_status( + CtrlapAddresses.RXSTATUS, MailboxStatus.NO_DATA_PENDING + ) + + word = self.connector.api.read_access_port_register( + ACCESS_PORT_CTRLAP, CtrlapAddresses.RXDATA + ) + + return word.to_bytes(4, "little") + + def write(self, word: bytes) -> None: + """ + Write a word to CTRLAP, block until the data has been read. + + :param word: Word to write + """ + + val = int.from_bytes(word, byteorder="little") + self.connector.api.write_access_port_register( + ACCESS_PORT_CTRLAP, CtrlapAddresses.TXDATA, val + ) + self._block_on_ctrlap_status( + CtrlapAddresses.TXSTATUS, MailboxStatus.DATA_PENDING + ) + + def _block_on_ctrlap_status(self, address: IntEnum, status: IntEnum) -> None: + """ + Block while waiting for another status + + :param address: Address of register to check status of + :param status: Block while the value of the register equals this value. + :raises RuntimeError: if operation times out. + """ + + start = time() + while ( + self.connector.api.read_access_port_register(ACCESS_PORT_CTRLAP, address) + ) == status: + if (time() - start) >= self.timeout: + raise RuntimeError("Timed out when waiting for CTRLAP status") + sleep(self.wait_time) + + @classmethod + def add_arguments(cls, parser: ArgumentParser) -> None: + """ + Append command line options needed by this class. + """ + + group = parser.add_argument_group( + title="CTRL-AP connection parameters", + description="Use these parameters for communicating with CTRL-AP", + ) + group.add_argument( + "--timeout", + type=float, + default=100, + help="Number of seconds to wait for a CTRL-AP register value before timing out", + ) + + group.add_argument( + "--wait-time", + type=float, + default=0.1, + help="Number of seconds to wait between reading the CTRL-AP registers while waiting for a given status", + ) + + JLinkConnector.add_arguments(parser) diff --git a/scripts/sdfw/jlink_connector.py b/scripts/sdfw/jlink_connector.py new file mode 100644 index 000000000000..86943d77703f --- /dev/null +++ b/scripts/sdfw/jlink_connector.py @@ -0,0 +1,105 @@ +# +# Copyright (c) 2023 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause +# + +""" +Module exposing a JLink connection +""" + +from argparse import ArgumentParser + +from pynrfjprog import LowLevel +from pynrfjprog.Parameters import CoProcessor, DeviceFamily + + +class JLinkConnector: + """Connect and maintain a JLink connection""" + + def __init__(self, auto_connect: bool = True, **kwargs) -> None: + """ + :param auto_connect: automatically connect to JLink if this is set. + Otherwise, the `connect(...)` function must be used. + :param kwargs: dict with serial, hostname, speed and port + Use the `add_arguments(...)` function from this class to add those to + your CLI's argument list. + """ + + self.params = kwargs + self.device = DeviceFamily.NRF54H + self.api = LowLevel.API(self.device) + if auto_connect: + self.connect() + + def connect(self) -> None: + """ + Connect to the JLink + """ + if not self.api.is_open(): + self.api.open() + + if self.params["hostname"] and self.params["serial"]: + self.api.connect_to_emu_with_ip( + hostname=self.params["hostname"], + serial_number=self.params["serial"], + port=self.params["port"], + jlink_speed_khz=self.params["speed"], + ) + elif self.params["hostname"]: + self.api.connect_to_emu_with_ip( + hostname=self.params["hostname"], + port=self.params["port"], + jlink_speed_khz=self.params["speed"], + ) + elif self.params["serial"]: + self.api.connect_to_emu_with_snr( + serial_number=self.params["serial"], + jlink_speed_khz=self.params["speed"], + ) + else: + self.api.connect_to_emu_without_snr(jlink_speed_khz=self.params["speed"]) + + self.api.select_coprocessor( + LowLevel.Parameters.CoProcessor[self.params["coprocessor"].upper()] + ) + + def disconnect(self) -> None: + """ + Disconnect from JLink and close the API. + """ + self.api.disconnect_from_emu() + self.api.close() + + @staticmethod + def add_arguments(parser: ArgumentParser) -> None: + """ + Append command line options needed by this class. + """ + + group = parser.add_argument_group( + title="JLink connection parameters", + description="Use these parameters for connecting to JLink", + ) + group.add_argument( + "--speed", type=int, default=4000, help="J-Link emulator speed in kHz" + ) + + group.add_argument( + "--port", + type=int, + default=19020, + help="Port to use when connecting to J-Link emulator host", + ) + + group.add_argument("--hostname", type=str, help="J-Link emulator hostname IP") + + group.add_argument("--serial", type=int, help="J-Link emulator serial number") + + group.add_argument( + "--coprocessor", + type=str, + help="Coprocessor (AP) to connect to", + default=CoProcessor.CP_SECURE.name.lower(), + choices=[i.name.lower() for i in CoProcessor], + ) diff --git a/scripts/sdfw/main.py b/scripts/sdfw/main.py new file mode 100644 index 000000000000..75e0becd0498 --- /dev/null +++ b/scripts/sdfw/main.py @@ -0,0 +1,140 @@ +# +# Copyright (c) 2024 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause +# + +""" +SDFW ADAC CLI. +""" + +from __future__ import annotations + +import argparse +import dataclasses +import inspect +import re +import typing +from typing import Any, Dict + +import sdfw_adac_cmd +from conn import Adac +from ctrlap import Ctrlap + + +def main() -> None: + args = parse_arguments() + do_adac_transaction(**vars(args)) + + +def parse_arguments() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Send SDFW ADAC commands over CTRL-AP using JLink", + allow_abbrev=False, + ) + + Ctrlap.add_arguments(parser) + + subparsers = parser.add_subparsers( + title="ADAC commands", description="Supported SDFW ADAC commands", required=True + ) + + for request_type in [ + sdfw_adac_cmd.Version, + sdfw_adac_cmd.MemCfg, + sdfw_adac_cmd.Revert, + sdfw_adac_cmd.Reset, + sdfw_adac_cmd.MemErase, + sdfw_adac_cmd.LcsGet, + sdfw_adac_cmd.LcsSet, + sdfw_adac_cmd.Ssf, + sdfw_adac_cmd.Purge, + ]: + add_request_subcommand(subparsers, request_type) + + return parser.parse_args() + + +def add_request_subcommand(subparsers: Any, request_type: type) -> None: + """Add a subcommand for doing an ADAC transaction with the given request type. + This relies on dataclass introspection/type hints and supports simple dataclasses without + nesting and fields with either int, bytes, bool or string types. + Each subcommand automatically sets 'request_type' in the parsed arguments to the given + request type class. + """ + + command_name = to_kebab_case(request_type.__name__) + command_help = inspect.getdoc(request_type) + + command_parser = subparsers.add_parser(command_name, help=command_help) + command_parser.set_defaults(request_type=request_type) + + type_hints = typing.get_type_hints(request_type) + + for field in dataclasses.fields(request_type): + arg_name = f"--{to_kebab_case(field.name)}" + default_value = ( + field.default + if field.default != dataclasses.MISSING + else field.default_factory + ) + is_required = default_value == dataclasses.MISSING + + field_type = type_hints[field.name] + type_kwargs: Dict[str, Any] = {} + + if issubclass(int, field_type): + type_kwargs["type"] = lambda x: int(x, 0) + elif issubclass(bytes, field_type): + type_kwargs["type"] = bytes.fromhex + elif issubclass(bool, field_type): + type_kwargs["action"] = "store_true" + is_required = False + elif not issubclass(str, field_type): + raise ValueError( + f"Unsupported field type for {request_type.__name__}.{field.name}: {field_type}" + ) + + if default_value != dataclasses.MISSING: + type_kwargs["default"] = default_value + + command_parser.add_argument( + arg_name, + dest=field.name, + required=is_required, + **type_kwargs, + ) + + +def do_adac_transaction(request_type: type, **kwargs: Any) -> None: + """Perform a single ADAC transaction using the given request type.""" + + request_fields = {f.name for f in dataclasses.fields(request_type)} + request_kwargs = {k: v for k, v in kwargs.items() if k in request_fields} + request = request_type(**request_kwargs) + + ctrlap = Ctrlap(**kwargs) + adac = Adac(ctrlap) + + adac.ctrlap.connector.connect() + + print(f"--> {request}") + + response = adac.request(request.to_request()) + + print(f"<-- {response}") + + adac.ctrlap.connector.disconnect() + + +def to_kebab_case(string: str) -> str: + """Convert the given string to kebab case (lowercase-with-hyphens).""" + + string = re.sub("(.)([A-Z][a-z]+)", r"\1-\2", string) + string = re.sub("([a-z0-9])([A-Z])", r"\1-\2", string) + string = string.replace("_", "-").lower() + return string + + +if __name__ == "__main__": + main() diff --git a/scripts/sdfw/requirements.txt b/scripts/sdfw/requirements.txt new file mode 100644 index 000000000000..d56a415c3a5c --- /dev/null +++ b/scripts/sdfw/requirements.txt @@ -0,0 +1 @@ +pynrfjprog diff --git a/scripts/sdfw/sdfw_adac_cmd.py b/scripts/sdfw/sdfw_adac_cmd.py new file mode 100644 index 000000000000..92bfc005e125 --- /dev/null +++ b/scripts/sdfw/sdfw_adac_cmd.py @@ -0,0 +1,185 @@ +# +# Copyright (c) 2024 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause +# + +""" +SDFW ADAC command structures. +""" + +from __future__ import annotations + +import dataclasses as dc +import struct +from enum import IntEnum + + +class SdfwAdacCmd(IntEnum): + """ADAC command opcodes""" + + VERSION = 0xA300 + MEM_CFG = 0xA301 + REVERT = 0xA302 + RESET = 0xA303 + MEM_ERASE = 0xA304 + LCS_GET = 0xA305 + LCS_SET = 0xA306 + SSF = 0xA307 + PURGE = 0xA308 + + +@dc.dataclass +class AdacRequest: + """Generic ADAC request""" + + command: SdfwAdacCmd + data: bytes = b"" + + @property + def data_count(self) -> int: + return len(self.data) + + def to_bytes(self) -> bytes: + header = struct.pack(" int: + return len(self.data) + + +@dc.dataclass +class Version: + """Get version of vendor specific SDFW ADAC commands.""" + + type: int + + def to_request(self) -> AdacRequest: + return AdacRequest( + command=SdfwAdacCmd.VERSION, + data=struct.pack(" AdacRequest: + return AdacRequest( + command=SdfwAdacCmd.MEM_CFG, + data=struct.pack(" AdacRequest: + return AdacRequest(command=SdfwAdacCmd.REVERT) + + +@dc.dataclass +class Reset: + """Reset the whole system or a specific local domain.""" + + domain_id: int = 0 + mode: int = 0 + + def to_request(self) -> AdacRequest: + return AdacRequest( + command=SdfwAdacCmd.RESET, + data=struct.pack(" AdacRequest: + return AdacRequest( + command=SdfwAdacCmd.MEM_ERASE, + data=struct.pack(" AdacRequest: + return AdacRequest( + command=SdfwAdacCmd.LCS_GET, data=struct.pack(" AdacRequest: + return AdacRequest( + command=SdfwAdacCmd.LCS_SET, + data=struct.pack(" AdacRequest: + return AdacRequest( + command=SdfwAdacCmd.SSF, + data=self.data, + ) + + +@dc.dataclass +class Purge: + """Purge a given local domain.""" + + domain_id: int + + def to_request(self) -> AdacRequest: + return AdacRequest( + command=SdfwAdacCmd.PURGE, + data=struct.pack(" argparse.Namespace: + """ + Parse command line options. + + :return: argparse arguments. + """ + + parser = argparse.ArgumentParser( + allow_abbrev=False, + description="sdfw_update_service invokes the SDFW update service over ADAC.", + ) + + Ctrlap.add_arguments(parser) + + for param in sdfw_update_service_cddl.value: + parser.add_argument( + f"--{param.label.replace('_','-')}", + type=lambda number_string: int(number_string, 0), + required=True, + help=f"Value to pass to {param}.", + ) + + parsed_args = parser.parse_args() + + return parsed_args + + +def create_update_req(**kwargs) -> bytes: + """ + Create an SDFW update service request + + :param blob_addr: + :return: The CBOR formatted request + """ + + write_req_raw = ( + kwargs["tbs_addr"], + kwargs["dl_max"], + kwargs["dl_addr_fw"], + kwargs["dl_addr_pk"], + kwargs["dl_addr_signature"], + ) + + write_req_cbor = cbor2.dumps(write_req_raw) + + sdfw_update_service_cddl.validate_str(write_req_cbor) + + # Verify that the request was populated as expected + decoded = sdfw_update_service_cddl.decode_str(write_req_cbor) + assert decoded.tbs_addr == kwargs["tbs_addr"] + assert decoded.dl_max == kwargs["dl_max"] + assert decoded.dl_addr_fw == kwargs["dl_addr_fw"] + assert decoded.dl_addr_pk == kwargs["dl_addr_pk"] + assert decoded.dl_addr_signature == kwargs["dl_addr_signature"] + + return write_req_cbor + + +def main() -> None: + args = get_arguments() + + cbor_request = create_update_req(**vars(args)) + + adac = Adac(Ctrlap(**vars(args))) + + adac.ctrlap.connector.connect() + + ssf = Ssf( + adac=adac, + service_id=SSF_SDFW_UPDATE_SERVICE_ID, + service_version=SSF_SDFW_UPDATE_SERVICE_VERSION, + ) + + update_retval = ssf.request(service_request=cbor_request) + + print( + f"sdfw_update_service: Response status code: {SdfwUpdateStatus(update_retval[0]).name}" + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/sdfw/ssf_ctrlap.py b/scripts/sdfw/ssf_ctrlap.py new file mode 100644 index 000000000000..292e73a8635e --- /dev/null +++ b/scripts/sdfw/ssf_ctrlap.py @@ -0,0 +1,94 @@ +# +# Copyright (c) 2024 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause +# + +""" +Module communicating with the SSF (Secure Domain Service Framework) +using the ADAC protocol over the CTRLAP interface +""" + +import io +from enum import IntEnum +from typing import Any + +import cbor2 + +import sdfw_adac_cmd +from conn import Adac + +ADAC_SSF_REMOTE_ID = 165 + + +class SsfError(IntEnum): + """Enum for SSF error messages""" + + SSF_SUCCESS = 0 + SSF_EPERM = 1 + SSF_EIO = 5 + SSF_ENXIO = 6 + SSF_EAGAIN = 11 + SSF_ENOMEM = 12 + SSF_EFAULT = 14 + SSF_EBUSY = 16 + SSF_EINVAL = 22 + SSF_ENODATA = 61 + SSF_EPROTO = 71 + SSF_EBADMSG = 77 + SSF_ENOBUFS = 105 + SSF_EADDRINUSE = 112 + SSF_ETIMEDOUT = 116 + SSF_EALREADY = 120 + SSF_EMSGSIZE = 122 + SSF_EPROTONOSUPPORT = 123 + SSF_ENOTSUP = 134 + + +class Ssf: + """Execute SSF service requests""" + + def __init__( + self, + adac: Adac, + service_id: int, + service_version: int, + ) -> None: + """ + :param adac: Used for performing ADAC operations + :param service_id: ID of service + :param service_version: Version of service. + """ + self.adac = adac + self.service_id = service_id + self.service_version = service_version + + def request(self, service_request: bytes) -> Any: + """ + Issue an SSF service request and read its response. + The generic SSF header is added by this function. + + :param service_request: Service specific CBOR serialized data to write. + """ + ssf_header = cbor2.dumps( + (ADAC_SSF_REMOTE_ID, self.service_id, self.service_version) + ) + full_request = ssf_header + service_request + + num_align_bytes = (4 - (len(full_request) % 4)) % 4 + full_request_aligned = full_request + bytes(num_align_bytes) + + req = sdfw_adac_cmd.Ssf(full_request_aligned).to_request() + + rsp = self.adac.request(req) + + if rsp.status != sdfw_adac_cmd.AdacStatus.ADAC_SUCCESS: + raise RuntimeError(f"Got ADAC failure: {rsp.status.name}") + + with io.BytesIO(rsp.data) as data_io: + decoder = cbor2.CBORDecoder(data_io) + ssf_response = decoder.decode()[0] + if ssf_response != SsfError.SSF_SUCCESS: + raise RuntimeError(f"SSF operation failed: {SsfError(ssf_response)}") + + return decoder.decode()