Перейти к содержанию

Rust-обёртка и token buckets

vozduxan_stream.rs — это безопасный Rust-слой над сырым C API. Он владеет указателем на C++-session, управляет тремя одновременными слотами потоков (token buckets) и предоставляет обработчики Tauri-команд.

Источник: src-tauri/src/vozduxan_stream.rs


VozduxanSessionInner

Основная структура владеет сырым C-указателем и всем состоянием времени выполнения:

struct VozduxanSessionInner {
    ptr: *mut ffi::VozduxanSession,

    // Три token bucket — см. ниже
    current_token:      Mutex<Option<String>>,
    prefetch_token:     Mutex<Option<String>>,
    warm_prefetch_token: Mutex<Option<String>>,

    // Флаг отмены: установите в true, чтобы отбросить следующий результат prepare
    prepare_cancelled: AtomicBool,

    // Счётчик версий: увеличивается при каждом prepare(); устаревшие результаты сравниваются с ним
    prepare_version: AtomicU64,

    // Счётчик release: evict() вызывается каждый 10-й release
    evict_counter: AtomicU64,

    // Удерживает Arc живым, чтобы сырой указатель log_userdata оставался валидным
    _debug_log_arc: Arc<AppDebugLog>,
}

VozduxanStreamState — публичный дескриптор, который хранит система управляемого состояния Tauri:

pub struct VozduxanStreamState {
    inner: Arc<VozduxanSessionInner>,
    pub debug_log: Arc<AppDebugLog>,
}

Token buckets

neegde может одновременно держать открытыми до трёх слотов потоков:

Bucket Поле Назначение Вытесняет при новом prepare?
current_token активный поток воспроизведения Трек, который пользователь слышит в данный момент Да — старый token освобождается при завершении нового prepare
prefetch_token позиция в очереди +1 Следующий трек, прогревается тихо во время воспроизведения текущего Да — старый prefetch освобождается при запуске нового prefetch следующего трека
warm_prefetch_token позиция в очереди +2 Прогрев второго трека вперёд Да — старый warm освобождается при изменении второго трека вперёд

Warm bucket существует специально для защиты prefetch_token. Без него прогрев queue+2 вытеснял бы queue+1, что сводило бы на нет смысл prefetch.

Очередь:  [▶ СЕЙЧАС] [+1 prefetch] [+2 warm] [+3] ...
                ↑              ↑          ↑
         current_token  prefetch_token  warm_prefetch_token

Флаг warm_only: bool в prefetch_next() направляет вызов либо в prefetch_token, либо в warm_prefetch_token:

if warm_only {
    // Паркуется в warm_prefetch_token — никогда не вытесняет настоящий prefetch следующего трека
    let old = inner.warm_prefetch_token.lock().unwrap().take();
    // ... освобождаем старый, сохраняем новый
} else {
    // Паркуется в prefetch_token — настоящий слот «следующего трека»
    let old = inner.prefetch_token.lock().unwrap().take();
    // ... освобождаем старый, сохраняем новый
}

prepare() — активный поток

prepare() запускает новый активный поток и обрабатывает версионирование, отмену и ротацию token:

Версионирование

Каждый вызов prepare() увеличивает prepare_version и захватывает значение перед входом в spawn_blocking:

self.inner.prepare_version.fetch_add(1, Ordering::AcqRel);
let my_version = self.inner.prepare_version.load(Ordering::Acquire);

Когда блокирующий вызов возвращается, он проверяет, не был ли запущен другой prepare() за это время:

if inner.prepare_version.load(Ordering::Acquire) != my_version {
    // Победил более новый prepare() — освобождаем устаревший результат и возвращаем Err
    if info.error == ffi::VozduxanError::Ok {
        let token_c = CString::new(ffi::c_bytes_to_string(&info.token)).unwrap();
        unsafe { ffi::vozduxan_stream_release(inner.ptr, token_c.as_ptr()) };
    }
    return Err("Отменено".into());
}

Это обрабатывает гонку, когда пользователь кликает на другой трек во время медленной DHT-резолюции. Результат первого prepare отбрасывается, так и не затронув current_token.

Флаг отмены

torrent_prepare_cancel устанавливает prepare_cancelled = true. Следующий результат prepare проверяет этот флаг:

if inner.prepare_cancelled.load(Ordering::Relaxed) {
    // Prepare завершился, но вызывающий сдался — тихо освобождаем результат
    if info.error == ffi::VozduxanError::Ok {
        let token_c = CString::new(ffi::c_bytes_to_string(&info.token)).unwrap();
        unsafe { ffi::vozduxan_stream_release(inner.ptr, token_c.as_ptr()) };
    }
    return Err("Отменено".into());
}

