import asyncio
import base64
import binascii
import json
import logging
import os
import re
import subprocess
import tempfile
import time

from signalbot import Command, Context, SignalBot
from signalbot.command import regex_triggered, triggered
from signalbot.message import MessageType

# --- Recognised video links -------------------------------------------------
# Path segments ahead of /status/ are matched with [^/\s]+ rather than .+ :
# a greedy .+ ran across spaces and fused two space-separated tweet links into
# one malformed URL. The (?:/[^/\s]+)* tail still admits multi-segment forms
# such as x.com/i/web/status/ while refusing whitespace.
TWITTER_URL_PATTERN = r"https?://(?:www\.)?(?:twitter\.com|x\.com|fxtwitter\.com|vxtwitter\.com|fixupx\.com)/[^/\s]+(?:/[^/\s]+)*/status/\d+"
INSTAGRAM_URL_PATTERN = r"https?://(?:www\.)?instagram\.com/(?:reel|p)/[\w-]+"
YOUTUBE_URL_PATTERN = r"https?://(?:www\.)?(?:youtube\.com/(?:watch\?v=|shorts/)|youtu\.be/)[\w-]+"
TIKTOK_URL_PATTERN = r"https?://(?:(?:www|m)\.tiktok\.com/(?:@[\w.-]+/video/\d+|t/\w+|v/\d+)|(?:vm|vt)\.tiktok\.com/\w+)"
# Any one of the platform patterns above, as a single non-capturing alternation.
VIDEO_URL_PATTERN = "(?:" + "|".join(
    (
        TWITTER_URL_PATTERN,
        INSTAGRAM_URL_PATTERN,
        YOUTUBE_URL_PATTERN,
        TIKTOK_URL_PATTERN,
    )
) + ")"

# --- Limits -----------------------------------------------------------------
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100 MB
CLIP_DURATION = 60  # default seconds to grab around a shared ?t= timestamp
MAX_CLIP_DURATION = 600  # ceiling for a user-supplied /clip override
# Upper bound on simultaneous heavy media jobs (yt-dlp/ffmpeg) so a busy group
# can't spawn unbounded subprocesses. Each job runs in a worker thread via
# asyncio.to_thread while holding the job semaphore.
MAX_CONCURRENT_JOBS = 2

# --- Paths and admin configuration ------------------------------------------
# The yt-dlp binary, the cookie jar and the state DB all live next to this file.
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
YTDLP = os.path.join(_BASE_DIR, "venv", "bin", "yt-dlp")
COOKIES = os.path.join(_BASE_DIR, "cookies.txt")
STATE_DB = os.path.join(_BASE_DIR, "bot-state.db")
# Comma-separated phone numbers from $BOT_ADMINS; blank entries are dropped.
ADMIN_NUMBERS = {
    number
    for number in (part.strip() for part in os.environ.get("BOT_ADMINS", "").split(","))
    if number
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("signal-bot")

# --- In-memory state --------------------------------------------------------
# (group_id, sender) -> {"b64": ..., "time": monotonic}. Keyed per sender so a
# stranger's later video doesn't silently become the target of your /speed or
# /rev; _get_video falls back to the group's latest video when you have none.
# Deliberately memory-only: each value is a base64 video up to ~133 MB, so
# writing these to SQLite would bloat the DB. Only the dedup map below persists.
last_video = {}
VIDEO_TTL = 3600  # 1 hour

# (group_id, url) -> wall-clock epoch seconds at which the URL was handled.
# When a user edits a message, signal-cli redelivers it as
# MessageType.EDIT_MESSAGE with the same text; without this guard the bot would
# re-download and re-post the video. Persisted to SQLite so a restart doesn't
# forget recent dedup (tiny and TTL-bounded). Wall-clock rather than monotonic
# so the stored timestamps stay meaningful across restarts.
recent_urls = {}
RECENT_URL_TTL = 600  # 10 min

# (group_id, url) pairs currently being downloaded. Guards against an edit
# redelivery that arrives mid-download (before recent_urls is set) starting a
# second download.
_inflight = set()

# Created lazily so it binds to the bot's running event loop, not import time.
_job_semaphore = None

# signalbot SQLite key/value store (set in main()); persists recent_urls only.
_storage = None
_RECENT_URLS_KEY = "recent_urls"
_RECENT_SEP = "\x1f"  # joins (group, url) into one JSON-safe storage key


def _get_job_semaphore() -> asyncio.Semaphore:
    """Return the shared job semaphore, creating it on first use.

    The semaphore admits MAX_CONCURRENT_JOBS holders; handlers acquire it
    around every yt-dlp/ffmpeg job.
    """
    global _job_semaphore
    if _job_semaphore is None:
        _job_semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS)
    return _job_semaphore


