import base64
import json
import logging
import os
import re
import subprocess
import tempfile
import time

from signalbot import Command, Context, SignalBot
from signalbot.command import regex_triggered, triggered
from signalbot.message import MessageType

# URL patterns for the sites the bot downloads from. Every group is
# non-capturing, so re.findall() yields whole URLs rather than group tuples.
#
# The path segment before "/status/" is matched with a lazy \S+? rather than a
# greedy ".+": "." also matches spaces, so a message containing two links (or a
# link followed by prose that happens to contain "/status/<n>") used to be
# captured as one bogus URL spanning the text in between.
TWITTER_URL_PATTERN = r"https?://(?:www\.)?(?:twitter\.com|x\.com|fxtwitter\.com|vxtwitter\.com|fixupx\.com)/\S+?/status/\d+"
INSTAGRAM_URL_PATTERN = r"https?://(?:www\.)?instagram\.com/(?:reel|p)/[\w-]+"
YOUTUBE_URL_PATTERN = r"https?://(?:www\.)?(?:youtube\.com/(?:watch\?v=|shorts/)|youtu\.be/)[\w-]+"
TIKTOK_URL_PATTERN = r"https?://(?:(?:www|m)\.tiktok\.com/(?:@[\w.-]+/video/\d+|t/\w+|v/\d+)|(?:vm|vt)\.tiktok\.com/\w+)"
# Union of all of the above; used both as the command trigger and to extract URLs.
VIDEO_URL_PATTERN = rf"(?:{TWITTER_URL_PATTERN}|{INSTAGRAM_URL_PATTERN}|{YOUTUBE_URL_PATTERN}|{TIKTOK_URL_PATTERN})"

# Largest attachment the bot will send without re-encoding.
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100 MB

# yt-dlp binary inside the virtualenv that sits next to this file.
YTDLP = os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp")
# Netscape-format cookie file next to this file; passed to yt-dlp when it exists.
COOKIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt")
# Comma-separated sender numbers from $BOT_ADMINS; blank items are dropped.
ADMIN_NUMBERS = {n.strip() for n in os.environ.get("BOT_ADMINS", "").split(",") if n.strip()}
VIDEO_CONTENT_TYPES = ("video/mp4", "video/webm", "video/quicktime", "video/3gpp", "video/mpeg")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("signal-bot")

# group_id -> {"b64": ..., "time": ...}
last_video = {}
VIDEO_TTL = 3600  # 1 hour

# (group_id, url) -> monotonic time the bot started handling this URL.
# When a user edits a message, signal-cli redelivers it as MessageType.EDIT_MESSAGE
# with the same text — without this guard the bot re-downloads and re-posts the video.
recent_urls = {}
RECENT_URL_TTL = 600  # 10 min


def _set_video(group_id, b64):
    """Remember *b64* (a base64-encoded video) as the latest video of *group_id*.

    Entries older than VIDEO_TTL are evicted here as well. Lookups only ever
    expire the entry of the group being asked about, so without this sweep a
    group that never runs /rev or /speed would keep its last (potentially
    >100 MB) video in memory forever.
    """
    now = time.monotonic()
    expired = [gid for gid, entry in last_video.items() if now - entry["time"] > VIDEO_TTL]
    for gid in expired:
        del last_video[gid]
    last_video[group_id] = {"b64": b64, "time": now}


def _get_video(group_id):
    """Return the stored base64 video for *group_id*, or None if absent or expired."""
    entry = last_video.get(group_id)
    if not entry:
        return None
    if time.monotonic() - entry["time"] > VIDEO_TTL:
        del last_video[group_id]
        return None
    return entry["b64"]


def _url_recently_handled(group_id, url):
    """Return True if *url* was marked handled for *group_id* within RECENT_URL_TTL."""
    key = (group_id, url)
    handled_at = recent_urls.get(key)
    if handled_at is None:
        return False
    if time.monotonic() - handled_at > RECENT_URL_TTL:
        del recent_urls[key]
        return False
    return True


def _mark_url_handled(group_id, url):
    """Record that the bot has started handling *url* for *group_id*.

    Expired records are pruned on every write; lookups only happen for edited
    messages, so otherwise the dict would grow by one entry per link forever.
    """
    now = time.monotonic()
    expired = [key for key, handled_at in recent_urls.items() if now - handled_at > RECENT_URL_TTL]
    for key in expired:
        del recent_urls[key]
    recent_urls[(group_id, url)] = now


