import os import pty import fcntl import subprocess import sqlite3 import asyncio from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect from fastapi.responses import RedirectResponse from fastapi.templating import Jinja2Templates app = FastAPI() templates = Jinja2Templates(directory="templates") SSH_KEY = os.path.expanduser("~/.ssh/id_rsa") DB_PATH = "cluster.db" # --- DATENBANK INITIALISIERUNG --- def init_db(): conn = sqlite3.connect(DB_PATH) conn.execute(''' CREATE TABLE IF NOT EXISTS nodes ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, ip TEXT UNIQUE, user TEXT, status TEXT ) ''') conn.commit() conn.close() init_db() def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn # --- WebSocket Manager für Logs & Chat --- class ConnectionManager: def __init__(self): self.active_connections: list[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): if websocket in self.active_connections: self.active_connections.remove(websocket) async def broadcast(self, message: str): for connection in self.active_connections: try: await connection.send_text(message) except: pass manager = ConnectionManager() # --- SSH Handshake (Nur Key kopieren) --- async def bootstrap_ssh_only(ip, user, password): await manager.broadcast(f"🔑 Initialisiere SSH-Handshake für {ip}...") # Nutzt sshpass um den Key einmalig mit Passwort zu hinterlegen ssh_copy_cmd = f"sshpass -p '{password}' ssh-copy-id -o StrictHostKeyChecking=no -i {SSH_KEY}.pub {user}@{ip}" process = subprocess.Popen(ssh_copy_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) for line in process.stdout: await manager.broadcast(f"SSH: {line.strip()}") conn = get_db() conn.execute('UPDATE nodes SET status = "Bereit (Kein Docker)" WHERE ip = ?', (ip,)) conn.commit() conn.close() await manager.broadcast(f"✅ Node {ip} ist verbunden. KI kann nun Befehle senden.") # --- Routen --- @app.get("/") async def index(request: Request): conn = get_db() nodes = conn.execute('SELECT * FROM nodes').fetchall() conn.close() return templates.TemplateResponse("index.html", {"request": request, "nodes": nodes}) @app.post("/add_node") async def add_node(background_tasks: BackgroundTasks, name: str = Form(...), ip: str = Form(...), user: str = Form(...), password: str = Form(...)): conn = get_db() try: conn.execute('INSERT INTO nodes (name, ip, user, status) VALUES (?, ?, ?, ?)', (name, ip, user, "Kopplung...")) conn.commit() background_tasks.add_task(bootstrap_ssh_only, ip, user, password) except sqlite3.IntegrityError: pass finally: conn.close() return RedirectResponse(url="/", status_code=303) @app.post("/remove_node/{node_id}") async def remove_node(node_id: int): conn = get_db() conn.execute('DELETE FROM nodes WHERE id = ?', (node_id,)) conn.commit() conn.close() return RedirectResponse(url="/", status_code=303) # --- WebSockets --- @app.websocket("/ws/install_logs") async def log_websocket(websocket: WebSocket): await manager.connect(websocket) try: while True: await websocket.receive_text() except WebSocketDisconnect: manager.disconnect(websocket) @app.websocket("/ws/terminal/{ip}") async def terminal_websocket(websocket: WebSocket, ip: str): await websocket.accept() conn = get_db() node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone() conn.close() if not node: await websocket.send_text("\r\nFehler: Node nicht gefunden.\r\n") await websocket.close() return # Pseudo-Terminal für interaktive SSH-Session master_fd, slave_fd = pty.openpty() proc = await asyncio.create_subprocess_exec( "ssh", "-o", "StrictHostKeyChecking=no", "-t", f"{node['user']}@{ip}", stdin=slave_fd, stdout=slave_fd, stderr=slave_fd ) async def pty_to_ws(): # Setzt den Master-FD auf non-blocking fl = fcntl.fcntl(master_fd, fcntl.F_GETFL) fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) try: while True: await asyncio.sleep(0.01) try: data = os.read(master_fd, 1024).decode(errors='ignore') if data: await websocket.send_text(data) except BlockingIOError: continue except Exception: pass async def ws_to_pty(): try: while True: data = await websocket.receive_text() os.write(master_fd, data.encode()) except Exception: pass try: await asyncio.gather(pty_to_ws(), ws_to_pty()) finally: if proc.returncode is None: proc.terminate() os.close(master_fd) os.close(slave_fd) @app.websocket("/ws/chat") async def chat_endpoint(websocket: WebSocket): await websocket.accept() while True: user_msg = await websocket.receive_text() user_msg_lower = user_msg.lower() if "installiere docker" in user_msg_lower: conn = get_db() nodes = conn.execute('SELECT * FROM nodes').fetchall() conn.close() target = next((n for n in nodes if n['name'].lower() in user_msg_lower or n['ip'] in user_msg_lower), None) if target: await websocket.send_text(f"🤖 OK. Starte Docker-Installation auf {target['name']}...") cmd = "curl -sSL https://get.docker.com | sh && sudo usermod -aG docker " + target['user'] asyncio.create_task(run_remote_task(target['ip'], target['user'], cmd, "Docker Installation")) else: await websocket.send_text("🤖 Node nicht gefunden. Welchen meinst du?") else: await websocket.send_text(f"🤖 Empfangen: {user_msg}. Soll ich etwas installieren?") async def run_remote_task(ip, user, cmd, task_name): await manager.broadcast(f"🚀 KI-Task: {task_name} auf {ip}") ssh_cmd = f"ssh -o StrictHostKeyChecking=no {user}@{ip} '{cmd}'" process = subprocess.Popen(ssh_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) for line in process.stdout: await manager.broadcast(f"🛠️ {line.strip()[:80]}") await manager.broadcast(f"✅ {task_name} fertig.") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)