Rust Wrapper & Token Buckets

vozduxan_stream.rs is the safe Rust layer over the raw C API. It owns the C++ session pointer, manages three concurrent stream slots (token buckets), and exposes Tauri command handlers.

Source: src-tauri/src/vozduxan_stream.rs


VozduxanSessionInner

The core struct owns the raw C pointer and all runtime state:

struct VozduxanSessionInner {
    ptr: *mut ffi::VozduxanSession,

    // Three token buckets — see below
    current_token:      Mutex<Option<String>>,
    prefetch_token:     Mutex<Option<String>>,
    warm_prefetch_token: Mutex<Option<String>>,

    // Cancellation flag: set true to discard the next prepare result
    prepare_cancelled: AtomicBool,

    // Version counter: bumped on every prepare(), stale results compare against it
    prepare_version: AtomicU64,

    // Release counter: evict() is called every 10th release
    evict_counter: AtomicU64,

    // Keeps the Arc alive so log_userdata raw pointer stays valid
    _debug_log_arc: Arc<AppDebugLog>,
}

VozduxanStreamState is the public handle that Tauri's managed state system holds:

pub struct VozduxanStreamState {
    inner: Arc<VozduxanSessionInner>,
    pub debug_log: Arc<AppDebugLog>,
}

Token buckets

neegde can have up to three stream slots open at once:

| Field | Bucket | Purpose | Evicts on new prepare? |
|---|---|---|---|
| current_token | Active playback stream | The track the user is currently hearing | Yes: old token released when the new prepare completes |
| prefetch_token | Queue position +1 | Next track, warmed up silently while the current one plays | Yes: old prefetch released when a new next-track prefetch starts |
| warm_prefetch_token | Queue position +2 | Second-ahead warm-up | Yes: old warm token released when the second-ahead track changes |

The warm bucket exists specifically to protect prefetch_token. Without it, warming up queue+2 would evict queue+1, defeating the purpose of prefetch.

Queue:  [▶ NOW PLAYING] [+1 prefetch] [+2 warm] [+3] ...
                  ↑              ↑          ↑
           current_token  prefetch_token  warm_prefetch_token

The flag warm_only: bool in prefetch_next() routes to either prefetch_token or warm_prefetch_token:

if warm_only {
    // Parks in warm_prefetch_token — never evicts the real next-track prefetch
    let old = inner.warm_prefetch_token.lock().unwrap().take();
    // ... release old, store new
} else {
    // Parks in prefetch_token — the true "next track" slot
    let old = inner.prefetch_token.lock().unwrap().take();
    // ... release old, store new
}
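The routing can be sketched in isolation. The following is a minimal sketch rather than the wrapper's actual code: PrefetchSlots and park() are hypothetical names, and the evicted token is returned to the caller instead of being released through the C API.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the two prefetch buckets.
#[derive(Default)]
struct PrefetchSlots {
    prefetch_token: Mutex<Option<String>>,
    warm_prefetch_token: Mutex<Option<String>>,
}

impl PrefetchSlots {
    /// Stores `new_token` in the slot selected by `warm_only` and returns the
    /// token it displaced, so the caller can release it with no lock held.
    fn park(&self, warm_only: bool, new_token: String) -> Option<String> {
        let slot = if warm_only {
            &self.warm_prefetch_token
        } else {
            &self.prefetch_token
        };
        // The guard is a temporary and is dropped at the end of this statement.
        let displaced = slot.lock().unwrap().replace(new_token);
        displaced
    }
}
```

Parking a warm token leaves prefetch_token untouched, which is the property the third bucket exists to provide.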

prepare() — active stream

prepare() starts a new active stream and handles version tracking, cancellation, and token rotation:

Version tracking

Every prepare() call bumps prepare_version and captures the value before entering spawn_blocking:

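// fetch_add returns the previous value, so +1 yields this call's own version.
// Bumping and capturing in one atomic step leaves no window in which a
// concurrent prepare() could bump the counter between the two operations.
let my_version = self.inner.prepare_version.fetch_add(1, Ordering::AcqRel) + 1;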

When the blocking call returns, it checks whether another prepare() was started in the meantime:

if inner.prepare_version.load(Ordering::Acquire) != my_version {
    // A newer prepare() won — release the stale result and return Err
    if info.error == ffi::VozduxanError::Ok {
        let token_c = CString::new(ffi::c_bytes_to_string(&info.token)).unwrap();
        unsafe { ffi::vozduxan_stream_release(inner.ptr, token_c.as_ptr()) };
    }
    return Err("Отменено".into()); // Russian for "Cancelled"
}

This handles the race where the user clicks a different track while a slow DHT resolution is in progress. The first prepare's result is discarded without it ever touching current_token.
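The stale-result check can be exercised on its own with two overlapping prepares. This is a minimal sketch under assumptions: PrepareVersion, begin() and is_current() are hypothetical names that only mirror the role of prepare_version.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical helper mirroring the role of `prepare_version`.
#[derive(Default)]
struct PrepareVersion(AtomicU64);

impl PrepareVersion {
    /// Bumps the counter and returns the new value in one atomic step.
    fn begin(&self) -> u64 {
        self.0.fetch_add(1, Ordering::AcqRel) + 1
    }

    /// True if no later `begin()` has happened since `my_version` was issued.
    fn is_current(&self, my_version: u64) -> bool {
        self.0.load(Ordering::Acquire) == my_version
    }
}
```

If a first prepare calls begin(), and a second prepare calls begin() before the first one finishes, is_current() returns false for the first version, so its result would be released rather than stored.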

Cancellation flag

The torrent_prepare_cancel command sets prepare_cancelled to true. When the in-flight prepare returns, it checks this flag before storing its result:

if inner.prepare_cancelled.load(Ordering::Relaxed) {
    // Prepare finished but caller gave up — release result silently
    if info.error == ffi::VozduxanError::Ok {
        let token_c = CString::new(ffi::c_bytes_to_string(&info.token)).unwrap();
        unsafe { ffi::vozduxan_stream_release(inner.ptr, token_c.as_ptr()) };
    }
    return Err("Отменено".into()); // Russian for "Cancelled"
}
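The excerpt above does not show where prepare_cancelled is reset. One common way to make such a flag one-shot is to read and clear it in a single atomic swap. The sketch below illustrates that pattern only; CancelFlag and its methods are hypothetical, and the wrapper may reset the flag differently.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical one-shot cancellation flag; not the wrapper's actual type.
#[derive(Default)]
struct CancelFlag(AtomicBool);

impl CancelFlag {
    /// Requests that the next finished prepare be discarded.
    fn cancel(&self) {
        self.0.store(true, Ordering::Release);
    }

    /// Returns true if a cancel was pending and clears it in the same atomic
    /// step, so one cancel discards at most one prepare result.
    fn take(&self) -> bool {
        self.0.swap(false, Ordering::AcqRel)
    }
}
```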

Token rotation (mutex release before C++ call)

The old token must be released after the new prepare succeeds. The implementation deliberately drops the mutex lock before calling vozduxan_stream_release, which blocks for ~100 ms:

// Take the old token and drop the mutex BEFORE calling into C++.
// vozduxan_stream_release joins the priority thread (~100 ms); holding
// current_token locked during that call would stall any concurrent
// torrent_release_stream for the full join duration.
let old_token = inner.current_token.lock().unwrap().take();
if let Some(ref old) = old_token {
    let old_c = CString::new(old.as_str()).unwrap();
    unsafe { ffi::vozduxan_stream_release(inner.ptr, old_c.as_ptr()) };
}
*inner.current_token.lock().unwrap() = Some(token.clone());

The sequence is: lock → take (sets to None) → unlock → call C++ → lock → store new token.
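That sequence can be written as a small generic helper. This is a minimal sketch, assuming a hypothetical rotate_token() function whose release closure stands in for the blocking vozduxan_stream_release call.

```rust
use std::sync::Mutex;

/// Swaps `new_token` into `slot`, calling `release` on the old token while
/// the mutex is NOT held. `release` is a hypothetical stand-in for the
/// blocking C++ release call.
fn rotate_token<F: FnOnce(&str)>(
    slot: &Mutex<Option<String>>,
    new_token: String,
    release: F,
) {
    // lock, take, unlock: the guard is a temporary dropped at the end of
    // this statement, leaving None in the slot.
    let old_token = slot.lock().unwrap().take();
    if let Some(old) = old_token {
        release(&old); // slow call, no lock held
    }
    // lock, store, unlock
    *slot.lock().unwrap() = Some(new_token);
}
```

One consequence of this design is that the slot briefly holds None between the take and the store, so a concurrent reader can observe an empty bucket during the release call.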


dispose() — release all

torrent_dispose_preview releases all three buckets. To avoid holding any mutex during the ~100 ms C++ calls, it first collects the tokens and only then calls into C++:

// Collect all tokens and drop ALL mutex guards before calling into C++
let tokens: Vec<(&'static str, String)> = {
    let mut v = Vec::new();
    if let Some(t) = inner.current_token.lock().unwrap().take()       { v.push(("current", t)); }
    if let Some(t) = inner.prefetch_token.lock().unwrap().take()      { v.push(("prefetch", t)); }
    if let Some(t) = inner.warm_prefetch_token.lock().unwrap().take() { v.push(("warm", t)); }
    v
};
// All three locks are now released. Call C++ without any lock held.
for (_, token) in &tokens {
    let c = CString::new(token.as_str()).unwrap();
    unsafe { ffi::vozduxan_stream_release(inner.ptr, c.as_ptr()) };
}

Without this pattern, a dispose() with three tokens would keep mutexes locked for roughly 300 ms in total (three joins of about 100 ms each), stalling every concurrent Tauri command that touches those buckets.
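The collect-then-release step generalizes to any number of slots. The helper below is a sketch under assumptions: drain_slots() is a hypothetical name, and the caller is expected to pass the returned tokens to the release function afterwards.

```rust
use std::sync::Mutex;

/// Empties every slot and returns the tokens that were present.
/// Each guard is dropped before the next slot is locked, so at most one
/// mutex is held at a time and none is held when this function returns.
fn drain_slots(slots: &[&Mutex<Option<String>>]) -> Vec<String> {
    slots
        .iter()
        .filter_map(|slot| slot.lock().unwrap().take())
        .collect()
}
```

Empty slots are skipped, so the returned vector contains only tokens that actually need releasing.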


Progress callback

vozduxan_stream_prepare accepts an optional C progress callback. Rust allocates a ProgressCtx on the heap and passes a raw pointer:

let ctx = Box::new(ProgressCtx { app: app_clone });
let ctx_ptr = Box::into_raw(ctx) as *mut c_void;

let info = unsafe {
    ffi::vozduxan_stream_prepare(
        inner.ptr, magnet_c.as_ptr(), ...,
        Some(on_progress),
        ctx_ptr,      // ← raw pointer to ProgressCtx
    )
};

// Free the ProgressCtx — callback will not be called after prepare returns
let _ = unsafe { Box::from_raw(ctx_ptr as *mut ProgressCtx) };

The callback fires on the C++ thread and emits torrent-prepare-progress Tauri events to the frontend.
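The ownership hand-off around the callback can be shown without any C++ at all. In the sketch below everything is an assumption made for illustration: the callback signature (a percent value plus userdata), the counter inside ProgressCtx, and fake_prepare(), which stands in for the C function.

```rust
use std::ffi::c_void;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Hypothetical context; the real ProgressCtx carries a Tauri app handle.
struct ProgressCtx {
    calls: Arc<AtomicU32>,
}

/// Callback with a C ABI; `userdata` is the pointer made by Box::into_raw.
extern "C" fn on_progress(percent: u32, userdata: *mut c_void) {
    // SAFETY: the caller guarantees `userdata` points to a live ProgressCtx.
    let ctx = unsafe { &*(userdata as *const ProgressCtx) };
    let _ = percent; // a real handler would emit an event here
    ctx.calls.fetch_add(1, Ordering::Relaxed);
}

/// Stand-in for the C function: fires the callback three times, then returns.
fn fake_prepare(cb: Option<extern "C" fn(u32, *mut c_void)>, userdata: *mut c_void) {
    if let Some(cb) = cb {
        for percent in [0, 50, 100] {
            cb(percent, userdata);
        }
    }
}

/// Runs the fake prepare and returns how many times the callback fired.
fn run_prepare_with_progress() -> u32 {
    let calls = Arc::new(AtomicU32::new(0));
    let ctx = Box::new(ProgressCtx { calls: calls.clone() });
    let ctx_ptr = Box::into_raw(ctx) as *mut c_void;

    fake_prepare(Some(on_progress), ctx_ptr);

    // SAFETY: ctx_ptr came from Box::into_raw above, and the callback can no
    // longer fire because fake_prepare has returned.
    drop(unsafe { Box::from_raw(ctx_ptr as *mut ProgressCtx) });
    calls.load(Ordering::Relaxed)
}
```

Reclaiming the box is only sound if the callee never invokes the callback after it returns, which is the guarantee the comment in the original snippet relies on.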


spawn_blocking discipline

All calls into blocking C++ functions (vozduxan_stream_prepare, vozduxan_stream_release) are wrapped in tokio::task::spawn_blocking. This moves the work onto Tokio's blocking thread pool and keeps the async worker threads free while C++ does its work (piece downloads, priority thread joins).

The inner: Arc<VozduxanSessionInner> is cloned before entering spawn_blocking so it can cross the thread boundary:

let inner = self.inner.clone();
let dlog_arc = self.debug_log.clone();

let url = tokio::task::spawn_blocking(move || -> Result<String, String> {
    // ... call C++ with inner.ptr
})
.await
.map_err(|e| format!("spawn_blocking error: {e}"))??;

The double ? peels two layers of Result. The first ? propagates the JoinError from spawn_blocking, already converted to a String by map_err; the second propagates the Err(String) returned by the closure itself.
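The same nested-Result shape appears with std::thread, which makes it possible to try the ?? pattern without a Tokio runtime. This is a sketch with hypothetical names (blocking_prepare, prepare_url); thread::spawn plus join() is used here only as a stand-in for spawn_blocking plus .await.

```rust
use std::thread;

/// Stand-in for the blocking C++ call; fails on an empty magnet string.
fn blocking_prepare(magnet: &str) -> Result<String, String> {
    if magnet.is_empty() {
        Err("empty magnet".to_string())
    } else {
        Ok(format!("url-for:{}", magnet))
    }
}

fn prepare_url(magnet: String) -> Result<String, String> {
    let url = thread::spawn(move || blocking_prepare(&magnet))
        .join() // Result<Result<String, String>, Box<dyn Any + Send>>
        .map_err(|_| "worker panicked".to_string())??;
    // First `?`: the join failure. Second `?`: the closure's own error.
    Ok(url)
}
```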


Eviction counter

evict_counter is an AtomicU64 incremented on every stream release. Every 10th release triggers vozduxan_session_evict:

let n = inner.evict_counter.fetch_add(1, Ordering::Relaxed) + 1;
if n % 10 == 0 {
    unsafe { ffi::vozduxan_session_evict(inner.ptr) };
}

This amortizes the cost of clearing the libtorrent session across releases rather than paying it on every single release.
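The every-Nth-release rule can be checked in isolation. In this minimal sketch, note_release() is a hypothetical helper that returns a flag instead of calling vozduxan_session_evict, and every must be non-zero.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Records one release and reports whether the running total is now a
/// multiple of `every`. Panics if `every` is zero (remainder by zero).
fn note_release(counter: &AtomicU64, every: u64) -> bool {
    // fetch_add returns the previous value, so +1 is this release's number.
    let n = counter.fetch_add(1, Ordering::Relaxed) + 1;
    n % every == 0
}
```

Because fetch_add hands each caller a distinct number, exactly one release in every group of ten observes a multiple of ten, even when releases race on different threads.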


Thread safety

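VozduxanSessionInner is Send + Sync through an explicit unsafe impl, because the raw pointer field prevents the compiler from deriving either trait automatically. The impl is sound as long as the following hold: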

  • ptr is guarded by the invariant that all C++ calls are serialized through the C API's own internal locking
  • current_token, prefetch_token, warm_prefetch_token are Mutex<Option<String>>
  • prepare_cancelled, prepare_version, evict_counter are atomics
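The shape of such an impl can be sketched with a toy handle. All names below (RawSession, SessionInner, read_from_worker) are hypothetical, and the safety comment states an assumed invariant rather than one verified against the C API.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical opaque handle standing in for the C++ session.
struct RawSession {
    id: u32,
}

struct SessionInner {
    ptr: *mut RawSession, // raw pointers are neither Send nor Sync by default
}

// SAFETY (assumed invariant): every access through `ptr` is serialized by
// the callee, and the pointee stays alive until SessionInner is dropped.
unsafe impl Send for SessionInner {}
unsafe impl Sync for SessionInner {}

impl SessionInner {
    fn new(id: u32) -> Self {
        Self { ptr: Box::into_raw(Box::new(RawSession { id })) }
    }

    fn id(&self) -> u32 {
        // SAFETY: ptr is valid for the lifetime of self; read-only access.
        unsafe { (*self.ptr).id }
    }
}

impl Drop for SessionInner {
    fn drop(&mut self) {
        // SAFETY: ptr came from Box::into_raw in `new` and is freed once.
        drop(unsafe { Box::from_raw(self.ptr) });
    }
}

/// Moves the shared handle to another thread; this compiles only because
/// SessionInner is both Send and Sync (Arc<T>: Send needs T: Send + Sync).
fn read_from_worker(inner: Arc<SessionInner>) -> u32 {
    thread::spawn(move || inner.id()).join().unwrap()
}
```

Removing either unsafe impl makes read_from_worker fail to compile, which is a quick way to confirm what the impls are buying.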