# Errors that mean "the link simply has no downloadable video" rather than a
# genuine failure. We stay silent for these — the bot watches every message with
# a link, so most links legitimately have no video and shouldn't draw a complaint.
_NO_MEDIA_ERROR_PATTERNS = (
    "no video could be found",
    "there's no video",
    "no media found",
    "no video formats found",
    "unsupported url",
)


def _is_no_media_error(err: str) -> bool:
    """Return True if *err* contains one of the "link has no video" phrases (case-insensitive)."""
    e = err.lower()
    return any(p in e for p in _NO_MEDIA_ERROR_PATTERNS)


def _summarize_ytdlp_error(stderr: str) -> str:
    """Pull a short, user-readable reason out of yt-dlp stderr.

    Uses the last line starting with "ERROR:" when there is one, otherwise the
    last non-blank line. The result is capped at 240 characters and is never
    empty: "unknown error" is returned when nothing usable is left.
    """
    if not stderr:
        return "unknown error"
    error_lines = [ln.strip() for ln in stderr.splitlines() if ln.startswith("ERROR:")]
    if error_lines:
        msg = error_lines[-1][len("ERROR:"):].strip()
        # Strip "[extractor] video_id:" prefix yt-dlp prepends.
        msg = re.sub(r"^\[[^\]]+\]\s+\S+?:\s*", "", msg)
        # Trim verbose "Use --cookies..." tails that aren't useful to a chat user.
        msg = re.split(r"\s+(?:Use --cookies|See https?://)", msg, maxsplit=1)[0]
        # The trimming above can leave nothing behind; don't reply with an
        # empty reason in that case.
        return msg[:240].rstrip(". ") or "unknown error"
    lines = [ln.strip() for ln in stderr.splitlines() if ln.strip()]
    return lines[-1][:240] if lines else "unknown error"


class VideoTracker(Command):
    """Watches all group messages for video attachments and stores the last one."""

    async def handle(self, c: Context) -> None:
        """Store the first video attachment of a group message via _set_video()."""
        if not c.message.is_group():
            return
        if not c.message.base64_attachments:
            return

        # Check raw message for video content types
        try:
            raw = json.loads(c.message.raw_message)
            envelope = raw["envelope"]
            data = envelope.get("dataMessage") or envelope.get("syncMessage", {}).get("sentMessage", {})
            # "attachments" may be present but null; treat that like "none".
            attachments = data.get("attachments") or []
        except (ValueError, TypeError, KeyError, AttributeError):
            # Malformed or unexpected envelope: nothing to track.
            return

        for i, att in enumerate(attachments):
            content_type = att.get("contentType", "")
            if content_type.startswith("video/"):
                # NOTE(review): assumes base64_attachments is in the same order
                # as the envelope's attachment list — confirm against signalbot.
                if i < len(c.message.base64_attachments):
                    _set_video(c.message.group, c.message.base64_attachments[i])
                    log.info("Stored received video for group %s", c.message.group)
                    return


