mirror of
https://github.com/bybrooklyn/alchemist.git
synced 2026-04-18 01:43:34 -04:00
updated UI to remove odd flashing
This commit is contained in:
@@ -367,6 +367,8 @@ pub(crate) fn default_tonemap_desat() -> f32 {
|
||||
pub struct NotificationsConfig {
|
||||
pub enabled: bool,
|
||||
#[serde(default)]
|
||||
pub allow_local_notifications: bool,
|
||||
#[serde(default)]
|
||||
pub targets: Vec<NotificationTargetConfig>,
|
||||
#[serde(default)]
|
||||
pub webhook_url: Option<String>,
|
||||
|
||||
71
src/db.rs
71
src/db.rs
@@ -830,16 +830,41 @@ impl Db {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns all jobs whose filename stem appears more than
|
||||
/// once across the library. Groups by stem, filtered to
|
||||
/// only non-cancelled jobs. Grouping and path parsing is
|
||||
/// done in Rust using std::path::Path.
|
||||
pub async fn get_duplicate_candidates(&self) -> Result<Vec<DuplicateCandidate>> {
|
||||
timed_query("get_duplicate_candidates", || async {
|
||||
// Find stems that appear more than once, then
|
||||
// fetch only those jobs — never load the full
|
||||
// library into memory.
|
||||
//
|
||||
// SQLite doesn't have a built-in stem function,
|
||||
// so we use a subquery: find all input_path
|
||||
// values that share the same basename (last
|
||||
// path segment after the final '/'), keeping
|
||||
// only those that appear in more than one row.
|
||||
let rows: Vec<DuplicateCandidate> = sqlx::query_as(
|
||||
"SELECT id, input_path, status FROM jobs
|
||||
WHERE status NOT IN ('cancelled')
|
||||
ORDER BY input_path ASC",
|
||||
"SELECT id, input_path, status
|
||||
FROM jobs
|
||||
WHERE status NOT IN ('cancelled')
|
||||
AND (
|
||||
SELECT COUNT(*)
|
||||
FROM jobs j2
|
||||
WHERE j2.status NOT IN ('cancelled')
|
||||
AND SUBSTR(
|
||||
j2.input_path,
|
||||
INSTR(
|
||||
REPLACE(j2.input_path,
|
||||
'\\', '/'), '/'
|
||||
) + 1
|
||||
) =
|
||||
SUBSTR(
|
||||
input_path,
|
||||
INSTR(
|
||||
REPLACE(input_path,
|
||||
'\\', '/'), '/'
|
||||
) + 1
|
||||
)
|
||||
) > 1
|
||||
ORDER BY input_path ASC",
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
@@ -1157,6 +1182,38 @@ impl Db {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_jobs_for_analysis_batch(&self, offset: i64, limit: i64) -> Result<Vec<Job>> {
|
||||
timed_query("get_jobs_for_analysis_batch", || async {
|
||||
let rows: Vec<Job> = sqlx::query_as(
|
||||
"SELECT j.id, j.input_path, j.output_path,
|
||||
j.status,
|
||||
(SELECT reason FROM decisions
|
||||
WHERE job_id = j.id
|
||||
ORDER BY created_at DESC LIMIT 1)
|
||||
as decision_reason,
|
||||
COALESCE(j.priority, 0) as priority,
|
||||
COALESCE(CAST(j.progress AS REAL),
|
||||
0.0) as progress,
|
||||
COALESCE(j.attempt_count, 0)
|
||||
as attempt_count,
|
||||
(SELECT vmaf_score FROM encode_stats
|
||||
WHERE job_id = j.id) as vmaf_score,
|
||||
j.created_at, j.updated_at
|
||||
FROM jobs j
|
||||
WHERE j.status IN ('queued', 'failed')
|
||||
AND j.archived = 0
|
||||
ORDER BY j.priority DESC, j.created_at ASC
|
||||
LIMIT ? OFFSET ?",
|
||||
)
|
||||
.bind(limit)
|
||||
.bind(offset)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
Ok(rows)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_jobs_by_ids(&self, ids: &[i64]) -> Result<Vec<Job>> {
|
||||
if ids.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
|
||||
@@ -194,42 +194,68 @@ impl Agent {
|
||||
};
|
||||
|
||||
self.set_boot_analyzing(true);
|
||||
|
||||
info!("Auto-analysis: scanning and analyzing pending jobs...");
|
||||
|
||||
if let Err(e) = self.db.reset_interrupted_jobs().await {
|
||||
tracing::warn!("Auto-analysis: could not reset stuck jobs: {e}");
|
||||
}
|
||||
|
||||
let jobs = match self.db.get_jobs_for_analysis().await {
|
||||
Ok(j) => j,
|
||||
Err(e) => {
|
||||
error!("Auto-analysis: failed to fetch jobs: {e}");
|
||||
self.set_boot_analyzing(false);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let batch_size: i64 = 100;
|
||||
let mut offset: i64 = 0;
|
||||
let mut total_analyzed: usize = 0;
|
||||
|
||||
if jobs.is_empty() {
|
||||
info!("Auto-analysis: no jobs pending analysis.");
|
||||
self.set_boot_analyzing(false);
|
||||
return;
|
||||
}
|
||||
|
||||
info!("Auto-analysis: analyzing {} job(s)...", jobs.len());
|
||||
|
||||
for job in jobs {
|
||||
let pipeline = self.pipeline();
|
||||
match pipeline.analyze_job_only(job).await {
|
||||
Ok(_) => {}
|
||||
loop {
|
||||
let batch = match self
|
||||
.db
|
||||
.get_jobs_for_analysis_batch(offset, batch_size)
|
||||
.await
|
||||
{
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
tracing::warn!("Auto-analysis: job analysis failed: {e:?}");
|
||||
error!("Auto-analysis: failed to fetch batch at offset {offset}: {e}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if batch.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
let batch_len = batch.len();
|
||||
info!(
|
||||
"Auto-analysis: analyzing batch of {} job(s) (offset {})...",
|
||||
batch_len, offset
|
||||
);
|
||||
|
||||
for job in batch {
|
||||
let pipeline = self.pipeline();
|
||||
match pipeline.analyze_job_only(job).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
tracing::warn!("Auto-analysis: job analysis failed: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
total_analyzed += batch_len;
|
||||
offset += batch_size;
|
||||
|
||||
// If batch was smaller than batch_size we're done
|
||||
if batch_len < batch_size as usize {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
self.set_boot_analyzing(false);
|
||||
info!("Auto-analysis: complete.");
|
||||
|
||||
if total_analyzed == 0 {
|
||||
info!("Auto-analysis: no jobs pending analysis.");
|
||||
} else {
|
||||
info!(
|
||||
"Auto-analysis: complete. Analyzed {} job(s) total.",
|
||||
total_analyzed
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn current_mode(&self) -> crate::config::EngineMode {
|
||||
@@ -365,11 +391,19 @@ impl Agent {
|
||||
let agent = self.clone();
|
||||
let counter = self.in_flight_jobs.clone();
|
||||
tokio::spawn(async move {
|
||||
struct InFlightGuard(Arc<AtomicUsize>);
|
||||
impl Drop for InFlightGuard {
|
||||
fn drop(&mut self) {
|
||||
self.0.fetch_sub(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
let _guard = InFlightGuard(counter);
|
||||
let _permit = permit;
|
||||
if let Err(e) = agent.process_job(job).await {
|
||||
error!("Job processing error: {}", e);
|
||||
}
|
||||
counter.fetch_sub(1, Ordering::SeqCst);
|
||||
// _guard drops here automatically, even on panic
|
||||
});
|
||||
}
|
||||
Ok(None) => {
|
||||
|
||||
@@ -347,6 +347,10 @@ fn app_router(state: Arc<AppState>) -> Router {
|
||||
"/api/settings/watch-dirs",
|
||||
get(get_watch_dirs_handler).post(add_watch_dir_handler),
|
||||
)
|
||||
.route(
|
||||
"/api/settings/folders",
|
||||
post(sync_watch_dirs_handler),
|
||||
)
|
||||
.route(
|
||||
"/api/settings/watch-dirs/:id",
|
||||
delete(remove_watch_dir_handler),
|
||||
@@ -675,7 +679,10 @@ pub(crate) fn normalize_schedule_time(value: &str) -> Option<String> {
|
||||
Some(format!("{:02}:{:02}", hour, minute))
|
||||
}
|
||||
|
||||
pub(crate) async fn validate_notification_url(raw: &str) -> std::result::Result<(), String> {
|
||||
pub(crate) async fn validate_notification_url(
|
||||
raw: &str,
|
||||
allow_local: bool,
|
||||
) -> std::result::Result<(), String> {
|
||||
let url =
|
||||
reqwest::Url::parse(raw).map_err(|_| "endpoint_url must be a valid URL".to_string())?;
|
||||
match url.scheme() {
|
||||
@@ -693,12 +700,12 @@ pub(crate) async fn validate_notification_url(raw: &str) -> std::result::Result<
|
||||
.host_str()
|
||||
.ok_or_else(|| "endpoint_url must include a host".to_string())?;
|
||||
|
||||
if host.eq_ignore_ascii_case("localhost") {
|
||||
if !allow_local && host.eq_ignore_ascii_case("localhost") {
|
||||
return Err("endpoint_url host is not allowed".to_string());
|
||||
}
|
||||
|
||||
if let Ok(ip) = host.parse::<IpAddr>() {
|
||||
if is_private_ip(ip) {
|
||||
if !allow_local && is_private_ip(ip) {
|
||||
return Err("endpoint_url host is not allowed".to_string());
|
||||
}
|
||||
} else {
|
||||
@@ -713,7 +720,7 @@ pub(crate) async fn validate_notification_url(raw: &str) -> std::result::Result<
|
||||
.map_err(|_| "endpoint_url host could not be resolved".to_string())?;
|
||||
for addr in addrs {
|
||||
resolved = true;
|
||||
if is_private_ip(addr.ip()) {
|
||||
if !allow_local && is_private_ip(addr.ip()) {
|
||||
return Err("endpoint_url host is not allowed".to_string());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -298,6 +298,35 @@ pub(crate) async fn add_watch_dir_handler(
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub(crate) struct SyncWatchDirsPayload {
|
||||
dirs: Vec<crate::config::WatchDirConfig>,
|
||||
}
|
||||
|
||||
pub(crate) async fn sync_watch_dirs_handler(
|
||||
State(state): State<Arc<AppState>>,
|
||||
axum::Json(payload): axum::Json<SyncWatchDirsPayload>,
|
||||
) -> impl IntoResponse {
|
||||
let mut next_config = state.config.read().await.clone();
|
||||
next_config.scanner.extra_watch_dirs = payload.dirs;
|
||||
|
||||
if let Err(response) = save_config_or_response(&state, &next_config).await {
|
||||
return *response;
|
||||
}
|
||||
|
||||
{
|
||||
let mut config = state.config.write().await;
|
||||
*config = next_config;
|
||||
}
|
||||
|
||||
refresh_file_watcher(&state).await;
|
||||
|
||||
match state.db.get_watch_dirs().await {
|
||||
Ok(dirs) => axum::Json(dirs).into_response(),
|
||||
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn remove_watch_dir_handler(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<i64>,
|
||||
|
||||
@@ -432,7 +432,17 @@ export default function JobManager() {
|
||||
}
|
||||
|
||||
const data = await apiJson<Job[]>(`/api/jobs/table?${params}`);
|
||||
setJobs(data);
|
||||
setJobs((prev) =>
|
||||
data.map((serverJob) => {
|
||||
const local = prev.find((j) => j.id === serverJob.id);
|
||||
const terminal = ["completed", "skipped", "failed", "cancelled"];
|
||||
if (local && terminal.includes(local.status)) {
|
||||
// Keep the terminal state from SSE to prevent flickering back to a stale poll state.
|
||||
return { ...serverJob, status: local.status };
|
||||
}
|
||||
return serverJob;
|
||||
})
|
||||
);
|
||||
setActionError(null);
|
||||
} catch (e) {
|
||||
const message = isApiError(e) ? e.message : "Failed to fetch jobs";
|
||||
|
||||
@@ -74,9 +74,9 @@ export default function SettingsPanel() {
|
||||
navItemRefs.current[tab.id] = node;
|
||||
}}
|
||||
onClick={() => paginate(tab.id)}
|
||||
className={`w-full flex items-center gap-3 px-4 py-3 rounded-md text-sm font-bold transition-all duration-200 group ${isActive
|
||||
? "text-helios-ink bg-helios-surface-soft shadow-sm border border-helios-line/20"
|
||||
: "text-helios-slate hover:text-helios-ink hover:bg-helios-surface-soft/50"
|
||||
className={`w-full flex items-center gap-3 px-4 py-3 rounded-md text-sm font-bold border transition-all duration-200 group ${isActive
|
||||
? "text-helios-ink bg-helios-surface-soft shadow-sm border-helios-line/20"
|
||||
: "text-helios-slate border-transparent hover:text-helios-ink hover:bg-helios-surface-soft/50"
|
||||
}`}
|
||||
>
|
||||
<span className="flex items-center gap-3">
|
||||
|
||||
@@ -169,33 +169,26 @@ export default function WatchFolders() {
|
||||
}
|
||||
|
||||
try {
|
||||
// Add to BOTH config (canonical) and DB (profiles)
|
||||
const bundle = await apiJson<SettingsBundleResponse>("/api/settings/bundle");
|
||||
if (!bundle.settings.scanner.directories.includes(normalized)) {
|
||||
await apiAction("/api/settings/bundle", {
|
||||
method: "PUT",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
...bundle.settings,
|
||||
scanner: {
|
||||
...bundle.settings.scanner,
|
||||
directories: [...bundle.settings.scanner.directories, normalized],
|
||||
},
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
await apiAction("/api/settings/watch-dirs", {
|
||||
const currentDirs = bundle.settings.scanner.directories;
|
||||
|
||||
if (currentDirs.includes(normalized)) {
|
||||
// Even if it's in config, sync it to ensure it's in DB for profiles
|
||||
await apiAction("/api/settings/folders", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ path: normalized, is_recursive: true }),
|
||||
body: JSON.stringify({
|
||||
dirs: currentDirs.map(d => ({ path: d, is_recursive: true }))
|
||||
}),
|
||||
});
|
||||
} else {
|
||||
await apiAction("/api/settings/folders", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
dirs: [...currentDirs, normalized].map(d => ({ path: d, is_recursive: true }))
|
||||
}),
|
||||
});
|
||||
} catch (innerE) {
|
||||
// If it's just a duplicate DB error we can ignore it since we successfully added to canonical
|
||||
if (!(isApiError(innerE) && innerE.status === 409)) {
|
||||
throw innerE;
|
||||
}
|
||||
}
|
||||
|
||||
setDirInput("");
|
||||
@@ -214,30 +207,16 @@ export default function WatchFolders() {
|
||||
if (!dir) return;
|
||||
|
||||
try {
|
||||
// Remove from canonical config if present
|
||||
const bundle = await apiJson<SettingsBundleResponse>("/api/settings/bundle");
|
||||
const filteredDirs = bundle.settings.scanner.directories.filter(candidate => candidate !== dir.path);
|
||||
const filteredDirs = bundle.settings.scanner.directories.filter(candidate => candidate !== dirPath);
|
||||
|
||||
if (filteredDirs.length !== bundle.settings.scanner.directories.length) {
|
||||
await apiAction("/api/settings/bundle", {
|
||||
method: "PUT",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
...bundle.settings,
|
||||
scanner: {
|
||||
...bundle.settings.scanner,
|
||||
directories: filteredDirs,
|
||||
},
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
// Remove from DB if it has a real ID
|
||||
if (dir.id > 0) {
|
||||
await apiAction(`/api/settings/watch-dirs/${dir.id}`, {
|
||||
method: "DELETE",
|
||||
});
|
||||
}
|
||||
await apiAction("/api/settings/folders", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
dirs: filteredDirs.map(d => ({ path: d, is_recursive: true }))
|
||||
}),
|
||||
});
|
||||
|
||||
setError(null);
|
||||
await fetchDirs();
|
||||
|
||||
@@ -15,7 +15,7 @@ interface LibraryStepProps {
|
||||
}
|
||||
|
||||
interface FsBreadcrumb {
|
||||
name: string;
|
||||
label: string;
|
||||
path: string;
|
||||
}
|
||||
|
||||
@@ -193,7 +193,7 @@ export default function LibraryStep({
|
||||
</div>
|
||||
|
||||
{pickerOpen ? (
|
||||
<div className="flex h-[420px] flex-col gap-4 overflow-hidden rounded-lg border border-helios-line/30 bg-helios-surface p-4">
|
||||
<div className="flex h-[540px] flex-col gap-4 overflow-hidden rounded-lg border border-helios-line/30 bg-helios-surface p-4">
|
||||
<div className="shrink-0 flex items-start justify-between gap-4">
|
||||
<div className="min-w-0 space-y-3">
|
||||
<div className="space-y-1">
|
||||
@@ -245,7 +245,7 @@ export default function LibraryStep({
|
||||
: "rounded-lg px-2 py-1 transition-colors hover:bg-helios-surface-soft hover:text-helios-ink"
|
||||
}
|
||||
>
|
||||
{crumb.name.replace(/^\//, "")}
|
||||
{crumb.label.replace(/^\//, "")}
|
||||
</button>
|
||||
</div>
|
||||
);
|
||||
@@ -316,7 +316,7 @@ export default function LibraryStep({
|
||||
..
|
||||
</span>
|
||||
<span className="block truncate text-xs text-helios-slate">
|
||||
Go up to {parentBreadcrumb.name.replace(/^\//, "")}
|
||||
Go up to {parentBreadcrumb.label.replace(/^\//, "")}
|
||||
</span>
|
||||
</div>
|
||||
</button>
|
||||
|
||||
@@ -117,7 +117,7 @@ export default function ServerDirectoryPicker({
|
||||
/>
|
||||
|
||||
<div className="absolute inset-0 flex items-center justify-center px-4 py-6">
|
||||
<div className="w-full max-w-5xl rounded-xl border border-helios-line/30 bg-helios-surface shadow-2xl overflow-hidden flex flex-col max-h-[min(90vh,800px)]">
|
||||
<div className="w-full max-w-7xl rounded-xl border border-helios-line/30 bg-helios-surface shadow-2xl overflow-hidden flex flex-col max-h-[min(95vh,1000px)]">
|
||||
<div className="border-b border-helios-line/20 px-6 py-5 flex items-start justify-between gap-4">
|
||||
<div>
|
||||
<div className="flex items-center gap-3">
|
||||
|
||||
Reference in New Issue
Block a user