Add timestamp-aware clipping for shared ?t= links

When a YouTube link carries a t=/start= offset, download only the 60s
window starting at that offset via yt-dlp --download-sections instead of
the whole video. Hour-long uploads shared at a timestamp previously
exceeded the 100 MB Signal limit even after re-encoding.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-15 16:31:09 -04:00
parent 0f9030b72e
commit f9e73333ba
+82 -23
View File
@@ -17,6 +17,7 @@ YOUTUBE_URL_PATTERN = r"https?://(?:www\.)?(?:youtube\.com/(?:watch\?v=|shorts/)
TIKTOK_URL_PATTERN = r"https?://(?:(?:www|m)\.tiktok\.com/(?:@[\w.-]+/video/\d+|t/\w+|v/\d+)|(?:vm|vt)\.tiktok\.com/\w+)" TIKTOK_URL_PATTERN = r"https?://(?:(?:www|m)\.tiktok\.com/(?:@[\w.-]+/video/\d+|t/\w+|v/\d+)|(?:vm|vt)\.tiktok\.com/\w+)"
VIDEO_URL_PATTERN = rf"(?:{TWITTER_URL_PATTERN}|{INSTAGRAM_URL_PATTERN}|{YOUTUBE_URL_PATTERN}|{TIKTOK_URL_PATTERN})" VIDEO_URL_PATTERN = rf"(?:{TWITTER_URL_PATTERN}|{INSTAGRAM_URL_PATTERN}|{YOUTUBE_URL_PATTERN}|{TIKTOK_URL_PATTERN})"
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB
CLIP_DURATION = 60 # seconds to grab around a shared ?t= timestamp
YTDLP = os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp") YTDLP = os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp")
COOKIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt") COOKIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt")
ADMIN_NUMBERS = {n.strip() for n in os.environ.get("BOT_ADMINS", "").split(",") if n.strip()} ADMIN_NUMBERS = {n.strip() for n in os.environ.get("BOT_ADMINS", "").split(",") if n.strip()}
@@ -102,6 +103,27 @@ def _summarize_ytdlp_error(stderr: str) -> str:
return lines[-1][:240] if lines else "unknown error" return lines[-1][:240] if lines else "unknown error"
def _parse_timestamp(value: str) -> int | None:
"""Parse a YouTube timestamp into seconds.
Accepts plain seconds ("1509", "90s") and the h/m/s form ("25m9s",
"1h2m3s"). Returns None for anything that isn't a recognizable time.
"""
if not value:
return None
m = re.fullmatch(r"(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s?)?", value.strip(), re.IGNORECASE)
if not m or not any(m.groups()):
return None
h, mi, s = (int(g) if g else 0 for g in m.groups())
return h * 3600 + mi * 60 + s
def _extract_timestamp(url_token: str) -> int | None:
    """Return the start offset, in seconds, named by a URL's t=/start= param.

    The parameter must directly follow a "?", "&" or "#" delimiter. Only the
    first such parameter found is considered; the result is None when there
    is no such parameter or its value is not a recognizable time.
    """
    found = re.search(r"[?&#](?:t|start)=([0-9hms]+)", url_token, re.IGNORECASE)
    if found is None:
        return None
    return _parse_timestamp(found.group(1))
