fune/xpcom/io/nsMultiplexInputStream.cpp
Nika Layzell 3b40268cc1 Bug 1818305 - Part 2: Add a streamStatus method to nsIInputStream, r=xpcom-reviewers,necko-reviewers,geckoview-reviewers,valentin,jesup,m_kato,mccr8
This is semantically similar to the existing available() method, however will
not block, and doesn't need to do the work to actually determine the number of
available bytes.

As part of this patch, I also fixed one available() implementation which was
incorrectly throwing NS_BASE_STREAM_WOULD_BLOCK.

Differential Revision: https://phabricator.services.mozilla.com/D170697
2023-03-15 19:52:34 +00:00

1557 lines
45 KiB
C++

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/**
* The multiplex stream concatenates a list of input streams into a single
* stream.
*/
#include "mozilla/Attributes.h"
#include "mozilla/CheckedInt.h"
#include "mozilla/MathAlgorithms.h"
#include "mozilla/Mutex.h"
#include "base/basictypes.h"
#include "nsMultiplexInputStream.h"
#include "nsIBufferedStreams.h"
#include "nsICloneableInputStream.h"
#include "nsIMultiplexInputStream.h"
#include "nsISeekableStream.h"
#include "nsCOMPtr.h"
#include "nsCOMArray.h"
#include "nsIClassInfoImpl.h"
#include "nsIIPCSerializableInputStream.h"
#include "mozilla/ipc/InputStreamUtils.h"
#include "nsIAsyncInputStream.h"
#include "nsIInputStreamLength.h"
#include "nsNetUtil.h"
#include "nsStreamUtils.h"
using namespace mozilla;
using namespace mozilla::ipc;
using mozilla::DeprecatedAbs;
class nsMultiplexInputStream final : public nsIMultiplexInputStream,
public nsISeekableStream,
public nsIIPCSerializableInputStream,
public nsICloneableInputStream,
public nsIAsyncInputStream,
public nsIInputStreamCallback,
public nsIInputStreamLength,
public nsIAsyncInputStreamLength {
public:
nsMultiplexInputStream();
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIINPUTSTREAM
NS_DECL_NSIMULTIPLEXINPUTSTREAM
NS_DECL_NSISEEKABLESTREAM
NS_DECL_NSITELLABLESTREAM
NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
NS_DECL_NSICLONEABLEINPUTSTREAM
NS_DECL_NSIASYNCINPUTSTREAM
NS_DECL_NSIINPUTSTREAMCALLBACK
NS_DECL_NSIINPUTSTREAMLENGTH
NS_DECL_NSIASYNCINPUTSTREAMLENGTH
// This is used for nsIAsyncInputStream::AsyncWait
void AsyncWaitCompleted();
// This is used for nsIAsyncInputStreamLength::AsyncLengthWait
void AsyncWaitCompleted(int64_t aLength, const MutexAutoLock& aProofOfLock)
MOZ_REQUIRES(mLock);
struct StreamData {
nsresult Initialize(nsIInputStream* aOriginalStream) {
mCurrentPos = 0;
mOriginalStream = aOriginalStream;
mBufferedStream = aOriginalStream;
if (!NS_InputStreamIsBuffered(mBufferedStream)) {
nsCOMPtr<nsIInputStream> bufferedStream;
nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(bufferedStream),
mBufferedStream.forget(), 4096);
NS_ENSURE_SUCCESS(rv, rv);
mBufferedStream = bufferedStream;
}
mAsyncStream = do_QueryInterface(mBufferedStream);
mSeekableStream = do_QueryInterface(mBufferedStream);
return NS_OK;
}
nsCOMPtr<nsIInputStream> mOriginalStream;
// Equal to mOriginalStream or a wrap around the original stream to make it
// buffered.
nsCOMPtr<nsIInputStream> mBufferedStream;
// This can be null.
nsCOMPtr<nsIAsyncInputStream> mAsyncStream;
// This can be null.
nsCOMPtr<nsISeekableStream> mSeekableStream;
uint64_t mCurrentPos;
};
Mutex& GetLock() MOZ_RETURN_CAPABILITY(mLock) { return mLock; }
private:
~nsMultiplexInputStream() = default;
void NextStream() MOZ_REQUIRES(mLock) {
++mCurrentStream;
mStartedReadingCurrent = false;
}
nsresult AsyncWaitInternal();
// This method updates mSeekableStreams, mTellableStreams,
// mIPCSerializableStreams and mCloneableStreams values.
void UpdateQIMap(StreamData& aStream) MOZ_REQUIRES(mLock);
struct MOZ_STACK_CLASS ReadSegmentsState {
nsCOMPtr<nsIInputStream> mThisStream;
uint32_t mOffset;
nsWriteSegmentFun mWriter;
void* mClosure;
bool mDone;
};
void SerializedComplexityInternal(uint32_t aMaxSize, uint32_t* aSizeUsed,
uint32_t* aPipes, uint32_t* aTransferables,
bool* aSerializeAsPipe);
static nsresult ReadSegCb(nsIInputStream* aIn, void* aClosure,
const char* aFromRawSegment, uint32_t aToOffset,
uint32_t aCount, uint32_t* aWriteCount);
bool IsSeekable() const;
bool IsIPCSerializable() const;
bool IsCloneable() const;
bool IsAsyncInputStream() const;
bool IsInputStreamLength() const;
bool IsAsyncInputStreamLength() const;
Mutex mLock; // Protects access to all data members.
nsTArray<StreamData> mStreams MOZ_GUARDED_BY(mLock);
uint32_t mCurrentStream MOZ_GUARDED_BY(mLock);
bool mStartedReadingCurrent MOZ_GUARDED_BY(mLock);
nsresult mStatus MOZ_GUARDED_BY(mLock);
nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback MOZ_GUARDED_BY(mLock);
uint32_t mAsyncWaitFlags MOZ_GUARDED_BY(mLock);
uint32_t mAsyncWaitRequestedCount MOZ_GUARDED_BY(mLock);
nsCOMPtr<nsIEventTarget> mAsyncWaitEventTarget MOZ_GUARDED_BY(mLock);
nsCOMPtr<nsIInputStreamLengthCallback> mAsyncWaitLengthCallback
MOZ_GUARDED_BY(mLock);
class AsyncWaitLengthHelper;
RefPtr<AsyncWaitLengthHelper> mAsyncWaitLengthHelper MOZ_GUARDED_BY(mLock);
uint32_t mSeekableStreams MOZ_GUARDED_BY(mLock);
uint32_t mIPCSerializableStreams MOZ_GUARDED_BY(mLock);
uint32_t mCloneableStreams MOZ_GUARDED_BY(mLock);
// These are Atomics so that we can check them in QueryInterface without
// taking a lock (to look at mStreams.Length() and the numbers above)
// With no streams added yet, all of these are possible
Atomic<bool, Relaxed> mIsSeekableStream{true};
Atomic<bool, Relaxed> mIsIPCSerializableStream{true};
Atomic<bool, Relaxed> mIsCloneableStream{true};
Atomic<bool, Relaxed> mIsAsyncInputStream{false};
Atomic<bool, Relaxed> mIsInputStreamLength{false};
Atomic<bool, Relaxed> mIsAsyncInputStreamLength{false};
};
NS_IMPL_ADDREF(nsMultiplexInputStream)
NS_IMPL_RELEASE(nsMultiplexInputStream)
NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
NS_MULTIPLEXINPUTSTREAM_CID)
NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream)
NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream)
NS_INTERFACE_MAP_ENTRY(nsIInputStream)
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, IsSeekable())
NS_INTERFACE_MAP_ENTRY(nsITellableStream)
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
IsIPCSerializable())
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream, IsCloneable())
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, IsAsyncInputStream())
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback,
IsAsyncInputStream())
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength,
IsInputStreamLength())
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength,
IsAsyncInputStreamLength())
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIMultiplexInputStream)
NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream)
NS_INTERFACE_MAP_END
NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream, nsIMultiplexInputStream,
nsIInputStream, nsISeekableStream,
nsITellableStream)
static nsresult AvailableMaybeSeek(nsMultiplexInputStream::StreamData& aStream,
uint64_t* aResult) {
nsresult rv = aStream.mBufferedStream->Available(aResult);
if (rv == NS_BASE_STREAM_CLOSED) {
// Blindly seek to the current position if Available() returns
// NS_BASE_STREAM_CLOSED.
// If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
// Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
if (aStream.mSeekableStream) {
nsresult rvSeek =
aStream.mSeekableStream->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
if (NS_SUCCEEDED(rvSeek)) {
rv = aStream.mBufferedStream->Available(aResult);
}
}
}
return rv;
}
nsMultiplexInputStream::nsMultiplexInputStream()
: mLock("nsMultiplexInputStream lock"),
mCurrentStream(0),
mStartedReadingCurrent(false),
mStatus(NS_OK),
mAsyncWaitFlags(0),
mAsyncWaitRequestedCount(0),
mSeekableStreams(0),
mIPCSerializableStreams(0),
mCloneableStreams(0) {}
NS_IMETHODIMP
nsMultiplexInputStream::GetCount(uint32_t* aCount) {
MutexAutoLock lock(mLock);
*aCount = mStreams.Length();
return NS_OK;
}
NS_IMETHODIMP
nsMultiplexInputStream::AppendStream(nsIInputStream* aStream) {
MutexAutoLock lock(mLock);
StreamData* streamData = mStreams.AppendElement(fallible);
if (NS_WARN_IF(!streamData)) {
return NS_ERROR_OUT_OF_MEMORY;
}
nsresult rv = streamData->Initialize(aStream);
NS_ENSURE_SUCCESS(rv, rv);
UpdateQIMap(*streamData);
if (mStatus == NS_BASE_STREAM_CLOSED) {
// We were closed, but now we have more data to read.
mStatus = NS_OK;
}
return NS_OK;
}
NS_IMETHODIMP
nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream** aResult) {
MutexAutoLock lock(mLock);
if (aIndex >= mStreams.Length()) {
return NS_ERROR_NOT_AVAILABLE;
}
StreamData& streamData = mStreams.ElementAt(aIndex);
nsCOMPtr<nsIInputStream> stream = streamData.mOriginalStream;
stream.forget(aResult);
return NS_OK;
}
NS_IMETHODIMP
nsMultiplexInputStream::Close() {
nsTArray<nsCOMPtr<nsIInputStream>> streams;
// Let's take a copy of the streams becuase, calling close() it could trigger
// a nsIInputStreamCallback immediately and we don't want to create a deadlock
// with mutex.
{
MutexAutoLock lock(mLock);
uint32_t len = mStreams.Length();
for (uint32_t i = 0; i < len; ++i) {
if (NS_WARN_IF(
!streams.AppendElement(mStreams[i].mBufferedStream, fallible))) {
mStatus = NS_BASE_STREAM_CLOSED;
return NS_ERROR_OUT_OF_MEMORY;
}
}
mStatus = NS_BASE_STREAM_CLOSED;
}
nsresult rv = NS_OK;
uint32_t len = streams.Length();
for (uint32_t i = 0; i < len; ++i) {
nsresult rv2 = streams[i]->Close();
// We still want to close all streams, but we should return an error
if (NS_FAILED(rv2)) {
rv = rv2;
}
}
return rv;
}
NS_IMETHODIMP
nsMultiplexInputStream::Available(uint64_t* aResult) {
*aResult = 0;
MutexAutoLock lock(mLock);
if (NS_FAILED(mStatus)) {
return mStatus;
}
uint64_t avail = 0;
nsresult rv = NS_BASE_STREAM_CLOSED;
uint32_t len = mStreams.Length();
for (uint32_t i = mCurrentStream; i < len; i++) {
uint64_t streamAvail;
rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
if (rv == NS_BASE_STREAM_CLOSED) {
// If a stream is closed, we continue with the next one.
// If this is the current stream we move to the following stream.
if (mCurrentStream == i) {
NextStream();
}
// If this is the last stream, we want to return this error code.
continue;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
mStatus = rv;
return mStatus;
}
// If the current stream is async, we have to return what we have so far
// without processing the following streams. This is needed because
// ::Available should return only what is currently available. In case of an
// nsIAsyncInputStream, we have to call AsyncWait() in order to read more.
if (mStreams[i].mAsyncStream) {
avail += streamAvail;
break;
}
if (streamAvail == 0) {
// Nothing to read for this stream. Let's move to the next one.
continue;
}
avail += streamAvail;
}
// We still have something to read. We don't want to return an error code yet.
if (avail) {
*aResult = avail;
return NS_OK;
}
// Let's propagate the last error message.
mStatus = rv;
return rv;
}
NS_IMETHODIMP
nsMultiplexInputStream::StreamStatus() {
MutexAutoLock lock(mLock);
return mStatus;
}
NS_IMETHODIMP
nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
MutexAutoLock lock(mLock);
// It is tempting to implement this method in terms of ReadSegments, but
// that would prevent this class from being used with streams that only
// implement Read (e.g., file streams).
*aResult = 0;
if (mStatus == NS_BASE_STREAM_CLOSED) {
return NS_OK;
}
if (NS_FAILED(mStatus)) {
return mStatus;
}
nsresult rv = NS_OK;
uint32_t len = mStreams.Length();
while (mCurrentStream < len && aCount) {
uint32_t read;
rv = mStreams[mCurrentStream].mBufferedStream->Read(aBuf, aCount, &read);
// XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
// (This is a bug in those stream implementations)
if (rv == NS_BASE_STREAM_CLOSED) {
MOZ_ASSERT_UNREACHABLE(
"Input stream's Read method returned "
"NS_BASE_STREAM_CLOSED");
rv = NS_OK;
read = 0;
} else if (NS_FAILED(rv)) {
break;
}
if (read == 0) {
NextStream();
} else {
NS_ASSERTION(aCount >= read, "Read more than requested");
*aResult += read;
aCount -= read;
aBuf += read;
mStartedReadingCurrent = true;
mStreams[mCurrentStream].mCurrentPos += read;
}
}
return *aResult ? NS_OK : rv;
}
NS_IMETHODIMP
nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
uint32_t aCount, uint32_t* aResult) {
MutexAutoLock lock(mLock);
if (mStatus == NS_BASE_STREAM_CLOSED) {
*aResult = 0;
return NS_OK;
}
if (NS_FAILED(mStatus)) {
return mStatus;
}
NS_ASSERTION(aWriter, "missing aWriter");
nsresult rv = NS_OK;
ReadSegmentsState state;
state.mThisStream = this;
state.mOffset = 0;
state.mWriter = aWriter;
state.mClosure = aClosure;
state.mDone = false;
uint32_t len = mStreams.Length();
while (mCurrentStream < len && aCount) {
uint32_t read;
rv = mStreams[mCurrentStream].mBufferedStream->ReadSegments(
ReadSegCb, &state, aCount, &read);
// XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
// (This is a bug in those stream implementations)
if (rv == NS_BASE_STREAM_CLOSED) {
MOZ_ASSERT_UNREACHABLE(
"Input stream's Read method returned "
"NS_BASE_STREAM_CLOSED");
rv = NS_OK;
read = 0;
}
// if |aWriter| decided to stop reading segments...
if (state.mDone || NS_FAILED(rv)) {
break;
}
// if stream is empty, then advance to the next stream.
if (read == 0) {
NextStream();
} else {
NS_ASSERTION(aCount >= read, "Read more than requested");
state.mOffset += read;
aCount -= read;
mStartedReadingCurrent = true;
mStreams[mCurrentStream].mCurrentPos += read;
}
}
// if we successfully read some data, then this call succeeded.
*aResult = state.mOffset;
return state.mOffset ? NS_OK : rv;
}
nsresult nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
const char* aFromRawSegment,
uint32_t aToOffset, uint32_t aCount,
uint32_t* aWriteCount) {
nsresult rv;
ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
rv = (state->mWriter)(state->mThisStream, state->mClosure, aFromRawSegment,
aToOffset + state->mOffset, aCount, aWriteCount);
if (NS_FAILED(rv)) {
state->mDone = true;
}
return rv;
}
NS_IMETHODIMP
nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking) {
MutexAutoLock lock(mLock);
uint32_t len = mStreams.Length();
if (len == 0) {
// Claim to be non-blocking, since we won't block the caller.
*aNonBlocking = true;
return NS_OK;
}
for (uint32_t i = 0; i < len; ++i) {
nsresult rv = mStreams[i].mBufferedStream->IsNonBlocking(aNonBlocking);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
// If one is blocking the entire stream becomes blocking.
if (!*aNonBlocking) {
return NS_OK;
}
}
return NS_OK;
}
NS_IMETHODIMP
nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset) {
MutexAutoLock lock(mLock);
if (NS_FAILED(mStatus)) {
return mStatus;
}
nsresult rv;
uint32_t oldCurrentStream = mCurrentStream;
bool oldStartedReadingCurrent = mStartedReadingCurrent;
if (aWhence == NS_SEEK_SET) {
int64_t remaining = aOffset;
if (aOffset == 0) {
mCurrentStream = 0;
}
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
if (!stream) {
return NS_ERROR_FAILURE;
}
// See if all remaining streams should be rewound
if (remaining == 0) {
if (i < oldCurrentStream ||
(i == oldCurrentStream && oldStartedReadingCurrent)) {
rv = stream->Seek(NS_SEEK_SET, 0);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
mStreams[i].mCurrentPos = 0;
continue;
} else {
break;
}
}
// Get position in the current stream
int64_t streamPos;
if (i > oldCurrentStream ||
(i == oldCurrentStream && !oldStartedReadingCurrent)) {
streamPos = 0;
} else {
streamPos = mStreams[i].mCurrentPos;
}
// See if we need to seek the current stream forward or backward
if (remaining < streamPos) {
rv = stream->Seek(NS_SEEK_SET, remaining);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
mStreams[i].mCurrentPos = remaining;
mCurrentStream = i;
mStartedReadingCurrent = remaining != 0;
remaining = 0;
} else if (remaining > streamPos) {
if (i < oldCurrentStream) {
// We're already at end so no need to seek this stream
remaining -= streamPos;
NS_ASSERTION(remaining >= 0, "Remaining invalid");
} else {
uint64_t avail;
rv = AvailableMaybeSeek(mStreams[i], &avail);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
rv = stream->Seek(NS_SEEK_SET, newPos);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
mStreams[i].mCurrentPos = newPos;
mCurrentStream = i;
mStartedReadingCurrent = true;
remaining -= newPos;
NS_ASSERTION(remaining >= 0, "Remaining invalid");
}
} else {
NS_ASSERTION(remaining == streamPos, "Huh?");
MOZ_ASSERT(remaining != 0, "Zero remaining should be handled earlier");
remaining = 0;
mCurrentStream = i;
mStartedReadingCurrent = true;
}
}
return NS_OK;
}
if (aWhence == NS_SEEK_CUR && aOffset > 0) {
int64_t remaining = aOffset;
for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
uint64_t avail;
rv = AvailableMaybeSeek(mStreams[i], &avail);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, seek);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
mStreams[i].mCurrentPos += seek;
mCurrentStream = i;
mStartedReadingCurrent = true;
remaining -= seek;
}
return NS_OK;
}
if (aWhence == NS_SEEK_CUR && aOffset < 0) {
int64_t remaining = -aOffset;
for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
int64_t pos = mStreams[i].mCurrentPos;
int64_t seek = XPCOM_MIN(pos, remaining);
rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, -seek);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
mStreams[i].mCurrentPos -= seek;
mCurrentStream = i;
mStartedReadingCurrent = seek != -pos;
remaining -= seek;
}
return NS_OK;
}
if (aWhence == NS_SEEK_CUR) {
NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
return NS_OK;
}
if (aWhence == NS_SEEK_END) {
if (aOffset > 0) {
return NS_ERROR_INVALID_ARG;
}
int64_t remaining = aOffset;
int32_t i;
for (i = mStreams.Length() - 1; i >= 0; --i) {
nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
uint64_t avail;
rv = AvailableMaybeSeek(mStreams[i], &avail);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
int64_t streamLength = avail + mStreams[i].mCurrentPos;
// The seek(END) can be completed in the current stream.
if (streamLength >= DeprecatedAbs(remaining)) {
rv = stream->Seek(NS_SEEK_END, remaining);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
mStreams[i].mCurrentPos = streamLength + remaining;
mCurrentStream = i;
mStartedReadingCurrent = true;
break;
}
// We are at the beginning of this stream.
rv = stream->Seek(NS_SEEK_SET, 0);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
remaining += streamLength;
mStreams[i].mCurrentPos = 0;
}
// Any other stream must be set to the end.
for (--i; i >= 0; --i) {
nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
uint64_t avail;
rv = AvailableMaybeSeek(mStreams[i], &avail);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
int64_t streamLength = avail + mStreams[i].mCurrentPos;
rv = stream->Seek(NS_SEEK_END, 0);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
mStreams[i].mCurrentPos = streamLength;
}
return NS_OK;
}
// other Seeks not implemented yet
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsMultiplexInputStream::Tell(int64_t* aResult) {
MutexAutoLock lock(mLock);
if (NS_FAILED(mStatus)) {
return mStatus;
}
int64_t ret64 = 0;
#ifdef DEBUG
bool zeroFound = false;
#endif
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
ret64 += mStreams[i].mCurrentPos;
#ifdef DEBUG
// When we see 1 stream with currentPos = 0, all the remaining streams must
// be set to 0 as well.
MOZ_ASSERT_IF(zeroFound, mStreams[i].mCurrentPos == 0);
if (mStreams[i].mCurrentPos == 0) {
zeroFound = true;
}
#endif
}
*aResult = ret64;
return NS_OK;
}
NS_IMETHODIMP
nsMultiplexInputStream::SetEOF() { return NS_ERROR_NOT_IMPLEMENTED; }
NS_IMETHODIMP
nsMultiplexInputStream::CloseWithStatus(nsresult aStatus) { return Close(); }
// This class is used to inform nsMultiplexInputStream that it's time to execute
// the asyncWait callback.
class AsyncWaitRunnable final : public DiscardableRunnable {
RefPtr<nsMultiplexInputStream> mStream;
public:
static void Create(nsMultiplexInputStream* aStream,
nsIEventTarget* aEventTarget) {
RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(aStream);
if (aEventTarget) {
aEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
} else {
runnable->Run();
}
}
NS_IMETHOD
Run() override {
mStream->AsyncWaitCompleted();
return NS_OK;
}
private:
explicit AsyncWaitRunnable(nsMultiplexInputStream* aStream)
: DiscardableRunnable("AsyncWaitRunnable"), mStream(aStream) {
MOZ_ASSERT(aStream);
}
};
NS_IMETHODIMP
nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
uint32_t aFlags, uint32_t aRequestedCount,
nsIEventTarget* aEventTarget) {
{
MutexAutoLock lock(mLock);
// We must execute the callback also when the stream is closed.
if (NS_FAILED(mStatus) && mStatus != NS_BASE_STREAM_CLOSED) {
return mStatus;
}
if (NS_WARN_IF(mAsyncWaitCallback && aCallback &&
mAsyncWaitCallback != aCallback)) {
return NS_ERROR_FAILURE;
}
mAsyncWaitCallback = aCallback;
mAsyncWaitFlags = aFlags;
mAsyncWaitRequestedCount = aRequestedCount;
mAsyncWaitEventTarget = aEventTarget;
}
return AsyncWaitInternal();
}
nsresult nsMultiplexInputStream::AsyncWaitInternal() {
nsCOMPtr<nsIAsyncInputStream> stream;
nsIInputStreamCallback* asyncWaitCallback = nullptr;
uint32_t asyncWaitFlags = 0;
uint32_t asyncWaitRequestedCount = 0;
nsCOMPtr<nsIEventTarget> asyncWaitEventTarget;
{
MutexAutoLock lock(mLock);
// Let's take the first async stream if we are not already closed, and if
// it has data to read or if it async.
if (mStatus != NS_BASE_STREAM_CLOSED) {
for (; mCurrentStream < mStreams.Length(); NextStream()) {
stream = mStreams[mCurrentStream].mAsyncStream;
if (stream) {
break;
}
uint64_t avail = 0;
nsresult rv = AvailableMaybeSeek(mStreams[mCurrentStream], &avail);
if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) {
// Nothing to read here. Let's move on.
continue;
}
if (NS_FAILED(rv)) {
return rv;
}
break;
}
}
asyncWaitCallback = mAsyncWaitCallback ? this : nullptr;
asyncWaitFlags = mAsyncWaitFlags;
asyncWaitRequestedCount = mAsyncWaitRequestedCount;
asyncWaitEventTarget = mAsyncWaitEventTarget;
MOZ_ASSERT_IF(stream, NS_SUCCEEDED(mStatus));
}
// If we are here it's because we are already closed, or if the current stream
// is not async. In both case we have to execute the callback.
if (!stream) {
if (asyncWaitCallback) {
AsyncWaitRunnable::Create(this, asyncWaitEventTarget);
}
return NS_OK;
}
return stream->AsyncWait(asyncWaitCallback, asyncWaitFlags,
asyncWaitRequestedCount, asyncWaitEventTarget);
}
NS_IMETHODIMP
nsMultiplexInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
nsCOMPtr<nsIInputStreamCallback> callback;
// When OnInputStreamReady is called, we could be in 2 scenarios:
// a. there is something to read;
// b. the stream is closed.
// But if the stream is closed and it was not the last one, we must proceed
// with the following stream in order to have something to read by the callee.
{
MutexAutoLock lock(mLock);
// The callback has been nullified in the meantime.
if (!mAsyncWaitCallback) {
return NS_OK;
}
if (NS_SUCCEEDED(mStatus)) {
uint64_t avail = 0;
nsresult rv = NS_OK;
// Only check `Available()` if `aStream` is actually the current stream,
// otherwise we'll always want to re-poll, as we got the callback for the
// wrong stream.
if (mCurrentStream < mStreams.Length() &&
aStream == mStreams[mCurrentStream].mAsyncStream) {
rv = aStream->Available(&avail);
}
if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) {
// This stream is either closed, has no data available, or is not the
// current stream. If it is closed and current, move to the next stream,
// otherwise re-wait on the current stream until it has data available
// or becomes closed.
// Unlike streams not implementing nsIAsyncInputStream, async streams
// cannot use `Available() == 0` to indicate EOF, so we re-poll in that
// situation.
if (NS_FAILED(rv)) {
NextStream();
}
// Unlock and invoke AsyncWaitInternal to wait again. If this succeeds,
// we'll be called again, otherwise fall through and notify.
MutexAutoUnlock unlock(mLock);
if (NS_SUCCEEDED(AsyncWaitInternal())) {
return NS_OK;
}
}
}
mAsyncWaitCallback.swap(callback);
mAsyncWaitEventTarget = nullptr;
}
return callback ? callback->OnInputStreamReady(this) : NS_OK;
}
void nsMultiplexInputStream::AsyncWaitCompleted() {
nsCOMPtr<nsIInputStreamCallback> callback;
{
MutexAutoLock lock(mLock);
// The callback has been nullified in the meantime.
if (!mAsyncWaitCallback) {
return;
}
mAsyncWaitCallback.swap(callback);
mAsyncWaitEventTarget = nullptr;
}
callback->OnInputStreamReady(this);
}
nsresult nsMultiplexInputStreamConstructor(REFNSIID aIID, void** aResult) {
*aResult = nullptr;
RefPtr<nsMultiplexInputStream> inst = new nsMultiplexInputStream();
return inst->QueryInterface(aIID, aResult);
}
void nsMultiplexInputStream::SerializedComplexity(uint32_t aMaxSize,
uint32_t* aSizeUsed,
uint32_t* aPipes,
uint32_t* aTransferables) {
MutexAutoLock lock(mLock);
bool serializeAsPipe = false;
SerializedComplexityInternal(aMaxSize, aSizeUsed, aPipes, aTransferables,
&serializeAsPipe);
}
void nsMultiplexInputStream::SerializedComplexityInternal(
uint32_t aMaxSize, uint32_t* aSizeUsed, uint32_t* aPipes,
uint32_t* aTransferables, bool* aSerializeAsPipe) {
mLock.AssertCurrentThreadOwns();
CheckedUint32 totalSizeUsed = 0;
CheckedUint32 totalPipes = 0;
CheckedUint32 totalTransferables = 0;
CheckedUint32 maxSize = aMaxSize;
uint32_t streamCount = mStreams.Length();
for (uint32_t index = 0; index < streamCount; index++) {
uint32_t sizeUsed = 0;
uint32_t pipes = 0;
uint32_t transferables = 0;
InputStreamHelper::SerializedComplexity(mStreams[index].mOriginalStream,
maxSize.value(), &sizeUsed, &pipes,
&transferables);
MOZ_ASSERT(maxSize.value() >= sizeUsed);
maxSize -= sizeUsed;
MOZ_DIAGNOSTIC_ASSERT(maxSize.isValid());
totalSizeUsed += sizeUsed;
MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed.isValid());
totalPipes += pipes;
MOZ_DIAGNOSTIC_ASSERT(totalPipes.isValid());
totalTransferables += transferables;
MOZ_DIAGNOSTIC_ASSERT(totalTransferables.isValid());
}
// If the combination of all streams when serialized independently is
// sufficiently complex, we may choose to serialize it as a pipe to limit the
// complexity of the payload.
if (totalTransferables.value() == 0) {
// If there are no transferables within our serialization, and it would
// contain at least one pipe, serialize the entire payload as a pipe for
// simplicity.
*aSerializeAsPipe = totalSizeUsed.value() > 0 && totalPipes.value() > 0;
} else {
// Otherwise, we may want to still serialize in segments to take advantage
// of the efficiency of serializing transferables. We'll only serialize as a
// pipe if the total attachment count exceeds kMaxAttachmentThreshold.
static constexpr uint32_t kMaxAttachmentThreshold = 8;
CheckedUint32 totalAttachments = totalPipes + totalTransferables;
*aSerializeAsPipe = !totalAttachments.isValid() ||
totalAttachments.value() > kMaxAttachmentThreshold;
}
if (*aSerializeAsPipe) {
NS_WARNING(
nsPrintfCString("Choosing to serialize multiplex stream as a pipe "
"(would be %u bytes, %u pipes, %u transferables)",
totalSizeUsed.value(), totalPipes.value(),
totalTransferables.value())
.get());
*aSizeUsed = 0;
*aPipes = 1;
*aTransferables = 0;
} else {
*aSizeUsed = totalSizeUsed.value();
*aPipes = totalPipes.value();
*aTransferables = totalTransferables.value();
}
}
void nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
uint32_t aMaxSize, uint32_t* aSizeUsed) {
MutexAutoLock lock(mLock);
// Check if we should serialize this stream as a pipe to reduce complexity.
uint32_t dummySizeUsed = 0, dummyPipes = 0, dummyTransferables = 0;
bool serializeAsPipe = false;
SerializedComplexityInternal(aMaxSize, &dummySizeUsed, &dummyPipes,
&dummyTransferables, &serializeAsPipe);
if (serializeAsPipe) {
*aSizeUsed = 0;
MutexAutoUnlock unlock(mLock);
InputStreamHelper::SerializeInputStreamAsPipe(this, aParams);
return;
}
MultiplexInputStreamParams params;
CheckedUint32 totalSizeUsed = 0;
CheckedUint32 maxSize = aMaxSize;
uint32_t streamCount = mStreams.Length();
if (streamCount) {
nsTArray<InputStreamParams>& streams = params.streams();
streams.SetCapacity(streamCount);
for (uint32_t index = 0; index < streamCount; index++) {
uint32_t sizeUsed = 0;
InputStreamHelper::SerializeInputStream(mStreams[index].mOriginalStream,
*streams.AppendElement(),
maxSize.value(), &sizeUsed);
MOZ_ASSERT(maxSize.value() >= sizeUsed);
maxSize -= sizeUsed;
MOZ_DIAGNOSTIC_ASSERT(maxSize.isValid());
totalSizeUsed += sizeUsed;
MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed.isValid());
}
}
params.currentStream() = mCurrentStream;
params.status() = mStatus;
params.startedReadingCurrent() = mStartedReadingCurrent;
aParams = std::move(params);
MOZ_ASSERT(aSizeUsed);
*aSizeUsed = totalSizeUsed.value();
}
bool nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams) {
if (aParams.type() != InputStreamParams::TMultiplexInputStreamParams) {
NS_ERROR("Received unknown parameters from the other process!");
return false;
}
const MultiplexInputStreamParams& params =
aParams.get_MultiplexInputStreamParams();
const nsTArray<InputStreamParams>& streams = params.streams();
uint32_t streamCount = streams.Length();
for (uint32_t index = 0; index < streamCount; index++) {
nsCOMPtr<nsIInputStream> stream =
InputStreamHelper::DeserializeInputStream(streams[index]);
if (!stream) {
NS_WARNING("Deserialize failed!");
return false;
}
if (NS_FAILED(AppendStream(stream))) {
NS_WARNING("AppendStream failed!");
return false;
}
}
MutexAutoLock lock(mLock);
mCurrentStream = params.currentStream();
mStatus = params.status();
mStartedReadingCurrent = params.startedReadingCurrent();
return true;
}
NS_IMETHODIMP
nsMultiplexInputStream::GetCloneable(bool* aCloneable) {
MutexAutoLock lock(mLock);
// XXXnsm Cloning a multiplex stream which has started reading is not
// permitted right now.
if (mCurrentStream > 0 || mStartedReadingCurrent) {
*aCloneable = false;
return NS_OK;
}
uint32_t len = mStreams.Length();
for (uint32_t i = 0; i < len; ++i) {
nsCOMPtr<nsICloneableInputStream> cis =
do_QueryInterface(mStreams[i].mBufferedStream);
if (!cis || !cis->GetCloneable()) {
*aCloneable = false;
return NS_OK;
}
}
*aCloneable = true;
return NS_OK;
}
NS_IMETHODIMP
nsMultiplexInputStream::Clone(nsIInputStream** aClone) {
MutexAutoLock lock(mLock);
// XXXnsm Cloning a multiplex stream which has started reading is not
// permitted right now.
if (mCurrentStream > 0 || mStartedReadingCurrent) {
return NS_ERROR_FAILURE;
}
RefPtr<nsMultiplexInputStream> clone = new nsMultiplexInputStream();
nsresult rv;
uint32_t len = mStreams.Length();
for (uint32_t i = 0; i < len; ++i) {
nsCOMPtr<nsICloneableInputStream> substream =
do_QueryInterface(mStreams[i].mBufferedStream);
if (NS_WARN_IF(!substream)) {
return NS_ERROR_FAILURE;
}
nsCOMPtr<nsIInputStream> clonedSubstream;
rv = substream->Clone(getter_AddRefs(clonedSubstream));
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
rv = clone->AppendStream(clonedSubstream);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
}
clone.forget(aClone);
return NS_OK;
}
NS_IMETHODIMP
nsMultiplexInputStream::Length(int64_t* aLength) {
MutexAutoLock lock(mLock);
if (mCurrentStream > 0 || mStartedReadingCurrent) {
return NS_ERROR_NOT_AVAILABLE;
}
CheckedInt64 length = 0;
nsresult retval = NS_OK;
for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
nsCOMPtr<nsIInputStreamLength> substream =
do_QueryInterface(mStreams[i].mBufferedStream);
if (!substream) {
// Let's use available as fallback.
uint64_t streamAvail = 0;
nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
if (rv == NS_BASE_STREAM_CLOSED) {
continue;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
mStatus = rv;
return mStatus;
}
length += streamAvail;
if (!length.isValid()) {
return NS_ERROR_OUT_OF_MEMORY;
}
continue;
}
int64_t size = 0;
nsresult rv = substream->Length(&size);
if (rv == NS_BASE_STREAM_CLOSED) {
continue;
}
if (rv == NS_ERROR_NOT_AVAILABLE) {
return rv;
}
// If one stream blocks, we all block.
if (rv != NS_BASE_STREAM_WOULD_BLOCK && NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
// We want to return WOULD_BLOCK if there is 1 stream that blocks. But want
// to see if there are other streams with length = -1.
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
retval = NS_BASE_STREAM_WOULD_BLOCK;
continue;
}
// If one of the stream doesn't know the size, we all don't know the size.
if (size == -1) {
*aLength = -1;
return NS_OK;
}
length += size;
if (!length.isValid()) {
return NS_ERROR_OUT_OF_MEMORY;
}
}
*aLength = length.value();
return retval;
}
class nsMultiplexInputStream::AsyncWaitLengthHelper final
: public nsIInputStreamLengthCallback {
public:
NS_DECL_THREADSAFE_ISUPPORTS
AsyncWaitLengthHelper()
: mStreamNotified(false), mLength(0), mNegativeSize(false) {}
bool AddStream(nsIAsyncInputStreamLength* aStream) {
return mPendingStreams.AppendElement(aStream, fallible);
}
bool AddSize(int64_t aSize) {
MOZ_ASSERT(!mNegativeSize);
mLength += aSize;
return mLength.isValid();
}
void NegativeSize() {
MOZ_ASSERT(!mNegativeSize);
mNegativeSize = true;
}
nsresult Proceed(nsMultiplexInputStream* aParentStream,
nsIEventTarget* aEventTarget,
const MutexAutoLock& aProofOfLock) {
MOZ_ASSERT(!mStream);
// If we don't need to wait, let's inform the callback immediately.
if (mPendingStreams.IsEmpty() || mNegativeSize) {
RefPtr<nsMultiplexInputStream> parentStream = aParentStream;
int64_t length = -1;
if (!mNegativeSize && mLength.isValid()) {
length = mLength.value();
}
nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
"AsyncWaitLengthHelper", [parentStream, length]() {
MutexAutoLock lock(parentStream->GetLock());
parentStream->AsyncWaitCompleted(length, lock);
});
return aEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
}
// Let's store the callback and the parent stream until we have
// notifications from the async length streams.
mStream = aParentStream;
// Let's activate all the pending streams.
for (nsIAsyncInputStreamLength* stream : mPendingStreams) {
nsresult rv = stream->AsyncLengthWait(this, aEventTarget);
if (rv == NS_BASE_STREAM_CLOSED) {
continue;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
}
return NS_OK;
}
NS_IMETHOD
OnInputStreamLengthReady(nsIAsyncInputStreamLength* aStream,
int64_t aLength) override {
MutexAutoLock lock(mStream->GetLock());
MOZ_ASSERT(mPendingStreams.Contains(aStream));
mPendingStreams.RemoveElement(aStream);
// Already notified.
if (mStreamNotified) {
return NS_OK;
}
if (aLength == -1) {
mNegativeSize = true;
} else {
mLength += aLength;
if (!mLength.isValid()) {
mNegativeSize = true;
}
}
// We need to wait.
if (!mNegativeSize && !mPendingStreams.IsEmpty()) {
return NS_OK;
}
// Let's notify the parent stream.
mStreamNotified = true;
mStream->AsyncWaitCompleted(mNegativeSize ? -1 : mLength.value(), lock);
return NS_OK;
}
private:
~AsyncWaitLengthHelper() = default;
RefPtr<nsMultiplexInputStream> mStream;
bool mStreamNotified;
CheckedInt64 mLength;
bool mNegativeSize;
nsTArray<nsCOMPtr<nsIAsyncInputStreamLength>> mPendingStreams;
};
NS_IMPL_ISUPPORTS(nsMultiplexInputStream::AsyncWaitLengthHelper,
nsIInputStreamLengthCallback)
NS_IMETHODIMP
nsMultiplexInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback,
nsIEventTarget* aEventTarget) {
if (NS_WARN_IF(!aEventTarget)) {
return NS_ERROR_NULL_POINTER;
}
MutexAutoLock lock(mLock);
if (mCurrentStream > 0 || mStartedReadingCurrent) {
return NS_ERROR_NOT_AVAILABLE;
}
if (!aCallback) {
mAsyncWaitLengthCallback = nullptr;
return NS_OK;
}
// We have a pending operation! Let's use this instead of creating a new one.
if (mAsyncWaitLengthHelper) {
mAsyncWaitLengthCallback = aCallback;
return NS_OK;
}
RefPtr<AsyncWaitLengthHelper> helper = new AsyncWaitLengthHelper();
for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
nsCOMPtr<nsIAsyncInputStreamLength> asyncStream =
do_QueryInterface(mStreams[i].mBufferedStream);
if (asyncStream) {
if (NS_WARN_IF(!helper->AddStream(asyncStream))) {
return NS_ERROR_OUT_OF_MEMORY;
}
continue;
}
nsCOMPtr<nsIInputStreamLength> stream =
do_QueryInterface(mStreams[i].mBufferedStream);
if (!stream) {
// Let's use available as fallback.
uint64_t streamAvail = 0;
nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
if (rv == NS_BASE_STREAM_CLOSED) {
continue;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
mStatus = rv;
return mStatus;
}
if (NS_WARN_IF(!helper->AddSize(streamAvail))) {
return NS_ERROR_OUT_OF_MEMORY;
}
continue;
}
int64_t size = 0;
nsresult rv = stream->Length(&size);
if (rv == NS_BASE_STREAM_CLOSED) {
continue;
}
MOZ_ASSERT(rv != NS_BASE_STREAM_WOULD_BLOCK,
"A nsILengthInutStream returns NS_BASE_STREAM_WOULD_BLOCK but "
"it doesn't implement nsIAsyncInputStreamLength.");
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
if (size == -1) {
helper->NegativeSize();
break;
}
if (NS_WARN_IF(!helper->AddSize(size))) {
return NS_ERROR_OUT_OF_MEMORY;
}
}
nsresult rv = helper->Proceed(this, aEventTarget, lock);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
mAsyncWaitLengthHelper = helper;
mAsyncWaitLengthCallback = aCallback;
return NS_OK;
}
void nsMultiplexInputStream::AsyncWaitCompleted(
int64_t aLength, const MutexAutoLock& aProofOfLock) {
mLock.AssertCurrentThreadOwns();
nsCOMPtr<nsIInputStreamLengthCallback> callback;
callback.swap(mAsyncWaitLengthCallback);
mAsyncWaitLengthHelper = nullptr;
// Already canceled.
if (!callback) {
return;
}
MutexAutoUnlock unlock(mLock);
callback->OnInputStreamLengthReady(this, aLength);
}
#define MAYBE_UPDATE_VALUE_REAL(x, y) \
if (y) { \
++x; \
}
#define MAYBE_UPDATE_VALUE(x, y) \
{ \
nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
MAYBE_UPDATE_VALUE_REAL(x, substream) \
}
#define MAYBE_UPDATE_BOOL(x, y) \
if (!x) { \
nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
if (substream) { \
x = true; \
} \
}
void nsMultiplexInputStream::UpdateQIMap(StreamData& aStream) {
auto length = mStreams.Length();
MAYBE_UPDATE_VALUE_REAL(mSeekableStreams, aStream.mSeekableStream)
mIsSeekableStream = (mSeekableStreams == length);
MAYBE_UPDATE_VALUE(mIPCSerializableStreams, nsIIPCSerializableInputStream)
mIsIPCSerializableStream = (mIPCSerializableStreams == length);
MAYBE_UPDATE_VALUE(mCloneableStreams, nsICloneableInputStream)
mIsCloneableStream = (mCloneableStreams == length);
// nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
// substream implements that interface
if (!mIsAsyncInputStream && aStream.mAsyncStream) {
mIsAsyncInputStream = true;
}
MAYBE_UPDATE_BOOL(mIsInputStreamLength, nsIInputStreamLength)
MAYBE_UPDATE_BOOL(mIsAsyncInputStreamLength, nsIAsyncInputStreamLength)
}
#undef MAYBE_UPDATE_VALUE
#undef MAYBE_UPDATE_VALUE_REAL
#undef MAYBE_UPDATE_BOOL
bool nsMultiplexInputStream::IsSeekable() const { return mIsSeekableStream; }
bool nsMultiplexInputStream::IsIPCSerializable() const {
return mIsIPCSerializableStream;
}
bool nsMultiplexInputStream::IsCloneable() const { return mIsCloneableStream; }
bool nsMultiplexInputStream::IsAsyncInputStream() const {
// nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
// substream implements that interface.
return mIsAsyncInputStream;
}
bool nsMultiplexInputStream::IsInputStreamLength() const {
return mIsInputStreamLength;
}
bool nsMultiplexInputStream::IsAsyncInputStreamLength() const {
return mIsAsyncInputStreamLength;
}