Files
James Price 0f9030b72e Initial commit: Signal video-grabber bot
Group-chat bot that downloads videos from X/Instagram/YouTube/TikTok links
via yt-dlp and posts them back, plus /speed and /rev video toys.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 01:53:14 -04:00

560 lines
21 KiB
Python

import asyncio
import base64
import json
import logging
import os
import re
import subprocess
import tempfile
import time

from signalbot import Command, Context, SignalBot
from signalbot.command import regex_triggered, triggered
from signalbot.message import MessageType
# Status links on X/Twitter and the embed-fixer mirrors. The path in front of
# "/status/" is matched lazily and may not contain whitespace, so two links in
# the same message are found as two URLs instead of one greedy span that runs
# from the first link to the last "/status/<id>".
TWITTER_URL_PATTERN = r"https?://(?:www\.)?(?:twitter\.com|x\.com|fxtwitter\.com|vxtwitter\.com|fixupx\.com)/\S+?/status/\d+"
# Instagram reels and posts.
INSTAGRAM_URL_PATTERN = r"https?://(?:www\.)?instagram\.com/(?:reel|p)/[\w-]+"
# YouTube watch pages, Shorts and youtu.be short links.
YOUTUBE_URL_PATTERN = r"https?://(?:www\.)?(?:youtube\.com/(?:watch\?v=|shorts/)|youtu\.be/)[\w-]+"
# TikTok video pages plus the vm./vt. share-link hosts.
TIKTOK_URL_PATTERN = r"https?://(?:(?:www|m)\.tiktok\.com/(?:@[\w.-]+/video/\d+|t/\w+|v/\d+)|(?:vm|vt)\.tiktok\.com/\w+)"
# Union of all of the above. Only non-capturing groups are used, so
# re.findall() returns the full matched URLs.
VIDEO_URL_PATTERN = rf"(?:{TWITTER_URL_PATTERN}|{INSTAGRAM_URL_PATTERN}|{YOUTUBE_URL_PATTERN}|{TIKTOK_URL_PATTERN})"
# Largest video file the bot will post as-is; anything bigger is handed to
# _reencode() before sending.
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB
# yt-dlp executable inside a "venv" directory that sits next to this file.
YTDLP = os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp")
# Cookie file next to this file. Passed to yt-dlp via --cookies when it exists
# and rewritten by the /cookies command.
COOKIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt")
# Senders allowed to use /cookies: the comma-separated BOT_ADMINS environment
# variable, with blanks dropped. An empty set means nobody is allowed.
ADMIN_NUMBERS = {n.strip() for n in os.environ.get("BOT_ADMINS", "").split(",") if n.strip()}
# NOTE(review): not referenced anywhere in the visible code; VideoTracker
# checks for a "video/" content-type prefix instead.
VIDEO_CONTENT_TYPES = ("video/mp4", "video/webm", "video/quicktime", "video/3gpp", "video/mpeg")
# Configure root logging at import time: INFO level, timestamped lines.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("signal-bot")
# Most recent video per group, kept in memory as a base64 string:
# group_id -> {"b64": ..., "time": ...}
# "time" is a time.monotonic() reading taken when the entry was stored.
last_video = {}
# Seconds a cached video stays usable for /speed and /rev.
VIDEO_TTL = 3600 # 1 hour
# (group_id, url) -> monotonic time the bot started handling this URL.
# When a user edits a message, signal-cli redelivers it as MessageType.EDIT_MESSAGE
# with the same text — without this guard the bot re-downloads and re-posts the video.
recent_urls = {}
# Seconds during which an edited message repeating the same URL is ignored.
RECENT_URL_TTL = 600 # 10 min
def _set_video(group_id, b64):
    """Remember ``b64`` as the latest video for ``group_id``, stamped with the current monotonic time."""
    stored_at = time.monotonic()
    last_video[group_id] = {"b64": b64, "time": stored_at}
