From 666544c3556f506b14cc167b75f0be7d8cb27baa Mon Sep 17 00:00:00 2001 From: Paul Z Date: Tue, 5 Sep 2023 17:14:14 +0200 Subject: [PATCH] split graph implementation into rebacs_core --- .drone.yml | 39 + .gitignore | 2 +- .gitlab-ci.yml | 36 - Cargo.lock | 102 ++- Cargo.toml | 35 +- Dockerfile | 4 - flake.nix | 5 +- rebacs_core/Cargo.toml | 9 + src/relation_set.rs => rebacs_core/src/lib.rs | 27 +- rebacs_core/src/tests.rs | 52 ++ rebacs_server/Cargo.toml | 32 + build.rs => rebacs_server/build.rs | 0 {proto => rebacs_server/proto}/rebacs.proto | 0 {src => rebacs_server/src}/grpc_service.rs | 6 +- {src => rebacs_server/src}/main.rs | 13 +- {src => rebacs_server/src}/rebacs_proto.rs | 0 src/graph.rs | 735 ------------------ src/kafka_backend.rs | 145 ---- 18 files changed, 226 insertions(+), 1016 deletions(-) create mode 100644 .drone.yml delete mode 100644 .gitlab-ci.yml delete mode 100644 Dockerfile create mode 100644 rebacs_core/Cargo.toml rename src/relation_set.rs => rebacs_core/src/lib.rs (94%) create mode 100644 rebacs_core/src/tests.rs create mode 100644 rebacs_server/Cargo.toml rename build.rs => rebacs_server/build.rs (100%) rename {proto => rebacs_server/proto}/rebacs.proto (100%) rename {src => rebacs_server/src}/grpc_service.rs (98%) rename {src => rebacs_server/src}/main.rs (87%) rename {src => rebacs_server/src}/rebacs_proto.rs (100%) delete mode 100644 src/graph.rs delete mode 100644 src/kafka_backend.rs diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..e54a5b3 --- /dev/null +++ b/.drone.yml @@ -0,0 +1,39 @@ +kind: pipeline +ype: docker +name: rebacs + +trigger: + event: + - push + +steps: +- name: create_image + image: nixos/nix + commands: + - nix build --extra-experimental-features nix-command --extra-experimental-features flakes --cores 0 --max-jobs auto .#dockerImage + - cp result rebacs.tar.gz +- name: upload_image + image: docker:dind + environment: + REGISTRY_PASSWD: + from_secret: REGISTRY_PASSWD + volumes: + - name: dockersock + path: /var/run + commands: + - docker login --username droneci --password $REGISTRY_PASSWD git2.zettoit.eu + - docker load < rebacs.tar.gz + - docker tag rebacs:latest git2.zettoit.eu/zettoit/rebacs:latest + - docker push git2.zettoit.eu/zettoit/rebacs:latest + +services: +- name: docker + image: docker:dind + privileged: true + volumes: + - name: dockersock + path: /var/run + +volumes: +- name: dockersock + temp: {} diff --git a/.gitignore b/.gitignore index ad6bc70..7595c70 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ /target +/result .env graph.dat graph.dat.bak -api_keys.dat diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index 684e56d..0000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,36 +0,0 @@ -variables: - DOCKER_TLS_CERTDIR: "" - DOCKER_HOST: tcp://docker:2375 - -services: - - name: docker:dind - entrypoint: ["dockerd-entrypoint.sh", "--tls=false"] - -stages: - - build - - pack - -cargo-build: - stage: build - image: rust:slim-bookworm - before_script: - - apt update && apt install protobuf-compiler -y - - rustup toolchain install nightly - - rustup default nightly - script: - - cargo build --release - artifacts: - paths: - - target/release/rebacs - -docker: - stage: pack - image: docker:latest - only: - - master - script: - - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY - - docker build --network host -t $CI_REGISTRY_IMAGE/master . -f Dockerfile - - docker push $CI_REGISTRY_IMAGE/master - dependencies: - - cargo-build diff --git a/Cargo.lock b/Cargo.lock index b561260..899f946 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "aho-corasick" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a" +checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" dependencies = [ "memchr", ] @@ -51,7 +51,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -62,7 +62,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -290,9 +290,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b30f669a7961ef1631673d2766cc92f52d64f7ef354d4fe0ddfd30ed52f0f4f" +checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" dependencies = [ "errno-dragonfly", "libc", @@ -359,7 +359,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -464,6 +464,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "home" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" +dependencies = [ + "windows-sys", +] + [[package]] name = "http" version = "0.2.9" @@ -681,9 +690,9 @@ checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" [[package]] name = "memchr" -version = "2.5.0" +version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "mime" @@ -759,9 +768,9 @@ dependencies = [ [[package]] name = "object" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe" +checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" dependencies = [ "memchr", ] @@ -837,7 +846,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -971,7 +980,14 @@ dependencies = [ ] [[package]] -name = "rebacs" +name = "rebacs_core" +version = "0.1.0" +dependencies = [ + "tokio", +] + +[[package]] +name = "rebacs_server" version = "0.1.0" dependencies = [ "compact_str", @@ -981,6 +997,7 @@ dependencies = [ "jsonwebtoken", "log", "prost", + "rebacs_core", "reqwest", "serde", "sha2", @@ -1001,9 +1018,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.4" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" +checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" dependencies = [ "aho-corasick", "memchr", @@ -1013,9 +1030,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" +checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ "aho-corasick", "memchr", @@ -1090,9 +1107,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.9" +version = "0.38.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bfe0f2582b4931a45d1fa608f8a8722e8b3c7ac54dd6d5f3b3212791fedef49" +checksum = "c0c3dde1fc030af041adc40e79c0e7fbcf431dd24870053d187d7c66e4b87453" dependencies = [ "bitflags 2.4.0", "errno", @@ -1103,9 +1120,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.6" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" +checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", "ring", @@ -1177,7 +1194,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1295,9 +1312,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.29" +version = "2.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" +checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398" dependencies = [ "proc-macro2", "quote", @@ -1334,29 +1351,29 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] name = "time" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb39ee79a6d8de55f48f2293a830e040392f1c5f16e336bdd1788cd0aadce07" +checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" dependencies = [ "deranged", "itoa", @@ -1373,9 +1390,9 @@ checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.13" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "733d258752e9303d392b94b75230d07b0b9c489350c69b851fc6c065fde3e8f9" +checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" dependencies = [ "time-core", ] @@ -1432,7 +1449,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1566,7 +1583,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1619,9 +1636,9 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] name = "url" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb" +checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" dependencies = [ "form_urlencoded", "idna", @@ -1670,7 +1687,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", "wasm-bindgen-shared", ] @@ -1704,7 +1721,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1733,13 +1750,14 @@ checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" [[package]] name = "which" -version = "4.4.0" +version = "4.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" dependencies = [ "either", - "libc", + "home", "once_cell", + "rustix", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 3c45978..7813282 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,29 +1,6 @@ -[package] -name = "rebacs" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -dotenvy = "0.15.7" -log = "0.4.17" -env_logger = "0.10.0" - -serde = { version="1.0", features=["derive"] } -tokio = { version = "1.27.0", features = ["full"] } - -tonic = { version="0.9.2", features=["tls"] } -prost = "0.11.9" -sha2 = "0.10.6" -hex = "0.4.3" -compact_str = "0.7.0" - -thiserror = "1.0.47" - -jsonwebtoken = "8.3.0" - -reqwest = { version="0.11.20", features=["json", "rustls-tls"], default-features=false} - -[build-dependencies] -tonic-build = "0.9.2" +[workspace] +resolver = "2" +members = [ + "rebacs_server", + "rebacs_core", +] diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 057125f..0000000 --- a/Dockerfile +++ /dev/null @@ -1,4 +0,0 @@ -FROM debian:bookworm-slim AS final -WORKDIR /app -COPY ./target/release/rebacs ./server -CMD [ "/app/server" ] diff --git a/flake.nix b/flake.nix index 4376614..ca6150c 100644 --- a/flake.nix +++ b/flake.nix @@ -33,8 +33,7 @@ rustToolchain = pkgs.rust-bin.nightly.latest.default; protoFilter = path: _type: builtins.match ".*proto$" path != null; - tailwindFilter = path: _type: builtins.match "^tailwind.config.js$" path != null; - protoOrCargo = path: type: (protoFilter path type) || (tailwindFilter path type) || (craneLib.filterCargoSources path type); + protoOrCargo = path: type: (protoFilter path type) || (craneLib.filterCargoSources path type); craneLib = (crane.mkLib pkgs).overrideToolchain rustToolchain; src = pkgs.lib.cleanSourceWith { @@ -58,7 +57,7 @@ name = "rebacs"; tag = "latest"; config = { - Cmd = [ "${bin}/bin/rebacs" ]; + Cmd = [ "${bin}/bin/rebacs_server" ]; }; }; diff --git a/rebacs_core/Cargo.toml b/rebacs_core/Cargo.toml new file mode 100644 index 0000000..c192439 --- /dev/null +++ b/rebacs_core/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "rebacs_core" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1.32.0", features = [] } diff --git a/src/relation_set.rs b/rebacs_core/src/lib.rs similarity index 94% rename from src/relation_set.rs rename to rebacs_core/src/lib.rs index c35ca8c..8badb83 100644 --- a/src/relation_set.rs +++ b/rebacs_core/src/lib.rs @@ -9,11 +9,13 @@ use std::{ }; use tokio::{ - fs::File, - io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + io::{AsyncBufReadExt, AsyncWriteExt}, sync::RwLock, }; +#[cfg(test)] +mod tests; + #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct NodeId { pub namespace: String, @@ -34,11 +36,11 @@ struct Distanced { } #[derive(Default)] -pub struct RelationSet { +pub struct RelationGraph { nodes: RwLock>>, } -impl RelationSet { +impl RelationGraph { pub async fn insert(&self, src: impl Into, dst: impl Into) { let src = src.into(); let dst = dst.into(); @@ -157,13 +159,14 @@ impl RelationSet { false } - pub async fn to_file(&self, file: &mut File) { + pub async fn write_savefile(&self, writeable: &mut (impl AsyncWriteExt + Unpin)) { let mut current: (String, String) = (String::new(), String::new()); for node in self.nodes.read().await.iter() { if current != (node.id.namespace.clone(), node.id.id.clone()) { current = (node.id.namespace.clone(), node.id.id.clone()); - file.write_all("\n".as_bytes()).await.unwrap(); - file.write_all(format!("[{}:{}]\n", ¤t.0, ¤t.1).as_bytes()) + writeable.write_all("\n".as_bytes()).await.unwrap(); + writeable + .write_all(format!("[{}:{}]\n", ¤t.0, ¤t.1).as_bytes()) .await .unwrap(); } @@ -186,15 +189,15 @@ impl RelationSet { .unwrap_or_default(); if let Some(rel) = &node.id.relation { - file.write_all(format!("{} = [ {} ]\n", &rel, &srcs).as_bytes()) + writeable + .write_all(format!("{} = [ {} ]\n", &rel, &srcs).as_bytes()) .await .unwrap(); } } } - pub async fn from_file(file: &mut File) -> Self { - let reader = BufReader::new(file); - let mut lines = reader.lines(); + pub async fn read_savefile(readable: &mut (impl AsyncBufReadExt + Unpin)) -> Self { + let mut lines = readable.lines(); let graph = Self::default(); let mut node: Option<(String, String)> = None; while let Ok(Some(line)) = lines.next_line().await { @@ -274,7 +277,7 @@ impl Eq for Node {} impl PartialOrd for Node { fn partial_cmp(&self, other: &Self) -> Option { - self.id.partial_cmp(&other.id) + Some(self.cmp(other)) } } impl Ord for Node { diff --git a/rebacs_core/src/tests.rs b/rebacs_core/src/tests.rs new file mode 100644 index 0000000..06af760 --- /dev/null +++ b/rebacs_core/src/tests.rs @@ -0,0 +1,52 @@ +//hello world + +use crate::{Distanced, NodeId, RelationGraph}; + +#[test] +fn distanced_ordering() { + let a = Distanced::new((), 0); + let b = Distanced::one(()); + let c = Distanced::new((), 1); + let d = Distanced::new((), 2); + + assert!(a < b); + assert!(b == c); + assert!(c < d); + assert!(a < d); +} + +#[tokio::test] +async fn simple_graph() { + let graph = RelationGraph::default(); + + let alice = ("user", "alice"); + let bob = ("user", "bob"); + let charlie = ("user", "charlie"); + + let foo_read = ("application", "foo", "read"); + let bar_read = ("application", "bar", "read"); + + graph.insert(alice, foo_read).await; + graph.insert(bob, bar_read).await; + + assert!(graph.has_recursive(alice, foo_read, None).await); + assert!(!graph.has_recursive(alice, bar_read, None).await); + + assert!(!graph.has_recursive(bob, foo_read, None).await); + assert!(graph.has_recursive(bob, bar_read, None).await); + + assert!(!graph.has_recursive(charlie, foo_read, None).await); + assert!(!graph.has_recursive(charlie, bar_read, None).await); + + graph.remove(alice, foo_read).await; + graph.remove(alice, bar_read).await; + + assert!(!graph.has_recursive(alice, foo_read, None).await); + assert!(!graph.has_recursive(alice, bar_read, None).await); + + graph.insert(charlie, foo_read).await; + graph.insert(charlie, bar_read).await; + + assert!(graph.has_recursive(charlie, foo_read, None).await); + assert!(graph.has_recursive(charlie, bar_read, None).await); +} diff --git a/rebacs_server/Cargo.toml b/rebacs_server/Cargo.toml new file mode 100644 index 0000000..e630a75 --- /dev/null +++ b/rebacs_server/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "rebacs_server" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dotenvy = "0.15.7" +log = "0.4.17" +env_logger = "0.10.0" + +serde = { version="1.0", features=["derive"] } +tokio = { version = "1.27.0", features = ["full"] } + +tonic = { version="0.9.2", features=["tls"] } +prost = "0.11.9" +sha2 = "0.10.6" +hex = "0.4.3" +compact_str = "0.7.0" + +thiserror = "1.0.47" + +jsonwebtoken = "8.3.0" + +reqwest = { version="0.11.20", features=["json", "rustls-tls"], default-features=false} + +rebacs_core = { path="../rebacs_core" } + +[build-dependencies] +tonic-build = "0.9.2" + diff --git a/build.rs b/rebacs_server/build.rs similarity index 100% rename from build.rs rename to rebacs_server/build.rs diff --git a/proto/rebacs.proto b/rebacs_server/proto/rebacs.proto similarity index 100% rename from proto/rebacs.proto rename to rebacs_server/proto/rebacs.proto diff --git a/src/grpc_service.rs b/rebacs_server/src/grpc_service.rs similarity index 98% rename from src/grpc_service.rs rename to rebacs_server/src/grpc_service.rs index 7b47a82..28ac38b 100644 --- a/src/grpc_service.rs +++ b/rebacs_server/src/grpc_service.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use jsonwebtoken::{decode, DecodingKey, TokenData, Validation}; use log::info; +use rebacs_core::{NodeId, RelationGraph}; use serde::Deserialize; use tokio::sync::mpsc::Sender; use tonic::metadata::MetadataMap; @@ -12,11 +13,10 @@ use crate::rebacs_proto::{ rebac_service_server, ExistsReq, ExistsRes, GrantReq, GrantRes, IsPermittedReq, IsPermittedRes, RevokeReq, RevokeRes, }; -use crate::relation_set::{NodeId, RelationSet}; #[derive(Clone)] pub struct RebacService { - pub graph: Arc, + pub graph: Arc, pub oidc_pubkey: DecodingKey, pub oidc_validation: Validation, pub save_trigger: Sender<()>, @@ -158,7 +158,7 @@ async fn is_permitted( token: &TokenData, dst: &NodeId, relation: &str, - graph: &RelationSet, + graph: &RelationGraph, ) -> bool { let s1 = graph .has_recursive( diff --git a/src/main.rs b/rebacs_server/src/main.rs similarity index 87% rename from src/main.rs rename to rebacs_server/src/main.rs index df1e0ec..4fc1c84 100644 --- a/src/main.rs +++ b/rebacs_server/src/main.rs @@ -5,10 +5,11 @@ use std::{env, sync::Arc, time::Duration}; use grpc_service::RebacService; use jsonwebtoken::{Algorithm, DecodingKey, Validation}; use log::info; -use relation_set::RelationSet; +use rebacs_core::RelationGraph; use serde::Deserialize; use tokio::{ fs::{self, File}, + io::BufReader, select, sync::mpsc::channel, }; @@ -16,7 +17,6 @@ use tonic::transport::Server; pub mod grpc_service; pub mod rebacs_proto; -pub mod relation_set; use crate::rebacs_proto::rebac_service_server; @@ -31,10 +31,11 @@ async fn main() { env_logger::init(); info!("loading graph from graph.dat"); - let graph = if let Ok(mut file) = File::open("graph.dat").await { - RelationSet::from_file(&mut file).await + let graph = if let Ok(file) = File::open("graph.dat").await { + let mut reader = BufReader::new(file); + RelationGraph::read_savefile(&mut reader).await } else { - RelationSet::default() + RelationGraph::default() }; let graph = Arc::new(graph); @@ -50,7 +51,7 @@ async fn main() { info!("saving graph"); let _ = fs::copy("graph.dat", "graph.dat.bak").await; let mut file = File::create("graph.dat").await.unwrap(); - save_thread_graph.to_file(&mut file).await; + save_thread_graph.write_savefile(&mut file).await; } }); diff --git a/src/rebacs_proto.rs b/rebacs_server/src/rebacs_proto.rs similarity index 100% rename from src/rebacs_proto.rs rename to rebacs_server/src/rebacs_proto.rs diff --git a/src/graph.rs b/src/graph.rs deleted file mode 100644 index 500fc61..0000000 --- a/src/graph.rs +++ /dev/null @@ -1,735 +0,0 @@ -use std::{ - cmp::Ordering, - collections::{ - hash_map::{Iter, IterMut}, - BinaryHeap, HashMap, HashSet, - }, - hash::Hash, - ops::Deref, - sync::Arc, -}; - -use log::info; -use serde::{Deserialize, Serialize}; -use tokio::{ - fs::File, - io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, -}; - -#[derive(Default)] -pub struct Graph { - nodes: BidMap, - edges: BidThreeMap, - counter: u32, -} - -#[derive(Hash, PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] -pub struct Object { - pub namespace: String, - pub id: String, -} - -#[derive(Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Debug)] -pub struct ObjectRef(pub u32); - -#[derive(PartialEq, Eq, Hash, Clone, Debug, Deserialize, Serialize)] -pub enum ObjectOrSet { - Object(Object), - Set((Object, Relation)), -} - -#[derive(Hash, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Relation(pub String); - -#[derive(PartialEq, Eq, Clone, Hash, Serialize, Deserialize, Debug)] -pub struct ObjectRelation(pub ObjectRef, pub Relation); - -impl Object { - pub fn new(namespace: &str, id: &str) -> Self { - Self { - namespace: namespace.to_string(), - id: id.to_string(), - } - } -} - -impl ObjectOrSet { - pub fn object(&self) -> &Object { - match self { - ObjectOrSet::Object(obj) => obj, - ObjectOrSet::Set((obj, _)) => obj, - } - } - pub fn relation(&self) -> Option<&Relation> { - match self { - ObjectOrSet::Object(_) => None, - ObjectOrSet::Set((_, rel)) => Some(rel), - } - } -} - -impl From for ObjectOrSet { - fn from(value: Object) -> Self { - Self::Object(value) - } -} - -impl From<(Object, &str)> for ObjectOrSet { - fn from(value: (Object, &str)) -> Self { - Self::Set((value.0, Relation::new(value.1))) - } -} - -impl Relation { - pub fn new(relation: &str) -> Self { - Self(relation.to_string()) - } -} -impl Deref for Relation { - type Target = String; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl From<(ObjectRef, Relation)> for ObjectRelation { - fn from(value: (ObjectRef, Relation)) -> Self { - Self(value.0, value.1) - } -} -impl From<(ObjectRef, &str)> for ObjectRelation { - fn from(value: (ObjectRef, &str)) -> Self { - Self(value.0, Relation::new(value.1)) - } -} -impl From<(&str, &str)> for Object { - fn from((namespace, id): (&str, &str)) -> Self { - Self { - namespace: namespace.to_string(), - id: id.to_string(), - } - } -} -impl From<(&String, &String)> for Object { - fn from((namespace, id): (&String, &String)) -> Self { - Self { - namespace: namespace.to_string(), - id: id.to_string(), - } - } -} -impl From<(String, String)> for Object { - fn from((namespace, id): (String, String)) -> Self { - Self { namespace, id } - } -} - -impl Graph { - pub fn get_node(&self, namespace: &str, id: &str) -> Option { - self.nodes.get_by_a(&Object::new(namespace, id)).cloned() - } - pub fn object_from_ref(&self, obj: &ObjectRef) -> Object { - self.nodes.get_by_b(obj).unwrap().clone() - } - pub fn get_or_add_node(&mut self, namespace: &str, id: &str) -> ObjectRef { - if let Some(node) = self.get_node(namespace, id) { - node - } else { - self.add_node((namespace, id)) - } - } - pub fn add_node(&mut self, node: impl Into) -> ObjectRef { - let obj_ref = ObjectRef(self.counter); - self.nodes.insert(node.into(), obj_ref); - self.counter += 1; - obj_ref - } - pub fn remove_node(&mut self, node: impl Into) { - let node = node.into(); - let index = self.nodes.remove_by_a(&node); - if let Some(index) = index { - self.edges.remove_by_c(&index); - //self.edges.get_by_a(&ObjectOrSet::Object(*index)); - //TODO: remove edges with ObjectOrSet::Set - } - } - pub fn remove_node_by_ref(&mut self, node: impl Into) { - let node = node.into(); - let index = self.nodes.remove_by_b(&node); - if index.is_some() { - self.edges.remove_by_c(&node); - - //let edges = self - // .edges - // .left_to_right - // .keys() - // .filter(|x| *x.object_ref() == node) - // .map(|x| (**x).clone()) - // .collect::>(); - //for edge in edges { - // self.edges.remove_by_a(&edge); - //} - } - } - - pub fn has_relation(&self, src: ObjectOrSet, dst: ObjectRelation) -> bool { - self.edges.has(&src, &dst.1, &dst.0) - } - pub fn add_relation(&mut self, src: impl Into, dst: impl Into) { - let dst = dst.into(); - self.edges.insert(src.into(), dst.1, dst.0); - } - pub fn remove_relation(&mut self, src: impl Into, dst: impl Into) { - let dst = dst.into(); - self.edges.remove(&src.into(), &dst.1, &dst.0); - } - pub fn remove_relation_and_residual_node( - &mut self, - src: impl Into, - dst: impl Into, - ) { - let src = src.into(); - let dst = dst.into(); - self.edges.remove(&src, &dst.1, &dst.0); - - //if self.edges.get_by_c(src.object_ref()).is_empty() - // && !self - // .edges - // .left_to_right - // .keys() - // .any(|x| x.object_ref() == src.object_ref()) - //{ - // self.remove_node_by_ref(*src.object_ref()); - //} - - //if self.edges.get_by_c(&dst.0).is_empty() - // && !self - // .edges - // .left_to_right - // .keys() - // .any(|x| *x.object_ref() == dst.0) - //{ - // self.remove_node_by_ref(dst.0); - //} - } - - pub fn is_related_to( - &self, - src: impl Into, - dst: impl Into, - ) -> bool { - let src = src.into(); - let dst = dst.into(); - let mut dist: HashMap = HashMap::new(); - let mut q: BinaryHeap = BinaryHeap::new(); - - for neighbor in self - .edges - .get_by_a(&src) - .iter() - .flat_map(|(r, m)| m.iter().map(|x| ObjectRelation(**x, (**r).clone()))) - { - if neighbor == dst { - return true; - } - dist.insert(neighbor.clone(), 1); - q.push(ObjectRelationDist(1, neighbor.clone())); - } - - //while let Some(ObjectRelationDist(node_dist, node)) = q.pop() { - // let node_dist = node_dist + 1; - // let node = ObjectOrSet::Set((node.0, node.1)); - // for neighbor in self - // .edges - // .get_by_a(&node) - // .iter() - // .flat_map(|(r, m)| m.iter().map(|x| ObjectRelation(**x, (**r).clone()))) - // { - // if neighbor == dst { - // return true; - // } - // if let Some(existing_node_dist) = dist.get(&neighbor) { - // if *existing_node_dist < node_dist { - // continue; - // } - // } - // dist.insert(neighbor.clone(), node_dist); - // q.push(ObjectRelationDist(node_dist, neighbor.clone())); - // } - //} - - false - } - pub fn related_to(&self, dst: ObjectRef, relation: Relation) -> HashSet { - //let mut relation_sets = vec![]; - //let mut relations: HashSet = HashSet::new(); - //for obj in self.edges.get_by_cb(&dst, &relation) { - // match obj { - // ObjectOrSet::Object(obj) => { - // relations.insert(*obj); - // } - // ObjectOrSet::Set(set) => relation_sets.push(set), - // } - //} - //while let Some(set) = relation_sets.pop() { - // for obj in self.edges.get_by_cb(&set.0, &set.1) { - // match obj { - // ObjectOrSet::Object(obj) => { - // relations.insert(*obj); - // } - // ObjectOrSet::Set(set) => relation_sets.push(set), - // } - // } - //} - //relations - todo!() - } - pub fn relations(&self, src: impl Into) -> HashSet { - //let src: ObjectRelation = src.into(); - - //let mut visited = HashSet::new(); - //let mut relation_sets = vec![]; - //let mut relations = HashSet::new(); - - //for (rel, neighbors) in self.edges.get_by_a(&ObjectOrSet::Object(src.0)) { - // for neighbor in neighbors { - // if *rel == src.1 { - // relations.insert(*neighbor); - // } - // relation_sets.push((rel, neighbor)); - // } - //} - - //while let Some((rel, obj_ref)) = relation_sets.pop() { - // if !visited.contains(&(rel, obj_ref)) { - // for (rel, neighbors) in self - // .edges - // .get_by_a(&ObjectOrSet::Set((*obj_ref, (*rel).clone()))) - // { - // for neighbor in neighbors { - // if *rel == src.1 { - // relations.insert(*neighbor); - // } - // relation_sets.push((rel, neighbor)); - // } - // } - // visited.insert((rel, obj_ref)); - // } - //} - //relations - todo!() - } - - pub async fn to_file(&self, file: &mut File) { - info!("writing graph to file"); - for (obj, obj_ref) in self.nodes.iter() { - file.write_all(format!("[{}:{}]\n", &obj.namespace, &obj.id).as_bytes()) - .await - .unwrap(); - //for (rel, arr) in self.edges.get_by_c(obj_ref.as_ref()) { - // let arr = arr - // .iter() - // .filter_map(|x| { - // let rel_obj_ref = x.object_ref(); - // self.nodes.get_by_b(rel_obj_ref).map(|rel_obj| { - // let (namespace, id) = (&rel_obj.namespace, &rel_obj.id); - - // if *namespace == obj.namespace && *id == obj.id { - // match x.relation() { - // None => "self".to_string(), - // Some(rel) => format!("self#{}", &rel.0), - // } - // } else { - // match x.relation() { - // None => format!("{}:{}", &namespace, &id), - // Some(rel) => format!("{}:{}#{}", &namespace, &id, &rel.0), - // } - // } - // }) - // }) - // .reduce(|acc, e| acc + ", " + &e) - // .unwrap_or_default(); - // file.write_all(format!("{} = [{}]\n", &rel.0, &arr).as_bytes()) - // .await - // .unwrap(); - //} - file.write_all("\n".as_bytes()).await.unwrap(); - } - } - - pub async fn from_file(file: &mut File) -> Self { - info!("reading graph from file"); - let reader = BufReader::new(file); - let mut lines = reader.lines(); - let mut graph = Graph::default(); - - let mut node: Option<(ObjectRef, String, String)> = None; - let mut relations = vec![]; - while let Ok(Some(line)) = lines.next_line().await { - if line.starts_with('[') && line.ends_with(']') { - let line = &mut line[1..line.len() - 1].split(':'); - let namespace = line.next().unwrap(); - let id = line.next().unwrap(); - let obj_ref = graph.add_node(Object::new(namespace, id)); - node = Some((obj_ref, namespace.to_string(), id.to_string())); - } else if line.contains('=') && line.contains('[') && line.contains(']') { - if let Some(dst) = &node { - let equals_pos = line.find('=').unwrap(); - let arr_start = line.find('[').unwrap(); - let arr_stop = line.find(']').unwrap(); - - let rel = line[..equals_pos].trim(); - let arr = line[arr_start + 1..arr_stop].split(", "); - - for obj in arr { - let (src_namespace, src_id, src_rel) = if obj.contains('#') { - let sep_1 = obj.find(':'); - let sep_2 = obj.find('#').unwrap(); - - let (namespace, id) = if let Some(sep_1) = sep_1 { - (&obj[..sep_1], &obj[sep_1 + 1..sep_2]) - } else { - (dst.1.as_str(), dst.2.as_str()) - }; - - let rel = &obj[sep_2 + 1..]; - - (namespace, id, Some(rel)) - } else { - let sep_1 = obj.find(':'); - - let (namespace, id) = if let Some(sep_1) = sep_1 { - (&obj[..sep_1], &obj[sep_1 + 1..]) - } else { - (dst.1.as_str(), dst.2.as_str()) - }; - (namespace, id, None) - }; - - relations.push(( - src_namespace.to_string(), - src_id.to_string(), - src_rel.map(String::from), - dst.0, - rel.to_string(), - )); - } - } - } - } - - //for relation in relations { - // let src = match relation.2 { - // Some(rel) => { - // let obj = graph.get_node(&relation.0, &relation.1).unwrap(); - // ObjectOrSet::Set((obj, Relation::new(&rel))) - // } - // None => { - // let obj = graph.get_node(&relation.0, &relation.1).unwrap(); - // ObjectOrSet::Object(obj) - // } - // }; - // graph.add_relation(src, ObjectRelation(relation.3, Relation(relation.4))); - //} - - graph - } -} - -/// Helper Struct used for Dijkstra -#[derive(PartialEq, Eq)] -struct ObjectRelationDist(u32, ObjectRelation); - -impl Ord for ObjectRelationDist { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - other.0.cmp(&self.0) - } -} -impl PartialOrd for ObjectRelationDist { - fn partial_cmp(&self, other: &Self) -> Option { - Some(other.0.cmp(&self.0)) - } -} - -pub struct BidMap { - left_to_right: HashMap, Arc>, - right_to_left: HashMap, Arc>, -} - -impl Default for BidMap { - fn default() -> Self { - Self { - left_to_right: Default::default(), - right_to_left: Default::default(), - } - } -} - -impl BidMap -where - A: Eq + Hash, - B: Eq + Hash, -{ - pub fn new() -> Self { - Self { - left_to_right: HashMap::new(), - right_to_left: HashMap::new(), - } - } - - pub fn insert(&mut self, a: A, b: B) { - let a = Arc::new(a); - let b = Arc::new(b); - self.left_to_right.insert(a.clone(), b.clone()); - self.right_to_left.insert(b, a); - } - - pub fn remove_by_a(&mut self, a: &A) -> Option> { - if let Some(b) = self.left_to_right.remove(a) { - self.right_to_left.remove(&b); - Some(b) - } else { - None - } - } - - pub fn remove_by_b(&mut self, b: &B) -> Option> { - if let Some(a) = self.right_to_left.remove(b) { - self.left_to_right.remove(&a); - Some(a) - } else { - None - } - } - - pub fn get_by_a(&self, a: &A) -> Option<&B> { - self.left_to_right.get(a).map(Deref::deref) - } - - pub fn get_by_b(&self, b: &B) -> Option<&A> { - self.right_to_left.get(b).map(Deref::deref) - } - - pub fn iter(&self) -> Iter, Arc> { - self.left_to_right.iter() - } - - pub fn iter_mut(&mut self) -> IterMut, Arc> { - self.left_to_right.iter_mut() - } -} - -pub struct BidThreeMap { - left_to_right: HashMap, HashMap, HashSet>>>, - right_to_left: HashMap, HashMap, HashSet>>>, -} - -impl BidThreeMap -where - A: Eq + Hash, - B: Eq + Hash, - C: Eq + Hash, -{ - pub fn new() -> Self { - Self { - left_to_right: HashMap::new(), - right_to_left: HashMap::new(), - } - } - - pub fn insert(&mut self, a: A, b: B, c: C) { - let a = Arc::new(a); - let b = Arc::new(b); - let c = Arc::new(c); - - if let Some(middle) = self.left_to_right.get_mut(&a) { - if let Some(right) = middle.get_mut(&b) { - right.insert(c.clone()); - } else { - let mut right = HashSet::new(); - right.insert(c.clone()); - middle.insert(b.clone(), right); - } - } else { - let mut middle = HashMap::new(); - let mut right = HashSet::new(); - right.insert(c.clone()); - middle.insert(b.clone(), right); - self.left_to_right.insert(a.clone(), middle); - } - - if let Some(middle) = self.right_to_left.get_mut(&c) { - if let Some(left) = middle.get_mut(&b) { - left.insert(a); - } else { - let mut left = HashSet::new(); - left.insert(a); - middle.insert(b, left); - } - } else { - let mut middle = HashMap::new(); - let mut left = HashSet::new(); - left.insert(a); - middle.insert(b, left); - self.right_to_left.insert(c, middle); - } - } - - pub fn remove(&mut self, a: &A, b: &B, c: &C) { - if let Some(right) = self.left_to_right.get_mut(a).and_then(|ltr| ltr.get_mut(b)) { - right.remove(c); - } - if let Some(left) = self.right_to_left.get_mut(c).and_then(|rtl| rtl.get_mut(b)) { - left.remove(a); - } - } - - pub fn remove_by_a(&mut self, a: &A) { - if let Some(map) = self.left_to_right.remove(a) { - for (b, set) in map { - for c in set { - if let Some(set) = self - .right_to_left - .get_mut(&c) - .and_then(|ltr| ltr.get_mut(&b)) - { - set.remove(a); - } - } - } - } - } - - pub fn remove_by_c(&mut self, c: &C) { - if let Some(map) = self.right_to_left.remove(c) { - for (b, set) in map { - for a in set { - if let Some(set) = self - .left_to_right - .get_mut(&a) - .and_then(|ltr| ltr.get_mut(&b)) - { - set.remove(c); - } - } - } - } - } - - pub fn has(&self, a: &A, b: &B, c: &C) -> bool { - self.left_to_right - .get(a) - .and_then(|ltr| ltr.get(b)) - .and_then(|ltr| ltr.get(c)) - .is_some() - } - - pub fn get_by_ab(&self, a: &A, b: &B) -> HashSet<&C> { - self.left_to_right - .get(a) - .and_then(|ltr| ltr.get(b)) - .map(|ltr| ltr.iter().map(|x| x.as_ref()).collect::>()) - .unwrap_or_default() - } - - pub fn get_by_cb(&self, c: &C, b: &B) -> HashSet<&A> { - self.right_to_left - .get(c) - .and_then(|rtl| rtl.get(b)) - .map(|rtl| rtl.iter().map(|x| x.as_ref()).collect::>()) - .unwrap_or_default() - } - - pub fn get_by_a(&self, a: &A) -> HashMap<&B, HashSet<&C>> { - self.left_to_right - .get(a) - .iter() - .flat_map(|x| x.iter()) - .map(|(b, c)| { - ( - b.as_ref(), - c.iter().map(|x| x.as_ref()).collect::>(), - ) - }) - .collect::<_>() - } - - pub fn get_by_c(&self, c: &C) -> HashMap<&B, HashSet<&A>> { - self.right_to_left - .get(c) - .iter() - .flat_map(|x| x.iter()) - .map(|(b, a)| { - ( - b.as_ref(), - a.iter().map(|x| x.as_ref()).collect::>(), - ) - }) - .collect::<_>() - } -} - -impl Default for BidThreeMap { - fn default() -> Self { - Self { - left_to_right: Default::default(), - right_to_left: Default::default(), - } - } -} - -//impl Ord for ObjectOrSet { -// fn cmp(&self, other: &Self) -> Ordering {} -//} -// -//impl Ord for ObjectRef {}cmp - -impl PartialOrd for Relation { - fn partial_cmp(&self, other: &Self) -> Option { - self.0.partial_cmp(&other.0) - } -} -impl Ord for Relation { - fn cmp(&self, other: &Self) -> Ordering { - self.0.cmp(&other.0) - } -} - -impl PartialOrd for ObjectOrSet { - fn partial_cmp(&self, other: &Self) -> Option { - match ( - self.object().partial_cmp(other.object()), - self.relation(), - other.relation(), - ) { - (Some(Ordering::Equal), Some(self_rel), Some(other_rel)) => { - self_rel.partial_cmp(other_rel) - } - (ord, _, _) => ord, - } - } -} -impl Ord for ObjectOrSet { - fn cmp(&self, other: &Self) -> Ordering { - self.object() - .cmp(other.object()) - .then(self.relation().cmp(&other.relation())) - } -} - -impl PartialOrd for Object { - fn partial_cmp(&self, other: &Self) -> Option { - match self.namespace.partial_cmp(&other.namespace) { - Some(core::cmp::Ordering::Equal) => self.id.partial_cmp(&other.id), - ord => ord, - } - } -} -impl Ord for Object { - fn cmp(&self, other: &Self) -> Ordering { - self.namespace - .cmp(&other.namespace) - .then(self.id.cmp(&other.id)) - } -} diff --git a/src/kafka_backend.rs b/src/kafka_backend.rs deleted file mode 100644 index a0f77e3..0000000 --- a/src/kafka_backend.rs +++ /dev/null @@ -1,145 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use kafka::{ - consumer::{Consumer, FetchOffset}, - producer::{Producer, Record, RequiredAcks}, -}; -use log::debug; -use serde::{Deserialize, Serialize}; -use tokio::{ - runtime::Runtime, - sync::{mpsc, RwLock}, - task::JoinHandle, -}; - -use crate::{ - graph::{Graph, ObjectRelation}, - object::{Object, ObjectOrSet, ObjectRef}, -}; - -#[derive(Serialize, Deserialize, Debug)] -pub enum Event { - AddObject(Object), - RemoveObject(Object), - AddRelation((ObjectOrSet, ObjectRelation)), - RemoveRelation((ObjectOrSet, ObjectRelation)), -} - -pub struct GraphProxy { - graph: Arc>, - producer_thread: JoinHandle<()>, - producer_tx: mpsc::Sender, - consumer_thread: JoinHandle<()>, -} -impl GraphProxy { - pub async fn run() -> Self { - let graph = Arc::new(RwLock::new(Graph::default())); - let (producer_tx, mut producer_rx) = mpsc::channel(1024); - - let mut producer = Producer::from_hosts(vec!["localhost:9092".to_owned()]) - .with_ack_timeout(Duration::from_secs(1)) - .with_required_acks(RequiredAcks::One) - .create() - .unwrap(); - - let producer_thread = tokio::spawn(async move { - loop { - if let Some(event) = producer_rx.recv().await { - let ser_event = serde_cbor::to_vec(&event).unwrap(); - producer - .send(&Record::from_value("gpm", ser_event)) - .unwrap(); - debug!("emitted Event: {:?}", event); - } - } - }); - - let mut consumer = Consumer::from_hosts(vec!["localhost:9092".to_string()]) - .with_client_id("gpm_dev".to_string()) - .with_topic("gpm".to_string()) - .with_fallback_offset(FetchOffset::Earliest) - .create() - .unwrap(); - let consumer_graph = graph.clone(); - let consumer_thread = tokio::task::spawn_blocking(move || { - let runtime = Runtime::new().unwrap(); - loop { - for msg_sets in consumer.poll().unwrap().iter() { - for msg in msg_sets.messages() { - let event: Event = serde_cbor::from_slice(msg.value).unwrap(); - debug!("received Event: {:?}", event); - let mut graph = runtime.block_on(consumer_graph.write()); - match event { - Event::AddObject(obj) => { - graph.add_node(obj); - } - Event::RemoveObject(obj) => { - graph.remove_node(obj); - } - Event::AddRelation((src, dst)) => { - graph.add_relation(src, dst); - } - Event::RemoveRelation((src, dst)) => { - graph.remove_relation(src, dst); - } - }; - } - consumer.consume_messageset(msg_sets).unwrap(); - } - consumer.commit_consumed().unwrap(); - } - }); - - Self { - graph, - producer_thread, - producer_tx, - consumer_thread, - } - } - - pub fn stop(&mut self) { - self.producer_thread.abort(); - self.consumer_thread.abort(); - } - - pub async fn add_node(&mut self, node: Object) { - self.producer_tx.send(Event::AddObject(node)).await.unwrap(); - } - pub async fn remove_node(&mut self, node: Object) { - self.producer_tx - .send(Event::RemoveObject(node)) - .await - .unwrap(); - } - pub async fn add_relation(&mut self, src: ObjectOrSet, dst: ObjectRelation) { - self.producer_tx - .send(Event::AddRelation((src, dst))) - .await - .unwrap(); - } - pub async fn remove_relation(&mut self, src: ObjectOrSet, dst: ObjectRelation) { - self.producer_tx - .send(Event::RemoveRelation((src, dst))) - .await - .unwrap(); - } - - pub async fn get_node(&self, namespace: &str, id: &str) -> Option { - let graph = self.graph.read().await; - graph.get_node(namespace, id) - } - - pub async fn is_related_to( - &self, - src: impl Into, - dst: impl Into, - ) -> bool { - let graph = self.graph.read().await; - graph.is_related_to(src, dst) - } - pub async fn related_by(&self, src: impl Into) -> Vec { - let graph = self.graph.read().await; - graph.related_by(src) - } -}