Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CANFD for ISOTPScan #4574

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 72 additions & 37 deletions scapy/contrib/isotp/isotp_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,15 @@
# See https://scapy.net/ for more information
# Copyright (C) Nils Weiss <[email protected]>
# Copyright (C) Alexander Schroeder <[email protected]>
import itertools
import json

# scapy.contrib.description = ISO-TP (ISO 15765-2) Scanner Utility
# scapy.contrib.status = library

import itertools
import json
import logging
import time

from threading import Event

from scapy.packet import Packet
from scapy.compat import orb
from scapy.layers.can import CAN
from scapy.supersocket import SuperSocket
from scapy.contrib.cansocket import PYTHON_CAN
from scapy.contrib.isotp.isotp_packet import ISOTPHeader, ISOTPHeaderEA, \
ISOTP_FF, ISOTP

# Typing imports
from typing import (
Any,
Expand All @@ -28,9 +20,18 @@
List,
Optional,
Tuple,
Union,
Union, Type,
)

from scapy.compat import orb
from scapy.contrib.cansocket import PYTHON_CAN
from scapy.contrib.isotp import ISOTPHeader_FD
from scapy.contrib.isotp.isotp_packet import ISOTPHeader, ISOTPHeaderEA, \
ISOTP_FF, ISOTP, ISOTPHeaderEA_FD
from scapy.layers.can import CAN, CANFD
from scapy.packet import Packet
from scapy.supersocket import SuperSocket

log_isotp = logging.getLogger("scapy.contrib.isotp")


Expand All @@ -55,22 +56,29 @@ def send_multiple_ext(sock, ext_id, packet, number_of_packets):
sock.send(packet)


def get_isotp_packet(identifier=0x0, extended=False, extended_can_id=False):
# type: (int, bool, bool) -> Packet
def get_isotp_packet(identifier=0x0, extended=False, extended_can_id=False, fd=False):
# type: (int, bool, bool, bool) -> Packet
"""Craft ISO-TP packet

:param identifier: identifier of crafted packet
:param extended: boolean if packet uses extended address
:param extended_can_id: boolean if CAN should use extended Ids
:param fd: boolean if CANFD packets should be used
:return: Crafted Packet
"""

