vipr_api.celery package

Submodules

vipr_api.celery.celery_app module

VIPR Framework Celery Application

Celery integration for vipr-framework that handles background VIPR inference tasks. Running VIPR in dedicated worker processes resolves the signal handler issue.

Based on the ms_daemon pattern, but integrated into vipr-framework for a simpler 3-tier architecture.

vipr_api.celery.celery_app.get_bool_env(key: str, default: bool = False) → bool

Convert an environment variable to a boolean using JSON parsing.
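
The JSON-based conversion can be sketched as follows. This is a minimal illustration, not the module's actual source: lowercasing the value and falling back to the default on unparsable input are assumptions made for the example.

```python
import json
import os


def get_bool_env(key: str, default: bool = False) -> bool:
    """Read an environment variable and interpret it as a JSON boolean."""
    raw = os.environ.get(key)
    if raw is None:
        # Variable is not set at all: use the caller's default.
        return default
    try:
        # JSON accepts lowercase "true"/"false"; numbers such as "1"/"0"
        # also parse and are coerced with bool().
        return bool(json.loads(raw.strip().lower()))
    except json.JSONDecodeError:
        # Assumption: values that are not valid JSON fall back to the default.
        return default
```

With this sketch, `"true"`, `"True"` and `"1"` evaluate to `True`, while `"false"` and `"0"` evaluate to `False`.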

vipr_api.celery.celery_app.get_broker_url() → str

Get broker URL from environment variables with Redis fallback.

vipr_api.celery.celery_app.get_redis_url(db: int = 0) → str

Build Redis URL from environment variables.

Parameters:

db – Redis database number (0 for broker, 1 for results)

Returns:

Complete Redis URL

Return type:

str
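
A URL builder of this kind might look like the sketch below. The environment variable names (`REDIS_HOST`, `REDIS_PORT`, `REDIS_PASSWORD`) and their defaults are hypothetical, chosen only for illustration; the real module may read different variables.

```python
import os
from urllib.parse import quote


def get_redis_url(db: int = 0) -> str:
    """Assemble a redis:// URL from (hypothetical) environment variables."""
    host = os.environ.get("REDIS_HOST", "localhost")
    port = os.environ.get("REDIS_PORT", "6379")
    password = os.environ.get("REDIS_PASSWORD", "")
    # Percent-encode the password so characters like "@" or "/" cannot
    # break the URL structure.
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"
```

Calling `get_redis_url(0)` would give the broker database and `get_redis_url(1)` the results database, matching the `db` parameter described above.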

vipr_api.celery.celery_app.get_result_backend() → str

Get result backend URL from environment variables with Redis fallback.
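
The "explicit URL first, Redis as fallback" behavior of the broker and result backend lookups could be sketched like this. The variable names `CELERY_BROKER_URL`, `CELERY_RESULT_BACKEND` and `REDIS_HOST`/`REDIS_PORT`, and the helper `_redis_url`, are assumptions for the example; only the database numbers (0 for broker, 1 for results) come from the documentation above.

```python
import os


def _redis_url(db: int) -> str:
    # Hypothetical stand-in for get_redis_url(); host and port variable
    # names are assumptions, not the module's confirmed configuration.
    host = os.environ.get("REDIS_HOST", "localhost")
    port = os.environ.get("REDIS_PORT", "6379")
    return f"redis://{host}:{port}/{db}"


def get_broker_url() -> str:
    # Prefer an explicitly configured broker; otherwise use Redis database 0.
    return os.environ.get("CELERY_BROKER_URL") or _redis_url(0)


def get_result_backend() -> str:
    # Prefer an explicitly configured backend; otherwise use Redis database 1.
    return os.environ.get("CELERY_RESULT_BACKEND") or _redis_url(1)
```

Using `or` rather than a default argument means an empty string in the environment is treated the same as an unset variable.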

vipr_api.celery.daemon_task module

VIPRTask - Base Task Class for VIPR Framework Celery Workers

Simplified version of ms_daemon’s DaemonTask, optimized for the VIPR use case:
  • No database integration (not needed for VIPR)

  • Optional Sentry integration (can be added later)

  • Structured logging with task lifecycle hooks

  • Error handling and cleanup

Integrated into vipr-framework for a 3-tier architecture.

class vipr_api.celery.daemon_task.VIPRTask

Bases: Task

Base task class for all VIPR framework background tasks.

Simplified version of ms_daemon’s DaemonTask:
  • No database connection management (not needed for VIPR)

  • No Sentry integration (optional for future)

  • Focused on VIPR Runner execution and logging

after_return(status: str, retval: Any, task_id: str, args: Tuple, kwargs: Dict, einfo: Any | None) → None

Handler called after the task returns.

Parameters:
  • status – Current task state

  • retval – Task return value/exception

  • task_id – Unique id of the task

  • args – Original arguments for the task

  • kwargs – Original keyword arguments for the task

  • einfo – Exception information

before_start(task_id: str, args: Tuple, kwargs: Dict) → None

Handler called before the task starts.

Parameters:
  • task_id – Unique id of the task to execute

  • args – Original arguments for the task to execute

  • kwargs – Original keyword arguments for the task to execute

ignore_result = False

If enabled, the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

on_failure(exc: Exception, task_id: str, args: Tuple, kwargs: Dict, einfo: Any) → None

Error handler called when the task fails.

Parameters:
  • exc – The exception raised by the task

  • task_id – Unique id of the failed task

  • args – Original arguments for the task that failed

  • kwargs – Original keyword arguments for the task that failed

  • einfo – Exception information

on_retry(exc: Exception, task_id: str, args: Tuple, kwargs: Dict, einfo: Any) → None

Retry handler called when the task is to be retried.

Parameters:
  • exc – The exception sent to retry

  • task_id – Unique id of the retried task

  • args – Original arguments for the retried task

  • kwargs – Original keyword arguments for the retried task

  • einfo – Exception information

on_success(retval: Any, task_id: str, args: Tuple, kwargs: Dict) → None

Success handler called when the task executes successfully.

Parameters:
  • retval – The return value of the task

  • task_id – Unique id of the executed task

  • args – Original arguments for the executed task

  • kwargs – Original keyword arguments for the executed task

priority = None

Default task priority.

rate_limit = None

Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (a hundred tasks a second), ‘100/m’ (a hundred tasks a minute), ‘100/h’ (a hundred tasks an hour).

reject_on_worker_lost = None

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc.).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>

Task request stack, the current request will be the topmost.

serializer = 'json'

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

store_errors_even_if_ignored = False

When enabled errors will be stored even if the task is otherwise configured to ignore results.

static stringify_kwargs(**kwargs) → str

Convert kwargs to a readable string for logging. Removes the ‘self’ reference for cleaner logs.

Parameters:

**kwargs – Keyword arguments to stringify

Returns:

Formatted string of key=value pairs

Return type:

str
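
A helper with this behavior might be written as below. The exact output format (comma-separated pairs, values rendered with `repr`) is an assumption made for the sketch; the source only states that the result is a string of key=value pairs with ‘self’ removed.

```python
def stringify_kwargs(**kwargs) -> str:
    """Format keyword arguments as 'key=value' pairs, skipping 'self'."""
    return ", ".join(
        f"{key}={value!r}"          # repr keeps strings visibly quoted in logs
        for key, value in kwargs.items()
        if key != "self"            # drop the task instance reference
    )
```

For example, passing `self=task, config="a.yaml", steps=3` would yield a string containing only the `config` and `steps` pairs.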

track_started = False

If enabled, the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default, as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful when there are long-running tasks and there’s a need to report which task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

Module contents

VIPR Daemon - Background Task Processing Service

A Celery-based microservice for executing VIPR inference tasks in dedicated worker processes. Follows the ms_daemon architecture pattern: a separate microservice for VIPR processing.

This resolves the signal handler issue where VIPR Runner requires main thread access while FastAPI needs non-blocking execution.