"""Web UI backend for a small SSH-managed cluster.

The module exposes a FastAPI application that

* keeps an inventory of nodes in a local SQLite database,
* bootstraps new nodes (copies the SSH key, records OS / arch / Docker),
* offers a browser terminal over a WebSocket-attached PTY running ``ssh``,
* relays chat messages to a configurable LLM provider and runs the commands
  the model asks for on the target nodes.

NOTE(review): none of the routes is authenticated, host keys are never
verified (``StrictHostKeyChecking=no``) and ``sudo_password`` is stored and
served in plaintext (``/api/node/{id}`` and the index template receive the
full row). These are left as-is to keep the external interface unchanged,
but should be revisited before exposing this service on a network.
"""
import asyncio
import codecs
import contextlib
import fcntl
import json
import logging
import os
import pty
import re
import sqlite3
import subprocess

import httpx
import openai
from dotenv import load_dotenv, set_key
from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from google import genai
from google.genai import types

logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

app = FastAPI()
static_path = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=static_path), name="static")
templates = Jinja2Templates(directory="templates")

SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
DB_PATH = "cluster.db"
# Conversation shared by every chat client; trimmed to MAX_HISTORY entries
# each time a user message is added.
chat_history = []
MAX_HISTORY = 30
PROMPT_FILE = "system_prompt.txt"
ENV_FILE = os.path.join(os.path.dirname(__file__), ".env")

# AI configuration
SUPPORTED_PROVIDERS = ("google", "openai", "ollama")
AI_PROVIDER = os.getenv("AI_PROVIDER", "google").lower()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434/v1")

GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.0-flash")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3")

# Remote inventory command chain: architecture, OS name, Docker check.
# Prints three lines on success.
INSPECT_CMD = (
    "uname -m && (lsb_release -ds || cat /etc/os-release | grep PRETTY_NAME | cut -d'=' -f2)"
    " && (command -v docker >/dev/null 2>&1 && echo '1' || echo '0')"
)

# NOTE(review): both patterns below look truncated -- the loop in
# chat_endpoint unpacks (target, cmd) pairs, which needs two capture groups
# and, presumably, surrounding tag delimiters that are missing here. They are
# reproduced unchanged; restore the intended markup before relying on
# command execution.
COMMAND_RE = re.compile(r'(.*?)', re.I | re.S)
COMMAND_STRIP_RE = re.compile(r'.*?', re.I | re.S)


# --- DATABASE INITIALISATION (EXTENDED) ---
def init_db():
    """Create the ``nodes`` table if it does not exist yet."""
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        # Columns include sudo_password, os, arch and docker_installed.
        conn.execute('''
            CREATE TABLE IF NOT EXISTS nodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                ip TEXT UNIQUE,
                user TEXT,
                sudo_password TEXT,
                os TEXT DEFAULT 'Unbekannt',
                arch TEXT DEFAULT 'Unbekannt',
                docker_installed INTEGER DEFAULT 0,
                status TEXT
            )
        ''')
        conn.commit()


init_db()


def get_db():
    """Open a new SQLite connection whose rows support access by column name.

    The caller owns the connection and must close it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def get_system_prompt():
    """Build the system prompt by inserting the node list into the template.

    The template is read from ``PROMPT_FILE``; when that file is missing a
    built-in fallback is used and a warning is printed.
    """
    with contextlib.closing(get_db()) as conn:
        nodes = conn.execute('SELECT * FROM nodes').fetchall()

    entries = []
    for n in nodes:
        docker_str = "Ja" if n['docker_installed'] else "Nein"
        entries.append(
            f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}, "
            f"OS: {n['os']}, Arch: {n['arch']}, Docker: {docker_str}\n"
        )
    node_info = "".join(entries)

    if os.path.exists(PROMPT_FILE):
        with open(PROMPT_FILE, "r", encoding="utf-8") as f:
            template = f.read()
    else:
        template = "Du bist ein Cluster-Orchestrator. Nodes:\n{node_info}\nBefehle via cmd"
        print(f"⚠️ Warnung: {PROMPT_FILE} fehlt.")
    return template.replace("{node_info}", node_info)


# --- AI LOGIC ---
async def _ask_openai_compatible(system_prompt):
    """Send the current history to OpenAI or to Ollama's OpenAI-compatible API."""
    is_ollama = AI_PROVIDER == "ollama"
    url = OLLAMA_BASE_URL if is_ollama else None
    if url and not url.endswith('/v1'):
        url = url.rstrip('/') + '/v1'
    # The async client keeps the event loop free while the model answers.
    client = openai.AsyncOpenAI(
        base_url=url,
        api_key="ollama" if is_ollama else OPENAI_API_KEY,
    )
    response = await client.chat.completions.create(
        model=OLLAMA_MODEL if is_ollama else OPENAI_MODEL,
        messages=[{"role": "system", "content": system_prompt}] + chat_history,
    )
    return response.choices[0].message.content


