Why is Celery creating 50,000+ RabbitMQ queues? #8795

Closed
richardARPANET opened this issue Jan 16, 2024 · 2 comments
richardARPANET commented Jan 16, 2024

celery==5.3.6
Django==4.1.6
amqp==5.2.0

Here is my worker run script:

celery -A app.celery worker --loglevel=info -P gevent -c 500 -Q websockets --without-gossip -E --without-mingle --without-heartbeat --prefetch-multiplier=1

and the one Celery task:

@async_task(
    acks_late=False,
    autoretry_for=(),
    ignore_result=True,
    expires=3600,
)
def task_from_websockets(message):
    logger.info('task_from_websockets task executed, message: %s', message)
    ...

As these tasks run, more and more queues keep appearing on the RabbitMQ broker. Why is this? It is causing high RAM usage.

image
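
To narrow down where the queues come from, it may help to group them by name pattern and expiry. The following is a minimal sketch, assuming the RabbitMQ management plugin is enabled and reachable on port 15672. `summarize_queues` and `fetch_queues` are hypothetical helper names, and collapsing UUIDs in queue names into a placeholder is only an illustrative grouping rule, not something taken from Celery.

```python
import base64
import json
import re
import urllib.parse
import urllib.request
from collections import Counter

# Matches UUIDs so that per-client queue names collapse into one pattern.
UUID_RE = re.compile(
    r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
    re.IGNORECASE,
)


def summarize_queues(queues):
    """Count queues per (name pattern, x-expires) pair."""
    counts = Counter()
    for queue in queues:
        pattern = UUID_RE.sub('<uuid>', queue['name'])
        expires_ms = (queue.get('arguments') or {}).get('x-expires')
        counts[(pattern, expires_ms)] += 1
    return counts


def fetch_queues(host, user, password, vhost, port=15672):
    """Fetch queue metadata from the management HTTP API (assumed enabled)."""
    url = 'http://{}:{}/api/queues/{}'.format(
        host, port, urllib.parse.quote(vhost, safe='')
    )
    request = urllib.request.Request(url)
    token = base64.b64encode('{}:{}'.format(user, password).encode()).decode()
    request.add_header('Authorization', 'Basic ' + token)
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Calling `summarize_queues(fetch_queues(...)).most_common(10)` with the broker credentials would list the most frequent name patterns, which should show whether the extra queues are task queues or some other kind.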

Related settings.py config:

RABBITMQ_USER = os.environ['RABBITMQ_USER']
RABBITMQ_PASS = os.environ['RABBITMQ_PASS']
RABBITMQ_PORT = os.environ['RABBITMQ_PORT']
RABBITMQ_HOST = os.environ['RABBITMQ_HOST']
RABBITMQ_VHOST = os.environ['RABBITMQ_VHOST']
CELERY_BROKER_URL = (
    f'amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@'
    f'{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VHOST}'
)
LONGEST_POSSIBLE_TASK_DURATION = (
    3600 * 24 * 7
)  # 7 days, adjust for Syncs maybe.
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'visibility_timeout': LONGEST_POSSIBLE_TASK_DURATION
}
BROKER_URL = CELERY_BROKER_URL
CELERY_IGNORE_RESULT = False  # Must be False for chords to work.
CELERY_DEFAULT_ACKS_LATE = True
CELERY_TASK_ALLOW_ERROR_CB_ON_CHORD_HEADER = True

REDIS_DB_CELERY = 2
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = {'master_name': REDIS_MASTER_NAME}
CELERY_RESULT_BACKEND = ';'.join(
    [
        f'sentinel://{item[0]}:{item[1]}/{REDIS_DB_CELERY}'
        for item in REDIS_SENTINELS
    ]
)
CELERY_RESULT_PERSISTENT = True
CELERY_TASK_RESULT_EXPIRES = (
    CELERY_RESULT_EXPIRES
) = CELERY_AMQP_TASK_RESULT_EXPIRES = 18000  # 5 hrs.
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_WORKER_MAX_TASKS_PER_CHILD = 50
# CELERY_AMQP_TASK_RESULT_EXPIRES = 18000  # 5 hours.
CELERY_BROKER_HEARTBEAT = 0
# This setting will ensure if the worker is killed/lost that upon worker
# restart tasks are restarted
CELERY_TASK_REJECT_ON_WORKER_LOST = True
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True
CELERY_TASK_ALWAYS_EAGER = (
    os.environ.get('CELERY_TASK_ALWAYS_EAGER') or 'false'
).lower() == 'true'
QUEUE_CHANNEL_SYNCS = 'channel_syncs'
QUEUE_CHANNEL_TRANSCODING = 'transcoding'
QUEUE_CHANNEL_WEBSOCKETS = 'websockets'
CELERY_TASK_ROUTES = CELERY_ROUTES = {
    'sync': {'queue': QUEUE_CHANNEL_SYNCS},
    'task_from_websockets': {'queue': QUEUE_CHANNEL_WEBSOCKETS},
    'task_transcode': {'queue': QUEUE_CHANNEL_TRANSCODING},
    'task_channel_sync': {'queue': QUEUE_CHANNEL_SYNCS},
    'task_trigger_all_channel_syncs': {'queue': QUEUE_CHANNEL_SYNCS},
    'task_channel_syncs_on_error': {'queue': QUEUE_CHANNEL_SYNCS},
    'task_on_all_channel_syncs_completed': {'queue': QUEUE_CHANNEL_SYNCS},
    'task_notify_channel_sync_does_not_meet_min_requirements': {
        'queue': QUEUE_CHANNEL_SYNCS
    },
    'task_notify_channel_sync_has_started': {'queue': QUEUE_CHANNEL_SYNCS},
}

richardARPANET (Author) commented

These queues have this expiry set on them, which does not seem to match the configured task result expiry:

CELERY_TASK_RESULT_EXPIRES = (
    CELERY_RESULT_EXPIRES
) = CELERY_AMQP_TASK_RESULT_EXPIRES = 60*60*5  # 5 hrs

image
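
For comparing the two numbers: RabbitMQ reports a queue's `x-expires` argument in milliseconds, while the Celery expiry settings above are given in seconds. A small sketch of the conversion follows; `seconds_to_x_expires_ms` is a hypothetical helper, not part of Celery or RabbitMQ.

```python
def seconds_to_x_expires_ms(seconds):
    """Convert an expiry in seconds to the millisecond unit used by x-expires."""
    return int(seconds * 1000)


configured_seconds = 60 * 60 * 5  # the 5 hour value from the settings above
print(seconds_to_x_expires_ms(configured_seconds))  # 18000000
```

If the expiry shown on the queues differs from this value, the queues are presumably getting their expiry from somewhere other than the result expiry setting.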

richardARPANET (Author) commented

It seems that the Celery setting 'result_expires' is not being used to delete these queues automatically.