Files
signal-bot/bot.py
James Price 26a5ecb2d0 Add /clip duration override and /help command
/clip <seconds> in a link's message overrides the default 60s clip
window (capped at 600s); with a ?t= it sets the window length, without
one it clips from the start. /help lists every command with examples.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 16:35:28 -04:00

665 lines
25 KiB
Python

import asyncio
import base64
import json
import logging
import os
import re
import subprocess
import tempfile
import time

from signalbot import Command, Context, SignalBot
from signalbot.command import regex_triggered, triggered
from signalbot.message import MessageType
# URL shapes the bot reacts to. Every pattern ends at the media id, so query
# strings such as ?t=90 are NOT part of the match and must be recovered from
# the surrounding message text by the caller.
#
# The Twitter/X path uses a lazy \S+? rather than .+ on purpose: `.+` happily
# runs across spaces, so a message with two status links used to match as one
# long bogus "URL" stretching from the first link to the end of the second.
TWITTER_URL_PATTERN = r"https?://(?:www\.)?(?:twitter\.com|x\.com|fxtwitter\.com|vxtwitter\.com|fixupx\.com)/\S+?/status/\d+"
INSTAGRAM_URL_PATTERN = r"https?://(?:www\.)?instagram\.com/(?:reel|p)/[\w-]+"
YOUTUBE_URL_PATTERN = r"https?://(?:www\.)?(?:youtube\.com/(?:watch\?v=|shorts/)|youtu\.be/)[\w-]+"
TIKTOK_URL_PATTERN = r"https?://(?:(?:www|m)\.tiktok\.com/(?:@[\w.-]+/video/\d+|t/\w+|v/\d+)|(?:vm|vt)\.tiktok\.com/\w+)"
# Union of all supported sites; used both as the command trigger and to
# enumerate every link inside a single message.
VIDEO_URL_PATTERN = rf"(?:{TWITTER_URL_PATTERN}|{INSTAGRAM_URL_PATTERN}|{YOUTUBE_URL_PATTERN}|{TIKTOK_URL_PATTERN})"
# Largest attachment the bot will post; anything bigger is re-encoded first.
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB
CLIP_DURATION = 60 # default clip length in seconds, measured forward from a shared ?t= timestamp
MAX_CLIP_DURATION = 600 # ceiling for a user-supplied /clip override
# yt-dlp is run from the virtualenv that sits next to this file.
YTDLP = os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp")
# Optional cookie jar next to this file; handed to yt-dlp only when it exists.
COOKIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt")
# Senders allowed to run /cookies, read from the comma-separated BOT_ADMINS
# environment variable. An empty set means nobody is authorized.
ADMIN_NUMBERS = {n.strip() for n in os.environ.get("BOT_ADMINS", "").split(",") if n.strip()}
# NOTE(review): not referenced in the visible code — VideoTracker matches on the
# "video/" content-type prefix instead. Confirm before relying on this list.
VIDEO_CONTENT_TYPES = ("video/mp4", "video/webm", "video/quicktime", "video/3gpp", "video/mpeg")
# Process-wide logging setup; runs at import time.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("signal-bot")
# In-memory cache of the most recent video per group, consumed by /speed and /rev.
# group_id -> {"b64": <base64-encoded video>, "time": <time.monotonic() when stored>}
last_video = {}
VIDEO_TTL = 3600 # 1 hour; older entries are treated as missing by _get_video
# (group_id, url) -> monotonic time the bot started handling this URL.
# When a user edits a message, signal-cli redelivers it as MessageType.EDIT_MESSAGE
# with the same text — without this guard the bot re-downloads and re-posts the video.
recent_urls = {}
RECENT_URL_TTL = 600 # 10 min; after this an edited message is processed again
def _set_video(group_id, b64):
    """Remember *b64* as the latest video for *group_id*, stamped with the current monotonic time."""
    last_video[group_id] = dict(b64=b64, time=time.monotonic())
def _get_video(group_id):
    """Return the cached base64 video for *group_id*, or None if absent or expired.

    An entry older than VIDEO_TTL seconds is evicted as a side effect.
    """
    cached = last_video.get(group_id)
    if cached:
        age = time.monotonic() - cached["time"]
        if age <= VIDEO_TTL:
            return cached["b64"]
        # Too old to be "the last video" any more; drop it.
        del last_video[group_id]
    return None
