tg-dudlebot/dudlebot.py

221 lines
7.4 KiB
Python

import configparser
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from operator import itemgetter
from typing import List, Dict
import telegram
from telegram.ext import Updater, CommandHandler, PicklePersistence
class DudleBot:
LINE_LIMIT = 9
def __init__(self, token):
self.logger = logging.getLogger("dudle")
persistence = PicklePersistence(filename='dudlebot.pickle')
self.updater = Updater(token, persistence=persistence, use_context=True)
dispatcher = self.updater.dispatcher
dispatcher.add_handler(CommandHandler('start', self.tg_start))
dispatcher.add_handler(CommandHandler('newplan', self.tg_newplan))
dispatcher.add_handler(CommandHandler('plan', self.tg_plan))
dispatcher.add_handler(CommandHandler('show', self.tg_show))
dispatcher.add_handler(CommandHandler('best', self.tg_best))
dispatcher.add_handler(CommandHandler('endplan', self.tg_end))
def tg_start(self, update, _):
update.message.reply_text("Hello! May luck be with you and your plans!")
def tg_newplan(self, update, context):
if context.chat_data.get("plan", None):
update.message.reply_text("There's a plan unresolved still! (Speak /endplan to fix it.)")
return
periods = {
'thisweek': datetime.today() - timedelta(days=datetime.today().weekday() % 6),
'nextweek': datetime.today() + timedelta(days=7) - timedelta(days=datetime.today().weekday() % 6)
}
period = update.message.text.partition(' ')[2]
if not period:
period = 'nextweek'
if period not in periods:
update.message.reply_text("Sorry man, I just don't understand.")
return
context.chat_data['plan'] = Plan(start=periods[period],
duration=timedelta(days=7), # TODO
entries={})
self._reply_with_plan(update, context)
def tg_plan(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
responses_str = update.message.text[len("/plan"):]
responses_str = re.sub(r'[^A-Za-z?]', '', responses_str)
responses_str = [char.upper() for char in responses_str]
if len(responses_str) == 0:
update.message.reply_text("Did you forget something? (Ex. usage: /plan Y N ? Y N Y Y)")
return
responses_str_padded = [responses_str[i] if i < len(responses_str) else "?"
for i in range(context.chat_data['plan'].duration.days)]
responses = []
for char in responses_str_padded:
if char == "Y":
responses.append(PlanResponse.YES)
elif char == "N":
responses.append(PlanResponse.NO)
else:
responses.append(PlanResponse.UNKNOWN)
user = update.message.from_user
name = user.username or ((user.first_name[0] + ". ") if user.first_name else '') + user.last_name
context.chat_data['plan'].entries[user.id] = PlanEntry(name=name, responses=responses)
self._reply_with_plan(update, context)
def tg_show(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
self._reply_with_plan(update, context)
def tg_best(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
update.message.reply_text(self._best_of_plan(context.chat_data["plan"]))
def tg_end(self, update, context):
if not context.chat_data.get("plan", None):
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
return
plan = context.chat_data.pop("plan")
response = "Poll ended!"
if len(plan.entries) > 0:
response += "\n\n" + self._best_of_plan(plan)
update.message.reply_text(response)
def _reply_with_plan(self, update, context):
plan: Plan = context.chat_data['plan']
formatted_plan = f"Poll: {plan.start.strftime('%d.%m.%Y')} -> " \
f"{(plan.start + plan.duration).strftime('%d.%m.%Y')}"
formatted_plan += "\n\n"
formatted_plan += "```\n"
days = [plan.start + timedelta(days=i) for i in range(plan.duration.days)]
formatted_plan += f"{'|'.join([day.strftime('%A')[:2] for day in days])}"
formatted_plan += "\n"
if len(plan.entries) == 0:
entries = {-1: PlanEntry(name="???", responses=[])}
else:
entries = plan.entries
for entry in entries.values():
responses = [entry.responses[i] if i < len(entry.responses) else PlanResponse.UNKNOWN
for i in range(plan.duration.days)]
for response in responses:
formatted_plan += f"{str(response)} "
formatted_plan += entry.name if len(entry.name) < self.LINE_LIMIT \
else entry[:self.LINE_LIMIT - 2] + ""
formatted_plan += "\n"
formatted_plan += "```"
update.message.reply_text(formatted_plan, parse_mode=telegram.ParseMode.MARKDOWN)
@staticmethod
def _best_of_plan(plan):
days = [plan.start + timedelta(days=i) for i in range(plan.duration.days)]
results = [0] * plan.duration.days
for entry in plan.entries.values():
for idx, response in enumerate(entry.responses):
if response == PlanResponse.YES:
results[idx] += 1
# elif response == PlanResponse.UNKNOWN:
# results[idx] += 0.01
sorted_days = sorted(zip(days, results), key=itemgetter(1), reverse=True)
best_days = []
last_result, different_results = None, 0
for day, result in sorted_days:
if result == 0 or (len(best_days) >= 3 and different_results > 1):
break
best_days.append((day, result))
if result != last_result:
different_results += 1
last_result = result
response = f"Best {len(best_days)} result{'s' if len(best_days) > 1 else ''}:\n\n"
for idx, dayresult in enumerate(best_days):
day, result = dayresult
response += f"{idx + 1}. {day.strftime('%A')} ({result})\n"
return response
def start(self):
self.logger.info("Starting DudleBot...")
self.updater.start_polling()
self.updater.idle()
class PlanResponse(Enum):
YES = 1
NO = 2
UNKNOWN = 3
def __str__(self):
return {
self.YES: "Y",
self.NO: "N",
self.UNKNOWN: "?"
}[self]
@dataclass
class PlanEntry:
name: str
responses: List[PlanResponse]
@dataclass
class Plan:
start: datetime
duration: timedelta
entries: Dict[int, PlanEntry]
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
config = configparser.ConfigParser()
config.read("dudlebot.ini")
dudlebot = DudleBot(config.get("general", "token"))
dudlebot.start()