class VideoTracker(Command): class VideoTracker(Command):
"""Watches all group messages for video attachments and stores the last one.""" """Watches all group messages for video attachments and stores the last one."""
async def handle(self, c: Context) -> None: async def handle(self, c: Context) -> None:
@@ -134,13 +156,25 @@ class VideoCommand(Command):
if not c.message.is_group(): if not c.message.is_group():
return return
urls = re.findall(VIDEO_URL_PATTERN, c.message.text) matches = list(re.finditer(VIDEO_URL_PATTERN, c.message.text))
if not urls: if not matches:
return return
is_edit = c.message.type == MessageType.EDIT_MESSAGE is_edit = c.message.type == MessageType.EDIT_MESSAGE
for url in urls: for m in matches:
url = m.group(0)
# The URL pattern stops at the video id, so any ?t=/&t= timestamp
# lives in the characters that follow. Grab the whole whitespace-
# delimited token to recover it. Only YouTube uses these offsets.
token = re.match(r"\S+", c.message.text[m.start():]).group(0)
clip_start = (
_extract_timestamp(token)
if re.match(YOUTUBE_URL_PATTERN, url)
else None
)
# Normalize fxtwitter/vxtwitter wrappers to x.com # Normalize fxtwitter/vxtwitter wrappers to x.com
url = re.sub( url = re.sub(
r"https?://(?:www\.)?(?:fxtwitter\.com|vxtwitter\.com|fixupx\.com)", r"https?://(?:www\.)?(?:fxtwitter\.com|vxtwitter\.com|fixupx\.com)",
@@ -153,12 +187,16 @@ class VideoCommand(Command):
continue continue
_mark_url_handled(c.message.group, url) _mark_url_handled(c.message.group, url)
await self._download_and_send(c, url) await self._download_and_send(c, url, clip_start)
async def _download_and_send(self, c: Context, url: str) -> None: async def _download_and_send(self, c: Context, url: str, clip_start: int | None = None) -> None:
clip = None
if clip_start is not None:
clip = (clip_start, clip_start + CLIP_DURATION)
log.info("Clipping %s to %d-%ds around shared timestamp", url, clip[0], clip[1])
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
outpath = os.path.join(tmpdir, "video.mp4") outpath = os.path.join(tmpdir, "video.mp4")
ok, err = self._run_ytdlp(url, outpath, tmpdir) ok, err = self._run_ytdlp(url, outpath, tmpdir, clip)
if not ok: if not ok:
if _is_no_media_error(err): if _is_no_media_error(err):
# Link just has no video (e.g. a text-only tweet). Stay quiet. # Link just has no video (e.g. a text-only tweet). Stay quiet.
@@ -196,9 +234,44 @@ class VideoCommand(Command):
_set_video(c.message.group, b64_video) _set_video(c.message.group, b64_video)
await c.send("", base64_attachments=[b64_video]) await c.send("", base64_attachments=[b64_video])
def _run_ytdlp(self, url: str, outpath: str, tmpdir: str) -> tuple[bool, str]: def _run_ytdlp(self, url: str, outpath: str, tmpdir: str,
clip: tuple[int, int] | None = None) -> tuple[bool, str]:
"""Run yt-dlp with retries. Returns (success, short_reason). """Run yt-dlp with retries. Returns (success, short_reason).
reason is empty on success; otherwise a one-line description suitable for user reply.""" reason is empty on success; otherwise a one-line description suitable for user reply.
When clip is (start, end), only that window is downloaded."""
cmd = [
YTDLP,
"--no-playlist",
# YouTube wraps URLs in a JS "n-sig" challenge; node solves it
# via yt-dlp-ejs. Without this, only image/thumb formats resolve.
"--js-runtimes", "node",
]
if clip is not None:
start, end = clip
# Download only the requested window instead of the whole video —
# essential for hour-long uploads shared with a ?t= timestamp.
# force-keyframes-at-cuts makes the start boundary accurate.
# The filesize filters below key off the *whole* video's size, which
# is irrelevant to a 60s slice, so drop them and just take best mp4.
cmd += [
"--download-sections", f"*{start}-{end}",
"--force-keyframes-at-cuts",
"-f", "best[ext=mp4]/best",
]
else:
# Prefer the largest mp4 that already fits under 95 MB, so we avoid
# re-encoding when a smaller variant exists (e.g. a 4K rendition
# >100 MB alongside a 1080p ~50 MB).
cmd += [
"-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best",
]
cmd += [
"--merge-output-format", "mp4",
*(["--cookies", COOKIES] if os.path.exists(COOKIES) else []),
"-o", outpath,
url,
]
delays = [0, 3, 8] delays = [0, 3, 8]
last_stderr = "" last_stderr = ""
for attempt, delay in enumerate(delays, 1): for attempt, delay in enumerate(delays, 1):
@@ -206,21 +279,7 @@ class VideoCommand(Command):
time.sleep(delay) time.sleep(delay)
try: try:
result = subprocess.run( result = subprocess.run(
[ cmd,
YTDLP,
"--no-playlist",
# YouTube wraps URLs in a JS "n-sig" challenge; node solves it
# via yt-dlp-ejs. Without this, only image/thumb formats resolve.
"--js-runtimes", "node",
# Prefer the largest mp4 that already fits under 95 MB,
# so we avoid re-encoding when a smaller variant exists
# (e.g. a 4K rendition >100 MB alongside a 1080p ~50 MB).
"-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best",
"--merge-output-format", "mp4",
*(["--cookies", COOKIES] if os.path.exists(COOKIES) else []),
"-o", outpath,
url,
],
capture_output=True, capture_output=True,
text=True, text=True,
timeout=120, timeout=120,