vipr_api.streaming package¶
Submodules¶
vipr_api.streaming.rabbitmq_consumer module¶
RabbitMQ Consumer for Streaming Predictions
Handles RabbitMQ message consumption for real-time streaming predictions. Each subscan message triggers an independent Celery task for VIPR inference.
- class vipr_api.streaming.rabbitmq_consumer.StreamingConsumerManager¶
Bases:
objectManager for multiple streaming consumers.
This class handles lifecycle management of multiple consumers and provides centralized control and monitoring.
- get_all_stats() Dict[str, Dict[str, Any]]¶
Get statistics for all active consumers.
- Returns:
Statistics for all consumers
- Return type:
- get_consumer_stats(consumer_id: str) Dict[str, Any] | None¶
Get statistics for a specific consumer.
- Parameters:
consumer_id – ID of consumer
- Returns:
Consumer statistics, or None if not found
- Return type:
- get_consumer_tasks(consumer_id: str, limit: int | None = None, since: str | None = None) List[Dict[str, Any]] | None¶
Get task list for a specific consumer.
- Parameters:
consumer_id – ID of consumer
limit – Maximum number of tasks to return
since – ISO timestamp to filter tasks
- Returns:
Task list, or None if consumer not found
- Return type:
- start_consumer(config_dict: Dict[str, Any], rabbitmq_url: str = 'amqp://localhost:5672/', exchange_name: str = 'vipr.experiments', routing_pattern: str = 'reflectometry.growth.#') str¶
Start a new streaming consumer.
- Parameters:
config_dict – VIPR configuration for predictions
rabbitmq_url – RabbitMQ connection URL
exchange_name – Exchange name
routing_pattern – Routing key pattern
- Returns:
Consumer ID for tracking
- Return type:
- class vipr_api.streaming.rabbitmq_consumer.VIPRStreamingConsumer(config_dict: Dict[str, Any], rabbitmq_url: str = 'amqp://localhost:5672/', exchange_name: str = 'vipr.experiments', routing_pattern: str = 'reflectometry.growth.#')¶
Bases:
objectRabbitMQ Consumer for VIPR streaming predictions.
This consumer connects to RabbitMQ, filters relevant messages, and triggers Celery tasks for each spectrum measurement.
- get_stats() Dict[str, Any]¶
Get current consumer statistics.
- Returns:
Consumer statistics and status
- Return type:
- get_task_list(limit: int | None = None, since: str | None = None) List[Dict[str, Any]]¶
Get list of triggered tasks with metadata.
- Parameters:
limit – Maximum number of tasks to return (most recent first)
since – ISO timestamp to filter tasks triggered after this time
- Returns:
List of task metadata dictionaries
- Return type:
Module contents¶
VIPR Real-time Streaming Module
This module provides mock RabbitMQ producers and consumers for simulating real-time spectral data acquisition similar to NICOS experimental setups.
Components: - MockSpectralProducer: Generates NICOS-like messages from HDF5 datasets - Message formats compatible with experimental data notifications - RabbitMQ integration for realistic streaming simulation