"""BeeBot: a Telegram bot that sends a restaurant menu as a generated image.

The menu is scraped from the EPFL restaurants page (see ``BeeBot.MENU_URL``),
stored as JSON next to this file, and rendered to a PNG with the ``typst``
command-line tool. Per-user preferences are kept in a SQLite database.
"""

import asyncio
import datetime
import json
import logging
import os
import sqlite3
import subprocess
from contextlib import closing
from sqlite3 import Connection
from typing import Optional

import requests
import telegram.constants
from bs4 import BeautifulSoup
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, CallbackQueryHandler

# Log directory comes from the environment, with a fixed fallback.
log_dir = os.getenv("BEEBOT_LOGS") or "/mnt/logs"
# makedirs also creates missing parents and tolerates an existing directory.
os.makedirs(log_dir, exist_ok=True)

logging.basicConfig(
    format="[%(asctime)s] %(name)s/%(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[
        # One log file per process start, named after the start time.
        logging.FileHandler(
            os.path.join(
                log_dir,
                datetime.datetime.today().strftime("%Y%m%d-%H%M%S.log")
            )
        ),
        logging.StreamHandler()
    ]
)
logging.getLogger("httpx").setLevel(logging.WARNING)

logger = logging.getLogger(__name__)


def flag(code):
    """Return the flag emoji for a two-letter country code.

    Each letter is shifted into the Unicode regional-indicator range
    (127462 is the code point of regional indicator "A"); the pair of
    indicators renders as a flag.
    """
    offset = 127462 - ord('A')
    first, second = code.upper()[0], code.upper()[1]
    return chr(ord(first) + offset) + chr(ord(second) + offset)


