How to Capture Clean Audio Streams from Meeting Bots

Audio quality determines transcription accuracy. When building meeting bots, capturing clean, interference-free audio streams separates professional solutions from amateur ones. This guide explores proven techniques for capturing high-quality audio from meeting platforms, handling noise reduction, and implementing robust audio processing pipelines.

Understanding Audio Stream Challenges

Meeting bots face unique audio challenges: multiple speakers talking simultaneously, background noise, varying microphone quality, network packet loss, and audio compression artifacts. Your bot must handle these issues while maintaining synchronization between audio and metadata like speaker identification.

Audio Format Fundamentals

Start with the right audio configuration:

import pyaudio
import wave

class AudioConfig:
    """Optimal audio settings for speech recognition."""

    # Format specifications
    FORMAT = pyaudio.paInt16  # 16-bit PCM
    CHANNELS = 1              # Mono audio
    RATE = 16000              # 16 kHz sample rate
    CHUNK = 1024              # Frames per buffer

    @staticmethod
    def get_optimal_config():
        """Return a configuration dict for audio capture."""
        return {
            'format': AudioConfig.FORMAT,
            'channels': AudioConfig.CHANNELS,
            'rate': AudioConfig.RATE,
            'chunk': AudioConfig.CHUNK,
            'sample_width': pyaudio.get_sample_size(AudioConfig.FORMAT)
        }

Use 16kHz sample rate for speech—it captures frequencies up to 8kHz, covering the entire human speech range while keeping file sizes manageable. Mono audio simplifies processing and reduces bandwidth without losing speech intelligibility.
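As a quick sanity check on why this configuration is economical, the raw data rate follows directly from the format: sample rate × bytes per sample × channels.

```python
# Raw PCM data rate = sample_rate * bytes_per_sample * channels
RATE, SAMPLE_WIDTH, CHANNELS = 16000, 2, 1   # 16 kHz, 16-bit, mono

speech_rate = RATE * SAMPLE_WIDTH * CHANNELS   # 32,000 bytes per second
cd_rate = 44100 * 2 * 2                        # 44.1 kHz stereo, for comparison

print(speech_rate)                 # bytes/s -- roughly 1.9 MB per minute
print(cd_rate / speech_rate)       # CD-quality audio is over 5x larger
```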

Capturing Raw Audio Streams

Implement a robust audio capture system that handles interruptions gracefully:

import threading
import queue
import time
import numpy as np

class AudioStreamCapture:

    def __init__(self, config=None):
        self.config = config or AudioConfig.get_optimal_config()
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.audio_queue = queue.Queue()
        self.is_capturing = False
        self.capture_thread = None

    def start_capture(self, device_index=None):
        """Start capturing audio with error recovery."""
        self.is_capturing = True
        try:
            self.stream = self.audio.open(
                format=self.config['format'],
                channels=self.config['channels'],
                rate=self.config['rate'],
                input=True,
                input_device_index=device_index,
                frames_per_buffer=self.config['chunk'],
                stream_callback=self._audio_callback
            )
            self.stream.start_stream()
            print("Audio capture started successfully")
        except Exception as e:
            print(f"Error starting audio capture: {e}")
            self._recover_stream()

    def _audio_callback(self, in_data, frame_count, time_info, status):
        """Handle incoming audio frames."""
        if status:
            print(f"Audio stream status: {status}")
        if self.is_capturing:
            self.audio_queue.put(in_data)
        return (in_data, pyaudio.paContinue)

    def _recover_stream(self):
        """Attempt to recover from stream errors."""
        print("Attempting to recover audio stream...")
        if self.stream:
            self.stream.close()
        time.sleep(1)
        self.start_capture()

    def get_audio_chunk(self, timeout=1.0):
        """Retrieve the next audio chunk from the queue."""
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def stop_capture(self):
        """Stop audio capture gracefully."""
        self.is_capturing = False
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
        self.audio.terminate()
        print("Audio capture stopped")

Implementing Noise Reduction

Clean audio requires aggressive noise reduction. Use spectral subtraction and bandpass filtering:

from scipy import signal
import numpy as np

class AudioNoiseReducer:

    def __init__(self, sample_rate=16000):
        self.sample_rate = sample_rate
        self.noise_profile = None

    def create_noise_profile(self, audio_data):
        """Build a noise profile from a noise-only audio segment."""
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        audio_float = audio_array.astype(np.float32) / 32768.0
        # Average the magnitude spectrum across time to characterize the noise
        f, t, stft = signal.stft(
            audio_float,
            fs=self.sample_rate,
            nperseg=256
        )
        self.noise_profile = np.abs(stft).mean(axis=1)
        print("Noise profile created")

    def reduce_noise(self, audio_data):
        """Apply spectral subtraction to reduce noise."""
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        audio_float = audio_array.astype(np.float32) / 32768.0
        # Apply STFT
        f, t, stft = signal.stft(
            audio_float,
            fs=self.sample_rate,
            nperseg=256
        )
        # Spectral subtraction
        magnitude = np.abs(stft)
        phase = np.angle(stft)
        if self.noise_profile is not None:
            # Subtract the noise profile, keeping a 10% spectral floor
            # to avoid musical-noise artifacts
            clean_magnitude = magnitude - self.noise_profile[:, np.newaxis]
            clean_magnitude = np.maximum(clean_magnitude, 0.1 * magnitude)
        else:
            clean_magnitude = magnitude
        # Reconstruct the signal
        clean_stft = clean_magnitude * np.exp(1j * phase)
        _, clean_audio = signal.istft(
            clean_stft,
            fs=self.sample_rate,
            nperseg=256
        )
        # Convert back to int16
        clean_audio = np.clip(clean_audio * 32768.0, -32768, 32767)
        return clean_audio.astype(np.int16).tobytes()

    def apply_bandpass_filter(self, audio_data, lowcut=80, highcut=7900):
        """Apply a bandpass filter for speech frequencies.

        The high cutoff must stay strictly below the Nyquist frequency
        (8 kHz at a 16 kHz sample rate), or the filter design fails.
        """
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        audio_float = audio_array.astype(np.float32)
        # Design a Butterworth bandpass filter
        nyquist = self.sample_rate / 2
        low = lowcut / nyquist
        high = highcut / nyquist
        b, a = signal.butter(4, [low, high], btype='band')
        # Apply the filter forwards and backwards for zero phase distortion
        filtered = signal.filtfilt(b, a, audio_float)
        return np.clip(filtered, -32768, 32767).astype(np.int16).tobytes()
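The spectral-subtraction idea can be demonstrated end to end with nothing but numpy. This is a simplified sketch of the same technique (rectangular frames, no overlap-add, a synthetic tone plus white noise), not a drop-in replacement for the scipy-based class:

```python
import numpy as np

rng = np.random.default_rng(0)
fs, frame = 16000, 256
t = np.arange(fs) / fs
clean = np.sin(2 * np.pi * 440 * t)              # 1 s synthetic test tone
noise_cal = 0.3 * rng.standard_normal(fs)        # "silent segment" for profiling
noise = 0.3 * rng.standard_normal(fs)            # noise mixed into the signal
noisy = clean + noise

def to_frames(x):
    """Split a signal into non-overlapping frames."""
    n = len(x) // frame
    return x[:n * frame].reshape(n, frame)

# Noise profile: average magnitude spectrum of a noise-only recording
noise_profile = np.abs(np.fft.rfft(to_frames(noise_cal), axis=1)).mean(axis=0)

# Subtract the profile from each frame's magnitude, keeping a 10% floor
spec = np.fft.rfft(to_frames(noisy), axis=1)
mag, phase = np.abs(spec), np.angle(spec)
clean_mag = np.maximum(mag - noise_profile, 0.1 * mag)
denoised = np.fft.irfft(clean_mag * np.exp(1j * phase), n=frame, axis=1).ravel()

def snr_db(reference, estimate):
    """SNR of an estimate against the clean reference, in dB."""
    ref = reference[:len(estimate)]
    return 10 * np.log10(np.sum(ref ** 2) / np.sum((ref - estimate) ** 2))

print(snr_db(clean, noisy), snr_db(clean, denoised))
```

On this synthetic input the denoised signal measures several dB cleaner than the noisy one; real meeting audio is less forgiving, which is why the production class keeps the conservative spectral floor.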

Voice Activity Detection (VAD)

Save bandwidth and improve processing by detecting speech segments:

import webrtcvad

class VoiceActivityDetector:

    def __init__(self, aggressiveness=3):
        """Initialize VAD with an aggressiveness level (0-3)."""
        self.vad = webrtcvad.Vad(aggressiveness)
        self.sample_rate = 16000
        self.frame_duration = 30  # milliseconds

    def is_speech(self, audio_chunk):
        """Detect whether an audio frame contains speech.

        webrtcvad only accepts frames of 10, 20, or 30 ms.
        """
        try:
            return self.vad.is_speech(audio_chunk, self.sample_rate)
        except Exception as e:
            print(f"VAD error: {e}")
            return False

    def get_speech_segments(self, audio_data):
        """Extract only the speech segments from audio."""
        # 30 ms at 16 kHz, 2 bytes per sample
        frame_size = int(self.sample_rate * self.frame_duration / 1000) * 2
        speech_frames = []
        # Process frame by frame
        for i in range(0, len(audio_data), frame_size):
            frame = audio_data[i:i + frame_size]
            if len(frame) < frame_size:
                break
            if self.is_speech(frame):
                speech_frames.append(frame)
        return b"".join(speech_frames)
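One pitfall worth calling out: webrtcvad rejects arbitrary buffer sizes and only accepts frames of exactly 10, 20, or 30 ms of 16-bit mono PCM. The valid byte lengths follow directly from the sample rate:

```python
# Valid webrtcvad frame sizes: 10, 20, or 30 ms of 16-bit mono PCM
sample_rate = 16000
bytes_per_sample = 2

valid_frame_bytes = {
    ms: sample_rate * ms // 1000 * bytes_per_sample
    for ms in (10, 20, 30)
}
print(valid_frame_bytes)   # e.g. a 30 ms frame is 480 samples = 960 bytes

# A 1024-frame PyAudio chunk is 2048 bytes, which is NOT a valid VAD
# frame -- this is why get_speech_segments() re-slices incoming audio
# into 30 ms frames before calling is_speech().
```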

Audio Normalization and Enhancement

Normalize volume levels across different speakers:

from pydub import AudioSegment
from pydub.effects import compress_dynamic_range

class AudioEnhancer:

    @staticmethod
    def normalize_audio(audio_data, target_dBFS=-20.0):
        """Normalize audio to a target loudness level."""
        # Convert raw bytes to an AudioSegment
        audio = AudioSegment(
            data=audio_data,
            sample_width=2,
            frame_rate=16000,
            channels=1
        )
        # Apply the gain needed to reach the target dBFS
        change_in_dBFS = target_dBFS - audio.dBFS
        normalized = audio.apply_gain(change_in_dBFS)
        return normalized.raw_data

    @staticmethod
    def apply_compression(audio_data, threshold=-20.0, ratio=4.0):
        """Apply dynamic range compression."""
        audio = AudioSegment(
            data=audio_data,
            sample_width=2,
            frame_rate=16000,
            channels=1
        )
        compressed = compress_dynamic_range(
            audio,
            threshold=threshold,
            ratio=ratio
        )
        return compressed.raw_data

    @staticmethod
    def enhance_speech(audio_data):
        """Apply the full enhancement pipeline."""
        # Normalize volume
        normalized = AudioEnhancer.normalize_audio(audio_data)
        # Compress to reduce the dynamic range
        compressed = AudioEnhancer.apply_compression(normalized)
        return compressed
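To see what normalize_audio is doing numerically: pydub's dBFS measures RMS level relative to full scale (32768 for 16-bit audio), and the applied gain is just the difference from the target. The same calculation in plain numpy, using a hypothetical quiet test tone as input:

```python
import numpy as np

# A quiet sine at 10% of full scale (hypothetical input signal)
fs = 16000
t = np.arange(fs) / fs
samples = (0.1 * 32767 * np.sin(2 * np.pi * 440 * t)).astype(np.int16)

# dBFS as pydub computes it: RMS relative to the maximum amplitude
rms = np.sqrt(np.mean(samples.astype(np.float64) ** 2))
dbfs = 20 * np.log10(rms / 32768)

# Gain (in dB) needed to reach the -20 dBFS target
gain_needed = -20.0 - dbfs
print(round(dbfs, 1), round(gain_needed, 1))
```

A sine at 10% of full scale sits around -23 dBFS, so normalization applies roughly +3 dB of gain to hit the -20 dBFS target.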

Building a Complete Audio Pipeline

Integrate all components into a production-ready pipeline:

import time

class CleanAudioPipeline:

    def __init__(self):
        self.capture = AudioStreamCapture()
        self.noise_reducer = AudioNoiseReducer()
        self.vad = VoiceActivityDetector()
        self.enhancer = AudioEnhancer()
        self.processed_chunks = []

    def initialize(self, noise_sample_duration=2.0):
        """Initialize the pipeline with noise profiling."""
        print("Initializing audio pipeline...")
        # Start capture
        self.capture.start_capture()
        # Collect a noise sample while the room is quiet
        print(f"Collecting {noise_sample_duration}s noise sample...")
        noise_data = b""
        end_time = time.time() + noise_sample_duration
        while time.time() < end_time:
            chunk = self.capture.get_audio_chunk()
            if chunk:
                noise_data += chunk
        # Create the noise profile
        self.noise_reducer.create_noise_profile(noise_data)
        print("Pipeline initialized successfully")

    def process_stream(self, duration=None):
        """Process the audio stream with the full cleaning pipeline."""
        print("Processing audio stream...")
        start_time = time.time()
        while True:
            if duration and (time.time() - start_time) >= duration:
                break
            # Get the next audio chunk
            chunk = self.capture.get_audio_chunk()
            if not chunk:
                continue
            # Apply noise reduction
            denoised = self.noise_reducer.reduce_noise(chunk)
            # Apply the bandpass filter
            filtered = self.noise_reducer.apply_bandpass_filter(denoised)
            # Keep only speech (the VAD needs 10/20/30 ms frames, so let
            # get_speech_segments() re-slice the chunk rather than passing
            # the whole buffer to is_speech)
            speech = self.vad.get_speech_segments(filtered)
            if speech:
                # Enhance the speech
                enhanced = self.enhancer.enhance_speech(speech)
                self.processed_chunks.append(enhanced)

    def save_clean_audio(self, filename="clean_audio.wav"):
        """Save the processed audio to a WAV file."""
        if not self.processed_chunks:
            print("No audio to save")
            return
        combined_audio = b"".join(self.processed_chunks)
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(16000)
            wf.writeframes(combined_audio)
        print(f"Clean audio saved: {filename}")

    def stop(self):
        """Stop the pipeline and clean up."""
        self.capture.stop_capture()
        print("Pipeline stopped")

# Usage example
if __name__ == "__main__":
    pipeline = CleanAudioPipeline()
    try:
        # Initialize with noise profiling
        pipeline.initialize(noise_sample_duration=2.0)
        # Process audio for 60 seconds
        pipeline.process_stream(duration=60)
        # Save the clean audio
        pipeline.save_clean_audio("meeting_clean.wav")
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        pipeline.stop()

Platform-Specific Optimizations

Different platforms require different approaches. For Zoom, capture audio from a virtual audio device. For Google Meet and Teams, intercept the WebRTC streams. Use platform SDKs when available; they provide higher-quality audio than system-level capture.

Monitoring Audio Quality

Implement real-time quality monitoring:

def calculate_snr(audio_data, noise_profile):
    """Calculate the signal-to-noise ratio in dB."""
    signal_power = np.mean(np.abs(audio_data) ** 2)
    noise_power = np.mean(np.abs(noise_profile) ** 2)
    if noise_power > 0:
        return 10 * np.log10(signal_power / noise_power)
    return float('inf')

Monitor SNR in real time. Values above 20 dB indicate excellent quality; below 10 dB, the audio needs aggressive noise reduction.
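Those thresholds can be encoded directly into the monitoring loop. The helper below is an illustrative sketch; the name and cutoffs simply mirror the guidance above:

```python
def classify_audio_quality(snr_db):
    """Map a measured SNR (in dB) to the quality bands described above."""
    if snr_db > 20:
        return "excellent"
    if snr_db >= 10:
        return "acceptable"
    return "poor"  # trigger aggressive noise reduction

print(classify_audio_quality(25))   # excellent
print(classify_audio_quality(12))   # acceptable
print(classify_audio_quality(5))    # poor
```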

Performance Optimization

Process audio in separate threads to prevent blocking. Use circular buffers for efficient memory management. Batch audio chunks before sending them to transcription services; this reduces API calls and improves accuracy.
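The batching advice can be sketched with a small accumulator that groups raw chunks into fixed-duration batches before they are shipped to a transcription API. ChunkBatcher is a hypothetical helper written for this article, not part of any platform SDK:

```python
class ChunkBatcher:
    """Accumulate raw PCM chunks and emit fixed-duration batches."""

    def __init__(self, batch_seconds=5.0, sample_rate=16000, sample_width=2):
        # Bytes per batch = seconds * samples/s * bytes/sample (mono)
        self.batch_bytes = int(batch_seconds * sample_rate * sample_width)
        self._buffer = bytearray()

    def add(self, chunk):
        """Add a chunk; return a full batch once enough audio accumulates."""
        self._buffer.extend(chunk)
        if len(self._buffer) >= self.batch_bytes:
            batch = bytes(self._buffer[:self.batch_bytes])
            del self._buffer[:self.batch_bytes]   # keep the remainder
            return batch
        return None

# Feed twenty 2048-byte chunks (~64 ms each) into a 1-second batcher
batcher = ChunkBatcher(batch_seconds=1.0)
batches = []
for _ in range(20):
    batch = batcher.add(b"\x00" * 2048)
    if batch:
        batches.append(batch)
```

Each emitted batch is exactly one second of audio (32,000 bytes at 16 kHz mono 16-bit), regardless of the capture chunk size.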

Your audio pipeline now captures clean, professional-quality audio streams suitable for accurate transcription, with noise reduction, voice activity detection, and dynamic enhancement working together seamlessly.

Conclusion

Capturing clean audio streams requires careful attention to format selection, noise reduction, voice activity detection, and real-time enhancement. Implementing these techniques significantly improves transcription accuracy and user experience in production meeting bots. If you want enterprise-grade audio processing without building complex pipelines, consider the Meetstream.ai API, which handles audio optimization automatically for Zoom, Google Meet, and Microsoft Teams.
