SPSP
Simple publish-subscribe protocol. Connects low-power IoT clients to MQTT.
All Classes Files Functions Variables Typedefs Enumerations
node.hpp
Go to the documentation of this file.
1 
10 #pragma once
11 
12 #include <chrono>
13 #include <cinttypes>
14 #include <climits>
15 #include <functional>
16 #include <memory>
17 #include <mutex>
18 #include <thread>
19 
20 #include "spsp/layers.hpp"
21 #include "spsp/local_message.hpp"
22 #include "spsp/logger.hpp"
23 #include "spsp/version.hpp"
24 
25 // Log tag
26 #define SPSP_LOG_TAG "SPSP/Node"
27 
28 namespace SPSP
29 {
// Topic segments used for reporting. publishRssi() joins the first two with
// '/' and appends the peer address: "_report/rssi/<addr>".
static const std::string NODE_REPORTING_TOPIC{"_report"};
static const std::string NODE_REPORTING_RSSI_SUBTOPIC{"rssi"};
static const std::string NODE_REPORTING_PROBE_PAYLOAD_SUBTOPIC{"probe_payload"};

// Sentinel meaning "RSSI of this message is not known"; used as the default
// value of every `rssi` parameter in this header.
static const int NODE_RSSI_UNKNOWN{INT_MIN};
36 
/**
 * @brief Subscribe callback type
 *
 * Callable receiving the topic and the payload, both as read-only strings.
 * It returns nothing.
 */
using SubscribeCb =
    std::function<void(const std::string& topic, const std::string& payload)>;
43 
49  class INode
50  {
51  public:
57  {
58  SPSP_LOGI("SPSP version: %s", SPSP::VERSION);
59  }
60 
71  virtual bool publish(const std::string& topic,
72  const std::string& payload) = 0;
73 
84  virtual bool subscribe(const std::string& topic, SubscribeCb cb) = 0;
85 
95  virtual bool unsubscribe(const std::string& topic) = 0;
96 
101  virtual void resubscribeAll() = 0;
102  };
103 
109  template <typename TLocalLayer>
110  class ILocalNode : virtual public INode
111  {
112  using LocalAddrT = typename TLocalLayer::LocalAddrT;
113  using LocalMessageT = typename TLocalLayer::LocalMessageT;
114  using LocalRecvSendCb = std::function<void(const LocalMessageT&)>;
115 
116  TLocalLayer* m_ll;
117  LocalRecvSendCb m_localRecvSendCb = nullptr;
118 
119  public:
124  ILocalNode(TLocalLayer* ll) : m_ll{ll}
125  {
126  m_ll->setNode(this);
127  }
128 
137  void receiveLocal(const LocalMessageT& msg, int rssi = NODE_RSSI_UNKNOWN)
138  {
139  if (rssi != NODE_RSSI_UNKNOWN) {
140  SPSP_LOGI("Received local msg: %s (%d dBm)",
141  msg.toString().c_str(), rssi);
142  } else {
143  SPSP_LOGI("Received local msg: %s", msg.toString().c_str());
144  }
145 
146  // Call receive/send callback
147  if (m_localRecvSendCb != nullptr) {
148  SPSP_LOGD("Calling receive/send callback");
149  m_localRecvSendCb(msg);
150  }
151 
152  bool processed = false;
153  auto processingTimeBegin = std::chrono::steady_clock::now();
154 
155  // Call responsible handler
156  switch (msg.type) {
157  case LocalMessageType::PROBE_REQ: processed = processProbeReq(msg, rssi); break;
158  case LocalMessageType::PROBE_RES: processed = processProbeRes(msg, rssi); break;
159  case LocalMessageType::PUB: processed = processPub(msg, rssi); break;
160  case LocalMessageType::SUB_REQ: processed = processSubReq(msg, rssi); break;
161  case LocalMessageType::SUB_DATA: processed = processSubData(msg, rssi); break;
162  case LocalMessageType::UNSUB: processed = processUnsub(msg, rssi); break;
163  case LocalMessageType::TIME_REQ: processed = processTimeReq(msg, rssi); break;
164  case LocalMessageType::TIME_RES: processed = processTimeRes(msg, rssi); break;
165  default:
166  SPSP_LOGW("Unprocessable message type %s (%d)",
167  localMessageTypeToStr(msg.type),
168  static_cast<uint8_t>(msg.type));
169  break;
170  }
171 
172  auto processingTimeEnd = std::chrono::steady_clock::now();
173  auto processingDuration = std::chrono::duration_cast
174  <std::chrono::milliseconds>(processingTimeEnd
175  - processingTimeBegin);
176 
177  if (processed) {
178  SPSP_LOGD("Message processed (%" PRId64 " ms): %s",
179  processingDuration.count(), msg.toString().c_str());
180  } else {
181  SPSP_LOGW("Message not processed (%" PRId64 " ms): %s",
182  processingDuration.count(), msg.toString().c_str());
183  }
184  }
185 
197  void setLocalRecvSendCb(LocalRecvSendCb cb)
198  {
199  m_localRecvSendCb = cb;
200  }
201 
202  protected:
208  inline TLocalLayer* getLocalLayer() const
209  {
210  return m_ll;
211  }
212 
220  bool sendLocal(const LocalMessageT& msg)
221  {
222  SPSP_LOGI("Sending local msg: %s", msg.toString().c_str());
223 
224  // Send to local layer
225  bool delivered = m_ll->send(msg);
226 
227  if (delivered) {
228  SPSP_LOGD("Message delivered: %s", msg.toString().c_str());
229  } else {
230  SPSP_LOGW("Message not delivered: %s", msg.toString().c_str());
231  }
232 
233  // Call receive/send callback
234  if (m_localRecvSendCb != nullptr) {
235  SPSP_LOGD("Calling receive/send callback");
236  m_localRecvSendCb(msg);
237  }
238 
239  return delivered;
240  }
241 
250  void publishRssi(const LocalAddrT& addr, int rssi)
251  {
252  if (rssi == NODE_RSSI_UNKNOWN) return;
253 
254  // Spawn new thread for this publish
255  std::thread t([this, addr, rssi] {
256  std::string topic = NODE_REPORTING_TOPIC + "/"
257  + NODE_REPORTING_RSSI_SUBTOPIC + "/"
258  + addr.str;
259 
260  this->publish(topic, std::to_string(rssi));
261  });
262  t.detach();
263  }
264 
273  virtual bool processProbeReq(const LocalMessageT& req,
274  int rssi = NODE_RSSI_UNKNOWN) = 0;
275 
284  virtual bool processProbeRes(const LocalMessageT& req,
285  int rssi = NODE_RSSI_UNKNOWN) = 0;
286 
295  virtual bool processPub(const LocalMessageT& req,
296  int rssi = NODE_RSSI_UNKNOWN) = 0;
297 
306  virtual bool processSubReq(const LocalMessageT& req,
307  int rssi = NODE_RSSI_UNKNOWN) = 0;
308 
317  virtual bool processSubData(const LocalMessageT& req,
318  int rssi = NODE_RSSI_UNKNOWN) = 0;
319 
328  virtual bool processUnsub(const LocalMessageT& req,
329  int rssi = NODE_RSSI_UNKNOWN) = 0;
330 
339  virtual bool processTimeReq(const LocalMessageT& req,
340  int rssi = NODE_RSSI_UNKNOWN) = 0;
341 
350  virtual bool processTimeRes(const LocalMessageT& req,
351  int rssi = NODE_RSSI_UNKNOWN) = 0;
352  };
353 
361  template <typename TFarLayer>
362  class IFarNode : virtual public INode
363  {
364  TFarLayer* m_fl;
365 
366  public:
371  IFarNode(TFarLayer* fl) : m_fl{fl}
372  {
373  m_fl->setNode(this);
374  }
375 
386  virtual bool receiveFar(const std::string& topic,
387  const std::string& payload) = 0;
388 
389  protected:
395  inline TFarLayer* getFarLayer() const
396  {
397  return m_fl;
398  }
399  };
400 
407  template<typename TLocalLayer, typename TFarLayer>
408  class ILocalAndFarNode : public ILocalNode<TLocalLayer>,
409  public IFarNode<TFarLayer>
410  {
411  public:
416  ILocalAndFarNode(TLocalLayer *ll, TFarLayer *fl)
417  : ILocalNode<TLocalLayer>(ll), IFarNode<TFarLayer>(fl) {}
418  };
419 } // namespace SPSP
420 
421 #undef SPSP_LOG_TAG
SPSP::ILocalAndFarNode::ILocalAndFarNode
ILocalAndFarNode(TLocalLayer *ll, TFarLayer *fl)
Constructs a new node.
Definition: node.hpp:416
SPSP::ILocalNode
Generic local node of SPSP.
Definition: layers.hpp:17
SPSP::INode::subscribe
virtual bool subscribe(const std::string &topic, SubscribeCb cb)=0
Subscribes to topic.
SPSP::ILocalNode::processProbeRes
virtual bool processProbeRes(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)=0
Processes PROBE_RES message.
SPSP::INode
Most generic node type of SPSP.
Definition: node.hpp:49
SPSP::IFarNode::receiveFar
virtual bool receiveFar(const std::string &topic, const std::string &payload)=0
Receives the message from far layer.
SPSP::INode::unsubscribe
virtual bool unsubscribe(const std::string &topic)=0
Unsubscribes from topic.
SPSP::INode::publish
virtual bool publish(const std::string &topic, const std::string &payload)=0
Publishes payload to topic.
SPSP::ILocalNode::getLocalLayer
TLocalLayer * getLocalLayer() const
Gets the local layer object.
Definition: node.hpp:208
SPSP::INode::INode
INode()
Constructs a new generic node.
Definition: node.hpp:56
version.hpp
Current version of SPSP.
SPSP::ILocalNode::processProbeReq
virtual bool processProbeReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)=0
Processes PROBE_REQ message.
layers.hpp
Local and far layers for SPSP.
SPSP::ILocalNode::sendLocal
bool sendLocal(const LocalMessageT &msg)
Sends the message to local layer.
Definition: node.hpp:220
SPSP::INode::resubscribeAll
virtual void resubscribeAll()=0
Resubscribes to all topics.
SPSP::IFarNode::getFarLayer
TFarLayer * getFarLayer() const
Gets the far layer object.
Definition: node.hpp:395
local_message.hpp
Local message classes.
SPSP::localMessageTypeToStr
constexpr const char * localMessageTypeToStr(LocalMessageType mt) noexcept
Helper to convert LocalMessageType to string representation.
Definition: local_message.hpp:43
SPSP::IFarNode
Generic far layer node of SPSP.
Definition: layers.hpp:18
SPSP::ILocalNode::ILocalNode
ILocalNode(TLocalLayer *ll)
Constructs a new node.
Definition: node.hpp:124
SPSP::SubscribeCb
std::function< void(const std::string &topic, const std::string &payload)> SubscribeCb
Subscribe callback type.
Definition: node.hpp:42
SPSP::ILocalNode::processSubData
virtual bool processSubData(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)=0
Processes SUB_DATA message.
SPSP::ILocalNode::receiveLocal
void receiveLocal(const LocalMessageT &msg, int rssi=NODE_RSSI_UNKNOWN)
Receives the message from local layer.
Definition: node.hpp:137
SPSP::ILocalNode::setLocalRecvSendCb
void setLocalRecvSendCb(LocalRecvSendCb cb)
Sets local receive/send callback function.
Definition: node.hpp:197
SPSP::ILocalNode::processUnsub
virtual bool processUnsub(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)=0
Processes UNSUB message.
SPSP::ILocalNode::processTimeReq
virtual bool processTimeReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)=0
Processes TIME_REQ message.
SPSP::ILocalNode::publishRssi
void publishRssi(const LocalAddrT &addr, int rssi)
Publishes RSSI of received message from addr.
Definition: node.hpp:250
SPSP::ILocalNode::processTimeRes
virtual bool processTimeRes(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)=0
Processes TIME_RES message.
SPSP::ILocalAndFarNode
Generic local and far layer node of SPSP.
Definition: node.hpp:408
SPSP::ILocalNode::processSubReq
virtual bool processSubReq(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)=0
Processes SUB_REQ message.
SPSP::IFarNode::IFarNode
IFarNode(TFarLayer *fl)
Constructs a new node.
Definition: node.hpp:371
SPSP::VERSION
constexpr const char *const VERSION
Version of SPSP.
Definition: version.hpp:18
SPSP::ILocalNode::processPub
virtual bool processPub(const LocalMessageT &req, int rssi=NODE_RSSI_UNKNOWN)=0
Processes PUB message.