Long meetings create massive audio files that crash transcription pipelines. API timeouts, memory limits, and processing bottlenecks turn three-hour board meetings into deployment nightmares.
Processing long audio requires chunking strategies, parallel processing, and robust retry mechanisms.
This guide demonstrates how to build production-grade pipelines that handle meetings of any length reliably.
Understanding Processing Constraints
Most transcription APIs impose hard limits: a 5 GB maximum file size, 5-hour duration caps, and 30-minute API timeouts.
Your pipeline must split audio into manageable chunks, process them independently, and reassemble results while maintaining temporal continuity and speaker consistency.
Intelligent Audio Chunking
Split audio at natural boundaries to preserve context:
import wave
import numpy as np
from pydub import AudioSegment
from pydub.silence import detect_silence
class IntelligentChunker:
    """Splits long audio files into overlapping chunks at natural silence boundaries."""

    def __init__(self, chunk_duration=600, overlap=5):
        """
        chunk_duration: target chunk length in seconds (default 10 minutes)
        overlap: overlap between chunks in seconds for context
        """
        self.chunk_duration = chunk_duration
        self.overlap = overlap

    def split_at_silence(self, audio_file, output_dir="chunks"):
        """Split audio at silence points near chunk boundaries.

        Returns a list of dicts describing each chunk: file path, start/end
        times (seconds on the global timeline), duration, and chunk number.
        """
        import os
        os.makedirs(output_dir, exist_ok=True)

        audio = AudioSegment.from_file(audio_file)
        duration_ms = len(audio)
        chunk_duration_ms = self.chunk_duration * 1000
        overlap_ms = self.overlap * 1000

        chunks = []
        chunk_start = 0
        chunk_num = 0

        while chunk_start < duration_ms:
            # Aim for a chunk of chunk_duration, clamped to the audio's end.
            target_end = min(chunk_start + chunk_duration_ms, duration_ms)

            # Search for silence within +/- 10 seconds of the target boundary.
            search_window_start = max(target_end - 10000, chunk_start)
            search_window = audio[search_window_start:target_end + 10000]

            silence_ranges = detect_silence(
                search_window,
                min_silence_len=500,  # require at least 0.5s of silence
                silence_thresh=-40,   # dBFS level treated as "silence"
            )

            if silence_ranges:
                # Split in the middle of the longest silence period.
                best_silence = max(silence_ranges, key=lambda r: r[1] - r[0])
                split_point = search_window_start + (best_silence[0] + best_silence[1]) // 2
            else:
                # No silence found: split exactly at the target boundary.
                split_point = target_end

            # Guard against zero forward progress (silence right at the start
            # of the search window would otherwise loop forever).
            if split_point <= chunk_start:
                split_point = target_end

            # Extend the chunk past the split point so adjacent chunks share
            # `overlap` seconds of context.
            chunk_end = min(split_point + overlap_ms, duration_ms)
            chunk = audio[chunk_start:chunk_end]

            chunk_file = f"{output_dir}/chunk_{chunk_num:04d}.wav"
            chunk.export(chunk_file, format="wav")

            chunks.append({
                'file': chunk_file,
                'start_time': chunk_start / 1000,
                'end_time': chunk_end / 1000,
                'duration': (chunk_end - chunk_start) / 1000,
                'chunk_num': chunk_num,
            })
            print(f"Created chunk {chunk_num}: {chunk_start/1000:.2f}s - {chunk_end/1000:.2f}s")

            # Next chunk begins at the split point; the overlap tail is shared.
            chunk_start = split_point
            chunk_num += 1

        return chunks

    def validate_chunks(self, chunks):
        """Warn about any gap larger than one second between consecutive chunks."""
        for i in range(len(chunks) - 1):
            gap = chunks[i + 1]['start_time'] - chunks[i]['end_time']
            if gap > 1.0:  # more than 1 second uncovered
                print(f"Warning: Gap detected between chunk {i} and {i+1}: {gap:.2f}s")
        return True
