Fix bug pass: async offload, resource leaks, media/parse correctness, deploy hardening

bot.py:
- Run all yt-dlp/ffmpeg/ffprobe/file-IO/base64 work off the event loop via
  asyncio.to_thread, bounded by a Semaphore(2); the loop no longer freezes
  bot-wide during downloads/encodes.
- download_attachments=False; VideoTracker fetches only video attachments lazily
  instead of the library base64-ing every attachment of every message.
- last_video keyed per (group, sender) with group-latest fallback so /speed and
  /rev stop silently targeting a stranger's video; proactive TTL sweeps free the
  big base64 blobs and bound recent_urls growth.
- _reencode guards 0/NaN ffprobe duration and adds -maxrate/-bufsize; /rev now
  size-checks + re-encodes + faststart like the other paths.
- Twitter URL regex no longer merges space-separated links (keeps i/web/status);
  dedupe repeated URLs within a message; mark-handled only on success/no-media so
  a corrective edit can retry; 'unsupported url' surfaced instead of silently dropped.
- All sends wrapped (catch SendMessageError); base64 decode guarded; edit/sync
  attachment envelopes handled; /cookies temp file created 0600.

deploy:
- Pin signalbot==1.1.0, yt-dlp floor, add missing yt-dlp-ejs.
- Pin signal-cli-rest-api by digest + add /v1/health healthcheck.
- Restart=always (the library never exits non-zero when wedged).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-15 18:43:41 -04:00
parent 26a5ecb2d0
commit c7eb101ff0
4 changed files with 439 additions and 265 deletions
+421 -260
View File
@@ -1,4 +1,6 @@
import asyncio
import base64 import base64
import binascii
import json import json
import logging import logging
import os import os
@@ -11,7 +13,11 @@ from signalbot import Command, Context, SignalBot
from signalbot.command import regex_triggered, triggered from signalbot.command import regex_triggered, triggered
from signalbot.message import MessageType from signalbot.message import MessageType
TWITTER_URL_PATTERN = r"https?://(?:www\.)?(?:twitter\.com|x\.com|fxtwitter\.com|vxtwitter\.com|fixupx\.com)/.+/status/\d+" # Path segments before /status/ are [^/\s]+ (not .+): a greedy .+ matched across
# spaces, merging two space-separated tweet links into one malformed URL. The
# (?:/[^/\s]+)* allows multi-segment forms like x.com/i/web/status/<id> while
# still refusing whitespace (so two links can't merge).
TWITTER_URL_PATTERN = r"https?://(?:www\.)?(?:twitter\.com|x\.com|fxtwitter\.com|vxtwitter\.com|fixupx\.com)/[^/\s]+(?:/[^/\s]+)*/status/\d+"
INSTAGRAM_URL_PATTERN = r"https?://(?:www\.)?instagram\.com/(?:reel|p)/[\w-]+" INSTAGRAM_URL_PATTERN = r"https?://(?:www\.)?instagram\.com/(?:reel|p)/[\w-]+"
YOUTUBE_URL_PATTERN = r"https?://(?:www\.)?(?:youtube\.com/(?:watch\?v=|shorts/)|youtu\.be/)[\w-]+" YOUTUBE_URL_PATTERN = r"https?://(?:www\.)?(?:youtube\.com/(?:watch\?v=|shorts/)|youtu\.be/)[\w-]+"
TIKTOK_URL_PATTERN = r"https?://(?:(?:www|m)\.tiktok\.com/(?:@[\w.-]+/video/\d+|t/\w+|v/\d+)|(?:vm|vt)\.tiktok\.com/\w+)" TIKTOK_URL_PATTERN = r"https?://(?:(?:www|m)\.tiktok\.com/(?:@[\w.-]+/video/\d+|t/\w+|v/\d+)|(?:vm|vt)\.tiktok\.com/\w+)"
@@ -19,67 +25,120 @@ VIDEO_URL_PATTERN = rf"(?:{TWITTER_URL_PATTERN}|{INSTAGRAM_URL_PATTERN}|{YOUTUBE
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB
CLIP_DURATION = 60 # default seconds to grab around a shared ?t= timestamp CLIP_DURATION = 60 # default seconds to grab around a shared ?t= timestamp
MAX_CLIP_DURATION = 600 # ceiling for a user-supplied /clip override MAX_CLIP_DURATION = 600 # ceiling for a user-supplied /clip override
# Cap simultaneous heavy media jobs (yt-dlp/ffmpeg) so a busy group can't spawn
# unbounded subprocesses. Each job runs in a worker thread (see _run_blocking).
MAX_CONCURRENT_JOBS = 2
YTDLP = os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp") YTDLP = os.path.join(os.path.dirname(os.path.abspath(__file__)), "venv", "bin", "yt-dlp")
COOKIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt") COOKIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt")
ADMIN_NUMBERS = {n.strip() for n in os.environ.get("BOT_ADMINS", "").split(",") if n.strip()} ADMIN_NUMBERS = {n.strip() for n in os.environ.get("BOT_ADMINS", "").split(",") if n.strip()}
VIDEO_CONTENT_TYPES = ("video/mp4", "video/webm", "video/quicktime", "video/3gpp", "video/mpeg")
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s", format="%(asctime)s [%(levelname)s] %(message)s",
) )
log = logging.getLogger("signal-bot") log = logging.getLogger("signal-bot")
# group_id -> {"b64": ..., "time": ...} # (group_id, sender) -> {"b64": ..., "time": monotonic}. Keyed per sender so a
# stranger's later video doesn't silently become the target of your /speed or
# /rev; _get_video falls back to the group's latest video when you have none.
last_video = {} last_video = {}
VIDEO_TTL = 3600 # 1 hour VIDEO_TTL = 3600 # 1 hour
# (group_id, url) -> monotonic time the bot started handling this URL. # (group_id, url) -> monotonic time the URL was successfully handled.
# When a user edits a message, signal-cli redelivers it as MessageType.EDIT_MESSAGE # When a user edits a message, signal-cli redelivers it as MessageType.EDIT_MESSAGE
# with the same text — without this guard the bot re-downloads and re-posts the video. # with the same text — without this guard the bot re-downloads and re-posts the video.
recent_urls = {} recent_urls = {}
RECENT_URL_TTL = 600 # 10 min RECENT_URL_TTL = 600 # 10 min
# (group_id, url) currently being downloaded — guards against an edit redelivery
# arriving mid-download (before recent_urls is set) triggering a second download.
_inflight = set()
def _set_video(group_id, b64): # Lazily created so it binds to the bot's running event loop, not import time.
last_video[group_id] = {"b64": b64, "time": time.monotonic()} _job_semaphore = None
def _get_video(group_id): def _get_job_semaphore() -> asyncio.Semaphore:
entry = last_video.get(group_id) global _job_semaphore
if not entry: if _job_semaphore is None:
return None _job_semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS)
if time.monotonic() - entry["time"] > VIDEO_TTL: return _job_semaphore
del last_video[group_id]
return None
return entry["b64"]
def _url_recently_handled(group_id, url): def _sweep_videos() -> None:
key = (group_id, url) now = time.monotonic()
t = recent_urls.get(key) for key in [k for k, e in last_video.items() if now - e["time"] > VIDEO_TTL]:
if t is None: del last_video[key]
return False
if time.monotonic() - t > RECENT_URL_TTL:
def _set_video(group_id, sender, b64) -> None:
_sweep_videos()
last_video[(group_id, sender)] = {"b64": b64, "time": time.monotonic()}
def _get_video(group_id, sender):
"""The caller's own most recent video, else the group's latest non-expired one."""
_sweep_videos()
entry = last_video.get((group_id, sender))
if entry:
return entry["b64"]
best = None
for (g, _s), e in last_video.items():
if g != group_id:
continue
if best is None or e["time"] > best["time"]:
best = e
return best["b64"] if best else None
def _sweep_recent_urls() -> None:
now = time.monotonic()
for key in [k for k, t in recent_urls.items() if now - t > RECENT_URL_TTL]:
del recent_urls[key] del recent_urls[key]
return False
return True
def _mark_url_handled(group_id, url): def _url_recently_handled(group_id, url) -> bool:
_sweep_recent_urls()
return (group_id, url) in recent_urls
def _url_busy(group_id, url) -> bool:
return (group_id, url) in _inflight or _url_recently_handled(group_id, url)
def _mark_url_handled(group_id, url) -> None:
_sweep_recent_urls()
recent_urls[(group_id, url)] = time.monotonic() recent_urls[(group_id, url)] = time.monotonic()
async def _safe_reply(c: Context, text: str) -> None:
try:
await c.reply(text)
except Exception as e: # noqa: BLE001
log.warning("Failed to send reply: %s", e)
async def _safe_send_video(c: Context, b64: str) -> bool:
try:
await c.send("", base64_attachments=[b64])
return True
except Exception as e: # noqa: BLE001
log.warning("Failed to send video: %s", e)
await _safe_reply(c, "Couldn't send that video (Signal may have rejected it).")
return False
# Errors that mean "the link simply has no downloadable video" rather than a # Errors that mean "the link simply has no downloadable video" rather than a
# genuine failure. We stay silent for these — the bot watches every message with # genuine failure. We stay silent for these — the bot watches every message with
# a link, so most links legitimately have no video and shouldn't draw a complaint. # a link, so most links legitimately have no video and shouldn't draw a complaint.
# NOTE: "unsupported url" is deliberately NOT here. Every URL that reaches yt-dlp
# already matched one of our platform patterns, so "Unsupported URL" means a real
# extractor/version break worth surfacing, not a quiet no-media link.
_NO_MEDIA_ERROR_PATTERNS = ( _NO_MEDIA_ERROR_PATTERNS = (
"no video could be found", "no video could be found",
"there's no video", "there's no video",
"no media found", "no media found",
"no video formats found", "no video formats found",
"unsupported url",
) )
@@ -125,30 +184,54 @@ def _extract_timestamp(url_token: str) -> int | None:
return _parse_timestamp(m.group(1)) if m else None return _parse_timestamp(m.group(1)) if m else None
def _message_data(envelope: dict) -> dict:
"""Find the data_message dict (which carries attachments) inside a raw envelope.
Mirrors signalbot's own extraction so we also see attachments delivered via
edit envelopes and synced (linked-device) messages, not just plain dataMessage.
"""
if "dataMessage" in envelope:
return envelope["dataMessage"] or {}
if "editMessage" in envelope:
return envelope["editMessage"].get("dataMessage") or {}
if "syncMessage" in envelope:
sent = envelope["syncMessage"].get("sentMessage") or {}
if "editMessage" in sent:
return sent["editMessage"].get("dataMessage") or {}
return sent
return {}
class VideoTracker(Command): class VideoTracker(Command):
"""Watches all group messages for video attachments and stores the last one.""" """Watches group messages for video attachments and stores the last one per sender."""
async def handle(self, c: Context) -> None: async def handle(self, c: Context) -> None:
if not c.message.is_group(): if not c.message.is_group():
return return
if not c.message.base64_attachments:
return
# Check raw message for video content types
try: try:
raw = json.loads(c.message.raw_message) envelope = json.loads(c.message.raw_message)["envelope"]
envelope = raw["envelope"] except Exception: # noqa: BLE001
data = envelope.get("dataMessage") or envelope.get("syncMessage", {}).get("sentMessage", {})
attachments = data.get("attachments", [])
except Exception:
return return
for i, att in enumerate(attachments): attachments = _message_data(envelope).get("attachments") or []
content_type = att.get("contentType", "") att = next(
if content_type.startswith("video/"): (a for a in attachments if str(a.get("contentType", "")).startswith("video/")),
if i < len(c.message.base64_attachments): None,
_set_video(c.message.group, c.message.base64_attachments[i]) )
log.info("Stored received video for group %s", c.message.group) if not att or not att.get("id"):
return return
# download_attachments is disabled globally, so fetch only this video on
# demand instead of letting the library base64 every attachment of every
# message on the producer loop.
try:
b64 = await c.bot._signal.get_attachment(att["id"])
except Exception as e: # noqa: BLE001
log.warning("Failed to fetch video attachment %s: %s", att.get("id"), e)
return
_set_video(c.message.group, _sender_number(c.message), b64)
log.info("Stored received video for group %s", c.message.group)
class VideoCommand(Command): class VideoCommand(Command):
@@ -157,23 +240,31 @@ class VideoCommand(Command):
if not c.message.is_group(): if not c.message.is_group():
return return
matches = list(re.finditer(VIDEO_URL_PATTERN, c.message.text)) text = c.message.text or ""
matches = list(re.finditer(VIDEO_URL_PATTERN, text))
if not matches: if not matches:
return return
# An optional "/clip <seconds>" anywhere in the message overrides the # An optional "/clip <seconds>" anywhere in the message overrides the
# default window length for any clip produced from this message. # default window length for any clip produced from this message.
clip_len = CLIP_DURATION clip_len = CLIP_DURATION
mclip = re.search(r"/clip\s+(\S+)", c.message.text, re.IGNORECASE) mclip = re.search(r"/clip\s+(\S+)", text, re.IGNORECASE)
if mclip: if mclip:
secs = _parse_timestamp(mclip.group(1)) secs = _parse_timestamp(mclip.group(1))
if secs is None or secs < 1: if secs is None or secs < 1:
await c.reply("`/clip` needs a length in seconds, e.g. `/clip 30`.") await _safe_reply(c, "`/clip` needs a length in seconds, e.g. `/clip 30`.")
return return
clip_len = min(secs, MAX_CLIP_DURATION) clip_len = min(secs, MAX_CLIP_DURATION)
is_edit = c.message.type == MessageType.EDIT_MESSAGE is_edit = c.message.type == MessageType.EDIT_MESSAGE
group = c.message.group
sender = _sender_number(c.message)
# Process URLs sequentially in message order. Each download already runs
# off the event loop (asyncio.to_thread), so this doesn't block the bot,
# and sequential keeps last_video deterministic (the last link wins) and
# posts in order. `seen` drops a URL repeated within one message.
seen = set()
for m in matches: for m in matches:
url = m.group(0) url = m.group(0)
@@ -182,8 +273,8 @@ class VideoCommand(Command):
# delimited token to recover it. Timestamps/clips apply to YouTube. # delimited token to recover it. Timestamps/clips apply to YouTube.
clip = None clip = None
if re.match(YOUTUBE_URL_PATTERN, url): if re.match(YOUTUBE_URL_PATTERN, url):
token = re.match(r"\S+", c.message.text[m.start():]).group(0) tok = re.match(r"\S+", text[m.start():])
start = _extract_timestamp(token) start = _extract_timestamp(tok.group(0) if tok else url)
if mclip: if mclip:
# Explicit /clip clips even without a timestamp (from 0). # Explicit /clip clips even without a timestamp (from 0).
start = start or 0 start = start or 0
@@ -198,122 +289,153 @@ class VideoCommand(Command):
url, url,
) )
if is_edit and _url_recently_handled(c.message.group, url): if url in seen:
continue
seen.add(url)
if is_edit and _url_busy(group, url):
log.info("Skipping edited message; already handled %s", url) log.info("Skipping edited message; already handled %s", url)
continue continue
_mark_url_handled(c.message.group, url) await self._process_one(c, group, sender, url, clip)
await self._download_and_send(c, url, clip)
async def _download_and_send(self, c: Context, url: str, clip: tuple[int, int] | None = None) -> None: async def _process_one(self, c: Context, group, sender, url, clip) -> None:
if clip is not None: key = (group, url)
log.info("Clipping %s to window %d-%ds", url, clip[0], clip[1]) _inflight.add(key)
with tempfile.TemporaryDirectory() as tmpdir: try:
outpath = os.path.join(tmpdir, "video.mp4") if clip is not None:
ok, err = self._run_ytdlp(url, outpath, tmpdir, clip) log.info("Clipping %s to window %d-%ds", url, clip[0], clip[1])
if not ok: async with _get_job_semaphore():
if _is_no_media_error(err): b64, err, silent = await asyncio.to_thread(_produce_video, url, clip)
# Link just has no video (e.g. a text-only tweet). Stay quiet.
log.info("No video at %s (%s); staying silent", url, err) if silent:
return # A genuine no-media determination: record it so an edit replay
await c.reply(f"Couldn't grab that video: {err}") # doesn't pointlessly re-run yt-dlp.
_mark_url_handled(group, url)
return
if err:
# Hard error: do NOT mark handled, so a corrective edit can retry.
await _safe_reply(c, f"Couldn't grab that video: {err}")
return return
# yt-dlp may produce a slightly different filename _mark_url_handled(group, url)
actual_file = None _set_video(group, sender, b64)
await _safe_send_video(c, b64)
except Exception as e: # noqa: BLE001
log.exception("Unexpected error handling %s: %s", url, e)
await _safe_reply(c, "Something went wrong handling that video.")
finally:
_inflight.discard(key)
def _run_ytdlp(url: str, outpath: str, tmpdir: str,
clip: tuple[int, int] | None = None) -> tuple[bool, str]:
"""Run yt-dlp with retries. Returns (success, short_reason).
reason is empty on success; otherwise a one-line description suitable for user reply.
When clip is (start, end), only that window is downloaded.
Blocking — call via asyncio.to_thread, never directly on the event loop."""
cmd = [
YTDLP,
"--no-playlist",
# YouTube wraps URLs in a JS "n-sig" challenge; node solves it
# via yt-dlp-ejs. Without this, only image/thumb formats resolve.
"--js-runtimes", "node",
]
if clip is not None:
start, end = clip
# Download only the requested window instead of the whole video —
# essential for hour-long uploads shared with a ?t= timestamp.
# force-keyframes-at-cuts makes the start boundary accurate.
# The filesize filters below key off the *whole* video's size, which
# is irrelevant to a 60s slice, so drop them and just take best mp4.
cmd += [
"--download-sections", f"*{start}-{end}",
"--force-keyframes-at-cuts",
"-f", "best[ext=mp4]/best",
]
else:
# Prefer the largest mp4 that already fits under 95 MB, so we avoid
# re-encoding when a smaller variant exists (e.g. a 4K rendition
# >100 MB alongside a 1080p ~50 MB).
cmd += [
"-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best",
]
cmd += [
"--merge-output-format", "mp4",
*(["--cookies", COOKIES] if os.path.exists(COOKIES) else []),
"-o", outpath,
"--", url,
]
delays = [0, 3, 8]
last_stderr = ""
for attempt, delay in enumerate(delays, 1):
if delay:
time.sleep(delay)
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=120,
cwd=tmpdir,
)
except subprocess.TimeoutExpired:
# Don't retry timeouts — three 120s timeouts would block a worker for 6 min.
log.warning("yt-dlp timed out for %s", url)
return False, "yt-dlp timed out after 120s"
if result.returncode == 0:
return True, ""
last_stderr = result.stderr
log.warning(
"yt-dlp failed for %s (attempt %d/%d): %s",
url, attempt, len(delays), last_stderr.strip()[-300:],
)
return False, _summarize_ytdlp_error(last_stderr)
def _produce_video(url: str, clip: tuple[int, int] | None) -> tuple[str | None, str | None, bool]:
"""Blocking pipeline: download (+optional clip), re-encode if oversized, base64.
Returns (b64, error, silent):
success -> (b64, None, False)
no media -> (None, None, True) # stay quiet
failure -> (None, "reason", False)
Run via asyncio.to_thread — does all subprocess/file/base64 work off the loop."""
with tempfile.TemporaryDirectory() as tmpdir:
outpath = os.path.join(tmpdir, "video.mp4")
ok, err = _run_ytdlp(url, outpath, tmpdir, clip)
if not ok:
if _is_no_media_error(err):
log.info("No video at %s (%s); staying silent", url, err)
return None, None, True
return None, err, False
# Prefer the known output path; fall back to any mp4 yt-dlp produced.
actual_file = outpath if os.path.exists(outpath) else None
if actual_file is None:
for f in os.listdir(tmpdir): for f in os.listdir(tmpdir):
if f.endswith(".mp4"): if f.endswith(".mp4"):
actual_file = os.path.join(tmpdir, f) actual_file = os.path.join(tmpdir, f)
break break
if actual_file is None:
log.warning("No mp4 file found after yt-dlp for %s", url)
return None, "yt-dlp finished but produced no mp4.", False
if actual_file is None: file_size = os.path.getsize(actual_file)
log.warning("No mp4 file found after yt-dlp for %s", url) if file_size > MAX_FILE_SIZE:
await c.reply("yt-dlp finished but produced no mp4.") size_mb = file_size // (1024 * 1024)
return log.info("Video is %d MB, re-encoding to fit under %d MB", size_mb, MAX_FILE_SIZE // (1024 * 1024))
new_path, reason = _reencode(actual_file, tmpdir)
if new_path is None:
return None, f"That video is too large ({size_mb} MB) and re-encoding failed: {reason}.", False
actual_file = new_path
file_size = os.path.getsize(actual_file) with open(actual_file, "rb") as f:
if file_size > MAX_FILE_SIZE: return base64.b64encode(f.read()).decode("utf-8"), None, False
size_mb = file_size // (1024 * 1024)
log.info("Video is %d MB, re-encoding to fit under %d MB", size_mb, MAX_FILE_SIZE // (1024 * 1024))
new_path, reason = _reencode(actual_file, tmpdir)
if new_path is None:
await c.reply(f"That video is too large ({size_mb} MB) and re-encoding failed: {reason}.")
return
actual_file = new_path
with open(actual_file, "rb") as f:
video_bytes = f.read()
b64_video = base64.b64encode(video_bytes).decode("utf-8")
_set_video(c.message.group, b64_video)
await c.send("", base64_attachments=[b64_video])
def _run_ytdlp(self, url: str, outpath: str, tmpdir: str,
clip: tuple[int, int] | None = None) -> tuple[bool, str]:
"""Run yt-dlp with retries. Returns (success, short_reason).
reason is empty on success; otherwise a one-line description suitable for user reply.
When clip is (start, end), only that window is downloaded."""
cmd = [
YTDLP,
"--no-playlist",
# YouTube wraps URLs in a JS "n-sig" challenge; node solves it
# via yt-dlp-ejs. Without this, only image/thumb formats resolve.
"--js-runtimes", "node",
]
if clip is not None:
start, end = clip
# Download only the requested window instead of the whole video —
# essential for hour-long uploads shared with a ?t= timestamp.
# force-keyframes-at-cuts makes the start boundary accurate.
# The filesize filters below key off the *whole* video's size, which
# is irrelevant to a 60s slice, so drop them and just take best mp4.
cmd += [
"--download-sections", f"*{start}-{end}",
"--force-keyframes-at-cuts",
"-f", "best[ext=mp4]/best",
]
else:
# Prefer the largest mp4 that already fits under 95 MB, so we avoid
# re-encoding when a smaller variant exists (e.g. a 4K rendition
# >100 MB alongside a 1080p ~50 MB).
cmd += [
"-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best",
]
cmd += [
"--merge-output-format", "mp4",
*(["--cookies", COOKIES] if os.path.exists(COOKIES) else []),
"-o", outpath,
url,
]
delays = [0, 3, 8]
last_stderr = ""
for attempt, delay in enumerate(delays, 1):
if delay:
time.sleep(delay)
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=120,
cwd=tmpdir,
)
except subprocess.TimeoutExpired:
# Don't retry timeouts — three 120s timeouts would block the consumer for 6 min.
log.warning("yt-dlp timed out for %s", url)
return False, "yt-dlp timed out after 120s"
if result.returncode == 0:
return True, ""
last_stderr = result.stderr
log.warning(
"yt-dlp failed for %s (attempt %d/%d): %s",
url, attempt, len(delays), last_stderr.strip()[-300:],
)
return False, _summarize_ytdlp_error(last_stderr)
def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]: def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]:
@@ -321,7 +443,7 @@ def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]:
Returns (path, reason). On success, path is set and reason is "". Returns (path, reason). On success, path is set and reason is "".
On failure, path is None and reason is a short human-readable cause. On failure, path is None and reason is a short human-readable cause.
""" Blocking — only called from within _produce_video/_speed_video/_reverse_video."""
outpath = os.path.join(tmpdir, "reencoded.mp4") outpath = os.path.join(tmpdir, "reencoded.mp4")
try: try:
probe = subprocess.run( probe = subprocess.run(
@@ -330,10 +452,17 @@ def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]:
capture_output=True, text=True, timeout=30, capture_output=True, text=True, timeout=30,
) )
duration = float(probe.stdout.strip()) duration = float(probe.stdout.strip())
except Exception: except Exception: # noqa: BLE001
log.warning("Could not probe video duration") log.warning("Could not probe video duration")
return None, "could not read video duration" return None, "could not read video duration"
# Guard against 0/NaN duration before the bitrate math (else ZeroDivisionError
# or "cannot convert float NaN to integer" would escape and silently kill the
# handler with no reply to the user). `not (duration > 0)` catches 0, negative, NaN.
if not (duration > 0):
log.warning("Invalid video duration from ffprobe: %r", duration)
return None, "could not read video duration"
# Target 95 MB to leave headroom # Target 95 MB to leave headroom
target_bytes = 95 * 1024 * 1024 target_bytes = 95 * 1024 * 1024
# Total bitrate in kbps; reserve 128k for audio # Total bitrate in kbps; reserve 128k for audio
@@ -346,6 +475,8 @@ def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]:
[ [
"ffmpeg", "-y", "-i", input_file, "ffmpeg", "-y", "-i", input_file,
"-c:v", "libx264", "-b:v", f"{video_bitrate}k", "-c:v", "libx264", "-b:v", f"{video_bitrate}k",
# Cap peaks so VBR overshoot can't blow past the target size.
"-maxrate", f"{video_bitrate}k", "-bufsize", f"{2 * video_bitrate}k",
"-c:a", "aac", "-b:a", f"{audio_bitrate}k", "-c:a", "aac", "-b:a", f"{audio_bitrate}k",
"-preset", "fast", "-preset", "fast",
"-movflags", "+faststart", "-movflags", "+faststart",
@@ -381,7 +512,7 @@ def _has_audio_stream(path: str) -> bool:
"-show_entries", "stream=index", "-of", "csv=p=0", path], "-show_entries", "stream=index", "-of", "csv=p=0", path],
capture_output=True, text=True, timeout=15, capture_output=True, text=True, timeout=15,
) )
except Exception: except Exception: # noqa: BLE001
return True # assume yes; ffmpeg will fail loudly if it's wrong return True # assume yes; ffmpeg will fail loudly if it's wrong
return bool(result.stdout.strip()) return bool(result.stdout.strip())
@@ -393,7 +524,7 @@ def _audio_sample_rate(path: str) -> int | None:
"-show_entries", "stream=sample_rate", "-of", "csv=p=0", path], "-show_entries", "stream=sample_rate", "-of", "csv=p=0", path],
capture_output=True, text=True, timeout=15, capture_output=True, text=True, timeout=15,
) )
except Exception: except Exception: # noqa: BLE001
return None return None
out = r.stdout.strip() out = r.stdout.strip()
try: try:
@@ -402,6 +533,107 @@ def _audio_sample_rate(path: str) -> int | None:
return None return None
def _speed_video(b64: str, speed: float) -> tuple[str | None, str | None]:
"""Blocking: change playback speed. Returns (b64, error). Run via to_thread."""
with tempfile.TemporaryDirectory() as tmpdir:
inpath = os.path.join(tmpdir, "input.mp4")
outpath = os.path.join(tmpdir, "sped.mp4")
try:
data = base64.b64decode(b64)
except (binascii.Error, ValueError):
return None, "That video looks corrupted — share it again."
with open(inpath, "wb") as f:
f.write(data)
cmd = [
"ffmpeg", "-y", "-i", inpath,
"-filter:v", f"setpts={1.0/speed:g}*PTS",
]
if _has_audio_stream(inpath):
# asetrate scales the sample rate (which shifts pitch AND
# tempo, the tape-speed effect); aresample brings the data
# rate back to a standard playback rate without undoing it.
rate = _audio_sample_rate(inpath) or 48000
cmd += [
"-filter:a",
f"asetrate={int(rate * speed)},aresample={rate}",
]
else:
cmd += ["-an"]
cmd += ["-preset", "fast", "-movflags", "+faststart", outpath]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
except subprocess.TimeoutExpired:
log.warning("ffmpeg timed out speeding up video")
return None, "Timed out speeding up that video."
if result.returncode != 0:
log.warning("ffmpeg /speed failed: %s", result.stderr[-500:])
return None, "Failed to speed up that video."
final_file = outpath
size = os.path.getsize(final_file)
if size > MAX_FILE_SIZE:
size_mb = size // (1024 * 1024)
log.info("Sped-up video is %d MB, re-encoding to fit", size_mb)
new_path, reason = _reencode(final_file, tmpdir)
if new_path is None:
return None, f"Sped-up video is too large ({size_mb} MB) and re-encoding failed: {reason}."
final_file = new_path
with open(final_file, "rb") as f:
return base64.b64encode(f.read()).decode("utf-8"), None
def _reverse_video(b64: str) -> tuple[str | None, str | None]:
"""Blocking: reverse a video. Returns (b64, error). Run via to_thread."""
with tempfile.TemporaryDirectory() as tmpdir:
inpath = os.path.join(tmpdir, "input.mp4")
outpath = os.path.join(tmpdir, "reversed.mp4")
try:
data = base64.b64decode(b64)
except (binascii.Error, ValueError):
return None, "That video looks corrupted — share it again."
with open(inpath, "wb") as f:
f.write(data)
try:
result = subprocess.run(
[
"ffmpeg", "-i", inpath,
"-vf", "reverse",
"-af", "areverse",
"-preset", "fast",
"-movflags", "+faststart",
outpath,
],
capture_output=True, text=True, timeout=120,
)
except subprocess.TimeoutExpired:
log.warning("ffmpeg timed out reversing video")
return None, "Timed out reversing that video."
if result.returncode != 0:
log.warning("ffmpeg reverse failed: %s", result.stderr[-500:])
return None, "Failed to reverse that video."
final_file = outpath
size = os.path.getsize(final_file)
if size > MAX_FILE_SIZE:
size_mb = size // (1024 * 1024)
log.info("Reversed video is %d MB, re-encoding to fit", size_mb)
new_path, reason = _reencode(final_file, tmpdir)
if new_path is None:
return None, f"Reversed video is too large ({size_mb} MB) and re-encoding failed: {reason}."
final_file = new_path
with open(final_file, "rb") as f:
return base64.b64encode(f.read()).decode("utf-8"), None
class SpeedCommand(Command): class SpeedCommand(Command):
SPEED_RE = re.compile(r"^/speed(?:\s+(\S+))?$", re.IGNORECASE) SPEED_RE = re.compile(r"^/speed(?:\s+(\S+))?$", re.IGNORECASE)
@@ -420,79 +652,30 @@ class SpeedCommand(Command):
try: try:
speed = float(speed_arg.rstrip("xX")) speed = float(speed_arg.rstrip("xX"))
except ValueError: except ValueError:
await c.reply(f"`{speed_arg}` isn't a number. Try `/speed 2`.") await _safe_reply(c, f"`{speed_arg}` isn't a number. Try `/speed 2`.")
return return
if not (0.1 <= speed <= 100.0): if not (0.1 <= speed <= 100.0):
await c.reply(f"Speed must be between 0.1 and 100 (got {speed:g}).") await _safe_reply(c, f"Speed must be between 0.1 and 100 (got {speed:g}).")
return return
if speed == 1.0: if speed == 1.0:
await c.reply("1x wouldn't change anything.") await _safe_reply(c, "1x wouldn't change anything.")
return return
b64 = _get_video(c.message.group) sender = _sender_number(c.message)
b64 = _get_video(c.message.group, sender)
if not b64: if not b64:
await c.reply("No video to speed up.") await _safe_reply(c, "No video to speed up.")
return return
with tempfile.TemporaryDirectory() as tmpdir: async with _get_job_semaphore():
inpath = os.path.join(tmpdir, "input.mp4") out, err = await asyncio.to_thread(_speed_video, b64, speed)
outpath = os.path.join(tmpdir, "sped.mp4") if err:
await _safe_reply(c, err)
return
with open(inpath, "wb") as f: _set_video(c.message.group, sender, out)
f.write(base64.b64decode(b64)) await _safe_send_video(c, out)
cmd = [
"ffmpeg", "-y", "-i", inpath,
"-filter:v", f"setpts={1.0/speed:g}*PTS",
]
if _has_audio_stream(inpath):
# asetrate scales the sample rate (which shifts pitch AND
# tempo, the tape-speed effect); aresample brings the data
# rate back to a standard playback rate without undoing it.
rate = _audio_sample_rate(inpath) or 48000
cmd += [
"-filter:a",
f"asetrate={int(rate * speed)},aresample={rate}",
]
else:
cmd += ["-an"]
cmd += ["-preset", "fast", "-movflags", "+faststart", outpath]
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=180,
)
except subprocess.TimeoutExpired:
log.warning("ffmpeg timed out speeding up video")
await c.reply("Timed out speeding up that video.")
return
if result.returncode != 0:
log.warning("ffmpeg /speed failed: %s", result.stderr[-500:])
await c.reply("Failed to speed up that video.")
return
final_file = outpath
size = os.path.getsize(final_file)
if size > MAX_FILE_SIZE:
size_mb = size // (1024 * 1024)
log.info("Sped-up video is %d MB, re-encoding to fit", size_mb)
new_path, reason = _reencode(final_file, tmpdir)
if new_path is None:
await c.reply(
f"Sped-up video is too large ({size_mb} MB) and "
f"re-encoding failed: {reason}."
)
return
final_file = new_path
with open(final_file, "rb") as f:
sped_bytes = f.read()
b64_sped = base64.b64encode(sped_bytes).decode("utf-8")
_set_video(c.message.group, b64_sped)
await c.send("", base64_attachments=[b64_sped])
class ReverseCommand(Command): class ReverseCommand(Command):
@@ -501,47 +684,20 @@ class ReverseCommand(Command):
if not c.message.is_group(): if not c.message.is_group():
return return
b64 = _get_video(c.message.group) sender = _sender_number(c.message)
b64 = _get_video(c.message.group, sender)
if not b64: if not b64:
await c.reply("No video to reverse.") await _safe_reply(c, "No video to reverse.")
return return
with tempfile.TemporaryDirectory() as tmpdir: async with _get_job_semaphore():
inpath = os.path.join(tmpdir, "input.mp4") out, err = await asyncio.to_thread(_reverse_video, b64)
outpath = os.path.join(tmpdir, "reversed.mp4") if err:
await _safe_reply(c, err)
return
with open(inpath, "wb") as f: _set_video(c.message.group, sender, out)
f.write(base64.b64decode(b64)) await _safe_send_video(c, out)
try:
result = subprocess.run(
[
"ffmpeg", "-i", inpath,
"-vf", "reverse",
"-af", "areverse",
"-preset", "fast",
outpath,
],
capture_output=True,
text=True,
timeout=120,
)
except subprocess.TimeoutExpired:
log.warning("ffmpeg timed out reversing video")
await c.reply("Timed out reversing that video.")
return
if result.returncode != 0:
log.warning("ffmpeg failed: %s", result.stderr)
await c.reply("Failed to reverse that video.")
return
with open(outpath, "rb") as f:
reversed_bytes = f.read()
b64_reversed = base64.b64encode(reversed_bytes).decode("utf-8")
_set_video(c.message.group, b64_reversed)
await c.send("", base64_attachments=[b64_reversed])
HELP_TEXT = f"""🎬 Video bot — what I can do HELP_TEXT = f"""🎬 Video bot — what I can do
@@ -572,7 +728,7 @@ class HelpCommand(Command):
return return
if (c.message.text or "").strip().lower() not in ("/help", "/commands"): if (c.message.text or "").strip().lower() not in ("/help", "/commands"):
return return
await c.reply(HELP_TEXT) await _safe_reply(c, HELP_TEXT)
def _sender_number(msg) -> str | None: def _sender_number(msg) -> str | None:
@@ -583,7 +739,7 @@ def _sender_number(msg) -> str | None:
try: try:
env = json.loads(msg.raw_message)["envelope"] env = json.loads(msg.raw_message)["envelope"]
return env.get("source") or env.get("sourceNumber") return env.get("source") or env.get("sourceNumber")
except Exception: except Exception: # noqa: BLE001
return None return None
@@ -611,11 +767,11 @@ class CookiesCommand(Command):
ig_lines.append(normalized) ig_lines.append(normalized)
if not ig_lines: if not ig_lines:
await c.reply("No `.instagram.com` cookie lines found.") await _safe_reply(c, "No `.instagram.com` cookie lines found.")
return return
if not any("\tsessionid\t" in ln for ln in ig_lines): if not any("\tsessionid\t" in ln for ln in ig_lines):
await c.reply("Missing `sessionid` cookie — that's the one that proves you're logged in. Re-export and try again.") await _safe_reply(c, "Missing `sessionid` cookie — that's the one that proves you're logged in. Re-export and try again.")
return return
try: try:
@@ -627,14 +783,16 @@ class CookiesCommand(Command):
kept = [ln for ln in existing if not cookie_re.match(ln)] kept = [ln for ln in existing if not cookie_re.match(ln)]
new_content = "\n".join(kept + ig_lines) + "\n" new_content = "\n".join(kept + ig_lines) + "\n"
# Create the temp file 0600 from the start: open()+chmod left a brief
# world-readable window exposing the Instagram session cookie.
tmppath = COOKIES + ".tmp" tmppath = COOKIES + ".tmp"
with open(tmppath, "w") as f: fd = os.open(tmppath, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, "w") as f:
f.write(new_content) f.write(new_content)
os.chmod(tmppath, 0o600)
os.replace(tmppath, COOKIES) os.replace(tmppath, COOKIES)
log.info("Installed %d Instagram cookies from %s", len(ig_lines), sender) log.info("Installed %d Instagram cookies from %s", len(ig_lines), sender)
await c.reply(f"Installed {len(ig_lines)} Instagram cookies.") await _safe_reply(c, f"Installed {len(ig_lines)} Instagram cookies.")
def main(): def main():
@@ -649,6 +807,9 @@ def main():
bot = SignalBot({ bot = SignalBot({
"signal_service": signal_service, "signal_service": signal_service,
"phone_number": phone_number, "phone_number": phone_number,
# Don't let the library download+base64 every attachment of every message
# on the producer loop. VideoTracker fetches only video attachments lazily.
"download_attachments": False,
}) })
bot.register(VideoTracker(), contacts=False, groups=True) bot.register(VideoTracker(), contacts=False, groups=True)
bot.register(VideoCommand(), contacts=False, groups=True) bot.register(VideoCommand(), contacts=False, groups=True)
+11 -1
View File
@@ -1,6 +1,10 @@
services: services:
signal-cli-rest-api: signal-cli-rest-api:
image: bbernhard/signal-cli-rest-api:latest # Pinned by digest so a re-pull/recreate can't silently swap in a build with
# a protocol change (the getServerGuid-style breakage). This digest is
# signal-cli-rest-api with signal-cli 0.100 (build 2). Upgrade deliberately:
# `docker pull bbernhard/signal-cli-rest-api:latest`, test, then update this.
image: bbernhard/signal-cli-rest-api@sha256:2399d449123cdad56c4d859277e3b9127e1a00c4d2ab4601c239882609286cf8
container_name: signal-cli-rest-api container_name: signal-cli-rest-api
restart: unless-stopped restart: unless-stopped
environment: environment:
@@ -9,3 +13,9 @@ services:
- "127.0.0.1:8080:8080" - "127.0.0.1:8080:8080"
volumes: volumes:
- ./signal-cli-data:/home/.local/share/signal-cli - ./signal-cli-data:/home/.local/share/signal-cli
healthcheck:
test: ["CMD", "curl", "-fsS", "http://127.0.0.1:8080/v1/health"]
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
+4 -3
View File
@@ -1,5 +1,6 @@
signalbot signalbot==1.1.0
yt-dlp # yt-dlp is kept current for site/extractor changes; bump the floor deliberately.
yt-dlp>=2026.3.17
# Needed for YouTube: yt-dlp wraps URLs in a JS "n-sig" challenge that a JS # Needed for YouTube: yt-dlp wraps URLs in a JS "n-sig" challenge that a JS
# runtime must solve. Requires a system `node` on PATH plus this package. # runtime must solve. Requires a system `node` on PATH plus this package.
yt-dlp-ejs yt-dlp-ejs>=0.8.0
+3 -1
View File
@@ -13,7 +13,9 @@ Environment=SIGNAL_SERVICE=127.0.0.1:8080
# Optional: comma-separated numbers allowed to run /cookies (admin command). # Optional: comma-separated numbers allowed to run /cookies (admin command).
# Environment=BOT_ADMINS=+15551234567 # Environment=BOT_ADMINS=+15551234567
ExecStart=/home/YOUR_USER/signal-bot/venv/bin/python bot.py ExecStart=/home/YOUR_USER/signal-bot/venv/bin/python bot.py
Restart=on-failure # always (not on-failure): the bot's library swallows exceptions in its own run
# loop and never exits non-zero even when wedged, so on-failure would never bounce it.
Restart=always
RestartSec=10 RestartSec=10
[Install] [Install]