def _url_recently_handled(group_id, url):
    """Tell whether *url* was already picked up for *group_id* within RECENT_URL_TTL seconds.

    A record that has outlived the TTL is removed and reported as not handled.
    """
    key = (group_id, url)
    started = recent_urls.get(key)
    if started is None:
        return False
    still_fresh = time.monotonic() - started <= RECENT_URL_TTL
    if not still_fresh:
        del recent_urls[key]
    return still_fresh
def _mark_url_handled(group_id, url):
    """Record that the bot has started handling *url* for *group_id*.

    Expired records are pruned here as well. _url_recently_handled only evicts
    the one key it is asked about, and most links are never looked up a second
    time, so without this sweep recent_urls would grow for the life of the
    process.
    """
    now = time.monotonic()
    expired = [key for key, started in recent_urls.items() if now - started > RECENT_URL_TTL]
    for key in expired:
        del recent_urls[key]
    recent_urls[(group_id, url)] = now
# Errors that mean "the link simply has no downloadable video" rather than a
# genuine failure. We stay silent for these — the bot watches every message with
# a link, so most links legitimately have no video and shouldn't draw a complaint.
_NO_MEDIA_ERROR_PATTERNS = (
"no video could be found",
"there's no video",
"no media found",
"no video formats found",
"unsupported url",
)
def _is_no_media_error(err: str) -> bool:
e = err.lower()
return any(p in e for p in _NO_MEDIA_ERROR_PATTERNS)
def _summarize_ytdlp_error(stderr: str) -> str:
"""Pull a short, user-readable reason out of yt-dlp stderr."""
if not stderr:
return "unknown error"
error_lines = [ln.strip() for ln in stderr.splitlines() if ln.startswith("ERROR:")]
if error_lines:
msg = error_lines[-1][len("ERROR:"):].strip()
# Strip "[extractor] video_id:" prefix yt-dlp prepends.
msg = re.sub(r"^\[[^\]]+\]\s+\S+?:\s*", "", msg)
# Trim verbose "Use --cookies..." tails that aren't useful to a chat user.
msg = re.split(r"\s+(?:Use --cookies|See https?://)", msg, maxsplit=1)[0]
return msg[:240].rstrip(". ")
lines = [ln.strip() for ln in stderr.splitlines() if ln.strip()]
return lines[-1][:240] if lines else "unknown error"
def _parse_timestamp(value: str) -> int | None:
"""Parse a YouTube timestamp into seconds.
Accepts plain seconds ("1509", "90s") and the h/m/s form ("25m9s",
"1h2m3s"). Returns None for anything that isn't a recognizable time.
"""
if not value:
return None
m = re.fullmatch(r"(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s?)?", value.strip(), re.IGNORECASE)
if not m or not any(m.groups()):
return None
h, mi, s = (int(g) if g else 0 for g in m.groups())
return h * 3600 + mi * 60 + s
def _extract_timestamp(url_token: str) -> int | None:
    """Return the start offset in seconds carried by a t= or start= parameter.

    The parameter may follow ``?``, ``&`` or ``#``. Returns None when the token
    has no such parameter or its value is not a recognizable time.
    """
    found = re.search(r"[?&#](?:t|start)=([0-9hms]+)", url_token, re.IGNORECASE)
    if found is None:
        return None
    return _parse_timestamp(found.group(1))
