Data Architecture
Comprehensive documentation of Fulcrum's data layer, including database topology, schema design, multi-tenancy implementation, event sourcing patterns, and operational considerations.
Version: 1.0 | Last Updated: January 6, 2026 | Status: Production Ready
Table of Contents
- Executive Summary
- Database Topology
- Schema Design
- Multi-Tenancy Implementation
- Time-Series Data
- Event Sourcing Patterns
- Caching Strategy
- Data Retention Policies
- Backup and Recovery
- Performance Considerations
1. Executive Summary
Fulcrum's data architecture is designed for enterprise-grade AI governance with the following key characteristics:
| Attribute | Implementation |
|---|---|
| Primary Store | PostgreSQL 16 with TimescaleDB |
| Event Streaming | NATS JetStream |
| Caching | Redis 7 |
| Multi-Tenancy | PostgreSQL Row-Level Security (RLS) |
| Time-Series | TimescaleDB hypertables |
| Schema Separation | fulcrum (core) + metrics (time-series) |
Design Principles
- Security First: Hard tenant isolation at the database layer via RLS
- Performance: Sub-10ms policy lookups through Redis caching
- Durability: File-based storage with JetStream for events
- Scalability: Horizontal scaling via schema-based separation
- Observability: Comprehensive audit trails and metrics collection
2. Database Topology
2.1 Infrastructure Components
+------------------+ +------------------+ +------------------+
| Application | | Application | | Application |
| (Go Server) | | (Event Processor)| | (Dashboard) |
+--------+---------+ +--------+---------+ +--------+---------+
| | |
v v v
+--------+---------+ +--------+---------+ +--------+---------+
| | | | | |
| PostgreSQL 16 |<---->| NATS JetStream |<---->| Redis 7 |
| + TimescaleDB | | (Event Bus) | | (Cache) |
| | | | | |
+------------------+ +------------------+ +------------------+
| | |
v v v
+------------------+ +------------------+ +------------------+
| fulcrum schema | | File Storage | | In-Memory |
| metrics schema | | 20GB max | | AOF Persistence |
+------------------+ +------------------+ +------------------+
2.2 Component Specifications
PostgreSQL 16 with TimescaleDB
| Setting | Value | Purpose |
|---|---|---|
| Image | `timescale/timescaledb-ha:pg16` | High-availability TimescaleDB |
| Port | 5432 | Standard PostgreSQL |
| Extensions | `uuid-ossp`, `pgcrypto`, `timescaledb` | UUID generation, encryption, time-series |
| Schemas | `fulcrum`, `metrics` | Logical separation |
| Volume | `postgres_data` | Persistent storage |
Connection String Pattern: see Appendix B: Connection Strings.
NATS JetStream
| Setting | Value | Purpose |
|---|---|---|
| Image | `nats:2.10-alpine` | Lightweight NATS |
| Ports | 4222 (client), 8222 (monitoring) | Standard NATS ports |
| Max Memory | 1GB | Burst handling |
| Max File | 20GB | Durable storage |
| Domain | `fulcrum` | JetStream domain |
| Max Payload | 8MB | Large event support |
Configuration: see /infra/docker/nats.conf.
Redis 7
| Setting | Value | Purpose |
|---|---|---|
| Image | `redis:7-alpine` | Latest stable Redis |
| Port | 6379 | Standard Redis |
| Persistence | AOF (`appendonly yes`) | Durability |
| Volume | `redis_data` | Persistent storage |
3. Schema Design
3.1 Schema Overview
Fulcrum uses a single-database, multi-schema architecture:
fulcrum_dev (Database)
|
+-- fulcrum (Schema) # Core business entities
| |-- tenants
| |-- budgets
| |-- policies
| |-- policy_sets
| |-- envelopes
| |-- checkpoints
| +-- ...
|
+-- metrics (Schema) # Time-series data
|-- llm_metrics (hypertable)
+-- fulcrum_events (hypertable)
3.2 Entity-Relationship Diagram
+------------------+ +------------------+ +------------------+
| TENANTS | | BUDGETS | | POLICIES |
+------------------+ +------------------+ +------------------+
| id (PK, UUID) |<---+ | budget_id (PK) | +-->| id (PK, UUID) |
| name | | | tenant_id (FK) |---+ | tenant_id (FK) |---+
| api_key_hash | | | workflow_id | | name | |
| settings (JSONB) | | | name | | policy_type | |
| created_at | | | limits (JSONB) | | rules (JSONB) | |
| updated_at | | | current_spend | | status | |
| deleted_at | | | reset_at | | priority | |
+------------------+ | +------------------+ | enabled | |
^ | ^ +------------------+ |
| | | ^ |
| | | | |
+-------+---------------+----------+-------+ | |
| | | |
| +------------------+ | | |
| | ENVELOPES | | | |
| +------------------+ | | |
| | id (PK, UUID) | | | |
+--------------| tenant_id (FK) | | | |
| budget_id (FK) |--------+ | |
| policy_set_id | | |
| status | | |
| adapter_type | | |
| workflow_id | | |
| tokens_used | | |
| cost_usd | | |
| llm_calls | | |
| tool_calls | | |
| metadata (JSONB) | | |
+------------------+ | |
| | |
v | |
+------------------+ +------------------+| |
|POLICY_EVALUATIONS| | POLICY_SETS || |
+------------------+ +------------------+| |
| id (PK, UUID) | | id (PK, UUID) || |
| envelope_id (FK) | | tenant_id (FK) |-+ |
| policy_id (FK) |------>| name | |
| decision | | description | |
| message | +------------------+ |
| details (JSONB) | | |
+------------------+ v |
| +----------------------+ |
v | POLICY_SET_MEMBERS | |
+------------------+ +----------------------+ |
| POLICY_APPROVALS | | policy_set_id (FK) | |
+------------------+ | policy_id (FK) |---------------+
| id (PK, UUID) | +----------------------+
| evaluation_id |
| status |
| reviewer_id |
| review_note |
+------------------+
3.3 Core Tables
tenants
The root entity for multi-tenancy. All other entities reference back to a tenant.
CREATE TABLE fulcrum.tenants (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
name VARCHAR(255) NOT NULL,
api_key_hash VARCHAR(255) NOT NULL,
settings JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
deleted_at TIMESTAMPTZ
);
-- Indexes
CREATE INDEX idx_tenants_api_key ON tenants(api_key_hash);
CREATE INDEX idx_tenants_deleted ON tenants(deleted_at) WHERE deleted_at IS NULL;
Source: /infra/migrations/postgres/000001_initial_schema.up.sql:15-27
budgets
Budget tracking with hierarchical limits and spend tracking.
CREATE TABLE fulcrum.budgets (
budget_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
tenant_id UUID NOT NULL REFERENCES tenants(id),
workflow_id VARCHAR(255),
name VARCHAR(255) NOT NULL,
description TEXT,
status VARCHAR(50),
period JSONB, -- Period configuration
limits JSONB, -- Budget limits (tokens, cost, etc.)
thresholds JSONB, -- Alert thresholds
current_spend JSONB, -- Current spend tracking
tags JSONB DEFAULT '{}',
budget_data JSONB NOT NULL, -- Full Protobuf storage
reset_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
Source: /infra/migrations/postgres/000001_initial_schema.up.sql:31-49
policies
Governance policies with three execution phases.
CREATE TABLE fulcrum.policies (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
tenant_id UUID NOT NULL REFERENCES tenants(id),
name VARCHAR(255) NOT NULL,
policy_type VARCHAR(50) NOT NULL CHECK (policy_type IN ('PRE', 'MID', 'POST')),
rules JSONB NOT NULL,
status VARCHAR(50) DEFAULT 'ACTIVE', -- DRAFT, ACTIVE, INACTIVE, ARCHIVED
priority INTEGER DEFAULT 0,
enabled BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indexes for policy lookup performance
CREATE INDEX idx_policies_tenant ON policies(tenant_id);
CREATE INDEX idx_policies_type ON policies(policy_type);
CREATE INDEX idx_policies_enabled ON policies(enabled) WHERE enabled = true;
Policy Types:
- PRE: Pre-execution validation (before agent starts)
- MID: Mid-execution checks (before LLM calls)
- POST: Post-execution analysis (after completion)
Source: /infra/migrations/postgres/000001_initial_schema.up.sql:55-69
envelopes
Execution envelopes track the lifecycle of governed AI operations.
CREATE TABLE fulcrum.envelopes (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
tenant_id UUID NOT NULL REFERENCES tenants(id),
budget_id UUID REFERENCES budgets(budget_id),
policy_set_id UUID REFERENCES policy_sets(id),
-- Lifecycle
status VARCHAR(50) NOT NULL DEFAULT 'PENDING'
CHECK (status IN ('PENDING', 'AUTHORIZED', 'RUNNING', 'PAUSED',
'COMPLETED', 'FAILED', 'TERMINATED')),
-- Framework context
adapter_type VARCHAR(50),
workflow_id TEXT,
native_ref JSONB,
checkpoint_id UUID,
-- Cost tracking
tokens_used BIGINT DEFAULT 0,
cost_usd DECIMAL(18, 8) DEFAULT 0,
llm_calls INTEGER DEFAULT 0,
tool_calls INTEGER DEFAULT 0,
-- Metadata
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ
);
Source: /infra/migrations/postgres/000001_initial_schema.up.sql:92-127
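The `status` values above imply a lifecycle state machine, but the schema only constrains the set of states, not the transitions between them. The map below is an illustrative guess at plausible transitions (an assumption for the sketch; the real rules live in the application, not the database):

```go
package main

// validTransitions is an illustrative guess at the envelope lifecycle;
// COMPLETED, FAILED, and TERMINATED are treated as terminal states.
var validTransitions = map[string][]string{
	"PENDING":    {"AUTHORIZED", "FAILED", "TERMINATED"},
	"AUTHORIZED": {"RUNNING", "TERMINATED"},
	"RUNNING":    {"PAUSED", "COMPLETED", "FAILED", "TERMINATED"},
	"PAUSED":     {"RUNNING", "TERMINATED"},
}

// canTransition reports whether moving from one status to another is
// allowed under the map above.
func canTransition(from, to string) bool {
	for _, next := range validTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}
```

Enforcing such a check before every `UPDATE envelopes SET status = ...` keeps the CHECK constraint honest at the application layer.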
checkpoints
State persistence for execution recovery.
CREATE TABLE fulcrum.checkpoints (
checkpoint_id VARCHAR(255) NOT NULL,
execution_id VARCHAR(255) NOT NULL,
envelope_id VARCHAR(255) NOT NULL,
tenant_id VARCHAR(255) NOT NULL,
version INTEGER NOT NULL DEFAULT 1,
parent_version VARCHAR(255),
metadata JSONB NOT NULL DEFAULT '{}',
data JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
expires_at TIMESTAMP WITH TIME ZONE,
size_bytes BIGINT NOT NULL DEFAULT 0,
save_duration_ms BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (execution_id, version),
UNIQUE (checkpoint_id)
);
-- Indexes for checkpoint retrieval
CREATE INDEX idx_checkpoints_envelope_id ON checkpoints(envelope_id);
CREATE INDEX idx_checkpoints_tenant_id ON checkpoints(tenant_id);
CREATE INDEX idx_checkpoints_created_at ON checkpoints(created_at DESC);
CREATE INDEX idx_checkpoints_metadata ON checkpoints USING GIN (metadata);
Source: /infra/migrations/postgres/000001_initial_schema.up.sql:131-157
3.4 Supporting Tables
api_keys
Granular API key management with scopes.
CREATE TABLE fulcrum.api_keys (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
key_hash VARCHAR(64) NOT NULL UNIQUE, -- SHA256 hex string
key_hint VARCHAR(10), -- Last 4 chars for display
name VARCHAR(255),
scopes TEXT[] DEFAULT '{}', -- ["policy:read", "policy:approve", "admin"]
expires_at TIMESTAMPTZ,
last_used_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
Source: /infra/migrations/postgres/000005_granular_api_keys.up.sql:6-17
subscriptions (Billing)
Stripe integration for SaaS billing.
CREATE TABLE fulcrum.subscriptions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
stripe_customer_id VARCHAR(255) NOT NULL,
stripe_subscription_id VARCHAR(255),
plan_id VARCHAR(50) NOT NULL DEFAULT 'developer',
status VARCHAR(50) NOT NULL DEFAULT 'active',
current_period_start TIMESTAMPTZ,
current_period_end TIMESTAMPTZ,
token_limit BIGINT NOT NULL DEFAULT 10000,
tokens_used BIGINT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(tenant_id),
UNIQUE(stripe_customer_id)
);
Source: /infra/migrations/postgres/000011_billing.up.sql:9-24
audit_logs
General-purpose audit trail for compliance.
CREATE TABLE fulcrum.audit_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id VARCHAR(255) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
actor_id VARCHAR(255) NOT NULL,
actor_email VARCHAR(255),
action VARCHAR(100) NOT NULL,
resource_type VARCHAR(50) NOT NULL,
resource_id VARCHAR(255),
resource_name VARCHAR(255),
changes JSONB,
ip_address INET,
user_agent TEXT,
status VARCHAR(20) NOT NULL DEFAULT 'success'
CHECK (status IN ('success', 'failure')),
error_message TEXT
);
-- Indexes for common audit queries
CREATE INDEX idx_audit_logs_org_timestamp ON audit_logs(org_id, timestamp DESC);
CREATE INDEX idx_audit_logs_actor ON audit_logs(org_id, actor_id);
CREATE INDEX idx_audit_logs_resource ON audit_logs(org_id, resource_type, resource_id);
CREATE INDEX idx_audit_logs_action ON audit_logs(org_id, action);
Source: /infra/migrations/postgres/000012_audit_logs.up.sql:3-28
incidents (Immune System)
Security incident tracking for the Immune System component.
CREATE TABLE fulcrum.incidents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenants(id),
incident_id VARCHAR(255) UNIQUE NOT NULL,
severity VARCHAR(50) NOT NULL,
category VARCHAR(100) NOT NULL,
description TEXT,
details JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_incidents_tenant ON incidents(tenant_id);
CREATE INDEX idx_incidents_category ON incidents(category);
CREATE INDEX idx_incidents_created ON incidents(created_at DESC);
Source: /infra/migrations/postgres/000006_immune_system.up.sql:2-15
attack_patterns (Immune System)
Pattern recognition for automated policy generation.
CREATE TABLE fulcrum.attack_patterns (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
pattern_id VARCHAR(255) UNIQUE NOT NULL,
signature VARCHAR(500) NOT NULL,
frequency INTEGER DEFAULT 1,
indicators JSONB DEFAULT '[]',
affected_tenants JSONB DEFAULT '[]',
first_seen TIMESTAMPTZ DEFAULT NOW(),
last_seen TIMESTAMPTZ DEFAULT NOW(),
generated_policy_id UUID REFERENCES policies(id)
);
CREATE INDEX idx_patterns_signature ON attack_patterns(signature);
CREATE INDEX idx_patterns_frequency ON attack_patterns(frequency DESC);
Source: /infra/migrations/postgres/000006_immune_system.up.sql:17-31
4. Multi-Tenancy Implementation
4.1 Row-Level Security (RLS)
Fulcrum enforces hard tenant isolation at the database layer using PostgreSQL's Row-Level Security feature. This provides defense-in-depth even if application-level bugs exist.
How It Works
Application Request
|
v
+------------------+
| Set Session Var |
| fulcrum.current_ |
| tenant = 'uuid' |
+------------------+
|
v
+------------------+
| Execute Query |
| SELECT * FROM |
| envelopes |
+------------------+
|
v
+------------------+
| RLS Policy |
| tenant_id = |
| current_tenant |
+------------------+
|
v
+------------------+
| Only tenant's |
| rows returned |
+------------------+
RLS Policy Definitions
-- Enable RLS on core tables
ALTER TABLE fulcrum.tenants ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.budgets ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.policies ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.policy_sets ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.envelopes ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.policy_evaluations ENABLE ROW LEVEL SECURITY;
ALTER TABLE fulcrum.policy_approvals ENABLE ROW LEVEL SECURITY;
-- Tenant isolation policy (example for budgets)
CREATE POLICY tenant_isolation_budgets ON fulcrum.budgets
FOR ALL
USING (tenant_id = current_setting('fulcrum.current_tenant')::uuid);
-- Nested isolation for evaluations (via envelope -> tenant)
CREATE POLICY tenant_isolation_evaluations ON fulcrum.policy_evaluations
FOR ALL
USING (
envelope_id IN (
SELECT id FROM fulcrum.envelopes
WHERE tenant_id = current_setting('fulcrum.current_tenant')::uuid
)
);
Source: /infra/migrations/postgres/000002_enable_rls.up.sql
4.2 Tenant Context Injection
Every database operation must set the tenant context before executing queries:
// In Go application code. Note: pass the tenant ID as a bind parameter
// via set_config -- never interpolate it into the SQL text, which would
// open a SQL injection vector.
func (s *Store) setTenantContext(ctx context.Context, tenantID string) error {
	_, err := s.db.ExecContext(ctx,
		"SELECT set_config('fulcrum.current_tenant', $1, false)", tenantID)
	return err
}
4.3 Tables with RLS
| Table | RLS Enabled | Isolation Strategy |
|---|---|---|
| `tenants` | Yes | Direct (id = current_tenant) |
| `budgets` | Yes | Direct (tenant_id) |
| `policies` | Yes | Direct (tenant_id) |
| `policy_sets` | Yes | Direct (tenant_id) |
| `envelopes` | Yes | Direct (tenant_id) |
| `policy_evaluations` | Yes | Via envelope FK |
| `policy_approvals` | Yes | Via evaluation FK |
| `incidents` | Yes | Direct (tenant_id) |
| `attack_patterns` | Yes | Permissive (system-wide patterns) |
| `subscriptions` | Yes | Direct (tenant_id) |
| `billing_events` | Yes | Direct (tenant_id) |
| `audit_logs` | Yes | Via org_id |
| `retention_policies` | Yes | Read-only (global) |
5. Time-Series Data
5.1 TimescaleDB Configuration
TimescaleDB extends PostgreSQL with automatic partitioning (hypertables) optimized for time-series workloads.
-- Create TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
-- Create metrics schema
CREATE SCHEMA IF NOT EXISTS metrics;
5.2 Hypertables
llm_metrics
Tracks LLM call performance and costs.
CREATE TABLE metrics.llm_metrics (
time TIMESTAMPTZ NOT NULL,
tenant_id UUID NOT NULL,
execution_id UUID,
model TEXT NOT NULL,
tokens_used INT NOT NULL,
cost_cents INT NOT NULL,
latency_ms INT NOT NULL,
status TEXT NOT NULL
);
-- Convert to hypertable (auto-partitions by time)
SELECT create_hypertable('metrics.llm_metrics', 'time', if_not_exists => TRUE);
-- Add retention policy (30 days)
SELECT add_retention_policy('metrics.llm_metrics', INTERVAL '30 days', if_not_exists => TRUE);
-- Indexes for common query patterns
CREATE INDEX idx_llm_metrics_tenant_time ON metrics.llm_metrics (tenant_id, time DESC);
CREATE INDEX idx_llm_metrics_execution ON metrics.llm_metrics (execution_id, time DESC);
Source: /infra/migrations/postgres/000009_create_metrics_tables.up.sql:7-36
fulcrum_events
Event storage for analytics and replay.
CREATE TABLE metrics.fulcrum_events (
event_id UUID NOT NULL,
envelope_id UUID NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
event_type TEXT NOT NULL,
event_data JSONB,
severity TEXT,
processed_at TIMESTAMPTZ,
PRIMARY KEY (timestamp, event_id)
);
-- Convert to hypertable
SELECT create_hypertable('metrics.fulcrum_events', 'timestamp', if_not_exists => TRUE);
-- Indexes
CREATE INDEX idx_fulcrum_events_envelope ON metrics.fulcrum_events (envelope_id, timestamp DESC);
CREATE INDEX idx_fulcrum_events_timestamp ON metrics.fulcrum_events (timestamp DESC);
Source: /infra/migrations/postgres/000009_create_metrics_tables.up.sql:38-61
5.3 Hypertable Benefits
| Feature | Benefit |
|---|---|
| Auto-partitioning | No manual partition management |
| Chunk exclusion | Queries only scan relevant time ranges |
| Compression | Automatic compression of old chunks |
| Retention policies | Automatic deletion of expired data |
| Continuous aggregates | Pre-computed rollups for dashboards |
5.4 Query Patterns
-- Recent LLM metrics for a tenant
SELECT time, model, tokens_used, latency_ms
FROM metrics.llm_metrics
WHERE tenant_id = $1
AND time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC;
-- Aggregated daily costs
SELECT
time_bucket('1 day', time) AS day,
model,
SUM(tokens_used) AS total_tokens,
SUM(cost_cents) AS total_cost_cents,
AVG(latency_ms) AS avg_latency_ms
FROM metrics.llm_metrics
WHERE tenant_id = $1
AND time > NOW() - INTERVAL '30 days'
GROUP BY day, model
ORDER BY day DESC;
6. Event Sourcing Patterns
6.1 NATS JetStream Streams
Fulcrum uses NATS JetStream for durable event streaming with the following stream architecture:
+------------------+ +------------------+ +------------------+
| EXECUTION | | LLM | | TOOL |
| EVENTS | | EVENTS | | EVENTS |
+------------------+ +------------------+ +------------------+
| fulcrum.events. | | fulcrum.events. | | fulcrum.events. |
| execution.* | | llm.* | | tool.* |
+------------------+ +------------------+ +------------------+
+------------------+ +------------------+
| CHECKPOINTS | | AUDIT_LOG |
+------------------+ +------------------+
| fulcrum.events. | | fulcrum.events. |
| checkpoint.* | | audit.* |
+------------------+ +------------------+
Stream Definitions
// Stream names
const (
StreamExecutionEvents = "FULCRUM_EXECUTION_EVENTS"
StreamLLMEvents = "FULCRUM_LLM_EVENTS"
StreamToolEvents = "FULCRUM_TOOL_EVENTS"
StreamCheckpoints = "FULCRUM_CHECKPOINTS"
StreamAuditLog = "FULCRUM_AUDIT_LOG"
)
// Stream configuration
streams := []struct {
name string
subjects []string
}{
{
name: StreamExecutionEvents,
subjects: []string{"fulcrum.events.execution.*"},
},
{
name: StreamLLMEvents,
subjects: []string{"fulcrum.events.llm.*"},
},
{
name: StreamToolEvents,
subjects: []string{"fulcrum.events.tool.*"},
},
{
name: StreamCheckpoints,
subjects: []string{"fulcrum.events.checkpoint.*"},
},
{
name: StreamAuditLog,
subjects: []string{"fulcrum.events.audit.*"},
},
}
Source: /internal/eventstore/store.go:23-115
6.2 Stream Configuration
| Setting | Value | Purpose |
|---|---|---|
| Retention | `LimitsPolicy` | Bounded by age/size |
| Max Age | 7 days (default) | Auto-cleanup |
| Max Messages | 1,000,000 | Per-stream limit |
| Storage | `FileStorage` | Durable persistence |
| Replicas | 1 | Single node (increase for HA) |
| Compression | `S2Compression` | Space efficiency |
| Discard | `DiscardOld` | Drop oldest on overflow |
6.3 Event Types
// Subject mapping based on event type
func (es *EventStore) getSubjectForEventType(eventType string) string {
switch {
case eventType == "execution_started" ||
eventType == "execution_completed" ||
eventType == "execution_failed":
return "fulcrum.events.execution." + eventType
case eventType == "llm_call" || eventType == "llm_response":
return "fulcrum.events.llm." + eventType
case eventType == "tool_call" || eventType == "tool_response":
return "fulcrum.events.tool." + eventType
case eventType == "checkpoint_saved" || eventType == "checkpoint_restored":
return "fulcrum.events.checkpoint." + eventType
default:
return "fulcrum.events.audit." + eventType
}
}
Source: /internal/eventstore/store.go:476-489
6.4 Event Payload Structure
message StoredEvent {
string event_id = 1;
string execution_id = 2;
string envelope_id = 3;
string tenant_id = 4;
string workflow_id = 5;
string event_type = 6;
google.protobuf.Timestamp timestamp = 7;
google.protobuf.Struct payload = 8;
string trace_id = 9;
string span_id = 10;
map<string, string> labels = 11;
google.protobuf.Timestamp stored_at = 12;
string stream_name = 13;
int64 stream_sequence = 14;
}
6.5 Consumer Patterns
Pull-Based Query
// Query events with filters
func (es *EventStore) QueryEvents(ctx context.Context, req *QueryEventsRequest) (*QueryEventsResponse, error) {
opts := []nats.SubOpt{
nats.BindStream(streamName),
nats.DeliverAll(),
nats.AckNone(),
nats.MaxDeliver(1),
}
if req.StartTime != nil {
opts = append(opts, nats.StartTime(req.StartTime.AsTime()))
}
consumer, err := es.js.PullSubscribe("", "", opts...)
// ... fetch and filter
}
Push-Based Streaming
// Real-time event streaming
func (es *EventStore) StreamEvents(ctx context.Context, req *StreamEventsRequest,
eventChan chan<- *StoredEvent) error {
opts := []nats.SubOpt{
nats.BindStream(streamName),
nats.DeliverNew(), // Only new events
nats.AckExplicit(), // Manual acknowledgment
}
sub, err := es.js.Subscribe("", func(msg *nats.Msg) {
// ... process and send to channel
msg.Ack()
}, opts...)
}
Source: /internal/eventstore/store.go:233-412
7. Caching Strategy
7.1 Redis Cache Architecture
+-------------------+ +-------------------+
| Policy Engine | | Cost Engine |
+-------------------+ +-------------------+
| |
v v
+-------------------+ +-------------------+
| Redis Cache | | In-Memory LRU |
| (Distributed) | | (Local) |
+-------------------+ +-------------------+
7.2 Redis Key Patterns
Policy Cache
Key Pattern: policies:{tenant_id}:{status}
TTL: 5 minutes
Value: Protobuf-encoded PolicyList
Example:
policies:550e8400-e29b-41d4-a716-446655440000:POLICY_STATUS_ACTIVE
Implementation:
func (s *Store) ListPolicies(ctx context.Context, tenantID string, status PolicyStatus) ([]*Policy, error) {
// 1. Try cache
if s.rdb != nil {
cacheKey := fmt.Sprintf("policies:%s:%s", tenantID, status)
val, err := s.rdb.Get(ctx, cacheKey).Bytes()
if err == nil {
var list PolicyList
if err := proto.Unmarshal(val, &list); err == nil {
return list.Policies, nil // Cache hit
}
}
}
// 2. Query database
policies := queryFromDB(...)
// 3. Update cache
if s.rdb != nil {
cacheKey := fmt.Sprintf("policies:%s:%s", tenantID, status)
data, _ := proto.Marshal(&PolicyList{Policies: policies})
s.rdb.Set(ctx, cacheKey, data, 5*time.Minute)
}
return policies, nil
}
Source: /internal/policyengine/store.go:39-156
7.3 Cache Invalidation Strategy
// On policy create/update/delete
func (s *Store) invalidatePolicyCache(ctx context.Context, tenantID string) {
if s.rdb == nil {
return
}
// Pattern-based deletion for all status variants
iter := s.rdb.Scan(ctx, 0, fmt.Sprintf("policies:%s:*", tenantID), 0).Iterator()
for iter.Next(ctx) {
s.rdb.Del(ctx, iter.Val())
}
}
Source: /internal/policyengine/store.go:256-269
7.4 In-Memory LRU Cache
For high-frequency lookups within a single process, Fulcrum uses a bounded LRU cache:
type BoundedCache[T any] struct {
maxSize int
mu sync.RWMutex
items map[string]*list.Element
lruList *list.List
}
// O(1) operations with LRU eviction
func (c *BoundedCache[T]) Get(key string) (T, bool) { ... }
func (c *BoundedCache[T]) Set(key string, value T) { ... }
func (c *BoundedCache[T]) Delete(key string) bool { ... }
Source: /internal/costengine/cache.go
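The real implementation lives at the path above; for illustration, a minimal runnable version of such a bounded LRU (a sketch, not the actual /internal/costengine/cache.go code):

```go
package main

import (
	"container/list"
	"sync"
)

// entry pairs a key with its value so eviction can delete the map slot.
type entry[T any] struct {
	key   string
	value T
}

type BoundedCache[T any] struct {
	maxSize int
	mu      sync.RWMutex
	items   map[string]*list.Element
	lruList *list.List // front = most recently used
}

func NewBoundedCache[T any](maxSize int) *BoundedCache[T] {
	return &BoundedCache[T]{
		maxSize: maxSize,
		items:   make(map[string]*list.Element),
		lruList: list.New(),
	}
}

// Get takes the write lock because MoveToFront mutates the list.
func (c *BoundedCache[T]) Get(key string) (T, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[key]; ok {
		c.lruList.MoveToFront(el)
		return el.Value.(*entry[T]).value, true
	}
	var zero T
	return zero, false
}

func (c *BoundedCache[T]) Set(key string, value T) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[key]; ok {
		el.Value.(*entry[T]).value = value
		c.lruList.MoveToFront(el)
		return
	}
	if c.lruList.Len() >= c.maxSize {
		// Evict the least recently used entry (back of the list).
		if oldest := c.lruList.Back(); oldest != nil {
			c.lruList.Remove(oldest)
			delete(c.items, oldest.Value.(*entry[T]).key)
		}
	}
	c.items[key] = c.lruList.PushFront(&entry[T]{key: key, value: value})
}
```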
7.5 Cache TTL Policies
| Cache Type | TTL | Rationale |
|---|---|---|
| Policy list | 5 min | Balance freshness vs. performance |
| Tenant config | 10 min | Rarely changes |
| Budget state | 1 min | Needs near-real-time accuracy |
| Session data | 30 min | User session timeout |
8. Data Retention Policies
8.1 Retention Policy Table
CREATE TABLE fulcrum.retention_policies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
table_name VARCHAR(255) NOT NULL,
retention_days INTEGER NOT NULL CHECK (retention_days > 0),
partition_column VARCHAR(255) NOT NULL DEFAULT 'created_at',
enabled BOOLEAN NOT NULL DEFAULT true,
last_cleanup_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(table_name)
);
Source: /infra/migrations/postgres/000007_add_retention_policies.up.sql:9-19
8.2 Default Retention Periods
| Table | Retention | Rationale |
|---|---|---|
| `policy_evaluations` | 90 days | Compliance audit trail |
| `policy_approvals` | 180 days | Extended compliance |
| `checkpoints` | 30 days | Short-lived state |
| `checkpoints_archive` | 30 days | Archived state |
| `execution_contexts` | 60 days | Debug context |
| `llm_metrics` | 30 days | TimescaleDB retention |
| `audit_logs` | 365 days | 1 year compliance (implicit) |
8.3 Automated Cleanup Function
CREATE OR REPLACE FUNCTION fulcrum.cleanup_old_data()
RETURNS TABLE (
table_name VARCHAR(255),
rows_deleted BIGINT,
cleanup_time TIMESTAMPTZ
) AS $$
DECLARE
policy_record RECORD;
delete_count BIGINT;
cutoff_date TIMESTAMPTZ;
BEGIN
FOR policy_record IN
SELECT * FROM fulcrum.retention_policies
WHERE enabled = true
LOOP
cutoff_date := NOW() - (policy_record.retention_days || ' days')::INTERVAL;
EXECUTE format(
'DELETE FROM fulcrum.%I WHERE %I < $1',
policy_record.table_name,
policy_record.partition_column
) USING cutoff_date;
GET DIAGNOSTICS delete_count = ROW_COUNT;
UPDATE fulcrum.retention_policies
SET last_cleanup_at = NOW()
WHERE id = policy_record.id;
table_name := policy_record.table_name;
rows_deleted := delete_count;
cleanup_time := NOW();
RETURN NEXT;
END LOOP;
RETURN;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
Source: /infra/migrations/postgres/000007_add_retention_policies.up.sql:54-96
8.4 Scheduled Cleanup (pg_cron)
-- Schedule daily cleanup at 2 AM UTC (requires pg_cron extension)
SELECT cron.schedule(
'fulcrum-retention-cleanup',
'0 2 * * *',
'SELECT fulcrum.cleanup_old_data()'
);
8.5 Manual Cleanup
-- Cleanup specific table manually
SELECT * FROM fulcrum.cleanup_table('policy_evaluations');
-- Check retention status
SELECT * FROM fulcrum.retention_status;
8.6 Retention Status View
CREATE OR REPLACE VIEW fulcrum.retention_status AS
SELECT
rp.table_name,
rp.retention_days,
rp.enabled,
rp.last_cleanup_at,
CASE
WHEN rp.last_cleanup_at IS NULL THEN 'Never cleaned'
WHEN rp.last_cleanup_at < NOW() - INTERVAL '2 days' THEN 'Overdue'
ELSE 'OK'
END AS status
FROM fulcrum.retention_policies rp
ORDER BY rp.table_name;
9. Backup and Recovery
9.1 Backup Strategy Overview
+------------------+ +------------------+ +------------------+
| Hot Backup | | Logical Backup | | Event Replay |
| (Streaming) | | (pg_dump) | | (JetStream) |
+------------------+ +------------------+ +------------------+
| | |
v v v
+------------------+ +------------------+ +------------------+
| Continuous WAL | | Daily Snapshots | | 7-Day Retention |
| Archiving | | to S3/GCS | | Event History |
+------------------+ +------------------+ +------------------+
9.2 pg_dump Procedures
Full Database Backup
#!/bin/bash
# Daily full backup script
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="/backups/fulcrum"
DATABASE="fulcrum_dev"
USER="fulcrum"
# Create backup directory
mkdir -p ${BACKUP_DIR}
# Full database dump with compression
pg_dump -h localhost -U ${USER} -d ${DATABASE} \
--format=custom \
--compress=9 \
--verbose \
--file="${BACKUP_DIR}/fulcrum_${DATE}.dump"
# Verify backup
pg_restore --list "${BACKUP_DIR}/fulcrum_${DATE}.dump" > /dev/null
# Upload to remote storage (example: S3)
aws s3 cp "${BACKUP_DIR}/fulcrum_${DATE}.dump" \
"s3://fulcrum-backups/daily/${DATE}.dump"
# Cleanup old local backups (keep 7 days)
find ${BACKUP_DIR} -name "*.dump" -mtime +7 -delete
Schema-Specific Backup
# Backup specific schemas
pg_dump -h localhost -U fulcrum -d fulcrum_dev \
--schema=fulcrum \
--format=custom \
--file="fulcrum_schema_${DATE}.dump"
pg_dump -h localhost -U fulcrum -d fulcrum_dev \
--schema=metrics \
--format=custom \
--file="metrics_schema_${DATE}.dump"
Table-Specific Backup
# Backup critical tables only
pg_dump -h localhost -U fulcrum -d fulcrum_dev \
--table=fulcrum.tenants \
--table=fulcrum.policies \
--table=fulcrum.budgets \
--format=custom \
--file="critical_tables_${DATE}.dump"
9.3 Point-in-Time Recovery (PITR)
Enable WAL Archiving
# postgresql.conf
wal_level = replica
archive_mode = on
archive_command = 'test ! -f /wal_archive/%f && cp %p /wal_archive/%f'
archive_timeout = 60
Recovery Procedure
# 1. Stop PostgreSQL
pg_ctl stop -D /var/lib/postgresql/data
# 2. Clear data directory (backup first!)
rm -rf /var/lib/postgresql/data/*
# 3. Restore a physical base backup (e.g. taken with pg_basebackup;
#    logical pg_dump/pg_restore backups cannot replay WAL)
tar -xf /backups/base_backup.tar -C /var/lib/postgresql/data
# 4. Configure recovery (PostgreSQL 12+: recovery settings live in
#    postgresql.conf; recovery.signal is an empty marker file)
cat >> /var/lib/postgresql/data/postgresql.conf << EOF
restore_command = 'cp /wal_archive/%f %p'
recovery_target_time = '2026-01-06 14:30:00'
recovery_target_action = 'promote'
EOF
touch /var/lib/postgresql/data/recovery.signal
# 5. Start PostgreSQL
pg_ctl start -D /var/lib/postgresql/data
9.4 Redis Backup
# Manual RDB snapshot
redis-cli BGSAVE
# Copy RDB file
cp /data/dump.rdb /backups/redis_${DATE}.rdb
# AOF backup (for point-in-time)
cp /data/appendonly.aof /backups/redis_aof_${DATE}.aof
9.5 NATS JetStream Backup
# Export stream data
nats stream backup FULCRUM_EXECUTION_EVENTS /backups/nats/execution_events
nats stream backup FULCRUM_LLM_EVENTS /backups/nats/llm_events
nats stream backup FULCRUM_TOOL_EVENTS /backups/nats/tool_events
nats stream backup FULCRUM_CHECKPOINTS /backups/nats/checkpoints
nats stream backup FULCRUM_AUDIT_LOG /backups/nats/audit_log
# Restore stream
nats stream restore FULCRUM_EXECUTION_EVENTS /backups/nats/execution_events
9.6 Disaster Recovery Runbook
| Step | Action | RTO | RPO |
|---|---|---|---|
| 1 | Detect failure | 5 min | - |
| 2 | Assess scope | 10 min | - |
| 3 | Restore PostgreSQL from backup | 30 min | 24 hours (daily) |
| 4 | Apply WAL for PITR | 15 min | Minutes |
| 5 | Restore Redis cache | 5 min | N/A (cache) |
| 6 | Restore NATS streams | 15 min | 7 days |
| 7 | Verify application | 15 min | - |
| Total | - | ~90 min | Minutes to 24h |
10. Performance Considerations
10.1 Index Strategy
All tables include carefully chosen indexes for common query patterns:
-- Policy lookup (hot path)
CREATE INDEX idx_policies_tenant ON policies(tenant_id);
CREATE INDEX idx_policies_type ON policies(policy_type);
CREATE INDEX idx_policies_enabled ON policies(enabled) WHERE enabled = true;
-- Envelope queries
CREATE INDEX idx_envelopes_tenant ON envelopes(tenant_id);
CREATE INDEX idx_envelopes_status ON envelopes(status);
CREATE INDEX idx_envelopes_created ON envelopes(created_at DESC);
-- Time-series optimization (TimescaleDB)
CREATE INDEX idx_llm_metrics_tenant_time ON metrics.llm_metrics (tenant_id, time DESC);
CREATE INDEX idx_fulcrum_events_envelope ON metrics.fulcrum_events (envelope_id, timestamp DESC);
-- JSONB GIN indexes for flexible queries
CREATE INDEX idx_checkpoints_metadata ON checkpoints USING GIN (metadata);
CREATE INDEX idx_execution_contexts_data ON execution_contexts USING GIN (data);
10.2 Connection Pooling
PostgreSQL
# Recommended PgBouncer configuration
[databases]
fulcrum = host=localhost port=5432 dbname=fulcrum_dev
[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 20
min_pool_size = 5
reserve_pool_size = 5
reserve_pool_timeout = 3
Redis
// Go Redis client with pooling
rdb := redis.NewClient(&redis.Options{
Addr: "redis:6379",
PoolSize: 10,
MinIdleConns: 5,
PoolTimeout: 4 * time.Second,
})
10.3 Query Optimization Tips
- Always filter by tenant_id first - RLS enforces this, but explicit filters help query planning
- Use prepared statements - Reduce parse overhead for repeated queries
- Batch inserts - Use multi-row INSERT for bulk operations
- Avoid `SELECT *` - Explicitly list needed columns
- Use EXPLAIN ANALYZE - Profile slow queries
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM fulcrum.policies
WHERE tenant_id = '...' AND status = 'ACTIVE';
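The batch-insert tip above depends on generating numbered placeholders correctly for each row. A sketch of such a builder (`buildBatchInsert` is a hypothetical helper, not a Fulcrum API; the resulting statement is executed with the flattened row values as arguments):

```go
package main

import (
	"fmt"
	"strings"
)

// buildBatchInsert renders a multi-row INSERT with numbered placeholders,
// e.g. 2 rows x 2 cols -> "... VALUES ($1, $2), ($3, $4)".
func buildBatchInsert(table string, columns []string, rows int) string {
	var b strings.Builder
	fmt.Fprintf(&b, "INSERT INTO %s (%s) VALUES ", table, strings.Join(columns, ", "))
	n := 1
	for r := 0; r < rows; r++ {
		if r > 0 {
			b.WriteString(", ")
		}
		ph := make([]string, len(columns))
		for i := range columns {
			ph[i] = fmt.Sprintf("$%d", n)
			n++
		}
		b.WriteString("(" + strings.Join(ph, ", ") + ")")
	}
	return b.String()
}
```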
10.4 Monitoring Queries
-- Active connections per database
SELECT datname, numbackends
FROM pg_stat_database
WHERE datname = 'fulcrum_dev';
-- Slow queries (requires pg_stat_statements; PostgreSQL 13+ renamed
-- mean_time/total_time to mean_exec_time/total_exec_time)
SELECT query, calls, mean_exec_time, total_exec_time
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;
-- Table sizes
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables
WHERE schemaname IN ('fulcrum', 'metrics')
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
-- Index usage
SELECT
indexrelname,
idx_scan,
idx_tup_read,
idx_tup_fetch
FROM pg_stat_user_indexes
WHERE schemaname = 'fulcrum'
ORDER BY idx_scan DESC;
Appendix A: Migration History
| Version | Description | Date |
|---|---|---|
| 000001 | Initial schema (tenants, budgets, policies, envelopes, checkpoints) | 2025-12 |
| 000002 | Enable Row-Level Security | 2025-12 |
| 000003 | Add policy status field | 2025-12 |
| 000004 | Add policy evaluation details | 2025-12 |
| 000005 | Granular API keys | 2025-12 |
| 000006 | Immune System (incidents, attack_patterns) | 2025-12 |
| 000007 | Retention policies | 2025-12-27 |
| 000008 | Consolidate metrics schema | 2026-01 |
| 000009 | Create metrics tables (llm_metrics, fulcrum_events) | 2026-01 |
| 000010 | Policy audit logs | 2026-01 |
| 000011 | Billing (subscriptions, billing_events) | 2026-01-03 |
| 000012 | General audit logs | 2026-01 |
Appendix B: Connection Strings
Development
# PostgreSQL (fulcrum schema)
POSTGRES_CONN_STR="postgresql://fulcrum:fulcrum@postgres:5432/fulcrum_dev?sslmode=disable&search_path=fulcrum"
# PostgreSQL (metrics schema)
POSTGRES_CONN_STR_METRICS="postgresql://fulcrum:fulcrum@postgres:5432/fulcrum_dev?sslmode=disable&search_path=metrics"
# NATS
NATS_URL="nats://nats:4222"
# Redis
REDIS_URL="redis://redis:6379"
REDIS_ADDR="redis:6379"
Production
# Use secrets management (e.g., Kubernetes secrets, Vault)
POSTGRES_CONN_STR="${POSTGRES_SECRET}"
NATS_URL="${NATS_SECRET}"
REDIS_URL="${REDIS_SECRET}"
Appendix C: Related Documentation
Document generated from source analysis of the Fulcrum codebase. Last updated: January 6, 2026.