import os
import pty
import fcntl
import subprocess
import sqlite3
import asyncio
import openai
import re
import httpx
from google import genai
from google.genai import types
import json
from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect
from fastapi.responses import RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv, set_key
# Load environment variables from .env before reading any configuration.
load_dotenv()
app = FastAPI()
static_path = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=static_path), name="static")
templates = Jinja2Templates(directory="templates")
# Private key path; the matching public key (f"{SSH_KEY}.pub") is copied to
# nodes during bootstrapping.
SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
DB_PATH = "cluster.db"
# In-memory conversation memory shared by all chat websocket clients.
chat_history = []
PROMPT_FILE = "system_prompt.txt"
ENV_FILE = os.path.join(os.path.dirname(__file__), ".env")
# --- AI CONFIGURATION ---
# Provider is one of "google", "openai" or "ollama"; per-provider models below.
AI_PROVIDER = os.getenv("AI_PROVIDER", "google").lower()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434/v1")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.0-flash")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3")
# --- DATABASE INITIALISATION (EXTENDED) ---
def init_db():
    """Create the ``nodes`` table if it does not exist yet.

    Columns extended with sudo_password, os, arch and docker_installed in
    addition to the basic connection data.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS nodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                ip TEXT UNIQUE,
                user TEXT,
                sudo_password TEXT,
                os TEXT DEFAULT 'Unbekannt',
                arch TEXT DEFAULT 'Unbekannt',
                docker_installed INTEGER DEFAULT 0,
                status TEXT
            )
        ''')
        conn.commit()
    finally:
        # Close even when the DDL fails so the connection is never leaked.
        conn.close()
init_db()
def get_db():
    """Return a fresh SQLite connection whose rows behave like mappings."""
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def get_system_prompt():
    """Build the AI system prompt with the current node inventory injected."""
    db = get_db()
    rows = db.execute('SELECT * FROM nodes').fetchall()
    db.close()
    # One summary line per node, terminated with a newline each.
    parts = []
    for row in rows:
        docker_str = "Ja" if row['docker_installed'] else "Nein"
        parts.append(
            f"- Name: {row['name']}, IP: {row['ip']}, User: {row['user']}, "
            f"OS: {row['os']}, Arch: {row['arch']}, Docker: {docker_str}\n"
        )
    node_info = "".join(parts)
    if os.path.exists(PROMPT_FILE):
        with open(PROMPT_FILE, "r", encoding="utf-8") as f:
            template = f.read()
    else:
        # Fall back to a minimal built-in template when the file is missing.
        template = "Du bist ein Cluster-Orchestrator. Nodes:\n{node_info}\nBefehle via cmd"
        print(f"⚠️ Warnung: {PROMPT_FILE} fehlt.")
    return template.replace("{node_info}", node_info)
# --- AI FUNCTIONS ---
async def get_ai_response(user_input, system_prompt):
    """Send *user_input* to the configured AI provider and return the reply.

    Maintains the module-level ``chat_history`` (bounded to the last 30
    messages) so follow-up questions keep their context across calls.
    Any provider error is returned as a message string rather than raised.
    """
    global chat_history
    # 1. Record the new user message in the shared memory.
    chat_history.append({"role": "user", "content": user_input})
    # 2. Bound the memory to the most recent 30 messages.
    chat_history = chat_history[-30:]
    ai_msg = ""
    try:
        if AI_PROVIDER in ("openai", "ollama"):
            messages = [{"role": "system", "content": system_prompt}] + chat_history
            if AI_PROVIDER == "ollama":
                # Make sure the Ollama base URL ends with the /v1 API prefix.
                url = OLLAMA_BASE_URL
                if not url.endswith('/v1') and not url.endswith('/v1/'):
                    url = url.rstrip('/') + '/v1'
                key = "ollama"  # Ollama ignores the key, but the client requires one.
                model_to_use = OLLAMA_MODEL
            else:
                url = None  # None selects the default OpenAI endpoint.
                key = OPENAI_API_KEY
                model_to_use = OPENAI_MODEL
            client = openai.OpenAI(base_url=url, api_key=key)
            response = client.chat.completions.create(
                model=model_to_use,
                messages=messages
            )
            ai_msg = response.choices[0].message.content
        elif AI_PROVIDER == "google":
            if not GOOGLE_API_KEY:
                # BUGFIX: previously returned early here, leaving the user
                # message in chat_history without a matching assistant entry.
                # Falling through keeps history balanced; callers still get
                # the identical error string.
                ai_msg = "Fehler: GOOGLE_API_KEY fehlt in der .env Datei!"
            else:
                client = genai.Client(api_key=GOOGLE_API_KEY)
                # Translate our message dicts into the google-genai format.
                google_history = []
                # Everything except the latest user message goes into history.
                for msg in chat_history[:-1]:
                    role = "user" if msg["role"] == "user" else "model"
                    google_history.append(
                        types.Content(role=role, parts=[types.Part.from_text(text=msg["content"])])
                    )
                # Start the chat WITH the translated memory.
                chat = client.chats.create(
                    model=GOOGLE_MODEL,
                    config=types.GenerateContentConfig(system_instruction=system_prompt),
                    history=google_history
                )
                # Only now send the new message to the memory-backed chat.
                response = chat.send_message(user_input)
                ai_msg = response.text
    except Exception as e:
        ai_msg = f"Fehler bei der KI-Anfrage: {e}"
        print(f"KI Fehler: {e}")
    # 3. Record the assistant reply in memory as well.
    chat_history.append({"role": "assistant", "content": ai_msg})
    return ai_msg
# --- WebSocket Manager ---
class ConnectionManager:
    """Tracks open log websockets and fans broadcast messages out to them."""

    def __init__(self):
        # Currently connected websockets listening for broadcast messages.
        self.active_connections = []

    async def connect(self, ws: "WebSocket"):
        """Accept the websocket handshake and start tracking it."""
        await ws.accept()
        self.active_connections.append(ws)

    def disconnect(self, ws: "WebSocket"):
        """Stop tracking a websocket that has gone away."""
        self.active_connections.remove(ws)

    async def broadcast(self, msg: str):
        """Send *msg* to every connection, ignoring individual send failures."""
        # Iterate over a snapshot: a connect/disconnect during an await must
        # not invalidate the iteration.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(msg)
            except Exception:
                # Best effort — a dead socket must not break the broadcast.
                # (Was a bare `except:`; narrowed so SystemExit propagates.)
                pass
manager = ConnectionManager()
# --- EXTENDED NODE BOOTSTRAPPING (inventory) ---
async def bootstrap_node(ip, user, password):
    """Copy our SSH key to the node, then collect OS/arch/Docker facts.

    Progress and errors are pushed to listeners via the broadcast manager;
    the inventory result is persisted on the node's DB row.
    """
    await manager.broadcast(f"🔑 SSH-Handshake für {ip}...")
    # 1. Install our public key. Argument-list form (no shell) so that a
    #    malicious password/user/ip from the add-node form cannot inject
    #    local shell commands (the old f-string + shell=True was injectable).
    proc = subprocess.run(
        ["sshpass", "-p", password, "ssh-copy-id",
         "-o", "StrictHostKeyChecking=no", "-i", f"{SSH_KEY}.pub", f"{user}@{ip}"],
        capture_output=True, text=True,
    )
    if proc.returncode != 0:
        await manager.broadcast(f"❌ Fehler beim Key-Copy für {ip}: {proc.stderr}")
        return
    # 2. Fetch system facts (inventory).
    await manager.broadcast(f"🔍 Inventur auf {ip} wird durchgeführt...")
    # Command chain: architecture, OS name, Docker present?
    inspect_cmd = "uname -m && (lsb_release -ds || cat /etc/os-release | grep PRETTY_NAME | cut -d'=' -f2) && (command -v docker >/dev/null 2>&1 && echo '1' || echo '0')"
    try:
        output = subprocess.check_output(
            ["ssh", "-o", "StrictHostKeyChecking=no", f"{user}@{ip}", inspect_cmd]
        ).decode().strip().split('\n')
        arch = output[0] if len(output) > 0 else "Unbekannt"
        # Normalise architecture names.
        if "aarch64" in arch.lower():
            arch = "arm64"
        elif "x86_64" in arch.lower():
            arch = "x86-64"
        os_name = output[1].replace('"', '') if len(output) > 1 else "Linux"
        docker_val = int(output[2]) if len(output) > 2 else 0
        status = "Docker Aktiv" if docker_val else "Bereit (Kein Docker)"
        # Persist the inventory on the node row.
        conn = get_db()
        conn.execute('''
            UPDATE nodes SET os = ?, arch = ?, docker_installed = ?, status = ?
            WHERE ip = ?
        ''', (os_name, arch, docker_val, status, ip))
        conn.commit()
        conn.close()
        await manager.broadcast(f"✅ Node {ip} konfiguriert ({os_name}, {arch}).")
    except Exception as e:
        await manager.broadcast(f"⚠️ Inventur auf {ip} unvollständig: {e}")
# --- ROUTES ---
@app.get("/")
async def index(request: Request):
    """Render the dashboard with the full node list."""
    db = get_db()
    node_rows = db.execute('SELECT * FROM nodes').fetchall()
    db.close()
    return templates.TemplateResponse("index.html", {"request": request, "nodes": node_rows})
@app.get("/api/node/{node_id}")
async def get_node(node_id: int):
    """Return a single node row as JSON, or an empty object if unknown."""
    db = get_db()
    row = db.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone()
    db.close()
    if row is None:
        return {}
    return dict(row)
@app.post("/add_node")
async def add_node(background_tasks: BackgroundTasks, name: str = Form(...), ip: str = Form(...), user: str = Form(...), password: str = Form(...)):
    """Insert a new node row and kick off SSH bootstrapping in the background."""
    db = get_db()
    try:
        # Store the initial data, including the sudo password.
        db.execute('''
            INSERT INTO nodes (name, ip, user, sudo_password, status)
            VALUES (?, ?, ?, ?, ?)
        ''', (name, ip, user, password, "Kopplung..."))
        db.commit()
        background_tasks.add_task(bootstrap_node, ip, user, password)
    except sqlite3.IntegrityError:
        # Duplicate IP (UNIQUE constraint): ignore and just redirect.
        pass
    finally:
        db.close()
    return RedirectResponse(url="/", status_code=303)
@app.post("/edit_node/{node_id}")
async def edit_node(node_id: int, name: str = Form(...), ip: str = Form(...), user: str = Form(...)):
    """Update a node's basic connection fields, then redirect to the dashboard."""
    db = get_db()
    db.execute('UPDATE nodes SET name=?, ip=?, user=? WHERE id=?', (name, ip, user, node_id))
    db.commit()
    db.close()
    return RedirectResponse(url="/", status_code=303)