class VideoTracker(Command):
    """Watches all group messages for video attachments and stores the last one."""

    async def handle(self, c: Context) -> None:
        """Cache the first video attachment of a group message for /speed and /rev.

        Attachment content types are only available in the raw signal-cli
        envelope, so that JSON is parsed and matched by position against
        ``c.message.base64_attachments``. Messages whose envelope cannot be
        read are ignored.
        """
        message = c.message
        if not message.is_group() or not message.base64_attachments:
            return

        # Check raw message for video content types
        try:
            envelope = json.loads(message.raw_message)["envelope"]
            data = (
                envelope.get("dataMessage")
                or (envelope.get("syncMessage") or {}).get("sentMessage")
                or {}
            )
            attachments = data.get("attachments") or []
        except (TypeError, ValueError, KeyError, AttributeError) as exc:
            # Malformed or unexpected envelope: nothing to track. Only the
            # parse-related errors are swallowed so real bugs still surface.
            log.debug("Could not read attachments from raw message: %s", exc)
            return

        for i, att in enumerate(attachments):
            if not isinstance(att, dict):
                continue
            # contentType may be missing or null; treat both as "not a video".
            content_type = att.get("contentType") or ""
            if not content_type.startswith("video/"):
                continue
            # Metadata and decoded payloads are parallel lists; skip metadata
            # entries that have no payload at the same index.
            if i < len(message.base64_attachments):
                _set_video(message.group, message.base64_attachments[i])
                log.info("Stored received video for group %s", message.group)
                return
