mirror of
https://github.com/bybrooklyn/alchemist.git
synced 2026-04-18 01:43:34 -04:00
Fix audio planning and stop infinite analysis loops
This commit is contained in:
@@ -1202,6 +1202,10 @@ impl Db {
|
||||
FROM jobs j
|
||||
WHERE j.status IN ('queued', 'failed')
|
||||
AND j.archived = 0
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM decisions d
|
||||
WHERE d.job_id = j.id
|
||||
)
|
||||
ORDER BY j.priority DESC, j.created_at ASC
|
||||
LIMIT ? OFFSET ?",
|
||||
)
|
||||
|
||||
@@ -568,7 +568,7 @@ async fn run() -> Result<()> {
|
||||
}
|
||||
|
||||
// Now analyze all queued + failed jobs
|
||||
scan_agent.analyze_pending_jobs().await;
|
||||
scan_agent.analyze_pending_jobs_boot().await;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -491,24 +491,21 @@ impl Analyzer {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Transcode if it's a "heavy" codec or very high bitrate
|
||||
let heavy_codecs = ["truehd", "dts-hd", "flac", "pcm_s24le", "pcm_s16le"];
|
||||
if heavy_codecs.contains(&stream.codec_name.to_lowercase().as_str()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
let bitrate = stream
|
||||
.bit_rate
|
||||
.as_ref()
|
||||
.and_then(|b| b.parse::<u64>().ok())
|
||||
.unwrap_or(0);
|
||||
|
||||
// If bitrate > 640kbps (standard AC3 max), maybe transcode?
|
||||
if bitrate > 640000 {
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
// Only transcode lossless or exotic heavy codecs.
|
||||
// Standard compressed codecs (eac3, ac3, dts) copy
|
||||
// fine into MKV regardless of bitrate — eac3 Atmos
|
||||
// at 768 kbps is normal and should not be transcoded.
|
||||
let heavy_codecs = [
|
||||
"truehd",
|
||||
"mlp",
|
||||
"dts-hd",
|
||||
"flac",
|
||||
"pcm_s24le",
|
||||
"pcm_s16le",
|
||||
"pcm_s32le",
|
||||
"pcm_f32le",
|
||||
];
|
||||
heavy_codecs.contains(&stream.codec_name.to_lowercase().as_str())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -672,15 +669,15 @@ mod tests {
|
||||
};
|
||||
assert!(!Analyzer::should_transcode_audio(&standard));
|
||||
|
||||
let high_bitrate_ac3 = Stream {
|
||||
codec_name: "ac3".into(),
|
||||
let atmos_eac3 = Stream {
|
||||
codec_name: "eac3".into(),
|
||||
codec_type: "audio".into(),
|
||||
pix_fmt: None,
|
||||
width: None,
|
||||
height: None,
|
||||
coded_width: None,
|
||||
coded_height: None,
|
||||
bit_rate: Some("1000000".into()),
|
||||
bit_rate: Some("768000".into()),
|
||||
bits_per_raw_sample: None,
|
||||
channel_layout: None,
|
||||
channels: None,
|
||||
@@ -695,6 +692,31 @@ mod tests {
|
||||
color_space: None,
|
||||
color_range: None,
|
||||
};
|
||||
assert!(Analyzer::should_transcode_audio(&high_bitrate_ac3));
|
||||
assert!(!Analyzer::should_transcode_audio(&atmos_eac3));
|
||||
|
||||
let lossless_pcm = Stream {
|
||||
codec_name: "pcm_s32le".into(),
|
||||
codec_type: "audio".into(),
|
||||
pix_fmt: None,
|
||||
width: None,
|
||||
height: None,
|
||||
coded_width: None,
|
||||
coded_height: None,
|
||||
bit_rate: Some("2000000".into()),
|
||||
bits_per_raw_sample: None,
|
||||
channel_layout: None,
|
||||
channels: None,
|
||||
avg_frame_rate: None,
|
||||
r_frame_rate: None,
|
||||
nb_frames: None,
|
||||
duration: None,
|
||||
disposition: None,
|
||||
tags: None,
|
||||
color_primaries: None,
|
||||
color_transfer: None,
|
||||
color_space: None,
|
||||
color_range: None,
|
||||
};
|
||||
assert!(Analyzer::should_transcode_audio(&lossless_pcm));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,6 +123,10 @@ impl EncoderCapabilities {
|
||||
pub fn has_libx264(&self) -> bool {
|
||||
self.has_video_encoder("libx264")
|
||||
}
|
||||
|
||||
pub fn has_libopus(&self) -> bool {
|
||||
self.audio_encoders.contains("libopus")
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FFmpegCommandBuilder<'a> {
|
||||
|
||||
@@ -167,6 +167,7 @@ impl Planner for BasicPlanner {
|
||||
analysis.metadata.audio_is_heavy,
|
||||
&container,
|
||||
audio_mode,
|
||||
&self.encoder_caps,
|
||||
);
|
||||
let audio_stream_indices = filter_audio_streams(
|
||||
&analysis.metadata.audio_streams,
|
||||
@@ -674,6 +675,7 @@ fn plan_audio(
|
||||
audio_is_heavy: bool,
|
||||
container: &str,
|
||||
audio_mode: Option<AudioMode>,
|
||||
encoder_caps: &crate::media::ffmpeg::EncoderCapabilities,
|
||||
) -> AudioStreamPlan {
|
||||
if let Some(audio_mode) = audio_mode {
|
||||
return match audio_mode {
|
||||
@@ -710,10 +712,13 @@ fn plan_audio(
|
||||
|
||||
let compatible = audio_copy_supported(container, audio_codec);
|
||||
if !compatible || audio_is_heavy {
|
||||
let codec = if container == "mp4" {
|
||||
AudioCodec::Aac
|
||||
} else {
|
||||
// Use Opus for MKV if libopus is available,
|
||||
// otherwise fall back to AAC which is always
|
||||
// present. MP4 always uses AAC.
|
||||
let codec = if container != "mp4" && encoder_caps.has_libopus() {
|
||||
AudioCodec::Opus
|
||||
} else {
|
||||
AudioCodec::Aac
|
||||
};
|
||||
return AudioStreamPlan::Transcode {
|
||||
codec,
|
||||
@@ -1185,7 +1190,9 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn heavy_audio_prefers_transcode() {
|
||||
let plan = plan_audio(Some("flac"), Some(6), true, "mkv", None);
|
||||
let mut encoder_caps = crate::media::ffmpeg::EncoderCapabilities::default();
|
||||
encoder_caps.audio_encoders.insert("libopus".to_string());
|
||||
let plan = plan_audio(Some("flac"), Some(6), true, "mkv", None, &encoder_caps);
|
||||
assert!(matches!(
|
||||
plan,
|
||||
AudioStreamPlan::Transcode {
|
||||
@@ -1195,6 +1202,19 @@ mod tests {
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heavy_audio_falls_back_to_aac_when_libopus_is_unavailable() {
|
||||
let encoder_caps = crate::media::ffmpeg::EncoderCapabilities::default();
|
||||
let plan = plan_audio(Some("flac"), Some(6), true, "mkv", None, &encoder_caps);
|
||||
assert!(matches!(
|
||||
plan,
|
||||
AudioStreamPlan::Transcode {
|
||||
codec: AudioCodec::Aac,
|
||||
..
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn vaapi_plan_includes_hwupload_filter() {
|
||||
let mut cfg = config();
|
||||
|
||||
@@ -9,7 +9,7 @@ use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::sync::{Mutex, OwnedSemaphorePermit, RwLock, Semaphore, broadcast};
|
||||
use tracing::{error, info};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
pub struct Agent {
|
||||
db: Arc<Db>,
|
||||
@@ -175,46 +175,61 @@ impl Agent {
|
||||
self.analyzing_boot.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
/// Runs analysis (ffprobe + planning decision) on all queued
|
||||
/// and failed jobs without executing any encodes. Called on
|
||||
/// startup to populate the queue with decisions before the
|
||||
/// user starts the engine.
|
||||
pub async fn analyze_pending_jobs(&self) {
|
||||
// Serialize all analysis passes to prevent
|
||||
// concurrent runs from racing on job state
|
||||
/// Boot-time analysis pass. Uses blocking acquire so
|
||||
/// it always runs to completion before the engine
|
||||
/// starts processing jobs. Called once from main.rs.
|
||||
pub async fn analyze_pending_jobs_boot(&self) {
|
||||
let _permit = match self.analysis_semaphore.acquire().await {
|
||||
Ok(p) => p,
|
||||
Err(_) => {
|
||||
tracing::warn!(
|
||||
"Auto-analysis: semaphore closed, \
|
||||
skipping."
|
||||
tracing::warn!("Auto-analysis: semaphore closed, skipping boot pass.");
|
||||
return;
|
||||
}
|
||||
};
|
||||
self._run_analysis_pass().await;
|
||||
}
|
||||
|
||||
/// Watcher-triggered analysis pass. Uses try_acquire
|
||||
/// so it skips immediately if a pass is already
|
||||
/// running — the running pass will pick up newly
|
||||
/// enqueued jobs on its next loop iteration.
|
||||
/// Called from the file watcher after each enqueue.
|
||||
pub async fn analyze_pending_jobs(&self) {
|
||||
let _permit = match self.analysis_semaphore.try_acquire() {
|
||||
Ok(p) => p,
|
||||
Err(_) => {
|
||||
debug!(
|
||||
"Auto-analysis: pass already running, \
|
||||
skipping watcher trigger."
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
self._run_analysis_pass().await;
|
||||
}
|
||||
|
||||
/// Shared analysis loop used by both boot and
|
||||
/// watcher-triggered passes. Caller holds the
|
||||
/// semaphore permit.
|
||||
async fn _run_analysis_pass(&self) {
|
||||
self.set_boot_analyzing(true);
|
||||
info!("Auto-analysis: scanning and analyzing pending jobs...");
|
||||
info!("Auto-analysis: starting pass...");
|
||||
|
||||
if let Err(e) = self.db.reset_interrupted_jobs().await {
|
||||
tracing::warn!("Auto-analysis: could not reset stuck jobs: {e}");
|
||||
}
|
||||
// NOTE: reset_interrupted_jobs is intentionally
|
||||
// NOT called here. It is a one-time startup
|
||||
// recovery operation called in main.rs before
|
||||
// this method is ever invoked. Calling it here
|
||||
// would reset jobs that are mid-analysis in a
|
||||
// concurrent pass, causing the infinite loop.
|
||||
|
||||
let batch_size: i64 = 100;
|
||||
let mut total_analyzed: usize = 0;
|
||||
|
||||
loop {
|
||||
// Always fetch from offset 0 — as jobs are
|
||||
// analyzed they leave the eligible set, so the
|
||||
// next fetch naturally returns the next batch
|
||||
// of still-unanalyzed jobs.
|
||||
let batch = match self.db.get_jobs_for_analysis_batch(0, batch_size).await {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Auto-analysis: failed to fetch \
|
||||
batch: {e}"
|
||||
);
|
||||
error!("Auto-analysis: fetch failed: {e}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
@@ -224,26 +239,21 @@ impl Agent {
|
||||
}
|
||||
|
||||
let batch_len = batch.len();
|
||||
info!(
|
||||
"Auto-analysis: analyzing batch of {} \
|
||||
job(s)...",
|
||||
batch_len
|
||||
);
|
||||
info!("Auto-analysis: analyzing {} job(s)...", batch_len);
|
||||
|
||||
for job in batch {
|
||||
let pipeline = self.pipeline();
|
||||
match pipeline.analyze_job_only(job).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Auto-analysis: job analysis \
|
||||
failed: {e:?}"
|
||||
);
|
||||
}
|
||||
Err(e) => tracing::warn!("Auto-analysis: job failed: {e:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
total_analyzed += batch_len;
|
||||
|
||||
// Yield between batches to avoid CPU spinning
|
||||
// and allow other tokio tasks to run.
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
|
||||
self.set_boot_analyzing(false);
|
||||
@@ -252,7 +262,7 @@ impl Agent {
|
||||
info!("Auto-analysis: no jobs pending analysis.");
|
||||
} else {
|
||||
info!(
|
||||
"Auto-analysis: complete. Analyzed {} job(s) total.",
|
||||
"Auto-analysis: complete. {} job(s) analyzed.",
|
||||
total_analyzed
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user