Skip to content

Fulcrum Python SDK Reference

Comprehensive Python SDK documentation for integrating Fulcrum AI Governance into your applications.

Package: fulcrum-governance Version: 0.1.0 Python: 3.9+ License: Apache-2.0


Table of Contents

  1. Installation
  2. Configuration
  3. Core Classes
  4. FulcrumClient
  5. Envelope
  6. API Reference
  7. Error Handling
  8. Middleware System
  9. Async Support
  10. Framework Integrations
  11. LangChain
  12. LlamaIndex
  13. OpenAI
  14. FastAPI
  15. Anthropic Claude
  16. Cost Tracking
  17. Best Practices
  18. Troubleshooting

Installation

pip install fulcrum-governance

Poetry

poetry add fulcrum-governance

requirements.txt

fulcrum-governance>=0.1.0

From Source

git clone https://github.com/fulcrum-io/fulcrum.git
cd fulcrum/sdk/python
pip install -e .

Dependencies

The SDK automatically installs:

Package Version Purpose
grpcio >=1.60.0 gRPC communication
protobuf >=4.25.0 Protocol buffer messages
googleapis-common-protos >=1.60.0 Google API protos

Development Dependencies

For SDK development:

pip install fulcrum-governance[dev]

Installs: grpcio-tools, pytest, pytest-asyncio, black, mypy, ruff


Configuration

Programmatic Configuration

from fulcrum import FulcrumClient

client = FulcrumClient(
    host="localhost:50051",       # Fulcrum server address
    api_key="your-api-key",       # API key for authentication
    on_failure="FAIL_OPEN",       # FAIL_OPEN or FAIL_CLOSED
    timeout_ms=500,               # Request timeout in milliseconds
)

Environment Variables

export FULCRUM_HOST="localhost:50051"
export FULCRUM_API_KEY="your-api-key"
export FULCRUM_TENANT_ID="your-tenant-id"
export FULCRUM_TIMEOUT_MS="500"
from fulcrum import FulcrumClient

# Auto-configure from environment
client = FulcrumClient.from_env()

Configuration Options

Parameter Type Default Description
host str "localhost:50051" Fulcrum server address
api_key Optional[str] None API key for authentication
on_failure str "FAIL_OPEN" Behavior when governance unavailable
timeout_ms int 500 Request timeout in milliseconds

TLS Configuration

TLS is automatically enabled for hosts using port 443:

# TLS auto-enabled for production
client = FulcrumClient(
    host="api.fulcrum.dev:443",
    api_key="your-api-key"
)

# Explicit insecure connection for local development
client = FulcrumClient(
    host="localhost:50051",  # Non-443 port uses insecure channel
    api_key="dev-key"
)

Failure Modes

Mode Behavior Use Case
FAIL_OPEN Allow actions when Fulcrum is unavailable Production with availability priority
FAIL_CLOSED Block actions when Fulcrum is unavailable High-security environments
# High-availability mode (default)
client = FulcrumClient(
    host="localhost:50051",
    on_failure="FAIL_OPEN"  # Allow execution if governance fails
)

# High-security mode
client = FulcrumClient(
    host="localhost:50051",
    on_failure="FAIL_CLOSED"  # Block execution if governance fails
)

Core Classes

FulcrumClient

The main entry point for interacting with Fulcrum governance.

Constructor

def __init__(
    self,
    host: str = "localhost:50051",
    api_key: Optional[str] = None,
    on_failure: str = "FAIL_OPEN",
    timeout_ms: int = 500
)

Parameters: - host: Fulcrum server address in host:port format - api_key: API key for authentication (sent as x-api-key header) - on_failure: "FAIL_OPEN" or "FAIL_CLOSED" behavior - timeout_ms: Request timeout for policy evaluations

Class Methods

from_env()

Create a client from environment variables.

@classmethod
def from_env(cls) -> 'FulcrumClient':
    """Create client from environment variables."""
    return cls(
        host=os.environ.get("FULCRUM_HOST", "localhost:50051"),
        api_key=os.environ.get("FULCRUM_API_KEY"),
        timeout_ms=int(os.environ.get("FULCRUM_TIMEOUT_MS", "500"))
    )

Instance Methods

envelope()

Create a governance envelope context manager.

