
Evaluator

evals_hub.evaluator.classification_eval

ClassificationEvaluator

Source code in evals_hub/evaluator/classification_eval.py
class ClassificationEvaluator:
    _clf: LogisticRegression

    def __init__(
        self,
        model_name: str,
        device: torch.device | None = None,
        samples_per_label: int | None = None,
        seed: int | None = None,
        n_experiments: int | None = None,
    ):
        self.model = TextEmbedEvaluator(model_name, device)
        self.samples_per_label = samples_per_label or 8
        self.seed = seed or 42
        self.n_experiments = n_experiments or 10
        np.random.seed(self.seed)

    def _undersample_data(
        self,
        queries: Dataset,
        labels: Dataset,
        indices: np.ndarray | None,
    ) -> tuple[Dataset, Dataset, np.ndarray]:
        """
        Undersample data to ensure each label has the same number of samples.

        Ref: https://github.com/embeddings-benchmark/mteb/blob/b67bd043fe7575e91f08c16a16e318fc4baaa1d6/mteb/abstasks/AbsTaskClassification.py#L228
        """
        queries_sampled = []
        labels_sampled = []

        indices = indices if indices is not None else np.arange(len(labels))
        np.random.shuffle(indices)

        label_counter = defaultdict(int)
        for idx in indices:
            idx = int(idx)
            if label_counter[labels[idx]["label"]] < self.samples_per_label:
                queries_sampled.append(queries[idx]["query"])
                labels_sampled.append(labels[idx]["label"])
                label_counter[labels[idx]["label"]] += 1

        queries_sampled = Dataset.from_dict({"query": queries_sampled})
        labels_sampled = Dataset.from_dict({"label": labels_sampled})
        return queries_sampled, labels_sampled, indices

    def train_classification_model(
        self,
        train_queries: Dataset,
        train_labels: Dataset,
        seed: int,
        max_length: int | None = None,
        samples_per_label: int | None = None,
    ) -> None:
        self._clf = LogisticRegression(n_jobs=-1, random_state=seed, max_iter=100)
        embeddings = self.model.embed_content(
            train_queries, feature="query", max_length=max_length
        )
        self.samples_per_label = samples_per_label or 8
        if isinstance(embeddings["embeddings"], np.ndarray):
            self._clf.fit(embeddings["embeddings"], train_labels["label"])
        else:
            self._clf.fit(
                embeddings["embeddings"].to("cpu").numpy(), train_labels["label"]
            )

    def _evaluate_subset(
        self,
        train_queries: Dataset,
        train_labels: Dataset,
        test_embeddings: np.ndarray,
        test_labels: Dataset,
        max_length: int | None = None,
    ) -> dict[str, Any]:
        self.train_classification_model(
            train_queries,
            train_labels,
            seed=self.seed,
            max_length=max_length,
            samples_per_label=self.samples_per_label,
        )
        predictions = self._clf.predict(test_embeddings)
        clf_report: dict[str, Any] = classification_report(
            test_labels["label"],
            predictions,
            output_dict=True,
            zero_division=np.nan,  # type: ignore
        )  # type: ignore

        return clf_report

    def _get_averaged_metrics(
        self, run_metrics: list[dict[str, Any]]
    ) -> dict[str, Any]:
        accuracy = np.mean([experiment["accuracy"] for experiment in run_metrics])
        macro_avg_precision = np.mean(
            [experiment["macro avg"]["precision"] for experiment in run_metrics]
        )
        macro_avg_recall = np.mean(
            [experiment["macro avg"]["recall"] for experiment in run_metrics]
        )
        macro_avg_f1 = np.mean(
            [experiment["macro avg"]["f1-score"] for experiment in run_metrics]
        )
        macro_avg = {
            "precision": macro_avg_precision,
            "recall": macro_avg_recall,
            "f1-score": macro_avg_f1,
        }
        averaged_metrics = {"accuracy": accuracy, "macro_avg": macro_avg}

        if "weighted avg" in run_metrics[0]:
            weighted_avg_precision = np.mean(
                [experiment["weighted avg"]["precision"] for experiment in run_metrics]
            )
            weighted_avg_recall = np.mean(
                [experiment["weighted avg"]["recall"] for experiment in run_metrics]
            )
            weighted_avg_f1 = np.mean(
                [experiment["weighted avg"]["f1-score"] for experiment in run_metrics]
            )
            weighted_avg = {
                "precision": weighted_avg_precision,
                "recall": weighted_avg_recall,
                "f1-score": weighted_avg_f1,
            }
            averaged_metrics["weighted_avg"] = weighted_avg

        if "micro avg" in run_metrics[0]:
            micro_avg_precision = np.mean(
                [experiment["micro avg"]["precision"] for experiment in run_metrics]
            )
            micro_avg_recall = np.mean(
                [experiment["micro avg"]["recall"] for experiment in run_metrics]
            )
            micro_avg_f1 = np.mean(
                [experiment["micro avg"]["f1-score"] for experiment in run_metrics]
            )
            micro_avg = {
                "precision": micro_avg_precision,
                "recall": micro_avg_recall,
                "f1-score": micro_avg_f1,
            }
            averaged_metrics["micro_avg"] = micro_avg

        return averaged_metrics

    def evaluate(
        self,
        train_queries: Dataset,
        train_labels: Dataset,
        test_queries: Dataset,
        test_labels: Dataset,
        max_length: int | None = None,
    ) -> dict[str, Any]:
        run_metrics = []
        test_embeddings = self.model.embed_content(
            test_queries, feature="query", max_length=max_length
        )["embeddings"]
        if not isinstance(test_embeddings, np.ndarray):
            test_embeddings = test_embeddings.to("cpu").numpy()
        indices = None
        for _ in range(self.n_experiments):
            experiment_queries, experiment_labels, indices = self._undersample_data(
                train_queries,
                train_labels,
                indices,
            )
            run_metrics.append(
                self._evaluate_subset(
                    experiment_queries,
                    experiment_labels,
                    test_embeddings,
                    test_labels,
                    max_length=max_length,
                )
            )
        return self._get_averaged_metrics(run_metrics)
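
A minimal usage sketch: the train and test splits are assumed to be Hugging Face Dataset objects with "query" and "label" columns, and the model name and toy data below are placeholders.

from datasets import Dataset

from evals_hub.evaluator.classification_eval import ClassificationEvaluator

# Toy data; in practice these come from a real classification dataset.
train_queries = Dataset.from_dict({"query": ["great movie", "terrible plot", "loved it", "boring"] * 4})
train_labels = Dataset.from_dict({"label": [1, 0, 1, 0] * 4})
test_queries = Dataset.from_dict({"query": ["fantastic film", "awful acting"]})
test_labels = Dataset.from_dict({"label": [1, 0]})

evaluator = ClassificationEvaluator(
    "sentence-transformers/all-MiniLM-L6-v2",  # placeholder model name
    samples_per_label=4,
    n_experiments=2,
)
metrics = evaluator.evaluate(train_queries, train_labels, test_queries, test_labels)
print(metrics["accuracy"], metrics["macro_avg"]["f1-score"])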

evals_hub.evaluator.nli_eval

NLIEvaluator

Evaluator for NLI tasks for embedding models.

Source code in evals_hub/evaluator/nli_eval.py
class NLIEvaluator:
    """
    Evaluator for NLI tasks for embedding models.
    """

    def __init__(
        self, model_name: str, device: torch.device | None = None, **encode_kwargs
    ) -> None:
        self.device = get_device() if device is None else device
        self.model = SentenceTransformer(model_name, device=self.device.type)
        self._encode_kwargs = encode_kwargs

    def embed_content(
        self, premises: Dataset, hypotheses: Dataset
    ) -> tuple[torch.Tensor, torch.Tensor]:
        unique_sentences = set(premises["premise"] + hypotheses["hypothesis"])

        # TODO: Replace this with model._encode that returns a tensor to account for non ST models
        unique_embeddings = self.model.encode(
            list(unique_sentences),
            normalize_embeddings=True,
            convert_to_tensor=True,
            device=self.device.type,
            **self._encode_kwargs,
        ).cpu()
        embeddings = dict(zip(unique_sentences, unique_embeddings))
        premise_embeddings = torch.stack(
            [embeddings[sentence] for sentence in premises["premise"]]
        )
        hypothesis_embeddings = torch.stack(
            [embeddings[sentence] for sentence in hypotheses["hypothesis"]]
        )
        return premise_embeddings, hypothesis_embeddings

    def _calculate_similarity_scores(
        self, premise_embeddings: torch.Tensor, hypothesis_embeddings: torch.Tensor
    ) -> np.ndarray:
        # Currently always uses SentenceTransformer similarity
        if hasattr(self.model, "similarity"):
            return (
                torch.stack(
                    [
                        self.model.similarity(premise_embedding, hypothesis_embedding)
                        for premise_embedding, hypothesis_embedding in zip(
                            premise_embeddings, hypothesis_embeddings
                        )
                    ]
                )
                .squeeze()
                .cpu()
                .numpy()
            )
        else:
            return (
                F.cosine_similarity(premise_embeddings, hypothesis_embeddings, dim=1)
                .cpu()
                .numpy()
            )

    def evaluate(
        self, premises: Dataset, hypotheses: Dataset, labels: Dataset
    ) -> dict[str, float]:
        premise_embeddings, hypothesis_embeddings = self.embed_content(
            premises, hypotheses
        )
        cosine_scores = 1 - paired_cosine_distances(
            premise_embeddings, hypothesis_embeddings
        )
        manhattan_distances = paired_manhattan_distances(
            premise_embeddings, hypothesis_embeddings
        )
        euclidean_distances = paired_euclidean_distances(
            premise_embeddings, hypothesis_embeddings
        )
        similarity_scores = self._calculate_similarity_scores(
            premise_embeddings, hypothesis_embeddings
        )

        metric_runs = {
            "model_similarity": {
                "best_f1": get_metrics_at_best_f1(
                    similarity_scores,
                    np.array(labels["label"]),
                    score_type="similarity",
                ),
                "best_accuracy": get_metrics_at_best_accuracy(
                    similarity_scores,
                    np.array(labels["label"]),
                    score_type="similarity",
                ),
                "average_precision": get_average_precision(
                    similarity_scores,
                    np.array(labels["label"]),
                    score_type="similarity",
                ),
            },
            "cosine_similarity": {
                "best_f1": get_metrics_at_best_f1(
                    cosine_scores, np.array(labels["label"]), score_type="similarity"
                ),
                "best_accuracy": get_metrics_at_best_accuracy(
                    cosine_scores, np.array(labels["label"]), score_type="similarity"
                ),
                "average_precision": get_average_precision(
                    cosine_scores, np.array(labels["label"]), score_type="similarity"
                ),
            },
            "manhattan_distances": {
                "best_f1": get_metrics_at_best_f1(
                    manhattan_distances,
                    np.array(labels["label"]),
                    score_type="distance",
                ),
                "best_accuracy": get_metrics_at_best_accuracy(
                    manhattan_distances,
                    np.array(labels["label"]),
                    score_type="distance",
                ),
                "average_precision": get_average_precision(
                    manhattan_distances,
                    np.array(labels["label"]),
                    score_type="distance",
                ),
            },
            "euclidean_distances": {
                "best_f1": get_metrics_at_best_f1(
                    euclidean_distances,
                    np.array(labels["label"]),
                    score_type="distance",
                ),
                "best_accuracy": get_metrics_at_best_accuracy(
                    euclidean_distances,
                    np.array(labels["label"]),
                    score_type="distance",
                ),
                "average_precision": get_average_precision(
                    euclidean_distances,
                    np.array(labels["label"]),
                    score_type="distance",
                ),
            },
        }

        best_metrics = {
            "f1": max(metric_runs[run]["best_f1"]["f1"] for run in metric_runs),
            "accuracy": max(
                metric_runs[run]["best_accuracy"]["accuracy"] for run in metric_runs
            ),
            "average_precision": max(
                metric_runs[run]["average_precision"] for run in metric_runs
            ),
        }
        return best_metrics
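
A minimal usage sketch: premises, hypotheses, and labels are assumed to be parallel Hugging Face Dataset objects with "premise", "hypothesis", and binary "label" columns; the model name and sentences are placeholders.

from datasets import Dataset

from evals_hub.evaluator.nli_eval import NLIEvaluator

premises = Dataset.from_dict({"premise": ["A man is playing a guitar.", "A dog runs in the park."]})
hypotheses = Dataset.from_dict({"hypothesis": ["A person plays music.", "A cat sleeps indoors."]})
labels = Dataset.from_dict({"label": [1, 0]})  # assumed binary entailment labels

evaluator = NLIEvaluator("sentence-transformers/all-MiniLM-L6-v2")  # placeholder model name
best_metrics = evaluator.evaluate(premises, hypotheses, labels)
print(best_metrics)  # {"f1": ..., "accuracy": ..., "average_precision": ...}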

get_metrics_at_best_f1(scores, labels, score_type='similarity')

Calculate the best f1 score and the associated threshold by iterating through examples

Parameters:

Name Type Description Default
scores ndarray

Scores - these can be distances or similarities

required
labels ndarray

Ground truth labels

required
score_type Literal['distance', 'similarity']

Whether the scores are similarities (higher means closer) or distances (lower means closer)

'similarity'

Returns:

Type Description
dict[str, float]

dict[str, float]: The best f1 score and associated threshold, precision and recall

Source code in evals_hub/evaluator/nli_eval.py
def get_metrics_at_best_f1(
    scores: np.ndarray,
    labels: np.ndarray,
    score_type: Literal["distance", "similarity"] = "similarity",
) -> dict[str, float]:
    """Calculate the best f1 score and the associated threshold by iterating through examples

    Args:
        scores: Scores - these can be distances or similarities
        labels: Ground truth labels
        score_type: Whether the scores are similarities (higher means closer) or distances (lower means closer)

    Returns:
        dict[str, float]: The best f1 score and associated threshold, precision and recall
    """
    sorted_indices = np.argsort(scores)
    if score_type == "similarity":
        sorted_indices = np.flip(sorted_indices)

    sorted_scores = scores[sorted_indices]
    sorted_labels = labels[sorted_indices]

    metrics_at_best_f1 = defaultdict(float)
    examples = list(zip(sorted_scores, sorted_labels))
    for i in range(scores.shape[0] - 1):
        predicted_labels = np.zeros(scores.shape[0], dtype=int)
        predicted_labels[: i + 1] = 1
        f1 = f1_score(sorted_labels, predicted_labels)

        if f1 > metrics_at_best_f1["f1"]:
            metrics_at_best_f1["f1"] = float(f1)
            metrics_at_best_f1["precision"] = float(
                precision_score(sorted_labels, predicted_labels)
            )
            metrics_at_best_f1["recall"] = float(
                recall_score(sorted_labels, predicted_labels)
            )
            score, next_score = examples[i][0], examples[i + 1][0]
            metrics_at_best_f1["f1_threshold"] = float((score + next_score) / 2)

    return metrics_at_best_f1
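
A toy call showing the threshold sweep on hand-made scores; the numbers are placeholders chosen so the classes separate cleanly.

import numpy as np

from evals_hub.evaluator.nli_eval import get_metrics_at_best_f1

# Three positives with high similarity, two negatives with low similarity.
scores = np.array([0.9, 0.8, 0.75, 0.3, 0.2])
labels = np.array([1, 1, 1, 0, 0])

metrics = get_metrics_at_best_f1(scores, labels, score_type="similarity")
# Predicting positive above the midpoint of 0.75 and 0.3 separates the classes
# perfectly, so the best F1 is 1.0 at a threshold of 0.525.
print(metrics["f1"], metrics["f1_threshold"])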

evals_hub.evaluator.reranking_eval

Reranker

Wrapper for both transformer-based and API-based reranker models. Supports embedding-based reranking and API-based reranking (e.g., Cohere Rerank 3.5).

Source code in evals_hub/evaluator/reranking_eval.py
class Reranker:
    """
    Wrapper for both transformer-based and API-based reranker models.
    Supports embedding-based reranking and API-based reranking (e.g., Cohere Rerank 3.5).
    """

    def __init__(
        self,
        model_name: str,
        device: torch.device | None = None,
        langfuse_enable: bool = False,
        langfuse_name: str = "reranking_evaluation",
        langfuse_client: object | None = None,
        model_source: str = "huggingface",
        api_config: dict | None = None,
        reranking_method: str | None = None,
        max_text_char_length: int | None = None,
    ) -> None:
        self.device = get_device() if device is None else device
        self.model_name = model_name
        self.model_source = model_source
        self.langfuse_enable = langfuse_enable
        self.langfuse_name = langfuse_name
        self.langfuse_client = langfuse_client

        self.api_config = api_config
        # Determine reranking method strictly from config
        self.use_api = self._determine_reranking_method(reranking_method)
        # Character-based truncation (used for API limits like 32k per document)
        self.max_text_char_length = max_text_char_length

        if not self.use_api:
            self.model = load_model(
                model_name=self.model_name,
                model_source=self.model_source,
                device=self.device,
            )
        else:
            self.model = None
            self._setup_api_client()

    def _determine_reranking_method(self, reranking_method: str | None) -> bool:
        """Determine if API-based reranking should be used.

        Args:
            reranking_method: Either "api" or "embedding" (default).

        Returns:
            True if API-based reranking should be used, False for embedding-based.
        """
        if not reranking_method:
            return False

        return reranking_method.lower() == "api"

    def _setup_api_client(self) -> None:
        """
        Setup API client configuration for external reranking services.
        """
        # Get configuration from api_config or environment
        self.api_key = self.api_config.get("api_key") or os.getenv("COHERE_API_KEY")
        self.base_url = self.api_config.get("base_url") or os.getenv("BASE_URL")

        if not self.api_key:
            raise ValueError(
                "API key is required for API-based reranking. Set COHERE_API_KEY environment variable or provide api_config."
            )

        # Initialize the Cohere client
        self.co_client = cohere.Client(
            api_key=self.api_key,
            base_url=self.base_url,
        )

    def _call_rerank_api(
        self, query: str, documents: list[str], top_n: int | None = None
    ) -> dict | None:
        """
        Call external reranking API using the Cohere client.

        Args:
            query: The search query
            documents: List of documents to rerank
            top_n: Optional limit on number of results

        Returns:
            API response or None if error
        """
        try:
            kwargs = {
                "model": self.model_name,
                "query": query,
                "documents": documents,
            }

            if top_n is not None:
                kwargs["top_n"] = top_n

            response = self.co_client.rerank(**kwargs)

            result = {"results": []}

            for _, item in enumerate(response.results):
                result["results"].append(
                    {"index": item.index, "relevance_score": item.relevance_score}
                )

            return result
        except Exception as e:
            logger.error(f"Error calling rerank API: {e}")
            return None

    def _api_rerank_scores(
        self,
        query: str,
        documents: list[str],
    ) -> torch.Tensor:
        """
        Get reranking scores from API and convert to torch tensor.

        Args:
            query: The search query
            documents: List of documents to rerank

        Returns:
            Torch tensor of similarity scores
        """
        api_result = self._call_rerank_api(query, documents)

        if not api_result or "results" not in api_result:
            # Fallback to uniform scores if API fails
            return torch.ones(len(documents))

        # Initialize scores array with zeros
        scores = [0.0] * len(documents)

        # Fill in scores from API response
        for result in api_result["results"]:
            doc_index = result["index"]
            relevance_score = result["relevance_score"]
            if 0 <= doc_index < len(documents):
                scores[doc_index] = relevance_score

        return torch.tensor(scores)

    def _encode_unique_texts(
        self,
        all_texts: list[str],
        max_length: int = 512,
    ) -> np.ndarray:
        """
        Encodes a list of texts into embeddings while efficiently handling duplicates.
        Args:
            all_texts (list[str]): List of texts to encode
            max_length (int, optional): Maximum length of input sequences. Defaults to 512.

        Returns:
            np.ndarray: Array of embeddings for all input texts, maintaining the original order
                        Shape: (len(all_texts), embedding_dimension)
        """
        index_map, all_unique_texts, all_texts_indexes = {}, [], []
        for text in all_texts:
            text_hash = hash(text)
            if text_hash not in index_map:
                index_map[text_hash] = len(all_unique_texts)
                all_unique_texts.append(text)
            all_texts_indexes.append(index_map[text_hash])
        embs = self.model.encode(
            all_unique_texts, convert_to_tensor=True, show_progress_bar=False
        )
        all_unique_texts_embs = embs.detach().cpu().numpy()
        return all_unique_texts_embs[all_texts_indexes]

    def embed_content(
        self,
        queries: list[dict[str, list[str]]],
        max_length: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Embeds queries and their associated documents (both positive and negative) into vector space.

        Args:
            queries (list[dict[str, list[str]]]): List of query dictionaries, where each dictionary contains:
                - 'query': string or list of strings representing the query
                - 'positive': list of strings representing positive (relevant) documents
                - 'negative': list of strings representing negative (irrelevant) documents
            max_length (int | None, optional): Maximum sequence length for document encoding.
                If None, uses model's default. Defaults to None.

        Returns:
            tuple[np.ndarray, np.ndarray]: A tuple containing:
                - all_query_embs: Array of query embeddings, shape (num_queries, embedding_dim)
                - all_docs_embs: Array of document embeddings, shape (num_total_docs, embedding_dim)
                  where num_total_docs is the sum of positive and negative documents across all queries
        """
        all_query_embs = self.model.encode(
            [sample["query"] for sample in queries],
            convert_to_tensor=False,
            show_progress_bar=False,
        )

        all_docs = []
        for sample in queries:
            all_docs.extend(sample["positive"])
            all_docs.extend(sample["negative"])

        all_docs_embs = self._encode_unique_texts(
            all_docs,
            max_length=max_length,
        )
        return all_query_embs, all_docs_embs

    def rerank_scores(
        self,
        query_emb: torch.Tensor | np.ndarray | list | str,
        docs_emb: torch.Tensor | np.ndarray | list | list[str],
    ) -> torch.Tensor:
        """
        Computes similarity scores between query and documents.
        Supports both embedding-based and API-based reranking.

        Args:
            query_emb: Query embeddings tensor (for embedding-based) or query string (for API-based)
            docs_emb: Document embeddings tensor (for embedding-based) or list of document strings (for API-based)

        Returns:
            torch.Tensor: Similarity scores between query and documents.
        """
        if self.use_api:
            # API-based reranking
            if isinstance(query_emb, str) and isinstance(docs_emb, list):
                return self._api_rerank_scores(query_emb, docs_emb)
            else:
                raise ValueError(
                    "For API-based reranking, query_emb must be string and docs_emb must be list of strings"
                )
        else:
            # Embedding-based reranking
            # Accept torch.Tensor, numpy arrays, or Python lists and normalize to torch.Tensor
            def to_tensor(x: torch.Tensor | np.ndarray | list) -> torch.Tensor:
                if isinstance(x, torch.Tensor):
                    return x.to(self.device)
                if isinstance(x, np.ndarray):
                    return torch.tensor(x, dtype=torch.float32, device=self.device)
                if isinstance(x, list):
                    # Convert list (possibly list of vectors) to 2D numpy then tensor
                    arr = np.array(x)
                    return torch.tensor(arr, dtype=torch.float32, device=self.device)
                raise ValueError(
                    "Unsupported embedding type; expected Tensor/ndarray/list"
                )

            # Convert inputs
            if isinstance(query_emb, (torch.Tensor, np.ndarray, list)) and isinstance(
                docs_emb, (torch.Tensor, np.ndarray, list)
            ):
                q = to_tensor(query_emb)
                d = to_tensor(docs_emb)

                # Ensure 2D shapes for cos_sim
                if q.dim() == 1:
                    q = q.unsqueeze(0)
                if d.dim() == 1:
                    d = d.unsqueeze(0)

                sim_scores = cos_sim(q, d)
                if len(sim_scores.shape) > 1:
                    sim_scores = torch.amax(sim_scores, dim=0)
                return sim_scores
            else:
                raise ValueError(
                    "For embedding-based reranking, expected tensor/ndarray/list for both query_emb and docs_emb"
                )

    @observe()
    def process_query_instance(
        self,
        instance: dict[str, list[str]],
        all_query_embs: np.ndarray | None,
        all_docs_embs: np.ndarray | None,
        query_idx: int,
        docs_idx: int,
        top_k: int,
    ) -> dict:
        """
        Process a single query instance and calculate its reranking metrics.
        Supports both embedding-based and API-based processing.

        Args:
            instance: Dictionary containing query, positive and negative documents
            all_query_embs: Array of all query embeddings (None for API-based)
            all_docs_embs: Array of all document embeddings (None for API-based)
            query_idx: Current index in query embeddings
            docs_idx: Current index in document embeddings
            top_k: K value for reciprocal rank calculation

        Returns:
            dict containing metrics and updated indices
        """
        if self.langfuse_enable:
            self.langfuse_client.update_current_trace(name=self.langfuse_name)

        num_pos = len(instance["positive"])
        num_neg = len(instance["negative"])

        if num_pos == 0 or num_neg == 0:
            return {
                "mrr": None,
                "ap": None,
                "query_idx": query_idx + 1,
                "docs_idx": docs_idx + num_pos + num_neg,
                "is_relevant": [],
                "similarity_scores": [],
            }

        is_relevant = [True] * num_pos + [False] * num_neg

        if self.use_api:
            # API-based processing
            query_text = instance["query"]
            if isinstance(query_text, list):
                query_text = " ".join(query_text)  # Join multiple queries

            # Truncate query and documents to respect API character limits when configured
            if self.max_text_char_length is not None:
                query_text = query_text[: self.max_text_char_length]

            documents = instance["positive"] + instance["negative"]
            if self.max_text_char_length is not None:
                documents = [
                    (doc[: self.max_text_char_length] if isinstance(doc, str) else doc)
                    for doc in documents
                ]
            sim_scores = self.rerank_scores(query_text, documents)
            new_query_idx = query_idx + 1
            new_docs_idx = docs_idx + num_pos + num_neg
        else:
            # Embedding-based processing
            num_subqueries = (
                len(instance["query"]) if isinstance(instance["query"], list) else 1
            )
            query_emb = all_query_embs[query_idx : query_idx + num_subqueries]
            new_query_idx = query_idx + num_subqueries

            docs_emb = all_docs_embs[docs_idx : docs_idx + num_pos + num_neg]
            new_docs_idx = docs_idx + num_pos + num_neg

            sim_scores = self.rerank_scores(query_emb, docs_emb)

        mrr = reciprocal_rank_at_k(is_relevant, sim_scores.cpu().tolist(), top_k)
        ap = ap_score(is_relevant, sim_scores.cpu().tolist())

        if self.langfuse_enable:
            time.sleep(0.25)

        return {
            "mrr": mrr,
            "ap": ap,
            "query_idx": new_query_idx,
            "docs_idx": new_docs_idx,
            "is_relevant": is_relevant,
            "similarity_scores": sim_scores.cpu().tolist(),
        }

    def compute_reranking_metrics(
        self, queries: list[dict[str, list[str]]], top_k: int
    ) -> dict[str, dict[str, float]]:
        """
        Compute Mean Average Precision (MAP) and Mean Reciprocal Rank (MRR) metrics for reranking.
        Supports both embedding-based and API-based reranking.
        """
        if self.use_api:
            # For API-based reranking, we don't need to precompute embeddings
            all_query_embs, all_docs_embs = None, None
        else:
            # For embedding-based reranking, precompute embeddings
            all_query_embs, all_docs_embs = self.embed_content(queries)

        all_mrr_scores = []
        all_ap_scores = []
        query_idx, docs_idx = 0, 0

        for instance in queries:
            query_metrics = self.process_query_instance(
                instance, all_query_embs, all_docs_embs, query_idx, docs_idx, top_k
            )
            if query_metrics["mrr"] is not None and query_metrics["ap"] is not None:
                all_mrr_scores.append(query_metrics["mrr"])
                all_ap_scores.append(query_metrics["ap"])
            query_idx = query_metrics["query_idx"]
            docs_idx = query_metrics["docs_idx"]

        mean_ap = np.mean(all_ap_scores)
        mean_mrr = np.mean(all_mrr_scores)

        return {
            "metrics": {
                "map": float(mean_ap),
                "mrr": float(mean_mrr),
                "num_queries": len(all_ap_scores),
                "reranking_method": "api" if self.use_api else "embedding",
                "model_name": self.model_name,
            },
        }

    @classmethod
    def create_api_reranker(
        cls,
        model_name: str = "cohere_rerank_3_5",
        api_key: str | None = None,
        base_url: str | None = None,
        langfuse_enable: bool = False,
        langfuse_name: str = "reranking_evaluation",
        langfuse_client: object | None = None,
    ) -> "Reranker":
        """
        Convenience method to create an API-based reranker.

        Args:
            model_name: Name of the API model (e.g., "cohere_rerank_3_5")
            api_key: API key (if not provided, will use COHERE_API_KEY env var)
            base_url: Base URL for the API (if not provided, will use BASE_URL env var)
            langfuse_enable: Whether to enable Langfuse tracking
            langfuse_name: Name for Langfuse tracking
            langfuse_client: Langfuse client instance

        Returns:
            Reranker instance configured for API-based reranking
        """
        api_config = {}
        if api_key:
            api_config["api_key"] = api_key
        if base_url:
            api_config["base_url"] = base_url

        return cls(
            model_name=model_name,
            langfuse_enable=langfuse_enable,
            langfuse_name=langfuse_name,
            langfuse_client=langfuse_client,
            api_config=api_config,
            reranking_method="api",  # Explicitly set to API
        )

    @classmethod
    def from_config(cls, config: dict) -> "Reranker":
        """
        Create a Reranker instance from a configuration dictionary.

        Args:
            config: Configuration dictionary with reranking settings

        Returns:
            Reranker instance configured according to the config

        """
        model_config = config.get("model", {})
        api_config = config.get("api_config", {})
        langfuse_config = config.get("langfuse", {})

        return cls(
            model_name=model_config.get(
                "name", "sentence-transformers/all-MiniLM-L6-v2"
            ),
            model_source=model_config.get("source", "huggingface"),
            reranking_method=model_config.get("reranking_method"),
            api_config=api_config,
            langfuse_enable=langfuse_config.get("enable", False),
            langfuse_name=langfuse_config.get("name", "reranking_evaluation"),
            langfuse_client=langfuse_config.get("client"),
        )
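
A minimal sketch of embedding-based reranking, assuming the model name resolves to a SentenceTransformer-compatible encoder; the model name and query data are placeholders.

from evals_hub.evaluator.reranking_eval import Reranker

queries = [
    {
        "query": "how to cache HTTP responses",
        "positive": ["Use Cache-Control headers to control response caching."],
        "negative": ["Tips for writing unit tests in Python.", "A history of HTTP/0.9."],
    },
]

reranker = Reranker("sentence-transformers/all-MiniLM-L6-v2")  # placeholder model name
results = reranker.compute_reranking_metrics(queries, top_k=10)
print(results["metrics"]["map"], results["metrics"]["mrr"])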

compute_reranking_metrics(queries, top_k)

Compute Mean Average Precision (MAP) and Mean Reciprocal Rank (MRR) metrics for reranking. Supports both embedding-based and API-based reranking.

Source code in evals_hub/evaluator/reranking_eval.py
def compute_reranking_metrics(
    self, queries: list[dict[str, list[str]]], top_k: int
) -> dict[str, dict[str, float]]:
    """
    Compute Mean Average Precision (MAP) and Mean Reciprocal Rank (MRR) metrics for reranking.
    Supports both embedding-based and API-based reranking.
    """
    if self.use_api:
        # For API-based reranking, we don't need to precompute embeddings
        all_query_embs, all_docs_embs = None, None
    else:
        # For embedding-based reranking, precompute embeddings
        all_query_embs, all_docs_embs = self.embed_content(queries)

    all_mrr_scores = []
    all_ap_scores = []
    query_idx, docs_idx = 0, 0

    for instance in queries:
        query_metrics = self.process_query_instance(
            instance, all_query_embs, all_docs_embs, query_idx, docs_idx, top_k
        )
        if query_metrics["mrr"] is not None and query_metrics["ap"] is not None:
            all_mrr_scores.append(query_metrics["mrr"])
            all_ap_scores.append(query_metrics["ap"])
        query_idx = query_metrics["query_idx"]
        docs_idx = query_metrics["docs_idx"]

    mean_ap = np.mean(all_ap_scores)
    mean_mrr = np.mean(all_mrr_scores)

    return {
        "metrics": {
            "map": float(mean_ap),
            "mrr": float(mean_mrr),
            "num_queries": len(all_ap_scores),
            "reranking_method": "api" if self.use_api else "embedding",
            "model_name": self.model_name,
        },
    }
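
Each query instance follows the "query"/"positive"/"negative" schema used by embed_content; the returned payload has the shape sketched below (all values are placeholders).

# Shape of the dictionary returned by compute_reranking_metrics (placeholder values).
example_result = {
    "metrics": {
        "map": 0.87,                      # mean average precision over scored queries
        "mrr": 0.91,                      # mean reciprocal rank at the requested top_k
        "num_queries": 100,               # queries with at least one positive and one negative
        "reranking_method": "embedding",  # or "api" when use_api is True
        "model_name": "sentence-transformers/all-MiniLM-L6-v2",  # placeholder
    },
}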

create_api_reranker(model_name='cohere_rerank_3_5', api_key=None, base_url=None, langfuse_enable=False, langfuse_name='reranking_evaluation', langfuse_client=None) classmethod

Convenience method to create an API-based reranker.

Parameters:

Name Type Description Default
model_name str

Name of the API model (e.g., "cohere_rerank_3_5")

'cohere_rerank_3_5'
api_key str | None

API key (if not provided, will use COHERE_API_KEY env var)

None
base_url str | None

Base URL for the API (if not provided, will use BASE_URL env var)

None
langfuse_enable bool

Whether to enable Langfuse tracking

False
langfuse_name str

Name for Langfuse tracking

'reranking_evaluation'
langfuse_client object | None

Langfuse client instance

None

Returns:

Type Description
Reranker

Reranker instance configured for API-based reranking

Source code in evals_hub/evaluator/reranking_eval.py
@classmethod
def create_api_reranker(
    cls,
    model_name: str = "cohere_rerank_3_5",
    api_key: str | None = None,
    base_url: str | None = None,
    langfuse_enable: bool = False,
    langfuse_name: str = "reranking_evaluation",
    langfuse_client: object | None = None,
) -> "Reranker":
    """
    Convenience method to create an API-based reranker.

    Args:
        model_name: Name of the API model (e.g., "cohere_rerank_3_5")
        api_key: API key (if not provided, will use COHERE_API_KEY env var)
        base_url: Base URL for the API (if not provided, will use BASE_URL env var)
        langfuse_enable: Whether to enable Langfuse tracking
        langfuse_name: Name for Langfuse tracking
        langfuse_client: Langfuse client instance

    Returns:
        Reranker instance configured for API-based reranking
    """
    api_config = {}
    if api_key:
        api_config["api_key"] = api_key
    if base_url:
        api_config["base_url"] = base_url

    return cls(
        model_name=model_name,
        langfuse_enable=langfuse_enable,
        langfuse_name=langfuse_name,
        langfuse_client=langfuse_client,
        api_config=api_config,
        reranking_method="api",  # Explicitly set to API
    )
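
A sketch of creating and calling an API-based reranker; it assumes valid Cohere-compatible credentials are available, and the query and documents are placeholders.

import os

from evals_hub.evaluator.reranking_eval import Reranker

reranker = Reranker.create_api_reranker(
    model_name="cohere_rerank_3_5",
    api_key=os.getenv("COHERE_API_KEY"),
    base_url=os.getenv("BASE_URL"),  # optional
)
scores = reranker.rerank_scores(
    "how to cache HTTP responses",
    ["Use Cache-Control headers to control response caching.", "Tips for writing unit tests."],
)
print(scores)  # torch.Tensor with one relevance score per document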

embed_content(queries, max_length=None)

Embeds queries and their associated documents (both positive and negative) into vector space.

Parameters:

Name Type Description Default
queries list[dict[str, list[str]]]

List of query dictionaries, where each dictionary contains: - 'query': string or list of strings representing the query - 'positive': list of strings representing positive (relevant) documents - 'negative': list of strings representing negative (irrelevant) documents

required
max_length int | None

Maximum sequence length for document encoding. If None, uses model's default. Defaults to None.

None

Returns:

Type Description
tuple[ndarray, ndarray]

tuple[np.ndarray, np.ndarray]: A tuple containing: - all_query_embs: Array of query embeddings, shape (num_queries, embedding_dim) - all_docs_embs: Array of document embeddings, shape (num_total_docs, embedding_dim) where num_total_docs is the sum of positive and negative documents across all queries

Source code in evals_hub/evaluator/reranking_eval.py
def embed_content(
    self,
    queries: list[dict[str, list[str]]],
    max_length: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Embeds queries and their associated documents (both positive and negative) into vector space.

    Args:
        queries (list[dict[str, list[str]]]): List of query dictionaries, where each dictionary contains:
            - 'query': string or list of strings representing the query
            - 'positive': list of strings representing positive (relevant) documents
            - 'negative': list of strings representing negative (irrelevant) documents
        max_length (int | None, optional): Maximum sequence length for document encoding.
            If None, uses model's default. Defaults to None.

    Returns:
        tuple[np.ndarray, np.ndarray]: A tuple containing:
            - all_query_embs: Array of query embeddings, shape (num_queries, embedding_dim)
            - all_docs_embs: Array of document embeddings, shape (num_total_docs, embedding_dim)
              where num_total_docs is the sum of positive and negative documents across all queries
    """
    all_query_embs = self.model.encode(
        [sample["query"] for sample in queries],
        convert_to_tensor=False,
        show_progress_bar=False,
    )

    all_docs = []
    for sample in queries:
        all_docs.extend(sample["positive"])
        all_docs.extend(sample["negative"])

    all_docs_embs = self._encode_unique_texts(
        all_docs,
        max_length=max_length,
    )
    return all_query_embs, all_docs_embs
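
A short sketch of the returned shapes, reusing the embedding-based reranker from the earlier example; the queries are placeholders.

queries = [
    {"query": "what is beam search", "positive": ["Beam search keeps the top-k hypotheses."], "negative": ["A recipe for banana bread."]},
    {"query": "python list comprehension", "positive": ["Comprehensions build lists concisely."], "negative": ["Notes on Roman history."]},
]
query_embs, doc_embs = reranker.embed_content(queries)
print(query_embs.shape)  # (2, embedding_dim)
print(doc_embs.shape)    # (4, embedding_dim) -- positives then negatives, flattened per query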

from_config(config) classmethod

Create a Reranker instance from a configuration dictionary.

Parameters:

Name Type Description Default
config dict

Configuration dictionary with reranking settings

required

Returns:

Type Description
Reranker

Reranker instance configured according to the config

Source code in evals_hub/evaluator/reranking_eval.py
@classmethod
def from_config(cls, config: dict) -> "Reranker":
    """
    Create a Reranker instance from a configuration dictionary.

    Args:
        config: Configuration dictionary with reranking settings

    Returns:
        Reranker instance configured according to the config

    """
    model_config = config.get("model", {})
    api_config = config.get("api_config", {})
    langfuse_config = config.get("langfuse", {})

    return cls(
        model_name=model_config.get(
            "name", "sentence-transformers/all-MiniLM-L6-v2"
        ),
        model_source=model_config.get("source", "huggingface"),
        reranking_method=model_config.get("reranking_method"),
        api_config=api_config,
        langfuse_enable=langfuse_config.get("enable", False),
        langfuse_name=langfuse_config.get("name", "reranking_evaluation"),
        langfuse_client=langfuse_config.get("client"),
    )
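
A sketch of the expected configuration layout; the model name is a placeholder and only the keys read by from_config are shown.

from evals_hub.evaluator.reranking_eval import Reranker

config = {
    "model": {
        "name": "sentence-transformers/all-MiniLM-L6-v2",  # placeholder
        "source": "huggingface",
        "reranking_method": "embedding",  # or "api"
    },
    "api_config": {},               # only used when reranking_method is "api"
    "langfuse": {"enable": False},
}
reranker = Reranker.from_config(config)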

process_query_instance(instance, all_query_embs, all_docs_embs, query_idx, docs_idx, top_k)

Process a single query instance and calculate its reranking metrics. Supports both embedding-based and API-based processing.

Parameters:

Name Type Description Default
instance dict[str, list[str]]

Dictionary containing query, positive and negative documents

required
all_query_embs ndarray | None

Array of all query embeddings (None for API-based)

required
all_docs_embs ndarray | None

Array of all document embeddings (None for API-based)

required
query_idx int

Current index in query embeddings

required
docs_idx int

Current index in document embeddings

required
top_k int

K value for reciprocal rank calculation

required

Returns:

Type Description
dict

dict containing metrics and updated indices

Source code in evals_hub/evaluator/reranking_eval.py
@observe()
def process_query_instance(
    self,
    instance: dict[str, list[str]],
    all_query_embs: np.ndarray | None,
    all_docs_embs: np.ndarray | None,
    query_idx: int,
    docs_idx: int,
    top_k: int,
) -> dict:
    """
    Process a single query instance and calculate its reranking metrics.
    Supports both embedding-based and API-based processing.

    Args:
        instance: Dictionary containing query, positive and negative documents
        all_query_embs: Array of all query embeddings (None for API-based)
        all_docs_embs: Array of all document embeddings (None for API-based)
        query_idx: Current index in query embeddings
        docs_idx: Current index in document embeddings
        top_k: K value for reciprocal rank calculation

    Returns:
        dict containing metrics and updated indices
    """
    if self.langfuse_enable:
        self.langfuse_client.update_current_trace(name=self.langfuse_name)

    num_pos = len(instance["positive"])
    num_neg = len(instance["negative"])

    if num_pos == 0 or num_neg == 0:
        return {
            "mrr": None,
            "ap": None,
            "query_idx": query_idx + 1,
            "docs_idx": docs_idx + num_pos + num_neg,
            "is_relevant": [],
            "similarity_scores": [],
        }

    is_relevant = [True] * num_pos + [False] * num_neg

    if self.use_api:
        # API-based processing
        query_text = instance["query"]
        if isinstance(query_text, list):
            query_text = " ".join(query_text)  # Join multiple queries

        # Truncate query and documents to respect API character limits when configured
        if self.max_text_char_length is not None:
            query_text = query_text[: self.max_text_char_length]

        documents = instance["positive"] + instance["negative"]
        if self.max_text_char_length is not None:
            documents = [
                (doc[: self.max_text_char_length] if isinstance(doc, str) else doc)
                for doc in documents
            ]
        sim_scores = self.rerank_scores(query_text, documents)
        new_query_idx = query_idx + 1
        new_docs_idx = docs_idx + num_pos + num_neg
    else:
        # Embedding-based processing
        num_subqueries = (
            len(instance["query"]) if isinstance(instance["query"], list) else 1
        )
        query_emb = all_query_embs[query_idx : query_idx + num_subqueries]
        new_query_idx = query_idx + num_subqueries

        docs_emb = all_docs_embs[docs_idx : docs_idx + num_pos + num_neg]
        new_docs_idx = docs_idx + num_pos + num_neg

        sim_scores = self.rerank_scores(query_emb, docs_emb)

    mrr = reciprocal_rank_at_k(is_relevant, sim_scores.cpu().tolist(), top_k)
    ap = ap_score(is_relevant, sim_scores.cpu().tolist())

    if self.langfuse_enable:
        time.sleep(0.25)

    return {
        "mrr": mrr,
        "ap": ap,
        "query_idx": new_query_idx,
        "docs_idx": new_docs_idx,
        "is_relevant": is_relevant,
        "similarity_scores": sim_scores.cpu().tolist(),
    }
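
The per-instance cursor bookkeeping can be driven directly; a sketch for the embedding path, reusing the reranker and queries from the earlier examples (instances without both positives and negatives return None metrics and are skipped).

all_query_embs, all_docs_embs = reranker.embed_content(queries)

query_idx, docs_idx = 0, 0
for instance in queries:
    out = reranker.process_query_instance(
        instance, all_query_embs, all_docs_embs, query_idx, docs_idx, top_k=10
    )
    # Advance the cursors so the next instance reads the right embedding slices.
    query_idx, docs_idx = out["query_idx"], out["docs_idx"]
    if out["mrr"] is not None:
        print(out["mrr"], out["ap"])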

rerank_scores(query_emb, docs_emb)

Computes similarity scores between query and documents. Supports both embedding-based and API-based reranking.

Parameters:

Name Type Description Default
query_emb Tensor | ndarray | list | str

Query embeddings tensor (for embedding-based) or query string (for API-based)

required
docs_emb Tensor | ndarray | list | list[str]

Document embeddings tensor (for embedding-based) or list of document strings (for API-based)

required

Returns:

Type Description
Tensor

torch.Tensor: Similarity scores between query and documents.

Source code in evals_hub/evaluator/reranking_eval.py
def rerank_scores(
    self,
    query_emb: torch.Tensor | np.ndarray | list | str,
    docs_emb: torch.Tensor | np.ndarray | list | list[str],
) -> torch.Tensor:
    """
    Computes similarity scores between query and documents.
    Supports both embedding-based and API-based reranking.

    Args:
        query_emb: Query embeddings tensor (for embedding-based) or query string (for API-based)
        docs_emb: Document embeddings tensor (for embedding-based) or list of document strings (for API-based)

    Returns:
        torch.Tensor: Similarity scores between query and documents.
    """
    if self.use_api:
        # API-based reranking
        if isinstance(query_emb, str) and isinstance(docs_emb, list):
            return self._api_rerank_scores(query_emb, docs_emb)
        else:
            raise ValueError(
                "For API-based reranking, query_emb must be string and docs_emb must be list of strings"
            )
    else:
        # Embedding-based reranking
        # Accept torch.Tensor, numpy arrays, or Python lists and normalize to torch.Tensor
        def to_tensor(x: torch.Tensor | np.ndarray | list) -> torch.Tensor:
            if isinstance(x, torch.Tensor):
                return x.to(self.device)
            if isinstance(x, np.ndarray):
                return torch.tensor(x, dtype=torch.float32, device=self.device)
            if isinstance(x, list):
                # Convert list (possibly list of vectors) to 2D numpy then tensor
                arr = np.array(x)
                return torch.tensor(arr, dtype=torch.float32, device=self.device)
            raise ValueError(
                "Unsupported embedding type; expected Tensor/ndarray/list"
            )

        # Convert inputs
        if isinstance(query_emb, (torch.Tensor, np.ndarray, list)) and isinstance(
            docs_emb, (torch.Tensor, np.ndarray, list)
        ):
            q = to_tensor(query_emb)
            d = to_tensor(docs_emb)

            # Ensure 2D shapes for cos_sim
            if q.dim() == 1:
                q = q.unsqueeze(0)
            if d.dim() == 1:
                d = d.unsqueeze(0)

            sim_scores = cos_sim(q, d)
            if len(sim_scores.shape) > 1:
                sim_scores = torch.amax(sim_scores, dim=0)
            return sim_scores
        else:
            raise ValueError(
                "For embedding-based reranking, expected tensor/ndarray/list for both query_emb and docs_emb"
            )
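
A sketch of the embedding path with raw vectors, reusing the embedding-based reranker from the earlier example; the dimension 384 is a placeholder.

import numpy as np

query_vec = np.random.rand(384).astype("float32")    # single query embedding
doc_vecs = np.random.rand(5, 384).astype("float32")  # five document embeddings

scores = reranker.rerank_scores(query_vec, doc_vecs)
print(scores.shape)  # torch.Size([5]) -- one cosine similarity per document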

evals_hub.evaluator.text_embed_eval

TextEmbedEvaluator

Source code in evals_hub/evaluator/text_embed_eval.py
class TextEmbedEvaluator:
    def __init__(
        self,
        model_name: str,
        device: torch.device | None = None,
        model_source="huggingface",
    ) -> None:
        self.device = get_device() if device is None else device
        self.model = load_model(
            model_name=model_name, model_source=model_source, device=self.device
        )

    def embed_content(
        self,
        batch: Dataset,
        feature: str | None = None,
        max_length: int | None = None,
        **kwargs,
    ) -> dict:
        encode_kwargs = {
            "max_length": max_length,
            "normalize_embeddings": True,
            "device": self.device,
            "show_progress_bar": False,
            "convert_to_tensor": True,
            **kwargs,
        }
        embeddings = self.model.encode(
            batch[feature],
            **encode_kwargs,
        )
        return {"embeddings": embeddings}

    def search(self, queries, documents, top_k=10):
        """
        Perform similarity search on the embedded content.
        Returns the top-k cosine-similarity scores and document indices for each query.
        """
        # Convert embeddings to numpy arrays for easier manipulation
        document_embeddings_array = (
            torch.stack([emb for emb in documents["embeddings"]]).cpu().numpy()
        )
        query_embeddings_array = (
            torch.stack([emb for emb in queries["embeddings"]]).cpu().numpy()
        )

        # Calculate cosine similarity between query and corpus embeddings;
        # since the embeddings are normalized, the dot product equals cosine similarity
        # (range -1 to 1, where 1 is most similar)
        cosine_similarity_scores = np.dot(
            query_embeddings_array, document_embeddings_array.T
        )

        is_nan = np.isnan(cosine_similarity_scores)
        if is_nan.sum() > 0:
            total_scores = cosine_similarity_scores.size
            nan_percentage = (is_nan.sum() / total_scores) * 100
            logger.warning(
                f"Found {is_nan.sum()} NaN values ({nan_percentage:.2f}%) in similarity scores. "
                f"Replacing with -1. This may indicate model issues "
                f"(invalid embeddings, numerical instability, or model errors). "
                f"Please carefully review results if percentage is high."
            )
        cosine_similarity_scores[is_nan] = -1
        similarity_scores_top_k_values, similarity_scores_top_k_idx = torch.topk(
            torch.from_numpy(cosine_similarity_scores),
            k=top_k,
            dim=1,
            largest=True,
            sorted=True,
        )
        similarity_scores_top_k_values = similarity_scores_top_k_values.tolist()
        similarity_scores_top_k_idx = similarity_scores_top_k_idx.tolist()
        return similarity_scores_top_k_values, similarity_scores_top_k_idx

    def compute_metrics(self, target, pred):
        k_values = [1, 3, 5, 10, 20]

        # construct the measure strings for pytrec_eval to calculate metrics
        all_ndcgs, all_aps, all_recalls, all_precisions = {}, {}, {}, {}
        for k in k_values:
            all_ndcgs[f"NDCG@{k}"] = []
            all_aps[f"MAP@{k}"] = []
            all_recalls[f"Recall@{k}"] = []
            all_precisions[f"P@{k}"] = []
        map_string = "map_cut." + ",".join([str(k) for k in k_values])
        ndcg_string = "ndcg_cut." + ",".join([str(k) for k in k_values])
        recall_string = "recall." + ",".join([str(k) for k in k_values])
        precision_string = "P." + ",".join([str(k) for k in k_values])

        # apply pytrec_eval to calculate metrics
        evaluator = pytrec_eval.RelevanceEvaluator(
            target, {map_string, ndcg_string, recall_string, precision_string}
        )
        pytrec_eval_results = evaluator.evaluate(pred)

        # parsing the results
        for query_id in pytrec_eval_results.keys():
            for k in k_values:
                all_ndcgs[f"NDCG@{k}"].append(
                    pytrec_eval_results[query_id]["ndcg_cut_" + str(k)]
                )
                all_aps[f"MAP@{k}"].append(
                    pytrec_eval_results[query_id]["map_cut_" + str(k)]
                )
                all_recalls[f"Recall@{k}"].append(
                    pytrec_eval_results[query_id]["recall_" + str(k)]
                )
                all_precisions[f"P@{k}"].append(
                    pytrec_eval_results[query_id]["P_" + str(k)]
                )
        return all_ndcgs, all_aps, all_recalls, all_precisions

    def evaluate(
        self,
        queries: Dataset,
        documents: Dataset,
        relevances: Dataset | None = None,
        top_k: int = 10,
        max_length: int | None = None,
        prompt_name_query: str | None = None,
        prompt_name_doc: str | None = None,
        batch_size: int = 32,
    ) -> dict:
        """
        Evaluate the model on the given queries and documents.
        If relevances are provided, compute metrics using pytrec_eval.
        """
        device = self.device

        # Embed queries and documents
        # Apply in batches using HuggingFace Datasets
        # Calculate query embeddings
        queries = queries.map(
            self.embed_content,
            fn_kwargs={
                "feature": "query",
                "max_length": max_length,
                "prompt_name": prompt_name_query,
            },
            batched=True,
            batch_size=batch_size,
        )
        documents = documents.map(
            self.embed_content,
            fn_kwargs={
                "feature": "doc",
                "max_length": max_length,
                "prompt_name": prompt_name_doc,
            },
            batched=True,
            batch_size=batch_size,  # Adjust as needed
        )
        queries.set_format("torch", device=device)
        documents.set_format("torch", device=device)

        similarity_scores_top_k_values, similarity_scores_top_k_idx = self.search(
            queries, documents, top_k=top_k
        )
        # transform pred and target to the format that pytrec_eval expects
        results = self._transform_results(
            queries,
            documents,
            similarity_scores_top_k_values,
            similarity_scores_top_k_idx,
        )
        relevances_dict = self._transform_relevances(relevances)

        # metrics for each query
        all_ndcgs, all_aps, all_recalls, all_precisions = self.compute_metrics(
            target=relevances_dict, pred=results
        )

        # calculate the mean for each metric
        mean_ndcgs = {k: np.mean(v) for k, v in all_ndcgs.items()}
        mean_aps = {k: np.mean(v) for k, v in all_aps.items()}
        mean_recalls = {k: np.mean(v) for k, v in all_recalls.items()}
        mean_precisions = {k: np.mean(v) for k, v in all_precisions.items()}

        # Combine all metrics into a single dictionary
        metrics = {
            "NDCGs": mean_ndcgs,
            "MAPs": mean_aps,
            "Recalls": mean_recalls,
            "Precisions": mean_precisions,
        }
        return metrics

    def _transform_results(
        self,
        queries,
        documents,
        similarity_scores_top_k_values,
        similarity_scores_top_k_idx,
    ):
        """
        Output the results in the format that the pytrec_eval requires.
        {
        'query_1': {
            'doc-1': 0.65,
            'doc-2': 0.62,
            'doc-3': 0.62,
            'doc-4': 0.61,
            'doc-5': 0.61
        },
        'query_2': {
            'doc-6': 0.47,
            'doc-7': 0.46,
            'doc-8': 0.43,
            'doc-9': 0.43,
            'doc-10': 0.42
        },
        }

        """
        # Create a Polars Series for the id slices
        documents_id = pl.Series(documents["_id"])

        # Convert index to _id
        results = {qid: {} for qid in queries["_id"]}
        for idx, query_id in enumerate(queries["_id"]):
            topk_df = pl.DataFrame(
                {
                    "doc_idx": documents_id[similarity_scores_top_k_idx[idx]],
                    "similarity_score": similarity_scores_top_k_values[idx],
                }
            )
            topk_dict = dict(zip(topk_df["doc_idx"], topk_df["similarity_score"]))
            results[query_id] = topk_dict

        return results

    def _transform_relevances(self, relevances: Dataset) -> dict:
        """
        Transform the relevances dataset to a dictionary format that pytrec_eval expects.
        Note that missing documents will be treated as non-relevant (score=0).
        {
        'query_1': {
            'doc-1': 2,
            'doc-4': 1,
            'doc-10': 2
        },
        'query_2': {
            'doc-6': 2,
            'doc-7': 1,
            'doc-100': 2,
        },
        }
        """
        relevances_dict = defaultdict(dict)

        def to_qrel(row):
            qid = row["query_id"]
            did = row["doc_id"]
            relevances_dict[qid][did] = row["score"]

        relevances.map(to_qrel)

        return relevances_dict
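
The _transform_results and _transform_relevances helpers above build the run and qrels dictionaries that pytrec_eval consumes. A minimal sketch of that interface, reusing the illustrative query and document ids from the docstrings:

import pytrec_eval

# qrels: ground-truth relevance grades, shaped like _transform_relevances' output
qrels = {
    "query_1": {"doc-1": 2, "doc-4": 1},
    "query_2": {"doc-6": 2, "doc-7": 1},
}

# run: retrieved similarity scores, shaped like _transform_results' output
run = {
    "query_1": {"doc-1": 0.65, "doc-2": 0.62, "doc-4": 0.61},
    "query_2": {"doc-6": 0.47, "doc-9": 0.43},
}

evaluator = pytrec_eval.RelevanceEvaluator(qrels, {"ndcg_cut.5", "recall.5"})
per_query = evaluator.evaluate(run)
print(per_query["query_1"]["ndcg_cut_5"], per_query["query_2"]["recall_5"])

Documents that appear in the run but not in qrels are treated as non-relevant, which is why _transform_relevances only records the judged pairs.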

evaluate(queries, documents, relevances=None, top_k=10, max_length=None, prompt_name_query=None, prompt_name_doc=None, batch_size=32)

Evaluate the model on the given queries and documents. If relevances are provided, compute metrics using pytrec_eval.

Source code in evals_hub/evaluator/text_embed_eval.py (lines 128-201)
def evaluate(
    self,
    queries: Dataset,
    documents: Dataset,
    relevances: Dataset | None = None,
    top_k: int = 10,
    max_length: int | None = None,
    prompt_name_query: str | None = None,
    prompt_name_doc: str | None = None,
    batch_size: int = 32,
) -> dict:
    """
    Evaluate the model on the given queries and documents.
    If relevances are provided, compute metrics using pytrec_eval.
    """
    device = self.device

    # Embed queries and documents
    # Apply in batches using HuggingFace Datasets
    # Calculate query embeddings
    queries = queries.map(
        self.embed_content,
        fn_kwargs={
            "feature": "query",
            "max_length": max_length,
            "prompt_name": prompt_name_query,
        },
        batched=True,
        batch_size=batch_size,
    )
    documents = documents.map(
        self.embed_content,
        fn_kwargs={
            "feature": "doc",
            "max_length": max_length,
            "prompt_name": prompt_name_doc,
        },
        batched=True,
        batch_size=batch_size,  # Adjust as needed
    )
    queries.set_format("torch", device=device)
    documents.set_format("torch", device=device)

    similarity_scores_top_k_values, similarity_scores_top_k_idx = self.search(
        queries, documents, top_k=top_k
    )
    # transform pred and target to the format that pytrec_eval expects
    results = self._transform_results(
        queries,
        documents,
        similarity_scores_top_k_values,
        similarity_scores_top_k_idx,
    )
    relevances_dict = self._transform_relevances(relevances)

    # metrics for each query
    all_ndcgs, all_aps, all_recalls, all_precisions = self.compute_metrics(
        target=relevances_dict, pred=results
    )

    # calculate the mean for each metric
    mean_ndcgs = {k: np.mean(v) for k, v in all_ndcgs.items()}
    mean_aps = {k: np.mean(v) for k, v in all_aps.items()}
    mean_recalls = {k: np.mean(v) for k, v in all_recalls.items()}
    mean_precisions = {k: np.mean(v) for k, v in all_precisions.items()}

    # Combine all metrics into a single dictionary
    metrics = {
        "NDCGs": mean_ndcgs,
        "MAPs": mean_aps,
        "Recalls": mean_recalls,
        "Precisions": mean_precisions,
    }
    return metrics
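
A minimal usage sketch for evaluate, assuming the class is importable as evals_hub.evaluator.text_embed_eval.TextEmbedEvaluator and that the checkpoint is a sentence-transformers-style model; the model name and data below are illustrative:

from datasets import Dataset

from evals_hub.evaluator.text_embed_eval import TextEmbedEvaluator

# Column names follow the code above: queries need "_id" and "query",
# documents need "_id" and "doc", relevances need "query_id", "doc_id", "score".
queries = Dataset.from_dict({"_id": ["q1"], "query": ["what is cosine similarity?"]})
documents = Dataset.from_dict(
    {
        "_id": ["d1", "d2"],
        "doc": ["Cosine similarity compares two vectors.", "An unrelated passage."],
    }
)
relevances = Dataset.from_dict({"query_id": ["q1"], "doc_id": ["d1"], "score": [2]})

evaluator = TextEmbedEvaluator("sentence-transformers/all-MiniLM-L6-v2")
metrics = evaluator.evaluate(queries, documents, relevances, top_k=2)
print(metrics["NDCGs"]["NDCG@1"], metrics["Recalls"]["Recall@1"])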

search(queries, documents, top_k=10)

Perform similarity search on the embedded content. Uses the dot product of the L2-normalized query and document embeddings (equivalent to cosine similarity) and returns the top-k scores and indices for each query.

Source code in evals_hub/evaluator/text_embed_eval.py (lines 48-88)
def search(self, queries, documents, top_k=10):
    """
    Perform similarity search on the embedded content.
    Uses the dot product of the L2-normalized query and document embeddings
    (equivalent to cosine similarity) and returns the top-k scores and
    indices for each query.
    """
    # Convert embeddings to numpy arrays for easier manipulation
    document_embeddings_array = (
        torch.stack([emb for emb in documents["embeddings"]]).cpu().numpy()
    )
    query_embeddings_array = (
        torch.stack([emb for emb in queries["embeddings"]]).cpu().numpy()
    )

    # Calculate cosine similarity between query and corpus embeddings
    # as we normalized the embeddings, we can use dot product
    # range from -1 to 1, where 1 is most similar
    cosine_similarity_scores = np.dot(
        query_embeddings_array, document_embeddings_array.T
    )

    is_nan = np.isnan(cosine_similarity_scores)
    if is_nan.sum() > 0:
        total_scores = cosine_similarity_scores.size
        nan_percentage = (is_nan.sum() / total_scores) * 100
        logger.warning(
            f"Found {is_nan.sum()} NaN values ({nan_percentage:.2f}%) in similarity scores. "
            f"Replacing with -1. This may indicate model issues "
            f"(invalid embeddings, numerical instability, or model errors). "
            f"Please carefully review results if percentage is high."
        )
    cosine_similarity_scores[is_nan] = -1
    similarity_scores_top_k_values, similarity_scores_top_k_idx = torch.topk(
        torch.from_numpy(cosine_similarity_scores),
        k=top_k,
        dim=1,
        largest=True,
        sorted=True,
    )
    similarity_scores_top_k_values = similarity_scores_top_k_values.tolist()
    similarity_scores_top_k_idx = similarity_scores_top_k_idx.tolist()
    return similarity_scores_top_k_values, similarity_scores_top_k_idx
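
For intuition, the ranking step in search reduces to a dot product over unit-length vectors followed by torch.topk. A tiny standalone sketch with illustrative vectors:

import numpy as np
import torch

# One L2-normalized query vector and three L2-normalized document vectors.
query = np.array([[1.0, 0.0]])
docs = np.array([[1.0, 0.0], [0.0, 1.0], [0.6, 0.8]])

# For unit vectors, the dot product is exactly the cosine similarity.
scores = np.dot(query, docs.T)  # shape (1, 3)
values, idx = torch.topk(torch.from_numpy(scores), k=2, dim=1, largest=True, sorted=True)
print(values.tolist(), idx.tolist())  # [[1.0, 0.6]] [[0, 2]]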

evals_hub.evaluator.qa_eval

JudgeOutput pydantic-model

Bases: BaseModel

Structured output for an LLM Judge model

Fields:

  • extracted_final_answer (str)
  • reasoning (str)
  • correct (Literal['yes', 'no'])
  • confidence (float)
Source code in evals_hub/evaluator/qa_eval.py (lines 38-44)
class JudgeOutput(BaseModel):
    """Structured output for an LLM Judge model"""

    extracted_final_answer: str
    reasoning: str
    correct: Literal["yes", "no"]
    confidence: float
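
A small sketch of how the judge's structured output is consumed (the field values are illustrative): _judge_answer turns the "correct" verdict into a binary score that is attached to the Langfuse trace.

judgement = JudgeOutput(
    extracted_final_answer="1969",
    reasoning="The response matches the reference answer.",
    correct="yes",
    confidence=0.9,
)

# Mirrors the conversion in _judge_answer: "yes" -> 1, "no" -> 0.
score = int(judgement.correct == "yes")
assert score == 1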

QAEvaluator

Source code in evals_hub/evaluator/qa_eval.py (lines 120-334)
class QAEvaluator:
    def __init__(
        self,
        model_config: ModelConfig,
        judge_model_config: ModelConfig | None,
    ) -> None:
        self._model_config = model_config
        self._judge_model_config = judge_model_config

        openai_settings = OpenAISettings()
        langfuse_settings = LangfuseSettings()

        self._openai_client = AsyncOpenAI(
            api_key=openai_settings.api_key.get_secret_value(),
            base_url=openai_settings.base_url,
        )

        if model_config.checkpoint:
            system_prompt_path = model_config.system_prompt_path
            if system_prompt_path:
                logging.info(
                    f"Loading system prompt from: {model_config.system_prompt_path}"
                )
                if system_prompt_path.suffix == ".txt":
                    system_prompt = system_prompt_path.read_text()
                elif system_prompt_path.suffix in [".yaml", ".yml"]:
                    system_prompt = load_prompts_yaml(system_prompt_path)[
                        "system_prompt"
                    ]
                else:
                    logging.warning(
                        f"Unsupported system prompt file format: \
                          {system_prompt_path.suffix}, only .txt, .yaml, .yml are supported."
                    )
                    system_prompt = load_default_prompts()["system_prompt"]

            else:
                logging.info("No system prompt path provided")
                system_prompt = load_default_prompts()["system_prompt"]
            logging.info(f"System prompt for Response generation: {system_prompt}")
            self._model = get_model(
                model_config.checkpoint,
                system_prompt,
                self._openai_client,
                response_format=QAOutput,
                **model_config.model_settings,
            )
        else:
            self._model = dynamic_load_model(model_config.import_path)

        if judge_model_config and judge_model_config.checkpoint:
            system_prompt = (
                judge_model_config.system_prompt_path.read_text()
                if judge_model_config.system_prompt_path
                else None
            )
            self._judge_model = get_model(
                judge_model_config.checkpoint,
                system_prompt,
                self._openai_client,
                response_format=JudgeOutput,
                **judge_model_config.model_settings,
            )
        elif judge_model_config:
            self._judge_model = dynamic_load_model(judge_model_config.import_path)
        else:
            self._judge_model = None
        self.langfuse_tracing_environment = langfuse_settings.tracing_environment
        self._langfuse_client = Langfuse(
            host=langfuse_settings.host,
            public_key=langfuse_settings.public_key,
            secret_key=langfuse_settings.secret_key.get_secret_value(),
            environment=langfuse_settings.tracing_environment,
        )
        langfuse_name = langfuse_settings.name or "qa-evaluation"
        self._langfuse_name = langfuse_name

    def _get_traces_for_version(
        self, langfuse_version_tag: str
    ) -> list[TraceWithDetails]:
        all_traces = fetch_all_traces(
            "generate_answer", langfuse_env=self.langfuse_tracing_environment
        )
        valid_traces = list(
            filter(
                lambda trace: trace.metadata["version"] == langfuse_version_tag,
                all_traces,
            )
        )
        return valid_traces

    def _get_scores_for_version(self, langfuse_version_tag: str) -> list:
        traces = self._get_traces_for_version(langfuse_version_tag)
        score_ids = ",".join(chain.from_iterable([trace.scores for trace in traces]))
        scores = fetch_scores(score_ids=score_ids)
        return scores

    @observe(capture_input=False)
    async def generate_answer(
        self,
        qa_instance: QAInstance,
        langfuse_version_tag: str,
    ) -> QAOutput:
        self._langfuse_client.update_current_trace(
            input=qa_instance["question"],
            metadata={
                "id": qa_instance["id"],
                "answer_type": qa_instance["answer_type"],
                "version": langfuse_version_tag,
            },
        )
        return await self._model(qa_instance["question"])

    async def generate_answers(
        self,
        instances: Dataset,
        max_concurrency: int | None = None,
        langfuse_version_tag: str | None = None,
    ) -> tuple[str, list[QAOutput]]:
        """Generates answers for a given question set."""
        langfuse_version_tag = langfuse_version_tag or str(uuid4())
        max_concurrency = max_concurrency or 3
        generated_answers = []
        tasks = []
        for instance in instances:
            qa_instance: QAInstance = {
                "id": instance["id"],
                "question": instance["question"],
                "answer_type": instance["answer_type"],
            }
            tasks.append(
                self.generate_answer(
                    qa_instance=qa_instance, langfuse_version_tag=langfuse_version_tag
                )
            )
        generated_answers = await gather_with_semaphore(tasks, max_concurrency)
        self._langfuse_client.flush()
        return langfuse_version_tag, generated_answers

    async def _judge_answer(
        self, trace: TraceWithDetails, correct_answer: str
    ) -> JudgeOutput:
        question = trace.input
        answer = trace.output["answer"]
        confidence = trace.output["confidence"]
        if self._judge_model_config.user_prompt_path:
            user_prompt = self._judge_model_config.user_prompt_path.read_text()
            content = user_prompt.format(
                question=question,
                answer=answer,
                correct_answer=correct_answer,
                confidence=confidence,
            )
        else:
            content = f"""
            Question: `{question}`
            Answer: `{answer}`
            Correct Answer: `{correct_answer}`"""

        judgement = await self._judge_model(content)
        score = int(judgement.correct == "yes")
        self._langfuse_client.create_score(
            name="correct",
            value=score,
            trace_id=trace.id,
            metadata={
                "judge_model_config": self._judge_model_config.model_dump(),
                "judgement": judgement.model_dump(),
            },
            # Making score idempotent
            score_id=f"{trace.id}-correct",
        )
        return judgement

    async def judge_answers(
        self,
        answers: Dataset,
        langfuse_version_tag: str,
        max_concurrency: int | None = None,
    ) -> list[JudgeOutput]:
        max_concurrency = max_concurrency or 3
        tasks = []
        traces = self._get_traces_for_version(langfuse_version_tag)

        for trace in traces:
            correct_answer = answers.filter(
                lambda row: row["id"] == trace.metadata["id"]
            )["answer"][0]
            tasks.append(self._judge_answer(trace, correct_answer))
        logger.info(f"Judging {len(tasks)} answers...")
        judgements = await gather_with_semaphore(tasks, max_concurrency)
        self._langfuse_client.flush()
        return judgements

    def calculate_metrics(
        self, langfuse_version_tag: str, beta: int = 10
    ) -> dict[str, str | float]:
        scores = self._get_scores_for_version(langfuse_version_tag)
        correct = np.array([score.value for score in scores])
        confidences = np.array(
            [score.metadata["judgement"]["confidence"] for score in scores]
        )

        accuracy = np.round(correct.mean(), 4)
        # 95% binomial half-width on the percentage scale (accuracy is a fraction in [0, 1])
        confidence_half_width = np.round(
            100 * 1.96 * np.sqrt(accuracy * (1 - accuracy) / len(scores)), 2
        )

        return {
            "accuracy": float(accuracy),
            "bounded_accuracy": f"{100 * accuracy}% +/- {confidence_half_width}% | n = {len(scores)}",
            "calibration_error": calculate_calibration_error(
                confidences=confidences, correct=correct, beta=beta
            ),
        }
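
A worked example of the accuracy and 95% binomial confidence half-width reported by calculate_metrics (the counts are illustrative):

import numpy as np

# Suppose 80 of 100 judged answers were scored correct.
correct = np.array([1] * 80 + [0] * 20)
accuracy = correct.mean()  # 0.8
half_width = 100 * 1.96 * np.sqrt(accuracy * (1 - accuracy) / len(correct))
print(f"{100 * accuracy}% +/- {half_width:.2f}% | n = {len(correct)}")
# 80.0% +/- 7.84% | n = 100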

generate_answers(instances, max_concurrency=None, langfuse_version_tag=None) async

Generates answers for a given question set.

Source code in evals_hub/evaluator/qa_eval.py (lines 233-257)
async def generate_answers(
    self,
    instances: Dataset,
    max_concurrency: int | None = None,
    langfuse_version_tag: str | None = None,
) -> tuple[str, list[QAOutput]]:
    """Generates answers for a given question set."""
    langfuse_version_tag = langfuse_version_tag or str(uuid4())
    max_concurrency = max_concurrency or 3
    generated_answers = []
    tasks = []
    for instance in instances:
        qa_instance: QAInstance = {
            "id": instance["id"],
            "question": instance["question"],
            "answer_type": instance["answer_type"],
        }
        tasks.append(
            self.generate_answer(
                qa_instance=qa_instance, langfuse_version_tag=langfuse_version_tag
            )
        )
    generated_answers = await gather_with_semaphore(tasks, max_concurrency)
    self._langfuse_client.flush()
    return langfuse_version_tag, generated_answers
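
A usage sketch for generate_answers. It assumes a QAEvaluator instance named qa_evaluator has already been constructed (its ModelConfig plus the OpenAI and Langfuse settings are omitted here); the question set below is illustrative.

import asyncio

from datasets import Dataset

instances = Dataset.from_dict(
    {
        "id": ["q-001"],
        "question": ["In which year did Apollo 11 land on the Moon?"],
        "answer_type": ["exactMatch"],  # illustrative value
    }
)

async def main() -> None:
    # qa_evaluator: an already-constructed QAEvaluator (see __init__ above).
    version_tag, answers = await qa_evaluator.generate_answers(
        instances, max_concurrency=2
    )
    print(version_tag, answers[0].answer)

asyncio.run(main())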

QAOutput pydantic-model

Bases: BaseModel

Structured output for a QA model

Fields:

  • explanation (str)
  • answer (str)
  • confidence (str)
Source code in evals_hub/evaluator/qa_eval.py (lines 24-29)
class QAOutput(BaseModel):
    """Structured output for a QA model"""

    explanation: str
    answer: str
    confidence: str
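
Since the answering model is created with response_format=QAOutput, its replies can be validated straight into this schema. A minimal sketch, assuming Pydantic v2 (model_dump is used elsewhere in this module); the JSON payload is illustrative. Note that confidence is a string here, unlike JudgeOutput.confidence.

raw = '{"explanation": "Apollo 11 landed in 1969.", "answer": "1969", "confidence": "90%"}'
parsed = QAOutput.model_validate_json(raw)
print(parsed.answer, parsed.confidence)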