diff --git a/source/main.py b/source/main.py index 05eb125..020e57d 100644 --- a/source/main.py +++ b/source/main.py @@ -112,28 +112,38 @@ def get_db(): conn.row_factory = sqlite3.Row return conn -def get_system_prompt(current_user=WEB_USER_NAME): - conn = get_db() - nodes = conn.execute('SELECT * FROM nodes').fetchall() - conn.close() - - node_info = "" - for n in nodes: - docker_str = "Ja" if n['docker_installed'] else "Nein" - node_info += f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}, OS: {n['os']}, Arch: {n['arch']}, Docker: {docker_str}\n" - - # Datei auslesen - if PROMPT_FILE.exists(): - prompt = PROMPT_FILE.read_text(encoding="utf-8") +def get_system_prompt(current_user=WEB_USER_NAME, is_admin=False): + # Entscheide, welche Datei geladen wird + if is_admin: + prompt_path = CONFIG_DIR / "system_prompt.txt" else: - prompt = "Fehler: system_prompt.txt nicht gefunden!" + prompt_path = CONFIG_DIR / "group_prompt.txt" - # Platzhalter durch die echten Werte ersetzen - prompt = prompt.replace("{node_info}", node_info) - prompt = prompt.replace("{workspace_dir}", str(WORKSPACE_DIR)) - prompt = prompt.replace("{notes_file}", str(NOTES_FILE)) - prompt = prompt.replace("{todo_file}", str(TODO_FILE)) - prompt = prompt.replace("{user_name}", current_user) # <-- NEU: Der Name! + # Datei auslesen + if prompt_path.exists(): + prompt = prompt_path.read_text(encoding="utf-8") + else: + # Fallback, falls die Datei fehlt + prompt = f"Hallo {current_user}, ich bin dein Assistent." + + # Namen ersetzen (funktioniert in beiden Prompts) + prompt = prompt.replace("{user_name}", current_user) + + # Server-Infos NUR für Admins einfügen + if is_admin: + conn = get_db() + nodes = conn.execute('SELECT * FROM nodes').fetchall() + conn.close() + + node_info = "" + for n in nodes: + docker_str = "Ja" if n['docker_installed'] else "Nein" + node_info += f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}, OS: {n['os']}, Arch: {n['arch']}, Docker: {docker_str}\n" + + prompt = prompt.replace("{node_info}", node_info) + prompt = prompt.replace("{workspace_dir}", str(WORKSPACE_DIR)) + prompt = prompt.replace("{notes_file}", str(NOTES_FILE)) + prompt = prompt.replace("{todo_file}", str(TODO_FILE)) return prompt @@ -250,7 +260,15 @@ async def handle_telegram_message(update: Update, context: ContextTypes.DEFAULT_ # 2. KI fragen (Wir übergeben die spezifische Historie!) # HINWEIS: Stelle sicher, dass deine get_ai_response Funktion die current_history auch annimmt. - ai_response = await get_ai_response(user_msg, get_system_prompt(sender_name), current_history) + # Prüfen, ob der Sender der Admin ist + is_admin_user = (user_id == ALLOWED_ID) + + # KI fragen (mit Admin-Flag für den Prompt!) + ai_response = await get_ai_response( + user_msg, + get_system_prompt(sender_name, is_admin=is_admin_user), + current_history + ) commands = re.findall(r'(.*?)', ai_response, re.I | re.S) clean_msg = re.sub(r'.*?', '', ai_response, flags=re.I | re.S).strip() @@ -630,7 +648,7 @@ async def chat_endpoint(websocket: WebSocket): chat_histories["private"].append({"role": "user", "content": user_msg, "timestamp": now}) # 2. KI fragen (mit der Historie!) - ai_response = await get_ai_response(user_msg, get_system_prompt(), chat_histories["private"]) + ai_response = await get_ai_response(user_msg, get_system_prompt(is_admin=True), chat_histories["private"]) commands = re.findall(r'(.*?)', ai_response, re.I | re.S) clean_msg = re.sub(r'.*?', '', ai_response, flags=re.I | re.S).strip()