@app.post("/remove_node/{node_id}")
async def remove_node(node_id: int):
    """Delete a node row, then redirect to the dashboard."""
    db = get_db()
    db.execute('DELETE FROM nodes WHERE id = ?', (node_id,))
    db.commit()
    db.close()
    return RedirectResponse(url="/", status_code=303)
@app.get("/refresh_status/{node_id}")
async def refresh_status(node_id: int):
    """Re-run the remote inventory for one node and persist the result.

    Returns the refreshed status/os/arch/docker facts, or marks the node
    Offline (keeping its last known facts) when SSH fails.
    """
    conn = get_db()
    node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone()
    if not node:
        conn.close()  # BUGFIX: was leaked on this early return
        return {"status": "Offline"}
    inspect_cmd = "uname -m && (lsb_release -ds || cat /etc/os-release | grep PRETTY_NAME | cut -d'=' -f2) && (command -v docker >/dev/null 2>&1 && echo '1' || echo '0')"
    try:
        # Argument-list form (no shell): user/ip come from the database and
        # must not be interpreted by a local shell.
        output = subprocess.check_output(
            ["ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=3",
             f"{node['user']}@{node['ip']}", inspect_cmd]
        ).decode().strip().split('\n')
        arch = output[0]
        os_name = output[1].replace('"', '')
        docker_val = int(output[2])
        if "aarch64" in arch.lower():
            arch = "arm64"
        elif "x86_64" in arch.lower():
            arch = "x86-64"
        new_status = "Docker Aktiv" if docker_val else "Bereit (Kein Docker)"
        conn.execute('UPDATE nodes SET status=?, os=?, arch=?, docker_installed=? WHERE id=?',
                     (new_status, os_name, arch, docker_val, node_id))
        conn.commit()
        result = {"status": new_status, "os": os_name, "arch": arch, "docker": docker_val}
    except Exception:
        # Was a bare `except:` — narrowed so Ctrl-C / SystemExit propagate.
        # SSH failure or unparsable output: mark Offline, keep last facts.
        new_status = "Offline"
        conn.execute('UPDATE nodes SET status=? WHERE id=?', (new_status, node_id))
        conn.commit()
        result = {"status": new_status, "os": node['os'], "arch": node['arch'], "docker": node['docker_installed']}
    conn.close()
    return result
