Skip to content

KR_Celery

somaz edited this page Apr 22, 2025 · 4 revisions

Python Celery 개념 정리


1️⃣ Celery 기본 설정

Celery는 분산 작업 큐를 제공하는 비동기 작업 큐이다.

# tasks.py
from celery import Celery

# Celery 인스턴스 생성
app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

# 기본 태스크 정의
@app.task
def add(x: int, y: int) -> int:
    return x + y

# 태스크 설정
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Seoul',
    enable_utc=True,
)

특징:

  • Redis/RabbitMQ 브로커 지원
  • 비동기 작업 처리
  • 유연한 설정 관리
  • 분산 작업 처리
  • 결과 백엔드 저장


2️⃣ 태스크 정의와 실행

작업을 태스크로 정의하고 비동기적으로 실행하는 방법이다.

from celery import Task
from typing import Any, Dict

class CustomTask(Task):
    def on_success(self, retval: Any, task_id: str, args: tuple, kwargs: Dict) -> None:
        print(f"Task {task_id} completed successfully")
    
    def on_failure(self, exc: Exception, task_id: str, args: tuple, kwargs: Dict, einfo: Any) -> None:
        print(f"Task {task_id} failed: {exc}")

@app.task(base=CustomTask, bind=True)
def process_data(self, data: Dict) -> Dict:
    try:
        # 데이터 처리 로직
        result = {'processed': data['value'] * 2}
        return result
    except Exception as e:
        self.retry(exc=e, countdown=60)  # 1분 후 재시도

# 태스크 체이닝
from celery import chain

@app.task
def validate_data(data: Dict) -> Dict:
    if 'value' not in data:
        raise ValueError("Missing 'value' key")
    return data

# 태스크 체인 실행
task_chain = chain(
    validate_data.s({'value': 10}),
    process_data.s()
)
result = task_chain()

특징:

  • 커스텀 태스크 클래스
  • 태스크 체이닝
  • 에러 처리와 재시도
  • 태스크 상태 추적
  • 시그니처를 통한 인자 전달


3️⃣ 주기적 태스크 설정

일정 주기로 자동 실행되는 태스크를 설정하는 방법이다.

from celery.schedules import crontab

app.conf.beat_schedule = {
    'daily-cleanup': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour=0, minute=0),  # 매일 자정
    },
    'hourly-check': {
        'task': 'tasks.health_check',
        'schedule': 3600.0,  # 1시간마다
        'args': ('system',)
    }
}

@app.task
def cleanup():
    """일일 정리 작업"""
    # 정리 작업 수행
    pass

@app.task
def health_check(system: str):
    """시스템 상태 확인"""
    # 상태 확인 로직
    pass

특징:

  • Crontab 스케줄링
  • 주기적 작업 실행
  • 인자 전달 지원
  • 다양한 시간 단위 지원
  • 동적 스케줄 변경 가능


4️⃣ 에러 처리와 재시도

태스크 실행 중 발생하는 오류를 처리하고 재시도하는 메커니즘이다.

from celery.exceptions import MaxRetriesExceededError
from typing import Optional

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError,),
    retry_backoff=True
)
def process_with_retry(self, data: Dict) -> Optional[Dict]:
    try:
        # 처리 로직
        result = external_api_call(data)
        return result
    except ConnectionError as exc:
        # 자동 재시도
        raise self.retry(exc=exc)
    except Exception as exc:
        # 다른 예외 처리
        self.update_state(state='FAILURE', meta={'error': str(exc)})
        return None

class TaskManager:
    def __init__(self):
        self.tasks = {}
    
    def register_task(self, task_id: str, task):
        self.tasks[task_id] = task
    
    def revoke_task(self, task_id: str):
        if task_id in self.tasks:
            app.control.revoke(task_id, terminate=True)
            del self.tasks[task_id]

특징:

  • 자동 재시도 설정
  • 상세한 에러 처리
  • 태스크 상태 관리
  • 지수 백오프 지원
  • 태스크 취소 기능


5️⃣ 워커 설정과 모니터링

Celery 워커의 설정과 모니터링을 위한 방법이다.

from celery.signals import worker_ready, worker_shutting_down
from prometheus_client import Counter, Gauge

