This commit is contained in:
Paul Zinselmeyer 2023-04-28 18:55:41 +02:00
commit 6cf9e62f00
Signed by: pfzetto
GPG key ID: 4EEF46A5B276E648
13 changed files with 2609 additions and 0 deletions

4
.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
/target
.env
graph.dat
graph.dat.bak

1444
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

18
Cargo.toml Normal file
View file

@ -0,0 +1,18 @@
[package]
name = "permission"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = { version="1.0", features=["derive"] }
tokio = { version = "1.27.0", features = ["full"] }
log = "0.4.17"
pretty_env_logger = "0.4.0"
dotenvy = "0.15.7"
tonic = { version="0.9.2", features=["tls"] }
prost = "0.11.9"
[build-dependencies]
tonic-build = "0.9.2"

6
build.rs Normal file
View file

@ -0,0 +1,6 @@
fn main() {
tonic_build::configure()
.build_server(true)
.compile(&["proto/graph.proto"], &["proto"])
.unwrap();
}

10
graph.dat Normal file
View file

@ -0,0 +1,10 @@
[user:bob]
[doc:foo]
read = [doc:foo#owner, user:bob]
owner = [user:alice]
[user:oscar]
[user:alice]

10
graph.dat.bak Normal file
View file

@ -0,0 +1,10 @@
[user:bob]
[doc:foo]
read = [user:bob, doc:foo#owner]
owner = [user:alice]
[user:oscar]
[user:alice]

73
proto/graph.proto Normal file
View file

@ -0,0 +1,73 @@
syntax = "proto3";
package eu.zettoit.graph;
service ObjectService{
rpc Create(Object) returns (Empty);
rpc Delete(Object) returns (Empty);
rpc Exists(Object) returns (ExistsResponse);
}
service RelationService {
rpc Create(Relation) returns (Empty);
rpc Delete(Relation) returns (Empty);
rpc Exists(Relation) returns (ExistsResponse);
}
service QueryService {
// check if one object or objectset is related to another by a relation
rpc IsRelatedTo(Relation) returns (IsRelatedToResponse);
// get all objects that are related to one object by a relation
rpc GetRelatedTo(Set) returns (GetRelatedToResponse);
// get all objects that the given object has a relation with
rpc GetRelations(GetRelationsRequest) returns (GetRelationsResponse);
}
message ExistsResponse {
bool exists = 1;
}
message IsRelatedToResponse{
bool related = 1;
}
message GetRelatedToResponse{
repeated Object objects = 1;
}
message GetRelationsRequest{
Object object = 1;
string relation = 2;
}
message GetRelationsResponse{
repeated Object objects = 1;
}
message Object{
string namespace = 1;
string id = 2;
}
message Set{
string namespace = 1;
string id = 2;
string relation = 3;
}
message ObjectOrSet {
oneof object_or_set{
Object object = 1;
Set set = 2;
};
}
message Relation{
oneof src{
Object src_obj = 1;
Set src_set = 2;
};
Object dst = 3;
string relation = 4;
}
message Empty{}

View file

@ -0,0 +1,20 @@
syntax = "proto3";
package eu.zettoit.graph;
message Object{
string namespace = 1;
string id = 2;
}
message ObjectSet{
Object object = 1;
string relation = 2;
}
message Relation{
oneof src{
Object object = 1;
ObjectSet object_set = 2;
}
ObjectSet dst = 3;
}

586
src/graph.rs Normal file
View file

@ -0,0 +1,586 @@
use std::{
collections::{
hash_map::{Iter, IterMut},
BinaryHeap, HashMap, HashSet,
},
hash::Hash,
ops::Deref,
sync::Arc,
};
use log::info;
use serde::{Deserialize, Serialize};
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
};
#[derive(Default)]
pub struct Graph {
nodes: BidMap<Object, ObjectRef>,
edges: BidThreeMap<ObjectOrSet, Relation, ObjectRef>,
counter: u32,
}
#[derive(Hash, PartialEq, Eq, Clone, Serialize, Deserialize, Debug)]
pub struct Object {
pub namespace: String,
pub id: String,
}
#[derive(Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ObjectRef(pub u32);
#[derive(PartialEq, Eq, Hash, Clone, Debug, Deserialize, Serialize)]
pub enum ObjectOrSet {
Object(ObjectRef),
Set((ObjectRef, Relation)),
}
#[derive(Hash, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Relation(String);
#[derive(PartialEq, Eq, Clone, Hash, Serialize, Deserialize, Debug)]
pub struct ObjectRelation(pub ObjectRef, pub Relation);
impl Object {
pub fn new(namespace: &str, id: &str) -> Self {
Self {
namespace: namespace.to_string(),
id: id.to_string(),
}
}
}
impl ObjectOrSet {
pub fn object_ref(&self) -> &ObjectRef {
match self {
ObjectOrSet::Object(obj) => obj,
ObjectOrSet::Set((obj, _)) => obj,
}
}
pub fn relation(&self) -> Option<&Relation> {
match self {
ObjectOrSet::Object(_) => None,
ObjectOrSet::Set((_, rel)) => Some(rel),
}
}
}
impl From<ObjectRef> for ObjectOrSet {
fn from(value: ObjectRef) -> Self {
Self::Object(value)
}
}
impl From<(ObjectRef, &str)> for ObjectOrSet {
fn from(value: (ObjectRef, &str)) -> Self {
Self::Set((value.0, Relation::new(value.1)))
}
}
impl Relation {
pub fn new(relation: &str) -> Self {
Self(relation.to_string())
}
}
impl Deref for Relation {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<(ObjectRef, Relation)> for ObjectRelation {
fn from(value: (ObjectRef, Relation)) -> Self {
Self(value.0, value.1)
}
}
impl From<(ObjectRef, &str)> for ObjectRelation {
fn from(value: (ObjectRef, &str)) -> Self {
Self(value.0, Relation::new(value.1))
}
}
impl Graph {
pub fn get_node(&self, namespace: &str, id: &str) -> Option<ObjectRef> {
self.nodes.get_by_a(&Object::new(namespace, id)).cloned()
}
pub fn object_from_ref(&self, obj: &ObjectRef) -> Object {
self.nodes.get_by_b(obj).unwrap().clone()
}
pub fn add_node(&mut self, node: Object) -> ObjectRef {
let obj_ref = ObjectRef(self.counter);
self.nodes.insert(node, obj_ref);
self.counter += 1;
obj_ref
}
pub fn remove_node(&mut self, node: Object) {
let index = self.nodes.remove_by_a(&node);
if let Some(index) = index {
self.edges.remove_by_c(&index);
self.edges.get_by_a(&ObjectOrSet::Object(*index));
//TODO: remove edges with ObjectOrSet::Set
}
}
pub fn has_relation(&self, src: ObjectOrSet, dst: ObjectRelation) -> bool {
self.edges.has(&src, &dst.1, &dst.0)
}
pub fn add_relation(&mut self, src: ObjectOrSet, dst: ObjectRelation) {
self.edges.insert(src, dst.1, dst.0);
}
pub fn remove_relation(&mut self, src: ObjectOrSet, dst: ObjectRelation) {
self.edges.remove(&src, &dst.1, &dst.0);
}
pub fn is_related_to(
&self,
src: impl Into<ObjectOrSet>,
dst: impl Into<ObjectRelation>,
) -> bool {
let src = src.into();
let dst = dst.into();
let mut dist: HashMap<ObjectRelation, u32> = HashMap::new();
let mut q: BinaryHeap<ObjectRelationDist> = BinaryHeap::new();
for neighbor in self
.edges
.get_by_a(&src)
.iter()
.flat_map(|(r, m)| m.iter().map(|x| ObjectRelation(**x, (**r).clone())))
{
if neighbor == dst {
return true;
}
dist.insert(neighbor.clone(), 1);
q.push(ObjectRelationDist(1, neighbor.clone()));
}
while let Some(ObjectRelationDist(node_dist, node)) = q.pop() {
let node_dist = node_dist + 1;
let node = ObjectOrSet::Set((node.0, node.1));
for neighbor in self
.edges
.get_by_a(&node)
.iter()
.flat_map(|(r, m)| m.iter().map(|x| ObjectRelation(**x, (**r).clone())))
{
if neighbor == dst {
return true;
}
if let Some(existing_node_dist) = dist.get(&neighbor) {
if *existing_node_dist < node_dist {
continue;
}
}
dist.insert(neighbor.clone(), node_dist);
q.push(ObjectRelationDist(node_dist, neighbor.clone()));
}
}
false
}
pub fn related_to(&self, dst: ObjectRef, relation: Relation) -> HashSet<ObjectRef> {
let mut relation_sets = vec![];
let mut relations: HashSet<ObjectRef> = HashSet::new();
for obj in self.edges.get_by_cb(&dst, &relation) {
match obj {
ObjectOrSet::Object(obj) => {
relations.insert(*obj);
}
ObjectOrSet::Set(set) => relation_sets.push(set),
}
}
while let Some(set) = relation_sets.pop() {
for obj in self.edges.get_by_cb(&set.0, &set.1) {
match obj {
ObjectOrSet::Object(obj) => {
relations.insert(*obj);
}
ObjectOrSet::Set(set) => relation_sets.push(set),
}
}
}
relations
}
pub fn relations(&self, src: impl Into<ObjectRelation>) -> HashSet<ObjectRef> {
let src: ObjectRelation = src.into();
let mut visited = HashSet::new();
let mut relation_sets = vec![];
let mut relations = HashSet::new();
for (rel, neighbors) in self.edges.get_by_a(&ObjectOrSet::Object(src.0)) {
for neighbor in neighbors {
if *rel == src.1 {
relations.insert(*neighbor);
}
relation_sets.push((rel, neighbor));
}
}
while let Some((rel, obj_ref)) = relation_sets.pop() {
if !visited.contains(&(rel, obj_ref)) {
for (rel, neighbors) in self
.edges
.get_by_a(&ObjectOrSet::Set((*obj_ref, (*rel).clone())))
{
for neighbor in neighbors {
if *rel == src.1 {
relations.insert(*neighbor);
}
relation_sets.push((rel, neighbor));
}
}
visited.insert((rel, obj_ref));
}
}
relations
}
pub async fn to_file(&self, file: &mut File) {
info!("writing graph to file");
for (obj, obj_ref) in self.nodes.iter() {
file.write_all(format!("[{}:{}]\n", &obj.namespace, &obj.id).as_bytes())
.await
.unwrap();
for (rel, arr) in self.edges.get_by_c(obj_ref.as_ref()) {
let arr = arr
.iter()
.filter_map(|x| {
let obj_ref = x.object_ref();
self.nodes.get_by_b(obj_ref).map(|obj| {
let (namespace, id) = (&obj.namespace, &obj.id);
match x.relation() {
None => format!("{}:{}", &namespace, &id),
Some(rel) => format!("{}:{}#{}", &namespace, &id, &rel.0),
}
})
})
.reduce(|acc, e| acc + ", " + &e)
.unwrap_or_default();
file.write_all(format!("{} = [{}]\n", &rel.0, &arr).as_bytes())
.await
.unwrap();
}
file.write_all("\n".as_bytes()).await.unwrap();
}
}
pub async fn from_file(file: &mut File) -> Self {
info!("reading graph from file");
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut graph = Graph::default();
let mut node: Option<ObjectRef> = None;
let mut relations = vec![];
while let Ok(Some(line)) = lines.next_line().await {
if line.starts_with('[') && line.ends_with(']') {
let line = &mut line[1..line.len() - 1].split(':');
let obj_ref =
graph.add_node(Object::new(line.next().unwrap(), line.next().unwrap()));
node = Some(obj_ref);
} else if line.contains('=') && line.contains('[') && line.contains(']') {
if let Some(dst) = node {
let equals_pos = line.find('=').unwrap();
let arr_start = line.find('[').unwrap();
let arr_stop = line.find(']').unwrap();
let rel = line[..equals_pos].trim();
let arr = line[arr_start + 1..arr_stop].split(", ");
for obj in arr {
let (src_namespace, src_id, src_rel) = if obj.contains('#') {
let sep_1 = obj.find(':').unwrap();
let sep_2 = obj.find('#').unwrap();
let namespace = &obj[..sep_1];
let id = &obj[sep_1 + 1..sep_2];
let rel = &obj[sep_2 + 1..];
(namespace, id, Some(rel))
} else {
let sep_1 = obj.find(':').unwrap();
let namespace = &obj[..sep_1];
let id = &obj[sep_1 + 1..];
(namespace, id, None)
};
relations.push((
src_namespace.to_string(),
src_id.to_string(),
src_rel.map(String::from),
dst,
rel.to_string(),
));
}
}
}
}
for relation in relations {
let src = match relation.2 {
Some(rel) => {
let obj = graph.get_node(&relation.0, &relation.1).unwrap();
ObjectOrSet::Set((obj, Relation::new(&rel)))
}
None => {
let obj = graph.get_node(&relation.0, &relation.1).unwrap();
ObjectOrSet::Object(obj)
}
};
graph.add_relation(src, ObjectRelation(relation.3, Relation(relation.4)));
}
graph
}
}
/// Helper Struct used for Dijkstra
#[derive(PartialEq, Eq)]
struct ObjectRelationDist(u32, ObjectRelation);
impl Ord for ObjectRelationDist {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.0.cmp(&self.0)
}
}
impl PartialOrd for ObjectRelationDist {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(other.0.cmp(&self.0))
}
}
pub struct BidMap<A, B> {
left_to_right: HashMap<Arc<A>, Arc<B>>,
right_to_left: HashMap<Arc<B>, Arc<A>>,
}
impl<A, B> Default for BidMap<A, B> {
fn default() -> Self {
Self {
left_to_right: Default::default(),
right_to_left: Default::default(),
}
}
}
impl<A, B> BidMap<A, B>
where
A: Eq + Hash,
B: Eq + Hash,
{
pub fn new() -> Self {
Self {
left_to_right: HashMap::new(),
right_to_left: HashMap::new(),
}
}
pub fn insert(&mut self, a: A, b: B) {
let a = Arc::new(a);
let b = Arc::new(b);
self.left_to_right.insert(a.clone(), b.clone());
self.right_to_left.insert(b, a);
}
pub fn remove_by_a(&mut self, a: &A) -> Option<Arc<B>> {
if let Some(b) = self.left_to_right.remove(a) {
self.right_to_left.remove(&b);
Some(b)
} else {
None
}
}
pub fn remove_by_b(&mut self, b: &B) -> Option<Arc<A>> {
if let Some(a) = self.right_to_left.remove(b) {
self.left_to_right.remove(&a);
Some(a)
} else {
None
}
}
pub fn get_by_a(&self, a: &A) -> Option<&B> {
self.left_to_right.get(a).map(Deref::deref)
}
pub fn get_by_b(&self, b: &B) -> Option<&A> {
self.right_to_left.get(b).map(Deref::deref)
}
pub fn iter(&self) -> Iter<Arc<A>, Arc<B>> {
self.left_to_right.iter()
}
pub fn iter_mut(&mut self) -> IterMut<Arc<A>, Arc<B>> {
self.left_to_right.iter_mut()
}
}
pub struct BidThreeMap<A, B, C> {
left_to_right: HashMap<Arc<A>, HashMap<Arc<B>, HashSet<Arc<C>>>>,
right_to_left: HashMap<Arc<C>, HashMap<Arc<B>, HashSet<Arc<A>>>>,
}
impl<A, B, C> BidThreeMap<A, B, C>
where
A: Eq + Hash,
B: Eq + Hash,
C: Eq + Hash,
{
pub fn new() -> Self {
Self {
left_to_right: HashMap::new(),
right_to_left: HashMap::new(),
}
}
pub fn insert(&mut self, a: A, b: B, c: C) {
let a = Arc::new(a);
let b = Arc::new(b);
let c = Arc::new(c);
if let Some(middle) = self.left_to_right.get_mut(&a) {
if let Some(right) = middle.get_mut(&b) {
right.insert(c.clone());
} else {
let mut right = HashSet::new();
right.insert(c.clone());
middle.insert(b.clone(), right);
}
} else {
let mut middle = HashMap::new();
let mut right = HashSet::new();
right.insert(c.clone());
middle.insert(b.clone(), right);
self.left_to_right.insert(a.clone(), middle);
}
if let Some(middle) = self.right_to_left.get_mut(&c) {
if let Some(left) = middle.get_mut(&b) {
left.insert(a);
} else {
let mut left = HashSet::new();
left.insert(a);
middle.insert(b, left);
}
} else {
let mut middle = HashMap::new();
let mut left = HashSet::new();
left.insert(a);
middle.insert(b, left);
self.right_to_left.insert(c, middle);
}
}
pub fn remove(&mut self, a: &A, b: &B, c: &C) {
if let Some(right) = self.left_to_right.get_mut(a).and_then(|ltr| ltr.get_mut(b)) {
right.remove(c);
}
if let Some(left) = self.right_to_left.get_mut(c).and_then(|rtl| rtl.get_mut(b)) {
left.remove(a);
}
}
pub fn remove_by_a(&mut self, a: &A) {
if let Some(map) = self.left_to_right.remove(a) {
for (b, set) in map {
for c in set {
if let Some(set) = self
.right_to_left
.get_mut(&c)
.and_then(|ltr| ltr.get_mut(&b))
{
set.remove(a);
}
}
}
}
}
pub fn remove_by_c(&mut self, c: &C) {
if let Some(map) = self.right_to_left.remove(c) {
for (b, set) in map {
for a in set {
if let Some(set) = self
.left_to_right
.get_mut(&a)
.and_then(|ltr| ltr.get_mut(&b))
{
set.remove(c);
}
}
}
}
}
pub fn has(&self, a: &A, b: &B, c: &C) -> bool {
self.left_to_right
.get(a)
.and_then(|ltr| ltr.get(b))
.and_then(|ltr| ltr.get(c))
.is_some()
}
pub fn get_by_ab(&self, a: &A, b: &B) -> HashSet<&C> {
self.left_to_right
.get(a)
.and_then(|ltr| ltr.get(b))
.map(|ltr| ltr.iter().map(|x| x.as_ref()).collect::<HashSet<_>>())
.unwrap_or_default()
}
pub fn get_by_cb(&self, c: &C, b: &B) -> HashSet<&A> {
self.right_to_left
.get(c)
.and_then(|rtl| rtl.get(b))
.map(|rtl| rtl.iter().map(|x| x.as_ref()).collect::<HashSet<_>>())
.unwrap_or_default()
}
pub fn get_by_a(&self, a: &A) -> HashMap<&B, HashSet<&C>> {
self.left_to_right
.get(a)
.iter()
.flat_map(|x| x.iter())
.map(|(b, c)| {
(
b.as_ref(),
c.iter().map(|x| x.as_ref()).collect::<HashSet<&C>>(),
)
})
.collect::<_>()
}
pub fn get_by_c(&self, c: &C) -> HashMap<&B, HashSet<&A>> {
self.right_to_left
.get(c)
.iter()
.flat_map(|x| x.iter())
.map(|(b, a)| {
(
b.as_ref(),
a.iter().map(|x| x.as_ref()).collect::<HashSet<&A>>(),
)
})
.collect::<_>()
}
}
impl<A, B, C> Default for BidThreeMap<A, B, C> {
fn default() -> Self {
Self {
left_to_right: Default::default(),
right_to_left: Default::default(),
}
}
}