def _sweep_videos() -> None:
    """Drop every cached video whose age exceeds VIDEO_TTL."""
    now = time.monotonic()
    for key in [k for k, e in last_video.items() if now - e["time"] > VIDEO_TTL]:
        del last_video[key]


def _set_video(group_id, sender, b64) -> None:
    """Remember `b64` as the latest video for (group_id, sender)."""
    _sweep_videos()
    last_video[(group_id, sender)] = {"b64": b64, "time": time.monotonic()}


def _get_video(group_id, sender):
    """The caller's own most recent video, else the group's latest non-expired one.

    Returns the base64 payload, or None when the group has no cached video.
    """
    _sweep_videos()
    entry = last_video.get((group_id, sender))
    if entry:
        return entry["b64"]
    # Fallback: newest entry stored by anyone in the same group.
    best = max(
        (e for (g, _s), e in last_video.items() if g == group_id),
        key=lambda e: e["time"],
        default=None,
    )
    return best["b64"] if best else None


def _sweep_recent_urls() -> None:
    """Drop dedup entries older than RECENT_URL_TTL."""
    now = time.time()
    for key in [k for k, t in recent_urls.items() if now - t > RECENT_URL_TTL]:
        del recent_urls[key]


def _load_recent_urls() -> None:
    """Restore the (pruned) dedup map from SQLite on startup.

    Best-effort: storage errors and malformed payloads are logged and ignored
    so a corrupt state row can never prevent the bot from starting.
    """
    if _storage is None:
        return
    try:
        if not _storage.exists(_RECENT_URLS_KEY):
            return
        data = _storage.read(_RECENT_URLS_KEY) or {}
    except Exception as e:  # noqa: BLE001
        log.warning("Could not load dedup state: %s", e)
        return
    if not isinstance(data, dict):
        # Anything but the mapping we wrote is unusable; start with an empty map.
        log.warning("Ignoring malformed dedup state of type %s", type(data).__name__)
        return
    now = time.time()
    for joined, ts in data.items():
        if not isinstance(ts, (int, float)) or now - ts > RECENT_URL_TTL:
            continue
        if not isinstance(joined, str):
            continue
        group_id, sep, url = joined.partition(_RECENT_SEP)
        if not sep:
            # No separator means this key was not written by
            # _persist_recent_urls; skip it rather than store (key, "").
            continue
        recent_urls[(group_id, url)] = ts
    log.info("Restored %d dedup entries from %s", len(recent_urls), STATE_DB)


def _persist_recent_urls() -> None:
    """Write the dedup map to storage; failures are logged, never raised."""
    if _storage is None:
        return
    try:
        _storage.save(
            _RECENT_URLS_KEY,
            {f"{g}{_RECENT_SEP}{u}": ts for (g, u), ts in recent_urls.items()},
        )
    except Exception as e:  # noqa: BLE001
        log.warning("Could not persist dedup state: %s", e)


def _url_recently_handled(group_id, url) -> bool:
    """True if (group_id, url) was handled within RECENT_URL_TTL."""
    _sweep_recent_urls()
    return (group_id, url) in recent_urls


def _url_busy(group_id, url) -> bool:
    """True if the URL is being downloaded right now or was handled recently."""
    return (group_id, url) in _inflight or _url_recently_handled(group_id, url)


def _mark_url_handled(group_id, url) -> None:
    """Record (group_id, url) as handled now and persist the dedup map."""
    _sweep_recent_urls()
    recent_urls[(group_id, url)] = time.time()
    _persist_recent_urls()


async def _safe_reply(c: Context, text: str) -> None:
    """Reply with `text`; a send failure is logged instead of raised."""
    try:
        await c.reply(text)
    except Exception as e:  # noqa: BLE001
        log.warning("Failed to send reply: %s", e)


async def _safe_send_video(c: Context, b64: str) -> bool:
    """Send `b64` as an attachment. Returns True on success.

    On failure the error is logged, the user is told, and False is returned.
    """
    try:
        await c.send("", base64_attachments=[b64])
        return True
    except Exception as e:  # noqa: BLE001
        log.warning("Failed to send video: %s", e)
        await _safe_reply(c, "Couldn't send that video (Signal may have rejected it).")
        return False


# Errors that mean "the link simply has no downloadable video" rather than a
# genuine failure. We stay silent for these — the bot watches every message with
# a link, so most links legitimately have no video and shouldn't draw a complaint.
# NOTE: "unsupported url" is deliberately NOT here. Every URL that reaches yt-dlp
# already matched one of our platform patterns, so "Unsupported URL" means a real
# extractor/version break worth surfacing, not a quiet no-media link.
_NO_MEDIA_ERROR_PATTERNS = (
    "no video could be found",
    "there's no video",
    "no media found",
    "no video formats found",
)