class VideoCommand(Command):
    """Download the video behind each supported link in a group message and post it back.

    YouTube links carrying a ``?t=``/``start=`` timestamp, or accompanied by
    ``/clip <seconds>``, are downloaded as a clip rather than in full.
    """

    @regex_triggered(VIDEO_URL_PATTERN)
    async def handle(self, c: Context) -> None:
        """Find every supported URL in the message and process them in order."""
        if not c.message.is_group():
            return
        text = c.message.text
        matches = list(re.finditer(VIDEO_URL_PATTERN, text))
        if not matches:
            return

        # An optional "/clip <seconds>" anywhere in the message overrides the
        # default window length for any clip produced from this message.
        # The lookbehind requires /clip to start a word, so a URL path that
        # happens to end in ".../clip" is not mistaken for the command.
        clip_len = CLIP_DURATION
        mclip = re.search(r"(?<!\S)/clip\s+(\S+)", text, re.IGNORECASE)
        if mclip:
            secs = _parse_timestamp(mclip.group(1))
            if secs is None or secs < 1:
                await c.reply("`/clip` needs a length in seconds, e.g. `/clip 30`.")
                return
            clip_len = min(secs, MAX_CLIP_DURATION)

        is_edit = c.message.type == MessageType.EDIT_MESSAGE
        for m in matches:
            url = m.group(0)
            # The URL pattern stops at the video id, so any ?t=/&t= timestamp
            # lives in the characters that follow. Grab the whole whitespace-
            # delimited token to recover it. Timestamps/clips apply to YouTube.
            clip = None
            if re.match(YOUTUBE_URL_PATTERN, url):
                token = re.match(r"\S+", text[m.start():]).group(0)
                start = _extract_timestamp(token)
                if mclip:
                    # Explicit /clip clips even without a timestamp (from 0).
                    start = start or 0
                if start is not None:
                    clip = (start, start + clip_len)

            # Normalize fxtwitter/vxtwitter wrappers to x.com
            url = re.sub(
                r"https?://(?:www\.)?(?:fxtwitter\.com|vxtwitter\.com|fixupx\.com)",
                "https://x.com",
                url,
            )
            if is_edit and _url_recently_handled(c.message.group, url):
                log.info("Skipping edited message; already handled %s", url)
                continue
            # Marked before the download starts so an edit arriving while the
            # download is still running is recognized as a duplicate.
            _mark_url_handled(c.message.group, url)
            await self._download_and_send(c, url, clip)

    async def _download_and_send(self, c: Context, url: str, clip: tuple[int, int] | None = None) -> None:
        """Download *url* (optionally only the *clip* window), shrink it if needed, and post it.

        yt-dlp and ffmpeg are run in a worker thread: they can take minutes,
        and running them directly here would stall the event loop for that
        long. Links that simply contain no video fail silently; real failures
        are reported to the chat.
        """
        if clip is not None:
            log.info("Clipping %s to window %d-%ds", url, clip[0], clip[1])
        with tempfile.TemporaryDirectory() as tmpdir:
            outpath = os.path.join(tmpdir, "video.mp4")
            ok, err = await asyncio.to_thread(self._run_ytdlp, url, outpath, tmpdir, clip)
            if not ok:
                if _is_no_media_error(err):
                    # Link just has no video (e.g. a text-only tweet). Stay quiet.
                    log.info("No video at %s (%s); staying silent", url, err)
                    return
                await c.reply(f"Couldn't grab that video: {err}")
                return

            # yt-dlp may produce a slightly different filename
            actual_file = next(
                (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith(".mp4")),
                None,
            )
            if actual_file is None:
                log.warning("No mp4 file found after yt-dlp for %s", url)
                await c.reply("yt-dlp finished but produced no mp4.")
                return

            file_size = os.path.getsize(actual_file)
            if file_size > MAX_FILE_SIZE:
                size_mb = file_size // (1024 * 1024)
                log.info("Video is %d MB, re-encoding to fit under %d MB", size_mb, MAX_FILE_SIZE // (1024 * 1024))
                new_path, reason = await asyncio.to_thread(_reencode, actual_file, tmpdir)
                if new_path is None:
                    await c.reply(f"That video is too large ({size_mb} MB) and re-encoding failed: {reason}.")
                    return
                actual_file = new_path

            with open(actual_file, "rb") as f:
                video_bytes = f.read()

        b64_video = base64.b64encode(video_bytes).decode("utf-8")
        _set_video(c.message.group, b64_video)
        await c.send("", base64_attachments=[b64_video])

    def _run_ytdlp(self, url: str, outpath: str, tmpdir: str,
                   clip: tuple[int, int] | None = None) -> tuple[bool, str]:
        """Run yt-dlp with retries. Returns (success, short_reason).

        reason is empty on success; otherwise a one-line description suitable
        for user reply. When clip is (start, end), only that window is
        downloaded.

        This method blocks (subprocess plus sleeps between attempts) and is
        meant to be called from a worker thread. Timeouts and "this link has no
        video" errors are not retried, since repeating them only adds delay.
        """
        cmd = [
            YTDLP,
            "--no-playlist",
            # YouTube wraps URLs in a JS "n-sig" challenge; node solves it
            # via yt-dlp-ejs. Without this, only image/thumb formats resolve.
            "--js-runtimes", "node",
        ]
        if clip is not None:
            start, end = clip
            # Download only the requested window instead of the whole video —
            # essential for hour-long uploads shared with a ?t= timestamp.
            # force-keyframes-at-cuts makes the start boundary accurate.
            # The filesize filters below key off the *whole* video's size, which
            # is irrelevant to a 60s slice, so drop them and just take best mp4.
            cmd += [
                "--download-sections", f"*{start}-{end}",
                "--force-keyframes-at-cuts",
                "-f", "best[ext=mp4]/best",
            ]
        else:
            # Prefer the largest mp4 that already fits under 95 MB, so we avoid
            # re-encoding when a smaller variant exists (e.g. a 4K rendition
            # >100 MB alongside a 1080p ~50 MB).
            cmd += [
                "-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best",
            ]
        cmd += [
            "--merge-output-format", "mp4",
            *(["--cookies", COOKIES] if os.path.exists(COOKIES) else []),
            "-o", outpath,
            url,
        ]

        delays = [0, 3, 8]
        last_stderr = ""
        for attempt, delay in enumerate(delays, 1):
            if delay:
                time.sleep(delay)
            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=120,
                    cwd=tmpdir,
                )
            except subprocess.TimeoutExpired:
                # Don't retry timeouts — three 120s timeouts would tie up this download for 6 min.
                log.warning("yt-dlp timed out for %s", url)
                return False, "yt-dlp timed out after 120s"
            if result.returncode == 0:
                return True, ""
            last_stderr = result.stderr
            log.warning(
                "yt-dlp failed for %s (attempt %d/%d): %s",
                url, attempt, len(delays), last_stderr.strip()[-300:],
            )
            reason = _summarize_ytdlp_error(last_stderr)
            if _is_no_media_error(reason):
                # The link has no video; that will not change on a retry, and
                # most links the bot sees fall in this bucket.
                return False, reason
        return False, _summarize_ytdlp_error(last_stderr)
