truma_inetbox: add UDP streaming (ESP32)

- observer_mode sniffer UDP: create socket, send 'PID XX <hex>' lines
- Keepalive with stream_keepalive_ms
- stream_diag_only filters to 0x3C/0x3D
- Controlled by stream_enabled/host/port
This commit is contained in:
Hendrik Groove 2025-09-08 17:11:53 +02:00
parent 4719d483a7
commit 9e07fb6953
2 changed files with 108 additions and 2 deletions

View File

@ -2,6 +2,10 @@
#include "esphome/core/helpers.h"
#include "esphome/core/log.h"
#include "helpers.h"
#ifdef USE_ESP32
#include <lwip/sockets.h>
#include <lwip/inet.h>
#endif
namespace esphome {
namespace truma_inetbox {
@ -57,6 +61,10 @@ void LinBusListener::setup() {
// call device specific function
this->setup_framework();
#ifdef USE_ESP32
this->stream_try_init_();
#endif
#if ESPHOME_LOG_LEVEL > ESPHOME_LOG_LEVEL_NONE
assert(this->log_queue_ != 0);
@ -70,7 +78,12 @@ void LinBusListener::setup() {
}
}
void LinBusListener::update() { this->check_for_lin_fault_(); }
void LinBusListener::update() {
this->check_for_lin_fault_();
#ifdef USE_ESP32
this->stream_maybe_keepalive_();
#endif
}
void LinBusListener::write_lin_answer_(const u_int8_t *data, u_int8_t len) {
QUEUE_LOG_MSG log_msg = QUEUE_LOG_MSG();
@ -337,6 +350,8 @@ void LinBusListener::process_log_queue(TickType_t xTicksToWait) {
QUEUE_LOG_MSG log_msg;
while (xQueueReceive(this->log_queue_, &log_msg, xTicksToWait) == pdPASS) {
auto current_PID = log_msg.current_PID;
// Optional: forward to UDP stream before/after printing
this->maybe_send_stream_(log_msg);
switch (log_msg.type) {
case QUEUE_LOG_MSG_TYPE::ERROR_LIN_ANSWER_CAN_WRITE_LIN_ANSWER:
ESP_LOGE(TAG, "Cannot answer LIN because there is no open order from master.");
@ -405,6 +420,89 @@ void LinBusListener::process_log_queue(TickType_t xTicksToWait) {
#endif // ESPHOME_LOG_LEVEL > ESPHOME_LOG_LEVEL_NONE
}
#ifdef USE_ESP32
void LinBusListener::stream_try_init_() {
if (!this->stream_enabled_) return;
if (this->udp_sock_ >= 0) return;
if (this->udp_host_.empty() || this->udp_port_ == 0) {
ESP_LOGI(TAG, "UDP stream disabled: missing host/port");
return;
}
this->udp_sock_ = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (this->udp_sock_ < 0) {
ESP_LOGW(TAG, "UDP socket create failed");
return;
}
memset(&this->udp_addr_, 0, sizeof(this->udp_addr_));
this->udp_addr_.sin_family = AF_INET;
this->udp_addr_.sin_port = htons(this->udp_port_);
this->udp_addr_.sin_addr.s_addr = inet_addr(this->udp_host_.c_str());
if (this->udp_addr_.sin_addr.s_addr == (in_addr_t) -1) {
ESP_LOGW(TAG, "UDP host invalid: %s", this->udp_host_.c_str());
::close(this->udp_sock_);
this->udp_sock_ = -1;
return;
}
ESP_LOGI(TAG, "UDP streaming to %s:%u", this->udp_host_.c_str(), (unsigned) this->udp_port_);
this->last_stream_send_ms_ = millis();
}
static std::string hex_bytes_(const uint8_t *data, uint8_t len) {
char buf[4];
std::string out;
out.reserve(len * 3);
for (uint8_t i = 0; i < len; i++) {
snprintf(buf, sizeof(buf), "%02X", data[i]);
if (!out.empty()) out.push_back(' ');
out += buf;
}
return out;
}
void LinBusListener::stream_maybe_keepalive_() {
if (!this->stream_enabled_ || this->udp_sock_ < 0) return;
const uint32_t now = millis();
if (this->stream_keepalive_ms_ == 0) return;
if (now - this->last_stream_send_ms_ >= this->stream_keepalive_ms_) {
const char *msg = "KEEPALIVE";
::sendto(this->udp_sock_, msg, strlen(msg), 0, (struct sockaddr *) &this->udp_addr_, sizeof(this->udp_addr_));
this->last_stream_send_ms_ = now;
}
}
void LinBusListener::maybe_send_stream_(const QUEUE_LOG_MSG &log_msg) {
if (!this->stream_enabled_) return;
if (this->udp_sock_ < 0) this->stream_try_init_();
if (this->udp_sock_ < 0) return;
const uint8_t pid = log_msg.current_PID;
if (this->stream_diag_only_ && !(pid == DIAGNOSTIC_FRAME_MASTER || pid == DIAGNOSTIC_FRAME_SLAVE)) {
return;
}
if (log_msg.len == 0) return;
// Compose a compact line: "PID XX <hex> [MASTER|SLAVE]"
std::string line;
line.reserve(64);
char head[16];
snprintf(head, sizeof(head), "PID %02X ", pid);
line += head;
line += hex_bytes_(log_msg.data, log_msg.len);
#ifdef ESPHOME_LOG_HAS_VERBOSE
if (log_msg.message_source_know) {
line += (log_msg.message_from_master ? " MASTER" : " SLAVE");
}
#endif
line.push_back('\n');
::sendto(this->udp_sock_, line.c_str(), line.size(), 0, (struct sockaddr *) &this->udp_addr_, sizeof(this->udp_addr_));
this->last_stream_send_ms_ = millis();
}
#else
void LinBusListener::maybe_send_stream_(const QUEUE_LOG_MSG &) {}
void LinBusListener::stream_try_init_() {}
void LinBusListener::stream_maybe_keepalive_() {}
#endif
#undef LIN_BREAK
#undef LIN_SYNC
#undef DIAGNOSTIC_FRAME_MASTER

View File

@ -7,6 +7,8 @@
#ifdef USE_ESP32
#include <freertos/FreeRTOS.h>
#include <freertos/semphr.h>
#include <lwip/sockets.h>
#include <lwip/inet.h>
#endif // USE_ESP32
#ifdef USE_RP2040
#include <hardware/uart.h>
@ -144,6 +146,8 @@ class LinBusListener : public PollingComponent, public uart::UARTDevice {
void clear_uart_buffer_();
void setup_framework();
void maybe_send_stream_(const QUEUE_LOG_MSG &log_msg);
void stream_try_init_();
void stream_maybe_keepalive_();
uint8_t lin_msg_static_queue_storage[TRUMA_MSG_QUEUE_LENGTH * sizeof(QUEUE_LIN_MSG)];
StaticQueue_t lin_msg_static_queue_;
@ -164,6 +168,10 @@ class LinBusListener : public PollingComponent, public uart::UARTDevice {
#ifdef USE_ESP32
TaskHandle_t eventTaskHandle_;
static void eventTask_(void *args);
// UDP streaming (ESP32 only)
int udp_sock_ = -1;
struct sockaddr_in udp_addr_ {};
uint32_t last_stream_send_ms_ = 0;
#endif // USE_ESP32
#ifdef USE_ESP32_FRAMEWORK_ESP_IDF
TaskHandle_t uartEventTaskHandle_;
@ -176,4 +184,4 @@ class LinBusListener : public PollingComponent, public uart::UARTDevice {
};
} // namespace truma_inetbox
} // namespace esphome
} // namespace esphome