mirror of
https://github.com/bybrooklyn/alchemist.git
synced 2026-04-18 01:43:34 -04:00
fix major security bugs.
This commit is contained in:
72
src/main.rs
72
src/main.rs
@@ -206,8 +206,16 @@ async fn run() -> Result<()> {
|
||||
config_path, e
|
||||
);
|
||||
if is_server_mode {
|
||||
warn!("Config load failed in server mode. Entering Setup Mode (Web UI).");
|
||||
(config::Config::default(), true)
|
||||
warn!(
|
||||
"Config load failed in server mode. \
|
||||
Will check for existing users before \
|
||||
entering Setup Mode."
|
||||
);
|
||||
// Do not force setup_mode=true here.
|
||||
// The user-check below will set it if needed.
|
||||
// If users exist, we start with defaults but
|
||||
// do NOT re-enable unauthenticated setup endpoints.
|
||||
(config::Config::default(), false)
|
||||
} else {
|
||||
(config::Config::default(), false)
|
||||
}
|
||||
@@ -278,6 +286,66 @@ async fn run() -> Result<()> {
|
||||
Err(err) => error!("Failed to reset interrupted jobs: {}", err),
|
||||
}
|
||||
|
||||
// Also clean up any temp files left by cancelled jobs
|
||||
// (process was killed before runtime cleanup could run)
|
||||
match db.get_jobs_by_status(db::JobState::Cancelled).await {
|
||||
Ok(cancelled_jobs) => {
|
||||
for job in cancelled_jobs {
|
||||
let temp_path = orphaned_temp_output_path(&job.output_path);
|
||||
if std::fs::metadata(&temp_path).is_ok() {
|
||||
match std::fs::remove_file(&temp_path) {
|
||||
Ok(_) => warn!(
|
||||
"Removed orphaned temp file \
|
||||
from cancelled job: {}",
|
||||
temp_path.display()
|
||||
),
|
||||
Err(err) => error!(
|
||||
"Failed to remove cancelled \
|
||||
job temp file {}: {}",
|
||||
temp_path.display(),
|
||||
err
|
||||
),
|
||||
}
|
||||
}
|
||||
// Also check for subtitle sidecar temps
|
||||
// Pattern: output_path + ".alchemist-part"
|
||||
// and output_path + ".N.alchemist-part"
|
||||
let sidecar_glob = format!("{}*.alchemist-part", job.output_path);
|
||||
// Use glob-style scan: check parent dir
|
||||
// for files matching the pattern
|
||||
if let Some(parent) = std::path::Path::new(&job.output_path).parent() {
|
||||
if let Ok(entries) = std::fs::read_dir(parent) {
|
||||
for entry in entries.flatten() {
|
||||
let name = entry.file_name().to_string_lossy().to_string();
|
||||
if name.ends_with(".alchemist-part") {
|
||||
let path = entry.path();
|
||||
match std::fs::remove_file(&path) {
|
||||
Ok(_) => warn!(
|
||||
"Removed orphaned \
|
||||
subtitle sidecar: {}",
|
||||
path.display()
|
||||
),
|
||||
Err(err) => error!(
|
||||
"Failed to remove \
|
||||
sidecar {}: {}",
|
||||
path.display(),
|
||||
err
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(sidecar_glob);
|
||||
}
|
||||
}
|
||||
Err(err) => error!(
|
||||
"Failed to load cancelled jobs for \
|
||||
cleanup: {}",
|
||||
err
|
||||
),
|
||||
}
|
||||
|
||||
let log_retention_days = config.system.log_retention_days.unwrap_or(30);
|
||||
match db.prune_old_logs(log_retention_days).await {
|
||||
Ok(count) if count > 0 => info!("Pruned {} old log rows", count),
|
||||
|
||||
@@ -201,18 +201,20 @@ impl Agent {
|
||||
}
|
||||
|
||||
let batch_size: i64 = 100;
|
||||
let mut offset: i64 = 0;
|
||||
let mut total_analyzed: usize = 0;
|
||||
|
||||
loop {
|
||||
let batch = match self
|
||||
.db
|
||||
.get_jobs_for_analysis_batch(offset, batch_size)
|
||||
.await
|
||||
{
|
||||
// Always fetch from offset 0 — as jobs are
|
||||
// analyzed they leave the eligible set, so the
|
||||
// next fetch naturally returns the next batch
|
||||
// of still-unanalyzed jobs.
|
||||
let batch = match self.db.get_jobs_for_analysis_batch(0, batch_size).await {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
error!("Auto-analysis: failed to fetch batch at offset {offset}: {e}");
|
||||
error!(
|
||||
"Auto-analysis: failed to fetch \
|
||||
batch: {e}"
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
@@ -223,8 +225,9 @@ impl Agent {
|
||||
|
||||
let batch_len = batch.len();
|
||||
info!(
|
||||
"Auto-analysis: analyzing batch of {} job(s) (offset {})...",
|
||||
batch_len, offset
|
||||
"Auto-analysis: analyzing batch of {} \
|
||||
job(s)...",
|
||||
batch_len
|
||||
);
|
||||
|
||||
for job in batch {
|
||||
@@ -232,18 +235,15 @@ impl Agent {
|
||||
match pipeline.analyze_job_only(job).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
tracing::warn!("Auto-analysis: job analysis failed: {e:?}");
|
||||
tracing::warn!(
|
||||
"Auto-analysis: job analysis \
|
||||
failed: {e:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
total_analyzed += batch_len;
|
||||
offset += batch_size;
|
||||
|
||||
// If batch was smaller than batch_size we're done
|
||||
if batch_len < batch_size as usize {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
self.set_boot_analyzing(false);
|
||||
|
||||
@@ -131,11 +131,14 @@ pub(crate) fn build_clear_session_cookie() -> String {
|
||||
}
|
||||
|
||||
fn secure_cookie_enabled() -> bool {
|
||||
// Default to false — Alchemist serves plain HTTP.
|
||||
// Set ALCHEMIST_COOKIE_SECURE=true only when
|
||||
// running behind a TLS-terminating reverse proxy.
|
||||
match std::env::var("ALCHEMIST_COOKIE_SECURE") {
|
||||
Ok(value) => matches!(
|
||||
value.trim().to_ascii_lowercase().as_str(),
|
||||
"1" | "true" | "yes" | "on"
|
||||
),
|
||||
Err(_) => !cfg!(debug_assertions),
|
||||
Err(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,7 +101,28 @@ pub(crate) async fn auth_middleware(
|
||||
return next.run(req).await;
|
||||
}
|
||||
if state.setup_required.load(Ordering::Relaxed) && path.starts_with("/api/fs/") {
|
||||
return next.run(req).await;
|
||||
// Only allow filesystem browsing from localhost
|
||||
// during setup — no account exists yet so we
|
||||
// cannot authenticate the caller.
|
||||
let connect_info = req.extensions().get::<ConnectInfo<SocketAddr>>();
|
||||
let is_local = connect_info
|
||||
.map(|ci| {
|
||||
let ip = ci.0.ip();
|
||||
ip.is_loopback()
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
if is_local {
|
||||
return next.run(req).await;
|
||||
}
|
||||
// Non-local request during setup -> 403
|
||||
return Response::builder()
|
||||
.status(StatusCode::FORBIDDEN)
|
||||
.body(axum::body::Body::from(
|
||||
"Filesystem browsing is only available \
|
||||
from localhost during setup",
|
||||
))
|
||||
.unwrap_or_else(|_| StatusCode::FORBIDDEN.into_response());
|
||||
}
|
||||
if state.setup_required.load(Ordering::Relaxed) && path == "/api/settings/bundle" {
|
||||
return next.run(req).await;
|
||||
|
||||
@@ -14,6 +14,7 @@ use std::collections::{HashMap, HashSet};
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{debug, error, info};
|
||||
@@ -97,6 +98,7 @@ pub struct FileWatcher {
|
||||
inner: Arc<std::sync::Mutex<Option<RecommendedWatcher>>>,
|
||||
tx: mpsc::UnboundedSender<PendingEvent>,
|
||||
agent: Option<Arc<crate::media::processor::Agent>>,
|
||||
analysis_pending: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl FileWatcher {
|
||||
@@ -108,8 +110,10 @@ impl FileWatcher {
|
||||
inner: Arc::new(std::sync::Mutex::new(None)),
|
||||
tx,
|
||||
agent,
|
||||
analysis_pending: Arc::new(AtomicBool::new(false)),
|
||||
};
|
||||
let agent_clone = watcher.agent.clone();
|
||||
let analysis_pending_clone = watcher.analysis_pending.clone();
|
||||
|
||||
// Process filesystem events after the target file has stabilized.
|
||||
tokio::spawn(async move {
|
||||
@@ -160,10 +164,22 @@ impl FileWatcher {
|
||||
Ok(true) => {
|
||||
info!("Auto-enqueued: {:?}", key.path);
|
||||
if let Some(agent) = &agent_clone {
|
||||
let agent = agent.clone();
|
||||
tokio::spawn(async move {
|
||||
agent.analyze_pending_jobs().await;
|
||||
});
|
||||
// Only spawn if no analysis pass is
|
||||
// already queued — coalesces bursts
|
||||
let already_pending =
|
||||
analysis_pending_clone
|
||||
.swap(true, Ordering::SeqCst);
|
||||
if !already_pending {
|
||||
let agent = agent.clone();
|
||||
let flag = analysis_pending_clone.clone();
|
||||
tokio::spawn(async move {
|
||||
// Clear the flag before starting so
|
||||
// new arrivals during analysis still
|
||||
// get their own pass
|
||||
flag.store(false, Ordering::SeqCst);
|
||||
agent.analyze_pending_jobs().await;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(false) => debug!("No queue update needed for {:?}", key.path),
|
||||
|
||||
@@ -457,10 +457,18 @@ export default function JobManager() {
|
||||
data.map((serverJob) => {
|
||||
const local = prev.find((j) => j.id === serverJob.id);
|
||||
const terminal = ["completed", "skipped", "failed", "cancelled"];
|
||||
if (local && terminal.includes(local.status)) {
|
||||
// Keep the terminal state from SSE to prevent flickering back to a stale poll state.
|
||||
const serverIsTerminal = terminal.includes(serverJob.status);
|
||||
if (
|
||||
local &&
|
||||
terminal.includes(local.status) &&
|
||||
serverIsTerminal
|
||||
) {
|
||||
// Both agree this is terminal — keep
|
||||
// local status to prevent SSE→poll flicker.
|
||||
return { ...serverJob, status: local.status };
|
||||
}
|
||||
// Server says it changed (e.g. retry queued it)
|
||||
// — trust the server.
|
||||
return serverJob;
|
||||
})
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user