
Latency and Cost Optimization

Performance and Cost Challenges

AI agents face two closely linked challenges: keeping response times low and keeping operational costs under control. At VrealSoft, we've developed a set of optimization strategies that address both.

Key Performance Bottlenecks

LLM Inference Time

The time the model takes to generate a response, which is especially high for large models

Sequential Tool Calls

Each tool adds latency when called in sequence

Context Processing

Time spent processing and organizing large context windows

Network Latency

Delays from API calls and data transfers
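
Before optimizing any of these, it helps to measure where a request's time actually goes. Below is a minimal, generic sketch of per-stage instrumentation; the stage names and the commented-out handler calls are placeholders, not part of the systems described later.

# Minimal per-stage latency instrumentation (illustrative sketch)
import time
from collections import defaultdict
from contextlib import contextmanager

stage_timings = defaultdict(list)

@contextmanager
def timed_stage(name):
    # Record wall-clock time spent in a named pipeline stage
    start = time.perf_counter()
    try:
        yield
    finally:
        stage_timings[name].append(time.perf_counter() - start)

# Hypothetical usage inside a request handler:
# with timed_stage("context_processing"):
#     context = build_context(query)
# with timed_stage("llm_inference"):
#     answer = call_model(context)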

Optimization Strategies Overview

optimization_strategies = {
    "system_architecture": [
        "parallel_processing",
        "asynchronous_workflows",
        "request_batching",
        "edge_deployment"
    ],
    "model_optimizations": [
        "model_distillation",
        "quantization",
        "tiered_model_selection",
        "speculative_execution"
    ],
    "caching_strategies": [
        "response_caching",
        "embedding_caching",
        "tool_result_caching",
        "precomputation"
    ],
    "content_optimization": [
        "context_compression",
        "progressive_generation",
        "streaming_responses",
        "incremental_updates"
    ]
}

Architecture Optimization Techniques

Parallel Processing Implementation

# Example of parallel tool execution
import asyncio

async def execute_parallel_tools(agent_state, tools_to_execute):
    # Create tasks for all tools that can run in parallel
    tasks = []
    for tool_call in tools_to_execute:
        # Check if the tool is safe to run concurrently; tools that are not
        # parallel_safe would be executed sequentially by the caller (not shown here)
        tool_def = get_tool_definition(tool_call["name"])
        if tool_def.get("parallel_safe", False):
            # Schedule the tool call as a concurrent task
            task = asyncio.create_task(
                execute_tool(
                    tool_name=tool_call["name"],
                    parameters=tool_call["parameters"],
                    agent_state=agent_state
                )
            )
            tasks.append((tool_call, task))
    # Wait for all tasks to complete, collecting successes and failures per call
    results = {}
    for tool_call, task in tasks:
        try:
            result = await task
            results[tool_call["id"]] = {
                "status": "success",
                "result": result
            }
        except Exception as e:
            results[tool_call["id"]] = {
                "status": "error",
                "error": str(e)
            }
    return results

Asynchronous Workflow Architecture

  1. Request initialization with immediate acknowledgment
  2. Background processing of time-consuming tasks
  3. Intermediate updates to keep users informed
  4. Final response delivery when complete

Progressive Response

Generating initial responses quickly, then refining them

Background Processing

Handling computationally intensive tasks asynchronously

Intermediate Results

Providing partial responses while work continues

# Progressive response generation system
class ProgressiveResponseGenerator:
    def __init__(self, websocket_connection):
        self.websocket = websocket_connection
        self.response_id = generate_id()
        self.response_parts = []
        self.is_complete = False

    async def start_response(self, query):
        # Send acknowledgment immediately
        await self.websocket.send_json({
            "type": "response_started",
            "response_id": self.response_id,
            "query": query,
            "timestamp": current_time()
        })
        # Generate quick initial response
        initial_response = await generate_initial_response(query)
        # Send initial response
        await self.websocket.send_json({
            "type": "initial_response",
            "response_id": self.response_id,
            "content": initial_response,
            "is_final": False,
            "timestamp": current_time()
        })
        self.response_parts.append(initial_response)
        # Start background processing for complete response
        asyncio.create_task(self.process_full_response(query))
        return self.response_id

    async def process_full_response(self, query):
        try:
            # Start tool identification
            tools_needed = await identify_required_tools(query)
            # Send tool usage update
            await self.websocket.send_json({
                "type": "tool_execution_started",
                "response_id": self.response_id,
                "tools": [t["name"] for t in tools_needed],
                "timestamp": current_time()
            })
            # Execute tools in parallel where possible
            tool_results = await execute_tools_optimally(tools_needed, query)
            # Send update that tool execution is complete
            await self.websocket.send_json({
                "type": "tool_execution_complete",
                "response_id": self.response_id,
                "timestamp": current_time()
            })
            # Generate final response based on tool results
            final_response = await generate_final_response(query, tool_results)
            # Send final response
            await self.websocket.send_json({
                "type": "final_response",
                "response_id": self.response_id,
                "content": final_response,
                "is_final": True,
                "timestamp": current_time()
            })
            self.response_parts.append(final_response)
            self.is_complete = True
        except Exception as e:
            # Handle errors
            await self.websocket.send_json({
                "type": "response_error",
                "response_id": self.response_id,
                "error": str(e),
                "timestamp": current_time()
            })

Model Optimization Techniques

Tiered Model Selection

def select_optimal_model(task):
    # Classify the task type
    task_type = classify_task(task)
    # Define model selection criteria
    selection_criteria = {
        "complexity": assess_complexity(task),
        "creativity_required": assess_creativity_needed(task),
        "reasoning_depth": assess_reasoning_required(task),
        "factual_knowledge": assess_knowledge_required(task),
        "response_length": estimate_response_length(task)
    }
    # Map task types to appropriate models
    model_tiers = {
        "simple_factual": {
            "primary": "small_model",
            "fallback": "medium_model",
            "threshold": {
                "complexity": 3,
                "reasoning_depth": 2
            }
        },
        "creative_generation": {
            "primary": "medium_model",
            "fallback": "large_model",
            "threshold": {
                "creativity_required": 4,
                "response_length": 500
            }
        },
        "complex_reasoning": {
            "primary": "large_model",
            "fallback": None,
            "threshold": None
        },
        "tool_usage": {
            "primary": "medium_model",
            "fallback": "large_model",
            "threshold": {
                "complexity": 4,
                "reasoning_depth": 3
            }
        }
    }
    # Get model tier for this task type
    tier = model_tiers.get(task_type, model_tiers["complex_reasoning"])
    # Check if we need to use fallback model based on thresholds
    if tier["threshold"]:
        for criterion, threshold in tier["threshold"].items():
            if selection_criteria[criterion] > threshold:
                return tier["fallback"] or tier["primary"]
    return tier["primary"]
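
The tier names returned here ("small_model", "medium_model", "large_model") are placeholders; a caller would typically map them to concrete deployments. A hypothetical mapping, with illustrative model identifiers and the deployment types used in the infrastructure configuration later in this article:

# Hypothetical mapping from tier names to concrete deployments
MODEL_REGISTRY = {
    "small_model": {"deployment": "global_edge", "model_id": "distilled-small"},
    "medium_model": {"deployment": "regional", "model_id": "general-medium"},
    "large_model": {"deployment": "centralized", "model_id": "flagship-large"},
}

def resolve_model(task):
    # Pick a tier for the task, then look up where that tier is served
    tier = select_optimal_model(task)
    return MODEL_REGISTRY[tier]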

Caching Implementation

Effective caching can dramatically reduce both latency and costs:

Response Caching

Storing common question/answer pairs

Embedding Caching

Preserving vector representations of frequently accessed content

Tool Result Caching

Storing results from expensive tool calls

Component Caching

Caching intermediate results in multi-step workflows

# Multi-level caching system
class AgentCacheSystem:
    def __init__(self):
        self.response_cache = ResponseCache()
        self.embedding_cache = EmbeddingCache()
        self.tool_result_cache = ToolResultCache()
        self.component_cache = ComponentCache()
        # Tracking and optimization
        self.hit_rates = {
            "response": [],
            "embedding": [],
            "tool_result": [],
            "component": []
        }

    async def get_cached_response(self, query, context_hash):
        """Try to retrieve a full response from cache"""
        cache_key = generate_response_cache_key(query, context_hash)
        # Check cache
        cached = await self.response_cache.get(cache_key)
        if cached:
            # Record hit
            self.hit_rates["response"].append(1)
            return {
                "cache_hit": True,
                "source": "response_cache",
                "result": cached
            }
        else:
            # Record miss
            self.hit_rates["response"].append(0)
            return {"cache_hit": False}

    async def get_cached_tool_result(self, tool_name, params_hash):
        """Try to retrieve a tool execution result from cache"""
        cache_key = f"{tool_name}:{params_hash}"
        # Check if tool results can be cached
        tool_def = get_tool_definition(tool_name)
        if not tool_def.get("cacheable", False):
            return {"cache_hit": False, "reason": "tool_not_cacheable"}
        # Check cache with TTL consideration
        cached = await self.tool_result_cache.get(cache_key)
        if cached:
            # Check if result is still valid
            ttl = tool_def.get("cache_ttl", 3600)  # Default 1 hour
            if (current_time() - cached["timestamp"]) < ttl:
                # Record hit
                self.hit_rates["tool_result"].append(1)
                return {
                    "cache_hit": True,
                    "source": "tool_result_cache",
                    "result": cached["result"]
                }
        # Record miss
        self.hit_rates["tool_result"].append(0)
        return {"cache_hit": False}

    async def store_tool_result(self, tool_name, params_hash, result):
        """Store a tool execution result in cache"""
        tool_def = get_tool_definition(tool_name)
        if not tool_def.get("cacheable", False):
            return
        cache_key = f"{tool_name}:{params_hash}"
        await self.tool_result_cache.set(cache_key, {
            "result": result,
            "timestamp": current_time()
        })

    def get_cache_analytics(self):
        """Get analytics about cache performance"""
        return {
            "hit_rates": {
                # Hit rate over the most recent 1000 lookups for each cache level
                name: sum(samples[-1000:]) / max(len(samples[-1000:]), 1)
                for name, samples in self.hit_rates.items()
            },
            "cache_sizes": {
                "response": self.response_cache.size(),
                "embedding": self.embedding_cache.size(),
                "tool_result": self.tool_result_cache.size(),
                "component": self.component_cache.size()
            },
            "estimated_savings": self.calculate_estimated_savings()
        }
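
The class above wires in an embedding cache but only shows the response and tool-result paths. A minimal sketch of the embedding side, assuming content-addressed keys and a caller-supplied embedding function (both are assumptions, not shown in the original):

# Content-addressed embedding cache (illustrative sketch)
import hashlib

class EmbeddingCache:
    def __init__(self):
        self._store = {}  # a production system would use Redis or a vector store

    def _key(self, text, model_name):
        # Hash the text together with the embedding model name so vectors
        # produced by different models never collide
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        return f"{model_name}:{digest}"

    async def get_or_compute(self, text, model_name, embed_fn):
        # Return a cached vector, or compute it via the supplied embed_fn and store it
        key = self._key(text, model_name)
        if key not in self._store:
            self._store[key] = await embed_fn(text)
        return self._store[key]

    def size(self):
        # Matches the size() call used in get_cache_analytics above
        return len(self._store)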

Token Optimization Techniques

  1. Context compression to reduce token usage while preserving information
  2. Dynamic truncation to keep only the most relevant information
  3. Information prioritization to ensure the most important content is included
  4. Tokenization-aware formatting to minimize tokens used for structure

def compress_context(context_items, max_tokens=4000):
    # Classify items by importance
    critical_items = [item for item in context_items if item["importance"] == "critical"]
    high_items = [item for item in context_items if item["importance"] == "high"]
    medium_items = [item for item in context_items if item["importance"] == "medium"]
    low_items = [item for item in context_items if item["importance"] == "low"]
    # First pass: calculate token counts
    critical_tokens = sum(count_tokens(item["content"]) for item in critical_items)
    high_tokens = sum(count_tokens(item["content"]) for item in high_items)
    medium_tokens = sum(count_tokens(item["content"]) for item in medium_items)
    low_tokens = sum(count_tokens(item["content"]) for item in low_items)
    total_tokens = critical_tokens + high_tokens + medium_tokens + low_tokens
    # If we're already under the limit, return all items unchanged
    if total_tokens <= max_tokens:
        return context_items
    # Calculate how many tokens we need to reduce
    excess_tokens = total_tokens - max_tokens
    # Second pass: apply compression strategies based on importance
    compressed_items = critical_items.copy()  # Start with critical items unchanged
    remaining_tokens = max_tokens - critical_tokens
    # Process high importance items with light compression
    compressed_high = compress_items(high_items,
                                     max_tokens=min(remaining_tokens, high_tokens),
                                     compression_level="light")
    compressed_items.extend(compressed_high)
    remaining_tokens -= sum(count_tokens(item["content"]) for item in compressed_high)
    # Process medium importance items if space remains
    if remaining_tokens > 0:
        compressed_medium = compress_items(medium_items,
                                           max_tokens=min(remaining_tokens, medium_tokens),
                                           compression_level="medium")
        compressed_items.extend(compressed_medium)
        remaining_tokens -= sum(count_tokens(item["content"]) for item in compressed_medium)
    # Process low importance items if space remains
    if remaining_tokens > 0:
        compressed_low = compress_items(low_items,
                                        max_tokens=min(remaining_tokens, low_tokens),
                                        compression_level="high")
        compressed_items.extend(compressed_low)
    return compressed_items
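
compress_context depends on a count_tokens helper that is not shown. One straightforward way to implement it is with a tokenizer matched to the target model; the sketch below assumes the tiktoken library and its cl100k_base encoding, which may not match the production tokenizer.

# Token counting helper assumed by compress_context (sketch using tiktoken)
import tiktoken

_encoding = tiktoken.get_encoding("cl100k_base")  # pick the encoding that matches your model

def count_tokens(text):
    # Count tokens the same way the target model's tokenizer would
    return len(_encoding.encode(text))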

Infrastructure Optimization

Edge Deployment

Deploying models closer to users to reduce network latency

Auto-scaling

Dynamically adjusting resources based on demand

Reserved Instances

Using committed resources for baseline capacity needs

Spot Instances

Leveraging lower-cost resources for non-critical tasks

# Infrastructure management system
infrastructure_strategy = {
    "model_deployment": {
        "small_model": {
            "deployment_type": "global_edge",
            "regions": ["us-east", "us-west", "eu-central", "ap-southeast"],
            "instance_type": "gpu-small",
            "scaling_policy": {
                "min_instances": 2,
                "max_instances": 20,
                "scale_up_threshold": 70,  # CPU utilization %
                "scale_down_threshold": 30
            }
        },
        "medium_model": {
            "deployment_type": "regional",
            "regions": ["us-east", "eu-central", "ap-southeast"],
            "instance_type": "gpu-medium",
            "scaling_policy": {
                "min_instances": 1,
                "max_instances": 10,
                "scale_up_threshold": 60,
                "scale_down_threshold": 20
            }
        },
        "large_model": {
            "deployment_type": "centralized",
            "regions": ["us-east"],
            "instance_type": "gpu-large",
            "scaling_policy": {
                "min_instances": 1,
                "max_instances": 5,
                "scale_up_threshold": 50,
                "scale_down_threshold": 15
            }
        }
    },
    "instance_strategy": {
        "baseline": {
            "type": "reserved_instances",
            "term": "1-year",
            "payment": "partial_upfront"
        },
        "variable": {
            "type": "on_demand"
        },
        "batch_processing": {
            "type": "spot_instances"
        }
    },
    "routing_strategy": {
        "primary": "latency_based",
        "fallback": "round_robin"
    }
}
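
The routing_strategy above names latency-based routing with a round-robin fallback but does not show how requests are actually routed. A minimal sketch of that policy, with hypothetical region health-check URLs:

# Latency-based routing with round-robin fallback (illustrative sketch)
import itertools
import time
import requests

REGION_ENDPOINTS = {
    "us-east": "https://us-east.example.internal/health",
    "eu-central": "https://eu-central.example.internal/health",
    "ap-southeast": "https://ap-southeast.example.internal/health",
}
_fallback_cycle = itertools.cycle(REGION_ENDPOINTS)

def pick_region(timeout=0.5):
    # Probe each region and route to the one with the lowest round-trip time
    latencies = {}
    for region, url in REGION_ENDPOINTS.items():
        try:
            start = time.perf_counter()
            requests.get(url, timeout=timeout)
            latencies[region] = time.perf_counter() - start
        except requests.RequestException:
            continue  # skip regions that are unreachable
    if latencies:
        return min(latencies, key=latencies.get)
    # Fall back to simple round-robin if no probe succeeded
    return next(_fallback_cycle)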

Performance Monitoring and Optimization

from numpy import mean, percentile  # summary statistics helpers

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            "response_times": [],
            "token_usage": [],
            "cache_hits": [],
            "model_usage": {},
            "cost_per_request": []
        }

    def record_request(self, request_data):
        # Record basic metrics
        self.metrics["response_times"].append(request_data["response_time"])
        self.metrics["token_usage"].append(request_data["token_usage"])
        self.metrics["cache_hits"].append(request_data["cache_hit_rate"])
        self.metrics["cost_per_request"].append(request_data["cost"])
        # Track model usage
        model = request_data["model"]
        if model not in self.metrics["model_usage"]:
            self.metrics["model_usage"][model] = 0
        self.metrics["model_usage"][model] += 1

    def get_summary(self, time_period="day"):
        # Get data for the specified time period
        data = self.filter_by_time_period(time_period)
        # Calculate summary statistics
        summary = {
            "response_time": {
                "mean": mean(data["response_times"]),
                "p50": percentile(data["response_times"], 50),
                "p95": percentile(data["response_times"], 95),
                "p99": percentile(data["response_times"], 99)
            },
            "token_usage": {
                "mean": mean(data["token_usage"]),
                "total": sum(data["token_usage"])
            },
            "cache_hit_rate": mean(data["cache_hits"]),
            "model_distribution": {
                model: count / len(data["response_times"])
                for model, count in data["model_usage"].items()
            },
            "cost": {
                "mean": mean(data["cost_per_request"]),
                "total": sum(data["cost_per_request"])
            }
        }
        # Generate optimization suggestions
        summary["optimization_suggestions"] = self.generate_suggestions(summary)
        return summary

    def generate_suggestions(self, summary):
        suggestions = []
        # Check for low cache hit rate
        if summary["cache_hit_rate"] < 0.3:
            suggestions.append({
                "area": "caching",
                "suggestion": "Improve cache hit rate by analyzing common queries",
                "potential_impact": "medium"
            })
        # Check response time
        if summary["response_time"]["p95"] > 2000:  # 2 seconds
            suggestions.append({
                "area": "latency",
                "suggestion": "High p95 response times indicate optimization needed",
                "potential_impact": "high"
            })
        # Check model distribution
        large_model_usage = summary["model_distribution"].get("large_model", 0)
        if large_model_usage > 0.4:  # Using large model for >40% of requests
            suggestions.append({
                "area": "model_selection",
                "suggestion": "High usage of large model. Consider optimizing model selection.",
                "potential_impact": "high"
            })
        return suggestions

Future Optimization Directions

We continue to explore:

  • Hardware-specific optimizations (TPUs, custom ASICs)
  • Advanced model distillation techniques
  • Hybrid approaches combining cached and generated responses
  • Multi-modal optimization strategies
  • Specialized architectures for different agent types