Bug 1852924 - When pacer returns a small delay, pretend to wait and get the next packet to send, r=mt

Differential Revision: https://phabricator.services.mozilla.com/D191325
This commit is contained in:
Kershaw Chang 2023-11-21 11:49:49 +00:00
parent aae0367a09
commit 8e045b102b
9 changed files with 207 additions and 112 deletions

View file

@ -360,7 +360,8 @@ void UDPSocketParent::Send(const nsTArray<uint8_t>& aData,
}
case UDPSocketAddr::TNetAddr: {
const NetAddr& addr(aAddr.get_NetAddr());
rv = mSocket->SendWithAddress(&addr, aData, &count);
rv = mSocket->SendWithAddress(&addr, aData.Elements(), aData.Length(),
&count);
break;
}
default:

View file

@ -12461,6 +12461,14 @@
mirror: always
rust: true
# It represents the maximum duration that we used to accumulate
# callback timeouts before we set a timer and break out of the loop.
- name: network.http.http3.max_accumlated_time_ms
type: RelaxedAtomicUint32
value: 1
mirror: always
rust: true
# When true, a http request will be upgraded to https when HTTPS RR is
# available.
- name: network.dns.upgrade_with_https_rr

View file

@ -194,7 +194,8 @@ interface nsIUDPSocket : nsISupports
* @return number of bytes written. (0 or length of data)
*/
[noscript] unsigned long sendWithAddress([const] in NetAddrPtr addr,
in Array<uint8_t> data);
[array, size_is(length), const] in uint8_t data,
in unsigned long length);
/**
* sendBinaryStream

View file

@ -1007,7 +1007,8 @@ PendingSend::OnLookupComplete(nsICancelable* request, nsIDNSRecord* aRecord,
NetAddr addr;
if (NS_SUCCEEDED(rec->GetNextAddr(mPort, &addr))) {
uint32_t count;
nsresult rv = mSocket->SendWithAddress(&addr, mData, &count);
nsresult rv = mSocket->SendWithAddress(&addr, mData.Elements(),
mData.Length(), &count);
NS_ENSURE_SUCCESS(rv, rv);
}
@ -1072,7 +1073,7 @@ class SendRequestRunnable : public Runnable {
NS_IMETHODIMP
SendRequestRunnable::Run() {
uint32_t count;
mSocket->SendWithAddress(&mAddr, mData, &count);
mSocket->SendWithAddress(&mAddr, mData.Elements(), mData.Length(), &count);
return NS_OK;
}
@ -1140,13 +1141,12 @@ nsUDPSocket::SendWithAddr(nsINetAddr* aAddr, const nsTArray<uint8_t>& aData,
NetAddr netAddr;
aAddr->GetNetAddr(&netAddr);
return SendWithAddress(&netAddr, aData, _retval);
return SendWithAddress(&netAddr, aData.Elements(), aData.Length(), _retval);
}
NS_IMETHODIMP
nsUDPSocket::SendWithAddress(const NetAddr* aAddr,
const nsTArray<uint8_t>& aData,
uint32_t* _retval) {
nsUDPSocket::SendWithAddress(const NetAddr* aAddr, const uint8_t* aData,
uint32_t aLength, uint32_t* _retval) {
NS_ENSURE_ARG(aAddr);
NS_ENSURE_ARG_POINTER(_retval);
@ -1170,8 +1170,7 @@ nsUDPSocket::SendWithAddress(const NetAddr* aAddr,
return NS_ERROR_FAILURE;
}
int32_t count =
PR_SendTo(mFD, aData.Elements(), sizeof(uint8_t) * aData.Length(), 0,
&prAddr, PR_INTERVAL_NO_WAIT);
PR_SendTo(mFD, aData, aLength, 0, &prAddr, PR_INTERVAL_NO_WAIT);
if (count < 0) {
PRErrorCode code = PR_GetError();
return ErrorAccordingToNSPR(code);
@ -1180,7 +1179,7 @@ nsUDPSocket::SendWithAddress(const NetAddr* aAddr,
*_retval = count;
} else {
FallibleTArray<uint8_t> fallibleArray;
if (!fallibleArray.InsertElementsAt(0, aData, fallible)) {
if (!fallibleArray.AppendElements(aData, aLength, fallible)) {
return NS_ERROR_OUT_OF_MEMORY;
}
@ -1188,7 +1187,7 @@ nsUDPSocket::SendWithAddress(const NetAddr* aAddr,
new SendRequestRunnable(this, *aAddr, std::move(fallibleArray)),
NS_DISPATCH_NORMAL);
NS_ENSURE_SUCCESS(rv, rv);
*_retval = aData.Length();
*_retval = aLength;
}
return NS_OK;
}

View file

@ -71,10 +71,18 @@ Http3Session::Http3Session() {
mThroughCaptivePortal = gHttpHandler->GetThroughCaptivePortal();
}
static nsresult StringAndPortToNetAddr(nsACString& remoteAddrStr,
uint16_t remotePort, NetAddr* netAddr) {
if (NS_FAILED(netAddr->InitFromString(remoteAddrStr, remotePort))) {
return NS_ERROR_FAILURE;
static nsresult RawBytesToNetAddr(uint16_t aFamily, const uint8_t* aRemoteAddr,
uint16_t remotePort, NetAddr* netAddr) {
if (aFamily == AF_INET) {
netAddr->inet.family = AF_INET;
netAddr->inet.port = htons(remotePort);
memcpy(&netAddr->inet.ip, aRemoteAddr, 4);
} else if (aFamily == AF_INET6) {
netAddr->inet6.family = AF_INET6;
netAddr->inet6.port = htons(remotePort);
memcpy(&netAddr->inet6.ip.u8, aRemoteAddr, 16);
} else {
return NS_ERROR_UNEXPECTED;
}
return NS_OK;
@ -133,6 +141,7 @@ nsresult Http3Session::Init(const nsHttpConnectionInfo* aConnInfo,
StaticPrefs::network_http_http3_max_stream_data(),
StaticPrefs::network_http_http3_version_negotiation_enabled(),
mConnInfo->GetWebTransport(), gHttpHandler->Http3QlogDir(), datagramSize,
StaticPrefs::network_http_http3_max_accumlated_time_ms(),
getter_AddRefs(mHttp3Connection));
if (NS_FAILED(rv)) {
return rv;
@ -878,46 +887,46 @@ nsresult Http3Session::ProcessOutput(nsIUDPSocket* socket) {
LOG(("Http3Session::ProcessOutput reader=%p, [this=%p]", mUdpConn.get(),
this));
// Check if we have a packet that could not have been sent in a previous
// iteration or maybe get new packets to send.
while (true) {
nsTArray<uint8_t> packetToSend;
nsAutoCString remoteAddrStr;
uint16_t port = 0;
uint64_t timeout = 0;
if (!mHttp3Connection->ProcessOutput(&remoteAddrStr, &port, packetToSend,
&timeout)) {
SetupTimer(timeout);
break;
}
MOZ_ASSERT(packetToSend.Length());
LOG(
("Http3Session::ProcessOutput sending packet with %u bytes to %s "
"port=%d [this=%p].",
(uint32_t)packetToSend.Length(),
PromiseFlatCString(remoteAddrStr).get(), port, this));
mSocket = socket;
nsresult rv = mHttp3Connection->ProcessOutputAndSend(
this,
[](void* aContext, uint16_t aFamily, const uint8_t* aAddr, uint16_t aPort,
const uint8_t* aData, uint32_t aLength) {
Http3Session* self = (Http3Session*)aContext;
uint32_t written = 0;
NetAddr addr;
if (NS_FAILED(StringAndPortToNetAddr(remoteAddrStr, port, &addr))) {
continue;
}
nsresult rv = socket->SendWithAddress(&addr, packetToSend, &written);
uint32_t written = 0;
NetAddr addr;
if (NS_FAILED(RawBytesToNetAddr(aFamily, aAddr, aPort, &addr))) {
return NS_OK;
}
LOG(("Http3Session::ProcessOutput sending packet rv=%d osError=%d",
static_cast<int32_t>(rv), NS_FAILED(rv) ? PR_GetOSError() : 0));
if (NS_FAILED(rv) && (rv != NS_BASE_STREAM_WOULD_BLOCK)) {
mSocketError = rv;
// If there was an error that is not NS_BASE_STREAM_WOULD_BLOCK
// return from here. We do not need to set a timer, because we
// will close the connection.
return rv;
}
mTotalBytesWritten += packetToSend.Length();
mLastWriteTime = PR_IntervalNow();
}
LOG3(
("Http3Session::ProcessOutput sending packet with %u bytes to %s "
"port=%d [this=%p].",
aLength, addr.ToString().get(), aPort, self));
return NS_OK;
nsresult rv =
self->mSocket->SendWithAddress(&addr, aData, aLength, &written);
LOG(("Http3Session::ProcessOutput sending packet rv=%d osError=%d",
static_cast<int32_t>(rv), NS_FAILED(rv) ? PR_GetOSError() : 0));
if (NS_FAILED(rv) && (rv != NS_BASE_STREAM_WOULD_BLOCK)) {
self->mSocketError = rv;
// If there was an error that is not NS_BASE_STREAM_WOULD_BLOCK
// return from here. We do not need to set a timer, because we
// will close the connection.
return rv;
}
self->mTotalBytesWritten += aLength;
self->mLastWriteTime = PR_IntervalNow();
return NS_OK;
},
[](void* aContext, uint64_t timeout) {
Http3Session* self = (Http3Session*)aContext;
self->SetupTimer(timeout);
});
mSocket = nullptr;
return rv;
}
// This is only called when timer expires.
@ -951,7 +960,8 @@ void Http3Session::SetupTimer(uint64_t aTimeout) {
return;
}
LOG(("Http3Session::SetupTimer to %" PRIu64 "ms [this=%p].", aTimeout, this));
LOG3(
("Http3Session::SetupTimer to %" PRIu64 "ms [this=%p].", aTimeout, this));
// Remember the time when the timer should trigger.
mTimerShouldTrigger =

View file

@ -375,6 +375,10 @@ class Http3Session final : public nsAHttpTransaction, public nsAHttpConnection {
bool mHasWebTransportSession = false;
// When true, we don't add this connection info into the Http/3 excluded list.
bool mDontExclude = false;
// The lifetime of the UDP socket is managed by the HttpConnectionUDP. This
// is only used in Http3Session::ProcessOutput. Using raw pointer here to
// improve performance.
nsIUDPSocket* mSocket;
};
NS_DEFINE_STATIC_IID_ACCESSOR(Http3Session, NS_HTTP3SESSION_IID);

View file

@ -19,12 +19,12 @@ class NeqoHttp3Conn final {
uint64_t aMaxData, uint64_t aMaxStreamData,
bool aVersionNegotiation, bool aWebTransport,
const nsACString& aQlogDir, uint32_t aDatagramSize,
NeqoHttp3Conn** aConn) {
return neqo_http3conn_new(&aOrigin, &aAlpn, &aLocalAddr, &aRemoteAddr,
aMaxTableSize, aMaxBlockedStreams, aMaxData,
aMaxStreamData, aVersionNegotiation,
aWebTransport, &aQlogDir, aDatagramSize,
(const mozilla::net::NeqoHttp3Conn**)aConn);
uint32_t aMaxAccumulatedTime, NeqoHttp3Conn** aConn) {
return neqo_http3conn_new(
&aOrigin, &aAlpn, &aLocalAddr, &aRemoteAddr, aMaxTableSize,
aMaxBlockedStreams, aMaxData, aMaxStreamData, aVersionNegotiation,
aWebTransport, &aQlogDir, aDatagramSize, aMaxAccumulatedTime,
(const mozilla::net::NeqoHttp3Conn**)aConn);
}
void Close(uint64_t aError) { neqo_http3conn_close(this, aError); }
@ -46,11 +46,10 @@ class NeqoHttp3Conn final {
return neqo_http3conn_process_input(this, &aRemoteAddr, &aPacket);
}
bool ProcessOutput(nsACString* aRemoteAddr, uint16_t* aPort,
nsTArray<uint8_t>& aData, uint64_t* aTimeout) {
aData.TruncateLength(0);
return neqo_http3conn_process_output(this, aRemoteAddr, aPort, &aData,
aTimeout);
nsresult ProcessOutputAndSend(void* aContext, SendFunc aSendFunc,
SetTimerFunc aSetTimerFunc) {
return neqo_http3conn_process_output_and_send(this, aContext, aSendFunc,
aSetTimerFunc);
}
nsresult GetEvent(Http3Event* aEvent, nsTArray<uint8_t>& aData) {

View file

@ -20,8 +20,10 @@ use nsstring::*;
use qlog::streamer::QlogStreamer;
use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::{max, min};
use std::convert::TryFrom;
use std::convert::TryInto;
use std::ffi::c_void;
use std::fs::OpenOptions;
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
@ -32,7 +34,7 @@ use std::slice;
use std::str;
#[cfg(feature = "fuzzing")]
use std::time::Duration;
use std::time::Instant;
use std::time::{Duration, Instant};
use thin_vec::ThinVec;
use uuid::Uuid;
#[cfg(windows)]
@ -44,6 +46,8 @@ pub struct NeqoHttp3Conn {
conn: Http3Client,
local_addr: SocketAddr,
refcnt: AtomicRefcnt,
last_output_time: Instant,
max_accumlated_time: Duration,
}
// Opaque interface to mozilla::net::NetAddr defined in DNS.h
@ -85,6 +89,21 @@ fn netaddr_to_socket_addr(arg: *const NetAddr) -> Result<SocketAddr, nsresult> {
Err(NS_ERROR_UNEXPECTED)
}
fn get_current_or_last_output_time(last_output_time: &Instant) -> Instant {
max(*last_output_time, Instant::now())
}
type SendFunc = extern "C" fn(
context: *mut c_void,
addr_family: u16,
addr: *const u8,
port: u16,
data: *const u8,
size: u32,
) -> nsresult;
type SetTimerFunc = extern "C" fn(context: *mut c_void, timeout: u64);
impl NeqoHttp3Conn {
fn new(
origin: &nsACString,
@ -99,6 +118,7 @@ impl NeqoHttp3Conn {
webtransport: bool,
qlog_dir: &nsACString,
webtransport_datagram_size: u32,
max_accumlated_time_ms: u32,
) -> Result<RefPtr<NeqoHttp3Conn>, nsresult> {
// Nss init.
init();
@ -212,6 +232,8 @@ impl NeqoHttp3Conn {
conn,
local_addr: local,
refcnt: unsafe { AtomicRefcnt::new() },
last_output_time: Instant::now(),
max_accumlated_time: Duration::from_millis(max_accumlated_time_ms.into()),
}));
unsafe { Ok(RefPtr::from_raw(conn).unwrap()) }
}
@ -255,6 +277,7 @@ pub extern "C" fn neqo_http3conn_new(
webtransport: bool,
qlog_dir: &nsACString,
webtransport_datagram_size: u32,
max_accumlated_time_ms: u32,
result: &mut *const NeqoHttp3Conn,
) -> nsresult {
*result = ptr::null_mut();
@ -272,6 +295,7 @@ pub extern "C" fn neqo_http3conn_new(
webtransport,
qlog_dir,
webtransport_datagram_size,
max_accumlated_time_ms,
) {
Ok(http3_conn) => {
http3_conn.forget(result);
@ -296,52 +320,89 @@ pub unsafe extern "C" fn neqo_http3conn_process_input(
};
conn.conn.process_input(
Datagram::new(remote, conn.local_addr, (*packet).to_vec()),
Instant::now(),
get_current_or_last_output_time(&conn.last_output_time),
);
return NS_OK;
}
/* Process output:
* this may return a packet that needs to be sent or a timeout.
* if it returns a packet the function returns true, otherwise it returns false.
*/
#[no_mangle]
pub extern "C" fn neqo_http3conn_process_output(
pub extern "C" fn neqo_http3conn_process_output_and_send(
conn: &mut NeqoHttp3Conn,
remote_addr: &mut nsACString,
remote_port: &mut u16,
packet: &mut ThinVec<u8>,
timeout: &mut u64,
) -> bool {
match conn.conn.process_output(Instant::now()) {
Output::Datagram(dg) => {
packet.extend_from_slice(&dg);
remote_addr.append(&dg.destination().ip().to_string());
*remote_port = dg.destination().port();
true
}
Output::Callback(to) => {
*timeout = to.as_millis() as u64;
// Necko resolution is in milliseconds whereas neqo resolution
// is in nanoseconds. If we called process_output too soon due
// to this difference, we might do few unnecessary loops until
// we waste the remaining time. To avoid it, we return 1ms when
// the timeout is less than 1ms.
if *timeout == 0 {
*timeout = 1;
context: *mut c_void,
send_func: SendFunc,
set_timer_func: SetTimerFunc,
) -> nsresult {
let now = Instant::now();
if conn.last_output_time > now {
// The timer fired too early, so reschedule it.
// The 1ms of extra delay is not ideal, but this is a fail
set_timer_func(
context,
u64::try_from((conn.last_output_time - now + conn.max_accumlated_time).as_millis())
.unwrap(),
);
return NS_OK;
}
let mut accumulated_time = Duration::from_nanos(0);
loop {
conn.last_output_time = if accumulated_time.is_zero() {
Instant::now()
} else {
now + accumulated_time
};
match conn.conn.process_output(conn.last_output_time) {
Output::Datagram(dg) => {
let rv = match dg.destination().ip() {
IpAddr::V4(v4) => send_func(
context,
u16::try_from(AF_INET).unwrap(),
v4.octets().as_ptr(),
dg.destination().port(),
dg.as_ptr(),
u32::try_from(dg.len()).unwrap(),
),
IpAddr::V6(v6) => send_func(
context,
u16::try_from(AF_INET6).unwrap(),
v6.octets().as_ptr(),
dg.destination().port(),
dg.as_ptr(),
u32::try_from(dg.len()).unwrap(),
),
};
if rv != NS_OK {
return rv;
}
}
Output::Callback(to) => {
let timeout = min(to, Duration::from_nanos(u64::MAX - 1));
accumulated_time += timeout;
if accumulated_time >= conn.max_accumlated_time {
let mut timeout = accumulated_time.as_millis() as u64;
if timeout == 0 {
timeout = 1;
}
set_timer_func(context, timeout);
break;
}
}
Output::None => {
set_timer_func(context, std::u64::MAX);
break;
}
false
}
Output::None => {
*timeout = std::u64::MAX;
false
}
}
NS_OK
}
#[no_mangle]
pub extern "C" fn neqo_http3conn_close(conn: &mut NeqoHttp3Conn, error: u64) {
conn.conn.close(Instant::now(), error, "");
conn.conn.close(
get_current_or_last_output_time(&conn.last_output_time),
error,
"",
);
}
fn is_excluded_header(name: &str) -> bool {
@ -445,7 +506,7 @@ pub extern "C" fn neqo_http3conn_fetch(
}
let priority = Priority::new(urgency, incremental);
match conn.conn.fetch(
Instant::now(),
get_current_or_last_output_time(&conn.last_output_time),
method_tmp,
&(scheme_tmp, host_tmp, path_tmp),
&hdrs,
@ -1046,10 +1107,11 @@ pub unsafe extern "C" fn neqo_http3conn_read_response_data(
fin: &mut bool,
) -> nsresult {
let array = slice::from_raw_parts_mut(buf, len as usize);
match conn
.conn
.read_data(Instant::now(), StreamId::from(stream_id), &mut array[..])
{
match conn.conn.read_data(
get_current_or_last_output_time(&conn.last_output_time),
StreamId::from(stream_id),
&mut array[..],
) {
Ok((amount, fin_recvd)) => {
*read = u32::try_from(amount).unwrap();
*fin = fin_recvd;
@ -1156,7 +1218,10 @@ pub extern "C" fn neqo_http3conn_peer_certificate_info(
#[no_mangle]
pub extern "C" fn neqo_http3conn_authenticated(conn: &mut NeqoHttp3Conn, error: PRErrorCode) {
conn.conn.authenticated(error.into(), Instant::now());
conn.conn.authenticated(
error.into(),
get_current_or_last_output_time(&conn.last_output_time),
);
}
#[no_mangle]
@ -1164,7 +1229,10 @@ pub extern "C" fn neqo_http3conn_set_resumption_token(
conn: &mut NeqoHttp3Conn,
token: &mut ThinVec<u8>,
) {
let _ = conn.conn.enable_resumption(Instant::now(), token);
let _ = conn.conn.enable_resumption(
get_current_or_last_output_time(&conn.last_output_time),
token,
);
}
#[no_mangle]
@ -1247,7 +1315,7 @@ pub extern "C" fn neqo_http3conn_webtransport_create_session(
};
match conn.conn.webtransport_create_session(
Instant::now(),
get_current_or_last_output_time(&conn.last_output_time),
&("https", host_tmp, path_tmp),
&hdrs,
) {

View file

@ -293,7 +293,8 @@ TEST(TestUDPSocket, TestUDPSocketMain)
clientAddr.inet.ip = PR_htonl(127 << 24 | 1);
phase = TEST_SEND_API;
rv = server->SendWithAddress(&clientAddr, data, &count);
rv = server->SendWithAddress(&clientAddr, data.Elements(), data.Length(),
&count);
ASSERT_NS_SUCCEEDED(rv);
EXPECT_EQ(count, sizeof(uint32_t));
@ -322,7 +323,8 @@ TEST(TestUDPSocket, TestUDPSocketMain)
// Send multicast ping
timerCb->mResult = NS_OK;
timer->InitWithCallback(timerCb, MULTICAST_TIMEOUT, nsITimer::TYPE_ONE_SHOT);
rv = client->SendWithAddress(&multicastAddr, data, &count);
rv = client->SendWithAddress(&multicastAddr, data.Elements(), data.Length(),
&count);
ASSERT_NS_SUCCEEDED(rv);
EXPECT_EQ(count, sizeof(uint32_t));
@ -340,7 +342,8 @@ TEST(TestUDPSocket, TestUDPSocketMain)
// Send multicast ping
timerCb->mResult = NS_OK;
timer->InitWithCallback(timerCb, MULTICAST_TIMEOUT, nsITimer::TYPE_ONE_SHOT);
rv = client->SendWithAddress(&multicastAddr, data, &count);
rv = client->SendWithAddress(&multicastAddr, data.Elements(), data.Length(),
&count);
ASSERT_NS_SUCCEEDED(rv);
EXPECT_EQ(count, sizeof(uint32_t));
@ -362,7 +365,8 @@ TEST(TestUDPSocket, TestUDPSocketMain)
// Send multicast ping
timerCb->mResult = NS_OK;
timer->InitWithCallback(timerCb, MULTICAST_TIMEOUT, nsITimer::TYPE_ONE_SHOT);
rv = client->SendWithAddress(&multicastAddr, data, &count);
rv = client->SendWithAddress(&multicastAddr, data.Elements(), data.Length(),
&count);
ASSERT_NS_SUCCEEDED(rv);
EXPECT_EQ(count, sizeof(uint32_t));
@ -384,7 +388,8 @@ TEST(TestUDPSocket, TestUDPSocketMain)
// Send multicast ping
timerCb->mResult = NS_OK;
timer->InitWithCallback(timerCb, MULTICAST_TIMEOUT, nsITimer::TYPE_ONE_SHOT);
rv = client->SendWithAddress(&multicastAddr, data, &count);
rv = client->SendWithAddress(&multicastAddr, data.Elements(), data.Length(),
&count);
ASSERT_NS_SUCCEEDED(rv);
EXPECT_EQ(count, sizeof(uint32_t));