Detecting when speakers change transforms continuous audio into structured conversations. Speaker change detection identifies transition points where one person stops talking and another begins, enabling real-time captioning, accurate transcription segmentation, and interactive meeting features. This guide demonstrates multiple approaches to implement robust speaker change detection in meeting bots.
Understanding Speaker Change Detection
Speaker change detection differs from full diarization—it identifies transition moments rather than labeling entire segments. Your system analyzes acoustic features like pitch, energy, and spectral characteristics to detect when the voice pattern shifts, triggering actions like updating live captions or switching camera views.
Feature Extraction for Speaker Detection
Extract meaningful features from audio frames:
import numpy as np
import librosa
from scipy import signal
class AudioFeatureExtractor:
    """Extract per-chunk acoustic features for speaker change detection.

    Audio chunks are raw bytes of 16-bit signed mono PCM (e.g. as produced
    by pyaudio with format ``paInt16``).
    """

    def __init__(self, sample_rate=16000):
        # Sample rate of the incoming PCM stream, in Hz.
        self.sample_rate = sample_rate

    @staticmethod
    def _as_float(audio_chunk):
        """Decode raw int16 PCM bytes into a 1-D float array."""
        return np.frombuffer(audio_chunk, dtype=np.int16).astype(float)

    def extract_mfcc(self, audio_chunk, n_mfcc=13):
        """Return the time-averaged Mel-Frequency Cepstral Coefficients.

        Returns a vector of length ``n_mfcc``: the mean of each coefficient
        across all analysis frames in the chunk.
        """
        audio_float = self._as_float(audio_chunk)
        mfccs = librosa.feature.mfcc(
            y=audio_float,
            sr=self.sample_rate,
            n_mfcc=n_mfcc
        )
        # Collapse the time axis so every chunk yields one fixed-size vector.
        return np.mean(mfccs, axis=1)

    def extract_pitch(self, audio_chunk):
        """Estimate the fundamental frequency in Hz via autocorrelation.

        Returns 0.0 when no periodicity is found (silence, noise, or an
        empty chunk).
        """
        audio_float = self._as_float(audio_chunk)
        if audio_float.size == 0:
            return 0.0
        # Autocorrelation; keep only the non-negative lags.
        correlation = np.correlate(audio_float, audio_float, mode='full')
        correlation = correlation[len(correlation) // 2:]
        # The first peak after zero lag corresponds to the pitch period.
        peaks = signal.find_peaks(correlation)[0]
        if len(peaks) > 0:
            # Convert lag (samples) to frequency (Hz).
            return self.sample_rate / peaks[0]
        return 0.0

    def extract_energy(self, audio_chunk):
        """Return the mean squared sample amplitude (0.0 for an empty chunk)."""
        audio_float = self._as_float(audio_chunk)
        if audio_float.size == 0:
            # Guard: the original divided by len() and crashed on empty input.
            return 0.0
        return np.sum(audio_float ** 2) / len(audio_float)

    def extract_zero_crossing_rate(self, audio_chunk):
        """Return the fraction of samples at which the signal changes sign."""
        audio_float = self._as_float(audio_chunk)
        if audio_float.size == 0:
            return 0.0
        # Each sign flip contributes |diff(sign)| == 2, hence the / 2.
        zero_crossings = np.sum(np.abs(np.diff(np.sign(audio_float)))) / 2
        return zero_crossings / len(audio_float)

    def extract_all_features(self, audio_chunk):
        """Return a dict with keys 'mfcc', 'pitch', 'energy', and 'zcr'."""
        return {
            'mfcc': self.extract_mfcc(audio_chunk),
            'pitch': self.extract_pitch(audio_chunk),
            'energy': self.extract_energy(audio_chunk),
            'zcr': self.extract_zero_crossing_rate(audio_chunk),
        }
Distance-Based Change Detection
Detect speaker changes by measuring feature distance between consecutive windows:
from scipy.spatial.distance import euclidean, cosine
class DistanceBasedDetector:
    """Flag a speaker change when the feature distance between consecutive
    audio windows exceeds a threshold."""

    def __init__(self, threshold=0.5, window_size=1.0, sample_rate=16000):
        # Distance above which a chunk is considered a speaker change.
        self.threshold = threshold
        # Window length in seconds (informational; the caller chunks audio).
        self.window_size = window_size
        self.sample_rate = sample_rate
        self.feature_extractor = AudioFeatureExtractor(sample_rate)
        # Feature dict of the previous chunk; None until the first chunk.
        self.previous_features = None

    def calculate_distance(self, features1, features2):
        """Return a weighted distance between two feature dicts.

        Feature dicts are as produced by
        ``AudioFeatureExtractor.extract_all_features``.
        """
        # MFCC: Euclidean distance, normalized by dimensionality.
        mfcc_dist = euclidean(features1['mfcc'], features2['mfcc'])
        mfcc_dist /= len(features1['mfcc'])
        # Pitch and energy: relative difference; max(..., 1.0) guards the
        # division when both values are zero.
        pitch_diff = abs(features1['pitch'] - features2['pitch'])
        pitch_dist = pitch_diff / max(features1['pitch'], features2['pitch'], 1.0)
        energy_diff = abs(features1['energy'] - features2['energy'])
        energy_dist = energy_diff / max(features1['energy'], features2['energy'], 1.0)
        # Weighted combination: spectral shape dominates the decision.
        return (
            0.6 * mfcc_dist +
            0.25 * pitch_dist +
            0.15 * energy_dist
        )

    def detect_change(self, audio_chunk):
        """Return True if the speaker changed in this chunk."""
        # Delegates to detect_with_confidence so the comparison logic
        # lives in exactly one place (the original duplicated it).
        is_change, _ = self.detect_with_confidence(audio_chunk)
        return is_change

    def detect_with_confidence(self, audio_chunk):
        """Return ``(is_change, confidence)`` with confidence in [0, 1]."""
        current_features = self.feature_extractor.extract_all_features(audio_chunk)
        if self.previous_features is None:
            # First chunk: nothing to compare against yet.
            self.previous_features = current_features
            return False, 0.0
        distance = self.calculate_distance(self.previous_features, current_features)
        self.previous_features = current_features
        # Map distance onto [0, 1] relative to the threshold.
        confidence = min(distance / self.threshold, 1.0)
        return distance > self.threshold, confidence
Bayesian Change Point Detection
Use statistical methods to identify change points:
import numpy as np
from scipy import stats
class BayesianChangeDetector:
    """Detect change points by comparing the statistics of the first and
    second halves of a sliding window of feature vectors."""

    def __init__(self, threshold=0.5, window_length=20):
        # Decision threshold on the mean t-like statistic.
        self.threshold = threshold
        # Number of frames held in the sliding window.
        self.window_length = window_length
        self.feature_buffer = []
        self.feature_extractor = AudioFeatureExtractor()

    def add_frame(self, audio_chunk):
        """Extract features for one chunk and push them onto the window."""
        features = self.feature_extractor.extract_all_features(audio_chunk)
        # Flatten the feature dict into a single numeric vector.
        feature_vector = np.concatenate([
            features['mfcc'],
            [features['pitch'], features['energy'], features['zcr']],
        ])
        self.feature_buffer.append(feature_vector)
        # Drop the oldest frame once the window is full.
        if len(self.feature_buffer) > self.window_length:
            self.feature_buffer.pop(0)

    def detect_change(self):
        """Return ``(is_change, confidence)`` for the current window.

        Compares per-dimension means of the two window halves with a
        t-like statistic; dimensions whose combined variance is zero are
        skipped (they carry no evidence either way).
        """
        if len(self.feature_buffer) < self.window_length:
            # Not enough context accumulated yet.
            return False, 0.0
        mid_point = len(self.feature_buffer) // 2
        first_half = np.array(self.feature_buffer[:mid_point])
        second_half = np.array(self.feature_buffer[mid_point:])
        mean1 = np.mean(first_half, axis=0)
        mean2 = np.mean(second_half, axis=0)
        var1 = np.var(first_half, axis=0)
        var2 = np.var(second_half, axis=0)
        # Vectorized t-statistics over dimensions with non-zero variance
        # (same values as the original per-index loop).
        valid = (var1 + var2) > 0
        t_stats = np.abs(mean1[valid] - mean2[valid]) / np.sqrt(var1[valid] + var2[valid])
        change_score = float(np.mean(t_stats)) if t_stats.size else 0.0
        is_change = change_score > self.threshold
        confidence = min(change_score / self.threshold, 1.0)
        return is_change, confidence
Machine Learning-Based Detection
Train a neural network for speaker change detection:
from sklearn.ensemble import RandomForestClassifier
import pickle
class MLChangeDetector:
    """Speaker change classifier: a RandomForest over a short temporal
    context of acoustic feature vectors."""

    def __init__(self, model_path=None):
        self.feature_extractor = AudioFeatureExtractor()
        self.model = None
        # Rolling window of raw audio chunks used as temporal context.
        self.feature_buffer = []
        self.buffer_size = 5
        if model_path:
            self.load_model(model_path)
        else:
            # Fresh, untrained model; call train() before detect_change().
            self.model = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                random_state=42
            )

    def prepare_features(self, audio_chunks):
        """Concatenate per-chunk feature vectors of consecutive chunks into
        one flat context vector."""
        features_list = []
        for chunk in audio_chunks:
            features = self.feature_extractor.extract_all_features(chunk)
            # Flatten the feature dict into a single vector.
            feature_vector = np.concatenate([
                features['mfcc'],
                [features['pitch'], features['energy'], features['zcr']],
            ])
            features_list.append(feature_vector)
        # Temporal context: chunks are concatenated in order.
        return np.concatenate(features_list)

    def train(self, audio_chunks, labels):
        """Fit the model on labeled chunk sequences.

        ``labels[i]`` marks whether a speaker change occurs at chunk ``i``;
        each training sample is the ``buffer_size`` chunks preceding ``i``.
        """
        X = []
        y = []
        for i in range(self.buffer_size, len(audio_chunks)):
            context = audio_chunks[i - self.buffer_size:i]
            X.append(self.prepare_features(context))
            y.append(labels[i])
        self.model.fit(np.array(X), np.array(y))
        print("Model trained successfully")

    def detect_change(self, audio_chunk):
        """Return ``(is_change, confidence)`` for the newest chunk, or
        ``(False, 0.0)`` until enough context has accumulated."""
        self.feature_buffer.append(audio_chunk)
        if len(self.feature_buffer) > self.buffer_size:
            self.feature_buffer.pop(0)
        if len(self.feature_buffer) < self.buffer_size:
            return False, 0.0
        features = self.prepare_features(self.feature_buffer)
        prediction = self.model.predict([features])[0]
        probabilities = self.model.predict_proba([features])[0]
        # Probability assigned to the predicted class.
        confidence = probabilities[1] if prediction == 1 else probabilities[0]
        return bool(prediction), confidence

    def save_model(self, path):
        """Serialize the trained model to ``path`` with pickle."""
        with open(path, 'wb') as f:
            pickle.dump(self.model, f)

    def load_model(self, path):
        """Load a pickled model from ``path``.

        SECURITY: ``pickle.load`` executes arbitrary code embedded in the
        file — only load model files from trusted sources.
        """
        with open(path, 'rb') as f:
            self.model = pickle.load(f)
Real-Time Speaker Change Tracking
Implement real-time detection with smoothing:
from collections import deque
import time
class RealtimeChangeTracker:
    """Smooth raw per-chunk detections by majority vote and maintain a
    timeline of speaker segments.

    NOTE(review): the detector must expose ``detect_with_confidence(chunk)``;
    of the detectors in this module only ``DistanceBasedDetector`` does —
    the Bayesian and ML detectors would need a small adapter. TODO confirm.
    """

    def __init__(self, detector, smoothing_window=3):
        self.detector = detector
        self.smoothing_window = smoothing_window
        # Recent raw detections used for majority voting.
        self.detection_history = deque(maxlen=smoothing_window)
        self.current_speaker_id = 0
        self.speaker_segments = []
        # Start time of the currently open segment; None before the first change.
        self.segment_start_time = None

    def process_chunk(self, audio_chunk, timestamp):
        """Process one chunk; return ``(is_change, confidence)``."""
        is_change, confidence = self.detector.detect_with_confidence(audio_chunk)
        self.detection_history.append(is_change)
        # Majority vote over recent history suppresses one-off spikes.
        change_votes = sum(self.detection_history)
        smoothed_change = change_votes > (self.smoothing_window / 2)
        if smoothed_change:
            self._record_speaker_change(timestamp, confidence)
            return True, confidence
        return False, confidence

    def _close_segment(self, end_time):
        """Append the currently open segment (if any) to the timeline."""
        if self.segment_start_time is not None:
            self.speaker_segments.append({
                'speaker_id': self.current_speaker_id,
                'start_time': self.segment_start_time,
                'end_time': end_time,
                'duration': end_time - self.segment_start_time,
            })

    def _record_speaker_change(self, timestamp, confidence):
        """Close the previous segment and open one for the next speaker."""
        self._close_segment(timestamp)
        self.current_speaker_id += 1
        self.segment_start_time = timestamp
        print(f"[{timestamp:.2f}s] Speaker change detected (confidence: {confidence:.2f})")

    def get_current_speaker(self):
        """Return the current speaker ID (incremented on every change)."""
        return self.current_speaker_id

    def get_speaker_timeline(self):
        """Return the list of recorded speaker segments."""
        return self.speaker_segments

    def finalize(self, end_timestamp):
        """Close the last open segment at the end of the session."""
        self._close_segment(end_timestamp)
Complete Speaker Change Detection System
Integrate all components:
import pyaudio
import wave
class SpeakerChangeDetectionSystem:
    """End-to-end real-time pipeline: microphone capture -> speaker change
    detection -> smoothed speaker timeline."""

    def __init__(self, detection_method='distance'):
        self.sample_rate = 16000
        self.chunk_size = int(self.sample_rate * 0.5)  # 0.5 second chunks
        # Select the detector implementation.
        if detection_method == 'distance':
            detector = DistanceBasedDetector(threshold=0.5)
        elif detection_method == 'bayesian':
            detector = BayesianChangeDetector(threshold=0.5)
        elif detection_method == 'ml':
            detector = MLChangeDetector()
        else:
            raise ValueError(f"Unknown detection method: {detection_method}")
        self.tracker = RealtimeChangeTracker(detector, smoothing_window=3)
        self.audio = pyaudio.PyAudio()
        self.stream = None
        # Wall-clock time when capture started; all timestamps handed to
        # the tracker are relative to this.
        self.start_time = None

    def start_detection(self, duration=None):
        """Capture microphone audio and report speaker changes in real time.

        duration: optional limit in seconds; None runs until Ctrl-C.
        """
        print("Starting speaker change detection...")
        self.stream = self.audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=self.chunk_size
        )
        self.start_time = time.time()
        try:
            while True:
                # Stop once the optional duration limit is reached.
                if duration and (time.time() - self.start_time) >= duration:
                    break
                audio_chunk = self.stream.read(self.chunk_size, exception_on_overflow=False)
                current_time = time.time() - self.start_time
                is_change, confidence = self.tracker.process_chunk(
                    audio_chunk,
                    current_time
                )
                if is_change:
                    speaker_id = self.tracker.get_current_speaker()
                    print(f"Now speaking: Speaker {speaker_id}")
        except KeyboardInterrupt:
            print("\nStopping detection...")
        finally:
            self.stop_detection()

    def stop_detection(self):
        """Stop capture, finalize the timeline, and print a summary.

        Safe to call more than once; subsequent calls are no-ops (the
        original crashed when callers stopped an already-stopped system).
        """
        if self.stream is None:
            return
        self.stream.stop_stream()
        self.stream.close()
        self.stream = None
        self.audio.terminate()
        # BUG FIX: finalize with a start-relative timestamp. The original
        # passed absolute epoch time while segments use relative times,
        # corrupting the last segment's end time and duration.
        end_time = time.time() - self.start_time if self.start_time is not None else 0.0
        self.tracker.finalize(end_time)
        self._print_summary()

    def _print_summary(self):
        """Print the detected speaker count, change count, and timeline."""
        timeline = self.tracker.get_speaker_timeline()
        print("\n" + "=" * 60)
        print("SPEAKER CHANGE DETECTION SUMMARY")
        print("=" * 60)
        print(f"Total speakers detected: {len(set(s['speaker_id'] for s in timeline))}")
        # max(..., 0) avoids reporting -1 changes for an empty timeline.
        print(f"Total speaker changes: {max(len(timeline) - 1, 0)}")
        print("\nSpeaker Timeline:")
        for segment in timeline:
            print(f"  Speaker {segment['speaker_id']}: "
                  f"{segment['start_time']:.2f}s - {segment['end_time']:.2f}s "
                  f"(duration: {segment['duration']:.2f}s)")
# Usage example
if __name__ == "__main__":
    # Choose detection method: 'distance', 'bayesian', or 'ml'
    system = SpeakerChangeDetectionSystem(detection_method='distance')
    try:
        # Run detection for 60 seconds
        system.start_detection(duration=60)
    except Exception as e:
        print(f"Error: {e}")
        # Idempotent: start_detection's finally clause normally stops the
        # system already; this handles failures before capture started.
        system.stop_detection()
Performance Optimization
Reduce latency by processing audio in parallel threads. Use smaller chunk sizes (0.3-0.5 seconds) for faster detection but maintain a smoothing window to prevent false positives. Cache feature extraction results when analyzing overlapping windows.
Tune thresholds based on your meeting environment—formal meetings with clear turn-taking need lower thresholds than casual conversations with frequent interruptions. Monitor false positive rates and adjust accordingly.
Validation and Testing
Test your detector with ground truth data:
def evaluate_detector(audio_file, ground_truth_changes, detected_changes=None, tolerance=0.5):
    """Evaluate detector accuracy against ground-truth change times.

    Args:
        audio_file: audio input handed to ``run_detector`` when
            ``detected_changes`` is not supplied.
        ground_truth_changes: list of true change timestamps (seconds).
        detected_changes: optional precomputed detection timestamps; when
            None the detector is run on ``audio_file``.
        tolerance: maximum |detected - truth| gap (seconds) that counts
            as a correct detection.

    Returns:
        Dict with 'precision', 'recall', and 'f1' (each 0.0 when its
        denominator would be zero).
    """
    if detected_changes is None:
        detected_changes = run_detector(audio_file)
    true_positives = 0
    false_positives = 0
    for detected in detected_changes:
        # A detection within tolerance of any truth counts as a hit.
        # NOTE(review): truths are not consumed on match, so several
        # detections can match the same truth (as in the original).
        if any(abs(detected - truth) <= tolerance for truth in ground_truth_changes):
            true_positives += 1
        else:
            false_positives += 1
    false_negatives = len(ground_truth_changes) - true_positives
    # Guard each division: the original raised ZeroDivisionError when there
    # were no detections, no truths, or no matches at all.
    detected_total = true_positives + false_positives
    precision = true_positives / detected_total if detected_total else 0.0
    truth_total = true_positives + false_negatives
    recall = true_positives / truth_total if truth_total else 0.0
    pr_sum = precision + recall
    f1_score = 2 * (precision * recall) / pr_sum if pr_sum else 0.0
    return {'precision': precision, 'recall': recall, 'f1': f1_score}
Your speaker change detection system now accurately identifies when speakers transition, enabling real-time meeting features, improved transcription segmentation, and interactive participant tracking.
Conclusion
Implementing robust speaker change detection requires combining acoustic feature extraction, statistical analysis, and real-time tracking with appropriate smoothing to deliver accurate, low-latency results for meeting applications. If you want production-ready speaker change detection without building complex pipelines, consider the Meetstream.ai API, which provides automatic speaker tracking and identification across all major meeting platforms.