feat: Introduce a web UI with Leptos and Axum, adding server, database, configuration, and deployment files.

This commit is contained in:
brooklyn
2026-01-06 10:47:57 -05:00
parent 25d0950757
commit 38d359dddb
15 changed files with 4140 additions and 111 deletions

3179
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -15,5 +15,15 @@ anyhow = "1.0"
subprocess = "0.2.9"
sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite", "macros", "chrono"] }
chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
futures = { version = "0.3" }
toml = "0.8"
leptos = { version = "0.6", features = ["ssr"] }
leptos_axum = "0.6"
axum = { version = "0.7", features = ["macros"] }
tower = { version = "0.4" }
tower-http = { version = "0.5", features = ["fs"] }
console_error_panic_hook = "0.1"
console_log = "1.0"
log = "0.4"
rayon = "1.10"

45
Dockerfile Normal file
View File

@@ -0,0 +1,45 @@
# Stage 1: Preparation — cargo-chef computes a dependency "recipe" so that
# dependency compilation can be cached independently of app source changes.
FROM lukemathwalker/cargo-chef:latest-rust-1.81 AS chef
WORKDIR /app

FROM chef AS planner
COPY . .
RUN cargo chef prepare --recipe-path recipe.json

# Stage 2: Caching
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
# Build dependencies - this is the caching layer
RUN cargo chef cook --release --recipe-path recipe.json

# Stage 3: Build Application (still in the builder stage; only this layer
# re-runs when source changes, dependencies above stay cached)
COPY . .
RUN cargo build --release

# Stage 4: Runtime — slim image with only runtime libraries, no toolchain
FROM debian:bookworm-slim AS runtime
WORKDIR /app

# Install runtime dependencies: FFmpeg and HW drivers
# non-free is required for intel-media-va-driver-non-free
RUN apt-get update && apt-get install -y \
    ffmpeg \
    intel-media-va-driver-non-free \
    libva-drm2 \
    libva2 \
    i965-va-driver \
    va-driver-all \
    libsqlite3-0 \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# Copy the binary
COPY --from=builder /app/target/release/alchemist /usr/local/bin/alchemist

# Set environment variables for hardware acceleration
# (iHD = Intel's modern media driver for VAAPI/QSV)
ENV LIBVA_DRIVER_NAME=iHD

# Entrypoint: run in web-server mode by default; CMD is overridable
ENTRYPOINT ["alchemist"]
CMD ["--server"]

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 Brooklyn
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

48
README.md Normal file
View File

@@ -0,0 +1,48 @@
# Alchemist
Alchemist is a next-generation "smart" media transcoding engine designed for extreme efficiency and reliability. Unlike traditional transcoders that blindly process every file, Alchemist utilizes a multi-stage analysis gate to decide whether a file actually *needs* to be transcoded, prioritizing file size reduction and quality retention.
## Key Features
- **Smart Analysis**: Preflight checks for codec, bitrate, and resolution to skip unnecessary work.
- **Hardware First**: Primary support for Intel Arc (AV1 QSV), NVIDIA (NVENC), and Apple (VideoToolbox).
- **Deterministic State**: A robust job state machine with persistent decision tracking ("Why did we skip this file?").
- **Real-Time Observation**: SSE-based live updates and progress tracking.
- **Fail-Loudly Policy**: Strict hardware enforcement to prevent poor-performance CPU fallbacks unless explicitly allowed.
- **Rust-Powered Core**: Built for performance and reliability.
## Getting Started
### Prerequisites
- **FFmpeg**: Must be available in your PATH with appropriate hardware acceleration drivers (QSV, NVENC, or VAAPI).
- **SQLite**: Used for persistent job tracking.
### Running
To scan a directory and start transcoding:
```bash
cargo run -- /path/to/media
```
To run as a web server:
```bash
cargo run -- --server
```
## Configuration
Alchemist looks for a `config.toml` in the working directory.
```toml
[transcode]
size_reduction_threshold = 0.3 # Fail if <30% reduction
concurrent_jobs = 1
[hardware]
allow_cpu_fallback = false # Fail loudly if GPU is missing
```
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

View File

