Odstranit robovojtik.py

This commit is contained in:
sinuhet 2025-03-20 01:31:27 +01:00
parent c6be2b2664
commit 9003d676ac

View File

@ -1,411 +0,0 @@
#!/usr/bin/env python3
"""
Robovojtík Linuxový shell asistent s vylepšeným interaktivním rozhraním
Funkce:
Rozhraní s barevným oddělením:
- Levá strana je rozdělena na hlavičku (s trvalými instrukcemi a spinnerem) a chat (historie).
- Pravá strana zobrazuje výstup příkazů.
- Mezi levým a pravým panelem je vertikální oddělovač.
- Dolní část je vstupní oblast (o dva řádky).
Spinner se zobrazuje v hlavičce, uprostřed, vždy když čekáme na odpověď asistenta.
Do chatu se zaznamenává i uživatelské potvrzení.
Logování (při spuštění s --log) běží asynchronně a loguje vše na úrovni DEBUG do souboru "robovojtik.log".
"""
import openai
import subprocess
import curses
import configparser
import json
import sys
import logging
import threading
import time
import argparse
import queue
import logging.handlers
# === 1. Načtení konfigurace z config.ini ===
config = configparser.ConfigParser()
config.read("config.ini")
openai.api_key = config["OpenAI"]["api_key"]
ASSISTANT_ID = config["OpenAI"]["assistant_id"]
# Globální proměnné
automode = False # Automatický režim
log_enabled = False # Zapnutí logování přes --log
thread_id = None # ID vlákna konverzace s OpenAI
# Asynchroní logování
logger = logging.getLogger("robovojtik")
logger.setLevel(logging.DEBUG)
log_queue = None
listener = None
# === 2. Definice funkcí pro volání OpenAI API ===
FUNCTIONS = [
{
"name": "execute_shell_command",
"description": "Spustí shellový příkaz a vrátí jeho výstup.",
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "Shellový příkaz k vykonání."
}
},
"required": ["command"],
"additionalProperties": False
}
},
{
"name": "get_system_load",
"description": "Získá aktuální zátěž systému (load average).",
"parameters": {
"type": "object",
"properties": {},
"required": [],
"additionalProperties": False
}
},
{
"name": "create_script",
"description": "Vytvoří skript s daným obsahem v souboru a nastaví ho na spustitelný.",
"parameters": {
"type": "object",
"properties": {
"file_name": {
"type": "string",
"description": "Název souboru (nebo cesta), do kterého se skript uloží."
},
"content": {
"type": "string",
"description": "Obsah skriptu, který se má uložit."
}
},
"required": ["file_name", "content"],
"additionalProperties": False
}
}
]
# === 3. (System prompt není načítán ze zdrojového kódu) ===
# === 4. Pomocné funkce pro komunikaci s OpenAI API ===
def vytvor_nove_vlakno():
global thread_id
thread = openai.beta.threads.create()
thread_id = thread.id
logger.debug(f"Vytvořeno nové vlákno: {thread_id}")
return thread_id
def clean_command(command):
return command.replace("`", "").strip()
def volani_asistenta(prompt, spinner_func=None):
result_container = {}
def worker():
odpoved = posli_dotaz_do_assistenta(prompt)
result_container['answer'] = odpoved
thread = threading.Thread(target=worker)
thread.start()
idx = 0
spinner = ["|", "/", "-", "\\"]
while thread.is_alive():
if spinner_func:
spinner_func(spinner[idx % len(spinner)])
time.sleep(0.2)
idx += 1
thread.join()
return result_container.get('answer', "")
def posli_dotaz_do_assistenta(prompt):
global thread_id
if thread_id is None:
vytvor_nove_vlakno()
logger.debug(f"Odesílám dotaz: {prompt}")
openai.beta.threads.messages.create(
thread_id=thread_id,
role="user",
content=prompt
)
run = openai.beta.threads.runs.create(
thread_id=thread_id,
assistant_id=ASSISTANT_ID
)
while True:
run_status = openai.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
if run_status.status == "completed":
break
time.sleep(1)
messages = openai.beta.threads.messages.list(thread_id=thread_id)
answer = messages.data[0].content[0].text.value
logger.debug(f"Asistent odpověděl: {answer}")
return answer
def posli_prikaz_do_assistenta(command):
command = clean_command(command)
global thread_id
if thread_id is None:
vytvor_nove_vlakno()
logger.debug(f"Odesílám příkaz k vykonání: {command}")
run = openai.beta.threads.runs.create(
thread_id=thread_id,
assistant_id=ASSISTANT_ID,
instructions=f"Prosím, spusť tento příkaz: {command}"
)
while True:
run_status = openai.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
if run_status.status == "requires_action":
tool_calls = run_status.required_action.submit_tool_outputs.tool_calls
tool_outputs = []
for tool_call in tool_calls:
tool_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
if tool_name == "execute_shell_command":
vysledek = spust_prikaz(arguments["command"])
elif tool_name == "create_script":
vysledek = vytvor_skript(arguments["file_name"], arguments["content"])
else:
vysledek = "Neznámá funkce."
tool_outputs.append({
"tool_call_id": tool_call.id,
"output": vysledek
})
openai.beta.threads.runs.submit_tool_outputs(
thread_id=thread_id,
run_id=run.id,
tool_outputs=tool_outputs
)
elif run_status.status == "completed":
break
time.sleep(1)
messages = openai.beta.threads.messages.list(thread_id=thread_id)
answer = messages.data[0].content[0].text.value
logger.debug(f"Výsledek spuštěného příkazu: {answer}")
return answer
def is_command_response(response):
return response.strip().lower().startswith("navrhovaný příkaz:")
def get_first_command_proposal(response):
"""Vrátí první řádek, který začíná 'Navrhovaný příkaz:'; jinak None."""
lines = response.splitlines()
for line in lines:
if line.strip().lower().startswith("navrhovaný příkaz:"):
return line
return None
# === 5. Funkce pro spouštění příkazů a report ===
def spust_prikaz(command):
try:
logger.debug(f"Lokálně spouštím příkaz: {command}")
output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT, universal_newlines=True)
return output.strip()
except subprocess.CalledProcessError as e:
return f"Chyba při vykonávání příkazu:\n{e.output}"
def vytvor_skript(nazev, obsah):
try:
with open(nazev, "w") as f:
f.write(obsah)
subprocess.call(f"chmod +x {nazev}", shell=True)
logger.debug(f"Skript {nazev} vytvořen a nastaven jako spustitelný.")
return f"Skript {nazev} byl úspěšně vytvořen."
except Exception as e:
return f"Nastala chyba při tvorbě skriptu: {str(e)}"
def run_command_locally_and_report(command):
command = clean_command(command)
output = spust_prikaz(command)
if not output.startswith("Chyba při vykonávání příkazu:"):
report_text = f"Výstup příkazu (příkaz proběhl úspěšně):\n{output}"
else:
report_text = f"Výstup příkazu:\n{output}"
answer = posli_dotaz_do_assistenta(report_text)
return output, answer
# === 6. Vylepšené interaktivní rozhraní s curses ===
def main_curses(stdscr):
curses.start_color()
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE) # Hlavička
curses.init_pair(2, curses.COLOR_WHITE, -1) # Chat
curses.init_pair(3, curses.COLOR_GREEN, -1) # Výstup
curses.init_pair(4, curses.COLOR_YELLOW, -1) # Vstup
curses.init_pair(5, curses.COLOR_CYAN, -1) # Spinner
curses.init_pair(6, curses.COLOR_BLACK, curses.COLOR_WHITE) # Oddělovač
curses.curs_set(1)
stdscr.nodelay(False)
stdscr.clear()
height, width = stdscr.getmaxyx()
header_height = 3
prompt_height = 2
left_width = width // 2
right_width = width - left_width - 1 # oddělovač má 1 sloupec
chat_height = height - header_height - prompt_height
header_win = curses.newwin(header_height, left_width, 0, 0)
chat_win = curses.newwin(chat_height, left_width, header_height, 0)
prompt_win = curses.newwin(prompt_height, width, height - prompt_height, 0)
output_win = curses.newwin(height - prompt_height, right_width, 0, left_width + 1)
divider_win = curses.newwin(height - prompt_height, 1, 0, left_width)
header_win.bkgd(' ', curses.color_pair(1))
chat_win.bkgd(' ', curses.color_pair(2))
prompt_win.bkgd(' ', curses.color_pair(4))
output_win.bkgd(' ', curses.color_pair(3))
divider_win.bkgd(' ', curses.color_pair(6))
d_height, _ = divider_win.getmaxyx()
for y in range(d_height):
try:
divider_win.addch(y, 0, curses.ACS_VLINE)
except curses.error:
pass
divider_win.refresh()
header_text = [
"Vítejte v Robovojtikovi!",
"Zadejte dotaz, příkaz (prefix 'cmd:'), 'automat' nebo 'skript: nazev; obsah'.",
"Pro ukončení zadejte 'vypni' nebo 'exit'."
]
for idx, line in enumerate(header_text):
header_win.addstr(idx, 1, line)
header_win.refresh()
chat_win.scrollok(True)
output_win.scrollok(True)
prompt_win.scrollok(True)
output_win.box()
output_win.refresh()
# Funkce pro spinner v hlavičce (uprostřed)
def spinner_func(ch):
mid_x = left_width // 2 - 5
header_win.addstr(1, mid_x, f"Čekám... {ch}", curses.color_pair(5) | curses.A_BOLD)
header_win.refresh()
chat_history = []
def add_to_chat(text):
chat_history.append(text)
chat_win.addstr(text + "\n")
chat_win.refresh()
add_to_chat("Historie chatu:")
while True:
prompt_win.erase()
prompt_win.addstr(">> ", curses.A_BOLD)
prompt_win.refresh()
curses.echo()
try:
user_input = prompt_win.getstr().decode("utf-8").strip()
except:
continue
curses.noecho()
if not user_input:
continue
add_to_chat("Ty: " + user_input)
if user_input.lower() in ["vypni", "exit"]:
add_to_chat("Ukončuji Robovojtíka...")
time.sleep(1)
break
if user_input.lower() == "automat":
global automode
automode = not automode
stav = "zapnut" if automode else "vypnut"
add_to_chat(f"Automód byl nyní {stav}.")
continue
if user_input.lower().startswith("skript:"):
try:
_, rest = user_input.split("skript:", 1)
nazev, obsah = rest.split(";", 1)
nazev = nazev.strip()
obsah = obsah.strip()
vysledek = vytvor_skript(nazev, obsah)
add_to_chat(vysledek)
except Exception as e:
add_to_chat(f"Chyba při vytváření skriptu: {str(e)}")
continue
if user_input.startswith("cmd:"):
command = user_input[4:].strip()
add_to_chat(f"Rozpoznán přímý příkaz: {command}")
if not automode:
prompt_win.erase()
prompt_win.addstr("Spustit tento příkaz? (y/n): ", curses.A_BOLD)
prompt_win.refresh()
potvrzeni = prompt_win.getstr().decode("utf-8").strip().lower()
add_to_chat("Ty: " + potvrzeni)
if potvrzeni != "y":
add_to_chat("Příkaz nebyl spuštěn.")
continue
output, response = run_command_locally_and_report(command)
output_win.erase()
output_win.addstr(1, 1, output)
output_win.box()
output_win.refresh()
add_to_chat("Robovojtík odpovídá:\n" + response)
continue
assistant_response = volani_asistenta(user_input, spinner_func=spinner_func)
add_to_chat("Robovojtík odpovídá:\n" + assistant_response)
proposal_line = get_first_command_proposal(assistant_response)
if proposal_line:
navrhovany_prikaz = proposal_line[len("Navrhovaný příkaz:"):].strip()
add_to_chat(f"Navrhovaný příkaz: {navrhovany_prikaz}")
if not automode:
prompt_win.erase()
prompt_win.addstr("Spustit tento příkaz? (y/n): ", curses.A_BOLD)
prompt_win.refresh()
potvrzeni = prompt_win.getstr().decode("utf-8").strip().lower()
add_to_chat("Ty: " + potvrzeni)
if potvrzeni != "y":
add_to_chat("Příkaz nebyl spuštěn.")
continue
output, response = run_command_locally_and_report(navrhovany_prikaz)
output_win.erase()
output_win.addstr(1, 1, output)
output_win.box()
output_win.refresh()
add_to_chat("Robovojtík odpovídá:\n" + response)
prompt_win.erase()
prompt_win.refresh()
def main():
global log_enabled, listener, log_queue
parser = argparse.ArgumentParser(description="Robovojtík interaktivní shell asistent")
parser.add_argument("--log", action="store_true", help="Zapne logování do souboru robovojtik.log")
args = parser.parse_args()
if args.log:
log_enabled = True
log_queue = queue.Queue(-1)
qh = logging.handlers.QueueHandler(log_queue)
logger.addHandler(qh)
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler("robovojtik.log")
fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
listener = logging.handlers.QueueListener(log_queue, fh)
listener.start()
logger.debug("Logování zapnuto.")
curses.wrapper(main_curses)
if listener:
listener.stop()
if __name__ == "__main__":
main()