fix: ensure fallback filler is looped for the duration of the stream block (#1329)

This commit is contained in:
Christian Benincasa
2025-09-04 15:45:48 -04:00
committed by GitHub
parent b00a68a970
commit c16b208362
28 changed files with 841 additions and 73 deletions

21
pnpm-lock.yaml generated
View File

@@ -259,6 +259,9 @@ importers:
specifier: ^4.0.17
version: 4.0.17
devDependencies:
'@faker-js/faker':
specifier: ^9.9.0
version: 9.9.0
'@rollup/plugin-swc':
specifier: ^0.4.0
version: 0.4.0(@swc/core@1.10.9)(rollup@4.20.0)
@@ -355,6 +358,9 @@ importers:
ts-essentials:
specifier: ^10.0.0
version: 10.0.3(typescript@5.7.3)
ts-mockito:
specifier: ^2.6.1
version: 2.6.1
tsconfig-paths:
specifier: ^4.2.0
version: 4.2.0
@@ -1724,6 +1730,10 @@ packages:
resolution: {integrity: sha512-zSkKow6H5Kdm0ZUQUB2kV5JIXqoG0+uH5YADhaEHswm664N9Db8dXSi0nMJpacpMf+MyyglF1vnZohpEg5yUtg==}
engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0}
'@faker-js/faker@9.9.0':
resolution: {integrity: sha512-OEl393iCOoo/z8bMezRlJu+GlRGlsKbUAN7jKB6LhnKoqKve5DXRpalbItIIcwnCjs1k/FOPjFzcA6Qn+H+YbA==}
engines: {node: '>=18.0.0', npm: '>=9.0.0'}
'@fastify/accept-negotiator@2.0.0':
resolution: {integrity: sha512-/Sce/kBzuTxIq5tJh85nVNOq9wKD8s+viIgX0fFMDBdw95gnpf53qmF1oBgJym3cPFliWUuSloVg/1w/rH0FcQ==}
@@ -4076,7 +4086,7 @@ packages:
'@types/sql.js': '*'
'@vercel/postgres': '>=0.8.0'
'@xata.io/client': '*'
better-sqlite3: '>=7'
better-sqlite3: 9.4.5
bun-types: '*'
expo-sqlite: '>=14.0.0'
knex: '*'
@@ -7537,6 +7547,9 @@ packages:
ts-interface-checker@0.1.13:
resolution: {integrity: sha512-Y/arvbn+rrz3JCKl9C4kVNfTfSm2/mEp5FSz5EsZSANGPSlQrpRI5M4PKF+mJnE52jOO90PnPSc3Ur3bTQw0gA==}
ts-mockito@2.6.1:
resolution: {integrity: sha512-qU9m/oEBQrKq5hwfbJ7MgmVN5Gu6lFnIGWvpxSjrqq6YYEVv+RwVFWySbZMBgazsWqv6ctAyVBpo9TmAxnOEKw==}
ts-node@10.9.2:
resolution: {integrity: sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==}
hasBin: true
@@ -9117,6 +9130,8 @@ snapshots:
dependencies:
levn: 0.4.1
'@faker-js/faker@9.9.0': {}
'@fastify/accept-negotiator@2.0.0': {}
'@fastify/ajv-compiler@4.0.1':
@@ -15523,6 +15538,10 @@ snapshots:
ts-interface-checker@0.1.13: {}
ts-mockito@2.6.1:
dependencies:
lodash: 4.17.21
ts-node@10.9.2(@swc/core@1.10.9)(@types/node@22.10.7)(typescript@5.7.3):
dependencies:
'@cspotcode/source-map-support': 0.8.1

View File

