mirror of
https://github.com/bybrooklyn/alchemist.git
synced 2026-04-18 01:43:34 -04:00
feat: add job stall detection, database connection limits, configurable local notifications, and secure proxy header handling with entrypoint support.
This commit is contained in:
11
Dockerfile
vendored
11
Dockerfile
vendored
@@ -43,6 +43,7 @@ RUN apt-get update && \
|
|||||||
va-driver-all \
|
va-driver-all \
|
||||||
libsqlite3-0 \
|
libsqlite3-0 \
|
||||||
ca-certificates \
|
ca-certificates \
|
||||||
|
gosu \
|
||||||
&& if [ "$(dpkg --print-architecture)" = "amd64" ]; then \
|
&& if [ "$(dpkg --print-architecture)" = "amd64" ]; then \
|
||||||
apt-get install -y --no-install-recommends \
|
apt-get install -y --no-install-recommends \
|
||||||
intel-media-va-driver-non-free \
|
intel-media-va-driver-non-free \
|
||||||
@@ -75,10 +76,16 @@ RUN set -e; \
|
|||||||
COPY --from=builder /app/target/release/alchemist /usr/local/bin/alchemist
|
COPY --from=builder /app/target/release/alchemist /usr/local/bin/alchemist
|
||||||
|
|
||||||
# Set environment variables
|
# Set environment variables
|
||||||
ENV LIBVA_DRIVER_NAME=iHD
|
# VA-API driver auto-detection: do NOT hardcode LIBVA_DRIVER_NAME here.
|
||||||
|
# Users can override via: docker run -e LIBVA_DRIVER_NAME=iHD ...
|
||||||
|
# Common values: iHD (Intel ≥ Broadwell), i965 (older Intel), radeonsi (AMD)
|
||||||
ENV RUST_LOG=info
|
ENV RUST_LOG=info
|
||||||
ENV ALCHEMIST_CONFIG_PATH=/app/config/config.toml
|
ENV ALCHEMIST_CONFIG_PATH=/app/config/config.toml
|
||||||
ENV ALCHEMIST_DB_PATH=/app/data/alchemist.db
|
ENV ALCHEMIST_DB_PATH=/app/data/alchemist.db
|
||||||
|
COPY entrypoint.sh /app/entrypoint.sh
|
||||||
|
RUN chmod +x /app/entrypoint.sh
|
||||||
|
|
||||||
EXPOSE 3000
|
EXPOSE 3000
|
||||||
|
|
||||||
ENTRYPOINT ["alchemist"]
|
ENTRYPOINT ["/app/entrypoint.sh"]
|
||||||
|
CMD ["alchemist"]
|
||||||
|
|||||||
31
entrypoint.sh
Normal file
31
entrypoint.sh
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
set -e
|
||||||
|
|
||||||
|
PUID=${PUID:-0}
|
||||||
|
PGID=${PGID:-0}
|
||||||
|
|
||||||
|
if [ "$PUID" -ne 0 ] && [ "$PGID" -ne 0 ]; then
|
||||||
|
echo "Starting Alchemist with UID: $PUID, GID: $PGID"
|
||||||
|
|
||||||
|
# Create group and user securely if they don't exist
|
||||||
|
if ! getent group alchemist >/dev/null; then
|
||||||
|
groupadd -g "$PGID" alchemist
|
||||||
|
fi
|
||||||
|
if ! getent passwd alchemist >/dev/null; then
|
||||||
|
useradd -u "$PUID" -g "$PGID" -s /bin/bash -m -d /app alchemist
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Take ownership of app data — skip gracefully for read-only mounts
|
||||||
|
for dir in /app/config /app/data; do
|
||||||
|
if [ -d "$dir" ]; then
|
||||||
|
chown -R alchemist:alchemist "$dir" 2>/dev/null || \
|
||||||
|
echo "Warning: Cannot chown $dir (read-only mount?). Continuing..."
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
# Drop privileges and execute
|
||||||
|
exec gosu alchemist "$@"
|
||||||
|
else
|
||||||
|
# Run natively
|
||||||
|
exec "$@"
|
||||||
|
fi
|
||||||
@@ -716,12 +716,23 @@ impl Config {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Save config to file
|
/// Save config to file atomically (write to temp, then rename).
|
||||||
|
/// This prevents corruption if the process crashes mid-write.
|
||||||
pub fn save(&self, path: &Path) -> Result<()> {
|
pub fn save(&self, path: &Path) -> Result<()> {
|
||||||
let mut config = self.clone();
|
let mut config = self.clone();
|
||||||
config.canonicalize_for_save();
|
config.canonicalize_for_save();
|
||||||
let content = toml::to_string_pretty(&config)?;
|
let content = toml::to_string_pretty(&config)?;
|
||||||
std::fs::write(path, content)?;
|
|
||||||
|
let tmp = path.with_extension("toml.tmp");
|
||||||
|
std::fs::write(&tmp, &content)?;
|
||||||
|
|
||||||
|
// Atomic rename: if this fails, the original config is still intact.
|
||||||
|
if let Err(e) = std::fs::rename(&tmp, path) {
|
||||||
|
// Clean up the temp file on rename failure
|
||||||
|
let _ = std::fs::remove_file(&tmp);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -619,7 +619,9 @@ impl Db {
|
|||||||
.journal_mode(SqliteJournalMode::Wal)
|
.journal_mode(SqliteJournalMode::Wal)
|
||||||
.busy_timeout(Duration::from_secs(5));
|
.busy_timeout(Duration::from_secs(5));
|
||||||
|
|
||||||
let pool = SqlitePool::connect_with(options).await?;
|
let pool = sqlx::sqlite::SqlitePoolOptions::new()
|
||||||
|
.max_connections(1)
|
||||||
|
.connect_with(options).await?;
|
||||||
info!(
|
info!(
|
||||||
target: "startup",
|
target: "startup",
|
||||||
"Database connection opened in {} ms",
|
"Database connection opened in {} ms",
|
||||||
|
|||||||
13
src/main.rs
13
src/main.rs
@@ -463,17 +463,18 @@ async fn run() -> Result<()> {
|
|||||||
// Keep legacy channel for transition compatibility
|
// Keep legacy channel for transition compatibility
|
||||||
let (tx, _rx) = broadcast::channel(100);
|
let (tx, _rx) = broadcast::channel(100);
|
||||||
|
|
||||||
// Initialize Notification Manager
|
|
||||||
let notification_manager = Arc::new(alchemist::notifications::NotificationManager::new(
|
|
||||||
db.as_ref().clone(),
|
|
||||||
));
|
|
||||||
notification_manager.start_listener(tx.subscribe());
|
|
||||||
|
|
||||||
let transcoder = Arc::new(Transcoder::new());
|
let transcoder = Arc::new(Transcoder::new());
|
||||||
let hardware_state = hardware::HardwareState::new(Some(hw_info.clone()));
|
let hardware_state = hardware::HardwareState::new(Some(hw_info.clone()));
|
||||||
let hardware_probe_log = Arc::new(RwLock::new(initial_probe_log));
|
let hardware_probe_log = Arc::new(RwLock::new(initial_probe_log));
|
||||||
let config = Arc::new(RwLock::new(config));
|
let config = Arc::new(RwLock::new(config));
|
||||||
|
|
||||||
|
// Initialize Notification Manager (needs config for allow_local_notifications)
|
||||||
|
let notification_manager = Arc::new(alchemist::notifications::NotificationManager::new(
|
||||||
|
db.as_ref().clone(),
|
||||||
|
config.clone(),
|
||||||
|
));
|
||||||
|
notification_manager.start_listener(tx.subscribe());
|
||||||
|
|
||||||
let maintenance_db = db.clone();
|
let maintenance_db = db.clone();
|
||||||
let maintenance_config = config.clone();
|
let maintenance_config = config.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|||||||
@@ -1263,8 +1263,27 @@ impl Pipeline {
|
|||||||
|
|
||||||
if let Ok(file_settings) = self.db.get_file_settings().await {
|
if let Ok(file_settings) = self.db.get_file_settings().await {
|
||||||
if file_settings.delete_source {
|
if file_settings.delete_source {
|
||||||
if let Err(e) = std::fs::remove_file(input_path) {
|
// Safety: verify the promoted output is intact before destroying the source.
|
||||||
tracing::warn!("Failed to delete source {:?}: {}", input_path, e);
|
// This prevents data loss if the filesystem silently corrupted the output
|
||||||
|
// during rename (e.g., stale NFS/SMB mount, full disk).
|
||||||
|
match std::fs::metadata(context.output_path) {
|
||||||
|
Ok(m) if m.len() > 0 => {
|
||||||
|
if let Err(e) = std::fs::remove_file(input_path) {
|
||||||
|
tracing::warn!("Failed to delete source {:?}: {}", input_path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
tracing::error!(
|
||||||
|
"Job {}: Output file {:?} is empty after promotion — source preserved to prevent data loss",
|
||||||
|
job_id, context.output_path
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(
|
||||||
|
"Job {}: Cannot verify output {:?} after promotion ({}). Source preserved to prevent data loss",
|
||||||
|
job_id, context.output_path, e
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -450,44 +450,24 @@ impl Agent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Gracefully shutdown the agent.
|
/// Gracefully shutdown the agent.
|
||||||
/// Drains active jobs and waits up to `timeout` for them to complete.
|
/// Cancels active jobs immediately and returns quickly.
|
||||||
/// After timeout, forcefully cancels remaining jobs.
|
pub async fn graceful_shutdown(&self) {
|
||||||
pub async fn graceful_shutdown(&self, timeout: std::time::Duration) {
|
info!("Initiating rapid shutdown...");
|
||||||
info!("Initiating graceful shutdown...");
|
|
||||||
|
|
||||||
// Stop accepting new jobs
|
// Stop accepting new jobs
|
||||||
self.pause();
|
self.pause();
|
||||||
self.drain();
|
|
||||||
|
|
||||||
// Wait for active jobs to complete (with timeout)
|
// Immediately force cancel remaining jobs
|
||||||
let start = std::time::Instant::now();
|
|
||||||
let check_interval = std::time::Duration::from_millis(500);
|
|
||||||
|
|
||||||
while start.elapsed() < timeout {
|
|
||||||
let active = self.orchestrator.active_job_count();
|
|
||||||
if active == 0 {
|
|
||||||
info!("All jobs completed gracefully.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
info!(
|
|
||||||
"Waiting for {} active job(s) to complete... ({:.0}s remaining)",
|
|
||||||
active,
|
|
||||||
(timeout - start.elapsed()).as_secs_f64()
|
|
||||||
);
|
|
||||||
tokio::time::sleep(check_interval).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Timeout reached - force cancel remaining jobs
|
|
||||||
let cancelled = self.orchestrator.cancel_all_jobs();
|
let cancelled = self.orchestrator.cancel_all_jobs();
|
||||||
if cancelled > 0 {
|
if cancelled > 0 {
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
"Shutdown timeout reached. Forcefully cancelled {} job(s).",
|
"Fast shutdown requested. Forcefully cancelled {} job(s).",
|
||||||
cancelled
|
cancelled
|
||||||
);
|
);
|
||||||
// Give FFmpeg processes a moment to terminate
|
// Give FFmpeg processes a moment to terminate and Tokio to flush DB statuses
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Graceful shutdown complete.");
|
info!("Rapid shutdown complete.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,20 +1,23 @@
|
|||||||
|
use crate::config::Config;
|
||||||
use crate::db::{AlchemistEvent, Db, NotificationTarget};
|
use crate::db::{AlchemistEvent, Db, NotificationTarget};
|
||||||
use reqwest::{Client, Url, redirect::Policy};
|
use reqwest::{Client, Url, redirect::Policy};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::net::lookup_host;
|
use tokio::net::lookup_host;
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::{RwLock, broadcast};
|
||||||
use tracing::{error, warn};
|
use tracing::{error, warn};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct NotificationManager {
|
pub struct NotificationManager {
|
||||||
db: Db,
|
db: Db,
|
||||||
|
config: Arc<RwLock<Config>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NotificationManager {
|
impl NotificationManager {
|
||||||
pub fn new(db: Db) -> Self {
|
pub fn new(db: Db, config: Arc<RwLock<Config>>) -> Self {
|
||||||
Self { db }
|
Self { db, config }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_listener(&self, mut rx: broadcast::Receiver<AlchemistEvent>) {
|
pub fn start_listener(&self, mut rx: broadcast::Receiver<AlchemistEvent>) {
|
||||||
@@ -111,17 +114,28 @@ impl NotificationManager {
|
|||||||
.ok_or("notification endpoint host is missing")?;
|
.ok_or("notification endpoint host is missing")?;
|
||||||
let port = url.port_or_known_default().ok_or("invalid port")?;
|
let port = url.port_or_known_default().ok_or("invalid port")?;
|
||||||
|
|
||||||
if host.eq_ignore_ascii_case("localhost") {
|
let allow_local = self.config.read().await.notifications.allow_local_notifications;
|
||||||
|
|
||||||
|
if !allow_local && host.eq_ignore_ascii_case("localhost") {
|
||||||
return Err("localhost is not allowed as a notification endpoint".into());
|
return Err("localhost is not allowed as a notification endpoint".into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let addr = format!("{}:{}", host, port);
|
let addr = format!("{}:{}", host, port);
|
||||||
let ips = tokio::time::timeout(Duration::from_secs(3), lookup_host(&addr)).await??;
|
let ips = tokio::time::timeout(Duration::from_secs(3), lookup_host(&addr)).await??;
|
||||||
let target_ip = ips
|
|
||||||
.into_iter()
|
let target_ip = if allow_local {
|
||||||
.map(|a| a.ip())
|
// When local notifications are allowed, accept any resolved IP
|
||||||
.find(|ip| !is_private_ip(*ip))
|
ips.into_iter()
|
||||||
.ok_or("no public IP address found for notification endpoint")?;
|
.map(|a| a.ip())
|
||||||
|
.next()
|
||||||
|
.ok_or("no IP address found for notification endpoint")?
|
||||||
|
} else {
|
||||||
|
// When local notifications are blocked, only use public IPs
|
||||||
|
ips.into_iter()
|
||||||
|
.map(|a| a.ip())
|
||||||
|
.find(|ip| !is_private_ip(*ip))
|
||||||
|
.ok_or("no public IP address found for notification endpoint")?
|
||||||
|
};
|
||||||
|
|
||||||
// Pin the request to the validated IP to prevent DNS rebinding
|
// Pin the request to the validated IP to prevent DNS rebinding
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
@@ -324,7 +338,10 @@ mod tests {
|
|||||||
db_path.push(format!("alchemist_notifications_test_{}.db", token));
|
db_path.push(format!("alchemist_notifications_test_{}.db", token));
|
||||||
|
|
||||||
let db = Db::new(db_path.to_string_lossy().as_ref()).await?;
|
let db = Db::new(db_path.to_string_lossy().as_ref()).await?;
|
||||||
let manager = NotificationManager::new(db);
|
let mut test_config = crate::config::Config::default();
|
||||||
|
test_config.notifications.allow_local_notifications = true;
|
||||||
|
let config = Arc::new(RwLock::new(test_config));
|
||||||
|
let manager = NotificationManager::new(db, config);
|
||||||
|
|
||||||
let listener = match TcpListener::bind("127.0.0.1:0").await {
|
let listener = match TcpListener::bind("127.0.0.1:0").await {
|
||||||
Ok(listener) => listener,
|
Ok(listener) => listener,
|
||||||
|
|||||||
@@ -294,32 +294,46 @@ impl Transcoder {
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
line_res = reader.next_line() => {
|
line_res_timeout = tokio::time::timeout(tokio::time::Duration::from_secs(600), reader.next_line()) => {
|
||||||
match line_res {
|
match line_res_timeout {
|
||||||
Ok(Some(line)) => {
|
Ok(line_res) => match line_res {
|
||||||
let line = if line.len() > 4096 {
|
Ok(Some(line)) => {
|
||||||
format!("{}...[truncated]", &line[..4096])
|
let line = if line.len() > 4096 {
|
||||||
} else {
|
format!("{}...[truncated]", &line[..4096])
|
||||||
line
|
} else {
|
||||||
};
|
line
|
||||||
last_lines.push_back(line.clone());
|
};
|
||||||
if last_lines.len() > 20 {
|
last_lines.push_back(line.clone());
|
||||||
last_lines.pop_front();
|
if last_lines.len() > 20 {
|
||||||
}
|
last_lines.pop_front();
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(observer) = observer.as_ref() {
|
if let Some(observer) = observer.as_ref() {
|
||||||
observer.on_log(line.clone()).await;
|
observer.on_log(line.clone()).await;
|
||||||
|
|
||||||
if let Some(total_duration) = total_duration {
|
if let Some(total_duration) = total_duration {
|
||||||
if let Some(progress) = progress_state.ingest_line(&line) {
|
if let Some(progress) = progress_state.ingest_line(&line) {
|
||||||
observer.on_progress(progress, total_duration).await;
|
observer.on_progress(progress, total_duration).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
Ok(None) => break,
|
||||||
Ok(None) => break,
|
Err(e) => {
|
||||||
Err(e) => {
|
error!("Error reading FFmpeg stderr: {}", e);
|
||||||
error!("Error reading FFmpeg stderr: {}", e);
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Job {:?} stalled: No output from FFmpeg for 10 minutes. Killing process...", job_id);
|
||||||
|
let _ = child.kill().await;
|
||||||
|
killed = true;
|
||||||
|
if let Some(id) = job_id {
|
||||||
|
match self.cancel_channels.lock() {
|
||||||
|
Ok(mut channels) => { channels.remove(&id); }
|
||||||
|
Err(e) => { e.into_inner().remove(&id); }
|
||||||
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -235,23 +235,45 @@ pub(crate) fn get_cookie_value(headers: &axum::http::HeaderMap, name: &str) -> O
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn request_ip(req: &Request) -> Option<IpAddr> {
|
pub(crate) fn request_ip(req: &Request) -> Option<IpAddr> {
|
||||||
if let Some(xff) = req.headers().get("X-Forwarded-For") {
|
let peer_ip = req
|
||||||
if let Ok(xff_str) = xff.to_str() {
|
.extensions()
|
||||||
if let Some(ip_str) = xff_str.split(',').next() {
|
.get::<ConnectInfo<SocketAddr>>()
|
||||||
if let Ok(ip) = ip_str.trim().parse() {
|
.map(|info| info.0.ip());
|
||||||
return Some(ip);
|
|
||||||
|
// Only trust proxy headers (X-Forwarded-For, X-Real-IP) when the direct
|
||||||
|
// TCP peer is a loopback or private IP — i.e., a trusted reverse proxy.
|
||||||
|
// This prevents external attackers from spoofing these headers to bypass
|
||||||
|
// rate limiting.
|
||||||
|
if let Some(peer) = peer_ip {
|
||||||
|
if is_trusted_peer(peer) {
|
||||||
|
if let Some(xff) = req.headers().get("X-Forwarded-For") {
|
||||||
|
if let Ok(xff_str) = xff.to_str() {
|
||||||
|
if let Some(ip_str) = xff_str.split(',').next() {
|
||||||
|
if let Ok(ip) = ip_str.trim().parse() {
|
||||||
|
return Some(ip);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(xri) = req.headers().get("X-Real-IP") {
|
||||||
|
if let Ok(xri_str) = xri.to_str() {
|
||||||
|
if let Ok(ip) = xri_str.trim().parse() {
|
||||||
|
return Some(ip);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(xri) = req.headers().get("X-Real-IP") {
|
|
||||||
if let Ok(xri_str) = xri.to_str() {
|
peer_ip
|
||||||
if let Ok(ip) = xri_str.trim().parse() {
|
}
|
||||||
return Some(ip);
|
|
||||||
}
|
/// Returns true if the peer IP is a loopback or private address,
|
||||||
}
|
/// meaning it is likely a local reverse proxy that can be trusted
|
||||||
}
|
/// to set forwarded headers.
|
||||||
req.extensions()
|
fn is_trusted_peer(ip: IpAddr) -> bool {
|
||||||
.get::<ConnectInfo<SocketAddr>>()
|
match ip {
|
||||||
.map(|info| info.0.ip())
|
IpAddr::V4(v4) => v4.is_loopback() || v4.is_private() || v4.is_link_local(),
|
||||||
|
IpAddr::V6(v6) => v6.is_loopback() || v6.is_unique_local() || v6.is_unicast_link_local(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -84,6 +84,7 @@ pub struct AppState {
|
|||||||
pub resources_cache: Arc<tokio::sync::Mutex<Option<(serde_json::Value, std::time::Instant)>>>,
|
pub resources_cache: Arc<tokio::sync::Mutex<Option<(serde_json::Value, std::time::Instant)>>>,
|
||||||
pub(crate) login_rate_limiter: Mutex<HashMap<IpAddr, RateLimitEntry>>,
|
pub(crate) login_rate_limiter: Mutex<HashMap<IpAddr, RateLimitEntry>>,
|
||||||
pub(crate) global_rate_limiter: Mutex<HashMap<IpAddr, RateLimitEntry>>,
|
pub(crate) global_rate_limiter: Mutex<HashMap<IpAddr, RateLimitEntry>>,
|
||||||
|
pub(crate) sse_connections: Arc<std::sync::atomic::AtomicUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RunServerArgs {
|
pub struct RunServerArgs {
|
||||||
@@ -164,6 +165,7 @@ pub async fn run_server(args: RunServerArgs) -> Result<()> {
|
|||||||
resources_cache: Arc::new(tokio::sync::Mutex::new(None)),
|
resources_cache: Arc::new(tokio::sync::Mutex::new(None)),
|
||||||
login_rate_limiter: Mutex::new(HashMap::new()),
|
login_rate_limiter: Mutex::new(HashMap::new()),
|
||||||
global_rate_limiter: Mutex::new(HashMap::new()),
|
global_rate_limiter: Mutex::new(HashMap::new()),
|
||||||
|
sse_connections: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
|
||||||
});
|
});
|
||||||
|
|
||||||
// Clone agent for shutdown handler before moving state into router
|
// Clone agent for shutdown handler before moving state into router
|
||||||
@@ -265,9 +267,9 @@ pub async fn run_server(args: RunServerArgs) -> Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Give active jobs up to 5 minutes to complete
|
// Forceful immediate shutdown of active jobs
|
||||||
shutdown_agent
|
shutdown_agent
|
||||||
.graceful_shutdown(std::time::Duration::from_secs(300))
|
.graceful_shutdown()
|
||||||
.await;
|
.await;
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -171,9 +171,40 @@ pub(crate) fn sse_unified_stream(
|
|||||||
])
|
])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Maximum concurrent SSE connections to prevent resource exhaustion.
|
||||||
|
const MAX_SSE_CONNECTIONS: usize = 50;
|
||||||
|
|
||||||
|
/// RAII guard that decrements the SSE connection counter on drop.
|
||||||
|
struct SseConnectionGuard(Arc<std::sync::atomic::AtomicUsize>);
|
||||||
|
|
||||||
|
impl Drop for SseConnectionGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.0.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) async fn sse_handler(
|
pub(crate) async fn sse_handler(
|
||||||
State(state): State<Arc<AppState>>,
|
State(state): State<Arc<AppState>>,
|
||||||
) -> Sse<impl Stream<Item = std::result::Result<AxumEvent, Infallible>>> {
|
) -> std::result::Result<
|
||||||
|
Sse<impl Stream<Item = std::result::Result<AxumEvent, Infallible>>>,
|
||||||
|
axum::http::StatusCode,
|
||||||
|
> {
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
|
// Enforce connection limit
|
||||||
|
let current = state.sse_connections.fetch_add(1, Ordering::SeqCst);
|
||||||
|
if current >= MAX_SSE_CONNECTIONS {
|
||||||
|
state.sse_connections.fetch_sub(1, Ordering::SeqCst);
|
||||||
|
warn!(
|
||||||
|
"SSE connection limit reached ({}/{}). Rejecting new connection.",
|
||||||
|
current, MAX_SSE_CONNECTIONS
|
||||||
|
);
|
||||||
|
return Err(axum::http::StatusCode::TOO_MANY_REQUESTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
// RAII guard to decrement the counter when the stream is dropped
|
||||||
|
let guard = Arc::new(SseConnectionGuard(state.sse_connections.clone()));
|
||||||
|
|
||||||
// Subscribe to all channels
|
// Subscribe to all channels
|
||||||
let job_rx = state.event_channels.jobs.subscribe();
|
let job_rx = state.event_channels.jobs.subscribe();
|
||||||
let config_rx = state.event_channels.config.subscribe();
|
let config_rx = state.event_channels.config.subscribe();
|
||||||
@@ -182,10 +213,13 @@ pub(crate) async fn sse_handler(
|
|||||||
// Create unified stream from new typed channels
|
// Create unified stream from new typed channels
|
||||||
let unified_stream = sse_unified_stream(job_rx, config_rx, system_rx);
|
let unified_stream = sse_unified_stream(job_rx, config_rx, system_rx);
|
||||||
|
|
||||||
let stream = unified_stream.map(|message| match message {
|
let stream = unified_stream.map(move |message| {
|
||||||
Ok(message) => Ok(message.into()),
|
let _guard = guard.clone(); // keep the guard alive as long as the stream lives
|
||||||
Err(never) => match never {},
|
match message {
|
||||||
|
Ok(message) => Ok(message.into()),
|
||||||
|
Err(never) => match never {},
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default())
|
Ok(Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default()))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -107,6 +107,7 @@ where
|
|||||||
telemetry_runtime_id: "test-runtime".to_string(),
|
telemetry_runtime_id: "test-runtime".to_string(),
|
||||||
notification_manager: Arc::new(crate::notifications::NotificationManager::new(
|
notification_manager: Arc::new(crate::notifications::NotificationManager::new(
|
||||||
db.as_ref().clone(),
|
db.as_ref().clone(),
|
||||||
|
config.clone(),
|
||||||
)),
|
)),
|
||||||
sys: Mutex::new(sys),
|
sys: Mutex::new(sys),
|
||||||
file_watcher,
|
file_watcher,
|
||||||
@@ -118,6 +119,7 @@ where
|
|||||||
resources_cache: Arc::new(tokio::sync::Mutex::new(None)),
|
resources_cache: Arc::new(tokio::sync::Mutex::new(None)),
|
||||||
login_rate_limiter: Mutex::new(HashMap::new()),
|
login_rate_limiter: Mutex::new(HashMap::new()),
|
||||||
global_rate_limiter: Mutex::new(HashMap::new()),
|
global_rate_limiter: Mutex::new(HashMap::new()),
|
||||||
|
sse_connections: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok((state.clone(), app_router(state), config_path, db_path))
|
Ok((state.clone(), app_router(state), config_path, db_path))
|
||||||
|
|||||||
@@ -266,25 +266,20 @@ fn preview_blocking(request: FsPreviewRequest) -> Result<FsPreviewResponse> {
|
|||||||
let exists = canonical.exists();
|
let exists = canonical.exists();
|
||||||
let readable = exists && canonical.is_dir() && std::fs::read_dir(&canonical).is_ok();
|
let readable = exists && canonical.is_dir() && std::fs::read_dir(&canonical).is_ok();
|
||||||
|
|
||||||
let media_files = if readable {
|
// Scan once and reuse results for both count and samples
|
||||||
scanner
|
let scan_results = if readable {
|
||||||
.scan_with_recursion(vec![(canonical.clone(), true)])
|
scanner.scan_with_recursion(vec![(canonical.clone(), true)])
|
||||||
.len()
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
};
|
|
||||||
total_media_files += media_files;
|
|
||||||
|
|
||||||
let sample_files = if readable {
|
|
||||||
scanner
|
|
||||||
.scan_with_recursion(vec![(canonical.clone(), true)])
|
|
||||||
.into_iter()
|
|
||||||
.take(5)
|
|
||||||
.map(|media| media.path.to_string_lossy().to_string())
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
} else {
|
} else {
|
||||||
Vec::new()
|
Vec::new()
|
||||||
};
|
};
|
||||||
|
let media_files = scan_results.len();
|
||||||
|
total_media_files += media_files;
|
||||||
|
|
||||||
|
let sample_files = scan_results
|
||||||
|
.into_iter()
|
||||||
|
.take(5)
|
||||||
|
.map(|media| media.path.to_string_lossy().to_string())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let mut dir_warnings = directory_warnings(&canonical, readable);
|
let mut dir_warnings = directory_warnings(&canonical, readable);
|
||||||
if readable && media_files == 0 {
|
if readable && media_files == 0 {
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import { apiJson, isApiError } from "../lib/api";
|
|||||||
import { useSharedStats } from "../lib/statsStore";
|
import { useSharedStats } from "../lib/statsStore";
|
||||||
import { showToast } from "../lib/toast";
|
import { showToast } from "../lib/toast";
|
||||||
import ResourceMonitor from "./ResourceMonitor";
|
import ResourceMonitor from "./ResourceMonitor";
|
||||||
|
import { withErrorBoundary } from "./ErrorBoundary";
|
||||||
|
|
||||||
interface Job {
|
interface Job {
|
||||||
id: number;
|
id: number;
|
||||||
@@ -76,7 +77,7 @@ function StatCard({ label, value, icon: Icon, colorClass }: StatCardProps) {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
export default function Dashboard() {
|
function Dashboard() {
|
||||||
const [jobs, setJobs] = useState<Job[]>([]);
|
const [jobs, setJobs] = useState<Job[]>([]);
|
||||||
const [jobsLoading, setJobsLoading] = useState(true);
|
const [jobsLoading, setJobsLoading] = useState(true);
|
||||||
const [bundle, setBundle] = useState<SettingsBundleResponse | null>(null);
|
const [bundle, setBundle] = useState<SettingsBundleResponse | null>(null);
|
||||||
@@ -366,3 +367,5 @@ export default function Dashboard() {
|
|||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export default withErrorBoundary(Dashboard, "Dashboard");
|
||||||
|
|||||||
69
web/src/components/ErrorBoundary.tsx
Normal file
69
web/src/components/ErrorBoundary.tsx
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
import React, { Component, type ReactNode } from "react";
|
||||||
|
import { AlertCircle } from "lucide-react";
|
||||||
|
|
||||||
|
interface Props {
|
||||||
|
children: ReactNode;
|
||||||
|
fallback?: ReactNode;
|
||||||
|
moduleName?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface State {
|
||||||
|
hasError: boolean;
|
||||||
|
errorMessage: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ErrorBoundary extends Component<Props, State> {
|
||||||
|
public state: State = {
|
||||||
|
hasError: false,
|
||||||
|
errorMessage: "",
|
||||||
|
};
|
||||||
|
|
||||||
|
public static getDerivedStateFromError(error: Error): State {
|
||||||
|
return { hasError: true, errorMessage: error.message };
|
||||||
|
}
|
||||||
|
|
||||||
|
public componentDidCatch(error: Error, errorInfo: React.ErrorInfo) {
|
||||||
|
console.error("Uncaught error in ErrorBoundary:", error, errorInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
public render() {
|
||||||
|
if (this.state.hasError) {
|
||||||
|
if (this.props.fallback) {
|
||||||
|
return this.props.fallback;
|
||||||
|
}
|
||||||
|
return (
|
||||||
|
<div className="flex flex-col items-center justify-center p-8 bg-helios-background border border-helios-red/50 rounded-lg shadow-sm text-center w-full min-h-[300px]">
|
||||||
|
<AlertCircle className="w-12 h-12 text-helios-red mb-4" />
|
||||||
|
<h2 className="text-xl font-bold text-white mb-2">Something went wrong</h2>
|
||||||
|
<p className="text-helios-text/70 mb-4 max-w-md">
|
||||||
|
The {this.props.moduleName || "component"} encountered an unexpected error and could not be displayed.
|
||||||
|
</p>
|
||||||
|
<div className="text-xs text-helios-red/80 font-mono bg-helios-red/10 p-4 rounded w-full overflow-auto max-w-lg mb-6 text-left break-words max-h-32">
|
||||||
|
{this.state.errorMessage}
|
||||||
|
</div>
|
||||||
|
<button
|
||||||
|
onClick={() => window.location.reload()}
|
||||||
|
className="px-6 py-2 bg-helios-orange hover:bg-helios-orange/80 text-white font-medium rounded transition"
|
||||||
|
>
|
||||||
|
Reload Page
|
||||||
|
</button>
|
||||||
|
</div>
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.props.children;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const withErrorBoundary = <P extends object>(
|
||||||
|
WrappedComponent: React.ComponentType<P>,
|
||||||
|
moduleName?: string
|
||||||
|
) => {
|
||||||
|
return function WithErrorBoundary(props: P) {
|
||||||
|
return (
|
||||||
|
<ErrorBoundary moduleName={moduleName}>
|
||||||
|
<WrappedComponent {...props} />
|
||||||
|
</ErrorBoundary>
|
||||||
|
);
|
||||||
|
};
|
||||||
|
};
|
||||||
@@ -11,6 +11,7 @@ import ConfirmDialog from "./ui/ConfirmDialog";
|
|||||||
import { clsx, type ClassValue } from "clsx";
|
import { clsx, type ClassValue } from "clsx";
|
||||||
import { twMerge } from "tailwind-merge";
|
import { twMerge } from "tailwind-merge";
|
||||||
import { motion, AnimatePresence } from "framer-motion";
|
import { motion, AnimatePresence } from "framer-motion";
|
||||||
|
import { withErrorBoundary } from "./ErrorBoundary";
|
||||||
|
|
||||||
function cn(...inputs: ClassValue[]) {
|
function cn(...inputs: ClassValue[]) {
|
||||||
return twMerge(clsx(inputs));
|
return twMerge(clsx(inputs));
|
||||||
@@ -364,7 +365,7 @@ const SORT_OPTIONS: Array<{ value: SortField; label: string }> = [
|
|||||||
{ value: "size", label: "File Size" },
|
{ value: "size", label: "File Size" },
|
||||||
];
|
];
|
||||||
|
|
||||||
export default function JobManager() {
|
function JobManager() {
|
||||||
const [jobs, setJobs] = useState<Job[]>([]);
|
const [jobs, setJobs] = useState<Job[]>([]);
|
||||||
const [loading, setLoading] = useState(true);
|
const [loading, setLoading] = useState(true);
|
||||||
const [selected, setSelected] = useState<Set<number>>(new Set());
|
const [selected, setSelected] = useState<Set<number>>(new Set());
|
||||||
@@ -1769,3 +1770,5 @@ export default function JobManager() {
|
|||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export default withErrorBoundary(JobManager, "Job Management");
|
||||||
|
|||||||
38
web/src/pages/500.astro
Normal file
38
web/src/pages/500.astro
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
---
|
||||||
|
import Layout from "../layouts/Layout.astro";
|
||||||
|
import { AlertTriangle } from "lucide-react";
|
||||||
|
|
||||||
|
interface Props {
|
||||||
|
error: unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
const { error } = Astro.props;
|
||||||
|
---
|
||||||
|
|
||||||
|
<Layout title="Alchemist | Server Error">
|
||||||
|
<div class="min-h-screen bg-helios-background flex items-center justify-center p-6 pb-24">
|
||||||
|
<div class="max-w-md w-full flex flex-col items-center text-center">
|
||||||
|
<div class="w-16 h-16 rounded-full bg-helios-red/10 flex items-center justify-center mb-6">
|
||||||
|
<AlertTriangle className="w-8 h-8 text-helios-red" />
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<h1 class="text-3xl font-bold text-white mb-3">500 Server Error</h1>
|
||||||
|
<p class="text-helios-text mb-8">
|
||||||
|
Alchemist encountered an internal error. Please check the backend logs.
|
||||||
|
</p>
|
||||||
|
|
||||||
|
{error instanceof Error ? (
|
||||||
|
<div class="bg-black/50 border border-white/5 rounded-lg p-4 mb-8 w-full overflow-auto text-left">
|
||||||
|
<p class="text-helios-red/90 font-mono text-sm break-words">{error.message}</p>
|
||||||
|
</div>
|
||||||
|
) : null}
|
||||||
|
|
||||||
|
<a
|
||||||
|
href="/"
|
||||||
|
class="px-6 py-2.5 bg-helios-orange hover:bg-helios-orange/90 text-white font-medium rounded-md transition-colors"
|
||||||
|
>
|
||||||
|
Return to Dashboard
|
||||||
|
</a>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</Layout>
|
||||||
Reference in New Issue
Block a user