Skip to main content

atomr_accel_cuda/stream/
per_actor.rs

//! `PerActorAllocator` — F2 default. Each `KernelActor` gets a fresh
//! `CudaStream`, maximising kernel concurrency at the cost of one
//! stream per actor (§5.7 default).
//!
//! Two construction modes:
//!
//! - [`PerActorAllocator::new`] — F1 back-compat: takes a pre-created
//!   stream and hands the same stream to every `acquire()` call.
//!   Equivalent to the F1 single-stream behaviour. Kept so the
//!   foundation tests / `BlasActor::props_legacy` keep working.
//! - [`PerActorAllocator::with_context`] — F2 fresh-stream mode.
//!   Holds an `Arc<CudaContext>` and mints a brand-new stream on
//!   every `acquire()`. cudarc 0.19 does not expose
//!   `cuStreamCreateWithPriority` at the safe layer, so this allocator
//!   currently ignores the `Priority` hint carried in `ActorHints`.
//!   Once cudarc exposes stream priorities, the hint can be honoured
//!   without changing this surface.

19use std::sync::{Arc, Weak};
20
21use cudarc::driver::{CudaContext, CudaStream};
22
23use super::{ActorHints, StreamAllocator};
24
25#[derive(Clone)]
26pub struct PerActorAllocator {
27    inner: Arc<PerActorInner>,
28}
29
30enum PerActorInner {
31    /// Hand the same stream to every caller. F1 behaviour.
32    Shared { stream: Arc<CudaStream> },
33    /// Mint a fresh stream per `acquire()`.
34    Fresh {
35        ctx: Arc<CudaContext>,
36        minted: parking_lot::Mutex<Vec<Weak<CudaStream>>>,
37    },
38}
39
40impl PerActorAllocator {
41    /// F1 back-compat: every `acquire()` returns the supplied stream.
42    pub fn new(stream: Arc<CudaStream>) -> Self {
43        Self {
44            inner: Arc::new(PerActorInner::Shared { stream }),
45        }
46    }
47
48    /// F2 default: each `acquire()` mints a fresh stream on the given
49    /// context. Tracks weak refs to the minted streams for diagnostics.
50    pub fn with_context(ctx: Arc<CudaContext>) -> Self {
51        Self {
52            inner: Arc::new(PerActorInner::Fresh {
53                ctx,
54                minted: parking_lot::Mutex::new(Vec::new()),
55            }),
56        }
57    }
58
59    /// Number of live streams this allocator has minted (Fresh mode
60    /// only). Returns 1 for Shared mode.
61    pub fn live_streams(&self) -> usize {
62        match self.inner.as_ref() {
63            PerActorInner::Shared { .. } => 1,
64            PerActorInner::Fresh { minted, .. } => {
65                let mut g = minted.lock();
66                g.retain(|w| w.strong_count() > 0);
67                g.len()
68            }
69        }
70    }
71}
72
73impl StreamAllocator for PerActorAllocator {
74    fn acquire(&self, _hints: ActorHints) -> Arc<CudaStream> {
75        match self.inner.as_ref() {
76            PerActorInner::Shared { stream } => stream.clone(),
77            PerActorInner::Fresh { ctx, minted } => {
78                // cudarc 0.19 doesn't expose stream priority at the
79                // safe layer; we ignore `_hints.priority` for now.
80                let s = ctx
81                    .new_stream()
82                    .unwrap_or_else(|e| panic!("ContextPoisoned: new_stream: {e}"));
83                minted.lock().push(Arc::downgrade(&s));
84                s
85            }
86        }
87    }
88}