if extended:
pkt = ISOTPHeaderEA() / ISOTP_FF() # type: Packet
if fd:
pkt = ISOTPHeaderEA_FD() / ISOTP_FF() # type: Packet
else:
pkt = ISOTPHeaderEA() / ISOTP_FF()
pkt.extended_address = 0
pkt.data = b'\x00\x00\x00\x00\x00'
else:
pkt = ISOTPHeader() / ISOTP_FF()
if fd:
pkt = ISOTPHeader_FD() / ISOTP_FF()
else:
pkt = ISOTPHeader() / ISOTP_FF()
pkt.data = b'\x00\x00\x00\x00\x00\x00'
if extended_can_id:
pkt.flags = "extended"
Expand Down Expand Up @@ -170,7 +178,8 @@ def scan(sock, # type: SuperSocket
sniff_time=0.1, # type: float
extended_can_id=False, # type: bool
verify_results=True, # type: bool
stop_event=None # type: Optional[Event]
stop_event=None, # type: Optional[Event]
fd=False # type: bool
): # type: (...) -> Dict[int, Tuple[Packet, int]]
"""Scan and return dictionary of detections

Expand All @@ -187,6 +196,7 @@ def scan(sock, # type: SuperSocket
:param verify_results: Verify scan results. This will cause a second scan
of all possible candidates for ISOTP Sockets
:param stop_event: Event object to asynchronously stop the scan
:param fd: Use CANFD packets for scan
:return: Dictionary with all found packets
"""
return_values = dict() # type: Dict[int, Tuple[Packet, int]]
Expand All @@ -195,7 +205,7 @@ def scan(sock, # type: SuperSocket
break
if noise_ids and value in noise_ids:
continue
sock.send(get_isotp_packet(value, False, extended_can_id))
sock.send(get_isotp_packet(value, False, extended_can_id, fd))
sock.sniff(prn=lambda pkt: get_isotp_fc(value, return_values,
noise_ids, False, pkt),
timeout=sniff_time, store=False)
Expand All @@ -210,7 +220,7 @@ def scan(sock, # type: SuperSocket
for value in retest_ids:
if stop_event is not None and stop_event.is_set():
break
sock.send(get_isotp_packet(value, False, extended_can_id))
sock.send(get_isotp_packet(value, False, extended_can_id, fd))
sock.sniff(prn=lambda pkt: get_isotp_fc(value, cleaned_ret_val,
noise_ids, False, pkt),
timeout=sniff_time * 10, store=False)
Expand All @@ -225,7 +235,8 @@ def scan_extended(sock, # type: SuperSocket
noise_ids=None, # type: Optional[List[int]]
sniff_time=0.1, # type: float
extended_can_id=False, # type: bool
stop_event=None # type: Optional[Event]
stop_event=None, # type: Optional[Event]
fd=False # type: bool
): # type: (...) -> Dict[int, Tuple[Packet, int]]
"""Scan with ISOTP extended addresses and return dictionary of detections

Expand All @@ -243,6 +254,7 @@ def scan_extended(sock, # type: SuperSocket
after sending a first frame
:param extended_can_id: Send extended can frames
:param stop_event: Event object to asynchronously stop the scan
:param fd: Use CANFD packets for scan
:return: Dictionary with all found packets
"""
return_values = dict() # type: Dict[int, Tuple[Packet, int]]
Expand All @@ -254,7 +266,7 @@ def scan_extended(sock, # type: SuperSocket
continue

pkt = get_isotp_packet(
value, extended=True, extended_can_id=extended_can_id)
value, extended=True, extended_can_id=extended_can_id, fd=fd)
id_list = [] # type: List[int]
for ext_isotp_id in range(r[0], r[-1], scan_block_size):
if stop_event is not None and stop_event.is_set():
Expand Down Expand Up @@ -298,7 +310,8 @@ def isotp_scan(sock, # type: SuperSocket
extended_can_id=False, # type: bool
verify_results=True, # type: bool
verbose=False, # type: bool
stop_event=None # type: Optional[Event]
stop_event=None, # type: Optional[Event]
fd=False # type: bool
):
# type: (...) -> Union[str, List[SuperSocket]]
"""Scan for ISOTP Sockets on a bus and return findings
Expand Down Expand Up @@ -329,6 +342,7 @@ def isotp_scan(sock, # type: SuperSocket
of all possible candidates for ISOTP Sockets
:param verbose: displays information during scan
:param stop_event: Event object to asynchronously stop the scan
:param fd: Create CANFD frames
:return:
"""
if verbose:
Expand All @@ -337,9 +351,13 @@ def isotp_scan(sock, # type: SuperSocket
log_isotp.info("Filtering background noise...")

# Send dummy packet. In most cases, this triggers activity on the bus.
if fd:
dummy_pkt_cls = CANFD # type: Union[Type[CAN], Type[CANFD]]
else:
dummy_pkt_cls = CAN

dummy_pkt = CAN(identifier=0x123,
data=b'\xaa\xbb\xcc\xdd\xee\xff\xaa\xbb')
dummy_pkt = dummy_pkt_cls(identifier=0x123,
data=b'\xaa\xbb\xcc\xdd\xee\xff\xaa\xbb')

background_pkts = sock.sniff(
timeout=noise_listen_time,
Expand All @@ -353,14 +371,16 @@ def isotp_scan(sock, # type: SuperSocket
noise_ids=noise_ids,
sniff_time=sniff_time,
extended_can_id=extended_can_id,
stop_event=stop_event)
stop_event=stop_event,
fd=fd)
else:
found_packets = scan(sock, scan_range,
noise_ids=noise_ids,
sniff_time=sniff_time,
extended_can_id=extended_can_id,
verify_results=verify_results,
stop_event=stop_event)
stop_event=stop_event,
fd=fd)

filter_periodic_packets(found_packets)

Expand All @@ -379,14 +399,15 @@ def isotp_scan(sock, # type: SuperSocket
extended_addressing)


def generate_text_output(found_packets, extended_addressing=False):
# type: (Dict[int, Tuple[Packet, int]], bool) -> str
def generate_text_output(found_packets, extended_addressing=False, fd=False):
# type: (Dict[int, Tuple[Packet, int]], bool, bool) -> str
"""Generate a human readable output from the result of the `scan` or the
`scan_extended` function.

:param found_packets: result of the `scan` or `scan_extended` function
:param extended_addressing: print results from a scan with
ISOTP extended addressing
:param fd: set CANFD flag in output
:return: human readable scan results
"""
if not found_packets:
Expand Down Expand Up @@ -420,13 +441,16 @@ def generate_text_output(found_packets, extended_addressing=False):
else:
text += "\nNo Padding"

if fd:
text += "\nCANFD enabled"

text += "\n"
return text


def generate_code_output(found_packets, can_interface="iface",
extended_addressing=False):
# type: (Dict[int, Tuple[Packet, int]], Optional[str], bool) -> str
extended_addressing=False, fd=False):
# type: (Dict[int, Tuple[Packet, int]], Optional[str], bool, bool) -> str
"""Generate a copy&past-able output from the result of the `scan` or
the `scan_extended` function.

Expand All @@ -435,6 +459,7 @@ def generate_code_output(found_packets, can_interface="iface",
used for the creation of the output.
:param extended_addressing: print results from a scan with ISOTP
extended addressing
:param fd: set CANFD flag in output
:return: Python-code as string to generate all found sockets
"""
result = ""
Expand All @@ -452,26 +477,29 @@ def generate_code_output(found_packets, can_interface="iface",
send_ext = pack - (send_id * 256)
ext_id = orb(found_packets[pack][0].data[0])
result += "ISOTPSocket(%s, tx_id=0x%x, rx_id=0x%x, padding=%s, " \
"ext_address=0x%x, rx_ext_address=0x%x, " \
"ext_address=0x%x, rx_ext_address=0x%x, fd=%s, " \
"basecls=ISOTP)\n" % \
(can_interface, send_id,
int(found_packets[pack][0].identifier),
found_packets[pack][0].length == 8,
send_ext,
ext_id)
ext_id,
fd)

else:
result += "ISOTPSocket(%s, tx_id=0x%x, rx_id=0x%x, padding=%s, " \
result += "ISOTPSocket(%s, tx_id=0x%x, rx_id=0x%x, padding=%s, fd=%s, " \
"basecls=ISOTP)\n" % \
(can_interface, pack,
int(found_packets[pack][0].identifier),
found_packets[pack][0].length == 8)
found_packets[pack][0].length == 8,
fd)
return header + result


