From 50f86042900fe42ec0ba21ec382cfc5b5843b761 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Krzy=C5=BCanowski?= Date: Tue, 8 Oct 2024 11:41:29 +0200 Subject: [PATCH] Add beta --- src/bin/beta.rs | 149 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 src/bin/beta.rs diff --git a/src/bin/beta.rs b/src/bin/beta.rs new file mode 100644 index 0000000..4f97e1f --- /dev/null +++ b/src/bin/beta.rs @@ -0,0 +1,149 @@ +use tokio::sync::{mpsc::{Receiver, Sender}, oneshot}; + +struct DatabaseStore { + counter: u64, +} + +struct DatabaseClient { + queries: Sender, +} + +struct DatabaseManager { + store: DatabaseStore, + queries: Receiver, +} + +struct Database { + client: DatabaseClient, +} + +enum DatabaseQuery { + Increment{ + resp: oneshot::Sender, + }, + Decrement{ + resp: oneshot::Sender, + }, + Get{ + resp: oneshot::Sender, + }, +} + +enum DatabaseResponse { + Ok, + Value(u64), +} + +#[tokio::main] +async fn main() { + let db = Database::new(DatabaseStore::new()); + + if let Ok(counter) = db.client.get().await { + println!("{}", counter); + } + + if let Ok(_) = db.client.increment().await { + println!("incremented"); + } + + if let Ok(counter) = db.client.get().await { + println!("{}", counter); + } + + if let Ok(_) = db.client.decrement().await { + println!("decremented"); + } + + if let Ok(counter) = db.client.get().await { + println!("{}", counter); + } +} + +impl Database { + fn new(store_init: DatabaseStore) -> Database { + let (tx, rx) = tokio::sync::mpsc::channel(32); + let mut manager = DatabaseManager{ store: store_init, queries: rx }; + tokio::spawn(async move { + manager.run().await; + }); + + Database{ + client: DatabaseClient{ + queries: tx, + }, + } + } +} + +impl DatabaseManager { + async fn run(&mut self) { + loop { + if let Some(query) = self.queries.recv().await { + self.handle_query(query); + } else { + break; + } + } + } + + fn handle_query(&mut self, query: DatabaseQuery) { + match query { + DatabaseQuery::Increment{ resp } => { + self.store.counter += 1; + let _ = resp.send(DatabaseResponse::Ok); + } + DatabaseQuery::Decrement { resp } => { + self.store.counter -= 1; + let _ = resp.send(DatabaseResponse::Ok); + } + DatabaseQuery::Get { resp } => { + let _ = resp.send(DatabaseResponse::Value(self.store.counter)); + } + } + } +} + +impl DatabaseStore { + fn new() -> DatabaseStore { + DatabaseStore{ + counter: 0, + } + } +} + +impl DatabaseClient { + async fn increment(&self) -> Result<(), ()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self.queries.send(DatabaseQuery::Increment { resp: tx }).await; + + match rx.await { + Ok(_) => Ok(()), + Err(_) => Err(()) + } + } + + async fn decrement(&self) -> Result<(), ()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self.queries.send(DatabaseQuery::Decrement { resp: tx }).await; + + match rx.await { + Ok(_) => Ok(()), + Err(_) => Err(()) + } + } + + async fn get(&self) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self.queries.send(DatabaseQuery::Get { resp: tx }).await; + + match rx.await { + Ok(db_resp) => { + match db_resp { + DatabaseResponse::Value(value) => Ok(value), + _ => Err(()) + } + } + Err(_) => Err(()) + } + } +} \ No newline at end of file