"""Open a VPN tunnel with openconnect using a DSID cookie taken from Firefox.

The script loads the gateway URL from ``config.json`` in a Selenium-driven
Firefox, lets the user log in interactively when the gateway redirects to a
Microsoft login page, reads the ``DSID`` cookie and passes it to
``sudo openconnect``.
"""
import json
import os
import random
import re
import shutil
import signal
import subprocess
import sys
from typing import Optional

import psutil
import requests
from selenium.webdriver import (Firefox, FirefoxOptions, FirefoxProfile,
                                FirefoxService)
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait


class ANSI:
    """ANSI escape prefix and the numeric style/colour codes used for logging."""

    CTRL = "\x1b["
    RESET = 0
    BOLD = 1
    ITALIC = 3
    UNDERLINE = 4
    BLACK = 30
    RED = 31
    GREEN = 32
    YELLOW = 33
    BLUE = 34
    MAGENTA = 35
    CYAN = 36
    WHITE = 37


class Logger:
    """Print messages with an optional ``[TAG]`` prefix and ANSI styling."""

    def __init__(self, tag: str = "", styles: Optional[list[int]] = None) -> None:
        """Store the tag and the list of ANSI codes applied to every message."""
        self.tag: str = tag
        self.styles: list[int] = [] if styles is None else styles

    def log(self, msg: str) -> None:
        """Print *msg* wrapped in this logger's style and a trailing reset."""
        style = ANSI.CTRL + ";".join(map(str, self.styles)) + "m"
        reset = ANSI.CTRL + str(ANSI.RESET) + "m"
        tag = f"[{self.tag}] " if self.tag else ""
        print(style + tag + msg + reset)


# Adding 60 to a base colour code selects its bright variant (e.g. 34 -> 94).
infoLogger = Logger("INFO", [ANSI.BLUE+60])
warningLogger = Logger("WARNING", [ANSI.YELLOW])
errorLogger = Logger("ERROR", [ANSI.BOLD, ANSI.RED])
vpnLogger = Logger("", [ANSI.ITALIC, ANSI.BLACK+60])


def getPublicIp(timeout: float = 10) -> str:
    """Return the ``X-Client-IP`` header of a HEAD request to wikipedia.org.

    Args:
        timeout: Seconds to wait for the response before requests raises,
            so an unreachable network cannot hang the script forever.

    Raises:
        requests.RequestException: If the request fails or times out.
        KeyError: If the response carries no ``X-Client-IP`` header.
    """
    response = requests.head("https://wikipedia.org", timeout=timeout)
    return response.headers["X-Client-IP"]


if getPublicIp() == "153.109.1.93":
    warningLogger.log("You are already in the HEI network. You cannot use the VPN")
    sys.exit()

IS_SNAP = b"snap" in subprocess.run(["which", "firefox"], stdout=subprocess.PIPE).stdout
DRIVER_PATH = "/snap/bin/geckodriver" if IS_SNAP else "/usr/local/bin/geckodriver"
if IS_SNAP:
    infoLogger.log("Firefox installed as snap")

# Prefix applied to the profile path reported by the firefox process.
ROOT = "/tmp/snap-private-tmp/snap.firefox/" if IS_SNAP else "/"
APP_PATH = os.path.abspath(os.path.expanduser("~/.config/hei-vpn"))
PERSISTENT_PROFILE_PATH = os.path.join(APP_PATH, "firefox_profile/")

# makedirs also creates ~/.config when it does not exist yet.
os.makedirs(APP_PATH, exist_ok=True)


class AnyEc:
    """Use with WebDriverWait to combine expected_conditions in an OR."""

    def __init__(self, *args):
        """Store the expected conditions to be tried in order."""
        self.ecs = args

    def __call__(self, driver):
        """Return True as soon as one condition is truthy, otherwise False.

        A condition that raises is treated as not satisfied.
        """
        for fn in self.ecs:
            try:
                if fn(driver):
                    return True
            except Exception:
                # Only Exception is caught so Ctrl-C / SystemExit still
                # interrupt a long wait.
                pass
        return False


