|
SPSP
Simple publish-subscribe protocol. Connects low power IoT clients to MQTT.
|
Go to the documentation of this file.
16 #include <unordered_map>
19 #include "spsp/logger.hpp"
25 #define SPSP_LOG_TAG "SPSP/Client"
50 std::chrono::milliseconds
interval = std::chrono::minutes(1);
58 std::chrono::milliseconds
subLifetime = std::chrono::minutes(10);
76 template <
typename TLocalLayer>
99 using LocalAddrT =
typename TLocalLayer::LocalAddrT;
100 using LocalMessageT =
typename TLocalLayer::LocalMessageT;
113 SPSP_LOGI(
"Initialized");
122 SPSP_LOGI(
"Deinitialized");
136 bool publish(
const std::string& topic,
const std::string& payload)
138 SPSP_LOGD(
"Publishing: topic '%s', payload '%s'",
139 topic.c_str(), payload.c_str());
142 SPSP_LOGW(
"Can't publish to empty topic");
146 LocalMessageT msg = {};
148 msg.type = LocalMessageType::PUB;
150 msg.payload = payload;
170 SPSP_LOGD(
"Subscribing to topic '%s'", topic.c_str());
173 SPSP_LOGW(
"Can't subscribe to empty topic");
179 const std::scoped_lock lock(
m_mutex);
186 m_subDB.insert(topic, subDBEntry);
205 SPSP_LOGD(
"Unsubscribing from topic '%s'", topic.c_str());
208 SPSP_LOGW(
"Can't unsubscribe from empty topic");
212 LocalMessageT msg = {};
214 msg.type = LocalMessageType::UNSUB;
220 const std::scoped_lock lock(
m_mutex);
223 SPSP_LOGW(
"Can't unsubscribe from not-subscribed topic '%s'",
243 const std::scoped_lock lock(
m_mutex);
247 SPSP_LOGW(
"Resubscribe to topic %s failed", topic.c_str());
260 SPSP_LOGD(
"Time sync: start");
263 auto cleanup = [
this]() {
264 const std::scoped_lock lock(
m_mutex);
269 LocalMessageT msg = {};
271 msg.type = LocalMessageType::TIME_REQ;
276 std::future<bool> future;
278 const std::scoped_lock lock(
m_mutex);
286 SPSP_LOGE(
"Time sync: request can't be sent");
292 SPSP_LOGE(
"Time sync: response timeout");
296 if (future.get() ==
false) {
298 SPSP_LOGE(
"Time sync: invalid bridge response");
303 SPSP_LOGD(
"Time sync: success");
319 int rssi = NODE_RSSI_UNKNOWN) {
return false; }
333 int rssi = NODE_RSSI_UNKNOWN)
352 int rssi = NODE_RSSI_UNKNOWN) {
return false; }
365 int rssi = NODE_RSSI_UNKNOWN) {
return false; }
376 int rssi = NODE_RSSI_UNKNOWN)
379 std::unordered_map<std::string, const SubDBEntry&> entries;
381 const std::scoped_lock lock(
m_mutex);
382 entries =
m_subDB.find(req.topic);
385 for (
auto& [subTopic, entry] : entries) {
386 SPSP_LOGD(
"Calling user callback for topic '%s'",
388 entry.cb(req.topic, req.payload);
403 int rssi = NODE_RSSI_UNKNOWN) {
return false; }
416 int rssi = NODE_RSSI_UNKNOWN) {
return false; }
429 int rssi = NODE_RSSI_UNKNOWN)
431 const std::scoped_lock lock(
m_mutex);
433 auto nowMilliseconds = stoull(req.payload);
441 if (nowMilliseconds < 1e12) {
442 SPSP_LOGE(
"Time sync: invalid time received from bridge: '%s'",
443 req.payload.c_str());
449 struct timeval tv = {
450 .tv_sec =
static_cast<long int>(nowMilliseconds / 1000),
451 .tv_usec =
static_cast<long int>((nowMilliseconds % 1000) * 1000),
454 if (settimeofday(&tv,
nullptr) != 0) {
455 SPSP_LOGE(
"Time sync: settimeofday failed");
460 SPSP_LOGI(
"Time sync: set current time to %llu",
475 LocalMessageT msg = {};
477 msg.type = LocalMessageType::SUB_REQ;
492 using namespace std::chrono_literals;
494 const std::scoped_lock lock(
m_mutex);
496 SPSP_LOGD(
"SubDB: Tick running");
502 SPSP_LOGD(
"SubDB: Topic '%s' expired (renewing)", topic.c_str());
504 bool extended = this->sendSubscribe(topic);
506 m_subDB[topic].lifetime = m_conf.subDB.subLifetime;
508 SPSP_LOGE(
"SubDB: Topic '%s' can't be extended. Will try again in next tick.",
514 SPSP_LOGD(
"SubDB: Tick done");
Generic local node of SPSP.
bool processUnsub(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes UNSUB message.
ClientConfig m_conf
Configuration.
~Client()
Destroys the client node.
std::chrono::milliseconds timeSyncTimeout
bool processSubReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes SUB_REQ message.
bool subscribe(const std::string &topic, SubscribeCb cb)
Subscribes to topic.
Timer m_subDBTimer
Sub DB timer.
Client subscribe database entry.
bool sendSubscribe(const std::string &topic)
Prepares and sends SUB_REQ message to local layer.
SPSP::SubscribeCb cb
Callback for incoming data.
Simple timer for SPSP purposes.
void subDBTick()
Subscribe DB timer tick callback.
bool sendLocal(const LocalMessageT &msg)
Sends the message to local layer.
bool processTimeReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes TIME_REQ message.
std::chrono::milliseconds subLifetime
bool m_timeSyncOngoing
Whether time synchronization is ongoing.
bool processPub(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes PUB message.
WildcardTrie< SubDBEntry > m_subDB
Subscribe database.
std::chrono::milliseconds lifetime
Lifetime.
bool processProbeReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes PROBE_REQ message.
void resubscribeAll()
Resubscribes to all topics.
bool syncTime()
Synchronizes clock with bridge.
std::function< void(const std::string &topic, const std::string &payload)> SubscribeCb
Subscribe callback type.
bool unsubscribe(const std::string &topic)
Unsubscribes from topic.
bool publish(const std::string &topic, const std::string &payload)
Publishes payload to topic.
bool processTimeRes(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes TIME_RES message.
bool processSubData(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes SUB_DATA message.
bool rssiOnProbe
Report RSSI on PROBE_RES.
bool processProbeRes(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes PROBE_RES message.
std::mutex m_mutex
Mutex to prevent race conditions.
Client(TLocalLayer *ll, ClientConfig conf={})
Constructs a new client node.
void publishRssi(const LocalAddrT &addr, int rssi)
Publishes RSSI of received message from addr
std::promise< bool > m_timeSyncPromise
Time synchronization promise.
String-based trie with wildcard support.
std::chrono::milliseconds interval
Trie implementation with wildcard support.