"""Pi-Orchestrator master: a small FastAPI app for managing cluster nodes.

The app keeps a list of nodes in a local SQLite database, deploys the
master's SSH public key to newly added nodes, runs installation commands on
them over SSH (triggered through a chat WebSocket) and offers an interactive
SSH terminal over a WebSocket.
"""

import asyncio
import codecs
import json  # kept: file-level import, may be used by code outside this view
import logging
import os
import pty
import shlex
import sqlite3
import subprocess
from contextlib import closing
from typing import Optional, Tuple

import paramiko
from fastapi import (
    BackgroundTasks,
    FastAPI,
    Form,
    Request,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

logger = logging.getLogger(__name__)

app = FastAPI(title="Pi-Orchestrator Master")
templates = Jinja2Templates(directory="templates")

# Paths & constants
SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
DB_PATH = "cluster.db"

# Shell pipelines run on a node for each software name the chat understands.
INSTALL_COMMANDS = {
    "docker": "curl -sSL https://get.docker.com | sh",
    "ollama": "curl -fsSL https://ollama.com/install.sh | sh",
}


# --- Database setup ---
def init_db():
    """Create the ``nodes`` table if it does not exist yet."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            '''CREATE TABLE IF NOT EXISTS nodes
               (id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                ip TEXT UNIQUE,
                user TEXT,
                status TEXT)'''
        )
        conn.commit()


init_db()


def get_db():
    """Return a new SQLite connection whose rows support access by column name.

    The caller is responsible for closing the connection.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def _set_node_status(ip, status):
    """Store *status* for the node registered under *ip*."""
    with closing(get_db()) as conn:
        conn.execute('UPDATE nodes SET status = ? WHERE ip = ?', (status, ip))
        conn.commit()


def ensure_ssh_key():
    """Generate the master's RSA key pair at ``SSH_KEY`` if it is missing.

    Raises:
        subprocess.CalledProcessError: if ``ssh-keygen`` exits non-zero.
    """
    if not os.path.exists(SSH_KEY):
        # ssh-keygen does not create the target directory for an explicit -f path.
        os.makedirs(os.path.dirname(SSH_KEY), mode=0o700, exist_ok=True)
        subprocess.run(
            ["ssh-keygen", "-t", "rsa", "-N", "", "-f", SSH_KEY], check=True
        )


# --- SSH & command logic ---
def _exec_ssh(ip, user, cmd) -> Tuple[bool, str]:
    """Run *cmd* as root on a node and report whether it succeeded.

    Returns:
        ``(ok, output)``. ``ok`` is True only when the remote command exited
        with status 0. ``output`` is the command's stdout; on a non-zero exit
        stderr is appended, and on a connection/transport error it is a
        ``"Fehler auf <ip>: ..."`` message.
    """
    ssh = paramiko.SSHClient()
    # NOTE(security): AutoAddPolicy trusts unknown host keys on first contact.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(ip, username=user, key_filename=SSH_KEY, timeout=10)
        # Wrap in `sh -c` so sudo applies to the whole pipeline, not only to
        # the first command before a `|`.
        _stdin, stdout, stderr = ssh.exec_command(
            f"sudo -n sh -c {shlex.quote(cmd)}"
        )
        output = stdout.read().decode(errors="replace")
        if stdout.channel.recv_exit_status() != 0:
            return False, output + stderr.read().decode(errors="replace")
        return True, output
    except Exception as e:  # boundary: callers expect a message, not a raise
        return False, f"Fehler auf {ip}: {str(e)}"
    finally:
        ssh.close()


def run_ssh_cmd(ip, user, cmd):
    """Run *cmd* via ``sudo -n`` on a node using the SSH key (no password).

    Args:
        ip: Address of the node.
        user: SSH login name.
        cmd: Shell command line to execute as root.

    Returns:
        The command's output as text, or ``"Fehler auf <ip>: ..."`` if the
        connection or execution raised. Never raises.
    """
    _ok, output = _exec_ssh(ip, user, cmd)
    return output


def _provision_node(ip, user, password):
    """Install the master's public key on a node and record the outcome.

    Logs in with *password*, appends the public key to the remote
    ``~/.ssh/authorized_keys`` (if not already present) and sets the node's
    status to ``"Bereit"`` on success or ``"Fehler: ..."`` on failure.
    Intended to run as a background task; never raises.
    """
    try:
        ensure_ssh_key()
        with open(f"{SSH_KEY}.pub", encoding="utf-8") as key_file:
            public_key = shlex.quote(key_file.read().strip())
        ssh = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy trusts unknown host keys on first contact.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(
                ip,
                username=user,
                password=password,
                timeout=10,
                look_for_keys=False,
                allow_agent=False,
            )
            _stdin, stdout, stderr = ssh.exec_command(
                "mkdir -p ~/.ssh && chmod 700 ~/.ssh && "
                "touch ~/.ssh/authorized_keys && "
                "chmod 600 ~/.ssh/authorized_keys && "
                f"(grep -qxF {public_key} ~/.ssh/authorized_keys || "
                f"echo {public_key} >> ~/.ssh/authorized_keys)"
            )
            if stdout.channel.recv_exit_status() != 0:
                raise RuntimeError(
                    stderr.read().decode(errors="replace").strip()
                )
        finally:
            ssh.close()
    except Exception as exc:  # boundary of a background job: record, don't crash
        logger.exception("Key deployment to %s failed", ip)
        _set_node_status(ip, f"Fehler: {exc}")
    else:
        _set_node_status(ip, "Bereit")


# --- Routes ---
@app.get("/")
async def index(request: Request):
    """Render ``index.html`` with all registered nodes."""
    with closing(get_db()) as conn:
        nodes = conn.execute('SELECT * FROM nodes').fetchall()
    return templates.TemplateResponse(
        "index.html", {"request": request, "nodes": nodes}
    )


@app.post("/add_node")
async def add_node(
    name: str = Form(...),
    ip: str = Form(...),
    user: str = Form(...),
    password: str = Form(...),
    background_tasks: BackgroundTasks = None,
):
    """Register a node and start deploying the SSH key to it.

    Returns:
        A redirect to ``/`` on success, or ``{"error": ...}`` if a node with
        the same IP is already registered.
    """
    # 1. Store the node in the database.
    try:
        with closing(get_db()) as conn:
            conn.execute(
                'INSERT INTO nodes (name, ip, user, status) VALUES (?, ?, ?, ?)',
                (name, ip, user, "Setup läuft..."),
            )
            conn.commit()
    except sqlite3.IntegrityError:
        return {"error": "Node existiert bereits"}

    # 2. Deploy the key in the background; the user sees the resulting
    #    status in the UI once the job has finished.
    if background_tasks is not None:
        background_tasks.add_task(_provision_node, ip, user, password)
    return RedirectResponse(url="/", status_code=303)


# --- WebSockets ---
@app.websocket("/ws/install_logs")
async def install_logs_endpoint(websocket: WebSocket):
    """Placeholder log stream: accepts the socket and sends a ready message."""
    await websocket.accept()
    # Specific setup processes can be streamed from here later.
    await websocket.send_text("System-Log: Bereit für Installationen...")


def _requested_software(message_lower) -> Optional[str]:
    """Return the software named in an "installiere <name>" request, if any."""
    for software in INSTALL_COMMANDS:
        if f"installiere {software}" in message_lower:
            return software
    return None


def _find_target_node(nodes, message_lower):
    """Return the first node whose name or IP occurs in the message, or None."""
    for node in nodes:
        node_name = (node['name'] or "").lower()
        node_ip = node['ip'] or ""
        # Empty strings are substrings of everything; never match on them.
        if (node_name and node_name in message_lower) or (
            node_ip and node_ip in message_lower
        ):
            return node
    return None


@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
    """Chat loop that turns "installiere docker/ollama <node>" into SSH installs."""
    await websocket.accept()
    loop = asyncio.get_running_loop()
    try:
        while True:
            user_msg = await websocket.receive_text()
            user_msg_lower = user_msg.lower()

            with closing(get_db()) as conn:
                nodes = conn.execute('SELECT * FROM nodes').fetchall()

            software = _requested_software(user_msg_lower)
            if software is None:
                await websocket.send_text(
                    f"🤖 Ich habe '{user_msg}' erhalten. Wie kann ich helfen?"
                )
                continue

            target_node = _find_target_node(nodes, user_msg_lower)
            if target_node is None:
                await websocket.send_text(
                    "🤖 Welchen Pi meinst du? Ich kenne: "
                    + ", ".join([n['name'] for n in nodes])
                )
                continue

            await websocket.send_text(
                f"🤖 Starte Installation auf {target_node['name']}..."
            )
            # paramiko blocks; run it in a worker thread so the event loop
            # (and every other connection) stays responsive.
            ok, result = await loop.run_in_executor(
                None,
                _exec_ssh,
                target_node['ip'],
                target_node['user'],
                INSTALL_COMMANDS[software],
            )
            if ok:
                await websocket.send_text(
                    f"✅ Fertig auf {target_node['name']}: {result[:50]}..."
                )
            else:
                await websocket.send_text(
                    f"❌ Fehler auf {target_node['name']}: {result[:200]}"
                )
    except WebSocketDisconnect:
        # Client went away; nothing to clean up.
        return


# --- Terminal ---
async def _pump_pty_to_websocket(master_fd, websocket):
    """Forward everything the PTY produces to the WebSocket until EOF."""
    loop = asyncio.get_running_loop()
    chunks = asyncio.Queue()
    # Incremental decoding keeps multi-byte characters intact across reads.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def on_readable():
        try:
            data = os.read(master_fd, 4096)
        except OSError:
            # Reading the master side fails once the child side is closed.
            data = b""
        chunks.put_nowait(data)
        if not data:
            loop.remove_reader(master_fd)

    loop.add_reader(master_fd, on_readable)
    try:
        while True:
            data = await chunks.get()
            if not data:
                return
            text = decoder.decode(data)
            if text:
                await websocket.send_text(text)
    finally:
        loop.remove_reader(master_fd)


async def _pump_websocket_to_pty(websocket, master_fd):
    """Write every text message from the WebSocket into the PTY."""
    while True:
        text = await websocket.receive_text()
        os.write(master_fd, text.encode())


def _stop_process(process):
    """Terminate *process* if it is still running and reap it."""
    if process.poll() is None:
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()


@app.websocket("/ws/terminal/{ip}")
async def terminal_websocket(websocket: WebSocket, ip: str):
    """Open an interactive SSH shell on a registered node over the WebSocket.

    Text received from the client is written to the shell; shell output is
    sent back as text. The session ends when either side closes, after which
    the ssh process is stopped and the PTY is released.
    """
    await websocket.accept()
    with closing(get_db()) as conn:
        node = conn.execute(
            'SELECT * FROM nodes WHERE ip = ?', (ip,)
        ).fetchone()

    if not node:
        await websocket.send_text("Node nicht gefunden.")
        await websocket.close()
        return

    # NOTE(security): StrictHostKeyChecking=no disables host key verification.
    # "--" ends option parsing so a stored user name that starts with "-"
    # cannot be interpreted as an ssh option.
    cmd = [
        "ssh",
        "-o",
        "StrictHostKeyChecking=no",
        "--",
        f"{node['user']}@{ip}",
    ]
    master_fd, slave_fd = pty.openpty()
    try:
        process = subprocess.Popen(
            cmd, stdin=slave_fd, stdout=slave_fd, stderr=slave_fd
        )
    except OSError as exc:
        os.close(master_fd)
        await websocket.send_text(f"SSH konnte nicht gestartet werden: {exc}")
        await websocket.close()
        return
    finally:
        # The child holds its own copy; keeping ours open would hide EOF.
        os.close(slave_fd)

    loop = asyncio.get_running_loop()
    tasks = []
    try:
        await websocket.send_text(f"--- Shell auf {node['name']} geöffnet ---")
        tasks = [
            asyncio.ensure_future(_pump_pty_to_websocket(master_fd, websocket)),
            asyncio.ensure_future(_pump_websocket_to_pty(websocket, master_fd)),
        ]
        # Whichever direction ends first (shell exit or client disconnect)
        # ends the whole session.
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    except WebSocketDisconnect:
        pass
    finally:
        for task in tasks:
            task.cancel()
        # Collect results so task exceptions are not reported as unretrieved.
        await asyncio.gather(*tasks, return_exceptions=True)
        try:
            await loop.run_in_executor(None, _stop_process, process)
        finally:
            os.close(master_fd)


if __name__ == "__main__":
    import uvicorn

    ensure_ssh_key()
    uvicorn.run(app, host="0.0.0.0", port=8000)