"""Small FastAPI control panel for a cluster of SSH-reachable nodes.

Features:
* node registry stored in SQLite (``cluster.db``),
* one-time SSH key deployment to new nodes via ``sshpass`` + ``ssh-copy-id``,
* live log broadcast, an interactive SSH terminal and a simple chat command
  interface, all over WebSockets.

All external commands are started with an argument vector (never through a
local shell) so that user-supplied values cannot inject local shell syntax.
"""

import asyncio
import codecs
import contextlib
import fcntl
import logging
import os
import pty
import shlex
import sqlite3
import subprocess
from typing import Optional

from fastapi import (
    BackgroundTasks,
    FastAPI,
    Form,
    Request,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

logger = logging.getLogger(__name__)

app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
DB_PATH = "cluster.db"

# Status values stored in the ``nodes.status`` column (shown in the UI).
STATUS_PAIRING = "Kopplung..."
STATUS_READY = "Bereit (Kein Docker)"
STATUS_DOCKER = "Docker Aktiv"
STATUS_ERROR = "Offline/Fehler"

# Options shared by every ssh invocation in this module.
_SSH_OPTIONS = ("-o", "StrictHostKeyChecking=no")

# Maximum length of a single output line buffered by ``_stream_command``.
_STREAM_LIMIT = 1024 * 1024

# Strong references to fire-and-forget tasks; the event loop itself only
# keeps weak references, so an unreferenced task may be garbage-collected
# before it finishes.
_background_jobs: set = set()


# --- Database initialisation ---
def init_db() -> None:
    """Create the ``nodes`` table if it does not exist yet."""
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS nodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                ip TEXT UNIQUE,
                user TEXT,
                status TEXT
            )
        ''')
        conn.commit()


init_db()


def get_db() -> sqlite3.Connection:
    """Open a new connection whose rows support access by column name.

    The caller is responsible for closing the returned connection.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


# --- Helper: update a node's status in the DB ---
def update_node_status(ip, new_status) -> None:
    """Set the ``status`` column of the node with the given ``ip``."""
    with contextlib.closing(get_db()) as conn:
        conn.execute('UPDATE nodes SET status = ? WHERE ip = ?', (new_status, ip))
        conn.commit()


# --- WebSocket manager for logs & chat ---
class ConnectionManager:
    """Tracks the connected log WebSockets and broadcasts text to them."""

    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and register the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Unregister the connection; unknown connections are ignored."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Send ``message`` to every registered connection.

        Delivery is best effort: a connection whose send fails is dropped
        from the registry so it is not retried on every later broadcast.
        """
        # Iterate over a snapshot: the list can change while we await.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # The concrete exception type depends on the ASGI server, so
                # this boundary stays broad on purpose.
                logger.debug("Dropping dead log connection", exc_info=True)
                self.disconnect(connection)


manager = ConnectionManager()


# --- Subprocess helpers ---
def _ssh_destination(user, ip) -> str:
    """Return ``user@ip`` after rejecting values ssh could misinterpret.

    Raises:
        ValueError: if either part is empty, starts with ``-`` (it would be
            parsed as an ssh option) or contains whitespace.
    """
    for part in (user, ip):
        if not part or part.startswith("-") or any(ch.isspace() for ch in part):
            raise ValueError(f"invalid ssh user/host: {part!r}")
    return f"{user}@{ip}"


async def _stream_command(args, prefix, env=None, max_len=None) -> Optional[int]:
    """Run ``args`` without a shell and broadcast its output line by line.

    stdout and stderr are merged. Each line is stripped, optionally cut to
    ``max_len`` characters and broadcast with ``prefix`` prepended.

    Returns:
        The process exit code, or ``None`` if the program could not be
        started (the error text is broadcast in that case).
    """
    try:
        process = await asyncio.create_subprocess_exec(
            *args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env,
            limit=_STREAM_LIMIT,
        )
    except OSError as exc:
        await manager.broadcast(f"{prefix}{exc}")
        return None

    while True:
        try:
            raw = await process.stdout.readline()
        except ValueError:
            # A single line exceeded the buffer limit; it has been discarded
            # by the reader, so just carry on with the next one.
            continue
        if not raw:
            break
        text = raw.decode(errors="replace").strip()
        if max_len is not None:
            text = text[:max_len]
        await manager.broadcast(f"{prefix}{text}")

    return await process.wait()


async def _wait_readable(fd: int) -> None:
    """Suspend until ``fd`` is reported readable by the event loop."""
    loop = asyncio.get_running_loop()
    ready = loop.create_future()

    def _on_ready() -> None:
        # Unregister immediately; otherwise the callback fires on every loop
        # iteration for as long as unread data is pending.
        loop.remove_reader(fd)
        if not ready.done():
            ready.set_result(None)

    loop.add_reader(fd, _on_ready)
    try:
        await ready
    finally:
        loop.remove_reader(fd)


def _spawn_background(coro) -> asyncio.Task:
    """Schedule ``coro`` as a task and keep it referenced until it is done."""
    task = asyncio.create_task(coro)
    _background_jobs.add(task)
    task.add_done_callback(_background_jobs.discard)
    return task


# --- SSH handshake (copies the key only) ---
async def bootstrap_ssh_only(ip, user, password):
    """Install the local public key on the node using its password once.

    On success the node status becomes "Bereit (Kein Docker)", otherwise
    "Offline/Fehler". Progress is broadcast to the log WebSockets.
    """
    await manager.broadcast(f"🔑 Initialisiere SSH-Handshake für {ip}...")

    try:
        destination = _ssh_destination(user, ip)
    except ValueError as exc:
        await manager.broadcast(f"SSH: {exc}")
        exit_code = None
    else:
        # ``sshpass -e`` reads the password from $SSHPASS, which keeps it out
        # of the process list (unlike ``-p``) and away from any shell.
        exit_code = await _stream_command(
            [
                "sshpass", "-e",
                "ssh-copy-id", *_SSH_OPTIONS,
                "-i", f"{SSH_KEY}.pub",
                destination,
            ],
            "SSH: ",
            env={**os.environ, "SSHPASS": password},
        )

    if exit_code == 0:
        update_node_status(ip, STATUS_READY)
        await manager.broadcast(f"✅ Node {ip} ist verbunden. KI kann nun Befehle senden.")
    else:
        update_node_status(ip, STATUS_ERROR)
        await manager.broadcast(f"❌ SSH-Handshake für {ip} fehlgeschlagen.")


# --- Extended remote task ---
async def run_remote_task(ip, user, cmd, task_name):
    """Run ``cmd`` on the node over ssh and broadcast its output.

    ``cmd`` is interpreted by the remote login shell. When the task name
    contains "docker" and the command succeeds, the node status is set to
    "Docker Aktiv".
    """
    await manager.broadcast(f"🚀 KI-Task: {task_name} auf {ip}")

    try:
        destination = _ssh_destination(user, ip)
    except ValueError as exc:
        await manager.broadcast(f"🛠️ {exc}")
        exit_code = None
    else:
        exit_code = await _stream_command(
            ["ssh", *_SSH_OPTIONS, destination, cmd],
            "🛠️ ",
            max_len=80,
        )

    if exit_code != 0:
        await manager.broadcast(f"❌ {task_name} fehlgeschlagen (Exit-Code: {exit_code}).")
        return

    # IMPORTANT: only mark the node once Docker was actually installed.
    if "docker" in task_name.lower():
        update_node_status(ip, STATUS_DOCKER)
        await manager.broadcast(f"✨ Datenbank aktualisiert: {ip} ist nun 'Docker Aktiv'")

    await manager.broadcast(f"✅ {task_name} fertig.")


async def _probe_docker_status(user, ip) -> str:
    """Return the status string for a node based on a short ssh check.

    Any failure (invalid target, ssh missing, timeout after 5 seconds,
    non-zero exit, unexpected output) yields "Offline/Fehler".
    """
    check_cmd = (
        "command -v docker >/dev/null 2>&1 "
        "&& echo 'Docker Aktiv' || echo 'Bereit (Kein Docker)'"
    )
    try:
        process = await asyncio.create_subprocess_exec(
            "ssh", *_SSH_OPTIONS, _ssh_destination(user, ip), check_cmd,
            stdout=subprocess.PIPE,
        )
    except (OSError, ValueError):
        return STATUS_ERROR

    try:
        stdout, _ = await asyncio.wait_for(process.communicate(), timeout=5)
    except asyncio.TimeoutError:
        with contextlib.suppress(ProcessLookupError):
            process.kill()
        await process.wait()
        return STATUS_ERROR

    if process.returncode != 0:
        return STATUS_ERROR

    # Use only the last line so that banners printed by remote shell startup
    # files cannot end up in the database.
    lines = stdout.decode(errors="replace").strip().splitlines()
    answer = lines[-1].strip() if lines else ""
    return answer if answer in (STATUS_DOCKER, STATUS_READY) else STATUS_ERROR


# --- Routes ---
@app.get("/")
async def index(request: Request):
    """Render the overview page with all registered nodes."""
    with contextlib.closing(get_db()) as conn:
        nodes = conn.execute('SELECT * FROM nodes').fetchall()
    return templates.TemplateResponse("index.html", {"request": request, "nodes": nodes})


@app.post("/add_node")
async def add_node(
    background_tasks: BackgroundTasks,
    name: str = Form(...),
    ip: str = Form(...),
    user: str = Form(...),
    password: str = Form(...),
):
    """Register a node and start the SSH key deployment in the background.

    A node whose IP already exists is silently ignored; the client is
    redirected to the overview page in either case.
    """
    with contextlib.closing(get_db()) as conn:
        try:
            conn.execute(
                'INSERT INTO nodes (name, ip, user, status) VALUES (?, ?, ?, ?)',
                (name, ip, user, STATUS_PAIRING),
            )
            conn.commit()
            background_tasks.add_task(bootstrap_ssh_only, ip, user, password)
        except sqlite3.IntegrityError:
            # ``ip`` is UNIQUE: the node is already registered.
            pass
    return RedirectResponse(url="/", status_code=303)


@app.post("/remove_node/{node_id}")
async def remove_node(node_id: int):
    """Delete the node with the given id and redirect to the overview."""
    with contextlib.closing(get_db()) as conn:
        conn.execute('DELETE FROM nodes WHERE id = ?', (node_id,))
        conn.commit()
    return RedirectResponse(url="/", status_code=303)


# --- New endpoint: manual refresh check ---
@app.get("/refresh_status/{node_id}")
async def refresh_status(node_id: int):
    """Re-check via ssh whether Docker is present and store the result."""
    with contextlib.closing(get_db()) as conn:
        node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone()

    if node:
        status = await _probe_docker_status(node['user'], node['ip'])
        update_node_status(node['ip'], status)

    return RedirectResponse(url="/", status_code=303)


# --- WebSockets ---
@app.websocket("/ws/install_logs")
async def log_websocket(websocket: WebSocket):
    """Keep a log subscriber registered until its connection ends."""
    await manager.connect(websocket)
    try:
        while True:
            # Incoming messages are ignored; receiving only detects disconnects.
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        manager.disconnect(websocket)


@app.websocket("/ws/terminal/{ip}")
async def terminal_websocket(websocket: WebSocket, ip: str):
    """Bridge the WebSocket to an interactive ssh session on the node.

    The session ends as soon as either side goes away: the client
    disconnects or the ssh process closes the terminal.
    """
    await websocket.accept()

    with contextlib.closing(get_db()) as conn:
        node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone()

    if not node:
        await websocket.send_text("\r\nFehler: Node nicht gefunden.\r\n")
        await websocket.close()
        return

    # Pseudo-terminal for the interactive SSH session.
    master_fd, slave_fd = pty.openpty()
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", *_SSH_OPTIONS, "-t", _ssh_destination(node['user'], ip),
            stdin=slave_fd, stdout=slave_fd, stderr=slave_fd,
        )
    except (OSError, ValueError):
        os.close(master_fd)
        os.close(slave_fd)
        await websocket.send_text("\r\nFehler: SSH konnte nicht gestartet werden.\r\n")
        await websocket.close()
        return

    # The child owns its copy of the slave end now. Closing ours makes reads
    # on the master fail once ssh exits, which is how the end is detected.
    os.close(slave_fd)

    # Put the master FD into non-blocking mode.
    flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    async def pty_to_ws():
        # Incremental decoding keeps multi-byte characters intact when they
        # are split across two reads.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
        try:
            while True:
                await _wait_readable(master_fd)
                try:
                    chunk = os.read(master_fd, 4096)
                except BlockingIOError:
                    continue
                except OSError:
                    break  # slave side closed: ssh has exited
                if not chunk:
                    break
                text = decoder.decode(chunk)
                if text:
                    await websocket.send_text(text)
        except Exception:
            logger.debug("Terminal output stream ended", exc_info=True)

    async def ws_to_pty():
        try:
            while True:
                pending = (await websocket.receive_text()).encode()
                while pending:
                    try:
                        written = os.write(master_fd, pending)
                    except BlockingIOError:
                        # Terminal buffer is full; let the reader drain it.
                        await asyncio.sleep(0.01)
                        continue
                    pending = pending[written:]
        except WebSocketDisconnect:
            pass
        except Exception:
            logger.debug("Terminal input stream ended", exc_info=True)

    reader = asyncio.create_task(pty_to_ws())
    writer = asyncio.create_task(ws_to_pty())
    try:
        await asyncio.wait({reader, writer}, return_when=asyncio.FIRST_COMPLETED)
    finally:
        for task in (reader, writer):
            task.cancel()
        await asyncio.gather(reader, writer, return_exceptions=True)

        if proc.returncode is None:
            with contextlib.suppress(ProcessLookupError):
                proc.terminate()
        await proc.wait()  # reap the child so it does not linger as a zombie
        os.close(master_fd)

        # Closing fails if the client is already gone; that is fine.
        with contextlib.suppress(Exception):
            await websocket.close()


@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
    """Very small command chat: reacts to "installiere docker <node>"."""
    await websocket.accept()
    try:
        while True:
            user_msg = await websocket.receive_text()
            user_msg_lower = user_msg.lower()

            if "installiere docker" not in user_msg_lower:
                await websocket.send_text(f"🤖 Empfangen: {user_msg}. Soll ich etwas installieren?")
                continue

            with contextlib.closing(get_db()) as conn:
                nodes = conn.execute('SELECT * FROM nodes').fetchall()

            # Empty names/IPs are skipped: the empty string is contained in
            # every message and would always select the first node.
            target = next(
                (
                    n for n in nodes
                    if (n['name'] and n['name'].lower() in user_msg_lower)
                    or (n['ip'] and n['ip'] in user_msg_lower)
                ),
                None,
            )

            if not target:
                await websocket.send_text("🤖 Node nicht gefunden. Welchen meinst du?")
                continue

            await websocket.send_text(f"🤖 OK. Starte Docker-Installation auf {target['name']}...")
            # The command runs in the remote shell, so quote the user name.
            cmd = (
                "curl -sSL https://get.docker.com | sh && sudo usermod -aG docker "
                + shlex.quote(target['user'])
            )
            _spawn_background(
                run_remote_task(target['ip'], target['user'], cmd, "Docker Installation")
            )
    except WebSocketDisconnect:
        pass


if __name__ == "__main__":
    import uvicorn

    # NOTE(review): this binds to all interfaces and the app has no
    # authentication, although it can open shells on the nodes. Confirm that
    # it is only reachable from a trusted network.
    uvicorn.run(app, host="0.0.0.0", port=8000)