Prompt Injection
Attempts to manipulate agent behavior through carefully crafted inputs
AI agents with increasing capabilities introduce new security and safety challenges. At VrealSoft, we’ve developed a comprehensive approach to address these concerns.
Prompt Injection
Attempts to manipulate agent behavior through carefully crafted inputs
Data Exfiltration
Unauthorized access to or extraction of sensitive information
Tool Misuse
Exploitation of tool access for unintended purposes
Authorization Bypassing
Circumventing established permission models
Prompt injection attacks attempt to override or manipulate the agent’s instructions by including malicious content in user inputs.
def sanitize_user_input(user_input): # Check for common prompt injection patterns injection_patterns = [ r"ignore previous instructions", r"disregard your guidelines", r"your new instructions are", r"you are now [a-zA-Z]+" ]
for pattern in injection_patterns: if re.search(pattern, user_input, re.IGNORECASE): # Flag potential injection attempt return { "sanitized_input": re.sub(pattern, "[FILTERED]", user_input), "risk_level": "high", "flagged_pattern": pattern }
# Check for suspicious formatting attempts formatting_patterns = [ r"```system", r"<(?:instructions|prompt|system|assistant)>", r"//\s*system" ]
for pattern in formatting_patterns: if re.search(pattern, user_input, re.IGNORECASE): return { "sanitized_input": re.sub(pattern, "[FILTERED]", user_input), "risk_level": "medium", "flagged_pattern": pattern }
# Input appears safe return { "sanitized_input": user_input, "risk_level": "low", "flagged_pattern": None }def process_user_message(user_message, conversation_history): # Sanitize the user input sanitization_result = sanitize_user_input(user_message)
# Determine if additional instruction reinforcement is needed reinforcement_needed = ( sanitization_result["risk_level"] in ["medium", "high"] or is_conversation_drifting(conversation_history) or len(conversation_history) > 10 # Periodic reinforcement )
# Construct the prompt with appropriate reinforcement if reinforcement_needed: prompt = construct_reinforced_prompt( user_message=sanitization_result["sanitized_input"], history=conversation_history, reinforcement_level= "high" if sanitization_result["risk_level"] == "high" else "medium" ) else: prompt = construct_standard_prompt( user_message=sanitization_result["sanitized_input"], history=conversation_history )
# Log any security concerns if sanitization_result["risk_level"] != "low": log_security_event({ "event_type": "potential_prompt_injection", "risk_level": sanitization_result["risk_level"], "pattern": sanitization_result["flagged_pattern"], "action_taken": "applied instruction reinforcement" })
return promptOur permission architecture includes:
Granular Access Control
Fine-grained permissions for specific tools and operations
Context-aware Permissions
Adjusting access based on conversation context and user intent
Escalation Procedures
Processes for requesting and granting elevated permissions
Audit Logging
Comprehensive logging of all permission decisions
# Example permission system implementationclass AgentPermissionSystem: def __init__(self, agent_id, permission_store): self.agent_id = agent_id self.permission_store = permission_store self.current_permissions = set() self.active_session = None self.audit_log = []
def start_session(self, user_id, session_context): # Initialize a new session with base permissions self.active_session = { "session_id": generate_session_id(), "user_id": user_id, "start_time": current_time(), "context": session_context }
# Apply base permissions for this agent and user self.current_permissions = self.permission_store.get_base_permissions( self.agent_id, user_id )
# Log session start self.log_event("session_started", { "permissions": list(self.current_permissions) })
return self.active_session["session_id"]
def check_permission(self, action, resource, additional_context=None): if not self.active_session: return {"allowed": False, "reason": "no_active_session"}
permission_key = f"{action}:{resource}"
# Direct permission check if permission_key in self.current_permissions: self.log_event("permission_granted", { "action": action, "resource": resource, "method": "direct" }) return {"allowed": True, "method": "direct"}
# Check for pattern-based permissions for pattern in self.current_permissions: if pattern.endswith("*") and permission_key.startswith(pattern[:-1]): self.log_event("permission_granted", { "action": action, "resource": resource, "method": "pattern", "pattern": pattern }) return {"allowed": True, "method": "pattern", "pattern": pattern}
# Permission denied self.log_event("permission_denied", { "action": action, "resource": resource })
return {"allowed": False, "reason": "not_permitted"}
def request_elevated_permission(self, action, resource, justification): # Request temporary elevation of permissions request_id = generate_request_id()
self.log_event("elevation_requested", { "request_id": request_id, "action": action, "resource": resource, "justification": justification })
# In a real system, this might trigger an approval workflow # For this example, we'll simulate an automatic approval for certain cases auto_approved = self.permission_store.can_auto_approve( self.agent_id, self.active_session["user_id"], action, resource, justification )
if auto_approved: # Grant temporary permission temp_permission = f"{action}:{resource}" self.current_permissions.add(temp_permission)
self.log_event("elevation_approved", { "request_id": request_id, "action": action, "resource": resource, "approval_method": "automatic", "expiration": current_time() + 3600 # 1 hour expiration })
# Schedule permission removal schedule_permission_removal(self, temp_permission, 3600)
return {"approved": True, "method": "automatic", "expiration": 3600} else: # Would typically wait for human approval here return {"approved": False, "method": "requires_human_approval", "request_id": request_id}
def log_event(self, event_type, details): log_entry = { "timestamp": current_time(), "agent_id": self.agent_id, "session_id": self.active_session["session_id"] if self.active_session else None, "user_id": self.active_session["user_id"] if self.active_session else None, "event_type": event_type, "details": details }
self.audit_log.append(log_entry) self.permission_store.store_audit_log(log_entry)Our sandboxing approach includes:
sandbox_config = { "isolation": { "process": True, # Run in separate process "network": True, # Network namespace isolation "filesystem": True # File system isolation }, "resources": { "memory_limit_mb": 256, "cpu_limit_percent": 50, "disk_quota_mb": 100, "network_rate_limit_kbps": 1024 }, "timeouts": { "execution_timeout_sec": 30, "idle_timeout_sec": 300 }, "filesystem": { "read_only_paths": ["/lib", "/usr/lib", "/etc/ssl"], "writable_paths": ["/tmp/sandbox"], "hidden_paths": ["/etc/passwd", "/etc/shadow", "/.env"] }, "network": { "allowed_domains": ["api.company.com", "data.company.com"], "allowed_ports": [443, 8443], "blocked_domains": ["malicious.example.com"] }}def execute_tool_in_sandbox(tool_name, parameters, user_context): # Get tool definition tool = get_tool_definition(tool_name)
# Check if tool is permitted for this user/context permission_check = permission_system.check_permission( action="execute", resource=f"tool:{tool_name}", additional_context=user_context )
if not permission_check["allowed"]: return { "status": "error", "error_type": "permission_denied", "message": f"Not permitted to use {tool_name}" }
# Determine sandbox level based on tool risk sandbox_level = tool.get("risk_level", "medium") sandbox_settings = get_sandbox_config(sandbox_level)
# Prepare sandbox environment sandbox = create_sandbox(sandbox_settings)
try: # Start execution with timeout with execution_timeout(sandbox_settings["timeouts"]["execution_timeout_sec"]): # Load tool code into sandbox sandbox.load_code(tool["implementation"])
# Validate parameters before execution validated_params = validate_parameters(tool, parameters)
# Execute in sandbox result = sandbox.execute("main", validated_params)
# Validate output before returning sanitized_result = sanitize_tool_output(result, tool)
return { "status": "success", "result": sanitized_result } except TimeoutError: return { "status": "error", "error_type": "timeout", "message": "Tool execution exceeded time limit" } except SandboxViolation as e: # Log security event log_security_event({ "event_type": "sandbox_violation", "tool": tool_name, "violation": str(e), "user_context": user_context })
return { "status": "error", "error_type": "security_violation", "message": "Security constraint violated" } finally: # Ensure sandbox is destroyed sandbox.destroy()Continuous monitoring is essential for detecting potential security issues:
Behavioral Baselines
Establishing normal patterns of agent behavior
Real-time Monitoring
Watching for anomalous actions or responses
Intervention Mechanisms
Systems for interrupting problematic behavior
Incident Response
Procedures for addressing detected security events
# Simplified monitoring systemclass AgentMonitor: def __init__(self, agent_id): self.agent_id = agent_id self.behavior_profiles = load_behavior_profiles(agent_id) self.current_session = None self.session_events = [] self.anomaly_thresholds = load_thresholds(agent_id)
def start_session(self, user_id, session_context): self.current_session = { "session_id": generate_session_id(), "user_id": user_id, "start_time": current_time(), "context": session_context } self.session_events = []
return self.current_session["session_id"]
def record_event(self, event_type, event_data): event = { "timestamp": current_time(), "type": event_type, "data": event_data }
self.session_events.append(event)
# Check for anomalies anomalies = self.detect_anomalies(event)
if anomalies: self.handle_anomalies(event, anomalies)
return event
def detect_anomalies(self, event): anomalies = []
# Check against different anomaly detectors for detector_name, detector in self.get_detectors(event["type"]).items(): result = detector.check(event, self.session_events, self.behavior_profiles)
if result["is_anomaly"]: anomalies.append({ "detector": detector_name, "score": result["score"], "reason": result["reason"], "severity": result["severity"] })
return anomalies
def handle_anomalies(self, event, anomalies): # Log all anomalies for anomaly in anomalies: log_security_event({ "event_type": "anomaly_detected", "agent_id": self.agent_id, "session_id": self.current_session["session_id"], "detector": anomaly["detector"], "severity": anomaly["severity"], "reason": anomaly["reason"], "event": event })
# Take action based on severity if anomaly["severity"] == "critical": # Terminate session immediately self.terminate_session("critical_anomaly_detected")
# Alert security team alert_security_team({ "agent_id": self.agent_id, "session_id": self.current_session["session_id"], "anomaly": anomaly, "event": event })
elif anomaly["severity"] == "high": # Apply restrictions to the session apply_session_restrictions(self.current_session["session_id"], { "reason": "high_severity_anomaly", "restrictions": ["limit_sensitive_tools", "increase_monitoring"] })
def terminate_session(self, reason): self.current_session["end_time"] = current_time() self.current_session["end_reason"] = reason
# Store session data for analysis store_session_data(self.current_session, self.session_events)
# Clear current session self.current_session = None self.session_events = []We continue to explore: