import asyncio
import codecs
import contextlib
import fcntl
import json
import logging
import os
import pty
import re
import signal
import sqlite3
import subprocess

import httpx
from dotenv import load_dotenv, set_key
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

load_dotenv()

logger = logging.getLogger(__name__)

app = FastAPI()

STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

DB_PATH = "cluster.db"
SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")


# -------------------------------------------------
# DATABASE
# -------------------------------------------------

def init_db():
    """Create the ``nodes`` table in ``DB_PATH`` if it does not exist yet."""
    # closing() guarantees the handle is released; the inner ``conn`` context
    # commits on success and rolls back on error.
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS nodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                ip TEXT UNIQUE,
                user TEXT,
                sudo_password TEXT,
                os TEXT,
                arch TEXT,
                docker INTEGER DEFAULT 0,
                ssh INTEGER DEFAULT 1,
                vnc INTEGER DEFAULT 0,
                status TEXT DEFAULT 'unknown',
                last_seen TEXT
            )
        """)


init_db()


def get_db():
    """Open a new connection to ``DB_PATH`` whose rows support access by name.

    The caller owns the connection and is responsible for closing it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


# -------------------------------------------------
# MODELS
# -------------------------------------------------

class NodeCreate(BaseModel):
    """Request body for ``POST /nodes``."""

    name: str
    ip: str
    user: str


class NodeUpdate(BaseModel):
    """Request body for ``PUT /nodes/{node_id}``."""

    name: str
    ip: str
    user: str


class AISettings(BaseModel):
    """Request body for ``POST /ai/settings``."""

    provider: str
    model: str
    ollama: str


# -------------------------------------------------
# NODE API
# -------------------------------------------------

@app.get("/nodes")
def get_nodes():
    """Return every row of the ``nodes`` table as a list of dicts.

    NOTE(review): ``SELECT *`` includes the ``sudo_password`` column in the
    response. Confirm the front end needs it before leaving it exposed.
    """
    with contextlib.closing(get_db()) as conn:
        rows = conn.execute("SELECT * FROM nodes").fetchall()
    return [dict(r) for r in rows]


@app.post("/nodes")
def add_node(node: NodeCreate):
    """Insert a new node.

    Raises:
        HTTPException: 409 when the insert violates a table constraint
            (``ip`` is declared UNIQUE).
    """
    try:
        with contextlib.closing(get_db()) as conn, conn:
            conn.execute(
                "INSERT INTO nodes(name,ip,user) VALUES(?,?,?)",
                (node.name, node.ip, node.user),
            )
    except sqlite3.IntegrityError as exc:
        raise HTTPException(
            status_code=409, detail=f"node with ip {node.ip} already exists"
        ) from exc
    return {"status": "ok"}


@app.put("/nodes/{node_id}")
def update_node(node_id: int, node: NodeUpdate):
    """Overwrite name, ip and user of the node with the given id.

    An id that matches no row is not treated as an error; the response is
    still ``{"status": "updated"}``.

    Raises:
        HTTPException: 409 when the new ``ip`` collides with another node.
    """
    try:
        with contextlib.closing(get_db()) as conn, conn:
            conn.execute(
                """
                UPDATE nodes
                SET name=?, ip=?, user=?
                WHERE id=?
                """,
                (node.name, node.ip, node.user, node_id),
            )
    except sqlite3.IntegrityError as exc:
        raise HTTPException(
            status_code=409, detail=f"node with ip {node.ip} already exists"
        ) from exc
    return {"status": "updated"}


# -------------------------------------------------
# NODE SCANNING
# -------------------------------------------------

