kollagen/tgbot/kollagen-bot/main.py

271 lines
9.1 KiB
Python
Raw Permalink Normal View History

2021-09-19 10:31:34 +02:00
import logging
import os
import subprocess
2021-09-19 13:19:00 +02:00
from typing import List, Optional
2021-09-19 10:31:34 +02:00
from tempfile import NamedTemporaryFile
2021-09-19 13:19:00 +02:00
from telegram import Update
2021-09-19 10:31:34 +02:00
from telegram.ext import (
Updater,
CommandHandler,
DictPersistence,
CallbackContext,
)
2021-09-19 13:19:00 +02:00
from telegram.parsemode import ParseMode
from parser import SafeArgumentParser, safe_str
2021-09-19 10:31:34 +02:00
class KollagenBot:
def __init__(
self, tg_token: str, kollagen_path: str, base_dir: Optional[str]
) -> None:
self.logger = logging.getLogger("kollagen")
self.kollagen_path = kollagen_path
self.base_dir = os.path.abspath(base_dir) if base_dir else None
2021-09-19 10:31:34 +02:00
2021-09-19 13:19:00 +02:00
self._init_parser()
self.updater = Updater(
tg_token, persistence=DictPersistence(user_data_json="{}")
)
2021-09-19 10:31:34 +02:00
dispatcher = self.updater.dispatcher
2021-09-19 11:48:29 +02:00
dispatcher.add_handler(CommandHandler("start", self.tg_start))
2021-09-19 13:19:00 +02:00
dispatcher.add_handler(CommandHandler("help", self.tg_help))
2021-09-19 10:31:34 +02:00
dispatcher.add_handler(CommandHandler("generate", self.tg_generate))
dispatcher.add_handler(CommandHandler("g", self.tg_generate))
dispatcher.add_handler(CommandHandler("regenerate", self.tg_regenerate))
dispatcher.add_handler(CommandHandler("r", self.tg_regenerate))
dispatcher.add_error_handler(self.tg_error)
2021-09-19 13:19:00 +02:00
def _init_parser(self):
parser = SafeArgumentParser(prog="/generate", add_help=False)
parser.add_argument(
"directories",
metavar="path",
2021-09-19 13:19:00 +02:00
type=safe_str,
nargs="*",
default=[self.base_dir] if self.base_dir else [],
help="Directories or files to process. By default, the entire base directory is processed.",
2021-09-19 13:19:00 +02:00
)
parser.add_argument(
"-m",
dest="mode",
metavar="mode",
2021-09-20 23:05:24 +02:00
type=safe_str,
2021-09-19 13:19:00 +02:00
nargs="?",
const=True,
help=f"Collage modes to use. By default, one is chosen at random. Multiple modes can be specified, separated by commas. When no value is specified, all modes are listed.",
2021-09-19 13:19:00 +02:00
)
parser.add_argument(
"-n",
dest="num_images",
metavar="N",
type=int,
help=f"How many images to use in a single collage. Random (or collage-dependant) by default.",
2021-09-19 13:19:00 +02:00
)
parser.add_argument(
"-w",
dest="width",
type=int,
default=640,
help=f"Width of resulting output (in px). 640px by default.",
)
parser.add_argument(
"-h",
dest="height",
type=int,
default=640,
help=f"Height of resulting output (in px). 640px by default.",
)
2021-09-20 23:05:24 +02:00
parser.add_argument(
"--rm",
dest="recursive_modes",
type=safe_str,
help=f"Collage modes (comma-separated) to use in a recursive collage. All by default.",
2021-09-20 23:05:24 +02:00
)
parser.add_argument(
"--rl",
dest="recursive_level",
type=int,
2021-09-20 23:06:11 +02:00
default=2,
help=f"Level/depth of recursive collage. 2 by default.",
2021-09-20 23:05:24 +02:00
)
parser.add_argument(
"--rr",
dest="recursive_repeat",
action="store_true",
help=f"Allow repeating images in (different levels of) recursive collages. False by default.",
2021-09-20 23:05:24 +02:00
)
2021-09-19 13:19:00 +02:00
self.parser = parser
def _get_modes(self):
modes = subprocess.run(
[self.kollagen_path, "-m"], check=True, capture_output=True
)
return modes.stdout.decode("utf-8").strip().split(", ")
2021-09-19 11:48:29 +02:00
def tg_start(self, update: Update, context: CallbackContext):
update.message.reply_text("Hi! I make random collages. Check out https://gitlab.com/tmladek/kollagen and /help. Here's one to get you started:")
2021-09-19 13:19:00 +02:00
self._process([], update)
2021-09-19 11:48:29 +02:00
2021-09-19 10:31:34 +02:00
def tg_generate(self, update: Update, context: CallbackContext):
cmd_line = update.message.text.split(" ")[1:]
2021-09-19 13:19:00 +02:00
success = self._process(cmd_line, update)
if success and context.user_data is not None:
2021-09-19 10:31:34 +02:00
context.user_data["last_cmd_line"] = cmd_line
def tg_regenerate(self, update: Update, context: CallbackContext):
if context.user_data and context.user_data.get("last_cmd_line"):
self._process(context.user_data["last_cmd_line"], update)
else:
update.message.reply_text("No previous command to regenerate!")
2021-09-19 13:19:00 +02:00
def _process(self, cmd_line: List[str], update: Update):
2021-09-19 11:48:29 +02:00
self.logger.info(
f"Generating from {update.effective_user}, with cmd_line: `{cmd_line}`"
)
2021-09-19 13:19:00 +02:00
args = self.parser.parse_args(cmd_line)
if args.mode is True:
update.message.reply_text(
f"Available modes: {', '.join(self._get_modes())}"
)
return False
2021-09-19 13:47:52 +02:00
directories = []
for dir in args.directories:
for possible in [
os.path.join(self.base_dir or "./", dir),
os.path.join(self.base_dir or "./", dir.upper()),
]:
if os.path.exists(possible):
directories.append(possible)
break
else:
raise FileNotFoundError(f'"{dir}" does not exist.')
2021-09-19 13:41:38 +02:00
2021-09-19 13:19:00 +02:00
mode = ["-m", args.mode] if args.mode else []
2021-09-19 13:36:31 +02:00
num_images = ["-n", str(args.num_images)] if args.num_images else []
recursive_level = (
["--rl", str(args.recursive_level)] if args.recursive_level else []
)
2021-09-20 23:05:24 +02:00
recursive_repeat = ["--rr"] if args.recursive_repeat else []
recursive_modes = (
["--rm", str(args.recursive_modes)] if args.recursive_modes else []
)
2021-09-19 13:19:00 +02:00
2021-09-19 10:31:34 +02:00
with NamedTemporaryFile(suffix=".png") as ntf:
2021-09-19 13:32:50 +02:00
shell_cmd_line = [
self.kollagen_path,
2021-09-19 13:47:52 +02:00
*directories,
2021-09-19 13:32:50 +02:00
"-w",
2021-09-19 13:34:29 +02:00
str(args.width),
2021-09-19 13:32:50 +02:00
"-h",
2021-09-19 13:34:29 +02:00
str(args.height),
2021-09-19 13:32:50 +02:00
*mode,
*num_images,
2021-09-20 23:05:24 +02:00
*recursive_level,
*recursive_repeat,
*recursive_modes,
2021-09-19 13:32:50 +02:00
"-o",
ntf.name,
]
2021-09-19 13:32:50 +02:00
self.logger.debug(f"Running: " + str(shell_cmd_line))
result = subprocess.run(
2021-09-19 13:32:50 +02:00
shell_cmd_line,
2021-09-19 10:31:34 +02:00
check=True,
capture_output=True,
timeout=60,
env={
'NO_COLOR': "1"
}
2021-09-19 10:31:34 +02:00
)
ntf.seek(0)
used_line = next(
(
line
for line in result.stdout.decode("utf-8").splitlines()
if line.startswith("Used: ")
),
"",
).replace(f"{self.base_dir}/" if self.base_dir else "", "")
caption = ""
caption += (
f"`{' '.join(['/generate', *cmd_line])}`\n" if len(cmd_line) else ""
)
caption += used_line.replace("_", "\\_")
caption = caption[:200]
update.message.reply_photo(
ntf,
caption=caption,
parse_mode=ParseMode.MARKDOWN,
)
2021-09-19 13:19:00 +02:00
return True
2021-09-19 10:31:34 +02:00
2021-09-19 13:19:00 +02:00
def tg_help(self, update: Update, context: CallbackContext):
update.message.reply_text(
f"```{self.parser.format_help()}```", parse_mode=ParseMode.MARKDOWN
2021-09-19 10:31:34 +02:00
)
def tg_error(self, update: object, context: CallbackContext) -> None:
self.logger.error(
msg="Exception while handling an update:", exc_info=context.error
)
2021-09-19 11:30:17 +02:00
if isinstance(update, Update):
if isinstance(context.error, subprocess.CalledProcessError):
error_display = context.error.stderr.decode('utf-8')
2021-09-19 11:30:17 +02:00
else:
error_display = str(context.error)
error_display = error_display[:2500]
update.message.reply_text(
f"Something is fucked!\n{error_display}"
)
2021-09-19 10:31:34 +02:00
def start_idle(self):
self.updater.start_polling()
self.updater.idle()
def main() -> None:
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
2021-09-19 13:19:00 +02:00
level=getattr(logging, os.getenv("LOG_LEVEL", "info").upper()),
2021-09-19 10:31:34 +02:00
)
tg_token = os.getenv("TG_TOKEN")
if not tg_token:
logging.error("TG_TOKEN is required.")
exit(1)
if env_kollagen_path := os.getenv("KOLLAGEN_PATH"):
if os.path.exists(env_kollagen_path):
kollagen_path = env_kollagen_path
else:
logging.error(f"kollagen not found! {env_kollagen_path} does not exist.")
exit(1)
else:
which = subprocess.run(["which", "kollagen"], capture_output=True)
try:
which.check_returncode()
kollagen_path = which.stdout.decode("utf-8").strip()
except subprocess.CalledProcessError:
logging.error(
"kollagen not found! KOLLAGEN_PATH not specified and `kollagen` isn't in $PATH."
)
exit(1)
bot = KollagenBot(tg_token, kollagen_path, os.getenv("BASE_DIR"))
bot.start_idle()
if __name__ == "__main__":
main()