# --- WebSocket terminal / chat / log endpoints ---
@app.websocket("/ws/install_logs")
async def log_websocket(websocket: WebSocket):
    """Register a listener for broadcast log lines until it disconnects."""
    await manager.connect(websocket)
    try:
        # Keep the socket open; any incoming messages are simply ignored.
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket)
@app.websocket("/ws/terminal/{ip}")
async def terminal_websocket(websocket: WebSocket, ip: str):
    """Bridge a browser websocket to an interactive SSH session via a PTY.

    Spawns `ssh -t` attached to the slave end of a pseudo-terminal and pumps
    bytes in both directions until either side closes.
    """
    await websocket.accept()
    conn = get_db(); node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone(); conn.close()
    if not node: await websocket.close(); return
    master_fd, slave_fd = pty.openpty()
    proc = await asyncio.create_subprocess_exec("ssh", "-o", "StrictHostKeyChecking=no", "-t", f"{node['user']}@{ip}", stdin=slave_fd, stdout=slave_fd, stderr=slave_fd)
    async def pty_to_ws():
        # Make reads on the master side non-blocking so the poll loop never stalls.
        fl = fcntl.fcntl(master_fd, fcntl.F_GETFL); fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        try:
            while True:
                await asyncio.sleep(0.01)  # yield to the event loop between polls
                try:
                    data = os.read(master_fd, 1024).decode(errors='ignore')
                    if data: await websocket.send_text(data)
                except BlockingIOError: continue  # nothing buffered yet
        except: pass  # socket closed or fd gone — end this pump
    async def ws_to_pty():
        # Forward keystrokes from the browser into the PTY.
        try:
            while True:
                data = await websocket.receive_text(); os.write(master_fd, data.encode())
        except: pass  # client disconnected — end this pump
    try: await asyncio.gather(pty_to_ws(), ws_to_pty())
    finally:
        # Always reap the SSH child and release both PTY file descriptors.
        if proc.returncode is None: proc.terminate()
        os.close(master_fd); os.close(slave_fd)
