kollagen/tgbot/kollagen-bot/main.py

214 lines
7.0 KiB
Python

from genericpath import isdir
import logging
import os
import subprocess
from typing import List, Optional
import re
from tempfile import NamedTemporaryFile
from telegram import Update
from telegram.ext import (
Updater,
CommandHandler,
DictPersistence,
CallbackContext,
defaults,
)
from telegram.parsemode import ParseMode
from parser import SafeArgumentParser, safe_str
class KollagenBot:
    """Telegram bot that produces collages by invoking the ``kollagen`` CLI.

    User commands (``/generate``, ``/regenerate``, ...) are parsed with an
    argparse-style parser, translated into a ``kollagen`` command line, and
    the resulting PNG is sent back as a photo reply.
    """

    def __init__(
        self, tg_token: str, kollagen_path: str, base_dir: Optional[str]
    ) -> None:
        """Create the command parser and register all Telegram handlers.

        Args:
            tg_token: Bot API token handed to the ``Updater``.
            kollagen_path: Path of the ``kollagen`` executable to run.
            base_dir: Directory that user-supplied directory names are joined
                onto. When falsy, ``"./"`` is used instead.
        """
        self.logger = logging.getLogger("kollagen")
        self.kollagen_path = kollagen_path
        self.base_dir = base_dir

        self._init_parser()

        self.updater = Updater(
            tg_token, persistence=DictPersistence(user_data_json="{}")
        )

        dispatcher = self.updater.dispatcher
        dispatcher.add_handler(CommandHandler("start", self.tg_start))
        dispatcher.add_handler(CommandHandler("help", self.tg_help))
        dispatcher.add_handler(CommandHandler("generate", self.tg_generate))
        dispatcher.add_handler(CommandHandler("g", self.tg_generate))
        dispatcher.add_handler(CommandHandler("regenerate", self.tg_regenerate))
        dispatcher.add_handler(CommandHandler("r", self.tg_regenerate))
        dispatcher.add_error_handler(self.tg_error)

    def _init_parser(self):
        """Build the ``/generate`` argument parser and store it on ``self.parser``."""
        # add_help=False frees up "-h" so it can be used for the output height.
        parser = SafeArgumentParser(prog="/generate", add_help=False)
        parser.add_argument(
            "directories",
            metavar="dir",
            type=safe_str,
            nargs="*",
            default=[self.base_dir] if self.base_dir else [],
            help="Directories to process. By default, the entire directory is processed.",
        )
        parser.add_argument(
            "-m",
            dest="mode",
            metavar="mode",
            choices=self._get_modes(),
            nargs="?",
            # A bare "-m" yields True, which _process treats as "list the modes".
            const=True,
            help="Which collage mode to use. By default, one is chosen at random. When no value is specified, all modes are listed.",
        )
        parser.add_argument(
            "-n",
            dest="num_images",
            metavar="N",
            type=int,
            help="How many images to have in a collage. Random (2<n<5) by default.",
        )
        parser.add_argument(
            "-w",
            dest="width",
            type=int,
            default=640,
            help="Width of resulting output (in px). 640px by default.",
        )
        parser.add_argument(
            "-h",
            dest="height",
            type=int,
            default=640,
            help="Height of resulting output (in px). 640px by default.",
        )
        self.parser = parser

    def _get_modes(self):
        """Return the collage modes reported by ``kollagen -m``.

        Raises:
            subprocess.CalledProcessError: If ``kollagen`` exits non-zero.
        """
        modes = subprocess.run(
            [self.kollagen_path, "-m"], check=True, capture_output=True
        )
        # The output is split on ", " to obtain the individual mode names.
        return modes.stdout.decode("utf-8").strip().split(", ")

    def tg_start(self, update: Update, context: CallbackContext):
        """Handle ``/start``: greet the user and send a collage with default options."""
        update.effective_message.reply_text(
            "Hi! Check out https://gitlab.com/tmladek/kollagen"
        )
        self._process([], update)

    def tg_generate(self, update: Update, context: CallbackContext):
        """Handle ``/generate`` (``/g``): run kollagen with the user's arguments.

        On success the argument list is remembered in ``user_data`` so that
        ``/regenerate`` can repeat it.
        """
        # split() without a separator collapses runs of whitespace, so a
        # double space does not produce an empty "directory" argument.
        cmd_line = update.effective_message.text.split()[1:]
        success = self._process(cmd_line, update)
        if success and context.user_data is not None:
            context.user_data["last_cmd_line"] = cmd_line

    def tg_regenerate(self, update: Update, context: CallbackContext):
        """Handle ``/regenerate`` (``/r``): repeat the last successful ``/generate``."""
        if context.user_data and context.user_data.get("last_cmd_line"):
            self._process(context.user_data["last_cmd_line"], update)
        else:
            update.effective_message.reply_text(
                "No previous command to regenerate!"
            )

    def _build_command(self, args, output_path: str) -> List[str]:
        """Translate parsed bot arguments into the kollagen argv.

        Args:
            args: Namespace produced by ``self.parser.parse_args``; must carry
                ``directories``, ``mode``, ``num_images``, ``width``, ``height``.
            output_path: File that kollagen should write the collage to.

        Returns:
            The full argument vector, every element a ``str`` as required by
            ``subprocess.run``.
        """
        # NOTE(review): when no directory is given the parser default is
        # [base_dir], so base_dir is joined onto itself here. That is harmless
        # for an absolute base_dir but doubles a relative one - confirm intent.
        directories = [
            os.path.join(self.base_dir or "./", directory)
            for directory in args.directories
        ]
        command = [
            self.kollagen_path,
            "-r",
            *directories,
            "-w",
            str(args.width),
            "-h",
            str(args.height),
        ]
        if args.mode:
            command += ["-m", args.mode]
        if args.num_images:
            command += ["-n", str(args.num_images)]
        command += ["-o", output_path]
        return command

    def _process(self, cmd_line: List[str], update: Update) -> bool:
        """Parse ``cmd_line``, run kollagen and reply with the generated image.

        Args:
            cmd_line: Arguments following the bot command, already tokenized.
            update: Incoming update; the reply goes to its effective message.

        Returns:
            ``True`` if a collage was generated and sent, ``False`` if the
            request only listed the available modes.

        Raises:
            subprocess.CalledProcessError: If ``kollagen`` exits non-zero.
        """
        # effective_message also covers edited messages, for which
        # update.message is None.
        message = update.effective_message
        self.logger.info(
            f"Generating from {update.effective_user}, with cmd_line: `{cmd_line}`"
        )
        args = self.parser.parse_args(cmd_line)

        if args.mode is True:
            message.reply_text(
                f"Available modes: {', '.join(self._get_modes())}"
            )
            return False

        with NamedTemporaryFile(suffix=".png") as ntf:
            # Log exactly what is executed; previously the logged command
            # contained -w/-h while the executed one silently dropped them.
            command = self._build_command(args, ntf.name)
            self.logger.debug(f"Running: {command}")
            subprocess.run(command, check=True, capture_output=True)
            ntf.seek(0)
            message.reply_photo(ntf)
        return True

    def tg_help(self, update: Update, context: CallbackContext):
        """Handle ``/help``: reply with the parser's usage text as a code block."""
        update.effective_message.reply_text(
            f"```{self.parser.format_help()}```", parse_mode=ParseMode.MARKDOWN
        )

    def tg_error(self, update: object, context: CallbackContext) -> None:
        """Log any handler exception and report it back to the chat if possible."""
        self.logger.error(
            msg="Exception while handling an update:", exc_info=context.error
        )
        if not isinstance(update, Update):
            return
        message = update.effective_message
        if message is None:
            # Nothing to reply to (the update carries no message).
            return

        error = context.error
        details = error
        if isinstance(error, subprocess.CalledProcessError):
            stderr = error.stderr
            if isinstance(stderr, bytes):
                # Never let undecodable output crash the error handler itself.
                stderr = stderr.decode("utf-8", errors="replace")
            if stderr is not None:
                details = stderr
        message.reply_text(f"Something is fucked!\n{details}")

    def start_idle(self):
        """Start long-polling for updates and block until the process is stopped."""
        self.updater.start_polling()
        self.updater.idle()
