Skip to content

ytdlp Service Deployment Guide

This guide explains how to deploy a yt-dlp-based video parsing service, either locally or in the cloud (for example, on AWS Lambda).

1. Service Source Code

Below is a recommended Python service implementation, ready for local or cloud deployment:

python
import json
import traceback
import yt_dlp

def _build_format_entry(f: dict) -> dict:
    """Project a format dict onto the fixed set of fields the service exposes.

    Args:
        f: A mapping describing one media format (any missing key yields
            ``None`` in the result).

    Returns:
        A new dict with a fixed key order. ``resolution`` falls back to
        ``height`` and ``filesize`` falls back to ``filesize_approx`` when
        the primary value is missing or falsy.
    """
    # (output key, fallback source key or None), in output order.
    field_spec = (
        ('format_id', None),
        ('url', None),
        ('ext', None),
        ('resolution', 'height'),
        ('width', None),
        ('height', None),
        ('fps', None),
        ('vcodec', None),
        ('acodec', None),
        ('abr', None),
        ('vbr', None),
        ('tbr', None),
        ('filesize', 'filesize_approx'),
        ('protocol', None),
        ('language', None),
        ('format_note', None),
    )
    entry = {}
    for key, fallback in field_spec:
        value = f.get(key)
        if fallback is not None and not value:
            # Same semantics as `f.get(key) or f.get(fallback)`.
            value = f.get(fallback)
        entry[key] = value
    return entry

def resolve(url: str) -> dict:
    """Extract metadata for a video page URL without downloading any media.

    Runs ``yt_dlp.YoutubeDL.extract_info`` with ``download=False`` and
    reshapes the returned info dict into a JSON-friendly summary.

    Args:
        url: The video page URL to inspect.

    Returns:
        A dict with basic metadata (title, description, thumbnails, ...),
        the selected format(s) under ``best``, all listed formats split into
        ``videos`` (video only), ``audios`` (audio only) and ``combined``
        (both), plus ``subtitles``, ``auto_subtitles``, ``danmaku_url``,
        ``comments`` and ``chapters``.

    Raises:
        Whatever ``yt_dlp`` raises during extraction; nothing is caught here.
    """

    def _tracks_by_language(section):
        # Reduce each subtitle track to the three fields the API exposes.
        return {
            language: [
                {'ext': track.get('ext'), 'url': track.get('url'), 'name': track.get('name')}
                for track in tracks
            ]
            for language, tracks in (section or {}).items()
        }

    options = {
        'quiet': True,
        'no_warnings': True,
        'noplaylist': True,
        'format': 'best/bestvideo+bestaudio',
        'skip_download': True,
        'writesubtitles': True,
        'writeautomaticsub': True,
        # Relative path: resolved against the process working directory.
        'cookiefile': 'cookie.txt',
    }
    with yt_dlp.YoutubeDL(options) as extractor:
        metadata = extractor.extract_info(url, download=False)

        # A merged selection lists its parts under 'requested_formats';
        # otherwise the info dict itself describes the chosen format.
        if 'requested_formats' in metadata:
            selected = metadata['requested_formats']
        else:
            selected = [metadata]
        best = [_build_format_entry(fmt) for fmt in selected]

        video_only, audio_only, muxed = [], [], []
        # (has video, has audio) -> destination list; formats with neither
        # stream have no bucket and are dropped.
        buckets = {
            (True, True): muxed,
            (True, False): video_only,
            (False, True): audio_only,
        }
        for fmt in metadata.get('formats') or []:
            entry = _build_format_entry(fmt)
            video_codec = fmt.get('vcodec', 'none')
            audio_codec = fmt.get('acodec', 'none')
            kind = (
                bool(video_codec and video_codec != 'none'),
                bool(audio_codec and audio_codec != 'none'),
            )
            destination = buckets.get(kind)
            if destination is not None:
                destination.append(entry)

        subtitles = _tracks_by_language(metadata.get('subtitles'))
        auto_subtitles = _tracks_by_language(metadata.get('automatic_captions'))

        # First subtitle language whose name mentions "danmaku", if any.
        danmaku_tracks = next(
            (
                tracks
                for language, tracks in (metadata.get('subtitles') or {}).items()
                if 'danmaku' in language.lower()
            ),
            None,
        )
        danmaku_url = danmaku_tracks[0].get('url') if danmaku_tracks else None

        comments = metadata.get('comments')

        thumbnails = []
        for thumb in metadata.get('thumbnails') or []:
            thumbnails.append({
                'url': thumb.get('url'),
                'width': thumb.get('width'),
                'height': thumb.get('height'),
                'id': thumb.get('id'),
            })

        chapters = []
        for chapter in metadata.get('chapters') or []:
            chapters.append({
                'title': chapter.get('title'),
                'start_time': chapter.get('start_time'),
                'end_time': chapter.get('end_time'),
            })

        summary = {
            field: metadata.get(field)
            for field in ('title', 'description', 'thumbnail')
        }
        summary['thumbnails'] = thumbnails
        for field in ('duration', 'uploader', 'upload_date', 'view_count',
                      'like_count', 'webpage_url'):
            summary[field] = metadata.get(field)
        summary['best'] = best
        summary['videos'] = video_only
        summary['audios'] = audio_only
        summary['combined'] = muxed
        summary['subtitles'] = subtitles
        summary['auto_subtitles'] = auto_subtitles
        summary['danmaku_url'] = danmaku_url
        summary['comments'] = comments
        summary['chapters'] = chapters
        return summary

def handler(event, context):
    """AWS Lambda entry point: resolve the ``url`` query parameter.

    Args:
        event: The invocation event; ``queryStringParameters`` is read from
            it and may be missing or ``None``.
        context: The Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` and a JSON string ``body``:
        200 with the resolved metadata, 400 when ``url`` is missing or
        blank, 500 with the error message when resolution fails.
    """
    params = event.get('queryStringParameters') or {}
    # The key may be present with a null value; treat that like a missing
    # parameter instead of crashing on None.strip().
    url = (params.get('url') or '').strip()
    print(f"Received URL: {url}")
    if not url:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Missing url parameter'}),
        }
    try:
        result = resolve(url)
        return {
            'statusCode': 200,
            'body': json.dumps(result, ensure_ascii=False),
        }
    except Exception as e:
        # Top-level boundary: log the traceback and report the failure to
        # the caller rather than letting the invocation error out.
        traceback.print_exc()
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)}),
        }

if __name__ == '__main__':
    import sys
    import os

    # NOTE(review): this branch only runs when the file is executed as a
    # script while AWS_LAMBDA_FUNCTION_NAME is set; it resolves one URL from
    # argv and prints the result. Without that variable the script starts
    # the local HTTP server below.
    is_lambda = os.environ.get('AWS_LAMBDA_FUNCTION_NAME') is not None
    if is_lambda:
        test_url = sys.argv[1] if len(sys.argv) > 1 else 'https://example.com/video'
        result = resolve(test_url)
        print(json.dumps(result, ensure_ascii=False, indent=2))
    else:
        from http.server import BaseHTTPRequestHandler, HTTPServer
        from urllib.parse import urlparse, parse_qs

        class VideoResolverHandler(BaseHTTPRequestHandler):
            """Serve ``GET /?url=<video_page_URL>`` as a JSON API."""

            def do_GET(self):
                """Resolve the ``url`` query parameter and reply with JSON.

                Responds 200 with the metadata, 400 when ``url`` is missing
                or blank, and 500 with the error message when resolution
                fails.
                """
                query = parse_qs(urlparse(self.path).query)
                url = query.get('url', [''])[0].strip()
                # Build the body before sending anything: the status line
                # cannot be changed once it has been written, so a failure
                # in resolve() must be known before send_response().
                if not url:
                    status = 400
                    body = json.dumps({'error': 'Missing url parameter'})
                else:
                    try:
                        body = json.dumps(resolve(url), ensure_ascii=False)
                        status = 200
                    except Exception as e:
                        traceback.print_exc()
                        status = 500
                        body = json.dumps({'error': str(e)})
                payload = body.encode('utf-8')
                self.send_response(status)
                self.send_header('Content-Type', 'application/json; charset=utf-8')
                self.send_header('Content-Length', str(len(payload)))
                self.end_headers()
                self.wfile.write(payload)

        port = int(os.environ.get('PORT', 8080))
        print(f"Starting HTTP server on port {port}...")
        server = HTTPServer(('0.0.0.0', port), VideoResolverHandler)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("Shutting down server.")
        finally:
            # Release the listening socket however serve_forever() exits.
            server.server_close()

2. Local Deployment

  1. Install dependencies:
    bash
    pip install yt-dlp
  2. Run the service:
    bash
    python ytdlp_service.py
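  3. Access the API: Open your browser and visit http://localhost:8080/?url=<video_page_URL>. URL-encode the video page URL so that any `&` or `?` it contains is not parsed as part of the request's own query string. The server listens on port 8080 by default; set the PORT environment variable to use a different port.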

3. Cloud Deployment (e.g. AWS Lambda)

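  • Upload the above code as a Lambda function and set the function's handler (entry point) to the `handler` function, e.g. `ytdlp_service.handler` if the file is named ytdlp_service.py.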
  • Make sure to include the yt-dlp dependency in your deployment package.
  • Trigger via API Gateway, with parameter ?url=<video_page_URL>.

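  • Supports cookie.txt for sites requiring login: the code refers to it by the relative path `cookie.txt`, so place the file in the directory the service runs from.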
  • Supports parsing video, audio, subtitles, danmaku, and more.
  • You can modify the code as needed for your use case.

For questions or feedback, please contact us.