def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]:
    """Re-encode video with ffmpeg to fit under MAX_FILE_SIZE.

    The video bitrate is derived from the probed duration so that the output
    lands near 95 MB (with a floor of 200 kbps); the result is written to
    ``reencoded.mp4`` inside *tmpdir*. This function blocks for up to the
    ffprobe/ffmpeg timeouts.

    Returns (path, reason). On success, path is set and reason is "".
    On failure, path is None and reason is a short human-readable cause.
    """
    outpath = os.path.join(tmpdir, "reencoded.mp4")
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", input_file],
            capture_output=True, text=True, timeout=30,
        )
        duration = float(probe.stdout.strip())
    except Exception:
        log.warning("Could not probe video duration")
        return None, "could not read video duration"
    # A zero, negative or NaN duration cannot be turned into a bitrate (and
    # zero would divide by zero below); treat it like an unreadable duration.
    if not duration > 0:
        log.warning("Probed video duration is unusable: %r", duration)
        return None, "could not read video duration"

    # Target 95 MB to leave headroom
    target_bytes = 95 * 1024 * 1024
    # Total bitrate in kbps; reserve 128k for audio
    audio_bitrate = 128
    total_bitrate = int((target_bytes * 8) / duration / 1000)
    video_bitrate = max(total_bitrate - audio_bitrate, 200)
    try:
        result = subprocess.run(
            [
                "ffmpeg", "-y", "-i", input_file,
                "-c:v", "libx264", "-b:v", f"{video_bitrate}k",
                "-c:a", "aac", "-b:a", f"{audio_bitrate}k",
                "-preset", "fast",
                "-movflags", "+faststart",
                outpath,
            ],
            capture_output=True, text=True, timeout=300,
        )
    except subprocess.TimeoutExpired:
        log.warning("ffmpeg re-encode timed out")
        return None, "ffmpeg timed out after 300s"
    if result.returncode != 0:
        log.warning("ffmpeg re-encode failed: %s", result.stderr[-500:])
        stderr_lines = [ln for ln in result.stderr.strip().splitlines() if ln.strip()]
        last_line = stderr_lines[-1] if stderr_lines else "no stderr"
        return None, f"ffmpeg exited {result.returncode} ({last_line[:160]})"

    # The 200 kbps floor means very long videos can still overshoot.
    final_size = os.path.getsize(outpath)
    if final_size > MAX_FILE_SIZE:
        final_mb = final_size // (1024 * 1024)
        log.warning("Re-encoded video still too large: %d MB", final_mb)
        return None, f"output still {final_mb} MB after re-encode (duration {int(duration)}s)"
    log.info("Re-encoded video from %d MB to %d MB",
             os.path.getsize(input_file) // (1024 * 1024), final_size // (1024 * 1024))
    return outpath, ""
def _has_audio_stream(path: str) -> bool:
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-select_streams", "a",
"-show_entries", "stream=index", "-of", "csv=p=0", path],
capture_output=True, text=True, timeout=15,
)
except Exception:
return True # assume yes; ffmpeg will fail loudly if it's wrong
return bool(result.stdout.strip())
def _audio_sample_rate(path: str) -> int | None:
try:
r = subprocess.run(
["ffprobe", "-v", "error", "-select_streams", "a:0",
"-show_entries", "stream=sample_rate", "-of", "csv=p=0", path],
capture_output=True, text=True, timeout=15,
)
except Exception:
return None
out = r.stdout.strip()
try:
return int(out) if out else None
except ValueError:
return None
class SpeedCommand(Command):
    """Handle ``/speed [factor]``: re-time the group's last video and post the result.

    The factor defaults to 2, may carry a trailing "x", and must lie between
    0.1 and 100 (values below 1 slow the video down).
    """

    SPEED_RE = re.compile(r"^/speed(?:\s+(\S+))?$", re.IGNORECASE)

    async def handle(self, c: Context) -> None:
        """Validate the requested factor, run ffmpeg off the event loop, and send the new video."""
        if not c.message.is_group():
            return
        m = self.SPEED_RE.match((c.message.text or "").strip())
        if not m:
            return

        speed_arg = m.group(1)
        if speed_arg is None:
            speed = 2.0
        else:
            try:
                speed = float(speed_arg.rstrip("xX"))
            except ValueError:
                await c.reply(f"`{speed_arg}` isn't a number. Try `/speed 2`.")
                return
        # Written as a chained comparison so NaN is rejected here as well.
        if not (0.1 <= speed <= 100.0):
            await c.reply(f"Speed must be between 0.1 and 100 (got {speed:g}).")
            return
        if speed == 1.0:
            await c.reply("1x wouldn't change anything.")
            return

        b64 = _get_video(c.message.group)
        if not b64:
            await c.reply("No video to speed up.")
            return

        with tempfile.TemporaryDirectory() as tmpdir:
            inpath = os.path.join(tmpdir, "input.mp4")
            outpath = os.path.join(tmpdir, "sped.mp4")
            with open(inpath, "wb") as f:
                f.write(base64.b64decode(b64))

            # ffprobe/ffmpeg can run for minutes; do that in a worker thread so
            # the event loop keeps servicing other messages meanwhile.
            try:
                result = await asyncio.to_thread(self._speed_up, inpath, outpath, speed)
            except subprocess.TimeoutExpired:
                log.warning("ffmpeg timed out speeding up video")
                await c.reply("Timed out speeding up that video.")
                return
            if result.returncode != 0:
                log.warning("ffmpeg /speed failed: %s", result.stderr[-500:])
                await c.reply("Failed to speed up that video.")
                return

            final_file = outpath
            size = os.path.getsize(final_file)
            if size > MAX_FILE_SIZE:
                size_mb = size // (1024 * 1024)
                log.info("Sped-up video is %d MB, re-encoding to fit", size_mb)
                new_path, reason = await asyncio.to_thread(_reencode, final_file, tmpdir)
                if new_path is None:
                    await c.reply(
                        f"Sped-up video is too large ({size_mb} MB) and "
                        f"re-encoding failed: {reason}."
                    )
                    return
                final_file = new_path

            with open(final_file, "rb") as f:
                sped_bytes = f.read()

        b64_sped = base64.b64encode(sped_bytes).decode("utf-8")
        _set_video(c.message.group, b64_sped)
        await c.send("", base64_attachments=[b64_sped])

    @staticmethod
    def _speed_up(inpath: str, outpath: str, speed: float) -> subprocess.CompletedProcess:
        """Run the (blocking) ffmpeg re-timing of *inpath* into *outpath*.

        Raises subprocess.TimeoutExpired if ffmpeg exceeds 180 s; a non-zero
        exit is reported through the returned CompletedProcess.
        """
        cmd = [
            "ffmpeg", "-y", "-i", inpath,
            "-filter:v", f"setpts={1.0/speed:g}*PTS",
        ]
        if _has_audio_stream(inpath):
            # asetrate scales the sample rate (which shifts pitch AND
            # tempo, the tape-speed effect); aresample brings the data
            # rate back to a standard playback rate without undoing it.
            rate = _audio_sample_rate(inpath) or 48000
            cmd += [
                "-filter:a",
                f"asetrate={int(rate * speed)},aresample={rate}",
            ]
        else:
            cmd += ["-an"]
        cmd += ["-preset", "fast", "-movflags", "+faststart", outpath]
        return subprocess.run(cmd, capture_output=True, text=True, timeout=180)
