Skip to main content

atomr_accel_cuda/event/
mod.rs

1//! `EventActor` — typed actor surface around `CudaEvent`.
2//!
3//! cudarc 0.19 ships a safe `CudaEvent` (Record / Wait / Query /
4//! ElapsedTime / Synchronize) but doesn't expose IPC. This actor
5//! wraps both the safe layer and the sys-level
6//! `cuIpcGetEventHandle`/`cuIpcOpenEventHandle` for cross-process
7//! event sharing.
8//!
//! Lifecycle:
//! 1. Construct with `EventActor::props(ctx)`, which captures an
//!    `Arc<CudaContext>` so events can be created on demand.
//! 2. Send `Create { reply }` to obtain an `Event`, then
//!    `Record { event, stream, reply }` to enqueue that event on a stream.
//! 3. Send `Wait`, `Query`, `ElapsedTime`, or `Synchronize` as needed.
//! 4. (gated on `cuda-ipc`) Send `GetIpcHandle` in the source process,
//!    transmit the bytes over your application's channel, then send
//!    `OpenIpcHandle` in the destination process. With cudarc 0.19,
//!    `OpenIpcHandle` cannot yet return an `Event` and replies with an
//!    error; use the handle bytes with `cuStreamWaitEvent` directly instead.
19
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use atomr_core::actor::{Actor, Context, Props};
25use cudarc::driver::{CudaContext, CudaEvent, CudaStream};
26use parking_lot::Mutex;
27use tokio::sync::oneshot;
28
29use crate::error::GpuError;
30#[cfg(feature = "cuda-ipc")]
31use crate::sys::cuda_driver;
32#[cfg(feature = "cuda-ipc")]
33use cudarc::driver::sys as driver_sys;
34
/// Library tag placed in the `lib` field of every `GpuError::LibraryError`
/// built by this module.
const LIB: &str = "event";
36
37/// Typed handle to a `CudaEvent`. Cloneable via `Arc`; the underlying
38/// `cuEventDestroy` is run when the last clone drops.
39#[derive(Clone)]
40pub struct Event {
41    inner: Arc<EventInner>,
42}
43
44/// `CudaEvent` is `Send + Sync` per cudarc, but we wrap it in our own
45/// inner struct so external callers can't reach into the driver-level
46/// handle without going through the actor.
47struct EventInner {
48    event: CudaEvent,
49}
50
51impl Event {
52    /// Wrap an already-created `CudaEvent`. The actor uses this when
53    /// minting events on demand.
54    pub fn from_cuda(event: CudaEvent) -> Self {
55        Self {
56            inner: Arc::new(EventInner { event }),
57        }
58    }
59
60    /// Underlying cudarc handle. Public because some downstream callers
61    /// (`p2p`, `pipeline`) need to issue cross-stream waits via the
62    /// safe `stream.wait(&event)` shape.
63    pub fn cuda_event(&self) -> &CudaEvent {
64        &self.inner.event
65    }
66
67    /// Raw driver-level event handle. Used by the IPC path; do not
68    /// destroy the returned value.
69    #[cfg(feature = "cuda-ipc")]
70    pub fn cu_event(&self) -> driver_sys::CUevent {
71        self.inner.event.cu_event()
72    }
73}
74
/// Cross-process IPC handle for an event. It wraps the driver's
/// `CUipcEventHandle`, whose payload is a 64-byte `reserved` blob.
///
/// The type is `Clone + Copy`. It is also `Send + Sync`, since it holds
/// only that byte array. The contents are opaque and driver-defined
/// (possibly a process id, an event slot, and so on). Forward them
/// unchanged over your application's IPC channel and never interpret them.
#[cfg(feature = "cuda-ipc")]
#[derive(Clone, Copy)]
pub struct IpcEventHandle {
    pub(crate) raw: driver_sys::CUipcEventHandle,
}
86
#[cfg(feature = "cuda-ipc")]
impl IpcEventHandle {
    /// The 64 opaque payload bytes of this handle, e.g. for shipping to a
    /// peer process.
    ///
    /// `reserved` is `[c_char; 64]`. `c_char` is `i8` on some targets and
    /// `u8` on others, and `as u8` is a bit-for-bit reinterpretation in
    /// both cases. That makes this a safe, lossless conversion with no
    /// `unsafe` transmute.
    pub fn as_bytes(&self) -> [u8; 64] {
        self.raw.reserved.map(|c| c as u8)
    }

