rtic/book/en/src/internals/tasks.md

12 KiB

Software tasks

RTIC supports software tasks and hardware tasks. Each hardware task is bound to a different interrupt handler. On the other hand, several software tasks may be dispatched by the same interrupt handler -- this is done to minimize the number of interrupts handlers used by the framework.

The framework groups spawn-able tasks by priority level and generates one task dispatcher per priority level. Each task dispatcher runs on a different interrupt handler and the priority of said interrupt handler is set to match the priority level of the tasks managed by the dispatcher.

Each task dispatcher keeps a queue of tasks which are ready to execute; this queue is referred to as the ready queue. Spawning a software task consists of adding an entry to this queue and pending the interrupt that runs the corresponding task dispatcher. Each entry in this queue contains a tag (enum) that identifies the task to execute and a pointer to the message passed to the task.

The ready queue is a SPSC (Single Producer Single Consumer) lock-free queue. The task dispatcher owns the consumer endpoint of the queue; the producer endpoint is treated as a resource contended by the tasks that can spawn other tasks.

The task dispatcher

Let's first take a look the code generated by the framework to dispatch tasks. Consider this example:

#[rtic::app(device = ..)]
mod app {
    // ..

    #[interrupt(binds = UART0, priority = 2, spawn = [bar, baz])]
    fn foo(c: foo::Context) {
        foo.spawn.bar().ok();

        foo.spawn.baz(42).ok();
    }

    #[task(capacity = 2, priority = 1)]
    fn bar(c: bar::Context) {
        // ..
    }

    #[task(capacity = 2, priority = 1, resources = [X])]
    fn baz(c: baz::Context, input: i32) {
        // ..
    }

    extern "C" {
        fn UART1();
    }
}

The framework produces the following task dispatcher which consists of an interrupt handler and a ready queue:

fn bar(c: bar::Context) {
    // .. user code ..
}

mod app {
    use heapless::spsc::Queue;
    use cortex_m::register::basepri;

    struct Ready<T> {
        task: T,
        // ..
    }

    /// `spawn`-able tasks that run at priority level `1`
    enum T1 {
        bar,
        baz,
    }

    // ready queue of the task dispatcher
    // `5-1=4` represents the capacity of this queue
    static mut RQ1: Queue<Ready<T1>, 5> = Queue::new();

    // interrupt handler chosen to dispatch tasks at priority `1`
    #[no_mangle]
    unsafe UART1() {
        // the priority of this interrupt handler
        const PRIORITY: u8 = 1;

        let snapshot = basepri::read();

        while let Some(ready) = RQ1.split().1.dequeue() {
            match ready.task {
                T1::bar => {
                    // **NOTE** simplified implementation

                    // used to track the dynamic priority
                    let priority = Cell::new(PRIORITY);

                    // call into user code
                    bar(bar::Context::new(&priority));
                }

                T1::baz => {
                    // we'll look at `baz` later
                }
            }
        }

        // BASEPRI invariant
        basepri::write(snapshot);
    }
}

Spawning a task

The spawn API is exposed to the user as the methods of a Spawn struct. There's one Spawn struct per task.

The Spawn code generated by the framework for the previous example looks like this:

mod foo {
    // ..

    pub struct Context<'a> {
        pub spawn: Spawn<'a>,
        // ..
    }

    pub struct Spawn<'a> {
        // tracks the dynamic priority of the task
        priority: &'a Cell<u8>,
    }

    impl<'a> Spawn<'a> {
        // `unsafe` and hidden because we don't want the user to tamper with it
        #[doc(hidden)]
        pub unsafe fn priority(&self) -> &Cell<u8> {
            self.priority
        }
    }
}

mod app {
    // ..

    // Priority ceiling for the producer endpoint of the `RQ1`
    const RQ1_CEILING: u8 = 2;

    // used to track how many more `bar` messages can be enqueued
    // `3-1=2` represents the capacity of this queue
    // this queue is filled by the framework before `init` runs
    static mut bar_FQ: Queue<(), 3> = Queue::new();

    // Priority ceiling for the consumer endpoint of `bar_FQ`
    const bar_FQ_CEILING: u8 = 2;

    // a priority-based critical section
    //
    // this run the given closure `f` at a dynamic priority of at least
    // `ceiling`
    fn lock(priority: &Cell<u8>, ceiling: u8, f: impl FnOnce()) {
        // ..
    }

    impl<'a> foo::Spawn<'a> {
        /// Spawns the `bar` task
        pub fn bar(&self) -> Result<(), ()> {
            unsafe {
                match lock(self.priority(), bar_FQ_CEILING, || {
                    bar_FQ.split().1.dequeue()
                }) {
                    Some(()) => {
                        lock(self.priority(), RQ1_CEILING, || {
                            // put the taks in the ready queue
                            RQ1.split().1.enqueue_unchecked(Ready {
                                task: T1::bar,
                                // ..
                            })
                        });

                        // pend the interrupt that runs the task dispatcher
                        rtic::pend(Interrupt::UART0);
                    }

                    None => {
                        // maximum capacity reached; spawn failed
                        Err(())
                    }
                }
            }
        }
    }
}

Using bar_FQ to limit the number of bar tasks that can be spawned may seem like an artificial limitation but it will make more sense when we talk about task capacities.

Messages

We have omitted how message passing actually works so let's revisit the spawn implementation but this time for task baz which receives a u64 message.

fn baz(c: baz::Context, input: u64) {
    // .. user code ..
}

mod app {
    // ..

