// atomr_accel_cuda/host/pinned.rs

//! `PinnedBufferPool` actor + `PinnedBuf<T>` handle.
//!
//! Lifecycle:
//! 1. Construct the pool actor with [`PinnedBufferPool::props`].
//! 2. Send [`PinnedPoolMsg::Acquire`] (`len_bytes`, `reply`) — the actor
//!    pops a slot from the free-list (or grows up to `max_buffers`) and
//!    replies with an untyped [`PinnedBufHandle`]. Convert it with
//!    [`PinnedBufHandle::into_typed`] into a [`PinnedBuf<T>`], whose
//!    `Drop` sends an `InternalReturn` back to the pool.
//! 3. Use the buffer: `as_mut_slice()` to fill it, then move it into a
//!    `DeviceMsg::CopyToDeviceT` / `CopyToHostT` call.
//!
//! cudarc 0.19 only exposes the raw [`cudarc::driver::result::malloc_host`]
//! and [`cudarc::driver::result::free_host`] functions; there is no safe
//! wrapper. We build [`PinnedSlot`] around the raw pointer + capacity +
//! `Drop` and treat it as the moral equivalent of `Box<[u8]>` over pinned
//! memory.
17
18use std::collections::VecDeque;
19use std::ffi::c_void;
20use std::marker::PhantomData;
21
22use async_trait::async_trait;
23use atomr_core::actor::{Actor, ActorRef, Context, Props};
24use tokio::sync::{mpsc, oneshot};
25use tracing::{debug, warn};
26
27use crate::error::GpuError;
28
/// Sizing and growth policy for a [`PinnedBufferPool`].
#[derive(Debug, Clone, Copy)]
pub struct PinnedBufferPoolConfig {
    /// Number of buffers the pool is configured to start with.
    ///
    /// NOTE(review): in the code visible in this file the value is only
    /// logged at start-up; slots are minted lazily on acquire.
    pub initial_buffers: usize,
    /// Upper bound on minted slots. An acquire that would have to mint
    /// beyond this limit fails.
    pub max_buffers: usize,
    /// Capacity, in bytes, of every regular (pooled) slot.
    pub buffer_capacity_bytes: usize,
    /// When `true`, a request larger than `buffer_capacity_bytes` is
    /// served by a one-shot oversize allocation that is freed (never
    /// pooled) on release. When `false`, such a request fails.
    pub allow_oversize: bool,
}
39
40impl Default for PinnedBufferPoolConfig {
41    fn default() -> Self {
42        Self {
43            initial_buffers: 4,
44            max_buffers: 32,
45            buffer_capacity_bytes: 4 * 1024 * 1024,
46            allow_oversize: true,
47        }
48    }
49}
50
/// Point-in-time accounting snapshot reported by the pool.
#[derive(Debug, Clone, Copy)]
pub struct PinnedPoolStats {
    /// Slots currently handed out to callers.
    pub in_use: usize,
    /// Slots sitting on the free-list, ready for reuse.
    pub free: usize,
    /// Slots minted and not yet released back to the driver.
    pub total: usize,
    /// Sum of the capacities (bytes) of the slots counted in `total`.
    pub bytes_allocated: usize,
}
58
/// One pinned host allocation, owned exclusively by whoever holds the
/// slot (the pool's free-list or a buffer handle).
///
/// Wraps the raw region obtained from `cuMemHostAlloc`.
pub struct PinnedSlot {
    /// Start of the pinned region; nulled out once it has been freed.
    ptr: *mut c_void,
    /// Size of the region in bytes, as requested at allocation time.
    capacity_bytes: usize,
    /// `true` for a one-shot oversize allocation: on release it is
    /// freed instead of being put back on the free-list.
    oversize: bool,
}
67
68// SAFETY: pinned memory is host RAM. Sending the raw pointer between
69// threads is safe; dereferencing requires care which we constrain to
70// the actor + the holder of `PinnedBuf<T>`.
71unsafe impl Send for PinnedSlot {}
72unsafe impl Sync for PinnedSlot {}
73
74impl PinnedSlot {
75    fn new(capacity_bytes: usize, oversize: bool) -> Result<Self, GpuError> {
76        // `cuMemHostAlloc` flags: 0 = default (portable across devices
77        // off, write-combined off, mapped off). Sufficient for
78        // standard async memcpy.
79        let ptr = unsafe { cudarc::driver::result::malloc_host(capacity_bytes, 0) }
80            .map_err(|e| GpuError::OutOfMemory(format!("pinned alloc {capacity_bytes}B: {e}")))?;
81        Ok(Self {
82            ptr,
83            capacity_bytes,
84            oversize,
85        })
86    }
87
88    fn free(self) {
89        // Drop the slot; the Drop impl below frees the pinned memory.
90        drop(self);
91    }
92}
93
94impl Drop for PinnedSlot {
95    fn drop(&mut self) {
96        if !self.ptr.is_null() {
97            unsafe {
98                let _ = cudarc::driver::result::free_host(self.ptr);
99            }
100            self.ptr = std::ptr::null_mut();
101        }
102    }
103}
104
/// Pool-level generation counter used for invalidation.
///
/// The pool bumps it in `pre_restart`; every handle records the value that
/// was current when it was acquired, so a `PinnedBuf<T>` that outlives a
/// restart is recognised on return and is not pooled into the fresh
/// generation.
type PinnedGeneration = u64;
109
110/// Public messages.
111pub enum PinnedPoolMsg {
112    Acquire {
113        len_bytes: usize,
114        reply: oneshot::Sender<Result<PinnedBufHandle, GpuError>>,
115    },
116    /// Drop-driven return path used by `PinnedBuf::Drop`. Not part of
117    /// the public API for callers.
118    InternalReturn {
119        slot: PinnedSlot,
120        generation: PinnedGeneration,
121    },
122    Stats {
123        reply: oneshot::Sender<PinnedPoolStats>,
124    },
125}
126
127/// Untyped handle returned by `Acquire`. Convert to a typed
128/// [`PinnedBuf<T>`] via [`PinnedBufHandle::into_typed`].
129pub struct PinnedBufHandle {
130    slot: Option<PinnedSlot>,
131    generation: PinnedGeneration,
132    return_tx: mpsc::UnboundedSender<PinnedPoolMsg>,
133}
134
135impl PinnedBufHandle {
136    pub fn capacity_bytes(&self) -> usize {
137        self.slot.as_ref().map(|s| s.capacity_bytes).unwrap_or(0)
138    }
139
140    /// Convert to a typed buffer. `len` is the number of `T`s the
141    /// caller intends to use; must satisfy
142    /// `len * size_of::<T>() <= capacity_bytes`.
143    pub fn into_typed<T>(mut self, len: usize) -> Result<PinnedBuf<T>, GpuError> {
144        let needed = len.checked_mul(std::mem::size_of::<T>()).ok_or_else(|| {
145            GpuError::Unrecoverable("pinned buf: len * size_of overflowed".into())
146        })?;
147        if needed > self.capacity_bytes() {
148            return Err(GpuError::Unrecoverable(format!(
149                "pinned buf: requested {len} elements ({needed} B) exceeds capacity {} B",
150                self.capacity_bytes()
151            )));
152        }
153        let slot = self.slot.take().expect("PinnedBufHandle slot was None");
154        let ptr = slot.ptr as *mut T;
155        Ok(PinnedBuf {
156            inner: Some(PinnedBufInner {
157                slot,
158                len,
159                return_tx: self.return_tx.clone(),
160                generation: self.generation,
161            }),
162            ptr,
163            len,
164            _marker: PhantomData,
165        })
166    }
167}
168
169impl Drop for PinnedBufHandle {
170    fn drop(&mut self) {
171        // If into_typed wasn't called, return the slot ourselves.
172        if let Some(slot) = self.slot.take() {
173            let _ = self.return_tx.send(PinnedPoolMsg::InternalReturn {
174                slot,
175                generation: self.generation,
176            });
177        }
178    }
179}
180
181/// Typed pinned buffer.
182///
183/// Send + Sync — the raw pointer is page-locked host memory; the
184/// actor + this handle are the only writers, and the pool's
185/// generation gate prevents post-restart aliasing.
186pub struct PinnedBuf<T> {
187    inner: Option<PinnedBufInner>,
188    ptr: *mut T,
189    len: usize,
190    _marker: PhantomData<T>,
191}
192
193struct PinnedBufInner {
194    slot: PinnedSlot,
195    #[allow(dead_code)]
196    len: usize,
197    return_tx: mpsc::UnboundedSender<PinnedPoolMsg>,
198    generation: PinnedGeneration,
199}
200
201unsafe impl<T: Send> Send for PinnedBuf<T> {}
202unsafe impl<T: Sync> Sync for PinnedBuf<T> {}
203
204impl<T> PinnedBuf<T> {
205    pub fn len(&self) -> usize {
206        self.len
207    }
208
209    pub fn is_empty(&self) -> bool {
210        self.len == 0
211    }
212
213    pub fn as_ptr(&self) -> *const T {
214        self.ptr
215    }
216
217    pub fn as_mut_ptr(&mut self) -> *mut T {
218        self.ptr
219    }
220
221    /// View as a host slice. SAFETY: relies on the buffer being
222    /// initialized for the requested length. For zero-init you must
223    /// fill before reading.
224    pub fn as_slice(&self) -> &[T] {
225        // SAFETY: ptr was returned by cuMemHostAlloc and the slot
226        // owns it for at least the lifetime of `self`. len matches
227        // the typed view we requested in into_typed.
228        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
229    }
230
231    pub fn as_mut_slice(&mut self) -> &mut [T] {
232        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
233    }
234}
235
236impl<T: std::fmt::Debug> std::fmt::Debug for PinnedBuf<T> {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        f.debug_struct("PinnedBuf").field("len", &self.len).finish()
239    }
240}
241
242impl<T> Drop for PinnedBuf<T> {
243    fn drop(&mut self) {
244        if let Some(inner) = self.inner.take() {
245            let _ = inner.return_tx.send(PinnedPoolMsg::InternalReturn {
246                slot: inner.slot,
247                generation: inner.generation,
248            });
249        }
250    }
251}
252
253/// The pool actor.
254pub struct PinnedBufferPool {
255    config: PinnedBufferPoolConfig,
256    free: VecDeque<PinnedSlot>,
257    in_use: usize,
258    total_minted: usize,
259    bytes_allocated: usize,
260    /// Bumped on `pre_restart`. Slots returning with an old
261    /// generation are dropped instead of pooled.
262    generation: PinnedGeneration,
263    /// Sender used by `PinnedBuf::Drop` to post `InternalReturn`. We
264    /// keep one mpsc that funnels into `handle()` — the actor's own
265    /// mailbox is async-trait `tell`-only, but mpsc gives us
266    /// non-blocking sync sends from `Drop`.
267    return_tx: mpsc::UnboundedSender<PinnedPoolMsg>,
268    return_rx_observer: Option<ActorRef<PinnedPoolMsg>>,
269}
270
271impl PinnedBufferPool {
272    pub fn props(config: PinnedBufferPoolConfig) -> Props<Self> {
273        Props::create(move || {
274            let (tx, _rx) = mpsc::unbounded_channel();
275            // Initial buffers are minted lazily on first acquire; in
276            // F2 we trade a tiny first-acquire latency for simpler
277            // post_restart handling.
278            PinnedBufferPool {
279                config,
280                free: VecDeque::new(),
281                in_use: 0,
282                total_minted: 0,
283                bytes_allocated: 0,
284                generation: 0,
285                return_tx: tx,
286                return_rx_observer: None,
287            }
288        })
289    }
290
291    /// Test/diagnostic: snapshot stats without going through the
292    /// mailbox.
293    pub fn stats(&self) -> PinnedPoolStats {
294        PinnedPoolStats {
295            in_use: self.in_use,
296            free: self.free.len(),
297            total: self.total_minted,
298            bytes_allocated: self.bytes_allocated,
299        }
300    }
301
302    fn try_acquire(&mut self, len_bytes: usize) -> Result<PinnedBufHandle, GpuError> {
303        let cap = self.config.buffer_capacity_bytes;
304        let oversize = len_bytes > cap;
305
306        let slot = if oversize {
307            if !self.config.allow_oversize {
308                return Err(GpuError::OutOfMemory(format!(
309                    "pinned pool: oversize request {len_bytes}B exceeds slot capacity {cap}B"
310                )));
311            }
312            // One-shot allocation, freed on release.
313            self.bytes_allocated += len_bytes;
314            self.total_minted += 1;
315            PinnedSlot::new(len_bytes, true)?
316        } else if let Some(slot) = self.free.pop_front() {
317            slot
318        } else {
319            if self.total_minted >= self.config.max_buffers {
320                return Err(GpuError::OutOfMemory(format!(
321                    "pinned pool: max_buffers={} reached",
322                    self.config.max_buffers
323                )));
324            }
325            self.bytes_allocated += cap;
326            self.total_minted += 1;
327            PinnedSlot::new(cap, false)?
328        };
329
330        self.in_use += 1;
331        Ok(PinnedBufHandle {
332            slot: Some(slot),
333            generation: self.generation,
334            return_tx: self.return_tx.clone(),
335        })
336    }
337
338    fn return_slot(&mut self, slot: PinnedSlot, generation: PinnedGeneration) {
339        if generation != self.generation {
340            // Cross-generation return. Drop instead of pool to avoid
341            // mixing. The Drop impl on PinnedSlot frees the memory.
342            self.bytes_allocated = self.bytes_allocated.saturating_sub(slot.capacity_bytes);
343            self.total_minted = self.total_minted.saturating_sub(1);
344            slot.free();
345            return;
346        }
347        self.in_use = self.in_use.saturating_sub(1);
348        if slot.oversize {
349            // Don't pool oversize allocations.
350            self.bytes_allocated = self.bytes_allocated.saturating_sub(slot.capacity_bytes);
351            self.total_minted = self.total_minted.saturating_sub(1);
352            slot.free();
353        } else {
354            self.free.push_back(slot);
355        }
356    }
357}
358
359#[async_trait]
360impl Actor for PinnedBufferPool {
361    type Msg = PinnedPoolMsg;
362
363    async fn pre_start(&mut self, ctx: &mut Context<Self>) {
364        // Wire the mpsc → actor pump so Drop-driven returns funnel
365        // into `handle()`. We replace the sender with a fresh one and
366        // start a forwarder task that bridges the receiver into the
367        // actor's mailbox.
368        let (tx, mut rx) = mpsc::unbounded_channel::<PinnedPoolMsg>();
369        self.return_tx = tx;
370        let self_ref = ctx.self_ref().clone();
371        self.return_rx_observer = Some(self_ref.clone());
372        tokio::spawn(async move {
373            while let Some(msg) = rx.recv().await {
374                self_ref.tell(msg);
375            }
376        });
377        debug!(
378            initial = self.config.initial_buffers,
379            max = self.config.max_buffers,
380            cap = self.config.buffer_capacity_bytes,
381            "PinnedBufferPool started"
382        );
383    }
384
385    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: PinnedPoolMsg) {
386        match msg {
387            PinnedPoolMsg::Acquire { len_bytes, reply } => {
388                let r = self.try_acquire(len_bytes);
389                let _ = reply.send(r);
390            }
391            PinnedPoolMsg::InternalReturn { slot, generation } => {
392                self.return_slot(slot, generation);
393            }
394            PinnedPoolMsg::Stats { reply } => {
395                let _ = reply.send(self.stats());
396            }
397        }
398    }
399
400    async fn pre_restart(&mut self, _ctx: &mut Context<Self>, err: &str) {
401        warn!(%err, "PinnedBufferPool pre_restart — dropping all in-flight buffers");
402        // Drop the free-list. Slots that haven't returned yet will
403        // arrive with the pre-restart generation and be dropped (not
404        // pooled) by `return_slot`.
405        self.free.clear();
406        self.generation += 1;
407        self.in_use = 0;
408        self.total_minted = 0;
409        self.bytes_allocated = 0;
410    }
411
412    async fn post_stop(&mut self, _ctx: &mut Context<Self>) {
413        debug!("PinnedBufferPool post_stop");
414        // Slots in `self.free` drop here, freeing pinned memory via
415        // PinnedSlot::Drop.
416    }
417}