async def run_ssh(ip, user, cmd, *, check=False, connect_timeout=10):
    """Run ``cmd`` on ``user@ip`` through the ``ssh`` binary.

    Args:
        ip: Host to connect to.
        user: Remote login name.
        cmd: Command line handed to the remote shell.
        check: When true, a non-zero exit status raises instead of being
            ignored. ssh itself exits non-zero when it cannot connect.
        connect_timeout: Seconds passed to ssh's ``ConnectTimeout`` option.

    Returns:
        The command's stdout, decoded and stripped.

    Raises:
        subprocess.CalledProcessError: ``check`` is true and the exit status
            is non-zero.
        OSError: The ``ssh`` executable could not be started.
    """
    ssh_cmd = [
        "ssh",
        "-o", "StrictHostKeyChecking=no",
        # Never prompt for a password/passphrase: there is no terminal to
        # answer on, and a prompt would stall the scan.
        "-o", "BatchMode=yes",
        "-o", f"ConnectTimeout={connect_timeout}",
        # End of options: user and ip come from the API, so a value starting
        # with "-" must not be interpreted as an ssh flag.
        "--",
        f"{user}@{ip}",
        cmd,
    ]
    proc = await asyncio.create_subprocess_exec(
        *ssh_cmd,
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    if check and proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode, ssh_cmd, out, err)
    return out.decode(errors="replace").strip()


# Checked in order. "debian" must come last because the os-release files of
# Debian derivatives (Ubuntu, Raspbian) also mention debian via ID_LIKE.
_OS_MARKERS = (
    ("raspbian", "raspberrypi"),
    ("ubuntu", "ubuntu"),
    ("fedora", "fedora"),
    ("debian", "debian"),
)


def _classify_os(os_release):
    """Map the text of /etc/os-release (or ``uname``) to a short OS label."""
    text = os_release.lower()
    for marker, label in _OS_MARKERS:
        if marker in text:
            return label
    return "unknown"


async def detect_node(node):
    """Probe one node over SSH and store the findings in its ``nodes`` row.

    Args:
        node: Mapping with at least the keys ``id``, ``ip`` and ``user``.

    When any probe fails the node is recorded as ``offline`` with empty
    os/arch and docker/vnc cleared. ``last_seen`` is refreshed either way.
    """
    ip = node["ip"]
    user = node["user"]
    try:
        # check=True on every probe: each remote command has its own
        # "|| fallback", so a non-zero status means the SSH link failed and
        # the (empty) output must not be interpreted.
        arch = await run_ssh(ip, user, "uname -m", check=True)
        os_release = await run_ssh(
            ip, user, "cat /etc/os-release || uname", check=True
        )
        docker = await run_ssh(
            ip, user, "docker --version || echo nodocker", check=True
        )
        vnc = await run_ssh(
            ip, user, "pgrep Xtightvnc || echo novnc", check=True
        )
    except (OSError, subprocess.CalledProcessError) as exc:
        logger.info("node %s unreachable: %s", ip, exc)
        arch = ""
        os_name = ""
        docker_installed = 0
        vnc_enabled = 0
        status = "offline"
    else:
        os_name = _classify_os(os_release)
        docker_installed = 0 if "nodocker" in docker else 1
        vnc_enabled = 0 if "novnc" in vnc else 1
        status = "online"

    with contextlib.closing(get_db()) as conn, conn:
        conn.execute(
            """
            UPDATE nodes
            SET os=?,arch=?,docker=?,vnc=?,status=?,last_seen=datetime('now')
            WHERE id=?
            """,
            (os_name, arch, docker_installed, vnc_enabled, status, node["id"]),
        )


async def scan_nodes():
    """Probe all known nodes concurrently, then sleep 60 seconds; forever.

    Failures of a single pass are logged and do not stop the loop.
    """
    while True:
        try:
            with contextlib.closing(get_db()) as conn:
                nodes = conn.execute("SELECT * FROM nodes").fetchall()
            if nodes:
                # return_exceptions keeps one failing node from aborting the
                # pass (and, previously, the whole scanner task).
                results = await asyncio.gather(
                    *(detect_node(n) for n in nodes), return_exceptions=True
                )
                for n, result in zip(nodes, results):
                    if isinstance(result, BaseException):
                        logger.error(
                            "scan of node %s failed", n["ip"], exc_info=result
                        )
        except sqlite3.Error:
            logger.exception("could not load nodes for scanning")
        await asyncio.sleep(60)


# The event loop only holds weak references to tasks; keep a strong one so
# the scanner cannot be garbage-collected while running.
_scanner_task = None


@app.on_event("startup")
async def start_scanner():
    """Launch the background node scanner when the application starts."""
    global _scanner_task
    _scanner_task = asyncio.create_task(scan_nodes())


# -------------------------------------------------
# TERMINAL WEBSOCKET
# -------------------------------------------------

