#!/usr/bin/env python3

import errno
import html
import logging
import os
import pprint
import re
import shutil
import sqlite3
import subprocess
import sys
import tempfile
import unicodedata
from configparser import ConfigParser
from datetime import datetime, timedelta
from glob import glob
from operator import itemgetter
from random import random
from time import sleep

import acoustid
import filetype
import markovify
import mutagen.id3
import pytumblr
import requests
import telegram
import youtube_dl
from mutagen.easyid3 import EasyID3
from telegram.ext import Updater, CommandHandler, MessageHandler
from youtube_dl import DownloadError
from youtube_dl.version import __version__ as YTDL_VERSION


def mkdir_p(path):
    """Create directory *path* and any missing parents, like ``mkdir -p``.

    Succeeds silently when *path* already exists as a directory.

    Raises:
        OSError: if the directory cannot be created; this includes
            ``FileExistsError`` when *path* exists but is not a directory.
    """
    # exist_ok=True gives exactly the "ignore EEXIST if it is a directory"
    # behaviour that used to be spelled out by hand with errno.
    os.makedirs(path, exist_ok=True)
def datestr(date):
    """Format *date* as ``YYYY-MM-DD@HHMM`` for use as a file name prefix."""
    return f"{date:%Y-%m-%d@%H%M}"
class DelojzaDB:
    """Small SQLite-backed store of "protected" flags for chats and tags.

    The connection is opened lazily by :meth:`initialize` (and, as a safety
    net, by every query method) rather than in ``__init__``.
    """

    # Schema inferred from the queries in this class (columns: chats.id,
    # chats.protected, tags.id, tags.tag, tags.protected).
    # NOTE(review): confirm column types/constraints against any existing
    # deployment; IF NOT EXISTS leaves already-created tables untouched.
    _SCHEMA = (
        "CREATE TABLE IF NOT EXISTS chats ("
        "id INTEGER PRIMARY KEY, protected INTEGER)",
        "CREATE TABLE IF NOT EXISTS tags ("
        "id INTEGER PRIMARY KEY, tag TEXT UNIQUE, protected INTEGER)",
    )

    def __init__(self, db_path):
        """Remember *db_path*; no connection is opened yet."""
        self.db_path = db_path
        self.db = None

    def initialize(self):
        """Open the connection (once) and make sure the tables exist.

        Previously a brand-new database file had no tables, so the first
        query failed with ``sqlite3.OperationalError: no such table``.
        """
        if self.db is None:
            self.db = sqlite3.connect(self.db_path)
            for statement in self._SCHEMA:
                self.db.execute(statement)
            self.db.commit()

    def _connection(self):
        """Return the open connection, initializing it on first use."""
        self.initialize()
        return self.db

    def get_protected_tags(self):
        """Return the list of tag names whose ``protected`` flag is 1."""
        results = self._connection().execute("SELECT tag FROM tags WHERE protected == 1")
        return [res[0] for res in results.fetchall()]

    def get_protected_chats(self):
        """Return the list of chat ids whose ``protected`` flag is 1."""
        results = self._connection().execute("SELECT id FROM chats WHERE protected == 1")
        return [res[0] for res in results.fetchall()]

    def get_chat(self, id):
        """Return the ``(id, protected)`` row for chat *id*, or ``None``."""
        return self._connection().execute(
            "SELECT id, protected FROM chats WHERE id == ?", (id,)).fetchone()

    def set_chat_protected(self, id, protected):
        """Store the *protected* flag for chat *id*, inserting the row if needed."""
        db = self._connection()
        if self.get_chat(id):
            db.execute("UPDATE chats SET protected = ? WHERE id = ?", (protected, id))
        else:
            db.execute("INSERT INTO chats (id, protected) VALUES (?, ?)", (id, protected))
        db.commit()

    def get_tag(self, tag):
        """Return the ``(id, tag, protected)`` row for *tag*, or ``None``."""
        return self._connection().execute(
            "SELECT id, tag, protected FROM tags WHERE tag == ?", (tag,)).fetchone()

    def set_tag_protected(self, tag, protected):
        """Store the *protected* flag for *tag*, inserting the row if needed."""
        db = self._connection()
        if self.get_tag(tag):
            db.execute("UPDATE tags SET protected = ? WHERE tag = ?", (protected, tag))
        else:
            db.execute("INSERT INTO tags (tag, protected) VALUES (?, ?)", (tag, protected))
        db.commit()
class DelojzaBot:
    """Telegram bot that files links and attachments under hashtag-named directories.

    Messages carrying hashtags are downloaded (through youtube-dl or a plain
    HTTP GET) into ``out_dir/<TAG>/<TAG>/...``.  Optional extras: AcoustID
    based mp3 tagging, posting to tumblr, and Markov-chain small talk.
    """

    # Disk usage (percent) at or above which new downloads are refused.
    _MAX_PERCENT_FILLED = 98

    def __init__(self, tg_api_key, out_dir, tmp_dir=None, db_path=None, protected_password=None,
                 acoustid_key=None, tumblr_name=None, tumblr_keys=None, markov=None):
        """Set up logging, the database wrapper and all Telegram handlers.

        Args:
            tg_api_key: Telegram bot token handed to ``Updater``.
            out_dir: Root directory for downloaded files.
            tmp_dir: Scratch directory for youtube-dl; defaults to the
                system temporary directory.
            db_path: SQLite file; defaults to ``delojza.db`` next to this file.
            protected_password: Password accepted by ``/protect chat``.
            acoustid_key: AcoustID API key; tagging lookups are skipped if unset.
            tumblr_name: Blog name; tumblr support needs this and *tumblr_keys*.
            tumblr_keys: Positional arguments for ``pytumblr.TumblrRestClient``.
            markov: Optional object offering ``make_sentence``/``add_to_corpus``.
        """
        base_dir = os.path.dirname(os.path.realpath(__file__))
        self._setup_logging(base_dir)

        self.db = DelojzaDB(db_path or os.path.join(base_dir, "delojza.db"))

        self.out_dir = os.path.abspath(out_dir)
        self.logger.debug('OUT_DIR: ' + out_dir)
        self.tmp_dir = tmp_dir if tmp_dir else tempfile.gettempdir()
        # Log the resolved value: the raw argument is None by default and
        # concatenating it raised TypeError.
        self.logger.debug('TMP_DIR: ' + self.tmp_dir)
        self.markov = markov

        self.updater = Updater(tg_api_key)
        dp = self.updater.dispatcher

        commands = (
            ("start", self.tg_start),
            ("stats", self.tg_stats),
            ("orphans", self.tg_orphan),
            ("orphans_full", self.tg_orphan_full),
            ("retag", self.tg_retag),
            ("delete", self.tg_delete),
            ("protect", self.tg_protect),
            ("version", self.tg_version),
            ("queue", self.tg_queue),
        )
        for command, callback in commands:
            dp.add_handler(CommandHandler(command, callback))
        dp.add_error_handler(self.tg_error)
        # Catch-all handler goes last, after every command handler.
        dp.add_handler(MessageHandler(None, self.tg_handle))

        self.acoustid_key = acoustid_key

        if tumblr_name and tumblr_keys:
            self.tumblr_name = tumblr_name
            self.tumblr_client = pytumblr.TumblrRestClient(*tumblr_keys)
        else:
            # Always define both attributes so later reads cannot fail.
            self.tumblr_name = None
            self.tumblr_client = None

        self.protected_password = protected_password
        # chat id -> (filenames, hashtags, tumblr_ids) of the last download
        self.last_downloaded = {}
        # chat id -> (user, timestamp, hashtags) of the last hashtag-only message
        self.last_hashtags = {}

    def _setup_logging(self, log_path):
        """Create ``self.logger`` writing INFO to the console and DEBUG to ``delojza.log``."""
        self.logger = logging.getLogger("delojza")
        self.logger.setLevel(logging.DEBUG)

        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)

        dfh = logging.FileHandler(log_path + "/delojza.log")
        dfh.setLevel(logging.DEBUG)

        formatter = logging.Formatter('%(asctime)s - %(name)s [%(levelname)s] %(message)s')
        ch.setFormatter(formatter)
        dfh.setFormatter(formatter)

        self.logger.addHandler(ch)
        self.logger.addHandler(dfh)

    def _log_msg(self, update, text=None):
        """Log sender, chat id and text of the incoming message at DEBUG level.

        Args:
            update: The Telegram update; ``update.message`` must be set.
            text: Optional replacement for the message text, used to keep
                secrets (the ``/protect chat`` password) out of the log.
        """
        message = update.message
        from_user = message.from_user
        if from_user is None:
            sender = "<UNKNOWN>"
        else:
            # first_name/last_name may be None; plain concatenation crashed.
            sender = from_user.username or "".join(
                part for part in (from_user.first_name, from_user.last_name) if part)
        shown = text if text is not None else (message.text or "<NONE>")
        self.logger.debug(f"Received from {sender} ({message.chat.id}): " + shown)

    def _babble(self, fallback, suffix=""):
        """Return a Markov sentence plus *suffix* roughly 30% of the time, else *fallback*."""
        if self.markov and random() > .7:
            return self.markov.make_sentence() + suffix
        return fallback

    def _deny_unless_protected(self, update):
        """Reply with a brush-off and return True unless the chat is protected."""
        self.db.initialize()
        if update.message.chat.id in self.db.get_protected_chats():
            return False
        update.message.reply_text(self._babble("nope.", suffix="!"))
        return True

    @staticmethod
    def ytdl_can(url):
        """Return True if youtube-dl has a dedicated (non-generic) extractor for *url*.

        URLs containing ``/channel/`` are always rejected.
        """
        if '/channel/' in url:
            return False
        return any(ie.suitable(url) and ie.IE_NAME != 'generic'
                   for ie in youtube_dl.extractor.gen_extractors())

    # https://github.com/django/django/blob/master/django/utils/text.py#L393
    @staticmethod
    def sanitize(filepath):
        """Return *filepath* reduced to ASCII with unsafe characters replaced by ``_``.

        ``None`` is passed through unchanged.
        """
        if filepath is None:
            return None
        filepath = unicodedata.normalize('NFKD', filepath).encode('ascii', 'ignore').decode('ascii')
        return re.sub(r'[^\w.()\[\]{}#-]', '_', filepath)

    @staticmethod
    def _get_tags(filepath):
        """Return ``(artist, title)`` read from the ID3 tags of *filepath*.

        Either element is ``None`` when the tag (or the whole ID3 header) is
        missing.
        """
        try:
            audio = EasyID3(filepath)
        except mutagen.id3.ID3NoHeaderError:
            return None, None

        def first(key):
            # .get() instead of [] so a missing key yields None, not KeyError.
            values = audio.get(key)
            return values[0] if values else None

        return first("artist"), first("title")

    @staticmethod
    def _tag_file(filepath, artist, title):
        """Write *title* (TIT2) and *artist* (TOPE, TPE1) into the ID3 tags of *filepath*.

        An ID3 header is created first if the file has none.  Empty/``None``
        values are skipped rather than written.
        """
        try:
            id3 = mutagen.id3.ID3(filepath)
        except mutagen.id3.ID3NoHeaderError:
            mutafile = mutagen.File(filepath)
            mutafile.add_tags()
            mutafile.save()
            id3 = mutagen.id3.ID3(filepath)
        if title:
            id3.add(mutagen.id3.TIT2(encoding=3, text=title))
        if artist:
            id3.add(mutagen.id3.TOPE(encoding=3, text=artist))
            id3.add(mutagen.id3.TPE1(encoding=3, text=artist))
        id3.save()

    def _autotag_file(self, filepath, message, info=None):
        """Guess artist/title for *filepath*, tag the file and report via *message*.

        Sources, in order of preference: an AcoustID match scoring at least
        0.8, metadata supplied in *info* (``track``/``artist``), an
        "artist - title" split of ``info['title']``, the full title, and the
        uploader name for soundcloud.  A weaker AcoustID match (above 0.4) is
        used only where *info* has nothing better.
        """
        if info is None:
            info = {}

        title = None
        artist = None
        source = None
        best_acoustid_score = 0

        if self.acoustid_key:
            try:
                self.logger.debug("Requesting AcoustID for {}".format(filepath))
                results = sorted(acoustid.match(self.acoustid_key, filepath), key=itemgetter(0), reverse=True)
                if len(results) > 0:
                    score, rid, aid_title, aid_artist = results[0]
                    if score > .4:
                        title = aid_title
                        # The artist may be missing from the match; guard the regex.
                        artist = re.sub(r' *; +', ' & ', aid_artist) if aid_artist else None
                        best_acoustid_score = score
                        source = "AcoustID ({}%)".format(round(score * 100))
            except acoustid.NoBackendError:
                self.logger.warning("chromaprint library/tool not found")
            except acoustid.FingerprintGenerationError:
                self.logger.warning("fingerprint could not be calculated")
            except acoustid.WebServiceError as exc:
                self.logger.warning("web service request failed: {}".format(getattr(exc, "message", exc)))

        if best_acoustid_score < .8:
            if 'track' in info:
                title = info['track']
            if 'artist' in info:
                artist = info['artist']
            if 'track' in info or 'artist' in info:
                source = "supplied metadata"

        if title is None and artist is None and '-' in info.get("title", ""):
            # Split on the first dash only, so "A - B - remix" keeps "B - remix".
            artist, _, title = info['title'].partition("-")
            source = "fallback (artist - title)"

        if title is None and 'title' in info:
            title = info['title']
            source = "full title fallback"

        if 'soundcloud' in info.get("extractor", "") and artist is None and info.get('uploader'):
            artist = info['uploader']
            source = "soundcloud \"fallback\""

        artist = artist.strip() if artist else None
        title = title.strip() if title else None

        if title is None and artist is None:
            message.reply_text("Tried tagging, found nothing :(")
            return

        message.reply_text("Tagging as \"{}\" by \"{}\"\nvia {}".format(title, artist, source))
        self.logger.info("Tagging {} w/ {} - {} [{}]...".format(filepath, title, artist, source))
        self._tag_file(filepath, artist, title)

    def _get_percent_filled(self):
        """Return the usage percentage that ``df`` reports for ``out_dir``.

        Raises:
            RuntimeError: if no percentage can be found in the ``df`` output.
            subprocess.CalledProcessError / OSError: if ``df`` itself fails.
        """
        output = subprocess.check_output(["df", self.out_dir])
        percents_re = re.search(r"[0-9]+%", output.decode('utf-8'))
        if not percents_re:
            raise RuntimeError("no usage percentage in df output")
        return int(percents_re.group(0)[:-1])

    # noinspection PyUnusedLocal
    def download_ytdl(self, urls, out_path, date, message, audio=False, filetitle=None):
        """Download *urls* with youtube-dl into ``tmp_dir`` and move the results to *out_path*.

        With *audio* set, the best audio stream is extracted to a 256k mp3.
        A ``DownloadError`` mentioning 403 is retried up to four times.
        *filetitle* is unused; it exists so the signature matches
        :meth:`download_raw`.

        Returns:
            The list of final file paths.
        """
        options = {
            'noplaylist': True,
            'restrictfilenames': True,
            'outtmpl': os.path.join(self.tmp_dir, '{}__%(title)s__%(id)s.%(ext)s'.format(datestr(date)))
        }
        if audio:
            options['format'] = 'bestaudio/best'
            options['postprocessors'] = [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '256'
            }]
            options['postprocessor_args'] = ['-ar', '44100']

        filenames = []
        with youtube_dl.YoutubeDL(options) as ytdl:
            attempts = 0
            while True:
                try:
                    ytdl.download(urls)
                    break
                except DownloadError as exc:
                    attempts += 1
                    if '403' not in str(exc) or attempts >= 5:
                        raise
                    self.logger.warning("Received a 403!")
                    sleep(1.357)
                    if self.markov:
                        # Call the method; the bound method object itself used
                        # to be passed as the reply text.
                        message.reply_text(self.markov.make_sentence())

            for info in [ytdl.extract_info(url, download=False) for url in urls]:
                filename = ytdl.prepare_filename(info)
                # Post-processing may change the extension, so match any.
                for globbed in glob(os.path.splitext(filename)[0] + '.*'):
                    if globbed.endswith("mp3"):
                        self._autotag_file(globbed, message, info=info)
                    self.logger.info("Moving %s to %s..." % (globbed, out_path))
                    filenames.append(shutil.move(globbed, out_path))
        return filenames

    def download_raw(self, urls, out_path, date, message, audio=False, filetitle=None):
        """Fetch each of *urls* with a plain HTTP GET into *out_path*.

        Files are named ``<datestr>__<sanitized title or URL basename>``; if
        that name has no 3-5 character extension one is guessed from the
        content.  With *audio* set, mp3 files lacking a TIT2 tag are
        auto-tagged.

        Returns:
            The list of final file paths.

        Raises:
            requests.HTTPError: on a non-success HTTP status, so error pages
                are not stored as if they were the requested file.
        """
        filenames = []
        for url in urls:
            local_filename = os.path.join(out_path, "{}__{}".format(
                datestr(date), self.sanitize(filetitle or url.split('/')[-1])))
            final_filename = local_filename
            is_mp3 = local_filename.endswith("mp3")

            # The context manager releases the streamed connection.
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                with open(local_filename, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024):
                        if chunk:
                            f.write(chunk)

            if not re.match(r'.*\..{3,5}$', os.path.split(local_filename)[-1]):
                kind = filetype.guess(local_filename)
                if kind is None:
                    self.logger.error("File has no extension and could not be determined!")
                else:
                    self.logger.info('Moving file without extension... %s?' % kind.extension)
                    final_filename = shutil.move(local_filename, local_filename + '.' + kind.extension)
                    is_mp3 = kind.extension == "mp3"

            filenames.append(final_filename)

            if audio and is_mp3:
                try:
                    untagged = 'TIT2' not in mutagen.id3.ID3(final_filename)
                except mutagen.id3.ID3NoHeaderError:
                    untagged = True
                if untagged:
                    self._autotag_file(final_filename, message)

        return filenames

    @staticmethod
    def extract_hashtags(message):
        """Return the upper-cased hashtags (without ``#``) from text and caption entities.

        Any hashtag containing ``PRAS`` is collapsed to exactly ``PRAS``.
        """
        raw = [message.parse_entity(entity)
               for entity in message.entities if entity.type == 'hashtag']
        raw += [message.parse_caption_entity(entity)
                for entity in message.caption_entities if entity.type == 'hashtag']
        upper = (hashtag[1:].upper() for hashtag in raw)
        return ["PRAS" if "PRAS" in hashtag else hashtag for hashtag in upper]

    def _get_hashtags(self, message):
        """Return the message's hashtags, falling back to the sender's recent ones.

        The fallback applies when the message has no hashtags and the same
        user sent a hashtag-only message in this chat within the last hour.
        """
        hashtags = self.extract_hashtags(message)
        if len(hashtags) == 0 and self.last_hashtags.get(message.chat.id) is not None:
            user, ts, last_hashtags = self.last_hashtags[message.chat.id]
            if user == message.from_user and ts > datetime.now() - timedelta(hours=1):
                # Hand out a copy so callers cannot alter the remembered list.
                hashtags = list(last_hashtags)
        return hashtags

    def handle_text(self, message, hashtags):
        """Save the text of *message* to a ``.txt`` file if the first hashtag is TEXT/TXT.

        The file goes under the remaining hashtags (or ``TEXT`` if there are
        none).  Messages without text are ignored.
        """
        if len(hashtags) == 0 or hashtags[0] not in ('TEXT', 'TXT'):
            return
        if message.text is None:
            # e.g. a photo with a caption: nothing to save, and re.sub(None) crashed.
            return

        info_line = self.sanitize("-".join(re.sub(r'#[\w]+', '', message.text).strip().split()[:7]))
        if len(info_line) > 64:
            info_line = info_line[:64]

        filename = '{}__{}.txt'.format(datestr(message.date), info_line)

        subdirs = hashtags[1:] or ['TEXT']
        out_path = os.path.join(self.out_dir, *subdirs)
        file_path = os.path.join(out_path, filename)

        os.makedirs(out_path, exist_ok=True)

        # Explicit encoding: chat text routinely contains non-ASCII characters.
        with open(file_path, 'w', encoding='utf-8') as out_file:
            out_file.write(message.text)

        message.reply_text("Saved text to \"{}\"...".format(os.path.join(*subdirs, filename)))

    # noinspection PyBroadException
    def handle(self, urls, message, hashtags, download_fn, filetitle=None):
        """Download *urls* with *download_fn* into the directory named by *hashtags*.

        Steps: refuse when the disk is (nearly) full; prefix ``PUBLIC`` when a
        protected tag is used from an unprotected chat; reroute a tag to an
        existing ``_TAG`` directory; download; optionally post to tumblr when
        the first tag is ``TUMBLR``/``TUMBLR_NOW``.

        The caller's *hashtags* list is never modified.

        Returns:
            True on success, False when nothing was downloaded.
        """
        self.db.initialize()

        # Only the measurement is inside the try, so a failing reply is not
        # mistaken for a failing measurement.
        try:
            percent_filled = self._get_percent_filled()
        except Exception:
            self.logger.exception("Could not determine free disk space")
            message.reply_text("NO! Couldn't figure out how much space is left???")
            return False
        if percent_filled >= self._MAX_PERCENT_FILLED:
            message.reply_text("NO! Less than 2% of drive space left :(")
            return False

        try:
            if len(hashtags) == 0:
                self.logger.info("Ignoring %s due to no hashtag present..." % urls)
                return False

            # Work on a copy: the same list is passed to several handle()
            # calls for one message and must not accumulate "PUBLIC".
            hashtags = list(hashtags)

            protected_tags = self.db.get_protected_tags()
            if any(hashtag in protected_tags for hashtag in hashtags):
                if message.chat.id not in self.db.get_protected_chats():
                    self.logger.info("Redirecting {} in chat {} due to protected hashtags: {}..."
                                     .format(urls, message.chat.title, hashtags))
                    hashtags.insert(0, "PUBLIC")

            for i, hashtag in enumerate(hashtags):
                if os.path.isdir(os.path.join(self.out_dir, *hashtags[:i + 1])):
                    continue
                hidden = "_" + hashtag
                if os.path.isdir(os.path.join(self.out_dir, *hashtags[:i], hidden)):
                    self.logger.debug(f"Rerouting {hashtag} to {hidden}")
                    hashtags[i] = hidden

            self.last_hashtags[message.chat.id] = None

            self.logger.info("Downloading %s under '%s'" % (urls, "/".join(hashtags)))

            out_path = os.path.join(self.out_dir, *hashtags)
            os.makedirs(out_path, exist_ok=True)

            reply = 'Downloading to "{}"...'.format("/".join(hashtags))

            audio = any(tag in hashtag for hashtag in hashtags for tag in ('AUDIO', 'RADIO'))
            if audio and download_fn != self.download_raw:
                reply += ' (And also guessing you want to extract the audio)'
            message.reply_text(reply)

            filenames = download_fn(urls, out_path, message.date, message, audio=audio, filetitle=filetitle)

            cmd_hashtag = hashtags[0]

            tumblr_ids = []
            if cmd_hashtag in ('TUMBLR', 'TUMBLR_NOW') and self.tumblr_client:
                now = cmd_hashtag == 'TUMBLR_NOW'
                reply = '(btw, {})'.format("***FIRING TO TUMBLR RIGHT AWAY***" if now else "queueing to tumblr")
                message.reply_text(reply, parse_mode=telegram.ParseMode.MARKDOWN)
                for filename in filenames:
                    if filename.endswith(".mp4"):
                        # Videos are converted to gif before being posted as a photo.
                        try:
                            output_filename = filename[:-len(".mp4")] + ".gif"
                            subprocess.check_output(['ffmpeg', '-i', filename, output_filename])
                            filename = output_filename
                        except subprocess.CalledProcessError:
                            message.reply_text("Conversion to gif failed, sorry! Check log...")
                            continue
                    response = self.tumblr_client.create_photo(self.tumblr_name, data=filename,
                                                               state="published" if now else "queue")
                    if 'id' in response:
                        tumblr_ids.append(response['id'])
                    else:
                        self.logger.warning("Did not receive 'id' in tumblr response: \n" + pprint.pformat(response))
                        message.reply_text('Something weird happened with the tumblrs, check it!')

            self.last_downloaded[message.chat.id] = filenames, hashtags, tumblr_ids
            return True
        except Exception as exc:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit pass through.
            self.logger.exception("Handling %s failed", urls)
            if "Timed out" not in str(exc):
                message.reply_text("Something is FUCKED: [{}] {}".format(type(exc), exc))
            return False

    def handle_tg_message(self, message, bot, hashtag):
        """Download the attachment of *message* (photo, document, audio, video, ...), if any.

        Returns:
            The result of :meth:`handle`, or False when there is no attachment.
        """
        file, filetitle = None, None
        if len(message.photo) > 0:
            # Pick the widest of the offered photo sizes.
            file = max(message.photo, key=lambda p: p.width).file_id
        elif message.document is not None:
            filetitle = message.document.file_name
            file = message.document.file_id
        elif message.audio is not None:
            filetitle = message.audio.title
            file = message.audio.file_id
        elif message.video is not None:
            file = message.video.file_id
        elif message.video_note is not None:
            file = message.video_note.file_id
        elif message.voice is not None:
            file = message.voice.file_id

        if file is None:
            return False
        url = bot.getFile(file).file_path
        return self.handle([url], message, hashtag, self.download_raw, filetitle=filetitle)

    def handle_urls(self, message, hashtags):
        """Download every URL entity of *message*.

        URLs with a dedicated youtube-dl extractor go through
        :meth:`download_ytdl`; of the rest, those whose ``Content-Type`` (from
        a HEAD request) does not contain ``text`` go through
        :meth:`download_raw`.

        Returns:
            True if either download succeeded.
        """
        urls = [message.parse_entity(entity)
                for entity in message.entities if entity.type == 'url']

        # Classify each URL once; ytdl_can() walks every extractor.
        ytdl_urls, normal_urls = [], []
        for url in urls:
            (ytdl_urls if self.ytdl_can(url) else normal_urls).append(url)

        ytdl_res = False
        if len(ytdl_urls) > 0:
            ytdl_res = self.handle(ytdl_urls, message, hashtags, self.download_ytdl)

        raw_res = False
        if len(normal_urls) > 0:
            file_urls = [url for url in normal_urls if
                         "text" not in requests.head(url).headers.get("Content-Type", "text")]
            if len(file_urls) > 0:
                raw_res = self.handle(file_urls, message, hashtags, self.download_raw)

        return ytdl_res or raw_res

    def tg_handle(self, bot, update):
        """Catch-all message handler: download URLs/attachments or remember hashtags.

        Messages without any (current or remembered) hashtags are only fed to
        the Markov corpus.
        """
        message = update.message
        if message is None:
            # Updates without .message (e.g. channel posts) cannot be handled here.
            return

        self._log_msg(update)
        hashtags = self._get_hashtags(message)
        if not hashtags:
            if self.markov and message.text:
                self.markov.add_to_corpus(message.text)
            return

        # _get_hashtags is re-evaluated on purpose: handle() clears the
        # remembered hashtags, which changes what the next call returns.
        if self.handle_urls(message, self._get_hashtags(message)):
            return
        if self.handle_tg_message(message, bot, self._get_hashtags(message)):
            return

        hashtags = self.extract_hashtags(message)
        if len(hashtags) > 0:
            self.handle_text(message.reply_to_message or message, hashtags)

            if message.reply_to_message:
                self.handle_tg_message(message.reply_to_message, bot, hashtags)
                self.handle_urls(message.reply_to_message, hashtags)
            else:
                self.last_hashtags[message.chat.id] = message.from_user, datetime.now(), hashtags

    def _get_tag_dirs(self):
        """Return the names of all-uppercase sub-directories of ``out_dir``."""
        return [entry for entry in os.listdir(self.out_dir)
                if os.path.isdir(os.path.join(self.out_dir, entry)) and entry.upper() == entry]

    def tg_stats(self, _, update):
        """``/stats``: reply with per-tag file counts (protected chats only)."""
        self._log_msg(update)
        if self._deny_unless_protected(update):
            return

        tag_dirs = self._get_tag_dirs()
        reply = "Total number of tags: {}\n\n".format(len(tag_dirs))
        counts = [(directory, os.listdir(os.path.join(self.out_dir, directory))) for directory in tag_dirs]
        # Biggest first; ties alphabetical (both sorts are stable).
        counts.sort(key=itemgetter(0))
        counts.sort(key=lambda x: len(x[1]), reverse=True)
        for directory, files in counts:
            if len(files) <= 1:
                # Everything from here on is an orphan and is listed below;
                # "<=" keeps this in step with the orphan filter.
                break
            abs_paths = [os.path.join(self.out_dir, directory, file) for file in files]
            abs_files = list(filter(os.path.isfile, abs_paths))
            exts = [ext[1:] for ext in [os.path.splitext(path)[1] for path in abs_files] if len(ext) > 0]
            ext_counts = [(ext, exts.count(ext)) for ext in set(exts)]
            dir_cnt = len(abs_paths) - len(abs_files)
            type_counts = ext_counts + ([("directorie", dir_cnt)] if dir_cnt > 0 else [])
            # Names come from the file system; escape them for HTML parse mode.
            details = ", ".join(["{} {}s".format(cnt, html.escape(kind)) for kind, cnt in
                                 sorted(type_counts, key=itemgetter(1), reverse=True)])
            if len(type_counts) == 1:
                reply += "<b>{}:</b> {}\n".format(html.escape(directory), details)
            else:
                reply += "<b>{}:</b> {} files ({})\n".format(html.escape(directory), len(files), details)
        orphans = [directory for directory, files in counts if len(files) <= 1]
        if len(orphans) > 0:
            reply += "\nFollowing tags are orphans: " + ", ".join(map(html.escape, orphans))
        update.message.reply_text(reply, parse_mode=telegram.ParseMode.HTML)

    def _get_orphan_tags(self):
        """Return sorted ``(tag, only_file)`` pairs for tag directories with at most one entry."""
        result = []
        for directory in self._get_tag_dirs():
            files = os.listdir(os.path.join(self.out_dir, directory))
            if len(files) == 1:
                result.append((directory, files[0]))
            if len(files) == 0:
                result.append((directory, "NO FILE AT ALL..."))
        return sorted(result, key=itemgetter(0))

    def tg_orphan(self, _, update):
        """``/orphans``: list tags holding at most one file (protected chats only)."""
        self._log_msg(update)
        if self._deny_unless_protected(update):
            return
        orphans = self._get_orphan_tags()
        if len(orphans) == 0:
            update.message.reply_text("Good job, no orphan tags!")
        else:
            update.message.reply_text("The following tags only contain a single file:\n" +
                                      ", ".join(map(itemgetter(0), orphans)))

    def tg_orphan_full(self, _, update):
        """``/orphans_full``: like ``/orphans`` but with file names, split into 4096-char replies."""
        self._log_msg(update)
        if self._deny_unless_protected(update):
            return
        orphans = self._get_orphan_tags()
        if len(orphans) == 0:
            update.message.reply_text("Good job, no orphan tags!")
            return

        tmp_reply = "The following tags only contain a single file:\n"
        for directory, file in orphans:
            line = "{}: {}\n".format(directory, file)
            if len(tmp_reply + line) > 4096:
                update.message.reply_text(tmp_reply)
                tmp_reply = ""
            tmp_reply += line
        if len(tmp_reply) > 0:
            update.message.reply_text(tmp_reply)

    def tg_retag(self, _, update):
        """``/retag [artist - ]title``: re-tag the mp3s of the chat's last download.

        Without arguments the existing artist and title tags are swapped.
        """
        self._log_msg(update)
        last = self.last_downloaded.get(update.message.chat.id)
        mp3s = [filename for filename in last[0] if filename.endswith("mp3")] if last is not None else []
        if not mp3s:
            update.message.reply_text(self._babble("???", suffix="???"))
            return

        arg_raw = re.sub(r'^/[@\w]+ ?', '', update.message.text).strip()
        artist, title = None, None

        reverse = len(arg_raw) == 0
        if not reverse:
            # Split on the first " - " only so the title may itself contain one.
            head, separator, tail = arg_raw.partition(" - ")
            if separator:
                artist, title = head.strip(), tail.strip()
            else:
                title = head.strip()

        for mp3 in mp3s:
            if reverse:
                orig_artist, orig_title = self._get_tags(mp3)
                title, artist = orig_artist, orig_title

            self._tag_file(mp3, artist, title)
            update.message.reply_text("Tagging \"{}\" as \"{}\" by \"{}\"!"
                                      .format(mp3[len(self.out_dir) + 1:], title, artist))

    def tg_delete(self, _, update):
        """``/delete``: remove the files (and tumblr posts) of the chat's last download.

        Directories left empty are removed as well, but never ``out_dir``
        itself nor anything outside of it.
        """
        self._log_msg(update)
        last = self.last_downloaded.get(update.message.chat.id)
        if last is None:
            update.message.reply_text("Nothing to remove!")
            return

        files, hashtags, tumblr_ids = last
        for file in files:
            update.message.reply_text("Removing \"{}\"!".format(file[len(self.out_dir) + 1:]))
            try:
                os.remove(file)
            except FileNotFoundError:
                self.logger.warning("%s is already gone", file)
                continue

            # Prune empty directories upwards.  Stopping at out_dir (checked
            # before touching anything) prevents removing the root and the
            # endless climb that followed for files stored directly in it.
            parent_dir = os.path.dirname(file)
            while parent_dir != self.out_dir and parent_dir.startswith(self.out_dir + os.sep):
                if os.listdir(parent_dir):
                    break
                update.message.reply_text("Removing directory \"{}\" as it's empty..."
                                          .format(parent_dir[len(self.out_dir) + 1:]))
                os.rmdir(parent_dir)
                parent_dir = os.path.dirname(parent_dir)

        if len(tumblr_ids) > 0:
            plural = "s (all {} of them)".format(len(tumblr_ids)) if len(tumblr_ids) > 1 else ""
            update.message.reply_text("Also deleting tumblr post{}!".format(plural))
            for tumblr_id in tumblr_ids:
                if self.tumblr_client:
                    self.tumblr_client.delete_post(self.tumblr_name, tumblr_id)
        self.last_downloaded[update.message.chat.id] = None

    def tg_protect(self, _, update):
        """``/protect tag <TAG>`` or ``/protect chat <password>``: toggle protection.

        Tags can only be toggled from an already protected chat; a chat is
        toggled when the correct password is supplied.
        """
        msg_split = (update.message.text or "").split()
        # Keep the chat password out of the log file.
        if len(msg_split) == 3 and msg_split[1] == 'chat':
            self._log_msg(update, text=" ".join(msg_split[:2] + ["<REDACTED>"]))
        else:
            self._log_msg(update)
        self.db.initialize()

        if len(msg_split) != 3:
            update.message.reply_text(self._babble("???", suffix="???"))
            return

        chat_in_db = self.db.get_chat(update.message.chat.id)

        cmd = msg_split[1]
        if cmd == 'tag':
            if chat_in_db and chat_in_db[1]:
                tag = msg_split[2].upper()
                tag_in_db = self.db.get_tag(tag)
                # Toggle an existing flag; unknown tags start out protected.
                end_protected = not tag_in_db[2] if tag_in_db else True

                self.db.set_tag_protected(tag, end_protected)
                update.message.reply_text(f"got it, will {'NOT ' if not end_protected else ''}protect tag {tag}!")
            else:
                update.message.reply_text(self._babble("hublubl"))
        elif cmd == 'chat':
            password = msg_split[2]
            if password == self.protected_password:
                end_protected = not chat_in_db[1] if chat_in_db else True

                self.db.set_chat_protected(update.message.chat.id, end_protected)
                update.message.reply_text(f"got it, will {'NOT ' if not end_protected else ''}protect this chat!")
            else:
                update.message.reply_text(self._babble("hublubl"))
        else:
            update.message.reply_text(self._babble("???", suffix="???"))

    def tg_queue(self, _, update):
        """``/queue``: report the number of queued tumblr posts."""
        self._log_msg(update)
        if self.tumblr_client:
            blog_info = self.tumblr_client.blog_info(self.tumblr_name)
            queued = blog_info.get('blog', {}).get('queue', "???")
            update.message.reply_text("Currently queued tumblr posts: " + str(queued))
        else:
            update.message.reply_text(self._babble("???", suffix="???"))

    # noinspection PyMethodMayBeStatic
    def tg_version(self, _, update):
        """``/version``: report this file's modification time and the youtube-dl version."""
        self._log_msg(update)
        delojza_date = datetime.fromtimestamp(os.path.getmtime(os.path.realpath(__file__))) \
            .strftime('%Y/%m/%d - %H:%M:%S')
        update.message.reply_text("delojza modified date: {}\nyoutube-dl version: {}"
                                  .format(delojza_date, YTDL_VERSION))

    def tg_start(self, _, update):
        """``/start``: greet with a Markov sentence, or ``HELLO`` without a Markov model."""
        self._log_msg(update)
        update.message.reply_text(self.markov.make_sentence() if self.markov else "HELLO")

    def tg_error(self, bot, update, error):
        """Error handler: log *error*; on a time-out retry the update, otherwise report it."""
        self.logger.error(error)
        if update is None or update.message is None:
            return
        if "Timed out" in str(error):
            default = "Mmmm, I like it..."
            update.message.reply_text((self.markov.make_sentence(tries=100) if self.markov else default) or default)
            self.tg_handle(bot, update)
        else:
            update.message.reply_text("Something is fucked: %s" % error)

    def run_idle(self):
        """Start polling Telegram and block until the process is stopped."""
        self.updater.start_polling()
        self.logger.info("Started Telegram bot...")
        self.updater.idle()
class MarkovBlabberer:
    """Markov-chain sentence generator backed by a newline-separated corpus file."""

    def __init__(self, filepath):
        """Build the model from the lower-cased contents of *filepath* and log a sample sentence."""
        self.logger = logging.getLogger('markov')
        self.filepath = filepath

        with open(filepath) as corpus_file:
            corpus = corpus_file.read()
        self.markov = markovify.NewlineText(corpus.lower())

        self.logger.info("Sentence of the day: " + self.make_sentence())

    def make_sentence(self, tries=100):
        """Return a generated sentence, or ``"???"`` if the model produced nothing."""
        sentence = self.markov.make_sentence(tries=tries)
        if sentence:
            return sentence
        return "???"

    def add_to_corpus(self, text):
        """Merge lower-cased *text* into the model and append it to the corpus file."""
        lowered = text.lower()
        addition = markovify.NewlineText(lowered)
        self.markov = markovify.combine([self.markov, addition])
        with open(self.filepath, 'a') as corpus_file:
            corpus_file.write(lowered + '\n')
if __name__ == '__main__':
    # Script entry point: locate the config file, build the optional Markov
    # model, then run the bot until it is stopped.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    _DIR_ = os.path.dirname(os.path.realpath(__file__))
    # Candidate config locations; the first existing file is used.
    CONFIG_PATHS = [
        '/etc/delojza/delojza.ini',
        os.path.join(os.getenv("HOME") or "", ".config/delojza/delojza.ini"),
        os.path.join(_DIR_, "delojza.ini"),
    ]

    config = ConfigParser()
    CONF_FILE = next((candidate for candidate in CONFIG_PATHS if os.path.isfile(candidate)), None)
    if CONF_FILE is None:
        logging.error("No config file found, stopping.")
        sys.exit(-1)
    config.read(CONF_FILE)

    # The Markov model is optional; without the corpus file the bot runs mute.
    try:
        markov = MarkovBlabberer("initial.txt")
    except FileNotFoundError:
        logging.warning("Didn't find `initial.txt`, continuing without markov blabbering!")
        markov = None

    tumblr_keys = tuple(
        config.get('tumblr', option, fallback=None)
        for option in ('consumer_key', 'consumer_secret', 'oauth_key', 'oauth_secret')
    )

    delojza = DelojzaBot(
        config.get('delojza', 'tg_api_key'),
        config.get('delojza', 'OUT_DIR', fallback=os.path.join(_DIR_, "out")),
        tmp_dir=config.get('delojza', 'tmp_dir', fallback=tempfile.gettempdir()),
        protected_password=config.get('delojza', 'protected_password', fallback=None),
        acoustid_key=config.get('delojza', 'acoustid_api_key', fallback=None),
        tumblr_name=config.get('tumblr', 'blog_name', fallback=None),
        tumblr_keys=tumblr_keys,
        markov=markov,
    )
    delojza.run_idle()