# Source listing header (scrape residue): 321 lines, 12 KiB, Python
import os
|
|
import pty
|
|
import fcntl
|
|
import subprocess
|
|
import sqlite3
|
|
import asyncio
|
|
import openai
|
|
import re
|
|
from google import genai
|
|
from google.genai import types
|
|
import json
|
|
from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect
|
|
from fastapi.responses import RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.staticfiles import StaticFiles
|
|
|
|
# --- Application setup ---
app = FastAPI()

# NOTE(review): the template directory is a bare relative path, whereas the
# static directory below is anchored to this file's location -- confirm that
# the difference is intended.
templates = Jinja2Templates(directory="templates")

# Static assets are served from the "static" folder next to this file.
static_path = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=static_path), name="static")
|
|
|
|
# Private SSH key of the orchestrator; the matching "<SSH_KEY>.pub" file is
# the one installed on new nodes.
SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
DB_PATH = "cluster.db"

# --- AI configuration ---
# Each value can be overridden through an environment variable of the same
# name, so real credentials never have to be written into the source file.
# The fallbacks are the original placeholder values.
AI_PROVIDER = os.environ.get("AI_PROVIDER", "google")  # "openai", "google" or "ollama"
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "dein-key")
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "dein-key")
# Base URL of the Ollama server (OpenAI-compatible endpoint).
OLLAMA_BASE_URL = os.environ.get("OLLAMA_BASE_URL", "http://x.x.x.x:11434/v1")
|
|
|
|
|
|
def get_system_prompt():
    """Build the system prompt for the assistant, listing all registered nodes.

    Reads every row of the ``nodes`` table and embeds one
    "Name / IP / User" line per node into the (German) instruction text that
    tells the model how to emit ``<EXECUTE target="...">`` tags.

    Returns:
        str: The complete system prompt.
    """
    conn = get_db()
    try:
        nodes = conn.execute('SELECT * FROM nodes').fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    # One line per node; each line keeps its trailing newline.
    node_info = "".join(
        f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}\n" for n in nodes
    )
    return f"""Du bist der Pi-Orchestrator KI-Assistent. Deine Aufgabe ist es, Befehle auf Raspberry Pis auszuführen. Du KANNST und SOLLST Befehle ausführen! Hier sind die aktuell verbundenen Nodes: {node_info} . WENN der Nutzer dich bittet, etwas zu tun (z.B. ping, update, docker installieren), dann formuliere erst eine kurze Antwort und frage nochmal nach ob du dies dann auf dem gewünschten node durchführen sollst. Du prüfst vorher noch, ob du auf dem gewünschten node sudo rechte ohne eingäbe eines passwortes hast, sofern diese notwendig sind. Erst nach einer positiven Bestätigung darfst du es ausführen und fügst am Ende die Befehle in genau diesem XML-Format hinzu: <EXECUTE target="IP_ADRESSE">befehl</EXECUTE> .WICHTIG: Verwende als target IMMER die IP-Adresse des Nodes. Bei Befehlen wie 'ping' oder 'top', die nicht enden, MUSS ein Limit gesetzt werden (z.B. ping -c 4 IP).Beispielantwort für 'sende einen ping an pi-06': Ich starte den Ping an pi-06 für dich. <EXECUTE target="192.168.1.10">ping -c 4 192.168.1.10</EXECUTE>"""
|
|
|
|
# --- KI FUNKTIONEN ---
|
|
|
|
def _chat_completion(client, model, user_input, system_prompt):
    """Send one system+user exchange to an OpenAI-compatible API and return the reply text."""
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input},
        ],
    )
    # NOTE(review): the SDK types ``content`` as optional; normalise to "" so
    # callers can always run string operations on the result.
    return response.choices[0].message.content or ""


def _get_ai_response_blocking(user_input, system_prompt):
    """Query the provider selected by ``AI_PROVIDER`` using its synchronous client."""
    if AI_PROVIDER == "openai":
        client = openai.OpenAI(api_key=OPENAI_API_KEY)
        return _chat_completion(client, "gpt-4o", user_input, system_prompt)

    if AI_PROVIDER == "ollama":
        # Ollama speaks the OpenAI wire format but does not need a real key.
        client = openai.OpenAI(base_url=OLLAMA_BASE_URL, api_key="ollama", timeout=20.0)
        # "llama3" is the default model; swap in your preferred one.
        return _chat_completion(client, "llama3", user_input, system_prompt)

    if AI_PROVIDER == "google":
        client = genai.Client(api_key=GOOGLE_API_KEY)
        response = client.models.generate_content(
            model='gemini-2.5-flash',
            contents=user_input,
            config=types.GenerateContentConfig(
                system_instruction=system_prompt
            ),
        )
        # NOTE(review): ``text`` may be empty/None when the reply carries no
        # text part -- normalised to "" for the same reason as above.
        return response.text or ""

    return "Fehler: Kein KI-Provider konfiguriert."


