
Agentic LLM

Build applications with ASI:One's agentic models that can autonomously call agents from the Agentverse marketplace and handle complex workflows. These models can discover, coordinate, and execute tasks through a network of specialized agents available on Agentverse.


Overview

ASI:One's agentic models (asi1-agentic, asi1-fast-agentic, asi1-extended-agentic) are designed to automatically discover and coordinate with agents from the Agentverse marketplace to accomplish complex tasks. They handle agent selection, orchestration, and execution planning autonomously by connecting to the vast ecosystem of agents available on Agentverse.

Key Features:

  • Autonomous Agent Discovery: Automatically finds relevant agents from Agentverse marketplace for your tasks
  • Session Persistence: Maintains conversation context across multiple interactions
  • Asynchronous Processing: Handles long-running agent workflows from Agentverse
  • Streaming Support: Real-time response streaming for better UX

Quick Start

import os, uuid, json, requests, sys

API_KEY = os.getenv("ASI_ONE_API_KEY") or "sk-REPLACE_ME"
ENDPOINT = "https://api.asi1.ai/v1/chat/completions"
MODEL = "asi1-fast-agentic"
TIMEOUT = 90 # seconds

# In-memory session management
SESSION_MAP: dict[str, str] = {}

def get_session_id(conv_id: str) -> str:
    """Return existing session UUID for this conversation or create a new one."""
    sid = SESSION_MAP.get(conv_id)
    if sid is None:
        sid = str(uuid.uuid4())
        SESSION_MAP[conv_id] = sid
    return sid

def ask(conv_id: str, messages: list[dict], *, stream: bool = False) -> str:
    """Send the messages list to the agentic model; return the assistant reply."""
    session_id = get_session_id(conv_id)
    print(f"[session] Using session-id: {session_id}")

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "x-session-id": session_id,
        "Content-Type": "application/json",
    }

    payload = {
        "model": MODEL,
        "messages": messages,
        "stream": stream,
    }

    if not stream:
        resp = requests.post(ENDPOINT, headers=headers, json=payload, timeout=TIMEOUT)
        resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"]

    # Streaming implementation: the API returns server-sent events, one "data: ..." line per chunk
    with requests.post(ENDPOINT, headers=headers, json=payload, timeout=TIMEOUT, stream=True) as resp:
        resp.raise_for_status()
        full_text = ""
        for line in resp.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue
            line = line[len("data: "):]
            if line == "[DONE]":
                break
            try:
                chunk = json.loads(line)
                choices = chunk.get("choices")
                if choices and "content" in choices[0].get("delta", {}):
                    token = choices[0]["delta"]["content"]
                    sys.stdout.write(token)
                    sys.stdout.flush()
                    full_text += token
            except json.JSONDecodeError:
                continue
        print()
        return full_text

# Simple usage example - agent will be called from Agentverse marketplace
conv_id = str(uuid.uuid4())
messages = [{"role": "user", "content": "Help me book a restaurant for dinner tonight"}]
reply = ask(conv_id, messages, stream=True)
print(f"Assistant: {reply}")

Session Management

Agentic models require session persistence to maintain context across interactions with agents from the Agentverse marketplace. Always include the x-session-id header:

import uuid

# In-memory mapping from conversation ID to session ID
SESSION_MAP: dict[str, str] = {}

# Create or retrieve the session ID for a conversation
def get_session_id(conversation_id: str) -> str:
    # In production, store this mapping in Redis or a database
    session_id = SESSION_MAP.get(conversation_id)
    if not session_id:
        session_id = str(uuid.uuid4())
        SESSION_MAP[conversation_id] = session_id
    return session_id

# Include the session ID in every request
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "x-session-id": get_session_id("user_123_chat"),
    "Content-Type": "application/json",
}
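
For production use, the same interface can sit on top of a shared store. Below is a minimal sketch of a Redis-backed get_session_id, assuming the redis-py client and a Redis server at localhost:6379; the key prefix and the 24-hour TTL are arbitrary choices for illustration.

import uuid
import redis  # assumption: the redis-py package is installed

# Shared session store (sketch); decode_responses returns str instead of bytes
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_session_id(conversation_id: str) -> str:
    key = f"asi1:session:{conversation_id}"  # illustrative key prefix
    session_id = r.get(key)
    if session_id is None:
        session_id = str(uuid.uuid4())
        r.set(key, session_id, ex=60 * 60 * 24)  # arbitrary 24-hour TTL for the sketch
    return session_id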

Asynchronous Agent Processing

When agents from the Agentverse marketplace need time to complete a task, the model may send a deferred response. Poll for updates:

import time

def poll_for_async_reply(conv_id: str, history: list[dict], *, wait_sec: int = 10, max_attempts: int = 30):
    """Poll the conversation every `wait_sec` seconds for a deferred reply."""
    for attempt in range(max_attempts):
        time.sleep(wait_sec)
        print(f"šŸ”„ polling (attempt {attempt + 1}) …", flush=True)

        reply = ask(conv_id, history, stream=False)
        if reply and "no new message" not in reply.lower():
            return reply
    return None

# Usage after getting an "I've sent the message" response from an Agentverse agent
if "I've sent the message" in assistant_reply:
    follow_up = poll_for_async_reply(conv_id, conversation_history)
    if follow_up:
        print(f"Agentverse agent completed task: {follow_up}")
        conversation_history.append({"role": "assistant", "content": follow_up})
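
Note that this example relies on literal phrases in the model's replies: "I've sent the message" signals that an Agentverse agent is still working, and "no new message" means the deferred reply has not arrived yet. Treat these string checks as illustrative heuristics for your own prompts, and cap max_attempts so polling eventually gives up.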

Available Agentic Models

Model                  | Best For                            | Latency    | Context Window
asi1-agentic           | General orchestration & prototyping | Medium     | 32K tokens
asi1-fast-agentic      | Real-time agent coordination        | Ultra-fast | 24K tokens
asi1-extended-agentic  | Complex multi-stage workflows       | Slower     | 64K tokens
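
If you want to select a model programmatically, a small helper like the sketch below (not part of the API, just an illustration) can map rough task requirements to the model names in the table:

def pick_agentic_model(needs_long_context: bool = False, latency_sensitive: bool = False) -> str:
    """Illustrative helper mapping task requirements to a model from the table above."""
    if needs_long_context:
        return "asi1-extended-agentic"  # complex multi-stage workflows, 64K tokens
    if latency_sensitive:
        return "asi1-fast-agentic"      # real-time coordination, 24K tokens
    return "asi1-agentic"               # general orchestration & prototyping, 32K tokens

MODEL = pick_agentic_model(latency_sensitive=True)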

Complete Interactive Example

import os, uuid, json, requests, sys, time

API_KEY = os.getenv("ASI_ONE_API_KEY") or "sk-REPLACE_ME"
ENDPOINT = "https://api.asi1.ai/v1/chat/completions"
MODEL = "asi1-fast-agentic"
TIMEOUT = 90

SESSION_MAP: dict[str, str] = {}

def get_session_id(conv_id: str) -> str:
    sid = SESSION_MAP.get(conv_id)
    if sid is None:
        sid = str(uuid.uuid4())
        SESSION_MAP[conv_id] = sid
    return sid

def _post(payload: dict, session_id: str, stream: bool = False):
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "x-session-id": session_id,
        "Content-Type": "application/json",
    }
    return requests.post(ENDPOINT, headers=headers, json=payload, timeout=TIMEOUT, stream=stream)

def ask(conv_id: str, messages: list[dict], *, stream: bool = False) -> str:
    session_id = get_session_id(conv_id)
    print(f"[session] Using session-id: {session_id}")

    payload = {
        "model": MODEL,
        "messages": messages,
        "stream": stream,
    }

    if not stream:
        resp = _post(payload, session_id)
        resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"]

    # Streaming
    with _post(payload, session_id, stream=True) as resp:
        resp.raise_for_status()
        full_text = ""
        for line in resp.iter_lines(decode_unicode=True):
            if not line:
                continue
            if line.startswith("data: "):
                line = line[len("data: ") :]
            if line == "[DONE]":
                break
            try:
                chunk = json.loads(line)
            except json.JSONDecodeError:
                continue

            choices = chunk.get("choices")
            if not choices:
                continue

            delta = choices[0].get("delta", {})
            if "content" in delta:
                token = delta["content"]
                sys.stdout.write(token)
                sys.stdout.flush()
                full_text += token
        print()
        return full_text

def poll_for_async_reply(conv_id: str, history: list[dict], *, wait_sec: int = 10, max_attempts: int = 30):
    for attempt in range(max_attempts):
        time.sleep(wait_sec)
        print(f"\nšŸ”„ polling (attempt {attempt + 1}) …", flush=True)
        reply = ask(conv_id, history, stream=False)
        if reply and "no new message" not in reply.lower():
            return reply
    return None

# Interactive CLI
if __name__ == "__main__":
    conv_id = str(uuid.uuid4())
    history: list[dict] = []

    print("Agentic LLM demo. Type Ctrl+C to exit.\n")
    try:
        while True:
            user_input = input("you > ").strip()
            if not user_input:
                continue

            history.append({"role": "user", "content": user_input})
            reply = ask(conv_id, history, stream=True)
            history.append({"role": "assistant", "content": reply})

            if "I've sent the message" in reply:
                follow = poll_for_async_reply(conv_id, history)
                if follow:
                    print(f"\n[Agentverse agent reply]\n{follow}")
                    history.append({"role": "assistant", "content": follow})
    except KeyboardInterrupt:
        print("\nBye!")

Best Practices

Session Management

  • Use UUIDs for session IDs to avoid collisions
  • Store session mappings in Redis or database for production
  • Include x-session-id header in every request to maintain context

Error Handling

  • Implement timeouts for long-running agent tasks
  • Handle network failures with exponential backoff (see the retry sketch after this list)
  • Validate responses before processing agent results
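
As a concrete example of the backoff advice above, here is a minimal retry wrapper around the ask() helper from the earlier examples; the function name, retry count, and delays are arbitrary choices for the sketch.

import time
import requests

def ask_with_retry(conv_id: str, messages: list[dict], *, max_retries: int = 4) -> str:
    """Sketch: retry transient network failures with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return ask(conv_id, messages, stream=False)
        except (requests.ConnectionError, requests.Timeout) as exc:
            if attempt == max_retries - 1:
                raise  # give up after the last attempt
            delay = 2 ** attempt  # 1s, 2s, 4s, ...
            print(f"Request failed ({exc}); retrying in {delay}s")
            time.sleep(delay)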

Performance Optimization

  • Use streaming for better user experience
  • Choose appropriate model based on complexity needs
  • Implement async polling for deferred agent responses

Agent Coordination

  • Be specific in requests to help agent discovery on the Agentverse marketplace
  • Allow time for complex multi-agent workflows involving Agentverse agents
  • Monitor session state to understand Agentverse agent progress