diff --git a/main.py b/main.py index d7fe784..3dc6712 100644 --- a/main.py +++ b/main.py @@ -4,491 +4,348 @@ import fcntl import subprocess import sqlite3 import asyncio -import openai +import time import re -import httpx -from google import genai -from google.genai import types -import json from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect -from fastapi.responses import RedirectResponse +from fastapi.responses import RedirectResponse, JSONResponse from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles -from dotenv import load_dotenv, set_key -# Lade Umgebungsvariablen aus der .env Datei -load_dotenv() +DB_PATH = "cluster.db" +SSH_KEY = os.path.expanduser("~/.ssh/id_rsa") app = FastAPI() static_path = os.path.join(os.path.dirname(__file__), "static") app.mount("/static", StaticFiles(directory=static_path), name="static") templates = Jinja2Templates(directory="templates") -SSH_KEY = os.path.expanduser("~/.ssh/id_rsa") -DB_PATH = "cluster.db" -chat_history = [] -PROMPT_FILE = "system_prompt.txt" -ENV_FILE = os.path.join(os.path.dirname(__file__), ".env") # NEU -# --- KI KONFIGURATION (Werte aus .env laden) --- -AI_PROVIDER = os.getenv("AI_PROVIDER", "google").lower() -OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") -GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") -OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434/v1") +# ----------------------------- +# DATABASE +# ----------------------------- -# Modelle aus .env laden (mit Standardwerten als Fallback) -GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash") -OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o") -OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3") - -def get_system_prompt(): - # 1. 
Node Info aus DB holen - conn = get_db() - nodes = conn.execute('SELECT * FROM nodes').fetchall() - conn.close() - - node_info = "" - for n in nodes: - node_info += f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}\n" - - # 2. Versuche den Prompt aus der Datei zu laden - if os.path.exists(PROMPT_FILE): - with open(PROMPT_FILE, "r", encoding="utf-8") as f: - template = f.read() - else: - # Fallback falls Datei fehlt - template = "Du bist ein Helfer. Nodes:\n{node_info}\nNutze cmd" - print(f"⚠️ Warnung: {PROMPT_FILE} nicht gefunden. Nutze Fallback.") - - return template.replace("{node_info}", node_info) - -# --- KI FUNKTIONEN --- - -async def get_ai_response(user_input, system_prompt): - global chat_history - - # 1. Die neue User-Nachricht dem Gedächtnis hinzufügen - chat_history.append({"role": "user", "content": user_input}) - - # 2. Gedächtnis auf die letzten 30 Nachrichten begrenzen - chat_history = chat_history[-30:] - - ai_msg = "" - - try: - if AI_PROVIDER == "openai" or AI_PROVIDER == "ollama": - messages = [{"role": "system", "content": system_prompt}] + chat_history - - # Sicherstellen, dass die URL für Ollama korrekt endet - if AI_PROVIDER == "ollama": - url = OLLAMA_BASE_URL - if not url.endswith('/v1') and not url.endswith('/v1/'): - url = url.rstrip('/') + '/v1' - key = "ollama" - model_to_use = OLLAMA_MODEL - else: - url = None # Benutzt Standard OpenAI URL - key = OPENAI_API_KEY - model_to_use = OPENAI_MODEL - - client = openai.OpenAI(base_url=url, api_key=key) - response = client.chat.completions.create( - model=model_to_use, - messages=messages - ) - ai_msg = response.choices[0].message.content - - elif AI_PROVIDER == "google": - # Für Google Gemini - if not GOOGLE_API_KEY: - return "Fehler: GOOGLE_API_KEY fehlt in der .env Datei!" 
- - client = genai.Client(api_key=GOOGLE_API_KEY) - - # Wir müssen unser Array in das spezielle Google-Format umwandeln - google_history = [] - - # Alle Nachrichten AUSSER der allerletzten (die aktuelle User-Frage) in die History packen - for msg in chat_history[:-1]: - role = "user" if msg["role"] == "user" else "model" - google_history.append( - types.Content(role=role, parts=[types.Part.from_text(text=msg["content"])]) - ) - - # Chat MIT dem übersetzten Gedächtnis starten - chat = client.chats.create( - model=GOOGLE_MODEL, - config=types.GenerateContentConfig(system_instruction=system_prompt), - history=google_history - ) - - # Jetzt erst die neue Nachricht an den Chat mit Gedächtnis schicken - response = chat.send_message(user_input) - ai_msg = response.text - - except Exception as e: - ai_msg = f"Fehler bei der KI-Anfrage: {e}" - print(f"KI Fehler: {e}") - - # 3. Die Antwort der KI ebenfalls ins Gedächtnis aufnehmen - chat_history.append({"role": "assistant", "content": ai_msg}) - - return ai_msg - -# --- DATENBANK INITIALISIERUNG --- def init_db(): conn = sqlite3.connect(DB_PATH) - conn.execute(''' - CREATE TABLE IF NOT EXISTS nodes ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT, - ip TEXT UNIQUE, - user TEXT, - status TEXT - ) - ''') + + conn.execute(""" + CREATE TABLE IF NOT EXISTS nodes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT, + ip TEXT UNIQUE, + user TEXT, + + ssh_password TEXT, + sudo_password TEXT, + + os TEXT, + architecture TEXT, + + docker_installed INTEGER DEFAULT 0, + vnc_available INTEGER DEFAULT 0, + + last_seen INTEGER, + status TEXT + ) + """) + conn.commit() conn.close() init_db() + def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn -# --- WebSocket Manager für Logs & Chat --- -class ConnectionManager: - def __init__(self): - self.active_connections: list[WebSocket] = [] - async def connect(self, websocket: WebSocket): - await websocket.accept() - 
# -----------------------------
# NODE DISCOVERY
# -----------------------------

