How to Capture Clean Audio Streams from Meeting Bots

Audio quality determines transcription accuracy. When building meeting bots, capturing clean, interference-free audio streams separates professional solutions from amateur ones. This guide explores proven techniques for capturing high-quality audio from meeting platforms, handling noise reduction, and implementing robust audio processing pipelines.

Understanding Audio Stream Challenges

Meeting bots face unique audio challenges: multiple speakers talking simultaneously, background noise, varying microphone quality, network packet loss, and audio compression artifacts. Your bot must handle these issues while maintaining synchronization between audio and metadata like speaker identification.

Audio Format Fundamentals

Start with the right audio configuration:

import pyaudio
import wave

class AudioConfig:
    """Optimal audio settings for speech recognition"""
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000
    CHUNK = 1024

    @staticmethod
    def get_optimal_config():
        """Return configuration dict for audio capture"""
        return {
            'format': AudioConfig.FORMAT,
            'channels': AudioConfig.CHANNELS,
            'rate': AudioConfig.RATE,
            'chunk': AudioConfig.CHUNK,
            'sample_width': pyaudio.get_sample_size(AudioConfig.FORMAT)
        }

Use a 16 kHz sample rate for speech—it captures frequencies up to 8 kHz, which covers nearly all of the energy in human speech while keeping data sizes manageable. Mono audio simplifies processing and reduces bandwidth without hurting speech intelligibility.
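To see what this configuration costs in bandwidth, a quick back-of-the-envelope check (the helper name is illustrative):

```python
def pcm_data_rate(rate_hz, channels, sample_width_bytes=2):
    """Raw PCM throughput in bytes per second for a capture configuration."""
    return rate_hz * channels * sample_width_bytes

speech_rate = pcm_data_rate(16000, 1)   # 16 kHz mono, 16-bit -> 32,000 B/s
cd_rate = pcm_data_rate(44100, 2)       # 44.1 kHz stereo, 16-bit -> 176,400 B/s
print(speech_rate, cd_rate)
```

Roughly 32 KB/s for speech-optimized mono versus about 176 KB/s for CD-quality stereo—more than a fivefold saving with no practical loss for transcription.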

Capturing Raw Audio Streams

Implement a robust audio capture system that handles interruptions gracefully:

import threading
import queue
import numpy as np

class AudioStreamCapture:
    def __init__(self, config=None):
        self.config = config or AudioConfig.get_optimal_config()
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.audio_queue = queue.Queue()
        self.is_capturing = False
        self.capture_thread = None

    def start_capture(self, device_index=None):
        self.is_capturing = True
        try:
            self.stream = self.audio.open(
                format=self.config['format'],
                channels=self.config['channels'],
                rate=self.config['rate'],
                input=True,
                input_device_index=device_index,
                frames_per_buffer=self.config['chunk'],
                stream_callback=self._audio_callback
            )
            self.stream.start_stream()
            print("Audio capture started successfully")
        except Exception as e:
            print(f"Error starting audio capture: {e}")
            self._recover_stream()

    def _audio_callback(self, in_data, frame_count, time_info, status):
        if status:
            print(f"Audio status warning: {status}")
        if self.is_capturing:
            self.audio_queue.put(in_data)
        return (in_data, pyaudio.paContinue)

    def _recover_stream(self, max_retries=3):
        """Retry the stream, capping attempts so a persistent failure
        cannot recurse forever between here and start_capture()."""
        attempts = getattr(self, '_recovery_attempts', 0)
        if attempts >= max_retries:
            print("Audio stream recovery failed; giving up")
            self.is_capturing = False
            return
        self._recovery_attempts = attempts + 1
        print(f"Attempting to recover audio stream "
              f"({self._recovery_attempts}/{max_retries})...")
        if self.stream:
            self.stream.close()
            self.stream = None
        import time
        time.sleep(1)
        self.start_capture()

    def get_audio_chunk(self, timeout=1.0):
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def stop_capture(self):
        self.is_capturing = False
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
        self.audio.terminate()
        print("Audio capture stopped")

Implementing Noise Reduction

Clean audio requires aggressive noise reduction. Use spectral subtraction and bandpass filtering:

from scipy import signal
import numpy as np

class AudioNoiseReducer:
    def __init__(self, sample_rate=16000):
        self.sample_rate = sample_rate
        self.noise_profile = None

    def create_noise_profile(self, audio_data):
        """Estimate the noise magnitude spectrum from a speech-free sample."""
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        audio_float = audio_array.astype(np.float32)
        f, t, stft = signal.stft(audio_float, fs=self.sample_rate, nperseg=512)
        self.noise_profile = np.abs(stft).mean(axis=1)
        print("Noise profile created")

    def reduce_noise(self, audio_data):
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        audio_float = audio_array.astype(np.float32)
        f, t, stft = signal.stft(audio_float, fs=self.sample_rate, nperseg=512)
        magnitude = np.abs(stft)
        phase = np.angle(stft)
        if self.noise_profile is not None:
            clean_magnitude = magnitude - self.noise_profile[:, np.newaxis]
            clean_magnitude = np.maximum(clean_magnitude, 0)
        else:
            clean_magnitude = magnitude
        clean_stft = clean_magnitude * np.exp(1j * phase)
        _, clean_audio = signal.istft(clean_stft, fs=self.sample_rate, nperseg=512)
        clean_audio = np.clip(clean_audio, -32768, 32767)
        return clean_audio.astype(np.int16).tobytes()

    def apply_bandpass_filter(self, audio_data, lowcut=80, highcut=7500):
        # The upper cutoff must stay strictly below the Nyquist frequency
        # (8000 Hz at a 16 kHz sample rate): scipy's butter() rejects
        # normalized frequencies of 1.0 or above.
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        audio_float = audio_array.astype(np.float32)
        nyquist = self.sample_rate / 2
        low = lowcut / nyquist
        high = highcut / nyquist
        b, a = signal.butter(4, [low, high], btype='band')
        filtered = signal.filtfilt(b, a, audio_float)
        filtered = np.clip(filtered, -32768, 32767)
        return filtered.astype(np.int16).tobytes()
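A quick sanity check of the band-pass design on synthetic tones—an in-band 1 kHz tone should pass nearly untouched while 50 Hz mains hum is sharply attenuated (cutoffs of 80 Hz and 7.5 kHz are used here, keeping the upper edge below Nyquist as scipy requires):

```python
import numpy as np
from scipy import signal

fs = 16000
t = np.arange(fs) / fs                       # one second of samples
speech_tone = np.sin(2 * np.pi * 1000 * t)   # in-band: 1 kHz
hum = np.sin(2 * np.pi * 50 * t)             # out-of-band: 50 Hz mains hum

b, a = signal.butter(4, [80 / 8000, 7500 / 8000], btype='band')
speech_out = signal.filtfilt(b, a, speech_tone)
hum_out = signal.filtfilt(b, a, hum)

# Energy ratios: in-band is preserved (~1.0); the hum nearly vanishes
print(np.mean(speech_out**2) / np.mean(speech_tone**2))
print(np.mean(hum_out**2) / np.mean(hum**2))
```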

Voice Activity Detection (VAD)

Save bandwidth and improve processing by detecting speech segments:

import webrtcvad

class VoiceActivityDetector:
    def __init__(self, aggressiveness=3):
        self.vad = webrtcvad.Vad(aggressiveness)
        self.sample_rate = 16000
        self.frame_duration = 30  # ms

    def is_speech(self, audio_chunk):
        try:
            return self.vad.is_speech(audio_chunk, self.sample_rate)
        except Exception as e:
            print(f"VAD error: {e}")
            return False

    def get_speech_segments(self, audio_data):
        frame_size = int(self.sample_rate * self.frame_duration / 1000) * 2
        speech_frames = []
        for i in range(0, len(audio_data), frame_size):
            frame = audio_data[i:i + frame_size]
            if len(frame) < frame_size:
                continue
            if self.is_speech(frame):
                speech_frames.append(frame)
        return b''.join(speech_frames)
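One constraint worth making explicit: webrtcvad accepts only 10, 20, or 30 ms frames of 16-bit mono PCM, so every frame must be an exact byte length. A small helper (the name is illustrative) makes the arithmetic visible:

```python
def vad_frame_bytes(sample_rate=16000, frame_ms=30, sample_width=2):
    """Byte length of one VAD frame; webrtcvad accepts only 10/20/30 ms."""
    if frame_ms not in (10, 20, 30):
        raise ValueError("webrtcvad frames must be 10, 20, or 30 ms")
    return int(sample_rate * frame_ms / 1000) * sample_width

# At 16 kHz, 16-bit mono: 10 ms = 320 B, 20 ms = 640 B, 30 ms = 960 B
print([vad_frame_bytes(frame_ms=ms) for ms in (10, 20, 30)])
```

This is why `get_speech_segments` slices audio into fixed-size frames before calling `is_speech`—passing an arbitrary capture chunk straight to the VAD raises an error.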

Audio Normalization and Enhancement

Normalize volume levels across different speakers:

from pydub import AudioSegment
from pydub.effects import compress_dynamic_range

class AudioEnhancer:
    @staticmethod
    def normalize_audio(audio_data, target_dBFS=-20.0):
        audio = AudioSegment(
            data=audio_data, sample_width=2,
            frame_rate=16000, channels=1
        )
        if audio.dBFS == float('-inf'):
            return audio_data  # pure silence; applying infinite gain would fail
        change_in_dBFS = target_dBFS - audio.dBFS
        normalized = audio.apply_gain(change_in_dBFS)
        return normalized.raw_data

    @staticmethod
    def apply_compression(audio_data, threshold=-20.0, ratio=4.0):
        audio = AudioSegment(
            data=audio_data, sample_width=2,
            frame_rate=16000, channels=1
        )
        compressed = compress_dynamic_range(
            audio, threshold=threshold, ratio=ratio
        )
        return compressed.raw_data

    @staticmethod
    def enhance_speech(audio_data):
        normalized = AudioEnhancer.normalize_audio(audio_data)
        compressed = AudioEnhancer.apply_compression(normalized)
        return compressed
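Under the hood, normalization is just "target dBFS minus current dBFS." The arithmetic in plain Python, assuming int16 samples (the helper name is illustrative):

```python
import math

def gain_db_to_target(rms, target_dbfs=-20.0, full_scale=32768.0):
    """dB of gain needed to bring an int16 signal at `rms` to `target_dbfs`."""
    current_dbfs = 20 * math.log10(rms / full_scale)
    return target_dbfs - current_dbfs

# A signal at one-tenth of full scale sits at exactly -20 dBFS: no gain needed
print(round(gain_db_to_target(3276.8), 6))   # 0.0
# A quiet signal at -40 dBFS needs +20 dB to reach the -20 dBFS target
print(round(gain_db_to_target(327.68), 6))   # 20.0
```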

Building a Complete Audio Pipeline

Integrate all components into a production-ready pipeline:

class CleanAudioPipeline:
    def __init__(self):
        self.capture = AudioStreamCapture()
        self.noise_reducer = AudioNoiseReducer()
        self.vad = VoiceActivityDetector()
        self.enhancer = AudioEnhancer()
        self.processed_chunks = []

    def initialize(self, noise_sample_duration=2.0):
        print("Initializing audio pipeline...")
        self.capture.start_capture()
        print(f"Collecting {noise_sample_duration}s noise sample...")
        noise_data = b''
        import time
        end_time = time.time() + noise_sample_duration
        while time.time() < end_time:
            chunk = self.capture.get_audio_chunk(timeout=0.5)
            if chunk:
                noise_data += chunk
        self.noise_reducer.create_noise_profile(noise_data)
        print("Pipeline initialized")

    def process_stream(self, duration=None):
        import time
        print("Processing audio stream...")
        start_time = time.time()
        while self.capture.is_capturing:
            if duration and time.time() - start_time > duration:
                break
            chunk = self.capture.get_audio_chunk(timeout=1.0)
            if not chunk:
                continue
            denoised = self.noise_reducer.reduce_noise(chunk)
            filtered = self.noise_reducer.apply_bandpass_filter(denoised)
            speech = self.vad.get_speech_segments(filtered)
            if speech:
                enhanced = self.enhancer.enhance_speech(speech)
                self.processed_chunks.append(enhanced)

    def save_clean_audio(self, filename="clean_audio.wav"):
        if not self.processed_chunks:
            print("No processed audio to save")
            return
        combined_audio = b''.join(self.processed_chunks)
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(16000)
            wf.writeframes(combined_audio)
        print(f"Clean audio saved: {filename}")

    def stop(self):
        self.capture.stop_capture()
        print("Pipeline stopped")

if __name__ == "__main__":
    pipeline = CleanAudioPipeline()
    try:
        pipeline.initialize(noise_sample_duration=2.0)
        pipeline.process_stream(duration=60)
        pipeline.save_clean_audio("meeting_clean.wav")
    except KeyboardInterrupt:
        print("Stopping...")
    finally:
        pipeline.stop()

Platform-Specific Optimizations

Different platforms require different approaches. For Zoom, capture virtual audio devices. For Google Meet, intercept WebRTC audio tracks. For Teams, use the Bot Framework media platform. Each platform has specific latency characteristics and audio encoding formats that affect your pipeline configuration.
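When routing a platform's output through a virtual audio device, you typically enumerate PyAudio's devices and match by name. A sketch of that lookup—the device names below are examples, and on a real system the dicts would come from `get_device_info_by_index()`:

```python
def find_device_index(devices, name_fragment):
    """Return the index of the first input device whose name contains
    `name_fragment` (case-insensitive), or None if no match.

    `devices` is a list of dicts shaped like PyAudio's device info:
    {'name': ..., 'maxInputChannels': ...}.
    """
    for index, info in enumerate(devices):
        if (name_fragment.lower() in info['name'].lower()
                and info.get('maxInputChannels', 0) > 0):
            return index
    return None

# Typical virtual-device names (illustrative): "BlackHole 2ch", "VB-Cable"
devices = [
    {'name': 'MacBook Pro Microphone', 'maxInputChannels': 1},
    {'name': 'BlackHole 2ch', 'maxInputChannels': 2},
    {'name': 'MacBook Pro Speakers', 'maxInputChannels': 0},
]
print(find_device_index(devices, 'blackhole'))  # 1
```

The returned index is what you would pass as `device_index` to `AudioStreamCapture.start_capture()`.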

Monitoring Audio Quality

Implement real-time quality monitoring:

def calculate_snr(audio_data, noise_sample):
    """Estimate SNR in dB from int16 PCM bytes of signal and of noise."""
    signal_arr = np.frombuffer(audio_data, dtype=np.int16).astype(np.float64)
    noise_arr = np.frombuffer(noise_sample, dtype=np.int16).astype(np.float64)
    signal_power = np.mean(signal_arr ** 2)
    noise_power = np.mean(noise_arr ** 2)
    if noise_power > 0:
        return 10 * np.log10(signal_power / noise_power)
    return float('inf')

Monitor SNR in real time. Values above 20 dB indicate excellent quality; below 10 dB, the stream needs more aggressive noise reduction or source-level fixes.

Performance Optimization

Process audio in separate threads to prevent blocking. Use circular buffers for efficient memory management. Batch small chunks before processing to reduce overhead. Profile your pipeline to identify bottlenecks—noise reduction and FFT operations typically consume the most CPU time.
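A circular buffer is easy to sketch with `collections.deque` and a `maxlen`—old chunks are evicted automatically, so memory stays bounded during long meetings (the class name is illustrative):

```python
from collections import deque

class AudioRingBuffer:
    """Fixed-capacity chunk buffer: once full, the oldest audio is dropped,
    keeping memory usage bounded no matter how long the capture runs."""

    def __init__(self, max_chunks=100):
        self._chunks = deque(maxlen=max_chunks)

    def push(self, chunk):
        self._chunks.append(chunk)  # silently evicts the oldest chunk when full

    def drain(self):
        """Return and clear everything currently buffered."""
        data = b''.join(self._chunks)
        self._chunks.clear()
        return data

buf = AudioRingBuffer(max_chunks=3)
for chunk in (b'one', b'two', b'three', b'four'):
    buf.push(chunk)
print(buf.drain())  # b'twothreefour' -- b'one' was evicted
```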

Your audio pipeline now captures clean, professional-quality audio streams suitable for accurate transcription. The combination of noise reduction, VAD, and normalization ensures consistent quality regardless of meeting conditions.

Conclusion

Capturing clean audio streams requires careful attention to format selection, noise reduction, voice activity detection, and volume normalization. The pipeline approach shown here processes audio in stages, each improving quality before the next stage. Start with the audio configuration, add noise reduction, implement VAD to filter silence, and normalize the output. This produces transcription-ready audio that maximizes accuracy across all meeting platforms. For teams building meeting bots at scale, MeetStream’s API handles audio capture and processing automatically, letting you focus on building features rather than audio engineering infrastructure.