@@ -18,6 +18,8 @@ pub struct Stream {
pub height: Option<u32>,
pub bit_rate: Option<String>,
pub bits_per_raw_sample: Option<String>,
pub channel_layout: Option<String>,
pub channels: Option<u32>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -50,7 +52,7 @@ impl Analyzer {
Ok(metadata)
}
pub fn should_transcode(_path: &Path, metadata: &MediaMetadata) -> (bool, String) {
pub fn should_transcode(_path: &Path, metadata: &MediaMetadata, config: &crate::config::Config) -> (bool, String) {
// 1. Codec Check (skip if already AV1 + 10-bit)
let video_stream = metadata.streams.iter().find(|s| s.codec_type == "video");
@@ -86,19 +88,41 @@ impl Analyzer {
let bpp = bitrate / (width * height);
// Heuristic: If BPP is already very low, don't murder it further.
// threshold 0.1 is very low for h264, maybe 0.05 for av1.
if bpp < 0.1 {
return (false, format!("BPP too low ({:.4}), avoiding quality murder", bpp));
if bpp < config.transcode.min_bpp_threshold {
return (false, format!("BPP too low ({:.4} < {:.2}), avoiding quality murder", bpp, config.transcode.min_bpp_threshold));
}
// 4. Projected Size Logic
// Target AV1 is roughly 30-50% smaller than H264 for same quality.
// If it's already a small file (e.g. under 50MB), maybe skip?
let size_bytes = metadata.format.size.parse::<u64>().unwrap_or(0);
if size_bytes < 50 * 1024 * 1024 {
return (false, "File too small to justify transcode overhead".to_string());
let min_size_bytes = config.transcode.min_file_size_mb * 1024 * 1024;
if size_bytes < min_size_bytes {
return (false, format!("File too small ({}MB < {}MB) to justify transcode overhead",
size_bytes / 1024 / 1024, config.transcode.min_file_size_mb));
}
(true, format!("Ready for AV1 transcode (Current codec: {}, BPP: {:.4})", video_stream.codec_name, bpp))
}
/// Decides whether an audio stream should be re-encoded (to Opus) rather
/// than copied: lossless/very heavy codecs always qualify, as does anything
/// above the 640 kbps AC-3 ceiling.
pub fn should_transcode_audio(stream: &Stream) -> bool {
    // Only audio streams are candidates.
    if stream.codec_type != "audio" {
        return false;
    }
    // Lossless / bandwidth-heavy codecs are always worth re-encoding.
    const HEAVY_CODECS: [&str; 5] = ["truehd", "dts-hd", "flac", "pcm_s24le", "pcm_s16le"];
    let codec = stream.codec_name.to_lowercase();
    if HEAVY_CODECS.iter().any(|c| *c == codec) {
        return true;
    }
    // Missing or unparseable bitrate is treated as 0 (i.e. "don't transcode").
    let bitrate: u64 = stream
        .bit_rate
        .as_deref()
        .and_then(|b| b.parse().ok())
        .unwrap_or(0);
    // If bitrate > 640kbps (standard AC3 max), maybe transcode?
    bitrate > 640_000
}
}

180
src/app.rs Normal file
View File

@@ -0,0 +1,180 @@
use leptos::*;
use crate::db::{Job, JobState};
use crate::server::AlchemistEvent;
/// Server function: returns every job (most recently updated first) for
/// the dashboard table.
#[server(GetJobs, "/api")]
pub async fn get_jobs() -> Result<Vec<Job>, ServerFnError> {
    use axum::Extension;
    use std::sync::Arc;
    use crate::db::Db;

    // The shared DB handle is installed as an Axum request extension.
    let Extension(db) = use_context::<Extension<Arc<Db>>>()
        .ok_or_else(|| ServerFnError::ServerError("DB not found".to_string()))?;
    match db.get_all_jobs().await {
        Ok(jobs) => Ok(jobs),
        Err(e) => Err(ServerFnError::ServerError(e.to_string())),
    }
}
/// Server function: returns the per-status job counts as a JSON object
/// for the stat cards.
#[server(GetStats, "/api")]
pub async fn get_stats() -> Result<serde_json::Value, ServerFnError> {
    use axum::Extension;
    use std::sync::Arc;
    use crate::db::Db;

    // Fetch the shared DB handle from the Axum request extensions.
    let ext = use_context::<Extension<Arc<Db>>>()
        .ok_or_else(|| ServerFnError::ServerError("DB not found".to_string()))?;
    ext.0
        .get_stats()
        .await
        .map_err(|e| ServerFnError::ServerError(e.to_string()))
}
#[server(RunScan, "/api")]
pub async fn run_scan() -> Result<(), ServerFnError> {
// This is a placeholder for triggering a scan
// In a real app, we'd send a signal to the orchestrator/scanner
info!("Scan triggered via Web UI");
Ok(())
}
/// Root UI component: a dashboard with summary stat cards and a
/// live-updating table of recent jobs.
#[component]
pub fn App() -> impl IntoView {
    // Initial data load; both resources are refetched when relevant SSE
    // events arrive (see the effect below).
    let jobs = create_resource(|| (), |_| async move { get_jobs().await.unwrap_or_default() });
    let stats = create_resource(|| (), |_| async move { get_stats().await.ok() });

    // SSE Effect for real-time updates. Compiled only with the `hydrate`
    // feature — presumably the client-side (WASM) build; confirm against
    // the crate's feature setup.
    #[cfg(feature = "hydrate")]
    create_effect(move |_| {
        use gloo_net::eventsource::futures::EventSource;
        use futures::StreamExt;
        let mut es = EventSource::new("/api/events").unwrap();
        let mut stream = es.subscribe("message").unwrap();
        spawn_local(async move {
            while let Some(Ok((_, msg))) = stream.next().await {
                if let Ok(event) = serde_json::from_str::<AlchemistEvent>(&msg.data().as_string().unwrap()) {
                    match event {
                        // Only state changes and decisions affect the
                        // table/stats; other events are ignored.
                        AlchemistEvent::JobStateChanged { .. } | AlchemistEvent::Decision { .. } => {
                            jobs.refetch();
                            stats.refetch();
                        }
                        _ => {}
                    }
                }
            }
        });
        // Close the EventSource when the effect is disposed.
        on_cleanup(move || es.close());
    });

    let scan_action = create_server_action::<RunScan>();

    view! {
        <main class="min-h-screen bg-slate-950 text-slate-100 font-sans p-4 md:p-8">
            <div class="max-w-6xl mx-auto">
                <header class="flex justify-between items-center mb-12">
                    <div>
                        <h1 class="text-4xl font-extrabold tracking-tight bg-gradient-to-r from-blue-400 to-indigo-500 bg-clip-text text-transparent">
                            "Alchemist"
                        </h1>
                        <p class="text-slate-400 mt-1">"Next-Gen Transcoding Engine"</p>
                    </div>
                    <div class="flex gap-4">
                        <button
                            on:click=move |_| scan_action.dispatch(RunScan {})
                            class="bg-blue-600 hover:bg-blue-700 text-white px-4 py-2 rounded-lg font-medium transition-colors"
                        >
                            {move || if scan_action.pending().get() { "Scanning..." } else { "Scan Now" }}
                        </button>
                    </div>
                </header>
                // Stat cards: totals derived from the per-status counts
                // returned by `get_stats` ("total" is the sum of all counts).
                <div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-6 mb-12">
                    {move || {
                        let s = stats.get().flatten().unwrap_or_else(|| serde_json::json!({}));
                        let total = s.as_object().map(|m| m.values().filter_map(|v| v.as_i64()).sum::<i64>()).unwrap_or(0);
                        let completed = s.get("completed").and_then(|v| v.as_i64()).unwrap_or(0);
                        let processing = s.get("processing").and_then(|v| v.as_i64()).unwrap_or(0);
                        let failed = s.get("failed").and_then(|v| v.as_i64()).unwrap_or(0);
                        view! {
                            <StatCard label="Total Jobs" value=total.to_string() color="blue" />
                            <StatCard label="Completed" value=completed.to_string() color="emerald" />
                            <StatCard label="Processing" value=processing.to_string() color="amber" />
                            <StatCard label="Failed" value=failed.to_string() color="rose" />
                        }
                    }}
                </div>
                // Jobs table: one row per job with a status badge and the
                // latest decision reason under the file path.
                <div class="bg-slate-900 border border-slate-800 rounded-xl overflow-hidden">
                    <div class="px-6 py-4 border-b border-slate-800 bg-slate-900/50">
                        <h2 class="text-xl font-semibold">"Recent Jobs"</h2>
                    </div>
                    <div class="overflow-x-auto">
                        <table class="w-full text-left">
                            <thead class="text-xs text-slate-400 uppercase bg-slate-800/50">
                                <tr>
                                    <th class="px-6 py-3">"ID"</th>
                                    <th class="px-6 py-3">"File"</th>
                                    <th class="px-6 py-3">"Status"</th>
                                    <th class="px-6 py-3">"Updated"</th>
                                </tr>
                            </thead>
                            <tbody class="divide-y divide-slate-800">
                                <Transition fallback=move || view! { <tr><td colspan="4" class="px-6 py-8 text-center text-slate-500">"Loading jobs..."</td></tr> }>
                                    {move || jobs.get().map(|all_jobs| {
                                        all_jobs.into_iter().map(|job| {
                                            let status_str = job.status.to_string();
                                            // Badge styling per state; in-flight states pulse.
                                            let status_cls = match job.status {
                                                JobState::Completed => "bg-emerald-500/10 text-emerald-400 border-emerald-500/20",
                                                JobState::Encoding | JobState::Analyzing => "bg-amber-500/10 text-amber-400 border-amber-500/20 animate-pulse",
                                                JobState::Failed => "bg-rose-500/10 text-rose-400 border-rose-500/20",
                                                _ => "bg-slate-500/10 text-slate-400 border-slate-500/20",
                                            };
                                            view! {
                                                <tr class="hover:bg-slate-800/50 transition-colors">
                                                    <td class="px-6 py-4 font-mono text-xs text-slate-500">"#" {job.id}</td>
                                                    <td class="px-6 py-4">
                                                        <div class="font-medium truncate max-w-xs">{job.input_path}</div>
                                                        <div class="text-xs text-slate-500 truncate mt-0.5">{job.decision_reason.unwrap_or_default()}</div>
                                                    </td>
                                                    <td class="px-6 py-4">
                                                        <span class=format!("px-2.5 py-1 rounded-full text-xs font-semibold border {}", status_cls)>
                                                            {status_str}
                                                        </span>
                                                    </td>
                                                    <td class="px-6 py-4 text-sm text-slate-500">
                                                        {job.updated_at.to_rfc3339()}
                                                    </td>
                                                </tr>
                                            }
                                        }).collect_view()
                                    })}
                                </Transition>
                            </tbody>
                        </table>
                    </div>
                </div>
            </div>
        </main>
    }
}
/// Small dashboard card showing one labeled statistic.
///
/// `color` selects a Tailwind gradient accent; unknown names fall back
/// to a neutral slate gradient.
#[component]
fn StatCard(label: &'static str, value: String, color: &'static str) -> impl IntoView {
    // Map the semantic color name to a gradient class pair.
    let accent = match color {
        "blue" => "from-blue-500 to-indigo-600",
        "emerald" => "from-emerald-500 to-teal-600",
        "amber" => "from-amber-500 to-orange-600",
        "rose" => "from-rose-500 to-pink-600",
        _ => "from-slate-500 to-slate-600",
    };
    view! {
        <div class="bg-slate-900 border border-slate-800 p-6 rounded-xl hover:border-slate-700 transition-colors">
            <div class="text-slate-500 text-sm font-medium mb-2">{label}</div>
            <div class=format!("text-3xl font-bold bg-gradient-to-br {} bg-clip-text text-transparent", accent)>
                {value}
            </div>
        </div>
    }
}

53
src/config.rs Normal file
View File

@@ -0,0 +1,53 @@
use serde::{Deserialize, Serialize};
use std::path::Path;
use anyhow::Result;
/// Top-level application configuration, deserialized from `config.toml`
/// (see `Config::load`). Falls back to `Config::default()` when absent.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Config {
    /// Transcode decision thresholds and concurrency settings.
    pub transcode: TranscodeConfig,
    /// Hardware preference and CPU-fallback policy.
    pub hardware: HardwareConfig,
}
/// Thresholds controlling whether and how files are transcoded.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TranscodeConfig {
    /// Minimum fractional size reduction required to keep an output,
    /// e.g., 0.3 for 30%.
    pub size_reduction_threshold: f64,
    /// Minimum bits-per-pixel below which a source is considered already
    /// too compressed to re-encode, e.g., 0.1.
    pub min_bpp_threshold: f64,
    /// Files smaller than this (in MB) are skipped, e.g., 50.
    pub min_file_size_mb: u64,
    /// Number of jobs processed concurrently.
    pub concurrent_jobs: usize,
}
/// Hardware-acceleration selection and fallback policy.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct HardwareConfig {
    /// Preferred GPU vendor name; `None` means auto-detect.
    pub preferred_vendor: Option<String>,
    /// Explicit device node (e.g. a /dev/dri render node); `None` means default.
    pub device_path: Option<String>,
    /// When false, the run aborts rather than falling back to CPU encoding.
    pub allow_cpu_fallback: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
transcode: TranscodeConfig {
size_reduction_threshold: 0.3,
min_bpp_threshold: 0.1,
min_file_size_mb: 50,
concurrent_jobs: 1,
},
hardware: HardwareConfig {
preferred_vendor: None,
device_path: None,
allow_cpu_fallback: false,
},
}
}
}
impl Config {
    /// Loads configuration from `path`.
    ///
    /// A missing file is not an error: the built-in defaults are returned.
    ///
    /// # Errors
    /// Returns an error if the file exists but cannot be read, or if its
    /// contents are not valid TOML for [`Config`].
    pub fn load(path: &Path) -> Result<Self> {
        // Read first and inspect the error kind rather than calling
        // `path.exists()` beforehand: this avoids the check-then-read race
        // while keeping the same behavior for a missing config file.
        let content = match std::fs::read_to_string(path) {
            Ok(content) => content,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                return Ok(Self::default());
            }
            Err(e) => return Err(e.into()),
        };
        let config: Config = toml::from_str(&content)?;
        Ok(config)
    }
}

