14 #include <unordered_map>
18 #include "spsp/logger.hpp"
24 #define SPSP_LOG_TAG "SPSP/Bridge"
/**
 * @brief Sentinel lifetime marking a subscription that never expires.
 *
 * Equal to the largest representable `std::chrono::milliseconds` value.
 * The lifetime-decrement code in this file compares an entry's lifetime
 * against this constant and leaves matching entries untouched.
 */
29 static constexpr
auto BRIDGE_SUB_NO_EXPIRE = std::chrono::milliseconds::max();
/**
 * @brief Step of the subscription database, 1 minute by default.
 *
 * The lifetime-decrement code subtracts `m_conf.subDB.interval`
 * (presumably this field) from every expiring entry's remaining lifetime.
 * NOTE(review): presumably also the period between ticks -- confirm against
 * the timer setup, which is not visible here.
 */
56 std::chrono::milliseconds
interval = std::chrono::minutes(1);
/**
 * @brief Subscription lifetime setting, 15 minutes by default.
 *
 * NOTE(review): presumably the lifetime assigned to a subscription when it
 * is created or renewed -- the assignment is not visible here; confirm.
 */
64 std::chrono::milliseconds
subLifetime = std::chrono::minutes(15);
81 template <
typename TLocalLayer,
typename TFarLayer>
/// Address type of the local layer, taken from `TLocalLayer`. Used as the
/// key of the per-topic subscription map.
85 using LocalAddrT =
typename TLocalLayer::LocalAddrT;
/// Message type of the local layer, taken from `TLocalLayer`. Requests and
/// the responses built from them in this file are of this type.
86 using LocalMessageT =
typename TLocalLayer::LocalMessageT;
102 using SubDBMapT = std::unordered_map<LocalAddrT, SubDBEntry>;
124 SPSP_LOGI(
"Initialized");
133 SPSP_LOGI(
"Deinitialized");
/**
 * @brief Handles a message received from the far layer.
 *
 * Holds `m_mutex` for the duration of the call, logs the message, then
 * walks the matching subscription entries and dispatches to each one.
 *
 * @param topic   Topic of the received message.
 * @param payload Payload of the received message.
 * @return bool; the returned value is not visible in this excerpt.
 */
