feat: Introduce Processor module to centralize job scanning, analysis, and transcoding job management.

This commit is contained in:
brooklyn
2026-01-06 19:33:02 -05:00
parent c67f03a557
commit 5356c75b26
10 changed files with 555 additions and 307 deletions

1
Cargo.lock generated
View File

@@ -3060,6 +3060,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]

View File

@@ -26,12 +26,13 @@ console_error_panic_hook = "0.1"
console_log = "1.0"
log = "0.4"
rayon = "1.10"
tokio-stream = "0.1"
tokio-stream = { version = "0.1", features = ["sync"] }
gloo-net = { version = "0.5", optional = true }
leptos_meta = "0.6"
leptos_router = "0.6"
[features]
default = ["ssr"]
hydrate = ["leptos/hydrate", "leptos_meta/hydrate", "leptos_router/hydrate", "dep:gloo-net"]
ssr = [
"leptos/ssr",

View File

@@ -3,7 +3,7 @@ use leptos_meta::*;
use leptos_router::*;
use crate::db::{Job, JobState};
#[cfg(feature = "hydrate")]
use crate::server::AlchemistEvent;
use crate::db::AlchemistEvent;
#[server(GetJobs, "/api")]
pub async fn get_jobs() -> Result<Vec<Job>, ServerFnError> {
@@ -33,9 +33,66 @@ pub async fn get_stats() -> Result<serde_json::Value, ServerFnError> {
#[server(RunScan, "/api")]
pub async fn run_scan() -> Result<(), ServerFnError> {
// This is a placeholder for triggering a scan
tracing::info!("Scan triggered via Web UI");
Ok(())
#[cfg(feature = "ssr")]
{
use axum::Extension;
use crate::Processor;
use crate::config::Config;
use std::sync::Arc;
let processor = use_context::<Extension<Arc<Processor>>>()
.ok_or_else(|| ServerFnError::new("Processor not found"))?
.0.clone();
let config = use_context::<Extension<Arc<Config>>>()
.ok_or_else(|| ServerFnError::new("Config not found"))?
.0.clone();
let dirs = config.scanner.directories.iter().map(std::path::PathBuf::from).collect();
processor.scan_and_enqueue(dirs).await.map_err(|e| ServerFnError::new(e.to_string()))
}
#[cfg(not(feature = "ssr"))]
{
unreachable!()
}
}
/// Server endpoint: request cancellation of a currently running transcode job.
#[server(CancelJob, "/api")]
pub async fn cancel_job(job_id: i64) -> Result<(), ServerFnError> {
    #[cfg(feature = "ssr")]
    {
        use axum::Extension;
        use crate::Orchestrator;
        use std::sync::Arc;
        // The orchestrator is injected into the Axum request as an extension.
        let orchestrator = use_context::<Extension<Arc<Orchestrator>>>()
            .ok_or_else(|| ServerFnError::new("Orchestrator not found"))?
            .0.clone();
        // cancel_job returns whether a running job was found and signalled.
        orchestrator
            .cancel_job(job_id)
            .then_some(())
            .ok_or_else(|| ServerFnError::new("Job not running or not found"))
    }
    #[cfg(not(feature = "ssr"))]
    {
        let _ = job_id;
        unreachable!()
    }
}
#[server(RestartJob, "/api")]
pub async fn restart_job(job_id: i64) -> Result<(), ServerFnError> {
use axum::Extension;
use crate::db::{Db, JobState};
use std::sync::Arc;
let db = use_context::<Extension<Arc<Db>>>()
.ok_or_else(|| ServerFnError::new("DB not found"))?
.0.clone();
db.update_job_status(job_id, JobState::Queued).await
.map_err(|e| ServerFnError::new(e.to_string()))
}
#[component]
@@ -60,6 +117,11 @@ pub fn App() -> impl IntoView {
fn Dashboard() -> impl IntoView {
let jobs = create_resource(|| (), |_| async move { get_jobs().await.unwrap_or_default() });
let stats = create_resource(|| (), |_| async move { get_stats().await.ok() });
// In-memory progress tracking
let (progress_map, _set_progress_map) = create_signal(std::collections::HashMap::<i64, f64>::new());
let (active_log, set_active_log) = create_signal(Option::<(i64, String)>::None);
// SSE Effect for real-time updates
#[cfg(feature = "hydrate")]
@@ -78,7 +140,24 @@ fn Dashboard() -> impl IntoView {
jobs.refetch();
stats.refetch();
}
_ => {}
AlchemistEvent::Progress { job_id, percentage, .. } => {
_set_progress_map.update(|m| {
m.insert(job_id, percentage);
});
}
AlchemistEvent::Log { job_id, message } => {
set_active_log.update(|l| {
if let Some((id, logs)) = l {
if *id == job_id {
let mut new_logs = logs.clone();
new_logs.push_str(&message);
new_logs.push('\n');
let lines: Vec<&str> = new_logs.lines().rev().take(20).collect();
*l = Some((job_id, lines.into_iter().rev().collect::<Vec<_>>().join("\n")));
}
}
});
}
}
}
}
@@ -88,6 +167,8 @@ fn Dashboard() -> impl IntoView {
});
let scan_action = create_server_action::<RunScan>();
let cancel_action = create_server_action::<CancelJob>();
let restart_action = create_server_action::<RestartJob>();
view! {
<div class="max-w-6xl mx-auto">
@@ -101,7 +182,7 @@ fn Dashboard() -> impl IntoView {
<div class="flex gap-4">
<button
on:click=move |_| scan_action.dispatch(RunScan {})
class="bg-blue-600 hover:bg-blue-700 text-white px-4 py-2 rounded-lg font-medium transition-colors"
class="bg-blue-600 hover:bg-blue-700 text-white px-6 py-2 rounded-lg font-medium transition-all shadow-lg shadow-blue-900/20 active:scale-95"
>
{move || if scan_action.pending().get() { "Scanning..." } else { "Scan Now" }}
</button>
@@ -113,57 +194,104 @@ fn Dashboard() -> impl IntoView {
let s = stats.get().flatten().unwrap_or_else(|| serde_json::json!({}));
let total = s.as_object().map(|m| m.values().filter_map(|v| v.as_i64()).sum::<i64>()).unwrap_or(0);
let completed = s.get("completed").and_then(|v| v.as_i64()).unwrap_or(0);
let processing = s.get("processing").and_then(|v| v.as_i64()).unwrap_or(0);
let processing = s.as_object().map(|m| m.iter().filter(|(k, _)| ["encoding", "analyzing"].contains(&k.as_str())).map(|(_, v)| v.as_i64().unwrap_or(0)).sum::<i64>()).unwrap_or(0);
let failed = s.get("failed").and_then(|v| v.as_i64()).unwrap_or(0);
view! {
<StatCard label="Total Jobs" value=total.to_string() color="blue" />
<StatCard label="Completed" value=completed.to_string() color="emerald" />
<StatCard label="Processing" value=processing.to_string() color="amber" />
<StatCard label="Active" value=processing.to_string() color="amber" />
<StatCard label="Failed" value=failed.to_string() color="rose" />
}
}}
</div>
<div class="bg-slate-900 border border-slate-800 rounded-xl overflow-hidden">
<div class="px-6 py-4 border-b border-slate-800 bg-slate-900/50">
<h2 class="text-xl font-semibold">"Recent Jobs"</h2>
<div class="bg-slate-900 border border-slate-800 rounded-xl overflow-hidden shadow-2xl">
<div class="px-6 py-4 border-b border-slate-800 bg-slate-900/50 flex justify-between items-center">
<h2 class="text-xl font-semibold text-slate-200">"Recent Jobs"</h2>
</div>
<div class="overflow-x-auto">
<table class="w-full text-left">
<thead class="text-xs text-slate-400 uppercase bg-slate-800/50">
<thead class="text-xs text-slate-500 uppercase bg-slate-800/30">
<tr>
<th class="px-6 py-3">"ID"</th>
<th class="px-6 py-3">"File"</th>
<th class="px-6 py-3">"Status"</th>
<th class="px-6 py-3">"Updated"</th>
<th class="px-6 py-4">"ID"</th>
<th class="px-6 py-4">"File & Progress"</th>
<th class="px-6 py-4">"Status"</th>
<th class="px-6 py-4">"Actions"</th>
</tr>
</thead>
<tbody class="divide-y divide-slate-800">
<Transition fallback=move || view! { <tr><td colspan="4" class="px-6 py-8 text-center text-slate-500">"Loading jobs..."</td></tr> }>
<tbody class="divide-y divide-slate-800/50">
<Transition fallback=move || view! { <tr><td colspan="4" class="px-6 py-12 text-center text-slate-500">"Loading jobs..."</td></tr> }>
{move || jobs.get().map(|all_jobs| {
all_jobs.into_iter().map(|job| {
let id = job.id;
let status_str = job.status.to_string();
let status_cls = match job.status {
JobState::Completed => "bg-emerald-500/10 text-emerald-400 border-emerald-500/20",
JobState::Encoding | JobState::Analyzing => "bg-amber-500/10 text-amber-400 border-amber-500/20 animate-pulse",
JobState::Failed => "bg-rose-500/10 text-rose-400 border-rose-500/20",
JobState::Encoding | JobState::Analyzing => "bg-amber-500/10 text-amber-400 border-amber-500/20",
JobState::Failed | JobState::Cancelled => "bg-rose-500/10 text-rose-400 border-rose-500/20",
_ => "bg-slate-500/10 text-slate-400 border-slate-500/20",
};
let prog = move || progress_map.with(|m| *m.get(&id).unwrap_or(&0.0));
let is_active = job.status == JobState::Encoding || job.status == JobState::Analyzing;
view! {
<tr class="hover:bg-slate-800/50 transition-colors">
<td class="px-6 py-4 font-mono text-xs text-slate-500">"#" {job.id}</td>
<tr class="hover:bg-slate-800/30 transition-colors group">
<td class="px-6 py-4 font-mono text-xs text-slate-600">"#" {id}</td>
<td class="px-6 py-4">
<div class="font-medium truncate max-w-xs">{job.input_path}</div>
<div class="text-xs text-slate-500 truncate mt-0.5">{job.decision_reason.unwrap_or_default()}</div>
<div class="font-medium truncate max-w-sm text-slate-200">{job.input_path}</div>
<div class="text-xs text-slate-500 truncate mt-1">{job.decision_reason.unwrap_or_default()}</div>
{move || if is_active {
view! {
<div class="mt-3 w-full bg-slate-800 rounded-full h-1.5 overflow-hidden">
<div
class="bg-blue-500 h-full transition-all duration-500 ease-out"
style=format!("width: {}%", prog())
></div>
</div>
<div class="text-[10px] text-blue-400 mt-1 font-mono uppercase tracking-wider">
{move || format!("{:.1}%", prog())}
</div>
}.into_view()
} else {
view! {}.into_view()
}}
</td>
<td class="px-6 py-4">
<span class=format!("px-2.5 py-1 rounded-full text-xs font-semibold border {}", status_cls)>
<span class=format!("px-2.5 py-1 rounded-full text-[10px] font-bold border uppercase tracking-tight {}", status_cls)>
{status_str}
</span>
</td>
<td class="px-6 py-4 text-sm text-slate-500">
{job.updated_at.to_rfc3339()}
<td class="px-6 py-4">
<div class="flex gap-2">
{move || if is_active {
view! {
<button
on:click=move |_| cancel_action.dispatch(CancelJob { job_id: id })
class="text-xs text-rose-400 hover:text-rose-300 font-medium px-2 py-1 rounded hover:bg-rose-500/10 transition-colors"
>
"Cancel"
</button>
<button
on:click=move |_| set_active_log.set(Some((id, String::new())))
class="text-xs text-slate-400 hover:text-slate-200 font-medium px-2 py-1 rounded hover:bg-slate-500/10 transition-colors"
>
"Logs"
</button>
}.into_view()
} else {
view! {
<button
on:click=move |_| restart_action.dispatch(RestartJob { job_id: id })
class="text-xs text-blue-400 hover:text-blue-300 font-medium px-2 py-1 rounded hover:bg-blue-500/10 transition-colors"
>
"Restart"
</button>
}.into_view()
}}
</div>
</td>
</tr>
}
@@ -174,6 +302,28 @@ fn Dashboard() -> impl IntoView {
</table>
</div>
</div>
// Log Viewer Modal
{move || active_log.get().map(|(id, logs)| {
view! {
<div class="fixed inset-0 bg-black/80 backdrop-blur-sm z-50 flex items-center justify-center p-4">
<div class="bg-slate-900 border border-slate-800 rounded-2xl w-full max-w-4xl max-h-[80vh] flex flex-col shadow-2xl">
<div class="px-6 py-4 border-b border-slate-800 flex justify-between items-center">
<h3 class="font-semibold text-lg text-slate-200">"Live Logs - Job #" {id}</h3>
<button
on:click=move |_| set_active_log.set(None)
class="text-slate-500 hover:text-slate-300"
>
<svg class="w-6 h-6" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M6 18L18 6M6 6l12 12"></path></svg>
</button>
</div>
<div class="p-6 overflow-y-auto flex-1 font-mono text-sm text-slate-400 bg-black/40">
<pre class="whitespace-pre-wrap">{logs}</pre>
</div>
</div>
</div>
}
})}
</div>
}
}
@@ -181,16 +331,16 @@ fn Dashboard() -> impl IntoView {
#[component]
fn StatCard(label: &'static str, value: String, color: &'static str) -> impl IntoView {
let accent = match color {
"blue" => "from-blue-500 to-indigo-600",
"emerald" => "from-emerald-500 to-teal-600",
"amber" => "from-amber-500 to-orange-600",
"rose" => "from-rose-500 to-pink-600",
_ => "from-slate-500 to-slate-600",
"blue" => "from-blue-400 to-indigo-500",
"emerald" => "from-emerald-400 to-teal-500",
"amber" => "from-amber-400 to-orange-500",
"rose" => "from-rose-400 to-pink-500",
_ => "from-slate-400 to-slate-500",
};
view! {
<div class="bg-slate-900 border border-slate-800 p-6 rounded-xl hover:border-slate-700 transition-colors">
<div class="text-slate-500 text-sm font-medium mb-2">{label}</div>
<div class=format!("text-3xl font-bold bg-gradient-to-br {} bg-clip-text text-transparent", accent)>
<div class="bg-slate-900 border border-slate-800 p-6 rounded-xl transition-all group hover:bg-slate-800/50 shadow-lg">
<div class="text-slate-500 text-xs font-bold uppercase tracking-widest mb-2">{label}</div>
<div class=format!("text-4xl font-black bg-gradient-to-br {} bg-clip-text text-transparent", accent)>
{value}
</div>
</div>

View File

@@ -6,6 +6,12 @@ use anyhow::Result;
pub struct Config {
    /// Transcode settings: concurrency limit and size-reduction gate (see usage sites).
    pub transcode: TranscodeConfig,
    /// Hardware acceleration preferences (device path, CPU-fallback flag).
    pub hardware: HardwareConfig,
    /// Directories the scanner walks when enqueuing media files.
    pub scanner: ScannerConfig,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ScannerConfig {
    /// Root directories scanned for media files; empty by default (see `Config::default`).
    pub directories: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -37,6 +43,9 @@ impl Default for Config {
device_path: None,
allow_cpu_fallback: false,
},
scanner: ScannerConfig {
directories: Vec::new(),
},
}
}
}

View File

@@ -13,6 +13,16 @@ pub enum JobState {
Completed,
Skipped,
Failed,
Cancelled,
}
/// Events broadcast from the backend to connected UI clients.
/// Serialized in adjacently-tagged form: `{"type": ..., "data": ...}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum AlchemistEvent {
    /// A job transitioned to a new lifecycle state.
    JobStateChanged { job_id: i64, status: JobState },
    /// Transcode progress; `time` is the raw FFmpeg `time=` timestamp string.
    Progress { job_id: i64, percentage: f64, time: String },
    /// A processing decision (encode/skip/reject) was made for a job, with its reason.
    Decision { job_id: i64, action: String, reason: String },
    /// One line of FFmpeg output, streamed for the live log view.
    Log { job_id: i64, message: String },
}
impl std::fmt::Display for JobState {
@@ -24,6 +34,7 @@ impl std::fmt::Display for JobState {
JobState::Completed => "completed",
JobState::Skipped => "skipped",
JobState::Failed => "failed",
JobState::Cancelled => "cancelled",
};
write!(f, "{}", s)
}

View File

@@ -2,12 +2,21 @@ pub mod analyzer;
pub mod config;
pub mod db;
pub mod hardware;
pub mod orchestrator;
pub mod scanner;
#[cfg(feature = "ssr")]
pub mod orchestrator;
#[cfg(feature = "ssr")]
pub mod processor;
#[cfg(feature = "ssr")]
pub mod server;
pub mod app;
#[cfg(feature = "ssr")] pub use orchestrator::Orchestrator;
#[cfg(feature = "ssr")] pub use processor::Processor;
pub use db::AlchemistEvent;
#[cfg(feature = "hydrate")]
#[wasm_bindgen::prelude::wasm_bindgen]
pub fn hydrate() {

View File

@@ -1,20 +1,17 @@
use clap::Parser;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::{error, info, warn};
use tracing_subscriber::EnvFilter;
use alchemist::{analyzer, config, db, hardware, orchestrator, scanner};
use alchemist::db::JobState;
use alchemist::server::AlchemistEvent;
use alchemist::{config, db, hardware, Processor, Orchestrator};
use tokio::sync::broadcast;
use leptos::*;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Directories to scan for media files
#[arg(required = true)]
#[arg()]
directories: Vec<PathBuf>,
/// Dry run (don't actually transcode)
@@ -65,150 +62,54 @@ async fn main() -> anyhow::Result<()> {
}
};
// 2. Initialize Database and Broadcast Channel
let db = db::Db::new("alchemist.db").await?;
// 2. Initialize Database, Broadcast Channel, Orchestrator, and Processor
let db = Arc::new(db::Db::new("alchemist.db").await?);
let (tx, _rx) = broadcast::channel(100);
info!("Database and Broadcast channel initialized.");
let orchestrator = Arc::new(Orchestrator::new());
let config = Arc::new(config);
let processor = Arc::new(Processor::new(
db.clone(),
orchestrator.clone(),
config.clone(),
hw_info,
tx.clone()
));
info!("Database and services initialized.");
// 3. Start Background Processor Loop
let proc = processor.clone();
tokio::spawn(async move {
proc.run_loop().await;
});
if args.server {
info!("Starting web server...");
alchemist::server::run_server(Arc::new(db), Arc::new(config), tx).await?;
return Ok(());
}
// 3. Scan directories and enqueue jobs
let scanner = scanner::Scanner::new();
let files = scanner.scan(args.directories);
if files.is_empty() {
info!("No media files found to process.");
alchemist::server::run_server(db, config, processor, orchestrator, tx).await?;
} else {
for scanned_file in files {
// Basic output path generation - can be refined later
let mut output_path = scanned_file.path.clone();
output_path.set_extension("av1.mkv");
if let Err(e) = db.enqueue_job(&scanned_file.path, &output_path, scanned_file.mtime).await {
error!("Failed to enqueue job for {:?}: {}", scanned_file.path, e);
}
// CLI Mode
if args.directories.is_empty() {
error!("No directories provided. Usage: alchemist <DIRECTORIES>... or alchemist --server");
return Err(anyhow::anyhow!("Missing directories for CLI mode"));
}
}
// 4. Process Queue
let orchestrator = Arc::new(orchestrator::Orchestrator::new());
let db = Arc::new(db);
let config = Arc::new(config);
let hw_info = Arc::new(hw_info);
let tx = Arc::new(tx);
let semaphore = Arc::new(Semaphore::new(config.transcode.concurrent_jobs));
let mut futures = Vec::new();
while let Some(job) = db.get_next_job().await? {
let permit = semaphore.clone().acquire_owned().await.unwrap();
processor.scan_and_enqueue(args.directories).await?;
let db = db.clone();
let orchestrator = orchestrator.clone();
let config = config.clone();
let hw_info = hw_info.clone();
let tx = tx.clone();
let dry_run = args.dry_run;
// Wait until all jobs are processed
info!("Waiting for jobs to complete...");
loop {
let stats = db.get_stats().await?;
let processing = stats.get("processing").and_then(|v| v.as_i64()).unwrap_or(0);
let queued = stats.get("queued").and_then(|v| v.as_i64()).unwrap_or(0);
let analyzing = stats.get("analyzing").and_then(|v| v.as_i64()).unwrap_or(0);
let encoding = stats.get("encoding").and_then(|v| v.as_i64()).unwrap_or(0);
let future = tokio::spawn(async move {
let _permit = permit; // Hold permit until job is done
let file_path = std::path::PathBuf::from(&job.input_path);
let output_path = std::path::PathBuf::from(&job.output_path);
info!("--- Processing Job {}: {:?} ---", job.id, file_path.file_name().unwrap_or_default());
// 1. ANALYZING
let _ = db.update_job_status(job.id, JobState::Analyzing).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Analyzing });
// Preflight Analysis
match analyzer::Analyzer::probe(&file_path) {
Ok(metadata) => {
let (should_encode, reason) = analyzer::Analyzer::should_transcode(&file_path, &metadata, &config);
if should_encode {
// 2. ENCODING
info!("Decision: ENCODE Job {} - {}", job.id, reason);
let _ = db.add_decision(job.id, "encode", &reason).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "encode".to_string(), reason: reason.clone() });
let _ = db.update_job_status(job.id, JobState::Encoding).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Encoding });
if let Err(e) = orchestrator.transcode_to_av1(&file_path, &output_path, hw_info.as_ref().as_ref(), dry_run, &metadata, Some((job.id, tx.clone()))) {
error!("Transcode failed for Job {}: {}", job.id, e);
let _ = db.add_decision(job.id, "reject", &e.to_string()).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "reject".to_string(), reason: e.to_string() });
let _ = db.update_job_status(job.id, JobState::Failed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Failed });
} else if !dry_run {
// Size Reduction Gate
let input_size = std::fs::metadata(&file_path).map(|m| m.len()).unwrap_or(0);
let output_size = std::fs::metadata(&output_path).map(|m| m.len()).unwrap_or(0);
let reduction = 1.0 - (output_size as f64 / input_size as f64);
info!("Job {}: Size reduction: {:.2}% ({} -> {})", job.id, reduction * 100.0, input_size, output_size);
if reduction < config.transcode.size_reduction_threshold {
info!("Job {}: Size reduction gate failed ({:.2}% < {:.0}%). Reverting.",
job.id, reduction * 100.0, config.transcode.size_reduction_threshold * 100.0);
std::fs::remove_file(&output_path).ok();
let _ = db.add_decision(job.id, "skip", "Inefficient: <30% reduction").await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "skip".to_string(), reason: "Inefficient: <30% reduction".to_string() });
let _ = db.update_job_status(job.id, JobState::Skipped).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Skipped });
} else {
// Integrity Check
match analyzer::Analyzer::probe(&output_path) {
Ok(_) => {
info!("Job {}: Size reduction and integrity gate passed. Job completed.", job.id);
let _ = db.update_job_status(job.id, JobState::Completed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Completed });
}
Err(e) => {
error!("Job {}: Integrity check failed for {:?}: {}", job.id, output_path, e);
std::fs::remove_file(&output_path).ok();
let _ = db.add_decision(job.id, "reject", &format!("Integrity check failed: {}", e)).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "reject".to_string(), reason: format!("Integrity check failed: {}", e) });
let _ = db.update_job_status(job.id, JobState::Failed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Failed });
}
}
}
} else {
let _ = db.update_job_status(job.id, JobState::Completed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Completed });
}
} else {
// 2. SKIPPED
info!("Decision: SKIP Job {} - {}", job.id, reason);
let _ = db.add_decision(job.id, "skip", &reason).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "skip".to_string(), reason: reason.clone() });
let _ = db.update_job_status(job.id, JobState::Skipped).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Skipped });
}
}
Err(e) => {
error!("Job {}: Failed to probe {:?}: {}", job.id, file_path, e);
let _ = db.add_decision(job.id, "reject", &e.to_string()).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "reject".to_string(), reason: e.to_string() });
let _ = db.update_job_status(job.id, JobState::Failed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Failed });
}
if processing + queued + analyzing + encoding == 0 {
break;
}
});
futures.push(future);
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
info!("All jobs processed.");
}
// Wait for all jobs to finish
futures::future::join_all(futures).await;
info!("All jobs processed.");
Ok(())
}