def _is_no_media_error(err: str) -> bool:
    """True if `err` contains one of _NO_MEDIA_ERROR_PATTERNS (case-insensitive)."""
    e = err.lower()
    return any(p in e for p in _NO_MEDIA_ERROR_PATTERNS)


def _summarize_ytdlp_error(stderr: str) -> str:
    """Pull a short, user-readable reason out of yt-dlp stderr.

    Always returns a non-empty string: callers use the truthiness of the
    reason to tell failure from success, so an empty summary would make a
    failed download look like a successful one.
    """
    if not stderr:
        return "unknown error"
    error_lines = [ln.strip() for ln in stderr.splitlines() if ln.startswith("ERROR:")]
    if error_lines:
        msg = error_lines[-1][len("ERROR:"):].strip()
        # Strip "[extractor] video_id:" prefix yt-dlp prepends.
        msg = re.sub(r"^\[[^\]]+\]\s+\S+?:\s*", "", msg)
        # Trim verbose "Use --cookies..." tails that aren't useful to a chat user.
        msg = re.split(r"\s+(?:Use --cookies|See https?://)", msg, maxsplit=1)[0]
        # The trimming above can leave nothing (e.g. a bare "ERROR:" line).
        return msg[:240].rstrip(". ") or "unknown error"
    lines = [ln.strip() for ln in stderr.splitlines() if ln.strip()]
    return lines[-1][:240] if lines else "unknown error"


def _parse_timestamp(value: str) -> int | None:
    """Parse a YouTube timestamp into seconds.

    Accepts plain seconds ("1509", "90s") and the h/m/s form ("25m9s", "1h2m3s").
    Returns None for anything that isn't a recognizable time.
    """
    if not value:
        return None
    m = re.fullmatch(r"(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s?)?", value.strip(), re.IGNORECASE)
    # Every group is optional, so the pattern also matches ""; require at
    # least one captured component.
    if not m or not any(m.groups()):
        return None
    h, mi, s = (int(g) if g else 0 for g in m.groups())
    return h * 3600 + mi * 60 + s


def _extract_timestamp(url_token: str) -> int | None:
    """Pull the start offset (seconds) out of a YouTube URL's t=/start= param."""
    m = re.search(r"[?&#](?:t|start)=([0-9hms]+)", url_token, re.IGNORECASE)
    return _parse_timestamp(m.group(1)) if m else None


def _message_data(envelope: dict) -> dict:
    """Find the data_message dict (which carries attachments) inside a raw envelope.

    Mirrors signalbot's own extraction so we also see attachments delivered via
    edit envelopes and synced (linked-device) messages, not just plain dataMessage.
    A key that is present but null is treated like a missing one, so the
    function always returns a dict and never raises on a sparse envelope.
    """
    if "dataMessage" in envelope:
        return envelope["dataMessage"] or {}
    if "editMessage" in envelope:
        return (envelope["editMessage"] or {}).get("dataMessage") or {}
    if "syncMessage" in envelope:
        sent = (envelope["syncMessage"] or {}).get("sentMessage") or {}
        if "editMessage" in sent:
            return (sent["editMessage"] or {}).get("dataMessage") or {}
        return sent
    return {}


class VideoTracker(Command):
    """Watches group messages for video attachments and stores the last one per sender."""

    async def handle(self, c: Context) -> None:
        """Cache the first video/* attachment of a group message, if any."""
        if not c.message.is_group():
            return
        try:
            envelope = json.loads(c.message.raw_message)["envelope"]
        except Exception:  # noqa: BLE001
            return
        attachments = _message_data(envelope).get("attachments") or []
        att = next(
            (a for a in attachments if str(a.get("contentType", "")).startswith("video/")),
            None,
        )
        if not att or not att.get("id"):
            return
        # download_attachments is disabled globally, so fetch only this video on
        # demand instead of letting the library base64 every attachment of every
        # message on the producer loop.
        try:
            b64 = await c.bot._signal.get_attachment(att["id"])
        except Exception as e:  # noqa: BLE001
            log.warning("Failed to fetch video attachment %s: %s", att.get("id"), e)
            return
        _set_video(c.message.group, _sender_number(c.message), b64)
        log.info("Stored received video for group %s", c.message.group)