def generate_json_output(found_packets, # type: Dict[int, Tuple[Packet, int]]
can_interface="iface", # type: Optional[str]
extended_addressing=False # type: bool
extended_addressing=False, # type: bool
fd=False # type: bool
):
# type: (...) -> str
"""Generate a list of ISOTPSocket objects from the result of the `scan` or
Expand All @@ -482,6 +510,7 @@ def generate_json_output(found_packets, # type: Dict[int, Tuple[Packet, int]]
used for the creation of the output.
:param extended_addressing: print results from a scan with ISOTP
extended addressing
:param fd: set CANFD flag in output
:return: A list of all found ISOTPSockets
"""
socket_list = [] # type: List[Dict[str, Any]]
Expand All @@ -501,20 +530,23 @@ def generate_json_output(found_packets, # type: Dict[int, Tuple[Packet, int]]
"rx_id": dest_id,
"rx_ext_address": dest_ext,
"padding": pad,
"fd": fd,
"basecls": ISOTP.__name__})
else:
source_id = pack
socket_list.append({"iface": can_interface,
"tx_id": source_id,
"rx_id": dest_id,
"padding": pad,
"fd": fd,
"basecls": ISOTP.__name__})
return json.dumps(socket_list)


def generate_isotp_list(found_packets, # type: Dict[int, Tuple[Packet, int]]
can_interface, # type: Union[SuperSocket, str]
extended_addressing=False # type: bool
extended_addressing=False, # type: bool
fd=False # type: bool
):
# type: (...) -> List[SuperSocket]
"""Generate a list of ISOTPSocket objects from the result of the `scan` or
Expand All @@ -525,6 +557,7 @@ def generate_isotp_list(found_packets, # type: Dict[int, Tuple[Packet, int]]
used for the creation of the output.
:param extended_addressing: print results from a scan with ISOTP
extended addressing
:param fd: set CANFD flag in output
:return: A list of all found ISOTPSockets
"""
from scapy.contrib.isotp import ISOTPSocket
Expand All @@ -545,10 +578,12 @@ def generate_isotp_list(found_packets, # type: Dict[int, Tuple[Packet, int]]
rx_id=dest_id,
rx_ext_address=dest_ext,
padding=pad,
fd=fd,
basecls=ISOTP))
else:
source_id = pack
socket_list.append(ISOTPSocket(can_interface, tx_id=source_id,
rx_id=dest_id, padding=pad,
fd=fd,
basecls=ISOTP))
return socket_list
20 changes: 10 additions & 10 deletions test/contrib/isotpscan.uts
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602), ISOTPSoftSocket(sock
verbose=False)

