From c7eb101ff0a943d88c053f27995b3db41fee8f8e Mon Sep 17 00:00:00 2001 From: James Price Date: Mon, 15 Jun 2026 18:43:41 -0400 Subject: [PATCH] Fix bug pass: async offload, resource leaks, media/parse correctness, deploy hardening bot.py: - Run all yt-dlp/ffmpeg/ffprobe/file-IO/base64 work off the event loop via asyncio.to_thread, bounded by a Semaphore(2); the loop no longer freezes bot-wide during downloads/encodes. - download_attachments=False; VideoTracker fetches only video attachments lazily instead of the library base64-ing every attachment of every message. - last_video keyed per (group, sender) with group-latest fallback so /speed and /rev stop silently targeting a stranger's video; proactive TTL sweeps free the big base64 blobs and bound recent_urls growth. - _reencode guards 0/NaN ffprobe duration and adds -maxrate/-bufsize; /rev now size-checks + re-encodes + faststart like the other paths. - Twitter URL regex no longer merges space-separated links (keeps i/web/status); dedupe repeated URLs within a message; mark-handled only on success/no-media so a corrective edit can retry; 'unsupported url' surfaced instead of silently dropped. - All sends wrapped (catch SendMessageError); base64 decode guarded; edit/sync attachment envelopes handled; /cookies temp file created 0600. deploy: - Pin signalbot==1.1.0, yt-dlp floor, add missing yt-dlp-ejs. - Pin signal-cli-rest-api by digest + add /v1/health healthcheck. - Restart=always (the library never exits non-zero when wedged). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- bot.py | 681 +++++++++++++++++++++++-------------- docker-compose.yml | 12 +- requirements.txt | 7 +- signal-bot.service.example | 4 +- 4 files changed, 439 insertions(+), 265 deletions(-) diff --git a/bot.py b/bot.py index 51e5bdb..8fafbf6 100644 --- a/bot.py +++ b/bot.py @@ -1,4 +1,6 @@ +import asyncio import base64 +import binascii import json import logging import os @@ -11,7 +13,11 @@ from signalbot import Command, Context, SignalBot from signalbot.command import regex_triggered, triggered from signalbot.message import MessageType -TWITTER_URL_PATTERN = r"https?://(?:www\.)?(?:twitter\.com|x\.com|fxtwitter\.com|vxtwitter\.com|fixupx\.com)/.+/status/\d+" +# Path segments before /status/ are [^/\s]+ (not .+): a greedy .+ matched across +# spaces, merging two space-separated tweet links into one malformed URL. The +# (?:/[^/\s]+)* allows multi-segment forms like x.com/i/web/status/ while +# still refusing whitespace (so two links can't merge). +TWITTER_URL_PATTERN = r"https?://(?:www\.)?(?:twitter\.com|x\.com|fxtwitter\.com|vxtwitter\.com|fixupx\.com)/[^/\s]+(?:/[^/\s]+)*/status/\d+" INSTAGRAM_URL_PATTERN = r"https?://(?:www\.)?instagram\.com/(?:reel|p)/[\w-]+" YOUTUBE_URL_PATTERN = r"https?://(?:www\.)?(?:youtube\.com/(?:watch\?v=|shorts/)|youtu\.be/)[\w-]+" TIKTOK_URL_PATTERN = r"https?://(?:(?:www|m)\.tiktok\.com/(?:@[\w.-]+/video/\d+|t/\w+|v/\d+)|(?:vm|vt)\.tiktok\.com/\w+)" @@ -19,67 +25,120 @@ VIDEO_URL_PATTERN = rf"(?:{TWITTER_URL_PATTERN}|{INSTAGRAM_URL_PATTERN}|{YOUTUBE MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB CLIP_DURATION = 60 # default seconds to grab around a shared ?t= timestamp MAX_CLIP_DURATION = 600 # ceiling for a user-supplied /clip override +# Cap simultaneous heavy media jobs (yt-dlp/ffmpeg) so a busy group can't spawn +# unbounded subprocesses. Each job runs in a worker thread via asyncio.to_thread. 
+MAX_CONCURRENT_JOBS = 2 YTDLP = os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp") COOKIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt") ADMIN_NUMBERS = {n.strip() for n in os.environ.get("BOT_ADMINS", "").split(",") if n.strip()} -VIDEO_CONTENT_TYPES = ("video/mp4", "video/webm", "video/quicktime", "video/3gpp", "video/mpeg") - logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) log = logging.getLogger("signal-bot") -# group_id -> {"b64": ..., "time": ...} +# (group_id, sender) -> {"b64": ..., "time": monotonic}. Keyed per sender so a +# stranger's later video doesn't silently become the target of your /speed or +# /rev; _get_video falls back to the group's latest video when you have none. last_video = {} VIDEO_TTL = 3600 # 1 hour -# (group_id, url) -> monotonic time the bot started handling this URL. +# (group_id, url) -> monotonic time the URL was successfully handled. # When a user edits a message, signal-cli redelivers it as MessageType.EDIT_MESSAGE # with the same text — without this guard the bot re-downloads and re-posts the video. recent_urls = {} RECENT_URL_TTL = 600 # 10 min +# (group_id, url) currently being downloaded — guards against an edit redelivery +# arriving mid-download (before recent_urls is set) triggering a second download. +_inflight = set() -def _set_video(group_id, b64): - last_video[group_id] = {"b64": b64, "time": time.monotonic()} +# Lazily created so it binds to the bot's running event loop, not import time. 
+_job_semaphore = None -def _get_video(group_id): - entry = last_video.get(group_id) - if not entry: - return None - if time.monotonic() - entry["time"] > VIDEO_TTL: - del last_video[group_id] - return None - return entry["b64"] +def _get_job_semaphore() -> asyncio.Semaphore: + global _job_semaphore + if _job_semaphore is None: + _job_semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS) + return _job_semaphore -def _url_recently_handled(group_id, url): - key = (group_id, url) - t = recent_urls.get(key) - if t is None: - return False - if time.monotonic() - t > RECENT_URL_TTL: +def _sweep_videos() -> None: + now = time.monotonic() + for key in [k for k, e in last_video.items() if now - e["time"] > VIDEO_TTL]: + del last_video[key] + + +def _set_video(group_id, sender, b64) -> None: + _sweep_videos() + last_video[(group_id, sender)] = {"b64": b64, "time": time.monotonic()} + + +def _get_video(group_id, sender): + """The caller's own most recent video, else the group's latest non-expired one.""" + _sweep_videos() + entry = last_video.get((group_id, sender)) + if entry: + return entry["b64"] + best = None + for (g, _s), e in last_video.items(): + if g != group_id: + continue + if best is None or e["time"] > best["time"]: + best = e + return best["b64"] if best else None + + +def _sweep_recent_urls() -> None: + now = time.monotonic() + for key in [k for k, t in recent_urls.items() if now - t > RECENT_URL_TTL]: del recent_urls[key] - return False - return True -def _mark_url_handled(group_id, url): +def _url_recently_handled(group_id, url) -> bool: + _sweep_recent_urls() + return (group_id, url) in recent_urls + + +def _url_busy(group_id, url) -> bool: + return (group_id, url) in _inflight or _url_recently_handled(group_id, url) + + +def _mark_url_handled(group_id, url) -> None: + _sweep_recent_urls() recent_urls[(group_id, url)] = time.monotonic() +async def _safe_reply(c: Context, text: str) -> None: + try: + await c.reply(text) + except Exception as e: # noqa: BLE001 + 
log.warning("Failed to send reply: %s", e) + + +async def _safe_send_video(c: Context, b64: str) -> bool: + try: + await c.send("", base64_attachments=[b64]) + return True + except Exception as e: # noqa: BLE001 + log.warning("Failed to send video: %s", e) + await _safe_reply(c, "Couldn't send that video (Signal may have rejected it).") + return False + + # Errors that mean "the link simply has no downloadable video" rather than a # genuine failure. We stay silent for these — the bot watches every message with # a link, so most links legitimately have no video and shouldn't draw a complaint. +# NOTE: "unsupported url" is deliberately NOT here. Every URL that reaches yt-dlp +# already matched one of our platform patterns, so "Unsupported URL" means a real +# extractor/version break worth surfacing, not a quiet no-media link. _NO_MEDIA_ERROR_PATTERNS = ( "no video could be found", "there's no video", "no media found", "no video formats found", - "unsupported url", ) @@ -125,30 +184,54 @@ def _extract_timestamp(url_token: str) -> int | None: return _parse_timestamp(m.group(1)) if m else None +def _message_data(envelope: dict) -> dict: + """Find the data_message dict (which carries attachments) inside a raw envelope. + + Mirrors signalbot's own extraction so we also see attachments delivered via + edit envelopes and synced (linked-device) messages, not just plain dataMessage. 
+ """ + if "dataMessage" in envelope: + return envelope["dataMessage"] or {} + if "editMessage" in envelope: + return envelope["editMessage"].get("dataMessage") or {} + if "syncMessage" in envelope: + sent = envelope["syncMessage"].get("sentMessage") or {} + if "editMessage" in sent: + return sent["editMessage"].get("dataMessage") or {} + return sent + return {} + + class VideoTracker(Command): - """Watches all group messages for video attachments and stores the last one.""" + """Watches group messages for video attachments and stores the last one per sender.""" async def handle(self, c: Context) -> None: if not c.message.is_group(): return - if not c.message.base64_attachments: - return - # Check raw message for video content types try: - raw = json.loads(c.message.raw_message) - envelope = raw["envelope"] - data = envelope.get("dataMessage") or envelope.get("syncMessage", {}).get("sentMessage", {}) - attachments = data.get("attachments", []) - except Exception: + envelope = json.loads(c.message.raw_message)["envelope"] + except Exception: # noqa: BLE001 return - for i, att in enumerate(attachments): - content_type = att.get("contentType", "") - if content_type.startswith("video/"): - if i < len(c.message.base64_attachments): - _set_video(c.message.group, c.message.base64_attachments[i]) - log.info("Stored received video for group %s", c.message.group) - return + attachments = _message_data(envelope).get("attachments") or [] + att = next( + (a for a in attachments if str(a.get("contentType", "")).startswith("video/")), + None, + ) + if not att or not att.get("id"): + return + + # download_attachments is disabled globally, so fetch only this video on + # demand instead of letting the library base64 every attachment of every + # message on the producer loop. 
+ try: + b64 = await c.bot._signal.get_attachment(att["id"]) + except Exception as e: # noqa: BLE001 + log.warning("Failed to fetch video attachment %s: %s", att.get("id"), e) + return + + _set_video(c.message.group, _sender_number(c.message), b64) + log.info("Stored received video for group %s", c.message.group) class VideoCommand(Command): @@ -157,23 +240,31 @@ class VideoCommand(Command): if not c.message.is_group(): return - matches = list(re.finditer(VIDEO_URL_PATTERN, c.message.text)) + text = c.message.text or "" + matches = list(re.finditer(VIDEO_URL_PATTERN, text)) if not matches: return # An optional "/clip " anywhere in the message overrides the # default window length for any clip produced from this message. clip_len = CLIP_DURATION - mclip = re.search(r"/clip\s+(\S+)", c.message.text, re.IGNORECASE) + mclip = re.search(r"/clip\s+(\S+)", text, re.IGNORECASE) if mclip: secs = _parse_timestamp(mclip.group(1)) if secs is None or secs < 1: - await c.reply("`/clip` needs a length in seconds, e.g. `/clip 30`.") + await _safe_reply(c, "`/clip` needs a length in seconds, e.g. `/clip 30`.") return clip_len = min(secs, MAX_CLIP_DURATION) is_edit = c.message.type == MessageType.EDIT_MESSAGE + group = c.message.group + sender = _sender_number(c.message) + # Process URLs sequentially in message order. Each download already runs + # off the event loop (asyncio.to_thread), so this doesn't block the bot, + # and sequential keeps last_video deterministic (the last link wins) and + # posts in order. `seen` drops a URL repeated within one message. + seen = set() for m in matches: url = m.group(0) @@ -182,8 +273,8 @@ class VideoCommand(Command): # delimited token to recover it. Timestamps/clips apply to YouTube. 
clip = None if re.match(YOUTUBE_URL_PATTERN, url): - token = re.match(r"\S+", c.message.text[m.start():]).group(0) - start = _extract_timestamp(token) + tok = re.match(r"\S+", text[m.start():]) + start = _extract_timestamp(tok.group(0) if tok else url) if mclip: # Explicit /clip clips even without a timestamp (from 0). start = start or 0 @@ -198,122 +289,153 @@ class VideoCommand(Command): url, ) - if is_edit and _url_recently_handled(c.message.group, url): + if url in seen: + continue + seen.add(url) + + if is_edit and _url_busy(group, url): log.info("Skipping edited message; already handled %s", url) continue - _mark_url_handled(c.message.group, url) - await self._download_and_send(c, url, clip) + await self._process_one(c, group, sender, url, clip) - async def _download_and_send(self, c: Context, url: str, clip: tuple[int, int] | None = None) -> None: - if clip is not None: - log.info("Clipping %s to window %d-%ds", url, clip[0], clip[1]) - with tempfile.TemporaryDirectory() as tmpdir: - outpath = os.path.join(tmpdir, "video.mp4") - ok, err = self._run_ytdlp(url, outpath, tmpdir, clip) - if not ok: - if _is_no_media_error(err): - # Link just has no video (e.g. a text-only tweet). Stay quiet. - log.info("No video at %s (%s); staying silent", url, err) - return - await c.reply(f"Couldn't grab that video: {err}") + async def _process_one(self, c: Context, group, sender, url, clip) -> None: + key = (group, url) + _inflight.add(key) + try: + if clip is not None: + log.info("Clipping %s to window %d-%ds", url, clip[0], clip[1]) + async with _get_job_semaphore(): + b64, err, silent = await asyncio.to_thread(_produce_video, url, clip) + + if silent: + # A genuine no-media determination: record it so an edit replay + # doesn't pointlessly re-run yt-dlp. + _mark_url_handled(group, url) + return + if err: + # Hard error: do NOT mark handled, so a corrective edit can retry. 
+ await _safe_reply(c, f"Couldn't grab that video: {err}") return - # yt-dlp may produce a slightly different filename - actual_file = None + _mark_url_handled(group, url) + _set_video(group, sender, b64) + await _safe_send_video(c, b64) + except Exception as e: # noqa: BLE001 + log.exception("Unexpected error handling %s: %s", url, e) + await _safe_reply(c, "Something went wrong handling that video.") + finally: + _inflight.discard(key) + + +def _run_ytdlp(url: str, outpath: str, tmpdir: str, + clip: tuple[int, int] | None = None) -> tuple[bool, str]: + """Run yt-dlp with retries. Returns (success, short_reason). + reason is empty on success; otherwise a one-line description suitable for user reply. + When clip is (start, end), only that window is downloaded. + Blocking — call via asyncio.to_thread, never directly on the event loop.""" + cmd = [ + YTDLP, + "--no-playlist", + # YouTube wraps URLs in a JS "n-sig" challenge; node solves it + # via yt-dlp-ejs. Without this, only image/thumb formats resolve. + "--js-runtimes", "node", + ] + if clip is not None: + start, end = clip + # Download only the requested window instead of the whole video — + # essential for hour-long uploads shared with a ?t= timestamp. + # force-keyframes-at-cuts makes the start boundary accurate. + # The filesize filters below key off the *whole* video's size, which + # is irrelevant to a 60s slice, so drop them and just take best mp4. + cmd += [ + "--download-sections", f"*{start}-{end}", + "--force-keyframes-at-cuts", + "-f", "best[ext=mp4]/best", + ] + else: + # Prefer the largest mp4 that already fits under 95 MB, so we avoid + # re-encoding when a smaller variant exists (e.g. a 4K rendition + # >100 MB alongside a 1080p ~50 MB). 
+ cmd += [ + "-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best", + ] + cmd += [ + "--merge-output-format", "mp4", + *(["--cookies", COOKIES] if os.path.exists(COOKIES) else []), + "-o", outpath, + "--", url, + ] + + delays = [0, 3, 8] + last_stderr = "" + for attempt, delay in enumerate(delays, 1): + if delay: + time.sleep(delay) + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=120, + cwd=tmpdir, + ) + except subprocess.TimeoutExpired: + # Don't retry timeouts — three 120s timeouts would block a worker for 6 min. + log.warning("yt-dlp timed out for %s", url) + return False, "yt-dlp timed out after 120s" + + if result.returncode == 0: + return True, "" + + last_stderr = result.stderr + log.warning( + "yt-dlp failed for %s (attempt %d/%d): %s", + url, attempt, len(delays), last_stderr.strip()[-300:], + ) + + return False, _summarize_ytdlp_error(last_stderr) + + +def _produce_video(url: str, clip: tuple[int, int] | None) -> tuple[str | None, str | None, bool]: + """Blocking pipeline: download (+optional clip), re-encode if oversized, base64. + + Returns (b64, error, silent): + success -> (b64, None, False) + no media -> (None, None, True) # stay quiet + failure -> (None, "reason", False) + Run via asyncio.to_thread — does all subprocess/file/base64 work off the loop.""" + with tempfile.TemporaryDirectory() as tmpdir: + outpath = os.path.join(tmpdir, "video.mp4") + ok, err = _run_ytdlp(url, outpath, tmpdir, clip) + if not ok: + if _is_no_media_error(err): + log.info("No video at %s (%s); staying silent", url, err) + return None, None, True + return None, err, False + + # Prefer the known output path; fall back to any mp4 yt-dlp produced. 
+ actual_file = outpath if os.path.exists(outpath) else None + if actual_file is None: for f in os.listdir(tmpdir): if f.endswith(".mp4"): actual_file = os.path.join(tmpdir, f) break + if actual_file is None: + log.warning("No mp4 file found after yt-dlp for %s", url) + return None, "yt-dlp finished but produced no mp4.", False - if actual_file is None: - log.warning("No mp4 file found after yt-dlp for %s", url) - await c.reply("yt-dlp finished but produced no mp4.") - return + file_size = os.path.getsize(actual_file) + if file_size > MAX_FILE_SIZE: + size_mb = file_size // (1024 * 1024) + log.info("Video is %d MB, re-encoding to fit under %d MB", size_mb, MAX_FILE_SIZE // (1024 * 1024)) + new_path, reason = _reencode(actual_file, tmpdir) + if new_path is None: + return None, f"That video is too large ({size_mb} MB) and re-encoding failed: {reason}.", False + actual_file = new_path - file_size = os.path.getsize(actual_file) - if file_size > MAX_FILE_SIZE: - size_mb = file_size // (1024 * 1024) - log.info("Video is %d MB, re-encoding to fit under %d MB", size_mb, MAX_FILE_SIZE // (1024 * 1024)) - new_path, reason = _reencode(actual_file, tmpdir) - if new_path is None: - await c.reply(f"That video is too large ({size_mb} MB) and re-encoding failed: {reason}.") - return - actual_file = new_path - - with open(actual_file, "rb") as f: - video_bytes = f.read() - - b64_video = base64.b64encode(video_bytes).decode("utf-8") - _set_video(c.message.group, b64_video) - await c.send("", base64_attachments=[b64_video]) - - def _run_ytdlp(self, url: str, outpath: str, tmpdir: str, - clip: tuple[int, int] | None = None) -> tuple[bool, str]: - """Run yt-dlp with retries. Returns (success, short_reason). - reason is empty on success; otherwise a one-line description suitable for user reply. - When clip is (start, end), only that window is downloaded.""" - cmd = [ - YTDLP, - "--no-playlist", - # YouTube wraps URLs in a JS "n-sig" challenge; node solves it - # via yt-dlp-ejs. 
Without this, only image/thumb formats resolve. - "--js-runtimes", "node", - ] - if clip is not None: - start, end = clip - # Download only the requested window instead of the whole video — - # essential for hour-long uploads shared with a ?t= timestamp. - # force-keyframes-at-cuts makes the start boundary accurate. - # The filesize filters below key off the *whole* video's size, which - # is irrelevant to a 60s slice, so drop them and just take best mp4. - cmd += [ - "--download-sections", f"*{start}-{end}", - "--force-keyframes-at-cuts", - "-f", "best[ext=mp4]/best", - ] - else: - # Prefer the largest mp4 that already fits under 95 MB, so we avoid - # re-encoding when a smaller variant exists (e.g. a 4K rendition - # >100 MB alongside a 1080p ~50 MB). - cmd += [ - "-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best", - ] - cmd += [ - "--merge-output-format", "mp4", - *(["--cookies", COOKIES] if os.path.exists(COOKIES) else []), - "-o", outpath, - url, - ] - - delays = [0, 3, 8] - last_stderr = "" - for attempt, delay in enumerate(delays, 1): - if delay: - time.sleep(delay) - try: - result = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=120, - cwd=tmpdir, - ) - except subprocess.TimeoutExpired: - # Don't retry timeouts — three 120s timeouts would block the consumer for 6 min. - log.warning("yt-dlp timed out for %s", url) - return False, "yt-dlp timed out after 120s" - - if result.returncode == 0: - return True, "" - - last_stderr = result.stderr - log.warning( - "yt-dlp failed for %s (attempt %d/%d): %s", - url, attempt, len(delays), last_stderr.strip()[-300:], - ) - - return False, _summarize_ytdlp_error(last_stderr) + with open(actual_file, "rb") as f: + return base64.b64encode(f.read()).decode("utf-8"), None, False def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]: @@ -321,7 +443,7 @@ def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]: Returns (path, reason). 
On success, path is set and reason is "". On failure, path is None and reason is a short human-readable cause. - """ + Blocking — only called from within _produce_video/_speed_video/_reverse_video.""" outpath = os.path.join(tmpdir, "reencoded.mp4") try: probe = subprocess.run( @@ -330,10 +452,17 @@ def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]: capture_output=True, text=True, timeout=30, ) duration = float(probe.stdout.strip()) - except Exception: + except Exception: # noqa: BLE001 log.warning("Could not probe video duration") return None, "could not read video duration" + # Guard against 0/NaN duration before the bitrate math (else ZeroDivisionError + # or "cannot convert float NaN to integer" would escape and silently kill the + # handler with no reply to the user). `not (duration > 0)` catches 0, negative, NaN. + if not (duration > 0): + log.warning("Invalid video duration from ffprobe: %r", duration) + return None, "could not read video duration" + # Target 95 MB to leave headroom target_bytes = 95 * 1024 * 1024 # Total bitrate in kbps; reserve 128k for audio @@ -346,6 +475,8 @@ def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]: [ "ffmpeg", "-y", "-i", input_file, "-c:v", "libx264", "-b:v", f"{video_bitrate}k", + # Cap peaks so VBR overshoot can't blow past the target size. 
+ "-maxrate", f"{video_bitrate}k", "-bufsize", f"{2 * video_bitrate}k", "-c:a", "aac", "-b:a", f"{audio_bitrate}k", "-preset", "fast", "-movflags", "+faststart", @@ -381,7 +512,7 @@ def _has_audio_stream(path: str) -> bool: "-show_entries", "stream=index", "-of", "csv=p=0", path], capture_output=True, text=True, timeout=15, ) - except Exception: + except Exception: # noqa: BLE001 return True # assume yes; ffmpeg will fail loudly if it's wrong return bool(result.stdout.strip()) @@ -393,7 +524,7 @@ def _audio_sample_rate(path: str) -> int | None: "-show_entries", "stream=sample_rate", "-of", "csv=p=0", path], capture_output=True, text=True, timeout=15, ) - except Exception: + except Exception: # noqa: BLE001 return None out = r.stdout.strip() try: @@ -402,6 +533,107 @@ def _audio_sample_rate(path: str) -> int | None: return None +def _speed_video(b64: str, speed: float) -> tuple[str | None, str | None]: + """Blocking: change playback speed. Returns (b64, error). Run via to_thread.""" + with tempfile.TemporaryDirectory() as tmpdir: + inpath = os.path.join(tmpdir, "input.mp4") + outpath = os.path.join(tmpdir, "sped.mp4") + + try: + data = base64.b64decode(b64) + except (binascii.Error, ValueError): + return None, "That video looks corrupted — share it again." + with open(inpath, "wb") as f: + f.write(data) + + cmd = [ + "ffmpeg", "-y", "-i", inpath, + "-filter:v", f"setpts={1.0/speed:g}*PTS", + ] + if _has_audio_stream(inpath): + # asetrate scales the sample rate (which shifts pitch AND + # tempo, the tape-speed effect); aresample brings the data + # rate back to a standard playback rate without undoing it. 
+ rate = _audio_sample_rate(inpath) or 48000 + cmd += [ + "-filter:a", + f"asetrate={int(rate * speed)},aresample={rate}", + ] + else: + cmd += ["-an"] + cmd += ["-preset", "fast", "-movflags", "+faststart", outpath] + + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=180) + except subprocess.TimeoutExpired: + log.warning("ffmpeg timed out speeding up video") + return None, "Timed out speeding up that video." + + if result.returncode != 0: + log.warning("ffmpeg /speed failed: %s", result.stderr[-500:]) + return None, "Failed to speed up that video." + + final_file = outpath + size = os.path.getsize(final_file) + if size > MAX_FILE_SIZE: + size_mb = size // (1024 * 1024) + log.info("Sped-up video is %d MB, re-encoding to fit", size_mb) + new_path, reason = _reencode(final_file, tmpdir) + if new_path is None: + return None, f"Sped-up video is too large ({size_mb} MB) and re-encoding failed: {reason}." + final_file = new_path + + with open(final_file, "rb") as f: + return base64.b64encode(f.read()).decode("utf-8"), None + + +def _reverse_video(b64: str) -> tuple[str | None, str | None]: + """Blocking: reverse a video. Returns (b64, error). Run via to_thread.""" + with tempfile.TemporaryDirectory() as tmpdir: + inpath = os.path.join(tmpdir, "input.mp4") + outpath = os.path.join(tmpdir, "reversed.mp4") + + try: + data = base64.b64decode(b64) + except (binascii.Error, ValueError): + return None, "That video looks corrupted — share it again." + with open(inpath, "wb") as f: + f.write(data) + + try: + result = subprocess.run( + [ + "ffmpeg", "-i", inpath, + "-vf", "reverse", + "-af", "areverse", + "-preset", "fast", + "-movflags", "+faststart", + outpath, + ], + capture_output=True, text=True, timeout=120, + ) + except subprocess.TimeoutExpired: + log.warning("ffmpeg timed out reversing video") + return None, "Timed out reversing that video." 
+ + if result.returncode != 0: + log.warning("ffmpeg reverse failed: %s", result.stderr[-500:]) + return None, "Failed to reverse that video." + + final_file = outpath + size = os.path.getsize(final_file) + if size > MAX_FILE_SIZE: + size_mb = size // (1024 * 1024) + log.info("Reversed video is %d MB, re-encoding to fit", size_mb) + new_path, reason = _reencode(final_file, tmpdir) + if new_path is None: + return None, f"Reversed video is too large ({size_mb} MB) and re-encoding failed: {reason}." + final_file = new_path + + with open(final_file, "rb") as f: + return base64.b64encode(f.read()).decode("utf-8"), None + + class SpeedCommand(Command): SPEED_RE = re.compile(r"^/speed(?:\s+(\S+))?$", re.IGNORECASE) @@ -420,79 +652,30 @@ class SpeedCommand(Command): try: speed = float(speed_arg.rstrip("xX")) except ValueError: - await c.reply(f"`{speed_arg}` isn't a number. Try `/speed 2`.") + await _safe_reply(c, f"`{speed_arg}` isn't a number. Try `/speed 2`.") return if not (0.1 <= speed <= 100.0): - await c.reply(f"Speed must be between 0.1 and 100 (got {speed:g}).") + await _safe_reply(c, f"Speed must be between 0.1 and 100 (got {speed:g}).") return if speed == 1.0: - await c.reply("1x wouldn't change anything.") + await _safe_reply(c, "1x wouldn't change anything.") return - b64 = _get_video(c.message.group) + sender = _sender_number(c.message) + b64 = _get_video(c.message.group, sender) if not b64: - await c.reply("No video to speed up.") + await _safe_reply(c, "No video to speed up.") return - with tempfile.TemporaryDirectory() as tmpdir: - inpath = os.path.join(tmpdir, "input.mp4") - outpath = os.path.join(tmpdir, "sped.mp4") + async with _get_job_semaphore(): + out, err = await asyncio.to_thread(_speed_video, b64, speed) + if err: + await _safe_reply(c, err) + return - with open(inpath, "wb") as f: - f.write(base64.b64decode(b64)) - - cmd = [ - "ffmpeg", "-y", "-i", inpath, - "-filter:v", f"setpts={1.0/speed:g}*PTS", - ] - if _has_audio_stream(inpath): - # 
asetrate scales the sample rate (which shifts pitch AND - # tempo, the tape-speed effect); aresample brings the data - # rate back to a standard playback rate without undoing it. - rate = _audio_sample_rate(inpath) or 48000 - cmd += [ - "-filter:a", - f"asetrate={int(rate * speed)},aresample={rate}", - ] - else: - cmd += ["-an"] - cmd += ["-preset", "fast", "-movflags", "+faststart", outpath] - - try: - result = subprocess.run( - cmd, capture_output=True, text=True, timeout=180, - ) - except subprocess.TimeoutExpired: - log.warning("ffmpeg timed out speeding up video") - await c.reply("Timed out speeding up that video.") - return - - if result.returncode != 0: - log.warning("ffmpeg /speed failed: %s", result.stderr[-500:]) - await c.reply("Failed to speed up that video.") - return - - final_file = outpath - size = os.path.getsize(final_file) - if size > MAX_FILE_SIZE: - size_mb = size // (1024 * 1024) - log.info("Sped-up video is %d MB, re-encoding to fit", size_mb) - new_path, reason = _reencode(final_file, tmpdir) - if new_path is None: - await c.reply( - f"Sped-up video is too large ({size_mb} MB) and " - f"re-encoding failed: {reason}." 
- ) - return - final_file = new_path - - with open(final_file, "rb") as f: - sped_bytes = f.read() - - b64_sped = base64.b64encode(sped_bytes).decode("utf-8") - _set_video(c.message.group, b64_sped) - await c.send("", base64_attachments=[b64_sped]) + _set_video(c.message.group, sender, out) + await _safe_send_video(c, out) class ReverseCommand(Command): @@ -501,47 +684,20 @@ class ReverseCommand(Command): if not c.message.is_group(): return - b64 = _get_video(c.message.group) + sender = _sender_number(c.message) + b64 = _get_video(c.message.group, sender) if not b64: - await c.reply("No video to reverse.") + await _safe_reply(c, "No video to reverse.") return - with tempfile.TemporaryDirectory() as tmpdir: - inpath = os.path.join(tmpdir, "input.mp4") - outpath = os.path.join(tmpdir, "reversed.mp4") + async with _get_job_semaphore(): + out, err = await asyncio.to_thread(_reverse_video, b64) + if err: + await _safe_reply(c, err) + return - with open(inpath, "wb") as f: - f.write(base64.b64decode(b64)) - - try: - result = subprocess.run( - [ - "ffmpeg", "-i", inpath, - "-vf", "reverse", - "-af", "areverse", - "-preset", "fast", - outpath, - ], - capture_output=True, - text=True, - timeout=120, - ) - except subprocess.TimeoutExpired: - log.warning("ffmpeg timed out reversing video") - await c.reply("Timed out reversing that video.") - return - - if result.returncode != 0: - log.warning("ffmpeg failed: %s", result.stderr) - await c.reply("Failed to reverse that video.") - return - - with open(outpath, "rb") as f: - reversed_bytes = f.read() - - b64_reversed = base64.b64encode(reversed_bytes).decode("utf-8") - _set_video(c.message.group, b64_reversed) - await c.send("", base64_attachments=[b64_reversed]) + _set_video(c.message.group, sender, out) + await _safe_send_video(c, out) HELP_TEXT = f"""🎬 Video bot — what I can do @@ -572,7 +728,7 @@ class HelpCommand(Command): return if (c.message.text or "").strip().lower() not in ("/help", "/commands"): return - await 
c.reply(HELP_TEXT) + await _safe_reply(c, HELP_TEXT) def _sender_number(msg) -> str | None: @@ -583,7 +739,7 @@ def _sender_number(msg) -> str | None: try: env = json.loads(msg.raw_message)["envelope"] return env.get("source") or env.get("sourceNumber") - except Exception: + except Exception: # noqa: BLE001 return None @@ -611,11 +767,11 @@ class CookiesCommand(Command): ig_lines.append(normalized) if not ig_lines: - await c.reply("No `.instagram.com` cookie lines found.") + await _safe_reply(c, "No `.instagram.com` cookie lines found.") return if not any("\tsessionid\t" in ln for ln in ig_lines): - await c.reply("Missing `sessionid` cookie — that's the one that proves you're logged in. Re-export and try again.") + await _safe_reply(c, "Missing `sessionid` cookie — that's the one that proves you're logged in. Re-export and try again.") return try: @@ -627,14 +783,16 @@ class CookiesCommand(Command): kept = [ln for ln in existing if not cookie_re.match(ln)] new_content = "\n".join(kept + ig_lines) + "\n" + # Create the temp file 0600 from the start: open()+chmod left a brief + # world-readable window exposing the Instagram session cookie. tmppath = COOKIES + ".tmp" - with open(tmppath, "w") as f: + fd = os.open(tmppath, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + with os.fdopen(fd, "w") as f: f.write(new_content) - os.chmod(tmppath, 0o600) os.replace(tmppath, COOKIES) log.info("Installed %d Instagram cookies from %s", len(ig_lines), sender) - await c.reply(f"Installed {len(ig_lines)} Instagram cookies.") + await _safe_reply(c, f"Installed {len(ig_lines)} Instagram cookies.") def main(): @@ -649,6 +807,9 @@ def main(): bot = SignalBot({ "signal_service": signal_service, "phone_number": phone_number, + # Don't let the library download+base64 every attachment of every message + # on the producer loop. VideoTracker fetches only video attachments lazily. 
+ "download_attachments": False, }) bot.register(VideoTracker(), contacts=False, groups=True) bot.register(VideoCommand(), contacts=False, groups=True) diff --git a/docker-compose.yml b/docker-compose.yml index 270eaea..b78a79d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,10 @@ services: signal-cli-rest-api: - image: bbernhard/signal-cli-rest-api:latest + # Pinned by digest so a re-pull/recreate can't silently swap in a build with + # a protocol change (the getServerGuid-style breakage). This digest is + # signal-cli-rest-api with signal-cli 0.100 (build 2). Upgrade deliberately: + # `docker pull bbernhard/signal-cli-rest-api:latest`, test, then update this. + image: bbernhard/signal-cli-rest-api@sha256:2399d449123cdad56c4d859277e3b9127e1a00c4d2ab4601c239882609286cf8 container_name: signal-cli-rest-api restart: unless-stopped environment: @@ -9,3 +13,9 @@ services: - "127.0.0.1:8080:8080" volumes: - ./signal-cli-data:/home/.local/share/signal-cli + healthcheck: + test: ["CMD", "curl", "-fsS", "http://127.0.0.1:8080/v1/health"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 30s diff --git a/requirements.txt b/requirements.txt index 8b67f80..a3c15e3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ -signalbot -yt-dlp +signalbot==1.1.0 +# yt-dlp is kept current for site/extractor changes; bump the floor deliberately. +yt-dlp>=2026.3.17 # Needed for YouTube: yt-dlp wraps URLs in a JS "n-sig" challenge that a JS # runtime must solve. Requires a system `node` on PATH plus this package. -yt-dlp-ejs +yt-dlp-ejs>=0.8.0 diff --git a/signal-bot.service.example b/signal-bot.service.example index 4c2ddae..7f06575 100644 --- a/signal-bot.service.example +++ b/signal-bot.service.example @@ -13,7 +13,9 @@ Environment=SIGNAL_SERVICE=127.0.0.1:8080 # Optional: comma-separated numbers allowed to run /cookies (admin command). 
# Environment=BOT_ADMINS=+15551234567 ExecStart=/home/YOUR_USER/signal-bot/venv/bin/python bot.py -Restart=on-failure +# always (not on-failure): the bot's library swallows exceptions in its own run +# loop and never exits non-zero even when wedged, so on-failure would never bounce it. +Restart=always RestartSec=10 [Install]