18 KiB
Очередь таймера
Функциональность очередь таймера позволяет пользователю планировать задачи на запуск в опреленное время в будущем. Неудивительно, что эта функция также реализуется с помощью очереди: очередь приоритетов, где запланированные задачи сортируются в порядке аозрастания времени. Эта функция требует таймер, способный устанавливать прерывания истечения времени. Таймер используется для пуска прерывания, когда настает запланированное время задачи; в этот момент задача удаляется из очереди таймера и помещается в очередь готовности.
Давайте посмотрим, как это реализовано в коде. Рассмотрим следующую программу:
#[rtic::app(device = ..)]
mod app {
// ..
#[task(capacity = 2, schedule = [foo])]
fn foo(c: foo::Context, x: u32) {
// запланировать задачу на повторный запуск через 1 млн. тактов
c.schedule.foo(c.scheduled + Duration::cycles(1_000_000), x + 1).ok();
}
extern "C" {
fn UART0();
}
}
schedule
Давайте сначала взглянем на интерфейс schedule
.
mod foo {
pub struct Schedule<'a> {
priority: &'a Cell<u8>,
}
impl<'a> Schedule<'a> {
// `unsafe` и спрятано, потому что мы не хотим, чтобы пользовать сюда вмешивался
#[doc(hidden)]
pub unsafe fn priority(&self) -> &Cell<u8> {
self.priority
}
}
}
mod app {
type Instant = <path::to::user::monotonic::timer as rtic::Monotonic>::Instant;
// все задачи, которые могут быть запланированы (`schedule`)
enum T {
foo,
}
struct NotReady {
index: u8,
instant: Instant,
task: T,
}
// Очередь таймера - двоичная куча (min-heap) задач `NotReady`
static mut TQ: TimerQueue<U2> = ..;
const TQ_CEILING: u8 = 1;
static mut foo_FQ: Queue<u8, U2> = Queue::new();
const foo_FQ_CEILING: u8 = 1;
static mut foo_INPUTS: [MaybeUninit<u32>; 2] =
[MaybeUninit::uninit(), MaybeUninit::uninit()];
static mut foo_INSTANTS: [MaybeUninit<Instant>; 2] =
[MaybeUninit::uninit(), MaybeUninit::uninit()];
impl<'a> foo::Schedule<'a> {
fn foo(&self, instant: Instant, input: u32) -> Result<(), u32> {
unsafe {
let priority = self.priority();
if let Some(index) = lock(priority, foo_FQ_CEILING, || {
foo_FQ.split().1.dequeue()
}) {
// `index` - владеющий укачатель на ячейки в этих буферах
foo_INSTANTS[index as usize].write(instant);
foo_INPUTS[index as usize].write(input);
let nr = NotReady {
index,
instant,
task: T::foo,
};
lock(priority, TQ_CEILING, || {
TQ.enqueue_unchecked(nr);
});
} else {
// Не осталось места, чтобы разместить входные данные / instant
Err(input)
}
}
}
}
}
Это очень похоже на реализацию Spawn
. На самом деле одни и те же буфер
INPUTS
и список сободной памяти (FQ
) используются совместно интерфейсами
spawn
и schedule
. Главное отличие между ними в том, что schedule
также
размещает Instant
, момент на который задача запланирована на запуск,
в отдельном буфере (foo_INSTANTS
в нашем случае).
TimerQueue::enqueue_unchecked
делает немного больше работы, чем
просто добавление записи в min-heap: он также вызывает прерывание
системного таймера (SysTick
), если новая запись оказывается первой в очереди.
Системный таймер
Прерывание системного таймера (SysTick
) заботится о двух вещах:
передаче задач, которых становятся готовыми из очереди таймера в очередь готовности
и установке прерывания истечения времени, когда наступит запланированное
время следующей задачи.
Давайте посмотрим на соответствующий код.
mod app {
#[no_mangle]
fn SysTick() {
const PRIORITY: u8 = 1;
let priority = &Cell::new(PRIORITY);
while let Some(ready) = lock(priority, TQ_CEILING, || TQ.dequeue()) {
match ready.task {
T::foo => {
// переместить эту задачу в очередь готовности `RQ1`
lock(priority, RQ1_CEILING, || {
RQ1.split().0.enqueue_unchecked(Ready {
task: T1::foo,
index: ready.index,
})
});
// вызвать диспетчер задач
rtic::pend(Interrupt::UART0);
}
}
}
}
}
Выглядит похоже на диспетчер задач, за исключением того, что вместо запуска готовой задачи, она лишь переносится в очередь готовности, что ведет к ее запуску с нужным приоритетом.
TimerQueue::dequeue
установит новое прерывание истечения времени, если вернет
None
. Он сязан с TimerQueue::enqueue_unchecked
, который вызывает это
прерывание; на самом деле, enqueue_unchecked
передает задачу установки
нового прерывание истечения времени обработчику SysTick
.
Точность и диапазон cyccnt::Instant
и cyccnt::Duration
RTIC предоставляет реализацию Monotonic
, основанную на счетчике тактов DWT
(Data Watchpoint and Trace). Instant::now
возвращает снимок таймера; эти снимки
DWT (Instant
ы) используются для сортировки записей в очереди таймера.
Счетчик тактов - 32-битный счетчик, работающий на частоте ядра.
Этот счетчик обнуляется каждые (1 << 32)
тактов; у нас нет прерывания,
ассоциированног с этим счетчиком, поэтому ничего ужасного не случится,
когда он пройдет оборот.
Чтобы упорядочить Instant
ы в очереди, нам нужно сравнить 32-битные целые.
Чтобы учесть обороты, мы используем разницу между двумя Instant
ами, a - b
,
и рассматриваем результат как 32-битное знаковое целое.
Если результат меньше нуля, значит b
более поздний Instant
;
если результат больше нуля, значит b
более ранний Instant
.
Это значит, что планирование задачи на Instant
, который на (1 << 31) - 1
тактов
больше, чем запланированное время (Instant
) первой (самой ранней) записи
в очереди приведет к тому, что задача будет помещена в неправильное
место в очереди. У нас есть несколько debug assertions в коде, чтобы
предотвратить эту пользовательскую ошибку, но этого нельзя избежать,
поскольку пользователь может написать
(instant + duration_a) + duration_b
и переполнить Instant
.
Системный таймер, SysTick
- 24-битный счетчик также работающий
на частоте процессора. Когда следующая планируемая задача более, чем в
1 << 24
тактов в будущем, прерывание устанавливается на время в пределах
1 << 24
тактов. Этот процесс может происходить несколько раз, пока
следующая запланированная задача не будет в диапазоне счетчика SysTick
.
Подведем итог, оба Instant
и Duration
имеют разрешение 1 такт ядра, и Duration
эффективно имеет (полуоткрытый) диапазон 0..(1 << 31)
(не включая максимум) тактов ядра.
Вместительность очереди
Вместительность очереди таймера рассчитывается как сумма вместительностей
всех планируемых (schedule
) задач. Как и в случае очередей готовности,
это значит, что как только мы затребовали пустую ячейку в буфере INPUTS
,
мы гарантируем, что способны передать задачу в очередь таймера;
это позволяет нам опустить проверки времени выполнения.
Приоритет системного таймера
Приориет системного таймера не может быть установлен пользователем; он выбирается фреймворком. Чтобы убедиться, что низкоприоритетные задачи не препятствуют запуску высокоприоритетных, мы выбираем приоритет системного таймера максимальным из всех планируемых задач.
Чтобы понять, почему это нужно, рассмотрим вариант, когда две ранее
запланированные задачи с приоритетами 2
и 3
становятся готовыми в
примерно одинаковое время, но низкоприоритетная задача перемещается
в очередь готовности первой.
Если бы приоритет системного таймера был, например, равен 1
,
тогда после перемещения низкоприоритетной (2
) задачи, это бы привело
к завершению (из-за того, что приоритет выше приоритета системного таймера)
ожидания выполнения высокоприоритетной задачи (3
).
Чтобы избежать такого сценария, системный таймер должен работать на
приоритете, равном наивысшему из приоритетов планируемых задач;
в этом примере это 3
.
Анализ приоритетов
Очередь таймера - это ресурс, разделяемый всеми задачами, которые могут
планировать (schedule
) задачи и обработчиком SysTick
.
Также интерфейс schedule
соперничает с интерфейсом spawn
за списки свободной памяти. Все это должно уситываться в анализе приоритетов.
Чтобы проиллюстрировать, рассмотрим следующий пример:
#[rtic::app(device = ..)]
mod app {
#[task(priority = 3, spawn = [baz])]
fn foo(c: foo::Context) {
// ..
}
#[task(priority = 2, schedule = [foo, baz])]
fn bar(c: bar::Context) {
// ..
}
#[task(priority = 1)]
fn baz(c: baz::Context) {
// ..
}
}
Анализ приоритетов происходил бы вот так:
-
foo
(prio = 3) иbaz
(prio = 1) планируемые задачи, поэтомуSysTick
должен работать на максимальном из этих двух приоритетов, т.е.3
. -
foo::Spawn
(prio = 3) иbar::Schedule
(prio = 2) соперничают за конечный потребительbaz_FQ
; это приводит к максимальному приоритету3
. -
bar::Schedule
(prio = 2) имеет экслюзивный доступ к конечному потребителюfoo_FQ
; поэтому максимальный приоритетfoo_FQ
фактически2
. -
SysTick
(prio = 3) иbar::Schedule
(prio = 2) соперничают за очередь таймераTQ
; это приводит к максимальному приоритету3
. -
SysTick
(prio = 3) иfoo::Spawn
(prio = 3) оба имеют неблокируемый доступ к очереди готовностиRQ3
, что хранит записиfoo
; поэтому максимальный приоритетRQ3
фактически3
. -
SysTick
имеет эксклюзивный доступ к очереди готовностиRQ1
, которая хранит записиbaz
; поэтому максимальный приоритетRQ1
фактически3
.
Изменения в реализации spawn
Когда интерфейс schedule
используется, реализация spawn
немного
изменяется, чтобы отслеживать baseline задач. Как можете видеть в
реализации schedule
есть буферы INSTANTS
, используемые, чтобы
хранить время, в которое задача была запланирована навыполнение;
этот Instant
читается диспетчером задач и передается в пользовательский
код, как часть контекста задачи.
mod app {
// ..
#[no_mangle]
unsafe UART1() {
const PRIORITY: u8 = 1;
let snapshot = basepri::read();
while let Some(ready) = RQ1.split().1.dequeue() {
match ready.task {
Task::baz => {
let input = baz_INPUTS[ready.index as usize].read();
// ADDED
let instant = baz_INSTANTS[ready.index as usize].read();
baz_FQ.split().0.enqueue_unchecked(ready.index);
let priority = Cell::new(PRIORITY);
// ИЗМЕНЕНО instant передан как часть контекста задачи
baz(baz::Context::new(&priority, instant), input)
}
Task::bar => {
// выглядит также как ветка для `baz`
}
}
}
// инвариант BASEPRI
basepri::write(snapshot);
}
}
И наоборот, реализации spawn
нужно писать значение в буфер INSTANTS
.
Записанное значение располагается в структуре Spawn
и это либо
время start
аппаратной задачи, либо время scheduled
программной задачи.
mod foo {
// ..
pub struct Spawn<'a> {
priority: &'a Cell<u8>,
// ADDED
instant: Instant,
}
impl<'a> Spawn<'a> {
pub unsafe fn priority(&self) -> &Cell<u8> {
&self.priority
}
// ADDED
pub unsafe fn instant(&self) -> Instant {
self.instant
}
}
}
mod app {
impl<'a> foo::Spawn<'a> {
/// Spawns the `baz` task
pub fn baz(&self, message: u64) -> Result<(), u64> {
unsafe {
match lock(self.priority(), baz_FQ_CEILING, || {
baz_FQ.split().1.dequeue()
}) {
Some(index) => {
baz_INPUTS[index as usize].write(message);
// ADDED
baz_INSTANTS[index as usize].write(self.instant());
lock(self.priority(), RQ1_CEILING, || {
RQ1.split().1.enqueue_unchecked(Ready {
task: Task::foo,
index,
});
});
rtic::pend(Interrupt::UART0);
}
None => {
// достигнута максимальная вместительность; неудачный вызов
Err(message)
}
}
}
}
}
}