async def get_ai_response(user_input, system_prompt):
    """Ask the configured AI provider for a reply without blocking the event loop.

    Args:
        user_input: The user's chat message.
        system_prompt: Instruction text sent as the system message.

    Returns:
        str: The model's reply, or a fixed error text when ``AI_PROVIDER``
        names no known provider.

    The provider SDK calls used here are synchronous, so they run in a worker
    thread; otherwise every other WebSocket and HTTP request would stall for
    the duration of the model call.
    """
    return await asyncio.to_thread(_get_ai_response_blocking, user_input, system_prompt)
|
|
|
|
|
|
# --- DATENBANK INITIALISIERUNG ---
|
|
def init_db(db_path=None):
    """Create the ``nodes`` table if it does not exist yet.

    Args:
        db_path: SQLite database file to initialise. When omitted, the
            module-level ``DB_PATH`` is used.

    Safe to call repeatedly: the statement uses ``IF NOT EXISTS``.
    """
    conn = sqlite3.connect(DB_PATH if db_path is None else db_path)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS nodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                ip TEXT UNIQUE,
                user TEXT,
                status TEXT
            )
        ''')
        conn.commit()
    finally:
        # Close the handle even when schema creation fails.
        conn.close()
|
|
|
|
# Runs at import time so the schema exists before any route is served.
init_db()
|
|
|
|
def get_db():
    """Open a new connection to the cluster database.

    Rows come back as ``sqlite3.Row`` objects, so columns can be read by
    name. The caller is responsible for closing the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
# --- WebSocket Manager für Logs & Chat ---
|
|
class ConnectionManager:
    """Tracks connected WebSocket clients and fans text messages out to them."""

    def __init__(self):
        # Sockets that currently receive broadcasts.
        self.active_connections: list["WebSocket"] = []

    async def connect(self, websocket: "WebSocket"):
        """Accept the handshake and register the socket for broadcasts."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: "WebSocket"):
        """Unregister a socket; sockets that are not registered are ignored."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Send ``message`` to every registered client.

        A client whose send fails is dropped from the registry so later
        broadcasts do not keep retrying a dead socket.
        """
        # Iterate over a snapshot: the list can change while we are awaiting.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # ``Exception`` (not a bare except) lets task cancellation
                # propagate while still treating any send error as a dead peer.
                self.disconnect(connection)
|
|
|
|
# Module-level instance; the log WebSocket route registers clients here and
# the SSH/remote-command coroutines publish their output through it.
manager = ConnectionManager()
|
|
|
|
# --- SSH Handshake (Nur Key kopieren) ---
|
|
async def bootstrap_ssh_only(ip, user, password):
    """Install the orchestrator's public key on a node via ``ssh-copy-id``.

    The tool's output is streamed to all log clients. Afterwards the node's
    row in the ``nodes`` table is set to "Bereit (Kein Docker)" on success or
    "Offline/Fehler" when the key could not be installed.

    Args:
        ip: Address of the node (also the lookup key in the database).
        user: Login name on the node.
        password: Password for the one-time, password-based login.
    """
    await manager.broadcast(f"🔑 Initialisiere SSH-Handshake für {ip}...")

    exit_code = None
    try:
        # Argument list instead of a shell string: ip, user and password come
        # from a web form and must never be interpreted by a shell. The
        # password travels through the SSHPASS environment variable
        # (``sshpass -e``) so it does not show up in the process list.
        process = await asyncio.create_subprocess_exec(
            "sshpass", "-e",
            "ssh-copy-id", "-o", "StrictHostKeyChecking=no",
            "-i", f"{SSH_KEY}.pub", f"{user}@{ip}",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            env={**os.environ, "SSHPASS": password},
        )
    except OSError as exc:
        # e.g. sshpass is not installed on the orchestrator host.
        await manager.broadcast(f"SSH: {exc}")
    else:
        # Async iteration keeps the event loop responsive while the tool runs.
        async for raw_line in process.stdout:
            await manager.broadcast(f"SSH: {raw_line.decode(errors='replace').strip()}")
        exit_code = await process.wait()

    succeeded = exit_code == 0
    status = "Bereit (Kein Docker)" if succeeded else "Offline/Fehler"

    conn = get_db()
    try:
        # The status is bound as a parameter; a double-quoted token inside
        # the SQL text would be parsed as an identifier first.
        conn.execute('UPDATE nodes SET status = ? WHERE ip = ?', (status, ip))
        conn.commit()
    finally:
        conn.close()

    if succeeded:
        await manager.broadcast(f"✅ Node {ip} ist verbunden. KI kann nun Befehle senden.")
    else:
        await manager.broadcast(f"❌ SSH-Handshake für {ip} fehlgeschlagen (Exit-Code {exit_code}).")
|
|
|
|
# --- Routen ---
|
|
|
|
@app.get("/")
async def index(request: Request):
    """Render the dashboard page with every node stored in the database."""
    db = get_db()
    all_nodes = db.execute('SELECT * FROM nodes').fetchall()
    db.close()

    context = {"request": request, "nodes": all_nodes}
    return templates.TemplateResponse("index.html", context)
|
|
|
|
@app.post("/add_node")
async def add_node(
    background_tasks: BackgroundTasks,
    name: str = Form(...),
    ip: str = Form(...),
    user: str = Form(...),
    password: str = Form(...),
):
    """Register a new node and schedule the SSH key exchange for it.

    The node is stored with the provisional status "Kopplung...". When the
    insert violates a constraint (``ip`` is declared UNIQUE), nothing is
    scheduled. In every case the client is redirected to the dashboard.
    """
    db = get_db()
    try:
        db.execute(
            'INSERT INTO nodes (name, ip, user, status) VALUES (?, ?, ?, ?)',
            (name, ip, user, "Kopplung..."),
        )
        db.commit()
    except sqlite3.IntegrityError:
        # Duplicate entry: deliberately ignored, the redirect below still happens.
        pass
    else:
        # Only a freshly inserted node gets the key exchange.
        background_tasks.add_task(bootstrap_ssh_only, ip, user, password)
    finally:
        db.close()
    return RedirectResponse(url="/", status_code=303)
|
|
|
|
@app.post("/remove_node/{node_id}")
async def remove_node(node_id: int):
    """Delete the node with the given id, then redirect to the dashboard."""
    delete_sql = 'DELETE FROM nodes WHERE id = ?'

    db = get_db()
    db.execute(delete_sql, (node_id,))
    db.commit()
    db.close()

    return RedirectResponse(url="/", status_code=303)
|
|
|
|
# --- WebSockets ---
|
|
|
|
@app.websocket("/ws/install_logs")
async def log_websocket(websocket: WebSocket):
    """Register a client for log broadcasts and keep the socket open.

    Messages from the client are read and discarded; receiving only serves
    to notice when the peer goes away.
    """
    await manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal end of the session.
        pass
    finally:
        # Unregister on every exit path, not only on a clean disconnect, so
        # broadcasts never keep targeting a socket whose handler has ended.
        manager.disconnect(websocket)
|
|
|
|
@app.websocket("/ws/terminal/{ip}")
async def terminal_websocket(websocket: WebSocket, ip: str):
    """Bridge a browser terminal to an interactive SSH session on a node.

    The node is looked up by IP; an unknown IP gets an error text and the
    socket is closed. Otherwise ``ssh -t`` is started on a pseudo-terminal
    and bytes are relayed in both directions until either the client
    disconnects or the SSH session ends.

    Args:
        websocket: The client connection.
        ip: IP address of the target node (path parameter).
    """
    import codecs  # local import: only this handler needs the incremental decoder

    await websocket.accept()
    conn = get_db()
    try:
        node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone()
    finally:
        conn.close()

    if not node:
        await websocket.send_text("\r\nFehler: Node nicht gefunden.\r\n")
        await websocket.close()
        return

    # Pseudo-terminal for the interactive SSH session.
    master_fd, slave_fd = pty.openpty()
    proc = None
    try:
        try:
            proc = await asyncio.create_subprocess_exec(
                "ssh", "-o", "StrictHostKeyChecking=no", "-t", f"{node['user']}@{ip}",
                stdin=slave_fd, stdout=slave_fd, stderr=slave_fd,
            )
        finally:
            # The child holds its own copies of the slave end. Closing ours
            # (also when spawning failed) avoids a leak and lets reads on the
            # master end report the end of the session once ssh exits.
            os.close(slave_fd)

        # Non-blocking master so a read never stalls the event loop.
        flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
        fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        async def pty_to_ws():
            """Forward terminal output to the browser until the pty closes."""
            # Incremental decoding keeps multi-byte UTF-8 characters intact
            # when they are split across two reads.
            decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
            while True:
                try:
                    chunk = os.read(master_fd, 1024)
                except BlockingIOError:
                    # Nothing to read right now: yield briefly, then poll again.
                    await asyncio.sleep(0.01)
                    continue
                except OSError:
                    return  # pty closed, i.e. the SSH session is over
                if not chunk:
                    return  # EOF
                text = decoder.decode(chunk)
                if text:
                    await websocket.send_text(text)

        async def ws_to_pty():
            """Forward keystrokes from the browser to the terminal."""
            try:
                while True:
                    data = await websocket.receive_text()
                    os.write(master_fd, data.encode())
            except (WebSocketDisconnect, OSError):
                return

        relay_tasks = [
            asyncio.create_task(pty_to_ws()),
            asyncio.create_task(ws_to_pty()),
        ]
        try:
            # Stop as soon as EITHER direction ends; waiting for both would
            # keep the handler (and the ssh process) alive indefinitely.
            await asyncio.wait(relay_tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            for task in relay_tasks:
                task.cancel()
            # Collect results so cancellations and send errors are consumed here.
            await asyncio.gather(*relay_tasks, return_exceptions=True)
    finally:
        if proc is not None and proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass  # already gone
            try:
                # Reap the child so it does not linger as a zombie.
                await asyncio.wait_for(proc.wait(), timeout=5)
            except asyncio.TimeoutError:
                proc.kill()
                await proc.wait()
        os.close(master_fd)
|
|
|
|
# --- WEBSOCKET CHAT UPDATE ---
|
|
|
|
# Strong references to running command tasks: the event loop itself only keeps
# weak references, so an unreferenced task may be garbage-collected mid-run.
_chat_command_tasks = set()

# <EXECUTE target="IP">command</EXECUTE> -- compiled once instead of per message.
_EXECUTE_CAPTURE_RE = re.compile(
    r'<EXECUTE target="(.*?)">(.*?)</EXECUTE>', re.IGNORECASE | re.DOTALL
)
_EXECUTE_STRIP_RE = re.compile(r'<EXECUTE.*?>.*?</EXECUTE>', re.IGNORECASE | re.DOTALL)


@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
    """Chat with the AI assistant and run the commands it emits.

    For every incoming message the current system prompt is built, the AI is
    queried, the visible part of the answer is sent back, and each
    ``<EXECUTE target="...">cmd</EXECUTE>`` tag is started as a background
    SSH task on the matching node (looked up by IP or name).

    NOTE(review): only the current message and the system prompt are passed
    to the model -- no earlier turns. The "ask for confirmation first" flow
    described in the prompt therefore has no access to what was confirmed;
    confirm whether history should be sent as well.
    """
    await websocket.accept()
    try:
        while True:
            user_msg = await websocket.receive_text()

            try:
                # 1. Load the dynamic prompt, 2. ask the AI.
                sys_prompt = get_system_prompt()
                ai_response = await get_ai_response(user_msg, sys_prompt)
            except Exception as e:
                # A failing provider call must not end the whole chat session:
                # report it to the user and wait for the next message.
                print(f"Chat Fehler: {e}")
                await websocket.send_text(f"⚠️ KI-Anfrage fehlgeschlagen: {e}")
                continue
            ai_response = ai_response or ""

            # 3. Extract the command tags.
            commands_to_run = _EXECUTE_CAPTURE_RE.findall(ai_response)

            # 4. Hide the tags from the UI so the chat stays readable.
            clean_chat_msg = _EXECUTE_STRIP_RE.sub('', ai_response).strip()
            if clean_chat_msg:
                await websocket.send_text(clean_chat_msg)

            # 5. Dispatch the commands to the nodes.
            for target_ip, cmd in commands_to_run:
                target_ip = target_ip.strip()
                cmd = cmd.strip()

                conn = get_db()
                try:
                    node = conn.execute(
                        'SELECT * FROM nodes WHERE ip = ? OR name = ?',
                        (target_ip, target_ip),
                    ).fetchone()
                finally:
                    conn.close()

                if node:
                    # Output of the task goes to the system log clients.
                    task = asyncio.create_task(
                        run_remote_task(node['ip'], node['user'], cmd, "KI-Kommando")
                    )
                    _chat_command_tasks.add(task)
                    task.add_done_callback(_chat_command_tasks.discard)
                else:
                    await websocket.send_text(f"⚠️ Konnte Node {target_ip} in DB nicht finden.")
    except WebSocketDisconnect:
        # Client closed the chat; nothing to report.
        pass
    except Exception as e:
        print(f"Chat Fehler: {e}")
|
|
|
|
def _set_node_status(ip, status):
    """Store a new status text for the node with the given IP."""
    conn = get_db()
    try:
        conn.execute('UPDATE nodes SET status = ? WHERE ip = ?', (status, ip))
        conn.commit()
    finally:
        conn.close()


async def run_remote_task(ip, user, cmd, task_name):
    """Run ``cmd`` on a node over SSH and stream its output to the log clients.

    Args:
        ip: Address of the target node.
        user: Login name on the node.
        cmd: Command line handed to the remote shell.
        task_name: Label supplied by the caller; currently not used.

    A non-zero exit status is reported as a failure. After a successful
    command whose text contains "docker", the node is marked "Docker Aktiv".
    That is a plain substring heuristic, not a check that Docker is running.
    """
    await manager.broadcast(f"🚀 KI-Task gestartet: {cmd} auf {ip}")

    try:
        # Argument list instead of a local shell string: ``cmd`` reaches the
        # remote shell unchanged even when it contains quotes, and the local
        # shell never interprets it. BatchMode plus an empty stdin make ssh
        # fail instead of waiting forever for a password prompt.
        process = await asyncio.create_subprocess_exec(
            "ssh", "-o", "StrictHostKeyChecking=no", "-o", "BatchMode=yes",
            f"{user}@{ip}", cmd,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
    except OSError as exc:
        await manager.broadcast(f"❌ Befehl auf {ip} konnte nicht gestartet werden: {exc}")
        return

    # Relay the output line by line while it is being produced.
    async for raw_line in process.stdout:
        decoded_line = raw_line.decode('utf-8', errors='ignore').strip()
        if decoded_line:
            await manager.broadcast(f"🛠️ {decoded_line}")

    exit_code = await process.wait()
    if exit_code != 0:
        await manager.broadcast(f"❌ Befehl auf {ip} fehlgeschlagen (Exit-Code {exit_code}).")
        return

    # The status is written by the helper above, so this function does not
    # depend on an ``update_node_status`` defined elsewhere.
    if "docker" in cmd.lower():
        _set_node_status(ip, "Docker Aktiv")
        await manager.broadcast(f"✨ {ip} ist nun 'Docker Aktiv'")

    await manager.broadcast(f"✅ Befehl auf {ip} abgeschlossen.")
|
|
|
|
# --- Neuer Endpunkt: Manueller Refresh-Check ---
|
|
@app.get("/refresh_status/{node_id}")
async def refresh_status(node_id: int):
    """Probe a node over SSH and store whether Docker is available on it.

    Returns:
        dict: ``{"status": ...}`` holding the text printed by the remote
        check ("Docker Aktiv" or "Bereit (Kein Docker)"), "Offline/Fehler"
        when the node cannot be reached, or "Unbekannt" for an unknown id.
    """
    conn = get_db()
    try:
        node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone()
        if not node:
            return {"status": "Unbekannt"}

        # The remote shell prints exactly one of the two status texts.
        check_cmd = "command -v docker >/dev/null 2>&1 && echo 'Docker Aktiv' || echo 'Bereit (Kein Docker)'"

        new_status = "Offline/Fehler"  # stays in place unless the probe succeeds
        try:
            # Async subprocess with an argument list: the event loop is not
            # blocked while ssh runs, and user/ip never pass through a shell.
            proc = await asyncio.create_subprocess_exec(
                "ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=2",
                "-o", "BatchMode=yes",
                f"{node['user']}@{node['ip']}", check_cmd,
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.PIPE,
            )
            try:
                # ConnectTimeout only bounds the connect phase; this bounds
                # the whole probe.
                stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
            except asyncio.TimeoutError:
                proc.kill()
                await proc.wait()
            else:
                if proc.returncode == 0:
                    new_status = stdout.decode(errors="replace").strip()
        except OSError:
            # ssh could not be started at all; report the node as unreachable.
            pass

        conn.execute('UPDATE nodes SET status = ? WHERE id = ?', (new_status, node_id))
        conn.commit()
        return {"status": new_status}  # only the status is sent back
    finally:
        conn.close()
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # NOTE(review): host 0.0.0.0 listens on every interface, and the routes in
    # this file perform no authentication -- confirm the network is trusted.
    uvicorn.run(app, host="0.0.0.0", port=8000)