kollagen/tgbot/kollagen-bot/main.py

134 lines
4.6 KiB
Python

import logging
import os
import re
import shutil
import subprocess
import sys
from genericpath import isdir
from tempfile import NamedTemporaryFile
from typing import List, Optional

from telegram import ForceReply, Update
from telegram.ext import (
    CallbackContext,
    CommandHandler,
    DictPersistence,
    Updater,
)


def start(update: Update, context: CallbackContext) -> None:
    """Send a greeting that mentions the sender when /start is issued.

    The reply carries a ``ForceReply(selective=True)`` markup, exactly as
    before.
    """
    user = update.effective_user
    # ``update.message`` is None when the command arrives as an *edited*
    # message; ``effective_message`` resolves to whichever message the update
    # actually carries, so the handler no longer fails on edits.
    update.effective_message.reply_markdown_v2(
        fr"Hi {user.mention_markdown_v2()}\!",
        reply_markup=ForceReply(selective=True),
    )
class KollagenBot:
    """Telegram bot that produces collages by running the ``kollagen`` binary.

    The constructor wires the command handlers onto a polling ``Updater``;
    ``start_idle`` then runs the bot until the process is interrupted.
    """

    # Everything except ASCII letters and digits is removed from a requested
    # directory name, so an argument can never contain a path separator or
    # ".." and therefore cannot point outside the base directory.
    _DIR_NAME_JUNK = re.compile(r"[^a-zA-Z0-9]")

    # Telegram rejects text messages longer than this many characters.
    _MAX_MESSAGE_LENGTH = 4096

    def __init__(
        self, tg_token: str, kollagen_path: str, base_dir: Optional[str]
    ) -> None:
        """Create the updater and register all command and error handlers.

        Args:
            tg_token: Telegram bot API token.
            kollagen_path: Path of the ``kollagen`` executable to run.
            base_dir: Directory that holds the selectable image directories;
                when ``None`` names are looked up relative to ``./``.
        """
        self.logger = logging.getLogger("kollagen")
        self.kollagen_path = kollagen_path
        self.base_dir = base_dir
        self.updater = Updater(tg_token, persistence=DictPersistence())

        dispatcher = self.updater.dispatcher
        dispatcher.add_handler(CommandHandler("start", start))
        dispatcher.add_handler(CommandHandler("list_modes", self.tg_list_modes))
        dispatcher.add_handler(CommandHandler("generate", self.tg_generate))
        dispatcher.add_handler(CommandHandler("g", self.tg_generate))
        dispatcher.add_handler(CommandHandler("regenerate", self.tg_regenerate))
        dispatcher.add_handler(CommandHandler("r", self.tg_regenerate))
        dispatcher.add_error_handler(self.tg_error)

    def tg_generate(self, update: Update, context: CallbackContext):
        """Handle /generate and /g: build a collage and remember the arguments."""
        # split() with no separator collapses runs of whitespace, so repeated
        # spaces no longer yield empty arguments.
        cmd_line = update.effective_message.text.split()[1:]
        self._process(cmd_line, update)
        # A fresh user_data is an *empty* dict, which is falsy; test against
        # None so the very first command is remembered as well.
        if context.user_data is not None:
            context.user_data["last_cmd_line"] = cmd_line

    def tg_regenerate(self, update: Update, context: CallbackContext):
        """Handle /regenerate and /r: repeat the user's last /generate."""
        user_data = context.user_data
        # Membership test rather than truthiness: a remembered command with
        # no arguments is stored as an empty list and must still be replayed.
        if user_data is not None and "last_cmd_line" in user_data:
            self._process(user_data["last_cmd_line"], update)
        else:
            update.effective_message.reply_text("No previous command to regenerate!")

    def _resolve_directories(self, cmd_line: List[str]) -> List[str]:
        """Map command arguments to the existing directories they name.

        Each argument is reduced to its ASCII letters and digits and looked up
        under ``base_dir`` (``./`` when unset). Arguments that do not name an
        existing directory are dropped. If nothing matches and ``base_dir`` is
        set, ``[base_dir]`` is returned.
        """
        root = self.base_dir or "./"
        directories = []
        for arg in cmd_line:
            name = self._DIR_NAME_JUNK.sub("", arg)
            if not name:
                # Joining an empty name would yield the root itself and
                # silently widen the selection to every directory under it.
                continue
            candidate = os.path.join(root, name)
            if os.path.isdir(candidate):
                directories.append(candidate)
        if not directories and self.base_dir:
            directories = [self.base_dir]
        return directories

    def _process(self, cmd_line: List[str], update: Update):
        """Run kollagen over the requested directories and reply with the image.

        Raises:
            subprocess.CalledProcessError: if kollagen exits with a non-zero
                status (reported to the user by ``tg_error``).
        """
        self.logger.info(
            "Generating from %s, with cmd_line: `%s`",
            update.effective_user,
            cmd_line,
        )
        directories = self._resolve_directories(cmd_line)
        with NamedTemporaryFile(suffix=".png") as ntf:
            subprocess.run(
                [self.kollagen_path, "-r", *directories, "-o", ntf.name],
                check=True,
                capture_output=True,
            )
            # Make sure the upload reads the file from its beginning.
            ntf.seek(0)
            update.effective_message.reply_photo(ntf)

    def tg_list_modes(self, update: Update, context: CallbackContext):
        """Handle /list_modes: reply with the output of ``kollagen -m``."""
        modes = subprocess.run(
            [self.kollagen_path, "-m"], check=True, capture_output=True
        )
        update.effective_message.reply_text(
            f"Available modes: {modes.stdout.decode('utf-8')}"
        )

    def tg_error(self, update: object, context: CallbackContext) -> None:
        """Log a handler exception and, when possible, report it to the chat."""
        self.logger.error(
            msg="Exception while handling an update:", exc_info=context.error
        )
        if not isinstance(update, Update):
            return
        message = update.effective_message
        if message is None:
            # Nothing to reply to; the error has already been logged.
            return

        error = context.error
        if isinstance(error, subprocess.CalledProcessError):
            stderr = error.stderr
            if isinstance(stderr, bytes):
                # Never let undecodable tool output break the error report.
                detail = stderr.decode("utf-8", errors="replace")
            else:
                detail = stderr or ""
        else:
            detail = error
        text = f"Something is fucked!\n{detail}"
        # Over-long text would itself be rejected by Telegram.
        message.reply_text(text[: self._MAX_MESSAGE_LENGTH])

    def start_idle(self):
        """Start long polling and block until the process is signalled to stop."""
        self.updater.start_polling()
        self.updater.idle()