1
src/graph_permissions.rs Normal file
View file

@ -0,0 +1 @@
tonic::include_proto!("eu.zettoit.graph");

230
src/grpc_service.rs Normal file
View file

@ -0,0 +1,230 @@
use std::sync::Arc;
use log::info;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
use tonic::{Request, Response, Status};
use crate::graph::{self, Graph, ObjectRelation};
use crate::graph_permissions::{
object_service_server::ObjectService, query_service_server::QueryService, relation::Src,
relation_service_server::RelationService, Empty, ExistsResponse, GetRelatedToResponse,
GetRelationsRequest, GetRelationsResponse, IsRelatedToResponse, Object, Relation, Set,
};
#[derive(Clone)]
pub struct GraphService {
pub graph: Arc<Mutex<Graph>>,
pub save_trigger: Sender<()>,
}
#[tonic::async_trait]
impl ObjectService for GraphService {
async fn create(&self, request: Request<Object>) -> Result<Response<Empty>, Status> {
let mut graph = self.graph.lock().await;
if request.get_ref().namespace.is_empty() || request.get_ref().id.is_empty() {
return Err(Status::invalid_argument("namespace and id must be set"));
}
graph.add_node(graph::Object::new(
&request.get_ref().namespace,
&request.get_ref().id,
));
info!(
"created object {}:{}",
&request.get_ref().namespace,
&request.get_ref().id
);
self.save_trigger.send(()).await.unwrap();
Ok(Response::new(Empty {}))
}
async fn delete(&self, request: Request<Object>) -> Result<Response<Empty>, Status> {
let mut graph = self.graph.lock().await;
if request.get_ref().namespace.is_empty() || request.get_ref().id.is_empty() {
return Err(Status::invalid_argument("namespace and id must be set"));
}
graph.remove_node(graph::Object::new(
&request.get_ref().namespace,
&request.get_ref().id,
));
info!(
"removed object {}:{}",
&request.get_ref().namespace,
&request.get_ref().id
);
self.save_trigger.send(()).await.unwrap();
Ok(Response::new(Empty {}))
}
async fn exists(&self, request: Request<Object>) -> Result<Response<ExistsResponse>, Status> {
let graph = self.graph.lock().await;
if request.get_ref().namespace.is_empty() || request.get_ref().id.is_empty() {
return Err(Status::invalid_argument("namespace and id must be set"));
}
let exists = graph
.get_node(&request.get_ref().namespace, &request.get_ref().id)
.is_some();
Ok(Response::new(ExistsResponse { exists }))
}
}
#[tonic::async_trait]
impl RelationService for GraphService {
async fn create(&self, request: Request<Relation>) -> Result<Response<Empty>, Status> {
let mut graph = self.graph.lock().await;
let (src, dst) = transform_relation(request.get_ref(), &graph)?;
graph.add_relation(src, dst);
info!("created relation");
self.save_trigger.send(()).await.unwrap();
Ok(Response::new(Empty {}))
}
async fn delete(&self, request: Request<Relation>) -> Result<Response<Empty>, Status> {
let mut graph = self.graph.lock().await;
let (src, dst) = transform_relation(request.get_ref(), &graph)?;
graph.remove_relation(src, dst);
info!("removed relation relation");
self.save_trigger.send(()).await.unwrap();
Ok(Response::new(Empty {}))
}
async fn exists(&self, request: Request<Relation>) -> Result<Response<ExistsResponse>, Status> {
let graph = self.graph.lock().await;
let (src, dst) = transform_relation(request.get_ref(), &graph)?;
let exists = graph.has_relation(src, dst);
Ok(Response::new(ExistsResponse { exists }))
}
}
#[tonic::async_trait]
impl QueryService for GraphService {
async fn is_related_to(
&self,
request: Request<Relation>,
) -> Result<Response<IsRelatedToResponse>, Status> {
let graph = self.graph.lock().await;
let (src, dst) = transform_relation(request.get_ref(), &graph)?;
let related = graph.is_related_to(src, dst);
Ok(Response::new(IsRelatedToResponse { related }))
}
async fn get_related_to(
&self,
request: Request<Set>,
) -> Result<Response<GetRelatedToResponse>, Status> {
let graph = self.graph.lock().await;
let obj = graph
.get_node(&request.get_ref().namespace, &request.get_ref().id)
.ok_or(Status::not_found("object not found"))?;
let rel = graph::Relation::new(&request.get_ref().relation);
Ok(Response::new(GetRelatedToResponse {
objects: graph
.related_to(obj, rel)
.into_iter()
.map(|x| {
let obj = graph.object_from_ref(&x);
Object {
namespace: obj.namespace.to_string(),
id: obj.id,
}
})
.collect::<Vec<_>>(),
}))
}
async fn get_relations(
&self,
request: Request<GetRelationsRequest>,
) -> Result<Response<GetRelationsResponse>, Status> {
let graph = self.graph.lock().await;
if request.get_ref().relation.is_empty() {
return Err(Status::invalid_argument("relation must be set"));
}
let obj = request
.get_ref()
.object
.as_ref()
.ok_or(Status::invalid_argument("object must be set"))?;
let obj = graph
.get_node(&obj.namespace, &obj.id)
.ok_or(Status::not_found("object not found"))?;
Ok(Response::new(GetRelationsResponse {
objects: graph
.relations(ObjectRelation(
obj,
graph::Relation::new(&request.get_ref().relation),
))
.into_iter()
.map(|x| {
let obj = graph.object_from_ref(&x);
Object {
namespace: obj.namespace.to_string(),
id: obj.id,
}
})
.collect::<Vec<_>>(),
}))
}
}
fn transform_relation(
rel: &Relation,
graph: &Graph,
) -> Result<(graph::ObjectOrSet, graph::ObjectRelation), Status> {
let src = match rel
.src
.as_ref()
.ok_or(Status::invalid_argument("src must be set"))?
{
Src::SrcObj(object) => graph::ObjectOrSet::Object(
graph
.get_node(&object.namespace, &object.id)
.ok_or(Status::not_found("src object could not be found"))?,
),
Src::SrcSet(set) => graph::ObjectOrSet::Set((
graph
.get_node(&set.namespace, &set.id)
.ok_or(Status::not_found("src object could not be found"))?,
graph::Relation::new(&set.relation),
)),
};
let dst = rel
.dst
.as_ref()
.ok_or(Status::invalid_argument("dst must be set"))?;
let dst = graph
.get_node(&dst.namespace, &dst.id)
.ok_or(Status::not_found("dst object could not be found"))?;
let dst = ObjectRelation(dst, graph::Relation::new(&rel.relation));
Ok((src, dst))
}

