Phase 12: Serverless Functions Architecture

Document Type: Technical Architecture
Phase: 12 - Serverless Functions
Status: Active


System Overview

┌─────────────────────────────────────────────────────────────────┐
│                        Client Applications                      │
│                   (REST API, Cron Scheduler)                    │
└────────────────────────────┬────────────────────────────────────┘
                  ┌──────────▼──────────┐
                  │  Functions Module   │
                  │ (Invoker, Scheduler)│
                  └──────────┬──────────┘
        ┌────────────────────┼────────────────────┐
        │                    │                    │
┌───────▼────────┐  ┌────────▼────────┐  ┌───────▼────────┐
│WASM Runtime    │  │ Function        │  │    Database    │
│ (Sandbox)      │  │   Registry      │  │   (Executor)   │
└───────┬────────┘  └─────────────────┘  └────────────────┘
┌───────▼────────┐
│  Host          │
│  Functions     │
│(db, log, http) │
└────────────────┘

Component Architecture

1. Function Registry

Purpose: Store and index deployed functions

pub struct FunctionRegistry {
    functions: HashMap<Uuid, Function>,
    name_index: HashMap<String, Uuid>,
    trigger_index: HashMap<TriggerType, HashSet<Uuid>>,
}

Operations:

  • register(function) - Add function to registry
  • get_by_name(name) - Lookup by name
  • get_by_trigger(trigger) - Find all functions for trigger type
  • unregister(id) - Remove function
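
The four operations have to keep both indexes in step with the primary map. A minimal sketch of how that could look, using only the standard library: `FunctionId` (a `u64` standing in for `Uuid`), the trimmed-down `Function` fields, and the duplicate-rejection rule in `register` are assumptions made for illustration, not the actual AeroDB types.

```rust
use std::collections::{HashMap, HashSet};

/// Stand-in for `Uuid` so the sketch needs no external crates (assumption).
pub type FunctionId = u64;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum TriggerType {
    Http,
    Database,
    Schedule,
}

/// Reduced function model; the real one also carries WASM bytes and config.
#[derive(Clone, Debug)]
pub struct Function {
    pub id: FunctionId,
    pub name: String,
    pub trigger: TriggerType,
}

#[derive(Default)]
pub struct FunctionRegistry {
    functions: HashMap<FunctionId, Function>,
    name_index: HashMap<String, FunctionId>,
    trigger_index: HashMap<TriggerType, HashSet<FunctionId>>,
}

impl FunctionRegistry {
    /// Adds a function and updates both indexes. Returns `false` (and
    /// changes nothing) if the id or the name is already registered.
    pub fn register(&mut self, function: Function) -> bool {
        if self.functions.contains_key(&function.id)
            || self.name_index.contains_key(&function.name)
        {
            return false;
        }
        self.name_index.insert(function.name.clone(), function.id);
        self.trigger_index
            .entry(function.trigger)
            .or_default()
            .insert(function.id);
        self.functions.insert(function.id, function);
        true
    }

    pub fn get_by_name(&self, name: &str) -> Option<&Function> {
        let id = self.name_index.get(name)?;
        self.functions.get(id)
    }

    pub fn get_by_trigger(&self, trigger: TriggerType) -> Vec<&Function> {
        self.trigger_index
            .get(&trigger)
            .into_iter()
            .flatten()
            .filter_map(|id| self.functions.get(id))
            .collect()
    }

    /// Removes a function and its index entries, returning it if present.
    pub fn unregister(&mut self, id: FunctionId) -> Option<Function> {
        let function = self.functions.remove(&id)?;
        self.name_index.remove(&function.name);
        if let Some(ids) = self.trigger_index.get_mut(&function.trigger) {
            ids.remove(&id);
        }
        Some(function)
    }
}
```

Rejecting duplicate names keeps name_index a one-to-one mapping; a deployment flow that replaces functions in place would call unregister first.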


2. WASM Runtime

Integration: wasmer or wasmtime

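use std::sync::Mutex;

pub struct WasmRuntime {
    engine: Engine,
    // Instantiation needs `&mut Store`; the Mutex provides that from `&self`.
    store: Mutex<Store>,
}

impl WasmRuntime {
    /// Compiles and validates the WASM bytes.
    pub fn load_module(&self, wasm_bytes: &[u8]) -> Result<Module> {
        Module::new(&self.engine, wasm_bytes)
    }

    /// Creates an instance with the host functions wired in as imports.
    pub fn instantiate(&self, module: &Module) -> Result<Instance> {
        let imports = create_host_functions();
        let mut store = self.store.lock().unwrap();
        Instance::new(&mut *store, module, &imports)
    }
}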

Resource Limits:

pub struct ResourceLimits {
    timeout: Duration,      // Default: 10s
    max_memory: usize,      // Default: 128MB
    max_stack: usize,       // Default: 1MB
}
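
The defaults noted in the comments above could be captured in a `Default` implementation, so per-function configuration only needs to override what differs. A small sketch; the `pub` fields and the derives are assumptions added for the example.

```rust
use std::time::Duration;

#[derive(Clone, Debug, PartialEq)]
pub struct ResourceLimits {
    pub timeout: Duration,
    pub max_memory: usize, // bytes
    pub max_stack: usize,  // bytes
}

impl Default for ResourceLimits {
    fn default() -> Self {
        Self {
            timeout: Duration::from_secs(10),
            max_memory: 128 * 1024 * 1024, // 128MB
            max_stack: 1024 * 1024,        // 1MB
        }
    }
}

/// Example override: a shorter timeout, everything else left at the default.
pub fn short_timeout_limits() -> ResourceLimits {
    ResourceLimits {
        timeout: Duration::from_secs(2),
        ..ResourceLimits::default()
    }
}
```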


3. Invoker

Purpose: Execute functions with resource limits

pub struct Invoker {
    runtime: WasmRuntime,
    registry: Arc<FunctionRegistry>,
    executor: Arc<Executor>,  // For database access
}

impl Invoker {
    pub async fn invoke(
        &self,
        function_name: &str,
        payload: Value,
        context: &RlsContext,
    ) -> Result<InvocationResult> {
        // 1. Lookup function
        let func = self.registry.get_by_name(function_name)?;

        // 2. Load WASM module
        let module = self.runtime.load_module(&func.wasm_bytes)?;

        // 3. Create instance with host functions
        let instance = self.runtime.instantiate(&module)?;

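        // 4. Call handler with timeout. `timeout` wraps the handler's own
        //    Result, so map the elapsed case to FunctionError::Timeout first,
        //    then propagate any error returned by the handler itself.
        let result = timeout(
            func.config.timeout,
            self.call_handler(instance, payload, context),
        )
        .await
        .map_err(|_| FunctionError::Timeout)??;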

        Ok(result)
    }
}

4. Scheduler

Purpose: Run cron-based jobs

pub struct FunctionScheduler {
    jobs: HashMap<Uuid, ScheduledJob>,
    invoker: Arc<Invoker>,
}

pub struct ScheduledJob {
    id: Uuid,
    function_id: Uuid,
    cron_expression: String,
    next_run: DateTime<Utc>,
    enabled: bool,
}

impl FunctionScheduler {
    pub async fn run(&self) {
        loop {
            let now = Utc::now();
            let due_jobs = self.get_due_jobs(now);

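            for job in due_jobs {
                // Clone the Arc so the spawned task owns its invoker handle;
                // a borrowed `self` cannot be moved into a 'static task.
                let invoker = Arc::clone(&self.invoker);
                tokio::spawn(async move {
                    // The result is discarded: a failed job must not stop the loop.
                    let _ = invoker.invoke(...).await;
                });

                self.update_next_run(&job);
            }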

            sleep(Duration::from_secs(1)).await;
        }
    }
}

Module Structure

src/functions/
├── mod.rs              # Module entry, exports
├── errors.rs           # Function-specific errors
├── function.rs         # Function model, metadata
├── trigger.rs          # Trigger types (HTTP, DB, Schedule)
├── registry.rs         # Function registry (in-memory index)
├── runtime.rs          # WASM runtime (wasmer or wasmtime integration)
├── invoker.rs          # Function execution with limits
├── scheduler.rs        # Cron job scheduler
└── host_functions.rs   # Host functions (db_query, log, etc.)

Data Flow

HTTP Trigger Flow

1. Client → REST API
   POST /functions/v1/invoke/my-function
   Headers: Authorization, Content-Type
   Body: {"key": "value"}

2. REST API → Auth Module
   Extract JWT → RlsContext

3. REST API → Invoker
   invoke("my-function", payload, context)

4. Invoker:
   a. Lookup function in registry
   b. Load WASM module
   c. Create instance with host functions
   d. Call exported `handler` function
   e. Enforce timeout (10s default)
   f. Enforce memory limit (128MB default)

5. Function (WASM):
   a. Parse payload
   b. Call host function: db_query(...)
   c. Call host function: log(...)
   d. Return result

6. Invoker → REST API
   Return result or error

7. REST API → Client
   200 OK {result}, or an error status (see HTTP Mapping under Error Handling)

Database Trigger Flow

1. Database → WAL
   INSERT INTO users VALUES (...)

2. WAL → Event Log (Phase 10)
   Emit DatabaseEvent::Insert

3. Event Log → Function Trigger Dispatcher
   Check: Are there DB triggers for "users" table?

4. Trigger Dispatcher → Invoker
   For each matching function:
     invoke(function, {table: "users", op: "INSERT", row: ...})

5. Invoker executes function (same as HTTP flow)

6. Function result is logged; errors do not crash the database

Non-Determinism Note: Database triggers run after WAL commit, not during transaction. They are best-effort, not transactional.
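
Steps 3 to 6 of this flow can be sketched as a small dispatcher that looks up the functions subscribed to a table and invokes each one, collecting failures instead of propagating them. This is an illustration only: `TriggerDispatcher`, `subscribe`, `dispatch`, the `DbOp` variants, and the closure standing in for the Invoker are hypothetical names, and the row is carried as plain JSON text.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
pub enum DbOp {
    Insert,
    Update,
    Delete,
}

#[derive(Clone, Debug)]
pub struct DatabaseEvent {
    pub table: String,
    pub op: DbOp,
    pub row: String, // JSON text in this sketch
}

#[derive(Default)]
pub struct TriggerDispatcher {
    // table name -> names of the functions subscribed to that table
    by_table: HashMap<String, Vec<String>>,
}

impl TriggerDispatcher {
    pub fn subscribe(&mut self, table: &str, function_name: &str) {
        self.by_table
            .entry(table.to_string())
            .or_default()
            .push(function_name.to_string());
    }

    /// Invokes every function subscribed to the event's table, in
    /// subscription order. Failures are collected as (function, message)
    /// pairs and returned; they never abort the remaining invocations.
    pub fn dispatch<F>(&self, event: &DatabaseEvent, mut invoke: F) -> Vec<(String, String)>
    where
        F: FnMut(&str, &DatabaseEvent) -> Result<(), String>,
    {
        let mut failures = Vec::new();
        for name in self.by_table.get(&event.table).into_iter().flatten() {
            if let Err(message) = invoke(name.as_str(), event) {
                failures.push((name.clone(), message));
            }
        }
        failures
    }
}
```

Because the dispatcher runs after the WAL commit, the returned failures can only be logged or retried; they cannot undo the write that produced the event.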


Schedule Trigger Flow

1. Scheduler (background task):
   Loop every 1 second:
     a. Get current time
     b. Find jobs where next_run <= now
     c. Spawn task for each due job

2. Spawned Task → Invoker
   invoke(function, {timestamp: now})

3. Invoker executes function

4. Scheduler updates next_run based on cron expression
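
Steps 1b and 4 can be sketched without a cron library by using a fixed interval as a stand-in for the parsed cron expression. Everything here is an assumption made for illustration: `take_due_jobs`, the `u64` ids and Unix-second timestamps, `interval_secs`, and the policy of coalescing missed runs into a single invocation.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct ScheduledJob {
    pub id: u64,            // stand-in for Uuid
    pub next_run: u64,      // Unix seconds, stand-in for DateTime<Utc>
    pub interval_secs: u64, // stand-in for a parsed cron expression
    pub enabled: bool,
}

/// Returns the ids of enabled jobs whose `next_run` has passed and advances
/// each of them to its first run time strictly after `now`, so a stalled
/// scheduler fires once rather than replaying every missed run.
/// Jobs with a zero interval are treated as misconfigured and skipped.
pub fn take_due_jobs(jobs: &mut [ScheduledJob], now: u64) -> Vec<u64> {
    let mut due = Vec::new();
    for job in jobs.iter_mut() {
        if !job.enabled || job.next_run > now || job.interval_secs == 0 {
            continue;
        }
        due.push(job.id);
        // Number of intervals needed to move next_run past `now`.
        let steps = (now - job.next_run) / job.interval_secs + 1;
        job.next_run += steps * job.interval_secs;
    }
    due
}
```

Advancing next_run in the same pass that selects the job means a second tick within the same second cannot pick the job up again.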

Host Functions

Functions call out to AeroDB via imported host functions:

db_query

#[host_function]
fn db_query(sql: String, params: Vec<Value>) -> Result<Vec<Row>> {
    // Parse SQL
    let query = parse_sql(&sql)?;

    // Execute with RLS context (from invocation)
    let result = executor.execute(query, &rls_context)?;

    Ok(result)
}

WASM Side:

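// Import from host (raw ABI, shown in Rust syntax): pointer and length
// of the request bytes in guest memory
extern "C" {
    fn db_query(sql_ptr: *const u8, sql_len: usize) -> i32;
}

// Wrapper (JavaScript guest SDK): serializes the request to JSON
// and parses the JSON response
function query(sql, params) {
    return JSON.parse(callHost(db_query, JSON.stringify({sql, params})));
}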
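
Because the raw import passes only a pointer and a length, the host has to copy the request out of the guest's linear memory and bounds-check it before use. A minimal sketch that treats linear memory as a plain byte slice; `read_guest_str` and `write_guest_bytes` are hypothetical helper names, not part of any WASM runtime API.

```rust
/// Reads a UTF-8 string from guest linear memory. Returns None if the
/// range overflows, falls outside memory, or is not valid UTF-8.
pub fn read_guest_str(memory: &[u8], ptr: usize, len: usize) -> Option<&str> {
    let end = ptr.checked_add(len)?;
    let bytes = memory.get(ptr..end)?;
    std::str::from_utf8(bytes).ok()
}

/// Copies `data` into guest memory at `ptr`. Returns false, leaving memory
/// untouched, if the data does not fit.
pub fn write_guest_bytes(memory: &mut [u8], ptr: usize, data: &[u8]) -> bool {
    let end = match ptr.checked_add(data.len()) {
        Some(end) => end,
        None => return false,
    };
    match memory.get_mut(ptr..end) {
        Some(slot) => {
            slot.copy_from_slice(data);
            true
        }
        None => false,
    }
}
```

Treating every guest-supplied pointer as untrusted keeps a buggy or malicious function from reading or writing outside its own sandbox memory.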


log

#[host_function]
fn log(level: String, message: String) {
    let level = match level.as_str() {
        "info" => log::Level::Info,
        "warn" => log::Level::Warn,
        "error" => log::Level::Error,
        _ => log::Level::Debug,
    };

    log::log!(level, "[Function] {}", message);
}

http_fetch (Future)

#[host_function]
async fn http_fetch(url: String) -> Result<String> {
    let client = reqwest::Client::new();
    let response = client.get(&url).send().await?;
    Ok(response.text().await?)
}

Security: Whitelist allowed domains in function config
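
The domain check could look roughly like the sketch below, which extracts the host with plain string operations and compares it against the function's allowed domains. `host_of` and `is_allowed` are hypothetical names, matching is exact (subdomains are not implied), and a production version should parse the URL with the same parser the HTTP client uses so that both agree on what the host is.

```rust
/// Extracts the lowercase host from an http(s) URL using only string
/// operations. IPv6 literals are not handled in this sketch.
pub fn host_of(url: &str) -> Option<String> {
    let rest = url
        .strip_prefix("https://")
        .or_else(|| url.strip_prefix("http://"))?;
    // The authority ends at the first '/', '?' or '#'. A backslash is also
    // treated as a terminator because some URL parsers read it as '/'.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#' | '\\'))
        .next()?;
    // Drop userinfo ("user:pass@") so it cannot disguise the real host.
    let host_port = authority.rsplit('@').next()?;
    // Drop the port, if any.
    let host = host_port.split(':').next()?;
    if host.is_empty() {
        None
    } else {
        Some(host.to_ascii_lowercase())
    }
}

/// Returns true only if the URL's host exactly matches an allowed domain.
pub fn is_allowed(url: &str, allowed_domains: &[&str]) -> bool {
    match host_of(url) {
        Some(host) => allowed_domains
            .iter()
            .any(|domain| domain.eq_ignore_ascii_case(&host)),
        None => false,
    }
}
```

Anything the sketch cannot parse is rejected, so unusual URLs fail closed instead of reaching the network.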


env

#[host_function]
fn env(key: String) -> Option<String> {
    // Only return whitelisted env vars
    let allowed = ["DATABASE_URL", "API_KEY"];

    if allowed.contains(&key.as_str()) {
        std::env::var(&key).ok()
    } else {
        None
    }
}

Resource Enforcement

Timeout

pub async fn invoke_with_timeout(
    instance: Instance,
    payload: Value,
) -> Result<Value> {
    let timeout = Duration::from_secs(10);

    match tokio::time::timeout(timeout, call_handler(instance, payload)).await {
        Ok(result) => result,
        Err(_) => Err(FunctionError::Timeout),
    }
}

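Behavior:

  • Function exceeds 10s → Killed, error returned
  • Partial work is NOT rolled back (non-transactional)

Note: tokio::time::timeout can only cancel the handler at an .await point. Interrupting CPU-bound WASM code also needs a runtime-level mechanism, for example wasmtime's epoch interruption or fuel metering.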


Memory Limit

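// Assuming wasmtime: StoreLimits implements ResourceLimiter and is kept
// as the store's data so the limiter closure can hand it back.
let limits = StoreLimitsBuilder::new()
    .memory_size(128 * 1024 * 1024)  // 128MB
    .build();
let mut store = Store::new(&engine, limits);
store.limiter(|limits| limits);

let instance = Instance::new(&mut store, &module, &imports)?;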

Behavior:

  • Function allocates > 128MB → Panic, error returned
  • Memory is freed when function completes


Stack Limit

let mut config = Config::new();
config.max_wasm_stack(1024 * 1024);  // 1MB
let engine = Engine::new(&config)?;

Behavior:

  • Deep recursion → Stack overflow, error returned


Error Handling

Function Errors

pub enum FunctionError {
    NotFound,              // Function doesn't exist
    InvalidWasm,           // WASM compilation failure
    Timeout,               // Exceeded timeout
    OutOfMemory,           // Exceeded memory limit
    HostFunctionError,     // db_query failed, etc.
    RuntimePanic,          // WASM panic
}

HTTP Mapping:

impl From<FunctionError> for HttpStatus {
    fn from(err: FunctionError) -> HttpStatus {
        match err {
            FunctionError::NotFound => 404,
            FunctionError::InvalidWasm => 400,
            FunctionError::Timeout => 504,  // Gateway Timeout
            FunctionError::OutOfMemory => 507,
            FunctionError::HostFunctionError => 500,
            FunctionError::RuntimePanic => 500,
        }
    }
}
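
The mapping above returns bare integers, which only compiles if `HttpStatus` is an integer alias. A self-contained variant, assuming a hypothetical `HttpStatus(u16)` newtype in place of whatever status type the REST layer actually uses:

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FunctionError {
    NotFound,
    InvalidWasm,
    Timeout,
    OutOfMemory,
    HostFunctionError,
    RuntimePanic,
}

/// Hypothetical newtype standing in for the REST layer's status type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct HttpStatus(pub u16);

impl From<FunctionError> for HttpStatus {
    fn from(err: FunctionError) -> HttpStatus {
        // No wildcard arm: adding a new error variant forces a decision here.
        HttpStatus(match err {
            FunctionError::NotFound => 404,
            FunctionError::InvalidWasm => 400,
            FunctionError::Timeout => 504,
            FunctionError::OutOfMemory => 507,
            FunctionError::HostFunctionError => 500,
            FunctionError::RuntimePanic => 500,
        })
    }
}
```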


Error Isolation

Key Principle: Function errors do NOT affect database or other functions.

pub async fn invoke_safe(&self, func: &Function, payload: Value) -> InvocationResult {
    match self.invoke_internal(func, payload).await {
        Ok(result) => InvocationResult::Success(result),
        Err(e) => {
            log::error!("Function '{}' failed: {}", func.name, e);
            InvocationResult::Error {
                code: e.code(),
                message: e.to_string(),
            }
        }
    }
    // Database is unaffected
}

Integration Points

With Authentication (Phase 8)

// HTTP trigger: RLS context from JWT
let context = extract_rls_context(&request)?;
invoker.invoke("my-function", payload, &context).await?;

// Database trigger: Inherit context from triggering transaction
let context = transaction.rls_context();
invoker.invoke("on-user-created", user_data, &context).await?;

// Schedule trigger: Use service role context
let context = RlsContext::service_role();
invoker.invoke("cleanup-job", empty_payload, &context).await?;
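
The three cases above amount to one rule per trigger type, which could be gathered into a single function. A sketch with simplified, hypothetical types: `Trigger`, `context_for`, and this two-variant `RlsContext` are illustrations only, and rejecting an HTTP call that has no authenticated user is an assumption.

```rust
#[derive(Clone, Debug, PartialEq)]
pub enum RlsContext {
    User { user_id: String },
    ServiceRole,
}

pub enum Trigger {
    /// User id taken from a verified JWT, if there was one.
    Http { jwt_user_id: Option<String> },
    /// Context of the transaction that caused the event.
    Database { transaction_context: RlsContext },
    Schedule,
}

/// Picks the RLS context for an invocation. Returns None when an HTTP call
/// carries no authenticated user, so the caller can reject the request.
pub fn context_for(trigger: Trigger) -> Option<RlsContext> {
    match trigger {
        Trigger::Http { jwt_user_id } => {
            jwt_user_id.map(|user_id| RlsContext::User { user_id })
        }
        Trigger::Database { transaction_context } => Some(transaction_context),
        Trigger::Schedule => Some(RlsContext::ServiceRole),
    }
}
```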

With REST API (Phase 9)

Endpoints:

POST   /functions/v1/deploy              # Deploy new function
GET    /functions/v1/{name}              # Get function metadata
DELETE /functions/v1/{name}              # Undeploy function
POST   /functions/v1/invoke/{name}       # Invoke function (HTTP trigger)


With Real-Time (Phase 10)

Functions can write to the database; the resulting events propagate automatically:

// Function writes to DB
export async function handler() {
  await db.query("INSERT INTO notifications (user_id, message) VALUES (...) ");
  // Event emitted via Phase 10
}

// Client subscribes to notifications
supabase.channel('notifications')
  .on('INSERT', handleNotification)
  .subscribe();

Observability

Metrics

functions_invocations_total{name, trigger, status}
functions_duration_seconds{name}
functions_memory_bytes{name}
functions_timeout_total{name}

Logs

{
  "level": "INFO",
  "service": "functions",
  "operation": "invoke",
  "function": "send-email",
  "trigger": "http",
  "duration_ms": 234,
  "memory_mb": 12,
  "status": "success",
  "user_id": "uuid"
}

Alerts

  • Timeout rate > 10%
  • Error rate > 5%
  • Memory usage > 90% of limit
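
The three thresholds can be evaluated from per-function counters with integer arithmetic, which avoids floating-point rounding at the boundary values. A sketch under assumptions: `FunctionStats`, `Alert`, and `evaluate_alerts` are hypothetical names, and whether `errors` should include timeouts is left open.

```rust
#[derive(Default, Debug)]
pub struct FunctionStats {
    pub invocations: u64,
    pub timeouts: u64,
    pub errors: u64,
    pub peak_memory_bytes: u64,
}

#[derive(Debug, PartialEq)]
pub enum Alert {
    TimeoutRate,
    ErrorRate,
    MemoryUsage,
}

/// Returns the alerts that fire for one function, in a fixed order.
/// Thresholds are strict: exactly 10% timeouts does not fire.
pub fn evaluate_alerts(stats: &FunctionStats, memory_limit_bytes: u64) -> Vec<Alert> {
    let mut alerts = Vec::new();
    let invocations = stats.invocations as u128;
    // timeouts / invocations > 10%  <=>  timeouts * 10 > invocations
    if stats.timeouts as u128 * 10 > invocations {
        alerts.push(Alert::TimeoutRate);
    }
    // errors / invocations > 5%  <=>  errors * 20 > invocations
    if stats.errors as u128 * 20 > invocations {
        alerts.push(Alert::ErrorRate);
    }
    // peak / limit > 90%  <=>  peak * 10 > limit * 9
    if stats.peak_memory_bytes as u128 * 10 > memory_limit_bytes as u128 * 9 {
        alerts.push(Alert::MemoryUsage);
    }
    alerts
}
```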