import json
import os
import random
import re
import shutil
import signal
import subprocess
import sys
from typing import Optional

import psutil
import requests
from selenium.webdriver import (Firefox, FirefoxOptions, FirefoxProfile,
                                FirefoxService)
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait


class ANSI:
    """ANSI escape-sequence prefix and numeric style/colour codes used by Logger."""

    # Prefix that starts every escape sequence emitted by Logger.log
    CTRL = "\x1b["

    # Text attributes
    RESET = 0
    BOLD = 1
    ITALIC = 3
    UNDERLINE = 4

    # Foreground colours (callers in this file add 60 for the bright variant)
    BLACK = 30
    RED = 31
    GREEN = 32
    YELLOW = 33
    BLUE = 34
    MAGENTA = 35
    CYAN = 36
    WHITE = 37


class Logger:
    """Print messages to stdout with an optional "[TAG] " prefix and ANSI styles."""

    def __init__(self, tag: str = "", styles: Optional[list[int]] = None) -> None:
        """
        Args:
            tag: Label printed in square brackets before each message;
                an empty string disables the prefix.
            styles: ANSI codes applied to the whole line; None means no styles.
        """
        self.tag: str = tag
        if styles is None:
            styles = []
        self.styles: list[int] = styles

    def log(self, msg: str) -> None:
        """Print `msg` wrapped in this logger's style sequence and a reset."""
        style = ANSI.CTRL + ";".join(map(str, self.styles)) + "m"
        reset = ANSI.CTRL + str(ANSI.RESET) + "m"
        tag = f"[{self.tag}] " if self.tag else ""
        print(style + tag + msg + reset)


infoLogger = Logger("INFO", [ANSI.BLUE+60])
warningLogger = Logger("WARNING", [ANSI.YELLOW])
errorLogger = Logger("ERROR", [ANSI.BOLD, ANSI.RED])
vpnLogger = Logger("", [ANSI.ITALIC, ANSI.BLACK+60])


def getPublicIp(timeout: float = 10) -> str:
    """
    Return the value of the "X-Client-IP" header of a HEAD request
    to https://wikipedia.org.

    Args:
        timeout: Seconds to wait for the server before giving up, so that a
            stalled connection cannot block the script forever.

    Raises:
        requests.RequestException: If the request fails or times out.
        KeyError: If the response carries no "X-Client-IP" header.
    """
    response = requests.head("https://wikipedia.org", timeout=timeout)
    return response.headers["X-Client-IP"]


if getPublicIp() == "153.109.1.93":
    warningLogger.log("You are already in the HEI network. You cannot use the VPN")
    sys.exit()

IS_SNAP = os.path.isfile("/snap/bin/firefox") and os.access("/snap/bin/firefox", os.X_OK)
DRIVER_PATH = "/snap/bin/geckodriver" if IS_SNAP else "/usr/local/bin/geckodriver"
if IS_SNAP:
    infoLogger.log("Firefox installed as snap")

# Directory that browser-reported paths are resolved against (see closeBrowser)
ROOT = "/tmp/snap-private-tmp/snap.firefox/" if IS_SNAP else "/"
APP_PATH = os.path.abspath(os.path.expanduser("~/.config/hei-vpn"))
PERSISTENT_PROFILE_PATH = os.path.join(APP_PATH, "firefox_profile/")

# makedirs also creates missing parents (e.g. ~/.config) and tolerates an
# already existing directory
os.makedirs(APP_PATH, exist_ok=True)


# by artfulrobot on StackOverflow
# https://stackoverflow.com/a/16464305/11109181
class AnyEc:
    """Use with WebDriverWait to combine expected_conditions in an OR."""

    def __init__(self, *args):
        """Store the conditions (callables taking the driver) to be OR-ed."""
        self.ecs = args

    def __call__(self, driver):
        """
        Return True as soon as one condition yields a truthy result.

        A condition that raises is treated as "not met". Only Exception
        subclasses are swallowed so that KeyboardInterrupt / SystemExit still
        propagate while waiting.
        """
        for fn in self.ecs:
            try:
                if fn(driver):
                    return True
            except Exception:
                pass
        return False