215
src/db.rs Normal file
View File

@@ -0,0 +1,215 @@
use sqlx::{sqlite::SqliteConnectOptions, SqlitePool};
use std::path::Path;
use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Lifecycle state of a transcode job.
///
/// Persisted in SQLite as lowercase strings (`#[sqlx(rename_all = "lowercase")]`),
/// matching the string literals used in the raw SQL queries in this module.
#[derive(Debug, Serialize, Deserialize, sqlx::Type, Clone, Copy, PartialEq, Eq)]
#[sqlx(rename_all = "lowercase")]
pub enum JobState {
    Queued,
    Analyzing,
    Encoding,
    Completed,
    Skipped,
    Failed,
}
impl std::fmt::Display for JobState {
    /// Renders the state as its lowercase database representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            JobState::Queued => "queued",
            JobState::Analyzing => "analyzing",
            JobState::Encoding => "encoding",
            JobState::Completed => "completed",
            JobState::Skipped => "skipped",
            JobState::Failed => "failed",
        })
    }
}
/// A row from the `jobs` table.
#[derive(Debug, Serialize, Deserialize, sqlx::FromRow, Clone)]
pub struct Job {
    pub id: i64,
    pub input_path: String,
    pub output_path: String,
    pub status: JobState,
    // NOTE: not a `jobs` column — populated by the decisions join/subquery
    // in `Db::get_all_jobs`; queries that read only `jobs` must still
    // provide this column (e.g. as NULL) for `FromRow` to succeed.
    pub decision_reason: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
/// A row from the `decisions` audit table: why a job was encoded,
/// skipped, or rejected.
#[derive(Debug, Serialize, Deserialize, sqlx::FromRow, Clone)]
pub struct Decision {
    pub id: i64,
    pub job_id: i64,
    pub action: String, // "encode", "skip", "reject"
    pub reason: String,
    pub created_at: DateTime<Utc>,
}
/// Handle to the SQLite-backed job store, wrapping a connection pool.
pub struct Db {
    pool: SqlitePool,
}
impl Db {
    /// Opens (creating if missing) the SQLite database at `db_path`,
    /// creates the schema, and re-queues jobs interrupted by a crash.
    pub async fn new(db_path: &str) -> Result<Self> {
        let options = SqliteConnectOptions::new()
            .filename(db_path)
            .create_if_missing(true);
        let pool = SqlitePool::connect_with(options).await?;
        let db = Self { pool };
        db.init().await?;
        db.reset_interrupted_jobs().await?;
        Ok(db)
    }

