Skip to content

KR_aiohttp

somaz edited this page Mar 6, 2025 · 5 revisions

파이썬 aiohttp

1. aiohttp 기초

aiohttp는 asyncio 기반의 비동기 HTTP 클라이언트/서버 프레임워크이다.

from aiohttp import web

async def hello(request):
    """Answer any request with a fixed plain-text greeting."""
    greeting = "Hello, World!"
    return web.Response(text=greeting)

async def get_user(request):
    """Return a canned user record as JSON, echoing the ``id`` path segment."""
    payload = dict(
        id=request.match_info['id'],
        name='Test User',
        email='test@example.com',
    )
    return web.json_response(payload)

# Build the application and register both GET handlers in a single call.
app = web.Application()
app.add_routes([
    web.get('/', hello),
    web.get('/users/{id}', get_user),
])

# Start the server only when this file is executed directly.
if __name__ == '__main__':
    web.run_app(app)

2. 클라이언트 사용

import aiohttp
import asyncio

async def fetch_data(url):
    """GET ``url`` using a short-lived client session and return the decoded JSON body."""
    async with aiohttp.ClientSession() as client, client.get(url) as resp:
        payload = await resp.json()
    return payload

async def post_data(url, data):
    """POST ``data`` as a JSON body to ``url`` and return the decoded JSON reply."""
    async with aiohttp.ClientSession() as client, client.post(url, json=data) as resp:
        payload = await resp.json()
    return payload

async def main():
    """Issue one GET and one POST against the example API, printing each result."""
    # GET request
    print(await fetch_data('http://api.example.com/users'))

    # POST request
    new_user = {'name': 'John', 'email': 'john@example.com'}
    print(await post_data('http://api.example.com/users', new_user))

# Run the main() coroutine to completion; this executes as soon as the
# module is loaded because the call is not behind a __main__ guard.
asyncio.run(main())

3. 미들웨어와 핸들러

@web.middleware
async def error_middleware(request, handler):
    """Turn errors raised by ``handler`` into JSON error responses.

    * ``web.HTTPException`` with a status of 400 or above becomes
      ``{'error': str(ex)}`` with the same status code.
    * ``web.HTTPException`` with a status below 400 (for example a
      redirect) is re-raised untouched so its headers reach the client.
    * Any other ``Exception`` is logged with its traceback and answered
      with a generic 500 body, so internal details are not leaked.
    """
    try:
        return await handler(request)
    except web.HTTPException as ex:
        if ex.status < 400:
            # Not an error: rewriting it as JSON would drop headers such
            # as Location that the client needs.
            raise
        return web.json_response(
            {'error': str(ex)},
            status=ex.status
        )
    except Exception:
        # Local import keeps this snippet self-contained.
        import logging

        # Record the traceback; otherwise the failure would be invisible
        # because the client only ever sees the generic message below.
        logging.getLogger(__name__).exception(
            'Unhandled error while processing %s %s',
            request.method,
            request.path,
        )
        return web.json_response(
            {'error': 'Internal Server Error'},
            status=500
        )

async def handle_404(request):
    """Build the JSON "Not Found" reply with HTTP status 404."""
    body = {'error': 'Not Found'}
    return web.json_response(body, status=404)

@web.middleware
async def _not_found_middleware(request, handler):
    """Answer any ``HTTPNotFound`` raised downstream with ``handle_404``."""
    try:
        return await handler(request)
    except web.HTTPNotFound:
        return await handle_404(request)


# aiohttp's router offers no ``set_error_handler`` hook; per-status handling
# is done with a middleware instead. The 404 middleware is listed after
# error_middleware so it sits closer to the handler and sees HTTPNotFound
# first.
app = web.Application(middlewares=[error_middleware, _not_found_middleware])
app.router.add_routes([
    web.get('/', hello),
    web.get('/users/{id}', get_user)
])

4. 데이터베이스 통합

import aiopg
from aiohttp import web

