hei-vpn-for-linux/main.py

255 lines
8.2 KiB
Python
Raw Normal View History

2024-03-18 16:25:18 +00:00
import json
import os
import random
import re
import shutil
import signal
import subprocess
import psutil
import requests
from selenium.webdriver import (Firefox, FirefoxOptions, FirefoxProfile,
FirefoxService)
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
class ANSI:
    """Building blocks for ANSI terminal styling sequences.

    ``CTRL`` is the escape prefix that opens a sequence; every other
    attribute is an integer parameter code.
    """

    # Escape prefix that opens a control sequence.
    CTRL = "\x1b["

    # Text attributes.
    RESET, BOLD = 0, 1
    ITALIC, UNDERLINE = 3, 4

    # Foreground colours occupy the contiguous codes 30..37.
    (BLACK, RED, GREEN, YELLOW,
     BLUE, MAGENTA, CYAN, WHITE) = range(30, 38)
class Logger:
    """Print messages wrapped in an ANSI style, with an optional tag prefix."""

    def __init__(self, tag: str = "", styles: list[int] = None) -> None:
        """Remember the tag and the style codes used for every message.

        Args:
            tag: Label printed in square brackets before each message; an
                empty string disables the prefix.
            styles: ANSI parameter codes applied to each message. ``None``
                is replaced by an empty list.
        """
        self.tag: str = tag
        self.styles: list[int] = [] if styles is None else styles

    def log(self, msg: str):
        """Print ``msg`` surrounded by the style sequence and a reset."""
        codes = ";".join(str(code) for code in self.styles)
        opening = f"{ANSI.CTRL}{codes}m"
        closing = f"{ANSI.CTRL}{ANSI.RESET}m"
        prefix = f"[{self.tag}] " if self.tag else ""
        # Plain concatenation (not an f-string) so a non-string ``msg`` still
        # raises TypeError instead of being silently formatted.
        print("".join((opening, prefix, msg, closing)))
# Shared loggers, one per message category.
# Adding 60 to a base colour code (30..37) gives the matching code in the
# 90..97 range, which terminals render as the bright variant of that colour.
infoLogger = Logger(tag="INFO", styles=[ANSI.BLUE + 60])
warningLogger = Logger(tag="WARNING", styles=[ANSI.YELLOW])
errorLogger = Logger(tag="ERROR", styles=[ANSI.BOLD, ANSI.RED])
# Untagged logger used for the VPN client's own output lines.
vpnLogger = Logger(tag="", styles=[ANSI.ITALIC, ANSI.BLACK + 60])
def getPublicIp(timeout: float = 10) -> str:
    """Return this machine's public IP address as reported by Wikipedia.

    Sends a HEAD request to ``https://wikipedia.org`` and reads the
    ``X-Client-IP`` header from the response.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a dead network.

    Returns:
        The value of the ``X-Client-IP`` response header.

    Raises:
        requests.RequestException: On connection failure or timeout.
        KeyError: If the response carries no ``X-Client-IP`` header.
    """
    response = requests.head("https://wikipedia.org", timeout=timeout)
    return response.headers["X-Client-IP"]
# Stop early when the public address is the HEI one: the VPN cannot be used
# from inside that network.
if getPublicIp() == "153.109.1.93":
    warningLogger.log("You are already in the HEI network. You cannot use the VPN")
    exit()

# Firefox counts as a snap install when an executable exists at the snap path.
IS_SNAP = os.path.isfile("/snap/bin/firefox") and os.access("/snap/bin/firefox", os.X_OK)
DRIVER_PATH = "/snap/bin/geckodriver" if IS_SNAP else "/usr/local/bin/geckodriver"
if IS_SNAP:
    infoLogger.log("Firefox installed as snap")

# Prefix joined in front of the profile path that Firefox reports on its
# command line (presumably the snap's private /tmp as seen from the host).
ROOT = "/tmp/snap-private-tmp/snap.firefox/" if IS_SNAP else "/"

# Per-user application directory and the Firefox profile kept between runs.
APP_PATH = os.path.abspath(os.path.expanduser("~/.config/hei-vpn"))
PERSISTENT_PROFILE_PATH = os.path.join(APP_PATH, "firefox_profile/")

# makedirs also creates a missing ~/.config and does not fail if the directory
# appears between a check and the creation.
os.makedirs(APP_PATH, exist_ok=True)
class AnyEc:
    """Combine several expected conditions with a logical OR.

    Intended for ``WebDriverWait.until``: the instance is called with the
    driver and reports success as soon as any wrapped condition returns a
    truthy value.
    """

    def __init__(self, *args):
        """Store the conditions; each is a callable taking the driver."""
        self.ecs = args

    def __call__(self, driver):
        """Return ``True`` if any condition is satisfied, else ``False``.

        A condition that raises an ordinary exception is treated as not
        satisfied. ``KeyboardInterrupt`` and ``SystemExit`` are deliberately
        not caught, so Ctrl-C still aborts a long wait.
        """
        for fn in self.ecs:
            try:
                if fn(driver):
                    return True
            except Exception:
                continue
        return False
