From f9e73333ba8d9175fe68db7f0dcc575723271b3b Mon Sep 17 00:00:00 2001
From: James Price
Date: Mon, 15 Jun 2026 16:31:09 -0400
Subject: [PATCH] Add timestamp-aware clipping for shared ?t= links

When a YouTube link carries a non-zero t=/start= offset, download only a
60s window starting at it via yt-dlp --download-sections instead of the
whole video. Hour-long uploads shared at a timestamp previously failed
the 100 MB Signal limit even after re-encoding. A zero offset (the
&t=0s that playlist links carry) still downloads the whole video.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 bot.py | 105 ++++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 82 insertions(+), 23 deletions(-)

diff --git a/bot.py b/bot.py
index 251b145..b219511 100644
--- a/bot.py
+++ b/bot.py
@@ -17,6 +17,7 @@ YOUTUBE_URL_PATTERN = r"https?://(?:www\.)?(?:youtube\.com/(?:watch\?v=|shorts/)
 TIKTOK_URL_PATTERN = r"https?://(?:(?:www|m)\.tiktok\.com/(?:@[\w.-]+/video/\d+|t/\w+|v/\d+)|(?:vm|vt)\.tiktok\.com/\w+)"
 VIDEO_URL_PATTERN = rf"(?:{TWITTER_URL_PATTERN}|{INSTAGRAM_URL_PATTERN}|{YOUTUBE_URL_PATTERN}|{TIKTOK_URL_PATTERN})"
 MAX_FILE_SIZE = 100 * 1024 * 1024  # 100 MB
+CLIP_DURATION = 60  # seconds to grab starting at a shared ?t= timestamp
 YTDLP = os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp")
 COOKIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt")
 ADMIN_NUMBERS = {n.strip() for n in os.environ.get("BOT_ADMINS", "").split(",") if n.strip()}
@@ -102,6 +103,27 @@ def _summarize_ytdlp_error(stderr: str) -> str:
     return lines[-1][:240] if lines else "unknown error"
 
 
+def _parse_timestamp(value: str) -> int | None:
+    """Parse a YouTube timestamp into seconds.
+
+    Accepts plain seconds ("1509", "90s") and the h/m/s form ("25m9s",
+    "1h2m3s"). Returns None for anything that isn't a recognizable time.
+    """
+    if not value:
+        return None
+    m = re.fullmatch(r"(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s?)?", value.strip(), re.IGNORECASE)
+    if not m or not any(m.groups()):
+        return None
+    h, mi, s = (int(g) if g else 0 for g in m.groups())
+    return h * 3600 + mi * 60 + s
+
+
+def _extract_timestamp(url_token: str) -> int | None:
+    """Return the t=/start= offset (seconds) found in a YouTube URL token, or None if absent/unparseable."""
+    m = re.search(r"[?&#](?:t|start)=([0-9hms]+)", url_token, re.IGNORECASE)
+    return _parse_timestamp(m.group(1)) if m else None
+
+
 class VideoTracker(Command):
     """Watches all group messages for video attachments and stores the last one."""
     async def handle(self, c: Context) -> None:
@@ -134,13 +156,25 @@ class VideoCommand(Command):
         if not c.message.is_group():
             return
 
-        urls = re.findall(VIDEO_URL_PATTERN, c.message.text)
-        if not urls:
+        matches = list(re.finditer(VIDEO_URL_PATTERN, c.message.text))
+        if not matches:
             return
 
         is_edit = c.message.type == MessageType.EDIT_MESSAGE
 
