main.py aktualisiert

This commit is contained in:
2026-03-03 22:34:25 +00:00
parent 4f02d4428c
commit 75b5fdeca4

174
main.py
View File

@@ -1,144 +1,126 @@
import os
import subprocess
import pty
import json
import sqlite3
import paramiko
from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form
from fastapi.responses import HTMLResponse, RedirectResponse
import asyncio
from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect
from fastapi.responses import RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
app = FastAPI(title="Pi-Orchestrator Master")
app = FastAPI()
templates = Jinja2Templates(directory="templates")
# Pfade & Konstanten
SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
DB_PATH = "cluster.db"
# --- Datenbank Setup ---
# --- WebSocket Manager für Live-Logs ---
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
# --- Datenbank & SSH Initialisierung ---
def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS nodes
(id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT, ip TEXT UNIQUE, user TEXT, status TEXT)''')
conn.execute('CREATE TABLE IF NOT EXISTS nodes (id INTEGER PRIMARY KEY, name TEXT, ip TEXT UNIQUE, user TEXT, status TEXT)')
conn.commit()
conn.close()
init_db()
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def ensure_ssh_key():
if not os.path.exists(SSH_KEY):
subprocess.run(["ssh-keygen", "-t", "rsa", "-N", "", "-f", SSH_KEY], check=True)
# --- SSH & Command Logic ---
init_db()
ensure_ssh_key()
def run_ssh_cmd(ip, user, cmd):
"""Nutzt den SSH-Key (kein Passwort nötig nach Deployment)"""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(ip, username=user, key_filename=SSH_KEY, timeout=10)
stdin, stdout, stderr = ssh.exec_command(f"sudo -n {cmd}")
output = stdout.read().decode()
ssh.close()
return output
except Exception as e:
return f"Fehler auf {ip}: {str(e)}"
# --- Hintergrund-Task: Key Deployment & Bootstrap ---
async def deploy_and_bootstrap(ip, user, password):
await manager.broadcast(f"Iniziere Setup für {ip}...")
# --- Routes ---
# 1. SSH-Key kopieren
# -o StrictHostKeyChecking=no verhindert die "Are you sure you want to continue connecting" Abfrage
ssh_copy_cmd = f"sshpass -p '{password}' ssh-copy-id -o StrictHostKeyChecking=no -i {SSH_KEY}.pub {user}@{ip}"
process = subprocess.Popen(ssh_copy_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
for line in process.stdout:
await manager.broadcast(f"🔑 SSH: {line.strip()}")
# Datenbank-Status aktualisieren
update_node_status(ip, "Key hinterlegt")
await manager.broadcast(f"✅ SSH-Key erfolgreich auf {ip} kopiert.")
# 2. Abhängigkeiten installieren (Docker & Python)
await manager.broadcast(f"📦 Installiere Docker und Abhängigkeiten auf {ip}...")
bootstrap_cmd = f"ssh {user}@{ip} 'sudo apt-get update && sudo apt-get install -y python3-pip && curl -sSL https://get.docker.com | sh'"
process = subprocess.Popen(bootstrap_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
for line in process.stdout:
# Wir filtern hier ein wenig, um die Konsole nicht zu fluten
if "Progress" not in line:
await manager.broadcast(f"🛠️ {line.strip()[:80]}")
update_node_status(ip, "Online")
await manager.broadcast(f"🏁 Node {ip} ist nun vollständig einsatzbereit!")
def update_node_status(ip, status):
conn = sqlite3.connect(DB_PATH)
conn.execute('UPDATE nodes SET status = ? WHERE ip = ?', (status, ip))
conn.commit()
conn.close()
# --- Routen ---
@app.get("/")
async def index(request: Request):
conn = get_db()
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
nodes = conn.execute('SELECT * FROM nodes').fetchall()
conn.close()
return templates.TemplateResponse("index.html", {"request": request, "nodes": nodes})
@app.post("/add_node")
async def add_node(name: str = Form(...), ip: str = Form(...), user: str = Form(...), password: str = Form(...), background_tasks: BackgroundTasks = None):
# 1. In Datenbank speichern
conn = get_db()
async def add_node(background_tasks: BackgroundTasks, name: str = Form(...), ip: str = Form(...), user: str = Form(...), password: str = Form(...)):
conn = sqlite3.connect(DB_PATH)
try:
conn.execute('INSERT INTO nodes (name, ip, user, status) VALUES (?, ?, ?, ?)',
(name, ip, user, "Setup läuft..."))
conn.execute('INSERT INTO nodes (name, ip, user, status) VALUES (?, ?, ?, ?)', (name, ip, user, "Setup läuft..."))
conn.commit()
# Starte den SSH-Prozess im Hintergrund
background_tasks.add_task(deploy_and_bootstrap, ip, user, password)
except sqlite3.IntegrityError:
return {"error": "Node existiert bereits"}
pass
finally:
conn.close()
# 2. Key Deployment & Installation im Hintergrund starten (wird via WebSocket geloggt)
# Da wir hier noch kein WebSocket-Objekt haben, triggern wir den Prozess
# und der User sieht den Status im UI.
return RedirectResponse(url="/", status_code=303)
# --- WebSockets ---
@app.websocket("/ws/install_logs")
async def install_logs_endpoint(websocket: WebSocket):
await websocket.accept()
# Hier können wir später spezifische Setup-Prozesse streamen
await websocket.send_text("System-Log: Bereit für Installationen...")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
await websocket.receive_text() # Hält die Verbindung offen
except WebSocketDisconnect:
manager.disconnect(websocket)
@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
user_msg = await websocket.receive_text()
user_msg_lower = user_msg.lower()
conn = get_db()
nodes = conn.execute('SELECT * FROM nodes').fetchall()
conn.close()
if "installiere docker" in user_msg_lower or "installiere ollama" in user_msg_lower:
target_node = None
for node in nodes:
if node['name'].lower() in user_msg_lower or node['ip'] in user_msg_lower:
target_node = node
break
if target_node:
await websocket.send_text(f"🤖 Starte Installation auf {target_node['name']}...")
# Beispiel: Docker Installation via SSH-Key
cmd = "curl -sSL https://get.docker.com | sh"
result = run_ssh_cmd(target_node['ip'], target_node['user'], cmd)
await websocket.send_text(f"✅ Fertig auf {target_node['name']}: {result[:50]}...")
else:
await websocket.send_text("🤖 Welchen Pi meinst du? Ich kenne: " + ", ".join([n['name'] for n in nodes]))
else:
await websocket.send_text(f"🤖 Ich habe '{user_msg}' erhalten. Wie kann ich helfen?")
# --- Terminal (Konzept) ---
@app.websocket("/ws/terminal/{ip}")
async def terminal_websocket(websocket: WebSocket, ip: str):
await websocket.accept()
conn = get_db()
node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone()
conn.close()
if not node:
await websocket.send_text("Node nicht gefunden.")
await websocket.close()
return
# Startet SSH Prozess für das Terminal
cmd = ["ssh", "-o", "StrictHostKeyChecking=no", f"{node['user']}@{ip}"]
(master_fd, slave_fd) = pty.openpty()
process = subprocess.Popen(cmd, stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, text=True)
# Hinweis: Für ein voll funktionsfähiges xterm.js Terminal müssten hier
# master_fd und websocket in einer Schleife (select) verbunden werden.
await websocket.send_text(f"--- Shell auf {node['name']} geöffnet ---")
msg = await websocket.receive_text()
await websocket.send_text(f"KI: Ich habe '{msg}' empfangen. Aktuell bereite ich die Nodes vor.")
if __name__ == "__main__":
import uvicorn
ensure_ssh_key()
uvicorn.run(app, host="0.0.0.0", port=8000)