class VideoCommand(Command):
    """Downloads the video behind supported links and posts it back to the group."""

    # Front-end mirrors that wrap x.com links; rewritten so yt-dlp sees the real host.
    _WRAPPER_HOST_RE = re.compile(
        r"https?://(?:www\.)?(?:fxtwitter\.com|vxtwitter\.com|fixupx\.com)"
    )

    @regex_triggered(VIDEO_URL_PATTERN)
    async def handle(self, c: Context) -> None:
        """Download and send every distinct supported URL found in a group message."""
        if not c.message.is_group():
            return

        urls = re.findall(VIDEO_URL_PATTERN, c.message.text)
        if not urls:
            return

        is_edit = c.message.type == MessageType.EDIT_MESSAGE
        seen_in_message = set()

        for url in urls:
            # Normalize fxtwitter/vxtwitter wrappers to x.com
            url = self._WRAPPER_HOST_RE.sub("https://x.com", url)
            # The same link pasted twice in one message should only be posted once.
            if url in seen_in_message:
                continue
            seen_in_message.add(url)
            if is_edit and _url_recently_handled(c.message.group, url):
                log.info("Skipping edited message; already handled %s", url)
                continue
            _mark_url_handled(c.message.group, url)
            await self._download_and_send(c, url)

    async def _download_and_send(self, c: Context, url: str) -> None:
        """Fetch *url* with yt-dlp, shrink it if needed, and send it to the group.

        Replies with a short reason on genuine failures and stays silent when
        the link simply has no video. On success the video is also stored as
        the group's last video.

        NOTE(review): _run_ytdlp and _reencode run synchronously, so this
        coroutine does not yield while the download/encode is in progress.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            outpath = os.path.join(tmpdir, "video.mp4")
            ok, err = self._run_ytdlp(url, outpath, tmpdir)
            if not ok:
                if _is_no_media_error(err):
                    # Link just has no video (e.g. a text-only tweet). Stay quiet.
                    log.info("No video at %s (%s); staying silent", url, err)
                    return
                await c.reply(f"Couldn't grab that video: {err}")
                return

            # yt-dlp may produce a slightly different filename
            actual_file = None
            for f in os.listdir(tmpdir):
                if f.endswith(".mp4"):
                    actual_file = os.path.join(tmpdir, f)
                    break

            if actual_file is None:
                log.warning("No mp4 file found after yt-dlp for %s", url)
                await c.reply("yt-dlp finished but produced no mp4.")
                return

            file_size = os.path.getsize(actual_file)
            if file_size > MAX_FILE_SIZE:
                size_mb = file_size // (1024 * 1024)
                log.info("Video is %d MB, re-encoding to fit under %d MB", size_mb, MAX_FILE_SIZE // (1024 * 1024))
                new_path, reason = _reencode(actual_file, tmpdir)
                if new_path is None:
                    await c.reply(f"That video is too large ({size_mb} MB) and re-encoding failed: {reason}.")
                    return
                actual_file = new_path

            with open(actual_file, "rb") as f:
                video_bytes = f.read()

            b64_video = base64.b64encode(video_bytes).decode("utf-8")
            _set_video(c.message.group, b64_video)
            await c.send("", base64_attachments=[b64_video])

    def _run_ytdlp(self, url: str, outpath: str, tmpdir: str) -> tuple[bool, str]:
        """Run yt-dlp with retries. Returns (success, short_reason).

        reason is empty on success; otherwise a one-line description suitable
        for user reply. Failures are retried after 3 s and 8 s, except for:
        timeouts, a yt-dlp binary that cannot be started, and "no video at this
        link" errors — retrying those cannot help and only delays the bot.
        """
        delays = [0, 3, 8]
        last_stderr = ""
        for attempt, delay in enumerate(delays, 1):
            if delay:
                time.sleep(delay)
            try:
                result = subprocess.run(
                    [
                        YTDLP,
                        "--no-playlist",
                        # YouTube wraps URLs in a JS "n-sig" challenge; node solves it
                        # via yt-dlp-ejs. Without this, only image/thumb formats resolve.
                        "--js-runtimes", "node",
                        # Prefer the largest mp4 that already fits under 95 MB,
                        # so we avoid re-encoding when a smaller variant exists
                        # (e.g. a 4K rendition >100 MB alongside a 1080p ~50 MB).
                        "-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best",
                        "--merge-output-format", "mp4",
                        *(["--cookies", COOKIES] if os.path.exists(COOKIES) else []),
                        "-o", outpath,
                        url,
                    ],
                    capture_output=True,
                    text=True,
                    timeout=120,
                    cwd=tmpdir,
                )
            except subprocess.TimeoutExpired:
                # Don't retry timeouts — three 120s timeouts would block the consumer for 6 min.
                log.warning("yt-dlp timed out for %s", url)
                return False, "yt-dlp timed out after 120s"
            except OSError as exc:
                # Binary missing or not executable: no retry will fix that.
                log.error("Could not start yt-dlp at %s: %s", YTDLP, exc)
                return False, "yt-dlp could not be started"
            if result.returncode == 0:
                return True, ""
            last_stderr = result.stderr
            log.warning(
                "yt-dlp failed for %s (attempt %d/%d): %s",
                url, attempt, len(delays), last_stderr.strip()[-300:],
            )
            reason = _summarize_ytdlp_error(last_stderr)
            if _is_no_media_error(reason):
                # A link without a video will not grow one on the next attempt.
                return False, reason
        return False, _summarize_ytdlp_error(last_stderr)


def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]:
    """Re-encode video with ffmpeg to fit under MAX_FILE_SIZE.

    The output is written to "reencoded.mp4" inside *tmpdir*, with the video
    bitrate derived from the probed duration and a 95 MB size target.

    Returns (path, reason). On success, path is set and reason is "".
    On failure, path is None and reason is a short human-readable cause.
    """
    outpath = os.path.join(tmpdir, "reencoded.mp4")

    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", input_file],
            capture_output=True, text=True, timeout=30,
        )
        duration = float(probe.stdout.strip())
    except (subprocess.SubprocessError, OSError, ValueError):
        log.warning("Could not probe video duration")
        return None, "could not read video duration"

    # A zero, negative, NaN or infinite duration would make the bitrate maths
    # below divide by zero or fail in int(); treat it like an unreadable file.
    if not (0 < duration < float("inf")):
        log.warning("Probed an unusable video duration: %r", duration)
        return None, "could not read video duration"

    # Target 95 MB to leave headroom
    target_bytes = 95 * 1024 * 1024
    # Total bitrate in kbps; reserve 128k for audio
    audio_bitrate = 128
    total_bitrate = int((target_bytes * 8) / duration / 1000)
    # Never go below 200 kbps video; the size check below catches the overshoot.
    video_bitrate = max(total_bitrate - audio_bitrate, 200)

    try:
        result = subprocess.run(
            [
                "ffmpeg", "-y", "-i", input_file,
                "-c:v", "libx264", "-b:v", f"{video_bitrate}k",
                "-c:a", "aac", "-b:a", f"{audio_bitrate}k",
                "-preset", "fast",
                "-movflags", "+faststart",
                outpath,
            ],
            capture_output=True, text=True, timeout=300,
        )
    except subprocess.TimeoutExpired:
        log.warning("ffmpeg re-encode timed out")
        return None, "ffmpeg timed out after 300s"
    except OSError as exc:
        log.warning("Could not start ffmpeg: %s", exc)
        return None, "ffmpeg could not be started"

    if result.returncode != 0:
        log.warning("ffmpeg re-encode failed: %s", result.stderr[-500:])
        stderr_lines = [ln for ln in result.stderr.strip().splitlines() if ln.strip()]
        last_line = stderr_lines[-1] if stderr_lines else "no stderr"
        return None, f"ffmpeg exited {result.returncode} ({last_line[:160]})"

    final_size = os.path.getsize(outpath)
    if final_size > MAX_FILE_SIZE:
        final_mb = final_size // (1024 * 1024)
        log.warning("Re-encoded video still too large: %d MB", final_mb)
        return None, f"output still {final_mb} MB after re-encode (duration {int(duration)}s)"

    log.info("Re-encoded video from %d MB to %d MB",
             os.path.getsize(input_file) // (1024 * 1024),
             final_size // (1024 * 1024))
    return outpath, ""


def _has_audio_stream(path: str) -> bool:
    """Return True if ffprobe lists an audio stream in *path*.

    Also returns True when ffprobe cannot be run or times out.
    """
    try:
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-select_streams", "a",
             "-show_entries", "stream=index", "-of", "csv=p=0", path],
            capture_output=True, text=True, timeout=15,
        )
    except (subprocess.SubprocessError, OSError):
        return True  # assume yes; ffmpeg will fail loudly if it's wrong
    return bool(result.stdout.strip())


def _audio_sample_rate(path: str) -> int | None:
    """Return the sample rate of the first audio stream in *path*, or None if unknown."""
    try:
        r = subprocess.run(
            ["ffprobe", "-v", "error", "-select_streams", "a:0",
             "-show_entries", "stream=sample_rate", "-of", "csv=p=0", path],
            capture_output=True, text=True, timeout=15,
        )
    except (subprocess.SubprocessError, OSError):
        return None
    out = r.stdout.strip()
    try:
        return int(out) if out else None
    except ValueError:
        return None


class SpeedCommand(Command):
    """`/speed [factor]` — re-posts the group's last video sped up (default 2x)."""

    SPEED_RE = re.compile(r"^/speed(?:\s+(\S+))?$", re.IGNORECASE)

    async def handle(self, c: Context) -> None:
        """Parse the factor, run ffmpeg on the stored video, and send the result."""
        if not c.message.is_group():
            return
        text = (c.message.text or "").strip()
        m = self.SPEED_RE.match(text)
        if not m:
            return

        speed_arg = m.group(1)
        if speed_arg is None:
            speed = 2.0
        else:
            try:
                # Accept "2x" / "2X" as well as "2".
                speed = float(speed_arg.rstrip("xX"))
            except ValueError:
                await c.reply(f"`{speed_arg}` isn't a number. Try `/speed 2`.")
                return
        # Written as "not (lo <= x <= hi)" so that NaN is rejected too.
        if not (0.1 <= speed <= 100.0):
            await c.reply(f"Speed must be between 0.1 and 100 (got {speed:g}).")
            return
        if speed == 1.0:
            await c.reply("1x wouldn't change anything.")
            return

        b64 = _get_video(c.message.group)
        if not b64:
            await c.reply("No video to speed up.")
            return

        with tempfile.TemporaryDirectory() as tmpdir:
            inpath = os.path.join(tmpdir, "input.mp4")
            outpath = os.path.join(tmpdir, "sped.mp4")

            with open(inpath, "wb") as f:
                f.write(base64.b64decode(b64))

            cmd = [
                "ffmpeg", "-y", "-i", inpath,
                "-filter:v", f"setpts={1.0/speed:g}*PTS",
            ]
            if _has_audio_stream(inpath):
                # asetrate scales the sample rate (which shifts pitch AND
                # tempo, the tape-speed effect); aresample brings the data
                # rate back to a standard playback rate without undoing it.
                rate = _audio_sample_rate(inpath) or 48000
                cmd += [
                    "-filter:a",
                    f"asetrate={int(rate * speed)},aresample={rate}",
                ]
            else:
                cmd += ["-an"]
            cmd += ["-preset", "fast", "-movflags", "+faststart", outpath]

            try:
                result = subprocess.run(
                    cmd, capture_output=True, text=True, timeout=180,
                )
            except subprocess.TimeoutExpired:
                log.warning("ffmpeg timed out speeding up video")
                await c.reply("Timed out speeding up that video.")
                return

            if result.returncode != 0:
                log.warning("ffmpeg /speed failed: %s", result.stderr[-500:])
                await c.reply("Failed to speed up that video.")
                return

            final_file = outpath
            size = os.path.getsize(final_file)
            if size > MAX_FILE_SIZE:
                size_mb = size // (1024 * 1024)
                log.info("Sped-up video is %d MB, re-encoding to fit", size_mb)
                new_path, reason = _reencode(final_file, tmpdir)
                if new_path is None:
                    await c.reply(
                        f"Sped-up video is too large ({size_mb} MB) and "
                        f"re-encoding failed: {reason}."
                    )
                    return
                final_file = new_path

            with open(final_file, "rb") as f:
                sped_bytes = f.read()

            b64_sped = base64.b64encode(sped_bytes).decode("utf-8")
            _set_video(c.message.group, b64_sped)
            await c.send("", base64_attachments=[b64_sped])


