2024-03-18 16:25:18 +00:00
|
|
|
import json
|
|
|
|
import os
|
|
|
|
import random
|
|
|
|
import re
|
|
|
|
import shutil
|
|
|
|
import signal
|
|
|
|
import subprocess
|
|
|
|
|
|
|
|
import psutil
|
|
|
|
import requests
|
|
|
|
from selenium.webdriver import (Firefox, FirefoxOptions, FirefoxProfile,
|
|
|
|
FirefoxService)
|
|
|
|
from selenium.webdriver.common.by import By
|
|
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
from selenium.webdriver.support.wait import WebDriverWait
|
|
|
|
|
|
|
|
class ANSI:
    """Building blocks for ANSI terminal escape sequences.

    ``CTRL`` is the escape prefix; the integer attributes are the parameter
    codes that follow it (see ``Logger.log`` for how they are assembled).
    """

    # Escape prefix: ESC followed by "["
    CTRL = "\x1b["

    # Text attributes
    RESET, BOLD = 0, 1
    ITALIC, UNDERLINE = 3, 4

    # Colour codes, numbered consecutively from 30 to 37
    (BLACK, RED, GREEN, YELLOW,
     BLUE, MAGENTA, CYAN, WHITE) = range(30, 38)
|
|
|
|
|
|
|
|
class Logger:
    """Print messages to stdout wrapped in ANSI styling, with an optional tag."""

    def __init__(self, tag: str = "", styles: list[int] = None) -> None:
        """Remember the tag and the style codes used by :meth:`log`.

        Args:
            tag: Label printed in square brackets before each message. An
                empty string disables the prefix.
            styles: ANSI parameter codes applied to the whole line. ``None``
                is replaced by a fresh empty list.
        """
        self.tag: str = tag
        self.styles: list[int] = [] if styles is None else styles

    def log(self, msg: str):
        """Print ``msg`` preceded by the style sequence and tag, then reset."""
        codes = ";".join(str(code) for code in self.styles)
        prefix = f"[{self.tag}] " if self.tag else ""
        pieces = (ANSI.CTRL, codes, "m", prefix, msg, ANSI.CTRL, str(ANSI.RESET), "m")
        print("".join(pieces))