    // Now we show the full contents of the `Ready` struct
    struct Ready {
        task: Task,
        // message index; used to index the `INPUTS` buffer
        index: u8,
    }

    // memory reserved to hold messages passed to `baz`
    static mut baz_INPUTS: [MaybeUninit<u64>; 2] =
        [MaybeUninit::uninit(), MaybeUninit::uninit()];

    // the free queue: used to track free slots in the `baz_INPUTS` array
    // this queue is initialized with values `0` and `1` before `init` is executed
    static mut baz_FQ: Queue<u8, 3> = Queue::new();

    // Priority ceiling for the consumer endpoint of `baz_FQ`
    const baz_FQ_CEILING: u8 = 2;

    impl<'a> foo::Spawn<'a> {
        /// Spawns the `baz` task
        pub fn baz(&self, message: u64) -> Result<(), u64> {
            unsafe {
                match lock(self.priority(), baz_FQ_CEILING, || {
                    baz_FQ.split().1.dequeue()
                }) {
                    Some(index) => {
                        // NOTE: `index` is an ownining pointer into this buffer
                        baz_INPUTS[index as usize].write(message);

                        lock(self.priority(), RQ1_CEILING, || {
                            // put the task in the ready queue
                            RQ1.split().1.enqueue_unchecked(Ready {
                                task: T1::baz,
                                index,
                            });
                        });

                        // pend the interrupt that runs the task dispatcher
                        rtic::pend(Interrupt::UART0);
                    }

                    None => {
                        // maximum capacity reached; spawn failed
                        Err(message)
                    }
                }
            }
        }
    }
}

And now let's look at the real implementation of the task dispatcher:

mod app {
    // ..

    #[no_mangle]
    unsafe UART1() {
        const PRIORITY: u8 = 1;

        let snapshot = basepri::read();

        while let Some(ready) = RQ1.split().1.dequeue() {
            match ready.task {
                Task::baz => {
                    // NOTE: `index` is an ownining pointer into this buffer
                    let input = baz_INPUTS[ready.index as usize].read();

                    // the message has been read out so we can return the slot
                    // back to the free queue
                    // (the task dispatcher has exclusive access to the producer
                    // endpoint of this queue)
                    baz_FQ.split().0.enqueue_unchecked(ready.index);

                    let priority = Cell::new(PRIORITY);
                    baz(baz::Context::new(&priority), input)
                }

                Task::bar => {
                    // looks just like the `baz` branch
                }

            }
        }

        // BASEPRI invariant
        basepri::write(snapshot);
    }
}

INPUTS plus FQ, the free queue, is effectively a memory pool. However, instead of using the usual free list (linked list) to track empty slots in the INPUTS buffer we use a SPSC queue; this lets us reduce the number of critical sections. In fact, thanks to this choice the task dispatching code is lock-free.

Queue capacity

The RTIC framework uses several queues like ready queues and free queues. When the free queue is empty trying to spawn a task results in an error; this condition is checked at runtime. Not all the operations performed by the framework on these queues check if the queue is empty / full. For example, returning an slot to the free queue (see the task dispatcher) is unchecked because there's a fixed number of such slots circulating in the system that's equal to the capacity of the free queue. Similarly, adding an entry to the ready queue (see Spawn) is unchecked because of the queue capacity chosen by the framework.

Users can specify the capacity of software tasks; this capacity is the maximum number of messages one can post to said task from a higher priority task before spawn returns an error. This user-specified capacity is the capacity of the free queue of the task (e.g. foo_FQ) and also the size of the array that holds the inputs to the task (e.g. foo_INPUTS).

The capacity of the ready queue (e.g. RQ1) is chosen to be the sum of the capacities of all the different tasks managed by the dispatcher; this sum is also the number of messages the queue will hold in the worst case scenario of all possible messages being posted before the task dispatcher gets a chance to run. For this reason, getting a slot from the free queue in any spawn operation implies that the ready queue is not yet full so inserting an entry into the ready queue can omit the "is it full?" check.

In our running example the task bar takes no input so we could have omitted both bar_INPUTS and bar_FQ and let the user post an unbounded number of messages to this task, but if we did that it would have not be possible to pick a capacity for RQ1 that lets us omit the "is it full?" check when spawning a baz task. In the section about the timer queue we'll see how the free queue is used by tasks that have no inputs.

Ceiling analysis

The queues internally used by the spawn API are treated like normal resources and included in the ceiling analysis. It's important to note that these are SPSC queues and that only one of the endpoints is behind a resource; the other endpoint is owned by a task dispatcher.

Consider the following example:

#[rtic::app(device = ..)]
mod app {
    #[idle(spawn = [foo, bar])]
    fn idle(c: idle::Context) -> ! {
        // ..
    }

    #[task]
    fn foo(c: foo::Context) {
        // ..
    }

    #[task]
    fn bar(c: bar::Context) {
        // ..
    }

    #[task(priority = 2, spawn = [foo])]
    fn baz(c: baz::Context) {
        // ..
    }

    #[task(priority = 3, spawn = [bar])]
    fn quux(c: quux::Context) {
        // ..
    }
}

This is how the ceiling analysis would go:

  • idle (prio = 0) and baz (prio = 2) contend for the consumer endpoint of foo_FQ; this leads to a priority ceiling of 2.

  • idle (prio = 0) and quux (prio = 3) contend for the consumer endpoint of bar_FQ; this leads to a priority ceiling of 3.

  • idle (prio = 0), baz (prio = 2) and quux (prio = 3) all contend for the producer endpoint of RQ1; this leads to a priority ceiling of 3