libobs_wrapper/runtime.rs
1//! Runtime management for safe OBS API access across threads
2//!
3//! This module provides the core thread management functionality for the libobs-wrapper.
4//! It ensures that OBS API calls are always executed on the same thread, as required by
5//! the OBS API, while still allowing application code to interact with OBS from any thread.
6//!
7//! # Thread Safety
8//!
9//! The OBS C API is not thread-safe and requires that all operations occur on the same thread.
10//! The `ObsRuntime` struct creates a dedicated thread for all OBS operations and manages
11//! message passing between application threads and the OBS thread.
12//!
13//! # Async and Blocking APIs
14//!
15//! The runtime supports both async and blocking APIs:
16//! - By default, all operations are asynchronous
17//! - With the `blocking` feature enabled, operations are synchronous
18//!
19//! # Example
20//!
21//! ```no_run
22//! use libobs_wrapper::runtime::ObsRuntime;
23//! use libobs_wrapper::utils::StartupInfo;
24//!
25//! async fn example() {
26//! // Assuming that the OBS context is already initialized
27//!
28//! // Run an operation on the OBS thread
29//! let runtime = context.runtime();
30
31//! runtime.run_with_obs(|| {
32//! // This code runs on the OBS thread
33//! println!("Running on OBS thread");
34//! }).await.unwrap();
35//! }
36//! ```
37
38use std::ffi::CStr;
39use std::sync::mpsc::{channel, Sender};
40use std::sync::Arc;
41use std::{fmt::Debug, thread::JoinHandle};
42use std::{ptr, thread};
43use tokio::sync::oneshot;
44
45use crate::bootstrap::bootstrap;
46use crate::crash_handler::main_crash_handler;
47use crate::enums::{ObsLogLevel, ObsResetVideoStatus};
48use crate::logger::{extern_log_callback, internal_log_global, LOGGER};
49use crate::unsafe_send::Sendable;
50use crate::utils::initialization::load_debug_privilege;
51use crate::utils::{ObsBootstrapError, ObsError, ObsModules, ObsString};
52use crate::{
53 context::OBS_THREAD_ID,
54 utils::{async_sync::Mutex, StartupInfo},
55};
56use crate::{mutex_blocking_lock, oneshot_rx_recv};
57
58/// Command type for operations to perform on the OBS thread
59enum ObsCommand {
60 /// Execute a function on the OBS thread and send result back
61 Execute(
62 Box<dyn FnOnce() -> Box<dyn std::any::Any + Send> + Send>,
63 oneshot::Sender<Box<dyn std::any::Any + Send>>,
64 ),
65 /// Signal the OBS thread to terminate
66 Terminate,
67}
68
69/// Return type for OBS runtime initialization
70pub enum ObsRuntimeReturn {
71 /// The OBS context is ready to use
72 Done((ObsRuntime, ObsModules, StartupInfo)),
73
74 /// The application must be restarted to apply OBS updates
75 Restart,
76}
77
78/// Core runtime that manages the OBS thread
79///
80/// This struct represents the runtime environment for OBS operations.
81/// It creates and manages a dedicated thread for OBS API calls to
82/// ensure thread safety while allowing interaction from any thread.
83///
84/// # Thread Safety
85///
86/// `ObsRuntime` can be safely cloned and shared across threads. All operations
87/// are automatically dispatched to the dedicated OBS thread.
88///
89/// # Lifecycle Management
90///
91/// When the last `ObsRuntime` instance is dropped, the OBS thread is automatically
92/// shut down and all OBS resources are properly released.
93///
94/// # Examples
95///
96/// Creating a runtime:
97///
98/// ```
99/// use libobs_wrapper::runtime::ObsRuntime;
100/// use libobs_wrapper::utils::StartupInfo;
101///
102/// async fn example() {
103/// let startup_info = StartupInfo::default();
104/// let (runtime, _, _) = match ObsRuntime::startup(startup_info).await.unwrap() {
105/// ObsRuntimeReturn::Done(res) => res,
106/// _ => panic!("OBS initialization failed"),
107/// };
108/// // Now you can use runtime to interact with OBS
109/// }
110/// ```
111#[derive(Debug, Clone)]
112pub struct ObsRuntime {
113 command_sender: Arc<Sender<ObsCommand>>,
114 _guard: Arc<_ObsRuntimeGuard>,
115}
116
117impl ObsRuntime {
118 /// Initializes the OBS runtime.
119 ///
120 /// This function starts up OBS on a dedicated thread and prepares it for use.
121 /// It handles bootstrapping (if configured), OBS initialization, module loading,
122 /// and setup of audio/video subsystems.
123 ///
124 /// # Parameters
125 ///
126 /// * `options` - The startup configuration for OBS
127 ///
128 /// # Returns
129 ///
130 /// A `Result` containing either:
131 /// - `ObsRuntimeReturn::Done` with the initialized runtime, modules, and startup info
132 /// - `ObsRuntimeReturn::Restart` if OBS needs to be updated and the application should restart
133 /// - An `ObsError` if initialization fails
134 ///
135 /// # Examples
136 ///
137 /// ```
138 /// use libobs_wrapper::runtime::{ObsRuntime, ObsRuntimeReturn};
139 /// use libobs_wrapper::utils::StartupInfo;
140 ///
141 /// async fn initialize() {
142 /// let startup_info = StartupInfo::default();
143 /// match ObsRuntime::startup(startup_info).await {
144 /// Ok(ObsRuntimeReturn::Done(runtime_components)) => {
145 /// // Use the initialized runtime
146 /// },
147 /// Ok(ObsRuntimeReturn::Restart) => {
148 /// // Handle restart for OBS update
149 /// },
150 /// Err(e) => {
151 /// // Handle initialization error
152 /// }
153 /// }
154 /// }
155 /// ```
156 #[cfg_attr(feature = "blocking", remove_async_await::remove_async_await)]
157 pub(crate) async fn startup(mut options: StartupInfo) -> Result<ObsRuntimeReturn, ObsError> {
158 // Check if OBS is already running on another thread
159 let obs_id = OBS_THREAD_ID.lock().await;
160 if obs_id.is_some() {
161 return Err(ObsError::ThreadFailure);
162 }
163
164 drop(obs_id);
165
166 // Handle bootstrapping if enabled and configured
167 #[cfg(feature = "bootstrapper")]
168 if options.bootstrap_handler.is_some() {
169 use crate::bootstrap::BootstrapStatus;
170 use futures_util::pin_mut;
171 #[cfg(not(feature = "blocking"))]
172 use futures_util::stream::StreamExt;
173
174 log::trace!("Starting bootstrapper");
175 let stream = bootstrap(&options.bootstrapper_options)
176 .await
177 .map_err(|e| {
178 ObsError::BootstrapperFailure(ObsBootstrapError::GeneralError(e.to_string()))
179 })?;
180 if let Some(stream) = stream {
181 pin_mut!(stream);
182 #[cfg(feature = "blocking")]
183 let mut stream = futures::executor::block_on_stream(stream);
184
185 log::trace!("Waiting for bootstrapper to finish");
186 while let Some(item) = stream.next().await {
187 match item {
188 BootstrapStatus::Downloading(progress, message) => {
189 if let Some(handler) = &mut options.bootstrap_handler {
190 handler
191 .handle_downloading(progress, message)
192 .await
193 .map_err(|e| {
194 ObsError::BootstrapperFailure(
195 ObsBootstrapError::DownloadError(e.to_string()),
196 )
197 })?;
198 }
199 }
200 BootstrapStatus::Extracting(progress, message) => {
201 if let Some(handler) = &mut options.bootstrap_handler {
202 handler.handle_extraction(progress, message).await.map_err(
203 |e| {
204 ObsError::BootstrapperFailure(
205 ObsBootstrapError::ExtractError(e.to_string()),
206 )
207 },
208 )?;
209 }
210 }
211 BootstrapStatus::Error(err) => {
212 return Err(ObsError::BootstrapperFailure(
213 ObsBootstrapError::GeneralError(err.to_string()),
214 ));
215 }
216 BootstrapStatus::RestartRequired => {
217 return Ok(ObsRuntimeReturn::Restart);
218 }
219 }
220 }
221 }
222 }
223
224 log::trace!("Initializing OBS context");
225 return Ok(ObsRuntimeReturn::Done(
226 ObsRuntime::init(options).await.map_err(|e| {
227 ObsError::BootstrapperFailure(ObsBootstrapError::GeneralError(e.to_string()))
228 })?,
229 ));
230 }
231
232 /// Internal initialization method
233 ///
234 /// Creates the OBS thread and performs core initialization.
235 #[cfg_attr(feature = "blocking", remove_async_await::remove_async_await)]
236 async fn init(info: StartupInfo) -> anyhow::Result<(ObsRuntime, ObsModules, StartupInfo)> {
237 let (command_sender, command_receiver) = channel();
238 let (init_tx, init_rx) = oneshot::channel();
239
240 let handle = std::thread::spawn(move || {
241 log::trace!("Starting OBS thread");
242
243 let res = Self::initialize_inner(info);
244
245 match res {
246 Ok((info, modules)) => {
247 log::trace!("OBS context initialized successfully");
248 let e = init_tx.send(Ok((Sendable(modules), info)));
249 if let Err(err) = e {
250 log::error!("Failed to send initialization signal: {:?}", err);
251 }
252
253 // Process commands until termination
254 while let Ok(command) = command_receiver.recv() {
255 match command {
256 ObsCommand::Execute(func, result_sender) => {
257 let result = func();
258 let _ = result_sender.send(result);
259 }
260 ObsCommand::Terminate => break,
261 }
262 }
263
264 Self::shutdown_inner();
265 }
266 Err(err) => {
267 log::error!("Failed to initialize OBS context: {:?}", err);
268 let _ = init_tx.send(Err(err));
269 }
270 }
271 });
272
273 log::trace!("Waiting for OBS thread to initialize");
274 // Wait for initialization to complete
275 let (mut m, info) = oneshot_rx_recv!(init_rx)??;
276
277 let handle = Arc::new(Mutex::new(Some(handle)));
278 let command_sender = Arc::new(command_sender);
279 let runtime = Self {
280 command_sender: command_sender.clone(),
281 _guard: Arc::new(_ObsRuntimeGuard {
282 handle,
283 command_sender,
284 }),
285 };
286
287 m.0.runtime = Some(runtime.clone());
288 Ok((runtime, m.0, info))
289 }
290
291 /// Executes an operation on the OBS thread without returning a value
292 ///
293 /// This is a convenience wrapper around `run_with_obs_result` for operations
294 /// that don't need to return a value.
295 ///
296 /// # Parameters
297 ///
298 /// * `operation` - A function to execute on the OBS thread
299 ///
300 /// # Returns
301 ///
302 /// A `Result` indicating success or failure
303 ///
304 /// # Examples
305 ///
306 /// ```
307 /// use libobs_wrapper::runtime::ObsRuntime;
308 ///
309 /// async fn example(runtime: &ObsRuntime) {
310 /// runtime.run_with_obs(|| {
311 /// // This code runs on the OBS thread
312 /// println!("Hello from the OBS thread!");
313 /// }).await.unwrap();
314 /// }
315 /// ```
316 #[cfg_attr(feature = "blocking", remove_async_await::remove_async_await)]
317 pub async fn run_with_obs<F>(&self, operation: F) -> anyhow::Result<()>
318 where
319 F: FnOnce() -> () + Send + 'static,
320 {
321 self.run_with_obs_result(move || {
322 operation();
323 Result::<(), anyhow::Error>::Ok(())
324 })
325 .await??;
326
327 Ok(())
328 }
329
330 /// Executes an operation on the OBS thread and returns a result (blocking version)
331 ///
332 /// This method blocks the current thread until the operation completes.
333 ///
334 /// # Parameters
335 ///
336 /// * `operation` - A function to execute on the OBS thread
337 ///
338 /// # Returns
339 ///
340 /// A `Result` containing the value returned by the operation
341 ///
342 /// # Panics
343 ///
344 /// This function panics if called within an asynchronous execution context.
345 pub fn run_with_obs_result_blocking<F, T>(&self, operation: F) -> anyhow::Result<T>
346 where
347 F: FnOnce() -> T + Send + 'static,
348 T: Send + 'static,
349 {
350 let (tx, rx) = oneshot::channel();
351
352 // Create a wrapper closure that boxes the result as Any
353 let wrapper = move || -> Box<dyn std::any::Any + Send> {
354 let result = operation();
355 Box::new(result)
356 };
357
358 self.command_sender
359 .send(ObsCommand::Execute(Box::new(wrapper), tx))
360 .map_err(|_| anyhow::anyhow!("Failed to send command to OBS thread"))?;
361
362 let result = rx
363 .blocking_recv()
364 .map_err(|_| anyhow::anyhow!("OBS thread dropped the response channel"))?;
365
366 // Downcast the Any type back to T
367 result
368 .downcast::<T>()
369 .map(|boxed| *boxed)
370 .map_err(|_| anyhow::anyhow!("Failed to downcast result to the expected type"))
371 }
372
373 /// Executes an operation on the OBS thread and returns a result (async version)
374 ///
375 /// This method dispatches a task to the OBS thread and asynchronously waits for the result.
376 ///
377 /// # Parameters
378 ///
379 /// * `operation` - A function to execute on the OBS thread
380 ///
381 /// # Returns
382 ///
383 /// A `Result` containing the value returned by the operation
384 ///
385 /// # Examples
386 ///
387 /// ```
388 /// use libobs_wrapper::runtime::ObsRuntime;
389 ///
390 /// async fn example(runtime: &ObsRuntime) {
391 /// let version = runtime.run_with_obs_result(|| {
392 /// // This code runs on the OBS thread
393 /// unsafe { libobs::obs_get_version_string() }
394 /// }).await.unwrap();
395 ///
396 /// println!("OBS Version: {:?}", version);
397 /// }
398 /// ```
399 #[cfg_attr(feature = "blocking", remove_async_await::remove_async_await)]
400 pub async fn run_with_obs_result<F, T>(&self, operation: F) -> anyhow::Result<T>
401 where
402 F: FnOnce() -> T + Send + 'static,
403 T: Send + 'static,
404 {
405 let (tx, rx) = oneshot::channel();
406
407 // Create a wrapper closure that boxes the result as Any
408 let wrapper = move || -> Box<dyn std::any::Any + Send> {
409 let result = operation();
410 Box::new(result)
411 };
412
413 self.command_sender
414 .send(ObsCommand::Execute(Box::new(wrapper), tx))
415 .map_err(|_| anyhow::anyhow!("Failed to send command to OBS thread"))?;
416
417 let result =
418 oneshot_rx_recv!(rx).map_err(|_| anyhow::anyhow!("OBS thread dropped the response channel"))?;
419
420 // Downcast the Any type back to T
421 let res = result
422 .downcast::<T>()
423 .map(|boxed| *boxed)
424 .map_err(|_| anyhow::anyhow!("Failed to downcast result to the expected type"))?;
425
426 Ok(res)
427 }
428
429 /// Initializes the libobs context and prepares it for recording.
430 ///
431 /// This method handles core OBS initialization including:
432 /// - Starting up the OBS core (`obs_startup`)
433 /// - Resetting video and audio subsystems
434 /// - Loading OBS modules
435 ///
436 /// # Parameters
437 ///
438 /// * `info` - The startup configuration for OBS
439 ///
440 /// # Returns
441 ///
442 /// A `Result` containing the updated startup info and loaded modules, or an error
443 fn initialize_inner(mut info: StartupInfo) -> Result<(StartupInfo, ObsModules), ObsError> {
444 // Checks that there are no other threads
445 // using libobs using a static Mutex.
446 //
447 // Fun fact: this code caused a huge debate
448 // about whether AtomicBool is UB or whatever
449 // in the Rust Programming Discord server.
450 // I didn't read too closely into it because
451 // they were talking about what architecture
452 // fridges have or something.
453 //
454 // Since this function is not meant to be
455 // high-performance or called a thousand times,
456 // a Mutex is fine here.#
457 let mut mutex_value = mutex_blocking_lock!(OBS_THREAD_ID);
458
459 // Directly checks if the value of the
460 // Mutex is false. If true, then error.
461 // We've checked already but keeping this
462 if *mutex_value != None {
463 return Err(ObsError::ThreadFailure);
464 }
465
466 // If the Mutex is None, then change
467 // it to current thread ID so that no
468 // other thread can use libobs while
469 // the current thread is using it.
470 *mutex_value = Some(thread::current().id());
471
472 // Install DLL blocklist hook here
473
474 unsafe {
475 libobs::obs_init_win32_crash_handler();
476 }
477
478 // Set logger, load debug privileges and crash handler
479 unsafe {
480 libobs::base_set_crash_handler(Some(main_crash_handler), std::ptr::null_mut());
481 load_debug_privilege();
482 libobs::base_set_log_handler(Some(extern_log_callback), std::ptr::null_mut());
483 }
484
485 let mut log_callback = LOGGER.lock().map_err(|_e| ObsError::MutexFailure)?;
486
487 *log_callback = info.logger.take().expect("Logger can never be null");
488 drop(log_callback);
489
490 // Locale will only be used internally by
491 // libobs for logging purposes, making it
492 // unnecessary to support other languages.
493 let locale_str = ObsString::new("en-US");
494 let startup_status =
495 unsafe { libobs::obs_startup(locale_str.as_ptr().0, ptr::null(), ptr::null_mut()) };
496
497 let version = unsafe { libobs::obs_get_version_string() };
498 let version_cstr = unsafe { CStr::from_ptr(version) };
499 let version_str = version_cstr.to_string_lossy().into_owned();
500
501 internal_log_global(ObsLogLevel::Info, format!("OBS {}", version_str));
502 internal_log_global(
503 ObsLogLevel::Info,
504 "---------------------------------".to_string(),
505 );
506
507 if !startup_status {
508 return Err(ObsError::Failure);
509 }
510
511 let mut obs_modules = ObsModules::add_paths(&info.startup_paths);
512
513 // Note that audio is meant to only be reset
514 // once. See the link below for information.
515 //
516 // https://docs.obsproject.com/frontends
517 unsafe {
518 libobs::obs_reset_audio2(info.obs_audio_info.as_ptr().0);
519 }
520
521 // Resets the video context. Note that this
522 // is similar to Self::reset_video, but it
523 // does not call that function because the
524 // ObsContext struct is not created yet,
525 // and also because there is no need to free
526 // anything tied to the OBS context.
527 let reset_video_status = num_traits::FromPrimitive::from_i32(unsafe {
528 libobs::obs_reset_video(info.obs_video_info.as_ptr())
529 });
530
531 let reset_video_status = match reset_video_status {
532 Some(x) => x,
533 None => ObsResetVideoStatus::Failure,
534 };
535
536 if reset_video_status != ObsResetVideoStatus::Success {
537 return Err(ObsError::ResetVideoFailure(reset_video_status));
538 }
539
540 obs_modules.load_modules();
541
542 internal_log_global(
543 ObsLogLevel::Info,
544 "==== Startup complete ===============================================".to_string(),
545 );
546
547 Ok((info, obs_modules))
548 }
549
550 /// Shuts down the OBS context and cleans up resources
551 ///
552 /// This method performs a clean shutdown of OBS, including:
553 /// - Removing sources from output channels
554 /// - Calling `obs_shutdown` to clean up OBS resources
555 /// - Removing log and crash handlers
556 /// - Checking for memory leaks
557 fn shutdown_inner() {
558 // Clean up sources
559 for i in 0..libobs::MAX_CHANNELS {
560 unsafe { libobs::obs_set_output_source(i, ptr::null_mut()) };
561 }
562
563 unsafe { libobs::obs_shutdown() }
564
565 let r = LOGGER.lock();
566 match r {
567 Ok(mut logger) => {
568 logger.log(ObsLogLevel::Info, "OBS context shutdown.".to_string());
569 let allocs = unsafe { libobs::bnum_allocs() };
570
571 // Increasing this to 1 because of whats described below
572 let mut notice = "";
573 let level = if allocs > 1 {
574 ObsLogLevel::Error
575 } else {
576 notice = " (this is an issue in the OBS source code that cannot be fixed)";
577 ObsLogLevel::Info
578 };
579 // One memory leak is expected here because OBS does not free array elements of the obs_data_path when calling obs_add_data_path
580 // even when obs_remove_data_path is called. This is a bug in OBS.
581 logger.log(level, format!("Number of memory leaks: {}{}", allocs, notice))
582 }
583 Err(_) => {
584 println!("OBS context shutdown. (but couldn't lock logger)");
585 }
586 }
587
588 unsafe {
589 // Clean up log and crash handler
590 libobs::base_set_crash_handler(None, std::ptr::null_mut());
591 libobs::base_set_log_handler(None, std::ptr::null_mut());
592 }
593
594 let mut mutex_value = mutex_blocking_lock!(OBS_THREAD_ID);
595 *mutex_value = None;
596 }
597}
598
599/// Guard object to ensure proper cleanup when the runtime is dropped
600///
601/// This guard ensures that when the last reference to the runtime is dropped,
602/// the OBS thread is properly terminated and all resources are cleaned up.
603#[derive(Debug)]
604pub struct _ObsRuntimeGuard {
605 /// Thread handle for the OBS thread
606 handle: Arc<Mutex<Option<JoinHandle<()>>>>,
607 /// Sender channel for the OBS thread
608 command_sender: Arc<Sender<ObsCommand>>,
609}
610
611impl _ObsRuntimeGuard {
612 /// Shutdown the OBS runtime and terminate the thread
613 ///
614 /// This method sends a terminate command to the OBS thread and waits
615 /// for it to complete its shutdown process.
616 ///
617 /// # Returns
618 ///
619 /// A `Result` indicating success or failure
620 #[cfg_attr(feature = "blocking", remove_async_await::remove_async_await)]
621 async fn shutdown(&mut self) -> anyhow::Result<()> {
622 self.command_sender
623 .send(ObsCommand::Terminate)
624 .map_err(|_| anyhow::anyhow!("Failed to send termination command to OBS thread"))?;
625
626 // Wait for the thread to finish
627 let mut handle = self.handle.lock().await;
628 let handle = handle.take().expect("Handle can not be empty");
629
630 if let Err(err) = handle.join() {
631 return Err(anyhow::anyhow!("OBS thread panicked: {:?}", err));
632 }
633 Ok(())
634 }
635}
636
637impl Drop for _ObsRuntimeGuard {
638 /// Ensures the OBS thread is properly shut down when the runtime is dropped
639 fn drop(&mut self) {
640 #[cfg(feature = "blocking")]
641 let r = self.shutdown();
642
643 #[cfg(not(feature = "blocking"))]
644 let r = futures::executor::block_on(self.shutdown());
645
646 if thread::panicking() {
647 return;
648 }
649
650 r.unwrap();
651 }
652}