feat: periodically sync jellyfin and emby collections

This commit is contained in:
Christian Benincasa
2026-04-09 12:53:12 -04:00
parent fcb3ce8b09
commit fcee08309f
9 changed files with 1713 additions and 660 deletions

View File

@@ -17,7 +17,11 @@ import { getHeapStatistics } from 'v8';
import z from 'zod/v4';
import { container } from '../container.ts';
import { TunarrWorkerPool } from '../services/TunarrWorkerPool.ts';
import { PlexCollectionScanner } from '../services/scanner/PlexCollectionScanner.ts';
import type { GenericExternalCollectionScanner } from '../services/scanner/ExternalCollectionScanner.ts';
import type { MediaSourceType } from '../db/schema/base.ts';
import { KEYS } from '../types/inject.ts';
import type { interfaces } from 'inversify';
import type { Maybe } from '../types/util.ts';
import { debugFfmpegApiRouter } from './debug/debugFfmpegApi.ts';
import { DebugJellyfinApiRouter } from './debug/debugJellyfinApi.js';
import { debugStreamApiRouter } from './debug/debugStreamApi.js';
@@ -448,18 +452,28 @@ export const debugApi: RouterPluginAsyncCallback = async (fastify) => {
return res
.status(404)
.send(`No media source with ID ${req.params.mediaSourceId}`);
} else if (mediaSource.type !== 'plex') {
return res
.status(400)
.send('Only Plex collection scanning is currently supported');
}
const scanRes = await container
.get<PlexCollectionScanner>(PlexCollectionScanner)
.scan({
mediaSourceId: mediaSource.uuid,
force: true,
});
const scannerFactory = container.get<
interfaces.SimpleFactory<
Maybe<GenericExternalCollectionScanner>,
[MediaSourceType]
>
>(KEYS.ExternalCollectionScannerFactory);
const scanner = scannerFactory(mediaSource.type);
if (!scanner) {
return res
.status(400)
.send(
`Collection scanning is not supported for media source type "${mediaSource.type}"`,
);
}
const scanRes = await scanner.scan({
mediaSourceId: mediaSource.uuid,
force: true,
});
return res.send(scanRes);
},

View File

