Added intrusive linked list for the waker queue

This commit is contained in:
Emil Fresk 2022-06-20 14:54:17 +02:00
parent dd563e3cee
commit 27b8aca673
6 changed files with 540 additions and 118 deletions

View file

@ -60,7 +60,6 @@ pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2
#[doc = #doc] #[doc = #doc]
#[allow(non_snake_case)] #[allow(non_snake_case)]
pub mod #m { pub mod #m {
/// Read the current time from this monotonic /// Read the current time from this monotonic
pub fn now() -> <super::super::#m as rtic::Monotonic>::Instant { pub fn now() -> <super::super::#m as rtic::Monotonic>::Instant {
rtic::export::interrupt::free(|_| { rtic::export::interrupt::free(|_| {
@ -73,39 +72,13 @@ pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2
}) })
} }
fn enqueue_waker(
instant: <super::super::#m as rtic::Monotonic>::Instant,
waker: core::task::Waker
) -> Result<u32, ()> {
unsafe {
rtic::export::interrupt::free(|_| {
let marker = super::super::#tq_marker.get().read();
super::super::#tq_marker.get_mut().write(marker.wrapping_add(1));
let nr = rtic::export::WakerNotReady {
waker,
instant,
marker,
};
let tq = &mut *super::super::#tq.get_mut();
tq.enqueue_waker(
nr,
|| #enable_interrupt,
|| #pend,
(&mut *super::super::#m_ident.get_mut()).as_mut()).map(|_| marker)
})
}
}
/// Delay /// Delay
#[inline(always)] #[inline(always)]
#[allow(non_snake_case)] #[allow(non_snake_case)]
pub fn delay(duration: <super::super::#m as rtic::Monotonic>::Duration) pub fn delay(duration: <super::super::#m as rtic::Monotonic>::Duration)
-> DelayFuture { -> DelayFuture {
let until = now() + duration; let until = now() + duration;
DelayFuture { until, tq_marker: None } DelayFuture { until, waker_storage: None }
} }
/// Delay future. /// Delay future.
@ -113,11 +86,22 @@ pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
pub struct DelayFuture { pub struct DelayFuture {
until: <super::super::#m as rtic::Monotonic>::Instant, until: <super::super::#m as rtic::Monotonic>::Instant,
tq_marker: Option<u32>, waker_storage: Option<rtic::export::IntrusiveNode<rtic::export::WakerNotReady<super::super::#m>>>,
}
impl Drop for DelayFuture {
fn drop(&mut self) {
if let Some(waker_storage) = &mut self.waker_storage {
rtic::export::interrupt::free(|_| unsafe {
let tq = &mut *super::super::#tq.get_mut();
tq.cancel_waker_marker(waker_storage.val.marker);
});
}
}
} }
impl core::future::Future for DelayFuture { impl core::future::Future for DelayFuture {
type Output = Result<(), ()>; type Output = ();
fn poll( fn poll(
mut self: core::pin::Pin<&mut Self>, mut self: core::pin::Pin<&mut Self>,
@ -125,22 +109,33 @@ pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2
) -> core::task::Poll<Self::Output> { ) -> core::task::Poll<Self::Output> {
let mut s = self.as_mut(); let mut s = self.as_mut();
let now = now(); let now = now();
let until = s.until;
let is_ws_none = s.waker_storage.is_none();
if now >= s.until { if now >= until {
core::task::Poll::Ready(Ok(())) return core::task::Poll::Ready(());
} else { } else if is_ws_none {
if s.tq_marker.is_some() { rtic::export::interrupt::free(|_| unsafe {
core::task::Poll::Pending let marker = super::super::#tq_marker.get().read();
} else { super::super::#tq_marker.get_mut().write(marker.wrapping_add(1));
match enqueue_waker(s.until, cx.waker().clone()) {
Ok(marker) => { let nr = s.waker_storage.insert(rtic::export::IntrusiveNode::new(rtic::export::WakerNotReady {
s.tq_marker = Some(marker); waker: cx.waker().clone(),
core::task::Poll::Pending instant: until,
}, marker,
Err(()) => core::task::Poll::Ready(Err(())), }));
}
} let tq = &mut *super::super::#tq.get_mut();
tq.enqueue_waker(
core::mem::transmute(nr), // Transmute the reference to static
|| #enable_interrupt,
|| #pend,
(&mut *super::super::#m_ident.get_mut()).as_mut());
});
} }
core::task::Poll::Pending
} }
} }
@ -150,7 +145,18 @@ pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2
pub struct TimeoutFuture<F: core::future::Future> { pub struct TimeoutFuture<F: core::future::Future> {
future: F, future: F,
until: <super::super::#m as rtic::Monotonic>::Instant, until: <super::super::#m as rtic::Monotonic>::Instant,
tq_marker: Option<u32>, waker_storage: Option<rtic::export::IntrusiveNode<rtic::export::WakerNotReady<super::super::#m>>>,
}
impl<F: core::future::Future> Drop for TimeoutFuture<F> {
fn drop(&mut self) {
if let Some(waker_storage) = &mut self.waker_storage {
rtic::export::interrupt::free(|_| unsafe {
let tq = &mut *super::super::#tq.get_mut();
tq.cancel_waker_marker(waker_storage.val.marker);
});
}
}
} }
/// Timeout after /// Timeout after
@ -164,7 +170,7 @@ pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2
TimeoutFuture { TimeoutFuture {
future, future,
until, until,
tq_marker: None, waker_storage: None,
} }
} }
@ -178,7 +184,7 @@ pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2
TimeoutFuture { TimeoutFuture {
future, future,
until: instant, until: instant,
tq_marker: None, waker_storage: None,
} }
} }
@ -186,46 +192,58 @@ pub fn codegen(app: &App, _analysis: &Analysis, _extra: &Extra) -> TokenStream2
where where
F: core::future::Future, F: core::future::Future,
{ {
type Output = Result<Result<F::Output, super::TimeoutError>, ()>; type Output = Result<F::Output, super::TimeoutError>;
fn poll( fn poll(
self: core::pin::Pin<&mut Self>, self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_> cx: &mut core::task::Context<'_>
) -> core::task::Poll<Self::Output> { ) -> core::task::Poll<Self::Output> {
let now = now();
// SAFETY: We don't move the underlying pinned value. // SAFETY: We don't move the underlying pinned value.
let mut s = unsafe { self.get_unchecked_mut() }; let mut s = unsafe { self.get_unchecked_mut() };
let future = unsafe { core::pin::Pin::new_unchecked(&mut s.future) }; let future = unsafe { core::pin::Pin::new_unchecked(&mut s.future) };
let now = now();
let until = s.until;
let is_ws_none = s.waker_storage.is_none();
match future.poll(cx) { match future.poll(cx) {
core::task::Poll::Ready(r) => { core::task::Poll::Ready(r) => {
if let Some(marker) = s.tq_marker { if let Some(waker_storage) = &mut s.waker_storage {
rtic::export::interrupt::free(|_| unsafe { rtic::export::interrupt::free(|_| unsafe {
let tq = &mut *super::super::#tq.get_mut(); let tq = &mut *super::super::#tq.get_mut();
tq.cancel_waker_marker(marker); tq.cancel_waker_marker(waker_storage.val.marker);
}); });
} }
core::task::Poll::Ready(Ok(Ok(r))) return core::task::Poll::Ready(Ok(r));
} }
core::task::Poll::Pending => { core::task::Poll::Pending => {
if now >= s.until { if now >= until {
// Timeout // Timeout
core::task::Poll::Ready(Ok(Err(super::TimeoutError))) return core::task::Poll::Ready(Err(super::TimeoutError));
} else if s.tq_marker.is_none() { } else if is_ws_none {
match enqueue_waker(s.until, cx.waker().clone()) { rtic::export::interrupt::free(|_| unsafe {
Ok(marker) => { let marker = super::super::#tq_marker.get().read();
s.tq_marker = Some(marker); super::super::#tq_marker.get_mut().write(marker.wrapping_add(1));
core::task::Poll::Pending
}, let nr = s.waker_storage.insert(rtic::export::IntrusiveNode::new(rtic::export::WakerNotReady {
Err(()) => core::task::Poll::Ready(Err(())), // TQ full waker: cx.waker().clone(),
} instant: until,
} else { marker,
core::task::Poll::Pending }));
let tq = &mut *super::super::#tq.get_mut();
tq.enqueue_waker(
core::mem::transmute(nr), // Transmute the reference to static
|| #enable_interrupt,
|| #pend,
(&mut *super::super::#m_ident.get_mut()).as_mut());
});
} }
} }
} }
core::task::Poll::Pending
} }
} }
} }

View file

@ -67,13 +67,7 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
.map(|(_name, task)| task.args.capacity as usize) .map(|(_name, task)| task.args.capacity as usize)
.sum(); .sum();
let n_task = util::capacity_literal(cap); let n_task = util::capacity_literal(cap);
let n_worker: usize = app let tq_ty = quote!(rtic::export::TimerQueue<#mono_type, #t, #n_task>);
.software_tasks
.iter()
.map(|(_name, task)| task.is_async as usize)
.sum();
let n_worker = util::capacity_literal(n_worker);
let tq_ty = quote!(rtic::export::TimerQueue<#mono_type, #t, #n_task, #n_worker>);
// For future use // For future use
// let doc = format!(" RTIC internal: {}:{}", file!(), line!()); // let doc = format!(" RTIC internal: {}:{}", file!(), line!());
@ -84,7 +78,7 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
static #tq: rtic::RacyCell<#tq_ty> = rtic::RacyCell::new( static #tq: rtic::RacyCell<#tq_ty> = rtic::RacyCell::new(
rtic::export::TimerQueue { rtic::export::TimerQueue {
task_queue: rtic::export::SortedLinkedList::new_u16(), task_queue: rtic::export::SortedLinkedList::new_u16(),
waker_queue: rtic::export::SortedLinkedList::new_u16(), waker_queue: rtic::export::IntrusiveSortedLinkedList::new(),
} }
); );
)); ));
@ -148,7 +142,7 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
#[no_mangle] #[no_mangle]
#[allow(non_snake_case)] #[allow(non_snake_case)]
unsafe fn #bound_interrupt() { unsafe fn #bound_interrupt() {
while let Some(task_or_waker) = rtic::export::interrupt::free(|_| while let Some((task, index)) = rtic::export::interrupt::free(|_|
if let Some(mono) = (&mut *#m_ident.get_mut()).as_mut() { if let Some(mono) = (&mut *#m_ident.get_mut()).as_mut() {
(&mut *#tq.get_mut()).dequeue(|| #disable_isr, mono) (&mut *#tq.get_mut()).dequeue(|| #disable_isr, mono)
} else { } else {
@ -157,13 +151,8 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
core::hint::unreachable_unchecked() core::hint::unreachable_unchecked()
}) })
{ {
match task_or_waker { match task {
rtic::export::TaskOrWaker::Waker(waker) => waker.wake(), #(#arms)*
rtic::export::TaskOrWaker::Task((task, index)) => {
match task {
#(#arms)*
}
}
} }
} }

