From c81ffdcb12a867d21a5356aa3ba7d443d8c642a4 Mon Sep 17 00:00:00 2001 From: Hendrik Groove Date: Mon, 8 Sep 2025 16:20:46 +0200 Subject: [PATCH] tools: add udp_lin_receiver.py for capturing and diffing UDP LIN stream - Records each datagram with timestamp to CSV - Prints live console summary - Optional --pid-diff mode parses textual lines like 'PID XX ' and only prints on data changes --- tools/udp_lin_receiver.py | 154 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 tools/udp_lin_receiver.py diff --git a/tools/udp_lin_receiver.py b/tools/udp_lin_receiver.py new file mode 100644 index 0000000..53f31b3 --- /dev/null +++ b/tools/udp_lin_receiver.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +""" +UDP receiver for LIN sniffer/master stream. + +Purpose + - Capture all UDP datagrams from the ESP and write a CSV for later diffing. + - Print a live summary to the console. + - Optionally parse lines that contain textual "PID XX" + hex bytes and emit + only changes per PID (useful when the stream is verbose). + +This is intentionally tolerant about payload format: it records raw hex and, if +the payload looks textual, the decoded text as well. + +Examples + # Basic recording on port 5555 + python udp_lin_receiver.py --port 5555 --out capture.csv + + # Only show changed payloads per PID (when textual lines include "PID ..") + python udp_lin_receiver.py --port 5555 --out capture.csv --pid-diff + + # Quiet mode (only CSV), capture for 5 minutes + python udp_lin_receiver.py --quiet --duration 300 --out capture.csv +""" +from __future__ import annotations + +import argparse +import csv +import datetime as dt +import os +import re +import socket +import sys +import time +from typing import Dict, Optional, Tuple + + +def is_mostly_text(b: bytes, threshold: float = 0.9) -> bool: + if not b: + return True + printable = sum((32 <= c < 127) or c in (9, 10, 13) for c in b) + return printable / len(b) >= threshold + + +def to_hex(b: bytes) -> str: + return " ".join(f"{x:02X}" for x in b) + + +PID_LINE_RE = re.compile(r"PID\s+([0-9A-Fa-f]{2}).*?([0-9A-Fa-f]{2}(?:\s+[0-9A-Fa-f]{2})*)") + + +def maybe_parse_pid_and_data(text: str) -> Optional[Tuple[int, Tuple[int, ...]]]: + """Parse lines like: 'PID 20 12 34 56 78 ...' -> (pid, (bytes...)) + Returns None if no clear match. + """ + m = PID_LINE_RE.search(text) + if not m: + return None + try: + pid = int(m.group(1), 16) + data_hex = m.group(2) + data = tuple(int(x, 16) for x in data_hex.split()) + return pid, data + except Exception: + return None + + +def now_iso() -> str: + return dt.datetime.now(dt.timezone.utc).astimezone().isoformat(timespec="milliseconds") + + +def run(): + ap = argparse.ArgumentParser(description="UDP LIN receiver") + ap.add_argument("--host", default="0.0.0.0", help="Listen host (default: 0.0.0.0)") + ap.add_argument("--port", type=int, default=5555, help="Listen port (default: 5555)") + ap.add_argument("--out", default="udp_capture.csv", help="CSV output file path") + ap.add_argument("--append", action="store_true", help="Append to CSV if exists") + ap.add_argument("--duration", type=float, default=0.0, help="Stop after N seconds (0 = no limit)") + ap.add_argument("--quiet", action="store_true", help="No console output, only CSV") + ap.add_argument("--pid-diff", action="store_true", help="Track textual 'PID ..' lines and only print when data changes") + ap.add_argument("--buffer", type=int, default=4096, help="Receive buffer size (default: 4096)") + args = ap.parse_args() + + # Prepare CSV + out_exists = os.path.exists(args.out) + csv_mode = "a" if args.append else "w" + with open(args.out, csv_mode, newline="", encoding="utf-8") as f: + w = csv.writer(f) + if not out_exists or not args.append: + w.writerow(["ts", "src_ip", "src_port", "size", "is_text", "text", "hex"]) # header + + # Socket + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((args.host, args.port)) + sock.settimeout(1.0) + + if not args.quiet: + print(f"Listening on {args.host}:{args.port} → {args.out}") + if args.pid_diff: + print("PID diff mode: will summarize changes per PID when textual lines include 'PID ..' and hex bytes") + + start = time.time() + last_by_pid: Dict[int, Tuple[int, ...]] = {} + + while True: + if args.duration and (time.time() - start) >= args.duration: + if not args.quiet: + print("Duration reached; exiting.") + break + try: + data, addr = sock.recvfrom(args.buffer) + except socket.timeout: + continue + + ts = now_iso() + src_ip, src_port = addr[0], addr[1] + size = len(data) + as_text = "" + is_text = False + try: + if is_mostly_text(data): + as_text = data.decode("utf-8", errors="replace").strip() + is_text = True + except Exception: + pass + + hexstr = to_hex(data) + w.writerow([ts, src_ip, src_port, size, int(is_text), as_text, hexstr]) + + # Console output + if not args.quiet: + if args.pid_diff and is_text: + parsed = maybe_parse_pid_and_data(as_text) + if parsed: + pid, pdata = parsed + last = last_by_pid.get(pid) + if last != pdata: + last_by_pid[pid] = pdata + print(f"{ts} PID {pid:02X} len={len(pdata)} changed: { ' '.join(f'{b:02X}' for b in pdata) }") + continue + + # default console print + if is_text and as_text: + print(f"{ts} {src_ip}:{src_port} ({size}B) TEXT: {as_text}") + else: + print(f"{ts} {src_ip}:{src_port} ({size}B) HEX: {hexstr}") + + +if __name__ == "__main__": + try: + run() + except KeyboardInterrupt: + pass +