How to Process Long Meeting Audio Without Timeouts

Long meetings create massive audio files that crash transcription pipelines. API timeouts, memory limits, and processing bottlenecks turn three-hour board meetings into deployment nightmares. 

Processing long audio requires chunking strategies, parallel processing, and robust retry mechanisms. 

This guide demonstrates how to build production-grade pipelines that handle meetings of any length reliably.

Understanding Processing Constraints

Most transcription APIs impose hard limits: 5GB maximum file sizes, 5-hour duration caps, and 30-minute request timeouts. 

Your pipeline must split audio into manageable chunks, process them independently, and reassemble results while maintaining temporal continuity and speaker consistency.
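Before looking at silence-aware splitting, the basic chunk-boundary arithmetic (fixed-size chunks plus a trailing overlap) can be sketched as follows. Durations are in seconds and the function name is illustrative:

```python
def plan_chunks(total_duration, chunk_duration=600, overlap=5):
    """Return (start, end) pairs covering total_duration seconds.

    Each chunk extends `overlap` seconds past its nominal boundary, so the
    next chunk's start falls inside the previous chunk's tail.
    """
    boundaries = []
    start = 0
    while start < total_duration:
        end = min(start + chunk_duration, total_duration)
        boundaries.append((start, min(end + overlap, total_duration)))
        start = end
    return boundaries

# A 25-minute (1500 s) recording yields three chunks:
# [(0, 605), (600, 1205), (1200, 1500)]
```

The silence-aware chunker below refines these nominal boundaries by snapping each split point to a nearby quiet region.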

Intelligent Audio Chunking

Split audio at natural boundaries to preserve context:

import os

from pydub import AudioSegment
from pydub.silence import detect_silence

class IntelligentChunker:
    def __init__(self, chunk_duration=600, overlap=5):
        """
        chunk_duration: target chunk length in seconds (default 10 minutes)
        overlap: overlap between chunks in seconds for context
        """
        self.chunk_duration = chunk_duration
        self.overlap = overlap
    def split_at_silence(self, audio_file, output_dir="chunks"):
        """Split audio at silence points near chunk boundaries"""
        import os
        os.makedirs(output_dir, exist_ok=True)
        # Load audio
        audio = AudioSegment.from_file(audio_file)
        duration_ms = len(audio)
        chunk_duration_ms = self.chunk_duration * 1000
        overlap_ms = self.overlap * 1000
        chunks = []
        chunk_start = 0
        chunk_num = 0
        while chunk_start < duration_ms:
            # Calculate target end point
            target_end = min(chunk_start + chunk_duration_ms, duration_ms)
            # Find silence near target end point
            search_window_start = max(target_end - 10000, chunk_start)
            search_window = audio[search_window_start:target_end + 10000]
            # Detect silence in search window
            silence_ranges = detect_silence(
                search_window,
                min_silence_len=500,
                silence_thresh=-40
            )
            # Find best split point
            if silence_ranges:
                # Use middle of longest silence period
                best_silence = max(silence_ranges, key=lambda x: x[1] - x[0])
                split_point = search_window_start + (best_silence[0] + best_silence[1]) // 2
            else:
                # No silence found, split at target
                split_point = target_end
            # Extract chunk with overlap
            chunk_end = min(split_point + overlap_ms, duration_ms)
            chunk = audio[chunk_start:chunk_end]
            # Save chunk
            chunk_file = f"{output_dir}/chunk_{chunk_num:04d}.wav"
            chunk.export(chunk_file, format="wav")
            chunks.append({
                'file': chunk_file,
                'start_time': chunk_start / 1000,
                'end_time': chunk_end / 1000,
                'duration': (chunk_end - chunk_start) / 1000,
                'chunk_num': chunk_num
            })
            print(f"Created chunk {chunk_num}: {chunk_start/1000:.2f}s - {chunk_end/1000:.2f}s")
            # Advance to the split point; fall back to the target boundary if
            # the detected silence did not move us forward
            chunk_start = split_point if split_point > chunk_start else target_end
            chunk_num += 1
        return chunks
    def validate_chunks(self, chunks):
        """Ensure chunks cover entire audio without gaps"""
        for i in range(len(chunks) - 1):
            gap = chunks[i+1]['start_time'] - chunks[i]['end_time']
            if gap > 1.0:  # More than 1 second gap
                print(f"Warning: Gap detected between chunk {i} and {i+1}: {gap:.2f}s")
        return True

Parallel Processing Pipeline

Process chunks concurrently to reduce total processing time:

import os
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor, as_completed
import assemblyai as aai

class ParallelTranscriptionPipeline:
    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")
    def process_chunks_parallel(self, chunks):
        """Process multiple chunks in parallel"""
        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all chunks for processing
            future_to_chunk = {
                executor.submit(self._transcribe_chunk, chunk): chunk
                for chunk in chunks
            }
            # Collect results as they complete
            for future in as_completed(future_to_chunk):
                chunk = future_to_chunk[future]
                try:
                    result = future.result()
                    results[chunk['chunk_num']] = result
                    print(f"Completed chunk {chunk['chunk_num']}")
                except Exception as e:
                    print(f"Chunk {chunk['chunk_num']} failed: {e}")
                    results[chunk['chunk_num']] = None
        return results
    def _transcribe_chunk(self, chunk):
        """Transcribe a single chunk with retry logic"""
        max_retries = 3
        retry_delay = 5
        for attempt in range(max_retries):
            try:
                config = aai.TranscriptionConfig(
                    speaker_labels=True,
                    language_code="en_us"
                )
                transcriber = aai.Transcriber()
                transcript = transcriber.transcribe(
                    chunk['file'],
                    config=config
                )
                if transcript.status == aai.TranscriptStatus.error:
                    raise Exception(f"Transcription error: {transcript.error}")
                return {
                    'chunk_num': chunk['chunk_num'],
                    'start_time': chunk['start_time'],
                    'end_time': chunk['end_time'],
                    'transcript': transcript,
                    'text': transcript.text,
                    'utterances': transcript.utterances
                }
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f"Retry {attempt + 1}/{max_retries} for chunk {chunk['chunk_num']}")
                    import time
                    time.sleep(retry_delay * (attempt + 1))
                else:
                    raise e
    async def process_chunks_async(self, chunks):
        """Async version for better concurrency"""
        tasks = [
            self._transcribe_chunk_async(chunk)
            for chunk in chunks
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Filter out exceptions
        successful_results = {}
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Chunk {chunks[i]['chunk_num']} failed: {result}")
            else:
                successful_results[chunks[i]['chunk_num']] = result
        return successful_results
    async def _transcribe_chunk_async(self, chunk):
        """Async transcription with timeout handling"""
        timeout = aiohttp.ClientTimeout(total=3600)  # 1 hour timeout
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # Implement async transcription call
            # This is a placeholder - actual implementation depends on API
            return await self._async_api_call(chunk, session)

Stream Processing for Real-Time

Process audio as it arrives instead of waiting for complete file:

import os
import queue
import tempfile
import threading
import wave

import assemblyai as aai

class StreamingProcessor:
    def __init__(self, chunk_size=30):
        self.chunk_size = chunk_size  # seconds
        self.audio_buffer = queue.Queue()
        self.results_queue = queue.Queue()
        self.is_processing = False
    def start_streaming(self):
        """Start stream processing in background"""
        self.is_processing = True
        # Start buffer processor thread
        self.processor_thread = threading.Thread(
            target=self._process_buffer
        )
        self.processor_thread.start()
    def add_audio_chunk(self, audio_data, timestamp):
        """Add audio chunk to processing queue"""
        self.audio_buffer.put({
            'data': audio_data,
            'timestamp': timestamp
        })
    def _process_buffer(self):
        """Process audio chunks from buffer"""
        accumulated_audio = b''
        accumulated_duration = 0
        start_timestamp = None
        while self.is_processing or not self.audio_buffer.empty():
            try:
                chunk = self.audio_buffer.get(timeout=1.0)
                if start_timestamp is None:
                    start_timestamp = chunk['timestamp']
                accumulated_audio += chunk['data']
                accumulated_duration += len(chunk['data']) / (16000 * 2)  # Assuming 16kHz, 16-bit
                # Process when we have enough audio
                if accumulated_duration >= self.chunk_size:
                    self._transcribe_accumulated(
                        accumulated_audio,
                        start_timestamp,
                        chunk['timestamp']
                    )
                    accumulated_audio = b''
                    accumulated_duration = 0
                    start_timestamp = None
            except queue.Empty:
                continue
        # Flush any remaining audio when streaming stops
        if accumulated_audio:
            self._transcribe_accumulated(
                accumulated_audio,
                start_timestamp,
                start_timestamp + accumulated_duration
            )
    def _transcribe_accumulated(self, audio_data, start_time, end_time):
        """Transcribe accumulated audio chunk"""
        # Save temporary file
        import tempfile
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            temp_file = f.name
            import wave
            with wave.open(temp_file, 'wb') as wf:
                wf.setnchannels(1)
                wf.setsampwidth(2)
                wf.setframerate(16000)
                wf.writeframes(audio_data)
        # Transcribe
        config = aai.TranscriptionConfig(speaker_labels=True)
        transcriber = aai.Transcriber()
        transcript = transcriber.transcribe(temp_file, config=config)
        # Add to results
        self.results_queue.put({
            'start_time': start_time,
            'end_time': end_time,
            'transcript': transcript
        })
        print(f"Processed segment: {start_time:.2f}s - {end_time:.2f}s")
        # Cleanup
        import os
        os.unlink(temp_file)
    def stop_streaming(self):
        """Stop stream processing"""
        self.is_processing = False
        self.processor_thread.join()
    def get_results(self):
        """Retrieve all processed results"""
        results = []
        while not self.results_queue.empty():
            results.append(self.results_queue.get())
        return sorted(results, key=lambda x: x['start_time'])

Speaker Consistency Across Chunks

Maintain speaker labels when merging chunks:

class SpeakerAlignmentMerger:
    def __init__(self):
        self.speaker_mapping = {}
    def merge_transcripts(self, chunk_results):
        """Merge chunk transcripts with speaker alignment"""
        # Sort by chunk number
        sorted_chunks = sorted(
            chunk_results.items(),
            key=lambda x: x[0]
        )
        merged_utterances = []
        global_speaker_id = 0
        for chunk_num, result in sorted_chunks:
            if result is None:
                continue
            chunk_start_time = result['start_time']
            utterances = result['utterances']
            # Map speakers from this chunk to global IDs
            chunk_speaker_map = {}
            for utterance in utterances:
                local_speaker = utterance.speaker
                # Convert chunk-local milliseconds to seconds on the global timeline
                global_start = chunk_start_time + (utterance.start / 1000)
                # Determine global speaker ID
                if local_speaker not in chunk_speaker_map:
                    # Check if we can match a speaker from the previous chunk
                    global_id = self._match_speaker(
                        global_start,
                        merged_utterances[-5:] if merged_utterances else []
                    )
                    if global_id is None:
                        global_id = global_speaker_id
                        global_speaker_id += 1
                    chunk_speaker_map[local_speaker] = global_id
                # Adjust timestamps to the global timeline
                adjusted_utterance = {
                    'speaker': chunk_speaker_map[local_speaker],
                    'text': utterance.text,
                    'start': global_start,
                    'end': chunk_start_time + (utterance.end / 1000),
                    'confidence': utterance.confidence
                }
                merged_utterances.append(adjusted_utterance)
        return merged_utterances
    def _match_speaker(self, utterance_start, recent_utterances):
        """Match a speaker from the previous chunk by temporal proximity.

        utterance_start must already be on the global timeline (seconds).
        In production, use voice embeddings for more reliable matching.
        """
        if not recent_utterances:
            return None
        time_threshold = 10.0  # seconds
        for recent in reversed(recent_utterances):
            time_gap = utterance_start - recent['end']
            if time_gap < time_threshold:
                return recent['speaker']
        return None
    def format_merged_transcript(self, utterances):
        """Format merged transcript for output"""
        output = []
        output.append("Complete Meeting Transcript")
        output.append("=" * 70)
        output.append("")
        for utt in utterances:
            timestamp = self._format_time(utt['start'])
            speaker = f"Speaker {utt['speaker']}"
            text = utt['text']
            output.append(f"[{timestamp}] {speaker}:")
            output.append(f"  {text}")
            output.append("")
        return "\n".join(output)
    def _format_time(self, seconds):
        """Format seconds to HH:MM:SS"""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

Complete Long Audio Processing System

Integrate all components:

import os
from datetime import datetime
class LongAudioProcessor:
    def __init__(self, max_workers=5):
        self.chunker = IntelligentChunker(chunk_duration=600, overlap=5)
        self.pipeline = ParallelTranscriptionPipeline(max_workers=max_workers)
        self.merger = SpeakerAlignmentMerger()
    def process_long_audio(self, audio_file, output_file=None):
        """Complete pipeline for processing long audio files"""
        print(f"Starting processing: {audio_file}")
        start_time = datetime.now()
        # Step 1: Split into chunks
        print("\n[1/4] Splitting audio into chunks...")
        chunks = self.chunker.split_at_silence(audio_file)
        print(f"Created {len(chunks)} chunks")
        # Validate chunks
        self.chunker.validate_chunks(chunks)
        # Step 2: Process chunks in parallel
        print("\n[2/4] Processing chunks in parallel...")
        chunk_results = self.pipeline.process_chunks_parallel(chunks)
        successful_chunks = sum(1 for r in chunk_results.values() if r is not None)
        print(f"Successfully processed {successful_chunks}/{len(chunks)} chunks")
        # Step 3: Merge results
        print("\n[3/4] Merging transcripts...")
        merged_utterances = self.merger.merge_transcripts(chunk_results)
        print(f"Merged {len(merged_utterances)} utterances")
        # Step 4: Generate final output
        print("\n[4/4] Generating final transcript...")
        final_transcript = self.merger.format_merged_transcript(merged_utterances)
        # Save results
        if output_file is None:
            output_file = f"transcript_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(final_transcript)
        # Calculate statistics
        processing_time = (datetime.now() - start_time).total_seconds()
        total_audio_duration = sum(c['duration'] for c in chunks)
        print("\n" + "=" * 70)
        print("Processing Complete!")
        print("=" * 70)
        print(f"Output file: {output_file}")
        print(f"Total audio duration: {total_audio_duration/60:.2f} minutes")
        print(f"Processing time: {processing_time/60:.2f} minutes")
        print(f"Speed: {total_audio_duration/processing_time:.2f}x real-time")
        print(f"Chunks processed: {successful_chunks}/{len(chunks)}")
        print("=" * 70)
        return final_transcript
    def cleanup_chunks(self, chunk_dir="chunks"):
        """Remove temporary chunk files"""
        import shutil
        if os.path.exists(chunk_dir):
            shutil.rmtree(chunk_dir)
            print(f"Cleaned up temporary files in {chunk_dir}")
# Usage example
if __name__ == "__main__":
    processor = LongAudioProcessor(max_workers=5)
    try:
        # Process a 3-hour meeting
        transcript = processor.process_long_audio(
            "long_meeting_3hrs.wav",
            "complete_transcript.txt"
        )
        print("\nFirst 500 characters of transcript:")
        print(transcript[:500])
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Clean up temporary files
        processor.cleanup_chunks()

Optimization Strategies

Use adaptive chunk sizing—start with 10-minute chunks, reduce to 5 minutes if timeouts occur. Implement exponential backoff for retries: wait 5s, then 10s, then 20s before retrying failed chunks.
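That backoff schedule (5s, 10s, 20s) can be wrapped in a small helper. This is a minimal sketch; `transcribe_fn` is a stand-in for whatever per-chunk transcription call the pipeline uses:

```python
import time

def transcribe_with_backoff(transcribe_fn, chunk, max_retries=3, base_delay=5):
    """Retry a chunk with exponential backoff: 5s, 10s, 20s between attempts."""
    for attempt in range(max_retries):
        try:
            return transcribe_fn(chunk)
        except Exception as e:
            if attempt == max_retries - 1:
                raise  # out of retries; surface the last error
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed ({e}); retrying in {delay}s")
            time.sleep(delay)
```

The doubling delay gives transient failures (rate limits, brief outages) progressively more room to clear before the final attempt.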

Monitor memory usage when processing multiple chunks. Release processed chunks from memory immediately after merging. Use file-based storage for intermediate results instead of keeping everything in RAM.
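One way to keep intermediate results out of RAM is to persist each chunk's result to disk as soon as it completes and reload everything only at merge time. A minimal sketch; the `results` directory name and JSON layout are assumptions, and only JSON-serializable fields (not the raw transcript object) should be stored:

```python
import json
import os

def save_chunk_result(result, out_dir="results"):
    """Write one chunk's serializable fields to disk, freeing it from memory."""
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"chunk_{result['chunk_num']:04d}.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(result, f)
    return path

def load_chunk_results(out_dir="results"):
    """Reload persisted chunk results, ordered by chunk number."""
    results = []
    for name in sorted(os.listdir(out_dir)):
        if name.endswith(".json"):
            with open(os.path.join(out_dir, name), encoding="utf-8") as f:
                results.append(json.load(f))
    return results
```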

Cache successfully processed chunks locally. If the pipeline fails, resume from the last successful chunk instead of reprocessing everything.
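Resuming then reduces to filtering out chunks whose cached result file already exists. This sketch assumes results are cached as `chunk_NNNN.json` files in a local directory:

```python
import os

def pending_chunks(chunks, cache_dir="results"):
    """Return only the chunks that do not yet have a cached result file."""
    cached = set(os.listdir(cache_dir)) if os.path.isdir(cache_dir) else set()
    return [c for c in chunks if f"chunk_{c['chunk_num']:04d}.json" not in cached]
```

On restart, run the parallel pipeline over `pending_chunks(chunks)` instead of the full list, then merge the cached and fresh results together.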

Progress Tracking

Add progress monitoring for long-running jobs:

def process_with_progress(self, audio_file):
    """Process with progress tracking (add this method to LongAudioProcessor)"""
    from tqdm import tqdm
    chunks = self.chunker.split_at_silence(audio_file)
    with tqdm(total=len(chunks), desc="Processing chunks") as pbar:
        results = {}
        for chunk in chunks:
            result = self.pipeline._transcribe_chunk(chunk)
            results[chunk['chunk_num']] = result
            pbar.update(1)
    return results

Your long audio processor now handles meetings of any length through intelligent chunking, parallel processing, and robust error handling, and with enough parallel workers it can process a 3-hour meeting in a fraction of its running time.

Conclusion

Processing long meeting audio requires intelligent chunking at natural boundaries, parallel processing for speed, robust retry logic for reliability, and careful speaker alignment when merging results. Together, these techniques handle meetings of any duration without timeouts.

If you want production-ready long audio processing without building complex pipelines, consider Meetstream.ai API, which automatically handles audio of any length with optimized chunking and processing.