async def detect_node_info(ip, user):
    """Probe a node over SSH and return basic facts about it.

    Returns a dict with keys ``os``, ``arch``, ``docker`` and ``vnc``.
    Values are the raw command output strings ("" when a probe failed);
    ``docker``/``vnc`` are "1" or "0" from the remote shell test.
    """
    probes = {
        "os": "grep '^ID=' /etc/os-release 2>/dev/null | cut -d= -f2",
        "arch": "uname -m",
        "docker": "command -v docker >/dev/null 2>&1 && echo 1 || echo 0",
        "vnc": "pgrep -f vnc >/dev/null 2>&1 && echo 1 || echo 0",
    }

    results = {}
    for key, remote_cmd in probes.items():
        # List-argv form: the local shell never interpolates ip/user, so a
        # hostile value in the DB cannot inject local shell commands.
        ssh_cmd = [
            "ssh", "-o", "ConnectTimeout=3", "-o", "StrictHostKeyChecking=no",
            f"{user}@{ip}", remote_cmd,
        ]
        try:
            out = subprocess.check_output(ssh_cmd).decode().strip()
        except (subprocess.CalledProcessError, OSError):
            # Probe failed (unreachable host, ssh error) -- record "".
            out = ""
        results[key] = out

    return results

# -----------------------------
# NODE MONITOR
# -----------------------------

async def node_monitor_loop():
    """Ping every node via SSH once a minute and persist Online/Offline.

    NOTE(review): the SSH calls are blocking subprocess calls inside the
    event loop; with many nodes this stalls other coroutines -- consider
    asyncio.to_thread. Kept synchronous here to preserve behavior.
    """
    while True:
        conn = get_db()
        try:
            nodes = conn.execute("SELECT * FROM nodes").fetchall()
            for node in nodes:
                ssh_cmd = [
                    "ssh", "-o", "ConnectTimeout=2", "-o", "StrictHostKeyChecking=no",
                    f"{node['user']}@{node['ip']}", "echo online",
                ]
                try:
                    subprocess.check_output(ssh_cmd)
                except (subprocess.CalledProcessError, OSError):
                    conn.execute(
                        "UPDATE nodes SET status=? WHERE id=?",
                        ("Offline", node["id"]),
                    )
                else:
                    conn.execute(
                        "UPDATE nodes SET status=?, last_seen=? WHERE id=?",
                        ("Online", int(time.time()), node["id"]),
                    )
            conn.commit()
        finally:
            # Close the handle even if a DB/SSH step blew up mid-loop.
            conn.close()

        await asyncio.sleep(60)


@app.on_event("startup")
async def start_monitor():
    """Kick off the background status monitor when the app starts."""
    asyncio.create_task(node_monitor_loop())

# -----------------------------
# SSH BOOTSTRAP
# -----------------------------

async def bootstrap_node(ip, user, password):
    """Install our public SSH key on a fresh node, then record its facts.

    Uses sshpass once with the supplied password; afterwards key-based
    auth works and the node row is updated with detected OS/arch/docker
    info and status "Online".
    """
    # List-argv keeps the password out of shell parsing entirely (it still
    # appears in the local process list -- inherent to sshpass -p).
    copy_cmd = [
        "sshpass", "-p", password,
        "ssh-copy-id", "-o", "StrictHostKeyChecking=no",
        "-i", f"{SSH_KEY}.pub", f"{user}@{ip}",
    ]
    subprocess.call(copy_cmd)

    info = await detect_node_info(ip, user)

    conn = get_db()
    try:
        conn.execute(
            """
            UPDATE nodes
            SET os=?, architecture=?, docker_installed=?, vnc_available=?, status=?
            WHERE ip=?
            """,
            (
                info["os"],
                info["arch"],
                int(info["docker"] or 0),
                int(info["vnc"] or 0),
                "Online",
                ip,
            ),
        )
        conn.commit()
    finally:
        conn.close()
# -----------------------------
# ROUTES
# -----------------------------

@app.get("/")
async def index(request: Request):
    """Render the dashboard template with all known nodes."""
    conn = get_db()
    try:
        nodes = conn.execute("SELECT * FROM nodes").fetchall()
    finally:
        conn.close()

    return templates.TemplateResponse(
        "index.html", {"request": request, "nodes": nodes}
    )