class ReverseCommand(Command):
    """Handle ``/rev``: reverse the group's last video and post the result.

    Mirrors SpeedCommand's handling: silent (audio-less) inputs are supported,
    ffmpeg runs off the event loop, and an oversized result is re-encoded to
    fit under MAX_FILE_SIZE before sending.
    """

    @triggered("/rev")
    async def handle(self, c: Context) -> None:
        """Reverse the cached video with ffmpeg and send it back to the group."""
        if not c.message.is_group():
            return
        b64 = _get_video(c.message.group)
        if not b64:
            await c.reply("No video to reverse.")
            return

        with tempfile.TemporaryDirectory() as tmpdir:
            inpath = os.path.join(tmpdir, "input.mp4")
            outpath = os.path.join(tmpdir, "reversed.mp4")
            with open(inpath, "wb") as f:
                f.write(base64.b64decode(b64))

            # ffmpeg can take up to two minutes here; keep it off the event loop.
            try:
                result = await asyncio.to_thread(self._reverse, inpath, outpath)
            except subprocess.TimeoutExpired:
                log.warning("ffmpeg timed out reversing video")
                await c.reply("Timed out reversing that video.")
                return
            if result.returncode != 0:
                # Only the tail of stderr: the full dump is mostly banner text.
                log.warning("ffmpeg /rev failed: %s", result.stderr[-500:])
                await c.reply("Failed to reverse that video.")
                return

            final_file = outpath
            size = os.path.getsize(final_file)
            if size > MAX_FILE_SIZE:
                # The re-encode can come out larger than the input; apply the
                # same size limit every other sender in this file applies.
                size_mb = size // (1024 * 1024)
                log.info("Reversed video is %d MB, re-encoding to fit", size_mb)
                new_path, reason = await asyncio.to_thread(_reencode, final_file, tmpdir)
                if new_path is None:
                    await c.reply(
                        f"Reversed video is too large ({size_mb} MB) and "
                        f"re-encoding failed: {reason}."
                    )
                    return
                final_file = new_path

            with open(final_file, "rb") as f:
                reversed_bytes = f.read()

        b64_reversed = base64.b64encode(reversed_bytes).decode("utf-8")
        _set_video(c.message.group, b64_reversed)
        await c.send("", base64_attachments=[b64_reversed])

    @staticmethod
    def _reverse(inpath: str, outpath: str) -> subprocess.CompletedProcess:
        """Run the (blocking) ffmpeg reversal of *inpath* into *outpath*.

        Raises subprocess.TimeoutExpired if ffmpeg exceeds 120 s; a non-zero
        exit is reported through the returned CompletedProcess.
        """
        # -y: never wait on an overwrite prompt (stdin is not interactive).
        cmd = ["ffmpeg", "-y", "-i", inpath, "-vf", "reverse"]
        if _has_audio_stream(inpath):
            cmd += ["-af", "areverse"]
        else:
            # Nothing to reverse on the audio side; emit a video-only file.
            cmd += ["-an"]
        cmd += ["-preset", "fast", "-movflags", "+faststart", outpath]
        return subprocess.run(cmd, capture_output=True, text=True, timeout=120)