@contextmanager
def envelope(
    self,
    workflow_id: str = "default-workflow",
    execution_id: Optional[str] = None,
    tenant_id: str = "default-tenant",
    adapter_type: str = "python-sdk",
    metadata: Optional[Dict[str, str]] = None
) -> Generator['Envelope', None, None]:
    """
    Create a governance envelope for tracking agent execution.

    Parameters:
        workflow_id: Identifier for the workflow type
        execution_id: Unique execution ID (auto-generated if not provided)
        tenant_id: Tenant identifier for multi-tenancy
        adapter_type: SDK/adapter identifier
        metadata: Additional key-value metadata

    Yields:
        Envelope: Context manager for governed execution
    """

Example:

with client.envelope(
    workflow_id="customer-support",
    tenant_id="acme-corp",
    metadata={"user_id": "user123"}
) as env:
    # Governed execution
    if env.guard("send_email", input_text=user_message):
        result = send_email(user_message)
        env.log("email_sent", {"recipient": email})
add_middleware()

Add a middleware function for event processing.

def add_middleware(self, func: Callable) -> None:
    """
    Add middleware for event processing.

    Parameters:
        func: Callable with signature (event_type, payload) -> (event_type, payload) or None
              Return None to drop the event.
    """

Example:

def redact_pii(event_type: str, payload: dict) -> tuple:
    """Redact PII before logging."""
    if "email" in payload:
        payload["email"] = "***@***.***"
    return event_type, payload

client.add_middleware(redact_pii)
shutdown()

Gracefully shutdown the client.

def shutdown(self) -> None:
    """
    Cleanly shuts down the worker thread and flushes the event queue.
    Called automatically via atexit, but can be called manually.
    """

Envelope

Context wrapper for governed agent executions. Created by FulcrumClient.envelope().

Properties

Property Type Description
execution_id str Unique execution identifier
envelope_id str Envelope ID (from server or generated)
tenant_id str Tenant identifier
workflow_id str Workflow type identifier

Methods

guard()

Synchronous policy evaluation barrier.

def guard(
    self,
    action_name: str,
    input_text: str = "",
    metadata: Optional[Dict[str, str]] = None
) -> bool:
    """
    Evaluate policies before executing an action.

    Parameters:
        action_name: Name of the action to evaluate
        input_text: Input content for semantic analysis
        metadata: Additional context attributes

    Returns:
        bool: True if action is allowed, False if denied
    """

Example:

with client.envelope(workflow_id="chatbot") as env:
    # Check before executing sensitive action
    if env.guard("database_query", input_text=user_query):
        result = execute_query(user_query)
    else:
        result = "This query is not permitted by policy."
log()

Log an event for audit trail.

def log(
    self,
    event_type: str,
    payload: Dict[str, Any] = {}
) -> None:
    """
    Log an event asynchronously to Fulcrum.

    Parameters:
        event_type: Type/category of the event
        payload: Event data (must be JSON-serializable)
    """

Example:

env.log("llm_call_started", {
    "model": "gpt-4",
    "prompt_tokens": 150,
    "temperature": 0.7
})

# After execution
env.log("llm_call_completed", {
    "model": "gpt-4",
    "completion_tokens": 250,
    "latency_ms": 1234
})

API Reference

Complete Method Signatures

FulcrumClient

class FulcrumClient:
    def __init__(
        self,
        host: str = "localhost:50051",
        api_key: Optional[str] = None,
        on_failure: str = "FAIL_OPEN",
        timeout_ms: int = 500
    ) -> None: ...

    @classmethod
    def from_env(cls) -> 'FulcrumClient': ...

    def envelope(
        self,
        workflow_id: str = "default-workflow",
        execution_id: Optional[str] = None,
        tenant_id: str = "default-tenant",
        adapter_type: str = "python-sdk",
        metadata: Optional[Dict[str, str]] = None
    ) -> Generator['Envelope', None, None]: ...

    def add_middleware(
        self,
        func: Callable[[str, Dict[str, Any]], Optional[Tuple[str, Dict[str, Any]]]]
    ) -> None: ...

    def shutdown(self) -> None: ...

Envelope

