ov_embedding_model.py (forked from openvinotoolkit/openvino_notebooks)
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import numpy as np
import openvino as ov
from langchain.pydantic_v1 import BaseModel, Extra, Field
from langchain.schema.embeddings import Embeddings
from sklearn.preprocessing import normalize
from transformers import AutoTokenizer


class OVEmbeddings(BaseModel, Embeddings):
    """
    LangChain-compatible wrapper for an OpenVINO embedding model.
    """

    model: Any  #: :meta private:
    """Compiled OpenVINO model."""
    model_kwargs: Optional[dict] = None
    """OpenVINO model configuration."""
    tokenizer: Any  #: :meta private:
    """Hugging Face tokenizer."""
    do_norm: bool
    """Whether to L2-normalize the model output."""
    num_stream: int
    """Number of inference streams."""
    encode_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Keyword arguments to pass when calling the `encode` method of the model."""

    @classmethod
    def from_model_id(
        cls,
        model_id: str,
        do_norm: bool,
        ov_config: Optional[dict],
        model_kwargs: Optional[dict],
        **kwargs: Any,
    ):
        """Construct the wrapper from a local model directory."""
        _model_kwargs = model_kwargs or {}
        _ov_config = ov_config or {}
        tokenizer = AutoTokenizer.from_pretrained(model_id, **_model_kwargs)
        # Compile the exported OpenVINO IR found inside the model directory.
        core = ov.Core()
        model_path = Path(model_id) / "openvino_model.xml"
        model = core.compile_model(model_path, **_ov_config)
        num_stream = model.get_property("NUM_STREAMS")
        return cls(
            model=model,
            tokenizer=tokenizer,
            do_norm=do_norm,
            num_stream=num_stream,
            **kwargs,
        )

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    def _text_length(self, text: Union[List[int], List[List[int]]]):
        """
        Helper function to get the length of the input text. Text can be a
        list of ints (a single tokenized text), a list of lists of ints
        (several tokenized texts), or a dict of model inputs.
        """
        if isinstance(text, dict):  # {key: value} case
            return len(next(iter(text.values())))
        elif not hasattr(text, "__len__"):  # Object has no len() method
            return 1
        elif len(text) == 0 or isinstance(text[0], int):  # Empty string or list of ints
            return len(text)
        else:
            # Sum of the lengths of the individual strings
            return sum(len(t) for t in text)

    def encode(self, sentences: Union[str, List[str]], **kwargs: Any):
        """
        Computes sentence embeddings.

        Args:
            sentences: the sentences to embed.
            kwargs: accepted for compatibility with `encode_kwargs` and
                currently unused.

        Returns:
            A numpy array with one embedding per sentence, in input order.
        """
        # Sort sentences by descending length so similarly sized inputs are
        # processed together; the original order is restored at the end.
        length_sorted_idx = np.argsort([-self._text_length(sen) for sen in sentences])
        sentences_sorted = [sentences[idx] for idx in length_sorted_idx]
        nireq = self.num_stream + 1
        infer_queue = ov.AsyncInferQueue(self.model, nireq)
        # Async requests may complete out of order, so key each result by the
        # userdata index passed to start_async.
        results = {}

        def postprocess(request, userdata):
            embeddings = request.get_output_tensor(0).data
            # Mean-pool the token embeddings into a single sentence embedding.
            embeddings = np.mean(embeddings, axis=1)
            if self.do_norm:
                embeddings = normalize(embeddings, norm="l2")
            results[userdata] = embeddings[0]

        infer_queue.set_callback(postprocess)
        for i, sentence in enumerate(sentences_sorted):
            features = self.tokenizer(sentence, padding=True, truncation=True, return_tensors="np")
            infer_queue.start_async(dict(features), i)
        infer_queue.wait_all()
        # Collect results in sorted order, then undo the length sort so the
        # embeddings line up with the input sentences.
        all_embeddings = np.asarray([results[i] for i in range(len(sentences_sorted))])
        return all_embeddings[np.argsort(length_sorted_idx)]

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Compute document embeddings using the compiled OpenVINO model.

        Args:
            texts: The list of texts to embed.

        Returns:
            List of embeddings, one for each text.
        """
        texts = [text.replace("\n", " ") for text in texts]
        embeddings = self.encode(texts, **self.encode_kwargs)
        return embeddings.tolist()

    def embed_query(self, text: str) -> List[float]:
        """Compute query embeddings using the compiled OpenVINO model.

        Args:
            text: The text to embed.

        Returns:
            Embeddings for the text.
        """
        return self.embed_documents([text])[0]
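

# A minimal usage sketch. The directory name "bge-small-en-v1.5-ov" below is a
# hypothetical placeholder for any folder containing an exported
# openvino_model.xml plus matching tokenizer files; swap in your own export
# before running.
if __name__ == "__main__":
    embedding = OVEmbeddings.from_model_id(
        "bge-small-en-v1.5-ov",  # hypothetical path to an exported embedding model
        do_norm=True,  # L2-normalize outputs, convenient for cosine similarity
        ov_config={"device_name": "CPU"},
        model_kwargs={},
    )
    docs = ["OpenVINO accelerates inference.", "LangChain builds RAG pipelines."]
    doc_vectors = embedding.embed_documents(docs)
    query_vector = embedding.embed_query("What accelerates inference?")
    print(len(doc_vectors), len(query_vector))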