# User-facing help message sent by HelpCommand. It is an f-string so the clip
# lengths quoted to users always track CLIP_DURATION and MAX_CLIP_DURATION.
# Keep it in sync with the commands registered in main().
HELP_TEXT = f"""🎬 Video bot — what I can do
Share a video link (X/Twitter, Instagram, YouTube, TikTok) and I'll post the video back to the group.
e.g. https://x.com/user/status/123456789
A YouTube link with a timestamp → I post a {CLIP_DURATION}s clip starting at that moment.
e.g. https://youtu.be/dQw4w9WgXcQ?t=90
/clip <seconds> — set the clip length for a link in the same message (max {MAX_CLIP_DURATION}s). With a ?t= it sets the window; without one it clips from the start.
e.g. /clip 30 https://youtu.be/dQw4w9WgXcQ?t=90
e.g. /clip 15 https://youtu.be/dQw4w9WgXcQ
/speed [factor] — speed up the last video (default 2x).
e.g. /speed /speed 4 /speed 0.5
/rev — reverse the last video.
/help — show this message.
(In a DM, admins can run /cookies to refresh Instagram login cookies.)"""
class HelpCommand(Command):
    """Answer ``/help`` (or ``/commands``) in a group with the HELP_TEXT overview."""

    async def handle(self, c: Context) -> None:
        """Reply with HELP_TEXT when the whole message is one of the help commands."""
        if c.message.is_group():
            # Case and surrounding whitespace are ignored, but the command
            # must be the entire message.
            command = (c.message.text or "").strip().lower()
            if command in ("/help", "/commands"):
                await c.reply(HELP_TEXT)
