YouTube Video Collection at Scale: A Complete Python Pipeline with Residential Proxy Integration
This is a practical guide for engineers building video collection pipelines. Not theory. Code that runs. Configuration that works. Metrics that matter.
The pipeline has three stages: discovery, validation, and download. Each stage has specific proxy requirements. Discovery requires maximum IP distribution. Validation requires moderate distribution with regional targeting. Download requires session stability for large file completion.
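The stage-to-proxy mapping described above can be written down as a small lookup table. This is a minimal sketch: the `ProxyPolicy` class, its field names, and the `"per_request"` / `"sticky"` labels are assumptions made for illustration, not ThorData parameters.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProxyPolicy:
    """Hypothetical description of what one stage needs from the proxy layer."""
    rotation: str  # "per_request" or "sticky" (labels invented for this sketch)
    reason: str    # requirement as stated in the stage description


# One entry per pipeline stage, following the requirements listed above.
STAGE_POLICIES = {
    "discovery": ProxyPolicy("per_request", "maximum IP distribution"),
    "validation": ProxyPolicy("per_request", "moderate distribution with regional targeting"),
    "download": ProxyPolicy("sticky", "session stability for large file completion"),
}


def policy_for(stage: str) -> ProxyPolicy:
    """Look up a stage's policy, failing loudly on an unknown stage name."""
    try:
        return STAGE_POLICIES[stage]
    except KeyError:
        raise ValueError(f"unknown stage: {stage!r}") from None
```

Keeping the policy in one place means a stage never has to hard-code its own rotation choice.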
Stage 1: Discovery
The discovery stage finds candidate videos matching training requirements. It executes search queries across multiple platforms and geographic regions. The goal is comprehensive coverage, not precision. We collect metadata for thousands of candidates, then filter in subsequent stages.
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone

import requests

# Placeholder credentials and gateway address.
THORDATA_BASE = "http://user:pass@gate.thordata.com:10000"
# NOTE: the "&country=...", "&rotation=..." and "&session=..." suffixes used in
# this article are placeholders for provider-specific targeting options.
# Appended to a URL with no query string they do not form a valid proxy URL;
# substitute the syntax from the provider's documentation before running.

# One user agent per region; unknown regions fall back to the "us" entry.
REGIONAL_UA = {
    "us": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "jp": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "gb": "Mozilla/5.0 (X11; Linux x86_64)",
}

# Accept-Language expects language tags, not country codes.
REGION_LANG = {
    "us": "en-US", "gb": "en-GB", "jp": "ja-JP", "in": "en-IN", "br": "pt-BR",
}


class DiscoveryEngine:
    def __init__(self, scenarios, regions):
        self.scenarios = scenarios  # e.g. ["cooking", "driving", "construction"]
        self.regions = regions      # e.g. ["us", "gb", "jp", "in", "br"]

    def discover_all(self, max_per_scenario=10000):
        """
        Parallel discovery across scenarios and regions.
        Each query is meant to use a distinct IP for maximum distribution
        (this depends on the gateway's rotation setting).
        """
        all_candidates = []
        with ThreadPoolExecutor(max_workers=50) as executor:
            futures = []
            for scenario in self.scenarios:
                for region in self.regions:
                    future = executor.submit(
                        self._discover_scenario_region,
                        scenario, region, max_per_scenario
                    )
                    futures.append((scenario, region, future))
            for scenario, region, future in futures:
                try:
                    candidates = future.result()
                except (requests.RequestException, ValueError) as exc:
                    # One failed query should not abort the whole round.
                    print(f"Discovery failed for {scenario} in {region}: {exc}")
                    continue
                all_candidates.extend(candidates)
                print(f"Discovered {len(candidates)} for {scenario} in {region}")
        # Deduplicate by video ID
        seen = set()
        unique = []
        for c in all_candidates:
            vid = self._extract_id(c["url"])
            if vid not in seen:
                seen.add(vid)
                unique.append(c)
        return unique

    def _discover_scenario_region(self, scenario, region, max_results):
        """
        Execute search with per-request IP rotation.
        """
        proxy = f"{THORDATA_BASE}&country={region}"
        session = requests.Session()
        session.proxies = {"http": proxy, "https": proxy}
        session.headers.update({
            "User-Agent": REGIONAL_UA.get(region, REGIONAL_UA["us"]),
            "Accept-Language": f"{REGION_LANG.get(region, 'en-US')},en;q=0.9"
        })
        candidates = []
        page = 0
        while len(candidates) < max_results:
            # Google Video search via SERP API
            response = session.get(
                "https://serpapi.com/search",
                params={
                    "engine": "google",
                    "q": f"{scenario} tutorial video site:youtube.com",
                    "tbm": "vid",
                    "num": 100,
                    "start": page * 100,
                    "gl": region,
                    "api_key": "your_key"
                },
                timeout=30
            )
            response.raise_for_status()
            results = response.json().get("video_results", [])
            if not results:
                break
            for item in results:
                link = item.get("link")
                if not link:
                    continue  # Skip results without a usable URL
                candidates.append({
                    "url": link,
                    "title": item.get("title", ""),
                    "duration": item.get("duration", "0:00"),
                    "thumbnail": item.get("thumbnail", ""),
                    "region": region,
                    "scenario": scenario,
                    # UTC timestamp of when this candidate was found
                    "discovered_at": datetime.now(timezone.utc).isoformat()
                })
            page += 1
            # Brief pause between pages (human-like)
            time.sleep(2)
        return candidates[:max_results]

    def _extract_id(self, url):
        """Extract YouTube video ID."""
        if "v=" in url:
            return url.split("v=")[1].split("&")[0]
        if "youtu.be/" in url:
            # Drop any query string such as "?t=30"
            return url.split("youtu.be/")[1].split("?")[0]
        return url
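The `_extract_id` helper relies on substring checks, so a URL containing `rev=` would also match `"v="`. A stricter variant can parse the URL first. The sketch below uses only the standard library; the function names `extract_video_id` and `dedupe_by_video_id` are made up for this example, and the `/shorts/` and `/embed/` path handling is an assumption about which URL shapes are worth covering.

```python
from urllib.parse import urlparse, parse_qs


def extract_video_id(url: str) -> str:
    """Return the YouTube video ID, or the URL itself if none is recognised."""
    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()
    if host == "youtu.be":
        # Short links carry the ID as the first path segment.
        vid = parsed.path.lstrip("/").split("/")[0]
        return vid or url
    if host == "youtube.com" or host.endswith(".youtube.com"):
        query = parse_qs(parsed.query)
        if "v" in query:
            return query["v"][0]
        parts = [p for p in parsed.path.split("/") if p]
        if len(parts) >= 2 and parts[0] in ("shorts", "embed"):
            return parts[1]
    return url


def dedupe_by_video_id(candidates):
    """Keep the first candidate seen for each video ID, preserving order."""
    seen = set()
    unique = []
    for candidate in candidates:
        vid = extract_video_id(candidate["url"])
        if vid not in seen:
            seen.add(vid)
            unique.append(candidate)
    return unique
```

With this version, `https://www.youtube.com/watch?v=abc123&t=10s` and `https://youtu.be/abc123?t=5` both resolve to `abc123` and collapse into one candidate.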
Stage 2: Validation
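Validation filters candidates by quality criteria relevant to training value: duration, resolution indicators, content signals, and geographic diversity. Deduplication against an existing corpus is not shown here; the discovery stage only deduplicates by video ID within a run.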
class ValidationEngine:
    def __init__(self, min_duration=30, max_duration=600,
                 min_resolution_indicator=480):
        self.min_duration = min_duration  # seconds
        self.max_duration = max_duration  # seconds
        self.min_resolution = min_resolution_indicator

    def validate_batch(self, candidates):
        """
        Filter candidates by training value criteria.
        """
        valid = []
        for candidate in candidates:
            score = 0
            reasons = []
            # Duration check
            seconds = self._parse_duration(candidate["duration"])
            if self.min_duration <= seconds <= self.max_duration:
                score += 25
                reasons.append("optimal_duration")
            else:
                continue  # Hard filter
            # Resolution estimation from thumbnail
            resolution_score = self._estimate_resolution(
                candidate.get("thumbnail", "")
            )
            if resolution_score >= self.min_resolution:
                score += 25
                reasons.append("adequate_resolution")
            else:
                score += 10
                reasons.append("low_resolution")
            # Content signal from title
            content_score = self._content_signals(candidate["title"])
            score += content_score
            # Geographic diversity bonus
            if candidate["region"] not in self._common_regions():
                score += 15
                reasons.append("diverse_region")
            candidate["quality_score"] = score
            candidate["validation_reasons"] = reasons
            valid.append(candidate)
        # Sort by quality score
        valid.sort(key=lambda x: x["quality_score"], reverse=True)
        return valid

    def _parse_duration(self, duration_str):
        """Convert "M:SS" or "H:MM:SS" to seconds; return 0 if unparseable."""
        try:
            parts = [int(p) for p in str(duration_str).split(":")]
        except ValueError:
            return 0
        if len(parts) == 2:
            return parts[0] * 60 + parts[1]
        if len(parts) == 3:
            return parts[0] * 3600 + parts[1] * 60 + parts[2]
        return 0

    def _estimate_resolution(self, thumbnail_url):
        """
        Estimate video resolution from the thumbnail variant named in the URL.
        Higher resolution thumbnails suggest higher quality source.
        """
        try:
            # Quick HEAD request to confirm the thumbnail actually exists
            proxy = f"{THORDATA_BASE}&rotation=per_request"
            resp = requests.head(
                thumbnail_url,
                proxies={"http": proxy, "https": proxy},
                timeout=5
            )
            if resp.status_code != 200:
                return 240
        except (requests.RequestException, ValueError):
            # Unreachable or malformed thumbnail URL: assume lowest tier
            return 240
        # Check URL patterns indicating resolution
        if "maxresdefault" in thumbnail_url:
            return 1080
        elif "sddefault" in thumbnail_url:
            return 480
        elif "hqdefault" in thumbnail_url:
            return 360
        return 240

    def _content_signals(self, title):
        """Score content type by training value."""
        title_lower = title.lower()
        score = 0
        # Action-oriented content (high training value)
        action_words = ["tutorial", "how to", "driving", "cooking",
                        "playing", "building", "making", "walking"]
        for word in action_words:
            if word in title_lower:
                score += 10
        # First-person indicators (valuable for egocentric models)
        if any(x in title_lower for x in ["pov", "gopro", "first person", "my"]):
            score += 15
        # Static/low-motion indicators (lower training value)
        static_words = ["slideshow", "review", "unboxing", "reaction"]
        for word in static_words:
            if word in title_lower:
                score -= 10
        return max(0, score)

    def _common_regions(self):
        """Regions likely overrepresented in corpus."""
        return ["us", "gb"]
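The title checks in `_content_signals` are plain substring tests, so `"my"` also fires inside "academy" and `"pov"` inside "poverty". One possible refinement, sketched here with the same word lists and weights, is to match on word boundaries. The standalone function name `content_signals` and the helper `_has_term` are invented for this example.

```python
import re

ACTION_WORDS = ["tutorial", "how to", "driving", "cooking",
                "playing", "building", "making", "walking"]
FIRST_PERSON = ["pov", "gopro", "first person", "my"]
STATIC_WORDS = ["slideshow", "review", "unboxing", "reaction"]


def _has_term(term: str, text: str) -> bool:
    """True if `term` appears in `text` as a whole word or phrase."""
    return re.search(rf"\b{re.escape(term)}\b", text) is not None


def content_signals(title: str) -> int:
    """Score a title: +10 per action word, +15 once for first-person, -10 per static word."""
    text = title.lower()
    score = 10 * sum(_has_term(word, text) for word in ACTION_WORDS)
    if any(_has_term(word, text) for word in FIRST_PERSON):
        score += 15
    score -= 10 * sum(_has_term(word, text) for word in STATIC_WORDS)
    return max(0, score)
```

The trade-off is that plurals no longer match: "tutorials" or "reviews" would be missed unless added to the lists explicitly.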
Stage 3: Download
Download uses sticky sessions for file completion stability. Each video gets a dedicated session key maintaining consistent IP throughout the transfer.
import os
from concurrent.futures import ThreadPoolExecutor

import yt_dlp


class DownloadEngine:
    def __init__(self, output_base="./downloads", quality="720"):
        self.output_base = output_base
        self.quality = quality  # Maximum video height in pixels
        os.makedirs(output_base, exist_ok=True)

    def download_validated(self, validated_videos, max_concurrent=10):
        """
        Concurrent download with session-per-video stability.
        Returns a list of ("success", info) or ("failed", details) tuples.
        """
        with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
            futures = {
                executor.submit(self._download_single, video): video
                for video in validated_videos
            }
            results = []
            for future in futures:
                video = futures[future]
                try:
                    result = future.result()
                    results.append(("success", result))
                except Exception as e:
                    results.append(("failed", {
                        "video": video,
                        "error": str(e)
                    }))
        return results

    def _download_single(self, video):
        """
        Download with sticky session for connection stability.
        """
        video_id = self._extract_id(video["url"])
        scenario = video["scenario"]
        region = video["region"]
        # Create organized output path
        out_dir = os.path.join(self.output_base, scenario, region)
        os.makedirs(out_dir, exist_ok=True)
        # Sticky session key for this download
        session_key = f"dl_{scenario}_{region}_{video_id[:8]}"
        sticky_proxy = f"{THORDATA_BASE}&session={session_key}"
        ydl_opts = {
            'format': f'best[height<={self.quality}]',
            'proxy': sticky_proxy,
            'outtmpl': os.path.join(out_dir, '%(id)s_%(title)s.%(ext)s'),
            # Metadata for training alignment
            'writethumbnail': True,
            'writeinfojson': True,
            'writesubtitles': True,
            'writeautomaticsub': True,
            # Reliability settings
            'retries': 5,
            'fragment_retries': 5,
            'skip_unavailable_fragments': True,
            # Post-processing ('preferedformat' is yt-dlp's own spelling of the key)
            'postprocessors': [
                {'key': 'FFmpegVideoConvertor', 'preferedformat': 'mp4'}
            ],
            'quiet': True
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video["url"], download=True)
            return {
                "video_id": video_id,
                # Path from the output template; after conversion the file
                # on disk may carry the .mp4 extension instead.
                "file_path": ydl.prepare_filename(info),
                "duration": info.get("duration"),
                "resolution": info.get("resolution"),
                "fps": info.get("fps"),
                "region": region,
                "scenario": scenario
            }

    def _extract_id(self, url):
        """Extract YouTube video ID."""
        if "v=" in url:
            return url.split("v=")[1].split("&")[0]
        if "youtu.be/" in url:
            # Drop any query string such as "?t=30"
            return url.split("youtu.be/")[1].split("?")[0]
        return url
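The `(status, payload)` tuples returned by `download_validated` can be reduced to a summary plus a retry queue. This is a minimal sketch; the function name `summarize_results` and the keys of the returned dict are assumptions made for this example.

```python
def summarize_results(results):
    """Summarise ("success", info) / ("failed", {"video": ..., "error": ...}) tuples."""
    succeeded = [payload for status, payload in results if status == "success"]
    failed = [payload for status, payload in results if status == "failed"]
    total = len(results)
    return {
        "success": len(succeeded),
        "failed": len(failed),
        # Guard against an empty batch instead of dividing by zero.
        "success_rate": (len(succeeded) / total * 100) if total else 0.0,
        # Original candidate dicts, ready to pass to another download attempt.
        "retry_queue": [payload["video"] for payload in failed],
    }
```

Passing `retry_queue` back into `download_validated` gives a simple second attempt for transient failures.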
Pipeline Orchestration
class CollectionPipeline:
    def __init__(self):
        self.discovery = DiscoveryEngine(
            scenarios=["cooking", "driving", "construction", "sports"],
            regions=["us", "gb", "jp", "in", "br", "de", "fr", "ng", "id", "mx"]
        )
        self.validation = ValidationEngine()
        self.download = DownloadEngine()

    def run_collection_round(self, target_total=50000):
        """
        Execute full pipeline: discover, validate, download.
        """
        print("Stage 1: Discovery")
        candidates = self.discovery.discover_all(max_per_scenario=10000)
        print(f"Discovered {len(candidates)} candidates")
        print("Stage 2: Validation")
        validated = self.validation.validate_batch(candidates)
        print(f"Validated {len(validated)} candidates")
        # Select top candidates to reach target
        selected = validated[:target_total]
        print("Stage 3: Download")
        results = self.download.download_validated(selected)
        success = sum(1 for r in results if r[0] == "success")
        failed = sum(1 for r in results if r[0] == "failed")
        print(f"Downloaded: {success}, Failed: {failed}")
        if results:  # Avoid dividing by zero when nothing was selected
            print(f"Success rate: {success / len(results) * 100:.1f}%")
        return results
Performance Characteristics
Running this pipeline with ThorData residential proxies on a 16-core server:
| Configuration | Daily Volume | Block Rate | Success Rate | Avg Speed |
| --- | --- | --- | --- | --- |
| Discovery (50 workers) | 50,000 URLs | 0.1% | 99.9% | 2.1s/query |
| Validation (20 workers) | 20,000 checks | 0.3% | 99.7% | 1.8s/check |
| Download (10 concurrent) | 8,000 files | 0.4% | 99.6% | 45s/file |
The total sustainable throughput is approximately 8,000 completed downloads daily per server instance. Scaling horizontally with multiple instances achieves 50,000+ daily downloads with appropriate infrastructure.
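As a rough sizing check, the instance count for the 50,000-per-day figure follows from the per-instance rate. The sketch below assumes throughput scales linearly with instance count, which the source does not state; the function name `instances_needed` is made up for this example.

```python
import math

PER_INSTANCE_DAILY = 8_000  # completed downloads per server per day (from the table)
TARGET_DAILY = 50_000       # target daily downloads


def instances_needed(target: int, per_instance: int) -> int:
    """Smallest whole number of instances that meets the target, assuming linear scaling."""
    if per_instance <= 0:
        raise ValueError("per_instance must be positive")
    return math.ceil(target / per_instance)


print(instances_needed(TARGET_DAILY, PER_INSTANCE_DAILY))  # 7
```

50,000 / 8,000 = 6.25, so seven instances are the minimum under that assumption; shared limits such as proxy bandwidth or storage could push the real number higher.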
Configure your discovery pipeline with ThorData’s rotation options. Set up sticky sessions for reliable downloads. Explore geographic targeting for diverse coverage.
This pipeline runs. It scales. It doesn’t fight platforms. It works with them.