From 3d5504b5e8c0fc7402918aa7656749d1f8cc0a7a Mon Sep 17 00:00:00 2001 From: "info@pi-farm.de" Date: Fri, 6 Mar 2026 15:52:07 +0000 Subject: [PATCH] main.py aktualisiert --- main.py | 744 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 413 insertions(+), 331 deletions(-) diff --git a/main.py b/main.py index 8d6ffa8..26b5441 100644 --- a/main.py +++ b/main.py @@ -1,353 +1,435 @@ -import os -import pty -import fcntl -import subprocess -import sqlite3 -import asyncio -import openai -import re -import httpx -from google import genai -from google.genai import types -import json -from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect -from fastapi.responses import RedirectResponse -from fastapi.templating import Jinja2Templates -from fastapi.staticfiles import StaticFiles -from dotenv import load_dotenv, set_key - -# Lade Umgebungsvariablen -load_dotenv() - -app = FastAPI() -static_path = os.path.join(os.path.dirname(__file__), "static") -app.mount("/static", StaticFiles(directory=static_path), name="static") -templates = Jinja2Templates(directory="templates") - -SSH_KEY = os.path.expanduser("~/.ssh/id_rsa") -DB_PATH = "cluster.db" -chat_history = [] -PROMPT_FILE = "system_prompt.txt" -ENV_FILE = os.path.join(os.path.dirname(__file__), ".env") - -# KI KONFIGURATION -AI_PROVIDER = os.getenv("AI_PROVIDER", "google").lower() -OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") -GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") -OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434/v1") - -GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.0-flash") -OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o") -OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3") - -# --- DATENBANK INITIALISIERUNG (ERWEITERT) --- -def init_db(): - conn = sqlite3.connect(DB_PATH) - # Spalten erweitert um sudo_password, os, arch, docker_installed - conn.execute(''' - CREATE TABLE IF NOT EXISTS nodes ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT, - ip TEXT UNIQUE, - user TEXT, - sudo_password TEXT, - os TEXT DEFAULT 'Unbekannt', - arch TEXT DEFAULT 'Unbekannt', - docker_installed INTEGER DEFAULT 0, - status TEXT - ) - ''') - conn.commit() - conn.close() - -init_db() - -def get_db(): - conn = sqlite3.connect(DB_PATH) - conn.row_factory = sqlite3.Row - return conn - -def get_system_prompt(): - conn = get_db() - nodes = conn.execute('SELECT * FROM nodes').fetchall() - conn.close() + + + + + Pi-Orchestrator AI Dashboard + - node_info = "" - for n in nodes: - docker_str = "Ja" if n['docker_installed'] else "Nein" - node_info += f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}, OS: {n['os']}, Arch: {n['arch']}, Docker: {docker_str}\n" + + + + + + + - if os.path.exists(PROMPT_FILE): - with open(PROMPT_FILE, "r", encoding="utf-8") as f: - template = f.read() - else: - template = "Du bist ein Cluster-Orchestrator. Nodes:\n{node_info}\nBefehle via cmd" - print(f"⚠️ Warnung: {PROMPT_FILE} fehlt.") - - return template.replace("{node_info}", node_info) - -# --- KI LOGIK (UNVERÄNDERT) --- -async def get_ai_response(user_input, system_prompt): - global chat_history - chat_history.append({"role": "user", "content": user_input}) - chat_history = chat_history[-30:] - ai_msg = "" - try: - if AI_PROVIDER in ["openai", "ollama"]: - url = OLLAMA_BASE_URL if AI_PROVIDER == "ollama" else None - if url and not url.endswith('/v1'): url = url.rstrip('/') + '/v1' - key = "ollama" if AI_PROVIDER == "ollama" else OPENAI_API_KEY - model_to_use = OLLAMA_MODEL if AI_PROVIDER == "ollama" else OPENAI_MODEL - client = openai.OpenAI(base_url=url, api_key=key) - response = client.chat.completions.create(model=model_to_use, messages=[{"role": "system", "content": system_prompt}] + chat_history) - ai_msg = response.choices[0].message.content - elif AI_PROVIDER == "google": - client = genai.Client(api_key=GOOGLE_API_KEY) - google_history = [types.Content(role="user" if m["role"] == "user" else "model", parts=[types.Part.from_text(text=msg["content"])]) for msg in chat_history[:-1]] - chat = client.chats.create(model=GOOGLE_MODEL, config=types.GenerateContentConfig(system_instruction=system_prompt), history=google_history) - response = chat.send_message(user_input) - ai_msg = response.text - except Exception as e: - ai_msg = f"KI Fehler: {e}" - chat_history.append({"role": "assistant", "content": ai_msg}) - return ai_msg - -# --- WebSocket Manager --- -class ConnectionManager: - def __init__(self): self.active_connections = [] - async def connect(self, ws: WebSocket): await ws.accept(); self.active_connections.append(ws) - def disconnect(self, ws: WebSocket): self.active_connections.remove(ws) - async def broadcast(self, msg: str): - for c in self.active_connections: - try: await c.send_text(msg) - except: pass -manager = ConnectionManager() - -# --- ERWEITERTES NODE BOOTSTRAPPING (Inventur) --- -async def bootstrap_node(ip, user, password): - await manager.broadcast(f"🔑 SSH-Handshake für {ip}...") - # 1. Key kopieren - ssh_copy_cmd = f"sshpass -p '{password}' ssh-copy-id -o StrictHostKeyChecking=no -i {SSH_KEY}.pub {user}@{ip}" - proc = subprocess.run(ssh_copy_cmd, shell=True, capture_output=True, text=True) - - if proc.returncode != 0: - await manager.broadcast(f"❌ Fehler beim Key-Copy für {ip}: {proc.stderr}") - return - - # 2. System-Infos abrufen (Inventur) - await manager.broadcast(f"🔍 Inventur auf {ip} wird durchgeführt...") - - # Befehlskette: Arch, OS-Name, Docker-Check - inspect_cmd = "uname -m && (lsb_release -ds || cat /etc/os-release | grep PRETTY_NAME | cut -d'=' -f2) && (command -v docker >/dev/null 2>&1 && echo '1' || echo '0')" - ssh_cmd = f"ssh -o StrictHostKeyChecking=no {user}@{ip} \"{inspect_cmd}\"" - - try: - output = subprocess.check_output(ssh_cmd, shell=True).decode().strip().split('\n') - arch = output[0] if len(output) > 0 else "Unbekannt" - # Mapping für Architektur - if "aarch64" in arch.lower(): arch = "arm64" - elif "x86_64" in arch.lower(): arch = "x86-64" + + + - try: - output = subprocess.check_output(ssh_cmd, shell=True).decode().strip().split('\n') - arch = output[0]; os_name = output[1].replace('"', ''); docker_val = int(output[2]) - if "aarch64" in arch.lower(): arch = "arm64" - elif "x86_64" in arch.lower(): arch = "x86-64" - new_status = "Docker Aktiv" if docker_val else "Bereit (Kein Docker)" +
+
🤖 KI-Orchestrator
- conn.execute('UPDATE nodes SET status=?, os=?, arch=?, docker_installed=? WHERE id=?', - (new_status, os_name, arch, docker_val, node_id)) - conn.commit() - result = {"status": new_status, "os": os_name, "arch": arch, "docker": docker_val} - except: - new_status = "Offline" - conn.execute('UPDATE nodes SET status=? WHERE id=?', (new_status, node_id)) - conn.commit() - result = {"status": new_status, "os": node['os'], "arch": node['arch'], "docker": node['docker_installed']} - - conn.close() - return result +
+ -# --- WebSockets Terminal / Chat / Logs (Integration wie gehabt) --- -@app.websocket("/ws/install_logs") -async def log_websocket(websocket: WebSocket): - await manager.connect(websocket) - try: - while True: await websocket.receive_text() - except WebSocketDisconnect: manager.disconnect(websocket) +
-@app.websocket("/ws/terminal/{ip}") -async def terminal_websocket(websocket: WebSocket, ip: str): - await websocket.accept() - conn = get_db(); node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone(); conn.close() - if not node: await websocket.close(); return - master_fd, slave_fd = pty.openpty() - proc = await asyncio.create_subprocess_exec("ssh", "-o", "StrictHostKeyChecking=no", "-t", f"{node['user']}@{ip}", stdin=slave_fd, stdout=slave_fd, stderr=slave_fd) - async def pty_to_ws(): - fl = fcntl.fcntl(master_fd, fcntl.F_GETFL); fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - try: - while True: - await asyncio.sleep(0.01) - try: - data = os.read(master_fd, 1024).decode(errors='ignore') - if data: await websocket.send_text(data) - except BlockingIOError: continue - except: pass - async def ws_to_pty(): - try: - while True: - data = await websocket.receive_text(); os.write(master_fd, data.encode()) - except: pass - try: await asyncio.gather(pty_to_ws(), ws_to_pty()) - finally: - if proc.returncode is None: proc.terminate() - os.close(master_fd); os.close(slave_fd) + + + + + + +
+ + +
+ + + +
+
-@app.websocket("/ws/chat") -async def chat_endpoint(websocket: WebSocket): - await websocket.accept() - try: - while True: - user_msg = await websocket.receive_text() - ai_response = await get_ai_response(user_msg, get_system_prompt()) - commands = re.findall(r'(.*?)', ai_response, re.I | re.S) - clean_msg = re.sub(r'.*?', '', ai_response, flags=re.I | re.S).strip() - if clean_msg: await websocket.send_text(clean_msg) - if commands: - tasks = [] - for target, cmd in commands: - conn = get_db(); n = conn.execute('SELECT * FROM nodes WHERE ip=? OR name=?', (target.strip(), target.strip())).fetchone(); conn.close() - if n: tasks.append(run_remote_task(n['ip'], n['user'], cmd.strip())) - if tasks: - await websocket.send_text("ℹ️ *Führe Befehle aus...*") - await asyncio.gather(*tasks) - summary = await get_ai_response("Zusammenfassung der Ergebnisse?", get_system_prompt()) - await websocket.send_text(f"--- Info ---\n{summary}") - except: pass +
+
+
+ {% for node in nodes %} +
+
+
{{ node.name }}
+
+ +
+
+ +
+ {{ node.ip }} + +
-async def run_remote_task(ip, user, cmd): - await manager.broadcast(f"🚀 Task: {cmd} auf {ip}") - proc = await asyncio.create_subprocess_shell(f"ssh -o StrictHostKeyChecking=no {user}@{ip} '{cmd}'", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) - full_output = "" - while True: - line = await proc.stdout.readline() - if not line: break - out = line.decode('utf-8', errors='ignore').strip() - if out: await manager.broadcast(f"🛠️ {out}"); full_output += out + "\n" - await proc.wait() - chat_history.append({"role": "user", "content": f"[SYSTEM] Befehl '{cmd}' auf {ip} fertig:\n{full_output or 'Kein Output'}"}) +
+ {{ node.os }} + {{ node.arch }} +
-# --- Settings API --- -@app.get("/api/settings") -async def get_settings(): - return {"provider": AI_PROVIDER, "google_model": GOOGLE_MODEL, "openai_model": OPENAI_MODEL, "ollama_model": OLLAMA_MODEL, "ollama_base_url": OLLAMA_BASE_URL} +
+ + {{ node.status }} +
+ +
+ {% endfor %} +
+ +
-@app.post("/api/settings") -async def update_settings(request: Request): - global AI_PROVIDER, GOOGLE_MODEL, OPENAI_MODEL, OLLAMA_MODEL, OLLAMA_BASE_URL - data = await request.json() - provider = data.get("provider") - if provider: - AI_PROVIDER = provider; set_key(ENV_FILE, "AI_PROVIDER", provider) - if data.get("model"): - m = data.get("model") - if provider == "google": GOOGLE_MODEL = m; set_key(ENV_FILE, "GOOGLE_MODEL", m) - if provider == "openai": OPENAI_MODEL = m; set_key(ENV_FILE, "OPENAI_MODEL", m) - if provider == "ollama": OLLAMA_MODEL = m; set_key(ENV_FILE, "OLLAMA_MODEL", m) - if data.get("ollama_base_url"): - u = data.get("ollama_base_url"); OLLAMA_BASE_URL = u; set_key(ENV_FILE, "OLLAMA_BASE_URL", u) - return {"status": "success"} +
+
+
+
+
💬 KI Chat
+
+
+ + +
+
+
-@app.get("/api/models") -async def get_models(provider: str): - try: - if provider == "ollama": - url = OLLAMA_BASE_URL.replace("/v1", "").rstrip("/") - async with httpx.AsyncClient() as client: - r = await client.get(f"{url}/api/tags", timeout=5.0) - return {"models": [m["name"] for m in r.json().get("models", [])]} - elif provider == "openai": - client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY) - r = await client.models.list() - return {"models": sorted([m.id for m in r.data if "gpt" in m.id or "o1" in m.id])} - elif provider == "google": - client = genai.Client(api_key=GOOGLE_API_KEY) - return {"models": sorted([m.name.replace("models/", "") for m in client.models.list() if 'generateContent' in m.supported_actions])} - except: return {"models": []} +
+
+
📜 System Logs
+
Warte auf Aufgaben...
+
+
-if __name__ == "__main__": - import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file +
+
+
🖥️ Live Terminal
+
+
+
+
+
+
+ + + + + + \ No newline at end of file