diff --git a/Cargo.lock b/Cargo.lock index a428c5e..68c7697 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,7 +164,7 @@ version = "0.1.0" dependencies = [ "anyhow", "futures", - "kameo", + "kameo 0.12.2", "tokio", ] @@ -233,7 +233,24 @@ dependencies = [ "dyn-clone", "futures", "itertools", - "kameo_macros", + "kameo_macros 0.12.2", + "once_cell", + "serde", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "kameo" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62237a96597618543798a36ec723eb75c5ac301e2690243fd600be1f5eb3dd2d" +dependencies = [ + "dyn-clone", + "futures", + "itertools", + "kameo_macros 0.13.0", "once_cell", "serde", "tokio", @@ -245,7 +262,7 @@ dependencies = [ name = "kameo-actors" version = "0.1.0" dependencies = [ - "kameo", + "kameo 0.12.2", "tokio", ] @@ -254,11 +271,21 @@ name = "kameo-delegate-forward" version = "0.1.0" dependencies = [ "futures", - "kameo", + "kameo 0.12.2", "rand", "tokio", ] +[[package]] +name = "kameo-streams" +version = "0.1.0" +dependencies = [ + "anyhow", + "futures", + "kameo 0.13.0", + "tokio", +] + [[package]] name = "kameo_macros" version = "0.12.2" @@ -272,6 +299,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "kameo_macros" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bbbd8e8d7b02bc67eae0dcbdb82c0a71cc7cc61734059ee3e7439a1ee1e0e85" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", + "uuid", +] + [[package]] name = "libc" version = "0.2.161" diff --git a/Cargo.toml b/Cargo.toml index 414ce66..4e597f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] resolver = "2" -members = ["alpha", "futures-unordered", "kameo-actors", "kameo-delegate-forward", "my-actors"] +members = ["alpha", "futures-unordered", "kameo-actors", "kameo-delegate-forward", "kameo-streams", "my-actors"] diff --git a/kameo-streams/Cargo.toml b/kameo-streams/Cargo.toml new file mode 100644 index 0000000..6a8edfb --- /dev/null +++ b/kameo-streams/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "kameo-streams" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.93" +futures = "0.3.31" +kameo = "0.13.0" +tokio = { version = "1.41.1", features = ["full"] } diff --git a/kameo-streams/src/main.rs b/kameo-streams/src/main.rs new file mode 100644 index 0000000..c925bcf --- /dev/null +++ b/kameo-streams/src/main.rs @@ -0,0 +1,51 @@ +use anyhow::Result; +use futures::channel::oneshot::{self, Sender}; +use kameo::message::{Context, Message, StreamMessage}; + +#[derive(kameo::Actor)] +struct MyActor; + +impl Message, (), ()>> for MyActor { + type Reply = (); + + async fn handle( + &mut self, + msg: StreamMessage, (), ()>, + _: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + match msg { + StreamMessage::Next(ch) => { + println!("Received item!"); + let _ = ch.send(2137); + } + StreamMessage::Started(()) => { + println!("Stream attached!"); + } + StreamMessage::Finished(()) => { + println!("Stream finished!"); + } + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let (ch1_snd, ch1_rcv) = oneshot::channel::(); + let (ch2_snd, ch2_rcv) = oneshot::channel::(); + let (ch3_snd, ch3_rcv) = oneshot::channel::(); + + let stream = futures::stream::iter(vec![ch1_snd, ch2_snd, ch3_snd]); + + let actor_ref = kameo::spawn(MyActor); + let _ = actor_ref + .attach_stream(stream, (), ()) + .await + .unwrap() + .unwrap(); + + println!("{}", ch1_rcv.await.unwrap()); + println!("{}", ch2_rcv.await.unwrap()); + println!("{}", ch3_rcv.await.unwrap()); + + Ok(()) +}