atomr_accel_cuda/streams_pipeline.rs
//! `atomr-streams`-based pipeline helpers — the F10 successor to the
//! actor-based [`crate::pipeline::PipelineExecutor`].
//!
//! Wraps the Source / Sink DSL so callers can compose GPU kernel
//! stages with the rest of an `atomr-streams` graph (file IO, TCP,
//! framing, kill switches, restart-on-error supervision).
//!
//! The functions here are intentionally thin: they let you build a
//! `Source<I>` from any `tokio::sync::mpsc::UnboundedReceiver`, transform
//! it with a `map_async` stage that calls a user-supplied async function
//! (the GPU kernel call), and terminate with one of the built-in sinks.
//! For more complex topologies (broadcast, balance, partition), drop
//! straight into `atomr_streams::*` — this module's helpers stay out
//! of your way.
15
16use atomr_streams::{Sink, Source};
17
18/// Wrap a `tokio::sync::mpsc::UnboundedReceiver` as a streams `Source`.
19///
20/// Callers send work into the matching `UnboundedSender`; the source
21/// terminates when every sender is dropped.
22pub fn source_from_unbounded<T: Send + 'static>(
23 rx: tokio::sync::mpsc::UnboundedReceiver<T>,
24) -> Source<T> {
25 Source::from_receiver(rx)
26}
27
28/// Apply an async GPU stage with the given degree of parallelism.
29/// Ordering is preserved (akka.net's `SelectAsync`).
30///
31/// # Example
32///
33/// ```ignore
34/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<f32>();
35/// let s = source_from_unbounded(rx);
36/// let s = gpu_stage::<f32, f32, _, _>(s, 4, |x| async move { x * 2.0 });
37/// let out = Sink::collect(s).await;
38/// ```
39pub fn gpu_stage<I, O, F, Fut>(source: Source<I>, parallelism: usize, f: F) -> Source<O>
40where
41 I: Send + 'static,
42 O: Send + 'static,
43 F: FnMut(I) -> Fut + Send + 'static,
44 Fut: std::future::Future<Output = O> + Send + 'static,
45{
46 source.map_async(parallelism.max(1), f)
47}
48
49/// Run a single-stage pipeline end-to-end: pull from `rx`, apply the
50/// async `stage` with the given parallelism, and collect every output
51/// into a `Vec`. The future completes when every sender is dropped
52/// upstream.
53pub async fn run_collect<I, O, F, Fut>(
54 rx: tokio::sync::mpsc::UnboundedReceiver<I>,
55 parallelism: usize,
56 stage: F,
57) -> Vec<O>
58where
59 I: Send + 'static,
60 O: Send + 'static,
61 F: FnMut(I) -> Fut + Send + 'static,
62 Fut: std::future::Future<Output = O> + Send + 'static,
63{
64 let s = gpu_stage(source_from_unbounded(rx), parallelism, stage);
65 Sink::collect(s).await
66}
67
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn unbounded_round_trips_through_async_stage() {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
        (1..=5).for_each(|value| tx.send(value).unwrap());
        // Dropping the only sender lets the source terminate once drained.
        drop(tx);

        let mut outputs =
            run_collect::<u32, u32, _, _>(rx, 4, |value| async move { value * 10 }).await;
        // Sort first so the assertion does not depend on whether `map_async`
        // emits results in input order while several are in flight.
        outputs.sort_unstable();
        assert_eq!(outputs, [10, 20, 30, 40, 50]);
    }
}
86}