class Envelope:
    execution_id: str
    envelope_id: str
    tenant_id: str
    workflow_id: str

    def guard(
        self,
        action_name: str,
        input_text: str = "",
        metadata: Optional[Dict[str, str]] = None
    ) -> bool: ...

    def log(
        self,
        event_type: str,
        payload: Dict[str, Any] = {}
    ) -> None: ...

Error Handling

Exception Hierarchy

from fulcrum.exceptions import (
    FulcrumError,           # Base exception for all Fulcrum errors
    PolicyViolationError,   # Action blocked by policy
    BudgetExceededError,    # Budget limit reached
    ConnectionError,        # Server unreachable
    AuthenticationError,    # Invalid API key
    TimeoutError,           # Request timed out
)

Exception Details

FulcrumError (Base)

class FulcrumError(Exception):
    """Base exception for all Fulcrum SDK errors."""
    message: str
    code: Optional[str]

PolicyViolationError

class PolicyViolationError(FulcrumError):
    """Raised when an action is blocked by policy."""
    policy_id: str
    message: str
    matched_rules: List[str]
    severity: str

BudgetExceededError

class BudgetExceededError(FulcrumError):
    """Raised when budget limits are exceeded."""
    budget_id: str
    current_spend: float
    budget_limit: float
    limit_type: str  # "tokens", "cost", "calls"

Error Handling Patterns

Basic Error Handling

from fulcrum import FulcrumClient
from fulcrum.exceptions import (
    FulcrumError,
    PolicyViolationError,
    BudgetExceededError,
    ConnectionError,
    TimeoutError,
)

client = FulcrumClient(host="localhost:50051", api_key="key")

try:
    with client.envelope(workflow_id="my-agent") as env:
        if env.guard("send_email", input_text="Hello world"):
            send_email("Hello world")
except PolicyViolationError as e:
    print(f"Policy blocked: {e.policy_id}")
    print(f"Reason: {e.message}")
    print(f"Matched rules: {e.matched_rules}")
except BudgetExceededError as e:
    print(f"Budget exceeded: ${e.current_spend:.2f} / ${e.budget_limit:.2f}")
except ConnectionError as e:
    print(f"Cannot reach Fulcrum server: {e}")
    # Behavior depends on FAIL_OPEN/FAIL_CLOSED
except TimeoutError:
    print("Request timed out")
except FulcrumError as e:
    print(f"Fulcrum error: {e}")

Graceful Degradation

def execute_with_governance(action: str, input_text: str) -> dict:
    """Execute action with governance, gracefully handle failures."""
    try:
        with client.envelope(workflow_id="agent") as env:
            allowed = env.guard(action, input_text=input_text)
            if not allowed:
                return {"status": "blocked", "reason": "policy_violation"}

            result = execute_action(action, input_text)
            env.log("action_completed", {"result": "success"})
            return {"status": "success", "result": result}

    except ConnectionError:
        # Fail-open: execute without governance
        if client.on_failure == "FAIL_OPEN":
            result = execute_action(action, input_text)
            return {"status": "success", "result": result, "governed": False}
        else:
            return {"status": "blocked", "reason": "governance_unavailable"}
    except Exception as e:
        return {"status": "error", "reason": str(e)}

Retry Logic

The SDK implements automatic retry with exponential backoff for gRPC errors:

# Retry configuration (internal)
# Attempts: 5
# Backoff: 0.1s, 2.1s, 4.1s, 8.1s, 16.1s
# Reconnection: On UNAVAILABLE or INTERNAL errors

Middleware System

Middleware functions intercept events before they are logged, enabling: - PII redaction - Event filtering - Payload enrichment - Custom transformations

Middleware Signature

def middleware_function(
    event_type: str,
    payload: Dict[str, Any]
) -> Optional[Tuple[str, Dict[str, Any]]]:
    """
    Process an event before logging.

    Parameters:
        event_type: The event type string
        payload: The event payload dict

    Returns:
        Tuple of (event_type, payload) to continue, or None to drop the event
    """

Example Middlewares

PII Redaction

import re

def redact_pii(event_type: str, payload: dict) -> tuple:
    """Redact sensitive data from event payloads."""
    redacted = payload.copy()

    # Redact email addresses
    for key, value in redacted.items():
        if isinstance(value, str):
            redacted[key] = re.sub(
                r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
                '[REDACTED_EMAIL]',
                value
            )

    return event_type, redacted

