diff --git a/CMakeLists.txt b/CMakeLists.txt index 3829f491..587a50a6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,6 +60,8 @@ set(HEADER_FILES set( SOURCE_FILES src/Consumer.cpp + src/DataConsumer.cpp + src/DataProducer.cpp src/Device.cpp src/Handler.cpp src/Logger.cpp diff --git a/include/DataConsumer.hpp b/include/DataConsumer.hpp new file mode 100644 index 00000000..ef78065c --- /dev/null +++ b/include/DataConsumer.hpp @@ -0,0 +1,85 @@ +#ifndef MSC_DATACONSUMER_HPP +#define MSC_DATACONSUMER_HPP + +#include +#include +#include + +namespace mediasoupclient +{ + class RecvTransport; + + class DataConsumer : public webrtc::DataChannelObserver + { + public: + class PrivateListener + { + public: + virtual void OnClose(DataConsumer* dataConsumer) = 0; + }; + + class Listener + { + public: + // DataChannel state changes. + virtual void OnConnecting(DataConsumer* dataConsumer) = 0; + virtual void OnOpen(DataConsumer* dataConsumer) = 0; + virtual void OnClosing(DataConsumer* dataConsumer) = 0; + virtual void OnClose(DataConsumer* dataConsumer) = 0; + + // A data buffer was successfully received. + virtual void OnMessage(DataConsumer* dataConsumer, const webrtc::DataBuffer& buffer) = 0; + + virtual void OnTransportClose(DataConsumer* dataConsumer) = 0; + }; + + private: + DataConsumer( + Listener* listener, + PrivateListener* privateListener, + const std::string& id, + const std::string& dataProducerId, + rtc::scoped_refptr dataChannel, + const nlohmann::json& sctpStreamParameters, + const nlohmann::json& appData); + + public: + const std::string& GetId() const; + std::string GetLocalId() const; + const std::string& GetDataProducerId() const; + const nlohmann::json& GetSctpStreamParameters() const; + webrtc::DataChannelInterface::DataState GetReadyState() const; + std::string GetLabel() const; + std::string GetProtocol() const; + const nlohmann::json& GetAppData() const; + bool IsClosed() const; + void Close(); + + private: + void TransportClosed(); + + // RecvTransport will create instances and call private member TransporClosed. + friend RecvTransport; + + private: + Listener* listener; + PrivateListener* privateListener; + std::string id; + std::string dataProducerId; + rtc::scoped_refptr dataChannel; + bool closed{ false }; + nlohmann::json sctpParameters; + nlohmann::json appData; + + /* Virtual methods inherited from webrtc::DataChannelObserver. */ + public: + // The data channel state has changed. + void OnStateChange() override; + // A data buffer was successfully received. + void OnMessage(const webrtc::DataBuffer& buffer) override; + // The data channel's buffered_amount has changed. + void OnBufferedAmountChange(uint64_t sentDataSize) override; + }; +} // namespace mediasoupclient + +#endif diff --git a/include/DataProducer.hpp b/include/DataProducer.hpp new file mode 100644 index 00000000..77f920df --- /dev/null +++ b/include/DataProducer.hpp @@ -0,0 +1,78 @@ +#ifndef MSC_DATAPRODUCER_HPP +#define MSC_DATAPRODUCER_HPP + +#include "Handler.hpp" +#include +#include +#include + +namespace mediasoupclient +{ + class SendTransport; + + class DataProducer : public webrtc::DataChannelObserver + { + public: + class PrivateListener + { + public: + virtual void OnClose(DataProducer* dataProducer) = 0; + }; + + /* Public Listener API */ + class Listener + { + public: + // DataChannel state changes. + virtual void OnOpen(DataProducer* dataProducer) = 0; + virtual void OnClose(DataProducer* dataProducer) = 0; + virtual void OnBufferedAmountChange(DataProducer* dataProducer, uint64_t sentDataSize) = 0; + + virtual void OnTransportClose(DataProducer* dataProducer) = 0; + }; + + private: + PrivateListener* privateListener; + Listener* listener; + std::string id; + rtc::scoped_refptr dataChannel; + bool closed; + nlohmann::json sctpStreamParameters; + nlohmann::json appData; + void TransportClosed(); + + friend SendTransport; + + private: + DataProducer( + DataProducer::PrivateListener* privateListener, + DataProducer::Listener* listener, + const std::string& id, + rtc::scoped_refptr dataChannel, + const nlohmann::json& sctpStreamParameters, + const nlohmann::json& appData); + + public: + const std::string& GetId() const; + std::string GetLocalId() const; + const nlohmann::json& GetSctpStreamParameters() const; + webrtc::DataChannelInterface::DataState GetReadyState() const; + std::string GetLabel(); + std::string GetProtocol(); + uint64_t GetBufferedAmount() const; + const nlohmann::json& GetAppData() const; + bool IsClosed() const; + void Close(); + void Send(const webrtc::DataBuffer& buffer); + + /* Virtual methods inherited from webrtc::DataChannelObserver. */ + public: + void OnStateChange() override; + // A data buffer was successfully received. + void OnMessage(const webrtc::DataBuffer& buffer) override; + // The data channel's buffered_amount has changed. + void OnBufferedAmountChange(uint64_t sentDataSize) override; + }; +} // namespace mediasoupclient + +#endif diff --git a/include/Handler.hpp b/include/Handler.hpp index 47267039..11151323 100644 --- a/include/Handler.hpp +++ b/include/Handler.hpp @@ -26,6 +26,14 @@ namespace mediasoupclient webrtc::PeerConnectionInterface::IceConnectionState connectionState) = 0; }; + public: + struct DataChannel + { + std::string localId; + rtc::scoped_refptr dataChannel; + nlohmann::json sctpStreamParameters; + }; + public: static nlohmann::json GetNativeRtpCapabilities( const PeerConnection::Options* peerConnectionOptions = nullptr); @@ -47,8 +55,7 @@ namespace mediasoupclient virtual void RestartIce(const nlohmann::json& iceParameters) = 0; protected: - void SetupTransport( - const std::string& localDtlsRole, nlohmann::json localSdpObject = nlohmann::json::object()); + void SetupTransport(const std::string& localDtlsRole, nlohmann::json& localSdpObject); /* Methods inherited from PeerConnectionListener. */ public: @@ -58,19 +65,21 @@ namespace mediasoupclient // PrivateListener instance. PrivateListener* privateListener{ nullptr }; // Remote SDP instance. - std::unique_ptr remoteSdp; + std::unique_ptr remoteSdp{ nullptr }; // Got transport local and remote parameters. bool transportReady{ false }; // Map of RTCTransceivers indexed by MID. - std::unordered_map mapMidTransceiver; + std::unordered_map mapMidTransceiver{}; // PeerConnection instance. - std::unique_ptr pc; + std::unique_ptr pc{ nullptr }; + bool hasDataChannelMediaSection = false; + uint32_t nextSendSctpStreamId = 0; }; class SendHandler : public Handler { public: - struct SendData + struct SendResult { std::string localId; webrtc::RtpSenderInterface* rtpSender{ nullptr }; @@ -89,7 +98,7 @@ namespace mediasoupclient const nlohmann::json& sendingRemoteRtpParametersByKind = nlohmann::json()); public: - SendData Send( + SendResult Send( webrtc::MediaStreamTrackInterface* track, std::vector* encodings, const nlohmann::json* codecOptions); @@ -98,6 +107,7 @@ namespace mediasoupclient void SetMaxSpatialLayer(const std::string& localId, uint8_t spatialLayer); nlohmann::json GetSenderStats(const std::string& localId); void RestartIce(const nlohmann::json& iceParameters) override; + DataChannel SendDataChannel(const std::string& label, webrtc::DataChannelInit dataChannelInit); private: // Generic sending RTP parameters for audio and video. @@ -110,7 +120,7 @@ namespace mediasoupclient class RecvHandler : public Handler { public: - struct RecvData + struct RecvResult { std::string localId; webrtc::RtpReceiverInterface* rtpReceiver{ nullptr }; @@ -126,10 +136,12 @@ namespace mediasoupclient const nlohmann::json& sctpParameters, const PeerConnection::Options* peerConnectionOptions); - RecvData Receive(const std::string& id, const std::string& kind, const nlohmann::json* rtpParameters); + RecvResult Receive( + const std::string& id, const std::string& kind, const nlohmann::json* rtpParameters); void StopReceiving(const std::string& localId); nlohmann::json GetReceiverStats(const std::string& localId); void RestartIce(const nlohmann::json& iceParameters) override; + DataChannel ReceiveDataChannel(const std::string& label, webrtc::DataChannelInit dataChannelInit); }; } // namespace mediasoupclient diff --git a/include/PeerConnection.hpp b/include/PeerConnection.hpp index a704096b..93b81470 100644 --- a/include/PeerConnection.hpp +++ b/include/PeerConnection.hpp @@ -130,6 +130,8 @@ namespace mediasoupclient nlohmann::json GetStats(); nlohmann::json GetStats(rtc::scoped_refptr selector); nlohmann::json GetStats(rtc::scoped_refptr selector); + rtc::scoped_refptr CreateDataChannel( + const std::string& label, const webrtc::DataChannelInit* config); private: // Signaling and worker threads. diff --git a/include/Producer.hpp b/include/Producer.hpp index a37b87ae..d8be084a 100644 --- a/include/Producer.hpp +++ b/include/Producer.hpp @@ -64,7 +64,7 @@ namespace mediasoupclient private: void TransportClosed(); - /* SendTransport will create instances and call private member TransporClosed */ + /* SendTransport will create instances and call private member TransportClosed */ friend SendTransport; private: diff --git a/include/Transport.hpp b/include/Transport.hpp index ebf60651..6f0bb027 100644 --- a/include/Transport.hpp +++ b/include/Transport.hpp @@ -2,13 +2,16 @@ #define MSC_TRANSPORT_HPP #include "Consumer.hpp" +#include "DataConsumer.hpp" +#include "DataProducer.hpp" #include "Handler.hpp" #include "Producer.hpp" + #include #include // webrtc::MediaStreamTrackInterface #include // webrtc::PeerConnectionInterface #include // webrtc::RtpEncodingParameters -#include + #include #include #include // unique_ptr @@ -66,6 +69,8 @@ namespace mediasoupclient size_t maxSctpMessageSize{ 0u }; // Whether the Consumer for RTP probation has been created. bool probatorConsumerCreated{ false }; + // Whether this transport supports DataChannel. + bool hasSctpParameters{ false }; private: // Listener. @@ -82,7 +87,9 @@ namespace mediasoupclient nlohmann::json appData = nlohmann::json::object(); }; - class SendTransport : public Transport, public Producer::PrivateListener + class SendTransport : public Transport, + public Producer::PrivateListener, + public DataProducer::PrivateListener { public: /* Public Listener API */ @@ -94,6 +101,13 @@ namespace mediasoupclient const std::string& kind, nlohmann::json rtpParameters, const nlohmann::json& appData) = 0; + + virtual std::future OnProduceData( + SendTransport* transport, + const nlohmann::json& sctpStreamParameters, + const std::string& label, + const std::string& protocol, + const nlohmann::json& appData) = 0; }; private: @@ -120,6 +134,15 @@ namespace mediasoupclient const nlohmann::json* codecOptions, const nlohmann::json& appData = nlohmann::json::object()); + DataProducer* ProduceData( + DataProducer::Listener* listener, + const std::string& label = "", + const std::string& protocol = "", + bool ordered = true, + int maxRetransmits = 0, + int maxPacketLifeTime = 0, + const nlohmann::json& appData = nlohmann::json::object()); + /* Virtual methods inherited from Transport. */ public: void Close() override; @@ -127,6 +150,7 @@ namespace mediasoupclient /* Virtual methods inherited from Producer::PrivateListener. */ public: void OnClose(Producer* producer) override; + void OnClose(DataProducer* dataProducer) override; void OnReplaceTrack(const Producer* producer, webrtc::MediaStreamTrackInterface* track) override; void OnSetMaxSpatialLayer(const Producer* producer, uint8_t maxSpatialLayer) override; nlohmann::json OnGetStats(const Producer* producer) override; @@ -136,6 +160,7 @@ namespace mediasoupclient Listener* listener; // Map of Producers indexed by id. std::unordered_map producers; + std::unordered_map dataProducers; // Whether we can produce audio/video based on computed extended RTP // capabilities. const std::map* canProduceByKind{ nullptr }; @@ -143,7 +168,9 @@ namespace mediasoupclient std::unique_ptr sendHandler; }; - class RecvTransport : public Transport, public Consumer::PrivateListener + class RecvTransport : public Transport, + public Consumer::PrivateListener, + public DataConsumer::PrivateListener { private: RecvTransport( @@ -171,6 +198,14 @@ namespace mediasoupclient nlohmann::json* rtpParameters, const nlohmann::json& appData = nlohmann::json::object()); + DataConsumer* ConsumeData( + DataConsumer::Listener* listener, + const std::string& id, + const std::string& producerId, + const std::string& label, + const std::string& protocol = std::string(), + const nlohmann::json& appData = nlohmann::json::object()); + /* Virtual methods inherited from Transport. */ public: void Close() override; @@ -178,11 +213,13 @@ namespace mediasoupclient /* Virtual methods inherited from Consumer::PrivateListener. */ public: void OnClose(Consumer* consumer) override; + void OnClose(DataConsumer* consumer) override; nlohmann::json OnGetStats(const Consumer* consumer) override; private: // Map of Consumers indexed by id. std::unordered_map consumers; + std::unordered_map dataConsumers; // SendHandler instance. std::unique_ptr recvHandler; }; diff --git a/include/sdp/RemoteSdp.hpp b/include/sdp/RemoteSdp.hpp index 3b4e49b9..c0bec081 100644 --- a/include/sdp/RemoteSdp.hpp +++ b/include/sdp/RemoteSdp.hpp @@ -34,6 +34,10 @@ namespace mediasoupclient nlohmann::json& offerRtpParameters, nlohmann::json& answerRtpParameters, const nlohmann::json* codecOptions); + + void SendSctpAssociation(nlohmann::json& offerMediaObject); + void RecvSctpAssociation(); + void Receive( const std::string& mid, const std::string& kind, diff --git a/src/DataConsumer.cpp b/src/DataConsumer.cpp new file mode 100644 index 00000000..627ea12c --- /dev/null +++ b/src/DataConsumer.cpp @@ -0,0 +1,185 @@ +#define MSC_CLASS "DataConsumer" + +#include "DataConsumer.hpp" +#include "Logger.hpp" +#include "MediaSoupClientErrors.hpp" + +using json = nlohmann::json; + +namespace mediasoupclient +{ + DataConsumer::DataConsumer( + DataConsumer::Listener* listener, + DataConsumer::PrivateListener* privateListener, + const std::string& id, + const std::string& dataProducerId, + rtc::scoped_refptr dataChannel, + const json& sctpStreamParameters, + const json& appData) + : listener(listener), privateListener(privateListener), id(id), dataProducerId(dataProducerId), + dataChannel(dataChannel), sctpParameters(sctpStreamParameters), appData(appData) + { + MSC_TRACE(); + + this->dataChannel->RegisterObserver(this); + } + + // The data channel state has changed. + void DataConsumer::OnStateChange() + { + MSC_TRACE(); + + webrtc::DataChannelInterface::DataState state = this->dataChannel->state(); + + switch (state) + { + case webrtc::DataChannelInterface::DataState::kConnecting: + this->listener->OnConnecting(this); + break; + case webrtc::DataChannelInterface::DataState::kOpen: + this->listener->OnOpen(this); + break; + case webrtc::DataChannelInterface::DataState::kClosing: + this->listener->OnClosing(this); + break; + case webrtc::DataChannelInterface::DataState::kClosed: + this->listener->OnClose(this); + break; + default: + MSC_ERROR("unknown state %s", webrtc::DataChannelInterface::DataStateString(state)); + break; + } + } + + // A data buffer was successfully received. + void DataConsumer::OnMessage(const webrtc::DataBuffer& buffer) + { + MSC_TRACE(); + + this->listener->OnMessage(this, buffer); + } + + // The data channel's buffered_amount has changed. + void DataConsumer::OnBufferedAmountChange(uint64_t /*sentDataSize*/) + { + MSC_TRACE(); + // Should not happen on consumer. + } + + /** + * DataConsumer id. + */ + const std::string& DataConsumer::GetId() const + { + MSC_TRACE(); + + return this->id; + } + + std::string DataConsumer::GetLocalId() const + { + MSC_TRACE(); + + return std::to_string(this->dataChannel->id()); + } + + /** + * Associated DataProducer id. + */ + const std::string& DataConsumer::GetDataProducerId() const + { + MSC_TRACE(); + + return this->dataProducerId; + } + + /** + * SCTP stream parameters. + */ + const json& DataConsumer::GetSctpStreamParameters() const + { + MSC_TRACE(); + + return this->sctpParameters; + } + + /** + * DataChannel readyState. + */ + webrtc::DataChannelInterface::DataState DataConsumer::GetReadyState() const + { + MSC_TRACE(); + + return this->dataChannel->state(); + } + + /** + * DataChannel label. + */ + std::string DataConsumer::GetLabel() const + { + MSC_TRACE(); + + return this->dataChannel->label(); + } + + /** + * DataChannel protocol. + */ + std::string DataConsumer::GetProtocol() const + { + MSC_TRACE(); + + return this->dataChannel->protocol(); + } + + /** + * App custom data. + */ + const json& DataConsumer::GetAppData() const + { + MSC_TRACE(); + + return this->appData; + } + + /** + * Whether the DataConsumer is closed. + */ + bool DataConsumer::IsClosed() const + { + MSC_TRACE(); + + return this->closed; + } + + /** + * Closes the DataConsumer. + */ + void DataConsumer::Close() + { + MSC_TRACE(); + + if (this->closed) + return; + + this->closed = true; + this->dataChannel->Close(); + this->privateListener->OnClose(this); + } + + /** + * Transport was closed. + */ + void DataConsumer::TransportClosed() + { + MSC_TRACE(); + + if (this->closed) + return; + + this->closed = true; + this->dataChannel->Close(); + this->listener->OnTransportClose(this); + } +} // namespace mediasoupclient diff --git a/src/DataProducer.cpp b/src/DataProducer.cpp new file mode 100644 index 00000000..c53119d5 --- /dev/null +++ b/src/DataProducer.cpp @@ -0,0 +1,160 @@ +#define MSC_CLASS "DataProducer" + +#include "DataProducer.hpp" +#include "Logger.hpp" + +using json = nlohmann::json; + +namespace mediasoupclient +{ + DataProducer::DataProducer( + DataProducer::PrivateListener* privateListener, + DataProducer::Listener* listener, + const std::string& id, + rtc::scoped_refptr dataChannel, + const json& sctpStreamParameters, + const json& appData) + : privateListener(privateListener), listener(listener), id(id), dataChannel(dataChannel), + sctpStreamParameters(sctpStreamParameters), appData(appData) + { + MSC_TRACE(); + + this->dataChannel->RegisterObserver(this); + }; + + const std::string& DataProducer::GetId() const + { + MSC_TRACE(); + + return this->id; + } + + std::string DataProducer::GetLocalId() const + { + MSC_TRACE(); + + return std::to_string(this->dataChannel->id()); + } + + const json& DataProducer::GetSctpStreamParameters() const + { + MSC_TRACE(); + + return this->sctpStreamParameters; + } + + webrtc::DataChannelInterface::DataState DataProducer::GetReadyState() const + { + MSC_TRACE(); + + return this->dataChannel->state(); + } + + std::string DataProducer::GetLabel() + { + MSC_TRACE(); + + return this->dataChannel->label(); + } + + std::string DataProducer::GetProtocol() + { + MSC_TRACE(); + + return this->dataChannel->protocol(); + } + + uint64_t DataProducer::GetBufferedAmount() const + { + MSC_TRACE(); + + return this->dataChannel->buffered_amount(); + } + + const json& DataProducer::GetAppData() const + { + MSC_TRACE(); + + return this->appData; + } + + bool DataProducer::IsClosed() const + { + MSC_TRACE(); + + return this->closed; + } + + void DataProducer::Close() + { + MSC_TRACE(); + + if (this->closed) + return; + + this->closed = true; + this->dataChannel->Close(); + this->privateListener->OnClose(this); + } + + void DataProducer::Send(const webrtc::DataBuffer& buffer) + { + MSC_TRACE(); + + this->dataChannel->Send(buffer); + } + + /** + * Transport was closed. + */ + void DataProducer::TransportClosed() + { + MSC_TRACE(); + + if (this->closed) + return; + + this->closed = true; + this->dataChannel->Close(); + this->listener->OnTransportClose(this); + } + + // The data channel state has changed. + void DataProducer::OnStateChange() + { + MSC_TRACE(); + + webrtc::DataChannelInterface::DataState state = this->dataChannel->state(); + + switch (state) + { + case webrtc::DataChannelInterface::DataState::kConnecting: + break; + case webrtc::DataChannelInterface::DataState::kOpen: + this->listener->OnOpen(this); + break; + case webrtc::DataChannelInterface::DataState::kClosing: + break; + case webrtc::DataChannelInterface::DataState::kClosed: + this->listener->OnClose(this); + break; + default: + MSC_ERROR("unknown state %s", webrtc::DataChannelInterface::DataStateString(state)); + break; + } + } + + // A data buffer was successfully received. + void DataProducer::OnMessage(const webrtc::DataBuffer& /*buffer*/) + { + MSC_ERROR("message received on DataProducer [dataProducer.id:%s]", this->GetId().c_str()); + } + + // The data channel's buffered_amount has changed. + void DataProducer::OnBufferedAmountChange(uint64_t sentDataSize) + { + MSC_TRACE(); + + this->listener->OnBufferedAmountChange(this, sentDataSize); + } +} // namespace mediasoupclient diff --git a/src/Handler.cpp b/src/Handler.cpp index 47b373bc..4a2bb74c 100644 --- a/src/Handler.cpp +++ b/src/Handler.cpp @@ -4,13 +4,17 @@ #include "Logger.hpp" #include "MediaSoupClientErrors.hpp" #include "PeerConnection.hpp" +#include "ortc.hpp" #include "sdptransform.hpp" #include "sdp/Utils.hpp" #include // PRIu64, etc using json = nlohmann::json; -static json SctpNumStreams = { { "OS", 1024u }, { "MIS", 1024u } }; +constexpr uint16_t SctpNumStreamsOs{ 1024u }; +constexpr uint16_t SctpNumStreamsMis{ 1024u }; + +json SctpNumStreams = { { "OS", SctpNumStreamsOs }, { "MIS", SctpNumStreamsMis } }; // Static functions declaration. static void fillJsonRtpEncodingParameters( @@ -46,9 +50,7 @@ namespace mediasoupclient { MSC_TRACE(); - auto caps = json::object(); - - caps["numStreams"] = SctpNumStreams; + json caps = { { "numStreams", SctpNumStreams } }; return caps; } @@ -94,7 +96,7 @@ namespace mediasoupclient configuration.servers.clear(); - for (auto& iceServerUri : iceServerUris) + for (const auto& iceServerUri : iceServerUris) { webrtc::PeerConnectionInterface::IceServer iceServer; @@ -115,7 +117,7 @@ namespace mediasoupclient return this->privateListener->OnConnectionStateChange(newState); } - void Handler::SetupTransport(const std::string& localDtlsRole, json localSdpObject) + void Handler::SetupTransport(const std::string& localDtlsRole, json& localSdpObject) { MSC_TRACE(); @@ -158,7 +160,7 @@ namespace mediasoupclient this->sendingRemoteRtpParametersByKind = sendingRemoteRtpParametersByKind; }; - SendHandler::SendData SendHandler::Send( + SendHandler::SendResult SendHandler::Send( webrtc::MediaStreamTrackInterface* track, std::vector* encodings, const json* codecOptions) @@ -299,13 +301,98 @@ namespace mediasoupclient // Store in the map. this->mapMidTransceiver[localId] = transceiver; - SendData sendData; + SendResult sendResult; + + sendResult.localId = localId; + sendResult.rtpSender = transceiver->sender(); + sendResult.rtpParameters = sendingRtpParameters; + + return sendResult; + } + + Handler::DataChannel SendHandler::SendDataChannel( + const std::string& label, webrtc::DataChannelInit dataChannelInit) + { + MSC_TRACE(); + + uint16_t streamId = this->nextSendSctpStreamId; + + dataChannelInit.negotiated = true; + dataChannelInit.id = streamId; + + /* clang-format off */ + json sctpStreamParameters = + { + { "streamId", streamId }, + { "ordered", dataChannelInit.ordered }, + { "protocol", dataChannelInit.protocol } + }; + /* clang-format on */ + + if (dataChannelInit.maxRetransmitTime.has_value()) + { + sctpStreamParameters["maxPacketLifeTime"] = dataChannelInit.maxRetransmitTime.value(); + } + + if (dataChannelInit.maxRetransmits.has_value()) + { + sctpStreamParameters["maxRetransmits"] = dataChannelInit.maxRetransmits.value(); + } + + // This will fill sctpStreamParameters's missing fields with default values. + ortc::validateSctpStreamParameters(sctpStreamParameters); + + rtc::scoped_refptr webrtcDataChannel = + this->pc->CreateDataChannel(label, &dataChannelInit); + + // Increase next id. + this->nextSendSctpStreamId = (this->nextSendSctpStreamId + 1) % SctpNumStreamsMis; + + // If this is the first DataChannel we need to create the SDP answer with + // m=application section. + if (!this->hasDataChannelMediaSection) + { + webrtc::PeerConnectionInterface::RTCOfferAnswerOptions options; + std::string offer = this->pc->CreateOffer(options); + auto localSdpObject = sdptransform::parse(offer); + const Sdp::RemoteSdp::MediaSectionIdx mediaSectionIdx = + this->remoteSdp->GetNextMediaSectionIdx(); + + auto offerMediaObject = + find_if(localSdpObject["media"].begin(), localSdpObject["media"].end(), [](const json& m) { + return m.at("type").get() == "application"; + }); + + if (offerMediaObject == localSdpObject["media"].end()) + { + MSC_THROW_ERROR("Missing 'application' media section in SDP offer"); + } + + if (!this->transportReady) + { + this->SetupTransport("server", localSdpObject); + } + + MSC_DEBUG("calling pc.setLocalDescription() [offer:%s]", offer.c_str()); - sendData.localId = localId; - sendData.rtpSender = transceiver->sender(); - sendData.rtpParameters = sendingRtpParameters; + this->pc->SetLocalDescription(PeerConnection::SdpType::OFFER, offer); + this->remoteSdp->SendSctpAssociation(*offerMediaObject); + + auto sdpAnswer = this->remoteSdp->GetSdp(); + + MSC_DEBUG("calling pc.setRemoteDescription() [answer:%s]", sdpAnswer.c_str()); + + this->pc->SetRemoteDescription(PeerConnection::SdpType::ANSWER, sdpAnswer); + this->hasDataChannelMediaSection = true; + } - return sendData; + SendHandler::DataChannel dataChannel; + + dataChannel.localId = std::to_string(streamId); + dataChannel.dataChannel = webrtcDataChannel; + dataChannel.sctpStreamParameters = sctpStreamParameters; + + return dataChannel; } void SendHandler::StopSending(const std::string& localId) @@ -492,7 +579,7 @@ namespace mediasoupclient MSC_TRACE(); }; - RecvHandler::RecvData RecvHandler::Receive( + RecvHandler::RecvResult RecvHandler::Receive( const std::string& id, const std::string& kind, const json* rtpParameters) { MSC_TRACE(); @@ -508,7 +595,7 @@ namespace mediasoupclient else localId = std::to_string(this->mapMidTransceiver.size()); - auto& cname = (*rtpParameters)["rtcp"]["cname"]; + const auto& cname = (*rtpParameters)["rtcp"]["cname"]; this->remoteSdp->Receive(localId, kind, *rtpParameters, cname, id); @@ -559,13 +646,78 @@ namespace mediasoupclient // Store in the map. this->mapMidTransceiver[localId] = transceiver; - RecvData recvData; + RecvResult recvResult; + + recvResult.localId = localId; + recvResult.rtpReceiver = transceiver->receiver(); + recvResult.track = transceiver->receiver()->track(); + + return recvResult; + } + + Handler::DataChannel RecvHandler::ReceiveDataChannel( + const std::string& label, webrtc::DataChannelInit dataChannelInit) + { + MSC_TRACE(); + + uint16_t streamId = this->nextSendSctpStreamId; + + dataChannelInit.negotiated = true; + dataChannelInit.id = streamId; + + /* clang-format off */ + nlohmann::json sctpStreamParameters = + { + { "streamId", streamId }, + { "ordered", dataChannelInit.ordered } + }; + /* clang-format on */ + + // This will fill sctpStreamParameters's missing fields with default values. + ortc::validateSctpStreamParameters(sctpStreamParameters); + + rtc::scoped_refptr webrtcDataChannel = + this->pc->CreateDataChannel(label, &dataChannelInit); + + // Increase next id. + this->nextSendSctpStreamId = (this->nextSendSctpStreamId + 1) % SctpNumStreamsMis; + + // If this is the first DataChannel we need to create the SDP answer with + // m=application section. + if (!this->hasDataChannelMediaSection) + { + this->remoteSdp->RecvSctpAssociation(); + auto sdpOffer = this->remoteSdp->GetSdp(); + + MSC_DEBUG("calling pc->setRemoteDescription() [offer:%s]", sdpOffer.c_str()); + + // May throw. + this->pc->SetRemoteDescription(PeerConnection::SdpType::OFFER, sdpOffer); + + webrtc::PeerConnectionInterface::RTCOfferAnswerOptions options; + auto sdpAnswer = this->pc->CreateAnswer(options); + + if (!this->transportReady) + { + auto localSdpObject = sdptransform::parse(sdpAnswer); + this->SetupTransport("client", localSdpObject); + } + + MSC_DEBUG("calling pc->setLocalDescription() [answer: %s]", sdpAnswer.c_str()); + + // May throw. + this->pc->SetLocalDescription(PeerConnection::SdpType::ANSWER, sdpAnswer); + + this->hasDataChannelMediaSection = true; + } + + RecvHandler::DataChannel dataChannel; - recvData.localId = localId; - recvData.rtpReceiver = transceiver->receiver(); - recvData.track = transceiver->receiver()->track(); + dataChannel.localId = std::to_string(streamId); + dataChannel.dataChannel = webrtcDataChannel; + dataChannel.sctpStreamParameters = sctpStreamParameters; - return recvData; + return dataChannel; } void RecvHandler::StopReceiving(const std::string& localId) diff --git a/src/PeerConnection.cpp b/src/PeerConnection.cpp index 26c5253f..e4ed2e27 100644 --- a/src/PeerConnection.cpp +++ b/src/PeerConnection.cpp @@ -366,6 +366,26 @@ namespace mediasoupclient return future.get(); } + rtc::scoped_refptr PeerConnection::CreateDataChannel( + const std::string& label, const webrtc::DataChannelInit* config) + { + MSC_TRACE(); + + rtc::scoped_refptr webrtcDataChannel = + this->pc->CreateDataChannel(label, config); + + if (webrtcDataChannel.get()) + { + MSC_DEBUG("Success creating data channel"); + } + else + { + MSC_THROW_ERROR("Failed creating data channel"); + } + + return webrtcDataChannel; + } + /* SetSessionDescriptionObserver */ std::future PeerConnection::SetSessionDescriptionObserver::GetFuture() diff --git a/src/Transport.cpp b/src/Transport.cpp index 8f742bc3..9a3e48f4 100644 --- a/src/Transport.cpp +++ b/src/Transport.cpp @@ -139,7 +139,8 @@ namespace mediasoupclient if (sctpParameters != nullptr && sctpParameters.is_object()) { - auto maxMessageSizeIt = sctpParameters.find("maxMessageSize"); + this->hasSctpParameters = true; + auto maxMessageSizeIt = sctpParameters.find("maxMessageSize"); if (maxMessageSizeIt->is_number_integer()) this->maxSctpMessageSize = maxMessageSizeIt->get(); @@ -156,7 +157,7 @@ namespace mediasoupclient }; this->sendHandler.reset(new SendHandler( - this, + dynamic_cast(this), iceParameters, iceCandidates, dtlsParameters, @@ -214,20 +215,20 @@ namespace mediasoupclient } // May throw. - auto sendData = this->sendHandler->Send(track, &normalizedEncodings, codecOptions); + auto sendResult = this->sendHandler->Send(track, &normalizedEncodings, codecOptions); try { // This will fill rtpParameters's missing fields with default values. - ortc::validateRtpParameters(sendData.rtpParameters); + ortc::validateRtpParameters(sendResult.rtpParameters); // May throw. producerId = - this->listener->OnProduce(this, track->kind(), sendData.rtpParameters, appData).get(); + this->listener->OnProduce(this, track->kind(), sendResult.rtpParameters, appData).get(); } catch (MediaSoupClientError& error) { - this->sendHandler->StopSending(sendData.localId); + this->sendHandler->StopSending(sendResult.localId); throw; } @@ -236,10 +237,10 @@ namespace mediasoupclient this, producerListener, producerId, - sendData.localId, - sendData.rtpSender, + sendResult.localId, + sendResult.rtpSender, track, - sendData.rtpParameters, + sendResult.rtpParameters, appData); this->producers[producer->GetId()] = producer; @@ -247,6 +248,55 @@ namespace mediasoupclient return producer; } + DataProducer* SendTransport::ProduceData( + DataProducer::Listener* dataProducerListener, + const std::string& label, + const std::string& protocol, + bool ordered, + int maxRetransmits, + int maxPacketLifeTime, + const nlohmann::json& appData) + { + MSC_TRACE(); + + if (!this->hasSctpParameters) + MSC_THROW_ERROR("SctpParameters are mandatory when using data producer listener"); + + webrtc::DataChannelInit dataChannelInit; + dataChannelInit.protocol = protocol; + dataChannelInit.ordered = ordered; + if (maxRetransmits != -1 && maxPacketLifeTime != 0) + { + MSC_THROW_ERROR("Cannot set both maxRetransmits and maxPacketLifeTime"); + } + if (maxRetransmits != 0) + { + dataChannelInit.maxRetransmits = maxRetransmits; + } + if (maxPacketLifeTime != 0) + { + dataChannelInit.maxRetransmitTime = maxPacketLifeTime; + } + + // This may throw. + auto sendResult = this->sendHandler->SendDataChannel(label, dataChannelInit); + + auto dataChannelId = + this->listener->OnProduceData(this, sendResult.sctpStreamParameters, label, protocol, appData); + + auto dataProducer = new DataProducer( + this, + dataProducerListener, + dataChannelId.get(), + sendResult.dataChannel, + sendResult.sctpStreamParameters, + appData); + + this->dataProducers[dataProducer->GetId()] = dataProducer; + + return dataProducer; + } + void SendTransport::Close() { MSC_TRACE(); @@ -263,6 +313,14 @@ namespace mediasoupclient producer->TransportClosed(); } + + // Close all Data Producers. + for (auto& kv : this->dataProducers) + { + auto* dataProducer = kv.second; + + dataProducer->TransportClosed(); + } } void SendTransport::OnClose(Producer* producer) @@ -278,6 +336,13 @@ namespace mediasoupclient this->sendHandler->StopSending(producer->GetLocalId()); } + void SendTransport::OnClose(DataProducer* dataProducer) + { + MSC_TRACE(); + + this->dataProducers.erase(dataProducer->GetId()); + } + void SendTransport::OnReplaceTrack(const Producer* producer, webrtc::MediaStreamTrackInterface* track) { MSC_TRACE(); @@ -318,8 +383,15 @@ namespace mediasoupclient { MSC_TRACE(); + this->hasSctpParameters = sctpParameters != nullptr && sctpParameters.is_object(); + this->recvHandler.reset(new RecvHandler( - this, iceParameters, iceCandidates, dtlsParameters, sctpParameters, peerConnectionOptions)); + dynamic_cast(this), + iceParameters, + iceCandidates, + dtlsParameters, + sctpParameters, + peerConnectionOptions)); Transport::SetHandler(this->recvHandler.get()); } @@ -353,16 +425,16 @@ namespace mediasoupclient MSC_THROW_UNSUPPORTED_ERROR("cannot consume this Producer"); // May throw. - auto recvData = this->recvHandler->Receive(id, kind, rtpParameters); + auto recvResult = this->recvHandler->Receive(id, kind, rtpParameters); auto* consumer = new Consumer( this, consumerListener, id, - recvData.localId, + recvResult.localId, producerId, - recvData.rtpReceiver, - recvData.track, + recvResult.rtpReceiver, + recvResult.track, *rtpParameters, appData); @@ -394,6 +466,42 @@ namespace mediasoupclient return consumer; } + /** + * Create a DataConsumer. + */ + DataConsumer* RecvTransport::ConsumeData( + DataConsumer::Listener* listener, + const std::string& id, + const std::string& producerId, + const std::string& label, + const std::string& protocol, + const nlohmann::json& appData) + { + MSC_TRACE(); + + webrtc::DataChannelInit dataChannelInit; + dataChannelInit.protocol = protocol; + + if (this->closed) + MSC_THROW_INVALID_STATE_ERROR("RecvTransport closed"); + else if (id.empty()) + MSC_THROW_TYPE_ERROR("missing id"); + else if (producerId.empty()) + MSC_THROW_TYPE_ERROR("missing producerId"); + else if (!this->hasSctpParameters) + MSC_THROW_TYPE_ERROR("Cannot use DataChannels with this transport. SctpParameters are not set."); + + // This may throw. + auto recvResult = this->recvHandler->ReceiveDataChannel(label, dataChannelInit); + + auto dataConsumer = new DataConsumer( + listener, this, id, producerId, recvResult.dataChannel, recvResult.sctpStreamParameters, appData); + + this->dataConsumers[dataConsumer->GetId()] = dataConsumer; + + return dataConsumer; + } + void RecvTransport::Close() { MSC_TRACE(); @@ -403,13 +511,21 @@ namespace mediasoupclient Transport::Close(); - // Close all Producers. + // Close all Consumers. for (auto& kv : this->consumers) { auto* consumer = kv.second; consumer->TransportClosed(); } + + // Close all DataConsumers. + for (auto& kv : this->dataConsumers) + { + auto* dataConsumer = kv.second; + + dataConsumer->TransportClosed(); + } } void RecvTransport::OnClose(Consumer* consumer) @@ -425,6 +541,13 @@ namespace mediasoupclient this->recvHandler->StopReceiving(consumer->GetLocalId()); } + void RecvTransport::OnClose(DataConsumer* dataConsumer) + { + MSC_TRACE(); + + this->dataConsumers.erase(dataConsumer->GetId()); + } + json RecvTransport::OnGetStats(const Consumer* consumer) { MSC_TRACE(); diff --git a/src/ortc.cpp b/src/ortc.cpp index 2e70d012..ce4cf3a5 100644 --- a/src/ortc.cpp +++ b/src/ortc.cpp @@ -641,7 +641,6 @@ namespace mediasoupclient auto orderedIt = params.find("ordered"); auto maxPacketLifeTimeIt = params.find("maxPacketLifeTime"); auto maxRetransmitsIt = params.find("maxRetransmits"); - auto priorityIt = params.find("priority"); auto labelIt = params.find("label"); auto protocolIt = params.find("protocol"); @@ -694,10 +693,6 @@ namespace mediasoupclient params["ordered"] = false; } - // priority is optional. If unset set it to empty string. - if (priorityIt == params.end() || !priorityIt->is_string()) - params["priority"] = ""; - // label is optional. If unset set it to empty string. if (labelIt == params.end() || !labelIt->is_string()) params["label"] = ""; diff --git a/src/sdp/RemoteSdp.cpp b/src/sdp/RemoteSdp.cpp index cf166608..8154a52a 100644 --- a/src/sdp/RemoteSdp.cpp +++ b/src/sdp/RemoteSdp.cpp @@ -163,6 +163,39 @@ namespace mediasoupclient } } + void Sdp::RemoteSdp::SendSctpAssociation(json& offerMediaObject) + { + nlohmann::json emptyJson; + auto* mediaSection = new AnswerMediaSection( + this->iceParameters, + this->iceCandidates, + this->dtlsParameters, + this->sctpParameters, + offerMediaObject, + emptyJson, + emptyJson, + nullptr); + + this->AddMediaSection(mediaSection); + } + + void Sdp::RemoteSdp::RecvSctpAssociation() + { + nlohmann::json emptyJson; + auto* mediaSection = new OfferMediaSection( + this->iceParameters, + this->iceCandidates, + this->dtlsParameters, + this->sctpParameters, + "datachannel", // mid + "application", // kind + emptyJson, // offerRtpParameters + "", // streamId + "" // trackId + ); + this->AddMediaSection(mediaSection); + } + void Sdp::RemoteSdp::Receive( const std::string& mid, const std::string& kind, diff --git a/test/include/FakeTransportListener.hpp b/test/include/FakeTransportListener.hpp index d6150201..15699043 100644 --- a/test/include/FakeTransportListener.hpp +++ b/test/include/FakeTransportListener.hpp @@ -44,6 +44,27 @@ class FakeSendTransportListener : public mediasoupclient::SendTransport::Listene return promise.get_future(); }; + std::future OnProduceData( + mediasoupclient::SendTransport* /*transport*/, + const nlohmann::json& /*sctpStreamParameters*/, + const std::string& /*label*/, + const std::string& /*protocol*/, + const nlohmann::json& appData) override + { + this->onProduceDataTimesCalled++; + + std::promise promise; + + this->appData = appData; + + // this->audioProducerLocalParameters = rtpParameters; + this->dataProducerId = generateProducerRemoteId(); + + promise.set_value(this->dataProducerId); + + return promise.get_future(); + } + std::future OnConnect(mediasoupclient::Transport* transport, const json& dtlsParameters) override { this->onConnectTimesCalled++; @@ -72,6 +93,8 @@ class FakeSendTransportListener : public mediasoupclient::SendTransport::Listene json videoProducerLocalParameters; std::string videoProducerId; json appData; + std::string dataProducerId; + std::string dataConsumerId; size_t onProduceTimesCalled{ 0 }; size_t onConnectTimesCalled{ 0 }; @@ -80,6 +103,9 @@ class FakeSendTransportListener : public mediasoupclient::SendTransport::Listene size_t onProduceExpectedTimesCalled{ 0 }; size_t onConnectExpectedTimesCalled{ 0 }; size_t onConnectionStateChangeExpectedTimesCalled{ 0 }; + + size_t onProduceDataTimesCalled{ 0 }; + size_t onProduceDataExpectedTimesCalled{ 0 }; }; class FakeRecvTransportListener : public mediasoupclient::RecvTransport::Listener @@ -115,9 +141,19 @@ class FakeRecvTransportListener : public mediasoupclient::RecvTransport::Listene size_t onConnectionStateChangeExpectedTimesCalled{ 0 }; }; -class FakeProducerListener : public mediasoupclient::Producer::Listener +class FakeProducerListener : public mediasoupclient::Producer::Listener, + public mediasoupclient::DataProducer::Listener { public: + void OnOpen(mediasoupclient::DataProducer* dataProducer) override + { + } + void OnClose(mediasoupclient::DataProducer* dataProducer) override{}; + void OnBufferedAmountChange( + mediasoupclient::DataProducer* dataProducer, uint64_t sent_data_size) override{}; + + void OnTransportClose(mediasoupclient::DataProducer* /*dataProducer*/) override{}; + void OnTransportClose(mediasoupclient::Producer* /*producer*/) override { this->onTransportCloseTimesCalled++; diff --git a/test/src/Handler.test.cpp b/test/src/Handler.test.cpp index 917f4a30..cec05a9f 100644 --- a/test/src/Handler.test.cpp +++ b/test/src/Handler.test.cpp @@ -64,14 +64,14 @@ TEST_CASE("SendHandler", "[Handler][SendHandler]") { track = createAudioTrack("test-track-id"); - mediasoupclient::SendHandler::SendData sendData; + mediasoupclient::SendHandler::SendResult sendResult; - REQUIRE_NOTHROW(sendData = sendHandler.Send(track, nullptr, nullptr)); + REQUIRE_NOTHROW(sendResult = sendHandler.Send(track, nullptr, nullptr)); - localId = sendData.localId; + localId = sendResult.localId; - REQUIRE(sendData.rtpParameters["codecs"].size() == 1); - REQUIRE(sendData.rtpParameters["headerExtensions"].size() == 3); + REQUIRE(sendResult.rtpParameters["codecs"].size() == 1); + REQUIRE(sendResult.rtpParameters["headerExtensions"].size() == 3); } SECTION("sendHandler.Send() succeeds if track is already handled") @@ -120,11 +120,11 @@ TEST_CASE("SendHandler", "[Handler][SendHandler]") SECTION("sendHandler.Sends() succeeds after stopping if track if provided") { - mediasoupclient::SendHandler::SendData sendData; + mediasoupclient::SendHandler::SendResult sendResult; - REQUIRE_NOTHROW(sendData = sendHandler.Send(track, nullptr, nullptr)); + REQUIRE_NOTHROW(sendResult = sendHandler.Send(track, nullptr, nullptr)); - localId = sendData.localId; + localId = sendResult.localId; } SECTION("sendHandler.StopSending() succeeds if track is being sent") @@ -167,11 +167,11 @@ TEST_CASE("RecvHandler", "[Handler][RecvHandler]") SECTION("recvHander.Receive() succeeds if correct rtpParameters are provided") { - mediasoupclient::RecvHandler::RecvData recvData; + mediasoupclient::RecvHandler::RecvResult recvResult; - REQUIRE_NOTHROW(recvData = recvHandler.Receive("test", "audio", &rtpParameters)); + REQUIRE_NOTHROW(recvResult = recvHandler.Receive("test", "audio", &rtpParameters)); - localId = recvData.localId; + localId = recvResult.localId; } SECTION("recvHandler.GetReceiverStats() fails if unknown receiver id is provided") diff --git a/test/src/fakeParameters.cpp b/test/src/fakeParameters.cpp index 03c01f06..cd91bb11 100644 --- a/test/src/fakeParameters.cpp +++ b/test/src/fakeParameters.cpp @@ -270,9 +270,11 @@ json generateTransportRemoteParameters() }, "sctpParameters" : { - "port" : 5000, - "numStreams" : 2048, - "maxMessageSize" : 2000000 + "port" : 5000, + "OS" : 1024, + "MIS" : 1024, + "numStreams" : 2048, + "maxMessageSize" : 2000000 } })"_json; diff --git a/test/src/mediasoupclient.test.cpp b/test/src/mediasoupclient.test.cpp index 7e28aa80..dee9477f 100644 --- a/test/src/mediasoupclient.test.cpp +++ b/test/src/mediasoupclient.test.cpp @@ -15,6 +15,7 @@ TEST_CASE("mediasoupclient", "[mediasoupclient]") static std::unique_ptr device; static std::unique_ptr sendTransport; + static std::unique_ptr sendTransportNoSctp; static std::unique_ptr recvTransport; static std::unique_ptr audioProducer; static std::unique_ptr videoProducer; @@ -22,6 +23,9 @@ TEST_CASE("mediasoupclient", "[mediasoupclient]") static std::unique_ptr videoConsumer; static std::unique_ptr audioConsumer2; + static std::unique_ptr dataProducer; + static std::unique_ptr dataConsumer; + static rtc::scoped_refptr audioTrack; static rtc::scoped_refptr videoTrack; @@ -127,6 +131,7 @@ TEST_CASE("mediasoupclient", "[mediasoupclient]") TransportRemoteParameters["iceParameters"], TransportRemoteParameters["iceCandidates"], TransportRemoteParameters["dtlsParameters"], + TransportRemoteParameters["sctpParameters"], nullptr, appData))); @@ -270,12 +275,62 @@ TEST_CASE("mediasoupclient", "[mediasoupclient]") REQUIRE(videoProducer->GetAppData() == json::object()); } + SECTION("transport.produceData() succeeds") + { + /* clang-format off */ + json appData = + { + { "tdr", "TDR" } + }; + /* clang-format on */ + + REQUIRE_NOTHROW(dataProducer.reset( + sendTransport->ProduceData(&producerListener, "", "", true, 0, 0, appData))); + + REQUIRE( + sendTransportListener.onConnectTimesCalled == + sendTransportListener.onConnectExpectedTimesCalled); // connect has already been called for Producer + + REQUIRE(sendTransportListener.id == sendTransport->GetId()); + + REQUIRE( + sendTransportListener.onProduceDataExpectedTimesCalled == + ++sendTransportListener.onProduceDataExpectedTimesCalled); + + REQUIRE(dataProducer->GetId() == sendTransportListener.dataProducerId); + REQUIRE(!dataProducer->IsClosed()); + REQUIRE(dataProducer->GetAppData() == appData); + } + SECTION("transport.produce() without track throws") { REQUIRE_THROWS_AS( sendTransport->Produce(&producerListener, nullptr, nullptr, nullptr), MediaSoupClientTypeError); } + SECTION("transport.produceData() on transport without sctpParameters throws") + { + /* clang-format off */ + json appData = + { + { "tdr", "TDR" } + }; + /* clang-format on */ + + REQUIRE_NOTHROW(sendTransportNoSctp.reset(device->CreateSendTransport( + &sendTransportListener, + TransportRemoteParameters["id"], + TransportRemoteParameters["iceParameters"], + TransportRemoteParameters["iceCandidates"], + TransportRemoteParameters["dtlsParameters"], + nullptr, + appData))); + + REQUIRE_THROWS_AS( + sendTransportNoSctp->ProduceData(&producerListener, "", "", true, 0, 0, appData), + MediaSoupClientError); + } + SECTION("transport.consume() succeeds") { /* clang-format off */