import requests
import time
from typing import Callable, Dict, List
# Set variables
API_KEY = "api_key" # change_me
SCENARIO_IDS = [1514, 1515] # change_me
AGENT_ID = 26 # change_me
def call(number: str) -> None:
'''
Outbound call function to be implemented
Example: number = "+14154133900"
'''
print(f"📞 Initiating outbound call to {number}...") # Replace with actual call logic
headers = {
'X-VOCERA-API-KEY': API_KEY,
'Content-Type': 'application/json'
}
base_url = "https://api.cekura.ai/test_framework"
def check_agent_inbound(agent_id: int) -> bool:
url = f'{base_url}/v1/aiagents-external/{agent_id}/'
data = {
'agent_id': agent_id
}
response = requests.get(url, headers=headers, json=data)
if response.status_code == 200:
return response.json()["inbound"]
elif response.status_code == 404:
print(f"❌ Agent {agent_id} not found")
exit(0)
else:
print(f"❌ Error checking agent inbound status: {response.status_code}")
exit(0)
def run_evaluators(scenarios: List[int], agent_id: int, freq: int = 1) -> Dict:
url = f'{base_url}/v1/scenarios-external/run_scenarios/'
data = {
'scenarios': scenarios,
'freq': freq,
'agent_id': agent_id
}
response = requests.post(url, headers=headers, json=data)
return response.json()
status_message = {
"completed": "✅ Run completed successfully",
"failed": "❌ Run failed during execution",
"pending": "⏳ Run is waiting to be executed",
"in_progress": "🔄 Run is in progress",
"evaluating": "🔍 Run is being evaluated",
"in_queue": "📋 Run is waiting in queue",
"timeout": "⏱️ Run timed out after exceeding time limit"
}
def monitor_runs(response_json: Dict, func: Callable, call_func: bool = False) -> None:
run_ids = [str(run["id"]) for run in response_json.get("runs")]
while run_ids:
url = f'{base_url}/v1/runs-external/bulk/?run_ids={",".join(run_ids)}'
runs_list = requests.get(url, headers=headers).json()
for run in runs_list:
phone_number = run.get("outbound_number")
if run["status"] == "pending" and call_func:
func(phone_number) # Call function immediately
run_ids.remove(str(run["id"]))
if not call_func:
print(f"{status_message.get(run['status'])} - Run ID: {run['id']}, Phone: {phone_number}")
if run["status"] in ["completed", "failed", "timeout"]:
run_ids.remove(str(run["id"]))
if not run_ids:
break
time.sleep(3) # Poll every 3 seconds
if check_agent_inbound(AGENT_ID):
print(f"⚠️ Please change Agent {AGENT_ID} Inbound to False before running this script")
exit(0)
response_json = run_evaluators(SCENARIO_IDS, AGENT_ID)
monitor_runs(response_json, func=call, call_func=True)
monitor_runs(response_json, func=call, call_func=False)