[fix][autoscaler] recover from WrongClusterID on head restart #60860
There was a problem hiding this comment.
Code Review
This pull request introduces a recovery mechanism for WrongClusterID authentication errors that can occur when the head node restarts. This is achieved by recreating the GCS client in both v1 and v2 autoscaler monitors. The changes are logical and well-tested for the v2 monitor. However, the error detection mechanism is a bit fragile, and the test for the v1 monitor could be more comprehensive. I've added a few comments to address these points.
| @@ -436,6 +446,16 @@ def _run(self): | |||
| _internal_kv_put( | |||
| ray_constants.DEBUG_AUTOSCALING_STATUS, as_json, overwrite=True | |||
| ) | |||
| except AuthenticationError as e: | |||
| if "WrongClusterID" in str(e): | |||
There was a problem hiding this comment.
Relying on string matching ("WrongClusterID" in str(e)) to detect this specific error is brittle. If the error message from AuthenticationError changes in the future, this recovery logic could fail silently. It would be more robust to check for a specific error code or use a more specific exception type if one is available.
| else: | ||
| logger.warning("No autoscaling state to report.") | ||
| except AuthenticationError as e: | ||
| if "WrongClusterID" in str(e): |
There was a problem hiding this comment.
Relying on string matching ("WrongClusterID" in str(e)) to detect this specific error is brittle. If the error message from AuthenticationError changes in the future, this recovery logic could fail silently. It would be more robust to check for a specific error code or use a more specific exception type if one is available.
There was a problem hiding this comment.
I agree it is brittle but I think this is the only reliable way to track this error for now.
Both WrongClusterID and InvalidAuthToken share the same Unauthenticated gRPC status code and surface as the same AuthenticationError in Python, so string matching is the only way to differentiate them.
There was a problem hiding this comment.
Thought: The "correct" way would be to expose a dedicated error type for the WrongClusterID case.
A "faster" alternative would be to add a unit test on the error message, so that if the message changes in the future we notice and update this code accordingly.
I am not a Ray maintainer, so I am not sure whether the second solution fits the guidelines.
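A minimal sketch of the "faster" option, assuming a stand-in exception class so that it runs without Ray installed. The helper `is_wrong_cluster_id`, the constant `WRONG_CLUSTER_ID_MARKER`, and the `InvalidAuthToken` sample message are hypothetical; only the `WrongClusterID` message mimics the log output quoted in this PR.

```python
class AuthenticationError(Exception):
    """Stand-in for ray.exceptions.AuthenticationError (assumption, not Ray's class)."""


# Substring the recovery logic would look for in the error text.
WRONG_CLUSTER_ID_MARKER = "WrongClusterID"


def is_wrong_cluster_id(exc: Exception) -> bool:
    """Return True only for authentication errors that carry the marker."""
    return isinstance(exc, AuthenticationError) and WRONG_CLUSTER_ID_MARKER in str(exc)


def test_marker_is_pinned() -> None:
    """Fail loudly if the marker stops matching the expected message shape."""
    stale = AuthenticationError(
        "RPC error: WrongClusterID: Perhaps the client is accessing GCS "
        "after it has restarted."
    )
    bad_token = AuthenticationError("RPC error: InvalidAuthToken")  # hypothetical text
    assert is_wrong_cluster_id(stale)
    assert not is_wrong_cluster_id(bad_token)
    # A different exception type never counts, even with the same text.
    assert not is_wrong_cluster_id(RuntimeError("WrongClusterID"))


test_marker_is_pinned()
```

A real pinning test would have to obtain the message from an actual GCS rejection rather than a hand-written string; otherwise it only pins the test's own copy of the text.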
There was a problem hiding this comment.
Can we throw all kinds of AuthenticationError?
There was a problem hiding this comment.
Makes sense to me. Since we are re-raising every kind of AuthenticationError, the if statement is unnecessary. We can simply log the exception before re-raising it.
| def testWrongClusterIDRecoveryV1(self): | ||
| ray.init() | ||
| gcs_address = ray._private.worker.global_worker.gcs_client.address | ||
| monitor = Monitor( | ||
| address=gcs_address, | ||
| autoscaling_config=None, | ||
| log_dir=self.tmpdir, | ||
| ) | ||
| old_client = monitor.gcs_client | ||
|
|
||
| calls = 0 | ||
| original_get_all_resource_usage = old_client.get_all_resource_usage | ||
|
|
||
| def flaky(): | ||
| nonlocal calls | ||
| if calls == 0: | ||
| calls += 1 | ||
| raise ray.exceptions.AuthenticationError("WrongClusterID") | ||
| return original_get_all_resource_usage() | ||
|
|
||
| # replace time.sleep(AUTOSCALER_UPDATE_INTERVAL_S) in monitor._run() | ||
| def raise_stop(_): | ||
| raise RuntimeError("stop") | ||
|
|
||
| # inject the failure once | ||
| with patch.object(old_client, "get_all_resource_usage", side_effect=flaky): | ||
| # breaks the infinite loop with the patched time.sleep | ||
| with patch.object(time, "sleep", side_effect=raise_stop): | ||
| # verify stop happened | ||
| with self.assertRaises(RuntimeError): | ||
| # raise AuthenticationError once | ||
| monitor._run() | ||
|
|
||
| # assert the gcs client is refreshed on both monitor | ||
| self.assertIsNot(monitor.gcs_client, old_client) |
There was a problem hiding this comment.
This test for the v1 monitor recovery doesn't initialize the autoscaler within the monitor, so monitor.autoscaler remains None. This means the logic in recreate_gcs_client that updates self.autoscaler._gcs_client is not covered. Please initialize the autoscaler and add an assertion to ensure the autoscaler's GCS client is also refreshed for more complete test coverage.
def testWrongClusterIDRecoveryV1(self):
    ray.init()
    gcs_address = ray._private.worker.global_worker.gcs_client.address
    monitor = Monitor(
        address=gcs_address,
        autoscaling_config=None,
        log_dir=self.tmpdir,
    )
    monitor._initialize_autoscaler()
    old_client = monitor.gcs_client
    old_autoscaler_client = monitor.autoscaler._gcs_client

    calls = 0
    original_get_all_resource_usage = old_client.get_all_resource_usage

    def flaky():
        nonlocal calls
        if calls == 0:
            calls += 1
            raise ray.exceptions.AuthenticationError("WrongClusterID")
        return original_get_all_resource_usage()

    # replace time.sleep(AUTOSCALER_UPDATE_INTERVAL_S) in monitor._run()
    def raise_stop(_):
        raise RuntimeError("stop")

    # inject the failure once
    with patch.object(old_client, "get_all_resource_usage", side_effect=flaky):
        # breaks the infinite loop with the patched time.sleep
        with patch.object(time, "sleep", side_effect=raise_stop):
            # verify stop happened
            with self.assertRaises(RuntimeError):
                # raise AuthenticationError once
                monitor._run()

    # assert the gcs client is refreshed on both monitor and autoscaler
    self.assertIsNot(monitor.gcs_client, old_client)
    self.assertIsNot(monitor.autoscaler._gcs_client, old_autoscaler_client)
Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com>
| self.recreate_gcs_client() | ||
| elif self.retry_on_failure: | ||
| logger.exception( | ||
| "Monitor: AuthenticationError exception. Trying again..." |
There was a problem hiding this comment.
Nitpick: I would remove the "Monitor" prefix. As of now the only other error message is the Monitor one, but that might not be true in the future.
| ) | ||
| except AuthenticationError as e: | ||
| logger.warning(f"AuthenticationError detected, restarting autoscaler: {e}") | ||
| raise |
There was a problem hiding this comment.
All auth failures now force autoscaler exit
Medium Severity
update_autoscaling_state() now re-raises every AuthenticationError, not just WrongClusterID. This makes monitor._run() terminate for non-recoverable auth problems like invalid/missing tokens, so the autoscaler can enter repeated crash/restart cycles instead of continuing to run and report state.
There was a problem hiding this comment.
Yes, this is by design. All AuthenticationErrors (WrongClusterID, InvalidAuthToken) indicate the autoscaler can no longer communicate with GCS. Restarting is preferable to silently continuing with no state updates.
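A minimal sketch of the log-and-re-raise behaviour described above, assuming a stand-in exception class so that it runs without Ray. The names `update_state`, `main`, and `stale_client` are hypothetical and do not mirror the autoscaler's actual entry points; the exit-code mapping only illustrates how a supervisor such as Kubernetes would see the failure.

```python
import logging

logger = logging.getLogger("autoscaler_sketch")


class AuthenticationError(Exception):
    """Stand-in for ray.exceptions.AuthenticationError (assumption)."""


def update_state(report_state) -> None:
    """Call report_state(); log any authentication failure and re-raise it."""
    try:
        report_state()
    except AuthenticationError as e:
        # No string matching: every authentication failure is treated as fatal.
        logger.warning("AuthenticationError detected, restarting autoscaler: %s", e)
        raise


def main(report_state) -> int:
    """Return a process exit code: 0 on success, 1 if authentication failed."""
    try:
        update_state(report_state)
    except AuthenticationError:
        return 1  # a non-zero exit lets the container supervisor restart us
    return 0


def stale_client() -> None:
    """Simulate a GCS call made with a client from before the head restart."""
    raise AuthenticationError("WrongClusterID")


exit_code = main(stale_client)
```

The trade-off raised in the review still applies: a permanently invalid token would make such a process exit on every start, so the restart loop is visible to the operator instead of the autoscaler silently reporting nothing.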
rueian
left a comment
There was a problem hiding this comment.
LGTM. Thanks @justinyeh1995! cc @edoakes to merge 🙏
Thanks for your guidance along the way!
## Description

Previously, when the head node restarted, GCS got a new cluster ID, causing the autoscaler's existing GCS client to fail with AuthenticationError.
This fix addresses the problem by re-raising the error so that the autoscaler main process exits and Kubernetes starts a new autoscaler container.

Note that this issue arises when `spec.headGroupSpec.template.spec.restartPolicy` is unset, which defaults to Always, without `spec.autoscalerOptions.version: v2`. Other configurations are unaffected:

- `spec.autoscalerOptions.version: v2`: The KubeRay operator forces restartPolicy: Never and recreates the entire pod on head failure, so the autoscaler sidecar is always fresh.
- `spec.autoscalerOptions.version: v2` with `restartPolicy: Always`: Rejected by the KubeRay operator.

## Related issues

Fixes #60534

## Additional information

### Testing plan

- [x] Unit tests
- [x] Manual test with kuberay

Manual Testing Steps

1. Build local images. Note that this build is based on the nightly version, which is **post 2.46.0**.

```bash
docker build --progress=plain \
  --build-arg BUILD_DATE="$(date +%Y-%m-%d:%H:%M:%S)" \
  -t rayproject/autoscaling_e2e_test_image \
  -f ./python/ray/tests/kuberay/Dockerfile .
```

2. Load the image into a kind cluster

```
kind create cluster --name test-cluster-autoscaler --image kindest/node:v1.35.0
kind load docker-image rayproject/autoscaling_e2e_test_image:latest --name test-cluster-autoscaler
```

3. Install kuberay-operator

```
helm repo update
helm install kuberay-operator kuberay/kuberay-operator --version 1.5.1
```

4. Apply the manifest file for autoscaler v2

```yaml
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: test-autoscaler-fix
spec:
  rayVersion: '3.0.0'
  enableInTreeAutoscaling: true
  autoscalerOptions:
    upscalingMode: Default
    idleTimeoutSeconds: 60
    imagePullPolicy: IfNotPresent
    securityContext: {}
    env: []
    envFrom: []
    resources:
      limits:
        cpu: "500m"
        memory: "512Mi"
      requests:
        cpu: "500m"
        memory: "512Mi"
  headGroupSpec:
    rayStartParams:
      num-cpus: "0"
    template:
      spec:
        containers:
        - name: ray-head
          image: rayproject/autoscaling_e2e_test_image:latest
          imagePullPolicy: IfNotPresent
          ports:
          - containerPort: 6379
            name: gcs
          - containerPort: 8265
            name: dashboard
          - containerPort: 10001
            name: client
          resources:
            limits:
              cpu: "1"
              memory: "2Gi"
            requests:
              cpu: "1"
              memory: "2Gi"
          volumeMounts:
          - mountPath: /home/ray/samples
            name: ray-example-configmap
        volumes:
        - name: ray-example-configmap
          configMap:
            name: ray-example
            defaultMode: 0777
            items:
            - key: detached_actor.py
              path: detached_actor.py
            - key: terminate_detached_actor.py
              path: terminate_detached_actor.py
  workerGroupSpecs:
  - replicas: 0
    minReplicas: 0
    maxReplicas: 10
    groupName: small-group
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-worker
          image: rayproject/autoscaling_e2e_test_image:latest
          imagePullPolicy: IfNotPresent
          resources:
            limits:
              cpu: "1"
              memory: "1Gi"
            requests:
              cpu: "1"
              memory: "1Gi"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-example
data:
  detached_actor.py: |
    import ray
    import sys

    @ray.remote(num_cpus=1)
    class Actor:
      pass

    ray.init(namespace="default_namespace")
    Actor.options(name=sys.argv[1], lifetime="detached").remote()
  terminate_detached_actor.py: |
    import ray
    import sys

    ray.init(namespace="default_namespace")
    detached_actor = ray.get_actor(sys.argv[1])
    ray.kill(detached_actor)
```

5. Trigger OOM

```
HEAD_POD=$(kubectl get pods -l ray.io/cluster=test-autoscaler-fix,ray.io/node-type=head -o jsonpath='{.items[0].metadata.name}')
kubectl exec $HEAD_POD -c ray-head -- python -c "x = bytearray(1800 * 1024 * 1024)"
```

6. Quick sanity check

```
kubectl get pod $HEAD_POD -o jsonpath='{.status.containerStatuses[?(@.name=="ray-head")].lastState.terminated.reason}{"\n"}'
# OOMKilled
kubectl get pod "$HEAD_POD" -o jsonpath='{range .status.containerStatuses[*]}{.name}: restarts={.restartCount}{"\n"}{end}'
# autoscaler: restarts=1
# ray-head: restarts=1
```

7. Check the logs of the dead autoscaler container

```bash
# The autoscaler raised the WrongClusterID error, which propagated and caused the process to exit
kubectl logs "$HEAD_POD" -c autoscaler --previous | grep -iE "WrongCluster|restarting autoscaler|AuthenticationError"
2026-03-03 23:35:59,082 - WARNING - AuthenticationError detected, restarting autoscaler: RPC error: WrongClusterID: Perhaps the client is accessing GCS after it has restarted.. Note: RAY_AUTH_MODE is currently 'disabled' (not 'token'). Ensure that the token for the cluster is available in a local file (e.g., ~/.ray/auth_token or via RAY_AUTH_TOKEN_PATH) or as the `RAY_AUTH_TOKEN` environment variable. To generate a token for local development, use `ray get-auth-token --generate` For remote clusters, ensure that the token is propagated to all nodes of the cluster when token authentication is enabled. For more information, see: https://docs.ray.io/en/latest/ray-security/token-auth.html
2026-03-03 23:35:59,082 WARNING autoscaler.py:223 -- AuthenticationError detected, restarting autoscaler: RPC error: WrongClusterID: Perhaps the client is accessing GCS after it has restarted.. Note: RAY_AUTH_MODE is currently 'disabled' (not 'token'). Ensure that the token for the cluster is available in a local file (e.g., ~/.ray/auth_token or via RAY_AUTH_TOKEN_PATH) or as the `RAY_AUTH_TOKEN` environment variable. To generate a token for local development, use `ray get-auth-token --generate` For remote clusters, ensure that the token is propagated to all nodes of the cluster when token authentication is enabled. For more information, see: https://docs.ray.io/en/latest/ray-security/token-auth.html
ray.exceptions.AuthenticationError: RPC error: WrongClusterID: Perhaps the client is accessing GCS after it has restarted.. Note: RAY_AUTH_MODE is currently 'disabled' (not 'token'). Ensure that the token for the cluster is available in a local file (e.g., ~/.ray/auth_token or via RAY_AUTH_TOKEN_PATH) or as the `RAY_AUTH_TOKEN` environment variable. To generate a token for local development, use `ray get-auth-token --generate` For remote clusters, ensure that the token is propagated to all nodes of the cluster when token authentication is enabled. For more information, see: https://docs.ray.io/en/latest/ray-security/token-auth.html
ray.exceptions.AuthenticationError: RPC error: WrongClusterID: Perhaps the client is accessing GCS after it has restarted.. Note: RAY_AUTH_MODE is currently 'disabled' (not 'token'). Ensure that the token for the cluster is available in a local file (e.g., ~/.ray/auth_token or via RAY_AUTH_TOKEN_PATH) or as the `RAY_AUTH_TOKEN` environment variable. To generate a token for local development, use `ray get-auth-token --generate` For remote clusters, ensure that the token is propagated to all nodes of the cluster when token authentication is enabled. For more information, see: https://docs.ray.io/en/latest/ray-security/token-auth.html
ray.exceptions.AuthenticationError: RPC error: WrongClusterID: Perhaps the client is accessing GCS after it has restarted.. Note: RAY_AUTH_MODE is currently 'disabled' (not 'token'). Ensure that the token for the cluster is available in a local file (e.g., ~/.ray/auth_token or via RAY_AUTH_TOKEN_PATH) or as the `RAY_AUTH_TOKEN` environment variable. To generate a token for local development, use `ray get-auth-token --generate` For remote clusters, ensure that the token is propagated to all nodes of the cluster when token authentication is enabled. For more information, see: https://docs.ray.io/en/latest/ray-security/token-auth.html
```

8. Check the logs of the recreated autoscaler and head containers

```bash
kubectl logs "$HEAD_POD" -c autoscaler --timestamps | tail -20
2026-03-04T07:42:11.446543463Z 2026-03-03 23:42:11,446 - INFO - Calculating hashes for file mounts and ray commands.
2026-03-04T07:42:11.446555630Z 2026-03-03 23:42:11,446 INFO config.py:183 -- Calculating hashes for file mounts and ray commands.
2026-03-04T07:42:11.446685338Z 2026-03-03 23:42:11,446 - INFO - Refreshing K8s API client token and certs.
2026-03-04T07:42:11.446690213Z 2026-03-03 23:42:11,446 INFO node_provider.py:283 -- Refreshing K8s API client token and certs.
2026-03-04T07:42:11.455373047Z 2026-03-03 23:42:11,455 - INFO - Listing pods for RayCluster test-autoscaler-fix in namespace default at pods resource version >= 357171.
2026-03-04T07:42:11.455382838Z 2026-03-03 23:42:11,455 INFO cloud_provider.py:496 -- Listing pods for RayCluster test-autoscaler-fix in namespace default at pods resource version >= 357171.
2026-03-04T07:42:11.458700130Z 2026-03-03 23:42:11,458 - INFO - Fetched pod data at resource version 357821.
2026-03-04T07:42:11.458704213Z 2026-03-03 23:42:11,458 INFO cloud_provider.py:514 -- Fetched pod data at resource version 357821.
2026-03-04T07:42:16.478599382Z 2026-03-03 23:42:16,478 - INFO - Calculating hashes for file mounts and ray commands.
2026-03-04T07:42:16.478637757Z 2026-03-03 23:42:16,478 INFO config.py:183 -- Calculating hashes for file mounts and ray commands.
2026-03-04T07:42:16.487190632Z 2026-03-03 23:42:16,486 - INFO - Listing pods for RayCluster test-autoscaler-fix in namespace default at pods resource version >= 357171.
2026-03-04T07:42:16.487201591Z 2026-03-03 23:42:16,486 INFO cloud_provider.py:496 -- Listing pods for RayCluster test-autoscaler-fix in namespace default at pods resource version >= 357171.
2026-03-04T07:42:16.490396924Z 2026-03-03 23:42:16,490 - INFO - Fetched pod data at resource version 357829.
2026-03-04T07:42:16.490401799Z 2026-03-03 23:42:16,490 INFO cloud_provider.py:514 -- Fetched pod data at resource version 357829.
2026-03-04T07:42:21.521817051Z 2026-03-03 23:42:21,521 - INFO - Calculating hashes for file mounts and ray commands.
2026-03-04T07:42:21.521845968Z 2026-03-03 23:42:21,521 INFO config.py:183 -- Calculating hashes for file mounts and ray commands.
2026-03-04T07:42:21.534636551Z 2026-03-03 23:42:21,534 - INFO - Listing pods for RayCluster test-autoscaler-fix in namespace default at pods resource version >= 357171.
2026-03-04T07:42:21.534649635Z 2026-03-03 23:42:21,534 INFO cloud_provider.py:496 -- Listing pods for RayCluster test-autoscaler-fix in namespace default at pods resource version >= 357171.
2026-03-04T07:42:21.539141176Z 2026-03-03 23:42:21,539 - INFO - Fetched pod data at resource version 357839.
2026-03-04T07:42:21.539253385Z 2026-03-03 23:42:21,539 INFO cloud_provider.py:514 -- Fetched pod data at resource version 357839.
```

```bash
kubectl logs "$HEAD_POD" -c ray-head --timestamps | tail -20
2026-03-04T07:36:06.255084878Z 2026-03-03 23:36:06,253 INFO scripts.py:974 -- To add another node to this Ray cluster, run
2026-03-04T07:36:06.255085794Z 2026-03-03 23:36:06,253 INFO scripts.py:977 -- ray start --address='10.244.0.10:6379'
2026-03-04T07:36:06.255086461Z 2026-03-03 23:36:06,253 INFO scripts.py:988 -- To connect to this Ray cluster:
2026-03-04T07:36:06.255087044Z 2026-03-03 23:36:06,253 INFO scripts.py:990 -- import ray
2026-03-04T07:36:06.255087503Z 2026-03-03 23:36:06,254 INFO scripts.py:991 -- ray.init()
2026-03-04T07:36:06.255087919Z 2026-03-03 23:36:06,254 INFO scripts.py:1005 -- To submit a Ray job using the Ray Jobs CLI:
2026-03-04T07:36:06.255096086Z 2026-03-03 23:36:06,254 INFO scripts.py:1006 -- RAY_API_SERVER_ADDRESS='http://10.244.0.10:8265' ray job submit --working-dir . -- python my_script.py
2026-03-04T07:36:06.255101378Z 2026-03-03 23:36:06,254 INFO scripts.py:1015 -- See https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html
2026-03-04T07:36:06.255105753Z 2026-03-03 23:36:06,254 INFO scripts.py:1019 -- for more information on submitting Ray jobs to the Ray cluster.
2026-03-04T07:36:06.255109086Z 2026-03-03 23:36:06,254 INFO scripts.py:1024 -- To terminate the Ray runtime, run
2026-03-04T07:36:06.255110211Z 2026-03-03 23:36:06,254 INFO scripts.py:1025 -- ray stop
2026-03-04T07:36:06.255110669Z 2026-03-03 23:36:06,254 INFO scripts.py:1028 -- To view the status of the cluster, use
2026-03-04T07:36:06.255111128Z 2026-03-03 23:36:06,254 INFO scripts.py:1029 -- ray status
2026-03-04T07:36:06.255111586Z 2026-03-03 23:36:06,254 INFO scripts.py:1033 -- To monitor and debug Ray, view the dashboard at
2026-03-04T07:36:06.255112003Z 2026-03-03 23:36:06,254 INFO scripts.py:1034 -- 10.244.0.10:8265
2026-03-04T07:36:06.255117419Z 2026-03-03 23:36:06,254 INFO scripts.py:1041 -- If connection to the dashboard fails, check your firewall settings and network configuration.
2026-03-04T07:36:06.255119711Z 2026-03-03 23:36:06,254 INFO scripts.py:1147 -- --block
2026-03-04T07:36:06.255122544Z 2026-03-03 23:36:06,254 INFO scripts.py:1148 -- This command will now block forever until terminated by a signal.
2026-03-04T07:36:06.255139253Z 2026-03-03 23:36:06,254 INFO scripts.py:1151 -- Running subprocesses are monitored and a message will be printed if any of them terminate unexpectedly. Subprocesses exit with SIGTERM will be treated as graceful, thus NOT reported.
2026-03-04T07:36:06.255141753Z 2026-03-03 23:36:06,254 INFO scripts.py:1156 -- Process exit logs will be saved to: /tmp/ray/session_2026-03-03_23-35-58_976470_1/logs/ray_process_exit.log
```

---------

Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com>


Description
Previously, when the head node restarted, GCS got a new cluster ID, causing the autoscaler's existing GCS client to fail with AuthenticationError.
This fix addresses the problem by re-raising the error so that the autoscaler main process exits and Kubernetes starts a new autoscaler container.
Note that this issue arises when spec.headGroupSpec.template.spec.restartPolicy is unset, which defaults to Always, without spec.autoscalerOptions.version: v2. Other configurations are unaffected:
- spec.autoscalerOptions.version: v2: The KubeRay operator forces restartPolicy: Never and recreates the entire pod on head failure, so the autoscaler sidecar is always fresh.
- spec.autoscalerOptions.version: v2 with restartPolicy: Always: Rejected by the KubeRay operator.
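The configuration cases listed above can be sketched as a small lookup. This is an illustration of the description only, not KubeRay operator code: the function name `classify` and its return labels are hypothetical, and treating an explicit `Always` like the unset default is an assumption.

```python
from typing import Optional


def classify(restart_policy: Optional[str], autoscaler_version: Optional[str]) -> str:
    """Map a head restartPolicy / autoscaler version pair to the case described above.

    restart_policy is None when spec.headGroupSpec.template.spec.restartPolicy is unset.
    """
    if autoscaler_version == "v2":
        if restart_policy == "Always":
            return "rejected"  # the operator refuses this combination
        return "unaffected"  # the operator forces Never and recreates the pod
    # Without v2, an unset policy defaults to Always.
    # Assumption: an explicit "Always" behaves the same as the default.
    if restart_policy in (None, "Always"):
        return "affected"
    return "unspecified"  # the description does not cover this case


cases = {
    "default, no v2": classify(None, None),
    "v2": classify(None, "v2"),
    "v2 + Always": classify("Always", "v2"),
}
```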
Related issues
Fixes #60534
Additional information
Testing plan
Manual Testing Steps
docker build --progress=plain \
  --build-arg BUILD_DATE="$(date +%Y-%m-%d:%H:%M:%S)" \
  -t rayproject/autoscaling_e2e_test_image \
  -f ./python/ray/tests/kuberay/Dockerfile .