    /// Creates the `jobs` and `decisions` tables if they do not exist.
    async fn init(&self) -> Result<()> {
        sqlx::query(
            "CREATE TABLE IF NOT EXISTS jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                input_path TEXT NOT NULL UNIQUE,
                output_path TEXT NOT NULL,
                status TEXT NOT NULL,
                mtime_hash TEXT NOT NULL,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )"
        )
        .execute(&self.pool)
        .await?;
        sqlx::query(
            "CREATE TABLE IF NOT EXISTS decisions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_id INTEGER NOT NULL,
                action TEXT NOT NULL,
                reason TEXT NOT NULL,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(job_id) REFERENCES jobs(id)
            )"
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Jobs left in a transient state ('analyzing'/'encoding') by a previous
    /// run were interrupted; put them back in the queue.
    pub async fn reset_interrupted_jobs(&self) -> Result<()> {
        sqlx::query(
            "UPDATE jobs SET status = 'queued' WHERE status IN ('analyzing', 'encoding')"
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Inserts a new queued job, or re-queues an existing one when the
    /// source file's mtime hash changed (i.e. the file was modified since
    /// it was last seen). Unchanged files are left untouched.
    pub async fn enqueue_job(&self, input_path: &Path, output_path: &Path, mtime: std::time::SystemTime) -> Result<()> {
        let input_str = input_path.to_str().ok_or_else(|| anyhow::anyhow!("Invalid input path"))?;
        let output_str = output_path.to_str().ok_or_else(|| anyhow::anyhow!("Invalid output path"))?;
        // simple mtime hash: the Debug rendering of the SystemTime is stable
        // enough to detect "file changed since last scan"
        let mtime_hash = format!("{:?}", mtime);
        sqlx::query(
            "INSERT INTO jobs (input_path, output_path, status, mtime_hash, updated_at)
             VALUES (?, ?, 'queued', ?, CURRENT_TIMESTAMP)
             ON CONFLICT(input_path) DO UPDATE SET
                status = CASE WHEN mtime_hash != excluded.mtime_hash THEN 'queued' ELSE status END,
                mtime_hash = excluded.mtime_hash,
                updated_at = CURRENT_TIMESTAMP
             WHERE mtime_hash != excluded.mtime_hash"
        )
        .bind(input_str)
        .bind(output_str)
        .bind(mtime_hash)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Fetches the oldest queued job, if any.
    pub async fn get_next_job(&self) -> Result<Option<Job>> {
        // `decision_reason` is not a `jobs` column (it is filled in by the
        // decisions lookup in `get_all_jobs`), so select an explicit NULL
        // for it — otherwise decoding into `Job` fails with a
        // missing-column error at runtime.
        let job = sqlx::query_as::<_, Job>(
            "SELECT id, input_path, output_path, status, NULL AS decision_reason, created_at, updated_at
             FROM jobs WHERE status = 'queued' ORDER BY created_at LIMIT 1"
        )
        .fetch_optional(&self.pool)
        .await?;
        Ok(job)
    }

    /// Sets a job's status and bumps its `updated_at` timestamp.
    pub async fn update_job_status(&self, id: i64, status: JobState) -> Result<()> {
        sqlx::query(
            "UPDATE jobs SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?"
        )
        .bind(status)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Records an audit-trail decision ("encode" / "skip" / "reject") for a job.
    pub async fn add_decision(&self, job_id: i64, action: &str, reason: &str) -> Result<()> {
        sqlx::query(
            "INSERT INTO decisions (job_id, action, reason) VALUES (?, ?, ?)"
        )
        .bind(job_id)
        .bind(action)
        .bind(reason)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Returns all jobs, most recently updated first, each annotated with
    /// its most recent decision reason (NULL when no decision exists).
    pub async fn get_all_jobs(&self) -> Result<Vec<Job>> {
        // A correlated subquery picks the *latest* decision deterministically.
        // The previous LEFT JOIN + GROUP BY form let SQLite return an
        // arbitrary decision row when a job had several. Ordering matches
        // `get_job_decision` below.
        let jobs = sqlx::query_as::<_, Job>(
            "SELECT j.id, j.input_path, j.output_path, j.status, j.created_at, j.updated_at,
                    (SELECT d.reason FROM decisions d
                     WHERE d.job_id = j.id
                     ORDER BY d.created_at DESC, d.id DESC LIMIT 1) AS decision_reason
             FROM jobs j
             ORDER BY j.updated_at DESC"
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(jobs)
    }

    /// Returns the most recent decision for a job, if any.
    pub async fn get_job_decision(&self, job_id: i64) -> Result<Option<Decision>> {
        let decision = sqlx::query_as::<_, Decision>(
            "SELECT * FROM decisions WHERE job_id = ? ORDER BY created_at DESC LIMIT 1"
        )
        .bind(job_id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(decision)
    }

    /// Returns a JSON object mapping each job status to its row count,
    /// e.g. `{"queued": 3, "completed": 7}`.
    pub async fn get_stats(&self) -> Result<serde_json::Value> {
        let stats = sqlx::query(
            "SELECT status, count(*) as count FROM jobs GROUP BY status"
        )
        .fetch_all(&self.pool)
        .await?;
        let mut map = serde_json::Map::new();
        for row in stats {
            use sqlx::Row;
            let status: String = row.get("status");
            let count: i64 = row.get("count");
            map.insert(status, serde_json::Value::Number(count.into()));
        }
        Ok(serde_json::Value::Object(map))
    }
}

View File

@@ -8,6 +8,7 @@ pub enum Vendor {
Nvidia,
Amd,
Intel,
Apple,
}
impl std::fmt::Display for Vendor {
@@ -16,6 +17,7 @@ impl std::fmt::Display for Vendor {
Vendor::Nvidia => write!(f, "NVIDIA (NVENC)"),
Vendor::Amd => write!(f, "AMD (VAAPI/AMF)"),
Vendor::Intel => write!(f, "Intel (QSV)"),
Vendor::Apple => write!(f, "Apple (VideoToolbox)"),
}
}
}
@@ -26,6 +28,15 @@ pub struct HardwareInfo {
}
pub fn detect_hardware() -> Result<HardwareInfo> {
// 0. Check for Apple (macOS)
if cfg!(target_os = "macos") {
info!("Detected macOS hardware");
return Ok(HardwareInfo {
vendor: Vendor::Apple,
device_path: None,
});
}
// 1. Check for NVIDIA (Simplest check via nvidia-smi or /dev/nvidiactl)
if Path::new("/dev/nvidiactl").exists() || Command::new("nvidia-smi").output().is_ok() {
info!("Detected NVIDIA hardware");

10
src/lib.rs Normal file
View File

@@ -0,0 +1,10 @@
pub mod analyzer;
pub mod config;
pub mod db;
pub mod hardware;
pub mod orchestrator;
pub mod scanner;
pub mod server;
// This will be the home for Leptos components in the next phase
pub mod app;

View File

@@ -1,11 +1,13 @@
use clap::Parser;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::{error, info, warn};
use tracing_subscriber::EnvFilter;
mod analyzer;
mod hardware;
mod orchestrator;
mod scanner;
use alchemist::{analyzer, config, db, hardware, orchestrator, scanner};
use alchemist::db::JobState;
use alchemist::server::AlchemistEvent;
use tokio::sync::broadcast;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
@@ -21,6 +23,10 @@ struct Args {
/// Output directory (optional, defaults to same as input with .av1)
#[arg(short, long)]
output_dir: Option<PathBuf>,
/// Run as web server
#[arg(long)]
server: bool,
}
#[tokio::main]
@@ -33,66 +39,174 @@ async fn main() -> anyhow::Result<()> {
info!("Alchemist starting...");
let args = Args::parse();
// 0. Load Configuration
let config_path = std::path::Path::new("config.toml");
let config = config::Config::load(config_path).unwrap_or_else(|e| {
warn!("Failed to load config.toml: {}. Using defaults.", e);
config::Config::default()
});
// 1. Hardware Detection
let hw_info = match hardware::detect_hardware() {
Ok(info) => {
info!("Hardware detected: {}", info.vendor);
info
Some(info)
}
Err(e) => {
error!("{}", e);
if !args.dry_run {
warn!("Hardware missing. Exiting as requested by security gates.");
if !config.hardware.allow_cpu_fallback && !args.dry_run {
error!("GPU unavailable. CPU fallback: disabled. Exiting.");
return Err(e);
}
warn!("Hardware missing, but continuing due to dry_run/analysis mode (using dummy Intel config).");
hardware::HardwareInfo {
vendor: hardware::Vendor::Intel,
device_path: Some("/dev/dri/renderD129".to_string()),
}
warn!("GPU unavailable. CPU fallback: enabled.");
None
}
};
// 2. Scan directories
// 2. Initialize Database and Broadcast Channel
let db = db::Db::new("alchemist.db").await?;
let (tx, _rx) = broadcast::channel(100);
info!("Database and Broadcast channel initialized.");
if args.server {
info!("Starting web server...");
alchemist::server::run_server(Arc::new(db), Arc::new(config), tx).await?;
return Ok(());
}
// 3. Scan directories and enqueue jobs
let scanner = scanner::Scanner::new();
let files = scanner.scan(args.directories);
if files.is_empty() {
info!("No media files found to process.");
return Ok(());
}
// 3. Process Queue
let orchestrator = orchestrator::Orchestrator::new();
for file_path in files {
info!("--- Analyzing: {:?} ---", file_path.file_name().unwrap_or_default());
// Preflight Analysis
match analyzer::Analyzer::probe(&file_path) {
Ok(metadata) => {
let (should_encode, reason) = analyzer::Analyzer::should_transcode(&file_path, &metadata);
if should_encode {
info!("Decision: ENCODE - {}", reason);
let mut output_path = file_path.clone();
output_path.set_extension("av1.mkv");
if let Err(e) = orchestrator.transcode_to_av1(&file_path, &output_path, &hw_info, args.dry_run) {
error!("Transcode failed for {:?}: {}", file_path, e);
}
} else {
info!("Decision: SKIP - {}", reason);
}
}
Err(e) => {
error!("Failed to probe {:?}: {}", file_path, e);
} else {
for scanned_file in files {
// Basic output path generation - can be refined later
let mut output_path = scanned_file.path.clone();
output_path.set_extension("av1.mkv");
if let Err(e) = db.enqueue_job(&scanned_file.path, &output_path, scanned_file.mtime).await {
error!("Failed to enqueue job for {:?}: {}", scanned_file.path, e);
}
}
}
// 4. Process Queue
let orchestrator = Arc::new(orchestrator::Orchestrator::new());
let db = Arc::new(db);
let config = Arc::new(config);
let hw_info = Arc::new(hw_info);
let tx = Arc::new(tx);
let semaphore = Arc::new(Semaphore::new(config.transcode.concurrent_jobs));
let mut futures = Vec::new();
while let Some(job) = db.get_next_job().await? {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let db = db.clone();
let orchestrator = orchestrator.clone();
let config = config.clone();
let hw_info = hw_info.clone();
let tx = tx.clone();
let dry_run = args.dry_run;
let future = tokio::spawn(async move {
let _permit = permit; // Hold permit until job is done
let file_path = std::path::PathBuf::from(&job.input_path);
let output_path = std::path::PathBuf::from(&job.output_path);
info!("--- Processing Job {}: {:?} ---", job.id, file_path.file_name().unwrap_or_default());
// 1. ANALYZING
let _ = db.update_job_status(job.id, JobState::Analyzing).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Analyzing });
// Preflight Analysis
match analyzer::Analyzer::probe(&file_path) {
Ok(metadata) => {
let (should_encode, reason) = analyzer::Analyzer::should_transcode(&file_path, &metadata, &config);
if should_encode {
// 2. ENCODING
info!("Decision: ENCODE Job {} - {}", job.id, reason);
let _ = db.add_decision(job.id, "encode", &reason).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "encode".to_string(), reason: reason.clone() });
let _ = db.update_job_status(job.id, JobState::Encoding).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Encoding });
if let Err(e) = orchestrator.transcode_to_av1(&file_path, &output_path, hw_info.as_ref().as_ref(), dry_run, &metadata, Some((job.id, tx.clone()))) {
error!("Transcode failed for Job {}: {}", job.id, e);
let _ = db.add_decision(job.id, "reject", &e.to_string()).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "reject".to_string(), reason: e.to_string() });
let _ = db.update_job_status(job.id, JobState::Failed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Failed });
} else if !dry_run {
// Size Reduction Gate
let input_size = std::fs::metadata(&file_path).map(|m| m.len()).unwrap_or(0);
let output_size = std::fs::metadata(&output_path).map(|m| m.len()).unwrap_or(0);
let reduction = 1.0 - (output_size as f64 / input_size as f64);
info!("Job {}: Size reduction: {:.2}% ({} -> {})", job.id, reduction * 100.0, input_size, output_size);
if reduction < config.transcode.size_reduction_threshold {
info!("Job {}: Size reduction gate failed ({:.2}% < {:.0}%). Reverting.",
job.id, reduction * 100.0, config.transcode.size_reduction_threshold * 100.0);
std::fs::remove_file(&output_path).ok();
let _ = db.add_decision(job.id, "skip", "Inefficient: <30% reduction").await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "skip".to_string(), reason: "Inefficient: <30% reduction".to_string() });
let _ = db.update_job_status(job.id, JobState::Skipped).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Skipped });
} else {
// Integrity Check
match analyzer::Analyzer::probe(&output_path) {
Ok(_) => {
info!("Job {}: Size reduction and integrity gate passed. Job completed.", job.id);
let _ = db.update_job_status(job.id, JobState::Completed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Completed });
}
Err(e) => {
error!("Job {}: Integrity check failed for {:?}: {}", job.id, output_path, e);
std::fs::remove_file(&output_path).ok();
let _ = db.add_decision(job.id, "reject", &format!("Integrity check failed: {}", e)).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "reject".to_string(), reason: format!("Integrity check failed: {}", e) });
let _ = db.update_job_status(job.id, JobState::Failed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Failed });
}
}
}
} else {
let _ = db.update_job_status(job.id, JobState::Completed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Completed });
}
} else {
// 2. SKIPPED
info!("Decision: SKIP Job {} - {}", job.id, reason);
let _ = db.add_decision(job.id, "skip", &reason).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "skip".to_string(), reason: reason.clone() });
let _ = db.update_job_status(job.id, JobState::Skipped).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Skipped });
}
}
Err(e) => {
error!("Job {}: Failed to probe {:?}: {}", job.id, file_path, e);
let _ = db.add_decision(job.id, "reject", &e.to_string()).await;
let _ = tx.send(AlchemistEvent::Decision { job_id: job.id, action: "reject".to_string(), reason: e.to_string() });
let _ = db.update_job_status(job.id, JobState::Failed).await;
let _ = tx.send(AlchemistEvent::JobStateChanged { job_id: job.id, status: JobState::Failed });
}
}
});
futures.push(future);
}
// Wait for all jobs to finish
futures::future::join_all(futures).await;
info!("All jobs processed.");
Ok(())
}