def _get_video(group_id):
    """Return the cached base64 video for ``group_id``.

    Returns None when nothing is cached for the group or when the cached
    entry is older than VIDEO_TTL seconds; an expired entry is evicted.
    """
    cached = last_video.get(group_id)
    if cached:
        age = time.monotonic() - cached["time"]
        if age <= VIDEO_TTL:
            return cached["b64"]
        # Too old to be useful any more: drop it so the memory is released.
        del last_video[group_id]
    return None
def _url_recently_handled(group_id, url):
    """Tell whether ``url`` was handled for ``group_id`` within RECENT_URL_TTL seconds.

    An entry that has outlived the TTL is removed and reported as not handled.
    """
    key = (group_id, url)
    started = recent_urls.get(key)
    if started is None:
        return False
    still_fresh = time.monotonic() - started <= RECENT_URL_TTL
    if not still_fresh:
        del recent_urls[key]
    return still_fresh
def _mark_url_handled(group_id, url):
    """Record that the bot has started handling ``url`` for ``group_id``.

    Entries older than RECENT_URL_TTL are pruned first. Without this the
    mapping only ever shrinks when the very same URL is looked up again, so a
    long-running bot would keep every link it has ever seen.
    """
    now = time.monotonic()
    # Collect keys first: a dict must not change size while being iterated.
    expired = [key for key, started in recent_urls.items() if now - started > RECENT_URL_TTL]
    for key in expired:
        del recent_urls[key]
    recent_urls[(group_id, url)] = now
# Substrings (lower-case) of yt-dlp errors that mean "this link has no video to
# download" as opposed to a real failure. The bot reacts to every message that
# contains a supported link, and plenty of those legitimately carry no video,
# so these cases are handled silently instead of drawing a complaint.
_NO_MEDIA_ERROR_PATTERNS = (
    "no video could be found",
    "there's no video",
    "no media found",
    "no video formats found",
    "unsupported url",
)


def _is_no_media_error(err: str) -> bool:
    """Return True when ``err`` contains one of the "no video here" phrases, ignoring case."""
    lowered = err.lower()
    for needle in _NO_MEDIA_ERROR_PATTERNS:
        if needle in lowered:
            return True
    return False
def _summarize_ytdlp_error(stderr: str) -> str:
    """Pull a short, user-readable reason out of yt-dlp stderr.

    The last ``ERROR:`` line wins; its "[extractor] video_id:" prefix and any
    "Use --cookies..." / "See https://..." tail are removed. Without an
    ``ERROR:`` line the last non-blank line is used. The result is capped at
    240 characters and is never empty: "unknown error" is returned when
    nothing usable is left.
    """
    if not stderr:
        return "unknown error"
    # Strip first, then classify, so an indented "ERROR:" line is still found.
    lines = [ln.strip() for ln in stderr.splitlines()]
    lines = [ln for ln in lines if ln]
    error_lines = [ln for ln in lines if ln.startswith("ERROR:")]
    if error_lines:
        msg = error_lines[-1][len("ERROR:"):].strip()
        # Strip "[extractor] video_id:" prefix yt-dlp prepends.
        msg = re.sub(r"^\[[^\]]+\]\s+\S+?:\s*", "", msg)
        # Trim verbose "Use --cookies..." tails that aren't useful to a chat user.
        msg = re.split(r"\s+(?:Use --cookies|See https?://)", msg, maxsplit=1)[0]
        msg = msg[:240].rstrip(". ")
        # A bare "ERROR:" line (or one made only of dots) would otherwise give
        # the user an empty reason.
        return msg or "unknown error"
    return lines[-1][:240] if lines else "unknown error"
