add UDP command processing and sender for dynamic testing
This commit is contained in:
85
tools/udp_cmd_sender.py
Normal file
85
tools/udp_cmd_sender.py
Normal file
@@ -0,0 +1,85 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
UDP Command Sender for Truma ESP32 Master Mode
|
||||
Sends diagnostic commands to ESP32 for dynamic testing
|
||||
"""
|
||||
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
|
||||
def send_command(esp_ip, command, nad):
|
||||
"""Send a diagnostic command to ESP32"""
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
# Format: "CMD:B2 23 17 46 10 03 NAD:01"
|
||||
message = f"CMD:{command} NAD:{nad:02X}"
|
||||
|
||||
print(f"Sending: {message}")
|
||||
sock.sendto(message.encode(), (esp_ip, 5556))
|
||||
sock.close()
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage:")
|
||||
print(" python udp_cmd_sender.py <ESP32_IP> [command] [nad]")
|
||||
print("")
|
||||
print("Examples:")
|
||||
print(" python udp_cmd_sender.py 192.168.1.90")
|
||||
print(" python udp_cmd_sender.py 192.168.1.90 'B2 23 17 46 10 03' 01")
|
||||
print(" python udp_cmd_sender.py 192.168.1.90 'B2 00 17 46 10 03' 01")
|
||||
print(" python udp_cmd_sender.py 192.168.1.90 'B2 00 17 46 40 03' 01")
|
||||
print("")
|
||||
print("Interactive mode if no command specified.")
|
||||
return
|
||||
|
||||
esp_ip = sys.argv[1]
|
||||
|
||||
if len(sys.argv) >= 4:
|
||||
# Single command mode
|
||||
command = sys.argv[2]
|
||||
nad = int(sys.argv[3], 16)
|
||||
send_command(esp_ip, command, nad)
|
||||
else:
|
||||
# Interactive mode
|
||||
print(f"UDP Command Sender - Connected to ESP32 at {esp_ip}:5556")
|
||||
print("Commands will be sent to ESP32 and you should see responses on UDP port 5555")
|
||||
print("")
|
||||
print("Useful commands to try:")
|
||||
print(" B2 23 17 46 10 03 (Query identifier 0x23 with CP Plus signature)")
|
||||
print(" B2 00 17 46 10 03 (Query identifier 0x00 with CP Plus signature)")
|
||||
print(" B2 00 17 46 40 03 (Query identifier 0x00 with heater signature)")
|
||||
print(" B2 00 17 46 01 03 (Query identifier 0x00 with old signature)")
|
||||
print("")
|
||||
print("Enter 'quit' to exit")
|
||||
print("")
|
||||
|
||||
while True:
|
||||
try:
|
||||
user_input = input("Enter command (hex bytes): ").strip()
|
||||
if user_input.lower() in ['quit', 'exit', 'q']:
|
||||
break
|
||||
|
||||
if not user_input:
|
||||
continue
|
||||
|
||||
# Get NAD
|
||||
nad_input = input("Enter NAD (hex, default 01): ").strip()
|
||||
if not nad_input:
|
||||
nad = 1
|
||||
else:
|
||||
nad = int(nad_input, 16)
|
||||
|
||||
send_command(esp_ip, user_input, nad)
|
||||
print("Command sent! Check UDP receiver for responses.")
|
||||
print("")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
|
||||
print("Goodbye!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user