mirror of
https://github.com/chrisbenincasa/tunarr.git
synced 2026-04-18 09:03:35 -04:00
refactor: remove unused stream cache code
This commit is contained in:
@@ -30,34 +30,32 @@ import { OnDemandChannelService } from './services/OnDemandChannelService.js';
|
||||
import { TVGuideService } from './services/TvGuideService.ts';
|
||||
import { CacheImageService } from './services/cacheImageService.js';
|
||||
import { MediaSourceScanCoordinator } from './services/scanner/MediaSourceScanCoordinator.ts';
|
||||
import { ChannelCache } from './stream/ChannelCache.js';
|
||||
import { SessionManager } from './stream/SessionManager.js';
|
||||
import { StreamProgramCalculator } from './stream/StreamProgramCalculator.js';
|
||||
|
||||
@injectable()
|
||||
export class ServerContext {
|
||||
@inject(ProgramConverter) public readonly programConverter: ProgramConverter;
|
||||
@inject(ProgramConverter) public readonly programConverter!: ProgramConverter;
|
||||
@inject(OnDemandChannelService)
|
||||
public readonly onDemandChannelService: OnDemandChannelService;
|
||||
@inject(KEYS.ChannelDB) public channelDB: IChannelDB;
|
||||
@inject(M3uService) public m3uService: M3uService;
|
||||
@inject(KEYS.SettingsDB) public settings: ISettingsDB;
|
||||
public readonly onDemandChannelService!: OnDemandChannelService;
|
||||
@inject(KEYS.ChannelDB) public channelDB!: IChannelDB;
|
||||
@inject(M3uService) public m3uService!: M3uService;
|
||||
@inject(KEYS.SettingsDB) public settings!: ISettingsDB;
|
||||
|
||||
@inject(FillerDB) public fillerDB!: FillerDB;
|
||||
public fileCache: FileCacheService = new FileCacheService();
|
||||
public cacheImageService: CacheImageService;
|
||||
@inject(EventService) public eventService: EventService;
|
||||
@inject(TVGuideService) public guideService: TVGuideService;
|
||||
@inject(HdhrService) public hdhrService: HdhrService;
|
||||
@inject(CustomShowDB) public customShowDB: CustomShowDB;
|
||||
@inject(ChannelCache) public channelCache: ChannelCache;
|
||||
@inject(MediaSourceDB) public mediaSourceDB: MediaSourceDB;
|
||||
@inject(KEYS.ProgramDB) public programDB: IProgramDB;
|
||||
@inject(TranscodeConfigDB) public transcodeConfigDB: TranscodeConfigDB;
|
||||
public cacheImageService!: CacheImageService;
|
||||
@inject(EventService) public eventService!: EventService;
|
||||
@inject(TVGuideService) public guideService!: TVGuideService;
|
||||
@inject(HdhrService) public hdhrService!: HdhrService;
|
||||
@inject(CustomShowDB) public customShowDB!: CustomShowDB;
|
||||
@inject(MediaSourceDB) public mediaSourceDB!: MediaSourceDB;
|
||||
@inject(KEYS.ProgramDB) public programDB!: IProgramDB;
|
||||
@inject(TranscodeConfigDB) public transcodeConfigDB!: TranscodeConfigDB;
|
||||
|
||||
@inject(SessionManager) public readonly sessionManager: SessionManager;
|
||||
@inject(SessionManager) public readonly sessionManager!: SessionManager;
|
||||
@inject(HealthCheckService)
|
||||
public readonly healthCheckService: HealthCheckService;
|
||||
public readonly healthCheckService!: HealthCheckService;
|
||||
|
||||
@inject(ChannelLineupMigrator)
|
||||
public readonly channelLineupMigrator!: ChannelLineupMigrator;
|
||||
@@ -77,7 +75,7 @@ export class ServerContext {
|
||||
public readonly drizzleFactory!: interfaces.AutoFactory<DrizzleDBAccess>;
|
||||
|
||||
@inject(KEYS.WorkerPool)
|
||||
public readonly workerPool: IWorkerPool;
|
||||
public readonly workerPool!: IWorkerPool;
|
||||
|
||||
@inject(MeilisearchService)
|
||||
public readonly searchService!: MeilisearchService;
|
||||
|
||||
@@ -49,13 +49,10 @@ import { SearchParser } from './services/search/SearchParser.ts';
|
||||
import { ChannelLineupMigratorStartupTask } from './services/startup/ChannelLineupMigratorStartupTask.ts';
|
||||
import { ClearM3uCacheStartupTask } from './services/startup/ClearM3uCacheStartupTask.ts';
|
||||
import { GenerateGuideStartupTask } from './services/startup/GenerateGuideStartupTask.ts';
|
||||
import { LoadChannelCacheStartupTask } from './services/startup/LoadChannelCacheStartupTask.ts';
|
||||
import { RefreshLibrariesStartupTask } from './services/startup/RefreshLibrariesStartupTask.ts';
|
||||
import { ScheduleJobsStartupTask } from './services/startup/ScheduleJobsStartupTask.ts';
|
||||
import { SeedFfmpegInfoCache } from './services/startup/SeedFfmpegInfoCache.ts';
|
||||
import { SeedSystemDevicesStartupTask } from './services/startup/SeedSystemDevicesStartupTask.ts';
|
||||
import { StreamCacheMigratorStartupTask } from './services/startup/StreamCacheMigratorStartupTask.ts';
|
||||
import { ChannelCache } from './stream/ChannelCache.ts';
|
||||
import { FixerRunner } from './tasks/fixers/FixerRunner.ts';
|
||||
import { ChildProcessHelper } from './util/ChildProcessHelper.ts';
|
||||
import { Timer } from './util/Timer.ts';
|
||||
@@ -152,7 +149,6 @@ const RootModule = new ContainerModule((bind) => {
|
||||
bind<interfaces.AutoFactory<TimeSlotSchedulerService>>(
|
||||
KEYS.TimeSlotSchedulerServiceFactory,
|
||||
).toAutoFactory(TimeSlotSchedulerService);
|
||||
bind(KEYS.ChannelCache).to(ChannelCache).inSingletonScope();
|
||||
|
||||
bind(KEYS.StartupTask).to(SeedSystemDevicesStartupTask).inSingletonScope();
|
||||
bind(KEYS.StartupTask).to(ClearM3uCacheStartupTask).inSingletonScope();
|
||||
@@ -163,8 +159,6 @@ const RootModule = new ContainerModule((bind) => {
|
||||
bind(KEYS.StartupTask).to(ScheduleJobsStartupTask).inSingletonScope();
|
||||
bind(KEYS.StartupTask).to(FixerRunner).inSingletonScope();
|
||||
bind(KEYS.StartupTask).to(GenerateGuideStartupTask).inSingletonScope();
|
||||
bind(KEYS.StartupTask).to(LoadChannelCacheStartupTask).inSingletonScope();
|
||||
bind(KEYS.StartupTask).to(StreamCacheMigratorStartupTask).inSingletonScope();
|
||||
bind(KEYS.StartupTask).to(RefreshLibrariesStartupTask).inSingletonScope();
|
||||
|
||||
if (getBooleanEnvVar(USE_WORKER_POOL_ENV_VAR, false)) {
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import type { IProgramDB } from '@/db/interfaces/IProgramDB.js';
|
||||
import { ChannelCache } from '@/stream/ChannelCache.js';
|
||||
import { KEYS } from '@/types/inject.js';
|
||||
import { isNonEmptyString, programExternalIdString } from '@/util/index.js';
|
||||
import { seq } from '@tunarr/shared/util';
|
||||
@@ -50,7 +49,6 @@ import { DrizzleDBAccess } from './schema/index.ts';
|
||||
@injectable()
|
||||
export class FillerDB implements IFillerListDB {
|
||||
constructor(
|
||||
@inject(ChannelCache) private channelCache: ChannelCache,
|
||||
@inject(KEYS.ProgramDB) private programDB: IProgramDB,
|
||||
@inject(ProgramConverter) private programConverter: ProgramConverter,
|
||||
@inject(KEYS.Database) private db: Kysely<DB>,
|
||||
@@ -326,7 +324,6 @@ export class FillerDB implements IFillerListDB {
|
||||
.execute();
|
||||
});
|
||||
|
||||
this.channelCache.clear();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
import type { StreamLineupItem } from '../db/derived_types/StreamLineup.ts';
|
||||
|
||||
export interface IStreamLineupCache {
|
||||
getProgramLastPlayTime(channelId: string, programId: string): number;
|
||||
|
||||
getFillerLastPlayTime(channelId: string, fillerId: string): number;
|
||||
|
||||
recordPlayback(
|
||||
channelId: string,
|
||||
t0: number,
|
||||
lineupItem: StreamLineupItem,
|
||||
): Promise<void>;
|
||||
|
||||
clear(): void;
|
||||
}
|
||||
@@ -1,109 +0,0 @@
|
||||
import { jsonSchema } from '@/types/schemas.js';
|
||||
import { inject, interfaces } from 'inversify';
|
||||
import { findIndex, isArray } from 'lodash-es';
|
||||
import fs from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
import { CurrentLineupSchemaVersion } from '../../db/derived_types/Lineup.ts';
|
||||
import { GlobalOptions } from '../../globals.ts';
|
||||
import { PersistentChannelCache } from '../../stream/ChannelCache.ts';
|
||||
import { KEYS } from '../../types/inject.ts';
|
||||
import { fileExists } from '../../util/fsUtil.ts';
|
||||
import { parseIntOrNull } from '../../util/index.ts';
|
||||
import { getFirstValue } from '../../util/json.ts';
|
||||
import { Logger } from '../../util/logging/LoggerFactory.ts';
|
||||
import { JsonFileMigrator, MigrationStep } from '../JsonFileMigrator.ts';
|
||||
import { ClearStreamPlayCacheMigration } from './ClearStreamPlayCacheMigration.ts';
|
||||
|
||||
const MigrationSteps: interfaces.ServiceIdentifier<MigrationStep>[] = [
|
||||
ClearStreamPlayCacheMigration,
|
||||
];
|
||||
|
||||
const CurrentVersion = 1;
|
||||
|
||||
export class StreamCacheMigrator extends JsonFileMigrator<MigrationStep> {
|
||||
constructor(
|
||||
@inject(KEYS.Logger) private logger: Logger,
|
||||
@inject(KEYS.GlobalOptions) private opts: GlobalOptions,
|
||||
@inject(PersistentChannelCache)
|
||||
private channelCache: PersistentChannelCache,
|
||||
) {
|
||||
super(MigrationSteps);
|
||||
}
|
||||
|
||||
async run(): Promise<void> {
|
||||
const cachePath = path.join(
|
||||
this.opts.databaseDirectory,
|
||||
'stream-cache.json',
|
||||
);
|
||||
if (!(await fileExists(cachePath))) {
|
||||
return;
|
||||
}
|
||||
const rawCacheContents = await fs.readFile(cachePath);
|
||||
const parsed = jsonSchema.parse(
|
||||
JSON.parse(rawCacheContents.toString('utf-8')),
|
||||
);
|
||||
if (
|
||||
(typeof parsed !== 'object' && typeof parsed !== 'function') ||
|
||||
parsed === null ||
|
||||
isArray(parsed)
|
||||
) {
|
||||
this.logger.warn(
|
||||
'Got invalid cache JSON: %s. Expected object.',
|
||||
JSON.stringify(parsed),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const version = getFirstValue('$.version@number()', parsed, parseIntOrNull);
|
||||
let currVersion = version ?? 0;
|
||||
|
||||
if (currVersion === CurrentVersion) {
|
||||
this.logger.debug(
|
||||
'Cache schema already at latest version: %d',
|
||||
CurrentLineupSchemaVersion,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let migrationIndex = findIndex(
|
||||
this.pipeline,
|
||||
({ from }) => from === currVersion,
|
||||
);
|
||||
|
||||
if (migrationIndex === -1) {
|
||||
this.logger.error(
|
||||
'Error determining which migration to start from for stream cache',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
do {
|
||||
const migration = this.pipeline?.[migrationIndex];
|
||||
if (!migration) {
|
||||
break;
|
||||
}
|
||||
|
||||
await migration.migrate(parsed);
|
||||
currVersion = migration.to;
|
||||
parsed['version'] = currVersion;
|
||||
migrationIndex++;
|
||||
} while (currVersion <= CurrentLineupSchemaVersion);
|
||||
|
||||
await fs.writeFile(
|
||||
path.join(this.opts.databaseDirectory, 'stream-cache.json'),
|
||||
JSON.stringify(parsed),
|
||||
);
|
||||
|
||||
await this.channelCache.init();
|
||||
|
||||
this.logger.info(
|
||||
'Successfully migrated stream cache from version %d to %d',
|
||||
version ?? 0,
|
||||
currVersion,
|
||||
);
|
||||
} catch (e) {
|
||||
this.logger.error(e, 'Error while migrating stream cache schema');
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,134 +0,0 @@
|
||||
import type { ChannelOrm } from '@/db/schema/Channel.js';
|
||||
import { ChannelCache } from '@/stream/ChannelCache.js';
|
||||
import type { Maybe } from '@/types/util.js';
|
||||
import { random } from '@/util/random.js';
|
||||
import constants from '@tunarr/shared/constants';
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { isEmpty } from 'lodash-es';
|
||||
import type {
|
||||
ChannelFillerShowWithContent,
|
||||
ProgramWithRelations,
|
||||
} from '../db/schema/derivedTypes.js';
|
||||
import {
|
||||
FiveMinutesMillis,
|
||||
OneDayMillis,
|
||||
} from '../ffmpeg/builder/constants.ts';
|
||||
import type {
|
||||
FillerPickResult,
|
||||
IFillerPicker,
|
||||
} from './interfaces/IFillerPicker.ts';
|
||||
import {
|
||||
DefaultFillerCooldownMillis,
|
||||
EmptyFillerPickResult,
|
||||
} from './interfaces/IFillerPicker.ts';
|
||||
|
||||
@injectable()
|
||||
export class FillerPicker implements IFillerPicker {
|
||||
#channelCache: ChannelCache;
|
||||
|
||||
constructor(@inject(ChannelCache) channelCache: ChannelCache) {
|
||||
this.#channelCache = channelCache;
|
||||
}
|
||||
|
||||
pickFiller(
|
||||
channel: ChannelOrm,
|
||||
fillers: ChannelFillerShowWithContent[],
|
||||
maxDuration: number,
|
||||
): Promise<FillerPickResult> {
|
||||
if (isEmpty(fillers)) {
|
||||
return Promise.resolve(EmptyFillerPickResult);
|
||||
}
|
||||
|
||||
let pick1: Maybe<ProgramWithRelations>;
|
||||
const t0 = new Date().getTime();
|
||||
let minimumWait = 1000000000;
|
||||
|
||||
const fillerRepeatCooldownMs =
|
||||
channel.fillerRepeatCooldown ?? DefaultFillerCooldownMillis;
|
||||
|
||||
let listM = 0;
|
||||
let fillerListId: Maybe<string>;
|
||||
for (const filler of fillers) {
|
||||
const fillerPrograms = filler.fillerContent;
|
||||
let pickedList = false;
|
||||
let n = 0;
|
||||
|
||||
for (const clip of fillerPrograms) {
|
||||
// a few extra milliseconds won't hurt anyone, would it? dun dun dun
|
||||
if (clip.duration <= maxDuration + constants.SLACK) {
|
||||
const t1 = this.#channelCache.getProgramLastPlayTime(
|
||||
channel.uuid,
|
||||
clip.uuid,
|
||||
);
|
||||
let timeSince = t1 == 0 ? OneDayMillis : t0 - t1;
|
||||
|
||||
if (timeSince < fillerRepeatCooldownMs - constants.SLACK) {
|
||||
const w = fillerRepeatCooldownMs - timeSince;
|
||||
if (clip.duration + w <= maxDuration + constants.SLACK) {
|
||||
minimumWait = Math.min(minimumWait, w);
|
||||
}
|
||||
timeSince = 0;
|
||||
//30 minutes is too little, don't repeat it at all
|
||||
} else if (!pickedList) {
|
||||
const t1 = this.#channelCache.getFillerLastPlayTime(
|
||||
channel.uuid,
|
||||
filler.fillerShow.uuid,
|
||||
);
|
||||
const timeSince = t1 == 0 ? OneDayMillis : t0 - t1;
|
||||
if (timeSince + constants.SLACK >= filler.cooldown) {
|
||||
//should we pick this list?
|
||||
listM += filler.weight;
|
||||
if (random.bool(filler.weight, listM)) {
|
||||
pickedList = true;
|
||||
fillerListId = filler.fillerShow.uuid;
|
||||
n = 0;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
const w = filler.cooldown - timeSince;
|
||||
if (clip.duration + w <= maxDuration + constants.SLACK) {
|
||||
minimumWait = Math.min(minimumWait, w);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (timeSince <= 0) {
|
||||
continue;
|
||||
}
|
||||
const s = norm_s(
|
||||
timeSince >= FiveMinutesMillis ? FiveMinutesMillis : timeSince,
|
||||
);
|
||||
const d = norm_d(clip.duration);
|
||||
const w = s + d;
|
||||
n += w;
|
||||
if (random.bool(w, n)) {
|
||||
pick1 = clip;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Promise.resolve({
|
||||
fillerListId: fillerListId ?? null,
|
||||
filler: pick1 ?? null,
|
||||
minimumWait,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function norm_d(x: number) {
|
||||
x /= 60 * 1000;
|
||||
if (x >= 3.0) {
|
||||
x = 3.0 + Math.log(x);
|
||||
}
|
||||
const y = 10000 * (Math.ceil(x * 1000) + 1);
|
||||
return Math.ceil(y / 1000000) + 1;
|
||||
}
|
||||
|
||||
function norm_s(x: number) {
|
||||
let y = Math.ceil(x / 600) + 1;
|
||||
y = y * y;
|
||||
return Math.ceil(y / 1000000) + 1;
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { PersistentChannelCache } from '../../stream/ChannelCache.ts';
|
||||
import { SimpleStartupTask } from './IStartupTask.ts';
|
||||
|
||||
@injectable()
|
||||
export class LoadChannelCacheStartupTask extends SimpleStartupTask {
|
||||
id = LoadChannelCacheStartupTask.name;
|
||||
dependencies: string[] = [];
|
||||
|
||||
constructor(
|
||||
@inject(PersistentChannelCache)
|
||||
private persistentChannelCache: PersistentChannelCache,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
getPromise(): Promise<void> {
|
||||
return this.persistentChannelCache.init();
|
||||
}
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { StreamCacheMigrator } from '../../migration/streamCache/StreamCacheMigrator.ts';
|
||||
import { ClearM3uCacheStartupTask } from './ClearM3uCacheStartupTask.ts';
|
||||
import { SimpleStartupTask } from './IStartupTask.ts';
|
||||
|
||||
@injectable()
|
||||
export class StreamCacheMigratorStartupTask extends SimpleStartupTask {
|
||||
id: string = StreamCacheMigratorStartupTask.name;
|
||||
|
||||
dependencies: string[] = [ClearM3uCacheStartupTask.name];
|
||||
|
||||
constructor(
|
||||
@inject(StreamCacheMigrator)
|
||||
private streamCacheMigrator: StreamCacheMigrator,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
getPromise(): Promise<void> {
|
||||
return this.streamCacheMigrator.run();
|
||||
}
|
||||
}
|
||||
@@ -1,164 +0,0 @@
|
||||
import { InMemoryCachedDbAdapter } from '@/db/json/InMemoryCachedDbAdapter.js';
|
||||
import { SchemaBackedDbAdapter } from '@/db/json/SchemaBackedJsonDBAdapter.js';
|
||||
import { GlobalOptions } from '@/globals.js';
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { isUndefined } from 'lodash-es';
|
||||
import { Low } from 'lowdb';
|
||||
import { join } from 'node:path';
|
||||
import { z } from 'zod/v4';
|
||||
import {
|
||||
StreamLineupItem,
|
||||
isCommercialLineupItem,
|
||||
} from '../db/derived_types/StreamLineup.ts';
|
||||
import { IStreamLineupCache } from '../interfaces/IStreamLineupCache.ts';
|
||||
import { KEYS } from '../types/inject.ts';
|
||||
import { Logger } from '../util/logging/LoggerFactory.ts';
|
||||
|
||||
const channelCacheSchema = z.object({
|
||||
fillerPlayTimeCache: z.record(z.string(), z.number()).default({}),
|
||||
programPlayTimeCache: z.record(z.string(), z.number()).default({}),
|
||||
version: z.number().optional(),
|
||||
});
|
||||
|
||||
type ChannelCacheSchema = z.infer<typeof channelCacheSchema>;
|
||||
|
||||
export type PersistentChannelCacheProvider =
|
||||
() => Promise<PersistentChannelCache>;
|
||||
|
||||
@injectable()
|
||||
export class PersistentChannelCache {
|
||||
#db: Low<ChannelCacheSchema>;
|
||||
|
||||
constructor(
|
||||
@inject(KEYS.GlobalOptions) private globalOptions: GlobalOptions,
|
||||
) {
|
||||
this.#db = new Low<ChannelCacheSchema>(
|
||||
new InMemoryCachedDbAdapter(
|
||||
new SchemaBackedDbAdapter(
|
||||
channelCacheSchema,
|
||||
join(this.globalOptions.databaseDirectory, 'stream-cache.json'),
|
||||
),
|
||||
),
|
||||
{
|
||||
fillerPlayTimeCache: {},
|
||||
programPlayTimeCache: {},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
async init() {
|
||||
return this.#db.read();
|
||||
}
|
||||
|
||||
getProgramPlayTime(id: string): number | undefined {
|
||||
return this.#db.data.programPlayTimeCache[id];
|
||||
}
|
||||
|
||||
setProgramPlayTime(id: string, time: number) {
|
||||
return this.#db.update(({ programPlayTimeCache }) => {
|
||||
programPlayTimeCache[id] = time;
|
||||
});
|
||||
}
|
||||
|
||||
getFillerPlayTime(id: string): number | undefined {
|
||||
return this.#db.data.fillerPlayTimeCache[id];
|
||||
}
|
||||
|
||||
setFillerPlayTime(id: string, time: number) {
|
||||
return this.#db.update(({ fillerPlayTimeCache }) => {
|
||||
fillerPlayTimeCache[id] = time;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@injectable()
|
||||
export class ChannelCache implements IStreamLineupCache {
|
||||
constructor(
|
||||
@inject(PersistentChannelCache)
|
||||
private persistentChannelCache: PersistentChannelCache,
|
||||
@inject(KEYS.Logger) private logger: Logger,
|
||||
) {}
|
||||
|
||||
getCurrentLineupItem(): StreamLineupItem | undefined {
|
||||
// TODO: Remove this entirely. Just return undefined for now since this is essentially
|
||||
// useless.
|
||||
return;
|
||||
}
|
||||
|
||||
private getKey(channelId: string, programId: string) {
|
||||
return `${channelId}|${programId}`;
|
||||
}
|
||||
|
||||
private async recordProgramPlayTime(
|
||||
channelId: string,
|
||||
lineupItem: StreamLineupItem,
|
||||
t0: number,
|
||||
) {
|
||||
let remaining: number;
|
||||
if (!isUndefined(lineupItem.streamDuration)) {
|
||||
remaining = lineupItem.streamDuration;
|
||||
} else {
|
||||
remaining = lineupItem.duration - (lineupItem.startOffset ?? 0);
|
||||
}
|
||||
|
||||
if (lineupItem.type === 'program') {
|
||||
const key = this.getKey(channelId, lineupItem.program.uuid);
|
||||
await this.persistentChannelCache.setProgramPlayTime(key, t0 + remaining);
|
||||
}
|
||||
|
||||
if (isCommercialLineupItem(lineupItem)) {
|
||||
await this.persistentChannelCache.setFillerPlayTime(
|
||||
this.getKey(channelId, lineupItem.fillerListId),
|
||||
t0 + remaining,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
getProgramLastPlayTime(channelId: string, programId: string) {
|
||||
return (
|
||||
this.persistentChannelCache.getProgramPlayTime(
|
||||
this.getKey(channelId, programId),
|
||||
) ?? 0
|
||||
);
|
||||
}
|
||||
|
||||
getFillerLastPlayTime(channelId: string, fillerId: string) {
|
||||
return (
|
||||
this.persistentChannelCache.getFillerPlayTime(
|
||||
this.getKey(channelId, fillerId),
|
||||
) ?? 0
|
||||
);
|
||||
}
|
||||
|
||||
async recordPlayback(
|
||||
channelId: string,
|
||||
t0: number,
|
||||
lineupItem: StreamLineupItem,
|
||||
) {
|
||||
try {
|
||||
await this.recordProgramPlayTime(channelId, lineupItem, t0);
|
||||
// await this.persistentChannelCache.setStreamPlayItem(channelId, {
|
||||
// timestamp: t0,
|
||||
// lineupItem: lineupItem,
|
||||
// });
|
||||
} catch (e) {
|
||||
this.logger.warn(
|
||||
e,
|
||||
'Error while setting stream cache for lineup item: %O at %d',
|
||||
lineupItem,
|
||||
t0,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async clearPlayback() {
|
||||
// return await this.persistentChannelCache.clearStreamPlayItem(channelId);
|
||||
}
|
||||
|
||||
// Is this necessary??
|
||||
clear() {
|
||||
// this.configCache = {};
|
||||
// this.cache = {};
|
||||
// this.channelNumbers = undefined;
|
||||
}
|
||||
}
|
||||
@@ -27,12 +27,10 @@ import { ContainerModule } from 'inversify';
|
||||
import type { IProgramDB } from '../db/interfaces/IProgramDB.ts';
|
||||
import type { ISettingsDB } from '../db/interfaces/ISettingsDB.ts';
|
||||
import type { FFmpegFactory } from '../ffmpeg/FFmpegModule.ts';
|
||||
import { FillerPicker } from '../services/FillerPicker.ts';
|
||||
import { FillerPickerV2 } from '../services/scheduling/FillerPickerV2.ts';
|
||||
import type { UpdatePlexPlayStatusScheduledTaskFactory } from '../tasks/plex/UpdatePlexPlayStatusTask.ts';
|
||||
import { UpdatePlexPlayStatusScheduledTask } from '../tasks/plex/UpdatePlexPlayStatusTask.ts';
|
||||
import { bindFactoryFunc } from '../util/inject.ts';
|
||||
import { PersistentChannelCache } from './ChannelCache.ts';
|
||||
import type { ProgramStreamFactory } from './ProgramStreamFactory.ts';
|
||||
import { ExternalStreamDetailsFetcherFactory } from './StreamDetailsFetcher.ts';
|
||||
import { EmbyProgramStream } from './emby/EmbyProgramStream.ts';
|
||||
@@ -253,16 +251,10 @@ const configure: interfaces.ContainerModuleCallBack = (bind) => {
|
||||
|
||||
bind(ExternalStreamDetailsFetcherFactory).toSelf().inSingletonScope();
|
||||
|
||||
bind(PersistentChannelCache).toSelf().inSingletonScope();
|
||||
|
||||
bind(KEYS.FillerPicker)
|
||||
.to(FillerPicker)
|
||||
.inSingletonScope()
|
||||
.whenTargetIsDefault();
|
||||
bind(KEYS.FillerPicker)
|
||||
.to(FillerPickerV2)
|
||||
.inSingletonScope()
|
||||
.whenTargetNamed('FillerPickerV2');
|
||||
.whenTargetIsDefault();
|
||||
};
|
||||
|
||||
class StreamModule extends ContainerModule {
|
||||
|
||||
@@ -13,7 +13,6 @@ import { IProgramDB } from '../db/interfaces/IProgramDB.ts';
|
||||
import { calculateStartTimeOffsets } from '../db/lineupUtil.ts';
|
||||
import { ProgramPlayHistoryDB } from '../db/ProgramPlayHistoryDB.ts';
|
||||
import { MediaSourceId } from '../db/schema/base.ts';
|
||||
import { IStreamLineupCache } from '../interfaces/IStreamLineupCache.ts';
|
||||
import { IFillerPicker } from '../services/interfaces/IFillerPicker.ts';
|
||||
import {
|
||||
createChannelOrm,
|
||||
@@ -31,7 +30,6 @@ describe('StreamProgramCalculator', () => {
|
||||
const channelDB = mock<IChannelDB>();
|
||||
const programDB = mock<IProgramDB>();
|
||||
const fillerPicker = mock<IFillerPicker>();
|
||||
const channelCache = mock<IStreamLineupCache>();
|
||||
const playHistoryDB = mock<ProgramPlayHistoryDB>();
|
||||
|
||||
const startTime = dayjs(new Date(2025, 8, 17, 8));
|
||||
@@ -106,7 +104,6 @@ describe('StreamProgramCalculator', () => {
|
||||
LoggerFactory.root,
|
||||
instance(fillerDB),
|
||||
instance(channelDB),
|
||||
instance(channelCache),
|
||||
instance(programDB),
|
||||
instance(fillerPicker),
|
||||
instance(playHistoryDB),
|
||||
@@ -130,9 +127,13 @@ describe('StreamProgramCalculator', () => {
|
||||
startOffset: +dayjs.duration(16, 'minutes'),
|
||||
});
|
||||
|
||||
// Wait for async play history recording
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
verify(
|
||||
channelCache.recordPlayback(channel.uuid, +startTime, out.lineupItem),
|
||||
playHistoryDB.isProgramCurrentlyPlaying(channelId, programId1, +startTime),
|
||||
).once();
|
||||
verify(playHistoryDB.create(anything())).once();
|
||||
});
|
||||
|
||||
baseTest('getCurrentLineupItem filler lineup item', async () => {
|
||||
@@ -140,7 +141,6 @@ describe('StreamProgramCalculator', () => {
|
||||
const channelDB = mock<IChannelDB>();
|
||||
const programDB = mock<IProgramDB>();
|
||||
const fillerPicker = mock<IFillerPicker>();
|
||||
const channelCache = mock<IStreamLineupCache>();
|
||||
const playHistoryDB = mock<ProgramPlayHistoryDB>();
|
||||
|
||||
const startTime = dayjs(new Date(2025, 8, 17, 8));
|
||||
@@ -217,7 +217,6 @@ describe('StreamProgramCalculator', () => {
|
||||
LoggerFactory.root,
|
||||
instance(fillerDB),
|
||||
instance(channelDB),
|
||||
instance(channelCache),
|
||||
instance(programDB),
|
||||
instance(fillerPicker),
|
||||
instance(playHistoryDB),
|
||||
@@ -243,9 +242,13 @@ describe('StreamProgramCalculator', () => {
|
||||
type: 'commercial',
|
||||
});
|
||||
|
||||
// Wait for async play history recording
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
verify(
|
||||
channelCache.recordPlayback(channel.uuid, +startTime, out.lineupItem),
|
||||
playHistoryDB.isProgramCurrentlyPlaying(channelId, programId1, +startTime),
|
||||
).once();
|
||||
verify(playHistoryDB.create(anything())).once();
|
||||
});
|
||||
|
||||
baseTest('getCurrentLineupItem loop filler lineup item', async () => {
|
||||
@@ -253,7 +256,6 @@ describe('StreamProgramCalculator', () => {
|
||||
const channelDB = mock<IChannelDB>();
|
||||
const programDB = mock<IProgramDB>();
|
||||
const fillerPicker = mock<IFillerPicker>();
|
||||
const channelCache = mock<IStreamLineupCache>();
|
||||
const playHistoryDB = mock<ProgramPlayHistoryDB>();
|
||||
|
||||
const startTime = dayjs(new Date(2025, 8, 17, 8));
|
||||
@@ -330,7 +332,6 @@ describe('StreamProgramCalculator', () => {
|
||||
LoggerFactory.root,
|
||||
instance(fillerDB),
|
||||
instance(channelDB),
|
||||
instance(channelCache),
|
||||
instance(programDB),
|
||||
instance(fillerPicker),
|
||||
instance(playHistoryDB),
|
||||
@@ -355,9 +356,13 @@ describe('StreamProgramCalculator', () => {
|
||||
duration: +dayjs.duration(22, 'minutes'),
|
||||
});
|
||||
|
||||
// Wait for async play history recording
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
verify(
|
||||
channelCache.recordPlayback(channel.uuid, +startTime, out.lineupItem),
|
||||
playHistoryDB.isProgramCurrentlyPlaying(channelId, programId1, +startTime),
|
||||
).once();
|
||||
verify(playHistoryDB.create(anything())).once();
|
||||
});
|
||||
|
||||
baseTest('records play history for new playback', async () => {
|
||||
@@ -365,7 +370,6 @@ describe('StreamProgramCalculator', () => {
|
||||
const channelDB = mock<IChannelDB>();
|
||||
const programDB = mock<IProgramDB>();
|
||||
const fillerPicker = mock<IFillerPicker>();
|
||||
const channelCache = mock<IStreamLineupCache>();
|
||||
const playHistoryDB = mock<ProgramPlayHistoryDB>();
|
||||
|
||||
const startTime = dayjs(new Date(2025, 8, 17, 8));
|
||||
@@ -440,7 +444,6 @@ describe('StreamProgramCalculator', () => {
|
||||
LoggerFactory.root,
|
||||
instance(fillerDB),
|
||||
instance(channelDB),
|
||||
instance(channelCache),
|
||||
instance(programDB),
|
||||
instance(fillerPicker),
|
||||
instance(playHistoryDB),
|
||||
@@ -475,7 +478,6 @@ describe('StreamProgramCalculator', () => {
|
||||
const channelDB = mock<IChannelDB>();
|
||||
const programDB = mock<IProgramDB>();
|
||||
const fillerPicker = mock<IFillerPicker>();
|
||||
const channelCache = mock<IStreamLineupCache>();
|
||||
const playHistoryDB = mock<ProgramPlayHistoryDB>();
|
||||
|
||||
const startTime = dayjs(new Date(2025, 8, 17, 8));
|
||||
@@ -547,7 +549,6 @@ describe('StreamProgramCalculator', () => {
|
||||
LoggerFactory.root,
|
||||
instance(fillerDB),
|
||||
instance(channelDB),
|
||||
instance(channelCache),
|
||||
instance(programDB),
|
||||
instance(fillerPicker),
|
||||
instance(playHistoryDB),
|
||||
|
||||
@@ -7,7 +7,7 @@ import { binarySearchRange } from '@/util/binarySearch.js';
|
||||
import { type Logger } from '@/util/logging/LoggerFactory.js';
|
||||
import constants from '@tunarr/shared/constants';
|
||||
import dayjs from 'dayjs';
|
||||
import { inject, injectable, named } from 'inversify';
|
||||
import { inject, injectable } from 'inversify';
|
||||
import { first, inRange, isEmpty, isNil, isNull, sumBy } from 'lodash-es';
|
||||
import { Lineup, LineupItem } from '../db/derived_types/Lineup.ts';
|
||||
import {
|
||||
@@ -23,9 +23,7 @@ import { IFillerListDB } from '../db/interfaces/IFillerListDB.ts';
|
||||
import { IProgramDB } from '../db/interfaces/IProgramDB.ts';
|
||||
import { ProgramPlayHistoryDB } from '../db/ProgramPlayHistoryDB.ts';
|
||||
import { OneDayMillis } from '../ffmpeg/builder/constants.ts';
|
||||
import { IStreamLineupCache } from '../interfaces/IStreamLineupCache.ts';
|
||||
import { IFillerPicker } from '../services/interfaces/IFillerPicker.ts';
|
||||
import { FillerPickerV2 } from '../services/scheduling/FillerPickerV2.ts';
|
||||
import { WrappedError } from '../types/errors.ts';
|
||||
import { devAssert } from '../util/debug.ts';
|
||||
import { isNonEmptyString } from '../util/index.js';
|
||||
@@ -77,10 +75,8 @@ export class StreamProgramCalculator {
|
||||
@inject(KEYS.Logger) private logger: Logger,
|
||||
@inject(KEYS.FillerListDB) private fillerDB: IFillerListDB,
|
||||
@inject(KEYS.ChannelDB) private channelDB: IChannelDB,
|
||||
@inject(KEYS.ChannelCache) private channelCache: IStreamLineupCache,
|
||||
@inject(KEYS.ProgramDB) private programDB: IProgramDB,
|
||||
@inject(KEYS.FillerPicker)
|
||||
@named(FillerPickerV2.name)
|
||||
private fillerPicker: IFillerPicker,
|
||||
@inject(ProgramPlayHistoryDB)
|
||||
private programPlayHistoryDB: ProgramPlayHistoryDB,
|
||||
@@ -134,15 +130,6 @@ export class StreamProgramCalculator {
|
||||
);
|
||||
|
||||
if (redirectChannels.includes(currentProgram.program.channel)) {
|
||||
await this.channelCache.recordPlayback(channelContext.uuid, startTime, {
|
||||
type: 'error',
|
||||
error:
|
||||
'Recursive channel redirect found: ' + redirectChannels.join(', '),
|
||||
duration: 60_000,
|
||||
streamDuration: 60_000,
|
||||
startOffset: 0,
|
||||
programBeginMs: req.startTime,
|
||||
});
|
||||
}
|
||||
|
||||
const nextChannelId = currentProgram.program.channel;
|
||||
@@ -239,12 +226,6 @@ export class StreamProgramCalculator {
|
||||
this.logger.trace('Got lineup item: %O', lineupItem);
|
||||
}
|
||||
|
||||
await this.channelCache.recordPlayback(
|
||||
channel.uuid,
|
||||
req.startTime,
|
||||
lineupItem,
|
||||
);
|
||||
|
||||
// Record play history for content-backed items (programs and commercials/fillers)
|
||||
// Only record if this is a new playback (not a duplicate request for an already-playing program)
|
||||
if (
|
||||
|
||||
Reference in New Issue
Block a user