Source code for autosklearn.metrics

from abc import ABCMeta, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Sequence, Union

import collections
from functools import partial
from itertools import product

import numpy as np
import sklearn.metrics
from sklearn.utils.multiclass import type_of_target
from smac.utils.constants import MAXINT

from autosklearn.constants import (
    BINARY_CLASSIFICATION,
    MULTICLASS_CLASSIFICATION,
    MULTILABEL_CLASSIFICATION,
    MULTIOUTPUT_REGRESSION,
    REGRESSION,
    REGRESSION_TASKS,
    TASK_TYPES,
)
from autosklearn.data.target_validator import SUPPORTED_XDATA_TYPES

from .util import sanitize_array


class Scorer(metaclass=ABCMeta):
    """Abstract callable that scores predictions against ground truth.

    Instances carry a score function together with the metadata needed to
    interpret its output; concrete subclasses implement ``__call__``.
    """

    def __init__(
        self,
        name: str,
        score_func: Callable,
        optimum: float,
        worst_possible_result: float,
        sign: float,
        kwargs: Any,
        needs_X: bool = False,
    ) -> None:
        """Store the score function and its metadata on the instance.

        Parameters
        ----------
        name : str
            Identifier of the scorer; also the value returned by ``repr``.

        score_func : callable
            The wrapped score (or loss) function.

        optimum : float
            Stored as ``_optimum``.

        worst_possible_result : float
            Stored as ``_worst_possible_result``.

        sign : float
            Factor stored as ``_sign``; subclasses multiply the raw result
            of ``score_func`` by it.

        kwargs : Any
            Extra keyword arguments stored as ``_kwargs`` for ``score_func``.

        needs_X : bool, default=False
            Stored as ``_needs_X``; tells callers whether ``X_data`` has to
            be handed to ``score_func``.
        """
        # Public identifier.
        self.name = name

        # What to call and with which fixed extra arguments.
        self._score_func = score_func
        self._kwargs = kwargs
        self._needs_X = needs_X

        # How to interpret the returned value.
        self._sign = sign
        self._optimum = optimum
        self._worst_possible_result = worst_possible_result

    @abstractmethod
    def __call__(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        *,
        X_data: Optional[SUPPORTED_XDATA_TYPES] = None,
        sample_weight: Optional[List[float]] = None,
    ) -> float:
        """Score ``y_pred`` against ``y_true``; subclasses must override."""

    def __repr__(self) -> str:
        """Return the scorer's name."""
        return self.name


class _PredictScorer(Scorer):
    """Scorer for metrics that operate on hard predictions (labels/values)."""

    def __call__(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        *,
        X_data: Optional[SUPPORTED_XDATA_TYPES] = None,
        sample_weight: Optional[List[float]] = None,
    ) -> float:
        """Evaluate predicted target values for X relative to y_true.

        Depending on the detected type of ``y_true``, ``y_pred`` is first
        converted to hard predictions (rounding, ``argmax`` or thresholding
        at 0.5). The caller's ``y_pred`` array is never modified.

        Parameters
        ----------
        y_true : array-like
            Gold standard target values for X.

        y_pred : array-like, [n_samples x n_classes]
            Model predictions

        X_data : array-like [n_samples x n_features]
            X data used to obtain the predictions: each row x_j corresponds to the input
             used to obtain predictions y_j

        sample_weight : array-like, optional (default=None)
            Sample weights.

        Returns
        -------
        score : float
            Score function applied to prediction of estimator on X,
            multiplied by the scorer's sign.

        Raises
        ------
        ValueError
            If the target type of ``y_true`` is not handled by any branch.
        """
        type_true = type_of_target(y_true)
        if (
            type_true == "binary"
            and type_of_target(y_pred) == "continuous"
            and len(y_pred.shape) == 1
        ):
            # For a pred scorer, no threshold, nor probability is required
            # If y_true is binary, and y_pred is continuous
            # it means that a rounding is necessary to obtain the binary class
            y_pred = np.around(y_pred, decimals=0)
        elif (
            len(y_pred.shape) == 1 or y_pred.shape[1] == 1 or type_true == "continuous"
        ):
            # must be regression, all other task types would return at least
            # two probabilities
            pass
        elif type_true in ["binary", "multiclass"]:
            # One column per class: pick the most probable class index.
            y_pred = np.argmax(y_pred, axis=1)
        elif type_true == "multilabel-indicator":
            # Threshold on a copy. The same prediction array may be handed to
            # several scorers in a row; binarizing it in place would feed
            # already-thresholded values to the scorers that run afterwards.
            y_pred = y_pred.copy()
            y_pred[y_pred > 0.5] = 1.0
            y_pred[y_pred <= 0.5] = 0.0
        elif type_true == "continuous-multioutput":
            pass
        else:
            raise ValueError(type_true)

        scorer_kwargs = {}  # type: Dict[str, Union[List[float], np.ndarray]]
        if sample_weight is not None:
            scorer_kwargs["sample_weight"] = sample_weight
        if self._needs_X is True:
            scorer_kwargs["X_data"] = X_data

        return self._sign * self._score_func(
            y_true, y_pred, **scorer_kwargs, **self._kwargs
        )


