1062 lines
39 KiB
Python
Executable file
1062 lines
39 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import logging
|
|
import os
|
|
import pprint
|
|
import re
|
|
import shutil
|
|
import sqlite3
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import unicodedata
|
|
from configparser import ConfigParser, NoSectionError
|
|
from datetime import datetime, timedelta
|
|
from glob import glob
|
|
from operator import itemgetter
|
|
from random import random
|
|
from sqlite3.dbapi2 import Connection
|
|
from time import sleep
|
|
from typing import Any, List, Optional, Tuple, cast
|
|
|
|
import acoustid
|
|
import filetype
|
|
import mutagen.id3
|
|
import pytumblr
|
|
import requests
|
|
import telegram
|
|
import youtube_dl
|
|
from _typeshed import StrPath
|
|
from mutagen import File, FileType
|
|
from mutagen.easyid3 import EasyID3
|
|
from telegram.ext import CommandHandler, MessageHandler, Updater
|
|
from telegram.ext.callbackcontext import CallbackContext
|
|
from telegram.ext.filters import Filters
|
|
from telegram.update import Update
|
|
from youtube_dl import DownloadError
|
|
from youtube_dl.version import __version__ as YTDL_VERSION
|
|
|
|
from markov import MarkovBlabberer
|
|
from util import datestr, mkdir_p
|
|
|
|
|
|
class DelojzaDB:
    """Small wrapper around the SQLite database of protected tags and chats.

    The connection is opened lazily by :meth:`initialize`; every query method
    raises ``RuntimeError`` when it is called before that.
    """

    def __init__(self, db_path):
        self.db_path = db_path
        # Stays None until initialize() is called.
        self.db: Optional[Connection] = None

    def _connection(self) -> Connection:
        """Return the open connection, or raise if it was never opened."""
        conn = self.db
        if conn is None:
            raise RuntimeError("Database not initialized!")
        return conn

    def initialize(self):
        """Open the SQLite connection unless one is already open."""
        if self.db is not None:
            return
        self.db = sqlite3.connect(self.db_path)

    def get_protected_tags(self):
        """Return the names of all tags whose ``protected`` column is 1."""
        cursor = self._connection().execute(
            "SELECT tag FROM tags WHERE protected == 1"
        )
        return [row[0] for row in cursor.fetchall()]

    def get_protected_chats(self):
        """Return the ids of all chats whose ``protected`` column is 1."""
        cursor = self._connection().execute(
            "SELECT id FROM chats WHERE protected == 1"
        )
        return [row[0] for row in cursor.fetchall()]

    def get_chat(self, id):
        """Return the ``(id, protected)`` row of a chat, or None if unknown."""
        cursor = self._connection().execute(
            "SELECT id, protected FROM chats WHERE id == ?", (id,)
        )
        return cursor.fetchone()

    def set_chat_protected(self, id: int, protected: bool):
        """Store the protected flag of a chat, inserting the row if needed."""
        conn = self._connection()
        if self.get_chat(id):
            statement = "UPDATE chats SET protected = ? WHERE id = ?"
            params = (protected, id)
        else:
            statement = "INSERT INTO chats (id, protected) VALUES (?, ?)"
            params = (id, protected)
        conn.execute(statement, params)
        conn.commit()

    def get_tag(self, tag: str):
        """Return the ``(id, tag, protected)`` row of a tag, or None."""
        cursor = self._connection().execute(
            "SELECT id, tag, protected FROM tags WHERE tag == ?", (tag,)
        )
        return cursor.fetchone()

    def set_tag_protected(self, tag: str, protected: bool):
        """Store the protected flag of a tag, inserting the row if needed."""
        conn = self._connection()
        if self.get_tag(tag):
            statement = "UPDATE tags SET protected = ? WHERE tag = ?"
            params = (protected, tag)
        else:
            statement = "INSERT INTO tags (tag, protected) VALUES (?, ?)"
            params = (tag, protected)
        conn.execute(statement, params)
        conn.commit()