View File

@@ -1,155 +1,173 @@
use std::path::Path;
use std::process::{Command, Stdio};
use tokio::process::Command;
use std::process::Stdio;
use anyhow::{anyhow, Result};
use tracing::{info, error};
use tracing::{info, error, warn};
use crate::hardware::{Vendor, HardwareInfo};
use crate::server::AlchemistEvent;
use tokio::sync::broadcast;
use std::sync::Arc;
use crate::db::AlchemistEvent;
use tokio::sync::{broadcast, oneshot};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use tokio::io::{AsyncBufReadExt, BufReader};
pub struct Orchestrator {
    // Executable name/path used to invoke FFmpeg.
    ffmpeg_path: String,
    // Per-job one-shot senders; sending (or dropping via remove) signals the
    // running transcode's select! loop to kill the FFmpeg child.
    cancel_channels: Arc<Mutex<HashMap<i64, oneshot::Sender<()>>>>,
}
impl Orchestrator {
/// Create an orchestrator with the default `ffmpeg` binary and no tracked jobs.
pub fn new() -> Self {
    // Nothing is running yet, so the cancellation map starts empty.
    let cancel_channels = Arc::new(Mutex::new(HashMap::new()));
    Self {
        ffmpeg_path: String::from("ffmpeg"),
        cancel_channels,
    }
}
pub fn transcode_to_av1(&self, input: &Path, output: &Path, hw_info: Option<&HardwareInfo>, dry_run: bool, metadata: &crate::analyzer::MediaMetadata, event_target: Option<(i64, Arc<broadcast::Sender<AlchemistEvent>>)>) -> Result<()> {
let mut args = vec![
"-hide_banner".to_string(),
"-y".to_string(),
];
// Vendor-specific setup
if let Some(hw) = hw_info {
match hw.vendor {
Vendor::Intel => {
args.extend_from_slice(&[
"-hwaccel".to_string(), "qsv".to_string(),
"-qsv_device".to_string(), hw.device_path.as_ref().cloned().unwrap_or_else(|| "/dev/dri/renderD128".to_string()),
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-c:v".to_string(), "av1_qsv".to_string(),
"-preset".to_string(), "medium".to_string(),
"-global_quality".to_string(), "25".to_string(),
"-pix_fmt".to_string(), "p010le".to_string(),
]);
}
Vendor::Nvidia => {
args.extend_from_slice(&[
"-hwaccel".to_string(), "cuda".to_string(),
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-c:v".to_string(), "av1_nvenc".to_string(),
"-preset".to_string(), "p4".to_string(),
"-cq".to_string(), "24".to_string(),
"-pix_fmt".to_string(), "p010le".to_string(),
]);
}
Vendor::Amd => {
args.extend_from_slice(&[
"-hwaccel".to_string(), "vaapi".to_string(),
"-vaapi_device".to_string(), hw.device_path.as_ref().cloned().unwrap_or_else(|| "/dev/dri/renderD128".to_string()),
"-hwaccel_output_format".to_string(), "vaapi".to_string(),
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-vf".to_string(), "format=nv12|vaapi,hwupload".to_string(),
"-c:v".to_string(), "av1_vaapi".to_string(),
"-qp".to_string(), "25".to_string(),
]);
}
Vendor::Apple => {
args.extend_from_slice(&[
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-c:v".to_string(), "av1_videotoolbox".to_string(),
"-bitrate".to_string(), "6M".to_string(),
"-pix_fmt".to_string(), "p010le".to_string(),
]);
}
}
pub fn cancel_job(&self, job_id: i64) -> bool {
let mut channels = self.cancel_channels.lock().unwrap();
if let Some(tx) = channels.remove(&job_id) {
let _ = tx.send(());
true
} else {
// CPU fallback (libaom-av1) - VERY SLOW, but requested via allow_cpu_fallback
args.extend_from_slice(&[
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-c:v".to_string(), "libaom-av1".to_string(),
"-crf".to_string(), "30".to_string(),
"-cpu-used".to_string(), "8".to_string(), // Faster preset for CPU
"-pix_fmt".to_string(), "yuv420p10le".to_string(),
]);
false
}
}
// Audio and Subtitle Mapping
args.extend_from_slice(&["-map".to_string(), "0:v:0".to_string()]);
let mut audio_count = 0;
for (i, stream) in metadata.streams.iter().enumerate() {
if stream.codec_type == "audio" {
args.extend_from_slice(&["-map".to_string(), format!("0:a:{}", audio_count)]);
if crate::analyzer::Analyzer::should_transcode_audio(stream) {
args.extend_from_slice(&[format!("-c:a:{}", audio_count), "libopus".to_string(), format!("-b:a:{}", audio_count), "192k".to_string()]);
} else {
args.extend_from_slice(&[format!("-c:a:{}", audio_count), "copy".to_string()]);
}
audio_count += 1;
} else if stream.codec_type == "subtitle" {
args.extend_from_slice(&["-map".to_string(), format!("0:s:{}", i - audio_count - 1)]); // Simplified mapping
args.extend_from_slice(&["-c:s".to_string(), "copy".to_string()]);
}
}
// If no subtitles were found or mapping is complex, fallback to a simpler copy all if needed
// But for now, let's just map all and copy.
args.push(output.to_str().ok_or_else(|| anyhow!("Invalid output path"))?.to_string());
info!("Command: {} {}", self.ffmpeg_path, args.join(" "));
pub async fn transcode_to_av1(
&self,
input: &Path,
output: &Path,
hw_info: Option<&HardwareInfo>,
dry_run: bool,
metadata: &crate::analyzer::MediaMetadata,
event_target: Option<(i64, Arc<broadcast::Sender<AlchemistEvent>>)>
) -> Result<()> {
if dry_run {
info!("Dry run: Skipping actual execution.");
info!("[DRY RUN] Transcoding {:?} to {:?}", input, output);
return Ok(());
}
let mut child = Command::new(&self.ffmpeg_path)
.args(&args)
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()?;
let mut cmd = Command::new("ffmpeg");
cmd.arg("-hide_banner").arg("-y").arg("-i").arg(input);
// Map HardwareInfo to FFmpeg arguments
if let Some(info) = hw_info {
match info.vendor {
Vendor::Intel => {
cmd.arg("-c:v").arg("av1_qsv");
cmd.arg("-global_quality").arg("25");
cmd.arg("-look_ahead").arg("1");
}
Vendor::Nvidia => {
cmd.arg("-c:v").arg("av1_nvenc");
cmd.arg("-preset").arg("p4");
cmd.arg("-cq").arg("25");
}
Vendor::Apple => {
cmd.arg("-c:v").arg("av1_videotoolbox");
}
Vendor::Amd => {
cmd.arg("-c:v").arg("av1_vaapi");
}
}
} else {
cmd.arg("-c:v").arg("libaom-av1");
cmd.arg("-crf").arg("30");
cmd.arg("-cpu-used").arg("6");
}
cmd.arg("-c:a").arg("copy");
cmd.arg("-c:s").arg("copy");
cmd.arg(output);
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
info!("Executing FFmpeg: {:?}", cmd);
let mut child = cmd.spawn().map_err(|e| anyhow!("Failed to spawn FFmpeg: {}", e))?;
let stderr = child.stderr.take().ok_or_else(|| anyhow!("Failed to capture stderr"))?;
let reader = std::io::BufReader::new(stderr);
use std::io::BufRead;
let (kill_tx, kill_rx) = oneshot::channel();
let job_id = event_target.as_ref().map(|(id, _)| *id);
if let Some(id) = job_id {
self.cancel_channels.lock().unwrap().insert(id, kill_tx);
}
for line in reader.lines() {
let line = line?;
if let Some((job_id, ref tx)) = event_target {
let _ = tx.send(AlchemistEvent::Log { job_id, message: line.clone() });
if line.contains("time=") {
// simple parse for time=00:00:00.00
if let Some(time_part) = line.split("time=").nth(1) {
let time_str = time_part.split_whitespace().next().unwrap_or("");
info!("Progress: time={}", time_str);
let _ = tx.send(AlchemistEvent::Progress { job_id, percentage: 0.0, time: time_str.to_string() });
let total_duration = metadata.format.duration.parse::<f64>().unwrap_or(0.0);
let mut reader = BufReader::new(stderr).lines();
let event_target_clone = event_target.clone();
let mut kill_rx = kill_rx;
let mut killed = false;
loop {
tokio::select! {
line_res = reader.next_line() => {
match line_res {
Ok(Some(line)) => {
if let Some((job_id, ref tx)) = event_target_clone {
let _ = tx.send(AlchemistEvent::Log { job_id, message: line.clone() });
if line.contains("time=") {
if let Some(time_part) = line.split("time=").nth(1) {
let time_str = time_part.split_whitespace().next().unwrap_or("");
let current_time = Self::parse_duration(time_str);
let percentage = if total_duration > 0.0 {
(current_time / total_duration * 100.0).min(100.0)
} else {
0.0
};
let _ = tx.send(AlchemistEvent::Progress { job_id, percentage, time: time_str.to_string() });
}
}
}
}
Ok(None) => break,
Err(e) => {
error!("Error reading FFmpeg stderr: {}", e);
break;
}
}
}
} else if line.contains("time=") {
// simple parse for time=00:00:00.00
if let Some(time_part) = line.split("time=").nth(1) {
let time_str = time_part.split_whitespace().next().unwrap_or("");
info!("Progress: time={}", time_str);
_ = &mut kill_rx => {
warn!("Job {:?} cancelled. Killing FFmpeg process...", job_id);
let _ = child.kill().await;
killed = true;
if let Some(id) = job_id {
self.cancel_channels.lock().unwrap().remove(&id);
}
break;
}
}
}
let status = child.wait()?;
let status = child.wait().await?;
if let Some(id) = job_id {
self.cancel_channels.lock().unwrap().remove(&id);
}
if killed {
return Err(anyhow!("Cancelled"));
}
if status.success() {
info!("Transcode successful: {:?}", output);
Ok(())
} else {
error!("FFmpeg failed with exit code: {:?}", status.code());
Err(anyhow!("FFmpeg execution failed"))
error!("FFmpeg failed with status: {}", status);
Err(anyhow!("FFmpeg failed"))
}
}
/// Convert an FFmpeg `HH:MM:SS.ms` timestamp into fractional seconds.
/// Malformed input (wrong field count or unparseable fields) degrades to 0.0.
fn parse_duration(s: &str) -> f64 {
    let mut fields = s.split(':');
    let (h, m, sec) = match (fields.next(), fields.next(), fields.next(), fields.next()) {
        // Exactly three colon-separated fields, nothing trailing.
        (Some(h), Some(m), Some(sec), None) => (h, m, sec),
        _ => return 0.0,
    };
    let num = |part: &str| part.parse::<f64>().unwrap_or(0.0);
    num(h) * 3600.0 + num(m) * 60.0 + num(sec)
}
}

146
src/processor.rs Normal file
View File

@@ -0,0 +1,146 @@
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{broadcast, Semaphore};
use tracing::{info, error, warn};
use crate::db::{Db, JobState, AlchemistEvent};
use crate::Orchestrator;
use crate::config::Config;
use crate::hardware::HardwareInfo;
use crate::analyzer::Analyzer;
use crate::scanner::Scanner;
pub struct Processor {
    // Job queue persistence (enqueue, next-job polling, status updates).
    db: Arc<Db>,
    // Runs and cancels FFmpeg transcodes.
    orchestrator: Arc<Orchestrator>,
    config: Arc<Config>,
    // Detected hardware encoder; None falls back to the CPU encode path.
    hw_info: Arc<Option<HardwareInfo>>,
    // Broadcast channel for UI events (state changes, progress, logs).
    tx: Arc<broadcast::Sender<AlchemistEvent>>,
    // Caps the number of concurrently running transcode tasks.
    semaphore: Arc<Semaphore>,
}
impl Processor {
/// Build a processor around the shared services, capping concurrency at
/// `config.transcode.concurrent_jobs`.
pub fn new(
    db: Arc<Db>,
    orchestrator: Arc<Orchestrator>,
    config: Arc<Config>,
    hw_info: Option<HardwareInfo>,
    tx: broadcast::Sender<AlchemistEvent>,
) -> Self {
    // Read the limit before `config` is moved into the struct.
    let semaphore = Arc::new(Semaphore::new(config.transcode.concurrent_jobs));
    Self {
        db,
        orchestrator,
        config,
        hw_info: Arc::new(hw_info),
        tx: Arc::new(tx),
        semaphore,
    }
}
pub async fn scan_and_enqueue(&self, directories: Vec<PathBuf>) -> anyhow::Result<()> {
info!("Starting manual scan of directories: {:?}", directories);
let scanner = Scanner::new();
let files = scanner.scan(directories);
for scanned_file in files {
let mut output_path = scanned_file.path.clone();
output_path.set_extension("av1.mkv");
if let Err(e) = self.db.enqueue_job(&scanned_file.path, &output_path, scanned_file.mtime).await {
error!("Failed to enqueue job for {:?}: {}", scanned_file.path, e);
}
}
let _ = self.tx.send(AlchemistEvent::JobStateChanged { job_id: 0, status: JobState::Queued }); // Trigger UI refresh
Ok(())
}
pub async fn run_loop(&self) {
info!("Processor loop started.");
loop {
match self.db.get_next_job().await {
Ok(Some(job)) => {
let permit = self.semaphore.clone().acquire_owned().await.unwrap();
let db = self.db.clone();
let orchestrator = self.orchestrator.clone();
let config = self.config.clone();
let hw_info = self.hw_info.clone();
let tx = self.tx.clone();
tokio::spawn(async move {
let _permit = permit;
let file_path = PathBuf::from(&job.input_path);
let output_path = PathBuf::from(&job.output_path);
info!("--- Processing Job {}: {:?} ---", job.id, file_path.file_name().unwrap_or_default());
// 1. ANALYZING
let _ = db.update_job_status(job.id, JobState::Analyzing).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Analyzing });
match Analyzer::probe(&file_path) {
Ok(metadata) => {
let (should_encode, reason) = Analyzer::should_transcode(&file_path, &metadata, &config);
if should_encode {
info!("Decision: ENCODE Job {} - {}", job.id, reason);
let _ = db.add_decision(job.id, "encode", &reason).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "encode".to_string(), reason: reason.clone() });
let _ = db.update_job_status(job.id, JobState::Encoding).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Encoding });
match orchestrator.transcode_to_av1(&file_path, &output_path, hw_info.as_ref().as_ref(), false, &metadata, Some((job.id, tx.clone()))).await {
Ok(_) => {
// Integrity & Size Reduction check
let input_size = std::fs::metadata(&file_path).map(|m| m.len()).unwrap_or(0);
let output_size = std::fs::metadata(&output_path).map(|m| m.len()).unwrap_or(0);
let reduction = 1.0 - (output_size as f64 / input_size as f64);
if reduction < config.transcode.size_reduction_threshold {
warn!("Job {}: Size reduction gate failed ({:.2}%). Reverting.", job.id, reduction * 100.0);
std::fs::remove_file(&output_path).ok();
let _ = db.add_decision(job.id, "skip", "Inefficient reduction").await;
let _ = db.update_job_status(job.id, JobState::Skipped).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Skipped });
} else {
let _ = db.update_job_status(job.id, JobState::Completed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Completed });
}
}
Err(e) => {
if e.to_string() == "Cancelled" {
let _ = db.update_job_status(job.id, JobState::Cancelled).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Cancelled });
} else {
error!("Job {}: Transcode failed: {}", job.id, e);
let _ = db.update_job_status(job.id, JobState::Failed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Failed });
}
}
}
} else {
info!("Decision: SKIP Job {} - {}", job.id, reason);
let _ = db.add_decision(job.id, "skip", &reason).await;
let _ = db.update_job_status(job.id, JobState::Skipped).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Skipped });
}
}
Err(e) => {
error!("Job {}: Probing failed: {}", job.id, e);
let _ = db.update_job_status(job.id, JobState::Failed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Failed });
}
}
});
}
Ok(None) => {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
Err(e) => {
error!("Database error in processor loop: {}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
}
}
}