def _findDefaultProfile() -> Optional[str]:
    """Return the path of a ``*.default*`` Firefox profile, or None.

    A directory ending in ``.default-release`` wins; otherwise the last
    directory whose name contains ``.default`` is used.
    """
    dataDir = "~/snap/firefox/common/.mozilla/firefox" if IS_SNAP else "~/.mozilla/firefox"
    dataDir = os.path.expanduser(dataDir)
    try:
        dirs = os.listdir(dataDir)
    except OSError:
        # No Firefox data directory at all: same outcome as "no profile".
        return None

    found = None
    for d in dirs:
        if ".default" in d:
            found = os.path.join(dataDir, d)
            if d.endswith(".default-release"):
                break
    return found


def startBrowser(headless: bool = True) -> tuple[Firefox, str]:
    """Start Firefox through geckodriver and return it with its marker key.

    The persistent profile saved by ``closeBrowser`` is used when it exists,
    otherwise a default profile from the user's Firefox data directory.

    Args:
        headless: Pass ``--headless`` to Firefox when True.

    Returns:
        ``(driver, distinguishkey)`` where ``distinguishkey`` is a random
        command-line argument used later to find this firefox process.

    Exits the program with status 1 when no profile can be found.
    """
    infoLogger.log("Loading browser...")
    profilePath = PERSISTENT_PROFILE_PATH
    if not os.path.isdir(profilePath):
        profilePath = _findDefaultProfile()
        if profilePath is None:
            warningLogger.log("Default firefox profile not found. Please create one manually")
            sys.exit(1)

    profile = FirefoxProfile(profilePath)
    infoLogger.log(f"Using firefox profile in {profilePath}")

    options = FirefoxOptions()
    options.profile = profile
    distinguishkey = "-distinguishkey" + str(random.randint(111111, 999999))
    options.add_argument(distinguishkey)
    if headless:
        options.add_argument("--headless")

    service = FirefoxService(executable_path=DRIVER_PATH)
    # Keyword arguments keep the call independent of positional order.
    driver = Firefox(options=options, service=service)
    return driver, distinguishkey


def _profilePathFromCmdline(cmdline: str) -> str:
    """Return the argument following ``-profile``/``--profile``, or ""."""
    # /proc/<pid>/cmdline separates arguments with NUL bytes.
    args = cmdline.split("\x00")
    for index, arg in enumerate(args[:-1]):
        if arg in ("-profile", "--profile"):
            return args[index + 1]
    return ""


def closeBrowser(driver: Firefox, distinguishkey: str) -> None:
    """Kill the firefox process and save its profile for the next start.

    The process is located by searching every ``/proc/<pid>/cmdline`` for
    *distinguishkey*. Its profile directory is then copied with ``sudo`` to
    ``PERSISTENT_PROFILE_PATH`` and handed back to the current user.

    Args:
        driver: The driver returned by ``startBrowser``.
        distinguishkey: The marker argument returned by ``startBrowser``.

    Exits the program with status 1 when the process cannot be found.
    """
    infoLogger.log("Closing browser...")
    for pid in psutil.pids():
        try:
            with open(f"/proc/{pid}/cmdline", "r", errors="replace") as f:
                cmdline = f.read()
        except OSError:
            # Process vanished or is not readable by this user.
            continue
        if distinguishkey not in cmdline:
            continue
        realProfilePath = _profilePathFromCmdline(cmdline)
        if realProfilePath:
            realProfilePath = os.path.abspath(os.path.join(ROOT, "./" + realProfilePath))
            break
    else:
        errorLogger.log("Cannot find firefox pid")
        sys.exit(1)

    infoLogger.log(f"Profile is stored in {realProfilePath}")
    # Stop firefox before its profile directory is copied.
    psutil.Process(pid).kill()

    if os.path.isdir(PERSISTENT_PROFILE_PATH):
        shutil.rmtree(PERSISTENT_PROFILE_PATH)

    # Copy profile
    subprocess.run(["sudo", "cp", "-r", realProfilePath, PERSISTENT_PROFILE_PATH])
    # Grant user access to directory
    subprocess.run(["sudo", "chown", "-R", f"{os.getuid()}:{os.getgid()}",
                    PERSISTENT_PROFILE_PATH])
    subprocess.run(["sudo", "chmod", "-R", "0664", PERSISTENT_PROFILE_PATH])
    # +X gives the execute (search) bit back to directories only.
    subprocess.run(["sudo", "chmod", "-R", "+X", PERSISTENT_PROFILE_PATH])

    try:
        driver.quit()
    except Exception:
        pass  # error expected because we killed the process


