From 6a5e13571fe78b3c3a2494034b9f687094134920 Mon Sep 17 00:00:00 2001 From: "info@pi-farm.de" Date: Tue, 3 Mar 2026 23:20:27 +0000 Subject: [PATCH] main.py aktualisiert --- main.py | 111 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 79 insertions(+), 32 deletions(-) diff --git a/main.py b/main.py index d713ce0..dda5af8 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,6 @@ import os +import pty +import fcntl import subprocess import sqlite3 import asyncio @@ -12,10 +14,9 @@ templates = Jinja2Templates(directory="templates") SSH_KEY = os.path.expanduser("~/.ssh/id_rsa") DB_PATH = "cluster.db" -# --- DATENBANK INITIALISIERUNG (WICHTIG!) --- +# --- DATENBANK INITIALISIERUNG --- def init_db(): conn = sqlite3.connect(DB_PATH) - # Wir stellen sicher, dass die Tabelle existiert conn.execute(''' CREATE TABLE IF NOT EXISTS nodes ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -28,7 +29,6 @@ def init_db(): conn.commit() conn.close() -# Diese Funktion wird JETZT beim Start aufgerufen init_db() def get_db(): @@ -36,7 +36,7 @@ def get_db(): conn.row_factory = sqlite3.Row return conn -# --- WebSocket Manager --- +# --- WebSocket Manager für Logs & Chat --- class ConnectionManager: def __init__(self): self.active_connections: list[WebSocket] = [] @@ -44,22 +44,21 @@ class ConnectionManager: await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): - self.active_connections.remove(websocket) + if websocket in self.active_connections: + self.active_connections.remove(websocket) async def broadcast(self, message: str): for connection in self.active_connections: - await connection.send_text(message) + try: + await connection.send_text(message) + except: + pass manager = ConnectionManager() -# --- DB & SSH Helper --- -def get_db(): - conn = sqlite3.connect(DB_PATH) - conn.row_factory = sqlite3.Row - return conn - -# Nur SSH-Key kopieren, nichts installieren +# --- SSH Handshake (Nur Key kopieren) --- async def bootstrap_ssh_only(ip, user, password): await manager.broadcast(f"🔑 Initialisiere SSH-Handshake für {ip}...") + # Nutzt sshpass um den Key einmalig mit Passwort zu hinterlegen ssh_copy_cmd = f"sshpass -p '{password}' ssh-copy-id -o StrictHostKeyChecking=no -i {SSH_KEY}.pub {user}@{ip}" process = subprocess.Popen(ssh_copy_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) @@ -100,15 +99,64 @@ async def remove_node(node_id: int): conn.close() return RedirectResponse(url="/", status_code=303) -async def check_docker_installed(ip, user): - # Prüft via SSH, ob der Befehl 'docker' existiert - cmd = "command -v docker >/dev/null 2>&1 && echo 'yes' || echo 'no'" - ssh_cmd = f"ssh -o StrictHostKeyChecking=no {user}@{ip} '{cmd}'" - process = subprocess.Popen(ssh_cmd, shell=True, stdout=subprocess.PIPE, text=True) - output = process.stdout.read().strip() - return output == "yes" - -# --- Chat & KI Logik --- +# --- WebSockets --- + +@app.websocket("/ws/install_logs") +async def log_websocket(websocket: WebSocket): + await manager.connect(websocket) + try: + while True: + await websocket.receive_text() + except WebSocketDisconnect: + manager.disconnect(websocket) + +@app.websocket("/ws/terminal/{ip}") +async def terminal_websocket(websocket: WebSocket, ip: str): + await websocket.accept() + conn = get_db() + node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone() + conn.close() + + if not node: + await websocket.send_text("\r\nFehler: Node nicht gefunden.\r\n") + await websocket.close() + return + + # Pseudo-Terminal für interaktive SSH-Session + master_fd, slave_fd = pty.openpty() + proc = await asyncio.create_subprocess_exec( + "ssh", "-o", "StrictHostKeyChecking=no", "-t", f"{node['user']}@{ip}", + stdin=slave_fd, stdout=slave_fd, stderr=slave_fd + ) + + async def pty_to_ws(): + # Setzt den Master-FD auf non-blocking + fl = fcntl.fcntl(master_fd, fcntl.F_GETFL) + fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + try: + while True: + await asyncio.sleep(0.01) + try: + data = os.read(master_fd, 1024).decode(errors='ignore') + if data: + await websocket.send_text(data) + except BlockingIOError: + continue + except Exception: pass + + async def ws_to_pty(): + try: + while True: + data = await websocket.receive_text() + os.write(master_fd, data.encode()) + except Exception: pass + + try: + await asyncio.gather(pty_to_ws(), ws_to_pty()) + finally: + if proc.returncode is None: proc.terminate() + os.close(master_fd) + os.close(slave_fd) @app.websocket("/ws/chat") async def chat_endpoint(websocket: WebSocket): @@ -117,30 +165,29 @@ async def chat_endpoint(websocket: WebSocket): user_msg = await websocket.receive_text() user_msg_lower = user_msg.lower() - # Logik: "Installiere Docker auf [Name]" if "installiere docker" in user_msg_lower: conn = get_db() nodes = conn.execute('SELECT * FROM nodes').fetchall() conn.close() - target = next((n for n in nodes if n['name'].lower() in user_msg_lower or n['ip'] in user_msg_lower), None) if target: - await websocket.send_text(f"🤖 Verstanden. Ich installiere Docker auf {target['name']} ({target['ip']})...") - # Hier rufen wir die Installation via SSH-Key auf + await websocket.send_text(f"🤖 OK. Starte Docker-Installation auf {target['name']}...") cmd = "curl -sSL https://get.docker.com | sh && sudo usermod -aG docker " + target['user'] - # Wir schicken das Ergebnis an die Log-Konsole asyncio.create_task(run_remote_task(target['ip'], target['user'], cmd, "Docker Installation")) else: - await websocket.send_text("🤖 Ich konnte den Node nicht finden. Bitte nenne den Namen korrekt.") + await websocket.send_text("🤖 Node nicht gefunden. Welchen meinst du?") else: - await websocket.send_text(f"🤖 Empfangen: {user_msg}. Soll ich Docker oder Ollama auf einem Node installieren?") + await websocket.send_text(f"🤖 Empfangen: {user_msg}. Soll ich etwas installieren?") async def run_remote_task(ip, user, cmd, task_name): - await manager.broadcast(f"🚀 KI-Task gestartet: {task_name} auf {ip}") - # Nutzung von SSH-Key (kein Passwort nötig) + await manager.broadcast(f"🚀 KI-Task: {task_name} auf {ip}") ssh_cmd = f"ssh -o StrictHostKeyChecking=no {user}@{ip} '{cmd}'" process = subprocess.Popen(ssh_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) for line in process.stdout: await manager.broadcast(f"🛠️ {line.strip()[:80]}") - await manager.broadcast(f"✅ {task_name} auf {ip} abgeschlossen.") \ No newline at end of file + await manager.broadcast(f"✅ {task_name} fertig.") + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file