How to Retry and Recover Failed Audio Transcription Jobs

Transcription failures can take down production systems. Network timeouts, API rate limits, and corrupted audio are routine at scale: a system processing 1,000 transcriptions a day can see 20-50 failures from various causes. Without proper retry logic, you lose data and frustrate users. This guide demonstrates how to build resilient transcription pipelines that recover gracefully from most failures and flag the rest for human attention.

Understanding Failure Modes

Transcription jobs fail for distinct reasons, and each calls for a different recovery strategy. Network errors need retries with exponential backoff. Rate limit errors need retries after a longer delay. Audio format errors need conversion before any retry. Recognizing the failure type lets you respond appropriately rather than blindly retrying forever.

API timeouts typically resolve within seconds. Rate limits lift after minutes. Audio corruption never resolves on its own; you need different audio or a format conversion. Your retry system must distinguish between transient failures (retry automatically) and permanent failures (alert humans).

Implementing Exponential Backoff

Start with intelligent retry timing. Exponential backoff prevents overwhelming struggling services while maximizing success rates.

import random
import time
from typing import Any, Callable

class ExponentialBackoff:
    def __init__(self, base_delay=1, max_delay=300, max_attempts=5):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.max_attempts = max_attempts

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with exponential backoff"""
        attempt = 0
        while attempt < self.max_attempts:
            try:
                return func(*args, **kwargs)
            except Exception as e:
                attempt += 1
                if attempt >= self.max_attempts:
                    # Chain the original exception so the root cause stays visible
                    raise Exception(f"Max retry attempts reached: {e}") from e
                # Exponential delay capped at max_delay, plus up to 10% jitter
                delay = min(
                    self.base_delay * (2 ** attempt),
                    self.max_delay
                )
                jitter = random.uniform(0, delay * 0.1)
                total_delay = delay + jitter
                print(f"Attempt {attempt} failed: {e}")
                print(f"Retrying in {total_delay:.2f}s...")
                time.sleep(total_delay)
        # Only reachable when max_attempts <= 0
        raise Exception("Retry logic failed unexpectedly")

# Usage
backoff = ExponentialBackoff(base_delay=2, max_delay=120, max_attempts=5)

def transcribe_audio(file_path):
    # API call that might fail; `api` stands in for your transcription client
    return api.transcribe(file_path)

result = backoff.execute(transcribe_audio, "meeting.wav")

This implementation doubles the delay after each failure and caps it at max_delay: 120 seconds in the usage above, or five minutes with the default of 300. The jitter, up to 10% of the delay, reduces thundering herd problems when multiple jobs retry simultaneously. In practice, this kind of backoff clears most transient network failures without manual intervention.
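To see what these settings mean in wall-clock terms, the sketch below lists the delays the class would sleep between attempts, ignoring jitter. `backoff_schedule` is a hypothetical helper, not part of the class; it only mirrors the `base_delay * 2 ** attempt` formula from `execute`.

```python
def backoff_schedule(base_delay, max_delay, max_attempts):
    """Return the un-jittered delays (in seconds) slept between attempts.

    After failed attempt n (1-based) the wait is min(base_delay * 2**n, max_delay).
    No sleep follows the final attempt, so there are max_attempts - 1 delays.
    """
    return [
        min(base_delay * (2 ** attempt), max_delay)
        for attempt in range(1, max_attempts)
    ]


delays = backoff_schedule(base_delay=2, max_delay=120, max_attempts=5)
print(delays)       # [4, 8, 16, 32]
print(sum(delays))  # 60
```

With the usage settings, a job that fails every attempt gives up after roughly a minute of waiting plus jitter, so the 120-second cap never comes into play unless max_attempts is raised.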

Job Queue System

Build a persistent queue to track transcription jobs and their states. This prevents lost work when your application crashes.

import enum
import uuid
from datetime import datetime

from sqlalchemy import Column, String, Integer, DateTime, JSON, Enum
from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+

Base = declarative_base()

class JobStatus(enum.Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

class TranscriptionJob(Base):
    __tablename__ = 'transcription_jobs'

    id = Column(String(50), primary_key=True)
    audio_path = Column(String(500))
    status = Column(Enum(JobStatus))
    attempts = Column(Integer, default=0)
    max_attempts = Column(Integer, default=5)
    error_message = Column(String(1000))
    result_path = Column(String(500))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # "metadata" is a reserved attribute name on declarative models, so the
    # Python attribute is job_metadata while the database column keeps the name
    job_metadata = Column("metadata", JSON)

class JobQueue:
    def __init__(self, session):
        self.session = session

    def enqueue(self, audio_path, metadata=None):
        """Add new transcription job"""
        job = TranscriptionJob(
            id=self._generate_id(),
            audio_path=audio_path,
            status=JobStatus.PENDING,
            job_metadata=metadata or {}
        )
        self.session.add(job)
        self.session.commit()
        return job.id

    def get_next_job(self):
        """Get next pending or retrying job, oldest first"""
        job = self.session.query(TranscriptionJob).filter(
            TranscriptionJob.status.in_([JobStatus.PENDING, JobStatus.RETRYING])
        ).order_by(TranscriptionJob.created_at).first()
        if job:
            job.status = JobStatus.PROCESSING
            self.session.commit()
        return job

    def mark_failed(self, job_id, error_message):
        """Mark job as failed and decide on retry"""
        job = self.session.query(TranscriptionJob).filter_by(id=job_id).first()
        job.attempts += 1
        job.error_message = error_message
        job.updated_at = datetime.utcnow()
        if job.attempts < job.max_attempts:
            job.status = JobStatus.RETRYING
            print(f"Job {job_id} will retry (attempt {job.attempts}/{job.max_attempts})")
        else:
            job.status = JobStatus.FAILED
            print(f"Job {job_id} permanently failed after {job.attempts} attempts")
        self.session.commit()

    def mark_completed(self, job_id, result_path):
        """Mark job as successfully completed"""
        job = self.session.query(TranscriptionJob).filter_by(id=job_id).first()
        job.status = JobStatus.COMPLETED
        job.result_path = result_path
        job.updated_at = datetime.utcnow()
        self.session.commit()

    def _generate_id(self):
        return str(uuid.uuid4())

This queue persists job state to the database, surviving application restarts. Jobs transition through states (pending → processing → completed/failed), with automatic retry tracking built in.
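The state transitions can also be made explicit, so that a bug cannot, for example, move a completed job back to processing. The following is a minimal sketch that does not depend on SQLAlchemy. `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names, and the table reflects one reading of what `get_next_job`, `mark_failed`, and `mark_completed` do.

```python
import enum


class JobStatus(enum.Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"


# Transitions implied by get_next_job, mark_failed and mark_completed.
ALLOWED_TRANSITIONS = {
    JobStatus.PENDING: {JobStatus.PROCESSING},
    JobStatus.RETRYING: {JobStatus.PROCESSING},
    JobStatus.PROCESSING: {
        JobStatus.COMPLETED,
        JobStatus.RETRYING,
        JobStatus.FAILED,
    },
    JobStatus.COMPLETED: set(),  # terminal state
    JobStatus.FAILED: set(),     # terminal state
}


def can_transition(current, new):
    """Return True if a job may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS[current]


print(can_transition(JobStatus.PENDING, JobStatus.PROCESSING))    # True
print(can_transition(JobStatus.COMPLETED, JobStatus.PROCESSING))  # False
```

A queue method could call `can_transition` before assigning a new status and raise if the move is not allowed.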

Error Classification

Different errors require different responses. Classify failures to apply appropriate recovery strategies.

class TranscriptionError(Exception):
    """Base exception for transcription errors"""
    pass

class TransientError(TranscriptionError):
    """Temporary error - safe to retry"""
    pass

class RateLimitError(TransientError):
    """Rate limit hit - retry with delay"""
    pass

class AudioFormatError(TranscriptionError):
    """Audio format issue - needs conversion"""
    pass

class PermanentError(TranscriptionError):
    """Permanent failure - cannot retry"""
    pass

class ErrorClassifier:
    @staticmethod
    def classify(exception):
        """Classify exception and determine retry strategy"""
        error_message = str(exception).lower()

        # Network/timeout errors
        if any(keyword in error_message for keyword in ['timeout', 'connection', 'network']):
            return TransientError(f"Network error - will retry: {exception}")

        # Rate limiting
        if any(keyword in error_message for keyword in ['rate limit', 'too many requests', '429']):
            return RateLimitError(f"Rate limit hit - will retry with delay: {exception}")

        # Audio format issues
        if any(keyword in error_message for keyword in ['format', 'codec', 'invalid audio']):
            return AudioFormatError(f"Audio format error - needs conversion: {exception}")

        # Authentication/authorization
        if any(keyword in error_message for keyword in ['auth', 'unauthorized', '401', '403']):
            return PermanentError(f"Authentication error - check API key: {exception}")

        # Default to transient for safety; the original message is kept for debugging
        return TransientError(f"Unknown error - will retry: {exception}")

This classifier examines error messages to determine the failure type. The worker in the next section acts on the result: network errors are retried with backoff, rate limits wait longer, format errors trigger conversion, and auth errors alert operators.
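Matching on message text is fragile: wording changes between API versions, and a substring such as "429" can show up in an unrelated message. When the client exposes an HTTP status code, classifying on that is usually more dependable. The sketch below assumes a numeric status code is available. `classify_status` and its category strings are hypothetical, and treating 415 as a format error is an assumption about how a given API reports unsupported audio.

```python
def classify_status(status_code):
    """Map an HTTP status code to a retry category (illustrative mapping)."""
    if status_code == 429:
        return "rate_limit"    # retry after a longer delay
    if status_code in (401, 403):
        return "permanent"     # credentials problem: alert, do not retry
    if status_code == 415:
        return "audio_format"  # assumption: unsupported media type
    if status_code == 408 or 500 <= status_code <= 599:
        return "transient"     # timeout or server-side error: back off and retry
    if 400 <= status_code <= 499:
        return "permanent"     # other client errors will not fix themselves
    return "transient"         # unknown: retry, bounded by max_attempts


for code in (429, 401, 503, 404):
    print(code, classify_status(code))
```

Each category can then be mapped onto the exception classes above, keeping message matching only as a fallback for errors that carry no status code.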

Resilient Transcription Worker

Combine all components into a robust worker that handles failures gracefully.

import os
import time

import assemblyai as aai
from pydub import AudioSegment

class ResilientTranscriptionWorker:
    def __init__(self, api_key, queue):
        aai.settings.api_key = api_key
        self.queue = queue
        self.backoff = ExponentialBackoff(base_delay=2, max_delay=300)
        self.classifier = ErrorClassifier()

    def process_jobs(self):
        """Main processing loop"""
        while True:
            job = self.queue.get_next_job()
            if not job:
                time.sleep(5)  # Wait for new jobs
                continue
            try:
                self._process_job(job)
            except Exception as e:
                print(f"Unexpected error processing job {job.id}: {e}")
                self.queue.mark_failed(job.id, str(e))

    def _process_job(self, job):
        """Process single transcription job with retry logic"""
        try:
            # Calculate retry delay based on attempt count
            if job.attempts > 0:
                delay = self._calculate_retry_delay(job)
                print(f"Waiting {delay}s before retry...")
                time.sleep(delay)

            # Attempt transcription
            result = self._transcribe_with_recovery(job)

            # Save result
            result_path = self._save_result(job.id, result)
            self.queue.mark_completed(job.id, result_path)
            print(f"Job {job.id} completed successfully")
        except Exception as e:
            # Errors from _transcribe are already classified; classify anything else
            classified = e if isinstance(e, TranscriptionError) else self.classifier.classify(e)
            if isinstance(classified, PermanentError):
                # Don't retry permanent errors
                self._fail_permanently(job, classified)
            else:
                self.queue.mark_failed(job.id, str(classified))

    def _fail_permanently(self, job, error):
        """Mark the job FAILED with no further retries, then alert"""
        # mark_failed increments attempts before comparing with max_attempts,
        # so lowering the limit to attempts + 1 exhausts the retry budget
        job.max_attempts = job.attempts + 1
        self.queue.mark_failed(job.id, f"Permanent error: {error}")
        self._alert_operators(job, error)

    def _transcribe_with_recovery(self, job):
        """Transcribe with automatic recovery attempts"""
        try:
            # Try standard transcription
            return self._transcribe(job.audio_path)
        except AudioFormatError:
            # Convert audio format and retry
            print(f"Converting audio format for job {job.id}")
            converted_path = self._convert_audio_format(job.audio_path)
            return self._transcribe(converted_path)
        except RateLimitError:
            # Wait longer for rate limits
            print("Rate limit hit, waiting 60s...")
            time.sleep(60)
            return self._transcribe(job.audio_path)

    def _transcribe(self, audio_path):
        """Actual transcription API call; raises classified errors"""
        config = aai.TranscriptionConfig(
            speaker_labels=True,
            language_code="en_us"
        )
        transcriber = aai.Transcriber()
        try:
            transcript = transcriber.transcribe(audio_path, config=config)
        except Exception as e:
            # Classify here so the handlers in _transcribe_with_recovery can match
            raise self.classifier.classify(e) from e
        if transcript.status == aai.TranscriptStatus.error:
            raise self.classifier.classify(Exception(transcript.error))
        return transcript

    def _convert_audio_format(self, audio_path):
        """Convert audio to supported format"""
        audio = AudioSegment.from_file(audio_path)
        # Convert to 16kHz mono WAV
        audio = audio.set_frame_rate(16000)
        audio = audio.set_channels(1)
        # Swap only the extension; str.replace('.', ...) would hit every dot in the path
        base, _ = os.path.splitext(audio_path)
        converted_path = f"{base}_converted.wav"
        audio.export(converted_path, format='wav')
        return converted_path

    def _calculate_retry_delay(self, job):
        """Calculate appropriate retry delay"""
        # Exponential backoff: 2^attempt seconds
        base_delay = 2 ** job.attempts
        max_delay = 300  # Cap at 5 minutes
        return min(base_delay, max_delay)

    def _save_result(self, job_id, transcript):
        """Save transcription result"""
        os.makedirs("results", exist_ok=True)
        result_path = f"results/{job_id}_transcript.txt"
        with open(result_path, 'w', encoding='utf-8') as f:
            f.write(transcript.text)
        return result_path

    def _alert_operators(self, job, error):
        """Send alert for permanent failures"""
        print(f"ALERT: Job {job.id} permanently failed: {error}")
        # Implement actual alerting (email, Slack, PagerDuty, etc.)

This worker automatically handles transient failures, converts audio formats when needed, respects rate limits, and alerts operators for permanent failures. It processes jobs continuously, maintaining state across restarts through the database queue.
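The fixed 60-second wait in `_transcribe_with_recovery` is a guess. Many HTTP APIs send a `Retry-After` header with a 429 response, either as a number of seconds or as an HTTP date. Whether your transcription provider sends one, and how its SDK exposes the header, is an assumption to verify. `retry_after_seconds` below is a hypothetical helper that turns the header value into a wait time and falls back to a default.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(header_value, default=60.0, now=None):
    """Parse a Retry-After header value into a number of seconds to wait."""
    if not header_value:
        return default
    value = header_value.strip()
    if value.isdigit():
        # Delta-seconds form, e.g. "120"
        return float(value)
    try:
        # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means the wait is already over
    return max(0.0, (retry_at - now).total_seconds())


print(retry_after_seconds("120"))  # 120.0
print(retry_after_seconds(None))   # 60.0
```

The worker could pass the result to `time.sleep` in place of the hard-coded 60, ideally with an upper bound so that a misbehaving server cannot stall it indefinitely.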

Monitoring and Alerting

Track failure rates to detect systemic issues before they cascade.

from datetime import datetime, timedelta

class JobMonitor:
    def __init__(self, session):
        self.session = session

    def get_failure_rate(self, hours=24):
        """Calculate recent failure rate as a percentage"""
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        total = self.session.query(TranscriptionJob).filter(
            TranscriptionJob.created_at >= cutoff
        ).count()
        failed = self.session.query(TranscriptionJob).filter(
            TranscriptionJob.created_at >= cutoff,
            TranscriptionJob.status == JobStatus.FAILED
        ).count()
        return (failed / total * 100) if total > 0 else 0

    def check_health(self):
        """Check system health"""
        failure_rate = self.get_failure_rate(hours=1)
        if failure_rate > 10:
            self._alert(f"High failure rate: {failure_rate:.1f}%")

        # Jobs left in PROCESSING for over an hour have probably lost their worker
        stuck_jobs = self.session.query(TranscriptionJob).filter(
            TranscriptionJob.status == JobStatus.PROCESSING,
            TranscriptionJob.updated_at < datetime.utcnow() - timedelta(hours=1)
        ).count()
        if stuck_jobs > 0:
            self._alert(f"{stuck_jobs} jobs stuck in processing state")

    def _alert(self, message):
        """Send a monitoring alert"""
        print(f"ALERT: {message}")
        # Implement actual alerting (email, Slack, PagerDuty, etc.)

Monitor failure rates and stuck jobs to catch problems early. High failure rates indicate API issues or bad audio batches. Stuck jobs suggest worker crashes or deadlocks.
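Detecting stuck jobs is only half the fix; they also need to go back into the queue. The following sketch uses the standard-library `sqlite3` module rather than the SQLAlchemy model, so that it runs on its own. The simplified table layout, the `requeue_stuck_jobs` name, and the one-hour timeout are assumptions for illustration.

```python
import sqlite3
from datetime import datetime, timedelta


def requeue_stuck_jobs(conn, now, timeout=timedelta(hours=1)):
    """Move jobs stuck in 'processing' longer than `timeout` back to 'retrying'.

    Returns the number of jobs requeued. Timestamps are stored as ISO 8601
    strings, which compare correctly as text when they share one format.
    """
    cutoff = (now - timeout).isoformat()
    cur = conn.execute(
        "UPDATE transcription_jobs SET status = 'retrying', updated_at = ? "
        "WHERE status = 'processing' AND updated_at < ?",
        (now.isoformat(), cutoff),
    )
    conn.commit()
    return cur.rowcount


def demo():
    """Build a throwaway in-memory table and requeue its one stuck job."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE transcription_jobs "
        "(id TEXT PRIMARY KEY, status TEXT, updated_at TEXT)"
    )
    now = datetime(2024, 1, 1, 12, 0, 0)
    rows = [
        ("stuck", "processing", (now - timedelta(hours=3)).isoformat()),
        ("active", "processing", (now - timedelta(minutes=5)).isoformat()),
        ("done", "completed", (now - timedelta(hours=3)).isoformat()),
    ]
    conn.executemany("INSERT INTO transcription_jobs VALUES (?, ?, ?)", rows)
    requeued = requeue_stuck_jobs(conn, now)
    statuses = dict(conn.execute("SELECT id, status FROM transcription_jobs"))
    conn.close()
    return requeued, statuses


print(demo())
```

Choose a timeout longer than the slowest legitimate job. Otherwise a job that is still running gets requeued and transcribed twice.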

Testing Recovery Logic

Validate your retry system with chaos testing that simulates real failures.

import random
import time

class FailureSimulator:
    def __init__(self, worker):
        self.worker = worker
        self.original_transcribe = worker._transcribe

    def inject_transient_failures(self, failure_rate=0.3):
        """Simulate random transient failures"""
        def failing_transcribe(audio_path):
            if random.random() < failure_rate:
                raise TransientError("Simulated network timeout")
            return self.original_transcribe(audio_path)
        self.worker._transcribe = failing_transcribe

    def inject_rate_limit(self, duration=30):
        """Simulate rate limiting for `duration` seconds"""
        limited_until = time.monotonic() + duration

        def rate_limited_transcribe(audio_path):
            if time.monotonic() < limited_until:
                raise RateLimitError("Simulated rate limit")
            return self.original_transcribe(audio_path)
        self.worker._transcribe = rate_limited_transcribe

    def restore(self):
        """Remove injected failures"""
        self.worker._transcribe = self.original_transcribe

Run these simulations in staging to verify that your system recovers correctly. A well-built retry system should absorb a 30% transient failure rate with little or no user-visible impact.
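A back-of-the-envelope check shows why a 30% failure rate is survivable. If each attempt fails independently with probability p, a job exhausts all n attempts with probability p^n. Independence is an assumption: during a real outage failures are correlated, so this estimate is optimistic. `exhaustion_probability` is a hypothetical helper, and five attempts matches the defaults used earlier.

```python
def exhaustion_probability(failure_rate, max_attempts):
    """Probability that every one of max_attempts independent attempts fails."""
    return failure_rate ** max_attempts


p = exhaustion_probability(0.3, 5)
print(f"{p:.5f}")         # 0.00243
print(f"{p * 1000:.2f}")  # 2.43 jobs expected to exhaust retries per 1,000
```

At 1,000 jobs a day, that is two or three jobs that still end up FAILED even though every individual failure was transient, which is why permanent-failure alerting remains necessary.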

Your resilient transcription pipeline now automatically recovers from network failures, respects rate limits, converts problematic audio formats, and alerts operators only for genuine issues requiring human intervention.

Conclusion

Reliable transcription systems combine four things: exponential backoff for transient failures, persistent job queues that survive restarts, error classification that determines the retry strategy, and automatic recovery from format issues. Together they build fault tolerance directly into the processing pipeline rather than hoping failures won't happen.

If you want enterprise reliability without building complex retry logic, consider Meetstream.ai API, which includes built-in retry mechanisms, automatic error recovery, and 99.9% uptime guarantees.
