forked from mirrors/gecko-dev
		
	
		
			
				
	
	
		
			280 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			280 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
 | |
| /* vim:set ts=2 sw=2 sts=2 et cindent: */
 | |
| /* This Source Code Form is subject to the terms of the Mozilla Public
 | |
|  * License, v. 2.0. If a copy of the MPL was not distributed with this
 | |
|  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 | |
| 
 | |
| #include "mozilla/dom/UnderlyingSinkCallbackHelpers.h"
 | |
| #include "StreamUtils.h"
 | |
| #include "mozilla/dom/UnionTypes.h"
 | |
| #include "mozilla/dom/WebTransportError.h"
 | |
| #include "nsHttp.h"
 | |
| 
 | |
| using namespace mozilla::dom;
 | |
| 
 | |
| NS_IMPL_CYCLE_COLLECTION(UnderlyingSinkAlgorithmsBase)
 | |
| NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSinkAlgorithmsBase)
 | |
| NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSinkAlgorithmsBase)
 | |
| NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkAlgorithmsBase)
 | |
|   NS_INTERFACE_MAP_ENTRY(nsISupports)
 | |
| NS_INTERFACE_MAP_END
 | |
| 
 | |
| NS_IMPL_CYCLE_COLLECTION_INHERITED_WITH_JS_MEMBERS(
 | |
|     UnderlyingSinkAlgorithms, UnderlyingSinkAlgorithmsBase,
 | |
|     (mGlobal, mStartCallback, mWriteCallback, mCloseCallback, mAbortCallback),
 | |
|     (mUnderlyingSink))
 | |
| NS_IMPL_ADDREF_INHERITED(UnderlyingSinkAlgorithms, UnderlyingSinkAlgorithmsBase)
 | |
| NS_IMPL_RELEASE_INHERITED(UnderlyingSinkAlgorithms,
 | |
|                           UnderlyingSinkAlgorithmsBase)
 | |
| NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkAlgorithms)
 | |
| NS_INTERFACE_MAP_END_INHERITING(UnderlyingSinkAlgorithmsBase)
 | |
| 
 | |
| // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink
 | |
| void UnderlyingSinkAlgorithms::StartCallback(
 | |
|     JSContext* aCx, WritableStreamDefaultController& aController,
 | |
|     JS::MutableHandle<JS::Value> aRetVal, ErrorResult& aRv) {
 | |
|   if (!mStartCallback) {
 | |
|     // Step 2: Let startAlgorithm be an algorithm that returns undefined.
 | |
|     aRetVal.setUndefined();
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   // Step 6: If underlyingSinkDict["start"] exists, then set startAlgorithm to
 | |
|   // an algorithm which returns the result of invoking
 | |
|   // underlyingSinkDict["start"] with argument list « controller » and callback
 | |
|   // this value underlyingSink.
 | |
|   JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
 | |
|   return mStartCallback->Call(thisObj, aController, aRetVal, aRv,
 | |
|                               "UnderlyingSink.start",
 | |
|                               CallbackFunction::eRethrowExceptions);
 | |
| }
 | |
| 
 | |
| // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink
 | |
| already_AddRefed<Promise> UnderlyingSinkAlgorithms::WriteCallback(
 | |
|     JSContext* aCx, JS::Handle<JS::Value> aChunk,
 | |
|     WritableStreamDefaultController& aController, ErrorResult& aRv) {
 | |
|   if (!mWriteCallback) {
 | |
|     // Step 3: Let writeAlgorithm be an algorithm that returns a promise
 | |
|     // resolved with undefined.
 | |
|     return Promise::CreateResolvedWithUndefined(mGlobal, aRv);
 | |
|   }
 | |
| 
 | |
|   // Step 7: If underlyingSinkDict["write"] exists, then set writeAlgorithm to
 | |
|   // an algorithm which takes an argument chunk and returns the result of
 | |
|   // invoking underlyingSinkDict["write"] with argument list « chunk, controller
 | |
|   // » and callback this value underlyingSink.
 | |
|   JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
 | |
|   RefPtr<Promise> promise = mWriteCallback->Call(
 | |
|       thisObj, aChunk, aController, aRv, "UnderlyingSink.write",
 | |
|       CallbackFunction::eRethrowExceptions);
 | |
|   return promise.forget();
 | |
| }
 | |
| 
 | |
| // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink
 | |
| already_AddRefed<Promise> UnderlyingSinkAlgorithms::CloseCallback(
 | |
|     JSContext* aCx, ErrorResult& aRv) {
 | |
|   if (!mCloseCallback) {
 | |
|     // Step 4: Let closeAlgorithm be an algorithm that returns a promise
 | |
|     // resolved with undefined.
 | |
|     return Promise::CreateResolvedWithUndefined(mGlobal, aRv);
 | |
|   }
 | |
| 
 | |
|   // Step 8: If underlyingSinkDict["close"] exists, then set closeAlgorithm to
 | |
|   // an algorithm which returns the result of invoking
 | |
|   // underlyingSinkDict["close"] with argument list «» and callback this value
 | |
|   // underlyingSink.
 | |
|   JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
 | |
|   RefPtr<Promise> promise =
 | |
|       mCloseCallback->Call(thisObj, aRv, "UnderlyingSink.close",
 | |
|                            CallbackFunction::eRethrowExceptions);
 | |
|   return promise.forget();
 | |
| }
 | |
| 
 | |
| // https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink
 | |
| already_AddRefed<Promise> UnderlyingSinkAlgorithms::AbortCallback(
 | |
|     JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
 | |
|     ErrorResult& aRv) {
 | |
|   if (!mAbortCallback) {
 | |
|     // Step 5: Let abortAlgorithm be an algorithm that returns a promise
 | |
|     // resolved with undefined.
 | |
|     return Promise::CreateResolvedWithUndefined(mGlobal, aRv);
 | |
|   }
 | |
| 
 | |
|   // Step 9: Let abortAlgorithm be an algorithm that returns a promise resolved
 | |
|   // with undefined.
 | |
|   JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
 | |
|   RefPtr<Promise> promise =
 | |
|       mAbortCallback->Call(thisObj, aReason, aRv, "UnderlyingSink.abort",
 | |
|                            CallbackFunction::eRethrowExceptions);
 | |
| 
 | |
|   return promise.forget();
 | |
| }
 | |
| 
 | |
| // https://streams.spec.whatwg.org/#writable-set-up
 | |
| // Step 2.1: Let closeAlgorithmWrapper be an algorithm that runs these steps:
 | |
| already_AddRefed<Promise> UnderlyingSinkAlgorithmsWrapper::CloseCallback(
 | |
|     JSContext* aCx, ErrorResult& aRv) {
 | |
|   nsCOMPtr<nsIGlobalObject> global = xpc::CurrentNativeGlobal(aCx);
 | |
|   return PromisifyAlgorithm(
 | |
|       global, [&](ErrorResult& aRv) { return CloseCallbackImpl(aCx, aRv); },
 | |
|       aRv);
 | |
| }
 | |
| 
 | |
| // https://streams.spec.whatwg.org/#writable-set-up
 | |
| // Step 3.1: Let abortAlgorithmWrapper be an algorithm that runs these steps:
 | |
| already_AddRefed<Promise> UnderlyingSinkAlgorithmsWrapper::AbortCallback(
 | |
|     JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
 | |
|     ErrorResult& aRv) {
 | |
|   nsCOMPtr<nsIGlobalObject> global = xpc::CurrentNativeGlobal(aCx);
 | |
|   return PromisifyAlgorithm(
 | |
|       global,
 | |
|       [&](ErrorResult& aRv) { return AbortCallbackImpl(aCx, aReason, aRv); },
 | |
|       aRv);
 | |
| }
 | |
| 
 | |
| NS_IMPL_ISUPPORTS_CYCLE_COLLECTION_INHERITED(WritableStreamToOutput,
 | |
|                                              UnderlyingSinkAlgorithmsBase,
 | |
|                                              nsIOutputStreamCallback)
 | |
| NS_IMPL_CYCLE_COLLECTION_INHERITED(WritableStreamToOutput,
 | |
|                                    UnderlyingSinkAlgorithmsBase, mParent,
 | |
|                                    mOutput, mPromise)
 | |
| 
 | |
| NS_IMETHODIMP
 | |
| WritableStreamToOutput::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
 | |
|   if (!mData) {
 | |
|     return NS_OK;
 | |
|   }
 | |
|   MOZ_ASSERT(mPromise);
 | |
|   uint32_t written = 0;
 | |
|   nsresult rv = mOutput->Write(
 | |
|       reinterpret_cast<const char*>(mData->Elements() + mWritten),
 | |
|       mData->Length() - mWritten, &written);
 | |
|   if (NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) {
 | |
|     mPromise->MaybeRejectWithAbortError("Error writing to stream"_ns);
 | |
|     ClearData();
 | |
|     // XXX should we add mErrored and fail future calls immediately?
 | |
|     // I presume new calls to Write() will fail though, too
 | |
|     return rv;
 | |
|   }
 | |
|   if (NS_SUCCEEDED(rv)) {
 | |
|     mWritten += written;
 | |
|     MOZ_ASSERT(mWritten <= mData->Length());
 | |
|     if (mWritten >= mData->Length()) {
 | |
|       mPromise->MaybeResolveWithUndefined();
 | |
|       ClearData();
 | |
|       return NS_OK;
 | |
|     }
 | |
|     // more to write
 | |
|   }
 | |
|   // wrote partial or nothing
 | |
|   // Wait for space
 | |
|   nsCOMPtr<nsIEventTarget> target = mozilla::GetCurrentSerialEventTarget();
 | |
|   rv = mOutput->AsyncWait(this, 0, 0, target);
 | |
|   if (NS_FAILED(rv)) {
 | |
|     mPromise->MaybeRejectWithUnknownError("error waiting to write data");
 | |
|     ClearData();
 | |
|     // XXX should we add mErrored and fail future calls immediately?
 | |
|     // New calls to Write() will fail, note
 | |
|     // See step 5.2 of
 | |
|     // https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write.
 | |
|     return rv;
 | |
|   }
 | |
|   return NS_OK;
 | |
| }
 | |
| 
 | |
| already_AddRefed<Promise> WritableStreamToOutput::WriteCallback(
 | |
|     JSContext* aCx, JS::Handle<JS::Value> aChunk,
 | |
|     WritableStreamDefaultController& aController, ErrorResult& aError) {
 | |
|   ArrayBufferViewOrArrayBuffer data;
 | |
|   if (!data.Init(aCx, aChunk)) {
 | |
|     aError.StealExceptionFromJSContext(aCx);
 | |
|     return nullptr;
 | |
|   }
 | |
|   // buffer/bufferView
 | |
|   MOZ_ASSERT(data.IsArrayBuffer() || data.IsArrayBufferView());
 | |
| 
 | |
|   RefPtr<Promise> promise = Promise::Create(mParent, aError);
 | |
|   if (NS_WARN_IF(aError.Failed())) {
 | |
|     return nullptr;
 | |
|   }
 | |
| 
 | |
|   // This is a duplicate of dom/encoding/TextDecoderStream.cpp#51-69
 | |
|   // PeterV will deal with that when he lands his patch for TypedArrays
 | |
|   auto dataSpan = [&data]() {
 | |
|     if (data.IsArrayBuffer()) {
 | |
|       const ArrayBuffer& buffer = data.GetAsArrayBuffer();
 | |
|       buffer.ComputeState();
 | |
|       return Span{buffer.Data(), buffer.Length()};
 | |
|     }
 | |
|     MOZ_ASSERT(data.IsArrayBufferView());
 | |
|     const ArrayBufferView& buffer = data.GetAsArrayBufferView();
 | |
|     buffer.ComputeState();
 | |
|     return Span{buffer.Data(), buffer.Length()};
 | |
|   }();
 | |
| 
 | |
|   // Try to write first, and only enqueue data if we were already blocked
 | |
|   // or the write didn't write it all.  This avoids allocations and copies
 | |
|   // in common cases.
 | |
|   MOZ_ASSERT(!mPromise);
 | |
|   MOZ_ASSERT(mWritten == 0);
 | |
|   uint32_t written = 0;
 | |
|   nsresult rv = mOutput->Write(mozilla::AsChars(dataSpan).Elements(),
 | |
|                                dataSpan.Length(), &written);
 | |
|   if (NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) {
 | |
|     promise->MaybeRejectWithAbortError("error writing data");
 | |
|     return promise.forget();
 | |
|   }
 | |
|   if (NS_SUCCEEDED(rv)) {
 | |
|     if (written == dataSpan.Length()) {
 | |
|       promise->MaybeResolveWithUndefined();
 | |
|       return promise.forget();
 | |
|     }
 | |
|     dataSpan = dataSpan.From(written);
 | |
|   }
 | |
| 
 | |
|   auto buffer = Buffer<uint8_t>::CopyFrom(dataSpan);
 | |
|   if (buffer.isNothing()) {
 | |
|     promise->MaybeReject(NS_ERROR_OUT_OF_MEMORY);
 | |
|     return promise.forget();
 | |
|   }
 | |
|   mData = std::move(buffer);
 | |
|   mPromise = promise;
 | |
| 
 | |
|   nsCOMPtr<nsIEventTarget> target = mozilla::GetCurrentSerialEventTarget();
 | |
|   rv = mOutput->AsyncWait(this, 0, 0, target);
 | |
|   if (NS_FAILED(rv)) {
 | |
|     ClearData();
 | |
|     promise->MaybeRejectWithUnknownError("error waiting to write data");
 | |
|   }
 | |
|   return promise.forget();
 | |
| }
 | |
| 
 | |
| already_AddRefed<Promise> WritableStreamToOutput::AbortCallbackImpl(
 | |
|     JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
 | |
|     ErrorResult& aRv) {
 | |
|   // https://streams.spec.whatwg.org/#writablestream-set-up
 | |
|   // Step 3. Let abortAlgorithmWrapper be an algorithm that runs these steps:
 | |
| 
 | |
|   if (aReason.WasPassed() && aReason.Value().isObject()) {
 | |
|     JS::Rooted<JSObject*> obj(aCx, &aReason.Value().toObject());
 | |
|     RefPtr<WebTransportError> error;
 | |
|     UnwrapObject<prototypes::id::WebTransportError, WebTransportError>(
 | |
|         obj, error, nullptr);
 | |
|     if (error) {
 | |
|       mOutput->CloseWithStatus(net::GetNSResultFromWebTransportError(
 | |
|           error->GetStreamErrorCode().Value()));
 | |
|       return nullptr;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // XXX The close or rather a dedicated abort should be async. For now we have
 | |
|   // to always fall back to the Step 3.3 below.
 | |
|   // XXX how do we know this stream is used by webtransport?
 | |
|   mOutput->CloseWithStatus(NS_ERROR_WEBTRANSPORT_CODE_BASE);
 | |
| 
 | |
|   // Step 3.3. Return a promise resolved with undefined.
 | |
|   // Wrapper handles this
 | |
|   return nullptr;
 | |
| }
 | |
| 
 | |
| void WritableStreamToOutput::ReleaseObjects() { mOutput->Close(); }
 | 