View file

@ -1,11 +1,13 @@
#![allow(clippy::inline_always)] #![allow(clippy::inline_always)]
pub use crate::{
sll::{IntrusiveSortedLinkedList, Node as IntrusiveNode},
tq::{TaskNotReady, TimerQueue, WakerNotReady},
};
pub use bare_metal::CriticalSection;
use core::{ use core::{
cell::Cell, cell::Cell,
sync::atomic::{AtomicBool, Ordering}, sync::atomic::{AtomicBool, Ordering},
}; };
pub use crate::tq::{TaskNotReady, TaskOrWaker, TimerQueue, WakerNotReady};
pub use bare_metal::CriticalSection;
pub use cortex_m::{ pub use cortex_m::{
asm::nop, asm::nop,
asm::wfi, asm::wfi,

View file

@ -52,6 +52,8 @@ pub mod mutex {
#[doc(hidden)] #[doc(hidden)]
pub mod export; pub mod export;
#[doc(hidden)] #[doc(hidden)]
pub mod sll;
#[doc(hidden)]
mod tq; mod tq;
/// Sets the given `interrupt` as pending /// Sets the given `interrupt` as pending

421
src/sll.rs Normal file
View file

@ -0,0 +1,421 @@
//! An intrusive sorted priority linked list, designed for use in `Future`s in RTIC.
use core::cmp::Ordering;
use core::fmt;
use core::marker::PhantomData;
use core::ops::{Deref, DerefMut};
use core::ptr::NonNull;
/// Marker for Min sorted [`IntrusiveSortedLinkedList`].
pub struct Min;
/// Marker for Max sorted [`IntrusiveSortedLinkedList`].
pub struct Max;
/// The linked list kind: min-list or max-list
pub trait Kind: private::Sealed {
#[doc(hidden)]
fn ordering() -> Ordering;
}
impl Kind for Min {
fn ordering() -> Ordering {
Ordering::Less
}
}
impl Kind for Max {
fn ordering() -> Ordering {
Ordering::Greater
}
}
/// Sealed traits
mod private {
pub trait Sealed {}
}
impl private::Sealed for Max {}
impl private::Sealed for Min {}
/// A node in the [`IntrusiveSortedLinkedList`].
pub struct Node<T> {
pub val: T,
next: Option<NonNull<Node<T>>>,
}
impl<T> Node<T> {
pub fn new(val: T) -> Self {
Self { val, next: None }
}
}
/// The linked list.
pub struct IntrusiveSortedLinkedList<'a, T, K> {
head: Option<NonNull<Node<T>>>,
_kind: PhantomData<K>,
_lt: PhantomData<&'a ()>,
}
impl<'a, T, K> fmt::Debug for IntrusiveSortedLinkedList<'a, T, K>
where
T: Ord + core::fmt::Debug,
K: Kind,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut l = f.debug_list();
let mut current = self.head;
while let Some(head) = current {
let head = unsafe { head.as_ref() };
current = head.next;
l.entry(&head.val);
}
l.finish()
}
}
impl<'a, T, K> IntrusiveSortedLinkedList<'a, T, K>
where
T: Ord,
K: Kind,
{
pub const fn new() -> Self {
Self {
head: None,
_kind: PhantomData,
_lt: PhantomData,
}
}
// Push to the list.
pub fn push(&mut self, new: &'a mut Node<T>) {
unsafe {
if let Some(head) = self.head {
if head.as_ref().val.cmp(&new.val) != K::ordering() {
// This is newer than head, replace head
new.next = self.head;
self.head = Some(NonNull::new_unchecked(new));
} else {
// It's not head, search the list for the correct placement
let mut current = head;
while let Some(next) = current.as_ref().next {
if next.as_ref().val.cmp(&new.val) != K::ordering() {
break;
}
current = next;
}
new.next = current.as_ref().next;
current.as_mut().next = Some(NonNull::new_unchecked(new));
}
} else {
// List is empty, place at head
self.head = Some(NonNull::new_unchecked(new))
}
}
}
/// Get an iterator over the sorted list.
pub fn iter(&self) -> Iter<'_, T, K> {
Iter {
_list: self,
index: self.head,
}
}
/// Find an element in the list that can be changed and resorted.
pub fn find_mut<F>(&mut self, mut f: F) -> Option<FindMut<'_, 'a, T, K>>
where
F: FnMut(&T) -> bool,
{
let head = self.head?;
// Special-case, first element
if f(&unsafe { head.as_ref() }.val) {
return Some(FindMut {
is_head: true,
prev_index: None,
index: self.head,
list: self,
maybe_changed: false,
});
}
let mut current = head;
while let Some(next) = unsafe { current.as_ref() }.next {
if f(&unsafe { next.as_ref() }.val) {
return Some(FindMut {
is_head: false,
prev_index: Some(current),
index: Some(next),
list: self,
maybe_changed: false,
});
}
current = next;
}
None
}
/// Peek at the first element.
pub fn peek(&self) -> Option<&T> {
self.head.map(|head| unsafe { &head.as_ref().val })
}
/// Pops the first element in the list.
///
/// Complexity is worst-case `O(1)`.
pub fn pop(&mut self) -> Option<&'a Node<T>> {
if let Some(head) = self.head {
let v = unsafe { head.as_ref() };
self.head = v.next;
Some(v)
} else {
None
}
}
/// Checks if the linked list is empty.
#[inline]
pub fn is_empty(&self) -> bool {
self.head.is_none()
}
}
/// Iterator for the linked list.
pub struct Iter<'a, T, K>
where
T: Ord,
K: Kind,
{
_list: &'a IntrusiveSortedLinkedList<'a, T, K>,
index: Option<NonNull<Node<T>>>,
}
impl<'a, T, K> Iterator for Iter<'a, T, K>
where
T: Ord,
K: Kind,
{
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
let index = self.index?;
let node = unsafe { index.as_ref() };
self.index = node.next;
Some(&node.val)
}
}
/// Comes from [`IntrusiveSortedLinkedList::find_mut`].
pub struct FindMut<'a, 'b, T, K>
where
T: Ord + 'b,
K: Kind,
{
list: &'a mut IntrusiveSortedLinkedList<'b, T, K>,
is_head: bool,
prev_index: Option<NonNull<Node<T>>>,
index: Option<NonNull<Node<T>>>,
maybe_changed: bool,
}
impl<'a, 'b, T, K> FindMut<'a, 'b, T, K>
where
T: Ord,
K: Kind,
{
unsafe fn pop_internal(&mut self) -> &'b mut Node<T> {
if self.is_head {
// If it is the head element, we can do a normal pop
let mut head = self.list.head.unwrap_unchecked();
let v = head.as_mut();
self.list.head = v.next;
v
} else {
// Somewhere in the list
let mut prev = self.prev_index.unwrap_unchecked();
let mut curr = self.index.unwrap_unchecked();
// Re-point the previous index
prev.as_mut().next = curr.as_ref().next;
curr.as_mut()
}
}
/// This will pop the element from the list.
///
/// Complexity is worst-case `O(1)`.
#[inline]
pub fn pop(mut self) -> &'b mut Node<T> {
unsafe { self.pop_internal() }
}
/// This will resort the element into the correct position in the list if needed. The resorting
/// will only happen if the element has been accessed mutably.
///
/// Same as calling `drop`.
///
/// Complexity is worst-case `O(N)`.
#[inline]
pub fn finish(self) {
drop(self)
}
}
impl<'b, T, K> Drop for FindMut<'_, 'b, T, K>
where
T: Ord + 'b,
K: Kind,
{
fn drop(&mut self) {
// Only resort the list if the element has changed
if self.maybe_changed {
unsafe {
let val = self.pop_internal();
self.list.push(val);
}
}
}
}
impl<T, K> Deref for FindMut<'_, '_, T, K>
where
T: Ord,
K: Kind,
{
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &self.index.unwrap_unchecked().as_ref().val }
}
}
impl<T, K> DerefMut for FindMut<'_, '_, T, K>
where
T: Ord,
K: Kind,
{
fn deref_mut(&mut self) -> &mut Self::Target {
self.maybe_changed = true;
unsafe { &mut self.index.unwrap_unchecked().as_mut().val }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn const_new() {
static mut _V1: IntrusiveSortedLinkedList<u32, Max> = IntrusiveSortedLinkedList::new();
}
#[test]
fn test_peek() {
let mut ll: IntrusiveSortedLinkedList<u32, Max> = IntrusiveSortedLinkedList::new();
let mut a = Node { val: 1, next: None };
ll.push(&mut a);
assert_eq!(ll.peek().unwrap(), &1);
let mut a = Node { val: 2, next: None };
ll.push(&mut a);
assert_eq!(ll.peek().unwrap(), &2);
let mut a = Node { val: 3, next: None };
ll.push(&mut a);
assert_eq!(ll.peek().unwrap(), &3);
let mut ll: IntrusiveSortedLinkedList<u32, Min> = IntrusiveSortedLinkedList::new();
let mut a = Node { val: 2, next: None };
ll.push(&mut a);
assert_eq!(ll.peek().unwrap(), &2);
let mut a = Node { val: 1, next: None };
ll.push(&mut a);
assert_eq!(ll.peek().unwrap(), &1);
let mut a = Node { val: 3, next: None };
ll.push(&mut a);
assert_eq!(ll.peek().unwrap(), &1);
}
#[test]
fn test_empty() {
let ll: IntrusiveSortedLinkedList<u32, Max> = IntrusiveSortedLinkedList::new();
assert!(ll.is_empty())
}
#[test]
fn test_updating() {
let mut ll: IntrusiveSortedLinkedList<u32, Max> = IntrusiveSortedLinkedList::new();
let mut a = Node { val: 1, next: None };
ll.push(&mut a);
let mut a = Node { val: 2, next: None };
ll.push(&mut a);
let mut a = Node { val: 3, next: None };
ll.push(&mut a);
let mut find = ll.find_mut(|v| *v == 2).unwrap();
*find += 1000;
find.finish();
assert_eq!(ll.peek().unwrap(), &1002);
let mut find = ll.find_mut(|v| *v == 3).unwrap();
*find += 1000;
find.finish();
assert_eq!(ll.peek().unwrap(), &1003);
// Remove largest element
ll.find_mut(|v| *v == 1003).unwrap().pop();
assert_eq!(ll.peek().unwrap(), &1002);
}
#[test]
fn test_updating_1() {
let mut ll: IntrusiveSortedLinkedList<u32, Max> = IntrusiveSortedLinkedList::new();
let mut a = Node { val: 1, next: None };
ll.push(&mut a);
let v = ll.pop().unwrap();
assert_eq!(v.val, 1);
}
#[test]
fn test_updating_2() {
let mut ll: IntrusiveSortedLinkedList<u32, Max> = IntrusiveSortedLinkedList::new();
let mut a = Node { val: 1, next: None };
ll.push(&mut a);
let mut find = ll.find_mut(|v| *v == 1).unwrap();
*find += 1000;
find.finish();
assert_eq!(ll.peek().unwrap(), &1001);
}
}

View file

@ -1,20 +1,23 @@
use crate::Monotonic; use crate::{
sll::{IntrusiveSortedLinkedList, Min as IsslMin, Node as IntrusiveNode},
Monotonic,
};
use core::cmp::Ordering; use core::cmp::Ordering;
use core::task::Waker; use core::task::Waker;
use heapless::sorted_linked_list::{LinkedIndexU16, Min, SortedLinkedList}; use heapless::sorted_linked_list::{LinkedIndexU16, Min as SllMin, SortedLinkedList};
pub struct TimerQueue<Mono, Task, const N_TASK: usize, const N_WAKER: usize> pub struct TimerQueue<'a, Mono, Task, const N_TASK: usize>
where where
Mono: Monotonic, Mono: Monotonic,
Task: Copy, Task: Copy,
{ {
pub task_queue: SortedLinkedList<TaskNotReady<Mono, Task>, LinkedIndexU16, Min, N_TASK>, pub task_queue: SortedLinkedList<TaskNotReady<Mono, Task>, LinkedIndexU16, SllMin, N_TASK>,
pub waker_queue: SortedLinkedList<WakerNotReady<Mono>, LinkedIndexU16, Min, N_WAKER>, pub waker_queue: IntrusiveSortedLinkedList<'a, WakerNotReady<Mono>, IsslMin>,
} }
impl<Mono, Task, const N_TASK: usize, const N_WAKER: usize> TimerQueue<Mono, Task, N_TASK, N_WAKER> impl<'a, Mono, Task, const N_TASK: usize> TimerQueue<'a, Mono, Task, N_TASK>
where where
Mono: Monotonic, Mono: Monotonic + 'a,
Task: Copy, Task: Copy,
{ {
fn check_if_enable<F1, F2>( fn check_if_enable<F1, F2>(
@ -70,17 +73,16 @@ where
#[inline] #[inline]
pub fn enqueue_waker<F1, F2>( pub fn enqueue_waker<F1, F2>(
&mut self, &mut self,
nr: WakerNotReady<Mono>, nr: &'a mut IntrusiveNode<WakerNotReady<Mono>>,
enable_interrupt: F1, enable_interrupt: F1,
pend_handler: F2, pend_handler: F2,
mono: Option<&mut Mono>, mono: Option<&mut Mono>,
) -> Result<(), ()> ) where
where
F1: FnOnce(), F1: FnOnce(),
F2: FnOnce(), F2: FnOnce(),
{ {
self.check_if_enable(nr.instant, enable_interrupt, pend_handler, mono); self.check_if_enable(nr.val.instant, enable_interrupt, pend_handler, mono);
self.waker_queue.push(nr).map_err(|_| ()) self.waker_queue.push(nr);
} }
/// Check if all the timer queue is empty. /// Check if all the timer queue is empty.
@ -133,12 +135,12 @@ where
&mut self, &mut self,
instant: Mono::Instant, instant: Mono::Instant,
mono: &mut Mono, mono: &mut Mono,
) -> Option<TaskOrWaker<Task>> { ) -> Option<(Task, u8)> {
let now = mono.now(); let now = mono.now();
if instant <= now { if instant <= now {
// task became ready // task became ready
let nr = unsafe { self.task_queue.pop_unchecked() }; let nr = unsafe { self.task_queue.pop_unchecked() };
Some(TaskOrWaker::Task((nr.task, nr.index))) Some((nr.task, nr.index))
} else { } else {
// Set compare // Set compare
mono.set_compare(instant); mono.set_compare(instant);
@ -149,23 +151,18 @@ where
// guard against this. // guard against this.
if instant <= now { if instant <= now {
let nr = unsafe { self.task_queue.pop_unchecked() }; let nr = unsafe { self.task_queue.pop_unchecked() };
Some(TaskOrWaker::Task((nr.task, nr.index))) Some((nr.task, nr.index))
} else { } else {
None None
} }
} }
} }
fn dequeue_waker_queue( fn dequeue_waker_queue(&mut self, instant: Mono::Instant, mono: &mut Mono) {
&mut self,
instant: Mono::Instant,
mono: &mut Mono,
) -> Option<TaskOrWaker<Task>> {
let now = mono.now(); let now = mono.now();
if instant <= now { if instant <= now {
// task became ready // Task became ready, wake the waker
let nr = unsafe { self.waker_queue.pop_unchecked() }; self.waker_queue.pop().map(|v| v.val.waker.wake_by_ref());
Some(TaskOrWaker::Waker(nr.waker))
} else { } else {
// Set compare // Set compare
mono.set_compare(instant); mono.set_compare(instant);
@ -175,16 +172,13 @@ where
// read of now to the set of the compare, the time can overflow. This is to // read of now to the set of the compare, the time can overflow. This is to
// guard against this. // guard against this.
if instant <= now { if instant <= now {
let nr = unsafe { self.waker_queue.pop_unchecked() }; self.waker_queue.pop().map(|v| v.val.waker.wake_by_ref());
Some(TaskOrWaker::Waker(nr.waker))
} else {
None
} }
} }
} }
/// Dequeue a task from the ``TimerQueue`` /// Dequeue a task from the ``TimerQueue``
pub fn dequeue<F>(&mut self, disable_interrupt: F, mono: &mut Mono) -> Option<TaskOrWaker<Task>> pub fn dequeue<F>(&mut self, disable_interrupt: F, mono: &mut Mono) -> Option<(Task, u8)>
where where
F: FnOnce(), F: FnOnce(),
{ {
@ -228,16 +222,12 @@ where
if dequeue_task { if dequeue_task {
self.dequeue_task_queue(instant, mono) self.dequeue_task_queue(instant, mono)
} else { } else {
self.dequeue_waker_queue(instant, mono) self.dequeue_waker_queue(instant, mono);
None
} }
} }
} }
pub enum TaskOrWaker<Task> {
Task((Task, u8)),
Waker(Waker),
}
pub struct TaskNotReady<Mono, Task> pub struct TaskNotReady<Mono, Task>
where where
Task: Copy, Task: Copy,