Error 429 - ai-rate-limiting #13131
-
|
Hi, I am currently testing APISIX via a Helm chart deployment. I've successfully implemented route creation and verified communication with my vLLM production stack using a Python script. However, I'm encountering an issue with the ai-rate-limiting plugin: it does not seem to return a 429 error, even when sending several requests in parallel that should exceed the defined threshold. Could you help me troubleshoot why the limit isn't triggering? Here is the code:
#!/usr/bin/env python3
"""
Test script for APISIX rate limiting - Simplified version.
Supports both POST (LLM) and GET (health check) endpoints.
"""
import ssl
# TLS context shared by every request in this script, with certificate and
# hostname verification switched off.
# NOTE(review): this accepts any server certificate; presumably the *.test
# hosts use self-signed certificates. Do not reuse against real endpoints.
ssl_context = ssl.create_default_context()
# check_hostname has to be disabled before verify_mode may be set to CERT_NONE.
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
import asyncio
import aiohttp
import time
import sys
from collections import defaultdict
# ─── Configuration ─────────────────────────────────────────────────────────────
# Base URLs of the gateway and of Keycloak, plus the two gateway paths the
# script can target (POST chat completions or GET health check).
GW_URL = "https://apisix.test"
KEYCLOAK_URL = "https://keycloak.test"
ENDPOINT_POST = "/v1/chat/completions"
ENDPOINT_GET = "/health"  # Simple health check endpoint
# Keycloak client credentials used for the client_credentials grant.
# NOTE(review): the secret is hard-coded here; consider loading it from the
# environment before sharing or committing this script.
CLIENT_ID = "apisix"
CLIENT_SECRET = "fDU9xU"
# Per-request timeouts passed to aiohttp (reduced for faster failure detection).
TOKEN_TIMEOUT = 30
CHAT_TIMEOUT = 120  # Reduced from 600s
HEALTH_TIMEOUT = 10
# Test configuration: number of batches sent per token, and the batch size
# used when no request count is given on the command line.
MAX_ITERATIONS = 5
DEFAULT_REQ_PER_SEC = 3
# Prompt sent as the single user message of every chat-completion request.
PROMPT_TEST = "Bonjour."
# ─── Fonctions d'authentification ─────────────────────────────────────────────
async def get_keycloak_token(session):
    """Fetch an access token from Keycloak using the client_credentials grant.

    Args:
        session: Open ``aiohttp.ClientSession`` used to send the request.

    Returns:
        The ``access_token`` value of the token response, or ``None`` when
        Keycloak answers with a non-200 status, the response carries no
        ``access_token`` field, the request times out, or any other error
        occurs. Failures are printed, never raised.
    """
    url = f"{KEYCLOAK_URL}/auth/realms/master/protocol/openid-connect/token"
    data = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
    }
    # aiohttp deprecates bare numbers for ``timeout``; use ClientTimeout.
    timeout = aiohttp.ClientTimeout(total=TOKEN_TIMEOUT)
    try:
        async with session.post(url, data=data, timeout=timeout) as response:
            if response.status == 200:
                token_data = await response.json()
                return token_data.get("access_token")
            error_data = await response.text()
            # Only the first 200 characters of the error body are shown.
            print(f"ERREUR Keycloak ({response.status}): {error_data[:200]}")
            return None
    except asyncio.TimeoutError:
        print("TIMEOUT: Keycloak token request timed out")
        return None
    except Exception as e:
        # Best-effort helper: report the problem and let the caller decide.
        print(f"ERREUR Keycloak: {e}")
        return None
