Django, Django RestFramework 를 활용해 웹 어플리케이션을 개발하면서, 처리 시간이 오래 걸리는 API 를 개발해야하는 경우가 있습니다. 처리 시간이 지연되어 사용자의 대기 시간이 길어지면서 사용자 경험은 부정적으로 흘러갑니다. 이런 문제를 해결하기 위해 Celery를 Django app에 포함 시켰고, 사용 방법을 소개 합니다. 튜토리얼 방식으로 진행하며, Python 가상환경 설정 및 Django 설치 부터 차근차근 진행하겠습니다.
본 튜토리얼을 통해, 다음 내용을 얻으실 수 있습니다.
1. Celery를 어떤 경우에 사용하는지
2. Django app 에 Celery를 적용하고, Redis 를 Message Broker로 사용하기
3. Celery task_id 를 활용한 task 진행 상태, 결과 확인
4. Celery flower 를 활용한 monitoring
5. redis 에서 celery task 조회
Celery는 언제 사용하는가?
공식 문서에 따르면, Celery는 방대한 양의 message 를 처리하는 동시에 이러한 시스템을 유지 관리하는 데 필요한 도구를 제공하는 단순하고 유연하며 안정적인 분산 시스템입니다. application을 개발하면서 다음 두 가지 경우에 Celery를 사용했습니다.
1. Celery Worker : app과 독립적으로 실행할 수 있는 분산 프로세스로 시간이 오래 걸리는 작업을 분리합니다.
1) Data Preprocessing (전처리)
2) AI Model Train(학습)
3) AI Model Inference(추론)
4) Email 발송
2. Celery Beat : 주기적으로 동작해야하는 작업(Periodic task/job, Scheduling)
1) 주기적인 알림 메일 발송
2) 정책에 의해 백업/삭제 등 기능 실행
튜토리얼 목차
1. 개발 환경
2. python 가상환경, package 설치
3. redis 설치 (docker)
4. Django setting/실행
5. Celery 설정
6. Celery Worker example
7. Celery Beat exmaple
8. Celery Result
9. Monitoring : Celery Flower
10. Celery Test Code 작성(예정)
11. 맺음말
12. References
1. 개발 환경
OS : macOS Ventura
python : 3.10
Django : 4.1.4
Django RestFramework : 3.14.0
Celery : 5.2.7
django-celery-beat : 2.5.0
django-celery-results : 2.5.0
2. python 가상환경, package 설치
1) python virtualenv
2) Django, Redis, Celery python package 설치
- django와 django restframework 를 사용하여, RestfulAPI를 개발합니다.
- celery 로 worker를 구동하고, django-celery-beat는 scheduling에 사용합니다.
- django-celery-results 는 task 결과를 저장합니다.
3) Django project 생성
- project 이름을 config로 사용했으며, 원하시는 이름으로 변경하셔도 좋습니다.
- app 이름을 api로 사용했으며, 원하시는 이름으로 변경하셔도 좋습니다.
- 본 튜토리얼 전체에서 config 는 project 이름을 의미하며, api는 app 이름을 의미합니다.
4) Django project 구조
3. redis 설치 (docker)
Django app 과 Celery 사이의 Message Broker로 사용 될 Redis를 설치합니다. local 환경에서 Docker 로 띄웁니다.
4. Django setting/실행
django 에서 celery 를 실행하기 위해서 생성한 'django_celery_beat', 'django_celery_results' 를 app에 추가합니다. 최초 django project 생성시 만들었던 api app과 djangorestframework도 추가합니다.
그리고 Message Broker로 Redis를 사용하기 위해 CELERY_BROKER_URL 을 지정하고, Results를 저장하기 위해 CELERY_RESULT_BACKEND 를 설치한 redis url로 지정 합니다.
5. Celery 설정
Celery 설정을 위해 아래와 같이 프로젝트 파일을 수정하고, 추가 합니다.
config/celery.py
Celery 관련 설정을 Django에 하기 위해서 Celery library를 app을 지칭해 정의해야 합니다. (define as instance of the Celery library(called an "app"))
공식문서에 따르면, project_root/config{{project명}}/celery.py 에 Celery Instance를 정의하는 것을 권장합니다.
!!참고
"config.settings" 이 부분은 'celery' program 구동을 위한 Default Django settings module을 setting합니다.
저는 실제 프로젝트에서는 settings.py 를 base, dev, prod로 구분했습니다. 이름에서 알 수 있듯이, 소스코드가 동작하는 환경에 따라 settings를 분리 했는데요. - /config - /settings - base.py - dev.py - prod.py 그렇기 때문에 default setting을 위해 "config.settings.base" 경로로 선언한 것입니다. 다시 한 번 원리를 설명 드리면 "{project}.{celery setting이 위치한 파일}"
config/tasks.py
분산 비동기 task/job을 수행할 기능을 모아놓은 곳 입니다. celery task를 수행 시킬 기능을 이 곳에 작성하면 됩니다. @sharedtask 데코레이터를 추가해 Celery app이 데코레이터를 사용 할 수 있도록 합니다.
config/__init__.py
Django 가 시작될 때 app이 구동되도록 하여 @shared_task 데코레이터를 사용할 수 있도록 합니다.
6. Celery Worker example
Celery에 관한 기본 설정을 마치고 수행 할 task를 정의 했으니, 실제 Django App에서 API를 호출해 보겠습니다.
RestfulAPI 기능 추가를 위해 Proejct 에 아래와 같이 파일을 수정하고, 추가하시기 바랍니다.
api/views.py
view 에 task를 호출 하는 API를 추가합니다.
api/urls.py
api app에 urls.py 를 추가하고 아래와 같이 endpoint를 정의합니다.
config/urls.py
api app 내부에 생성한 urls.py 에 정의한 endpoint를 찾아갈 수 있도록 urlpatterns에 include 합니다.
Django App, Celery Worker 실행
2개의 터미널을 열고 각각 django app과 celery worker를 구동합니다.
API 호출
Endpoint: http://localhost:8000/api/test
API 호출 시 Celery 가 구동된 터미널의 console에 2+5의 결과인 7이 반환됨을 확인 할 수 있습니다.
7. Celery Beat exmaple
주기적으로 동작해야하는 작업을 task에 정의 하고 이를 celery beat가 scheduling 할 수 있습니다. 이를 위해 celery,py 와 tasks.py에 celery beat 관련 코드를 추가합니다. (tasks에 정의된 어떤 코드도 beat로 동작 시킬 수 있습니다.)
config/celery.py
api/tasks.py
Celery Beat 구동
새로운 터미널을 열어, 'celery -A config beat -l info' 명령어를 통해 Celery Beat를 구동합니다.
8. Celery Result
Celery Worker 를 사용한 다는 것은 대게 시간이 오래 걸리는 작업을 맡긴다는 것을 의미합니다. 그러다보면, worker에게 위임한 작업의 진행 상황을 알고 싶은 경우가 있습니다. 예를 들어, 모델 학습에 필요한 Image Data Preproecssing을 진행한다고 했을 때, 적게는 1시간에서 길게는 몇 일이 걸릴 수 있습니다. 이런 경우, Client에서 주기적으로 polling을 통해 상태를 확인 할 수 있도록 API를 개발하고 이를 호출 하여, Celery Task의 상태를 확인할 수 있도록 합니다.
앞서 만들었던 test API 를 task_id를 반환 하도록 수정하고, 이 task_id로부터 상태를 확인 할 수 있는 API를 개발 합니다.
config/settings.py
api/views.py
api/urls.py
API 실행
test API 를 호출해 "task_id" 를 받고, 받아온 "task_id" 를 tasks/<task_id> API 에 전달하여, 상태를 확인합니다.
9. Monitoring : Celery Flower
client에서 호출 하기 위해 task_id 를 받아오고 이를 활용해 status를 확인하는 방법도 있지만, 개발 환경에서 Debugging을 위해 모니터링이 필요한 경우도 있습니다. Flower를 활용해 celery task를 모니터링 하겠습니다.
사용법은 간단합니다. flower 를 설치하고 구동한 뒤 브라우저에서 접속하시면 됩니다. 아래 명령어를 실행 후, localhost:5566 으로 접속하시어, task를 확인하시기 바랍니다.
그 다음 http://localhost:5566/ 으로 접속하시면, 아래와 같이 Flower Dash 보드를 확인 하실 수 있습니다. 그리고 8번에서 확인했던 task_id와 상태를 모두 확인하실 수 있습니다.
10. Celery Test Code 작성(예정)
11. 맺음말
Celery Worker 를 활용해 AI 프로젝트에서 Data Preprocessing과 Model Train, Inference에 사용했고, Contract Lifecycle Management 프로젝트에서는 계약서 Inference 결과, 계약서 상태 변화에 따른 관련자 이메일 발송 등에 Celery Worker를, 주기적인 Remind 이메일 발송에 Celery Beat를 사용했습니다.
덕분에 프로젝트를 무사히 마칠 수 있었습니다. 다만, 아쉬웠던 점은 Celery 로 동작하는 기능에 대한 TestCode를 작성하지 못했다는 점 입니다. 해당 부분 보완하여, 사내 소스코드에도 반영하고 본 포스트에 업데이트 하도록 하겠습니다.
12. References
https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#using-celery-with-django
https://testdriven.io/blog/django-and-celery/
https://realpython.com/asynchronous-tasks-with-django-and-celery/