def main() -> None:
    """Configure logging, locate ``kollagen`` and run the bot.

    Environment variables:
        TG_TOKEN: Telegram bot token (required).
        KOLLAGEN_PATH: Path of the ``kollagen`` executable. When unset the
            executable is searched for on ``$PATH``.
        BASE_DIR: Passed to ``KollagenBot`` as its base directory (optional).

    Exits with status 1 when the token is missing or ``kollagen`` cannot be
    found.
    """
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )

    tg_token = os.getenv("TG_TOKEN")
    if not tg_token:
        logging.error("TG_TOKEN is required.")
        # sys.exit rather than the bare ``exit`` helper, which is only
        # injected by the ``site`` module and is not always available.
        sys.exit(1)

    if env_kollagen_path := os.getenv("KOLLAGEN_PATH"):
        if not os.path.exists(env_kollagen_path):
            logging.error(f"kollagen not found! {env_kollagen_path} does not exist.")
            sys.exit(1)
        kollagen_path = env_kollagen_path
    else:
        # shutil.which performs the $PATH lookup in-process; spawning the
        # external ``which`` tool raises FileNotFoundError where it is absent.
        kollagen_path = shutil.which("kollagen")
        if kollagen_path is None:
            logging.error(
                "kollagen not found! KOLLAGEN_PATH not specified and `kollagen` isn't in $PATH."
            )
            sys.exit(1)

    bot = KollagenBot(tg_token, kollagen_path, os.getenv("BASE_DIR"))
    bot.start_idle()


if __name__ == "__main__":
    main()