Rust-обёртка и token buckets¶
vozduxan_stream.rs — это безопасный Rust-слой над сырым C API. Он владеет указателем на C++-session, управляет тремя одновременными слотами потоков (token buckets) и предоставляет обработчики Tauri-команд.
Источник: src-tauri/src/vozduxan_stream.rs
VozduxanSessionInner¶
Основная структура владеет сырым C-указателем и всем состоянием времени выполнения:
struct VozduxanSessionInner {
ptr: *mut ffi::VozduxanSession,
// Три token bucket — см. ниже
current_token: Mutex<Option<String>>,
prefetch_token: Mutex<Option<String>>,
warm_prefetch_token: Mutex<Option<String>>,
// Флаг отмены: установите в true, чтобы отбросить следующий результат prepare
prepare_cancelled: AtomicBool,
// Счётчик версий: увеличивается при каждом prepare(); устаревшие результаты сравниваются с ним
prepare_version: AtomicU64,
// Счётчик release: evict() вызывается каждый 10-й release
evict_counter: AtomicU64,
// Удерживает Arc живым, чтобы сырой указатель log_userdata оставался валидным
_debug_log_arc: Arc<AppDebugLog>,
}
VozduxanStreamState — публичный дескриптор, который хранит система управляемого состояния Tauri:
pub struct VozduxanStreamState {
inner: Arc<VozduxanSessionInner>,
pub debug_log: Arc<AppDebugLog>,
}
Token buckets¶
neegde может одновременно держать открытыми до трёх слотов потоков:
| Bucket | Поле | Назначение | Вытесняет при новом prepare? |
|---|---|---|---|
current_token |
активный поток воспроизведения | Трек, который пользователь слышит в данный момент | Да — старый token освобождается при завершении нового prepare |
prefetch_token |
позиция в очереди +1 | Следующий трек, прогревается тихо во время воспроизведения текущего | Да — старый prefetch освобождается при запуске нового prefetch следующего трека |
warm_prefetch_token |
позиция в очереди +2 | Прогрев второго трека вперёд | Да — старый warm освобождается при изменении второго трека вперёд |
Warm bucket существует специально для защиты prefetch_token. Без него прогрев queue+2 вытеснял бы queue+1, что сводило бы на нет смысл prefetch.
Очередь: [▶ СЕЙЧАС] [+1 prefetch] [+2 warm] [+3] ...
↑ ↑ ↑
current_token prefetch_token warm_prefetch_token
Флаг warm_only: bool в prefetch_next() направляет вызов либо в prefetch_token, либо в warm_prefetch_token:
if warm_only {
// Паркуется в warm_prefetch_token — никогда не вытесняет настоящий prefetch следующего трека
let old = inner.warm_prefetch_token.lock().unwrap().take();
// ... освобождаем старый, сохраняем новый
} else {
// Паркуется в prefetch_token — настоящий слот «следующего трека»
let old = inner.prefetch_token.lock().unwrap().take();
// ... освобождаем старый, сохраняем новый
}
prepare() — активный поток¶
prepare() запускает новый активный поток и обрабатывает версионирование, отмену и ротацию token:
Версионирование¶
Каждый вызов prepare() увеличивает prepare_version и захватывает значение перед входом в spawn_blocking:
self.inner.prepare_version.fetch_add(1, Ordering::AcqRel);
let my_version = self.inner.prepare_version.load(Ordering::Acquire);
Когда блокирующий вызов возвращается, он проверяет, не был ли запущен другой prepare() за это время:
if inner.prepare_version.load(Ordering::Acquire) != my_version {
// Победил более новый prepare() — освобождаем устаревший результат и возвращаем Err
if info.error == ffi::VozduxanError::Ok {
let token_c = CString::new(ffi::c_bytes_to_string(&info.token)).unwrap();
unsafe { ffi::vozduxan_stream_release(inner.ptr, token_c.as_ptr()) };
}
return Err("Отменено".into());
}
Это обрабатывает гонку, когда пользователь кликает на другой трек во время медленной DHT-резолюции. Результат первого prepare отбрасывается, так и не затронув current_token.
Флаг отмены¶
torrent_prepare_cancel устанавливает prepare_cancelled = true. Следующий результат prepare проверяет этот флаг:
if inner.prepare_cancelled.load(Ordering::Relaxed) {
// Prepare завершился, но вызывающий сдался — тихо освобождаем результат
if info.error == ffi::VozduxanError::Ok {
let token_c = CString::new(ffi::c_bytes_to_string(&info.token)).unwrap();
unsafe { ffi::vozduxan_stream_release(inner.ptr, token_c.as_ptr()) };
}
return Err("Отменено".into());
}
Ротация token (снятие mutex до вызова C++)¶
Старый token должен быть освобождён после успешного завершения нового prepare. Реализация намеренно снимает блокировку mutex перед вызовом vozduxan_stream_release, который блокирует на ~100 мс:
// Берём старый token и снимаем mutex ДО вызова в C++.
// vozduxan_stream_release ожидает завершения priority-потока (~100 мс); удержание
// current_token заблокированным во время этого вызова остановило бы любой конкурентный
// torrent_release_stream на всё время ожидания.
let old_token = inner.current_token.lock().unwrap().take();
if let Some(ref old) = old_token {
let old_c = CString::new(old.as_str()).unwrap();
unsafe { ffi::vozduxan_stream_release(inner.ptr, old_c.as_ptr()) };
}
*inner.current_token.lock().unwrap() = Some(token.clone());
Последовательность: заблокировать → взять (устанавливает в None) → разблокировать → вызвать C++ → заблокировать → сохранить новый token.
dispose() — освобождение всех токенов¶
torrent_dispose_preview освобождает все три bucket. Чтобы не держать ни один mutex во время 100 мс C++-вызовов:
// Собираем все tokens и снимаем ВСЕ mutex guard'ы до вызова в C++
let tokens: Vec<(&'static str, String)> = {
let mut v = Vec::new();
if let Some(t) = inner.current_token.lock().unwrap().take() { v.push(("current", t)); }
if let Some(t) = inner.prefetch_token.lock().unwrap().take() { v.push(("prefetch", t)); }
if let Some(t) = inner.warm_prefetch_token.lock().unwrap().take() { v.push(("warm", t)); }
v
};
// Все три блокировки теперь сняты. Вызываем C++ без удержания блокировок.
for (_, token) in &tokens {
let c = CString::new(token.as_str()).unwrap();
unsafe { ffi::vozduxan_stream_release(inner.ptr, c.as_ptr()) };
}
Без этого паттерна dispose() с тремя tokens удерживал бы все три mutex суммарно 300 мс, сериализуя каждую конкурентную Tauri-команду, обращающуюся к этим bucket.
Progress callback¶
vozduxan_stream_prepare принимает опциональный C progress callback. Rust выделяет ProgressCtx в куче и передаёт сырой указатель:
let ctx = Box::new(ProgressCtx { app: app_clone });
let ctx_ptr = Box::into_raw(ctx) as *mut c_void;
let info = unsafe {
ffi::vozduxan_stream_prepare(
inner.ptr, magnet_c.as_ptr(), ...,
Some(on_progress),
ctx_ptr, // ← сырой указатель на ProgressCtx
)
};
// Освобождаем ProgressCtx — callback больше не будет вызываться после возврата prepare
let _ = unsafe { Box::from_raw(ctx_ptr as *mut ProgressCtx) };
Callback срабатывает в C++-потоке и генерирует Tauri-event torrent-prepare-progress для фронтенда.
Дисциплина spawn_blocking¶
Все вызовы, обращающиеся к блокирующим C++-функциям (vozduxan_stream_prepare, vozduxan_stream_release), оборачиваются в tokio::task::spawn_blocking. Это освобождает поток tokio async executor, пока C++ выполняет работу (загрузка кусков, ожидание завершения priority-потока).
inner: Arc<VozduxanSessionInner> клонируется перед входом в spawn_blocking, чтобы он мог пересечь границу потока:
let inner = self.inner.clone();
let dlog_arc = self.debug_log.clone();
let url = tokio::task::spawn_blocking(move || -> Result<String, String> {
// ... вызываем C++ через inner.ptr
})
.await
.map_err(|e| format!("spawn_blocking error: {e}"))??;
Двойной ? разворачивает и JoinError от spawn_blocking, и Result<String, String>, возвращаемый замыканием.
Счётчик вытеснений¶
evict_counter — это AtomicU64, увеличивающийся при каждом освобождении потока. Каждый 10-й release запускает vozduxan_session_evict:
let n = inner.evict_counter.fetch_add(1, Ordering::Relaxed) + 1;
if n % 10 == 0 {
unsafe { ffi::vozduxan_session_evict(inner.ptr) };
}
Это размазывает стоимость очистки libtorrent-session по release'ам, вместо того чтобы платить её за каждый отдельный release.
Потокобезопасность¶
VozduxanSessionInner реализует Send + Sync через явный unsafe impl. Это корректно, поскольку:
ptrзащищён инвариантом, что все C++-вызовы сериализуются через собственную внутреннюю блокировку C APIcurrent_token,prefetch_token,warm_prefetch_token— этоMutex<Option<String>>prepare_cancelled,prepare_version,evict_counter— атомики