diff --git a/include/rtps/common/MD5.h b/include/rtps/common/MD5.h new file mode 100644 index 00000000..27408046 --- /dev/null +++ b/include/rtps/common/MD5.h @@ -0,0 +1,23 @@ +// +// Created by Sven on 14.10.21. +// + +#ifndef RTPS_MD5_H +#define RTPS_MD5_H + +#include "rtps/config.h" + +namespace rtps{ +// MD5 context. + typedef struct { + uint32_t state[4]; // state (ABCD) + uint32_t count[2]; // number of bits, modulo 2^64 (lsb first) + uint8_t buffer[64]; // input buffer + } MD5_CTX; + + void MD5Init (MD5_CTX *); + void MD5Update (MD5_CTX *, const uint8_t *, uint32_t); + void MD5Final (uint8_t[16], MD5_CTX *); + +} +#endif //EMBEDDEDRTPS_EXAMPLE_MD5_H diff --git a/include/rtps/common/types.h b/include/rtps/common/types.h index 5a7ba4ba..f1448073 100644 --- a/include/rtps/common/types.h +++ b/include/rtps/common/types.h @@ -84,6 +84,13 @@ enum class DurabilityKind_t : uint32_t { PERSISTENT = 3 }; +enum class OwnershipKind_t : uint32_t{ + SHARED = 0, + EXCLUSIVE = 1 +}; + +typedef uint32_t OwnershipStrength_t; + struct GuidPrefix_t { std::array id; @@ -225,8 +232,16 @@ enum class ChangeForReaderStatusKind { enum class ChangeFromWriterStatusKind { LOST, MISSING, RECEIVED, UNKNOWN }; -struct InstanceHandle_t { // TODO - uint64_t value; +struct InstanceHandle_t { + uint8_t key[16]; + bool operator==(const InstanceHandle_t &other) const { + for (int i = 0; i < 16; i++) { + if(key[i] != other.key[i]) { + return false; + } + } + return true; + } }; struct ParticipantMessageData { // TODO @@ -269,7 +284,16 @@ const SequenceNumber_t SEQUENCENUMBER_UNKNOWN = {-1, 0}; const Time_t TIME_ZERO = {}; const Time_t TIME_INVALID = {-1, 0xFFFFFFFF}; const Time_t TIME_INFINITY = {0x7FFFFFFF, 0xFFFFFFFF}; +class WriterProxy; + +struct Instance_t{ + InstanceHandle_t handle; + WriterProxy *owner = nullptr; +}; +#ifndef CHIBIOS //has its own TIME_INFINITE +const Time_t TIME_INFINITE = {0x7FFFFFFF, 0xFFFFFFFF}; +#endif const VendorId_t VENDOR_UNKNOWN = {}; } // namespace rtps diff --git a/include/rtps/communication/UdpConnection.h b/include/rtps/communication/UdpConnection.h index 25b4fb5a..8770f1c1 100644 --- a/include/rtps/communication/UdpConnection.h +++ b/include/rtps/communication/UdpConnection.h @@ -32,6 +32,7 @@ namespace rtps { struct UdpConnection { udp_pcb *pcb = nullptr; + uint16_t port = 0; UdpConnection() = default; // Required for static allocation diff --git a/include/rtps/discovery/ParticipantProxyData.h b/include/rtps/discovery/ParticipantProxyData.h index 019484d3..3a3ab288 100644 --- a/include/rtps/discovery/ParticipantProxyData.h +++ b/include/rtps/discovery/ParticipantProxyData.h @@ -28,9 +28,13 @@ Author: i11 - Embedded Software, RWTH Aachen University #include "rtps/common/Locator.h" #include "rtps/config.h" #include "rtps/messages/MessageTypes.h" + #include "ucdr/microcdr.h" + #if defined(unix) || defined(__unix__) #include +#elif defined(CHIBIOS) +#include "chtime.h" #endif #include @@ -63,8 +67,10 @@ class ParticipantProxyData { #if defined(unix) || defined(__unix__) std::chrono::time_point m_lastLivelinessReceivedTimestamp; -#else +#elif !defined(CHIBIOS) TickType_t m_lastLivelinessReceivedTickCount = 0; +#else + sysinterval_t m_lastLivelinessReceivedTickCount = 0; #endif void reset(); @@ -146,8 +152,10 @@ bool ParticipantProxyData::hasSubscriptionReader() { void ParticipantProxyData::onAliveSignal() { #if defined(unix) || defined(__unix__) m_lastLivelinessReceivedTimestamp = std::chrono::high_resolution_clock::now(); -#else +#elif !defined(CHIBIOS) m_lastLivelinessReceivedTickCount = xTaskGetTickCount(); +#else + m_lastLivelinessReceivedTickCount = chVTGetSystemTimeX(); #endif } @@ -157,9 +165,11 @@ uint32_t ParticipantProxyData::getAliveSignalAgeInMilliseconds() { std::chrono::duration duration = now - m_lastLivelinessReceivedTimestamp; return duration.count(); -#else +#elif !defined(CHIBIOS) return (xTaskGetTickCount() - m_lastLivelinessReceivedTickCount) * (1000 / configTICK_RATE_HZ); +#else + return chTimeI2MS(chVTGetSystemTimeX() - m_lastLivelinessReceivedTickCount); #endif } diff --git a/include/rtps/discovery/SEDPAgent.h b/include/rtps/discovery/SEDPAgent.h index aa89e41f..bfa88792 100644 --- a/include/rtps/discovery/SEDPAgent.h +++ b/include/rtps/discovery/SEDPAgent.h @@ -53,7 +53,7 @@ class SEDPAgent { Participant *m_part; sys_mutex_t m_mutex; uint8_t m_buffer[600]; // TODO check size, currently changed from 300 to 600 - // (FastDDS gives too many options) + // (FastDDS gives too much options) BuiltInEndpoints m_endpoints; /* * If we add readers later on, remote participants will not send matching diff --git a/include/rtps/discovery/TopicData.h b/include/rtps/discovery/TopicData.h index 2a867e26..2c758f2a 100644 --- a/include/rtps/discovery/TopicData.h +++ b/include/rtps/discovery/TopicData.h @@ -48,7 +48,8 @@ struct TopicData { DurabilityKind_t durabilityKind; Locator unicastLocator; Locator multicastLocator; - + OwnershipKind_t ownership_Kind; + OwnershipStrength_t ownership_strenght; TopicData() : endpointGuid(GUID_UNKNOWN), typeName{'\0'}, topicName{'\0'}, reliabilityKind(ReliabilityKind_t::BEST_EFFORT), @@ -57,13 +58,15 @@ struct TopicData { 192, 168, 0, 42, rtps::getUserUnicastPort(0)); unicastLocator = someLocator; multicastLocator = Locator(); + ownership_Kind = OwnershipKind_t::SHARED; + ownership_strenght = 0; }; TopicData(Guid_t guid, ReliabilityKind_t reliability, Locator loc) : endpointGuid(guid), typeName{'\0'}, topicName{'\0'}, reliabilityKind(reliability), durabilityKind(DurabilityKind_t::TRANSIENT_LOCAL), unicastLocator(loc) { - } + }; bool matchesTopicOf(const TopicData &other); diff --git a/include/rtps/entities/Domain.h b/include/rtps/entities/Domain.h index 14ed9812..74d27556 100644 --- a/include/rtps/entities/Domain.h +++ b/include/rtps/entities/Domain.h @@ -45,13 +45,24 @@ class Domain { void stop(); Participant *createParticipant(); + Writer *createWriter(Participant &part, const char *topicName, const char *typeName, bool reliable, bool enforceUnicast = false); + + + Writer *createWriter(Participant &part, const char *topicName, const char *typeName, bool topichasKey, OwnershipKind_t ownership_kind, OwnershipStrength_t ownershipStrength, bool reliable , bool enforceUnicast); + + + Reader *createReader(Participant &part, const char *topicName, bool topichasKey, + const char *typeName, bool reliable, OwnershipKind_t ownershipKind, + ip4_addr_t mcastaddress = {0}); + Reader *createReader(Participant &part, const char *topicName, const char *typeName, bool reliable, ip4_addr_t mcastaddress = {0}); + Writer *writerExists(Participant &part, const char *topicName, const char *typeName, bool reliable); Reader *readerExists(Participant &part, const char *topicName, diff --git a/include/rtps/entities/Reader.h b/include/rtps/entities/Reader.h index 9dc43c33..caaf335d 100644 --- a/include/rtps/entities/Reader.h +++ b/include/rtps/entities/Reader.h @@ -76,12 +76,13 @@ class ReaderCacheChange { typedef void (*ddsReaderCallback_fp)(void *callee, const ReaderCacheChange &cacheChange); - +typedef void (*ddsGetKey_Callback_fp)(const uint8_t *data, uint32_t data_len, InstanceHandle_t &key); class Reader { public: TopicData m_attributes; virtual void newChange(const ReaderCacheChange &cacheChange) = 0; virtual void registerCallback(ddsReaderCallback_fp cb, void *callee) = 0; + virtual void registerKeyCallback(ddsGetKey_Callback_fp cb) = 0; virtual bool onNewHeartbeat(const SubmessageHeartbeat &msg, const GuidPrefix_t &remotePrefix) = 0; virtual bool addNewMatchedWriter(const WriterProxy &newProxy) = 0; diff --git a/include/rtps/entities/StatefulReader.h b/include/rtps/entities/StatefulReader.h index fc73892e..47838094 100644 --- a/include/rtps/entities/StatefulReader.h +++ b/include/rtps/entities/StatefulReader.h @@ -41,6 +41,7 @@ template class StatefulReaderT final : public Reader { void init(const TopicData &attributes, NetworkDriver &driver); void newChange(const ReaderCacheChange &cacheChange) override; void registerCallback(ddsReaderCallback_fp cb, void *callee) override; + void registerKeyCallback(ddsGetKey_Callback_fp cb) override; bool addNewMatchedWriter(const WriterProxy &newProxy) override; void removeWriter(const Guid_t &guid) override; void removeWriterOfParticipant(const GuidPrefix_t &guidPrefix) override; @@ -51,10 +52,13 @@ template class StatefulReaderT final : public Reader { PacketInfo m_packetInfo; // TODO intended for reuse but buffer not used as such NetworkDriver *m_transport; - + TopicKind_t m_kind = TopicKind_t::NO_KEY; ddsReaderCallback_fp m_callback = nullptr; + ddsGetKey_Callback_fp m_KeyCallback = nullptr; void *m_callee = nullptr; sys_mutex_t m_mutex; + bool isOwner(InstanceHandle_t &handle, WriterProxy *proxy); + MemoryPool m_instances; }; using StatefulReader = StatefulReaderT; diff --git a/include/rtps/entities/StatefulReader.tpp b/include/rtps/entities/StatefulReader.tpp index 0c2130c4..1aea4dc4 100644 --- a/include/rtps/entities/StatefulReader.tpp +++ b/include/rtps/entities/StatefulReader.tpp @@ -59,12 +59,49 @@ void StatefulReaderT::init(const TopicData &attributes, return; } + + if(attributes.endpointGuid.entityId.entityKind == EntityKind_t::USER_DEFINED_READER_WITH_KEY){ + m_kind = TopicKind_t::WITH_KEY; + } + m_attributes = attributes; m_transport = &driver; m_packetInfo.srcPort = attributes.unicastLocator.port; m_is_initialized_ = true; } +template +bool StatefulReaderT::isOwner(InstanceHandle_t &handle, WriterProxy *proxy){ + for(auto &instance : m_instances){ + if(instance.handle == handle){ + if(instance.owner == nullptr){ + instance.owner = proxy; + return true; + } + if(instance.owner == proxy){ + return true; + } + else{ + if(proxy->ownershipStrength < instance.owner->ownershipStrength){ + return false; + } + else if(proxy->ownershipStrength > instance.owner->ownershipStrength){ + instance.owner = proxy; + return true; + } + else{//equal strength , just pick the first one + return false; + } + } + } + } + Instance_t instance; + instance.owner = proxy; + instance.handle = handle; + m_instances.add(instance); + return true; +} + template void StatefulReaderT::newChange( const ReaderCacheChange &cacheChange) { @@ -75,8 +112,17 @@ void StatefulReaderT::newChange( for (auto &proxy : m_proxies) { if (proxy.remoteWriterGuid == cacheChange.writerGuid) { if (proxy.expectedSN == cacheChange.sn) { - m_callback(m_callee, cacheChange); ++proxy.expectedSN; + if(m_attributes.ownership_Kind == OwnershipKind_t::EXCLUSIVE) { + InstanceHandle_t handle; + m_KeyCallback(cacheChange.getData(), cacheChange.getDataSize(), handle); + if (isOwner(handle, &proxy)) { // + m_callback(m_callee, cacheChange); + } + } + else{ + m_callback(m_callee, cacheChange); + } return; } } @@ -95,6 +141,16 @@ void StatefulReaderT::registerCallback(ddsReaderCallback_fp cb, } } +template +void StatefulReaderT::registerKeyCallback(ddsGetKey_Callback_fp cb){ + if(cb != nullptr){ + m_KeyCallback = cb; + } + else{ + SFR_LOG("Passed Key callback is nullptr\n"); + } +} + template bool StatefulReaderT::addNewMatchedWriter( const WriterProxy &newProxy) { @@ -109,6 +165,14 @@ bool StatefulReaderT::addNewMatchedWriter( template void StatefulReaderT::removeWriter(const Guid_t &guid) { Lock lock(m_mutex); + auto isElementToRemove_Instance = [&](const Instance_t &instance){ + return instance.owner->remoteWriterGuid == guid; + }; + auto thunk_instance = [](void *arg, const Instance_t &value) { + return (*static_cast(arg))(value); + }; + m_instances.remove(thunk_instance, &isElementToRemove_Instance); + auto isElementToRemove = [&](const WriterProxy &proxy) { return proxy.remoteWriterGuid == guid; }; @@ -123,6 +187,14 @@ template void StatefulReaderT::removeWriterOfParticipant( const GuidPrefix_t &guidPrefix) { Lock lock(m_mutex); + auto isElementToRemove_Instance = [&](const Instance_t &instance){ + return instance.owner->remoteWriterGuid.prefix == guidPrefix; + }; + auto thunk_instance = [](void *arg, const Instance_t &value) { + return (*static_cast(arg))(value); + }; + m_instances.remove(thunk_instance, &isElementToRemove_Instance); + auto isElementToRemove = [&](const WriterProxy &proxy) { return proxy.remoteWriterGuid.prefix == guidPrefix; }; diff --git a/include/rtps/entities/StatefulWriter.tpp b/include/rtps/entities/StatefulWriter.tpp index 470452ad..972cc354 100644 --- a/include/rtps/entities/StatefulWriter.tpp +++ b/include/rtps/entities/StatefulWriter.tpp @@ -348,6 +348,7 @@ bool StatefulWriterT::sendData( info.destAddr = locator.getIp4Address(); info.destPort = (Ip4Port_t)locator.port; + { Lock lock(m_mutex); const CacheChange *next = m_history.getChangeBySN(snMissing); @@ -358,9 +359,14 @@ bool StatefulWriterT::sendData( return false; } + bool inlineQoS = false; + // if(m_attributes.ownership_Kind == OwnershipKind_t::EXCLUSIVE){ + // inlineQoS = true; + //} MessageFactory::addSubMessageData( - info.buffer, next->data, false, next->sequenceNumber, - m_attributes.endpointGuid.entityId, reader.remoteReaderGuid.entityId); + info.buffer, next->data, inlineQoS, next->sequenceNumber, + m_attributes.endpointGuid.entityId, reader.remoteReaderGuid.entityId, m_attributes.ownership_Kind, m_attributes.ownership_strenght + ); } m_transport->sendPacket(info); diff --git a/include/rtps/entities/StatelessReader.h b/include/rtps/entities/StatelessReader.h index 3f1c7963..e179ce03 100644 --- a/include/rtps/entities/StatelessReader.h +++ b/include/rtps/entities/StatelessReader.h @@ -39,11 +39,15 @@ class StatelessReader final : public Reader { bool addNewMatchedWriter(const WriterProxy &newProxy) override; void removeWriter(const Guid_t &guid) override; void removeWriterOfParticipant(const GuidPrefix_t &guidPrefix) override; - + void registerKeyCallback(ddsGetKey_Callback_fp cb) override; private: sys_mutex_t m_mutex; ddsReaderCallback_fp m_callback = nullptr; + ddsGetKey_Callback_fp m_keyCallback = nullptr; void *m_callee = nullptr; + MemoryPool m_instances; + + bool isOwner(InstanceHandle_t &handle, WriterProxy *proxy); }; } // namespace rtps diff --git a/include/rtps/entities/StatelessWriter.tpp b/include/rtps/entities/StatelessWriter.tpp index f45d07be..dafc8f08 100644 --- a/include/rtps/entities/StatelessWriter.tpp +++ b/include/rtps/entities/StatelessWriter.tpp @@ -34,7 +34,6 @@ Author: i11 - Embedded Software, RWTH Aachen University #include "rtps/utils/udpUtils.h" using rtps::CacheChange; -using rtps::SequenceNumber_t; using rtps::StatelessWriterT; #if SLW_VERBOSE && RTPS_GLOBAL_VERBOSE diff --git a/include/rtps/entities/Writer.h b/include/rtps/entities/Writer.h index c300bcae..2a0552af 100644 --- a/include/rtps/entities/Writer.h +++ b/include/rtps/entities/Writer.h @@ -46,6 +46,7 @@ class Writer { virtual void progress() = 0; virtual const CacheChange *newChange(ChangeKind_t kind, const uint8_t *data, DataSize_t size) = 0; + // virtual const CacheChange *newChange(ChangeKind_t kind, DynamicDataType *data) = 0; virtual void setAllChangesToUnsent() = 0; virtual void onNewAckNack(const SubmessageAckNack &msg, const GuidPrefix_t &sourceGuidPrefix) = 0; diff --git a/include/rtps/entities/WriterProxy.h b/include/rtps/entities/WriterProxy.h index a2302c0d..1a8e6719 100644 --- a/include/rtps/entities/WriterProxy.h +++ b/include/rtps/entities/WriterProxy.h @@ -35,13 +35,13 @@ struct WriterProxy { Count_t ackNackCount; Count_t hbCount; Locator remoteLocator; - + OwnershipStrength_t ownershipStrength; WriterProxy() = default; - WriterProxy(const Guid_t &guid, const Locator &loc) + WriterProxy(const Guid_t &guid, const Locator &loc, const OwnershipStrength_t ownershipStrength = 0) : remoteWriterGuid(guid), expectedSN(SequenceNumber_t{0, 1}), ackNackCount{1}, hbCount{0}, - remoteLocator(loc) {} + remoteLocator(loc), ownershipStrength(ownershipStrength) {} // For now, we don't store any packets, so we just request all starting from // the next expected diff --git a/include/rtps/messages/MessageFactory.h b/include/rtps/messages/MessageFactory.h index c9c2b683..e16cdd8f 100644 --- a/include/rtps/messages/MessageFactory.h +++ b/include/rtps/messages/MessageFactory.h @@ -101,7 +101,7 @@ void addSubMessageTimeStamp(Buffer &buffer, bool setInvalid = false) { template void addSubMessageData(Buffer &buffer, const Buffer &filledPayload, bool containsInlineQos, const SequenceNumber_t &SN, - const EntityId_t &writerID, const EntityId_t &readerID) { + const EntityId_t &writerID, const EntityId_t &readerID, const OwnershipKind_t &ownershipKind = OwnershipKind_t::SHARED, const OwnershipStrength_t &ownershipStrength = 0) { SubmessageData msg; msg.header.submessageId = SubmessageKind::DATA; #if IS_LITTLE_ENDIAN diff --git a/include/rtps/messages/MessageTypes.h b/include/rtps/messages/MessageTypes.h index b14a26cc..29970b49 100644 --- a/include/rtps/messages/MessageTypes.h +++ b/include/rtps/messages/MessageTypes.h @@ -175,6 +175,9 @@ struct SubmessageData { EntityId_t readerId; EntityId_t writerId; SequenceNumber_t writerSN; + + OwnershipKind_t ownershipKind; + OwnershipStrength_t ownershipStrength; static constexpr uint16_t getRawSize() { return SubmessageHeader::getRawSize() + sizeof(uint16_t) + sizeof(uint16_t) + (2 * 3 + 2 * 1) // EntityID @@ -288,6 +291,19 @@ bool serializeMessage(Buffer &buffer, SubmessageData &msg) { sizeof(msg.writerSN.high)); buffer.append(reinterpret_cast(&msg.writerSN.low), sizeof(msg.writerSN.low)); + if(msg.ownershipKind == OwnershipKind_t::EXCLUSIVE ){ + // buffer.append(reinterpret_cast(SMElement::ParameterId::PID_OWNERSHIP_STRENGTH), sizeof(uint16_t)); + // buffer.append(reinterpret_cast(sizeof(OwnershipStrength_t)), sizeof(uint16_t)); + // buffer.append(reinterpret_cast(msg.ownershipStrength), sizeof(OwnershipStrength_t)); + //buffer.append(reinterpret_cast(SMElement::ParameterId::PID_KEY_HASH), sizeof(uint16_t)); + //uint8_t key[16]; + // buffer.append(reinterpret_cast(16),sizeof(uint16_t)); + // buffer.append(reinterpret_cast(key), 16); + + // buffer.append(reinterpret_cast(SMElement::ParameterId::PID_SENTINEL), sizeof(uint16_t)); + // buffer.append(reinterpret_cast(0), sizeof(uint16_t)); + + } return true; } diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp index babc01d8..f148611f 100644 --- a/src/ThreadPool.cpp +++ b/src/ThreadPool.cpp @@ -180,8 +180,8 @@ void ThreadPool::readCallback(void *args, udp_pcb *target, pbuf *pbuf, packet.destPort = target->local_port; packet.srcPort = port; packet.buffer = PBufWrapper{pbuf}; - if (!pool.addNewPacket(std::move(packet))) { + THREAD_POOL_LOG("ThreadPool: dropped packet\n"); } } @@ -207,7 +207,7 @@ void ThreadPool::doReaderWork() { continue; } - m_receiveJumppad(m_callee, const_cast(packet)); + m_receiveJumppad(m_callee, const_cast(packet)); } } diff --git a/src/common/MD5.cpp b/src/common/MD5.cpp new file mode 100644 index 00000000..3c2bd944 --- /dev/null +++ b/src/common/MD5.cpp @@ -0,0 +1,285 @@ +// +// Created by Sven on 14.10.21. +// + +#include + +#include +//#include "rtps/commonmd5.h" +using namespace rtps; +// Constants for MD5Transform routine. + +#define MD5_memcpy memcpy +#define MD5_memset memset + +#define S11 7 +#define S12 12 +#define S13 17 +#define S14 22 +#define S21 5 +#define S22 9 +#define S23 14 +#define S24 20 +#define S31 4 +#define S32 11 +#define S33 16 +#define S34 23 +#define S41 6 +#define S42 10 +#define S43 15 +#define S44 21 + +static const uint8_t PADDING[64] = { + 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 +}; + +// F, G, H and I are basic MD5 functions. + +#define F(x, y, z) (((x) & (y)) | ((~x) & (z))) +#define G(x, y, z) (((x) & (z)) | ((y) & (~z))) +#define H(x, y, z) ((x) ^ (y) ^ (z)) +#define I(x, y, z) ((y) ^ ((x) | (~z))) + +// ROTATE_LEFT rotates x left n bits. + +#define ROTATE_LEFT(x, n) (((x) << (n)) | ((x) >> (32-(n)))) +//extern DWORD fast_long_roll( DWORD l, signed char n ) small; +//#define ROTATE_LEFT fast_long_roll + +// FF, GG, HH, and II transformations for rounds 1, 2, 3, and 4. +// Rotation is separate from addition to prevent recomputation. + +#define FF(a, b, c, d, x, s, ac) { \ + (a) += F ((b), (c), (d)) + (x) + (uint32_t)(ac); \ + (a) = ROTATE_LEFT ((a), (s)); \ + (a) += (b); \ + } +#define GG(a, b, c, d, x, s, ac) { \ + (a) += G ((b), (c), (d)) + (x) + (uint32_t)(ac); \ + (a) = ROTATE_LEFT ((a), (s)); \ + (a) += (b); \ + } +#define HH(a, b, c, d, x, s, ac) { \ + (a) += H ((b), (c), (d)) + (x) + (uint32_t)(ac); \ + (a) = ROTATE_LEFT ((a), (s)); \ + (a) += (b); \ + } +#define II(a, b, c, d, x, s, ac) { \ + (a) += I ((b), (c), (d)) + (x) + (uint32_t)(ac); \ + (a) = ROTATE_LEFT ((a), (s)); \ + (a) += (b); \ + } + +// Encodes input (UINT4) into output (BYTE). Assumes len is +// a multiple of 4. +static void Encode (uint8_t *output, const uint32_t *input, uint16_t len ) +{ + uint16_t i, j; + + for (i = 0, j = 0; j < len; i++, j += 4) + { + output[j] = (uint8_t)(input[i] & 0xff); + *(output+j+1) = (uint8_t)((input[i] >> 8) & 0xff); + *(output+j+2) = (uint8_t)((input[i] >> 16) & 0xff); + *(output+j+3) = (uint8_t)((input[i] >> 24) & 0xff); + } +} + +// Decodes input (BYTE) into output (UINT4). Assumes len is +// multiple of 4. +static void Decode (uint32_t *output, const uint8_t *input, uint16_t len ) +{ + uint16_t i, j; + + for (i = 0, j = 0; j < len; i++, j += 4) + output[i] = ((uint32_t)input[j]) | (((uint32_t)*(input + j + 1)) << 8) | (((uint32_t)*(input + j + 2)) << 16) | (((uint32_t)*(input + j + 3)) << 24); +} + +// Note: Replace "for loop" with standard memcpy if possible. + +//static void MD5_memcpy (char * output, char * input, WORD len ) +//{ +//// WORD i; +// +// memcpy(output,input,len); +//// for (i = 0; i < len; i++) +//// output[i] = input[i]; +//} +// +//// Note: Replace "for loop" with standard memset if possible. +//static void MD5_memset (char * output, char value, WORD len ) +//{ +//// WORD i; +// +// memset( output, value, len ); +//// for (i = 0; i < len; i++) +//// ((char *)output)[i] = (char)value; +//} + +// MD5 basic transformation. Transforms state based on block. + +static void MD5Transform (uint32_t state[4], const uint8_t block[64]) +{ + uint32_t a = state[0], b = state[1], c = state[2], d = state[3], x[16]; + + Decode (x, block, 64); + + // Round 1 + FF (a, b, c, d, x[ 0], S11, 0xd76aa478); // 1 + FF (d, a, b, c, x[ 1], S12, 0xe8c7b756); // 2 + FF (c, d, a, b, x[ 2], S13, 0x242070db); // 3 + FF (b, c, d, a, x[ 3], S14, 0xc1bdceee); // 4 + FF (a, b, c, d, x[ 4], S11, 0xf57c0faf); // 5 + FF (d, a, b, c, x[ 5], S12, 0x4787c62a); // 6 + FF (c, d, a, b, x[ 6], S13, 0xa8304613); // 7 + FF (b, c, d, a, x[ 7], S14, 0xfd469501); // 8 + FF (a, b, c, d, x[ 8], S11, 0x698098d8); // 9 + FF (d, a, b, c, x[ 9], S12, 0x8b44f7af); // 10 + FF (c, d, a, b, x[10], S13, 0xffff5bb1); // 11 + FF (b, c, d, a, x[11], S14, 0x895cd7be); // 12 + FF (a, b, c, d, x[12], S11, 0x6b901122); // 13 + FF (d, a, b, c, x[13], S12, 0xfd987193); // 14 + FF (c, d, a, b, x[14], S13, 0xa679438e); // 15 + FF (b, c, d, a, x[15], S14, 0x49b40821); // 16 + + // Round 2 + GG (a, b, c, d, x[ 1], S21, 0xf61e2562); // 17 + GG (d, a, b, c, x[ 6], S22, 0xc040b340); // 18 + GG (c, d, a, b, x[11], S23, 0x265e5a51); // 19 + GG (b, c, d, a, x[ 0], S24, 0xe9b6c7aa); // 20 + GG (a, b, c, d, x[ 5], S21, 0xd62f105d); // 21 + GG (d, a, b, c, x[10], S22, 0x2441453); // 22 + GG (c, d, a, b, x[15], S23, 0xd8a1e681); // 23 + GG (b, c, d, a, x[ 4], S24, 0xe7d3fbc8); // 24 + GG (a, b, c, d, x[ 9], S21, 0x21e1cde6); // 25 + GG (d, a, b, c, x[14], S22, 0xc33707d6); // 26 + GG (c, d, a, b, x[ 3], S23, 0xf4d50d87); // 27 + GG (b, c, d, a, x[ 8], S24, 0x455a14ed); // 28 + GG (a, b, c, d, x[13], S21, 0xa9e3e905); // 29 + GG (d, a, b, c, x[ 2], S22, 0xfcefa3f8); // 30 + GG (c, d, a, b, x[ 7], S23, 0x676f02d9); // 31 + GG (b, c, d, a, x[12], S24, 0x8d2a4c8a); // 32 + + // Round 3 + HH (a, b, c, d, x[ 5], S31, 0xfffa3942); // 33 + HH (d, a, b, c, x[ 8], S32, 0x8771f681); // 34 + HH (c, d, a, b, x[11], S33, 0x6d9d6122); // 35 + HH (b, c, d, a, x[14], S34, 0xfde5380c); // 36 + HH (a, b, c, d, x[ 1], S31, 0xa4beea44); // 37 + HH (d, a, b, c, x[ 4], S32, 0x4bdecfa9); // 38 + HH (c, d, a, b, x[ 7], S33, 0xf6bb4b60); // 39 + HH (b, c, d, a, x[10], S34, 0xbebfbc70); // 40 + HH (a, b, c, d, x[13], S31, 0x289b7ec6); // 41 + HH (d, a, b, c, x[ 0], S32, 0xeaa127fa); // 42 + HH (c, d, a, b, x[ 3], S33, 0xd4ef3085); // 43 + HH (b, c, d, a, x[ 6], S34, 0x4881d05); // 44 + HH (a, b, c, d, x[ 9], S31, 0xd9d4d039); // 45 + HH (d, a, b, c, x[12], S32, 0xe6db99e5); // 46 + HH (c, d, a, b, x[15], S33, 0x1fa27cf8); // 47 + HH (b, c, d, a, x[ 2], S34, 0xc4ac5665); // 48 + + // Round 4 + II (a, b, c, d, x[ 0], S41, 0xf4292244); // 49 + II (d, a, b, c, x[ 7], S42, 0x432aff97); // 50 + II (c, d, a, b, x[14], S43, 0xab9423a7); // 51 + II (b, c, d, a, x[ 5], S44, 0xfc93a039); // 52 + II (a, b, c, d, x[12], S41, 0x655b59c3); // 53 + II (d, a, b, c, x[ 3], S42, 0x8f0ccc92); // 54 + II (c, d, a, b, x[10], S43, 0xffeff47d); // 55 + II (b, c, d, a, x[ 1], S44, 0x85845dd1); // 56 + II (a, b, c, d, x[ 8], S41, 0x6fa87e4f); // 57 + II (d, a, b, c, x[15], S42, 0xfe2ce6e0); // 58 + II (c, d, a, b, x[ 6], S43, 0xa3014314); // 59 + II (b, c, d, a, x[13], S44, 0x4e0811a1); // 60 + II (a, b, c, d, x[ 4], S41, 0xf7537e82); // 61 + II (d, a, b, c, x[11], S42, 0xbd3af235); // 62 + II (c, d, a, b, x[ 2], S43, 0x2ad7d2bb); // 63 + II (b, c, d, a, x[ 9], S44, 0xeb86d391); // 64 + + state[0] += a; + state[1] += b; + state[2] += c; + state[3] += d; + + // Zeroize sensitive information. + MD5_memset ((char *)x, 0, sizeof (x)); +} + +void rtps::MD5Init (MD5_CTX *context) +{ + context->count[0] = context->count[1] = 0; + // Load magic initialization constants. + + context->state[0] = 0x67452301; // A + context->state[1] = 0xefcdab89; // B + context->state[2] = 0x98badcfe; // C + context->state[3] = 0x10325476; // D +} + +// MD5 block update operation. Continues an MD5 message-digest +// operation, processing another message block, and updating the +// context. + +void rtps::MD5Update ( MD5_CTX *context, const uint8_t *input, uint32_t inputLen ) +{ + uint16_t i, index, partLen; + + // Compute number of bytes mod 64 + index = (uint16_t)((context->count[0] >> 3) & 0x3F); + + // Update number of bits + if ((context->count[0] += (inputLen << 3)) < (inputLen << 3)) + context->count[1]++; + + context->count[1] += (inputLen >> 29); + + partLen = 64 - index; + + // Transform as many times as possible. + + if (inputLen >= partLen) + { + MD5_memcpy(&context->buffer[index], (char *)input, (int)partLen); + MD5Transform (context->state, context->buffer); + + for (i = partLen; i + 63 < inputLen; i += 64) + MD5Transform (context->state, &input[i]); + + index = 0; + } + else + i = 0; + + // Buffer remaining input + MD5_memcpy(&context->buffer[index], (char *)&input[i],(int)(inputLen-i)); +} + +// MD5 finalization. Ends an MD5 message-digest operation, writing the +// the message digest and zeroizing the context. +void rtps::MD5Final ( uint8_t digest[16], MD5_CTX *context ) +{ + uint8_t bits[8]; + uint16_t index, padLen; + + // Save number of bits + Encode (bits, context->count, 8); + + // Pad out to 56 mod 64. + + index = (uint16_t)((context->count[0] >> 3) & 0x3f); + padLen = (index < 56) ? (56 - index) : (120 - index); + rtps::MD5Update (context, PADDING, (uint32_t)padLen); + + // Append length (before padding) + rtps::MD5Update (context, bits, 8L); + + // Store state in digest + Encode (digest, context->state, 16); + + // Zeroize sensitive information. + MD5_memset ((char *)context, 0, sizeof (*context)); +} + + diff --git a/src/discovery/SPDPAgent.cpp b/src/discovery/SPDPAgent.cpp index 441d3f8e..54b722c0 100644 --- a/src/discovery/SPDPAgent.cpp +++ b/src/discovery/SPDPAgent.cpp @@ -65,7 +65,7 @@ void SPDPAgent::init(Participant &participant, BuiltInEndpoints &endpoints) { ucdr_init_buffer(&m_microbuffer, m_outputBuffer.data(), m_outputBuffer.size()); - // addInlineQos(); +// addInlineQos(); addParticipantParameters(); initialized = true; } diff --git a/src/discovery/TopicData.cpp b/src/discovery/TopicData.cpp index ebb608a0..94658ffb 100644 --- a/src/discovery/TopicData.cpp +++ b/src/discovery/TopicData.cpp @@ -30,8 +30,10 @@ using rtps::TopicDataCompressed; using rtps::SMElement::ParameterId; bool TopicData::matchesTopicOf(const TopicData &other) { - return strcmp(this->topicName, other.topicName) == 0 && + bool sameTopicAndType = strcmp(this->topicName, other.topicName) == 0 && strcmp(this->typeName, other.typeName) == 0; + + return sameTopicAndType; } bool TopicData::readFromUcdrBuffer(ucdrBuffer &buffer) { @@ -63,8 +65,6 @@ bool TopicData::readFromUcdrBuffer(ucdrBuffer &buffer) { buffer.iterator += 8; // TODO Skip 8 bytes. don't know what they are yet break; - case ParameterId::PID_SENTINEL: - return true; case ParameterId::PID_TOPIC_NAME: uint32_t topicNameLength; ucdr_deserialize_uint32_t(&buffer, &topicNameLength); @@ -85,6 +85,15 @@ bool TopicData::readFromUcdrBuffer(ucdrBuffer &buffer) { case ParameterId::PID_MULTICAST_LOCATOR: multicastLocator.readFromUcdrBuffer(buffer); break; + case ParameterId::PID_OWNERSHIP: + ucdr_deserialize_uint32_t(&buffer,reinterpret_cast(&ownership_Kind)); + + break; + case ParameterId::PID_OWNERSHIP_STRENGTH: + ucdr_deserialize_uint32_t(&buffer,reinterpret_cast(&ownership_strenght)); + break; + case ParameterId::PID_SENTINEL: + return true;//End of information default: buffer.iterator += length; buffer.last_data_size = 1; @@ -153,6 +162,7 @@ bool TopicData::serializeIntoUcdrBuffer(ucdrBuffer &buffer) const { ucdr_serialize_uint16_t(&buffer, ParameterId::PID_KEY_HASH); ucdr_serialize_uint16_t(&buffer, guidSize); + ucdr_serialize_array_uint8_t(&buffer, endpointGuid.prefix.id.data(), endpointGuid.prefix.id.size()); ucdr_serialize_array_uint8_t(&buffer, endpointGuid.entityId.entityKey.data(), @@ -171,9 +181,9 @@ bool TopicData::serializeIntoUcdrBuffer(ucdrBuffer &buffer) const { const uint8_t unidentifiedOffset = 8; ucdr_serialize_uint16_t(&buffer, ParameterId::PID_RELIABILITY); - ucdr_serialize_uint16_t(&buffer, - sizeof(ReliabilityKind_t) + unidentifiedOffset); + ucdr_serialize_uint16_t(&buffer,sizeof(ReliabilityKind_t) + unidentifiedOffset); ucdr_serialize_uint32_t(&buffer, static_cast(reliabilityKind)); + ucdr_serialize_uint32_t(&buffer, 0); // unidentified additional value ucdr_serialize_uint32_t(&buffer, 0); // unidentified additional value @@ -181,6 +191,17 @@ bool TopicData::serializeIntoUcdrBuffer(ucdrBuffer &buffer) const { ucdr_serialize_uint16_t(&buffer, sizeof(DurabilityKind_t)); ucdr_serialize_uint32_t(&buffer, static_cast(durabilityKind)); + ucdr_serialize_uint16_t(&buffer, ParameterId::PID_OWNERSHIP); + ucdr_serialize_uint16_t(&buffer, sizeof(OwnershipKind_t)); + ucdr_serialize_uint32_t(&buffer, static_cast(ownership_Kind)); + + if(ownership_Kind == OwnershipKind_t::EXCLUSIVE && endpointGuid.entityId.entityKind == EntityKind_t::USER_DEFINED_WRITER_WITH_KEY) {//Only writers have strength + ucdr_serialize_uint16_t(&buffer, ParameterId::PID_OWNERSHIP_STRENGTH); + ucdr_serialize_uint16_t(&buffer, sizeof(ownership_strenght)); + ucdr_serialize_uint32_t(&buffer, ownership_strenght); + } + + //End of QoS ucdr_serialize_uint16_t(&buffer, ParameterId::PID_SENTINEL); ucdr_serialize_uint16_t(&buffer, 0); diff --git a/src/entities/Domain.cpp b/src/entities/Domain.cpp index 54967f25..0a0e3ee4 100644 --- a/src/entities/Domain.cpp +++ b/src/entities/Domain.cpp @@ -309,21 +309,20 @@ rtps::Writer *Domain::writerExists(Participant &part, const char *topicName, return nullptr; } -rtps::Writer *Domain::createWriter(Participant &part, const char *topicName, - const char *typeName, bool reliable, - bool enforceUnicast) { - - // Check if there is enough capacity for more writers - if ((reliable && m_statefulWriters.size() <= m_numStatefulWriters) || - (!reliable && m_statelessWriters.size() <= m_numStatelessWriters) || - part.isWritersFull()) { - - DOMAIN_LOG("No Writer created. Max Number of Writers reached.\n"); - return nullptr; - } +rtps::Writer *Domain::createWriter(Participant &part, const char *topicName, + const char *typeName,bool topichasKey, + OwnershipKind_t ownership_kind, OwnershipStrength_t ownershipStrength, + bool reliable , + bool enforceUnicast ){ + // Check if there is enough capacity for more writers + if ((reliable && m_statefulWriters.size() <= m_numStatefulWriters) || + ((!reliable) && m_statelessWriters.size() <= m_numStatelessWriters) || + part.isWritersFull()) { + DOMAIN_LOG("No Writer created. Max Number of Writers reached.\n"); + return nullptr; + } - // TODO Distinguish WithKey and NoKey (Also changes EntityKind) TopicData attributes; if (strlen(topicName) > Config::MAX_TOPICNAME_LENGTH || @@ -334,18 +333,25 @@ rtps::Writer *Domain::createWriter(Participant &part, const char *topicName, strcpy(attributes.typeName, typeName); attributes.endpointGuid.prefix = part.m_guidPrefix; attributes.endpointGuid.entityId = { - part.getNextUserEntityKey(), - EntityKind_t::USER_DEFINED_WRITER_WITHOUT_KEY}; + part.getNextUserEntityKey(), + EntityKind_t::USER_DEFINED_WRITER_WITHOUT_KEY}; + + TopicKind_t kind = TopicKind_t::NO_KEY; + if(topichasKey){ + kind = TopicKind_t::WITH_KEY; + attributes.endpointGuid.entityId.entityKind = EntityKind_t::USER_DEFINED_WRITER_WITH_KEY; + } attributes.unicastLocator = getUserUnicastLocator(part.m_participantId); attributes.durabilityKind = DurabilityKind_t::TRANSIENT_LOCAL; - DOMAIN_LOG("Creating writer[%s, %s]\n", topicName, typeName); - - if (reliable) { + attributes.ownership_Kind = ownership_kind; + if(ownership_kind == OwnershipKind_t::EXCLUSIVE) { + attributes.ownership_strenght = ownershipStrength; + } + if (reliable || ownership_kind == OwnershipKind_t::EXCLUSIVE) { attributes.reliabilityKind = ReliabilityKind_t::RELIABLE; - StatefulWriter &writer = m_statefulWriters[m_numStatefulWriters++]; - writer.init(attributes, TopicKind_t::NO_KEY, &m_threadPool, m_transport, + writer.init(attributes, kind, &m_threadPool, m_transport, enforceUnicast); part.addWriter(&writer); @@ -354,7 +360,7 @@ rtps::Writer *Domain::createWriter(Participant &part, const char *topicName, attributes.reliabilityKind = ReliabilityKind_t::BEST_EFFORT; StatelessWriter &writer = m_statelessWriters[m_numStatelessWriters++]; - writer.init(attributes, TopicKind_t::NO_KEY, &m_threadPool, m_transport, + writer.init(attributes, kind, &m_threadPool, m_transport, enforceUnicast); part.addWriter(&writer); @@ -362,10 +368,16 @@ rtps::Writer *Domain::createWriter(Participant &part, const char *topicName, } } -rtps::Reader *Domain::createReader(Participant &part, const char *topicName, +rtps::Writer *Domain::createWriter(Participant &part, const char *topicName, const char *typeName, bool reliable, - ip4_addr_t mcastaddress) { - if ((reliable && m_statefulReaders.size() <= m_numStatefulReaders) || + bool enforceUnicast) { + return createWriter(part,topicName,typeName,false,OwnershipKind_t::SHARED, 0, reliable, enforceUnicast); +} + +rtps::Reader *Domain::createReader(Participant &part, const char *topicName, bool topichasKey, + const char *typeName, bool reliable, OwnershipKind_t ownershipKind, + ip4_addr_t mcastaddress){ + if ((reliable&& m_statefulReaders.size() <= m_numStatefulReaders) || (!reliable && m_statelessReaders.size() <= m_numStatelessReaders) || part.isReadersFull()) { @@ -373,8 +385,6 @@ rtps::Reader *Domain::createReader(Participant &part, const char *topicName, return nullptr; } - - // TODO Distinguish WithKey and NoKey (Also changes EntityKind) TopicData attributes; if (strlen(topicName) > Config::MAX_TOPICNAME_LENGTH || @@ -385,17 +395,17 @@ rtps::Reader *Domain::createReader(Participant &part, const char *topicName, strcpy(attributes.typeName, typeName); attributes.endpointGuid.prefix = part.m_guidPrefix; attributes.endpointGuid.entityId = { - part.getNextUserEntityKey(), - EntityKind_t::USER_DEFINED_READER_WITHOUT_KEY}; + part.getNextUserEntityKey(), + EntityKind_t::USER_DEFINED_READER_WITHOUT_KEY}; attributes.unicastLocator = getUserUnicastLocator(part.m_participantId); if (!isZeroAddress(mcastaddress)) { if (ip4_addr_ismulticast(&mcastaddress)) { attributes.multicastLocator = rtps::Locator::createUDPv4Locator( - ip4_addr1(&mcastaddress), ip4_addr2(&mcastaddress), - ip4_addr3(&mcastaddress), ip4_addr4(&mcastaddress), - getUserMulticastPort()); + ip4_addr1(&mcastaddress), ip4_addr2(&mcastaddress), + ip4_addr3(&mcastaddress), ip4_addr4(&mcastaddress), + getUserMulticastPort()); m_transport.joinMultiCastGroup( - attributes.multicastLocator.getIp4Address()); + attributes.multicastLocator.getIp4Address()); registerMulticastPort(attributes.multicastLocator); DOMAIN_LOG("Multicast enabled!\n"); @@ -406,16 +416,21 @@ rtps::Reader *Domain::createReader(Participant &part, const char *topicName, } } attributes.durabilityKind = DurabilityKind_t::VOLATILE; - + //TopicKind_t kind; + if(topichasKey){ + // kind = TopicKind_t::WITH_KEY; + attributes.endpointGuid.entityId.entityKind = EntityKind_t::USER_DEFINED_READER_WITH_KEY; + } DOMAIN_LOG("Creating reader[%s, %s]\n", topicName, typeName); + attributes.ownership_Kind = ownershipKind; + if (reliable) { if (m_numStatefulReaders == m_statefulReaders.size()) { return nullptr; } attributes.reliabilityKind = ReliabilityKind_t::RELIABLE; - StatefulReader &reader = m_statefulReaders[m_numStatefulReaders++]; reader.init(attributes, m_transport); @@ -438,15 +453,30 @@ rtps::Reader *Domain::createReader(Participant &part, const char *topicName, } return &reader; } + } +rtps::Reader *Domain::createReader(Participant &part, const char *topicName, + const char *typeName, bool reliable, + ip4_addr_t mcastaddress) { + return createReader(part,topicName, false,typeName, reliable, OwnershipKind_t::SHARED, mcastaddress); +} + + + rtps::GuidPrefix_t Domain::generateGuidPrefix(ParticipantId_t id) const { GuidPrefix_t prefix = Config::BASE_GUID_PREFIX; #if defined(unix) || defined(__unix__) srand(time(nullptr)); -#else +#elseif defined(CHIBIOS) + unsigned int seed = (int)chVTGetSystemTimeX(); + srand(seed); +#elseif defined(CHIBIOS) unsigned int seed = (int)xTaskGetTickCount(); srand(seed); +#else + unsigned int seed = (int)chVTGetSystemTimeX(); + srand(seed); #endif for (auto i = 0; i < rtps::Config::BASE_GUID_PREFIX.id.size(); i++) { prefix.id[i] = (rand() % 256); diff --git a/src/entities/Participant.cpp b/src/entities/Participant.cpp index f1e42339..07fc6c23 100644 --- a/src/entities/Participant.cpp +++ b/src/entities/Participant.cpp @@ -1,3 +1,4 @@ + /* The MIT License Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University @@ -163,10 +164,12 @@ rtps::Reader *Participant::getReaderByWriterId(const Guid_t &guid) const { rtps::Writer * Participant::getMatchingWriter(const TopicData &readerTopicData) const { for (uint8_t i = 0; i < m_numWriters; ++i) { - if (m_writers[i]->m_attributes.matchesTopicOf(readerTopicData) && - (readerTopicData.reliabilityKind == ReliabilityKind_t::BEST_EFFORT || - m_writers[i]->m_attributes.reliabilityKind == - ReliabilityKind_t::RELIABLE)) { + bool topic = m_writers[i]->m_attributes.matchesTopicOf(readerTopicData); + bool reliable = (readerTopicData.reliabilityKind == ReliabilityKind_t::BEST_EFFORT || + m_writers[i]->m_attributes.reliabilityKind == ReliabilityKind_t::RELIABLE ); + + bool ownership = m_writers[i]->m_attributes.ownership_Kind == readerTopicData.ownership_Kind; + if(reliable && ownership && topic){ return m_writers[i]; } } @@ -176,10 +179,13 @@ Participant::getMatchingWriter(const TopicData &readerTopicData) const { rtps::Reader * Participant::getMatchingReader(const TopicData &writerTopicData) const { for (uint8_t i = 0; i < m_numReaders; ++i) { - if (m_readers[i]->m_attributes.matchesTopicOf(writerTopicData) && - (writerTopicData.reliabilityKind == ReliabilityKind_t::RELIABLE || + bool topic = m_readers[i]->m_attributes.matchesTopicOf(writerTopicData); + bool reliabale = (writerTopicData.reliabilityKind == ReliabilityKind_t::RELIABLE || m_readers[i]->m_attributes.reliabilityKind == - ReliabilityKind_t::BEST_EFFORT)) { + ReliabilityKind_t::BEST_EFFORT); + + bool ownership = m_readers[i]->m_attributes.ownership_Kind == writerTopicData.ownership_Kind; + if(ownership && reliabale && topic){ return m_readers[i]; } } diff --git a/src/entities/StatelessReader.cpp b/src/entities/StatelessReader.cpp index be3a6a99..e8ba372c 100644 --- a/src/entities/StatelessReader.cpp +++ b/src/entities/StatelessReader.cpp @@ -48,9 +48,52 @@ void StatelessReader::init(const TopicData &attributes) { } } +bool StatelessReader::isOwner(InstanceHandle_t &handle, WriterProxy *proxy){ + for(auto &instance : m_instances){ + bool keyEqual = instance.handle == handle; + if(instance.handle == handle){ + if(instance.owner == nullptr){ + instance.owner = proxy; + return true; + } + if(instance.owner == proxy){ + return true; + } + else{ + if(proxy->ownershipStrength < instance.owner->ownershipStrength){ + return false; + } + else if(proxy->ownershipStrength > instance.owner->ownershipStrength){ + instance.owner = proxy; + return true; + } + else{//equal strength , just pick the first one + return false; + } + } + } + }//Instance not knwon + Instance_t instance; + instance.owner = proxy; + instance.handle = handle; + m_instances.add(instance); + return true; +} + void StatelessReader::newChange(const ReaderCacheChange &cacheChange) { if (m_callback != nullptr) { - m_callback(m_callee, cacheChange); + if(m_attributes.ownership_Kind == OwnershipKind_t::EXCLUSIVE) { + InstanceHandle_t handle; + m_keyCallback(cacheChange.getData(), cacheChange.getDataSize(), handle); + for (auto &proxy:m_proxies) { + if (cacheChange.writerGuid == proxy.remoteWriterGuid && isOwner(handle, &proxy)) { // + m_callback(m_callee, cacheChange); + } + } + } + else{ + m_callback(m_callee, cacheChange); + } } } @@ -59,7 +102,7 @@ void StatelessReader::registerCallback(ddsReaderCallback_fp cb, void *callee) { m_callback = cb; m_callee = callee; // It's okay if this is null } else { -#if (SLR_VERBOSE && RTPS_GLOBAL_VERBOSE) +#if SLR_VERBOSE SLR_LOG("Passed callback is nullptr\n"); #endif } @@ -76,6 +119,14 @@ bool StatelessReader::addNewMatchedWriter(const WriterProxy &newProxy) { void StatelessReader::removeWriter(const Guid_t &guid) { Lock lock(m_mutex); + auto isElementToRemove_Instance = [&](const Instance_t &instance){ + return instance.owner->remoteWriterGuid == guid; + }; + auto thunk_instance = [](void *arg, const Instance_t &value) { + return (*static_cast(arg))(value); + }; + m_instances.remove(thunk_instance, &isElementToRemove_Instance); + auto isElementToRemove = [&](const WriterProxy &proxy) { return proxy.remoteWriterGuid == guid; }; @@ -89,6 +140,13 @@ void StatelessReader::removeWriter(const Guid_t &guid) { void StatelessReader::removeWriterOfParticipant( const GuidPrefix_t &guidPrefix) { Lock lock(m_mutex); + auto isElementToRemove_Instance = [&](const Instance_t &instance){ + return instance.owner->remoteWriterGuid.prefix == guidPrefix; + }; + auto thunk_instance = [](void *arg, const Instance_t &value) { + return (*static_cast(arg))(value); + }; + m_instances.remove(thunk_instance, &isElementToRemove_Instance); auto isElementToRemove = [&](const WriterProxy &proxy) { return proxy.remoteWriterGuid.prefix == guidPrefix; }; @@ -105,4 +163,10 @@ bool StatelessReader::onNewHeartbeat(const SubmessageHeartbeat &, return true; } +void StatelessReader::registerKeyCallback(ddsGetKey_Callback_fp cb){ + if(cb != nullptr){ + m_keyCallback = cb; + } +} + #undef SLR_VERBOSE diff --git a/src/messages/MessageReceiver.cpp b/src/messages/MessageReceiver.cpp index fc6e9fe2..8bb6971e 100644 --- a/src/messages/MessageReceiver.cpp +++ b/src/messages/MessageReceiver.cpp @@ -120,6 +120,9 @@ bool MessageReceiver::processSubmessage(MessageProcessingInfo &msgInfo, RECV_LOG("Info_TS submessage not relevant.\n"); success = true; // Not relevant now break; + case SubmessageKind::GAP: + //success = processGapSubmessage(&msgInfo, ) + break; default: RECV_LOG("Submessage of type %u currently not supported. Skipping..\n", static_cast(submsgHeader.submessageId));