forked from mirrors/gecko-dev
		
	 46156fb100
			
		
	
	
		46156fb100
		
	
	
	
	
		
			
			At the same time, update mdns_service to socket2 0.4 to avoid a duplicate. Differential Revision: https://phabricator.services.mozilla.com/D147479
		
			
				
	
	
		
			818 lines
		
	
	
	
		
			19 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			818 lines
		
	
	
	
		
			19 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| #![allow(clippy::blacklisted_name)]
 | |
| #![warn(rust_2018_idioms)]
 | |
| #![cfg(feature = "full")]
 | |
| 
 | |
| use tokio::time::{self, sleep, sleep_until, Duration, Instant};
 | |
| use tokio_test::{assert_pending, assert_ready, task};
 | |
| use tokio_util::time::DelayQueue;
 | |
| 
 | |
/// Polls the spawned queue named by `$queue` for its next expired entry.
///
/// Runs `poll_expired` inside the task's `enter` closure so that the call
/// receives the task context.
macro_rules! poll {
    ($queue:ident) => {
        $queue.enter(|cx, mut inner| inner.poll_expired(cx))
    };
}
 | |
| 
 | |
/// Asserts that `$e` is ready and carries `Some(value)`, evaluating to
/// `value`. Panics with the message `"None"` when the ready value is `None`.
macro_rules! assert_ready_some {
    ($e:expr) => {{
        let ready = assert_ready!($e);
        match ready {
            None => panic!("None"),
            Some(value) => value,
        }
    }};
}
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn single_immediate_delay() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
|     let _key = queue.insert_at("foo", Instant::now());
 | |
| 
 | |
|     // Advance time by 1ms to handle thee rounding
 | |
|     sleep(ms(1)).await;
 | |
| 
 | |
|     assert_ready_some!(poll!(queue));
 | |
| 
 | |
|     let entry = assert_ready!(poll!(queue));
 | |
|     assert!(entry.is_none())
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn multi_immediate_delays() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let _k = queue.insert_at("1", Instant::now());
 | |
|     let _k = queue.insert_at("2", Instant::now());
 | |
|     let _k = queue.insert_at("3", Instant::now());
 | |
| 
 | |
|     sleep(ms(1)).await;
 | |
| 
 | |
|     let mut res = vec![];
 | |
| 
 | |
|     while res.len() < 3 {
 | |
|         let entry = assert_ready_some!(poll!(queue));
 | |
|         res.push(entry.into_inner());
 | |
|     }
 | |
| 
 | |
|     let entry = assert_ready!(poll!(queue));
 | |
|     assert!(entry.is_none());
 | |
| 
 | |
|     res.sort_unstable();
 | |
| 
 | |
|     assert_eq!("1", res[0]);
 | |
|     assert_eq!("2", res[1]);
 | |
|     assert_eq!("3", res[2]);
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn single_short_delay() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
|     let _key = queue.insert_at("foo", Instant::now() + ms(5));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(1)).await;
 | |
| 
 | |
|     assert!(!queue.is_woken());
 | |
| 
 | |
|     sleep(ms(5)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue));
 | |
|     assert_eq!(*entry.get_ref(), "foo");
 | |
| 
 | |
|     let entry = assert_ready!(poll!(queue));
 | |
|     assert!(entry.is_none());
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn multi_delay_at_start() {
 | |
|     time::pause();
 | |
| 
 | |
|     let long = 262_144 + 9 * 4096;
 | |
|     let delays = &[1000, 2, 234, long, 60, 10];
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     // Setup the delays
 | |
|     for &i in delays {
 | |
|         let _key = queue.insert_at(i, Instant::now() + ms(i));
 | |
|     }
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
|     assert!(!queue.is_woken());
 | |
| 
 | |
|     let start = Instant::now();
 | |
|     for elapsed in 0..1200 {
 | |
|         println!("elapsed: {:?}", elapsed);
 | |
|         let elapsed = elapsed + 1;
 | |
|         tokio::time::sleep_until(start + ms(elapsed)).await;
 | |
| 
 | |
|         if delays.contains(&elapsed) {
 | |
|             assert!(queue.is_woken());
 | |
|             assert_ready!(poll!(queue));
 | |
|             assert_pending!(poll!(queue));
 | |
|         } else if queue.is_woken() {
 | |
|             let cascade = &[192, 960];
 | |
|             assert!(
 | |
|                 cascade.contains(&elapsed),
 | |
|                 "elapsed={} dt={:?}",
 | |
|                 elapsed,
 | |
|                 Instant::now() - start
 | |
|             );
 | |
| 
 | |
|             assert_pending!(poll!(queue));
 | |
|         }
 | |
|     }
 | |
|     println!("finished multi_delay_start");
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn insert_in_past_fires_immediately() {
 | |
|     println!("running insert_in_past_fires_immediately");
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     sleep(ms(10)).await;
 | |
| 
 | |
|     queue.insert_at("foo", now);
 | |
| 
 | |
|     assert_ready!(poll!(queue));
 | |
|     println!("finished insert_in_past_fires_immediately");
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn remove_entry() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let key = queue.insert_at("foo", Instant::now() + ms(5));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     let entry = queue.remove(&key);
 | |
|     assert_eq!(entry.into_inner(), "foo");
 | |
| 
 | |
|     sleep(ms(10)).await;
 | |
| 
 | |
|     let entry = assert_ready!(poll!(queue));
 | |
|     assert!(entry.is_none());
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn reset_entry() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
|     let key = queue.insert_at("foo", now + ms(5));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
|     sleep(ms(1)).await;
 | |
| 
 | |
|     queue.reset_at(&key, now + ms(10));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(7)).await;
 | |
| 
 | |
|     assert!(!queue.is_woken());
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(3)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue));
 | |
