Skip to content

Production Guide

Everything you need to know about running FlowStack agents in production environments.

Production Architecture

When you deploy an agent, FlowStack creates a robust, scalable architecture:

graph TB
    User[Users] --> CDN[Global CDN]
    CDN --> Gateway[API Gateway]
    Gateway --> LB[Load Balancer]
    LB --> Function1[Function Instance 1]
    LB --> Function2[Function Instance 2] 
    LB --> Function3[Function Instance N]
    Function1 --> DataVault[(DataVault)]
    Function2 --> DataVault
    Function3 --> DataVault
    Function1 --> AI[AI Providers]
    Function2 --> AI
    Function3 --> AI

What's Included in Production

Automatic Scaling

Your agent handles traffic spikes without configuration:

  • 0 to 1000+ concurrent requests - Automatic scaling
  • Global distribution - Served from multiple regions
  • Load balancing - Intelligent request routing
  • Cold start optimization - Minimal startup latency

Enterprise-Grade Security

Production deployments include comprehensive security:

  • API key authentication - Secure access control
  • Request isolation - Each request runs independently
  • Rate limiting - Automatic abuse protection
  • HTTPS encryption - All traffic encrypted
  • Network isolation - Secure runtime environment

Built-in Monitoring

Comprehensive observability out of the box:

  • Real-time metrics - Response times, error rates, throughput
  • Error tracking - Automatic error capture and alerting
  • Usage analytics - Session tracking and billing metrics
  • Health monitoring - Automatic health checks and recovery

Data Persistence

DataVault provides enterprise-grade data storage:

  • MongoDB-backed - Reliable, scalable database
  • Automatic backups - Data protection and recovery
  • Encryption at rest - Secure data storage
  • Namespaced isolation - Your data stays separate

Production Checklist

Before going live, ensure your agent is production-ready:

✅ Code Quality

  • Error handling - All tools handle errors gracefully
  • Input validation - Validate all user inputs
  • Type hints - Use proper type annotations
  • Docstrings - Document all tools and functions
  • Testing - Test tools thoroughly before deployment

✅ Performance

  • Response time - Tools complete within reasonable time
  • Memory usage - Efficient memory management
  • Caching - Cache expensive operations
  • Model selection - Use appropriate models for tasks
  • Timeout handling - Handle slow operations gracefully

✅ Security

  • API key protection - Never expose API keys in code
  • Input sanitization - Clean user inputs
  • Data validation - Validate all data before processing
  • Error messages - Don't expose sensitive information
  • Access control - Implement proper authorization

✅ Monitoring

  • Health checks - Verify agent responds correctly
  • Usage tracking - Monitor session consumption
  • Error monitoring - Track and handle errors
  • Performance monitoring - Watch response times
  • Alerting setup - Get notified of issues

Performance Optimization

Response Time Optimization

from flowstack import Agent, Models

agent = Agent("optimized-agent", api_key="fs_...")

@agent.tool
def fast_operation(query: str) -> dict:
    """Optimized for speed"""

    # Check cache first
    cache_key = f"fast_{hash(query)}"
    cached = agent.vault.retrieve('cache', key=cache_key)

    if cached and not is_expired(cached):
        return {"result": cached['data'], "cached": True}

    # Use fastest model for simple queries
    if is_simple_query(query):
        agent.set_model(Models.CLAUDE_35_HAIKU)  # Fastest Claude model

    # Process and cache result
    result = process_query(query)
    agent.vault.store('cache', {
        'data': result,
        'expires_at': get_expiry_time()
    }, key=cache_key)

    return {"result": result, "cached": False}

def is_simple_query(query: str) -> bool:
    """Determine if query is simple enough for fast model"""
    simple_keywords = ['hello', 'help', 'what', 'how', 'basic']
    return any(keyword in query.lower() for keyword in simple_keywords)

Memory Optimization

@agent.tool
def memory_efficient_processing(data: list) -> dict:
    """Process large datasets efficiently"""

    # Process in chunks to avoid memory issues
    chunk_size = 100
    processed_count = 0

    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]

        # Process chunk
        process_data_chunk(chunk)
        processed_count += len(chunk)

        # Optional: Store intermediate results
        if processed_count % 1000 == 0:
            save_checkpoint(processed_count)

    return {"processed": processed_count, "total": len(data)}

