Skip to main content

atomr_accel_cuda/
observability.rs

1//! Observability glue: install [`atomr_telemetry::TelemetryExtension`]
2//! on a host `ActorSystem` and expose a small set of GPU-specific
3//! probes that callers feed from kernel actors / placement actors /
4//! stream allocators.
5//!
//! Probes are designed to be **callable from anywhere**: [`GpuProbes`]
//! is a standalone, mutex-guarded set of counters that is shared via
//! `Arc`, so recording an event never depends on telemetry being
//! installed. The installed extension itself can be looked up from any
//! code path via `TelemetryExtension::from_system(...)`.
10//!
11//! The dashboard at `atomr-dashboard/` (in the atomr workspace)
12//! consumes the resulting [`atomr_telemetry::dto::NodeSnapshot`] over
13//! WebSocket — point it at any atomr-accel-cuda host and the GPU probes
14//! show up automatically alongside the standard actor / cluster /
15//! sharding panels.
16
17use std::sync::Arc;
18
19use atomr_core::actor::ActorSystem;
20use atomr_telemetry::TelemetryExtension;
21use parking_lot::Mutex;
22
23/// Convenience helper: construct + install a telemetry extension with
24/// sensible defaults (1024-deep broadcast bus). Returns the shared
25/// `Arc` so the caller can register exporters or read snapshots.
26///
27/// ```ignore
28/// let sys = ActorSystem::create("gpu-host", Config::empty()).await?;
29/// let _telemetry = atomr_accel_cuda::observability::install(&sys, "gpu-host-1");
30/// ```
31pub fn install(system: &ActorSystem, node_name: impl Into<String>) -> Arc<TelemetryExtension> {
32    TelemetryExtension::new(node_name, 1024).install(system)
33}
34
35/// In-memory counters for the GPU-specific probes. The probes are
36/// passive: kernel actors / stream allocators bump the counters
37/// directly; the dashboard polls via [`GpuProbes::snapshot`].
38///
39/// Construct one per host (or per device if you want per-device
40/// breakdowns), share via `Arc`, and pass to whichever code paths
41/// produce the events.
42#[derive(Default)]
43pub struct GpuProbes {
44    inner: Mutex<GpuProbeState>,
45}
46
/// Plain-data view of every GPU probe counter.
///
/// This is the value handed out by `GpuProbes::snapshot`. The derived
/// `Default` starts every counter at zero, and the derived equality
/// lets two snapshots be compared directly.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct GpuProbeState {
    /// Cumulative number of `GpuRef` allocations that succeeded.
    pub allocations_total: u64,
    /// Cumulative number of `GpuRef` allocations that returned
    /// `OutOfMemory`.
    pub oom_total: u64,
    /// Highest observed `DeviceState::generation` (bumps on context
    /// rebuild).
    pub max_generation_observed: u64,
    /// Currently in-flight kernel launches across all actors.
    pub kernels_in_flight: u32,
    /// Cumulative kernel-launch count.
    pub kernels_total: u64,
    /// Free VRAM in bytes as last reported by a `Stats` poll. 0 when
    /// no poll has occurred yet.
    pub vram_free_bytes: u64,
    /// Total VRAM in bytes as last reported by a `Stats` poll. 0 when
    /// no poll has occurred yet.
    pub vram_total_bytes: u64,
}
66
67impl GpuProbes {
68    pub fn new() -> Arc<Self> {
69        Arc::new(Self::default())
70    }
71
72    /// Record a successful allocation.
73    pub fn record_alloc_ok(&self) {
74        let mut g = self.inner.lock();
75        g.allocations_total = g.allocations_total.saturating_add(1);
76    }
77
78    /// Record an allocation that hit OOM.
79    pub fn record_alloc_oom(&self) {
80        let mut g = self.inner.lock();
81        g.oom_total = g.oom_total.saturating_add(1);
82    }
83
84    /// Record observation of a new `DeviceState::generation`. The
85    /// stored value is monotonically max'd.
86    pub fn record_generation(&self, gen: u64) {
87        let mut g = self.inner.lock();
88        if gen > g.max_generation_observed {
89            g.max_generation_observed = gen;
90        }
91    }
92
93    /// Bump the in-flight kernel count when a kernel is enqueued.
94    pub fn kernel_enter(&self) {
95        let mut g = self.inner.lock();
96        g.kernels_in_flight = g.kernels_in_flight.saturating_add(1);
97        g.kernels_total = g.kernels_total.saturating_add(1);
98    }
99
100    /// Decrement the in-flight kernel count when a kernel completes.
101    pub fn kernel_exit(&self) {
102        let mut g = self.inner.lock();
103        g.kernels_in_flight = g.kernels_in_flight.saturating_sub(1);
104    }
105
106    /// Record the latest VRAM snapshot (typically from
107    /// `cuMemGetInfo` once per `PlacementActor::PollStats` tick).
108    pub fn record_vram(&self, free_bytes: u64, total_bytes: u64) {
109        let mut g = self.inner.lock();
110        g.vram_free_bytes = free_bytes;
111        g.vram_total_bytes = total_bytes;
112    }
113
114    /// Snapshot the current counter state.
115    pub fn snapshot(&self) -> GpuProbeState {
116        self.inner.lock().clone()
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_config::Config;

    /// `install` hands back the extension, and the same extension is
    /// then discoverable from the system it was installed on.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn install_returns_handle_and_from_system_finds_it() {
        let system = ActorSystem::create("obs-test", Config::empty())
            .await
            .unwrap();

        let installed = install(&system, "obs-test");
        assert_eq!(installed.node, "obs-test");

        let looked_up = TelemetryExtension::from_system(&system).expect("installed");
        assert_eq!(looked_up.node, "obs-test");

        system.terminate().await;
    }

    /// Drives every recorder once or more and checks the resulting
    /// snapshot field by field.
    #[test]
    fn gpu_probes_record_lifecycle() {
        let probes = GpuProbes::new();

        // Two successful allocations, one OOM.
        probes.record_alloc_ok();
        probes.record_alloc_ok();
        probes.record_alloc_oom();

        // Reported out of order on purpose: the max must stay at 3.
        for generation in [1, 3, 2] {
            probes.record_generation(generation);
        }

        // Two launches, one completion: one kernel is still in flight.
        probes.kernel_enter();
        probes.kernel_enter();
        probes.kernel_exit();

        probes.record_vram(2048, 4096);

        let GpuProbeState {
            allocations_total,
            oom_total,
            max_generation_observed,
            kernels_in_flight,
            kernels_total,
            vram_free_bytes,
            vram_total_bytes,
        } = probes.snapshot();

        assert_eq!(allocations_total, 2);
        assert_eq!(oom_total, 1);
        assert_eq!(max_generation_observed, 3);
        assert_eq!(kernels_in_flight, 1);
        assert_eq!(kernels_total, 2);
        assert_eq!(vram_free_bytes, 2048);
        assert_eq!(vram_total_bytes, 4096);
    }
}