1use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use atomr_core::actor::{Actor, Context, Props};
25use cudarc::driver::{CudaContext, CudaEvent, CudaStream};
26use parking_lot::Mutex;
27use tokio::sync::oneshot;
28
29use crate::error::GpuError;
30#[cfg(feature = "cuda-ipc")]
31use crate::sys::cuda_driver;
32#[cfg(feature = "cuda-ipc")]
33use cudarc::driver::sys as driver_sys;
34
/// Library tag stored in the `lib` field of every `GpuError::LibraryError` built by
/// this module.
const LIB: &str = "event";
36
37#[derive(Clone)]
40pub struct Event {
41 inner: Arc<EventInner>,
42}
43
44struct EventInner {
48 event: CudaEvent,
49}
50
51impl Event {
52 pub fn from_cuda(event: CudaEvent) -> Self {
55 Self {
56 inner: Arc::new(EventInner { event }),
57 }
58 }
59
60 pub fn cuda_event(&self) -> &CudaEvent {
64 &self.inner.event
65 }
66
67 #[cfg(feature = "cuda-ipc")]
70 pub fn cu_event(&self) -> driver_sys::CUevent {
71 self.inner.event.cu_event()
72 }
73}
74
/// Exported CUDA IPC handle for an event (wraps `CUipcEventHandle`).
///
/// The handle is 64 plain bytes. [`as_bytes`](Self::as_bytes) and
/// [`from_bytes`](Self::from_bytes) convert it to and from a `[u8; 64]`, presumably so it
/// can be shipped to another process.
#[cfg(feature = "cuda-ipc")]
#[derive(Clone, Copy)]
pub struct IpcEventHandle {
    pub(crate) raw: driver_sys::CUipcEventHandle,
}

#[cfg(feature = "cuda-ipc")]
impl IpcEventHandle {
    /// Returns the 64 handle bytes, bit-for-bit.
    pub fn as_bytes(&self) -> [u8; 64] {
        // `c_char` is `i8` or `u8` depending on the target. An integer `as` cast between
        // same-width types reinterprets the bits, so no `unsafe` transmute is needed.
        self.raw.reserved.map(|c| c as u8)
    }

