"""FastAPI dashboard that registers cluster nodes and reaches them over SSH.

The module keeps node records in a local SQLite file, bootstraps key-based
SSH access with ``sshpass``/``ssh-copy-id``, polls node reachability in a
background task, and exposes a browser terminal over a WebSocket.

NOTE(review): none of the routes below perform authentication, and the
``nodes`` table stores ``ssh_password`` in plain text. Both should be
revisited before this is exposed beyond a trusted network.
"""
import asyncio
import codecs
import contextlib
import fcntl
import logging
import os
import pty
import re
import sqlite3
import subprocess
import time

from fastapi import (
    FastAPI,
    WebSocket,
    BackgroundTasks,
    Request,
    Form,
    WebSocketDisconnect,
)
from fastapi.responses import RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles

logger = logging.getLogger(__name__)

DB_PATH = "cluster.db"
SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")

# Extra seconds granted on top of ssh's ConnectTimeout before a probe is killed.
SSH_COMMAND_GRACE = 10
# Seconds between two passes of the background reachability monitor.
MONITOR_INTERVAL = 60
# Idle back-off used when the non-blocking PTY has nothing to read/accept.
PTY_POLL_INTERVAL = 0.01

app = FastAPI()

static_path = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=static_path), name="static")

templates = Jinja2Templates(directory="templates")


# -----------------------------
# DATABASE
# -----------------------------

def init_db():
    """Create the ``nodes`` table if it does not exist yet."""
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS nodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                ip TEXT UNIQUE,
                user TEXT,
                ssh_password TEXT,
                sudo_password TEXT,
                os TEXT,
                architecture TEXT,
                docker_installed INTEGER DEFAULT 0,
                vnc_available INTEGER DEFAULT 0,
                last_seen INTEGER,
                status TEXT
            )
        """)
        conn.commit()


init_db()


def get_db():
    """Open a new SQLite connection whose rows support access by column name.

    The caller owns the connection and must close it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


# -----------------------------
# SSH HELPERS
# -----------------------------

def _ssh_target(user, ip):
    """Return ``user@ip``, or ``None`` if ssh would parse it as an option.

    ``user`` and ``ip`` come from form input; a destination starting with
    ``-`` would be read by ssh as a command-line flag instead of a host.
    """
    target = f"{user}@{ip}"
    if target.startswith("-"):
        return None
    return target


async def _run_ssh(ip, user, remote_cmd, connect_timeout):
    """Run ``remote_cmd`` on ``user@ip`` without blocking the event loop.

    Args:
        ip: Host to connect to.
        user: Remote login name.
        remote_cmd: Command line handed to the remote shell.
        connect_timeout: Value for ssh's ``ConnectTimeout`` option (seconds).

    Returns:
        The command's stripped stdout on exit status 0, otherwise ``None``
        (ssh missing, connection failure, non-zero exit, or timeout).
    """
    target = _ssh_target(user, ip)
    if target is None:
        return None

    # Arguments are passed as a list (no local shell), so values taken from
    # the form/database cannot inject local shell commands.
    argv = [
        "ssh",
        "-o", f"ConnectTimeout={connect_timeout}",
        "-o", "StrictHostKeyChecking=no",
        # Never fall back to an interactive password prompt: this runs
        # unattended and a prompt would hang until the timeout.
        "-o", "BatchMode=yes",
        target,
        remote_cmd,
    ]
    try:
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
        )
    except OSError:
        logger.warning("could not start ssh for %s", target, exc_info=True)
        return None

    try:
        stdout, _ = await asyncio.wait_for(
            proc.communicate(),
            timeout=connect_timeout + SSH_COMMAND_GRACE,
        )
    except asyncio.TimeoutError:
        return None
    finally:
        # Reap the child if we are leaving early (timeout or cancellation).
        if proc.returncode is None:
            with contextlib.suppress(ProcessLookupError):
                proc.kill()
            await proc.wait()

    if proc.returncode != 0:
        return None
    return stdout.decode(errors="replace").strip()


async def _check_online(ip, user):
    """Return ``True`` if a trivial command can be run on the node via ssh."""
    return await _run_ssh(ip, user, "echo online", connect_timeout=2) is not None