# 메트릭 정의
task_counter = Counter('celery_tasks_total', 'Total number of Celery tasks')
task_latency = Gauge('celery_task_latency_seconds', 'Task processing latency')

@worker_ready.connect
def worker_ready_handler(**kwargs):
    print("Celery worker is ready!")

@worker_shutting_down.connect
def worker_shutdown_handler(**kwargs):
    print("Celery worker is shutting down...")

class MonitoredTask(Task):
    def __call__(self, *args, **kwargs):
        task_counter.inc()
        with task_latency.time():
            return super().__call__(*args, **kwargs)

@app.task(base=MonitoredTask)
def monitored_task():
    # 작업 수행
    pass

특징:

  • 워커 생명주기 관리
  • 메트릭 수집
  • 모니터링 통합
  • 시그널 처리
  • 워커 컨트롤 명령


6️⃣ 실용적인 예제

Celery를 사용한 실제 애플리케이션 구현 예제를 살펴보자.

이메일 발송 시스템:

from celery import group
from typing import List, Dict
import smtplib
from email.mime.text import MIMEText

@app.task(rate_limit='100/m')  # 분당 100개 제한
def send_email(to_email: str, subject: str, body: str) -> bool:
    try:
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['To'] = to_email
        
        with smtplib.SMTP('smtp.gmail.com', 587) as server:
            server.starttls()
            server.login('your@email.com', 'password')
            server.send_message(msg)
        return True
    except Exception as e:
        print(f"Failed to send email: {e}")
        return False

def send_bulk_emails(email_list: List[Dict]):
    # 병렬 처리로 이메일 발송
    tasks = group(
        send_email.s(
            email['to'],
            email['subject'],
            email['body']
        ) for email in email_list
    )
    result = tasks.apply_async()
    return result

파일 처리 시스템:

import os
from PIL import Image
from typing import Tuple

@app.task
def process_image(image_path: str, size: Tuple[int, int]) -> str:
    try:
        with Image.open(image_path) as img:
            # 이미지 리사이징
            img.thumbnail(size)
            
            # 저장 경로 생성
            filename = os.path.basename(image_path)
            output_path = f"processed_{filename}"
            
            # 처리된 이미지 저장
            img.save(output_path)
            return output_path
    except Exception as e:
        print(f"Image processing failed: {e}")
        raise

class ImageProcessor:
    def __init__(self):
        self.processing_queue = []
    
    def add_image(self, image_path: str, size: Tuple[int, int]):
        task = process_image.delay(image_path, size)
        self.processing_queue.append(task)
    
    def get_results(self):
        return [task.get() for task in self.processing_queue if task.ready()]

데이터 처리 파이프라인:

from celery import chord
import pandas as pd
import numpy as np
from typing import List, Dict, Any

@app.task
def extract_data(source: str) -> pd.DataFrame:
    """데이터 소스에서 데이터 추출"""
    if source.endswith('.csv'):
        return pd.read_csv(source)
    elif source.endswith('.json'):
        return pd.read_json(source)
    else:
        raise ValueError(f"Unsupported source format: {source}")

@app.task
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """데이터 변환 및 전처리"""
    # 결측치 처리
    df = df.fillna(0)
    
    # 특성 엔지니어링
    if 'date' in df.columns:
        df['year'] = pd.to_datetime(df['date']).dt.year
        df['month'] = pd.to_datetime(df['date']).dt.month
    
    # 이상치 처리
    for col in df.select_dtypes(include=[np.number]).columns:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        df[col] = df[col].clip(q1 - 1.5 * iqr, q3 + 1.5 * iqr)
    
    return df

@app.task
def load_data(df: pd.DataFrame, destination: str) -> bool:
    """처리된 데이터 저장"""
    if destination.endswith('.csv'):
        df.to_csv(destination, index=False)
    elif destination.endswith('.json'):
        df.to_json(destination, orient='records')
    else:
        raise ValueError(f"Unsupported destination format: {destination}")
    return True

@app.task
def notify_completion(results: List[Any]) -> Dict:
    """모든 작업 완료 후 알림"""
    return {
        'status': 'completed',
        'processed_files': len(results),
        'success_count': sum(1 for r in results if r)
    }

