-
Notifications
You must be signed in to change notification settings - Fork 0
KR_aiohttp
somaz edited this page Mar 6, 2025
·
5 revisions
aiohttp는 비동기 HTTP 클라이언트/서버 프레임워크이다.
from aiohttp import web
async def hello(request):
return web.Response(text="Hello, World!")
async def get_user(request):
user_id = request.match_info['id']
return web.json_response({
'id': user_id,
'name': 'Test User',
'email': 'test@example.com'
})
app = web.Application()
app.router.add_get('/', hello)
app.router.add_get('/users/{id}', get_user)
if __name__ == '__main__':
web.run_app(app)import aiohttp
import asyncio
async def fetch_data(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
async def post_data(url, data):
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data) as response:
return await response.json()
async def main():
# GET 요청
data = await fetch_data('http://api.example.com/users')
print(data)
# POST 요청
response = await post_data(
'http://api.example.com/users',
{'name': 'John', 'email': 'john@example.com'}
)
print(response)
asyncio.run(main())@web.middleware
async def error_middleware(request, handler):
try:
response = await handler(request)
return response
except web.HTTPException as ex:
return web.json_response(
{'error': str(ex)},
status=ex.status
)
except Exception as ex:
return web.json_response(
{'error': 'Internal Server Error'},
status=500
)
async def handle_404(request):
return web.json_response(
{'error': 'Not Found'},
status=404
)
app = web.Application(middlewares=[error_middleware])
app.router.add_routes([
web.get('/', hello),
web.get('/users/{id}', get_user)
])
app.router.set_error_handler(404, handle_404)import aiopg
from aiohttp import web
async def init_pg(app):
conf = app['config']['postgres']
engine = await aiopg.create_pool(
database=conf['database'],
user=conf['user'],
password=conf['password'],
host=conf['host'],
port=conf['port'],
minsize=1,
maxsize=5
)
app['db'] = engine
async def close_pg(app):
app['db'].close()
await app['db'].wait_closed()
class UserHandler:
async def get_user(self, request):
user_id = request.match_info['id']
async with request.app['db'].acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
'SELECT * FROM users WHERE id = %s',
(user_id,)
)
user = await cur.fetchone()
if not user:
raise web.HTTPNotFound()
return web.json_response({
'id': user[0],
'name': user[1],
'email': user[2]
})
app = web.Application()
app.on_startup.append(init_pg)
app.on_cleanup.append(close_pg)async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
await ws.send_str(f'서버 응답: {msg.data}')
elif msg.type == web.WSMsgType.ERROR:
print('ws 연결 에러:', ws.exception())
return ws
app.router.add_get('/ws', websocket_handler)from aiohttp import web
import json
class RestAPI:
def __init__(self):
self.items = {}
async def get_items(self, request):
return web.json_response(list(self.items.values()))
async def get_item(self, request):
item_id = request.match_info['id']
if item_id not in self.items:
raise web.HTTPNotFound()
return web.json_response(self.items[item_id])
async def create_item(self, request):
data = await request.json()
item_id = str(len(self.items) + 1)
self.items[item_id] = {
'id': item_id,
**data
}
return web.json_response(
self.items[item_id],
status=201
)
async def update_item(self, request):
item_id = request.match_info['id']
if item_id not in self.items:
raise web.HTTPNotFound()
data = await request.json()
self.items[item_id].update(data)
return web.json_response(self.items[item_id])
api = RestAPI()
app.router.add_get('/items', api.get_items)
app.router.add_get('/items/{id}', api.get_item)
app.router.add_post('/items', api.create_item)
app.router.add_put('/items/{id}', api.update_item)import os
async def handle_upload(request):
reader = await request.multipart()
# 파일 필드 처리
field = await reader.next()
filename = field.filename
# 파일 저장
size = 0
with open(os.path.join('uploads', filename), 'wb') as f:
while True:
chunk = await field.read_chunk()
if not chunk:
break
size += len(chunk)
f.write(chunk)
return web.json_response({
'filename': filename,
'size': size
})
app.router.add_post('/upload', handle_upload)import aiohttp_session
from aiohttp_session import setup, get_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
import base64
async def init_app():
app = web.Application()
# 세션 설정
secret_key = base64.urlsafe_b64decode(
'your-secret-key=='
)
setup(app, EncryptedCookieStorage(secret_key))
return app
async def login(request):
session = await get_session(request)
data = await request.json()
# 인증 로직
user_id = authenticate(data)
if user_id:
session['user_id'] = user_id
return web.json_response({'status': 'logged in'})
raise web.HTTPUnauthorized()
async def logout(request):
session = await get_session(request)
session.invalidate()
return web.json_response({'status': 'logged out'})- 비동기 코드 최적화
- 연결 풀링 활용
- 에러 처리 구현
- 세션 관리
- 보안 설정
- 로깅 구현
- 테스트 작성
- 성능 모니터링