client.add_middleware(redact_pii)

Event Filtering

def filter_debug_events(event_type: str, payload: dict) -> tuple:
    """Filter out debug events in production."""
    if event_type.startswith("debug_"):
        return None  # Drop the event
    return event_type, payload

client.add_middleware(filter_debug_events)

Payload Enrichment

import socket
from datetime import datetime

def enrich_metadata(event_type: str, payload: dict) -> tuple:
    """Add standard metadata to all events."""
    enriched = payload.copy()
    enriched["_hostname"] = socket.gethostname()
    enriched["_timestamp_utc"] = datetime.utcnow().isoformat()
    enriched["_sdk_version"] = "0.1.0"
    return event_type, enriched

client.add_middleware(enrich_metadata)

Chaining Multiple Middlewares

# Middlewares execute in order added
client.add_middleware(redact_pii)        # First
client.add_middleware(filter_debug_events)  # Second
client.add_middleware(enrich_metadata)    # Third

Async Support

The SDK provides an async client for event-loop-based applications.

AsyncFulcrumClient

import asyncio
from fulcrum import AsyncFulcrumClient

async def main():
    client = AsyncFulcrumClient(
        host="localhost:50051",
        api_key="your-api-key"
    )

    async with client.envelope(workflow_id="async-agent") as env:
        # Async guard check
        allowed = await env.guard("action", input_text="hello")

        if allowed:
            result = await do_async_work()
            await env.log("completed", {"result": result})

    await client.close()

asyncio.run(main())

Async Context Manager

async with client.envelope(workflow_id="async-workflow") as env:
    # Multiple concurrent guard checks
    results = await asyncio.gather(
        env.guard("action1", input_text="input1"),
        env.guard("action2", input_text="input2"),
        env.guard("action3", input_text="input3"),
    )

    for i, allowed in enumerate(results):
        if allowed:
            await process_action(i)

Framework Integrations

LangChain Integration

Basic Agent Wrapper

from langchain.agents import AgentExecutor
from langchain.chat_models import ChatOpenAI
from fulcrum import FulcrumClient

client = FulcrumClient.from_env()

def governed_agent_run(agent: AgentExecutor, query: str) -> dict:
    """Run LangChain agent with Fulcrum governance."""

    with client.envelope(workflow_id="langchain-agent") as env:
        # Pre-execution policy check
        if not env.guard("process_query", input_text=query):
            return {"error": "Query blocked by policy"}

        env.log("agent_started", {"query": query})

        # Run agent with tool governance
        result = agent.invoke({"input": query})

        env.log("agent_completed", {
            "output": result["output"][:200],  # Truncate for logging
            "intermediate_steps": len(result.get("intermediate_steps", []))
        })

        return result

# Usage
result = governed_agent_run(agent, "What's the weather in NYC?")

Tool-Level Governance

from langchain.tools import BaseTool
from fulcrum import FulcrumClient

client = FulcrumClient.from_env()

class GovernedTool(BaseTool):
    """Wrapper that adds governance to any LangChain tool."""

    def __init__(self, wrapped_tool: BaseTool, envelope):
        super().__init__(
            name=wrapped_tool.name,
            description=wrapped_tool.description
        )
        self.wrapped_tool = wrapped_tool
        self.envelope = envelope

    def _run(self, tool_input: str) -> str:
        # Check tool usage policy
        if not self.envelope.guard(
            action_name=f"tool:{self.name}",
            input_text=tool_input
        ):
            return f"Tool '{self.name}' blocked by governance policy"

        self.envelope.log("tool_executed", {
            "tool": self.name,
            "input_length": len(tool_input)
        })

        return self.wrapped_tool._run(tool_input)


# Usage in agent
with client.envelope(workflow_id="langchain-agent") as env:
    governed_tools = [GovernedTool(tool, env) for tool in tools]
    agent = create_agent(llm, governed_tools)
    result = agent.invoke({"input": query})

LangChain Callback Handler

from langchain.callbacks.base import BaseCallbackHandler
from fulcrum import FulcrumClient

