# Standard library
import asyncio
import fcntl
import json
import os
import pty
import re
import shlex
import sqlite3
import subprocess

# Third-party
import httpx
import openai
from dotenv import load_dotenv, set_key
from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from google import genai
from google.genai import types
# Load environment variables from the .env file
load_dotenv()
app = FastAPI()
static_path = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=static_path), name="static")
templates = Jinja2Templates(directory="templates")
# SSH identity used for passwordless logins to managed nodes
SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
# SQLite database holding the node inventory
DB_PATH = "cluster.db"
# In-memory conversation log shared across chat sessions (trimmed to 30 entries)
chat_history = []
# Template file for the AI system prompt; {node_info} is substituted at runtime
PROMPT_FILE = "system_prompt.txt"
ENV_FILE = os.path.join(os.path.dirname(__file__), ".env")
# --- AI CONFIGURATION ---
# Provider selection and per-provider model names; all overridable via .env
AI_PROVIDER = os.getenv("AI_PROVIDER", "google").lower()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434/v1")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3")
def get_system_prompt():
    """Build the AI system prompt with the current node inventory injected.

    Reads the template from PROMPT_FILE and substitutes the {node_info}
    placeholder; falls back to a minimal built-in template when the file
    is missing.
    """
    db = get_db()
    rows = db.execute('SELECT * FROM nodes').fetchall()
    db.close()
    node_info = "".join(
        f"- Name: {row['name']}, IP: {row['ip']}, User: {row['user']}, "
        f"OS: {row['os']}, Arch: {row['arch']}, Docker: {'Ja' if row['docker'] else 'Nein'}\n"
        for row in rows
    )
    if not os.path.exists(PROMPT_FILE):
        template = "Du bist ein Helfer. Nodes:\n{node_info}\nNutze cmd"
        print(f"⚠️ Warnung: {PROMPT_FILE} nicht gefunden. Nutze Fallback.")
    else:
        with open(PROMPT_FILE, "r", encoding="utf-8") as f:
            template = f.read()
    return template.replace("{node_info}", node_info)
# --- AI FUNCTIONS ---
async def get_ai_response(user_input: str, system_prompt: str) -> str:
    """Send the user message plus recent history to the configured AI provider.

    Appends the user message and the model's reply to the global
    chat_history (trimmed to the last 30 entries) and returns the reply
    text; on a provider error a German error string is returned instead.
    """
    global chat_history
    chat_history.append({"role": "user", "content": user_input})
    chat_history = chat_history[-30:]
    ai_msg = ""
    try:
        if AI_PROVIDER == "openai" or AI_PROVIDER == "ollama":
            # Both providers speak the OpenAI chat-completions protocol.
            messages = [{"role": "system", "content": system_prompt}] + chat_history
            if AI_PROVIDER == "ollama":
                url = OLLAMA_BASE_URL
                # Ollama's OpenAI-compatible endpoint lives under /v1.
                if not url.endswith('/v1') and not url.endswith('/v1/'): url = url.rstrip('/') + '/v1'
                key = "ollama"
                model_to_use = OLLAMA_MODEL
            else:
                url = None
                key = OPENAI_API_KEY
                model_to_use = OPENAI_MODEL
            # NOTE(review): synchronous client inside an async function — this
            # blocks the event loop while the model responds; consider
            # openai.AsyncOpenAI. Left unchanged here.
            client = openai.OpenAI(base_url=url, api_key=key)
            response = client.chat.completions.create(model=model_to_use, messages=messages)
            ai_msg = response.choices[0].message.content
        elif AI_PROVIDER == "google":
            if not GOOGLE_API_KEY: return "Fehler: GOOGLE_API_KEY fehlt in der .env Datei!"
            client = genai.Client(api_key=GOOGLE_API_KEY)
            # Convert OpenAI-style history to google-genai Content objects;
            # the SDK names the assistant role "model".
            google_history = []
            for msg in chat_history[:-1]:
                role = "user" if msg["role"] == "user" else "model"
                google_history.append(types.Content(role=role, parts=[types.Part.from_text(text=msg["content"])]))
            chat = client.chats.create(model=GOOGLE_MODEL, config=types.GenerateContentConfig(system_instruction=system_prompt), history=google_history)
            response = chat.send_message(user_input)
            ai_msg = response.text
    except Exception as e:
        ai_msg = f"Fehler bei der KI-Anfrage: {e}"
        print(f"KI Fehler: {e}")
    chat_history.append({"role": "assistant", "content": ai_msg})
    return ai_msg
