From 575bdc4968ca41e77a73e176b0d97bdf1bd71400 Mon Sep 17 00:00:00 2001 From: LordBaryhobal Date: Mon, 18 Mar 2024 17:25:18 +0100 Subject: [PATCH] initial commit --- config.json | 4 + main.py | 232 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 236 insertions(+) create mode 100644 config.json create mode 100644 main.py diff --git a/config.json b/config.json new file mode 100644 index 0000000..9114273 --- /dev/null +++ b/config.json @@ -0,0 +1,4 @@ +{ + "gateway": "https://remote.hevs.ch", + "timeout": 20 +} \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..99f2ddf --- /dev/null +++ b/main.py @@ -0,0 +1,232 @@ +import json +import os +import random +import re +import shutil +import signal +import subprocess + +import psutil +import requests +from selenium.webdriver import (Firefox, FirefoxOptions, FirefoxProfile, + FirefoxService) +from selenium.webdriver.common.by import By +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.wait import WebDriverWait + +class ANSI: + CTRL = "\x1b[" + RESET = 0 + BOLD = 1 + ITALIC = 3 + UNDERLINE = 4 + BLACK = 30 + RED = 31 + GREEN = 32 + YELLOW = 33 + BLUE = 34 + MAGENTA = 35 + CYAN = 36 + WHITE = 37 + +class Logger: + def __init__(self, tag: str = "", styles: list[int] = None) -> None: + self.tag: str = tag + if styles is None: + styles = [] + self.styles: list[int] = styles + + def log(self, msg: str): + style = ANSI.CTRL + ";".join(map(str, self.styles)) + "m" + reset = ANSI.CTRL + str(ANSI.RESET) + "m" + tag = f"[{self.tag}] " if self.tag else "" + print(style + tag + msg + reset) + +infoLogger = Logger("INFO", [ANSI.BLUE+60]) +warningLogger = Logger("WARNING", [ANSI.YELLOW]) +errorLogger = Logger("ERROR", [ANSI.BOLD, ANSI.RED]) +vpnLogger = Logger("", [ANSI.ITALIC, ANSI.BLACK+60]) + +def getPublicIp(): + return requests.head("https://wikipedia.org").headers["X-Client-IP"] + +if getPublicIp() == "153.109.1.93": + warningLogger.log("You are already in the HEI network. You cannot use the VPN") + exit() + +IS_SNAP = b"snap" in subprocess.run(["which", "firefox"], stdout=subprocess.PIPE).stdout +DRIVER_PATH = "/snap/bin/geckodriver" if IS_SNAP else "/usr/local/bin/geckodriver" + +if IS_SNAP: + infoLogger.log("Firefox installed as snap") + +ROOT = "/tmp/snap-private-tmp/snap.firefox/" if IS_SNAP else "/" +APP_PATH = os.path.abspath(os.path.expanduser("~/.config/hei-vpn")) +PERSISTENT_PROFILE_PATH = os.path.join(APP_PATH, "firefox_profile/") +if not os.path.isdir(APP_PATH): + os.mkdir(APP_PATH) + +class AnyEc: + """ Use with WebDriverWait to combine expected_conditions + in an OR. + """ + def __init__(self, *args): + self.ecs = args + def __call__(self, driver): + for fn in self.ecs: + try: + res = fn(driver) + if res: + return True + except: + pass + +def startBrowser(headless: bool = True) -> tuple[Firefox, FirefoxProfile]: + infoLogger.log("Loading browser...") + + profilePath = PERSISTENT_PROFILE_PATH + if not os.path.isdir(profilePath): + dataDir = "~/snap/firefox/common/.mozilla/firefox" if IS_SNAP else "~/.mozilla/firefox" + dataDir = os.path.expanduser(dataDir) + dirs = os.listdir(dataDir) + + for d in dirs: + if ".default" in d: + profilePath = os.path.join(dataDir, d) + + if d.endswith(".default-release"): + break + else: + warningLogger.log("Default firefox profile not found. Please create one manually") + exit() + + profile = FirefoxProfile(profilePath) + infoLogger.log(f"Using firefox profile in {profilePath}") + + options = FirefoxOptions() + options.profile = profile + distinguishkey = "-distinguishkey" + str(random.randint(111111, 999999)) + options.add_argument(distinguishkey) + + if headless: + options.add_argument("--headless") + + service = FirefoxService(executable_path=DRIVER_PATH) + driver = Firefox(options, service) + + return driver, distinguishkey + +def closeBrowser(driver: Firefox, distinguishkey: str) -> None: + infoLogger.log("Closing browser...") + + for pid in psutil.pids(): + try: + cmdline = open("/proc/"+str(pid)+"/cmdline", "r").read() + if distinguishkey in cmdline: + realProfilePath = cmdline.split('-profile')[1].split(' ')[0].replace('\x00', '') + realProfilePath = os.path.abspath(os.path.join(ROOT, "./" + realProfilePath)) + break + except: + pass + else: + errorLogger.log("Cannot find firefox pid") + exit() + + infoLogger.log(f"Profile is stored in {realProfilePath}") + + psutil.Process(pid).kill() # kill firefox (nicely) and unlock profile lock + if os.path.isdir(PERSISTENT_PROFILE_PATH): + shutil.rmtree(PERSISTENT_PROFILE_PATH) + + # Copy profile + subprocess.run(["sudo", "cp", "-r", realProfilePath, PERSISTENT_PROFILE_PATH]) + + # Grant user access to directory + subprocess.run(["sudo", "chown", "-R", f"{os.getuid()}:{os.getgid()}", PERSISTENT_PROFILE_PATH]) + subprocess.run(["sudo", "chmod", "-R", "0664", PERSISTENT_PROFILE_PATH]) + subprocess.run(["sudo", "chmod", "-R", "+X", PERSISTENT_PROFILE_PATH]) + + try: + driver.quit() + except: + pass # error expected because we killed the processed + +def doLogin() -> None: + print("+--------------------------+") + print("| Prompting user for login |") + print("+--------------------------+") + driver, dkey = startBrowser(False) + + driver.get(config["gateway"]) + wait = WebDriverWait(driver, 9999) + wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]"))) + + closeBrowser(driver, dkey) + +with open("config.json", "r") as f: + config = json.load(f) + +url = config["gateway"] +timeout = config["timeout"] + +print("+-------------------+") +print("| Accessing gateway |") +print("+-------------------+") + +driver, dkey = startBrowser() + +infoLogger.log("Waiting for page to load...") +driver.get(url) +wait = WebDriverWait(driver, timeout) +wait.until(AnyEc( + EC.url_contains("microsoft"), + EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]")) +)) + +if "microsoft" in driver.current_url: + infoLogger.log("Detected login page") + closeBrowser(driver, dkey) + doLogin() + driver, dkey = startBrowser() + driver.get(url) + wait = WebDriverWait(driver, timeout) + +wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]"))) + +driver.implicitly_wait(0) +continueBtn = driver.find_elements(By.ID, "btnContinue") +driver.implicitly_wait(3) + +if len(continueBtn) > 0: + continueBtn = continueBtn[0] + infoLogger.log("Detected session already running button") + continueBtn.click() + +wait.until(EC.presence_of_element_located((By.ID, "table_welcome_2"))) + +infoLogger.log("Extracting cookies...") +DSID = driver.get_cookie("DSID")["value"] +infoLogger.log(f"DSID = {DSID}") +closeBrowser(driver, dkey) + +if re.match(r"[a-f0-9]{32}", DSID): + infoLogger.log("Starting VPN...") + proc = subprocess.Popen(["sudo", "openconnect", "--protocol=nc", "-C", f"DSID={DSID}", url], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + try: + while True: + line = proc.stdout.readline().decode("utf-8").strip() + if not line: break + vpnLogger.log(line) + if line == "ESP session established with server": + infoLogger.log("VPN is up and running") + + except KeyboardInterrupt: + proc.send_signal(signal.SIGINT) + proc.wait() + + if proc.returncode == 0: + infoLogger.log("VPN stopped") + else: + errorLogger.log(f"VPN stopped unexpectedly (code: {proc.returncode})") +else: + errorLogger.log(f"Invalid DSID cookie format: {DSID}") \ No newline at end of file