View File

@@ -1,3 +1,4 @@
#![cfg(feature = "ssr")]
use axum::{
routing::{get, post},
Router,
@@ -6,26 +7,25 @@ use axum::{
use futures::stream::Stream;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use std::convert::Infallible;
use leptos::*;
use leptos_axum::{generate_route_list, LeptosRoutes};
use std::sync::Arc;
use crate::app::*;
use crate::db::{Db, JobState};
use crate::db::{Db, AlchemistEvent};
use crate::config::Config;
use serde::{Serialize, Deserialize};
use tracing::info;
use crate::Processor;
use crate::Orchestrator;
/// Events broadcast over the server's channel to interested listeners
/// (e.g. the Web UI event stream).
///
/// Serialized with serde's adjacently-tagged representation:
/// `{"type": "...", "data": {...}}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum AlchemistEvent {
    /// A job moved to a new lifecycle state.
    JobStateChanged { job_id: i64, status: JobState },
    /// Encoding progress for a running job (percentage plus timestamp string).
    Progress { job_id: i64, percentage: f64, time: String },
    /// The analyzer decided what to do with a job ("encode"/"skip") and why.
    Decision { job_id: i64, action: String, reason: String },
    /// A free-form log line associated with a job.
    Log { job_id: i64, message: String },
}
pub async fn run_server(db: Arc<Db>, config: Arc<Config>, tx: broadcast::Sender<AlchemistEvent>) -> anyhow::Result<()> {
pub async fn run_server(
db: Arc<Db>,
config: Arc<Config>,
processor: Arc<Processor>,
orchestrator: Arc<Orchestrator>,
tx: broadcast::Sender<AlchemistEvent>
) -> anyhow::Result<()> {
let conf = get_configuration(None).await.unwrap();
let leptos_options = conf.leptos_options;
let addr = leptos_options.site_addr;
@@ -39,6 +39,8 @@ pub async fn run_server(db: Arc<Db>, config: Arc<Config>, tx: broadcast::Sender<
.with_state(leptos_options)
.layer(axum::Extension(db))
.layer(axum::Extension(config))
.layer(axum::Extension(processor))
.layer(axum::Extension(orchestrator))
.layer(axum::Extension(tx));
info!("listening on http://{}", addr);