На майских праздниках выдалось достаточно много свободного времени, потому было просто необходимо продолжить эксперимент начатый в предыдущей статье. Ранее мне удалось получить данные температуры и влажности из датчика Xiaomi, теперь же была поставлена задача научиться отправлять эти данные MQTT-брокеру.
Изменения в коде получения данных
Для начала стоит отметить, что в код получения данных с датчиков пришлось внести несколько изменений.
После некоторого количества тестов выяснилось, что по неизвестной (хотелось сказать неисследованной, но от этого она менее известной не становится, потому оставим так) причине advertising BLE пакеты начинают разрываться. Т.е. если ранее в одном пакете приходило и имя датчика и его данные, то теперь данные приходят в отдельных пакетах. По этой причине пришлось отказаться от проверки на имя устройства:
if (!advertisedDevice.haveName() || advertisedDevice.getName().compare("MJ_HT_V1"))
return;
После удаления этой проверки появилась другая, можно сказать забавная проблема. Если бы мне 10 лет назад сказали, что когда я буду программировать микроконтроллер с целью получить данные температуры с датчиков по Bluetooth, а отвечать мне будет моя зубная щетка, то я бы попытался вызвать санитаров. Тем не менее, все происходило именно так. Контроллер получал пакеты от зубной щетки и принимал их за нужные датчики. Оказалось, что она тоже имеет данные в пакете с идентификатором 0xfe95
. Очевидно, что данные температуры и влажности щетка сообщить не в состоянии, потому хотелось бы её пакеты игнорировать. Потому пршлось добавить проверку следующего за 0xfe95
байта (data+4
):
if (blockType == 0x16 && serviceType == 0xfe95 && *(data + 4) == 0x50) {
*foundBlockLength = blockLength-3;
return data+4;
}
Дальнейшие тесты показали, что теперь вроде бы всё хорошо и можно продолжать дальше.
MQTT
Для начала нужно было установить MQTT-брокер. Чтобы долго не думать над выбором и не регистрироваться ни в каких сервисах было прнято решение использовать Eclipse Mosquitto. Установить и запустить его оказалось просто до безумия благодаря тому, что он поставляется в том числе и в виде docker-контейнера. У меня уже имелся домашний Linux-сервер (хотя запустить контейнер сейчас можно и на Windows машине), так что оставалось только набрать волшебную команду:
docker run -it -p 1883:1883 -p 9001:9001 -v /mosquitto/data:/var/mosquito/data -v /mosquitto/log:/var/mosquito/logs eclipse-mosquitto
Поскольку это пока только эксперименты, то никаких дополнительных настроек добавлять смысла нет. Достаточно всего того, что есть по умолчанию (даже авторизация не нужна).
Для просмотра данных в брокере был использован первый попавшийся клиент для Windows — MQTT Explorer.
В Интернете можно найти огромное количество материала по вопросам отправки данных в MQTT-брокер с ESP32 (например, вот). Зачастую в них используется библиотека PubSubClient, которую в Arduino IDE можно установить через Library Manager. Работать с этой библиотекой относительно просто. Достаточно иметь активное Wifi подключение (про это тоже можно найти тонну руководств, например вот), а дальше все должно пройти буквально в несколько строчек:
WiFiClient wifiClient; // wifi client object
PubSubClient pubsubClient(wifiClient); // MQTT server client object
pubsubClient.setServer(MqttServer, 1883);
pubsubClient.connect(MqttClientName);
pubsubClient.publish(topic, payload);
В нашем случае, всё пришлось немного усложнить, но об этом далее.
Многозадачность
Особенностью нашего подхода к получению данных с датчиков является то, что контроллер пассивно слушает всё, что они отправляют в эфир. Не очень бы хотелось потерять какие-то пакеты только потому, что контроллер в этот момент занят какой-то другой работой (например, отправкой данный по MQTT). К тому же наш котроллер имеет 2-х ядерный процессор (модули и спецификации).
Потому было просто необходимо разобраться с многозадачностью. Благо программная библиотека ESP32 поддерживает FreeRTOS функции. Запустить новую задачу очень просто — достаточно вызвать функцию xTaskCreate()
. Для решения текущей ситуации понадобилось 2 задачи: для работы с BLE и для работы с Wifi. Запускаются задачи простым кодом:
TaskHandle_t bleScanHandle = nullptr;
xTaskCreate(taskScanBleDevices, "ble-scan", 2048, nullptr, tskIDLE_PRIORITY, &bleScanHandle);
TaskHandle_t wifiActivityHandle = nullptr;
xTaskCreate(taskWifiActivity, "wifi-activity", 2048*8, nullptr, tskIDLE_PRIORITY, &wifiActivityHandle);
Код задачи для работы с BLE очевиден — достаточно просто запускать сканирование с необходимым интервалом:
void taskScanBleDevices(void* pvParameters) {
while (true) {
BLEScan * pBLEScan = BLEDevice::getScan();
pBLEScan->start(g_scanTime, false);
Serial.println("Scan done!");
vTaskDelay(2000 / portTICK_RATE_MS);
}
}
А вот задача для работы с Wifi и MQTT получилась несколько сложнее.
Конечный автомат
Казалось бы, причем тут конечный автомат, но почему-то захотелось реализовать работу с сетью именно так. Всему виной неблокирующая функция WiFi.begin()
, которая требует активного ожидания подключения к Wifi с помощью функции WiFi.status()
(пример). А поскольку связь с Wifi-роутером может обрываться, то нужно поддерживать возможность пересоединения. В этом случае конечный автомат позволит реализовать эту логику относительно просто. Для Arduino можно найти несколько библиотек для реализации подобных решений (например, вот или вот), но и без них реализовать нужное поведение достаточно просто. Хватит и оператора switch
.
Для начала был определен набор состояний автомата:
WifiReconnect
— инициализация Wifi соединения;WifiConnecting
— ожидание Wifi соединения;WifiConnected
— Wifi соединение установлено;MqttReconnect
— инициализация подключения к MQTT-брокеру;MqttConnecting
— ожидание соединения к MQTT-брокеру;MqttConnected
— соединение с MQTT-брокером установлено, можно отправлять данные;Error
— ошибка подключения.
Ну и к этим состояниям прилагается следующая логика переходов:
Выглядит сложно, но в реализации всё достаточно просто. Как уже омечалось ранее достаточно иметь бесконечный цикл и оператор switch
:
void taskWifiActivity(void* pvParameters) {
Serial.println("Wifi Task Started");
WifiActivityState state = WifiActivityState::WifiReconnect;
while (true)
{
wl_status_t wifiStatus = WiFi.status();
Serial.printf("Wifi status: %d State: %d\n", wifiStatus, state);
switch (state)
{
case WifiActivityState::WifiReconnect:
vTaskDelay(5000 / portTICK_RATE_MS); // reconnect delay, just in case
WiFi.begin(WifiSsid, WifiPassword);
state = WifiActivityState::WifiConnecting;
Serial.println("Connecting...");
break;
case WifiActivityState::WifiConnecting:
switch (wifiStatus)
{
case WL_CONNECTED:
state = WifiActivityState::WifiConnected;
Serial.println("Wifi Connected");
break;
case WL_CONNECT_FAILED:
state = WifiActivityState::Error;
break;
default:
vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
break;
}
break;
case WifiActivityState::WifiConnected:
if (wifiStatus == WL_CONNECTED) {
Serial.println(WiFi.localIP());
state = WifiActivityState::MqttReconnect;
} else {
state = WifiActivityState::WifiReconnect;
}
break;
case WifiActivityState::MqttReconnect:
if (wifiStatus == WL_CONNECTED) {
Serial.println("Mqtt server connecting");
g_pubsubClient.setServer(MqttServer, 1883);
g_pubsubClient.connect(MqttClientName);
state = WifiActivityState::MqttConnecting;
} else {
state = WifiActivityState::WifiReconnect;
}
break;
case WifiActivityState::MqttConnecting:
if (wifiStatus == WL_CONNECTED) {
if (g_pubsubClient.connected()) {
Serial.println("Mqtt server connected");
state = WifiActivityState::MqttConnected;
}
vTaskDelay(1000 / portTICK_RATE_MS); // active waiting delay
} else {
state = WifiActivityState::WifiReconnect;
}
break;
case WifiActivityState::MqttConnected:
if (wifiStatus == WL_CONNECTED) {
Serial.println("...Activity...");
if (g_pubsubClient.connected()) {
prepareSendBuffer();
publishEvents();
vTaskDelay(g_eventsDeliverInterval * 1000 / portTICK_RATE_MS);
} else {
Serial.println("Client Disconnected");
state = WifiActivityState::MqttReconnect;
vTaskDelay(5000/portTICK_RATE_MS);
}
} else {
state = WifiActivityState::WifiReconnect;
}
break;
case WifiActivityState::Error:
Serial.println("Connection error");
return; // end task
// TODO add code for connection retry with increasing time intervals
break;
default:
break;
}
}
В приведенном выше коде переменная state
содержит текущее состояние, соответственно для перехода в новое состояние достаточно просто присвоить новое значение в эту переменную. В каждом состоянии приходится проверять состояние Wifi-соединения и переходить к переподключению при необходимости. Этот код можно было не дублировать, но хотелось оставить всю логику переходов внутри обработки каждого состояния.
Также как можно заметить из кода, вся работа по отправке данных в MQTT-брокер происходит в функциях prepareSendBuffer()
и publishEvents()
. Осталось пояснить как они работают.
Отправка данных
Для того, чтобы организовать отправку данных в нашем случае необходимо организовать буфер для принимаемых по Bluetooth сообщений. Потому был создан класс Event
, который хранит в себе информацию о полученном сообщении (тип сообщения, адрес устройства, значение). Сам же буффер принимаемых сообщений это просто vector
указателей на объекты класса Event
.
std::vector<Event*> g_eventsBuffer;
Код приёма и разбора Bluetooth сообщений добавляет сообщения в этот буфер следующим кодом:
g_eventsBuffer.push_back(new Event(deviceAddress, EventType::Humidity, humidity));
Задаче отправки в MQTT-брокер остается только взять события из этого буфера и обработать. Однако, важно не забыть о синхронизации параллельно выполняемых задач, поскольку в случае использования буфера двумя задачами одновременно возможны серьезные проблемы. Для этого достаточно использовать простейший примитив синхронизации доступный во FreeRTOS — мьютекс.
Работать с мьютексом очень просто. Достаточно создать его при помощи функции xSemaphoreCreateMutex()
, а далее использовать 2 функции для его занятия xSemaphoreTake()
и для освобождения xSemaphoreGive()
.
При каждом обращении к буферу сообщений нам необходимо занимать мьютекс, что позволит нам синхронизировать обращения к нему из разных задач.
Оданако, если задача будет держать мьютекс заблокированным все время отправки (которое может быть не маленьким), это приведет к тому, что некоторые принимаемые сообщения могут быть пропущены. Потому критически важно как можно быстрее освобождать буфер для записи сообщений. Для этого применяется дополнительный буфер для отправляемых событий, который заполняется функцией prepareSendBuffer()
:
void prepareSendBuffer() {
Serial.println("Send buffer prepare started");
xSemaphoreTake(g_eventsBufferMutex, portMAX_DELAY);
if (!g_eventsBuffer.empty()) {
Serial.println("Found events");
for(std::vector<Event*>::reverse_iterator i = g_eventsBuffer.rbegin(); i != g_eventsBuffer.rend(); ++i) {
Event* e = *i;
std::string address = e->getDeviceAddress();
Serial.printf("Trying to add event for address %s\n", address.c_str());
// we should check if we already added that event type for that deviceAddress
bool found = false;
if (!g_eventsSendBuffer.empty()) {
for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) {
if ((*i)->getDeviceAddress() == address && (*i)->getEventType() == e->getEventType()) {
found = true;
break;
}
}
}
if (!found) {
g_eventsSendBuffer.push_back(e);
Serial.println("Event added");
} else {
delete e; // we don't need this event anymore
}
}
}
g_eventsBuffer.clear();
xSemaphoreGive(g_eventsBufferMutex);
Serial.println("Send buffer prepared");
}
Эта функция переписывает события в буфер отправки, а также убирает дублирующиеся сообщения для одного устройства (потому кода так много). После переписывания в буфер отправки эта функция удаляет ненужные сообщения и очищает буфер приёма. Синхронизация треубется только на этом этапе поскольку далее с буфером отправки будет работать только одна задача.
Непосредственно отпавка происходит в функции publishEvents()
, код который относительно прост:
void publishEvents() {
Serial.println("Publish events started");
const int bufferSize = 1000;
char* topicStringBuffer = new char[bufferSize];
char* payloadStringBuffer = new char[bufferSize];
if (!g_eventsSendBuffer.empty()) {
for(std::vector<Event*>::iterator i = g_eventsSendBuffer.begin(); i != g_eventsSendBuffer.end(); ++i) {
Event* e = *i;
std::string address = e->getDeviceAddress();
Serial.printf("Publishing event for %s\n", address.c_str());
switch (e->getEventType())
{
case EventType::Temperature:
snprintf(topicStringBuffer, bufferSize, "sensor/%s/temperature", address.c_str());
break;
case EventType::Humidity:
snprintf(topicStringBuffer, bufferSize, "sensor/%s/humidity", address.c_str());
break;
case EventType::Battery:
snprintf(topicStringBuffer, bufferSize, "sensor/%s/battery", address.c_str());
break;
case EventType::VisibleDevices:
snprintf(topicStringBuffer, bufferSize, "sensor/devices");
break;
case EventType::SensorDevices:
snprintf(topicStringBuffer, bufferSize, "sensor/sensors");
break;
default:
continue;
break;
}
snprintf(payloadStringBuffer, bufferSize, "%f", e->getValue());
Serial.printf("Event: %s %s\n", topicStringBuffer, payloadStringBuffer);
delete e;
g_pubsubClient.publish(topicStringBuffer, payloadStringBuffer);
}
}
Serial.println("Publish events DONE");
g_eventsSendBuffer.clear();
delete[] topicStringBuffer;
delete[] payloadStringBuffer;
}
Функция обрабатывает сообщения в буфере отправки последовательно, формирует название MQTT топика и отправляет значение в брокер. Обработанное сообщение затем удалеяется, а буфер отправки очищается.
Результат
В результате мы получаем в достаточной степени автономный модуль, который способен принимать сообщения от датчиков Xiaomi и отправлять их в MQTT-брокер. Выглядит это как-то так:
В дальнейшем данные из MQTT-брокера можно использовать как угодно (например, отображать в каком-нибудь виджете способном работать с MQTT). И надеюсь, мне удасться продолжить эксперименты и показать интересный способ работы с ними.
Полный код из данной заметки можно найти на Github.
Комментариев нет:
Отправить комментарий