|
|
|
|
|
|
|
|
# Shared loggers, one per message category. Adding 60 to a base colour code
# (30-37) yields the corresponding 90-97 code (the bright variants).
infoLogger = Logger(tag="INFO", styles=[ANSI.BLUE + 60])
warningLogger = Logger(tag="WARNING", styles=[ANSI.YELLOW])
errorLogger = Logger(tag="ERROR", styles=[ANSI.BOLD, ANSI.RED])
# Untagged logger used to echo the VPN client's own output lines.
vpnLogger = Logger(tag="", styles=[ANSI.ITALIC, ANSI.BLACK + 60])
|
|
|
|
|
|
|
|
def getPublicIp(timeout: float = 10) -> "str | None":
    """Return the public IP address reported by Wikipedia's response headers.

    Sends a HEAD request to https://wikipedia.org and reads the
    ``X-Client-IP`` response header.

    Args:
        timeout: Seconds to wait for the server. ``requests`` applies no
            timeout by default, so without it the call could block forever.

    Returns:
        The value of the ``X-Client-IP`` header, or ``None`` when the
        response does not carry that header.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    response = requests.head("https://wikipedia.org", timeout=timeout)
    return response.headers.get("X-Client-IP")
|
|
|
|
|
|
|
|
# Stop early when the machine's public IP is the one identifying the HEI
# network: the VPN cannot be used from there.
if getPublicIp() == "153.109.1.93":
    warningLogger.log("You are already in the HEI network. You cannot use the VPN")
    # exit() is a helper injected by the `site` module for interactive use and
    # may be missing (e.g. `python -S`); raising SystemExit is the portable
    # equivalent and yields the same exit status.
    raise SystemExit
|
|
|
|
|
2024-03-18 17:56:11 +00:00
|
|
|
# Firefox counts as a snap install when /snap/bin/firefox is an executable file.
IS_SNAP = os.path.isfile("/snap/bin/firefox") and os.access("/snap/bin/firefox", os.X_OK)
DRIVER_PATH = "/snap/bin/geckodriver" if IS_SNAP else "/usr/local/bin/geckodriver"

if IS_SNAP:
    infoLogger.log("Firefox installed as snap")

# Prefix prepended to the profile path read from the browser's command line
# (see closeBrowser) to locate it on the host filesystem.
ROOT = "/tmp/snap-private-tmp/snap.firefox/" if IS_SNAP else "/"
APP_PATH = os.path.abspath(os.path.expanduser("~/.config/hei-vpn"))
PERSISTENT_PROFILE_PATH = os.path.join(APP_PATH, "firefox_profile/")

# makedirs also creates a missing ~/.config and, with exist_ok, avoids the
# check-then-create race of isdir() followed by mkdir().
os.makedirs(APP_PATH, exist_ok=True)
|
|
|
|
|
2024-03-18 19:42:45 +00:00
|
|
|
# by artfulrobot on StackOverflow
# https://stackoverflow.com/a/16464305/11109181
class AnyEc:
    """Combine several expected conditions with a logical OR.

    Use with ``WebDriverWait.until``: the instance is called with the driver
    and reports success as soon as one of the wrapped conditions is truthy.
    """

    def __init__(self, *args):
        """Store the condition callables; each one takes the driver."""
        self.ecs = args

    def __call__(self, driver):
        """Return ``True`` if any condition holds for ``driver``, else ``False``.

        A condition that raises is treated as "not satisfied". Only
        ``Exception`` subclasses are suppressed, so ``KeyboardInterrupt`` and
        ``SystemExit`` still propagate and the wait can be interrupted.
        """
        for condition in self.ecs:
            try:
                if condition(driver):
                    return True
            except Exception:
                continue
        return False
|
|
|
|
|
|
|
|
def startBrowser(headless: bool = True) -> tuple[Firefox, str]:
    """Start Firefox through geckodriver and return it with its process tag.

    The saved profile in ``PERSISTENT_PROFILE_PATH`` is used when it exists.
    Otherwise the Firefox data directory is searched for a directory whose
    name contains ``.default``, preferring one ending in ``.default-release``.

    Args:
        headless: When true, Firefox is started with ``--headless``.

    Returns:
        ``(driver, distinguishkey)`` where ``distinguishkey`` is the unique
        command-line argument given to this Firefox instance, which
        ``closeBrowser`` uses to find the process again.

    Raises:
        SystemExit: With status 1 when no profile can be found.
    """
    infoLogger.log("Loading browser...")

    profilePath = PERSISTENT_PROFILE_PATH
    if not os.path.isdir(profilePath):
        # No saved profile yet: fall back to one of the user's own profiles.
        dataDir = os.path.expanduser(
            "~/snap/firefox/common/.mozilla/firefox" if IS_SNAP else "~/.mozilla/firefox"
        )
        try:
            candidates = os.listdir(dataDir)
        except OSError:
            # Missing or unreadable data directory: same as "no profile found".
            candidates = []

        for name in candidates:
            if ".default" in name:
                profilePath = os.path.join(dataDir, name)

                # A "default-release" profile wins over any other match.
                if name.endswith(".default-release"):
                    break

        if profilePath == PERSISTENT_PROFILE_PATH:
            warningLogger.log("Default firefox profile not found. Please create one manually")
            raise SystemExit(1)

    infoLogger.log(f"Using firefox profile in {profilePath}")

    options = FirefoxOptions()
    options.profile = FirefoxProfile(profilePath)
    # Random marker on the command line, later searched for in /proc by
    # closeBrowser to identify this exact Firefox process.
    distinguishkey = "-distinguishkey" + str(random.randint(111111, 999999))
    options.add_argument(distinguishkey)

    if headless:
        options.add_argument("--headless")

    service = FirefoxService(executable_path=DRIVER_PATH, log_output="/dev/null")
    driver = Firefox(options, service)

    return driver, distinguishkey
|
|
|
|
|
|
|
|
def _profilePathFromCmdline(cmdline: str) -> "str | None":
    """Return the argument following ``-profile`` in a raw /proc cmdline, if any."""
    # /proc/<pid>/cmdline separates arguments with NUL bytes, not spaces.
    args = cmdline.split("\x00")
    for flag, value in zip(args, args[1:]):
        if flag in ("-profile", "--profile") and value:
            return value
    return None


def _isRemovableTempPath(path: str) -> bool:
    """Tell whether ``path`` lies strictly inside the expected temp directory."""
    tmpRoot = os.path.abspath(os.path.join(ROOT, "tmp"))
    return (
        path.startswith("/tmp/")
        and path.startswith(tmpRoot + os.path.sep)
        and ".." not in path
    )


def closeBrowser(driver: Firefox, distinguishkey: str) -> None:
    """Stop the tagged Firefox and save its temporary profile persistently.

    Steps, in order:
      1. find the process whose command line contains ``distinguishkey``
         and read its ``-profile`` argument;
      2. kill that process;
      3. replace ``PERSISTENT_PROFILE_PATH`` with a copy of the temporary
         profile (via ``sudo cp``), drop its lock and hand ownership to the
         current user;
      4. quit the driver and delete the temporary profile.

    Args:
        driver: The driver returned by ``startBrowser``.
        distinguishkey: The tag returned by ``startBrowser``.

    Raises:
        SystemExit: With status 1 when the process cannot be found or the
            profile copy fails.
    """
    infoLogger.log("Closing browser...")

    # Workaround by sh4dowb on GitHub
    # https://gist.github.com/sh4dowb/0054e4b25bf81bccca6f9e622c5cb160
    browserPid = None
    realProfilePath = None
    for pid in psutil.pids():
        try:
            with open(f"/proc/{pid}/cmdline", "r") as f:
                cmdline = f.read()
        except (OSError, ValueError):
            # Process vanished, is unreadable, or has undecodable arguments.
            continue

        if distinguishkey not in cmdline:
            continue

        profileArg = _profilePathFromCmdline(cmdline)
        if profileArg is None:
            # Tagged process without a -profile argument: keep looking.
            continue

        browserPid = pid
        realProfilePath = os.path.abspath(os.path.join(ROOT, "./" + profileArg))
        break

    if browserPid is None:
        errorLogger.log("Cannot find firefox pid")
        raise SystemExit(1)

    infoLogger.log(f"Profile is stored in {realProfilePath}")

    # Kill firefox so the profile can be copied (psutil's kill() sends SIGKILL).
    try:
        psutil.Process(browserPid).kill()
    except psutil.NoSuchProcess:
        pass  # already gone

    if os.path.isdir(PERSISTENT_PROFILE_PATH):
        shutil.rmtree(PERSISTENT_PROFILE_PATH)

    # Copy profile
    r = subprocess.run(["sudo", "cp", "-r", realProfilePath, PERSISTENT_PROFILE_PATH])

    if r.returncode:
        errorLogger.log("Could not copy temporary profile to persistent directory")
        errorLogger.log(f"Tried: sudo cp -r {realProfilePath} {PERSISTENT_PROFILE_PATH}")
        raise SystemExit(1)

    # Remove lock (if still present). `rm -f` is used instead of testing with
    # os.path.isfile first: that test is false for a dangling symlink and for a
    # copy the current user cannot read yet (ownership is fixed just below).
    lockPath = os.path.join(PERSISTENT_PROFILE_PATH, "lock")
    subprocess.run(["sudo", "rm", "-f", lockPath])

    # Grant user access to directory: files 0644, then +X restores the
    # search bit on directories.
    subprocess.run(["sudo", "chown", "-R", f"{os.getuid()}:{os.getgid()}", PERSISTENT_PROFILE_PATH])
    subprocess.run(["sudo", "chmod", "-R", "0644", PERSISTENT_PROFILE_PATH])
    subprocess.run(["sudo", "chmod", "-R", "+X", PERSISTENT_PROFILE_PATH])

    try:
        driver.quit()
    except Exception:
        pass  # error expected because we killed the process

    # Cleanup
    # Extra security to avoid deleting / recursively :)
    # Costs nothing to be extra safe
    if os.path.isdir(realProfilePath) and _isRemovableTempPath(realProfilePath):
        subprocess.run(["sudo", "rm", "-r", realProfilePath])
|
2024-03-18 16:25:18 +00:00
|
|
|
|
|
|
|
def doLogin() -> None:
    """Open a visible browser on the gateway and wait for the user to log in.

    The gateway URL is read from the module-level ``config``. The function
    returns once an ``img`` with alt text ``Logo``/``logo`` is present
    (presumably the gateway page shown after authentication — confirm), or
    after the 9999-second wait expires.

    The browser is always closed through ``closeBrowser``, even when loading
    the page or waiting fails, so no Firefox process or temporary profile is
    left behind.
    """
    print("+--------------------------+")
    print("| Prompting user for login |")
    print("+--------------------------+")
    driver, dkey = startBrowser(False)

    try:
        driver.get(config["gateway"])
        # Very long timeout: the user types credentials by hand.
        wait = WebDriverWait(driver, 9999)
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]")))
    finally:
        closeBrowser(driver, dkey)
|
|
|
|
|
|
|
|
# Settings come from config.json in the current working directory. The file is
# decoded as UTF-8 explicitly so the result does not depend on the locale.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# Gateway address: opened in the browser and passed to openconnect.
url = config["gateway"]
# Timeout handed to WebDriverWait for page loads.
timeout = config["timeout"]
|
|
|
|
|
|
|
|
print("+-------------------+")
print("| Accessing gateway |")
print("+-------------------+")

driver, dkey = startBrowser()

infoLogger.log("Waiting for page to load...")
driver.get(url)
wait = WebDriverWait(driver, timeout)
# The page is ready once we were redirected to the Microsoft login page or
# the gateway's logo is shown.
wait.until(AnyEc(
    EC.url_contains("microsoft"),
    EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]"))
))

if "microsoft" in driver.current_url:
    # Not authenticated: let the user log in with a visible browser, then
    # start over headless with the profile saved by that session.
    infoLogger.log("Detected login page")
    closeBrowser(driver, dkey)
    doLogin()
    driver, dkey = startBrowser()
    driver.get(url)
    wait = WebDriverWait(driver, timeout)

wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]")))

# Look for the optional button without the implicit wait, so a missing
# button does not cost any waiting time.
driver.implicitly_wait(0)
continueBtns = driver.find_elements(By.ID, "btnContinue")
driver.implicitly_wait(3)

if continueBtns:
    continueBtn = continueBtns[0]
    infoLogger.log("Detected session already running button")
    continueBtn.click()

wait.until(EC.presence_of_element_located((By.ID, "table_welcome_2")))

infoLogger.log("Extracting cookies...")
dsidCookie = driver.get_cookie("DSID")
if dsidCookie is None:
    # get_cookie returns None for an unknown cookie; fail with a clear
    # message instead of a TypeError on the subscript.
    errorLogger.log("DSID cookie not found")
    closeBrowser(driver, dkey)
    raise SystemExit(1)

DSID = dsidCookie["value"]
# NOTE(review): this prints the session cookie to the terminal.
infoLogger.log(f"DSID = {DSID}")
closeBrowser(driver, dkey)
|
|
|
|
|
|
|
|
# fullmatch: the whole cookie must be exactly 32 lowercase hex characters
# (re.match would also accept trailing garbage).
if re.fullmatch(r"[a-f0-9]{32}", DSID):
    infoLogger.log("Starting VPN...")
    # stderr is merged into stdout: a separate pipe that is never read could
    # fill up and block openconnect, and its error messages would be lost.
    proc = subprocess.Popen(
        ["sudo", "openconnect", "--protocol=nc", "-C", f"DSID={DSID}", url],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    try:
        # readline() returns b"" only at end of stream, so blank output lines
        # no longer end the loop early.
        for rawLine in iter(proc.stdout.readline, b""):
            line = rawLine.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            vpnLogger.log(line)
            if line == "ESP session established with server":
                infoLogger.log("VPN is up and running")

    except KeyboardInterrupt:
        print()
        infoLogger.log("Stopping VPN...")
        proc.send_signal(signal.SIGINT)

    # Always reap the process: returncode stays None until wait()/poll().
    proc.wait()

    if proc.returncode == 0:
        infoLogger.log("VPN stopped")
    else:
        errorLogger.log(f"VPN stopped unexpectedly (code: {proc.returncode})")
else:
    errorLogger.log(f"Invalid DSID cookie format: {DSID}")
|