2021-09-19 10:31:34 +02:00
|
|
|
from genericpath import isdir
|
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import subprocess
|
|
|
|
from typing import Optional
|
|
|
|
import re
|
|
|
|
|
|
|
|
from tempfile import NamedTemporaryFile
|
|
|
|
|
|
|
|
from telegram import Update, ForceReply
|
|
|
|
from telegram.ext import (
|
|
|
|
Updater,
|
|
|
|
CommandHandler,
|
|
|
|
DictPersistence,
|
|
|
|
CallbackContext,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class KollagenBot:
|
|
|
|
def __init__(
|
|
|
|
self, tg_token: str, kollagen_path: str, base_dir: Optional[str]
|
|
|
|
) -> None:
|
|
|
|
self.logger = logging.getLogger("kollagen")
|
|
|
|
self.kollagen_path = kollagen_path
|
|
|
|
self.base_dir = base_dir
|
|
|
|
|
2021-09-19 12:18:32 +02:00
|
|
|
self.updater = Updater(tg_token, persistence=DictPersistence(user_data_json="{}"))
|
2021-09-19 10:31:34 +02:00
|
|
|
|
|
|
|
dispatcher = self.updater.dispatcher
|
|
|
|
|
2021-09-19 11:48:29 +02:00
|
|
|
dispatcher.add_handler(CommandHandler("start", self.tg_start))
|
2021-09-19 10:31:34 +02:00
|
|
|
dispatcher.add_handler(CommandHandler("list_modes", self.tg_list_modes))
|
|
|
|
dispatcher.add_handler(CommandHandler("generate", self.tg_generate))
|
|
|
|
dispatcher.add_handler(CommandHandler("g", self.tg_generate))
|
|
|
|
dispatcher.add_handler(CommandHandler("regenerate", self.tg_regenerate))
|
|
|
|
dispatcher.add_handler(CommandHandler("r", self.tg_regenerate))
|
|
|
|
dispatcher.add_error_handler(self.tg_error)
|
|
|
|
|
2021-09-19 11:48:29 +02:00
|
|
|
def tg_start(self, update: Update, context: CallbackContext):
|
|
|
|
update.message.reply_text("Hi! Check out https://gitlab.com/tmladek/kollagen")
|
|
|
|
self._process("", update)
|
|
|
|
|
2021-09-19 10:31:34 +02:00
|
|
|
def tg_generate(self, update: Update, context: CallbackContext):
|
|
|
|
cmd_line = update.message.text.split(" ")[1:]
|
|
|
|
self._process(cmd_line, update)
|
|
|
|
if context.user_data:
|
|
|
|
context.user_data["last_cmd_line"] = cmd_line
|
|
|
|
|
|
|
|
def tg_regenerate(self, update: Update, context: CallbackContext):
|
|
|
|
if context.user_data and context.user_data.get("last_cmd_line"):
|
|
|
|
self._process(context.user_data["last_cmd_line"], update)
|
|
|
|
else:
|
|
|
|
update.message.reply_text("No previous command to regenerate!")
|
|
|
|
|
|
|
|
def _process(self, cmd_line: str, update: Update):
|
2021-09-19 11:48:29 +02:00
|
|
|
self.logger.info(
|
|
|
|
f"Generating from {update.effective_user}, with cmd_line: `{cmd_line}`"
|
|
|
|
)
|
2021-09-19 10:31:34 +02:00
|
|
|
directories = [
|
|
|
|
os.path.join(self.base_dir or "./", re.sub(r"[^a-zA-Z0-9]", "", arg))
|
|
|
|
for arg in cmd_line
|
|
|
|
]
|
|
|
|
directories = [dir for dir in directories if os.path.isdir(dir)]
|
|
|
|
if len(directories) == 0 and self.base_dir:
|
|
|
|
directories = [self.base_dir]
|
|
|
|
with NamedTemporaryFile(suffix=".png") as ntf:
|
|
|
|
subprocess.run(
|
|
|
|
[self.kollagen_path, "-r", *directories, "-o", ntf.name],
|
|
|
|
check=True,
|
|
|
|
capture_output=True,
|
|
|
|
)
|
|
|
|
ntf.seek(0)
|
|
|
|
update.message.reply_photo(ntf)
|
|
|
|
|
|
|
|
def tg_list_modes(self, update: Update, context: CallbackContext):
|
|
|
|
modes = subprocess.run(
|
|
|
|
[self.kollagen_path, "-m"], check=True, capture_output=True
|
|
|
|
)
|
|
|
|
update.message.reply_text(f"Available modes: {modes.stdout.decode('utf-8')}")
|
|
|
|
|
|
|
|
def tg_error(self, update: object, context: CallbackContext) -> None:
|
|
|
|
self.logger.error(
|
|
|
|
msg="Exception while handling an update:", exc_info=context.error
|
|
|
|
)
|
|
|
|
|
2021-09-19 11:30:17 +02:00
|
|
|
if isinstance(update, Update):
|
|
|
|
if isinstance(context.error, subprocess.CalledProcessError):
|
2021-09-19 11:48:29 +02:00
|
|
|
update.message.reply_text(
|
|
|
|
f"Something is fucked!\n{context.error.stderr.decode('utf-8')}"
|
|
|
|
)
|
2021-09-19 11:30:17 +02:00
|
|
|
else:
|
|
|
|
update.message.reply_text(f"Something is fucked!\n{context.error}")
|
2021-09-19 10:31:34 +02:00
|
|
|
|
|
|
|
def start_idle(self):
|
|
|
|
self.updater.start_polling()
|
|
|
|
self.updater.idle()
|
|
|
|
|
|
|
|
|
|
|
|
def main() -> None:
|
|
|
|
logging.basicConfig(
|
|
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
|
|
|
level=logging.INFO,
|
|
|
|
)
|
|
|
|
|
|
|
|
tg_token = os.getenv("TG_TOKEN")
|
|
|
|
if not tg_token:
|
|
|
|
logging.error("TG_TOKEN is required.")
|
|
|
|
exit(1)
|
|
|
|
|
|
|
|
if env_kollagen_path := os.getenv("KOLLAGEN_PATH"):
|
|
|
|
if os.path.exists(env_kollagen_path):
|
|
|
|
kollagen_path = env_kollagen_path
|
|
|
|
else:
|
|
|
|
logging.error(f"kollagen not found! {env_kollagen_path} does not exist.")
|
|
|
|
exit(1)
|
|
|
|
else:
|
|
|
|
which = subprocess.run(["which", "kollagen"], capture_output=True)
|
|
|
|
try:
|
|
|
|
which.check_returncode()
|
|
|
|
kollagen_path = which.stdout.decode("utf-8").strip()
|
|
|
|
except subprocess.CalledProcessError:
|
|
|
|
logging.error(
|
|
|
|
"kollagen not found! KOLLAGEN_PATH not specified and `kollagen` isn't in $PATH."
|
|
|
|
)
|
|
|
|
exit(1)
|
|
|
|
|
|
|
|
bot = KollagenBot(tg_token, kollagen_path, os.getenv("BASE_DIR"))
|
|
|
|
bot.start_idle()
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
main()
|