Agentic LLM
Build applications with ASI:One's agentic models that can autonomously call agents from the Agentverse marketplace and handle complex workflows. These models can discover, coordinate, and execute tasks through a network of specialized agents available on Agentverse.
Overview
ASI:One's agentic models (asi1-agentic, asi1-fast-agentic, asi1-extended-agentic) are designed to automatically discover and coordinate with agents from the Agentverse marketplace to accomplish complex tasks. They handle agent selection, orchestration, and execution planning autonomously by connecting to the vast ecosystem of agents available on Agentverse.
Key Features:
- Autonomous Agent Discovery: Automatically finds relevant agents from Agentverse marketplace for your tasks
- Session Persistence: Maintains conversation context across multiple interactions
- Asynchronous Processing: Handles long-running agent workflows from Agentverse
- Streaming Support: Real-time response streaming for better UX
Quick Start
- Python
- cURL
- JavaScript
import os, uuid, json, requests, sys, readline
import time
# NOTE(review): `readline` appears unused in this snippet — presumably imported
# for nicer input() line editing; confirm it is intentional (it is POSIX-only).
# Configuration: read the API key from the environment; never commit real keys.
API_KEY = os.getenv("ASI_ONE_API_KEY") or "sk-REPLACE_ME"
ENDPOINT = "https://api.asi1.ai/v1/chat/completions"
MODEL = "asi1-fast-agentic"
TIMEOUT = 90 # seconds
# In-memory session management (use Redis/a database in production)
SESSION_MAP: dict[str, str] = {}

def get_session_id(conv_id: str) -> str:
    """Return the session UUID bound to *conv_id*, minting one on first use."""
    try:
        return SESSION_MAP[conv_id]
    except KeyError:
        fresh = str(uuid.uuid4())
        SESSION_MAP[conv_id] = fresh
        return fresh
def ask(conv_id: str, messages: list[dict], *, stream: bool = False) -> str:
    """Send the messages list to the agentic model and return the assistant reply.

    With stream=True the reply is echoed token-by-token to stdout as it
    arrives, and the concatenated text is returned at the end.
    """
    session_id = get_session_id(conv_id)
    print(f"[session] Using session-id: {session_id}")
    request_headers = {
        "Authorization": f"Bearer {API_KEY}",
        "x-session-id": session_id,
        "Content-Type": "application/json",
    }
    request_body = {"model": MODEL, "messages": messages, "stream": stream}

    if not stream:
        response = requests.post(ENDPOINT, headers=request_headers, json=request_body, timeout=TIMEOUT)
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

    # Streaming: walk the SSE feed and collect the delta tokens.
    collected: list[str] = []
    with requests.post(ENDPOINT, headers=request_headers, json=request_body, timeout=TIMEOUT, stream=True) as response:
        response.raise_for_status()
        for raw in response.iter_lines(decode_unicode=True):
            if not raw or not raw.startswith("data: "):
                continue  # ignore keep-alives and non-data lines
            data = raw[6:]  # strip the "data: " prefix
            if data == "[DONE]":
                break
            try:
                event = json.loads(data)
            except json.JSONDecodeError:
                continue  # skip malformed chunks
            choice_list = event.get("choices")
            if not choice_list:
                continue
            delta = choice_list[0].get("delta", {})
            if "content" not in delta:
                continue
            token = delta["content"]
            sys.stdout.write(token)
            sys.stdout.flush()
            collected.append(token)
    print()
    return "".join(collected)
# Simple usage example - agent will be called from Agentverse marketplace
# NOTE(review): this runs at import time; wrap in `if __name__ == "__main__":`
# when adapting for real code.
conv_id = str(uuid.uuid4())
messages = [{"role": "user", "content": "Help me book a restaurant for dinner tonight"}]
reply = ask(conv_id, messages, stream=True)
print(f"Assistant: {reply}")
# Generate a session ID
SESSION_ID=$(uuidgen)
# Non-streaming request; reuse the same x-session-id across calls to keep
# the agent conversation context.
curl -X POST https://api.asi1.ai/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $ASI_ONE_API_KEY" \
  -H "x-session-id: $SESSION_ID" \
  -d '{
    "model": "asi1-fast-agentic",
    "messages": [
      {"role": "user", "content": "Help me book a restaurant for dinner tonight"}
    ],
    "stream": false
  }'
import { v4 as uuidv4 } from 'uuid';
// Configuration: the API key comes from the environment; never hard-code it.
const API_KEY = process.env.ASI_ONE_API_KEY;
const ENDPOINT = "https://api.asi1.ai/v1/chat/completions";
const MODEL = "asi1-fast-agentic";
// Session management (use Redis/a database in production)
const sessionMap = new Map();

function getSessionId(convId) {
  // Reuse the session bound to this conversation; mint a fresh UUID otherwise.
  if (!sessionMap.has(convId)) {
    sessionMap.set(convId, uuidv4());
  }
  return sessionMap.get(convId);
}
/**
 * Send the conversation to the agentic model and return the assistant reply.
 * With stream=true, tokens are echoed to stdout as they arrive and the
 * concatenated text is returned when the stream ends.
 *
 * Fixes vs. the naive version:
 *  - fetch() does not reject on HTTP errors, so check response.ok explicitly
 *    (parity with the Python example's raise_for_status()).
 *  - SSE lines can be split across network chunks; decode with {stream: true}
 *    and buffer the trailing partial line instead of dropping it.
 */
