kollagen/tgbot/kollagen-bot/main.py

152 lines
5.6 KiB
Python

from genericpath import isdir
import logging
import os
import subprocess
from typing import Optional
import re
from tempfile import NamedTemporaryFile
from telegram import Update, ForceReply
from telegram.ext import (
Updater,
CommandHandler,
DictPersistence,
CallbackContext,
)
def start(update: Update, context: CallbackContext) -> None:
    """Send a greeting when the command /start is issued.

    Replies with a MarkdownV2 message that mentions the sender and attaches a
    selective ``ForceReply`` markup.

    Args:
        update: The incoming Telegram update that carried the command.
        context: Callback context supplied by the dispatcher (unused here).
    """
    user = update.effective_user
    # ``effective_message`` is used instead of ``message`` because the latter
    # is None when the command arrives as an edited message, which a
    # CommandHandler dispatches by default.
    update.effective_message.reply_markdown_v2(
        fr"Hi {user.mention_markdown_v2()}\!",
        reply_markup=ForceReply(selective=True),
    )
class KollagenBot:
    """Telegram bot that answers commands by running the ``kollagen`` executable.

    Registered commands: ``/start``, ``/list_modes``, ``/generate`` (alias
    ``/g``) and ``/regenerate`` (alias ``/r``).

    Attributes are plain instance fields:
        logger: Logger named ``"kollagen"``.
        kollagen_path: Path of the executable passed to ``subprocess.run``.
        base_dir: Optional directory under which user-named directories are
            looked up; when None, ``"./"`` is used for the lookup.
        updater: The ``telegram.ext.Updater`` driving the bot.
    """

    # Everything except ASCII letters and digits is stripped from user-supplied
    # directory names before they are joined onto the base directory.
    _UNSAFE_CHARS = re.compile(r"[^a-zA-Z0-9]")

    def __init__(
        self, tg_token: str, kollagen_path: str, base_dir: Optional[str]
    ) -> None:
        """Create the updater and register all command and error handlers.

        Args:
            tg_token: Telegram bot token handed to ``Updater``.
            kollagen_path: Path to the ``kollagen`` executable.
            base_dir: Directory that user-named sub-directories are resolved
                against, or None.
        """
        self.logger = logging.getLogger("kollagen")
        self.kollagen_path = kollagen_path
        self.base_dir = base_dir
        self.updater = Updater(tg_token, persistence=DictPersistence())

        dispatcher = self.updater.dispatcher
        dispatcher.add_handler(CommandHandler("start", start))
        dispatcher.add_handler(CommandHandler("list_modes", self.tg_list_modes))
        dispatcher.add_handler(CommandHandler("generate", self.tg_generate))
        dispatcher.add_handler(CommandHandler("g", self.tg_generate))
        dispatcher.add_handler(CommandHandler("regenerate", self.tg_regenerate))
        dispatcher.add_handler(CommandHandler("r", self.tg_regenerate))
        dispatcher.add_error_handler(self.tg_error)

    def tg_generate(self, update: Update, context: CallbackContext):
        """Handle ``/generate`` / ``/g``: run kollagen and remember the arguments.

        The words following the command are processed, and after a successful
        run they are stored in ``context.user_data["last_cmd_line"]`` so that
        ``/regenerate`` can repeat them.
        """
        # split() without an argument collapses runs of whitespace, so repeated
        # spaces do not produce empty arguments.
        cmd_line = update.effective_message.text.split()[1:]
        self._process(cmd_line, update)
        # Compare against None rather than testing truthiness: a user's data
        # dict starts out empty (falsy), and a truthiness test would therefore
        # never store the first command.
        if context.user_data is not None:
            context.user_data["last_cmd_line"] = cmd_line

    def tg_regenerate(self, update: Update, context: CallbackContext):
        """Handle ``/regenerate`` / ``/r``: repeat the last stored generate call.

        Replies with a notice when no previous command has been stored.
        """
        user_data = context.user_data
        # Check for the key itself: a remembered bare "/generate" is an empty
        # list, which is falsy but still a valid command to repeat.
        if user_data is not None and "last_cmd_line" in user_data:
            self._process(user_data["last_cmd_line"], update)
        else:
            update.effective_message.reply_text(
                "No previous command to regenerate!"
            )

    def _resolve_directories(self, cmd_line: list) -> list:
        """Map command arguments to existing directories below the base dir.

        Each argument is reduced to its ASCII letters and digits and joined
        onto ``base_dir`` (or ``"./"`` when no base dir is set); only paths
        that are existing directories are kept. Arguments that are empty after
        sanitising are skipped, because joining an empty name would yield the
        base directory itself. When nothing matches and ``base_dir`` is set,
        ``[base_dir]`` is returned.

        Args:
            cmd_line: List of argument strings taken from the user's message.

        Returns:
            List of directory paths to hand to kollagen (possibly empty when
            ``base_dir`` is not set).
        """
        root = self.base_dir or "./"
        sanitised = (self._UNSAFE_CHARS.sub("", arg) for arg in cmd_line)
        candidates = (os.path.join(root, name) for name in sanitised if name)
        directories = [path for path in candidates if os.path.isdir(path)]
        if not directories and self.base_dir:
            directories = [self.base_dir]
        return directories

    def _process(self, cmd_line: list, update: Update):
        """Run kollagen for the given arguments and reply with the image.

        Invokes ``<kollagen_path> -r <directories...> -o <temp file>.png`` and
        sends the temporary file back as a photo.

        Args:
            cmd_line: List of argument strings naming directories.
            update: The update to reply to.

        Raises:
            subprocess.CalledProcessError: If kollagen exits with a non-zero
                status (``check=True``).
        """
        self.logger.info(
            "Generating from %s, with cmd_line: `%s`",
            update.effective_user,
            cmd_line,
        )
        directories = self._resolve_directories(cmd_line)
        with NamedTemporaryFile(suffix=".png") as ntf:
            subprocess.run(
                [self.kollagen_path, "-r", *directories, "-o", ntf.name],
                check=True,
                capture_output=True,
            )
            # Rewind before handing the file object over for upload.
            ntf.seek(0)
            update.effective_message.reply_photo(ntf)

    def tg_list_modes(self, update: Update, context: CallbackContext):
        """Handle ``/list_modes``: reply with the output of ``kollagen -m``.

        Raises:
            subprocess.CalledProcessError: If kollagen exits with a non-zero
                status.
        """
        modes = subprocess.run(
            [self.kollagen_path, "-m"], check=True, capture_output=True
        )
        update.effective_message.reply_text(
            f"Available modes: {modes.stdout.decode('utf-8')}"
        )

    def tg_error(self, update: object, context: CallbackContext) -> None:
        """Log any exception raised while handling an update."""
        self.logger.error(
            msg="Exception while handling an update:", exc_info=context.error
        )
        # TODO: optionally forward the formatted traceback to a developer chat;
        # nothing is sent to Telegram from here at the moment.

    def start_idle(self):
        """Start polling for updates and block until the updater is stopped."""
        self.updater.start_polling()
        self.updater.idle()
