diff --git a/components/truma_inetbox/LinBusListener.cpp b/components/truma_inetbox/LinBusListener.cpp index 5a16c0d..53f9e1f 100644 --- a/components/truma_inetbox/LinBusListener.cpp +++ b/components/truma_inetbox/LinBusListener.cpp @@ -63,6 +63,20 @@ void LinBusListener::setup() { #ifdef USE_ESP32 this->stream_try_init_(); + if (this->stream_queue_ == nullptr) { + this->stream_queue_ = xQueueCreateStatic( + STREAM_QUEUE_LEN, STREAM_QUEUE_ITEM_MAX, + stream_static_queue_storage_, &stream_static_queue_); + } + if (this->streamTaskHandle_ == nullptr) { + xTaskCreatePinnedToCore(LinBusListener::streamTask_, + "lin_stream_task", + 4096, + this, + 1, + &this->streamTaskHandle_, + 0); + } #endif #if ESPHOME_LOG_LEVEL > ESPHOME_LOG_LEVEL_NONE @@ -504,13 +518,40 @@ void LinBusListener::maybe_send_stream_(const QUEUE_LOG_MSG &log_msg) { } #endif line.push_back('\n'); - ::sendto(this->udp_sock_, line.c_str(), line.size(), 0, (struct sockaddr *) &this->udp_addr_, sizeof(this->udp_addr_)); - this->last_stream_send_ms_ = millis(); + this->stream_enqueue_line_(line); } +#endif +void LinBusListener::stream_enqueue_line_(const std::string &line) { + if (!this->stream_queue_) return; + if (line.empty()) return; + char buf[STREAM_QUEUE_ITEM_MAX]; + size_t n = line.size(); + if (n >= sizeof(buf)) n = sizeof(buf) - 1; + memcpy(buf, line.data(), n); + buf[n] = '\0'; + xQueueSend(this->stream_queue_, buf, 0); +} + +void LinBusListener::streamTask_(void *args) { + auto *self = static_cast(args); + char buf[STREAM_QUEUE_ITEM_MAX]; + for (;;) { + if (xQueueReceive(self->stream_queue_, buf, pdMS_TO_TICKS(100)) == pdPASS) { + if (self->udp_sock_ >= 0) { + size_t len = strnlen(buf, sizeof(buf)); + ::sendto(self->udp_sock_, buf, len, 0, (struct sockaddr *) &self->udp_addr_, sizeof(self->udp_addr_)); + self->last_stream_send_ms_ = millis(); + } + } + } +} + #else void LinBusListener::maybe_send_stream_(const QUEUE_LOG_MSG &) {} void LinBusListener::stream_try_init_() {} void LinBusListener::stream_maybe_keepalive_() {} +void LinBusListener::stream_enqueue_line_(const std::string &) {} +void LinBusListener::streamTask_(void *) {} #endif #undef LIN_BREAK diff --git a/components/truma_inetbox/LinBusListener.h b/components/truma_inetbox/LinBusListener.h index 97c8cb0..6763fb5 100644 --- a/components/truma_inetbox/LinBusListener.h +++ b/components/truma_inetbox/LinBusListener.h @@ -150,6 +150,7 @@ class LinBusListener : public PollingComponent, public uart::UARTDevice { void maybe_send_stream_(const QUEUE_LOG_MSG &log_msg); void stream_try_init_(); void stream_maybe_keepalive_(); + void stream_enqueue_line_(const std::string &line); uint8_t lin_msg_static_queue_storage[TRUMA_MSG_QUEUE_LENGTH * sizeof(QUEUE_LIN_MSG)]; StaticQueue_t lin_msg_static_queue_; @@ -174,6 +175,14 @@ class LinBusListener : public PollingComponent, public uart::UARTDevice { int udp_sock_ = -1; struct sockaddr_in udp_addr_ {}; uint32_t last_stream_send_ms_ = 0; + // Background sender task + queue to avoid blocking main loop + TaskHandle_t streamTaskHandle_ = nullptr; + static void streamTask_(void *args); + static constexpr size_t STREAM_QUEUE_ITEM_MAX = 120; // bytes per line + static constexpr size_t STREAM_QUEUE_LEN = 16; + uint8_t stream_static_queue_storage_[STREAM_QUEUE_LEN * STREAM_QUEUE_ITEM_MAX]; + StaticQueue_t stream_static_queue_; + QueueHandle_t stream_queue_ = nullptr; #endif // USE_ESP32 #ifdef USE_ESP32_FRAMEWORK_ESP_IDF TaskHandle_t uartEventTaskHandle_;