|
|
|
|
|
|
class DelojzaBot:
|
|
def __init__(
    self,
    tg_api_key: str,
    out_dir: StrPath,
    redirects: Optional[List[Tuple[str, str]]] = None,
    tmp_dir: Optional[StrPath] = None,
    db_path: Optional[StrPath] = None,
    protected_password: Optional[str] = None,
    acoustid_key: Optional[str] = None,
    tumblr_name: Optional[str] = None,
    tumblr_keys: Optional[Tuple[str, str, str, str]] = None,
    markov: Optional[MarkovBlabberer] = None,
):
    """Configure logging, storage locations and the Telegram handlers.

    :param tg_api_key: token handed to the Telegram ``Updater``.
    :param out_dir: base directory downloads are sorted into.
    :param redirects: ``(hashtag, directory)`` pairs; each hashtag is
        upper-cased and its directory is created via ``mkdir_p``.
    :param tmp_dir: scratch directory; falls back to the system temp dir.
    :param db_path: SQLite file; defaults to ``delojza.db`` next to this file.
    :param protected_password: password checked by ``/protect chat``.
    :param acoustid_key: AcoustID key used when auto-tagging audio.
    :param tumblr_name: blog name; only used together with ``tumblr_keys``.
    :param tumblr_keys: the four credentials passed to ``TumblrRestClient``.
    :param markov: optional sentence generator used for chatty replies.
    """
    here = os.path.dirname(os.path.realpath(__file__))
    self._setup_logging(here)

    self.db = DelojzaDB(db_path or os.path.join(here, "delojza.db"))

    self.out_dir = os.path.abspath(out_dir)
    self.out_dir = self.out_dir[:-1] if self.out_dir[-1] == "/" else self.out_dir
    # Log the resolved values, not the raw arguments (tmp_dir may be None).
    self.logger.debug(f"OUT_DIR: {self.out_dir}")
    self.tmp_dir = tmp_dir if tmp_dir else tempfile.gettempdir()
    self.logger.debug(f"TMP_DIR: {self.tmp_dir}")
    self.markov = markov

    self.redirects = {}
    if redirects is not None:
        for hashtag, directory in redirects:
            hashtag = hashtag.upper()
            directory = str(directory)
            if not directory:
                # An empty value would otherwise fail on the index below.
                self.logger.warning(
                    f"Ignoring redirect for hashtag {hashtag}: empty directory"
                )
                continue
            if directory.endswith("/"):
                directory = directory[:-1]
            mkdir_p(directory)
            self.redirects[hashtag] = directory
            self.logger.debug(f"Will redirect hashtag {hashtag} to {directory}")

    self.updater = Updater(tg_api_key)
    dp = self.updater.dispatcher

    dp.add_handler(CommandHandler("start", self.tg_start))
    dp.add_error_handler(self.tg_error)
    dp.add_handler(CommandHandler("stats", self.tg_stats))
    dp.add_handler(CommandHandler("orphans", self.tg_orphan))
    dp.add_handler(CommandHandler("orphans_full", self.tg_orphan_full))
    dp.add_handler(CommandHandler("retag", self.tg_retag))
    dp.add_handler(CommandHandler("delete", self.tg_delete))
    dp.add_handler(CommandHandler("protect", self.tg_protect))
    dp.add_handler(CommandHandler("version", self.tg_version))
    dp.add_handler(CommandHandler("queue", self.tg_queue))
    # Registered last so the command handlers above get the first chance.
    dp.add_handler(MessageHandler(Filters.all, self.tg_handle))

    self.acoustid_key = acoustid_key

    # Always define both attributes so later reads never hit AttributeError.
    self.tumblr_name = None
    self.tumblr_client = None
    if tumblr_name and tumblr_keys:
        self.tumblr_name = tumblr_name
        self.tumblr_client = pytumblr.TumblrRestClient(*tumblr_keys)

    self.protected_password = protected_password
    # Both are keyed by chat id and filled in by the message handlers.
    self.last_downloaded = {}
    self.last_hashtags = {}
|
|
|
|
def _setup_logging(self, log_path: StrPath):
|
|
self.logger = logging.getLogger("delojza")
|
|
self.logger.setLevel(logging.DEBUG)
|
|
|
|
ch = logging.StreamHandler()
|
|
ch.setLevel(logging.INFO)
|
|
|
|
dfh = logging.FileHandler(os.path.join(log_path, "delojza.log"))
|
|
dfh.setLevel(logging.DEBUG)
|
|
|
|
formatter = logging.Formatter(
|
|
"%(asctime)s - %(name)s [%(levelname)s] %(message)s"
|
|
)
|
|
|
|
ch.setFormatter(formatter)
|
|
dfh.setFormatter(formatter)
|
|
|
|
self.logger.addHandler(ch)
|
|
self.logger.addHandler(dfh)
|
|
|
|
def _log_msg(self, update):
|
|
from_user = update.message.from_user
|
|
self.logger.debug(
|
|
f"Received from {from_user.username or (from_user.first_name + from_user.last_name)}"
|
|
f" ({update.message.chat.id}): " + (update.message.text or "<NONE>")
|
|
)
|
|
|
|
@staticmethod
def ytdl_can(url: str):
    """Tell whether youtube-dl has a dedicated extractor for ``url``.

    The catch-all ``generic`` extractor does not count, and any URL
    containing ``/channel/`` is rejected.
    """
    return any(
        extractor.suitable(url)
        and extractor.IE_NAME != "generic"
        and "/channel/" not in url
        for extractor in youtube_dl.extractor.gen_extractors()
    )
|
|
|
|
# https://github.com/django/django/blob/master/django/utils/text.py#L393
|
|
@staticmethod
|
|
def sanitize(text: str):
|
|
if text is None:
|
|
return ""
|
|
text = (
|
|
unicodedata.normalize("NFKD", text)
|
|
.encode("ascii", "ignore")
|
|
.decode("ascii")
|
|
)
|
|
return re.sub(r"[^\w.()\[\]{}#-]", "_", text)
|
|
|
|
@staticmethod
def _get_tags(filepath: StrPath):
    """Return ``(artist, title)`` read from the ID3 tags of ``filepath``.

    Either element is None when the corresponding tag is absent or empty;
    ``(None, None)`` is returned when the file has no ID3 header at all.
    """
    try:
        audio = EasyID3(filepath)
    except mutagen.id3.ID3NoHeaderError:
        return None, None

    # Indexing a missing key raises KeyError, so use the mapping's .get().
    artist = audio.get("artist")
    title = audio.get("title")
    return (
        artist[0] if artist else None,
        title[0] if title else None,
    )
|
|
|
|
@staticmethod
def _tag_file(filepath: StrPath, artist: Optional[str], title: str):
    """Write title (and artist, when given) as ID3 frames into ``filepath``.

    A file without an ID3 header gets one added first; if mutagen cannot
    open the file at all, nothing is written.
    """
    id3_module = mutagen.id3
    try:
        tags = id3_module.ID3(filepath)
    except id3_module.ID3NoHeaderError:
        container = cast(Optional[FileType], File(filepath))
        if not container:
            return
        container.add_tags()
        container.save()
        tags = id3_module.ID3(filepath)

    tags.add(id3_module.TIT2(encoding=3, text=title))
    if artist:
        # The artist goes into both the original-artist and lead-artist frame.
        for frame_type in (id3_module.TOPE, id3_module.TPE1):
            tags.add(frame_type(encoding=3, text=artist))
    tags.save()
