main.py aktualisiert
This commit is contained in:
122
main.py
122
main.py
@@ -12,115 +12,103 @@ templates = Jinja2Templates(directory="templates")
|
||||
SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
|
||||
DB_PATH = "cluster.db"
|
||||
|
||||
# --- WebSocket Manager für Live-Logs ---
|
||||
# --- WebSocket Manager ---
|
||||
class ConnectionManager:
|
||||
def __init__(self):
|
||||
self.active_connections: list[WebSocket] = []
|
||||
|
||||
async def connect(self, websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
self.active_connections.append(websocket)
|
||||
|
||||
def disconnect(self, websocket: WebSocket):
|
||||
self.active_connections.remove(websocket)
|
||||
|
||||
async def broadcast(self, message: str):
|
||||
for connection in self.active_connections:
|
||||
await connection.send_text(message)
|
||||
|
||||
manager = ConnectionManager()
|
||||
|
||||
# --- Datenbank & SSH Initialisierung ---
|
||||
def init_db():
|
||||
# --- DB & SSH Helper ---
|
||||
def get_db():
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.execute('CREATE TABLE IF NOT EXISTS nodes (id INTEGER PRIMARY KEY, name TEXT, ip TEXT UNIQUE, user TEXT, status TEXT)')
|
||||
conn.commit()
|
||||
conn.close()
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
def ensure_ssh_key():
|
||||
if not os.path.exists(SSH_KEY):
|
||||
subprocess.run(["ssh-keygen", "-t", "rsa", "-N", "", "-f", SSH_KEY], check=True)
|
||||
|
||||
init_db()
|
||||
ensure_ssh_key()
|
||||
|
||||
# --- Hintergrund-Task: Key Deployment & Bootstrap ---
|
||||
async def deploy_and_bootstrap(ip, user, password):
|
||||
await manager.broadcast(f"Iniziere Setup für {ip}...")
|
||||
|
||||
# 1. SSH-Key kopieren
|
||||
# -o StrictHostKeyChecking=no verhindert die "Are you sure you want to continue connecting" Abfrage
|
||||
# Nur SSH-Key kopieren, nichts installieren
|
||||
async def bootstrap_ssh_only(ip, user, password):
|
||||
await manager.broadcast(f"🔑 Initialisiere SSH-Handshake für {ip}...")
|
||||
ssh_copy_cmd = f"sshpass -p '{password}' ssh-copy-id -o StrictHostKeyChecking=no -i {SSH_KEY}.pub {user}@{ip}"
|
||||
|
||||
process = subprocess.Popen(ssh_copy_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
|
||||
for line in process.stdout:
|
||||
await manager.broadcast(f"🔑 SSH: {line.strip()}")
|
||||
await manager.broadcast(f"SSH: {line.strip()}")
|
||||
|
||||
# Datenbank-Status aktualisieren
|
||||
update_node_status(ip, "Key hinterlegt")
|
||||
await manager.broadcast(f"✅ SSH-Key erfolgreich auf {ip} kopiert.")
|
||||
|
||||
# 2. Abhängigkeiten installieren (Docker & Python)
|
||||
await manager.broadcast(f"📦 Installiere Docker und Abhängigkeiten auf {ip}...")
|
||||
bootstrap_cmd = f"ssh {user}@{ip} 'sudo apt-get update && sudo apt-get install -y python3-pip && curl -sSL https://get.docker.com | sh'"
|
||||
|
||||
process = subprocess.Popen(bootstrap_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
|
||||
for line in process.stdout:
|
||||
# Wir filtern hier ein wenig, um die Konsole nicht zu fluten
|
||||
if "Progress" not in line:
|
||||
await manager.broadcast(f"🛠️ {line.strip()[:80]}")
|
||||
|
||||
update_node_status(ip, "Online")
|
||||
await manager.broadcast(f"🏁 Node {ip} ist nun vollständig einsatzbereit!")
|
||||
|
||||
def update_node_status(ip, status):
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.execute('UPDATE nodes SET status = ? WHERE ip = ?', (status, ip))
|
||||
conn = get_db()
|
||||
conn.execute('UPDATE nodes SET status = "Bereit (Kein Docker)" WHERE ip = ?', (ip,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
await manager.broadcast(f"✅ Node {ip} ist verbunden. KI kann nun Befehle senden.")
|
||||
|
||||
# --- Routen ---
|
||||
|
||||
@app.get("/")
|
||||
async def index(request: Request):
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn = get_db()
|
||||
nodes = conn.execute('SELECT * FROM nodes').fetchall()
|
||||
conn.close()
|
||||
return templates.TemplateResponse("index.html", {"request": request, "nodes": nodes})
|
||||
|
||||
@app.post("/add_node")
|
||||
async def add_node(background_tasks: BackgroundTasks, name: str = Form(...), ip: str = Form(...), user: str = Form(...), password: str = Form(...)):
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn = get_db()
|
||||
try:
|
||||
conn.execute('INSERT INTO nodes (name, ip, user, status) VALUES (?, ?, ?, ?)', (name, ip, user, "Setup läuft..."))
|
||||
conn.execute('INSERT INTO nodes (name, ip, user, status) VALUES (?, ?, ?, ?)', (name, ip, user, "Kopplung..."))
|
||||
conn.commit()
|
||||
background_tasks.add_task(bootstrap_ssh_only, ip, user, password)
|
||||
except sqlite3.IntegrityError: pass
|
||||
finally: conn.close()
|
||||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
@app.post("/remove_node/{node_id}")
|
||||
async def remove_node(node_id: int):
|
||||
conn = get_db()
|
||||
conn.execute('DELETE FROM nodes WHERE id = ?', (node_id,))
|
||||
conn.commit()
|
||||
# Starte den SSH-Prozess im Hintergrund
|
||||
background_tasks.add_task(deploy_and_bootstrap, ip, user, password)
|
||||
except sqlite3.IntegrityError:
|
||||
pass
|
||||
finally:
|
||||
conn.close()
|
||||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
# --- WebSockets ---
|
||||
|
||||
@app.websocket("/ws/install_logs")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await manager.connect(websocket)
|
||||
try:
|
||||
while True:
|
||||
await websocket.receive_text() # Hält die Verbindung offen
|
||||
except WebSocketDisconnect:
|
||||
manager.disconnect(websocket)
|
||||
# --- Chat & KI Logik ---
|
||||
|
||||
@app.websocket("/ws/chat")
|
||||
async def chat_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
while True:
|
||||
msg = await websocket.receive_text()
|
||||
await websocket.send_text(f"KI: Ich habe '{msg}' empfangen. Aktuell bereite ich die Nodes vor.")
|
||||
user_msg = await websocket.receive_text()
|
||||
user_msg_lower = user_msg.lower()
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
# Logik: "Installiere Docker auf [Name]"
|
||||
if "installiere docker" in user_msg_lower:
|
||||
conn = get_db()
|
||||
nodes = conn.execute('SELECT * FROM nodes').fetchall()
|
||||
conn.close()
|
||||
|
||||
target = next((n for n in nodes if n['name'].lower() in user_msg_lower or n['ip'] in user_msg_lower), None)
|
||||
|
||||
if target:
|
||||
await websocket.send_text(f"🤖 Verstanden. Ich installiere Docker auf {target['name']} ({target['ip']})...")
|
||||
# Hier rufen wir die Installation via SSH-Key auf
|
||||
cmd = "curl -sSL https://get.docker.com | sh && sudo usermod -aG docker " + target['user']
|
||||
# Wir schicken das Ergebnis an die Log-Konsole
|
||||
asyncio.create_task(run_remote_task(target['ip'], target['user'], cmd, "Docker Installation"))
|
||||
else:
|
||||
await websocket.send_text("🤖 Ich konnte den Node nicht finden. Bitte nenne den Namen korrekt.")
|
||||
else:
|
||||
await websocket.send_text(f"🤖 Empfangen: {user_msg}. Soll ich Docker oder Ollama auf einem Node installieren?")
|
||||
|
||||
async def run_remote_task(ip, user, cmd, task_name):
|
||||
await manager.broadcast(f"🚀 KI-Task gestartet: {task_name} auf {ip}")
|
||||
# Nutzung von SSH-Key (kein Passwort nötig)
|
||||
ssh_cmd = f"ssh -o StrictHostKeyChecking=no {user}@{ip} '{cmd}'"
|
||||
process = subprocess.Popen(ssh_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
|
||||
for line in process.stdout:
|
||||
await manager.broadcast(f"🛠️ {line.strip()[:80]}")
|
||||
await manager.broadcast(f"✅ {task_name} auf {ip} abgeschlossen.")
|
||||
Reference in New Issue
Block a user