# --- DATABASE INITIALIZATION & MIGRATION ---
def init_db():
    """Create the nodes table if missing and migrate legacy schemas.

    Columns added in later versions are appended via ALTER TABLE so an
    existing database keeps its data.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS nodes ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " name TEXT, ip TEXT UNIQUE, user TEXT, status TEXT"
        ")"
    )
    # Compare the live schema against the expected columns and append
    # whatever is missing (simple forward-only migration).
    cur = conn.cursor()
    cur.execute("PRAGMA table_info(nodes)")
    existing = {row[1] for row in cur.fetchall()}
    migrations = (
        ("sudo_pass", "TEXT DEFAULT ''"),
        ("os", "TEXT DEFAULT 'Unbekannt'"),
        ("arch", "TEXT DEFAULT 'Unbekannt'"),
        ("docker", "INTEGER DEFAULT 0"),
        ("vnc", "INTEGER DEFAULT 0"),
    )
    for column, decl in migrations:
        if column not in existing:
            conn.execute(f"ALTER TABLE nodes ADD COLUMN {column} {decl}")
            print(f"Datenbank aktualisiert: Spalte '{column}' hinzugefügt.")
    conn.commit()
    conn.close()
# Ensure the schema exists (and is migrated) before the app serves requests.
init_db()
def get_db():
    """Open a SQLite connection with dict-like row access (sqlite3.Row)."""
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
# --- WebSocket Manager ---
class ConnectionManager:
    """Tracks open log websockets and fans messages out to all of them."""

    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the websocket and register it for broadcasts."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Unregister a websocket; safe to call twice."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Best-effort delivery of `message` to every registered socket."""
        # Iterate over a snapshot: disconnect() may mutate the list while
        # we are awaiting a send.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # A dead socket must not stop delivery to the others.
                # (Was a bare `except:`, which also swallowed CancelledError.)
                pass


manager = ConnectionManager()
# --- SSH Handshake ---
async def bootstrap_ssh_only(ip, user, password, sudo_pass=""):
    """Copy our public SSH key to a newly added node, then probe it.

    Runs ssh-copy-id via sshpass with the one-time password from the
    add-node form, streams the tool's output to the log websocket, and
    finally triggers the SSH info check for the node.
    """
    await manager.broadcast(f"🔑 Initialisiere SSH-Handshake für {ip}...")
    # shlex.quote prevents shell injection through form-supplied values
    # (password, user and ip all come straight from the web form).
    ssh_copy_cmd = (
        f"sshpass -p {shlex.quote(password)} ssh-copy-id "
        f"-o StrictHostKeyChecking=no -i {SSH_KEY}.pub "
        f"{shlex.quote(user)}@{shlex.quote(ip)}"
    )
    process = subprocess.Popen(ssh_copy_cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, text=True)
    for line in process.stdout:
        await manager.broadcast(f"SSH: {line.strip()}")
    # Reap the child so it doesn't linger as a zombie.
    process.wait()
    # Fetch node facts (OS, arch, docker, vnc) right after the handshake.
    conn = get_db()
    node = conn.execute("SELECT id FROM nodes WHERE ip=?", (ip,)).fetchone()
    conn.close()
    if node:
        await check_and_update_node(node['id'])
        await manager.broadcast(f"✅ Node {ip} verbunden und analysiert.")
# --- AUTO-REFRESH (every 60 seconds) ---
@app.on_event("startup")
async def start_auto_refresh():
    # Launch the periodic node-status refresher as a background task.
    asyncio.create_task(auto_refresh_loop())