def _set_status(conn, node_id, online, now):
    """Write the reachability result for one node (does not commit).

    ``last_seen`` is only advanced when the node answered.
    """
    if online:
        conn.execute(
            "UPDATE nodes SET status=?, last_seen=? WHERE id=?",
            ("Online", now, node_id),
        )
    else:
        conn.execute(
            "UPDATE nodes SET status=? WHERE id=?",
            ("Offline", node_id),
        )


# -----------------------------
# NODE DISCOVERY
# -----------------------------

async def detect_node_info(ip, user):
    """Probe a node over ssh for OS id, architecture, Docker and VNC.

    Args:
        ip: Host to probe.
        user: Remote login name.

    Returns:
        A dict with keys ``"os"``, ``"arch"``, ``"docker"`` and ``"vnc"``.
        Each value is the probe's stripped output, or ``""`` when that probe
        failed. ``"docker"`` and ``"vnc"`` are ``"1"``/``"0"`` on success.
    """
    cmds = {
        "os": "grep '^ID=' /etc/os-release 2>/dev/null | cut -d= -f2",
        "arch": "uname -m",
        "docker": "command -v docker >/dev/null 2>&1 && echo 1 || echo 0",
        "vnc": "pgrep -f vnc >/dev/null 2>&1 && echo 1 || echo 0",
    }
    # The probes are independent, so run them concurrently.
    outputs = await asyncio.gather(
        *(_run_ssh(ip, user, cmd, connect_timeout=3) for cmd in cmds.values())
    )
    results = {key: out or "" for key, out in zip(cmds, outputs)}
    # os-release values may be quoted (e.g. ID="centos"); store the bare id.
    results["os"] = results["os"].strip("\"'")
    return results


# -----------------------------
# NODE MONITOR
# -----------------------------

async def _refresh_all_nodes():
    """Check every registered node once and persist the results."""
    with contextlib.closing(get_db()) as conn:
        nodes = conn.execute("SELECT id, ip, user FROM nodes").fetchall()
    if not nodes:
        return

    # The connection is closed while the (slow) ssh probes run.
    reachable = await asyncio.gather(
        *(_check_online(node["ip"], node["user"]) for node in nodes)
    )

    now = int(time.time())
    with contextlib.closing(get_db()) as conn:
        for node, online in zip(nodes, reachable):
            _set_status(conn, node["id"], online, now)
        conn.commit()


async def node_monitor_loop():
    """Refresh the status of all nodes forever, once per MONITOR_INTERVAL."""
    while True:
        try:
            await _refresh_all_nodes()
        except Exception:
            # Boundary of a long-lived background task: log and keep going so
            # one failed pass (e.g. a locked database) does not end monitoring.
            logger.exception("node monitor pass failed")
        await asyncio.sleep(MONITOR_INTERVAL)


@app.on_event("startup")
async def start_monitor():
    """Launch the background monitor when the application starts."""
    # Keep a reference: the event loop only holds weak references to tasks.
    app.state.monitor_task = asyncio.create_task(node_monitor_loop())


# -----------------------------
# SSH BOOTSTRAP
# -----------------------------

async def _install_ssh_key(ip, user, password):
    """Copy the local public key to the node using password authentication.

    Failures are logged and otherwise ignored; the subsequent probes decide
    whether the node is usable.
    """
    target = _ssh_target(user, ip)
    if target is None:
        logger.warning("refusing ssh destination that looks like an option")
        return
    try:
        proc = await asyncio.create_subprocess_exec(
            "sshpass", "-e",
            "ssh-copy-id",
            "-o", "StrictHostKeyChecking=no",
            "-i", f"{SSH_KEY}.pub",
            target,
            stdin=subprocess.DEVNULL,
            # `sshpass -e` reads the password from $SSHPASS, which keeps it
            # out of the process argument list.
            env={**os.environ, "SSHPASS": password},
        )
    except OSError:
        logger.warning("could not start sshpass/ssh-copy-id", exc_info=True)
        return
    returncode = await proc.wait()
    if returncode != 0:
        logger.warning("ssh-copy-id to %s exited with %s", target, returncode)