|
|
|
|
def _autotag_file(self, filepath, message, info=None):
|
|
if info is None:
|
|
info = {}
|
|
|
|
title = None
|
|
artist = None
|
|
source = None
|
|
|
|
best_acoustid_score = 0
|
|
|
|
if self.acoustid_key:
|
|
try:
|
|
self.logger.debug("Requesting AcoustID for {}".format(filepath))
|
|
results = sorted(
|
|
acoustid.match(self.acoustid_key, filepath),
|
|
key=itemgetter(0),
|
|
reverse=True,
|
|
)
|
|
if len(results) > 0:
|
|
score, rid, aid_title, aid_artist = results[0]
|
|
if score > 0.4:
|
|
title = aid_title
|
|
artist = re.sub(r" *; +", " & ", aid_artist or "")
|
|
best_acoustid_score = score
|
|
source = "AcoustID ({}%)".format(round(score * 100))
|
|
except acoustid.NoBackendError:
|
|
self.logger.warning("chromaprint library/tool not found")
|
|
except acoustid.FingerprintGenerationError:
|
|
self.logger.warning("fingerprint could not be calculated")
|
|
except acoustid.WebServiceError as exc:
|
|
self.logger.warning(
|
|
"web service request failed: {}".format(exc.message)
|
|
)
|
|
|
|
if best_acoustid_score < 0.8:
|
|
if "track" in info:
|
|
title = info["track"]
|
|
if "artist" in info:
|
|
artist = info["artist"]
|
|
|
|
if "track" in info or "artist" in info:
|
|
source = "supplied metadata"
|
|
|
|
if title is None and artist is None and "-" in info.get("title", ""):
|
|
split = info["title"].split("-")
|
|
artist = split[0]
|
|
title = split[1]
|
|
source = "fallback (artist - title)"
|
|
|
|
if title is None and "title" in info:
|
|
title = info["title"]
|
|
source = "full title fallback"
|
|
|
|
if "soundcloud" in info.get("extractor", "") and artist is None:
|
|
artist = info["uploader"]
|
|
source = 'soundcloud "fallback"'
|
|
|
|
artist = artist.strip() if artist else None
|
|
title = title.strip() if title else None
|
|
|
|
if title is None:
|
|
message.reply_text("Tried tagging, found nothing :(")
|
|
return
|
|
|
|
message.reply_text(
|
|
'Tagging as "{}" by "{}"\nvia {}'.format(title, artist, source)
|
|
)
|
|
self.logger.info(
|
|
"Tagging {} w/ {} - {} [{}]...".format(filepath, title, artist, source)
|
|
)
|
|
self._tag_file(filepath, artist, title)
|
|
|
|
@staticmethod
def _get_percent_filled(directory: str):
    """Return the first percentage printed by ``df`` for ``directory``.

    :raises RuntimeError: when the ``df`` output contains no percentage.
    """
    df_report = subprocess.check_output(["df", directory]).decode("utf-8")
    found = re.search(r"[0-9]+%", df_report)
    if found is None:
        raise RuntimeError
    # Drop the trailing percent sign before converting.
    return int(found.group(0).rstrip("%"))
|
|
|
|
# noinspection PyUnusedLocal
def download_ytdl(
    self,
    urls: List[str],
    out_path: StrPath,
    date: datetime,
    message: telegram.Message,
    audio: bool = False,
    filetitle: Optional[str] = None,
):
    """Download ``urls`` with youtube-dl and move the results to ``out_path``.

    Files are first written to ``self.tmp_dir``. With ``audio`` set, the
    best audio stream is extracted to MP3, and every resulting MP3 is
    auto-tagged before it is moved. A download error mentioning "403" is
    retried until the fifth failure; any other error is re-raised.
    ``filetitle`` is accepted only for signature parity with
    ``download_raw``.

    :return: the destination paths reported by ``shutil.move``.
    """
    name_template = "{}__%(title)s__%(id)s.%(ext)s".format(datestr(date))
    options = {
        "noplaylist": True,
        "restrictfilenames": True,
        "outtmpl": os.path.join(self.tmp_dir, name_template),
    }
    if audio:
        options.update(
            {
                "format": "bestaudio/best",
                "postprocessors": [
                    {
                        "key": "FFmpegExtractAudio",
                        "preferredcodec": "mp3",
                        "preferredquality": "256",
                    }
                ],
                "postprocessor_args": ["-ar", "44100"],
            }
        )

    moved = []
    with youtube_dl.YoutubeDL(options) as downloader:
        failures = 0
        finished = False
        while not finished:
            try:
                downloader.download(urls)
                finished = True
            except DownloadError as exc:
                failures += 1
                if "403" not in str(exc) or failures >= 5:
                    raise exc
                self.logger.warning("Received a 403!")
                sleep(1.357)
                if self.markov:
                    message.reply_text(self.markov.make_sentence())

        # All metadata is fetched up front, before any file is moved.
        infos = [downloader.extract_info(url, download=False) for url in urls]
        for info in infos:
            expected_name = cast(str, downloader.prepare_filename(info))
            stem = os.path.splitext(expected_name)[0]
            # Post-processing may have changed the extension, so glob for it.
            for candidate in glob(stem + ".*"):
                if candidate.endswith("mp3"):
                    self._autotag_file(candidate, message, info=info)
                self.logger.info("Moving %s to %s..." % (candidate, out_path))
                moved.append(shutil.move(candidate, out_path))
    return moved