146 bool receiveFar(
const std::string& topic,
const std::string& payload)
// Serialize access to the subscription database.
148 const std::scoped_lock lock(
m_mutex);
150 SPSP_LOGD(
"Received far msg: topic '%s', payload '%s'",
151 topic.c_str(), payload.c_str());
// `entries` is produced on a line not shown here; presumably the
// subscription-database entries matching `topic` -- confirm.
156 for (
auto& [entryTopic, entryMap] : entries) {
157 for (
auto& [addr, entry] : entryMap) {
// A default-constructed address means the entry carries a callback
// (`entry.cb`), which is run on its own thread.
158 if (addr == LocalAddrT{}) {
160 SPSP_LOGD(
"Calling user callback for topic '%s' in new thread",
162 std::thread t(entry.cb, topic, payload);
// Tail of a call taking (this, addr, topic, payload); presumably the
// dispatch to a remote subscriber at `addr` -- callee not visible here.
167 this, addr, topic, payload);
/**
 * @brief Publishes a message originating locally.
 *
 * Logs the topic and payload. A warning is emitted for an empty topic
 * (the guarding condition and the forwarding call are not visible in
 * this excerpt).
 *
 * @param topic   Topic to publish to.
 * @param payload Payload to publish.
 * @return bool; the returned value is not visible in this excerpt.
 */
187 bool publish(
const std::string& topic,
const std::string& payload)
189 SPSP_LOGD(
"Publishing locally: topic '%s', payload '%s'",
190 topic.c_str(), payload.c_str());
// Presumably guarded by an empty-topic check on a line not shown here.
193 SPSP_LOGW(
"Can't publish to empty topic");
214 const std::scoped_lock lock(
m_mutex);
216 SPSP_LOGD(
"Subscribing locally to topic '%s'", topic.c_str());
219 SPSP_LOGW(
"Can't subscribe to empty topic");
223 auto& entryMap =
m_subDB[topic];
226 if (entryMap.empty()) {
246 const std::scoped_lock lock(
m_mutex);
249 [
this](
const std::string& topic,
const SubDBMapT& topicEntries) {
251 SPSP_LOGW(
"Resubscribe to topic %s failed",
269 SPSP_LOGD(
"Unsubscribing locally from topic '%s'", topic.c_str());
272 SPSP_LOGW(
"Can't unsubscribe from empty topic");
277 const std::scoped_lock lock(
m_mutex);
278 if (!
m_subDB[topic].erase(LocalAddrT{})) {
280 SPSP_LOGD(
"Can't unsubscribe from not-subscribed topic '%s'",
302 int rssi = NODE_RSSI_UNKNOWN)
304 LocalMessageT res = req;
305 res.type = LocalMessageType::PROBE_RES;
315 std::string probePayloadReportTopic =
316 NODE_REPORTING_TOPIC +
"/" +
317 NODE_REPORTING_PROBE_PAYLOAD_SUBTOPIC +
"/" +
320 this->
publish(probePayloadReportTopic, req.payload);
337 int rssi = NODE_RSSI_UNKNOWN) {
return false; }
348 int rssi = NODE_RSSI_UNKNOWN)
355 if (req.topic.empty()) {
356 SPSP_LOGW(
"Can't publish to empty topic");
360 return this->
getFarLayer()->publish(req.addr.str, req.topic,
373 int rssi = NODE_RSSI_UNKNOWN)
380 if (req.topic.empty()) {
381 SPSP_LOGW(
"Can't subscribe to empty topic");
386 const std::scoped_lock lock(
m_mutex);
388 auto& entryMap =
m_subDB[req.topic];
391 if (entryMap.empty()) {
417 int rssi = NODE_RSSI_UNKNOWN) {
return false; }
428 int rssi = NODE_RSSI_UNKNOWN)
435 if (req.topic.empty()) {
436 SPSP_LOGW(
"Can't unsubscribe from empty topic");
441 const std::scoped_lock lock(
m_mutex);
442 m_subDB[req.topic].erase(req.addr);
462 int rssi = NODE_RSSI_UNKNOWN)
465 auto now = std::chrono::system_clock::now().time_since_epoch();
466 auto nowMilliseconds =
467 std::chrono::duration_cast<std::chrono::milliseconds>(now);
469 LocalMessageT res = req;
470 res.type = LocalMessageType::TIME_RES;
471 res.payload = std::to_string(nowMilliseconds.count());
487 int rssi = NODE_RSSI_UNKNOWN) {
return false; }
499 const std::string& payload)
501 SPSP_LOGD(
"Sending SUB_DATA to %s: topic '%s'",
502 addr.str.c_str(), topic.c_str());
504 LocalMessageT msg = {};
506 msg.type = LocalMessageType::SUB_DATA;
508 msg.payload = payload;
521 SPSP_LOGD(
"SubDB: Tick running");
527 SPSP_LOGD(
"SubDB: Tick done");
536 const std::scoped_lock lock(
m_mutex);
539 const SubDBMapT& entryMap) {
540 for (
auto& [addr, entry] : entryMap) {
542 if (entry.lifetime != BRIDGE_SUB_NO_EXPIRE) {
543 m_subDB[topic][addr].lifetime -= m_conf.subDB.interval;
555 using namespace std::chrono_literals;
557 const std::scoped_lock lock(
m_mutex);
560 const SubDBMapT& entryMap) {
561 auto entryIt = entryMap.begin();
562 while (entryIt != entryMap.end()) {
563 auto addr = entryIt->first;
565 if (entryIt->second.lifetime <= 0ms) {
567 entryIt = m_subDB[topic].erase(entryIt);
568 SPSP_LOGD(
"SubDB: Removed addr %s from topic '%s'",
569 addr.str.c_str(), topic.c_str());
584 const std::scoped_lock lock(
m_mutex);
586 std::vector<std::string> unusedTopics;
590 const SubDBMapT& entryMap) {
591 if (entryMap.empty()) {
592 unusedTopics.push_back(topic);
597 for (
auto& topic : unusedTopics) {
601 SPSP_LOGD(
"SubDB: Removed unused topic '%s'", topic.c_str());
603 SPSP_LOGW(
"SubDB: Topic '%s' can't be unsubscribed. Will try again in next tick.",