actor API

see rtic-rs/rfcs#52 for details
includes: core proposal and `#[init]` and `memory-watermark` extensions

Co-authored-by: Jonas Schievink <jonasschievink@gmail.com>
This commit is contained in:
Jorge Aparicio 2021-09-24 16:47:39 +02:00
parent 981fa1fb30
commit a58be575cb
27 changed files with 1088 additions and 63 deletions

View file

@ -22,6 +22,7 @@ name = "rtic"
[dependencies] [dependencies]
cortex-m = "0.7.0" cortex-m = "0.7.0"
cortex-m-rtic-macros = { path = "macros", version = "1.1.5" } cortex-m-rtic-macros = { path = "macros", version = "1.1.5" }
rtic-actor-traits = { path = "actor-traits" }
rtic-monotonic = "1.0.0" rtic-monotonic = "1.0.0"
rtic-core = "1.0.0" rtic-core = "1.0.0"
heapless = "0.7.7" heapless = "0.7.7"
@ -42,13 +43,18 @@ version = "0.5.2"
[target.x86_64-unknown-linux-gnu.dev-dependencies] [target.x86_64-unknown-linux-gnu.dev-dependencies]
trybuild = "1" trybuild = "1"
[features]
memory-watermark = ["cortex-m-rtic-macros/memory-watermark"]
[profile.release] [profile.release]
codegen-units = 1 codegen-units = 1
lto = true lto = true
[workspace] [workspace]
members = [ members = [
"actor-traits",
"macros", "macros",
"post-spy",
"xtask", "xtask",
] ]
@ -70,3 +76,5 @@ overflow-checks = false
[patch.crates-io] [patch.crates-io]
lm3s6965 = { git = "https://github.com/japaric/lm3s6965" } lm3s6965 = { git = "https://github.com/japaric/lm3s6965" }
# remove when rtic-rs/rtic-syntax#75 is merged
rtic-syntax = { branch = "actor", git = "https://github.com/rtic-rs/rtic-syntax" }

6
actor-traits/Cargo.toml Normal file
View file

@ -0,0 +1,6 @@
[package]
edition = "2018"
name = "rtic-actor-traits"
version = "0.1.0"
[dependencies]

9
actor-traits/src/lib.rs Normal file
View file

@ -0,0 +1,9 @@
#![no_std]
pub trait Post<M> {
fn post(&mut self, message: M) -> Result<(), M>;
}
pub trait Receive<M> {
fn receive(&mut self, message: M);
}

View file

@ -0,0 +1,60 @@
#![no_main]
#![no_std]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [GPIOA])]
mod app {
use core::sync::atomic::{AtomicU8, Ordering};
use cortex_m_semihosting::{debug, hprintln};
use rtic_actor_traits::Receive;
struct Actor;
struct Message;
static CALL_COUNT: AtomicU8 = AtomicU8::new(0);
impl Receive<Message> for Actor {
fn receive(&mut self, _: Message) {
hprintln!("Actor::receive was called").ok();
CALL_COUNT.store(CALL_COUNT.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
}
}
#[actors]
struct Actors {
#[subscribe(Message, capacity = 2)]
actor: Actor,
}
#[init]
fn init(mut cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) {
assert!(cx.poster.post(Message).is_ok());
assert!(cx.poster.post(Message).is_ok());
assert!(cx.poster.post(Message).is_err());
(
Shared {},
Local {},
init::Monotonics(),
Actors { actor: Actor },
)
}
#[idle]
fn idle(_: idle::Context) -> ! {
assert_eq!(2, CALL_COUNT.load(Ordering::Relaxed));
loop {
debug::exit(debug::EXIT_SUCCESS);
}
}
#[shared]
struct Shared {}
#[local]
struct Local {}
}

47
examples/actor-init.rs Normal file
View file

@ -0,0 +1,47 @@
#![no_main]
#![no_std]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [GPIOA])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_actor_traits::Receive;
#[derive(Debug)]
struct Actor {
state: i32,
}
struct AssertActorWasInitialized;
const INITIAL_STATE: i32 = 42;
impl Receive<AssertActorWasInitialized> for Actor {
fn receive(&mut self, _: AssertActorWasInitialized) {
assert_eq!(INITIAL_STATE, self.state);
hprintln!("OK").ok();
debug::exit(debug::EXIT_SUCCESS);
}
}
#[actors]
struct Actors {
#[subscribe(AssertActorWasInitialized)]
#[init(Actor { state: INITIAL_STATE })]
actor: Actor,
}
#[init]
fn init(mut cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) {
cx.poster.post(AssertActorWasInitialized).ok();
(Shared {}, Local {}, init::Monotonics(), Actors {})
}
#[shared]
struct Shared {}
#[local]
struct Local {}
}

69
examples/actor-post.rs Normal file
View file