|
|
|
|
def download_raw(
    self,
    urls: List[str],
    out_path: StrPath,
    date: datetime,
    message: telegram.Message,
    audio: bool = False,
    filetitle: Optional[str] = None,
):
    """Stream each URL straight into ``out_path`` and return the file paths.

    The file name is the date prefix plus the sanitized ``filetitle`` (or,
    without one, the last URL path segment). A name lacking a 3-5 character
    extension gets one guessed from the file content. With ``audio`` set,
    MP3 files that carry no title frame are auto-tagged.

    :raises requests.HTTPError: when the server answers with an error
        status, so that no error page is stored as the download.
    """
    filenames = []
    for url in urls:
        local_filename = os.path.join(
            out_path,
            "{}__{}".format(
                datestr(date), self.sanitize(filetitle or url.split("/")[-1])
            ),
        )
        final_filename = local_filename
        is_mp3 = local_filename.endswith("mp3")

        # The context manager releases the streamed connection even when
        # writing the file fails half-way.
        with requests.get(url, stream=True) as response:
            response.raise_for_status()
            with open(local_filename, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)

        if not re.match(r".*\..{3,5}$", os.path.split(local_filename)[-1]):
            kind = filetype.guess(local_filename)
            if kind is None:
                self.logger.error(
                    "File has no extension and could not be determined!"
                )
            else:
                self.logger.info(
                    "Moving file without extension... %s?" % kind.extension
                )
                final_filename = shutil.move(
                    local_filename, local_filename + "." + kind.extension
                )
                is_mp3 = kind.extension == "mp3"

        filenames.append(final_filename)

        if audio and is_mp3:
            try:
                id3 = mutagen.id3.ID3(final_filename)
                untagged = "TIT2" not in id3
            except mutagen.id3.ID3NoHeaderError:
                untagged = True
            if untagged:
                self._autotag_file(final_filename, message)

    return filenames
|
|
|
|
@staticmethod
|
|
def extract_hashtags(message: telegram.Message):
|
|
hashtags = list(
|
|
map(
|
|
message.parse_entity,
|
|
list(filter(lambda e: e.type == "hashtag", message.entities)),
|
|
)
|
|
)
|
|
hashtags += list(
|
|
map(
|
|
message.parse_caption_entity,
|
|
list(filter(lambda e: e.type == "hashtag", message.caption_entities)),
|
|
)
|
|
)
|
|
if len(hashtags) > 0:
|
|
hashtags = [hashtag[1:].upper() for hashtag in hashtags]
|
|
for i, hashtag in enumerate(hashtags):
|
|
if "PRAS" in hashtag:
|
|
hashtags[i] = "PRAS"
|
|
return hashtags
|
|
|
|
def _get_hashtags(self, message: telegram.Message):
|
|
hashtags = self.extract_hashtags(message)
|
|
if len(hashtags) == 0 and self.last_hashtags.get(message.chat.id) is not None:
|
|
user, ts, last_hashtags = self.last_hashtags[message.chat.id]
|
|
if user == message.from_user and ts > datetime.now() - timedelta(hours=1):
|
|
hashtags = last_hashtags
|
|
return hashtags
|
|
|
|
def handle_text(self, message: telegram.Message, hashtags: List[str]):
    """Save the message text to a ``.txt`` file when tagged TEXT or TXT.

    Nothing happens unless the first hashtag is ``TEXT`` or ``TXT`` and the
    message actually has text. The file name is built from the date and up
    to seven words of the text (hashtags removed, sanitized, cut to 64
    characters). The remaining hashtags form the sub-directory, ``TEXT``
    being the default.
    """
    if len(hashtags) == 0 or hashtags[0] not in ("TEXT", "TXT"):
        return

    text = message.text
    if not text:
        # Media messages carry their hashtags in the caption and have no
        # text at all; there is nothing to save for them.
        return

    info_line = self.sanitize(
        "-".join(re.sub(r"#[\w]+", "", text).strip().split()[:7])
    )
    info_line = info_line[:64]

    filename = "{}__{}.txt".format(datestr(message.date), info_line)
    out_dir = self.redirects.get(hashtags[0], self.out_dir)
    subdirs = hashtags[1:] or ["TEXT"]
    out_path = os.path.join(out_dir, *subdirs)
    file_path = os.path.join(out_path, filename)

    mkdir_p(out_path)

    # Explicit encoding: chat text is arbitrary Unicode and must not depend
    # on the locale of the machine running the bot.
    with open(file_path, "w", encoding="utf-8") as out_file:
        out_file.write(text)

    message.reply_text(
        'Saved text to "{}"...'.format(os.path.join(*subdirs, filename))
    )
|
|
|
|
# noinspection PyBroadException
def handle(
    self,
    urls: List[str],
    message: telegram.Message,
    hashtags: List[str],
    download_fn: Any,
    filetitle=None,
):
    """Download ``urls`` into the directory selected by ``hashtags``.

    The first hashtag may name a redirect directory; the remaining hashtags
    become nested sub-directories. Protected hashtags used outside a
    protected chat are rerouted below ``PUBLIC``, and a missing directory
    falls back to an existing underscore-prefixed sibling. Downloads tagged
    ``TUMBLR`` / ``TUMBLR_NOW`` are also posted to Tumblr when a client is
    configured.

    The caller's ``hashtags`` list is never modified.

    :param download_fn: callable with the signature of ``download_raw``.
    :return: True on success; False when there are no hashtags or any
        step failed (the failure is logged and reported to the chat).
    """
    self.db.initialize()

    try:
        if len(hashtags) == 0:
            self.logger.info("Ignoring %s due to no hashtag present..." % urls)
            return False

        # Work on copies: the caller's list may be the one remembered in
        # self.last_hashtags and must not pick up PUBLIC or "_" prefixes.
        original_hashtags = list(hashtags)
        hashtags = list(hashtags)
        if hashtags[0] in self.redirects:
            out_dir = self.redirects[hashtags[0]]
            hashtags = hashtags[1:]
        else:
            out_dir = self.out_dir

        # One query is enough for all hashtags.
        protected_tags = set(self.db.get_protected_tags())
        if any(hashtag in protected_tags for hashtag in original_hashtags):
            if message.chat.id not in self.db.get_protected_chats():
                self.logger.info(
                    "Redirecting {} in chat {} due to protected hashtags: {}...".format(
                        urls, message.chat.title, hashtags
                    )
                )
                hashtags.insert(0, "PUBLIC")

        for i, hashtag in enumerate(hashtags):
            current_path = hashtags[: i + 1]
            if not os.path.isdir(os.path.join(out_dir, *current_path)):
                underscored = "_" + hashtag
                test_path = current_path[:-1] + [underscored]
                if os.path.isdir(os.path.join(out_dir, *test_path)):
                    self.logger.debug(f"Rerouting {hashtag} to {underscored}")
                    hashtags[i] = underscored

        self.last_hashtags[message.chat.id] = None

        self.logger.info(
            "Downloading %s into '%s' (%s)"
            % (urls, "/".join(original_hashtags), out_dir)
        )

        out_path = os.path.join(out_dir, *hashtags)
        mkdir_p(out_path)

        reply = 'Downloading to "{}"...'.format("/".join(original_hashtags))

        audio = any(
            tag in hashtag
            for hashtag in original_hashtags
            for tag in ("AUDIO", "RADIO")
        )
        if audio and download_fn != self.download_raw:
            reply += " (And also guessing you want to extract the audio)"
        message.reply_text(reply)

        filenames = download_fn(
            urls, out_path, message.date, message, audio=audio, filetitle=filetitle
        )

        cmd_hashtag = original_hashtags[0]

        tumblr_ids = []
        if cmd_hashtag in ("TUMBLR", "TUMBLR_NOW") and self.tumblr_client:
            now = cmd_hashtag == "TUMBLR_NOW"
            reply = "(btw, {})".format(
                "***FIRING TO TUMBLR RIGHT AWAY***" if now else "queueing to tumblr"
            )
            message.reply_text(reply, parse_mode=telegram.ParseMode.MARKDOWN)
            for filename in filenames:
                if filename.endswith(".mp4"):
                    # Videos are posted as photos, so convert them to GIF.
                    try:
                        output_filename = filename[: -len(".mp4")] + ".gif"
                        subprocess.check_output(
                            ["ffmpeg", "-i", filename, output_filename]
                        )
                        filename = output_filename
                    except subprocess.CalledProcessError:
                        message.reply_text(
                            "Conversion to gif failed, sorry! Check log..."
                        )
                        continue
                response = self.tumblr_client.create_photo(
                    self.tumblr_name,
                    data=filename,
                    state="published" if now else "queue",
                )
                if "id" in response:
                    tumblr_ids.append(response["id"])
                else:
                    self.logger.warning(
                        "Did not receive 'id' in tumblr response: \n"
                        + pprint.pformat(response)
                    )
                    message.reply_text(
                        "Something weird happened with the tumblrs, check it!"
                    )

        self.last_downloaded[message.chat.id] = (
            filenames,
            original_hashtags,
            tumblr_ids,
        )
        return True
    except Exception as exc:
        # Deliberately broad: a failed download must never kill the bot.
        # KeyboardInterrupt and SystemExit still propagate.
        self.logger.exception("Handling %s failed" % urls)
        if "Timed out" not in str(exc):
            message.reply_text(
                "Something is FUCKED: [{}] {}".format(type(exc), exc)
            )
        return False
|
|
|
|
def handle_tg_message(self, message, bot, hashtag):
    """Download the attachment of a Telegram message, if it has one.

    Checked in this order: photo (the widest size wins), document, audio,
    video, video note, voice. Documents contribute their file name and
    audio its title as the title of the stored file.

    :return: the result of ``handle``, or False without an attachment.
    """
    file_id = None
    filetitle = None

    if len(message.photo) > 0:
        widest = max(message.photo, key=lambda size: size.width)
        file_id = widest.file_id
    elif message.document is not None:
        filetitle = message.document.file_name
        file_id = message.document.file_id
    elif message.audio is not None:
        filetitle = message.audio.title
        file_id = message.audio.file_id
    else:
        # The remaining attachment kinds only contribute their file id.
        for kind in ("video", "video_note", "voice"):
            attachment = getattr(message, kind)
            if attachment is not None:
                file_id = attachment.file_id
                break

    if file_id is None:
        return False

    url = bot.getFile(file_id).file_path
    return self.handle(
        [url], message, hashtag, self.download_raw, filetitle=filetitle
    )
|
|
|
|
def handle_urls(self, message: telegram.Message, hashtags: List[str]):
    """Download every URL found in the message text.

    URLs youtube-dl has a dedicated extractor for go through
    ``download_ytdl``. The others are downloaded raw, but only when a HEAD
    request reports a Content-Type that does not contain ``text``.

    :return: a truthy value when at least one of the two downloads succeeded.
    """
    urls = [
        message.parse_entity(entity)
        for entity in message.entities
        if entity.type == "url"
    ]

    ytdl_urls = list(filter(self.ytdl_can, urls))
    if ytdl_urls:
        ytdl_res = self.handle(ytdl_urls, message, hashtags, self.download_ytdl)
    else:
        ytdl_res = False

    raw_res = False
    file_urls = []
    for url in urls:
        if self.ytdl_can(url):
            continue
        # A missing Content-Type header counts as text, i.e. is skipped.
        content_type = requests.head(url).headers.get("Content-Type", "text")
        if "text" not in content_type:
            file_urls.append(url)
    if file_urls:
        raw_res = self.handle(file_urls, message, hashtags, self.download_raw)

    return ytdl_res or raw_res