def _sender_number(msg) -> str | None:
for attr in ("source", "source_number", "sourceNumber"):
v = getattr(msg, attr, None)
if v:
return v
try:
env = json.loads(msg.raw_message)["envelope"]
return env.get("source") or env.get("sourceNumber")
except Exception:
return None
class CookiesCommand(Command):
    """Handle ``/cookies`` in a direct message: install pasted Instagram cookies.

    Only senders listed in ADMIN_NUMBERS may use it. The message is expected
    to be ``/cookies`` on the first line followed by cookie-file lines; the
    instagram.com lines replace any existing instagram.com lines in COOKIES.
    """

    async def handle(self, c: Context) -> None:
        """Validate the sender and the pasted cookies, then atomically rewrite COOKIES."""
        text = c.message.text or ""
        if not text.startswith("/cookies"):
            return
        if c.message.is_group():
            return
        sender = _sender_number(c.message)
        if not ADMIN_NUMBERS or sender not in ADMIN_NUMBERS:
            # Refuse silently so non-admins learn nothing from probing.
            log.warning("Refused /cookies from %r (admins=%s)", sender, ADMIN_NUMBERS or "<unset>")
            return

        body = text.split("\n", 1)[1] if "\n" in text else ""
        # Cookie exporters mark HttpOnly cookies by prefixing the domain with
        # "#HttpOnly_" — and sessionid is typically one of those — so that
        # prefix must be accepted, not mistaken for a comment line.
        cookie_re = re.compile(r"^(?:#HttpOnly_)?\.?instagram\.com\b")
        ig_lines = []
        for line in body.splitlines():
            if not cookie_re.match(line):
                continue
            # Browser pastes can replace tabs with runs of spaces; restore tabs.
            normalized = re.sub(r"[ \t]{2,}", "\t", line)
            ig_lines.append(normalized)
        if not ig_lines:
            await c.reply("No `.instagram.com` cookie lines found.")
            return
        if not any("\tsessionid\t" in ln for ln in ig_lines):
            await c.reply("Missing `sessionid` cookie — that's the one that proves you're logged in. Re-export and try again.")
            return

        try:
            with open(COOKIES, "r") as f:
                existing = f.read().splitlines()
        except FileNotFoundError:
            existing = ["# Netscape HTTP Cookie File", ""]
        # Drop the old Instagram cookies; keep everything for other sites.
        kept = [ln for ln in existing if not cookie_re.match(ln)]
        new_content = "\n".join(kept + ig_lines) + "\n"

        tmppath = COOKIES + ".tmp"
        # Create the temp file owner-only from the start: writing first and
        # chmod-ing afterwards leaves the session cookies readable by others
        # for a moment.
        fd = os.open(tmppath, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            # The mode passed to os.open is ignored when a stale temp file
            # already exists, so tighten it explicitly before writing secrets.
            os.chmod(tmppath, 0o600)
            f.write(new_content)
        # Atomic swap: yt-dlp never sees a half-written cookie file.
        os.replace(tmppath, COOKIES)
        log.info("Installed %d Instagram cookies from %s", len(ig_lines), sender)
        await c.reply(f"Installed {len(ig_lines)} Instagram cookies.")
def main():
    """Configure the bot from the environment, register every command, and run it.

    Requires SIGNAL_PHONE_NUMBER; SIGNAL_SERVICE defaults to 127.0.0.1:8080.
    Exits with status 1 when the phone number is not set.
    """
    phone_number = os.environ.get("SIGNAL_PHONE_NUMBER")
    signal_service = os.environ.get("SIGNAL_SERVICE", "127.0.0.1:8080")
    if not phone_number:
        for line in (
            "Error: SIGNAL_PHONE_NUMBER environment variable is required.",
            "Example: export SIGNAL_PHONE_NUMBER='+15551234567'",
        ):
            print(line)
        raise SystemExit(1)

    config = {
        "signal_service": signal_service,
        "phone_number": phone_number,
    }
    bot = SignalBot(config)

    # Everything except /cookies listens to groups only.
    group_commands = (VideoTracker, VideoCommand, ReverseCommand, SpeedCommand, HelpCommand)
    for command_cls in group_commands:
        bot.register(command_cls(), contacts=False, groups=True)
    # /cookies carries login secrets, so it is only accepted in direct messages.
    bot.register(CookiesCommand(), contacts=True, groups=False)

    log.info("Starting Signal video bot...")
    bot.start()


if __name__ == "__main__":
    main()