From 120392e11b6f78ade13c8357abbcf314a1c8d124 Mon Sep 17 00:00:00 2001 From: bybrooklyn Date: Tue, 31 Mar 2026 23:01:09 -0400 Subject: [PATCH] updated UI to remove odd flashing --- src/config.rs | 2 + src/db.rs | 71 ++++++++++++++-- src/media/processor.rs | 82 +++++++++++++------ src/server/mod.rs | 15 +++- src/server/scan.rs | 29 +++++++ web/src/components/JobManager.tsx | 12 ++- web/src/components/SettingsPanel.tsx | 6 +- web/src/components/WatchFolders.tsx | 69 ++++++---------- web/src/components/setup/LibraryStep.tsx | 8 +- .../components/ui/ServerDirectoryPicker.tsx | 2 +- 10 files changed, 207 insertions(+), 89 deletions(-) diff --git a/src/config.rs b/src/config.rs index ff66831..2a8940f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -367,6 +367,8 @@ pub(crate) fn default_tonemap_desat() -> f32 { pub struct NotificationsConfig { pub enabled: bool, #[serde(default)] + pub allow_local_notifications: bool, + #[serde(default)] pub targets: Vec, #[serde(default)] pub webhook_url: Option, diff --git a/src/db.rs b/src/db.rs index cbc3684..3066645 100644 --- a/src/db.rs +++ b/src/db.rs @@ -830,16 +830,41 @@ impl Db { .await } - /// Returns all jobs whose filename stem appears more than - /// once across the library. Groups by stem, filtered to - /// only non-cancelled jobs. Grouping and path parsing is - /// done in Rust using std::path::Path. pub async fn get_duplicate_candidates(&self) -> Result> { timed_query("get_duplicate_candidates", || async { + // Find stems that appear more than once, then + // fetch only those jobs — never load the full + // library into memory. + // + // SQLite doesn't have a built-in stem function, + // so we use a subquery: find all input_path + // values that share the same basename (last + // path segment after the final '/'), keeping + // only those that appear in more than one row. let rows: Vec = sqlx::query_as( - "SELECT id, input_path, status FROM jobs - WHERE status NOT IN ('cancelled') - ORDER BY input_path ASC", + "SELECT id, input_path, status + FROM jobs + WHERE status NOT IN ('cancelled') + AND ( + SELECT COUNT(*) + FROM jobs j2 + WHERE j2.status NOT IN ('cancelled') + AND SUBSTR( + j2.input_path, + INSTR( + REPLACE(j2.input_path, + '\\', '/'), '/' + ) + 1 + ) = + SUBSTR( + input_path, + INSTR( + REPLACE(input_path, + '\\', '/'), '/' + ) + 1 + ) + ) > 1 + ORDER BY input_path ASC", ) .fetch_all(&self.pool) .await?; @@ -1157,6 +1182,38 @@ impl Db { .await } + pub async fn get_jobs_for_analysis_batch(&self, offset: i64, limit: i64) -> Result> { + timed_query("get_jobs_for_analysis_batch", || async { + let rows: Vec = sqlx::query_as( + "SELECT j.id, j.input_path, j.output_path, + j.status, + (SELECT reason FROM decisions + WHERE job_id = j.id + ORDER BY created_at DESC LIMIT 1) + as decision_reason, + COALESCE(j.priority, 0) as priority, + COALESCE(CAST(j.progress AS REAL), + 0.0) as progress, + COALESCE(j.attempt_count, 0) + as attempt_count, + (SELECT vmaf_score FROM encode_stats + WHERE job_id = j.id) as vmaf_score, + j.created_at, j.updated_at + FROM jobs j + WHERE j.status IN ('queued', 'failed') + AND j.archived = 0 + ORDER BY j.priority DESC, j.created_at ASC + LIMIT ? OFFSET ?", + ) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; + Ok(rows) + }) + .await + } + pub async fn get_jobs_by_ids(&self, ids: &[i64]) -> Result> { if ids.is_empty() { return Ok(Vec::new()); diff --git a/src/media/processor.rs b/src/media/processor.rs index 40eb633..340294d 100644 --- a/src/media/processor.rs +++ b/src/media/processor.rs @@ -194,42 +194,68 @@ impl Agent { }; self.set_boot_analyzing(true); - info!("Auto-analysis: scanning and analyzing pending jobs..."); if let Err(e) = self.db.reset_interrupted_jobs().await { tracing::warn!("Auto-analysis: could not reset stuck jobs: {e}"); } - let jobs = match self.db.get_jobs_for_analysis().await { - Ok(j) => j, - Err(e) => { - error!("Auto-analysis: failed to fetch jobs: {e}"); - self.set_boot_analyzing(false); - return; - } - }; + let batch_size: i64 = 100; + let mut offset: i64 = 0; + let mut total_analyzed: usize = 0; - if jobs.is_empty() { - info!("Auto-analysis: no jobs pending analysis."); - self.set_boot_analyzing(false); - return; - } - - info!("Auto-analysis: analyzing {} job(s)...", jobs.len()); - - for job in jobs { - let pipeline = self.pipeline(); - match pipeline.analyze_job_only(job).await { - Ok(_) => {} + loop { + let batch = match self + .db + .get_jobs_for_analysis_batch(offset, batch_size) + .await + { + Ok(b) => b, Err(e) => { - tracing::warn!("Auto-analysis: job analysis failed: {e:?}"); + error!("Auto-analysis: failed to fetch batch at offset {offset}: {e}"); + break; } + }; + + if batch.is_empty() { + break; + } + + let batch_len = batch.len(); + info!( + "Auto-analysis: analyzing batch of {} job(s) (offset {})...", + batch_len, offset + ); + + for job in batch { + let pipeline = self.pipeline(); + match pipeline.analyze_job_only(job).await { + Ok(_) => {} + Err(e) => { + tracing::warn!("Auto-analysis: job analysis failed: {e:?}"); + } + } + } + + total_analyzed += batch_len; + offset += batch_size; + + // If batch was smaller than batch_size we're done + if batch_len < batch_size as usize { + break; } } self.set_boot_analyzing(false); - info!("Auto-analysis: complete."); + + if total_analyzed == 0 { + info!("Auto-analysis: no jobs pending analysis."); + } else { + info!( + "Auto-analysis: complete. Analyzed {} job(s) total.", + total_analyzed + ); + } } pub async fn current_mode(&self) -> crate::config::EngineMode { @@ -365,11 +391,19 @@ impl Agent { let agent = self.clone(); let counter = self.in_flight_jobs.clone(); tokio::spawn(async move { + struct InFlightGuard(Arc); + impl Drop for InFlightGuard { + fn drop(&mut self) { + self.0.fetch_sub(1, Ordering::SeqCst); + } + } + + let _guard = InFlightGuard(counter); let _permit = permit; if let Err(e) = agent.process_job(job).await { error!("Job processing error: {}", e); } - counter.fetch_sub(1, Ordering::SeqCst); + // _guard drops here automatically, even on panic }); } Ok(None) => { diff --git a/src/server/mod.rs b/src/server/mod.rs index dcfc380..7e289d6 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -347,6 +347,10 @@ fn app_router(state: Arc) -> Router { "/api/settings/watch-dirs", get(get_watch_dirs_handler).post(add_watch_dir_handler), ) + .route( + "/api/settings/folders", + post(sync_watch_dirs_handler), + ) .route( "/api/settings/watch-dirs/:id", delete(remove_watch_dir_handler), @@ -675,7 +679,10 @@ pub(crate) fn normalize_schedule_time(value: &str) -> Option { Some(format!("{:02}:{:02}", hour, minute)) } -pub(crate) async fn validate_notification_url(raw: &str) -> std::result::Result<(), String> { +pub(crate) async fn validate_notification_url( + raw: &str, + allow_local: bool, +) -> std::result::Result<(), String> { let url = reqwest::Url::parse(raw).map_err(|_| "endpoint_url must be a valid URL".to_string())?; match url.scheme() { @@ -693,12 +700,12 @@ pub(crate) async fn validate_notification_url(raw: &str) -> std::result::Result< .host_str() .ok_or_else(|| "endpoint_url must include a host".to_string())?; - if host.eq_ignore_ascii_case("localhost") { + if !allow_local && host.eq_ignore_ascii_case("localhost") { return Err("endpoint_url host is not allowed".to_string()); } if let Ok(ip) = host.parse::() { - if is_private_ip(ip) { + if !allow_local && is_private_ip(ip) { return Err("endpoint_url host is not allowed".to_string()); } } else { @@ -713,7 +720,7 @@ pub(crate) async fn validate_notification_url(raw: &str) -> std::result::Result< .map_err(|_| "endpoint_url host could not be resolved".to_string())?; for addr in addrs { resolved = true; - if is_private_ip(addr.ip()) { + if !allow_local && is_private_ip(addr.ip()) { return Err("endpoint_url host is not allowed".to_string()); } } diff --git a/src/server/scan.rs b/src/server/scan.rs index 1efae01..ff36f61 100644 --- a/src/server/scan.rs +++ b/src/server/scan.rs @@ -298,6 +298,35 @@ pub(crate) async fn add_watch_dir_handler( } } +#[derive(Deserialize)] +pub(crate) struct SyncWatchDirsPayload { + dirs: Vec, +} + +pub(crate) async fn sync_watch_dirs_handler( + State(state): State>, + axum::Json(payload): axum::Json, +) -> impl IntoResponse { + let mut next_config = state.config.read().await.clone(); + next_config.scanner.extra_watch_dirs = payload.dirs; + + if let Err(response) = save_config_or_response(&state, &next_config).await { + return *response; + } + + { + let mut config = state.config.write().await; + *config = next_config; + } + + refresh_file_watcher(&state).await; + + match state.db.get_watch_dirs().await { + Ok(dirs) => axum::Json(dirs).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + } +} + pub(crate) async fn remove_watch_dir_handler( State(state): State>, Path(id): Path, diff --git a/web/src/components/JobManager.tsx b/web/src/components/JobManager.tsx index 9ddc002..9383a04 100644 --- a/web/src/components/JobManager.tsx +++ b/web/src/components/JobManager.tsx @@ -432,7 +432,17 @@ export default function JobManager() { } const data = await apiJson(`/api/jobs/table?${params}`); - setJobs(data); + setJobs((prev) => + data.map((serverJob) => { + const local = prev.find((j) => j.id === serverJob.id); + const terminal = ["completed", "skipped", "failed", "cancelled"]; + if (local && terminal.includes(local.status)) { + // Keep the terminal state from SSE to prevent flickering back to a stale poll state. + return { ...serverJob, status: local.status }; + } + return serverJob; + }) + ); setActionError(null); } catch (e) { const message = isApiError(e) ? e.message : "Failed to fetch jobs"; diff --git a/web/src/components/SettingsPanel.tsx b/web/src/components/SettingsPanel.tsx index e598e35..fb7fed5 100644 --- a/web/src/components/SettingsPanel.tsx +++ b/web/src/components/SettingsPanel.tsx @@ -74,9 +74,9 @@ export default function SettingsPanel() { navItemRefs.current[tab.id] = node; }} onClick={() => paginate(tab.id)} - className={`w-full flex items-center gap-3 px-4 py-3 rounded-md text-sm font-bold transition-all duration-200 group ${isActive - ? "text-helios-ink bg-helios-surface-soft shadow-sm border border-helios-line/20" - : "text-helios-slate hover:text-helios-ink hover:bg-helios-surface-soft/50" + className={`w-full flex items-center gap-3 px-4 py-3 rounded-md text-sm font-bold border transition-all duration-200 group ${isActive + ? "text-helios-ink bg-helios-surface-soft shadow-sm border-helios-line/20" + : "text-helios-slate border-transparent hover:text-helios-ink hover:bg-helios-surface-soft/50" }`} > diff --git a/web/src/components/WatchFolders.tsx b/web/src/components/WatchFolders.tsx index 996b013..c78f617 100644 --- a/web/src/components/WatchFolders.tsx +++ b/web/src/components/WatchFolders.tsx @@ -169,33 +169,26 @@ export default function WatchFolders() { } try { - // Add to BOTH config (canonical) and DB (profiles) const bundle = await apiJson("/api/settings/bundle"); - if (!bundle.settings.scanner.directories.includes(normalized)) { - await apiAction("/api/settings/bundle", { - method: "PUT", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - ...bundle.settings, - scanner: { - ...bundle.settings.scanner, - directories: [...bundle.settings.scanner.directories, normalized], - }, - }), - }); - } - - try { - await apiAction("/api/settings/watch-dirs", { + const currentDirs = bundle.settings.scanner.directories; + + if (currentDirs.includes(normalized)) { + // Even if it's in config, sync it to ensure it's in DB for profiles + await apiAction("/api/settings/folders", { method: "POST", headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ path: normalized, is_recursive: true }), + body: JSON.stringify({ + dirs: currentDirs.map(d => ({ path: d, is_recursive: true })) + }), + }); + } else { + await apiAction("/api/settings/folders", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + dirs: [...currentDirs, normalized].map(d => ({ path: d, is_recursive: true })) + }), }); - } catch (innerE) { - // If it's just a duplicate DB error we can ignore it since we successfully added to canonical - if (!(isApiError(innerE) && innerE.status === 409)) { - throw innerE; - } } setDirInput(""); @@ -214,30 +207,16 @@ export default function WatchFolders() { if (!dir) return; try { - // Remove from canonical config if present const bundle = await apiJson("/api/settings/bundle"); - const filteredDirs = bundle.settings.scanner.directories.filter(candidate => candidate !== dir.path); + const filteredDirs = bundle.settings.scanner.directories.filter(candidate => candidate !== dirPath); - if (filteredDirs.length !== bundle.settings.scanner.directories.length) { - await apiAction("/api/settings/bundle", { - method: "PUT", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - ...bundle.settings, - scanner: { - ...bundle.settings.scanner, - directories: filteredDirs, - }, - }), - }); - } - - // Remove from DB if it has a real ID - if (dir.id > 0) { - await apiAction(`/api/settings/watch-dirs/${dir.id}`, { - method: "DELETE", - }); - } + await apiAction("/api/settings/folders", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + dirs: filteredDirs.map(d => ({ path: d, is_recursive: true })) + }), + }); setError(null); await fetchDirs(); diff --git a/web/src/components/setup/LibraryStep.tsx b/web/src/components/setup/LibraryStep.tsx index 586d307..6def716 100644 --- a/web/src/components/setup/LibraryStep.tsx +++ b/web/src/components/setup/LibraryStep.tsx @@ -15,7 +15,7 @@ interface LibraryStepProps { } interface FsBreadcrumb { - name: string; + label: string; path: string; } @@ -193,7 +193,7 @@ export default function LibraryStep({ {pickerOpen ? ( -
+
@@ -245,7 +245,7 @@ export default function LibraryStep({ : "rounded-lg px-2 py-1 transition-colors hover:bg-helios-surface-soft hover:text-helios-ink" } > - {crumb.name.replace(/^\//, "")} + {crumb.label.replace(/^\//, "")}
); @@ -316,7 +316,7 @@ export default function LibraryStep({ .. - Go up to {parentBreadcrumb.name.replace(/^\//, "")} + Go up to {parentBreadcrumb.label.replace(/^\//, "")}
diff --git a/web/src/components/ui/ServerDirectoryPicker.tsx b/web/src/components/ui/ServerDirectoryPicker.tsx index 5d53e1a..21ca4c5 100644 --- a/web/src/components/ui/ServerDirectoryPicker.tsx +++ b/web/src/components/ui/ServerDirectoryPicker.tsx @@ -117,7 +117,7 @@ export default function ServerDirectoryPicker({ />
-
+