completed phase 2, 3, added notifications

This commit is contained in:
brooklyn
2026-01-08 14:01:59 -05:00
parent 301903a188
commit ee09b635d1
10 changed files with 733 additions and 117 deletions

143
Cargo.lock generated
View File

@@ -52,7 +52,7 @@ dependencies = [
"tokio-stream",
"toml",
"tower 0.4.13",
"tower-http",
"tower-http 0.5.2",
"tracing",
"tracing-subscriber",
"walkdir",
@@ -1139,6 +1139,7 @@ version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f"
dependencies = [
"base64 0.22.1",
"bytes",
"futures-channel",
"futures-core",
@@ -1146,12 +1147,16 @@ dependencies = [
"http",
"http-body",
"hyper",
"ipnet",
"libc",
"percent-encoding",
"pin-project-lite",
"socket2",
"system-configuration",
"tokio",
"tower-service",
"tracing",
"windows-registry",
]
[[package]]
@@ -1333,6 +1338,16 @@ version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "iri-string"
version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.2"
@@ -1347,10 +1362,11 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
[[package]]
name = "js-sys"
version = "0.3.70"
version = "0.3.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a"
checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8"
dependencies = [
"once_cell",
"wasm-bindgen",
]
@@ -1385,9 +1401,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.179"
version = "0.2.180"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5a2d376baa530d1238d133232d15e239abad80d05838b4b59354e5268af431f"
checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc"
[[package]]
name = "libm"
@@ -1914,15 +1930,14 @@ checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
[[package]]
name = "reqwest"
version = "0.12.12"
version = "0.12.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da"
checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147"
dependencies = [
"base64 0.22.1",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
@@ -1931,29 +1946,26 @@ dependencies = [
"hyper-rustls",
"hyper-tls",
"hyper-util",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls-pemfile",
"rustls-pki-types",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-native-tls",
"tower 0.5.2",
"tower-http 0.6.8",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"windows-registry",
]
[[package]]
@@ -2051,15 +2063,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rustls-pemfile"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "rustls-pki-types"
version = "1.13.2"
@@ -2896,6 +2899,24 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower-http"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8"
dependencies = [
"bitflags 2.10.0",
"bytes",
"futures-util",
"http",
"http-body",
"iri-string",
"pin-project-lite",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
@@ -3129,47 +3150,35 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
[[package]]
name = "wasm-bindgen"
version = "0.2.93"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5"
checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd"
dependencies = [
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.114",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.43"
version = "0.4.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed"
checksum = "836d9622d604feee9e5de25ac10e3ea5f2d65b41eac0d9ce72eb5deae707ce7c"
dependencies = [
"cfg-if",
"js-sys",
"once_cell",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.93"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf"
checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -3177,28 +3186,31 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.93"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836"
checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40"
dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn 2.0.114",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.93"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484"
checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4"
dependencies = [
"unicode-ident",
]
[[package]]
name = "web-sys"
version = "0.3.70"
version = "0.3.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0"
checksum = "9b32828d774c412041098d182a8b38b16ea816958e07cf40eec2bc080ae137ac"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -3254,8 +3266,8 @@ dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result 0.4.1",
"windows-strings 0.5.1",
"windows-result",
"windows-strings",
]
[[package]]
@@ -3288,22 +3300,13 @@ checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-registry"
version = "0.2.0"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0"
checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720"
dependencies = [
"windows-result 0.2.0",
"windows-strings 0.1.0",
"windows-targets 0.52.6",
]
[[package]]
name = "windows-result"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e"
dependencies = [
"windows-targets 0.52.6",
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
@@ -3315,16 +3318,6 @@ dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10"
dependencies = [
"windows-result 0.2.0",
"windows-targets 0.52.6",
]
[[package]]
name = "windows-strings"
version = "0.5.1"

374
public/js/sse.js Normal file
View File

@@ -0,0 +1,374 @@
/*
Server Sent Events Extension
============================
This extension adds support for Server Sent Events to htmx. See /www/extensions/sse.md for usage instructions.
*/
(function() {
if (htmx.version && !htmx.version.startsWith("1.")) {
console.warn("WARNING: You are using an htmx 1 extension with htmx " + htmx.version +
". It is recommended that you move to the version of this extension found on https://htmx.org/extensions")
}
/** @type {import("../htmx").HtmxInternalApi} */
var api;
htmx.defineExtension("sse", {
/**
* Init saves the provided reference to the internal HTMX API.
*
* @param {import("../htmx").HtmxInternalApi} api
* @returns void
*/
init: function(apiRef) {
// store a reference to the internal API.
api = apiRef;
// set a function in the public API for creating new EventSource objects
if (htmx.createEventSource == undefined) {
htmx.createEventSource = createEventSource;
}
},
/**
* onEvent handles all events passed to this extension.
*
* @param {string} name
* @param {Event} evt
* @returns void
*/
onEvent: function(name, evt) {
var parent = evt.target || evt.detail.elt;
switch (name) {
case "htmx:beforeCleanupElement":
var internalData = api.getInternalData(parent)
// Try to remove remove an EventSource when elements are removed
if (internalData.sseEventSource) {
internalData.sseEventSource.close();
}
return;
// Try to create EventSources when elements are processed
case "htmx:afterProcessNode":
ensureEventSourceOnElement(parent);
}
}
});
///////////////////////////////////////////////
// HELPER FUNCTIONS
///////////////////////////////////////////////
/**
* createEventSource is the default method for creating new EventSource objects.
* it is hoisted into htmx.config.createEventSource to be overridden by the user, if needed.
*
* @param {string} url
* @returns EventSource
*/
function createEventSource(url) {
return new EventSource(url, { withCredentials: true });
}
function splitOnWhitespace(trigger) {
return trigger.trim().split(/\s+/);
}
function getLegacySSEURL(elt) {
var legacySSEValue = api.getAttributeValue(elt, "hx-sse");
if (legacySSEValue) {
var values = splitOnWhitespace(legacySSEValue);
for (var i = 0; i < values.length; i++) {
var value = values[i].split(/:(.+)/);
if (value[0] === "connect") {
return value[1];
}
}
}
}
function getLegacySSESwaps(elt) {
var legacySSEValue = api.getAttributeValue(elt, "hx-sse");
var returnArr = [];
if (legacySSEValue != null) {
var values = splitOnWhitespace(legacySSEValue);
for (var i = 0; i < values.length; i++) {
var value = values[i].split(/:(.+)/);
if (value[0] === "swap") {
returnArr.push(value[1]);
}
}
}
return returnArr;
}
/**
* registerSSE looks for attributes that can contain sse events, right
* now hx-trigger and sse-swap and adds listeners based on these attributes too
* the closest event source
*
* @param {HTMLElement} elt
*/
function registerSSE(elt) {
// Add message handlers for every `sse-swap` attribute
queryAttributeOnThisOrChildren(elt, "sse-swap").forEach(function (child) {
// Find closest existing event source
var sourceElement = api.getClosestMatch(child, hasEventSource);
if (sourceElement == null) {
// api.triggerErrorEvent(elt, "htmx:noSSESourceError")
return null; // no eventsource in parentage, orphaned element
}
// Set internalData and source
var internalData = api.getInternalData(sourceElement);
var source = internalData.sseEventSource;
var sseSwapAttr = api.getAttributeValue(child, "sse-swap");
if (sseSwapAttr) {
var sseEventNames = sseSwapAttr.split(",");
} else {
var sseEventNames = getLegacySSESwaps(child);
}
for (var i = 0; i < sseEventNames.length; i++) {
var sseEventName = sseEventNames[i].trim();
var listener = function(event) {
// If the source is missing then close SSE
if (maybeCloseSSESource(sourceElement)) {
return;
}
// If the body no longer contains the element, remove the listener
if (!api.bodyContains(child)) {
source.removeEventListener(sseEventName, listener);
return;
}
// swap the response into the DOM and trigger a notification
if(!api.triggerEvent(elt, "htmx:sseBeforeMessage", event)) {
return;
}
swap(child, event.data);
api.triggerEvent(elt, "htmx:sseMessage", event);
};
// Register the new listener
api.getInternalData(child).sseEventListener = listener;
source.addEventListener(sseEventName, listener);
}
});
// Add message handlers for every `hx-trigger="sse:*"` attribute
queryAttributeOnThisOrChildren(elt, "hx-trigger").forEach(function(child) {
// Find closest existing event source
var sourceElement = api.getClosestMatch(child, hasEventSource);
if (sourceElement == null) {
// api.triggerErrorEvent(elt, "htmx:noSSESourceError")
return null; // no eventsource in parentage, orphaned element
}
// Set internalData and source
var internalData = api.getInternalData(sourceElement);
var source = internalData.sseEventSource;
var sseEventName = api.getAttributeValue(child, "hx-trigger");
if (sseEventName == null) {
return;
}
// Only process hx-triggers for events with the "sse:" prefix
if (sseEventName.slice(0, 4) != "sse:") {
return;
}
// remove the sse: prefix from here on out
sseEventName = sseEventName.substr(4);
var listener = function() {
if (maybeCloseSSESource(sourceElement)) {
return
}
if (!api.bodyContains(child)) {
source.removeEventListener(sseEventName, listener);
}
}
});
}
/**
* ensureEventSourceOnElement creates a new EventSource connection on the provided element.
* If a usable EventSource already exists, then it is returned. If not, then a new EventSource
* is created and stored in the element's internalData.
* @param {HTMLElement} elt
* @param {number} retryCount
* @returns {EventSource | null}
*/
function ensureEventSourceOnElement(elt, retryCount) {
if (elt == null) {
return null;
}
// handle extension source creation attribute
queryAttributeOnThisOrChildren(elt, "sse-connect").forEach(function(child) {
var sseURL = api.getAttributeValue(child, "sse-connect");
if (sseURL == null) {
return;
}
ensureEventSource(child, sseURL, retryCount);
});
// handle legacy sse, remove for HTMX2
queryAttributeOnThisOrChildren(elt, "hx-sse").forEach(function(child) {
var sseURL = getLegacySSEURL(child);
if (sseURL == null) {
return;
}
ensureEventSource(child, sseURL, retryCount);
});
registerSSE(elt);
}
function ensureEventSource(elt, url, retryCount) {
var source = htmx.createEventSource(url);
source.onerror = function(err) {
// Log an error event
api.triggerErrorEvent(elt, "htmx:sseError", { error: err, source: source });
// If parent no longer exists in the document, then clean up this EventSource
if (maybeCloseSSESource(elt)) {
return;
}
// Otherwise, try to reconnect the EventSource
if (source.readyState === EventSource.CLOSED) {
retryCount = retryCount || 0;
var timeout = Math.random() * (2 ^ retryCount) * 500;
window.setTimeout(function() {
ensureEventSourceOnElement(elt, Math.min(7, retryCount + 1));
}, timeout);
}
};
source.onopen = function(evt) {
api.triggerEvent(elt, "htmx:sseOpen", { source: source });
}
api.getInternalData(elt).sseEventSource = source;
}
/**
* maybeCloseSSESource confirms that the parent element still exists.
* If not, then any associated SSE source is closed and the function returns true.
*
* @param {HTMLElement} elt
* @returns boolean
*/
function maybeCloseSSESource(elt) {
if (!api.bodyContains(elt)) {
var source = api.getInternalData(elt).sseEventSource;
if (source != undefined) {
source.close();
// source = null
return true;
}
}
return false;
}
/**
* queryAttributeOnThisOrChildren returns all nodes that contain the requested attributeName, INCLUDING THE PROVIDED ROOT ELEMENT.
*
* @param {HTMLElement} elt
* @param {string} attributeName
*/
function queryAttributeOnThisOrChildren(elt, attributeName) {
var result = [];
// If the parent element also contains the requested attribute, then add it to the results too.
if (api.hasAttribute(elt, attributeName)) {
result.push(elt);
}
// Search all child nodes that match the requested attribute
elt.querySelectorAll("[" + attributeName + "], [data-" + attributeName + "]").forEach(function(node) {
result.push(node);
});
return result;
}
/**
* @param {HTMLElement} elt
* @param {string} content
*/
function swap(elt, content) {
api.withExtensions(elt, function(extension) {
content = extension.transformResponse(content, null, elt);
});
var swapSpec = api.getSwapSpecification(elt);
var target = api.getTarget(elt);
var settleInfo = api.makeSettleInfo(elt);
api.selectAndSwap(swapSpec.swapStyle, target, elt, content, settleInfo);
settleInfo.elts.forEach(function(elt) {
if (elt.classList) {
elt.classList.add(htmx.config.settlingClass);
}
api.triggerEvent(elt, 'htmx:beforeSettle');
});
// Handle settle tasks (with delay if requested)
if (swapSpec.settleDelay > 0) {
setTimeout(doSettle(settleInfo), swapSpec.settleDelay);
} else {
doSettle(settleInfo)();
}
}
/**
* doSettle mirrors much of the functionality in htmx that
* settles elements after their content has been swapped.
* TODO: this should be published by htmx, and not duplicated here
* @param {import("../htmx").HtmxSettleInfo} settleInfo
* @returns () => void
*/
function doSettle(settleInfo) {
return function() {
settleInfo.tasks.forEach(function(task) {
task.call();
});
settleInfo.elts.forEach(function(elt) {
if (elt.classList) {
elt.classList.remove(htmx.config.settlingClass);
}
api.triggerEvent(elt, 'htmx:afterSettle');
});
}
}
function hasEventSource(node) {
return api.getInternalData(node).sseEventSource != null;
}
})();

View File

@@ -66,6 +66,7 @@ pub struct Job {
pub priority: i32,
pub progress: f64,
pub attempt_count: i32,
pub vmaf_score: Option<f64>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -87,6 +88,10 @@ impl Job {
pub fn progress_fixed(&self) -> String {
format!("{:.1}", self.progress)
}
pub fn vmaf_fixed(&self) -> String {
self.vmaf_score.map(|s| format!("{:.1}", s)).unwrap_or_else(|| "N/A".to_string())
}
}
#[derive(Debug, Serialize, Deserialize, Clone, sqlx::FromRow)]
@@ -250,6 +255,7 @@ impl Db {
"SELECT id, input_path, output_path, status, NULL as decision_reason,
COALESCE(priority, 0) as priority, COALESCE(progress, 0.0) as progress,
COALESCE(attempt_count, 0) as attempt_count,
NULL as vmaf_score,
created_at, updated_at
FROM jobs WHERE status = 'queued'
ORDER BY priority DESC, created_at ASC LIMIT 1",
@@ -288,6 +294,7 @@ impl Db {
COALESCE(j.priority, 0) as priority,
COALESCE(j.progress, 0.0) as progress,
COALESCE(j.attempt_count, 0) as attempt_count,
(SELECT vmaf_score FROM encode_stats WHERE job_id = j.id) as vmaf_score,
j.created_at, j.updated_at
FROM jobs j
ORDER BY j.updated_at DESC",
@@ -395,6 +402,7 @@ impl Db {
COALESCE(j.priority, 0) as priority,
COALESCE(j.progress, 0.0) as progress,
COALESCE(j.attempt_count, 0) as attempt_count,
(SELECT vmaf_score FROM encode_stats WHERE job_id = j.id) as vmaf_score,
j.created_at, j.updated_at
FROM jobs j
WHERE j.id = ?",
@@ -414,6 +422,7 @@ impl Db {
COALESCE(j.priority, 0) as priority,
COALESCE(j.progress, 0.0) as progress,
COALESCE(j.attempt_count, 0) as attempt_count,
(SELECT vmaf_score FROM encode_stats WHERE job_id = j.id) as vmaf_score,
j.created_at, j.updated_at
FROM jobs j
WHERE j.status = ?
@@ -451,4 +460,18 @@ impl Db {
.await?;
Ok(())
}
pub async fn restart_failed_jobs(&self) -> Result<u64> {
let result = sqlx::query("UPDATE jobs SET status = 'queued', updated_at = CURRENT_TIMESTAMP WHERE status IN ('failed', 'cancelled')")
.execute(&self.pool)
.await?;
Ok(result.rows_affected())
}
pub async fn clear_completed_jobs(&self) -> Result<u64> {
let result = sqlx::query("DELETE FROM jobs WHERE status = 'completed'")
.execute(&self.pool)
.await?;
Ok(result.rows_affected())
}
}

View File

@@ -160,6 +160,23 @@ async fn main() -> Result<()> {
if args.server {
info!("Starting web server...");
// Start File Watcher if directories are configured
if !config.scanner.directories.is_empty() {
let watcher_dirs: Vec<PathBuf> = config
.scanner.directories
.iter()
.map(PathBuf::from)
.collect();
let watcher = alchemist::system::watcher::FileWatcher::new(watcher_dirs, db.clone());
let watcher_handle = watcher.clone();
tokio::spawn(async move {
if let Err(e) = watcher_handle.start().await {
error!("File watcher failed: {}", e);
}
});
}
alchemist::server::run_server(db, config, agent, transcoder, tx).await?;
} else {
// CLI Mode

View File

@@ -1,14 +1,16 @@
use crate::config::Config;
use crate::db::{AlchemistEvent, Db, JobState};
use crate::db::{AlchemistEvent, Db, Job, JobState};
use crate::error::Result;
use crate::media::analyzer::Analyzer;
use crate::media::scanner::Scanner;
use crate::system::hardware::HardwareInfo;
use crate::system::notifications::NotificationService;
use crate::Transcoder;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::{broadcast, Semaphore};
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
pub struct Agent {
db: Arc<Db>,
@@ -17,6 +19,8 @@ pub struct Agent {
hw_info: Arc<Option<HardwareInfo>>,
tx: Arc<broadcast::Sender<AlchemistEvent>>,
semaphore: Arc<Semaphore>,
notifications: NotificationService,
paused: Arc<AtomicBool>,
dry_run: bool,
}
@@ -30,6 +34,7 @@ impl Agent {
dry_run: bool,
) -> Self {
let concurrent_jobs = config.transcode.concurrent_jobs;
let notifications = NotificationService::new(config.notifications.clone());
Self {
db,
orchestrator,
@@ -37,6 +42,8 @@ impl Agent {
hw_info: Arc::new(hw_info),
tx: Arc::new(tx),
semaphore: Arc::new(Semaphore::new(concurrent_jobs)),
notifications,
paused: Arc::new(AtomicBool::new(false)),
dry_run,
}
}
@@ -65,10 +72,29 @@ impl Agent {
}); // Trigger UI refresh
Ok(())
}
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::SeqCst)
}
pub fn pause(&self) {
self.paused.store(true, Ordering::SeqCst);
info!("Engine paused.");
}
pub fn resume(&self) {
self.paused.store(false, Ordering::SeqCst);
info!("Engine resumed.");
}
pub async fn run_loop(self: Arc<Self>) {
info!("Agent loop started.");
loop {
if self.is_paused() {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
continue;
}
match self.db.get_next_job().await {
Ok(Some(job)) => {
let permit = self.semaphore.clone().acquire_owned().await.unwrap();
@@ -168,7 +194,7 @@ impl Agent {
.await
{
Ok(_) => {
self.finalize_job(job.id, &file_path, &output_path, start_time)
self.finalize_job(job, &file_path, &output_path, start_time)
.await
}
Err(e) => {
@@ -176,6 +202,7 @@ impl Agent {
self.update_job_state(job.id, JobState::Cancelled).await
} else {
error!("Job {}: Transcode failed: {}", job.id, e);
let _ = self.notifications.notify_job_failed(&job, &e.to_string()).await;
self.update_job_state(job.id, JobState::Failed).await?;
Err(e)
}
@@ -193,11 +220,12 @@ impl Agent {
async fn finalize_job(
&self,
job_id: i64,
job: Job,
input_path: &std::path::Path,
output_path: &std::path::Path,
start_time: std::time::Instant,
) -> Result<()> {
let job_id = job.id;
// Integrity & Size Reduction check
let input_metadata = match std::fs::metadata(input_path) {
Ok(m) => m,
@@ -226,37 +254,83 @@ impl Agent {
}
let reduction = 1.0 - (output_size as f64 / input_size as f64);
let encode_duration = start_time.elapsed().as_secs_f64();
// Check reduction threshold
if output_size == 0 || reduction < self.config.transcode.size_reduction_threshold {
warn!(
"Job {}: Size reduction gate failed ({:.2}%). Reverting.",
job_id,
reduction * 100.0
);
if let Err(e) = std::fs::remove_file(output_path) {
error!("Job {}: Failed to remove inefficient output: {}", job_id, e);
}
let reason = if output_size == 0 {
"Empty output"
} else {
"Inefficient reduction"
};
if let Err(e) = self.db.add_decision(job_id, "skip", reason).await {
error!("Job {}: Failed to add decision to DB: {}", job_id, e);
}
let _ = std::fs::remove_file(output_path);
let reason = if output_size == 0 { "Empty output" } else { "Inefficient reduction" };
let _ = self.db.add_decision(job_id, "skip", reason).await;
self.update_job_state(job_id, JobState::Skipped).await?;
} else {
let encode_duration = start_time.elapsed();
info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
info!("✅ Job #{} COMPLETED", job_id);
info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
info!(" Input Size: {} MB", input_size / 1_048_576);
info!(" Output Size: {} MB", output_size / 1_048_576);
info!(" Reduction: {:.1}%", reduction * 100.0);
info!(" Duration: {:.2}s", encode_duration.as_secs_f64());
info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
self.update_job_state(job_id, JobState::Completed).await?;
return Ok(());
}
// 2. QUALITY GATE (VMAF)
let mut vmaf_score = None;
if self.config.quality.enable_vmaf {
info!("[Job {}] Phase 2: Computing VMAF quality score...", job_id);
match crate::media::ffmpeg::QualityScore::compute(input_path, output_path) {
Ok(score) => {
vmaf_score = score.vmaf;
if let Some(s) = vmaf_score {
info!("[Job {}] VMAF Score: {:.2}", job_id, s);
if s < self.config.quality.min_vmaf_score && self.config.quality.revert_on_low_quality {
warn!("Job {}: Quality gate failed ({:.2} < {}). Reverting.", job_id, s, self.config.quality.min_vmaf_score);
let _ = std::fs::remove_file(output_path);
let _ = self.db.add_decision(job_id, "skip", "Low quality (VMAF)").await;
self.update_job_state(job_id, JobState::Skipped).await?;
return Ok(());
}
}
}
Err(e) => {
warn!("[Job {}] VMAF computation failed: {}", job_id, e);
}
}
}
// Finalize results
let bitrate = (output_size as f64 * 8.0) / (encode_duration * 1000.0); // Rough estimate
let _ = self.db.save_encode_stats(
job_id,
input_size,
output_size,
reduction,
encode_duration,
1.0, // Speed calculation could be refined
bitrate,
vmaf_score,
).await;
info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
info!("✅ Job #{} COMPLETED", job_id);
info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
info!(" Input Size: {} MB", input_size / 1_048_576);
info!(" Output Size: {} MB", output_size / 1_048_576);
info!(" Reduction: {:.1}%", reduction * 100.0);
if let Some(s) = vmaf_score {
info!(" VMAF Score: {:.2}", s);
}
info!(" Duration: {:.2}s", encode_duration);
info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
self.update_job_state(job_id, JobState::Completed).await?;
// Notifications
let stats_msg = format!(
"📊 {} MB -> {} MB ({:.1}% reduction), VMAF: {}",
input_size / 1_048_576,
output_size / 1_048_576,
reduction * 100.0,
vmaf_score.map(|s| format!("{:.2}", s)).unwrap_or_else(|| "N/A".to_string())
);
let _ = self.notifications.notify_job_complete(&job, Some(&stats_msg)).await;
Ok(())
}
}

View File

@@ -53,9 +53,14 @@ pub async fn run_server(
.route("/api/scan", post(scan_handler))
.route("/api/stats", get(stats_handler))
.route("/api/jobs/table", get(jobs_table_handler))
.route("/api/jobs/restart-failed", post(restart_failed_handler))
.route("/api/jobs/clear-completed", post(clear_completed_handler))
.route("/api/jobs/:id/cancel", post(cancel_job_handler))
.route("/api/jobs/:id/restart", post(restart_job_handler))
.route("/api/events", get(sse_handler))
.route("/api/engine/pause", post(pause_engine_handler))
.route("/api/engine/resume", post(resume_engine_handler))
.route("/api/engine/status", get(engine_status_handler))
.route("/assets/*file", get(static_handler))
.with_state(state);
@@ -73,6 +78,7 @@ struct DashboardTemplate {
active_page: &'static str,
stats: StatsData,
jobs: Vec<Job>,
engine_paused: bool,
}
#[derive(Template)]
@@ -94,6 +100,12 @@ struct JobsTablePartialTemplate {
jobs: Vec<Job>,
}
#[derive(Template)]
#[template(path = "partials/engine_control.html")]
struct EngineControlPartialTemplate {
engine_paused: bool,
}
struct StatsData {
total: i64,
completed: i64,
@@ -118,6 +130,7 @@ async fn dashboard_handler(State(state): State<Arc<AppState>>) -> impl IntoRespo
active_page: "dashboard",
stats,
jobs,
engine_paused: state.agent.is_paused(),
}
}
@@ -163,14 +176,50 @@ async fn restart_job_handler(
axum::http::StatusCode::OK
}
async fn restart_failed_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let _ = state.db.restart_failed_jobs().await;
axum::http::StatusCode::OK
}
async fn clear_completed_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let _ = state.db.clear_completed_jobs().await;
axum::http::StatusCode::OK
}
async fn pause_engine_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
state.agent.pause();
EngineControlPartialTemplate { engine_paused: true }
}
async fn resume_engine_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
state.agent.resume();
EngineControlPartialTemplate { engine_paused: false }
}
async fn engine_status_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
if state.agent.is_paused() {
"paused"
} else {
"running"
}
}
async fn sse_handler(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = std::result::Result<AxumEvent, Infallible>>> {
let rx = state.tx.subscribe();
let stream = BroadcastStream::new(rx).filter_map(|msg| match msg {
Ok(event) => {
let json = serde_json::to_string(&event).ok()?;
Some(Ok(AxumEvent::default().data(json)))
let (event_name, data) = match &event {
AlchemistEvent::Log { message, .. } => ("log", message.clone()),
AlchemistEvent::Progress { job_id, percentage, time } => {
( "progress", format!("{{\"job_id\": {}, \"percentage\": {:.1}, \"time\": \"{}\"}}", job_id, percentage, time))
}
AlchemistEvent::JobStateChanged { job_id, status } => {
("status", format!("{{\"job_id\": {}, \"status\": \"{:?}\"}}", job_id, status))
}
};
Some(Ok(AxumEvent::default().event(event_name).data(data)))
}
Err(_) => None,
});

View File

@@ -1,36 +1,55 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Alchemist - Transcoding Engine</title>
<link rel="stylesheet" href="/assets/css/app.css">
<script src="/assets/js/htmx.min.js"></script>
<script src="/assets/js/sse.js"></script>
</head>
<body hx-boost="true">
<div class="app-container">
<aside class="sidebar">
<div class="logo-container">
<div class="logo-icon" style="background: rgb(var(--brand-primary)); border-radius: 6px; padding: 4px; display: flex; align-items: center; justify-content: center;">
<svg class="w-5 h-5 text-white" fill="none" stroke="currentColor" viewBox="0 0 24 24" style="width: 20px; height: 20px;"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M19.428 15.428a2 2 0 00-1.022-.547l-2.387-.477a6 6 0 00-3.86.517l-.644.322a6 6 0 01-3.86.517l-2.387-.477a2 2 0 00-1.022.547l-1.168 1.168a2 2 0 001.61 3.412h13.44a2 2 0 001.61-3.412l-1.168-1.168zM12 5a3 3 0 100 6 3 3 0 000-6z"></path></svg>
<div class="logo-icon"
style="background: rgb(var(--brand-primary)); border-radius: 6px; padding: 4px; display: flex; align-items: center; justify-content: center;">
<svg class="w-5 h-5 text-white" fill="none" stroke="currentColor" viewBox="0 0 24 24"
style="width: 20px; height: 20px;">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
d="M19.428 15.428a2 2 0 00-1.022-.547l-2.387-.477a6 6 0 00-3.86.517l-.644.322a6 6 0 01-3.86.517l-2.387-.477a2 2 0 00-1.022.547l-1.168 1.168a2 2 0 001.61 3.412h13.44a2 2 0 001.61-3.412l-1.168-1.168zM12 5a3 3 0 100 6 3 3 0 000-6z">
</path>
</svg>
</div>
<span class="logo-text">Alchemist</span>
</div>
<nav class="nav-links">
<a href="/" class="nav-link {% if active_page == "dashboard" %}active{% endif %}">
<svg fill="none" stroke="currentColor" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M3 12l2-2m0 0l7-7 7 7M5 10v10a1 1 0 001 1h3m10-11l2 2m-2-2v10a1 1 0 01-1 1h-3m-6 0a1 1 0 001-1v-4a1 1 0 011-1h2a1 1 0 011 1v4a1 1 0 001 1m-6 0h6"></path></svg>
<a href="/" class="nav-link {% if active_page == " dashboard" %}active{% endif %}">
<svg fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
d="M3 12l2-2m0 0l7-7 7 7M5 10v10a1 1 0 001 1h3m10-11l2 2m-2-2v10a1 1 0 01-1 1h-3m-6 0a1 1 0 001-1v-4a1 1 0 011-1h2a1 1 0 011 1v4a1 1 0 001 1m-6 0h6">
</path>
</svg>
Dashboard
</a>
<a href="/settings" class="nav-link {% if active_page == "settings" %}active{% endif %}">
<svg fill="none" stroke="currentColor" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.325 4.317c.426-1.756 2.924-1.756 3.35 0a1.724 1.724 0 002.573 1.066c1.543-.94 3.31.826 2.37 2.37a1.724 1.724 0 001.065 2.572c1.756.426 1.756 2.924 0 3.35a1.724 1.724 0 00-1.066 2.573c.94 1.543-.826 3.31-2.37 2.37a1.724 1.724 0 00-2.572 1.065c-.426 1.756-2.924 1.756-3.35 0a1.724 1.724 0 00-2.573-1.066-1.543.94-3.31-.826-2.37-2.37a1.724 1.724 0 00-1.065-2.572c-1.756-.426-1.756-2.924 0-3.35a1.724 1.724 0 001.066-2.573-0.94-1.543.826-3.31 2.37-2.37.996.608 2.296.07 2.572-1.065z"></path><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M15 12a3 3 0 11-6 0 3 3 0 016 0z"></path></svg>
<a href="/settings" class="nav-link {% if active_page == " settings" %}active{% endif %}">
<svg fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
d="M10.325 4.317c.426-1.756 2.924-1.756 3.35 0a1.724 1.724 0 002.573 1.066c1.543-.94 3.31.826 2.37 2.37a1.724 1.724 0 001.065 2.572c1.756.426 1.756 2.924 0 3.35a1.724 1.724 0 00-1.066 2.573c.94 1.543-.826 3.31-2.37 2.37a1.724 1.724 0 00-2.572 1.065c-.426 1.756-2.924 1.756-3.35 0a1.724 1.724 0 00-2.573-1.066-1.543.94-3.31-.826-2.37-2.37a1.724 1.724 0 00-1.065-2.572c-1.756-.426-1.756-2.924 0-3.35a1.724 1.724 0 001.066-2.573-0.94-1.543.826-3.31 2.37-2.37.996.608 2.296.07 2.572-1.065z">
</path>
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
d="M15 12a3 3 0 11-6 0 3 3 0 016 0z"></path>
</svg>
Settings
</a>
</nav>
<div class="engine-status">
<div class="pulse"></div>
<span class="text-xs font-medium text-slate-400">Engine Online</span>
<div class="pulse"></div>
<span class="text-xs font-medium text-slate-400">Engine Online</span>
</div>
</aside>
@@ -39,4 +58,5 @@
</main>
</div>
</body>
</html>
</html>

View File

@@ -4,10 +4,28 @@
<div class="dashboard-container">
<header class="flex justify-between items-center mb-12">
<div>
<h1 class="text-4xl">Dashboard</h1>
<div class="flex items-center gap-3">
<h1 class="text-4xl">Dashboard</h1>
{% if engine_paused %}
<span
class="px-2 py-1 rounded bg-amber-500/10 text-amber-500 text-[10px] font-bold uppercase tracking-wider border border-amber-500/20">Paused</span>
{% else %}
<span
class="px-2 py-1 rounded bg-emerald-500/10 text-emerald-500 text-[10px] font-bold uppercase tracking-wider border border-emerald-500/20">Running</span>
{% endif %}
</div>
<p class="text-slate-400 mt-1">Next-Gen Transcoding Engine</p>
</div>
<div class="flex gap-4">
{% include "partials/engine_control.html" %}
<button hx-post="/api/jobs/restart-failed" hx-swap="none"
class="btn border border-slate-800 text-slate-400 hover:text-slate-200">
Restart Failed
</button>
<button hx-post="/api/jobs/clear-completed" hx-swap="none"
class="btn border border-slate-800 text-slate-400 hover:text-slate-200">
Clear Completed
</button>
<button hx-post="/api/scan" hx-swap="none" class="btn btn-primary">
Scan Now
</button>
@@ -18,14 +36,44 @@
{% include "partials/stats.html" %}
</div>
<div class="glass-card" style="padding: 0; overflow: hidden;">
<div class="glass-card mb-8" style="padding: 0; overflow: hidden;">
<div class="px-8 py-5 border-b border-slate-800 flex justify-between items-center">
<h2 class="text-xl">Engine Activity</h2>
<span class="text-xs font-medium text-slate-500 bg-slate-800 px-3 py-1 rounded-full">v0.1.0</span>
</div>
<div class="overflow-x-auto" id="jobs-table-container" hx-get="/api/jobs/table" hx-trigger="every 2s">
<div class="overflow-x-auto" id="jobs-table-container" hx-ext="sse" sse-connect="/api/events"
sse-swap="status, progress" hx-get="/api/jobs/table" hx-trigger="every 5s, sse:status">
{% include "partials/jobs_table.html" %}
</div>
</div>
<div class="glass-card overflow-hidden">
<div class="px-8 py-5 border-b border-slate-800">
<h2 class="text-xl">System Logs</h2>
</div>
<div id="console" class="p-6 font-mono text-xs overflow-y-auto max-h-64 space-y-1 bg-black/30" hx-ext="sse"
sse-connect="/api/events" sse-swap="log" hx-swap="beforeend">
</div>
</div>
</div>
<script>
document.body.addEventListener('htmx:sseMessage', function (e) {
if (e.detail.type === 'log') {
const console = document.getElementById('console');
const line = document.createElement('div');
line.className = 'text-slate-400 border-l-2 border-slate-800 pl-3 py-0.5';
line.textContent = `[${new Date().toLocaleTimeString()}] ${e.detail.data}`;
console.appendChild(line);
console.scrollTop = console.scrollHeight;
if (console.children.length > 100) console.removeChild(console.firstChild);
} else if (e.detail.type === 'progress') {
const data = JSON.parse(e.detail.data);
const bar = document.querySelector(`#job-progress-${data.job_id}`);
const text = document.querySelector(`#job-progress-text-${data.job_id}`);
if (bar) bar.style.width = data.percentage + '%';
if (text) text.textContent = data.percentage + '%';
}
});
</script>
{% endblock %}

View File

@@ -0,0 +1,13 @@
<div id="engine-control">
{% if engine_paused %}
<button hx-post="/api/engine/resume" hx-target="#engine-control" hx-swap="outerHTML"
class="btn border border-emerald-500/50 text-emerald-400 hover:bg-emerald-500/10">
▶ Resume Engine
</button>
{% else %}
<button hx-post="/api/engine/pause" hx-target="#engine-control" hx-swap="outerHTML"
class="btn border border-amber-500/50 text-amber-400 hover:bg-amber-500/10">
⏸ Pause Engine
</button>
{% endif %}
</div>

View File

@@ -20,10 +20,10 @@
{% if job.is_active() %}
<div class="mt-3 w-full bg-slate-800 rounded-full h-1.5 overflow-hidden">
<div class="h-full transition-all duration-500 ease-out"
<div id="job-progress-{{ job.id }}" class="h-full transition-all duration-500 ease-out"
style="width: {{ job.progress }}%; background-color: rgb(var(--brand-primary))"></div>
</div>
<div class="text-[10px] mt-1 font-mono uppercase tracking-wider"
<div id="job-progress-text-{{ job.id }}" class="text-[10px] mt-1 font-mono uppercase tracking-wider"
style="color: rgb(var(--brand-primary))">
{{ job.progress_fixed() }}%
</div>
@@ -39,6 +39,11 @@
class="px-2.5 py-1 rounded-full text-[10px] font-bold border uppercase tracking-tight {{ job.status_class() }}">
{{ job.status }}
</span>
{% if let Some(vmaf) = job.vmaf_score %}
<span class="ml-2 px-2 py-0.5 rounded bg-slate-800 text-slate-400 text-[9px] font-mono">
VMAF: {{ job.vmaf_fixed() }}
</span>
{% endif %}
</td>
<td class="px-6 py-4">
<div class="flex gap-2">