async def get_tokens_for_tests(session, num_tokens=3):
    """Request several Keycloak tokens, one after another.

    Args:
        session: Session object forwarded to ``get_keycloak_token``.
        num_tokens: How many token requests to make.

    Returns:
        A list of ``"Bearer <token>"`` strings, one per successful request.
        Failed requests are reported on stdout and skipped, so the list may
        be shorter than ``num_tokens`` (or empty).
    """
    bearer_values = []
    for attempt in range(1, num_tokens + 1):
        access_token = await get_keycloak_token(session)
        if not access_token:
            print(f"Token {attempt} échoué")
            continue
        bearer_values.append(f"Bearer {access_token}")
        print(f"Token {attempt} obtenu")
    return bearer_values
# ─── Fonctions de requête ─────────────────────────────────────────────────────
async def send_request_post(session, token, url):
    """Send one chat-completion POST request and report its HTTP status.

    Args:
        session: Open ``aiohttp.ClientSession`` used to send the request.
        token: Value sent verbatim in the ``Authorization`` header.
        url: Full URL of the chat-completions endpoint.

    Returns:
        A ``(status, time_str, token_short)`` tuple: the HTTP status code, the
        local ``HH:MM:SS`` time captured just before sending, and the last
        five characters of ``token``. A client-side timeout is reported as
        ``504`` and any other failure as ``500``; in those cases the code is
        produced by this function, not received from the server.
    """
    headers = {
        "Authorization": token,
        "Content-Type": "application/json",
    }
    data = {
        "model": "mistralai/Devstral-Small-2-24B-Instruct-2512",
        "messages": [{"role": "user", "content": PROMPT_TEST}],
    }
    current_time_str = time.strftime("%H:%M:%S", time.localtime())
    # Only the tail of the token is ever printed.
    token_short = token[-5:]
    # aiohttp deprecates bare numbers for ``timeout``; use ClientTimeout.
    timeout = aiohttp.ClientTimeout(total=CHAT_TIMEOUT)
    try:
        async with session.post(url, json=data, headers=headers, timeout=timeout) as response:
            # The response body is not read; only the status matters here.
            return response.status, current_time_str, token_short
    except asyncio.TimeoutError:
        print(f"[{current_time_str}] {token_short}: 504 Gateway Timeout")
        return 504, current_time_str, token_short
    except aiohttp.ClientError as e:
        print(f"[{current_time_str}] {token_short}: Client Error - {type(e).__name__}")
        return 500, current_time_str, token_short
    except Exception as e:
        print(f"[{current_time_str}] {token_short}: Error - {e}")
        return 500, current_time_str, token_short
async def send_request_get(session, token, url):
    """Send one GET request (health check or other simple endpoint).

    Args:
        session: Open ``aiohttp.ClientSession`` used to send the request.
        token: Value sent verbatim in the ``Authorization`` header.
        url: Full URL to request.

    Returns:
        A ``(status, time_str, token_short)`` tuple: the HTTP status code, the
        local ``HH:MM:SS`` time captured just before sending, and the last
        five characters of ``token``. A client-side timeout is reported as
        ``504`` and any other failure as ``500``; in those cases the code is
        produced by this function, not received from the server.
    """
    headers = {"Authorization": token}
    current_time_str = time.strftime("%H:%M:%S", time.localtime())
    # Only the tail of the token is ever printed.
    token_short = token[-5:]
    # aiohttp deprecates bare numbers for ``timeout``; use ClientTimeout.
    timeout = aiohttp.ClientTimeout(total=HEALTH_TIMEOUT)
    try:
        async with session.get(url, headers=headers, timeout=timeout) as response:
            return response.status, current_time_str, token_short
    except asyncio.TimeoutError:
        # Report the cause instead of silently mapping it to a status code,
        # mirroring what the POST helper prints.
        print(f"[{current_time_str}] {token_short}: 504 Gateway Timeout")
        return 504, current_time_str, token_short
    except aiohttp.ClientError as e:
        print(f"[{current_time_str}] {token_short}: Client Error - {type(e).__name__}")
        return 500, current_time_str, token_short
    except Exception as e:
        print(f"[{current_time_str}] {token_short}: Error - {e}")
        return 500, current_time_str, token_short