-        for url in urls:
+        for m in matches:
+            url = m.group(0)
+
+            # The URL pattern stops at the video id, so any ?t=/&t= timestamp
+            # lives in the characters that follow. Grab the whole whitespace-
+            # delimited token to recover it. Only YouTube uses these offsets.
+            token = re.match(r"\S+", c.message.text[m.start():]).group(0)
+            clip_start = (
+                _extract_timestamp(token)
+                if re.match(YOUTUBE_URL_PATTERN, url)
+                else None
+            )
+
             # Normalize fxtwitter/vxtwitter wrappers to x.com
             url = re.sub(
                 r"https?://(?:www\.)?(?:fxtwitter\.com|vxtwitter\.com|fixupx\.com)",
@@ -153,12 +187,16 @@ class VideoCommand(Command):
                 continue
 
             _mark_url_handled(c.message.group, url)
-            await self._download_and_send(c, url)
+            await self._download_and_send(c, url, clip_start)
 
-    async def _download_and_send(self, c: Context, url: str) -> None:
+    async def _download_and_send(self, c: Context, url: str, clip_start: int | None = None) -> None:
+        clip = None
+        if clip_start:  # None or 0 (playlist links carry &t=0s) means no clip: send the whole video
+            clip = (clip_start, clip_start + CLIP_DURATION)
+            log.info("Clipping %s to %d-%ds from shared timestamp", url, clip[0], clip[1])
         with tempfile.TemporaryDirectory() as tmpdir:
             outpath = os.path.join(tmpdir, "video.mp4")
-            ok, err = self._run_ytdlp(url, outpath, tmpdir)
+            ok, err = self._run_ytdlp(url, outpath, tmpdir, clip)
             if not ok:
                 if _is_no_media_error(err):
                     # Link just has no video (e.g. a text-only tweet). Stay quiet.
@@ -196,9 +234,44 @@ class VideoCommand(Command):
             _set_video(c.message.group, b64_video)
             await c.send("", base64_attachments=[b64_video])
 
-    def _run_ytdlp(self, url: str, outpath: str, tmpdir: str) -> tuple[bool, str]:
+    def _run_ytdlp(self, url: str, outpath: str, tmpdir: str,
+                   clip: tuple[int, int] | None = None) -> tuple[bool, str]:
         """Run yt-dlp with retries. Returns (success, short_reason).
-        reason is empty on success; otherwise a one-line description suitable for user reply."""
+        reason is empty on success; otherwise a one-line description suitable for user reply.
+        When clip is (start, end), only that window is downloaded."""
+        cmd = [
+            YTDLP,
+            "--no-playlist",
+            # YouTube wraps URLs in a JS "n-sig" challenge; node solves it
+            # via yt-dlp-ejs. Without this, only image/thumb formats resolve.
+            "--js-runtimes", "node",
+        ]
+        if clip is not None:
+            start, end = clip
+            # Download only the requested window instead of the whole video —
+            # essential for hour-long uploads shared with a ?t= timestamp.
+            # force-keyframes-at-cuts re-encodes at the cut (slower) so the start is accurate.
+            # The filesize filters below key off the *whole* video's size, which
+            # is irrelevant to a 60s slice, so drop them and just take best mp4.
+            cmd += [
+                "--download-sections", f"*{start}-{end}",
+                "--force-keyframes-at-cuts",
+                "-f", "best[ext=mp4]/best",
+            ]
+        else:
+            # Prefer the largest mp4 that already fits under 95 MB, so we avoid
+            # re-encoding when a smaller variant exists (e.g. a 4K rendition
+            # >100 MB alongside a 1080p ~50 MB).
+            cmd += [
+                "-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best",
+            ]
+        cmd += [
+            "--merge-output-format", "mp4",
+            *(["--cookies", COOKIES] if os.path.exists(COOKIES) else []),
+            "-o", outpath,
+            url,
+        ]
+
         delays = [0, 3, 8]
         last_stderr = ""
         for attempt, delay in enumerate(delays, 1):
@@ -206,21 +279,7 @@ class VideoCommand(Command):
                 time.sleep(delay)
             try:
                 result = subprocess.run(
-                    [
-                        YTDLP,
-                        "--no-playlist",
-                        # YouTube wraps URLs in a JS "n-sig" challenge; node solves it
-                        # via yt-dlp-ejs. Without this, only image/thumb formats resolve.
-                        "--js-runtimes", "node",
-                        # Prefer the largest mp4 that already fits under 95 MB,
-                        # so we avoid re-encoding when a smaller variant exists
-                        # (e.g. a 4K rendition >100 MB alongside a 1080p ~50 MB).
-                        "-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best",
-                        "--merge-output-format", "mp4",
-                        *(["--cookies", COOKIES] if os.path.exists(COOKIES) else []),
-                        "-o", outpath,
-                        url,
-                    ],
+                    cmd,
                     capture_output=True,
                     text=True,
                     timeout=120,