libobs_wrapper/
runtime.rs

1//! Runtime management for safe OBS API access across threads
2//!
3//! This module provides the core thread management functionality for the libobs-wrapper.
4//! It ensures that OBS API calls are always executed on the same thread, as required by
5//! the OBS API, while still allowing application code to interact with OBS from any thread.
6//!
7//! # Thread Safety
8//!
9//! The OBS C API is not thread-safe and requires that all operations occur on the same thread.
10//! The `ObsRuntime` struct creates a dedicated thread for all OBS operations and manages
11//! message passing between application threads and the OBS thread.
12//!
13//! # Async and Blocking APIs
14//!
15//! The runtime supports both async and blocking APIs:
16//! - By default, all operations are asynchronous
17//! - With the `blocking` feature enabled, operations are synchronous
18//!
19//! # Example
20//!
//! ```no_run
//! use libobs_wrapper::runtime::ObsRuntime;
//!
//! // Assumes the runtime comes from an OBS context that is already initialized.
//! async fn example(runtime: &ObsRuntime) {
//!     // Run an operation on the OBS thread
//!     runtime.run_with_obs(|| {
//!         // This code runs on the OBS thread
//!         println!("Running on OBS thread");
//!     }).await.unwrap();
//! }
//! ```
37
38use std::ffi::CStr;
39use std::sync::mpsc::{channel, Sender};
40use std::sync::Arc;
41use std::{fmt::Debug, thread::JoinHandle};
42use std::{ptr, thread};
43use tokio::sync::oneshot;
44
45use crate::bootstrap::bootstrap;
46use crate::crash_handler::main_crash_handler;
47use crate::enums::{ObsLogLevel, ObsResetVideoStatus};
48use crate::logger::{extern_log_callback, internal_log_global, LOGGER};
49use crate::unsafe_send::Sendable;
50use crate::utils::initialization::load_debug_privilege;
51use crate::utils::{ObsBootstrapError, ObsError, ObsModules, ObsString};
52use crate::{
53    context::OBS_THREAD_ID,
54    utils::{async_sync::Mutex, StartupInfo},
55};
56use crate::{mutex_blocking_lock, oneshot_rx_recv};
57
/// Message sent over the command channel to the dedicated OBS thread.
///
/// The OBS thread receives these in a loop and handles them one at a time,
/// in the order they were sent.
enum ObsCommand {
    /// Run the boxed closure on the OBS thread and send its type-erased
    /// (`Box<dyn Any + Send>`) return value back through the oneshot sender.
    Execute(
        Box<dyn FnOnce() -> Box<dyn std::any::Any + Send> + Send>,
        oneshot::Sender<Box<dyn std::any::Any + Send>>,
    ),
    /// Ask the OBS thread to leave its command loop and shut OBS down.
    Terminate,
}
68
/// Outcome of a successful call to the runtime startup routine.
pub enum ObsRuntimeReturn {
    /// OBS is initialized. Carries the runtime handle, the loaded modules and
    /// the startup info that was used.
    Done((ObsRuntime, ObsModules, StartupInfo)),

