Compare commits

...

2 Commits

Author SHA1 Message Date
4719d483a7 truma_inetbox: expose observer_mode in CONFIG_SCHEMA and wire to set_observer_mode() 2025-09-08 16:30:34 +02:00
c81ffdcb12 tools: add udp_lin_receiver.py for capturing and diffing UDP LIN stream
- Records each datagram with timestamp to CSV
- Prints live console summary
- Optional --pid-diff mode parses textual lines like 'PID XX <hex>' and only prints on data changes
2025-09-08 16:20:46 +02:00
2 changed files with 155 additions and 0 deletions

View File

@ -217,6 +217,7 @@ CONFIG_SCHEMA = cv.All(
cv.Optional(CONF_LIN_CHECKSUM, "VERSION_2"): cv.enum(CONF_SUPPORTED_LIN_CHECKSUM, upper=True), cv.Optional(CONF_LIN_CHECKSUM, "VERSION_2"): cv.enum(CONF_SUPPORTED_LIN_CHECKSUM, upper=True),
cv.Optional(CONF_CS_PIN): pins.gpio_output_pin_schema, cv.Optional(CONF_CS_PIN): pins.gpio_output_pin_schema,
cv.Optional(CONF_FAULT_PIN): pins.gpio_input_pin_schema, cv.Optional(CONF_FAULT_PIN): pins.gpio_input_pin_schema,
cv.Optional(CONF_OBSERVER_MODE, False): cv.boolean,
cv.Optional(CONF_UDP_STREAM_HOST): cv.string, cv.Optional(CONF_UDP_STREAM_HOST): cv.string,
cv.Optional(CONF_UDP_STREAM_PORT): cv.int_, cv.Optional(CONF_UDP_STREAM_PORT): cv.int_,

154
tools/udp_lin_receiver.py Normal file
View File

@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""
UDP receiver for LIN sniffer/master stream.
Purpose
- Capture all UDP datagrams from the ESP and write a CSV for later diffing.
- Print a live summary to the console.
- Optionally parse lines that contain textual "PID XX" + hex bytes and emit
only changes per PID (useful when the stream is verbose).
This is intentionally tolerant about payload format: it records raw hex and, if
the payload looks textual, the decoded text as well.
Examples
# Basic recording on port 5555
python udp_lin_receiver.py --port 5555 --out capture.csv
# Only show changed payloads per PID (when textual lines include "PID ..")
python udp_lin_receiver.py --port 5555 --out capture.csv --pid-diff
# Quiet mode (only CSV), capture for 5 minutes
python udp_lin_receiver.py --quiet --duration 300 --out capture.csv
"""
from __future__ import annotations
import argparse
import csv
import datetime as dt
import os
import re
import socket
import sys
import time
from typing import Dict, Optional, Tuple
def is_mostly_text(b: bytes, threshold: float = 0.9) -> bool:
if not b:
return True
printable = sum((32 <= c < 127) or c in (9, 10, 13) for c in b)
return printable / len(b) >= threshold
def to_hex(b: bytes) -> str:
return " ".join(f"{x:02X}" for x in b)
PID_LINE_RE = re.compile(r"PID\s+([0-9A-Fa-f]{2}).*?([0-9A-Fa-f]{2}(?:\s+[0-9A-Fa-f]{2})*)")
def maybe_parse_pid_and_data(text: str) -> Optional[Tuple[int, Tuple[int, ...]]]:
"""Parse lines like: 'PID 20 12 34 56 78 ...' -> (pid, (bytes...))
Returns None if no clear match.
"""
m = PID_LINE_RE.search(text)
if not m:
return None
try:
pid = int(m.group(1), 16)
data_hex = m.group(2)
data = tuple(int(x, 16) for x in data_hex.split())
return pid, data
except Exception:
return None
def now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).astimezone().isoformat(timespec="milliseconds")
def run():
ap = argparse.ArgumentParser(description="UDP LIN receiver")
ap.add_argument("--host", default="0.0.0.0", help="Listen host (default: 0.0.0.0)")
ap.add_argument("--port", type=int, default=5555, help="Listen port (default: 5555)")
ap.add_argument("--out", default="udp_capture.csv", help="CSV output file path")
ap.add_argument("--append", action="store_true", help="Append to CSV if exists")
ap.add_argument("--duration", type=float, default=0.0, help="Stop after N seconds (0 = no limit)")
ap.add_argument("--quiet", action="store_true", help="No console output, only CSV")
ap.add_argument("--pid-diff", action="store_true", help="Track textual 'PID ..' lines and only print when data changes")
ap.add_argument("--buffer", type=int, default=4096, help="Receive buffer size (default: 4096)")
args = ap.parse_args()
# Prepare CSV
out_exists = os.path.exists(args.out)
csv_mode = "a" if args.append else "w"
with open(args.out, csv_mode, newline="", encoding="utf-8") as f:
w = csv.writer(f)
if not out_exists or not args.append:
w.writerow(["ts", "src_ip", "src_port", "size", "is_text", "text", "hex"]) # header
# Socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((args.host, args.port))
sock.settimeout(1.0)
if not args.quiet:
print(f"Listening on {args.host}:{args.port}{args.out}")
if args.pid_diff:
print("PID diff mode: will summarize changes per PID when textual lines include 'PID ..' and hex bytes")
start = time.time()
last_by_pid: Dict[int, Tuple[int, ...]] = {}
while True:
if args.duration and (time.time() - start) >= args.duration:
if not args.quiet:
print("Duration reached; exiting.")
break
try:
data, addr = sock.recvfrom(args.buffer)
except socket.timeout:
continue
ts = now_iso()
src_ip, src_port = addr[0], addr[1]
size = len(data)
as_text = ""
is_text = False
try:
if is_mostly_text(data):
as_text = data.decode("utf-8", errors="replace").strip()
is_text = True
except Exception:
pass
hexstr = to_hex(data)
w.writerow([ts, src_ip, src_port, size, int(is_text), as_text, hexstr])
# Console output
if not args.quiet:
if args.pid_diff and is_text:
parsed = maybe_parse_pid_and_data(as_text)
if parsed:
pid, pdata = parsed
last = last_by_pid.get(pid)
if last != pdata:
last_by_pid[pid] = pdata
print(f"{ts} PID {pid:02X} len={len(pdata)} changed: { ' '.join(f'{b:02X}' for b in pdata) }")
continue
# default console print
if is_text and as_text:
print(f"{ts} {src_ip}:{src_port} ({size}B) TEXT: {as_text}")
else:
print(f"{ts} {src_ip}:{src_port} ({size}B) HEX: {hexstr}")
if __name__ == "__main__":
try:
run()
except KeyboardInterrupt:
pass