    /// Rebuilds a handle from bytes previously produced by [`as_bytes`](Self::as_bytes).
    ///
    /// The bytes are not validated here; an invalid handle is only detected when the
    /// driver tries to open it.
    pub fn from_bytes(bytes: [u8; 64]) -> Self {
        let raw = driver_sys::CUipcEventHandle_st {
            // Bit-for-bit inverse of the cast in `as_bytes`.
            reserved: bytes.map(|b| b as std::ffi::c_char),
        };
        Self { raw }
    }
}
105
#[cfg(feature = "cuda-ipc")]
impl std::fmt::Debug for IpcEventHandle {
    /// Formats as `IpcEventHandle { bytes_hash: <u64> }`: a 64-bit digest of the handle
    /// bytes (computed by `fxhash`) instead of the raw bytes themselves.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let digest = fxhash(&self.as_bytes());
        let mut out = f.debug_struct("IpcEventHandle");
        out.field("bytes_hash", &digest);
        out.finish()
    }
}
114
115#[cfg(feature = "cuda-ipc")]
/// 64-bit FNV-1a hash of `bytes`.
///
/// Despite its name, this is not the `FxHash` algorithm. The offset basis
/// `0xcbf29ce484222325`, the prime `0x100000001b3` and the xor-then-multiply order are
/// those of FNV-1a. In this file it only fingerprints IPC handles for `Debug` output,
/// and it is not cryptographic.
fn fxhash(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes
        .iter()
        .fold(OFFSET_BASIS, |acc, &byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME))
}
125
/// Requests understood by [`EventActor`].
///
/// Every variant carries a `oneshot` reply sender, and the actor sends exactly one result
/// on it; a dropped receiver is ignored. In mock mode every request is answered with
/// `GpuError::Unrecoverable`.
pub enum EventMsg {
    /// Create a new event on the actor's CUDA context via `CudaContext::new_event(None)`.
    Create {
        reply: oneshot::Sender<Result<Event, GpuError>>,
    },
    /// Record `event` on `stream`.
    Record {
        event: Event,
        stream: Arc<CudaStream>,
        reply: oneshot::Sender<Result<(), GpuError>>,
    },
    /// Make `stream` wait on `event` (`CudaStream::wait`).
    Wait {
        event: Event,
        stream: Arc<CudaStream>,
        reply: oneshot::Sender<Result<(), GpuError>>,
    },
    /// Ask whether `event` has completed; the reply is `CudaEvent::is_complete()`.
    Query {
        event: Event,
        reply: oneshot::Sender<Result<bool, GpuError>>,
    },
    /// Time between `start` and `end`. The driver's millisecond value is converted to a
    /// [`Duration`].
    ElapsedTime {
        start: Event,
        end: Event,
        reply: oneshot::Sender<Result<Duration, GpuError>>,
    },
    /// Call `CudaEvent::synchronize` on `event`.
    /// NOTE(review): this presumably blocks the actor until the event completes.
    Synchronize {
        event: Event,
        reply: oneshot::Sender<Result<(), GpuError>>,
    },
    /// Export `event` as an [`IpcEventHandle`] (via `cuda_driver::ipc_get_event_handle`).
    #[cfg(feature = "cuda-ipc")]
    GetIpcHandle {
        event: Event,
        reply: oneshot::Sender<Result<IpcEventHandle, GpuError>>,
    },
    /// Open an event from an [`IpcEventHandle`].
    ///
    /// This currently always replies with an error. Either the open itself fails, or
    /// `GpuError::Unrecoverable` is returned because the opened event cannot be wrapped
    /// in an [`Event`].
    #[cfg(feature = "cuda-ipc")]
    OpenIpcHandle {
        handle: IpcEventHandle,
        reply: oneshot::Sender<Result<Event, GpuError>>,
    },
}
174
/// Wrapper around the actor's `Arc<CudaContext>` that asserts `Send`/`Sync` manually.
struct SendCtx(Arc<CudaContext>);
// SAFETY: NOTE(review): these impls assert the context may be moved to and shared between
// threads. Confirm that `CudaContext` is thread-safe. If cudarc already implements
// `Send + Sync` for it, these impls (and the wrapper) are redundant.
unsafe impl Send for SendCtx {}
unsafe impl Sync for SendCtx {}
178
// NOTE(review): both variants are constructed (`EventActor::props` /
// `EventActor::mock_props`) and `ctx` is read in `handle`, so this allow may be stale.
// Confirm before removing it.
#[allow(dead_code)]
/// Backing mode of an [`EventActor`].
enum EventInnerActor {
    /// Requests are executed against this CUDA context.
    Real { ctx: Mutex<SendCtx> },
    /// Every request is answered with `GpuError::Unrecoverable` (no GPU needed).
    Mock,
}
184
/// Actor that services [`EventMsg`] requests, either against a real CUDA context or in
/// mock mode. Build it with [`EventActor::props`] or [`EventActor::mock_props`].
pub struct EventActor {
    inner: EventInnerActor,
}
188
189impl EventActor {
190 pub fn props(ctx: Arc<CudaContext>) -> Props<Self> {
191 Props::create(move || EventActor {
192 inner: EventInnerActor::Real {
193 ctx: Mutex::new(SendCtx(ctx.clone())),
194 },
195 })
196 }
197
198 pub fn mock_props() -> Props<Self> {
199 Props::create(|| EventActor {
200 inner: EventInnerActor::Mock,
201 })
202 }
203}
204
205#[async_trait]
206impl Actor for EventActor {
207 type Msg = EventMsg;
208
209 async fn handle(&mut self, _ctx: &mut Context<Self>, msg: EventMsg) {
210 match &self.inner {
211 EventInnerActor::Mock => mock_reply(msg),
212 EventInnerActor::Real { ctx } => {
213 let ctx = ctx.lock().0.clone();
214 handle_real(&ctx, msg);
215 }
216 }
217 }
218}
219
220fn mock_reply(msg: EventMsg) {
221 match msg {
222 EventMsg::Create { reply } => {
223 let _ = reply.send(Err(GpuError::Unrecoverable(
224 "EventActor in mock mode".into(),
225 )));
226 }
227 EventMsg::Record { reply, .. } => {
228 let _ = reply.send(Err(GpuError::Unrecoverable(
229 "EventActor in mock mode".into(),
230 )));
231 }
232 EventMsg::Wait { reply, .. } => {
233 let _ = reply.send(Err(GpuError::Unrecoverable(
234 "EventActor in mock mode".into(),
235 )));
236 }
237 EventMsg::Query { reply, .. } => {
238 let _ = reply.send(Err(GpuError::Unrecoverable(
239 "EventActor in mock mode".into(),
240 )));
241 }
242 EventMsg::ElapsedTime { reply, .. } => {
243 let _ = reply.send(Err(GpuError::Unrecoverable(
244 "EventActor in mock mode".into(),
245 )));
246 }
247 EventMsg::Synchronize { reply, .. } => {
248 let _ = reply.send(Err(GpuError::Unrecoverable(
249 "EventActor in mock mode".into(),
250 )));
251 }
252 #[cfg(feature = "cuda-ipc")]
253 EventMsg::GetIpcHandle { reply, .. } => {
254 let _ = reply.send(Err(GpuError::Unrecoverable(
255 "EventActor in mock mode".into(),
256 )));
257 }
258 #[cfg(feature = "cuda-ipc")]
259 EventMsg::OpenIpcHandle { reply, .. } => {
260 let _ = reply.send(Err(GpuError::Unrecoverable(
261 "EventActor in mock mode".into(),
262 )));
263 }
264 }
265}
266
267fn handle_real(ctx: &Arc<CudaContext>, msg: EventMsg) {
268 match msg {
269 EventMsg::Create { reply } => {
270 let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
271 ctx.new_event(None)
272 .map(Event::from_cuda)
273 .map_err(|e| GpuError::LibraryError {
274 lib: LIB,
275 msg: format!("new_event: {e}"),
276 })
277 }))
278 .unwrap_or_else(|_| {
279 Err(GpuError::Unrecoverable(
280 "EventActor::Create: CUDA driver not loadable".into(),
281 ))
282 });
283 let _ = reply.send(r);
284 }
285 EventMsg::Record {
286 event,
287 stream,
288 reply,
289 } => {
290 let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
291 event
292 .cuda_event()
293 .record(&stream)
294 .map_err(|e| GpuError::LibraryError {
295 lib: LIB,
296 msg: format!("record: {e}"),
297 })
298 }))
299 .unwrap_or_else(|_| {
300 Err(GpuError::Unrecoverable(
301 "EventActor::Record: CUDA driver not loadable".into(),
302 ))
303 });
304 let _ = reply.send(r);
305 }
306 EventMsg::Wait {
307 event,
308 stream,
309 reply,
310 } => {
311 let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
312 stream
313 .wait(event.cuda_event())
314 .map_err(|e| GpuError::LibraryError {
315 lib: LIB,
316 msg: format!("wait: {e}"),
317 })
318 }))
319 .unwrap_or_else(|_| {
320 Err(GpuError::Unrecoverable(
321 "EventActor::Wait: CUDA driver not loadable".into(),
322 ))
323 });
324 let _ = reply.send(r);
325 }
326 EventMsg::Query { event, reply } => {
327 let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
328 Ok::<_, GpuError>(event.cuda_event().is_complete())
329 }))
330 .unwrap_or_else(|_| {
331 Err(GpuError::Unrecoverable(
332 "EventActor::Query: CUDA driver not loadable".into(),
333 ))
334 });
335 let _ = reply.send(r);
336 }
337 EventMsg::ElapsedTime { start, end, reply } => {
338 let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
339 start
340 .cuda_event()
341 .elapsed_ms(end.cuda_event())
342 .map(|ms| Duration::from_secs_f64(ms as f64 / 1000.0))
343 .map_err(|e| GpuError::LibraryError {
344 lib: LIB,
345 msg: format!("elapsed: {e}"),
346 })
347 }))
348 .unwrap_or_else(|_| {
349 Err(GpuError::Unrecoverable(
350 "EventActor::ElapsedTime: CUDA driver not loadable".into(),
351 ))
352 });
353 let _ = reply.send(r);
354 }
355 EventMsg::Synchronize { event, reply } => {
356 let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
357 event
358 .cuda_event()
359 .synchronize()
360 .map_err(|e| GpuError::LibraryError {
361 lib: LIB,
362 msg: format!("synchronize: {e}"),
363 })
364 }))
365 .unwrap_or_else(|_| {
366 Err(GpuError::Unrecoverable(
367 "EventActor::Synchronize: CUDA driver not loadable".into(),
368 ))
369 });
370 let _ = reply.send(r);
371 }
372 #[cfg(feature = "cuda-ipc")]
373 EventMsg::GetIpcHandle { event, reply } => {
374 let r = cuda_driver::ipc_get_event_handle(event.cu_event())
375 .map(|raw| IpcEventHandle { raw });
376 let _ = reply.send(r);
377 }
378 #[cfg(feature = "cuda-ipc")]
379 EventMsg::OpenIpcHandle { handle, reply } => {
380 let raw_event = match cuda_driver::ipc_open_event_handle(handle.raw) {
381 Ok(e) => e,
382 Err(e) => {
383 let _ = reply.send(Err(e));
384 return;
385 }
386 };
387 let _ = raw_event;
402 let _ = reply.send(Err(GpuError::Unrecoverable(
403 "EventActor::OpenIpcHandle: cudarc 0.19 lacks CudaEvent::from_raw — \
404 use the IpcEventHandle bytes directly with cuStreamWaitEvent on the \
405 destination context"
406 .into(),
407 )));
408 }
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_config::Config;
    use atomr_core::actor::ActorSystem;

    /// A mock-mode actor must answer `Create` with `GpuError::Unrecoverable` rather than
    /// dropping the reply channel.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn event_msg_round_trip() {
        let sys = ActorSystem::create("event-msg-test", Config::empty())
            .await
            .unwrap();
        let actor = sys.actor_of(EventActor::mock_props(), "evt").unwrap();

        let (tx, rx) = oneshot::channel();
        actor.tell(EventMsg::Create { reply: tx });
        let r = tokio::time::timeout(Duration::from_secs(2), rx)
            .await
            .unwrap()
            .unwrap();
        assert!(matches!(r, Err(GpuError::Unrecoverable(_))));

        sys.terminate().await;
    }

    /// Byte round-trip of an IPC handle, plus the marker traits callers rely on.
    #[cfg(feature = "cuda-ipc")]
    #[test]
    fn ipc_event_handle_serializes() {
        // Spans 3..=255, so bytes with the high bit set (negative when `c_char` is `i8`)
        // are exercised too.
        let bytes: [u8; 64] = std::array::from_fn(|i| (i * 4 + 3) as u8);
        let h = IpcEventHandle::from_bytes(bytes);
        assert_eq!(h.as_bytes(), bytes);

        fn assert_send_sync<T: Send + Sync>() {}
        fn assert_copy<T: Copy>() {}
        assert_send_sync::<IpcEventHandle>();
        assert_copy::<IpcEventHandle>();

        // Using `h` after the copy only compiles because the type is `Copy`.
        let copy = h;
        assert_eq!(copy.as_bytes(), h.as_bytes());
    }
}