class _ProbaScorer(Scorer):
    """Scorer for metrics that consume predicted probabilities."""

    def __call__(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        *,
        X_data: Optional[SUPPORTED_XDATA_TYPES] = None,
        sample_weight: Optional[List[float]] = None,
    ) -> float:
        """Evaluate predicted probabilities for X relative to y_true.

        Parameters
        ----------
        y_true : array-like
            Gold standard target values for X. These must be class labels,
            not probabilities.

        y_pred : array-like, [n_samples x n_classes]
            Model predictions

        X_data : array-like [n_samples x n_features]
            X data used to obtain the predictions: each row x_j corresponds to the input
             used to obtain predictions y_j

        sample_weight : array-like, optional (default=None)
            Sample weights.

        Returns
        -------
        score : float
            Score function applied to prediction of estimator on X,
            multiplied by the scorer's sign.
        """
        # Shared by both call paths below; only set when weights were given.
        weight_kwargs = (
            {} if sample_weight is None else {"sample_weight": sample_weight}
        )  # type: Dict[str, Union[List[float], np.ndarray]]

        if self._score_func is sklearn.metrics.log_loss:
            n_pred_columns = np.array(y_pred).reshape((len(y_pred), -1)).shape[1]
            n_true_labels = len(np.unique(y_true))
            if n_pred_columns != n_true_labels:
                # y_true does not show as many distinct values as there are
                # prediction columns, so the label set is passed explicitly.
                # Note that X_data is not forwarded on this path.
                raw_score = self._score_func(
                    y_true,
                    y_pred,
                    labels=list(range(n_pred_columns)),
                    **weight_kwargs,
                    **self._kwargs,
                )
                return self._sign * raw_score

        call_kwargs = dict(weight_kwargs)
        if self._needs_X is True:
            call_kwargs["X_data"] = X_data

        raw_score = self._score_func(y_true, y_pred, **call_kwargs, **self._kwargs)
        return self._sign * raw_score


class _ThresholdScorer(Scorer):
    """Scorer for metrics that consume continuous decision values."""

    def __call__(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        *,
        X_data: Optional[SUPPORTED_XDATA_TYPES] = None,
        sample_weight: Optional[List[float]] = None,
    ) -> float:
        """Evaluate decision function output for X relative to y_true.

        Parameters
        ----------
        y_true : array-like
            Gold standard target values for X. These must be class labels,
            not probabilities.

        y_pred : array-like, [n_samples x n_classes]
            Model predictions

        X_data : array-like [n_samples x n_features]
            X data used to obtain the predictions: each row x_j corresponds to the input
             used to obtain predictions y_j

        sample_weight : array-like, optional (default=None)
            Sample weights.

        Returns
        -------
        score : float
            Score function applied to prediction of estimator on X,
            multiplied by the scorer's sign.

        Raises
        ------
        ValueError
            If ``y_true`` is neither binary nor multilabel-indicator.
        """
        target_kind = type_of_target(y_true)

        if target_kind == "binary":
            # With more than one dimension, keep only column 1.
            if y_pred.ndim > 1:
                y_pred = y_pred[:, 1]
        elif target_kind == "multilabel-indicator":
            # A list of per-output arrays is reduced to the last column of
            # each entry and stacked into shape [n_samples x n_outputs].
            if isinstance(y_pred, list):
                last_columns = [p[:, -1] for p in y_pred]
                y_pred = np.vstack(last_columns).T
        else:
            raise ValueError("{0} format is not supported".format(target_kind))

        call_kwargs = {}  # type: Dict[str, Union[List[float], np.ndarray]]
        if sample_weight is not None:
            call_kwargs["sample_weight"] = sample_weight
        if self._needs_X is True:
            call_kwargs["X_data"] = X_data

        raw_score = self._score_func(y_true, y_pred, **call_kwargs, **self._kwargs)
        return self._sign * raw_score


