```
celery==5.3.6
Django==4.1.6
amqp==5.2.0
```
Here is my worker run script:
```
celery -A app.celery worker --loglevel=info -P gevent -c 500 -Q websockets --without-gossip -E --without-mingle --without-heartbeat --prefetch-multiplier=1
```
and the one Celery task:
```python
@async_task(
    acks_late=False,
    autoretry_for=(),
    ignore_result=True,
    expires=3600,
)
def task_from_websockets(message):
    logger.info('task_from_websockets task executed, message: %s', message)
    ...
```
These run fine, but more and more queues keep appearing in RabbitMQ. Why is this? It is causing high RAM usage.
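For anyone debugging the same thing, a quick way to see which queues are accumulating and what expiry they carry is the RabbitMQ management HTTP API. A minimal sketch, assuming the management plugin is enabled on its default port 15672 (the env var names reuse the ones from the settings below):

```python
import os

import requests

# List every queue with its message count and x-expires argument
# (assumes the RabbitMQ management plugin listens on port 15672).
resp = requests.get(
    f"http://{os.environ['RABBITMQ_HOST']}:15672/api/queues",
    auth=(os.environ['RABBITMQ_USER'], os.environ['RABBITMQ_PASS']),
)
resp.raise_for_status()
for q in resp.json():
    print(q['name'], q['messages'], q.get('arguments', {}).get('x-expires'))
```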
Related settings.py config:
```python
RABBITMQ_USER = os.environ['RABBITMQ_USER']
RABBITMQ_PASS = os.environ['RABBITMQ_PASS']
RABBITMQ_PORT = os.environ['RABBITMQ_PORT']
RABBITMQ_HOST = os.environ['RABBITMQ_HOST']
RABBITMQ_VHOST = os.environ['RABBITMQ_VHOST']

CELERY_BROKER_URL = (
    f'amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@'
    f'{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VHOST}'
)

LONGEST_POSSIBLE_TASK_DURATION = (
    3600 * 24 * 7
)  # 7 days, adjust for Syncs maybe.
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'visibility_timeout': LONGEST_POSSIBLE_TASK_DURATION
}
BROKER_URL = CELERY_BROKER_URL

CELERY_IGNORE_RESULT = False  # Must be False for chords to work.
CELERY_DEFAULT_ACKS_LATE = True
CELERY_TASK_ALLOW_ERROR_CB_ON_CHORD_HEADER = True

REDIS_DB_CELERY = 2
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = {'master_name': REDIS_MASTER_NAME}
CELERY_RESULT_BACKEND = ';'.join(
    [
        f'sentinel://{item[0]}:{item[1]}/{REDIS_DB_CELERY}'
        for item in REDIS_SENTINELS
    ]
)
CELERY_RESULT_PERSISTENT = True
CELERY_TASK_RESULT_EXPIRES = (
    CELERY_RESULT_EXPIRES
) = CELERY_AMQP_TASK_RESULT_EXPIRES = 18000  # 5 hrs.

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_WORKER_MAX_TASKS_PER_CHILD = 50
# CELERY_AMQP_TASK_RESULT_EXPIRES = 18000  # 5 hours.
CELERY_BROKER_HEARTBEAT = 0

# This setting will ensure if the worker is killed/lost that upon worker
# restart tasks are restarted
CELERY_TASK_REJECT_ON_WORKER_LOST = True
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True

CELERY_TASK_ALWAYS_EAGER = (
    os.environ.get('CELERY_TASK_ALWAYS_EAGER') or 'false'
).lower() == 'true'

QUEUE_CHANNEL_SYNCS = 'channel_syncs'
QUEUE_CHANNEL_TRANSCODING = 'transcoding'
QUEUE_CHANNEL_WEBSOCKETS = 'websockets'

CELERY_TASK_ROUTES = CELERY_ROUTES = {
    'sync': {'queue': QUEUE_CHANNEL_SYNCS},
    'task_from_websockets': {'queue': QUEUE_CHANNEL_WEBSOCKETS},
    'task_transcode': {'queue': QUEUE_CHANNEL_TRANSCODING},
    'task_channel_sync': {'queue': QUEUE_CHANNEL_SYNCS},
    'task_trigger_all_channel_syncs': {'queue': QUEUE_CHANNEL_SYNCS},
    'task_channel_syncs_on_error': {'queue': QUEUE_CHANNEL_SYNCS},
    'task_on_all_channel_syncs_completed': {'queue': QUEUE_CHANNEL_SYNCS},
    'task_notify_channel_sync_does_not_meet_min_requirements': {
        'queue': QUEUE_CHANNEL_SYNCS
    },
    'task_notify_channel_sync_has_started': {'queue': QUEUE_CHANNEL_SYNCS},
}
```
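With a mix of old-style (`CELERY_*`) and new-style setting names like this, it is worth double-checking what the app actually loaded, since unrecognized or shadowed keys are silently ignored. A minimal sketch, assuming the same app object the worker script points at (`app.celery`):

```python
from app.celery import app  # the same app the worker script points at

# Values Celery actually resolved, after old-/new-style name mapping.
print(app.conf.result_expires)      # expect 18000 (5 hrs)
print(app.conf.task_ignore_result)  # expect False
print(app.conf.task_routes)         # expect the routing table above
```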
The text was updated successfully, but these errors were encountered:
These queues have an expiry set on them that does not seem to match the task result expiry configured here:
```python
CELERY_TASK_RESULT_EXPIRES = (
    CELERY_RESULT_EXPIRES
) = CELERY_AMQP_TASK_RESULT_EXPIRES = 60 * 60 * 5  # 5 hrs
```
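One possible source of the apparent mismatch, assuming the expiry visible on the queues is the RabbitMQ `x-expires` argument: RabbitMQ expresses queue expiry in milliseconds, while Celery's `result_expires` is in seconds, so the raw numbers will never match directly. The conversion, spelled out:

```python
result_expires_s = 60 * 60 * 5               # 18000 s, the 5 hr setting above
result_expires_ms = result_expires_s * 1000  # what x-expires would show
print(result_expires_ms)                     # 18000000 ms
```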
It seems that the Celery setting `result_expires` is not being used to delete these queues automatically.
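As a stopgap while the root cause is unclear, stale queues can be dropped by hand over the existing broker connection. A hedged sketch, not an official Celery cleanup mechanism; `'some-stale-queue'` is a placeholder name, so verify a queue is really dead before deleting it in production:

```python
from app.celery import app  # the same app the worker script points at

with app.connection_or_acquire() as conn:
    # queue_delete is a plain AMQP operation; pass if_unused=True to make
    # it fail instead of deleting a queue that still has consumers.
    conn.default_channel.queue_delete('some-stale-queue')
```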