class FulcrumCallbackHandler(BaseCallbackHandler):
    """LangChain callback handler for Fulcrum governance."""

    def __init__(self, envelope):
        self.envelope = envelope

    def on_llm_start(self, serialized, prompts, **kwargs):
        self.envelope.log("llm_start", {
            "model": serialized.get("name"),
            "prompt_count": len(prompts)
        })

    def on_llm_end(self, response, **kwargs):
        self.envelope.log("llm_end", {
            "generations": len(response.generations)
        })

    def on_tool_start(self, serialized, input_str, **kwargs):
        tool_name = serialized.get("name", "unknown")

        # Check tool policy
        if not self.envelope.guard(f"tool:{tool_name}", input_text=input_str):
            raise ValueError(f"Tool {tool_name} blocked by policy")

        self.envelope.log("tool_start", {"tool": tool_name})

    def on_tool_end(self, output, **kwargs):
        self.envelope.log("tool_end", {"output_length": len(str(output))})


# Usage
with client.envelope(workflow_id="langchain-agent") as env:
    handler = FulcrumCallbackHandler(env)
    agent.invoke({"input": query}, config={"callbacks": [handler]})

LlamaIndex Integration

Query Engine Wrapper

from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from fulcrum import FulcrumClient

client = FulcrumClient.from_env()

def governed_query(index: VectorStoreIndex, query: str) -> str:
    """Execute LlamaIndex query with governance."""

    with client.envelope(workflow_id="llamaindex-rag") as env:
        # Pre-query policy check
        if not env.guard("query", input_text=query):
            raise ValueError("Query not permitted by policy")

        env.log("rag_query_started", {"query": query})

        # Execute query
        query_engine = index.as_query_engine()
        response = query_engine.query(query)

        env.log("rag_query_completed", {
            "query": query,
            "response_length": len(str(response)),
            "source_nodes": len(response.source_nodes) if hasattr(response, 'source_nodes') else 0
        })

        return str(response)

# Usage
documents = SimpleDirectoryReader("data").load_data()
index = VectorStoreIndex.from_documents(documents)
result = governed_query(index, "What is the company policy on PTO?")

Retrieval Callback

from llama_index.core.callbacks import CallbackManager, TokenCountingHandler
from fulcrum import FulcrumClient

class FulcrumTokenHandler(TokenCountingHandler):
    """Track token usage with Fulcrum."""

    def __init__(self, envelope, tokenizer):
        super().__init__(tokenizer=tokenizer)
        self.envelope = envelope

    def on_event_end(self, event_type, payload, **kwargs):
        super().on_event_end(event_type, payload, **kwargs)

        self.envelope.log("token_usage", {
            "event": event_type.value,
            "prompt_tokens": self.prompt_llm_token_count,
            "completion_tokens": self.completion_llm_token_count,
            "embedding_tokens": self.total_embedding_token_count
        })

OpenAI Integration

Direct API Wrapper

import openai
from fulcrum import FulcrumClient

client = FulcrumClient.from_env()
openai_client = openai.OpenAI()

def governed_completion(
    messages: list,
    model: str = "gpt-4",
    **kwargs
) -> openai.types.chat.ChatCompletion:
    """OpenAI completion with Fulcrum governance."""

    # Extract user message for policy check
    user_message = next(
        (m["content"] for m in messages if m["role"] == "user"),
        ""
    )

    with client.envelope(workflow_id="openai-direct") as env:
        # Check input policy
        if not env.guard("llm_call", input_text=user_message):
            raise ValueError("Request blocked by governance policy")

        env.log("openai_request", {
            "model": model,
            "message_count": len(messages)
        })

        # Make API call
        response = openai_client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )

        # Log usage
        env.log("openai_response", {
            "model": model,
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
            "total_tokens": response.usage.total_tokens
        })

        return response

# Usage
response = governed_completion(
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    model="gpt-4",
    temperature=0.7
)

Function Calling Governance