|
|
|
|
def tg_handle(self, update: Update, context: CallbackContext):
    """Catch-all handler: download links/attachments or learn from the text.

    With hashtags (own or recently remembered) the message's URLs are
    tried first, then its attachment. If neither yields a download, the
    message's own hashtags are applied to its text and, for a reply, to the
    replied-to message; otherwise they are remembered for the chat. Messages
    without any hashtags feed the markov corpus.
    """
    message = update.message
    if message is None:
        # NOTE(review): the handler is registered with Filters.all, which
        # presumably also delivers edited messages and channel posts; those
        # have no `message` and used to raise AttributeError here.
        return

    self._log_msg(update)
    hashtags = self._get_hashtags(message)
    if hashtags:
        if self.handle_urls(message, hashtags):
            return

        # Looked up again on purpose: handle() clears the remembered
        # hashtags of the chat, so the answer may differ from the one above.
        msg_res = self.handle_tg_message(
            message, context.bot, self._get_hashtags(message)
        )
        if msg_res:
            return

        hashtags = self.extract_hashtags(message)
        if len(hashtags) > 0:
            replied_to = message.reply_to_message
            self.handle_text(replied_to or message, hashtags)

            if replied_to:
                self.handle_tg_message(replied_to, context.bot, hashtags)
                self.handle_urls(replied_to, hashtags)
            else:
                self.last_hashtags[message.chat.id] = (
                    message.from_user,
                    datetime.now(),
                    hashtags,
                )
    else:
        if self.markov and message.text:
            self.markov.add_to_corpus(message.text)
|
|
|
|
def _get_tag_dirs(self):
|
|
return (
|
|
list(
|
|
filter(
|
|
lambda x: x.upper() == x,
|
|
filter(
|
|
lambda directory: os.path.isdir(
|
|
os.path.join(self.out_dir, directory)
|
|
),
|
|
os.listdir(self.out_dir),
|
|
),
|
|
)
|
|
)
|
|
+ list(self.redirects.keys())
|
|
)
|
|
|
|
def tg_stats(self, update: Update, context: CallbackContext):
    """Reply with per-tag file statistics (protected chats only).

    Tags are listed by descending number of entries, each with a breakdown
    by file extension plus a count of sub-directories. Tags holding at most
    one entry are only named in a closing list of orphans. Redirected tags
    are inspected in their redirect directory.
    """
    self._log_msg(update)
    self.db.initialize()
    if update.message.chat.id not in self.db.get_protected_chats():
        update.message.reply_text(
            (self.markov.make_sentence() + "!")
            if self.markov and random() > 0.7
            else "nope."
        )
        return

    tag_dirs = self._get_tag_dirs()
    reply = "Total number of tags: {}\n\n".format(len(tag_dirs))

    counts = []
    for directory in tag_dirs:
        # A redirected hashtag lives in its own directory, not below out_dir.
        dir_path = self.redirects.get(
            directory, os.path.join(self.out_dir, directory)
        )
        counts.append((directory, dir_path, os.listdir(dir_path)))
    # Two stable sorts: by name first, then by size, largest first.
    counts.sort(key=itemgetter(0))
    counts.sort(key=lambda entry: len(entry[2]), reverse=True)

    for directory, dir_path, files in counts:
        if len(files) <= 1:
            # Sorted by size, so everything from here on is an orphan.
            break
        abs_paths = [os.path.join(dir_path, file) for file in files]
        abs_files = list(filter(os.path.isfile, abs_paths))
        exts = [
            ext[1:]
            for ext in (os.path.splitext(path)[1] for path in abs_files)
            if len(ext) > 0
        ]
        ext_counts = [(ext, exts.count(ext)) for ext in set(exts)]
        dir_cnt = len(abs_paths) - len(abs_files)
        # "directorie" is completed to "directories" by the plural "s" below.
        type_counts = ext_counts + (
            [("directorie", dir_cnt)] if dir_cnt > 0 else []
        )
        details = ", ".join(
            "{} {}s".format(cnt, kind)
            for kind, cnt in sorted(type_counts, key=itemgetter(1), reverse=True)
        )
        if len(type_counts) == 1:
            reply += "<b>{}:</b> {}\n".format(directory, details)
        else:
            reply += "<b>{}:</b> {} files ({})\n".format(
                directory, len(files), details
            )

    orphans = [entry[0] for entry in counts if len(entry[2]) <= 1]
    if len(orphans) > 0:
        reply += "\nFollowing tags are orphans: " + ", ".join(orphans)
    update.message.reply_text(reply, parse_mode=telegram.ParseMode.HTML)
|
|
|
|
def _get_orphan_tags(self):
|
|
result = []
|
|
for directory in self._get_tag_dirs():
|
|
files = os.listdir(os.path.join(self.out_dir, directory))
|
|
if len(files) == 1:
|
|
result.append((directory, files[0]))
|
|
if len(files) == 0:
|
|
result.append((directory, "NO FILE AT ALL..."))
|
|
return sorted(result, key=itemgetter(0))
|
|
|
|
def tg_orphan(self, update: Update, context: CallbackContext):
    """Reply with the names of all orphan tags (protected chats only)."""
    self._log_msg(update)
    self.db.initialize()
    message = update.message

    if message.chat.id not in self.db.get_protected_chats():
        if self.markov and random() > 0.7:
            denial = self.markov.make_sentence() + "!"
        else:
            denial = "nope."
        message.reply_text(denial)
        return

    orphans = self._get_orphan_tags()
    if len(orphans) == 0:
        message.reply_text("Good job, no orphan tags!")
        return

    tag_names = ", ".join(orphan[0] for orphan in orphans)
    message.reply_text(
        "The following tags only contain a single file:\n" + tag_names
    )
|
|
|
|
def tg_orphan_full(self, update: Update, context: CallbackContext):
    """Reply with every orphan tag and the entry it contains.

    Only available in protected chats. The listing is sent in several
    messages whenever the next line would push one past 4096 characters.

    The parameters follow the ``(update, context)`` order used by every
    other handler of this bot; the previous ``(_, update)`` order treated
    the context object as the update.
    """
    self._log_msg(update)
    self.db.initialize()
    message = update.message
    if message.chat.id not in self.db.get_protected_chats():
        message.reply_text(
            (self.markov.make_sentence() + "!")
            if self.markov and random() > 0.7
            else "nope."
        )
        return

    orphans = self._get_orphan_tags()
    if len(orphans) == 0:
        message.reply_text("Good job, no orphan tags!")
        return

    tmp_reply = "The following tags only contain a single file:\n"
    for directory, file in orphans:
        line = "{}: {}\n".format(directory, file)
        if len(tmp_reply) + len(line) > 4096:
            # Flush what has been collected before it grows too long.
            message.reply_text(tmp_reply)
            tmp_reply = ""
        tmp_reply += line
    if len(tmp_reply) > 0:
        message.reply_text(tmp_reply)
|
|
|
|
def tg_retag(self, update: Update, context: CallbackContext):
    """Re-tag the MP3 files of the last download in this chat.

    ``/retag Artist - Title`` sets both tags, ``/retag Title`` only the
    title, and a bare ``/retag`` swaps the artist and title currently
    stored in each file. Only the first `` - `` separates artist from
    title, so titles may contain further dashes.
    """
    self._log_msg(update)
    last = self.last_downloaded.get(update.message.chat.id)
    if last is None:
        update.message.reply_text(
            (self.markov.make_sentence() if self.markov and random() > 0.7 else "")
            + "???"
        )
        return

    files, hashtags, tumblr_ids = last
    out_dir = self.redirects.get(hashtags[0], self.out_dir)
    mp3s = [filename for filename in files if filename.endswith("mp3")]
    if len(mp3s) == 0:
        return

    arg_raw = re.sub(r"^/[@\w]+ ?", "", update.message.text).strip()
    artist, title = None, None

    reverse = len(arg_raw) == 0
    if not reverse:
        head, separator, tail = arg_raw.partition(" - ")
        if separator:
            artist = head.strip()
            title = tail.strip()
        else:
            title = head.strip()

    for mp3 in mp3s:
        if reverse:
            # Swap on purpose: the stored artist becomes the title.
            orig_artist, orig_title = self._get_tags(mp3)
            title, artist = orig_artist, orig_title

        if not title:
            # Swapping a file without an artist would leave no title.
            update.message.reply_text(
                'Cannot retag "{}", there is no title to use!'.format(
                    mp3[len(out_dir) + 1 :]
                )
            )
            continue

        self._tag_file(mp3, artist, cast(str, title))
        update.message.reply_text(
            'Tagging "{}" as "{}" by "{}"!'.format(
                mp3[len(out_dir) + 1 :], title, artist
            )
        )
|
|
|
|
def tg_delete(self, update: Update, context: CallbackContext):
    """Delete the files (and Tumblr posts) of the last download in this chat.

    After each file is removed, its parent directories are removed as long
    as they are empty, walking up no further than the output directory of
    the download. The remembered download is cleared afterwards.
    """
    self._log_msg(update)
    last = self.last_downloaded.get(update.message.chat.id)
    if last is None:
        update.message.reply_text("Nothing to remove!")
        return

    files, hashtags, tumblr_ids = last
    out_dir = self.redirects.get(hashtags[0], self.out_dir)
    for file in files:
        update.message.reply_text(
            'Removing "{}"!'.format(file[len(out_dir) + 1 :])
        )
        os.remove(file)

        # Stop at the first non-empty directory: all of its ancestors are
        # non-empty as well, and continuing upwards would never terminate
        # for a file that does not live below out_dir.
        parent_dir = os.path.dirname(file)
        while len(os.listdir(parent_dir)) == 0:
            update.message.reply_text(
                'Removing directory "{}" as it\'s empty...'.format(
                    parent_dir[len(out_dir) + 1 :]
                )
            )
            os.rmdir(parent_dir)
            if parent_dir == out_dir:
                break
            parent_dir = os.path.dirname(parent_dir)

    if len(tumblr_ids) > 0:
        plural = (
            "s (all {} of them)".format(len(tumblr_ids))
            if len(tumblr_ids) > 1
            else ""
        )
        update.message.reply_text("Also deleting tumblr post{}!".format(plural))
        for tumblr_id in tumblr_ids:
            if self.tumblr_client:
                self.tumblr_client.delete_post(self.tumblr_name, tumblr_id)

    self.last_downloaded[update.message.chat.id] = None