def process_data_pipeline(sources: List[str], destination_prefix: str):
    """ETL 파이프라인 실행"""
    # 병렬 추출 및 변환 작업
    tasks = []
    for i, source in enumerate(sources):
        destination = f"{destination_prefix}_{i}.csv"
        
        # 개별 ETL 파이프라인
        pipeline = chain(
            extract_data.s(source),
            transform_data.s(),
            load_data.s(destination)
        )
        tasks.append(pipeline)
    
    # chord를 사용하여 모든 파이프라인 완료 후 알림
    workflow = chord(tasks)(notify_completion.s())
    return workflow

특징:

  • 대량 작업의 병렬 처리
  • 작업 속도 제한
  • 복잡한 워크플로우 구성
  • 이벤트 기반 처리
  • 자원 효율적 사용
  • 확장성 있는 설계

7️⃣ 고급 Celery 패턴

복잡한 분산 시스템을 위한 고급 Celery 패턴이다.

우선순위 큐 설정:

# 우선순위 큐 설정
app.conf.task_routes = {
    'tasks.high_priority': {'queue': 'high_priority'},
    'tasks.default_priority': {'queue': 'default'},
    'tasks.low_priority': {'queue': 'low_priority'},
}

@app.task(queue='high_priority')
def high_priority(data):
    """긴급 처리가 필요한 작업"""
    # 고우선순위 작업 처리
    return "High priority task completed"

@app.task(queue='default')
def default_priority(data):
    """일반 작업"""
    # 일반 작업 처리
    return "Default task completed"

@app.task(queue='low_priority')
def low_priority(data):
    """배경 작업"""
    # 저우선순위 작업 처리
    return "Low priority task completed"

# 워커 시작 명령어 예시:
# celery -A tasks worker -Q high_priority,default,low_priority -l info

워커 풀 최적화:

# celeryconfig.py
worker_concurrency = 8  # CPU 코어 수에 맞춤
worker_prefetch_multiplier = 1  # 작업당 하나씩만 가져오도록 설정
worker_max_tasks_per_child = 1000  # 메모리 누수 방지
task_time_limit = 3600  # 1시간 제한
task_soft_time_limit = 3000  # 소프트 제한 50분

# 작업 종류별 최적화
task_annotations = {
    'tasks.cpu_intensive': {'pool': 'solo'},  # CPU 작업은 별도 프로세스
    'tasks.io_intensive': {'pool': 'gevent', 'rate_limit': '100/m'},  # I/O 작업은 gevent
}

# 데드레터 큐 설정
task_reject_on_worker_lost = True
task_acks_late = True

캔버스 워크플로우:

from celery import group, chain, chord, signature

@app.task
def analysis_task(data_chunk):
    # 데이터 분석 작업
    return {'chunk_id': data_chunk['id'], 'result': len(data_chunk['data'])}

@app.task
def reduce_results(results):
    # 결과 집계
    total = sum(result['result'] for result in results)
    return {'total_count': total}

def map_reduce_workflow(data_chunks):
    """맵리듀스 패턴 구현"""
    # 맵 단계 (병렬 처리)
    map_tasks = group(analysis_task.s(chunk) for chunk in data_chunks)
    
    # 리듀스 단계 (결과 집계)
    workflow = chord(map_tasks)(reduce_results.s())
    
    return workflow

# 동적 워크플로우 생성
def create_dynamic_workflow(initial_data):
    """실행 시점에 동적으로 워크플로우 생성"""
    if initial_data['type'] == 'simple':
        return simple_task.s(initial_data)
    elif initial_data['type'] == 'complex':
        # 복잡한 체인 생성
        tasks = [initial_task.s(initial_data)]
        
        # 조건에 따라 다른 태스크 추가
        if initial_data.get('needs_validation'):
            tasks.append(validation_task.s())
        
        if initial_data.get('needs_processing'):
            tasks.append(processing_task.s())
        
        # 결과 형식 지정
        tasks.append(format_result.s())
        
        # 동적 체인 생성
        return chain(*tasks)
    else:
        raise ValueError(f"Unknown workflow type: {initial_data['type']}")

특징:

  • 우선순위 기반 작업 처리
  • 리소스별 워커 최적화
  • 복잡한 워크플로우 패턴
  • 맵리듀스 구현
  • 동적 워크플로우 구성

8️⃣ 운영 환경 구성

프로덕션 환경에서 Celery를 안정적으로 운영하기 위한 구성이다.