|     assert_eq!(*entry.get_ref(), "foo");
 | |
| 
 | |
|     let entry = assert_ready!(poll!(queue));
 | |
|     assert!(entry.is_none())
 | |
| }
 | |
| 
 | |
| // Reproduces tokio-rs/tokio#849.
 | |
| #[tokio::test]
 | |
| async fn reset_much_later() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
|     sleep(ms(1)).await;
 | |
| 
 | |
|     let key = queue.insert_at("foo", now + ms(200));
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(3)).await;
 | |
| 
 | |
|     queue.reset_at(&key, now + ms(10));
 | |
| 
 | |
|     sleep(ms(20)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| }
 | |
| 
 | |
| // Reproduces tokio-rs/tokio#849.
 | |
| #[tokio::test]
 | |
| async fn reset_twice() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     sleep(ms(1)).await;
 | |
| 
 | |
|     let key = queue.insert_at("foo", now + ms(200));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(3)).await;
 | |
| 
 | |
|     queue.reset_at(&key, now + ms(50));
 | |
| 
 | |
|     sleep(ms(20)).await;
 | |
| 
 | |
|     queue.reset_at(&key, now + ms(40));
 | |
| 
 | |
|     sleep(ms(20)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| }
 | |
| 
 | |
| /// Regression test: Given an entry inserted with a deadline in the past, so
 | |
| /// that it is placed directly on the expired queue, reset the entry to a
 | |
| /// deadline in the future. Validate that this leaves the entry and queue in an
 | |
| /// internally consistent state by running an additional reset on the entry
 | |
| /// before polling it to completion.
 | |
| #[tokio::test]
 | |
| async fn repeatedly_reset_entry_inserted_as_expired() {
 | |
|     time::pause();
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     let key = queue.insert_at("foo", now - ms(100));
 | |
| 
 | |
|     queue.reset_at(&key, now + ms(100));
 | |
|     queue.reset_at(&key, now + ms(50));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     time::sleep_until(now + ms(60)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
|     assert_eq!(entry, "foo");
 | |
| 
 | |
|     let entry = assert_ready!(poll!(queue));
 | |
|     assert!(entry.is_none());
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn remove_expired_item() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = DelayQueue::new();
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     sleep(ms(10)).await;
 | |
| 
 | |
|     let key = queue.insert_at("foo", now);
 | |
| 
 | |
|     let entry = queue.remove(&key);
 | |
|     assert_eq!(entry.into_inner(), "foo");
 | |
| }
 | |
| 
 | |
| /// Regression test: it should be possible to remove entries which fall in the
 | |
| /// 0th slot of the internal timer wheel — that is, entries whose expiration
 | |
| /// (a) falls at the beginning of one of the wheel's hierarchical levels and (b)
 | |
| /// is equal to the wheel's current elapsed time.
 | |
| #[tokio::test]
 | |
| async fn remove_at_timer_wheel_threshold() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     let key1 = queue.insert_at("foo", now + ms(64));
 | |
|     let key2 = queue.insert_at("bar", now + ms(64));
 | |
| 
 | |
|     sleep(ms(80)).await;
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
| 
 | |
|     match entry {
 | |
|         "foo" => {
 | |
|             let entry = queue.remove(&key2).into_inner();
 | |
|             assert_eq!(entry, "bar");
 | |
|         }
 | |
|         "bar" => {
 | |
|             let entry = queue.remove(&key1).into_inner();
 | |
|             assert_eq!(entry, "foo");
 | |
|         }
 | |
|         other => panic!("other: {:?}", other),
 | |
|     }
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn expires_before_last_insert() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     queue.insert_at("foo", now + ms(10_000));
 | |
| 
 | |
|     // Delay should be set to 8.192s here.
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     // Delay should be set to the delay of the new item here
 | |
|     queue.insert_at("bar", now + ms(600));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(600)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
|     assert_eq!(entry, "bar");
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn multi_reset() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     let one = queue.insert_at("one", now + ms(200));
 | |
|     let two = queue.insert_at("two", now + ms(250));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     queue.reset_at(&one, now + ms(300));
 | |
|     queue.reset_at(&two, now + ms(350));
 | |
|     queue.reset_at(&one, now + ms(400));
 | |
| 
 | |
|     sleep(ms(310)).await;
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(50)).await;
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue));
 | |
