vipr_api.celery.src.tasks package

Submodules

vipr_api.celery.src.tasks.inference module

VIPR Inference Task

Celery task for running VIPR inference in background worker processes. Executing the VIPR Runner in the main thread of the worker process resolves the signal handler issue while keeping the FastAPI application non-blocking.

Integrated into vipr-framework for the 3-tier architecture.

(task)vipr_api.celery.src.tasks.inference.generate_standard_config(base_config: dict[str, Any]) → dict[str, Any]

Generate standard VIPR configuration from base parameters.

This task can be used to prepare configuration for inference tasks.

Parameters:
  • self – Celery task instance (bound task)

  • base_config – Base configuration parameters

Returns:

dict containing generated standard configuration

(task)vipr_api.celery.src.tasks.inference.run_vipr_inference(config_dict: dict[str, Any], result_id: str) → dict[str, Any]

Execute VIPR inference in a dedicated worker process.

This solves the signal handler problem because VIPR Runner runs in the main thread of the worker process, where signal handlers function properly.

Parameters:
  • self – Celery task instance (bound task)

  • config_dict – VIPR configuration dictionary

  • result_id – UUID for result storage

Returns:

dict containing task result status and result_id

Raises:

Exception – Any error during VIPR execution
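
The main-thread requirement mentioned above comes from Python itself: signal.signal() can only be called from the main thread of the main interpreter and raises ValueError elsewhere. The following minimal sketch uses only the standard library to illustrate that behaviour. It does not use VIPR or Celery, and the helper name try_install_handler is hypothetical.

```python
import signal
import threading


def try_install_handler(results: dict, key: str) -> None:
    """Try to register a SIGINT handler and record whether it worked.

    Hypothetical helper for illustration; not part of vipr_api.
    """
    try:
        previous = signal.signal(signal.SIGINT, signal.default_int_handler)
        # Restore the handler that was active before this probe, if any.
        if previous is not None:
            signal.signal(signal.SIGINT, previous)
        results[key] = "ok"
    except ValueError as exc:
        # Python refuses to install handlers outside the main thread.
        results[key] = f"ValueError: {exc}"


results: dict = {}

# Called from the thread that runs this script (normally the main thread).
try_install_handler(results, "main_thread")

# Called from a secondary thread: registration is rejected.
worker = threading.Thread(
    target=try_install_handler, args=(results, "worker_thread")
)
worker.start()
worker.join()

for name, outcome in results.items():
    print(f"{name}: {outcome}")
```

Code that installs signal handlers therefore fails when it is started from a secondary thread, which is the situation the task avoids by running the VIPR Runner in the worker process's main thread.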

vipr_api.celery.src.tasks.streaming module

Streaming Prediction Celery Tasks

Celery tasks for processing individual RabbitMQ messages in real-time streaming prediction scenarios. Each message triggers an independent VIPR inference task using realistic CSV file paths.

(task)vipr_api.celery.src.tasks.streaming.predict_stream_message(config_dict: Dict[str, Any], message_data: Dict[str, Any]) → Dict[str, Any]

Process a single RabbitMQ message for streaming prediction using CSV file paths.

This task extracts CSV file paths from RabbitMQ messages and uses the existing csv_spectrareader data loader for optimal performance and realistic simulation of live measurements.

Parameters:
  • self – Celery task instance (bound task)

  • config_dict – Base VIPR configuration dictionary

  • message_data – RabbitMQ message containing CSV file path reference

Returns:

dict containing task result status and metadata

Raises:

Exception – Any error during streaming prediction
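
As a rough illustration of the step described above (taking a CSV file path from a message and merging it into a copy of the base configuration), consider the sketch below. The message key csv_path, the configuration keys load_data, handler, parameters and data_path, and the helper name build_stream_config are all assumptions made for this example; the actual message schema and configuration layout are not documented here.

```python
import copy
from typing import Any, Dict


def build_stream_config(
    config_dict: Dict[str, Any], message_data: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a per-message copy of the base config pointing at the CSV file.

    Hypothetical helper: key names are assumptions, not the VIPR schema.
    """
    # "csv_path" is an assumed message key.
    csv_path = message_data.get("csv_path")
    if not csv_path:
        raise ValueError("message does not contain a CSV file path")

    # Deep-copy so concurrent tasks never mutate the shared base config.
    config = copy.deepcopy(config_dict)

    # Assumed location of the data loader settings in the config.
    load_data = config.setdefault("load_data", {})
    load_data["handler"] = "csv_spectrareader"
    load_data.setdefault("parameters", {})["data_path"] = csv_path
    return config


base = {"load_data": {"parameters": {"column": 1}}}
message = {"csv_path": "/data/measurements/scan_0001.csv"}
stream_config = build_stream_config(base, message)
print(stream_config["load_data"]["parameters"]["data_path"])
```

Copying the base configuration for each message keeps the tasks independent of one another, in line with the description of every message triggering its own inference task.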

(task)vipr_api.celery.src.tasks.streaming.process_streaming_batch(config_dict: Dict[str, Any], messages: List[Dict[str, Any]]) → Dict[str, Any]

Process a batch of streaming messages for batch inference scenarios.

This task is useful when multiple messages arrive in quick succession and can be processed together for efficiency.

Parameters:
  • self – Celery task instance (bound task)

  • config_dict – Base VIPR configuration dictionary

  • messages – List of RabbitMQ messages to process

Returns:

dict containing batch processing results
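
The structure of the returned batch results is not specified above. The sketch below shows one possible shape, assuming each message is handled in turn and failures are recorded per message instead of aborting the whole batch. The function summarize_batch, the process_one callable, and all result keys are hypothetical.

```python
from typing import Any, Callable, Dict, List


def summarize_batch(
    messages: List[Dict[str, Any]],
    process_one: Callable[[Dict[str, Any]], Any],
) -> Dict[str, Any]:
    """Process each message and collect a per-message outcome.

    Hypothetical helper: the real task's result format may differ.
    """
    results: List[Dict[str, Any]] = []
    for index, message in enumerate(messages):
        try:
            outcome = process_one(message)
            results.append({"index": index, "status": "success", "result": outcome})
        except Exception as exc:
            # Record the failure and continue with the remaining messages.
            results.append({"index": index, "status": "error", "error": str(exc)})

    succeeded = sum(1 for r in results if r["status"] == "success")
    return {
        "total": len(messages),
        "succeeded": succeeded,
        "failed": len(messages) - succeeded,
        "results": results,
    }


def fake_process(message: Dict[str, Any]) -> str:
    """Stand-in for a real inference call; fails when no path is given."""
    return message["csv_path"].upper()


summary = summarize_batch(
    [{"csv_path": "a.csv"}, {}, {"csv_path": "b.csv"}], fake_process
)
print(summary["total"], summary["succeeded"], summary["failed"])
```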

Module contents

VIPR Celery Tasks

Background tasks for VIPR framework processing.

(task)vipr_api.celery.src.tasks.generate_standard_config(base_config: dict[str, Any]) → dict[str, Any]

Generate standard VIPR configuration from base parameters.

This task can be used to prepare configuration for inference tasks.

Parameters:
  • self – Celery task instance (bound task)

  • base_config – Base configuration parameters

Returns:

dict containing generated standard configuration

(task)vipr_api.celery.src.tasks.predict_stream_message(config_dict: Dict[str, Any], message_data: Dict[str, Any]) → Dict[str, Any]

Process a single RabbitMQ message for streaming prediction using CSV file paths.

This task extracts CSV file paths from RabbitMQ messages and uses the existing csv_spectrareader data loader for optimal performance and realistic simulation of live measurements.

Parameters:
  • self – Celery task instance (bound task)

  • config_dict – Base VIPR configuration dictionary

  • message_data – RabbitMQ message containing CSV file path reference

Returns:

dict containing task result status and metadata

Raises:

Exception – Any error during streaming prediction

(task)vipr_api.celery.src.tasks.process_streaming_batch(config_dict: Dict[str, Any], messages: List[Dict[str, Any]]) → Dict[str, Any]

Process a batch of streaming messages for batch inference scenarios.

This task is useful when multiple messages arrive in quick succession and can be processed together for efficiency.

Parameters:
  • self – Celery task instance (bound task)

  • config_dict – Base VIPR configuration dictionary

  • messages – List of RabbitMQ messages to process

Returns:

dict containing batch processing results

(task)vipr_api.celery.src.tasks.run_vipr_inference(config_dict: dict[str, Any], result_id: str) → dict[str, Any]

Execute VIPR inference in a dedicated worker process.

This solves the signal handler problem because VIPR Runner runs in the main thread of the worker process, where signal handlers function properly.

Parameters:
  • self – Celery task instance (bound task)

  • config_dict – VIPR configuration dictionary

  • result_id – UUID for result storage

Returns:

dict containing task result status and result_id

Raises:

Exception – Any error during VIPR execution