class MistralTokenizer(TokenizerBase):
    """Adapter exposing a Mistral tokenizer through the ``TokenizerBase`` API.

    The instance wraps a Transformers-side Mistral tokenizer object and keeps
    direct references to the nested ``mistral_common`` objects it contains
    (the instruct tokenizer and the raw Tekken / SentencePiece tokenizer).
    ``mistral_common`` is imported lazily inside the methods that need it.
    """

    def __init__(self, tokenizer: "TransformersMistralTokenizer") -> None:
        """Wrap ``tokenizer`` and pre-compute the vocabulary lookups.

        Args:
            tokenizer: Transformers Mistral tokenizer whose ``.tokenizer``
                attribute holds the underlying ``mistral_common`` tokenizer.

        Raises:
            TypeError: If the raw tokenizer is neither a ``Tekkenizer`` nor a
                ``SentencePieceTokenizer``.
        """
        from mistral_common.tokens.tokenizers.sentencepiece import (
            SentencePieceTokenizer,
        )
        from mistral_common.tokens.tokenizers.tekken import Tekkenizer

        # Keep a handle on every layer of the wrapped tokenizer stack.
        self.transformers_tokenizer = tokenizer
        self.mistral = tokenizer.tokenizer
        self.instruct = self.mistral.instruct_tokenizer
        self.tokenizer = self.instruct.tokenizer

        # The version value is parsed as the integer following the last "v".
        _mistral_version_str = str(self.tokenizer.version.value)
        self.version: int = int(_mistral_version_str.split("v")[-1])

        self.is_tekken = isinstance(self.tokenizer, Tekkenizer)
        self.is_spm = isinstance(self.tokenizer, SentencePieceTokenizer)
        if not (self.is_tekken or self.is_spm):
            raise TypeError(f"Unsupported tokenizer: {type(self.tokenizer)}")

        # Map every token piece to the lowest token id that produces it.
        # Walking the ids in ascending order with ``setdefault`` keeps the
        # first (lowest) id for duplicated pieces and leaves the dict ordered
        # by token id, so no separate sort pass is required.
        vocab_dict: dict[str, int] = {}
        for token_id in range(self.vocab_size):
            piece = self.convert_ids_to_tokens(
                [token_id], skip_special_tokens=False
            )[0]
            vocab_dict.setdefault(piece, token_id)
        self._vocab_dict = vocab_dict

        # Vocab sorted by token id.
        self._vocab = self.tokenizer._vocab
        self._max_token_id = self.vocab_size - 1

    @classmethod
    def from_pretrained(
        cls, path_or_repo_id: str, *, revision: Optional[str] = None
    ) -> "MistralTokenizer":
        """Load the Transformers Mistral tokenizer and wrap it.

        Args:
            path_or_repo_id: Location passed to the Transformers
                ``from_pretrained`` loader.
            revision: Revision to load; ``"main"`` is used when ``None``.

        Returns:
            A new ``MistralTokenizer`` wrapping the loaded tokenizer.
        """
        from transformers.tokenization_mistral_common import (
            MistralCommonTokenizer as TransformersMistralTokenizer,
        )

        str_revision = "main" if revision is None else revision
        return cls(
            TransformersMistralTokenizer.from_pretrained(
                path_or_repo_id, revision=str_revision
            )
        )

    # the following attributes are set to fit vLLM's design and are used
    # by the structured output backends.
    @property
    def all_special_tokens_extended(self) -> list[str]:
        """Alias of :attr:`all_special_tokens`."""
        return self.all_special_tokens

    @property
    def all_special_tokens(self) -> list[str]:
        """Decoded text of every special token id, in ascending id order.

        The list is recomputed on every access.
        """
        from mistral_common.tokens.tokenizers.base import SpecialTokenPolicy

        return [
            self.tokenizer.decode([i], special_token_policy=SpecialTokenPolicy.KEEP)
            for i in self.all_special_ids
        ]

    @property
    def all_special_ids(self) -> list[int]:
        """Sorted ids of the special (control) tokens.

        The list is recomputed on every access.

        Raises:
            ValueError: If the raw tokenizer is of an unknown type.
        """
        from mistral_common.tokens.tokenizers.sentencepiece import (
            SentencePieceTokenizer,
        )
        from mistral_common.tokens.tokenizers.tekken import Tekkenizer

        if self.is_tekken:
            assert isinstance(self.tokenizer, Tekkenizer), type(self.tokenizer)
            special_ids = {t["rank"] for t in self.tokenizer._all_special_tokens}
        elif self.is_spm:
            assert isinstance(self.tokenizer, SentencePieceTokenizer), type(
                self.tokenizer
            )
            special_ids = self.tokenizer._control_tokens
        else:
            raise ValueError(f"Unknown tokenizer type: {type(self.tokenizer)}")
        return sorted(special_ids)

    @property
    def bos_token_id(self) -> int:
        """Id of the beginning-of-sequence token of the raw tokenizer."""
        return self.tokenizer.bos_id

    @property
    def eos_token_id(self) -> int:
        """Id of the end-of-sequence token of the raw tokenizer."""
        return self.tokenizer.eos_id

    @property
    def sep_token(self) -> str:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    @property
    def pad_token(self) -> str:
        """Padding token reported by the wrapped Transformers tokenizer."""
        return self.transformers_tokenizer.pad_token

    @property
    def is_fast(self) -> bool:
        """Always ``True``."""
        return True

    @property
    def vocab_size(self) -> int:
        """Vocabulary size reported by the wrapped Transformers tokenizer."""
        return self.transformers_tokenizer.vocab_size

    @property
    def max_token_id(self) -> int:
        """Highest token id, i.e. ``vocab_size - 1`` as computed in ``__init__``."""
        return self._max_token_id

    @property
    def truncation_side(self) -> str:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def _is_special_token_id(self, token_id: int) -> bool:
        """Return whether ``token_id`` denotes a special token.

        For SentencePiece the id is looked up in the control-token collection;
        for Tekken every id below ``num_special_tokens`` is treated as special.

        Raises:
            ValueError: If the raw tokenizer is of an unknown type.
        """
        from mistral_common.tokens.tokenizers.sentencepiece import (
            SentencePieceTokenizer,
        )
        from mistral_common.tokens.tokenizers.tekken import Tekkenizer

        if self.is_spm:
            assert isinstance(self.tokenizer, SentencePieceTokenizer), type(
                self.tokenizer
            )
            return token_id in self.tokenizer._control_tokens
        elif self.is_tekken:
            assert isinstance(self.tokenizer, Tekkenizer), type(self.tokenizer)
            return token_id < self.tokenizer.num_special_tokens
        else:
            raise ValueError(f"Unknown tokenizer type: {type(self.tokenizer)}")

    def __len__(self) -> int:
        """Return the vocabulary size."""
        return self.vocab_size

    def __call__(
        self,
        text: Union[str, list[str], list[int]],
        text_pair: Optional[str] = None,
        add_special_tokens: bool = False,
        truncation: bool = False,
        max_length: Optional[int] = None,
    ):
        """Forward the call, argument for argument, to the wrapped
        Transformers tokenizer and return its result unchanged."""
        return self.transformers_tokenizer(
            text=text,
            text_pair=text_pair,
            add_special_tokens=add_special_tokens,
            truncation=truncation,
            max_length=max_length,
        )

    @property
    def vocab(self) -> list[str]:
        """Vocabulary of the raw tokenizer (its ``_vocab`` attribute)."""
        return self._vocab

    def get_vocab(self) -> dict[str, int]:
        """Return the token-piece to token-id mapping built in ``__init__``.

        The same dict object is returned on every call (no copy is made).
        """
        return self._vocab_dict

    def get_added_vocab(self) -> dict[str, int]:
        """Return an empty dict."""
        # Mistral tokenizers have no added vocabulary
        return {}

    def encode_one(
        self,
        text: str,
        truncation: bool = False,
        max_length: Optional[int] = None,
    ) -> list[int]:
        """Encode ``text`` without adding special tokens.

        Args:
            text: Text to encode.
            truncation: Forwarded to the Transformers tokenizer.
            max_length: Forwarded to the Transformers tokenizer.

        Returns:
            The token ids produced by the Transformers tokenizer.
        """
        # Mistral Tokenizers should not add special tokens
        return self.transformers_tokenizer.encode(
            text, add_special_tokens=False, truncation=truncation, max_length=max_length
        )

    def encode(
        self,
        text: str,
        truncation: Optional[bool] = None,
        max_length: Optional[int] = None,
        add_special_tokens: Optional[bool] = None,
    ) -> list[int]:
        """Encode ``text`` into token ids.

        When ``add_special_tokens`` is given, the call is delegated to the
        Transformers tokenizer with all arguments forwarded. Otherwise the raw
        tokenizer is used with ``bos=True, eos=False`` and the result is cut
        to ``max_length`` ids unless ``truncation`` is explicitly ``False``
        (so ``truncation=None`` combined with a ``max_length`` truncates).

        Args:
            text: Text to encode.
            truncation: Truncation switch; see above for the ``None`` case.
            max_length: Maximum number of ids to return when truncating.
            add_special_tokens: Selects the encoding path, see above.

        Returns:
            The list of token ids.
        """
        if add_special_tokens is not None:
            return self.transformers_tokenizer.encode(
                text,
                truncation=truncation,
                max_length=max_length,
                add_special_tokens=add_special_tokens,
            )

        encoded = self.tokenizer.encode(text, bos=True, eos=False)
        if truncation is not False and max_length is not None:
            return encoded[:max_length]
        return encoded

    def apply_chat_template(
        self,
        messages: list["ChatCompletionMessageParam"],
        tools: Optional[list[dict[str, Any]]] = None,
        **kwargs,
    ) -> list[int]:
        """Tokenize a conversation with the Transformers chat template.

        Only the keyword arguments ``add_generation_prompt``,
        ``continue_final_message``, ``padding``, ``truncation`` and
        ``max_length`` are read from ``kwargs``; any other entry is not
        forwarded. ``add_generation_prompt`` is consumed solely by the
        message/tool preparation helper.

        Args:
            messages: Conversation messages.
            tools: Optional tool definitions.
            **kwargs: See above.

        Returns:
            The result of the Transformers ``apply_chat_template`` call made
            with ``tokenize=True``, ``return_tensors=None`` and
            ``return_dict=False``.
        """
        add_generation_prompt = kwargs.pop("add_generation_prompt", False)
        continue_final_message = kwargs.get("continue_final_message", False)
        padding = kwargs.get("padding", False)
        truncation = kwargs.get("truncation", False)
        max_length = kwargs.get("max_length")

        messages, tools = _prepare_apply_chat_template_tools_and_messages(
            messages, tools, continue_final_message, add_generation_prompt
        )

        return self.transformers_tokenizer.apply_chat_template(
            conversation=messages,
            tools=tools,
            continue_final_message=continue_final_message,
            tokenize=True,
            padding=padding,
            truncation=truncation,
            max_length=max_length,
            return_tensors=None,
            return_dict=False,
        )

    def decode(
        self, ids: Union[list[int], int], skip_special_tokens: bool = True
    ) -> str:
        """Decode ``ids`` to text through the wrapped Transformers tokenizer.

        Args:
            ids: A single token id or a list of token ids.
            skip_special_tokens: Forwarded to the Transformers tokenizer.

        Returns:
            The decoded string.
        """
        return self.transformers_tokenizer.decode(
            ids, skip_special_tokens=skip_special_tokens
        )

    def convert_tokens_to_string(self, tokens: list[str]) -> str:
        """Join token pieces back into a string.

        Special tokens are dropped, except for the tool-calls token which is
        kept in the output.

        Args:
            tokens: Token pieces; for Tekken some entries may be ``bytes``.

        Returns:
            The reconstructed text.
        """
        from mistral_common.tokens.tokenizers.base import (
            SpecialTokenPolicy,
            SpecialTokens,
        )
        from mistral_common.tokens.tokenizers.sentencepiece import (
            SentencePieceTokenizer,
        )
        from mistral_common.tokens.tokenizers.tekken import Tekkenizer

        to_decode_special_tokens = {SpecialTokens.tool_calls}
        if self.is_tekken:
            assert isinstance(self.tokenizer, Tekkenizer), type(self.tokenizer)
            # ``all_special_tokens`` is recomputed on every access, so take a
            # single snapshot instead of re-evaluating it for each token.
            special_tokens = set(self.all_special_tokens)
            tokens = [
                t
                for t in tokens
                if (t in to_decode_special_tokens or t not in special_tokens)
            ]

            if any(isinstance(t, bytes) for t in tokens):
                # we need to encode and decode all tokens again
                ids = [_tekken_token_to_id(self.tokenizer, t) for t in tokens]
                # We filtered unwanted special tokens before
                # so we can decode the rest.
                decoded = self.tokenizer.decode(ids, SpecialTokenPolicy.KEEP)
            else:
                decoded = "".join(tokens)
        else:
            # make sure certain special tokens like Tool calls are
            # not decoded
            assert isinstance(self.tokenizer, SentencePieceTokenizer), type(
                self.tokenizer
            )

            regular_tokens: list[str] = []
            decoded_list: list[str] = []

            for token in tokens:
                if token in to_decode_special_tokens:
                    # Flush the pending run of regular tokens, then emit the
                    # kept special token verbatim.
                    if regular_tokens:
                        decoded_list.append(
                            self.tokenizer.decode(
                                regular_tokens, SpecialTokenPolicy.IGNORE
                            )
                        )
                        regular_tokens = []
                    decoded_list.append(token)
                else:
                    regular_tokens.append(token)

            if regular_tokens:
                decoded_list.append(
                    self.tokenizer.decode(regular_tokens, SpecialTokenPolicy.IGNORE)
                )
            decoded = "".join(decoded_list)

        return decoded

    def convert_ids_to_tokens(
        self,
        ids: list[int],
        skip_special_tokens: bool = True,
    ) -> list[str]:
        """Convert token ids to token pieces.

        Args:
            ids: Token ids to convert.
            skip_special_tokens: When ``False`` every id is converted as is.
                When ``True`` special ids are dropped, except the tool-calls
                token and, for ``InstructTokenizerV13``, the begin/end think
                tokens when they are set.

        Returns:
            The pieces for the kept ids. For Tekken, if any piece contains the
            replacement character, non-special ids are returned as byte pieces
            (via ``id_to_byte_piece``) and special ids as decoded text.
        """
        from mistral_common.tokens.tokenizers.base import (
            SpecialTokenPolicy,
            SpecialTokens,
        )
        from mistral_common.tokens.tokenizers.instruct import InstructTokenizerV13

        if not skip_special_tokens:
            return [self.tokenizer.id_to_piece(token_id) for token_id in ids]

        non_skip_special_tokens_ids = {
            self.tokenizer.get_control_token(SpecialTokens.tool_calls),
        }
        if isinstance(self.instruct, InstructTokenizerV13):
            if self.instruct.BEGIN_THINK:
                non_skip_special_tokens_ids.add(self.instruct.BEGIN_THINK)
            if self.instruct.END_THINK:
                non_skip_special_tokens_ids.add(self.instruct.END_THINK)

        ids_kept = [
            i
            for i in ids
            if i in non_skip_special_tokens_ids or not self._is_special_token_id(i)
        ]

        # We filtered unwanted special tokens so we can decode the rest.
        tokens = [self.tokenizer.id_to_piece(token_id) for token_id in ids_kept]

        if self.is_tekken and any("�" in t for t in tokens):
            # if a decoded token contains the replacement character, then the
            # token has an incomplete UTF-8 character so we must use bytes
            # See: https://github.com/vllm-project/vllm/pull/8640
            #      https://github.com/vllm-project/vllm/pull/9625
            # if underlying tokenizer is sentencepiece, we just add "�".
            # We filtered unwanted special tokens so we can decode the rest.
            # ``all_special_ids`` is rebuilt on every access; snapshot it once
            # rather than once per token id.
            special_ids = set(self.all_special_ids)
            tokens = [
                self.tokenizer.id_to_byte_piece(token_id, SpecialTokenPolicy.KEEP)
                if token_id not in special_ids
                else self.tokenizer.decode([token_id], SpecialTokenPolicy.KEEP)
                for token_id in ids_kept
            ]

        return tokens