Compare commits
2 Commits
4298e675a5
...
4719d483a7
| Author | SHA1 | Date | |
|---|---|---|---|
| 4719d483a7 | |||
| c81ffdcb12 |
@ -217,6 +217,7 @@ CONFIG_SCHEMA = cv.All(
|
||||
cv.Optional(CONF_LIN_CHECKSUM, "VERSION_2"): cv.enum(CONF_SUPPORTED_LIN_CHECKSUM, upper=True),
|
||||
cv.Optional(CONF_CS_PIN): pins.gpio_output_pin_schema,
|
||||
cv.Optional(CONF_FAULT_PIN): pins.gpio_input_pin_schema,
|
||||
cv.Optional(CONF_OBSERVER_MODE, False): cv.boolean,
|
||||
|
||||
cv.Optional(CONF_UDP_STREAM_HOST): cv.string,
|
||||
cv.Optional(CONF_UDP_STREAM_PORT): cv.int_,
|
||||
|
||||
154
tools/udp_lin_receiver.py
Normal file
154
tools/udp_lin_receiver.py
Normal file
@ -0,0 +1,154 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
UDP receiver for LIN sniffer/master stream.
|
||||
|
||||
Purpose
|
||||
- Capture all UDP datagrams from the ESP and write a CSV for later diffing.
|
||||
- Print a live summary to the console.
|
||||
- Optionally parse lines that contain textual "PID XX" + hex bytes and emit
|
||||
only changes per PID (useful when the stream is verbose).
|
||||
|
||||
This is intentionally tolerant about payload format: it records raw hex and, if
|
||||
the payload looks textual, the decoded text as well.
|
||||
|
||||
Examples
|
||||
# Basic recording on port 5555
|
||||
python udp_lin_receiver.py --port 5555 --out capture.csv
|
||||
|
||||
# Only show changed payloads per PID (when textual lines include "PID ..")
|
||||
python udp_lin_receiver.py --port 5555 --out capture.csv --pid-diff
|
||||
|
||||
# Quiet mode (only CSV), capture for 5 minutes
|
||||
python udp_lin_receiver.py --quiet --duration 300 --out capture.csv
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import datetime as dt
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
||||
|
||||
def is_mostly_text(b: bytes, threshold: float = 0.9) -> bool:
|
||||
if not b:
|
||||
return True
|
||||
printable = sum((32 <= c < 127) or c in (9, 10, 13) for c in b)
|
||||
return printable / len(b) >= threshold
|
||||
|
||||
|
||||
def to_hex(b: bytes) -> str:
|
||||
return " ".join(f"{x:02X}" for x in b)
|
||||
|
||||
|
||||
PID_LINE_RE = re.compile(r"PID\s+([0-9A-Fa-f]{2}).*?([0-9A-Fa-f]{2}(?:\s+[0-9A-Fa-f]{2})*)")
|
||||
|
||||
|
||||
def maybe_parse_pid_and_data(text: str) -> Optional[Tuple[int, Tuple[int, ...]]]:
|
||||
"""Parse lines like: 'PID 20 12 34 56 78 ...' -> (pid, (bytes...))
|
||||
Returns None if no clear match.
|
||||
"""
|
||||
m = PID_LINE_RE.search(text)
|
||||
if not m:
|
||||
return None
|
||||
try:
|
||||
pid = int(m.group(1), 16)
|
||||
data_hex = m.group(2)
|
||||
data = tuple(int(x, 16) for x in data_hex.split())
|
||||
return pid, data
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def now_iso() -> str:
|
||||
return dt.datetime.now(dt.timezone.utc).astimezone().isoformat(timespec="milliseconds")
|
||||
|
||||
|
||||
def run():
|
||||
ap = argparse.ArgumentParser(description="UDP LIN receiver")
|
||||
ap.add_argument("--host", default="0.0.0.0", help="Listen host (default: 0.0.0.0)")
|
||||
ap.add_argument("--port", type=int, default=5555, help="Listen port (default: 5555)")
|
||||
ap.add_argument("--out", default="udp_capture.csv", help="CSV output file path")
|
||||
ap.add_argument("--append", action="store_true", help="Append to CSV if exists")
|
||||
ap.add_argument("--duration", type=float, default=0.0, help="Stop after N seconds (0 = no limit)")
|
||||
ap.add_argument("--quiet", action="store_true", help="No console output, only CSV")
|
||||
ap.add_argument("--pid-diff", action="store_true", help="Track textual 'PID ..' lines and only print when data changes")
|
||||
ap.add_argument("--buffer", type=int, default=4096, help="Receive buffer size (default: 4096)")
|
||||
args = ap.parse_args()
|
||||
|
||||
# Prepare CSV
|
||||
out_exists = os.path.exists(args.out)
|
||||
csv_mode = "a" if args.append else "w"
|
||||
with open(args.out, csv_mode, newline="", encoding="utf-8") as f:
|
||||
w = csv.writer(f)
|
||||
if not out_exists or not args.append:
|
||||
w.writerow(["ts", "src_ip", "src_port", "size", "is_text", "text", "hex"]) # header
|
||||
|
||||
# Socket
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.bind((args.host, args.port))
|
||||
sock.settimeout(1.0)
|
||||
|
||||
if not args.quiet:
|
||||
print(f"Listening on {args.host}:{args.port} → {args.out}")
|
||||
if args.pid_diff:
|
||||
print("PID diff mode: will summarize changes per PID when textual lines include 'PID ..' and hex bytes")
|
||||
|
||||
start = time.time()
|
||||
last_by_pid: Dict[int, Tuple[int, ...]] = {}
|
||||
|
||||
while True:
|
||||
if args.duration and (time.time() - start) >= args.duration:
|
||||
if not args.quiet:
|
||||
print("Duration reached; exiting.")
|
||||
break
|
||||
try:
|
||||
data, addr = sock.recvfrom(args.buffer)
|
||||
except socket.timeout:
|
||||
continue
|
||||
|
||||
ts = now_iso()
|
||||
src_ip, src_port = addr[0], addr[1]
|
||||
size = len(data)
|
||||
as_text = ""
|
||||
is_text = False
|
||||
try:
|
||||
if is_mostly_text(data):
|
||||
as_text = data.decode("utf-8", errors="replace").strip()
|
||||
is_text = True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
hexstr = to_hex(data)
|
||||
w.writerow([ts, src_ip, src_port, size, int(is_text), as_text, hexstr])
|
||||
|
||||
# Console output
|
||||
if not args.quiet:
|
||||
if args.pid_diff and is_text:
|
||||
parsed = maybe_parse_pid_and_data(as_text)
|
||||
if parsed:
|
||||
pid, pdata = parsed
|
||||
last = last_by_pid.get(pid)
|
||||
if last != pdata:
|
||||
last_by_pid[pid] = pdata
|
||||
print(f"{ts} PID {pid:02X} len={len(pdata)} changed: { ' '.join(f'{b:02X}' for b in pdata) }")
|
||||
continue
|
||||
|
||||
# default console print
|
||||
if is_text and as_text:
|
||||
print(f"{ts} {src_ip}:{src_port} ({size}B) TEXT: {as_text}")
|
||||
else:
|
||||
print(f"{ts} {src_ip}:{src_port} ({size}B) HEX: {hexstr}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
run()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user