async def init_pg(app):
    """Create the PostgreSQL pool and store it under ``app['db']``.

    Connection settings (database, user, password, host, port) are read
    from ``app['config']['postgres']``; the pool is created with
    ``minsize=1`` and ``maxsize=5``.
    """
    pg = app['config']['postgres']
    connect_keys = ('database', 'user', 'password', 'host', 'port')
    connect_kwargs = {key: pg[key] for key in connect_keys}
    app['db'] = await aiopg.create_pool(minsize=1, maxsize=5, **connect_kwargs)

async def close_pg(app):
    """Close the pool stored under ``app['db']`` and wait for it to finish closing."""
    pool = app['db']
    pool.close()
    await pool.wait_closed()

class UserHandler:
    """Request handlers that read users through the pool at ``app['db']``."""

    async def get_user(self, request):
        """Look up one ``users`` row by the ``id`` path segment and return it as JSON.

        Raises:
            web.HTTPNotFound: when the query yields no row.
        """
        query = 'SELECT * FROM users WHERE id = %s'
        params = (request.match_info['id'],)
        pool = request.app['db']

        async with pool.acquire() as conn, conn.cursor() as cur:
            await cur.execute(query, params)
            row = await cur.fetchone()

        if not row:
            raise web.HTTPNotFound()

        # NOTE(review): relies on the first three columns of ``users`` being
        # id, name, email in that order — confirm against the schema.
        return web.json_response({
            'id': row[0],
            'name': row[1],
            'email': row[2],
        })

# Fresh application whose lifecycle signals manage the database pool:
# init_pg is registered for startup and close_pg for cleanup.
# NOTE(review): init_pg reads app['config']['postgres'], which is not set
# in this snippet — confirm it is populated before the app starts.
app = web.Application()
app.on_startup.append(init_pg)
app.on_cleanup.append(close_pg)

5. WebSocket 구현

