diff --git a/main.py b/main.py index fe011e9..8d6ffa8 100644 --- a/main.py +++ b/main.py @@ -1,241 +1,353 @@ import os -import json -import threading -import time -from flask import Flask, render_template, request, jsonify, redirect, url_for -from flask_sqlalchemy import SQLAlchemy -from flask_sock import Sock -import paramiko -import requests - -# AI Provider Imports +import pty +import fcntl +import subprocess +import sqlite3 +import asyncio import openai -import google.generativeai as genai +import re +import httpx +from google import genai +from google.genai import types +import json +from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect +from fastapi.responses import RedirectResponse +from fastapi.templating import Jinja2Templates +from fastapi.staticfiles import StaticFiles +from dotenv import load_dotenv, set_key -app = Flask(__name__) -app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///orchestrator.db' -app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False -db = SQLAlchemy(app) -sock = Sock(app) +# Lade Umgebungsvariablen +load_dotenv() -# --- DATENBANK MODELLE --- +app = FastAPI() +static_path = os.path.join(os.path.dirname(__file__), "static") +app.mount("/static", StaticFiles(directory=static_path), name="static") +templates = Jinja2Templates(directory="templates") -class Node(db.Model): - id = db.Column(db.Integer, primary_key=True) - name = db.Column(db.String(100), nullable=False) - ip = db.Column(db.String(50), nullable=False) - user = db.Column(db.String(50), nullable=False) - password = db.Column(db.String(100), nullable=False) - status = db.Column(db.String(50), default="Unbekannt") - arch = db.Column(db.String(20), default="N/A") - os_type = db.Column(db.String(50), default="linux") - has_docker = db.Column(db.Boolean, default=False) - has_vnc = db.Column(db.Boolean, default=False) +SSH_KEY = os.path.expanduser("~/.ssh/id_rsa") +DB_PATH = "cluster.db" +chat_history = [] +PROMPT_FILE = "system_prompt.txt" +ENV_FILE = os.path.join(os.path.dirname(__file__), ".env") - def to_dict(self): - return {c.name: getattr(self, c.name) for c in self.__table__.columns} +# KI KONFIGURATION +AI_PROVIDER = os.getenv("AI_PROVIDER", "google").lower() +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") +GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") +OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434/v1") -class Settings(db.Model): - id = db.Column(db.Integer, primary_key=True) - provider = db.Column(db.String(50), default="google") - google_model = db.Column(db.String(100), default="gemini-1.5-flash") - openai_model = db.Column(db.String(100), default="gpt-4o-mini") - ollama_model = db.Column(db.String(100), default="llama3") - ollama_base_url = db.Column(db.String(200), default="http://localhost:11434/v1") +GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.0-flash") +OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o") +OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3") -with app.app_context(): - db.create_all() - if not Settings.query.first(): - db.session.add(Settings()) - db.session.commit() +# --- DATENBANK INITIALISIERUNG (ERWEITERT) --- +def init_db(): + conn = sqlite3.connect(DB_PATH) + # Spalten erweitert um sudo_password, os, arch, docker_installed + conn.execute(''' + CREATE TABLE IF NOT EXISTS nodes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT, + ip TEXT UNIQUE, + user TEXT, + sudo_password TEXT, + os TEXT DEFAULT 'Unbekannt', + arch TEXT DEFAULT 'Unbekannt', + docker_installed INTEGER DEFAULT 0, + status TEXT + ) + ''') + conn.commit() + conn.close() -# --- HILFSFUNKTIONEN --- +init_db() -def get_ssh_client(node): - client = paramiko.SSHClient() - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - client.connect(node.ip, username=node.user, password=node.password, timeout=5) - return client +def get_db(): + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn -def run_ssh_cmd(node, cmd): - try: - client = get_ssh_client(node) - stdin, stdout, stderr = client.exec_command(cmd) - result = stdout.read().decode().strip() - client.close() - return result - except: - return None - -# --- ROUTEN --- - -@app.route('/') -def index(): - nodes = Node.query.all() - return render_template('index.html', nodes=nodes) - -@app.route('/add_node', methods=['POST']) -def add_node(): - new_node = Node( - name=request.form['name'], - ip=request.form['ip'], - user=request.form['user'], - password=request.form['password'] - ) - db.session.add(new_node) - db.session.commit() - return redirect(url_for('index')) - -@app.route('/edit_node/', methods=['POST']) -def edit_node(id): - node = Node.query.get_or_404(id) - node.name = request.form.get('name', node.name) - node.ip = request.form.get('ip', node.ip) - node.user = request.form.get('user', node.user) - if request.form.get('password'): - node.password = request.form.get('password') - db.session.commit() - return redirect(url_for('index')) - -@app.route('/remove_node/', methods=['POST']) -def remove_node(id): - node = Node.query.get_or_404(id) - db.session.delete(node) - db.session.commit() - return redirect(url_for('index')) - -@app.route('/refresh_status/') -def refresh_status(id): - node = Node.query.get_or_404(id) - try: - # Architektur prüfen - node.arch = run_ssh_cmd(node, "uname -m") or "N/A" - # OS Typ prüfen - os_info = run_ssh_cmd(node, "cat /etc/os-release | grep ^ID=") - node.os_type = os_info.split('=')[1].replace('"', '') if os_info else "linux" - # Docker prüfen - docker_check = run_ssh_cmd(node, "docker ps") - node.has_docker = True if docker_check is not None else False - # VNC prüfen (Port 5900/5901) - vnc_check = run_ssh_cmd(node, "netstat -tuln | grep :590") - node.has_vnc = True if vnc_check else False - - node.status = "Online" - except: - node.status = "Offline" +def get_system_prompt(): + conn = get_db() + nodes = conn.execute('SELECT * FROM nodes').fetchall() + conn.close() - db.session.commit() - return jsonify(node.to_dict()) - -# --- SETTINGS API --- - -@app.route('/api/settings', methods=['GET', 'POST']) -def handle_settings(): - s = Settings.query.first() - if request.method == 'POST': - data = request.json - s.provider = data.get('provider', s.provider) - s.ollama_base_url = data.get('ollama_base_url', s.ollama_base_url) - # Speichere das Modell für den aktuellen Provider - setattr(s, f"{s.provider}_model", data.get('model')) - db.session.commit() - return jsonify({"status": "success"}) - return jsonify({ - "provider": s.provider, - "google_model": s.google_model, - "openai_model": s.openai_model, - "ollama_model": s.ollama_model, - "ollama_base_url": s.ollama_base_url - }) - -@app.route('/api/models') -def get_models(): - provider = request.args.get('provider') - if provider == "google": - return jsonify({"models": ["gemini-1.5-flash", "gemini-1.5-pro"]}) - elif provider == "openai": - return jsonify({"models": ["gpt-4o-mini", "gpt-4o", "gpt-3.5-turbo"]}) - elif provider == "ollama": - url = request.args.get('url', 'http://localhost:11434/v1') - try: - # Versuche Modelle von Ollama API zu laden - r = requests.get(url.replace('/v1', '/api/tags'), timeout=2) - names = [m['name'] for m in r.json().get('models', [])] - return jsonify({"models": names}) - except: - return jsonify({"models": ["llama3", "mistral", "codellama"]}) - return jsonify({"models": []}) - -# --- WEBSOCKETS --- - -@sock.route('/ws/install_logs') -def install_logs(ws): - # Dummy Log Stream für Demo-Zwecke - while True: - data = ws.receive(timeout=1) - # Hier könnten echte Hintergrund-Prozesse ihre Logs senden - pass - -@sock.route('/ws/chat') -def chat_handler(ws): - s = Settings.query.first() - while True: - msg = ws.receive() - if not msg: break - - # KI LOGIK - response = "Fehler: Provider nicht konfiguriert" - try: - if s.provider == "google": - genai.configure(api_key=os.getenv("GOOGLE_API_KEY")) - model = genai.GenerativeModel(s.google_model) - response = model.generate_content(msg).text - elif s.provider == "openai": - client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY")) - res = client.chat.completions.create( - model=s.openai_model, - messages=[{"role": "user", "content": msg}] - ) - response = res.choices[0].message.content - elif s.provider == "ollama": - r = requests.post(f"{s.ollama_base_url}/chat/completions", json={ - "model": s.ollama_model, - "messages": [{"role": "user", "content": msg}], - "stream": False - }) - response = r.json()['choices'][0]['message']['content'] - except Exception as e: - response = f"KI-Fehler: {str(e)}" - - ws.send(response) - -@sock.route('/ws/terminal/') -def terminal_handler(ws, ip): - node = Node.query.filter_by(ip=ip).first() - if not node: return + node_info = "" + for n in nodes: + docker_str = "Ja" if n['docker_installed'] else "Nein" + node_info += f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}, OS: {n['os']}, Arch: {n['arch']}, Docker: {docker_str}\n" + if os.path.exists(PROMPT_FILE): + with open(PROMPT_FILE, "r", encoding="utf-8") as f: + template = f.read() + else: + template = "Du bist ein Cluster-Orchestrator. Nodes:\n{node_info}\nBefehle via cmd" + print(f"⚠️ Warnung: {PROMPT_FILE} fehlt.") + + return template.replace("{node_info}", node_info) + +# --- KI LOGIK (UNVERÄNDERT) --- +async def get_ai_response(user_input, system_prompt): + global chat_history + chat_history.append({"role": "user", "content": user_input}) + chat_history = chat_history[-30:] + ai_msg = "" try: - client = get_ssh_client(node) - chan = client.invoke_shell() - - def split_reader(): - while True: - if chan.recv_ready(): - out = chan.recv(1024).decode() - ws.send(out) - time.sleep(0.01) - - threading.Thread(target=split_reader, daemon=True).start() - - while True: - cmd = ws.receive() - if not cmd: break - chan.send(cmd) - + if AI_PROVIDER in ["openai", "ollama"]: + url = OLLAMA_BASE_URL if AI_PROVIDER == "ollama" else None + if url and not url.endswith('/v1'): url = url.rstrip('/') + '/v1' + key = "ollama" if AI_PROVIDER == "ollama" else OPENAI_API_KEY + model_to_use = OLLAMA_MODEL if AI_PROVIDER == "ollama" else OPENAI_MODEL + client = openai.OpenAI(base_url=url, api_key=key) + response = client.chat.completions.create(model=model_to_use, messages=[{"role": "system", "content": system_prompt}] + chat_history) + ai_msg = response.choices[0].message.content + elif AI_PROVIDER == "google": + client = genai.Client(api_key=GOOGLE_API_KEY) + google_history = [types.Content(role="user" if m["role"] == "user" else "model", parts=[types.Part.from_text(text=msg["content"])]) for msg in chat_history[:-1]] + chat = client.chats.create(model=GOOGLE_MODEL, config=types.GenerateContentConfig(system_instruction=system_prompt), history=google_history) + response = chat.send_message(user_input) + ai_msg = response.text except Exception as e: - ws.send(f"\r\n[SSH FEHLER]: {str(e)}\r\n") + ai_msg = f"KI Fehler: {e}" + chat_history.append({"role": "assistant", "content": ai_msg}) + return ai_msg -if __name__ == '__main__': - # Stelle sicher, dass API Keys in der Umgebung sind oder hier hartcodiert werden - # os.environ["GOOGLE_API_KEY"] = "DEIN_KEY" - app.run(host='0.0.0.0', port=5000, debug=True) \ No newline at end of file +# --- WebSocket Manager --- +class ConnectionManager: + def __init__(self): self.active_connections = [] + async def connect(self, ws: WebSocket): await ws.accept(); self.active_connections.append(ws) + def disconnect(self, ws: WebSocket): self.active_connections.remove(ws) + async def broadcast(self, msg: str): + for c in self.active_connections: + try: await c.send_text(msg) + except: pass +manager = ConnectionManager() + +# --- ERWEITERTES NODE BOOTSTRAPPING (Inventur) --- +async def bootstrap_node(ip, user, password): + await manager.broadcast(f"🔑 SSH-Handshake für {ip}...") + # 1. Key kopieren + ssh_copy_cmd = f"sshpass -p '{password}' ssh-copy-id -o StrictHostKeyChecking=no -i {SSH_KEY}.pub {user}@{ip}" + proc = subprocess.run(ssh_copy_cmd, shell=True, capture_output=True, text=True) + + if proc.returncode != 0: + await manager.broadcast(f"❌ Fehler beim Key-Copy für {ip}: {proc.stderr}") + return + + # 2. System-Infos abrufen (Inventur) + await manager.broadcast(f"🔍 Inventur auf {ip} wird durchgeführt...") + + # Befehlskette: Arch, OS-Name, Docker-Check + inspect_cmd = "uname -m && (lsb_release -ds || cat /etc/os-release | grep PRETTY_NAME | cut -d'=' -f2) && (command -v docker >/dev/null 2>&1 && echo '1' || echo '0')" + ssh_cmd = f"ssh -o StrictHostKeyChecking=no {user}@{ip} \"{inspect_cmd}\"" + + try: + output = subprocess.check_output(ssh_cmd, shell=True).decode().strip().split('\n') + arch = output[0] if len(output) > 0 else "Unbekannt" + # Mapping für Architektur + if "aarch64" in arch.lower(): arch = "arm64" + elif "x86_64" in arch.lower(): arch = "x86-64" + + os_name = output[1].replace('"', '') if len(output) > 1 else "Linux" + docker_val = int(output[2]) if len(output) > 2 else 0 + + status = "Docker Aktiv" if docker_val else "Bereit (Kein Docker)" + + # Datenbank Update + conn = get_db() + conn.execute(''' + UPDATE nodes SET os = ?, arch = ?, docker_installed = ?, status = ? + WHERE ip = ? + ''', (os_name, arch, docker_val, status, ip)) + conn.commit() + conn.close() + await manager.broadcast(f"✅ Node {ip} konfiguriert ({os_name}, {arch}).") + except Exception as e: + await manager.broadcast(f"⚠️ Inventur auf {ip} unvollständig: {e}") + +# --- ROUTES --- + +@app.get("/") +async def index(request: Request): + conn = get_db() + nodes = conn.execute('SELECT * FROM nodes').fetchall() + conn.close() + return templates.TemplateResponse("index.html", {"request": request, "nodes": nodes}) + +@app.get("/api/node/{node_id}") +async def get_node(node_id: int): + conn = get_db() + node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone() + conn.close() + return dict(node) if node else {} + +@app.post("/add_node") +async def add_node(background_tasks: BackgroundTasks, name: str = Form(...), ip: str = Form(...), user: str = Form(...), password: str = Form(...)): + conn = get_db() + try: + # Speichere Initialdaten inkl. Sudo-Passwort + conn.execute(''' + INSERT INTO nodes (name, ip, user, sudo_password, status) + VALUES (?, ?, ?, ?, ?) + ''', (name, ip, user, password, "Kopplung...")) + conn.commit() + background_tasks.add_task(bootstrap_node, ip, user, password) + except sqlite3.IntegrityError: pass + finally: conn.close() + return RedirectResponse(url="/", status_code=303) + +@app.post("/edit_node/{node_id}") +async def edit_node(node_id: int, name: str = Form(...), ip: str = Form(...), user: str = Form(...)): + conn = get_db() + conn.execute('UPDATE nodes SET name=?, ip=?, user=? WHERE id=?', (name, ip, user, node_id)) + conn.commit() + conn.close() + return RedirectResponse(url="/", status_code=303) + +@app.post("/remove_node/{node_id}") +async def remove_node(node_id: int): + conn = get_db() + conn.execute('DELETE FROM nodes WHERE id = ?', (node_id,)) + conn.commit() + conn.close() + return RedirectResponse(url="/", status_code=303) + +@app.get("/refresh_status/{node_id}") +async def refresh_status(node_id: int): + conn = get_db() + node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone() + if not node: return {"status": "Offline"} + + inspect_cmd = "uname -m && (lsb_release -ds || cat /etc/os-release | grep PRETTY_NAME | cut -d'=' -f2) && (command -v docker >/dev/null 2>&1 && echo '1' || echo '0')" + ssh_cmd = f"ssh -o StrictHostKeyChecking=no -o ConnectTimeout=3 {node['user']}@{node['ip']} \"{inspect_cmd}\"" + + try: + output = subprocess.check_output(ssh_cmd, shell=True).decode().strip().split('\n') + arch = output[0]; os_name = output[1].replace('"', ''); docker_val = int(output[2]) + if "aarch64" in arch.lower(): arch = "arm64" + elif "x86_64" in arch.lower(): arch = "x86-64" + new_status = "Docker Aktiv" if docker_val else "Bereit (Kein Docker)" + + conn.execute('UPDATE nodes SET status=?, os=?, arch=?, docker_installed=? WHERE id=?', + (new_status, os_name, arch, docker_val, node_id)) + conn.commit() + result = {"status": new_status, "os": os_name, "arch": arch, "docker": docker_val} + except: + new_status = "Offline" + conn.execute('UPDATE nodes SET status=? WHERE id=?', (new_status, node_id)) + conn.commit() + result = {"status": new_status, "os": node['os'], "arch": node['arch'], "docker": node['docker_installed']} + + conn.close() + return result + +# --- WebSockets Terminal / Chat / Logs (Integration wie gehabt) --- +@app.websocket("/ws/install_logs") +async def log_websocket(websocket: WebSocket): + await manager.connect(websocket) + try: + while True: await websocket.receive_text() + except WebSocketDisconnect: manager.disconnect(websocket) + +@app.websocket("/ws/terminal/{ip}") +async def terminal_websocket(websocket: WebSocket, ip: str): + await websocket.accept() + conn = get_db(); node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone(); conn.close() + if not node: await websocket.close(); return + master_fd, slave_fd = pty.openpty() + proc = await asyncio.create_subprocess_exec("ssh", "-o", "StrictHostKeyChecking=no", "-t", f"{node['user']}@{ip}", stdin=slave_fd, stdout=slave_fd, stderr=slave_fd) + async def pty_to_ws(): + fl = fcntl.fcntl(master_fd, fcntl.F_GETFL); fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + try: + while True: + await asyncio.sleep(0.01) + try: + data = os.read(master_fd, 1024).decode(errors='ignore') + if data: await websocket.send_text(data) + except BlockingIOError: continue + except: pass + async def ws_to_pty(): + try: + while True: + data = await websocket.receive_text(); os.write(master_fd, data.encode()) + except: pass + try: await asyncio.gather(pty_to_ws(), ws_to_pty()) + finally: + if proc.returncode is None: proc.terminate() + os.close(master_fd); os.close(slave_fd) + +@app.websocket("/ws/chat") +async def chat_endpoint(websocket: WebSocket): + await websocket.accept() + try: + while True: + user_msg = await websocket.receive_text() + ai_response = await get_ai_response(user_msg, get_system_prompt()) + commands = re.findall(r'(.*?)', ai_response, re.I | re.S) + clean_msg = re.sub(r'.*?', '', ai_response, flags=re.I | re.S).strip() + if clean_msg: await websocket.send_text(clean_msg) + if commands: + tasks = [] + for target, cmd in commands: + conn = get_db(); n = conn.execute('SELECT * FROM nodes WHERE ip=? OR name=?', (target.strip(), target.strip())).fetchone(); conn.close() + if n: tasks.append(run_remote_task(n['ip'], n['user'], cmd.strip())) + if tasks: + await websocket.send_text("ℹ️ *Führe Befehle aus...*") + await asyncio.gather(*tasks) + summary = await get_ai_response("Zusammenfassung der Ergebnisse?", get_system_prompt()) + await websocket.send_text(f"--- Info ---\n{summary}") + except: pass + +async def run_remote_task(ip, user, cmd): + await manager.broadcast(f"🚀 Task: {cmd} auf {ip}") + proc = await asyncio.create_subprocess_shell(f"ssh -o StrictHostKeyChecking=no {user}@{ip} '{cmd}'", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) + full_output = "" + while True: + line = await proc.stdout.readline() + if not line: break + out = line.decode('utf-8', errors='ignore').strip() + if out: await manager.broadcast(f"🛠️ {out}"); full_output += out + "\n" + await proc.wait() + chat_history.append({"role": "user", "content": f"[SYSTEM] Befehl '{cmd}' auf {ip} fertig:\n{full_output or 'Kein Output'}"}) + +# --- Settings API --- +@app.get("/api/settings") +async def get_settings(): + return {"provider": AI_PROVIDER, "google_model": GOOGLE_MODEL, "openai_model": OPENAI_MODEL, "ollama_model": OLLAMA_MODEL, "ollama_base_url": OLLAMA_BASE_URL} + +@app.post("/api/settings") +async def update_settings(request: Request): + global AI_PROVIDER, GOOGLE_MODEL, OPENAI_MODEL, OLLAMA_MODEL, OLLAMA_BASE_URL + data = await request.json() + provider = data.get("provider") + if provider: + AI_PROVIDER = provider; set_key(ENV_FILE, "AI_PROVIDER", provider) + if data.get("model"): + m = data.get("model") + if provider == "google": GOOGLE_MODEL = m; set_key(ENV_FILE, "GOOGLE_MODEL", m) + if provider == "openai": OPENAI_MODEL = m; set_key(ENV_FILE, "OPENAI_MODEL", m) + if provider == "ollama": OLLAMA_MODEL = m; set_key(ENV_FILE, "OLLAMA_MODEL", m) + if data.get("ollama_base_url"): + u = data.get("ollama_base_url"); OLLAMA_BASE_URL = u; set_key(ENV_FILE, "OLLAMA_BASE_URL", u) + return {"status": "success"} + +@app.get("/api/models") +async def get_models(provider: str): + try: + if provider == "ollama": + url = OLLAMA_BASE_URL.replace("/v1", "").rstrip("/") + async with httpx.AsyncClient() as client: + r = await client.get(f"{url}/api/tags", timeout=5.0) + return {"models": [m["name"] for m in r.json().get("models", [])]} + elif provider == "openai": + client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY) + r = await client.models.list() + return {"models": sorted([m.id for m in r.data if "gpt" in m.id or "o1" in m.id])} + elif provider == "google": + client = genai.Client(api_key=GOOGLE_API_KEY) + return {"models": sorted([m.name.replace("models/", "") for m in client.models.list() if 'generateContent' in m.supported_actions])} + except: return {"models": []} + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file