def governed_function_call(messages: list, tools: list) -> dict:
    """OpenAI function calling with governance on each function."""

    with client.envelope(workflow_id="openai-functions") as env:
        response = openai_client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            tools=tools,
        )

        results = []

        # Check each function call against policies
        for tool_call in response.choices[0].message.tool_calls or []:
            func_name = tool_call.function.name
            func_args = tool_call.function.arguments

            # Policy check for function
            if not env.guard(f"function:{func_name}", input_text=func_args):
                env.log("function_blocked", {"function": func_name})
                results.append({
                    "tool_call_id": tool_call.id,
                    "error": f"Function {func_name} blocked by policy"
                })
                continue

            # Execute approved function
            result = execute_function(func_name, func_args)
            env.log("function_executed", {
                "function": func_name,
                "success": True,
            })
            results.append({
                "tool_call_id": tool_call.id,
                "result": result
            })

        return {"response": response, "function_results": results}

FastAPI Middleware

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from fulcrum import FulcrumClient
from typing import Callable
import uuid

app = FastAPI()
fulcrum_client = FulcrumClient.from_env()

class FulcrumMiddleware:
    """FastAPI middleware for Fulcrum governance."""

    def __init__(self, app: FastAPI, client: FulcrumClient):
        self.app = app
        self.client = client

    async def __call__(self, request: Request, call_next: Callable):
        # Generate request ID
        request_id = str(uuid.uuid4())

        # Create envelope for request
        with self.client.envelope(
            workflow_id=f"api:{request.url.path}",
            execution_id=request_id,
            metadata={
                "method": request.method,
                "path": request.url.path,
                "client_ip": request.client.host
            }
        ) as env:
            # Check route policy
            if not env.guard(
                action_name=f"api:{request.method}:{request.url.path}",
                input_text=""
            ):
                env.log("request_blocked", {"reason": "policy_violation"})
                return JSONResponse(
                    status_code=403,
                    content={"error": "Request blocked by governance policy"}
                )

            env.log("request_started", {
                "method": request.method,
                "path": request.url.path
            })

            # Process request
            response = await call_next(request)

            env.log("request_completed", {
                "status_code": response.status_code
            })

            return response

# Add middleware
app.add_middleware(FulcrumMiddleware, client=fulcrum_client)

# Endpoint-specific governance
@app.post("/api/agent/execute")
async def execute_agent(request: Request):
    """Endpoint with additional governance."""
    body = await request.json()

    with fulcrum_client.envelope(workflow_id="agent-api") as env:
        if not env.guard("agent_execute", input_text=body.get("query", "")):
            raise HTTPException(status_code=403, detail="Agent execution blocked")

        result = await run_agent(body["query"])
        env.log("agent_executed", {"query_length": len(body["query"])})

        return {"result": result}

Anthropic Claude Integration

import anthropic
from fulcrum import FulcrumClient

client = FulcrumClient.from_env()
anthropic_client = anthropic.Anthropic()

def governed_claude_completion(
    messages: list,
    model: str = "claude-3-opus-20240229",
    max_tokens: int = 1024,
    **kwargs
) -> anthropic.types.Message:
    """Claude completion with Fulcrum governance."""

    # Extract user message for policy check
    user_message = next(
        (m["content"] for m in messages if m["role"] == "user"),
        ""
    )

    with client.envelope(workflow_id="anthropic-claude") as env:
        # Check input policy
        if not env.guard("llm_call", input_text=user_message):
            raise ValueError("Request blocked by governance policy")

        env.log("claude_request", {
            "model": model,
            "max_tokens": max_tokens
        })

        # Make API call
        response = anthropic_client.messages.create(
            model=model,
            max_tokens=max_tokens,
            messages=messages,
            **kwargs
        )

        # Log usage
        env.log("claude_response", {
            "model": model,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
            "stop_reason": response.stop_reason
        })

        return response

# Usage
response = governed_claude_completion(
    messages=[{"role": "user", "content": "Explain machine learning"}],
    model="claude-3-opus-20240229",
    max_tokens=2048
)

Claude Tool Use Governance

