Bug 1552607 - p2: check sample session ID when processing buffers. r=jya

Because IPC call runs asynchronously in both remote decoder process and
content process, ProcessOutput() for buffers prior to Flush() could be
scheduled to run after the flush promise is resolved, and Codec.queueInput()
could be preempted and processes prior sample after flush.
To help check the validness of buffers, a session ID increased by flush
is added to both RemoteDataDecoder and remote codec service and will be
passed through IPC. If the passed ID doesn't agree with current session
ID, it means the buffer doesn't belong to current session and should be
discard.

Differential Revision: https://phabricator.services.mozilla.com/D36382

--HG--
extra : moz-landing-system : lando
This commit is contained in:
John Lin 2019-07-02 18:12:44 +00:00
parent a06092b96c
commit a987fc3b69
6 changed files with 35 additions and 14 deletions

View file

@ -437,7 +437,7 @@ class RemoteAudioDecoder : public RemoteDataDecoder {
return mFirstDemuxedSampleTime->ToMicroseconds() > aTime;
}
bool ShouldDiscardSample() const {
bool ShouldDiscardSample(int64_t aSession) const {
AssertOnTaskQueue();
// HandleOutput() runs on Android binder thread pool and could be preempted
// by RemoteDateDecoder task queue. That means ProcessOutput() could be
@ -445,7 +445,8 @@ class RemoteAudioDecoder : public RemoteDataDecoder {
// sample which is returned after calling Shutdown() and Flush(). We can
// check mFirstDemuxedSampleTime to know whether the Flush() has been
// called, becasue it would be reset in Flush().
return GetState() == State::SHUTDOWN || !mFirstDemuxedSampleTime;
return GetState() == State::SHUTDOWN || !mFirstDemuxedSampleTime ||
mSession != aSession;
}
// Param and LocalRef are only valid for the duration of a JNI method call.
@ -466,7 +467,7 @@ class RemoteAudioDecoder : public RemoteDataDecoder {
AssertOnTaskQueue();
if (ShouldDiscardSample() || !aBuffer->IsValid()) {
if (ShouldDiscardSample(aSample->Session()) || !aBuffer->IsValid()) {
aSample->Dispose();
return;
}
@ -590,6 +591,7 @@ RemoteDataDecoder::RemoteDataDecoder(MediaData::Type aType,
mFormat(aFormat),
mDrmStubId(aDrmStubId),
mTaskQueue(aTaskQueue),
mSession(0),
mNumPendingInputs(0) {}
RefPtr<MediaDataDecoder::FlushPromise> RemoteDataDecoder::Flush() {
@ -606,6 +608,7 @@ RefPtr<MediaDataDecoder::FlushPromise> RemoteDataDecoder::ProcessFlush() {
mDecodePromise.RejectIfExists(NS_ERROR_DOM_MEDIA_CANCELED, __func__);
mDrainPromise.RejectIfExists(NS_ERROR_DOM_MEDIA_CANCELED, __func__);
SetState(State::DRAINED);
mSession++;
mJavaDecoder->Flush();
return FlushPromise::CreateAndResolve(true, __func__);
}
@ -633,7 +636,7 @@ RefPtr<MediaDataDecoder::DecodePromise> RemoteDataDecoder::Drain() {
SetState(State::DRAINING);
self->mInputBufferInfo->Set(0, 0, -1,
MediaCodec::BUFFER_FLAG_END_OF_STREAM);
mJavaDecoder->Input(nullptr, self->mInputBufferInfo, nullptr);
mJavaDecoder->Input(nullptr, self->mInputBufferInfo, nullptr, mSession);
return p;
});
}
@ -740,7 +743,7 @@ RefPtr<MediaDataDecoder::DecodePromise> RemoteDataDecoder::ProcessDecode(
SetState(State::DRAINABLE);
mInputBufferInfo->Set(0, aSample->Size(), aSample->mTime.ToMicroseconds(), 0);
return mJavaDecoder->Input(bytes, mInputBufferInfo,
GetCryptoInfoFromSample(aSample))
GetCryptoInfoFromSample(aSample), mSession)
? mDecodePromise.Ensure(__func__)
: DecodePromise::CreateAndReject(
MediaResult(NS_ERROR_OUT_OF_MEMORY, __func__), __func__);
@ -828,6 +831,7 @@ void RemoteDataDecoder::DrainComplete() {
}
SetState(State::DRAINED);
ReturnDecodedData();
mSession++;
// Make decoder accept input again.
mJavaDecoder->Flush();
}

View file

@ -82,6 +82,10 @@ class RemoteDataDecoder : public MediaDataDecoder,
// information. Contents must be changed only on mTaskQueue.
java::sdk::BufferInfo::GlobalRef mInputBufferInfo;
// Session ID attached to samples. It's increased every time
// CodecProxy::Flush() is called. Accessed on mTaskqueue only.
int64_t mSession;
private:
enum class PendingOp { INCREASE, DECREASE, CLEAR };
void UpdatePendingInputStatus(PendingOp aOp);

View file

@ -1113,7 +1113,7 @@ int32_t WebrtcMediaCodecVP8VideoRemoteEncoder::Encode(
bufferInfo->Set(0, size, inputImage.render_time_ms() * PR_USEC_PER_MSEC, 0);
}
mJavaEncoder->Input(bytes, bufferInfo, nullptr);
mJavaEncoder->Input(bytes, bufferInfo, nullptr, 0);
return WEBRTC_VIDEO_CODEC_OK;
}

View file

@ -69,6 +69,7 @@ import org.mozilla.gecko.gfx.GeckoSurface;
private Queue<Sample> mDequeuedSamples = new LinkedList<>();
private Queue<Input> mInputSamples = new LinkedList<>();
private boolean mStopped;
private long mSession;
private synchronized Sample onAllocate(final int size) {
Sample sample = mSamplePool.obtainInput(size);
@ -89,10 +90,12 @@ import org.mozilla.gecko.gfx.GeckoSurface;
return;
}
Sample dequeued = mDequeuedSamples.remove();
dequeued.setBufferInfo(sample.info);
dequeued.setCryptoInfo(sample.cryptoInfo);
queueSample(dequeued);
if (sample.session >= mSession) {
Sample dequeued = mDequeuedSamples.remove();
dequeued.setBufferInfo(sample.info);
dequeued.setCryptoInfo(sample.cryptoInfo);
queueSample(dequeued);
}
sample.dispose();
}
@ -210,6 +213,7 @@ import org.mozilla.gecko.gfx.GeckoSurface;
mDequeuedSamples.clear();
mAvailableInputBuffers.clear();
mSession++;
}
private synchronized void start() {
@ -243,6 +247,7 @@ import org.mozilla.gecko.gfx.GeckoSurface;
private boolean mHasOutputCapacitySet;
private Queue<Output> mSentOutputs = new LinkedList<>();
private boolean mStopped;
private long mSession;
private OutputProcessor(final boolean renderToSurface) {
mRenderToSurface = renderToSurface;
@ -256,6 +261,7 @@ import org.mozilla.gecko.gfx.GeckoSurface;
try {
Sample output = obtainOutputSample(index, info);
mSentOutputs.add(new Output(output, index));
output.session = mSession;
mCallbacks.onOutput(output);
} catch (Exception e) {
e.printStackTrace();
@ -335,6 +341,7 @@ import org.mozilla.gecko.gfx.GeckoSurface;
mSamplePool.recycleOutput(o.sample);
}
mSentOutputs.clear();
mSession++;
}
private synchronized void start() {

View file

@ -224,7 +224,7 @@ public final class CodecProxy {
@WrapForJNI
public synchronized boolean input(final ByteBuffer bytes, final BufferInfo info,
final CryptoInfo cryptoInfo) {
final CryptoInfo cryptoInfo, final long session) {
if (mRemote == null) {
Log.e(LOGTAG, "cannot send input to an ended codec");
return false;
@ -239,7 +239,7 @@ public final class CodecProxy {
try {
Sample s = mRemote.dequeueInput(info.size);
fillInputBuffer(s.bufferId, bytes, info.offset, info.size);
return sendInput(s.set(info, cryptoInfo));
return sendInput(s.set(info, cryptoInfo, session));
} catch (RemoteException | NullPointerException e) {
Log.e(LOGTAG, "fail to dequeue input buffer", e);
} catch (IOException e) {

View file

@ -22,6 +22,8 @@ public final class Sample implements Parcelable {
EOS = new Sample();
EOS.info.set(0, 0, Long.MIN_VALUE, MediaCodec.BUFFER_FLAG_END_OF_STREAM);
}
@WrapForJNI
public long session;
public static final int NO_BUFFER = -1;
@ -71,9 +73,10 @@ public final class Sample implements Parcelable {
mode);
}
public Sample set(final BufferInfo info, final CryptoInfo cryptoInfo) {
public Sample set(final BufferInfo info, final CryptoInfo cryptoInfo, final long session) {
setBufferInfo(info);
setCryptoInfo(cryptoInfo);
this.session = session;
return this;
}
@ -144,6 +147,7 @@ public final class Sample implements Parcelable {
} else {
s = new Sample();
}
s.session = in.readLong();
s.bufferId = in.readInt();
s.readInfo(in);
s.readCrypto(in);
@ -158,6 +162,7 @@ public final class Sample implements Parcelable {
@Override
public void writeToParcel(final Parcel dest, final int parcelableFlags) {
dest.writeLong(session);
dest.writeInt(bufferId);
writeInfo(dest);
writeCrypto(dest);
@ -206,7 +211,8 @@ public final class Sample implements Parcelable {
}
StringBuilder str = new StringBuilder();
str.append("{ buffer#").append(bufferId).
str.append("{ session#:").append(session).
append(", buffer#").append(bufferId).
append(", info=").
append("{ offset=").append(info.offset).
append(", size=").append(info.size).