From de4c2e6bdcc50ce6339492944481ce687f3fa674 Mon Sep 17 00:00:00 2001 From: Paul Zinselmeyer Date: Thu, 14 Dec 2023 23:17:19 +0100 Subject: [PATCH] first implementation --- .gitignore | 2 + Cargo.toml | 10 +++ src/lib.rs | 235 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 247 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fffb2f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..5ff73b4 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "eventi" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1.35.0", features = ["full"] } +async-trait = "0.1.74" diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..5fe8faa --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,235 @@ +#![forbid(unsafe_code)] +use std::{ + any::{Any, TypeId}, + collections::HashMap, + future::Future, + sync::Arc, +}; + +use async_trait::async_trait; +use tokio::{sync::RwLock, task::JoinHandle}; + +type AnonEventArc = Arc; +type AnonHandlerBox = Box; +pub type HandlerResult = (); + +pub trait Event: Any + Send + Sync {} +#[async_trait] +pub trait Handler: 'static + Send + Sync { + async fn run(&self, event: Arc, dispatcher: &dyn Dispatcher); +} + +#[async_trait] +pub trait Dispatcher: Send + Sync { + async fn dispatch_anon(&self, event: AnonEvent); +} + +impl dyn Dispatcher { + pub async fn dispatch(&self, event: Arc) { + self.dispatch_anon(AnonEvent::from(event)).await + } +} + +#[async_trait] +trait AnonHandlerTrait: Send + Sync { + async fn run(&self, event: AnonEvent, dispatcher: &dyn Dispatcher); + fn clone(&self) -> AnonHandlerBox; +} + +#[derive(Clone)] +pub struct AnonEvent { + type_id: TypeId, + value: AnonEventArc, +} + +struct SemiAnonHandler(Arc>); + +pub struct AnonHandler { + event_id: TypeId, + value: AnonHandlerBox, +} + +#[derive(Clone, Default)] +pub struct Bus { + registered: Arc>>>, + running: Arc>>>, +} + +impl AnonEvent { + pub fn deanon(self) -> Option> { + self.value.downcast::().ok() + } +} + +impl SemiAnonHandler { + pub fn new>(handler: Arc) -> Self { + Self(handler) + } +} + +impl AnonHandler { + pub fn new, E: Event>(handler: Arc) -> Self { + Self { + event_id: TypeId::of::(), + value: Box::new(SemiAnonHandler::new(handler)), + } + } + pub async fn run(&self, event: AnonEvent, dispatcher: &dyn Dispatcher) { + let handler = self.value.clone(); + handler.run(event, dispatcher).await; + } +} + +impl Bus { + pub async fn subscribe, E: Event>(&self, handler: H) { + let handler = AnonHandler::new(Arc::new(handler)); + self.subscribe_anon(handler).await; + } + pub async fn subscribe_shared, E: Event>(&self, handler: Arc) { + let handler = AnonHandler::new(handler); + self.subscribe_anon(handler).await; + } + pub async fn subscribe_anon(&self, handler: AnonHandler) { + let registered = &mut self.registered.write().await; + if let Some(handlers) = registered.get_mut(&handler.event_id) { + handlers.push(handler); + } else { + registered.insert(handler.event_id, vec![handler]); + } + } + + pub async fn shutdown(&self) { + let mut running = self.running.write().await; + let mut new_running = vec![]; + std::mem::swap(running.as_mut(), &mut new_running); + drop(running); + for running in new_running { + running.await; + } + } +} + +#[async_trait] +impl Dispatcher for Bus { + async fn dispatch_anon(&self, event: AnonEvent) { + let mut new_running = self + .registered + .read() + .await + .get(&event.type_id) + .iter() + .flat_map(|x| x.iter()) + .map(|handler| { + let handler = handler.clone(); + let dispatcher = self.clone(); + let event = event.clone(); + tokio::spawn(async move { handler.run(event, &dispatcher).await }) + }) + .collect::>(); + self.running.write().await.append(&mut new_running); + } +} + +impl From> for AnonEvent { + fn from(value: Arc) -> Self { + Self { + type_id: TypeId::of::(), + value, + } + } +} + +impl From for AnonEvent { + fn from(value: E) -> Self { + Arc::new(value).into() + } +} + +#[async_trait] +impl AnonHandlerTrait for SemiAnonHandler { + async fn run(&self, event: AnonEvent, dispatcher: &dyn Dispatcher) { + if let Some(event) = event.deanon::() { + self.0.run(event, dispatcher).await + } + } + + fn clone(&self) -> AnonHandlerBox { + Box::new(Self(self.0.clone())) + } +} + +impl Clone for AnonHandler { + fn clone(&self) -> Self { + Self { + event_id: self.event_id, + value: self.value.clone(), + } + } +} + +//TODO: doesn't work +#[async_trait] +impl Handler for F +where + F: Fn(Arc, &dyn Dispatcher) -> Fut + Send + Sync + 'static, + Fut: Future + Send + Sync, + E: Event, +{ + async fn run(&self, event: Arc, dispatcher: &dyn Dispatcher) { + (self)(event, dispatcher).await + } +} + +#[cfg(test)] +mod tests { + use std::{ + assert_eq, + sync::{ + atomic::{AtomicU8, Ordering}, + Arc, + }, + }; + + use async_trait::async_trait; + + use crate::{AnonEvent, Bus, Dispatcher, Event, Handler}; + + struct Hello { + callback: AtomicU8, + } + impl Event for Hello {} + + struct Hello2 {} + impl Event for Hello2 {} + + struct HelloHandler; + #[async_trait] + impl Handler for HelloHandler { + async fn run(&self, event: Arc, dispatcher: &dyn Dispatcher) { + event.callback.fetch_add(1, Ordering::AcqRel); + } + } + + #[tokio::test] + async fn basic() { + let bus = Bus::default(); + + bus.subscribe(HelloHandler).await; + + let event1 = Arc::new(Hello { + callback: AtomicU8::new(0), + }); + let event2 = Arc::new(Hello { + callback: AtomicU8::new(0), + }); + + bus.dispatch_anon(AnonEvent::from(event1.clone())).await; + bus.dispatch_anon(AnonEvent::from(event2.clone())).await; + bus.dispatch_anon(AnonEvent::from(event2.clone())).await; + + bus.shutdown().await; + + assert_eq!(event1.callback.load(Ordering::Acquire), 1); + assert_eq!(event2.callback.load(Ordering::Acquire), 2); + } +}