async def auto_refresh_loop():
    """Refresh the status of every node once a minute, forever.

    Each node is pinged first (cheap, 1-second timeout) to avoid long
    SSH timeouts on offline hosts; reachable nodes get the full SSH
    info check, unreachable ones are marked Offline in the DB.
    """
    while True:
        await asyncio.sleep(60)  # once per minute
        try:
            conn = get_db()
            nodes = conn.execute('SELECT id, ip FROM nodes').fetchall()
            conn.close()
            for n in nodes:
                # Quick reachability probe (-W timeout is Linux ping syntax).
                proc = await asyncio.create_subprocess_exec(
                    "ping", "-c", "1", "-W", "1", n['ip'],
                    stdout=asyncio.subprocess.DEVNULL,
                    stderr=asyncio.subprocess.DEVNULL,
                )
                await proc.wait()
                if proc.returncode == 0:
                    # Node answers ping: run the SSH info check.
                    await check_and_update_node(n['id'])
                else:
                    # Node unreachable: record it as offline.
                    conn = get_db()
                    conn.execute("UPDATE nodes SET status='Offline' WHERE id=?", (n['id'],))
                    conn.commit()
                    conn.close()
        except Exception as e:
            # Previously an unexpected error (e.g. a locked DB) killed this
            # background task permanently; one failed cycle is now logged
            # and the loop keeps running.
            print(f"Auto-Refresh Fehler: {e}")
# Helper for the SSH-based node info check
async def check_and_update_node(node_id: int):
    """Probe a node over SSH and persist its OS/arch/docker/vnc facts.

    Returns the new status string, "Node nicht gefunden" for an unknown
    id, or "Offline/Fehler" when the probe fails.
    """
    conn = get_db()
    node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone()
    if not node:
        conn.close()
        return "Node nicht gefunden"
    # One remote shell snippet gathers architecture, docker, OS id and VNC.
    check_cmd = """
    arch=$(uname -m);
    dock=$(command -v docker >/dev/null 2>&1 && echo 1 || echo 0);
    os=$(grep '^ID=' /etc/os-release 2>/dev/null | cut -d= -f2 | tr -d '"');
    [ -z "$os" ] && os=$(uname -s);
    vnc=$(command -v vncserver >/dev/null 2>&1 && echo 1 || echo 0);
    echo "$arch|$dock|$os|$vnc"
    """
    try:
        # Async subprocess instead of subprocess.check_output: the old
        # blocking call froze the whole event loop for up to the SSH
        # connect timeout. Passing check_cmd as its own argv element also
        # fixes the original broken nesting of single quotes inside a
        # single-quoted shell string.
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=3",
            f"{node['user']}@{node['ip']}", check_cmd,
            stdout=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(f"ssh exit code {proc.returncode}")
        arch, dock, os_name, vnc = stdout.decode().strip().split('|')
        status = "Docker Aktiv" if dock == "1" else "Bereit (Kein Docker)"
        conn.execute(
            'UPDATE nodes SET status=?, arch=?, docker=?, os=?, vnc=? WHERE id=?',
            (status, arch, int(dock), os_name, int(vnc), node_id),
        )
        conn.commit()
        conn.close()
        return status
    except Exception:
        # Any failure (connect, parse, DB) marks the node as unreachable.
        conn.execute("UPDATE nodes SET status='Offline/Fehler' WHERE id=?", (node_id,))
        conn.commit()
        conn.close()
        return "Offline/Fehler"
# --- Routes ---
@app.get("/")
async def index(request: Request):
    """Render the dashboard with the full node inventory."""
    db = get_db()
    node_rows = db.execute('SELECT * FROM nodes').fetchall()
    db.close()
    return templates.TemplateResponse("index.html", {"request": request, "nodes": node_rows})
@app.post("/add_node")
async def add_node(background_tasks: BackgroundTasks, name: str = Form(...), ip: str = Form(...), user: str = Form(...), password: str = Form(...), sudo_pass: str = Form(default="")):
    """Register a node and start the SSH key handshake in the background."""
    db = get_db()
    try:
        db.execute(
            'INSERT INTO nodes (name, ip, user, sudo_pass, status) VALUES (?, ?, ?, ?, ?)',
            (name, ip, user, sudo_pass, "Kopplung..."),
        )
        db.commit()
        background_tasks.add_task(bootstrap_ssh_only, ip, user, password, sudo_pass)
    except sqlite3.IntegrityError:
        # Duplicate IP (UNIQUE constraint): silently ignore the submission.
        pass
    finally:
        db.close()
    return RedirectResponse(url="/", status_code=303)