@ -0,0 +1,69 @@
#![no_main]
#![no_std]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [GPIOA])]
mod app {
use core::sync::atomic::{AtomicBool, Ordering};
use cortex_m_semihosting::{debug, hprintln};
use rtic_actor_traits::Receive;
struct Actor;
const PAYLOAD: i32 = 42;
struct Message {
payload: i32,
}
static RECEIVE_WAS_CALLED: AtomicBool = AtomicBool::new(false);
impl Receive<Message> for Actor {
fn receive(&mut self, m: Message) {
hprintln!("Actor::receive was called").ok();
RECEIVE_WAS_CALLED.store(true, Ordering::Relaxed);
assert_eq!(PAYLOAD, m.payload);
}
}
#[actors]
struct Actors {
#[subscribe(Message)]
actor: Actor,
}
#[init]
fn init(mut cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) {
cx.poster.post(Message { payload: PAYLOAD }).ok();
// receive invocation withheld
assert!(!RECEIVE_WAS_CALLED.load(Ordering::Relaxed));
(
Shared {},
Local {},
init::Monotonics(),
Actors { actor: Actor },
)
}
#[idle]
fn idle(_: idle::Context) -> ! {
// receive invocation must have executed by now
assert!(RECEIVE_WAS_CALLED.load(Ordering::Relaxed));
loop {
debug::exit(debug::EXIT_SUCCESS);
}
}
#[shared]
struct Shared {}
#[local]
struct Local {}
}

View file

@ -0,0 +1,87 @@
#![no_main]
#![no_std]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [GPIOA, GPIOB])]
mod app {
use core::sync::atomic::{AtomicBool, Ordering};
use cortex_m_semihosting::{debug, hprintln};
use rtic_actor_traits::Receive;
struct A;
struct B;
#[derive(Default)]
struct M {
must_not_be_cloned: bool,
}
impl Clone for M {
fn clone(&self) -> Self {
assert!(!self.must_not_be_cloned);
M {
must_not_be_cloned: self.must_not_be_cloned,
}
}
}
impl Receive<M> for A {
fn receive(&mut self, _: M) {
static WAS_CALLED_EXACTLY_ONCE: AtomicBool = AtomicBool::new(false);
hprintln!("A::receive was called").ok();
assert!(!WAS_CALLED_EXACTLY_ONCE.load(Ordering::Relaxed));
WAS_CALLED_EXACTLY_ONCE.store(true, Ordering::Relaxed);
}
}
impl Receive<M> for B {
fn receive(&mut self, _: M) {
static WAS_CALLED_EXACTLY_ONCE: AtomicBool = AtomicBool::new(false);
hprintln!("B::receive was called").ok();
assert!(!WAS_CALLED_EXACTLY_ONCE.load(Ordering::Relaxed));
WAS_CALLED_EXACTLY_ONCE.store(true, Ordering::Relaxed);
}
}
#[actors]
struct Actors {
#[subscribe(M, capacity = 2)]
#[init(A)]
a: A,
#[subscribe(M, capacity = 1)]
#[init(B)]
b: B,
}
#[init]
fn init(cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) {
let mut poster = cx.poster;
assert!(poster.post(M::default()).is_ok());
// B's message queue is full so message must NOT be cloned
// this must also NOT trigger task A even if it has capacity
assert!(poster
.post(M {
must_not_be_cloned: true
})
.is_err());
(Shared {}, Local {}, init::Monotonics(), Actors {})
}
#[idle]
fn idle(_: idle::Context) -> ! {
loop {
debug::exit(debug::EXIT_SUCCESS)
}
}
#[local]
struct Local {}
#[shared]
struct Shared {}
}

99
examples/actor-publish.rs Normal file
View file

@ -0,0 +1,99 @@
#![no_main]
#![no_std]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [GPIOA, GPIOB])]
mod app {
use core::sync::atomic::{AtomicBool, Ordering};
use cortex_m_semihosting::{debug, hprintln};
use rtic_actor_traits::Receive;
struct A;
struct B;
const PAYLOAD: i32 = 42;
static CLONE_WAS_CALLED_EXACTLY_ONCE: AtomicBool = AtomicBool::new(false);
struct M {
payload: i32,
}
impl Clone for M {
fn clone(&self) -> Self {
assert!(!CLONE_WAS_CALLED_EXACTLY_ONCE.load(Ordering::Relaxed));
CLONE_WAS_CALLED_EXACTLY_ONCE.store(true, Ordering::Relaxed);
// `derive(Clone)` implementation
Self {
payload: self.payload.clone(),
}
}
}
static A_RECEIVE_WAS_CALLED: AtomicBool = AtomicBool::new(false);
impl Receive<M> for A {
fn receive(&mut self, m: M) {
hprintln!("A::receive was called").ok();
assert_eq!(PAYLOAD, m.payload);
A_RECEIVE_WAS_CALLED.store(true, Ordering::Relaxed);
}
}
static B_RECEIVE_WAS_CALLED: AtomicBool = AtomicBool::new(false);
impl Receive<M> for B {
fn receive(&mut self, m: M) {
hprintln!("B::receive was called").ok();
assert_eq!(PAYLOAD, m.payload);
B_RECEIVE_WAS_CALLED.store(true, Ordering::Relaxed);
}
}
#[actors]
struct Actors {
#[subscribe(M, capacity = 2)]
#[init(A)]
a: A,
#[subscribe(M, capacity = 1)]
#[init(B)]
b: B,
}
#[init]
fn init(cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) {
let mut poster = cx.poster;
assert!(poster.post(M { payload: PAYLOAD }).is_ok());
assert!(CLONE_WAS_CALLED_EXACTLY_ONCE.load(Ordering::Relaxed));
// receive invocations withheld
assert!(!A_RECEIVE_WAS_CALLED.load(Ordering::Relaxed));
assert!(!B_RECEIVE_WAS_CALLED.load(Ordering::Relaxed));
(Shared {}, Local {}, init::Monotonics(), Actors {})
}
#[idle]
fn idle(_: idle::Context) -> ! {
// receive invocations must have executed by now
assert!(A_RECEIVE_WAS_CALLED.load(Ordering::Relaxed));
assert!(B_RECEIVE_WAS_CALLED.load(Ordering::Relaxed));
loop {
debug::exit(debug::EXIT_SUCCESS)
}
}
#[local]
struct Local {}
#[shared]
struct Shared {}
}