def governed_claude_tools(messages: list, tools: list) -> dict:
    """Claude tool use with governance on each tool call."""

    with client.envelope(workflow_id="claude-tools") as env:
        response = anthropic_client.messages.create(
            model="claude-3-opus-20240229",
            max_tokens=4096,
            messages=messages,
            tools=tools,
        )

        results = []

        # Process tool use blocks
        for block in response.content:
            if block.type == "tool_use":
                tool_name = block.name
                tool_input = str(block.input)

                # Policy check for tool
                if not env.guard(f"tool:{tool_name}", input_text=tool_input):
                    env.log("tool_blocked", {"tool": tool_name})
                    results.append({
                        "tool_use_id": block.id,
                        "error": f"Tool {tool_name} blocked by policy"
                    })
                    continue

                # Execute approved tool
                result = execute_tool(tool_name, block.input)
                env.log("tool_executed", {"tool": tool_name})
                results.append({
                    "tool_use_id": block.id,
                    "result": result
                })

        return {"response": response, "tool_results": results}

Cost Tracking

Accessing Cost Information

with client.envelope(workflow_id="cost-tracking-example") as env:
    # Execute operations
    result = process_query(query)

    # Log cost information
    env.log("execution_cost", {
        "input_tokens": 150,
        "output_tokens": 300,
        "model": "gpt-4",
        "cost_usd": 0.0045
    })

Integration with LLM Cost Tracking

def track_openai_costs(response, envelope):
    """Track OpenAI API costs in Fulcrum."""

    # Model pricing (example rates)
    PRICING = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
    }

    model = response.model
    usage = response.usage

    rates = PRICING.get(model, PRICING["gpt-3.5-turbo"])
    cost = (
        (usage.prompt_tokens / 1000) * rates["input"] +
        (usage.completion_tokens / 1000) * rates["output"]
    )

    envelope.log("llm_cost", {
        "model": model,
        "input_tokens": usage.prompt_tokens,
        "output_tokens": usage.completion_tokens,
        "total_tokens": usage.total_tokens,
        "cost_usd": round(cost, 6)
    })

    return cost

Best Practices

1. Initialize Client Once

# GOOD: Single client instance
client = FulcrumClient.from_env()

def process_request(query):
    with client.envelope(workflow_id="my-agent") as env:
        # Use shared client
        pass

# BAD: Creating new clients per request
def process_request_bad(query):
    client = FulcrumClient.from_env()  # Creates new connection each time
    with client.envelope(workflow_id="my-agent") as env:
        pass

2. Use Meaningful Workflow IDs

# GOOD: Descriptive workflow IDs
with client.envelope(workflow_id="customer-support-email-agent") as env:
    pass

with client.envelope(workflow_id="code-review-assistant") as env:
    pass

# BAD: Generic workflow IDs
with client.envelope(workflow_id="agent") as env:
    pass

3. Guard Before Sensitive Operations

with client.envelope(workflow_id="data-agent") as env:
    # GOOD: Guard before each sensitive operation
    if env.guard("read_database", input_text=query):
        data = database.query(query)

        if env.guard("send_email", input_text=str(data)):
            send_email(data)

    # BAD: Single guard for multiple operations
    if env.guard("process", input_text=query):
        data = database.query(query)  # Not individually governed
        send_email(data)              # Not individually governed

4. Log Meaningful Events

with client.envelope(workflow_id="my-agent") as env:
    # GOOD: Structured, meaningful events
    env.log("document_processed", {
        "document_id": doc.id,
        "page_count": len(doc.pages),
        "processing_time_ms": 1234,
        "status": "success"
    })

    # BAD: Unstructured or minimal events
    env.log("done", {"ok": True})

5. Handle Failures Gracefully

def safe_governed_action(action: str, input_text: str) -> dict:
    """Execute with governance, handle all failure modes."""
    try:
        with client.envelope(workflow_id="safe-agent") as env:
            if not env.guard(action, input_text=input_text):
                return {
                    "status": "blocked",
                    "reason": "policy_violation",
                    "action": action
                }

            result = execute_action(action, input_text)
            return {"status": "success", "result": result}

    except PolicyViolationError as e:
        return {
            "status": "blocked",
            "reason": "policy_violation",
            "policy_id": e.policy_id,
            "message": e.message
        }
    except BudgetExceededError as e:
        return {
            "status": "blocked",
            "reason": "budget_exceeded",
            "current": e.current_spend,
            "limit": e.budget_limit
        }
    except Exception as e:
        # Log unexpected errors
        logging.error(f"Governance error: {e}")

        if client.on_failure == "FAIL_OPEN":
            # Allow execution without governance
            result = execute_action(action, input_text)
            return {"status": "success", "result": result, "governed": False}
        else:
            return {"status": "error", "reason": str(e)}

