vipr_api.celery package¶
Subpackages¶
Submodules¶
vipr_api.celery.celery_app module¶
VIPR Framework Celery Application
Celery integration for vipr-framework to handle background VIPR inference tasks. This resolves the signal handler issue by running VIPR in dedicated worker processes.
Based on ms_daemon pattern but integrated into vipr-framework for simpler 3-tier architecture.
- vipr_api.celery.celery_app.get_bool_env(key: str, default: bool = False) bool¶
Convert environment variable to boolean using JSON parsing.
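The docstring describes JSON-based parsing of an environment variable into a boolean. A minimal sketch of what such a helper might look like (the actual implementation may differ; the error-handling fallback to the default is an assumption):

```python
import json
import os


def get_bool_env(key: str, default: bool = False) -> bool:
    """Parse an environment variable as a boolean via JSON.

    JSON parsing means "true"/"false" map to booleans and "1"/"0"
    to truthy/falsy integers. Unset or unparsable values fall back
    to the default (an assumed behavior, not confirmed by the docs).
    """
    raw = os.environ.get(key)
    if raw is None:
        return default
    try:
        return bool(json.loads(raw.strip().lower()))
    except ValueError:
        return default
```

Lowercasing the raw value lets "True"/"FALSE" parse as JSON booleans, which plain `json.loads` would reject.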
- vipr_api.celery.celery_app.get_broker_url() str¶
Get broker URL from environment variables with Redis fallback.
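A sketch of a broker-URL helper with a Redis fallback, as the docstring describes. The variable names `CELERY_BROKER_URL`, `REDIS_HOST`, and `REDIS_PORT` are assumptions for illustration; the real module may read different keys:

```python
import os


def get_broker_url() -> str:
    """Build the Celery broker URL from environment variables.

    Prefers an explicit CELERY_BROKER_URL; otherwise falls back to
    a Redis URL assembled from REDIS_HOST/REDIS_PORT (hypothetical
    variable names), defaulting to a local Redis instance.
    """
    explicit = os.environ.get("CELERY_BROKER_URL")
    if explicit:
        return explicit
    host = os.environ.get("REDIS_HOST", "localhost")
    port = os.environ.get("REDIS_PORT", "6379")
    return f"redis://{host}:{port}/0"
```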
vipr_api.celery.daemon_task module¶
VIPRTask - Base Task Class for VIPR Framework Celery Workers
Simplified version of ms_daemon’s DaemonTask, optimized for the VIPR use case:
- No database integration (not needed for VIPR)
- Optional Sentry integration (can be added later)
- Structured logging with task lifecycle hooks
- Error handling and cleanup
Integrated into vipr-framework for 3-tier architecture.
- class vipr_api.celery.daemon_task.VIPRTask¶
Bases: Task
Base task class for all VIPR framework background tasks.
Simplified version of ms_daemon’s DaemonTask:
- No database connection management (not needed for VIPR)
- No Sentry integration (optional for future)
- Focused on VIPR Runner execution and logging
- after_return(status: str, retval: Any, task_id: str, args: Tuple, kwargs: Dict, einfo: Any | None) None¶
Handler called after the task returns.
- Parameters:
status – Current task state
retval – Task return value/exception
task_id – Unique id of the task
args – Original arguments for the task
kwargs – Original keyword arguments for the task
einfo – Exception information
- before_start(task_id: str, args: Tuple, kwargs: Dict) None¶
Handler called before the task starts.
- Parameters:
task_id – Unique id of the task to execute
args – Original arguments for the task to execute
kwargs – Original keyword arguments for the task to execute
- ignore_result = False¶
If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.
- on_failure(exc: Exception, task_id: str, args: Tuple, kwargs: Dict, einfo: Any) None¶
Error handler called when the task fails.
- Parameters:
exc – The exception raised by the task
task_id – Unique id of the failed task
args – Original arguments for the task that failed
kwargs – Original keyword arguments for the task that failed
einfo – Exception information
- on_retry(exc: Exception, task_id: str, args: Tuple, kwargs: Dict, einfo: Any) None¶
Retry handler called when the task is to be retried.
- Parameters:
exc – The exception sent to retry
task_id – Unique id of the retried task
args – Original arguments for the retried task
kwargs – Original keyword arguments for the retried task
einfo – Exception information
- on_success(retval: Any, task_id: str, args: Tuple, kwargs: Dict) None¶
Success handler called when the task executes successfully.
- Parameters:
retval – The return value of the task
task_id – Unique id of the executed task
args – Original arguments for the executed task
kwargs – Original keyword arguments for the executed task
- priority = None¶
Default task priority.
- rate_limit = None¶
Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (a hundred tasks a second), ‘100/m’ (a hundred tasks a minute), ‘100/h’ (a hundred tasks an hour).
- reject_on_worker_lost = None¶
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc.). Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>¶
Task request stack, the current request will be the topmost.
- serializer = 'json'¶
The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False¶
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- static stringify_kwargs(**kwargs) str¶
Convert kwargs to readable string for logging. Removes ‘self’ reference for cleaner logs.
- Parameters:
**kwargs – Keyword arguments to stringify
- Returns:
Formatted string of key=value pairs
- Return type:
str
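Given the description above, stringify_kwargs might be sketched as follows. The `!r` formatting and sorted ordering are assumptions for readability; the real helper may format differently:

```python
def stringify_kwargs(**kwargs) -> str:
    """Render keyword arguments as 'key=value' pairs for logging.

    Drops any 'self' entry so bound-method kwargs log cleanly,
    as the docstring describes. Sorting keys (an assumption) makes
    the output deterministic.
    """
    kwargs.pop("self", None)
    return ", ".join(f"{k}={v!r}" for k, v in sorted(kwargs.items()))
```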
- track_started = False¶
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True¶
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
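The lifecycle hooks documented above (before_start, on_success, on_failure, after_return) are invoked by Celery at the corresponding points in a task's execution. A minimal sketch of how a subclass might implement structured logging in these hooks; the real VIPRTask subclasses celery's Task, which this stand-in class omits to stay self-contained, and the logger name is hypothetical:

```python
import logging
from typing import Any, Dict, Optional, Tuple

logger = logging.getLogger("vipr.tasks")  # assumed logger name


class TaskLifecycleSketch:
    """Stand-in showing the hook signatures; VIPRTask would define
    these same methods on a celery Task subclass."""

    def before_start(self, task_id: str, args: Tuple, kwargs: Dict) -> None:
        # Called before the task body runs.
        logger.info("task %s starting args=%s kwargs=%s", task_id, args, kwargs)

    def on_success(self, retval: Any, task_id: str, args: Tuple,
                   kwargs: Dict) -> None:
        # Called only when the task completes without error.
        logger.info("task %s succeeded: %r", task_id, retval)

    def on_failure(self, exc: Exception, task_id: str, args: Tuple,
                   kwargs: Dict, einfo: Any) -> None:
        # Called when the task raises; einfo carries traceback details.
        logger.error("task %s failed: %s", task_id, exc)

    def after_return(self, status: str, retval: Any, task_id: str,
                     args: Tuple, kwargs: Dict,
                     einfo: Optional[Any]) -> None:
        # Called after every run, success or failure; good for cleanup.
        logger.info("task %s returned with status %s", task_id, status)
```

after_return fires regardless of outcome, which makes it the natural place for per-task cleanup, while on_success/on_failure handle outcome-specific logging.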
Module contents¶
VIPR Daemon - Background Task Processing Service
A Celery-based microservice for executing VIPR inference tasks in dedicated worker processes. Follows ms_daemon architecture pattern - separate microservice for VIPR processing.
This resolves the signal handler issue where VIPR Runner requires main thread access while FastAPI needs non-blocking execution.