class VideoCommand(Command):
    """Downloads every supported video link in a group message and posts it back."""

    @regex_triggered(VIDEO_URL_PATTERN)
    async def handle(self, c: Context) -> None:
        """Process each video URL in the message, in order."""
        if not c.message.is_group():
            return
        text = c.message.text or ""
        matches = list(re.finditer(VIDEO_URL_PATTERN, text))
        if not matches:
            return

        # An optional "/clip <length>" token anywhere in the message overrides
        # the default window length for any clip produced from this message.
        # (?<!\S) requires the token to start the message or follow
        # whitespace, so a "/clip" path segment inside some other URL
        # ("https://site/clip and ...") is not mistaken for the command.
        clip_len = CLIP_DURATION
        mclip = re.search(r"(?<!\S)/clip\s+(\S+)", text, re.IGNORECASE)
        if mclip:
            secs = _parse_timestamp(mclip.group(1))
            if secs is None or secs < 1:
                await _safe_reply(c, "`/clip` needs a length in seconds, e.g. `/clip 30`.")
                return
            clip_len = min(secs, MAX_CLIP_DURATION)

        is_edit = c.message.type == MessageType.EDIT_MESSAGE
        group = c.message.group
        sender = _sender_number(c.message)
        # Process URLs sequentially in message order. Each download already runs
        # off the event loop (asyncio.to_thread), so this doesn't block the bot,
        # and sequential keeps last_video deterministic (the last link wins) and
        # posts in order. `seen` drops a URL repeated within one message.
        seen = set()
        for m in matches:
            url = m.group(0)
            # The URL pattern stops at the video id, so any ?t=/&t= timestamp
            # lives in the characters that follow. Grab the whole whitespace-
            # delimited token to recover it. Timestamps/clips apply to YouTube.
            clip = None
            if re.match(YOUTUBE_URL_PATTERN, url):
                tok = re.match(r"\S+", text[m.start():])
                start = _extract_timestamp(tok.group(0) if tok else url)
                if mclip:
                    # Explicit /clip clips even without a timestamp (from 0).
                    start = start or 0
                    clip = (start, start + clip_len)
                elif start is not None:
                    clip = (start, start + clip_len)
            # Normalize fxtwitter/vxtwitter wrappers to x.com
            url = re.sub(
                r"https?://(?:www\.)?(?:fxtwitter\.com|vxtwitter\.com|fixupx\.com)",
                "https://x.com",
                url,
            )
            if url in seen:
                continue
            seen.add(url)
            if is_edit and _url_busy(group, url):
                log.info("Skipping edited message; already handled %s", url)
                continue
            await self._process_one(c, group, sender, url, clip)

    async def _process_one(self, c: Context, group, sender, url, clip) -> None:
        """Download one URL (optionally clipped) and post it, replying on failure."""
        key = (group, url)
        _inflight.add(key)
        try:
            if clip is not None:
                log.info("Clipping %s to window %d-%ds", url, clip[0], clip[1])
            async with _get_job_semaphore():
                b64, err, silent = await asyncio.to_thread(_produce_video, url, clip)
            if silent:
                # A genuine no-media determination: record it so an edit replay
                # doesn't pointlessly re-run yt-dlp.
                _mark_url_handled(group, url)
                return
            if err or b64 is None:
                # Hard error: do NOT mark handled, so a corrective edit can retry.
                # The b64 check catches a failure reported with an empty reason,
                # which would otherwise fall through and try to send None.
                await _safe_reply(c, f"Couldn't grab that video: {err or 'unknown error'}")
                return
            _mark_url_handled(group, url)
            _set_video(group, sender, b64)
            await _safe_send_video(c, b64)
        except Exception as e:  # noqa: BLE001
            log.exception("Unexpected error handling %s: %s", url, e)
            await _safe_reply(c, "Something went wrong handling that video.")
        finally:
            _inflight.discard(key)


def _run_ytdlp(url: str, outpath: str, tmpdir: str, clip: tuple[int, int] | None = None) -> tuple[bool, str]:
    """Run yt-dlp with retries. Returns (success, short_reason).

    reason is empty on success; otherwise a one-line description suitable for
    user reply. When clip is (start, end), only that window is downloaded.
    Timeouts and "no video at this link" failures are not retried.
    Blocking — call via asyncio.to_thread, never directly on the event loop.
    """
    cmd = [
        YTDLP,
        "--no-playlist",
        # YouTube wraps URLs in a JS "n-sig" challenge; node solves it
        # via yt-dlp-ejs. Without this, only image/thumb formats resolve.
        "--js-runtimes", "node",
    ]
    if clip is not None:
        start, end = clip
        # Download only the requested window instead of the whole video —
        # essential for hour-long uploads shared with a ?t= timestamp.
        # force-keyframes-at-cuts makes the start boundary accurate.
        # The filesize filters below key off the *whole* video's size, which
        # is irrelevant to a 60s slice, so drop them and just take best mp4.
        cmd += [
            "--download-sections", f"*{start}-{end}",
            "--force-keyframes-at-cuts",
            "-f", "best[ext=mp4]/best",
        ]
    else:
        # Prefer the largest mp4 that already fits under 95 MB, so we avoid
        # re-encoding when a smaller variant exists (e.g. a 4K rendition
        # >100 MB alongside a 1080p ~50 MB).
        cmd += [
            "-f", "best[ext=mp4][filesize<95M]/best[ext=mp4][filesize_approx<95M]/best[ext=mp4]/best",
        ]
    cmd += [
        "--merge-output-format", "mp4",
        *(["--cookies", COOKIES] if os.path.exists(COOKIES) else []),
        "-o", outpath,
        "--", url,
    ]

    delays = [0, 3, 8]
    last_stderr = ""
    for attempt, delay in enumerate(delays, 1):
        if delay:
            time.sleep(delay)
        try:
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=120, cwd=tmpdir,
            )
        except subprocess.TimeoutExpired:
            # Don't retry timeouts — three 120s timeouts would block a worker for 6 min.
            log.warning("yt-dlp timed out for %s", url)
            return False, "yt-dlp timed out after 120s"
        if result.returncode == 0:
            return True, ""
        last_stderr = result.stderr
        log.warning(
            "yt-dlp failed for %s (attempt %d/%d): %s",
            url, attempt, len(delays), last_stderr.strip()[-300:],
        )
        # "This link has no video" is the common case for ordinary links, and
        # retrying it only burns the remaining sleeps and yt-dlp runs while
        # holding a job slot. Classify the summarized reason because that is
        # what _produce_video checks.
        reason = _summarize_ytdlp_error(last_stderr)
        if _is_no_media_error(reason):
            return False, reason

    return False, _summarize_ytdlp_error(last_stderr)