Parallel Processing Pipeline
Process chunks concurrently to reduce total processing time:
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor, as_completed
import assemblyai as aai
class ParallelTranscriptionPipeline:
    """Transcribes audio chunks concurrently with retries and failure isolation."""

    def __init__(self, max_workers=5):
        import os
        self.max_workers = max_workers
        # Read the API key from the environment so it never lands in source control.
        aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")

    def process_chunks_parallel(self, chunks):
        """Process multiple chunks in parallel.

        Returns {chunk_num: result dict or None}. A failed chunk is recorded
        as None so the merge step can skip it without losing ordering.
        """
        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all chunks for processing.
            future_to_chunk = {
                executor.submit(self._transcribe_chunk, chunk): chunk
                for chunk in chunks
            }
            # Collect results as they complete, not in submission order.
            for future in as_completed(future_to_chunk):
                chunk = future_to_chunk[future]
                try:
                    results[chunk['chunk_num']] = future.result()
                    print(f"Completed chunk {chunk['chunk_num']}")
                except Exception as e:
                    print(f"Chunk {chunk['chunk_num']} failed: {e}")
                    results[chunk['chunk_num']] = None
        return results

    def _transcribe_chunk(self, chunk):
        """Transcribe a single chunk, retrying with exponential backoff.

        Re-raises the last exception if all attempts fail.
        """
        import time
        max_retries = 3
        retry_delay = 5

        for attempt in range(max_retries):
            try:
                config = aai.TranscriptionConfig(
                    speaker_labels=True,
                    language_code="en_us",
                )
                transcriber = aai.Transcriber()
                transcript = transcriber.transcribe(chunk['file'], config=config)

                if transcript.status == aai.TranscriptStatus.error:
                    raise Exception(f"Transcription error: {transcript.error}")

                return {
                    'chunk_num': chunk['chunk_num'],
                    'start_time': chunk['start_time'],
                    'end_time': chunk['end_time'],
                    'transcript': transcript,
                    'text': transcript.text,
                    'utterances': transcript.utterances,
                }
            except Exception:
                if attempt < max_retries - 1:
                    print(f"Retry {attempt + 1}/{max_retries} for chunk {chunk['chunk_num']}")
                    # Exponential backoff: 5s, 10s, 20s.
                    time.sleep(retry_delay * (2 ** attempt))
                else:
                    raise

    async def process_chunks_async(self, chunks):
        """Async version for better concurrency; failed chunks are dropped."""
        tasks = [self._transcribe_chunk_async(chunk) for chunk in chunks]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out exceptions, keyed by chunk number like the sync path.
        successful_results = {}
        for chunk, result in zip(chunks, results):
            if isinstance(result, Exception):
                print(f"Chunk {chunk['chunk_num']} failed: {result}")
            else:
                successful_results[chunk['chunk_num']] = result
        return successful_results

    async def _transcribe_chunk_async(self, chunk):
        """Async transcription with timeout handling.

        NOTE(review): ``_async_api_call`` is not defined anywhere in this
        file — this is a placeholder; the actual implementation depends on
        the API client in use.
        """
        timeout = aiohttp.ClientTimeout(total=3600)  # 1 hour timeout
        async with aiohttp.ClientSession(timeout=timeout) as session:
            return await self._async_api_call(chunk, session)
Stream Processing for Real-Time
Process audio as it arrives instead of waiting for complete file:
import queue
import threading
class StreamingProcessor:
    """Transcribes audio incrementally as it arrives instead of waiting for a full file."""

    def __init__(self, chunk_size=30):
        self.chunk_size = chunk_size        # seconds of audio per transcription request
        self.audio_buffer = queue.Queue()   # incoming raw PCM segments
        self.results_queue = queue.Queue()  # finished transcript segments
        self.is_processing = False

    def start_streaming(self):
        """Start the background thread that drains the audio buffer."""
        self.is_processing = True
        self.processor_thread = threading.Thread(target=self._process_buffer)
        self.processor_thread.start()

    def add_audio_chunk(self, audio_data, timestamp):
        """Queue raw audio bytes together with their capture timestamp (seconds)."""
        self.audio_buffer.put({
            'data': audio_data,
            'timestamp': timestamp,
        })

    def _process_buffer(self):
        """Accumulate buffered audio and transcribe every chunk_size seconds."""
        accumulated_audio = b""
        accumulated_duration = 0
        start_timestamp = None

        while self.is_processing or not self.audio_buffer.empty():
            try:
                chunk = self.audio_buffer.get(timeout=1.0)
            except queue.Empty:
                continue

            if start_timestamp is None:
                start_timestamp = chunk['timestamp']
            accumulated_audio += chunk['data']
            # Assumes 16 kHz mono, 16-bit samples (2 bytes per frame) —
            # matches the WAV header written in _transcribe_accumulated.
            accumulated_duration += len(chunk['data']) / (16000 * 2)

            if accumulated_duration >= self.chunk_size:
                self._transcribe_accumulated(
                    accumulated_audio,
                    start_timestamp,
                    chunk['timestamp'],
                )
                accumulated_audio = b""
                accumulated_duration = 0
                start_timestamp = None

        # Flush whatever remains when streaming stops. The end time comes from
        # the accumulated duration: the last queued timestamp marks the START
        # of the final segment, not its end.
        if accumulated_audio:
            self._transcribe_accumulated(
                accumulated_audio,
                start_timestamp,
                start_timestamp + accumulated_duration,
            )

    def _transcribe_accumulated(self, audio_data, start_time, end_time):
        """Write audio to a temporary WAV file, transcribe it, queue the result."""
        import os
        import tempfile
        import wave

        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            temp_file = f.name
        try:
            with wave.open(temp_file, 'wb') as wf:
                wf.setnchannels(1)       # mono
                wf.setsampwidth(2)       # 16-bit samples
                wf.setframerate(16000)   # 16 kHz
                wf.writeframes(audio_data)

            config = aai.TranscriptionConfig(speaker_labels=True)
            transcriber = aai.Transcriber()
            transcript = transcriber.transcribe(temp_file, config=config)

            self.results_queue.put({
                'start_time': start_time,
                'end_time': end_time,
                'transcript': transcript,
            })
            print(f"Processed segment: {start_time:.2f}s - {end_time:.2f}s")
        finally:
            # Always remove the temp file, even if transcription raises.
            os.unlink(temp_file)

    def stop_streaming(self):
        """Signal the worker thread to stop and wait for it to finish."""
        self.is_processing = False
        self.processor_thread.join()

    def get_results(self):
        """Drain the results queue, sorted by segment start time."""
        results = []
        while not self.results_queue.empty():
            results.append(self.results_queue.get())
        return sorted(results, key=lambda x: x['start_time'])