@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
    """Chat with the AI orchestrator and execute the commands it emits.

    The AI is expected to wrap shell commands for a node in tags of the
    form <cmd target="NAME_OR_IP">COMMAND</cmd>.
    NOTE(review): the original tag pattern in this file was corrupted
    (the regex had lost its tag literals, so `for target, cmd in commands`
    could never unpack); the schema here is reconstructed from that
    two-group unpack — confirm it against system_prompt.txt.
    """
    await websocket.accept()
    try:
        while True:
            user_msg = await websocket.receive_text()
            ai_response = await get_ai_response(user_msg, get_system_prompt())
            # Each match yields (target node, command text).
            commands = re.findall(r'<cmd target="(.*?)">(.*?)</cmd>', ai_response, re.I | re.S)
            # Strip the command tags from the text shown to the user.
            clean_msg = re.sub(r'<cmd target=".*?">.*?</cmd>', '', ai_response, flags=re.I | re.S).strip()
            if clean_msg:
                await websocket.send_text(clean_msg)
            if commands:
                tasks = []
                for target, cmd in commands:
                    # Target may be a node name or an IP.
                    conn = get_db()
                    n = conn.execute('SELECT * FROM nodes WHERE ip=? OR name=?', (target.strip(), target.strip())).fetchone()
                    conn.close()
                    if n:
                        tasks.append(run_remote_task(n['ip'], n['user'], cmd.strip()))
                if tasks:
                    await websocket.send_text("ℹ️ *Führe Befehle aus...*")
                    await asyncio.gather(*tasks)
                    # Ask the AI to summarise the command output that
                    # run_remote_task appended to chat_history.
                    summary = await get_ai_response("Zusammenfassung der Ergebnisse?", get_system_prompt())
                    await websocket.send_text(f"--- Info ---\n{summary}")
    except Exception:
        # Typically WebSocketDisconnect — the connection is gone either way.
        pass
