Transcription failures kill production systems. Network timeouts, API rate limits, corrupted audio: at scale, failures happen constantly. A system processing 1,000 transcriptions a day can expect 20-50 failures from these causes alone. Without proper retry logic, you lose data and frustrate users. This guide demonstrates how to build resilient transcription pipelines that recover gracefully from each class of failure.
Understanding Failure Modes
Transcription jobs fail for distinct reasons requiring different recovery strategies. Network errors need immediate retries with exponential backoff. Rate limit errors require delayed retries. Audio format errors need conversion before retry. Recognizing failure types lets you respond appropriately rather than blindly retrying forever.
API timeouts typically resolve within seconds. Rate limits lift after minutes. Audio corruption never resolves—you need different audio or format conversion. Your retry system must distinguish between transient failures (retry immediately) and permanent failures (alert humans).
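One way to make these distinctions concrete is a small strategy table. The categories and delays below are illustrative assumptions for this article, not values from any API:

```python
# Hypothetical sketch: mapping failure types to recovery strategies.
# Category names and delays are illustrative, not API values.
RECOVERY_STRATEGIES = {
    "network_timeout": {"retry": True, "initial_delay_s": 1},    # transient: retry fast
    "rate_limit":      {"retry": True, "initial_delay_s": 60},   # transient: retry slowly
    "audio_format":    {"retry": True, "convert_first": True},   # fixable: convert, then retry
    "auth_error":      {"retry": False},                         # permanent: alert a human
    "corrupted_audio": {"retry": False},                         # permanent: needs new audio
}

def should_retry(failure_type: str) -> bool:
    """Return True if the failure type is worth retrying."""
    # Unknown failure types default to retrying, erring on the safe side
    return RECOVERY_STRATEGIES.get(failure_type, {"retry": True})["retry"]
```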
Implementing Exponential Backoff
Start with intelligent retry timing. Exponential backoff prevents overwhelming struggling services while maximizing success rates.
import random
import time
from typing import Any, Callable

class ExponentialBackoff:
    def __init__(self, base_delay=1, max_delay=300, max_attempts=5):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.max_attempts = max_attempts

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Execute a function with exponential backoff."""
        attempt = 0
        while attempt < self.max_attempts:
            try:
                return func(*args, **kwargs)
            except Exception as e:
                attempt += 1
                if attempt >= self.max_attempts:
                    raise Exception(f"Max retry attempts reached: {e}")
                # Double the delay each attempt, capped at max_delay
                delay = min(
                    self.base_delay * (2 ** attempt),
                    self.max_delay
                )
                # Add up to 10% jitter to avoid synchronized retries
                jitter = random.uniform(0, delay * 0.1)
                total_delay = delay + jitter
                print(f"Attempt {attempt} failed: {e}")
                print(f"Retrying in {total_delay:.2f}s...")
                time.sleep(total_delay)
        raise Exception("Retry logic failed unexpectedly")

# Usage
backoff = ExponentialBackoff(base_delay=2, max_delay=120, max_attempts=5)

def transcribe_audio(file_path):
    # API call that might fail
    return api.transcribe(file_path)

result = backoff.execute(transcribe_audio, "meeting.wav")
This implementation doubles the delay after each failure, capping it at the configured maximum (five minutes by default). The jitter prevents thundering-herd problems when multiple jobs retry simultaneously. In practice, this resolves the large majority of transient network failures without manual intervention.
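To see the schedule this produces, the snippet below reproduces the delay formula used by `ExponentialBackoff` (without jitter) for the usage example's parameters:

```python
def delay_schedule(base_delay=2, max_delay=120, max_attempts=5):
    """Delays (without jitter) that ExponentialBackoff sleeps between attempts."""
    # attempt is incremented before the delay is computed, so the first
    # retry waits base_delay * 2**1; the final attempt has no delay after it.
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(1, max_attempts)]

print(delay_schedule())  # → [4, 8, 16, 32]
```

Note that with a lower cap the schedule flattens once it hits `max_delay`, which is exactly what keeps retries from stretching into hours.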
Job Queue System
Build a persistent queue to track transcription jobs and their states. This prevents lost work when your application crashes.
import enum
import uuid
from datetime import datetime

from sqlalchemy import Column, DateTime, Enum, Integer, JSON, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class JobStatus(enum.Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

class TranscriptionJob(Base):
    __tablename__ = "transcription_jobs"

    id = Column(String(50), primary_key=True)
    audio_path = Column(String(500))
    status = Column(Enum(JobStatus))
    attempts = Column(Integer, default=0)
    max_attempts = Column(Integer, default=5)
    error_message = Column(String(1000))
    result_path = Column(String(500))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, onupdate=datetime.utcnow)
    # "metadata" is reserved by SQLAlchemy's declarative API, so the
    # attribute is job_metadata while the column name stays "metadata"
    job_metadata = Column("metadata", JSON)

class JobQueue:
    def __init__(self, session):
        self.session = session

    def enqueue(self, audio_path, metadata=None):
        """Add a new transcription job."""
        job = TranscriptionJob(
            id=str(uuid.uuid4()),
            audio_path=audio_path,
            status=JobStatus.PENDING,
            job_metadata=metadata or {},
        )
        self.session.add(job)
        self.session.commit()
        return job.id

    def get_next_job(self):
        """Get the next pending or retrying job."""
        job = self.session.query(TranscriptionJob).filter(
            TranscriptionJob.status.in_([JobStatus.PENDING, JobStatus.RETRYING])
        ).order_by(TranscriptionJob.created_at).first()
        if job:
            job.status = JobStatus.PROCESSING
            self.session.commit()
        return job

    def mark_failed(self, job_id, error_message):
        """Record a failure and decide whether to retry."""
        job = self.session.query(TranscriptionJob).filter_by(id=job_id).first()
        job.attempts += 1
        job.error_message = error_message
        job.updated_at = datetime.utcnow()
        if job.attempts < job.max_attempts:
            job.status = JobStatus.RETRYING
            print(f"Job {job_id} will retry (attempt {job.attempts}/{job.max_attempts})")
        else:
            job.status = JobStatus.FAILED
            print(f"Job {job_id} permanently failed after {job.attempts} attempts")
        self.session.commit()

    def mark_completed(self, job_id, result_path):
        """Mark a job as successfully completed."""
        job = self.session.query(TranscriptionJob).filter_by(id=job_id).first()
        job.status = JobStatus.COMPLETED
        job.result_path = result_path
        job.updated_at = datetime.utcnow()
        self.session.commit()
This queue persists job state to the database, surviving application restarts. Jobs transition through states (pending → processing → completed/failed), with automatic retry tracking built in.
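The lifecycle can be checked without a database. Below is a minimal, self-contained sketch of the same state machine; the guard table is an illustrative assumption for this article, not part of the queue code:

```python
from enum import Enum

class JobStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

# Legal transitions in a job's lifecycle; anything else is a logic error.
ALLOWED = {
    JobStatus.PENDING:    {JobStatus.PROCESSING},
    JobStatus.RETRYING:   {JobStatus.PROCESSING},
    JobStatus.PROCESSING: {JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.RETRYING},
    JobStatus.COMPLETED:  set(),  # terminal
    JobStatus.FAILED:     set(),  # terminal
}

def transition(current: JobStatus, target: JobStatus) -> JobStatus:
    """Validate and apply a status transition."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```

Enforcing a guard like this (in application code or a database constraint) catches bugs such as a worker completing a job it never claimed.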
Error Classification
Different errors require different responses. Classify failures to apply appropriate recovery strategies.
class TranscriptionError(Exception):
    """Base exception for transcription errors."""

class TransientError(TranscriptionError):
    """Temporary error - safe to retry."""

class RateLimitError(TransientError):
    """Rate limit hit - retry with delay."""

class AudioFormatError(TranscriptionError):
    """Audio format issue - needs conversion."""

class PermanentError(TranscriptionError):
    """Permanent failure - cannot retry."""

class ErrorClassifier:
    @staticmethod
    def classify(exception):
        """Classify an exception and determine the retry strategy."""
        error_message = str(exception).lower()

        # Network/timeout errors
        if any(keyword in error_message for keyword in ["timeout", "connection", "network"]):
            return TransientError("Network error - will retry")

        # Rate limiting
        if any(keyword in error_message for keyword in ["rate limit", "too many requests", "429"]):
            return RateLimitError("Rate limit hit - will retry with delay")

        # Audio format issues
        if any(keyword in error_message for keyword in ["format", "codec", "invalid audio"]):
            return AudioFormatError("Audio format error - needs conversion")

        # Authentication/authorization
        if any(keyword in error_message for keyword in ["auth", "unauthorized", "401", "403"]):
            return PermanentError("Authentication error - check API key")

        # Default to transient for safety
        return TransientError("Unknown error - will retry")
This classifier examines error messages to determine the failure type. Network errors retry immediately, rate limits wait longer, format errors trigger conversion, and auth errors alert operators immediately.
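Keyword matching on error strings is fragile across SDK versions. Where your SDK exposes HTTP status codes, classifying on the code is more robust. A minimal sketch: the code-to-category mapping follows standard HTTP semantics, but whether your SDK surfaces codes this way is an assumption:

```python
# Classify by HTTP status code when available. The codes carry their
# standard HTTP meanings; how your SDK exposes them is an assumption.
TRANSIENT_CODES = {408, 500, 502, 503, 504}   # retry with backoff
RATE_LIMIT_CODES = {429}                       # retry after a longer delay
PERMANENT_CODES = {400, 401, 403, 404}         # do not retry; alert

def classify_status(code: int) -> str:
    if code in RATE_LIMIT_CODES:
        return "rate_limit"
    if code in TRANSIENT_CODES:
        return "transient"
    if code in PERMANENT_CODES:
        return "permanent"
    return "transient"  # default unknown codes to retrying, as above
```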
Resilient Transcription Worker
Combine all components into a robust worker that handles failures gracefully.
import os
import time

import assemblyai as aai
from pydub import AudioSegment

class ResilientTranscriptionWorker:
    def __init__(self, api_key, queue):
        aai.settings.api_key = api_key
        self.queue = queue
        self.backoff = ExponentialBackoff(base_delay=2, max_delay=300)
        self.classifier = ErrorClassifier()

    def process_jobs(self):
        """Main processing loop."""
        while True:
            job = self.queue.get_next_job()
            if not job:
                time.sleep(5)  # Wait for new jobs
                continue
            try:
                self._process_job(job)
            except Exception as e:
                print(f"Unexpected error processing job {job.id}: {e}")
                self.queue.mark_failed(job.id, str(e))

    def _process_job(self, job):
        """Process a single transcription job with retry logic."""
        try:
            # Back off before retries based on the attempt count
            if job.attempts > 0:
                delay = self._calculate_retry_delay(job)
                print(f"Waiting {delay}s before retry...")
                time.sleep(delay)

            # Attempt transcription
            result = self._transcribe_with_recovery(job)

            # Save result
            result_path = self._save_result(job.id, result)
            self.queue.mark_completed(job.id, result_path)
            print(f"Job {job.id} completed successfully")
        except PermanentError as e:
            # Don't retry permanent errors
            self.queue.mark_failed(job.id, f"Permanent error: {e}")
            self._alert_operators(job, e)
        except Exception as e:
            # Classify and handle appropriately
            classified = self.classifier.classify(e)
            self.queue.mark_failed(job.id, str(classified))

    def _transcribe_with_recovery(self, job):
        """Transcribe with automatic recovery attempts."""
        try:
            # Try standard transcription
            return self._transcribe(job.audio_path)
        except AudioFormatError:
            # Convert audio format and retry
            print(f"Converting audio format for job {job.id}")
            converted_path = self._convert_audio_format(job.audio_path)
            return self._transcribe(converted_path)
        except RateLimitError:
            # Wait longer for rate limits
            print("Rate limit hit, waiting 60s...")
            time.sleep(60)
            return self._transcribe(job.audio_path)

    def _transcribe(self, audio_path):
        """Actual transcription API call."""
        config = aai.TranscriptionConfig(
            speaker_labels=True,
            language_code="en_us"
        )
        transcriber = aai.Transcriber()
        transcript = transcriber.transcribe(audio_path, config=config)
        if transcript.status == aai.TranscriptStatus.error:
            # Raise a classified error so the recovery handlers
            # above can react to the failure type
            raise ErrorClassifier.classify(Exception(transcript.error))
        return transcript

    def _convert_audio_format(self, audio_path):
        """Convert audio to a supported format (16 kHz mono WAV)."""
        audio = AudioSegment.from_file(audio_path)
        audio = audio.set_frame_rate(16000)
        audio = audio.set_channels(1)
        # Replace only the extension, not every dot in the path
        root, _ = os.path.splitext(audio_path)
        converted_path = f"{root}_converted.wav"
        audio.export(converted_path, format="wav")
        return converted_path

    def _calculate_retry_delay(self, job):
        """Calculate an appropriate retry delay."""
        # Exponential backoff: 2^attempt seconds, capped at 5 minutes
        return min(2 ** job.attempts, 300)

    def _save_result(self, job_id, transcript):
        """Save the transcription result."""
        result_path = f"results/{job_id}_transcript.txt"
        with open(result_path, "w", encoding="utf-8") as f:
            f.write(transcript.text)
        return result_path

    def _alert_operators(self, job, error):
        """Send an alert for permanent failures."""
        print(f"ALERT: Job {job.id} permanently failed: {error}")
        # Implement actual alerting (email, Slack, PagerDuty, etc.)
This worker automatically handles transient failures, converts audio formats when needed, respects rate limits, and alerts operators for permanent failures. It processes jobs continuously, maintaining state across restarts through the database queue.
Monitoring and Alerting
Track failure rates to detect systemic issues before they cascade.
from datetime import datetime, timedelta

class JobMonitor:
    def __init__(self, session):
        self.session = session

    def get_failure_rate(self, hours=24):
        """Calculate the recent failure rate as a percentage."""
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        total = self.session.query(TranscriptionJob).filter(
            TranscriptionJob.created_at >= cutoff
        ).count()
        failed = self.session.query(TranscriptionJob).filter(
            TranscriptionJob.created_at >= cutoff,
            TranscriptionJob.status == JobStatus.FAILED
        ).count()
        return (failed / total * 100) if total > 0 else 0

    def check_health(self):
        """Check system health and alert on anomalies."""
        failure_rate = self.get_failure_rate(hours=1)
        if failure_rate > 10:
            self._alert(f"High failure rate: {failure_rate:.1f}%")

        stuck_jobs = self.session.query(TranscriptionJob).filter(
            TranscriptionJob.status == JobStatus.PROCESSING,
            TranscriptionJob.updated_at < datetime.utcnow() - timedelta(hours=1)
        ).count()
        if stuck_jobs > 0:
            self._alert(f"{stuck_jobs} jobs stuck in processing state")

    def _alert(self, message):
        """Send an alert (wire this to email, Slack, PagerDuty, etc.)."""
        print(f"ALERT: {message}")
Monitor failure rates and stuck jobs to catch problems early. High failure rates indicate API issues or bad audio batches. Stuck jobs suggest worker crashes or deadlocks.
Testing Recovery Logic
Validate your retry system with chaos testing that simulates real failures.
import random

class FailureSimulator:
    def __init__(self, worker):
        self.worker = worker
        self.original_transcribe = worker._transcribe

    def inject_transient_failures(self, failure_rate=0.3):
        """Simulate random transient failures."""
        def failing_transcribe(audio_path):
            if random.random() < failure_rate:
                raise TransientError("Simulated network timeout")
            return self.original_transcribe(audio_path)
        self.worker._transcribe = failing_transcribe

    def inject_rate_limit(self):
        """Simulate rate limiting on every call."""
        def rate_limited_transcribe(audio_path):
            raise RateLimitError("Simulated rate limit")
        self.worker._transcribe = rate_limited_transcribe

    def restore(self):
        """Restore the original transcribe method."""
        self.worker._transcribe = self.original_transcribe
Run these simulations in staging to verify your system recovers correctly. A well-built retry system should handle 30% transient failures without user impact.
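The 30% figure can be sanity-checked with basic probability. If attempts fail independently at rate p, the chance that at least one of n attempts succeeds is 1 - p^n (independence is the only assumption here):

```python
def success_probability(failure_rate: float, max_attempts: int) -> float:
    """P(at least one attempt succeeds), assuming independent failures."""
    return 1 - failure_rate ** max_attempts

# With 30% transient failures and 5 attempts, nearly every job completes:
print(f"{success_probability(0.3, 5):.5f}")  # → 0.99757
```

So a 5-attempt retry policy turns a 30% per-attempt failure rate into roughly 1 permanently failed job in 400, which is why chaos testing at this rate should show almost no user-visible impact.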
Your resilient transcription pipeline now automatically recovers from network failures, respects rate limits, converts problematic audio formats, and alerts operators only for genuine issues requiring human intervention.
Conclusion
Reliable transcription systems require exponential backoff for transient failures, persistent job queues that survive restarts, error classification to determine retry strategies, and automatic recovery from format issues. Fault tolerance belongs in the processing pipeline itself, not in the hope that failures won't happen.
If you want enterprise reliability without building complex retry logic, consider Meetstream.ai API, which includes built-in retry mechanisms, automatic error recovery, and 99.9% uptime guarantees.