From Prototype to Production: Scaling Web Search for AI Applications

A practical guide to scaling AI web search from a prototype handling 100 queries to a production system handling 1 million+ queries per month.

12 min read · Keiro Team

Introduction

Your AI prototype works great on your laptop. It searches the web, generates accurate answers, and impresses everyone in the demo. But going from 100 queries in a demo to 100,000 queries per day in production introduces a completely different set of challenges: reliability, latency, cost, error handling, and monitoring.

In this guide, we walk through the journey from prototype to production for an AI application that relies on web search, using Keiro as the search backend.

Stage 1: The Prototype

A typical prototype looks like this:

import requests

def answer_question(question: str) -> str:
    # Search
    results = requests.post("https://kierolabs.space/api/search", json={
        "apiKey": "your-key",
        "query": question
    }).json()

    # Generate
    # ... LLM call with results as context
    return generated_answer

This is fine for testing, but it is not production-ready: there is no error handling, no timeout, no caching, no monitoring, and the API key is hardcoded.

Stage 2: Adding Reliability

Error Handling and Retries

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging

logger = logging.getLogger(__name__)

class KeiroClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://kierolabs.space/api"
        self.session = requests.Session()

        # Configure retries
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)

    def search(self, query: str, timeout: int = 10) -> dict:
        """Search with error handling and retries."""
        try:
            response = self.session.post(
                f"{self.base_url}/search",
                json={"apiKey": self.api_key, "query": query},
                timeout=timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.Timeout:
            logger.warning(f"Search timeout for query: {query}")
            return {"results": [], "error": "timeout"}
        except requests.exceptions.HTTPError as e:
            logger.error(f"Search HTTP error: {e}")
            return {"results": [], "error": str(e)}
        except Exception as e:
            logger.error(f"Search unexpected error: {e}")
            return {"results": [], "error": str(e)}

    def research(self, query: str, timeout: int = 30) -> dict:
        """Research with longer timeout."""
        try:
            response = self.session.post(
                f"{self.base_url}/research",
                json={"apiKey": self.api_key, "query": query},
                timeout=timeout
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Research error: {e}")
            return {"summary": "", "sources": [], "error": str(e)}

    def crawl(self, url: str, timeout: int = 15) -> dict:
        """Web crawl with error handling."""
        try:
            response = self.session.post(
                f"{self.base_url}/web-crawler",
                json={"apiKey": self.api_key, "url": url},
                timeout=timeout
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Crawl error for {url}: {e}")
            return {"content": "", "title": "", "error": str(e)}

Fallback Strategy

    def search_with_fallback(self, query: str) -> dict:
        """Try search-pro first, fall back to search, then to answer."""
        # search_pro mirrors search() against the /search-pro endpoint
        # (definition omitted for brevity)
        result = self.search_pro(query)
        if result.get("results"):
            return result

        # Fall back to standard search
        logger.info(f"Falling back to standard search for: {query}")
        result = self.search(query)
        if result.get("results"):
            return result

        # Last resort: use /answer endpoint
        logger.info(f"Falling back to /answer for: {query}")
        try:
            resp = self.session.post(
                f"{self.base_url}/answer",
                json={"apiKey": self.api_key, "query": query},
                timeout=15
            )
            return {"answer": resp.json().get("response", ""), "fallback": True}
        except Exception:
            return {"results": [], "error": "All search methods failed"}
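The same cascade generalizes to any ordered list of strategies. A minimal standalone sketch (the stand-in lambdas and the `first_usable` helper are illustrative, not part of the Keiro client):

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)

def first_usable(strategies: list[tuple[str, Callable[[], dict]]]) -> dict:
    """Try each named strategy in order; return the first usable result."""
    for name, strategy in strategies:
        result = strategy()
        # A result is usable if it produced hits and no error
        if result.get("results") and not result.get("error"):
            return result
        logger.info("Strategy %s returned nothing usable, trying next", name)
    return {"results": [], "error": "All search methods failed"}

# Usage with stand-in strategies:
chain = [
    ("pro", lambda: {"results": [], "error": "timeout"}),
    ("standard", lambda: {"results": [{"url": "https://example.com"}]}),
]
best = first_usable(chain)
```

Keeping the strategy list as data makes it trivial to reorder the cascade or add a new tier without touching the control flow.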

Stage 3: Application-Level Caching

While Keiro provides a 50% cache discount automatically, you can add application-level caching to eliminate repeated API calls entirely:

import hashlib
import json
import time

class CachedKeiroClient(KeiroClient):
    def __init__(self, api_key: str, cache_ttl: int = 300):
        super().__init__(api_key)
        self.cache = {}
        self.cache_ttl = cache_ttl
        self.cache_hits = 0
        self.cache_misses = 0

    def _cache_key(self, method: str, params: dict) -> str:
        raw = f"{method}:{json.dumps(params, sort_keys=True)}"
        return hashlib.sha256(raw.encode()).hexdigest()

    def _get_cached(self, key: str) -> dict | None:
        if key in self.cache:
            data, timestamp = self.cache[key]
            if time.time() - timestamp < self.cache_ttl:
                self.cache_hits += 1
                return data
            else:
                del self.cache[key]
        self.cache_misses += 1
        return None

    def _set_cached(self, key: str, data: dict):
        self.cache[key] = (data, time.time())
        # Evict old entries if cache is too large
        if len(self.cache) > 10000:
            oldest_key = min(self.cache, key=lambda k: self.cache[k][1])
            del self.cache[oldest_key]

    def search(self, query: str, timeout: int = 10) -> dict:
        key = self._cache_key("search", {"query": query})
        cached = self._get_cached(key)
        if cached:
            return cached

        result = super().search(query, timeout)
        if not result.get("error"):
            self._set_cached(key, result)
        return result

    @property
    def cache_hit_rate(self) -> float:
        total = self.cache_hits + self.cache_misses
        return self.cache_hits / total if total > 0 else 0
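The cache key above is deterministic: `sort_keys=True` canonicalizes the JSON, so semantically identical requests always map to the same entry regardless of parameter order. A standalone sketch of the same hashing logic:

```python
import hashlib
import json

def cache_key(method: str, params: dict) -> str:
    # sort_keys=True canonicalizes the JSON so dict ordering never changes the key
    raw = f"{method}:{json.dumps(params, sort_keys=True)}"
    return hashlib.sha256(raw.encode()).hexdigest()

a = cache_key("search", {"query": "llm caching", "limit": 5})
b = cache_key("search", {"limit": 5, "query": "llm caching"})
print(a == b)  # True: same key regardless of parameter order
```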

Stage 4: Async Processing for Scale

For high-throughput applications, switch to async:

import aiohttp
import asyncio

class AsyncKeiroClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://kierolabs.space/api"

    async def search(self, query: str, session: aiohttp.ClientSession) -> dict:
        try:
            async with session.post(
                f"{self.base_url}/search",
                json={"apiKey": self.api_key, "query": query},
                timeout=aiohttp.ClientTimeout(total=10)
            ) as response:
                return await response.json()
        except Exception as e:
            return {"results": [], "error": str(e)}

    async def parallel_search(self, queries: list[str]) -> list[dict]:
        """Search multiple queries in parallel."""
        async with aiohttp.ClientSession() as session:
            tasks = [self.search(q, session) for q in queries]
            return await asyncio.gather(*tasks)

    async def _crawl(self, url: str, session: aiohttp.ClientSession) -> dict:
        """Crawl a single URL (helper for search_and_crawl)."""
        try:
            async with session.post(
                f"{self.base_url}/web-crawler",
                json={"apiKey": self.api_key, "url": url},
                timeout=aiohttp.ClientTimeout(total=15)
            ) as response:
                return await response.json()
        except Exception as e:
            return {"content": "", "error": str(e)}

    async def search_and_crawl(self, query: str) -> dict:
        """Search then crawl top results in parallel."""
        async with aiohttp.ClientSession() as session:
            # Search first
            search_result = await self.search(query, session)
            results = search_result.get("results", [])

            if not results:
                return {"results": [], "enriched": []}

            # Crawl top 3 results in parallel
            crawl_tasks = []
            for r in results[:3]:
                crawl_tasks.append(self._crawl(r["url"], session))

            crawled = await asyncio.gather(*crawl_tasks)

            # Merge
            enriched = []
            for i, r in enumerate(results[:3]):
                r["full_content"] = crawled[i].get("content", "")
                enriched.append(r)

            return {"results": results, "enriched": enriched}
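Note that parallel_search fires every query at once; at high volume you will want to cap in-flight requests. A minimal sketch using asyncio.Semaphore, with a stand-in coroutine in place of the real HTTP call (the limit of 5 is illustrative):

```python
import asyncio

async def bounded_gather(queries: list[str], limit: int = 5) -> list[str]:
    """Run one task per query, but never more than `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def one(query: str) -> str:
        async with semaphore:
            # Stand-in for the real search call
            await asyncio.sleep(0)
            return f"result:{query}"

    return await asyncio.gather(*(one(q) for q in queries))

results = asyncio.run(bounded_gather([f"q{i}" for i in range(20)], limit=5))
```

The semaphore bounds concurrency without changing the call sites, which also keeps you inside your plan's rate limits under burst traffic.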

Stage 5: Monitoring and Observability

import time
from dataclasses import dataclass, field

@dataclass
class SearchMetrics:
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    total_latency_ms: float = 0
    latencies: list = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        return self.successful_requests / self.total_requests if self.total_requests > 0 else 0

    @property
    def avg_latency_ms(self) -> float:
        return self.total_latency_ms / self.total_requests if self.total_requests > 0 else 0

    @property
    def p95_latency_ms(self) -> float:
        if not self.latencies:
            return 0
        sorted_latencies = sorted(self.latencies)
        idx = int(len(sorted_latencies) * 0.95)
        return sorted_latencies[idx]

    def record(self, success: bool, latency_ms: float):
        self.total_requests += 1
        if success:
            self.successful_requests += 1
        else:
            self.failed_requests += 1
        self.total_latency_ms += latency_ms
        self.latencies.append(latency_ms)
        # Keep only last 10,000 latencies for percentile calculation
        if len(self.latencies) > 10000:
            self.latencies = self.latencies[-10000:]

Stage 6: Cost Optimization at Scale

At 100,000+ queries per day, cost optimization becomes critical. Here is the playbook:

Strategy                   | Implementation                        | Savings
---------------------------|---------------------------------------|--------------------------
Smart search triggers      | Only search when needed               | 40-60% fewer API calls
App-level caching          | 5-minute TTL cache                    | 20-30% fewer API calls
Keiro cache discount       | Automatic                             | 50% on remaining repeats
Batch processing           | Move background jobs to /batch-search | 100% (batch is free)
/answer for simple queries | Skip the LLM for direct answers       | 97% per query
Keiro Pro plan             | $24.99 for 200k requests              | Best per-request rate
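A back-of-envelope way to combine these levers (the default percentages are illustrative midpoints from the table above, not measured figures):

```python
def effective_paid_calls(queries_per_day: int,
                         trigger_skip: float = 0.5,   # smart triggers skip ~50% of searches
                         app_cache_hit: float = 0.25, # app cache absorbs ~25% of the rest
                         repeat_share: float = 0.2,   # share of remaining calls that repeat
                         cache_discount: float = 0.5) -> float:
    """Estimate billable call-equivalents after each optimization layer."""
    searched = queries_per_day * (1 - trigger_skip)
    api_calls = searched * (1 - app_cache_hit)
    # The cache discount halves the cost of repeated queries
    discounted = api_calls * repeat_share * cache_discount
    full_price = api_calls * (1 - repeat_share)
    return full_price + discounted

print(effective_paid_calls(100_000))  # 100k raw queries -> 33750.0 call-equivalents
```

Under these assumptions, roughly two thirds of raw query volume never hits your paid quota, which is why the layers are worth stacking rather than picking just one.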

Production Checklist

Before going live, verify each of these:

  • Error handling: All API calls have try/catch blocks and timeouts
  • Retries: Transient failures are retried with exponential backoff
  • Fallbacks: If search fails, the system degrades gracefully
  • Caching: Application-level caching is in place with appropriate TTLs
  • Monitoring: Latency, error rates, and costs are tracked
  • Alerting: Alerts fire when error rates or latency exceeds thresholds
  • API key management: Keys are in environment variables, not code
  • Rate limiting: Your application respects API rate limits
  • Logging: All search queries and results are logged for debugging
  • Load testing: The system has been tested at 2x expected peak load

Scaling Milestones

Scale     | Queries/Day | Recommended Keiro Plan | Key Focus
----------|-------------|------------------------|----------------------
Prototype | 100         | Lite ($5.99/mo)        | Get it working
Beta      | 1,000       | Lite ($5.99/mo)        | Error handling
Launch    | 10,000      | Essential ($14.99/mo)  | Caching, monitoring
Growth    | 50,000      | Pro ($24.99/mo)        | Cost optimization
Scale     | 200,000+    | Pro + batch            | Async, load balancing

Conclusion

Scaling an AI application's web search from prototype to production is a well-understood process. The key stages are: add reliability (retries, timeouts, fallbacks), add caching, add async processing, add monitoring, and optimize costs. Keiro's affordable pricing, free batch processing, and automatic cache discounts make it particularly well-suited for production workloads, keeping your search costs negligible even at scale.

Scale your AI application with Keiro. Plans from $5.99 to $24.99/month at kierolabs.space.