@app.post("/remove_node/{node_id}")
async def remove_node(node_id: int):
    """Delete a node from the inventory and return to the dashboard."""
    db = get_db()
    db.execute('DELETE FROM nodes WHERE id = ?', (node_id,))
    db.commit()
    db.close()
    return RedirectResponse(url="/", status_code=303)
# --- Endpoint to fetch one node's data (for the edit dialog) ---
@app.get("/api/node/{node_id}")
async def get_node_details(node_id: int):
    """Return the editable fields of one node as JSON."""
    db = get_db()
    row = db.execute('SELECT id, name, ip, user, sudo_pass FROM nodes WHERE id = ?', (node_id,)).fetchone()
    db.close()
    # NOTE(review): this returns sudo_pass in plaintext to the frontend —
    # apparently needed by the edit form, but worth revisiting.
    if row is None:
        return {"error": "Node nicht gefunden"}
    return dict(row)
# --- Endpoint to persist edited node data ---
@app.post("/edit_node/{node_id}")
async def edit_node(node_id: int, name: str = Form(...), ip: str = Form(...), user: str = Form(...), sudo_pass: str = Form(default="")):
    """Update name, IP, user and sudo password of an existing node."""
    db = get_db()
    db.execute(
        'UPDATE nodes SET name=?, ip=?, user=?, sudo_pass=? WHERE id=?',
        (name, ip, user, sudo_pass, node_id),
    )
    db.commit()
    db.close()
    return RedirectResponse(url="/", status_code=303)
@app.get("/refresh_status/{node_id}")
async def refresh_status_endpoint(node_id: int):
    """Re-probe one node via SSH and return its refreshed facts."""
    await check_and_update_node(node_id)
    # Re-read the row so the frontend receives the freshly stored values.
    db = get_db()
    row = db.execute('SELECT * FROM nodes WHERE id=?', (node_id,)).fetchone()
    db.close()
    if row is None:
        return {"status": "Offline/Fehler"}
    return {key: row[key] for key in ("status", "os", "arch", "docker", "vnc")}
# --- SETTINGS & AI MODELS ---
@app.get("/api/settings")
async def get_settings():
    """Expose the current AI provider configuration to the frontend."""
    return {
        "provider": AI_PROVIDER,
        "google_model": GOOGLE_MODEL,
        "openai_model": OPENAI_MODEL,
        "ollama_model": OLLAMA_MODEL,
        "ollama_base_url": OLLAMA_BASE_URL,
    }
@app.post("/api/settings")
async def update_settings(request: Request):
    """Apply provider/model/URL changes to the globals and persist to .env."""
    global AI_PROVIDER, GOOGLE_MODEL, OPENAI_MODEL, OLLAMA_MODEL, OLLAMA_BASE_URL
    data = await request.json()
    provider = data.get("provider")
    model = data.get("model")
    ollama_url = data.get("ollama_base_url")
    if provider:
        AI_PROVIDER = provider
        set_key(ENV_FILE, "AI_PROVIDER", provider)
    # Store the model under the key matching the chosen provider.
    if model:
        if provider == "google":
            GOOGLE_MODEL = model
            set_key(ENV_FILE, "GOOGLE_MODEL", model)
        elif provider == "openai":
            OPENAI_MODEL = model
            set_key(ENV_FILE, "OPENAI_MODEL", model)
        elif provider == "ollama":
            OLLAMA_MODEL = model
            set_key(ENV_FILE, "OLLAMA_MODEL", model)
    if ollama_url:
        OLLAMA_BASE_URL = ollama_url
        set_key(ENV_FILE, "OLLAMA_BASE_URL", ollama_url)
    return {"status": "success"}
