initial commit

This commit is contained in:
2024-09-14 02:06:51 +02:00
commit d9ae6527a1
13 changed files with 704 additions and 0 deletions

263
src/beebot.py Normal file
View File

@ -0,0 +1,263 @@
import asyncio
import datetime
import json
import logging
import os
import sqlite3
import subprocess
from sqlite3 import Connection
from typing import Optional
import requests
import telegram.constants
from bs4 import BeautifulSoup
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
logging.basicConfig(
format="[%(asctime)s] %(name)s/%(levelname)s - %(message)s",
level=logging.INFO
)
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class BeeBot:
MENU_URL = "https://www.epfl.ch/campus/restaurants-shops-hotels/fr/industrie21-epfl-valais-wallis/?date={date}"
RESTO_ID = 545
BASE_DIR = os.path.dirname(__file__)
CACHE_PATH = os.path.join(BASE_DIR, "cache.json")
DB_PATH = os.path.join(BASE_DIR, "database.db")
TEMPLATE_PATH = os.path.join(BASE_DIR, "menu.typ")
IMG_DIR = "/tmp/menu_images"
def __init__(self, token: str):
self.tg_token: str = token
self.tg_app = None
self.cache: dict[str, str] = {}
self.db_con: Optional[Connection] = None
self.locks: dict[str, bool] = {}
self.fetch_lock = asyncio.Lock()
self.gen_lock = asyncio.Lock()
def mainloop(self):
logger.info("Initialization")
self.db_con = sqlite3.connect(self.DB_PATH)
self.check_database()
self.load_cache()
app = ApplicationBuilder().token(self.tg_token).build()
app.add_handler(CommandHandler("week", self.cmd_week))
app.add_handler(CommandHandler("today", self.cmd_today))
logger.info("Starting bot")
app.run_polling()
async def cmd_week(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.debug("Received /week")
await self.request_menu(update, context, False)
async def cmd_today(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.debug("Received /today")
await self.request_menu(update, context, True)
async def request_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE, today_only: bool) -> None:
chat_id = update.effective_chat.id
await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.TYPING)
prefs = self.get_user_pref(update)
logger.debug(f"User prefs: {prefs}")
categories = prefs["categories"]
img_id = self.get_img_id(today_only, categories)
filename = img_id + ".png"
img_path = os.path.join(self.IMG_DIR, filename)
menu_id = "today_menu" if today_only else "week_menu"
menu_path = os.path.join(self.BASE_DIR, "menus_today.json" if today_only else "menus_week.json")
# If menu needs to be fetched
if not os.path.exists(menu_path) or self.is_outdated(menu_id, today_only):
# Notify user
msg = await update.message.reply_text("The menu is being updated, please wait...")
async with self.fetch_lock:
if today_only:
await self.fetch_today_menu()
else:
await self.fetch_week_menu()
await msg.delete()
# If image needs to be (re)generated
if not os.path.exists(img_path) or self.is_outdated(img_id, today_only):
await self.gen_image(today_only, categories, img_path, img_id)
await context.bot.send_photo(chat_id=chat_id, photo=img_path)
def check_database(self):
cur = self.db_con.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS "user" (
"id" INTEGER,
"telegram_id" INTEGER NOT NULL UNIQUE,
"categories" TEXT,
"created_at" TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY("id" AUTOINCREMENT)
);
""")
self.db_con.commit()
cur.close()
def get_user_pref(self, update: Update) -> dict:
user_id = update.effective_user.id
cur = self.db_con.cursor()
res = cur.execute("SELECT categories FROM user WHERE telegram_id=?", (user_id,))
user = res.fetchone()
cur.close()
if user is None:
self.create_user(user_id)
return {
"categories": {"E", "D", "C", "V"}
}
categories = set(user[0].split(","))
return {
"categories": categories
}
def create_user(self, telegram_id: int) -> None:
logger.debug(f"New user with id {telegram_id}")
cur = self.db_con.cursor()
cur.execute(
"INSERT INTO user (telegram_id, categories) VALUES (?, ?)",
(telegram_id, "E,D,C,V")
)
self.db_con.commit()
cur.close()
def load_cache(self):
if os.path.exists(self.CACHE_PATH):
with open(self.CACHE_PATH, "r") as f:
self.cache = json.load(f)
def save_cache(self):
with open(self.CACHE_PATH, "w") as f:
json.dump(self.cache, f)
@staticmethod
def get_img_id(today_only: bool, categories: set[str]) -> str:
categs = "".join(sorted(categories))
layout = "today" if today_only else "week"
return f"{layout}-{categs}"
def is_outdated(self, obj_id: str, today_only: bool) -> bool:
if obj_id not in self.cache:
return True
gen_date = datetime.datetime.strptime(self.cache[obj_id], "%Y-%m-%d")
today = datetime.datetime.today()
if today_only:
if (today - gen_date).days > 0:
return True
else:
delta1 = datetime.timedelta(days=gen_date.weekday())
delta2 = datetime.timedelta(days=today.weekday())
gen_monday = gen_date - delta1
cur_monday = today - delta2
if (cur_monday - gen_monday).days > 0:
return True
return False
async def gen_image(self, today_only: bool, categories: set[str], outpath: str, img_id: str) -> None:
if not os.path.exists(self.IMG_DIR):
os.mkdir(self.IMG_DIR)
logger.info(f"Generating {'today' if today_only else 'week'} menu image for categories {categories}")
menu_path = "menus_today.json" if today_only else "menus_week.json"
subprocess.call([
"typst",
"compile",
"--root", self.BASE_DIR,
"--font-path", os.path.join(self.BASE_DIR, "fonts"),
"--input", f"path={menu_path}",
"--input", f"categories={','.join(categories)}",
self.TEMPLATE_PATH,
outpath
])
self.cache[img_id] = datetime.datetime.today().strftime("%Y-%m-%d")
self.save_cache()
def save_menu(self, days: list, menu_id: str, filename: str) -> None:
with open(os.path.join(self.BASE_DIR, filename), "w") as f:
json.dump(days, f)
self.cache[menu_id] = datetime.datetime.today().strftime("%Y-%m-%d")
self.save_cache()
async def fetch_week_menu(self) -> None:
logger.info("Fetching week menu")
today = datetime.datetime.today()
delta = datetime.timedelta(days=today.weekday())
monday = today - delta
days = []
for i in range(5):
dt = datetime.timedelta(days=i)
date = monday + dt
menus = await self.fetch_menu(date)
days.append({
"date": date.strftime("%Y-%m-%d"),
"menus": menus
})
self.save_menu(days, "week_menu", "menus_week.json")
async def fetch_today_menu(self) -> None:
logger.info("Fetching today menu")
today = datetime.datetime.today()
menus = await self.fetch_menu(today)
days = [{
"date": today.strftime("%Y-%m-%d"),
"menus": menus
}]
self.save_menu(days, "today_menu", "menus_today.json")
async def fetch_menu(self, date: datetime.date) -> list:
url = self.MENU_URL.format(date=date.strftime("%Y-%m-%d"))
headers = {
"User-Agent": "BeeBot 1.0"
}
res = requests.get(url, headers=headers)
if res.status_code != 200:
return []
bs = BeautifulSoup(res.content, features="lxml")
table = bs.find("table", {"id": "menuTable"})
lines = table.find("tbody").findAll("tr", {"data-restoid": str(self.RESTO_ID)})
menus = []
for line in lines:
menu = line.find("td", class_="menu").find("div", class_="descr").contents[0].text
menu_lines = menu.split("\n")
price_cells = line.find("td", class_="prices").findAll("span", class_="price")
prices = {}
for cell in price_cells:
category = cell.find("abbr").text
price = cell.contents[-1].replace("CHF", "").strip()
prices[category] = float(price)
menus.append({
"name": menu_lines[0],
"extra": menu_lines[1:],
"prices": prices
})
return menus
if __name__ == '__main__':
logger.info("Welcome to BeeBot !")
bot = BeeBot(os.getenv("TELEGRAM_TOKEN"))
bot.mainloop()

154
src/menu.typ Normal file
View File

@ -0,0 +1,154 @@
#let path = sys.inputs.at("path", default: "menus.json")
#let categories = sys.inputs.at("categories", default: "E,D,C,V").split(",")
#let days = json(path)
#let nbsp = sym.space.nobreak
#let width = if days.len() > 1 {20cm} else {10cm}
#set page(margin: 0cm, height: auto, width: width)
#let parse-date(date-txt) = {
let (year, month, day) = date-txt.split("-")
return datetime(year: int(year), month: int(month), day: int(day))
}
#let fmt-number(
number,
decimals: 2,
decimal-sep: ".",
thousands-sep: "'"
) = {
let integer = calc.trunc(number)
let decimal = calc.fract(number)
let res = str(integer).clusters()
.rev()
.chunks(3)
.map(c => c.join(""))
.join(thousands-sep)
.rev()
if decimals != 0 {
res += decimal-sep
decimal = decimal * calc.pow(10, decimals)
decimal = calc.round(decimal)
decimal = str(decimal) + ("0" * decimals)
res += decimal.slice(0, decimals)
}
return res
}
#let day-names = (
"Lundi",
"Mardi",
"Mercredi",
"Jeudi",
"Vendredi",
"Samedi",
"Dimanche"
)
#let display-menus(
days,
categories,
day-sep-stroke: rgb("#82BC00") + 1pt,
title-underline-stroke: rgb("#e34243") + 2pt,
menu-sep-stroke: rgb("#ababab") + 1pt,
price-categ-color: rgb("#577F25")
) = {
set text(font: "Liberation Sans", hyphenate: true)
let cols = calc.min(2, days.len())
let cells = ()
for day in days {
let weekday = parse-date(day.date).weekday() - 1
let day-name = day-names.at(weekday)
let menus = ()
for menu in day.menus {
let price-cell = grid.cell(rowspan: 2)[]
let prices = menu.at(
"prices",
default: (:)
)
if menu.keys().contains("price") {
prices = ("": menu.price)
}
let pairs = prices.pairs()
.filter(p => p.first() in categories)
.sorted(key: p => p.last())
let prices = pairs.map(p => {
let categ = emph(
text(
fill: price-categ-color,
p.first()
)
)
let price = fmt-number(p.last())
categ + ":" + nbsp + "CHF" + nbsp + price
})
price-cell = grid.cell(
rowspan: menu.extra.len() + 1,
stack(dir: ttb, spacing: .4em, ..prices)
)
//let price = "CHF" + sym.space.nobreak + str(menu.price)
menus.push(
grid(
columns: (1fr, auto),
column-gutter: .4em,
row-gutter: .4em,
[*#menu.name*],
price-cell,
..menu.extra.map(l => text(10pt, l))
)
)
}
let title = align(
center,
text(size: 14pt, weight: "bold", day-name)
)
let title-box = box(
width: 50%,
inset: .6em,
stroke: (bottom: title-underline-stroke),
title
)
let title-cell = grid.cell(align: center, title-box)
cells.push(box(width: 100%, grid(
columns: (100%),
inset: (x, y) => (
top: if y == 0 {0pt} else {.8em},
bottom: if y == 0 {0pt} else {2em}
),
stroke: (_, y) => (top: if y < 2 {none} else {menu-sep-stroke}),
title-cell,
..menus
)))
}
if calc.rem(cells.len(), 2) == 1 {
cells.last() = grid.cell(colspan: calc.min(2, cols), cells.last())
}
grid(
columns: (100% / cols,) * cols,
inset: (x: 1.5em, y: .8em),
stroke: (x, y) => {
let borders = (:)
if x != 0 {
borders.insert("left", day-sep-stroke)
}
if y != 0 {
borders.insert("top", day-sep-stroke)
}
return borders
},
..cells
)
}
#display-menus(days, categories)

4
src/requirements.txt Normal file
View File

@ -0,0 +1,4 @@
requests==2.25.1
beautifulsoup4==4.10.0
lxml==4.8.0
python-telegram-bot==21.5