forked from mirrors/gecko-dev
Having another read request on an empty chunk sounds nice, but I guess it's not worth allowing recursion for this edge case. Now the next read request will happen asynchronously by the next OnOutputStreamReady callback, which is similar to what Blink does and the spec recommends. * Blink: https://source.chromium.org/chromium/chromium/src/+/main:third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.cc;l=179-186;drc=059796845b1738dbf28ea76f0e9ff4b6f8787d3a (queues a microtask to prevent recursion) * Spec: https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes (See the note below "chunk steps") Differential Revision: https://phabricator.services.mozilla.com/D173813
436 lines
14 KiB
C++
436 lines
14 KiB
C++
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
|
||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||
|
||
#include "FetchStreamReader.h"
|
||
#include "InternalResponse.h"
|
||
#include "mozilla/ConsoleReportCollector.h"
|
||
#include "mozilla/ErrorResult.h"
|
||
#include "mozilla/StaticAnalysisFunctions.h"
|
||
#include "mozilla/dom/AutoEntryScript.h"
|
||
#include "mozilla/dom/Promise.h"
|
||
#include "mozilla/dom/PromiseBinding.h"
|
||
#include "mozilla/dom/ReadableStream.h"
|
||
#include "mozilla/dom/ReadableStreamDefaultController.h"
|
||
#include "mozilla/dom/ReadableStreamDefaultReader.h"
|
||
#include "mozilla/dom/WorkerPrivate.h"
|
||
#include "mozilla/dom/WorkerRef.h"
|
||
#include "mozilla/HoldDropJSObjects.h"
|
||
#include "mozilla/TaskCategory.h"
|
||
#include "nsContentUtils.h"
|
||
#include "nsDebug.h"
|
||
#include "nsIAsyncInputStream.h"
|
||
#include "nsIPipe.h"
|
||
#include "nsIScriptError.h"
|
||
#include "nsPIDOMWindow.h"
|
||
#include "jsapi.h"
|
||
|
||
namespace mozilla::dom {
|
||
|
||
// Cycle-collecting AddRef/Release for FetchStreamReader.
NS_IMPL_CYCLE_COLLECTING_ADDREF(FetchStreamReader)
NS_IMPL_CYCLE_COLLECTING_RELEASE(FetchStreamReader)

NS_IMPL_CYCLE_COLLECTION_CLASS(FetchStreamReader)

// mGlobal and mReader are the two members handed to the cycle collector, both
// for unlinking and for traversal.
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(FetchStreamReader)
  NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal, mReader)
NS_IMPL_CYCLE_COLLECTION_UNLINK_END

NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(FetchStreamReader)
  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mGlobal, mReader)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END

// The trace section is intentionally empty: no members are listed for tracing.
NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(FetchStreamReader)
NS_IMPL_CYCLE_COLLECTION_TRACE_END

