feat: keep play history in the DB (#1600)

This commit is contained in:
Christian Benincasa
2026-01-21 11:50:10 -05:00
committed by GitHub
parent bcf8c511a2
commit fe2a460546
18 changed files with 4596 additions and 201 deletions

View File

@@ -8,6 +8,7 @@ import { ContainerModule } from 'inversify';
import type { Kysely } from 'kysely';
import { DBAccess } from './DBAccess.ts';
import { FillerDB } from './FillerListDB.ts';
import { ProgramPlayHistoryDB } from './ProgramPlayHistoryDB.ts';
import { ProgramDaoMinter } from './converters/ProgramMinter.ts';
import type { DB } from './schema/db.ts';
import type { DrizzleDBAccess } from './schema/index.ts';
@@ -29,6 +30,7 @@ const DBModule = new ContainerModule((bind) => {
KEYS.DrizzleDatabaseFactory,
).toAutoFactory(KEYS.DrizzleDB);
bind(KEYS.FillerListDB).to(FillerDB).inSingletonScope();
bind(ProgramPlayHistoryDB).toSelf().inSingletonScope();
bind(ProgramDaoMinter).toSelf();
bind<interfaces.AutoFactory<ProgramDaoMinter>>(

View File

@@ -0,0 +1,165 @@
import { KEYS } from '@/types/inject.js';
import { and, eq, gt } from 'drizzle-orm';
import { inject, injectable } from 'inversify';
import { v4 } from 'uuid';
import {
NewProgramPlayHistoryDrizzle,
ProgramPlayHistory,
} from './schema/ProgramPlayHistory.ts';
import { DrizzleDBAccess } from './schema/index.ts';
@injectable()
export class ProgramPlayHistoryDB {
constructor(@inject(KEYS.DrizzleDB) private drizzle: DrizzleDBAccess) {}
getById(id: string) {
return this.drizzle.query.programPlayHistory.findFirst({
where: (fields, { eq }) => eq(fields.uuid, id),
});
}
getByProgramId(programId: string) {
return this.drizzle.query.programPlayHistory.findMany({
where: (fields, { eq }) => eq(fields.programUuid, programId),
orderBy: (fields, { desc }) => desc(fields.playedAt),
});
}
getByChannelId(channelId: string, limit?: number) {
return this.drizzle.query.programPlayHistory.findMany({
where: (fields, { eq }) => eq(fields.channelUuid, channelId),
orderBy: (fields, { desc }) => desc(fields.playedAt),
limit,
});
}
getByChannelAndProgram(channelId: string, programId: string) {
return this.drizzle.query.programPlayHistory.findMany({
where: (fields, { eq, and }) =>
and(
eq(fields.channelUuid, channelId),
eq(fields.programUuid, programId),
),
orderBy: (fields, { desc }) => desc(fields.playedAt),
});
}
getAll(limit?: number) {
return this.drizzle.query.programPlayHistory.findMany({
orderBy: (fields, { desc }) => desc(fields.playedAt),
limit,
});
}
create(
data: Omit<NewProgramPlayHistoryDrizzle, 'uuid' | 'createdAt'> & {
uuid?: string;
createdAt?: Date;
},
) {
const now = new Date();
const record: NewProgramPlayHistoryDrizzle = {
uuid: data.uuid ?? v4(),
programUuid: data.programUuid,
channelUuid: data.channelUuid,
playedAt: data.playedAt,
playedDuration: data.playedDuration,
createdAt: data.createdAt ?? now,
fillerListId: data.fillerListId,
};
return this.drizzle
.insert(ProgramPlayHistory)
.values(record)
.returning()
.then((rows) => rows[0]);
}
update(
id: string,
data: Partial<
Pick<
NewProgramPlayHistoryDrizzle,
'playedAt' | 'playedDuration' | 'fillerListId'
>
>,
) {
return this.drizzle
.update(ProgramPlayHistory)
.set(data)
.where(eq(ProgramPlayHistory.uuid, id))
.returning()
.then((rows) => rows[0]);
}
delete(id: string) {
return this.drizzle
.delete(ProgramPlayHistory)
.where(eq(ProgramPlayHistory.uuid, id))
.returning()
.then((rows) => rows.length > 0);
}
deleteByChannelId(channelId: string) {
return this.drizzle
.delete(ProgramPlayHistory)
.where(eq(ProgramPlayHistory.channelUuid, channelId))
.returning()
.then((rows) => rows.length);
}
deleteByProgramId(programId: string) {
return this.drizzle
.delete(ProgramPlayHistory)
.where(eq(ProgramPlayHistory.programUuid, programId))
.returning()
.then((rows) => rows.length);
}
deleteByChannelIdAfter(channelId: string, after: Date) {
return this.drizzle
.delete(ProgramPlayHistory)
.where(
and(
eq(ProgramPlayHistory.channelUuid, channelId),
gt(ProgramPlayHistory.playedAt, after),
),
)
.returning()
.then((rows) => rows.length);
}
/**
* Gets the most recent play history entry for a program on a channel.
* Returns the entry if found, or undefined if no play history exists.
*/
async getMostRecentPlay(channelId: string, programId: string) {
return this.drizzle.query.programPlayHistory.findFirst({
where: (fields, { eq, and }) =>
and(
eq(fields.channelUuid, channelId),
eq(fields.programUuid, programId),
),
orderBy: (fields, { desc }) => desc(fields.playedAt),
});
}
/**
* Checks if a program is currently playing on a channel.
* A program is considered "currently playing" if there's a play history entry
* where playedAt + playedDuration > timestamp.
*/
async isProgramCurrentlyPlaying(
channelId: string,
programId: string,
timestamp: number,
): Promise<boolean> {
const mostRecent = await this.getMostRecentPlay(channelId, programId);
if (!mostRecent || !mostRecent.playedDuration) {
return false;
}
const playEndTime =
mostRecent.playedAt.getTime() + mostRecent.playedDuration;
return timestamp < playEndTime;
}
}

View File

@@ -45,7 +45,11 @@ export function isProgramLineupItem(
export function isContentBackedLineupItem(
item: StreamLineupItem,
): item is ContentBackedStreamLineupItem {
return isCommercialLineupItem(item) || isProgramLineupItem(item);
return (
isCommercialLineupItem(item) ||
isProgramLineupItem(item) ||
item.type === 'fallback'
);
}
export function isPlexBackedLineupItem(
@@ -77,7 +81,8 @@ export function isEmnyBackedLineupItem(
export type ContentBackedStreamLineupItem =
| CommercialStreamLineupItem
| ProgramStreamLineupItem;
| ProgramStreamLineupItem
| FallbackStreamLineupItem;
export type MinimalContentStreamLineupItem = {
programId: string;
@@ -121,7 +126,11 @@ type BaseContentBackedStreamLineupItem = BaseStreamLineupItem & {
export type CommercialStreamLineupItem = BaseContentBackedStreamLineupItem & {
type: 'commercial';
fillerId: string;
fillerListId: string;
};
export type FallbackStreamLineupItem = BaseContentBackedStreamLineupItem & {
type: 'fallback';
};
export type ProgramStreamLineupItem = BaseContentBackedStreamLineupItem & {
@@ -144,7 +153,8 @@ export type StreamLineupItem =
| CommercialStreamLineupItem
| OfflineStreamLineupItem
| RedirectStreamLineupItem
| ErrorStreamLineupItem;
| ErrorStreamLineupItem
| FallbackStreamLineupItem;
export function createOfflineStreamLineupItem(
duration: number,

View File

@@ -18,6 +18,7 @@ import { ChannelCustomShow } from './ChannelCustomShow.ts';
import { ChannelFillerShow } from './ChannelFillerShow.ts';
import { ChannelPrograms } from './ChannelPrograms.ts';
import type { KyselifyBetter } from './KyselifyBetter.ts';
import { ProgramPlayHistory } from './ProgramPlayHistory.ts';
export const Channel = sqliteTable('channel', {
uuid: text().primaryKey(),
@@ -64,4 +65,5 @@ export const ChannelRelations = relations(Channel, ({ many }) => ({
channelPrograms: many(ChannelPrograms),
channelCustomShows: many(ChannelCustomShow),
channelFillerShow: many(ChannelFillerShow),
playHistory: many(ProgramPlayHistory),
}));

View File

@@ -23,6 +23,7 @@ import { MediaSource } from './MediaSource.ts';
import { MediaSourceLibrary } from './MediaSourceLibrary.ts';
import { ProgramExternalId } from './ProgramExternalId.ts';
import { ProgramGrouping } from './ProgramGrouping.ts';
import { ProgramPlayHistory } from './ProgramPlayHistory.ts';
import { ProgramSubtitles } from './ProgramSubtitles.ts';
import { ProgramVersion } from './ProgramVersion.ts';
import { StudioEntity } from './Studio.ts';
@@ -166,6 +167,7 @@ export const ProgramRelations = relations(Program, ({ many, one }) => ({
genres: many(EntityGenre),
studios: many(StudioEntity),
tags: many(TagRelations),
playHistory: many(ProgramPlayHistory),
}));
export type ProgramTable = KyselifyBetter<typeof Program>;

View File

@@ -0,0 +1,62 @@
import type { InferInsertModel, InferSelectModel } from 'drizzle-orm';
import { relations } from 'drizzle-orm';
import { index, integer, sqliteTable, text } from 'drizzle-orm/sqlite-core';
import type { Insertable, Selectable } from 'kysely';
import { Channel } from './Channel.ts';
import { FillerShow } from './FillerShow.ts';
import type { KyselifyBetter } from './KyselifyBetter.ts';
import { Program } from './Program.ts';
export const ProgramPlayHistory = sqliteTable(
'program_play_history',
{
uuid: text().primaryKey(),
programUuid: text()
.notNull()
.references(() => Program.uuid, { onDelete: 'cascade' }),
channelUuid: text()
.notNull()
.references(() => Channel.uuid, { onDelete: 'cascade' }),
// Timestamp when this program started playing (milliseconds since epoch)
playedAt: integer({ mode: 'timestamp_ms' }).notNull(),
// How long the program was played in milliseconds (useful for tracking partial plays)
playedDuration: integer(),
createdAt: integer({ mode: 'timestamp_ms' }).notNull(),
fillerListId: text().references(() => FillerShow.uuid),
},
(table) => [
index('program_play_history_program_uuid_index').on(table.programUuid),
index('program_play_history_channel_uuid_index').on(
table.channelUuid,
table.fillerListId,
),
index('program_play_history_played_at_index').on(table.playedAt),
// Composite index for querying play history by channel ordered by time
index('program_play_history_channel_played_at_index').on(
table.channelUuid,
table.playedAt,
),
],
);
export type ProgramPlayHistoryTable = KyselifyBetter<typeof ProgramPlayHistory>;
export type ProgramPlayHistoryDao = Selectable<ProgramPlayHistoryTable>;
export type ProgramPlayHistoryOrm = InferSelectModel<typeof ProgramPlayHistory>;
export type NewProgramPlayHistoryDao = Insertable<ProgramPlayHistoryTable>;
export type NewProgramPlayHistoryDrizzle = InferInsertModel<
typeof ProgramPlayHistory
>;
export const ProgramPlayHistoryRelations = relations(
ProgramPlayHistory,
({ one }) => ({
channel: one(Channel, {
fields: [ProgramPlayHistory.channelUuid],
references: [Channel.uuid],
}),
program: one(Program, {
fields: [ProgramPlayHistory.programUuid],
references: [Program.uuid],
}),
}),
);

View File

@@ -54,6 +54,10 @@ import {
ProgramExternalId,
ProgramExternalIdRelations,
} from './ProgramExternalId.ts';
import {
ProgramPlayHistory,
ProgramPlayHistoryRelations,
} from './ProgramPlayHistory.ts';
import {
ProgramGrouping,
ProgramGroupingRelations,
@@ -111,6 +115,8 @@ export const schema = {
programGroupingRelations: ProgramGroupingRelations,
programExternalId: ProgramExternalId,
programExternalIdRelations: ProgramExternalIdRelations,
programPlayHistory: ProgramPlayHistory,
programPlayHistoryRelations: ProgramPlayHistoryRelations,
programGroupingExternalId: ProgramGroupingExternalId,
programGroupingExternalIdRelations: ProgramGroupingExternalIdRelations,
programMediaStream: ProgramMediaStream,

View File

@@ -186,6 +186,9 @@ export class DirectMigrationProvider implements MigrationProvider {
migration1768825617: makeKyselyMigrationFromSqlFile(
'./sql/0037_orange_bromley.sql',
),
migration1768876318: makeKyselyMigrationFromSqlFile(
'./sql/0038_boring_maginty.sql',
),
},
wrapWithTransaction,
),

View File

@@ -0,0 +1,15 @@
CREATE TABLE `program_play_history` (
`uuid` text PRIMARY KEY NOT NULL,
`program_uuid` text NOT NULL,
`channel_uuid` text NOT NULL,
`played_at` integer NOT NULL,
`played_duration` integer,
`created_at` integer NOT NULL,
FOREIGN KEY (`program_uuid`) REFERENCES `program`(`uuid`) ON UPDATE no action ON DELETE cascade,
FOREIGN KEY (`channel_uuid`) REFERENCES `channel`(`uuid`) ON UPDATE no action ON DELETE cascade
);
--> statement-breakpoint
CREATE INDEX `program_play_history_program_uuid_index` ON `program_play_history` (`program_uuid`);--> statement-breakpoint
CREATE INDEX `program_play_history_channel_uuid_index` ON `program_play_history` (`channel_uuid`);--> statement-breakpoint
CREATE INDEX `program_play_history_played_at_index` ON `program_play_history` (`played_at`);--> statement-breakpoint
CREATE INDEX `program_play_history_channel_played_at_index` ON `program_play_history` (`channel_uuid`,`played_at`);

View File

@@ -0,0 +1,17 @@
CREATE TABLE `program_play_history` (
`uuid` text PRIMARY KEY NOT NULL,
`program_uuid` text NOT NULL,
`channel_uuid` text NOT NULL,
`played_at` integer NOT NULL,
`played_duration` integer,
`created_at` integer NOT NULL,
`filler_list_id` text,
FOREIGN KEY (`program_uuid`) REFERENCES `program`(`uuid`) ON UPDATE no action ON DELETE cascade,
FOREIGN KEY (`channel_uuid`) REFERENCES `channel`(`uuid`) ON UPDATE no action ON DELETE cascade,
FOREIGN KEY (`filler_list_id`) REFERENCES `filler_show`(`uuid`) ON UPDATE no action ON DELETE no action
);
--> statement-breakpoint
CREATE INDEX `program_play_history_program_uuid_index` ON `program_play_history` (`program_uuid`);--> statement-breakpoint
CREATE INDEX `program_play_history_channel_uuid_index` ON `program_play_history` (`channel_uuid`,`filler_list_id`);--> statement-breakpoint
CREATE INDEX `program_play_history_played_at_index` ON `program_play_history` (`played_at`);--> statement-breakpoint
CREATE INDEX `program_play_history_channel_played_at_index` ON `program_play_history` (`channel_uuid`,`played_at`);

File diff suppressed because it is too large Load Diff

View File

@@ -267,6 +267,13 @@
"when": 1768792601901,
"tag": "0037_orange_bromley",
"breakpoints": true
},
{
"idx": 38,
"version": "6",
"when": 1768876233969,
"tag": "0038_boring_maginty",
"breakpoints": true
}
]
}

View File

@@ -1,4 +1,5 @@
import { CompiledQuery, type Kysely } from 'kysely';
import { castArray } from 'lodash-es';
import fs from 'node:fs/promises';
import { dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
@@ -52,14 +53,17 @@ export async function processSqlMigrationFile(
}
export function makeKyselyMigrationFromSqlFile(
filePath: string,
filePaths: string | Array<string>,
fullCopy: boolean = false,
): TunarrDatabaseMigration {
return {
fullCopy,
async up(db) {
for (const statement of await processSqlMigrationFile(filePath)) {
await db.executeQuery(statement);
filePaths = castArray(filePaths);
for (const path of filePaths) {
for (const statement of await processSqlMigrationFile(path)) {
await db.executeQuery(statement);
}
}
},
};

View File

@@ -1,183 +0,0 @@
import type { Channel } from '@/db/schema/Channel.js';
import { ChannelCache } from '@/stream/ChannelCache.js';
import type { Maybe } from '@/types/util.js';
import { random } from '@/util/random.js';
import constants from '@tunarr/shared/constants';
import dayjs from 'dayjs';
import { injectable } from 'inversify';
import { isEmpty, isNil } from 'lodash-es';
import type {
ChannelFillerShowWithContent,
ProgramWithRelations,
} from '../db/schema/derivedTypes.js';
import {
EmptyFillerPickResult,
IFillerPicker,
} from './interfaces/IFillerPicker.ts';
const DefaultFillerCooldownMillis = 30 * 60 * 1000;
const OneDayMillis = 7 * 24 * 60 * 60 * 1000;
const FiveMinutesMillis = 5 * 60 * 60 * 1000;
@injectable()
export class BestFitFillerPicker implements IFillerPicker {
#channelCache: ChannelCache;
constructor(channelCache: ChannelCache) {
this.#channelCache = channelCache;
}
pickFiller(
channel: Channel,
channelFillerLists: ChannelFillerShowWithContent[],
maxDuration: number,
) {
if (isEmpty(channelFillerLists)) {
return EmptyFillerPickResult;
}
let pick1: Maybe<ProgramWithRelations>;
const now = +dayjs();
let minimumWait = Number.MAX_SAFE_INTEGER;
const fillerRepeatCooldownMs =
channel.fillerRepeatCooldown ?? DefaultFillerCooldownMillis;
let listPickWeight = 0;
let fillerId: Maybe<string>;
let minUntilFillerListAvailable = Number.MAX_SAFE_INTEGER;
const listsNotInCooldown = channelFillerLists.filter(
({ cooldown, fillerShowUuid }) => {
const fillerListLastPlayed = this.#channelCache.getFillerLastPlayTime(
channel.uuid,
fillerShowUuid,
);
minUntilFillerListAvailable = Math.min(
minUntilFillerListAvailable,
fillerListLastPlayed,
);
const timeSince =
fillerListLastPlayed === 0
? OneDayMillis
: now - fillerListLastPlayed;
return timeSince < cooldown + constants.SLACK;
},
);
if (listsNotInCooldown.length === 0) {
return EmptyFillerPickResult;
}
for (const viableList of listsNotInCooldown) {
// @ts-expect-error - WIP
const _viablePrograms = viableList.fillerContent.filter((program) => {
if (program.duration > maxDuration + constants.SLACK) {
return false;
}
const lastPlayedTime = this.#channelCache.getProgramLastPlayTime(
channel.uuid,
program.uuid,
);
// @ts-expect-error - WIP
const _timeSinceLastPlayed =
lastPlayedTime === 0 ? OneDayMillis : now - lastPlayedTime;
});
}
for (const channelFillerList of channelFillerLists) {
const fillerPrograms = channelFillerList.fillerContent;
let pickedList = false;
let n = 0;
for (const fillerProgram of fillerPrograms) {
// a few extra milliseconds won't hurt anyone, would it? dun dun dun
if (fillerProgram.duration <= maxDuration + constants.SLACK) {
const lastPlayedTime = this.#channelCache.getProgramLastPlayTime(
channel.uuid,
fillerProgram.uuid,
);
let timeSinceLastPlayed =
lastPlayedTime === 0 ? OneDayMillis : now - lastPlayedTime;
if (timeSinceLastPlayed < fillerRepeatCooldownMs - constants.SLACK) {
const w = fillerRepeatCooldownMs - timeSinceLastPlayed;
if (fillerProgram.duration + w <= maxDuration + constants.SLACK) {
minimumWait = Math.min(minimumWait, w);
}
timeSinceLastPlayed = 0;
//30 minutes is too little, don't repeat it at all
} else if (!pickedList) {
const fillerListLastPlayed =
this.#channelCache.getFillerLastPlayTime(
channel.uuid,
channelFillerList.fillerShow.uuid,
);
const timeSince =
fillerListLastPlayed === 0
? OneDayMillis
: now - fillerListLastPlayed;
if (timeSince + constants.SLACK >= channelFillerList.cooldown) {
//should we pick this list?
listPickWeight += channelFillerList.weight;
if (random.bool(channelFillerList.weight, listPickWeight)) {
pickedList = true;
fillerId = channelFillerList.fillerShow.uuid;
n = 0;
} else {
break;
}
} else {
const w = channelFillerList.cooldown - timeSince;
if (fillerProgram.duration + w <= maxDuration + constants.SLACK) {
minimumWait = Math.min(minimumWait, w);
}
break;
}
}
if (timeSinceLastPlayed <= 0) {
continue;
}
const s = norm_s(
timeSinceLastPlayed >= FiveMinutesMillis
? FiveMinutesMillis
: timeSinceLastPlayed,
);
const d = norm_d(fillerProgram.duration);
const w = s + d;
n += w;
if (random.bool(w, n)) {
pick1 = fillerProgram;
}
}
}
}
return {
fillerListId: fillerId!,
filler: isNil(pick1) ? null : pick1,
minimumWait,
};
}
}
function norm_d(x: number) {
x /= 60 * 1000;
if (x >= 3.0) {
x = 3.0 + Math.log(x);
}
const y = 10000 * (Math.ceil(x * 1000) + 1);
return Math.ceil(y / 1000000) + 1;
}
function norm_s(x: number) {
let y = Math.ceil(x / 600) + 1;
y = y * y;
return Math.ceil(y / 1000000) + 1;
}

View File

@@ -108,7 +108,7 @@ export class ChannelCache implements IStreamLineupCache {
if (isCommercialLineupItem(lineupItem)) {
await this.persistentChannelCache.setFillerPlayTime(
this.getKey(channelId, lineupItem.fillerId),
this.getKey(channelId, lineupItem.fillerListId),
t0 + remaining,
);
}

View File

@@ -150,7 +150,8 @@ const configure: interfaces.ContainerModuleCallBack = (bind) => {
return (playerContext: PlayerContext, outputFormat: OutputFormat) => {
switch (playerContext.lineupItem.type) {
case 'program':
case 'commercial': {
case 'commercial':
case 'fallback': {
return ctx.container.getNamed<ProgramStreamFactory>(
KEYS.ProgramStreamFactory,
playerContext.lineupItem.program.sourceType,

View File

@@ -3,7 +3,7 @@ import { tag } from '@tunarr/types';
import dayjs from 'dayjs';
import { now, sum, sumBy } from 'lodash-es';
import { DeepPartial } from 'ts-essentials';
import { instance, mock, verify, when } from 'ts-mockito';
import { anything, instance, mock, verify, when } from 'ts-mockito';
import { test as baseTest } from 'vitest';
import { Lineup, LineupItem } from '../db/derived_types/Lineup.ts';
import { StreamLineupItem } from '../db/derived_types/StreamLineup.ts';
@@ -11,6 +11,7 @@ import { IChannelDB } from '../db/interfaces/IChannelDB.ts';
import { IFillerListDB } from '../db/interfaces/IFillerListDB.ts';
import { IProgramDB } from '../db/interfaces/IProgramDB.ts';
import { calculateStartTimeOffsets } from '../db/lineupUtil.ts';
import { ProgramPlayHistoryDB } from '../db/ProgramPlayHistoryDB.ts';
import { MediaSourceId } from '../db/schema/base.ts';
import { IStreamLineupCache } from '../interfaces/IStreamLineupCache.ts';
import { IFillerPicker } from '../services/interfaces/IFillerPicker.ts';
@@ -31,6 +32,7 @@ describe('StreamProgramCalculator', () => {
const programDB = mock<IProgramDB>();
const fillerPicker = mock<IFillerPicker>();
const channelCache = mock<IStreamLineupCache>();
const playHistoryDB = mock<ProgramPlayHistoryDB>();
const startTime = dayjs(new Date(2025, 8, 17, 8));
const channelId = faker.string.uuid();
@@ -88,6 +90,18 @@ describe('StreamProgramCalculator', () => {
}),
);
// Mock play history - program not currently playing
when(
playHistoryDB.isProgramCurrentlyPlaying(
anything(),
anything(),
anything(),
),
).thenReturn(Promise.resolve(false));
when(playHistoryDB.create(anything())).thenReturn(
Promise.resolve(undefined),
);
const calc = new StreamProgramCalculator(
LoggerFactory.root,
instance(fillerDB),
@@ -95,6 +109,7 @@ describe('StreamProgramCalculator', () => {
instance(channelCache),
instance(programDB),
instance(fillerPicker),
instance(playHistoryDB),
);
const out = (
@@ -126,6 +141,7 @@ describe('StreamProgramCalculator', () => {
const programDB = mock<IProgramDB>();
const fillerPicker = mock<IFillerPicker>();
const channelCache = mock<IStreamLineupCache>();
const playHistoryDB = mock<ProgramPlayHistoryDB>();
const startTime = dayjs(new Date(2025, 8, 17, 8));
const channelId = faker.string.uuid();
@@ -185,6 +201,18 @@ describe('StreamProgramCalculator', () => {
}),
);
// Mock play history - program not currently playing
when(
playHistoryDB.isProgramCurrentlyPlaying(
anything(),
anything(),
anything(),
),
).thenReturn(Promise.resolve(false));
when(playHistoryDB.create(anything())).thenReturn(
Promise.resolve(undefined),
);
const calc = new StreamProgramCalculator(
LoggerFactory.root,
instance(fillerDB),
@@ -192,6 +220,7 @@ describe('StreamProgramCalculator', () => {
instance(channelCache),
instance(programDB),
instance(fillerPicker),
instance(playHistoryDB),
);
const out = (
@@ -210,7 +239,7 @@ describe('StreamProgramCalculator', () => {
infiniteLoop: false,
programBeginMs: +startTime - +dayjs.duration(16, 'minutes'),
startOffset: +dayjs.duration(16, 'minutes'),
fillerId: fillerListId,
fillerListId: fillerListId,
type: 'commercial',
});
@@ -225,6 +254,7 @@ describe('StreamProgramCalculator', () => {
const programDB = mock<IProgramDB>();
const fillerPicker = mock<IFillerPicker>();
const channelCache = mock<IStreamLineupCache>();
const playHistoryDB = mock<ProgramPlayHistoryDB>();
const startTime = dayjs(new Date(2025, 8, 17, 8));
const channelId = faker.string.uuid();
@@ -284,6 +314,18 @@ describe('StreamProgramCalculator', () => {
}),
);
// Mock play history - program not currently playing
when(
playHistoryDB.isProgramCurrentlyPlaying(
anything(),
anything(),
anything(),
),
).thenReturn(Promise.resolve(false));
when(playHistoryDB.create(anything())).thenReturn(
Promise.resolve(undefined),
);
const calc = new StreamProgramCalculator(
LoggerFactory.root,
instance(fillerDB),
@@ -291,6 +333,7 @@ describe('StreamProgramCalculator', () => {
instance(channelCache),
instance(programDB),
instance(fillerPicker),
instance(playHistoryDB),
);
const out = (
@@ -307,7 +350,7 @@ describe('StreamProgramCalculator', () => {
infiniteLoop: true,
programBeginMs: +startTime - +dayjs.duration(16, 'minutes'),
startOffset: +dayjs.duration(16, 'minutes'),
fillerId: fillerListId,
fillerListId: fillerListId,
type: 'commercial',
duration: +dayjs.duration(22, 'minutes'),
});
@@ -317,6 +360,222 @@ describe('StreamProgramCalculator', () => {
).once();
});
baseTest('records play history for new playback', async () => {
const fillerDB = mock<IFillerListDB>();
const channelDB = mock<IChannelDB>();
const programDB = mock<IProgramDB>();
const fillerPicker = mock<IFillerPicker>();
const channelCache = mock<IStreamLineupCache>();
const playHistoryDB = mock<ProgramPlayHistoryDB>();
const startTime = dayjs(new Date(2025, 8, 17, 8));
const channelId = faker.string.uuid();
const programId1 = faker.string.uuid();
const programId2 = faker.string.uuid();
const lineup: LineupItem[] = [
{
type: 'content',
durationMs: +dayjs.duration({ minutes: 22 }),
id: programId1,
},
{
type: 'content',
durationMs: +dayjs.duration({ minutes: 22 }),
id: programId2,
},
];
when(programDB.getProgramById(programId1)).thenReturn(
Promise.resolve(
createFakeProgram({
uuid: programId1,
duration: lineup[0].durationMs,
mediaSourceId: tag<MediaSourceId>('mediasource-123'),
}),
),
);
when(programDB.getProgramById(programId2)).thenReturn(
Promise.resolve(
createFakeProgram({
uuid: programId2,
duration: lineup[1].durationMs,
mediaSourceId: tag<MediaSourceId>('mediasource-123'),
}),
),
);
const channel = createChannel({
uuid: channelId,
number: 1,
startTime: +startTime.subtract(1, 'hour'),
duration: sumBy(lineup, ({ durationMs }) => durationMs),
});
when(channelDB.getChannel(1)).thenReturn(Promise.resolve(channel));
when(channelDB.loadLineup(channelId)).thenReturn(
Promise.resolve({
version: 1,
items: lineup,
startTimeOffsets: calculateStartTimeOffsets(lineup),
lastUpdated: now(),
}),
);
// Mock play history - program NOT currently playing
when(
playHistoryDB.isProgramCurrentlyPlaying(
anything(),
anything(),
anything(),
),
).thenReturn(Promise.resolve(false));
when(playHistoryDB.create(anything())).thenReturn(
Promise.resolve(undefined),
);
const calc = new StreamProgramCalculator(
LoggerFactory.root,
instance(fillerDB),
instance(channelDB),
instance(channelCache),
instance(programDB),
instance(fillerPicker),
instance(playHistoryDB),
);
await calc.getCurrentLineupItem({
allowSkip: false,
channelId: 1,
startTime: +startTime,
});
// Wait for async play history recording
await new Promise((resolve) => setTimeout(resolve, 10));
// Verify play history was checked
verify(
playHistoryDB.isProgramCurrentlyPlaying(
channelId,
programId1,
+startTime,
),
).once();
// Verify play history was created since program was not currently playing
verify(playHistoryDB.create(anything())).once();
});
baseTest(
'does not record duplicate play history when program is already playing',
async () => {
const fillerDB = mock<IFillerListDB>();
const channelDB = mock<IChannelDB>();
const programDB = mock<IProgramDB>();
const fillerPicker = mock<IFillerPicker>();
const channelCache = mock<IStreamLineupCache>();
const playHistoryDB = mock<ProgramPlayHistoryDB>();
const startTime = dayjs(new Date(2025, 8, 17, 8));
const channelId = faker.string.uuid();
const programId1 = faker.string.uuid();
const programId2 = faker.string.uuid();
const lineup: LineupItem[] = [
{
type: 'content',
durationMs: +dayjs.duration({ minutes: 22 }),
id: programId1,
},
{
type: 'content',
durationMs: +dayjs.duration({ minutes: 22 }),
id: programId2,
},
];
when(programDB.getProgramById(programId1)).thenReturn(
Promise.resolve(
createFakeProgram({
uuid: programId1,
duration: lineup[0].durationMs,
mediaSourceId: tag<MediaSourceId>('mediasource-123'),
}),
),
);
when(programDB.getProgramById(programId2)).thenReturn(
Promise.resolve(
createFakeProgram({
uuid: programId2,
duration: lineup[1].durationMs,
mediaSourceId: tag<MediaSourceId>('mediasource-123'),
}),
),
);
const channel = createChannel({
uuid: channelId,
number: 1,
startTime: +startTime.subtract(1, 'hour'),
duration: sumBy(lineup, ({ durationMs }) => durationMs),
});
when(channelDB.getChannel(1)).thenReturn(Promise.resolve(channel));
when(channelDB.loadLineup(channelId)).thenReturn(
Promise.resolve({
version: 1,
items: lineup,
startTimeOffsets: calculateStartTimeOffsets(lineup),
lastUpdated: now(),
}),
);
// Mock play history - program IS currently playing (simulates another client already connected)
when(
playHistoryDB.isProgramCurrentlyPlaying(
anything(),
anything(),
anything(),
),
).thenReturn(Promise.resolve(true));
const calc = new StreamProgramCalculator(
LoggerFactory.root,
instance(fillerDB),
instance(channelDB),
instance(channelCache),
instance(programDB),
instance(fillerPicker),
instance(playHistoryDB),
);
await calc.getCurrentLineupItem({
allowSkip: false,
channelId: 1,
startTime: +startTime,
});
// Wait for async play history recording
await new Promise((resolve) => setTimeout(resolve, 10));
// Verify play history was checked
verify(
playHistoryDB.isProgramCurrentlyPlaying(
channelId,
programId1,
+startTime,
),
).once();
// Verify play history was NOT created since program was already playing
verify(playHistoryDB.create(anything())).never();
},
);
describe('calculateStreamDuration', () => {
test('first channel cycle', () => {
const lineupItems: LineupItem[] = [

View File

@@ -12,13 +12,16 @@ import { first, isEmpty, isNil, isNull, sumBy } from 'lodash-es';
import { Lineup } from '../db/derived_types/Lineup.ts';
import {
CommercialStreamLineupItem,
createOfflineStreamLineupItem,
FallbackStreamLineupItem,
isContentBackedLineupItem,
ProgramStreamLineupItem,
StreamLineupItem,
createOfflineStreamLineupItem,
} from '../db/derived_types/StreamLineup.ts';
import { IChannelDB } from '../db/interfaces/IChannelDB.ts';
import { IFillerListDB } from '../db/interfaces/IFillerListDB.ts';
import { IProgramDB } from '../db/interfaces/IProgramDB.ts';
import { ProgramPlayHistoryDB } from '../db/ProgramPlayHistoryDB.ts';
import { IStreamLineupCache } from '../interfaces/IStreamLineupCache.ts';
import { IFillerPicker } from '../services/interfaces/IFillerPicker.ts';
import { WrappedError } from '../types/errors.ts';
@@ -75,6 +78,8 @@ export class StreamProgramCalculator {
@inject(KEYS.ChannelCache) private channelCache: IStreamLineupCache,
@inject(KEYS.ProgramDB) private programDB: IProgramDB,
@inject(KEYS.FillerPicker) private fillerPicker: IFillerPicker,
@inject(ProgramPlayHistoryDB)
private programPlayHistoryDB: ProgramPlayHistoryDB,
) {}
async getCurrentLineupItem(
@@ -234,6 +239,43 @@ export class StreamProgramCalculator {
lineupItem,
);
// Record play history for content-backed items (programs and commercials/fillers)
// Only record if this is a new playback (not a duplicate request for an already-playing program)
if (
isContentBackedLineupItem(lineupItem) &&
lineupItem.type !== 'fallback'
) {
const programUuid = lineupItem.program.uuid;
const fillerListId =
lineupItem.type === 'commercial' ? lineupItem.fillerListId : undefined;
const streamDuration = lineupItem.streamDuration;
const channelUuid = channel.uuid;
const playedAt = new Date(req.startTime);
this.programPlayHistoryDB
.isProgramCurrentlyPlaying(channelUuid, programUuid, req.startTime)
.then((isCurrentlyPlaying) => {
if (!isCurrentlyPlaying) {
return this.programPlayHistoryDB.create({
programUuid,
channelUuid,
playedAt,
playedDuration: streamDuration,
fillerListId,
});
}
return;
})
.catch((err) => {
this.logger.error(
err,
'Failed to record play history for program %s on channel %s',
programUuid,
channelUuid,
);
});
}
if (
req.sessionToken &&
wereThereTooManyAttempts(req.sessionToken, lineupItem)
@@ -309,7 +351,7 @@ export class StreamProgramCalculator {
program = {
...baseItem,
type: 'commercial',
fillerId: lineupItem.fillerListId,
fillerListId: lineupItem.fillerListId,
infiniteLoop: backingItem.duration < streamDuration,
} satisfies CommercialStreamLineupItem;
} else {
@@ -452,9 +494,7 @@ export class StreamProgramCalculator {
);
const startOffset = Math.round(fillerstart);
return {
// just add the video, starting at 0, playing the entire duration
type: 'commercial',
const base = {
program: {
...fillerProgram,
mediaSourceId,
@@ -463,7 +503,22 @@ export class StreamProgramCalculator {
streamDuration,
duration: program.duration,
programBeginMs: program.programBeginMs,
fillerId: filler.uuid,
fillerListId: filler.uuid,
infiniteLoop: filler.duration < streamDuration,
};
if (isSpecial) {
return {
...base,
type: 'fallback',
} satisfies FallbackStreamLineupItem;
}
return {
...base,
// just add the video, starting at 0, playing the entire duration
type: 'commercial',
fillerListId: filler.uuid,
infiniteLoop: filler.duration < streamDuration,
} satisfies CommercialStreamLineupItem;
}