def _produce_video(url: str, clip: tuple[int, int] | None) -> tuple[str | None, str | None, bool]:
    """Blocking pipeline: download (+optional clip), re-encode if oversized, base64.

    Returns (b64, error, silent):
        success  -> (b64, None, False)
        no media -> (None, None, True)    # stay quiet
        failure  -> (None, "reason", False)
    Run via asyncio.to_thread — does all subprocess/file/base64 work off the loop.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        outpath = os.path.join(tmpdir, "video.mp4")
        ok, err = _run_ytdlp(url, outpath, tmpdir, clip)
        if not ok:
            if _is_no_media_error(err):
                log.info("No video at %s (%s); staying silent", url, err)
                return None, None, True
            return None, err, False

        # Prefer the known output path; fall back to any mp4 yt-dlp produced.
        actual_file = outpath if os.path.exists(outpath) else None
        if actual_file is None:
            for f in os.listdir(tmpdir):
                if f.endswith(".mp4"):
                    actual_file = os.path.join(tmpdir, f)
                    break
        if actual_file is None:
            log.warning("No mp4 file found after yt-dlp for %s", url)
            return None, "yt-dlp finished but produced no mp4.", False

        file_size = os.path.getsize(actual_file)
        if file_size > MAX_FILE_SIZE:
            size_mb = file_size // (1024 * 1024)
            log.info("Video is %d MB, re-encoding to fit under %d MB", size_mb, MAX_FILE_SIZE // (1024 * 1024))
            new_path, reason = _reencode(actual_file, tmpdir)
            if new_path is None:
                return None, f"That video is too large ({size_mb} MB) and re-encoding failed: {reason}.", False
            actual_file = new_path

        with open(actual_file, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8"), None, False


def _reencode(input_file: str, tmpdir: str) -> tuple[str | None, str]:
    """Re-encode video with ffmpeg to fit under MAX_FILE_SIZE.

    Returns (path, reason). On success, path is set and reason is "".
    On failure, path is None and reason is a short human-readable cause.
    Blocking — only called from within _produce_video/_speed_video/_reverse_video.
    """
    outpath = os.path.join(tmpdir, "reencoded.mp4")

    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", input_file],
            capture_output=True, text=True, timeout=30,
        )
        duration = float(probe.stdout.strip())
    except Exception:  # noqa: BLE001
        log.warning("Could not probe video duration")
        return None, "could not read video duration"

    # Guard against 0/NaN duration before the bitrate math (else ZeroDivisionError
    # or "cannot convert float NaN to integer" would escape and silently kill the
    # handler with no reply to the user). `not (duration > 0)` catches 0, negative, NaN.
    if not (duration > 0):
        log.warning("Invalid video duration from ffprobe: %r", duration)
        return None, "could not read video duration"

    # Target 95 MB to leave headroom
    target_bytes = 95 * 1024 * 1024
    # Total bitrate in kbps; reserve 128k for audio
    audio_bitrate = 128
    total_bitrate = int((target_bytes * 8) / duration / 1000)
    video_bitrate = max(total_bitrate - audio_bitrate, 200)

    try:
        result = subprocess.run(
            [
                "ffmpeg", "-y", "-i", input_file,
                "-c:v", "libx264", "-b:v", f"{video_bitrate}k",
                # Cap peaks so VBR overshoot can't blow past the target size.
                "-maxrate", f"{video_bitrate}k", "-bufsize", f"{2 * video_bitrate}k",
                "-c:a", "aac", "-b:a", f"{audio_bitrate}k",
                "-preset", "fast", "-movflags", "+faststart",
                outpath,
            ],
            capture_output=True, text=True, timeout=300,
        )
    except subprocess.TimeoutExpired:
        log.warning("ffmpeg re-encode timed out")
        return None, "ffmpeg timed out after 300s"

    if result.returncode != 0:
        log.warning("ffmpeg re-encode failed: %s", result.stderr[-500:])
        stderr_lines = [ln for ln in result.stderr.strip().splitlines() if ln.strip()]
        last_line = stderr_lines[-1] if stderr_lines else "no stderr"
        return None, f"ffmpeg exited {result.returncode} ({last_line[:160]})"

    # The 200 kbps floor above can still overshoot on very long inputs, so
    # verify the result instead of trusting the bitrate math.
    final_size = os.path.getsize(outpath)
    if final_size > MAX_FILE_SIZE:
        final_mb = final_size // (1024 * 1024)
        log.warning("Re-encoded video still too large: %d MB", final_mb)
        return None, f"output still {final_mb} MB after re-encode (duration {int(duration)}s)"

    log.info("Re-encoded video from %d MB to %d MB",
             os.path.getsize(input_file) // (1024 * 1024),
             final_size // (1024 * 1024))
    return outpath, ""


def _has_audio_stream(path: str) -> bool:
    """True if ffprobe lists at least one audio stream in `path`.

    Returns True when ffprobe itself cannot be run or times out.
    """
    try:
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-select_streams", "a",
             "-show_entries", "stream=index", "-of", "csv=p=0", path],
            capture_output=True, text=True, timeout=15,
        )
    except Exception:  # noqa: BLE001
        return True  # assume yes; ffmpeg will fail loudly if it's wrong
    return bool(result.stdout.strip())


def _audio_sample_rate(path: str) -> int | None:
    """Sample rate (Hz) of the first audio stream, or None if it can't be read."""
    try:
        r = subprocess.run(
            ["ffprobe", "-v", "error", "-select_streams", "a:0",
             "-show_entries", "stream=sample_rate", "-of", "csv=p=0", path],
            capture_output=True, text=True, timeout=15,
        )
    except Exception:  # noqa: BLE001
        return None
    out = r.stdout.strip()
    try:
        return int(out) if out else None
    except ValueError:
        return None


