2021-09-19 10:31:34 +02:00
import logging
import os
import subprocess
2021-09-19 13:19:00 +02:00
from typing import List , Optional
2021-09-19 10:31:34 +02:00
from tempfile import NamedTemporaryFile
2021-09-19 13:19:00 +02:00
from telegram import Update
2021-09-19 10:31:34 +02:00
from telegram . ext import (
Updater ,
CommandHandler ,
DictPersistence ,
CallbackContext ,
)
2021-09-19 13:19:00 +02:00
from telegram . parsemode import ParseMode
from parser import SafeArgumentParser , safe_str
2021-09-19 10:31:34 +02:00
class KollagenBot :
def __init__ (
self , tg_token : str , kollagen_path : str , base_dir : Optional [ str ]
) - > None :
self . logger = logging . getLogger ( " kollagen " )
self . kollagen_path = kollagen_path
2021-09-22 23:55:48 +02:00
self . base_dir = os . path . abspath ( base_dir ) if base_dir else None
2021-09-19 10:31:34 +02:00
2021-09-19 13:19:00 +02:00
self . _init_parser ( )
self . updater = Updater (
tg_token , persistence = DictPersistence ( user_data_json = " {} " )
)
2021-09-19 10:31:34 +02:00
dispatcher = self . updater . dispatcher
2021-09-19 11:48:29 +02:00
dispatcher . add_handler ( CommandHandler ( " start " , self . tg_start ) )
2021-09-19 13:19:00 +02:00
dispatcher . add_handler ( CommandHandler ( " help " , self . tg_help ) )
2021-09-19 10:31:34 +02:00
dispatcher . add_handler ( CommandHandler ( " generate " , self . tg_generate ) )
dispatcher . add_handler ( CommandHandler ( " g " , self . tg_generate ) )
dispatcher . add_handler ( CommandHandler ( " regenerate " , self . tg_regenerate ) )
dispatcher . add_handler ( CommandHandler ( " r " , self . tg_regenerate ) )
dispatcher . add_error_handler ( self . tg_error )
2021-09-19 13:19:00 +02:00
def _init_parser ( self ) :
parser = SafeArgumentParser ( prog = " /generate " , add_help = False )
parser . add_argument (
" directories " ,
2021-09-22 23:56:42 +02:00
metavar = " path " ,
2021-09-19 13:19:00 +02:00
type = safe_str ,
nargs = " * " ,
default = [ self . base_dir ] if self . base_dir else [ ] ,
2021-09-22 23:56:42 +02:00
help = " Directories or files to process. By default, the entire base directory is processed. " ,
2021-09-19 13:19:00 +02:00
)
parser . add_argument (
" -m " ,
dest = " mode " ,
metavar = " mode " ,
2021-09-20 23:05:24 +02:00
type = safe_str ,
2021-09-19 13:19:00 +02:00
nargs = " ? " ,
const = True ,
2021-09-22 23:56:42 +02:00
help = f " Collage modes to use. By default, one is chosen at random. Multiple modes can be specified, separated by commas. When no value is specified, all modes are listed. " ,
2021-09-19 13:19:00 +02:00
)
parser . add_argument (
" -n " ,
dest = " num_images " ,
metavar = " N " ,
type = int ,
2021-09-22 23:56:42 +02:00
help = f " How many images to use in a single collage. Random (or collage-dependant) by default. " ,
2021-09-19 13:19:00 +02:00
)
parser . add_argument (
" -w " ,
dest = " width " ,
type = int ,
default = 640 ,
help = f " Width of resulting output (in px). 640px by default. " ,
)
parser . add_argument (
" -h " ,
dest = " height " ,
type = int ,
default = 640 ,
help = f " Height of resulting output (in px). 640px by default. " ,
)
2021-09-20 23:05:24 +02:00
parser . add_argument (
" --rm " ,
dest = " recursive_modes " ,
type = safe_str ,
2021-09-22 23:56:42 +02:00
help = f " Collage modes (comma-separated) to use in a recursive collage. All by default. " ,
2021-09-20 23:05:24 +02:00
)
parser . add_argument (
" --rl " ,
dest = " recursive_level " ,
type = int ,
2021-09-20 23:06:11 +02:00
default = 2 ,
help = f " Level/depth of recursive collage. 2 by default. " ,
2021-09-20 23:05:24 +02:00
)
parser . add_argument (
" --rr " ,
dest = " recursive_repeat " ,
2021-09-22 23:56:42 +02:00
action = " store_true " ,
help = f " Allow repeating images in (different levels of) recursive collages. False by default. " ,
2021-09-20 23:05:24 +02:00
)
2021-09-19 13:19:00 +02:00
self . parser = parser
def _get_modes ( self ) :
modes = subprocess . run (
[ self . kollagen_path , " -m " ] , check = True , capture_output = True
)
return modes . stdout . decode ( " utf-8 " ) . strip ( ) . split ( " , " )
2021-09-19 11:48:29 +02:00
def tg_start ( self , update : Update , context : CallbackContext ) :
2021-09-22 23:56:42 +02:00
update . message . reply_text ( " Hi! I make random collages. Check out https://gitlab.com/tmladek/kollagen and /help. Here ' s one to get you started: " )
2021-09-19 13:19:00 +02:00
self . _process ( [ ] , update )
2021-09-19 11:48:29 +02:00
2021-09-19 10:31:34 +02:00
def tg_generate ( self , update : Update , context : CallbackContext ) :
cmd_line = update . message . text . split ( " " ) [ 1 : ]
2021-09-19 13:19:00 +02:00
success = self . _process ( cmd_line , update )
if success and context . user_data is not None :
2021-09-19 10:31:34 +02:00
context . user_data [ " last_cmd_line " ] = cmd_line
def tg_regenerate ( self , update : Update , context : CallbackContext ) :
if context . user_data and context . user_data . get ( " last_cmd_line " ) :
self . _process ( context . user_data [ " last_cmd_line " ] , update )
else :
update . message . reply_text ( " No previous command to regenerate! " )
2021-09-19 13:19:00 +02:00
def _process ( self , cmd_line : List [ str ] , update : Update ) :
2021-09-19 11:48:29 +02:00
self . logger . info (
f " Generating from { update . effective_user } , with cmd_line: ` { cmd_line } ` "
)
2021-09-19 13:19:00 +02:00
args = self . parser . parse_args ( cmd_line )
if args . mode is True :
update . message . reply_text (
f " Available modes: { ' , ' . join ( self . _get_modes ( ) ) } "
)
return False
2021-09-19 13:47:52 +02:00
directories = [ ]
for dir in args . directories :
for possible in [
os . path . join ( self . base_dir or " ./ " , dir ) ,
os . path . join ( self . base_dir or " ./ " , dir . upper ( ) ) ,
] :
if os . path . exists ( possible ) :
directories . append ( possible )
break
else :
raise FileNotFoundError ( f ' " { dir } " does not exist. ' )
2021-09-19 13:41:38 +02:00
2021-09-19 13:19:00 +02:00
mode = [ " -m " , args . mode ] if args . mode else [ ]
2021-09-19 13:36:31 +02:00
num_images = [ " -n " , str ( args . num_images ) ] if args . num_images else [ ]
2021-09-22 23:56:42 +02:00
recursive_level = (
[ " --rl " , str ( args . recursive_level ) ] if args . recursive_level else [ ]
)
2021-09-20 23:05:24 +02:00
recursive_repeat = [ " --rr " ] if args . recursive_repeat else [ ]
2021-09-22 23:56:42 +02:00
recursive_modes = (
[ " --rm " , str ( args . recursive_modes ) ] if args . recursive_modes else [ ]
)
2021-09-19 13:19:00 +02:00
2021-09-19 10:31:34 +02:00
with NamedTemporaryFile ( suffix = " .png " ) as ntf :
2021-09-19 13:32:50 +02:00
shell_cmd_line = [
self . kollagen_path ,
2021-09-19 13:47:52 +02:00
* directories ,
2021-09-19 13:32:50 +02:00
" -w " ,
2021-09-19 13:34:29 +02:00
str ( args . width ) ,
2021-09-19 13:32:50 +02:00
" -h " ,
2021-09-19 13:34:29 +02:00
str ( args . height ) ,
2021-09-19 13:32:50 +02:00
* mode ,
* num_images ,
2021-09-20 23:05:24 +02:00
* recursive_level ,
* recursive_repeat ,
* recursive_modes ,
2021-09-19 13:32:50 +02:00
" -o " ,
ntf . name ,
]
2021-09-22 23:55:48 +02:00
2021-09-19 13:32:50 +02:00
self . logger . debug ( f " Running: " + str ( shell_cmd_line ) )
2021-09-22 23:55:48 +02:00
result = subprocess . run (
2021-09-19 13:32:50 +02:00
shell_cmd_line ,
2021-09-19 10:31:34 +02:00
check = True ,
capture_output = True ,
2021-09-22 23:57:06 +02:00
timeout = 60 ,
2021-09-22 23:57:42 +02:00
env = {
' NO_COLOR ' : " 1 "
}
2021-09-19 10:31:34 +02:00
)
ntf . seek ( 0 )
2021-09-22 23:55:48 +02:00
used_line = next (
(
line
for line in result . stdout . decode ( " utf-8 " ) . splitlines ( )
if line . startswith ( " Used: " )
) ,
" " ,
) . replace ( f " { self . base_dir } / " if self . base_dir else " " , " " )
caption = " "
caption + = (
f " ` { ' ' . join ( [ ' /generate ' , * cmd_line ] ) } ` \n " if len ( cmd_line ) else " "
)
caption + = used_line . replace ( " _ " , " \\ _ " )
caption = caption [ : 200 ]
update . message . reply_photo (
ntf ,
caption = caption ,
parse_mode = ParseMode . MARKDOWN ,
)
2021-09-19 13:19:00 +02:00
return True
2021-09-19 10:31:34 +02:00
2021-09-19 13:19:00 +02:00
def tg_help ( self , update : Update , context : CallbackContext ) :
update . message . reply_text (
f " ``` { self . parser . format_help ( ) } ``` " , parse_mode = ParseMode . MARKDOWN
2021-09-19 10:31:34 +02:00
)
def tg_error ( self , update : object , context : CallbackContext ) - > None :
self . logger . error (
msg = " Exception while handling an update: " , exc_info = context . error
)
2021-09-19 11:30:17 +02:00
if isinstance ( update , Update ) :
if isinstance ( context . error , subprocess . CalledProcessError ) :
2021-09-22 23:57:42 +02:00
error_display = context . error . stderr . decode ( ' utf-8 ' )
2021-09-19 11:30:17 +02:00
else :
2021-09-22 23:57:42 +02:00
error_display = str ( context . error )
error_display = error_display [ : 2500 ]
update . message . reply_text (
f " Something is fucked! \n { error_display } "
)
2021-09-19 10:31:34 +02:00
def start_idle ( self ) :
self . updater . start_polling ( )
self . updater . idle ( )
def main ( ) - > None :
logging . basicConfig (
format = " %(asctime)s - %(name)s - %(levelname)s - %(message)s " ,
2021-09-19 13:19:00 +02:00
level = getattr ( logging , os . getenv ( " LOG_LEVEL " , " info " ) . upper ( ) ) ,
2021-09-19 10:31:34 +02:00
)
tg_token = os . getenv ( " TG_TOKEN " )
if not tg_token :
logging . error ( " TG_TOKEN is required. " )
exit ( 1 )
if env_kollagen_path := os . getenv ( " KOLLAGEN_PATH " ) :
if os . path . exists ( env_kollagen_path ) :
kollagen_path = env_kollagen_path
else :
logging . error ( f " kollagen not found! { env_kollagen_path } does not exist. " )
exit ( 1 )
else :
which = subprocess . run ( [ " which " , " kollagen " ] , capture_output = True )
try :
which . check_returncode ( )
kollagen_path = which . stdout . decode ( " utf-8 " ) . strip ( )
except subprocess . CalledProcessError :
logging . error (
" kollagen not found! KOLLAGEN_PATH not specified and `kollagen` isn ' t in $PATH. "
)
exit ( 1 )
bot = KollagenBot ( tg_token , kollagen_path , os . getenv ( " BASE_DIR " ) )
bot . start_idle ( )
if __name__ == " __main__ " :
main ( )