@@ -80,6 +80,7 @@
"zod": "^4.0.17"
},
"devDependencies": {
"@faker-js/faker": "^9.9.0",
"@rollup/plugin-swc": "^0.4.0",
"@types/archiver": "^6.0.2",
"@types/async-retry": "^1.4.8",
@@ -112,6 +113,7 @@
"tmp": "^0.2.1",
"tmp-promise": "^3.0.3",
"ts-essentials": "^10.0.0",
"ts-mockito": "^2.6.1",
"tsconfig-paths": "^4.2.0",
"tsx": "^4.19.2",
"typed-emitter": "^2.1.0",

View File

@@ -87,6 +87,9 @@ export const debugFfmpegApiRouter: RouterPluginAsyncCallback = async (
.getStream({ path: req.query.path });
lineupItem = {
duration: +dayjs.duration({ seconds: 30 }),
contentDuration: +dayjs.duration({ seconds: 30 }),
infiniteLoop: false,
streamDuration: +dayjs.duration({ seconds: 30 }),
externalKey: 'none',
externalSource: 'emby',
externalSourceId: 'none',

View File

@@ -67,7 +67,6 @@ export const debugStreamApiRouter: RouterPluginAsyncCallback = async (
channel,
channel,
false,
false,
true,
channel.transcodeConfig,
'mpegts',
@@ -292,7 +291,6 @@ export const debugStreamApiRouter: RouterPluginAsyncCallback = async (
channel,
channel,
false,
false,
true,
transcodeConfig,
'mpegts',
@@ -324,5 +322,8 @@ function createStreamItemFromProgram(
plexFilePath: program.plexFilePath ?? undefined,
filePath: program.filePath ?? undefined,
programBeginMs: +dayjs(),
contentDuration: program.duration,
streamDuration: program.duration,
infiniteLoop: false,
};
}

View File

@@ -45,6 +45,7 @@ import { LoadChannelCacheStartupTask } from './services/startup/LoadChannelCache
import { ScheduleJobsStartupTask } from './services/startup/ScheduleJobsStartupTask.ts';
import { SeedFfmpegInfoCache } from './services/startup/SeedFfmpegInfoCache.ts';
import { SeedSystemDevicesStartupTask } from './services/startup/SeedSystemDevicesStartupTask.ts';
import { ChannelCache } from './stream/ChannelCache.ts';
import { FixerRunner } from './tasks/fixers/FixerRunner.ts';
import { Timer } from './util/Timer.ts';
import { getBooleanEnvVar, USE_WORKER_POOL_ENV_VAR } from './util/env.ts';
@@ -116,6 +117,7 @@ const RootModule = new ContainerModule((bind) => {
bind<interfaces.AutoFactory<TimeSlotSchedulerService>>(
KEYS.TimeSlotSchedulerServiceFactory,
).toAutoFactory(TimeSlotSchedulerService);
bind(KEYS.ChannelCache).to(ChannelCache).inSingletonScope();
bind(KEYS.StartupTask).to(SeedSystemDevicesStartupTask).inSingletonScope();
bind(KEYS.StartupTask).to(ClearM3uCacheStartupTask).inSingletonScope();
@@ -143,7 +145,7 @@ const RootModule = new ContainerModule((bind) => {
container.load(RootModule);
container.load(dbContainer);
container.load(StreamModule);
container.load(new StreamModule());
container.load(TasksModule);
container.load(HealthCheckModule);
container.load(FixerModule);

View File

@@ -7,6 +7,7 @@ import type { interfaces } from 'inversify';
import { ContainerModule } from 'inversify';
import type { Kysely } from 'kysely';
import { DBAccess } from './DBAccess.ts';
import { FillerDB } from './FillerListDB.ts';
import type { DB } from './schema/db.ts';
const DBModule = new ContainerModule((bind) => {
@@ -19,6 +20,7 @@ const DBModule = new ContainerModule((bind) => {
bind<interfaces.Factory<Kysely<DB>>>(KEYS.DatabaseFactory).toAutoFactory(
KEYS.Database,
);
bind(KEYS.FillerListDB).to(FillerDB).inSingletonScope();
});
export { DBModule as dbContainer };

View File

@@ -2,6 +2,7 @@ import type { IProgramDB } from '@/db/interfaces/IProgramDB.js';
import { ChannelCache } from '@/stream/ChannelCache.js';
import { KEYS } from '@/types/inject.js';
import { isNonEmptyString } from '@/util/index.js';
import { ContentProgram } from '@tunarr/types';
import {
CreateFillerListRequest,
UpdateFillerListRequest,
@@ -28,8 +29,14 @@ import {
uniq,
values,
} from 'lodash-es';
import { MarkRequired } from 'ts-essentials';
import { v4 } from 'uuid';
import { Maybe, Nilable } from '../types/util.ts';
import { ProgramConverter } from './converters/ProgramConverter.ts';
import {
FillerShowWithContent,
IFillerListDB,
} from './interfaces/IFillerListDB.ts';
import { createPendingProgramIndexMap } from './programHelpers.ts';
import { withFillerPrograms } from './programQueryHelpers.ts';
import { ChannelFillerShow } from './schema/Channel.ts';
@@ -42,7 +49,7 @@ import { DB } from './schema/db.ts';
import type { ChannelFillerShowWithContent } from './schema/derivedTypes.ts';
@injectable()
export class FillerDB {
export class FillerDB implements IFillerListDB {
constructor(
@inject(ChannelCache) private channelCache: ChannelCache,
@inject(KEYS.ProgramDB) private programDB: IProgramDB,
@@ -50,7 +57,7 @@ export class FillerDB {
@inject(KEYS.Database) private db: Kysely<DB>,
) {}
getFiller(id: string) {
getFiller(id: string): Promise<Maybe<FillerShowWithContent>> {
return this.db
.selectFrom('fillerShow')
.where('uuid', '=', id)
@@ -59,7 +66,10 @@ export class FillerDB {
.executeTakeFirst();
}
async saveFiller(id: string, updateRequest: UpdateFillerListRequest) {
async saveFiller(
id: string,
updateRequest: UpdateFillerListRequest,
): Promise<Nilable<FillerShowWithContent>> {
const filler = await this.getFiller(id);
if (isNil(filler)) {
@@ -179,7 +189,9 @@ export class FillerDB {
}
// Returns all channels a given filler list is a part of
async getFillerChannels(id: string) {
async getFillerChannels(
id: string,
): Promise<Array<{ number: number; name: string }>> {
return this.db
.selectFrom('channelFillerShow')
.where('channelFillerShow.fillerShowUuid', '=', id)
@@ -326,7 +338,9 @@ export class FillerDB {
.execute();
}
async getFillerPrograms(id: string) {
async getFillerPrograms(
id: string,
): Promise<MarkRequired<ContentProgram, 'id'>[]> {
const programs = await this.db
.selectFrom('fillerShow')
.where('fillerShow.uuid', '=', id)

View File

@@ -13,7 +13,6 @@ const baseStreamLineupItemSchema = z.object({
streamDuration: z
.number()
.nonnegative()
.optional()
.describe('The amount of time left in the stream'),
// beginningOffset: z.number().nonnegative().optional(),
title: z.string().optional(),
@@ -120,34 +119,26 @@ export type OfflineStreamLineupItem = z.infer<
typeof OfflineStreamLineupItemSchema
>;
export const LoadingStreamLineupItemSchema = baseStreamLineupItemSchema
.extend({
type: z.literal('loading'),
})
.required({ streamDuration: true });
export type LoadingStreamLineupItem = z.infer<
typeof LoadingStreamLineupItemSchema
>;
const BaseContentBackedStreamLineupItemSchema =
baseStreamLineupItemSchema.extend({
// ID in the program DB table
programId: z.string().uuid(),
programId: z.uuid(),
// These are taken from the Program DB entity
plexFilePath: z.string().optional(),
externalSourceId: z.string(),
filePath: z.string().optional(),
externalKey: z.string(),
programType: ContentProgramTypeSchema,
externalSource: z.nativeEnum(MediaSourceType),
externalSource: z.enum(MediaSourceType),
infiniteLoop: z.boolean(),
contentDuration: z.number().describe('The duration of the content itself'),
});
const CommercialStreamLineupItemSchema =
BaseContentBackedStreamLineupItemSchema.extend({
type: z.literal('commercial'),
fillerId: z.string(),
}).required({ streamDuration: true });
});
export type CommercialStreamLineupItem = z.infer<
typeof CommercialStreamLineupItemSchema
@@ -182,7 +173,6 @@ export type RedirectStreamLineupItem = z.infer<
export const StreamLineupItemSchema = z.discriminatedUnion('type', [
ProgramStreamLineupItemSchema,
CommercialStreamLineupItemSchema,
LoadingStreamLineupItemSchema,
OfflineStreamLineupItemSchema,
RedirectStreamLineupItemSchema,
ErrorStreamLineupItemSchema,
@@ -198,6 +188,7 @@ export type StreamLineupItem = z.infer<typeof StreamLineupItemSchema>;
// consolidated as we rewrite pieces of the streaming pipeline.
export const EnrichedLineupItemSchema = z.discriminatedUnion('type', [
ProgramStreamLineupItemSchema,
CommercialStreamLineupItemSchema,
OfflineStreamLineupItemSchema,
RedirectStreamLineupItemSchema,
ErrorStreamLineupItemSchema,
@@ -214,5 +205,6 @@ export function createOfflineStreamLineupItem(
startOffset: 0,
type: 'offline',
programBeginMs,
streamDuration: duration,
};
}

View File

@@ -0,0 +1,36 @@
import type { ContentProgram } from '@tunarr/types';
import type {
CreateFillerListRequest,
UpdateFillerListRequest,
} from '@tunarr/types/api';
import type { MarkRequired } from 'ts-essentials';
import type { Maybe, Nilable } from '../../types/util.ts';
import type { FillerShow } from '../schema/FillerShow.ts';
import type { ProgramDao } from '../schema/Program.ts';
import type { ChannelFillerShowWithContent } from '../schema/derivedTypes.js';
export interface IFillerListDB {
getFiller(id: string): Promise<Maybe<FillerShowWithContent>>;
saveFiller(
id: string,
updateRequest: UpdateFillerListRequest,
): Promise<Nilable<FillerShowWithContent>>;
createFiller(createRequest: CreateFillerListRequest): Promise<string>;
getFillerChannels(
id: string,
): Promise<Array<{ number: number; name: string }>>;
deleteFiller(id: string): Promise<void>;
getAllFillerIds(): Promise<string[]>;
getFillerPrograms(id: string): Promise<MarkRequired<ContentProgram, 'id'>[]>;
getFillersFromChannel(
channelId: string,
): Promise<ChannelFillerShowWithContent[]>;
}
export type FillerShowWithContent = FillerShow & {
fillerContent: Array<
ProgramDao & {
index: number;
}
>;
};

View File

@@ -535,6 +535,7 @@ export class FfmpegStreamFactory extends IFFMPEG {
videoFormat: playbackParams.videoFormat,
videoProfile: null, // 'main', // TODO:
deinterlace: playbackParams.deinterlace,
infiniteLoop: lineupItem.infiniteLoop,
}),
pipelineOptions,
);

View File

@@ -529,6 +529,11 @@ export abstract class BasePipelineBuilder implements PipelineBuilder {
this.setRealtime();
if (this.desiredState.infiniteLoop) {
this.videoInputSource?.addOption(new InfiniteLoopInputOption());
this.audioInputSource?.addOption(new InfiniteLoopInputOption());
}
if (
this.desiredState.videoFormat !== VideoFormats.Copy &&
this.desiredState.frameRate

View File

@@ -29,6 +29,7 @@ export const DefaultFrameState: Omit<
pixelFormat: null,
bitDepth: 8,
forceSoftwareOverlay: false,
infiniteLoop: false,
};
export class FrameState {
@@ -47,6 +48,7 @@ export class FrameState {
frameDataLocation: FrameDataLocation;
deinterlace: boolean;
pixelFormat: Nullable<PixelFormat>;
infiniteLoop: boolean = false;
forceSoftwareOverlay = false;

View File

@@ -0,0 +1,22 @@
import type { StreamLineupItem } from '../db/derived_types/StreamLineup.ts';
export interface IStreamLineupCache {
getCurrentLineupItem(
channelId: string,
timeNow: number,
): StreamLineupItem | undefined;
getProgramLastPlayTime(channelId: string, programId: string): number;
getFillerLastPlayTime(channelId: string, fillerId: string): number;
recordPlayback(
channelId: string,
t0: number,
lineupItem: StreamLineupItem,
): Promise<void>;
clear(): void;
clearPlayback(channelId: string): Promise<void>;
}

View File

@@ -13,6 +13,7 @@ import {
StreamLineupItemSchema,
isCommercialLineupItem,
} from '../db/derived_types/StreamLineup.ts';
import { IStreamLineupCache } from '../interfaces/IStreamLineupCache.ts';
import { KEYS } from '../types/inject.ts';
const SLACK = constants.SLACK;
@@ -98,7 +99,7 @@ export class PersistentChannelCache {
}
@injectable()
export class ChannelCache {
export class ChannelCache implements IStreamLineupCache {
constructor(
@inject(PersistentChannelCache)
private persistentChannelCache: PersistentChannelCache,

View File

@@ -4,7 +4,6 @@ import type { OutputFormat } from '@/ffmpeg/builder/constants.js';
import type { CacheImageService } from '@/services/cacheImageService.js';
import { Result } from '@/types/result.js';
import { LoggerFactory } from '@/util/logging/LoggerFactory.js';
import { makeLocalUrl } from '@/util/serverUtil.js';
import dayjs from 'dayjs';
import { isError, isUndefined } from 'lodash-es';
import type { FFmpegFactory } from '../ffmpeg/FFmpegModule.ts';
@@ -30,18 +29,6 @@ export class OfflineProgramStream extends ProgramStream {
outputFormat: OutputFormat,
) {
super(context, outputFormat, settingsDB, cacheImageService, ffmpegFactory);
if (context.isLoading === true) {
context.targetChannel = {
...context.targetChannel,
offline: {
...context.targetChannel.offline,
mode: 'pic',
picture: makeLocalUrl('/images/loading-screen.png'),
soundtrack: undefined,
},
};
}
}
protected shutdownInternal() {}

View File

@@ -23,7 +23,6 @@ export class PlayerContext {
public targetChannel: Channel,
public sourceChannel: Channel,
public audioOnly: boolean,
public isLoading: boolean,
public realtime: boolean,
public transcodeConfig: TranscodeConfig,
public streamMode: ChannelStreamMode,
@@ -50,7 +49,6 @@ export class PlayerContext {
targetChannel,
sourceChannel,
false,
false,
realtime,
transcodeConfig,
streamMode,

View File

@@ -26,6 +26,7 @@ import type { interfaces } from 'inversify';
import { ContainerModule } from 'inversify';
import type { ISettingsDB } from '../db/interfaces/ISettingsDB.ts';
import type { FFmpegFactory } from '../ffmpeg/FFmpegModule.ts';
import { FillerPicker } from '../services/FillerPicker.ts';
import type { UpdatePlexPlayStatusScheduledTaskFactory } from '../tasks/plex/UpdatePlexPlayStatusTask.ts';
import { UpdatePlexPlayStatusScheduledTask } from '../tasks/plex/UpdatePlexPlayStatusTask.ts';
import { bindFactoryFunc } from '../util/inject.ts';
@@ -41,7 +42,7 @@ export type OfflineStreamFactoryType = interfaces.MultiFactory<
[PlayerContext, OutputFormat]
>;
const StreamModule = new ContainerModule((bind) => {
const configure: interfaces.ContainerModuleCallBack = (bind) => {
bind(SessionManager).toSelf().inSingletonScope();
bindFactoryFunc<ProgramStreamFactory>(
@@ -136,10 +137,8 @@ const StreamModule = new ContainerModule((bind) => {
playerContext.lineupItem.externalSource,
)(playerContext, outputFormat);
}
case 'loading':
case 'offline':
case 'error': {
// const isLoading = playerContext.lineupItem.type === 'loading';
const isError = playerContext.lineupItem.type === 'error';
return ctx.container.getNamed<OfflineStreamFactoryType>(
KEYS.ProgramStreamFactory,
@@ -224,6 +223,14 @@ const StreamModule = new ContainerModule((bind) => {
bind(ExternalStreamDetailsFetcherFactory).toSelf().inSingletonScope();
bind(PersistentChannelCache).toSelf().inSingletonScope();
});
bind(KEYS.FillerPicker).to(FillerPicker).inSingletonScope();
};
class StreamModule extends ContainerModule {
constructor() {
super(configure);
}
}
export { StreamModule };

View File

@@ -0,0 +1,289 @@
import { faker } from '@faker-js/faker';
import dayjs from 'dayjs';
import { now, sumBy } from 'lodash-es';
import { instance, mock, verify, when } from 'ts-mockito';
import { test as baseTest } from 'vitest';
import { LineupItem } from '../db/derived_types/Lineup.ts';
import { StreamLineupItem } from '../db/derived_types/StreamLineup.ts';
import { IChannelDB } from '../db/interfaces/IChannelDB.ts';
import { IFillerListDB } from '../db/interfaces/IFillerListDB.ts';
import { IProgramDB } from '../db/interfaces/IProgramDB.ts';
import { calculateStartTimeOffsets } from '../db/lineupUtil.ts';
import { IStreamLineupCache } from '../interfaces/IStreamLineupCache.ts';
import { IFillerPicker } from '../services/interfaces/IFillerPicker.ts';
import {
createChannel,
createFakeProgram,
} from '../testing/fakes/entityCreators.ts';
import { LoggerFactory } from '../util/logging/LoggerFactory.ts';
import { StreamProgramCalculator } from './StreamProgramCalculator.ts';
describe('StreamProgramCalculator', () => {
baseTest('getCurrentLineupItem simple', async () => {
const fillerDB = mock<IFillerListDB>();
const channelDB = mock<IChannelDB>();
const programDB = mock<IProgramDB>();
const fillerPicker = mock<IFillerPicker>();
const channelCache = mock<IStreamLineupCache>();
const startTime = dayjs(new Date(2025, 8, 17, 8));
const channelId = faker.string.uuid();
const programId1 = faker.string.uuid();
const programId2 = faker.string.uuid();
const lineup: LineupItem[] = [
{
type: 'content',
durationMs: +dayjs.duration({ minutes: 22 }),
id: programId1,
},
{
type: 'content',
durationMs: +dayjs.duration({ minutes: 22 }),
id: programId2,
},
];
when(programDB.getProgramById(programId1)).thenReturn(
Promise.resolve(
createFakeProgram({ uuid: programId1, duration: lineup[0].durationMs }),
),
);
when(programDB.getProgramById(programId2)).thenReturn(
Promise.resolve(
createFakeProgram({ uuid: programId2, duration: lineup[1].durationMs }),
),
);
const channel = createChannel({
uuid: channelId,
number: 1,
startTime: +startTime.subtract(1, 'hour'),
duration: sumBy(lineup, ({ durationMs }) => durationMs),
});
when(channelDB.getChannel(1)).thenReturn(Promise.resolve(channel));
when(channelDB.loadLineup(channelId)).thenReturn(
Promise.resolve({
version: 1,
items: lineup,
startTimeOffsets: calculateStartTimeOffsets(lineup),
lastUpdated: now(),
}),
);
const calc = new StreamProgramCalculator(
LoggerFactory.root,
instance(fillerDB),
instance(channelDB),
instance(channelCache),
instance(programDB),
instance(fillerPicker),
);
const out = (
await calc.getCurrentLineupItem({
allowSkip: false,
channelId: 1,
startTime: +startTime,
})
).get();
expect(out.lineupItem).toMatchObject<Partial<StreamLineupItem>>({
streamDuration: +dayjs.duration(6, 'minutes'),
programId: programId1,
infiniteLoop: false,
programBeginMs: +startTime - +dayjs.duration(16, 'minutes'),
startOffset: +dayjs.duration(16, 'minutes'),
});
verify(
channelCache.recordPlayback(channel.uuid, +startTime, out.lineupItem),
).once();
});
baseTest('getCurrentLineupItem filler lineup item', async () => {
const fillerDB = mock<IFillerListDB>();
const channelDB = mock<IChannelDB>();
const programDB = mock<IProgramDB>();
const fillerPicker = mock<IFillerPicker>();
const channelCache = mock<IStreamLineupCache>();
const startTime = dayjs(new Date(2025, 8, 17, 8));
const channelId = faker.string.uuid();
const programId1 = faker.string.uuid();
const programId2 = faker.string.uuid();
const fillerListId = faker.string.uuid();
const lineup: LineupItem[] = [
{
type: 'content',
durationMs: +dayjs.duration({ minutes: 22 }),
id: programId1,
fillerListId: fillerListId,
},
{
type: 'content',
durationMs: +dayjs.duration({ minutes: 22 }),
id: programId2,
},
];
when(programDB.getProgramById(programId1)).thenReturn(
Promise.resolve(
createFakeProgram({ uuid: programId1, duration: lineup[0].durationMs }),
),
);
when(programDB.getProgramById(programId2)).thenReturn(
Promise.resolve(
createFakeProgram({ uuid: programId2, duration: lineup[1].durationMs }),
),
);
const channel = createChannel({
uuid: channelId,
number: 1,
startTime: +startTime.subtract(1, 'hour'),
duration: sumBy(lineup, ({ durationMs }) => durationMs),
});
when(channelDB.getChannel(1)).thenReturn(Promise.resolve(channel));
when(channelDB.loadLineup(channelId)).thenReturn(
Promise.resolve({
version: 1,
items: lineup,
startTimeOffsets: calculateStartTimeOffsets(lineup),
lastUpdated: now(),
}),
);
const calc = new StreamProgramCalculator(
LoggerFactory.root,
instance(fillerDB),
instance(channelDB),
instance(channelCache),
instance(programDB),
instance(fillerPicker),
);
const out = (
await calc.getCurrentLineupItem({
allowSkip: false,
channelId: 1,
startTime: +startTime,
})
).get();
expect(out.lineupItem).toMatchObject<Partial<StreamLineupItem>>({
streamDuration: +dayjs.duration(6, 'minutes'),
programId: programId1,
infiniteLoop: false,
programBeginMs: +startTime - +dayjs.duration(16, 'minutes'),
startOffset: +dayjs.duration(16, 'minutes'),
fillerId: fillerListId,
type: 'commercial',
});
verify(
channelCache.recordPlayback(channel.uuid, +startTime, out.lineupItem),
).once();
});
baseTest('getCurrentLineupItem loop filler lineup item', async () => {
const fillerDB = mock<IFillerListDB>();
const channelDB = mock<IChannelDB>();
const programDB = mock<IProgramDB>();
const fillerPicker = mock<IFillerPicker>();
const channelCache = mock<IStreamLineupCache>();
const startTime = dayjs(new Date(2025, 8, 17, 8));
const channelId = faker.string.uuid();
const programId1 = faker.string.uuid();
const programId2 = faker.string.uuid();
const fillerListId = faker.string.uuid();
const lineup: LineupItem[] = [
{
type: 'content',
durationMs: +dayjs.duration({ minutes: 22 }),
id: programId1,
fillerListId: fillerListId,
},
{
type: 'content',
durationMs: +dayjs.duration({ minutes: 22 }),
id: programId2,
},
];
when(programDB.getProgramById(programId1)).thenReturn(
Promise.resolve(
createFakeProgram({
uuid: programId1,
duration: +dayjs.duration({ minutes: 2 }),
}),
),
);
when(programDB.getProgramById(programId2)).thenReturn(
Promise.resolve(
createFakeProgram({ uuid: programId2, duration: lineup[1].durationMs }),
),
);
const channel = createChannel({
uuid: channelId,
number: 1,
startTime: +startTime.subtract(1, 'hour'),
duration: sumBy(lineup, ({ durationMs }) => durationMs),
});
when(channelDB.getChannel(1)).thenReturn(Promise.resolve(channel));
when(channelDB.loadLineup(channelId)).thenReturn(
Promise.resolve({
version: 1,
items: lineup,
startTimeOffsets: calculateStartTimeOffsets(lineup),
lastUpdated: now(),
}),
);
const calc = new StreamProgramCalculator(
LoggerFactory.root,
instance(fillerDB),
instance(channelDB),
instance(channelCache),
instance(programDB),
instance(fillerPicker),
);
const out = (
await calc.getCurrentLineupItem({
allowSkip: false,
channelId: 1,
startTime: +startTime,
})
).get();
expect(out.lineupItem).toMatchObject<Partial<StreamLineupItem>>({
streamDuration: +dayjs.duration(6, 'minutes'),
programId: programId1,
infiniteLoop: true,
programBeginMs: +startTime - +dayjs.duration(16, 'minutes'),
startOffset: +dayjs.duration(16, 'minutes'),
fillerId: fillerListId,
type: 'commercial',
duration: +dayjs.duration(22, 'minutes'),
contentDuration: +dayjs.duration(2, 'minutes'),
});
verify(
channelCache.recordPlayback(channel.uuid, +startTime, out.lineupItem),
).once();
});
});

View File

@@ -1,11 +1,7 @@
import { ChannelDB } from '@/db/ChannelDB.js';
import { FillerDB } from '@/db/FillerListDB.js';
import { ProgramDB } from '@/db/ProgramDB.js';
import { ProgramExternalIdType } from '@/db/custom_types/ProgramExternalIdType.js';
import { Channel } from '@/db/schema/Channel.js';
import { MediaSourceType } from '@/db/schema/MediaSource.js';
import type { ProgramWithRelations as RawProgramEntity } from '@/db/schema/derivedTypes.js';
import { FillerPicker } from '@/services/FillerPicker.js';
import { KEYS } from '@/types/inject.js';
import { Result } from '@/types/result.js';
import { Maybe, Nullable } from '@/types/util.js';
@@ -23,8 +19,12 @@ import {
StreamLineupItem,
createOfflineStreamLineupItem,
} from '../db/derived_types/StreamLineup.ts';
import { IChannelDB } from '../db/interfaces/IChannelDB.ts';
import { IFillerListDB } from '../db/interfaces/IFillerListDB.ts';
import { IProgramDB } from '../db/interfaces/IProgramDB.ts';
import { IStreamLineupCache } from '../interfaces/IStreamLineupCache.ts';
import { IFillerPicker } from '../services/interfaces/IFillerPicker.ts';
import { isNonEmptyString, nullToUndefined } from '../util/index.js';
import { ChannelCache } from './ChannelCache.js';
import { wereThereTooManyAttempts } from './StreamThrottler.js';
const SLACK = constants.SLACK;
@@ -71,11 +71,11 @@ export type CurrentLineupItemResult = {
export class StreamProgramCalculator {
constructor(
@inject(KEYS.Logger) private logger: Logger,
@inject(FillerDB) private fillerDB: FillerDB,
@inject(KEYS.ChannelDB) private channelDB: ChannelDB,
@inject(ChannelCache) private channelCache: ChannelCache,
@inject(KEYS.ProgramDB) private programDB: ProgramDB,
@inject(FillerPicker) private fillerPicker: FillerPicker,
@inject(KEYS.FillerListDB) private fillerDB: IFillerListDB,
@inject(KEYS.ChannelDB) private channelDB: IChannelDB,
@inject(KEYS.ChannelCache) private channelCache: IStreamLineupCache,
@inject(KEYS.ProgramDB) private programDB: IProgramDB,
@inject(KEYS.FillerPicker) private fillerPicker: IFillerPicker,
) {}
async getCurrentLineupItem(
@@ -183,6 +183,7 @@ export class StreamProgramCalculator {
const newEndTime = req.startTime + timeLeft;
if (newEndTime < endTimeMs) {
streamDuration = newEndTime - req.startTime;
currentProgram.program.streamDuration = streamDuration;
}
}
}
@@ -295,6 +296,7 @@ export class StreamProgramCalculator {
duration: 60_000,
startOffset: 0,
programBeginMs: req.startTime,
streamDuration,
};
}
@@ -310,6 +312,7 @@ export class StreamProgramCalculator {
timestamp: number,
channel: MinimalChannelDetails,
channelLineup: Lineup,
streamDuration?: number,
): Promise<ProgramAndTimeElapsed> {
if (channel.startTime > timestamp) {
this.logger.debug(
@@ -366,6 +369,15 @@ export class StreamProgramCalculator {
throw new Error('No program found; find algorithm messed up');
}
const currOffset = channelLineup.startTimeOffsets[currentProgramIndex];
const nextOffset =
currOffset +
channelLineup.items[
(currentProgramIndex + 1) % channelLineup.items.length
].durationMs;
streamDuration ??= nextOffset - currOffset - elapsed;
const lineupItem = channelLineup.items[currentProgramIndex];
let program: EnrichedLineupItem;
switch (lineupItem.type) {
@@ -376,6 +388,7 @@ export class StreamProgramCalculator {
duration: lineupItem.durationMs,
type: 'offline',
programBeginMs: timestamp - timeElapsed,
streamDuration,
};
if (backingItem) {
@@ -400,20 +413,37 @@ export class StreamProgramCalculator {
if (!mediaSourceType) {
throw new Error('Impossible');
}
program = {
type: 'program',
const baseItem = {
externalSource: mediaSourceType,
plexFilePath: nullToUndefined(externalInfo.externalFilePath),
externalKey: externalInfo.externalKey,
filePath: nullToUndefined(externalInfo.directFilePath),
externalSourceId: externalInfo.externalSourceId,
duration: backingItem.duration,
contentDuration: backingItem.duration,
duration: lineupItem.durationMs,
programId: backingItem.uuid,
title: backingItem.title,
// id: backingItem.uuid,
programType: backingItem.type,
programBeginMs: timestamp - timeElapsed,
streamDuration,
};
if (isNonEmptyString(lineupItem.fillerListId)) {
program = {
...baseItem,
type: 'commercial',
fillerId: lineupItem.fillerListId,
infiniteLoop: backingItem.duration < streamDuration,
};
} else {
program = {
...baseItem,
type: 'program',
infiniteLoop: false,
};
}
} else {
this.logger.warn(
'Found a backing item, but could not find external ID info!! %O',
@@ -437,6 +467,7 @@ export class StreamProgramCalculator {
channel: lineupItem.channel,
type: 'redirect',
programBeginMs: timestamp - timeElapsed,
streamDuration,
};
break;
}
@@ -455,8 +486,6 @@ export class StreamProgramCalculator {
// 2b. If no fillter content is found, then pad with more offline time
// 3. Return the currently playing "real" program
async createLineupItem(
// activeProgram: StrictExclude<EnrichedLineupItem, RedirectStreamLineupItem>,
// timeElapsed: number,
{ program, timeElapsed }: ProgramAndTimeElapsed,
streamDuration: number,
channel: Channel,
@@ -537,6 +566,10 @@ export class StreamProgramCalculator {
if (!isEmpty(externalInfos)) {
const externalInfo = first(externalInfos)!;
streamDuration = Math.max(
1,
Math.min(filler.duration - fillerstart, streamDuration),
);
return {
// just add the video, starting at 0, playing the entire duration
type: 'commercial',
@@ -548,18 +581,16 @@ export class StreamProgramCalculator {
? MediaSourceType.Jellyfin
: MediaSourceType.Plex,
startOffset: fillerstart,
streamDuration: Math.max(
1,
Math.min(filler.duration - fillerstart, streamDuration),
),
duration: filler.duration,
streamDuration,
contentDuration: filler.duration,
duration: program.duration,
programId: filler.uuid,
// beginningOffset,
externalSourceId: externalInfo.externalSourceId!,
plexFilePath: nullToUndefined(externalInfo.externalFilePath),
programType: filler.type,
programBeginMs: program.programBeginMs,
fillerId: filler.uuid,
infiniteLoop: filler.duration < streamDuration,
} satisfies CommercialStreamLineupItem;
}
}
@@ -580,12 +611,19 @@ export class StreamProgramCalculator {
};
}
if (program.type === 'commercial') {
return {
...program,
startOffset: timeElapsed,
streamDuration,
};
}
return {
...program,
type: 'program',
startOffset: timeElapsed,
streamDuration,
// beginningOffset: timeElapsed,
};
}
}

View File

@@ -137,7 +137,6 @@ export class VideoStream {
result.channelContext,
result.sourceChannel,
audioOnly,
result.lineupItem.type === 'loading',
true,
channel.transcodeConfig,
streamMode,

View File

@@ -173,7 +173,6 @@ export class HlsSession extends BaseHlsSession {
result.channelContext,
result.sourceChannel,
false,
result.lineupItem.type === 'loading',
realtime,
this.channel.transcodeConfig,
this.sessionType,

View File

@@ -57,7 +57,6 @@ export class HlsSlowerSession extends BaseHlsSession {
);
return lineupItemResult.mapAsync(async (result) => {
const { lineupItem } = result;
const transcodeBuffer = dayjs
.duration(dayjs(this.transcodedUntil).diff(now))
.asSeconds();
@@ -73,7 +72,6 @@ export class HlsSlowerSession extends BaseHlsSession {
result.channelContext,
result.sourceChannel,
request.audioOnly,
lineupItem.type === 'loading',
this.#realtimeTranscode,
this.channel.transcodeConfig,
this.sessionType,

View File

@@ -0,0 +1,258 @@
import type {
ChannelProgramming,
CondensedChannelProgramming,
SaveableChannel,
} from '@tunarr/types';
import type { UpdateChannelProgrammingRequest } from '@tunarr/types/api';
import type { ContentProgramType } from '@tunarr/types/schemas';
import type { MarkRequired } from 'ts-essentials';
import type { ChannelQueryBuilder } from '../../db/ChannelQueryBuilder.ts';
import type {
Lineup,
LineupItem,
PendingProgram,
} from '../../db/derived_types/Lineup.ts';
import type {
ChannelAndRawLineup,
ChannnelAndLineup,
IChannelDB,
PageParams,
UpdateChannelLineupRequest,
} from '../../db/interfaces/IChannelDB.ts';
import type { Channel } from '../../db/schema/Channel.ts';
import type {
ChannelWithPrograms,
ChannelWithRelations,
MusicArtistWithExternalIds,
ProgramWithRelations,
TvShowWithExternalIds,
} from '../../db/schema/derivedTypes.js';
import type { ProgramDao } from '../../db/schema/Program.ts';
import type { ProgramExternalId } from '../../db/schema/ProgramExternalId.ts';
import type { ChannelSubtitlePreferences } from '../../db/schema/SubtitlePreferences.ts';
import type { ChannelAndLineup } from '../../types/internal.ts';
import type { Maybe, Nullable, PagedResult } from '../../types/util.ts';
export class FakeChannelDB implements IChannelDB {
channelExists(channelId: string): Promise<boolean> {
throw new Error('Method not implemented.');
}
getChannel(id: string | number): Promise<Maybe<ChannelWithRelations>>;
getChannel(
id: string | number,
includeFiller: true,
): Promise<Maybe<MarkRequired<ChannelWithRelations, 'fillerShows'>>>;
getChannel(
id: string | number,
includeFiller: boolean = false,
): Promise<Maybe<ChannelWithRelations>> {
throw new Error('Method not implemented.');
}
getChannelBuilder(
id: string | number,
): ChannelQueryBuilder<ChannelWithRelations> {
throw new Error('Method not implemented.');
}
getAllChannels(pageParams?: PageParams): Promise<Channel[]> {
throw new Error('Method not implemented.');
}
getChannelAndPrograms(
uuid: string,
typeFilter?: ContentProgramType,
): Promise<ChannelWithPrograms | undefined> {
throw new Error('Method not implemented.');
}
getChannelTvShows(
id: string,
pageParams?: PageParams,
): Promise<PagedResult<TvShowWithExternalIds>> {
throw new Error('Method not implemented.');
}
getChannelMusicArtists(
id: string,
pageParams?: PageParams,
): Promise<PagedResult<MusicArtistWithExternalIds>> {
throw new Error('Method not implemented.');
}
getChannelPrograms(
id: string,
pageParams?: PageParams,
typeFilter?: ContentProgramType,
): Promise<ProgramWithRelations[]> {
throw new Error('Method not implemented.');
}
getChannelProgramExternalIds(uuid: string): Promise<ProgramExternalId[]> {
throw new Error('Method not implemented.');
}
getChannelFallbackPrograms(uuid: string): Promise<ProgramDao[]> {
throw new Error('Method not implemented.');
}
saveChannel(createReq: SaveableChannel): Promise<ChannnelAndLineup> {
throw new Error('Method not implemented.');
}
deleteChannel(
channelId: string,
blockOnLineupUpdates?: boolean,
): Promise<void> {
throw new Error('Method not implemented.');
}
updateChannel(
id: string,
updateReq: SaveableChannel,
): Promise<ChannelAndLineup> {
throw new Error('Method not implemented.');
}
copyChannel(id: string): Promise<ChannelAndLineup> {
throw new Error('Method not implemented.');
}
loadLineup(channelId: string, forceRead?: boolean): Promise<Lineup> {
throw new Error('Method not implemented.');
}
loadCondensedLineup(
channelId: string,
offset?: number,
limit?: number,
): Promise<CondensedChannelProgramming | null> {
throw new Error('Method not implemented.');
}
updateLineup(
id: string,
req: UpdateChannelProgrammingRequest,
): Promise<Nullable<{ channel: Channel; newLineup: LineupItem[] }>> {
throw new Error('Method not implemented.');
}
saveLineup(
channelId: string,
newLineup: UpdateChannelLineupRequest,
): Promise<Lineup> {
throw new Error('Method not implemented.');
}
updateLineupConfig<
Key extends keyof Omit<
Lineup,
'items' | 'startTimeOffsets' | 'pendingPrograms'
>,
>(id: string, key: Key, conf: Lineup[Key]): Promise<void> {
throw new Error('Method not implemented.');
}
removeProgramsFromLineup(
channelId: string,
programIds: string[],
): Promise<void> {
throw new Error('Method not implemented.');
}
removeProgramsFromAllLineups(programIds: string[]): Promise<void> {
throw new Error('Method not implemented.');
}
loadAllLineupConfigs(
forceRead?: boolean,
): Promise<Record<string, ChannnelAndLineup>> {
throw new Error('Method not implemented.');
}
loadAllRawLineups(): Promise<Record<string, ChannelAndRawLineup>> {
throw new Error('Method not implemented.');
}
loadChannelAndLineup(channelId: string): Promise<ChannnelAndLineup | null> {
throw new Error('Method not implemented.');
}
addPendingPrograms(
channelId: string,
pendingPrograms: PendingProgram[],
): Promise<void> {
throw new Error('Method not implemented.');
}
setChannelPrograms(
channel: Channel,
lineup: readonly LineupItem[],
): Promise<Channel | null>;
setChannelPrograms(
channel: string | Channel,
lineup: readonly LineupItem[],
startTime?: number,
): Promise<Channel | null>;
setChannelPrograms(
channel: unknown,
lineup: unknown,
startTime?: unknown,
): Promise<{
number: number;
duration: number;
uuid: string;
offline: {
mode: 'pic' | 'clip';
picture?: string | undefined;
soundtrack?: string | undefined;
};
createdAt: number | null;
updatedAt: number | null;
name: string;
disableFillerOverlay: number;
fillerRepeatCooldown: number | null;
groupTitle: string | null;
guideFlexTitle: string | null;
guideMinimumDuration: number;
icon: {
path: string;
width: number;
duration: number;
position: 'top-left' | 'top-right' | 'bottom-left' | 'bottom-right';
};
startTime: number;
stealth: number;
streamMode: 'hls' | 'hls_slower' | 'mpegts' | 'hls_direct';
transcoding: {
targetResolution?: { widthPx: number; heightPx: number } | undefined;
videoBitrate?: number | undefined;
videoBufferSize?: number | undefined;
} | null;
transcodeConfigId: string;
watermark: {
enabled: boolean;
position: 'top-left' | 'top-right' | 'bottom-left' | 'bottom-right';
width: number;
verticalMargin: number;
horizontalMargin: number;
duration: number;
opacity: number;
url?: string | undefined;
fixedSize?: boolean | undefined;
animated?: boolean | undefined;
fadeConfig?:
| {
periodMins: number;
programType?:
| 'movie'
| 'episode'
| 'track'
| 'music_video'
| 'other_video'
| undefined;
leadingEdge?: boolean | undefined;
}[]
| undefined;
} | null;
subtitlesEnabled: number;
} | null> {
throw new Error('Method not implemented.');
}
updateChannelStartTime(id: string, newTime: number): Promise<void> {
throw new Error('Method not implemented.');
}
getChannelSubtitlePreferences(
id: string,
): Promise<ChannelSubtitlePreferences[]> {
throw new Error('Method not implemented.');
}
loadAndMaterializeLineup(
channelId: string,
offset?: number,
limit?: number,
): Promise<ChannelProgramming | null> {
throw new Error('Method not implemented.');
}
findChannelsForProgramId(programId: string): Promise<Channel[]> {
throw new Error('Method not implemented.');
}
}

View File

@@ -0,0 +1,45 @@
import type {
CreateFillerListRequest,
UpdateFillerListRequest,
} from '@tunarr/types/api';
import type {
FillerShowWithContent,
IFillerListDB,
} from '../../db/interfaces/IFillerListDB.ts';
import type { ChannelFillerShowWithContent } from '../../db/schema/derivedTypes.js';
import type { ProgramDao } from '../../db/schema/Program.ts';
import type { Maybe, Nilable } from '../../types/util.ts';
export class FakeFillerDB implements IFillerListDB {
getFiller(id: string): Promise<Maybe<FillerShowWithContent>> {
throw new Error('Method not implemented.');
}
saveFiller(
id: string,
updateRequest: UpdateFillerListRequest,
): Promise<Nilable<FillerShowWithContent>> {
throw new Error('Method not implemented.');
}
createFiller(createRequest: CreateFillerListRequest): Promise<string> {
throw new Error('Method not implemented.');
}
getFillerChannels(
id: string,
): Promise<Array<{ number: number; name: string }>> {
throw new Error('Method not implemented.');
}
deleteFiller(id: string): Promise<void> {
throw new Error('Method not implemented.');
}
getAllFillerIds(): Promise<string[]> {
throw new Error('Method not implemented.');
}
getFillerPrograms(id: string): Promise<ProgramDao[]> {
throw new Error('Method not implemented.');
}
getFillersFromChannel(
channelId: string,
): Promise<ChannelFillerShowWithContent[]> {
throw new Error('Method not implemented.');
}
}

View File

@@ -0,0 +1,44 @@
import { faker } from '@faker-js/faker';
import { MultiExternalIdType } from '@tunarr/types/schemas';
import type { Channel } from '../../db/schema/Channel.ts';
import type { ProgramDao } from '../../db/schema/Program.ts';
import { ProgramTypes } from '../../db/schema/Program.ts';
import type { MinimalProgramExternalId } from '../../db/schema/ProgramExternalId.ts';
import type { ProgramWithExternalIds } from '../../db/schema/derivedTypes.js';
export function createChannel(overrides?: Partial<Channel>): Channel {
return {
uuid: faker.string.uuid(),
duration: faker.number.int({ min: 0 }),
startTime: faker.date.past().getTime(),
createdAt: null,
disableFillerOverlay: faker.datatype.boolean() ? 1 : 0,
name: faker.music.artist(),
...(overrides ?? {}),
} satisfies Channel;
}
export function createFakeMultiExternalId(): MinimalProgramExternalId {
const typ = faker.helpers.arrayElement(MultiExternalIdType);
return {
sourceType: typ,
externalKey: faker.string.alphanumeric(),
externalSourceId: faker.string.uuid(),
} satisfies MinimalProgramExternalId;
}
export function createFakeProgram(
overrides?: Partial<ProgramDao>,
): ProgramWithExternalIds {
return {
uuid: faker.string.uuid(), // programId2,
duration: faker.number.int({ min: 1 }), //lineup[1].durationMs,
year: faker.date.past().getFullYear(),
title: faker.music.songName(),
externalIds: [createFakeMultiExternalId()],
type: faker.helpers.arrayElement(ProgramTypes),
summary: faker.lorem.sentences(),
...(overrides ?? {}),
} satisfies ProgramWithExternalIds;
}

View File

@@ -17,6 +17,7 @@ const KEYS = {
DatabaseFactory: Symbol.for('DatabaseFactory'),
ChannelDB: Symbol.for('ChannelDB'),
ProgramDB: Symbol.for('ProgramDB'),
FillerListDB: Symbol.for('FillerListDB'),
SettingsDB: Symbol.for('SettingsDB'),
MediaSourceApiFactory: Symbol.for('MediaSourceApiFactory'),
TimeSlotSchedulerServiceFactory: Symbol.for(
@@ -48,6 +49,9 @@ const KEYS = {
WorkerPoolFactory: Symbol.for('WorkerPoolFactory'),
ContentSourceUpdateFactory: Symbol.for('ContentSourceUpdateFactory'),
FillerPicker: Symbol.for('FillerPicker'),
ChannelCache: Symbol.for('ChannelCache'),
};
export type LoggerFactory = (args: GetChildLoggerArgs) => Logger;

View File

@@ -1,7 +1,7 @@
import type { SettingsDB } from '@/db/SettingsDB.js';
import type { Maybe, TupleToUnion } from '@/types/util.js';
import { getDefaultLogLevel } from '@/util/defaults.js';
import { isNonEmptyString, isProduction } from '@/util/index.js';
import { isNonEmptyString, isProduction, isTest } from '@/util/index.js';
import {
forEach,
isEmpty,
@@ -133,7 +133,7 @@ class LoggerFactoryImpl {
);
}
child(opts: GetChildLoggerArgs) {
child(opts: GetChildLoggerArgs): Logger {
const { caller, className, ...rest } = opts;
if (this.children[className]) {
@@ -254,7 +254,7 @@ class LoggerFactoryImpl {
// We can only add these streams post-initialization because they
// require configuration.
if (!isUndefined(this.settingsDB)) {
if (!isUndefined(this.settingsDB) && !isTest) {
streams.push({
// stream: pino.destination({
// dest: join(