@@ -11,10 +11,12 @@ import { getTunarrVersion } from '@/util/version.js';
import { seq } from '@tunarr/shared/util';
import type {
Actor,
Collection,
Director,
Folder,
Library,
MediaArtwork,
ProgramOrFolder,
Writer,
} from '@tunarr/types';
import type { MediaSourceStatus } from '@tunarr/types/api';
@@ -55,6 +57,7 @@ import { match, P } from 'ts-pattern';
import { v4 } from 'uuid';
import { z } from 'zod/v4';
import type { ArtworkType } from '../../db/schema/Artwork.ts';
import { MediaSourceType } from '../../db/schema/base.ts';
import type { ProgramType } from '../../db/schema/Program.ts';
import type { ProgramGroupingType } from '../../db/schema/ProgramGrouping.ts';
import type { Canonicalizer } from '../../services/Canonicalizer.ts';
@@ -855,6 +858,93 @@ export class EmbyApiClient extends MediaSourceApiClient<EmbyItemTypes> {
);
}
async *getAllLibraryCollections(
  libraryId: string,
  pageSize: number = 50,
): AsyncGenerator<Collection> {
  // Resolve the matching DB library row up front; emitted collections must
  // reference its UUID, so a missing row is a hard error.
  const dbLibrary = this.options.mediaSource.libraries.find(
    (lib) => lib.externalKey === libraryId,
  );
  if (!dbLibrary) {
    throw new Error(
      `Could not find matching library in DB for key = ${libraryId}`,
    );
  }
  // Walk the library's BoxSet items one page at a time until the server's
  // reported total record count is exhausted.
  let total = Number.MAX_SAFE_INTEGER;
  for (let offset = 0; offset < total; offset += pageSize) {
    const pageResult = await this.getRawItems(
      null,
      libraryId,
      ['BoxSet'],
      [],
      { offset, limit: pageSize },
    );
    if (pageResult.isFailure()) {
      throw pageResult.error;
    }
    const body = pageResult.get();
    total = body.TotalRecordCount;
    for (const rawItem of body.Items ?? []) {
      // Items without an id or name cannot form a valid Collection; skip them.
      if (!rawItem.Id || !rawItem.Name) {
        continue;
      }
      yield {
        type: 'collection',
        externalId: rawItem.Id,
        title: rawItem.Name,
        sourceType: MediaSourceType.Emby,
        childCount: rawItem.ChildCount ?? undefined,
        childType: undefined,
        uuid: v4(),
        mediaSourceId: this.options.mediaSource.uuid,
        libraryId: dbLibrary.uuid,
      } satisfies Collection;
    }
  }
}
async *getCollectionItems(
  collectionId: string,
  pageSize: number = 50,
): AsyncGenerator<ProgramOrFolder> {
  // Page through the collection's direct (non-recursive) children, yielding
  // each item that converts cleanly into an internal program/folder shape.
  let total = Number.MAX_SAFE_INTEGER;
  for (let offset = 0; offset < total; offset += pageSize) {
    const pageResult = await this.getRawItems(
      null,
      collectionId,
      null,
      [],
      { offset, limit: pageSize },
      { recursive: false },
    );
    if (pageResult.isFailure()) {
      throw pageResult.error;
    }
    const body = pageResult.get();
    total = body.TotalRecordCount;
    for (const rawItem of body.Items ?? []) {
      const converted = this.embyApiItemInjection(rawItem);
      // Items that fail conversion are silently dropped, matching the
      // library-scan behavior elsewhere in this client.
      if (!converted) {
        continue;
      }
      yield converted as ProgramOrFolder;
    }
  }
}
getMusicArtist(key: string): Promise<QueryResult<EmbyMusicArtist>> {
return this.getItemOfType(key, 'MusicArtist', (track) =>
this.embyApiMusicArtistInjection(track),

View File

@@ -12,11 +12,13 @@ import { getTunarrVersion } from '@/util/version.js';
import { seq } from '@tunarr/shared/util';
import type {
Actor,
Collection,
Director,
Folder,
Library,
MediaArtwork,
MediaChapter,
ProgramOrFolder,
Writer,
} from '@tunarr/types';
import type { MediaSourceStatus, PagedResult } from '@tunarr/types/api';
@@ -59,6 +61,7 @@ import type { NonEmptyArray } from 'ts-essentials';
import { match, P } from 'ts-pattern';
import { v4 } from 'uuid';
import type { ArtworkType } from '../../db/schema/Artwork.ts';
import { MediaSourceType } from '../../db/schema/base.ts';
import type { ProgramType } from '../../db/schema/Program.ts';
import type { ProgramGroupingType } from '../../db/schema/ProgramGrouping.ts';
import type { Canonicalizer } from '../../services/Canonicalizer.ts';
@@ -735,6 +738,91 @@ export class JellyfinApiClient extends MediaSourceApiClient<JellyfinItemTypes> {
);
}
async *getAllLibraryCollections(
  libraryId: string,
  pageSize: number = 50,
): AsyncGenerator<Collection> {
  // Resolve the matching DB library row up front; emitted collections must
  // reference its UUID, so a missing row is a hard error.
  const dbLibrary = this.options.mediaSource.libraries.find(
    (lib) => lib.externalKey === libraryId,
  );
  if (!dbLibrary) {
    throw new Error(
      `Could not find matching library in DB for key = ${libraryId}`,
    );
  }
  // Walk the library's BoxSet items one page at a time until the server's
  // reported total record count is exhausted.
  let total = Number.MAX_SAFE_INTEGER;
  for (let offset = 0; offset < total; offset += pageSize) {
    const pageResult = await this.getRawItems(
      libraryId,
      ['BoxSet'],
      [],
      { offset, limit: pageSize },
    );
    if (pageResult.isFailure()) {
      throw pageResult.error;
    }
    const body = pageResult.get();
    total = body.TotalRecordCount;
    for (const rawItem of body.Items ?? []) {
      // Items without an id or name cannot form a valid Collection; skip them.
      if (!rawItem.Id || !rawItem.Name) {
        continue;
      }
      yield {
        type: 'collection',
        externalId: rawItem.Id,
        title: rawItem.Name,
        sourceType: MediaSourceType.Jellyfin,
        childCount: rawItem.ChildCount ?? undefined,
        childType: undefined,
        uuid: v4(),
        mediaSourceId: this.options.mediaSource.uuid,
        libraryId: dbLibrary.uuid,
      } satisfies Collection;
    }
  }
}
async *getCollectionItems(
  collectionId: string,
  pageSize: number = 50,
): AsyncGenerator<ProgramOrFolder> {
  // Page through the collection's direct (non-recursive) children, yielding
  // each item that converts cleanly into an internal program/folder shape.
  let total = Number.MAX_SAFE_INTEGER;
  for (let offset = 0; offset < total; offset += pageSize) {
    const pageResult = await this.getRawItems(
      collectionId,
      null,
      [],
      { offset, limit: pageSize },
      { recursive: false },
    );
    if (pageResult.isFailure()) {
      throw pageResult.error;
    }
    const body = pageResult.get();
    total = body.TotalRecordCount;
    for (const rawItem of body.Items ?? []) {
      // NOTE(review): triple-l spelling matches the existing helper's name
      // elsewhere in this class — do not "fix" it here alone.
      const converted = this.jelllyfinApiItemInjection(rawItem);
      if (!converted) {
        continue;
      }
      yield converted as ProgramOrFolder;
    }
  }
}
private async *getChildContents<ItemTypeT extends JellyfinItemKind, OutType>(
parentId: string,
itemType: ItemTypeT,

View File

@@ -47,6 +47,8 @@ import type {
GenericMediaSourceScannerFactory,
} from './scanner/MediaSourceScanner.ts';
import type { GenericMediaSourceTvShowLibraryScanner } from './scanner/MediaSourceTvShowLibraryScanner.ts';
import { EmbyCollectionScanner } from './scanner/EmbyCollectionScanner.ts';
import { JellyfinCollectionScanner } from './scanner/JellyfinCollectionScanner.ts';
import { PlexCollectionScanner } from './scanner/PlexCollectionScanner.ts';
import { PlexMediaSourceMovieScanner } from './scanner/PlexMediaSourceMovieScanner.ts';
import { PlexMediaSourceMusicScanner } from './scanner/PlexMediaSourceMusicScanner.ts';
@@ -196,6 +198,15 @@ export const ServicesModule = new ContainerModule((bind) => {
bind<GenericExternalCollectionScanner>(KEYS.ExternalCollectionScanner)
.to(PlexCollectionScanner)
.whenTargetNamed(MediaSourceType.Plex);
bind<GenericExternalCollectionScanner>(KEYS.ExternalCollectionScanner)
.to(JellyfinCollectionScanner)
.whenTargetNamed(MediaSourceType.Jellyfin);
bind<GenericExternalCollectionScanner>(KEYS.ExternalCollectionScanner)
.to(EmbyCollectionScanner)
.whenTargetNamed(MediaSourceType.Emby);
bindFactoryFunc(
bind,
KEYS.ExternalCollectionScannerFactory,

View File

@@ -0,0 +1,69 @@
import type { Collection, ProgramOrFolder } from '@tunarr/types';
import { inject, injectable } from 'inversify';
import { ExternalCollectionRepo } from '../../db/ExternalCollectionRepo.ts';
import { IProgramDB } from '../../db/interfaces/IProgramDB.ts';
import { MediaSourceDB } from '../../db/mediaSourceDB.ts';
import type { MediaSourceWithRelations } from '../../db/schema/derivedTypes.ts';
import type { RemoteSourceType } from '../../db/schema/base.ts';
import { TagRepo } from '../../db/TagRepo.ts';
import { MediaSourceApiFactory } from '../../external/MediaSourceApiFactory.ts';
import type { EmbyApiClient } from '../../external/emby/EmbyApiClient.ts';
import { KEYS } from '../../types/inject.ts';
import { Logger } from '../../util/logging/LoggerFactory.ts';
import { MeilisearchService } from '../MeilisearchService.ts';
import { ExternalCollectionScanner } from './ExternalCollectionScanner.ts';
/**
 * Collection scanner specialization for Emby media sources.
 *
 * Supplies the Emby API client and delegates collection/item enumeration to
 * {@link EmbyApiClient}; all diffing/tagging logic lives in the base
 * `ExternalCollectionScanner`.
 */
@injectable()
export class EmbyCollectionScanner extends ExternalCollectionScanner<EmbyApiClient> {
// Discriminator persisted on collections/tags produced by this scanner.
get sourceType(): RemoteSourceType {
return 'emby';
}
// Constructor exists solely to apply @inject decorators; every dependency
// is forwarded unchanged to the base class. Parameter order must match the
// base constructor's.
constructor(
@inject(KEYS.Logger) logger: Logger,
@inject(MediaSourceDB) mediaSourceDB: MediaSourceDB,
@inject(MediaSourceApiFactory)
mediaSourceApiFactory: MediaSourceApiFactory,
@inject(ExternalCollectionRepo)
externalCollectionsRepo: ExternalCollectionRepo,
@inject(MeilisearchService)
searchService: MeilisearchService,
@inject(KEYS.ProgramDB)
programDB: IProgramDB,
@inject(TagRepo)
tagRepo: TagRepo,
) {
super(
logger,
mediaSourceDB,
mediaSourceApiFactory,
externalCollectionsRepo,
searchService,
programDB,
tagRepo,
);
}
// Builds (or reuses, depending on the factory) an Emby client bound to the
// given media source.
protected getApiClient(
mediaSource: MediaSourceWithRelations,
): Promise<EmbyApiClient> {
return this.mediaSourceApiFactory.getEmbyApiClientForMediaSource(
mediaSource,
);
}
// Streams every BoxSet collection in the given library.
getAllLibraryCollections(
apiClient: EmbyApiClient,
libraryExternalKey: string,
): AsyncIterable<Collection> {
return apiClient.getAllLibraryCollections(libraryExternalKey);
}
// Streams a collection's members. Emby lookups only need the collection id,
// so the library id parameter is intentionally unused here.
getCollectionItems(
apiClient: EmbyApiClient,
_libraryId: string,
collectionId: string,
): AsyncIterable<ProgramOrFolder> {
return apiClient.getCollectionItems(collectionId);
}
}

View File

@@ -0,0 +1,641 @@
import type { Collection, ProgramOrFolder } from '@tunarr/types';
import { v4 } from 'uuid';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import type { ExternalCollectionRepo } from '../../db/ExternalCollectionRepo.ts';
import type { IProgramDB } from '../../db/interfaces/IProgramDB.ts';
import type { MediaSourceDB } from '../../db/mediaSourceDB.ts';
import type { ExternalCollection } from '../../db/schema/ExternalCollection.ts';
import type { MediaSourceLibraryOrm } from '../../db/schema/MediaSourceLibrary.ts';
import type { MediaSourceOrm } from '../../db/schema/MediaSource.ts';
import type { MediaSourceWithRelations } from '../../db/schema/derivedTypes.ts';
import type { MediaSourceApiFactory } from '../../external/MediaSourceApiFactory.ts';
import type { TagRepo } from '../../db/TagRepo.ts';
import type { Logger } from '../../util/logging/LoggerFactory.ts';
import type {
MeilisearchService,
ProgramSearchDocument,
TerminalProgramSearchDocument,
} from '../MeilisearchService.ts';
import type { Tag } from '../../db/schema/Tag.ts';
import type { RemoteSourceType } from '../../db/schema/base.ts';
import {
ExternalCollectionScanner,
type ExternalCollectionLibraryScanRequest,
type ExternalCollectionScanRequest,
} from './ExternalCollectionScanner.ts';
import type { MediaSourceId } from '@tunarr/shared';
// --- Test double: concrete subclass of the abstract scanner ---
// The abstract hooks are replaced with vi.fn() spies so tests can both stub
// per-library/per-collection results and assert how the base class calls them.
// NOTE(review): tuple-style vi.fn generics (args tuple, return) match vitest
// pre-1.0 typings — confirm against the project's vitest version.
// NOTE(review): this subclass does not implement the base's abstract
// scanLibrary(); confirm the base class actually provides a concrete
// implementation (tests below call scanner.scanLibrary directly).
type MockApiClient = Record<string, never>;
class TestCollectionScanner extends ExternalCollectionScanner<MockApiClient> {
readonly sourceType: RemoteSourceType = 'jellyfin';
getApiClient = vi.fn<[MediaSourceWithRelations], Promise<MockApiClient>>(
() => Promise.resolve({}),
);
getAllLibraryCollections = vi.fn<
[MockApiClient, string],
AsyncIterable<Collection>
>();
getCollectionItems = vi.fn<
[MockApiClient, string, string],
AsyncIterable<ProgramOrFolder>
>();
}
// --- Factory helpers for test data ---
/** Produces a fresh UUID branded as a MediaSourceId for test fixtures. */
function makeMediaSourceId(): MediaSourceId {
  const id = v4();
  return id as MediaSourceId;
}
/**
 * Builds a baseline Jellyfin media source with the given libraries attached.
 * Individual tests override fields (e.g. uuid, type) via `overrides`, which
 * is spread last so it always wins.
 */
function makeMediaSource(
  libraries: MediaSourceLibraryOrm[] = [],
  overrides: Partial<MediaSourceOrm> = {},
): MediaSourceWithRelations {
  return {
    // Identity
    uuid: makeMediaSourceId(),
    name: 'Test Jellyfin' as never,
    type: 'jellyfin',
    // Connection details
    uri: 'http://localhost:8096',
    accessToken: 'token',
    username: null,
    userId: null,
    clientIdentifier: null,
    // Misc settings / bookkeeping
    index: 0,
    mediaType: null,
    sendChannelUpdates: false,
    sendGuideUpdates: false,
    createdAt: null,
    updatedAt: null,
    // Relations
    libraries,
    paths: [],
    replacePaths: [],
    ...overrides,
  };
}
/** Builds an enabled movies library belonging to the given media source. */
function makeLibrary(
  mediaSourceId: MediaSourceId,
  overrides: Partial<MediaSourceLibraryOrm> = {},
): MediaSourceLibraryOrm {
  return {
    uuid: v4(),
    mediaSourceId,
    name: 'Movies',
    mediaType: 'movies',
    externalKey: 'lib-ext-1',
    enabled: true,
    lastScannedAt: null,
    ...overrides,
  };
}
/** Builds an API-shaped Collection as returned by a scanner's discovery pass. */
function makeCollection(
  overrides: Partial<Collection> = {},
): Collection {
  return {
    type: 'collection',
    uuid: v4(),
    externalId: v4(),
    mediaSourceId: v4(),
    libraryId: v4(),
    sourceType: 'jellyfin',
    title: 'Test Collection',
    childType: undefined,
    ...overrides,
  };
}
/** Builds a persisted ExternalCollection row (the DB-side counterpart). */
function makeExternalCollection(
  overrides: Partial<ExternalCollection> = {},
): ExternalCollection {
  return {
    uuid: v4(),
    mediaSourceId: v4() as MediaSourceId,
    libraryId: v4(),
    externalKey: v4(),
    sourceType: 'jellyfin',
    title: 'Test Collection',
    ...overrides,
  };
}
/** Builds a Tag row carrying the given tag name. */
function makeTag(tagName: string): Tag {
  return { tag: tagName, uuid: v4() };
}
/** Builds a minimal movie (terminal program) stub keyed by external id. */
function makeMovieItem(externalId = v4()): ProgramOrFolder {
  const stub = { type: 'movie' as const, externalId };
  return stub as ProgramOrFolder;
}
/** Builds a minimal show (grouping) stub keyed by external id. */
function makeShowItem(externalId = v4()): ProgramOrFolder {
  const stub = { type: 'show' as const, externalId };
  return stub as ProgramOrFolder;
}
/** Wraps the given values in an async iterable, yielding them in order. */
async function* asyncOf<T>(...items: T[]): AsyncGenerator<T> {
  yield* items;
}
// --- Mock factory helpers ---
/**
 * Builds the full set of mocked collaborators the scanner needs.
 * Every method is a vi.fn() spy; the repo/search/program-DB lookups default
 * to "nothing found" so tests only stub what they exercise.
 */
function makeMocks() {
  return {
    // child() returns the same logger so scoped loggers share these spies.
    logger: {
      debug: vi.fn(),
      info: vi.fn(),
      warn: vi.fn(),
      error: vi.fn(),
      trace: vi.fn(),
      child: vi.fn().mockReturnThis(),
    } as unknown as Logger,
    mediaSourceDB: {
      getById: vi.fn(),
      getLibrary: vi.fn(),
    } as unknown as MediaSourceDB,
    // Never touched directly by the test subclass's getApiClient stub.
    mediaSourceApiFactory: {} as unknown as MediaSourceApiFactory,
    externalCollectionsRepo: {
      getByLibraryId: vi.fn().mockResolvedValue([]),
      insertCollection: vi.fn().mockResolvedValue(undefined),
      deleteCollection: vi.fn().mockResolvedValue(undefined),
      getById: vi.fn().mockResolvedValue(undefined),
      getCollectionProgramGroupings: vi.fn().mockResolvedValue([]),
      getCollectionPrograms: vi.fn().mockResolvedValue([]),
    } as unknown as ExternalCollectionRepo,
    searchService: {
      waitForPendingIndexTasks: vi.fn().mockResolvedValue(undefined),
      getPrograms: vi.fn().mockResolvedValue([]),
      updatePrograms: vi.fn().mockResolvedValue(undefined),
    } as unknown as MeilisearchService,
    programDB: {
      lookupByExternalIds: vi.fn().mockResolvedValue([]),
      getProgramGroupingsByExternalIds: vi.fn().mockResolvedValue([]),
      getChildren: vi.fn().mockResolvedValue({ results: [], total: 0 }),
      getProgramGroupingDescendants: vi.fn().mockResolvedValue([]),
    } as unknown as IProgramDB,
    tagRepo: {
      upsertTag: vi.fn(),
      tagPrograms: vi.fn().mockResolvedValue(undefined),
      tagProgramGroupings: vi.fn().mockResolvedValue(undefined),
      untagPrograms: vi.fn().mockResolvedValue(undefined),
      untagProgramGroupings: vi.fn().mockResolvedValue(undefined),
    } as unknown as TagRepo,
  };
}
// --- Tests ---
describe('ExternalCollectionScanner', () => {
let mocks: ReturnType<typeof makeMocks>;
let scanner: TestCollectionScanner;
// Fresh mocks and a fresh scanner per test so spy call counts never leak
// between cases.
beforeEach(() => {
mocks = makeMocks();
scanner = new TestCollectionScanner(
mocks.logger,
mocks.mediaSourceDB,
mocks.mediaSourceApiFactory,
mocks.externalCollectionsRepo,
mocks.searchService,
mocks.programDB,
mocks.tagRepo,
);
});
// Full-source scans: discovery, tagging, pruning, and search indexing.
describe('scan()', () => {
it('throws when the media source is not found', async () => {
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(undefined);
const mediaSourceId = makeMediaSourceId();
await expect(
scanner.scan({ mediaSourceId }),
).rejects.toThrow(`Could not find media source with ID ${mediaSourceId}`);
});
it('returns early when there are no enabled libraries', async () => {
const mediaSource = makeMediaSource([
makeLibrary(makeMediaSourceId(), { enabled: false }),
]);
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
await scanner.scan({ mediaSourceId: mediaSource.uuid });
expect(scanner.getAllLibraryCollections).not.toHaveBeenCalled();
expect(mocks.externalCollectionsRepo.getByLibraryId).not.toHaveBeenCalled();
});
it('scans only enabled libraries when some are disabled', async () => {
const msId = makeMediaSourceId();
const enabledLib = makeLibrary(msId, { uuid: 'enabled-lib', enabled: true });
const disabledLib = makeLibrary(msId, { uuid: 'disabled-lib', enabled: false });
const mediaSource = makeMediaSource([enabledLib, disabledLib], { uuid: msId });
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
scanner.getAllLibraryCollections.mockReturnValue(asyncOf());
await scanner.scan({ mediaSourceId: mediaSource.uuid });
expect(mocks.externalCollectionsRepo.getByLibraryId).toHaveBeenCalledOnce();
expect(mocks.externalCollectionsRepo.getByLibraryId).toHaveBeenCalledWith('enabled-lib');
});
it('inserts a new ExternalCollection and upserts a tag for each discovered collection', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId);
const mediaSource = makeMediaSource([library], { uuid: msId });
const collection = makeCollection({ libraryId: library.uuid, mediaSourceId: msId });
const tag = makeTag(collection.title);
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
scanner.getAllLibraryCollections.mockReturnValue(asyncOf(collection));
scanner.getCollectionItems.mockReturnValue(asyncOf());
vi.mocked(mocks.tagRepo.upsertTag).mockResolvedValue(tag);
await scanner.scan({ mediaSourceId: msId });
expect(mocks.tagRepo.upsertTag).toHaveBeenCalledWith(collection.title);
expect(mocks.externalCollectionsRepo.insertCollection).toHaveBeenCalledWith(
expect.objectContaining({
externalKey: collection.externalId,
libraryId: library.uuid,
mediaSourceId: msId,
sourceType: 'jellyfin',
title: collection.title,
}),
);
});
it('does not re-insert an ExternalCollection that already exists', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId);
const mediaSource = makeMediaSource([library], { uuid: msId });
const collection = makeCollection({ libraryId: library.uuid, mediaSourceId: msId });
// Pre-existing row matched by externalKey — the scanner should treat the
// discovered collection as already known.
const existingColl = makeExternalCollection({
externalKey: collection.externalId,
libraryId: library.uuid,
mediaSourceId: msId,
});
const tag = makeTag(collection.title);
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
vi.mocked(mocks.externalCollectionsRepo.getByLibraryId).mockResolvedValue([existingColl]);
scanner.getAllLibraryCollections.mockReturnValue(asyncOf(collection));
scanner.getCollectionItems.mockReturnValue(asyncOf());
vi.mocked(mocks.tagRepo.upsertTag).mockResolvedValue(tag);
await scanner.scan({ mediaSourceId: msId });
expect(mocks.externalCollectionsRepo.insertCollection).not.toHaveBeenCalled();
});
it('tags new program items (movies, episodes) added to a collection', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId);
const mediaSource = makeMediaSource([library], { uuid: msId });
const collection = makeCollection({ libraryId: library.uuid, mediaSourceId: msId });
const tag = makeTag(collection.title);
const movieExtId = 'movie-ext-1';
const programUuid = v4();
const movieItem = makeMovieItem(movieExtId);
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
scanner.getAllLibraryCollections.mockReturnValue(asyncOf(collection));
scanner.getCollectionItems.mockReturnValue(asyncOf(movieItem));
vi.mocked(mocks.tagRepo.upsertTag).mockResolvedValue(tag);
vi.mocked(mocks.programDB.lookupByExternalIds).mockResolvedValue([
{
uuid: programUuid,
type: 'movie',
externalIds: [{ sourceType: 'jellyfin', externalKey: movieExtId, externalSourceId: msId }],
} as never,
]);
await scanner.scan({ mediaSourceId: msId });
expect(mocks.tagRepo.tagPrograms).toHaveBeenCalledWith(
tag.uuid,
[programUuid],
);
});
it('tags new grouping items (shows, artists) added to a collection', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId);
const mediaSource = makeMediaSource([library], { uuid: msId });
const collection = makeCollection({ libraryId: library.uuid, mediaSourceId: msId });
const tag = makeTag(collection.title);
const showExtId = 'show-ext-1';
const groupingUuid = v4();
const showItem = makeShowItem(showExtId);
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
scanner.getAllLibraryCollections.mockReturnValue(asyncOf(collection));
scanner.getCollectionItems.mockReturnValue(asyncOf(showItem));
vi.mocked(mocks.tagRepo.upsertTag).mockResolvedValue(tag);
vi.mocked(mocks.programDB.getProgramGroupingsByExternalIds).mockResolvedValue([
{
uuid: groupingUuid,
type: 'show',
externalIds: [{ sourceType: 'jellyfin', externalKey: showExtId, externalSourceId: msId }],
} as never,
]);
await scanner.scan({ mediaSourceId: msId });
expect(mocks.tagRepo.tagProgramGroupings).toHaveBeenCalledWith(
tag.uuid,
[groupingUuid],
);
});
it('handles mixed-type collections (programs + groupings) from Jellyfin/Emby', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId);
const mediaSource = makeMediaSource([library], { uuid: msId });
const collection = makeCollection({ libraryId: library.uuid, mediaSourceId: msId });
const tag = makeTag(collection.title);
const movieExtId = 'movie-ext-1';
const showExtId = 'show-ext-1';
const programUuid = v4();
const groupingUuid = v4();
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
scanner.getAllLibraryCollections.mockReturnValue(asyncOf(collection));
// Collection contains both a movie (program) and a show (grouping)
scanner.getCollectionItems.mockReturnValue(
asyncOf(makeMovieItem(movieExtId), makeShowItem(showExtId)),
);
vi.mocked(mocks.tagRepo.upsertTag).mockResolvedValue(tag);
vi.mocked(mocks.programDB.lookupByExternalIds).mockResolvedValue([
{
uuid: programUuid,
type: 'movie',
externalIds: [{ sourceType: 'jellyfin', externalKey: movieExtId, externalSourceId: msId }],
} as never,
]);
vi.mocked(mocks.programDB.getProgramGroupingsByExternalIds).mockResolvedValue([
{
uuid: groupingUuid,
type: 'show',
externalIds: [{ sourceType: 'jellyfin', externalKey: showExtId, externalSourceId: msId }],
} as never,
]);
await scanner.scan({ mediaSourceId: msId });
expect(mocks.tagRepo.tagPrograms).toHaveBeenCalledWith(tag.uuid, [programUuid]);
expect(mocks.tagRepo.tagProgramGroupings).toHaveBeenCalledWith(tag.uuid, [groupingUuid]);
});
it('untags programs removed from an existing collection', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId);
const mediaSource = makeMediaSource([library], { uuid: msId });
const collection = makeCollection({ libraryId: library.uuid, mediaSourceId: msId });
const tag = makeTag(collection.title);
const existingColl = makeExternalCollection({
externalKey: collection.externalId,
libraryId: library.uuid,
mediaSourceId: msId,
});
const removedProgramUuid = v4();
const removedProgramExtKey = 'removed-movie-ext';
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
vi.mocked(mocks.externalCollectionsRepo.getByLibraryId).mockResolvedValue([existingColl]);
// Previously this program was in the collection
vi.mocked(mocks.externalCollectionsRepo.getCollectionPrograms).mockResolvedValue([
{
uuid: removedProgramUuid,
type: 'movie',
externalKey: removedProgramExtKey,
externalIds: [{ sourceType: 'jellyfin', externalKey: removedProgramExtKey }],
} as never,
]);
scanner.getAllLibraryCollections.mockReturnValue(asyncOf(collection));
// Collection is now empty — program was removed
scanner.getCollectionItems.mockReturnValue(asyncOf());
vi.mocked(mocks.tagRepo.upsertTag).mockResolvedValue(tag);
await scanner.scan({ mediaSourceId: msId });
expect(mocks.tagRepo.untagPrograms).toHaveBeenCalledWith(
tag.uuid,
[removedProgramUuid],
);
});
it('untags groupings removed from an existing collection', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId);
const mediaSource = makeMediaSource([library], { uuid: msId });
const collection = makeCollection({ libraryId: library.uuid, mediaSourceId: msId });
const tag = makeTag(collection.title);
const existingColl = makeExternalCollection({
externalKey: collection.externalId,
libraryId: library.uuid,
mediaSourceId: msId,
});
const removedGroupingUuid = v4();
const removedGroupingExtKey = 'removed-show-ext';
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
vi.mocked(mocks.externalCollectionsRepo.getByLibraryId).mockResolvedValue([existingColl]);
vi.mocked(mocks.externalCollectionsRepo.getCollectionProgramGroupings).mockResolvedValue([
{
uuid: removedGroupingUuid,
type: 'show',
externalKey: removedGroupingExtKey,
externalIds: [{ sourceType: 'jellyfin', externalKey: removedGroupingExtKey }],
} as never,
]);
scanner.getAllLibraryCollections.mockReturnValue(asyncOf(collection));
scanner.getCollectionItems.mockReturnValue(asyncOf());
vi.mocked(mocks.tagRepo.upsertTag).mockResolvedValue(tag);
await scanner.scan({ mediaSourceId: msId });
expect(mocks.tagRepo.untagProgramGroupings).toHaveBeenCalledWith(
tag.uuid,
[removedGroupingUuid],
);
});
it('deletes and untags items from collections that no longer exist in the source', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId);
const mediaSource = makeMediaSource([library], { uuid: msId });
const missingCollectionExtId = 'missing-coll-ext';
const missingCollUuid = v4();
const missingColl = makeExternalCollection({
uuid: missingCollUuid,
externalKey: missingCollectionExtId,
libraryId: library.uuid,
mediaSourceId: msId,
title: 'Gone Collection',
});
const taggedProgramUuid = v4();
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
vi.mocked(mocks.externalCollectionsRepo.getByLibraryId).mockResolvedValue([missingColl]);
// API returns no collections now
scanner.getAllLibraryCollections.mockReturnValue(asyncOf());
// getById for the missing collection returns it with its programs
vi.mocked(mocks.externalCollectionsRepo.getById).mockResolvedValue({
...missingColl,
programs: [{ program: { uuid: taggedProgramUuid, type: 'movie' } }],
groupings: [],
} as never);
await scanner.scan({ mediaSourceId: msId });
expect(mocks.externalCollectionsRepo.deleteCollection).toHaveBeenCalledWith(missingCollUuid);
});
it('catches errors from individual library scans and continues to the next library', async () => {
const msId = makeMediaSourceId();
const failingLib = makeLibrary(msId, { uuid: 'failing-lib', externalKey: 'fail' });
const successLib = makeLibrary(msId, { uuid: 'success-lib', externalKey: 'success' });
const mediaSource = makeMediaSource([failingLib, successLib], { uuid: msId });
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
// First library throws; second returns nothing
scanner.getAllLibraryCollections.mockImplementation((_, libraryKey) => {
if (libraryKey === 'fail') {
return (async function* () {
throw new Error('Scan failed');
// eslint-disable-next-line no-unreachable
yield makeCollection();
})();
}
return asyncOf();
});
// Should not throw — error is caught per-library
await expect(scanner.scan({ mediaSourceId: msId })).resolves.not.toThrow();
expect(mocks.logger.warn).toHaveBeenCalled();
});
it('sends search index updates for newly tagged programs', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId);
const mediaSource = makeMediaSource([library], { uuid: msId });
const collection = makeCollection({ libraryId: library.uuid, mediaSourceId: msId });
const tag = makeTag(collection.title);
const movieExtId = 'movie-ext-1';
const programUuid = v4();
vi.mocked(mocks.mediaSourceDB.getById).mockResolvedValue(mediaSource);
scanner.getAllLibraryCollections.mockReturnValue(asyncOf(collection));
scanner.getCollectionItems.mockReturnValue(asyncOf(makeMovieItem(movieExtId)));
vi.mocked(mocks.tagRepo.upsertTag).mockResolvedValue(tag);
vi.mocked(mocks.programDB.lookupByExternalIds).mockResolvedValue([
{
uuid: programUuid,
type: 'movie',
externalIds: [{ sourceType: 'jellyfin', externalKey: movieExtId, externalSourceId: msId }],
} as never,
]);
// Return a search doc for the program
const existingDoc: ProgramSearchDocument = {
id: programUuid,
tags: ['ExistingTag'],
type: 'movie',
duration: 5400000,
title: 'A Movie',
} as TerminalProgramSearchDocument;
vi.mocked(mocks.searchService.getPrograms).mockResolvedValue([existingDoc]);
await scanner.scan({ mediaSourceId: msId });
expect(mocks.searchService.updatePrograms).toHaveBeenCalledWith(
expect.arrayContaining([
expect.objectContaining({
id: programUuid,
// New tag is added alongside existing tags
tags: expect.arrayContaining([collection.title, 'ExistingTag']),
}),
]),
);
});
});
// Per-library entry point: its own lookup, enablement, and type guards.
describe('scanLibrary()', () => {
it('throws when the library is not found', async () => {
vi.mocked(mocks.mediaSourceDB.getLibrary).mockResolvedValue(undefined);
await expect(
scanner.scanLibrary({ libraryId: 'nonexistent-lib' }),
).rejects.toThrow('Could not find media source library with ID nonexistent-lib');
});
it('returns early when the library is disabled', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId, { enabled: false });
vi.mocked(mocks.mediaSourceDB.getLibrary).mockResolvedValue({
...library,
mediaSource: makeMediaSource([], { uuid: msId }),
} as never);
await scanner.scanLibrary({ libraryId: library.uuid });
expect(scanner.getAllLibraryCollections).not.toHaveBeenCalled();
});
it('throws when the library belongs to a different source type', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId, { enabled: true });
vi.mocked(mocks.mediaSourceDB.getLibrary).mockResolvedValue({
...library,
mediaSource: makeMediaSource([], { uuid: msId, type: 'plex' }),
} as never);
await expect(
scanner.scanLibrary({ libraryId: library.uuid }),
).rejects.toThrow(/non-jellyfin media source/);
});
it('scans successfully when library is enabled and type matches', async () => {
const msId = makeMediaSourceId();
const library = makeLibrary(msId, { enabled: true });
vi.mocked(mocks.mediaSourceDB.getLibrary).mockResolvedValue({
...library,
mediaSource: makeMediaSource([], { uuid: msId, type: 'jellyfin' }),
} as never);
scanner.getAllLibraryCollections.mockReturnValue(asyncOf());
await expect(
scanner.scanLibrary({ libraryId: library.uuid }),
).resolves.not.toThrow();
expect(scanner.getAllLibraryCollections).toHaveBeenCalledOnce();
});
});
});