Provider Optimization

@agent.tool
def cost_optimized_request(task: str, complexity: str = "medium") -> dict:
    """Route to optimal provider based on task complexity"""

    if complexity == "simple":
        # Use cheapest option for simple tasks
        agent.set_provider("openai", byok={"api_key": "sk-..."})
        agent.set_model("gpt-3.5-turbo")
    elif complexity == "complex":
        # Use most capable model for complex tasks
        agent.set_provider("anthropic", byok={"api_key": "sk-ant-..."})
        agent.set_model("claude-3-opus-20240229")
    else:
        # Use balanced option for medium complexity
        agent.set_provider("bedrock")  # Managed
        agent.set_model(Models.CLAUDE_35_SONNET)

    response = agent.chat(task)
    return {"response": response, "provider": agent.provider, "model": agent.model}

Error Handling & Recovery

Robust Error Handling

from flowstack import Agent, FlowStackError, QuotaExceededError
import logging

# Configure logging for production
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

agent = Agent("production-agent", api_key="fs_...")

@agent.tool
def production_ready_tool(data: dict) -> dict:
    """Production-ready tool with comprehensive error handling"""

    try:
        # Validate inputs
        if not data or not isinstance(data, dict):
            return {"error": "Invalid input: data must be a non-empty dictionary"}

        required_fields = ['id', 'type']
        missing_fields = [field for field in required_fields if field not in data]
        if missing_fields:
            return {"error": f"Missing required fields: {missing_fields}"}

        # Process data
        result = process_business_logic(data)

        # Log success
        logger.info(f"Successfully processed data for ID: {data['id']}")

        return {"success": True, "result": result, "id": data['id']}

    except ValueError as e:
        logger.warning(f"Invalid data format: {e}")
        return {"error": "Invalid data format", "details": str(e), "retry": False}

    except ConnectionError as e:
        logger.error(f"External service unavailable: {e}")
        return {"error": "Service temporarily unavailable", "retry": True}

    except QuotaExceededError as e:
        logger.warning(f"Quota exceeded: {e}")
        return {"error": "Usage limit reached", "upgrade_required": True}

    except FlowStackError as e:
        logger.error(f"FlowStack error: {e}")
        return {"error": "Service error", "retry": True}

    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        return {"error": "Internal error", "retry": True}

def process_business_logic(data: dict) -> dict:
    """Your business logic here"""
    # Simulate processing
    return {"processed": True, "timestamp": "2024-01-15T10:30:00Z"}

Graceful Degradation

@agent.tool
def resilient_operation(query: str) -> dict:
    """Operation that gracefully degrades on failures"""

    try:
        # Try primary approach
        result = primary_processing_method(query)
        return {"result": result, "method": "primary", "quality": "high"}

    except Exception as e:
        logger.warning(f"Primary method failed: {e}")

        try:
            # Fallback to secondary approach
            result = secondary_processing_method(query)
            return {"result": result, "method": "secondary", "quality": "medium"}

        except Exception as e:
            logger.error(f"Secondary method failed: {e}")

            # Final fallback - basic response
            return {
                "result": "I'm experiencing technical difficulties. Please try again later.",
                "method": "fallback",
                "quality": "low",
                "error": True
            }

Monitoring & Alerting

Usage Monitoring

@agent.tool
def monitor_usage() -> dict:
    """Monitor and alert on usage patterns"""

    usage = agent.get_usage()

    # Check for approaching limits
    if usage.usage_percentage > 90:
        send_alert("critical", f"Usage at {usage.usage_percentage}% - immediate action required")
    elif usage.usage_percentage > 80:
        send_alert("warning", f"Usage at {usage.usage_percentage}% - consider upgrading")

    # Track usage patterns
    agent.vault.store('usage_logs', {
        'timestamp': datetime.now().isoformat(),
        'sessions_used': usage.sessions_used,
        'sessions_limit': usage.sessions_limit,
        'usage_percentage': usage.usage_percentage
    })

    return {
        "current_usage": usage.usage_percentage,
        "sessions_remaining": usage.sessions_remaining,
        "status": "healthy" if usage.usage_percentage < 80 else "warning"
    }