async def _ask_google(user_input, system_prompt):
    """Send ``user_input`` to Gemini, replaying the earlier history first."""
    client = genai.Client(api_key=GOOGLE_API_KEY)
    # The last history entry is the message being sent now, so it is excluded.
    google_history = [
        types.Content(
            role="user" if msg["role"] == "user" else "model",
            parts=[types.Part.from_text(text=msg["content"])],
        )
        for msg in chat_history[:-1]
    ]
    chat = client.chats.create(
        model=GOOGLE_MODEL,
        config=types.GenerateContentConfig(system_instruction=system_prompt),
        history=google_history,
    )
    # send_message is a blocking call; run it off the event loop.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, chat.send_message, user_input)
    return response.text


async def get_ai_response(user_input, system_prompt):
    """Ask the configured provider for a reply and record both turns.

    Args:
        user_input: Text appended to the shared history as a user message.
        system_prompt: System instruction passed to the provider.

    Returns:
        The model's reply as a string. Provider errors are not raised; they
        are returned as a ``"KI Fehler: ..."`` message instead.
    """
    chat_history.append({"role": "user", "content": user_input})
    del chat_history[:-MAX_HISTORY]  # keep only the newest entries, in place

    try:
        if AI_PROVIDER in ("openai", "ollama"):
            ai_msg = await _ask_openai_compatible(system_prompt)
        elif AI_PROVIDER == "google":
            ai_msg = await _ask_google(user_input, system_prompt)
        else:
            ai_msg = f"KI Fehler: Unbekannter Provider '{AI_PROVIDER}'"
    except Exception as e:
        ai_msg = f"KI Fehler: {e}"

    # Providers may return None for an empty completion; callers expect str.
    ai_msg = ai_msg or ""
    chat_history.append({"role": "assistant", "content": ai_msg})
    return ai_msg


# --- WebSocket manager ---
class ConnectionManager:
    """Tracks the log WebSocket clients and fans messages out to them."""

    def __init__(self):
        self.active_connections = []

    async def connect(self, ws: WebSocket):
        """Accept ``ws`` and start broadcasting to it."""
        await ws.accept()
        self.active_connections.append(ws)

    def disconnect(self, ws: WebSocket):
        """Stop broadcasting to ``ws``; unknown sockets are ignored."""
        with contextlib.suppress(ValueError):
            self.active_connections.remove(ws)

    async def broadcast(self, msg: str):
        """Send ``msg`` to every client, dropping clients whose send fails."""
        # Iterate over a snapshot: the list can change while we await.
        for c in list(self.active_connections):
            try:
                await c.send_text(msg)
            except Exception:
                logger.debug("Dropping log client after failed send", exc_info=True)
                self.disconnect(c)


manager = ConnectionManager()


# --- SSH helpers ---
def _ssh_target(user, ip):
    """Return ``user@ip`` after rejecting values ssh could parse as options.

    Raises:
        ValueError: if either part is empty, contains whitespace, or the user
            name starts with ``-``.
    """
    user, ip = str(user), str(ip)
    if (
        not user
        or not ip
        or user.startswith("-")
        or any(ch.isspace() for ch in user + ip)
    ):
        raise ValueError(f"Ungültiges SSH-Ziel: {user!r}@{ip!r}")
    return f"{user}@{ip}"


async def _run_command(args, env=None):
    """Run ``args`` without a shell and return ``(returncode, stdout, stderr)``."""
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=env,
    )
    stdout, stderr = await proc.communicate()
    return (
        proc.returncode,
        stdout.decode(errors="replace"),
        stderr.decode(errors="replace"),
    )