class ReverseCommand(Command):
    """`/rev` — re-posts the group's last video played backwards."""

    @triggered("/rev")
    async def handle(self, c: Context) -> None:
        """Reverse the stored video with ffmpeg and send the result.

        Follows the same steps as SpeedCommand: the audio filter is only added
        when the input has an audio stream, and an oversized result goes
        through _reencode() before it is sent.
        """
        if not c.message.is_group():
            return

        b64 = _get_video(c.message.group)
        if not b64:
            await c.reply("No video to reverse.")
            return

        with tempfile.TemporaryDirectory() as tmpdir:
            inpath = os.path.join(tmpdir, "input.mp4")
            outpath = os.path.join(tmpdir, "reversed.mp4")

            with open(inpath, "wb") as f:
                f.write(base64.b64decode(b64))

            cmd = ["ffmpeg", "-y", "-i", inpath, "-vf", "reverse"]
            if _has_audio_stream(inpath):
                cmd += ["-af", "areverse"]
            else:
                cmd += ["-an"]
            cmd += ["-preset", "fast", "-movflags", "+faststart", outpath]

            try:
                result = subprocess.run(
                    cmd, capture_output=True, text=True, timeout=120,
                )
            except subprocess.TimeoutExpired:
                log.warning("ffmpeg timed out reversing video")
                await c.reply("Timed out reversing that video.")
                return

            if result.returncode != 0:
                # Only the tail of stderr: that is where ffmpeg prints the error.
                log.warning("ffmpeg /rev failed: %s", result.stderr[-500:])
                await c.reply("Failed to reverse that video.")
                return

            final_file = outpath
            size = os.path.getsize(final_file)
            if size > MAX_FILE_SIZE:
                size_mb = size // (1024 * 1024)
                log.info("Reversed video is %d MB, re-encoding to fit", size_mb)
                new_path, reason = _reencode(final_file, tmpdir)
                if new_path is None:
                    await c.reply(
                        f"Reversed video is too large ({size_mb} MB) and "
                        f"re-encoding failed: {reason}."
                    )
                    return
                final_file = new_path

            with open(final_file, "rb") as f:
                reversed_bytes = f.read()

            b64_reversed = base64.b64encode(reversed_bytes).decode("utf-8")
            _set_video(c.message.group, b64_reversed)
            await c.send("", base64_attachments=[b64_reversed])