|     assert_eq!(*entry.get_ref(), "two");
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(50)).await;
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue));
 | |
|     assert_eq!(*entry.get_ref(), "one");
 | |
| 
 | |
|     let entry = assert_ready!(poll!(queue));
 | |
|     assert!(entry.is_none())
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn expire_first_key_when_reset_to_expire_earlier() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     let one = queue.insert_at("one", now + ms(200));
 | |
|     queue.insert_at("two", now + ms(250));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     queue.reset_at(&one, now + ms(100));
 | |
| 
 | |
|     sleep(ms(100)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
|     assert_eq!(entry, "one");
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn expire_second_key_when_reset_to_expire_earlier() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     queue.insert_at("one", now + ms(200));
 | |
|     let two = queue.insert_at("two", now + ms(250));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     queue.reset_at(&two, now + ms(100));
 | |
| 
 | |
|     sleep(ms(100)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
|     assert_eq!(entry, "two");
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn reset_first_expiring_item_to_expire_later() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     let one = queue.insert_at("one", now + ms(200));
 | |
|     let _two = queue.insert_at("two", now + ms(250));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     queue.reset_at(&one, now + ms(300));
 | |
|     sleep(ms(250)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
|     assert_eq!(entry, "two");
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn insert_before_first_after_poll() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     let _one = queue.insert_at("one", now + ms(200));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     let _two = queue.insert_at("two", now + ms(100));
 | |
| 
 | |
|     sleep(ms(99)).await;
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(1)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
|     assert_eq!(entry, "two");
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn insert_after_ready_poll() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     queue.insert_at("1", now + ms(100));
 | |
|     queue.insert_at("2", now + ms(100));
 | |
|     queue.insert_at("3", now + ms(100));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(100)).await;
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let mut res = vec![];
 | |
| 
 | |
|     while res.len() < 3 {
 | |
|         let entry = assert_ready_some!(poll!(queue));
 | |
|         res.push(entry.into_inner());
 | |
|         queue.insert_at("foo", now + ms(500));
 | |
|     }
 | |
| 
 | |
|     res.sort_unstable();
 | |
| 
 | |
|     assert_eq!("1", res[0]);
 | |
|     assert_eq!("2", res[1]);
 | |
|     assert_eq!("3", res[2]);
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn reset_later_after_slot_starts() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     let foo = queue.insert_at("foo", now + ms(100));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep_until(now + Duration::from_millis(80)).await;
 | |
| 
 | |
|     assert!(!queue.is_woken());
 | |
| 
 | |
|     // At this point the queue hasn't been polled, so `elapsed` on the wheel
 | |
|     // for the queue is still at 0 and hence the 1ms resolution slots cover
 | |
|     // [0-64).  Resetting the time on the entry to 120 causes it to get put in
 | |
|     // the [64-128) slot.  As the queue knows that the first entry is within
 | |
|     // that slot, but doesn't know when, it must wake immediately to advance
 | |
|     // the wheel.
 | |
|     queue.reset_at(&foo, now + ms(120));
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep_until(now + Duration::from_millis(119)).await;
 | |
|     assert!(!queue.is_woken());
 | |
| 
 | |
|     sleep(ms(1)).await;
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
|     assert_eq!(entry, "foo");
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn reset_inserted_expired() {
 | |
|     time::pause();
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     let key = queue.insert_at("foo", now - ms(100));
 | |
| 
 | |
|     // this causes the panic described in #2473
 | |
|     queue.reset_at(&key, now + ms(100));
 | |
| 
 | |
|     assert_eq!(1, queue.len());
 | |
| 
 | |
|     sleep(ms(200)).await;
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
|     assert_eq!(entry, "foo");
 | |
| 
 | |
|     assert_eq!(queue.len(), 0);
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn reset_earlier_after_slot_starts() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     let foo = queue.insert_at("foo", now + ms(200));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep_until(now + Duration::from_millis(80)).await;
 | |
| 
 | |
|     assert!(!queue.is_woken());
 | |
| 
 | |
|     // At this point the queue hasn't been polled, so `elapsed` on the wheel
 | |
|     // for the queue is still at 0 and hence the 1ms resolution slots cover
 | |
|     // [0-64).  Resetting the time on the entry to 120 causes it to get put in
 | |
|     // the [64-128) slot.  As the queue knows that the first entry is within
 | |
|     // that slot, but doesn't know when, it must wake immediately to advance
 | |
|     // the wheel.
 | |
|     queue.reset_at(&foo, now + ms(120));
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep_until(now + Duration::from_millis(119)).await;
 | |
|     assert!(!queue.is_woken());
 | |
| 
 | |
|     sleep(ms(1)).await;
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
|     assert_eq!(entry, "foo");
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn insert_in_past_after_poll_fires_immediately() {
 | |
|     time::pause();
 | |
| 
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     queue.insert_at("foo", now + ms(200));
 | |
| 
 | |
|     assert_pending!(poll!(queue));
 | |
| 
 | |
|     sleep(ms(80)).await;
 | |
| 
 | |
|     assert!(!queue.is_woken());
 | |
|     queue.insert_at("bar", now + ms(40));
 | |
| 
 | |
|     assert!(queue.is_woken());
 | |
| 
 | |
|     let entry = assert_ready_some!(poll!(queue)).into_inner();
 | |
|     assert_eq!(entry, "bar");
 | |
| }
 | |
| 
 | |
| #[tokio::test]
 | |
| async fn delay_queue_poll_expired_when_empty() {
 | |
|     let mut delay_queue = task::spawn(DelayQueue::new());
 | |
|     let key = delay_queue.insert(0, std::time::Duration::from_secs(10));
 | |
|     assert_pending!(poll!(delay_queue));
 | |
| 
 | |
|     delay_queue.remove(&key);
 | |
|     assert!(assert_ready!(poll!(delay_queue)).is_none());
 | |
| }
 | |
| 
 | |
| #[tokio::test(start_paused = true)]
 | |
| async fn compact_expire_empty() {
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     queue.insert_at("foo1", now + ms(10));
 | |
|     queue.insert_at("foo2", now + ms(10));
 | |
| 
 | |
|     sleep(ms(10)).await;
 | |
| 
 | |
|     let mut res = vec![];
 | |
|     while res.len() < 2 {
 | |
|         let entry = assert_ready_some!(poll!(queue));
 | |
|         res.push(entry.into_inner());
 | |
|     }
 | |
| 
 | |
|     queue.compact();
 | |
| 
 | |
|     assert_eq!(queue.len(), 0);
 | |
|     assert_eq!(queue.capacity(), 0);
 | |
| }
 | |
| 
 | |
| #[tokio::test(start_paused = true)]
 | |
| async fn compact_remove_empty() {
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     let key1 = queue.insert_at("foo1", now + ms(10));
 | |
|     let key2 = queue.insert_at("foo2", now + ms(10));
 | |
| 
 | |
|     queue.remove(&key1);
 | |
|     queue.remove(&key2);
 | |
| 
 | |
|     queue.compact();
 | |
| 
 | |
|     assert_eq!(queue.len(), 0);
 | |
|     assert_eq!(queue.capacity(), 0);
 | |
| }
 | |
| 
 | |
| #[tokio::test(start_paused = true)]
 | |
| // Trigger a re-mapping of keys in the slab due to a `compact` call and
 | |
| // test removal of re-mapped keys
 | |
| async fn compact_remove_remapped_keys() {
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let now = Instant::now();
 | |
| 
 | |
|     queue.insert_at("foo1", now + ms(10));
 | |
|     queue.insert_at("foo2", now + ms(10));
 | |
| 
 | |
|     // should be assigned indices 3 and 4
 | |
|     let key3 = queue.insert_at("foo3", now + ms(20));
 | |
|     let key4 = queue.insert_at("foo4", now + ms(20));
 | |
| 
 | |
|     sleep(ms(10)).await;
 | |
| 
 | |
|     let mut res = vec![];
 | |
|     while res.len() < 2 {
 | |
|         let entry = assert_ready_some!(poll!(queue));
 | |
|         res.push(entry.into_inner());
 | |
|     }
 | |
| 
 | |
|     // items corresponding to `foo3` and `foo4` will be assigned
 | |
|     // new indices here
 | |
|     queue.compact();
 | |
| 
 | |
|     queue.insert_at("foo5", now + ms(10));
 | |
| 
 | |
|     // test removal of re-mapped keys
 | |
|     let expired3 = queue.remove(&key3);
 | |
|     let expired4 = queue.remove(&key4);
 | |
| 
 | |
|     assert_eq!(expired3.into_inner(), "foo3");
 | |
|     assert_eq!(expired4.into_inner(), "foo4");
 | |
| 
 | |
|     queue.compact();
 | |
|     assert_eq!(queue.len(), 1);
 | |
|     assert_eq!(queue.capacity(), 1);
 | |
| }
 | |
| 
 | |
| #[tokio::test(start_paused = true)]
 | |
| async fn compact_change_deadline() {
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let mut now = Instant::now();
 | |
| 
 | |
|     queue.insert_at("foo1", now + ms(10));
 | |
|     queue.insert_at("foo2", now + ms(10));
 | |
| 
 | |
|     // should be assigned indices 3 and 4
 | |
|     queue.insert_at("foo3", now + ms(20));
 | |
|     let key4 = queue.insert_at("foo4", now + ms(20));
 | |
| 
 | |
|     sleep(ms(10)).await;
 | |
| 
 | |
|     let mut res = vec![];
 | |
|     while res.len() < 2 {
 | |
|         let entry = assert_ready_some!(poll!(queue));
 | |
|         res.push(entry.into_inner());
 | |
|     }
 | |
| 
 | |
|     // items corresponding to `foo3` and `foo4` should be assigned
 | |
|     // new indices
 | |
|     queue.compact();
 | |
| 
 | |
|     now = Instant::now();
 | |
| 
 | |
|     queue.insert_at("foo5", now + ms(10));
 | |
|     let key6 = queue.insert_at("foo6", now + ms(10));
 | |
| 
 | |
|     queue.reset_at(&key4, now + ms(20));
 | |
|     queue.reset_at(&key6, now + ms(20));
 | |
| 
 | |
|     // foo3 and foo5 will expire
 | |
|     sleep(ms(10)).await;
 | |
| 
 | |
|     while res.len() < 4 {
 | |
|         let entry = assert_ready_some!(poll!(queue));
 | |
|         res.push(entry.into_inner());
 | |
|     }
 | |
| 
 | |
|     sleep(ms(10)).await;
 | |
| 
 | |
|     while res.len() < 6 {
 | |
|         let entry = assert_ready_some!(poll!(queue));
 | |
|         res.push(entry.into_inner());
 | |
|     }
 | |
| 
 | |
|     let entry = assert_ready!(poll!(queue));
 | |
|     assert!(entry.is_none());
 | |
| }
 | |
| 
 | |
| #[tokio::test(start_paused = true)]
 | |
| async fn remove_after_compact() {
 | |
|     let now = Instant::now();
 | |
|     let mut queue = DelayQueue::new();
 | |
| 
 | |
|     let foo_key = queue.insert_at("foo", now + ms(10));
 | |
|     queue.insert_at("bar", now + ms(20));
 | |
|     queue.remove(&foo_key);
 | |
|     queue.compact();
 | |
| 
 | |
|     let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
 | |
|         queue.remove(&foo_key);
 | |
|     }));
 | |
|     assert!(panic.is_err());
 | |
| }
 | |
| 
 | |
| #[tokio::test(start_paused = true)]
 | |
| async fn remove_after_compact_poll() {
 | |
|     let now = Instant::now();
 | |
|     let mut queue = task::spawn(DelayQueue::new());
 | |
| 
 | |
|     let foo_key = queue.insert_at("foo", now + ms(10));
 | |
|     queue.insert_at("bar", now + ms(20));
 | |
| 
 | |
|     sleep(ms(10)).await;
 | |
|     assert_eq!(assert_ready_some!(poll!(queue)).key(), foo_key);
 | |
| 
 | |
|     queue.compact();
 | |
| 
 | |
|     let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
 | |
|         queue.remove(&foo_key);
 | |
|     }));
 | |
|     assert!(panic.is_err());
 | |
| }
 | |
| 
 | |
/// Shorthand used throughout these tests: builds a [`Duration`] spanning `n`
/// milliseconds.
fn ms(n: u64) -> Duration {
    Duration::from_millis(n)
}
 |