def startBrowser(headless: bool = True) -> tuple[Firefox, str]:
    """
    Start Firefox through geckodriver and return it with its distinguish key.

    The persistent profile in PERSISTENT_PROFILE_PATH is used when it exists.
    Otherwise the Firefox data directory is searched for a directory whose
    name contains ".default", preferring one ending in ".default-release".

    Args:
        headless: Pass "--headless" to Firefox when True.

    Returns:
        (driver, distinguishkey) where distinguishkey is a random command-line
        argument added to Firefox so that closeBrowser can find the process.

    Exits with status 1 if no usable profile is found.
    """
    infoLogger.log("Loading browser...")
    profilePath = PERSISTENT_PROFILE_PATH
    if not os.path.isdir(profilePath):
        dataDir = "~/snap/firefox/common/.mozilla/firefox" if IS_SNAP else "~/.mozilla/firefox"
        dataDir = os.path.expanduser(dataDir)
        try:
            dirs = os.listdir(dataDir)
        except OSError:
            # Missing/unreadable data directory: same outcome as "no profile"
            dirs = []

        for d in dirs:
            if ".default" in d:
                profilePath = os.path.join(dataDir, d)
                if d.endswith(".default-release"):
                    break

        if profilePath == PERSISTENT_PROFILE_PATH:
            warningLogger.log("Default firefox profile not found. Please create one manually")
            sys.exit(1)

    infoLogger.log(f"Using firefox profile in {profilePath}")
    options = FirefoxOptions()
    profile = FirefoxProfile(profilePath)
    options.profile = profile

    distinguishkey = "-distinguishkey" + str(random.randint(111111, 999999))
    options.add_argument(distinguishkey)
    if headless:
        options.add_argument("--headless")

    service = FirefoxService(executable_path=DRIVER_PATH, log_output="/dev/null")
    driver = Firefox(options, service)
    return driver, distinguishkey


def _findBrowserProcess(distinguishkey: str) -> Optional[tuple[int, str]]:
    """Return (pid, absolute profile path) of the process whose command line
    contains `distinguishkey`, or None if there is no such process."""
    # Workaround by sh4dowb on GitHub
    # https://gist.github.com/sh4dowb/0054e4b25bf81bccca6f9e622c5cb160
    for pid in psutil.pids():
        try:
            with open("/proc/" + str(pid) + "/cmdline", "r") as f:
                cmdline = f.read()
        except (OSError, UnicodeDecodeError):
            # Process vanished, is not readable, or has a non-text cmdline
            continue

        if distinguishkey not in cmdline:
            continue

        try:
            rawPath = cmdline.split('-profile')[1].split(' ')[0].replace('\x00', '')
        except IndexError:
            # Matching process without a "-profile" argument
            continue

        # The path is resolved relative to ROOT
        return pid, os.path.abspath(os.path.join(ROOT, "./" + rawPath))
    return None


def _isDisposableProfile(path: str) -> bool:
    """Return True only if `path` lies strictly inside the tmp directory under
    ROOT, i.e. it is safe to delete recursively."""
    tmpRoot = os.path.abspath(os.path.join(ROOT, "tmp"))
    return (
        path.startswith("/tmp/")
        # Trailing separator: rejects the tmp root itself and siblings such
        # as "<ROOT>/tmpfoo"
        and path.startswith(tmpRoot + os.path.sep)
        and ".." not in path
    )


def closeBrowser(driver: Firefox, distinguishkey: str) -> None:
    """
    Stop the browser and save its live profile to PERSISTENT_PROFILE_PATH.

    Steps: locate the Firefox process via `distinguishkey`, kill it, replace
    the persistent profile with a copy of the profile directory the process
    was using, fix ownership/permissions of the copy, quit the driver and
    finally delete the temporary profile directory.

    Args:
        driver: Driver returned by startBrowser.
        distinguishkey: Key returned by startBrowser alongside the driver.

    Exits with status 1 if the process cannot be found or the copy fails.
    """
    infoLogger.log("Closing browser...")

    found = _findBrowserProcess(distinguishkey)
    if found is None:
        errorLogger.log("Cannot find firefox pid")
        sys.exit(1)
    pid, realProfilePath = found

    infoLogger.log(f"Profile is stored in {realProfilePath}")
    try:
        # Hard kill (psutil's kill(), not terminate()); the stale profile
        # lock file is removed from the copy below
        psutil.Process(pid).kill()
    except psutil.NoSuchProcess:
        pass  # already gone

    if os.path.isdir(PERSISTENT_PROFILE_PATH):
        shutil.rmtree(PERSISTENT_PROFILE_PATH)

    # Copy profile
    r = subprocess.run(["sudo", "cp", "-r", realProfilePath, PERSISTENT_PROFILE_PATH])
    if r.returncode:
        errorLogger.log("Could not copy temporary profile to persistent directory")
        errorLogger.log(f"Tried: sudo cp -r {realProfilePath} {PERSISTENT_PROFILE_PATH}")
        sys.exit(1)

    # Remove lock (if still present)
    lockPath = os.path.join(PERSISTENT_PROFILE_PATH, "lock")
    if os.path.isfile(lockPath):
        subprocess.run(["sudo", "rm", lockPath])

    # Grant user access to directory
    subprocess.run(["sudo", "chown", "-R", f"{os.getuid()}:{os.getgid()}", PERSISTENT_PROFILE_PATH])
    subprocess.run(["sudo", "chmod", "-R", "0644", PERSISTENT_PROFILE_PATH])
    # Capital X: restores the execute bit on directories after the 0644 above
    subprocess.run(["sudo", "chmod", "-R", "+X", PERSISTENT_PROFILE_PATH])

    try:
        driver.quit()
    except Exception:
        pass  # error expected because we killed the process

    # Cleanup
    # Extra security to avoid deleting / recursively :)
    # Costs nothing to be extra safe
    if os.path.isdir(realProfilePath) and _isDisposableProfile(realProfilePath):
        subprocess.run(["sudo", "rm", "-r", realProfilePath])


