style: format executor, pipeline, and notifications

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-13 16:20:58 -04:00
parent f511f1c084
commit c26c2d4420
3 changed files with 173 additions and 71 deletions

View File

@@ -44,11 +44,7 @@ struct JobExecutionObserver {
}
impl JobExecutionObserver {
fn new(
job_id: i64,
db: Arc<Db>,
event_channels: Arc<EventChannels>,
) -> Self {
fn new(job_id: i64, db: Arc<Db>, event_channels: Arc<EventChannels>) -> Self {
Self {
job_id,
db,

View File

@@ -605,7 +605,11 @@ impl Pipeline {
if let Err(e) = self.db.add_decision(job_id, "skip", &reason).await {
tracing::warn!(job_id, "Failed to record decision: {e}");
}
if let Err(e) = self.db.upsert_job_failure_explanation(job_id, &failure_explanation).await {
if let Err(e) = self
.db
.upsert_job_failure_explanation(job_id, &failure_explanation)
.await
{
tracing::warn!(job_id, "Failed to record failure explanation: {e}");
}
self.update_job_state(job_id, crate::db::JobState::Failed)
@@ -644,7 +648,11 @@ impl Pipeline {
if let Err(e) = self.db.add_decision(job_id, "skip", &reason).await {
tracing::warn!(job_id, "Failed to record decision: {e}");
}
if let Err(e) = self.db.upsert_job_failure_explanation(job_id, &failure_explanation).await {
if let Err(e) = self
.db
.upsert_job_failure_explanation(job_id, &failure_explanation)
.await
{
tracing::warn!(job_id, "Failed to record failure explanation: {e}");
}
self.update_job_state(job_id, crate::db::JobState::Failed)
@@ -777,10 +785,17 @@ impl Pipeline {
tracing::warn!(job_id = job.id, "Failed to record log: {e}");
}
let explanation = crate::explanations::failure_from_summary(&msg);
if let Err(e) = self.db.upsert_job_failure_explanation(job.id, &explanation).await {
if let Err(e) = self
.db
.upsert_job_failure_explanation(job.id, &explanation)
.await
{
tracing::warn!(job_id = job.id, "Failed to record failure explanation: {e}");
}
if let Err(e) = self.update_job_state(job.id, crate::db::JobState::Failed).await {
if let Err(e) = self
.update_job_state(job.id, crate::db::JobState::Failed)
.await
{
tracing::warn!(job_id = job.id, "Failed to update job state: {e}");
}
return Err(JobFailure::MediaCorrupt);
@@ -832,10 +847,20 @@ impl Pipeline {
tracing::warn!(job_id = job.id, "Failed to record log: {e}");
}
let explanation = crate::explanations::failure_from_summary(&msg);
if let Err(e) = self.db.upsert_job_failure_explanation(job.id, &explanation).await {
tracing::warn!(job_id = job.id, "Failed to record failure explanation: {e}");
if let Err(e) = self
.db
.upsert_job_failure_explanation(job.id, &explanation)
.await
{
tracing::warn!(
job_id = job.id,
"Failed to record failure explanation: {e}"
);
}
if let Err(e) = self.update_job_state(job.id, crate::db::JobState::Failed).await {
if let Err(e) = self
.update_job_state(job.id, crate::db::JobState::Failed)
.await
{
tracing::warn!(job_id = job.id, "Failed to update job state: {e}");
}
return Err(JobFailure::PlannerBug);
@@ -851,10 +876,20 @@ impl Pipeline {
tracing::warn!(job_id = job.id, "Failed to record log: {e}");
}
let explanation = crate::explanations::failure_from_summary(&msg);
if let Err(e) = self.db.upsert_job_failure_explanation(job.id, &explanation).await {
tracing::warn!(job_id = job.id, "Failed to record failure explanation: {e}");
if let Err(e) = self
.db
.upsert_job_failure_explanation(job.id, &explanation)
.await
{
tracing::warn!(
job_id = job.id,
"Failed to record failure explanation: {e}"
);
}
if let Err(e) = self.update_job_state(job.id, crate::db::JobState::Failed).await {
if let Err(e) = self
.update_job_state(job.id, crate::db::JobState::Failed)
.await
{
tracing::warn!(job_id = job.id, "Failed to update job state: {e}");
}
return Err(JobFailure::PlannerBug);
@@ -871,10 +906,20 @@ impl Pipeline {
tracing::warn!(job_id = job.id, "Failed to record log: {e}");
}
let explanation = crate::explanations::failure_from_summary(&msg);
if let Err(e) = self.db.upsert_job_failure_explanation(job.id, &explanation).await {
tracing::warn!(job_id = job.id, "Failed to record failure explanation: {e}");
if let Err(e) = self
.db
.upsert_job_failure_explanation(job.id, &explanation)
.await
{
tracing::warn!(
job_id = job.id,
"Failed to record failure explanation: {e}"
);
}
if let Err(e) = self.update_job_state(job.id, crate::db::JobState::Failed).await {
if let Err(e) = self
.update_job_state(job.id, crate::db::JobState::Failed)
.await
{
tracing::warn!(job_id = job.id, "Failed to update job state: {e}");
}
return Err(JobFailure::Transient);
@@ -892,10 +937,20 @@ impl Pipeline {
tracing::warn!(job_id = job.id, "Failed to record log: {e}");
}
let explanation = crate::explanations::failure_from_summary(&msg);
if let Err(e) = self.db.upsert_job_failure_explanation(job.id, &explanation).await {
tracing::warn!(job_id = job.id, "Failed to record failure explanation: {e}");
if let Err(e) = self
.db
.upsert_job_failure_explanation(job.id, &explanation)
.await
{
tracing::warn!(
job_id = job.id,
"Failed to record failure explanation: {e}"
);
}
if let Err(e) = self.update_job_state(job.id, crate::db::JobState::Failed).await {
if let Err(e) = self
.update_job_state(job.id, crate::db::JobState::Failed)
.await
{
tracing::warn!(job_id = job.id, "Failed to update job state: {e}");
}
return Err(JobFailure::PlannerBug);
@@ -960,7 +1015,10 @@ impl Pipeline {
if let Err(e) = self.db.add_decision(job.id, "skip", &reason).await {
tracing::warn!(job_id = job.id, "Failed to record decision: {e}");
}
if let Err(e) = self.update_job_state(job.id, crate::db::JobState::Skipped).await {
if let Err(e) = self
.update_job_state(job.id, crate::db::JobState::Skipped)
.await
{
tracing::warn!(job_id = job.id, "Failed to update job state: {e}");
}
return Ok(());
@@ -1034,23 +1092,37 @@ impl Pipeline {
if let Err(e) = self.db.add_log("error", Some(job.id), summary).await {
tracing::warn!(job_id = job.id, "Failed to record log: {e}");
}
if let Err(e) = self.db.upsert_job_failure_explanation(job.id, &explanation).await {
tracing::warn!(job_id = job.id, "Failed to record failure explanation: {e}");
if let Err(e) = self
.db
.upsert_job_failure_explanation(job.id, &explanation)
.await
{
tracing::warn!(
job_id = job.id,
"Failed to record failure explanation: {e}"
);
}
if let Err(e) = self.update_job_state(job.id, crate::db::JobState::Failed).await {
if let Err(e) = self
.update_job_state(job.id, crate::db::JobState::Failed)
.await
{
tracing::warn!(job_id = job.id, "Failed to update job state: {e}");
}
if let Err(e) = self.db.insert_encode_attempt(crate::db::EncodeAttemptInput {
job_id: job.id,
attempt_number: current_attempt_number,
started_at: Some(encode_started_at.to_rfc3339()),
outcome: "failed".to_string(),
failure_code: Some("fallback_blocked".to_string()),
failure_summary: Some(summary.to_string()),
input_size_bytes: Some(metadata.size_bytes as i64),
output_size_bytes: None,
encode_time_seconds: Some(start_time.elapsed().as_secs_f64()),
}).await {
if let Err(e) = self
.db
.insert_encode_attempt(crate::db::EncodeAttemptInput {
job_id: job.id,
attempt_number: current_attempt_number,
started_at: Some(encode_started_at.to_rfc3339()),
outcome: "failed".to_string(),
failure_code: Some("fallback_blocked".to_string()),
failure_summary: Some(summary.to_string()),
input_size_bytes: Some(metadata.size_bytes as i64),
output_size_bytes: None,
encode_time_seconds: Some(start_time.elapsed().as_secs_f64()),
})
.await
{
tracing::warn!(job_id = job.id, "Failed to record encode attempt: {e}");
}
return Err(JobFailure::EncoderUnavailable);
@@ -1134,20 +1206,30 @@ impl Pipeline {
.await;
if let crate::error::AlchemistError::Cancelled = e {
if let Err(e) = self.update_job_state(job.id, crate::db::JobState::Cancelled).await {
tracing::warn!(job_id = job.id, "Failed to update job state to cancelled: {e}");
if let Err(e) = self
.update_job_state(job.id, crate::db::JobState::Cancelled)
.await
{
tracing::warn!(
job_id = job.id,
"Failed to update job state to cancelled: {e}"
);
}
if let Err(e) = self.db.insert_encode_attempt(crate::db::EncodeAttemptInput {
job_id: job.id,
attempt_number: current_attempt_number,
started_at: Some(encode_started_at.to_rfc3339()),
outcome: "cancelled".to_string(),
failure_code: None,
failure_summary: None,
input_size_bytes: Some(metadata.size_bytes as i64),
output_size_bytes: None,
encode_time_seconds: Some(start_time.elapsed().as_secs_f64()),
}).await {
if let Err(e) = self
.db
.insert_encode_attempt(crate::db::EncodeAttemptInput {
job_id: job.id,
attempt_number: current_attempt_number,
started_at: Some(encode_started_at.to_rfc3339()),
outcome: "cancelled".to_string(),
failure_code: None,
failure_summary: None,
input_size_bytes: Some(metadata.size_bytes as i64),
output_size_bytes: None,
encode_time_seconds: Some(start_time.elapsed().as_secs_f64()),
})
.await
{
tracing::warn!(job_id = job.id, "Failed to record encode attempt: {e}");
}
} else {
@@ -1157,23 +1239,40 @@ impl Pipeline {
tracing::warn!(job_id = job.id, "Failed to record log: {e}");
}
let explanation = crate::explanations::failure_from_summary(&msg);
if let Err(e) = self.db.upsert_job_failure_explanation(job.id, &explanation).await {
tracing::warn!(job_id = job.id, "Failed to record failure explanation: {e}");
if let Err(e) = self
.db
.upsert_job_failure_explanation(job.id, &explanation)
.await
{
tracing::warn!(
job_id = job.id,
"Failed to record failure explanation: {e}"
);
}
if let Err(e) = self.update_job_state(job.id, crate::db::JobState::Failed).await {
tracing::warn!(job_id = job.id, "Failed to update job state to failed: {e}");
if let Err(e) = self
.update_job_state(job.id, crate::db::JobState::Failed)
.await
{
tracing::warn!(
job_id = job.id,
"Failed to update job state to failed: {e}"
);
}
if let Err(e) = self.db.insert_encode_attempt(crate::db::EncodeAttemptInput {
job_id: job.id,
attempt_number: current_attempt_number,
started_at: Some(encode_started_at.to_rfc3339()),
outcome: "failed".to_string(),
failure_code: Some(explanation.code.clone()),
failure_summary: Some(msg),
input_size_bytes: Some(metadata.size_bytes as i64),
output_size_bytes: None,
encode_time_seconds: Some(start_time.elapsed().as_secs_f64()),
}).await {
if let Err(e) = self
.db
.insert_encode_attempt(crate::db::EncodeAttemptInput {
job_id: job.id,
attempt_number: current_attempt_number,
started_at: Some(encode_started_at.to_rfc3339()),
outcome: "failed".to_string(),
failure_code: Some(explanation.code.clone()),
failure_summary: Some(msg),
input_size_bytes: Some(metadata.size_bytes as i64),
output_size_bytes: None,
encode_time_seconds: Some(start_time.elapsed().as_secs_f64()),
})
.await
{
tracing::warn!(job_id = job.id, "Failed to record encode attempt: {e}");
}
}
@@ -1536,7 +1635,11 @@ impl Pipeline {
tracing::warn!(job_id, "Failed to record log: {e}");
}
let failure_explanation = crate::explanations::failure_from_summary(&message);
if let Err(e) = self.db.upsert_job_failure_explanation(job_id, &failure_explanation).await {
if let Err(e) = self
.db
.upsert_job_failure_explanation(job_id, &failure_explanation)
.await
{
tracing::warn!(job_id, "Failed to record failure explanation: {e}");
}
if let crate::error::AlchemistError::QualityCheckFailed(reason) = err {

View File

@@ -128,10 +128,7 @@ impl NotificationManager {
/// Build an HTTP client with SSRF protections: DNS resolution timeout,
/// private-IP blocking (unless allow_local_notifications), no redirects,
/// and a 10-second request timeout.
async fn build_safe_client(
&self,
target: &NotificationTarget,
) -> NotificationResult<Client> {
async fn build_safe_client(&self, target: &NotificationTarget) -> NotificationResult<Client> {
if let Some(endpoint_url) = endpoint_url_for_target(target)? {
let url = Url::parse(&endpoint_url)?;
let host = url
@@ -209,12 +206,18 @@ impl NotificationManager {
loop {
match system_rx.recv().await {
Ok(SystemEvent::ScanCompleted) => {
if let Err(e) = manager_clone.handle_event(NotifiableEvent::ScanCompleted).await {
if let Err(e) = manager_clone
.handle_event(NotifiableEvent::ScanCompleted)
.await
{
error!("Notification error: {}", e);
}
}
Ok(SystemEvent::EngineIdle) => {
if let Err(e) = manager_clone.handle_event(NotifiableEvent::EngineIdle).await {
if let Err(e) = manager_clone
.handle_event(NotifiableEvent::EngineIdle)
.await
{
error!("Notification error: {}", e);
}
}