forked from mirrors/gecko-dev
Bug 1770630 - Worker stream readers should contribute to busy count. r=dom-worker-reviewers,jstutte
Differential Revision: https://phabricator.services.mozilla.com/D149185
This commit is contained in:
parent
73aa521160
commit
51a0dc12c2
7 changed files with 33 additions and 52 deletions
|
|
@ -115,8 +115,8 @@ void BodyStream::Create(JSContext* aCx, BodyStreamHolder* aStreamHolder,
|
||||||
WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
|
WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
|
||||||
MOZ_ASSERT(workerPrivate);
|
MOZ_ASSERT(workerPrivate);
|
||||||
|
|
||||||
RefPtr<WeakWorkerRef> workerRef =
|
RefPtr<StrongWorkerRef> workerRef =
|
||||||
WeakWorkerRef::Create(workerPrivate, [stream]() { stream->Close(); });
|
StrongWorkerRef::Create(workerPrivate, "BodyStream", [stream]() { stream->Close(); });
|
||||||
|
|
||||||
if (NS_WARN_IF(!workerRef)) {
|
if (NS_WARN_IF(!workerRef)) {
|
||||||
aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
|
aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
|
||||||
|
|
@ -215,6 +215,7 @@ already_AddRefed<Promise> BodyStream::PullCallback(
|
||||||
ErrorPropagation(aCx, lock, stream, rv);
|
ErrorPropagation(aCx, lock, stream, rv);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
mAsyncWaitWorkerRef = mWorkerRef;
|
||||||
|
|
||||||
// All good.
|
// All good.
|
||||||
return resolvedWithUndefinedPromise.forget();
|
return resolvedWithUndefinedPromise.forget();
|
||||||
|
|
@ -272,6 +273,7 @@ void BodyStream::WriteIntoReadRequestBuffer(JSContext* aCx,
|
||||||
ErrorPropagation(aCx, lock, aStream, rv);
|
ErrorPropagation(aCx, lock, aStream, rv);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
mAsyncWaitWorkerRef = mWorkerRef;
|
||||||
|
|
||||||
// All good.
|
// All good.
|
||||||
}
|
}
|
||||||
|
|
@ -429,6 +431,7 @@ BodyStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
|
||||||
NO_THREAD_SAFETY_ANALYSIS {
|
NO_THREAD_SAFETY_ANALYSIS {
|
||||||
AssertIsOnOwningThread();
|
AssertIsOnOwningThread();
|
||||||
MOZ_DIAGNOSTIC_ASSERT(aStream);
|
MOZ_DIAGNOSTIC_ASSERT(aStream);
|
||||||
|
mAsyncWaitWorkerRef = nullptr;
|
||||||
|
|
||||||
// Acquire |mMutex| in order to safely inspect |mState| and use |mGlobal|.
|
// Acquire |mMutex| in order to safely inspect |mState| and use |mGlobal|.
|
||||||
Maybe<MutexSingleWriterAutoLock> lock;
|
Maybe<MutexSingleWriterAutoLock> lock;
|
||||||
|
|
@ -581,7 +584,7 @@ void BodyStream::ReleaseObjects(const MutexSingleWriterAutoLock& aProofOfLock) {
|
||||||
// Let's dispatch a WorkerControlRunnable if the owning thread is a worker.
|
// Let's dispatch a WorkerControlRunnable if the owning thread is a worker.
|
||||||
if (mWorkerRef) {
|
if (mWorkerRef) {
|
||||||
RefPtr<WorkerShutdown> r =
|
RefPtr<WorkerShutdown> r =
|
||||||
new WorkerShutdown(mWorkerRef->GetUnsafePrivate(), this);
|
new WorkerShutdown(mWorkerRef->Private(), this);
|
||||||
Unused << NS_WARN_IF(!r->Dispatch());
|
Unused << NS_WARN_IF(!r->Dispatch());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ class ErrorResult;
|
||||||
namespace dom {
|
namespace dom {
|
||||||
|
|
||||||
class BodyStream;
|
class BodyStream;
|
||||||
class WeakWorkerRef;
|
class StrongWorkerRef;
|
||||||
class ReadableStream;
|
class ReadableStream;
|
||||||
class ReadableStreamController;
|
class ReadableStreamController;
|
||||||
|
|
||||||
|
|
@ -200,7 +200,8 @@ class BodyStream final : public nsIInputStreamCallback,
|
||||||
nsCOMPtr<nsIInputStream> mOriginalInputStream;
|
nsCOMPtr<nsIInputStream> mOriginalInputStream;
|
||||||
nsCOMPtr<nsIAsyncInputStream> mInputStream;
|
nsCOMPtr<nsIAsyncInputStream> mInputStream;
|
||||||
|
|
||||||
RefPtr<WeakWorkerRef> mWorkerRef;
|
RefPtr<StrongWorkerRef> mWorkerRef;
|
||||||
|
RefPtr<StrongWorkerRef> mAsyncWaitWorkerRef;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace dom
|
} // namespace dom
|
||||||
|
|
|
||||||
|
|
@ -76,16 +76,14 @@ nsresult FetchStreamReader::Create(JSContext* aCx, nsIGlobalObject* aGlobal,
|
||||||
WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
|
WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
|
||||||
MOZ_ASSERT(workerPrivate);
|
MOZ_ASSERT(workerPrivate);
|
||||||
|
|
||||||
RefPtr<WeakWorkerRef> workerRef =
|
RefPtr<StrongWorkerRef> workerRef = StrongWorkerRef::Create(
|
||||||
WeakWorkerRef::Create(workerPrivate, [streamReader]() {
|
workerPrivate, "FetchStreamReader", [streamReader]() {
|
||||||
MOZ_ASSERT(streamReader);
|
MOZ_ASSERT(streamReader);
|
||||||
MOZ_ASSERT(streamReader->mWorkerRef);
|
MOZ_ASSERT(streamReader->mWorkerRef);
|
||||||
|
|
||||||
WorkerPrivate* workerPrivate = streamReader->mWorkerRef->GetPrivate();
|
streamReader->CloseAndRelease(
|
||||||
MOZ_ASSERT(workerPrivate);
|
streamReader->mWorkerRef->Private()->GetJSContext(),
|
||||||
|
NS_ERROR_DOM_INVALID_STATE_ERR);
|
||||||
streamReader->CloseAndRelease(workerPrivate->GetJSContext(),
|
|
||||||
NS_ERROR_DOM_INVALID_STATE_ERR);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
if (NS_WARN_IF(!workerRef)) {
|
if (NS_WARN_IF(!workerRef)) {
|
||||||
|
|
@ -194,6 +192,7 @@ void FetchStreamReader::StartConsuming(JSContext* aCx, ReadableStream* aStream,
|
||||||
if (NS_WARN_IF(aRv.Failed())) {
|
if (NS_WARN_IF(aRv.Failed())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
mAsyncWaitWorkerRef = mWorkerRef;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct FetchReadRequest : public ReadRequest {
|
struct FetchReadRequest : public ReadRequest {
|
||||||
|
|
@ -236,6 +235,7 @@ MOZ_CAN_RUN_SCRIPT_BOUNDARY
|
||||||
NS_IMETHODIMP
|
NS_IMETHODIMP
|
||||||
FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
|
FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
|
||||||
NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
|
NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
|
||||||
|
mAsyncWaitWorkerRef = nullptr;
|
||||||
if (mStreamClosed) {
|
if (mStreamClosed) {
|
||||||
return NS_OK;
|
return NS_OK;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ namespace mozilla::dom {
|
||||||
|
|
||||||
class ReadableStream;
|
class ReadableStream;
|
||||||
class ReadableStreamDefaultReader;
|
class ReadableStreamDefaultReader;
|
||||||
class WeakWorkerRef;
|
class StrongWorkerRef;
|
||||||
|
|
||||||
class FetchStreamReader final : public nsIOutputStreamCallback {
|
class FetchStreamReader final : public nsIOutputStreamCallback {
|
||||||
public:
|
public:
|
||||||
|
|
@ -66,7 +66,8 @@ class FetchStreamReader final : public nsIOutputStreamCallback {
|
||||||
|
|
||||||
nsCOMPtr<nsIAsyncOutputStream> mPipeOut;
|
nsCOMPtr<nsIAsyncOutputStream> mPipeOut;
|
||||||
|
|
||||||
RefPtr<WeakWorkerRef> mWorkerRef;
|
RefPtr<StrongWorkerRef> mWorkerRef;
|
||||||
|
RefPtr<StrongWorkerRef> mAsyncWaitWorkerRef;
|
||||||
|
|
||||||
RefPtr<ReadableStreamDefaultReader> mReader;
|
RefPtr<ReadableStreamDefaultReader> mReader;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -295,14 +295,13 @@ class WorkerStreamOwner final {
|
||||||
RefPtr<WorkerStreamOwner> self =
|
RefPtr<WorkerStreamOwner> self =
|
||||||
new WorkerStreamOwner(aStream, std::move(target));
|
new WorkerStreamOwner(aStream, std::move(target));
|
||||||
|
|
||||||
self->mWorkerRef = WeakWorkerRef::Create(aWorker, [self]() {
|
self->mWorkerRef = StrongWorkerRef::Create(aWorker, "JSStreamConsumer", [self]() {
|
||||||
if (self->mStream) {
|
if (self->mStream) {
|
||||||
// If this Close() calls JSStreamConsumer::OnInputStreamReady and drops
|
// If this Close() calls JSStreamConsumer::OnInputStreamReady and drops
|
||||||
// the last reference to the JSStreamConsumer, 'this' will not be
|
// the last reference to the JSStreamConsumer, 'this' will not be
|
||||||
// destroyed since ~JSStreamConsumer() only enqueues a release proxy.
|
// destroyed since ~JSStreamConsumer() only enqueues a release proxy.
|
||||||
self->mStream->Close();
|
self->mStream->Close();
|
||||||
self->mStream = nullptr;
|
self->mStream = nullptr;
|
||||||
self->mWorkerRef = nullptr;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -326,7 +325,7 @@ class WorkerStreamOwner final {
|
||||||
// Read from any thread but only set/cleared on the worker thread. The
|
// Read from any thread but only set/cleared on the worker thread. The
|
||||||
// lifecycle of WorkerStreamOwner prevents concurrent read/clear.
|
// lifecycle of WorkerStreamOwner prevents concurrent read/clear.
|
||||||
nsCOMPtr<nsIAsyncInputStream> mStream;
|
nsCOMPtr<nsIAsyncInputStream> mStream;
|
||||||
RefPtr<WeakWorkerRef> mWorkerRef;
|
RefPtr<StrongWorkerRef> mWorkerRef;
|
||||||
nsCOMPtr<nsIEventTarget> mOwningEventTarget;
|
nsCOMPtr<nsIEventTarget> mOwningEventTarget;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -624,15 +624,15 @@ FileReader::Notify(nsITimer* aTimer) {
|
||||||
// InputStreamCallback
|
// InputStreamCallback
|
||||||
NS_IMETHODIMP
|
NS_IMETHODIMP
|
||||||
FileReader::OnInputStreamReady(nsIAsyncInputStream* aStream) {
|
FileReader::OnInputStreamReady(nsIAsyncInputStream* aStream) {
|
||||||
if (mReadyState != LOADING || aStream != mAsyncStream) {
|
|
||||||
return NS_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
// We use this class to decrease the busy counter at the end of this method.
|
// We use this class to decrease the busy counter at the end of this method.
|
||||||
// In theory we can do it immediatelly but, for debugging reasons, we want to
|
// In theory we can do it immediatelly but, for debugging reasons, we want to
|
||||||
// be 100% sure we have a workerRef when OnLoadEnd() is called.
|
// be 100% sure we have a workerRef when OnLoadEnd() is called.
|
||||||
FileReaderDecreaseBusyCounter RAII(this);
|
FileReaderDecreaseBusyCounter RAII(this);
|
||||||
|
|
||||||
|
if (mReadyState != LOADING || aStream != mAsyncStream) {
|
||||||
|
return NS_OK;
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t count;
|
uint64_t count;
|
||||||
nsresult rv = aStream->Available(&count);
|
nsresult rv = aStream->Available(&count);
|
||||||
|
|
||||||
|
|
@ -731,14 +731,7 @@ void FileReader::Abort() {
|
||||||
|
|
||||||
MOZ_ASSERT(mReadyState == LOADING);
|
MOZ_ASSERT(mReadyState == LOADING);
|
||||||
|
|
||||||
ClearProgressEventTimer();
|
Cleanup();
|
||||||
|
|
||||||
if (mAsyncWaitRunnable) {
|
|
||||||
mAsyncWaitRunnable->Cancel();
|
|
||||||
mAsyncWaitRunnable = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
mReadyState = DONE;
|
|
||||||
|
|
||||||
// XXX The spec doesn't say this
|
// XXX The spec doesn't say this
|
||||||
mError = DOMException::Create(NS_ERROR_DOM_ABORT_ERR);
|
mError = DOMException::Create(NS_ERROR_DOM_ABORT_ERR);
|
||||||
|
|
@ -747,30 +740,12 @@ void FileReader::Abort() {
|
||||||
SetDOMStringToNull(mResult);
|
SetDOMStringToNull(mResult);
|
||||||
mResultArrayBuffer = nullptr;
|
mResultArrayBuffer = nullptr;
|
||||||
|
|
||||||
// If we have the stream and the busy-count is not 0, it means that we are
|
|
||||||
// waiting for an OnInputStreamReady() call. Let's abort the current
|
|
||||||
// AsyncWait() calling it again with a nullptr callback. See
|
|
||||||
// nsIAsyncInputStream.idl.
|
|
||||||
if (mAsyncStream && mBusyCount) {
|
|
||||||
mAsyncStream->AsyncWait(/* callback */ nullptr,
|
|
||||||
/* aFlags*/ 0,
|
|
||||||
/* aRequestedCount */ 0, mTarget);
|
|
||||||
DecreaseBusyCounter();
|
|
||||||
MOZ_ASSERT(mBusyCount == 0);
|
|
||||||
|
|
||||||
mAsyncStream->Close();
|
|
||||||
}
|
|
||||||
|
|
||||||
mAsyncStream = nullptr;
|
|
||||||
mBlob = nullptr;
|
mBlob = nullptr;
|
||||||
|
|
||||||
// Clean up memory buffer
|
|
||||||
FreeFileData();
|
|
||||||
|
|
||||||
// Dispatch the events
|
// Dispatch the events
|
||||||
DispatchProgressEvent(nsLiteralString(ABORT_STR));
|
DispatchProgressEvent(nsLiteralString(ABORT_STR));
|
||||||
DispatchProgressEvent(nsLiteralString(LOADEND_STR));
|
DispatchProgressEvent(nsLiteralString(LOADEND_STR));
|
||||||
} // namespace dom
|
}
|
||||||
|
|
||||||
nsresult FileReader::IncreaseBusyCounter() {
|
nsresult FileReader::IncreaseBusyCounter() {
|
||||||
if (mWeakWorkerRef && mBusyCount++ == 0) {
|
if (mWeakWorkerRef && mBusyCount++ == 0) {
|
||||||
|
|
@ -800,7 +775,7 @@ void FileReader::DecreaseBusyCounter() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void FileReader::Shutdown() {
|
void FileReader::Cleanup() {
|
||||||
mReadyState = DONE;
|
mReadyState = DONE;
|
||||||
|
|
||||||
if (mAsyncWaitRunnable) {
|
if (mAsyncWaitRunnable) {
|
||||||
|
|
@ -816,11 +791,12 @@ void FileReader::Shutdown() {
|
||||||
ClearProgressEventTimer();
|
ClearProgressEventTimer();
|
||||||
FreeFileData();
|
FreeFileData();
|
||||||
mResultArrayBuffer = nullptr;
|
mResultArrayBuffer = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
if (mWeakWorkerRef && mBusyCount != 0) {
|
void FileReader::Shutdown() {
|
||||||
mStrongWorkerRef = nullptr;
|
Cleanup();
|
||||||
|
if (mWeakWorkerRef) {
|
||||||
mWeakWorkerRef = nullptr;
|
mWeakWorkerRef = nullptr;
|
||||||
mBusyCount = 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -155,6 +155,7 @@ class FileReader final : public DOMEventTargetHelper,
|
||||||
nsresult IncreaseBusyCounter();
|
nsresult IncreaseBusyCounter();
|
||||||
void DecreaseBusyCounter();
|
void DecreaseBusyCounter();
|
||||||
|
|
||||||
|
void Cleanup();
|
||||||
void Shutdown();
|
void Shutdown();
|
||||||
|
|
||||||
char* mFileData;
|
char* mFileData;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue