tg-dudlebot/dudlebot.py

224 lines
7.6 KiB
Python
Raw Normal View History

2019-10-14 12:47:51 +02:00
import configparser
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
2019-10-14 14:43:27 +02:00
from operator import itemgetter
2019-10-14 12:47:51 +02:00
from typing import List, Dict
import telegram
from telegram.ext import Updater, CommandHandler, PicklePersistence
class DudleBot:
2019-10-16 10:29:45 +02:00
LINE_LIMIT = 8
2019-10-14 12:47:51 +02:00
def __init__(self, token):
self.logger = logging.getLogger("dudle")
persistence = PicklePersistence(filename='dudlebot.pickle')
self.updater = Updater(token, persistence=persistence, use_context=True)
dispatcher = self.updater.dispatcher
dispatcher.add_handler(CommandHandler('start', self.tg_start))
dispatcher.add_handler(CommandHandler('newplan', self.tg_newplan))
dispatcher.add_handler(CommandHandler('plan', self.tg_plan))
2019-10-14 14:43:27 +02:00
dispatcher.add_handler(CommandHandler('show', self.tg_show))
dispatcher.add_handler(CommandHandler('best', self.tg_best))
dispatcher.add_handler(CommandHandler('endplan', self.tg_end))
2019-10-14 12:47:51 +02:00
def tg_start(self, update, _):
update.message.reply_text("Hello! May luck be with you and your plans!")
def tg_newplan(self, update, context):
2019-10-14 14:43:27 +02:00
if context.chat_data.get("plan", None):
update.message.reply_text("There's a plan unresolved still! (Speak /endplan to fix it.)")
return
2019-10-14 12:47:51 +02:00
periods = {
'thisweek': datetime.today() - timedelta(days=datetime.today().weekday() % 6),
'nextweek': datetime.today() + timedelta(days=7) - timedelta(days=datetime.today().weekday() % 6)
}
period = update.message.text.partition(' ')[2]
if not period:
period = 'nextweek'
if period not in periods:
update.message.reply_text("Sorry man, I just don't understand.")
return
context.chat_data['plan'] = Plan(start=periods[period],
duration=timedelta(days=7), # TODO
entries={})
self._reply_with_plan(update, context)
def tg_plan(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
2019-10-22 20:23:15 +02:00
responses_str = " ".join(update.message.text.split(" ")[1:])
2019-10-14 12:47:51 +02:00
responses_str = re.sub(r'[^A-Za-z?]', '', responses_str)
responses_str = [char.upper() for char in responses_str]
if len(responses_str) == 0:
update.message.reply_text("Did you forget something? (Ex. usage: /plan Y N ? Y N Y Y)")
return
responses_str_padded = [responses_str[i] if i < len(responses_str) else "?"
for i in range(context.chat_data['plan'].duration.days)]
responses = []
for char in responses_str_padded:
if char == "Y":
responses.append(PlanResponse.YES)
elif char == "N":
responses.append(PlanResponse.NO)
else:
responses.append(PlanResponse.UNKNOWN)
user = update.message.from_user
2019-10-16 12:05:19 +02:00
name = user.username or ((user.first_name[0] + ". ") if user.first_name else '') + (user.last_name or '')
2019-10-14 12:47:51 +02:00
context.chat_data['plan'].entries[user.id] = PlanEntry(name=name, responses=responses)
self._reply_with_plan(update, context)
2019-10-14 14:43:27 +02:00
def tg_show(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
self._reply_with_plan(update, context)
def tg_best(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
update.message.reply_text(self._best_of_plan(context.chat_data["plan"]))
def tg_end(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
plan = context.chat_data.pop("plan")
response = "Poll ended!"
if len(plan.entries) > 0:
response += "\n\n" + self._best_of_plan(plan)
update.message.reply_text(response)
def _reply_with_plan(self, update, context):
2019-10-14 12:47:51 +02:00
plan: Plan = context.chat_data['plan']
formatted_plan = f"Poll: {plan.start.strftime('%d.%m.%Y')} -> " \
f"{(plan.start + plan.duration).strftime('%d.%m.%Y')}"
formatted_plan += "\n\n"
formatted_plan += "```\n"
days = [plan.start + timedelta(days=i) for i in range(plan.duration.days)]
formatted_plan += f"{'|'.join([day.strftime('%A')[:2] for day in days])}"
formatted_plan += "\n"
entries_with_content = [entry for entry in plan.entries.values() if
any(response != PlanResponse.NO for response in entry.responses)]
if len(entries_with_content) == 0:
entries = [PlanEntry(name="???", responses=[])]
2019-10-14 12:47:51 +02:00
else:
entries = plan.entries.values()
2019-10-14 12:47:51 +02:00
for entry in entries:
2019-10-14 12:47:51 +02:00
responses = [entry.responses[i] if i < len(entry.responses) else PlanResponse.UNKNOWN
for i in range(plan.duration.days)]
for response in responses:
formatted_plan += f"{str(response)} "
2019-10-16 10:29:45 +02:00
formatted_plan += entry.name if len(entry.name) <= self.LINE_LIMIT \
else entry.name[:self.LINE_LIMIT - 1].strip() + ""
2019-10-14 12:47:51 +02:00
formatted_plan += "\n"
formatted_plan += "```"
update.message.reply_text(formatted_plan, parse_mode=telegram.ParseMode.MARKDOWN)
2019-10-14 14:43:27 +02:00
@staticmethod
def _best_of_plan(plan):
days = [plan.start + timedelta(days=i) for i in range(plan.duration.days)]
results = [0] * plan.duration.days
for entry in plan.entries.values():
for idx, response in enumerate(entry.responses):
if response == PlanResponse.YES:
results[idx] += 1
# elif response == PlanResponse.UNKNOWN:
# results[idx] += 0.01
sorted_days = sorted(zip(days, results), key=itemgetter(1), reverse=True)
best_days = []
last_result, different_results = None, 0
for day, result in sorted_days:
if result == 0 or (len(best_days) >= 3 and different_results > 1):
break
best_days.append((day, result))
if result != last_result:
different_results += 1
last_result = result
response = f"Best {len(best_days)} result{'s' if len(best_days) > 1 else ''}:\n\n"
for idx, dayresult in enumerate(best_days):
day, result = dayresult
response += f"{idx + 1}. {day.strftime('%A')} ({result})\n"
return response
2019-10-14 12:47:51 +02:00
def start(self):
self.logger.info("Starting DudleBot...")
self.updater.start_polling()
self.updater.idle()
class PlanResponse(Enum):
YES = 1
NO = 2
UNKNOWN = 3
def __str__(self):
return {
self.YES: "Y",
self.NO: "N",
self.UNKNOWN: "?"
}[self]
@dataclass
class PlanEntry:
name: str
responses: List[PlanResponse]
@dataclass
class Plan:
start: datetime
duration: timedelta
entries: Dict[int, PlanEntry]
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
config = configparser.ConfigParser()
config.read("dudlebot.ini")
dudlebot = DudleBot(config.get("general", "token"))
dudlebot.start()