263 lines
8.6 KiB
Python
263 lines
8.6 KiB
Python
import configparser
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
from typing import List, Dict
|
|
|
|
import telegram
|
|
from telegram.ext import Updater, CommandHandler, PicklePersistence
|
|
|
|
|
|
class DudleBot:
|
|
LINE_LIMIT = 8
|
|
|
|
def __init__(self, token):
|
|
self.logger = logging.getLogger("dudle")
|
|
|
|
persistence = PicklePersistence(filename="dudlebot.pickle")
|
|
|
|
self.updater = Updater(token, persistence=persistence, use_context=True)
|
|
dispatcher = self.updater.dispatcher
|
|
|
|
dispatcher.add_handler(CommandHandler("start", self.tg_start))
|
|
dispatcher.add_handler(CommandHandler("newplan", self.tg_newplan))
|
|
dispatcher.add_handler(CommandHandler("plan", self.tg_plan))
|
|
dispatcher.add_handler(CommandHandler("show", self.tg_show))
|
|
dispatcher.add_handler(CommandHandler("best", self.tg_best))
|
|
dispatcher.add_handler(CommandHandler("endplan", self.tg_end))
|
|
|
|
def tg_start(self, update, _):
|
|
update.message.reply_text("Hello! May luck be with you and your plans!")
|
|
|
|
def tg_newplan(self, update, context):
|
|
if context.chat_data.get("plan", None):
|
|
self.tg_end(update, context)
|
|
|
|
today = datetime.today()
|
|
|
|
periods = {
|
|
"thisweek": today - timedelta(days=today.weekday()),
|
|
"nextweek": today + timedelta(days=7) - timedelta(days=today.weekday()),
|
|
}
|
|
|
|
period = update.message.text.partition(" ")[2]
|
|
if not period:
|
|
period = "nextweek"
|
|
|
|
if period not in periods:
|
|
update.message.reply_text("Sorry man, I just don't understand.")
|
|
return
|
|
|
|
context.chat_data["plan"] = Plan(
|
|
start=periods[period], duration=timedelta(days=7), entries={} # TODO
|
|
)
|
|
|
|
self._reply_with_plan(update, context)
|
|
|
|
def tg_plan(self, update, context):
|
|
if not context.chat_data.get("plan", None):
|
|
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
|
|
return
|
|
|
|
responses_str = " ".join(update.message.text.split(" ")[1:])
|
|
responses_str = re.sub(r"[^A-Za-z?]", "", responses_str)
|
|
responses_str = [char.upper() for char in responses_str]
|
|
|
|
if len(responses_str) == 0:
|
|
update.message.reply_text(
|
|
"Did you forget something? (Ex. usage: /plan Y N ? Y N Y Y)"
|
|
)
|
|
return
|
|
|
|
responses_str_padded = [
|
|
responses_str[i] if i < len(responses_str) else "?"
|
|
for i in range(context.chat_data["plan"].duration.days)
|
|
]
|
|
|
|
responses = []
|
|
for char in responses_str_padded:
|
|
if char == "Y":
|
|
responses.append(PlanResponse.YES)
|
|
elif char == "N":
|
|
responses.append(PlanResponse.NO)
|
|
else:
|
|
responses.append(PlanResponse.UNKNOWN)
|
|
|
|
user = update.message.from_user
|
|
name = user.username or (
|
|
(user.first_name[0] + ". ") if user.first_name else ""
|
|
) + (user.last_name or "")
|
|
|
|
context.chat_data["plan"].entries[user.id] = PlanEntry(
|
|
name=name, responses=responses
|
|
)
|
|
|
|
self._reply_with_plan(update, context)
|
|
|
|
def tg_show(self, update, context):
|
|
if not context.chat_data.get("plan", None):
|
|
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
|
|
return
|
|
|
|
self._reply_with_plan(update, context)
|
|
|
|
def tg_best(self, update, context):
|
|
if not context.chat_data.get("plan", None):
|
|
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
|
|
return
|
|
|
|
update.message.reply_text(self._best_of_plan(context.chat_data["plan"]))
|
|
|
|
def tg_end(self, update, context):
|
|
if not context.chat_data.get("plan", None):
|
|
update.message.reply_text("No plan created yet! (Speak /newplan to do so.)")
|
|
return
|
|
|
|
plan = context.chat_data.pop("plan")
|
|
response = "Poll ended!"
|
|
|
|
if len(plan.entries) > 0:
|
|
response += "\n\n" + self._best_of_plan(plan)
|
|
|
|
update.message.reply_text(response)
|
|
|
|
def _reply_with_plan(self, update, context):
|
|
plan: Plan = context.chat_data["plan"]
|
|
|
|
formatted_plan = (
|
|
f"Poll: {plan.start.strftime('%d.%m.%Y')} -> "
|
|
f"{(plan.start + plan.duration).strftime('%d.%m.%Y')}"
|
|
)
|
|
formatted_plan += "\n\n"
|
|
formatted_plan += "```\n"
|
|
|
|
days = [plan.start + timedelta(days=i) for i in range(plan.duration.days)]
|
|
formatted_plan += f"{'|'.join([day.strftime('%A')[:2] for day in days])}"
|
|
formatted_plan += "\n"
|
|
|
|
entries_with_content = [
|
|
entry
|
|
for entry in plan.entries.values()
|
|
if any(response != PlanResponse.NO for response in entry.responses)
|
|
]
|
|
|
|
if len(entries_with_content) == 0:
|
|
entries = [PlanEntry(name="???", responses=[])]
|
|
else:
|
|
entries = entries_with_content
|
|
|
|
for entry in entries:
|
|
responses = [
|
|
entry.responses[i] if i < len(entry.responses) else PlanResponse.UNKNOWN
|
|
for i in range(plan.duration.days)
|
|
]
|
|
|
|
for response in responses:
|
|
formatted_plan += f"{str(response)} "
|
|
|
|
formatted_plan += (
|
|
entry.name
|
|
if len(entry.name) <= self.LINE_LIMIT
|
|
else entry.name[: self.LINE_LIMIT - 1].strip() + "…"
|
|
)
|
|
formatted_plan += "\n"
|
|
|
|
formatted_plan += "```"
|
|
|
|
update.message.reply_text(
|
|
formatted_plan, parse_mode=telegram.ParseMode.MARKDOWN
|
|
)
|
|
|
|
@staticmethod
|
|
def _best_of_plan(plan):
|
|
days = [plan.start + timedelta(days=i) for i in range(plan.duration.days)]
|
|
results = [(0, 0)] * plan.duration.days
|
|
for entry in plan.entries.values():
|
|
for idx, response in enumerate(entry.responses):
|
|
if response == PlanResponse.YES:
|
|
results[idx] = (results[idx][0] + 1, results[idx][1])
|
|
elif response == PlanResponse.UNKNOWN:
|
|
results[idx] = (results[idx][0], results[idx][1] + 1)
|
|
|
|
sorted_days = list(zip(days, results)) # "Sorted" by date
|
|
sorted_days.sort(
|
|
key=lambda dwr: dwr[1][1], reverse=True
|
|
) # First sort by unknowns
|
|
sorted_days.sort(
|
|
key=lambda dwr: dwr[1][0], reverse=True
|
|
) # Then sort by positive answers
|
|
|
|
best_days = []
|
|
last_result, different_results = None, 0
|
|
for idx, day_with_result in enumerate(sorted_days):
|
|
day, result = day_with_result
|
|
yes, unknown = result
|
|
if yes == 0 or (idx > 3 and different_results > 1):
|
|
break
|
|
|
|
best_days.append(day_with_result)
|
|
|
|
if result != last_result:
|
|
different_results += 1
|
|
last_result = result
|
|
|
|
if len(best_days) > 0:
|
|
response = (
|
|
f"Best {len(best_days)} result{'s' if len(best_days) > 1 else ''}:\n\n"
|
|
)
|
|
for idx, day_with_result in enumerate(best_days):
|
|
day, result = day_with_result
|
|
yes, unknown = result
|
|
result_fmt = str(yes) + (f" + {unknown}?" if unknown > 0 else "")
|
|
# result_fmt = str(yes)
|
|
# if unknown > 0:
|
|
# same_results = [result for _, result in best_days if result[0] == yes]
|
|
# if any(same_unknown != unknown for _, same_unknown in same_results):
|
|
# result_fmt += f" + {unknown}?"
|
|
response += f"{idx + 1}. {day.strftime('%A %-d/%-m')} ({result_fmt})\n"
|
|
return response
|
|
else:
|
|
return "There has been no resolution."
|
|
|
|
def start(self):
|
|
self.logger.info("Starting DudleBot...")
|
|
self.updater.start_polling()
|
|
self.updater.idle()
|
|
|
|
|
|
class PlanResponse(Enum):
|
|
YES = 1
|
|
NO = 2
|
|
UNKNOWN = 3
|
|
|
|
def __str__(self):
|
|
return {self.YES: "Y", self.NO: "N", self.UNKNOWN: "?"}[self]
|
|
|
|
|
|
@dataclass
|
|
class PlanEntry:
|
|
name: str
|
|
responses: List[PlanResponse]
|
|
|
|
|
|
@dataclass
|
|
class Plan:
|
|
start: datetime
|
|
duration: timedelta
|
|
entries: Dict[int, PlanEntry]
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
|
)
|
|
|
|
config = configparser.ConfigParser()
|
|
config.read("dudlebot.ini")
|
|
|
|
dudlebot = DudleBot(config.get("general", "token"))
|
|
dudlebot.start()
|