View file

@ -0,0 +1,68 @@
// This example depends on the `memory-watermark` feature
#![no_main]
#![no_std]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [GPIOA])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_actor_traits::Receive;
struct Actor;
struct Message;
impl Receive<Message> for Actor {
fn receive(&mut self, _: Message) {
hprintln!("Actor::receive was called").ok();
}
}
#[actors]
struct Actors {
#[subscribe(Message, capacity = 2)]
actor: Actor,
}
#[init]
fn init(mut cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) {
assert_eq!(0, actor::SUBSCRIPTIONS[0].watermark());
assert_eq!("Message", actor::SUBSCRIPTIONS[0].message_type);
assert_eq!(2, actor::SUBSCRIPTIONS[0].capacity);
assert!(cx.poster.post(Message).is_ok());
assert_eq!(1, actor::SUBSCRIPTIONS[0].watermark());
// monotonically increasing
assert!(cx.poster.post(Message).is_ok());
assert_eq!(2, actor::SUBSCRIPTIONS[0].watermark());
// bounded value
assert!(cx.poster.post(Message).is_err());
assert_eq!(2, actor::SUBSCRIPTIONS[0].watermark());
(
Shared {},
Local {},
init::Monotonics(),
Actors { actor: Actor },
)
}
#[idle]
fn idle(_: idle::Context) -> ! {
// monotonically increasing: does not decrease after receive is called
assert_eq!(2, actor::SUBSCRIPTIONS[0].watermark());
loop {
debug::exit(debug::EXIT_SUCCESS);
}
}
#[shared]
struct Shared {}
#[local]
struct Local {}
}

View file

@ -23,6 +23,8 @@ proc-macro-error = "1"
quote = "1" quote = "1"
syn = "1" syn = "1"
rtic-syntax = "1.0.2" rtic-syntax = "1.0.2"
indexmap = "1.0.2"
[features] [features]
debugprint = [] debugprint = []
memory-watermark = []

View file

@ -29,6 +29,7 @@ pub fn app(analysis: P<analyze::Analysis>, app: &App) -> P<Analysis> {
.software_tasks .software_tasks
.values() .values()
.map(|task| task.args.priority) .map(|task| task.args.priority)
.chain(app.actors.values().map(|actor| actor.priority))
.collect::<BTreeSet<_>>(); .collect::<BTreeSet<_>>();
// map from priorities to interrupts (holding name and attributes) // map from priorities to interrupts (holding name and attributes)

View file

@ -1,4 +1,4 @@
use std::collections::HashSet; use std::{cell::Cell, collections::HashSet};
use proc_macro2::Span; use proc_macro2::Span;
use rtic_syntax::{analyze::Analysis, ast::App}; use rtic_syntax::{analyze::Analysis, ast::App};
@ -30,14 +30,18 @@ pub fn app(app: &App, _analysis: &Analysis) -> parse::Result<Extra> {
// Check that there are enough external interrupts to dispatch the software tasks and the timer // Check that there are enough external interrupts to dispatch the software tasks and the timer
// queue handler // queue handler
let mut first = None; let first = Cell::new(None);
let priorities = app let priorities = app
.software_tasks .software_tasks
.iter() .iter()
.map(|(name, task)| { .map(|(name, task)| {
first = Some(name); first.set(Some(name));
task.args.priority task.args.priority
}) })
.chain(app.actors.iter().map(|(name, ao)| {
first.set(Some(name));
ao.priority
}))
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();
let need = priorities.len(); let need = priorities.len();
@ -46,14 +50,14 @@ pub fn app(app: &App, _analysis: &Analysis) -> parse::Result<Extra> {
let s = { let s = {
format!( format!(
"not enough interrupts to dispatch \ "not enough interrupts to dispatch \
all software tasks (need: {}; given: {})", all software tasks / actors (need: {}; given: {})",
need, given need, given
) )
}; };
// If not enough tasks and first still is None, may cause // If not enough interrupts and `first` still is None, may cause
// "custom attribute panicked" due to unwrap on None // "custom attribute panicked" due to unwrap on None
return Err(parse::Error::new(first.unwrap().span(), &s)); return Err(parse::Error::new(first.get().unwrap().span(), &s));
} }
// Check that all exceptions are valid; only exceptions with configurable priorities are // Check that all exceptions are valid; only exceptions with configurable priorities are

View file

@ -4,6 +4,7 @@ use rtic_syntax::ast::App;
use crate::{analyze::Analysis, check::Extra}; use crate::{analyze::Analysis, check::Extra};
mod actors;
mod assertions; mod assertions;
mod dispatchers; mod dispatchers;
mod hardware_tasks; mod hardware_tasks;
@ -84,6 +85,8 @@ pub fn app(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 {
} }
)); ));
let mod_app_actors = actors::codegen(app, analysis, extra);
let (mod_app_shared_resources, mod_shared_resources) = let (mod_app_shared_resources, mod_shared_resources) =
shared_resources::codegen(app, analysis, extra); shared_resources::codegen(app, analysis, extra);
let (mod_app_local_resources, mod_local_resources) = let (mod_app_local_resources, mod_local_resources) =
@ -202,6 +205,8 @@ pub fn app(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 {
#(#mod_app_timer_queue)* #(#mod_app_timer_queue)*
#mod_app_actors
#(#mains)* #(#mains)*
} }
) )

