How to Process Long Meeting Audio Without Timeouts

Long meetings create massive audio files that crash transcription pipelines. API timeouts, memory limits, and processing bottlenecks turn three-hour board meetings into deployment nightmares. 

Processing long audio requires chunking strategies, parallel processing, and robust retry mechanisms. 

This guide demonstrates how to build production-grade pipelines that handle meetings of any length reliably.

Understanding Processing Constraints

Most transcription APIs impose hard limits: 5 GB maximum file size, 5-hour duration caps, 30-minute request timeouts. 

Your pipeline must split audio into manageable chunks, process them independently, and reassemble results while maintaining temporal continuity and speaker consistency.
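As a quick sketch of the chunking arithmetic (the `plan_chunks` helper below is a hypothetical name, not part of any API): a three-hour meeting split into 10-minute chunks with 5 seconds of overlap yields 18 overlapping windows that together cover the full timeline.

```python
def plan_chunks(duration_s, chunk_s=600, overlap_s=5):
    """Compute (start, end) second boundaries covering duration_s with overlap."""
    chunks, start = [], 0
    while start < duration_s:
        # Each window extends past the nominal boundary by overlap_s seconds
        end = min(start + chunk_s + overlap_s, duration_s)
        chunks.append((start, end))
        start += chunk_s
    return chunks

# A 3-hour (10800 s) meeting: 18 windows, each overlapping the next by 5 s
boundaries = plan_chunks(10800)
```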

Intelligent Audio Chunking

Split audio at natural boundaries to preserve context:

import os

from pydub import AudioSegment
from pydub.silence import detect_silence

class IntelligentChunker:

    def __init__(self, chunk_duration=600, overlap=5):
        """
        chunk_duration: target chunk length in seconds (default 10 minutes)
        overlap: overlap between chunks in seconds for context
        """
        self.chunk_duration = chunk_duration
        self.overlap = overlap

    def split_at_silence(self, audio_file, output_dir="chunks"):
        """Split audio at silence points near chunk boundaries"""
        os.makedirs(output_dir, exist_ok=True)

        # Load audio
        audio = AudioSegment.from_file(audio_file)
        duration_ms = len(audio)
        chunk_duration_ms = self.chunk_duration * 1000
        overlap_ms = self.overlap * 1000

        chunks = []
        chunk_start = 0
        chunk_num = 0

        while chunk_start < duration_ms:
            # Calculate target end point
            target_end = min(chunk_start + chunk_duration_ms, duration_ms)

            # Find silence near target end point
            search_window_start = max(target_end - 10000, chunk_start)
            search_window = audio[search_window_start:target_end + 10000]

            # Detect silence in search window
            silence_ranges = detect_silence(
                search_window,
                min_silence_len=500,
                silence_thresh=-40
            )

            # Find best split point
            if silence_ranges:
                # Use middle of longest silence period
                best_silence = max(silence_ranges, key=lambda x: x[1] - x[0])
                split_point = search_window_start + (best_silence[0] + best_silence[1]) // 2
            else:
                # No silence found, split at target
                split_point = target_end

            # Extract chunk with overlap
            chunk_end = min(split_point + overlap_ms, duration_ms)
            chunk = audio[chunk_start:chunk_end]

            # Save chunk
            chunk_file = f"{output_dir}/chunk_{chunk_num:04d}.wav"
            chunk.export(chunk_file, format="wav")

            chunks.append({
                'file': chunk_file,
                'start_time': chunk_start / 1000,
                'end_time': chunk_end / 1000,
                'duration': (chunk_end - chunk_start) / 1000,
                'chunk_num': chunk_num
            })

            print(f"Created chunk {chunk_num}: {chunk_start/1000:.2f}s - {chunk_end/1000:.2f}s")

            # Move to next chunk (accounting for overlap)
            chunk_start = split_point
            chunk_num += 1

        return chunks

    def validate_chunks(self, chunks):
        """Ensure chunks cover entire audio without gaps"""
        for i in range(len(chunks) - 1):
            gap = chunks[i+1]['start_time'] - chunks[i]['end_time']
            if gap > 1.0:  # More than 1 second gap
                print(f"Warning: Gap detected between chunk {i} and {i+1}: {gap:.2f}s")
        return True

Parallel Processing Pipeline

Process chunks concurrently to reduce total processing time:

import asyncio
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import aiohttp
import assemblyai as aai

class ParallelTranscriptionPipeline:

    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")

    def process_chunks_parallel(self, chunks):
        """Process multiple chunks in parallel"""
        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all chunks for processing
            future_to_chunk = {
                executor.submit(self._transcribe_chunk, chunk): chunk
                for chunk in chunks
            }
            # Collect results as they complete
            for future in as_completed(future_to_chunk):
                chunk = future_to_chunk[future]
                try:
                    result = future.result()
                    results[chunk['chunk_num']] = result
                    print(f"Completed chunk {chunk['chunk_num']}")
                except Exception as e:
                    print(f"Chunk {chunk['chunk_num']} failed: {e}")
                    results[chunk['chunk_num']] = None
        return results

    def _transcribe_chunk(self, chunk):
        """Transcribe a single chunk with retry logic"""
        max_retries = 3
        retry_delay = 5

        for attempt in range(max_retries):
            try:
                config = aai.TranscriptionConfig(
                    speaker_labels=True,
                    language_code="en_us"
                )
                transcriber = aai.Transcriber()
                transcript = transcriber.transcribe(
                    chunk['file'],
                    config=config
                )
                if transcript.status == aai.TranscriptStatus.error:
                    raise Exception(f"Transcription error: {transcript.error}")
                return {
                    'chunk_num': chunk['chunk_num'],
                    'start_time': chunk['start_time'],
                    'end_time': chunk['end_time'],
                    'transcript': transcript,
                    'text': transcript.text,
                    'utterances': transcript.utterances
                }
            except Exception:
                if attempt < max_retries - 1:
                    print(f"Retry {attempt + 1}/{max_retries} for chunk {chunk['chunk_num']}")
                    time.sleep(retry_delay * (attempt + 1))
                else:
                    raise

    async def process_chunks_async(self, chunks):
        """Async version for better concurrency"""
        tasks = [
            self._transcribe_chunk_async(chunk)
            for chunk in chunks
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out exceptions
        successful_results = {}
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Chunk {chunks[i]['chunk_num']} failed: {result}")
            else:
                successful_results[chunks[i]['chunk_num']] = result
        return successful_results

    async def _transcribe_chunk_async(self, chunk):
        """Async transcription with timeout handling"""
        timeout = aiohttp.ClientTimeout(total=3600)  # 1 hour timeout
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # Implement async transcription call here
            # This is a placeholder - actual implementation depends on the API
            return await self._async_api_call(chunk, session)

Stream Processing for Real-Time

Process audio as it arrives instead of waiting for complete file:

import os
import queue
import tempfile
import threading
import wave

import assemblyai as aai

class StreamingProcessor:

    def __init__(self, chunk_size=30):
        self.chunk_size = chunk_size  # seconds
        self.audio_buffer = queue.Queue()
        self.results_queue = queue.Queue()
        self.is_processing = False

    def start_streaming(self):
        """Start stream processing in background"""
        self.is_processing = True
        # Start buffer processor thread
        self.processor_thread = threading.Thread(
            target=self._process_buffer
        )
        self.processor_thread.start()

    def add_audio_chunk(self, audio_data, timestamp):
        """Add audio chunk to processing queue"""
        self.audio_buffer.put({
            'data': audio_data,
            'timestamp': timestamp
        })

    def _process_buffer(self):
        """Process audio chunks from buffer"""
        accumulated_audio = b""
        accumulated_duration = 0
        start_timestamp = None
        last_timestamp = None

        while self.is_processing or not self.audio_buffer.empty():
            try:
                chunk = self.audio_buffer.get(timeout=1.0)
                if start_timestamp is None:
                    start_timestamp = chunk['timestamp']
                last_timestamp = chunk['timestamp']

                accumulated_audio += chunk['data']
                accumulated_duration += len(chunk['data']) / (16000 * 2)  # Assuming 16kHz, 16-bit mono

                # Process when we have enough audio
                if accumulated_duration >= self.chunk_size:
                    self._transcribe_accumulated(
                        accumulated_audio,
                        start_timestamp,
                        last_timestamp
                    )
                    accumulated_audio = b""
                    accumulated_duration = 0
                    start_timestamp = None
            except queue.Empty:
                continue

        # Process remaining audio
        if accumulated_audio:
            self._transcribe_accumulated(
                accumulated_audio,
                start_timestamp,
                last_timestamp
            )

    def _transcribe_accumulated(self, audio_data, start_time, end_time):
        """Transcribe accumulated audio chunk"""
        # Save temporary WAV file (16kHz, 16-bit, mono)
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            temp_file = f.name
        with wave.open(temp_file, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(16000)
            wf.writeframes(audio_data)

        # Transcribe
        config = aai.TranscriptionConfig(speaker_labels=True)
        transcriber = aai.Transcriber()
        transcript = transcriber.transcribe(temp_file, config=config)

        # Add to results
        self.results_queue.put({
            'start_time': start_time,
            'end_time': end_time,
            'transcript': transcript
        })
        print(f"Processed segment: {start_time:.2f}s - {end_time:.2f}s")

        # Cleanup
        os.unlink(temp_file)

    def stop_streaming(self):
        """Stop stream processing"""
        self.is_processing = False
        self.processor_thread.join()

    def get_results(self):
        """Retrieve all processed results"""
        results = []
        while not self.results_queue.empty():
            results.append(self.results_queue.get())
        return sorted(results, key=lambda x: x['start_time'])

Speaker Consistency Across Chunks

Maintain speaker labels when merging chunks:

class SpeakerAlignmentMerger:

    def __init__(self):
        self.speaker_mapping = {}

    def merge_transcripts(self, chunk_results):
        """Merge chunk transcripts with speaker alignment"""
        # Sort by chunk number
        sorted_chunks = sorted(
            chunk_results.items(),
            key=lambda x: x[0]
        )

        merged_utterances = []
        global_speaker_id = 0

        for chunk_num, result in sorted_chunks:
            if result is None:
                continue

            chunk_start_time = result['start_time']
            utterances = result['utterances']

            # Map speakers from this chunk to global IDs
            chunk_speaker_map = {}
            for utterance in utterances:
                local_speaker = utterance.speaker
                # Utterance timestamps are chunk-relative milliseconds;
                # shift onto the global timeline in seconds
                global_start = chunk_start_time + (utterance.start / 1000)

                # Determine global speaker ID
                if local_speaker not in chunk_speaker_map:
                    # Check if we can match to previous chunk
                    global_id = self._match_speaker(
                        global_start,
                        merged_utterances[-5:] if merged_utterances else []
                    )
                    if global_id is None:
                        global_id = global_speaker_id
                        global_speaker_id += 1
                    chunk_speaker_map[local_speaker] = global_id

                # Adjust timestamp to global timeline
                adjusted_utterance = {
                    'speaker': chunk_speaker_map[local_speaker],
                    'text': utterance.text,
                    'start': global_start,
                    'end': chunk_start_time + (utterance.end / 1000),
                    'confidence': utterance.confidence
                }
                merged_utterances.append(adjusted_utterance)

        return merged_utterances

    def _match_speaker(self, utterance_start, recent_utterances):
        """Match speaker to previous chunk using temporal proximity"""
        if not recent_utterances:
            return None

        # Simple matching based on temporal proximity
        # In production, use voice embeddings for better matching
        time_threshold = 10.0  # seconds
        for recent in reversed(recent_utterances):
            time_gap = utterance_start - recent['end']
            if time_gap < time_threshold:
                return recent['speaker']
        return None

    def format_merged_transcript(self, utterances):
        """Format merged transcript for output"""
        output = []
        output.append("Complete Meeting Transcript")
        output.append("=" * 70)
        output.append("")

        for utt in utterances:
            timestamp = self._format_time(utt['start'])
            speaker = f"Speaker {utt['speaker']}"
            text = utt['text']
            output.append(f"[{timestamp}] {speaker}:")
            output.append(f"  {text}")
            output.append("")

        return "\n".join(output)

    def _format_time(self, seconds):
        """Format seconds to HH:MM:SS"""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

Complete Long Audio Processing System

Integrate all components:

import os
import shutil
from datetime import datetime

class LongAudioProcessor:

    def __init__(self, max_workers=5):
        self.chunker = IntelligentChunker(chunk_duration=600, overlap=5)
        self.pipeline = ParallelTranscriptionPipeline(max_workers=max_workers)
        self.merger = SpeakerAlignmentMerger()

    def process_long_audio(self, audio_file, output_file=None):
        """Complete pipeline for processing long audio files"""
        print(f"Starting processing: {audio_file}")
        start_time = datetime.now()

        # Step 1: Split into chunks
        print("\n[1/4] Splitting audio into chunks...")
        chunks = self.chunker.split_at_silence(audio_file)
        print(f"Created {len(chunks)} chunks")

        # Validate chunks
        self.chunker.validate_chunks(chunks)

        # Step 2: Process chunks in parallel
        print("\n[2/4] Processing chunks in parallel...")
        chunk_results = self.pipeline.process_chunks_parallel(chunks)
        successful_chunks = sum(1 for r in chunk_results.values() if r is not None)
        print(f"Successfully processed {successful_chunks}/{len(chunks)} chunks")

        # Step 3: Merge results
        print("\n[3/4] Merging transcripts...")
        merged_utterances = self.merger.merge_transcripts(chunk_results)
        print(f"Merged {len(merged_utterances)} utterances")

        # Step 4: Generate final output
        print("\n[4/4] Generating final transcript...")
        final_transcript = self.merger.format_merged_transcript(merged_utterances)

        # Save results
        if output_file is None:
            output_file = f"transcript_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(final_transcript)

        # Calculate statistics
        processing_time = (datetime.now() - start_time).total_seconds()
        total_audio_duration = sum(c['duration'] for c in chunks)

        print("\n" + "=" * 70)
        print("Processing Complete!")
        print("=" * 70)
        print(f"Output file: {output_file}")
        print(f"Total audio duration: {total_audio_duration/60:.2f} minutes")
        print(f"Processing time: {processing_time/60:.2f} minutes")
        print(f"Speed: {total_audio_duration/processing_time:.2f}x real-time")
        print(f"Chunks processed: {successful_chunks}/{len(chunks)}")
        print("=" * 70)

        return final_transcript

    def cleanup_chunks(self, chunk_dir="chunks"):
        """Remove temporary chunk files"""
        if os.path.exists(chunk_dir):
            shutil.rmtree(chunk_dir)
            print(f"Cleaned up temporary files in {chunk_dir}")

# Usage example
if __name__ == "__main__":
    processor = LongAudioProcessor(max_workers=5)
    try:
        # Process a 3-hour meeting
        transcript = processor.process_long_audio(
            "long_meeting_3hrs.wav",
            "complete_transcript.txt"
        )
        print("\nFirst 500 characters of transcript:")
        print(transcript[:500])
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Clean up temporary files
        processor.cleanup_chunks()

Optimization Strategies

Use adaptive chunk sizing—start with 10-minute chunks, reduce to 5 minutes if timeouts occur. Implement exponential backoff for retries: wait 5s, then 10s, then 20s before retrying failed chunks.
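That backoff schedule can be sketched as a small helper (the `retry_with_backoff` name is illustrative, not from any SDK):

```python
import time

def retry_with_backoff(fn, max_retries=3, base_delay=5):
    """Call fn(); on failure wait 5s, 10s, 20s (doubling) before retrying."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries; surface the error to the caller
            time.sleep(base_delay * (2 ** attempt))
```

Wrap any chunk-transcription call in it; on a timeout you could additionally halve the chunk duration before resubmitting the failed chunk.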

Monitor memory usage when processing multiple chunks. Release processed chunks from memory immediately after merging. Use file-based storage for intermediate results instead of keeping everything in RAM.

Cache successfully processed chunks locally. If the pipeline fails, resume from the last successful chunk instead of reprocessing everything.
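One way to sketch that resume behavior, assuming one JSON checkpoint file per chunk (the file layout and function names here are illustrative):

```python
import json
import os

def save_checkpoint(chunk_num, result, checkpoint_dir="checkpoints"):
    """Persist one chunk's serializable result so a crashed run can resume."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    path = os.path.join(checkpoint_dir, f"chunk_{chunk_num:04d}.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(result, f)

def load_checkpoints(checkpoint_dir="checkpoints"):
    """Return {chunk_num: result} for every chunk already completed."""
    results = {}
    if not os.path.isdir(checkpoint_dir):
        return results
    for name in sorted(os.listdir(checkpoint_dir)):
        if name.startswith("chunk_") and name.endswith(".json"):
            num = int(name[len("chunk_"):-len(".json")])
            with open(os.path.join(checkpoint_dir, name), encoding="utf-8") as f:
                results[num] = json.load(f)
    return results
```

Before submitting work to the thread pool, call `load_checkpoints()` and skip any chunk number already present; only the remaining chunks get reprocessed.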

Progress Tracking

Add progress monitoring for long-running jobs:

def process_with_progress(self, audio_file):
    """Process chunks sequentially with a progress bar (add to LongAudioProcessor)"""
    from tqdm import tqdm

    chunks = self.chunker.split_at_silence(audio_file)
    results = {}
    with tqdm(total=len(chunks), desc="Processing chunks") as pbar:
        for chunk in chunks:
            result = self.pipeline._transcribe_chunk(chunk)
            results[chunk['chunk_num']] = result
            pbar.update(1)
    return results

Your long audio processor now handles meetings of any length through intelligent chunking, parallel processing, and robust error handling; with five parallel workers, a three-hour meeting typically finishes in under 30 minutes.

Conclusion

Processing long meeting audio requires intelligent chunking at natural boundaries, parallel processing for speed, robust retry logic for reliability, and careful speaker alignment when merging results. Together, these techniques handle meetings of any duration without timeouts.

If you want production-ready long audio processing without building complex pipelines, consider Meetstream.ai API, which automatically handles audio of any length with optimized chunking and processing.
