# Web App Inference Flow
## Overview
This document describes how machine learning inference works in the VIPR Web Application, covering the complete flow from user interaction in the frontend through backend processing to core execution.
## Architecture Overview
VIPR Web Application uses a **3-tier architecture** with **asynchronous background processing**:
```
┌──────────────────────────────────────────────────────┐
│ Tier 1: Presentation Layer                           │
│ Frontend (Nuxt.js/Vue)                               │
│ - Configuration UI                                   │
│ - Pinia State Management                             │
│ - Progress Tracking                                  │
└──────────────┬───────────────────────────────────────┘
               │ REST API (JSON)
┌──────────────▼───────────────────────────────────────┐
│ Tier 2: Application Logic Layer                      │
│ Backend (FastAPI + Celery + Redis)                   │
│  ┌──────────────────────────────────────────────┐    │
│  │ FastAPI                                      │    │
│  │ - API Endpoints                              │    │
│  │ - Request Validation                         │    │
│  │ - Task Orchestration                         │    │
│  └──────────────┬───────────────────────────────┘    │
│                 │ Message Queue                      │
│  ┌──────────────▼───────────────────────────────┐    │
│  │ Redis (Message Broker)                       │    │
│  └──────────────┬───────────────────────────────┘    │
│                 │ Task Queue                         │
│  ┌──────────────▼───────────────────────────────┐    │
│  │ Celery Workers                               │    │
│  │ - Async Task Execution                       │    │
│  │ - Progress Reporting                         │    │
│  │ - Result Storage                             │    │
│  └──────────────────────────────────────────────┘    │
└──────────────┬───────────────────────────────────────┘
               │ Subprocess Execution
┌──────────────▼───────────────────────────────────────┐
│ Tier 3: Business Logic Layer                         │
│  ┌──────────────────────────────────────────────┐    │
│  │ Core Framework (VIPR CLI)                    │    │
│  │ - Generic 5-Step Inference Workflow          │    │
│  │ - Plugin System                              │    │
│  │ - Hook/Filter Processing                     │    │
│  └──────────────┬───────────────────────────────┘    │
│                 │ Extended by Domain Plugins         │
│  ┌──────────────▼───────────────────────────────┐    │
│  │ Domain Plugins (e.g., Reflectometry)         │    │
│  │ - Custom Handlers (data, model, predictor)   │    │
│  │ - Domain-specific Hooks & Filters            │    │
│  │ - ML Models (Reflectorch, PANPE)             │    │
│  └──────────────────────────────────────────────┘    │
└──────────────────────────────────────────────────────┘
```
## Complete Inference Flow
### Sequence Diagram

```mermaid
sequenceDiagram
    participant U as User
    participant F as Frontend
    participant API as FastAPI Backend
    participant C as Celery Worker
    participant CLI as VIPR CLI
    participant R as Result Storage

    U->>F: 1. Configure & Click "Run Prediction"
    F->>F: 2. Prepare config (Pinia Store)
    F->>API: 3. POST /api/inference/run (JSON)
    API->>API: 4. Validate config (Security)
    API->>C: 5. Start Celery task
    C->>C: 6. Create temp YAML config
    C->>CLI: 7. Execute: vipr --config config.yaml inference run

    loop Progress Updates
        CLI->>C: 8a. Update progress (Celery state)
        F->>API: 8b. Poll: GET /api/inference/progress/{task_id}
        F->>F: 8c. Update progress UI
    end

    CLI->>R: 9. Store results (UUID)
    CLI-->>C: 10. Return success
    C-->>API: 11. Task complete
    F->>API: 12. GET /api/ui/result?id={task_id}
    API->>R: 13. Fetch results
    R-->>API: 14. Return data
    API-->>F: 15. JSON response
    F->>U: 16. Display results
```
## Detailed Steps
### 1. Configuration (Frontend)
**Location:** `vipr-frontend/layers/base/app/stores/pipelineConfigurationStore.ts`
The user configures the inference pipeline through the Web UI:
```typescript
// Store structure (vipr-frontend/layers/base/app/stores/pipelineConfigurationStore.ts)
const config = reactive({
  config_name: 'Default config',
  vipr: {
    inference: {
      hooks: {},
      filters: {},
      load_data: {},
      load_model: {},
      prediction: {},
      preprocess: {},
      postprocess: {}
    },
    config_name: 'Default config'
  },
  streaming: {
    rabbitmq_config: DEFAULT_STREAMING_CONFIG.rabbitmq_config
  }
});
```
**Key Components:**
- **ExportImportConfig.vue**: Load/save configurations
- **PipelineConfigurationStore**: Centralized state management
- **UI Components**: Step-by-step configuration interface
### 2. Trigger Inference (Frontend)
**Location:** `vipr-frontend/layers/base/app/stores/inferenceStore.ts`
User clicks "Run Prediction" button:
```typescript
async function runInference() {
  isLoading.value = true;

  // 1. Call async prediction endpoint
  const response = await $inferenceApi.runInferenceAsyncApiInferenceRunPost(
    pipelineConfigStore.standardConfig
  );
  const taskId = response.data?.task_id;

  // 2. Start progress tracking
  const { startPolling } = useProgressTracking(taskId);
  await startPolling();

  // 3. Fetch results when complete
  const resultResponse = await $uiApi.getUIResult(taskId);
  displayData.value = resultResponse.data.data;
}
```
### 3. API Endpoint (Backend)
**Location:** `vipr-api/vipr_api/web/routers/inference/tasks.py`
Backend receives the inference request:
```python
@router.post("/run")
async def run_inference_async(config: VIPRInference) -> dict[str, Any]:
    # 0. Check Celery backend availability (Redis or eager mode)
    can_execute, mode, error_msg = check_celery_backend()
    if not can_execute:
        raise HTTPException(status_code=503, detail=error_msg)

    # 1. Convert config to dict
    config_dict = config.model_dump(by_alias=True)

    # 2. Security validation — blocks malicious configs
    validation_result = validate_config_security(config_dict)
    if validation_result['errors']:
        raise HTTPException(status_code=400, detail=...)

    # 3. Start Celery background task
    task = run_vipr_inference.delay(config_dict, None)

    return {
        "task_id": task.id,
        "status": "started",
        "message": "VIPR inference task started successfully",
        "security_validated": True
    }
```
**Key Features:**
- **Backend Health Check**: Verifies Redis/Celery availability before accepting request
- **Security Validation**: Blocks malicious configs; warnings are logged but non-blocking
- **Async Execution**: Non-blocking API response
- **Task ID**: UUID for tracking and result retrieval
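The "errors block, warnings log" gate can be sketched in plain Python. The rule set and return shape below are illustrative stand-ins for the actual `validate_config_security`, whose internals are not shown in this document:

```python
# Illustrative sketch of the validation gate described above: blocking
# errors cause a 400, warnings are logged but do not stop the request.
# The specific rules shown here are hypothetical examples.
from typing import Any


def validate_config_security(config: dict[str, Any]) -> dict[str, list[str]]:
    """Return blocking 'errors' and non-blocking 'warnings' for a config dict."""
    errors: list[str] = []
    warnings: list[str] = []

    data_path = (
        config.get("vipr", {})
        .get("inference", {})
        .get("load_data", {})
        .get("parameters", {})
        .get("data_path", "")
    )
    # Example blocking rule: reject path traversal in user-supplied paths.
    if ".." in data_path:
        errors.append(f"Path traversal detected in data_path: {data_path!r}")
    # Example non-blocking rule: flag absolute paths for the log.
    elif data_path.startswith("/"):
        warnings.append(f"Absolute data_path will be used as-is: {data_path!r}")

    return {"errors": errors, "warnings": warnings}
```

In the endpoint, only a non-empty `errors` list aborts the request; `warnings` would be passed to the logger and the task still started.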
### 4. Celery Task (Background Processing)
**Location:** `vipr-api/vipr_api/celery/src/tasks/inference.py`
Celery worker executes the inference in a dedicated process:
```python
@celery_app.task(bind=True, base=VIPRTask)
def run_vipr_inference(self, config_dict: dict[str, Any], result_id: str | None) -> dict[str, Any]:
    # 1. Report initial progress
    self.update_state(state=CeleryState.PROGRESS,
                      meta={'message': 'Starting VIPR inference...'})

    # 2. Use task_id as unified result identifier
    task_id = self.request.id

    # 3. Transform config (inject result_id, consumer_id)
    config_dict = prepare_vipr_config(config_dict, task_id)

    # 4. Write temporary YAML config file (VIPR runner requires file-based config)
    config_name = config_dict.get('vipr', {}).get('config_name', 'unnamed')
    temp_path = Path(config_dir) / f"{config_name}_config_{task_id}.yaml"
    with open(temp_path, 'w') as f:
        yaml.dump(config_dict, f)

    # 5. Execute VIPR CLI in worker main thread (signal handlers work here)
    runner.run_controller("inference", "run", str(temp_path), celery_task_ref=self)

    return {
        'status': 'SUCCESS',
        'result_id': task_id,
        'message': 'VIPR inference completed successfully'
    }
```
**Why Celery?**
Celery workers run in dedicated processes, solving several critical issues:
1. **Process Isolation for Signal Handling**
- **Problem**: VIPR CLI (Cement framework) requires signal handlers (SIGINT, SIGTERM) for graceful shutdown and cleanup
- **Conflict**: Uvicorn (FastAPI's ASGI server) runs an asyncio event loop in the main thread, preventing proper signal handler registration by VIPR
- **Solution**: Celery workers execute VIPR CLI in their own process with a dedicated main thread, allowing signal handlers to function correctly
2. **Non-blocking API**
- FastAPI returns immediately with a task_id
- Long-running inference doesn't block the API server
3. **Real-time Progress Updates**
- Workers report progress via Celery state mechanism
- Frontend can poll for updates without blocking
4. **Horizontal Scalability**
- Multiple Celery workers can process tasks in parallel
- Each worker handles inference in its own isolated process
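The submit-then-poll protocol behind points 2 and 3 can be illustrated without Celery. In this sketch a background thread stands in for the worker and an in-memory dict stands in for the Redis result backend; everything here is a simplified stand-in, not the production code:

```python
# Minimal stand-in for the submit / poll protocol: a thread plays the
# Celery worker, a dict plays the Redis state backend.
import threading
import time
import uuid

task_states: dict[str, dict] = {}  # task_id -> {"state": ..., "meta": ...}


def worker(task_id: str, total: int) -> None:
    """Stand-in for the Celery worker reporting progress per item."""
    for i in range(total):
        task_states[task_id] = {
            "state": "PROGRESS",
            "meta": {"current_item": i + 1, "total_items": total},
        }
        time.sleep(0.01)  # stands in for one unit of inference work
    task_states[task_id] = {"state": "SUCCESS", "meta": {"result_id": task_id}}


def submit(total: int) -> str:
    """Like POST /api/inference/run: returns immediately with a task_id."""
    task_id = str(uuid.uuid4())
    task_states[task_id] = {"state": "PENDING", "meta": {}}
    threading.Thread(target=worker, args=(task_id, total), daemon=True).start()
    return task_id


task_id = submit(total=3)                            # returns at once
while task_states[task_id]["state"] != "SUCCESS":    # like the frontend poll loop
    time.sleep(0.01)
```

The API call returns before any work happens; progress and completion are observed purely by polling shared state keyed on the `task_id`, which is exactly how the frontend, FastAPI, and the worker stay decoupled.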
### 5. VIPR CLI Execution (Core)
**Location:** `vipr-core/vipr/plugins/inference/inference.py`
The CLI executes a 5-step inference workflow:
```python
class Inference(BaseInference):
    def run(self, **config_overrides):
        # Global start hooks
        self._execute_start_hooks()

        # 1. Load Data → returns DataSet
        self.original_data = self.load_data_step.run(**self._ovr(config_overrides, "load_data"))

        # 2. Load Model
        self.model = self.load_model_step.run(**self._ovr(config_overrides, "load_model"))

        # 3. Step: Preprocess data (DataSet -> DataSet)
        # Note: normalization is handled as a filter inside this step
        # (registered with a negative weight so it runs first)
        self.preprocessed_data = self.preprocess_step.run(
            self.original_data, **self._ovr(config_overrides, "preprocess")
        )

        # 4. Step: Perform prediction (DataSet -> results)
        predicted_data = self.prediction_step.run(
            self.preprocessed_data, **self._ovr(config_overrides, "prediction")
        )

        # 5. Postprocess
        self.result = self.postprocess_step.run(
            predicted_data, **self._ovr(config_overrides, "postprocess")
        )

        # Global completion hook
        self._execute_complete_hook()
        return self.result
```
**Progress Updates:**
Each step can report progress back to Celery using the `celery_task_ref`:
```python
if hasattr(self.app, 'celery_task_ref') and self.app.celery_task_ref:
    self.app.celery_task_ref.update_state(
        state=CeleryState.PROGRESS,
        meta={
            'current_item': i + 1,
            'total_items': total,
            'message': f'Processing item {i+1}/{total}'
        }
    )
```
### 6. Result Storage (Backend)
**Location:** `vipr-core/vipr/plugins/api/__init__.py`
Results are automatically stored at the end of inference via a hook registered during plugin load:
```python
def store_ui_data_directly(app, inference=None):
    # 1. Collect buffered log entries from VIPRLogHandler
    if hasattr(app.log, '_log_buffer'):
        app.datacollector.data.global_logs.extend(app.log._log_buffer)
        app.log._log_buffer.clear()

    # 2. Read result UUID from config
    result_id = app.config.get('vipr', 'result_id')

    # 3. Persist all collected UI data (tables, diagrams, images, logs)
    app.datacollector.save_result(result_id)


app.hook.register('INFERENCE_COMPLETE_HOOK', store_ui_data_directly, weight=200)
```
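The `weight` argument in the registration above controls execution order, which is also why the normalization filter in step 5 uses a negative weight to run first. A toy sketch of such a weight-ordered registry (class and method names here are illustrative, not the Cement/VIPR API):

```python
# Toy weight-ordered hook registry: lower weight runs earlier, so a
# negative weight runs first and a large weight (e.g. 200) runs late.
from collections import defaultdict
from typing import Callable


class HookRegistry:
    def __init__(self) -> None:
        self._hooks: dict[str, list[tuple[int, Callable]]] = defaultdict(list)

    def register(self, name: str, func: Callable, weight: int = 0) -> None:
        self._hooks[name].append((weight, func))

    def run(self, name: str, *args) -> list:
        # Sort by weight at call time, so registration order does not matter;
        # the stable sort keeps equal-weight hooks in registration order.
        return [f(*args) for _, f in sorted(self._hooks[name], key=lambda t: t[0])]


hooks = HookRegistry()
hooks.register("INFERENCE_COMPLETE_HOOK", lambda: "store_results", weight=200)
hooks.register("INFERENCE_COMPLETE_HOOK", lambda: "normalize", weight=-10)
assert hooks.run("INFERENCE_COMPLETE_HOOK") == ["normalize", "store_results"]
```

A high weight like 200 ensures result storage runs after all other completion hooks have finished mutating the collected UI data.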
**What Gets Stored:**
- **`data.pkl`**: Full UIData model (tables, diagrams, images, logs) serialised as pickle for UUID-based retrieval
- **`images/*.svg`**: SVG image exports
- **`tables/*.csv` / `tables/*.txt`**: CSV and plain-text exports of tables
- **`diagrams/*.csv`**: CSV exports of diagram series data; diagram statistics as `*_stats.txt`
- **`scripts/`**: Auto-generated standalone Plotly scripts for each diagram
**Retrieval:**
```
GET /api/ui/result?id={result_id}
```
**Design rationale:** Because inference runs asynchronously in a Celery worker, the result cannot be returned in the HTTP response — the worker and the API communicate only via Redis task metadata (`task_id`, state). Redis is intentionally kept free of large payloads. The worker therefore writes the result directly to the filesystem. Once the task reaches `SUCCESS`, the frontend retrieves the result by ID through a dedicated API endpoint, fully decoupled from the original request.
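This filesystem handoff can be sketched in a few lines, assuming a layout like the "What Gets Stored" list above (one directory per result UUID, pickled payload in `data.pkl`). The root path and helper names are illustrative:

```python
# Illustrative sketch: the worker writes the result keyed by UUID, the
# API process later loads it by the same ID. Layout/names are assumptions.
import pickle
import tempfile
from pathlib import Path

RESULTS_ROOT = Path(tempfile.mkdtemp())  # stands in for the shared results dir


def save_result(result_id: str, ui_data: dict) -> Path:
    """Worker side: persist collected UI data under <root>/<id>/data.pkl."""
    result_dir = RESULTS_ROOT / result_id
    result_dir.mkdir(parents=True, exist_ok=True)
    path = result_dir / "data.pkl"
    path.write_bytes(pickle.dumps(ui_data))
    return path


def load_result(result_id: str) -> dict:
    """API side: what serving GET /api/ui/result?id=<id> boils down to."""
    return pickle.loads((RESULTS_ROOT / result_id / "data.pkl").read_bytes())


save_result("a1b2c3d4", {"tables": [], "diagrams": [], "logs": ["done"]})
assert load_result("a1b2c3d4")["logs"] == ["done"]
```

Only the small `task_id` travels through Redis; the payload itself never does, so the broker stays lightweight regardless of result size.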
### 7. Progress Tracking (Frontend)
**Location:** `vipr-frontend/layers/base/app/composables/useProgressTracking.ts`
The composable uses a recursive `setTimeout` loop (not `setInterval`) so the next poll only starts after the current one finishes. The interval adapts: 100 ms on success, 1000 ms after a network error.
```typescript
export const useProgressTracking = (taskId: string) => {
  const progress = ref(null)
  const isComplete = ref(false)
  const error = ref(null)
  const isPolling = ref(false)

  const fetchProgress = async () => {
    const response = await $inferenceApi.getTaskProgressApiInferenceProgressTaskIdGet(taskId)
    const data = response.data
    if (data.status === CeleryState.Success) {
      progress.value = { ..., status: CeleryState.Success, percentage: 100 }
      isComplete.value = true
      stopPolling()
      return NORMAL_INTERVAL
    }
    // handle Progress / Failure / Pending states ...
    return NORMAL_INTERVAL // or ERROR_INTERVAL on catch
  }

  const poll = async () => {
    if (!isPolling.value) return
    const nextInterval = await fetchProgress()
    if (!isPolling.value) return
    timeoutId = setTimeout(poll, nextInterval)
  }

  const startPolling = () => { isPolling.value = true; poll() }
  const stopPolling = () => { clearTimeout(timeoutId); isPolling.value = false }

  return { progress, isComplete, error, isPolling, startPolling, stopPolling, cancelTask, reset }
}
```
Once `isComplete` flips to `true`, the Pinia `inferenceStore` (which owns the polling lifecycle) reacts via a Vue `watch` and fetches the actual result data:
```typescript
// vipr-frontend/layers/base/app/stores/inferenceStore.ts
unwatch = watch(isComplete, (complete) => {
  if (complete) {
    resolve() // breaks the await-Promise
  }
})

// After the promise resolves:
const resultResponse = await $uiApi.getUIResult(taskId)
if (resultResponse?.data?.data) {
  displayData.value = resultResponse.data.data
  resultsStore.resultId = taskId
  await resultsStore.loadResultConfig(taskId) // fetch the original YAML config
}
```
### 8. Result Visualization (Frontend)
**Location:** `vipr-frontend/layers/base/app/stores/resultsStore.ts`
After the task reaches `SUCCESS`, the result is loaded by ID into the Pinia results store:
```typescript
const displayData = ref(null)

async function loadHistoryResult(id: string) {
  const response = await $uiApi.getUIResult(id)
  if (response.data?.data) {
    displayData.value = response.data.data as UIData
    resultId.value = id
    await loadResultConfig(id) // also fetch the original YAML config
  }
}
```
**Result Types:**
- **Diagrams**: Interactive line/scatter plots rendered with Plotly.js
- **Tables**: Parameter tables and fit statistics
- **Images**: SVG exports (e.g. SLD profiles)
- **Logs**: Processing logs collected during inference
## Configuration Formats
### Web App Format (JSON)
```json
{
  "config_name": "PTCDI-C3_XRR",
  "vipr": {
    "inference": {
      "load_data": {
        "handler": "csv_spectrareader",
        "parameters": {
          "data_path": "@vipr_reflectometry/reflectorch/examples/data/PTCDI-C3.txt",
          "column_mapping": {"q": 0, "I": 1}
        }
      },
      "load_model": {
        "handler": "reflectorch",
        "parameters": {
          "config_name": "b_mc_point_xray_conv_standard_L2_InputQ"
        }
      },
      "prediction": {
        "handler": "reflectorch_predictor",
        "parameters": {
          "calc_pred_curve": true,
          "calc_pred_sld_profile": true
        }
      }
    }
  },
  "streaming": {
    "rabbitmq_config": {
      "rabbitmq_url": "amqp://localhost:5672/"
    }
  }
}
```
### CLI Format (YAML)
```yaml
vipr:
  inference:
    load_data:
      handler: csv_spectrareader
      parameters:
        data_path: '@vipr_reflectometry/reflectorch/examples/data/PTCDI-C3.txt'
        column_mapping:
          q: 0
          I: 1
    load_model:
      handler: reflectorch
      parameters:
        config_name: b_mc_point_xray_conv_standard_L2_InputQ
    prediction:
      handler: reflectorch_predictor
      parameters:
        calc_pred_curve: true
        calc_pred_sld_profile: true
  result_id: PTCDI-C3_XRR_a1b2c3d4
```
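Comparing the two formats suggests what the `prepare_vipr_config` step in the Celery task has to do: keep the `vipr` subtree, drop the web-only `streaming` block, and inject a `result_id` that the core later reads via `app.config.get('vipr', 'result_id')`. The sketch below only mirrors that visible difference; the real function and the exact `result_id` scheme may differ, and `prepare_cli_config` is an illustrative name:

```python
# Sketch of the web-JSON -> CLI-config reshaping implied by the two
# formats above. Hypothetical mirror of prepare_vipr_config, not the
# actual implementation; the worker then dumps this dict to YAML.
import copy


def prepare_cli_config(web_config: dict, task_id: str) -> dict:
    """Build the dict that is written to the temporary YAML config file."""
    cli = copy.deepcopy(web_config.get("vipr", {}))
    # Inject the result identifier read later by the core; the sample YAML
    # above suggests a "<config_name>_<id>" scheme.
    cli["result_id"] = f"{web_config.get('config_name', 'unnamed')}_{task_id}"
    return {"vipr": cli}  # the web-only "streaming" block is not forwarded


web = {
    "config_name": "PTCDI-C3_XRR",
    "vipr": {"inference": {"load_data": {"handler": "csv_spectrareader"}}},
    "streaming": {"rabbitmq_config": {}},
}
cli = prepare_cli_config(web, "a1b2c3d4")
assert cli["vipr"]["result_id"] == "PTCDI-C3_XRR_a1b2c3d4"
assert "streaming" not in cli
```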