@app.get("/api/models")
async def get_models(provider: str, url: str | None = None):
    """List the available model names for the given provider.

    Returns {"models": [...]}; on any error an empty list is returned so
    the frontend dropdown simply stays empty.
    """
    try:
        models = []
        if provider == "ollama" and url:
            # Ollama's native tag listing lives outside the /v1 prefix.
            clean_url = url.replace("/v1", "").rstrip("/")
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{clean_url}/api/tags", timeout=5.0)
                data = response.json()
                models = [m["name"] for m in data.get("models", [])]
        elif provider == "openai":
            # Placeholder keys (containing "hier") are treated as unset.
            if not OPENAI_API_KEY or "hier" in OPENAI_API_KEY:
                return {"models": []}
            # openai is already imported at module level; the redundant
            # function-local import was removed.
            client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY)
            response = await client.models.list()
            models = [m.id for m in response.data if "gpt" in m.id or "o1" in m.id]
            models.sort()
        elif provider == "google":
            if not GOOGLE_API_KEY:
                return {"models": ["API-Key fehlt"]}
            client = genai.Client(api_key=GOOGLE_API_KEY)
            for m in client.models.list():
                if 'generateContent' in m.supported_actions:
                    models.append(m.name.replace("models/", ""))
            models.sort()
        return {"models": models}
    except Exception as e:
        print(f"Fehler: {str(e)}")
        return {"models": []}
# --- WebSockets: install logs & terminal ---
@app.websocket("/ws/install_logs")
async def log_websocket(websocket: WebSocket):
    """Keep a log-stream websocket registered until the client leaves."""
    await manager.connect(websocket)
    try:
        # Client messages are ignored; receiving just keeps the connection
        # open and lets us observe the disconnect.
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket)
@app.websocket("/ws/terminal/{ip}")
async def terminal_websocket(websocket: WebSocket, ip: str):
    """Bridge a browser websocket to an interactive SSH session via a PTY."""
    await websocket.accept()
    conn = get_db()
    node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone()
    conn.close()
    if not node:
        await websocket.send_text("\r\nFehler: Node nicht gefunden.\r\n")
        await websocket.close()
        return
    # Allocate a pseudo-terminal so `ssh -t` behaves like an interactive shell.
    master_fd, slave_fd = pty.openpty()
    proc = await asyncio.create_subprocess_exec(
        "ssh", "-o", "StrictHostKeyChecking=no", "-t", f"{node['user']}@{ip}",
        stdin=slave_fd, stdout=slave_fd, stderr=slave_fd
    )
    async def pty_to_ws():
        # Switch the master side to non-blocking and poll it every 10 ms,
        # forwarding whatever the remote shell emits to the browser.
        fl = fcntl.fcntl(master_fd, fcntl.F_GETFL)
        fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        try:
            while True:
                await asyncio.sleep(0.01)
                try:
                    data = os.read(master_fd, 1024).decode(errors='ignore')
                    if data: await websocket.send_text(data)
                except BlockingIOError: continue  # nothing to read yet
        except Exception: pass  # any other error ends this pump
    async def ws_to_pty():
        # Forward keystrokes from the browser into the PTY.
        try:
            while True:
                data = await websocket.receive_text()
                os.write(master_fd, data.encode())
        except Exception: pass  # disconnect ends this pump
    try:
        await asyncio.gather(pty_to_ws(), ws_to_pty())
    finally:
        # Tear down the SSH process and both PTY ends on disconnect.
        if proc.returncode is None: proc.terminate()
        os.close(master_fd)
        os.close(slave_fd)