def _as_flag(probe_output):
    """Map a ``"1"``/``"0"`` probe result to 1/0; anything else counts as 0."""
    return 1 if probe_output.strip() == "1" else 0


async def bootstrap_node(ip, user, password):
    """Install the SSH key on a new node and record what was detected.

    Args:
        ip: Address of the node (matches ``nodes.ip``).
        user: Remote login name.
        password: SSH password used once for ``ssh-copy-id``.
    """
    await _install_ssh_key(ip, user, password)
    info = await detect_node_info(ip, user)

    # Every probe returns "" on failure, so an all-empty result means the
    # node could not be reached with key authentication.
    status = "Online" if any(info.values()) else "Offline"

    with contextlib.closing(get_db()) as conn:
        conn.execute(
            """
            UPDATE nodes
            SET os=?, architecture=?, docker_installed=?, vnc_available=?, status=?
            WHERE ip=?
            """,
            (
                info["os"],
                info["arch"],
                _as_flag(info["docker"]),
                _as_flag(info["vnc"]),
                status,
                ip,
            ),
        )
        conn.commit()


# -----------------------------
# ROUTES
# -----------------------------

@app.get("/")
async def index(request: Request):
    """Render the dashboard with all registered nodes."""
    with contextlib.closing(get_db()) as conn:
        nodes = conn.execute("SELECT * FROM nodes").fetchall()
    return templates.TemplateResponse(
        "index.html", {"request": request, "nodes": nodes}
    )


