SPSP
Simple publish-subscribe protocol. Connects low power IoT clients to MQTT.
All Classes Files Functions Variables Typedefs Enumerations
client.hpp
Go to the documentation of this file.
1 
10 #pragma once
11 
12 #include <chrono>
13 #include <cinttypes>
14 #include <future>
15 #include <mutex>
16 #include <unordered_map>
17 #include <sys/time.h> // Unix and ESP
18 
19 #include "spsp/logger.hpp"
20 #include "spsp/node.hpp"
21 #include "spsp/timer.hpp"
22 #include "spsp/wildcard_trie.hpp"
23 
24 // Log tag
25 #define SPSP_LOG_TAG "SPSP/Client"
26 
27 namespace SPSP::Nodes
28 {
/**
 * @brief Client configuration.
 *
 * Plain aggregate; every field has a default, so `ClientConfig{}` is a
 * complete configuration.
 */
struct ClientConfig
{
    /// Reporting options.
    struct Reporting
    {
        /// Report RSSI on PROBE_RES.
        bool rssiOnProbe{true};
    };

    /// Subscribe database options.
    struct SubDB
    {
        /// Period handed to the sub DB timer; also the amount subtracted
        /// from every entry's lifetime on each tick.
        std::chrono::milliseconds interval{std::chrono::minutes(1)};

        /// Lifetime given to an entry when it is subscribed or renewed.
        std::chrono::milliseconds subLifetime{std::chrono::minutes(10)};
    };

    Reporting reporting;
    SubDB subDB;

    /// How long `syncTime()` waits for the time response.
    std::chrono::milliseconds timeSyncTimeout{std::chrono::seconds(2)};
};
70 
76  template <typename TLocalLayer>
77  class Client : public ILocalNode<TLocalLayer>
78  {
79  protected:
85  struct SubDBEntry
86  {
87  std::chrono::milliseconds lifetime;
88  SPSP::SubscribeCb cb = nullptr;
89  };
90 
91  std::mutex m_mutex;
95  bool m_timeSyncOngoing = false;
96  std::promise<bool> m_timeSyncPromise;
97 
98  public:
99  using LocalAddrT = typename TLocalLayer::LocalAddrT;
100  using LocalMessageT = typename TLocalLayer::LocalMessageT;
101 
108  Client(TLocalLayer* ll, ClientConfig conf = {})
109  : ILocalNode<TLocalLayer>{ll}, m_conf{conf}, m_subDB{},
110  m_subDBTimer{conf.subDB.interval,
111  std::bind(&Client<TLocalLayer>::subDBTick, this)}
112  {
113  SPSP_LOGI("Initialized");
114  }
115 
121  {
122  SPSP_LOGI("Deinitialized");
123  }
124 
136  bool publish(const std::string& topic, const std::string& payload)
137  {
138  SPSP_LOGD("Publishing: topic '%s', payload '%s'",
139  topic.c_str(), payload.c_str());
140 
141  if (topic.empty()) {
142  SPSP_LOGW("Can't publish to empty topic");
143  return false;
144  }
145 
146  LocalMessageT msg = {};
147  // msg.addr is default => send to the bridge node
148  msg.type = LocalMessageType::PUB;
149  msg.topic = topic;
150  msg.payload = payload;
151 
152  return this->sendLocal(msg);
153  }
154 
168  bool subscribe(const std::string& topic, SubscribeCb cb)
169  {
170  SPSP_LOGD("Subscribing to topic '%s'", topic.c_str());
171 
172  if (topic.empty()) {
173  SPSP_LOGW("Can't subscribe to empty topic");
174  return false;
175  }
176 
177  if (this->sendSubscribe(topic)) {
178  // Subscribe request delivered successfully
179  const std::scoped_lock lock(m_mutex);
180 
181  // Add to sub DB
182  SubDBEntry subDBEntry = {
183  .lifetime = m_conf.subDB.subLifetime,
184  .cb = cb
185  };
186  m_subDB.insert(topic, subDBEntry);
187 
188  return true;
189  }
190 
191  return false;
192  }
193 
203  bool unsubscribe(const std::string& topic)
204  {
205  SPSP_LOGD("Unsubscribing from topic '%s'", topic.c_str());
206 
207  if (topic.empty()) {
208  SPSP_LOGW("Can't unsubscribe from empty topic");
209  return false;
210  }
211 
212  LocalMessageT msg = {};
213  // msg.addr is default => send to the bridge node
214  msg.type = LocalMessageType::UNSUB;
215  msg.topic = topic;
216  // msg.payload is empty
217 
218  // Remove from sub DB
219  {
220  const std::scoped_lock lock(m_mutex);
221  if (!m_subDB.remove(topic)) {
222  // Not subscribed to this topic
223  SPSP_LOGW("Can't unsubscribe from not-subscribed topic '%s'",
224  topic.c_str());
225  return false;
226  }
227  }
228 
229  // Explicitly unsubscribe from bridge
230  // If this fails, the timeout on bridge will just expire in
231  // couple of minutes.
232  this->sendLocal(msg);
233 
234  return true;
235  }
236 
242  {
243  const std::scoped_lock lock(m_mutex);
244 
245  m_subDB.forEach([this](const std::string& topic, const SubDBEntry&) {
246  if (!this->sendSubscribe(topic)) {
247  SPSP_LOGW("Resubscribe to topic %s failed", topic.c_str());
248  }
249  });
250  }
251 
258  bool syncTime()
259  {
260  SPSP_LOGD("Time sync: start");
261 
262  // Cleanup lambda to reset state
263  auto cleanup = [this]() {
264  const std::scoped_lock lock(m_mutex);
265  m_timeSyncOngoing = false;
266  m_timeSyncPromise = std::promise<bool>{};
267  };
268 
269  LocalMessageT msg = {};
270  // msg.addr is default => send to the bridge node
271  msg.type = LocalMessageType::TIME_REQ;
272  // msg.topic is empty
273  // msg.payload is empty
274 
275  // Set state and get future
276  std::future<bool> future;
277  {
278  const std::scoped_lock lock(m_mutex);
279  m_timeSyncOngoing = true;
280  future = m_timeSyncPromise.get_future();
281  }
282 
283  // Try to send sync request
284  if (!this->sendLocal(msg)) {
285  cleanup();
286  SPSP_LOGE("Time sync: request can't be sent");
287  return false;
288  }
289 
290  if (future.wait_for(m_conf.timeSyncTimeout) == std::future_status::timeout) {
291  cleanup();
292  SPSP_LOGE("Time sync: response timeout");
293  return false;
294  }
295 
296  if (future.get() == false) {
297  cleanup();
298  SPSP_LOGE("Time sync: invalid bridge response");
299  return false;
300  }
301 
302  cleanup();
303  SPSP_LOGD("Time sync: success");
304  return true;
305  }
306 
307  protected:
318  bool processProbeReq(const LocalMessageT& req,
319  int rssi = NODE_RSSI_UNKNOWN) { return false; }
320 
332  bool processProbeRes(const LocalMessageT& req,
333  int rssi = NODE_RSSI_UNKNOWN)
334  {
335  if (m_conf.reporting.rssiOnProbe) {
336  this->publishRssi(req.addr, rssi);
337  }
338  return true;
339  }
340 
351  bool processPub(const LocalMessageT& req,
352  int rssi = NODE_RSSI_UNKNOWN) { return false; }
353 
364  bool processSubReq(const LocalMessageT& req,
365  int rssi = NODE_RSSI_UNKNOWN) { return false; }
366 
375  bool processSubData(const LocalMessageT& req,
376  int rssi = NODE_RSSI_UNKNOWN)
377  {
378  // Get matching entries
379  std::unordered_map<std::string, const SubDBEntry&> entries;
380  {
381  const std::scoped_lock lock(m_mutex);
382  entries = m_subDB.find(req.topic);
383  }
384 
385  for (auto& [subTopic, entry] : entries) {
386  SPSP_LOGD("Calling user callback for topic '%s'",
387  req.topic.c_str());
388  entry.cb(req.topic, req.payload);
389  }
390 
391  return true;
392  }
393 
402  bool processUnsub(const LocalMessageT& req,
403  int rssi = NODE_RSSI_UNKNOWN) { return false; }
404 
415  bool processTimeReq(const LocalMessageT& req,
416  int rssi = NODE_RSSI_UNKNOWN) { return false; }
417 
428  bool processTimeRes(const LocalMessageT& req,
429  int rssi = NODE_RSSI_UNKNOWN)
430  {
431  const std::scoped_lock lock(m_mutex);
432 
433  auto nowMilliseconds = stoull(req.payload);
434 
435  // No time sync ongoing now
436  if (!m_timeSyncOngoing) {
437  return false;
438  }
439 
440  // Timestamp must have at least 13 digits
441  if (nowMilliseconds < 1e12) {
442  SPSP_LOGE("Time sync: invalid time received from bridge: '%s'",
443  req.payload.c_str());
444  m_timeSyncPromise.set_value(false);
445  return false;
446  }
447 
448  // Set current time
449  struct timeval tv = {
450  .tv_sec = static_cast<long int>(nowMilliseconds / 1000),
451  .tv_usec = static_cast<long int>((nowMilliseconds % 1000) * 1000),
452  };
453 
454  if (settimeofday(&tv, nullptr) != 0) {
455  SPSP_LOGE("Time sync: settimeofday failed");
456  m_timeSyncPromise.set_value(false);
457  return false;
458  }
459 
460  SPSP_LOGI("Time sync: set current time to %llu",
461  nowMilliseconds);
462  m_timeSyncPromise.set_value(true);
463  return true;
464  }
465 
473  bool sendSubscribe(const std::string& topic)
474  {
475  LocalMessageT msg = {};
476  // msg.addr is default => send to the bridge node
477  msg.type = LocalMessageType::SUB_REQ;
478  msg.topic = topic;
479  // msg.payload is empty
480 
481  return this->sendLocal(msg);
482  }
483 
490  void subDBTick()
491  {
492  using namespace std::chrono_literals;
493 
494  const std::scoped_lock lock(m_mutex);
495 
496  SPSP_LOGD("SubDB: Tick running");
497 
498  m_subDB.forEach([this](const std::string& topic, const SubDBEntry& entry) {
499  m_subDB[topic].lifetime -= m_conf.subDB.interval;
500 
501  if (entry.lifetime <= 0ms) {
502  SPSP_LOGD("SubDB: Topic '%s' expired (renewing)", topic.c_str());
503 
504  bool extended = this->sendSubscribe(topic);
505  if (extended) {
506  m_subDB[topic].lifetime = m_conf.subDB.subLifetime;
507  } else {
508  SPSP_LOGE("SubDB: Topic '%s' can't be extended. Will try again in next tick.",
509  topic.c_str());
510  }
511  }
512  });
513 
514  SPSP_LOGD("SubDB: Tick done");
515  }
516  };
517 } // namespace SPSP::Nodes
518 
519 #undef SPSP_LOG_TAG
SPSP::ILocalNode
Generic local node of SPSP.
Definition: layers.hpp:17
SPSP::Nodes::Client::processUnsub
bool processUnsub(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes UNSUB message.
Definition: client.hpp:402
SPSP::Nodes::Client::m_conf
ClientConfig m_conf
Configuration.
Definition: client.hpp:92
SPSP::Nodes::Client::~Client
~Client()
Destroys the client node.
Definition: client.hpp:120
SPSP::Nodes::ClientConfig::timeSyncTimeout
std::chrono::milliseconds timeSyncTimeout
Definition: client.hpp:68
SPSP::Nodes::Client::processSubReq
bool processSubReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes SUB_REQ message.
Definition: client.hpp:364
SPSP::Nodes::Client::subscribe
bool subscribe(const std::string &topic, SubscribeCb cb)
Subscribes to topic.
Definition: client.hpp:168
SPSP::Nodes::Client::m_subDBTimer
Timer m_subDBTimer
Sub DB timer.
Definition: client.hpp:94
SPSP::Nodes::ClientConfig::SubDB
Definition: client.hpp:41
node.hpp
Node interface for SPSP.
SPSP::Nodes::Client::SubDBEntry
Client subscribe database entry.
Definition: client.hpp:85
SPSP::Nodes::Client::sendSubscribe
bool sendSubscribe(const std::string &topic)
Prepares and sends SUB_REQ message to local layer.
Definition: client.hpp:473
SPSP::Nodes::Client::SubDBEntry::cb
SPSP::SubscribeCb cb
Callback for incoming data.
Definition: client.hpp:88
SPSP::Timer
Simple timer for SPSP purposes.
Definition: timer.hpp:27
timer.hpp
Timer for SPSP purposes.
SPSP::Nodes::ClientConfig
Client configuration.
Definition: client.hpp:34
SPSP::Nodes::Client::subDBTick
void subDBTick()
Subscribe DB timer tick callback.
Definition: client.hpp:490
SPSP::ILocalNode::sendLocal
bool sendLocal(const LocalMessageT &msg)
Sends the message to local layer.
Definition: node.hpp:220
SPSP::Nodes::Client::processTimeReq
bool processTimeReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes TIME_REQ message.
Definition: client.hpp:415
SPSP::Nodes::ClientConfig::SubDB::subLifetime
std::chrono::milliseconds subLifetime
Definition: client.hpp:58
SPSP::Nodes::Client::m_timeSyncOngoing
bool m_timeSyncOngoing
Whether time synchronization is ongoing.
Definition: client.hpp:95
SPSP::Nodes::Client::processPub
bool processPub(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes PUB message.
Definition: client.hpp:351
SPSP::Nodes::Client::m_subDB
WildcardTrie< SubDBEntry > m_subDB
Subscribe database.
Definition: client.hpp:93
SPSP::Nodes::Client::SubDBEntry::lifetime
std::chrono::milliseconds lifetime
Lifetime.
Definition: client.hpp:87
SPSP::Nodes::Client
Client node.
Definition: client.hpp:77
SPSP::Nodes::Client::processProbeReq
bool processProbeReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes PROBE_REQ message.
Definition: client.hpp:318
SPSP::Nodes::Client::resubscribeAll
void resubscribeAll()
Resubscribes to all topics.
Definition: client.hpp:241
SPSP::Nodes::Client::syncTime
bool syncTime()
Synchronizes clock with bridge.
Definition: client.hpp:258
SPSP::SubscribeCb
std::function< void(const std::string &topic, const std::string &payload)> SubscribeCb
Subscribe callback type.
Definition: node.hpp:42
SPSP::Nodes::ClientConfig::Reporting
Definition: client.hpp:36
SPSP::Nodes::Client::unsubscribe
bool unsubscribe(const std::string &topic)
Unsubscribes from topic.
Definition: client.hpp:203
SPSP::Nodes::Client::publish
bool publish(const std::string &topic, const std::string &payload)
Publishes payload to topic.
Definition: client.hpp:136
SPSP::Nodes::Client::processTimeRes
bool processTimeRes(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes TIME_RES message.
Definition: client.hpp:428
SPSP::Nodes::Client::processSubData
bool processSubData(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes SUB_DATA message.
Definition: client.hpp:375
SPSP::Nodes::ClientConfig::Reporting::rssiOnProbe
bool rssiOnProbe
Report RSSI on PROBE_RES.
Definition: client.hpp:38
SPSP::Nodes::Client::processProbeRes
bool processProbeRes(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes PROBE_RES message.
Definition: client.hpp:332
SPSP::Nodes::Client::m_mutex
std::mutex m_mutex
Mutex to prevent race conditions.
Definition: client.hpp:91
SPSP::Nodes::Client::Client
Client(TLocalLayer *ll, ClientConfig conf={})
Constructs a new client node.
Definition: client.hpp:108
SPSP::ILocalNode::publishRssi
void publishRssi(const LocalAddrT &addr, int rssi)
Publishes RSSI of received message from addr
Definition: node.hpp:250
SPSP::Nodes::Client::m_timeSyncPromise
std::promise< bool > m_timeSyncPromise
Time synchronization promise.
Definition: client.hpp:96
SPSP::WildcardTrie
String-based trie with wildcard support.
Definition: wildcard_trie.hpp:34
SPSP::Nodes::ClientConfig::SubDB::interval
std::chrono::milliseconds interval
Definition: client.hpp:50
wildcard_trie.hpp
Trie implementation with wildcard support.