// FetchStreamReader answers QueryInterface for nsIOutputStreamCallback, and
// resolves nsISupports through that same interface.
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchStreamReader)
  NS_INTERFACE_MAP_ENTRY(nsIOutputStreamCallback)
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIOutputStreamCallback)
NS_INTERFACE_MAP_END
|
||
|
||
/* static */
|
||
nsresult FetchStreamReader::Create(JSContext* aCx, nsIGlobalObject* aGlobal,
|
||
FetchStreamReader** aStreamReader,
|
||
nsIInputStream** aInputStream) {
|
||
MOZ_ASSERT(aCx);
|
||
MOZ_ASSERT(aGlobal);
|
||
MOZ_ASSERT(aStreamReader);
|
||
MOZ_ASSERT(aInputStream);
|
||
|
||
RefPtr<FetchStreamReader> streamReader = new FetchStreamReader(aGlobal);
|
||
|
||
nsCOMPtr<nsIAsyncInputStream> pipeIn;
|
||
|
||
NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(streamReader->mPipeOut),
|
||
true, true, 0, 0);
|
||
|
||
if (!NS_IsMainThread()) {
|
||
WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
|
||
MOZ_ASSERT(workerPrivate);
|
||
|
||
RefPtr<StrongWorkerRef> workerRef = StrongWorkerRef::Create(
|
||
workerPrivate, "FetchStreamReader", [streamReader]() {
|
||
MOZ_ASSERT(streamReader);
|
||
MOZ_ASSERT(streamReader->mWorkerRef);
|
||
|
||
streamReader->CloseAndRelease(
|
||
streamReader->mWorkerRef->Private()->GetJSContext(),
|
||
NS_ERROR_DOM_INVALID_STATE_ERR);
|
||
});
|
||
|
||
if (NS_WARN_IF(!workerRef)) {
|
||
streamReader->mPipeOut->CloseWithStatus(NS_ERROR_DOM_INVALID_STATE_ERR);
|
||
return NS_ERROR_DOM_INVALID_STATE_ERR;
|
||
}
|
||
|
||
// These 2 objects create a ref-cycle here that is broken when the stream is
|
||
// closed or the worker shutsdown.
|
||
streamReader->mWorkerRef = std::move(workerRef);
|
||
}
|
||
|
||
pipeIn.forget(aInputStream);
|
||
streamReader.forget(aStreamReader);
|
||
return NS_OK;
|
||
}
|
||
|
||
// Remembers the global, looks up the event target for TaskCategory::Other on
// it, and registers this object via HoldJSObjects (undone by DropJSObjects in
// the destructor).
FetchStreamReader::FetchStreamReader(nsIGlobalObject* aGlobal)
    : mGlobal(aGlobal),
      mOwningEventTarget(aGlobal->EventTargetFor(TaskCategory::Other)) {
  MOZ_ASSERT(aGlobal);

  mozilla::HoldJSObjects(this);
}
|
||
|
||
// Closes the stream if that has not happened yet, then releases the JS object
// hold taken in the constructor.
FetchStreamReader::~FetchStreamReader() {
  // No JSContext is passed, so CloseAndRelease will not try to cancel the
  // reader; it only tears down the remaining state.
  CloseAndRelease(nullptr, NS_BASE_STREAM_CLOSED);

  mozilla::DropJSObjects(this);
}
|
||
|
||
// If a context is provided, an attempt will be made to cancel the reader. The
|
||
// only situation where we don't expect to have a context is when closure is
|
||
// being triggered from the destructor or the WorkerRef is notifying. If
|
||
// we're at the destructor, it's far too late to cancel anything. And if the
|
||
// WorkerRef is being notified, the global is going away, so there's also
|
||
// no need to do further JS work.
|
||
void FetchStreamReader::CloseAndRelease(JSContext* aCx, nsresult aStatus) {
|
||
NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
|
||
|
||
if (mStreamClosed) {
|
||
// Already closed.
|
||
return;
|
||
}
|
||
|
||
RefPtr<FetchStreamReader> kungFuDeathGrip = this;
|
||
if (aCx && mReader) {
|
||
ErrorResult rv;
|
||
if (aStatus == NS_ERROR_DOM_WRONG_TYPE_ERR) {
|
||
rv.ThrowTypeError<MSG_FETCH_BODY_WRONG_TYPE>();
|
||
} else {
|
||
rv = aStatus;
|
||
}
|
||
JS::Rooted<JS::Value> errorValue(aCx);
|
||
if (ToJSValue(aCx, std::move(rv), &errorValue)) {
|
||
IgnoredErrorResult ignoredError;
|
||
// It's currently safe to cancel an already closed reader because, per the
|
||
// comments in ReadableStream::cancel() conveying the spec, step 2 of
|
||
// 3.4.3 that specified ReadableStreamCancel is: If stream.[[state]] is
|
||
// "closed", return a new promise resolved with undefined.
|
||
RefPtr<Promise> cancelResultPromise =
|
||
MOZ_KnownLive(mReader)->Cancel(aCx, errorValue, ignoredError);
|
||
NS_WARNING_ASSERTION(!ignoredError.Failed(),
|
||
"Failed to cancel stream during close and release");
|
||
if (cancelResultPromise) {
|
||
bool setHandled = cancelResultPromise->SetAnyPromiseIsHandled();
|
||
NS_WARNING_ASSERTION(setHandled,
|
||
"Failed to mark cancel promise as handled.");
|
||
(void)setHandled;
|
||
}
|
||
}
|
||
|
||
// We don't want to propagate exceptions during the cleanup.
|
||
JS_ClearPendingException(aCx);
|
||
}
|
||
|
||
mStreamClosed = true;
|
||
|
||
mGlobal = nullptr;
|
||
|
||
if (mPipeOut) {
|
||
mPipeOut->CloseWithStatus(aStatus);
|
||
}
|
||
mPipeOut = nullptr;
|
||
|
||
mWorkerRef = nullptr;
|
||
|
||
mReader = nullptr;
|
||
mBuffer.Clear();
|
||
}
|
||
|
||
// https://fetch.spec.whatwg.org/#body-incrementally-read
|
||
void FetchStreamReader::StartConsuming(JSContext* aCx, ReadableStream* aStream,
|
||
ReadableStreamDefaultReader** aReader,
|
||
ErrorResult& aRv) {
|
||
MOZ_DIAGNOSTIC_ASSERT(!mReader);
|
||
MOZ_DIAGNOSTIC_ASSERT(aStream);
|
||
|
||
// Step 2: Let reader be the result of getting a reader for body’s stream.
|
||
RefPtr<ReadableStreamDefaultReader> reader = aStream->GetReader(aRv);
|
||
if (aRv.Failed()) {
|
||
CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
|
||
return;
|
||
}
|
||
|
||
mReader = reader;
|
||
reader.forget(aReader);
|
||
|
||
mAsyncWaitWorkerRef = mWorkerRef;
|
||
aRv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
|
||
if (NS_WARN_IF(aRv.Failed())) {
|
||
mAsyncWaitWorkerRef = nullptr;
|
||
CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
|
||
}
|
||
}
|
||
|
||
struct FetchReadRequest : public ReadRequest {
|
||
public:
|
||
NS_DECL_ISUPPORTS_INHERITED
|
||
NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(FetchReadRequest, ReadRequest)
|
||
|
||
explicit FetchReadRequest(FetchStreamReader* aReader)
|
||
: mFetchStreamReader(aReader) {}
|
||
|
||
MOZ_CAN_RUN_SCRIPT_BOUNDARY
|
||
void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
|
||
ErrorResult& aRv) override {
|
||
mFetchStreamReader->ChunkSteps(aCx, aChunk, aRv);
|
||
}
|
||
|
||
MOZ_CAN_RUN_SCRIPT_BOUNDARY
|
||
void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {
|
||
mFetchStreamReader->CloseSteps(aCx, aRv);
|
||
}
|
||
|
||
MOZ_CAN_RUN_SCRIPT_BOUNDARY
|
||
void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
|
||
ErrorResult& aRv) override {
|
||
mFetchStreamReader->ErrorSteps(aCx, aError, aRv);
|
||
}
|
||
|
||
protected:
|
||
virtual ~FetchReadRequest() = default;
|
||
|
||
MOZ_KNOWN_LIVE RefPtr<FetchStreamReader> mFetchStreamReader;
|
||
};
|
||
|
||
NS_IMPL_CYCLE_COLLECTION_INHERITED(FetchReadRequest, ReadRequest,
|
||
mFetchStreamReader)
|
||
NS_IMPL_ADDREF_INHERITED(FetchReadRequest, ReadRequest)
|
||
NS_IMPL_RELEASE_INHERITED(FetchReadRequest, ReadRequest)
|
||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchReadRequest)
|
||
NS_INTERFACE_MAP_END_INHERITING(ReadRequest)
|
||
|
||
// nsIOutputStreamCallback interface
|
||
MOZ_CAN_RUN_SCRIPT_BOUNDARY
|
||
NS_IMETHODIMP
|
||
FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
|
||
NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
|
||
if (mStreamClosed) {
|
||
mAsyncWaitWorkerRef = nullptr;
|
||
return NS_OK;
|
||
}
|
||
|
||
AutoEntryScript aes(mGlobal, "ReadableStreamReader.read", !mWorkerRef);
|
||
if (!Process(aes.cx())) {
|
||
// We're done processing data, and haven't queued up a new AsyncWait - we
|
||
// can clear our mAsyncWaitWorkerRef.
|
||
mAsyncWaitWorkerRef = nullptr;
|
||
}
|
||
return NS_OK;
|
||
}
|
||
|
||
// Performs one step of moving data from the ReadableStream into the pipe.
//
// If buffered data remains it is written out (WriteBuffer re-arms the wait).
// Otherwise a closure-only wait is registered on the pipe and, unless a read
// is already in flight, a new chunk is requested from the reader.
//
// Returns false when the stream was closed without an AsyncWait having been
// queued by this call; returns true otherwise.
bool FetchStreamReader::Process(JSContext* aCx) {
  NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
  MOZ_ASSERT(mReader);

  if (!mBuffer.IsEmpty()) {
    // Flush leftovers from the previous chunk before reading anything new.
    const nsresult flushRv = WriteBuffer();
    if (NS_WARN_IF(NS_FAILED(flushRv))) {
      CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
      return false;
    }
    return true;
  }

  // Check if the output stream has already been closed. This lets us propagate
  // errors eagerly, and detect output stream closures even when we have no
  // data to write.
  if (NS_WARN_IF(NS_FAILED(mPipeOut->StreamStatus()))) {
    CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
    return false;
  }

  // We're waiting on new data - set up a WAIT_CLOSURE_ONLY callback so we
  // notice if the reader closes.
  const nsresult waitRv = mPipeOut->AsyncWait(
      this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY, 0, mOwningEventTarget);
  if (NS_WARN_IF(NS_FAILED(waitRv))) {
    CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
    return false;
  }

  // If we already have an outstanding read request, don't start another one
  // concurrently.
  if (mHasOutstandingReadRequest) {
    return true;
  }

  // https://fetch.spec.whatwg.org/#incrementally-read-loop
  // The below very loosely tries to implement the incrementally-read-loop
  // from the fetch spec.
  // Step 2: Read a chunk from reader given readRequest.
  RefPtr<ReadRequest> chunkRequest = new FetchReadRequest(this);
  RefPtr<ReadableStreamDefaultReader> activeReader = mReader;
  mHasOutstandingReadRequest = true;

  IgnoredErrorResult readError;
  activeReader->ReadChunk(aCx, *chunkRequest, readError);
  if (NS_WARN_IF(readError.Failed())) {
    // Let's close the stream.
    mHasOutstandingReadRequest = false;
    CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
    // Still report true below, as we've already called `AsyncWait`.
  }

  return true;
}
|
||
|
||
// Chunk steps of the read request; this roughly implements the chunk steps
// from https://fetch.spec.whatwg.org/#incrementally-read-loop.
//
// The chunk must be a Uint8Array. Its bytes are copied into mBuffer and then
// written to the pipe. Any failure closes the stream: a wrong chunk type with
// NS_ERROR_DOM_WRONG_TYPE_ERR, a failed copy with NS_ERROR_OUT_OF_MEMORY, and
// a failed write with NS_ERROR_DOM_ABORT_ERR.
void FetchStreamReader::ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
                                   ErrorResult& aRv) {
  // The pending read has been answered.
  mHasOutstandingReadRequest = false;

  // Step 2. If chunk is not a Uint8Array object, then set continueAlgorithm to
  // this step: run processBodyError given a TypeError.
  RootedSpiderMonkeyInterface<Uint8Array> chunk(aCx);
  const bool isUint8Array =
      aChunk.isObject() && chunk.Init(&aChunk.toObject());
  if (!isUint8Array) {
    CloseAndRelease(aCx, NS_ERROR_DOM_WRONG_TYPE_ERR);
    return;
  }
  chunk.ComputeState();

  MOZ_DIAGNOSTIC_ASSERT(mBuffer.IsEmpty());

  // Let's take a copy of the data.
  // FIXME: We could sometimes avoid this copy by trying to write `chunk`
  // directly into `mPipeOut` eagerly, and only filling `mBuffer` if there
  // isn't enough space in the pipe's buffer.
  if (!mBuffer.AppendElements(chunk.Data(), chunk.Length(), fallible)) {
    CloseAndRelease(aCx, NS_ERROR_OUT_OF_MEMORY);
    return;
  }

  // The whole chunk is still unwritten.
  mBufferOffset = 0;
  mBufferRemaining = chunk.Length();

  const nsresult writeRv = WriteBuffer();
  if (NS_WARN_IF(NS_FAILED(writeRv))) {
    CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
  }
}
|
||
|
||
// Close steps of the read request: the pending read is over, so clear the
// outstanding-read flag and close our side with NS_BASE_STREAM_CLOSED.
void FetchStreamReader::CloseSteps(JSContext* aCx, ErrorResult& aRv) {
  mHasOutstandingReadRequest = false;
  CloseAndRelease(aCx, NS_BASE_STREAM_CLOSED);
}
|
||
|
||
// Error steps of the read request: clear the outstanding-read flag, report
// aError to the console, then close our side with NS_ERROR_FAILURE.
void FetchStreamReader::ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
                                   ErrorResult& aRv) {
  mHasOutstandingReadRequest = false;
  // Report before closing: CloseAndRelease clears mGlobal, which the report
  // path reads on the main thread.
  ReportErrorToConsole(aCx, aError);
  CloseAndRelease(aCx, NS_ERROR_FAILURE);
}
|
||
|
||
// Writes as much of the unwritten tail of mBuffer to mPipeOut as the pipe
// accepts, then registers a new wait on the pipe.
//
// mBufferOffset / mBufferRemaining track the unwritten region; once everything
// has been written the buffer is cleared. A would-block result from the pipe
// just stops writing for now. Returns the failing status if a write or the
// AsyncWait call fails, and NS_OK otherwise.
nsresult FetchStreamReader::WriteBuffer() {
  MOZ_ASSERT(mBuffer.Length() == (mBufferOffset + mBufferRemaining));

  char* bytes = reinterpret_cast<char*>(mBuffer.Elements());

  while (mBufferRemaining > 0) {
    uint32_t bytesWritten = 0;
    const nsresult writeRv =
        mPipeOut->Write(bytes + mBufferOffset, mBufferRemaining, &bytesWritten);

    if (writeRv == NS_BASE_STREAM_WOULD_BLOCK) {
      // The pipe is full; the wait registered below will call us back.
      break;
    }

    if (NS_WARN_IF(NS_FAILED(writeRv))) {
      return writeRv;
    }

    MOZ_ASSERT(bytesWritten <= mBufferRemaining);
    mBufferRemaining -= bytesWritten;
    mBufferOffset += bytesWritten;

    if (mBufferRemaining == 0) {
      // Everything went out; the loop condition ends the loop.
      mBuffer.Clear();
    }
  }

  const nsresult waitRv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
  if (NS_WARN_IF(NS_FAILED(waitRv))) {
    return waitRv;
  }

  return NS_OK;
}
|
||
|
||
// Reports a stream read failure to the console.
//
// aValue is the error value of the failed read. Its source location and
// string form are extracted and wrapped in a "ReadableStreamReadingFailed"
// console report. On the main thread the report is flushed immediately, using
// the inner window id of mGlobal when mGlobal is a window (0 otherwise). On
// any other thread the flush is dispatched to the main thread through the
// WorkerPrivate obtained from aCx, using that worker's window id.
//
// If no WorkerPrivate can be obtained off the main thread, the report is
// dropped (with a warning) instead of dereferencing a null pointer.
void FetchStreamReader::ReportErrorToConsole(JSContext* aCx,
                                             JS::Handle<JS::Value> aValue) {
  nsCString sourceSpec;
  uint32_t line = 0;
  uint32_t column = 0;
  nsString valueString;

  nsContentUtils::ExtractErrorValues(aCx, aValue, sourceSpec, &line, &column,
                                     valueString);

  nsTArray<nsString> params;
  params.AppendElement(valueString);

  RefPtr<ConsoleReportCollector> reporter = new ConsoleReportCollector();
  reporter->AddConsoleReport(nsIScriptError::errorFlag,
                             "ReadableStreamReader.read"_ns,
                             nsContentUtils::eDOM_PROPERTIES, sourceSpec, line,
                             column, "ReadableStreamReadingFailed"_ns, params);

  uint64_t innerWindowId = 0;

  if (NS_IsMainThread()) {
    nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);
    if (window) {
      innerWindowId = window->WindowID();
    }
    reporter->FlushReportsToConsole(innerWindowId);
    return;
  }

  WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
  if (NS_WARN_IF(!workerPrivate)) {
    // The dispatch below goes through workerPrivate, so without one there is
    // no way to deliver the report from this thread.
    return;
  }
  innerWindowId = workerPrivate->WindowID();

  // The collector and the window id are captured by value so the runnable
  // does not depend on this object.
  RefPtr<Runnable> r = NS_NewRunnableFunction(
      "FetchStreamReader::ReportErrorToConsole", [reporter, innerWindowId]() {
        reporter->FlushReportsToConsole(innerWindowId);
      });

  workerPrivate->DispatchToMainThread(r.forget());
}
|
||
|
||
} // namespace mozilla::dom
|