def startBrowser(headless: bool = True) -> tuple[Firefox, str]:
    """Start Firefox through geckodriver and return the driver plus a marker.

    The profile in ``PERSISTENT_PROFILE_PATH`` is used when that directory
    exists. Otherwise the user's own Firefox data directory is searched for
    a ``.default`` profile, preferring one ending in ``.default-release``.

    Args:
        headless: When true, Firefox is started with ``--headless``.

    Returns:
        ``(driver, distinguishkey)``. ``distinguishkey`` is the random
        command-line argument added to this Firefox instance so that
        ``closeBrowser`` can recognise its process.

    Raises:
        SystemExit: With status 1 when no usable Firefox profile is found.
    """
    infoLogger.log("Loading browser...")

    profilePath = PERSISTENT_PROFILE_PATH
    if not os.path.isdir(profilePath):
        dataDir = os.path.expanduser(
            "~/snap/firefox/common/.mozilla/firefox" if IS_SNAP else "~/.mozilla/firefox"
        )
        try:
            entries = os.listdir(dataDir)
        except OSError:
            # Firefox was never started for this user: no data directory yet.
            entries = []

        for entry in entries:
            if ".default" in entry:
                profilePath = os.path.join(dataDir, entry)
                if entry.endswith(".default-release"):
                    break

        # Still pointing at the (missing) persistent profile: nothing matched.
        if profilePath == PERSISTENT_PROFILE_PATH:
            warningLogger.log("Default firefox profile not found. Please create one manually")
            raise SystemExit(1)

    infoLogger.log(f"Using firefox profile in {profilePath}")

    options = FirefoxOptions()
    options.profile = FirefoxProfile(profilePath)

    # Unique marker on the command line, later searched for in /proc.
    distinguishkey = "-distinguishkey" + str(random.randint(111111, 999999))
    options.add_argument(distinguishkey)
    if headless:
        options.add_argument("--headless")

    service = FirefoxService(executable_path=DRIVER_PATH, log_output="/dev/null")
    driver = Firefox(options=options, service=service)
    return driver, distinguishkey
def _findBrowserProcess(distinguishkey: str) -> "tuple[int, str] | None":
    """Return ``(pid, profile_argument)`` of the marked Firefox process.

    Scans ``/proc/<pid>/cmdline`` of every process for ``distinguishkey`` and
    returns the value following its ``-profile`` argument, or ``None`` when
    no such process is found.
    """
    for pid in psutil.pids():
        try:
            with open(f"/proc/{pid}/cmdline", "r", errors="replace") as f:
                cmdline = f.read()
        except OSError:
            continue  # process already gone, or not readable by this user
        if distinguishkey not in cmdline:
            continue
        # Arguments in /proc/<pid>/cmdline are separated by NUL bytes.
        args = cmdline.split("\x00")
        for flag, value in zip(args, args[1:]):
            if flag in ("-profile", "--profile") and value:
                return pid, value
    return None


def closeBrowser(driver: Firefox, distinguishkey: str) -> None:
    """Stop Firefox and save its temporary profile as the persistent one.

    Steps: locate the Firefox process by its marker argument, kill it, copy
    the profile it was using to ``PERSISTENT_PROFILE_PATH`` (via ``sudo``),
    hand ownership of the copy to the current user, quit the driver, and
    remove the temporary profile.

    Args:
        driver: The driver returned by ``startBrowser``.
        distinguishkey: The marker string returned by ``startBrowser``.

    Raises:
        SystemExit: With status 1 when the Firefox process cannot be found
            or the profile cannot be copied.
    """
    infoLogger.log("Closing browser...")

    found = _findBrowserProcess(distinguishkey)
    if found is None:
        errorLogger.log("Cannot find firefox pid")
        raise SystemExit(1)
    pid, profileArg = found

    # The path reported by Firefox is relative to ROOT (differs for snap).
    realProfilePath = os.path.abspath(os.path.join(ROOT, "./" + profileArg))
    infoLogger.log(f"Profile is stored in {realProfilePath}")

    # Kill Firefox before copying so the profile is no longer in use.
    # NOTE: psutil's kill() sends SIGKILL; this is not a graceful shutdown.
    psutil.Process(pid).kill()

    if os.path.isdir(PERSISTENT_PROFILE_PATH):
        shutil.rmtree(PERSISTENT_PROFILE_PATH)

    # Copy profile
    r = subprocess.run(["sudo", "cp", "-r", realProfilePath, PERSISTENT_PROFILE_PATH])
    if r.returncode:
        errorLogger.log("Could not copy temporary profile to persistent directory")
        errorLogger.log(f"Tried: sudo cp -r {realProfilePath} {PERSISTENT_PROFILE_PATH}")
        raise SystemExit(1)

    # Remove lock (if still present)
    lockPath = os.path.join(PERSISTENT_PROFILE_PATH, "lock")
    if os.path.isfile(lockPath):
        subprocess.run(["sudo", "rm", lockPath])

    # Grant user access to directory: own it, rw for files, then +X restores
    # the search bit on directories that the 0644 pass removed.
    subprocess.run(["sudo", "chown", "-R", f"{os.getuid()}:{os.getgid()}", PERSISTENT_PROFILE_PATH])
    subprocess.run(["sudo", "chmod", "-R", "0644", PERSISTENT_PROFILE_PATH])
    subprocess.run(["sudo", "chmod", "-R", "+X", PERSISTENT_PROFILE_PATH])

    try:
        driver.quit()
    except Exception:
        pass  # error expected because we killed the process

    # Cleanup
    if os.path.isdir(realProfilePath):
        # Extra safety before a recursive sudo rm: the path must lie strictly
        # below both /tmp and the ROOT-relative tmp directory, so neither
        # directory itself can ever be the target.
        tmpRoot = os.path.abspath(os.path.join(ROOT, "tmp"))
        insideTmp = (
            realProfilePath.startswith("/tmp/")
            and realProfilePath.startswith(tmpRoot + os.path.sep)
        )
        if insideTmp and ".." not in realProfilePath:
            subprocess.run(["sudo", "rm", "-r", realProfilePath])
