forked from mirrors/gecko-dev
Bug 1604746 - Start the pipeline after stopping to avoid thread races. r=padenot
With tracks in different MTGs we risk of having thread races if we stop and start immediatelly. Previously, stoping and starting was happenign to the same MTG so this problem did not exist. This change wires a promise in the `MediaTrack::RemoveListener` method and perform the start when that step has been completed. Differential Revision: https://phabricator.services.mozilla.com/D57947 --HG-- extra : moz-landing-system : lando
This commit is contained in:
parent
59fb3dc5da
commit
1bb4949d41
5 changed files with 44 additions and 19 deletions
|
|
@ -2118,20 +2118,29 @@ void MediaTrack::RemoveListenerImpl(MediaTrackListener* aListener) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void MediaTrack::RemoveListener(MediaTrackListener* aListener) {
|
RefPtr<GenericPromise> MediaTrack::RemoveListener(
|
||||||
|
MediaTrackListener* aListener) {
|
||||||
class Message : public ControlMessage {
|
class Message : public ControlMessage {
|
||||||
public:
|
public:
|
||||||
Message(MediaTrack* aTrack, MediaTrackListener* aListener)
|
Message(MediaTrack* aTrack, MediaTrackListener* aListener)
|
||||||
: ControlMessage(aTrack), mListener(aListener) {}
|
: ControlMessage(aTrack), mListener(aListener) {}
|
||||||
void Run() override { mTrack->RemoveListenerImpl(mListener); }
|
void Run() override {
|
||||||
|
mTrack->RemoveListenerImpl(mListener);
|
||||||
|
mRemovedPromise.Resolve(true, __func__);
|
||||||
|
}
|
||||||
void RunDuringShutdown() override {
|
void RunDuringShutdown() override {
|
||||||
// During shutdown we still want the listener's NotifyRemoved to be
|
// During shutdown we still want the listener's NotifyRemoved to be
|
||||||
// called, since not doing that might block shutdown of other modules.
|
// called, since not doing that might block shutdown of other modules.
|
||||||
Run();
|
Run();
|
||||||
}
|
}
|
||||||
RefPtr<MediaTrackListener> mListener;
|
RefPtr<MediaTrackListener> mListener;
|
||||||
|
MozPromiseHolder<GenericPromise> mRemovedPromise;
|
||||||
};
|
};
|
||||||
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener));
|
|
||||||
|
UniquePtr<Message> message = MakeUnique<Message>(this, aListener);
|
||||||
|
RefPtr<GenericPromise> p = message->mRemovedPromise.Ensure(__func__);
|
||||||
|
GraphImpl()->AppendMessage(std::move(message));
|
||||||
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
void MediaTrack::AddDirectListenerImpl(
|
void MediaTrack::AddDirectListenerImpl(
|
||||||
|
|
|
||||||
|
|
@ -306,7 +306,7 @@ class MediaTrack : public mozilla::LinkedListElement<MediaTrack> {
|
||||||
virtual void Resume();
|
virtual void Resume();
|
||||||
// Events will be dispatched by calling methods of aListener.
|
// Events will be dispatched by calling methods of aListener.
|
||||||
virtual void AddListener(MediaTrackListener* aListener);
|
virtual void AddListener(MediaTrackListener* aListener);
|
||||||
virtual void RemoveListener(MediaTrackListener* aListener);
|
virtual RefPtr<GenericPromise> RemoveListener(MediaTrackListener* aListener);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds aListener to the source track of this track.
|
* Adds aListener to the source track of this track.
|
||||||
|
|
|
||||||
|
|
@ -81,10 +81,12 @@ class FakeAudioTrack : public mozilla::ProcessedMediaTrack {
|
||||||
mListener = aListener;
|
mListener = aListener;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RemoveListener(MediaTrackListener* aListener) override {
|
RefPtr<GenericPromise> RemoveListener(
|
||||||
|
MediaTrackListener* aListener) override {
|
||||||
mozilla::MutexAutoLock lock(mMutex);
|
mozilla::MutexAutoLock lock(mMutex);
|
||||||
MOZ_ASSERT(mListener == aListener);
|
MOZ_ASSERT(mListener == aListener);
|
||||||
mListener = nullptr;
|
mListener = nullptr;
|
||||||
|
return GenericPromise::CreateAndResolve(true, __func__);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ProcessInput(GraphTime aFrom, GraphTime aTo, uint32_t aFlags) override {}
|
void ProcessInput(GraphTime aFrom, GraphTime aTo, uint32_t aFlags) override {}
|
||||||
|
|
|
||||||
|
|
@ -867,15 +867,15 @@ void MediaPipelineTransmit::SetDescription() {
|
||||||
NS_DISPATCH_NORMAL);
|
NS_DISPATCH_NORMAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MediaPipelineTransmit::Stop() {
|
RefPtr<GenericPromise> MediaPipelineTransmit::Stop() {
|
||||||
ASSERT_ON_THREAD(mMainThread);
|
ASSERT_ON_THREAD(mMainThread);
|
||||||
|
|
||||||
if (!mTransmitting) {
|
if (!mTransmitting) {
|
||||||
return;
|
return GenericPromise::CreateAndResolve(true, __func__);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!mSendTrack) {
|
if (!mSendTrack) {
|
||||||
return;
|
return GenericPromise::CreateAndResolve(true, __func__);
|
||||||
}
|
}
|
||||||
|
|
||||||
mTransmitting = false;
|
mTransmitting = false;
|
||||||
|
|
@ -885,7 +885,7 @@ void MediaPipelineTransmit::Stop() {
|
||||||
if (mSendTrack->mType == MediaSegment::VIDEO) {
|
if (mSendTrack->mType == MediaSegment::VIDEO) {
|
||||||
mSendTrack->RemoveDirectListener(mListener);
|
mSendTrack->RemoveDirectListener(mListener);
|
||||||
}
|
}
|
||||||
mSendTrack->RemoveListener(mListener);
|
return mSendTrack->RemoveListener(mListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool MediaPipelineTransmit::Transmitting() const {
|
bool MediaPipelineTransmit::Transmitting() const {
|
||||||
|
|
@ -1007,8 +1007,23 @@ nsresult MediaPipelineTransmit::SetTrack(RefPtr<MediaStreamTrack> aDomTrack) {
|
||||||
if (aDomTrack && mDomTrack && !aDomTrack->Ended() && !mDomTrack->Ended() &&
|
if (aDomTrack && mDomTrack && !aDomTrack->Ended() && !mDomTrack->Ended() &&
|
||||||
aDomTrack->Graph() != mDomTrack->Graph() && mSendTrack) {
|
aDomTrack->Graph() != mDomTrack->Graph() && mSendTrack) {
|
||||||
// Recreate the send track if the new stream resides in different MTG.
|
// Recreate the send track if the new stream resides in different MTG.
|
||||||
|
// Stopping and re-starting will result in removing and re-adding the
|
||||||
|
// listener BUT in different threads, since tracks belong to different MTGs.
|
||||||
|
// This can create tread races so we wait here for the stop to happen
|
||||||
|
// before re-starting. Please note that start should happen at the end of
|
||||||
|
// the method after the mSendTrack replace bellow. Since we dispatch the
|
||||||
|
// result of the promise to the next event of the same thread, it is
|
||||||
|
// guarantee the start will be executed after this method has finished.
|
||||||
wasTransmitting = mTransmitting;
|
wasTransmitting = mTransmitting;
|
||||||
Stop();
|
RefPtr<MediaPipelineTransmit> self = this;
|
||||||
|
Stop()->Then(
|
||||||
|
GetMainThreadSerialEventTarget(), __func__,
|
||||||
|
[wasTransmitting, self](bool) {
|
||||||
|
if (wasTransmitting) {
|
||||||
|
self->Start();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
[](nsresult aRv) { MOZ_CRASH("Never get here!"); });
|
||||||
mSendTrack->Destroy();
|
mSendTrack->Destroy();
|
||||||
mSendTrack = nullptr;
|
mSendTrack = nullptr;
|
||||||
}
|
}
|
||||||
|
|
@ -1031,9 +1046,6 @@ nsresult MediaPipelineTransmit::SetTrack(RefPtr<MediaStreamTrack> aDomTrack) {
|
||||||
mConverter->SetTrackEnabled(mDomTrack->Enabled());
|
mConverter->SetTrackEnabled(mDomTrack->Enabled());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (wasTransmitting) {
|
|
||||||
Start();
|
|
||||||
}
|
|
||||||
|
|
||||||
return NS_OK;
|
return NS_OK;
|
||||||
}
|
}
|
||||||
|
|
@ -1543,11 +1555,12 @@ void MediaPipelineReceiveAudio::Start() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void MediaPipelineReceiveAudio::Stop() {
|
RefPtr<GenericPromise> MediaPipelineReceiveAudio::Stop() {
|
||||||
if (mListener) {
|
if (mListener) {
|
||||||
mListener->RemoveSelf();
|
mListener->RemoveSelf();
|
||||||
}
|
}
|
||||||
mConduit->StopReceiving();
|
mConduit->StopReceiving();
|
||||||
|
return GenericPromise::CreateAndResolve(true, __func__);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MediaPipelineReceiveAudio::OnRtpPacketReceived() {
|
void MediaPipelineReceiveAudio::OnRtpPacketReceived() {
|
||||||
|
|
@ -1718,11 +1731,12 @@ void MediaPipelineReceiveVideo::Start() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void MediaPipelineReceiveVideo::Stop() {
|
RefPtr<GenericPromise> MediaPipelineReceiveVideo::Stop() {
|
||||||
if (mListener) {
|
if (mListener) {
|
||||||
mListener->RemoveSelf();
|
mListener->RemoveSelf();
|
||||||
}
|
}
|
||||||
mConduit->StopReceiving();
|
mConduit->StopReceiving();
|
||||||
|
return GenericPromise::CreateAndResolve(true, __func__);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MediaPipelineReceiveVideo::OnRtpPacketReceived() {
|
void MediaPipelineReceiveVideo::OnRtpPacketReceived() {
|
||||||
|
|
|
||||||
|
|
@ -88,7 +88,7 @@ class MediaPipeline : public sigslot::has_slots<> {
|
||||||
RefPtr<MediaSessionConduit> aConduit);
|
RefPtr<MediaSessionConduit> aConduit);
|
||||||
|
|
||||||
virtual void Start() = 0;
|
virtual void Start() = 0;
|
||||||
virtual void Stop() = 0;
|
virtual RefPtr<GenericPromise> Stop() = 0;
|
||||||
virtual void DetachMedia() {}
|
virtual void DetachMedia() {}
|
||||||
|
|
||||||
void SetLevel(size_t aLevel) { mLevel = aLevel; }
|
void SetLevel(size_t aLevel) { mLevel = aLevel; }
|
||||||
|
|
@ -282,7 +282,7 @@ class MediaPipelineTransmit : public MediaPipeline {
|
||||||
bool Transmitting() const;
|
bool Transmitting() const;
|
||||||
|
|
||||||
void Start() override;
|
void Start() override;
|
||||||
void Stop() override;
|
RefPtr<GenericPromise> Stop() override;
|
||||||
|
|
||||||
// written and used from MainThread
|
// written and used from MainThread
|
||||||
bool IsVideo() const override;
|
bool IsVideo() const override;
|
||||||
|
|
@ -368,7 +368,7 @@ class MediaPipelineReceiveAudio : public MediaPipelineReceive {
|
||||||
void MakePrincipalPrivate_s() override;
|
void MakePrincipalPrivate_s() override;
|
||||||
|
|
||||||
void Start() override;
|
void Start() override;
|
||||||
void Stop() override;
|
RefPtr<GenericPromise> Stop() override;
|
||||||
|
|
||||||
void OnRtpPacketReceived() override;
|
void OnRtpPacketReceived() override;
|
||||||
|
|
||||||
|
|
@ -399,7 +399,7 @@ class MediaPipelineReceiveVideo : public MediaPipelineReceive {
|
||||||
void MakePrincipalPrivate_s() override;
|
void MakePrincipalPrivate_s() override;
|
||||||
|
|
||||||
void Start() override;
|
void Start() override;
|
||||||
void Stop() override;
|
RefPtr<GenericPromise> Stop() override;
|
||||||
|
|
||||||
void OnRtpPacketReceived() override;
|
void OnRtpPacketReceived() override;
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue