What is a Rolling Buffer or Sliding Window? Complete Guide to Video Buffering
Rolling buffers, also known as circular buffers or sliding windows, are fundamental data structures that enable modern video streaming features like time-shift playback, instant replay, and cloud DVR functionality. This comprehensive guide explains what rolling buffers are, how they work, and how to implement them effectively.
Understanding Rolling Buffers
What is a Rolling Buffer?
A rolling buffer (also called a circular buffer or ring buffer) is a fixed-size data structure that continuously stores the most recent data while automatically discarding the oldest data when the buffer is full.
Key Characteristics:
- Fixed maximum size (e.g., 24 hours of video)
- Continuous recording with automatic old data removal
- FIFO (First-In-First-Out) behavior
- Efficient memory/storage usage
- Enables time-shift playback and replay
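The characteristics above can be seen in miniature with Python's standard `collections.deque`, which behaves as a fixed-size buffer when given a `maxlen`. This is only an illustrative sketch: the three-slot capacity and the `seg-N` names are made up for the example.

```python
from collections import deque

# A 3-slot buffer standing in for "the most recent 3 segments".
buffer = deque(maxlen=3)

for segment in ["seg-1", "seg-2", "seg-3", "seg-4", "seg-5"]:
    # Once the deque is full, appending drops the oldest entry automatically.
    buffer.append(segment)

print(list(buffer))  # ['seg-3', 'seg-4', 'seg-5']
```

The buffer never grows past three entries, and the oldest items leave first, which is the FIFO behavior described in the list.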
Visual Representation
Rolling Buffer (24-hour capacity):
┌────────────────────────────────────────────┐
│ [Hour 1] [Hour 2] ... [Hour 23] [Hour 24] │ ← Buffer Full
└────────────────────────────────────────────┘
  ↑                               ↑
  Oldest                          Newest
  (will be deleted next)
When new data arrives:
┌────────────────────────────────────────────┐
│ [Hour 2] [Hour 3] ... [Hour 24] [Hour 25] │ ← Hour 1 deleted
└────────────────────────────────────────────┘
  ↑                               ↑
  Oldest                          Newest
Rolling Buffer vs. Traditional Storage
| Feature | Rolling Buffer | Traditional Storage |
|---|---|---|
| Size | Fixed (e.g., 24 hours) | Unlimited (until disk full) |
| Old Data | Automatically deleted | Manually deleted or kept forever |
| Storage Cost | Predictable, constant | Grows indefinitely |
| Use Case | Live streams, DVR, monitoring | Archives, permanent records |
| Complexity | Automated cleanup | Manual management required |
How Rolling Buffers Work
The Circular Buffer Concept
A rolling buffer uses a circular data structure where the write position wraps around to the beginning when it reaches the end.
Conceptual Model:
Circular Buffer Structure:
            [Segment 5]
           /           \
  [Segment 4]         [Segment 6]
       |                   |
  [Segment 3]         [Segment 7]
           \           /
            [Segment 8]
                 |
            Write Head →
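The wrap-around can be sketched with a plain list and a modulo step. The four-slot capacity and the helper name `write` are assumptions chosen for this illustration only.

```python
CAPACITY = 4
slots = [None] * CAPACITY


def write(slots, position, item):
    """Store item at position and return the next write position."""
    slots[position] = item
    return (position + 1) % len(slots)  # wrap to 0 after the last slot


write_position = 0
for n in range(1, 7):  # write six segments into four slots
    write_position = write(slots, write_position, f"segment-{n}")

print(slots)           # ['segment-5', 'segment-6', 'segment-3', 'segment-4']
print(write_position)  # 2

# Reading oldest-to-newest starts at the write position and wraps around.
chronological = slots[write_position:] + slots[:write_position]
print(chronological)   # ['segment-3', 'segment-4', 'segment-5', 'segment-6']
```

Segments 1 and 2 were overwritten in place by segments 5 and 6, so the physical order of the slots no longer matches the time order once the buffer has wrapped.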
Buffer Operations
1. Write Operation (Adding New Data)
import time

class RollingBuffer:
    def __init__(self, max_segments=1440):  # 24 hours at 1-minute segments
        self.max_segments = max_segments
        self.segments = []
        self.write_position = 0  # index of the slot the next write will use

    def add_segment(self, segment):
        """
        Add new segment to buffer.
        If buffer is full, oldest segment is overwritten.
        """
        if len(self.segments) < self.max_segments:
            # Buffer not full yet, append
            self.segments.append(segment)
        else:
            # Buffer full, overwrite oldest
            self.segments[self.write_position] = segment
        # Move write position (circular)
        self.write_position = (self.write_position + 1) % self.max_segments
        return True
2. Read Operation (Playback)
    def get_segments(self, start_time, end_time):
        """
        Retrieve segments between start and end times.
        Enables time-shift playback.
        """
        available_segments = []
        for segment in self.segments:
            if start_time <= segment.timestamp <= end_time:
                available_segments.append(segment)
        return sorted(available_segments, key=lambda s: s.timestamp)

    def get_latest_segments(self, count=10):
        """
        Get most recent N segments for live playback.
        """
        # After the buffer wraps, list order no longer matches time order,
        # so rebuild oldest-to-newest starting at the write position.
        ordered = (
            self.segments[self.write_position:]
            + self.segments[:self.write_position]
        )
        # Get last N segments (all of them if fewer than N exist)
        return ordered[-count:]
3. Cleanup Operation (Removing Old Data)
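    def cleanup_old_segments(self, retention_hours=24):
        """
        Remove segments older than retention period.
        Called periodically or on every write.
        Requires `import time` at module level.
        """
        current_time = time.time()
        cutoff_time = current_time - (retention_hours * 3600)
        # Put segments in oldest-to-newest order before filtering
        ordered = (
            self.segments[self.write_position:]
            + self.segments[:self.write_position]
        )
        # Remove segments older than cutoff
        self.segments = [
            seg for seg in ordered
            if seg.timestamp >= cutoff_time
        ]
        # Keep the write position consistent with the shortened list
        self.write_position = len(self.segments) % self.max_segments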
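The linear scan in the read operation is fine for small buffers. When segments are kept in chronological order, a binary search can find the same range faster. The sketch below is one possible approach, not part of the class above; the function name `segments_in_range` and the parallel `timestamps` list are assumptions made for this example.

```python
from bisect import bisect_left, bisect_right


def segments_in_range(timestamps, segments, start_time, end_time):
    """Return segments whose timestamp lies in [start_time, end_time].

    timestamps must be sorted ascending and parallel to segments.
    """
    lo = bisect_left(timestamps, start_time)   # first index with ts >= start_time
    hi = bisect_right(timestamps, end_time)    # first index with ts > end_time
    return segments[lo:hi]


# Five 6-second segments, identified here by simple letters.
timestamps = [0, 6, 12, 18, 24]
segments = ["a", "b", "c", "d", "e"]

print(segments_in_range(timestamps, segments, 6, 18))  # ['b', 'c', 'd']
```

Both ends of the range are inclusive, which matches the `start_time <= segment.timestamp <= end_time` comparison used in the read operation.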
Implementation Strategies
Strategy 1: File-Based Rolling Buffer
Store segments as individual files with timestamp-based naming.
Directory Structure:
/buffer/
├── 20260222_120000_00001.ts (oldest)
├── 20260222_120006_00002.ts
├── 20260222_120012_00003.ts
├── ...
└── 20260223_115954_14400.ts (newest; 24 hours of 6-second segments)
Implementation:
import os
import time
from datetime import datetime, timedelta

class FileBasedRollingBuffer:
    def __init__(self, buffer_dir, retention_hours=24):
        self.buffer_dir = buffer_dir
        self.retention_hours = retention_hours
        os.makedirs(buffer_dir, exist_ok=True)

    def add_segment(self, segment_data):
        """
        Save segment to disk with timestamp filename.
        """
        timestamp = datetime.now()
        filename = timestamp.strftime("%Y%m%d_%H%M%S.ts")
        filepath = os.path.join(self.buffer_dir, filename)
        # Write segment to file
        with open(filepath, 'wb') as f:
            f.write(segment_data)
        # Cleanup old segments
        self.cleanup()
        return filepath

    def cleanup(self):
        """
        Remove segments older than retention period.
        """
        cutoff_time = datetime.now() - timedelta(hours=self.retention_hours)
        for filename in os.listdir(self.buffer_dir):
            filepath = os.path.join(self.buffer_dir, filename)
            # Parse timestamp from filename
            try:
                file_time = datetime.strptime(
                    filename[:15], "%Y%m%d_%H%M%S"
                )
                # Delete if older than retention
                if file_time < cutoff_time:
                    os.remove(filepath)
                    print(f"Deleted old segment: {filename}")
            except (ValueError, OSError) as e:
                print(f"Error processing {filename}: {e}")

    def get_segments_in_range(self, start_time, end_time):
        """
        Get all segments within time range.
        """
        segments = []
        for filename in sorted(os.listdir(self.buffer_dir)):
            try:
                file_time = datetime.strptime(
                    filename[:15], "%Y%m%d_%H%M%S"
                )
                if start_time <= file_time <= end_time:
                    filepath = os.path.join(self.buffer_dir, filename)
                    segments.append(filepath)
            except ValueError:
                continue
        return segments

# Usage
buffer = FileBasedRollingBuffer(
    buffer_dir="/var/video/buffer",
    retention_hours=24
)
# Add new segment
buffer.add_segment(video_segment_data)
# Get segments from last hour
start = datetime.now() - timedelta(hours=1)
end = datetime.now()
segments = buffer.get_segments_in_range(start, end)
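One thing the file-based approach has to watch for is a reader listing the directory while a segment is still being written. A common way to avoid exposing half-written files is to write to a temporary file and rename it into place. The sketch below assumes that pattern; the function name `write_segment_atomically` is hypothetical and not part of the class above.

```python
import os
import tempfile


def write_segment_atomically(buffer_dir, filename, segment_data):
    """Write segment_data to buffer_dir/filename without exposing partial files."""
    os.makedirs(buffer_dir, exist_ok=True)
    # Create the temp file in the same directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=buffer_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(segment_data)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        final_path = os.path.join(buffer_dir, filename)
        os.replace(tmp_path, final_path)  # atomic rename on the same filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path


# Demonstration in a throwaway directory.
with tempfile.TemporaryDirectory() as demo_dir:
    path = write_segment_atomically(demo_dir, "20260222_120000.ts", b"segment-bytes")
    print(os.listdir(demo_dir))  # ['20260222_120000.ts']
```

Readers that filter on the `.ts` extension never see the temporary file, and a crash mid-write leaves at most a stray `.tmp` file rather than a truncated segment.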
Strategy 2: Database-Based Rolling Buffer
Store segment metadata in a database, with each record pointing to the segment file in file storage.
Database Schema:
CREATE TABLE video_segments (
    id SERIAL PRIMARY KEY,
    camera_id VARCHAR(50) NOT NULL,
    segment_path VARCHAR(255) NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    duration_seconds INT NOT NULL,
    size_bytes BIGINT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- PostgreSQL does not accept inline INDEX clauses, so indexes are created separately
CREATE INDEX idx_camera_timestamp ON video_segments (camera_id, timestamp);
CREATE INDEX idx_timestamp ON video_segments (timestamp);
CREATE TABLE buffer_config (
    camera_id VARCHAR(50) PRIMARY KEY,
    retention_hours INT DEFAULT 24,
    max_size_gb INT DEFAULT 100
);
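To see how a retention cutoff plays out against a table like this, here is a small runnable sketch. It uses Python's built-in `sqlite3` with an in-memory database purely so it runs without a server. The trimmed-down columns, the camera IDs, the paths, and the fixed `now` value are all assumptions for illustration.

```python
import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE video_segments (
        id INTEGER PRIMARY KEY,
        camera_id TEXT NOT NULL,
        segment_path TEXT NOT NULL,
        timestamp TEXT NOT NULL  -- ISO 8601 text sorts chronologically
    )
""")
conn.execute(
    "CREATE INDEX idx_camera_timestamp ON video_segments (camera_id, timestamp)"
)

now = datetime(2026, 2, 23, 12, 0, 0)  # fixed clock so the example is repeatable
rows = [
    ("cam-1", "/buffer/a.ts", (now - timedelta(hours=30)).isoformat()),
    ("cam-1", "/buffer/b.ts", (now - timedelta(hours=2)).isoformat()),
    ("cam-2", "/buffer/c.ts", (now - timedelta(hours=30)).isoformat()),
]
conn.executemany(
    "INSERT INTO video_segments (camera_id, segment_path, timestamp) VALUES (?, ?, ?)",
    rows,
)

# Everything older than 24 hours for cam-1 falls outside the buffer.
cutoff = (now - timedelta(hours=24)).isoformat()
expired = conn.execute(
    "SELECT segment_path FROM video_segments WHERE camera_id = ? AND timestamp < ?",
    ("cam-1", cutoff),
).fetchall()
conn.execute(
    "DELETE FROM video_segments WHERE camera_id = ? AND timestamp < ?",
    ("cam-1", cutoff),
)
conn.commit()

remaining = [r[0] for r in conn.execute(
    "SELECT segment_path FROM video_segments ORDER BY id"
)]
print(expired)    # [('/buffer/a.ts',)]
print(remaining)  # ['/buffer/b.ts', '/buffer/c.ts']
```

The composite index on `(camera_id, timestamp)` is what lets both the lookup and the delete touch only one camera's expired rows. The other camera's old segment is left alone because retention is applied per camera.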
Implementation:
import os
import psycopg2
from datetime import datetime, timedelta

class DatabaseRollingBuffer:
    def __init__(self, db_conn, camera_id, retention_hours=24):
        self.db = db_conn
        self.camera_id = camera_id
        self.retention_hours = retention_hours

    def add_segment(self, segment_path, duration, size_bytes):
        """
        Register new segment in database.
        """
        cursor = self.db.cursor()
        # Insert segment record
        cursor.execute("""
            INSERT INTO video_segments
            (camera_id, segment_path, timestamp, duration_seconds, size_bytes)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
        """, (
            self.camera_id,
            segment_path,
            datetime.now(),
            duration,
            size_bytes
        ))
        segment_id = cursor.fetchone()[0]
        self.db.commit()
        cursor.close()
        # Cleanup old segments
        self.cleanup()
        return segment_id

    def cleanup(self):
        """
        Remove segments older than retention period.
        """
        cursor = self.db.cursor()
        cutoff_time = datetime.now() - timedelta(hours=self.retention_hours)
        # Get segments to delete
        cursor.execute("""
            SELECT id, segment_path
            FROM video_segments
            WHERE camera_id = %s AND timestamp < %s
        """, (self.camera_id, cutoff_time))
        old_segments = cursor.fetchall()
        # Delete files and database records
        for segment_id, segment_path in old_segments:
            try:
                # Delete physical file
                if os.path.exists(segment_path):
                    os.remove(segment_path)
                # Delete database record
                cursor.execute(
                    "DELETE FROM video_segments WHERE id = %s",
                    (segment_id,)
                )
            except OSError as e:
                print(f"Error deleting segment {segment_id}: {e}")
        self.db.commit()
        cursor.close()

    def get_segments(self, start_time, end_time):
        """
        Query segments within time range.
        """
        cursor = self.db.cursor()
        cursor.execute("""
            SELECT segment_path, timestamp, duration_seconds
            FROM video_segments
            WHERE camera_id = %s
            AND timestamp >= %s
            AND timestamp <= %s
            ORDER BY timestamp ASC
        """, (self.camera_id, start_time, end_time))
        segments = cursor.fetchall()
        cursor.close()
        return segments

    def get_buffer_stats(self):
        """
        Get current buffer statistics.
        """
        cursor = self.db.cursor()
        cursor.execute("""
            SELECT
                COUNT(*) as segment_count,
                SUM(size_bytes) as total_size_bytes,
                MIN(timestamp) as oldest_segment,
                MAX(timestamp) as newest_segment,
                SUM(duration_seconds) as total_duration_seconds
            FROM video_segments
            WHERE camera_id = %s
        """, (self.camera_id,))
        stats = cursor.fetchone()
        cursor.close()
        return {
            'segment_count': stats[0],
            'total_size_gb': stats[1] / (1024**3) if stats[1] else 0,
            'oldest_segment': stats[2],
            'newest_segment': stats[3],
            'total_duration_hours': stats[4] / 3600 if stats[4] else 0
        }
Strategy 3: Cloud Storage Rolling Buffer
Use cloud object storage (S3, Google Cloud Storage) with lifecycle policies.
AWS S3 Implementation:
import boto3
from datetime import datetime, timedelta

class S3RollingBuffer:
    def __init__(self, bucket_name, camera_id, retention_hours=24):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name
        self.camera_id = camera_id
        self.retention_hours = retention_hours
        self.prefix = f"cameras/{camera_id}/segments/"

    def add_segment(self, segment_data):
        """
        Upload segment to S3.
        """
        timestamp = datetime.now()
        key = f"{self.prefix}{timestamp.strftime('%Y/%m/%d/%H%M%S')}.ts"
        # Upload to S3
        self.s3.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=segment_data,
            Metadata={
                'camera_id': self.camera_id,
                'timestamp': timestamp.isoformat(),
            },
            # STANDARD_IA bills a 30-day minimum storage duration, so
            # short-lived buffer segments are usually cheaper in STANDARD
            StorageClass='STANDARD'
        )
        return key
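
    def setup_lifecycle_policy(self):
        """
        Configure S3 lifecycle rule for automatic deletion.
        Note: this call replaces the bucket's entire lifecycle
        configuration, so include every rule the bucket needs.
        """
        lifecycle_config = {
            'Rules': [
                {
                    'ID': f'delete-old-segments-{self.camera_id}',
                    'Status': 'Enabled',
                    'Filter': {'Prefix': self.prefix},
                    'Expiration': {
                        # Lifecycle expiration works in whole days, so
                        # sub-day retention needs an additional cleanup job
                        'Days': self.retention_hours // 24 + 1
                    }
                }
            ]
        }
        self.s3.put_bucket_lifecycle_configuration(
            Bucket=self.bucket,
            LifecycleConfiguration=lifecycle_config
        )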

    def get_segments(self, start_time, end_time):
        """
        List segments within time range.
        """
        segments = []
        # List objects with prefix
        paginator = self.s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
            for obj in page.get('Contents', []):
                # Get object metadata
                metadata = self.s3.head_object(
                    Bucket=self.bucket,
                    Key=obj['Key']
                )
                segment_time = datetime.fromisoformat(
                    metadata['Metadata']['timestamp']
                )
                if start_time <= segment_time <= end_time:
                    segments.append({
                        'key': obj['Key'],
                        'timestamp': segment_time,
                        'size': obj['Size']
                    })
        return sorted(segments, key=lambda s: s['timestamp'])

    def generate_presigned_url(self, key, expiration=3600):
        """
        Generate temporary URL for segment playback.
        """
        url = self.s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': self.bucket, 'Key': key},
            ExpiresIn=expiration
        )
        return url
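Listing every object under the camera prefix and fetching metadata for each one can become slow and costly as the buffer grows. Because the key layout above already encodes the date and time, one possible optimization is to list only the day prefixes that overlap the requested range and to parse the timestamp from the key itself. The helper names `day_prefixes` and `timestamp_from_key` below are hypothetical. They are pure functions, so this sketch runs without AWS credentials.

```python
from datetime import datetime, timedelta


def day_prefixes(base_prefix, start_time, end_time):
    """Return one key prefix per calendar day touched by [start_time, end_time]."""
    prefixes = []
    day = start_time.date()
    while day <= end_time.date():
        prefixes.append(f"{base_prefix}{day.strftime('%Y/%m/%d')}/")
        day += timedelta(days=1)
    return prefixes


def timestamp_from_key(key):
    """Parse the timestamp from a key ending in 'YYYY/MM/DD/HHMMSS.ts'."""
    date_and_time = "/".join(key.split("/")[-4:])
    return datetime.strptime(date_and_time, "%Y/%m/%d/%H%M%S.ts")


prefixes = day_prefixes(
    "cameras/cam-1/segments/",
    datetime(2026, 2, 22, 23, 0, 0),
    datetime(2026, 2, 23, 1, 0, 0),
)
print(prefixes)
# ['cameras/cam-1/segments/2026/02/22/', 'cameras/cam-1/segments/2026/02/23/']

print(timestamp_from_key("cameras/cam-1/segments/2026/02/22/120006.ts"))
# 2026-02-22 12:00:06
```

Each returned prefix could be passed as the `Prefix` argument when paginating, and the parsed timestamp compared against the range, which avoids a separate metadata request per object.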
Use Cases for Rolling Buffers
1. Security Camera DVR
Scenario: Store last 48 hours of footage from security cameras.
Configuration:
Retention: 48 hours
Segment Size: 6 seconds
Resolution: 720p @ 2 Mbps
Storage per Camera: ~43 GB
Cameras: 16
Total Storage: ~691 GB
Features:
- Live monitoring
- Rewind to any point in last 48 hours
- Export clips for incidents
- Automatic old footage deletion
2. Live Sports Replay
Scenario: Enable instant replay for live sports broadcasts.
Configuration:
Retention: 2 hours (game duration + buffer)
Segment Size: 2 seconds (low latency)
Resolution: 1080p @ 6 Mbps
Storage: ~5.4 GB per stream
Features:
- Instant replay capability
- Multi-angle replay
- Highlight clip creation
- Frame-by-frame analysis
3. Time-Shifted TV
Scenario: Allow viewers to pause and rewind live TV.
Configuration:
Retention: 2 hours per viewer
Segment Size: 6 seconds
Resolution: Various (ABR)
Personal buffer per user
Features:
- Pause live TV
- Rewind up to 2 hours
- Fast forward to live
- Resume on different device
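The configurations above also determine how many segments each buffer holds and roughly how large each segment is, which matters when sizing directories, database tables, or object counts. The helper below is a back-of-the-envelope sketch; the name `segment_stats` is made up for this example, and sizes use decimal megabytes.

```python
def segment_stats(retention_hours, segment_seconds, bitrate_mbps):
    """Return (number of segments in the buffer, approximate MB per segment)."""
    segment_count = retention_hours * 3600 // segment_seconds
    segment_megabytes = bitrate_mbps * segment_seconds / 8  # megabits -> megabytes
    return segment_count, segment_megabytes


print(segment_stats(48, 6, 2))  # security camera DVR: (28800, 1.5)
print(segment_stats(2, 2, 6))   # live sports replay:  (3600, 1.5)
```

Shorter segments lower latency but multiply the number of files or objects to track. The sports configuration holds 3,600 segments for just two hours of video.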
Best Practices
1. Size Your Buffer Appropriately
Calculate Required Storage:
def calculate_buffer_storage(bitrate_mbps, retention_hours):
    """
    Calculate storage needed for rolling buffer.
    bitrate_mbps: Video bitrate in Mbps
    retention_hours: Hours of video to retain
    """
    # Convert bitrate to bytes per second
    bytes_per_second = (bitrate_mbps * 1_000_000) / 8
    # Calculate for retention period
    seconds = retention_hours * 3600
    total_bytes = bytes_per_second * seconds
    # Convert to decimal GB (10^9 bytes)
    total_gb = total_bytes / (1000**3)
    return round(total_gb, 2)

# Example: 720p at 2 Mbps for 24 hours
print(calculate_buffer_storage(2, 24))  # Output: 21.6 (GB)
2. Implement Efficient Cleanup
Batch Deletion:
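import os
from datetime import datetime, timedelta

def parse_timestamp_from_filename(filename):
    # Assumed helper: reads the leading YYYYMMDD_HHMMSS part of the filename
    return datetime.strptime(filename[:15], "%Y%m%d_%H%M%S")

def cleanup_batch(buffer_dir, retention_hours, batch_size=100):
    """
    Delete old segments in batches for better performance.
    """
    cutoff_time = datetime.now() - timedelta(hours=retention_hours)
    deleted_count = 0
    # Get all files; timestamped names sort oldest-first
    files = sorted(os.listdir(buffer_dir))
    # Process in batches
    for i in range(0, len(files), batch_size):
        batch = files[i:i+batch_size]
        for filename in batch:
            try:
                file_time = parse_timestamp_from_filename(filename)
            except ValueError:
                continue  # not a segment file
            if file_time >= cutoff_time:
                # Every later file is newer, so stop scanning early
                return deleted_count
            filepath = os.path.join(buffer_dir, filename)
            os.remove(filepath)
            deleted_count += 1
    return deleted_count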
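Time-based cleanup keeps the buffer within its retention window, but a bitrate spike can still push storage past what the disk allows. A size cap is a useful second guard. The sketch below deletes the oldest files until the directory fits under a byte limit; the function name `enforce_size_cap` and the 250-byte limit in the demonstration are assumptions for illustration.

```python
import os
import tempfile


def enforce_size_cap(buffer_dir, max_bytes):
    """Delete oldest files until the directory totals at most max_bytes."""
    files = sorted(os.listdir(buffer_dir))  # timestamped names sort oldest-first
    sizes = {
        name: os.path.getsize(os.path.join(buffer_dir, name)) for name in files
    }
    total = sum(sizes.values())
    deleted = []
    for name in files:
        if total <= max_bytes:
            break
        os.remove(os.path.join(buffer_dir, name))
        total -= sizes[name]
        deleted.append(name)
    return deleted


# Demonstration: three 100-byte segments, capped at 250 bytes.
with tempfile.TemporaryDirectory() as buffer_dir:
    for name in ["20260222_120000.ts", "20260222_120006.ts", "20260222_120012.ts"]:
        with open(os.path.join(buffer_dir, name), "wb") as f:
            f.write(b"\0" * 100)
    removed = enforce_size_cap(buffer_dir, max_bytes=250)
    kept = sorted(os.listdir(buffer_dir))

print(removed)  # ['20260222_120000.ts']
print(kept)     # ['20260222_120006.ts', '20260222_120012.ts']
```

Running both the time-based and the size-based check means the buffer shrinks its effective retention under pressure instead of filling the disk.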
3. Monitor Buffer Health
Key Metrics:
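from datetime import datetime

class BufferHealthMonitor:
    def get_health_metrics(self, buffer, max_size_gb=100):
        # max_size_gb is an assumed parameter mirroring the buffer_config
        # default; pass the value configured for the camera
        stats = buffer.get_buffer_stats()
        oldest = stats['oldest_segment']
        return {
            'storage_usage_percent': (
                stats['total_size_gb'] / max_size_gb * 100
            ),
            'retention_actual_hours': stats['total_duration_hours'],
            'retention_target_hours': buffer.retention_hours,
            'segment_count': stats['segment_count'],
            'oldest_segment_age_hours': (
                # An empty buffer has no oldest segment
                (datetime.now() - oldest).total_seconds() / 3600 if oldest else 0
            ),
            'write_rate_segments_per_hour': self.calculate_write_rate(stats)
        }

    def calculate_write_rate(self, stats):
        # Assumed definition: average segments per buffered hour
        hours = stats['total_duration_hours']
        return stats['segment_count'] / hours if hours else 0

    def check_health(self, metrics):
        issues = []
        if metrics['storage_usage_percent'] > 90:
            issues.append('Storage nearly full')
        if metrics['retention_actual_hours'] < metrics['retention_target_hours'] * 0.9:
            issues.append('Not retaining enough history')
        return {
            'healthy': len(issues) == 0,
            'issues': issues,
            'metrics': metrics
        }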
Conclusion
Rolling buffers are essential for modern video streaming applications, enabling:
- Time-shift playback: Watch live content from any point in the buffer
- Cloud DVR: Personal recording without local storage
- Instant replay: Quick access to recent content
- Efficient storage: Automatic cleanup of old data
- Predictable costs: Fixed storage requirements
Getting Started with VideoBuffer
VideoBuffer provides fully managed rolling buffer functionality:
- Configurable retention (1 hour to 30 days)
- Automatic segment management
- Time-shift playback API
- Cloud storage optimization
- No infrastructure management
Start your free trial and implement rolling buffers in minutes, not months.