Ротация token (снятие mutex до вызова C++)

Старый token должен быть освобождён после успешного завершения нового prepare. Реализация намеренно снимает блокировку mutex перед вызовом vozduxan_stream_release, который блокирует на ~100 мс:

// Берём старый token и снимаем mutex ДО вызова в C++.
// vozduxan_stream_release ожидает завершения priority-потока (~100 мс); удержание
// current_token заблокированным во время этого вызова остановило бы любой конкурентный
// torrent_release_stream на всё время ожидания.
let old_token = inner.current_token.lock().unwrap().take();
if let Some(ref old) = old_token {
    let old_c = CString::new(old.as_str()).unwrap();
    unsafe { ffi::vozduxan_stream_release(inner.ptr, old_c.as_ptr()) };
}
*inner.current_token.lock().unwrap() = Some(token.clone());

Последовательность: заблокировать → взять (устанавливает в None) → разблокировать → вызвать C++ → заблокировать → сохранить новый token.


dispose() — освобождение всех токенов

torrent_dispose_preview освобождает все три bucket. Чтобы не держать ни один mutex во время 100 мс C++-вызовов:

// Собираем все tokens и снимаем ВСЕ mutex guard'ы до вызова в C++
let tokens: Vec<(&'static str, String)> = {
    let mut v = Vec::new();
    if let Some(t) = inner.current_token.lock().unwrap().take()       { v.push(("current", t)); }
    if let Some(t) = inner.prefetch_token.lock().unwrap().take()      { v.push(("prefetch", t)); }
    if let Some(t) = inner.warm_prefetch_token.lock().unwrap().take() { v.push(("warm", t)); }
    v
};
// Все три блокировки теперь сняты. Вызываем C++ без удержания блокировок.
for (_, token) in &tokens {
    let c = CString::new(token.as_str()).unwrap();
    unsafe { ffi::vozduxan_stream_release(inner.ptr, c.as_ptr()) };
}

Без этого паттерна dispose() с тремя tokens удерживал бы все три mutex суммарно 300 мс, сериализуя каждую конкурентную Tauri-команду, обращающуюся к этим bucket.


Progress callback

vozduxan_stream_prepare принимает опциональный C progress callback. Rust выделяет ProgressCtx в куче и передаёт сырой указатель:

let ctx = Box::new(ProgressCtx { app: app_clone });
let ctx_ptr = Box::into_raw(ctx) as *mut c_void;

let info = unsafe {
    ffi::vozduxan_stream_prepare(
        inner.ptr, magnet_c.as_ptr(), ...,
        Some(on_progress),
        ctx_ptr,      // ← сырой указатель на ProgressCtx
    )
};

// Освобождаем ProgressCtx — callback больше не будет вызываться после возврата prepare
let _ = unsafe { Box::from_raw(ctx_ptr as *mut ProgressCtx) };

Callback срабатывает в C++-потоке и генерирует Tauri-event torrent-prepare-progress для фронтенда.


Дисциплина spawn_blocking

Все вызовы, обращающиеся к блокирующим C++-функциям (vozduxan_stream_prepare, vozduxan_stream_release), оборачиваются в tokio::task::spawn_blocking. Это освобождает поток tokio async executor, пока C++ выполняет работу (загрузка кусков, ожидание завершения priority-потока).

inner: Arc<VozduxanSessionInner> клонируется перед входом в spawn_blocking, чтобы он мог пересечь границу потока:

let inner = self.inner.clone();
let dlog_arc = self.debug_log.clone();

let url = tokio::task::spawn_blocking(move || -> Result<String, String> {
    // ... вызываем C++ через inner.ptr
})
.await
.map_err(|e| format!("spawn_blocking error: {e}"))??;

Двойной ? разворачивает и JoinError от spawn_blocking, и Result<String, String>, возвращаемый замыканием.


Счётчик вытеснений

evict_counter — это AtomicU64, увеличивающийся при каждом освобождении потока. Каждый 10-й release запускает vozduxan_session_evict:

let n = inner.evict_counter.fetch_add(1, Ordering::Relaxed) + 1;
if n % 10 == 0 {
    unsafe { ffi::vozduxan_session_evict(inner.ptr) };
}

Это размазывает стоимость очистки libtorrent-session по release'ам, вместо того чтобы платить её за каждый отдельный release.


Потокобезопасность

VozduxanSessionInner реализует Send + Sync через явный unsafe impl. Это корректно, поскольку:

  • ptr защищён инвариантом, что все C++-вызовы сериализуются через собственную внутреннюю блокировку C API
  • current_token, prefetch_token, warm_prefetch_token — это Mutex<Option<String>>
  • prepare_cancelled, prepare_version, evict_counter — атомики