def _parse_inventory(text, strict=False):
    """Turn the output of ``INSPECT_CMD`` into ``(arch, os_name, docker_val)``.

    With ``strict=False`` missing lines fall back to defaults; with
    ``strict=True`` fewer than three lines raise ``ValueError``.
    """
    lines = text.strip().split('\n')
    if strict and len(lines) < 3:
        raise ValueError("Unvollständige Inventur-Ausgabe")

    arch = lines[0].strip() or "Unbekannt"
    # Map kernel architecture names to the labels used in the UI.
    if "aarch64" in arch.lower():
        arch = "arm64"
    elif "x86_64" in arch.lower():
        arch = "x86-64"

    os_name = lines[1].replace('"', '') if len(lines) > 1 else "Linux"
    docker_val = int(lines[2]) if len(lines) > 2 else 0
    return arch, os_name, docker_val


def _status_for(docker_val):
    """Return the status label that corresponds to the Docker flag."""
    return "Docker Aktiv" if docker_val else "Bereit (Kein Docker)"


async def _collect_inventory(user, ip, connect_timeout=None, strict=False):
    """Run ``INSPECT_CMD`` on the node over ssh and parse the result.

    Raises:
        ValueError: for an invalid target or unparsable output.
        RuntimeError: when ssh exits with a non-zero status.
        OSError: when ssh cannot be started.
    """
    # BatchMode makes ssh fail instead of waiting for a password prompt.
    args = ["ssh", "-o", "StrictHostKeyChecking=no", "-o", "BatchMode=yes"]
    if connect_timeout is not None:
        args += ["-o", f"ConnectTimeout={connect_timeout}"]
    args += [_ssh_target(user, ip), INSPECT_CMD]

    returncode, stdout, stderr = await _run_command(args)
    if returncode != 0:
        raise RuntimeError(stderr.strip() or f"ssh exit code {returncode}")
    return _parse_inventory(stdout, strict=strict)


# --- EXTENDED NODE BOOTSTRAPPING (inventory) ---
async def bootstrap_node(ip, user, password):
    """Install the local SSH key on a node and record its inventory.

    Progress and errors are broadcast to the log clients; nothing is raised.
    """
    await manager.broadcast(f"🔑 SSH-Handshake für {ip}...")

    # 1. Copy the key. The password travels via the SSHPASS environment
    # variable (sshpass -e) so it never appears in a shell string or argv.
    try:
        target = _ssh_target(user, ip)
        returncode, _, stderr = await _run_command(
            [
                "sshpass", "-e",
                "ssh-copy-id",
                "-o", "StrictHostKeyChecking=no",
                "-i", f"{SSH_KEY}.pub",
                target,
            ],
            env={**os.environ, "SSHPASS": password},
        )
    except (ValueError, OSError) as e:
        await manager.broadcast(f"❌ Fehler beim Key-Copy für {ip}: {e}")
        return

    if returncode != 0:
        await manager.broadcast(f"❌ Fehler beim Key-Copy für {ip}: {stderr}")
        return

    # 2. Fetch system information (inventory).
    await manager.broadcast(f"🔍 Inventur auf {ip} wird durchgeführt...")
    try:
        arch, os_name, docker_val = await _collect_inventory(user, ip)
        status = _status_for(docker_val)

        with contextlib.closing(get_db()) as conn:
            conn.execute('''
                UPDATE nodes
                SET os = ?, arch = ?, docker_installed = ?, status = ?
                WHERE ip = ?
            ''', (os_name, arch, docker_val, status, ip))
            conn.commit()

        await manager.broadcast(f"✅ Node {ip} konfiguriert ({os_name}, {arch}).")
    except Exception as e:
        await manager.broadcast(f"⚠️ Inventur auf {ip} unvollständig: {e}")


# --- ROUTES ---
@app.get("/")
async def index(request: Request):
    """Render the dashboard with every known node."""
    with contextlib.closing(get_db()) as conn:
        nodes = conn.execute('SELECT * FROM nodes').fetchall()
    return templates.TemplateResponse("index.html", {"request": request, "nodes": nodes})