    /// The bootstrapper reported that the application must be restarted
    /// before OBS can be used (e.g. to apply an OBS update).
    Restart,
}
77
/// Core runtime that manages the OBS thread
///
/// This struct is a handle to the dedicated thread on which every OBS API
/// call is executed. Closures handed to the runtime are sent to that thread
/// through a channel, so application code can interact with OBS from any thread.
///
/// # Thread Safety
///
/// `ObsRuntime` can be cloned and shared across threads. Each clone holds the
/// same command sender and the same shutdown guard.
///
/// # Lifecycle Management
///
/// When the last `ObsRuntime` clone is dropped, the shared guard is dropped as
/// well, which tells the OBS thread to terminate and shut OBS down.
///
/// # Examples
///
/// Creating a runtime (`startup` is crate-internal, so this snippet is not
/// compiled as a doctest):
///
/// ```ignore
/// use libobs_wrapper::runtime::{ObsRuntime, ObsRuntimeReturn};
/// use libobs_wrapper::utils::StartupInfo;
///
/// async fn example() {
///     let startup_info = StartupInfo::default();
///     let (runtime, _, _) = match ObsRuntime::startup(startup_info).await.unwrap() {
///         ObsRuntimeReturn::Done(res) => res,
///         _ => panic!("OBS initialization failed"),
///     };
///     // Now you can use runtime to interact with OBS
/// }
/// ```
#[derive(Debug, Clone)]
pub struct ObsRuntime {
    // Channel used to submit commands to the OBS thread.
    command_sender: Arc<Sender<ObsCommand>>,
    // Shared guard; dropping the last reference shuts the OBS thread down.
    _guard: Arc<_ObsRuntimeGuard>,
}
116
117impl ObsRuntime {
118    /// Initializes the OBS runtime.
119    ///
120    /// This function starts up OBS on a dedicated thread and prepares it for use.
121    /// It handles bootstrapping (if configured), OBS initialization, module loading,
122    /// and setup of audio/video subsystems.
123    ///
124    /// # Parameters
125    ///
126    /// * `options` - The startup configuration for OBS
127    ///
128    /// # Returns
129    ///
130    /// A `Result` containing either:
131    /// - `ObsRuntimeReturn::Done` with the initialized runtime, modules, and startup info
132    /// - `ObsRuntimeReturn::Restart` if OBS needs to be updated and the application should restart
133    /// - An `ObsError` if initialization fails
134    ///
135    /// # Examples
136    ///
137    /// ```
138    /// use libobs_wrapper::runtime::{ObsRuntime, ObsRuntimeReturn};
139    /// use libobs_wrapper::utils::StartupInfo;
140    ///
141    /// async fn initialize() {
142    ///     let startup_info = StartupInfo::default();
143    ///     match ObsRuntime::startup(startup_info).await {
144    ///         Ok(ObsRuntimeReturn::Done(runtime_components)) => {
145    ///             // Use the initialized runtime
146    ///         },
147    ///         Ok(ObsRuntimeReturn::Restart) => {
148    ///             // Handle restart for OBS update
149    ///         },
150    ///         Err(e) => {
151    ///             // Handle initialization error
152    ///         }
153    ///     }
154    /// }
155    /// ```
156    #[cfg_attr(feature = "blocking", remove_async_await::remove_async_await)]
157    pub(crate) async fn startup(mut options: StartupInfo) -> Result<ObsRuntimeReturn, ObsError> {
158        // Check if OBS is already running on another thread
159        let obs_id = OBS_THREAD_ID.lock().await;
160        if obs_id.is_some() {
161            return Err(ObsError::ThreadFailure);
162        }
163
164        drop(obs_id);
165
166        // Handle bootstrapping if enabled and configured
167        #[cfg(feature = "bootstrapper")]
168        if options.bootstrap_handler.is_some() {
169            use crate::bootstrap::BootstrapStatus;
170            use futures_util::pin_mut;
171            #[cfg(not(feature = "blocking"))]
172            use futures_util::stream::StreamExt;
173
174            log::trace!("Starting bootstrapper");
175            let stream = bootstrap(&options.bootstrapper_options)
176                .await
177                .map_err(|e| {
178                    ObsError::BootstrapperFailure(ObsBootstrapError::GeneralError(e.to_string()))
179                })?;
180            if let Some(stream) = stream {
181                pin_mut!(stream);
182                #[cfg(feature = "blocking")]
183                let mut stream = futures::executor::block_on_stream(stream);
184
185                log::trace!("Waiting for bootstrapper to finish");
186                while let Some(item) = stream.next().await {
187                    match item {
188                        BootstrapStatus::Downloading(progress, message) => {
189                            if let Some(handler) = &mut options.bootstrap_handler {
190                                handler
191                                    .handle_downloading(progress, message)
192                                    .await
193                                    .map_err(|e| {
194                                        ObsError::BootstrapperFailure(
195                                            ObsBootstrapError::DownloadError(e.to_string()),
196                                        )
197                                    })?;
198                            }
199                        }
200                        BootstrapStatus::Extracting(progress, message) => {
201                            if let Some(handler) = &mut options.bootstrap_handler {
202                                handler.handle_extraction(progress, message).await.map_err(
203                                    |e| {
204                                        ObsError::BootstrapperFailure(
205                                            ObsBootstrapError::ExtractError(e.to_string()),
206                                        )
207                                    },
208                                )?;
209                            }
210                        }
211                        BootstrapStatus::Error(err) => {
212                            return Err(ObsError::BootstrapperFailure(
213                                ObsBootstrapError::GeneralError(err.to_string()),
214                            ));
215                        }
216                        BootstrapStatus::RestartRequired => {
217                            return Ok(ObsRuntimeReturn::Restart);
218                        }
219                    }
220                }
221            }
222        }
223
224        log::trace!("Initializing OBS context");
225        return Ok(ObsRuntimeReturn::Done(
226            ObsRuntime::init(options).await.map_err(|e| {
227                ObsError::BootstrapperFailure(ObsBootstrapError::GeneralError(e.to_string()))
228            })?,
229        ));
230    }
231
232    /// Internal initialization method
233    ///
234    /// Creates the OBS thread and performs core initialization.
235    #[cfg_attr(feature = "blocking", remove_async_await::remove_async_await)]
236    async fn init(info: StartupInfo) -> anyhow::Result<(ObsRuntime, ObsModules, StartupInfo)> {
237        let (command_sender, command_receiver) = channel();
238        let (init_tx, init_rx) = oneshot::channel();
239
240        let handle = std::thread::spawn(move || {
241            log::trace!("Starting OBS thread");
242
243            let res = Self::initialize_inner(info);
244
245            match res {
246                Ok((info, modules)) => {
247                    log::trace!("OBS context initialized successfully");
248                    let e = init_tx.send(Ok((Sendable(modules), info)));
249                    if let Err(err) = e {
250                        log::error!("Failed to send initialization signal: {:?}", err);
251                    }
252
253                    // Process commands until termination
254                    while let Ok(command) = command_receiver.recv() {
255                        match command {
256                            ObsCommand::Execute(func, result_sender) => {
257                                let result = func();
258                                let _ = result_sender.send(result);
259                            }
260                            ObsCommand::Terminate => break,
261                        }
262                    }
263
264                    Self::shutdown_inner();
265                }
266                Err(err) => {
267                    log::error!("Failed to initialize OBS context: {:?}", err);
268                    let _ = init_tx.send(Err(err));
269                }
270            }
271        });
272
273        log::trace!("Waiting for OBS thread to initialize");
274        // Wait for initialization to complete
275        let (mut m, info) = oneshot_rx_recv!(init_rx)??;
276
277        let handle = Arc::new(Mutex::new(Some(handle)));
278        let command_sender = Arc::new(command_sender);
279        let runtime = Self {
280            command_sender: command_sender.clone(),
281            _guard: Arc::new(_ObsRuntimeGuard {
282                handle,
283                command_sender,
284            }),
285        };
286
287        m.0.runtime = Some(runtime.clone());
288        Ok((runtime, m.0, info))
289    }
290
291    /// Executes an operation on the OBS thread without returning a value
292    ///
293    /// This is a convenience wrapper around `run_with_obs_result` for operations
294    /// that don't need to return a value.
295    ///
296    /// # Parameters
297    ///
298    /// * `operation` - A function to execute on the OBS thread
299    ///
300    /// # Returns
301    ///
302    /// A `Result` indicating success or failure
303    ///
304    /// # Examples
305    ///
306    /// ```
307    /// use libobs_wrapper::runtime::ObsRuntime;
308    ///
309    /// async fn example(runtime: &ObsRuntime) {
310    ///     runtime.run_with_obs(|| {
311    ///         // This code runs on the OBS thread
312    ///         println!("Hello from the OBS thread!");
313    ///     }).await.unwrap();
314    /// }
315    /// ```
316    #[cfg_attr(feature = "blocking", remove_async_await::remove_async_await)]
317    pub async fn run_with_obs<F>(&self, operation: F) -> anyhow::Result<()>
318    where
319        F: FnOnce() -> () + Send + 'static,
320    {
321        self.run_with_obs_result(move || {
322            operation();
323            Result::<(), anyhow::Error>::Ok(())
324        })
325        .await??;
326
327        Ok(())
328    }
329
330    /// Executes an operation on the OBS thread and returns a result (blocking version)
331    ///
332    /// This method blocks the current thread until the operation completes.
333    ///
334    /// # Parameters
335    ///
336    /// * `operation` - A function to execute on the OBS thread
337    ///
338    /// # Returns
339    ///
340    /// A `Result` containing the value returned by the operation
341    ///
342    /// # Panics
343    ///
344    /// This function panics if called within an asynchronous execution context.
345    pub fn run_with_obs_result_blocking<F, T>(&self, operation: F) -> anyhow::Result<T>
346    where
347        F: FnOnce() -> T + Send + 'static,
348        T: Send + 'static,
349    {
350        let (tx, rx) = oneshot::channel();
351
352        // Create a wrapper closure that boxes the result as Any
353        let wrapper = move || -> Box<dyn std::any::Any + Send> {
354            let result = operation();
355            Box::new(result)
356        };
357
358        self.command_sender
359            .send(ObsCommand::Execute(Box::new(wrapper), tx))
360            .map_err(|_| anyhow::anyhow!("Failed to send command to OBS thread"))?;
361
362        let result = rx
363            .blocking_recv()
364            .map_err(|_| anyhow::anyhow!("OBS thread dropped the response channel"))?;
365
366        // Downcast the Any type back to T
367        result
368            .downcast::<T>()
369            .map(|boxed| *boxed)
370            .map_err(|_| anyhow::anyhow!("Failed to downcast result to the expected type"))
371    }
372
373    /// Executes an operation on the OBS thread and returns a result (async version)
374    ///
375    /// This method dispatches a task to the OBS thread and asynchronously waits for the result.
376    ///
377    /// # Parameters
378    ///
379    /// * `operation` - A function to execute on the OBS thread
380    ///
381    /// # Returns
382    ///
383    /// A `Result` containing the value returned by the operation
384    ///
385    /// # Examples
386    ///
387    /// ```
388    /// use libobs_wrapper::runtime::ObsRuntime;
389    ///
390    /// async fn example(runtime: &ObsRuntime) {
391    ///     let version = runtime.run_with_obs_result(|| {
392    ///         // This code runs on the OBS thread
393    ///         unsafe { libobs::obs_get_version_string() }
394    ///     }).await.unwrap();
395    ///     
396    ///     println!("OBS Version: {:?}", version);
397    /// }
398    /// ```
399    #[cfg_attr(feature = "blocking", remove_async_await::remove_async_await)]
400    pub async fn run_with_obs_result<F, T>(&self, operation: F) -> anyhow::Result<T>
401    where
402        F: FnOnce() -> T + Send + 'static,
403        T: Send + 'static,
404    {
405        let (tx, rx) = oneshot::channel();
406
407        // Create a wrapper closure that boxes the result as Any
408        let wrapper = move || -> Box<dyn std::any::Any + Send> {
409            let result = operation();
410            Box::new(result)
411        };
412
413        self.command_sender
414            .send(ObsCommand::Execute(Box::new(wrapper), tx))
415            .map_err(|_| anyhow::anyhow!("Failed to send command to OBS thread"))?;
416
417        let result =
418            oneshot_rx_recv!(rx).map_err(|_| anyhow::anyhow!("OBS thread dropped the response channel"))?;
419
420        // Downcast the Any type back to T
421        let res = result
422            .downcast::<T>()
423            .map(|boxed| *boxed)
424            .map_err(|_| anyhow::anyhow!("Failed to downcast result to the expected type"))?;
425
426        Ok(res)
427    }
428
429    /// Initializes the libobs context and prepares it for recording.
430    ///
431    /// This method handles core OBS initialization including:
432    /// - Starting up the OBS core (`obs_startup`)
433    /// - Resetting video and audio subsystems 
434    /// - Loading OBS modules
435    /// 
436    /// # Parameters
437    ///
438    /// * `info` - The startup configuration for OBS
439    ///
440    /// # Returns
441    ///
442    /// A `Result` containing the updated startup info and loaded modules, or an error
443    fn initialize_inner(mut info: StartupInfo) -> Result<(StartupInfo, ObsModules), ObsError> {
444        // Checks that there are no other threads
445        // using libobs using a static Mutex.
446        //
447        // Fun fact: this code caused a huge debate
448        // about whether AtomicBool is UB or whatever
449        // in the Rust Programming Discord server.
450        // I didn't read too closely into it because
451        // they were talking about what architecture
452        // fridges have or something.
453        //
454        // Since this function is not meant to be
455        // high-performance or called a thousand times,
456        // a Mutex is fine here.#
457        let mut mutex_value = mutex_blocking_lock!(OBS_THREAD_ID);
458
459        // Directly checks if the value of the
460        // Mutex is false. If true, then error.
461        // We've checked already but keeping this
462        if *mutex_value != None {
463            return Err(ObsError::ThreadFailure);
464        }
465
466        // If the Mutex is None, then change
467        // it to current thread ID so that no
468        // other thread can use libobs while
469        // the current thread is using it.
470        *mutex_value = Some(thread::current().id());
471
472        // Install DLL blocklist hook here
473
474        unsafe {
475            libobs::obs_init_win32_crash_handler();
476        }
477
478        // Set logger, load debug privileges and crash handler
479        unsafe {
480            libobs::base_set_crash_handler(Some(main_crash_handler), std::ptr::null_mut());
481            load_debug_privilege();
482            libobs::base_set_log_handler(Some(extern_log_callback), std::ptr::null_mut());
483        }
484
485        let mut log_callback = LOGGER.lock().map_err(|_e| ObsError::MutexFailure)?;
486
487        *log_callback = info.logger.take().expect("Logger can never be null");
488        drop(log_callback);
489
490        // Locale will only be used internally by
491        // libobs for logging purposes, making it
492        // unnecessary to support other languages.
493        let locale_str = ObsString::new("en-US");
494        let startup_status =
495            unsafe { libobs::obs_startup(locale_str.as_ptr().0, ptr::null(), ptr::null_mut()) };
496
497        let version = unsafe { libobs::obs_get_version_string() };
498        let version_cstr = unsafe { CStr::from_ptr(version) };
499        let version_str = version_cstr.to_string_lossy().into_owned();
500
501        internal_log_global(ObsLogLevel::Info, format!("OBS {}", version_str));
502        internal_log_global(
503            ObsLogLevel::Info,
504            "---------------------------------".to_string(),
505        );
506
507        if !startup_status {
508            return Err(ObsError::Failure);
509        }
510
511        let mut obs_modules = ObsModules::add_paths(&info.startup_paths);
512
513        // Note that audio is meant to only be reset
514        // once. See the link below for information.
515        //
516        // https://docs.obsproject.com/frontends
517        unsafe {
518            libobs::obs_reset_audio2(info.obs_audio_info.as_ptr().0);
519        }
520
521        // Resets the video context. Note that this
522        // is similar to Self::reset_video, but it
523        // does not call that function because the
524        // ObsContext struct is not created yet,
525        // and also because there is no need to free
526        // anything tied to the OBS context.
527        let reset_video_status = num_traits::FromPrimitive::from_i32(unsafe {
528            libobs::obs_reset_video(info.obs_video_info.as_ptr())
529        });
530
531        let reset_video_status = match reset_video_status {
532            Some(x) => x,
533            None => ObsResetVideoStatus::Failure,
534        };
535
536        if reset_video_status != ObsResetVideoStatus::Success {
537            return Err(ObsError::ResetVideoFailure(reset_video_status));
538        }
539
540        obs_modules.load_modules();
541
542        internal_log_global(
543            ObsLogLevel::Info,
544            "==== Startup complete ===============================================".to_string(),
545        );
546
547        Ok((info, obs_modules))
548    }
549
550    /// Shuts down the OBS context and cleans up resources
551    ///
552    /// This method performs a clean shutdown of OBS, including:
553    /// - Removing sources from output channels
554    /// - Calling `obs_shutdown` to clean up OBS resources
555    /// - Removing log and crash handlers
556    /// - Checking for memory leaks
557    fn shutdown_inner() {
558        // Clean up sources
559        for i in 0..libobs::MAX_CHANNELS {
560            unsafe { libobs::obs_set_output_source(i, ptr::null_mut()) };
561        }
562
563        unsafe { libobs::obs_shutdown() }
564
565        let r = LOGGER.lock();
566        match r {
567            Ok(mut logger) => {
568                logger.log(ObsLogLevel::Info, "OBS context shutdown.".to_string());
569                let allocs = unsafe { libobs::bnum_allocs() };
570
571                // Increasing this to 1 because of whats described below
572                let mut notice = "";
573                let level = if allocs > 1 {
574                    ObsLogLevel::Error
575                } else {
576                    notice = " (this is an issue in the OBS source code that cannot be fixed)";
577                    ObsLogLevel::Info
578                };
579                // One memory leak is expected here because OBS does not free array elements of the obs_data_path when calling obs_add_data_path
580                // even when obs_remove_data_path is called. This is a bug in OBS.
581                logger.log(level, format!("Number of memory leaks: {}{}", allocs, notice))
582            }
583            Err(_) => {
584                println!("OBS context shutdown. (but couldn't lock logger)");
585            }
586        }
587
588        unsafe {
589            // Clean up log and crash handler
590            libobs::base_set_crash_handler(None, std::ptr::null_mut());
591            libobs::base_set_log_handler(None, std::ptr::null_mut());
592        }
593
594        let mut mutex_value = mutex_blocking_lock!(OBS_THREAD_ID);
595        *mutex_value = None;
596    }
597}
598
/// Guard object to ensure proper cleanup when the runtime is dropped
///
/// Every `ObsRuntime` clone shares one guard through an `Arc`. When the last
/// reference goes away, the guard's `Drop` implementation asks the OBS thread
/// to terminate and waits for it to finish.
#[derive(Debug)]
pub struct _ObsRuntimeGuard {
    /// Join handle of the OBS thread; taken (left as `None`) during shutdown
    handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Sender used to deliver the terminate command to the OBS thread
    command_sender: Arc<Sender<ObsCommand>>,
}
610
611impl _ObsRuntimeGuard {
612    /// Shutdown the OBS runtime and terminate the thread
613    ///
614    /// This method sends a terminate command to the OBS thread and waits
615    /// for it to complete its shutdown process.
616    ///
617    /// # Returns
618    ///
619    /// A `Result` indicating success or failure
620    #[cfg_attr(feature = "blocking", remove_async_await::remove_async_await)]
621    async fn shutdown(&mut self) -> anyhow::Result<()> {
622        self.command_sender
623            .send(ObsCommand::Terminate)
624            .map_err(|_| anyhow::anyhow!("Failed to send termination command to OBS thread"))?;
625
626        // Wait for the thread to finish
627        let mut handle = self.handle.lock().await;
628        let handle = handle.take().expect("Handle can not be empty");
629
630        if let Err(err) = handle.join() {
631            return Err(anyhow::anyhow!("OBS thread panicked: {:?}", err));
632        }
633        Ok(())
634    }
635}
636
637impl Drop for _ObsRuntimeGuard {
638    /// Ensures the OBS thread is properly shut down when the runtime is dropped
639    fn drop(&mut self) {
640        #[cfg(feature = "blocking")]
641        let r = self.shutdown();
642
643        #[cfg(not(feature = "blocking"))]
644        let r = futures::executor::block_on(self.shutdown());
645
646        if thread::panicking() {
647            return;
648        }
649
650        r.unwrap();
651    }
652}