def doLogin() -> None:
    """
    Open a visible browser on the gateway and wait for the user to log in.

    Reads the module-level `config` (loaded from config.json below), so it
    must only be called after that has been set. The wait ends when an image
    with alt text "Logo"/"logo" is present on the page; the browser is then
    closed, which persists the profile.
    """
    print("+--------------------------+")
    print("| Prompting user for login |")
    print("+--------------------------+")
    driver, dkey = startBrowser(False)
    driver.get(config["gateway"])
    # Very long timeout: the user needs time to complete the login manually
    wait = WebDriverWait(driver, 9999)
    wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]")))
    closeBrowser(driver, dkey)


with open("config.json", "r") as f:
    config = json.load(f)

url = config["gateway"]
timeout = config["timeout"]

print("+-------------------+")
print("| Accessing gateway |")
print("+-------------------+")

driver, dkey = startBrowser()
infoLogger.log("Waiting for page to load...")
driver.get(url)
wait = WebDriverWait(driver, timeout)
# Either we get redirected to a URL containing "microsoft" (treated as the
# login page) or the logo image shows up
wait.until(AnyEc(
    EC.url_contains("microsoft"),
    EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]"))
))

if "microsoft" in driver.current_url:
    infoLogger.log("Detected login page")
    closeBrowser(driver, dkey)
    doLogin()
    # Restart headless with the freshly saved profile
    driver, dkey = startBrowser()
    driver.get(url)
    wait = WebDriverWait(driver, timeout)
    wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]")))

# Look up the optional "continue" button without waiting for it
driver.implicitly_wait(0)
continueBtn = driver.find_elements(By.ID, "btnContinue")
driver.implicitly_wait(3)

if len(continueBtn) > 0:
    continueBtn = continueBtn[0]
    infoLogger.log("Detected session already running button")
    continueBtn.click()

wait.until(EC.presence_of_element_located((By.ID, "table_welcome_2")))
infoLogger.log("Extracting cookies...")
cookie = driver.get_cookie("DSID")
if cookie is None:
    # get_cookie returns None for a missing cookie; close the browser cleanly
    # instead of crashing with a TypeError and leaving it running
    closeBrowser(driver, dkey)
    errorLogger.log("DSID cookie not found")
    sys.exit(1)
DSID = cookie["value"]
infoLogger.log(f"DSID = {DSID}")

closeBrowser(driver, dkey)

if re.match(r"[a-f0-9]{32}", DSID):
    infoLogger.log("Starting VPN...")
    # stderr is merged into stdout: a separate stderr pipe that nobody reads
    # could fill up and block the child process
    proc = subprocess.Popen(
        ["sudo", "openconnect", "--protocol=nc", "-C", f"DSID={DSID}", url],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    try:
        # readline() returns b"" only at EOF; blank output lines are b"\n"
        # and must not end the loop
        for rawLine in iter(proc.stdout.readline, b""):
            line = rawLine.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            vpnLogger.log(line)
            if line == "ESP session established with server":
                infoLogger.log("VPN is up and running")
    except KeyboardInterrupt:
        print()
        proc.send_signal(signal.SIGINT)

    proc.wait()
    if proc.returncode == 0:
        infoLogger.log("VPN stopped")
    else:
        errorLogger.log(f"VPN stopped unexpectedly (code: {proc.returncode})")
else:
    errorLogger.log(f"Invalid DSID cookie format: {DSID}")