def make_scorer(
    name: str,
    score_func: Callable,
    *,
    optimum: float = 1.0,
    worst_possible_result: float = 0.0,
    greater_is_better: bool = True,
    needs_proba: bool = False,
    needs_threshold: bool = False,
    needs_X: bool = False,
    **kwargs: Any,
) -> Scorer:
    """Make a scorer from a performance metric or loss function.

    Factory inspired by scikit-learn which wraps scikit-learn scoring functions
    to be used in auto-sklearn.

    Parameters
    ----------
    name: str
        Descriptive name of the metric

    score_func : callable
        Score function (or loss function) with signature
        ``score_func(y, y_pred, **kwargs)``.

    optimum : int or float, default=1
        The best score achievable by the score function, i.e. maximum in case of
        scorer function and minimum in case of loss function.

    worst_possible_result : int or float, default=0
        The worst score achievable by the score function, i.e. minimum in case of
        scorer function and maximum in case of loss function.

    greater_is_better : boolean, default=True
        Whether score_func is a score function (default), meaning high is good,
        or a loss function, meaning low is good. In the latter case, the
        scorer object will sign-flip the outcome of the score_func.

    needs_proba : boolean, default=False
        Whether score_func requires predict_proba to get probability estimates
        out of a classifier.

    needs_threshold : boolean, default=False
        Whether score_func takes a continuous decision certainty.
        The resulting scorer only accepts binary and multilabel-indicator
        targets.

    needs_X : boolean, default=False
        Whether score_func requires X in __call__ to compute a metric.

    **kwargs : additional arguments
        Additional parameters to be passed to score_func.

    Returns
    -------
    scorer : callable
        Callable object that returns a scalar score; greater is better or set
        greater_is_better to False.

    Raises
    ------
    ValueError
        If both ``needs_proba`` and ``needs_threshold`` are set.
    """
    if needs_proba and needs_threshold:
        raise ValueError(
            "Set either needs_proba or needs_threshold to True, but not both."
        )

    # Loss functions are sign-flipped so that "greater is better" always holds
    # for the value the scorer returns.
    sign = 1 if greater_is_better else -1

    cls = None  # type: Any
    if needs_proba:
        cls = _ProbaScorer
    elif needs_threshold:
        cls = _ThresholdScorer
    else:
        cls = _PredictScorer

    return cls(
        name, score_func, optimum, worst_possible_result, sign, kwargs, needs_X=needs_X
    )
# Standard regression scores
mean_absolute_error = make_scorer(
    "mean_absolute_error",
    sklearn.metrics.mean_absolute_error,
    optimum=0,
    worst_possible_result=MAXINT,
    greater_is_better=False,
)
# NOTE(review): the ``squared`` keyword is, as far as I know, deprecated in
# recent scikit-learn releases - confirm against the pinned version.
mean_squared_error = make_scorer(
    "mean_squared_error",
    sklearn.metrics.mean_squared_error,
    optimum=0,
    worst_possible_result=MAXINT,
    greater_is_better=False,
    squared=True,
)
root_mean_squared_error = make_scorer(
    "root_mean_squared_error",
    sklearn.metrics.mean_squared_error,
    optimum=0,
    worst_possible_result=MAXINT,
    greater_is_better=False,
    squared=False,
)
mean_squared_log_error = make_scorer(
    "mean_squared_log_error",
    sklearn.metrics.mean_squared_log_error,
    optimum=0,
    worst_possible_result=MAXINT,
    greater_is_better=False,
)
median_absolute_error = make_scorer(
    "median_absolute_error",
    sklearn.metrics.median_absolute_error,
    optimum=0,
    worst_possible_result=MAXINT,
    greater_is_better=False,
)
r2 = make_scorer("r2", sklearn.metrics.r2_score)

# Standard Classification Scores
accuracy = make_scorer("accuracy", sklearn.metrics.accuracy_score)
balanced_accuracy = make_scorer(
    "balanced_accuracy", sklearn.metrics.balanced_accuracy_score
)