async def websocket_handler(request):
    """Serve a WebSocket that echoes text frames with a prefix.

    A text frame equal to ``'close'`` closes the socket; any other text
    frame is sent back prefixed. Error frames are reported with ``print``.
    Frames of other types are ignored. The WebSocket response object is
    returned once the message loop ends.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        kind = msg.type
        if kind == web.WSMsgType.ERROR:
            print('ws 연결 에러:', ws.exception())
            continue
        if kind != web.WSMsgType.TEXT:
            continue
        if msg.data == 'close':
            await ws.close()
            continue
        await ws.send_str(f'서버 응답: {msg.data}')

    return ws

# Expose the WebSocket endpoint at /ws.
app.add_routes([web.get('/ws', websocket_handler)])

6. 실용적인 예제

REST API 서비스

from aiohttp import web
import json

class RestAPI:
    """In-memory CRUD handlers for an ``items`` collection.

    Items are kept in ``self.items``, a dict keyed by the string id that
    ``create_item`` assigns. Each stored item carries that same id under
    its ``'id'`` field; request bodies cannot override it.
    """

    def __init__(self):
        self.items = {}

    async def _read_object(self, request):
        """Return the request body parsed as a JSON object.

        Raises:
            web.HTTPBadRequest: if the body is not valid JSON or is not a
                JSON object.
        """
        try:
            data = await request.json()
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            raise web.HTTPBadRequest(
                text='Request body must be valid JSON'
            ) from None
        if not isinstance(data, dict):
            raise web.HTTPBadRequest(
                text='Request body must be a JSON object'
            )
        return data

    async def get_items(self, request):
        """Return every stored item as a JSON list."""
        return web.json_response(list(self.items.values()))

    async def get_item(self, request):
        """Return the item named by the ``id`` path segment.

        Raises:
            web.HTTPNotFound: if no item has that id.
        """
        item_id = request.match_info['id']
        if item_id not in self.items:
            raise web.HTTPNotFound()
        return web.json_response(self.items[item_id])

    async def create_item(self, request):
        """Store the JSON object from the body under a new id; reply 201.

        Raises:
            web.HTTPBadRequest: if the body is not a JSON object.
        """
        data = await self._read_object(request)
        item_id = str(len(self.items) + 1)
        item = {'id': item_id}
        item.update(data)
        # Re-assert the server-assigned id: a client-supplied 'id' must not
        # make the stored record disagree with the key it is stored under.
        item['id'] = item_id
        self.items[item_id] = item
        return web.json_response(item, status=201)

    async def update_item(self, request):
        """Merge the JSON object from the body into an existing item.

        Raises:
            web.HTTPNotFound: if no item has the requested id.
            web.HTTPBadRequest: if the body is not a JSON object.
        """
        item_id = request.match_info['id']
        if item_id not in self.items:
            raise web.HTTPNotFound()

        data = await self._read_object(request)
        item = self.items[item_id]
        item.update(data)
        # The id is fixed by the URL; ignore any 'id' sent in the body.
        item['id'] = item_id
        return web.json_response(item)

# One shared RestAPI instance backs all four item routes.
api = RestAPI()
app.add_routes([
    web.get('/items', api.get_items),
    web.get('/items/{id}', api.get_item),
    web.post('/items', api.create_item),
    web.put('/items/{id}', api.update_item),
])

파일 업로드 처리

import os

async def handle_upload(request):
    """Save the first part of a multipart upload into ``uploads/``.

    Only the final path component of the client-supplied filename is used,
    so the written file always stays inside the upload directory. The
    reply is a JSON object with the stored ``filename`` and the ``size``
    in bytes.

    Raises:
        web.HTTPBadRequest: if the request has no part, the part has no
            filename, or the filename has no usable final component.
    """
    reader = await request.multipart()

    # File field handling
    field = await reader.next()
    if field is None or not field.filename:
        raise web.HTTPBadRequest(text='No file uploaded')

    # Strip directory components (either separator style) so a name like
    # "../../etc/passwd" or an absolute path cannot escape ``uploads``.
    filename = os.path.basename(field.filename.replace('\\', '/'))
    if filename in ('', '.', '..'):
        raise web.HTTPBadRequest(text='Invalid filename')

    # Save the file, creating the target directory on first use.
    os.makedirs('uploads', exist_ok=True)
    size = 0
    with open(os.path.join('uploads', filename), 'wb') as f:
        while True:
            chunk = await field.read_chunk()
            if not chunk:
                break
            size += len(chunk)
            f.write(chunk)

    return web.json_response({
        'filename': filename,
        'size': size
    })

# Accept uploads via POST /upload.
app.add_routes([web.post('/upload', handle_upload)])

세션 관리

import aiohttp_session
from aiohttp_session import setup, get_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
import base64

async def init_app():
    """Create an application with encrypted cookie sessions enabled.

    The session key is read from the ``SESSION_SECRET_KEY`` environment
    variable as URL-safe base64 and must decode to exactly 32 bytes. When
    the variable is unset, a random key is generated, which means existing
    session cookies stop decrypting after a restart.

    Returns:
        The configured ``web.Application``.

    Raises:
        ValueError: if ``SESSION_SECRET_KEY`` does not decode to 32 bytes.
    """
    # Local import keeps this snippet self-contained.
    import os

    app = web.Application()

    # Session setup: keep the secret out of source control.
    encoded_key = os.environ.get('SESSION_SECRET_KEY')
    if encoded_key:
        secret_key = base64.urlsafe_b64decode(encoded_key)
    else:
        secret_key = os.urandom(32)
    if len(secret_key) != 32:
        raise ValueError(
            'SESSION_SECRET_KEY must decode to exactly 32 bytes'
        )
    setup(app, EncryptedCookieStorage(secret_key))

    return app

async def login(request):
    """Authenticate the JSON body and record the user in the session.

    Raises:
        web.HTTPUnauthorized: when ``authenticate`` returns a falsy value.
    """
    session = await get_session(request)
    credentials = await request.json()

    # Authentication logic
    user_id = authenticate(credentials)
    if not user_id:
        raise web.HTTPUnauthorized()

    session['user_id'] = user_id
    return web.json_response({'status': 'logged in'})

async def logout(request):
    """Invalidate the caller's session and confirm the logout as JSON."""
    current = await get_session(request)
    current.invalidate()
    body = {'status': 'logged out'}
    return web.json_response(body)

7. 주요 팁

  • 비동기 코드 최적화
  • 연결 풀링 활용
  • 에러 처리 구현
  • 세션 관리
  • 보안 설정
  • 로깅 구현
  • 테스트 작성
  • 성능 모니터링

Clone this wiki locally