def doLogin() -> None:
    """Open a visible browser on the gateway and wait for the user to log in.

    Reads the module-level ``config`` loaded below. Returns once an image
    with alt text ``Logo``/``logo`` is present, then closes the browser so
    the logged-in profile is saved.
    """
    print("+--------------------------+")
    print("| Prompting user for login |")
    print("+--------------------------+")
    driver, dkey = startBrowser(False)
    driver.get(config["gateway"])
    wait = WebDriverWait(driver, 9999)
    wait.until(EC.presence_of_element_located(
        (By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]")))
    closeBrowser(driver, dkey)


with open("config.json", "r") as f:
    config = json.load(f)

url = config["gateway"]
timeout = config["timeout"]

print("+-------------------+")
print("| Accessing gateway |")
print("+-------------------+")
driver, dkey = startBrowser()
infoLogger.log("Waiting for page to load...")
driver.get(url)
wait = WebDriverWait(driver, timeout)
wait.until(AnyEc(
    EC.url_contains("microsoft"),
    EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]"))
))

if "microsoft" in driver.current_url:
    infoLogger.log("Detected login page")
    closeBrowser(driver, dkey)
    doLogin()
    driver, dkey = startBrowser()
    driver.get(url)
    wait = WebDriverWait(driver, timeout)
    wait.until(EC.presence_of_element_located(
        (By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]")))

# Look for the button without waiting: it is only present in some sessions.
driver.implicitly_wait(0)
continueBtn = driver.find_elements(By.ID, "btnContinue")
driver.implicitly_wait(3)
if continueBtn:
    infoLogger.log("Detected session already running button")
    continueBtn[0].click()

wait.until(EC.presence_of_element_located((By.ID, "table_welcome_2")))
infoLogger.log("Extracting cookies...")
dsidCookie = driver.get_cookie("DSID")
if dsidCookie is None:
    # get_cookie returns None for a missing cookie; report it instead of
    # failing with a TypeError on the subscript.
    closeBrowser(driver, dkey)
    errorLogger.log("DSID cookie not found")
    sys.exit(1)
DSID = dsidCookie["value"]
infoLogger.log(f"DSID = {DSID}")
closeBrowser(driver, dkey)

if re.match(r"[a-f0-9]{32}", DSID):
    infoLogger.log("Starting VPN...")
    # stderr is merged into stdout so that a pipe nobody reads cannot fill
    # up and block openconnect.
    proc = subprocess.Popen(["sudo", "openconnect", "--protocol=nc", "-C", f"DSID={DSID}", url],
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    try:
        # readline() returns b"" only at EOF; blank output lines are skipped
        # instead of being mistaken for the end of the stream.
        for rawLine in iter(proc.stdout.readline, b""):
            line = rawLine.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            vpnLogger.log(line)
            if line == "ESP session established with server":
                infoLogger.log("VPN is up and running")
    except KeyboardInterrupt:
        proc.send_signal(signal.SIGINT)
    proc.wait()
    if proc.returncode == 0:
        infoLogger.log("VPN stopped")
    else:
        errorLogger.log(f"VPN stopped unexpectedly (code: {proc.returncode})")
else:
    errorLogger.log(f"Invalid DSID cookie format: {DSID}")