#!/usr/bin/env python3
"""delojza: a Telegram bot that files links and media into hashtag-named directories.

Messages carrying hashtags are downloaded (through youtube-dl when a dedicated
extractor exists, otherwise as a plain HTTP download) into a directory tree
derived from the hashtags.  Downloaded MP3s are auto-tagged, and files can
optionally be posted to a tumblr blog.
"""

import errno
import logging
import os
import pprint
import re
import shutil
import sqlite3
import subprocess
import sys
import tempfile
import unicodedata
from configparser import ConfigParser, NoOptionError, NoSectionError
from datetime import datetime, timedelta
from glob import glob
from operator import itemgetter
from random import random
from sqlite3.dbapi2 import Connection
from time import sleep
from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Union, cast

import acoustid
import filetype
import markovify
import mutagen.id3
import pytumblr
import requests
import telegram
import youtube_dl
from markovify.text import Text
from mutagen import File, FileType
from mutagen.easyid3 import EasyID3
from telegram.ext import CommandHandler, MessageHandler, Updater
from youtube_dl import DownloadError
from youtube_dl.version import __version__ as YTDL_VERSION

from markov import MarkovBlabberer
from util import datestr, mkdir_p

if TYPE_CHECKING:
    from _typeshed import StrPath
else:
    # `_typeshed` only exists for type checkers; importing it at runtime
    # fails, so provide an equivalent alias for the annotations below.
    StrPath = Union[str, "os.PathLike[str]"]


class DelojzaDB:
    """Access to the SQLite database of protected tags and chats.

    The queries expect a ``tags(id, tag, protected)`` table and a
    ``chats(id, protected)`` table; this class does not create them.
    """

    def __init__(self, db_path):
        self.db_path = db_path
        # Opened lazily by initialize().
        self.db: Optional[Connection] = None

    def initialize(self):
        """Open the connection on first call; later calls are no-ops."""
        if self.db is None:
            self.db = sqlite3.connect(self.db_path)

    def _connection(self) -> Connection:
        """Return the open connection or raise RuntimeError if there is none."""
        if self.db is None:
            raise RuntimeError("Database not initialized!")
        return self.db

    def get_protected_tags(self):
        """Return the names of all tags flagged as protected."""
        results = self._connection().execute(
            "SELECT tag FROM tags WHERE protected == 1"
        )
        return [res[0] for res in results.fetchall()]

    def get_protected_chats(self):
        """Return the ids of all chats flagged as protected."""
        results = self._connection().execute(
            "SELECT id FROM chats WHERE protected == 1"
        )
        return [res[0] for res in results.fetchall()]

    def get_chat(self, id):
        """Return the ``(id, protected)`` row for a chat, or None if unknown."""
        return (
            self._connection()
            .execute("SELECT id, protected FROM chats WHERE id == ?", (id,))
            .fetchone()
        )

    def set_chat_protected(self, id: str, protected: bool):
        """Insert or update the protected flag of a chat and commit."""
        db = self._connection()
        if self.get_chat(id):
            db.execute("UPDATE chats SET protected = ? WHERE id = ?", (protected, id))
        else:
            db.execute(
                "INSERT INTO chats (id, protected) VALUES (?, ?)", (id, protected)
            )
        db.commit()

    def get_tag(self, tag: str):
        """Return the ``(id, tag, protected)`` row for a tag, or None if unknown."""
        return (
            self._connection()
            .execute("SELECT id, tag, protected FROM tags WHERE tag == ?", (tag,))
            .fetchone()
        )

    def set_tag_protected(self, tag: str, protected: bool):
        """Insert or update the protected flag of a tag and commit."""
        db = self._connection()
        if self.get_tag(tag):
            db.execute("UPDATE tags SET protected = ? WHERE tag = ?", (protected, tag))
        else:
            db.execute(
                "INSERT INTO tags (tag, protected) VALUES (?, ?)", (tag, protected)
            )
        db.commit()


