Update futures-unordered example

This commit is contained in:
Maciej Krzyżanowski 2024-11-11 12:21:58 +01:00
parent 0f538ea7fc
commit fb4f7c59fd
3 changed files with 62 additions and 8 deletions

7
Cargo.lock generated
View File

@ -24,6 +24,12 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "anyhow"
version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775"
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.4.0" version = "1.4.0"
@ -150,6 +156,7 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
name = "futures-unordered" name = "futures-unordered"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"futures", "futures",
"kameo", "kameo",
"tokio", "tokio",

View File

@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.93"
futures = "0.3.31" futures = "0.3.31"
kameo = "0.12.2" kameo = "0.12.2"
tokio = { version = "1.41.1", features = ["full"] } tokio = { version = "1.41.1", features = ["full"] }

View File

@ -1,22 +1,68 @@
async fn func_wait(name: &str, millis: u64) { use std::time::Duration;
println!("{name} started");
tokio::time::sleep(Duration::from_millis(millis)).await; use anyhow::{anyhow, bail};
println!("{name} finished"); use futures::{stream::futures_unordered, FutureExt, StreamExt};
use kameo::{message::Message, request::MessageSend, Actor};
use anyhow::Ok;
#[derive(Actor)]
struct ActorA;
struct AMessage;
impl Message<AMessage> for ActorA {
type Reply = u64;
async fn handle(
&mut self,
_: AMessage,
_: kameo::message::Context<'_, Self, Self::Reply>,
) -> Self::Reply {
0
}
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let actor_a = kameo::spawn(ActorA {}); let actor_a = kameo::spawn(ActorA{});
let first = actor_a.ask(AMessage).send(); let first = actor_a.ask(AMessage).send();
let second = actor_a.ask(AMessage).send(); let second = actor_a.ask(AMessage).send();
let mut waiter = futures_unordered::FuturesUnordered::new(); let mut waiter = futures_unordered::FuturesUnordered::new();
waiter.push(func_wait("first", 2000));
waiter.push(func_wait("second", 3000)); waiter.push(async {
waiter.push(func_wait("third", 1000)); let _ = first.await;
Ok(2137)
}.boxed());
waiter.push(async {
second.await.map_err(|e| anyhow!(e))
}.boxed());
waiter.push(async {
func_wait("first", 2000).await;
bail!("Oh no");
}.boxed());
waiter.push(async {
func_wait("second", 3000).await;
Ok(123)
}.boxed());
waiter.push(async {
func_wait("ey ey ey", 1000).await;
Ok(69)
}.boxed());
while let Some(res) = waiter.next().await { while let Some(res) = waiter.next().await {
println!("{:?}", res); println!("{:?}", res);
} }
} }
async fn func_wait(name: &str, millis: u64) -> String {
println!("{name} started");
tokio::time::sleep(Duration::from_millis(millis)).await;
println!("{name} finished");
"2137".to_string()
}