# Score functions that need decision values
roc_auc = make_scorer(
    "roc_auc",
    sklearn.metrics.roc_auc_score,
    greater_is_better=True,
    needs_threshold=True,
)
average_precision = make_scorer(
    "average_precision", sklearn.metrics.average_precision_score, needs_threshold=True
)

# NOTE: zero_division
#
#   Specified as the explicit default, see sklearn docs:
#   https://scikit-learn.org/stable/modules/generated/sklearn.metrics.precision_score.html#sklearn-metrics-precision-score
precision = make_scorer(
    "precision", partial(sklearn.metrics.precision_score, zero_division=0)
)
recall = make_scorer("recall", partial(sklearn.metrics.recall_score, zero_division=0))
f1 = make_scorer("f1", partial(sklearn.metrics.f1_score, zero_division=0))

# Score function for probabilistic classification
log_loss = make_scorer(
    "log_loss",
    sklearn.metrics.log_loss,
    optimum=0,
    worst_possible_result=MAXINT,
    greater_is_better=False,
    needs_proba=True,
)
# TODO what about mathews correlation coefficient etc?


# Name -> scorer registries.
REGRESSION_METRICS = {
    scorer.name: scorer
    for scorer in [
        mean_absolute_error,
        mean_squared_error,
        root_mean_squared_error,
        mean_squared_log_error,
        median_absolute_error,
        r2,
    ]
}

# NOTE(review): the plain ``precision``/``recall``/``f1`` scorers defined above
# are not registered here - confirm whether that is intentional.
CLASSIFICATION_METRICS = {
    scorer.name: scorer
    for scorer in [accuracy, balanced_accuracy, roc_auc, average_precision, log_loss]
}

# NOTE: zero_division
#
#   Specified as the explicit default, see sklearn docs:
#   https://scikit-learn.org/stable/modules/generated/sklearn.metrics.precision_score.html#sklearn-metrics-precision-score
for (base_name, sklearn_metric), average in product(
    [
        ("precision", sklearn.metrics.precision_score),
        ("recall", sklearn.metrics.recall_score),
        ("f1", sklearn.metrics.f1_score),
    ],
    ["macro", "micro", "samples", "weighted"],
):
    name = f"{base_name}_{average}"
    scorer = make_scorer(
        name, partial(sklearn_metric, pos_label=None, average=average, zero_division=0)
    )
    globals()[name] = scorer  # Adds scorer to the module scope
    CLASSIFICATION_METRICS[name] = scorer


# ValueError messages that mean "this metric cannot be computed for these
# targets". A scorer failing with one of them is skipped instead of aborting
# the whole evaluation. The strings are matched exactly against ``e.args[0]``.
_SKIPPABLE_REGRESSION_ERRORS = (
    "Mean Squared Logarithmic Error cannot be used when "
    "targets contain negative values.",
)
_SKIPPABLE_CLASSIFICATION_ERRORS = (
    "multiclass format is not supported",
    "Samplewise metrics are not available "
    "outside of multilabel classification.",
    "Target is multiclass but "
    "average='binary'. Please choose another average "
    "setting, one of [None, 'micro', 'macro', 'weighted'].",
)


def _validate_metrics(
    metrics: Sequence[Scorer],
    scoring_functions: Optional[List[Scorer]] = None,
) -> None:
    """
    Validate metrics given to Auto-sklearn. Raises an Exception in case of a problem.

    Parameters
    ----------
    metrics: Sequence[Scorer]
        A list of objects that hosts a function to calculate how good the
        prediction is according to the solution.
    scoring_functions: Optional[List[Scorer]]
        A list of metrics to calculate multiple losses

    Raises
    ------
    ValueError
        If ``metrics`` is empty, or if the number of distinct scorer objects
        differs from the number of distinct scorer names.
    """
    to_score = list(metrics)
    if scoring_functions:
        to_score.extend(scoring_functions)

    if len(metrics) == 0:
        raise ValueError("Number of metrics to compute must be greater than zero.")

    # Distinct scorer objects vs. distinct names: a mismatch means that
    # different scorers share one name.
    metric_counter = collections.Counter(to_score)
    metric_names_counter = collections.Counter(metric.name for metric in to_score)
    if len(metric_counter) != len(metric_names_counter):
        raise ValueError(
            "Error in metrics passed to Auto-sklearn. A metric name was used "
            "multiple times for different metrics!"
        )


