source/main.py aktualisiert

This commit is contained in:
2026-03-09 14:58:42 +00:00
parent 3ed41cf810
commit b9291d9c6b

View File

@@ -112,28 +112,38 @@ def get_db():
conn.row_factory = sqlite3.Row
return conn
def get_system_prompt(current_user=WEB_USER_NAME):
conn = get_db()
nodes = conn.execute('SELECT * FROM nodes').fetchall()
conn.close()
node_info = ""
for n in nodes:
docker_str = "Ja" if n['docker_installed'] else "Nein"
node_info += f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}, OS: {n['os']}, Arch: {n['arch']}, Docker: {docker_str}\n"
# Datei auslesen
if PROMPT_FILE.exists():
prompt = PROMPT_FILE.read_text(encoding="utf-8")
def get_system_prompt(current_user=WEB_USER_NAME, is_admin=False):
# Entscheide, welche Datei geladen wird
if is_admin:
prompt_path = CONFIG_DIR / "system_prompt.txt"
else:
prompt = "Fehler: system_prompt.txt nicht gefunden!"
prompt_path = CONFIG_DIR / "group_prompt.txt"
# Platzhalter durch die echten Werte ersetzen
prompt = prompt.replace("{node_info}", node_info)
prompt = prompt.replace("{workspace_dir}", str(WORKSPACE_DIR))
prompt = prompt.replace("{notes_file}", str(NOTES_FILE))
prompt = prompt.replace("{todo_file}", str(TODO_FILE))
prompt = prompt.replace("{user_name}", current_user) # <-- NEU: Der Name!
# Datei auslesen
if prompt_path.exists():
prompt = prompt_path.read_text(encoding="utf-8")
else:
# Fallback, falls die Datei fehlt
prompt = f"Hallo {current_user}, ich bin dein Assistent."
# Namen ersetzen (funktioniert in beiden Prompts)
prompt = prompt.replace("{user_name}", current_user)
# Server-Infos NUR für Admins einfügen
if is_admin:
conn = get_db()
nodes = conn.execute('SELECT * FROM nodes').fetchall()
conn.close()
node_info = ""
for n in nodes:
docker_str = "Ja" if n['docker_installed'] else "Nein"
node_info += f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}, OS: {n['os']}, Arch: {n['arch']}, Docker: {docker_str}\n"
prompt = prompt.replace("{node_info}", node_info)
prompt = prompt.replace("{workspace_dir}", str(WORKSPACE_DIR))
prompt = prompt.replace("{notes_file}", str(NOTES_FILE))
prompt = prompt.replace("{todo_file}", str(TODO_FILE))
return prompt
@@ -250,7 +260,15 @@ async def handle_telegram_message(update: Update, context: ContextTypes.DEFAULT_
# 2. KI fragen (Wir übergeben die spezifische Historie!)
# HINWEIS: Stelle sicher, dass deine get_ai_response Funktion die current_history auch annimmt.
ai_response = await get_ai_response(user_msg, get_system_prompt(sender_name), current_history)
# Prüfen, ob der Sender der Admin ist
is_admin_user = (user_id == ALLOWED_ID)
# KI fragen (mit Admin-Flag für den Prompt!)
ai_response = await get_ai_response(
user_msg,
get_system_prompt(sender_name, is_admin=is_admin_user),
current_history
)
commands = re.findall(r'<EXECUTE target="(.*?)">(.*?)</EXECUTE>', ai_response, re.I | re.S)
clean_msg = re.sub(r'<EXECUTE.*?>.*?</EXECUTE>', '', ai_response, flags=re.I | re.S).strip()
@@ -630,7 +648,7 @@ async def chat_endpoint(websocket: WebSocket):
chat_histories["private"].append({"role": "user", "content": user_msg, "timestamp": now})
# 2. KI fragen (mit der Historie!)
ai_response = await get_ai_response(user_msg, get_system_prompt(), chat_histories["private"])
ai_response = await get_ai_response(user_msg, get_system_prompt(is_admin=True), chat_histories["private"])
commands = re.findall(r'<EXECUTE target="(.*?)">(.*?)</EXECUTE>', ai_response, re.I | re.S)
clean_msg = re.sub(r'<EXECUTE.*?>.*?</EXECUTE>', '', ai_response, flags=re.I | re.S).strip()