def send_alert(level: str, message: str):
    """Send alert to monitoring system"""
    # Integrate with your alerting system (Slack, PagerDuty, etc.)
    logger.info(f"ALERT [{level}]: {message}")

Health Monitoring

@agent.tool
def health_check() -> dict:
    """Comprehensive health check"""

    health_status = {
        "status": "healthy",
        "checks": {},
        "timestamp": datetime.now().isoformat()
    }

    # Check DataVault connectivity
    try:
        agent.vault.store('health_check', {'test': True}, key='health_test')
        agent.vault.delete('health_check', 'health_test')
        health_status["checks"]["datavault"] = "healthy"
    except Exception as e:
        health_status["checks"]["datavault"] = f"unhealthy: {e}"
        health_status["status"] = "degraded"

    # Check AI provider connectivity
    try:
        test_response = agent.chat("test")
        if test_response:
            health_status["checks"]["ai_provider"] = "healthy"
        else:
            health_status["checks"]["ai_provider"] = "unhealthy: no response"
            health_status["status"] = "degraded"
    except Exception as e:
        health_status["checks"]["ai_provider"] = f"unhealthy: {e}"
        health_status["status"] = "unhealthy"

    # Check memory usage (if available)
    import psutil
    memory_percent = psutil.virtual_memory().percent
    if memory_percent > 90:
        health_status["checks"]["memory"] = f"warning: {memory_percent}% used"
        health_status["status"] = "degraded"
    else:
        health_status["checks"]["memory"] = f"healthy: {memory_percent}% used"

    return health_status

Security Best Practices

Input Validation

import re
from typing import Union

@agent.tool
def secure_user_input(email: str, message: str) -> dict:
    """Securely handle user input"""

    # Validate email format
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    if not re.match(email_pattern, email):
        return {"error": "Invalid email format"}

    # Sanitize message
    message = sanitize_text(message)

    # Check message length
    if len(message) > 1000:
        return {"error": "Message too long (max 1000 characters)"}

    # Check for potentially harmful content
    if contains_harmful_content(message):
        return {"error": "Message contains inappropriate content"}

    # Process safe input
    return process_safe_input(email, message)

def sanitize_text(text: str) -> str:
    """Remove potentially harmful characters"""
    # Remove HTML tags
    import html
    text = html.escape(text)

    # Remove excessive whitespace
    text = ' '.join(text.split())

    return text

def contains_harmful_content(text: str) -> bool:
    """Check for harmful content patterns"""
    harmful_patterns = [
        r'<script.*?>.*?</script>',
        r'javascript:',
        r'vbscript:',
        r'onload\s*=',
        r'onerror\s*='
    ]

    for pattern in harmful_patterns:
        if re.search(pattern, text, re.IGNORECASE):
            return True

    return False

API Key Security

import os

# ✅ Good: Use environment variables
agent = Agent(
    name="secure-agent",
    api_key=os.getenv("FLOWSTACK_API_KEY"),
    provider="openai",
    byok={"api_key": os.getenv("OPENAI_API_KEY")}
)

# ❌ Bad: Hardcode API keys
# agent = Agent(
#     name="insecure-agent", 
#     api_key="fs_abc123...",  # Don't do this!
#     byok={"api_key": "sk-..."}  # Don't do this!
# )

Data Protection

@agent.tool
def protect_sensitive_data(user_data: dict) -> dict:
    """Handle sensitive data securely"""

    # Create a copy to avoid modifying original
    safe_data = user_data.copy()

    # Remove or mask sensitive fields
    sensitive_fields = ['ssn', 'credit_card', 'password', 'api_key']
    for field in sensitive_fields:
        if field in safe_data:
            if field == 'credit_card':
                # Mask credit card (show last 4 digits)
                safe_data[field] = f"****-****-****-{safe_data[field][-4:]}"
            else:
                # Remove other sensitive data
                del safe_data[field]

    # Log access for audit trail
    logger.info(f"Accessed user data for user: {safe_data.get('user_id', 'unknown')}")

    return {"data": safe_data, "sensitive_fields_removed": len(sensitive_fields)}

Scaling Considerations

High Traffic Handling

FlowStack automatically handles scaling, but you can optimize for high traffic:

@agent.tool
def high_traffic_optimized(request_data: dict) -> dict:
    """Optimized for high-traffic scenarios"""

    # Use caching aggressively
    cache_key = generate_cache_key(request_data)
    cached_result = agent.vault.retrieve('high_traffic_cache', key=cache_key)

    if cached_result and not is_cache_expired(cached_result):
        return {"result": cached_result['data'], "cached": True}

    # Process with minimal resource usage
    result = lightweight_processing(request_data)

    # Cache with short TTL for high traffic
    agent.vault.store('high_traffic_cache', {
        'data': result,
        'expires_at': (datetime.now() + timedelta(minutes=5)).isoformat()
    }, key=cache_key)

    return {"result": result, "cached": False}

def generate_cache_key(data: dict) -> str:
    """Generate consistent cache key"""
    import hashlib
    key_data = json.dumps(data, sort_keys=True)
    return hashlib.md5(key_data.encode()).hexdigest()

Resource Management

@agent.tool
def resource_aware_processing(large_dataset: list) -> dict:
    """Process large datasets with resource awareness"""

    # Check current system load
    import psutil
    cpu_percent = psutil.cpu_percent()
    memory_percent = psutil.virtual_memory().percent

    # Adjust processing based on resource availability
    if cpu_percent > 80 or memory_percent > 80:
        # Use smaller chunks when resources are constrained
        chunk_size = 50
        delay_between_chunks = 0.1
    else:
        # Use larger chunks when resources are available
        chunk_size = 200
        delay_between_chunks = 0

    results = []
    for i in range(0, len(large_dataset), chunk_size):
        chunk = large_dataset[i:i + chunk_size]
        chunk_result = process_chunk(chunk)
        results.append(chunk_result)

        # Optional delay to prevent resource exhaustion
        if delay_between_chunks:
            time.sleep(delay_between_chunks)

    return {
        "processed_chunks": len(results),
        "total_items": len(large_dataset),
        "chunk_size_used": chunk_size
    }

Deployment Strategies

Blue-Green Deployment

# Deploy new version alongside existing
new_endpoint = agent.deploy(version="2.0.0", strategy="blue_green")

# Test new version
if test_new_version(new_endpoint):
    # Switch traffic to new version
    agent.promote_deployment("2.0.0")
else:
    # Rollback to previous version
    agent.rollback_deployment()

Canary Deployment

# Deploy to small percentage of traffic
agent.deploy(
    version="2.0.0",
    strategy="canary",
    traffic_percentage=10  # 10% of traffic
)

# Monitor metrics and gradually increase traffic
if metrics_look_good():
    agent.increase_canary_traffic(50)  # 50% of traffic

    if still_looking_good():
        agent.promote_canary()  # 100% of traffic

Cost Optimization

Smart Provider Usage

@agent.tool
def cost_optimized_routing(task: str, budget_per_session: float = 0.15) -> dict:
    """Route tasks to optimize costs while maintaining quality"""

    # Analyze task complexity
    complexity = analyze_task_complexity(task)

    if complexity == "simple" and budget_per_session <= 0.10:
        # Use cheapest option for simple tasks
        agent.set_provider("openai", byok={"api_key": os.getenv("OPENAI_KEY")})
        agent.set_model("gpt-3.5-turbo")
        estimated_cost = 0.05
    elif complexity == "medium" and budget_per_session <= 0.25:
        # Use managed Bedrock for balanced cost/performance
        agent.set_provider("bedrock")
        agent.set_model(Models.CLAUDE_35_SONNET)
        estimated_cost = 0.20
    elif complexity == "complex":
        # Use best model regardless of cost
        agent.set_provider("anthropic", byok={"api_key": os.getenv("ANTHROPIC_KEY")})
        agent.set_model("claude-3-opus-20240229")
        estimated_cost = 0.35
    else:
        return {"error": f"Task too expensive for budget of ${budget_per_session}"}

    response = agent.chat(task)

    # Track actual costs
    agent.vault.store('cost_tracking', {
        'task': task,
        'complexity': complexity,
        'provider': agent.provider,
        'model': agent.model,
        'estimated_cost': estimated_cost,
        'timestamp': datetime.now().isoformat()
    })

    return {
        "response": response,
        "estimated_cost": estimated_cost,
        "provider": agent.provider
    }

Ready for production? Start with our deployment guide or check the billing documentation to understand costs.