"""Telegram bot front-end for the ``kollagen`` command line tool."""

import logging
import os
import re
import shutil
import subprocess
from genericpath import isdir
from tempfile import NamedTemporaryFile
from typing import List, Optional

from telegram import ForceReply, Update
from telegram.ext import (
    CallbackContext,
    CommandHandler,
    DictPersistence,
    Updater,
)

# Everything except ASCII letters and digits is stripped from user-supplied
# directory names, so an argument can never contain path separators or "..".
_NON_ALNUM = re.compile(r"[^a-zA-Z0-9]")


def start(update: Update, context: CallbackContext) -> None:
    """Send a greeting when the command /start is issued."""
    user = update.effective_user
    # effective_message also covers edited messages, for which
    # update.message is None.
    update.effective_message.reply_markdown_v2(
        fr"Hi {user.mention_markdown_v2()}\!",
        reply_markup=ForceReply(selective=True),
    )


class KollagenBot:
    """Telegram bot that runs ``kollagen`` and replies with the produced image.

    Args:
        tg_token: Telegram bot API token handed to the ``Updater``.
        kollagen_path: Path of the ``kollagen`` executable to invoke.
        base_dir: Directory under which user-supplied directory names are
            looked up. When ``None``, names are resolved relative to ``./``.
    """

    def __init__(
        self, tg_token: str, kollagen_path: str, base_dir: Optional[str]
    ) -> None:
        self.logger = logging.getLogger("kollagen")
        self.kollagen_path = kollagen_path
        self.base_dir = base_dir

        self.updater = Updater(tg_token, persistence=DictPersistence())
        dispatcher = self.updater.dispatcher

        dispatcher.add_handler(CommandHandler("start", start))
        dispatcher.add_handler(CommandHandler("list_modes", self.tg_list_modes))
        # Each long command also gets a one-letter alias.
        dispatcher.add_handler(CommandHandler("generate", self.tg_generate))
        dispatcher.add_handler(CommandHandler("g", self.tg_generate))
        dispatcher.add_handler(CommandHandler("regenerate", self.tg_regenerate))
        dispatcher.add_handler(CommandHandler("r", self.tg_regenerate))
        dispatcher.add_error_handler(self.tg_error)

    def tg_generate(self, update: Update, context: CallbackContext) -> None:
        """Handle /generate (/g): build an image from the command arguments.

        The arguments are remembered per user so that /regenerate can replay
        them. They are only stored after the image was produced successfully.
        """
        # split() without a separator drops the empty strings that repeated
        # spaces would otherwise produce.
        cmd_line = update.effective_message.text.split()[1:]
        self._process(cmd_line, update)
        # A fresh user_data dict is empty and therefore falsy, so the check
        # must be against None; a truthiness test would never store anything.
        if context.user_data is not None:
            context.user_data["last_cmd_line"] = cmd_line

    def tg_regenerate(self, update: Update, context: CallbackContext) -> None:
        """Handle /regenerate (/r): replay the user's last /generate arguments."""
        user_data = context.user_data
        last_cmd_line = user_data.get("last_cmd_line") if user_data else None
        if last_cmd_line:
            self._process(last_cmd_line, update)
        else:
            update.effective_message.reply_text("No previous command to regenerate!")

    def _resolve_directories(self, cmd_line: List[str]) -> List[str]:
        """Map command arguments to existing directories below the base dir.

        Each argument is reduced to its ASCII letters and digits and joined
        onto ``base_dir`` (or ``./`` when no base dir is configured); only
        paths that are existing directories are kept. If nothing matches and a
        base dir is configured, the base dir itself is used.
        """
        root = self.base_dir or "./"
        # NOTE: an argument without any letter or digit sanitizes to "" and
        # therefore resolves to the root directory itself.
        candidates = (
            os.path.join(root, _NON_ALNUM.sub("", arg)) for arg in cmd_line
        )
        directories = [path for path in candidates if os.path.isdir(path)]
        if not directories and self.base_dir:
            directories = [self.base_dir]
        return directories

    def _process(self, cmd_line: List[str], update: Update) -> None:
        """Run kollagen for the given arguments and reply with the image.

        Args:
            cmd_line: Whitespace-separated arguments of the user's command.
            update: Update whose message receives the photo reply.

        Raises:
            subprocess.CalledProcessError: If kollagen exits with a non-zero
                status (``check=True``).
        """
        self.logger.info(
            "Generating from %s, with cmd_line: `%s`",
            update.effective_user,
            cmd_line,
        )
        directories = self._resolve_directories(cmd_line)

        with NamedTemporaryFile(suffix=".png") as ntf:
            # kollagen is given the temp file's name after -o; the handle
            # stays open so the file survives until it has been sent.
            subprocess.run(
                [self.kollagen_path, "-r", *directories, "-o", ntf.name],
                check=True,
                capture_output=True,
            )
            ntf.seek(0)
            update.effective_message.reply_photo(ntf)

    def tg_list_modes(self, update: Update, context: CallbackContext) -> None:
        """Handle /list_modes: reply with the output of ``kollagen -m``."""
        modes = subprocess.run(
            [self.kollagen_path, "-m"], check=True, capture_output=True
        )
        update.effective_message.reply_text(
            f"Available modes: {modes.stdout.decode('utf-8')}"
        )

    def tg_error(self, update: object, context: CallbackContext) -> None:
        """Log any exception raised while an update was being handled."""
        self.logger.error(
            msg="Exception while handling an update:", exc_info=context.error
        )
        # NOTE: a commented-out sketch that forwarded the traceback to a
        # developer chat used to live here. It was removed because it relied
        # on names this module never defines (traceback, html, json,
        # ParseMode, DEVELOPER_CHAT_ID).

    def start_idle(self) -> None:
        """Start polling for updates and block until the updater stops."""
        self.updater.start_polling()
        self.updater.idle()


def main() -> None:
    """Configure logging, locate kollagen and run the bot.

    Reads ``TG_TOKEN`` (required), ``KOLLAGEN_PATH`` (optional, otherwise
    ``kollagen`` is searched on ``$PATH``) and ``BASE_DIR`` (optional) from the
    environment. Exits with status 1 when the token or the executable is
    missing.
    """
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )

    tg_token = os.getenv("TG_TOKEN")
    if not tg_token:
        logging.error("TG_TOKEN is required.")
        raise SystemExit(1)

    if env_kollagen_path := os.getenv("KOLLAGEN_PATH"):
        if not os.path.exists(env_kollagen_path):
            logging.error(
                "kollagen not found! %s does not exist.", env_kollagen_path
            )
            raise SystemExit(1)
        kollagen_path = env_kollagen_path
    else:
        # shutil.which performs the $PATH lookup in-process instead of
        # spawning the external `which` utility.
        found = shutil.which("kollagen")
        if found is None:
            logging.error(
                "kollagen not found! KOLLAGEN_PATH not specified and `kollagen` isn't in $PATH."
            )
            raise SystemExit(1)
        kollagen_path = found

    bot = KollagenBot(tg_token, kollagen_path, os.getenv("BASE_DIR"))
    bot.start_idle()


if __name__ == "__main__":
    main()