From fb4f7c59fd7897d193496e4ab60f6ffc17a7b302 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Krzy=C5=BCanowski?= Date: Mon, 11 Nov 2024 12:21:58 +0100 Subject: [PATCH] Update futures-unordered example --- Cargo.lock | 7 ++++ futures-unordered/Cargo.toml | 1 + futures-unordered/src/main.rs | 62 ++++++++++++++++++++++++++++++----- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5502530..6033721 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "anyhow" +version = "1.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" + [[package]] name = "autocfg" version = "1.4.0" @@ -150,6 +156,7 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" name = "futures-unordered" version = "0.1.0" dependencies = [ + "anyhow", "futures", "kameo", "tokio", diff --git a/futures-unordered/Cargo.toml b/futures-unordered/Cargo.toml index 7aa3530..27190b9 100644 --- a/futures-unordered/Cargo.toml +++ b/futures-unordered/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow = "1.0.93" futures = "0.3.31" kameo = "0.12.2" tokio = { version = "1.41.1", features = ["full"] } diff --git a/futures-unordered/src/main.rs b/futures-unordered/src/main.rs index 3d82473..8d4f304 100644 --- a/futures-unordered/src/main.rs +++ b/futures-unordered/src/main.rs @@ -1,22 +1,68 @@ -async fn func_wait(name: &str, millis: u64) { - println!("{name} started"); - tokio::time::sleep(Duration::from_millis(millis)).await; - println!("{name} finished"); +use std::time::Duration; + +use anyhow::{anyhow, bail}; +use futures::{stream::futures_unordered, FutureExt, StreamExt}; +use kameo::{message::Message, request::MessageSend, Actor}; +use anyhow::Ok; + +#[derive(Actor)] +struct ActorA; +struct AMessage; + +impl Message for ActorA { + type Reply = u64; + + async fn handle( + &mut self, + _: AMessage, + _: kameo::message::Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + 0 + } } #[tokio::main] async fn main() { - let actor_a = kameo::spawn(ActorA {}); + let actor_a = kameo::spawn(ActorA{}); let first = actor_a.ask(AMessage).send(); let second = actor_a.ask(AMessage).send(); let mut waiter = futures_unordered::FuturesUnordered::new(); - waiter.push(func_wait("first", 2000)); - waiter.push(func_wait("second", 3000)); - waiter.push(func_wait("third", 1000)); + + waiter.push(async { + let _ = first.await; + Ok(2137) + }.boxed()); + + waiter.push(async { + second.await.map_err(|e| anyhow!(e)) + }.boxed()); + + waiter.push(async { + func_wait("first", 2000).await; + bail!("Oh no"); + }.boxed()); + + waiter.push(async { + func_wait("second", 3000).await; + Ok(123) + }.boxed()); + + waiter.push(async { + func_wait("ey ey ey", 1000).await; + Ok(69) + }.boxed()); while let Some(res) = waiter.next().await { println!("{:?}", res); } } + +async fn func_wait(name: &str, millis: u64) -> String { + println!("{name} started"); + tokio::time::sleep(Duration::from_millis(millis)).await; + println!("{name} finished"); + + "2137".to_string() +}