From 1bb4949d41c52b7e88ad855d407dc1b132d6b6d8 Mon Sep 17 00:00:00 2001 From: Alex Chronopoulos Date: Fri, 20 Dec 2019 17:03:42 +0000 Subject: [PATCH] Bug 1604746 - Start the pipeline after stopping to avoid thread races. r=padenot With tracks in different MTGs we risk of having thread races if we stop and start immediatelly. Previously, stoping and starting was happenign to the same MTG so this problem did not exist. This change wires a promise in the `MediaTrack::RemoveListener` method and perform the start when that step has been completed. Differential Revision: https://phabricator.services.mozilla.com/D57947 --HG-- extra : moz-landing-system : lando --- dom/media/MediaTrackGraph.cpp | 15 ++++++-- dom/media/MediaTrackGraph.h | 2 +- .../gtest/mediapipeline_unittest.cpp | 4 ++- .../src/mediapipeline/MediaPipeline.cpp | 34 +++++++++++++------ .../src/mediapipeline/MediaPipeline.h | 8 ++--- 5 files changed, 44 insertions(+), 19 deletions(-) diff --git a/dom/media/MediaTrackGraph.cpp b/dom/media/MediaTrackGraph.cpp index 0f153d22a08a..b4511f83cc8b 100644 --- a/dom/media/MediaTrackGraph.cpp +++ b/dom/media/MediaTrackGraph.cpp @@ -2118,20 +2118,29 @@ void MediaTrack::RemoveListenerImpl(MediaTrackListener* aListener) { } } -void MediaTrack::RemoveListener(MediaTrackListener* aListener) { +RefPtr MediaTrack::RemoveListener( + MediaTrackListener* aListener) { class Message : public ControlMessage { public: Message(MediaTrack* aTrack, MediaTrackListener* aListener) : ControlMessage(aTrack), mListener(aListener) {} - void Run() override { mTrack->RemoveListenerImpl(mListener); } + void Run() override { + mTrack->RemoveListenerImpl(mListener); + mRemovedPromise.Resolve(true, __func__); + } void RunDuringShutdown() override { // During shutdown we still want the listener's NotifyRemoved to be // called, since not doing that might block shutdown of other modules. Run(); } RefPtr mListener; + MozPromiseHolder mRemovedPromise; }; - GraphImpl()->AppendMessage(MakeUnique(this, aListener)); + + UniquePtr message = MakeUnique(this, aListener); + RefPtr p = message->mRemovedPromise.Ensure(__func__); + GraphImpl()->AppendMessage(std::move(message)); + return p; } void MediaTrack::AddDirectListenerImpl( diff --git a/dom/media/MediaTrackGraph.h b/dom/media/MediaTrackGraph.h index dd5c2d1dfa0d..7d16ba723a91 100644 --- a/dom/media/MediaTrackGraph.h +++ b/dom/media/MediaTrackGraph.h @@ -306,7 +306,7 @@ class MediaTrack : public mozilla::LinkedListElement { virtual void Resume(); // Events will be dispatched by calling methods of aListener. virtual void AddListener(MediaTrackListener* aListener); - virtual void RemoveListener(MediaTrackListener* aListener); + virtual RefPtr RemoveListener(MediaTrackListener* aListener); /** * Adds aListener to the source track of this track. diff --git a/media/webrtc/signaling/gtest/mediapipeline_unittest.cpp b/media/webrtc/signaling/gtest/mediapipeline_unittest.cpp index 704917259bb5..1f7572735297 100644 --- a/media/webrtc/signaling/gtest/mediapipeline_unittest.cpp +++ b/media/webrtc/signaling/gtest/mediapipeline_unittest.cpp @@ -81,10 +81,12 @@ class FakeAudioTrack : public mozilla::ProcessedMediaTrack { mListener = aListener; } - void RemoveListener(MediaTrackListener* aListener) override { + RefPtr RemoveListener( + MediaTrackListener* aListener) override { mozilla::MutexAutoLock lock(mMutex); MOZ_ASSERT(mListener == aListener); mListener = nullptr; + return GenericPromise::CreateAndResolve(true, __func__); } void ProcessInput(GraphTime aFrom, GraphTime aTo, uint32_t aFlags) override {} diff --git a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp index 104a7a33b655..927c5d77053e 100644 --- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp +++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp @@ -867,15 +867,15 @@ void MediaPipelineTransmit::SetDescription() { NS_DISPATCH_NORMAL); } -void MediaPipelineTransmit::Stop() { +RefPtr MediaPipelineTransmit::Stop() { ASSERT_ON_THREAD(mMainThread); if (!mTransmitting) { - return; + return GenericPromise::CreateAndResolve(true, __func__); } if (!mSendTrack) { - return; + return GenericPromise::CreateAndResolve(true, __func__); } mTransmitting = false; @@ -885,7 +885,7 @@ void MediaPipelineTransmit::Stop() { if (mSendTrack->mType == MediaSegment::VIDEO) { mSendTrack->RemoveDirectListener(mListener); } - mSendTrack->RemoveListener(mListener); + return mSendTrack->RemoveListener(mListener); } bool MediaPipelineTransmit::Transmitting() const { @@ -1007,8 +1007,23 @@ nsresult MediaPipelineTransmit::SetTrack(RefPtr aDomTrack) { if (aDomTrack && mDomTrack && !aDomTrack->Ended() && !mDomTrack->Ended() && aDomTrack->Graph() != mDomTrack->Graph() && mSendTrack) { // Recreate the send track if the new stream resides in different MTG. + // Stopping and re-starting will result in removing and re-adding the + // listener BUT in different threads, since tracks belong to different MTGs. + // This can create tread races so we wait here for the stop to happen + // before re-starting. Please note that start should happen at the end of + // the method after the mSendTrack replace bellow. Since we dispatch the + // result of the promise to the next event of the same thread, it is + // guarantee the start will be executed after this method has finished. wasTransmitting = mTransmitting; - Stop(); + RefPtr self = this; + Stop()->Then( + GetMainThreadSerialEventTarget(), __func__, + [wasTransmitting, self](bool) { + if (wasTransmitting) { + self->Start(); + } + }, + [](nsresult aRv) { MOZ_CRASH("Never get here!"); }); mSendTrack->Destroy(); mSendTrack = nullptr; } @@ -1031,9 +1046,6 @@ nsresult MediaPipelineTransmit::SetTrack(RefPtr aDomTrack) { mConverter->SetTrackEnabled(mDomTrack->Enabled()); } } - if (wasTransmitting) { - Start(); - } return NS_OK; } @@ -1543,11 +1555,12 @@ void MediaPipelineReceiveAudio::Start() { } } -void MediaPipelineReceiveAudio::Stop() { +RefPtr MediaPipelineReceiveAudio::Stop() { if (mListener) { mListener->RemoveSelf(); } mConduit->StopReceiving(); + return GenericPromise::CreateAndResolve(true, __func__); } void MediaPipelineReceiveAudio::OnRtpPacketReceived() { @@ -1718,11 +1731,12 @@ void MediaPipelineReceiveVideo::Start() { } } -void MediaPipelineReceiveVideo::Stop() { +RefPtr MediaPipelineReceiveVideo::Stop() { if (mListener) { mListener->RemoveSelf(); } mConduit->StopReceiving(); + return GenericPromise::CreateAndResolve(true, __func__); } void MediaPipelineReceiveVideo::OnRtpPacketReceived() { diff --git a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h index ed7b757aab8e..e8dc6938dc2b 100644 --- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h +++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h @@ -88,7 +88,7 @@ class MediaPipeline : public sigslot::has_slots<> { RefPtr aConduit); virtual void Start() = 0; - virtual void Stop() = 0; + virtual RefPtr Stop() = 0; virtual void DetachMedia() {} void SetLevel(size_t aLevel) { mLevel = aLevel; } @@ -282,7 +282,7 @@ class MediaPipelineTransmit : public MediaPipeline { bool Transmitting() const; void Start() override; - void Stop() override; + RefPtr Stop() override; // written and used from MainThread bool IsVideo() const override; @@ -368,7 +368,7 @@ class MediaPipelineReceiveAudio : public MediaPipelineReceive { void MakePrincipalPrivate_s() override; void Start() override; - void Stop() override; + RefPtr Stop() override; void OnRtpPacketReceived() override; @@ -399,7 +399,7 @@ class MediaPipelineReceiveVideo : public MediaPipelineReceive { void MakePrincipalPrivate_s() override; void Start() override; - void Stop() override; + RefPtr Stop() override; void OnRtpPacketReceived() override;