Web App Inference Flow

Overview

This document describes how machine learning inference works in the VIPR Web Application, covering the complete flow from user interaction in the frontend through backend processing to core execution.

Architecture Overview

VIPR Web Application uses a 3-tier architecture with asynchronous background processing:

┌─────────────────────────────────────────────────────┐
│  Tier 1: Presentation Layer                         │
│  Frontend (Nuxt.js/Vue)                             │
│  - Configuration UI                                 │
│  - Pinia State Management                           │
│  - Progress Tracking                                │
└──────────────┬──────────────────────────────────────┘
               │ REST API (JSON)
┌──────────────▼──────────────────────────────────────┐
│  Tier 2: Application Logic Layer                    │
│  Backend (FastAPI + Celery + Redis)                 │
│  ┌─────────────────────────────────────────────┐   │
│  │ FastAPI                                     │   │
│  │ - API Endpoints                             │   │
│  │ - Request Validation                        │   │
│  │ - Task Orchestration                        │   │
│  └─────────────┬───────────────────────────────┘   │
│                │ Message Queue                      │
│  ┌─────────────▼───────────────────────────────┐   │
│  │ Redis (Message Broker)                      │   │
│  └─────────────┬───────────────────────────────┘   │
│                │ Task Queue                         │
│  ┌─────────────▼───────────────────────────────┐   │
│  │ Celery Workers                              │   │
│  │ - Async Task Execution                      │   │
│  │ - Progress Reporting                        │   │
│  │ - Result Storage                            │   │
│  └─────────────────────────────────────────────┘   │
└──────────────┬──────────────────────────────────────┘
               │ Subprocess Execution
┌──────────────▼──────────────────────────────────────┐
│  Tier 3: Business Logic Layer                       │
│  ┌────────────────────────────────────────────┐    │
│  │ Core Framework (VIPR CLI)                  │    │
│  │ - Generic 6-Step Inference Workflow        │    │
│  │ - Plugin System                            │    │
│  │ - Hook/Filter Processing                   │    │
│  └────────────┬───────────────────────────────┘    │
│               │ Extended by Domain Plugins         │
│  ┌────────────▼───────────────────────────────┐    │
│  │ Domain Plugins (e.g., Reflectometry)       │    │
│  │ - Custom Handlers (data, model, predictor) │    │
│  │ - Domain-specific Hooks & Filters          │    │
│  │ - ML Models (Reflectorch, PANPE)           │    │
│  └────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────┘
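The three tiers interact through a simple submit/poll/fetch contract. As a minimal sketch of that contract (the endpoint paths come from this document; the injected `http` callable stands in for any HTTP client such as requests or httpx, so the sketch stays self-contained):

```python
import time

# Sketch of the tier-1 -> tier-2 request flow: submit config, poll progress,
# fetch stored results. `http(method, path, body=None)` is an injected stand-in
# for a real HTTP client; endpoint paths follow this document.
def run_inference_flow(http, config, poll_interval=1.0, max_polls=600):
    # 1. Submit the config; the API responds immediately with a task_id
    task_id = http("POST", "/api/inference/run", config)["task_id"]

    # 2. Poll the progress endpoint until the task reaches a terminal state
    for _ in range(max_polls):
        progress = http("GET", f"/api/inference/progress/{task_id}")
        if progress["state"] in ("SUCCESS", "FAILURE"):
            break
        time.sleep(poll_interval)

    # 3. Fetch the stored results by task_id (which doubles as the result_id)
    return http("GET", f"/api/ui/{task_id}")
```

Injecting the client also makes the flow easy to exercise against a fake transport in tests.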

Complete Inference Flow

Sequence Diagram

Web App Inference Workflow

Mermaid source:
sequenceDiagram
    participant U as User
    participant F as Frontend
    participant API as FastAPI Backend
    participant C as Celery Worker
    participant CLI as VIPR CLI
    participant R as Result Storage

    U->>F: 1. Configure & Click "Run Prediction"
    F->>F: 2. Prepare config (Pinia Store)
    F->>API: 3. POST /api/inference/run (JSON)
    API->>API: 4. Validate config (Security)
    API->>C: 5. Start Celery task
    C->>C: 6. Create temp YAML config
    C->>CLI: 7. Execute: vipr --config config.yaml inference run
    
    loop Progress Updates
        CLI->>C: 8a. Update progress (Celery state)
        F->>API: 8b. Poll: GET /api/inference/progress/{task_id}
        F->>F: 8c. Update progress UI
    end
    
    CLI->>R: 9. Store results (UUID)
    CLI-->>C: 10. Return success
    C-->>API: 11. Task complete
    F->>API: 12. GET /api/ui/{task_id}
    API->>R: 13. Fetch results
    R-->>API: 14. Return data
    API-->>F: 15. JSON response
    F->>U: 16. Display results

Detailed Steps

1. Configuration (Frontend)

Location: vipr-frontend/layers/base/app/stores/pipelineConfigurationStore.ts

The user configures the inference pipeline through the Web UI:

// Store structure
interface VIPRConfigWithStreaming {
  config_name: string;
  vipr: {
    inference: {
      hooks: {...},
      filters: {...},
      load_data: {...},
      load_model: {...},
      normalize: {...},
      preprocess: {...},
      prediction: {...},
      postprocess: {...}
    }
  };
  streaming?: {...}
}

Key Components:

  • ExportImportConfig.vue: Load/save configurations

  • PipelineConfigurationStore: Centralized state management

  • UI Components: Step-by-step configuration interface

2. Trigger Inference (Frontend)

Location: vipr-frontend/layers/base/app/stores/inferenceStore.ts

User clicks “Run Prediction” button:

async function runInference() {
  isLoading.value = true;
  try {
    // 1. Call async prediction endpoint
    const response = await $inferenceApi.runInferenceAsyncApiInferenceRunPost(
      pipelineConfigStore.standardConfig
    );

    const taskId = response.data?.task_id;

    // 2. Start progress tracking
    const { startPolling } = useProgressTracking(taskId);
    await startPolling();

    // 3. Fetch results when complete
    const resultResponse = await $uiApi.getUIResult(taskId);
    displayData.value = resultResponse.data.data;
  } finally {
    // Always reset the loading flag, even if a request fails
    isLoading.value = false;
  }
}

3. API Endpoint (Backend)

Location: vipr-api/services/vipr/api/web/routers/inference/tasks.py

Backend receives the inference request:

@router.post("/run")
async def run_inference_async(config: VIPRInference) -> dict:
    # 1. Security validation on the plain dict form of the config
    config_dict = config.model_dump()
    validation_result = validate_config_security(config_dict)
    
    # 2. Start Celery background task
    task = run_vipr_inference.delay(config_dict, None)
    
    return {
        "task_id": task.id,
        "status": "started",
        "message": "VIPR inference task started successfully"
    }

Key Features:

  • Security Validation: Blocks malicious configs

  • Async Execution: Non-blocking API response

  • Task ID: UUID for tracking and result retrieval
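The body of validate_config_security is not shown in this document. As a hedged illustration of what such a check might do, the sketch below walks the config and flags string values containing shell metacharacters or path traversal; the specific rules are assumptions, not the real implementation:

```python
# Illustrative sketch of a config security check (the real
# validate_config_security is not shown here; these rules are assumptions).
SUSPICIOUS = (";", "|", "&&", "$(", "`", "../")

def validate_config_security(config: dict) -> list[str]:
    """Walk the config and collect problems; an empty list means 'looks safe'."""
    problems = []

    def walk(value, path="vipr"):
        if isinstance(value, dict):
            for key, child in value.items():
                walk(child, f"{path}.{key}")
        elif isinstance(value, str):
            for token in SUSPICIOUS:
                if token in value:
                    problems.append(f"{path}: suspicious token {token!r}")

    walk(config)
    return problems
```

Returning a list of findings (rather than raising immediately) lets the endpoint report all problems in one response.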

4. Celery Task (Background Processing)

Location: vipr-api/celery_workers/src/tasks/inference.py

Celery worker executes the inference in a dedicated process:

@celery_app.task(bind=True, base=VIPRTask)
def run_vipr_inference(self, config_dict: dict, result_id: str):
    # Fall back to the Celery task id when no explicit result_id is given
    task_id = result_id or self.request.id
    
    # 1. Transform config (add result_id, consumer_id)
    config_dict = prepare_vipr_config(config_dict, task_id)
    
    # 2. Create temporary YAML config file
    temp_path = Path(config_dir) / f"config_{uuid.uuid4()}.yaml"
    with open(temp_path, 'w') as f:
        yaml.dump(config_dict, f)
    
    # 3. Update progress
    self.update_state(
        state=CeleryState.PROGRESS,
        meta={'message': 'Starting VIPR runner...'}
    )
    
    # 4. Execute VIPR CLI
    runner.run_controller("inference", "run", str(temp_path), celery_task_ref=self)
    
    return {
        'status': 'SUCCESS',
        'result_id': task_id
    }
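The internals of prepare_vipr_config are not shown above. A plausible sketch, under the assumption that it injects the result_id (and a streaming consumer id, when streaming is configured) into the dict that gets dumped to YAML:

```python
import copy

# Sketch of the config transform in step 1 (the real prepare_vipr_config is
# not shown in this document; injecting result_id under 'vipr' is inferred
# from the CLI YAML example, and consumer_id is an assumption).
def prepare_vipr_config(config_dict: dict, task_id: str) -> dict:
    out = copy.deepcopy(config_dict)             # never mutate the caller's dict
    out.setdefault("vipr", {})["result_id"] = task_id
    streaming = out.get("streaming")
    if streaming is not None:
        streaming["consumer_id"] = task_id       # tie the streaming consumer to this task
    return out
```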

Why Celery?

Celery workers run in dedicated processes, solving several critical issues:

  1. Process Isolation for Signal Handling

    • Problem: VIPR CLI (Cement framework) requires signal handlers (SIGINT, SIGTERM) for graceful shutdown and cleanup

    • Conflict: Uvicorn (FastAPI’s ASGI server) runs an asyncio event loop in the main thread, preventing proper signal handler registration by VIPR

    • Solution: Celery workers execute VIPR CLI in their own process with a dedicated main thread, allowing signal handlers to function correctly

  2. Non-blocking API

    • FastAPI returns immediately with a task_id

    • Long-running inference doesn’t block the API server

  3. Real-time Progress Updates

    • Workers report progress via Celery state mechanism

    • Frontend can poll for updates without blocking

  4. Horizontal Scalability

    • Multiple Celery workers can process tasks in parallel

    • Each worker handles inference in its own isolated process
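The progress endpoint that serves these updates is not shown in this document. With a live broker it would read the state and meta of a Celery AsyncResult; the sketch below isolates the response shaping as a pure function over those two values (the field names mirror the update_state meta used elsewhere in this document, but the exact response schema is an assumption):

```python
# Sketch of the progress endpoint's response shaping (the real endpoint is
# not shown here). In production, `state` and `info` would come from
# celery.result.AsyncResult(task_id).state / .info.
def shape_progress(state: str, info) -> dict:
    if state == "PROGRESS":
        meta = info or {}
        return {
            "state": state,
            "message": meta.get("message", ""),
            "current_item": meta.get("current_item"),
            "total_items": meta.get("total_items"),
        }
    if state == "FAILURE":
        # On failure, Celery stores the exception in .info
        return {"state": state, "message": str(info)}
    return {"state": state, "message": ""}  # PENDING / STARTED / SUCCESS
```

Keeping the shaping pure makes the endpoint trivially testable without a running broker.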

5. VIPR CLI Execution (Core)

Location: vipr-core/vipr/plugins/inference/inference.py

The CLI executes the 6-step inference workflow:

class Inference(BaseInference):
    def run(self):
        # Global start hooks
        self._execute_start_hooks()
        
        # 1. Load Data
        self.load_data_step.run()
        self.original_data = self.app.inference.data
        
        # 2. Load Model
        self.load_model_step.run()
        self.model = self.app.inference.model
        
        # 3. Normalize (via filters)
        self.normalize_step.run()
        
        # 4. Preprocess (via filters)
        self.preprocess_step.run()
        self.preprocessed_data = self.app.inference.data
        
        # 5. Predict
        self.prediction_step.run()
        self.result = self.app.inference.result
        
        # 6. Postprocess
        self.postprocess_step.run()
        
        # Global complete hooks
        self._execute_complete_hooks()
        
        return self.result

Progress Updates: Each step can report progress back to Celery using the celery_task_ref:

if hasattr(self.app, 'celery_task_ref') and self.app.celery_task_ref:
    self.app.celery_task_ref.update_state(
        state=CeleryState.PROGRESS,
        meta={
            'current_item': i + 1,
            'total_items': total,
            'message': f'Processing item {i+1}/{total}'
        }
    )
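Stripped of the Cement framework plumbing, the step/hook pattern above reduces to a small, ordered pipeline: global start hooks fire, each step's registered functions transform shared state in sequence, then global complete hooks fire. A toy sketch (not the real implementation) of that pattern:

```python
# Toy sketch of the 6-step hook/filter pattern (not the real Cement-based
# implementation): each step runs its registered functions over shared state,
# with global hooks at start and completion.
class MiniInference:
    STEP_NAMES = ("load_data", "load_model", "normalize",
                  "preprocess", "prediction", "postprocess")

    def __init__(self):
        self.steps = {name: [] for name in self.STEP_NAMES}
        self.hooks = {"start": [], "complete": []}
        self.data = None

    def register(self, step, fn):
        self.steps[step].append(fn)

    def run(self):
        for hook in self.hooks["start"]:
            hook(self)
        for name in self.STEP_NAMES:            # steps run in fixed order
            for fn in self.steps[name]:
                self.data = fn(self.data)       # each function transforms state
        for hook in self.hooks["complete"]:
            hook(self)
        return self.data
```

Registering nothing for a step simply makes it a no-op, which is how a generic workflow stays usable before any domain plugin extends it.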

6. Result Storage (Backend)

Location: vipr-core/vipr/plugins/api/__init__.py

Results are automatically stored at the end of inference via a hook:

# Automatic hook after inference completes
@app.hook.register('INFERENCE_COMPLETE_HOOK')
def store_ui_data(app):
    result_id = app.config.get('vipr', 'result_id')
    
    # DataCollector automatically saves all collected data
    app.datacollector.save_result(result_id)

What Gets Stored:

  • UIData Model: Tables, diagrams, images, logs collected during the 6-step workflow

  • Structured Format: Server-side storage with UUID-based retrieval

  • Separate Files: CSV exports of tables/diagrams, PNG/SVG images for easy viewing outside the web app

Retrieval:

# Frontend fetches via API endpoint
GET /api/ui/{result_id}

Why Server-Side Storage?

  • Large Results: Avoid sending large data in immediate API responses

  • Persistent Access: Results remain available after task completion
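The DataCollector internals are not shown in this document. A minimal sketch of UUID-keyed, server-side result storage of the kind described above (one JSON file per result_id; the directory layout is an illustrative assumption):

```python
import json
import uuid
from pathlib import Path

# Minimal sketch of UUID-keyed result storage (the real DataCollector is not
# shown in this document; the one-file-per-result layout is an assumption).
class ResultStore:
    def __init__(self, root):
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def save(self, ui_data: dict, result_id=None) -> str:
        # Generate a UUID when the caller did not supply a result_id
        result_id = result_id or str(uuid.uuid4())
        (self.root / f"{result_id}.json").write_text(json.dumps(ui_data))
        return result_id

    def load(self, result_id: str) -> dict:
        return json.loads((self.root / f"{result_id}.json").read_text())
```

Because results live on disk keyed by id, the GET /api/ui/{result_id} endpoint can serve them long after the Celery task has finished.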

7. Progress Tracking (Frontend)

Location: vipr-frontend/app/composables/useProgressTracking.ts

Frontend polls for progress updates:

export function useProgressTracking(taskId: string) {
  const progress = ref<TaskProgressResponse | null>(null);
  // Task is done once Celery reports a terminal state
  const isComplete = computed(() =>
    progress.value?.state === 'SUCCESS' || progress.value?.state === 'FAILURE'
  );
  
  async function startPolling() {
    const interval = setInterval(async () => {
      const response = await $inferenceApi.getTaskProgressApiInferenceProgressTaskIdGet(taskId);
      progress.value = response.data;
      
      if (isComplete.value) {
        clearInterval(interval);
      }
    }, 1000); // Poll every second
  }
  
  return { progress, isComplete, startPolling };
}

8. Result Visualization (Frontend)

Location: vipr-frontend/layers/base/app/stores/resultsStore.ts

Results are displayed using specialized components:

const displayData = ref<UIData[]>([]);

// Fetch and display results
const resultResponse = await $uiApi.getUIResult(taskId);
displayData.value = resultResponse.data.data;

Result Types:

  • Diagrams: Line plots, scatter plots (Plotly.js)

  • Tables: Parameter tables, statistics

  • Images: SLD profiles, visualizations

  • Logs: Processing logs, errors

Configuration Formats

Web App Format (JSON)

{
  "config_name": "PTCDI-C3_XRR",
  "vipr": {
    "inference": {
      "load_data": {
        "handler": "csv_spectrareader",
        "parameters": {
          "data_path": "@vipr_reflectometry/reflectorch/examples/data/PTCDI-C3.txt",
          "column_mapping": {"q": 0, "I": 1}
        }
      },
      "load_model": {
        "handler": "reflectorch",
        "parameters": {
          "config_name": "b_mc_point_xray_conv_standard_L2_InputQ"
        }
      },
      "prediction": {
        "handler": "reflectorch_predictor",
        "parameters": {
          "calc_pred_curve": true,
          "calc_pred_sld_profile": true
        }
      }
    }
  },
  "streaming": {
    "rabbitmq_config": {
      "rabbitmq_url": "amqp://localhost:5672/"
    }
  }
}

CLI Format (YAML)

vipr:
  inference:
    load_data:
      handler: csv_spectrareader
      parameters:
        data_path: '@vipr_reflectometry/reflectorch/examples/data/PTCDI-C3.txt'
        column_mapping:
          q: 0
          I: 1
    load_model:
      handler: reflectorch
      parameters:
        config_name: b_mc_point_xray_conv_standard_L2_InputQ
    prediction:
      handler: reflectorch_predictor
      parameters:
        calc_pred_curve: true
        calc_pred_sld_profile: true
  result_id: PTCDI-C3_XRR_a1b2c3d4
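Comparing the two formats above: the CLI YAML is the vipr subtree of the web JSON, with the web-only wrapper keys (config_name, and here streaming) dropped and a result_id injected under vipr. The mapping can be sketched as a plain dict transform; serializing it is then just yaml.dump, as in the Celery task (dropping streaming from the CLI config is an assumption based on the YAML example above):

```python
# Sketch of the web-JSON -> CLI-YAML mapping: keep only the 'vipr' subtree
# and inject result_id under it. Serialization would then be
# yaml.dump(cli_config, f), as in the Celery task.
def web_to_cli_config(web_config: dict, result_id: str) -> dict:
    cli = {"vipr": dict(web_config["vipr"])}   # drop config_name, streaming, etc.
    cli["vipr"]["result_id"] = result_id       # CLI format carries the result_id
    return cli
```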