Audio quality determines transcription accuracy. When building meeting bots, capturing clean, interference-free audio streams separates professional solutions from amateur ones. This guide explores proven techniques for capturing high-quality audio from meeting platforms, handling noise reduction, and implementing robust audio processing pipelines.
Understanding Audio Stream Challenges
Meeting bots face unique audio challenges: multiple speakers talking simultaneously, background noise, varying microphone quality, network packet loss, and audio compression artifacts. Your bot must handle these issues while maintaining synchronization between audio and metadata like speaker identification.
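One simple way to keep audio and metadata alignable is to timestamp every chunk the moment it leaves the capture callback. A minimal sketch; the dataclass and field names here are illustrative, not part of the pipeline built later in this guide:

import time
from dataclasses import dataclass

@dataclass
class TimestampedChunk:
    """Audio chunk tagged with capture time for later metadata alignment."""
    pcm: bytes
    captured_at: float  # epoch seconds when the chunk left the capture callback

def tag_chunk(pcm):
    return TimestampedChunk(pcm=pcm, captured_at=time.time())

With a timestamp on every chunk, speaker-identification events can be matched to audio by comparing clocks instead of guessing offsets.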
Audio Format Fundamentals
Start with the right audio configuration:
import pyaudio
import wave

class AudioConfig:
    """Optimal audio settings for speech recognition"""
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000
    CHUNK = 1024

    @staticmethod
    def get_optimal_config():
        """Return configuration dict for audio capture"""
        return {
            'format': AudioConfig.FORMAT,
            'channels': AudioConfig.CHANNELS,
            'rate': AudioConfig.RATE,
            'chunk': AudioConfig.CHUNK,
            'sample_width': pyaudio.get_sample_size(AudioConfig.FORMAT)
        }

Use a 16kHz sample rate for speech: it captures frequencies up to 8kHz, covering the entire human speech range while keeping file sizes manageable. Mono audio simplifies processing and reduces bandwidth without losing speech intelligibility.
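Meeting platforms often hand you 48kHz stereo rather than 16kHz mono, so you may need to downmix and resample before the rest of the pipeline. A minimal sketch using scipy's polyphase resampler; the 48kHz stereo defaults are assumptions, so match them to whatever your capture source actually reports:

from math import gcd
import numpy as np
from scipy.signal import resample_poly

def to_16k_mono(audio_bytes, src_rate=48000, src_channels=2):
    """Downmix interleaved 16-bit PCM to mono and resample to 16 kHz."""
    samples = np.frombuffer(audio_bytes, dtype=np.int16)
    if src_channels > 1:
        # Interleaved frames: reshape to (n_frames, channels) and average
        samples = samples.reshape(-1, src_channels).mean(axis=1)
    # Reduce the ratio by its gcd so the polyphase filter stays small
    g = gcd(16000, src_rate)
    resampled = resample_poly(samples.astype(np.float32), 16000 // g, src_rate // g)
    return np.clip(resampled, -32768, 32767).astype(np.int16).tobytes()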
Capturing Raw Audio Streams
Implement a robust audio capture system that handles interruptions gracefully:
import threading
import queue
import numpy as np

class AudioStreamCapture:
    def __init__(self, config=None):
        self.config = config or AudioConfig.get_optimal_config()
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.audio_queue = queue.Queue()
        self.is_capturing = False
        self.capture_thread = None
        self.recover_attempts = 0

    def start_capture(self, device_index=None):
        self.is_capturing = True
        try:
            self.stream = self.audio.open(
                format=self.config['format'],
                channels=self.config['channels'],
                rate=self.config['rate'],
                input=True,
                input_device_index=device_index,
                frames_per_buffer=self.config['chunk'],
                stream_callback=self._audio_callback
            )
            self.stream.start_stream()
            self.recover_attempts = 0
            print("Audio capture started successfully")
        except Exception as e:
            print(f"Error starting audio capture: {e}")
            self._recover_stream()

    def _audio_callback(self, in_data, frame_count, time_info, status):
        if status:
            print(f"Audio status warning: {status}")
        if self.is_capturing:
            self.audio_queue.put(in_data)
        return (in_data, pyaudio.paContinue)

    def _recover_stream(self, max_retries=3):
        # Cap retries so a permanently failing device cannot recurse forever
        if self.recover_attempts >= max_retries:
            print("Audio stream recovery failed; giving up")
            self.is_capturing = False
            return
        self.recover_attempts += 1
        print("Attempting to recover audio stream...")
        if self.stream:
            self.stream.close()
        import time
        time.sleep(1)
        self.start_capture()

    def get_audio_chunk(self, timeout=1.0):
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def stop_capture(self):
        self.is_capturing = False
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
        self.audio.terminate()
        print("Audio capture stopped")

Implementing Noise Reduction
Clean audio requires aggressive noise reduction. Use spectral subtraction and bandpass filtering:
from scipy import signal
import numpy as np

class AudioNoiseReducer:
    def __init__(self, sample_rate=16000):
        self.sample_rate = sample_rate
        self.noise_profile = None

    def create_noise_profile(self, audio_data):
        """Estimate the average noise spectrum from a noise-only sample."""
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        audio_float = audio_array.astype(np.float32)
        f, t, stft = signal.stft(audio_float, fs=self.sample_rate, nperseg=512)
        self.noise_profile = np.abs(stft).mean(axis=1)
        print("Noise profile created")

    def reduce_noise(self, audio_data):
        """Spectral subtraction: subtract the noise spectrum, keep the phase."""
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        audio_float = audio_array.astype(np.float32)
        f, t, stft = signal.stft(audio_float, fs=self.sample_rate, nperseg=512)
        magnitude = np.abs(stft)
        phase = np.angle(stft)
        if self.noise_profile is not None:
            clean_magnitude = magnitude - self.noise_profile[:, np.newaxis]
            clean_magnitude = np.maximum(clean_magnitude, 0)
        else:
            clean_magnitude = magnitude
        clean_stft = clean_magnitude * np.exp(1j * phase)
        _, clean_audio = signal.istft(clean_stft, fs=self.sample_rate, nperseg=512)
        clean_audio = np.clip(clean_audio, -32768, 32767)
        return clean_audio.astype(np.int16).tobytes()

    def apply_bandpass_filter(self, audio_data, lowcut=80, highcut=7900):
        """Keep the speech band: cut rumble below 80 Hz and hiss above it.
        highcut must stay strictly below the 8 kHz Nyquist limit at 16 kHz
        sampling, or signal.butter rejects the cutoff."""
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        audio_float = audio_array.astype(np.float32)
        nyquist = self.sample_rate / 2
        low = lowcut / nyquist
        high = highcut / nyquist
        b, a = signal.butter(4, [low, high], btype='band')
        filtered = signal.filtfilt(b, a, audio_float)
        return np.clip(filtered, -32768, 32767).astype(np.int16).tobytes()

Voice Activity Detection (VAD)
Save bandwidth and improve processing by detecting speech segments:
import webrtcvad

class VoiceActivityDetector:
    def __init__(self, aggressiveness=3):
        self.vad = webrtcvad.Vad(aggressiveness)
        self.sample_rate = 16000
        self.frame_duration = 30  # ms; webrtcvad accepts only 10, 20, or 30 ms frames

    def is_speech(self, audio_chunk):
        """audio_chunk must be exactly one 10/20/30 ms frame of 16-bit PCM."""
        try:
            return self.vad.is_speech(audio_chunk, self.sample_rate)
        except Exception as e:
            print(f"VAD error: {e}")
            return False

    def get_speech_segments(self, audio_data):
        # 2 bytes per sample for 16-bit mono PCM
        frame_size = int(self.sample_rate * self.frame_duration / 1000) * 2
        speech_frames = []
        for i in range(0, len(audio_data), frame_size):
            frame = audio_data[i:i + frame_size]
            if len(frame) < frame_size:
                continue
            if self.is_speech(frame):
                speech_frames.append(frame)
        return b''.join(speech_frames)

Audio Normalization and Enhancement
Normalize volume levels across different speakers:
from pydub import AudioSegment
from pydub.effects import compress_dynamic_range

class AudioEnhancer:
    @staticmethod
    def normalize_audio(audio_data, target_dBFS=-20.0):
        audio = AudioSegment(
            data=audio_data, sample_width=2,
            frame_rate=16000, channels=1
        )
        change_in_dBFS = target_dBFS - audio.dBFS
        normalized = audio.apply_gain(change_in_dBFS)
        return normalized.raw_data

    @staticmethod
    def apply_compression(audio_data, threshold=-20.0, ratio=4.0):
        audio = AudioSegment(
            data=audio_data, sample_width=2,
            frame_rate=16000, channels=1
        )
        compressed = compress_dynamic_range(
            audio, threshold=threshold, ratio=ratio
        )
        return compressed.raw_data

    @staticmethod
    def enhance_speech(audio_data):
        normalized = AudioEnhancer.normalize_audio(audio_data)
        compressed = AudioEnhancer.apply_compression(normalized)
        return compressed

Building a Complete Audio Pipeline
Integrate all components into a production-ready pipeline:
class CleanAudioPipeline:
    def __init__(self):
        self.capture = AudioStreamCapture()
        self.noise_reducer = AudioNoiseReducer()
        self.vad = VoiceActivityDetector()
        self.enhancer = AudioEnhancer()
        self.processed_chunks = []

    def initialize(self, noise_sample_duration=2.0):
        print("Initializing audio pipeline...")
        self.capture.start_capture()
        print(f"Collecting {noise_sample_duration}s noise sample...")
        noise_data = b''
        import time
        end_time = time.time() + noise_sample_duration
        while time.time() < end_time:
            chunk = self.capture.get_audio_chunk(timeout=1.0)
            if chunk:
                noise_data += chunk
        self.noise_reducer.create_noise_profile(noise_data)
        print("Pipeline initialized")

    def process_stream(self, duration=None):
        import time
        start_time = time.time()
        while self.capture.is_capturing:
            if duration and time.time() - start_time > duration:
                break
            chunk = self.capture.get_audio_chunk(timeout=1.0)
            if not chunk:
                continue
            denoised = self.noise_reducer.reduce_noise(chunk)
            filtered = self.noise_reducer.apply_bandpass_filter(denoised)
            # Slice the chunk into the 30 ms frames webrtcvad requires,
            # keeping only the frames that contain speech
            speech = self.vad.get_speech_segments(filtered)
            if speech:
                enhanced = self.enhancer.enhance_speech(speech)
                self.processed_chunks.append(enhanced)

    def save_clean_audio(self, filename="clean_audio.wav"):
        if not self.processed_chunks:
            print("No processed audio to save")
            return
        combined_audio = b''.join(self.processed_chunks)
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(16000)
            wf.writeframes(combined_audio)
        print(f"Clean audio saved: {filename}")

    def stop(self):
        self.capture.stop_capture()
        print("Pipeline stopped")

if __name__ == "__main__":
    pipeline = CleanAudioPipeline()
    try:
        pipeline.initialize(noise_sample_duration=2.0)
        pipeline.process_stream(duration=60)
        pipeline.save_clean_audio("meeting_clean.wav")
    except KeyboardInterrupt:
        print("Stopping...")
    finally:
        pipeline.stop()

Platform-Specific Optimizations
Different platforms require different approaches. For Zoom, capture virtual audio devices. For Google Meet, intercept WebRTC audio tracks. For Teams, use the Bot Framework media platform. Each platform has specific latency characteristics and audio encoding formats that affect your pipeline configuration.
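As one concrete example of the Zoom approach, routing the meeting output through a virtual audio device (BlackHole on macOS, VB-Cable on Windows) lets PyAudio capture it like any microphone. A sketch of locating such a device by name; the name hints are assumptions and vary by install:

import pyaudio

def find_virtual_device(pa, name_hints=("blackhole", "vb-audio", "cable")):
    """Return the index of the first input device whose name matches a hint."""
    for i in range(pa.get_device_count()):
        info = pa.get_device_info_by_index(i)
        name = info.get('name', '').lower()
        if info.get('maxInputChannels', 0) > 0 and any(h in name for h in name_hints):
            return i
    return None

# Usage: pass the index into AudioStreamCapture.start_capture(device_index=...)
pa = pyaudio.PyAudio()
device_index = find_virtual_device(pa)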
Monitoring Audio Quality
Implement real-time quality monitoring:
def calculate_snr(audio_data, noise_profile):
    """Rough SNR estimate: chunk power relative to the stored noise power."""
    signal_power = np.mean(np.abs(audio_data) ** 2)
    noise_power = np.mean(np.abs(noise_profile) ** 2)
    if noise_power > 0:
        snr = 10 * np.log10(signal_power / noise_power)
        return snr
    return float('inf')

Monitor SNR in real time. Values above 20dB indicate excellent quality; below 10dB, you need aggressive noise reduction or source-level fixes.
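Wired into the capture loop, monitoring might look like the following sketch. The thresholds come from the guidance above; the print is a placeholder for whatever alerting you use, and it assumes a noise profile was already created during initialization:

import numpy as np

def monitor_chunk(chunk_bytes, noise_reducer):
    """Compute per-chunk SNR against the pipeline's stored noise profile."""
    samples = np.frombuffer(chunk_bytes, dtype=np.int16).astype(np.float32)
    snr = calculate_snr(samples, noise_reducer.noise_profile)
    if snr < 10:
        print(f"Low SNR ({snr:.1f} dB): consider stronger noise reduction")
    return snr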
Performance Optimization
Process audio in separate threads to prevent blocking. Use circular buffers for efficient memory management. Batch small chunks before processing to reduce overhead. Profile your pipeline to identify bottlenecksânoise reduction and FFT operations typically consume the most CPU time.
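A minimal sketch of the batching idea, using a bounded deque so memory stays flat even if processing falls behind; the batch size of 10 chunks and the 500-chunk cap are arbitrary assumptions to tune for your workload:

from collections import deque

class BatchingBuffer:
    """Accumulate small chunks and hand them off in larger batches."""
    def __init__(self, batch_size=10, max_chunks=500):
        self.batch_size = batch_size
        # Bounded deque: the oldest chunks drop first under backpressure
        self.chunks = deque(maxlen=max_chunks)

    def add(self, chunk):
        self.chunks.append(chunk)

    def pop_batch(self):
        """Return one joined batch, or None if not enough chunks yet."""
        if len(self.chunks) < self.batch_size:
            return None
        return b''.join(self.chunks.popleft() for _ in range(self.batch_size))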
Your audio pipeline now captures clean, professional-quality audio streams suitable for accurate transcription. The combination of noise reduction, VAD, and normalization ensures consistent quality regardless of meeting conditions.
Conclusion
Capturing clean audio streams requires careful attention to format selection, noise reduction, voice activity detection, and volume normalization. The pipeline approach shown here processes audio in stages, each improving quality before the next stage. Start with the audio configuration, add noise reduction, implement VAD to filter silence, and normalize the output. This produces transcription-ready audio that maximizes accuracy across all meeting platforms. For teams building meeting bots at scale, MeetStream’s API handles audio capture and processing automatically, letting you focus on building features rather than audio engineering infrastructure.