Перейти к содержанию

Pipeline Stages

Pipeline преобразует плоский массив PipelineEntity[] от всех provider'ов в итоговый отсортированный, дедуплицированный, отфильтрованный набор результатов. Он запускается внутри SearchSession._flushImmediate() на каждом кадре анимации при наличии новых данных.

Источник: src/search/pipeline/


Интерфейс pipeline

export type PipelineEntity = { id: string; score?: number; mergedFrom?: number };

export type PipelineStage<T extends PipelineEntity = PipelineEntity> =
  (entities: T[], opts?: Record<string, unknown>) => T[];

Каждый stage получает полный массив и возвращает (возможно новый) массив. Stage'и — чистые функции; они не должны изменять entity из других session'ов.

export function defaultPipeline(): PipelineStage[] {
  return [normalizeStage, dedupStage, scoreStage, filterStage];
}

export function runPipeline<T extends PipelineEntity>(
  entities: T[],
  stages: PipelineStage<T>[],
): T[] {
  let out = entities;
  for (const stage of stages) out = stage(out);
  return out;
}

Stage 1: normalize

export function normalizeStage<T>(entities: T[]): T[] {
  return entities; // pass-through
}

В настоящее время no-op. Stage существует как выделенное место для будущей работы: разбор исполнителя/альбома из имён файлов, когда provider их не предоставил, унификация названий форматов ("mp3" → "MP3"), удаление тегов группы релизов вроде "[FLAC 24/96]" из названий альбомов.

Provider'ы уже эмитируют entity в достаточно нормализованном виде для текущего набора функций.


Stage 2: dedup

export function dedupStage<T extends { id: string; mergedFrom?: number }>(
  entities: T[]
): T[] {
  const byId = new Map<string, T>();
  for (const e of entities) {
    const existing = byId.get(e.id);
    if (!existing) { byId.set(e.id, e); continue; }
    if ((e.mergedFrom ?? 0) > (existing.mergedFrom ?? 0)) byId.set(e.id, e);
  }
  return Array.from(byId.values());
}

Дедупликация осуществляется по entity.id. Идентификаторы имеют префикс provider'а (rt:album:…, rt:track:…, slsk:track:…), поэтому коллизий между provider'ами здесь быть не может — один и тот же трек из RuTracker и SoulSeek представлен двумя различными entity.

Внутри одного provider'а mergedFrom (устанавливается provider'ом как счётчик batch'ей на момент создания entity) используется как разграничитель: более новая запись snapshot для того же id побеждает.

Примечание: Кросс-провайдерная дедупликация (объединение одного и того же трека из RuTracker и SoulSeek в одну entity с обоими источниками) — запланированная функциональность, которая пока не реализована. В настоящее время два provider'а создают отдельные entity для одной и той же песни.


Stage 3: score

export function scoreStage<T extends { score?: number }>(entities: T[]): T[] {
  for (const e of entities) e.score = 0;
  return entities;
}

Заглушка. Всем score'ам присваивается значение 0, поэтому итоговый порядок совпадает с порядком вставки (который определяется provider'ами: результаты RuTracker появляются перед результатами SoulSeek, поскольку RuTracker первым в реестре и его snapshot идёт первым в итерации Map.values()).

Реальный алгоритм scoring'а разработан, но ещё не реализован. Он будет взвешен по:

  • Качеству текстового совпадения между каноническим запросом и полями artist/title entity
  • Формату и битрейту (FLAC > MP3 > lossy, более высокий битрейт > более низкий)
  • Доступности (log(seeders + 1) для RuTracker, peer rank для SoulSeek)
  • Бонусу за наличие в обоих источниках (одна песня присутствует у обоих provider'ов)

Stage 4: filter

export function filterStage<T>(entities: T[]): T[] {
  return entities; // pass-through
}

Заглушка. Существующие фильтры на уровне интерфейса (проверка воспроизводимости, filter очереди SoulSeek peer) по-прежнему находятся в Results.vue. Они будут перенесены сюда при переписывании интерфейса для прямого потребления вывода pipeline.


Форма entity

Entity, создаваемые provider'ами, имеют следующую минимальную форму:

{
  id: string,            // уникальный идентификатор с префиксом provider'а
  type: "track" | "album",
  title: string,
  artist: string | null,
  albumTitle: string | null,
  format: string | null, // "FLAC", "MP3" и т.д.
  bitrate: number | null,
  duration: number | null,
  size: number | null,
  score: number,         // 0 до реализации scoring'а
  mergedFrom: number,    // индекс batch'а на момент создания (используется dedup'ом)
  sources: Array<{
    kind: "rutracker" | "soulseek",
    refs: { ... },  // специфичные для provider'а ссылки для воспроизведения
    raw: { ... },   // исходные данные для отладки / обложки
  }>,
}

Массив sources — ключевая точка расширения. Когда будет реализована кросс-провайдерная дедупликация, одна entity будет содержать источники от обоих provider'ов, и плеер выберет лучший из них исходя из текущих сетевых условий.