import logging import os import subprocess from typing import List, Optional from tempfile import NamedTemporaryFile from telegram import Update from telegram.ext import ( Updater, CommandHandler, DictPersistence, CallbackContext, ) from telegram.parsemode import ParseMode from parser import SafeArgumentParser, safe_str class KollagenBot: def __init__( self, tg_token: str, kollagen_path: str, base_dir: Optional[str] ) -> None: self.logger = logging.getLogger("kollagen") self.kollagen_path = kollagen_path self.base_dir = os.path.abspath(base_dir) if base_dir else None self._init_parser() self.updater = Updater( tg_token, persistence=DictPersistence(user_data_json="{}") ) dispatcher = self.updater.dispatcher dispatcher.add_handler(CommandHandler("start", self.tg_start)) dispatcher.add_handler(CommandHandler("help", self.tg_help)) dispatcher.add_handler(CommandHandler("generate", self.tg_generate)) dispatcher.add_handler(CommandHandler("g", self.tg_generate)) dispatcher.add_handler(CommandHandler("regenerate", self.tg_regenerate)) dispatcher.add_handler(CommandHandler("r", self.tg_regenerate)) dispatcher.add_error_handler(self.tg_error) def _init_parser(self): parser = SafeArgumentParser(prog="/generate", add_help=False) parser.add_argument( "directories", metavar="path", type=safe_str, nargs="*", default=[self.base_dir] if self.base_dir else [], help="Directories or files to process. By default, the entire base directory is processed.", ) parser.add_argument( "-m", dest="mode", metavar="mode", type=safe_str, nargs="?", const=True, help=f"Collage modes to use. By default, one is chosen at random. Multiple modes can be specified, separated by commas. When no value is specified, all modes are listed.", ) parser.add_argument( "-n", dest="num_images", metavar="N", type=int, help=f"How many images to use in a single collage. Random (or collage-dependant) by default.", ) parser.add_argument( "-w", dest="width", type=int, default=640, help=f"Width of resulting output (in px). 640px by default.", ) parser.add_argument( "-h", dest="height", type=int, default=640, help=f"Height of resulting output (in px). 640px by default.", ) parser.add_argument( "--rm", dest="recursive_modes", type=safe_str, help=f"Collage modes (comma-separated) to use in a recursive collage. All by default.", ) parser.add_argument( "--rl", dest="recursive_level", type=int, default=2, help=f"Level/depth of recursive collage. 2 by default.", ) parser.add_argument( "--rr", dest="recursive_repeat", action="store_true", help=f"Allow repeating images in (different levels of) recursive collages. False by default.", ) self.parser = parser def _get_modes(self): modes = subprocess.run( [self.kollagen_path, "-m"], check=True, capture_output=True ) return modes.stdout.decode("utf-8").strip().split(", ") def tg_start(self, update: Update, context: CallbackContext): update.message.reply_text("Hi! I make random collages. Check out https://gitlab.com/tmladek/kollagen and /help. Here's one to get you started:") self._process([], update) def tg_generate(self, update: Update, context: CallbackContext): cmd_line = update.message.text.split(" ")[1:] success = self._process(cmd_line, update) if success and context.user_data is not None: context.user_data["last_cmd_line"] = cmd_line def tg_regenerate(self, update: Update, context: CallbackContext): if context.user_data and context.user_data.get("last_cmd_line"): self._process(context.user_data["last_cmd_line"], update) else: update.message.reply_text("No previous command to regenerate!") def _process(self, cmd_line: List[str], update: Update): self.logger.info( f"Generating from {update.effective_user}, with cmd_line: `{cmd_line}`" ) args = self.parser.parse_args(cmd_line) if args.mode is True: update.message.reply_text( f"Available modes: {', '.join(self._get_modes())}" ) return False directories = [] for dir in args.directories: for possible in [ os.path.join(self.base_dir or "./", dir), os.path.join(self.base_dir or "./", dir.upper()), ]: if os.path.exists(possible): directories.append(possible) break else: raise FileNotFoundError(f'"{dir}" does not exist.') mode = ["-m", args.mode] if args.mode else [] num_images = ["-n", str(args.num_images)] if args.num_images else [] recursive_level = ( ["--rl", str(args.recursive_level)] if args.recursive_level else [] ) recursive_repeat = ["--rr"] if args.recursive_repeat else [] recursive_modes = ( ["--rm", str(args.recursive_modes)] if args.recursive_modes else [] ) with NamedTemporaryFile(suffix=".png") as ntf: shell_cmd_line = [ self.kollagen_path, *directories, "-w", str(args.width), "-h", str(args.height), *mode, *num_images, *recursive_level, *recursive_repeat, *recursive_modes, "-o", ntf.name, ] self.logger.debug(f"Running: " + str(shell_cmd_line)) result = subprocess.run( shell_cmd_line, check=True, capture_output=True, timeout=60, env={ 'NO_COLOR': "1" } ) ntf.seek(0) used_line = next( ( line for line in result.stdout.decode("utf-8").splitlines() if line.startswith("Used: ") ), "", ).replace(f"{self.base_dir}/" if self.base_dir else "", "") caption = "" caption += ( f"`{' '.join(['/generate', *cmd_line])}`\n" if len(cmd_line) else "" ) caption += used_line.replace("_", "\\_") caption = caption[:200] update.message.reply_photo( ntf, caption=caption, parse_mode=ParseMode.MARKDOWN, ) return True def tg_help(self, update: Update, context: CallbackContext): update.message.reply_text( f"```{self.parser.format_help()}```", parse_mode=ParseMode.MARKDOWN ) def tg_error(self, update: object, context: CallbackContext) -> None: self.logger.error( msg="Exception while handling an update:", exc_info=context.error ) if isinstance(update, Update): if isinstance(context.error, subprocess.CalledProcessError): error_display = context.error.stderr.decode('utf-8') else: error_display = str(context.error) error_display = error_display[:2500] update.message.reply_text( f"Something is fucked!\n{error_display}" ) def start_idle(self): self.updater.start_polling() self.updater.idle() def main() -> None: logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=getattr(logging, os.getenv("LOG_LEVEL", "info").upper()), ) tg_token = os.getenv("TG_TOKEN") if not tg_token: logging.error("TG_TOKEN is required.") exit(1) if env_kollagen_path := os.getenv("KOLLAGEN_PATH"): if os.path.exists(env_kollagen_path): kollagen_path = env_kollagen_path else: logging.error(f"kollagen not found! {env_kollagen_path} does not exist.") exit(1) else: which = subprocess.run(["which", "kollagen"], capture_output=True) try: which.check_returncode() kollagen_path = which.stdout.decode("utf-8").strip() except subprocess.CalledProcessError: logging.error( "kollagen not found! KOLLAGEN_PATH not specified and `kollagen` isn't in $PATH." ) exit(1) bot = KollagenBot(tg_token, kollagen_path, os.getenv("BASE_DIR")) bot.start_idle() if __name__ == "__main__": main()