Skip to content

Commit aa9d785

Browse files
committed
improvement of udp punching hole
- RTMFP : added function parameter to be called when a new address is discovered - RTMFPHandshaker : now we send the handshake 30 to peers (if addresses are known) before receiving the list of addresses to grow udp hole punching result - NetGroup : update of manageBestConnections to not have more peers than wanted
1 parent 3bf9767 commit aa9d785

File tree

8 files changed

+65
-55
lines changed

8 files changed

+65
-55
lines changed

include/FlowManager.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ class FlowManager : public BandWriter {
125125
// Close the session properly or abruptly if parameter is true
126126
virtual void close(bool abrupt);
127127

128-
// Set the host and peer addresses when receiving redirection address (only for P2P)
129-
virtual void setAddresses(const Mona::SocketAddress& address, const PEER_LIST_ADDRESS_TYPE& addresses) {}
128+
// Set the host and peer addresses when receiving redirection request (only for P2P)
129+
virtual void addAddress(const Mona::SocketAddress& address, RTMFP::AddressType type) {}
130130

131131
protected:
132132

include/P2PSession.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ class P2PSession : public FlowManager,
126126
// Return the diffie hellman object (related to main session)
127127
virtual bool diffieHellman(Mona::DiffieHellman* &pDh);
128128

129-
// Set the host and peer addresses when receiving redirection address (only for P2P)
130-
virtual void setAddresses(const Mona::SocketAddress& address, const PEER_LIST_ADDRESS_TYPE& addresses) { hostAddress = address; _knownAddresses.insert(addresses.begin(), addresses.end()); }
129+
// Set the host and peer addresses when receiving redirection request (only for P2P)
130+
virtual void addAddress(const Mona::SocketAddress& address, RTMFP::AddressType type);
131131

132132
/*** Public members ***/
133133

include/RTMFP.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ along with Librtmfp. If not, see <http://www.gnu.org/licenses/>.
3232

3333
#include "Mona/Logs.h"
3434

35-
#define RTMFP_LIB_VERSION 0x0102000C // (1.2.12)
35+
#define RTMFP_LIB_VERSION 0x0102000D // (1.2.13)
3636

3737
#define RTMFP_DEFAULT_KEY (UInt8*)"Adobe Systems 02"
3838
#define RTMFP_KEY_SIZE 0x10
@@ -101,7 +101,7 @@ class RTMFP : virtual Mona::Static {
101101
FAILED
102102
};
103103

104-
static bool ReadAddress(Mona::BinaryReader& reader, Mona::SocketAddress& address, Mona::UInt8& addressType);
104+
static bool ReadAddress(Mona::BinaryReader& reader, Mona::SocketAddress& address, AddressType& addressType);
105105
static Mona::BinaryWriter& WriteAddress(Mona::BinaryWriter& writer, const Mona::SocketAddress& address, AddressType type=ADDRESS_UNSPECIFIED);
106106

107107
static Mona::UInt32 Unpack(Mona::BinaryReader& reader);
@@ -126,7 +126,7 @@ class RTMFP : virtual Mona::Static {
126126

127127
// Read addresses from the buffer reader
128128
// return : True if at least an address has been read
129-
static bool ReadAddresses(Mona::BinaryReader& reader, PEER_LIST_ADDRESS_TYPE& addresses, Mona::SocketAddress& hostAddress);
129+
static bool ReadAddresses(Mona::BinaryReader& reader, PEER_LIST_ADDRESS_TYPE& addresses, Mona::SocketAddress& hostAddress, std::function<void(const Mona::SocketAddress&, AddressType)> onNewAddress);
130130

131131
// Return a random iterator which respect the isAllowed condition
132132
template<class ContainerType, typename Iterator>

sources/NetGroup.cpp

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -517,16 +517,20 @@ void NetGroup::manageBestConnections() {
517517
}
518518

519519
// Connect to new peers
520-
for (auto it : _bestList) {
521-
if (_mapPeers.find(it) == _mapPeers.end()) {
522-
auto itNode = _mapHeardList.find(it);
520+
int nbConnect = _bestList.size() - _mapPeers.size(); // trick to keep the target count of peers
521+
auto it2Connect = _bestList.begin();
522+
while (nbConnect > 0 && it2Connect != _bestList.end()) {
523+
if (_mapPeers.find(*it2Connect) == _mapPeers.end()) {
524+
auto itNode = _mapHeardList.find(*it2Connect);
523525
if (itNode == _mapHeardList.end())
524-
WARN("Unable to find the peer ", it) // implementation error, should not happen
526+
WARN("Unable to find the peer ", *it2Connect) // implementation error, should not happen
525527
else {
526-
DEBUG("Best Peer - Connecting to peer ", it, "...")
527-
_conn.connect2Peer(it.c_str(), stream.c_str(), itNode->second.addresses, itNode->second.hostAddress);
528+
DEBUG("Best Peer - Connecting to peer ", *it2Connect, "...")
529+
_conn.connect2Peer(it2Connect->c_str(), stream.c_str(), itNode->second.addresses, itNode->second.hostAddress);
530+
--nbConnect;
528531
}
529532
}
533+
++it2Connect;
530534
}
531535
}
532536

@@ -581,7 +585,7 @@ void NetGroup::ReadGroupConfig(shared_ptr<RTMFPGroupConfig>& parameters, PacketR
581585
bool NetGroup::readGroupReport(PacketReader& packet) {
582586
string tmp, newPeerId, rawId;
583587
SocketAddress myAddress, serverAddress;
584-
UInt8 addressType;
588+
RTMFP::AddressType addressType;
585589
PEER_LIST_ADDRESS_TYPE listAddresses;
586590
SocketAddress hostAddress(_conn.address());
587591

@@ -607,7 +611,7 @@ bool NetGroup::readGroupReport(PacketReader& packet) {
607611
return false;
608612
}
609613
BinaryReader peerAddressReader(packet.current(), size - 1);
610-
RTMFP::ReadAddresses(peerAddressReader, listAddresses, hostAddress);
614+
RTMFP::ReadAddresses(peerAddressReader, listAddresses, hostAddress, [](const SocketAddress&, RTMFP::AddressType) {});
611615
packet.next(size - 1);
612616

613617
// Loop on each peer of the NetGroup
@@ -645,13 +649,13 @@ bool NetGroup::readGroupReport(PacketReader& packet) {
645649
if (itHeardList == _mapHeardList.end()) {
646650
hostAddress.clear();
647651
listAddresses.clear();
648-
RTMFP::ReadAddresses(addressReader, listAddresses, hostAddress);
652+
RTMFP::ReadAddresses(addressReader, listAddresses, hostAddress, [](const SocketAddress&, RTMFP::AddressType) {});
649653
newPeers = true;
650654
addPeer2HeardList(newPeerId.c_str(), rawId.data(), listAddresses, hostAddress, time); // To avoid memory sharing we use c_str() (copy-on-write implementation on linux)
651655
}
652-
// Else if no host is set update the addresses
653-
else if (!itHeardList->second.hostAddress)
654-
RTMFP::ReadAddresses(addressReader, itHeardList->second.addresses, itHeardList->second.hostAddress);
656+
// Else update the addresses
657+
else
658+
RTMFP::ReadAddresses(addressReader, itHeardList->second.addresses, itHeardList->second.hostAddress, [](const SocketAddress&, RTMFP::AddressType) {});
655659

656660
}
657661
packet.next(size);

sources/P2PSession.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,18 +504,25 @@ bool P2PSession::onHandshake38(const SocketAddress& address, shared_ptr<Handshak
504504
return true;
505505
}
506506

507-
UDPSocket& P2PSession::socket(Mona::IPAddress::Family family) {
507+
UDPSocket& P2PSession::socket(IPAddress::Family family) {
508508
return _parent->socket(family);
509509
}
510510

511-
void P2PSession::removeHandshake(std::shared_ptr<Handshake>& pHandshake) {
511+
void P2PSession::removeHandshake(shared_ptr<Handshake>& pHandshake) {
512512
_parent->removeHandshake(pHandshake);
513513
}
514514

515515
bool P2PSession::diffieHellman(DiffieHellman* &pDh) {
516516
return _parent->diffieHellman(pDh);
517517
}
518518

519+
void P2PSession::addAddress(const SocketAddress& address, RTMFP::AddressType type) {
520+
if (type == RTMFP::ADDRESS_REDIRECTION)
521+
hostAddress == address;
522+
else
523+
_knownAddresses.emplace(address, type);
524+
}
525+
519526
void P2PSession::buildGroupKey() {
520527

521528
// Compile encrypted keys

sources/RTMFP.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ along with Librtmfp. If not, see <http://www.gnu.org/licenses/>.
2525
using namespace std;
2626
using namespace Mona;
2727

28-
bool RTMFP::ReadAddress(BinaryReader& reader, SocketAddress& address, UInt8& addressType) {
28+
bool RTMFP::ReadAddress(BinaryReader& reader, SocketAddress& address, AddressType& addressType) {
2929
string data;
30-
addressType = reader.read8();
30+
addressType = (AddressType)reader.read8();
3131
reader.read<string>((addressType & 0x80) ? sizeof(in6_addr) : sizeof(in_addr), data);
3232
in_addr addrV4;
3333
in6_addr addrV6;
@@ -104,21 +104,30 @@ void RTMFP::Write7BitValue(string& buff,UInt64 value) {
104104
String::Append(buff, (char)(max ? value&0xFF : value&0x7F));
105105
}
106106

107-
bool RTMFP::ReadAddresses(BinaryReader& reader, PEER_LIST_ADDRESS_TYPE& addresses, SocketAddress& hostAddress) {
107+
bool RTMFP::ReadAddresses(BinaryReader& reader, PEER_LIST_ADDRESS_TYPE& addresses, SocketAddress& hostAddress, function<void(const SocketAddress&, AddressType)> onNewAddress) {
108108

109109
// Read all addresses
110110
SocketAddress address;
111-
UInt8 addressType;
111+
AddressType addressType;
112112
while (reader.available()) {
113113

114114
RTMFP::ReadAddress(reader, address, addressType);
115115
switch (addressType & 0x0F) {
116116
case RTMFP::ADDRESS_LOCAL:
117-
case RTMFP::ADDRESS_PUBLIC:
118-
addresses.emplace(address, (RTMFP::AddressType)addressType);
117+
case RTMFP::ADDRESS_PUBLIC: {
118+
auto itAddress = addresses.lower_bound(address);
119+
if (itAddress == addresses.end() || itAddress->first != address) { // new address?
120+
addresses.emplace_hint(itAddress, address, addressType);
121+
onNewAddress(address, addressType);
122+
}
119123
break;
124+
}
120125
case RTMFP::ADDRESS_REDIRECTION:
121-
hostAddress = address; break;
126+
if (hostAddress != address) { // new address?
127+
hostAddress = address;
128+
onNewAddress(address, addressType);
129+
}
130+
break;
122131
}
123132
TRACE("IP Address : ", address.toString(), " - type : ", addressType)
124133
}

sources/RTMFPHandshaker.cpp

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,9 @@ void RTMFPHandshaker::manage() {
142142
_address.set(pHandshake->hostAddress);
143143
sendHandshake30(pHandshake->pSession->epd(), itHandshake->first);
144144
}
145-
// If we are not in p2p mode we must send to all known addresses
146-
if (!pHandshake->isP2P) {
147-
for (auto itAddresses : pHandshake->listAddresses) {
148-
_address.set(itAddresses.first);
149-
sendHandshake30(pHandshake->pSession->epd(), itHandshake->first);
150-
}
145+
for (auto itAddresses : pHandshake->listAddresses) {
146+
_address.set(itAddresses.first);
147+
sendHandshake30(pHandshake->pSession->epd(), itHandshake->first);
151148
}
152149
if (pHandshake->status == RTMFP::STOPPED)
153150
pHandshake->status = RTMFP::HANDSHAKE30;
@@ -450,10 +447,10 @@ void RTMFPHandshaker::handleRedirection(BinaryReader& reader) {
450447
ERROR("Unexpected tag size : ", tagSize)
451448
return;
452449
}
453-
string tagReceived;
454-
reader.read(16, tagReceived);
450+
string tag;
451+
reader.read(16, tag);
455452

456-
auto itTag = _mapTags.find(tagReceived);
453+
auto itTag = _mapTags.find(tag);
457454
if (itTag == _mapTags.end()) {
458455
DEBUG("Unexpected tag received from ", _address.toString(), ", possible old request")
459456
return;
@@ -467,27 +464,20 @@ void RTMFPHandshaker::handleRedirection(BinaryReader& reader) {
467464
DEBUG("Redirection message ignored, we have already received handshake 70")
468465
return;
469466
}
467+
DEBUG(pHandshake->isP2P ? "Server has sent to us the peer addresses of responders" : "Server redirection messsage, sending back the handshake 30")
470468

471469
// Read addresses
472470
SocketAddress hostAddress;
473-
RTMFP::ReadAddresses(reader, pHandshake->listAddresses, hostAddress);
474-
475-
if (pHandshake->isP2P) {
476-
DEBUG("Server has sent to us the peer addresses of responders") // (we are the initiator)
477-
478-
// Reset the host address if it changes
479-
if (hostAddress && hostAddress != pHandshake->hostAddress) {
480-
pHandshake->pSession->setAddresses(hostAddress, pHandshake->listAddresses);
481-
pHandshake->hostAddress = hostAddress;
482-
}
483-
484-
// Send the handshake 30 to all addresses
485-
for (auto itAddresses : pHandshake->listAddresses) {
486-
_address.set(itAddresses.first);
487-
sendHandshake30(pHandshake->pSession->epd(), tagReceived);
471+
RTMFP::ReadAddresses(reader, pHandshake->listAddresses, pHandshake->hostAddress, [this, pHandshake, hostAddress, tag](const SocketAddress& address, RTMFP::AddressType type) {
472+
if (pHandshake->isP2P)
473+
pHandshake->pSession->addAddress(address, type);
474+
475+
// Send the handshake 30 to new address
476+
if (type != RTMFP::ADDRESS_REDIRECTION) {
477+
_address.set(address);
478+
sendHandshake30(pHandshake->pSession->epd(), tag);
488479
}
489-
} else
490-
DEBUG("Server redirection messsage, sending back the handshake 30")
480+
});
491481
}
492482

493483
void RTMFPHandshaker::flush(UInt8 marker, UInt32 size) {

sources/RTMFPSession.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ void RTMFPSession::handleP2PAddressExchange(PacketReader& reader) {
617617
string buff;
618618
reader.read(PEER_ID_SIZE, buff);
619619
SocketAddress address;
620-
UInt8 addressType;
620+
RTMFP::AddressType addressType;
621621
RTMFP::ReadAddress(reader, address, addressType);
622622

623623
string tag;

0 commit comments

Comments
 (0)