def calculate_scores(
    solution: np.ndarray,
    prediction: np.ndarray,
    task_type: int,
    metrics: Sequence[Scorer],
    *,
    X_data: Optional[SUPPORTED_XDATA_TYPES] = None,
    scoring_functions: Optional[List[Scorer]] = None,
) -> Dict[str, float]:
    """
    Returns the scores (a magnitude that allows casting the
    optimization problem as a maximization one) for the
    given Auto-Sklearn Scorer objects.

    Parameters
    ----------
    solution: np.ndarray
        The ground truth of the targets
    prediction: np.ndarray
        The best estimate from the model, of the given targets
    task_type: int
        To understand if the problem task is classification
        or regression
    metrics: Sequence[Scorer]
        A list of objects that hosts a function to calculate how good the
        prediction is according to the solution.
    X_data : array-like [n_samples x n_features]
        X data used to obtain the predictions
    scoring_functions: List[Scorer]
        A list of metrics to calculate multiple losses

    Returns
    -------
    Dict[str, float]
        Score per scorer name. Scorers that fail with one of the known
        "not applicable" ValueError messages are left out.

    Raises
    ------
    NotImplementedError
        If ``task_type`` is not one of ``TASK_TYPES``.
    ValueError
        If the metrics are invalid, or a scorer fails with any other
        ValueError.
    """
    if task_type not in TASK_TYPES:
        raise NotImplementedError(task_type)

    _validate_metrics(metrics=metrics, scoring_functions=scoring_functions)

    to_score = list(metrics)
    if scoring_functions:
        to_score.extend(scoring_functions)

    # TODO maybe annotate metrics to define which cases they can
    # handle?
    if task_type in REGRESSION_TASKS:
        skippable_errors = _SKIPPABLE_REGRESSION_ERRORS
    else:
        skippable_errors = _SKIPPABLE_CLASSIFICATION_ERRORS

    score_dict: Dict[str, float] = {}
    for metric_ in to_score:
        try:
            score_dict[metric_.name] = _compute_single_scorer(
                metric=metric_,
                prediction=prediction,
                solution=solution,
                task_type=task_type,
                X_data=X_data,
            )
        except ValueError as e:
            # A ValueError raised without arguments has empty ``args``;
            # indexing it blindly would replace the real error by an
            # IndexError.
            message = e.args[0] if e.args else None
            if message in skippable_errors:
                continue
            raise

    return score_dict


def calculate_loss(
    solution: np.ndarray,
    prediction: np.ndarray,
    task_type: int,
    metric: Scorer,
    X_data: Optional[SUPPORTED_XDATA_TYPES] = None,
) -> float:
    """Calculate the loss with a given metric

    Parameters
    ----------
    solution: np.ndarray
        The solutions

    prediction: np.ndarray
        The predictions generated

    task_type: int
        The task type of the problem

    metric: Scorer
        The metric to use

    X_data: Optional[SUPPORTED_XDATA_TYPES]
        X data used to obtain the predictions

    Returns
    -------
    float
        The loss for ``metric``.

    Raises
    ------
    KeyError
        If the metric was skipped as not applicable and therefore has no
        entry in the computed losses.
    """
    losses = calculate_losses(
        solution=solution,
        prediction=prediction,
        task_type=task_type,
        metrics=[metric],
        X_data=X_data,
    )
    return losses[metric.name]


