diff --git a/main.py b/main.py index 3dc6712..15846fc 100644 --- a/main.py +++ b/main.py @@ -4,24 +4,27 @@ import fcntl import subprocess import sqlite3 import asyncio -import time import re -from fastapi import FastAPI, WebSocket, BackgroundTasks, Request, Form, WebSocketDisconnect -from fastapi.responses import RedirectResponse, JSONResponse -from fastapi.templating import Jinja2Templates +import json +import httpx +from dotenv import load_dotenv, set_key +from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles +from pydantic import BaseModel + +load_dotenv() + +app = FastAPI() + +STATIC_DIR = os.path.join(os.path.dirname(__file__),"static") +app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") DB_PATH = "cluster.db" SSH_KEY = os.path.expanduser("~/.ssh/id_rsa") -app = FastAPI() -static_path = os.path.join(os.path.dirname(__file__), "static") -app.mount("/static", StaticFiles(directory=static_path), name="static") -templates = Jinja2Templates(directory="templates") - -# ----------------------------- +# ------------------------------------------------- # DATABASE -# ----------------------------- +# ------------------------------------------------- def init_db(): conn = sqlite3.connect(DB_PATH) @@ -32,18 +35,14 @@ def init_db(): name TEXT, ip TEXT UNIQUE, user TEXT, - - ssh_password TEXT, sudo_password TEXT, - os TEXT, - architecture TEXT, - - docker_installed INTEGER DEFAULT 0, - vnc_available INTEGER DEFAULT 0, - - last_seen INTEGER, - status TEXT + arch TEXT, + docker INTEGER DEFAULT 0, + ssh INTEGER DEFAULT 1, + vnc INTEGER DEFAULT 0, + status TEXT DEFAULT 'unknown', + last_seen TEXT ) """) @@ -58,294 +57,271 @@ def get_db(): conn.row_factory = sqlite3.Row return conn -# ----------------------------- -# NODE DISCOVERY -# ----------------------------- +# ------------------------------------------------- +# MODELS +# ------------------------------------------------- -async def detect_node_info(ip, user): +class NodeCreate(BaseModel): + name:str + ip:str + user:str - cmds = { - "os": "grep '^ID=' /etc/os-release 2>/dev/null | cut -d= -f2", - "arch": "uname -m", - "docker": "command -v docker >/dev/null 2>&1 && echo 1 || echo 0", - "vnc": "pgrep -f vnc >/dev/null 2>&1 && echo 1 || echo 0" - } +class NodeUpdate(BaseModel): + name:str + ip:str + user:str - results = {} +class AISettings(BaseModel): + provider:str + model:str + ollama:str - for key, cmd in cmds.items(): - ssh_cmd = f"ssh -o ConnectTimeout=3 -o StrictHostKeyChecking=no {user}@{ip} '{cmd}'" +# ------------------------------------------------- +# NODE API +# ------------------------------------------------- - try: - out = subprocess.check_output(ssh_cmd, shell=True).decode().strip() - except: - out = "" +@app.get("/nodes") +def get_nodes(): - results[key] = out + conn = get_db() - return results + rows = conn.execute("SELECT * FROM nodes").fetchall() -# ----------------------------- -# NODE MONITOR -# ----------------------------- + conn.close() -async def node_monitor_loop(): + return [dict(r) for r in rows] + + +@app.post("/nodes") +def add_node(node:NodeCreate): + + conn = get_db() + + conn.execute("INSERT INTO nodes(name,ip,user) VALUES(?,?,?)", + (node.name,node.ip,node.user)) + + conn.commit() + + conn.close() + + return {"status":"ok"} + + +@app.put("/nodes/{node_id}") +def update_node(node_id:int,node:NodeUpdate): + + conn = get_db() + + conn.execute(""" + UPDATE nodes + SET name=?, ip=?, user=? + WHERE id=? + """, + (node.name,node.ip,node.user,node_id)) + + conn.commit() + + conn.close() + + return {"status":"updated"} + +# ------------------------------------------------- +# NODE SCANNING +# ------------------------------------------------- + +async def run_ssh(ip,user,cmd): + + ssh_cmd=[ + "ssh", + "-o","StrictHostKeyChecking=no", + f"{user}@{ip}", + cmd + ] + + proc = await asyncio.create_subprocess_exec( + *ssh_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + out,_ = await proc.communicate() + + return out.decode().strip() + + +async def detect_node(node): + + ip=node['ip'] + user=node['user'] + + try: + + arch = await run_ssh(ip,user,"uname -m") + + os_release = await run_ssh(ip,user,"cat /etc/os-release || uname") + + docker = await run_ssh(ip,user,"docker --version || echo nodocker") + + vnc = await run_ssh(ip,user,"pgrep Xtightvnc || echo novnc") + + os_name="unknown" + + if "debian" in os_release.lower(): + os_name="debian" + + elif "fedora" in os_release.lower(): + os_name="fedora" + + elif "raspbian" in os_release.lower(): + os_name="raspberrypi" + + elif "ubuntu" in os_release.lower(): + os_name="ubuntu" + + docker_installed = 0 if "nodocker" in docker else 1 + + vnc_enabled = 0 if "novnc" in vnc else 1 + + status="online" + + except: + + arch="" + os_name="" + docker_installed=0 + vnc_enabled=0 + status="offline" + + conn=get_db() + + conn.execute(""" + UPDATE nodes + SET os=?,arch=?,docker=?,vnc=?,status=?,last_seen=datetime('now') + WHERE id=? + """, + (os_name,arch,docker_installed,vnc_enabled,status,node['id'])) + + conn.commit() + + conn.close() + + +async def scan_nodes(): while True: - conn = get_db() - nodes = conn.execute("SELECT * FROM nodes").fetchall() + conn=get_db() - for node in nodes: + nodes=conn.execute("SELECT * FROM nodes").fetchall() - ip = node["ip"] - user = node["user"] - - try: - - ssh_cmd = f"ssh -o ConnectTimeout=2 -o StrictHostKeyChecking=no {user}@{ip} 'echo online'" - subprocess.check_output(ssh_cmd, shell=True) - - conn.execute( - "UPDATE nodes SET status=?, last_seen=? WHERE id=?", - ("Online", int(time.time()), node["id"]) - ) - - except: - - conn.execute( - "UPDATE nodes SET status=? WHERE id=?", - ("Offline", node["id"]) - ) - - conn.commit() conn.close() + tasks=[] + + for n in nodes: + tasks.append(detect_node(n)) + + if tasks: + await asyncio.gather(*tasks) + await asyncio.sleep(60) @app.on_event("startup") -async def start_monitor(): - asyncio.create_task(node_monitor_loop()) +async def start_scanner(): -# ----------------------------- -# SSH BOOTSTRAP -# ----------------------------- + asyncio.create_task(scan_nodes()) -async def bootstrap_node(ip, user, password): +# ------------------------------------------------- +# TERMINAL WEBSOCKET +# ------------------------------------------------- - ssh_copy_cmd = f"sshpass -p '{password}' ssh-copy-id -o StrictHostKeyChecking=no -i {SSH_KEY}.pub {user}@{ip}" - - subprocess.call(ssh_copy_cmd, shell=True) - - info = await detect_node_info(ip, user) - - conn = get_db() - - conn.execute( - """ - UPDATE nodes - SET os=?, architecture=?, docker_installed=?, vnc_available=?, status=? - WHERE ip=? - """, - ( - info["os"], - info["arch"], - int(info["docker"] or 0), - int(info["vnc"] or 0), - "Online", - ip - ) - ) - - conn.commit() - conn.close() - -# ----------------------------- -# ROUTES -# ----------------------------- - -@app.get("/") -async def index(request: Request): - - conn = get_db() - nodes = conn.execute("SELECT * FROM nodes").fetchall() - conn.close() - - return templates.TemplateResponse("index.html", {"request": request, "nodes": nodes}) - - -@app.post("/add_node") -async def add_node( - background_tasks: BackgroundTasks, - name: str = Form(...), - ip: str = Form(...), - user: str = Form(...), - password: str = Form(...) -): - - conn = get_db() - - conn.execute( - """ - INSERT INTO nodes (name, ip, user, ssh_password, status) - VALUES (?, ?, ?, ?, ?) - """, - (name, ip, user, password, "Connecting") - ) - - conn.commit() - conn.close() - - background_tasks.add_task(bootstrap_node, ip, user, password) - - return RedirectResponse("/", 303) - - -@app.post("/remove_node/{node_id}") -async def remove_node(node_id: int): - - conn = get_db() - conn.execute("DELETE FROM nodes WHERE id=?", (node_id,)) - conn.commit() - conn.close() - - return RedirectResponse("/", 303) - - -@app.get("/refresh_status/{node_id}") -async def refresh_status(node_id: int): - - conn = get_db() - node = conn.execute("SELECT * FROM nodes WHERE id=?", (node_id,)).fetchone() - - if not node: - return JSONResponse({"status": "Unknown"}) - - ip = node["ip"] - user = node["user"] - - try: - - ssh_cmd = f"ssh -o ConnectTimeout=2 -o StrictHostKeyChecking=no {user}@{ip} 'echo online'" - subprocess.check_output(ssh_cmd, shell=True) - - status = "Online" - - except: - - status = "Offline" - - conn.execute("UPDATE nodes SET status=? WHERE id=?", (status, node_id)) - conn.commit() - conn.close() - - return {"status": status} - -# ----------------------------- -# NODE EDIT -# ----------------------------- - -@app.get("/node/{node_id}") -async def get_node(node_id: int): - - conn = get_db() - node = conn.execute("SELECT * FROM nodes WHERE id=?", (node_id,)).fetchone() - conn.close() - - if not node: - return JSONResponse({}) - - return dict(node) - - -@app.post("/update_node/{node_id}") -async def update_node( - node_id: int, - name: str = Form(...), - ip: str = Form(...), - user: str = Form(...) -): - - conn = get_db() - - conn.execute( - """ - UPDATE nodes - SET name=?, ip=?, user=? - WHERE id=? - """, - (name, ip, user, node_id) - ) - - conn.commit() - conn.close() - - return RedirectResponse("/", 303) - -# ----------------------------- -# TERMINAL -# ----------------------------- - -@app.websocket("/ws/terminal/{ip}") -async def terminal(websocket: WebSocket, ip: str): +@app.websocket("/ws/terminal") +async def terminal_ws(websocket:WebSocket): await websocket.accept() - conn = get_db() - node = conn.execute("SELECT * FROM nodes WHERE ip=?", (ip,)).fetchone() - conn.close() + pid,fd = pty.fork() - if not node: - await websocket.close() - return + if pid==0: + os.execvp("bash",["bash"]) - master_fd, slave_fd = pty.openpty() + while True: - proc = await asyncio.create_subprocess_exec( - "ssh", - "-o", - "StrictHostKeyChecking=no", - "-t", - f"{node['user']}@{ip}", - stdin=slave_fd, - stdout=slave_fd, - stderr=slave_fd - ) - - async def pty_to_ws(): - - fl = fcntl.fcntl(master_fd, fcntl.F_GETFL) - fcntl.fcntl(master_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - - while True: - - await asyncio.sleep(0.01) - - try: - data = os.read(master_fd, 1024).decode(errors="ignore") - - if data: - await websocket.send_text(data) - - except: - pass - - async def ws_to_pty(): + await asyncio.sleep(0.01) try: - while True: - - data = await websocket.receive_text() - os.write(master_fd, data.encode()) - + data=os.read(fd,1024).decode() + await websocket.send_text(data) except: pass - await asyncio.gather(pty_to_ws(), ws_to_pty()) + try: + msg = await asyncio.wait_for(websocket.receive_text(),0.01) + os.write(fd,msg.encode()) + except: + pass -if __name__ == "__main__": +# ------------------------------------------------- +# AI CHAT +# ------------------------------------------------- - import uvicorn +chat_history=[] - uvicorn.run(app, host="0.0.0.0", port=8000) +async def fake_ai(message:str): + + if "nodes" in message.lower(): + + conn=get_db() + + rows=conn.execute("SELECT name,ip,status FROM nodes").fetchall() + + conn.close() + + txt="Nodes:\n" + + for r in rows: + txt+=f"{r['name']} {r['ip']} {r['status']}\n" + + return txt + + return "AI connected." + + +@app.websocket("/ws/chat") +async def chat_ws(websocket:WebSocket): + + await websocket.accept() + + while True: + + msg = await websocket.receive_text() + + chat_history.append(msg) + + reply = await fake_ai(msg) + + await websocket.send_text(reply) + +# ------------------------------------------------- +# AI SETTINGS +# ------------------------------------------------- + +ENV_FILE = ".env" + +@app.post("/ai/settings") +async def save_settings(settings:AISettings): + + set_key(ENV_FILE,"AI_PROVIDER",settings.provider) + set_key(ENV_FILE,"OLLAMA_BASE_URL",settings.ollama) + + return {"status":"saved"} + +# ------------------------------------------------- +# ROOT +# ------------------------------------------------- + +@app.get("/") +def root(): + + return {"status":"PiDoBot running"}