    /// Rebuild a handle from 64 payload bytes previously produced by
    /// [`IpcEventHandle::as_bytes`], e.g. bytes received via shared memory
    /// or a Unix domain socket.
    ///
    /// The bytes are not validated here. `b as c_char` is the exact inverse
    /// of the cast in `as_bytes`, so `from_bytes(h.as_bytes())` reproduces
    /// `h` exactly.
    pub fn from_bytes(bytes: [u8; 64]) -> Self {
        let raw = driver_sys::CUipcEventHandle_st {
            reserved: bytes.map(|b| b as std::ffi::c_char),
        };
        Self { raw }
    }
}
105
#[cfg(feature = "cuda-ipc")]
impl std::fmt::Debug for IpcEventHandle {
    /// Prints a hash of the 64 handle bytes rather than the bytes
    /// themselves. This keeps `{:?}` output short and stable across runs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let digest = fxhash(&self.as_bytes());
        f.debug_struct("IpcEventHandle")
            .field("bytes_hash", &digest)
            .finish()
    }
}
114
/// 64-bit FNV-1a hash of `bytes`.
///
/// Despite the name, this is FNV-1a rather than FxHash. It is hand-rolled
/// so that `IpcEventHandle`'s `Debug` output stays deterministic without
/// pulling in a hashing dependency.
#[cfg(feature = "cuda-ipc")]
fn fxhash(bytes: &[u8]) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes.iter().fold(FNV_OFFSET_BASIS, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
125
126pub enum EventMsg {
127    /// Create a new event. Reply with the typed `Event`.
128    Create {
129        reply: oneshot::Sender<Result<Event, GpuError>>,
130    },
131    /// Record `event` against the current work in `stream`.
132    Record {
133        event: Event,
134        stream: Arc<CudaStream>,
135        reply: oneshot::Sender<Result<(), GpuError>>,
136    },
137    /// Wait on `stream` until `event` completes.
138    Wait {
139        event: Event,
140        stream: Arc<CudaStream>,
141        reply: oneshot::Sender<Result<(), GpuError>>,
142    },
143    /// Non-blocking query: `true` if completed.
144    Query {
145        event: Event,
146        reply: oneshot::Sender<Result<bool, GpuError>>,
147    },
148    /// Elapsed wall-clock time between two events.
149    ElapsedTime {
150        start: Event,
151        end: Event,
152        reply: oneshot::Sender<Result<Duration, GpuError>>,
153    },
154    /// Block the calling task until `event` completes.
155    Synchronize {
156        event: Event,
157        reply: oneshot::Sender<Result<(), GpuError>>,
158    },
159    /// Export an IPC handle for `event` so a peer process can open it
160    /// via `OpenIpcHandle`.
161    #[cfg(feature = "cuda-ipc")]
162    GetIpcHandle {
163        event: Event,
164        reply: oneshot::Sender<Result<IpcEventHandle, GpuError>>,
165    },
166    /// Open an IPC handle minted by another process. The opened event
167    /// is bound to the actor's context.
168    #[cfg(feature = "cuda-ipc")]
169    OpenIpcHandle {
170        handle: IpcEventHandle,
171        reply: oneshot::Sender<Result<Event, GpuError>>,
172    },
173}
174
175struct SendCtx(Arc<CudaContext>);
176unsafe impl Send for SendCtx {}
177unsafe impl Sync for SendCtx {}
178
179#[allow(dead_code)]
180enum EventInnerActor {
181    Real { ctx: Mutex<SendCtx> },
182    Mock,
183}
184
185pub struct EventActor {
186    inner: EventInnerActor,
187}
188
189impl EventActor {
190    pub fn props(ctx: Arc<CudaContext>) -> Props<Self> {
191        Props::create(move || EventActor {
192            inner: EventInnerActor::Real {
193                ctx: Mutex::new(SendCtx(ctx.clone())),
194            },
195        })
196    }
197
198    pub fn mock_props() -> Props<Self> {
199        Props::create(|| EventActor {
200            inner: EventInnerActor::Mock,
201        })
202    }
203}
204
205#[async_trait]
206impl Actor for EventActor {
207    type Msg = EventMsg;
208
209    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: EventMsg) {
210        match &self.inner {
211            EventInnerActor::Mock => mock_reply(msg),
212            EventInnerActor::Real { ctx } => {
213                let ctx = ctx.lock().0.clone();
214                handle_real(&ctx, msg);
215            }
216        }
217    }
218}
219
220fn mock_reply(msg: EventMsg) {
221    match msg {
222        EventMsg::Create { reply } => {
223            let _ = reply.send(Err(GpuError::Unrecoverable(
224                "EventActor in mock mode".into(),
225            )));
226        }
227        EventMsg::Record { reply, .. } => {
228            let _ = reply.send(Err(GpuError::Unrecoverable(
229                "EventActor in mock mode".into(),
230            )));
231        }
232        EventMsg::Wait { reply, .. } => {
233            let _ = reply.send(Err(GpuError::Unrecoverable(
234                "EventActor in mock mode".into(),
235            )));
236        }
237        EventMsg::Query { reply, .. } => {
238            let _ = reply.send(Err(GpuError::Unrecoverable(
239                "EventActor in mock mode".into(),
240            )));
241        }
242        EventMsg::ElapsedTime { reply, .. } => {
243            let _ = reply.send(Err(GpuError::Unrecoverable(
244                "EventActor in mock mode".into(),
245            )));
246        }
247        EventMsg::Synchronize { reply, .. } => {
248            let _ = reply.send(Err(GpuError::Unrecoverable(
249                "EventActor in mock mode".into(),
250            )));
251        }
252        #[cfg(feature = "cuda-ipc")]
253        EventMsg::GetIpcHandle { reply, .. } => {
254            let _ = reply.send(Err(GpuError::Unrecoverable(
255                "EventActor in mock mode".into(),
256            )));
257        }
258        #[cfg(feature = "cuda-ipc")]
259        EventMsg::OpenIpcHandle { reply, .. } => {
260            let _ = reply.send(Err(GpuError::Unrecoverable(
261                "EventActor in mock mode".into(),
262            )));
263        }
264    }
265}
266
267fn handle_real(ctx: &Arc<CudaContext>, msg: EventMsg) {
268    match msg {
269        EventMsg::Create { reply } => {
270            let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
271                ctx.new_event(None)
272                    .map(Event::from_cuda)
273                    .map_err(|e| GpuError::LibraryError {
274                        lib: LIB,
275                        msg: format!("new_event: {e}"),
276                    })
277            }))
278            .unwrap_or_else(|_| {
279                Err(GpuError::Unrecoverable(
280                    "EventActor::Create: CUDA driver not loadable".into(),
281                ))
282            });
283            let _ = reply.send(r);
284        }
285        EventMsg::Record {
286            event,
287            stream,
288            reply,
289        } => {
290            let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
291                event
292                    .cuda_event()
293                    .record(&stream)
294                    .map_err(|e| GpuError::LibraryError {
295                        lib: LIB,
296                        msg: format!("record: {e}"),
297                    })
298            }))
299            .unwrap_or_else(|_| {
300                Err(GpuError::Unrecoverable(
301                    "EventActor::Record: CUDA driver not loadable".into(),
302                ))
303            });
304            let _ = reply.send(r);
305        }
306        EventMsg::Wait {
307            event,
308            stream,
309            reply,
310        } => {
311            let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
312                stream
313                    .wait(event.cuda_event())
314                    .map_err(|e| GpuError::LibraryError {
315                        lib: LIB,
316                        msg: format!("wait: {e}"),
317                    })
318            }))
319            .unwrap_or_else(|_| {
320                Err(GpuError::Unrecoverable(
321                    "EventActor::Wait: CUDA driver not loadable".into(),
322                ))
323            });
324            let _ = reply.send(r);
325        }
326        EventMsg::Query { event, reply } => {
327            let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
328                Ok::<_, GpuError>(event.cuda_event().is_complete())
329            }))
330            .unwrap_or_else(|_| {
331                Err(GpuError::Unrecoverable(
332                    "EventActor::Query: CUDA driver not loadable".into(),
333                ))
334            });
335            let _ = reply.send(r);
336        }
337        EventMsg::ElapsedTime { start, end, reply } => {
338            let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
339                start
340                    .cuda_event()
341                    .elapsed_ms(end.cuda_event())
342                    .map(|ms| Duration::from_secs_f64(ms as f64 / 1000.0))
343                    .map_err(|e| GpuError::LibraryError {
344                        lib: LIB,
345                        msg: format!("elapsed: {e}"),
346                    })
347            }))
348            .unwrap_or_else(|_| {
349                Err(GpuError::Unrecoverable(
350                    "EventActor::ElapsedTime: CUDA driver not loadable".into(),
351                ))
352            });
353            let _ = reply.send(r);
354        }
355        EventMsg::Synchronize { event, reply } => {
356            let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
357                event
358                    .cuda_event()
359                    .synchronize()
360                    .map_err(|e| GpuError::LibraryError {
361                        lib: LIB,
362                        msg: format!("synchronize: {e}"),
363                    })
364            }))
365            .unwrap_or_else(|_| {
366                Err(GpuError::Unrecoverable(
367                    "EventActor::Synchronize: CUDA driver not loadable".into(),
368                ))
369            });
370            let _ = reply.send(r);
371        }
372        #[cfg(feature = "cuda-ipc")]
373        EventMsg::GetIpcHandle { event, reply } => {
374            let r = cuda_driver::ipc_get_event_handle(event.cu_event())
375                .map(|raw| IpcEventHandle { raw });
376            let _ = reply.send(r);
377        }
378        #[cfg(feature = "cuda-ipc")]
379        EventMsg::OpenIpcHandle { handle, reply } => {
380            let raw_event = match cuda_driver::ipc_open_event_handle(handle.raw) {
381                Ok(e) => e,
382                Err(e) => {
383                    let _ = reply.send(Err(e));
384                    return;
385                }
386            };
387            // We've got a raw `CUevent` — wrap it into a cudarc
388            // `CudaEvent` by going through the documented public
389            // constructor. cudarc has no public adopt API, so we
390            // rebuild a CudaEvent via `new_event` plus an explicit
391            // record-from-raw shim. For Phase 3 we accept the raw
392            // handle as a separate carrier — the `Event` we return
393            // wraps a freshly-minted CudaEvent on the local context
394            // and the caller is responsible for the cross-process
395            // wait via the underlying IPC handle bytes.
396            //
397            // F-future: a `CudaEvent::from_raw` would let us hand back
398            // a unified Event. For now: leak the raw event back via
399            // `Unrecoverable` so callers know to use the bytes path
400            // until the safe wrapper lands.
401            let _ = raw_event;
402            let _ = reply.send(Err(GpuError::Unrecoverable(
403                "EventActor::OpenIpcHandle: cudarc 0.19 lacks CudaEvent::from_raw — \
404                 use the IpcEventHandle bytes directly with cuStreamWaitEvent on the \
405                 destination context"
406                    .into(),
407            )));
408        }
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_config::Config;
    use atomr_core::actor::ActorSystem;

    /// A mock-mode actor must answer `Create`, and must answer it with
    /// `GpuError::Unrecoverable`. The timeout catches a missing reply.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn event_msg_round_trip() {
        let sys = ActorSystem::create("event-msg-test", Config::empty())
            .await
            .unwrap();
        let actor = sys.actor_of(EventActor::mock_props(), "evt").unwrap();

        let (tx, rx) = oneshot::channel();
        actor.tell(EventMsg::Create { reply: tx });
        let r = tokio::time::timeout(Duration::from_secs(2), rx)
            .await
            .expect("mock EventActor did not reply within 2s")
            .expect("mock EventActor dropped the reply sender");
        assert!(matches!(r, Err(GpuError::Unrecoverable(_))));

        sys.terminate().await;
    }

    /// Every byte value must survive `from_bytes` → `as_bytes` unchanged.
    /// This includes bytes with the high bit set, which are negative when
    /// `c_char` is `i8`.
    #[cfg(feature = "cuda-ipc")]
    #[test]
    fn ipc_event_handle_serializes() {
        // Four handles of 64 consecutive values cover 0..=255.
        for base in [0u8, 64, 128, 192] {
            let bytes: [u8; 64] = std::array::from_fn(|i| base + i as u8);
            let h = IpcEventHandle::from_bytes(bytes);
            assert_eq!(h.as_bytes(), bytes);

            // Using `h` after binding it by value proves it is `Copy`.
            let copy: IpcEventHandle = h;
            assert_eq!(copy.as_bytes(), h.as_bytes());
        }

        // Send / Sync sanity at the type level.
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<IpcEventHandle>();
    }
}