12 KiB
Software tasks
RTFM supports software tasks and hardware tasks. Each hardware task is bound to a different interrupt handler. On the other hand, several software tasks may be dispatched by the same interrupt handler -- this is done to minimize the number of interrupts handlers used by the framework.
The framework groups spawn
-able tasks by priority level and generates one
task dispatcher per priority level. Each task dispatcher runs on a different
interrupt handler and the priority of said interrupt handler is set to match the
priority level of the tasks managed by the dispatcher.
Each task dispatcher keeps a queue of tasks which are ready to execute; this
queue is referred to as the ready queue. Spawning a software task consists of
adding an entry to this queue and pending the interrupt that runs the
corresponding task dispatcher. Each entry in this queue contains a tag (enum
)
that identifies the task to execute and a pointer to the message passed to the
task.
The ready queue is a SPSC (Single Producer Single Consumer) lock-free queue. The
task dispatcher owns the consumer endpoint of the queue; the producer endpoint
is treated as a resource contended by the tasks that can spawn
other tasks.
The task dispatcher
Let's first take a look the code generated by the framework to dispatch tasks. Consider this example:
#[rtfm::app(device = ..)]
const APP: () = {
// ..
#[interrupt(binds = UART0, priority = 2, spawn = [bar, baz])]
fn foo(c: foo::Context) {
foo.spawn.bar().ok();
foo.spawn.baz(42).ok();
}
#[task(capacity = 2, priority = 1)]
fn bar(c: bar::Context) {
// ..
}
#[task(capacity = 2, priority = 1, resources = [X])]
fn baz(c: baz::Context, input: i32) {
// ..
}
extern "C" {
fn UART1();
}
};
The framework produces the following task dispatcher which consists of an interrupt handler and a ready queue:
fn bar(c: bar::Context) {
// .. user code ..
}
const APP: () = {
use heapless::spsc::Queue;
use cortex_m::register::basepri;
struct Ready<T> {
task: T,
// ..
}
/// `spawn`-able tasks that run at priority level `1`
enum T1 {
bar,
baz,
}
// ready queue of the task dispatcher
// `U4` is a type-level integer that represents the capacity of this queue
static mut RQ1: Queue<Ready<T1>, U4> = Queue::new();
// interrupt handler chosen to dispatch tasks at priority `1`
#[no_mangle]
unsafe UART1() {
// the priority of this interrupt handler
const PRIORITY: u8 = 1;
let snapshot = basepri::read();
while let Some(ready) = RQ1.split().1.dequeue() {
match ready.task {
T1::bar => {
// **NOTE** simplified implementation
// used to track the dynamic priority
let priority = Cell::new(PRIORITY);
// call into user code
bar(bar::Context::new(&priority));
}
T1::baz => {
// we'll look at `baz` later
}
}
}
// BASEPRI invariant
basepri::write(snapshot);
}
};
Spawning a task
The spawn
API is exposed to the user as the methods of a Spawn
struct.
There's one Spawn
struct per task.
The Spawn
code generated by the framework for the previous example looks like
this:
mod foo {
// ..
pub struct Context<'a> {
pub spawn: Spawn<'a>,
// ..
}
pub struct Spawn<'a> {
// tracks the dynamic priority of the task
priority: &'a Cell<u8>,
}
impl<'a> Spawn<'a> {
// `unsafe` and hidden because we don't want the user to tamper with it
#[doc(hidden)]
pub unsafe fn priority(&self) -> &Cell<u8> {
self.priority
}
}
}
const APP: () = {
// ..
// Priority ceiling for the producer endpoint of the `RQ1`
const RQ1_CEILING: u8 = 2;
// used to track how many more `bar` messages can be enqueued
// `U2` is the capacity of the `bar` task; a max of two instances can be queued
// this queue is filled by the framework before `init` runs
static mut bar_FQ: Queue<(), U2> = Queue::new();
// Priority ceiling for the consumer endpoint of `bar_FQ`
const bar_FQ_CEILING: u8 = 2;
// a priority-based critical section
//
// this run the given closure `f` at a dynamic priority of at least
// `ceiling`
fn lock(priority: &Cell<u8>, ceiling: u8, f: impl FnOnce()) {
// ..
}
impl<'a> foo::Spawn<'a> {
/// Spawns the `bar` task
pub fn bar(&self) -> Result<(), ()> {
unsafe {
match lock(self.priority(), bar_FQ_CEILING, || {
bar_FQ.split().1.dequeue()
}) {
Some(()) => {
lock(self.priority(), RQ1_CEILING, || {
// put the taks in the ready queue
RQ1.split().1.enqueue_unchecked(Ready {
task: T1::bar,
// ..
})
});
// pend the interrupt that runs the task dispatcher
rtfm::pend(Interrupt::UART0);
}
None => {
// maximum capacity reached; spawn failed
Err(())
}
}
}
}
}
};
Using bar_FQ
to limit the number of bar
tasks that can be spawned may seem
like an artificial limitation but it will make more sense when we talk about
task capacities.
Messages
We have omitted how message passing actually works so let's revisit the spawn
implementation but this time for task baz
which receives a u64
message.
fn baz(c: baz::Context, input: u64) {
// .. user code ..
}
const APP: () = {
// ..
// Now we show the full contents of the `Ready` struct
struct Ready {
task: Task,
// message index; used to index the `INPUTS` buffer
index: u8,
}
// memory reserved to hold messages passed to `baz`
static mut baz_INPUTS: [MaybeUninit<u64>; 2] =
[MaybeUninit::uninit(), MaybeUninit::uninit()];
// the free queue: used to track free slots in the `baz_INPUTS` array
// this queue is initialized with values `0` and `1` before `init` is executed
static mut baz_FQ: Queue<u8, U2> = Queue::new();
// Priority ceiling for the consumer endpoint of `baz_FQ`
const baz_FQ_CEILING: u8 = 2;
impl<'a> foo::Spawn<'a> {
/// Spawns the `baz` task
pub fn baz(&self, message: u64) -> Result<(), u64> {
unsafe {
match lock(self.priority(), baz_FQ_CEILING, || {
baz_FQ.split().1.dequeue()
}) {
Some(index) => {
// NOTE: `index` is an ownining pointer into this buffer
baz_INPUTS[index as usize].write(message);
lock(self.priority(), RQ1_CEILING, || {
// put the task in the ready queue
RQ1.split().1.enqueue_unchecked(Ready {
task: T1::baz,
index,
});
});
// pend the interrupt that runs the task dispatcher
rtfm::pend(Interrupt::UART0);
}
None => {
// maximum capacity reached; spawn failed
Err(message)
}
}
}
}
}
};
And now let's look at the real implementation of the task dispatcher:
const APP: () = {
// ..
#[no_mangle]
unsafe UART1() {
const PRIORITY: u8 = 1;
let snapshot = basepri::read();
while let Some(ready) = RQ1.split().1.dequeue() {
match ready.task {
Task::baz => {
// NOTE: `index` is an ownining pointer into this buffer
let input = baz_INPUTS[ready.index as usize].read();
// the message has been read out so we can return the slot
// back to the free queue
// (the task dispatcher has exclusive access to the producer
// endpoint of this queue)
baz_FQ.split().0.enqueue_unchecked(ready.index);
let priority = Cell::new(PRIORITY);
baz(baz::Context::new(&priority), input)
}
Task::bar => {
// looks just like the `baz` branch
}
}
}
// BASEPRI invariant
basepri::write(snapshot);
}
};
INPUTS
plus FQ
, the free queue, is effectively a memory pool. However,
instead of using the usual free list (linked list) to track empty slots
in the INPUTS
buffer we use a SPSC queue; this lets us reduce the number of
critical sections. In fact, thanks to this choice the task dispatching code is
lock-free.
Queue capacity
The RTFM framework uses several queues like ready queues and free queues. When
the free queue is empty trying to spawn
a task results in an error; this
condition is checked at runtime. Not all the operations performed by the
framework on these queues check if the queue is empty / full. For example,
returning an slot to the free queue (see the task dispatcher) is unchecked
because there's a fixed number of such slots circulating in the system that's
equal to the capacity of the free queue. Similarly, adding an entry to the ready
queue (see Spawn
) is unchecked because of the queue capacity chosen by the
framework.
Users can specify the capacity of software tasks; this capacity is the maximum
number of messages one can post to said task from a higher priority task before
spawn
returns an error. This user-specified capacity is the capacity of the
free queue of the task (e.g. foo_FQ
) and also the size of the array that holds
the inputs to the task (e.g. foo_INPUTS
).
The capacity of the ready queue (e.g. RQ1
) is chosen to be the sum of the
capacities of all the different tasks managed by the dispatcher; this sum is
also the number of messages the queue will hold in the worst case scenario of
all possible messages being posted before the task dispatcher gets a chance to
run. For this reason, getting a slot from the free queue in any spawn
operation implies that the ready queue is not yet full so inserting an entry
into the ready queue can omit the "is it full?" check.
In our running example the task bar
takes no input so we could have omitted
both bar_INPUTS
and bar_FQ
and let the user post an unbounded number of
messages to this task, but if we did that it would have not be possible to pick
a capacity for RQ1
that lets us omit the "is it full?" check when spawning a
baz
task. In the section about the timer queue we'll see
how the free queue is used by tasks that have no inputs.
Ceiling analysis
The queues internally used by the spawn
API are treated like normal resources
and included in the ceiling analysis. It's important to note that these are SPSC
queues and that only one of the endpoints is behind a resource; the other
endpoint is owned by a task dispatcher.
Consider the following example:
#[rtfm::app(device = ..)]
const APP: () = {
#[idle(spawn = [foo, bar])]
fn idle(c: idle::Context) -> ! {
// ..
}
#[task]
fn foo(c: foo::Context) {
// ..
}
#[task]
fn bar(c: bar::Context) {
// ..
}
#[task(priority = 2, spawn = [foo])]
fn baz(c: baz::Context) {
// ..
}
#[task(priority = 3, spawn = [bar])]
fn quux(c: quux::Context) {
// ..
}
};
This is how the ceiling analysis would go:
-
idle
(prio = 0) andbaz
(prio = 2) contend for the consumer endpoint offoo_FQ
; this leads to a priority ceiling of2
. -
idle
(prio = 0) andquux
(prio = 3) contend for the consumer endpoint ofbar_FQ
; this leads to a priority ceiling of3
. -
idle
(prio = 0),baz
(prio = 2) andquux
(prio = 3) all contend for the producer endpoint ofRQ1
; this leads to a priority ceiling of3