class VideoTracker(Command):
    """Watches all group messages for video attachments and stores the last one."""

    async def handle(self, c: Context) -> None:
        """Cache the first video attachment of a group message via _set_video().

        Messages outside a group, or without base64 attachments, are ignored.
        Attachment metadata is read from the raw envelope (``dataMessage`` or,
        failing that, ``syncMessage.sentMessage``) and is matched with the
        base64 payloads by position.
        """
        message = c.message
        if not (message.is_group() and message.base64_attachments):
            return

        # The content types are only available in the raw envelope JSON.
        try:
            envelope = json.loads(message.raw_message)["envelope"]
            payload = envelope.get("dataMessage") or envelope.get("syncMessage", {}).get("sentMessage", {})
            attachments = payload.get("attachments", [])
        except Exception:
            return

        encoded = message.base64_attachments
        for index, meta in enumerate(attachments):
            if not meta.get("contentType", "").startswith("video/"):
                continue
            if index >= len(encoded):
                # Metadata without a matching payload; look at the next one.
                continue
            _set_video(message.group, encoded[index])
            log.info("Stored received video for group %s", message.group)
            return
class VideoCommand(Command):
    """Download the video behind a supported link posted in a group and post it back."""

    @regex_triggered(VIDEO_URL_PATTERN)
    async def handle(self, c: Context) -> None:
        """Handle every supported video URL found in the message text.

        Messages outside a group are ignored. When the message is an edit, a
        URL already handled for this group within RECENT_URL_TTL is skipped so
        the same video is not posted twice.
        """
        if not c.message.is_group():
            return
        urls = re.findall(VIDEO_URL_PATTERN, c.message.text)
        if not urls:
            return
        is_edit = c.message.type == MessageType.EDIT_MESSAGE
        for url in urls:
            # Normalize fxtwitter/vxtwitter wrappers to x.com
            url = re.sub(
                r"https?://(?:www\.)?(?:fxtwitter\.com|vxtwitter\.com|fixupx\.com)",
                "https://x.com",
                url,
            )
            if is_edit and _url_recently_handled(c.message.group, url):
                log.info("Skipping edited message; already handled %s", url)
                continue
            _mark_url_handled(c.message.group, url)
            await self._download_and_send(c, url)

    async def _download_and_send(self, c: Context, url: str) -> None:
        """Download ``url`` into a temporary directory and send the result to the chat.

        Links that simply have no video are skipped silently; other failures
        are reported with a short reason. Files above MAX_FILE_SIZE are
        re-encoded first. The posted video is also cached with _set_video().
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            outpath = os.path.join(tmpdir, "video.mp4")
            # yt-dlp (and its retry sleeps) can take minutes. Run it in a
            # worker thread so the event loop keeps serving other messages.
            ok, err = await asyncio.to_thread(self._run_ytdlp, url, outpath, tmpdir)
            if not ok:
                if _is_no_media_error(err):
                    # Link just has no video (e.g. a text-only tweet). Stay quiet.
                    log.info("No video at %s (%s); staying silent", url, err)
                    return
                await c.reply(f"Couldn't grab that video: {err}")
                return

            # yt-dlp may produce a slightly different filename
            actual_file = None
            for f in os.listdir(tmpdir):
                if f.endswith(".mp4"):
                    actual_file = os.path.join(tmpdir, f)
                    break
            if actual_file is None:
                log.warning("No mp4 file found after yt-dlp for %s", url)
                await c.reply("yt-dlp finished but produced no mp4.")
                return

            file_size = os.path.getsize(actual_file)
            if file_size > MAX_FILE_SIZE:
                size_mb = file_size // (1024 * 1024)
                log.info("Video is %d MB, re-encoding to fit under %d MB", size_mb, MAX_FILE_SIZE // (1024 * 1024))
                # ffmpeg may run for up to five minutes; keep it off the event loop too.
                new_path, reason = await asyncio.to_thread(_reencode, actual_file, tmpdir)
                if new_path is None:
                    await c.reply(f"That video is too large ({size_mb} MB) and re-encoding failed: {reason}.")
                    return
                actual_file = new_path

            with open(actual_file, "rb") as f:
                video_bytes = f.read()
            b64_video = base64.b64encode(video_bytes).decode("utf-8")
            _set_video(c.message.group, b64_video)
            await c.send("", base64_attachments=[b64_video])

    def _run_ytdlp(self, url: str, outpath: str, tmpdir: str) -> tuple[bool, str]:
        """Run yt-dlp with retries. Returns (success, short_reason).

        reason is empty on success; otherwise a one-line description suitable
        for user reply. Timeouts and "this link has no video" errors are not
        retried. This method blocks (subprocess plus sleeps), so async callers
        should run it in a worker thread.
        """
        delays = [0, 3, 8]
        last_stderr = ""
        for attempt, delay in enumerate(delays, 1):
            if delay:
                time.sleep(delay)
            try:
                result = subprocess.run(
                    [
                        YTDLP,
                        "--no-playlist",
                        # YouTube wraps URLs in a JS "n-sig" challenge; node solves it
                        # via yt-dlp-ejs. Without this, only image/thumb formats resolve.
                        "--js-runtimes", "node",
                        # Prefer the largest mp4 that already fits under 95 MB,
                        # so we avoid re-encoding when a smaller variant exists
                        # (e.g. a 4K rendition >100 MB alongside a 1080p ~50 MB).
                        "-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best",
                        "--merge-output-format", "mp4",
                        *(["--cookies", COOKIES] if os.path.exists(COOKIES) else []),
                        "-o", outpath,
                        url,
                    ],
                    capture_output=True,
                    text=True,
                    timeout=120,
                    cwd=tmpdir,
                )
            except subprocess.TimeoutExpired:
                # Don't retry timeouts — three 120s timeouts would keep this handler busy for 6 min.
                log.warning("yt-dlp timed out for %s", url)
                return False, "yt-dlp timed out after 120s"
            if result.returncode == 0:
                return True, ""
            last_stderr = result.stderr
            log.warning(
                "yt-dlp failed for %s (attempt %d/%d): %s",
                url, attempt, len(delays), last_stderr.strip()[-300:],
            )
            reason = _summarize_ytdlp_error(last_stderr)
            if _is_no_media_error(reason):
                # The link has no video; that will not change a few seconds
                # later, so skip the remaining attempts and their sleeps.
                return False, reason
        return False, _summarize_ytdlp_error(last_stderr)
def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]:
    """Re-encode video with ffmpeg to fit under MAX_FILE_SIZE.

    The duration is probed with ffprobe and a video bitrate is derived from a
    95 MB budget (128 kbps reserved for audio, never below 200 kbps video).
    The output is written to ``reencoded.mp4`` inside ``tmpdir``.

    Returns (path, reason). On success, path is set and reason is "".
    On failure, path is None and reason is a short human-readable cause.
    """
    outpath = os.path.join(tmpdir, "reencoded.mp4")
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", input_file],
            capture_output=True, text=True, timeout=30,
        )
        duration = float(probe.stdout.strip())
    except Exception:
        log.warning("Could not probe video duration")
        return None, "could not read video duration"
    # A zero, negative, NaN or infinite duration cannot be turned into a
    # bitrate (division by zero / int() of NaN); treat it like a failed probe.
    if not (0 < duration < float("inf")):
        log.warning("Probed video duration is unusable: %r", duration)
        return None, "could not read video duration"

    # Target 95 MB to leave headroom
    target_bytes = 95 * 1024 * 1024
    # Total bitrate in kbps; reserve 128k for audio
    audio_bitrate = 128
    total_bitrate = int((target_bytes * 8) / duration / 1000)
    video_bitrate = max(total_bitrate - audio_bitrate, 200)

    try:
        result = subprocess.run(
            [
                "ffmpeg", "-y", "-i", input_file,
                "-c:v", "libx264", "-b:v", f"{video_bitrate}k",
                "-c:a", "aac", "-b:a", f"{audio_bitrate}k",
                "-preset", "fast",
                "-movflags", "+faststart",
                outpath,
            ],
            capture_output=True, text=True, timeout=300,
        )
    except subprocess.TimeoutExpired:
        log.warning("ffmpeg re-encode timed out")
        return None, "ffmpeg timed out after 300s"

    if result.returncode != 0:
        log.warning("ffmpeg re-encode failed: %s", result.stderr[-500:])
        stderr_lines = [ln for ln in result.stderr.strip().splitlines() if ln.strip()]
        last_line = stderr_lines[-1] if stderr_lines else "no stderr"
        return None, f"ffmpeg exited {result.returncode} ({last_line[:160]})"

    # The 200 kbps floor means a very long video can still overshoot.
    final_size = os.path.getsize(outpath)
    if final_size > MAX_FILE_SIZE:
        final_mb = final_size // (1024 * 1024)
        log.warning("Re-encoded video still too large: %d MB", final_mb)
        return None, f"output still {final_mb} MB after re-encode (duration {int(duration)}s)"

    log.info("Re-encoded video from %d MB to %d MB",
             os.path.getsize(input_file) // (1024 * 1024), final_size // (1024 * 1024))
    return outpath, ""
def _has_audio_stream(path: str) -> bool:
    """Report whether ffprobe lists at least one audio stream in ``path``.

    If ffprobe cannot be run (or times out after 15 s) the answer is True.
    """
    probe_cmd = [
        "ffprobe", "-v", "error", "-select_streams", "a",
        "-show_entries", "stream=index", "-of", "csv=p=0", path,
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=15)
    except Exception:
        # assume yes; ffmpeg will fail loudly if it's wrong
        return True
    listed_streams = probe.stdout.strip()
    return listed_streams != ""
def _audio_sample_rate(path: str) -> int | None:
try:
r = subprocess.run(
["ffprobe", "-v", "error", "-select_streams", "a:0",
"-show_entries", "stream=sample_rate", "-of", "csv=p=0", path],
capture_output=True, text=True, timeout=15,
)
except Exception:
return None
out = r.stdout.strip()
try:
return int(out) if out else None
except ValueError:
return None
class SpeedCommand(Command):
    """``/speed [factor]``: re-post the group's last video sped up or slowed down.

    The factor defaults to 2, may carry a trailing "x" and must lie between
    0.1 and 100. Audio is changed tape-style (pitch shifts with tempo).
    """

    SPEED_RE = re.compile(r"^/speed(?:\s+(\S+))?$", re.IGNORECASE)

    async def handle(self, c: Context) -> None:
        """Validate the command, run ffmpeg on the cached video and send the result."""
        if not c.message.is_group():
            return
        text = (c.message.text or "").strip()
        m = self.SPEED_RE.match(text)
        if not m:
            return

        speed_arg = m.group(1)
        if speed_arg is None:
            speed = 2.0
        else:
            try:
                speed = float(speed_arg.rstrip("xX"))
            except ValueError:
                await c.reply(f"`{speed_arg}` isn't a number. Try `/speed 2`.")
                return
        # NaN fails both comparisons, so it is rejected here as well.
        if not (0.1 <= speed <= 100.0):
            await c.reply(f"Speed must be between 0.1 and 100 (got {speed:g}).")
            return
        if speed == 1.0:
            await c.reply("1x wouldn't change anything.")
            return

        b64 = _get_video(c.message.group)
        if not b64:
            await c.reply("No video to speed up.")
            return

        with tempfile.TemporaryDirectory() as tmpdir:
            inpath = os.path.join(tmpdir, "input.mp4")
            outpath = os.path.join(tmpdir, "sped.mp4")
            with open(inpath, "wb") as f:
                f.write(base64.b64decode(b64))

            cmd = [
                "ffmpeg", "-y", "-i", inpath,
                "-filter:v", f"setpts={1.0/speed:g}*PTS",
            ]
            # Every ffprobe/ffmpeg call below blocks, some for minutes, so each
            # runs in a worker thread to keep the event loop responsive.
            if await asyncio.to_thread(_has_audio_stream, inpath):
                # asetrate scales the sample rate (which shifts pitch AND
                # tempo, the tape-speed effect); aresample brings the data
                # rate back to a standard playback rate without undoing it.
                rate = (await asyncio.to_thread(_audio_sample_rate, inpath)) or 48000
                cmd += [
                    "-filter:a",
                    f"asetrate={int(rate * speed)},aresample={rate}",
                ]
            else:
                cmd += ["-an"]
            cmd += ["-preset", "fast", "-movflags", "+faststart", outpath]

            try:
                result = await asyncio.to_thread(
                    subprocess.run, cmd, capture_output=True, text=True, timeout=180,
                )
            except subprocess.TimeoutExpired:
                log.warning("ffmpeg timed out speeding up video")
                await c.reply("Timed out speeding up that video.")
                return
            if result.returncode != 0:
                log.warning("ffmpeg /speed failed: %s", result.stderr[-500:])
                await c.reply("Failed to speed up that video.")
                return

            final_file = outpath
            size = os.path.getsize(final_file)
            if size > MAX_FILE_SIZE:
                size_mb = size // (1024 * 1024)
                log.info("Sped-up video is %d MB, re-encoding to fit", size_mb)
                new_path, reason = await asyncio.to_thread(_reencode, final_file, tmpdir)
                if new_path is None:
                    await c.reply(
                        f"Sped-up video is too large ({size_mb} MB) and "
                        f"re-encoding failed: {reason}."
                    )
                    return
                final_file = new_path

            with open(final_file, "rb") as f:
                sped_bytes = f.read()
            b64_sped = base64.b64encode(sped_bytes).decode("utf-8")
            # The result becomes the group's "last video", so commands can be chained.
            _set_video(c.message.group, b64_sped)
            await c.send("", base64_attachments=[b64_sped])
class ReverseCommand(Command):
    """``/rev``: re-post the group's last video played backwards (picture and sound)."""

    @triggered("/rev")
    async def handle(self, c: Context) -> None:
        """Reverse the cached video with ffmpeg and send the result.

        As with /speed, an output above MAX_FILE_SIZE is re-encoded before it
        is sent, and the posted video becomes the group's new "last video".
        """
        if not c.message.is_group():
            return
        b64 = _get_video(c.message.group)
        if not b64:
            await c.reply("No video to reverse.")
            return

        with tempfile.TemporaryDirectory() as tmpdir:
            inpath = os.path.join(tmpdir, "input.mp4")
            outpath = os.path.join(tmpdir, "reversed.mp4")
            with open(inpath, "wb") as f:
                f.write(base64.b64decode(b64))

            cmd = [
                "ffmpeg", "-i", inpath,
                "-vf", "reverse",
                "-af", "areverse",
                "-preset", "fast",
                outpath,
            ]
            try:
                # ffmpeg blocks for up to two minutes; run it in a worker
                # thread so the event loop keeps serving other messages.
                result = await asyncio.to_thread(
                    subprocess.run, cmd, capture_output=True, text=True, timeout=120,
                )
            except subprocess.TimeoutExpired:
                log.warning("ffmpeg timed out reversing video")
                await c.reply("Timed out reversing that video.")
                return
            if result.returncode != 0:
                log.warning("ffmpeg failed: %s", result.stderr)
                await c.reply("Failed to reverse that video.")
                return

            # The re-encoded output can be larger than the input; apply the
            # same size limit and fallback that /speed uses.
            final_file = outpath
            size = os.path.getsize(final_file)
            if size > MAX_FILE_SIZE:
                size_mb = size // (1024 * 1024)
                log.info("Reversed video is %d MB, re-encoding to fit", size_mb)
                new_path, reason = await asyncio.to_thread(_reencode, final_file, tmpdir)
                if new_path is None:
                    await c.reply(
                        f"Reversed video is too large ({size_mb} MB) and "
                        f"re-encoding failed: {reason}."
                    )
                    return
                final_file = new_path

            with open(final_file, "rb") as f:
                reversed_bytes = f.read()
            b64_reversed = base64.b64encode(reversed_bytes).decode("utf-8")
            _set_video(c.message.group, b64_reversed)
            await c.send("", base64_attachments=[b64_reversed])