View file

@ -0,0 +1,299 @@
use indexmap::IndexMap;
use proc_macro2::TokenStream as TokenStream2;
use quote::{format_ident, quote};
use rtic_syntax::ast::App;
use crate::{analyze::Analysis, check::Extra, codegen::util};
pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> TokenStream2 {
// Generate a `Poster` type, and `Post` implementations for every message that a task has
// subscribed to.
let mut map: IndexMap<_, Vec<_>> = IndexMap::new();
for (name, obj) in &app.actors {
for (subscription_index, subscription) in obj.subscriptions.iter().enumerate() {
map.entry(subscription.ty.clone())
.or_default()
.push((name, subscription_index));
}
}
let post_impls = map.iter().map(|(message_ty, pairs)| {
let last_index = pairs.len() - 1;
let any_is_full = pairs
.iter()
.map(|(actor_name, subscription_index)| {
let post_name = util::actor_post(actor_name, *subscription_index);
quote!(#post_name::is_full())
})
.collect::<Vec<_>>();
let posts = pairs
.iter()
.enumerate()
.map(|(i, (actor_name, subscription_index))| {
let post_name = util::actor_post(actor_name, *subscription_index);
if i == last_index {
// avoid Clone on last message
quote!(#post_name(message)?;)
} else {
quote!(#post_name(message.clone())?;)
}
})
.collect::<Vec<_>>();
quote! {
impl rtic::export::Post<#message_ty> for Poster {
fn post(&mut self, message: #message_ty) -> Result<(), #message_ty> {
// TODO(micro-optimization) do the `clone`-ing *outside* the critical section
// Atomically posts all messages
rtic::export::interrupt::free(|_| unsafe {
if false #(|| #any_is_full)* {
return Err(message)
}
#(#posts)*
Ok(())
})?;
Ok(())
}
}
}
});
// Actor receive "task" functions
let mut task_functions = vec![];
for (name, actor) in &app.actors {
let actor_ty = &actor.ty;
for (subscription_index, subscription) in actor.subscriptions.iter().enumerate() {
let function_name = &util::internal_actor_receive_task(name, subscription_index);
let actor_state = util::actor_state_ident(name);
let input_ty = &subscription.ty;
let refmut = if actor.init.is_none() {
quote!(&mut *(&mut *#actor_state.get_mut()).as_mut_ptr())
} else {
quote!((&mut *#actor_state.get_mut()))
};
task_functions.push(quote!(
fn #function_name(message: #input_ty) {
// NOTE(safety) all the Receive methods of an actor instance run at the same
// priority so no lock required
unsafe {
<#actor_ty as rtic::export::Receive<#input_ty>>::receive(
#refmut,
message,
)
}
}
));
}
}
// "Spawn" infrastructure
let mut spawn_infra = vec![];
for (actor_name, actor) in &app.actors {
for (subscription_index, subscription) in actor.subscriptions.iter().enumerate() {
let capacity = subscription.capacity;
let message_ty = &subscription.ty;
let cap_lit = util::capacity_literal(capacity as usize);
let cap_lit_p1 = util::capacity_literal(capacity as usize + 1);
let pseudo_task_name = util::actor_receive_task(actor_name, subscription_index);
let inputs_ident = util::inputs_ident(&pseudo_task_name);
let elems = (0..capacity).map(|_| quote!(core::mem::MaybeUninit::uninit()));
let uninit_section = util::link_section_uninit();
spawn_infra.push(quote!(
#uninit_section
// /// Buffer that holds the inputs of a task
#[doc(hidden)]
static #inputs_ident: rtic::RacyCell<[core::mem::MaybeUninit<#message_ty>; #cap_lit]> =
rtic::RacyCell::new([#(#elems,)*]);
));
let fq_ident = util::fq_ident(&pseudo_task_name);
let fq_ty = quote!(rtic::export::SCFQ<#cap_lit_p1>);
let fq_expr = quote!(rtic::export::Queue::new());
spawn_infra.push(quote!(
// /// Queue version of a free-list that keeps track of empty slots in
// /// the following buffers
#[doc(hidden)]
static #fq_ident: rtic::RacyCell<#fq_ty> = rtic::RacyCell::new(#fq_expr);
));
let priority = actor.priority;
let t = util::spawn_t_ident(priority);
let device = &extra.device;
let enum_ = util::interrupt_ident();
let interrupt = &analysis
.interrupts
.get(&priority)
.expect("RTIC-ICE: interrupt identifer not found")
.0;
let call_update_watermark = if cfg!(feature = "memory-watermark") {
let update_watermark = util::update_watermark(subscription_index);
quote!(
#actor_name::#update_watermark((*#fq_ident.get()).len());
)
} else {
quote!()
};
let rq = util::rq_ident(priority);
let dequeue = quote!((&mut *#fq_ident.get_mut()).dequeue());
let post_name = util::actor_post(actor_name, subscription_index);
spawn_infra.push(quote!(
/// Safety: needs to be wrapped in a critical section
unsafe fn #post_name(message: #message_ty) -> Result<(), #message_ty> {
unsafe {
if let Some(index) = #dequeue {
#call_update_watermark
(*#inputs_ident
.get_mut())
.get_unchecked_mut(usize::from(index))
.as_mut_ptr()
.write(message);
(*#rq.get_mut()).enqueue_unchecked((#t::#pseudo_task_name, index));
rtic::pend(#device::#enum_::#interrupt);
Ok(())
} else {
Err(message)
}
}
}
mod #post_name {
/// Safety: needs to be wrapped in a critical section
pub unsafe fn is_full() -> bool {
// this is the queue version of a "free list" when it's empty the message
// queue of the task is full (= no more messages can be posted)
(&*super::#fq_ident.get()).len() == 0
}
}
));
}
}
// watermark API
let watermark_api = if cfg!(feature = "memory-watermark") {
watermark_api(app)
} else {
quote!()
};
quote! {
// Make `Post` methods available in the app module.
use rtic::export::Post as _;
#[derive(Clone, Copy)]
pub struct Poster;
#(#post_impls)*
#(#task_functions)*
#(#spawn_infra)*
#watermark_api
}
}
fn watermark_api(app: &App) -> TokenStream2 {
let mut actor_mods = vec![];
for (actor_name, actor) in &app.actors {
if actor.subscriptions.is_empty() {
// skip disconnected actors
continue;
}
let mut mod_items = vec![];
let mut subscriptions_elements = vec![];
for (subscription_index, subscription) in actor.subscriptions.iter().enumerate() {
let capacity = util::capacity_literal(subscription.capacity.into());
let counter = format_ident!("COUNTER{}", subscription_index);
let update_watermark = util::update_watermark(subscription_index);
let watermark = util::watermark(subscription_index);
mod_items.push(quote!(
static #counter: AtomicUsize = AtomicUsize::new(0);
pub fn #update_watermark(fq_len: usize) {
let new_usage = #capacity - fq_len;
if new_usage > #counter.load(Ordering::Relaxed) {
#counter.store(new_usage, Ordering::Relaxed)
}
}
pub fn #watermark() -> usize {
#counter.load(Ordering::Relaxed)
}
));
let ty = &subscription.ty;
let message_type = quote!(#ty).to_string();
subscriptions_elements.push(quote!(
Subscription {
capacity: #capacity,
message_type: #message_type,
watermark: #watermark,
}
));
}
actor_mods.push(quote!(
pub mod #actor_name {
use core::sync::atomic::{AtomicUsize, Ordering};
use super::Subscription;
pub static SUBSCRIPTIONS: &[Subscription] =
&[#(#subscriptions_elements),*];
#(#mod_items)*
}
))
}
if actor_mods.is_empty() {
// all actors are disconnected
return quote!();
}
// NOTE this API could live in a crate like `rtic-core`
let subscription_api = quote!(
pub struct Subscription {
pub capacity: usize,
pub message_type: &'static str,
watermark: fn() -> usize,
}
impl Subscription {
pub fn watermark(&self) -> usize {
(self.watermark)()
}
}
impl core::fmt::Debug for Subscription {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Subscription")
.field("capacity", &self.capacity)
.field("message_type", &self.message_type)
.field("watermark", &self.watermark())
.finish()
}
}
);
quote!(
#subscription_api
#(#actor_mods)*
)
}

View file

@ -1,6 +1,6 @@
use proc_macro2::TokenStream as TokenStream2; use proc_macro2::TokenStream as TokenStream2;
use quote::quote; use quote::quote;
use rtic_syntax::ast::App; use rtic_syntax::{analyze::Spawnee, ast::App};
use crate::{analyze::Analysis, check::Extra, codegen::util}; use crate::{analyze::Analysis, check::Extra, codegen::util};
@ -14,15 +14,25 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
let mut stmts = vec![]; let mut stmts = vec![];
let variants = channel let variants = channel
.tasks .spawnees
.iter() .iter()
.map(|name| { .map(|spawnee| match spawnee {
let cfgs = &app.software_tasks[name].cfgs; Spawnee::Task { name } => {
let cfgs = &app.software_tasks[name].cfgs;
quote!( quote!(
#(#cfgs)* #(#cfgs)*
#name #name
) )
}
Spawnee::Actor {
name,
subscription_index,
} => {
let task_name = util::actor_receive_task(name, *subscription_index);
quote!(#task_name)
}
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -65,32 +75,58 @@ pub fn codegen(app: &App, analysis: &Analysis, _extra: &Extra) -> Vec<TokenStrea
)); ));
let arms = channel let arms = channel
.tasks .spawnees
.iter() .iter()
.map(|name| { .map(|spawnee| match spawnee {
let task = &app.software_tasks[name]; Spawnee::Task { name } => {
let cfgs = &task.cfgs; let task = &app.software_tasks[name];
let fq = util::fq_ident(name); let cfgs = &task.cfgs;
let inputs = util::inputs_ident(name); let fq = util::fq_ident(name);
let (_, tupled, pats, _) = util::regroup_inputs(&task.inputs); let inputs = util::inputs_ident(name);
let (_, tupled, pats, _) = util::regroup_inputs(&task.inputs);
quote!( quote!(
#(#cfgs)* #(#cfgs)*
#t::#name => { #t::#name => {
let #tupled = let #tupled =
(&*#inputs (&*#inputs
.get()) .get())
.get_unchecked(usize::from(index)) .get_unchecked(usize::from(index))
.as_ptr() .as_ptr()
.read(); .read();
(&mut *#fq.get_mut()).split().0.enqueue_unchecked(index); (&mut *#fq.get_mut()).split().0.enqueue_unchecked(index);
let priority = &rtic::export::Priority::new(PRIORITY); let priority = &rtic::export::Priority::new(PRIORITY);
#name( #name(
#name::Context::new(priority) #name::Context::new(priority)
#(,#pats)* #(,#pats)*
) )
} }
) )
}
Spawnee::Actor {
name: actor_name,
subscription_index,
} => {
let task_name = util::actor_receive_task(actor_name, *subscription_index);
let fq = util::fq_ident(&task_name);
let inputs = util::inputs_ident(&task_name);
let function_name =
util::internal_actor_receive_task(actor_name, *subscription_index);
quote!(
#t::#task_name => {
let input =
(&*#inputs
.get())
.get_unchecked(usize::from(index))
.as_ptr()
.read();
(&mut *#fq.get_mut()).split().0.enqueue_unchecked(index);
#function_name(input)
}
)
}
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();

View file

@ -65,6 +65,28 @@ pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> CodegenResult {
) )
}) })
.collect(); .collect();
let actors_struct = if let Some(actors) = &init.user_actors_struct {
let fields = app
.actors
.iter()
.filter_map(|(name, ao)| {
if ao.init.is_none() {
let ty = &ao.ty;
Some(quote!(#name: #ty,))
} else {
None
}
})
.collect::<Vec<_>>();
quote!(
struct #actors {
#(#fields)*
}
)
} else {
quote!()
};
root_init.push(quote! { root_init.push(quote! {
struct #shared { struct #shared {
#(#shared_resources)* #(#shared_resources)*
@ -73,11 +95,17 @@ pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> CodegenResult {
struct #local { struct #local {
#(#local_resources)* #(#local_resources)*
} }
#actors_struct
}); });
// let locals_pat = locals_pat.iter(); // let locals_pat = locals_pat.iter();
let user_init_return = quote! {#shared, #local, #name::Monotonics}; let user_init_return = if let Some(actors) = &init.user_actors_struct {
quote! {#shared, #local, #name::Monotonics, #actors}
} else {
quote! {#shared, #local, #name::Monotonics}
};
let user_init = quote!( let user_init = quote!(
#(#attrs)* #(#attrs)*
@ -101,8 +129,13 @@ pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> CodegenResult {
} }
// let locals_new = locals_new.iter(); // let locals_new = locals_new.iter();
let let_pat = if init.user_actors_struct.is_some() {
quote!((shared_resources, local_resources, mut monotonics, actors))
} else {
quote!((shared_resources, local_resources, mut monotonics))
};
let call_init = quote! { let call_init = quote! {
let (shared_resources, local_resources, mut monotonics) = #name(#name::Context::new(core.into())); let #let_pat = #name(#name::Context::new(core.into()));
}; };
root_init.push(module::codegen( root_init.push(module::codegen(

View file

@ -66,5 +66,30 @@ pub fn codegen(
)); ));
} }
// Actor states
for (actor_name, actor) in &app.actors {
let mangled_name = util::actor_state_ident(actor_name);
let ty = &actor.ty;
let item = if let Some(init) = &actor.init {
quote!(
#[allow(non_upper_case_globals)]
#[doc(hidden)]
static #mangled_name: rtic::RacyCell<#ty> = rtic::RacyCell::new(#init);
)
} else {
let uninit_section = util::link_section_uninit();
quote!(
#[allow(non_upper_case_globals)]
#[doc(hidden)]
#uninit_section
static #mangled_name: rtic::RacyCell<core::mem::MaybeUninit<#ty>> = rtic::RacyCell::new(core::mem::MaybeUninit::uninit());
)
};
mod_app.push(item);
}
(mod_app, TokenStream2::new()) (mod_app, TokenStream2::new())
} }

View file

@ -46,9 +46,13 @@ pub fn codegen(
pub cs: rtic::export::CriticalSection<#lt> pub cs: rtic::export::CriticalSection<#lt>
)); ));
fields.push(quote!(poster: Poster));
values.push(quote!(cs: rtic::export::CriticalSection::new())); values.push(quote!(cs: rtic::export::CriticalSection::new()));
values.push(quote!(core)); values.push(quote!(core));
values.push(quote!(poster: Poster));
} }
Context::Idle | Context::HardwareTask(_) | Context::SoftwareTask(_) => {} Context::Idle | Context::HardwareTask(_) | Context::SoftwareTask(_) => {}
@ -222,33 +226,33 @@ pub fn codegen(
// Spawn caller // Spawn caller
items.push(quote!( items.push(quote!(
#(#cfgs)*
/// Spawns the task directly
pub fn #internal_spawn_ident(#(#args,)*) -> Result<(), #ty> {
let input = #tupled;
#(#cfgs)* unsafe {
/// Spawns the task directly if let Some(index) = rtic::export::interrupt::free(|_| (&mut *#fq.get_mut()).dequeue()) {
pub fn #internal_spawn_ident(#(#args,)*) -> Result<(), #ty> { (&mut *#inputs
let input = #tupled; .get_mut())
.get_unchecked_mut(usize::from(index))
.as_mut_ptr()
.write(input);
unsafe { rtic::export::interrupt::free(|_| {
if let Some(index) = rtic::export::interrupt::free(|_| (&mut *#fq.get_mut()).dequeue()) { (&mut *#rq.get_mut()).enqueue_unchecked((#t::#name, index));
(&mut *#inputs });
.get_mut())
.get_unchecked_mut(usize::from(index))
.as_mut_ptr()
.write(input);
rtic::export::interrupt::free(|_| { rtic::pend(#device::#enum_::#interrupt);
(&mut *#rq.get_mut()).enqueue_unchecked((#t::#name, index));
});
rtic::pend(#device::#enum_::#interrupt); Ok(())
} else {
Ok(()) Err(input)
} else { }
Err(input)
} }
}
})); }
));
module_items.push(quote!( module_items.push(quote!(
#(#cfgs)* #(#cfgs)*

View file

@ -43,6 +43,24 @@ pub fn codegen(app: &App, analysis: &Analysis) -> Vec<TokenStream2> {
} }
} }
// Initialize actors
for name in app.actors.iter().filter_map(|(name, actor)| {
if actor.init.is_none() {
Some(name)
} else {
None
}
}) {
let mangled_name = util::actor_state_ident(name);
stmts.push(quote!(
// Resource is a RacyCell<MaybeUninit<T>>
// - `get_mut_unchecked` to obtain `MaybeUninit<T>`
// - `as_mut_ptr` to obtain a raw pointer to `MaybeUninit<T>`
// - `write` the defined value for the late resource T
(&mut *#mangled_name.get_mut()).as_mut_ptr().write(actors.#name);
))
}
for (i, (monotonic, _)) in app.monotonics.iter().enumerate() { for (i, (monotonic, _)) in app.monotonics.iter().enumerate() {
// For future use // For future use
// let doc = format!(" RTIC internal: {}:{}", file!(), line!()); // let doc = format!(" RTIC internal: {}:{}", file!(), line!());

View file

@ -23,6 +23,18 @@ pub fn codegen(app: &App, analysis: &Analysis, extra: &Extra) -> Vec<TokenStream
)); ));
} }
for (actor_name, actor) in &app.actors {
for (subscription_index, subscription) in actor.subscriptions.iter().enumerate() {
let capacity = subscription.capacity;
let pseudo_task_name = util::actor_receive_task(actor_name, subscription_index);
let fq_ident = util::fq_ident(&pseudo_task_name);
stmts.push(quote!(
(0..#capacity).for_each(|i| (&mut *#fq_ident.get_mut()).enqueue_unchecked(i));
));
}
}
stmts.push(quote!( stmts.push(quote!(
// To set the variable in cortex_m so the peripherals cannot be taken multiple times // To set the variable in cortex_m so the peripherals cannot be taken multiple times
let mut core: rtic::export::Peripherals = rtic::export::Peripherals::steal().into(); let mut core: rtic::export::Peripherals = rtic::export::Peripherals::steal().into();

View file

@ -1,7 +1,7 @@
use core::sync::atomic::{AtomicUsize, Ordering}; use core::sync::atomic::{AtomicUsize, Ordering};
use proc_macro2::{Span, TokenStream as TokenStream2}; use proc_macro2::{Span, TokenStream as TokenStream2};
use quote::quote; use quote::{format_ident, quote};
use rtic_syntax::{ast::App, Context}; use rtic_syntax::{ast::App, Context};
use syn::{Attribute, Ident, LitInt, PatType}; use syn::{Attribute, Ident, LitInt, PatType};
@ -276,3 +276,27 @@ pub fn rt_err_ident() -> Ident {
Span::call_site(), Span::call_site(),
) )
} }
pub fn actor_state_ident(name: &Ident) -> Ident {
mark_internal_name(&format!("actor_{}_state", name))
}
pub fn actor_receive_task(name: &Ident, subscription_index: usize) -> Ident {
format_ident!("{}_receive_{}", name, subscription_index)
}
pub fn internal_actor_receive_task(name: &Ident, subscription_index: usize) -> Ident {
mark_internal_name(&actor_receive_task(name, subscription_index).to_string())
}
pub fn actor_post(name: &Ident, subscription_index: usize) -> Ident {
mark_internal_name(&format!("{}_post_{}", name, subscription_index))
}
pub fn update_watermark(subscription_index: usize) -> Ident {
mark_internal_name(&format!("update_watermark{}", subscription_index))
}
pub fn watermark(subscription_index: usize) -> Ident {
mark_internal_name(&format!("watermark{}", subscription_index))
}

7
post-spy/Cargo.toml Normal file
View file

@ -0,0 +1,7 @@
[package]
edition = "2018"
name = "rtic-post-spy"
version = "0.1.0"
[dependencies]
rtic-actor-traits = { path = "../actor-traits" }

76
post-spy/src/lib.rs Normal file
View file

@ -0,0 +1,76 @@
use std::any::Any;
use rtic_actor_traits::Post;
/// An implementation of `Post` that accepts "any" message type and lets you inspect all `post`-ed
/// messages
#[derive(Default)]
pub struct PostSpy {
posted_messages: Vec<Box<dyn Any>>,
}
impl PostSpy {
/// Returns an *iterator* over the posted messages
///
/// Note that you must specify *which* type of message you want to retrieve (the `T` in the
/// signature)
/// In practice, this will most likely mean using "turbo fish" syntax to specify the type:
/// `post_spy.posted_messages::<MyMessage>()`
pub fn posted_messages<T>(&self) -> impl Iterator<Item = &T>
where
T: Any,
{
self.posted_messages
.iter()
.filter_map(|message| message.downcast_ref())
}
}
impl<M> Post<M> for PostSpy
where
M: Any,
{
fn post(&mut self, message: M) -> Result<(), M> {
self.posted_messages.push(Box::new(message));
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn post_and_inspect() {
let mut spy = PostSpy::default();
assert_eq!(None, spy.posted_messages::<i32>().next());
spy.post(42).unwrap();
assert_eq!(vec![&42], spy.posted_messages::<i32>().collect::<Vec<_>>());
}
#[test]
fn can_post_two_types_to_the_same_spy() {
#[derive(Debug, PartialEq)]
struct MessageA(i32);
#[derive(Debug, PartialEq)]
struct MessageB(i32);
let mut post_spy = PostSpy::default();
post_spy.post(MessageA(0)).unwrap();
post_spy.post(MessageB(1)).unwrap();
post_spy.post(MessageA(2)).unwrap();
post_spy.post(MessageB(3)).unwrap();
// peek *only* `MessageA` messages in `post` order
assert_eq!(
vec![&MessageA(0), &MessageA(2)],
post_spy.posted_messages::<MessageA>().collect::<Vec<_>>()
);
// peek *only* `MessageB` messages in `post` order
assert_eq!(
vec![&MessageB(1), &MessageB(3)],
post_spy.posted_messages::<MessageB>().collect::<Vec<_>>()
);
}
}

View file

@ -16,6 +16,7 @@ pub use cortex_m::{
pub use heapless::sorted_linked_list::SortedLinkedList; pub use heapless::sorted_linked_list::SortedLinkedList;
pub use heapless::spsc::Queue; pub use heapless::spsc::Queue;
pub use heapless::BinaryHeap; pub use heapless::BinaryHeap;
pub use rtic_actor_traits::{Post, Receive};
pub use rtic_monotonic as monotonic; pub use rtic_monotonic as monotonic;
pub type SCFQ<const N: usize> = Queue<u8, N>; pub type SCFQ<const N: usize> = Queue<u8, N>;

View file

@ -0,0 +1,20 @@
#![no_main]
#[rtic::app(device = lm3s6965)]
mod app {
#[actors]
struct Actors {
a: A,
}
#[shared]
struct Shared {}
#[local]
struct Local {}
#[init]
fn init(cx: init::Context) -> (Shared, Local, init::Monotonics, Actors) {
(Shared {}, Local {}, init::Monotonics {}, Actors { a: A })
}
}

View file

@ -0,0 +1,5 @@
error: not enough interrupts to dispatch all software tasks / actors (need: 1; given: 0)
--> $DIR/actor-extern-interrupt-not-enough.rs:7:9
|
7 | a: A,
| ^

View file

@ -1,4 +1,4 @@
error: not enough interrupts to dispatch all software tasks (need: 1; given: 0) error: not enough interrupts to dispatch all software tasks / actors (need: 1; given: 0)
--> $DIR/extern-interrupt-not-enough.rs:17:8 --> $DIR/extern-interrupt-not-enough.rs:17:8
| |
17 | fn a(_: a::Context) {} 17 | fn a(_: a::Context) {}