vipr_api.celery.src.tasks package
Submodules
vipr_api.celery.src.tasks.inference module
VIPR Inference Task
Celery tasks for running VIPR inference in background worker processes. Python only allows signal handlers to be installed from the main thread, so executing the VIPR Runner in the main thread of a worker process resolves the signal handler issue while keeping the FastAPI application non-blocking.
These tasks are integrated into vipr-framework as part of its 3-tier architecture.
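The main-thread restriction mentioned above is standard Python behavior and can be reproduced with the standard library alone. The following is a minimal sketch, not VIPR code: the helper name `try_install_handler` is made up for illustration, and it simply re-registers the current SIGTERM handler from the main thread and then from a secondary thread.

```python
import signal
import threading


def try_install_handler() -> str:
    """Try to register a SIGTERM handler and report whether it worked."""
    # Re-register the handler that is already active so process state is unchanged.
    current = signal.getsignal(signal.SIGTERM)
    handler = current if current is not None else signal.SIG_DFL
    try:
        signal.signal(signal.SIGTERM, handler)
        return "ok"
    except ValueError as exc:
        # CPython raises ValueError when signal.signal is called outside the main thread.
        return f"failed: {exc}"


# Called from the main thread: registration succeeds.
main_result = try_install_handler()

# Called from a secondary thread: registration is rejected.
thread_results: list[str] = []
worker = threading.Thread(target=lambda: thread_results.append(try_install_handler()))
worker.start()
worker.join()

print(main_result)
print(thread_results[0])
```

Code that installs signal handlers therefore fails when a web framework dispatches it to a thread pool, but works when a worker process runs it in its own main thread.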
- (task)vipr_api.celery.src.tasks.inference.generate_standard_config(base_config: dict[str, Any]) → dict[str, Any]
Generate a standard VIPR configuration from base parameters.
This task can be used to prepare configuration for inference tasks.
- Parameters:
self – Celery task instance (bound task)
base_config – Base configuration parameters
- Returns:
dict containing generated standard configuration
- (task)vipr_api.celery.src.tasks.inference.run_vipr_inference(config_dict: dict[str, Any], result_id: str) → dict[str, Any]
Execute VIPR inference in a dedicated worker process.
This avoids the signal handler problem: the VIPR Runner runs in the main thread of the worker process, where signal handlers function properly.
- Parameters:
self – Celery task instance (bound task)
config_dict – VIPR configuration dictionary
result_id – UUID for result storage
- Returns:
dict containing task result status and result_id
- Raises:
Exception – Any error during VIPR execution
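The calling contract described above (the caller supplies a UUID, the task returns a dict with a status and the same `result_id`) can be sketched as follows. This is a stand-in under stated assumptions, not the real task body: `run_inference_sketch` is a hypothetical name, the `"status"` key and its `"completed"` value are assumed, and the actual VIPR Runner call is only indicated by a comment.

```python
import uuid
from typing import Any


def run_inference_sketch(config_dict: dict[str, Any], result_id: str) -> dict[str, Any]:
    """Stand-in for the task body; NOT the real VIPR implementation."""
    # Reject malformed identifiers early; uuid.UUID raises ValueError on bad input.
    uuid.UUID(result_id)
    if not config_dict:
        raise ValueError("config_dict must not be empty")
    # The real task would run the VIPR Runner here and store its output under result_id.
    # The "status" key and its value are assumed names for illustration only.
    return {"status": "completed", "result_id": result_id}


# Caller side: create the identifier first so the result can be looked up later.
result_id = str(uuid.uuid4())
outcome = run_inference_sketch({"model": "example"}, result_id)
print(outcome)
```

With a configured Celery application and broker, the real task would typically be submitted through Celery's standard `delay` or `apply_async` methods rather than called directly, so the web process returns immediately while the worker does the computation.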
vipr_api.celery.src.tasks.streaming module
Streaming Prediction Celery Tasks
Celery tasks for processing individual RabbitMQ messages in real-time streaming prediction scenarios. Each message triggers an independent VIPR inference task using realistic CSV file paths.
- (task)vipr_api.celery.src.tasks.streaming.predict_stream_message(config_dict: Dict[str, Any], message_data: Dict[str, Any]) → Dict[str, Any]
Process a single RabbitMQ message for streaming prediction using CSV file paths.
This task extracts the CSV file path from a RabbitMQ message and loads the data with the existing csv_spectrareader data loader, both for performance and to simulate live measurements realistically.
- Parameters:
self – Celery task instance (bound task)
config_dict – Base VIPR configuration dictionary
message_data – RabbitMQ message containing CSV file path reference
- Returns:
dict containing task result status and metadata
- Raises:
Exception – Any error during streaming prediction
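One way to picture the per-message step is a helper that copies the base configuration and points it at the CSV file named in the message. This is only a sketch: the message key `"csv_path"`, the config key `"data_path"`, and the function name `build_message_config` are hypothetical and are not taken from the VIPR source.

```python
import copy
from typing import Any


def build_message_config(
    config_dict: dict[str, Any], message_data: dict[str, Any]
) -> dict[str, Any]:
    """Return a per-message config that references the message's CSV file."""
    # "csv_path" is an assumed message key, used here for illustration only.
    csv_path = message_data.get("csv_path")
    if not csv_path:
        raise ValueError("message does not reference a CSV file")
    # Deep-copy so concurrent tasks never mutate the shared base configuration.
    config = copy.deepcopy(config_dict)
    # "data_path" is an assumed config key that a data loader would read.
    config["data_path"] = csv_path
    return config


base_config = {"loader": "csv_spectrareader", "options": {"delimiter": ","}}
message = {"csv_path": "/data/stream/measurement_001.csv"}

message_config = build_message_config(base_config, message)
print(message_config["data_path"])
```

Passing a file path instead of the measurement data keeps each message small, and copying the base configuration keeps the independent tasks from interfering with each other.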
- (task)vipr_api.celery.src.tasks.streaming.process_streaming_batch(config_dict: Dict[str, Any], messages: List[Dict[str, Any]]) → Dict[str, Any]
Process a batch of streaming messages for batch inference scenarios.
This task is useful when multiple messages arrive in quick succession and can be processed together for efficiency.
- Parameters:
self – Celery task instance (bound task)
config_dict – Base VIPR configuration dictionary
messages – List of RabbitMQ messages to process
- Returns:
dict containing batch processing results
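A batch task of this kind usually loops over the messages and collects one outcome per message. The sketch below assumes, without confirmation from the VIPR source, that a failing message should not abort the rest of the batch; the function names and all result keys (`"total"`, `"succeeded"`, `"failed"`, `"results"`) are hypothetical.

```python
from typing import Any, Callable

Handler = Callable[[dict[str, Any], dict[str, Any]], Any]


def process_batch_sketch(
    config_dict: dict[str, Any], messages: list[dict[str, Any]], handler: Handler
) -> dict[str, Any]:
    """Apply handler to every message and summarize the outcomes."""
    results: list[dict[str, Any]] = []
    for index, message in enumerate(messages):
        try:
            value = handler(config_dict, message)
            results.append({"index": index, "status": "completed", "result": value})
        except Exception as exc:
            # Assumed policy: record the failure and continue with the next message.
            results.append({"index": index, "status": "failed", "error": str(exc)})
    succeeded = sum(1 for item in results if item["status"] == "completed")
    return {
        "total": len(messages),
        "succeeded": succeeded,
        "failed": len(messages) - succeeded,
        "results": results,
    }


def echo_path(config: dict[str, Any], message: dict[str, Any]) -> str:
    """Toy handler: return the CSV path, raising KeyError if it is missing."""
    return message["csv_path"]


summary = process_batch_sketch(
    {"loader": "csv_spectrareader"},
    [{"csv_path": "a.csv"}, {}, {"csv_path": "c.csv"}],
    echo_path,
)
print(summary["succeeded"], summary["failed"])
```

Recording failures per message lets the caller retry only the messages that failed instead of resubmitting the whole batch.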
Module contents
VIPR Celery Tasks
Background tasks for VIPR framework processing.
- (task)vipr_api.celery.src.tasks.generate_standard_config(base_config: dict[str, Any]) → dict[str, Any]
Generate a standard VIPR configuration from base parameters.
This task can be used to prepare configuration for inference tasks.
- Parameters:
self – Celery task instance (bound task)
base_config – Base configuration parameters
- Returns:
dict containing generated standard configuration
- (task)vipr_api.celery.src.tasks.predict_stream_message(config_dict: Dict[str, Any], message_data: Dict[str, Any]) → Dict[str, Any]
Process a single RabbitMQ message for streaming prediction using CSV file paths.
This task extracts the CSV file path from a RabbitMQ message and loads the data with the existing csv_spectrareader data loader, both for performance and to simulate live measurements realistically.
- Parameters:
self – Celery task instance (bound task)
config_dict – Base VIPR configuration dictionary
message_data – RabbitMQ message containing CSV file path reference
- Returns:
dict containing task result status and metadata
- Raises:
Exception – Any error during streaming prediction
- (task)vipr_api.celery.src.tasks.process_streaming_batch(config_dict: Dict[str, Any], messages: List[Dict[str, Any]]) → Dict[str, Any]
Process a batch of streaming messages for batch inference scenarios.
This task is useful when multiple messages arrive in quick succession and can be processed together for efficiency.
- Parameters:
self – Celery task instance (bound task)
config_dict – Base VIPR configuration dictionary
messages – List of RabbitMQ messages to process
- Returns:
dict containing batch processing results
- (task)vipr_api.celery.src.tasks.run_vipr_inference(config_dict: dict[str, Any], result_id: str) → dict[str, Any]
Execute VIPR inference in a dedicated worker process.
This avoids the signal handler problem: the VIPR Runner runs in the main thread of the worker process, where signal handlers function properly.
- Parameters:
self – Celery task instance (bound task)
config_dict – VIPR configuration dictionary
result_id – UUID for result storage
- Returns:
dict containing task result status and result_id
- Raises:
Exception – Any error during VIPR execution