6. Use Middleware for Cross-Cutting Concerns

# Add once, applies to all events
client.add_middleware(redact_pii)
client.add_middleware(add_correlation_ids)
client.add_middleware(filter_verbose_events)

7. Configure Appropriate Timeouts

# High-throughput, latency-sensitive applications
client = FulcrumClient(
    host="localhost:50051",
    timeout_ms=100,  # Fast timeout
    on_failure="FAIL_OPEN"  # Don't block on timeout
)

# Security-critical applications
client = FulcrumClient(
    host="localhost:50051",
    timeout_ms=5000,  # Allow retries
    on_failure="FAIL_CLOSED"  # Block on timeout
)

Troubleshooting

Connection Issues

Error: "failed to connect to all addresses"

# Check 1: Verify server is running
# Run: nc -zv localhost 50051

# Check 2: Verify host format
client = FulcrumClient(host="localhost:50051")  # Correct
client = FulcrumClient(host="http://localhost:50051")  # Wrong: no protocol

# Check 3: Check firewall/network

Error: "Deadline Exceeded"

# Increase timeout
client = FulcrumClient(
    host="localhost:50051",
    timeout_ms=2000  # Increase from default 500ms
)

Authentication Issues

Error: "UNAUTHENTICATED"

# Check API key is correct
client = FulcrumClient(
    host="localhost:50051",
    api_key="your-actual-api-key"  # Verify this is correct
)

# Verify environment variable
import os
print(os.environ.get("FULCRUM_API_KEY"))

Policy Issues

Actions Always Blocked

# 1. Check if using FAIL_CLOSED with unavailable server
client = FulcrumClient(
    host="localhost:50051",
    on_failure="FAIL_OPEN"  # Try FAIL_OPEN for testing
)

# 2. Enable debug logging
import logging
logging.getLogger("fulcrum").setLevel(logging.DEBUG)

# 3. Check policy evaluation result
with client.envelope(workflow_id="debug") as env:
    result = env.guard("test_action", input_text="test")
    print(f"Guard result: {result}")

Actions Always Allowed

# Check if FAIL_OPEN is masking errors
import logging
logging.getLogger("fulcrum").setLevel(logging.DEBUG)

# Temporarily use FAIL_CLOSED to see real results
client = FulcrumClient(
    host="localhost:50051",
    on_failure="FAIL_CLOSED"
)

Event Logging Issues

Events Not Appearing

# 1. Ensure proper shutdown
client.shutdown()  # Flushes event queue

# 2. Check for middleware dropping events
# Your middleware might return None

# 3. Check payload serialization
env.log("test", {"valid": "json"})  # Works
env.log("test", {"invalid": object()})  # May fail silently

Debug Logging

import logging

# Enable detailed logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("fulcrum")
logger.setLevel(logging.DEBUG)

# This will show:
# - Connection attempts
# - Policy evaluation requests/responses
# - Event queue operations
# - Retry attempts

Health Check

def check_fulcrum_health(client: FulcrumClient) -> dict:
    """Check Fulcrum connectivity and basic operation."""
    health = {
        "connected": False,
        "policy_service": False,
        "event_service": False,
        "errors": []
    }

    try:
        # Test envelope creation
        with client.envelope(workflow_id="health-check") as env:
            health["connected"] = True

            # Test policy evaluation
            try:
                env.guard("health_check", input_text="test")
                health["policy_service"] = True
            except Exception as e:
                health["errors"].append(f"Policy: {e}")

            # Test event logging
            try:
                env.log("health_check", {"test": True})
                health["event_service"] = True
            except Exception as e:
                health["errors"].append(f"Events: {e}")

    except Exception as e:
        health["errors"].append(f"Connection: {e}")

    return health

# Usage
status = check_fulcrum_health(client)
print(f"Fulcrum Health: {status}")

Version History

Version Date Changes
0.1.0 2026-01-06 Initial release

Support

  • Documentation: https://docs.fulcrum.dev
  • Email: support@fulcrum.dev
  • GitHub Issues: https://github.com/fulcrum-io/fulcrum/issues

Document Version: 1.0 Last Updated: January 6, 2026 SDK Version: 0.1.0