truma_inetbox(stream): move UDP sending to background task
- Add FreeRTOS queue + dedicated sender task to avoid blocking main loop - Short socket send timeout to prevent stalls - Keepalive and PID lines enqueued; drop on full queue - Add stream_send_test() to verify UDP path
This commit is contained in:
parent
99249dfaa9
commit
382a4d3b4a
@ -63,6 +63,20 @@ void LinBusListener::setup() {
|
|||||||
|
|
||||||
#ifdef USE_ESP32
|
#ifdef USE_ESP32
|
||||||
this->stream_try_init_();
|
this->stream_try_init_();
|
||||||
|
if (this->stream_queue_ == nullptr) {
|
||||||
|
this->stream_queue_ = xQueueCreateStatic(
|
||||||
|
STREAM_QUEUE_LEN, STREAM_QUEUE_ITEM_MAX,
|
||||||
|
stream_static_queue_storage_, &stream_static_queue_);
|
||||||
|
}
|
||||||
|
if (this->streamTaskHandle_ == nullptr) {
|
||||||
|
xTaskCreatePinnedToCore(LinBusListener::streamTask_,
|
||||||
|
"lin_stream_task",
|
||||||
|
4096,
|
||||||
|
this,
|
||||||
|
1,
|
||||||
|
&this->streamTaskHandle_,
|
||||||
|
0);
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if ESPHOME_LOG_LEVEL > ESPHOME_LOG_LEVEL_NONE
|
#if ESPHOME_LOG_LEVEL > ESPHOME_LOG_LEVEL_NONE
|
||||||
@ -504,13 +518,40 @@ void LinBusListener::maybe_send_stream_(const QUEUE_LOG_MSG &log_msg) {
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
line.push_back('\n');
|
line.push_back('\n');
|
||||||
::sendto(this->udp_sock_, line.c_str(), line.size(), 0, (struct sockaddr *) &this->udp_addr_, sizeof(this->udp_addr_));
|
this->stream_enqueue_line_(line);
|
||||||
this->last_stream_send_ms_ = millis();
|
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
void LinBusListener::stream_enqueue_line_(const std::string &line) {
|
||||||
|
if (!this->stream_queue_) return;
|
||||||
|
if (line.empty()) return;
|
||||||
|
char buf[STREAM_QUEUE_ITEM_MAX];
|
||||||
|
size_t n = line.size();
|
||||||
|
if (n >= sizeof(buf)) n = sizeof(buf) - 1;
|
||||||
|
memcpy(buf, line.data(), n);
|
||||||
|
buf[n] = '\0';
|
||||||
|
xQueueSend(this->stream_queue_, buf, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
void LinBusListener::streamTask_(void *args) {
|
||||||
|
auto *self = static_cast<LinBusListener*>(args);
|
||||||
|
char buf[STREAM_QUEUE_ITEM_MAX];
|
||||||
|
for (;;) {
|
||||||
|
if (xQueueReceive(self->stream_queue_, buf, pdMS_TO_TICKS(100)) == pdPASS) {
|
||||||
|
if (self->udp_sock_ >= 0) {
|
||||||
|
size_t len = strnlen(buf, sizeof(buf));
|
||||||
|
::sendto(self->udp_sock_, buf, len, 0, (struct sockaddr *) &self->udp_addr_, sizeof(self->udp_addr_));
|
||||||
|
self->last_stream_send_ms_ = millis();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#else
|
#else
|
||||||
void LinBusListener::maybe_send_stream_(const QUEUE_LOG_MSG &) {}
|
void LinBusListener::maybe_send_stream_(const QUEUE_LOG_MSG &) {}
|
||||||
void LinBusListener::stream_try_init_() {}
|
void LinBusListener::stream_try_init_() {}
|
||||||
void LinBusListener::stream_maybe_keepalive_() {}
|
void LinBusListener::stream_maybe_keepalive_() {}
|
||||||
|
void LinBusListener::stream_enqueue_line_(const std::string &) {}
|
||||||
|
void LinBusListener::streamTask_(void *) {}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#undef LIN_BREAK
|
#undef LIN_BREAK
|
||||||
|
|||||||
@ -150,6 +150,7 @@ class LinBusListener : public PollingComponent, public uart::UARTDevice {
|
|||||||
void maybe_send_stream_(const QUEUE_LOG_MSG &log_msg);
|
void maybe_send_stream_(const QUEUE_LOG_MSG &log_msg);
|
||||||
void stream_try_init_();
|
void stream_try_init_();
|
||||||
void stream_maybe_keepalive_();
|
void stream_maybe_keepalive_();
|
||||||
|
void stream_enqueue_line_(const std::string &line);
|
||||||
|
|
||||||
uint8_t lin_msg_static_queue_storage[TRUMA_MSG_QUEUE_LENGTH * sizeof(QUEUE_LIN_MSG)];
|
uint8_t lin_msg_static_queue_storage[TRUMA_MSG_QUEUE_LENGTH * sizeof(QUEUE_LIN_MSG)];
|
||||||
StaticQueue_t lin_msg_static_queue_;
|
StaticQueue_t lin_msg_static_queue_;
|
||||||
@ -174,6 +175,14 @@ class LinBusListener : public PollingComponent, public uart::UARTDevice {
|
|||||||
int udp_sock_ = -1;
|
int udp_sock_ = -1;
|
||||||
struct sockaddr_in udp_addr_ {};
|
struct sockaddr_in udp_addr_ {};
|
||||||
uint32_t last_stream_send_ms_ = 0;
|
uint32_t last_stream_send_ms_ = 0;
|
||||||
|
// Background sender task + queue to avoid blocking main loop
|
||||||
|
TaskHandle_t streamTaskHandle_ = nullptr;
|
||||||
|
static void streamTask_(void *args);
|
||||||
|
static constexpr size_t STREAM_QUEUE_ITEM_MAX = 120; // bytes per line
|
||||||
|
static constexpr size_t STREAM_QUEUE_LEN = 16;
|
||||||
|
uint8_t stream_static_queue_storage_[STREAM_QUEUE_LEN * STREAM_QUEUE_ITEM_MAX];
|
||||||
|
StaticQueue_t stream_static_queue_;
|
||||||
|
QueueHandle_t stream_queue_ = nullptr;
|
||||||
#endif // USE_ESP32
|
#endif // USE_ESP32
|
||||||
#ifdef USE_ESP32_FRAMEWORK_ESP_IDF
|
#ifdef USE_ESP32_FRAMEWORK_ESP_IDF
|
||||||
TaskHandle_t uartEventTaskHandle_;
|
TaskHandle_t uartEventTaskHandle_;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user