atomr_accel_cuda/observability.rs
1//! Observability glue: install [`atomr_telemetry::TelemetryExtension`]
2//! on a host `ActorSystem` and expose a small set of GPU-specific
3//! probes that callers feed from kernel actors / placement actors /
4//! stream allocators.
5//!
//! Probes are designed to be **callable from anywhere**: [`GpuProbes`]
//! is a set of in-memory counters shared via `Arc`, so recording an
//! event does not depend on the telemetry extension being installed.
//! The extension itself can be retrieved with
//! `TelemetryExtension::from_system(...)` once [`install`] has run.
10//!
11//! The dashboard at `atomr-dashboard/` (in the atomr workspace)
12//! consumes the resulting [`atomr_telemetry::dto::NodeSnapshot`] over
13//! WebSocket — point it at any atomr-accel-cuda host and the GPU probes
14//! show up automatically alongside the standard actor / cluster /
15//! sharding panels.
16
17use std::sync::Arc;
18
19use atomr_core::actor::ActorSystem;
20use atomr_telemetry::TelemetryExtension;
21use parking_lot::Mutex;
22
23/// Convenience helper: construct + install a telemetry extension with
24/// sensible defaults (1024-deep broadcast bus). Returns the shared
25/// `Arc` so the caller can register exporters or read snapshots.
26///
27/// ```ignore
28/// let sys = ActorSystem::create("gpu-host", Config::empty()).await?;
29/// let _telemetry = atomr_accel_cuda::observability::install(&sys, "gpu-host-1");
30/// ```
31pub fn install(system: &ActorSystem, node_name: impl Into<String>) -> Arc<TelemetryExtension> {
32 TelemetryExtension::new(node_name, 1024).install(system)
33}
34
35/// In-memory counters for the GPU-specific probes. The probes are
36/// passive: kernel actors / stream allocators bump the counters
37/// directly; the dashboard polls via [`GpuProbes::snapshot`].
38///
39/// Construct one per host (or per device if you want per-device
40/// breakdowns), share via `Arc`, and pass to whichever code paths
41/// produce the events.
42#[derive(Default)]
43pub struct GpuProbes {
44 inner: Mutex<GpuProbeState>,
45}
46
/// Plain-data view of every GPU probe counter.
///
/// All fields are zero in the `Default` value. `PartialEq` / `Eq` are
/// derived so two snapshots can be compared directly (for example with
/// `assert_eq!`, or to detect whether anything changed between polls).
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct GpuProbeState {
    /// Cumulative number of `GpuRef` allocations that succeeded.
    pub allocations_total: u64,
    /// Cumulative number of `GpuRef` allocations that returned
    /// `OutOfMemory`.
    pub oom_total: u64,
    /// Highest observed `DeviceState::generation` (bumps on context
    /// rebuild).
    pub max_generation_observed: u64,
    /// Currently in-flight kernel launches across all actors.
    pub kernels_in_flight: u32,
    /// Cumulative kernel-launch count.
    pub kernels_total: u64,
    /// Free VRAM in bytes as last reported by a `Stats` poll. 0 when
    /// no poll has occurred yet.
    pub vram_free_bytes: u64,
    /// Total VRAM in bytes as last reported by a `Stats` poll. 0 when
    /// no poll has occurred yet.
    pub vram_total_bytes: u64,
}
66
67impl GpuProbes {
68 pub fn new() -> Arc<Self> {
69 Arc::new(Self::default())
70 }
71
72 /// Record a successful allocation.
73 pub fn record_alloc_ok(&self) {
74 let mut g = self.inner.lock();
75 g.allocations_total = g.allocations_total.saturating_add(1);
76 }
77
78 /// Record an allocation that hit OOM.
79 pub fn record_alloc_oom(&self) {
80 let mut g = self.inner.lock();
81 g.oom_total = g.oom_total.saturating_add(1);
82 }
83
84 /// Record observation of a new `DeviceState::generation`. The
85 /// stored value is monotonically max'd.
86 pub fn record_generation(&self, gen: u64) {
87 let mut g = self.inner.lock();
88 if gen > g.max_generation_observed {
89 g.max_generation_observed = gen;
90 }
91 }
92
93 /// Bump the in-flight kernel count when a kernel is enqueued.
94 pub fn kernel_enter(&self) {
95 let mut g = self.inner.lock();
96 g.kernels_in_flight = g.kernels_in_flight.saturating_add(1);
97 g.kernels_total = g.kernels_total.saturating_add(1);
98 }
99
100 /// Decrement the in-flight kernel count when a kernel completes.
101 pub fn kernel_exit(&self) {
102 let mut g = self.inner.lock();
103 g.kernels_in_flight = g.kernels_in_flight.saturating_sub(1);
104 }
105
106 /// Record the latest VRAM snapshot (typically from
107 /// `cuMemGetInfo` once per `PlacementActor::PollStats` tick).
108 pub fn record_vram(&self, free_bytes: u64, total_bytes: u64) {
109 let mut g = self.inner.lock();
110 g.vram_free_bytes = free_bytes;
111 g.vram_total_bytes = total_bytes;
112 }
113
114 /// Snapshot the current counter state.
115 pub fn snapshot(&self) -> GpuProbeState {
116 self.inner.lock().clone()
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use atomr_config::Config;

    /// `install` must return the extension it registered, and the same
    /// node name must be visible through `from_system`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn install_returns_handle_and_from_system_finds_it() {
        let sys = ActorSystem::create("obs-test", Config::empty())
            .await
            .unwrap();
        let handle = install(&sys, "obs-test");
        assert_eq!(handle.node, "obs-test");
        let from_sys = TelemetryExtension::from_system(&sys).expect("installed");
        assert_eq!(from_sys.node, "obs-test");
        sys.terminate().await;
    }

    /// Happy-path walk through every recorder.
    #[test]
    fn gpu_probes_record_lifecycle() {
        let p = GpuProbes::new();
        p.record_alloc_ok();
        p.record_alloc_ok();
        p.record_alloc_oom();
        p.record_generation(1);
        p.record_generation(3);
        p.record_generation(2); // out-of-order — max stays at 3
        p.kernel_enter();
        p.kernel_enter();
        p.kernel_exit();
        p.record_vram(2048, 4096);
        let s = p.snapshot();
        assert_eq!(s.allocations_total, 2);
        assert_eq!(s.oom_total, 1);
        assert_eq!(s.max_generation_observed, 3);
        assert_eq!(s.kernels_in_flight, 1);
        assert_eq!(s.kernels_total, 2);
        assert_eq!(s.vram_free_bytes, 2048);
        assert_eq!(s.vram_total_bytes, 4096);
    }

    /// A fresh probe set reports zero for every counter.
    #[test]
    fn gpu_probes_start_zeroed() {
        let s = GpuProbes::new().snapshot();
        assert_eq!(s.allocations_total, 0);
        assert_eq!(s.oom_total, 0);
        assert_eq!(s.max_generation_observed, 0);
        assert_eq!(s.kernels_in_flight, 0);
        assert_eq!(s.kernels_total, 0);
        assert_eq!(s.vram_free_bytes, 0);
        assert_eq!(s.vram_total_bytes, 0);
    }

    /// An unmatched `kernel_exit` must leave the gauge at zero rather
    /// than wrapping, and must not touch the cumulative count.
    #[test]
    fn kernel_exit_without_enter_saturates_at_zero() {
        let p = GpuProbes::new();
        p.kernel_exit();
        p.kernel_exit();
        let s = p.snapshot();
        assert_eq!(s.kernels_in_flight, 0);
        assert_eq!(s.kernels_total, 0);

        // The gauge still works normally afterwards.
        p.kernel_enter();
        let s = p.snapshot();
        assert_eq!(s.kernels_in_flight, 1);
        assert_eq!(s.kernels_total, 1);
    }

    /// `record_vram` keeps only the most recent sample.
    #[test]
    fn record_vram_keeps_only_latest_sample() {
        let p = GpuProbes::new();
        p.record_vram(100, 200);
        p.record_vram(50, 200);
        let s = p.snapshot();
        assert_eq!(s.vram_free_bytes, 50);
        assert_eq!(s.vram_total_bytes, 200);
    }
}