import configparser
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from operator import itemgetter
from typing import List, Dict

import telegram
from telegram.ext import Updater, CommandHandler, PicklePersistence


class DudleBot:
    """Telegram bot that runs one day-by-day availability poll ("plan") per chat.

    The active plan is kept in ``context.chat_data['plan']``; the updater is
    configured with a ``PicklePersistence`` backed by ``dudlebot.pickle``.
    """

    # Maximum number of characters of a participant's name shown in the table.
    LINE_LIMIT = 8

    # Reply sent by every command that needs a plan when the chat has none.
    NO_PLAN_MESSAGE = "No plan created yet! (Speak /newplan to do so.)"

    def __init__(self, token):
        """Create the updater for *token* and register all command handlers."""
        self.logger = logging.getLogger("dudle")

        persistence = PicklePersistence(filename='dudlebot.pickle')
        self.updater = Updater(token, persistence=persistence, use_context=True)

        dispatcher = self.updater.dispatcher
        commands = (
            ('start', self.tg_start),
            ('newplan', self.tg_newplan),
            ('plan', self.tg_plan),
            ('show', self.tg_show),
            ('best', self.tg_best),
            ('endplan', self.tg_end),
        )
        for command, callback in commands:
            dispatcher.add_handler(CommandHandler(command, callback))

    def tg_start(self, update, _):
        """Handle /start: reply with a greeting."""
        update.effective_message.reply_text(
            "Hello! May luck be with you and your plans!")

    def tg_newplan(self, update, context):
        """Handle /newplan [thisweek|nextweek]: open a new 7-day plan.

        Refuses when the chat already has a plan. A missing argument means
        ``nextweek``; any other unknown argument is rejected with a message.
        """
        # effective_message also covers edited messages, for which
        # update.message is None.
        message = update.effective_message

        if context.chat_data.get("plan"):
            message.reply_text(
                "There's a plan unresolved still! (Speak /endplan to fix it.)")
            return

        # Read the clock once so both periods derive from the same instant.
        today = datetime.today()
        # NOTE(review): weekday() is 0 (Monday) .. 6 (Sunday), so "% 6" maps
        # Sunday to an offset of 0 and a plan created on a Sunday starts on
        # that Sunday instead of the preceding Monday. Kept as in the original;
        # confirm whether this is intended.
        week_start = today - timedelta(days=today.weekday() % 6)
        periods = {
            'thisweek': week_start,
            'nextweek': week_start + timedelta(days=7),
        }

        # Everything after the first space is the argument; tolerate stray
        # whitespace and capitalisation.
        period = message.text.partition(' ')[2].strip().lower() or 'nextweek'
        if period not in periods:
            message.reply_text("Sorry man, I just don't understand.")
            return

        context.chat_data['plan'] = Plan(start=periods[period],
                                         duration=timedelta(days=7),  # TODO
                                         entries={})
        self._reply_with_plan(update, context)

    def tg_plan(self, update, context):
        """Handle /plan <answers>: store the sender's answers and show the plan.

        Only ASCII letters and ``?`` in the argument are considered, one
        character per day: ``Y`` is yes, ``N`` is no, anything else is unknown.
        Missing days are padded with unknown; surplus characters are dropped.
        """
        plan = self._active_plan(update, context)
        if plan is None:
            return

        message = update.effective_message

        # Drop the whole first token rather than just len("/plan") characters,
        # so the "@BotName" suffix of "/plan@BotName Y N" is not mistaken for
        # answers.
        tokens = message.text.split(None, 1)
        raw_answers = tokens[1] if len(tokens) > 1 else ""
        letters = re.sub(r'[^A-Za-z?]', '', raw_answers).upper()
        if not letters:
            message.reply_text(
                "Did you forget something? (Ex. usage: /plan Y N ? Y N Y Y)")
            return

        num_days = plan.duration.days
        by_letter = {"Y": PlanResponse.YES, "N": PlanResponse.NO}
        responses = [by_letter.get(letter, PlanResponse.UNKNOWN)
                     for letter in letters[:num_days]]
        # Pad so that every stored entry has exactly one response per day.
        responses.extend([PlanResponse.UNKNOWN] * (num_days - len(responses)))

        user = update.effective_user
        plan.entries[user.id] = PlanEntry(name=self._display_name(user),
                                          responses=responses)
        self._reply_with_plan(update, context)

    def tg_show(self, update, context):
        """Handle /show: reply with the current plan table."""
        if self._active_plan(update, context) is None:
            return
        self._reply_with_plan(update, context)

    def tg_best(self, update, context):
        """Handle /best: reply with the best days of the current plan."""
        plan = self._active_plan(update, context)
        if plan is None:
            return
        update.effective_message.reply_text(self._best_of_plan(plan))

    def tg_end(self, update, context):
        """Handle /endplan: remove the plan and announce its best days."""
        if self._active_plan(update, context) is None:
            return

        plan = context.chat_data.pop("plan")
        response = "Poll ended!"
        if plan.entries:
            response += "\n\n" + self._best_of_plan(plan)
        update.effective_message.reply_text(response)

    def _active_plan(self, update, context):
        """Return the chat's plan, or reply with a hint and return ``None``."""
        plan = context.chat_data.get("plan")
        if not plan:
            update.effective_message.reply_text(self.NO_PLAN_MESSAGE)
            return None
        return plan

    @staticmethod
    def _display_name(user):
        """Return a short label for *user*.

        Preference order: username, then "F. Lastname", then the first name,
        then the numeric id. ``last_name`` may be missing, so it is never
        concatenated unconditionally.
        """
        if user.username:
            return user.username
        first_name, last_name = user.first_name, user.last_name
        if last_name:
            initial = first_name[0] + ". " if first_name else ''
            return initial + last_name
        return first_name or str(user.id)

    def _reply_with_plan(self, update, context):
        """Reply with the plan rendered as a Markdown code block table."""
        plan: Plan = context.chat_data['plan']
        num_days = plan.duration.days
        days = [plan.start + timedelta(days=i) for i in range(num_days)]

        # With nobody signed up yet, show one placeholder row of unknowns.
        entries = plan.entries or {-1: PlanEntry(name="???", responses=[])}

        lines = [
            f"Poll: {plan.start.strftime('%d.%m.%Y')} -> "
            f"{(plan.start + plan.duration).strftime('%d.%m.%Y')}",
            "",
            "```",
            '|'.join(day.strftime('%A')[:2] for day in days),
        ]
        for entry in entries.values():
            answers = list(entry.responses[:num_days])
            answers.extend([PlanResponse.UNKNOWN] * (num_days - len(answers)))
            # Each header cell ("Mo|") is three columns wide, so every answer
            # is followed by two spaces to line up under its day.
            cells = ''.join(f"{str(answer)}  " for answer in answers)
            if len(entry.name) <= self.LINE_LIMIT:
                label = entry.name
            else:
                label = entry.name[:self.LINE_LIMIT - 1].strip() + "…"
            lines.append(cells + label)
        lines.append("```")

        update.effective_message.reply_text(
            '\n'.join(lines), parse_mode=telegram.ParseMode.MARKDOWN)

    @staticmethod
    def _best_of_plan(plan):
        """Return a text ranking of the days with the most YES answers.

        Days without any YES are never listed. Listing stops once at least
        three days are listed and more than one distinct vote count has been
        seen, so days tied on the only count seen so far are all included.
        """
        num_days = plan.duration.days
        days = [plan.start + timedelta(days=i) for i in range(num_days)]

        # Only YES answers count; NO and UNKNOWN add nothing.
        results = [0] * num_days
        for entry in plan.entries.values():
            # Slice so an entry with surplus responses cannot index past the
            # end of results.
            for idx, response in enumerate(entry.responses[:num_days]):
                if response == PlanResponse.YES:
                    results[idx] += 1

        # sorted() is stable, so days with equal counts stay in calendar order.
        sorted_days = sorted(zip(days, results), key=itemgetter(1), reverse=True)

        best_days = []
        last_result, different_results = None, 0
        for day, result in sorted_days:
            if result == 0 or (len(best_days) >= 3 and different_results > 1):
                break
            best_days.append((day, result))
            if result != last_result:
                different_results += 1
                last_result = result

        if not best_days:
            # Avoid the header-only "Best 0 result:" reply.
            return "No day has any votes yet."

        header = f"Best {len(best_days)} result{'s' if len(best_days) > 1 else ''}:\n\n"
        ranking = ''.join(f"{idx + 1}. {day.strftime('%A')} ({result})\n"
                          for idx, (day, result) in enumerate(best_days))
        return header + ranking

    def start(self):
        """Start polling for updates and block until the updater is stopped."""
        self.logger.info("Starting DudleBot...")
        self.updater.start_polling()
        self.updater.idle()


class PlanResponse(Enum):
    """A participant's answer for a single day."""

    YES = 1
    NO = 2
    UNKNOWN = 3

    def __str__(self):
        """Return the one-character symbol shown in the plan table."""
        # Look members up on the class; reaching one member through another
        # (self.YES) is deprecated in Python 3.11.
        return {
            PlanResponse.YES: "Y",
            PlanResponse.NO: "N",
            PlanResponse.UNKNOWN: "?",
        }[self]


@dataclass
class PlanEntry:
    """One participant's row in a plan."""

    name: str  # label shown in the plan table
    responses: List[PlanResponse]  # one answer per day, in day order


@dataclass
class Plan:
    """An availability poll covering ``duration`` days from ``start``."""

    start: datetime  # first day of the plan
    duration: timedelta  # length of the plan; its .days is the column count
    entries: Dict[int, PlanEntry]  # keyed by Telegram user id


def main():
    """Configure logging, read the token from dudlebot.ini and run the bot."""
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    config = configparser.ConfigParser()
    config.read("dudlebot.ini")

    dudlebot = DudleBot(config.get("general", "token"))
    dudlebot.start()


if __name__ == '__main__':
    main()