# ─── Tests ─────────────────────────────────────────────────────────────────────
async def test_rate_limiting(session, tokens, url, use_post=True, req_sec=5):
    """Fire batches of requests per token and print a per-token status summary.

    Each token gets its own coroutine that sends ``MAX_ITERATIONS`` batches of
    ``req_sec`` concurrent requests. A batch is awaited in full before the
    one-second pause, so the effective rate drops below ``req_sec`` per second
    whenever responses are slow.

    Args:
        session: Session object forwarded to the request helpers.
        tokens: ``Authorization`` header values; all are exercised concurrently.
        url: Target URL.
        use_post: Send POST chat requests when true, GET requests otherwise.
        req_sec: Number of concurrent requests in each batch.
    """
    # Status code -> (counter key, label printed after the status code).
    known_statuses = {
        200: ("200_OK", "OK"),
        429: ("429_TOO_MANY_REQUESTS", "RATE LIMITED"),
        504: ("504_TIMEOUT", "GATEWAY TIMEOUT"),
    }
    unknown_status = ("OTHER", "OTHER")
    response_counts = defaultdict(
        lambda: {"200_OK": 0, "429_TOO_MANY_REQUESTS": 0, "504_TIMEOUT": 0, "OTHER": 0}
    )
    send_one = send_request_post if use_post else send_request_get

    async def send_requests(api_key):
        for _ in range(MAX_ITERATIONS):
            batch = [send_one(session, api_key, url) for _slot in range(req_sec)]
            for status, time_str, token_short in await asyncio.gather(*batch):
                counter_key, label = known_statuses.get(status, unknown_status)
                response_counts[api_key][counter_key] += 1
                print(f"[{time_str}] {token_short}: {status} {label}")
            await asyncio.sleep(1)

    separator = "=" * 60
    print("\n" + separator)
    print("TEST RATE LIMITING")
    print(f"URL: {url}")
    print(f"Type: {'POST (LLM)' if use_post else 'GET (Health)'}")
    print(f"Tokens: {len(tokens)}")
    print(f"Req/sec: {req_sec}, Iterations: {MAX_ITERATIONS}")
    print(separator)

    await asyncio.gather(*(send_requests(token) for token in tokens))

    # Statistics
    print("\n" + separator)
    print("STATISTIQUES")
    print(separator)
    summary_rows = (
        (" 200 OK", "200_OK"),
        (" 429 LIMITED", "429_TOO_MANY_REQUESTS"),
        (" 504 TIMEOUT", "504_TIMEOUT"),
        (" OTHER", "OTHER"),
    )
    for api_key, counts in response_counts.items():
        total_requests = MAX_ITERATIONS * req_sec
        # Slicing already yields the whole string when it is 10 chars or fewer.
        print(f"\nKey: {api_key[-10:]}")
        print(f" Total: {total_requests}")
        for row_label, counter_key in summary_rows:
            hits = counts[counter_key]
            print(f"{row_label}: {hits} ({hits/total_requests:.1%})")
    print("\n" + separator)