|
|
|
|
def tg_protect(self, update: Update, context: CallbackContext):
    """Toggle protection of a tag or of the current chat.

    ``/protect tag <name>`` flips the protected flag of a tag and is only
    honoured inside a protected chat. ``/protect chat <password>`` flips the
    flag of the current chat when the password matches. Anything else is
    answered with a markov sentence or a short placeholder.
    """
    self._log_msg(update)
    self.db.initialize()
    message = update.message

    def blabber(fallback):
        # Occasionally answer with a generated sentence instead.
        if self.markov and random() > 0.7:
            return self.markov.make_sentence()
        return fallback

    parts = message.text.split(" ")
    if len(parts) != 3:
        message.reply_text(blabber("") + "???")
        return

    chat_in_db = self.db.get_chat(message.chat.id)
    cmd, argument = parts[1], parts[2]

    if cmd == "tag":
        if not (chat_in_db and chat_in_db[1]):
            message.reply_text(blabber("hublubl"))
            return
        tag = argument.upper()
        tag_in_db = self.db.get_tag(tag)
        if tag_in_db:
            _, _, protected = tag_in_db
            end_protected = not protected
        else:
            end_protected = True
        self.db.set_tag_protected(tag, end_protected)
        message.reply_text(
            f"got it, will {'NOT ' if not end_protected else ''}protect tag {tag}!"
        )
        return

    if cmd == "chat":
        if argument != self.protected_password:
            message.reply_text(blabber("hublubl"))
            return
        if chat_in_db:
            _, protected = chat_in_db
            end_protected = not protected
        else:
            end_protected = True
        self.db.set_chat_protected(message.chat.id, end_protected)
        message.reply_text(
            f"got it, will {'NOT ' if not end_protected else ''}protect this chat!"
        )
        return

    message.reply_text(blabber("") + "???")
|
|
|
|
def tg_queue(self, update: Update, context: CallbackContext):
    """Reply with the number of posts queued on the configured Tumblr blog.

    Without a Tumblr client the reply is ``???``, occasionally preceded by
    a markov sentence.
    """
    client = self.tumblr_client
    if not client:
        if self.markov and random() > 0.7:
            prefix = self.markov.make_sentence()
        else:
            prefix = ""
        update.message.reply_text(prefix + "???")
        return

    blog_info = client.blog_info(self.tumblr_name)
    queued = blog_info["blog"].get("queue", "???")
    update.message.reply_text("Currently queued tumblr posts: " + str(queued))
|
|
|
|
# noinspection PyMethodMayBeStatic
def tg_version(self, update: Update, context: CallbackContext):
    """Reply with this file's modification time and the youtube-dl version."""
    self._log_msg(update)
    modified_at = os.path.getmtime(os.path.realpath(__file__))
    delojza_date = datetime.fromtimestamp(modified_at).strftime(
        "%Y/%m/%d - %H:%M:%S"
    )
    report = "delojza modified date: {}\nyoutube-dl version: {}".format(
        delojza_date, YTDL_VERSION
    )
    update.message.reply_text(report)
|
|
|
|
def tg_start(self, update: Update, context: CallbackContext):
    """Greet the user with a markov sentence, or ``HELLO`` without markov."""
    self._log_msg(update)
    if self.markov:
        greeting = self.markov.make_sentence()
    else:
        greeting = "HELLO"
    update.message.reply_text(greeting)
|
|
|
|
def tg_error(self, update: object, context: CallbackContext):
    """Log a handler error and report it to the chat it happened in.

    The report is sent as a reply to the update's effective message, so
    updates without a plain ``message`` no longer raise a second error
    inside the error handler. Updates with no message at all are only
    logged.
    """
    self.logger.error(context.error)
    if isinstance(update, Update):
        target = update.effective_message
        if target is not None:
            target.reply_text(f"Something is fucked: {context.error}")
|
|
|
|
def run_idle(self):
    """Start polling Telegram, log the start and hand over to ``idle()``."""
    updater = self.updater
    updater.start_polling()
    self.logger.info("Started Telegram bot...")
    updater.idle()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: read the first config file found, assemble the
    # optional features (markov, redirects, tumblr) and run the bot.
    from configparser import NoOptionError

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    _DIR_ = os.path.dirname(os.path.realpath(__file__))
    # Searched in this order; the first existing file wins.
    CONFIG_PATHS = [
        "/etc/delojza/delojza.ini",
        os.path.join(os.getenv("HOME") or "", ".config/delojza/delojza.ini"),
        os.path.join(_DIR_, "delojza.ini"),
    ]

    config = ConfigParser()
    try:
        CONF_FILE = next(
            conf_path for conf_path in CONFIG_PATHS if os.path.isfile(conf_path)
        )
        config.read(CONF_FILE)
    except StopIteration:
        logging.error("No config file found, quitting.")
        sys.exit(-1)

    try:
        markov = MarkovBlabberer("initial.txt")
    except FileNotFoundError:
        logging.warning(
            "Didn't find `initial.txt`, continuing without markov blabbering!"
        )
        markov = None

    try:
        redirects: Optional[List[Tuple[str, str]]] = config.items("redirects")
    except NoSectionError:
        redirects = None

    try:
        tumblr_keys = (
            config.get("tumblr", "consumer_key"),
            config.get("tumblr", "consumer_secret"),
            config.get("tumblr", "oauth_key"),
            config.get("tumblr", "oauth_secret"),
        )
    except (NoSectionError, NoOptionError, KeyError):
        # config.get() raises NoOptionError, not KeyError, when the section
        # exists but one of the keys is missing; treat that as "no tumblr".
        tumblr_keys = None

    delojza = DelojzaBot(
        config.get("delojza", "tg_api_key"),
        config.get("delojza", "OUT_DIR", fallback=os.path.join(_DIR_, "out")),
        tmp_dir=config.get("delojza", "tmp_dir", fallback=tempfile.gettempdir()),
        redirects=redirects,
        protected_password=config.get("delojza", "protected_password", fallback=None),
        acoustid_key=config.get("delojza", "acoustid_api_key", fallback=None),
        tumblr_name=config.get("tumblr", "blog_name", fallback=None),
        tumblr_keys=tumblr_keys,
        markov=markov,
    )
    delojza.run_idle()
|