mirror of
https://github.com/bybrooklyn/alchemist.git
synced 2026-04-18 01:43:34 -04:00
Implement engine auto-analysis and settings UI refinements
This commit is contained in:
@@ -27,6 +27,8 @@ pub struct Agent {
|
||||
manual_override: Arc<AtomicBool>,
|
||||
pub(crate) engine_mode: Arc<tokio::sync::RwLock<crate::config::EngineMode>>,
|
||||
dry_run: bool,
|
||||
in_flight_jobs: Arc<AtomicUsize>,
|
||||
analyzing_boot: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl Agent {
|
||||
@@ -61,6 +63,8 @@ impl Agent {
|
||||
manual_override: Arc::new(AtomicBool::new(false)),
|
||||
engine_mode: Arc::new(tokio::sync::RwLock::new(engine_mode)),
|
||||
dry_run,
|
||||
in_flight_jobs: Arc::new(AtomicUsize::new(0)),
|
||||
analyzing_boot: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,33 +160,48 @@ impl Agent {
|
||||
self.draining.store(false, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn set_boot_analyzing(&self, value: bool) {
|
||||
self.analyzing_boot.store(value, Ordering::SeqCst);
|
||||
if value {
|
||||
info!("Boot analysis started — engine claim loop paused.");
|
||||
} else {
|
||||
info!("Boot analysis complete — engine claim loop resumed.");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_boot_analyzing(&self) -> bool {
|
||||
self.analyzing_boot.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
/// Runs analysis (ffprobe + planning decision) on all queued
|
||||
/// and failed jobs without executing any encodes. Called on
|
||||
/// startup to populate the queue with decisions before the
|
||||
/// user starts the engine.
|
||||
pub async fn analyze_pending_jobs(&self) {
|
||||
self.set_boot_analyzing(true);
|
||||
|
||||
info!("Auto-analysis: scanning and analyzing pending jobs...");
|
||||
|
||||
// First trigger a full library scan to pick up new files
|
||||
if let Err(e) = self.db.reset_interrupted_jobs().await {
|
||||
tracing::warn!("Auto-analysis: could not reset stuck jobs: {e}");
|
||||
}
|
||||
|
||||
// Get all queued and failed jobs to analyze
|
||||
let jobs = match self.db.get_jobs_for_analysis().await {
|
||||
Ok(j) => j,
|
||||
Err(e) => {
|
||||
error!("Auto-analysis: failed to fetch jobs: {e}");
|
||||
self.set_boot_analyzing(false);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if jobs.is_empty() {
|
||||
info!("Auto-analysis: no jobs pending analysis.");
|
||||
self.set_boot_analyzing(false);
|
||||
return;
|
||||
}
|
||||
|
||||
info!("Auto-analysis: analyzing {} jobs...", jobs.len());
|
||||
info!("Auto-analysis: analyzing {} job(s)...", jobs.len());
|
||||
|
||||
for job in jobs {
|
||||
let pipeline = self.pipeline();
|
||||
@@ -194,6 +213,7 @@ impl Agent {
|
||||
}
|
||||
}
|
||||
|
||||
self.set_boot_analyzing(false);
|
||||
info!("Auto-analysis: complete.");
|
||||
}
|
||||
|
||||
@@ -285,23 +305,15 @@ impl Agent {
|
||||
pub async fn run_loop(self: Arc<Self>) {
|
||||
info!("Agent loop started.");
|
||||
loop {
|
||||
if self.is_paused() {
|
||||
// Block while paused OR while boot analysis runs
|
||||
if self.is_paused() || self.is_boot_analyzing() {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let permit = match self.semaphore.clone().acquire_owned().await {
|
||||
Ok(permit) => permit,
|
||||
Err(e) => {
|
||||
error!("Failed to acquire job permit: {}", e);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Check drain BEFORE acquiring a permit to eliminate the race window
|
||||
if self.is_draining() {
|
||||
drop(permit);
|
||||
if self.orchestrator.active_job_count() == 0 {
|
||||
if self.in_flight_jobs.load(Ordering::SeqCst) == 0 {
|
||||
info!(
|
||||
"Engine drain complete — all active jobs finished. Returning to paused state."
|
||||
);
|
||||
@@ -316,15 +328,33 @@ impl Agent {
|
||||
continue;
|
||||
}
|
||||
|
||||
let permit = match self.semaphore.clone().acquire_owned().await {
|
||||
Ok(permit) => permit,
|
||||
Err(e) => {
|
||||
error!("Failed to acquire job permit: {}", e);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Re-check drain after permit acquisition (belt-and-suspenders)
|
||||
if self.is_draining() {
|
||||
drop(permit);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
match self.db.claim_next_job().await {
|
||||
Ok(Some(job)) => {
|
||||
self.in_flight_jobs.fetch_add(1, Ordering::SeqCst);
|
||||
let agent = self.clone();
|
||||
|
||||
let counter = self.in_flight_jobs.clone();
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit;
|
||||
if let Err(e) = agent.process_job(job).await {
|
||||
error!("Job processing error: {}", e);
|
||||
}
|
||||
counter.fetch_sub(1, Ordering::SeqCst);
|
||||
});
|
||||
}
|
||||
Ok(None) => {
|
||||
|
||||
@@ -22,19 +22,19 @@ export default function HeaderActions() {
|
||||
|
||||
const statusConfig = {
|
||||
running: {
|
||||
dot: "bg-emerald-500 animate-pulse",
|
||||
dot: "bg-status-success animate-pulse",
|
||||
label: "Running",
|
||||
labelColor: "text-emerald-500",
|
||||
labelColor: "text-status-success",
|
||||
},
|
||||
paused: {
|
||||
dot: "bg-amber-500",
|
||||
dot: "bg-helios-solar",
|
||||
label: "Paused",
|
||||
labelColor: "text-amber-500",
|
||||
labelColor: "text-helios-solar",
|
||||
},
|
||||
draining: {
|
||||
dot: "bg-blue-400",
|
||||
label: "Draining",
|
||||
labelColor: "text-blue-400",
|
||||
dot: "bg-helios-slate animate-pulse",
|
||||
label: "Stopping",
|
||||
labelColor: "text-helios-slate",
|
||||
},
|
||||
} as const;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user