# ─── Main ─────────────────────────────────────────────────────────────────────
async def main():
    """Parse the command line, obtain tokens and run the rate-limiting test.

    Usage: ``python test_quotas_ok.py [req_sec] [get|post] [num_tokens]``.
    Missing arguments fall back to ``DEFAULT_REQ_PER_SEC``, POST and 3 tokens.
    Returns early, after printing an error, when no token could be obtained.
    """
    cli_args = sys.argv[1:]
    req_sec = int(cli_args[0]) if cli_args else DEFAULT_REQ_PER_SEC
    # Anything other than "get" (case-insensitive) selects POST.
    use_post = cli_args[1].lower() != "get" if len(cli_args) > 1 else True
    num_tokens = int(cli_args[2]) if len(cli_args) > 2 else 3

    separator = "=" * 60
    print(separator)
    print("Test Rate Limiting - APISIX")
    print(separator)

    # Target URL
    if use_post:
        endpoint, method_line = ENDPOINT_POST, "\nMéthode: POST (LLM)"
    else:
        endpoint, method_line = ENDPOINT_GET, "\nMéthode: GET (Health Check)"
    url = f"{GW_URL}{endpoint}"
    print(method_line)
    print(f"URL: {url}")
    print(f"Req/seconde: {req_sec}")
    print(f"Iterations: {MAX_ITERATIONS}")
    print(f"Tokens: {num_tokens}")

    connector = aiohttp.TCPConnector(ssl=ssl_context)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Authentication
        print("\n" + separator)
        print("Authentification Keycloak...")
        print(separator)
        tokens = await get_tokens_for_tests(session, num_tokens=num_tokens)
        if not tokens:
            print("ERREUR: Impossible d'obtenir des tokens")
            return
        print(f"✓ {len(tokens)} tokens obtenus")

        # Rate-limiting test
        await test_rate_limiting(session, tokens, url, use_post=use_post, req_sec=req_sec)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\nArrêt par l'utilisateur.")
except Exception as e:
print(f"\n\nErreur: {e}")
import traceback
traceback.print_exc()
And here is a sample of my route config:
ROUTE_JSON=$(printf '{
"uri": "/v1/chat/completions",
"name": "test-chat",
"priority": 20,
"plugins": {
"openid-connect": {
"client_id": "%s",
"client_secret": "%s",
"discovery": "%s/auth/realms/master/.well-known/openid-configuration",
"bearer_only": true,
"ssl_verify": false,
"timeout": 40
},
"proxy-rewrite": {
"headers": {
"set": {
"Authorization": "Bearer %s"
}
}
},
"ai-rate-limiting": {
"limit": 1000,
"time_window": 60,
"limit_strategy": "total_tokens",
"rejected_code": 429,
"rejected_msg": "User token quota exceeded",
"show_limit_quota_header": true
}
},
"upstream": {
"type": "roundrobin",
"scheme": "http",
"nodes": {
"vllm-production-stack.local:80": 1
}
}
}' "$CLIENT_ID" "$CLIENT_SECRET" "$KEYCLOAK" "$VLLM_KEY") |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 2 replies
-
|
results: ============================================================
STATISTIQUES
============================================================
Key: ARbrgWWJdg
Total: 15
200 OK: 13 (86.7%)
429 LIMITED: 0 (0.0%)
504 TIMEOUT: 2 (13.3%)
OTHER: 0 (0.0%)
Key: K5h3Hs85KQ
Total: 15
200 OK: 13 (86.7%)
429 LIMITED: 0 (0.0%)
504 TIMEOUT: 2 (13.3%)
OTHER: 0 (0.0%)
Key: EkRYSa-DPg
Total: 15
200 OK: 10 (66.7%)
429 LIMITED: 0 (0.0%)
504 TIMEOUT: 5 (33.3%)
OTHER: 0 (0.0%)
|
Beta Was this translation helpful? Give feedback.
-
|
Hey everyone! I just created a high-speed asynchronous compute core in C++ and Python. Check it out, running tests now! The GitHub view counter is live: https://github.com/nlozkina19-crypto/vector-zero-compute |
Beta Was this translation helpful? Give feedback.
There are a few important things that explain why you're not seeing any 429 responses:
1. The `ai-rate-limiting` plugin requires `ai-proxy` or `ai-proxy-multi`
The plugin only activates when `ctx.picked_ai_instance_name` is set, which is populated by the `ai-proxy` or `ai-proxy-multi` plugin [1]. In your config, you're using a plain `upstream` with `proxy-rewrite` — so the `ai-rate-limiting` plugin never actually triggers. You would need to route through `ai-proxy` for the plugin to work.
2. Token counting happens after the LLM responds, not before
The plugin uses a two-phase approach: a 1-token pre-flight check in the access phase, then the real token deduction in the log phase after parsing the LLM respon…