class DelojzaBot:
    """The Telegram bot: wires up command handlers and performs the downloads."""

    def __init__(
        self,
        tg_api_key: str,
        out_dir: StrPath,
        redirects: Optional[List[Tuple[str, str]]] = None,
        tmp_dir: Optional[StrPath] = None,
        db_path: Optional[StrPath] = None,
        protected_password: Optional[str] = None,
        acoustid_key: Optional[str] = None,
        tumblr_name: Optional[str] = None,
        tumblr_keys: Optional[Tuple[str, str, str, str]] = None,
        markov: Optional[MarkovBlabberer] = None,
    ):
        """Set up logging, the database handle, directories and Telegram handlers.

        :param tg_api_key: Telegram bot API token.
        :param out_dir: root directory downloads are sorted into.
        :param redirects: ``(hashtag, directory)`` pairs; a message whose first
            hashtag matches is stored under that directory instead of out_dir.
        :param tmp_dir: scratch directory for youtube-dl; defaults to the
            system temporary directory.
        :param db_path: SQLite database path; defaults to ``delojza.db`` next
            to this script.
        :param protected_password: password for the ``/protect chat`` command.
        :param acoustid_key: AcoustID API key; fingerprint lookup is skipped
            when unset.
        :param tumblr_name: tumblr blog name; tumblr posting needs both this
            and tumblr_keys.
        :param tumblr_keys: the four tumblr credentials passed on to pytumblr.
        :param markov: optional sentence generator used for chatty replies.
        """
        script_dir = os.path.dirname(os.path.realpath(__file__))
        self._setup_logging(script_dir)

        self.db = DelojzaDB(db_path or os.path.join(script_dir, "delojza.db"))

        self.out_dir = os.path.abspath(out_dir)
        self.out_dir = self.out_dir[:-1] if self.out_dir[-1] == "/" else self.out_dir
        self.logger.debug(f"OUT_DIR: {out_dir}")
        self.tmp_dir = tmp_dir if tmp_dir else tempfile.gettempdir()
        # Log the directory actually in use rather than the (possibly None) argument.
        self.logger.debug(f"TMP_DIR: {self.tmp_dir}")
        self.markov = markov

        self.redirects = {}
        if redirects is not None:
            for hashtag, directory in redirects:
                hashtag = hashtag.upper()
                directory = str(directory)
                # endswith() rather than [-1] so an empty value cannot raise.
                directory = directory[:-1] if directory.endswith("/") else directory
                mkdir_p(directory)
                self.redirects[hashtag] = directory
                self.logger.debug(f"Will redirect hashtag {hashtag} to {directory}")

        self.updater = Updater(tg_api_key)
        dp = self.updater.dispatcher

        dp.add_handler(CommandHandler("start", self.tg_start))
        dp.add_error_handler(self.tg_error)
        dp.add_handler(CommandHandler("stats", self.tg_stats))
        dp.add_handler(CommandHandler("orphans", self.tg_orphan))
        dp.add_handler(CommandHandler("orphans_full", self.tg_orphan_full))
        dp.add_handler(CommandHandler("retag", self.tg_retag))
        dp.add_handler(CommandHandler("delete", self.tg_delete))
        dp.add_handler(CommandHandler("protect", self.tg_protect))
        dp.add_handler(CommandHandler("version", self.tg_version))
        dp.add_handler(CommandHandler("queue", self.tg_queue))
        dp.add_handler(MessageHandler(None, self.tg_handle))

        self.acoustid_key = acoustid_key

        if tumblr_name and tumblr_keys:
            self.tumblr_name = tumblr_name
            self.tumblr_client = pytumblr.TumblrRestClient(*tumblr_keys)
        else:
            self.tumblr_client = None

        self.protected_password = protected_password

        # Per-chat state: the last download (for /retag and /delete) and the
        # last hashtag-only message (applied to a follow-up message).
        self.last_downloaded = {}
        self.last_hashtags = {}

    def _setup_logging(self, log_path: StrPath):
        """Log INFO and above to the console and DEBUG and above to delojza.log."""
        self.logger = logging.getLogger("delojza")
        self.logger.setLevel(logging.DEBUG)

        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)

        dfh = logging.FileHandler(os.path.join(log_path, "delojza.log"))
        dfh.setLevel(logging.DEBUG)

        formatter = logging.Formatter(
            "%(asctime)s - %(name)s [%(levelname)s] %(message)s"
        )
        ch.setFormatter(formatter)
        dfh.setFormatter(formatter)

        self.logger.addHandler(ch)
        self.logger.addHandler(dfh)

    def _log_msg(self, update):
        """Write the sender, chat id and text of an incoming message to the debug log."""
        from_user = update.message.from_user
        # Either name part may be None, so never concatenate them blindly.
        sender = from_user.username or (
            (from_user.first_name or "") + (from_user.last_name or "")
        )
        self.logger.debug(
            f"Received from {sender}"
            f" ({update.message.chat.id}): " + (update.message.text or "")
        )

    def _maybe_blabber(self, fallback: str, suffix: str = "") -> str:
        """Return a markov sentence plus suffix now and then, otherwise fallback.

        The fallback is also used when no generator is configured or when it
        fails to produce a sentence.
        """
        if self.markov and random() > 0.7:
            sentence = self.markov.make_sentence()
            if sentence:
                return sentence + suffix
        return fallback

    def _reject_unprotected(self, update) -> bool:
        """Reply with a brush-off and return True unless the chat is protected."""
        self.db.initialize()
        if update.message.chat.id in self.db.get_protected_chats():
            return False
        update.message.reply_text(self._maybe_blabber("nope.", suffix="!"))
        return True

    def _tag_dir_path(self, tag: str) -> str:
        """Return the directory backing a tag, honouring configured redirects."""
        return self.redirects.get(tag) or os.path.join(self.out_dir, tag)

    @staticmethod
    def ytdl_can(url: str):
        """Return True if youtube-dl has a dedicated (non-generic) extractor for url.

        URLs containing ``/channel/`` are always rejected.
        """
        if "/channel/" in url:
            return False
        # A site counts only if it has a dedicated extractor.
        return any(
            ie.suitable(url) and ie.IE_NAME != "generic"
            for ie in youtube_dl.extractor.gen_extractors()
        )

    # https://github.com/django/django/blob/master/django/utils/text.py#L393
    @staticmethod
    def sanitize(text: Optional[str]):
        """Reduce text to ASCII and replace characters unsafe in filenames with ``_``.

        Returns an empty string for None.
        """
        if text is None:
            return ""
        text = (
            unicodedata.normalize("NFKD", text)
            .encode("ascii", "ignore")
            .decode("ascii")
        )
        return re.sub(r"[^\w.()\[\]{}#-]", "_", text)

    @staticmethod
    def _get_tags(filepath: StrPath):
        """Return ``(artist, title)`` read from a file's ID3 tags.

        Either element is None when the tag is absent; both are None when the
        file has no ID3 header at all.
        """
        try:
            audio = EasyID3(filepath)
        except mutagen.id3.ID3NoHeaderError:
            return None, None

        def first_value(key):
            # Looking up a missing frame raises KeyError instead of returning
            # an empty list, so it has to be caught explicitly.
            try:
                values = audio[key]
            except KeyError:
                return None
            return values[0] if values else None

        return first_value("artist"), first_value("title")

    @staticmethod
    def _tag_file(filepath: StrPath, artist: Optional[str], title: str):
        """Write title (and artist, if given) into the file's ID3 tags.

        An ID3 header is created when missing; files mutagen cannot open are
        silently left untouched.
        """
        try:
            id3 = mutagen.id3.ID3(filepath)
        except mutagen.id3.ID3NoHeaderError:
            mutafile = cast(Optional[FileType], File(filepath))
            if not mutafile:
                return
            mutafile.add_tags()
            mutafile.save()
            id3 = mutagen.id3.ID3(filepath)
        id3.add(mutagen.id3.TIT2(encoding=3, text=title))
        if artist:
            id3.add(mutagen.id3.TOPE(encoding=3, text=artist))
            id3.add(mutagen.id3.TPE1(encoding=3, text=artist))
        id3.save()

    def _autotag_file(self, filepath, message, info=None):
        """Guess artist and title for a file, tag it and report the result in chat.

        Sources, in order: an AcoustID match, metadata supplied in info (which
        overrides AcoustID matches scoring below 0.8), an "artist - title"
        split of the title, the full title, and the uploader for SoundCloud.

        :param filepath: audio file to tag.
        :param message: Telegram message to reply to.
        :param info: optional youtube-dl info dict for the file.
        """
        if info is None:
            info = {}

        title = None
        artist = None
        source = None

        best_acoustid_score = 0
        if self.acoustid_key:
            try:
                self.logger.debug("Requesting AcoustID for {}".format(filepath))
                results = sorted(
                    acoustid.match(self.acoustid_key, filepath),
                    key=itemgetter(0),
                    reverse=True,
                )
                if len(results) > 0:
                    score, rid, aid_title, aid_artist = results[0]
                    if score > 0.4:
                        title = aid_title
                        artist = re.sub(r" *; +", " & ", aid_artist or "")
                        best_acoustid_score = score
                        source = "AcoustID ({}%)".format(round(score * 100))
            except acoustid.NoBackendError:
                self.logger.warning("chromaprint library/tool not found")
            except acoustid.FingerprintGenerationError:
                self.logger.warning("fingerprint could not be calculated")
            except acoustid.WebServiceError as exc:
                self.logger.warning(
                    "web service request failed: {}".format(exc.message)
                )

        if best_acoustid_score < 0.8:
            if "track" in info:
                title = info["track"]
            if "artist" in info:
                artist = info["artist"]

            if "track" in info or "artist" in info:
                source = "supplied metadata"

        if title is None and artist is None and "-" in info.get("title", ""):
            # Split on the first dash only so the rest of the title survives.
            artist, title = info["title"].split("-", 1)
            source = "fallback (artist - title)"

        if title is None and "title" in info:
            title = info["title"]
            source = "full title fallback"

        if "soundcloud" in info.get("extractor", "") and artist is None:
            artist = info.get("uploader")
            source = 'soundcloud "fallback"'

        artist = artist.strip() if artist else None
        title = title.strip() if title else None

        if title is None:
            message.reply_text("Tried tagging, found nothing :(")
            return

        message.reply_text(
            'Tagging as "{}" by "{}"\nvia {}'.format(title, artist, source)
        )
        self.logger.info(
            "Tagging {} w/ {} - {} [{}]...".format(filepath, title, artist, source)
        )
        self._tag_file(filepath, artist, title)

    @staticmethod
    def _get_percent_filled(directory: str):
        """Return the first percentage found in the output of ``df <directory>``.

        :raises RuntimeError: if the output contains no percentage.
        """
        output = subprocess.check_output(["df", directory])
        percents_re = re.search(r"[0-9]+%", output.decode("utf-8"))
        if not percents_re:
            raise RuntimeError
        return int(percents_re.group(0)[:-1])

    # noinspection PyUnusedLocal
    def download_ytdl(
        self,
        urls: List[str],
        out_path: StrPath,
        date: datetime,
        message: telegram.Message,
        audio: bool = False,
        filetitle: Optional[str] = None,
    ):
        """Download urls with youtube-dl into tmp_dir and move the results to out_path.

        With audio=True the audio track is extracted to MP3, and every
        resulting MP3 is auto-tagged.  Downloads failing with a 403 are retried
        up to four times; any other failure is re-raised.

        :return: list of final file paths.
        """
        ytdl_opts = {
            "noplaylist": True,
            "restrictfilenames": True,
            "outtmpl": os.path.join(
                self.tmp_dir, "{}__%(title)s__%(id)s.%(ext)s".format(datestr(date))
            ),
        }
        if audio:
            ytdl_opts["format"] = "bestaudio/best"
            ytdl_opts["postprocessors"] = [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "256",
                }
            ]
            ytdl_opts["postprocessor_args"] = ["-ar", "44100"]

        filenames = []
        with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
            attempts = 0
            while True:
                try:
                    ytdl.download(urls)
                    break
                except DownloadError as exc:
                    attempts += 1
                    if "403" in str(exc) and attempts < 5:
                        self.logger.warning("Received a 403!")
                        sleep(1.357)
                        if self.markov:
                            # Call the generator; replying with the bound
                            # method itself would send garbage.
                            sentence = self.markov.make_sentence()
                            if sentence:
                                message.reply_text(sentence)
                    else:
                        raise

            for info in [ytdl.extract_info(url, download=False) for url in urls]:
                filename = cast(str, ytdl.prepare_filename(info))
                # Post-processing may have changed the extension, so pick up
                # every file sharing the expected stem.
                for globbed in glob(os.path.splitext(filename)[0] + ".*"):
                    if globbed.endswith("mp3"):
                        self._autotag_file(globbed, message, info=info)
                    self.logger.info("Moving %s to %s..." % (globbed, out_path))
                    dest = shutil.move(globbed, out_path)
                    filenames.append(dest)
        return filenames

    def download_raw(
        self,
        urls: List[str],
        out_path: StrPath,
        date: datetime,
        message: telegram.Message,
        audio: bool = False,
        filetitle: Optional[str] = None,
    ):
        """Download urls over plain HTTP into out_path.

        Files are named after filetitle (or the last URL segment), prefixed
        with the date.  A file without a recognisable extension gets one
        guessed from its content.  With audio=True, MP3s lacking a title tag
        are auto-tagged.

        :return: list of final file paths.
        """
        filenames = []
        for url in urls:
            local_filename = os.path.join(
                out_path,
                "{}__{}".format(
                    datestr(date), self.sanitize(filetitle or url.split("/")[-1])
                ),
            )
            final_filename = local_filename
            is_mp3 = local_filename.endswith("mp3")

            # The context manager releases the streamed connection afterwards.
            with requests.get(url, stream=True) as response:
                with open(local_filename, "wb") as f:
                    for chunk in response.iter_content(chunk_size=1024):
                        if chunk:
                            f.write(chunk)

            if not re.match(r".*\..{3,5}$", os.path.split(local_filename)[-1]):
                kind = filetype.guess(local_filename)
                if kind is None:
                    self.logger.error(
                        "File has no extension and could not be determined!"
                    )
                else:
                    self.logger.info(
                        "Moving file without extension... %s?" % kind.extension
                    )
                    final_filename = shutil.move(
                        local_filename, local_filename + "." + kind.extension
                    )
                    is_mp3 = kind.extension == "mp3"

            filenames.append(final_filename)

            if audio and is_mp3:
                try:
                    id3 = mutagen.id3.ID3(final_filename)
                    untagged = "TIT2" not in id3
                except mutagen.id3.ID3NoHeaderError:
                    untagged = True
                if untagged:
                    self._autotag_file(final_filename, message)
        return filenames

    @staticmethod
    def extract_hashtags(message: telegram.Message):
        """Return the upper-cased hashtags (without ``#``) of a message and its caption.

        Any hashtag containing ``PRAS`` is collapsed to exactly ``PRAS``.
        """
        raw = [
            message.parse_entity(entity)
            for entity in message.entities
            if entity.type == "hashtag"
        ]
        raw += [
            message.parse_caption_entity(entity)
            for entity in message.caption_entities
            if entity.type == "hashtag"
        ]
        hashtags = [hashtag[1:].upper() for hashtag in raw]
        return ["PRAS" if "PRAS" in hashtag else hashtag for hashtag in hashtags]

    def _get_hashtags(self, message: telegram.Message):
        """Return the message's hashtags, falling back to a recent hashtag-only message.

        The fallback applies when the message has no hashtags of its own and
        the same user sent hashtags in this chat within the last hour.
        """
        hashtags = self.extract_hashtags(message)
        if len(hashtags) == 0 and self.last_hashtags.get(message.chat.id) is not None:
            user, ts, last_hashtags = self.last_hashtags[message.chat.id]
            if user == message.from_user and ts > datetime.now() - timedelta(hours=1):
                hashtags = last_hashtags
        return hashtags

    def handle_text(self, message: telegram.Message, hashtags: List[str]):
        """Save the message text to a ``.txt`` file if the first hashtag is TEXT or TXT.

        The remaining hashtags form the target sub-directory (default
        ``TEXT``); the filename is built from the date and the first words.
        """
        if len(hashtags) == 0 or hashtags[0] not in ("TEXT", "TXT"):
            return
        if not message.text:
            # E.g. a photo with a caption: there is no text to save.
            return

        info_line = self.sanitize(
            "-".join(re.sub(r"#[\w]+", "", message.text).strip().split()[:7])
        )
        if len(info_line) > 64:
            info_line = info_line[:64]

        filename = "{}__{}.txt".format(datestr(message.date), info_line)

        sub_dirs = hashtags[1:] or ["TEXT"]
        out_dir = self.redirects.get(hashtags[0], self.out_dir)
        out_path = os.path.join(out_dir, *sub_dirs)
        file_path = os.path.join(out_path, filename)

        mkdir_p(out_path)

        # Explicit encoding: message text is arbitrary Unicode and must not
        # depend on the locale's default codec.
        with open(file_path, "w", encoding="utf-8") as out_file:
            out_file.write(message.text)
        message.reply_text(
            'Saved text to "{}"...'.format(os.path.join(*sub_dirs, filename))
        )

    # noinspection PyBroadException
    def handle(
        self,
        urls: List[str],
        message: telegram.Message,
        hashtags: List[str],
        download_fn: Any,
        filetitle=None,
    ):
        """Download urls into the directory described by hashtags.

        Protected hashtags used from a non-protected chat are rerouted under
        ``PUBLIC``.  When the first hashtag is TUMBLR or TUMBLR_NOW and a
        tumblr client is configured, the files are also posted to tumblr.

        :param download_fn: download_ytdl or download_raw.
        :return: True on success; False if there were no hashtags or anything
            failed (failures are reported in chat unless they are timeouts).
        """
        self.db.initialize()

        try:
            if len(hashtags) == 0:
                self.logger.info("Ignoring %s due to no hashtag present..." % urls)
                return False

            # NOTE: without a redirect this is the *same* list object as
            # `hashtags`, so the in-place edits below show up in it as well.
            original_hashtags = hashtags

            if hashtags[0] in self.redirects:
                out_dir = self.redirects[hashtags[0]]
                hashtags = hashtags[1:]
            else:
                out_dir = self.out_dir

            # Query once instead of once per hashtag.
            protected_tags = set(self.db.get_protected_tags())
            if any(hashtag in protected_tags for hashtag in original_hashtags):
                if message.chat.id not in self.db.get_protected_chats():
                    self.logger.info(
                        "Redirecting {} in chat {} due to protected hashtags: {}...".format(
                            urls, message.chat.title, hashtags
                        )
                    )
                    hashtags.insert(0, "PUBLIC")

            # Prefer an existing "_TAG" directory when "TAG" itself is missing.
            for i, hashtag in enumerate(hashtags):
                if os.path.isdir(os.path.join(out_dir, *hashtags[: i + 1])):
                    continue
                underscored = "_" + hashtag
                if os.path.isdir(os.path.join(out_dir, *hashtags[:i], underscored)):
                    self.logger.debug(f"Rerouting {hashtag} to {underscored}")
                    hashtags[i] = underscored

            self.last_hashtags[message.chat.id] = None

            self.logger.info(
                "Downloading %s into '%s' (%s)"
                % (urls, "/".join(original_hashtags), out_dir)
            )

            out_path = os.path.join(out_dir, *hashtags)
            mkdir_p(out_path)

            reply = 'Downloading to "{}"...'.format("/".join(original_hashtags))

            audio = any(
                tag in hashtag
                for hashtag in original_hashtags
                for tag in ("AUDIO", "RADIO")
            )
            if audio and download_fn != self.download_raw:
                reply += " (And also guessing you want to extract the audio)"
            message.reply_text(reply)

            filenames = download_fn(
                urls, out_path, message.date, message, audio=audio, filetitle=filetitle
            )

            cmd_hashtag = original_hashtags[0]

            tumblr_ids = []
            if cmd_hashtag in ("TUMBLR", "TUMBLR_NOW") and self.tumblr_client:
                now = cmd_hashtag == "TUMBLR_NOW"
                reply = "(btw, {})".format(
                    "***FIRING TO TUMBLR RIGHT AWAY***" if now else "queueing to tumblr"
                )
                message.reply_text(reply, parse_mode=telegram.ParseMode.MARKDOWN)
                for filename in filenames:
                    if filename.endswith(".mp4"):
                        # Videos are posted as photos, so convert them to GIF.
                        try:
                            output_filename = filename[: -len(".mp4")] + ".gif"
                            subprocess.check_output(
                                ["ffmpeg", "-i", filename, output_filename]
                            )
                            filename = output_filename
                        except subprocess.CalledProcessError:
                            message.reply_text(
                                "Conversion to gif failed, sorry! Check log..."
                            )
                            continue
                    response = self.tumblr_client.create_photo(
                        self.tumblr_name,
                        data=filename,
                        state="published" if now else "queue",
                    )
                    if "id" in response:
                        tumblr_ids.append(response["id"])
                    else:
                        self.logger.warning(
                            "Did not receive 'id' in tumblr response: \n"
                            + pprint.pformat(response)
                        )
                        message.reply_text(
                            "Something weird happened with the tumblrs, check it!"
                        )

            self.last_downloaded[message.chat.id] = (
                filenames,
                original_hashtags,
                tumblr_ids,
            )
            return True
        except Exception:
            # Deliberately broad: any failure is reported to the chat instead
            # of killing the handler.  `Exception` (rather than a bare except)
            # lets KeyboardInterrupt/SystemExit through.
            self.logger.exception("Handling of %s failed", urls)
            exc_type, exc_value, __ = sys.exc_info()
            if "Timed out" not in str(exc_value):
                message.reply_text(
                    "Something is FUCKED: [{}] {}".format(exc_type, exc_value)
                )
            return False

    def handle_tg_message(self, message, bot, hashtag):
        """Download the media attached to a Telegram message, if there is any.

        Handles photos (the widest size), documents, audio, video, video notes
        and voice messages.

        :param hashtag: list of hashtags deciding the target directory.
        :return: the result of handle(), or False when nothing is attached.
        """
        file, filetitle = None, None
        if len(message.photo) > 0:
            photo = max(message.photo, key=lambda p: p.width)
            file = photo.file_id
        elif message.document is not None:
            filetitle = message.document.file_name
            file = message.document.file_id
        elif message.audio is not None:
            filetitle = message.audio.title
            file = message.audio.file_id
        elif message.video is not None:
            file = message.video.file_id
        elif message.video_note is not None:
            file = message.video_note.file_id
        elif message.voice is not None:
            file = message.voice.file_id

        if file is None:
            return False

        url = bot.getFile(file).file_path
        return self.handle(
            [url], message, hashtag, self.download_raw, filetitle=filetitle
        )

    def handle_urls(self, message: telegram.Message, hashtags: List[str]):
        """Download every URL in the message.

        URLs with a dedicated youtube-dl extractor go through download_ytdl;
        the rest are downloaded raw, unless a HEAD request reports a text
        content type (or none at all).

        :return: True if either batch was handled successfully.
        """
        urls = [
            message.parse_entity(entity)
            for entity in message.entities
            if entity.type == "url"
        ]

        # Classify each URL once; the extractor scan is not cheap.
        ytdl_urls, normal_urls = [], []
        for url in urls:
            (ytdl_urls if self.ytdl_can(url) else normal_urls).append(url)

        ytdl_res = False
        if len(ytdl_urls) > 0:
            ytdl_res = self.handle(ytdl_urls, message, hashtags, self.download_ytdl)

        raw_res = False
        if len(normal_urls) > 0:
            file_urls = [
                url
                for url in normal_urls
                if "text" not in requests.head(url).headers.get("Content-Type", "text")
            ]
            if len(file_urls) > 0:
                raw_res = self.handle(file_urls, message, hashtags, self.download_raw)

        return ytdl_res or raw_res

    def tg_handle(self, bot: telegram.Bot, update: telegram.Update):
        """Handle any non-command message.

        Tries URL and media downloads first.  Otherwise a hashtag-bearing
        message is saved as text, applied to the message it replies to, or
        remembered for a follow-up message.  Messages without hashtags feed
        the markov corpus.
        """
        self._log_msg(update)

        hashtags = self._get_hashtags(update.message)
        if hashtags:
            # The hashtags are looked up afresh for each call on purpose:
            # handle() edits the list it receives and clears the remembered
            # hashtags of the chat.
            url_res = self.handle_urls(
                update.message, self._get_hashtags(update.message)
            )
            if url_res:
                return
            msg_res = self.handle_tg_message(
                update.message, bot, self._get_hashtags(update.message)
            )
            if msg_res:
                return

        hashtags = self.extract_hashtags(update.message)
        if len(hashtags) > 0:
            self.handle_text(
                update.message.reply_to_message or update.message, hashtags
            )

            if update.message.reply_to_message:
                self.handle_tg_message(update.message.reply_to_message, bot, hashtags)
                self.handle_urls(update.message.reply_to_message, hashtags)
            else:
                self.last_hashtags[update.message.chat.id] = (
                    update.message.from_user,
                    datetime.now(),
                    hashtags,
                )
        else:
            if self.markov and update.message.text:
                self.markov.add_to_corpus(update.message.text)

    def _get_tag_dirs(self):
        """Return all-uppercase directory names in out_dir plus the redirected hashtags."""
        upper_dirs = [
            name
            for name in os.listdir(self.out_dir)
            if os.path.isdir(os.path.join(self.out_dir, name)) and name.upper() == name
        ]
        return upper_dirs + list(self.redirects.keys())

    def tg_stats(self, _, update: telegram.Update):
        """Reply with per-tag file counts by extension (protected chats only).

        Tags holding at most one entry are listed separately as orphans.
        """
        self._log_msg(update)
        if self._reject_unprotected(update):
            return

        tag_dirs = self._get_tag_dirs()
        reply = "Total number of tags: {}\n\n".format(len(tag_dirs))

        counts = [
            (directory, os.listdir(self._tag_dir_path(directory)))
            for directory in tag_dirs
        ]
        # Most populated first, ties broken alphabetically.
        counts.sort(key=lambda count: (-len(count[1]), count[0]))

        for directory, files in counts:
            if len(files) <= 1:
                # Everything from here on is an orphan, reported below.
                break
            tag_path = self._tag_dir_path(directory)
            abs_paths = [os.path.join(tag_path, file) for file in files]
            abs_files = list(filter(os.path.isfile, abs_paths))
            exts = [
                ext[1:]
                for ext in [os.path.splitext(path)[1] for path in abs_files]
                if len(ext) > 0
            ]
            ext_counts = [(ext, exts.count(ext)) for ext in set(exts)]
            dir_cnt = len(abs_paths) - len(abs_files)
            # "directorie" + the "s" appended below reads "directories".
            type_counts = ext_counts + (
                [("directorie", dir_cnt)] if dir_cnt > 0 else []
            )
            details = ", ".join(
                "{} {}s".format(cnt, mime)
                for mime, cnt in sorted(type_counts, key=itemgetter(1), reverse=True)
            )
            if len(type_counts) == 1:
                reply += "{}: {}\n".format(directory, details)
            else:
                reply += "{}: {} files ({})\n".format(directory, len(files), details)

        orphans = [count for count in counts if len(count[1]) <= 1]
        if len(orphans) > 0:
            reply += "\nFollowing tags are orphans: " + ", ".join(
                map(itemgetter(0), orphans)
            )

        update.message.reply_text(reply, parse_mode=telegram.ParseMode.HTML)

    def _get_orphan_tags(self):
        """Return sorted ``(tag, filename)`` pairs for tags with at most one entry."""
        result = []
        for directory in self._get_tag_dirs():
            files = os.listdir(self._tag_dir_path(directory))
            if len(files) == 1:
                result.append((directory, files[0]))
            if len(files) == 0:
                result.append((directory, "NO FILE AT ALL..."))
        return sorted(result, key=itemgetter(0))

    def tg_orphan(self, _, update: telegram.Update):
        """Reply with the names of orphan tags (protected chats only)."""
        self._log_msg(update)
        if self._reject_unprotected(update):
            return

        orphans = self._get_orphan_tags()
        if len(orphans) == 0:
            update.message.reply_text("Good job, no orphan tags!")
        else:
            update.message.reply_text(
                "The following tags only contain a single file:\n"
                + ", ".join(map(itemgetter(0), orphans))
            )

    def tg_orphan_full(self, _, update):
        """Reply with orphan tags and their single file (protected chats only).

        The listing is split into several messages of at most 4096 characters.
        """
        self._log_msg(update)
        if self._reject_unprotected(update):
            return

        orphans = self._get_orphan_tags()
        if len(orphans) == 0:
            update.message.reply_text("Good job, no orphan tags!")
            return

        tmp_reply = "The following tags only contain a single file:\n"
        for directory, file in orphans:
            line = "{}: {}\n".format(directory, file)
            if len(tmp_reply + line) > 4096:
                update.message.reply_text(tmp_reply)
                tmp_reply = ""
            tmp_reply += line
        if len(tmp_reply) > 0:
            update.message.reply_text(tmp_reply)

    def tg_retag(self, _, update: telegram.Update):
        """Re-tag the MP3s of the chat's last download.

        ``/retag Artist - Title`` or ``/retag Title`` sets the given tags;
        a bare ``/retag`` swaps the existing artist and title.  Replies with
        "???" when there is nothing to retag.
        """
        self._log_msg(update)
        last = self.last_downloaded.get(update.message.chat.id)
        if last is not None:
            files, hashtags, tumblr_ids = last
            out_dir = self.redirects.get(hashtags[0], self.out_dir)
            mp3s = [filename for filename in files if filename.endswith("mp3")]
            if len(mp3s) > 0:
                arg_raw = re.sub(r"^/[@\w]+ ?", "", update.message.text).strip()
                artist, title = None, None

                reverse = len(arg_raw) == 0
                if not reverse:
                    tagline = arg_raw.split(" - ")
                    if len(tagline) == 1:
                        title = tagline[0].strip()
                    else:
                        artist = tagline[0].strip()
                        title = tagline[1].strip()

                for mp3 in mp3s:
                    if reverse:
                        # No arguments: swap what is currently stored.
                        orig_artist, orig_title = self._get_tags(mp3)
                        title, artist = orig_artist, orig_title

                    self._tag_file(mp3, artist, cast(str, title))
                    update.message.reply_text(
                        'Tagging "{}" as "{}" by "{}"!'.format(
                            mp3[len(out_dir) + 1 :], title, artist
                        )
                    )
                return

        update.message.reply_text(self._maybe_blabber("???", suffix="???"))

    def tg_delete(self, _, update: telegram.Update):
        """Delete the chat's last download, emptied directories and tumblr posts."""
        self._log_msg(update)
        last = self.last_downloaded.get(update.message.chat.id)
        if last is None:
            update.message.reply_text("Nothing to remove!")
            return

        files, hashtags, tumblr_ids = last
        out_dir = self.redirects.get(hashtags[0], self.out_dir)
        for file in files:
            update.message.reply_text(
                'Removing "{}"!'.format(file[len(out_dir) + 1 :])
            )
            os.remove(file)

            # Remove directories left empty, walking up to out_dir at most.
            # A non-empty directory ends the walk, as all its ancestors are
            # non-empty too.
            parent_dir = os.path.dirname(file)
            while len(os.listdir(parent_dir)) == 0:
                update.message.reply_text(
                    'Removing directory "{}" as it\'s empty...'.format(
                        parent_dir[len(out_dir) + 1 :]
                    )
                )
                os.rmdir(parent_dir)
                if parent_dir == out_dir:
                    break
                upper_dir = os.path.dirname(parent_dir)
                if upper_dir == parent_dir:
                    # Hit the filesystem root without meeting out_dir.
                    break
                parent_dir = upper_dir

        if len(tumblr_ids) > 0:
            plural = (
                "s (all {} of them)".format(len(tumblr_ids))
                if len(tumblr_ids) > 1
                else ""
            )
            update.message.reply_text("Also deleting tumblr post{}!".format(plural))
            for tumblr_id in tumblr_ids:
                if self.tumblr_client:
                    self.tumblr_client.delete_post(self.tumblr_name, tumblr_id)

        self.last_downloaded[update.message.chat.id] = None

    def tg_protect(self, _, update: telegram.Update):
        """Toggle protection of a tag or of the current chat.

        ``/protect tag <TAG>`` works only from an already protected chat;
        ``/protect chat <password>`` requires the configured password.
        """
        self._log_msg(update)
        self.db.initialize()

        msg_split = update.message.text.split(" ")
        if len(msg_split) != 3:
            update.message.reply_text(self._maybe_blabber("???", suffix="???"))
            return

        chat_in_db = self.db.get_chat(update.message.chat.id)

        cmd = msg_split[1]
        if cmd == "tag":
            if chat_in_db and chat_in_db[1]:
                tag = msg_split[2].upper()
                tag_in_db = self.db.get_tag(tag)
                if tag_in_db:
                    _, _, protected = tag_in_db
                    end_protected = not protected
                else:
                    end_protected = True
                self.db.set_tag_protected(tag, end_protected)
                update.message.reply_text(
                    f"got it, will {'NOT ' if not end_protected else ''}protect tag {tag}!"
                )
            else:
                update.message.reply_text(self._maybe_blabber("hublubl"))
        elif cmd == "chat":
            password = msg_split[2]
            if password == self.protected_password:
                if chat_in_db:
                    _, protected = chat_in_db
                    end_protected = not protected
                else:
                    end_protected = True
                self.db.set_chat_protected(update.message.chat.id, end_protected)
                update.message.reply_text(
                    f"got it, will {'NOT ' if not end_protected else ''}protect this chat!"
                )
            else:
                update.message.reply_text(self._maybe_blabber("hublubl"))
        else:
            update.message.reply_text(self._maybe_blabber("???", suffix="???"))

    def tg_queue(self, _, update: telegram.Update):
        """Reply with the number of posts in the tumblr queue."""
        self._log_msg(update)
        if self.tumblr_client:
            blog_info = self.tumblr_client.blog_info(self.tumblr_name)
            update.message.reply_text(
                "Currently queued tumblr posts: "
                + str(blog_info["blog"].get("queue", "???"))
            )
        else:
            update.message.reply_text(self._maybe_blabber("???", suffix="???"))

    # noinspection PyMethodMayBeStatic
    def tg_version(self, _, update: telegram.Update):
        """Reply with this script's modification time and the youtube-dl version."""
        self._log_msg(update)
        delojza_date = datetime.fromtimestamp(
            os.path.getmtime(os.path.realpath(__file__))
        ).strftime("%Y/%m/%d - %H:%M:%S")
        update.message.reply_text(
            "delojza modified date: {}\nyoutube-dl version: {}".format(
                delojza_date, YTDL_VERSION
            )
        )

    def tg_start(self, _, update: telegram.Update):
        """Greet the user with a markov sentence, or a plain HELLO."""
        self._log_msg(update)
        greeting = self.markov.make_sentence() if self.markov else None
        update.message.reply_text(greeting or "HELLO")

    def tg_error(
        self, bot: telegram.Bot, update: telegram.Update, error: telegram.TelegramError
    ):
        """Log a dispatcher error; timeouts are retried, the rest reported in chat."""
        self.logger.error(error)
        if "Timed out" in str(error):
            if update is not None:
                default = "Mmmm, I like it..."
                update.message.reply_text(
                    (self.markov.make_sentence(tries=100) if self.markov else default)
                    or default
                )
                # Give the message that timed out another go.
                self.tg_handle(bot, update)
        else:
            if update is not None:
                update.message.reply_text("Something is fucked: %s" % error)

    def run_idle(self):
        """Start polling Telegram and block until the process is stopped."""
        self.updater.start_polling()
        self.logger.info("Started Telegram bot...")
        self.updater.idle()


