-
Notifications
You must be signed in to change notification settings - Fork 0
KR_Celery
somaz edited this page Apr 22, 2025
·
4 revisions
Celery는 분산 작업 큐를 제공하는 비동기 작업 큐이다.
# tasks.py
from celery import Celery
# Celery 인스턴스 생성
app = Celery('tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0')
# 기본 태스크 정의
@app.task
def add(x: int, y: int) -> int:
return x + y
# 태스크 설정
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Seoul',
enable_utc=True,
)✅ 특징:
- Redis/RabbitMQ 브로커 지원
- 비동기 작업 처리
- 유연한 설정 관리
- 분산 작업 처리
- 결과 백엔드 저장
작업을 태스크로 정의하고 비동기적으로 실행하는 방법이다.
from celery import Task
from typing import Any, Dict
class CustomTask(Task):
def on_success(self, retval: Any, task_id: str, args: tuple, kwargs: Dict) -> None:
print(f"Task {task_id} completed successfully")
def on_failure(self, exc: Exception, task_id: str, args: tuple, kwargs: Dict, einfo: Any) -> None:
print(f"Task {task_id} failed: {exc}")
@app.task(base=CustomTask, bind=True)
def process_data(self, data: Dict) -> Dict:
try:
# 데이터 처리 로직
result = {'processed': data['value'] * 2}
return result
except Exception as e:
self.retry(exc=e, countdown=60) # 1분 후 재시도
# 태스크 체이닝
from celery import chain
@app.task
def validate_data(data: Dict) -> Dict:
if 'value' not in data:
raise ValueError("Missing 'value' key")
return data
# 태스크 체인 실행
task_chain = chain(
validate_data.s({'value': 10}),
process_data.s()
)
result = task_chain()✅ 특징:
- 커스텀 태스크 클래스
- 태스크 체이닝
- 에러 처리와 재시도
- 태스크 상태 추적
- 시그니처를 통한 인자 전달
일정 주기로 자동 실행되는 태스크를 설정하는 방법이다.
from celery.schedules import crontab
app.conf.beat_schedule = {
'daily-cleanup': {
'task': 'tasks.cleanup',
'schedule': crontab(hour=0, minute=0), # 매일 자정
},
'hourly-check': {
'task': 'tasks.health_check',
'schedule': 3600.0, # 1시간마다
'args': ('system',)
}
}
@app.task
def cleanup():
"""일일 정리 작업"""
# 정리 작업 수행
pass
@app.task
def health_check(system: str):
"""시스템 상태 확인"""
# 상태 확인 로직
pass✅ 특징:
- Crontab 스케줄링
- 주기적 작업 실행
- 인자 전달 지원
- 다양한 시간 단위 지원
- 동적 스케줄 변경 가능
태스크 실행 중 발생하는 오류를 처리하고 재시도하는 메커니즘이다.
from celery.exceptions import MaxRetriesExceededError
from typing import Optional
@app.task(
bind=True,
max_retries=3,
default_retry_delay=60,
autoretry_for=(ConnectionError,),
retry_backoff=True
)
def process_with_retry(self, data: Dict) -> Optional[Dict]:
try:
# 처리 로직
result = external_api_call(data)
return result
except ConnectionError as exc:
# 자동 재시도
raise self.retry(exc=exc)
except Exception as exc:
# 다른 예외 처리
self.update_state(state='FAILURE', meta={'error': str(exc)})
return None
class TaskManager:
def __init__(self):
self.tasks = {}
def register_task(self, task_id: str, task):
self.tasks[task_id] = task
def revoke_task(self, task_id: str):
if task_id in self.tasks:
app.control.revoke(task_id, terminate=True)
del self.tasks[task_id]✅ 특징:
- 자동 재시도 설정
- 상세한 에러 처리
- 태스크 상태 관리
- 지수 백오프 지원
- 태스크 취소 기능
Celery 워커의 설정과 모니터링을 위한 방법이다.
from celery.signals import worker_ready, worker_shutting_down
from prometheus_client import Counter, Gauge
# 메트릭 정의
task_counter = Counter('celery_tasks_total', 'Total number of Celery tasks')
task_latency = Gauge('celery_task_latency_seconds', 'Task processing latency')
@worker_ready.connect
def worker_ready_handler(**kwargs):
print("Celery worker is ready!")
@worker_shutting_down.connect
def worker_shutdown_handler(**kwargs):
print("Celery worker is shutting down...")
class MonitoredTask(Task):
def __call__(self, *args, **kwargs):
task_counter.inc()
with task_latency.time():
return super().__call__(*args, **kwargs)
@app.task(base=MonitoredTask)
def monitored_task():
# 작업 수행
pass✅ 특징:
- 워커 생명주기 관리
- 메트릭 수집
- 모니터링 통합
- 시그널 처리
- 워커 컨트롤 명령
Celery를 사용한 실제 애플리케이션 구현 예제를 살펴보자.
from celery import group
from typing import List, Dict
import smtplib
from email.mime.text import MIMEText
@app.task(rate_limit='100/m') # 분당 100개 제한
def send_email(to_email: str, subject: str, body: str) -> bool:
try:
msg = MIMEText(body)
msg['Subject'] = subject
msg['To'] = to_email
with smtplib.SMTP('smtp.gmail.com', 587) as server:
server.starttls()
server.login('your@email.com', 'password')
server.send_message(msg)
return True
except Exception as e:
print(f"Failed to send email: {e}")
return False
def send_bulk_emails(email_list: List[Dict]):
# 병렬 처리로 이메일 발송
tasks = group(
send_email.s(
email['to'],
email['subject'],
email['body']
) for email in email_list
)
result = tasks.apply_async()
return resultimport os
from PIL import Image
from typing import Tuple
@app.task
def process_image(image_path: str, size: Tuple[int, int]) -> str:
try:
with Image.open(image_path) as img:
# 이미지 리사이징
img.thumbnail(size)
# 저장 경로 생성
filename = os.path.basename(image_path)
output_path = f"processed_{filename}"
# 처리된 이미지 저장
img.save(output_path)
return output_path
except Exception as e:
print(f"Image processing failed: {e}")
raise
class ImageProcessor:
def __init__(self):
self.processing_queue = []
def add_image(self, image_path: str, size: Tuple[int, int]):
task = process_image.delay(image_path, size)
self.processing_queue.append(task)
def get_results(self):
return [task.get() for task in self.processing_queue if task.ready()]from celery import chord
import pandas as pd
import numpy as np
from typing import List, Dict, Any
@app.task
def extract_data(source: str) -> pd.DataFrame:
"""데이터 소스에서 데이터 추출"""
if source.endswith('.csv'):
return pd.read_csv(source)
elif source.endswith('.json'):
return pd.read_json(source)
else:
raise ValueError(f"Unsupported source format: {source}")
@app.task
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
"""데이터 변환 및 전처리"""
# 결측치 처리
df = df.fillna(0)
# 특성 엔지니어링
if 'date' in df.columns:
df['year'] = pd.to_datetime(df['date']).dt.year
df['month'] = pd.to_datetime(df['date']).dt.month
# 이상치 처리
for col in df.select_dtypes(include=[np.number]).columns:
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
df[col] = df[col].clip(q1 - 1.5 * iqr, q3 + 1.5 * iqr)
return df
@app.task
def load_data(df: pd.DataFrame, destination: str) -> bool:
"""처리된 데이터 저장"""
if destination.endswith('.csv'):
df.to_csv(destination, index=False)
elif destination.endswith('.json'):
df.to_json(destination, orient='records')
else:
raise ValueError(f"Unsupported destination format: {destination}")
return True
@app.task
def notify_completion(results: List[Any]) -> Dict:
"""모든 작업 완료 후 알림"""
return {
'status': 'completed',
'processed_files': len(results),
'success_count': sum(1 for r in results if r)
}
def process_data_pipeline(sources: List[str], destination_prefix: str):
"""ETL 파이프라인 실행"""
# 병렬 추출 및 변환 작업
tasks = []
for i, source in enumerate(sources):
destination = f"{destination_prefix}_{i}.csv"
# 개별 ETL 파이프라인
pipeline = chain(
extract_data.s(source),
transform_data.s(),
load_data.s(destination)
)
tasks.append(pipeline)
# chord를 사용하여 모든 파이프라인 완료 후 알림
workflow = chord(tasks)(notify_completion.s())
return workflow✅ 특징:
- 대량 작업의 병렬 처리
- 작업 속도 제한
- 복잡한 워크플로우 구성
- 이벤트 기반 처리
- 자원 효율적 사용
- 확장성 있는 설계
복잡한 분산 시스템을 위한 고급 Celery 패턴이다.
# 우선순위 큐 설정
app.conf.task_routes = {
'tasks.high_priority': {'queue': 'high_priority'},
'tasks.default_priority': {'queue': 'default'},
'tasks.low_priority': {'queue': 'low_priority'},
}
@app.task(queue='high_priority')
def high_priority(data):
"""긴급 처리가 필요한 작업"""
# 고우선순위 작업 처리
return "High priority task completed"
@app.task(queue='default')
def default_priority(data):
"""일반 작업"""
# 일반 작업 처리
return "Default task completed"
@app.task(queue='low_priority')
def low_priority(data):
"""배경 작업"""
# 저우선순위 작업 처리
return "Low priority task completed"
# 워커 시작 명령어 예시:
# celery -A tasks worker -Q high_priority,default,low_priority -l info# celeryconfig.py
worker_concurrency = 8 # CPU 코어 수에 맞춤
worker_prefetch_multiplier = 1 # 작업당 하나씩만 가져오도록 설정
worker_max_tasks_per_child = 1000 # 메모리 누수 방지
task_time_limit = 3600 # 1시간 제한
task_soft_time_limit = 3000 # 소프트 제한 50분
# 작업 종류별 최적화
task_annotations = {
'tasks.cpu_intensive': {'pool': 'solo'}, # CPU 작업은 별도 프로세스
'tasks.io_intensive': {'pool': 'gevent', 'rate_limit': '100/m'}, # I/O 작업은 gevent
}
# 데드레터 큐 설정
task_reject_on_worker_lost = True
task_acks_late = Truefrom celery import group, chain, chord, signature
@app.task
def analysis_task(data_chunk):
# 데이터 분석 작업
return {'chunk_id': data_chunk['id'], 'result': len(data_chunk['data'])}
@app.task
def reduce_results(results):
# 결과 집계
total = sum(result['result'] for result in results)
return {'total_count': total}
def map_reduce_workflow(data_chunks):
"""맵리듀스 패턴 구현"""
# 맵 단계 (병렬 처리)
map_tasks = group(analysis_task.s(chunk) for chunk in data_chunks)
# 리듀스 단계 (결과 집계)
workflow = chord(map_tasks)(reduce_results.s())
return workflow
# 동적 워크플로우 생성
def create_dynamic_workflow(initial_data):
"""실행 시점에 동적으로 워크플로우 생성"""
if initial_data['type'] == 'simple':
return simple_task.s(initial_data)
elif initial_data['type'] == 'complex':
# 복잡한 체인 생성
tasks = [initial_task.s(initial_data)]
# 조건에 따라 다른 태스크 추가
if initial_data.get('needs_validation'):
tasks.append(validation_task.s())
if initial_data.get('needs_processing'):
tasks.append(processing_task.s())
# 결과 형식 지정
tasks.append(format_result.s())
# 동적 체인 생성
return chain(*tasks)
else:
raise ValueError(f"Unknown workflow type: {initial_data['type']}")✅ 특징:
- 우선순위 기반 작업 처리
- 리소스별 워커 최적화
- 복잡한 워크플로우 패턴
- 맵리듀스 구현
- 동적 워크플로우 구성
프로덕션 환경에서 Celery를 안정적으로 운영하기 위한 구성이다.
# supervisord.conf
[program:celery]
command=/path/to/venv/bin/celery -A tasks worker --loglevel=INFO
directory=/path/to/project
user=celery
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10
stopasgroup=true
priority=999
[program:celerybeat]
command=/path/to/venv/bin/celery -A tasks beat --loglevel=INFO
directory=/path/to/project
user=celery
numprocs=1
stdout_logfile=/var/log/celery/beat.log
stderr_logfile=/var/log/celery/beat.log
autostart=true
autorestart=true
startsecs=10
stopasgroup=true
priority=999
[group:celery-cluster]
programs=celery,celerybeat
priority=999# celeryconfig_prod.py
import os
from kombu.common import Broadcast, Queue
# 브로커 및 백엔드 설정
broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://redis:6379/0')
result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'redis://redis:6379/0')
# 보안 설정
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
enable_utc = True
# 큐 설정
task_default_queue = 'default'
task_queues = (
Queue('default', routing_key='task.#'),
Queue('high_priority', routing_key='high_task.#'),
Queue('low_priority', routing_key='low_task.#'),
Broadcast('broadcast'), # 브로드캐스트 큐
)
# 라우팅 설정
task_routes = {
'tasks.critical_task': {'queue': 'high_priority', 'routing_key': 'high_task.critical'},
'tasks.background_task': {'queue': 'low_priority', 'routing_key': 'low_task.background'},
}
# 워커 설정
worker_concurrency = int(os.environ.get('CELERY_CONCURRENCY', 8))
worker_prefetch_multiplier = 1
worker_max_tasks_per_child = 1000
# 로깅 설정
worker_hijack_root_logger = False
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
# 에러 처리
task_acks_late = True
task_reject_on_worker_lost = True
task_acks_on_failure_or_timeout = False
# 모니터링
worker_send_task_events = True
task_send_sent_event = True# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:6-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
timeout: 10s
retries: 3
celery-worker:
build: .
command: celery -A tasks worker --loglevel=INFO
volumes:
- .:/app
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- CELERY_CONCURRENCY=8
depends_on:
- redis
restart: unless-stopped
celery-beat:
build: .
command: celery -A tasks beat --loglevel=INFO
volumes:
- .:/app
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
depends_on:
- redis
restart: unless-stopped
flower:
build: .
command: celery -A tasks flower --port=5555
ports:
- "5555:5555"
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
depends_on:
- redis
- celery-worker
restart: unless-stopped
volumes:
redis-data:✅ 특징:
- 고가용성 구성
- 컨테이너화된 배포
- 감시 및 로깅 설정
- 큐 및 라우팅 최적화
- 확장 가능한 아키텍처
- 보안 설정
- 리소스 관리
✅ 모범 사례:
- 멱등성 보장: 태스크는 여러 번 실행해도 동일한 결과가 나오도록 설계
- 적절한 큐 분리: 작업 특성에 따라 큐를 분리하여 자원 경쟁 방지
- 데이터 직렬화: 태스크 데이터는 최소한으로 유지하고 직렬화 가능해야 함
- 실행 시간 모니터링: 오래 실행되는 태스크는 모니터링하고 시간 제한 설정
-
메모리 관리:
worker_max_tasks_per_child로 메모리 누수 방지 - 재시도 전략: 일시적 오류는 지수 백오프로 재시도, 영구적 오류는 데드레터 큐로 이동
- 태스크 취소 처리: 태스크 취소 시 리소스 정리 로직 구현
- 로깅 전략: 구조화된 로깅으로 태스크 추적 용이하게 설정
- 백엔드 선택: 데이터 양에 따라 적절한 결과 백엔드 선택 (Redis, RabbitMQ, 데이터베이스)
- 브로커 고가용성: 중요 시스템은 브로커 클러스터링 구성
- 워커 확장: 로드에 따라 워커 수를 동적으로 조정하는 메커니즘 구현
- 모니터링 도구 활용: Flower, Prometheus 등으로 실시간 모니터링
- 주기적 정리: 완료된 태스크 결과 정기적으로 정리
- 태스크 타임아웃: 모든 태스크에 타임아웃 설정
- 보안 고려: 브로커 및 결과 백엔드에 인증 설정
</rewritten_file>