@app.post("/add_node")
async def add_node(
    background_tasks: BackgroundTasks,
    name: str = Form(...),
    ip: str = Form(...),
    user: str = Form(...),
    password: str = Form(...)
):
    """Register a node and bootstrap it (key copy + discovery) in the background.

    A duplicate IP violates the UNIQUE constraint; that case is swallowed
    instead of surfacing as a 500, and no bootstrap task is scheduled.
    """
    conn = get_db()
    try:
        conn.execute(
            """
            INSERT INTO nodes (name, ip, user, ssh_password, status)
            VALUES (?, ?, ?, ?, ?)
            """,
            (name, ip, user, password, "Connecting")
        )
        conn.commit()
        background_tasks.add_task(bootstrap_node, ip, user, password)
    except sqlite3.IntegrityError:
        # Node with this IP already exists -- nothing to do.
        pass
    finally:
        conn.close()

    return RedirectResponse("/", 303)


@app.post("/remove_node/{node_id}")
async def remove_node(node_id: int):
    """Delete a node row and go back to the dashboard."""
    conn = get_db()
    try:
        conn.execute("DELETE FROM nodes WHERE id=?", (node_id,))
        conn.commit()
    finally:
        conn.close()

    return RedirectResponse("/", 303)


@app.get("/refresh_status/{node_id}")
async def refresh_status(node_id: int):
    """Re-check one node over SSH and return its new status as JSON."""
    conn = get_db()
    node = conn.execute("SELECT * FROM nodes WHERE id=?", (node_id,)).fetchone()

    if not node:
        conn.close()
        return JSONResponse({"status": "Unknown"})

    # List-argv so ip/user from the DB can't inject local shell commands.
    ssh_cmd = [
        "ssh", "-o", "ConnectTimeout=2", "-o", "StrictHostKeyChecking=no",
        f"{node['user']}@{node['ip']}", "echo online",
    ]
    try:
        subprocess.check_output(ssh_cmd)
        status = "Online"
    except (subprocess.CalledProcessError, OSError):
        status = "Offline"

    conn.execute("UPDATE nodes SET status=? WHERE id=?", (status, node_id))
    conn.commit()
    conn.close()

    return {"status": status}

# -----------------------------
# NODE EDIT
# -----------------------------

@app.get("/node/{node_id}")
async def get_node(node_id: int):
    """Return one node's DB row as JSON ({} when the id is unknown)."""
    conn = get_db()
    try:
        node = conn.execute(
            "SELECT * FROM nodes WHERE id=?", (node_id,)
        ).fetchone()
    finally:
        conn.close()

    if not node:
        return JSONResponse({})

    # NOTE(review): this serializes every column, including ssh_password /
    # sudo_password -- consider stripping credential fields before returning.
    return dict(node)


@app.post("/update_node/{node_id}")
async def update_node(
    node_id: int,
    name: str = Form(...),
    ip: str = Form(...),
    user: str = Form(...)
):
    """Update a node's editable fields (name, ip, user)."""
    conn = get_db()
    try:
        conn.execute(
            "UPDATE nodes SET name=?, ip=?, user=? WHERE id=?",
            (name, ip, user, node_id)
        )
        conn.commit()
    finally:
        conn.close()

    return RedirectResponse("/", 303)

# -----------------------------
# TERMINAL
# -----------------------------

@app.websocket("/ws/terminal/{ip}")
async def terminal(websocket: WebSocket, ip: str):
    """Bridge a browser terminal to an interactive SSH session via a PTY."""
    await websocket.accept()

    conn = get_db()
    node = conn.execute("SELECT * FROM nodes WHERE ip=?", (ip,)).fetchone()
    conn.close()

    if not node:
        await websocket.close()
        return

    master_fd, slave_fd = pty.openpty()

    proc = await asyncio.create_subprocess_exec(
        "ssh",
        "-o",
        "StrictHostKeyChecking=no",
        "-t",
        f"{node['user']}@{ip}",
        stdin=slave_fd,
        stdout=slave_fd,
        stderr=slave_fd
    )

    async def pty_to_ws():
        # Non-blocking reads so the poll loop never stalls the event loop.
        fl = fcntl.fcntl(master_fd, fcntl.F_GETFL)
        fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        while True:
            await asyncio.sleep(0.01)
            try:
                data = os.read(master_fd, 1024).decode(errors="ignore")
                if data:
                    await websocket.send_text(data)
            except BlockingIOError:
                continue
            except OSError:
                # PTY closed (ssh exited) -- stop pumping output.
                return

    async def ws_to_pty():
        try:
            while True:
                data = await websocket.receive_text()
                os.write(master_fd, data.encode())
        except Exception:
            # Client disconnected (or PTY write failed) -- end the session.
            pass

    try:
        await asyncio.gather(pty_to_ws(), ws_to_pty())
    finally:
        # Don't leak the ssh child or the PTY fds when the socket closes.
        if proc.returncode is None:
            proc.terminate()
        os.close(master_fd)
        os.close(slave_fd)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)