esphome-truma_inetbox/tools/udp_lin_receiver.py
Hendrik Groove 98ad6caab9 truma_inetbox: implement master mode auto-discovery and device scanning
udp_lin_receiver: add option to suppress repeated identical messages
2025-09-14 21:07:18 +02:00

183 lines
6.7 KiB
Python

#!/usr/bin/env python3
"""
UDP receiver for LIN sniffer/master stream.
Purpose
- Capture all UDP datagrams from the ESP and write a CSV for later diffing.
- Print a live summary to the console.
- Optionally parse lines that contain textual "PID XX" + hex bytes and emit
only changes per PID (useful when the stream is verbose).
This is intentionally tolerant about payload format: it records raw hex and, if
the payload looks textual, the decoded text as well.
Examples
# Basic recording on port 5555
python udp_lin_receiver.py --port 5555 --out capture.csv
# Only show changed payloads per PID (when textual lines include "PID ..")
python udp_lin_receiver.py --port 5555 --out capture.csv --pid-diff
# Quiet mode (only CSV), capture for 5 minutes
python udp_lin_receiver.py --quiet --duration 300 --out capture.csv
"""
from __future__ import annotations
import argparse
import csv
import datetime as dt
import os
import re
import socket
import sys
import time
from typing import Dict, Optional, Tuple
def is_mostly_text(b: bytes, threshold: float = 0.9) -> bool:
if not b:
return True
printable = sum((32 <= c < 127) or c in (9, 10, 13) for c in b)
return printable / len(b) >= threshold
def to_hex(b: bytes) -> str:
return " ".join(f"{x:02X}" for x in b)
PID_LINE_RE = re.compile(r"PID\s+([0-9A-Fa-f]{2}).*?([0-9A-Fa-f]{2}(?:\s+[0-9A-Fa-f]{2})*)")
def maybe_parse_pid_and_data(text: str) -> Optional[Tuple[int, Tuple[int, ...]]]:
"""Parse lines like: 'PID 20 12 34 56 78 ...' -> (pid, (bytes...))
Returns None if no clear match.
"""
m = PID_LINE_RE.search(text)
if not m:
return None
try:
pid = int(m.group(1), 16)
data_hex = m.group(2)
data = tuple(int(x, 16) for x in data_hex.split())
return pid, data
except Exception:
return None
def now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).astimezone().isoformat(timespec="milliseconds")
def run():
ap = argparse.ArgumentParser(description="UDP LIN receiver")
ap.add_argument("--host", default="0.0.0.0", help="Listen host (default: 0.0.0.0)")
ap.add_argument("--port", type=int, default=5555, help="Listen port (default: 5555)")
ap.add_argument("--out", default="udp_capture.csv", help="CSV output file path")
ap.add_argument("--append", action="store_true", help="Append to CSV if exists")
ap.add_argument("--duration", type=float, default=0.0, help="Stop after N seconds (0 = no limit)")
ap.add_argument("--quiet", action="store_true", help="No console output, only CSV")
ap.add_argument("--pid-diff", action="store_true", help="Track textual 'PID ..' lines and only print when data changes")
ap.add_argument("--suppress-repeats", action="store_true", help="Suppress repeated identical messages (shows first + count)")
ap.add_argument("--buffer", type=int, default=4096, help="Receive buffer size (default: 4096)")
args = ap.parse_args()
# Prepare CSV
out_exists = os.path.exists(args.out)
csv_mode = "a" if args.append else "w"
with open(args.out, csv_mode, newline="", encoding="utf-8") as f:
w = csv.writer(f)
if not out_exists or not args.append:
w.writerow(["ts", "src_ip", "src_port", "size", "is_text", "text", "hex"]) # header
# Socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((args.host, args.port))
sock.settimeout(1.0)
if not args.quiet:
print(f"Listening on {args.host}:{args.port}{args.out}")
if args.pid_diff:
print("PID diff mode: will summarize changes per PID when textual lines include 'PID ..' and hex bytes")
start = time.time()
last_by_pid: Dict[int, Tuple[int, ...]] = {}
last_message = None
repeat_count = 0
last_display_time = 0
while True:
if args.duration and (time.time() - start) >= args.duration:
if not args.quiet:
print("Duration reached; exiting.")
break
try:
data, addr = sock.recvfrom(args.buffer)
except socket.timeout:
continue
ts = now_iso()
src_ip, src_port = addr[0], addr[1]
size = len(data)
as_text = ""
is_text = False
try:
if is_mostly_text(data):
as_text = data.decode("utf-8", errors="replace").strip()
is_text = True
except Exception:
pass
hexstr = to_hex(data)
w.writerow([ts, src_ip, src_port, size, int(is_text), as_text, hexstr])
# Console output
if not args.quiet:
# Format the display message
if is_text and as_text:
display_msg = f"{ts} {src_ip}:{src_port} ({size}B) TEXT: {as_text}"
else:
display_msg = f"{ts} {src_ip}:{src_port} ({size}B) HEX: {hexstr}"
# Handle repeat suppression
if args.suppress_repeats:
current_msg_key = as_text if is_text else hexstr
current_time = time.time()
if current_msg_key == last_message:
repeat_count += 1
# Show periodic updates for long repeated sequences
if current_time - last_display_time > 5.0: # Every 5 seconds
print(f"... repeated {repeat_count} times (last: {ts})")
last_display_time = current_time
else:
# Message changed
if repeat_count > 0:
print(f"... repeated {repeat_count} times total")
print(display_msg)
last_message = current_msg_key
repeat_count = 0
last_display_time = current_time
continue
if args.pid_diff and is_text:
parsed = maybe_parse_pid_and_data(as_text)
if parsed:
pid, pdata = parsed
last = last_by_pid.get(pid)
if last != pdata:
last_by_pid[pid] = pdata
print(f"{ts} PID {pid:02X} len={len(pdata)} changed: { ' '.join(f'{b:02X}' for b in pdata) }")
continue
# default console print
print(display_msg)
if __name__ == "__main__":
try:
run()
except KeyboardInterrupt:
pass