How to Detect Speaker Changes in Meeting Audio

Detecting when speakers change transforms continuous audio into structured conversations. Speaker change detection identifies transition points where one person stops talking and another begins, enabling real-time captioning, accurate transcription segmentation, and interactive meeting features. This guide demonstrates multiple approaches to implement robust speaker change detection in meeting bots.

Understanding Speaker Change Detection

Speaker change detection differs from full diarization—it identifies transition moments rather than labeling entire segments. Your system analyzes acoustic features like pitch, energy, and spectral characteristics to detect when the voice pattern shifts, triggering actions like updating live captions or switching camera views.
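Before building the full pipeline, the core idea fits in a few lines: extract a feature vector per audio window, measure the distance between consecutive windows, and flag a change when it crosses a threshold. A minimal sketch using cosine distance and made-up feature vectors (the vectors and threshold here are purely illustrative):

```python
import numpy as np
from scipy.spatial.distance import cosine

def is_speaker_change(prev_features, curr_features, threshold=0.3):
    """Flag a change when consecutive windows' features diverge."""
    return cosine(prev_features, curr_features) > threshold

# Two similar windows (same voice) vs. a dissimilar one (new voice)
voice_a = np.array([1.0, 0.5, 0.2])
voice_a2 = np.array([0.9, 0.55, 0.25])   # small drift, same speaker
voice_b = np.array([0.1, 0.9, 0.8])      # different spectral shape

print(is_speaker_change(voice_a, voice_a2))  # False
print(is_speaker_change(voice_a, voice_b))   # True
```

The sections below replace the toy vectors with real acoustic features and add smoothing so a single noisy window does not trigger a false change.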

Feature Extraction for Speaker Detection

Extract meaningful features from audio frames:

import numpy as np
import librosa
from scipy import signal

class AudioFeatureExtractor:
    def __init__(self, sample_rate=16000):
        self.sample_rate = sample_rate

    def extract_mfcc(self, audio_chunk, n_mfcc=13):
        """Extract Mel-Frequency Cepstral Coefficients."""
        # Normalize int16 PCM to [-1, 1] before passing to librosa
        audio_float = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32) / 32768.0
        mfccs = librosa.feature.mfcc(
            y=audio_float,
            sr=self.sample_rate,
            n_mfcc=n_mfcc
        )
        # Return mean across time
        return np.mean(mfccs, axis=1)

    def extract_pitch(self, audio_chunk):
        """Estimate fundamental frequency (pitch) via autocorrelation."""
        audio_float = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32)
        correlation = np.correlate(audio_float, audio_float, mode='full')
        correlation = correlation[len(correlation) // 2:]
        # Restrict the lag search to a plausible speech pitch range (50-400 Hz);
        # the first peak after zero lag alone can land far above real pitch
        min_lag = int(self.sample_rate / 400)
        max_lag = int(self.sample_rate / 50)
        peaks, _ = signal.find_peaks(correlation[min_lag:max_lag])
        if len(peaks) > 0:
            # Convert lag (in samples) to frequency
            return self.sample_rate / (peaks[0] + min_lag)
        return 0.0

    def extract_energy(self, audio_chunk):
        """Calculate mean energy per sample."""
        audio_float = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32)
        return np.sum(audio_float ** 2) / len(audio_float)

    def extract_zero_crossing_rate(self, audio_chunk):
        """Calculate the zero crossing rate."""
        audio_float = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32)
        # Count sign changes
        zero_crossings = np.sum(np.abs(np.diff(np.sign(audio_float)))) / 2
        return zero_crossings / len(audio_float)

    def extract_all_features(self, audio_chunk):
        """Extract the complete feature vector."""
        return {
            'mfcc': self.extract_mfcc(audio_chunk),
            'pitch': self.extract_pitch(audio_chunk),
            'energy': self.extract_energy(audio_chunk),
            'zcr': self.extract_zero_crossing_rate(audio_chunk)
        }

Distance-Based Change Detection

Detect speaker changes by measuring feature distance between consecutive windows:

from scipy.spatial.distance import euclidean

class DistanceBasedDetector:
    def __init__(self, threshold=0.5, window_size=1.0, sample_rate=16000):
        self.threshold = threshold
        self.window_size = window_size
        self.sample_rate = sample_rate
        self.feature_extractor = AudioFeatureExtractor(sample_rate)
        self.previous_features = None

    def calculate_distance(self, features1, features2):
        """Calculate the distance between two feature vectors."""
        # Compare MFCC features, normalized by feature dimension
        mfcc_dist = euclidean(features1['mfcc'], features2['mfcc'])
        mfcc_dist /= len(features1['mfcc'])
        # Compare pitch (relative difference)
        pitch_diff = abs(features1['pitch'] - features2['pitch'])
        pitch_dist = pitch_diff / max(features1['pitch'], features2['pitch'], 1.0)
        # Compare energy (relative difference)
        energy_diff = abs(features1['energy'] - features2['energy'])
        energy_dist = energy_diff / max(features1['energy'], features2['energy'], 1.0)
        # Weighted combination
        return 0.6 * mfcc_dist + 0.25 * pitch_dist + 0.15 * energy_dist

    def detect_change(self, audio_chunk):
        """Return True if the speaker changed in this chunk."""
        is_change, _ = self.detect_with_confidence(audio_chunk)
        return is_change

    def detect_with_confidence(self, audio_chunk):
        """Detect a change and return a confidence score."""
        current_features = self.feature_extractor.extract_all_features(audio_chunk)
        if self.previous_features is None:
            self.previous_features = current_features
            return False, 0.0
        # Distance from the previous window
        distance = self.calculate_distance(self.previous_features, current_features)
        self.previous_features = current_features
        # Map the distance to a 0-1 confidence
        confidence = min(distance / self.threshold, 1.0)
        return distance > self.threshold, confidence

Bayesian Change Point Detection

Use a statistical test to identify change points. The detector below keeps a rolling window of feature vectors, splits it in half, and compares the halves with a t-like statistic—a large value means the two halves likely come from different speakers:

import numpy as np

class BayesianChangeDetector:
    def __init__(self, threshold=0.5, window_length=20):
        self.threshold = threshold
        self.window_length = window_length
        self.feature_buffer = []
        self.feature_extractor = AudioFeatureExtractor()

    def add_frame(self, audio_chunk):
        """Add a new audio frame to the buffer."""
        features = self.feature_extractor.extract_all_features(audio_chunk)
        # Concatenate into a single feature vector
        feature_vector = np.concatenate([
            features['mfcc'],
            [features['pitch'], features['energy'], features['zcr']]
        ])
        self.feature_buffer.append(feature_vector)
        # Maintain window size
        if len(self.feature_buffer) > self.window_length:
            self.feature_buffer.pop(0)

    def detect_change(self):
        """Detect a change point by comparing window halves."""
        if len(self.feature_buffer) < self.window_length:
            return False, 0.0
        # Split the buffer into two halves
        mid_point = len(self.feature_buffer) // 2
        first_half = np.array(self.feature_buffer[:mid_point])
        second_half = np.array(self.feature_buffer[mid_point:])
        # Per-dimension statistics for each half
        mean1 = np.mean(first_half, axis=0)
        mean2 = np.mean(second_half, axis=0)
        var1 = np.var(first_half, axis=0)
        var2 = np.var(second_half, axis=0)
        # Compute a t-like statistic per dimension
        t_stats = []
        for i in range(len(mean1)):
            if var1[i] + var2[i] > 0:
                t = abs(mean1[i] - mean2[i]) / np.sqrt(var1[i] + var2[i])
                t_stats.append(t)
        # Aggregate the statistics into a single change score
        change_score = np.mean(t_stats) if t_stats else 0.0
        is_change = change_score > self.threshold
        confidence = min(change_score / self.threshold, 1.0)
        return is_change, confidence

    def detect_with_confidence(self, audio_chunk):
        """Adapter so this detector matches the tracker's interface."""
        self.add_frame(audio_chunk)
        return self.detect_change()

Machine Learning-Based Detection

Train a classifier for speaker change detection (here a random forest, which trains quickly on modest amounts of labeled data):

import numpy as np
import pickle

from sklearn.ensemble import RandomForestClassifier

class MLChangeDetector:
    def __init__(self, model_path=None):
        self.feature_extractor = AudioFeatureExtractor()
        self.model = None
        self.feature_buffer = []
        self.buffer_size = 5
        if model_path:
            self.load_model(model_path)
        else:
            # Initialize a new (untrained) model
            self.model = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                random_state=42
            )

    def prepare_features(self, audio_chunks):
        """Extract features from multiple consecutive chunks."""
        features_list = []
        for chunk in audio_chunks:
            features = self.feature_extractor.extract_all_features(chunk)
            # Flatten into a single vector
            features_list.append(np.concatenate([
                features['mfcc'],
                [features['pitch'], features['energy'], features['zcr']]
            ]))
        # Concatenate the temporal context
        return np.concatenate(features_list)

    def train(self, audio_chunks, labels):
        """Train the model on labeled data."""
        X, y = [], []
        # Build training samples with temporal context
        for i in range(self.buffer_size, len(audio_chunks)):
            context = audio_chunks[i - self.buffer_size:i]
            X.append(self.prepare_features(context))
            y.append(labels[i])
        self.model.fit(np.array(X), np.array(y))
        print("Model trained successfully")

    def detect_change(self, audio_chunk):
        """Detect a speaker change using the trained model."""
        self.feature_buffer.append(audio_chunk)
        # Maintain buffer size
        if len(self.feature_buffer) > self.buffer_size:
            self.feature_buffer.pop(0)
        # Wait until we have enough context
        if len(self.feature_buffer) < self.buffer_size:
            return False, 0.0
        features = self.prepare_features(self.feature_buffer)
        prediction = self.model.predict([features])[0]
        probabilities = self.model.predict_proba([features])[0]
        # Confidence of the predicted class
        confidence = probabilities[1] if prediction == 1 else probabilities[0]
        return bool(prediction), confidence

    # Same interface as the other detectors, for RealtimeChangeTracker
    detect_with_confidence = detect_change

    def save_model(self, path):
        """Save the trained model."""
        with open(path, 'wb') as f:
            pickle.dump(self.model, f)

    def load_model(self, path):
        """Load a pre-trained model."""
        with open(path, 'rb') as f:
            self.model = pickle.load(f)

Real-Time Speaker Change Tracking

Implement real-time detection with smoothing:

from collections import deque
import time

class RealtimeChangeTracker:
    def __init__(self, detector, smoothing_window=3):
        self.detector = detector
        self.smoothing_window = smoothing_window
        self.detection_history = deque(maxlen=smoothing_window)
        self.current_speaker_id = 0
        self.speaker_segments = []
        self.segment_start_time = None

    def process_chunk(self, audio_chunk, timestamp):
        """Process an audio chunk and track speaker changes."""
        is_change, confidence = self.detector.detect_with_confidence(audio_chunk)
        self.detection_history.append(is_change)
        # Smooth detections with majority voting
        change_votes = sum(self.detection_history)
        smoothed_change = change_votes > (self.smoothing_window / 2)
        if smoothed_change:
            self._record_speaker_change(timestamp, confidence)
            return True, confidence
        return False, confidence

    def _record_speaker_change(self, timestamp, confidence):
        """Record a speaker change event."""
        if self.segment_start_time is not None:
            # Save the previous segment
            self.speaker_segments.append({
                'speaker_id': self.current_speaker_id,
                'start_time': self.segment_start_time,
                'end_time': timestamp,
                'duration': timestamp - self.segment_start_time
            })
        # Start a new segment
        self.current_speaker_id += 1
        self.segment_start_time = timestamp
        # Reset the vote history so one transition is not counted twice
        self.detection_history.clear()
        print(f"[{timestamp:.2f}s] Speaker change detected (confidence: {confidence:.2f})")

    def get_current_speaker(self):
        """Get the current speaker ID."""
        return self.current_speaker_id

    def get_speaker_timeline(self):
        """Get the complete speaker timeline."""
        return self.speaker_segments

    def finalize(self, end_timestamp):
        """Close the last open segment."""
        if self.segment_start_time is not None:
            self.speaker_segments.append({
                'speaker_id': self.current_speaker_id,
                'start_time': self.segment_start_time,
                'end_time': end_timestamp,
                'duration': end_timestamp - self.segment_start_time
            })

Complete Speaker Change Detection System

Integrate all components:

import pyaudio
import time

class SpeakerChangeDetectionSystem:
    def __init__(self, detection_method='distance'):
        self.sample_rate = 16000
        self.chunk_size = int(self.sample_rate * 0.5)  # 0.5-second chunks
        # Initialize the detector for the chosen method
        if detection_method == 'distance':
            detector = DistanceBasedDetector(threshold=0.5)
        elif detection_method == 'bayesian':
            detector = BayesianChangeDetector(threshold=0.5)
        elif detection_method == 'ml':
            detector = MLChangeDetector()
        else:
            raise ValueError(f"Unknown detection method: {detection_method}")
        self.tracker = RealtimeChangeTracker(detector, smoothing_window=3)
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.start_time = None

    def start_detection(self, duration=None):
        """Start real-time speaker change detection."""
        print("Starting speaker change detection...")
        self.stream = self.audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=self.chunk_size
        )
        self.start_time = time.time()
        try:
            while True:
                # Check the duration limit
                if duration and (time.time() - self.start_time) >= duration:
                    break
                # Read an audio chunk
                audio_chunk = self.stream.read(self.chunk_size, exception_on_overflow=False)
                current_time = time.time() - self.start_time
                # Detect a speaker change
                is_change, confidence = self.tracker.process_chunk(audio_chunk, current_time)
                # Display the current speaker
                if is_change:
                    speaker_id = self.tracker.get_current_speaker()
                    print(f"Now speaking: Speaker {speaker_id}")
        except KeyboardInterrupt:
            print("\nStopping detection...")
        finally:
            self.stop_detection()

    def stop_detection(self):
        """Stop detection and report results (safe to call more than once)."""
        if self.stream is None:
            return
        self.stream.stop_stream()
        self.stream.close()
        self.stream = None
        self.audio.terminate()
        # Finalize with a timestamp relative to the session start,
        # matching the timestamps recorded during processing
        self.tracker.finalize(time.time() - self.start_time)
        self._print_summary()

    def _print_summary(self):
        """Print a detection summary."""
        timeline = self.tracker.get_speaker_timeline()
        print("\n" + "=" * 60)
        print("SPEAKER CHANGE DETECTION SUMMARY")
        print("=" * 60)
        print(f"Total speakers detected: {len(set(s['speaker_id'] for s in timeline))}")
        print(f"Total speaker changes: {max(len(timeline) - 1, 0)}")
        print("\nSpeaker Timeline:")
        for segment in timeline:
            print(f"  Speaker {segment['speaker_id']}: "
                  f"{segment['start_time']:.2f}s - {segment['end_time']:.2f}s "
                  f"(duration: {segment['duration']:.2f}s)")

# Usage example
if __name__ == "__main__":
    # Choose a detection method: 'distance', 'bayesian', or 'ml'
    system = SpeakerChangeDetectionSystem(detection_method='distance')
    try:
        # Run detection for 60 seconds
        system.start_detection(duration=60)
    except Exception as e:
        print(f"Error: {e}")
        system.stop_detection()

Performance Optimization

Reduce latency by processing audio in parallel threads. Use smaller chunk sizes (0.3-0.5 seconds) for faster detection but maintain a smoothing window to prevent false positives. Cache feature extraction results when analyzing overlapping windows.
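One way to decouple audio capture from the slower feature extraction is a producer-consumer queue: the capture thread only reads from the stream, while a worker thread runs detection. The function and stand-in callables below are illustrative, not part of the classes above:

```python
import queue
import threading

def run_pipeline(read_chunk, detect, num_chunks):
    """Producer-consumer sketch: a capture thread feeds a bounded queue,
    a worker thread runs the (slower) feature extraction / detection."""
    chunks = queue.Queue(maxsize=16)  # bounded so capture can't outrun analysis
    results = []

    def producer():
        for _ in range(num_chunks):
            chunks.put(read_chunk())   # e.g. stream.read(chunk_size)
        chunks.put(None)               # sentinel: end of stream

    def consumer():
        while True:
            chunk = chunks.get()
            if chunk is None:
                break
            results.append(detect(chunk))

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start(); t2.start()
    t1.join(); t2.join()
    return results

# Dummy stand-ins to exercise the plumbing without a microphone
out = run_pipeline(read_chunk=lambda: b"\x00" * 1024,
                   detect=lambda c: len(c), num_chunks=3)
print(out)  # [1024, 1024, 1024]
```

In a real deployment, `read_chunk` would wrap the PyAudio stream and `detect` would call `tracker.process_chunk`; the bounded queue keeps memory flat if detection temporarily falls behind.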

Tune thresholds based on your meeting environment—formal meetings with clear turn-taking need lower thresholds than casual conversations with frequent interruptions. Monitor false positive rates and adjust accordingly.
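A simple way to tune the threshold is to sweep candidate values over a labeled recording and keep the one with the best F1 score. The helper below is a hypothetical sketch: it assumes you have per-window detector distances and matching 0/1 change labels:

```python
def sweep_threshold(distances, labels, candidates):
    """Pick the threshold with the best F1 on labeled window pairs.
    `distances` are detector scores; `labels` are 1 for true changes."""
    best = (None, -1.0)
    for t in candidates:
        preds = [d > t for d in distances]
        tp = sum(p and l for p, l in zip(preds, labels))
        fp = sum(p and not l for p, l in zip(preds, labels))
        fn = sum((not p) and l for p, l in zip(preds, labels))
        if tp == 0:
            continue  # no true detections at this threshold
        precision = tp / (tp + fp)
        recall = tp / (tp + fn)
        f1 = 2 * precision * recall / (precision + recall)
        if f1 > best[1]:
            best = (t, f1)
    return best

# Toy scores: higher distance should mean "change"
distances = [0.1, 0.7, 0.2, 0.9, 0.3]
labels    = [0,   1,   0,   1,   0]
print(sweep_threshold(distances, labels, [0.25, 0.5, 0.8]))  # (0.5, 1.0)
```

Re-run the sweep per environment: the best threshold for a quiet two-person call will generally differ from a noisy multi-speaker room.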

Validation and Testing

Test your detector with ground truth data:

def evaluate_detector(audio_file, ground_truth_changes):
    """Evaluate detector accuracy against hand-labeled change times."""
    # run_detector is your pipeline: it should return detected change timestamps
    detected_changes = run_detector(audio_file)
    true_positives = 0
    false_positives = 0
    tolerance = 0.5  # seconds: a detection within this window counts as correct
    for detected in detected_changes:
        matched = False
        for truth in ground_truth_changes:
            if abs(detected - truth) <= tolerance:
                true_positives += 1
                matched = True
                break
        if not matched:
            false_positives += 1
    false_negatives = len(ground_truth_changes) - true_positives
    # Guard against division by zero when nothing was detected or matched
    precision = true_positives / max(true_positives + false_positives, 1)
    recall = true_positives / max(true_positives + false_negatives, 1)
    f1_score = (2 * precision * recall / (precision + recall)
                if (precision + recall) > 0 else 0.0)
    return {'precision': precision, 'recall': recall, 'f1': f1_score}

Your speaker change detection system now accurately identifies when speakers transition, enabling real-time meeting features, improved transcription segmentation, and interactive participant tracking.

Conclusion

Implementing robust speaker change detection requires combining acoustic feature extraction, statistical analysis, and real-time tracking with appropriate smoothing to deliver accurate, low-latency results for meeting applications. If you want production-ready speaker change detection without building complex pipelines, consider the Meetstream.ai API, which provides automatic speaker tracking and identification across all major meeting platforms.
