vllm.engine.protocol

logger module-attribute

logger = init_logger(__name__)

EngineClient

Bases: ABC

Protocol class for Clients to Engine

Source code in vllm/engine/protocol.py
class EngineClient(ABC):
    """Protocol class for Clients to Engine"""

    vllm_config: VllmConfig
    model_config: ModelConfig
    processor: Processor
    io_processor: Optional[IOProcessor]

    @property
    @abstractmethod
    def is_running(self) -> bool: ...

    @property
    @abstractmethod
    def is_stopped(self) -> bool: ...

    @property
    @abstractmethod
    def errored(self) -> bool: ...

    @property
    @abstractmethod
    def dead_error(self) -> BaseException: ...

    @abstractmethod
    def generate(
        self,
        prompt: Union[EngineCoreRequest, PromptType],
        sampling_params: SamplingParams,
        request_id: str,
        *,
        prompt_text: Optional[str] = None,
        lora_request: Optional[LoRARequest] = None,
        tokenization_kwargs: Optional[dict[str, Any]] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        priority: int = 0,
        data_parallel_rank: Optional[int] = None,
    ) -> AsyncGenerator[RequestOutput, None]:
        """Generate outputs for a request."""
        ...

    @abstractmethod
    def encode(
        self,
        prompt: PromptType,
        pooling_params: PoolingParams,
        request_id: str,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        priority: int = 0,
        tokenization_kwargs: Optional[dict[str, Any]] = None,
    ) -> AsyncGenerator[PoolingRequestOutput, None]:
        """Generate outputs for a request from a pooling model."""
        ...

    @abstractmethod
    async def abort(self, request_id: Union[str, Iterable[str]]) -> None:
        """Abort a request.

        Args:
            request_id: The unique id of the request,
                        or an iterable of such ids.
        """
        ...

    @abstractmethod
    async def get_tokenizer(self) -> AnyTokenizer:
        """Get the tokenizer"""
        ...

    @abstractmethod
    async def is_tracing_enabled(self) -> bool: ...

    @abstractmethod
    async def do_log_stats(self) -> None: ...

    @abstractmethod
    async def check_health(self) -> None:
        """Raise if unhealthy"""
        ...

    @abstractmethod
    async def start_profile(self) -> None:
        """Start profiling the engine"""
        ...

    @abstractmethod
    async def stop_profile(self) -> None:
        """Start profiling the engine"""
        ...

    @abstractmethod
    async def reset_mm_cache(self) -> None:
        """Reset the multi-modal cache"""
        ...

    @abstractmethod
    async def reset_prefix_cache(self, device: Optional[Device] = None) -> None:
        """Reset the prefix cache"""
        ...

    @abstractmethod
    async def sleep(self, level: int = 1) -> None:
        """Sleep the engine"""
        ...

    @abstractmethod
    async def wake_up(self, tags: Optional[list[str]] = None) -> None:
        """Wake up the engine"""
        ...

    @abstractmethod
    async def is_sleeping(self) -> bool:
        """Check whether the engine is sleeping"""
        ...

    @abstractmethod
    async def add_lora(self, lora_request: LoRARequest) -> bool:
        """Load a new LoRA adapter into the engine for future requests."""
        ...

    async def scale_elastic_ep(
        self, new_data_parallel_size: int, drain_timeout: int = 300
    ) -> None:
        """Scale the engine"""
        raise NotImplementedError

    async def collective_rpc(
        self,
        method: str,
        timeout: Optional[float] = None,
        args: tuple = (),
        kwargs: Optional[dict] = None,
    ):
        """Perform a collective RPC call to the given path."""
        raise NotImplementedError

    async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
        """Get supported tasks"""
        raise NotImplementedError
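
EngineClient itself is abstract; in practice you construct a concrete implementation and drive it through this interface. A minimal sketch, assuming vLLM's v1 AsyncLLM implementation and the AsyncEngineArgs helper:

from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.v1.engine.async_llm import AsyncLLM

# Build a concrete EngineClient implementation from engine arguments.
engine_args = AsyncEngineArgs(model="facebook/opt-125m")
engine = AsyncLLM.from_engine_args(engine_args)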

dead_error abstractmethod property

dead_error: BaseException

errored abstractmethod property

errored: bool

io_processor instance-attribute

io_processor: Optional[IOProcessor]

is_running abstractmethod property

is_running: bool

is_stopped abstractmethod property

is_stopped: bool

model_config instance-attribute

model_config: ModelConfig

processor instance-attribute

processor: Processor

vllm_config instance-attribute

vllm_config: VllmConfig

abort abstractmethod async

abort(request_id: Union[str, Iterable[str]]) -> None

Abort a request.

Parameters:

    request_id (Union[str, Iterable[str]], required):
        The unique id of the request, or an iterable of such ids.

Source code in vllm/engine/protocol.py
@abstractmethod
async def abort(self, request_id: Union[str, Iterable[str]]) -> None:
    """Abort a request.

    Args:
        request_id: The unique id of the request,
                    or an iterable of such ids.
    """
    ...
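
In a server, abort is typically issued when a client disconnects mid-stream. A short sketch, assuming an engine object implementing this protocol:

async def cancel(engine) -> None:
    # Abort a single request...
    await engine.abort("req-0")
    # ...or several at once.
    await engine.abort(["req-1", "req-2"])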

add_lora abstractmethod async

add_lora(lora_request: LoRARequest) -> bool

Load a new LoRA adapter into the engine for future requests.

Source code in vllm/engine/protocol.py
@abstractmethod
async def add_lora(self, lora_request: LoRARequest) -> bool:
    """Load a new LoRA adapter into the engine for future requests."""
    ...
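
A sketch of preloading an adapter before sending requests that reference it; the adapter name, id, and path below are illustrative:

from vllm.lora.request import LoRARequest

async def preload_adapter(engine) -> bool:
    return await engine.add_lora(
        LoRARequest(
            lora_name="my-adapter",        # illustrative name
            lora_int_id=1,
            lora_path="/path/to/adapter",  # illustrative path
        )
    )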

check_health abstractmethod async

check_health() -> None

Raise if unhealthy

Source code in vllm/engine/protocol.py
@abstractmethod
async def check_health(self) -> None:
    """Raise if unhealthy"""
    ...
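
Because check_health raises rather than returning a status, callers usually wrap it. A sketch of a health-endpoint handler:

async def healthy(engine) -> bool:
    try:
        await engine.check_health()
        return True
    except Exception:
        # The errored and dead_error properties describe the failure.
        return False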

collective_rpc async

collective_rpc(
    method: str,
    timeout: Optional[float] = None,
    args: tuple = (),
    kwargs: Optional[dict] = None,
)

Perform a collective RPC call, invoking the given method on the engine's workers.

Source code in vllm/engine/protocol.py
async def collective_rpc(
    self,
    method: str,
    timeout: Optional[float] = None,
    args: tuple = (),
    kwargs: Optional[dict] = None,
):
    """Perform a collective RPC call to the given path."""
    raise NotImplementedError
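
A hedged sketch of a broadcast call; the worker method name here is hypothetical, and whether results are returned depends on the concrete implementation:

async def broadcast(engine):
    return await engine.collective_rpc(
        method="some_worker_method",  # hypothetical worker method
        timeout=10.0,
    )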

do_log_stats abstractmethod async

do_log_stats() -> None
Source code in vllm/engine/protocol.py
@abstractmethod
async def do_log_stats(self) -> None: ...

encode abstractmethod

encode(
    prompt: PromptType,
    pooling_params: PoolingParams,
    request_id: str,
    lora_request: Optional[LoRARequest] = None,
    trace_headers: Optional[Mapping[str, str]] = None,
    priority: int = 0,
    tokenization_kwargs: Optional[dict[str, Any]] = None,
) -> AsyncGenerator[PoolingRequestOutput, None]

Generate outputs for a request from a pooling model.

Source code in vllm/engine/protocol.py
@abstractmethod
def encode(
    self,
    prompt: PromptType,
    pooling_params: PoolingParams,
    request_id: str,
    lora_request: Optional[LoRARequest] = None,
    trace_headers: Optional[Mapping[str, str]] = None,
    priority: int = 0,
    tokenization_kwargs: Optional[dict[str, Any]] = None,
) -> AsyncGenerator[PoolingRequestOutput, None]:
    """Generate outputs for a request from a pooling model."""
    ...
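
For pooling models (e.g. embedding models), encode plays the role that generate plays for text generation, streaming PoolingRequestOutputs. A minimal sketch:

from vllm import PoolingParams

async def embed(engine, text: str):
    final = None
    async for output in engine.encode(
        prompt=text,
        pooling_params=PoolingParams(),
        request_id="embed-0",
    ):
        final = output
    # The last PoolingRequestOutput holds the pooled result.
    return final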

generate abstractmethod

generate(
    prompt: Union[EngineCoreRequest, PromptType],
    sampling_params: SamplingParams,
    request_id: str,
    *,
    prompt_text: Optional[str] = None,
    lora_request: Optional[LoRARequest] = None,
    tokenization_kwargs: Optional[dict[str, Any]] = None,
    trace_headers: Optional[Mapping[str, str]] = None,
    priority: int = 0,
    data_parallel_rank: Optional[int] = None,
) -> AsyncGenerator[RequestOutput, None]

Generate outputs for a request.

Source code in vllm/engine/protocol.py
@abstractmethod
def generate(
    self,
    prompt: Union[EngineCoreRequest, PromptType],
    sampling_params: SamplingParams,
    request_id: str,
    *,
    prompt_text: Optional[str] = None,
    lora_request: Optional[LoRARequest] = None,
    tokenization_kwargs: Optional[dict[str, Any]] = None,
    trace_headers: Optional[Mapping[str, str]] = None,
    priority: int = 0,
    data_parallel_rank: Optional[int] = None,
) -> AsyncGenerator[RequestOutput, None]:
    """Generate outputs for a request."""
    ...
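
generate returns an async generator that yields incremental RequestOutputs until the request finishes. A minimal consumption sketch, assuming an engine object implementing this protocol:

from vllm import SamplingParams

async def run(engine) -> None:
    params = SamplingParams(temperature=0.8, max_tokens=64)
    text = ""
    async for output in engine.generate(
        prompt="The capital of France is",
        sampling_params=params,
        request_id="req-0",
    ):
        # Each RequestOutput carries the text decoded so far;
        # output.finished is True on the last one.
        text = output.outputs[0].text
    print(text)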

get_supported_tasks async

get_supported_tasks() -> tuple[SupportedTask, ...]

Get supported tasks

Source code in vllm/engine/protocol.py
async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
    """Get supported tasks"""
    raise NotImplementedError
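
A sketch of routing on the result; the task names are assumed to be the lowercase strings vLLM uses elsewhere (e.g. "generate"):

async def supports_generation(engine) -> bool:
    tasks = await engine.get_supported_tasks()
    return "generate" in tasks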

get_tokenizer abstractmethod async

get_tokenizer() -> AnyTokenizer

Get the tokenizer

Source code in vllm/engine/protocol.py
@abstractmethod
async def get_tokenizer(self) -> AnyTokenizer:
    """Get the tokenizer"""
    ...
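
The returned tokenizer is handy for client-side token counting, for example:

async def count_tokens(engine, text: str) -> int:
    tokenizer = await engine.get_tokenizer()
    return len(tokenizer.encode(text))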

is_sleeping abstractmethod async

is_sleeping() -> bool

Check whether the engine is sleeping

Source code in vllm/engine/protocol.py
@abstractmethod
async def is_sleeping(self) -> bool:
    """Check whether the engine is sleeping"""
    ...

is_tracing_enabled abstractmethod async

is_tracing_enabled() -> bool
Source code in vllm/engine/protocol.py
@abstractmethod
async def is_tracing_enabled(self) -> bool: ...

reset_mm_cache abstractmethod async

reset_mm_cache() -> None

Reset the multi-modal cache

Source code in vllm/engine/protocol.py
@abstractmethod
async def reset_mm_cache(self) -> None:
    """Reset the multi-modal cache"""
    ...

reset_prefix_cache abstractmethod async

reset_prefix_cache(device: Optional[Device] = None) -> None

Reset the prefix cache

Source code in vllm/engine/protocol.py
@abstractmethod
async def reset_prefix_cache(self, device: Optional[Device] = None) -> None:
    """Reset the prefix cache"""
    ...
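
Resetting the multi-modal and prefix caches between benchmark runs keeps earlier requests from skewing hit rates. A sketch, assuming the Device enum lives in vllm.utils:

from vllm.utils import Device

async def clear_caches(engine) -> None:
    await engine.reset_mm_cache()
    await engine.reset_prefix_cache()            # all devices
    await engine.reset_prefix_cache(Device.CPU)  # or a single device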

scale_elastic_ep async

scale_elastic_ep(
    new_data_parallel_size: int, drain_timeout: int = 300
) -> None

Scale the engine to a new data-parallel size.

Source code in vllm/engine/protocol.py
async def scale_elastic_ep(
    self, new_data_parallel_size: int, drain_timeout: int = 300
) -> None:
    """Scale the engine"""
    raise NotImplementedError
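
A sketch of elastic scale-out; judging by the signature, drain_timeout bounds how long to wait for in-flight requests to drain before resizing:

async def scale_out(engine) -> None:
    await engine.scale_elastic_ep(new_data_parallel_size=4, drain_timeout=300)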

sleep abstractmethod async

sleep(level: int = 1) -> None

Sleep the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def sleep(self, level: int = 1) -> None:
    """Sleep the engine"""
    ...

start_profile abstractmethod async

start_profile() -> None

Start profiling the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def start_profile(self) -> None:
    """Start profiling the engine"""
    ...

stop_profile abstractmethod async

stop_profile() -> None

Stop profiling the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def stop_profile(self) -> None:
    """Start profiling the engine"""
    ...
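
start_profile and stop_profile bracket a profiled window; in vLLM the traces are written by the torch profiler when it is enabled (e.g. via VLLM_TORCH_PROFILER_DIR). A sketch:

from vllm import SamplingParams

async def profile_one_request(engine) -> None:
    await engine.start_profile()
    async for _ in engine.generate(
        prompt="profile me",
        sampling_params=SamplingParams(max_tokens=32),
        request_id="profile-0",
    ):
        pass
    await engine.stop_profile()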

wake_up abstractmethod async

wake_up(tags: Optional[list[str]] = None) -> None

Wake up the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def wake_up(self, tags: Optional[list[str]] = None) -> None:
    """Wake up the engine"""
    ...
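
sleep, is_sleeping, and wake_up free and restore accelerator memory between bursts of traffic. A sketch; the "weights" tag follows vLLM's sleep-mode convention, but treat the exact tag values as an assumption:

async def pause_and_resume(engine) -> None:
    await engine.sleep(level=1)
    assert await engine.is_sleeping()

    # Wake selectively (e.g. weights first), then everything else.
    await engine.wake_up(tags=["weights"])
    await engine.wake_up()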