def _sender_number(msg) -> str | None:
    """Return the sender of *msg*, or None if it cannot be determined.

    Tries the attributes "source", "source_number" and "sourceNumber" first,
    then falls back to the "source"/"sourceNumber" fields of the raw envelope.
    """
    for attr in ("source", "source_number", "sourceNumber"):
        v = getattr(msg, attr, None)
        if v:
            return v
    try:
        env = json.loads(msg.raw_message)["envelope"]
        return env.get("source") or env.get("sourceNumber")
    except (ValueError, TypeError, KeyError, AttributeError):
        return None


class CookiesCommand(Command):
    """`/cookies` (direct message, admins only) — installs Instagram cookies for yt-dlp.

    The message body after the first line is expected to hold Netscape-format
    cookie lines; only lines for instagram.com are kept.
    """

    async def handle(self, c: Context) -> None:
        """Validate the pasted cookies and merge them into the COOKIES file."""
        text = c.message.text or ""
        if not text.startswith("/cookies"):
            return
        if c.message.is_group():
            return

        sender = _sender_number(c.message)
        # An empty admin list means nobody is allowed, not everybody.
        if not ADMIN_NUMBERS or sender not in ADMIN_NUMBERS:
            log.warning("Refused /cookies from %r (admins=%s)", sender, ADMIN_NUMBERS or "")
            return

        body = text.split("\n", 1)[1] if "\n" in text else ""
        # Netscape cookie files mark HttpOnly cookies by prefixing the domain
        # with "#HttpOnly_"; accept that form too so such lines aren't dropped.
        cookie_re = re.compile(r"^(?:#HttpOnly_)?\.?instagram\.com\b")
        ig_lines = []
        for line in body.splitlines():
            if not cookie_re.match(line):
                continue
            # Browser pastes can replace tabs with runs of spaces; restore tabs.
            normalized = re.sub(r"[ \t]{2,}", "\t", line)
            ig_lines.append(normalized)

        if not ig_lines:
            await c.reply("No `.instagram.com` cookie lines found.")
            return
        if not any("\tsessionid\t" in ln for ln in ig_lines):
            await c.reply("Missing `sessionid` cookie — that's the one that proves you're logged in. Re-export and try again.")
            return

        try:
            with open(COOKIES, "r") as f:
                existing = f.read().splitlines()
        except FileNotFoundError:
            existing = ["# Netscape HTTP Cookie File", ""]

        # Replace any previously installed Instagram cookies, keep everything else.
        kept = [ln for ln in existing if not cookie_re.match(ln)]
        new_content = "\n".join(kept + ig_lines) + "\n"

        tmppath = COOKIES + ".tmp"
        # Create the temp file owner-only from the start: it holds a login
        # session, so it must never exist with the default (umask) permissions.
        fd = os.open(tmppath, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            # The mode passed to os.open() is ignored if a stale temp file
            # already existed, so tighten it explicitly before writing.
            os.chmod(tmppath, 0o600)
            f.write(new_content)
        # Atomic swap so yt-dlp never reads a half-written cookie file.
        os.replace(tmppath, COOKIES)

        log.info("Installed %d Instagram cookies from %s", len(ig_lines), sender)
        await c.reply(f"Installed {len(ig_lines)} Instagram cookies.")


def main():
    """Configure the bot from the environment, register all commands, and start it.

    Reads SIGNAL_PHONE_NUMBER (required) and SIGNAL_SERVICE (defaults to
    "127.0.0.1:8080"). Exits with status 1 if the phone number is missing.
    """
    phone_number = os.environ.get("SIGNAL_PHONE_NUMBER")
    signal_service = os.environ.get("SIGNAL_SERVICE", "127.0.0.1:8080")

    if not phone_number:
        print("Error: SIGNAL_PHONE_NUMBER environment variable is required.")
        print("Example: export SIGNAL_PHONE_NUMBER='+15551234567'")
        raise SystemExit(1)

    bot = SignalBot({
        "signal_service": signal_service,
        "phone_number": phone_number,
    })

    # Video commands only act in groups; /cookies only in direct messages.
    bot.register(VideoTracker(), contacts=False, groups=True)
    bot.register(VideoCommand(), contacts=False, groups=True)
    bot.register(ReverseCommand(), contacts=False, groups=True)
    bot.register(SpeedCommand(), contacts=False, groups=True)
    bot.register(CookiesCommand(), contacts=True, groups=False)

    log.info("Starting Signal video bot...")
    bot.start()


if __name__ == "__main__":
    main()