2024-03-18 16:25:18 +00:00
def doLogin() -> None:
    """Open a visible browser on the gateway and wait for the user to log in.

    Loads ``config["gateway"]`` in a non-headless Firefox, waits (up to
    9999 seconds) until an image with alt text ``Logo``/``logo`` is present,
    then closes the browser through ``closeBrowser``.
    """
    banner = (
        "+--------------------------+",
        "| Prompting user for login |",
        "+--------------------------+",
    )
    for row in banner:
        print(row)

    browser, marker = startBrowser(False)
    browser.get(config["gateway"])

    # Presumably this logo only shows once authentication went through.
    logoLocator = (By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]")
    WebDriverWait(browser, 9999).until(EC.presence_of_element_located(logoLocator))

    closeBrowser(browser, marker)
# --- Script body: obtain a DSID session cookie, then run openconnect. ---

# Settings are read from config.json in the current working directory.
with open("config.json", "r") as f:
    config = json.load(f)
url = config["gateway"]
timeout = config["timeout"]

print("+-------------------+")
print("| Accessing gateway |")
print("+-------------------+")
driver, dkey = startBrowser()
infoLogger.log("Waiting for page to load...")
driver.get(url)

# Either we get redirected to a URL containing "microsoft" (login needed) or
# the gateway page with its logo shows up.
wait = WebDriverWait(driver, timeout)
wait.until(AnyEc(
    EC.url_contains("microsoft"),
    EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]"))
))

if "microsoft" in driver.current_url:
    infoLogger.log("Detected login page")
    closeBrowser(driver, dkey)
    # Let the user log in through a visible browser, then start over headless
    # with the profile that closeBrowser saved.
    doLogin()
    driver, dkey = startBrowser()
    driver.get(url)
    wait = WebDriverWait(driver, timeout)
    wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt=Logo],img[alt=logo]")))

# Look for the "continue" button without the implicit wait delaying the
# common case where it is absent.
driver.implicitly_wait(0)
continueBtn = driver.find_elements(By.ID, "btnContinue")
driver.implicitly_wait(3)
if len(continueBtn) > 0:
    continueBtn = continueBtn[0]
    infoLogger.log("Detected session already running button")
    continueBtn.click()
    wait.until(EC.presence_of_element_located((By.ID, "table_welcome_2")))

infoLogger.log("Extracting cookies...")
dsidCookie = driver.get_cookie("DSID")
if dsidCookie is None:
    # get_cookie returns None when the cookie does not exist.
    errorLogger.log("DSID cookie not found")
    closeBrowser(driver, dkey)
    raise SystemExit(1)
DSID = dsidCookie["value"]
infoLogger.log(f"DSID = {DSID}")
closeBrowser(driver, dkey)

# fullmatch: the whole cookie must be 32 lowercase hex digits, not just its
# first 32 characters.
if re.fullmatch(r"[a-f0-9]{32}", DSID):
    infoLogger.log("Starting VPN...")
    # stderr is merged into stdout so that it is drained by the loop below;
    # an unread stderr pipe can fill up and block the child.
    proc = subprocess.Popen(
        ["sudo", "openconnect", "--protocol=nc", "-C", f"DSID={DSID}", url],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    try:
        # readline() returns b"" only at end of stream; a blank output line
        # is b"\n" and must not end the loop.
        for raw in iter(proc.stdout.readline, b""):
            line = raw.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            vpnLogger.log(line)
            if line == "ESP session established with server":
                infoLogger.log("VPN is up and running")
    except KeyboardInterrupt:
        print()
        try:
            proc.send_signal(signal.SIGINT)
        except OSError:
            # NOTE(review): signalling the sudo process may be refused for an
            # unprivileged user; Ctrl-C in a terminal presumably already
            # reached the child through the foreground process group.
            pass
    proc.wait()
    if proc.returncode == 0:
        infoLogger.log("VPN stopped")
    else:
        errorLogger.log(f"VPN stopped unexpectedly (code: {proc.returncode})")
else:
    errorLogger.log(f"Invalid DSID cookie format: {DSID}")