def main() -> None:
    """Configure logging, locate the kollagen executable and run the bot.

    Environment variables read:
        TG_TOKEN: Telegram bot token (required).
        KOLLAGEN_PATH: Path to the kollagen executable; when unset or empty,
            ``kollagen`` is looked up on ``$PATH``.
        BASE_DIR: Optional base directory passed to ``KollagenBot``.

    Raises:
        SystemExit: With status 1 when the token is missing or the kollagen
            executable cannot be found.
    """
    # Local import so this function does not rely on a new file-level import.
    import shutil

    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )

    tg_token = os.getenv("TG_TOKEN")
    if not tg_token:
        logging.error("TG_TOKEN is required.")
        # SystemExit is raised directly; the bare ``exit`` helper is injected
        # by the ``site`` module and is meant for interactive sessions.
        raise SystemExit(1)

    if env_kollagen_path := os.getenv("KOLLAGEN_PATH"):
        if not os.path.exists(env_kollagen_path):
            logging.error(
                "kollagen not found! %s does not exist.", env_kollagen_path
            )
            raise SystemExit(1)
        kollagen_path = env_kollagen_path
    else:
        # shutil.which searches $PATH in-process, so it does not depend on an
        # external ``which`` binary being installed.
        found = shutil.which("kollagen")
        if found is None:
            logging.error(
                "kollagen not found! KOLLAGEN_PATH not specified and `kollagen` isn't in $PATH."
            )
            raise SystemExit(1)
        kollagen_path = found

    bot = KollagenBot(tg_token, kollagen_path, os.getenv("BASE_DIR"))
    bot.start_idle()


if __name__ == "__main__":
    main()