def _sender_number(msg) -> str | None:
for attr in ("source", "source_number", "sourceNumber"):
v = getattr(msg, attr, None)
if v:
return v
try:
env = json.loads(msg.raw_message)["envelope"]
return env.get("source") or env.get("sourceNumber")
except Exception:
return None
class CookiesCommand(Command):
    """``/cookies``: let an admin install Instagram cookies through a direct message.

    The message is ``/cookies`` on the first line followed by Netscape-format
    cookie lines. Only senders listed in ADMIN_NUMBERS are accepted, and only
    outside of groups.
    """

    async def handle(self, c: Context) -> None:
        """Replace the Instagram entries of the COOKIES file with the pasted ones."""
        text = c.message.text or ""
        if not text.startswith("/cookies"):
            return
        if c.message.is_group():
            return
        sender = _sender_number(c.message)
        if not ADMIN_NUMBERS or sender not in ADMIN_NUMBERS:
            log.warning("Refused /cookies from %r (admins=%s)", sender, ADMIN_NUMBERS or "<unset>")
            return

        body = text.split("\n", 1)[1] if "\n" in text else ""
        # Netscape cookie files mark HttpOnly cookies with a "#HttpOnly_"
        # prefix in front of the domain. Those lines are cookies, not
        # comments, so they have to be accepted here and replaced below.
        cookie_re = re.compile(r"^(?:#HttpOnly_)?\.?instagram\.com\b")
        ig_lines = []
        for line in body.splitlines():
            if not cookie_re.match(line):
                continue
            # Browser pastes can replace tabs with runs of spaces; restore tabs.
            normalized = re.sub(r"[ \t]{2,}", "\t", line)
            ig_lines.append(normalized)
        if not ig_lines:
            await c.reply("No `.instagram.com` cookie lines found.")
            return
        if not any("\tsessionid\t" in ln for ln in ig_lines):
            await c.reply("Missing `sessionid` cookie — that's the one that proves you're logged in. Re-export and try again.")
            return

        try:
            with open(COOKIES, "r") as f:
                existing = f.read().splitlines()
        except FileNotFoundError:
            existing = ["# Netscape HTTP Cookie File", ""]
        # Keep everything that is not an Instagram cookie, then append the new ones.
        kept = [ln for ln in existing if not cookie_re.match(ln)]
        new_content = "\n".join(kept + ig_lines) + "\n"

        tmppath = COOKIES + ".tmp"
        # The file holds session secrets: create it owner-only from the start
        # rather than with default permissions followed by a later chmod. A
        # stale temp file is removed first so O_EXCL applies the mode.
        try:
            os.remove(tmppath)
        except FileNotFoundError:
            pass
        fd = os.open(tmppath, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        with os.fdopen(fd, "w") as f:
            f.write(new_content)
        os.chmod(tmppath, 0o600)
        # Atomic swap, so yt-dlp never reads a half-written cookie file.
        os.replace(tmppath, COOKIES)
        log.info("Installed %d Instagram cookies from %s", len(ig_lines), sender)
        await c.reply(f"Installed {len(ig_lines)} Instagram cookies.")
def main():
    """Configure the bot from the environment, register all commands and start it.

    SIGNAL_PHONE_NUMBER is required; without it usage help is printed and the
    process exits with status 1. SIGNAL_SERVICE defaults to "127.0.0.1:8080".
    """
    env = os.environ
    phone_number = env.get("SIGNAL_PHONE_NUMBER")
    signal_service = env.get("SIGNAL_SERVICE", "127.0.0.1:8080")

    if not phone_number:
        for line in (
            "Error: SIGNAL_PHONE_NUMBER environment variable is required.",
            "Example: export SIGNAL_PHONE_NUMBER='+15551234567'",
        ):
            print(line)
        raise SystemExit(1)

    config = {
        "signal_service": signal_service,
        "phone_number": phone_number,
    }
    bot = SignalBot(config)

    # (command class, listen in direct chats, listen in groups), registered in this order.
    registrations = (
        (VideoTracker, False, True),
        (VideoCommand, False, True),
        (ReverseCommand, False, True),
        (SpeedCommand, False, True),
        (CookiesCommand, True, False),
    )
    for command_cls, in_contacts, in_groups in registrations:
        bot.register(command_cls(), contacts=in_contacts, groups=in_groups)

    log.info("Starting Signal video bot...")
    bot.start()


if __name__ == "__main__":
    main()