import os
import re
import sqlite3
import asyncio
import openai
import sys
import subprocess
import edge_tts
from google import genai
from google.genai import types
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
from colorama import init, Fore, Style
# ====================================================
# INITIALISIERUNG
# ====================================================
init(autoreset=True)  # colorama: reset colours automatically after every print
# ====================================================
# PATHS & SETUP
# ====================================================
# All working directories live next to this script.
BASE_DIR = Path(__file__).resolve().parent
CONFIG_DIR = BASE_DIR / "config"
DATA_DIR = BASE_DIR / "data"
WORKSPACE_DIR = BASE_DIR / "workspace"
ENV_FILE = CONFIG_DIR / ".env"
# Must run before any os.getenv() call below so values from config/.env are visible.
load_dotenv(ENV_FILE)
DB_PATH = DATA_DIR / "cluster.db"  # SQLite database holding the `nodes` table
NOTES_FILE = WORKSPACE_DIR / "NOTIZEN.md"
TODO_FILE = WORKSPACE_DIR / "TODO.md"
CHAT_LOG_FILE = WORKSPACE_DIR / "chat_history.log"  # appended to by log_to_file()
# Name substituted for the "{user_name}" placeholder of the system prompt.
WEB_USER_NAME = os.getenv("WEB_USER_NAME", "Meik")
# ====================================================
# TERMINAL FARBEN
# ====================================================
USER_COLOR = Fore.CYAN  # input prompt of the user
JARVIS_COLOR = Fore.GREEN  # assistant replies and success messages
SYSTEM_COLOR = Fore.YELLOW  # status / progress messages
ERROR_COLOR = Fore.RED  # error messages
OUTPUT_COLOR = Fore.MAGENTA  # streamed output lines of executed commands
INFO_COLOR = Fore.BLUE  # start banner and echo of the command being run
RESET = Style.RESET_ALL  # explicit reset appended after coloured fragments
# ====================================================
# ORDNER & DATEIEN
# ====================================================
# Make sure every working directory exists before anything reads or writes it.
for directory in (WORKSPACE_DIR, DATA_DIR, CONFIG_DIR):
    directory.mkdir(parents=True, exist_ok=True)

# Seed the memory files with a short header the first time the script runs;
# existing files are left untouched.
for memory_file in (NOTES_FILE, TODO_FILE):
    if memory_file.exists():
        continue
    header = f"# {memory_file.name}\nHier fängt dein Gedächtnis an, J.A.R.V.I.S.\n"
    memory_file.write_text(header, encoding="utf-8")
# ====================================================
# KI KONFIGURATION
# ====================================================
# Which backend get_ai_response() talks to; compared in lower case.
AI_PROVIDER = os.getenv("AI_PROVIDER", "google").lower()

# API keys (empty string when not configured).
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "")
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")

# Endpoint of the local Ollama server.
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434/v1")