# --- WEBSOCKET CHAT ---
@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
    """AI chat: relay user messages to the model and execute command tags.

    The model may embed commands as <cmd node="NAME-OR-IP">shell</cmd>;
    each one is executed on the matching node via SSH and the collected
    output is fed back to the model for a summary.
    """
    await websocket.accept()
    if AI_PROVIDER == "google" and not GOOGLE_API_KEY:
        await websocket.send_text("⚠️ Kein GOOGLE_API_KEY in der `.env` gefunden!")
    elif AI_PROVIDER == "openai" and not OPENAI_API_KEY:
        await websocket.send_text("⚠️ Kein OPENAI_API_KEY in der `.env` gefunden!")
    # NOTE(review): the command regexes in the checked-in source were
    # corrupted to r'(.*?)', which matches empty strings and crashed the
    # tuple unpacking below on every reply. Reconstructed here as a
    # two-group <cmd node="...">...</cmd> tag — confirm the exact tag
    # format against system_prompt.txt.
    cmd_pattern = re.compile(r'<cmd\s+node="(.*?)">(.*?)</cmd>', re.IGNORECASE | re.DOTALL)
    try:
        while True:
            user_msg = await websocket.receive_text()
            sys_prompt = get_system_prompt()
            ai_response = await get_ai_response(user_msg, sys_prompt)
            commands_to_run = cmd_pattern.findall(ai_response)
            # Strip the command tags from the text shown to the user.
            clean_chat_msg = cmd_pattern.sub('', ai_response).strip()
            if clean_chat_msg:
                await websocket.send_text(clean_chat_msg)
            if commands_to_run:
                tasks = []
                for target_ip, cmd in commands_to_run:
                    conn = get_db()
                    node = conn.execute('SELECT * FROM nodes WHERE ip = ? OR name = ?',
                                        (target_ip.strip(), target_ip.strip())).fetchone()
                    conn.close()
                    if node:
                        # Let the AI's sudo commands use the stored password:
                        # `sudo X` becomes `echo 'pass' | sudo -S X`.
                        if "sudo " in cmd and node['sudo_pass']:
                            cmd = cmd.replace("sudo ", f"echo '{node['sudo_pass']}' | sudo -S ")
                        tasks.append(run_remote_task(node['ip'], node['user'], cmd.strip(), "KI-Kommando"))
                if tasks:
                    await websocket.send_text("ℹ️ *Warte auf Rückmeldungen der Nodes...*")
                    await asyncio.gather(*tasks)
                    # Ask the model to summarize the collected outputs.
                    follow_up_prompt = "Die Befehle wurden ausgeführt. Bitte fasse die Ergebnisse kurz zusammen."
                    ai_summary = await get_ai_response(follow_up_prompt, sys_prompt)
                    await websocket.send_text("--- Ergebnis ---")
                    await websocket.send_text(ai_summary)
    except Exception as e:
        print(f"Chat Fehler: {e}")
async def run_remote_task(ip, user, cmd, task_name):
    """Run a shell command on a node via SSH, streaming output to the logs.

    The combined stdout/stderr is appended to the global chat history as
    a [SYSTEM] report so the AI can summarize the result afterwards.
    """
    global chat_history
    await manager.broadcast(f"🚀 KI-Task gestartet: {cmd} auf {ip}")
    # shlex.quote replaces the old '{cmd}' wrapping, which broke (and could
    # escape the quoting entirely) whenever the AI-generated command
    # contained a single quote — guaranteed by the sudo rewrite, which
    # injects `echo 'pass' | sudo -S`.
    ssh_cmd = f"ssh -o StrictHostKeyChecking=no {user}@{ip} {shlex.quote(cmd)}"
    process = await asyncio.create_subprocess_shell(
        ssh_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
    full_output = ""
    while True:
        line = await process.stdout.readline()
        if not line:
            break
        decoded_line = line.decode('utf-8', errors='ignore').strip()
        if decoded_line:
            await manager.broadcast(f"🛠️ {decoded_line}")
            full_output += decoded_line + "\n"
        # Yield briefly so broadcasts reach the frontend while we stream.
        await asyncio.sleep(0.001)
    await process.wait()
    if not full_output.strip():
        full_output = "Befehl wurde ohne Ausgabe ausgeführt."
    system_report = f"[SYSTEM] Befehl '{cmd}' auf {ip} beendet. Ausgabe:\n{full_output}"
    chat_history.append({"role": "user", "content": system_report})
    await manager.broadcast(f"✅ Befehl auf {ip} abgeschlossen.")
if __name__ == "__main__":
    # uvicorn is only needed when this module is run as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)