Rust Wrapper & Token Buckets¶
vozduxan_stream.rs is the safe Rust layer over the raw C API. It owns the C++ session pointer, manages three concurrent stream slots (token buckets), and exposes Tauri command handlers.
Source: src-tauri/src/vozduxan_stream.rs
VozduxanSessionInner¶
The core struct owns the raw C pointer and all runtime state:
struct VozduxanSessionInner {
ptr: *mut ffi::VozduxanSession,
// Three token buckets — see below
current_token: Mutex<Option<String>>,
prefetch_token: Mutex<Option<String>>,
warm_prefetch_token: Mutex<Option<String>>,
// Cancellation flag: set true to discard the next prepare result
prepare_cancelled: AtomicBool,
// Version counter: bumped on every prepare(), stale results compare against it
prepare_version: AtomicU64,
// Release counter: evict() is called every 10th release
evict_counter: AtomicU64,
// Keeps the Arc alive so log_userdata raw pointer stays valid
_debug_log_arc: Arc<AppDebugLog>,
}
VozduxanStreamState is the public handle that Tauri's managed state system holds:
pub struct VozduxanStreamState {
inner: Arc<VozduxanSessionInner>,
pub debug_log: Arc<AppDebugLog>,
}
Token buckets¶
neegde can have up to three concurrent stream slots open simultaneously:
| Bucket | Field | Purpose | Evicts on new prepare? |
|---|---|---|---|
current_token |
active playback stream | The track the user is currently hearing | Yes — old token released when new prepare completes |
prefetch_token |
queue position +1 | Next track, warmed up silently while current plays | Yes — old prefetch released when new next-track prefetch starts |
warm_prefetch_token |
queue position +2 | Second-ahead warm-up | Yes — old warm released when second-ahead changes |
The warm bucket exists specifically to protect prefetch_token. Without it, warming up queue+2 would evict queue+1, defeating the purpose of prefetch.
Queue: [▶ NOW PLAYING] [+1 prefetch] [+2 warm] [+3] ...
↑ ↑ ↑
current_token prefetch_token warm_prefetch_token
The flag warm_only: bool in prefetch_next() routes to either prefetch_token or warm_prefetch_token:
if warm_only {
// Parks in warm_prefetch_token — never evicts the real next-track prefetch
let old = inner.warm_prefetch_token.lock().unwrap().take();
// ... release old, store new
} else {
// Parks in prefetch_token — the true "next track" slot
let old = inner.prefetch_token.lock().unwrap().take();
// ... release old, store new
}
prepare() — active stream¶
prepare() starts a new active stream and handles version tracking, cancellation, and token rotation:
Version tracking¶
Every prepare() call bumps prepare_version and captures the value before entering spawn_blocking:
self.inner.prepare_version.fetch_add(1, Ordering::AcqRel);
let my_version = self.inner.prepare_version.load(Ordering::Acquire);
When the blocking call returns, it checks whether another prepare() was started in the meantime:
if inner.prepare_version.load(Ordering::Acquire) != my_version {
// A newer prepare() won — release the stale result and return Err
if info.error == ffi::VozduxanError::Ok {
let token_c = CString::new(ffi::c_bytes_to_string(&info.token)).unwrap();
unsafe { ffi::vozduxan_stream_release(inner.ptr, token_c.as_ptr()) };
}
return Err("Отменено".into());
}
This handles the race where the user clicks a different track while a slow DHT resolution is in progress. The first prepare's result is discarded without it ever touching current_token.
Cancellation flag¶
torrent_prepare_cancel sets prepare_cancelled = true. The next prepare result checks this flag:
if inner.prepare_cancelled.load(Ordering::Relaxed) {
// Prepare finished but caller gave up — release result silently
if info.error == ffi::VozduxanError::Ok {
let token_c = CString::new(ffi::c_bytes_to_string(&info.token)).unwrap();
unsafe { ffi::vozduxan_stream_release(inner.ptr, token_c.as_ptr()) };
}
return Err("Отменено".into());
}
Token rotation (mutex release before C++ call)¶
The old token must be released after the new prepare succeeds. The implementation deliberately drops the mutex lock before calling vozduxan_stream_release, which blocks for ~100 ms:
// Take the old token and drop the mutex BEFORE calling into C++.
// vozduxan_stream_release joins the priority thread (~100 ms); holding
// current_token locked during that call would stall any concurrent
// torrent_release_stream for the full join duration.
let old_token = inner.current_token.lock().unwrap().take();
if let Some(ref old) = old_token {
let old_c = CString::new(old.as_str()).unwrap();
unsafe { ffi::vozduxan_stream_release(inner.ptr, old_c.as_ptr()) };
}
*inner.current_token.lock().unwrap() = Some(token.clone());
The sequence is: lock → take (sets to None) → unlock → call C++ → lock → store new token.
dispose() — release all¶
torrent_dispose_preview releases all three buckets. To avoid holding any mutex during the 100 ms C++ calls:
// Collect all tokens and drop ALL mutex guards before calling into C++
let tokens: Vec<(&'static str, String)> = {
let mut v = Vec::new();
if let Some(t) = inner.current_token.lock().unwrap().take() { v.push(("current", t)); }
if let Some(t) = inner.prefetch_token.lock().unwrap().take() { v.push(("prefetch", t)); }
if let Some(t) = inner.warm_prefetch_token.lock().unwrap().take() { v.push(("warm", t)); }
v
};
// All three locks are now released. Call C++ without any lock held.
for (_, token) in &tokens {
let c = CString::new(token.as_str()).unwrap();
unsafe { ffi::vozduxan_stream_release(inner.ptr, c.as_ptr()) };
}
Without this pattern, a dispose() with three tokens would hold all three mutexes for 300 ms combined, serializing every concurrent Tauri command that touches those buckets.
Progress callback¶
vozduxan_stream_prepare accepts an optional C progress callback. Rust allocates a ProgressCtx on the heap and passes a raw pointer:
let ctx = Box::new(ProgressCtx { app: app_clone });
let ctx_ptr = Box::into_raw(ctx) as *mut c_void;
let info = unsafe {
ffi::vozduxan_stream_prepare(
inner.ptr, magnet_c.as_ptr(), ...,
Some(on_progress),
ctx_ptr, // ← raw pointer to ProgressCtx
)
};
// Free the ProgressCtx — callback will not be called after prepare returns
let _ = unsafe { Box::from_raw(ctx_ptr as *mut ProgressCtx) };
The callback fires on the C++ thread and emits torrent-prepare-progress Tauri events to the frontend.
spawn_blocking discipline¶
All calls that invoke C++ blocking functions (vozduxan_stream_prepare, vozduxan_stream_release) are wrapped in tokio::task::spawn_blocking. This keeps the tokio async executor thread free while C++ does work (piece downloads, priority thread joins).
The inner: Arc<VozduxanSessionInner> is cloned before entering spawn_blocking so it can cross the thread boundary:
let inner = self.inner.clone();
let dlog_arc = self.debug_log.clone();
let url = tokio::task::spawn_blocking(move || -> Result<String, String> {
// ... call C++ with inner.ptr
})
.await
.map_err(|e| format!("spawn_blocking error: {e}"))??;
The double ? unwraps both the JoinError from spawn_blocking and the Result<String, String> returned by the closure.
Eviction counter¶
evict_counter is an AtomicU64 incremented on every stream release. Every 10th release triggers vozduxan_session_evict:
let n = inner.evict_counter.fetch_add(1, Ordering::Relaxed) + 1;
if n % 10 == 0 {
unsafe { ffi::vozduxan_session_evict(inner.ptr) };
}
This amortizes the cost of clearing the libtorrent session across releases rather than paying it on every single release.
Thread safety¶
VozduxanSessionInner is Send + Sync by an explicit unsafe impl. This is correct because:
ptris guarded by the invariant that all C++ calls are serialized through the C API's own internal lockingcurrent_token,prefetch_token,warm_prefetch_tokenareMutex<Option<String>>prepare_cancelled,prepare_version,evict_counterare atomics