Webseite überarbeitet und Telegram Bot funktion hinzugefügt #1

Merged
pi-farm merged 59 commits from dev into main 2026-03-07 23:50:03 +00:00
Showing only changes of commit 5fbd16c5ec - Show all commits

566
main.py
View File

@@ -1,241 +1,353 @@
import os
import json
import threading
import time
from flask import Flask, render_template, request, jsonify, redirect, url_for
from flask_sqlalchemy import SQLAlchemy
from flask_sock import Sock
import paramiko
import requests
# AI Provider Imports
import pty
import fcntl
import subprocess
import sqlite3
import asyncio
import openai
import google.generativeai as genai
import re
import httpx
from google import genai
from google.genai import types
import json
from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect
from fastapi.responses import RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv, set_key
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///orchestrator.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
sock = Sock(app)
# Lade Umgebungsvariablen
load_dotenv()
# --- DATENBANK MODELLE ---
app = FastAPI()
static_path = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=static_path), name="static")
templates = Jinja2Templates(directory="templates")
class Node(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
ip = db.Column(db.String(50), nullable=False)
user = db.Column(db.String(50), nullable=False)
password = db.Column(db.String(100), nullable=False)
status = db.Column(db.String(50), default="Unbekannt")
arch = db.Column(db.String(20), default="N/A")
os_type = db.Column(db.String(50), default="linux")
has_docker = db.Column(db.Boolean, default=False)
has_vnc = db.Column(db.Boolean, default=False)
SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
DB_PATH = "cluster.db"
chat_history = []
PROMPT_FILE = "system_prompt.txt"
ENV_FILE = os.path.join(os.path.dirname(__file__), ".env")
def to_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
# KI KONFIGURATION
AI_PROVIDER = os.getenv("AI_PROVIDER", "google").lower()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434/v1")
class Settings(db.Model):
id = db.Column(db.Integer, primary_key=True)
provider = db.Column(db.String(50), default="google")
google_model = db.Column(db.String(100), default="gemini-1.5-flash")
openai_model = db.Column(db.String(100), default="gpt-4o-mini")
ollama_model = db.Column(db.String(100), default="llama3")
ollama_base_url = db.Column(db.String(200), default="http://localhost:11434/v1")
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.0-flash")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3")
with app.app_context():
db.create_all()
if not Settings.query.first():
db.session.add(Settings())
db.session.commit()
# --- DATENBANK INITIALISIERUNG (ERWEITERT) ---
def init_db():
conn = sqlite3.connect(DB_PATH)
# Spalten erweitert um sudo_password, os, arch, docker_installed
conn.execute('''
CREATE TABLE IF NOT EXISTS nodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
ip TEXT UNIQUE,
user TEXT,
sudo_password TEXT,
os TEXT DEFAULT 'Unbekannt',
arch TEXT DEFAULT 'Unbekannt',
docker_installed INTEGER DEFAULT 0,
status TEXT
)
''')
conn.commit()
conn.close()
# --- HILFSFUNKTIONEN ---
init_db()
def get_ssh_client(node):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(node.ip, username=node.user, password=node.password, timeout=5)
return client
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def run_ssh_cmd(node, cmd):
def get_system_prompt():
conn = get_db()
nodes = conn.execute('SELECT * FROM nodes').fetchall()
conn.close()
node_info = ""
for n in nodes:
docker_str = "Ja" if n['docker_installed'] else "Nein"
node_info += f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}, OS: {n['os']}, Arch: {n['arch']}, Docker: {docker_str}\n"
if os.path.exists(PROMPT_FILE):
with open(PROMPT_FILE, "r", encoding="utf-8") as f:
template = f.read()
else:
template = "Du bist ein Cluster-Orchestrator. Nodes:\n{node_info}\nBefehle via <EXECUTE target=\"IP\">cmd</EXECUTE>"
print(f"⚠️ Warnung: {PROMPT_FILE} fehlt.")
return template.replace("{node_info}", node_info)
# --- KI LOGIK (UNVERÄNDERT) ---
async def get_ai_response(user_input, system_prompt):
global chat_history
chat_history.append({"role": "user", "content": user_input})
chat_history = chat_history[-30:]
ai_msg = ""
try:
client = get_ssh_client(node)
stdin, stdout, stderr = client.exec_command(cmd)
result = stdout.read().decode().strip()
client.close()
return result
except:
return None
# --- ROUTEN ---
@app.route('/')
def index():
nodes = Node.query.all()
return render_template('index.html', nodes=nodes)
@app.route('/add_node', methods=['POST'])
def add_node():
new_node = Node(
name=request.form['name'],
ip=request.form['ip'],
user=request.form['user'],
password=request.form['password']
)
db.session.add(new_node)
db.session.commit()
return redirect(url_for('index'))
@app.route('/edit_node/<int:id>', methods=['POST'])
def edit_node(id):
node = Node.query.get_or_404(id)
node.name = request.form.get('name', node.name)
node.ip = request.form.get('ip', node.ip)
node.user = request.form.get('user', node.user)
if request.form.get('password'):
node.password = request.form.get('password')
db.session.commit()
return redirect(url_for('index'))
@app.route('/remove_node/<int:id>', methods=['POST'])
def remove_node(id):
node = Node.query.get_or_404(id)
db.session.delete(node)
db.session.commit()
return redirect(url_for('index'))
@app.route('/refresh_status/<int:id>')
def refresh_status(id):
node = Node.query.get_or_404(id)
try:
# Architektur prüfen
node.arch = run_ssh_cmd(node, "uname -m") or "N/A"
# OS Typ prüfen
os_info = run_ssh_cmd(node, "cat /etc/os-release | grep ^ID=")
node.os_type = os_info.split('=')[1].replace('"', '') if os_info else "linux"
# Docker prüfen
docker_check = run_ssh_cmd(node, "docker ps")
node.has_docker = True if docker_check is not None else False
# VNC prüfen (Port 5900/5901)
vnc_check = run_ssh_cmd(node, "netstat -tuln | grep :590")
node.has_vnc = True if vnc_check else False
node.status = "Online"
except:
node.status = "Offline"
db.session.commit()
return jsonify(node.to_dict())
# --- SETTINGS API ---
@app.route('/api/settings', methods=['GET', 'POST'])
def handle_settings():
s = Settings.query.first()
if request.method == 'POST':
data = request.json
s.provider = data.get('provider', s.provider)
s.ollama_base_url = data.get('ollama_base_url', s.ollama_base_url)
# Speichere das Modell für den aktuellen Provider
setattr(s, f"{s.provider}_model", data.get('model'))
db.session.commit()
return jsonify({"status": "success"})
return jsonify({
"provider": s.provider,
"google_model": s.google_model,
"openai_model": s.openai_model,
"ollama_model": s.ollama_model,
"ollama_base_url": s.ollama_base_url
})
@app.route('/api/models')
def get_models():
provider = request.args.get('provider')
if provider == "google":
return jsonify({"models": ["gemini-1.5-flash", "gemini-1.5-pro"]})
elif provider == "openai":
return jsonify({"models": ["gpt-4o-mini", "gpt-4o", "gpt-3.5-turbo"]})
elif provider == "ollama":
url = request.args.get('url', 'http://localhost:11434/v1')
try:
# Versuche Modelle von Ollama API zu laden
r = requests.get(url.replace('/v1', '/api/tags'), timeout=2)
names = [m['name'] for m in r.json().get('models', [])]
return jsonify({"models": names})
except:
return jsonify({"models": ["llama3", "mistral", "codellama"]})
return jsonify({"models": []})
# --- WEBSOCKETS ---
@sock.route('/ws/install_logs')
def install_logs(ws):
# Dummy Log Stream für Demo-Zwecke
while True:
data = ws.receive(timeout=1)
# Hier könnten echte Hintergrund-Prozesse ihre Logs senden
pass
@sock.route('/ws/chat')
def chat_handler(ws):
s = Settings.query.first()
while True:
msg = ws.receive()
if not msg: break
# KI LOGIK
response = "Fehler: Provider nicht konfiguriert"
try:
if s.provider == "google":
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
model = genai.GenerativeModel(s.google_model)
response = model.generate_content(msg).text
elif s.provider == "openai":
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
res = client.chat.completions.create(
model=s.openai_model,
messages=[{"role": "user", "content": msg}]
)
response = res.choices[0].message.content
elif s.provider == "ollama":
r = requests.post(f"{s.ollama_base_url}/chat/completions", json={
"model": s.ollama_model,
"messages": [{"role": "user", "content": msg}],
"stream": False
})
response = r.json()['choices'][0]['message']['content']
except Exception as e:
response = f"KI-Fehler: {str(e)}"
ws.send(response)
@sock.route('/ws/terminal/<ip>')
def terminal_handler(ws, ip):
node = Node.query.filter_by(ip=ip).first()
if not node: return
try:
client = get_ssh_client(node)
chan = client.invoke_shell()
def split_reader():
while True:
if chan.recv_ready():
out = chan.recv(1024).decode()
ws.send(out)
time.sleep(0.01)
threading.Thread(target=split_reader, daemon=True).start()
while True:
cmd = ws.receive()
if not cmd: break
chan.send(cmd)
if AI_PROVIDER in ["openai", "ollama"]:
url = OLLAMA_BASE_URL if AI_PROVIDER == "ollama" else None
if url and not url.endswith('/v1'): url = url.rstrip('/') + '/v1'
key = "ollama" if AI_PROVIDER == "ollama" else OPENAI_API_KEY
model_to_use = OLLAMA_MODEL if AI_PROVIDER == "ollama" else OPENAI_MODEL
client = openai.OpenAI(base_url=url, api_key=key)
response = client.chat.completions.create(model=model_to_use, messages=[{"role": "system", "content": system_prompt}] + chat_history)
ai_msg = response.choices[0].message.content
elif AI_PROVIDER == "google":
client = genai.Client(api_key=GOOGLE_API_KEY)
google_history = [types.Content(role="user" if m["role"] == "user" else "model", parts=[types.Part.from_text(text=msg["content"])]) for msg in chat_history[:-1]]
chat = client.chats.create(model=GOOGLE_MODEL, config=types.GenerateContentConfig(system_instruction=system_prompt), history=google_history)
response = chat.send_message(user_input)
ai_msg = response.text
except Exception as e:
ws.send(f"\r\n[SSH FEHLER]: {str(e)}\r\n")
ai_msg = f"KI Fehler: {e}"
chat_history.append({"role": "assistant", "content": ai_msg})
return ai_msg
if __name__ == '__main__':
# Stelle sicher, dass API Keys in der Umgebung sind oder hier hartcodiert werden
# os.environ["GOOGLE_API_KEY"] = "DEIN_KEY"
app.run(host='0.0.0.0', port=5000, debug=True)
# --- WebSocket Manager ---
class ConnectionManager:
def __init__(self): self.active_connections = []
async def connect(self, ws: WebSocket): await ws.accept(); self.active_connections.append(ws)
def disconnect(self, ws: WebSocket): self.active_connections.remove(ws)
async def broadcast(self, msg: str):
for c in self.active_connections:
try: await c.send_text(msg)
except: pass
manager = ConnectionManager()
# --- ERWEITERTES NODE BOOTSTRAPPING (Inventur) ---
async def bootstrap_node(ip, user, password):
await manager.broadcast(f"🔑 SSH-Handshake für {ip}...")
# 1. Key kopieren
ssh_copy_cmd = f"sshpass -p '{password}' ssh-copy-id -o StrictHostKeyChecking=no -i {SSH_KEY}.pub {user}@{ip}"
proc = subprocess.run(ssh_copy_cmd, shell=True, capture_output=True, text=True)
if proc.returncode != 0:
await manager.broadcast(f"❌ Fehler beim Key-Copy für {ip}: {proc.stderr}")
return
# 2. System-Infos abrufen (Inventur)
await manager.broadcast(f"🔍 Inventur auf {ip} wird durchgeführt...")
# Befehlskette: Arch, OS-Name, Docker-Check
inspect_cmd = "uname -m && (lsb_release -ds || cat /etc/os-release | grep PRETTY_NAME | cut -d'=' -f2) && (command -v docker >/dev/null 2>&1 && echo '1' || echo '0')"
ssh_cmd = f"ssh -o StrictHostKeyChecking=no {user}@{ip} \"{inspect_cmd}\""
try:
output = subprocess.check_output(ssh_cmd, shell=True).decode().strip().split('\n')
arch = output[0] if len(output) > 0 else "Unbekannt"
# Mapping für Architektur
if "aarch64" in arch.lower(): arch = "arm64"
elif "x86_64" in arch.lower(): arch = "x86-64"
os_name = output[1].replace('"', '') if len(output) > 1 else "Linux"
docker_val = int(output[2]) if len(output) > 2 else 0
status = "Docker Aktiv" if docker_val else "Bereit (Kein Docker)"
# Datenbank Update
conn = get_db()
conn.execute('''
UPDATE nodes SET os = ?, arch = ?, docker_installed = ?, status = ?
WHERE ip = ?
''', (os_name, arch, docker_val, status, ip))
conn.commit()
conn.close()
await manager.broadcast(f"✅ Node {ip} konfiguriert ({os_name}, {arch}).")
except Exception as e:
await manager.broadcast(f"⚠️ Inventur auf {ip} unvollständig: {e}")
# --- ROUTES ---
@app.get("/")
async def index(request: Request):
conn = get_db()
nodes = conn.execute('SELECT * FROM nodes').fetchall()
conn.close()
return templates.TemplateResponse("index.html", {"request": request, "nodes": nodes})
@app.get("/api/node/{node_id}")
async def get_node(node_id: int):
conn = get_db()
node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone()
conn.close()
return dict(node) if node else {}
@app.post("/add_node")
async def add_node(background_tasks: BackgroundTasks, name: str = Form(...), ip: str = Form(...), user: str = Form(...), password: str = Form(...)):
conn = get_db()
try:
# Speichere Initialdaten inkl. Sudo-Passwort
conn.execute('''
INSERT INTO nodes (name, ip, user, sudo_password, status)
VALUES (?, ?, ?, ?, ?)
''', (name, ip, user, password, "Kopplung..."))
conn.commit()
background_tasks.add_task(bootstrap_node, ip, user, password)
except sqlite3.IntegrityError: pass
finally: conn.close()
return RedirectResponse(url="/", status_code=303)
@app.post("/edit_node/{node_id}")
async def edit_node(node_id: int, name: str = Form(...), ip: str = Form(...), user: str = Form(...)):
conn = get_db()
conn.execute('UPDATE nodes SET name=?, ip=?, user=? WHERE id=?', (name, ip, user, node_id))
conn.commit()
conn.close()
return RedirectResponse(url="/", status_code=303)
@app.post("/remove_node/{node_id}")
async def remove_node(node_id: int):
conn = get_db()
conn.execute('DELETE FROM nodes WHERE id = ?', (node_id,))
conn.commit()
conn.close()
return RedirectResponse(url="/", status_code=303)
@app.get("/refresh_status/{node_id}")
async def refresh_status(node_id: int):
conn = get_db()
node = conn.execute('SELECT * FROM nodes WHERE id = ?', (node_id,)).fetchone()
if not node: return {"status": "Offline"}
inspect_cmd = "uname -m && (lsb_release -ds || cat /etc/os-release | grep PRETTY_NAME | cut -d'=' -f2) && (command -v docker >/dev/null 2>&1 && echo '1' || echo '0')"
ssh_cmd = f"ssh -o StrictHostKeyChecking=no -o ConnectTimeout=3 {node['user']}@{node['ip']} \"{inspect_cmd}\""
try:
output = subprocess.check_output(ssh_cmd, shell=True).decode().strip().split('\n')
arch = output[0]; os_name = output[1].replace('"', ''); docker_val = int(output[2])
if "aarch64" in arch.lower(): arch = "arm64"
elif "x86_64" in arch.lower(): arch = "x86-64"
new_status = "Docker Aktiv" if docker_val else "Bereit (Kein Docker)"
conn.execute('UPDATE nodes SET status=?, os=?, arch=?, docker_installed=? WHERE id=?',
(new_status, os_name, arch, docker_val, node_id))
conn.commit()
result = {"status": new_status, "os": os_name, "arch": arch, "docker": docker_val}
except:
new_status = "Offline"
conn.execute('UPDATE nodes SET status=? WHERE id=?', (new_status, node_id))
conn.commit()
result = {"status": new_status, "os": node['os'], "arch": node['arch'], "docker": node['docker_installed']}
conn.close()
return result
# --- WebSockets Terminal / Chat / Logs (Integration wie gehabt) ---
@app.websocket("/ws/install_logs")
async def log_websocket(websocket: WebSocket):
await manager.connect(websocket)
try:
while True: await websocket.receive_text()
except WebSocketDisconnect: manager.disconnect(websocket)
@app.websocket("/ws/terminal/{ip}")
async def terminal_websocket(websocket: WebSocket, ip: str):
await websocket.accept()
conn = get_db(); node = conn.execute('SELECT * FROM nodes WHERE ip = ?', (ip,)).fetchone(); conn.close()
if not node: await websocket.close(); return
master_fd, slave_fd = pty.openpty()
proc = await asyncio.create_subprocess_exec("ssh", "-o", "StrictHostKeyChecking=no", "-t", f"{node['user']}@{ip}", stdin=slave_fd, stdout=slave_fd, stderr=slave_fd)
async def pty_to_ws():
fl = fcntl.fcntl(master_fd, fcntl.F_GETFL); fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
while True:
await asyncio.sleep(0.01)
try:
data = os.read(master_fd, 1024).decode(errors='ignore')
if data: await websocket.send_text(data)
except BlockingIOError: continue
except: pass
async def ws_to_pty():
try:
while True:
data = await websocket.receive_text(); os.write(master_fd, data.encode())
except: pass
try: await asyncio.gather(pty_to_ws(), ws_to_pty())
finally:
if proc.returncode is None: proc.terminate()
os.close(master_fd); os.close(slave_fd)
@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
user_msg = await websocket.receive_text()
ai_response = await get_ai_response(user_msg, get_system_prompt())
commands = re.findall(r'<EXECUTE target="(.*?)">(.*?)</EXECUTE>', ai_response, re.I | re.S)
clean_msg = re.sub(r'<EXECUTE.*?>.*?</EXECUTE>', '', ai_response, flags=re.I | re.S).strip()
if clean_msg: await websocket.send_text(clean_msg)
if commands:
tasks = []
for target, cmd in commands:
conn = get_db(); n = conn.execute('SELECT * FROM nodes WHERE ip=? OR name=?', (target.strip(), target.strip())).fetchone(); conn.close()
if n: tasks.append(run_remote_task(n['ip'], n['user'], cmd.strip()))
if tasks:
await websocket.send_text(" *Führe Befehle aus...*")
await asyncio.gather(*tasks)
summary = await get_ai_response("Zusammenfassung der Ergebnisse?", get_system_prompt())
await websocket.send_text(f"--- Info ---\n{summary}")
except: pass
async def run_remote_task(ip, user, cmd):
await manager.broadcast(f"🚀 Task: {cmd} auf {ip}")
proc = await asyncio.create_subprocess_shell(f"ssh -o StrictHostKeyChecking=no {user}@{ip} '{cmd}'", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
full_output = ""
while True:
line = await proc.stdout.readline()
if not line: break
out = line.decode('utf-8', errors='ignore').strip()
if out: await manager.broadcast(f"🛠️ {out}"); full_output += out + "\n"
await proc.wait()
chat_history.append({"role": "user", "content": f"[SYSTEM] Befehl '{cmd}' auf {ip} fertig:\n{full_output or 'Kein Output'}"})
# --- Settings API ---
@app.get("/api/settings")
async def get_settings():
return {"provider": AI_PROVIDER, "google_model": GOOGLE_MODEL, "openai_model": OPENAI_MODEL, "ollama_model": OLLAMA_MODEL, "ollama_base_url": OLLAMA_BASE_URL}
@app.post("/api/settings")
async def update_settings(request: Request):
global AI_PROVIDER, GOOGLE_MODEL, OPENAI_MODEL, OLLAMA_MODEL, OLLAMA_BASE_URL
data = await request.json()
provider = data.get("provider")
if provider:
AI_PROVIDER = provider; set_key(ENV_FILE, "AI_PROVIDER", provider)
if data.get("model"):
m = data.get("model")
if provider == "google": GOOGLE_MODEL = m; set_key(ENV_FILE, "GOOGLE_MODEL", m)
if provider == "openai": OPENAI_MODEL = m; set_key(ENV_FILE, "OPENAI_MODEL", m)
if provider == "ollama": OLLAMA_MODEL = m; set_key(ENV_FILE, "OLLAMA_MODEL", m)
if data.get("ollama_base_url"):
u = data.get("ollama_base_url"); OLLAMA_BASE_URL = u; set_key(ENV_FILE, "OLLAMA_BASE_URL", u)
return {"status": "success"}
@app.get("/api/models")
async def get_models(provider: str):
try:
if provider == "ollama":
url = OLLAMA_BASE_URL.replace("/v1", "").rstrip("/")
async with httpx.AsyncClient() as client:
r = await client.get(f"{url}/api/tags", timeout=5.0)
return {"models": [m["name"] for m in r.json().get("models", [])]}
elif provider == "openai":
client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY)
r = await client.models.list()
return {"models": sorted([m.id for m in r.data if "gpt" in m.id or "o1" in m.id])}
elif provider == "google":
client = genai.Client(api_key=GOOGLE_API_KEY)
return {"models": sorted([m.name.replace("models/", "") for m in client.models.list() if 'generateContent' in m.supported_actions])}
except: return {"models": []}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)