@app.get("/api/node/{node_id}")
async def get_node(node_id: int):
    """Return the node row as a dict, or ``{}`` when the id is unknown."""
    with contextlib.closing(get_db()) as conn:
        node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone()
    # NOTE(review): the row includes sudo_password; consider omitting it.
    return dict(node) if node else {}


@app.post("/add_node")
async def add_node(
    background_tasks: BackgroundTasks,
    name: str = Form(...),
    ip: str = Form(...),
    user: str = Form(...),
    password: str = Form(...),
):
    """Insert a node and schedule its bootstrap; duplicate IPs are ignored."""
    with contextlib.closing(get_db()) as conn:
        try:
            # Store the initial data including the sudo password.
            conn.execute('''
                INSERT INTO nodes (name, ip, user, sudo_password, status)
                VALUES (?, ?, ?, ?, ?)
            ''', (name, ip, user, password, "Kopplung..."))
            conn.commit()
        except sqlite3.IntegrityError:
            # ip is UNIQUE: the node already exists, so nothing is scheduled.
            logger.info("Node with ip %s already exists; not added", ip)
        else:
            background_tasks.add_task(bootstrap_node, ip, user, password)
    return RedirectResponse(url="/", status_code=303)


@app.post("/edit_node/{node_id}")
async def edit_node(
    node_id: int,
    name: str = Form(...),
    ip: str = Form(...),
    user: str = Form(...),
):
    """Update name, ip and user of a node, then redirect to the dashboard."""
    with contextlib.closing(get_db()) as conn:
        try:
            conn.execute(
                'UPDATE nodes SET name=?, ip=?, user=? WHERE id=?',
                (name, ip, user, node_id),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # ip is UNIQUE; a clash with another node leaves the row unchanged
            # instead of answering with a server error.
            logger.warning("Edit of node %s rejected: ip %s already in use", node_id, ip)
    return RedirectResponse(url="/", status_code=303)


@app.post("/remove_node/{node_id}")
async def remove_node(node_id: int):
    """Delete a node and redirect to the dashboard."""
    with contextlib.closing(get_db()) as conn:
        conn.execute('DELETE FROM nodes WHERE id = ?', (node_id,))
        conn.commit()
    return RedirectResponse(url="/", status_code=303)


@app.get("/refresh_status/{node_id}")
async def refresh_status(node_id: int):
    """Re-run the inventory on a node and persist the resulting status.

    Returns:
        ``{"status", "os", "arch", "docker"}``; when the node cannot be
        inspected the status is ``"Offline"`` and the stored values are
        returned. An unknown id yields ``{"status": "Offline"}`` only.
    """
    with contextlib.closing(get_db()) as conn:
        node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone()
    if not node:
        return {"status": "Offline"}

    try:
        arch, os_name, docker_val = await _collect_inventory(
            node['user'], node['ip'], connect_timeout=3, strict=True
        )
    except Exception:
        logger.info("Inventory of node %s failed; marking offline", node_id, exc_info=True)
        new_status = "Offline"
        sql = 'UPDATE nodes SET status=? WHERE id=?'
        params = (new_status, node_id)
        result = {
            "status": new_status,
            "os": node['os'],
            "arch": node['arch'],
            "docker": node['docker_installed'],
        }
    else:
        new_status = _status_for(docker_val)
        sql = 'UPDATE nodes SET status=?, os=?, arch=?, docker_installed=? WHERE id=?'
        params = (new_status, os_name, arch, docker_val, node_id)
        result = {"status": new_status, "os": os_name, "arch": arch, "docker": docker_val}

    with contextlib.closing(get_db()) as conn:
        conn.execute(sql, params)
        conn.commit()
    return result


# --- WebSockets: terminal / chat / logs ---
@app.websocket("/ws/install_logs")
async def log_websocket(websocket: WebSocket):
    """Register the client for broadcasts until it disconnects."""
    await manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        manager.disconnect(websocket)


@app.websocket("/ws/terminal/{ip}")
async def terminal_websocket(websocket: WebSocket, ip: str):
    """Bridge the WebSocket to an interactive ``ssh`` session on a PTY.

    The session ends as soon as either side stops (client disconnects or ssh
    exits); the ssh process and both PTY descriptors are then released.
    """
    await websocket.accept()
    with contextlib.closing(get_db()) as conn:
        node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone()
    if not node:
        await websocket.close()
        return
    try:
        target = _ssh_target(node['user'], ip)
    except ValueError:
        await websocket.close()
        return

    master_fd, slave_fd = pty.openpty()
    proc = None

    async def pty_to_ws():
        # Incremental decoding keeps multi-byte characters intact when they
        # are split across two reads.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
        while True:
            await asyncio.sleep(0.01)
            try:
                chunk = os.read(master_fd, 1024)
            except BlockingIOError:
                continue
            except OSError:
                return  # slave side closed: ssh has exited
            if not chunk:
                return
            data = decoder.decode(chunk)
            if data:
                await websocket.send_text(data)

    async def ws_to_pty():
        while True:
            data = await websocket.receive_text()
            os.write(master_fd, data.encode())

    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "StrictHostKeyChecking=no", "-t", target,
            stdin=slave_fd, stdout=slave_fd, stderr=slave_fd,
        )
        # The child holds its own copies; closing ours lets reads on the
        # master fail once ssh exits instead of blocking forever.
        os.close(slave_fd)
        slave_fd = -1

        fl = fcntl.fcntl(master_fd, fcntl.F_GETFL)
        fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        pumps = [asyncio.create_task(pty_to_ws()), asyncio.create_task(ws_to_pty())]
        try:
            await asyncio.wait(pumps, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Stop the surviving direction and collect both outcomes so no
            # task is left running against closed descriptors.
            for task in pumps:
                task.cancel()
            await asyncio.gather(*pumps, return_exceptions=True)
    finally:
        if proc is not None and proc.returncode is None:
            with contextlib.suppress(ProcessLookupError):
                proc.terminate()
        os.close(master_fd)
        if slave_fd != -1:
            os.close(slave_fd)


@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
    """Relay chat messages to the model and execute the commands it returns."""
    await websocket.accept()
    try:
        while True:
            user_msg = await websocket.receive_text()
            ai_response = await get_ai_response(user_msg, get_system_prompt())

            matches = COMMAND_RE.findall(ai_response)
            clean_msg = COMMAND_STRIP_RE.sub('', ai_response).strip()
            if clean_msg:
                await websocket.send_text(clean_msg)

            # Only (target, cmd) pairs can be executed; anything else means
            # the command pattern does not have two capture groups.
            commands = [m for m in matches if isinstance(m, tuple) and len(m) == 2]
            if matches and len(commands) != len(matches):
                logger.warning("Command pattern did not yield (target, cmd) pairs; skipping")

            if commands:
                tasks = []
                with contextlib.closing(get_db()) as conn:
                    for target, cmd in commands:
                        n = conn.execute(
                            'SELECT * FROM nodes WHERE ip=? OR name=?',
                            (target.strip(), target.strip()),
                        ).fetchone()
                        if n:
                            tasks.append(run_remote_task(n['ip'], n['user'], cmd.strip()))
                if tasks:
                    await websocket.send_text("ℹ️ *Führe Befehle aus...*")
                    await asyncio.gather(*tasks)
                    summary = await get_ai_response(
                        "Zusammenfassung der Ergebnisse?", get_system_prompt()
                    )
                    await websocket.send_text(f"--- Info ---\n{summary}")
    except WebSocketDisconnect:
        pass
    except Exception:
        logger.exception("Chat websocket terminated by an unexpected error")


async def run_remote_task(ip, user, cmd):
    """Run ``cmd`` on a node via ssh, stream its output, and log it to the chat.

    Each output line is broadcast to the log clients; the complete output is
    appended to ``chat_history`` as a ``[SYSTEM]`` user message.
    """
    await manager.broadcast(f"🚀 Task: {cmd} auf {ip}")
    collected = []
    try:
        # No local shell: cmd is handed to ssh as one argument, so quotes in
        # it cannot break out into a command on this host.
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "StrictHostKeyChecking=no", "-o", "BatchMode=yes",
            _ssh_target(user, ip), cmd,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
    except (ValueError, OSError) as e:
        out = f"Fehler: {e}"
        await manager.broadcast(f"🛠️ {out}")
        collected.append(out)
    else:
        while True:
            line = await proc.stdout.readline()
            if not line:
                break
            out = line.decode('utf-8', errors='ignore').strip()
            if out:
                await manager.broadcast(f"🛠️ {out}")
                collected.append(out)
        await proc.wait()

    full_output = "".join(f"{out}\n" for out in collected)
    chat_history.append({
        "role": "user",
        "content": f"[SYSTEM] Befehl '{cmd}' auf {ip} fertig:\n{full_output or 'Kein Output'}",
    })


# --- Settings API ---
@app.get("/api/settings")
async def get_settings():
    """Return the active provider, the per-provider models and the Ollama URL."""
    return {
        "provider": AI_PROVIDER,
        "google_model": GOOGLE_MODEL,
        "openai_model": OPENAI_MODEL,
        "ollama_model": OLLAMA_MODEL,
        "ollama_base_url": OLLAMA_BASE_URL,
    }


@app.post("/api/settings")
async def update_settings(request: Request):
    """Apply provider / model / Ollama URL changes and persist them to ``.env``.

    Accepts a JSON object with optional ``provider``, ``model`` and
    ``ollama_base_url``. A ``model`` sent without ``provider`` applies to the
    currently active provider.

    Returns:
        ``{"status": "success"}``, or ``{"status": "error", "detail": ...}``
        for a malformed body or an unknown provider.
    """
    global AI_PROVIDER, GOOGLE_MODEL, OPENAI_MODEL, OLLAMA_MODEL, OLLAMA_BASE_URL
    data = await request.json()
    if not isinstance(data, dict):
        return {"status": "error", "detail": "JSON-Objekt erwartet"}

    provider = data.get("provider")
    if provider:
        provider = str(provider).lower()
        if provider not in SUPPORTED_PROVIDERS:
            return {"status": "error", "detail": f"Unbekannter Provider: {provider}"}
        AI_PROVIDER = provider
        set_key(ENV_FILE, "AI_PROVIDER", provider)

    m = data.get("model")
    if m:
        target = provider or AI_PROVIDER
        if target == "google":
            GOOGLE_MODEL = m
            set_key(ENV_FILE, "GOOGLE_MODEL", m)
        elif target == "openai":
            OPENAI_MODEL = m
            set_key(ENV_FILE, "OPENAI_MODEL", m)
        elif target == "ollama":
            OLLAMA_MODEL = m
            set_key(ENV_FILE, "OLLAMA_MODEL", m)

    u = data.get("ollama_base_url")
    if u:
        OLLAMA_BASE_URL = u
        set_key(ENV_FILE, "OLLAMA_BASE_URL", u)
    return {"status": "success"}


def _list_google_models():
    """Return the sorted names of Gemini models that support generateContent."""
    client = genai.Client(api_key=GOOGLE_API_KEY)
    return sorted(
        m.name.replace("models/", "")
        for m in client.models.list()
        if 'generateContent' in (m.supported_actions or [])
    )


@app.get("/api/models")
async def get_models(provider: str):
    """List the models available from ``provider``.

    Returns:
        ``{"models": [...]}``; the list is empty for an unknown provider or
        when the provider cannot be queried.
    """
    try:
        if provider == "ollama":
            # The tags endpoint lives at the server root, not under /v1.
            url = OLLAMA_BASE_URL.rstrip("/")
            if url.endswith("/v1"):
                url = url[:-len("/v1")].rstrip("/")
            async with httpx.AsyncClient() as client:
                r = await client.get(f"{url}/api/tags", timeout=5.0)
                r.raise_for_status()
                return {"models": [m["name"] for m in r.json().get("models", [])]}
        if provider == "openai":
            client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY)
            r = await client.models.list()
            return {"models": sorted(m.id for m in r.data if "gpt" in m.id or "o1" in m.id)}
        if provider == "google":
            # The listing is a blocking call; run it off the event loop.
            loop = asyncio.get_running_loop()
            return {"models": await loop.run_in_executor(None, _list_google_models)}
    except Exception:
        logger.warning("Could not list models for provider %r", provider, exc_info=True)
    return {"models": []}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)