class BeeBot:
    """Telegram bot serving the daily / weekly menu as an image."""

    MENU_URL = "https://www.epfl.ch/campus/restaurants-shops-hotels/fr/industrie21-epfl-valais-wallis/?date={date}"
    RESTO_ID = 545
    # abspath keeps the paths valid even if __file__ is relative.
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    CACHE_PATH = os.path.join(BASE_DIR, "cache.json")
    DB_PATH = os.path.join(BASE_DIR, "database.db")
    TEMPLATE_PATH = os.path.join(BASE_DIR, "menu.typ")
    IMG_DIR = "/tmp/menu_images"
    # Seconds before an HTTP request to the menu page is abandoned.
    HTTP_TIMEOUT = 30
    LANGUAGES = {
        "en": {
            "emoji": flag("gb"),
            "name": "English"
        },
        "fr": {
            "emoji": flag("fr"),
            "name": "Français"
        },
        "de": {
            "emoji": flag("de"),
            "name": "Deutsch"
        }
    }
    CATEGORIES = {
        "E": {
            "name": "Étudiant"
        },
        "D": {
            "name": "Doctorant"
        },
        "C": {
            "name": "Campus"
        },
        "V": {
            "name": "Visiteur"
        }
    }

    def __init__(self, token: str):
        """Store the Telegram token and create empty runtime state.

        No I/O happens here; the database and cache are opened in
        :meth:`mainloop`.
        """
        self.tg_token: str = token
        self.tg_app = None
        # Maps a menu/image id to the date ("%Y-%m-%d") it was last produced.
        self.cache: dict[str, str] = {}
        self.db_con: Optional[Connection] = None
        self.locks: dict[str, bool] = {}
        # Serialize menu downloads and image generation across handlers.
        self.fetch_lock = asyncio.Lock()
        self.gen_lock = asyncio.Lock()

    def mainloop(self):
        """Open the database, load the cache, register handlers and poll.

        Blocks until polling stops; the database connection is closed on exit.
        """
        logger.info("Initialization")
        self.db_con = sqlite3.connect(self.DB_PATH)
        try:
            self.check_database()
            self.load_cache()

            app = ApplicationBuilder().token(self.tg_token).build()
            self.tg_app = app
            app.add_handler(CommandHandler("week", self.cmd_week))
            app.add_handler(CommandHandler("today", self.cmd_today))
            app.add_handler(CommandHandler("settings", self.cmd_settings))
            app.add_handler(CallbackQueryHandler(self.cb_handler))

            logger.info("Starting bot")
            app.run_polling()
        finally:
            self.db_con.close()

    async def cmd_week(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /week: send the menu image for the current week."""
        logger.debug("Received /week")
        await self.request_menu(update, context, False)

    async def cmd_today(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /today: send the menu image for today."""
        logger.debug("Received /today")
        await self.request_menu(update, context, True)

    async def cmd_settings(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /settings: send the preferences keyboard."""
        logger.debug("Received /settings")
        reply_markup = InlineKeyboardMarkup(self.get_settings_menu(update))
        await update.effective_chat.send_message(text="Your preferences", reply_markup=reply_markup)

    async def cb_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Dispatch an inline-keyboard callback to the matching handler."""
        query = update.callback_query
        # Always acknowledge the callback, otherwise the client keeps
        # showing a progress indicator on the pressed button.
        await query.answer()

        data = query.data or ""
        if data == "change_language":
            await self.cb_change_language(update, context)
        elif data.startswith("set_language"):
            await self.cb_set_language(update, context)
        elif data == "change_categories":
            await self.cb_change_categories(update, context)
        elif data.startswith("toggle_category"):
            await self.cb_toggle_category(update, context)
        elif data == "back_to_settings":
            reply_markup = InlineKeyboardMarkup(self.get_settings_menu(update))
            await update.effective_message.edit_text(text="Your preferences", reply_markup=reply_markup)

    async def cb_change_language(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Replace the current message with the language selection keyboard."""
        logger.debug("Clicked 'Change language'")
        reply_markup = InlineKeyboardMarkup(self.get_language_menu(update))
        await update.effective_message.edit_text(text="Choose a language", reply_markup=reply_markup)

    async def cb_set_language(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Persist the language from ``set_language:<code>`` and refresh the keyboard."""
        lang = update.callback_query.data.partition(":")[2]
        logger.debug("Clicked 'Set language to %s'", lang)

        if lang not in self.LANGUAGES:
            logger.warning("Ignoring unknown language %r", lang)
            return

        # Also guarantees the user row exists before the UPDATE below.
        prefs = self.get_user_pref(update)
        if prefs["lang"] == lang:
            # Nothing would change; re-sending an identical keyboard makes
            # Telegram reject the edit ("Message is not modified").
            return

        with closing(self.db_con.cursor()) as cur:
            cur.execute(
                "UPDATE user SET lang=? WHERE telegram_id=?",
                (lang, update.effective_user.id)
            )
            self.db_con.commit()

        reply_markup = InlineKeyboardMarkup(self.get_language_menu(update))
        await update.effective_message.edit_text(text="Choose a language", reply_markup=reply_markup)

    async def cb_change_categories(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Replace the current message with the price-category keyboard."""
        logger.debug("Clicked 'Change categories'")
        reply_markup = InlineKeyboardMarkup(self.get_categories_menu(update))
        await update.effective_message.edit_text(text="Choose the price categories to display", reply_markup=reply_markup)

    async def cb_toggle_category(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Toggle the category from ``toggle_category:<code>`` and refresh the keyboard."""
        category = update.callback_query.data.partition(":")[2]
        logger.debug("Clicked 'Toggle category %s'", category)

        if category not in self.CATEGORIES:
            logger.warning("Ignoring unknown category %r", category)
            return

        categories: set[str] = self.get_user_pref(update)["categories"]
        # Symmetric difference with a singleton flips membership.
        categories ^= {category}

        with closing(self.db_con.cursor()) as cur:
            cur.execute(
                "UPDATE user SET categories=? WHERE telegram_id=?",
                # Sorted so the stored value does not depend on set ordering.
                (",".join(sorted(categories)), update.effective_user.id)
            )
            self.db_con.commit()

        reply_markup = InlineKeyboardMarkup(self.get_categories_menu(update))
        await update.effective_message.edit_text(text="Choose the price categories to display", reply_markup=reply_markup)

    def _is_stale(self, path: str, obj_id: str, today_only: bool) -> bool:
        """Return True if *path* is missing or *obj_id* is outdated in the cache."""
        return not os.path.exists(path) or self.is_outdated(obj_id, today_only)

    async def request_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE, today_only: bool) -> None:
        """Send the menu image, refreshing the menu data and image if needed.

        Args:
            update: Incoming update; its chat receives the image.
            context: Handler context providing the bot.
            today_only: True for today's menu, False for the whole week.
        """
        chat_id = update.effective_chat.id
        await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.TYPING)

        prefs = self.get_user_pref(update)
        logger.debug("User prefs: %s", prefs)
        categories = prefs["categories"]
        img_id = self.get_img_id(today_only, categories)
        img_path = os.path.join(self.IMG_DIR, img_id + ".png")

        menu_id = "today_menu" if today_only else "week_menu"
        menu_path = os.path.join(self.BASE_DIR, "menus_today.json" if today_only else "menus_week.json")

        # If menu needs to be fetched
        if self._is_stale(menu_path, menu_id, today_only):
            # Notify user. effective_message also covers edited messages,
            # for which update.message is None.
            msg = await update.effective_message.reply_text("The menu is being updated, please wait...")
            try:
                async with self.fetch_lock:
                    # Re-check: another request may have refreshed the menu
                    # while this one was waiting for the lock.
                    if self._is_stale(menu_path, menu_id, today_only):
                        if today_only:
                            await self.fetch_today_menu()
                        else:
                            await self.fetch_week_menu()
            finally:
                await msg.delete()

        # If image needs to be (re)generated
        if self._is_stale(img_path, img_id, today_only):
            async with self.gen_lock:
                if self._is_stale(img_path, img_id, today_only):
                    try:
                        await self.gen_image(today_only, categories, img_path, img_id)
                    except subprocess.CalledProcessError:
                        logger.exception("Menu image generation failed")
                        await update.effective_message.reply_text(
                            "Sorry, the menu image could not be generated."
                        )
                        return

        await context.bot.send_photo(chat_id=chat_id, photo=img_path)

    def check_database(self):
        """Create the ``user`` table if it does not exist yet."""
        with closing(self.db_con.cursor()) as cur:
            cur.execute("""
            CREATE TABLE IF NOT EXISTS "user" (
                "id" INTEGER,
                "telegram_id" INTEGER NOT NULL UNIQUE,
                "categories" TEXT,
                "lang" TEXT NOT NULL DEFAULT 'fr',
                "created_at" TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY("id" AUTOINCREMENT)
            );
            """)
            self.db_con.commit()

    def get_user_pref(self, update: Update) -> dict:
        """Return the preferences of the user who sent *update*.

        Unknown users are inserted with default preferences first.

        Returns:
            A dict with ``"categories"`` (set of category codes) and
            ``"lang"`` (language code).
        """
        user_id = update.effective_user.id
        with closing(self.db_con.cursor()) as cur:
            user = cur.execute(
                "SELECT categories, lang FROM user WHERE telegram_id=?",
                (user_id,)
            ).fetchone()

        if user is None:
            self.create_user(user_id)
            return {
                "categories": {"E", "D", "C", "V"},
                "lang": "fr"
            }

        # The column is nullable and is stored as "" when every category is
        # disabled; "".split(",") would yield {""}, so drop empty items.
        categories = {code for code in (user[0] or "").split(",") if code}
        return {
            "categories": categories,
            "lang": user[1]
        }

    def create_user(self, telegram_id: int) -> None:
        """Insert a user row with all categories enabled and language ``fr``."""
        logger.debug("New user with id %s", telegram_id)
        with closing(self.db_con.cursor()) as cur:
            cur.execute(
                "INSERT INTO user (telegram_id, categories, lang) VALUES (?, ?, ?)",
                (telegram_id, "E,D,C,V", "fr")
            )
            self.db_con.commit()

    def load_cache(self):
        """Load the cache file into ``self.cache`` if it exists.

        An unreadable or non-dict cache is discarded, which only forces the
        menus and images to be regenerated.
        """
        if not os.path.exists(self.CACHE_PATH):
            return
        try:
            with open(self.CACHE_PATH, "r") as f:
                loaded = json.load(f)
        except (OSError, json.JSONDecodeError):
            logger.warning("Cache file %s is unreadable, starting empty", self.CACHE_PATH)
            return
        if isinstance(loaded, dict):
            self.cache = loaded
        else:
            logger.warning("Cache file %s has an unexpected format, starting empty", self.CACHE_PATH)

    def save_cache(self):
        """Write ``self.cache`` to the cache file as indented JSON."""
        with open(self.CACHE_PATH, "w") as f:
            json.dump(self.cache, f, indent=4)

    @staticmethod
    def get_img_id(today_only: bool, categories: set[str]) -> str:
        """Build the image id ``<layout>-<sorted category codes>``."""
        layout = "today" if today_only else "week"
        return f"{layout}-{''.join(sorted(categories))}"

    def is_outdated(self, obj_id: str, today_only: bool) -> bool:
        """Tell whether the cached object *obj_id* must be regenerated.

        Args:
            obj_id: Key in ``self.cache``.
            today_only: If True the object is valid for its generation day
                only; otherwise it is valid until the Monday of a later week.

        Returns:
            True if the object is unknown, has an unparsable date, or was
            produced on an earlier day (daily) or in an earlier week (weekly).
        """
        stamp = self.cache.get(obj_id)
        if stamp is None:
            return True
        try:
            gen_date = datetime.datetime.strptime(stamp, "%Y-%m-%d").date()
        except (TypeError, ValueError):
            # A corrupt entry is treated as missing.
            return True

        today = datetime.date.today()
        if today_only:
            return today > gen_date

        # Compare the Mondays of the two weeks.
        gen_monday = gen_date - datetime.timedelta(days=gen_date.weekday())
        cur_monday = today - datetime.timedelta(days=today.weekday())
        return cur_monday > gen_monday

    async def gen_image(self, today_only: bool, categories: set[str], outpath: str, img_id: str) -> None:
        """Render the menu image with typst and record it in the cache.

        Args:
            today_only: Selects ``menus_today.json`` or ``menus_week.json``.
            categories: Category codes passed to the template.
            outpath: Destination path of the PNG.
            img_id: Cache key to mark as freshly generated.

        Raises:
            subprocess.CalledProcessError: If typst exits with a non-zero
                status; the cache is left untouched in that case.
        """
        os.makedirs(self.IMG_DIR, exist_ok=True)

        logger.info(f"Generating {'today' if today_only else 'week'} menu image for categories {categories}")
        menu_path = "menus_today.json" if today_only else "menus_week.json"
        command = [
            "typst",
            "compile",
            "--root", self.BASE_DIR,
            "--font-path", os.path.join(self.BASE_DIR, "fonts"),
            "--input", f"path={menu_path}",
            "--input", f"categories={','.join(sorted(categories))}",
            self.TEMPLATE_PATH,
            outpath
        ]
        # Run in a worker thread so the event loop keeps serving other
        # updates; check=True surfaces a failed compilation.
        await asyncio.to_thread(subprocess.run, command, check=True)

        self.cache[img_id] = datetime.datetime.today().strftime("%Y-%m-%d")
        self.save_cache()

    def save_menu(self, days: list, menu_id: str, filename: str) -> None:
        """Write *days* as JSON to *filename* in BASE_DIR and stamp *menu_id* in the cache."""
        with open(os.path.join(self.BASE_DIR, filename), "w") as f:
            json.dump(days, f)
        self.cache[menu_id] = datetime.datetime.today().strftime("%Y-%m-%d")
        self.save_cache()

    async def fetch_week_menu(self) -> None:
        """Fetch the menus for Monday to Friday of the current week and save them."""
        logger.info("Fetching week menu")
        today = datetime.datetime.today()
        monday = today - datetime.timedelta(days=today.weekday())
        days = []
        for day_offset in range(5):
            date = monday + datetime.timedelta(days=day_offset)
            days.append({
                "date": date.strftime("%Y-%m-%d"),
                "menus": await self.fetch_menu(date)
            })
        self.save_menu(days, "week_menu", "menus_week.json")

    async def fetch_today_menu(self) -> None:
        """Fetch today's menus and save them."""
        logger.info("Fetching today menu")
        today = datetime.datetime.today()
        days = [{
            "date": today.strftime("%Y-%m-%d"),
            "menus": await self.fetch_menu(today)
        }]
        self.save_menu(days, "today_menu", "menus_today.json")

    @staticmethod
    def _parse_menu_line(line) -> dict:
        """Extract name, extra lines and prices from one menu table row."""
        descr = line.find("td", class_="menu").find("div", class_="descr")
        menu_lines = descr.contents[0].text.split("\n")

        prices = {}
        for cell in line.find("td", class_="prices").find_all("span", class_="price"):
            category = cell.find("abbr").text
            # The amount is the last node of the cell, e.g. "12.50 CHF".
            amount = cell.contents[-1].replace("CHF", "").strip()
            prices[category] = float(amount)

        return {
            "name": menu_lines[0],
            "extra": menu_lines[1:],
            "prices": prices
        }

    async def fetch_menu(self, date: datetime.date) -> list:
        """Download and parse the menus of restaurant ``RESTO_ID`` for *date*.

        Returns:
            A list of dicts with keys ``"name"``, ``"extra"`` and ``"prices"``;
            empty if the page does not answer 200 or has no menu table.

        Raises:
            requests.RequestException: On network errors or timeout.
        """
        url = self.MENU_URL.format(date=date.strftime("%Y-%m-%d"))
        headers = {
            "User-Agent": "BeeBot 1.0"
        }
        # requests is blocking: run it in a worker thread, with a timeout so
        # a stalled server cannot hang the request forever.
        res = await asyncio.to_thread(
            requests.get, url, headers=headers, timeout=self.HTTP_TIMEOUT
        )
        if res.status_code != 200:
            return []

        bs = BeautifulSoup(res.content, features="lxml")
        table = bs.find("table", {"id": "menuTable"})
        tbody = table.find("tbody") if table is not None else None
        if tbody is None:
            logger.warning("No menu table found at %s", url)
            return []

        menus = []
        for line in tbody.find_all("tr", {"data-restoid": str(self.RESTO_ID)}):
            try:
                menus.append(self._parse_menu_line(line))
            except (AttributeError, IndexError, TypeError, ValueError):
                # Skip a row with unexpected markup instead of losing the
                # whole menu.
                logger.warning("Skipping unparsable menu row at %s", url, exc_info=True)
        return menus

    def get_settings_menu(self, update: Update) -> list[list[InlineKeyboardButton]]:
        """Build the top-level settings keyboard showing current preferences."""
        prefs = self.get_user_pref(update)
        # Sorted so the label is stable across calls.
        categories_label = " / ".join(sorted(prefs["categories"]))
        return [
            [
                InlineKeyboardButton(f"Language: {prefs['lang']}", callback_data="change_language")
            ],
            [
                InlineKeyboardButton(f"Categories: {categories_label}", callback_data="change_categories")
            ]
        ]

    @staticmethod
    def _grid_with_back(buttons: list) -> list:
        """Lay *buttons* out two per row and append a 'Back to settings' row."""
        menu = [buttons[i:i + 2] for i in range(0, len(buttons), 2)]
        menu.append([
            InlineKeyboardButton("Back to settings", callback_data="back_to_settings")
        ])
        return menu

    def get_language_menu(self, update: Update) -> list[list[InlineKeyboardButton]]:
        """Build the language keyboard; the current language is check-marked."""
        current = self.get_user_pref(update)["lang"]
        buttons = [
            InlineKeyboardButton(
                data["emoji"] + (" ✅" if current == lang else ""),
                callback_data=f"set_language:{lang}"
            )
            for lang, data in self.LANGUAGES.items()
        ]
        return self._grid_with_back(buttons)

    def get_categories_menu(self, update: Update) -> list[list[InlineKeyboardButton]]:
        """Build the category keyboard; each button shows its on/off state."""
        enabled = self.get_user_pref(update)["categories"]
        buttons = [
            InlineKeyboardButton(
                data["name"] + (" ✅" if categ in enabled else " ❌"),
                callback_data=f"toggle_category:{categ}"
            )
            for categ, data in self.CATEGORIES.items()
        ]
        return self._grid_with_back(buttons)


if __name__ == '__main__':
    logger.info("Welcome to BeeBot !")
    token = os.getenv("TELEGRAM_TOKEN")
    if not token:
        # Fail early with a clear message instead of passing None on.
        raise SystemExit("TELEGRAM_TOKEN environment variable is not set")
    bot = BeeBot(token)
    bot.mainloop()