Add kameo-streams example

This commit is contained in:
Maciej Krzyżanowski 2024-11-16 01:13:39 +01:00
parent 841b9ebacb
commit a5f7f6c45e
4 changed files with 106 additions and 5 deletions

48
Cargo.lock generated
View File

@ -164,7 +164,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"futures",
"kameo",
"kameo 0.12.2",
"tokio",
]
@ -233,7 +233,24 @@ dependencies = [
"dyn-clone",
"futures",
"itertools",
"kameo_macros",
"kameo_macros 0.12.2",
"once_cell",
"serde",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
name = "kameo"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62237a96597618543798a36ec723eb75c5ac301e2690243fd600be1f5eb3dd2d"
dependencies = [
"dyn-clone",
"futures",
"itertools",
"kameo_macros 0.13.0",
"once_cell",
"serde",
"tokio",
@ -245,7 +262,7 @@ dependencies = [
name = "kameo-actors"
version = "0.1.0"
dependencies = [
"kameo",
"kameo 0.12.2",
"tokio",
]
@ -254,11 +271,21 @@ name = "kameo-delegate-forward"
version = "0.1.0"
dependencies = [
"futures",
"kameo",
"kameo 0.12.2",
"rand",
"tokio",
]
[[package]]
name = "kameo-streams"
version = "0.1.0"
dependencies = [
"anyhow",
"futures",
"kameo 0.13.0",
"tokio",
]
[[package]]
name = "kameo_macros"
version = "0.12.2"
@ -272,6 +299,19 @@ dependencies = [
"uuid",
]
[[package]]
name = "kameo_macros"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bbbd8e8d7b02bc67eae0dcbdb82c0a71cc7cc61734059ee3e7439a1ee1e0e85"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
"uuid",
]
[[package]]
name = "libc"
version = "0.2.161"

View File

@ -1,3 +1,3 @@
[workspace]
resolver = "2"
members = ["alpha", "futures-unordered", "kameo-actors", "kameo-delegate-forward", "my-actors"]
members = ["alpha", "futures-unordered", "kameo-actors", "kameo-delegate-forward", "kameo-streams", "my-actors"]

10
kameo-streams/Cargo.toml Normal file
View File

@ -0,0 +1,10 @@
[package]
name = "kameo-streams"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.93"
futures = "0.3.31"
kameo = "0.13.0"
tokio = { version = "1.41.1", features = ["full"] }

51
kameo-streams/src/main.rs Normal file
View File

@ -0,0 +1,51 @@
use anyhow::Result;
use futures::channel::oneshot::{self, Sender};
use kameo::message::{Context, Message, StreamMessage};
#[derive(kameo::Actor)]
struct MyActor;
impl Message<StreamMessage<Sender<u32>, (), ()>> for MyActor {
type Reply = ();
async fn handle(
&mut self,
msg: StreamMessage<Sender<u32>, (), ()>,
_: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
match msg {
StreamMessage::Next(ch) => {
println!("Received item!");
let _ = ch.send(2137);
}
StreamMessage::Started(()) => {
println!("Stream attached!");
}
StreamMessage::Finished(()) => {
println!("Stream finished!");
}
}
}
}
#[tokio::main]
async fn main() -> Result<()> {
let (ch1_snd, ch1_rcv) = oneshot::channel::<u32>();
let (ch2_snd, ch2_rcv) = oneshot::channel::<u32>();
let (ch3_snd, ch3_rcv) = oneshot::channel::<u32>();
let stream = futures::stream::iter(vec![ch1_snd, ch2_snd, ch3_snd]);
let actor_ref = kameo::spawn(MyActor);
let _ = actor_ref
.attach_stream(stream, (), ())
.await
.unwrap()
.unwrap();
println!("{}", ch1_rcv.await.unwrap());
println!("{}", ch2_rcv.await.unwrap());
println!("{}", ch3_rcv.await.unwrap());
Ok(())
}