View File

@@ -3,6 +3,8 @@ use std::process::{Command, Stdio};
use anyhow::{anyhow, Result};
use tracing::{info, error};
use crate::hardware::{Vendor, HardwareInfo};
use crate::server::AlchemistEvent;
use tokio::sync::broadcast;
pub struct Orchestrator {
ffmpeg_path: String,
@@ -15,54 +17,90 @@ impl Orchestrator {
}
}
pub fn transcode_to_av1(&self, input: &Path, output: &Path, hw_info: &HardwareInfo, dry_run: bool) -> Result<()> {
pub fn transcode_to_av1(&self, input: &Path, output: &Path, hw_info: Option<&HardwareInfo>, dry_run: bool, metadata: &crate::analyzer::MediaMetadata, event_target: Option<(i64, Arc<broadcast::Sender<AlchemistEvent>>)>) -> Result<()> {
let mut args = vec![
"-hide_banner".to_string(),
"-y".to_string(),
];
// Vendor-specific setup
match hw_info.vendor {
Vendor::Intel => {
args.extend_from_slice(&[
"-hwaccel".to_string(), "qsv".to_string(),
"-qsv_device".to_string(), hw_info.device_path.as_ref().cloned().unwrap_or_else(|| "/dev/dri/renderD128".to_string()),
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-c:v".to_string(), "av1_qsv".to_string(),
"-preset".to_string(), "medium".to_string(),
"-global_quality".to_string(), "25".to_string(),
"-pix_fmt".to_string(), "p010le".to_string(),
]);
if let Some(hw) = hw_info {
match hw.vendor {
Vendor::Intel => {
args.extend_from_slice(&[
"-hwaccel".to_string(), "qsv".to_string(),
"-qsv_device".to_string(), hw.device_path.as_ref().cloned().unwrap_or_else(|| "/dev/dri/renderD128".to_string()),
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-c:v".to_string(), "av1_qsv".to_string(),
"-preset".to_string(), "medium".to_string(),
"-global_quality".to_string(), "25".to_string(),
"-pix_fmt".to_string(), "p010le".to_string(),
]);
}
Vendor::Nvidia => {
args.extend_from_slice(&[
"-hwaccel".to_string(), "cuda".to_string(),
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-c:v".to_string(), "av1_nvenc".to_string(),
"-preset".to_string(), "p4".to_string(),
"-cq".to_string(), "24".to_string(),
"-pix_fmt".to_string(), "p010le".to_string(),
]);
}
Vendor::Amd => {
args.extend_from_slice(&[
"-hwaccel".to_string(), "vaapi".to_string(),
"-vaapi_device".to_string(), hw.device_path.as_ref().cloned().unwrap_or_else(|| "/dev/dri/renderD128".to_string()),
"-hwaccel_output_format".to_string(), "vaapi".to_string(),
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-vf".to_string(), "format=nv12|vaapi,hwupload".to_string(),
"-c:v".to_string(), "av1_vaapi".to_string(),
"-qp".to_string(), "25".to_string(),
]);
}
Vendor::Apple => {
args.extend_from_slice(&[
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-c:v".to_string(), "av1_videotoolbox".to_string(),
"-bitrate".to_string(), "6M".to_string(),
"-pix_fmt".to_string(), "p010le".to_string(),
]);
}
}
Vendor::Nvidia => {
args.extend_from_slice(&[
"-hwaccel".to_string(), "cuda".to_string(),
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-c:v".to_string(), "av1_nvenc".to_string(),
"-preset".to_string(), "p4".to_string(),
"-cq".to_string(), "24".to_string(),
"-pix_fmt".to_string(), "p010le".to_string(),
]);
}
Vendor::Amd => {
// Assuming VAAPI for AMD on Linux as it's more common than AMF CLI support in many ffmpeg builds
args.extend_from_slice(&[
"-hwaccel".to_string(), "vaapi".to_string(),
"-vaapi_device".to_string(), hw_info.device_path.as_ref().cloned().unwrap_or_else(|| "/dev/dri/renderD128".to_string()),
"-hwaccel_output_format".to_string(), "vaapi".to_string(),
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-vf".to_string(), "format=nv12|vaapi,hwupload".to_string(),
"-c:v".to_string(), "av1_vaapi".to_string(),
"-qp".to_string(), "25".to_string(),
]);
} else {
// CPU fallback (libaom-av1) - VERY SLOW, but requested via allow_cpu_fallback
args.extend_from_slice(&[
"-i".to_string(), input.to_str().ok_or_else(|| anyhow!("Invalid input path"))?.to_string(),
"-c:v".to_string(), "libaom-av1".to_string(),
"-crf".to_string(), "30".to_string(),
"-cpu-used".to_string(), "8".to_string(), // Faster preset for CPU
"-pix_fmt".to_string(), "yuv420p10le".to_string(),
]);
}
// Audio and Subtitle Mapping
args.extend_from_slice(&["-map".to_string(), "0:v:0".to_string()]);
let mut audio_count = 0;
for (i, stream) in metadata.streams.iter().enumerate() {
if stream.codec_type == "audio" {
args.extend_from_slice(&["-map".to_string(), format!("0:a:{}", audio_count)]);
if crate::analyzer::Analyzer::should_transcode_audio(stream) {
args.extend_from_slice(&[format!("-c:a:{}", audio_count), "libopus".to_string(), format!("-b:a:{}", audio_count), "192k".to_string()]);
} else {
args.extend_from_slice(&[format!("-c:a:{}", audio_count), "copy".to_string()]);
}
audio_count += 1;
} else if stream.codec_type == "subtitle" {
args.extend_from_slice(&["-map".to_string(), format!("0:s:{}", i - audio_count - 1)]); // Simplified mapping
args.extend_from_slice(&["-c:s".to_string(), "copy".to_string()]);
}
}
// Common arguments
args.extend_from_slice(&[
"-c:a".to_string(), "copy".to_string(),
output.to_str().ok_or_else(|| anyhow!("Invalid output path"))?.to_string(),
]);
// If no subtitles were found or mapping is complex, fallback to a simpler copy all if needed
// But for now, let's just map all and copy.
args.push(output.to_str().ok_or_else(|| anyhow!("Invalid output path"))?.to_string());
info!("Command: {} {}", self.ffmpeg_path, args.join(" "));
@@ -77,6 +115,32 @@ impl Orchestrator {
.stderr(Stdio::piped())
.spawn()?;
let stderr = child.stderr.take().ok_or_else(|| anyhow!("Failed to capture stderr"))?;
let reader = std::io::BufReader::new(stderr);
use std::io::BufRead;
for line in reader.lines() {
let line = line?;
if let Some((job_id, ref tx)) = event_target {
let _ = tx.send(AlchemistEvent::Log { job_id, message: line.clone() });
if line.contains("time=") {
// simple parse for time=00:00:00.00
if let Some(time_part) = line.split("time=").nth(1) {
let time_str = time_part.split_whitespace().next().unwrap_or("");
info!("Progress: time={}", time_str);
let _ = tx.send(AlchemistEvent::Progress { job_id, percentage: 0.0, time: time_str.to_string() });
}
}
} else if line.contains("time=") {
// simple parse for time=00:00:00.00
if let Some(time_part) = line.split("time=").nth(1) {
let time_str = time_part.split_whitespace().next().unwrap_or("");
info!("Progress: time={}", time_str);
}
}
}
let status = child.wait()?;
if status.success() {

View File

@@ -1,6 +1,14 @@
use std::path::PathBuf;
use std::time::SystemTime;
use walkdir::WalkDir;
use tracing::{info, debug};
use rayon::prelude::*;
use std::sync::{Arc, Mutex};
/// A media file discovered by the scanner.
///
/// `Clone` is required because scan results are copied out of the shared
/// collection buffer; `Debug` aids logging and test diagnostics.
#[derive(Debug, Clone)]
pub struct ScannedFile {
    /// Absolute (or as-walked) path to the media file.
    pub path: PathBuf,
    /// Last-modification time; falls back to `UNIX_EPOCH` when metadata
    /// is unreadable (see `Scanner::scan`).
    pub mtime: SystemTime,
}
pub struct Scanner {
extensions: Vec<String>,
@@ -19,22 +27,34 @@ impl Scanner {
}
}
pub fn scan(&self, directories: Vec<PathBuf>) -> Vec<PathBuf> {
let mut files = Vec::new();
for dir in directories {
pub fn scan(&self, directories: Vec<PathBuf>) -> Vec<ScannedFile> {
let files = Arc::new(Mutex::new(Vec::new()));
directories.into_par_iter().for_each(|dir| {
info!("Scanning directory: {:?}", dir);
let mut local_files = Vec::new();
for entry in WalkDir::new(dir).into_iter().filter_map(|e| e.ok()) {
if entry.file_type().is_file() {
if let Some(ext) = entry.path().extension().and_then(|s| s.to_str()) {
if self.extensions.contains(&ext.to_lowercase()) {
debug!("Found media file: {:?}", entry.path());
files.push(entry.path().to_path_buf());
let mtime = entry.metadata().map(|m| m.modified().unwrap_or(SystemTime::UNIX_EPOCH)).unwrap_or(SystemTime::UNIX_EPOCH);
local_files.push(ScannedFile {
path: entry.path().to_path_buf(),
mtime,
});
}
}
}
}
}
info!("Found {} candidate media files", files.len());
files
files.lock().unwrap().extend(local_files);
});
let mut final_files = files.lock().unwrap().clone();
// Deterministic ordering
final_files.sort_by(|a, b| a.path.cmp(&b.path));
info!("Found {} candidate media files", final_files.len());
final_files
}
}

65
src/server.rs Normal file
View File

@@ -0,0 +1,65 @@
use std::convert::Infallible;
use std::sync::Arc;

use axum::{
    routing::{get, post},
    Router,
    response::sse::{Event as AxumEvent, Sse},
};
use futures::stream::Stream;
use leptos::*;
use leptos_axum::{generate_route_list, LeptosRoutes};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use tracing::info;

use crate::app::*;
use crate::config::Config;
use crate::db::{Db, JobState};
/// Events broadcast from the transcoding pipeline to connected web clients
/// over the `/api/events` SSE endpoint.
///
/// Serialized adjacently tagged (`#[serde(tag, content)]`) so clients receive
/// `{"type": "...", "data": {...}}` JSON objects.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum AlchemistEvent {
    /// A job transitioned to a new lifecycle state.
    JobStateChanged { job_id: i64, status: JobState },
    /// Encoding progress update; `time` carries the raw ffmpeg `time=` value.
    /// NOTE(review): `percentage` is currently always sent as 0.0 by the
    /// stderr parser — confirm before clients rely on it.
    Progress { job_id: i64, percentage: f64, time: String },
    /// The analyzer's verdict for a file and the reasoning behind it.
    Decision { job_id: i64, action: String, reason: String },
    /// A raw log line (e.g. an ffmpeg stderr line) associated with a job.
    Log { job_id: i64, message: String },
}
/// Starts the Axum web server hosting the Leptos UI, the server-function
/// endpoint, and the SSE event feed.
///
/// Shared handles (`db`, `config`, the broadcast sender) are injected as
/// request extensions so handlers can reach them.
///
/// # Errors
/// Returns an error if the Leptos configuration cannot be loaded, the
/// listen address cannot be bound, or the server terminates abnormally.
pub async fn run_server(db: Arc<Db>, config: Arc<Config>, tx: broadcast::Sender<AlchemistEvent>) -> anyhow::Result<()> {
    // Leptos resolves site address/root from env vars or Cargo.toml metadata.
    let conf = get_configuration(None).await?;
    let leptos_options = conf.leptos_options;
    let addr = leptos_options.site_addr;
    let routes = generate_route_list(App);

    let app = Router::new()
        .route("/api/events", get(sse_handler))
        .route("/api/*fn_name", post(leptos_axum::handle_server_fns))
        .leptos_routes(&leptos_options, routes, App)
        .with_state(leptos_options)
        .layer(axum::Extension(db))
        .layer(axum::Extension(config))
        .layer(axum::Extension(tx));

    info!("listening on http://{}", addr);
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    axum::serve(listener, app).await?;
    Ok(())
}
/// SSE endpoint: subscribes a new receiver to the event bus and forwards
/// each event to the client as a JSON-encoded SSE message.
async fn sse_handler(
    axum::Extension(tx): axum::Extension<broadcast::Sender<AlchemistEvent>>,
) -> Sse<impl Stream<Item = Result<AxumEvent, Infallible>>> {
    let events = BroadcastStream::new(tx.subscribe()).filter_map(|recv_result| {
        // Receive errors (e.g. a lagged subscriber) and serialization
        // failures are silently dropped — the feed is best-effort.
        let event = recv_result.ok()?;
        let payload = serde_json::to_string(&event).ok()?;
        Some(Ok(AxumEvent::default().data(payload)))
    });
    Sse::new(events).keep_alive(axum::response::sse::KeepAlive::default())
}