145
src/kafka_backend.rs Normal file
View file

@ -0,0 +1,145 @@
use std::{sync::Arc, time::Duration};
use kafka::{
consumer::{Consumer, FetchOffset},
producer::{Producer, Record, RequiredAcks},
};
use log::debug;
use serde::{Deserialize, Serialize};
use tokio::{
runtime::Runtime,
sync::{mpsc, RwLock},
task::JoinHandle,
};
use crate::{
graph::{Graph, ObjectRelation},
object::{Object, ObjectOrSet, ObjectRef},
};
#[derive(Serialize, Deserialize, Debug)]
pub enum Event {
AddObject(Object),
RemoveObject(Object),
AddRelation((ObjectOrSet, ObjectRelation)),
RemoveRelation((ObjectOrSet, ObjectRelation)),
}
pub struct GraphProxy {
graph: Arc<RwLock<Graph>>,
producer_thread: JoinHandle<()>,
producer_tx: mpsc::Sender<Event>,
consumer_thread: JoinHandle<()>,
}
impl GraphProxy {
pub async fn run() -> Self {
let graph = Arc::new(RwLock::new(Graph::default()));
let (producer_tx, mut producer_rx) = mpsc::channel(1024);
let mut producer = Producer::from_hosts(vec!["localhost:9092".to_owned()])
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(RequiredAcks::One)
.create()
.unwrap();
let producer_thread = tokio::spawn(async move {
loop {
if let Some(event) = producer_rx.recv().await {
let ser_event = serde_cbor::to_vec(&event).unwrap();
producer
.send(&Record::from_value("gpm", ser_event))
.unwrap();
debug!("emitted Event: {:?}", event);
}
}
});
let mut consumer = Consumer::from_hosts(vec!["localhost:9092".to_string()])
.with_client_id("gpm_dev".to_string())
.with_topic("gpm".to_string())
.with_fallback_offset(FetchOffset::Earliest)
.create()
.unwrap();
let consumer_graph = graph.clone();
let consumer_thread = tokio::task::spawn_blocking(move || {
let runtime = Runtime::new().unwrap();
loop {
for msg_sets in consumer.poll().unwrap().iter() {
for msg in msg_sets.messages() {
let event: Event = serde_cbor::from_slice(msg.value).unwrap();
debug!("received Event: {:?}", event);
let mut graph = runtime.block_on(consumer_graph.write());
match event {
Event::AddObject(obj) => {
graph.add_node(obj);
}
Event::RemoveObject(obj) => {
graph.remove_node(obj);
}
Event::AddRelation((src, dst)) => {
graph.add_relation(src, dst);
}
Event::RemoveRelation((src, dst)) => {
graph.remove_relation(src, dst);
}
};
}
consumer.consume_messageset(msg_sets).unwrap();
}
consumer.commit_consumed().unwrap();
}
});
Self {
graph,
producer_thread,
producer_tx,
consumer_thread,
}
}
pub fn stop(&mut self) {
self.producer_thread.abort();
self.consumer_thread.abort();
}
pub async fn add_node(&mut self, node: Object) {
self.producer_tx.send(Event::AddObject(node)).await.unwrap();
}
pub async fn remove_node(&mut self, node: Object) {
self.producer_tx
.send(Event::RemoveObject(node))
.await
.unwrap();
}
pub async fn add_relation(&mut self, src: ObjectOrSet, dst: ObjectRelation) {
self.producer_tx
.send(Event::AddRelation((src, dst)))
.await
.unwrap();
}
pub async fn remove_relation(&mut self, src: ObjectOrSet, dst: ObjectRelation) {
self.producer_tx
.send(Event::RemoveRelation((src, dst)))
.await
.unwrap();
}
pub async fn get_node(&self, namespace: &str, id: &str) -> Option<ObjectRef> {
let graph = self.graph.read().await;
graph.get_node(namespace, id)
}
pub async fn is_related_to(
&self,
src: impl Into<ObjectOrSet>,
dst: impl Into<ObjectRelation>,
) -> bool {
let graph = self.graph.read().await;
graph.is_related_to(src, dst)
}
pub async fn related_by(&self, src: impl Into<ObjectRelation>) -> Vec<ObjectOrSet> {
let graph = self.graph.read().await;
graph.related_by(src)
}
}