def _resolve_kollagen_path() -> Optional[str]:
    """Locate the kollagen executable, logging an error if it cannot be found.

    ``KOLLAGEN_PATH`` takes precedence when set; otherwise ``$PATH`` is
    searched for ``kollagen``.

    Returns:
        The path of the executable, or ``None`` when it could not be located.
    """
    # Function-scope import keeps this block self-contained.
    import shutil

    if env_kollagen_path := os.getenv("KOLLAGEN_PATH"):
        if os.path.exists(env_kollagen_path):
            return env_kollagen_path
        logging.error(f"kollagen not found! {env_kollagen_path} does not exist.")
        return None

    # shutil.which avoids depending on an external `which` binary, whose
    # absence would otherwise surface as an uncaught FileNotFoundError.
    kollagen_path = shutil.which("kollagen")
    if kollagen_path is None:
        logging.error(
            "kollagen not found! KOLLAGEN_PATH not specified and `kollagen` isn't in $PATH."
        )
    return kollagen_path


def main() -> None:
    """Configure logging, read settings from the environment and run the bot.

    Environment variables:
        LOG_LEVEL: Name of a ``logging`` level (default ``info``).
        TG_TOKEN: Telegram bot token (required).
        KOLLAGEN_PATH: Explicit path of the kollagen executable (optional).
        BASE_DIR: Base directory passed to ``KollagenBot`` (optional).

    Raises:
        SystemExit: With status 1 when the token or the executable is missing.
    """
    level_name = os.getenv("LOG_LEVEL", "info").upper()
    level = getattr(logging, level_name, None)
    level_is_valid = isinstance(level, int)
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=level if level_is_valid else logging.INFO,
    )
    if not level_is_valid:
        # A typo in LOG_LEVEL should not prevent the bot from starting.
        logging.warning("Unknown LOG_LEVEL %r, falling back to INFO.", level_name)

    tg_token = os.getenv("TG_TOKEN")
    if not tg_token:
        logging.error("TG_TOKEN is required.")
        # SystemExit is raised directly: the `exit` builtin is injected by the
        # `site` module and is not guaranteed to exist.
        raise SystemExit(1)

    kollagen_path = _resolve_kollagen_path()
    if kollagen_path is None:
        raise SystemExit(1)

    bot = KollagenBot(tg_token, kollagen_path, os.getenv("BASE_DIR"))
    bot.start_idle()
# Run the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()