SPSP
Simple publish-subscribe protocol. Connects low power IoT clients to MQTT.
All Classes Files Functions Variables Typedefs Enumerations
bridge.hpp
Go to the documentation of this file.
1 
10 #pragma once
11 
12 #include <chrono>
13 #include <mutex>
14 #include <unordered_map>
15 #include <vector>
16 
17 #include "spsp/local_addr_mac.hpp"
18 #include "spsp/logger.hpp"
19 #include "spsp/node.hpp"
20 #include "spsp/timer.hpp"
21 #include "spsp/wildcard_trie.hpp"
22 
23 // Log tag
24 #define SPSP_LOG_TAG "SPSP/Bridge"
25 
26 namespace SPSP::Nodes
27 {
/**
 * @brief Lifetime value marking a subscribe DB entry that never expires.
 *
 * Declared `inline` so that every translation unit including this header
 * shares one definition of the constant.
 */
inline constexpr auto BRIDGE_SUB_NO_EXPIRE = std::chrono::milliseconds::max();
30 
/**
 * @brief Bridge configuration.
 */
struct BridgeConfig
{
    /**
     * @brief Switches selecting what the bridge reports.
     */
    struct Reporting
    {
        bool probePayload{true};  //!< Report non-empty payload of PROBE_REQ
        bool rssiOnProbe{true};   //!< Report RSSI on PROBE_REQ
        bool rssiOnPub{true};     //!< Report RSSI on PUB
        bool rssiOnSub{true};     //!< Report RSSI on SUB_REQ
        bool rssiOnUnsub{true};   //!< Report RSSI on UNSUB
    };

    /**
     * @brief Subscribe database settings.
     */
    struct SubDB
    {
        //! Period of the subscribe DB timer; also the amount subtracted
        //! from each expiring entry's lifetime on every tick.
        std::chrono::milliseconds interval{std::chrono::minutes{1}};

        //! Lifetime assigned to a subscription requested by a local node.
        std::chrono::milliseconds subLifetime{std::chrono::minutes{15}};
    };

    Reporting reporting;
    SubDB subDB;
};
70 
81  template <typename TLocalLayer, typename TFarLayer>
82  class Bridge : public ILocalAndFarNode<TLocalLayer, TFarLayer>
83  {
84  public:
85  using LocalAddrT = typename TLocalLayer::LocalAddrT;
86  using LocalMessageT = typename TLocalLayer::LocalMessageT;
87 
88  protected:
96  struct SubDBEntry
97  {
98  std::chrono::milliseconds lifetime;
99  SPSP::SubscribeCb cb = nullptr;
100  };
101 
102  using SubDBMapT = std::unordered_map<LocalAddrT, SubDBEntry>;
103 
104  std::mutex m_mutex;
108 
109  public:
// Constructs a new bridge object.
// Copies `conf` into m_conf, sets up m_subDBTimer with conf.subDB.interval
// and logs "Initialized".
// NOTE(review): listing lines 118 and 121 are absent here - presumably the
// base-class initializer receiving `ll`/`fl` and the start of the timer
// callback expression that ends with `this)`; confirm against the original
// header before relying on this block.
117  Bridge(TLocalLayer* ll, TFarLayer* fl, BridgeConfig conf = {})
119  m_conf{conf},
120  m_subDBTimer{conf.subDB.interval,
122  this)}
123  {
124  SPSP_LOGI("Initialized");
125  }
126 
132  {
133  SPSP_LOGI("Deinitialized");
134  }
135 
146  bool receiveFar(const std::string& topic, const std::string& payload)
147  {
148  const std::scoped_lock lock(m_mutex);
149 
150  SPSP_LOGD("Received far msg: topic '%s', payload '%s'",
151  topic.c_str(), payload.c_str());
152 
153  // Get matching entries
154  auto entries = m_subDB.find(topic);
155 
156  for (auto& [entryTopic, entryMap] : entries) {
157  for (auto& [addr, entry] : entryMap) {
158  if (addr == LocalAddrT{}) {
159  // This node's subscription - call callback
160  SPSP_LOGD("Calling user callback for topic '%s' in new thread",
161  topic.c_str());
162  std::thread t(entry.cb, topic, payload);
163  t.detach();
164  } else {
165  // Local layer subscription
167  this, addr, topic, payload);
168  t.detach();
169  }
170  }
171  }
172 
173  return true;
174  }
175 
187  bool publish(const std::string& topic, const std::string& payload)
188  {
189  SPSP_LOGD("Publishing locally: topic '%s', payload '%s'",
190  topic.c_str(), payload.c_str());
191 
192  if (topic.empty()) {
193  SPSP_LOGW("Can't publish to empty topic");
194  return false;
195  }
196 
197  return this->getFarLayer()->publish(LocalAddrMAC::local().str,
198  topic, payload);
199  }
200 
212  bool subscribe(const std::string& topic, SubscribeCb cb)
213  {
214  const std::scoped_lock lock(m_mutex);
215 
216  SPSP_LOGD("Subscribing locally to topic '%s'", topic.c_str());
217 
218  if (topic.empty()) {
219  SPSP_LOGW("Can't subscribe to empty topic");
220  return false;
221  }
222 
223  auto& entryMap = m_subDB[topic];
224 
225  // Attempt to subscribe to new topic
226  if (entryMap.empty()) {
227  if (!this->getFarLayer()->subscribe(topic)) {
228  return false;
229  }
230  }
231 
232  entryMap[LocalAddrT{}] = SubDBEntry{
233  .lifetime = BRIDGE_SUB_NO_EXPIRE,
234  .cb = cb
235  };
236 
237  return true;
238  }
239 
245  {
246  const std::scoped_lock lock(m_mutex);
247 
249  [this](const std::string& topic, const SubDBMapT& topicEntries) {
250  if (!this->getFarLayer()->subscribe(topic)) {
251  SPSP_LOGW("Resubscribe to topic %s failed",
252  topic.c_str());
253  }
254  }
255  );
256  }
257 
267  bool unsubscribe(const std::string& topic)
268  {
269  SPSP_LOGD("Unsubscribing locally from topic '%s'", topic.c_str());
270 
271  if (topic.empty()) {
272  SPSP_LOGW("Can't unsubscribe from empty topic");
273  return false;
274  }
275 
276  {
277  const std::scoped_lock lock(m_mutex);
278  if (!m_subDB[topic].erase(LocalAddrT{})) {
279  // Entry doesn't exist
280  SPSP_LOGD("Can't unsubscribe from not-subscribed topic '%s'",
281  topic.c_str());
282  return false;
283  }
284  }
285 
286  // Remove unused topics
287  this->subDBRemoveUnusedTopics();
288 
289  return true;
290  }
291 
292  protected:
301  bool processProbeReq(const LocalMessageT& req,
302  int rssi = NODE_RSSI_UNKNOWN)
303  {
304  LocalMessageT res = req;
305  res.type = LocalMessageType::PROBE_RES;
306  res.payload = "";
307 
308  // Publish RSSI
309  if (m_conf.reporting.rssiOnProbe) {
310  this->publishRssi(req.addr, rssi);
311  }
312 
313  // Publish payload
314  if (m_conf.reporting.probePayload && !req.payload.empty()) {
315  std::string probePayloadReportTopic =
316  NODE_REPORTING_TOPIC + "/" +
317  NODE_REPORTING_PROBE_PAYLOAD_SUBTOPIC + "/" +
318  req.addr.str;
319 
320  this->publish(probePayloadReportTopic, req.payload);
321  }
322 
323  return this->sendLocal(res);
324  }
325 
336  bool processProbeRes(const LocalMessageT& req,
337  int rssi = NODE_RSSI_UNKNOWN) { return false; }
338 
347  bool processPub(const LocalMessageT& req,
348  int rssi = NODE_RSSI_UNKNOWN)
349  {
350  // Publish RSSI
351  if (m_conf.reporting.rssiOnPub) {
352  this->publishRssi(req.addr, rssi);
353  }
354 
355  if (req.topic.empty()) {
356  SPSP_LOGW("Can't publish to empty topic");
357  return false;
358  }
359 
360  return this->getFarLayer()->publish(req.addr.str, req.topic,
361  req.payload);
362  }
363 
372  bool processSubReq(const LocalMessageT& req,
373  int rssi = NODE_RSSI_UNKNOWN)
374  {
375  // Publish RSSI
376  if (m_conf.reporting.rssiOnSub) {
377  this->publishRssi(req.addr, rssi);
378  }
379 
380  if (req.topic.empty()) {
381  SPSP_LOGW("Can't subscribe to empty topic");
382  return false;
383  }
384 
385  {
386  const std::scoped_lock lock(m_mutex);
387 
388  auto& entryMap = m_subDB[req.topic];
389 
390  // Attempt to subscribe to new topic
391  if (entryMap.empty()) {
392  if (!this->getFarLayer()->subscribe(req.topic)) {
393  return false;
394  }
395  }
396 
397  entryMap[req.addr] = SubDBEntry{
398  .lifetime = m_conf.subDB.subLifetime,
399  .cb = nullptr
400  };
401  }
402 
403  return true;
404  }
405 
416  bool processSubData(const LocalMessageT& req,
417  int rssi = NODE_RSSI_UNKNOWN) { return false; }
418 
427  bool processUnsub(const LocalMessageT& req,
428  int rssi = NODE_RSSI_UNKNOWN)
429  {
430  // Publish RSSI
431  if (m_conf.reporting.rssiOnUnsub) {
432  this->publishRssi(req.addr, rssi);
433  }
434 
435  if (req.topic.empty()) {
436  SPSP_LOGW("Can't unsubscribe from empty topic");
437  return false;
438  }
439 
440  {
441  const std::scoped_lock lock(m_mutex);
442  m_subDB[req.topic].erase(req.addr);
443  }
444 
445  // Remove unused topics
446  this->subDBRemoveUnusedTopics();
447 
448  return true;
449  }
450 
461  bool processTimeReq(const LocalMessageT& req,
462  int rssi = NODE_RSSI_UNKNOWN)
463  {
464  // Get current time (millisecond accuracy)
465  auto now = std::chrono::system_clock::now().time_since_epoch();
466  auto nowMilliseconds =
467  std::chrono::duration_cast<std::chrono::milliseconds>(now);
468 
469  LocalMessageT res = req;
470  res.type = LocalMessageType::TIME_RES;
471  res.payload = std::to_string(nowMilliseconds.count());
472 
473  return this->sendLocal(res);
474  }
475 
486  bool processTimeRes(const LocalMessageT& req,
487  int rssi = NODE_RSSI_UNKNOWN) { return false; }
488 
498  bool publishSubData(const LocalAddrT& addr, const std::string& topic,
499  const std::string& payload)
500  {
501  SPSP_LOGD("Sending SUB_DATA to %s: topic '%s'",
502  addr.str.c_str(), topic.c_str());
503 
504  LocalMessageT msg = {};
505  msg.addr = addr;
506  msg.type = LocalMessageType::SUB_DATA;
507  msg.topic = topic;
508  msg.payload = payload;
509 
510  return this->sendLocal(msg);
511  }
512 
519  void subDBTick()
520  {
521  SPSP_LOGD("SubDB: Tick running");
522 
523  this->subDBDecrementLifetimes();
525  this->subDBRemoveUnusedTopics();
526 
527  SPSP_LOGD("SubDB: Tick done");
528  }
529 
535  {
536  const std::scoped_lock lock(m_mutex);
537 
538  m_subDB.forEach([this] (const std::string& topic,
539  const SubDBMapT& entryMap) {
540  for (auto& [addr, entry] : entryMap) {
541  // Don't decrement entries with infinite lifetime
542  if (entry.lifetime != BRIDGE_SUB_NO_EXPIRE) {
543  m_subDB[topic][addr].lifetime -= m_conf.subDB.interval;
544  }
545  }
546  });
547  }
548 
554  {
555  using namespace std::chrono_literals;
556 
557  const std::scoped_lock lock(m_mutex);
558 
559  m_subDB.forEach([this] (const std::string& topic,
560  const SubDBMapT& entryMap) {
561  auto entryIt = entryMap.begin();
562  while (entryIt != entryMap.end()) {
563  auto addr = entryIt->first;
564 
565  if (entryIt->second.lifetime <= 0ms) {
566  // Expired
567  entryIt = m_subDB[topic].erase(entryIt);
568  SPSP_LOGD("SubDB: Removed addr %s from topic '%s'",
569  addr.str.c_str(), topic.c_str());
570  } else {
571  // Continue
572  entryIt++;
573  }
574  }
575  });
576  }
577 
583  {
584  const std::scoped_lock lock(m_mutex);
585 
586  std::vector<std::string> unusedTopics;
587 
588  // Get unused topics
589  m_subDB.forEach([&unusedTopics] (const std::string& topic,
590  const SubDBMapT& entryMap) {
591  if (entryMap.empty()) {
592  unusedTopics.push_back(topic);
593  }
594  });
595 
596  // Unsubscribe from them
597  for (auto& topic : unusedTopics) {
598  if (this->getFarLayer()->unsubscribe(topic)) {
599  // Unsub successful, remove topic from sub DB
600  m_subDB.remove(topic);
601  SPSP_LOGD("SubDB: Removed unused topic '%s'", topic.c_str());
602  } else {
603  SPSP_LOGW("SubDB: Topic '%s' can't be unsubscribed. Will try again in next tick.",
604  topic.c_str());
605  }
606  }
607  }
608  };
609 } // namespace SPSP::Nodes
610 
611 #undef SPSP_LOG_TAG
SPSP::Nodes::Bridge::processProbeRes
bool processProbeRes(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes PROBE_RES message.
Definition: bridge.hpp:336
SPSP::WildcardTrie::find
const FindReturnT find(const std::string &key) const
Finds key in trie.
Definition: wildcard_trie.hpp:169
SPSP::Nodes::Bridge::m_conf
BridgeConfig m_conf
Configuration.
Definition: bridge.hpp:105
SPSP::Nodes::BridgeConfig::SubDB::interval
std::chrono::milliseconds interval
Definition: bridge.hpp:56
SPSP::Nodes::Bridge::SubDBEntry
Bridge subscribe entry.
Definition: bridge.hpp:96
SPSP::Nodes::Bridge::subDBDecrementLifetimes
void subDBDecrementLifetimes()
Decrements lifetimes of entries.
Definition: bridge.hpp:534
SPSP::Nodes::Bridge::resubscribeAll
void resubscribeAll()
Resubscribes to all topics.
Definition: bridge.hpp:244
SPSP::Nodes::Bridge::publishSubData
bool publishSubData(const LocalAddrT &addr, const std::string &topic, const std::string &payload)
Publishes received subscription data to local layer node.
Definition: bridge.hpp:498
SPSP::Nodes::Bridge::processPub
bool processPub(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes PUB message.
Definition: bridge.hpp:347
node.hpp
Node interface for SPSP.
SPSP::LocalAddrMAC::local
static LocalAddrMAC local()
Constructs a new object from MAC address of this node.
SPSP::Nodes::Bridge::processTimeRes
bool processTimeRes(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes TIME_RES message.
Definition: bridge.hpp:486
SPSP::Nodes::BridgeConfig::Reporting
Definition: bridge.hpp:38
SPSP::Nodes::BridgeConfig::Reporting::rssiOnSub
bool rssiOnSub
Report RSSI on SUB_REQ.
Definition: bridge.hpp:43
SPSP::Nodes::Bridge::Bridge
Bridge(TLocalLayer *ll, TFarLayer *fl, BridgeConfig conf={})
Construct a new bridge object.
Definition: bridge.hpp:117
SPSP::Nodes::Bridge::publish
bool publish(const std::string &topic, const std::string &payload)
Publishes payload to topic.
Definition: bridge.hpp:187
SPSP::Nodes::Bridge::receiveFar
bool receiveFar(const std::string &topic, const std::string &payload)
Receives the message from far layer.
Definition: bridge.hpp:146
SPSP::Timer
Simple timer for SPSP purposes.
Definition: timer.hpp:27
timer.hpp
Timer for SPSP purposes.
SPSP::ILocalNode::sendLocal
bool sendLocal(const LocalMessageT &msg)
Sends the message to local layer.
Definition: node.hpp:220
SPSP::Nodes::Bridge::~Bridge
~Bridge()
Destroys the bridge node.
Definition: bridge.hpp:131
SPSP::Nodes::Bridge::subDBRemoveExpiredEntries
void subDBRemoveExpiredEntries()
Removes expired entries.
Definition: bridge.hpp:553
SPSP::Nodes::BridgeConfig::Reporting::rssiOnProbe
bool rssiOnProbe
Report RSSI on PROBE_REQ.
Definition: bridge.hpp:41
SPSP::Nodes::Bridge::subDBTick
void subDBTick()
Subscribe DB timer tick callback.
Definition: bridge.hpp:519
SPSP::Nodes::Bridge::subscribe
bool subscribe(const std::string &topic, SubscribeCb cb)
Subscribes to topic.
Definition: bridge.hpp:212
SPSP::IFarNode::getFarLayer
TFarLayer * getFarLayer() const
Gets the far layer object.
Definition: node.hpp:395
SPSP::Nodes::Bridge::subDBRemoveUnusedTopics
void subDBRemoveUnusedTopics()
Removes and unsubscribes from unused topics.
Definition: bridge.hpp:582
SPSP::WildcardTrie::remove
bool remove(const std::string &key)
Removes key from trie.
Definition: wildcard_trie.hpp:119
SPSP::Nodes::Bridge::processTimeReq
bool processTimeReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes TIME_REQ message.
Definition: bridge.hpp:461
SPSP::SubscribeCb
std::function< void(const std::string &topic, const std::string &payload)> SubscribeCb
Subscribe callback type.
Definition: node.hpp:42
SPSP::Nodes::BridgeConfig::SubDB::subLifetime
std::chrono::milliseconds subLifetime
Definition: bridge.hpp:64
SPSP::WildcardTrie::forEach
void forEach(std::function< void(const std::string &key, const TValue &value)> f)
Iterates through each item in trie and calls callback on each one.
Definition: wildcard_trie.hpp:218
SPSP::Nodes::Bridge::m_mutex
std::mutex m_mutex
Mutex to prevent race conditions.
Definition: bridge.hpp:104
SPSP::Nodes::Bridge::unsubscribe
bool unsubscribe(const std::string &topic)
Unsubscribes from topic.
Definition: bridge.hpp:267
SPSP::Nodes::Bridge::processUnsub
bool processUnsub(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes UNSUB message.
Definition: bridge.hpp:427
SPSP::Nodes::Bridge::processSubData
bool processSubData(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes SUB_DATA message.
Definition: bridge.hpp:416
SPSP::Nodes::Bridge::m_subDB
WildcardTrie< SubDBMapT > m_subDB
Subscribe database.
Definition: bridge.hpp:106
SPSP::Nodes::Bridge::SubDBEntry::cb
SPSP::SubscribeCb cb
Callback for incoming data.
Definition: bridge.hpp:99
SPSP::Nodes::Bridge::m_subDBTimer
Timer m_subDBTimer
Sub DB timer.
Definition: bridge.hpp:107
SPSP::ILocalNode::publishRssi
void publishRssi(const LocalAddrT &addr, int rssi)
Publishes RSSI of received message from addr
Definition: node.hpp:250
SPSP::Nodes::BridgeConfig::SubDB
Definition: bridge.hpp:47
SPSP::ILocalAndFarNode
Generic local and far layer node of SPSP.
Definition: node.hpp:408
SPSP::Nodes::Bridge
Bridge node.
Definition: bridge.hpp:82
SPSP::Nodes::BridgeConfig::Reporting::probePayload
bool probePayload
Report non-empty payload of PROBE_REQ.
Definition: bridge.hpp:40
SPSP::WildcardTrie< SubDBMapT >
SPSP::Nodes::Bridge::processSubReq
bool processSubReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes SUB_REQ message.
Definition: bridge.hpp:372
SPSP::Nodes::BridgeConfig
Bridge configuration.
Definition: bridge.hpp:36
SPSP::Nodes::BridgeConfig::Reporting::rssiOnPub
bool rssiOnPub
Report RSSI on PUB.
Definition: bridge.hpp:42
wildcard_trie.hpp
Trie implementation with wildcard support.
SPSP::Nodes::BridgeConfig::Reporting::rssiOnUnsub
bool rssiOnUnsub
Report RSSI on UNSUB.
Definition: bridge.hpp:44
SPSP::Nodes::Bridge::processProbeReq
bool processProbeReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)
Processes PROBE_REQ message.
Definition: bridge.hpp:301
local_addr_mac.hpp
Local layer address container for MAC address.
SPSP::Nodes::Bridge::SubDBEntry::lifetime
std::chrono::milliseconds lifetime
Lifetime.
Definition: bridge.hpp:98