tg-dudlebot/dudlebot.py

236 lines
8.5 KiB
Python

import configparser
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Dict
import telegram
from telegram.ext import Updater, CommandHandler, PicklePersistence
class DudleBot:
LINE_LIMIT = 8
def __init__(self, token):
self.logger = logging.getLogger("dudle")
persistence = PicklePersistence(filename='dudlebot.pickle')
self.updater = Updater(token, persistence=persistence, use_context=True)
dispatcher = self.updater.dispatcher
dispatcher.add_handler(CommandHandler('start', self.tg_start))
dispatcher.add_handler(CommandHandler('newplan', self.tg_newplan))
dispatcher.add_handler(CommandHandler('plan', self.tg_plan))
dispatcher.add_handler(CommandHandler('show', self.tg_show))
dispatcher.add_handler(CommandHandler('best', self.tg_best))
dispatcher.add_handler(CommandHandler('endplan', self.tg_end))
def tg_start(self, update, _):
update.message.reply_text("Hello! May luck be with you and your plans!")
def tg_newplan(self, update, context):
if context.chat_data.get("plan", None):
update.message.reply_text("There's a plan unresolved still! (Speak /endplan to fix it.)")
return
periods = {
'thisweek': datetime.today() - timedelta(days=datetime.today().weekday() % 6),
'nextweek': datetime.today() + timedelta(days=7) - timedelta(days=datetime.today().weekday() % 6)
}
period = update.message.text.partition(' ')[2]
if not period:
period = 'nextweek'
if period not in periods:
update.message.reply_text("Sorry man, I just don't understand.")
return
context.chat_data['plan'] = Plan(start=periods[period],
duration=timedelta(days=7), # TODO
entries={})
self._reply_with_plan(update, context)
def tg_plan(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
responses_str = " ".join(update.message.text.split(" ")[1:])
responses_str = re.sub(r'[^A-Za-z?]', '', responses_str)
responses_str = [char.upper() for char in responses_str]
if len(responses_str) == 0:
update.message.reply_text("Did you forget something? (Ex. usage: /plan Y N ? Y N Y Y)")
return
responses_str_padded = [responses_str[i] if i < len(responses_str) else "?"
for i in range(context.chat_data['plan'].duration.days)]
responses = []
for char in responses_str_padded:
if char == "Y":
responses.append(PlanResponse.YES)
elif char == "N":
responses.append(PlanResponse.NO)
else:
responses.append(PlanResponse.UNKNOWN)
user = update.message.from_user
name = user.username or ((user.first_name[0] + ". ") if user.first_name else '') + (user.last_name or '')
context.chat_data['plan'].entries[user.id] = PlanEntry(name=name, responses=responses)
self._reply_with_plan(update, context)
def tg_show(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
self._reply_with_plan(update, context)
def tg_best(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
update.message.reply_text(self._best_of_plan(context.chat_data["plan"]))
def tg_end(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
plan = context.chat_data.pop("plan")
response = "Poll ended!"
if len(plan.entries) > 0:
response += "\n\n" + self._best_of_plan(plan)
update.message.reply_text(response)
def _reply_with_plan(self, update, context):
plan: Plan = context.chat_data['plan']
formatted_plan = f"Poll: {plan.start.strftime('%d.%m.%Y')} -> " \
f"{(plan.start + plan.duration).strftime('%d.%m.%Y')}"
formatted_plan += "\n\n"
formatted_plan += "```\n"
days = [plan.start + timedelta(days=i) for i in range(plan.duration.days)]
formatted_plan += f"{'|'.join([day.strftime('%A')[:2] for day in days])}"
formatted_plan += "\n"
entries_with_content = [entry for entry in plan.entries.values() if
any(response != PlanResponse.NO for response in entry.responses)]
if len(entries_with_content) == 0:
entries = [PlanEntry(name="???", responses=[])]
else:
entries = entries_with_content
for entry in entries:
responses = [entry.responses[i] if i < len(entry.responses) else PlanResponse.UNKNOWN
for i in range(plan.duration.days)]
for response in responses:
formatted_plan += f"{str(response)} "
formatted_plan += entry.name if len(entry.name) <= self.LINE_LIMIT \
else entry.name[:self.LINE_LIMIT - 1].strip() + ""
formatted_plan += "\n"
formatted_plan += "```"
update.message.reply_text(formatted_plan, parse_mode=telegram.ParseMode.MARKDOWN)
@staticmethod
def _best_of_plan(plan):
days = [plan.start + timedelta(days=i) for i in range(plan.duration.days)]
results = [(0, 0)] * plan.duration.days
for entry in plan.entries.values():
for idx, response in enumerate(entry.responses):
if response == PlanResponse.YES:
results[idx] = (results[idx][0] + 1, results[idx][1])
elif response == PlanResponse.UNKNOWN:
results[idx] = (results[idx][0], results[idx][1] + 1)
sorted_days = list(zip(days, results)) # "Sorted" by date
sorted_days.sort(key=lambda dwr: dwr[1][1], reverse=True) # First sort by unknowns
sorted_days.sort(key=lambda dwr: dwr[1][0], reverse=True) # Then sort by positive answers
best_days = []
last_result, different_results = None, 0
for idx, day_with_result in enumerate(sorted_days):
day, result = day_with_result
yes, unknown = result
if yes == 0 or (idx > 3 and different_results > 1):
break
best_days.append(day_with_result)
if result != last_result:
different_results += 1
last_result = result
if len(best_days) > 0:
response = f"Best {len(best_days)} result{'s' if len(best_days) > 1 else ''}:\n\n"
for idx, day_with_result in enumerate(best_days):
day, result = day_with_result
yes, unknown = result
result_fmt = str(yes) + (f" + {unknown}?" if unknown > 0 else "")
# result_fmt = str(yes)
# if unknown > 0:
# same_results = [result for _, result in best_days if result[0] == yes]
# if any(same_unknown != unknown for _, same_unknown in same_results):
# result_fmt += f" + {unknown}?"
response += f"{idx + 1}. {day.strftime('%A %-d/%-m')} ({result_fmt})\n"
return response
else:
return "There has been no resolution."
def start(self):
self.logger.info("Starting DudleBot...")
self.updater.start_polling()
self.updater.idle()
class PlanResponse(Enum):
YES = 1
NO = 2
UNKNOWN = 3
def __str__(self):
return {
self.YES: "Y",
self.NO: "N",
self.UNKNOWN: "?"
}[self]
@dataclass
class PlanEntry:
name: str
responses: List[PlanResponse]
@dataclass
class Plan:
start: datetime
duration: timedelta
entries: Dict[int, PlanEntry]
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
config = configparser.ConfigParser()
config.read("dudlebot.ini")
dudlebot = DudleBot(config.get("general", "token"))
dudlebot.start()