#!/usr/bin/env python3 """ Robovojtík – Linuxový shell asistent s vylepšeným interaktivním rozhraním Funkce: • Rozhraní s barevným oddělením: - Levá strana je rozdělena na hlavičku (s trvalými instrukcemi a spinnerem) a chat (historie). - Pravá strana zobrazuje výstup příkazů. - Mezi levým a pravým panelem je vertikální oddělovač. - Dolní část je vstupní oblast (o dva řádky). • Spinner se zobrazuje v hlavičce, uprostřed, vždy když čekáme na odpověď asistenta. • Do chatu se zaznamenává i uživatelské potvrzení. • Logování (při spuštění s --log) běží asynchronně a loguje vše na úrovni DEBUG do souboru "robovojtik.log". """ import openai import subprocess import curses import configparser import json import sys import logging import threading import time import argparse import queue import logging.handlers # === 1. Načtení konfigurace z config.ini === config = configparser.ConfigParser() config.read("config.ini") openai.api_key = config["OpenAI"]["api_key"] ASSISTANT_ID = config["OpenAI"]["assistant_id"] # Globální proměnné automode = False # Automatický režim log_enabled = False # Zapnutí logování přes --log thread_id = None # ID vlákna konverzace s OpenAI # Asynchroní logování logger = logging.getLogger("robovojtik") logger.setLevel(logging.DEBUG) log_queue = None listener = None # === 2. Definice funkcí pro volání OpenAI API === FUNCTIONS = [ { "name": "execute_shell_command", "description": "Spustí shellový příkaz a vrátí jeho výstup.", "parameters": { "type": "object", "properties": { "command": { "type": "string", "description": "Shellový příkaz k vykonání." } }, "required": ["command"], "additionalProperties": False } }, { "name": "get_system_load", "description": "Získá aktuální zátěž systému (load average).", "parameters": { "type": "object", "properties": {}, "required": [], "additionalProperties": False } }, { "name": "create_script", "description": "Vytvoří skript s daným obsahem v souboru a nastaví ho na spustitelný.", "parameters": { "type": "object", "properties": { "file_name": { "type": "string", "description": "Název souboru (nebo cesta), do kterého se skript uloží." }, "content": { "type": "string", "description": "Obsah skriptu, který se má uložit." } }, "required": ["file_name", "content"], "additionalProperties": False } } ] # === 3. (System prompt není načítán ze zdrojového kódu) === # === 4. Pomocné funkce pro komunikaci s OpenAI API === def vytvor_nove_vlakno(): global thread_id thread = openai.beta.threads.create() thread_id = thread.id logger.debug(f"Vytvořeno nové vlákno: {thread_id}") return thread_id def clean_command(command): return command.replace("`", "").strip() def volani_asistenta(prompt, spinner_func=None): result_container = {} def worker(): odpoved = posli_dotaz_do_assistenta(prompt) result_container['answer'] = odpoved thread = threading.Thread(target=worker) thread.start() idx = 0 spinner = ["|", "/", "-", "\\"] while thread.is_alive(): if spinner_func: spinner_func(spinner[idx % len(spinner)]) time.sleep(0.2) idx += 1 thread.join() return result_container.get('answer', "") def posli_dotaz_do_assistenta(prompt): global thread_id if thread_id is None: vytvor_nove_vlakno() logger.debug(f"Odesílám dotaz: {prompt}") openai.beta.threads.messages.create( thread_id=thread_id, role="user", content=prompt ) run = openai.beta.threads.runs.create( thread_id=thread_id, assistant_id=ASSISTANT_ID ) while True: run_status = openai.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id) if run_status.status == "completed": break time.sleep(1) messages = openai.beta.threads.messages.list(thread_id=thread_id) answer = messages.data[0].content[0].text.value logger.debug(f"Asistent odpověděl: {answer}") return answer def posli_prikaz_do_assistenta(command): command = clean_command(command) global thread_id if thread_id is None: vytvor_nove_vlakno() logger.debug(f"Odesílám příkaz k vykonání: {command}") run = openai.beta.threads.runs.create( thread_id=thread_id, assistant_id=ASSISTANT_ID, instructions=f"Prosím, spusť tento příkaz: {command}" ) while True: run_status = openai.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id) if run_status.status == "requires_action": tool_calls = run_status.required_action.submit_tool_outputs.tool_calls tool_outputs = [] for tool_call in tool_calls: tool_name = tool_call.function.name arguments = json.loads(tool_call.function.arguments) if tool_name == "execute_shell_command": vysledek = spust_prikaz(arguments["command"]) elif tool_name == "create_script": vysledek = vytvor_skript(arguments["file_name"], arguments["content"]) else: vysledek = "Neznámá funkce." tool_outputs.append({ "tool_call_id": tool_call.id, "output": vysledek }) openai.beta.threads.runs.submit_tool_outputs( thread_id=thread_id, run_id=run.id, tool_outputs=tool_outputs ) elif run_status.status == "completed": break time.sleep(1) messages = openai.beta.threads.messages.list(thread_id=thread_id) answer = messages.data[0].content[0].text.value logger.debug(f"Výsledek spuštěného příkazu: {answer}") return answer def is_command_response(response): return response.strip().lower().startswith("navrhovaný příkaz:") def get_first_command_proposal(response): """Vrátí první řádek, který začíná 'Navrhovaný příkaz:'; jinak None.""" lines = response.splitlines() for line in lines: if line.strip().lower().startswith("navrhovaný příkaz:"): return line return None # === 5. Funkce pro spouštění příkazů a report === def spust_prikaz(command): try: logger.debug(f"Lokálně spouštím příkaz: {command}") output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT, universal_newlines=True) return output.strip() except subprocess.CalledProcessError as e: return f"Chyba při vykonávání příkazu:\n{e.output}" def vytvor_skript(nazev, obsah): try: with open(nazev, "w") as f: f.write(obsah) subprocess.call(f"chmod +x {nazev}", shell=True) logger.debug(f"Skript {nazev} vytvořen a nastaven jako spustitelný.") return f"Skript {nazev} byl úspěšně vytvořen." except Exception as e: return f"Nastala chyba při tvorbě skriptu: {str(e)}" def run_command_locally_and_report(command): command = clean_command(command) output = spust_prikaz(command) if not output.startswith("Chyba při vykonávání příkazu:"): report_text = f"Výstup příkazu (příkaz proběhl úspěšně):\n{output}" else: report_text = f"Výstup příkazu:\n{output}" answer = posli_dotaz_do_assistenta(report_text) return output, answer # === 6. Vylepšené interaktivní rozhraní s curses === def main_curses(stdscr): curses.start_color() curses.use_default_colors() curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE) # Hlavička curses.init_pair(2, curses.COLOR_WHITE, -1) # Chat curses.init_pair(3, curses.COLOR_GREEN, -1) # Výstup curses.init_pair(4, curses.COLOR_YELLOW, -1) # Vstup curses.init_pair(5, curses.COLOR_CYAN, -1) # Spinner curses.init_pair(6, curses.COLOR_BLACK, curses.COLOR_WHITE) # Oddělovač curses.curs_set(1) stdscr.nodelay(False) stdscr.clear() height, width = stdscr.getmaxyx() header_height = 3 prompt_height = 2 left_width = width // 2 right_width = width - left_width - 1 # oddělovač má 1 sloupec chat_height = height - header_height - prompt_height header_win = curses.newwin(header_height, left_width, 0, 0) chat_win = curses.newwin(chat_height, left_width, header_height, 0) prompt_win = curses.newwin(prompt_height, width, height - prompt_height, 0) output_win = curses.newwin(height - prompt_height, right_width, 0, left_width + 1) divider_win = curses.newwin(height - prompt_height, 1, 0, left_width) header_win.bkgd(' ', curses.color_pair(1)) chat_win.bkgd(' ', curses.color_pair(2)) prompt_win.bkgd(' ', curses.color_pair(4)) output_win.bkgd(' ', curses.color_pair(3)) divider_win.bkgd(' ', curses.color_pair(6)) d_height, _ = divider_win.getmaxyx() for y in range(d_height): try: divider_win.addch(y, 0, curses.ACS_VLINE) except curses.error: pass divider_win.refresh() header_text = [ "Vítejte v Robovojtikovi!", "Zadejte dotaz, příkaz (prefix 'cmd:'), 'automat' nebo 'skript: nazev; obsah'.", "Pro ukončení zadejte 'vypni' nebo 'exit'." ] for idx, line in enumerate(header_text): header_win.addstr(idx, 1, line) header_win.refresh() chat_win.scrollok(True) output_win.scrollok(True) prompt_win.scrollok(True) output_win.box() output_win.refresh() # Funkce pro spinner v hlavičce (uprostřed) def spinner_func(ch): mid_x = left_width // 2 - 5 header_win.addstr(1, mid_x, f"Čekám... {ch}", curses.color_pair(5) | curses.A_BOLD) header_win.refresh() chat_history = [] def add_to_chat(text): chat_history.append(text) chat_win.addstr(text + "\n") chat_win.refresh() add_to_chat("Historie chatu:") while True: prompt_win.erase() prompt_win.addstr(">> ", curses.A_BOLD) prompt_win.refresh() curses.echo() try: user_input = prompt_win.getstr().decode("utf-8").strip() except: continue curses.noecho() if not user_input: continue add_to_chat("Ty: " + user_input) if user_input.lower() in ["vypni", "exit"]: add_to_chat("Ukončuji Robovojtíka...") time.sleep(1) break if user_input.lower() == "automat": global automode automode = not automode stav = "zapnut" if automode else "vypnut" add_to_chat(f"Automód byl nyní {stav}.") continue if user_input.lower().startswith("skript:"): try: _, rest = user_input.split("skript:", 1) nazev, obsah = rest.split(";", 1) nazev = nazev.strip() obsah = obsah.strip() vysledek = vytvor_skript(nazev, obsah) add_to_chat(vysledek) except Exception as e: add_to_chat(f"Chyba při vytváření skriptu: {str(e)}") continue if user_input.startswith("cmd:"): command = user_input[4:].strip() add_to_chat(f"Rozpoznán přímý příkaz: {command}") if not automode: prompt_win.erase() prompt_win.addstr("Spustit tento příkaz? (y/n): ", curses.A_BOLD) prompt_win.refresh() potvrzeni = prompt_win.getstr().decode("utf-8").strip().lower() add_to_chat("Ty: " + potvrzeni) if potvrzeni != "y": add_to_chat("Příkaz nebyl spuštěn.") continue output, response = run_command_locally_and_report(command) output_win.erase() output_win.addstr(1, 1, output) output_win.box() output_win.refresh() add_to_chat("Robovojtík odpovídá:\n" + response) continue assistant_response = volani_asistenta(user_input, spinner_func=spinner_func) add_to_chat("Robovojtík odpovídá:\n" + assistant_response) proposal_line = get_first_command_proposal(assistant_response) if proposal_line: navrhovany_prikaz = proposal_line[len("Navrhovaný příkaz:"):].strip() add_to_chat(f"Navrhovaný příkaz: {navrhovany_prikaz}") if not automode: prompt_win.erase() prompt_win.addstr("Spustit tento příkaz? (y/n): ", curses.A_BOLD) prompt_win.refresh() potvrzeni = prompt_win.getstr().decode("utf-8").strip().lower() add_to_chat("Ty: " + potvrzeni) if potvrzeni != "y": add_to_chat("Příkaz nebyl spuštěn.") continue output, response = run_command_locally_and_report(navrhovany_prikaz) output_win.erase() output_win.addstr(1, 1, output) output_win.box() output_win.refresh() add_to_chat("Robovojtík odpovídá:\n" + response) prompt_win.erase() prompt_win.refresh() def main(): global log_enabled, listener, log_queue parser = argparse.ArgumentParser(description="Robovojtík – interaktivní shell asistent") parser.add_argument("--log", action="store_true", help="Zapne logování do souboru robovojtik.log") args = parser.parse_args() if args.log: log_enabled = True log_queue = queue.Queue(-1) qh = logging.handlers.QueueHandler(log_queue) logger.addHandler(qh) logger.setLevel(logging.DEBUG) fh = logging.FileHandler("robovojtik.log") fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) listener = logging.handlers.QueueListener(log_queue, fh) listener.start() logger.debug("Logování zapnuto.") curses.wrapper(main_curses) if listener: listener.stop() if __name__ == "__main__": main()