async def run_remote_task(ip, user, cmd):
    """Run *cmd* on a node via SSH, streaming its output to the broadcast log.

    The combined output is appended to chat_history as a [SYSTEM] message so
    the AI can summarise the result afterwards.
    """
    await manager.broadcast(f"🚀 Task: {cmd} auf {ip}")
    # exec + argument list instead of a shell string: the old
    # f"ssh ... '{cmd}'" broke on any single quote in cmd and allowed local
    # shell injection; ssh receives cmd verbatim as the remote command.
    proc = await asyncio.create_subprocess_exec(
        "ssh", "-o", "StrictHostKeyChecking=no", f"{user}@{ip}", cmd,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT,
    )
    full_output = ""
    while True:
        line = await proc.stdout.readline()
        if not line:
            break
        out = line.decode('utf-8', errors='ignore').strip()
        if out:
            await manager.broadcast(f"🛠️ {out}")
            full_output += out + "\n"
    await proc.wait()
    # Feed the result back into the chat memory for the follow-up summary.
    chat_history.append({"role": "user", "content": f"[SYSTEM] Befehl '{cmd}' auf {ip} fertig:\n{full_output or 'Kein Output'}"})
# --- Settings API ---
@app.get("/api/settings")
async def get_settings():
    """Expose the current AI provider configuration to the frontend."""
    return dict(
        provider=AI_PROVIDER,
        google_model=GOOGLE_MODEL,
        openai_model=OPENAI_MODEL,
        ollama_model=OLLAMA_MODEL,
        ollama_base_url=OLLAMA_BASE_URL,  # the frontend needs the URL too
    )
@app.post("/api/settings")
async def update_settings(request: Request):
    """Persist provider/model choices to the process globals and the .env file."""
    # OLLAMA_BASE_URL must be declared global too, or the assignment below
    # would only create a local.
    global AI_PROVIDER, GOOGLE_MODEL, OPENAI_MODEL, OLLAMA_MODEL, OLLAMA_BASE_URL
    payload = await request.json()
    provider = payload.get("provider")
    model = payload.get("model")
    ollama_url = payload.get("ollama_base_url")  # URL sent by the frontend
    if provider:
        AI_PROVIDER = provider
        set_key(ENV_FILE, "AI_PROVIDER", provider)
        if model:
            # Store the model under the key matching the chosen provider.
            if provider == "google":
                GOOGLE_MODEL = model
                set_key(ENV_FILE, "GOOGLE_MODEL", model)
            elif provider == "openai":
                OPENAI_MODEL = model
                set_key(ENV_FILE, "OPENAI_MODEL", model)
            elif provider == "ollama":
                OLLAMA_MODEL = model
                set_key(ENV_FILE, "OLLAMA_MODEL", model)
    if ollama_url:
        # An Ollama base URL was supplied — store it as well.
        OLLAMA_BASE_URL = ollama_url
        set_key(ENV_FILE, "OLLAMA_BASE_URL", ollama_url)
    return {"status": "success"}
@app.get("/api/models")
async def get_models(provider: str, url: str = None):
    """List the models available for *provider*.

    For Ollama the given base *url* is queried server-side (the backend has
    no CORS restrictions). Returns {"models": []} on any failure so the
    frontend can fall back to its defaults.
    """
    try:
        models = []
        if provider == "ollama" and url:
            # Query Ollama's native tag endpoint (strip the /v1 API suffix).
            clean_url = url.replace("/v1", "").rstrip("/")
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{clean_url}/api/tags", timeout=5.0)
                data = response.json()
            models = [m["name"] for m in data.get("models", [])]
        elif provider == "openai":
            # Placeholder keys (containing "hier") are treated as missing.
            if not OPENAI_API_KEY or "hier" in OPENAI_API_KEY:
                return {"models": []}
            # (Removed a redundant function-local `import openai`; the module
            # is already imported at the top of the file.)
            client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY)
            response = await client.models.list()
            # Keep only GPT/o1 models to keep the list tidy.
            models = [m.id for m in response.data if "gpt" in m.id or "o1" in m.id]
            models.sort()
        elif provider == "google":
            if not GOOGLE_API_KEY:
                return {"models": ["API-Key fehlt"]}
            client = genai.Client(api_key=GOOGLE_API_KEY)
            # In the new SDK (google-genai) the capability field is 'supported_actions'.
            for m in client.models.list():
                if 'generateContent' in m.supported_actions:
                    # Take the name and drop the 'models/' prefix.
                    models.append(m.name.replace("models/", ""))
            models.sort()
        return {"models": models}
    except Exception as e:
        print(f"Fehler beim Abrufen der Modelle für {provider}: {str(e)}")
        return {"models": []}  # empty list -> frontend uses its fallback
if __name__ == "__main__":
    # Dev entry point: serve the app on all interfaces, port 8000.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)