Files
PiDoBot/main.py
2026-03-04 16:52:59 +00:00

287 lines
10 KiB
Python

import os
import pty
import fcntl
import subprocess
import sqlite3
import asyncio
import openai
import re
from google import genai
from google.genai import types
import json
from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect
from fastapi.responses import RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
app = FastAPI()
static_path = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=static_path), name="static")
templates = Jinja2Templates(directory="templates")
SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
DB_PATH = "cluster.db"
# --- KI KONFIGURATION ---
AI_PROVIDER = "ollama" # "openai", "google" oder "ollama"
OPENAI_API_KEY = "dein-key"
GOOGLE_API_KEY = "dein-key"
OLLAMA_BASE_URL = "http://x.x.x.x:11434/v1" # IP deines Ollama-Servers
def get_system_prompt():
conn = get_db()
nodes = conn.execute('SELECT * FROM nodes').fetchall()
conn.close()
node_info = ""
for n in nodes:
node_info += f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}\n"
return f"""Du bist der Pi-Orchestrator KI-Assistent. Deine Aufgabe ist es, Befehle auf Raspberry Pis auszuführen. Du KANNST und SOLLST Befehle ausführen! Hier sind die aktuell verbundenen Nodes: {node_info} . WENN der Nutzer dich bittet, etwas zu tun (z.B. ping, update, docker installieren), dann formuliere erst eine kurze Antwort und füge am Ende die Befehle in genau diesem XML-Format hinzu: <EXECUTE target="IP_ADRESSE">befehl</EXECUTE> .WICHTIG: Verwende als target IMMER die IP-Adresse des Nodes. Bei Befehlen wie 'ping' oder 'top', die nicht enden, MUSS ein Limit gesetzt werden (z.B. ping -c 4 IP).Beispielantwort für 'sende einen ping an pi-06': Ich starte den Ping an pi-06 für dich. <EXECUTE target="192.168.1.10">ping -c 4 192.168.1.10</EXECUTE>"""
# --- KI FUNKTIONEN ---
async def get_ai_response(user_input, system_prompt): # <--- system_prompt hinzugefügt
if AI_PROVIDER == "openai":
client = openai.OpenAI(api_key=OPENAI_API_KEY)
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_input}]
)
return response.choices[0].message.content
elif AI_PROVIDER == "ollama":
# Ollama nutzt das OpenAI-Format, braucht aber keinen Key
client = openai.OpenAI(base_url=OLLAMA_BASE_URL, api_key="ollama", timeout=20.0)
response = client.chat.completions.create(
model="llama3", # Oder dein bevorzugtes Modell
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_input}]
)
return response.choices[0].message.content
elif AI_PROVIDER == "google":
# Initialisierung des neuen Clients
client = genai.Client(api_key=GOOGLE_API_KEY)
# Generierung mit dem neuen SDK
response = client.models.generate_content(
model='gemini-2.5-flash',
contents=user_input,
config=types.GenerateContentConfig(
system_instruction=SYSTEM_PROMPT
)
)
return response.text
return "Fehler: Kein KI-Provider konfiguriert."
# --- DATENBANK INITIALISIERUNG ---
def init_db():
conn = sqlite3.connect(DB_PATH)
conn.execute('''
CREATE TABLE IF NOT EXISTS nodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
ip TEXT UNIQUE,
user TEXT,
status TEXT
)
''')
conn.commit()
conn.close()
init_db()
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
# --- WebSocket Manager für Logs & Chat ---
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections:
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
try:
await connection.send_text(message)
except:
pass
manager = ConnectionManager()
# --- SSH Handshake (Nur Key kopieren) ---
async def bootstrap_ssh_only(ip, user, password):
await manager.broadcast(f"🔑 Initialisiere SSH-Handshake für {ip}...")
# Nutzt sshpass um den Key einmalig mit Passwort zu hinterlegen
ssh_copy_cmd = f"sshpass -p '{password}' ssh-copy-id -o StrictHostKeyChecking=no -i {SSH_KEY}.pub {user}@{ip}"
process = subprocess.Popen(ssh_copy_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
for line in process.stdout:
await manager.broadcast(f"SSH: {line.strip()}")
conn = get_db()
conn.execute('UPDATE nodes SET status = "Bereit (Kein Docker)" WHERE ip = ?', (ip,))
conn.commit()
conn.close()
await manager.broadcast(f"✅ Node {ip} ist verbunden. KI kann nun Befehle senden.")
# --- Routen ---
@app.get("/")
async def index(request: Request):
conn = get_db()
nodes = conn.execute('SELECT * FROM nodes').fetchall()
conn.close()
return templates.TemplateResponse("index.html", {"request": request, "nodes": nodes})
@app.post("/add_node")
async def add_node(background_tasks: BackgroundTasks, name: str = Form(...), ip: str = Form(...), user: str = Form(...), password: str = Form(...)):
conn = get_db()
try:
conn.execute('INSERT INTO nodes (name, ip, user, status) VALUES (?, ?, ?, ?)', (name, ip, user, "Kopplung..."))
conn.commit()
background_tasks.add_task(bootstrap_ssh_only, ip, user, password)
except sqlite3.IntegrityError: pass
finally: conn.close()
return RedirectResponse(url="/", status_code=303)
@app.post("/remove_node/{node_id}")
async def remove_node(node_id: int):
conn = get_db()
conn.execute('DELETE FROM nodes WHERE id = ?', (node_id,))
conn.commit()
conn.close()
return RedirectResponse(url="/", status_code=303)
# --- WebSockets ---
@app.websocket("/ws/install_logs")
async def log_websocket(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(websocket)
@app.websocket("/ws/terminal/{ip}")
async def terminal_websocket(websocket: WebSocket, ip: str):
await websocket.accept()
conn = get_db()
node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone()
conn.close()
if not node:
await websocket.send_text("\r\nFehler: Node nicht gefunden.\r\n")
await websocket.close()
return
# Pseudo-Terminal für interaktive SSH-Session
master_fd, slave_fd = pty.openpty()
proc = await asyncio.create_subprocess_exec(
"ssh", "-o", "StrictHostKeyChecking=no", "-t", f"{node['user']}@{ip}",
stdin=slave_fd, stdout=slave_fd, stderr=slave_fd
)
async def pty_to_ws():
# Setzt den Master-FD auf non-blocking
fl = fcntl.fcntl(master_fd, fcntl.F_GETFL)
fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
while True:
await asyncio.sleep(0.01)
try:
data = os.read(master_fd, 1024).decode(errors='ignore')
if data:
await websocket.send_text(data)
except BlockingIOError:
continue
except Exception: pass
async def ws_to_pty():
try:
while True:
data = await websocket.receive_text()
os.write(master_fd, data.encode())
except Exception: pass
try:
await asyncio.gather(pty_to_ws(), ws_to_pty())
finally:
if proc.returncode is None: proc.terminate()
os.close(master_fd)
os.close(slave_fd)
# --- WEBSOCKET CHAT UPDATE ---
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
# Hole Antwort von der gewählten KI
ai_msg = await get_ai_response(data)
await websocket.send_text(ai_msg)
except Exception as e:
print(f"Chat Error: {e}")
# --- Status in DB aktualisieren Helper ---
def update_node_status(ip, new_status):
conn = get_db()
conn.execute('UPDATE nodes SET status = ? WHERE ip = ?', (new_status, ip))
conn.commit()
conn.close()
# --- Erweiterter Remote-Task ---
async def run_remote_task(ip, user, cmd, task_name):
await manager.broadcast(f"🚀 KI-Task: {task_name} auf {ip}")
ssh_cmd = f"ssh -o StrictHostKeyChecking=no {user}@{ip} '{cmd}'"
process = subprocess.Popen(ssh_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
for line in process.stdout:
await manager.broadcast(f"🛠️ {line.strip()[:80]}")
# WICHTIG: Wenn Docker installiert wurde, Status ändern!
if "docker" in task_name.lower():
update_node_status(ip, "Docker Aktiv")
await manager.broadcast(f"✨ Datenbank aktualisiert: {ip} ist nun 'Docker Aktiv'")
await manager.broadcast(f"{task_name} fertig.")
# --- Neuer Endpunkt: Manueller Refresh-Check ---
@app.get("/refresh_status/{node_id}")
async def refresh_status(node_id: int):
conn = get_db()
node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone()
if node:
# Kurzer Check via SSH, ob Docker antwortet
check_cmd = "command -v docker >/dev/null 2>&1 && echo 'Docker Aktiv' || echo 'Bereit (Kein Docker)'"
ssh_cmd = f"ssh -o StrictHostKeyChecking=no -o ConnectTimeout=2 {node['user']}@{node['ip']} \"{check_cmd}\""
try:
# Wir führen den Befehl aus
new_status = subprocess.check_output(ssh_cmd, shell=True).decode().strip()
except Exception:
new_status = "Offline/Fehler"
conn.execute('UPDATE nodes SET status = ? WHERE id = ?', (new_status, node_id))
conn.commit()
conn.close()
return {"status": new_status} # Wir senden nur den Status zurück
conn.close()
return {"status": "Unbekannt"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)