vipr_api.streaming package

Submodules

vipr_api.streaming.rabbitmq_consumer module

RabbitMQ Consumer for Streaming Predictions

Handles RabbitMQ message consumption for real-time streaming predictions. Each subscan message triggers an independent Celery task for VIPR inference.

class vipr_api.streaming.rabbitmq_consumer.StreamingConsumerManager

Bases: object

Manager for multiple streaming consumers.

This class handles lifecycle management of multiple consumers and provides centralized control and monitoring.

get_all_stats() Dict[str, Dict[str, Any]]

Get statistics for all active consumers.

Returns:

Statistics for all consumers

Return type:

dict

get_consumer_stats(consumer_id: str) Dict[str, Any] | None

Get statistics for a specific consumer.

Parameters:

consumer_id – ID of consumer

Returns:

Consumer statistics, or None if not found

Return type:

dict

get_consumer_tasks(consumer_id: str, limit: int | None = None, since: str | None = None) List[Dict[str, Any]] | None

Get task list for a specific consumer.

Parameters:
  • consumer_id – ID of consumer

  • limit – Maximum number of tasks to return

  • since – ISO timestamp to filter tasks

Returns:

Task list, or None if consumer not found

Return type:

list

list_consumers() List[str]

List all active consumer IDs.

Returns:

Consumer IDs

Return type:

list

start_consumer(config_dict: Dict[str, Any], rabbitmq_url: str = 'amqp://localhost:5672/', exchange_name: str = 'vipr.experiments', routing_pattern: str = 'reflectometry.growth.#') str

Start a new streaming consumer.

Parameters:
  • config_dict – VIPR configuration for predictions

  • rabbitmq_url – RabbitMQ connection URL

  • exchange_name – Exchange name

  • routing_pattern – Routing key pattern

Returns:

Consumer ID for tracking

Return type:

str

stop_all_consumers() Dict[str, Dict[str, Any]]

Stop all active consumers.

Returns:

Final statistics for all consumers

Return type:

dict

stop_consumer(consumer_id: str) Dict[str, Any] | None

Stop a specific consumer.

Parameters:

consumer_id – ID of consumer to stop

Returns:

Final consumer statistics, or None if consumer not found

Return type:

dict

class vipr_api.streaming.rabbitmq_consumer.VIPRStreamingConsumer(config_dict: Dict[str, Any], rabbitmq_url: str = 'amqp://localhost:5672/', exchange_name: str = 'vipr.experiments', routing_pattern: str = 'reflectometry.growth.#')

Bases: object

RabbitMQ Consumer for VIPR streaming predictions.

This consumer connects to RabbitMQ, filters relevant messages, and triggers Celery tasks for each spectrum measurement.

get_stats() Dict[str, Any]

Get current consumer statistics.

Returns:

Consumer statistics and status

Return type:

dict

get_task_list(limit: int | None = None, since: str | None = None) List[Dict[str, Any]]

Get list of triggered tasks with metadata.

Parameters:
  • limit – Maximum number of tasks to return (most recent first)

  • since – ISO timestamp to filter tasks triggered after this time

Returns:

List of task metadata dictionaries

Return type:

list

start_consuming() str

Start consuming RabbitMQ messages in a separate thread.

Returns:

Consumer ID for tracking and management

Return type:

str

Raises:

Exception – If consumer startup fails

stop_consuming() Dict[str, Any]

Stop the RabbitMQ consumer and return final statistics.

Returns:

Final consumer statistics

Return type:

dict

Module contents

VIPR Real-time Streaming Module

This module provides mock RabbitMQ producers and consumers for simulating real-time spectral data acquisition similar to NICOS experimental setups.

Components: - MockSpectralProducer: Generates NICOS-like messages from HDF5 datasets - Message formats compatible with experimental data notifications - RabbitMQ integration for realistic streaming simulation