async function ask(convId, messages, stream = false) {
  const sessionId = getSessionId(convId);
  console.log(`[session] Using session-id: ${sessionId}`);
  const response = await fetch(ENDPOINT, {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${API_KEY}`,
      'x-session-id': sessionId,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      model: MODEL,
      messages: messages,
      stream: stream
    })
  });
  if (!response.ok) {
    throw new Error(`Request failed: ${response.status} ${response.statusText}`);
  }
  if (!stream) {
    const result = await response.json();
    return result.choices[0].message.content;
  }
  // Handle streaming response
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let fullText = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop(); // keep the trailing partial line for the next chunk
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6);
      if (data === '[DONE]') return fullText;
      try {
        const parsed = JSON.parse(data);
        const content = parsed.choices?.[0]?.delta?.content;
        if (content) {
          process.stdout.write(content);
          fullText += content;
        }
      } catch (e) {
        // Skip malformed JSON
      }
    }
  }
  console.log();
  return fullText;
}
// Usage example - agent will be called from Agentverse marketplace
// NOTE: top-level await requires an ES-module context.
const convId = uuidv4();
const messages = [{"role": "user", "content": "Help me book a restaurant for dinner tonight"}];
const reply = await ask(convId, messages, true);
console.log(`Assistant: ${reply}`);
Session Management
Agentic models require session persistence to maintain context across agent interactions with the Agentverse marketplace. Always include the x-session-id header:
import uuid
# Create or retrieve session ID for conversation
# NOTE(review): SESSION_MAP is assumed to be a module-level dict defined
# elsewhere (see the Quick Start example) - confirm when adapting.
def get_session_id(conversation_id: str) -> str:
    # In production, store this in Redis or database
    session_id = SESSION_MAP.get(conversation_id)
    if not session_id:
        session_id = str(uuid.uuid4())
        SESSION_MAP[conversation_id] = session_id
    return session_id
# Include in every request
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "x-session-id": get_session_id("user_123_chat"),
    "Content-Type": "application/json"
}
Asynchronous Agent Processing
When agents from Agentverse marketplace need time to complete tasks, the model may send a deferred response. Poll for updates:
- Python
- JavaScript
def poll_for_async_reply(conv_id: str, history: list[dict], *, wait_sec: int = 10, max_attempts: int = 30):
    """Poll the conversation every `wait_sec` seconds for a deferred reply.

    Returns the agent's follow-up text, or None if nothing new arrived
    within `max_attempts` polls.
    """
    for attempt in range(max_attempts):
        time.sleep(wait_sec)
        # Fix: the original log text contained mojibake-garbled characters;
        # restored to plain ASCII.
        print(f"polling (attempt {attempt + 1}) ...", flush=True)
        reply = ask(conv_id, history, stream=False)
        # Treat "no new message"-style answers as "still waiting".
        if reply and "no new message" not in reply.lower():
            return reply
    return None
# Usage after getting "I've sent the message" response from Agentverse agent
# NOTE(review): assumes `assistant_reply`, `conv_id` and `conversation_history`
# were produced by an earlier ask() call - see the Quick Start example.
if "I've sent the message" in assistant_reply:
    follow_up = poll_for_async_reply(conv_id, conversation_history)
    if follow_up:
        print(f"Agentverse agent completed task: {follow_up}")
        conversation_history.append({"role": "assistant", "content": follow_up})
/**
 * Poll the conversation for a deferred agent reply, waiting waitSec seconds
 * between attempts. Returns the follow-up text, or null after maxAttempts
 * polls with nothing new.
 *
 * Fix: the original log text contained mojibake-garbled characters; restored
 * to plain ASCII.
 */
async function pollForAsyncReply(convId, history, waitSec = 10, maxAttempts = 30) {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    await new Promise(resolve => setTimeout(resolve, waitSec * 1000));
    console.log(`polling (attempt ${attempt + 1}) ...`);
    const reply = await ask(convId, history, false);
    // "no new message"-style answers mean the agent is still working.
    if (reply && !reply.toLowerCase().includes("no new message")) {
      return reply;
    }
  }
  return null;
}
// Usage
// NOTE(review): assumes `assistantReply`, `convId` and `conversationHistory`
// come from an earlier ask() call - see the Quick Start example.
if (assistantReply.includes("I've sent the message")) {
  const followUp = await pollForAsyncReply(convId, conversationHistory);
  if (followUp) {
    console.log(`Agentverse agent completed task: ${followUp}`);
    conversationHistory.push({"role": "assistant", "content": followUp});
  }
}
Available Agentic Models
Model | Best For | Latency | Context Window |
---|---|---|---|
asi1-agentic | General orchestration & prototyping | Medium | 32K tokens |
asi1-fast-agentic | Real-time agent coordination | Ultra-fast | 24K tokens |
asi1-extended-agentic | Complex multi-stage workflows | Slower | 64K tokens |
Complete Interactive Example
- Python CLI
import os, uuid, json, requests, sys, time
# Configuration: the key comes from the environment; the placeholder is for docs only.
API_KEY = os.getenv("ASI_ONE_API_KEY") or "sk-REPLACE_ME"
ENDPOINT = "https://api.asi1.ai/v1/chat/completions"
MODEL = "asi1-fast-agentic"
TIMEOUT = 90  # request timeout in seconds
SESSION_MAP: dict[str, str] = {}

def get_session_id(conv_id: str) -> str:
    """Return the stable session UUID for this conversation, minting one on first use."""
    if conv_id not in SESSION_MAP:
        SESSION_MAP[conv_id] = str(uuid.uuid4())
    return SESSION_MAP[conv_id]
def _post(payload: dict, session_id: str, stream: bool = False):
    """POST *payload* to the chat-completions endpoint with auth + session headers.

    Returns the raw `requests.Response`; callers decide between JSON and
    streaming consumption.
    """
    request_headers = {
        "Authorization": f"Bearer {API_KEY}",
        "x-session-id": session_id,
        "Content-Type": "application/json",
    }
    return requests.post(
        ENDPOINT,
        headers=request_headers,
        json=payload,
        timeout=TIMEOUT,
        stream=stream,
    )
def ask(conv_id: str, messages: list[dict], *, stream: bool = False) -> str:
    """Send the conversation to the agentic model and return the assistant reply.

    With stream=True, tokens are echoed to stdout as they arrive and the
    concatenated text is returned once the stream finishes.
    """
    session_id = get_session_id(conv_id)
    print(f"[session] Using session-id: {session_id}")
    request_body = {"model": MODEL, "messages": messages, "stream": stream}

    if not stream:
        response = _post(request_body, session_id)
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

    # Streaming: consume the SSE feed line by line.
    pieces: list[str] = []
    with _post(request_body, session_id, stream=True) as response:
        response.raise_for_status()
        for raw in response.iter_lines(decode_unicode=True):
            if not raw:
                continue
            # Strip the SSE prefix when present; this variant still inspects
            # un-prefixed lines, matching the original behavior.
            data = raw[6:] if raw.startswith("data: ") else raw
            if data == "[DONE]":
                break
            try:
                event = json.loads(data)
            except json.JSONDecodeError:
                continue  # skip malformed chunks
            choice_list = event.get("choices")
            if not choice_list:
                continue
            delta = choice_list[0].get("delta", {})
            if "content" not in delta:
                continue
            token = delta["content"]
            sys.stdout.write(token)
            sys.stdout.flush()
            pieces.append(token)
        print()
        return "".join(pieces)
def poll_for_async_reply(conv_id: str, history: list[dict], *, wait_sec: int = 10, max_attempts: int = 30):
    """Poll every `wait_sec` seconds for a deferred agent reply.

    Returns the follow-up text, or None after `max_attempts` empty polls.
    """
    for attempt in range(max_attempts):
        time.sleep(wait_sec)
        # Fix: the original log text contained mojibake-garbled characters;
        # restored to plain ASCII.
        print(f"\npolling (attempt {attempt + 1}) ...", flush=True)
        reply = ask(conv_id, history, stream=False)
        # "no new message"-style answers mean the agent is still working.
        if reply and "no new message" not in reply.lower():
            return reply
    return None
# Interactive CLI: read user turns from stdin, stream replies, and poll for
# deferred Agentverse results. Exit with Ctrl+C.
if __name__ == "__main__":
    conv_id = str(uuid.uuid4())
    history: list[dict] = []
    print("Agentic LLM demo. Type Ctrl+C to exit.\n")
    try:
        while True:
            user_input = input("you > ").strip()
            if not user_input:
                continue  # ignore empty lines
            history.append({"role": "user", "content": user_input})
            # Stream the assistant reply token-by-token to stdout.
            reply = ask(conv_id, history, stream=True)
            history.append({"role": "assistant", "content": reply})
            # Heuristic: this phrase signals a deferred Agentverse task,
            # so switch to polling for the follow-up.
            if "I've sent the message" in reply:
                follow = poll_for_async_reply(conv_id, history)
                if follow:
                    print(f"\n[Agentverse agent reply]\n{follow}")
                    history.append({"role": "assistant", "content": follow})
    except KeyboardInterrupt:
        print("\nBye!")
Best Practices
Session Management
- Use UUIDs for session IDs to avoid collisions
- Store session mappings in Redis or database for production
- Include x-session-id header in every request to maintain context
Error Handling
- Implement timeouts for long-running agent tasks
- Handle network failures with exponential backoff
- Validate responses before processing agent results
Performance Optimization
- Use streaming for better user experience
- Choose appropriate model based on complexity needs
- Implement async polling for deferred agent responses
Agent Coordination
- Be specific in requests to help agent discovery from Agentverse marketplace
- Allow time for complex multi-agent workflows involving Agentverse agents
- Monitor session state to understand Agentverse agent progress