def main():
    """Read the configuration, build the bot and run it until interrupted."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    _DIR_ = os.path.dirname(os.path.realpath(__file__))
    CONFIG_PATHS = [
        "/etc/delojza/delojza.ini",
        os.path.join(os.getenv("HOME") or "", ".config/delojza/delojza.ini"),
        os.path.join(_DIR_, "delojza.ini"),
    ]

    config = ConfigParser()
    # The first existing config file wins.
    CONF_FILE = next(
        (conf_path for conf_path in CONFIG_PATHS if os.path.isfile(conf_path)), None
    )
    if CONF_FILE is None:
        logging.error("No config file found, stopping.")
        sys.exit(-1)
    config.read(CONF_FILE)

    try:
        markov = MarkovBlabberer("initial.txt")
    except FileNotFoundError:
        logging.warning(
            "Didn't find `initial.txt`, continuing without markov blabbering!"
        )
        markov = None

    try:
        redirects: Optional[List[Tuple[str, str]]] = config.items("redirects")
    except NoSectionError:
        redirects = None

    try:
        tumblr_keys: Optional[Tuple[str, str, str, str]] = (
            config.get("tumblr", "consumer_key"),
            config.get("tumblr", "consumer_secret"),
            config.get("tumblr", "oauth_key"),
            config.get("tumblr", "oauth_secret"),
        )
    except (NoSectionError, NoOptionError, KeyError):
        # A [tumblr] section with a key missing also means "no tumblr".
        tumblr_keys = None

    delojza = DelojzaBot(
        config.get("delojza", "tg_api_key"),
        config.get("delojza", "OUT_DIR", fallback=os.path.join(_DIR_, "out")),
        tmp_dir=config.get("delojza", "tmp_dir", fallback=tempfile.gettempdir()),
        redirects=redirects,
        protected_password=config.get("delojza", "protected_password", fallback=None),
        acoustid_key=config.get("delojza", "acoustid_api_key", fallback=None),
        tumblr_name=config.get("tumblr", "blog_name", fallback=None),
        tumblr_keys=tumblr_keys,
        markov=markov,
    )
    delojza.run_idle()


if __name__ == "__main__":
    main()