OpenSees Analysis on a FastAPI Python Server
Deep dive into building a production-ready OpenSees backend with process isolation, worker pools, and comprehensive material model implementations
Building a web backend for OpenSees analysis presents unique challenges that don't exist in typical web applications. OpenSees can crash unpredictably, consume excessive memory, and has complex convergence requirements. This post explores the production-ready architecture I developed to handle these challenges while maintaining high availability and user experience.
The Challenge
OpenSees (Open System for Earthquake Engineering Simulation) is a powerful but temperamental computational framework. When integrating it into a web application, several critical issues emerge:
- Process crashes: OpenSees can segfault or hang on invalid input
- Memory leaks: Long-running processes accumulate memory over time
- Convergence failures: Nonlinear analysis can fail to converge, requiring recovery strategies
- Resource consumption: Analysis can consume significant CPU and memory
- Thread safety: OpenSees is not designed for concurrent execution
The solution required process isolation, robust error handling, and careful resource management.
Architecture Overview
The backend architecture centers around three main components:
- FastAPI Server: Handles HTTP requests, routing, and API logic
- Worker Pool Manager: Manages isolated OpenSees processes with queueing and backpressure
- OpenSees Workers: Isolated processes that perform actual material analysis
# server/main.py - FastAPI application with worker pool lifecycle
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from services.opensees_worker_pool import initialize_worker_pool, shutdown_worker_pool

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage worker pool lifecycle"""
logger.info("Starting OpenSees worker pool...")
try:
# Initialize worker pool with single-core optimized settings
await initialize_worker_pool(
max_workers=1, # Single worker for $5 droplet
max_queue_size=2, # Small queue for backpressure
job_timeout=45.0 # 45 second timeout per job
)
logger.info("Worker pool initialized successfully")
yield
finally:
logger.info("Shutting down worker pool...")
        await shutdown_worker_pool()

app = FastAPI(lifespan=lifespan)
This architecture ensures that OpenSees crashes cannot affect the main API server, while providing controlled resource usage suitable for production deployment.
OpenSees Integration Deep Dive
Material Model Implementation
The system supports 15+ OpenSees material models, each with unique parameter requirements and validation rules. The material definition system needed to handle complex parameter interdependencies:
# server/services/opensees_worker.py
def _define_material(material_type: str, mat_tag: int, parameters: Dict[str, Union[int, float, str]]):
    """Define OpenSees uniaxial material with enhanced validation."""
    if material_type == "Steel01":
        Fy = float(parameters.get('Fy', 350.0))
        E0 = float(parameters.get('E0', 200000.0))
        b = float(parameters.get('b', 0.01))
        a1 = float(parameters.get('a1', 0.0))
        a2 = float(parameters.get('a2', 1.0))
        a3 = float(parameters.get('a3', 0.0))
        a4 = float(parameters.get('a4', 1.0))
        # Validate parameters
        if Fy <= 0 or E0 <= 0:
            raise ValueError(f"Steel01: Fy ({Fy}) and E0 ({E0}) must be positive")
        if abs(b) > 1:
            raise ValueError(f"Steel01: hardening ratio b ({b}) should be between -1 and 1")
        ops.uniaxialMaterial('Steel01', mat_tag, Fy, E0, b, a1, a2, a3, a4)
    elif material_type == "Hysteretic":
        # Enhanced Hysteretic validation with backbone curve checking
        # Positive envelope: three (stress, strain) points
        s1p = float(parameters.get('s1p', 350.0))
        e1p = float(parameters.get('e1p', 0.00175))
        s2p = float(parameters.get('s2p', 400.0))
        e2p = float(parameters.get('e2p', 0.02))
        # Handle parameter references (the string "s2p" means "reuse s2p")
        s3p_param = parameters.get('s3p', s2p)
        s3p = s2p if s3p_param == "s2p" else float(s3p_param)
        e3p = float(parameters.get('e3p', 0.1))
        # Negative envelope defaults to the mirrored positive envelope
        s1n = float(parameters.get('s1n', -s1p))
        e1n = float(parameters.get('e1n', -e1p))
        s2n = float(parameters.get('s2n', -s2p))
        e2n = float(parameters.get('e2n', -e2p))
        s3n = float(parameters.get('s3n', -s3p))
        e3n = float(parameters.get('e3n', -e3p))
        # Pinching and damage parameters
        pinchX = float(parameters.get('pinchX', 1.0))
        pinchY = float(parameters.get('pinchY', 1.0))
        damage1 = float(parameters.get('damage1', 0.0))
        damage2 = float(parameters.get('damage2', 0.0))
        beta = float(parameters.get('beta', 0.0))
        # Strict validation for backbone curves
        if not (0 < e1p < e2p <= e3p):
            raise ValueError(
                f"Hysteretic: Positive envelope strains must satisfy 0 < e1p({e1p}) < e2p({e2p}) <= e3p({e3p})"
            )
        ops.uniaxialMaterial('Hysteretic', mat_tag, s1p, e1p, s2p, e2p, s3p, e3p,
                             s1n, e1n, s2n, e2n, s3n, e3n, pinchX, pinchY, damage1, damage2, beta)
Each material implementation includes comprehensive parameter validation that catches engineering errors before they cause OpenSees crashes.
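The validation logic itself is plain Python and can be exercised without OpenSees in the loop. The sketch below is illustrative (the helper name and the strictly-increasing rule are my own; the actual code allows a flat final backbone segment, as in the `e2p <= e3p` check above):

```python
# Hypothetical helper: validate a positive-envelope backbone curve before
# handing the parameters to OpenSees. Each point is a (strain, stress) pair.
from typing import List, Tuple

def validate_backbone(points: List[Tuple[float, float]]) -> List[str]:
    """Return a list of human-readable validation errors (empty if valid)."""
    errors = []
    strains = [e for e, _ in points]
    stresses = [s for _, s in points]
    if any(e <= 0 for e in strains):
        errors.append("all envelope strains must be positive")
    if any(b <= a for a, b in zip(strains, strains[1:])):
        errors.append("envelope strains must be strictly increasing")
    if any(s <= 0 for s in stresses):
        errors.append("envelope stresses must be positive")
    return errors

# A well-formed backbone produces no errors; a reordered one is caught early
ok = validate_backbone([(0.00175, 350.0), (0.02, 400.0), (0.1, 400.0)])
bad = validate_backbone([(0.02, 350.0), (0.01, 400.0)])
```

Running checks like these before calling `ops.uniaxialMaterial` turns a potential segfault into a clear 4xx error for the client.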
Strain History Processing
Strain inputs need careful normalization and validation. Engineers might input percentages or decimal strains, and the system needs to handle both gracefully:
def _normalize_strain_array(strains: Sequence[float]) -> "np.ndarray":
"""Ensure strain history is finite, non-empty, and dimensionless (auto-convert from %)."""
arr = np.asarray(list(strains), dtype=float)
if arr.size == 0 or not np.all(np.isfinite(arr)):
raise ValueError("loading_protocol produced empty or non-finite strain history")
# auto-detect percent input: if any |strain| > 0.5, treat as %
if np.nanmax(np.abs(arr)) > 0.5:
arr = arr / 100.0
return arr
This automatic detection prevents common user errors while maintaining flexibility for different input formats.
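Restating the helper above as a self-contained, runnable demo makes the auto-detection threshold concrete: steel strains above 50% are physically implausible, so any value with magnitude over 0.5 is assumed to be a percentage.

```python
import numpy as np

def normalize_strain_array(strains):
    """Runnable restatement of the normalization helper above."""
    arr = np.asarray(list(strains), dtype=float)
    if arr.size == 0 or not np.all(np.isfinite(arr)):
        raise ValueError("empty or non-finite strain history")
    # If any |strain| > 0.5, the whole history is assumed to be in percent
    if np.nanmax(np.abs(arr)) > 0.5:
        arr = arr / 100.0
    return arr

percent_input = normalize_strain_array([1.0, -2.0, 1.5])   # treated as %, divided by 100
decimal_input = normalize_strain_array([0.25, -0.3])       # already dimensionless, unchanged
```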
Process Pool Implementation
The worker pool architecture is the heart of the system's robustness. It provides process isolation, resource limits, and automatic recovery:
# server/services/opensees_worker_pool.py
class OpenSeesWorkerPool:
"""
Manages OpenSees worker processes with strict resource limits and error handling.
Designed for single-core environments like $5 DigitalOcean droplets.
"""
def __init__(self,
max_workers: int = 1,
max_queue_size: int = 2,
job_timeout: float = 45.0,
worker_restart_delay: float = 1.0):
self.max_workers = max_workers
self.max_queue_size = max_queue_size
self.job_timeout = job_timeout
# Process pool with spawn context for better isolation
self._mp_context = mp.get_context('spawn')
# Monitoring statistics
self._active_jobs = 0
self._total_jobs = 0
self._failed_jobs = 0
self._timeout_jobs = 0
Queue Management and Backpressure
The system implements bounded queues with HTTP 429 responses when capacity is exceeded:
async def submit_analysis(self, material_type: str, parameters: Dict, strain_history: List[float]) -> Dict:
"""Submit a material analysis job with backpressure handling."""
# Check if we can acquire a semaphore slot (non-blocking)
try:
await asyncio.wait_for(self._semaphore.acquire(), timeout=0.001)
except asyncio.TimeoutError:
raise QueueFullError(f"Worker queue is full (max {self.max_queue_size} jobs)")
try:
self._active_jobs += 1
# Submit job to process pool
future = self._executor.submit(analyze_material_worker, material_type, parameters, strain_history)
            # Wait for completion with timeout; bridge the concurrent future into asyncio
            loop = asyncio.get_running_loop()
            result = await asyncio.wait_for(
                loop.run_in_executor(None, future.result),
                timeout=self.job_timeout
            )
return result
except asyncio.TimeoutError:
self._timeout_jobs += 1
future.cancel()
await self._maybe_restart_pool()
raise WorkerTimeoutError(f"Analysis timed out after {self.job_timeout} seconds")
finally:
self._active_jobs -= 1
self._semaphore.release()
This approach prevents the server from being overwhelmed while providing clear feedback to clients.
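The pattern is independent of OpenSees and worth isolating. The sketch below (class and method names are illustrative, not the production code) shows the core mechanic: a semaphore sized to the queue limit, acquired with a near-zero timeout so full queues fail fast instead of stacking up requests.

```python
import asyncio

class QueueFullError(Exception):
    pass

class BoundedSubmitter:
    """Reject new work when all slots are busy (illustrative sketch)."""
    def __init__(self, max_pending: int):
        self._semaphore = asyncio.Semaphore(max_pending)

    async def submit(self, fn):
        # Try to grab a slot almost immediately; fail fast instead of queueing
        try:
            await asyncio.wait_for(self._semaphore.acquire(), timeout=0.001)
        except asyncio.TimeoutError:
            raise QueueFullError("worker queue is full")
        try:
            return await fn()
        finally:
            self._semaphore.release()

async def main():
    submitter = BoundedSubmitter(max_pending=1)

    async def slow_job():
        await asyncio.sleep(0.05)
        return "done"

    first = asyncio.create_task(submitter.submit(slow_job))
    await asyncio.sleep(0.01)  # let the first job occupy the only slot
    try:
        await submitter.submit(slow_job)
        rejected = False
    except QueueFullError:
        rejected = True  # second submission is refused while the slot is held
    return await first, rejected

result, rejected = asyncio.run(main())
```

In the real server, `QueueFullError` is translated into an HTTP 429 at the API layer.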
Automatic Pool Restart
When workers crash or become unresponsive, the pool can restart automatically:
async def _restart_pool(self):
"""Restart the worker pool due to process corruption."""
logger.info("Restarting worker pool due to process corruption...")
# Force shutdown current pool
if self._executor:
try:
self._executor.shutdown(wait=False, cancel_futures=True)
except Exception as e:
logger.warning(f"Error during executor shutdown: {e}")
finally:
self._executor = None
# Wait for cleanup and process termination
await asyncio.sleep(max(self.worker_restart_delay, 2.0))
# Create fresh process pool
self._executor = ProcessPoolExecutor(
max_workers=self.max_workers,
mp_context=self._mp_context
)
logger.info("Worker pool successfully restarted")
This automatic recovery ensures high availability even when OpenSees processes crash.
OpenSees Analysis Implementation
The core analysis function runs in isolated worker processes and implements sophisticated convergence strategies:
def analyze_material_worker(material_type: str, parameters: Dict, strain_history: List[float]) -> Dict:
"""Worker function that runs OpenSees analysis in an isolated process."""
try:
# Clear any existing model
ops.wipe()
# Create 1D model
ops.model('basic', '-ndm', 1, '-ndf', 1)
# Define material and create test element
mat_tag = int(parameters.get('matTag', 1))
_define_material(material_type, mat_tag, parameters)
ops.node(1, 0.0)
ops.node(2, 0.0)
ops.fix(1, 1)
ops.element('zeroLength', 1, 1, 2, '-mat', mat_tag, '-dir', 1)
# Analysis setup with robust convergence strategy
ops.constraints('Plain')
ops.numberer('RCM')
ops.system('FullGeneral')
ops.test('NormUnbalance', 1e-6, 25, 0)
ops.algorithm('NewtonLineSearch', '-type', 'Bisection')
ops.integrator('LoadControl', 1.0)
ops.analysis('Static')
# Process strain history with adaptive step control
strains = _normalize_strain_array(strain_history)
hysteresis = []
for target_strain in strains:
# Adaptive stepping with convergence recovery
if not _apply_strain_increment(target_strain):
# Recovery strategies for convergence failure
if not _try_recovery_strategies():
break # Exit gracefully with partial data
# Record current state
strain = float(ops.nodeDisp(2, 1))
stress = _read_stress_from_reaction()
hysteresis.append({"strain": strain, "stress": stress})
return {
"hysteresis_data": hysteresis,
"success": True,
"material_type": material_type,
"num_points": len(hysteresis)
}
except Exception as e:
return {
"error": f"Worker analysis failed: {str(e)}",
"success": False,
"material_type": material_type
}
    finally:
        # Always clean up OpenSees state
        try:
            ops.wipe()
        except Exception:
            pass
Convergence Recovery Strategies
OpenSees nonlinear analysis can fail to converge, requiring sophisticated recovery strategies:
def try_recover() -> bool:
"""Recovery strategy for convergence issues"""
try:
# Try modified Newton with relaxed tolerance
ops.algorithm('ModifiedNewton')
ops.test('NormUnbalance', 1e-5, 100, 0)
if ops.analyze(1) == 0:
# Restore original settings if successful
ops.algorithm('NewtonLineSearch', '-type', 'Bisection')
ops.test('NormUnbalance', 1e-6, 25, 0)
return True
# Restore settings and indicate failure
ops.algorithm('NewtonLineSearch', '-type', 'Bisection')
ops.test('NormUnbalance', 1e-6, 25, 0)
return False
    except Exception:
        return False
This multi-stage recovery approach maximizes the chance of obtaining useful results even with challenging material models.
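Stripped of the OpenSees calls, the recovery logic is an escalation ladder: try progressively more forgiving solver settings until one converges. A generic sketch (the function and the `run_step` callable are hypothetical stand-ins for a single `ops.analyze(1)` attempt under given settings):

```python
from typing import Callable, List, Optional, Tuple

def solve_with_fallbacks(run_step: Callable[[str, float], bool],
                         strategies: List[Tuple[str, float]]) -> Optional[Tuple[str, float]]:
    """Try each (algorithm, tolerance) pair in order; return the first that converges."""
    for algorithm, tolerance in strategies:
        if run_step(algorithm, tolerance):
            return (algorithm, tolerance)
    return None  # caller exits gracefully with partial data

# Ladder from strict to relaxed settings, mirroring the recovery code above
strategies = [
    ("NewtonLineSearch", 1e-6),
    ("ModifiedNewton", 1e-5),
    ("ModifiedNewton", 1e-4),
]

# Fake step for illustration: only converges once the tolerance is relaxed
fake_step = lambda alg, tol: alg == "ModifiedNewton" and tol >= 1e-5
winner = solve_with_fallbacks(fake_step, strategies)
```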
Production Deployment Considerations
Single-Core Optimization
The system is optimized for deployment on budget hosting like DigitalOcean $5 droplets:
# Set environment variables for single-threaded BLAS operations
env_vars = {
'OMP_NUM_THREADS': '1',
'MKL_NUM_THREADS': '1',
'OPENBLAS_NUM_THREADS': '1',
'NUMBA_NUM_THREADS': '1',
'NUMPY_NUM_THREADS': '1'
}
for key, value in env_vars.items():
os.environ[key] = value
This prevents numpy/scipy from spawning multiple threads that would overwhelm a single-core environment.
Memory Management
Process isolation provides automatic memory cleanup, but the system also implements explicit limits:
# Guardrails for API calls
max_points = max(1, min(int(parameters.get('maxPoints', 20000)), 50000)) # Hard cap
max_dU = float(parameters.get('maxDispIncrement', 1e-4))
min_dU = float(parameters.get('minDispIncrement', 1e-8))
# Additional safety limits
if max_dU > 0.1: # 10% strain increment seems excessive
max_dU = 0.1
if min_dU < 1e-12: # Avoid numerical issues
min_dU = 1e-12
These limits prevent memory exhaustion from extremely large analysis requests.
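The guardrails above reduce to clamping user inputs into safe ranges. A small helper makes the intent explicit (the `clamp` function and the sample parameters are illustrative, not the production code):

```python
def clamp(value: float, lo: float, hi: float) -> float:
    """Clamp value into the closed interval [lo, hi]."""
    return max(lo, min(value, hi))

# Hypothetical request parameters that exceed the hard caps
params = {"maxPoints": 200000, "maxDispIncrement": 0.5}
max_points = int(clamp(float(params.get("maxPoints", 20000)), 1, 50000))
max_dU = clamp(float(params.get("maxDispIncrement", 1e-4)), 1e-12, 0.1)
```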
Health Monitoring
The system provides comprehensive health monitoring and statistics:
async def health_check(self) -> Dict[str, Any]:
"""Perform a health check on the worker pool"""
if self._executor is None:
return {"healthy": False, "reason": "Worker pool not started"}
# Check error rates
stats = self.get_stats()
if stats["total_jobs"] >= 5 and stats["success_rate"] < 0.3:
return {"healthy": False, "reason": f"Low success rate: {stats['success_rate']:.1%}"}
return {
"healthy": True,
"stats": {
"active_jobs": self._active_jobs,
"total_jobs": self._total_jobs,
"success_rate": (self._total_jobs - self._failed_jobs - self._timeout_jobs) / max(self._total_jobs, 1),
"max_workers": self.max_workers,
"job_timeout": self.job_timeout
}
}
This enables monitoring and alerting in production environments.
Error Handling Across Process Boundaries
Handling errors across process boundaries requires careful serialization and error classification:
# In the main process
try:
result = await worker_pool.submit_analysis(material_type, parameters, strain_history)
return result
except QueueFullError:
raise HTTPException(status_code=429, detail="Server busy - please try again later")
except WorkerTimeoutError as e:
raise HTTPException(status_code=408, detail=str(e))
except WorkerCrashError:
    raise HTTPException(status_code=503, detail="Analysis service temporarily unavailable")
except WorkerPoolError as e:
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
Different error types map to appropriate HTTP status codes, providing clear feedback to clients.
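The handler above assumes a small exception hierarchy on the pool side. A sketch of what that might look like (class names match the handler; the bodies, docstrings, and the framework-free status mapping are illustrative):

```python
class WorkerPoolError(Exception):
    """Base class for worker-pool failures."""

class QueueFullError(WorkerPoolError):
    """Raised when the bounded queue rejects a job (maps to HTTP 429)."""

class WorkerTimeoutError(WorkerPoolError):
    """Raised when a job exceeds its deadline (maps to HTTP 408)."""

class WorkerCrashError(WorkerPoolError):
    """Raised when the worker process dies mid-job (maps to HTTP 503)."""

# Mapping exact error types to status codes, framework-free for illustration
STATUS_BY_ERROR = {
    QueueFullError: 429,
    WorkerTimeoutError: 408,
    WorkerCrashError: 503,
    WorkerPoolError: 500,
}

def status_for(exc: Exception) -> int:
    for err_type, code in STATUS_BY_ERROR.items():
        if type(exc) is err_type:
            return code
    return 500
```

Deriving the specific errors from a common base also lets callers catch `WorkerPoolError` as a catch-all, as the final `except` clause above does.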
TypeScript Integration
The backend provides strongly-typed interfaces for the frontend through a TypeScript client:
// src/lib/opensees-client.ts
export interface MaterialAnalysisRequest {
material_type: string;
parameters: Record<string, number | string>;
strain_history: number[];
}
export interface HysteresisPoint {
  strain: number;
  stress: number;
}

export interface MaterialAnalysisResponse {
  material_type: string;
  parameters: Record<string, number | string>;
  hysteresis_data: HysteresisPoint[];
  analysis_summary?: Record<string, string | number>;
}
class OpenSeesClient {
async analyzeMaterial(request: MaterialAnalysisRequest): Promise<MaterialAnalysisResponse> {
return this.request<MaterialAnalysisResponse>('/api/materials/analyze', {
method: 'POST',
body: JSON.stringify(request),
});
}
}
This provides end-to-end type safety from frontend to backend.
Performance Optimization
Request Timeout Management
Different endpoints have different timeout requirements:
from fastapi.responses import JSONResponse

@app.middleware("http")
async def timeout_middleware(request: Request, call_next):
try:
# Set appropriate timeouts based on endpoint
if "/analyze" in str(request.url):
timeout_seconds = 60 # Analysis can take time
elif "/health" in str(request.url):
timeout_seconds = 5 # Health checks should be fast
else:
timeout_seconds = 15 # Other endpoints
return await asyncio.wait_for(call_next(request), timeout=timeout_seconds)
except asyncio.TimeoutError:
return JSONResponse(
status_code=408,
content={"detail": f"Request timeout after {timeout_seconds} seconds"}
)
This prevents long-running requests from blocking other operations.
Caching Strategy
While the current implementation doesn't include caching, the architecture supports adding Redis caching for repeated analyses:
# Future enhancement: cache analysis results
# Note: built-in hash() is salted per process, so derive a stable digest instead
payload = json.dumps(
    {"material": material_type, "parameters": parameters, "strains": strain_history},
    sort_keys=True,
)
cache_key = f"analysis:{hashlib.sha256(payload.encode()).hexdigest()}"
cached_result = await redis_client.get(cache_key)
if cached_result:
    return json.loads(cached_result)
Key Lessons Learned
Building this production backend taught several crucial lessons:
- Process Isolation is Essential: OpenSees crashes cannot be prevented, only contained
- Backpressure Prevents Cascading Failures: Bounded queues with 429 responses maintain system stability
- Automatic Recovery is Critical: Worker pools must restart automatically when corruption is detected
- Resource Limits are Non-negotiable: Hard caps prevent runaway processes from consuming all resources
- Comprehensive Error Handling: Different error types require different response strategies
- Single-Core Optimization Matters: BLAS threading must be controlled in constrained environments
Conclusion
Building a robust OpenSees backend required treating the computational engine as an untrusted, potentially dangerous dependency. The process pool architecture with comprehensive error handling, resource limits, and automatic recovery creates a production-ready system that can handle the unpredictable nature of complex structural analysis.
The key insight is that reliability comes not from preventing failures, but from isolating them and recovering gracefully. This architecture pattern applies to any system that must integrate unreliable or resource-intensive computational components.
The combination of process isolation, bounded resources, and comprehensive monitoring creates a backend that can serve engineers reliably while protecting the overall system from the inherent instability of complex numerical analysis. This approach enables bringing powerful computational tools to the web without sacrificing reliability or user experience.
Jeremy Atkinson
Jeremy is a structural engineer, researcher, and developer from BC. He works on Calcs.app and writes at Kinson.io