62
src/main.rs Normal file
View file

@ -0,0 +1,62 @@
use std::{sync::Arc, time::Duration};
use graph::Graph;
use grpc_service::GraphService;
use tokio::{
fs::{self, File},
select,
sync::{mpsc::channel, Mutex},
};
use tonic::transport::Server;
pub mod graph;
pub mod graph_permissions;
pub mod grpc_service;
use crate::graph_permissions::{
object_service_server::ObjectServiceServer, query_service_server::QueryServiceServer,
relation_service_server::RelationServiceServer,
};
#[tokio::main]
async fn main() {
dotenvy::dotenv().ok();
pretty_env_logger::init();
let graph = if let Ok(mut file) = File::open("graph.dat").await {
Graph::from_file(&mut file).await
} else {
Graph::default()
};
let graph = Arc::new(Mutex::new(graph));
let (save_tx, mut save_rx) = channel::<()>(32);
let save_thread_graph = graph.clone();
tokio::spawn(async move {
loop {
select! {
_ = tokio::time::sleep(Duration::from_secs(30)) => {}
_ = save_rx.recv() => {}
};
let graph = save_thread_graph.lock().await;
let _ = fs::copy("graph.dat", "graph.dat.bak").await;
let mut file = File::create("graph.dat").await.unwrap();
graph.to_file(&mut file).await;
}
});
let graph_service = GraphService {
graph: graph.clone(),
save_trigger: save_tx.clone(),
};
Server::builder()
.add_service(ObjectServiceServer::new(graph_service.clone()))
.add_service(RelationServiceServer::new(graph_service.clone()))
.add_service(QueryServiceServer::new(graph_service))
.serve("0.0.0.0:50051".parse().unwrap())
.await
.unwrap()
}