Robovojtik/api_interface.py
2025-03-20 01:32:29 +01:00

138 lines
4.8 KiB
Python

"""
Module: api_interface.py
--------------------------
Obsahuje funkce pro komunikaci s OpenAI API a správu konverzačního vlákna.
"""
import openai
import json
import time
import threading
import logging
from configparser import ConfigParser
logger = logging.getLogger("robovojtik.api_interface")
config = ConfigParser()
config.read("config.ini")
try:
openai.api_key = config["OpenAI"]["api_key"]
except KeyError:
raise ValueError("API key not found in config.ini. Please set the 'api_key' under [OpenAI] section or set OPENAI_API_KEY environment variable.")
ASSISTANT_ID = config["OpenAI"].get("assistant_id", None)
if not ASSISTANT_ID:
raise ValueError("assistant_id not found in config.ini. Please set the 'assistant_id' under [OpenAI] section.")
# Globální proměnná pro uchování ID konverzačního vlákna
thread_id = None
def vytvor_nove_vlakno():
"""
Vytvoří nové vlákno konverzace s asistentem pomocí OpenAI API.
"""
global thread_id
thread = openai.beta.threads.create()
thread_id = thread.id
logger.debug(f"Vytvořeno nové vlákno: {thread_id}")
return thread_id
def posli_dotaz_do_assistenta(prompt):
"""
Odesílá dotaz v přirozeném jazyce do asistenta a vrací jeho odpověď.
"""
global thread_id
if thread_id is None:
vytvor_nove_vlakno()
logger.debug(f"Odesílám dotaz: {prompt}")
openai.beta.threads.messages.create(
thread_id=thread_id,
role="user",
content=prompt
)
run = openai.beta.threads.runs.create(
thread_id=thread_id,
assistant_id=ASSISTANT_ID
)
while True:
run_status = openai.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
if run_status.status == "completed":
break
time.sleep(1)
messages = openai.beta.threads.messages.list(thread_id=thread_id)
answer = messages.data[0].content[0].text.value
logger.debug(f"Asistent odpověděl: {answer}")
return answer
def posli_prikaz_do_assistenta(command):
"""
Odesílá schválený příkaz k vykonání do asistenta a vrací jeho odpověď.
Před odesláním se příkaz vyčistí.
"""
from shell_functions import spust_prikaz, vytvor_skript # Importujeme zde, abychom předešli cyklickým závislostem.
global thread_id
if thread_id is None:
vytvor_nove_vlakno()
logger.debug(f"Odesílám příkaz k vykonání: {command}")
run = openai.beta.threads.runs.create(
thread_id=thread_id,
assistant_id=ASSISTANT_ID,
instructions=f"Prosím, spusť tento příkaz: {command}"
)
while True:
run_status = openai.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
if run_status.status == "requires_action":
tool_calls = run_status.required_action.submit_tool_outputs.tool_calls
tool_outputs = []
for tool_call in tool_calls:
tool_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
if tool_name == "execute_shell_command":
vysledek = spust_prikaz(arguments["command"])
elif tool_name == "create_script":
vysledek = vytvor_skript(arguments["file_name"], arguments["content"])
else:
vysledek = "Neznámá funkce."
tool_outputs.append({
"tool_call_id": tool_call.id,
"output": vysledek
})
openai.beta.threads.runs.submit_tool_outputs(
thread_id=thread_id,
run_id=run.id,
tool_outputs=tool_outputs
)
elif run_status.status == "completed":
break
time.sleep(1)
messages = openai.beta.threads.messages.list(thread_id=thread_id)
answer = messages.data[0].content[0].text.value
logger.debug(f"Výsledek spuštěného příkazu: {answer}")
return answer
def volani_asistenta(prompt, spinner_func=None):
"""
Spustí dotaz do asistenta v samostatném vlákně a během čekání volá spinner_func.
Vrací odpověď asistenta. Pokud dojde k chybě, vrací prázdný řetězec.
"""
result_container = {}
def worker():
try:
odpoved = posli_dotaz_do_assistenta(prompt)
result_container['answer'] = odpoved
except Exception as ex:
result_container['answer'] = f"Chyba při volání asistenta: {str(ex)}"
logger.exception("Chyba v worker funkci volani_asistenta.")
thread = threading.Thread(target=worker)
thread.start()
idx = 0
spinner = ["|", "/", "-", "\\"]
while thread.is_alive():
if spinner_func:
spinner_func(spinner[idx % len(spinner)])
time.sleep(0.2)
idx += 1
thread.join()
return result_container.get('answer', "")