From 491de0564f4b309f66943c2e32a244a6f9b60a21 Mon Sep 17 00:00:00 2001 From: Paul Zinselmeyer Date: Thu, 8 Aug 2024 23:52:46 +0200 Subject: [PATCH] double linked list can driver --- canome/Cargo.toml | 26 +- canome/src/bus.rs | 399 ++++++++++------------------ canome/src/bxcan.rs | 198 ++++++++++++++ canome/src/can.rs | 576 +++++++++++++++++++++++++++++++--------- canome/src/canome.rs | 242 +++++++++++++++++ canome/src/contact.rs | 13 +- canome/src/cover.rs | 88 +++++- canome/src/lib.rs | 62 ++++- canome/src/power.rs | 13 +- canome/src/socketcan.rs | 147 ++++++++++ canome/src/spmc.rs | 418 +++++++++++++++++++++++++++++ canome/src/state.rs | 13 +- canome/src/stm32.rs | 37 +-- canome/src/time.rs | 13 +- 14 files changed, 1789 insertions(+), 456 deletions(-) create mode 100644 canome/src/bxcan.rs create mode 100644 canome/src/canome.rs create mode 100644 canome/src/socketcan.rs create mode 100644 canome/src/spmc.rs diff --git a/canome/Cargo.toml b/canome/Cargo.toml index fdd4488..5a04483 100644 --- a/canome/Cargo.toml +++ b/canome/Cargo.toml @@ -11,16 +11,24 @@ keywords = [ "CAN", "embedded", "smarthome" ] [dependencies] defmt = "0.3" -heapless = { version = "0.8.0", default-features = false } -critical-section = "1.0" +heapless = { version = "0.8", default-features = false } +critical-section = "1.1" -embedded-hal = { version = "0.2.7", optional = true } -stm32f1xx-hal = { version = "0.10.0", optional = true } -rtic-monotonics = { version = "1.0.0", optional = true } -bxcan = { version = "0.7.0", optional = true } -nb = "1.1.0" +embedded-hal = { version = "0.2", optional = true } +stm32f1xx-hal = { version = "0.10", optional = true } +rtic-monotonics = { version = "1.0", optional = true } +bxcan = { version = "0.7", optional = true } +socketcan = { version = "3.3", default-features = false, optional = true } +nb = "1.1" +once_cell = { version = "1.19.0", default-features = false, features = ["critical-section"] } + +embedded-can = "0.4" [features] -default = [ "can", "stm32" ] -can = [ "dep:bxcan" ] +default = [ "bxcan" ] +bxcan = [ "dep:bxcan" ] +socketcan = [ "dep:socketcan" ] stm32 = [ "dep:embedded-hal", "dep:stm32f1xx-hal", "dep:rtic-monotonics", "dep:bxcan" ] + +[dev-dependencies] +critical-section = { version = "1.1", features = ["std"]} diff --git a/canome/src/bus.rs b/canome/src/bus.rs index 6fcb422..b81e6ad 100644 --- a/canome/src/bus.rs +++ b/canome/src/bus.rs @@ -1,291 +1,184 @@ use core::{ - cell::RefCell, + cell::{Cell, RefCell}, fmt::Debug, - future::{poll_fn, Future}, + future::poll_fn, marker::PhantomData, task::{Poll, Waker}, }; use critical_section::Mutex; -use heapless::{LinearMap, Vec}; +use defmt::warn; +use embedded_can::Id; +use heapless::Deque; +use once_cell::sync::OnceCell; -use crate::{BusBackend, Channel, Decode, Encode}; +use crate::{CanDataFormat, Channel, Decode, Encode}; -pub struct Bus { - rx_queues: critical_section::Mutex, CHANNELS>>>, - backend: B, +pub(crate) struct ReceiverInner { + pub(crate) id: Id, + pub(crate) queue: Mutex>>, + pub(crate) waker: Mutex>>, +} +pub(crate) struct TransmitterInner { + pub(crate) id: Id, + pub(crate) remote_received: Mutex>, + pub(crate) waker: Mutex>>, +} + +pub struct UnboundReceiver<'a> { + pub(crate) bus: &'a dyn Bus, + pub(crate) inner: &'a OnceCell, +} + +pub struct BoundReceiver<'a, C: Channel> { + pub(crate) bus: &'a dyn Bus, + pub(crate) _channel: PhantomData, + pub(crate) inner: &'a ReceiverInner, +} + +pub struct UnboundTransmitter<'a> { + pub(crate) bus: &'a dyn Bus, + pub(crate) inner: &'a OnceCell, +} + +pub struct BoundTransmitter<'a, C: Channel> { + pub(crate) bus: &'a dyn Bus, + pub(crate) _channel: PhantomData, + pub(crate) inner: &'a TransmitterInner, } #[derive(Debug)] -pub enum BusError { - Backend(B::Error), +pub enum BusError { ChannelCapExceeded, } -#[derive(Debug)] -struct BusInner { - state: B::Data, - wakers: Vec, - sender: bool, +pub trait Bus: Send + Sync { + fn transmit(&self, id: Id, value: CanDataFormat) -> Result<(), BusError>; + fn request(&self, id: Id) -> Result<(), BusError>; + + fn distribute_data(&self, id: Id, value: CanDataFormat); + fn distribute_remote(&self, id: Id); } -#[derive(Clone)] -pub struct BusChannelTransceiver<'a, B: BusBackend, C: Channel> { - bus: &'a dyn BusBehaviour, - _channel: PhantomData, -} -#[derive(Clone)] -pub struct BusChannelReceiver<'a, B: BusBackend, C: Channel> { - bus: &'a dyn BusBehaviour, - _channel: PhantomData, -} - -pub trait BusBehaviour { - fn get(&self, id: B::ID) -> Option; - fn register_waker(&self, id: B::ID, waker: &Waker); - fn publish(&self, id: B::ID, value: B::Data) -> Result<(), BusError>; - fn publish_local(&self, id: &B::ID, value: B::Data); - fn request(&self, id: B::ID) -> Result<(), BusError>; -} - -impl> BusBehaviour for &T { - fn get(&self, id: ::ID) -> Option<::Data> { - (*self).get(id) - } - - fn register_waker(&self, id: ::ID, waker: &Waker) { - (*self).register_waker(id, waker) - } - - fn publish( - &self, - id: ::ID, - value: ::Data, - ) -> Result<(), BusError> { - (*self).publish(id, value) - } - - fn publish_local(&self, id: &::ID, value: ::Data) { - (*self).publish_local(id, value) - } - - fn request(&self, id: ::ID) -> Result<(), BusError> { - (*self).request(id) - } -} - -pub trait BusRx, B: BusBackend> { - fn get(&self) -> C::State; - fn request(&self) -> Result<(), BusError>; - fn next(&self) -> impl Future; -} - -pub trait BusTx, B: BusBackend> { - fn publish(&self, value: C::State) -> Result<(), BusError>; -} - -impl BusBehaviour - for Bus -where - B::ID: Eq, -{ - fn get(&self, id: B::ID) -> Option { - critical_section::with(|cs| { - self.rx_queues - .borrow(cs) - .borrow_mut() - .get_mut(&id) - .map(|x| x.state.clone()) - }) - } - - fn register_waker(&self, id: B::ID, waker: &Waker) { - critical_section::with(|cs| { - if let Some(inner) = self.rx_queues.borrow(cs).borrow_mut().get_mut(&id) { - if inner.wakers.iter().any(|ew| waker.will_wake(ew)) { - return; - } - - if inner.wakers.is_full() { - inner.wake(); - } - - if inner.wakers.push(waker.clone()).is_err() { - panic!("tried to push a waker to a zero length waker list"); - } - } - }) - } - - fn publish(&self, id: B::ID, value: B::Data) -> Result<(), BusError> { - self.publish_local(&id, value.clone()); - - self.backend - .publish(id, Some(value)) - .map_err(BusError::Backend) - } - - fn publish_local(&self, id: &B::ID, value: B::Data) { - critical_section::with(|cs| { - if let Some(inner) = self.rx_queues.borrow(cs).borrow_mut().get_mut(id) { - inner.state = value.clone(); - inner.wake(); - } +impl<'a> UnboundTransmitter<'a> { + pub fn bind(self) -> BoundTransmitter<'a, C> { + self.inner.set(TransmitterInner { + id: C::ID, + remote_received: Mutex::new(Cell::new(false)), + waker: Mutex::new(Cell::new(None)), }); - } - - fn request(&self, id: B::ID) -> Result<(), BusError> { - self.backend.publish(id, None).map_err(BusError::Backend) - } -} - -impl BusInner { - fn wake(&mut self) { - let mut wakers = Vec::new(); - core::mem::swap(&mut wakers, &mut self.wakers); - wakers.into_iter().for_each(|w| w.wake()); - } -} - -impl Bus -where - B::ID: Eq, -{ - pub const fn new(backend: B) -> Self { - Self { - rx_queues: Mutex::new(RefCell::new(LinearMap::new())), - backend, + self.bus.request(C::ID); + BoundTransmitter { + bus: self.bus, + _channel: PhantomData, + inner: &self.inner.get().unwrap(), } } - - pub fn transceiver>(&self) -> Result, BusError> - where - C::State: Default + Encode, - { - critical_section::with(|cs| { - let mut rx_queues = self.rx_queues.borrow(cs).borrow_mut(); - if let Some(existing) = rx_queues.get_mut(&C::ID) { - existing.sender = true; - Ok(BusChannelTransceiver { - bus: self, - _channel: PhantomData, - }) - } else if rx_queues.len() < SUBS { - self.request(C::ID).ok(); - let _ = rx_queues.insert( - C::ID, - BusInner { - state: C::State::default().encode(), - wakers: Vec::new(), - sender: true, - }, - ); - Ok(BusChannelTransceiver { - bus: self, - _channel: PhantomData, - }) - } else { - Err(BusError::ChannelCapExceeded) - } - }) - } - - pub fn receiver>(&self) -> Result, BusError> - where - C::State: Default + Encode, - { - critical_section::with(|cs| { - let mut rx_queues = self.rx_queues.borrow(cs).borrow_mut(); - if rx_queues.contains_key(&C::ID) { - Ok(BusChannelReceiver { - bus: self, - _channel: PhantomData, - }) - } else if rx_queues.len() < SUBS { - self.request(C::ID).ok(); - let _ = rx_queues.insert( - C::ID, - BusInner { - state: C::State::default().encode(), - wakers: Vec::new(), - sender: false, - }, - ); - Ok(BusChannelReceiver { - bus: self, - _channel: PhantomData, - }) - } else { - Err(BusError::ChannelCapExceeded) - } - }) - } - - pub fn backend(&self) -> &B { - &self.backend +} +impl<'a> UnboundReceiver<'a> { + pub fn bind(self) -> BoundReceiver<'a, C> { + self.inner.set(ReceiverInner { + id: C::ID, + queue: Mutex::new(RefCell::new(Deque::new())), + waker: Mutex::new(Cell::new(None)), + }); + self.bus.request(C::ID); + BoundReceiver { + bus: self.bus, + _channel: PhantomData, + inner: &self.inner.get().unwrap(), + } } } -impl<'a, B: BusBackend, C: Channel> BusTx for BusChannelTransceiver<'a, B, C> +impl<'a, C: Channel> BoundReceiver<'a, C> where - C::State: Encode, + C::Message: Decode + Encode, { - fn publish(&self, value: C::State) -> Result<(), BusError> { - self.bus.publish(C::ID, value.encode()) - } -} - -impl<'a, B: BusBackend, C> BusRx for BusChannelTransceiver<'a, B, C> -where - C::State: Decode + PartialEq + Default, - C: Channel, -{ - fn get(&self) -> C::State { - self.bus - .get(C::ID) - .and_then(|x| C::State::decode(x).ok()) - .expect("valid group") - } - fn request(&self) -> Result<(), BusError> { + pub fn request(&self) -> Result<(), BusError> { self.bus.request(C::ID) } - async fn next(&self) -> C::State { - let old_state: C::State = self.get(); + pub fn try_recv(&self) -> Option { + critical_section::with(|cs| { + match self + .inner + .queue + .borrow(cs) + .borrow_mut() + .pop_back() + .map(C::Message::decode) + { + Some(Ok(frame)) => Some(frame), + Some(Err(_)) => { + warn!("Failed to decode Frame"); + None + } + None => None, + } + }) + } + pub async fn recv(&self) -> C::Message { poll_fn(|cx| { - self.bus.register_waker(C::ID, cx.waker()); + let frame = critical_section::with(|cs| { + let existing_waker = self.inner.waker.borrow(cs); - let state: C::State = self.get(); - match state == old_state { - false => Poll::Ready(state), - true => Poll::Pending, + let waker = match existing_waker.take() { + Some(waker) if cx.waker().will_wake(&waker) => waker, + Some(waker) => { + waker.wake(); + cx.waker().clone() + } + None => cx.waker().clone(), + }; + + existing_waker.set(Some(waker)); + + self.inner.queue.borrow(cs).borrow_mut().pop_back() + }) + .map(C::Message::decode); + + match frame { + Some(Ok(frame)) => Poll::Ready(frame), + Some(Err(_)) => { + warn!("Failed to decode Frame"); + Poll::Pending + } + None => Poll::Pending, } }) .await } } -impl<'a, B: BusBackend, C> BusRx for BusChannelReceiver<'a, B, C> -where - C::State: Decode + PartialEq + Default, - C: Channel, -{ - fn get(&self) -> C::State { - self.bus - .get(C::ID) - .and_then(|x| C::State::decode(x).ok()) - .expect("valid group") - } - fn request(&self) -> Result<(), BusError> { - self.bus.request(C::ID) - } - async fn next(&self) -> C::State { - let old_state: C::State = self.get(); - poll_fn(|cx| { - self.bus.register_waker(C::ID, cx.waker()); - let state: C::State = self.get(); - match state == old_state { - false => Poll::Ready(state), - true => Poll::Pending, +impl<'a, C: Channel> BoundTransmitter<'a, C> +where + C::Message: Decode + Encode, +{ + pub fn transmit(&self, value: C::Message) -> Result<(), BusError> { + let encoded = value.encode(); + self.bus.transmit(C::ID, encoded) + } + pub async fn wait_remote(&self) { + poll_fn(|cx| { + let remote_received = critical_section::with(|cs| { + let existing_waker = self.inner.waker.borrow(cs); + + let waker = match existing_waker.take() { + Some(waker) if cx.waker().will_wake(&waker) => waker, + Some(waker) => { + waker.wake(); + cx.waker().clone() + } + None => cx.waker().clone(), + }; + existing_waker.set(Some(waker)); + + self.inner.remote_received.borrow(cs).replace(false) + }); + + match remote_received { + true => Poll::Ready(()), + false => Poll::Pending, } }) .await diff --git a/canome/src/bxcan.rs b/canome/src/bxcan.rs new file mode 100644 index 0000000..2a6afeb --- /dev/null +++ b/canome/src/bxcan.rs @@ -0,0 +1,198 @@ +use core::{array, fmt::Debug, mem::MaybeUninit}; + +use crate::{ + bus::{Bus, BusError, ReceiverInner, TransmitterInner, UnboundReceiver, UnboundTransmitter}, + CanDataFormat, +}; +use bxcan::{Instance, Interrupt, Rx0, Tx}; +use embedded_can::{ExtendedId, Id, StandardId}; +use heapless::mpmc::MpMcQueue; +use once_cell::sync::OnceCell; + +pub struct BxCanBus { + transmitters: [OnceCell; TQ], + receivers: [OnceCell; RQ], + tx_queue: MpMcQueue, + tx: Tx, + rx: Rx0, +} + +#[derive(Debug)] +pub enum BxCanBusError { + /// An object couldn't be added to the TX queue of [`BxCanBus`] because it is full. + TxQueueFull, +} + +impl BxCanBus { + pub fn initialize( + bus: &mut MaybeUninit, + ) -> ( + &Self, + [UnboundTransmitter<'_>; TQ], + [UnboundReceiver<'_>; RQ], + ) { + let bus = bus.write(Self { + receivers: array::from_fn(|_| OnceCell::new()), + transmitters: array::from_fn(|_| OnceCell::new()), + tx_queue: MpMcQueue::new(), + tx: todo!(), + rx: todo!(), + }); + ( + bus, + array::from_fn(|i| UnboundTransmitter { + bus, + inner: &bus.transmitters[i], + }), + array::from_fn(|i| UnboundReceiver { + bus, + inner: &bus.receivers[i], + }), + ) + } +} + +impl Bus for BxCanBus { + fn transmit(&self, id: Id, value: CanDataFormat) -> Result<(), BusError> { + self.tx_queue.enqueue(encode_frame(id, Some(value))); + self.distribute_data(id, value); + //TODO: trigger CAN_TX interrupt + self.handle_tx_interrupt(); + Ok(()) + } + + fn request(&self, id: Id) -> Result<(), BusError> { + self.tx_queue.enqueue(encode_frame(id, None)); + self.distribute_remote(id); + //TODO: trigger CAN_TX interrupt + self.handle_tx_interrupt(); + Ok(()) + } + + fn distribute_data(&self, id: Id, value: CanDataFormat) { + for inner in self + .receivers + .iter() + .filter_map(|x| x.get()) + .filter(|x| x.id == id) + { + critical_section::with(|cs| { + inner.queue.borrow(cs).borrow_mut().push_front(value); + if let Some(waker) = inner.waker.borrow(cs).take() { + waker.wake() + } + }); + } + } + + fn distribute_remote(&self, id: Id) { + for inner in self + .transmitters + .iter() + .filter_map(|x| x.get()) + .filter(|x| x.id == id) + { + critical_section::with(|cs| { + inner.remote_received.borrow(cs).set(true); + if let Some(waker) = inner.waker.borrow(cs).take() { + waker.wake(); + } + }); + } + } +} + +impl BxCanBus { + pub fn handle_tx_interrupt(&self) -> Result<(), BxCanBusError> { + let tx = &self.tx_queue; + while let Some(frame) = tx.dequeue() { + match self.tx.transmit(&frame) { + Ok(status) => match status.dequeued_frame() { + None => { + defmt::debug!("Transmitting CAN frame to {}.", frame.id()); + } + Some(old_frame) => { + defmt::debug!( + "Transmitting CAN frame to {} by displacing other to {}, requeing it.", + frame.id(), + old_frame.id() + ); + tx.enqueue(old_frame.clone()) + .map_err(|_| BxCanBusError::TxQueueFull)?; + } + }, + Err(nb::Error::WouldBlock) => { + defmt::debug!( + "Couldn't trasmit CAN frame to {}, requeing frame.", + frame.id() + ); + tx.enqueue(frame).map_err(|_| BxCanBusError::TxQueueFull)?; + } + Err(_) => unreachable!(), + } + } + Ok(()) + } + pub fn handle_rx_interrupt( + &self, + can_rx: &mut Rx0, + ) -> Result<(), BxCanBusError> { + loop { + match can_rx.receive() { + Ok(frame) => { + let (channel_id, data) = decode_frame(&frame); + if let Some(data) = data { + self.distribute_data(channel_id, data); + } else { + self.distribute_remote(channel_id); + } + + /* + if let Some(data) = data { + bus.publish_local(&channel_id, data); + } else if let Some(state) = bus.get(channel_id) { + defmt::debug!("responding to remote frame"); + self.tx_queue + .enqueue(encode_frame(channel_id, Some(state))) + .map_err(|_| BxCanBusError::TxQueueFull)?; + + //TODO on publish + } + */ + } + Err(nb::Error::WouldBlock) => break, + Err(nb::Error::Other(_)) => break, + } + } + + Ok(()) + } +} + +fn decode_frame(frame: &bxcan::Frame) -> (Id, Option) { + let id = match frame.id() { + bxcan::Id::Standard(x) => Id::Standard(unsafe { StandardId::new_unchecked(x.as_raw()) }), + bxcan::Id::Extended(x) => Id::Extended(unsafe { ExtendedId::new_unchecked(x.as_raw()) }), + }; + if let Some(data) = frame.data() { + (id, Some(CanDataFormat::new(data))) + } else { + (id, None) + } +} + +fn encode_frame(id: Id, data: Option) -> bxcan::Frame { + let id = match id { + Id::Standard(x) => { + bxcan::Id::Standard(unsafe { bxcan::StandardId::new_unchecked(x.as_raw()) }) + } + Id::Extended(x) => { + bxcan::Id::Extended(unsafe { bxcan::ExtendedId::new_unchecked(x.as_raw()) }) + } + }; + if let Some(data) = data { + bxcan::Frame::new_data(id, bxcan::Data::new(data.data()).expect("valid can data")) + } else { + bxcan::Frame::new_remote(id, 8) + } +} diff --git a/canome/src/can.rs b/canome/src/can.rs index ca79d61..319f1d8 100644 --- a/canome/src/can.rs +++ b/canome/src/can.rs @@ -1,158 +1,474 @@ -use core::{fmt::Debug, marker::ConstParamTy}; - -use crate::{bus::BusBehaviour, BusBackend, BusDataFormat}; -use bxcan::{ExtendedId, Id, Instance, Rx0, StandardId, Tx}; +use core::borrow::{Borrow, BorrowMut}; +use core::fmt::Debug; +use core::mem::MaybeUninit; +use core::ops::{Deref, DerefMut}; use defmt::Format; -use heapless::mpmc::MpMcQueue; +use heapless::Vec; -pub struct BxCanBus { - tx_queue: MpMcQueue<(::ID, Option<::Data>), TX_CAP>, - on_publish: &'static (dyn Fn(&Self) + Send + Sync), +use crate::spmc::{Channel, ChannelClient, Receive, Receiver, ReceiverRef}; + +#[derive(Clone, Debug)] +pub struct CanData(pub Vec); +#[derive(Clone, Copy, PartialEq, Eq, Format, Debug)] +pub enum CanId { + Standard(CanStandardId), + Extended(CanExtendedId), +} +#[derive(Clone, Copy, PartialEq, Eq, Debug, Format)] +pub struct CanStandardId(u16); +#[derive(Clone, Copy, PartialEq, Eq, Debug, Format)] +pub struct CanExtendedId(u32); + +#[derive(Clone, Debug, Format)] +pub enum CanFrame { + Data(CanDataFrame), + Remote(CanRemoteFrame), } -impl Debug for BxCanBus { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("BxCanBus").finish() +#[derive(Clone, Debug, Format)] +pub struct CanDataFrame(pub CanId, pub CanData); +#[derive(Clone, Debug, Format)] +pub struct CanRemoteFrame(pub CanId); + +impl Format for CanData { + fn format(&self, fmt: defmt::Formatter) { + defmt::write!(fmt, "CanData({})", self.0.as_slice()) } } -#[derive(PartialEq, Eq, ConstParamTy, Debug, Clone, Copy)] -pub enum BxCanId { - Standard(u16), - Extended(u32), +impl CanStandardId { + pub const fn new(value: u16) -> Self { + assert!(value < (1 << 11)); + Self(value) + } } - -#[derive(Debug)] -pub enum BxCanBusError { - /// An object couldn't be added to the TX queue of [`BxCanBus`] because it is full. - TxQueueFull, -} - -impl BusBackend for BxCanBus { - type ID = BxCanId; - - type Data = CanDataFormat; - - type Error = BxCanBusError; - - fn publish(&self, id: Self::ID, data: Option) -> Result<(), Self::Error> { - self.tx_queue - .enqueue((id, data)) - .map_err(|_| Self::Error::TxQueueFull)?; - - (self.on_publish)(self); - - Ok(()) +impl CanExtendedId { + pub const fn new(value: u32) -> Self { + assert!(value < (1 << 29)); + Self(value) } } -impl BxCanBus { - pub const fn new_with_hook(on_publish: &'static (dyn Fn(&Self) + Send + Sync)) -> Self { - Self { - tx_queue: MpMcQueue::new(), - on_publish, +impl CanDataFrame { + pub fn id(&self) -> CanId { + self.0 + } +} + +impl CanRemoteFrame { + pub fn id(&self) -> CanId { + self.0 + } +} + +impl CanFrame { + pub fn id(&self) -> CanId { + match *self { + Self::Data(CanDataFrame(id, _)) => id, + Self::Remote(CanRemoteFrame(id)) => id, } } +} - pub fn handle_tx_interrupt( - &self, - can_tx: &mut Tx, - ) -> Result<(), BxCanBusError> { - let rx = &self.tx_queue; +impl CanData { + pub fn new(data: &[u8]) -> Self { + Self(Vec::from_slice(data).unwrap()) + } + pub fn data(&self) -> &[u8] { + self.0.as_slice() + } +} - while let Some((channel_id, data)) = rx.dequeue() { - let frame = encode_frame(channel_id, data); - match can_tx.transmit(&frame) { - Ok(status) => match status.dequeued_frame().map(decode_frame) { - None => {} - Some((channel_id, data)) => rx - .enqueue((channel_id, data)) - .map_err(|_| BxCanBusError::TxQueueFull)?, - }, - Err(nb::Error::WouldBlock) => { - rx.enqueue((channel_id, data)) - .map_err(|_| BxCanBusError::TxQueueFull)?; - break; - } - Err(_) => unreachable!(), +pub trait CanDriver { + type Error; + + fn handle<'driver: 'client, 'client, I: CanClientInner<'client, 'driver>>( + &'driver self, + unbound: &'client mut UnboundCanHandle<'driver, I>, + builder: I::InitData, + ) -> CanHandleRef<'client, 'driver, I>; + + fn handle_tx_interrupt(&self) -> Result<(), Self::Error>; + fn handle_rx_interrupt(&self) -> Result<(), Self::Error>; +} + +trait CanDriverInternal { + fn send(&self, value: CanFrame); + fn channel(&self) -> &Channel; +} + +pub struct CanHandleRef<'client, 'driver, I: CanClientInner<'client, 'driver>>( + ReceiverRef<'driver, 'client, CanFrame, CanClient<'driver, I>>, +); + +pub struct UnboundCanHandle<'driver, I>(MaybeUninit>); + +impl<'client, 'driver, I: CanClientInner<'client, 'driver>> Default + for UnboundCanHandle<'driver, I> +{ + fn default() -> Self { + Self(MaybeUninit::uninit()) + } +} + +pub struct CanHandle<'client, 'driver, I>(&'client CanClient<'driver, I>); + +struct CanClient<'driver, I> { + driver: &'driver dyn CanDriverInternal, + inner: I, + client: Option>, +} + +impl<'client, 'driver, I: CanClientInner<'client, 'driver>> Receive + for CanClient<'driver, I> +{ + fn receive(&self, value: &CanFrame) { + self.inner.handle_frame(value.clone()) + } +} + +impl<'client, 'driver, I: CanClientInner<'client, 'driver>> Receiver<'driver, CanFrame> + for CanClient<'driver, I> +{ + fn channel(&self) -> &crate::spmc::Channel { + self.driver.channel() + } + fn channel_client(&self) -> &ChannelClient { + self.client.as_ref().unwrap() + } +} + +pub trait CanClientInner<'client, 'driver> { + type InitData; + fn new(init_data: Self::InitData, handle: CanHandle<'client, 'driver, Self>) -> Self + where + Self: Sized; + fn handle_frame(&self, frame: CanFrame); +} + +impl<'client, 'driver, I: CanClientInner<'client, 'driver>> Borrow + for CanHandleRef<'client, 'driver, I> +{ + fn borrow(&self) -> &I { + &self.0.as_ref().inner + } +} +impl<'client, 'driver, I: CanClientInner<'client, 'driver>> BorrowMut + for CanHandleRef<'client, 'driver, I> +{ + fn borrow_mut(&mut self) -> &mut I { + let x: &mut CanClient<'driver, I> = self.0.borrow_mut(); + &mut x.inner + } +} + +impl<'client, 'driver, I: CanClientInner<'client, 'driver>> Deref + for CanHandleRef<'client, 'driver, I> +{ + type Target = I; + fn deref(&self) -> &Self::Target { + &self.0.as_ref().inner + } +} + +impl<'client, 'driver, I: CanClientInner<'client, 'driver>> DerefMut + for CanHandleRef<'client, 'driver, I> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + let x: &mut CanClient<'driver, I> = self.0.borrow_mut(); + &mut x.inner + } +} + +impl<'client, 'driver, I: CanClientInner<'client, 'driver>> CanHandle<'client, 'driver, I> { + pub fn send(&self, frame: CanFrame) { + self.0.driver.send(frame) + } +} + +pub mod basic_client { + use super::{CanClientInner, CanFrame, CanHandle}; + use core::cell::RefCell; + use core::future::{poll_fn, Future}; + use core::task::{Poll, Waker}; + use critical_section::Mutex; + use heapless::Deque; + + pub enum Error { + Overflowed, + } + + pub struct BasicCanClient<'client, 'driver: 'client, const N: usize> { + handle: CanHandle<'client, 'driver, Self>, + inner: Mutex>>, + } + + struct BasicCanClientInner { + queue: Deque, + overflowed: bool, + waker: Option, + } + + impl<'client, 'driver: 'client, const N: usize> CanClientInner<'client, 'driver> + for BasicCanClient<'client, 'driver, N> + { + type InitData = (); + + fn new(_init_data: Self::InitData, handle: CanHandle<'client, 'driver, Self>) -> Self + where + Self: Sized, + { + Self { + handle, + inner: Mutex::new(RefCell::new(BasicCanClientInner { + queue: Deque::new(), + overflowed: false, + waker: None, + })), } } - Ok(()) - } - pub fn handle_rx_interrupt( - &self, - can_rx: &mut Rx0, - bus: impl BusBehaviour, - ) -> Result<(), BxCanBusError> { - loop { - match can_rx.receive() { - Ok(frame) => { - let (channel_id, data) = decode_frame(&frame); - if let Some(data) = data { - bus.publish_local(&channel_id, data); - } else if let Some(state) = bus.get(channel_id) { - self.tx_queue - .enqueue((channel_id, Some(state))) - .map_err(|_| BxCanBusError::TxQueueFull)?; - (self.on_publish)(self); + fn handle_frame(&self, frame: CanFrame) { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + if inner.queue.is_full() { + inner.queue.pop_back(); + inner.overflowed = true; + } + let Ok(_) = inner.queue.push_front(frame) else { + unreachable!("queue always has at least one free slot") + }; + if let Some(waker) = inner.waker.take() { + waker.wake(); + } + }) + } + } + impl<'client, 'driver: 'client, const N: usize> BasicCanClient<'client, 'driver, N> { + pub fn recv(&mut self) -> impl Future> + '_ { + poll_fn(|cx| { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + + let waker = cx.waker(); + if !inner + .waker + .as_ref() + .map(|x| x.will_wake(waker)) + .unwrap_or_default() + { + let mut waker = Some(waker.clone()); + core::mem::swap(&mut waker, &mut inner.waker); + if let Some(waker) = waker { + waker.wake(); + } + } + + if inner.overflowed { + inner.overflowed = false; + Poll::Ready(Err(Error::Overflowed)) + } else { + match inner.queue.pop_back() { + Some(x) => Poll::Ready(Ok(x)), + None => Poll::Pending, + } + } + }) + }) + } + pub fn try_recv(&mut self) -> Result, Error> { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + + if inner.overflowed { + inner.overflowed = false; + Err(Error::Overflowed) + } else { + Ok(inner.queue.pop_back()) + } + }) + } + pub fn send(&mut self, frame: CanFrame) { + self.handle.send(frame) + } + } +} + +pub mod bxcan { + use bxcan::{Can, Instance}; + use critical_section::Mutex; + use heapless::{Deque, Vec}; + + use crate::{ + can::CanDriver, + spmc::{Channel, ChannelClient}, + }; + use core::cell::RefCell; + + use super::{ + CanClient, CanData, CanDataFrame, CanDriverInternal, CanExtendedId, CanFrame, CanHandle, + CanHandleRef, CanId, CanRemoteFrame, CanStandardId, + }; + pub enum Error { + TxQueueFull, + } + pub struct BxCanDriver { + clients: Channel, + inner: Mutex>>, + } + struct BxCanDriverInner { + tx_queue: Deque, + can: Can, + } + + impl BxCanDriver { + pub fn new(can: Can) -> Self { + Self { + clients: Channel::default(), + inner: Mutex::new(RefCell::new(BxCanDriverInner { + can, + tx_queue: Deque::new(), + })), + } + } + } + + impl CanDriverInternal for BxCanDriver { + fn channel(&self) -> &Channel { + &self.clients + } + fn send(&self, value: CanFrame) { + if critical_section::with(|cs| { + //TODO: error handling + let mut inner = self.inner.borrow_ref_mut(cs); + let was_empty = inner.tx_queue.is_empty(); + self.clients.distribute(&value); + inner.tx_queue.push_front(value); + was_empty + }) { + self.handle_tx_interrupt(); + } + } + } + + impl CanDriver for BxCanDriver { + type Error = Error; + + fn handle<'driver: 'client, 'client, I: super::CanClientInner<'client, 'driver>>( + &'driver self, + unbound: &'client mut super::UnboundCanHandle<'driver, I>, + builder: I::InitData, + ) -> super::CanHandleRef<'client, 'driver, I> { + let handle = unbound.0.write(CanClient { + driver: self, + inner: I::new(builder, CanHandle(unsafe { &*(unbound.0.as_ptr()) })), + client: None, + }); + + handle.client = Some(ChannelClient::new(&*handle)); + + CanHandleRef(self.clients.register(handle)) + } + + fn handle_tx_interrupt(&self) -> Result<(), Error> { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + inner.can.clear_tx_interrupt(); + while let Some(frame) = inner.tx_queue.pop_back() { + match inner.can.transmit(&frame.clone().into()) { + Ok(status) => match status.dequeued_frame() { + None => { + defmt::debug!("Transmitting CAN frame to {}.", frame.id()); + } + Some(old_frame) => { + defmt::debug!( + "Transmitting CAN frame to {} by displacing other to {}, requeing it.", + frame.id(), + CanId::from(old_frame.id()) + ); + inner + .tx_queue + .push_front(old_frame.clone().into()) + .map_err(|_| Error::TxQueueFull)?; + } + }, + Err(nb::Error::WouldBlock) => { + defmt::debug!( + "Couldn't trasmit CAN frame to {}, requeing frame.", + frame.id() + ); + inner + .tx_queue + .push_front(frame) + .map_err(|_| Error::TxQueueFull)?; + } + Err(_) => unreachable!(), } } - Err(nb::Error::WouldBlock) => break, - Err(nb::Error::Other(_)) => break, + Ok(()) + }) + } + fn handle_rx_interrupt(&self) -> Result<(), Error> { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + loop { + match inner.can.receive() { + Ok(frame) => { + let frame = frame.into(); + self.clients.distribute(&frame) + } + Err(nb::Error::WouldBlock) => break, + Err(nb::Error::Other(_)) => break, + } + } + + Ok(()) + }) + } + } + + impl From for bxcan::Frame { + fn from(value: CanFrame) -> bxcan::Frame { + match value { + CanFrame::Data(CanDataFrame(id, data)) => { + bxcan::Frame::new_data(id, bxcan::Data::new(&data.0).unwrap()) + } + CanFrame::Remote(CanRemoteFrame(id)) => bxcan::Frame::new_remote(id, 8), } } - - Ok(()) } -} - -#[derive(Clone, Copy, Format, Debug)] -pub struct CanDataFormat { - length: u8, - data: [u8; 8], -} - -impl CanDataFormat { - pub fn new(data: &[u8]) -> Self { - assert!(data.len() <= 8); - - let mut padded = [0_u8; 8]; - padded[0..data.len()].copy_from_slice(data); - - Self { - length: data.len() as u8, - data: padded, + impl From for CanFrame { + fn from(value: bxcan::Frame) -> CanFrame { + if let Some(data) = value.data() { + CanFrame::Data(CanDataFrame( + value.id().into(), + CanData(Vec::from_slice(data).unwrap()), + )) + } else { + CanFrame::Remote(CanRemoteFrame(value.id().into())) + } } } - pub fn data(&self) -> &[u8] { - &self.data[..self.length as usize] - } -} - -impl BusDataFormat for CanDataFormat {} - -fn decode_frame(frame: &bxcan::Frame) -> (BxCanId, Option) { - let id = match frame.id() { - Id::Standard(id) => BxCanId::Standard(id.as_raw()), - Id::Extended(id) => BxCanId::Extended(id.as_raw()), - }; - if let Some(data) = frame.data() { - (id, Some(CanDataFormat::new(data))) - } else { - (id, None) - } -} - -fn encode_frame(id: BxCanId, data: Option) -> bxcan::Frame { - let id = match id { - BxCanId::Standard(id) => Id::Standard(StandardId::new(id).unwrap()), - BxCanId::Extended(id) => Id::Extended(ExtendedId::new(id).unwrap()), - }; - if let Some(data) = data { - bxcan::Frame::new_data(id, bxcan::Data::new(data.data()).expect("valid can data")) - } else { - bxcan::Frame::new_remote(id, 8) + impl From for bxcan::Id { + fn from(value: CanId) -> bxcan::Id { + match value { + CanId::Standard(CanStandardId(id)) => { + bxcan::Id::Standard(unsafe { bxcan::StandardId::new_unchecked(id) }) + } + CanId::Extended(CanExtendedId(id)) => { + bxcan::Id::Extended(unsafe { bxcan::ExtendedId::new_unchecked(id) }) + } + } + } + } + + impl From for CanId { + fn from(value: bxcan::Id) -> CanId { + match value { + bxcan::Id::Standard(id) => CanId::Standard(CanStandardId(id.as_raw())), + bxcan::Id::Extended(id) => CanId::Extended(CanExtendedId(id.as_raw())), + } + } } } diff --git a/canome/src/canome.rs b/canome/src/canome.rs new file mode 100644 index 0000000..5f4c44d --- /dev/null +++ b/canome/src/canome.rs @@ -0,0 +1,242 @@ +use critical_section::Mutex; +use defmt::info; +use heapless::Deque; + +use crate::{ + can::{CanClientInner, CanData, CanFrame, CanHandle, CanId, CanRemoteFrame}, + Decode, Encode, +}; +use core::cell::RefCell; +use core::future::{poll_fn, Future}; +use core::task::{Poll, Waker}; + +pub enum Error { + Overflowed, +} + +pub trait CanomeChannel { + const ID: CanId; + type Data: Encode + Decode; +} + +pub struct CanomeEventClient<'client, 'driver: 'client, C: CanomeChannel, const N: usize> { + handle: CanHandle<'client, 'driver, Self>, + inner: Mutex>>, +} + +struct CanomeEventClientInner { + queue: Deque, + overflowed: bool, + waker: Option, +} + +impl<'client, 'driver: 'client, C: CanomeChannel, const N: usize> CanClientInner<'client, 'driver> + for CanomeEventClient<'client, 'driver, C, N> +{ + type InitData = (); + + fn new(_init_data: Self::InitData, handle: CanHandle<'client, 'driver, Self>) -> Self + where + Self: Sized, + { + Self { + handle, + inner: Mutex::new(RefCell::new(CanomeEventClientInner { + queue: Deque::new(), + overflowed: false, + waker: None, + })), + } + } + + fn handle_frame(&self, frame: CanFrame) { + if let CanFrame::Data(frame) = frame { + if frame.id() == C::ID { + if let Ok(data) = C::Data::decode(frame.1) { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + if inner.queue.is_full() { + inner.queue.pop_back(); + inner.overflowed = true; + } + inner.queue.push_front(data); + + if let Some(waker) = inner.waker.take() { + waker.wake(); + } + }) + } else { + todo!() + } + } + } + } +} + +impl<'client, 'driver: 'client, C: CanomeChannel, const N: usize> + CanomeEventClient<'client, 'driver, C, N> +{ + pub fn recv(&mut self) -> impl Future> + '_ { + poll_fn(|cx| { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + + let waker = cx.waker(); + if !inner + .waker + .as_ref() + .map(|x| x.will_wake(waker)) + .unwrap_or_default() + { + let mut waker = Some(waker.clone()); + core::mem::swap(&mut waker, &mut inner.waker); + if let Some(waker) = waker { + waker.wake(); + } + } + + if inner.overflowed { + inner.overflowed = false; + Poll::Ready(Err(Error::Overflowed)) + } else { + match inner.queue.pop_back() { + Some(x) => Poll::Ready(Ok(x)), + None => Poll::Pending, + } + } + }) + }) + } + pub fn try_recv(&mut self) -> Result, Error> { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + + if inner.overflowed { + inner.overflowed = false; + Err(Error::Overflowed) + } else { + Ok(inner.queue.pop_back()) + } + }) + } + pub fn send(&mut self, data: C::Data) { + self.handle.send(CanFrame::Data(crate::can::CanDataFrame( + C::ID, + data.encode(), + ))) + } +} + +pub struct CanomeStateClient<'client, 'driver: 'client, C: CanomeChannel> { + handle: CanHandle<'client, 'driver, Self>, + inner: Mutex>>, +} + +struct CanomeStateClientInner { + last: C::Data, + counter: u64, + waker: Option, +} + +impl<'client, 'driver: 'client, C: CanomeChannel> CanClientInner<'client, 'driver> + for CanomeStateClient<'client, 'driver, C> +where + C::Data: Default, +{ + type InitData = (); + + fn new(_init_data: Self::InitData, handle: CanHandle<'client, 'driver, Self>) -> Self + where + Self: Sized, + { + Self { + handle, + inner: Mutex::new(RefCell::new(CanomeStateClientInner { + last: C::Data::default(), + counter: 0, + waker: None, + })), + } + } + + fn handle_frame(&self, frame: CanFrame) { + if let CanFrame::Data(frame) = frame { + if frame.id() == C::ID { + if let Ok(data) = C::Data::decode(frame.1) { + critical_section::with(|cs| { + let mut inner = self.inner.borrow_ref_mut(cs); + + inner.last = data; + inner.counter += 1; + + if let Some(waker) = inner.waker.take() { + waker.wake(); + } + }) + } else { + todo!() + } + } + } + } +} + +impl<'client, 'driver: 'client, C: CanomeChannel> CanomeStateClient<'client, 'driver, C> +where + C::Data: Clone + Default, +{ + pub fn next(&mut self) -> impl Future + '_ { + let this: &'_ Self = self; + let counter_start: u64 = critical_section::with(|cs| self.inner.borrow_ref(cs).counter); + poll_fn(move |cx| { + critical_section::with(|cs| { + let mut inner = this.inner.borrow_ref_mut(cs); + + let waker = cx.waker(); + if !inner + .waker + .as_ref() + .map(|x| x.will_wake(waker)) + .unwrap_or_default() + { + let mut waker = Some(waker.clone()); + core::mem::swap(&mut waker, &mut inner.waker); + if let Some(waker) = waker { + waker.wake(); + } + } + + if inner.counter > counter_start { + Poll::Ready(inner.last.clone()) + } else { + Poll::Pending + } + }) + }) + } + pub fn get(&mut self) -> C::Data { + critical_section::with(|cs| { + let inner = self.inner.borrow_ref(cs); + inner.last.clone() + }) + } + pub fn counter(&self) -> u64 { + critical_section::with(|cs| { + let inner = self.inner.borrow_ref(cs); + inner.counter + }) + } + pub fn send(&mut self, data: C::Data) { + self.handle.send(CanFrame::Data(crate::can::CanDataFrame( + C::ID, + data.encode(), + ))) + } + pub fn request(&mut self) { + self.handle.send(CanFrame::Remote(CanRemoteFrame(C::ID))); + } + pub fn request_and_wait(&mut self) -> impl Future + '_ { + self.handle.send(CanFrame::Remote(CanRemoteFrame(C::ID))); + self.next() + } +} diff --git a/canome/src/contact.rs b/canome/src/contact.rs index 0a7d8e1..b1437de 100644 --- a/canome/src/contact.rs +++ b/canome/src/contact.rs @@ -16,24 +16,23 @@ impl ContactState { } } -#[cfg(feature = "can")] mod can { - use crate::{can::CanDataFormat, Decode, Encode}; + use crate::{can::CanData, Decode, Encode}; use super::ContactState; - impl Encode for ContactState { - fn encode(self) -> CanDataFormat { + impl Encode for ContactState { + fn encode(self) -> CanData { let data = match self { Self::Open => 0, Self::Closed => 1, }; - CanDataFormat::new(&[data]) + CanData::new(&[data]) } } - impl Decode for ContactState { - fn decode(value: CanDataFormat) -> Result + impl Decode for ContactState { + fn decode(value: CanData) -> Result where Self: Sized, { diff --git a/canome/src/cover.rs b/canome/src/cover.rs index a3fc784..f3b8e11 100644 --- a/canome/src/cover.rs +++ b/canome/src/cover.rs @@ -15,22 +15,40 @@ impl CoverState { } } -#[cfg(feature = "can")] +#[derive(Default, Debug, Format, Clone, Copy, PartialEq, Eq)] +pub struct CoverControl { + pub sequence_number: u8, + pub command: CoverCommand, +} + +#[derive(Default, Debug, Format, Clone, Copy, PartialEq, Eq)] +pub enum CoverCommand { + #[default] + None, + Stop, + Open, + Close, + TiltMin, + TiltMax, + Position(u16), + Tilt(u16), +} + mod can { - use crate::{can::CanDataFormat, Decode, Encode}; + use crate::{can::CanData, Decode, Encode}; - use super::CoverState; + use super::{CoverCommand, CoverControl, CoverState}; - impl Encode for CoverState { - fn encode(self) -> CanDataFormat { + impl Encode for CoverState { + fn encode(self) -> CanData { let hue = self.tilt.to_be_bytes(); let saturation = self.position.to_be_bytes(); - CanDataFormat::new(&[hue[0], hue[1], saturation[0], saturation[1]]) + CanData::new(&[hue[0], hue[1], saturation[0], saturation[1]]) } } - impl Decode for CoverState { - fn decode(value: CanDataFormat) -> Result + impl Decode for CoverState { + fn decode(value: CanData) -> Result where Self: Sized, { @@ -48,4 +66,58 @@ mod can { }) } } + + impl Encode for CoverControl { + fn encode(self) -> CanData { + let seq = self.sequence_number; + let (kind, data): (u8, &[u8]) = match self.command { + CoverCommand::None => (0, &[]), + CoverCommand::Stop => (1, &[]), + CoverCommand::Open => (2, &[]), + CoverCommand::Close => (3, &[]), + CoverCommand::TiltMin => (4, &[]), + CoverCommand::TiltMax => (5, &[]), + CoverCommand::Position(val) => (6, &val.to_be_bytes()), + CoverCommand::Tilt(val) => (7, &val.to_be_bytes()), + }; + let mut payload = [seq, kind, 0, 0, 0, 0, 0, 0]; + payload[2..].copy_from_slice(&data); + + CanData::new(&payload[0..(2 + data.len())]) + } + } + + impl Decode for CoverControl { + fn decode(value: CanData) -> Result + where + Self: Sized, + { + use CoverCommand as C; + let data = value.data(); + if data.len() < 2 { + return Err(value); + } + + let sequence_number = data[0]; + let command = data[1]; + let data = &data[2..]; + + let command = match command { + 0 => C::None, + 1 => C::Stop, + 2 => C::Open, + 3 => C::Close, + 4 => C::TiltMin, + 5 => C::TiltMax, + 6 if data.len() == 2 => C::Position(u16::from_be_bytes([data[0], data[1]])), + 7 if data.len() == 2 => C::Tilt(u16::from_be_bytes([data[0], data[1]])), + _ => return Err(value), + }; + + Ok(Self { + sequence_number, + command, + }) + } + } } diff --git a/canome/src/lib.rs b/canome/src/lib.rs index 5ee2431..77972a7 100644 --- a/canome/src/lib.rs +++ b/canome/src/lib.rs @@ -1,37 +1,73 @@ #![no_std] #![feature(adt_const_params)] +#![feature(associated_const_equality)] use core::fmt::Debug; -pub mod bus; -#[cfg(feature = "can")] +use defmt::Format; + +//pub mod bus; +//#[cfg(feature = "bxcan")] +//pub mod bxcan; pub mod can; +pub mod canome; pub mod contact; pub mod cover; pub mod light; pub mod power; +#[cfg(feature = "socketcan")] +pub mod socketcan; +pub mod spmc; pub mod state; #[cfg(feature = "stm32")] pub mod stm32; pub mod time; -pub trait Channel { - const ID: B::ID; - type State: Encode + Decode; -} +pub use embedded_can::{ExtendedId, Id, StandardId}; -pub trait Encode { +/* +pub trait Channel { + const ID: embedded_can::Id; + type Message: Encode + Decode; +} +*/ + +pub trait Encode { fn encode(self) -> F; } -pub trait Decode { +pub trait Decode { fn decode(value: F) -> Result where Self: Sized; } -pub trait BusDataFormat: Debug + Clone {} -pub trait BusBackend { - type ID: Debug + Clone; - type Data: BusDataFormat; +/* +pub trait BusBackend: Send + Sync { type Error; - fn publish(&self, id: Self::ID, data: Option) -> Result<(), Self::Error>; + fn publish(&self, id: embedded_can::Id, data: Option) + -> Result<(), Self::Error>; } + +#[derive(Clone, Copy, Format, Debug)] +pub struct CanDataFormat { + length: usize, + data: [u8; 8], +} + +impl CanDataFormat { + pub fn new(data: &[u8]) -> Self { + assert!(data.len() <= 8); + + let mut padded = [0_u8; 8]; + padded[0..data.len()].copy_from_slice(data); + + Self { + length: data.len(), + data: padded, + } + } + + pub fn data(&self) -> &[u8] { + &self.data[..self.length] + } +} +*/ diff --git a/canome/src/power.rs b/canome/src/power.rs index 554824b..f665ae3 100644 --- a/canome/src/power.rs +++ b/canome/src/power.rs @@ -5,25 +5,24 @@ pub struct PowermeterState { pub total: u32, pub current: u32, } -#[cfg(feature = "can")] mod can { - use crate::{can::CanDataFormat, Decode, Encode}; + use crate::{can::CanData, Decode, Encode}; use super::PowermeterState; - impl Encode for PowermeterState { - fn encode(self) -> CanDataFormat { + impl Encode for PowermeterState { + fn encode(self) -> CanData { let total = self.total.to_be_bytes(); let current = self.current.to_be_bytes(); - CanDataFormat::new(&[ + CanData::new(&[ total[0], total[1], total[2], total[3], current[0], current[1], current[2], current[3], ]) } } - impl Decode for PowermeterState { - fn decode(value: CanDataFormat) -> Result + impl Decode for PowermeterState { + fn decode(value: CanData) -> Result where Self: Sized, { diff --git a/canome/src/socketcan.rs b/canome/src/socketcan.rs new file mode 100644 index 0000000..bc69c75 --- /dev/null +++ b/canome/src/socketcan.rs @@ -0,0 +1,147 @@ +use core::fmt::Debug; + +use crate::{BusBackend, CanDataFormat}; +use defmt::Format; +use embedded_can::{ExtendedId, Frame, Id, StandardId}; +use heapless::mpmc::MpMcQueue; +use socketcan::{CanDataFrame, CanFrame, CanRemoteFrame}; + +pub struct SocketCanBus { + pub tx_queue: MpMcQueue, + on_publish: &'static (dyn Fn(&Self) + Send + Sync), +} + +impl Debug for SocketCanBus { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("SocketCanBus").finish() + } +} + +#[derive(Debug)] +pub enum SocketCanBusError { + /// An object couldn't be added to the TX queue of [`BxCanBus`] because it is full. + TxQueueFull, +} + +impl BusBackend for SocketCanBus { + type Error = SocketCanBusError; + + fn publish(&self, id: Id, data: Option) -> Result<(), Self::Error> { + self.tx_queue + .enqueue(encode_frame(id, data)) + .map_err(|_| Self::Error::TxQueueFull)?; + + (self.on_publish)(self); + + Ok(()) + } +} + +impl SocketCanBus { + pub const fn new_with_hook(on_publish: &'static (dyn Fn(&Self) + Send + Sync)) -> Self { + Self { + tx_queue: MpMcQueue::new(), + on_publish, + } + } +} + +impl SocketCanBus {} + +/* +impl SocketCanBus { + pub fn handle_tx_interrupt( + &self, + can_tx: &mut Tx, + ) -> Result<(), SocketCanBusError> { + let rx = &self.tx_queue; + while let Some(frame) = rx.dequeue() { + match can_tx.transmit(&frame) { + Ok(status) => match status.dequeued_frame() { + None => { + defmt::debug!("Transmitting CAN frame to {}.", frame.id()); + } + Some(old_frame) => { + defmt::debug!( + "Transmitting CAN frame to {} by displacing other to {}, requeing it.", + frame.id(), + old_frame.id() + ); + rx.enqueue(old_frame.clone()) + .map_err(|_| SocketCanBusError::TxQueueFull)?; + } + }, + Err(nb::Error::WouldBlock) => { + defmt::debug!( + "Couldn't trasmit CAN frame to {}, requeing frame.", + frame.id() + ); + rx.enqueue(frame) + .map_err(|_| SocketCanBusError::TxQueueFull)?; + } + Err(_) => unreachable!(), + } + } + Ok(()) + } + pub fn handle_rx_interrupt( + &self, + can_rx: &mut Rx0, + bus: impl BusBehaviour, + ) -> Result<(), SocketCanBusError> { + loop { + match can_rx.receive() { + Ok(frame) => { + let (channel_id, data) = decode_frame(&frame); + if let Some(data) = data { + bus.publish_local(&channel_id, data); + } else if let Some(state) = bus.get(channel_id) { + defmt::debug!("responding to remote frame"); + self.tx_queue + .enqueue(encode_frame(channel_id, Some(state))) + .map_err(|_| SocketCanBusError::TxQueueFull)?; + + (self.on_publish)(self); + } + } + Err(nb::Error::WouldBlock) => break, + Err(nb::Error::Other(_)) => break, + } + } + + Ok(()) + } +} +*/ + +pub fn decode_frame(frame: &socketcan::CanFrame) -> (Id, Option) { + let id = match frame.id() { + socketcan::Id::Standard(x) => { + Id::Standard(unsafe { StandardId::new_unchecked(x.as_raw()) }) + } + socketcan::Id::Extended(x) => { + Id::Extended(unsafe { ExtendedId::new_unchecked(x.as_raw()) }) + } + }; + match frame { + CanFrame::Data(data) => (id, Some(CanDataFormat::new(data.data()))), + CanFrame::Remote(_) => (id, None), + CanFrame::Error(_) => unimplemented!(), + } +} + +pub fn encode_frame(id: Id, data: Option) -> CanFrame { + let id = match id { + Id::Standard(x) => { + socketcan::Id::Standard(unsafe { socketcan::StandardId::new_unchecked(x.as_raw()) }) + } + Id::Extended(x) => { + socketcan::Id::Extended(unsafe { socketcan::ExtendedId::new_unchecked(x.as_raw()) }) + } + }; + if let Some(data) = data { + CanFrame::Data(CanDataFrame::new(id, data.data()).unwrap()) + } else { + CanFrame::Remote(CanRemoteFrame::new_remote(id, 8).unwrap()) + } +} diff --git a/canome/src/spmc.rs b/canome/src/spmc.rs new file mode 100644 index 0000000..daa182e --- /dev/null +++ b/canome/src/spmc.rs @@ -0,0 +1,418 @@ +use core::borrow::{Borrow, BorrowMut}; +use core::cell::RefCell; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; + +use core::ptr::NonNull; + +use critical_section::Mutex; + +#[derive(PartialEq)] +pub enum Error { + Overflowed, +} + +pub type CanData = [u8; 8]; +pub type CanId = u32; + +pub struct Channel(Mutex>>>); + +unsafe impl Send for Channel {} + +struct ClientList { + head: NonNull>, + tail: NonNull>, +} + +pub struct ChannelClient { + receiver: NonNull>, + nav: Mutex>>, +} + +struct ChannelClientNav { + next: Option>>, + prev: Option>>, +} + +pub trait Receiver<'channel, T>: Receive { + fn channel_client(&self) -> &ChannelClient; + fn channel(&self) -> &Channel; +} +pub trait Receive { + fn receive(&self, value: &T); +} + +pub struct ReceiverRef<'channel, 'receiver, T, R: Receiver<'channel, T>>( + &'receiver mut R, + PhantomData<&'channel Channel>, +); + +impl ChannelClient { + pub fn new<'channel, R: Receiver<'channel, T>>(receiver: &R) -> Self { + Self { + receiver: NonNull::new(receiver as *const dyn Receive as *mut _).unwrap(), + nav: Mutex::new(RefCell::new(ChannelClientNav { + prev: None, + next: None, + })), + } + } +} + +impl Channel { + pub fn register<'channel, 'receiver, R: Receiver<'channel, T>>( + &'channel self, + receiver: &'receiver mut R, + ) -> ReceiverRef<'channel, 'receiver, T, R> + where + 'channel: 'receiver, + { + let client_ref = receiver.channel_client(); + let client_ptr = NonNull::new(client_ref as *const _ as *mut _).unwrap(); + + critical_section::with(|cs| { + let mut client_list = self.0.borrow(cs).borrow_mut(); + if let Some(client_list) = &mut *client_list { + let old_head = client_list.head; + client_ref.nav.borrow(cs).borrow_mut().next = Some(old_head); + + let old_head = unsafe { old_head.as_ref() }; + old_head.nav.borrow(cs).borrow_mut().prev = Some(client_ptr); + client_list.head = client_ptr; + } else { + *client_list = Some(ClientList { + head: client_ptr, + tail: client_ptr, + }); + } + }); + + ReceiverRef(receiver, PhantomData) + } + pub fn distribute(&self, value: &T) { + critical_section::with(|cs| { + let list = self.0.borrow(cs).borrow(); + if let Some(list) = &*list { + let mut next = Some(list.head); + while let Some(current) = next { + let current = unsafe { current.as_ref() }; + + let receiver = unsafe { current.receiver.as_ref() }; + receiver.receive(value); + + next = current.nav.borrow(cs).borrow().next; + } + } + }) + } +} + +impl<'channel, 'receiver, T, R: Receiver<'channel, T>> Deref + for ReceiverRef<'channel, 'receiver, T, R> +{ + type Target = R; + fn deref(&self) -> &Self::Target { + self.0 + } +} + +impl<'channel, 'receiver, T, R: Receiver<'channel, T>> DerefMut + for ReceiverRef<'channel, 'receiver, T, R> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + self.0 + } +} + +impl<'channel, 'receiver, T, R: Receiver<'channel, T>> AsRef + for ReceiverRef<'channel, 'receiver, T, R> +{ + fn as_ref(&self) -> &R { + self.0 + } +} + +impl<'channel, 'receiver, T, R: Receiver<'channel, T>> Borrow + for ReceiverRef<'channel, 'receiver, T, R> +{ + fn borrow(&self) -> &R { + self.0 + } +} + +impl<'channel, 'receiver, T, R: Receiver<'channel, T>> BorrowMut + for ReceiverRef<'channel, 'receiver, T, R> +{ + fn borrow_mut(&mut self) -> &mut R { + self.0 + } +} + +impl Default for Channel { + fn default() -> Self { + Self(Mutex::new(RefCell::new(None))) + } +} + +impl<'channel, 'receiver, T, R: Receiver<'channel, T>> Drop + for ReceiverRef<'channel, 'receiver, T, R> +{ + fn drop(&mut self) { + critical_section::with(|cs| { + let receiver = &mut self.0; + let channel = receiver.channel(); + let own_nav = receiver.channel_client().nav.borrow(cs).borrow_mut(); + + match (own_nav.prev, own_nav.next) { + (None, None) => { + // this item is the last one in the list, we need to tear down the entire list + let mut can_driver_list = channel.0.borrow(cs).borrow_mut(); + *can_driver_list = None; + } + (None, Some(next)) => { + // this item is the first in the list, we need to update the head ptr and the + // prev ptr of next + let mut can_driver_list = channel.0.borrow(cs).borrow_mut(); + let can_driver_list = can_driver_list.as_mut().unwrap(); + can_driver_list.head = next; + + let next = unsafe { next.as_ref() }; + next.nav.borrow(cs).borrow_mut().prev = None; + } + (Some(prev), None) => { + // this item is the last one in the list, we need to update the tail ptr and + // the next ptr of prev + + let mut can_driver_list = channel.0.borrow(cs).borrow_mut(); + let can_driver_list = can_driver_list.as_mut().unwrap(); + can_driver_list.tail = prev; + + let prev = unsafe { prev.as_ref() }; + prev.nav.borrow(cs).borrow_mut().next = None; + } + (Some(prev), Some(next)) => { + // this item is in the middle of the list, we need to update the next ptr of + // prev and the prev ptr of next + + { + let next = unsafe { next.as_ref() }; + next.nav.borrow(cs).borrow_mut().prev = Some(prev); + } + { + let prev = unsafe { prev.as_ref() }; + prev.nav.borrow(cs).borrow_mut().next = Some(next); + } + } + } + }); + } +} + +#[cfg(test)] +mod test { + use super::{Channel, ChannelClient, ChannelClientNav, Error, Receive, Receiver}; + use core::cell::RefCell; + use core::future::{poll_fn, Future}; + use core::mem::MaybeUninit; + use core::ptr::NonNull; + use core::task::{Poll, Waker}; + use critical_section::Mutex; + use heapless::Deque; + + pub struct UnboundReceiver<'channel, T, const N: usize>( + MaybeUninit>, + ); + impl<'channel, T, const N: usize> AsMut>> + for UnboundReceiver<'channel, T, N> + { + fn as_mut(&mut self) -> &mut MaybeUninit> { + &mut self.0 + } + } + impl<'channel, T, const N: usize> Default for UnboundReceiver<'channel, T, N> { + fn default() -> Self { + Self(MaybeUninit::uninit()) + } + } + impl<'candriver, T, const N: usize> ChannelReceiver<'candriver, T, N> { + pub fn recv(&mut self) -> impl Future> + '_ { + poll_fn(|cx| { + match critical_section::with(|cs| { + let mut inner = self.inner.borrow(cs).borrow_mut(); + + // register waker + let waker = cx.waker(); + if !inner + .waker + .as_ref() + .map(|x| x.will_wake(waker)) + .unwrap_or_default() + { + let mut waker = Some(waker.clone()); + core::mem::swap(&mut inner.waker, &mut waker); + if let Some(waker) = waker { + waker.wake(); + } + } + + if inner.overflowed { + inner.overflowed = false; + Err(Error::Overflowed) + } else { + Ok(inner.queue.pop_back()) + } + }) { + Err(x) => Poll::Ready(Err(x)), + Ok(Some(x)) => Poll::Ready(Ok(x)), + Ok(None) => Poll::Pending, + } + }) + } + pub fn try_recv(&mut self) -> Result, Error> { + critical_section::with(|cs| { + let mut inner = self.inner.borrow(cs).borrow_mut(); + + if inner.overflowed { + inner.overflowed = false; + Err(Error::Overflowed) + } else { + Ok(inner.queue.pop_back()) + } + }) + } + } + + pub struct ChannelReceiver<'channel, T, const N: usize> { + channel: &'channel Channel, + client: Option>, + inner: Mutex>>, + } + impl<'channel, T: Clone, const N: usize> ChannelReceiver<'channel, T, N> { + fn init<'a>( + channel: &'channel Channel, + memory: &'a mut UnboundReceiver<'channel, T, N>, + ) -> &'a mut Self + where + Self: Sized, + { + let receiver = memory.0.write(ChannelReceiver { + channel, + client: None, + inner: Mutex::new(RefCell::new(ChannelReceiverInner { + queue: Deque::new(), + overflowed: false, + waker: None, + })), + }); + + receiver.client = Some(ChannelClient { + receiver: NonNull::new(receiver as *mut _ as *const dyn Receive as *mut _) + .unwrap(), + nav: Mutex::new(RefCell::new(ChannelClientNav { + next: None, + prev: None, + })), + }); + + receiver + } + } + impl<'channel, T: Clone, const N: usize> Receiver<'channel, T> for ChannelReceiver<'channel, T, N> { + fn channel(&self) -> &Channel { + self.channel + } + fn channel_client(&self) -> &ChannelClient { + self.client.as_ref().unwrap() + } + } + impl<'candriver, T: Clone, const N: usize> Receive for ChannelReceiver<'candriver, T, N> { + fn receive(&self, value: &T) { + critical_section::with(|cs| { + let mut inner = self.inner.borrow(cs).borrow_mut(); + if inner.queue.is_full() { + inner.queue.pop_back(); + inner.overflowed = true; + } + let Ok(_) = inner.queue.push_front(value.clone()) else { + unreachable!() + }; + }) + } + } + + struct ChannelReceiverInner { + queue: Deque, + overflowed: bool, + waker: Option, + } + + #[test] + pub fn basic() { + let can_driver = Channel::default(); + + let mut recv1 = UnboundReceiver::default(); + let recv1 = ChannelReceiver::init(&can_driver, &mut recv1); + let mut recv1 = can_driver.register::>(recv1); + + //test overflowing behaviour + assert!(recv1.try_recv() == Ok(None)); + can_driver.distribute(&0); + + assert!(recv1.try_recv() == Ok(Some(0))); + assert!(recv1.try_recv() == Ok(None)); + } + + #[test] + pub fn overflow() { + let can_driver = Channel::default(); + + let mut recv1 = UnboundReceiver::default(); + let recv1 = ChannelReceiver::init(&can_driver, &mut recv1); + let mut recv1 = can_driver.register::>(recv1); + + //test overflowing behaviour + assert!(recv1.try_recv() == Ok(None)); + for i in 0..11 { + can_driver.distribute(&i); + } + assert!(recv1.try_recv() == Err(Error::Overflowed)); + for i in 1..11 { + assert!(recv1.try_recv() == Ok(Some(i))); + } + assert!(recv1.try_recv() == Ok(None)); + } + #[test] + pub fn multi_receiver() { + let can_driver = Channel::default(); + + let mut recv1 = UnboundReceiver::default(); + let recv1 = ChannelReceiver::init(&can_driver, &mut recv1); + let mut recv1 = can_driver.register::>(recv1); + + let mut recv2 = UnboundReceiver::default(); + let recv2 = ChannelReceiver::init(&can_driver, &mut recv2); + let mut recv2 = can_driver.register::>(recv2); + + { + can_driver.distribute(&0); + assert!(recv1.try_recv() == Ok(Some(0))); + assert!(recv2.try_recv() == Ok(Some(0))); + } + + { + can_driver.distribute(&1); + can_driver.distribute(&2); + assert!(recv1.try_recv() == Ok(Some(1))); + assert!(recv1.try_recv() == Ok(Some(2))); + assert!(recv2.try_recv() == Ok(Some(1))); + assert!(recv2.try_recv() == Ok(Some(2))); + } + + drop(recv1); + + { + can_driver.distribute(&3); + assert!(recv2.try_recv() == Ok(Some(3))); + } + } +} diff --git a/canome/src/state.rs b/canome/src/state.rs index 5716d80..a0ea353 100644 --- a/canome/src/state.rs +++ b/canome/src/state.rs @@ -6,24 +6,23 @@ pub enum ThreeWaySwitchState { B, } -#[cfg(feature = "can")] mod can { - use crate::{can::CanDataFormat, Decode, Encode}; + use crate::{can::CanData, Decode, Encode}; use super::ThreeWaySwitchState; - impl Encode for ThreeWaySwitchState { - fn encode(self) -> CanDataFormat { + impl Encode for ThreeWaySwitchState { + fn encode(self) -> CanData { let desc = match self { Self::Off => 0, Self::A => 1, Self::B => 2, }; - CanDataFormat::new(&[desc]) + CanData::new(&[desc]) } } - impl Decode for ThreeWaySwitchState { - fn decode(value: CanDataFormat) -> Result + impl Decode for ThreeWaySwitchState { + fn decode(value: CanData) -> Result where Self: Sized, { diff --git a/canome/src/stm32.rs b/canome/src/stm32.rs index 0ecc0bf..6d58ea4 100644 --- a/canome/src/stm32.rs +++ b/canome/src/stm32.rs @@ -1,6 +1,10 @@ use core::ops::{Deref, DerefMut}; -use crate::{bus::BusRx, state::ThreeWaySwitchState, BusBackend, Channel}; +use crate::{ + can::{CanHandle, CanHandleRef}, + canome::{CanomeChannel, CanomeEventClient, CanomeStateClient}, + state::ThreeWaySwitchState, +}; use rtic_monotonics::{ systick::{ fugit::{Duration, Instant}, @@ -37,26 +41,26 @@ impl ThreeWaySwitch< } } -pub struct ChannelDerivative<'a, C: Channel, R: BusRx, B: BusBackend> { - channel: &'a R, - last: C::State, +pub struct ChannelDerivative<'handle, 'driver, C: CanomeChannel, const N: usize> { + channel: CanHandleRef<'handle, 'driver, CanomeEventClient<'handle, 'driver, C, N>>, + last: C::Data, updated: Instant, } -impl<'a, C, R, B> ChannelDerivative<'a, C, R, B> +impl<'handle, 'driver, C, const N: usize> ChannelDerivative<'handle, 'driver, C, N> where - C: Channel, - C::State: Default + PartialEq + Clone, - R: BusRx, - B: BusBackend, + C: CanomeChannel, + C::Data: Default + PartialEq + Clone, { - pub fn new(channel: &'a R) -> Self { + pub fn new( + channel: CanHandleRef<'handle, 'driver, CanomeEventClient<'handle, 'driver, C, N>>, + ) -> Self { Self { channel, - last: C::State::default(), + last: C::Data::default(), updated: Systick::now(), } } - pub fn update(&mut self, new: C::State) -> Option<(C::State, Duration)> { + pub fn update(&mut self, new: C::Data) -> Option<(C::Data, Duration)> { if new != self.last { self.last = new.clone(); let now = Systick::now(); @@ -67,10 +71,13 @@ where None } } - pub async fn next(&mut self) -> (C::State, Duration) { + pub async fn recv( + &mut self, + ) -> Result<(C::Data, Duration), crate::canome::Error> { loop { - if let Some(value) = self.update(self.channel.next().await) { - return value; + let new = self.channel.recv().await?; + if let Some(value) = self.update(new) { + return Ok(value); } } } diff --git a/canome/src/time.rs b/canome/src/time.rs index a60c26b..96caa10 100644 --- a/canome/src/time.rs +++ b/canome/src/time.rs @@ -5,21 +5,20 @@ pub struct Timestamp { millis_since_epoch: i64, } -#[cfg(feature = "can")] mod can { - use crate::{can::CanDataFormat, Decode, Encode}; + use crate::{can::CanData, Decode, Encode}; use super::Timestamp; - impl Encode for Timestamp { - fn encode(self) -> CanDataFormat { + impl Encode for Timestamp { + fn encode(self) -> CanData { let timestamp = self.millis_since_epoch.to_be_bytes(); - CanDataFormat::new(×tamp) + CanData::new(×tamp) } } - impl Decode for Timestamp { - fn decode(value: CanDataFormat) -> Result + impl Decode for Timestamp { + fn decode(value: CanData) -> Result where Self: Sized, {