import os import pty import fcntl import subprocess import sqlite3 import asyncio import openai import re from google import genai from google.genai import types import json from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect from fastapi.responses import RedirectResponse from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles from dotenv import load_dotenv, set_key # Lade Umgebungsvariablen aus der .env Datei load_dotenv() app = FastAPI() static_path = os.path.join(os.path.dirname(__file__), "static") app.mount("/static", StaticFiles(directory=static_path), name="static") templates = Jinja2Templates(directory="templates") SSH_KEY = os.path.expanduser("~/.ssh/id_rsa") DB_PATH = "cluster.db" chat_history = [] PROMPT_FILE = "system_prompt.txt" ENV_FILE = os.path.join(os.path.dirname(__file__), ".env") # NEU # --- KI KONFIGURATION (Werte aus .env laden) --- AI_PROVIDER = os.getenv("AI_PROVIDER", "google").lower() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434/v1") # Modelle aus .env laden (mit Standardwerten als Fallback) GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash") OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o") OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3") def get_system_prompt(): # 1. Node Info aus DB holen conn = get_db() nodes = conn.execute('SELECT * FROM nodes').fetchall() conn.close() node_info = "" for n in nodes: node_info += f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}\n" # 2. Versuche den Prompt aus der Datei zu laden if os.path.exists(PROMPT_FILE): with open(PROMPT_FILE, "r", encoding="utf-8") as f: template = f.read() else: # Fallback falls Datei fehlt template = "Du bist ein Helfer. Nodes:\n{node_info}\nNutze cmd" print(f"⚠️ Warnung: {PROMPT_FILE} nicht gefunden. Nutze Fallback.") return template.replace("{node_info}", node_info) # --- KI FUNKTIONEN --- async def get_ai_response(user_input, system_prompt): global chat_history # 1. Die neue User-Nachricht dem Gedächtnis hinzufügen chat_history.append({"role": "user", "content": user_input}) # 2. Gedächtnis auf die letzten 30 Nachrichten begrenzen chat_history = chat_history[-30:] ai_msg = "" try: if AI_PROVIDER == "openai" or AI_PROVIDER == "ollama": messages = [{"role": "system", "content": system_prompt}] + chat_history # Sicherstellen, dass die URL für Ollama korrekt endet if AI_PROVIDER == "ollama": url = OLLAMA_BASE_URL if not url.endswith('/v1') and not url.endswith('/v1/'): url = url.rstrip('/') + '/v1' key = "ollama" model_to_use = OLLAMA_MODEL else: url = None # Benutzt Standard OpenAI URL key = OPENAI_API_KEY model_to_use = OPENAI_MODEL client = openai.OpenAI(base_url=url, api_key=key) response = client.chat.completions.create( model=model_to_use, messages=messages ) ai_msg = response.choices[0].message.content elif AI_PROVIDER == "google": # Für Google Gemini if not GOOGLE_API_KEY: return "Fehler: GOOGLE_API_KEY fehlt in der .env Datei!" client = genai.Client(api_key=GOOGLE_API_KEY) # Wir müssen unser Array in das spezielle Google-Format umwandeln google_history = [] # Alle Nachrichten AUSSER der allerletzten (die aktuelle User-Frage) in die History packen for msg in chat_history[:-1]: role = "user" if msg["role"] == "user" else "model" google_history.append( types.Content(role=role, parts=[types.Part.from_text(text=msg["content"])]) ) # Chat MIT dem übersetzten Gedächtnis starten chat = client.chats.create( model=GOOGLE_MODEL, config=types.GenerateContentConfig(system_instruction=system_prompt), history=google_history ) # Jetzt erst die neue Nachricht an den Chat mit Gedächtnis schicken response = chat.send_message(user_input) ai_msg = response.text except Exception as e: ai_msg = f"Fehler bei der KI-Anfrage: {e}" print(f"KI Fehler: {e}") # 3. Die Antwort der KI ebenfalls ins Gedächtnis aufnehmen chat_history.append({"role": "assistant", "content": ai_msg}) return ai_msg # --- DATENBANK INITIALISIERUNG --- def init_db(): conn = sqlite3.connect(DB_PATH) conn.execute(''' CREATE TABLE IF NOT EXISTS nodes ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, ip TEXT UNIQUE, user TEXT, status TEXT ) ''') conn.commit() conn.close() init_db() def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn # --- WebSocket Manager für Logs & Chat --- class ConnectionManager: def __init__(self): self.active_connections: list[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): if websocket in self.active_connections: self.active_connections.remove(websocket) async def broadcast(self, message: str): for connection in self.active_connections: try: await connection.send_text(message) except: pass manager = ConnectionManager() # --- SSH Handshake (Nur Key kopieren) --- async def bootstrap_ssh_only(ip, user, password): await manager.broadcast(f"🔑 Initialisiere SSH-Handshake für {ip}...") # Nutzt sshpass um den Key einmalig mit Passwort zu hinterlegen ssh_copy_cmd = f"sshpass -p '{password}' ssh-copy-id -o StrictHostKeyChecking=no -i {SSH_KEY}.pub {user}@{ip}" process = subprocess.Popen(ssh_copy_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) for line in process.stdout: await manager.broadcast(f"SSH: {line.strip()}") conn = get_db() conn.execute('UPDATE nodes SET status = "Bereit (Kein Docker)" WHERE ip = ?', (ip,)) conn.commit() conn.close() await manager.broadcast(f"✅ Node {ip} ist verbunden. KI kann nun Befehle senden.") # --- Routen --- @app.get("/") async def index(request: Request): conn = get_db() nodes = conn.execute('SELECT * FROM nodes').fetchall() conn.close() return templates.TemplateResponse("index.html", {"request": request, "nodes": nodes}) @app.post("/add_node") async def add_node(background_tasks: BackgroundTasks, name: str = Form(...), ip: str = Form(...), user: str = Form(...), password: str = Form(...)): conn = get_db() try: conn.execute('INSERT INTO nodes (name, ip, user, status) VALUES (?, ?, ?, ?)', (name, ip, user, "Kopplung...")) conn.commit() background_tasks.add_task(bootstrap_ssh_only, ip, user, password) except sqlite3.IntegrityError: pass finally: conn.close() return RedirectResponse(url="/", status_code=303) @app.post("/remove_node/{node_id}") async def remove_node(node_id: int): conn = get_db() conn.execute('DELETE FROM nodes WHERE id = ?', (node_id,)) conn.commit() conn.close() return RedirectResponse(url="/", status_code=303) @app.get("/api/settings") async def get_settings(): return { "provider": AI_PROVIDER, "google_model": GOOGLE_MODEL, "openai_model": OPENAI_MODEL, "ollama_model": OLLAMA_MODEL, "ollama_base_url": OLLAMA_BASE_URL # URL ans Frontend schicken } @app.post("/api/settings") async def update_settings(request: Request): # WICHTIG: OLLAMA_BASE_URL als global deklarieren global AI_PROVIDER, GOOGLE_MODEL, OPENAI_MODEL, OLLAMA_MODEL, OLLAMA_BASE_URL data = await request.json() provider = data.get("provider") model = data.get("model") ollama_url = data.get("ollama_base_url") # URL vom Frontend empfangen if provider: AI_PROVIDER = provider set_key(ENV_FILE, "AI_PROVIDER", provider) if provider == "google" and model: GOOGLE_MODEL = model set_key(ENV_FILE, "GOOGLE_MODEL", model) elif provider == "openai" and model: OPENAI_MODEL = model set_key(ENV_FILE, "OPENAI_MODEL", model) elif provider == "ollama" and model: OLLAMA_MODEL = model set_key(ENV_FILE, "OLLAMA_MODEL", model) # Wenn eine Ollama-URL mitgeschickt wurde, speichern wir sie if ollama_url: OLLAMA_BASE_URL = ollama_url set_key(ENV_FILE, "OLLAMA_BASE_URL", ollama_url) return {"status": "success"} # --- WebSockets --- @app.websocket("/ws/install_logs") async def log_websocket(websocket: WebSocket): await manager.connect(websocket) try: while True: await websocket.receive_text() except WebSocketDisconnect: manager.disconnect(websocket) @app.websocket("/ws/terminal/{ip}") async def terminal_websocket(websocket: WebSocket, ip: str): await websocket.accept() conn = get_db() node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone() conn.close() if not node: await websocket.send_text("\r\nFehler: Node nicht gefunden.\r\n") await websocket.close() return # Pseudo-Terminal für interaktive SSH-Session master_fd, slave_fd = pty.openpty() proc = await asyncio.create_subprocess_exec( "ssh", "-o", "StrictHostKeyChecking=no", "-t", f"{node['user']}@{ip}", stdin=slave_fd, stdout=slave_fd, stderr=slave_fd ) async def pty_to_ws(): # Setzt den Master-FD auf non-blocking fl = fcntl.fcntl(master_fd, fcntl.F_GETFL) fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) try: while True: await asyncio.sleep(0.01) try: data = os.read(master_fd, 1024).decode(errors='ignore') if data: await websocket.send_text(data) except BlockingIOError: continue except Exception: pass async def ws_to_pty(): try: while True: data = await websocket.receive_text() os.write(master_fd, data.encode()) except Exception: pass try: await asyncio.gather(pty_to_ws(), ws_to_pty()) finally: if proc.returncode is None: proc.terminate() os.close(master_fd) os.close(slave_fd) # --- WEBSOCKET CHAT UPDATE --- @app.websocket("/ws/chat") async def chat_endpoint(websocket: WebSocket): await websocket.accept() # Check ob Key vorhanden ist if AI_PROVIDER == "google" and not GOOGLE_API_KEY: await websocket.send_text("⚠️ **Konfigurationsfehler:** Kein GOOGLE_API_KEY in der `.env` gefunden!") elif AI_PROVIDER == "openai" and not OPENAI_API_KEY: await websocket.send_text("⚠️ **Konfigurationsfehler:** Kein OPENAI_API_KEY in der `.env` gefunden!") try: while True: user_msg = await websocket.receive_text() sys_prompt = get_system_prompt() ai_response = await get_ai_response(user_msg, sys_prompt) # Befehle extrahieren commands_to_run = re.findall(r'(.*?)', ai_response, re.IGNORECASE | re.DOTALL) clean_chat_msg = re.sub(r'.*?', '', ai_response, flags=re.IGNORECASE | re.DOTALL).strip() if clean_chat_msg: await websocket.send_text(clean_chat_msg) if commands_to_run: # Liste für alle laufenden Tasks erstellen tasks = [] for target_ip, cmd in commands_to_run: conn = get_db() node = conn.execute('SELECT * FROM nodes WHERE ip = ? OR name = ?', (target_ip.strip(), target_ip.strip())).fetchone() conn.close() if node: # Wir erstellen den Task, starten ihn aber noch nicht separat tasks.append(run_remote_task(node['ip'], node['user'], cmd.strip(), "KI-Kommando")) if tasks: # Dem Nutzer im Chat kurz Bescheid geben await websocket.send_text("ℹ️ *Warte auf Rückmeldungen der Nodes...*") # Jetzt werden alle SSH-Befehle gleichzeitig gestartet und abgewartet await asyncio.gather(*tasks) # Sobald asyncio.gather fertig ist, geht es hier weiter mit dem Follow-up: follow_up_prompt = "Die Befehle wurden ausgeführt. Bitte fasse die Ergebnisse kurz zusammen." ai_summary = await get_ai_response(follow_up_prompt, sys_prompt) await websocket.send_text("--- Ergebnis-Zusammenfassung ---") await websocket.send_text(ai_summary) except Exception as e: print(f"Chat Fehler: {e}") async def run_remote_task(ip, user, cmd, task_name): global chat_history # Zugriff auf das Gedächtnis der KI await manager.broadcast(f"🚀 KI-Task gestartet: {cmd} auf {ip}") ssh_cmd = f"ssh -o StrictHostKeyChecking=no {user}@{ip} '{cmd}'" process = await asyncio.create_subprocess_shell( ssh_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ) # Hier speichern wir die komplette Terminal-Ausgabe full_output = "" while True: line = await process.stdout.readline() if not line: break decoded_line = line.decode('utf-8', errors='ignore').strip() if decoded_line: await manager.broadcast(f"🛠️ {decoded_line}") full_output += decoded_line + "\n" # Gib dem Event-Loop kurz Zeit, andere Tasks (wie WebSockets) zu bedienen await asyncio.sleep(0.001) await process.wait() # --- NEU: Feedback an die KI --- # Wir bereiten den Bericht vor if not full_output.strip(): full_output = "Befehl wurde ohne Ausgabe ausgeführt (Exit Code 0)." system_report = f"[SYSTEM-RÜCKMELDUNG] Der Befehl '{cmd}' auf Node {ip} wurde beendet. Ausgabe des Terminals:\n{full_output}" # Wir schmuggeln den Bericht als "User"-Nachricht in den Verlauf, # damit die KI beim nächsten Mal weiß, was passiert ist. chat_history.append({"role": "user", "content": system_report}) # ------------------------------- if "docker" in cmd.lower() and "install" in cmd.lower(): # Kleiner Bonus: Nur updaten, wenn wirklich installiert wird await manager.broadcast(f"✨ Bitte aktualisiere den Status für {ip} manuell über das Refresh-Icon.") await manager.broadcast(f"✅ Befehl auf {ip} abgeschlossen.") # --- Neuer Endpunkt: Manueller Refresh-Check --- @app.get("/refresh_status/{node_id}") async def refresh_status(node_id: int): conn = get_db() node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone() if node: # Kurzer Check via SSH, ob Docker antwortet check_cmd = "command -v docker >/dev/null 2>&1 && echo 'Docker Aktiv' || echo 'Bereit (Kein Docker)'" ssh_cmd = f"ssh -o StrictHostKeyChecking=no -o ConnectTimeout=2 {node['user']}@{node['ip']} \"{check_cmd}\"" try: # Wir führen den Befehl aus new_status = subprocess.check_output(ssh_cmd, shell=True).decode().strip() except Exception: new_status = "Offline/Fehler" conn.execute('UPDATE nodes SET status = ? WHERE id = ?', (new_status, node_id)) conn.commit() conn.close() return {"status": new_status} # Wir senden nur den Status zurück conn.close() return {"status": "Unbekannt"} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)