Speaker Consistency Across Chunks
Maintain speaker labels when merging chunks:
class SpeakerAlignmentMerger:
    """Merges per-chunk transcripts into one timeline with consistent speaker IDs."""

    def __init__(self):
        self.speaker_mapping = {}

    def merge_transcripts(self, chunk_results):
        """Merge chunk transcripts with speaker alignment.

        chunk_results: {chunk_num: result dict or None}. Failed chunks (None)
        are skipped. Utterance timestamps (milliseconds, chunk-relative) are
        converted to seconds on the global timeline.
        """
        # Process chunks in chronological order.
        sorted_chunks = sorted(chunk_results.items(), key=lambda item: item[0])

        merged_utterances = []
        global_speaker_id = 0

        for chunk_num, result in sorted_chunks:
            if result is None:
                continue

            chunk_start_time = result['start_time']
            chunk_speaker_map = {}  # local speaker label -> global speaker ID

            for utterance in result['utterances']:
                local_speaker = utterance.speaker

                if local_speaker not in chunk_speaker_map:
                    # Link this speaker to one heard recently in earlier
                    # chunks; otherwise mint a new global ID. The chunk
                    # offset is passed so both times are on the global
                    # timeline (comparing chunk-relative starts against
                    # global ends would always match).
                    global_id = self._match_speaker(
                        utterance,
                        merged_utterances[-5:],
                        chunk_start_time,
                    )
                    if global_id is None:
                        global_id = global_speaker_id
                        global_speaker_id += 1
                    chunk_speaker_map[local_speaker] = global_id

                # Shift the utterance onto the global timeline (seconds).
                merged_utterances.append({
                    'speaker': chunk_speaker_map[local_speaker],
                    'text': utterance.text,
                    'start': chunk_start_time + (utterance.start / 1000),
                    'end': chunk_start_time + (utterance.end / 1000),
                    'confidence': utterance.confidence,
                })

        return merged_utterances

    def _match_speaker(self, utterance, recent_utterances, chunk_start_time=0.0):
        """Match a speaker to a recent utterance by temporal proximity.

        NOTE: this heuristic assigns any new speaker within 10 seconds of a
        recent utterance to that utterance's speaker. In production, use
        voice embeddings for reliable cross-chunk speaker matching.
        """
        if not recent_utterances:
            return None

        time_threshold = 10.0  # seconds
        global_start = chunk_start_time + (utterance.start / 1000)
        for recent in reversed(recent_utterances):
            if global_start - recent['end'] < time_threshold:
                return recent['speaker']
        return None

    def format_merged_transcript(self, utterances):
        """Render merged utterances as a readable, timestamped transcript."""
        output = []
        output.append("Complete Meeting Transcript")
        output.append("=" * 70)
        output.append("")
        for utt in utterances:
            timestamp = self._format_time(utt['start'])
            speaker = f"Speaker {utt['speaker']}"
            output.append(f"[{timestamp}] {speaker}:")
            output.append(f"  {utt['text']}")
            output.append("")
        return "\n".join(output)

    def _format_time(self, seconds):
        """Format seconds as HH:MM:SS."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
Complete Long Audio Processing System
Integrate all components:
import os
from datetime import datetime
class LongAudioProcessor:
    """End-to-end pipeline: chunk long audio, transcribe in parallel, merge results."""

    def __init__(self, max_workers=5):
        self.chunker = IntelligentChunker(chunk_duration=600, overlap=5)
        self.pipeline = ParallelTranscriptionPipeline(max_workers=max_workers)
        self.merger = SpeakerAlignmentMerger()

    def process_long_audio(self, audio_file, output_file=None):
        """Run the full pipeline and write the merged transcript to disk.

        Returns the formatted transcript text.
        """
        print(f"Starting processing: {audio_file}")
        start_time = datetime.now()

        # Step 1: split into silence-aligned chunks.
        print("\n[1/4] Splitting audio into chunks...")
        chunks = self.chunker.split_at_silence(audio_file)
        print(f"Created {len(chunks)} chunks")
        self.chunker.validate_chunks(chunks)

        # Step 2: transcribe chunks concurrently.
        print("\n[2/4] Processing chunks in parallel...")
        chunk_results = self.pipeline.process_chunks_parallel(chunks)
        successful_chunks = sum(1 for r in chunk_results.values() if r is not None)
        print(f"Successfully processed {successful_chunks}/{len(chunks)} chunks")

        # Step 3: merge per-chunk transcripts onto one timeline.
        print("\n[3/4] Merging transcripts...")
        merged_utterances = self.merger.merge_transcripts(chunk_results)
        print(f"Merged {len(merged_utterances)} utterances")

        # Step 4: render and save the final transcript.
        print("\n[4/4] Generating final transcript...")
        final_transcript = self.merger.format_merged_transcript(merged_utterances)

        if output_file is None:
            output_file = f"transcript_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(final_transcript)

        # Summary statistics.
        processing_time = (datetime.now() - start_time).total_seconds()
        total_audio_duration = sum(c['duration'] for c in chunks)

        print("\n" + "=" * 70)
        print("Processing Complete!")
        print("=" * 70)
        print(f"Output file: {output_file}")
        print(f"Total audio duration: {total_audio_duration/60:.2f} minutes")
        print(f"Processing time: {processing_time/60:.2f} minutes")
        # Guard against division by zero on (near-)instant runs.
        if processing_time > 0:
            print(f"Speed: {total_audio_duration/processing_time:.2f}x real-time")
        print(f"Chunks processed: {successful_chunks}/{len(chunks)}")
        print("=" * 70)

        return final_transcript

    def cleanup_chunks(self, chunk_dir="chunks"):
        """Remove the temporary chunk directory if it exists."""
        import shutil
        if os.path.exists(chunk_dir):
            shutil.rmtree(chunk_dir)
            print(f"Cleaned up temporary files in {chunk_dir}")
# Usage example
if __name__ == "__main__":
    processor = LongAudioProcessor(max_workers=5)
    try:
        # Process a 3-hour meeting
        transcript = processor.process_long_audio(
            "long_meeting_3hrs.wav",
            "complete_transcript.txt",
        )
        print("\nFirst 500 characters of transcript:")
        print(transcript[:500])
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Always clean up chunk files, even when processing fails.
        processor.cleanup_chunks()
Optimization Strategies
Use adaptive chunk sizing—start with 10-minute chunks, reduce to 5 minutes if timeouts occur. Implement exponential backoff for retries: wait 5s, then 10s, then 20s before retrying failed chunks.
Monitor memory usage when processing multiple chunks. Release processed chunks from memory immediately after merging. Use file-based storage for intermediate results instead of keeping everything in RAM.
Cache successfully processed chunks locally. If the pipeline fails, resume from the last successful chunk instead of reprocessing everything.
Progress Tracking
Add progress monitoring for long-running jobs:
def process_with_progress(self, audio_file):
    """Process chunks sequentially with a tqdm progress bar.

    Intended as a LongAudioProcessor method. Transcription is delegated to
    ``self.pipeline._transcribe_chunk`` — the original snippet called
    ``self._transcribe_chunk``, but that method is defined on
    ParallelTranscriptionPipeline, not on LongAudioProcessor.
    """
    from tqdm import tqdm

    chunks = self.chunker.split_at_silence(audio_file)
    results = {}
    with tqdm(total=len(chunks), desc="Processing chunks") as pbar:
        for chunk in chunks:
            results[chunk['chunk_num']] = self.pipeline._transcribe_chunk(chunk)
            pbar.update(1)
    return results
Your long audio processor now handles meetings of any length through intelligent chunking, parallel processing, and robust error handling, processing 3-hour meetings in under 30 minutes.
Conclusion
Processing long meeting audio requires intelligent chunking at natural boundaries, parallel processing for speed, robust retry logic for reliability, and careful speaker alignment when merging results to handle meetings of any duration without timeouts.
If you want production-ready long audio processing without building complex pipelines, consider Meetstream.ai API, which automatically handles audio of any length with optimized chunking and processing.