[rollout] feat: introduce vLLM AsyncLLM to support multi-turn rollout #1138

Merged: 10 commits on Apr 25, 2025

Conversation

@wuxibin89 (Collaborator) commented Apr 17, 2025

Summary

Introduce vLLM AsyncLLM to support multi-turn rollout; addresses #385, #398, #710.

Architecture

[Figure: async_llm_arch (architecture diagram)]

New Components:

  • AsyncLLMWorker: standalone vLLM server instance

    • FastAPI: provides an OpenAI-compatible HTTP server (see the client sketch after this list)
    • AsyncLLM: async LLMEngine for online serving; for more details see AsyncLLM, LLMEngine
    • ExternalRayDistributedExecutor: custom executor backend that manages the workers in the worker group; it grabs the corresponding workers by actor name
  • AsyncLLMManager: manages a group of vLLM server instances (AsyncLLMWorker)

    • AsyncLLM lifecycle: initialization, wake_up, sleep
    • FastAPI service discovery
  • ChatScheduler: schedules multiple chat completion requests across multiple server instances

    • Least-requests load balancing
    • Sticky session with prefix caching
    • Chat completion callback: tool calling
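
For readers who want to poke at a server instance directly, here is a minimal client sketch against the OpenAI-compatible endpoint; the address, port, and model name are placeholders, since in practice the server addresses are discovered through AsyncLLMManager rather than hard-coded:

# Minimal sketch of calling the OpenAI-compatible endpoint exposed by AsyncLLMWorker.
# The base_url and model name below are illustrative assumptions only.
import asyncio

from openai import AsyncOpenAI


async def main():
    client = AsyncOpenAI(base_url="http://127.0.0.1:8000/v1", api_key="EMPTY")
    completion = await client.chat.completions.create(
        model="Qwen/Qwen2-7B-Instruct",
        messages=[{"role": "user", "content": "Hello, who are you?"}],
    )
    print(completion.choices[0].message.content)


asyncio.run(main())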

TODO

  • AsyncLLM: initialization/wake_up/sleep
  • OpenAI API: support /v1/chat/completions
  • RayPPOTrainer integration: replace generate_sequences with an HTTP call to /v1/chat/completions
  • GSM8K e2e training
  • Add documentation

@vermouth1992 (Collaborator)

Some comments:

  1. Is there any way to make the inference replicas colocate with training?
  2. Not only colocated, but also in the same process
  3. If colocating is not possible, we have to implement one-step off-policy RL in a new trainer

@vermouth1992 (Collaborator)

The ideal case is that the construction of the vLLM worker and the vLLM single controller can be separated, and we have a handle to the worker so that it is built in the training process. Then we point those handles to the vLLM single controller, and the vLLM single controller can be colocated with the RL single controller.

@wuxibin89 (Collaborator, Author) commented Apr 17, 2025

Some comments:

  1. Is there any way to make the inference replicas colocate with training?
  2. Not only colocated, but also in the same process
  3. If colocating is not possible, we have to implement one-step off-policy RL in a new trainer

In the architecture above, AsyncLLMWorker is a CPU-only actor that contains only the Scheduler and Executor; the GPU model runner is still colocated with the FSDP model in the same process.

It's very complicated to colocate AsyncLLMWorker with the single controller or the training process, since AsyncLLM's architecture is a combination of multiprocessing and an asyncio busy loop; see vllm-project/vllm#9826.
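
To make the named-actor trick concrete, here is a rough sketch (not the actual ExternalRayDistributedExecutor) of how a CPU-only executor can drive GPU workers that were created elsewhere as named Ray actors; the actor names follow the pattern visible in the logs later in this thread and are purely illustrative:

import ray


class NamedActorExecutor:
    """Toy executor that reuses pre-existing, colocated workers by actor name."""

    def __init__(self, actor_names, namespace=None):
        # Look up handles to workers created by the trainer's worker group,
        # instead of spawning new GPU processes.
        self.workers = [ray.get_actor(name, namespace=namespace) for name in actor_names]

    def collective_rpc(self, method, *args, **kwargs):
        # Fan the call out to every worker and wait for all results.
        refs = [w.execute_method.remote(method, *args, **kwargs) for w in self.workers]
        return ray.get(refs)


# e.g. executor = NamedActorExecutor(["BL14znWorkerDict_0:0", "BL14znWorkerDict_0:1"])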

@vermouth1992 (Collaborator)

Some comments:

  1. Is there any way to make the inference replicas colocate with training?
  2. Not only colocated, but also in the same process
  3. If colocating is not possible, we have to implement one-step off-policy RL in a new trainer

In the architecture above, AsyncLLMWorker is a CPU-only actor that contains only the Scheduler and Executor; the GPU model runner is still colocated with the FSDP model in the same process.

It's very complicated to colocate AsyncLLMWorker with the single controller or the training process, since AsyncLLM's architecture is a combination of multiprocessing and an asyncio busy loop; see vllm-project/vllm#9826.

Fantastic! I guess this is exactly what I expected!

@youkaichao

this is soooo amazing, I didn't imagine you could reuse existing Ray actors and let them be controlled by the vLLM executor 👍

@casper-hansen (Contributor) commented Apr 18, 2025

I was asked to review here based on #899 and https://github.com/casper-hansen/verl/tree/feature/interleaved-tool-calling.

EDIT: I just read your ReTool paper. I am trying to achieve exactly the same thing, just with search rather than an interpreter. Let me know if you need more clarification on my questions; my main concern is the data needed to achieve this behavior.

My understanding is that the new async rollout is a great abstraction. This may achieve 3x higher throughput as described in the paper for Seed-Thinking-v1.5 if you decouple model evolution from runtime execution, but at least in the current implementation, I see less flexibility for specific inference methods. So I have some clarifying questions.

Questions:

  1. If a user wants to achieve interleaved function calling (like OpenAI o3, Z1-Rumination, etc.), how do you see the path to achieving this?

    • Interleaved function calling can be understood as (1) the model generating token by token, (2) a certain stop word like </tool> triggering an external function, and (3) the function returning information to be included in the current generation. Current implementation here.
    • Multi-turn rollouts can achieve the same functionality as interleaved function calling, because it is semantically the same and just a question of the chat template formatting the inputs. However, there may be other requirements to make this work.
  2. If question 1 is not feasible with the new async implementation, what would the implementation look like for self-instructed tool calling, like you would get when training a reasoner to do interleaved tool calling?

    • Is this achievable purely based on system prompting, a user question, and rule-based verification to reward certain tool calling behavior? Can we avoid having to generate synthetic data of the assistant-tool loop (see example below)?
    • A set of turns achieving this would be something like [{role: system}, {role: user}, {role: assistant}, {role: tool}, {role: assistant}, {role: tool}, ... (continuing with assistant/tool loop until done) ]

@wuxibin89 (Collaborator, Author) commented Apr 18, 2025

@casper-hansen For Question 1, I think we can achieve interleaved function calling with multi-turn rollouts:

  1. Send a request [{role: user}] to the model, ask it to stop generation at <tool>...</tool>, and return [{role: user}, {role: assistant}]
  2. The client calls external tools, then sends a new request with the tool result: [{role: user}, {role: assistant}, {role: tool}]
  3. Repeat steps 1 and 2 until generation is done

We can achieve this in chat_completion callback:
https://github.com/volcengine/verl/pull/1138/files#diff-82206035fbba7264fcb965dbe70535bfccbc1a81479aed61fe885489d35a952bR33-R38

Also, maybe we can call the tool on the server side (see vLLM tool_calling), but I haven't investigated it yet.
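
A minimal client-side sketch of the loop in steps 1-3 above; the <tool> stop marker, the run_tool helper, and the message roles are illustrative assumptions, not part of this PR:

from openai import AsyncOpenAI


async def interleaved_rollout(client: AsyncOpenAI, model: str, messages: list, run_tool, max_turns: int = 8):
    for _ in range(max_turns):
        # Step 1: generate, pausing as soon as the model closes a tool call.
        completion = await client.chat.completions.create(
            model=model,
            messages=messages,
            stop=["</tool>"],
        )
        content = completion.choices[0].message.content or ""
        messages.append({"role": "assistant", "content": content})
        if "<tool>" not in content:
            break  # no tool call, so this turn is the final answer
        # Step 2: run the external tool and feed its output back as a new turn.
        tool_result = run_tool(content.split("<tool>", 1)[1])
        messages.append({"role": "tool", "content": tool_result})
    # Step 3: the loop repeats until the model answers without calling a tool.
    return messages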

@casper-hansen (Contributor)

  1. Send a request [{role: user}] to the model, ask it to stop generation at <tool>...</tool>, and return [{role: user}, {role: assistant}]
  2. The client calls external tools, then sends a new request with the tool result: [{role: user}, {role: assistant}, {role: tool}]

This sounds feasible. I just looked at the naive scheduler and I like the design. I can already see how to port my work on interleaved tool calling, so I do expect to bring that to both the sync and async implementations.

@wuxibin89 I do have a general request for this PR. It would really help me test this more easily if you could create the following. The reason I ask is that veRL can sometimes take 5-10 minutes to initialize.

  • A test that can independently launch AsyncLLM with some multi-turn sample (e.g. does it run, can it do multi-turn), or a pointer to how to initialize it without the overhead of initializing veRL.

@wuxibin89 (Collaborator, Author)

@casper-hansen I added a small multi-turn rollout test case; you can run it with:

python3 tests/rollout/test_vllm_multi_turn.py

If everything is OK, you should see model outputs like the ones below:

[round=0] role: assistant, content: Sure, I'd be happy to play along! Hello, my name is Bob and my favorite color is indeed red. How can I assist you today? Would you like to embark on an adventure or perhaps solve a mystery together?
(async_llm_worker_2 AsyncLLMWorker pid=235879, ip=[2605:340:cd51:5d00:9f11:18e0:a5f1:de3b]) INFO:     2605:340:cd51:5d00:9f11:18e0:a5f1:de3b:34942 - "POST /v1/chat/completions HTTP/1.1" 200 OK
[round=1] role: assistant, content: As Bob, my name has been established in our previous conversation. I'm here to help you with any questions or scenarios you might have in mind. Just let me know how I can assist you further!
(async_llm_worker_2 AsyncLLMWorker pid=235879, ip=[2605:340:cd51:5d00:9f11:18e0:a5f1:de3b]) INFO:     2605:340:cd51:5d00:9f11:18e0:a5f1:de3b:34954 - "POST /v1/chat/completions HTTP/1.1" 200 OK
[round=2] role: assistant, content: As Bob, my favorite color is red. This preference could influence decisions or preferences in various scenarios, such as choosing items or colors for an adventure setting or discussing color-related topics. How can we incorporate this information into our role-playing game?
Done!
(async_llm_worker_2 AsyncLLMWorker pid=235879, ip=[2605:340:cd51:5d00:9f11:18e0:a5f1:de3b]) INFO:     2605:340:cd51:5d00:9f11:18e0:a5f1:de3b:34956 - "POST /v1/chat/completions HTTP/1.1" 200 OK

@casper-hansen (Contributor)

Looks good to me! Let's get this merged.

@wuxibin89 force-pushed the wuxibin/async_vllm branch from 736ca1f to da4222b on April 19, 2025 at 15:47
@KawaiiNotHawaii

I switched to your branch and kicked off a run with the naive reward_manager, but it ended with an error:

ValueError: Failed to look up actor with name 'svWrqkWorkerDict_0:0'. This could because 1. You are trying to look up a named actor you didn't create. 2. The named actor died. 3. You did not use a namespace matching the namespace of the actor.

Any quick fix?

@wuxibin89 (Collaborator, Author)

I switched to your branch and kicked off a run with the naive reward_manager, but it ended with an error:

ValueError: Failed to look up actor with name 'svWrqkWorkerDict_0:0'. This could because 1. You are trying to look up a named actor you didn't create. 2. The named actor died. 3. You did not use a namespace matching the namespace of the actor.

Any quick fix?

@KawaiiNotHawaii Maybe the actor svWrqkWorkerDict_0:0 died unexpectedly; please check the actor log in the Ray dashboard or /tmp/ray/session_latest/logs/

@mingruimingrui (Contributor) commented Apr 21, 2025

Noob question: will we have query locality to make use of prefix caching?
edit: Not sure if "query locality" is a real term, but you get the idea

@wuxibin89 force-pushed the wuxibin/async_vllm branch from 391b912 to 6357b88 on April 21, 2025 at 12:17
@wuxibin89 (Collaborator, Author)

Noob question: will we have query locality to make use of prefix caching? edit: Not sure if "query locality" is a real term, but you get the idea

Yes, ChatScheduler sticks a multi-turn rollout session to a specific server by request_id. ChatScheduler automatically generates the request_id in the first turn, and the user should pass extra_headers={"x-request-id": completions.id} in the following turns.
https://github.com/volcengine/verl/pull/1138/files#diff-0897f82aa48563a3aa2d1eb1844599727bde0ab1dd7de27e92b9c36a41be79baR72-R85
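
A small sketch of that convention with a plain AsyncOpenAI client; the base_url and model are placeholders:

from openai import AsyncOpenAI


async def sticky_two_turns(base_url: str, model: str):
    client = AsyncOpenAI(base_url=base_url, api_key="EMPTY")
    messages = [{"role": "user", "content": "Your name is Bob and your favorite color is red."}]

    # First turn: the scheduler assigns a request_id (returned as the completion id).
    first = await client.chat.completions.create(model=model, messages=messages)
    messages.append({"role": "assistant", "content": first.choices[0].message.content})
    messages.append({"role": "user", "content": "What is your favorite color?"})

    # Follow-up turn: echo the id back so the request lands on the same server
    # instance and can reuse its prefix cache.
    second = await client.chat.completions.create(
        model=model,
        messages=messages,
        extra_headers={"x-request-id": first.id},
    )
    return second.choices[0].message.content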

@richardliaw

Hi @wuxibin89, this is awesome to see; let me know if you need any support from the Ray side for this PR.

@casper-hansen (Contributor) commented Apr 22, 2025

EDIT: I realise this is a TODO (# TODO: completions => DataProto). @wuxibin89, when you do create the DataProto, please keep in mind that we need the option to build it during the callback.

@wuxibin89 I took a look again this morning and I am wondering how we can handle the attention mask. For example, when tool calling, the information the tool returns should be masked as zeros in the attention mask for improved convergence.

@mertunsall (Contributor)

It would be awesome if we could write a quickstart on how to set up a model for interleaved code execution (perhaps in a dummy environment) - this is the future!

@wuxibin89 (Collaborator, Author)

EDIT: I realise this is a TODO (# TODO: completions => DataProto). @wuxibin89, when you do create the DataProto, please keep in mind that we need the option to build it during the callback.

@wuxibin89 I took a look again this morning and I am wondering how we can handle the attention mask. For example, when tool calling, the information the tool returns should be masked as zeros in the attention mask for improved convergence.

@casper-hansen We should postprocess the completions and convert them to DataProto; in the postprocessing step we can mask out the tool-calling tokens.
https://github.com/volcengine/verl/pull/1138/files#diff-82206035fbba7264fcb965dbe70535bfccbc1a81479aed61fe885489d35a952bR128-R130
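
A sketch of what that masking could look like in the postprocess step, assuming we track the (start, end) token spans of tool output inside each response; the function name and span bookkeeping are illustrative, and whether the zeros end up in the attention mask or a separate loss mask is left to the training code:

import torch


def build_response_mask(response_len, tool_spans):
    # 1 for tokens the policy generated, 0 for tokens injected by tool calls.
    mask = torch.ones(response_len, dtype=torch.long)
    for start, end in tool_spans:  # token indices of tool output, end exclusive
        mask[start:end] = 0
    return mask


# e.g. a 32-token response whose tokens 10..20 came back from a tool
response_mask = build_response_mask(32, [(10, 20)])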

@KawaiiNotHawaii commented Apr 22, 2025

It seems the OpenAI API's retry logic causes the error: Request id xxx already running.
I got rid of the retry logic, and thus of the aforementioned error, by setting max retries to 0 and a longer generation timeout through the AsyncOpenAI args.
But I still get some completions returned as None, and this only happens in the second batch, while the first batch runs steadily without any None completions. Can anyone help?
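
For reference, the client settings described above would look roughly like this with the standard openai-python client; the address and timeout value are just examples:

from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="http://127.0.0.1:8000/v1",  # placeholder address
    api_key="EMPTY",
    max_retries=0,    # avoid resubmitting an in-flight request id
    timeout=1800.0,   # seconds; give long generations time to finish
)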

@scris commented Apr 28, 2025

Do we have any examples for reference on this yet? Thanks.

@wuxibin89 (Collaborator, Author)

Do we have any examples for reference on this yet? Thanks.

@scris I added a demo chat scheduler for the ReTool paper in #1297. Note it's for reference only; for e2e training, there are more issues to address, such as the tool-calling token mask.

ScottCTD pushed a commit to ScottCTD/verl that referenced this pull request May 5, 2025
@dawson-chen commented May 8, 2025

@wuxibin89
I added a for loop in tests/rollout/test_vllm_multi_turn.py, but during execution, I encountered an “actor dead” error. Below is the specific error message:

(AsyncvLLMServer pid=584371) instance_id: fd2516db-f46f-4370-8881-648b2a0899b8:6wB9ka:4:3 intializes with external actors: ['6wB9kaWorkerDict_0:6', '6wB9kaWorkerDict_0:7']
(AsyncvLLMServer pid=584371) instance_id: fd2516db-f46f-4370-8881-648b2a0899b8:6wB9ka:4:3 intializes finished.
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390] EngineCore hit an exception: Traceback (most recent call last):
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 383, in run_engine_core
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     engine_core.run_busy_loop()
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 405, in run_busy_loop
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     self._process_engine_step()
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 434, in _process_engine_step
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     outputs = self.step_fn()
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]               ^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 206, in step
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     output = self.model_executor.execute_model(scheduler_output)
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/executor/abstract.py", line 77, in execute_model
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     output = self.collective_rpc("execute_model",
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/apdcephfs_nj3/share_303326228/chriszxiong/refuge/daoyi/verl/verl/workers/rollout/vllm_rollout/vllm_async_server.py", line 97, in collective_rpc
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     outputs = ray.get([worker.execute_method.remote(sent_method, *args, **(kwargs or {})) for worker in self.workers])
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     return fn(*args, **kwargs)
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]            ^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     return func(*args, **kwargs)
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]            ^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py", line 2771, in get
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py", line 921, in get_objects
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     raise value
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390] ray.exceptions.ActorDiedError: The actor died unexpectedly before finishing this task.
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         class_name: create_colocated_worker_cls.<locals>.WorkerDict
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         actor_id: 4147152ba99be6826521490896000000
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         pid: 582583
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         name: 6wB9kaWorkerDict_0:6
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         namespace: fd2516db-f46f-4370-8881-648b2a0899b8
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         ip: 29.76.21.254
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390] The actor is dead because all references to the actor were removed.
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390] 
(WorkerDict pid=582583) [DP=3,TP=0] execute_method: wake_up [repeated 3x across cluster]
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffff5de9d8b633f6b30b74e8a68496000000 Worker ID: 226f493dbca5115045f0cf93e8fd587854383c223b0052d1389f2ec7 Node ID: aca6253a4b6bbed7da56f99bf1be18458e85bfee85404f2cc57b26d6 Worker IP address: 29.76.21.254 Worker port: 10992 Worker PID: 584371 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.

Code:

for _ in range(100):
    raw_prompts = [
        [
            {
                "role": "user",
                "content": "Let's play a role playing game. Your name is Alice, your favorite color is blue.",
            }
        ],
        [{"role": "user", "content": "Let's play a role playing game. Your name is Bob, your favorite color is red."}],
    ]
    batch = DataProto(
        non_tensor_batch={
            "raw_prompt": np.array(raw_prompts),
        },
    )
    result = async_rollout_manager.generate_sequences(prompts=batch)
    seq_len = result.batch["prompts"].size(1) + result.batch["responses"].size(1)
    assert len(result) == 2
    assert result.batch["input_ids"].size(1) == seq_len
    assert result.batch["attention_mask"].size(1) == seq_len
    assert result.batch["position_ids"].size(1) == seq_len

Could you provide some debugging suggestions?

@wuxibin89 (Collaborator, Author)

@dawson-chen

The actor is dead because all references to the actor were removed

It seems that the worker_group is not being held, which causes the Ray actors to be garbage collected; please post the full test function.

@dawson-chen commented May 8, 2025

@dawson-chen

The actor is dead because all references to the actor were removed

It seems that the worker_group is not being held, which causes the Ray actors to be garbage collected; please post the full test function.

Hello @wuxibin89
I ran the test code on a machine with 8x H20 GPUs.

Here is the full test code.

# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
from typing import Any, Dict

import numpy as np
import ray
from omegaconf import DictConfig, OmegaConf
from openai.types.chat.chat_completion import ChatCompletion
from vllm.entrypoints.openai.protocol import ChatCompletionRequest, ChatCompletionResponse, ChatCompletionStreamResponse, ErrorResponse

from tests.rollout.async_rollout_utils import init_async_rollout_manager
from verl.protocol import DataProto


def init_config() -> DictConfig:
    config = OmegaConf.load("verl/trainer/config/ppo_trainer.yaml")
    model_path = "Qwen/Qwen2-7B-Instruct"
    config.actor_rollout_ref.model.path = model_path
    config.actor_rollout_ref.rollout.mode = "async"
    config.actor_rollout_ref.rollout.chat_scheduler = "examples.ppo_trainer.naive_chat_scheduler.NaiveChatCompletionScheduler"
    config.actor_rollout_ref.rollout.prompt_length = 4096
    config.actor_rollout_ref.rollout.response_length = 4096
    config.actor_rollout_ref.rollout.disable_log_stats = False
    # test sleep/wake_up with fsdp offload
    config.actor_rollout_ref.actor.fsdp_config.param_offload = True
    config.actor_rollout_ref.actor.fsdp_config.optimizer_offload = True

    return config


def test_vllm_multi_turn(config):
    ray.init(
        runtime_env={
            "env_vars": {
                "TOKENIZERS_PARALLELISM": "true",
                "NCCL_DEBUG": "WARN",
                "VLLM_LOGGING_LEVEL": "WARN",
                "VLLM_USE_V1": "1",
            }
        }
    )

    # =========================== 1. Init rollout manager ===========================
    model_name = "/".join(config.actor_rollout_ref.model.path.split("/")[-2:])
    worker_groups, async_rollout_manager = init_async_rollout_manager(config)

    # test sleep and wake_up
    async_rollout_manager.sleep()
    async_rollout_manager.wake_up()

    async_chat_scheduler = async_rollout_manager.chat_scheduler

    # =========================== 2. Multi turn rollout  ===========================
    async def callback(completions: ChatCompletion, info: Dict[str, Any], exception: Exception):
        assert exception is None, f"exception: {exception}"
        messages, round = info["messages"], info["round"]
        message = completions.choices[0].message
        messages.append({"role": message.role, "content": message.content})
        print(f"[round={round}] role: {message.role}, content: {message.content}")

        extra_headers = {"x-request-id": completions.id}
        if round == 0:
            messages.append({"role": "user", "content": "What is your name?"})
            await async_chat_scheduler.submit_chat_completions(
                callback=callback,
                callback_additional_info={"messages": messages, "round": 1},
                model=model_name,
                messages=messages,
                extra_headers=extra_headers,
            )
        elif round == 1:
            messages.append({"role": "user", "content": "What is your favorite color?"})
            await async_chat_scheduler.submit_chat_completions(
                callback=callback,
                callback_additional_info={"messages": messages, "round": 2},
                model=model_name,
                messages=messages,
                extra_headers=extra_headers,
            )
        else:
            print("Done!")

    messages = [{"role": "user", "content": "Let's play a role playing game. Your name is Bob, your favorite color is red."}]
    async_rollout_manager.submit_chat_completions(
        callback=callback,
        callback_additional_info={"messages": messages, "round": 0},
        model=model_name,
        messages=messages,
    )
    assert len(messages) == 6
    for round, message in enumerate(messages):
        if round % 2 == 0:
            assert message["role"] == "user"
        else:
            assert message["role"] == "assistant"

    # =========================== 3. Generate sequences  ===========================
    for i in range(100):
        print(f'Round {i}')
        raw_prompts = [
            [{"role": "user", "content": "Let's play a role playing game. Your name is Alice, your favorite color is blue."}],
            [{"role": "user", "content": "Let's play a role playing game. Your name is Bob, your favorite color is red."}],
            [{"role": "user", "content": "Let's play a role playing game. Your name is Bob, your favorite color is yellow."}],
            [{"role": "user", "content": "Let's play a role playing game. Your name is Bob, your favorite color is grey."}],
        ]
        batch = DataProto(
            non_tensor_batch={
                "raw_prompt": np.array(raw_prompts),
            },
        )
        result = async_rollout_manager.generate_sequences(prompts=batch)
        seq_len = result.batch["prompts"].size(1) + result.batch["responses"].size(1)
        assert len(result) == 4
        assert result.batch["input_ids"].size(1) == seq_len
        assert result.batch["attention_mask"].size(1) == seq_len
        assert result.batch["position_ids"].size(1) == seq_len

    ray.shutdown()


async def test_vllm_streaming_response(config):
    ray.init(
        runtime_env={
            "env_vars": {
                "TOKENIZERS_PARALLELISM": "true",
                "NCCL_DEBUG": "WARN",
                "VLLM_LOGGING_LEVEL": "WARN",
                "VLLM_USE_V1": "1",
            }
        }
    )

    model_name = "/".join(config.actor_rollout_ref.model.path.split("/")[-2:])
    worker_groups, async_rollout_manager = init_async_rollout_manager(config)
    async_llm_server = async_rollout_manager.async_llm_servers[0]

    # non-streaming request
    request = ChatCompletionRequest(
        model=model_name,
        messages=[{"role": "user", "content": "What is your name?"}],
        stream=False,
    )
    generator = async_llm_server.chat_completion_generator.remote(request)
    async for ref in generator:
        status_code, data = await ref
        print(f">>>> status_code: {status_code}, {data}")
        data = data[len("data: ") :].rstrip()
        if status_code != 200:
            response = ErrorResponse(**json.loads(data))
        else:
            response = ChatCompletionResponse(**json.loads(data))
            assert response.choices[0].message.role == "assistant"
            assert response.choices[0].message.content is not None

    # streaming request
    request = ChatCompletionRequest(
        model=model_name,
        messages=[{"role": "user", "content": "How are you?"}],
        stream=True,
    )
    generator = async_llm_server.chat_completion_generator.remote(request)
    async for ref in generator:
        status_code, data = await ref
        print(f">>>> status_code: {status_code}, {data}")
        data = data[len("data: ") :].rstrip()
        if status_code != 200:
            response = ErrorResponse(**json.loads(data))
        elif data == "[DONE]":
            break
        else:
            response = ChatCompletionStreamResponse(**json.loads(data))
            assert response.choices[0].delta.role is None or response.choices[0].delta.role == "assistant"
            assert response.choices[0].delta.content is not None

    ray.shutdown()


if __name__ == "__main__":
    config = init_config()
    test_vllm_multi_turn(config)
    asyncio.run(test_vllm_streaming_response(config))

Here is the complete log output.

INFO 05-08 15:28:12 [__init__.py:239] Automatically detected platform cuda.
2025-05-08 15:28:15,165 INFO worker.py:1654 -- Connecting to existing Ray cluster at address: 29.76.21.254:6379...
2025-05-08 15:28:15,181 INFO worker.py:1832 -- Connected to Ray cluster. View the dashboard at http://29.76.21.254:8080 
2025-05-08 15:28:15,199 WARNING __init__.py:161 -- DeprecationWarning: `ray.state.available_resources_per_node` is a private attribute and access will be removed in a future Ray version.
colocated worker base class <class 'verl.single_controller.base.worker.Worker'>
bind role actor_rollout method execute_method to class <class 'verl.single_controller.ray.base.create_colocated_worker_cls.<locals>.WorkerDict'>
WARNING:2025-05-08 15:28:16,157:Waiting for register center actor BL14zn_register_center to be ready. Elapsed time: 0 seconds out of 300 seconds.
(WorkerDict pid=727950) Flash Attention 2.0 only supports torch.float16 and torch.bfloat16 dtypes, but the current dype in Qwen2ForCausalLM is torch.float32. You should run training or inference using Automatic Mixed-Precision via the `with torch.autocast(device_type='torch_device'):` decorator, or load the model with the `torch_dtype` argument. Example: `model = AutoModel.from_pretrained("openai/whisper-tiny", attn_implementation="flash_attention_2", torch_dtype=torch.float16)`
(WorkerDict pid=727950) You are attempting to use Flash Attention 2.0 with a model not initialized on GPU. Make sure to move the model to GPU after initializing it on CPU with `model.to('cuda')`.
Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]
(WorkerDict pid=727504) Model config after override: Qwen2Config {
(WorkerDict pid=727504)   "architectures": [
(WorkerDict pid=727504)     "Qwen2ForCausalLM"
(WorkerDict pid=727504)   ],
(WorkerDict pid=727504)   "attention_dropout": 0.0,
(WorkerDict pid=727504)   "eos_token_id": 151645,
(WorkerDict pid=727504)   "hidden_act": "silu",
(WorkerDict pid=727504)   "hidden_size": 3584,
(WorkerDict pid=727504)   "initializer_range": 0.02,
(WorkerDict pid=727504)   "intermediate_size": 18944,
(WorkerDict pid=727504)   "max_position_embeddings": 32768,
(WorkerDict pid=727504)   "max_window_layers": 28,
(WorkerDict pid=727504)   "model_type": "qwen2",
(WorkerDict pid=727504)   "num_attention_heads": 28,
(WorkerDict pid=727504)   "num_hidden_layers": 28,
(WorkerDict pid=727504)   "num_key_value_heads": 4,
(WorkerDict pid=727504)   "pad_token_id": 151643,
(WorkerDict pid=727504)   "rms_norm_eps": 1e-06,
(WorkerDict pid=727504)   "rope_scaling": null,
(WorkerDict pid=727504)   "rope_theta": 1000000.0,
(WorkerDict pid=727504)   "sliding_window": 131072,
(WorkerDict pid=727504)   "tie_word_embeddings": false,
(WorkerDict pid=727504)   "torch_dtype": "bfloat16",
(WorkerDict pid=727504)   "transformers_version": "4.51.3",
(WorkerDict pid=727504)   "use_cache": true,
(WorkerDict pid=727504)   "use_sliding_window": false,
(WorkerDict pid=727504)   "vocab_size": 152064
(WorkerDict pid=727504) }
(WorkerDict pid=727504) 
Loading checkpoint shards:  25%|██▌       | 1/4 [00:03<00:11,  3.73s/it]
(WorkerDict pid=727951) Flash Attention 2.0 only supports torch.float16 and torch.bfloat16 dtypes, but the current dype in Qwen2ForCausalLM is torch.float32. You should run training or inference using Automatic Mixed-Precision via the `with torch.autocast(device_type='torch_device'):` decorator, or load the model with the `torch_dtype` argument. Example: `model = AutoModel.from_pretrained("openai/whisper-tiny", attn_implementation="flash_attention_2", torch_dtype=torch.float16)` [repeated 7x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)
(WorkerDict pid=727951) You are attempting to use Flash Attention 2.0 with a model not initialized on GPU. Make sure to move the model to GPU after initializing it on CPU with `model.to('cuda')`. [repeated 7x across cluster]
Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s] [repeated 7x across cluster]
Loading checkpoint shards:  75%|███████▌  | 3/4 [00:09<00:03,  3.08s/it] [repeated 16x across cluster]
Loading checkpoint shards: 100%|██████████| 4/4 [00:12<00:00,  3.11s/it]
(WorkerDict pid=727952) [rank4]:[W508 15:28:47.190073948 ProcessGroupNCCL.cpp:4561] [PG ID 0 PG GUID 0 Rank 4]  using GPU 0 to perform barrier as devices used by this process are currently unknown. This can potentially cause a hang if this rank to GPU mapping is incorrect. Specify device_ids in barrier() to force use of a particular device, or call init_process_group() with a device_id.
(WorkerDict pid=727504) NCCL version 2.21.5+cuda12.4
Loading checkpoint shards:  75%|███████▌  | 3/4 [00:10<00:03,  3.62s/it] [repeated 7x across cluster]
(WorkerDict pid=727504) Qwen2ForCausalLM contains 7.62B parameters
(WorkerDict pid=727504) wrap_policy: functools.partial(<function _or_policy at 0x7f55a14922a0>, policies=[functools.partial(<function transformer_auto_wrap_policy at 0x7f55a1492160>, transformer_layer_cls={<class 'transformers.models.qwen2.modeling_qwen2.Qwen2DecoderLayer'>})])
(WorkerDict pid=727950) Total steps: -1, num_warmup_steps: 0
(WorkerDict pid=727950) Actor use_remove_padding=False
(WorkerDict pid=727956) wrap_policy: functools.partial(<function _or_policy at 0x7fb0aeeda2a0>, policies=[functools.partial(<function transformer_auto_wrap_policy at 0x7fb0aeeda160>, transformer_layer_cls={<class 'transformers.models.qwen2.modeling_qwen2.Qwen2DecoderLayer'>})]) [repeated 7x across cluster]
(WorkerDict pid=727951) /usr/local/lib64/python3.11/site-packages/torch/distributed/fsdp/fully_sharded_data_parallel.py:690: FutureWarning: FSDP.state_dict_type() and FSDP.set_state_dict_type() are being deprecated. Please use APIs, get_state_dict() and set_state_dict(), which can support different parallelisms, FSDP1, FSDP2, DDP. API doc: https://pytorch.org/docs/stable/distributed.checkpoint.html#torch.distributed.checkpoint.state_dict.get_state_dict .Tutorial: https://pytorch.org/tutorials/recipes/distributed_checkpoint_recipe.html .
(WorkerDict pid=727951)   warnings.warn(
Loading checkpoint shards: 100%|██████████| 4/4 [00:14<00:00,  3.52s/it] [repeated 7x across cluster]
(WorkerDict pid=727956) [rank7]:[W508 15:28:50.868771672 ProcessGroupNCCL.cpp:4561] [PG ID 0 PG GUID 0 Rank 7]  using GPU 0 to perform barrier as devices used by this process are currently unknown. This can potentially cause a hang if this rank to GPU mapping is incorrect. Specify device_ids in barrier() to force use of a particular device, or call init_process_group() with a device_id. [repeated 7x across cluster]
(AsyncvLLMServer pid=729634) FastAPI startup
(WorkerDict pid=727504) Total steps: -1, num_warmup_steps: 0 [repeated 7x across cluster]
(WorkerDict pid=727956) Actor use_remove_padding=False [repeated 7x across cluster]
(AsyncvLLMServer pid=729634) override_generation_config: {'n': 1, 'logprobs': 0, 'max_tokens': 4096, 'temperature': 1.0, 'top_k': -1, 'top_p': 1, 'ignore_eos': False}
(AsyncvLLMServer pid=729632) WARNING 05-08 15:29:34 [arg_utils.py:1713] Detected VLLM_USE_V1=1 with Engine in background thread. Usage should be considered experimental. Please report any issues on Github.
(AsyncvLLMServer pid=729632) WARNING 05-08 15:29:34 [cuda.py:96] To see benefits of async output processing, enable CUDA graph. Since, enforce-eager is enabled, async output processor cannot be used
(AsyncvLLMServer pid=729635) FastAPI startup [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) override_generation_config: {'n': 1, 'logprobs': 0, 'max_tokens': 4096, 'temperature': 1.0, 'top_k': -1, 'top_p': 1, 'ignore_eos': False} [repeated 3x across cluster]
(AsyncvLLMServer pid=729632) WARNING 05-08 15:29:34 [core_client.py:368] SIGUSR1 handler not installed because we are not running in the main thread. In this case the forked engine process may not be killed when an exception is raised, and you need to handle the engine process shutdown manually.
(AsyncvLLMServer pid=729632) WARNING 05-08 15:29:34 [utils.py:2273] We must use the `spawn` multiprocessing start method. Overriding VLLM_WORKER_MULTIPROC_METHOD to 'spawn'. See https://docs.vllm.ai/en/latest/getting_started/troubleshooting.html#python-multiprocessing for more information. Reason: In a Ray actor and can only be spawned
(AsyncvLLMServer pid=729632) 2025-05-08 15:29:39,928    INFO worker.py:1514 -- Using address 29.76.21.254:6379 set in the environment variable RAY_ADDRESS
(AsyncvLLMServer pid=729632) 2025-05-08 15:29:39,928    INFO worker.py:1654 -- Connecting to existing Ray cluster at address: 29.76.21.254:6379...
(AsyncvLLMServer pid=729632) 2025-05-08 15:29:39,941    INFO worker.py:1832 -- Connected to Ray cluster. View the dashboard at http://29.76.21.254:8080 
(WorkerDict pid=727956) /usr/local/lib64/python3.11/site-packages/torch/distributed/fsdp/fully_sharded_data_parallel.py:690: FutureWarning: FSDP.state_dict_type() and FSDP.set_state_dict_type() are being deprecated. Please use APIs, get_state_dict() and set_state_dict(), which can support different parallelisms, FSDP1, FSDP2, DDP. API doc: https://pytorch.org/docs/stable/distributed.checkpoint.html#torch.distributed.checkpoint.state_dict.get_state_dict .Tutorial: https://pytorch.org/tutorials/recipes/distributed_checkpoint_recipe.html . [repeated 7x across cluster]
(WorkerDict pid=727956)   warnings.warn( [repeated 7x across cluster]
(WorkerDict pid=727950) [DP=1,TP=0] execute_method: init_worker
(AsyncvLLMServer pid=729635) WARNING 05-08 15:29:34 [arg_utils.py:1713] Detected VLLM_USE_V1=1 with Engine in background thread. Usage should be considered experimental. Please report any issues on Github. [repeated 3x across cluster]
(AsyncvLLMServer pid=729635) WARNING 05-08 15:29:34 [cuda.py:96] To see benefits of async output processing, enable CUDA graph. Since, enforce-eager is enabled, async output processor cannot be used [repeated 3x across cluster]
(AsyncvLLMServer pid=729635) WARNING 05-08 15:29:35 [core_client.py:368] SIGUSR1 handler not installed because we are not running in the main thread. In this case the forked engine process may not be killed when an exception is raised, and you need to handle the engine process shutdown manually. [repeated 3x across cluster]
(AsyncvLLMServer pid=729635) WARNING 05-08 15:29:35 [utils.py:2273] We must use the `spawn` multiprocessing start method. Overriding VLLM_WORKER_MULTIPROC_METHOD to 'spawn'. See https://docs.vllm.ai/en/latest/getting_started/troubleshooting.html#python-multiprocessing for more information. Reason: In a Ray actor and can only be spawned [repeated 3x across cluster]
(WorkerDict pid=727950) 2025-05-08 15:29:40,136 - INFO - flashinfer.jit: Prebuilt kernels not found, using JIT backend
(WorkerDict pid=727950) WARNING 05-08 15:29:40 [utils.py:2413] Methods determine_num_available_blocks,device_config,get_cache_block_size_bytes,initialize_cache not implemented in <vllm.v1.worker.gpu_worker.Worker object at 0x7f79044267d0>
(WorkerDict pid=727950) [DP=1,TP=0] execute_method: init_device
(WorkerDict pid=727950) NCCL version 2.21.5+cuda12.4
(WorkerDict pid=727504) [DP=0,TP=0] execute_method: load_model
(WorkerDict pid=727952) [DP=2,TP=0] execute_method: init_worker [repeated 3x across cluster]
(WorkerDict pid=727952) WARNING 05-08 15:29:41 [utils.py:2413] Methods determine_num_available_blocks,device_config,get_cache_block_size_bytes,initialize_cache not implemented in <vllm.v1.worker.gpu_worker.Worker object at 0x7f9588089210> [repeated 7x across cluster]
(WorkerDict pid=727952) [DP=2,TP=0] execute_method: init_device [repeated 3x across cluster]
Loading safetensors checkpoint shards:   0% Completed | 0/4 [00:00<?, ?it/s]
(AsyncvLLMServer pid=729635) 2025-05-08 15:29:40,403    INFO worker.py:1514 -- Using address 29.76.21.254:6379 set in the environment variable RAY_ADDRESS [repeated 3x across cluster]
(AsyncvLLMServer pid=729635) 2025-05-08 15:29:40,403    INFO worker.py:1654 -- Connecting to existing Ray cluster at address: 29.76.21.254:6379... [repeated 3x across cluster]
(AsyncvLLMServer pid=729635) 2025-05-08 15:29:40,416    INFO worker.py:1832 -- Connected to Ray cluster. View the dashboard at http://29.76.21.254:8080  [repeated 3x across cluster]
(WorkerDict pid=727952) 2025-05-08 15:29:40,615 - INFO - flashinfer.jit: Prebuilt kernels not found, using JIT backend [repeated 7x across cluster]
Loading safetensors checkpoint shards:  25% Completed | 1/4 [00:00<00:01,  2.98it/s]
Loading safetensors checkpoint shards:  50% Completed | 2/4 [00:00<00:00,  2.60it/s]
(WorkerDict pid=727954) NCCL version 2.21.5+cuda12.4 [repeated 2x across cluster]
Loading safetensors checkpoint shards:  75% Completed | 3/4 [00:01<00:00,  2.60it/s]
Loading safetensors checkpoint shards: 100% Completed | 4/4 [00:01<00:00,  2.58it/s]
Loading safetensors checkpoint shards: 100% Completed | 4/4 [00:01<00:00,  2.61it/s]
(WorkerDict pid=727504) 
(WorkerDict pid=727504) [DP=0,TP=0] execute_method: get_kv_cache_spec
(WorkerDict pid=727504) [DP=0,TP=0] execute_method: determine_available_memory
(WorkerDict pid=727504) [DP=0,TP=0] execute_method: initialize_from_config
(WorkerDict pid=727504) [DP=0,TP=0] execute_method: compile_or_warm_up_model
(WorkerDict pid=727952) [DP=2,TP=0] execute_method: load_model [repeated 3x across cluster]
(AsyncvLLMServer pid=729634) WARNING 05-08 15:29:52 [config.py:1088] Default sampling parameters have been overridden by the model's Hugging Face generation config recommended from the model creator. If this is not intended, please relaunch vLLM instance with `--generation-config vllm`.
(WorkerDict pid=727954) [DP=3,TP=0] execute_method: get_kv_cache_spec [repeated 3x across cluster]
(WorkerDict pid=727954) [DP=3,TP=0] execute_method: determine_available_memory [repeated 3x across cluster]
(WorkerDict pid=727952) [DP=2,TP=0] execute_method: initialize_from_config [repeated 2x across cluster]
(WorkerDict pid=727952) [DP=2,TP=0] execute_method: compile_or_warm_up_model [repeated 2x across cluster]
(AsyncvLLMServer pid=729632) WARNING 05-08 15:29:52 [config.py:1088] Default sampling parameters have been overridden by the model's Hugging Face generation config recommended from the model creator. If this is not intended, please relaunch vLLM instance with `--generation-config vllm`.
(AsyncvLLMServer pid=729635) WARNING 05-08 15:30:00 [config.py:1088] Default sampling parameters have been overridden by the model's Hugging Face generation config recommended from the model creator. If this is not intended, please relaunch vLLM instance with `--generation-config vllm`.
(WorkerDict pid=727504) [DP=0,TP=0] execute_method: sleep
(WorkerDict pid=727504) [DP=0,TP=0] execute_method: wake_up
(WorkerDict pid=727954) [DP=3,TP=0] execute_method: initialize_from_config
(WorkerDict pid=727954) [DP=3,TP=0] execute_method: compile_or_warm_up_model
(AsyncvLLMServer pid=729636) WARNING 05-08 15:30:00 [config.py:1088] Default sampling parameters have been overridden by the model's Hugging Face generation config recommended from the model creator. If this is not intended, please relaunch vLLM instance with `--generation-config vllm`.
(WorkerDict pid=727950) [DP=1,TP=0] execute_method: sleep [repeated 3x across cluster]
[round=0] role: assistant, content: Sure, let's play! My character name is Bob, and my favorite color is red. In this game, what kind of setting or scenario would you like to explore? Could be anything from a medieval kingdom to a futuristic city or even a world where everyone can see colors beyond the visible spectrum!
[round=1] role: assistant, content: My name is Bob. It's nice to meet you! What's your name in this game?
[round=2] role: assistant, content: My favorite color is red. How about you? What's your favorite color?
Done!
Round 0
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
[NaiveChatCompletionScheduler] generate_sequences done
Round 1
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
[NaiveChatCompletionScheduler] generate_sequences done
Round 2
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
[NaiveChatCompletionScheduler] generate_sequences done
Round 3
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
[NaiveChatCompletionScheduler] generate_sequences done
Round 4
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
[NaiveChatCompletionScheduler] generate_sequences done
Round 5
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
[NaiveChatCompletionScheduler] generate_sequences done
Round 6
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
[NaiveChatCompletionScheduler] generate_sequences done
Round 7
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
[NaiveChatCompletionScheduler] generate_sequences done
Round 8
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
[NaiveChatCompletionScheduler] generate_sequences done
Round 9
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
[NaiveChatCompletionScheduler] generate_sequences done
Round 10
[NaiveChatCompletionScheduler] generate_sequences sampling params: {'n': 1, 'max_completion_tokens': 4096, 'temperature': 1.0, 'top_p': 1}
(AsyncvLLMServer pid=729634) instance_id: c24d0fd4-a8d8-4b2e-8b5e-c9562a382592:BL14zn:4:0 intializes with external actors: ['BL14znWorkerDict_0:0', 'BL14znWorkerDict_0:1']
(AsyncvLLMServer pid=729634) instance_id: c24d0fd4-a8d8-4b2e-8b5e-c9562a382592:BL14zn:4:0 intializes finished.
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390] EngineCore hit an exception: Traceback (most recent call last):
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 383, in run_engine_core
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]     engine_core.run_busy_loop()
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 405, in run_busy_loop
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]     self._process_engine_step()
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 434, in _process_engine_step
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]     outputs = self.step_fn()
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]               ^^^^^^^^^^^^^^
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 206, in step
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]     output = self.model_executor.execute_model(scheduler_output)
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/executor/abstract.py", line 77, in execute_model
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]     output = self.collective_rpc("execute_model",
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]   File "/apdcephfs_nj3/share_303326228/chriszxiong/refuge/daoyi/verl/verl/workers/rollout/vllm_rollout/vllm_async_server.py", line 97, in collective_rpc
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]     outputs = ray.get([worker.execute_method.remote(sent_method, *args, **(kwargs or {})) for worker in self.workers])
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]     return fn(*args, **kwargs)
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]            ^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]     return func(*args, **kwargs)
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]            ^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py", line 2771, in get
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]     values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py", line 921, in get_objects
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]     raise value
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390] ray.exceptions.ActorDiedError: The actor died unexpectedly before finishing this task.
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]         class_name: create_colocated_worker_cls.<locals>.WorkerDict
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]         actor_id: d02803f41a89e3700b2813dada000000
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]         pid: 727504
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]         name: BL14znWorkerDict_0:0
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]         namespace: c24d0fd4-a8d8-4b2e-8b5e-c9562a382592
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390]         ip: 29.76.21.254
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390] The actor is dead because all references to the actor were removed.
(AsyncvLLMServer pid=729634) ERROR 05-08 15:30:47 [core.py:390] 
(WorkerDict pid=727954) [DP=3,TP=0] execute_method: wake_up [repeated 3x across cluster]
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffffe8cb76360aa2779402245466da000000 Worker ID: 151bf392333ee2609bf19b6c2679cfcecfe80025f1c36f027daafe9c Node ID: aca6253a4b6bbed7da56f99bf1be18458e85bfee85404f2cc57b26d6 Worker IP address: 29.76.21.254 Worker port: 11330 Worker PID: 729632 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
Traceback (most recent call last):
  File "/usr/lib64/python3.11/runpy.py", line 198, in _run_module_as_main
    return _run_code(code, main_globals, None,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.11/runpy.py", line 88, in _run_code
    exec(code, run_globals)
  File "/root/.local/share/code-server/extensions/ms-python.debugpy-2025.6.0-linux-x64/bundled/libs/debugpy/adapter/../../debugpy/launcher/../../debugpy/__main__.py", line 71, in <module>
    cli.main()
  File "/root/.local/share/code-server/extensions/ms-python.debugpy-2025.6.0-linux-x64/bundled/libs/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py", line 501, in main
    run()
  File "/root/.local/share/code-server/extensions/ms-python.debugpy-2025.6.0-linux-x64/bundled/libs/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py", line 351, in run_file
    runpy.run_path(target, run_name="__main__")
  File "/root/.local/share/code-server/extensions/ms-python.debugpy-2025.6.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 310, in run_path
    return _run_module_code(code, init_globals, run_name, pkg_name=pkg_name, script_name=fname)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/.local/share/code-server/extensions/ms-python.debugpy-2025.6.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 127, in _run_module_code
    _run_code(code, mod_globals, init_globals, mod_name, mod_spec, pkg_name, script_name)
  File "/root/.local/share/code-server/extensions/ms-python.debugpy-2025.6.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 118, in _run_code
    exec(code, run_globals)
  File "tests/rollout/test_vllm_multi_turn.py", line 195, in <module>
    test_vllm_multi_turn(config)
  File "tests/rollout/test_vllm_multi_turn.py", line 126, in test_vllm_multi_turn
    result = async_rollout_manager.generate_sequences(prompts=batch)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/apdcephfs_nj3/share_303326228/chriszxiong/refuge/daoyi/verl/verl/workers/rollout/async_server.py", line 336, in generate_sequences
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.11/concurrent/futures/_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/apdcephfs_nj3/share_303326228/chriszxiong/refuge/daoyi/verl/examples/ppo_trainer/naive_chat_scheduler.py", line 85, in generate_sequences
    await asyncio.gather(*tasks)
  File "/apdcephfs_nj3/share_303326228/chriszxiong/refuge/daoyi/verl/verl/workers/rollout/async_server.py", line 193, in submit_chat_completions
    await callback(completions, callback_additional_info, exception)
  File "/apdcephfs_nj3/share_303326228/chriszxiong/refuge/daoyi/verl/examples/ppo_trainer/naive_chat_scheduler.py", line 56, in callback
    for choice in completions.choices:
                  ^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'choices'
(AsyncvLLMServer pid=729636) instance_id: c24d0fd4-a8d8-4b2e-8b5e-c9562a382592:BL14zn:4:3 intializes with external actors: ['BL14znWorkerDict_0:6', 'BL14znWorkerDict_0:7'] [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) instance_id: c24d0fd4-a8d8-4b2e-8b5e-c9562a382592:BL14zn:4:3 intializes finished. [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390] EngineCore hit an exception: Traceback (most recent call last): [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 383, in run_engine_core [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]     engine_core.run_busy_loop() [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 405, in run_busy_loop [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]     self._process_engine_step() [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 434, in _process_engine_step [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]     outputs = self.step_fn() [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]               ^^^^^^^^^^^^^^ [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 206, in step [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]     output = self.model_executor.execute_model(scheduler_output) [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/executor/abstract.py", line 77, in execute_model [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]     output = self.collective_rpc("execute_model", [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]   File "/apdcephfs_nj3/share_303326228/chriszxiong/refuge/daoyi/verl/verl/workers/rollout/vllm_rollout/vllm_async_server.py", line 97, in collective_rpc [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]     outputs = ray.get([worker.execute_method.remote(sent_method, *args, **(kwargs or {})) for worker in self.workers]) [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]     return fn(*args, **kwargs) [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]            ^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]     return func(*args, **kwargs) [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]            ^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py", line 2771, in get [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]     values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout) [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py", line 921, in get_objects [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]     raise value [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390] ray.exceptions.ActorDiedError: The actor died unexpectedly before finishing this task. [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]         class_name: create_colocated_worker_cls.<locals>.WorkerDict [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]         actor_id: b740f3915edbdd9d5a95b5cada000000 [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]         pid: 727954 [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]         name: BL14znWorkerDict_0:6 [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]         namespace: c24d0fd4-a8d8-4b2e-8b5e-c9562a382592 [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]         ip: 29.76.21.254 [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390] The actor is dead because all references to the actor were removed. [repeated 3x across cluster]
(AsyncvLLMServer pid=729636) ERROR 05-08 15:30:47 [core.py:390]  [repeated 3x across cluster]
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffffc2ba0c65522f8b201d4fcc33da000000 Worker ID: 45db1112e86e4ee24adec519ab6eb8f0081992bf1b0244232f44259a Node ID: aca6253a4b6bbed7da56f99bf1be18458e85bfee85404f2cc57b26d6 Worker IP address: 29.76.21.254 Worker port: 11332 Worker PID: 729636 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors. [repeated 3x across cluster]

My observation is that the larger the raw_prompts list, the sooner this bug occurs.

@wwd29
Copy link

wwd29 commented May 8, 2025

How does the actor sync weights to the vllm async server? I'm not sure if I've missed something. Thanks.

@wuxibin89
Copy link
Collaborator Author

@dawson-chen I found the root cause of this problem. It's a bug where RayWorkerGroup holds only weak references to these actors. Fixed in #1443
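
(For readers hitting the same ActorDiedError: a hypothetical minimal sketch of the weak-reference pattern the fix addresses. The Worker class and names are illustrative; per the #1443 description, a handle obtained by name does not keep the actor alive, while an explicitly passed handle does.)

```python
import ray

ray.init()


@ray.remote
class Worker:
    def ping(self):
        return "pong"


# Pattern the bug resembled (per the #1443 description): the spawned worker group
# re-acquired actors by name. A handle from ray.get_actor does not extend the
# actor's lifetime, so once the creating scope drops its handle the actor can be
# garbage collected and later calls raise ActorDiedError.
w = Worker.options(name="worker_0").remote()
by_name = ray.get_actor("worker_0")  # weak: does not keep the actor alive
del w  # the actor may now be reclaimed

# The fix: pass actor handles explicitly so the spawned group holds strong references.
w = Worker.options(name="worker_1").remote()
handles = [w]  # strong reference kept for as long as the group needs the actor
print(ray.get(handles[0].ping.remote()))
```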

@wuxibin89
Copy link
Collaborator Author

wuxibin89 commented May 8, 2025

How does the actor sync weights to the vllm async server? I'm not sure if I've missed something. Thanks.

@wwd29 Weight sync between the actor and the async server works as follows (see the sketch after this list):

  1. AsyncvLLMServer.wake_up calls AsyncLLM.wake_up
  2. AsyncLLM.wake_up calls vLLMAsyncRollout.execute_method(method="wake_up") via collective_rpc
  3. vLLMAsyncRollout.wake_up calls FSDPVLLMShardingManager.__enter__
  4. FSDPVLLMShardingManager.__enter__ syncs actor weights in update_params
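
(A minimal sketch of that call chain, assuming the class names above; the bodies are illustrative only and omit the real initialization, device handling, and error paths.)

```python
import ray


# Sketch only: the real classes live under verl/workers/rollout/.
class AsyncvLLMServer:
    async def wake_up(self):
        # Step 1: the server-level wake_up delegates to the AsyncLLM engine.
        await self.engine.wake_up()


class ExternalRayDistributedExecutor:
    def collective_rpc(self, method, *args, **kwargs):
        # Step 2: AsyncLLM reaches the colocated GPU workers through collective_rpc,
        # which fans execute_method out to the named Ray actors in the worker group.
        return ray.get([w.execute_method.remote(method, *args, **kwargs) for w in self.workers])


class vLLMAsyncRollout:
    def wake_up(self):
        # Steps 3-4: entering the FSDP<->vLLM sharding manager runs update_params,
        # which copies the latest actor weights into the vLLM model runner.
        self.sharding_manager.__enter__()
```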

wuxibin89 added a commit that referenced this pull request May 8, 2025
…1443)

### Checklist Before Starting

- [ ] Search for similar PR(s).

### What does this PR do?

Spawned RayWorkerGroup gets actors by name, which holds only a weak reference
to the actor and causes actors to be garbage collected unexpectedly. Pass
actor handles explicitly in spawn so that RayWorkerGroup holds strong
references to these actors. close #1365
#1138 (comment)

### High-Level Design

> Demonstrate the high-level design if this PR is complex.

### Specific Changes

> List the specific changes.

### API

> Demonstrate how the API changes if any.

### Usage Example

> Provide usage example(s) for easier usage.

```python
# Add code snippet or script demonstrating how to use this 
```

### Test

> For changes that can not be tested by CI (e.g., algorithm
implementation, new model support), validate by experiment(s) and show
results like training curve plots, evaluation results, etc.

### Additional Info.

- **Issue Number**: Fixes issue # or discussion # if any.
- **Training**: [Note which backend this PR will affect: FSDP, Megatron,
both, or none]
- **Inference**: [Note which backend this PR will affect: vLLM, SGLang,
both, or none]

### Checklist Before Submitting

- [ ] Read the [Contribute
Guide](https://github.com/volcengine/verl?tab=readme-ov-file#contribution-guide).
- [ ] Apply [pre-commit
checks](https://github.com/volcengine/verl?tab=readme-ov-file#code-linting-and-formatting).
- [ ] Add `[BREAKING]` to the PR title if it breaks any API.
- [ ] Update the documentation about your changes in the
[docs](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add CI test(s) if necessary.
@dawson-chen
Copy link

@dawson-chen I found the root cause of this problem. It's a bug where RayWorkerGroup holds only weak references to these actors. Fixed in #1443

Awesome man! It works. You are a genius.

Recently, I've been looking over the async-rollout branches of SGLang and vLLM. verl-SGLang’s async is limited to each DP shard, so a long request can block the others. vLLM is fully async: every DP shard runs its own service, the main process fires requests in parallel, and jobs are load-balanced across shards with no waiting.

Is that accurate?
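
(For illustration only, a hypothetical sketch of the least-requests pattern described above; the class and its names are not verl's actual ChatScheduler API.)

```python
from typing import Any, Awaitable, Callable, Dict, List


class LeastRequestsBalancer:
    """Route each request to the per-DP-shard server with the fewest in-flight requests."""

    def __init__(self, addresses: List[str]):
        self.inflight: Dict[str, int] = {addr: 0 for addr in addresses}

    def pick(self) -> str:
        # Least-requests choice: the address with the smallest in-flight count.
        return min(self.inflight, key=self.inflight.get)

    async def submit(self, send: Callable[[str, Any], Awaitable[Any]], payload: Any) -> Any:
        addr = self.pick()
        self.inflight[addr] += 1
        try:
            # e.g. POST payload to http://{addr}/v1/chat/completions
            return await send(addr, payload)
        finally:
            self.inflight[addr] -= 1
```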

@U-rara
Copy link
Contributor

U-rara commented May 11, 2025

How does the actor sync weights to the vllm async server? I'm not sure if I've missed something. Thanks.

@wwd29 Weight sync between the actor and the async server works as follows

  1. AsyncvLLMServer.wake_up calls AsyncLLM.wake_up
  2. AsyncLLM.wake_up calls vLLMAsyncRollout.execute_method(method="wake_up") via collective_rpc
  3. vLLMAsyncRollout.wake_up calls FSDPVLLMShardingManager.__enter__
  4. FSDPVLLMShardingManager.__enter__ syncs actor weights in update_params

@wuxibin89 Thank you very much for your fix. However, on the latest main branch I still encounter the following error:
AttributeError: 'NoneType' object has no attribute 'choices'
From my observation, this error occurs with a larger max_response_length (e.g., 32K); with a smaller max_response_length (e.g., 8K) it does not. When training reasoning models such as Qwen7B-Distill, Qwen8B, etc., it often happens on the very first step. I tried increasing vLLM's TP size, but the issue persists (it doesn't seem related to KV cache capacity).

@wuxibin89
Copy link
Collaborator Author

@U-rara Maybe there's an exception; do you check the exception in the callback function?

@U-rara
Copy link
Contributor

U-rara commented May 11, 2025

@wuxibin89 I added a for loop in tests/rollout/test_vllm_multi_turn.py, but during execution, I encountered an “actor dead” error. Below is the specific error message:

(AsyncvLLMServer pid=584371) instance_id: fd2516db-f46f-4370-8881-648b2a0899b8:6wB9ka:4:3 intializes with external actors: ['6wB9kaWorkerDict_0:6', '6wB9kaWorkerDict_0:7']
(AsyncvLLMServer pid=584371) instance_id: fd2516db-f46f-4370-8881-648b2a0899b8:6wB9ka:4:3 intializes finished.
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390] EngineCore hit an exception: Traceback (most recent call last):
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 383, in run_engine_core
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     engine_core.run_busy_loop()
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 405, in run_busy_loop
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     self._process_engine_step()
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 434, in _process_engine_step
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     outputs = self.step_fn()
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]               ^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/engine/core.py", line 206, in step
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     output = self.model_executor.execute_model(scheduler_output)
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/vllm/v1/executor/abstract.py", line 77, in execute_model
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     output = self.collective_rpc("execute_model",
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/apdcephfs_nj3/share_303326228/chriszxiong/refuge/daoyi/verl/verl/workers/rollout/vllm_rollout/vllm_async_server.py", line 97, in collective_rpc
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     outputs = ray.get([worker.execute_method.remote(sent_method, *args, **(kwargs or {})) for worker in self.workers])
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     return fn(*args, **kwargs)
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]            ^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     return func(*args, **kwargs)
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]            ^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py", line 2771, in get
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]   File "/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py", line 921, in get_objects
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]     raise value
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390] ray.exceptions.ActorDiedError: The actor died unexpectedly before finishing this task.
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         class_name: create_colocated_worker_cls.<locals>.WorkerDict
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         actor_id: 4147152ba99be6826521490896000000
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         pid: 582583
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         name: 6wB9kaWorkerDict_0:6
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         namespace: fd2516db-f46f-4370-8881-648b2a0899b8
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390]         ip: 29.76.21.254
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390] The actor is dead because all references to the actor were removed.
(AsyncvLLMServer pid=584371) ERROR 05-08 09:02:35 [core.py:390] 
(WorkerDict pid=582583) [DP=3,TP=0] execute_method: wake_up [repeated 3x across cluster]
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffff5de9d8b633f6b30b74e8a68496000000 Worker ID: 226f493dbca5115045f0cf93e8fd587854383c223b0052d1389f2ec7 Node ID: aca6253a4b6bbed7da56f99bf1be18458e85bfee85404f2cc57b26d6 Worker IP address: 29.76.21.254 Worker port: 10992 Worker PID: 584371 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.

code

# Added inside tests/rollout/test_vllm_multi_turn.py; DataProto, np (numpy),
# and async_rollout_manager come from the existing test setup in that file.
for _ in range(100):
    raw_prompts = [
        [
            {
                "role": "user",
                "content": "Let's play a role playing game. Your name is Alice, your favorite color is blue.",
            }
        ],
        [{"role": "user", "content": "Let's play a role playing game. Your name is Bob, your favorite color is red."}],
    ]
    batch = DataProto(
        non_tensor_batch={
            "raw_prompt": np.array(raw_prompts),
        },
    )
    result = async_rollout_manager.generate_sequences(prompts=batch)
    seq_len = result.batch["prompts"].size(1) + result.batch["responses"].size(1)
    assert len(result) == 2
    assert result.batch["input_ids"].size(1) == seq_len
    assert result.batch["attention_mask"].size(1) == seq_len
    assert result.batch["position_ids"].size(1) == seq_len

Could you provide some debugging suggestions?

@U-rara Maybe there's an exception; do you check the exception in the callback function?

@wuxibin89 Thank you for your reply. My error is exactly the same as his: the completions passed to the callback are None, and there are no other error messages. Everything works fine at shorter generation lengths (e.g. 8K), and the error only appears when the length reaches 16K–32K, so I suspect the issue originates from vLLM; because it runs in an asynchronous environment, the exception isn't being caught.

@casper-hansen
Copy link
Contributor

casper-hansen commented May 11, 2025

@wuxibin89 @U-rara I had this error too. It occurs because the callback does not 1) check whether adding the current completion would exceed the completion length, and 2) adjust max_completion_tokens (you have to decrease it for each round); a sketch of such a check follows below.

My vLLM did throw an error. I don't have the full log, but it was something like the max length being exceeded, and then the actor dies as you see in the error log above.
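
(A hedged sketch of that kind of budget check, reusing the callback signature shown later in this thread; MAX_RESPONSE_LENGTH, the info keys, and submit_next_turn are illustrative placeholders, not verl's actual API.)

```python
from typing import Any, Dict

from openai.types.chat import ChatCompletion

MAX_RESPONSE_LENGTH = 32768  # example total completion budget across all turns


async def submit_next_turn(messages, max_completion_tokens: int) -> None:
    """Placeholder: a real scheduler would POST the next /v1/chat/completions request here."""


async def callback(completions: ChatCompletion, info: Dict[str, Any], exception: Exception):
    assert exception is None, f"generate_sequences failed: {exception}"

    # Track how many completion tokens this conversation has consumed so far.
    used = info.get("used_completion_tokens", 0) + completions.usage.completion_tokens
    info["used_completion_tokens"] = used

    remaining = MAX_RESPONSE_LENGTH - used
    if remaining <= 0:
        return  # budget exhausted: stop instead of submitting another turn

    # Shrink the per-turn cap so the multi-turn total stays within budget.
    await submit_next_turn(info["messages"], max_completion_tokens=remaining)
```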

@wuxibin89
Copy link
Collaborator Author

@U-rara After #1443, this exception should not happen:

The actor is dead because all references to the actor were removed.

If any exception happens in the OpenAI client call, it is passed to the callback:
https://github.com/volcengine/verl/blob/main/verl/workers/rollout/async_server.py#L185-L193

Please check it in your callback:

async def callback(completions: ChatCompletion, info: Dict[str, Any], exception: Exception):
    assert exception is None, f"generate_sequences failed: {exception}"
    ...

@U-rara
Copy link
Contributor

U-rara commented May 11, 2025

@U-rara After #1443, this exception should not happen:

The actor is dead because all references to the actor were removed.

If any exception happens in the OpenAI client call, it is passed to the callback https://github.com/volcengine/verl/blob/main/verl/workers/rollout/async_server.py#L185-L193

Please check it in your callback:

async def callback(completions: ChatCompletion, info: Dict[str, Any], exception: Exception):
    assert exception is None, f"generate_sequences failed: {exception}"
    ...

@wuxibin89 @casper-hansen
Thank you for the tip. After some experiments, I discovered the root cause of the bug and how to fix it.

First, with a 32K inference length, I found that when completions is None I got this error:

AssertionError: generate_sequences failed: Error code: 400 - {
  'object': 'error',
  'message': 'Request id chatcmpl-b22c3bb0a2f740f5b181412975b47e92 already running.',
  'type': 'BadRequestError',
  'param': None,
  'code': 400
}

Upon investigation, I realized this error comes from the default of two retries in:

client = AsyncOpenAI(
    base_url=f"http://{address}/v1",
    api_key="token-abc123",
)

When I set max_retries=0, the true error appeared:

AssertionError: generate_sequences failed: Request timed out.

I suspect that because we didn't explicitly set a timeout for AsyncOpenAI, it falls back to the OpenAI client's default of 600 s, which isn't enough for long-running inference. So I tried disabling the timeout explicitly:

client = AsyncOpenAI(
    base_url=f"http://{address}/v1",
    api_key="token-abc123",
    timeout=None,
    max_retries=0
)

After testing, everything worked as expected. May I open a PR for this change? Since this setting seems to have been discussed before, I'm not sure whether the change is entirely harmless.

Please let me know if you have any other suggestions.

@wuxibin89
Copy link
Collaborator Author

@U-rara Feel free to open a PR.

wuxibin89 pushed a commit that referenced this pull request May 12, 2025
…lout (#1483)

### Checklist Before Starting

- [x] Search for similar PR(s).

### What does this PR do?

In Async rollout, `AsyncOpenAI` has a default 600-second timeout, which
can lead to timeouts during longer inference. See details at
#1138 (comment).

### High-Level Design

See details at
#1138 (comment).

### Specific Changes

See details at
#1138 (comment).

### API

> Demonstrate how the API changes if any.

### Usage Example

> Provide usage example(s) for easier usage.

```python
# Add code snippet or script demonstrating how to use this 
```

### Test

> For changes that can not be tested by CI (e.g., algorithm
implementation, new model support), validate by experiment(s) and show
results like training curve plots, evaluation results, etc.

### Additional Info.

- **Issue Number**: Fixes issue # or discussion # if any.
- **Training**: [Note which backend this PR will affect: FSDP, Megatron,
both, or none]
- **Inference**: [Note which backend this PR will affect: vLLM, SGLang,
both, or none]

### Checklist Before Submitting

- [x] Read the [Contribute
Guide](https://github.com/volcengine/verl?tab=readme-ov-file#contribution-guide).
- [x] Apply [pre-commit
checks](https://github.com/volcengine/verl?tab=readme-ov-file#code-linting-and-formatting).
- [ ] Add `[BREAKING]` to the PR title if it breaks any API.
- [ ] Update the documentation about your changes in the
[docs](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add CI test(s) if necessary.
@mertunsall
Copy link
Contributor

Does async rollout give the same performance as the sync one, i.e., if I keep everything else the same and use async rollout with a one-turn chat scheduler, should I get the same performance?

def init_worker(self, all_kwargs: List[Dict[str, Any]]):
    """Initialize worker engine."""
    all_kwargs[0]["rank"] = int(os.environ["RANK"])
    all_kwargs[0]["local_rank"] = 0


why is local_rank zero? 🤔