# Model name per provider, each overridable through the environment.
GOOGLE_MODEL = os.getenv("GOOGLE_MODEL", "gemini-2.5-flash")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3")
NVIDIA_MODEL = os.getenv("NVIDIA_MODEL", "moonshotai/kimi-k2.6")
GROQ_MODEL = os.getenv("GROQ_MODEL", "meta-llama/llama-4-scout-17b-16e-instruct")
# ====================================================
# DATENBANK
# ====================================================
def init_db(db_path=None):
    """Create the ``nodes`` table if it does not exist yet.

    Args:
        db_path: Path of the SQLite database file. Defaults to the
            module-level ``DB_PATH``.

    The call is idempotent (``CREATE TABLE IF NOT EXISTS``), and the
    connection is closed even when the statement fails.
    """
    # NOTE(review): ``sudo_password`` is a plain TEXT column, i.e. stored
    # unencrypted in the database file.
    conn = sqlite3.connect(DB_PATH if db_path is None else db_path)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS nodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                ip TEXT UNIQUE,
                user TEXT,
                sudo_password TEXT,
                os TEXT DEFAULT 'Unbekannt',
                arch TEXT DEFAULT 'Unbekannt',
                docker_installed INTEGER DEFAULT 0,
                status TEXT
            )
        ''')
        conn.commit()
    finally:
        conn.close()
# Create the schema at import time so the functions below can query `nodes`.
init_db()
def get_db():
    """Open a new connection to the cluster database.

    Rows are returned as ``sqlite3.Row`` objects, so columns can be read by
    name. The caller is responsible for closing the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
# ====================================================
# DYNAMISCHE PROGRAMM-ERKENNUNG (NEU)
# ====================================================
def get_installed_gui_apps(apps_dir="/usr/share/applications"):
    """Scan a directory of ``.desktop`` files for launchable GUI programs.

    Args:
        apps_dir: Directory to scan (str or Path). Defaults to the
            system-wide ``/usr/share/applications``.

    Returns:
        A dict mapping the first ``Name=`` value of each file to the first
        ``Exec=`` command, cut at the first space or ``%`` so arguments and
        field codes such as ``%U`` are dropped. Entries whose command
        contains one of the ignored fragments are skipped. An empty dict is
        returned when the directory does not exist.
    """
    detected_apps = {}
    apps_path = Path(apps_dir)
    if not apps_path.is_dir():
        return detected_apps

    # Core system entries that would only clutter the list.
    ignored_fragments = ("debian", "im-config", "openjdk", "systemd")
    name_pattern = re.compile(r"^Name=(.+)$", re.M)
    exec_pattern = re.compile(r"^Exec=([^ \n%]+)", re.M)

    # Sorted so that, when two files declare the same name, the winner does
    # not depend on the file system's listing order.
    for desktop_file in sorted(apps_path.glob("*.desktop")):
        try:
            content = desktop_file.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            # Unreadable entry (permissions, dangling symlink): skip it.
            continue
        name_match = name_pattern.search(content)
        exec_match = exec_pattern.search(content)
        if not (name_match and exec_match):
            continue
        app_name = name_match.group(1).strip()
        app_cmd = exec_match.group(1).strip()
        lowered_cmd = app_cmd.lower()
        if any(fragment in lowered_cmd for fragment in ignored_fragments):
            continue
        detected_apps[app_name] = app_cmd
    return detected_apps
# ====================================================
# SYSTEM PROMPT
# ====================================================
def get_system_prompt():
    """Build the system prompt that is sent to the AI provider.

    The template is read from ``config/system_prompt.txt`` (a short greeting
    is used when the file is missing). The placeholders ``{user_name}``,
    ``{workspace_dir}``, ``{notes_file}``, ``{todo_file}``,
    ``{installed_apps}`` and ``{node_info}`` are substituted. When the
    template has no ``{installed_apps}`` placeholder, the application list
    is appended to the end of the prompt instead.

    Returns:
        The fully substituted prompt string.
    """
    prompt_path = CONFIG_DIR / "system_prompt.txt"
    if prompt_path.exists():
        prompt = prompt_path.read_text(encoding="utf-8")
    else:
        prompt = f"Hallo {WEB_USER_NAME}, ich bin J.A.R.V.I.S."

    static_placeholders = (
        ("{user_name}", WEB_USER_NAME),
        ("{workspace_dir}", str(WORKSPACE_DIR)),
        ("{notes_file}", str(NOTES_FILE)),
        ("{todo_file}", str(TODO_FILE)),
    )
    for placeholder, value in static_placeholders:
        prompt = prompt.replace(placeholder, value)

    # --- inject the dynamically detected desktop programs ---
    apps_prompt_string = (
        "VERFÜGBARE LOKALE DESKTOP-PROGRAMME (Nutze NUR diese Befehe zum Starten!):\n"
        + "".join(
            f"- {app_name}: Befehl lautet '{app_cmd}'\n"
            for app_name, app_cmd in get_installed_gui_apps().items()
        )
    )
    if "{installed_apps}" in prompt:
        prompt = prompt.replace("{installed_apps}", apps_prompt_string)
    else:
        prompt += "\n\n" + apps_prompt_string

    # Only the columns that end up in the prompt are fetched, so stored
    # sudo passwords are never loaded here. The connection is closed even
    # when the query fails.
    conn = get_db()
    try:
        nodes = conn.execute('SELECT name, ip, user FROM nodes').fetchall()
    finally:
        conn.close()

    node_info = "".join(
        f"- Name: {n['name']}, IP: {n['ip']}, User: {n['user']}\n"
        for n in nodes
    )
    return prompt.replace("{node_info}", node_info)
# ====================================================
# KI KOMMUNIKATION
# ====================================================
async def get_ai_response(user_msg, system_prompt, history_list):
    """Send the conversation to the configured AI provider and return its reply.

    Args:
        user_msg: The newest user message. Only the Google branch sends it
            explicitly; the OpenAI-compatible branch expects it to already
            be the last entry of ``history_list``.
        system_prompt: System instruction for the model.
        history_list: List of dicts with at least the keys ``"role"`` and
            ``"content"``; any other keys are stripped before sending.

    Returns:
        The reply text (``""`` when the provider returned no text). Errors
        are not raised: any exception, as well as an unknown
        ``AI_PROVIDER``, is reported as a string starting with
        ``"Fehler bei der KI-Anfrage:"``.
    """
    try:
        if AI_PROVIDER in ("openai", "ollama", "nvidia", "groq"):
            # Strict APIs (e.g. Groq) reject unknown keys such as
            # "timestamp", so only role and content are forwarded.
            clean_history = [
                {"role": msg["role"], "content": msg["content"]}
                for msg in history_list
            ]
            messages = [{"role": "system", "content": system_prompt}] + clean_history

            if AI_PROVIDER == "ollama":
                url = (
                    OLLAMA_BASE_URL
                    if OLLAMA_BASE_URL.endswith('/v1')
                    else OLLAMA_BASE_URL.rstrip('/') + '/v1'
                )
                key = "ollama"
                model_to_use = OLLAMA_MODEL
            elif AI_PROVIDER == "nvidia":
                url = "https://integrate.api.nvidia.com/v1"
                key = NVIDIA_API_KEY
                model_to_use = NVIDIA_MODEL
            elif AI_PROVIDER == "groq":
                url = "https://api.groq.com/openai/v1"
                key = GROQ_API_KEY
                model_to_use = GROQ_MODEL
            else:
                url = None  # library default endpoint
                key = OPENAI_API_KEY
                model_to_use = OPENAI_MODEL

            # The context manager closes the client's HTTP connections after
            # the request; a new client is created on every call.
            async with openai.AsyncOpenAI(base_url=url, api_key=key) as client:
                response = await client.chat.completions.create(
                    model=model_to_use,
                    messages=messages
                )
            # content may be None; callers expect a string.
            return response.choices[0].message.content or ""

        if AI_PROVIDER == "google":
            client = genai.Client(api_key=GOOGLE_API_KEY)
            # The last history entry is the current user message, which is
            # sent separately through send_message() below.
            google_history = [
                types.Content(
                    role="user" if msg["role"] == "user" else "model",
                    parts=[types.Part.from_text(text=msg["content"])]
                )
                for msg in history_list[:-1]
            ]
            chat = client.chats.create(
                model=GOOGLE_MODEL,
                config=types.GenerateContentConfig(
                    system_instruction=system_prompt
                ),
                history=google_history
            )
            # send_message() is a blocking call; run it in a worker thread so
            # the event loop stays responsive.
            reply = await asyncio.to_thread(chat.send_message, user_msg)
            return reply.text or ""

        # Unknown provider: report it instead of silently returning None.
        return (
            f"Fehler bei der KI-Anfrage: "
            f"Unbekannter AI_PROVIDER '{AI_PROVIDER}'"
        )
    except Exception as e:
        return f"Fehler bei der KI-Anfrage: {e}"
# ====================================================
# BEFEHLSAUSFÜHRUNG
# ====================================================
_LOCAL_TARGETS = ("localhost", "127.0.0.1")


def _lookup_node(target):
    """Return the ``nodes`` row whose IP or name equals *target*, else None."""
    conn = get_db()
    try:
        return conn.execute(
            'SELECT * FROM nodes WHERE ip=? OR name=?',
            (target, target)
        ).fetchone()
    finally:
        conn.close()


async def _start_process(target, shell_cmd, **stream_kwargs):
    """Start *shell_cmd* locally or via ssh; return None for an unknown node."""
    if target.lower() in _LOCAL_TARGETS:
        return await asyncio.create_subprocess_shell(shell_cmd, **stream_kwargs)
    node = _lookup_node(target)
    if node is None:
        return None
    # The command is handed to ssh as ONE argument instead of being wrapped
    # in single quotes inside a local shell string, so commands that contain
    # quotes reach the remote shell unchanged.
    # NOTE(review): StrictHostKeyChecking=no disables host key verification.
    return await asyncio.create_subprocess_exec(
        "ssh",
        "-o", "StrictHostKeyChecking=no",
        "-o", "LogLevel=ERROR",
        f"{node['user']}@{node['ip']}",
        shell_cmd,
        **stream_kwargs
    )


async def run_task(target, cmd, read_timeout=2.0):
    """Run a shell command on the local machine or on a registered node.

    Args:
        target: ``"localhost"`` / ``"127.0.0.1"`` for local execution,
            otherwise the IP or name of a row in the ``nodes`` table
            (executed through ssh).
        cmd: Shell command line to execute.
        read_timeout: Seconds to wait for the next output line, and
            afterwards for the process to exit, before giving up.

    Returns:
        * ``"GUI application started."`` when the command was recognised as
          a GUI program (first word is a known desktop command, or the
          command ends with ``&``) and was started detached.
        * The collected output (stdout and stderr merged, lines joined with
          newlines) for every other command; ``""`` if there was none.
        * An error message string when the node is unknown, the command is
          empty or an exception occurred. Nothing is raised.
    """
    print(
        f"\n{SYSTEM_COLOR}"
        f"⚙️ STARTE TASK AUF [{target}]"
        f"{RESET}"
    )
    print(
        f"{INFO_COLOR}"
        f"➡️ {cmd}"
        f"{RESET}\n"
    )
    try:
        command = cmd.strip()
        if not command:
            err = "❌ Fehler: Leerer Befehl."
            print(f"{ERROR_COLOR}{err}{RESET}\n")
            return err

        node_missing = f"Node '{target}' nicht gefunden."

        # ========================================
        # DETECT GUI APPS
        # ========================================
        gui_commands = set(get_installed_gui_apps().values())
        first_word = command.split()[0]
        is_gui_app = first_word in gui_commands or command.endswith("&")

        # ========================================
        # START GUI APPS DETACHED
        # ========================================
        if is_gui_app:
            # Drop only a trailing background marker; operators such as
            # "&&" inside the command must survive.
            foreground_cmd = (
                command[:-1].rstrip() if command.endswith("&") else command
            )
            detached_cmd = f"nohup {foreground_cmd} >/dev/null 2>&1 &"
            print(
                f"{SYSTEM_COLOR}"
                f"🖥️ GUI-APP erkannt → Detached Mode"
                f"{RESET}"
            )
            proc = await _start_process(target, detached_cmd)
            if proc is None:
                print(f"{ERROR_COLOR}{node_missing}{RESET}")
                return node_missing
            await proc.wait()
            print(
                f"{JARVIS_COLOR}"
                f"✅ GUI-Programm gestartet"
                f"{RESET}\n"
            )
            return "GUI application started."

        # ========================================
        # REGULAR COMMANDS
        # ========================================
        proc = await _start_process(
            target,
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT
        )
        if proc is None:
            print(f"{ERROR_COLOR}{node_missing}{RESET}")
            return node_missing

        collected_output = []
        # Read line by line with a timeout so a tool that keeps the pipe
        # open without writing cannot block the assistant forever.
        try:
            while True:
                line = await asyncio.wait_for(
                    proc.stdout.readline(), timeout=read_timeout
                )
                if not line:
                    break
                decoded = line.decode("utf-8", errors="ignore").rstrip()
                collected_output.append(decoded)
                print(f"{OUTPUT_COLOR}│ {decoded}{RESET}")
        except asyncio.TimeoutError:
            print(f"{SYSTEM_COLOR}⏳ Ausgabe-Stream stagniert. Erzwinge Prozess-Check...{RESET}")

        # Give the process a bounded amount of time to finish; it is left
        # running (not terminated) if it does not.
        try:
            await asyncio.wait_for(proc.wait(), timeout=read_timeout)
        except asyncio.TimeoutError:
            print(f"{ERROR_COLOR}⚠️ Prozess reagiert nicht. Setze Ablauf trotzdem fort.{RESET}")
        print()
        # Hand the output back so the caller can show it to the user.
        return "\n".join(collected_output)
    except Exception as e:
        err = f"❌ Fehler: {e}"
        print(
            f"{ERROR_COLOR}"
            f"{err}"
            f"{RESET}\n"
        )
        return err
# ====================================================
# FILE LOGGING
# ====================================================
def log_to_file(role, content):
    """Append one timestamped entry to the chat log file.

    The entry consists of a ``[timestamp] ROLE:`` header, the content and a
    separator line of 60 dashes. Logging is best effort: any failure is
    printed to the terminal and otherwise ignored.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        with open(CHAT_LOG_FILE, mode="a", encoding="utf-8") as log_handle:
            separator = "-" * 60
            log_handle.write(
                f"[{timestamp}] {role.upper()}:\n{content}\n{separator}\n"
            )
    except Exception as e:
        print(f"{ERROR_COLOR}⚠️ Konnte nicht ins Log schreiben: {e}{RESET}")
# ====================================================
# USER INPUT
# ====================================================
async def listen_to_user():
    """Read one line from the terminal and return it.

    ``input`` runs in a worker thread, so the event loop is not blocked
    while waiting for the user.
    """
    prompt = f"\n{USER_COLOR}👤 Du:{RESET} "
    typed_line = await asyncio.to_thread(input, prompt)
    return typed_line
# ====================================================
# JARVIS OUTPUT
# ====================================================
async def speak_to_user(text):
    """Print a reply in the terminal and read it aloud.

    The text is printed between two separator lines, synthesised to
    ``/tmp/jarvis_response.mp3`` with Edge-TTS and played with ``mpv``.
    While synthesis and playback run, the lock file
    ``/tmp/.jarvis_speaking`` exists; it is always removed afterwards.
    Whitespace-only text is printed but not spoken. Errors during speech
    output are printed and not raised.

    Args:
        text: The message to show and speak.
    """
    print(f"\n{JARVIS_COLOR}🤖 J.A.R.V.I.S.{RESET}")
    print(f"{JARVIS_COLOR}{'-'*60}{RESET}")
    print(text)
    print(f"{JARVIS_COLOR}{'-'*60}{RESET}\n")
    if not text.strip():
        return

    voice = "de-DE-KillianNeural"
    output_file = "/tmp/jarvis_response.mp3"
    # NOTE(review): presumably the wake-word script mutes the microphone
    # while this file exists - confirm against that script.
    lock_file = "/tmp/.jarvis_speaking"
    try:
        # 1. Create the lock file before any audio is produced.
        with open(lock_file, "w") as f:
            f.write("1")
        print(f"🔊 J.A.R.V.I.S. spricht: {text}")
        # 2. Fetch the synthesised audio.
        communicate = edge_tts.Communicate(text, voice)
        await communicate.save(output_file)
        # 3. Play it asynchronously and wait until mpv has finished.
        proc = await asyncio.create_subprocess_exec(
            "mpv", "--no-video", output_file,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL
        )
        await proc.wait()
    except Exception as e:
        print(f"❌ Fehler bei der Sprachausgabe: {e}")
    finally:
        # 4. Always drop the lock. Removing directly instead of checking
        # for existence first avoids an error escaping from `finally` when
        # the file vanished in between.
        try:
            os.remove(lock_file)
        except FileNotFoundError:
            pass
# ====================================================
# MAIN LOOP
# ====================================================
# Matches <EXECUTE>cmd</EXECUTE> and <EXECUTE target="node">cmd</EXECUTE>
# (case-insensitive, command may span lines, whitespace before '>' allowed).
# The lazy prefix lives INSIDE the optional group: placed in front of it, the
# regex engine satisfies the pattern without ever trying the target group,
# and every command would silently run on localhost.
# NOTE(review): the tag name EXECUTE is taken from the comments in this
# file - confirm it matches the tag requested in config/system_prompt.txt.
_EXECUTE_TAG_RE = re.compile(
    r'<EXECUTE\b(?:[^>]*?\btarget="([^"]*)")?[^>]*>(.*?)</EXECUTE\s*>',
    re.I | re.S,
)


def _split_ai_response(ai_response):
    """Split an AI reply into ``(text_without_tags, [(target, cmd), ...])``."""
    commands = []
    for match in _EXECUTE_TAG_RE.finditer(ai_response):
        target = (match.group(1) or "").strip() or "localhost"
        cmd = match.group(2).strip()
        # Remove Markdown code fences the model may have put inside the tag.
        cmd = re.sub(r'^```[a-zA-Z]*\n?', '', cmd)
        cmd = re.sub(r'\n?```$', '', cmd)
        cmd = cmd.strip()
        if cmd:  # an empty tag carries nothing to execute
            commands.append((target, cmd))
    clean_msg = _EXECUTE_TAG_RE.sub('', ai_response).strip()
    return clean_msg, commands


async def main_chat_loop():
    """Run the interactive terminal chat until the user exits.

    Each round reads a line, asks the AI provider, speaks the reply (without
    EXECUTE tags), executes every requested command via ``run_task`` and
    reports its result. The history is capped at the 20 newest entries.
    The loop ends on ``exit`` / ``quit`` or when stdin is closed.
    """
    # NOTE(review): commands come straight from the model and are executed
    # without a confirmation step.
    print(f"{INFO_COLOR}")
    print("====================================================")
    print("🤖 J.A.R.V.I.S. Terminal Interface geladen")
    print(f"🧠 Provider: {AI_PROVIDER.upper()}")
    print("⌨️ Tippe 'exit', um zu beenden")
    print("====================================================")
    print(RESET)
    chat_history = []
    while True:
        try:
            user_msg = await listen_to_user()
        except EOFError:
            # stdin was closed (Ctrl-D / end of pipe): leave like 'exit'.
            user_msg = "exit"
        if user_msg.lower().strip() in ['exit', 'quit']:
            print(
                f"{SYSTEM_COLOR}"
                f"\nJ.A.R.V.I.S. geht offline."
                f"{RESET}"
            )
            break
        if not user_msg.strip():
            continue
        now = datetime.now().strftime("%d.%m.%Y %H:%M")
        chat_history.append({
            "role": "user",
            "content": user_msg,
            "timestamp": now
        })
        log_to_file("Du", user_msg)
        print(
            f"{SYSTEM_COLOR}"
            f"🧠 J.A.R.V.I.S. denkt nach..."
            f"{RESET}",
            end="\r"
        )
        system_prompt = get_system_prompt()
        ai_response = await get_ai_response(
            user_msg,
            system_prompt,
            chat_history
        )
        if not ai_response:
            # No text came back; parsing it would fail on None.
            print(f"{ERROR_COLOR}⚠️ Keine Antwort von der KI erhalten.{RESET}")
            continue

        # ============================================
        # FIND EXECUTE TAGS
        # ============================================
        clean_msg, commands = _split_ai_response(ai_response)

        # ============================================
        # JARVIS TEXT
        # ============================================
        if clean_msg:
            await speak_to_user(clean_msg)
            chat_history.append({
                "role": "assistant",
                "content": clean_msg,
                "timestamp": now
            })
            log_to_file("J.A.R.V.I.S.", clean_msg)

        # ============================================
        # RUN COMMANDS
        # ============================================
        for target, cmd in commands:
            action_msg = (
                f"⚙️ Ich führe jetzt folgenden Befehl "
                f"auf [{target}] aus:\n\n"
                f"{cmd}"
            )
            # Shown in the terminal only, not read aloud.
            print(f"\n{SYSTEM_COLOR}{action_msg}{RESET}\n")
            log_to_file("SYSTEM", action_msg)
            output = await run_task(
                target,
                cmd
            )
            if output:
                output_msg = (
                    f"💻 Ergebnis der Ausführung "
                    f"auf [{target}]:\n\n"
                    f"{output}"
                )
            else:
                output_msg = (
                    f"✅ Befehl auf [{target}] "
                    f"erfolgreich abgeschlossen."
                )
            # The result IS read aloud.
            await speak_to_user(output_msg)
            sys_now = datetime.now().strftime("%d.%m.%Y %H:%M")
            chat_history.append({
                "role": "assistant",
                "content": output_msg,
                "timestamp": sys_now
            })
            log_to_file("SYSTEM", output_msg)

        # ============================================
        # HISTORY LIMIT
        # ============================================
        if len(chat_history) > 20:
            chat_history = chat_history[-20:]
# ====================================================
# SINGLE COMMAND MODE (for the voice script)
# ====================================================
async def run_single_command(command_text):
    """Process one command passed in from outside, then return.

    The text is sent to the AI provider as a one-message conversation. The
    reply (without EXECUTE tags) is spoken and every requested command is
    executed via ``run_task``; command output is not reported back.

    Args:
        command_text: The user's request as plain text.
    """
    init_db()
    system_prompt = get_system_prompt()
    # Treat the input as if it were the first entry of a chat history.
    now = datetime.now().strftime("%d.%m.%Y %H:%M")
    chat_history = [{
        "role": "user",
        "content": command_text,
        "timestamp": now
    }]
    log_to_file("Voice-Input", command_text)
    ai_response = await get_ai_response(
        command_text,
        system_prompt,
        chat_history
    )
    if not ai_response:
        return

    clean_msg, commands = _split_ai_response(ai_response)
    if clean_msg:
        await speak_to_user(clean_msg)
        log_to_file("J.A.R.V.I.S.", clean_msg)
    for target, cmd in commands:
        action_msg = f"⚙️ Führe Sprachbefehl auf [{target}] aus:\n{cmd}"
        # Shown in the terminal only, not read aloud.
        print(f"\n{SYSTEM_COLOR}{action_msg}{RESET}\n")
        log_to_file("SYSTEM", action_msg)
        await run_task(target, cmd)
# ====================================================
# START
# ====================================================
if __name__ == "__main__":
    try:
        cli_args = sys.argv[1:]
        # `python3 jarvis.py --voice-cmd "<text>"` handles a single request;
        # every other invocation starts the interactive terminal chat.
        if len(cli_args) >= 2 and cli_args[0] == "--voice-cmd":
            asyncio.run(run_single_command(cli_args[1]))
        else:
            asyncio.run(main_chat_loop())
    except KeyboardInterrupt:
        print(f"\n{ERROR_COLOR}⛔ J.A.R.V.I.S. hart beendet.{RESET}")