import configparser
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Dict

import telegram
from telegram.ext import Updater, CommandHandler, PicklePersistence


class DudleBot:
    """Telegram bot that runs one availability poll ("plan") per chat.

    A plan covers seven consecutive days. Each participant submits one
    Y/N/? answer per day via ``/plan``; the bot renders the answers as a
    table and can rank the days by the number of positive answers.
    """

    # Maximum number of characters of a participant name shown in the table.
    LINE_LIMIT = 8
    # Reply used by every command that needs an active plan when none exists.
    NO_PLAN_MESSAGE = "No plan created yet! (Speak /newplan to do so.)"

    def __init__(self, token):
        """Create the updater for *token* and register all command handlers."""
        self.logger = logging.getLogger("dudle")
        # NOTE(review): PicklePersistence unpickles this file on startup; it
        # must only ever be written by the bot itself, never by a third party.
        persistence = PicklePersistence(filename="dudlebot.pickle")
        self.updater = Updater(token, persistence=persistence, use_context=True)
        dispatcher = self.updater.dispatcher
        commands = (
            ("start", self.tg_start),
            ("newplan", self.tg_newplan),
            ("plan", self.tg_plan),
            ("show", self.tg_show),
            ("best", self.tg_best),
            ("endplan", self.tg_end),
        )
        for command, callback in commands:
            dispatcher.add_handler(CommandHandler(command, callback))

    def tg_start(self, update, _):
        """Handle ``/start``: greet the user."""
        update.effective_message.reply_text(
            "Hello! May luck be with you and your plans!"
        )

    def tg_newplan(self, update, context):
        """Handle ``/newplan [thisweek|nextweek]``: start a new 7-day plan.

        The period defaults to ``nextweek``. An already running plan is ended
        (and its result announced) first, but only once the requested period
        has been validated, so a mistyped argument cannot destroy a poll.
        """
        # effective_message also covers edited commands, for which
        # update.message is None.
        message = update.effective_message
        today = datetime.today()
        week_start = today - timedelta(days=today.weekday())
        periods = {
            "thisweek": week_start,
            "nextweek": week_start + timedelta(days=7),
        }
        period = message.text.partition(" ")[2].strip().lower() or "nextweek"
        if period not in periods:
            message.reply_text("Sorry man, I just don't understand.")
            return
        if context.chat_data.get("plan"):
            self.tg_end(update, context)
        context.chat_data["plan"] = Plan(
            start=periods[period],
            duration=timedelta(days=7),
            entries={},  # TODO (carried over from the original; intent undocumented)
        )
        self._reply_with_plan(update, context)

    def tg_plan(self, update, context):
        """Handle ``/plan Y N ? ...``: store the sender's answers.

        Everything except letters and ``?`` is ignored. ``Y``/``N`` (any
        case) map to yes/no, any other letter or ``?`` to unknown. Answers
        beyond the plan length are dropped; missing ones count as unknown.
        A new submission replaces the sender's previous one.
        """
        message = update.effective_message
        plan = context.chat_data.get("plan")
        if not plan:
            message.reply_text(self.NO_PLAN_MESSAGE)
            return

        arguments = " ".join(message.text.split(" ")[1:])
        answers = [char.upper() for char in re.sub(r"[^A-Za-z?]", "", arguments)]
        if not answers:
            message.reply_text(
                "Did you forget something? (Ex. usage: /plan Y N ? Y N Y Y)"
            )
            return

        day_count = plan.duration.days
        by_symbol = {"Y": PlanResponse.YES, "N": PlanResponse.NO}
        responses = [
            by_symbol.get(char, PlanResponse.UNKNOWN) for char in answers[:day_count]
        ]
        # Pad short submissions so every entry has one response per day.
        responses += [PlanResponse.UNKNOWN] * (day_count - len(responses))

        user = message.from_user
        # Prefer the username; otherwise "F. Lastname" built from what exists.
        initial = (user.first_name[0] + ". ") if user.first_name else ""
        name = user.username or (initial + (user.last_name or "")).strip()
        plan.entries[user.id] = PlanEntry(name=name, responses=responses)
        self._reply_with_plan(update, context)

    def tg_show(self, update, context):
        """Handle ``/show``: post the current plan table."""
        if not context.chat_data.get("plan"):
            update.effective_message.reply_text(self.NO_PLAN_MESSAGE)
            return
        self._reply_with_plan(update, context)

    def tg_best(self, update, context):
        """Handle ``/best``: post the ranking of the best days so far."""
        message = update.effective_message
        plan = context.chat_data.get("plan")
        if not plan:
            message.reply_text(self.NO_PLAN_MESSAGE)
            return
        message.reply_text(self._best_of_plan(plan))

    def tg_end(self, update, context):
        """Handle ``/endplan``: remove the plan and announce its result.

        The ranking is appended only when at least one entry was submitted.
        """
        message = update.effective_message
        if not context.chat_data.get("plan"):
            message.reply_text(self.NO_PLAN_MESSAGE)
            return
        plan = context.chat_data.pop("plan")
        response = "Poll ended!"
        if plan.entries:
            response += "\n\n" + self._best_of_plan(plan)
        message.reply_text(response)

    def _reply_with_plan(self, update, context):
        """Reply with the chat's plan rendered as a Markdown code-block table.

        Expects ``context.chat_data["plan"]`` to exist. Entries consisting
        solely of "no" answers are hidden; if nothing remains, a single
        ``???`` placeholder row of unknowns is shown instead.
        """
        plan: Plan = context.chat_data["plan"]
        day_count = plan.duration.days
        days = [plan.start + timedelta(days=i) for i in range(day_count)]

        lines = [
            f"Poll: {plan.start.strftime('%d.%m.%Y')} -> "
            f"{(plan.start + plan.duration).strftime('%d.%m.%Y')}",
            "",
            "```",
            "|".join(day.strftime("%A")[:2] for day in days),
        ]

        visible_entries = [
            entry
            for entry in plan.entries.values()
            if any(response != PlanResponse.NO for response in entry.responses)
        ] or [PlanEntry(name="???", responses=[])]

        for entry in visible_entries:
            given = entry.responses[:day_count]
            padded = given + [PlanResponse.UNKNOWN] * (day_count - len(given))
            if len(entry.name) <= self.LINE_LIMIT:
                label = entry.name
            else:
                # Leave room for the ellipsis within LINE_LIMIT characters.
                label = entry.name[: self.LINE_LIMIT - 1].strip() + "…"
            lines.append("".join(f"{response} " for response in padded) + label)

        lines.append("```")
        update.effective_message.reply_text(
            "\n".join(lines), parse_mode=telegram.ParseMode.MARKDOWN
        )

    @staticmethod
    def _best_of_plan(plan):
        """Return a text ranking of the plan's best days.

        Days are ranked by number of "yes" answers, then by number of
        "unknown" answers, ties staying in calendar order. Days without any
        "yes" are never listed. At least the top four qualifying days are
        listed; beyond that the list only continues while every day listed
        so far shares one identical tally.
        """
        day_count = plan.duration.days
        days = [plan.start + timedelta(days=i) for i in range(day_count)]

        yes_counts = [0] * day_count
        unknown_counts = [0] * day_count
        for entry in plan.entries.values():
            for idx, response in enumerate(entry.responses[:day_count]):
                if response == PlanResponse.YES:
                    yes_counts[idx] += 1
                elif response == PlanResponse.UNKNOWN:
                    unknown_counts[idx] += 1

        # One stable descending sort on (yes, unknown) keeps calendar order
        # among equal tallies.
        ranked = sorted(
            zip(days, zip(yes_counts, unknown_counts)),
            key=lambda day_with_result: day_with_result[1],
            reverse=True,
        )

        best_days = []
        last_result, different_results = None, 0
        for idx, (day, result) in enumerate(ranked):
            if result[0] == 0 or (idx > 3 and different_results > 1):
                break
            best_days.append((day, result))
            if result != last_result:
                different_results += 1
                last_result = result

        if not best_days:
            return "There has been no resolution."

        response = (
            f"Best {len(best_days)} result{'s' if len(best_days) > 1 else ''}:\n\n"
        )
        for rank, (day, (yes, unknown)) in enumerate(best_days, start=1):
            result_fmt = str(yes) + (f" + {unknown}?" if unknown > 0 else "")
            # day.day / day.month instead of the glibc-only %-d / %-m flags.
            response += (
                f"{rank}. {day.strftime('%A')} {day.day}/{day.month} ({result_fmt})\n"
            )
        return response

    def start(self):
        """Start polling Telegram and block until the process is stopped."""
        self.logger.info("Starting DudleBot...")
        self.updater.start_polling()
        self.updater.idle()


class PlanResponse(Enum):
    """A participant's answer for a single day of a plan."""

    YES = 1
    NO = 2
    UNKNOWN = 3

    def __str__(self):
        """Return the one-character symbol used in the plan table."""
        return {
            PlanResponse.YES: "Y",
            PlanResponse.NO: "N",
            PlanResponse.UNKNOWN: "?",
        }[self]


@dataclass
class PlanEntry:
    """One participant's row in a plan."""

    # Display name shown in the table.
    name: str
    # One response per day of the plan, in calendar order.
    responses: List[PlanResponse]


@dataclass
class Plan:
    """An availability poll over consecutive days."""

    # First day covered by the plan.
    start: datetime
    # Length of the plan; its whole days define the table columns.
    duration: timedelta
    # Submitted entries keyed by Telegram user id.
    entries: Dict[int, PlanEntry]


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    config = configparser.ConfigParser()
    # ConfigParser.read silently skips missing files; fail with a clear
    # message instead of a NoSectionError further down.
    if not config.read("dudlebot.ini"):
        raise SystemExit("Could not read configuration file dudlebot.ini")
    dudlebot = DudleBot(config.get("general", "token"))
    dudlebot.start()