View File

@@ -1,4 +1,45 @@
import type { MediaSourceId } from '@tunarr/shared';
import { isNonEmptyString, seq } from '@tunarr/shared/util';
import {
type Collection,
isGroupingItemType,
type ProgramLike,
type ProgramOrFolder,
} from '@tunarr/types';
import {
chunk,
compact,
difference,
differenceWith,
groupBy,
isEmpty,
isUndefined,
partition,
uniq,
} from 'lodash-es';
import { v4 } from 'uuid';
import { ExternalCollectionRepo } from '../../db/ExternalCollectionRepo.ts';
import { IProgramDB } from '../../db/interfaces/IProgramDB.ts';
import { MediaSourceDB } from '../../db/mediaSourceDB.ts';
import type {
ProgramGroupingOrmWithRelations,
ProgramWithRelationsOrm,
} from '../../db/schema/derivedTypes.ts';
import { ExternalCollection } from '../../db/schema/ExternalCollection.ts';
import type { MediaSourceOrm } from '../../db/schema/MediaSource.ts';
import type { MediaSourceLibraryOrm } from '../../db/schema/MediaSourceLibrary.ts';
import { ProgramGroupingType } from '../../db/schema/ProgramGrouping.ts';
import type { RemoteSourceType } from '../../db/schema/base.ts';
import { Tag } from '../../db/schema/Tag.ts';
import { TagRepo } from '../../db/TagRepo.ts';
import { MediaSourceApiFactory } from '../../external/MediaSourceApiFactory.ts';
import { Result } from '../../types/result.ts';
import { groupByTyped, groupByUniq } from '../../util/index.ts';
import { Logger } from '../../util/logging/LoggerFactory.ts';
import {
MeilisearchService,
ProgramIndexPartialUpdate,
} from '../MeilisearchService.ts';
import { BaseMediaSourceScanner } from './MediaSourceScanner.ts';
export type ExternalCollectionScanRequest = {
@@ -11,12 +52,654 @@ export type ExternalCollectionLibraryScanRequest = {
force?: boolean;
};
// Bundle of per-library scan state threaded through the scanner's private
// methods: the owning media source, the library being scanned, and an API
// client already authenticated against that source.
type Context<ApiClientT> = {
  mediaSource: MediaSourceOrm;
  library: MediaSourceLibraryOrm;
  apiClient: ApiClientT;
};
// A deferred search-index mutation. 'direct' updates the tags field of the
// item's own document; 'parent'/'grandparent' update the nested parent or
// grandparent sub-document of the item identified by `id`. All updates are
// collected during a scan and applied in one batched pass to avoid
// read-modify-write races against the async index queue.
type SearchUpdate = {
  type: 'direct' | 'parent' | 'grandparent';
  id: string;
  collectionName: string;
  opType: 'add' | 'del';
};
/**
 * Base class for media-source collection scanners (Plex, Jellyfin, Emby).
 * Subclasses supply the source type, an API client, and iterators over the
 * source's collections and collection items; this class owns the diff/sync
 * logic against the local DB, tag store, and search index.
 */
export abstract class ExternalCollectionScanner<
  ApiClientT,
> extends BaseMediaSourceScanner<ApiClientT, ExternalCollectionScanRequest> {
  /** Scan a single library by its internal ID (see concrete impl below). */
  abstract scanLibrary(
    req: ExternalCollectionLibraryScanRequest,
  ): Promise<void>;
  /** The remote source type this scanner handles (e.g. 'plex', 'jellyfin'). */
  abstract get sourceType(): RemoteSourceType;
  /** Stream all collections in the library identified by its external key. */
  abstract getAllLibraryCollections(
    apiClient: ApiClientT,
    libraryExternalKey: string,
  ): AsyncIterable<Collection>;
  /** Stream the members (programs or folders/groupings) of one collection. */
  abstract getCollectionItems(
    apiClient: ApiClientT,
    libraryId: string,
    collectionId: string,
  ): AsyncIterable<ProgramOrFolder>;
  // Dependencies are stored as protected parameter properties so subclasses
  // (e.g. their getApiClient implementations) can reach them.
  constructor(
    protected logger: Logger,
    protected mediaSourceDB: MediaSourceDB,
    protected mediaSourceApiFactory: MediaSourceApiFactory,
    protected externalCollectionsRepo: ExternalCollectionRepo,
    protected searchService: MeilisearchService,
    protected programDB: IProgramDB,
    protected tagRepo: TagRepo,
  ) {
    super();
  }
  /**
   * Scans every enabled library of one media source for collections.
   * Throws if the media source does not exist; per-library failures are
   * caught via Result and logged so one bad library doesn't abort the rest.
   */
  async scan(req: ExternalCollectionScanRequest): Promise<void> {
    const mediaSource = await this.mediaSourceDB.getById(req.mediaSourceId);
    if (!mediaSource) {
      throw new Error(
        `Could not find media source with ID ${req.mediaSourceId}`,
      );
    }
    this.logger.debug(
      'Scanning %s media source (ID = %s) for collections',
      this.sourceType,
      mediaSource.uuid,
    );
    // Only enabled libraries participate; bail early if there are none.
    const enabledLibraries = mediaSource.libraries.filter((lib) => lib.enabled);
    if (enabledLibraries.length === 0) {
      this.logger.debug(
        '%s media source (ID = %s) has no enabled libraries. Skipping collections scan',
        this.sourceType,
        mediaSource.uuid,
      );
      return;
    }
    // One client for the whole source; libraries are scanned sequentially.
    const apiClient = await this.getApiClient(mediaSource);
    for (const library of enabledLibraries) {
      const result = await Result.attemptAsync(() =>
        this.scanLibraryInternal({
          mediaSource,
          library,
          apiClient,
        }),
      );
      if (result.isFailure()) {
        this.logger.warn(
          result.error,
          'Failure while scanning %s library (%s) for collections',
          this.sourceType,
          library.name,
        );
      }
    }
  }
  /**
   * Scans a single library by internal ID. No-ops (with a debug log) when
   * the library is disabled; throws when the library does not exist or
   * belongs to a media source of a different type than this scanner.
   */
  async scanLibrary(req: ExternalCollectionLibraryScanRequest): Promise<void> {
    const library = await this.mediaSourceDB.getLibrary(req.libraryId);
    if (!library) {
      throw new Error(
        `Could not find media source library with ID ${req.libraryId}`,
      );
    }
    this.logger.debug(
      'Scanning %s collections for library %s',
      this.sourceType,
      library.name,
    );
    if (!library.enabled) {
      this.logger.debug(
        'Skipping %s collection scan for library %s because it is disabled',
        this.sourceType,
        req.libraryId,
      );
      return;
    }
    // Guard against e.g. a Jellyfin scanner being pointed at a Plex library.
    if (library.mediaSource.type !== this.sourceType) {
      throw new Error(
        `Tried to scan library ID ${req.libraryId} but it belongs to a non-${this.sourceType} media source (actual type = ${library.mediaSource.type})`,
      );
    }
    // Build a MediaSourceWithRelations shape containing just this library
    // so subclasses can construct an API client from it.
    const apiClient = await this.getApiClient({
      ...library.mediaSource,
      libraries: [library],
    });
    await this.scanLibraryInternal({
      apiClient,
      library,
      mediaSource: library.mediaSource,
    });
  }
private async scanLibraryInternal(ctx: Context<ApiClientT>) {
await this.searchService.waitForPendingIndexTasks();
this.logger.debug(
'Scanning %s library "%s" for collections',
this.sourceType,
ctx.library.name,
);
const existingCollections = groupByUniq(
await this.externalCollectionsRepo.getByLibraryId(ctx.library.uuid),
(coll) => coll.externalKey,
);
const existingCollectionExternalIds = new Set(
Object.keys(existingCollections),
);
const seenIds = new Set<string>();
const searchUpdates: SearchUpdate[] = [];
for await (const collection of this.getAllLibraryCollections(
ctx.apiClient,
ctx.library.externalKey,
)) {
seenIds.add(collection.externalId);
// Upsert a tag for this collection name
const tag = await this.tagRepo.upsertTag(collection.title);
// Check if collection is new
let existingCollection = existingCollections[collection.externalId];
if (!existingCollection) {
existingCollection = {
uuid: v4(),
externalKey: collection.externalId,
libraryId: ctx.library.uuid,
mediaSourceId: ctx.mediaSource.uuid,
sourceType: this.sourceType,
title: collection.title,
};
await this.externalCollectionsRepo.insertCollection(existingCollection);
}
searchUpdates.push(
...(await this.enumerateCollection(
existingCollection,
collection,
tag,
ctx,
)),
);
}
const missingIds = existingCollectionExternalIds.difference(seenIds);
const missingCollections = seq.collect(
[...missingIds.values()],
(id) => existingCollections[id],
);
for (const missingCollection of missingCollections) {
const collection = await this.externalCollectionsRepo.getById(
missingCollection.uuid,
);
if (!collection) {
continue;
}
const relatedGroupings = seq.collect(collection.groupings, (g) =>
g.grouping?.uuid ? ([g.grouping.uuid, g.grouping.type] as const) : null,
);
const allRelatedIds = relatedGroupings
.map(([id]) => id)
.concat(seq.collect(collection.programs, (p) => p.program?.uuid));
const relatedDescendants = await this.getAllDescendents(
relatedGroupings.map(([id, type]) => ({ uuid: id, type })),
);
for (const id of allRelatedIds) {
searchUpdates.push({
type: 'direct',
collectionName: collection.title,
id,
opType: 'del',
});
}
for (const [id, type] of relatedDescendants) {
searchUpdates.push({
type: isGroupingItemType(type) ? 'parent' : 'grandparent',
collectionName: collection.title,
id,
opType: 'del',
});
}
await this.externalCollectionsRepo.deleteCollection(collection.uuid);
}
// Search indexes must be updated once to avoid the select + update flow
// given that index requests are async, this avoids the race:
// 1. Doc with tags: [A, B, C]
// 2. Add collection D -> send index request with tags [A, B, C, D]
// 3. Index request is queued, next collection, E, processed
// 4. Select document, get tags [A, B, C]. Send index request for [A, B, C, E]
// 5. Request from 2 processed, then clobbered by request from 4.
// Now we have to collect all search updates and apply them at once to avoid the race.
// This involves getting all of the documents current tags
const updatesById = groupBy(searchUpdates, (update) => update.id);
for (const updateChunk of chunk(Object.entries(updatesById), 100)) {
const ids = updateChunk.map(([id]) => id);
const searchDocs = await this.searchService.getPrograms(ids);
// Apply the updates to each doc
const partialPrograms: ProgramIndexPartialUpdate[] = [];
for (const doc of searchDocs) {
const updates = updatesById[doc.id];
if (!updates) continue;
const currentTags = new Set(doc.tags);
const updatesByType = groupByTyped(updates, (up) => up.type);
const [directAdds, directDels] = partition(
updatesByType.direct,
(up) => up.opType === 'add',
);
const newTags = currentTags
.union(
new Set(directAdds.map(({ collectionName }) => collectionName)),
)
.difference(
new Set(directDels.map(({ collectionName }) => collectionName)),
);
const partialUpdate: ProgramIndexPartialUpdate = {
id: doc.id,
tags: isEmpty(updatesByType.direct)
? undefined
: [...newTags.values()],
};
if (doc.parent && !isEmpty(updatesByType.parent)) {
const currentParentTags = new Set(doc.parent.tags);
const [parentAdds, parentDels] = partition(
updatesByType.parent,
(up) => up.opType === 'add',
);
const newParentTags = currentParentTags
.union(
new Set(parentAdds.map(({ collectionName }) => collectionName)),
)
.difference(
new Set(parentDels.map(({ collectionName }) => collectionName)),
);
partialUpdate.parent = {
...doc.parent,
tags: [...newParentTags.values()],
};
}
if (doc.grandparent && !isEmpty(updatesByType.grandparent)) {
const currentGrandparentTags = new Set(doc.grandparent.tags);
const [adds, dels] = partition(
updatesByType.grandparent,
(up) => up.opType === 'add',
);
const newGrandparentTags = currentGrandparentTags
.union(new Set(adds.map(({ collectionName }) => collectionName)))
.difference(
new Set(dels.map(({ collectionName }) => collectionName)),
);
partialUpdate.grandparent = {
...doc.grandparent,
tags: [...newGrandparentTags.values()],
};
}
if (
!isUndefined(partialUpdate.tags) ||
!isUndefined(partialUpdate.parent?.tags) ||
!isUndefined(partialUpdate.grandparent?.tags)
) {
partialPrograms.push(partialUpdate);
}
}
this.logger.debug(
'Sending %d %s collection tag updates to search server',
partialPrograms.length,
this.sourceType,
);
await this.searchService.updatePrograms(partialPrograms);
}
}
  /**
   * Diffs one remote collection's membership against the locally stored
   * associations: tags newly added members (and expands grouping
   * hierarchies so child/grandchild documents get the tag too), untags
   * removed members, and returns the accumulated search-index updates for
   * the caller to apply in one batch.
   */
  private async enumerateCollection(
    collectionDao: ExternalCollection,
    collection: Collection,
    tag: Tag,
    context: Context<ApiClientT>,
  ): Promise<SearchUpdate[]> {
    this.logger.debug('Scanning collection "%s"', collection.title);
    // NOTE(review): the internal library UUID is passed as `libraryId` here
    // (not the external key) — confirm every subclass implementation expects
    // that; the Jellyfin implementation ignores this argument entirely.
    const it = this.getCollectionItems(
      context.apiClient,
      context.library.uuid,
      collection.externalId,
    );
    // Fetch existing associations for both groupings and programs
    const [existingGroupingsByExternalId, existingProgramsByExternalId] =
      await Promise.all([
        this.externalCollectionsRepo
          .getCollectionProgramGroupings(collectionDao.uuid)
          .then((groupings) =>
            groupBy(
              compact(groupings).filter((g) => isNonEmptyString(g.externalKey)),
              (g) => g.externalKey!,
            ),
          ),
        this.externalCollectionsRepo
          .getCollectionPrograms(collectionDao.uuid)
          .then((programs) =>
            groupBy(
              compact(programs).filter((p) => isNonEmptyString(p.externalKey)),
              (p) => p.externalKey,
            ),
          ),
      ]);
    // Partition items by type: groupings (shows/seasons/artists/albums) vs
    // terminal programs (movies/episodes/tracks/etc.). JF/Emby BoxSets can be
    // mixed, so we partition rather than relying on a collection-level childType.
    const seenGroupingIds = new Set<string>();
    const seenProgramIds = new Set<string>();
    for await (const item of it) {
      if (isGroupingItemType(item.type)) {
        seenGroupingIds.add(item.externalId);
      } else {
        seenProgramIds.add(item.externalId);
      }
    }
    // Members seen remotely that we have no stored association for yet.
    const newGroupingKeys = seenGroupingIds.difference(
      new Set(Object.keys(existingGroupingsByExternalId)),
    );
    const newProgramKeys = seenProgramIds.difference(
      new Set(Object.keys(existingProgramsByExternalId)),
    );
    this.logger.debug(
      'Adding %d grouping and %d program tag associations for collection "%s"',
      newGroupingKeys.size,
      newProgramKeys.size,
      collection.title,
    );
    const searchUpdates: SearchUpdate[] = [];
    // Process new groupings
    for (const newKeyChunk of chunk([...newGroupingKeys], 100)) {
      const groupings = await this.getProgramGroupingsAndSearchDocuments(
        context,
        newKeyChunk,
      );
      searchUpdates.push(
        ...groupings.daos.map(
          ({ uuid }) =>
            ({
              id: uuid,
              type: 'direct',
              collectionName: collection.title,
              opType: 'add',
            }) satisfies SearchUpdate,
        ),
      );
      await this.tagRepo.tagProgramGroupings(
        tag.uuid,
        groupings.daos.map((dao) => dao.uuid),
      );
      // Expand the grouping hierarchy so nested documents (seasons/albums as
      // 'parent', episodes/tracks as 'grandparent') are updated too.
      const allDescendentIds = await this.getAllDescendents(groupings.daos);
      for (const [typ, id] of allDescendentIds) {
        if (typ === 'album' || typ === 'season') {
          searchUpdates.push({
            type: 'parent',
            id,
            collectionName: collection.title,
            opType: 'add',
          });
        } else if (typ === 'episode' || typ === 'track') {
          searchUpdates.push({
            type: 'grandparent',
            id,
            collectionName: collection.title,
            opType: 'add',
          });
        }
      }
    }
    // Process new programs
    for (const newKeyChunk of chunk([...newProgramKeys], 100)) {
      const programs = await this.getProgramsAndSearchDocuments(
        context,
        newKeyChunk,
      );
      searchUpdates.push(
        ...programs.daos.map(
          ({ uuid }) =>
            ({
              type: 'direct',
              id: uuid,
              collectionName: collection.title,
              opType: 'add',
            }) satisfies SearchUpdate,
        ),
      );
      await this.tagRepo.tagPrograms(
        tag.uuid,
        programs.daos.map((dao) => dao.uuid),
      );
    }
    // Process removed groupings
    const removedGroupingKeys = new Set(
      Object.keys(existingGroupingsByExternalId),
    ).difference(seenGroupingIds);
    const groupingIds = uniq(
      [...removedGroupingKeys].flatMap((key) =>
        (existingGroupingsByExternalId[key] ?? []).map(
          (x) => [x.uuid, x.type] as const,
        ),
      ),
    );
    await this.tagRepo.untagProgramGroupings(tag.uuid, [
      ...groupingIds.map(([id]) => id),
    ]);
    searchUpdates.push(
      ...groupingIds.map(
        ([id]) =>
          ({
            id,
            type: 'direct',
            opType: 'del',
            collectionName: collection.title,
          }) satisfies SearchUpdate,
      ),
    );
    // Removed groupings also need their descendants' nested docs untagged.
    const groupingDescendantIds = await this.getAllDescendents(
      groupingIds.map(([id, type]) => ({ uuid: id, type })),
    );
    for (const [typ, id] of groupingDescendantIds) {
      searchUpdates.push({
        collectionName: collection.title,
        id,
        type: isGroupingItemType(typ) ? 'parent' : 'grandparent',
        opType: 'del',
      });
    }
    // Process removed programs
    const removedProgramKeys = new Set(
      Object.keys(existingProgramsByExternalId),
    ).difference(seenProgramIds);
    const programIds = uniq(
      [...removedProgramKeys].flatMap((key) =>
        (existingProgramsByExternalId[key] ?? []).map((x) => x.uuid),
      ),
    );
    await this.tagRepo.untagPrograms(tag.uuid, programIds);
    searchUpdates.push(
      ...programIds.map(
        (id) =>
          ({
            id,
            type: 'direct',
            opType: 'del',
            collectionName: collection.title,
          }) satisfies SearchUpdate,
      ),
    );
    this.logger.debug(
      'Removing %d grouping and %d program tag associations for collection "%s"',
      removedGroupingKeys.size,
      removedProgramKeys.size,
      collection.title,
    );
    return searchUpdates;
  }
private async getAllDescendents(
groupings: { uuid: string; type: ProgramGroupingType }[],
): Promise<Array<readonly [ProgramLike['type'], string]>> {
return uniq(
(
await Promise.all(
groupings.map(({ type, uuid }) => {
return Promise.all([
this.programDB
.getChildren(uuid, type)
.then((_) =>
_.results.map(
(
p:
| ProgramGroupingOrmWithRelations
| ProgramWithRelationsOrm,
) => [p.type, p.uuid] as const,
),
),
this.programDB
.getProgramGroupingDescendants(uuid, type)
.then((_) => _.map((p) => [p.type, p.uuid] as const)),
]).then((_) => uniq(_.flat()));
}),
)
).flat(),
);
}
  /**
   * Resolves local program DAOs and their search-index documents for a set
   * of source-native external keys. Keys that cannot be resolved to a local
   * program, and programs missing from the search index, are logged but not
   * treated as errors.
   */
  private async getProgramsAndSearchDocuments(
    ctx: Context<ApiClientT>,
    externalKeys: string[],
  ) {
    // (sourceType, mediaSourceId, externalKey) triples for DB lookup.
    const externalIds = new Set(
      externalKeys.map(
        (id) => [this.sourceType, ctx.mediaSource.uuid, id] as const,
      ),
    );
    const programs = await this.programDB.lookupByExternalIds(externalIds);
    // Which requested keys actually resolved, to report the remainder.
    const foundIds = programs
      .flatMap((program) => program.externalIds)
      .filter((eid) => eid.sourceType === this.sourceType)
      .map((eid) => eid.externalKey);
    const missingIds = difference(externalKeys, foundIds);
    if (missingIds.length > 0) {
      this.logger.warn(
        'Could not resolve programs for %d %s IDs. IDs: %O',
        missingIds.length,
        this.sourceType,
        missingIds,
      );
    }
    const daoIds = programs.map((group) => group.uuid);
    const docs = await this.searchService.getPrograms(daoIds);
    if (daoIds.length > docs.length) {
      const missingDocIds = differenceWith(
        daoIds,
        docs,
        (id, doc) => id === doc.id,
      );
      this.logger.warn(
        'Could not find %d documents in the search index. IDs: %O',
        missingDocIds.length,
        missingDocIds,
      );
    }
    return {
      daos: programs.map((group) => ({
        uuid: group.uuid,
        type: group.type,
      })),
      searchDocs: docs,
    };
  }
  /**
   * Grouping counterpart of getProgramsAndSearchDocuments: resolves local
   * program-grouping DAOs and their search documents for a set of external
   * keys, logging (but tolerating) unresolved keys and missing documents.
   */
  private async getProgramGroupingsAndSearchDocuments(
    ctx: Context<ApiClientT>,
    externalKeys: string[],
  ) {
    // (sourceType, mediaSourceId, externalKey) triples for DB lookup.
    const externalIds = new Set(
      externalKeys.map(
        (id) => [this.sourceType, ctx.mediaSource.uuid, id] as const,
      ),
    );
    const groups =
      await this.programDB.getProgramGroupingsByExternalIds(externalIds);
    // Which requested keys actually resolved, to report the remainder.
    const foundIds = groups
      .flatMap((group) => group.externalIds)
      .filter((eid) => eid.sourceType === this.sourceType)
      .map((eid) => eid.externalKey);
    const missingIds = difference(externalKeys, foundIds);
    if (missingIds.length > 0) {
      this.logger.warn(
        'Could not resolve program groupings for %d %s IDs. IDs: %O',
        missingIds.length,
        this.sourceType,
        missingIds,
      );
    }
    const daoIds = groups.map((group) => group.uuid);
    const docs = await this.searchService.getPrograms(daoIds);
    if (daoIds.length > docs.length) {
      const missingDocIds = differenceWith(
        daoIds,
        docs,
        (id, doc) => id === doc.id,
      );
      this.logger.warn(
        'Could not find %d documents in the search index. IDs: %O',
        missingDocIds.length,
        missingDocIds,
      );
    }
    return {
      daos: groups.map((group) => ({
        uuid: group.uuid,
        type: group.type,
      })),
      searchDocs: docs,
    };
  }
}
export type GenericExternalCollectionScanner =

View File

@@ -0,0 +1,69 @@
import type { Collection, ProgramOrFolder } from '@tunarr/types';
import { inject, injectable } from 'inversify';
import { ExternalCollectionRepo } from '../../db/ExternalCollectionRepo.ts';
import { IProgramDB } from '../../db/interfaces/IProgramDB.ts';
import { MediaSourceDB } from '../../db/mediaSourceDB.ts';
import type { MediaSourceWithRelations } from '../../db/schema/derivedTypes.ts';
import type { RemoteSourceType } from '../../db/schema/base.ts';
import { TagRepo } from '../../db/TagRepo.ts';
import { MediaSourceApiFactory } from '../../external/MediaSourceApiFactory.ts';
import type { JellyfinApiClient } from '../../external/jellyfin/JellyfinApiClient.ts';
import { KEYS } from '../../types/inject.ts';
import { Logger } from '../../util/logging/LoggerFactory.ts';
import { MeilisearchService } from '../MeilisearchService.ts';
import { ExternalCollectionScanner } from './ExternalCollectionScanner.ts';
/**
 * Jellyfin implementation of ExternalCollectionScanner. All diff/sync logic
 * lives in the base class; this class only wires up DI, builds a Jellyfin
 * API client, and delegates collection enumeration to that client.
 */
@injectable()
export class JellyfinCollectionScanner extends ExternalCollectionScanner<JellyfinApiClient> {
  get sourceType(): RemoteSourceType {
    return 'jellyfin';
  }
  // Pure DI wiring: every dependency is forwarded to the base class, which
  // stores them as protected parameter properties.
  constructor(
    @inject(KEYS.Logger) logger: Logger,
    @inject(MediaSourceDB) mediaSourceDB: MediaSourceDB,
    @inject(MediaSourceApiFactory)
    mediaSourceApiFactory: MediaSourceApiFactory,
    @inject(ExternalCollectionRepo)
    externalCollectionsRepo: ExternalCollectionRepo,
    @inject(MeilisearchService)
    searchService: MeilisearchService,
    @inject(KEYS.ProgramDB)
    programDB: IProgramDB,
    @inject(TagRepo)
    tagRepo: TagRepo,
  ) {
    super(
      logger,
      mediaSourceDB,
      mediaSourceApiFactory,
      externalCollectionsRepo,
      searchService,
      programDB,
      tagRepo,
    );
  }
  /** Builds a Jellyfin API client for the given media source. */
  protected getApiClient(
    mediaSource: MediaSourceWithRelations,
  ): Promise<JellyfinApiClient> {
    return this.mediaSourceApiFactory.getJellyfinApiClientForMediaSource(
      mediaSource,
    );
  }
  getAllLibraryCollections(
    apiClient: JellyfinApiClient,
    libraryExternalKey: string,
  ): AsyncIterable<Collection> {
    return apiClient.getAllLibraryCollections(libraryExternalKey);
  }
  // Jellyfin addresses collections directly by ID, so the library ID
  // argument from the base-class contract is unused here.
  getCollectionItems(
    apiClient: JellyfinApiClient,
    _libraryId: string,
    collectionId: string,
  ): AsyncIterable<ProgramOrFolder> {
    return apiClient.getCollectionItems(collectionId);
  }
}

View File

@@ -1,552 +1,49 @@
import { isNonEmptyString, seq } from '@tunarr/shared/util';
import { Collection, isGroupingItemType, ProgramLike } from '@tunarr/types';
import type { Collection, ProgramOrFolder } from '@tunarr/types';
import { inject, injectable } from 'inversify';
import {
chunk,
compact,
difference,
differenceWith,
groupBy,
isEmpty,
isUndefined,
partition,
uniq,
} from 'lodash-es';
import { v4 } from 'uuid';
import { ExternalCollectionRepo } from '../../db/ExternalCollectionRepo.ts';
import { IProgramDB } from '../../db/interfaces/IProgramDB.ts';
import { MediaSourceDB } from '../../db/mediaSourceDB.ts';
import type {
MediaSourceWithRelations,
ProgramGroupingOrmWithRelations,
ProgramWithRelationsOrm,
} from '../../db/schema/derivedTypes.ts';
import { ExternalCollection } from '../../db/schema/ExternalCollection.ts';
import { MediaSourceOrm } from '../../db/schema/MediaSource.ts';
import { MediaSourceLibraryOrm } from '../../db/schema/MediaSourceLibrary.ts';
import { ProgramGroupingType } from '../../db/schema/ProgramGrouping.ts';
import { Tag } from '../../db/schema/Tag.ts';
import type { MediaSourceWithRelations } from '../../db/schema/derivedTypes.ts';
import type { RemoteSourceType } from '../../db/schema/base.ts';
import { TagRepo } from '../../db/TagRepo.ts';
import { MediaSourceApiFactory } from '../../external/MediaSourceApiFactory.ts';
import type { PlexApiClient } from '../../external/plex/PlexApiClient.ts';
import { KEYS } from '../../types/inject.ts';
import { Result } from '../../types/result.ts';
import { groupByTyped, groupByUniq } from '../../util/index.ts';
import { Logger } from '../../util/logging/LoggerFactory.ts';
import { MeilisearchService } from '../MeilisearchService.ts';
import {
MeilisearchService,
ProgramIndexPartialUpdate,
} from '../MeilisearchService.ts';
import {
ExternalCollectionLibraryScanRequest,
ExternalCollectionScanner,
ExternalCollectionScanRequest,
} from './ExternalCollectionScanner.ts';
type Context = {
mediaSource: MediaSourceOrm;
library: MediaSourceLibraryOrm;
apiClient: PlexApiClient;
};
// Update the tags field of the item directly.
type SearchUpdate = {
type: 'direct' | 'parent' | 'grandparent';
id: string;
collectionName: string;
opType: 'add' | 'del';
};
@injectable()
export class PlexCollectionScanner extends ExternalCollectionScanner<PlexApiClient> {
get sourceType(): RemoteSourceType {
return 'plex';
}
constructor(
@inject(KEYS.Logger) private logger: Logger,
@inject(MediaSourceDB) private mediaSourceDB: MediaSourceDB,
@inject(KEYS.Logger) logger: Logger,
@inject(MediaSourceDB) mediaSourceDB: MediaSourceDB,
@inject(MediaSourceApiFactory)
private mediaSourceApiFactory: MediaSourceApiFactory,
mediaSourceApiFactory: MediaSourceApiFactory,
@inject(ExternalCollectionRepo)
private externalCollectionsRepo: ExternalCollectionRepo,
externalCollectionsRepo: ExternalCollectionRepo,
@inject(MeilisearchService)
private searchService: MeilisearchService,
searchService: MeilisearchService,
@inject(KEYS.ProgramDB)
private programDB: IProgramDB,
programDB: IProgramDB,
@inject(TagRepo)
private tagRepo: TagRepo,
tagRepo: TagRepo,
) {
super();
}
async scan(req: ExternalCollectionScanRequest): Promise<void> {
const mediaSource = await this.mediaSourceDB.getById(req.mediaSourceId);
if (!mediaSource) {
throw new Error(
`Could not find media source with ID ${req.mediaSourceId}`,
);
}
this.logger.debug(
'Scanning Plex media source (ID = %s) for collections',
mediaSource.uuid,
super(
logger,
mediaSourceDB,
mediaSourceApiFactory,
externalCollectionsRepo,
searchService,
programDB,
tagRepo,
);
const enabledLibraries = mediaSource.libraries.filter((lib) => lib.enabled);
if (enabledLibraries.length === 0) {
this.logger.debug(
'Plex media source (ID = %s) has no enabled libraries. Skipping collections scan',
mediaSource.uuid,
);
return;
}
const apiClient = await this.getApiClient(mediaSource);
for (const library of enabledLibraries) {
const result = await Result.attemptAsync(() =>
this.scanLibraryInternal({
mediaSource,
library,
apiClient,
}),
);
if (result.isFailure()) {
this.logger.warn(
result.error,
'Failure while scanning Plex library (%s) for collections',
library.name,
);
}
}
}
async scanLibrary(req: ExternalCollectionLibraryScanRequest): Promise<void> {
const library = await this.mediaSourceDB.getLibrary(req.libraryId);
if (!library) {
throw new Error(
`Could not find media source library with ID ${req.libraryId}`,
);
}
this.logger.debug('Scanning Plex collections for library %s', library.name);
if (!library.enabled) {
this.logger.debug(
'Skipping Plex collection scan for library %s because it is disabled',
req.libraryId,
);
return;
}
if (library.mediaSource.type !== 'plex') {
throw new Error(
`Tried to scan library ID ${req.libraryId} but it belongs to a non-Plex media source (actual type = ${library.mediaSource.type})`,
);
}
const apiClient = await this.getApiClient({
...library.mediaSource,
libraries: [library],
});
await this.scanLibraryInternal({
apiClient,
library,
mediaSource: library.mediaSource,
});
}
private async scanLibraryInternal(ctx: Context) {
await this.searchService.waitForPendingIndexTasks();
this.logger.debug(
'Scanning Plex library "%s" for collections',
ctx.library.name,
);
const existingCollections = groupByUniq(
await this.externalCollectionsRepo.getByLibraryId(ctx.library.uuid),
(coll) => coll.externalKey,
);
const existingCollectionExternalIds = new Set(
Object.keys(existingCollections),
);
const seenIds = new Set<string>();
const searchUpdates: SearchUpdate[] = [];
for await (const collection of ctx.apiClient.getAllLibraryCollections(
ctx.library.externalKey,
)) {
seenIds.add(collection.externalId);
// Upsert a tag for this collection name
const tag = await this.tagRepo.upsertTag(collection.title);
// Check if collection is new
let existingCollection = existingCollections[collection.externalId];
if (!existingCollection) {
existingCollection = {
uuid: v4(),
externalKey: collection.externalId,
libraryId: ctx.library.uuid,
mediaSourceId: ctx.mediaSource.uuid,
sourceType: 'plex',
title: collection.title,
};
await this.externalCollectionsRepo.insertCollection(existingCollection);
}
searchUpdates.push(
...(await this.enumerateCollection(
existingCollection,
collection,
tag,
ctx,
)),
);
}
const missingIds = existingCollectionExternalIds.difference(seenIds);
const missingCollections = seq.collect(
[...missingIds.values()],
(id) => existingCollections[id],
);
for (const missingCollection of missingCollections) {
const collection = await this.externalCollectionsRepo.getById(
missingCollection.uuid,
);
if (!collection) {
continue;
}
const relatedGroupings = seq.collect(collection.groupings, (g) =>
g.grouping?.uuid ? ([g.grouping.uuid, g.grouping.type] as const) : null,
);
const allRelatedIds = relatedGroupings
.map(([id]) => id)
.concat(seq.collect(collection.programs, (p) => p.program?.uuid));
const relatedDescendants = await this.getAllDescendents(
relatedGroupings.map(([id, type]) => ({ uuid: id, type })),
);
for (const id of allRelatedIds) {
searchUpdates.push({
type: 'direct',
collectionName: collection.title,
id,
opType: 'del',
});
}
for (const [id, type] of relatedDescendants) {
searchUpdates.push({
type: isGroupingItemType(type) ? 'parent' : 'grandparent',
collectionName: collection.title,
id,
opType: 'del',
});
}
await this.externalCollectionsRepo.deleteCollection(collection.uuid);
}
// Search indexes must be updated once to avoid the select + update flow
// given that index requests are async, this avoids the race:
// 1. Doc with tags: [A, B, C]
// 2. Add collection D -> send index request with tags [A, B, C, D]
// 3. Index request is queued, next collection, E, processed
// 4. Select document, get tags [A, B, C]. Send index request for [A, B, C, E]
// 5. Request from 2 processed, then clobbered by request from 4.
// Now we have to collect all search updates and apply them at once to avoid the race.
// This involves getting all of the documents current tags
const updatesById = groupBy(searchUpdates, (update) => update.id);
for (const updateChunk of chunk(Object.entries(updatesById), 100)) {
const ids = updateChunk.map(([id]) => id);
const searchDocs = await this.searchService.getPrograms(ids);
// Apply the updates to each doc
const partialPrograms: ProgramIndexPartialUpdate[] = [];
for (const doc of searchDocs) {
const updates = updatesById[doc.id];
if (!updates) continue;
const currentTags = new Set(doc.tags);
const updatesByType = groupByTyped(updates, (up) => up.type);
const [directAdds, directDels] = partition(
updatesByType.direct,
(up) => up.opType === 'add',
);
const newTags = currentTags
.union(
new Set(directAdds.map(({ collectionName }) => collectionName)),
)
.difference(
new Set(directDels.map(({ collectionName }) => collectionName)),
);
const partialUpdate: ProgramIndexPartialUpdate = {
id: doc.id,
tags: isEmpty(updatesByType.direct)
? undefined
: [...newTags.values()],
};
if (doc.parent && !isEmpty(updatesByType.parent)) {
const currentParentTags = new Set(doc.parent.tags);
const [parentAdds, parentDels] = partition(
updatesByType.parent,
(up) => up.opType === 'add',
);
const newTags = currentParentTags
.union(
new Set(parentAdds.map(({ collectionName }) => collectionName)),
)
.difference(
new Set(parentDels.map(({ collectionName }) => collectionName)),
);
partialUpdate.parent = {
...doc.parent,
tags: [...newTags.values()],
};
}
if (doc.grandparent && !isEmpty(updatesByType.grandparent)) {
const currentGrandparentTags = new Set(doc.grandparent.tags);
const [adds, dels] = partition(
updatesByType.grandparent,
(up) => up.opType === 'add',
);
const newTags = currentGrandparentTags
.union(new Set(adds.map(({ collectionName }) => collectionName)))
.difference(
new Set(dels.map(({ collectionName }) => collectionName)),
);
partialUpdate.grandparent = {
...doc.grandparent,
tags: [...newTags.values()],
};
}
if (
!isUndefined(partialUpdate.tags) ||
!isUndefined(partialUpdate.parent?.tags) ||
!isUndefined(partialUpdate.grandparent?.tags)
) {
partialPrograms.push(partialUpdate);
}
}
this.logger.debug(
'Sending %d Plex collection tag updates to search server',
partialPrograms.length,
);
await this.searchService.updatePrograms(partialPrograms);
}
}
private async enumerateCollection(
collectionDao: ExternalCollection,
collection: Collection,
tag: Tag,
context: Context,
): Promise<SearchUpdate[]> {
this.logger.debug('Scanning collection "%s"', collection.title);
const it = context.apiClient.getCollectionItems(
context.library.uuid,
collection.externalId,
);
const seenIds = new Set<string>();
const isGroupingCollectionType =
collection.childType === 'show' || collection.childType === 'artist';
const existingCollectionProgramsByExternalId =
await (isGroupingCollectionType
? this.externalCollectionsRepo
.getCollectionProgramGroupings(collectionDao.uuid)
.then((groupings) => ({
type: 'groupings' as const,
groupings: groupBy(
compact(groupings).filter((grouping) =>
isNonEmptyString(grouping.externalKey),
),
(grouping) => grouping.externalKey!,
),
}))
: this.externalCollectionsRepo
.getCollectionPrograms(collectionDao.uuid)
.then((programs) => ({
type: 'programs' as const,
programs: groupBy(
compact(programs).filter((program) =>
isNonEmptyString(program.externalKey),
),
(program) => program.externalKey,
),
})));
for await (const item of it) {
seenIds.add(item.externalId);
}
let newKeys: Set<string>;
if (existingCollectionProgramsByExternalId.type === 'groupings') {
newKeys = seenIds.difference(
new Set(Object.keys(existingCollectionProgramsByExternalId.groupings)),
);
} else {
newKeys = seenIds.difference(
new Set(Object.keys(existingCollectionProgramsByExternalId.programs)),
);
}
this.logger.debug(
'Adding %d tag associtions for collection "%s" from %s',
newKeys.size,
collection.title,
existingCollectionProgramsByExternalId.type,
);
// Accumulates search-index mutations for both the add and remove phases;
// returned to the caller at the end of the method for application.
const searchUpdates: SearchUpdate[] = [];
// Resolve newly-seen keys in chunks of 100 to bound lookup/query sizes.
for (const newKeyChunk of chunk([...newKeys], 100)) {
if (isGroupingCollectionType) {
const groupings = await this.getProgramGroupingsAndSearchDocuments(
context,
newKeyChunk,
);
// Queue a direct "add to collection" update for each resolved grouping.
searchUpdates.push(
...groupings.daos.map(
({ uuid }) =>
({
id: uuid,
type: 'direct',
collectionName: collection.title,
opType: 'add',
}) satisfies SearchUpdate,
),
);
// Persist the tag association for the groupings themselves.
await this.tagRepo.tagProgramGroupings(
tag.uuid,
groupings.daos.map((dao) => dao.uuid),
);
// For groupings, we also need to expand the hierarchy so we can update nested
// documents in the search index.
const allDescendentIds = await this.getAllDescendents(groupings.daos);
for (const [typ, id] of allDescendentIds) {
// Albums/seasons point at the tagged grouping as their parent;
// episodes/tracks as their grandparent. Any other descendant type
// gets no search update here.
if (typ === 'album' || typ === 'season') {
searchUpdates.push({
type: 'parent',
id,
collectionName: collection.title,
opType: 'add',
});
} else if (typ === 'episode' || typ === 'track') {
searchUpdates.push({
type: 'grandparent',
id,
collectionName: collection.title,
opType: 'add',
});
}
}
// childSearchDocs.push(
// ...(await this.searchService.getPrograms(allDescendentIds)),
// );
} else {
// Plain (non-grouping) collection: resolve programs directly.
const programs = await this.getProgramsAndSearchDocuments(
context,
newKeyChunk,
);
searchUpdates.push(
...programs.daos.map(
({ uuid }) =>
({
type: 'direct',
id: uuid,
collectionName: collection.title,
opType: 'add',
}) satisfies SearchUpdate,
),
);
await this.tagRepo.tagPrograms(
tag.uuid,
programs.daos.map((dao) => dao.uuid),
);
}
}
// Removed keys
// Keys that have a local association but were not seen upstream this scan.
let removedKeys: Set<string>;
if (existingCollectionProgramsByExternalId.type === 'groupings') {
removedKeys = new Set(
Object.keys(existingCollectionProgramsByExternalId.groupings),
).difference(seenIds);
// NOTE(review): uniq() compares these [uuid, type] tuples by reference
// (SameValueZero), so identical pairs produced from different keys are NOT
// actually deduped — confirm whether duplicates are possible here.
const groupingIds = uniq(
[...removedKeys].flatMap((key) =>
(existingCollectionProgramsByExternalId.groupings[key] ?? []).map(
(x) => [x.uuid, x.type] as const,
),
),
);
await this.tagRepo.untagProgramGroupings(tag.uuid, [
...groupingIds.map(([id]) => id),
]);
// Queue direct deletions for the removed groupings...
searchUpdates.push(
...groupingIds.map(
([id]) =>
({
id,
type: 'direct',
opType: 'del',
collectionName: collection.title,
}) satisfies SearchUpdate,
),
);
// ...and cascade the deletion to every descendant document.
const allDescendentIds = await this.getAllDescendents(
groupingIds.map(([id, type]) => ({ uuid: id, type })),
);
for (const [typ, id] of allDescendentIds) {
searchUpdates.push({
collectionName: collection.title,
id,
type: isGroupingItemType(typ) ? 'parent' : 'grandparent',
opType: 'del',
});
}
} else {
removedKeys = new Set(
Object.keys(existingCollectionProgramsByExternalId.programs),
).difference(seenIds);
const programIds = uniq(
[...removedKeys].flatMap((key) =>
(existingCollectionProgramsByExternalId.programs[key] ?? []).map(
(x) => x.uuid,
),
),
);
await this.tagRepo.untagPrograms(tag.uuid, programIds);
searchUpdates.push(
...programIds.map(
(id) =>
({
id,
type: 'direct',
opType: 'del',
collectionName: collection.title,
}) satisfies SearchUpdate,
),
);
}
this.logger.debug(
'Removing %d tag associations for collection "%s" from %s',
removedKeys.size,
collection.title,
existingCollectionProgramsByExternalId.type,
);
return searchUpdates;
}
protected getApiClient(
@@ -557,127 +54,18 @@ export class PlexCollectionScanner extends ExternalCollectionScanner<PlexApiClie
);
}
/**
 * Expands a set of program groupings into the full list of descendant
 * (type, uuid) pairs — direct children plus deeper descendants — by fanning
 * out parallel lookups per grouping and flattening the results.
 *
 * NOTE(review): both uniq() calls compare tuples by reference, so duplicate
 * [type, uuid] pairs are not actually deduped — confirm intent.
 */
private async getAllDescendents(
groupings: { uuid: string; type: ProgramGroupingType }[],
): Promise<Array<readonly [ProgramLike['type'], string]>> {
return uniq(
(
await Promise.all(
groupings.map(({ type, uuid }) => {
// Direct children and deeper descendants are fetched concurrently.
return Promise.all([
this.programDB
.getChildren(uuid, type)
.then((_) =>
_.results.map(
(
p:
| ProgramGroupingOrmWithRelations
| ProgramWithRelationsOrm,
) => [p.type, p.uuid] as const,
),
),
this.programDB
.getProgramGroupingDescendants(uuid, type)
.then((_) => _.map((p) => [p.type, p.uuid] as const)),
]).then((_) => uniq(_.flat()));
}),
)
).flat(),
);
/**
 * Enumerates every collection in the given Plex library.
 *
 * Thin adapter over the Plex API client so the base scanner can iterate
 * collections without knowing the concrete media-source type.
 */
getAllLibraryCollections(
  apiClient: PlexApiClient,
  libraryExternalKey: string,
): AsyncIterable<Collection> {
  const collections = apiClient.getAllLibraryCollections(libraryExternalKey);
  return collections;
}
/**
 * Resolves a chunk of Plex external keys to local program DAOs and their
 * corresponding search-index documents.
 *
 * Keys that cannot be mapped to a local program, and resolved programs with
 * no document in the search index, are logged as warnings but otherwise
 * skipped.
 *
 * @param ctx scan context; supplies the media source UUID for key scoping
 * @param externalKeys Plex rating keys to resolve
 * @returns the resolved `{uuid, type}` DAOs and their search documents
 */
private async getProgramsAndSearchDocuments(
  ctx: Context,
  externalKeys: string[],
) {
  // Scope each key to this media source before lookup.
  const lookupIds = new Set(
    externalKeys.map((key) => ['plex', ctx.mediaSource.uuid, key] as const),
  );
  const resolvedPrograms = await this.programDB.lookupByExternalIds(lookupIds);

  // Collect the Plex keys that actually resolved, so we can report the rest.
  const resolvedPlexKeys: string[] = [];
  for (const program of resolvedPrograms) {
    for (const eid of program.externalIds) {
      if (eid.sourceType === 'plex') {
        resolvedPlexKeys.push(eid.externalKey);
      }
    }
  }
  const unresolvedKeys = difference(externalKeys, resolvedPlexKeys);
  if (unresolvedKeys.length > 0) {
    this.logger.warn(
      'Could not resolve programs for %d Plex IDs. IDs: %O',
      unresolvedKeys.length,
      unresolvedKeys,
    );
  }

  // Fetch search documents for the resolved DAOs; warn on any gaps.
  const programUuids = resolvedPrograms.map(({ uuid }) => uuid);
  const searchDocs = await this.searchService.getPrograms(programUuids);
  if (programUuids.length > searchDocs.length) {
    const missingDocIds = differenceWith(
      programUuids,
      searchDocs,
      (uuid, doc) => uuid === doc.id,
    );
    this.logger.warn(
      'Could not find %d documents in the search index. IDs: %O',
      missingDocIds.length,
      missingDocIds,
    );
  }

  return {
    daos: resolvedPrograms.map(({ uuid, type }) => ({ uuid, type })),
    searchDocs,
  };
}
/**
 * Resolves a chunk of Plex external keys to local program-grouping DAOs and
 * their corresponding search-index documents. Unresolvable keys and groupings
 * missing from the search index are logged as warnings and skipped.
 *
 * NOTE(review): near-duplicate of getProgramsAndSearchDocuments (programs vs.
 * groupings) — candidate for extraction into a shared helper.
 */
private async getProgramGroupingsAndSearchDocuments(
ctx: Context,
externalKeys: string[],
) {
// Scope each key to this media source before lookup.
const externalIds = new Set(
externalKeys.map((id) => ['plex', ctx.mediaSource.uuid, id] as const),
);
const groups =
await this.programDB.getProgramGroupingsByExternalIds(externalIds);
// Plex keys that actually resolved; anything left over is reported below.
const plexIds = groups
.flatMap((group) => group.externalIds)
.filter((eid) => eid.sourceType === 'plex')
.map((eid) => eid.externalKey);
const missingIds = difference(externalKeys, plexIds);
if (missingIds.length > 0) {
this.logger.warn(
'Could not resolve programs for %d Plex IDs. IDs: %O',
missingIds.length,
missingIds,
);
}
// Fetch search documents for the resolved groupings; warn on any gaps.
const daoIds = groups.map((group) => group.uuid);
const docs = await this.searchService.getPrograms(daoIds);
if (daoIds.length > docs.length) {
const missingIds = differenceWith(
daoIds,
docs,
(id, doc) => id === doc.id,
);
this.logger.warn(
'Could not find %d documents in the search index. IDs: %O',
missingIds.length,
missingIds,
);
}
return {
daos: groups.map((group) => ({
uuid: group.uuid,
type: group.type,
})),
searchDocs: docs,
};
/**
 * Enumerates the items (programs or folders) contained in a single Plex
 * collection within the given library, delegating pagination to the API
 * client.
 */
getCollectionItems(
  apiClient: PlexApiClient,
  libraryId: string,
  collectionId: string,
): AsyncIterable<ProgramOrFolder> {
  const items = apiClient.getCollectionItems(libraryId, collectionId);
  return items;
}
}