def _speed_video(b64: str, speed: float) -> tuple[str | None, str | None]:
    """Blocking: change playback speed. Returns (b64, error). Run via to_thread.

    Exactly one element of the returned pair is None: the error on success,
    the payload on failure.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        inpath = os.path.join(tmpdir, "input.mp4")
        outpath = os.path.join(tmpdir, "sped.mp4")

        try:
            data = base64.b64decode(b64)
        except (binascii.Error, ValueError):
            return None, "That video looks corrupted — share it again."
        with open(inpath, "wb") as f:
            f.write(data)

        cmd = [
            "ffmpeg", "-y", "-i", inpath,
            "-filter:v", f"setpts={1.0/speed:g}*PTS",
        ]
        if _has_audio_stream(inpath):
            # asetrate scales the sample rate (which shifts pitch AND
            # tempo, the tape-speed effect); aresample brings the data
            # rate back to a standard playback rate without undoing it.
            rate = _audio_sample_rate(inpath) or 48000
            cmd += [
                "-filter:a",
                f"asetrate={int(rate * speed)},aresample={rate}",
            ]
        else:
            cmd += ["-an"]
        cmd += ["-preset", "fast", "-movflags", "+faststart", outpath]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=180)
        except subprocess.TimeoutExpired:
            log.warning("ffmpeg timed out speeding up video")
            return None, "Timed out speeding up that video."

        if result.returncode != 0:
            log.warning("ffmpeg /speed failed: %s", result.stderr[-500:])
            return None, "Failed to speed up that video."

        final_file = outpath
        size = os.path.getsize(final_file)
        if size > MAX_FILE_SIZE:
            size_mb = size // (1024 * 1024)
            log.info("Sped-up video is %d MB, re-encoding to fit", size_mb)
            new_path, reason = _reencode(final_file, tmpdir)
            if new_path is None:
                return None, f"Sped-up video is too large ({size_mb} MB) and re-encoding failed: {reason}."
            final_file = new_path

        with open(final_file, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8"), None


def _reverse_video(b64: str) -> tuple[str | None, str | None]:
    """Blocking: reverse a video. Returns (b64, error). Run via to_thread.

    Exactly one element of the returned pair is None: the error on success,
    the payload on failure.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        inpath = os.path.join(tmpdir, "input.mp4")
        outpath = os.path.join(tmpdir, "reversed.mp4")

        try:
            data = base64.b64decode(b64)
        except (binascii.Error, ValueError):
            return None, "That video looks corrupted — share it again."
        with open(inpath, "wb") as f:
            f.write(data)

        try:
            result = subprocess.run(
                [
                    "ffmpeg", "-i", inpath,
                    "-vf", "reverse", "-af", "areverse",
                    "-preset", "fast", "-movflags", "+faststart",
                    outpath,
                ],
                capture_output=True, text=True, timeout=120,
            )
        except subprocess.TimeoutExpired:
            log.warning("ffmpeg timed out reversing video")
            return None, "Timed out reversing that video."

        if result.returncode != 0:
            log.warning("ffmpeg reverse failed: %s", result.stderr[-500:])
            return None, "Failed to reverse that video."

        final_file = outpath
        size = os.path.getsize(final_file)
        if size > MAX_FILE_SIZE:
            size_mb = size // (1024 * 1024)
            log.info("Reversed video is %d MB, re-encoding to fit", size_mb)
            new_path, reason = _reencode(final_file, tmpdir)
            if new_path is None:
                return None, f"Reversed video is too large ({size_mb} MB) and re-encoding failed: {reason}."
            final_file = new_path

        with open(final_file, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8"), None


class SpeedCommand(Command):
    """`/speed [factor]` — re-post the caller's last video at a different speed."""

    # Whole-message match; the optional argument may carry a trailing x ("2x").
    SPEED_RE = re.compile(r"^/speed(?:\s+(\S+))?$", re.IGNORECASE)

    async def handle(self, c: Context) -> None:
        """Validate the factor (default 2.0), then speed up and re-post the video."""
        if not c.message.is_group():
            return
        text = (c.message.text or "").strip()
        m = self.SPEED_RE.match(text)
        if not m:
            return

        speed_arg = m.group(1)
        if speed_arg is None:
            speed = 2.0
        else:
            try:
                speed = float(speed_arg.rstrip("xX"))
            except ValueError:
                await _safe_reply(c, f"`{speed_arg}` isn't a number. Try `/speed 2`.")
                return
        # Written as `not (lo <= x <= hi)` so NaN is rejected as well.
        if not (0.1 <= speed <= 100.0):
            await _safe_reply(c, f"Speed must be between 0.1 and 100 (got {speed:g}).")
            return
        if speed == 1.0:
            await _safe_reply(c, "1x wouldn't change anything.")
            return

        sender = _sender_number(c.message)
        b64 = _get_video(c.message.group, sender)
        if not b64:
            await _safe_reply(c, "No video to speed up.")
            return

        async with _get_job_semaphore():
            out, err = await asyncio.to_thread(_speed_video, b64, speed)
        if err:
            await _safe_reply(c, err)
            return
        # The result becomes the caller's "last video", so commands can be chained.
        _set_video(c.message.group, sender, out)
        await _safe_send_video(c, out)


class ReverseCommand(Command):
    """`/rev` — re-post the caller's last video reversed."""

    @triggered("/rev")
    async def handle(self, c: Context) -> None:
        """Reverse the caller's last video (or the group's latest) and post it."""
        if not c.message.is_group():
            return
        sender = _sender_number(c.message)
        b64 = _get_video(c.message.group, sender)
        if not b64:
            await _safe_reply(c, "No video to reverse.")
            return

        async with _get_job_semaphore():
            out, err = await asyncio.to_thread(_reverse_video, b64)
        if err:
            await _safe_reply(c, err)
            return
        _set_video(c.message.group, sender, out)
        await _safe_send_video(c, out)


# Reply text for /help and /commands.
HELP_TEXT = f"""🎬 Video bot — what I can do

Share a video link (X/Twitter, Instagram, YouTube, TikTok) and I'll post the video back to the group.
  e.g. https://x.com/user/status/123456789

A YouTube link with a timestamp → I post a {CLIP_DURATION}s clip starting at that moment.
  e.g. https://youtu.be/dQw4w9WgXcQ?t=90

/clip — set the clip length for a link in the same message (max {MAX_CLIP_DURATION}s). With a ?t= it sets the window; without one it clips from the start.
  e.g. /clip 30 https://youtu.be/dQw4w9WgXcQ?t=90
  e.g. /clip 15 https://youtu.be/dQw4w9WgXcQ

/speed [factor] — speed up the last video (default 2x).
  e.g. /speed   /speed 4   /speed 0.5

/rev — reverse the last video.

/help — show this message.

(In a DM, admins can run /cookies to refresh Instagram login cookies.)"""


class HelpCommand(Command):
    """Replies with HELP_TEXT to `/help` or `/commands` in a group."""

    async def handle(self, c: Context) -> None:
        """Send the help text when the whole message is /help or /commands."""
        if not c.message.is_group():
            return
        if (c.message.text or "").strip().lower() not in ("/help", "/commands"):
            return
        await _safe_reply(c, HELP_TEXT)


def _sender_number(msg) -> str | None:
    """Best-effort sender identifier for `msg`, or None if it can't be found.

    Tries the message attributes first, then falls back to the raw envelope.
    """
    for attr in ("source", "source_number", "sourceNumber"):
        v = getattr(msg, attr, None)
        if v:
            return v
    try:
        env = json.loads(msg.raw_message)["envelope"]
        return env.get("source") or env.get("sourceNumber")
    except Exception:  # noqa: BLE001
        return None


class CookiesCommand(Command):
    """`/cookies` (DM, admins only) — replace the Instagram lines in the cookie jar."""

    async def handle(self, c: Context) -> None:
        """Merge pasted `.instagram.com` cookie lines into COOKIES.

        The cookie lines are expected on the lines after the `/cookies`
        command line. Requests from non-admins are logged and ignored
        without a reply.
        """
        text = c.message.text or ""
        if not text.startswith("/cookies"):
            return
        if c.message.is_group():
            return

        sender = _sender_number(c.message)
        if not ADMIN_NUMBERS or sender not in ADMIN_NUMBERS:
            log.warning("Refused /cookies from %r (admins=%s)", sender, ADMIN_NUMBERS or "")
            return

        body = text.split("\n", 1)[1] if "\n" in text else ""
        cookie_re = re.compile(r"^\.?instagram\.com\b")
        ig_lines = []
        for line in body.splitlines():
            if not cookie_re.match(line):
                continue
            # Browser pastes can replace tabs with runs of spaces; restore tabs.
            normalized = re.sub(r"[ \t]{2,}", "\t", line)
            ig_lines.append(normalized)

        if not ig_lines:
            await _safe_reply(c, "No `.instagram.com` cookie lines found.")
            return

        if not any("\tsessionid\t" in ln for ln in ig_lines):
            await _safe_reply(c, "Missing `sessionid` cookie — that's the one that proves you're logged in. Re-export and try again.")
            return

        try:
            with open(COOKIES, "r") as f:
                existing = f.read().splitlines()
        except FileNotFoundError:
            existing = ["# Netscape HTTP Cookie File", ""]

        # Keep everything that isn't an Instagram line, then append the new ones.
        kept = [ln for ln in existing if not cookie_re.match(ln)]
        new_content = "\n".join(kept + ig_lines) + "\n"

        # Create the temp file 0600 from the start: open()+chmod left a brief
        # world-readable window exposing the Instagram session cookie.
        tmppath = COOKIES + ".tmp"
        try:
            fd = os.open(tmppath, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, "w") as f:
                # The mode passed to os.open only applies when the file is
                # created. A stale .tmp left by an earlier crash keeps its old
                # permissions, and os.replace would carry them onto the cookie
                # file, so enforce 0600 on the open descriptor before writing.
                if hasattr(os, "fchmod"):
                    os.fchmod(f.fileno(), 0o600)
                f.write(new_content)
            os.replace(tmppath, COOKIES)
        except OSError as e:
            # Tell the admin instead of letting the handler die silently.
            log.warning("Failed to install cookies: %s", e)
            await _safe_reply(c, "Couldn't save the cookies file.")
            return

        log.info("Installed %d Instagram cookies from %s", len(ig_lines), sender)
        await _safe_reply(c, f"Installed {len(ig_lines)} Instagram cookies.")


def main():
    """Configure the bot from the environment, register commands, and run it.

    Requires SIGNAL_PHONE_NUMBER; SIGNAL_SERVICE defaults to 127.0.0.1:8080.
    Exits with status 1 when the phone number is missing.
    """
    global _storage
    phone_number = os.environ.get("SIGNAL_PHONE_NUMBER")
    signal_service = os.environ.get("SIGNAL_SERVICE", "127.0.0.1:8080")

    if not phone_number:
        print("Error: SIGNAL_PHONE_NUMBER environment variable is required.")
        print("Example: export SIGNAL_PHONE_NUMBER='+15551234567'")
        raise SystemExit(1)

    bot = SignalBot({
        "signal_service": signal_service,
        "phone_number": phone_number,
        # Don't let the library download+base64 every attachment of every message
        # on the producer loop. VideoTracker fetches only video attachments lazily.
        "download_attachments": False,
        # Tiny SQLite KV (a few KB): persists only the URL dedup map across
        # restarts. The big last-video blobs stay in memory by design.
        "storage": {"type": "sqlite", "sqlite_db": STATE_DB},
    })

    _storage = bot.storage
    _load_recent_urls()

    bot.register(VideoTracker(), contacts=False, groups=True)
    bot.register(VideoCommand(), contacts=False, groups=True)
    bot.register(ReverseCommand(), contacts=False, groups=True)
    bot.register(SpeedCommand(), contacts=False, groups=True)
    bot.register(HelpCommand(), contacts=False, groups=True)
    bot.register(CookiesCommand(), contacts=True, groups=False)

    log.info("Starting Signal video bot...")
    bot.start()


if __name__ == "__main__":
    main()