감시와 자동 재시작:

# supervisord.conf
[program:celery]
command=/path/to/venv/bin/celery -A tasks worker --loglevel=INFO
directory=/path/to/project
user=celery
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10
stopasgroup=true
priority=999

[program:celerybeat]
command=/path/to/venv/bin/celery -A tasks beat --loglevel=INFO
directory=/path/to/project
user=celery
numprocs=1
stdout_logfile=/var/log/celery/beat.log
stderr_logfile=/var/log/celery/beat.log
autostart=true
autorestart=true
startsecs=10
stopasgroup=true
priority=999

[group:celery-cluster]
programs=celery,celerybeat
priority=999

프로덕션 설정:

# celeryconfig_prod.py
import os
from kombu.common import Broadcast, Queue

# 브로커 및 백엔드 설정
broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://redis:6379/0')
result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'redis://redis:6379/0')

# 보안 설정
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
enable_utc = True

# 큐 설정
task_default_queue = 'default'
task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('high_priority', routing_key='high_task.#'),
    Queue('low_priority', routing_key='low_task.#'),
    Broadcast('broadcast'),  # 브로드캐스트 큐
)

# 라우팅 설정
task_routes = {
    'tasks.critical_task': {'queue': 'high_priority', 'routing_key': 'high_task.critical'},
    'tasks.background_task': {'queue': 'low_priority', 'routing_key': 'low_task.background'},
}

# 워커 설정
worker_concurrency = int(os.environ.get('CELERY_CONCURRENCY', 8))
worker_prefetch_multiplier = 1
worker_max_tasks_per_child = 1000

# 로깅 설정
worker_hijack_root_logger = False
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'

# 에러 처리
task_acks_late = True
task_reject_on_worker_lost = True
task_acks_on_failure_or_timeout = False

# 모니터링
worker_send_task_events = True
task_send_sent_event = True

도커 컴포즈 구성:

# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

  celery-worker:
    build: .
    command: celery -A tasks worker --loglevel=INFO
    volumes:
      - .:/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
      - CELERY_CONCURRENCY=8
    depends_on:
      - redis
    restart: unless-stopped

  celery-beat:
    build: .
    command: celery -A tasks beat --loglevel=INFO
    volumes:
      - .:/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped

  flower:
    build: .
    command: celery -A tasks flower --port=5555
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - redis
      - celery-worker
    restart: unless-stopped

volumes:
  redis-data:

특징:

  • 고가용성 구성
  • 컨테이너화된 배포
  • 감시 및 로깅 설정
  • 큐 및 라우팅 최적화
  • 확장 가능한 아키텍처
  • 보안 설정
  • 리소스 관리

주요 팁

모범 사례:

  • 멱등성 보장: 태스크는 여러 번 실행해도 동일한 결과가 나오도록 설계
  • 적절한 큐 분리: 작업 특성에 따라 큐를 분리하여 자원 경쟁 방지
  • 데이터 직렬화: 태스크 데이터는 최소한으로 유지하고 직렬화 가능해야 함
  • 실행 시간 모니터링: 오래 실행되는 태스크는 모니터링하고 시간 제한 설정
  • 메모리 관리: worker_max_tasks_per_child로 메모리 누수 방지
  • 재시도 전략: 일시적 오류는 지수 백오프로 재시도, 영구적 오류는 데드레터 큐로 이동
  • 태스크 취소 처리: 태스크 취소 시 리소스 정리 로직 구현
  • 로깅 전략: 구조화된 로깅으로 태스크 추적 용이하게 설정
  • 백엔드 선택: 데이터 양에 따라 적절한 결과 백엔드 선택 (Redis, RabbitMQ, 데이터베이스)
  • 브로커 고가용성: 중요 시스템은 브로커 클러스터링 구성
  • 워커 확장: 로드에 따라 워커 수를 동적으로 조정하는 메커니즘 구현
  • 모니터링 도구 활용: Flower, Prometheus 등으로 실시간 모니터링
  • 주기적 정리: 완료된 태스크 결과 정기적으로 정리
  • 태스크 타임아웃: 모든 태스크에 타임아웃 설정
  • 보안 고려: 브로커 및 결과 백엔드에 인증 설정


</rewritten_file>

Clone this wiki locally