tools: add udp_lin_receiver.py for capturing and diffing UDP LIN stream
- Records each datagram with timestamp to CSV - Prints live console summary - Optional --pid-diff mode parses textual lines like 'PID XX <hex>' and only prints on data changes
This commit is contained in:
parent
4298e675a5
commit
c81ffdcb12
154
tools/udp_lin_receiver.py
Normal file
154
tools/udp_lin_receiver.py
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
UDP receiver for LIN sniffer/master stream.
|
||||||
|
|
||||||
|
Purpose
|
||||||
|
- Capture all UDP datagrams from the ESP and write a CSV for later diffing.
|
||||||
|
- Print a live summary to the console.
|
||||||
|
- Optionally parse lines that contain textual "PID XX" + hex bytes and emit
|
||||||
|
only changes per PID (useful when the stream is verbose).
|
||||||
|
|
||||||
|
This is intentionally tolerant about payload format: it records raw hex and, if
|
||||||
|
the payload looks textual, the decoded text as well.
|
||||||
|
|
||||||
|
Examples
|
||||||
|
# Basic recording on port 5555
|
||||||
|
python udp_lin_receiver.py --port 5555 --out capture.csv
|
||||||
|
|
||||||
|
# Only show changed payloads per PID (when textual lines include "PID ..")
|
||||||
|
python udp_lin_receiver.py --port 5555 --out capture.csv --pid-diff
|
||||||
|
|
||||||
|
# Quiet mode (only CSV), capture for 5 minutes
|
||||||
|
python udp_lin_receiver.py --quiet --duration 300 --out capture.csv
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import csv
|
||||||
|
import datetime as dt
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import socket
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from typing import Dict, Optional, Tuple
|
||||||
|
|
||||||
|
|
||||||
|
def is_mostly_text(b: bytes, threshold: float = 0.9) -> bool:
|
||||||
|
if not b:
|
||||||
|
return True
|
||||||
|
printable = sum((32 <= c < 127) or c in (9, 10, 13) for c in b)
|
||||||
|
return printable / len(b) >= threshold
|
||||||
|
|
||||||
|
|
||||||
|
def to_hex(b: bytes) -> str:
|
||||||
|
return " ".join(f"{x:02X}" for x in b)
|
||||||
|
|
||||||
|
|
||||||
|
PID_LINE_RE = re.compile(r"PID\s+([0-9A-Fa-f]{2}).*?([0-9A-Fa-f]{2}(?:\s+[0-9A-Fa-f]{2})*)")
|
||||||
|
|
||||||
|
|
||||||
|
def maybe_parse_pid_and_data(text: str) -> Optional[Tuple[int, Tuple[int, ...]]]:
|
||||||
|
"""Parse lines like: 'PID 20 12 34 56 78 ...' -> (pid, (bytes...))
|
||||||
|
Returns None if no clear match.
|
||||||
|
"""
|
||||||
|
m = PID_LINE_RE.search(text)
|
||||||
|
if not m:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
pid = int(m.group(1), 16)
|
||||||
|
data_hex = m.group(2)
|
||||||
|
data = tuple(int(x, 16) for x in data_hex.split())
|
||||||
|
return pid, data
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def now_iso() -> str:
|
||||||
|
return dt.datetime.now(dt.timezone.utc).astimezone().isoformat(timespec="milliseconds")
|
||||||
|
|
||||||
|
|
||||||
|
def run():
|
||||||
|
ap = argparse.ArgumentParser(description="UDP LIN receiver")
|
||||||
|
ap.add_argument("--host", default="0.0.0.0", help="Listen host (default: 0.0.0.0)")
|
||||||
|
ap.add_argument("--port", type=int, default=5555, help="Listen port (default: 5555)")
|
||||||
|
ap.add_argument("--out", default="udp_capture.csv", help="CSV output file path")
|
||||||
|
ap.add_argument("--append", action="store_true", help="Append to CSV if exists")
|
||||||
|
ap.add_argument("--duration", type=float, default=0.0, help="Stop after N seconds (0 = no limit)")
|
||||||
|
ap.add_argument("--quiet", action="store_true", help="No console output, only CSV")
|
||||||
|
ap.add_argument("--pid-diff", action="store_true", help="Track textual 'PID ..' lines and only print when data changes")
|
||||||
|
ap.add_argument("--buffer", type=int, default=4096, help="Receive buffer size (default: 4096)")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
# Prepare CSV
|
||||||
|
out_exists = os.path.exists(args.out)
|
||||||
|
csv_mode = "a" if args.append else "w"
|
||||||
|
with open(args.out, csv_mode, newline="", encoding="utf-8") as f:
|
||||||
|
w = csv.writer(f)
|
||||||
|
if not out_exists or not args.append:
|
||||||
|
w.writerow(["ts", "src_ip", "src_port", "size", "is_text", "text", "hex"]) # header
|
||||||
|
|
||||||
|
# Socket
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
sock.bind((args.host, args.port))
|
||||||
|
sock.settimeout(1.0)
|
||||||
|
|
||||||
|
if not args.quiet:
|
||||||
|
print(f"Listening on {args.host}:{args.port} → {args.out}")
|
||||||
|
if args.pid_diff:
|
||||||
|
print("PID diff mode: will summarize changes per PID when textual lines include 'PID ..' and hex bytes")
|
||||||
|
|
||||||
|
start = time.time()
|
||||||
|
last_by_pid: Dict[int, Tuple[int, ...]] = {}
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if args.duration and (time.time() - start) >= args.duration:
|
||||||
|
if not args.quiet:
|
||||||
|
print("Duration reached; exiting.")
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
data, addr = sock.recvfrom(args.buffer)
|
||||||
|
except socket.timeout:
|
||||||
|
continue
|
||||||
|
|
||||||
|
ts = now_iso()
|
||||||
|
src_ip, src_port = addr[0], addr[1]
|
||||||
|
size = len(data)
|
||||||
|
as_text = ""
|
||||||
|
is_text = False
|
||||||
|
try:
|
||||||
|
if is_mostly_text(data):
|
||||||
|
as_text = data.decode("utf-8", errors="replace").strip()
|
||||||
|
is_text = True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
hexstr = to_hex(data)
|
||||||
|
w.writerow([ts, src_ip, src_port, size, int(is_text), as_text, hexstr])
|
||||||
|
|
||||||
|
# Console output
|
||||||
|
if not args.quiet:
|
||||||
|
if args.pid_diff and is_text:
|
||||||
|
parsed = maybe_parse_pid_and_data(as_text)
|
||||||
|
if parsed:
|
||||||
|
pid, pdata = parsed
|
||||||
|
last = last_by_pid.get(pid)
|
||||||
|
if last != pdata:
|
||||||
|
last_by_pid[pid] = pdata
|
||||||
|
print(f"{ts} PID {pid:02X} len={len(pdata)} changed: { ' '.join(f'{b:02X}' for b in pdata) }")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# default console print
|
||||||
|
if is_text and as_text:
|
||||||
|
print(f"{ts} {src_ip}:{src_port} ({size}B) TEXT: {as_text}")
|
||||||
|
else:
|
||||||
|
print(f"{ts} {src_ip}:{src_port} ({size}B) HEX: {hexstr}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
run()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
Loading…
x
Reference in New Issue
Block a user