Fulcrum Python SDK Reference
Comprehensive Python SDK documentation for integrating Fulcrum AI Governance into your applications.
Package: fulcrum-governance
Version: 0.1.0
Python: 3.9+
License: Apache-2.0
Table of Contents
- Installation
- Configuration
- Core Classes
  - FulcrumClient
  - Envelope
- API Reference
- Error Handling
- Middleware System
- Async Support
- Framework Integrations
  - LangChain
  - LlamaIndex
  - OpenAI
  - FastAPI
  - Anthropic Claude
- Cost Tracking
- Best Practices
- Troubleshooting
Installation
pip (Recommended)
Poetry
requirements.txt
From Source
Dependencies
The SDK automatically installs:
| Package | Version | Purpose |
|---|---|---|
| grpcio | >=1.60.0 | gRPC communication |
| protobuf | >=4.25.0 | Protocol buffer messages |
| googleapis-common-protos | >=1.60.0 | Google API protos |
Development Dependencies
For SDK development:
Installs: grpcio-tools, pytest, pytest-asyncio, black, mypy, ruff
Configuration
Programmatic Configuration
from fulcrum import FulcrumClient
client = FulcrumClient(
    host="localhost:50051",  # Fulcrum server address
    api_key="your-api-key",  # API key for authentication
    on_failure="FAIL_OPEN",  # FAIL_OPEN or FAIL_CLOSED
    timeout_ms=500,  # Request timeout in milliseconds
)
Environment Variables
export FULCRUM_HOST="localhost:50051"
export FULCRUM_API_KEY="your-api-key"
export FULCRUM_TENANT_ID="your-tenant-id"
export FULCRUM_TIMEOUT_MS="500"
from fulcrum import FulcrumClient
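# Auto-configure from environment. The from_env() listing under Class Methods reads
# FULCRUM_HOST, FULCRUM_API_KEY, and FULCRUM_TIMEOUT_MS; pass tenant_id to envelope() explicitly.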
client = FulcrumClient.from_env()
Configuration Options
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost:50051" | Fulcrum server address |
| api_key | Optional[str] | None | API key for authentication |
| on_failure | str | "FAIL_OPEN" | Behavior when governance unavailable |
| timeout_ms | int | 500 | Request timeout in milliseconds |
TLS Configuration
TLS is automatically enabled for hosts using port 443:
# TLS auto-enabled for production
client = FulcrumClient(
host="api.fulcrum.dev:443",
api_key="your-api-key"
)
# Explicit insecure connection for local development
client = FulcrumClient(
host="localhost:50051", # Non-443 port uses insecure channel
api_key="dev-key"
)
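The port-based rule described above can be pictured with a small helper. This is only a sketch of the documented behavior, not the SDK's implementation; `uses_tls` is a hypothetical name introduced for illustration.

```python
def uses_tls(host: str) -> bool:
    """Return True when a host:port string targets port 443.

    Hypothetical helper mirroring the rule described above; the SDK's
    internal check may differ.
    """
    # rpartition splits on the last colon, so "example.com:443" yields port "443"
    _, sep, port = host.rpartition(":")
    return bool(sep) and port == "443"


print(uses_tls("api.fulcrum.dev:443"))  # True
print(uses_tls("localhost:50051"))      # False
```

A check like this can be useful in deployment scripts to confirm that a production host string will not silently fall back to an insecure channel.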
Failure Modes
| Mode | Behavior | Use Case |
|---|---|---|
| FAIL_OPEN | Allow actions when Fulcrum is unavailable | Production with availability priority |
| FAIL_CLOSED | Block actions when Fulcrum is unavailable | High-security environments |
# High-availability mode (default)
client = FulcrumClient(
host="localhost:50051",
on_failure="FAIL_OPEN" # Allow execution if governance fails
)
# High-security mode
client = FulcrumClient(
host="localhost:50051",
on_failure="FAIL_CLOSED" # Block execution if governance fails
)
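The difference between the two modes comes down to what a policy check returns when it cannot be completed. The sketch below illustrates that decision with a stand-in evaluation function; `guard_with_fallback` and `unavailable` are hypothetical names, and the SDK's internal logic may differ.

```python
from typing import Callable


def guard_with_fallback(evaluate: Callable[[], bool], on_failure: str = "FAIL_OPEN") -> bool:
    """Run a policy check and apply the failure mode if it cannot complete."""
    if on_failure not in ("FAIL_OPEN", "FAIL_CLOSED"):
        raise ValueError(f"unknown failure mode: {on_failure!r}")
    try:
        return evaluate()
    except Exception:
        # Governance unreachable: allow under FAIL_OPEN, block under FAIL_CLOSED
        return on_failure == "FAIL_OPEN"


def unavailable() -> bool:
    """Stand-in for a policy evaluation that cannot reach the server."""
    raise RuntimeError("governance service unreachable")


print(guard_with_fallback(unavailable, "FAIL_OPEN"))    # True
print(guard_with_fallback(unavailable, "FAIL_CLOSED"))  # False
```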
Core Classes
FulcrumClient
The main entry point for interacting with Fulcrum governance.
Constructor
def __init__(
self,
host: str = "localhost:50051",
api_key: Optional[str] = None,
on_failure: str = "FAIL_OPEN",
timeout_ms: int = 500
)
Parameters:
- host: Fulcrum server address in host:port format
- api_key: API key for authentication (sent as x-api-key header)
- on_failure: "FAIL_OPEN" or "FAIL_CLOSED" behavior
- timeout_ms: Request timeout for policy evaluations
Class Methods
from_env()
Create a client from environment variables.
@classmethod
def from_env(cls) -> 'FulcrumClient':
"""Create client from environment variables."""
return cls(
host=os.environ.get("FULCRUM_HOST", "localhost:50051"),
api_key=os.environ.get("FULCRUM_API_KEY"),
timeout_ms=int(os.environ.get("FULCRUM_TIMEOUT_MS", "500"))
)
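As listed, `from_env()` passes `FULCRUM_TIMEOUT_MS` straight to `int()`, so a malformed value surfaces as a bare `ValueError`. If clearer startup errors are wanted, the variables can be validated first. The helper below is a sketch under that assumption; `read_fulcrum_env` is a hypothetical name and not part of the SDK.

```python
import os
from typing import Mapping, Optional


def read_fulcrum_env(environ: Optional[Mapping[str, str]] = None) -> dict:
    """Read and validate the Fulcrum environment variables (hypothetical helper)."""
    env = os.environ if environ is None else environ
    raw_timeout = env.get("FULCRUM_TIMEOUT_MS", "500")
    try:
        timeout_ms = int(raw_timeout)
    except ValueError:
        raise ValueError(
            f"FULCRUM_TIMEOUT_MS must be an integer, got {raw_timeout!r}"
        ) from None
    if timeout_ms <= 0:
        raise ValueError("FULCRUM_TIMEOUT_MS must be positive")
    # Keys match the FulcrumClient constructor parameters documented above
    return {
        "host": env.get("FULCRUM_HOST", "localhost:50051"),
        "api_key": env.get("FULCRUM_API_KEY"),
        "timeout_ms": timeout_ms,
    }


print(read_fulcrum_env({"FULCRUM_HOST": "api.fulcrum.dev:443"}))
```

The returned dictionary can be unpacked into the constructor, for example `FulcrumClient(**read_fulcrum_env())`.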
Instance Methods
envelope()
Create a governance envelope context manager.
@contextmanager
def envelope(
self,
workflow_id: str = "default-workflow",
execution_id: Optional[str] = None,
tenant_id: str = "default-tenant",
adapter_type: str = "python-sdk",
metadata: Optional[Dict[str, str]] = None
) -> Generator['Envelope', None, None]:
"""
Create a governance envelope for tracking agent execution.
Parameters:
workflow_id: Identifier for the workflow type
execution_id: Unique execution ID (auto-generated if not provided)
tenant_id: Tenant identifier for multi-tenancy
adapter_type: SDK/adapter identifier
metadata: Additional key-value metadata
Yields:
Envelope: Context manager for governed execution
"""
Example:
with client.envelope(
    workflow_id="customer-support",
    tenant_id="acme-corp",
    metadata={"user_id": "user123"}
) as env:
    # Governed execution
    if env.guard("send_email", input_text=user_message):
        result = send_email(user_message)
        env.log("email_sent", {"recipient": email})
add_middleware()
Add a middleware function for event processing.
def add_middleware(self, func: Callable) -> None:
"""
Add middleware for event processing.
Parameters:
func: Callable with signature (event_type, payload) -> (event_type, payload) or None
Return None to drop the event.
"""
Example:
def redact_pii(event_type: str, payload: dict) -> tuple:
"""Redact PII before logging."""
if "email" in payload:
payload["email"] = "***@***.***"
return event_type, payload
client.add_middleware(redact_pii)
shutdown()
Gracefully shut down the client.
def shutdown(self) -> None:
"""
Cleanly shuts down the worker thread and flushes the event queue.
Called automatically via atexit, but can be called manually.
"""
Envelope
Context wrapper for governed agent executions. Created by FulcrumClient.envelope().
Properties
| Property | Type | Description |
|---|---|---|
| execution_id | str | Unique execution identifier |
| envelope_id | str | Envelope ID (from server or generated) |
| tenant_id | str | Tenant identifier |
| workflow_id | str | Workflow type identifier |
Methods
guard()
Synchronous policy evaluation barrier.
def guard(
self,
action_name: str,
input_text: str = "",
metadata: Optional[Dict[str, str]] = None
) -> bool:
"""
Evaluate policies before executing an action.
Parameters:
action_name: Name of the action to evaluate
input_text: Input content for semantic analysis
metadata: Additional context attributes
Returns:
bool: True if action is allowed, False if denied
"""
Example:
with client.envelope(workflow_id="chatbot") as env:
# Check before executing sensitive action
if env.guard("database_query", input_text=user_query):
result = execute_query(user_query)
else:
result = "This query is not permitted by policy."
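When the same guard-then-execute pattern repeats across many functions, it can be factored into a decorator. The sketch below assumes only that the envelope exposes `guard(action_name, input_text=...)` as documented above; `governed` and `StubEnvelope` are hypothetical names, and the stub stands in for a real envelope so the example runs without a server.

```python
import functools
from typing import Any, Callable


def governed(env: Any, action_name: str) -> Callable:
    """Decorator that calls env.guard() before running the wrapped function."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(input_text: str, *args, **kwargs):
            if not env.guard(action_name, input_text=input_text):
                raise PermissionError(f"{action_name} blocked by policy")
            return func(input_text, *args, **kwargs)
        return wrapper
    return decorator


class StubEnvelope:
    """Stand-in for an Envelope: denies any input containing 'DROP TABLE'."""
    def guard(self, action_name, input_text="", metadata=None):
        return "DROP TABLE" not in input_text


env = StubEnvelope()


@governed(env, "database_query")
def run_query(sql: str) -> str:
    return f"ran: {sql}"


print(run_query("SELECT 1"))  # ran: SELECT 1
```

Raising an exception on denial is a design choice of this sketch; returning a fallback value, as in the example above, works equally well.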
log()
Log an event for audit trail.
def log(
self,
event_type: str,
payload: Dict[str, Any] = {}
) -> None:
"""
Log an event asynchronously to Fulcrum.
Parameters:
event_type: Type/category of the event
payload: Event data (must be JSON-serializable)
"""
Example:
env.log("llm_call_started", {
"model": "gpt-4",
"prompt_tokens": 150,
"temperature": 0.7
})
# After execution
env.log("llm_call_completed", {
"model": "gpt-4",
"completion_tokens": 250,
"latency_ms": 1234
})
API Reference
Complete Method Signatures
FulcrumClient
class FulcrumClient:
def __init__(
self,
host: str = "localhost:50051",
api_key: Optional[str] = None,
on_failure: str = "FAIL_OPEN",
timeout_ms: int = 500
) -> None: ...
@classmethod
def from_env(cls) -> 'FulcrumClient': ...
def envelope(
self,
workflow_id: str = "default-workflow",
execution_id: Optional[str] = None,
tenant_id: str = "default-tenant",
adapter_type: str = "python-sdk",
metadata: Optional[Dict[str, str]] = None
) -> Generator['Envelope', None, None]: ...
def add_middleware(
self,
func: Callable[[str, Dict[str, Any]], Optional[Tuple[str, Dict[str, Any]]]]
) -> None: ...
def shutdown(self) -> None: ...
Envelope
class Envelope:
execution_id: str
envelope_id: str
tenant_id: str
workflow_id: str
def guard(
self,
action_name: str,
input_text: str = "",
metadata: Optional[Dict[str, str]] = None
) -> bool: ...
def log(
self,
event_type: str,
payload: Dict[str, Any] = {}
) -> None: ...
Error Handling
Exception Hierarchy
from fulcrum.exceptions import (
FulcrumError, # Base exception for all Fulcrum errors
PolicyViolationError, # Action blocked by policy
BudgetExceededError, # Budget limit reached
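ConnectionError, # Server unreachable (shadows the built-in ConnectionError when imported by name)
AuthenticationError, # Invalid API key
TimeoutError, # Request timed out (shadows the built-in TimeoutError when imported by name)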
)
Exception Details
FulcrumError (Base)
class FulcrumError(Exception):
"""Base exception for all Fulcrum SDK errors."""
message: str
code: Optional[str]
PolicyViolationError
class PolicyViolationError(FulcrumError):
"""Raised when an action is blocked by policy."""
policy_id: str
message: str
matched_rules: List[str]
severity: str
BudgetExceededError
class BudgetExceededError(FulcrumError):
"""Raised when budget limits are exceeded."""
budget_id: str
current_spend: float
budget_limit: float
limit_type: str # "tokens", "cost", "calls"
Error Handling Patterns
Basic Error Handling
from fulcrum import FulcrumClient
from fulcrum.exceptions import (
FulcrumError,
PolicyViolationError,
BudgetExceededError,
ConnectionError,
TimeoutError,
)
client = FulcrumClient(host="localhost:50051", api_key="key")
try:
    with client.envelope(workflow_id="my-agent") as env:
        if env.guard("send_email", input_text="Hello world"):
            send_email("Hello world")
except PolicyViolationError as e:
    print(f"Policy blocked: {e.policy_id}")
    print(f"Reason: {e.message}")
    print(f"Matched rules: {e.matched_rules}")
except BudgetExceededError as e:
    print(f"Budget exceeded: ${e.current_spend:.2f} / ${e.budget_limit:.2f}")
except ConnectionError as e:
    print(f"Cannot reach Fulcrum server: {e}")
    # Behavior depends on FAIL_OPEN/FAIL_CLOSED
except TimeoutError:
    print("Request timed out")
except FulcrumError as e:
    print(f"Fulcrum error: {e}")
Graceful Degradation
def execute_with_governance(action: str, input_text: str) -> dict:
    """Execute action with governance, gracefully handle failures."""
    try:
        with client.envelope(workflow_id="agent") as env:
            allowed = env.guard(action, input_text=input_text)
            if not allowed:
                return {"status": "blocked", "reason": "policy_violation"}
            result = execute_action(action, input_text)
            env.log("action_completed", {"result": "success"})
            return {"status": "success", "result": result}
    except ConnectionError:
        # Fail-open: execute without governance
        if client.on_failure == "FAIL_OPEN":
            result = execute_action(action, input_text)
            return {"status": "success", "result": result, "governed": False}
        else:
            return {"status": "blocked", "reason": "governance_unavailable"}
    except Exception as e:
        return {"status": "error", "reason": str(e)}
Retry Logic
The SDK implements automatic retry with exponential backoff for gRPC errors:
# Retry configuration (internal)
# Attempts: 5
# Backoff: 0.1s, 2.1s, 4.1s, 8.1s, 16.1s
# Reconnection: On UNAVAILABLE or INTERNAL errors
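For readers unfamiliar with the pattern, exponential backoff grows the wait between attempts by a constant factor, usually with a little random jitter so that many clients do not retry in lockstep. The sketch below is a generic illustration; `backoff_delays` and its default parameters are assumptions and are not meant to reproduce the SDK's internal schedule.

```python
import random
from typing import List, Optional


def backoff_delays(
    attempts: int = 5,
    base: float = 1.0,
    factor: float = 2.0,
    jitter: float = 0.1,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait (in seconds) before each retry attempt.

    Delay i is base * factor**i plus a random jitter in [0, jitter].
    """
    rng = rng or random.Random()
    return [base * factor ** i + rng.uniform(0, jitter) for i in range(attempts)]


# With jitter disabled the schedule is purely exponential
print(backoff_delays(jitter=0.0))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```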
Middleware System
Middleware functions intercept events before they are logged, enabling:
- PII redaction
- Event filtering
- Payload enrichment
- Custom transformations
Middleware Signature
def middleware_function(
event_type: str,
payload: Dict[str, Any]
) -> Optional[Tuple[str, Dict[str, Any]]]:
"""
Process an event before logging.
Parameters:
event_type: The event type string
payload: The event payload dict
Returns:
Tuple of (event_type, payload) to continue, or None to drop the event
"""
Example Middlewares
PII Redaction
import re
def redact_pii(event_type: str, payload: dict) -> tuple:
    """Redact sensitive data from event payloads."""
    redacted = payload.copy()
    # Redact email addresses
    for key, value in redacted.items():
        if isinstance(value, str):
            redacted[key] = re.sub(
                r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
                '[REDACTED_EMAIL]',
                value
            )
    return event_type, redacted
client.add_middleware(redact_pii)
Event Filtering
from typing import Optional

def filter_debug_events(event_type: str, payload: dict) -> Optional[tuple]:
    """Filter out debug events in production."""
    if event_type.startswith("debug_"):
        return None  # Drop the event
    return event_type, payload
client.add_middleware(filter_debug_events)
Payload Enrichment
import socket
from datetime import datetime, timezone

def enrich_metadata(event_type: str, payload: dict) -> tuple:
    """Add standard metadata to all events."""
    enriched = payload.copy()
    enriched["_hostname"] = socket.gethostname()
    # datetime.utcnow() is deprecated since Python 3.12; use a timezone-aware UTC timestamp
    enriched["_timestamp_utc"] = datetime.now(timezone.utc).isoformat()
    enriched["_sdk_version"] = "0.1.0"
    return event_type, enriched
client.add_middleware(enrich_metadata)
Chaining Multiple Middlewares
# Middlewares execute in order added
client.add_middleware(redact_pii) # First
client.add_middleware(filter_debug_events) # Second
client.add_middleware(enrich_metadata) # Third
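The ordering and drop semantics described in this section can be made concrete with a small runner. This is a sketch of how such a chain could be applied, not the SDK's internal code; `apply_middlewares` and the two sample middlewares are hypothetical.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Event = Tuple[str, Dict[str, Any]]
Middleware = Callable[[str, Dict[str, Any]], Optional[Event]]


def apply_middlewares(
    middlewares: List[Middleware], event_type: str, payload: Dict[str, Any]
) -> Optional[Event]:
    """Run middlewares in the order given; stop as soon as one drops the event."""
    for middleware in middlewares:
        result = middleware(event_type, payload)
        if result is None:
            return None  # Event dropped; later middlewares never see it
        event_type, payload = result
    return event_type, payload


def drop_debug(event_type: str, payload: Dict[str, Any]) -> Optional[Event]:
    return None if event_type.startswith("debug_") else (event_type, payload)


def tag_source(event_type: str, payload: Dict[str, Any]) -> Optional[Event]:
    return event_type, {**payload, "_source": "example"}


chain = [drop_debug, tag_source]
print(apply_middlewares(chain, "email_sent", {"ok": True}))
print(apply_middlewares(chain, "debug_trace", {"ok": True}))  # None
```

Under this model, placing a filter early in the chain avoids spending work on events that will be dropped anyway, while placing redaction first ensures later middlewares never see raw PII.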
Async Support
The SDK provides an async client for event-loop-based applications.
AsyncFulcrumClient
import asyncio
from fulcrum import AsyncFulcrumClient
async def main():
client = AsyncFulcrumClient(
host="localhost:50051",
api_key="your-api-key"
)
async with client.envelope(workflow_id="async-agent") as env:
# Async guard check
allowed = await env.guard("action", input_text="hello")
if allowed:
result = await do_async_work()
await env.log("completed", {"result": result})
await client.close()
asyncio.run(main())
Async Context Manager
async with client.envelope(workflow_id="async-workflow") as env:
# Multiple concurrent guard checks
results = await asyncio.gather(
env.guard("action1", input_text="input1"),
env.guard("action2", input_text="input2"),
env.guard("action3", input_text="input3"),
)
for i, allowed in enumerate(results):
if allowed:
await process_action(i)
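With a plain `asyncio.gather`, one guard call that raises aborts the whole batch. A possible variation, sketched below with stub coroutines in place of `env.guard`, collects exceptions and treats them as denials. `guard_all` and the stubs are hypothetical names, and treating errors as denials is a fail-closed choice made for this example.

```python
import asyncio
from typing import Awaitable, Dict


async def guard_all(checks: Dict[str, Awaitable[bool]]) -> Dict[str, bool]:
    """Await all guard checks concurrently and map each action name to its verdict."""
    names = list(checks)
    results = await asyncio.gather(*checks.values(), return_exceptions=True)
    # Anything other than an explicit True (including an exception) counts as denied
    return {name: (res is True) for name, res in zip(names, results)}


async def allow() -> bool:
    return True


async def deny() -> bool:
    return False


async def broken() -> bool:
    raise RuntimeError("guard failed")


async def main() -> Dict[str, bool]:
    return await guard_all({"action1": allow(), "action2": deny(), "action3": broken()})


verdicts = asyncio.run(main())
print(verdicts)  # {'action1': True, 'action2': False, 'action3': False}
```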
Framework Integrations
LangChain Integration
Basic Agent Wrapper
from langchain.agents import AgentExecutor
from langchain_openai import ChatOpenAI  # Current import path; requires the langchain-openai package
from fulcrum import FulcrumClient
client = FulcrumClient.from_env()
def governed_agent_run(agent: AgentExecutor, query: str) -> dict:
"""Run LangChain agent with Fulcrum governance."""
with client.envelope(workflow_id="langchain-agent") as env:
# Pre-execution policy check
if not env.guard("process_query", input_text=query):
return {"error": "Query blocked by policy"}
env.log("agent_started", {"query": query})
# Run agent with tool governance
result = agent.invoke({"input": query})
env.log("agent_completed", {
"output": result["output"][:200], # Truncate for logging
"intermediate_steps": len(result.get("intermediate_steps", []))
})
return result
# Usage
result = governed_agent_run(agent, "What's the weather in NYC?")
Tool-Level Governance
from typing import Any
from langchain.tools import BaseTool
from fulcrum import FulcrumClient

client = FulcrumClient.from_env()

class GovernedTool(BaseTool):
    """Wrapper that adds governance to any LangChain tool."""

    # BaseTool is a pydantic model, so extra attributes must be declared as fields
    wrapped_tool: BaseTool
    envelope: Any  # Fulcrum Envelope

    def __init__(self, wrapped_tool: BaseTool, envelope):
        super().__init__(
            name=wrapped_tool.name,
            description=wrapped_tool.description,
            wrapped_tool=wrapped_tool,
            envelope=envelope,
        )

    def _run(self, tool_input: str) -> str:
        # Check tool usage policy
        if not self.envelope.guard(
            action_name=f"tool:{self.name}",
            input_text=tool_input
        ):
            return f"Tool '{self.name}' blocked by governance policy"
        self.envelope.log("tool_executed", {
            "tool": self.name,
            "input_length": len(tool_input)
        })
        # Use the public run() entry point rather than the private _run()
        return self.wrapped_tool.run(tool_input)
# Usage in agent
with client.envelope(workflow_id="langchain-agent") as env:
governed_tools = [GovernedTool(tool, env) for tool in tools]
agent = create_agent(llm, governed_tools)
result = agent.invoke({"input": query})
LangChain Callback Handler
from langchain.callbacks.base import BaseCallbackHandler
from fulcrum import FulcrumClient
class FulcrumCallbackHandler(BaseCallbackHandler):
    """LangChain callback handler for Fulcrum governance."""

    # LangChain logs and swallows handler exceptions unless raise_error is True,
    # so this is required for the policy block in on_tool_start to stop the tool.
    raise_error = True

    def __init__(self, envelope):
        self.envelope = envelope

    def on_llm_start(self, serialized, prompts, **kwargs):
        self.envelope.log("llm_start", {
            "model": serialized.get("name"),
            "prompt_count": len(prompts)
        })

    def on_llm_end(self, response, **kwargs):
        self.envelope.log("llm_end", {
            "generations": len(response.generations)
        })

    def on_tool_start(self, serialized, input_str, **kwargs):
        tool_name = serialized.get("name", "unknown")
        # Check tool policy
        if not self.envelope.guard(f"tool:{tool_name}", input_text=input_str):
            raise ValueError(f"Tool {tool_name} blocked by policy")
        self.envelope.log("tool_start", {"tool": tool_name})

    def on_tool_end(self, output, **kwargs):
        self.envelope.log("tool_end", {"output_length": len(str(output))})
# Usage
with client.envelope(workflow_id="langchain-agent") as env:
handler = FulcrumCallbackHandler(env)
agent.invoke({"input": query}, config={"callbacks": [handler]})
LlamaIndex Integration
Query Engine Wrapper
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from fulcrum import FulcrumClient
client = FulcrumClient.from_env()
def governed_query(index: VectorStoreIndex, query: str) -> str:
"""Execute LlamaIndex query with governance."""
with client.envelope(workflow_id="llamaindex-rag") as env:
# Pre-query policy check
if not env.guard("query", input_text=query):
raise ValueError("Query not permitted by policy")
env.log("rag_query_started", {"query": query})
# Execute query
query_engine = index.as_query_engine()
response = query_engine.query(query)
env.log("rag_query_completed", {
"query": query,
"response_length": len(str(response)),
"source_nodes": len(response.source_nodes) if hasattr(response, 'source_nodes') else 0
})
return str(response)
# Usage
documents = SimpleDirectoryReader("data").load_data()
index = VectorStoreIndex.from_documents(documents)
result = governed_query(index, "What is the company policy on PTO?")
Retrieval Callback
from llama_index.core.callbacks import CallbackManager, TokenCountingHandler
from fulcrum import FulcrumClient
class FulcrumTokenHandler(TokenCountingHandler):
"""Track token usage with Fulcrum."""
def __init__(self, envelope, tokenizer):
super().__init__(tokenizer=tokenizer)
self.envelope = envelope
def on_event_end(self, event_type, payload, **kwargs):
super().on_event_end(event_type, payload, **kwargs)
self.envelope.log("token_usage", {
"event": event_type.value,
"prompt_tokens": self.prompt_llm_token_count,
"completion_tokens": self.completion_llm_token_count,
"embedding_tokens": self.total_embedding_token_count
})
OpenAI Integration
Direct API Wrapper
import openai
from fulcrum import FulcrumClient
client = FulcrumClient.from_env()
openai_client = openai.OpenAI()
def governed_completion(
messages: list,
model: str = "gpt-4",
**kwargs
) -> openai.types.chat.ChatCompletion:
"""OpenAI completion with Fulcrum governance."""
# Extract user message for policy check
user_message = next(
(m["content"] for m in messages if m["role"] == "user"),
""
)
with client.envelope(workflow_id="openai-direct") as env:
# Check input policy
if not env.guard("llm_call", input_text=user_message):
raise ValueError("Request blocked by governance policy")
env.log("openai_request", {
"model": model,
"message_count": len(messages)
})
# Make API call
response = openai_client.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
# Log usage
env.log("openai_response", {
"model": model,
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
})
return response
# Usage
response = governed_completion(
messages=[{"role": "user", "content": "Explain quantum computing"}],
model="gpt-4",
temperature=0.7
)
Function Calling Governance
def governed_function_call(messages: list, tools: list) -> dict:
"""OpenAI function calling with governance on each function."""
with client.envelope(workflow_id="openai-functions") as env:
response = openai_client.chat.completions.create(
model="gpt-4",
messages=messages,
tools=tools,
)
results = []
# Check each function call against policies
for tool_call in response.choices[0].message.tool_calls or []:
func_name = tool_call.function.name
func_args = tool_call.function.arguments
# Policy check for function
if not env.guard(f"function:{func_name}", input_text=func_args):
env.log("function_blocked", {"function": func_name})
results.append({
"tool_call_id": tool_call.id,
"error": f"Function {func_name} blocked by policy"
})
continue
# Execute approved function
result = execute_function(func_name, func_args)
env.log("function_executed", {
"function": func_name,
"success": True,
})
results.append({
"tool_call_id": tool_call.id,
"result": result
})
return {"response": response, "function_results": results}
FastAPI Middleware
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from fulcrum import FulcrumClient
from typing import Callable
import uuid

app = FastAPI()
fulcrum_client = FulcrumClient.from_env()

class FulcrumMiddleware(BaseHTTPMiddleware):
    """FastAPI middleware for Fulcrum governance."""

    def __init__(self, app, client: FulcrumClient):
        # BaseHTTPMiddleware wraps the ASGI app and routes each request to dispatch()
        super().__init__(app)
        self.client = client

    async def dispatch(self, request: Request, call_next: Callable):
        # Generate request ID
        request_id = str(uuid.uuid4())
        # Create envelope for request
        with self.client.envelope(
            workflow_id=f"api:{request.url.path}",
            execution_id=request_id,
            metadata={
                "method": request.method,
                "path": request.url.path,
                # request.client can be None, so fall back to a placeholder
                "client_ip": request.client.host if request.client else "unknown"
            }
        ) as env:
            # Check route policy
            if not env.guard(
                action_name=f"api:{request.method}:{request.url.path}",
                input_text=""
            ):
                env.log("request_blocked", {"reason": "policy_violation"})
                return JSONResponse(
                    status_code=403,
                    content={"error": "Request blocked by governance policy"}
                )
            env.log("request_started", {
                "method": request.method,
                "path": request.url.path
            })
            # Process request
            response = await call_next(request)
            env.log("request_completed", {
                "status_code": response.status_code
            })
            return response

# Add middleware
app.add_middleware(FulcrumMiddleware, client=fulcrum_client)
# Endpoint-specific governance
@app.post("/api/agent/execute")
async def execute_agent(request: Request):
"""Endpoint with additional governance."""
body = await request.json()
with fulcrum_client.envelope(workflow_id="agent-api") as env:
if not env.guard("agent_execute", input_text=body.get("query", "")):
raise HTTPException(status_code=403, detail="Agent execution blocked")
result = await run_agent(body["query"])
env.log("agent_executed", {"query_length": len(body["query"])})
return {"result": result}
Anthropic Claude Integration
import anthropic
from fulcrum import FulcrumClient
client = FulcrumClient.from_env()
anthropic_client = anthropic.Anthropic()
def governed_claude_completion(
messages: list,
model: str = "claude-3-opus-20240229",
max_tokens: int = 1024,
**kwargs
) -> anthropic.types.Message:
"""Claude completion with Fulcrum governance."""
# Extract user message for policy check
user_message = next(
(m["content"] for m in messages if m["role"] == "user"),
""
)
with client.envelope(workflow_id="anthropic-claude") as env:
# Check input policy
if not env.guard("llm_call", input_text=user_message):
raise ValueError("Request blocked by governance policy")
env.log("claude_request", {
"model": model,
"max_tokens": max_tokens
})
# Make API call
response = anthropic_client.messages.create(
model=model,
max_tokens=max_tokens,
messages=messages,
**kwargs
)
# Log usage
env.log("claude_response", {
"model": model,
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
"stop_reason": response.stop_reason
})
return response
# Usage
response = governed_claude_completion(
messages=[{"role": "user", "content": "Explain machine learning"}],
model="claude-3-opus-20240229",
max_tokens=2048
)
Claude Tool Use Governance
def governed_claude_tools(messages: list, tools: list) -> dict:
"""Claude tool use with governance on each tool call."""
with client.envelope(workflow_id="claude-tools") as env:
response = anthropic_client.messages.create(
model="claude-3-opus-20240229",
max_tokens=4096,
messages=messages,
tools=tools,
)
results = []
# Process tool use blocks
for block in response.content:
if block.type == "tool_use":
tool_name = block.name
tool_input = str(block.input)
# Policy check for tool
if not env.guard(f"tool:{tool_name}", input_text=tool_input):
env.log("tool_blocked", {"tool": tool_name})
results.append({
"tool_use_id": block.id,
"error": f"Tool {tool_name} blocked by policy"
})
continue
# Execute approved tool
result = execute_tool(tool_name, block.input)
env.log("tool_executed", {"tool": tool_name})
results.append({
"tool_use_id": block.id,
"result": result
})
return {"response": response, "tool_results": results}
Cost Tracking
Accessing Cost Information
with client.envelope(workflow_id="cost-tracking-example") as env:
# Execute operations
result = process_query(query)
# Log cost information
env.log("execution_cost", {
"input_tokens": 150,
"output_tokens": 300,
"model": "gpt-4",
"cost_usd": 0.0045
})
Integration with LLM Cost Tracking
def track_openai_costs(response, envelope):
"""Track OpenAI API costs in Fulcrum."""
# Model pricing (example rates)
PRICING = {
"gpt-4": {"input": 0.03, "output": 0.06},
"gpt-4-turbo": {"input": 0.01, "output": 0.03},
"gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
}
model = response.model
usage = response.usage
rates = PRICING.get(model, PRICING["gpt-3.5-turbo"])
cost = (
(usage.prompt_tokens / 1000) * rates["input"] +
(usage.completion_tokens / 1000) * rates["output"]
)
envelope.log("llm_cost", {
"model": model,
"input_tokens": usage.prompt_tokens,
"output_tokens": usage.completion_tokens,
"total_tokens": usage.total_tokens,
"cost_usd": round(cost, 6)
})
return cost
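One caveat with an exact-match pricing table: the model name reported in an API response may be a versioned identifier such as `gpt-4-0613`, which would miss the table and fall back to the default rates. A possible refinement, sketched below, matches on the longest known prefix and raises when nothing matches. `rates_for` and `estimate_cost` are hypothetical helpers, and the rates are the example figures from above, not current prices.

```python
PRICING = {
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-4-turbo": {"input": 0.01, "output": 0.03},
    "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
}


def rates_for(model: str) -> dict:
    """Look up per-1K-token rates by the longest matching model-name prefix."""
    matches = [name for name in PRICING if model.startswith(name)]
    if not matches:
        raise KeyError(f"no pricing configured for model {model!r}")
    # Longest prefix wins, so "gpt-4-turbo-..." maps to "gpt-4-turbo", not "gpt-4"
    return PRICING[max(matches, key=len)]


def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Estimate the cost in USD for one call, rounded to six decimals."""
    rates = rates_for(model)
    cost = (prompt_tokens / 1000) * rates["input"] + (completion_tokens / 1000) * rates["output"]
    return round(cost, 6)


print(estimate_cost("gpt-4-0613", 1000, 500))  # 0.06
```

Raising on an unknown model, rather than defaulting to the cheapest rate, keeps cost reports from silently under-counting.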
Best Practices
1. Initialize Client Once
# GOOD: Single client instance
client = FulcrumClient.from_env()
def process_request(query):
with client.envelope(workflow_id="my-agent") as env:
# Use shared client
pass
# BAD: Creating new clients per request
def process_request_bad(query):
client = FulcrumClient.from_env() # Creates new connection each time
with client.envelope(workflow_id="my-agent") as env:
pass
2. Use Meaningful Workflow IDs
# GOOD: Descriptive workflow IDs
with client.envelope(workflow_id="customer-support-email-agent") as env:
pass
with client.envelope(workflow_id="code-review-assistant") as env:
pass
# BAD: Generic workflow IDs
with client.envelope(workflow_id="agent") as env:
pass
3. Guard Before Sensitive Operations
with client.envelope(workflow_id="data-agent") as env:
# GOOD: Guard before each sensitive operation
if env.guard("read_database", input_text=query):
data = database.query(query)
if env.guard("send_email", input_text=str(data)):
send_email(data)
# BAD: Single guard for multiple operations
if env.guard("process", input_text=query):
data = database.query(query) # Not individually governed
send_email(data) # Not individually governed
4. Log Meaningful Events
with client.envelope(workflow_id="my-agent") as env:
# GOOD: Structured, meaningful events
env.log("document_processed", {
"document_id": doc.id,
"page_count": len(doc.pages),
"processing_time_ms": 1234,
"status": "success"
})
# BAD: Unstructured or minimal events
env.log("done", {"ok": True})
5. Handle Failures Gracefully
import logging
from fulcrum.exceptions import FulcrumError, PolicyViolationError, BudgetExceededError

def safe_governed_action(action: str, input_text: str) -> dict:
    """Execute with governance, handle all failure modes."""
    try:
        with client.envelope(workflow_id="safe-agent") as env:
            if not env.guard(action, input_text=input_text):
                return {
                    "status": "blocked",
                    "reason": "policy_violation",
                    "action": action
                }
            result = execute_action(action, input_text)
            return {"status": "success", "result": result}
    except PolicyViolationError as e:
        return {
            "status": "blocked",
            "reason": "policy_violation",
            "policy_id": e.policy_id,
            "message": e.message
        }
    except BudgetExceededError as e:
        return {
            "status": "blocked",
            "reason": "budget_exceeded",
            "current": e.current_spend,
            "limit": e.budget_limit
        }
    except FulcrumError as e:
        # Governance failure. Catching FulcrumError rather than Exception keeps
        # errors raised by execute_action itself from triggering a second run below.
        logging.error(f"Governance error: {e}")
        if client.on_failure == "FAIL_OPEN":
            # Allow execution without governance
            result = execute_action(action, input_text)
            return {"status": "success", "result": result, "governed": False}
        else:
            return {"status": "error", "reason": str(e)}
    except Exception as e:
        # The action itself failed; report it without retrying
        logging.error(f"Action error: {e}")
        return {"status": "error", "reason": str(e)}
6. Use Middleware for Cross-Cutting Concerns
# Add once, applies to all events
client.add_middleware(redact_pii)
client.add_middleware(add_correlation_ids)
client.add_middleware(filter_verbose_events)
7. Configure Appropriate Timeouts
# High-throughput, latency-sensitive applications
client = FulcrumClient(
host="localhost:50051",
timeout_ms=100, # Fast timeout
on_failure="FAIL_OPEN" # Don't block on timeout
)
# Security-critical applications
client = FulcrumClient(
host="localhost:50051",
timeout_ms=5000, # Allow retries
on_failure="FAIL_CLOSED" # Block on timeout
)
Troubleshooting
Connection Issues
Error: "failed to connect to all addresses"
# Check 1: Verify server is running
# Run: nc -zv localhost 50051
# Check 2: Verify host format
client = FulcrumClient(host="localhost:50051") # Correct
client = FulcrumClient(host="http://localhost:50051") # Wrong: do not include a protocol scheme such as http://
# Check 3: Check firewall/network
Error: "Deadline Exceeded"
# Increase timeout
client = FulcrumClient(
host="localhost:50051",
timeout_ms=2000 # Increase from default 500ms
)
Authentication Issues
Error: "UNAUTHENTICATED"
# Check API key is correct
client = FulcrumClient(
host="localhost:50051",
api_key="your-actual-api-key" # Verify this is correct
)
# Verify environment variable
import os
print(os.environ.get("FULCRUM_API_KEY"))
Policy Issues
Actions Always Blocked
# 1. Check if using FAIL_CLOSED with unavailable server
client = FulcrumClient(
host="localhost:50051",
on_failure="FAIL_OPEN" # Try FAIL_OPEN for testing
)
# 2. Enable debug logging
import logging
logging.getLogger("fulcrum").setLevel(logging.DEBUG)
# 3. Check policy evaluation result
with client.envelope(workflow_id="debug") as env:
result = env.guard("test_action", input_text="test")
print(f"Guard result: {result}")
Actions Always Allowed
# Check if FAIL_OPEN is masking errors
import logging
logging.getLogger("fulcrum").setLevel(logging.DEBUG)
# Temporarily use FAIL_CLOSED to see real results
client = FulcrumClient(
host="localhost:50051",
on_failure="FAIL_CLOSED"
)
Event Logging Issues
Events Not Appearing
# 1. Ensure proper shutdown
client.shutdown() # Flushes event queue
# 2. Check for middleware dropping events
# Your middleware might return None
# 3. Check payload serialization
env.log("test", {"valid": "json"}) # Works
env.log("test", {"invalid": object()}) # May fail silently
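Because a non-serializable payload may fail silently, it can help to validate payloads before handing them to `env.log()`, at least during development. The helper below is a sketch of one way to do that with the standard library; `assert_json_serializable` is a hypothetical name and not part of the SDK.

```python
import json
from typing import Any, Dict


def assert_json_serializable(payload: Dict[str, Any]) -> None:
    """Raise ValueError if the payload cannot be encoded as JSON."""
    try:
        json.dumps(payload)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"payload is not JSON-serializable: {exc}") from exc


assert_json_serializable({"valid": "json"})  # Passes silently

try:
    assert_json_serializable({"invalid": object()})
except ValueError as exc:
    print(f"rejected: {exc}")
```

A check like this could also live in a middleware that drops or rewrites offending events, though serializing every payload twice has a cost in hot paths.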
Debug Logging
import logging
# Enable detailed logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("fulcrum")
logger.setLevel(logging.DEBUG)
# This will show:
# - Connection attempts
# - Policy evaluation requests/responses
# - Event queue operations
# - Retry attempts
Health Check
def check_fulcrum_health(client: FulcrumClient) -> dict:
"""Check Fulcrum connectivity and basic operation."""
health = {
"connected": False,
"policy_service": False,
"event_service": False,
"errors": []
}
try:
# Test envelope creation
with client.envelope(workflow_id="health-check") as env:
health["connected"] = True
# Test policy evaluation
try:
env.guard("health_check", input_text="test")
health["policy_service"] = True
except Exception as e:
health["errors"].append(f"Policy: {e}")
# Test event logging
try:
env.log("health_check", {"test": True})
health["event_service"] = True
except Exception as e:
health["errors"].append(f"Events: {e}")
except Exception as e:
health["errors"].append(f"Connection: {e}")
return health
# Usage
status = check_fulcrum_health(client)
print(f"Fulcrum Health: {status}")
Version History
| Version | Date | Changes |
|---|---|---|
| 0.1.0 | 2026-01-06 | Initial release |
Support
- Documentation: https://docs.fulcrum.dev
- Email: support@fulcrum.dev
- GitHub Issues: https://github.com/fulcrum-io/fulcrum/issues
Document Version: 1.0 | Last Updated: January 6, 2026 | SDK Version: 0.1.0