s1 = "ISOTPSocket(can0, tx_id=0x602, rx_id=0x702, " \
"padding=False, basecls=ISOTP)\n"
"padding=False, fd=False, basecls=ISOTP)\n"
s2 = "ISOTPSocket(can0, tx_id=0x603, rx_id=0x703, " \
"padding=False, basecls=ISOTP)\n"
"padding=False, fd=False, basecls=ISOTP)\n"
assert s1 in result
assert s2 in result

Expand All @@ -235,9 +235,9 @@ with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602), ISOTPSoftSocket(sock
verbose=False)

s1 = "\"iface\": \"can0\", \"tx_id\": 1538, \"rx_id\": 1794, " \
"\"padding\": false, \"basecls\": \"ISOTP\""
"\"padding\": false, \"fd\": false, \"basecls\": \"ISOTP\""
s2 = "\"iface\": \"can0\", \"tx_id\": 1539, \"rx_id\": 1795, " \
"\"padding\": false, \"basecls\": \"ISOTP\""
"\"padding\": false, \"fd\": false, \"basecls\": \"ISOTP\""
print(result)
assert s1 in result
assert s2 in result
Expand All @@ -263,9 +263,9 @@ with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602), ISOTPSoftSocket(sock
verbose=False)

s1 = "ISOTPSocket(can0, tx_id=0x602, rx_id=0x702, " \
"padding=False, basecls=ISOTP)\n"
"padding=False, fd=False, basecls=ISOTP)\n"
s2 = "ISOTPSocket(can0, tx_id=0x603, rx_id=0x703, " \
"padding=False, basecls=ISOTP)\n"
"padding=False, fd=False, basecls=ISOTP)\n"
assert s1 not in result
assert s2 in result

Expand All @@ -292,9 +292,9 @@ with ISOTPSoftSocket(sock_recv1, tx_id=0x702, rx_id=0x602, ext_address=0x11, rx_
verbose=False)

s1 = "ISOTPSocket(can0, tx_id=0x602, rx_id=0x702, padding=False, " \
"ext_address=0x22, rx_ext_address=0x11, basecls=ISOTP)"
"ext_address=0x22, rx_ext_address=0x11, fd=False, basecls=ISOTP)"
s2 = "ISOTPSocket(can0, tx_id=0x603, rx_id=0x703, padding=False, " \
"ext_address=0x22, rx_ext_address=0x11, basecls=ISOTP)"
"ext_address=0x22, rx_ext_address=0x11, fd=False, basecls=ISOTP)"
assert s1 in result
assert s2 in result

Expand All @@ -321,9 +321,9 @@ with ISOTPSoftSocket(sock_recv1, tx_id=0x1ffff702, rx_id=0x1ffff602, ext_address
verbose=False)

s1 = "ISOTPSocket(can0, tx_id=0x1ffff602, rx_id=0x1ffff702, padding=False, " \
"ext_address=0x22, rx_ext_address=0x11, basecls=ISOTP)"
"ext_address=0x22, rx_ext_address=0x11, fd=False, basecls=ISOTP)"
s2 = "ISOTPSocket(can0, tx_id=0x1ffff603, rx_id=0x1ffff703, padding=False, " \
"ext_address=0x22, rx_ext_address=0x11, basecls=ISOTP)"
"ext_address=0x22, rx_ext_address=0x11, fd=False, basecls=ISOTP)"
print(result)
assert s1 in result
assert s2 in result
Expand Down
Loading