def calculate_losses(
    solution: np.ndarray,
    prediction: np.ndarray,
    task_type: int,
    metrics: Sequence[Scorer],
    *,
    X_data: Optional[SUPPORTED_XDATA_TYPES] = None,
    scoring_functions: Optional[List[Scorer]] = None,
) -> Dict[str, float]:
    """
    Returns the losses (a magnitude that allows casting the
    optimization problem as a minimization one) for the
    given Auto-Sklearn Scorer objects.

    Parameters
    ----------
    solution: np.ndarray
        The ground truth of the targets
    prediction: np.ndarray
        The best estimate from the model, of the given targets
    task_type: int
        To understand if the problem task is classification
        or regression
    metrics: Sequence[Scorer]
        A list of objects that hosts a function to calculate how good the
        prediction is according to the solution.
    X_data: Optional[SUPPORTED_XDATA_TYPES]
        X data used to obtain the predictions
    scoring_functions: List[Scorer]
        A list of metrics to calculate multiple losses

    Returns
    -------
    Dict[str, float]
        A loss function for each of the provided scorer objects
    """
    score = calculate_scores(
        solution=solution,
        prediction=prediction,
        X_data=X_data,
        task_type=task_type,
        metrics=metrics,
        scoring_functions=scoring_functions,
    )
    # ``list(...)`` so that any sequence accepted by ``calculate_scores``
    # (e.g. a tuple) can be concatenated below as well.
    extra_scorers = list(scoring_functions) if scoring_functions else []

    # we expect a dict() object for which we should calculate the loss
    loss_dict: Dict[str, float] = {}
    for metric_ in extra_scorers + list(metrics):
        # maybe metric argument is not in scoring_functions
        # TODO: When metrics are annotated with type_of_target support
        # we can remove this check
        if metric_.name not in score:
            continue
        loss_dict[metric_.name] = metric_._optimum - score[metric_.name]
    return loss_dict


def compute_single_metric(
    metric: Scorer,
    prediction: np.ndarray,
    solution: np.ndarray,
    task_type: int,
    X_data: Optional[SUPPORTED_XDATA_TYPES] = None,
) -> float:
    """
    Returns a metric for the given Auto-Sklearn Scorer object.
    Its direction is determined by the metric itself.

    Parameters
    ----------
    solution: np.ndarray
        The ground truth of the targets
    prediction: np.ndarray
        The best estimate from the model, of the given targets
    task_type: int
        To understand if the problem task is classification
        or regression
    metric: Scorer
        Object that host a function to calculate how good the
        prediction is according to the solution.
    X_data : array-like [n_samples x n_features]
        X data used to obtain the predictions

    Returns
    -------
    float
    """
    score = _compute_single_scorer(
        solution=solution,
        prediction=prediction,
        metric=metric,
        X_data=X_data,
        task_type=task_type,
    )
    # The score is multiplied by the metric's sign once more before returning.
    return metric._sign * score


def _compute_single_scorer(
    metric: Scorer,
    prediction: np.ndarray,
    solution: np.ndarray,
    task_type: int,
    X_data: Optional[SUPPORTED_XDATA_TYPES] = None,
) -> float:
    """
    Returns a score (a magnitude that allows casting the
    optimization problem as a maximization one) for the
    given Auto-Sklearn Scorer object

    Parameters
    ----------
    solution: np.ndarray
        The ground truth of the targets
    prediction: np.ndarray
        The best estimate from the model, of the given targets
    task_type: int
        To understand if the problem task is classification
        or regression
    metric: Scorer
        Object that host a function to calculate how good the
        prediction is according to the solution.
    X_data : array-like [n_samples x n_features]
        X data used to obtain the predictions

    Returns
    -------
    float

    Raises
    ------
    ValueError
        If the metric needs ``X_data`` but it is missing or its number of
        rows differs from that of ``solution``.
    """
    # X_data is only forwarded to scorers that asked for it.
    scorer_kwargs = {}  # type: Dict[str, Any]
    if metric._needs_X:
        if X_data is None:
            raise ValueError(
                f"Metric {metric.name} needs X_data, but X_data is {X_data}"
            )
        if X_data.shape[0] != solution.shape[0]:
            raise ValueError(
                f"X_data has wrong length. "
                f"Should be {solution.shape[0]}, but is {X_data.shape[0]}"
            )
        scorer_kwargs["X_data"] = X_data

    if task_type in REGRESSION_TASKS:
        # TODO put this into the regression metric itself
        prediction = sanitize_array(prediction)

    return metric(solution, prediction, **scorer_kwargs)


# Must be at bottom so all metrics are defined
default_metric_for_task: Dict[int, Scorer] = {
    BINARY_CLASSIFICATION: CLASSIFICATION_METRICS["accuracy"],
    MULTICLASS_CLASSIFICATION: CLASSIFICATION_METRICS["accuracy"],
    MULTILABEL_CLASSIFICATION: CLASSIFICATION_METRICS["f1_macro"],
    REGRESSION: REGRESSION_METRICS["r2"],
    MULTIOUTPUT_REGRESSION: REGRESSION_METRICS["r2"],
}