@app.post("/add_node")
async def add_node(
    background_tasks: BackgroundTasks,
    name: str = Form(...),
    ip: str = Form(...),
    user: str = Form(...),
    password: str = Form(...),
):
    """Register a node and schedule its SSH bootstrap in the background.

    Returns a 303 redirect to ``/`` on success, or a 409 JSON error when a
    node with the same ``ip`` already exists.
    """
    with contextlib.closing(get_db()) as conn:
        try:
            conn.execute(
                """
                INSERT INTO nodes (name, ip, user, ssh_password, status)
                VALUES (?, ?, ?, ?, ?)
                """,
                (name, ip, user, password, "Connecting"),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # `ip` is UNIQUE in the schema.
            return JSONResponse(
                {"error": "A node with this IP already exists"},
                status_code=409,
            )

    background_tasks.add_task(bootstrap_node, ip, user, password)
    return RedirectResponse("/", 303)


@app.post("/remove_node/{node_id}")
async def remove_node(node_id: int):
    """Delete a node by id and redirect back to the dashboard."""
    with contextlib.closing(get_db()) as conn:
        conn.execute("DELETE FROM nodes WHERE id=?", (node_id,))
        conn.commit()
    return RedirectResponse("/", 303)


@app.get("/refresh_status/{node_id}")
async def refresh_status(node_id: int):
    """Probe one node now, store the result and return it as JSON.

    Returns ``{"status": "Unknown"}`` when the id is not registered,
    otherwise ``{"status": "Online"}`` or ``{"status": "Offline"}``.
    """
    with contextlib.closing(get_db()) as conn:
        node = conn.execute(
            "SELECT ip, user FROM nodes WHERE id=?", (node_id,)
        ).fetchone()
    if not node:
        return JSONResponse({"status": "Unknown"})

    online = await _check_online(node["ip"], node["user"])

    with contextlib.closing(get_db()) as conn:
        _set_status(conn, node_id, online, int(time.time()))
        conn.commit()
    return {"status": "Online" if online else "Offline"}


# -----------------------------
# NODE EDIT
# -----------------------------

@app.get("/node/{node_id}")
async def get_node(node_id: int):
    """Return one node row as JSON, or ``{}`` when the id is unknown."""
    with contextlib.closing(get_db()) as conn:
        node = conn.execute(
            "SELECT * FROM nodes WHERE id=?", (node_id,)
        ).fetchone()
    if not node:
        return JSONResponse({})
    # NOTE(review): this includes the ssh_password/sudo_password columns.
    # Confirm whether any client needs them; if not, drop them here.
    return dict(node)


@app.post("/update_node/{node_id}")
async def update_node(
    node_id: int,
    name: str = Form(...),
    ip: str = Form(...),
    user: str = Form(...),
):
    """Update a node's name, ip and user.

    Returns a 303 redirect to ``/`` on success, or a 409 JSON error when the
    new ``ip`` is already used by another node.
    """
    with contextlib.closing(get_db()) as conn:
        try:
            conn.execute(
                """
                UPDATE nodes
                SET name=?, ip=?, user=?
                WHERE id=?
                """,
                (name, ip, user, node_id),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # `ip` is UNIQUE in the schema.
            return JSONResponse(
                {"error": "A node with this IP already exists"},
                status_code=409,
            )
    return RedirectResponse("/", 303)


# -----------------------------
# TERMINAL
# -----------------------------

@app.websocket("/ws/terminal/{ip}")
async def terminal(websocket: WebSocket, ip: str):
    """Bridge a WebSocket to an interactive ssh session on a registered node.

    Text frames from the client are written to the session's PTY; PTY output
    is sent back as text frames. The session ends when either side closes,
    after which the ssh process is stopped and the PTY is released.
    """
    await websocket.accept()

    with contextlib.closing(get_db()) as conn:
        node = conn.execute(
            "SELECT user FROM nodes WHERE ip=?", (ip,)
        ).fetchone()
    target = _ssh_target(node["user"], ip) if node else None
    if target is None:
        await websocket.close()
        return

    master_fd, slave_fd = pty.openpty()
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh",
            "-o", "StrictHostKeyChecking=no",
            "-t",
            target,
            stdin=slave_fd,
            stdout=slave_fd,
            stderr=slave_fd,
        )
    except OSError:
        logger.warning("could not start ssh for terminal", exc_info=True)
        os.close(master_fd)
        await websocket.close()
        return
    finally:
        # Only the child needs the slave end. Closing our copy lets reads on
        # the master report end-of-session once ssh exits.
        os.close(slave_fd)

    flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    async def pty_to_ws():
        """Forward PTY output to the client until the session ends."""
        # Incremental decoding keeps multi-byte UTF-8 characters intact when
        # they are split across two reads.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
        while True:
            try:
                raw = os.read(master_fd, 1024)
            except BlockingIOError:
                await asyncio.sleep(PTY_POLL_INTERVAL)
                continue
            except OSError:
                # Reading the master fails once the slave side is gone.
                return
            if not raw:
                return
            text = decoder.decode(raw)
            if text:
                await websocket.send_text(text)

    async def ws_to_pty():
        """Forward client keystrokes to the PTY until the client leaves."""
        try:
            while True:
                pending = memoryview((await websocket.receive_text()).encode())
                # The fd is non-blocking, so a write may be short or refused.
                while pending:
                    try:
                        written = os.write(master_fd, pending)
                    except BlockingIOError:
                        await asyncio.sleep(PTY_POLL_INTERVAL)
                        continue
                    pending = pending[written:]
        except (WebSocketDisconnect, OSError):
            return

    tasks = [
        asyncio.create_task(pty_to_ws()),
        asyncio.create_task(ws_to_pty()),
    ]
    try:
        # Either direction finishing means the session is over.
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        for task in tasks:
            task.cancel()
        # Collect results so task exceptions are not left unretrieved.
        await asyncio.gather(*tasks, return_exceptions=True)

        os.close(master_fd)
        if proc.returncode is None:
            with contextlib.suppress(ProcessLookupError):
                proc.terminate()
            try:
                await asyncio.wait_for(proc.wait(), timeout=5)
            except asyncio.TimeoutError:
                with contextlib.suppress(ProcessLookupError):
                    proc.kill()
                await proc.wait()

        try:
            await websocket.close()
        except Exception:
            # Best-effort: the client has usually disconnected already.
            logger.debug("websocket already closed", exc_info=True)


if __name__ == "__main__":
    import uvicorn

    # NOTE(review): binds on all interfaces with no authentication while
    # exposing a remote shell endpoint; confirm this is intended.
    uvicorn.run(app, host="0.0.0.0", port=8000)