@app.websocket("/ws/terminal")
async def terminal_ws(websocket: WebSocket):
    """Bridge a WebSocket to a ``bash`` process running on a fresh pty.

    Text received from the client is written to the shell; shell output is
    sent back as text. The session ends when the client disconnects or the
    shell exits, after which the pty is closed and the shell is killed and
    reaped.
    """
    await websocket.accept()

    pid, fd = pty.fork()
    if pid == 0:
        # Child process: become the shell. If exec fails, exit at once so
        # the child never continues executing server code.
        try:
            os.execvp("bash", ["bash"])
        finally:
            os._exit(127)

    loop = asyncio.get_running_loop()
    chunks = asyncio.Queue()

    def on_readable():
        # Invoked by the loop only when the pty has data (or hit EOF), so
        # this read does not block the event loop.
        try:
            data = os.read(fd, 1024)
        except OSError:
            # Linux reports EIO on the master side once the shell is gone.
            data = b""
        if not data:
            loop.remove_reader(fd)
        chunks.put_nowait(data)

    async def pump_output():
        # Incremental decoding keeps multi-byte characters intact when they
        # are split across two reads.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        while True:
            data = await chunks.get()
            if not data:
                return
            text = decoder.decode(data)
            if text:
                await websocket.send_text(text)

    async def pump_input():
        try:
            while True:
                message = await websocket.receive_text()
                os.write(fd, message.encode())
        except (WebSocketDisconnect, OSError):
            return

    loop.add_reader(fd, on_readable)
    output_task = asyncio.create_task(pump_output())
    input_task = asyncio.create_task(pump_input())
    tasks = (output_task, input_task)
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        if output_task in done and input_task not in done:
            # The shell ended first: tell the client the session is over.
            with contextlib.suppress(RuntimeError, WebSocketDisconnect):
                await websocket.close()
    finally:
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        loop.remove_reader(fd)
        os.close(fd)
        with contextlib.suppress(ProcessLookupError):
            os.kill(pid, signal.SIGKILL)
        # Reap in a worker thread so waitpid cannot stall the event loop.
        with contextlib.suppress(ChildProcessError):
            await loop.run_in_executor(None, os.waitpid, pid, 0)


# -------------------------------------------------
# AI CHAT
# -------------------------------------------------

# Every message received on /ws/chat, in arrival order, across all clients.
chat_history = []


async def fake_ai(message: str):
    """Return a canned reply for ``message``.

    If the message contains "nodes" (case-insensitive) the reply lists the
    name, ip and status of every node; otherwise it is "AI connected.".
    """
    if "nodes" not in message.lower():
        return "AI connected."

    with contextlib.closing(get_db()) as conn:
        rows = conn.execute("SELECT name,ip,status FROM nodes").fetchall()
    listing = "".join(f"{r['name']} {r['ip']} {r['status']}\n" for r in rows)
    return "Nodes:\n" + listing
@app.websocket("/ws/chat")
async def chat_ws(websocket: WebSocket):
    """Chat endpoint: answer each incoming text message via ``fake_ai``.

    Each message is appended to the module-level ``chat_history`` before the
    reply is sent. The handler returns quietly when the client disconnects.
    """
    await websocket.accept()
    try:
        while True:
            msg = await websocket.receive_text()
            chat_history.append(msg)
            reply = await fake_ai(msg)
            await websocket.send_text(reply)
    except WebSocketDisconnect:
        # A client going away is the normal end of a session, not an error.
        return


# -------------------------------------------------
# AI SETTINGS
# -------------------------------------------------

ENV_FILE = ".env"


@app.post("/ai/settings")
async def save_settings(settings: AISettings):
    """Persist the AI provider and Ollama base URL into ``ENV_FILE``.

    NOTE(review): ``settings.model`` is accepted but never written anywhere.
    Confirm whether it should be stored as well, and under which key.
    """
    set_key(ENV_FILE, "AI_PROVIDER", settings.provider)
    set_key(ENV_FILE, "OLLAMA_BASE_URL", settings.ollama)
    return {"status": "saved"}


# -------------------------------------------------
# ROOT
# -------------------------------------------------

@app.get("/")
def root():
    """Health-check endpoint reporting that the service is up."""
    return {"status": "PiDoBot running"}