리셋 되지 말자

celery 분리 구성 [2] 본문

프로젝트

celery 분리 구성 [2]

kyeongjun-dev 2021. 12. 15. 12:43

celery server, celery worker 분리 이슈

django에서 celery를 설정하고, 실행하면 task가 어떻게 등록되는지 확인해보자. 

- csapi/tasks.py

from celery import shared_task

@shared_task
def test_task():
    print('connect test task')
    return 'test'

현재 csapi 앱의 tasks.py에 test_task가 shared_task로 등록되어 있다.

 

- csapi/views.py

from django.shortcuts import render, redirect
from .forms import UploadForm
import uuid
import boto3
import botocore
from .tasks import test_task

def upload(request):
    if request.method == 'POST':
        file = request.FILES['origin_image']
        file_name = str(uuid.uuid1())
        bucket = 'csapi-bucket'
        content_type = file.content_type.split('/')[1]
        key = 'images/origin-images/' + file_name + '.' + content_type
        s3 = boto3.client('s3')
        response = s3.upload_fileobj(file, bucket, key, ExtraArgs = {'ContentType': content_type})
        test_task.delay()
        return redirect('csapi:image_show', file_name + '.' + content_type)

    else:
        form = UploadForm()
    return render(request, 'csapi/upload.html', {
        'form' : form
    })

현재 aws의 s3로 file을 업로드하는 함수에 test_task.delay()를 호출하여 실행하도록 해놓았다.

 

- celery_app.py

ls
Dockerfile  __pycache__  celery_app.py  csapi-boto.py  requirements.txt  tasks.py



from celery import Celery

app = Celery('celery', backend='rpc://', broker='amqp://guest:guest@192.168.1.167:5672//')

간단하게 celery 스크립트를 설정하고 celery를 실행해본다.

 

- celery 실행

celery -A celery_app worker --loglevel=DEBUG
(생략)
[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap

실행하면 등록된 task 목록을 확인할 수 있다. 여기서 tasks.py에 등록된 task가 등록이 되지 않았다. 이와 관련해서는 아래에서 다룬다.

 

- django에서 task 실행

실행을 하면, unregistered task라는 메시지가 출력된다. 이는 broker를 통해 전달된 task가 celery에 등록되어 있지 않기 때문이다.

 

celery app에 task 등록

- tasks 확인

celery -A celery_app worker --loglevel=DEBUG
(생략)
[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap

다시 tasks를 살펴보면, tasks.py에 등록된 task가 등록되지 않았다. 이를 등록하기 위해 수정한다.

 

- task include

from celery import Celery

app = Celery('celery', backend='rpc://', broker='amqp://guest:guest@192.168.1.167:5672//', include=['tasks'])

celery 인스턴스를 등록할 때, include를 파라미터로 추가한다.

 

- task 목록

celery -A celery_app worker --loglevel=DEBUG
(생략)
[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . tasks.startAPI
  . tasks.test_task

tasks.py에 등록한 shared_task가 등록된 걸 확인할 수 있다.

 

task 등록 key

분명 동일한 이름의 task가 등록되어 있어도, unregistred task 에러가 발생한다.

 

- task Key 비교

# django의 task key
csapi.tasks.test_task

# 현재 celery의 task key
tasks.test_task

위처럼 등록된 task의 key가 다르다. 이를 맞춰줘야 한다.

 

- 디렉토리 구조 변경

root@ip-192-168-3-53:/celery# mkdir csapi
root@ip-192-168-3-53:/celery# mv celery_app.py tasks.py csapi/
root@ip-192-168-3-53:/celery# ls csapi
celery_app.py  tasks.py
root@ip-192-168-3-53:/celery# celery -A csapi.celery_app worker --loglevel=debug

위처럼 csapi 디렉토리를 생성하고, 위처럼 실행한다.

 

- celery_app.py

from celery import Celery

app = Celery('celery', backend='rpc://', broker='amqp://guest:guest@192.168.1.167:5672//', include=['csapi.tasks'])

celery worker를 실행하는 위치가 바뀌었으므로 include도 변경해준다. (실행위치 기준)

 

- celery worker 실행

celery -A csapi.celery_app worker --loglevel=debug
(생략)
[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . csapi.tasks.startAPI
  . csapi.tasks.test_task

등록된 task key가 변경된 것을 확인할 수 있다.

 

- django에서 celery task 수행

django에서 전달한 task가 celery에서 정상적으로 수행되는것을 확인할 수 있다. celery의 최종 디렉토리 구조는 아래와 같다.

 

root@ip-192-168-3-53:/celery# ls csapi/
celery_app.py  tasks.py

 

결과

디렉토리 구조와 worker 시작 커맨드가 수행되는 위치만 잘 감안하면 얼마든지 celery worker를 분리할 수 있다.

Comments