
Data Architecture

Comprehensive documentation of Fulcrum's data layer, including database topology, schema design, multi-tenancy implementation, event sourcing patterns, and operational considerations.

Version: 1.0
Last Updated: January 6, 2026
Status: Production Ready


Table of Contents

  1. Executive Summary
  2. Database Topology
  3. Schema Design
  4. Multi-Tenancy Implementation
  5. Time-Series Data
  6. Event Sourcing Patterns
  7. Caching Strategy
  8. Data Retention Policies
  9. Backup and Recovery
  10. Performance Considerations

1. Executive Summary

Fulcrum's data architecture is designed for enterprise-grade AI governance with the following key characteristics:

| Attribute | Implementation |
|---|---|
| Primary Store | PostgreSQL 16 with TimescaleDB |
| Event Streaming | NATS JetStream |
| Caching | Redis 7 |
| Multi-Tenancy | PostgreSQL Row-Level Security (RLS) |
| Time-Series | TimescaleDB hypertables |
| Schema Separation | fulcrum (core) + metrics (time-series) |

Design Principles

  1. Security First: Hard tenant isolation at the database layer via RLS
  2. Performance: Sub-10ms policy lookups through Redis caching
  3. Durability: File-based storage with JetStream for events
  4. Scalability: Horizontal scaling via schema-based separation
  5. Observability: Comprehensive audit trails and metrics collection

2. Database Topology

2.1 Infrastructure Components

+------------------+      +------------------+      +------------------+
|   Application    |      |   Application    |      |   Application    |
|    (Go Server)   |      | (Event Processor)|      |   (Dashboard)    |
+--------+---------+      +--------+---------+      +--------+---------+
         |                         |                         |
         v                         v                         v
+--------+---------+      +--------+---------+      +--------+---------+
|                  |      |                  |      |                  |
|  PostgreSQL 16   |<---->|  NATS JetStream  |<---->|    Redis 7       |
|  + TimescaleDB   |      |   (Event Bus)    |      |   (Cache)        |
|                  |      |                  |      |                  |
+------------------+      +------------------+      +------------------+
      |                         |                         |
      v                         v                         v
+------------------+      +------------------+      +------------------+
| fulcrum schema   |      | File Storage     |      | In-Memory        |
| metrics schema   |      | 20GB max         |      | AOF Persistence  |
+------------------+      +------------------+      +------------------+

2.2 Component Specifications

PostgreSQL 16 with TimescaleDB

| Setting | Value | Purpose |
|---|---|---|
| Image | timescale/timescaledb-ha:pg16 | High-availability TimescaleDB |
| Port | 5432 | Standard PostgreSQL |
| Extensions | uuid-ossp, pgcrypto, timescaledb | UUID generation, encryption, time-series |
| Schemas | fulcrum, metrics | Logical separation |
| Volume | postgres_data | Persistent storage |

Connection String Pattern:

postgresql://{user}:{password}@postgres:5432/{database}?sslmode=disable&search_path={schema}

NATS JetStream

| Setting | Value | Purpose |
|---|---|---|
| Image | nats:2.10-alpine | Lightweight NATS |
| Ports | 4222 (client), 8222 (monitoring) | Standard NATS ports |
| Max Memory | 1GB | Burst handling |
| Max File | 20GB | Durable storage |
| Domain | fulcrum | JetStream domain |
| Max Payload | 8MB | Large event support |

Configuration (/infra/docker/nats.conf):

jetstream {
  store_dir: "/data"
  max_mem: 1G
  max_file: 20G
  domain: fulcrum
}

Redis 7

| Setting | Value | Purpose |
|---|---|---|
| Image | redis:7-alpine | Latest Redis 7.x release (Alpine-based) |
| Port | 6379 | Standard Redis |
| Persistence | AOF (appendonly yes) | Durability |
| Volume | redis_data | Persistent storage |

3. Schema Design

3.1 Schema Overview

Fulcrum uses a single-database, multi-schema architecture:

fulcrum_dev (Database)
  |
  +-- fulcrum (Schema)      # Core business entities
  |     |-- tenants
  |     |-- budgets
  |     |-- policies
  |     |-- policy_sets
  |     |-- envelopes
  |     |-- checkpoints
  |     +-- ...
  |
  +-- metrics (Schema)      # Time-series data
        |-- llm_metrics (hypertable)
        +-- fulcrum_events (hypertable)

3.2 Entity-Relationship Diagram

Arrows point from a foreign-key column (or table) to the table it references.

                          +------------------+
                          |     TENANTS      |
                          +------------------+
                          | id (PK, UUID)    |
                          | name             |
                          | api_key_hash     |
                          | settings (JSONB) |
                          | created_at       |
                          | updated_at       |
                          | deleted_at       |
                          +------------------+
                                   ^
                                   | tenant_id (FK)
         +-------------------------+-------------------------+------------+
         |                         |                         |            |
         |                +------------------+               |            |
         |                |    ENVELOPES     |               |            |
+------------------+      +------------------+               |            |
|     BUDGETS      |      | id (PK, UUID)    |      +------------------+  |
+------------------+      | tenant_id (FK)   |      |   POLICY_SETS    |  |
| budget_id (PK)   |<-----| budget_id (FK)   |      +------------------+  |
| tenant_id (FK)   |      | policy_set_id    |----->| id (PK, UUID)    |  |
| workflow_id      |      | status           |      | tenant_id (FK)   |  |
| name             |      | adapter_type     |      | name             |  |
| limits (JSONB)   |      | workflow_id      |      | description      |  |
| current_spend    |      | tokens_used      |      +------------------+  |
| reset_at         |      | cost_usd         |               ^            |
+------------------+      | llm_calls        |               |            |
                          | tool_calls       |      +------------------+  |
                          | metadata (JSONB) |      |POLICY_SET_MEMBERS|  |
                          +------------------+      +------------------+  |
                                   ^                | policy_set_id    |  |
                                   |                | policy_id (FK)   |  |
                          +------------------+      +------------------+  |
                          |POLICY_EVALUATIONS|               v            |
                          +------------------+      +------------------+  |
                          | id (PK, UUID)    |      |    POLICIES      |  |
                          | envelope_id (FK) |      +------------------+  |
                          | policy_id (FK)   |----->| id (PK, UUID)    |  |
                          | decision         |      | tenant_id (FK)   |--+
                          | message          |      | name             |
                          | details (JSONB)  |      | policy_type      |
                          +------------------+      | rules (JSONB)    |
                                   ^                | status           |
                                   |                | priority         |
                          +------------------+      | enabled          |
                          | POLICY_APPROVALS |      +------------------+
                          +------------------+
                          | id (PK, UUID)    |
                          | evaluation_id    |
                          | status           |
                          | reviewer_id      |
                          | review_note      |
                          +------------------+

3.3 Core Tables

tenants

The root entity for multi-tenancy. Tenant-scoped entities reference back to a tenant through tenant_id.

CREATE TABLE fulcrum.tenants (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    name VARCHAR(255) NOT NULL,
    api_key_hash VARCHAR(255) NOT NULL,
    settings JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    deleted_at TIMESTAMPTZ
);

-- Indexes
CREATE INDEX idx_tenants_api_key ON tenants(api_key_hash);
CREATE INDEX idx_tenants_deleted ON tenants(deleted_at) WHERE deleted_at IS NULL;

Source: /infra/migrations/postgres/000001_initial_schema.up.sql:15-27

budgets

Budget definitions with hierarchical limits, alert thresholds, and current-spend tracking.

CREATE TABLE fulcrum.budgets (
    budget_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    tenant_id UUID NOT NULL REFERENCES tenants(id),
    workflow_id VARCHAR(255),
    name VARCHAR(255) NOT NULL,
    description TEXT,
    status VARCHAR(50),
    period JSONB,           -- Period configuration
    limits JSONB,           -- Budget limits (tokens, cost, etc.)
    thresholds JSONB,       -- Alert thresholds
    current_spend JSONB,    -- Current spend tracking
    tags JSONB DEFAULT '{}',
    budget_data JSONB NOT NULL,  -- Full Protobuf budget message, stored JSON-encoded
    reset_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

Source: /infra/migrations/postgres/000001_initial_schema.up.sql:31-49

policies

Governance policies with three execution phases.

CREATE TABLE fulcrum.policies (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    tenant_id UUID NOT NULL REFERENCES tenants(id),
    name VARCHAR(255) NOT NULL,
    policy_type VARCHAR(50) NOT NULL CHECK (policy_type IN ('PRE', 'MID', 'POST')),
    rules JSONB NOT NULL,
    status VARCHAR(50) DEFAULT 'ACTIVE',  -- DRAFT, ACTIVE, INACTIVE, ARCHIVED
    priority INTEGER DEFAULT 0,
    enabled BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes for policy lookup performance
CREATE INDEX idx_policies_tenant ON policies(tenant_id);
CREATE INDEX idx_policies_type ON policies(policy_type);
CREATE INDEX idx_policies_enabled ON policies(enabled) WHERE enabled = true;

Policy Types:

  - PRE: Pre-execution validation (before the agent starts)
  - MID: Mid-execution checks (before LLM calls)
  - POST: Post-execution analysis (after completion)

Source: /infra/migrations/postgres/000001_initial_schema.up.sql:55-69

envelopes

Execution envelopes track the lifecycle of governed AI operations.

CREATE TABLE fulcrum.envelopes (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    tenant_id UUID NOT NULL REFERENCES tenants(id),
    budget_id UUID REFERENCES budgets(budget_id),
    policy_set_id UUID REFERENCES policy_sets(id),

    -- Lifecycle
    status VARCHAR(50) NOT NULL DEFAULT 'PENDING'
        CHECK (status IN ('PENDING', 'AUTHORIZED', 'RUNNING', 'PAUSED',
                          'COMPLETED', 'FAILED', 'TERMINATED')),

    -- Framework context
    adapter_type VARCHAR(50),
    workflow_id TEXT,
    native_ref JSONB,
    checkpoint_id UUID,

    -- Cost tracking
    tokens_used BIGINT DEFAULT 0,
    cost_usd DECIMAL(18, 8) DEFAULT 0,
    llm_calls INTEGER DEFAULT 0,
    tool_calls INTEGER DEFAULT 0,

    -- Metadata
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ
);

Source: /infra/migrations/postgres/000001_initial_schema.up.sql:92-127

checkpoints

State persistence for execution recovery.

CREATE TABLE fulcrum.checkpoints (
    checkpoint_id VARCHAR(255) NOT NULL,
    execution_id VARCHAR(255) NOT NULL,
    envelope_id VARCHAR(255) NOT NULL,
    tenant_id VARCHAR(255) NOT NULL,
    version INTEGER NOT NULL DEFAULT 1,
    parent_version VARCHAR(255),
    metadata JSONB NOT NULL DEFAULT '{}',
    data JSONB NOT NULL DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    expires_at TIMESTAMP WITH TIME ZONE,
    size_bytes BIGINT NOT NULL DEFAULT 0,
    save_duration_ms BIGINT NOT NULL DEFAULT 0,
    PRIMARY KEY (execution_id, version),
    UNIQUE (checkpoint_id)
);

-- Indexes for checkpoint retrieval
CREATE INDEX idx_checkpoints_envelope_id ON checkpoints(envelope_id);
CREATE INDEX idx_checkpoints_tenant_id ON checkpoints(tenant_id);
CREATE INDEX idx_checkpoints_created_at ON checkpoints(created_at DESC);
CREATE INDEX idx_checkpoints_metadata ON checkpoints USING GIN (metadata);

Source: /infra/migrations/postgres/000001_initial_schema.up.sql:131-157

3.4 Supporting Tables

api_keys

Granular API key management with scopes.

CREATE TABLE fulcrum.api_keys (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    key_hash VARCHAR(64) NOT NULL UNIQUE,  -- SHA256 hex string
    key_hint VARCHAR(10),                   -- Last 4 chars for display
    name VARCHAR(255),
    scopes TEXT[] DEFAULT '{}',             -- ["policy:read", "policy:approve", "admin"]
    expires_at TIMESTAMPTZ,
    last_used_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

Source: /infra/migrations/postgres/000005_granular_api_keys.up.sql:6-17
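The api_keys table stores only a SHA-256 hex digest (64 characters, matching key_hash VARCHAR(64)) and a four-character hint, never the raw key. The following is a minimal sketch of generating and hashing a key under those constraints. The key format (32 random bytes, hex-encoded) and the function names are assumptions for illustration, not Fulcrum's actual scheme.

```go
package main

import (
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashAPIKey returns the lowercase SHA-256 hex digest (64 characters),
// the value that would be stored in api_keys.key_hash.
func hashAPIKey(rawKey string) string {
	sum := sha256.Sum256([]byte(rawKey))
	return hex.EncodeToString(sum[:])
}

// keyHint returns the last four characters for display (api_keys.key_hint).
func keyHint(rawKey string) string {
	if len(rawKey) <= 4 {
		return rawKey
	}
	return rawKey[len(rawKey)-4:]
}

// newAPIKey generates a key in a hypothetical format (32 random bytes,
// hex-encoded). Only hash and hint would be persisted; the raw key is
// shown to the user once.
func newAPIKey() (raw, hash, hint string, err error) {
	b := make([]byte, 32)
	if _, err = rand.Read(b); err != nil {
		return "", "", "", err
	}
	raw = hex.EncodeToString(b)
	return raw, hashAPIKey(raw), keyHint(raw), nil
}

func main() {
	raw, hash, hint, err := newAPIKey()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(raw), len(hash), hint)
}
```

On each request, the server would hash the presented key the same way and look it up by key_hash. The UNIQUE constraint backs that column with an index, so the lookup hits at most one row.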

subscriptions (Billing)

Stripe integration for SaaS billing.

CREATE TABLE fulcrum.subscriptions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    stripe_customer_id VARCHAR(255) NOT NULL,
    stripe_subscription_id VARCHAR(255),
    plan_id VARCHAR(50) NOT NULL DEFAULT 'developer',
    status VARCHAR(50) NOT NULL DEFAULT 'active',
    current_period_start TIMESTAMPTZ,
    current_period_end TIMESTAMPTZ,
    token_limit BIGINT NOT NULL DEFAULT 10000,
    tokens_used BIGINT NOT NULL DEFAULT 0,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(tenant_id),
    UNIQUE(stripe_customer_id)
);

Source: /infra/migrations/postgres/000011_billing.up.sql:9-24

audit_logs

General-purpose audit trail for compliance.

CREATE TABLE fulcrum.audit_logs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id VARCHAR(255) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    actor_id VARCHAR(255) NOT NULL,
    actor_email VARCHAR(255),
    action VARCHAR(100) NOT NULL,
    resource_type VARCHAR(50) NOT NULL,
    resource_id VARCHAR(255),
    resource_name VARCHAR(255),
    changes JSONB,
    ip_address INET,
    user_agent TEXT,
    status VARCHAR(20) NOT NULL DEFAULT 'success'
        CHECK (status IN ('success', 'failure')),
    error_message TEXT
);

-- Indexes for common audit queries
CREATE INDEX idx_audit_logs_org_timestamp ON audit_logs(org_id, timestamp DESC);
CREATE INDEX idx_audit_logs_actor ON audit_logs(org_id, actor_id);
CREATE INDEX idx_audit_logs_resource ON audit_logs(org_id, resource_type, resource_id);
CREATE INDEX idx_audit_logs_action ON audit_logs(org_id, action);

Source: /infra/migrations/postgres/000012_audit_logs.up.sql:3-28

incidents (Immune System)

Security incident tracking for the Immune System component.

CREATE TABLE fulcrum.incidents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES tenants(id),
    incident_id VARCHAR(255) UNIQUE NOT NULL,
    severity VARCHAR(50) NOT NULL,
    category VARCHAR(100) NOT NULL,
    description TEXT,
    details JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_incidents_tenant ON incidents(tenant_id);
CREATE INDEX idx_incidents_category ON incidents(category);
CREATE INDEX idx_incidents_created ON incidents(created_at DESC);

Source: /infra/migrations/postgres/000006_immune_system.up.sql:2-15

attack_patterns (Immune System)

Pattern recognition for automated policy generation.

CREATE TABLE fulcrum.attack_patterns (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    pattern_id VARCHAR(255) UNIQUE NOT NULL,
    signature VARCHAR(500) NOT NULL,
    frequency INTEGER DEFAULT 1,
    indicators JSONB DEFAULT '[]',
    affected_tenants JSONB DEFAULT '[]',
    first_seen TIMESTAMPTZ DEFAULT NOW(),
    last_seen TIMESTAMPTZ DEFAULT NOW(),
    generated_policy_id UUID REFERENCES policies(id)
);

CREATE INDEX idx_patterns_signature ON attack_patterns(signature);
CREATE INDEX idx_patterns_frequency ON attack_patterns(frequency DESC);

Source: /infra/migrations/postgres/000006_immune_system.up.sql:17-31


4. Multi-Tenancy Implementation

4.1 Row-Level Security (RLS)

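Fulcrum enforces hard tenant isolation at the database layer using PostgreSQL's Row-Level Security (RLS) feature. Because the database itself filters rows, a missing tenant filter in application code still does not return another tenant's rows. RLS is bypassed by superusers, by roles with the BYPASSRLS attribute, and by the table owner unless the table is altered with FORCE ROW LEVEL SECURITY, so the application should connect as a non-owner role.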

How It Works

Application Request
        |
        v
+------------------+
| Set Session Var  |
| fulcrum.current_ |
| tenant = 'uuid'  |
+------------------+
        |
        v
+------------------+
|   Execute Query  |
|  SELECT * FROM   |
|    envelopes     |
+------------------+
        |
        v
+------------------+
|   RLS Policy     |
|  tenant_id =     |
|  current_tenant  |
+------------------+
        |
        v
+------------------+
| Only tenant's    |
| rows returned    |
+------------------+

RLS Policy Definitions

-- Enable RLS on core tables
ALTER TABLE fulcrum.tenants ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.budgets ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.policies ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.policy_sets ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.envelopes ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.policy_evaluations ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.policy_approvals ENABLE ROW LEVEL SECURITY;

-- Tenant isolation policy (example for budgets)
CREATE POLICY tenant_isolation_budgets ON fulcrum.budgets
    FOR ALL
    USING (tenant_id = current_setting('fulcrum.current_tenant')::uuid);

-- Nested isolation for evaluations (via envelope -> tenant)
CREATE POLICY tenant_isolation_evaluations ON fulcrum.policy_evaluations
    FOR ALL
    USING (
        envelope_id IN (
            SELECT id FROM fulcrum.envelopes
            WHERE tenant_id = current_setting('fulcrum.current_tenant')::uuid
        )
    );

Source: /infra/migrations/postgres/000002_enable_rls.up.sql

4.2 Tenant Context Injection

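Every database operation must set the tenant context on the same connection that runs the tenant's queries. With a pooled *sql.DB, the reliable way to guarantee this is to set it inside the query's transaction:

// In Go application code.
// SET does not accept bind parameters, so formatting the tenant ID into a
// SET statement would allow SQL injection; set_config() takes it as $1.
// is_local = true scopes the value to the transaction, so it never leaks
// to another request that later reuses the pooled connection.
func (s *Store) setTenantContext(ctx context.Context, tx *sql.Tx, tenantID string) error {
    _, err := tx.ExecContext(ctx,
        "SELECT set_config('fulcrum.current_tenant', $1, true)", tenantID)
    return err
}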

4.3 Tables with RLS

| Table | RLS Enabled | Isolation Strategy |
|---|---|---|
| tenants | Yes | Direct (id = current_tenant) |
| budgets | Yes | Direct (tenant_id) |
| policies | Yes | Direct (tenant_id) |
| policy_sets | Yes | Direct (tenant_id) |
| envelopes | Yes | Direct (tenant_id) |
| policy_evaluations | Yes | Via envelope FK |
| policy_approvals | Yes | Via evaluation FK |
| incidents | Yes | Direct (tenant_id) |
| attack_patterns | Yes | Permissive (system-wide patterns) |
| subscriptions | Yes | Direct (tenant_id) |
| billing_events | Yes | Direct (tenant_id) |
| audit_logs | Yes | Via org_id |
| retention_policies | Yes | Read-only (global) |

5. Time-Series Data

5.1 TimescaleDB Configuration

TimescaleDB extends PostgreSQL with automatic partitioning (hypertables) optimized for time-series workloads.

-- Create TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

-- Create metrics schema
CREATE SCHEMA IF NOT EXISTS metrics;

5.2 Hypertables

llm_metrics

Tracks LLM call performance and costs.

CREATE TABLE metrics.llm_metrics (
    time TIMESTAMPTZ NOT NULL,
    tenant_id UUID NOT NULL,
    execution_id UUID,
    model TEXT NOT NULL,
    tokens_used INT NOT NULL,
    cost_cents INT NOT NULL,
    latency_ms INT NOT NULL,
    status TEXT NOT NULL
);

-- Convert to hypertable (auto-partitions by time)
SELECT create_hypertable('metrics.llm_metrics', 'time', if_not_exists => TRUE);

-- Add retention policy (30 days)
SELECT add_retention_policy('metrics.llm_metrics', INTERVAL '30 days', if_not_exists => TRUE);

-- Indexes for common query patterns
CREATE INDEX idx_llm_metrics_tenant_time ON metrics.llm_metrics (tenant_id, time DESC);
CREATE INDEX idx_llm_metrics_execution ON metrics.llm_metrics (execution_id, time DESC);

Source: /infra/migrations/postgres/000009_create_metrics_tables.up.sql:7-36

fulcrum_events

Event storage for analytics and replay.

CREATE TABLE metrics.fulcrum_events (
    event_id UUID NOT NULL,
    envelope_id UUID NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    event_type TEXT NOT NULL,
    event_data JSONB,
    severity TEXT,
    processed_at TIMESTAMPTZ,
    PRIMARY KEY (timestamp, event_id)
);

-- Convert to hypertable
SELECT create_hypertable('metrics.fulcrum_events', 'timestamp', if_not_exists => TRUE);

-- Indexes
CREATE INDEX idx_fulcrum_events_envelope ON metrics.fulcrum_events (envelope_id, timestamp DESC);
CREATE INDEX idx_fulcrum_events_timestamp ON metrics.fulcrum_events (timestamp DESC);

Source: /infra/migrations/postgres/000009_create_metrics_tables.up.sql:38-61

5.3 Hypertable Benefits

| Feature | Benefit |
|---|---|
| Auto-partitioning | No manual partition management |
| Chunk exclusion | Queries only scan relevant time ranges |
| Compression | Old chunks are compressed automatically once a compression policy is configured |
| Retention policies | Automatic deletion of expired data |
| Continuous aggregates | Pre-computed rollups for dashboards |

5.4 Query Patterns

-- Recent LLM metrics for a tenant
SELECT time, model, tokens_used, latency_ms
FROM metrics.llm_metrics
WHERE tenant_id = $1
  AND time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC;

-- Aggregated daily costs
SELECT
    time_bucket('1 day', time) AS day,
    model,
    SUM(tokens_used) AS total_tokens,
    SUM(cost_cents) AS total_cost_cents,
    AVG(latency_ms) AS avg_latency_ms
FROM metrics.llm_metrics
WHERE tenant_id = $1
  AND time > NOW() - INTERVAL '30 days'
GROUP BY day, model
ORDER BY day DESC;

6. Event Sourcing Patterns

6.1 NATS JetStream Streams

Fulcrum uses NATS JetStream for durable event streaming with the following stream architecture:

+------------------+    +------------------+    +------------------+
|   EXECUTION      |    |      LLM         |    |      TOOL        |
|    EVENTS        |    |    EVENTS        |    |    EVENTS        |
+------------------+    +------------------+    +------------------+
| fulcrum.events.  |    | fulcrum.events.  |    | fulcrum.events.  |
| execution.*      |    | llm.*            |    | tool.*           |
+------------------+    +------------------+    +------------------+

+------------------+    +------------------+
|   CHECKPOINTS    |    |   AUDIT_LOG      |
+------------------+    +------------------+
| fulcrum.events.  |    | fulcrum.events.  |
| checkpoint.*     |    | audit.*          |
+------------------+    +------------------+

Stream Definitions

// Stream names
const (
    StreamExecutionEvents = "FULCRUM_EXECUTION_EVENTS"
    StreamLLMEvents       = "FULCRUM_LLM_EVENTS"
    StreamToolEvents      = "FULCRUM_TOOL_EVENTS"
    StreamCheckpoints     = "FULCRUM_CHECKPOINTS"
    StreamAuditLog        = "FULCRUM_AUDIT_LOG"
)

// Stream configuration
streams := []struct {
    name     string
    subjects []string
}{
    {
        name:     StreamExecutionEvents,
        subjects: []string{"fulcrum.events.execution.*"},
    },
    {
        name:     StreamLLMEvents,
        subjects: []string{"fulcrum.events.llm.*"},
    },
    {
        name:     StreamToolEvents,
        subjects: []string{"fulcrum.events.tool.*"},
    },
    {
        name:     StreamCheckpoints,
        subjects: []string{"fulcrum.events.checkpoint.*"},
    },
    {
        name:     StreamAuditLog,
        subjects: []string{"fulcrum.events.audit.*"},
    },
}

Source: /internal/eventstore/store.go:23-115

6.2 Stream Configuration

| Setting | Value | Purpose |
|---|---|---|
| Retention | LimitsPolicy | Bounded by age/size |
| Max Age | 7 days (default) | Auto-cleanup |
| Max Messages | 1,000,000 | Per-stream limit |
| Storage | FileStorage | Durable persistence |
| Replicas | 1 | Single node (increase for HA) |
| Compression | S2Compression | Space efficiency |
| Discard | DiscardOld | Drop oldest on overflow |

6.3 Event Types

// Subject mapping based on event type
func (es *EventStore) getSubjectForEventType(eventType string) string {
    switch {
    case eventType == "execution_started" ||
         eventType == "execution_completed" ||
         eventType == "execution_failed":
        return "fulcrum.events.execution." + eventType

    case eventType == "llm_call" || eventType == "llm_response":
        return "fulcrum.events.llm." + eventType

    case eventType == "tool_call" || eventType == "tool_response":
        return "fulcrum.events.tool." + eventType

    case eventType == "checkpoint_saved" || eventType == "checkpoint_restored":
        return "fulcrum.events.checkpoint." + eventType

    default:
        return "fulcrum.events.audit." + eventType
    }
}

Source: /internal/eventstore/store.go:476-489

6.4 Event Payload Structure

message StoredEvent {
    string event_id = 1;
    string execution_id = 2;
    string envelope_id = 3;
    string tenant_id = 4;
    string workflow_id = 5;
    string event_type = 6;
    google.protobuf.Timestamp timestamp = 7;
    google.protobuf.Struct payload = 8;
    string trace_id = 9;
    string span_id = 10;
    map<string, string> labels = 11;
    google.protobuf.Timestamp stored_at = 12;
    string stream_name = 13;
    int64 stream_sequence = 14;
}

6.5 Consumer Patterns

Pull-Based Query

// Query events with filters
func (es *EventStore) QueryEvents(ctx context.Context, req *QueryEventsRequest) (*QueryEventsResponse, error) {
    opts := []nats.SubOpt{
        nats.BindStream(streamName),
        nats.DeliverAll(),
        nats.AckNone(),
        nats.MaxDeliver(1),
    }

    if req.StartTime != nil {
        opts = append(opts, nats.StartTime(req.StartTime.AsTime()))
    }

    consumer, err := es.js.PullSubscribe("", "", opts...)
    // ... fetch and filter
}

Push-Based Streaming

// Real-time event streaming
func (es *EventStore) StreamEvents(ctx context.Context, req *StreamEventsRequest,
    eventChan chan<- *StoredEvent) error {

    opts := []nats.SubOpt{
        nats.BindStream(streamName),
        nats.DeliverNew(),       // Only new events
        nats.AckExplicit(),      // Manual acknowledgment
    }

    sub, err := es.js.Subscribe("", func(msg *nats.Msg) {
        // ... process and send to channel
        msg.Ack()
    }, opts...)
}

Source: /internal/eventstore/store.go:233-412


7. Caching Strategy

7.1 Redis Cache Architecture

+-------------------+     +-------------------+
|  Policy Engine    |     |  Cost Engine      |
+-------------------+     +-------------------+
         |                         |
         v                         v
+-------------------+     +-------------------+
|  Redis Cache      |     |  In-Memory LRU    |
|  (Distributed)    |     |  (Local)          |
+-------------------+     +-------------------+

7.2 Redis Key Patterns

Policy Cache

Key Pattern: policies:{tenant_id}:{status}
TTL: 5 minutes
Value: Protobuf-encoded PolicyList

Example:
  policies:550e8400-e29b-41d4-a716-446655440000:POLICY_STATUS_ACTIVE

Implementation:

func (s *Store) ListPolicies(ctx context.Context, tenantID string, status PolicyStatus) ([]*Policy, error) {
    // 1. Try cache
    if s.rdb != nil {
        cacheKey := fmt.Sprintf("policies:%s:%s", tenantID, status)
        val, err := s.rdb.Get(ctx, cacheKey).Bytes()
        if err == nil {
            var list PolicyList
            if err := proto.Unmarshal(val, &list); err == nil {
                return list.Policies, nil  // Cache hit
            }
        }
    }

    // 2. Query database
    policies := queryFromDB(...)

    // 3. Update cache
    if s.rdb != nil {
        cacheKey := fmt.Sprintf("policies:%s:%s", tenantID, status)
        data, _ := proto.Marshal(&PolicyList{Policies: policies})
        s.rdb.Set(ctx, cacheKey, data, 5*time.Minute)
    }

    return policies, nil
}

Source: /internal/policyengine/store.go:39-156

7.3 Cache Invalidation Strategy

// On policy create/update/delete
func (s *Store) invalidatePolicyCache(ctx context.Context, tenantID string) {
    if s.rdb == nil {
        return
    }

    // Pattern-based deletion for all status variants
    iter := s.rdb.Scan(ctx, 0, fmt.Sprintf("policies:%s:*", tenantID), 0).Iterator()
    for iter.Next(ctx) {
        s.rdb.Del(ctx, iter.Val())
    }
}

Source: /internal/policyengine/store.go:256-269

7.4 In-Memory LRU Cache

For high-frequency lookups within a single process, Fulcrum uses a bounded LRU cache:

type BoundedCache[T any] struct {
    maxSize int
    mu      sync.RWMutex
    items   map[string]*list.Element
    lruList *list.List
}

// O(1) operations with LRU eviction
func (c *BoundedCache[T]) Get(key string) (T, bool) { ... }
func (c *BoundedCache[T]) Set(key string, value T) { ... }
func (c *BoundedCache[T]) Delete(key string) bool { ... }

Source: /internal/costengine/cache.go
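The method bodies of BoundedCache are elided above. The sketch below shows one way to implement them with container/list. It keeps the struct fields shown above but assumes that each list element stores a key/value entry; the entry type and the NewBoundedCache constructor are assumptions, not the actual Fulcrum code.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// entry is the assumed payload of each list element; keeping the key
// lets eviction delete the matching map entry in O(1).
type entry[T any] struct {
	key   string
	value T
}

type BoundedCache[T any] struct {
	maxSize int
	mu      sync.RWMutex
	items   map[string]*list.Element
	lruList *list.List // front = most recently used
}

func NewBoundedCache[T any](maxSize int) *BoundedCache[T] {
	return &BoundedCache[T]{maxSize: maxSize, items: make(map[string]*list.Element), lruList: list.New()}
}

func (c *BoundedCache[T]) Get(key string) (T, bool) {
	c.mu.Lock() // exclusive: a hit reorders the recency list
	defer c.mu.Unlock()
	if el, ok := c.items[key]; ok {
		c.lruList.MoveToFront(el)
		return el.Value.(*entry[T]).value, true
	}
	var zero T
	return zero, false
}

func (c *BoundedCache[T]) Set(key string, value T) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[key]; ok {
		el.Value.(*entry[T]).value = value
		c.lruList.MoveToFront(el)
		return
	}
	c.items[key] = c.lruList.PushFront(&entry[T]{key: key, value: value})
	if c.lruList.Len() > c.maxSize {
		oldest := c.lruList.Back() // least recently used
		c.lruList.Remove(oldest)
		delete(c.items, oldest.Value.(*entry[T]).key)
	}
}

func (c *BoundedCache[T]) Delete(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	el, ok := c.items[key]
	if !ok {
		return false
	}
	c.lruList.Remove(el)
	delete(c.items, key)
	return true
}

func main() {
	c := NewBoundedCache[int](2)
	c.Set("a", 1)
	c.Set("b", 2)
	c.Get("a")    // "a" becomes most recently used
	c.Set("c", 3) // evicts "b"
	_, okB := c.Get("b")
	v, okA := c.Get("a")
	fmt.Println(okB, v, okA) // false 1 true
}
```

Get takes the exclusive lock rather than RLock because a hit mutates the recency list. With an RWMutex, a read lock would only be safe for a peek that does not reorder entries.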

7.5 Cache TTL Policies

| Cache Type | TTL | Rationale |
|---|---|---|
| Policy list | 5 min | Balance freshness vs. performance |
| Tenant config | 10 min | Rarely changes |
| Budget state | 1 min | Needs near-real-time accuracy |
| Session data | 30 min | User session timeout |

8. Data Retention Policies

8.1 Retention Policy Table

CREATE TABLE fulcrum.retention_policies (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    table_name VARCHAR(255) NOT NULL,
    retention_days INTEGER NOT NULL CHECK (retention_days > 0),
    partition_column VARCHAR(255) NOT NULL DEFAULT 'created_at',
    enabled BOOLEAN NOT NULL DEFAULT true,
    last_cleanup_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(table_name)
);

Source: /infra/migrations/postgres/000007_add_retention_policies.up.sql:9-19

8.2 Default Retention Periods

| Table | Retention | Rationale |
|---|---|---|
| policy_evaluations | 90 days | Compliance audit trail |
| policy_approvals | 180 days | Extended compliance |
| checkpoints | 30 days | Short-lived state |
| checkpoints_archive | 30 days | Archived state |
| execution_contexts | 60 days | Debug context |
| llm_metrics | 30 days | TimescaleDB retention |
| audit_logs | 365 days | 1 year compliance (implicit) |

8.3 Automated Cleanup Function

CREATE OR REPLACE FUNCTION fulcrum.cleanup_old_data()
RETURNS TABLE (
    table_name VARCHAR(255),
    rows_deleted BIGINT,
    cleanup_time TIMESTAMPTZ
) AS $$
DECLARE
    policy_record RECORD;
    delete_count BIGINT;
    cutoff_date TIMESTAMPTZ;
BEGIN
    FOR policy_record IN
        SELECT * FROM fulcrum.retention_policies
        WHERE enabled = true
    LOOP
        cutoff_date := NOW() - (policy_record.retention_days || ' days')::INTERVAL;

        EXECUTE format(
            'DELETE FROM fulcrum.%I WHERE %I < $1',
            policy_record.table_name,
            policy_record.partition_column
        ) USING cutoff_date;

        GET DIAGNOSTICS delete_count = ROW_COUNT;

        UPDATE fulcrum.retention_policies
        SET last_cleanup_at = NOW()
        WHERE id = policy_record.id;

        table_name := policy_record.table_name;
        rows_deleted := delete_count;
        cleanup_time := NOW();
        RETURN NEXT;
    END LOOP;

    RETURN;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

Source: /infra/migrations/postgres/000007_add_retention_policies.up.sql:54-96

8.4 Scheduled Cleanup (pg_cron)

-- Schedule daily cleanup at 2 AM UTC (requires pg_cron extension)
SELECT cron.schedule(
    'fulcrum-retention-cleanup',
    '0 2 * * *',
    'SELECT fulcrum.cleanup_old_data()'
);

8.5 Manual Cleanup

-- Cleanup specific table manually
SELECT * FROM fulcrum.cleanup_table('policy_evaluations');

-- Check retention status
SELECT * FROM fulcrum.retention_status;

8.6 Retention Status View

CREATE OR REPLACE VIEW fulcrum.retention_status AS
SELECT
    rp.table_name,
    rp.retention_days,
    rp.enabled,
    rp.last_cleanup_at,
    CASE
        WHEN rp.last_cleanup_at IS NULL THEN 'Never cleaned'
        WHEN rp.last_cleanup_at < NOW() - INTERVAL '2 days' THEN 'Overdue'
        ELSE 'OK'
    END AS status
FROM fulcrum.retention_policies rp
ORDER BY rp.table_name;

9. Backup and Recovery

9.1 Backup Strategy Overview

+------------------+     +------------------+     +------------------+
|   Hot Backup     |     |  Logical Backup  |     |  Event Replay    |
|   (Streaming)    |     |   (pg_dump)      |     |  (JetStream)     |
+------------------+     +------------------+     +------------------+
        |                        |                        |
        v                        v                        v
+------------------+     +------------------+     +------------------+
| Continuous WAL   |     | Daily Snapshots  |     | 7-Day Retention  |
| Archiving        |     | to S3/GCS        |     | Event History    |
+------------------+     +------------------+     +------------------+

9.2 pg_dump Procedures

Full Database Backup

#!/bin/bash
# Daily full backup script
set -euo pipefail  # abort on any failure so a broken dump is never uploaded

DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="/backups/fulcrum"
DATABASE="fulcrum_dev"
DB_USER="fulcrum"  # avoid overwriting the shell's own $USER
BACKUP_FILE="${BACKUP_DIR}/fulcrum_${DATE}.dump"

# Create backup directory
mkdir -p "${BACKUP_DIR}"

# Full database dump (custom format, maximum compression)
pg_dump -h localhost -U "${DB_USER}" -d "${DATABASE}" \
    --format=custom \
    --compress=9 \
    --verbose \
    --file="${BACKUP_FILE}"

# Verify that the archive's table of contents is readable
pg_restore --list "${BACKUP_FILE}" > /dev/null

# Upload to remote storage (example: S3)
aws s3 cp "${BACKUP_FILE}" "s3://fulcrum-backups/daily/${DATE}.dump"

# Cleanup old local backups (keep 7 days)
find "${BACKUP_DIR}" -name "*.dump" -mtime +7 -delete

Schema-Specific Backup

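# Backup specific schemas
# Note: TimescaleDB stores hypertable chunks in the _timescaledb_internal schema,
# so --schema=metrics alone does not capture hypertable data; use a full dump for metrics
DATE=$(date +%Y%m%d_%H%M%S)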
pg_dump -h localhost -U fulcrum -d fulcrum_dev \
    --schema=fulcrum \
    --format=custom \
    --file="fulcrum_schema_${DATE}.dump"

pg_dump -h localhost -U fulcrum -d fulcrum_dev \
    --schema=metrics \
    --format=custom \
    --file="metrics_schema_${DATE}.dump"

Table-Specific Backup

# Backup critical tables only
DATE=$(date +%Y%m%d_%H%M%S)
pg_dump -h localhost -U fulcrum -d fulcrum_dev \
    --table=fulcrum.tenants \
    --table=fulcrum.policies \
    --table=fulcrum.budgets \
    --format=custom \
    --file="critical_tables_${DATE}.dump"

9.3 Point-in-Time Recovery (PITR)

Enable WAL Archiving

# postgresql.conf
wal_level = replica
archive_mode = on
archive_command = 'test ! -f /wal_archive/%f && cp %p /wal_archive/%f'
archive_timeout = 60

Recovery Procedure

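# 1. Stop PostgreSQL
pg_ctl stop -D /var/lib/postgresql/data

# 2. Move the current data directory aside (keep it until recovery is verified)
mv /var/lib/postgresql/data /var/lib/postgresql/data.old
mkdir -m 700 /var/lib/postgresql/data

# 3. Restore a physical base backup (WAL cannot be replayed onto a pg_dump/pg_restore load)
#    e.g. taken earlier with: pg_basebackup -D /backups/base_backup -Ft
tar -xf /backups/base_backup/base.tar -C /var/lib/postgresql/data

# 4. Configure recovery (PostgreSQL 12+: settings go in postgresql.conf,
#    recovery.signal is an empty marker file, and recovery.conf is no longer used)
cat >> /var/lib/postgresql/data/postgresql.conf << 'EOF'
restore_command = 'cp /wal_archive/%f %p'
recovery_target_time = '2026-01-06 14:30:00 UTC'
recovery_target_action = 'promote'
EOF
touch /var/lib/postgresql/data/recovery.signal

# 5. Start PostgreSQL
pg_ctl start -D /var/lib/postgresql/data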

9.4 Redis Backup

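DATE=$(date +%Y%m%d_%H%M%S)

# Manual RDB snapshot; BGSAVE returns immediately, so wait until LASTSAVE advances
LAST=$(redis-cli LASTSAVE)
redis-cli BGSAVE
while [ "$(redis-cli LASTSAVE)" = "${LAST}" ]; do sleep 1; done

# Copy RDB file
cp /data/dump.rdb /backups/redis_${DATE}.rdb

# AOF backup (for point-in-time, when appendonly is enabled); Redis 7 keeps multi-part AOF files in appendonlydir/
cp -r /data/appendonlydir /backups/redis_aof_${DATE}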

9.5 NATS JetStream Backup

# Export stream data
nats stream backup FULCRUM_EXECUTION_EVENTS /backups/nats/execution_events
nats stream backup FULCRUM_LLM_EVENTS /backups/nats/llm_events
nats stream backup FULCRUM_TOOL_EVENTS /backups/nats/tool_events
nats stream backup FULCRUM_CHECKPOINTS /backups/nats/checkpoints
nats stream backup FULCRUM_AUDIT_LOG /backups/nats/audit_log

# Restore stream
nats stream restore FULCRUM_EXECUTION_EVENTS /backups/nats/execution_events

9.6 Disaster Recovery Runbook

| Step | Action | RTO | RPO |
|---|---|---|---|
| 1 | Detect failure | 5 min | - |
| 2 | Assess scope | 10 min | - |
| 3 | Restore PostgreSQL from backup | 30 min | 24 hours (daily) |
| 4 | Apply WAL for PITR | 15 min | Minutes |
| 5 | Restore Redis cache | 5 min | N/A (cache) |
| 6 | Restore NATS streams | 15 min | 7 days |
| 7 | Verify application | 15 min | - |
| Total | | ~95 min | Minutes to 24 h |

10. Performance Considerations

10.1 Index Strategy

Indexes target the most common query patterns, starting with the policy lookup hot path:

-- Policy lookup (hot path)
CREATE INDEX idx_policies_tenant ON policies(tenant_id);
CREATE INDEX idx_policies_type ON policies(policy_type);
CREATE INDEX idx_policies_enabled ON policies(enabled) WHERE enabled = true;

-- Envelope queries
CREATE INDEX idx_envelopes_tenant ON envelopes(tenant_id);
CREATE INDEX idx_envelopes_status ON envelopes(status);
CREATE INDEX idx_envelopes_created ON envelopes(created_at DESC);

-- Time-series optimization (TimescaleDB)
CREATE INDEX idx_llm_metrics_tenant_time ON metrics.llm_metrics (tenant_id, time DESC);
CREATE INDEX idx_fulcrum_events_envelope ON metrics.fulcrum_events (envelope_id, timestamp DESC);

-- JSONB GIN indexes for flexible queries
CREATE INDEX idx_checkpoints_metadata ON checkpoints USING GIN (metadata);
CREATE INDEX idx_execution_contexts_data ON execution_contexts USING GIN (data);

10.2 Connection Pooling

PostgreSQL

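# Recommended PgBouncer configuration
# Note: pool_mode = transaction hands the server connection to another client after each
# transaction, so session state (plain SET, session-level advisory locks) does not persist;
# if RLS tenant context is set per session, use SET LOCAL inside each transaction instead.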
[databases]
fulcrum = host=localhost port=5432 dbname=fulcrum_dev

[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 20
min_pool_size = 5
reserve_pool_size = 5
reserve_pool_timeout = 3
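default_pool_size and reserve_pool_size apply per user/database pair. Together they bound how many PostgreSQL server connections PgBouncer can open, independently of max_client_conn. The Go sketch below does that arithmetic and checks the result against PostgreSQL's max_connections (default 100) with superuser_reserved_connections (default 3) held back. The type and function names are hypothetical, and the sketch ignores max_db_connections and max_user_connections caps.

```go
package main

import "fmt"

// pgbouncerPool holds the sizing knobs from the [pgbouncer] section above.
type pgbouncerPool struct {
	DefaultPoolSize int
	ReservePoolSize int
}

// maxServerConns is the upper bound on PostgreSQL connections PgBouncer may open
// across `pairs` distinct user/database pairs.
func maxServerConns(cfg pgbouncerPool, pairs int) int {
	return pairs * (cfg.DefaultPoolSize + cfg.ReservePoolSize)
}

// fitsUnder reports whether that bound plus connections reserved for superusers
// stays within PostgreSQL's max_connections.
func fitsUnder(cfg pgbouncerPool, pairs, maxConnections, reserved int) bool {
	return maxServerConns(cfg, pairs)+reserved <= maxConnections
}

func main() {
	cfg := pgbouncerPool{DefaultPoolSize: 20, ReservePoolSize: 5}
	fmt.Println(maxServerConns(cfg, 1), fitsUnder(cfg, 4, 100, 3)) // 25 false
}
```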

Redis

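import (
	"time"

	"github.com/redis/go-redis/v9"
)

// newRedisClient returns a Go Redis client with an explicitly sized connection pool.
func newRedisClient() *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:         "redis:6379",
		PoolSize:     10,              // max socket connections
		MinIdleConns: 5,               // keep warm connections for the hot path
		PoolTimeout:  4 * time.Second, // wait this long for a free connection
	})
}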

10.3 Query Optimization Tips

  1. Always filter by tenant_id first - RLS enforces this, but explicit filters help query planning

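  2. Use prepared statements - Reduce parse overhead for repeated queries (behind PgBouncer in transaction mode, this requires PgBouncer 1.21+ with max_prepared_statements set)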

  3. Batch inserts - Use multi-row INSERT for bulk operations

  4. Avoid SELECT * - Explicitly list needed columns

  5. Use EXPLAIN ANALYZE - Profile slow queries

EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM fulcrum.policies
WHERE tenant_id = '...' AND status = 'ACTIVE';
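Tip 3 in practice means replacing N round trips with a single multi-row INSERT. The Go sketch below renders the statement text with numbered placeholders for use with database/sql or pgx. buildBatchInsert is hypothetical, and the policy_id column in the example is assumed. Table and column names are not quoted, so they must come from trusted code. PostgreSQL caps a statement at 65535 bind parameters, so large batches must be split. For very large loads, COPY (pgx's CopyFrom) is usually faster.

```go
package main

import (
	"fmt"
	"strings"
)

// maxBindParams is PostgreSQL's per-statement bind parameter limit.
const maxBindParams = 65535

// buildBatchInsert renders INSERT INTO table (cols) VALUES ($1, $2), ($3, $4), ...
// table and columns must be trusted identifiers; values are bound separately.
func buildBatchInsert(table string, columns []string, rows int) (string, error) {
	if rows < 1 || len(columns) == 0 {
		return "", fmt.Errorf("need at least one row and one column")
	}
	if n := rows * len(columns); n > maxBindParams {
		return "", fmt.Errorf("%d parameters exceeds limit of %d; split the batch", n, maxBindParams)
	}
	var b strings.Builder
	fmt.Fprintf(&b, "INSERT INTO %s (%s) VALUES ", table, strings.Join(columns, ", "))
	n := 1
	for r := 0; r < rows; r++ {
		if r > 0 {
			b.WriteString(", ")
		}
		b.WriteByte('(')
		for c := range columns {
			if c > 0 {
				b.WriteString(", ")
			}
			fmt.Fprintf(&b, "$%d", n)
			n++
		}
		b.WriteByte(')')
	}
	return b.String(), nil
}

func main() {
	sql, err := buildBatchInsert("fulcrum.policy_evaluations", []string{"tenant_id", "policy_id"}, 2)
	fmt.Println(sql, err)
}
```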

10.4 Monitoring Queries

-- Active connections per database
SELECT datname, numbackends
FROM pg_stat_database
WHERE datname = 'fulcrum_dev';

-- Slow queries (requires pg_stat_statements; column names as of PostgreSQL 13+)
SELECT query, calls, mean_exec_time, total_exec_time
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;

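-- Table sizes
-- (TimescaleDB hypertables keep their rows in chunks, so use
-- hypertable_size('metrics.llm_metrics') for their true size)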
SELECT
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables
WHERE schemaname IN ('fulcrum', 'metrics')
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;

-- Index usage
SELECT
    indexrelname,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes
WHERE schemaname = 'fulcrum'
ORDER BY idx_scan DESC;

Appendix A: Migration History

| Version | Description | Date |
|---|---|---|
| 000001 | Initial schema (tenants, budgets, policies, envelopes, checkpoints) | 2025-12 |
| 000002 | Enable Row-Level Security | 2025-12 |
| 000003 | Add policy status field | 2025-12 |
| 000004 | Add policy evaluation details | 2025-12 |
| 000005 | Granular API keys | 2025-12 |
| 000006 | Immune System (incidents, attack_patterns) | 2025-12 |
| 000007 | Retention policies | 2025-12-27 |
| 000008 | Consolidate metrics schema | 2026-01 |
| 000009 | Create metrics tables (llm_metrics, fulcrum_events) | 2026-01 |
| 000010 | Policy audit logs | 2026-01 |
| 000011 | Billing (subscriptions, billing_events) | 2026-01-03 |
| 000012 | General audit logs | 2026-01 |

Appendix B: Connection Strings

Development

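# PostgreSQL (fulcrum schema)
# search_path in the URI is sent as a runtime parameter by Go drivers such as pgx;
# libpq clients such as psql reject it and need options=-csearch_path%3Dfulcrum instead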
POSTGRES_CONN_STR="postgresql://fulcrum:fulcrum@postgres:5432/fulcrum_dev?sslmode=disable&search_path=fulcrum"

# PostgreSQL (metrics schema)
POSTGRES_CONN_STR_METRICS="postgresql://fulcrum:fulcrum@postgres:5432/fulcrum_dev?sslmode=disable&search_path=metrics"

# NATS
NATS_URL="nats://nats:4222"

# Redis
REDIS_URL="redis://redis:6379"
REDIS_ADDR="redis:6379"

Production

# Use secrets management (e.g., Kubernetes secrets, Vault)
POSTGRES_CONN_STR="${POSTGRES_SECRET}"
NATS_URL="${NATS_SECRET}"
REDIS_URL="${REDIS_SECRET}"

Document generated from source analysis of Fulcrum codebase. Last updated: January 6, 2026