Skip to content

Tasks

Module for task-related functions and classes.

ClassificationTask

Bases: BaseTask

A class representing a classification task in the promptolution library.

This class handles the loading and management of classification datasets, as well as the evaluation of predictors on these datasets.

Source code in promptolution/tasks/classification_tasks.py
class ClassificationTask(BaseTask):
    """A class representing a classification task in the promptolution library.

    This class handles the loading and management of classification datasets,
    as well as the evaluation of predictors on these datasets.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        task_description: Optional[str] = None,
        x_column: str = "x",
        y_column: str = "y",
        n_subsamples: int = 30,
        # NOTE(review): the docstring below also lists an "evaluated" strategy that is
        # missing from this Literal — confirm against BaseTask which set is correct.
        eval_strategy: Literal["full", "subsample", "sequential_block", "random_block"] = "full",
        seed: int = 42,
        metric: Callable[[Any, Any], float] = accuracy_score,
        config: Optional["ExperimentConfig"] = None,
    ) -> None:
        """Initialize the ClassificationTask from a pandas DataFrame.

        Args:
            df (pd.DataFrame): Input DataFrame containing the data
            task_description (str): Description of the task
            x_column (str, optional): Name of the column containing input texts. Defaults to "x".
            y_column (str, optional): Name of the column containing labels. Defaults to "y".
            n_subsamples (int, optional): Number of subsamples to use. Defaults to 30.
            eval_strategy (str, optional): Subsampling strategy to use. Options:
                - "full": Uses the entire dataset for evaluation.
                - "evaluated": Uses only previously evaluated datapoints from the cache.
                - "subsample": Randomly selects n_subsamples datapoints without replacement.
                - "sequential_block": Uses a block of block_size consecutive datapoints, advancing through blocks sequentially.
                - "random_block": Randomly selects a block of block_size consecutive datapoints.
                Defaults to "full".
            seed (int, optional): Random seed for reproducibility. Defaults to 42.
            metric (Callable, optional): Metric to use for evaluation. Defaults to accuracy_score.
            config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
        """
        # metric is assigned before super().__init__ — presumably the base initializer
        # may rely on it; confirm before reordering.
        self.metric = metric
        super().__init__(
            df=df,
            x_column=x_column,
            y_column=y_column,
            task_description=task_description,
            n_subsamples=n_subsamples,
            eval_strategy=eval_strategy,
            seed=seed,
            config=config,
        )
        self.task_type = "classification"
        # assumes the y column holds strings — .str.lower() raises on non-string labels; TODO confirm upstream guarantees
        self.ys: List[str] = (
            df[self.y_column].str.lower().values.tolist()
        )  # Ensure y values are lowercase for consistent comparison
        self.classes = np.unique(self.ys)

    def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
        """Return one metric score per (label, prediction) pair.

        The metric is applied to singleton lists, so each datapoint yields its own
        score rather than a single aggregate over the whole batch.
        """
        scores = [self.metric([y], [pred]) for pred, y in zip(preds, ys)]
        return np.asarray(scores, dtype=float)

__init__(df, task_description=None, x_column='x', y_column='y', n_subsamples=30, eval_strategy='full', seed=42, metric=accuracy_score, config=None)

Initialize the ClassificationTask from a pandas DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame containing the data

required
task_description str

Description of the task

None
x_column str

Name of the column containing input texts. Defaults to "x".

'x'
y_column str

Name of the column containing labels. Defaults to "y".

'y'
n_subsamples int

Number of subsamples to use. Defaults to 30.

30
eval_strategy str

Subsampling strategy to use. Options: - "full": Uses the entire dataset for evaluation. - "evaluated": Uses only previously evaluated datapoints from the cache. - "subsample": Randomly selects n_subsamples datapoints without replacement. - "sequential_block": Uses a block of block_size consecutive datapoints, advancing through blocks sequentially. - "random_block": Randomly selects a block of block_size consecutive datapoints. Defaults to "full".

'full'
seed int

Random seed for reproducibility. Defaults to 42.

42
metric Callable

Metric to use for evaluation. Defaults to accuracy_score.

accuracy_score
config ExperimentConfig

Configuration for the task, overriding defaults.

None
Source code in promptolution/tasks/classification_tasks.py
def __init__(
    self,
    df: pd.DataFrame,
    task_description: Optional[str] = None,
    x_column: str = "x",
    y_column: str = "y",
    n_subsamples: int = 30,
    eval_strategy: Literal["full", "subsample", "sequential_block", "random_block"] = "full",
    seed: int = 42,
    metric: Callable[[Any, Any], float] = accuracy_score,
    config: Optional["ExperimentConfig"] = None,
) -> None:
    """Initialize the ClassificationTask from a pandas DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame containing the data
        task_description (str): Description of the task
        x_column (str, optional): Name of the column containing input texts. Defaults to "x".
        y_column (str, optional): Name of the column containing labels. Defaults to "y".
        n_subsamples (int, optional): Number of subsamples to use. Defaults to 30.
        eval_strategy (str, optional): Subsampling strategy to use. Options:
            - "full": Uses the entire dataset for evaluation.
            - "evaluated": Uses only previously evaluated datapoints from the cache.
            - "subsample": Randomly selects n_subsamples datapoints without replacement.
            - "sequential_block": Uses a block of block_size consecutive datapoints, advancing through blocks sequentially.
            - "random_block": Randomly selects a block of block_size consecutive datapoints.
            Defaults to "full".
        seed (int, optional): Random seed for reproducibility. Defaults to 42.
        metric (Callable, optional): Metric to use for evaluation. Defaults to accuracy_score.
        config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
    """
    # metric is assigned before super().__init__ — presumably the base initializer
    # may rely on it; confirm before reordering.
    self.metric = metric
    super().__init__(
        df=df,
        x_column=x_column,
        y_column=y_column,
        task_description=task_description,
        n_subsamples=n_subsamples,
        eval_strategy=eval_strategy,
        seed=seed,
        config=config,
    )
    self.task_type = "classification"
    # assumes the y column holds strings — .str.lower() raises on non-string labels; TODO confirm
    self.ys: List[str] = (
        df[self.y_column].str.lower().values.tolist()
    )  # Ensure y values are lowercase for consistent comparison
    self.classes = np.unique(self.ys)

JudgeTask

Bases: BaseTask

Task that evaluates a predictor using an LLM-as-a-judge, optionally accepting a ground truth.

Source code in promptolution/tasks/judge_tasks.py
class JudgeTask(BaseTask):
    """Task that evaluates a predictor using an LLM-as-a-judge, optionally accepting a ground truth."""

    def __init__(
        self,
        df: pd.DataFrame,
        judge_llm: "BaseLLM",
        x_column: str = "x",
        y_column: Optional[str] = None,
        task_description: Optional[str] = None,
        n_subsamples: int = 30,
        eval_strategy: "EvalStrategy" = "full",
        seed: int = 42,
        judge_prompt: Optional[str] = None,
        min_score: float = -5.0,
        max_score: float = 5.0,
        config: Optional["ExperimentConfig"] = None,
    ):
        """Initialize the JudgeTask.

        Args:
            df (pd.DataFrame): The input DataFrame containing the data.
            judge_llm (BaseLLM): The LLM judging the predictions.
            x_column (str): Name of the column containing input texts.
            y_column (Optional[str]): Name of the column containing labels/ground truth (if applicable).
            task_description (Optional[str]): Description of the task, parsed to the Judge-LLM and Meta-LLM.
            n_subsamples (int): Number of subsamples to use for evaluation.
            eval_strategy (EvalStrategy): Subsampling strategy to use for evaluation.
            seed (int): Random seed for reproducibility.
            judge_prompt (Optional[str]): Custom prompt for the judge. Note: The score of the Judge will be extracted inside <final_score> tags.
            min_score (float): Minimum score for evaluation.
            max_score (float): Maximum score for evaluation.
            config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
        """
        # Choose the default judge prompt based on whether a ground-truth column is set.
        if judge_prompt is None:
            judge_prompt = JUDGE_PROMPT_WITH_GROUND_TRUTH if y_column else JUDGE_PROMPT_WITHOUT_GROUND_TRUTH
        self.judge_prompt = judge_prompt
        # NOTE(review): _evaluate divides by (max_score - min_score); equal bounds would
        # raise ZeroDivisionError — confirm callers never pass min_score == max_score.
        self.min_score = min_score
        self.max_score = max_score

        super().__init__(
            df=df,
            x_column=x_column,
            y_column=y_column,
            task_description=task_description,
            n_subsamples=n_subsamples,
            eval_strategy=eval_strategy,
            seed=seed,
            config=config,
        )
        self.judge_llm = judge_llm
        self.task_type = "judge"

    def _construct_judge_prompt(self, x: str, pred: str, y: Optional[str] = None) -> str:
        """Construct the judge prompt based on whether ground truth is available."""
        if y is not None:
            prompt = self.judge_prompt.replace("{ground_truth}", str(y))
        else:
            prompt = self.judge_prompt
        # str.replace is a no-op when a placeholder is absent, so custom prompts may
        # omit any of {task}/{input}/{prediction} without raising.
        task_description = self.task_description or ""
        prompt = prompt.replace("{task}", task_description).replace("{input}", x).replace("{prediction}", pred)
        return prompt

    def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
        """Score each prediction with the LLM judge, normalized to [0, 1]."""
        prompts: List[str] = []
        for x, y, pred in zip(xs, ys, preds):
            judge_prompt = self._construct_judge_prompt(x, pred, y)
            prompts.append(judge_prompt)
        judge_responses = self.judge_llm.get_response(prompts)
        scores_str = extract_from_tag(judge_responses, "<final_score>", "</final_score>")
        scores = []
        for score_str in scores_str:
            try:
                # only numeric chars, - or . are allowed
                score_str = "".join(filter(lambda c: c.isdigit() or c in "-.", score_str))
                score = float(score_str)
                # normalize from [min_score, max_score] to [0, 1]
                score = (score - self.min_score) / (self.max_score - self.min_score)
                score = max(0.0, min(1.0, score))
            except ValueError:
                # NOTE(review): score_str was already reduced to digits/-/. above, so this
                # warning shows the sanitized text, not the judge's raw output.
                logger.warning(f"Failed to parse score '{score_str}' as float. Defaulting to a score 0.0.")
                score = 0.0

            scores.append(score)

        return np.asarray(scores, dtype=float)

__init__(df, judge_llm, x_column='x', y_column=None, task_description=None, n_subsamples=30, eval_strategy='full', seed=42, judge_prompt=None, min_score=-5.0, max_score=5.0, config=None)

Initialize the JudgeTask.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame containing the data.

required
judge_llm BaseLLM

The LLM judging the predictions.

required
x_column str

Name of the column containing input texts.

'x'
y_column Optional[str]

Name of the column containing labels/ground truth (if applicable).

None
task_description Optional[str]

Description of the task, parsed to the Judge-LLM and Meta-LLM.

None
n_subsamples int

Number of subsamples to use for evaluation.

30
eval_strategy EvalStrategy

Subsampling strategy to use for evaluation.

'full'
seed int

Random seed for reproducibility.

42
judge_prompt Optional[str]

Custom prompt for the judge. Note: The score of the Judge will be extracted from inside <final_score> tags.

None
min_score float

Minimum score for evaluation.

-5.0
max_score float

Maximum score for evaluation.

5.0
config ExperimentConfig

Configuration for the task, overriding defaults.

None
Source code in promptolution/tasks/judge_tasks.py
def __init__(
    self,
    df: pd.DataFrame,
    judge_llm: "BaseLLM",
    x_column: str = "x",
    y_column: Optional[str] = None,
    task_description: Optional[str] = None,
    n_subsamples: int = 30,
    eval_strategy: "EvalStrategy" = "full",
    seed: int = 42,
    judge_prompt: Optional[str] = None,
    min_score: float = -5.0,
    max_score: float = 5.0,
    config: Optional["ExperimentConfig"] = None,
):
    """Initialize the JudgeTask.

    Args:
        df (pd.DataFrame): The input DataFrame containing the data.
        judge_llm (BaseLLM): The LLM judging the predictions.
        x_column (str): Name of the column containing input texts.
        y_column (Optional[str]): Name of the column containing labels/ground truth (if applicable).
        task_description (Optional[str]): Description of the task, parsed to the Judge-LLM and Meta-LLM.
        n_subsamples (int): Number of subsamples to use for evaluation.
        eval_strategy (EvalStrategy): Subsampling strategy to use for evaluation.
        seed (int): Random seed for reproducibility.
        judge_prompt (Optional[str]): Custom prompt for the judge. Note: The score of the Judge will be extracted inside <final_score> tags.
        min_score (float): Minimum score for evaluation.
        max_score (float): Maximum score for evaluation.
        config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
    """
    # Choose the default judge prompt based on whether a ground-truth column is set.
    if judge_prompt is None:
        judge_prompt = JUDGE_PROMPT_WITH_GROUND_TRUTH if y_column else JUDGE_PROMPT_WITHOUT_GROUND_TRUTH
    self.judge_prompt = judge_prompt
    # NOTE(review): scores are normalized by (max_score - min_score); equal bounds would
    # raise ZeroDivisionError during evaluation — confirm callers never pass equal values.
    self.min_score = min_score
    self.max_score = max_score

    super().__init__(
        df=df,
        x_column=x_column,
        y_column=y_column,
        task_description=task_description,
        n_subsamples=n_subsamples,
        eval_strategy=eval_strategy,
        seed=seed,
        config=config,
    )
    self.judge_llm = judge_llm
    self.task_type = "judge"

MultiObjectiveTask

Bases: BaseTask

A task that aggregates evaluations across multiple underlying tasks.

Source code in promptolution/tasks/multi_objective_task.py
class MultiObjectiveTask(BaseTask):
    """A task that aggregates evaluations across multiple underlying tasks."""

    def __init__(
        self,
        tasks: List[BaseTask],
        eval_strategy: Optional[EvalStrategy] = None,
    ) -> None:
        """Initialize with a list of tasks sharing subsampling and seed settings."""
        if not tasks:
            raise ValueError("tasks must be a non-empty list")

        primary = tasks[0]
        # NOTE(review): these asserts are stripped when Python runs with -O; use explicit
        # raises if the compatibility checks must always be enforced.
        for t in tasks[1:]:
            assert t.n_subsamples == primary.n_subsamples, "All tasks must share n_subsamples"
            assert t.seed == primary.seed, "All tasks must share seed"
            assert t.eval_strategy == primary.eval_strategy, "All tasks must share eval_strategy"

        combined_description = "This task is a combination of the following tasks:\n" + "\n".join(
            [f"Task: {t.task_description}" for t in tasks if t.task_description]
        )

        # The first task supplies the shared data and subsampling settings.
        super().__init__(
            df=primary.df,
            x_column=primary.x_column,
            y_column=primary.y_column,
            task_description=combined_description,
            n_subsamples=primary.n_subsamples,
            eval_strategy=eval_strategy or primary.eval_strategy,
            seed=primary.seed,
            config=None,
        )
        self.task_type = "multi"
        self.tasks = tasks
        # When True, evaluate() returns a single EvalResult with equally averaged scores.
        self._scalarized_objective: bool = False

    def activate_scalarized_objective(self) -> None:
        """Force single-objective behavior by equally averaging task scores."""
        self._scalarized_objective = True

    def evaluate(  # type: ignore
        self,
        prompts: Prompt | List[Prompt],
        predictor,
        system_prompts: Optional[str | List[str]] = None,
        eval_strategy: Optional[EvalStrategy] = None,
    ) -> MultiObjectiveEvalResult | EvalResult:
        """Run prediction once, then score via each task's _evaluate."""
        prompts_list: List[Prompt] = [prompts] if isinstance(prompts, Prompt) else list(prompts)
        strategy = eval_strategy or self.eval_strategy

        # Keep block alignment across tasks so block-based strategies stay in sync.
        for task in self.tasks:
            task.block_idx = self.block_idx

        xs, ys = self.subsample(eval_strategy=strategy)

        # Collect all uncached prompt/x/y triples across tasks to predict only once.
        prompts_to_evaluate: List[str] = []
        xs_to_evaluate: List[str] = []
        ys_to_evaluate: List[str] = []
        key_to_index: Dict[Tuple[str, str, str], int] = {}
        cache_keys: List[Tuple[str, str, str]] = []

        for task in self.tasks:
            t_prompts, t_xs, t_ys, t_keys = task._prepare_batch(prompts_list, xs, ys, eval_strategy=strategy)
            for prompt_str, x_val, y_val, key in zip(t_prompts, t_xs, t_ys, t_keys):
                if key in key_to_index:
                    continue
                key_to_index[key] = len(prompts_to_evaluate)
                prompts_to_evaluate.append(prompt_str)
                xs_to_evaluate.append(x_val)
                ys_to_evaluate.append(y_val)
                cache_keys.append(key)

        preds: List[str] = []
        pred_seqs: List[str] = []
        if prompts_to_evaluate:
            preds, pred_seqs = predictor.predict(
                prompts=prompts_to_evaluate,
                xs=xs_to_evaluate,
                system_prompts=system_prompts,
            )

        # Map predictions back to each task and populate caches via _evaluate.
        key_to_pred: Dict[Tuple[str, str, str], Tuple[str, str]] = {
            key: (preds[idx], pred_seqs[idx]) for key, idx in key_to_index.items()
        }

        per_task_results: List[EvalResult] = []
        for task in self.tasks:
            if cache_keys:
                # Every task scores the full union of uncached keys; a key that was
                # already cached for this task is re-scored and its cache entry overwritten.
                xs_eval = [k[1] for k in cache_keys]
                ys_eval = [k[2] for k in cache_keys]
                preds_eval = [key_to_pred[k][0] for k in cache_keys]
                scores = task._evaluate(xs_eval, ys_eval, preds_eval)
                for score, cache_key in zip(scores, cache_keys):
                    task.eval_cache[cache_key] = score
                    task.seq_cache[cache_key] = key_to_pred[cache_key][1]

            scores_array, agg_scores, seqs = task._collect_results_from_cache(prompts_list, xs, ys)
            input_tokens, output_tokens, agg_input_tokens, agg_output_tokens = task._compute_costs(
                prompts_list, xs, ys, predictor
            )

            per_task_results.append(
                EvalResult(
                    scores=scores_array,
                    agg_scores=agg_scores,
                    sequences=seqs,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    agg_input_tokens=agg_input_tokens,
                    agg_output_tokens=agg_output_tokens,
                )
            )

        stacked_scores = [r.scores for r in per_task_results]
        stacked_agg_scores = [r.agg_scores for r in per_task_results]

        # Record evaluated blocks for this evaluation (mirroring BaseTask behavior)
        for prompt in prompts_list:
            # Use self.block_idx (the MultiObjectiveTask's block_idx) if in a block strategy
            if strategy in ["sequential_block", "random_block"]:
                if isinstance(self.block_idx, list):
                    self.prompt_evaluated_blocks.setdefault(prompt, []).extend(self.block_idx)
                else:
                    self.prompt_evaluated_blocks.setdefault(prompt, []).append(self.block_idx)
            elif strategy == "full":
                self.prompt_evaluated_blocks.setdefault(prompt, []).extend(list(range(self.n_blocks)))

        # Use first task's result for sequences and token counts (they're all the same across tasks)
        first_result = per_task_results[0]

        if self._scalarized_objective:
            return EvalResult(
                scores=np.mean(stacked_scores, axis=0),
                agg_scores=np.mean(stacked_agg_scores, axis=0),
                sequences=first_result.sequences,
                input_tokens=first_result.input_tokens,
                output_tokens=first_result.output_tokens,
                agg_input_tokens=first_result.agg_input_tokens,
                agg_output_tokens=first_result.agg_output_tokens,
            )

        return MultiObjectiveEvalResult(
            scores=stacked_scores,
            agg_scores=stacked_agg_scores,
            sequences=first_result.sequences,
            input_tokens=first_result.input_tokens,
            output_tokens=first_result.output_tokens,
            agg_input_tokens=first_result.agg_input_tokens,
            agg_output_tokens=first_result.agg_output_tokens,
        )

    def _evaluate(self, xs, ys, preds):  # pragma: no cover
        """Unused: `evaluate` is overridden to score predictions via each sub-task."""
        raise NotImplementedError("MultiObjectiveTask overrides evaluate directly")

__init__(tasks, eval_strategy=None)

Initialize with a list of tasks sharing subsampling and seed settings.

Source code in promptolution/tasks/multi_objective_task.py
def __init__(
    self,
    tasks: List[BaseTask],
    eval_strategy: Optional[EvalStrategy] = None,
) -> None:
    """Initialize with a list of tasks sharing subsampling and seed settings."""
    if not tasks:
        raise ValueError("tasks must be a non-empty list")

    primary = tasks[0]
    # NOTE(review): these asserts are stripped when Python runs with -O; use explicit
    # raises if the compatibility checks must always be enforced.
    for t in tasks[1:]:
        assert t.n_subsamples == primary.n_subsamples, "All tasks must share n_subsamples"
        assert t.seed == primary.seed, "All tasks must share seed"
        assert t.eval_strategy == primary.eval_strategy, "All tasks must share eval_strategy"

    combined_description = "This task is a combination of the following tasks:\n" + "\n".join(
        [f"Task: {t.task_description}" for t in tasks if t.task_description]
    )

    # The first task supplies the shared data and subsampling settings.
    super().__init__(
        df=primary.df,
        x_column=primary.x_column,
        y_column=primary.y_column,
        task_description=combined_description,
        n_subsamples=primary.n_subsamples,
        eval_strategy=eval_strategy or primary.eval_strategy,
        seed=primary.seed,
        config=None,
    )
    self.task_type = "multi"
    self.tasks = tasks
    # When True, evaluate() returns a single EvalResult with equally averaged scores.
    self._scalarized_objective: bool = False

activate_scalarized_objective()

Force single-objective behavior by equally averaging task scores.

Source code in promptolution/tasks/multi_objective_task.py
def activate_scalarized_objective(self) -> None:
    """Collapse the multiple objectives into one by averaging task scores equally."""
    self._scalarized_objective = True

evaluate(prompts, predictor, system_prompts=None, eval_strategy=None)

Run prediction once, then score via each task's _evaluate.

Source code in promptolution/tasks/multi_objective_task.py
def evaluate(  # type: ignore
    self,
    prompts: Prompt | List[Prompt],
    predictor,
    system_prompts: Optional[str | List[str]] = None,
    eval_strategy: Optional[EvalStrategy] = None,
) -> MultiObjectiveEvalResult | EvalResult:
    """Run prediction once, then score via each task's _evaluate.

    Returns an EvalResult with equally averaged scores when the scalarized
    objective is active, otherwise a MultiObjectiveEvalResult stacking per-task
    scores.
    """
    prompts_list: List[Prompt] = [prompts] if isinstance(prompts, Prompt) else list(prompts)
    strategy = eval_strategy or self.eval_strategy

    # Keep block alignment across tasks so block-based strategies stay in sync.
    for task in self.tasks:
        task.block_idx = self.block_idx

    xs, ys = self.subsample(eval_strategy=strategy)

    # Collect all uncached prompt/x/y triples across tasks to predict only once.
    prompts_to_evaluate: List[str] = []
    xs_to_evaluate: List[str] = []
    ys_to_evaluate: List[str] = []
    key_to_index: Dict[Tuple[str, str, str], int] = {}
    cache_keys: List[Tuple[str, str, str]] = []

    for task in self.tasks:
        t_prompts, t_xs, t_ys, t_keys = task._prepare_batch(prompts_list, xs, ys, eval_strategy=strategy)
        for prompt_str, x_val, y_val, key in zip(t_prompts, t_xs, t_ys, t_keys):
            if key in key_to_index:
                continue
            key_to_index[key] = len(prompts_to_evaluate)
            prompts_to_evaluate.append(prompt_str)
            xs_to_evaluate.append(x_val)
            ys_to_evaluate.append(y_val)
            cache_keys.append(key)

    preds: List[str] = []
    pred_seqs: List[str] = []
    if prompts_to_evaluate:
        preds, pred_seqs = predictor.predict(
            prompts=prompts_to_evaluate,
            xs=xs_to_evaluate,
            system_prompts=system_prompts,
        )

    # Map predictions back to each task and populate caches via _evaluate.
    key_to_pred: Dict[Tuple[str, str, str], Tuple[str, str]] = {
        key: (preds[idx], pred_seqs[idx]) for key, idx in key_to_index.items()
    }

    per_task_results: List[EvalResult] = []
    for task in self.tasks:
        if cache_keys:
            # Every task scores the full union of uncached keys; a key that was
            # already cached for this task is re-scored and its entry overwritten.
            xs_eval = [k[1] for k in cache_keys]
            ys_eval = [k[2] for k in cache_keys]
            preds_eval = [key_to_pred[k][0] for k in cache_keys]
            scores = task._evaluate(xs_eval, ys_eval, preds_eval)
            for score, cache_key in zip(scores, cache_keys):
                task.eval_cache[cache_key] = score
                task.seq_cache[cache_key] = key_to_pred[cache_key][1]

        scores_array, agg_scores, seqs = task._collect_results_from_cache(prompts_list, xs, ys)
        input_tokens, output_tokens, agg_input_tokens, agg_output_tokens = task._compute_costs(
            prompts_list, xs, ys, predictor
        )

        per_task_results.append(
            EvalResult(
                scores=scores_array,
                agg_scores=agg_scores,
                sequences=seqs,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                agg_input_tokens=agg_input_tokens,
                agg_output_tokens=agg_output_tokens,
            )
        )

    stacked_scores = [r.scores for r in per_task_results]
    stacked_agg_scores = [r.agg_scores for r in per_task_results]

    # Record evaluated blocks for this evaluation (mirroring BaseTask behavior)
    for prompt in prompts_list:
        # Use self.block_idx (the MultiObjectiveTask's block_idx) if in a block strategy
        if strategy in ["sequential_block", "random_block"]:
            if isinstance(self.block_idx, list):
                self.prompt_evaluated_blocks.setdefault(prompt, []).extend(self.block_idx)
            else:
                self.prompt_evaluated_blocks.setdefault(prompt, []).append(self.block_idx)
        elif strategy == "full":
            self.prompt_evaluated_blocks.setdefault(prompt, []).extend(list(range(self.n_blocks)))

    # Use first task's result for sequences and token counts (they're all the same across tasks)
    first_result = per_task_results[0]

    if self._scalarized_objective:
        return EvalResult(
            scores=np.mean(stacked_scores, axis=0),
            agg_scores=np.mean(stacked_agg_scores, axis=0),
            sequences=first_result.sequences,
            input_tokens=first_result.input_tokens,
            output_tokens=first_result.output_tokens,
            agg_input_tokens=first_result.agg_input_tokens,
            agg_output_tokens=first_result.agg_output_tokens,
        )

    return MultiObjectiveEvalResult(
        scores=stacked_scores,
        agg_scores=stacked_agg_scores,
        sequences=first_result.sequences,
        input_tokens=first_result.input_tokens,
        output_tokens=first_result.output_tokens,
        agg_input_tokens=first_result.agg_input_tokens,
        agg_output_tokens=first_result.agg_output_tokens,
    )

RewardTask

Bases: BaseTask

A task that evaluates a predictor using a reward function.

This task takes a DataFrame, a column name for input data, and a reward function. The reward function takes in a prediction as input and returns a scalar reward.

Source code in promptolution/tasks/reward_tasks.py
class RewardTask(BaseTask):
    """A task that evaluates a predictor using a reward function.

    This task takes a DataFrame, a column name for input data, and a reward function.
    The reward function takes in a prediction as input and returns a scalar reward.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        reward_function: Callable[..., float],
        x_column: str = "x",
        y_column: Optional[str] = None,
        reward_columns: Optional[List[str]] = None,
        task_description: Optional[str] = None,
        n_subsamples: int = 30,
        eval_strategy: "EvalStrategy" = "full",
        seed: int = 42,
        config: Optional["ExperimentConfig"] = None,
    ) -> None:
        """Initialize the RewardTask.

        Args:
            df (pd.DataFrame): Input DataFrame containing the data.
            reward_function (Callable): Function that takes a prediction, potential keyword arguments from the dataframe, and returns a reward score. Note: The optimizers aim to maximize.
            x_column (str, optional): Name of the column containing input texts. Defaults to "x".
            y_column (str, optional): Name of the column containing target texts if available. Defaults to None.
            reward_columns (List[str], optional): Additional dataframe columns to pass as keyword args to reward_function.
            task_description (str, optional): Description of the task.
            n_subsamples (int, optional): Number of subsamples to use. Defaults to 30.
            eval_strategy (str, optional): Subsampling strategy to use. Defaults to "full".
            seed (int, optional): Random seed for reproducibility. Defaults to 42.
            config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
        """
        self.reward_function = reward_function
        self.reward_columns = reward_columns or []
        super().__init__(
            df=df,
            x_column=x_column,
            y_column=y_column,
            task_description=task_description,
            n_subsamples=n_subsamples,
            eval_strategy=eval_strategy,
            seed=seed,
            config=config,
        )
        self.task_type = "reward"
        # x -> kwargs to reward function; unknown x values fall back to empty kwargs
        # via the defaultdict below.
        # NOTE(review): set_index keeps only the last row per duplicated x value —
        # confirm x_column is unique in the dataframe.
        km = self.df.set_index(x_column)[self.reward_columns].to_dict("index")
        self.kwargs_map = defaultdict(dict, km)

    def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
        """Calculate reward for each prediction, passing configured columns as kwargs."""
        kwargs_list = [self.kwargs_map[x] for x in xs]
        rewards = [self.reward_function(pred, **kwargs) for pred, kwargs in zip(preds, kwargs_list)]
        return np.asarray(rewards, dtype=float)

__init__(df, reward_function, x_column='x', y_column=None, reward_columns=None, task_description=None, n_subsamples=30, eval_strategy='full', seed=42, config=None)

Initialize the RewardTask.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame containing the data.

required
reward_function Callable

Function that takes a prediction, potential keyword arguments from the dataframe, and returns a reward score. Note: The optimizers aim to maximize.

required
x_column str

Name of the column containing input texts. Defaults to "x".

'x'
y_column str

Name of the column containing target texts if available. Defaults to None.

None
reward_columns List[str]

Additional dataframe columns to pass as keyword args to reward_function.

None
task_description str

Description of the task.

None
n_subsamples int

Number of subsamples to use. Defaults to 30.

30
eval_strategy str

Subsampling strategy to use. Defaults to "full".

'full'
seed int

Random seed for reproducibility. Defaults to 42.

42
config ExperimentConfig

Configuration for the task, overriding defaults.

None
Source code in promptolution/tasks/reward_tasks.py
def __init__(
    self,
    df: pd.DataFrame,
    reward_function: Callable[[str], float],
    x_column: str = "x",
    y_column: Optional[str] = None,
    reward_columns: Optional[List[str]] = None,
    task_description: Optional[str] = None,
    n_subsamples: int = 30,
    eval_strategy: "EvalStrategy" = "full",
    seed: int = 42,
    config: Optional["ExperimentConfig"] = None,
) -> None:
    """Initialize the RewardTask.

    Args:
        df (pd.DataFrame): Input DataFrame containing the data.
        reward_function (Callable): Function that takes a prediction, potential keyword arguments from the dataframe, and returns a reward score. Note: The optimizers aim to maximize.
        x_column (str, optional): Name of the column containing input texts. Defaults to "x".
        y_column (str, optional): Name of the column containing target texts if available. Defaults to None.
        reward_columns (List[str], optional): Additional dataframe columns to pass as keyword args to reward_function.
        task_description (str, optional): Description of the task.
        n_subsamples (int, optional): Number of subsamples to use. Defaults to 30.
        eval_strategy (str, optional): Subsampling strategy to use. Defaults to "full".
        seed (int, optional): Random seed for reproducibility. Defaults to 42.
        config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
    """
    # Set reward attributes before calling the base initializer so that a
    # config override (applied there via config.apply_to) can still replace them.
    self.reward_function = reward_function
    self.reward_columns = reward_columns or []
    super().__init__(
        df=df,
        x_column=x_column,
        y_column=y_column,
        task_description=task_description,
        n_subsamples=n_subsamples,
        eval_strategy=eval_strategy,
        seed=seed,
        config=config,
    )
    self.task_type = "reward"
    # x -> kwargs to reward function: one dict of reward_columns values per
    # (deduplicated) x; the defaultdict yields {} for any unknown x.
    km = self.df.set_index(x_column)[self.reward_columns].to_dict("index")
    self.kwargs_map = defaultdict(dict, km)

base_task

Base module for tasks.

BaseTask

Bases: ABC

Abstract base class for tasks in the promptolution library.

Source code in promptolution/tasks/base_task.py
class BaseTask(ABC):
    """Abstract base class for tasks in the promptolution library."""

    def __init__(
        self,
        df: pd.DataFrame,
        x_column: str,
        y_column: Optional[str] = None,
        task_description: Optional[str] = None,
        n_subsamples: int = 30,
        eval_strategy: "EvalStrategy" = "full",
        seed: int = 42,
        config: Optional["ExperimentConfig"] = None,
    ) -> None:
        """Initialize the BaseTask.

        Args:
            df (pd.DataFrame): The input DataFrame containing the data.
            x_column (str): Name of the column containing input texts.
            y_column (Optional[str]): Name of the column containing labels/ground truth (if applicable).
            task_description (str): Description of the task.
            n_subsamples (int): Number of subsamples to use for evaluation.
            eval_strategy (Literal): Subsampling strategy ("full", "subsample", "sequential_block", "random_block", "evaluated").
            seed (int): Random seed for reproducibility.
            config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
        """
        # Deduplicate on x so each input text maps to exactly one row; the
        # (prompt, x, y) cache keys below rely on this uniqueness.
        self.df = df.drop_duplicates(subset=[x_column])
        if len(self.df) != len(df):
            logger.warning(
                f"Duplicate entries detected for x_column '{x_column}' - dropped {len(df) - len(self.df)} rows to enforce uniqueness."
            )
        self.x_column: str = x_column
        self.y_column: Optional[str] = y_column
        self.task_type: TaskType | None = None
        self.task_description: Optional[str] = task_description
        self.n_subsamples: int = n_subsamples
        self.eval_strategy: EvalStrategy = eval_strategy
        self.seed: int = seed

        super().__init__()
        # Config values override the attribute defaults assigned above.
        if config is not None:
            config.apply_to(self)

        self.xs: List[str] = self.df[self.x_column].values.astype(str).tolist()
        self.has_y: bool = y_column is not None
        if self.has_y and y_column is not None:
            self.ys: List[str] = self.df[y_column].values.astype(str).tolist()
        else:
            # If no y_column is provided, create a dummy y array
            self.ys = [""] * len(self.xs)

        self.block_idx: int = 0
        # NOTE(review): floor division drops a trailing partial block, so the last
        # len(xs) % n_subsamples datapoints are unreachable via block strategies — confirm intended.
        self.n_blocks: int = len(self.xs) // self.n_subsamples if self.n_subsamples > 0 else 1
        self.rng = np.random.default_rng(seed)

        self.eval_cache: Dict[Tuple[str, str, str], float] = {}  # (prompt, x, y): scores per datapoint
        self.seq_cache: Dict[Tuple[str, str, str], str] = {}  # (prompt, x, y): raw model output per datapoint

        self.prompt_evaluated_blocks: Dict[Prompt, List[int]] = {}  # maps each Prompt to the list of block indices it was evaluated on

    def subsample(
        self, eval_strategy: Optional["EvalStrategy"] = None, block_idx: List[int] | None = None
    ) -> Tuple[List[str], List[str]]:
        """Subsample the dataset based on the specified parameters.

        Args:
            eval_strategy (EvalStrategy, optional): Subsampling strategy to use instead of self.eval_strategy. Defaults to None.
            block_idx (List[int] | None, optional): Specific block index or indices to evaluate, overriding eval_strategy. Defaults to None.

        Returns:
            Tuple[List[str], List[str]]: Subsampled input data and labels.
        """
        # Explicit block indices take precedence over any strategy.
        if block_idx is not None:
            indices: List[int] = []
            for idx in block_idx:
                start_idx = idx * self.n_subsamples
                end_idx = min((idx + 1) * self.n_subsamples, len(self.xs))
                indices.extend(range(start_idx, end_idx))

            return [self.xs[i] for i in indices], [self.ys[i] for i in indices]

        if eval_strategy is None:
            eval_strategy = self.eval_strategy

        if eval_strategy in ["full", "evaluated"]:
            return self.xs, self.ys
        elif eval_strategy == "subsample":
            # Random subset without replacement, capped at the dataset size.
            indices = self.rng.choice(len(self.xs), min(self.n_subsamples, len(self.xs)), replace=False)
            return [self.xs[i] for i in indices], [self.ys[i] for i in indices]
        elif eval_strategy == "random_block":
            block_id = self.rng.integers(0, self.n_blocks)
            start_idx = block_id * self.n_subsamples
            end_idx = min((block_id + 1) * self.n_subsamples, len(self.xs))
            indices = np.arange(start_idx, end_idx)
            return [self.xs[i] for i in indices], [self.ys[i] for i in indices]
        elif eval_strategy == "sequential_block":
            # Handle case where self.block_idx is a list
            if isinstance(self.block_idx, list):
                indices_list: List[int] = []
                for idx in self.block_idx:
                    start_idx = idx * self.n_subsamples
                    end_idx = min((idx + 1) * self.n_subsamples, len(self.xs))
                    indices_list.extend(range(start_idx, end_idx))
                return [self.xs[i] for i in indices_list], [self.ys[i] for i in indices_list]
            else:
                start_idx = self.block_idx * self.n_subsamples
                end_idx = min((self.block_idx + 1) * self.n_subsamples, len(self.xs))
                indices = np.arange(start_idx, end_idx)
                return [self.xs[i] for i in indices], [self.ys[i] for i in indices]
        else:
            raise ValueError(f"Unknown subsampling strategy: '{eval_strategy}'")

    def _prepare_batch(
        self,
        prompts: List[Prompt],
        xs: List[str],
        ys: List[str],
        eval_strategy: Literal["full", "subsample", "sequential_block", "random_block", "evaluated"] = "full",
    ) -> Tuple[List[str], List[str], List[str], List[Tuple[str, str, str]]]:
        """Return uncached prompt/x/y triples for prediction and their cache keys."""
        # "evaluated" means: only report what is already cached; never predict anything new.
        if eval_strategy == "evaluated":
            return [], [], [], []

        prompts_to_predict: List[str] = []
        xs_to_predict: List[str] = []
        ys_to_predict: List[str] = []
        keys_to_predict: List[Tuple[str, str, str]] = []

        for prompt in prompts:
            for x, y in zip(xs, ys):
                cache_key = (str(prompt), x, str(y))
                if cache_key in self.eval_cache:
                    continue
                prompts_to_predict.append(str(prompt))
                xs_to_predict.append(x)
                ys_to_predict.append(str(y))
                keys_to_predict.append(cache_key)

        return prompts_to_predict, xs_to_predict, ys_to_predict, keys_to_predict

    @staticmethod
    def _cache_key(prompt: Prompt, x: str, y: str) -> Tuple[str, str, str]:
        """Build the (prompt_text, x, y) key used by eval_cache/seq_cache."""
        # NOTE(review): _prepare_batch builds keys with str(prompt), while this uses
        # prompt.construct_prompt(); assumes Prompt.__str__ returns construct_prompt() — confirm.
        return (prompt.construct_prompt(), x, y)

    def _collect_results_from_cache(
        self, prompts: List[Prompt], xs: List[str], ys: List[str]
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Collect cached scores and sequences for provided prompts/xs/ys."""
        score_rows: List[List[float]] = []
        seq_rows: List[List[str]] = []

        for prompt in prompts:
            datapoint_scores: List[float] = []
            datapoint_seqs: List[str] = []
            for x, y in zip(xs, ys):
                cache_key = self._cache_key(prompt, x, str(y))
                if cache_key not in self.eval_cache:
                    datapoint_scores.append(np.nan)  # Fill with NaN instead of skipping
                    datapoint_seqs.append("")
                else:
                    datapoint_score = self.eval_cache[cache_key]
                    datapoint_scores.append(datapoint_score)
                    datapoint_seqs.append(self.seq_cache.get(cache_key, ""))
            score_rows.append(datapoint_scores)
            seq_rows.append(datapoint_seqs)

        scores_array = np.array(score_rows, dtype=float)
        # nanmean ignores datapoints that were never evaluated for a prompt.
        agg_scores = np.nanmean(scores_array, axis=1) if scores_array.size else np.array([])
        seqs_array = np.array(seq_rows, dtype=object)
        return scores_array, agg_scores, seqs_array

    def _compute_costs(
        self,
        prompts: List[Prompt],
        xs: List[str],
        ys: List[str],
        predictor: "BasePredictor",
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Estimate per-datapoint input/output token counts from cached sequences.

        Returns (input_tokens, output_tokens, agg_input_tokens, agg_output_tokens):
        the first two are (n_prompts, n_datapoints) arrays with NaN for datapoints
        missing from seq_cache; the last two are per-prompt nanmeans.
        """
        token_counter = get_token_counter(predictor.llm)

        per_prompt_inputs: List[np.ndarray] = []
        per_prompt_outputs: List[np.ndarray] = []

        for prompt in prompts:
            prompt_token_count = token_counter(prompt.construct_prompt())
            seq_token_counts: List[float] = []
            input_token_counts = []
            for x, y in zip(xs, ys):
                cache_key = self._cache_key(prompt, x, str(y))
                if cache_key not in self.seq_cache:
                    # Use NaN for missing datapoints instead of skipping
                    seq_token_counts.append(np.nan)
                    input_token_counts.append(np.nan)
                    continue
                seq_text = self.seq_cache[cache_key]
                seq_token_counts.append(token_counter(seq_text))
                input_token_counts.append(token_counter(x))

            prompt_input_tokens = np.array(input_token_counts, dtype=float) + prompt_token_count
            # NOTE(review): outputs subtract only the x tokens from the sequence count —
            # assumes the cached sequence embeds x but not the prompt text; confirm.
            output_token_counts = np.array(seq_token_counts, dtype=float) - np.array(input_token_counts, dtype=float)

            per_prompt_inputs.append(np.asarray(prompt_input_tokens, dtype=float))
            per_prompt_outputs.append(output_token_counts)

        inputs_array = np.vstack(per_prompt_inputs)
        outputs_array = np.vstack(per_prompt_outputs)

        agg_input_tokens = np.nanmean(inputs_array, axis=1)
        agg_output_tokens = np.nanmean(outputs_array, axis=1)

        return inputs_array, outputs_array, agg_input_tokens, agg_output_tokens

    @abstractmethod
    def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
        """Abstract method to calculate the score for predictions.

        This method should be implemented by subclasses based on their specific evaluation logic.
        """
        raise NotImplementedError

    def activate_scalarized_objective(self) -> None:
        """Activate scalarized objective for multi-objective tasks."""
        raise NotImplementedError

    def evaluate(
        self,
        prompts: Union[Prompt, List[Prompt]],
        predictor: "BasePredictor",
        system_prompts: Optional[Union[str, List[str]]] = None,
        eval_strategy: Optional["EvalStrategy"] = None,
        block_idx: int | list[int] | None = None,
    ) -> EvalResult:
        """Evaluate a set of prompts using a given predictor.

        This method orchestrates subsampling, prediction, caching, and result collection.
        Sequences, token costs, raw scores, and aggregated scores are always returned.

        Args:
            prompts (Union[Prompt, List[Prompt]]): A single prompt or a list of prompts to evaluate. Results will be returned in the same order.
            predictor (BasePredictor): The predictor to evaluate the prompts with.
            system_prompts (Optional[Union[str, List[str]]], optional): Optional system prompts to parse to the predictor.
            eval_strategy (Optional[EvalStrategy], optional): Subsampling strategy to use instead of self.eval_strategy. Defaults to None, which uses self.eval_strategy.
            block_idx (Optional[int | list[int]], optional): Specific block index or indices to evaluate, overriding eval_strategy. Defaults to None.
        """
        prompts_list: List[Prompt] = [prompts] if isinstance(prompts, Prompt) else list(prompts)
        # `or` falls back to the instance default when no override is given.
        eval_strategy = eval_strategy or self.eval_strategy

        if block_idx is not None and isinstance(block_idx, int):
            block_idx = [block_idx]

        xs, ys = self.subsample(eval_strategy=eval_strategy, block_idx=block_idx)
        # Only uncached triples are sent to the predictor; everything else comes from cache.
        (
            prompts_to_evaluate,
            xs_to_evaluate,
            ys_to_evaluate,
            cache_keys,
        ) = self._prepare_batch(prompts_list, xs, ys, eval_strategy=eval_strategy)

        preds, pred_seqs = predictor.predict(
            prompts=prompts_to_evaluate,
            xs=xs_to_evaluate,
            system_prompts=system_prompts,
        )

        scores = self._evaluate(xs_to_evaluate, ys_to_evaluate, preds)
        for i, cache_key in enumerate(cache_keys):
            self.eval_cache[cache_key] = scores[i]
            self.seq_cache[cache_key] = str(pred_seqs[i])

        # Re-read everything from the cache so results also cover previously evaluated datapoints.
        scores, agg_scores, seqs = self._collect_results_from_cache(
            prompts_list,
            xs,
            ys,
        )

        # Record evaluated block for block strategies
        for prompt in prompts_list:
            if block_idx is not None:
                self.prompt_evaluated_blocks.setdefault(prompt, []).extend(block_idx)
            elif eval_strategy in ["sequential_block", "random_block"]:
                # Handle case where self.block_idx is a list
                if isinstance(self.block_idx, list):
                    self.prompt_evaluated_blocks.setdefault(prompt, []).extend(self.block_idx)
                else:
                    self.prompt_evaluated_blocks.setdefault(prompt, []).append(self.block_idx)
            elif eval_strategy == "full":
                self.prompt_evaluated_blocks.setdefault(prompt, []).extend(list(range(self.n_blocks)))

        input_tokens, output_tokens, agg_input_tokens, agg_output_tokens = self._compute_costs(
            prompts_list, xs, ys, predictor
        )

        return EvalResult(
            scores=scores,
            agg_scores=agg_scores,
            sequences=seqs,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            agg_input_tokens=agg_input_tokens,
            agg_output_tokens=agg_output_tokens,
        )

    def pop_datapoints(self, n: Optional[int] = None, frac: Optional[float] = None) -> pd.DataFrame:
        """Pop a number of datapoints from the dataset.

        Args:
            n (int, optional): Number of datapoints to pop. Defaults to None.
            frac (float, optional): Fraction of datapoints to pop. Defaults to None.

        Returns:
            pd.DataFrame: DataFrame containing the popped datapoints.
        """
        assert n is None or frac is None, "Only one of n or frac can be specified."
        if n is not None:
            indices = self.rng.choice(len(self.xs), n, replace=False)
        elif frac is not None:
            indices = self.rng.choice(len(self.xs), int(len(self.xs) * frac), replace=False)
        else:
            raise ValueError("Either n or frac must be specified.")

        popped_xs = [self.xs[i] for i in indices]
        popped_ys = [self.ys[i] for i in indices]
        df_popped = pd.DataFrame({self.x_column: popped_xs, self.y_column: popped_ys})

        self.xs = [x for i, x in enumerate(self.xs) if i not in indices]
        self.ys = [y for i, y in enumerate(self.ys) if i not in indices]
        # NOTE(review): self.df is not updated here, so it drifts out of sync with xs/ys — confirm intended.

        # Update n_blocks and block_idx based on the new dataset size
        self.n_blocks = len(self.xs) // self.n_subsamples if self.n_subsamples > 0 else 1
        self.block_idx = min(self.block_idx, self.n_blocks - 1) if self.n_blocks > 0 else 0

        # Clear cache for popped items (optional, but good practice if memory is a concern)
        keys_to_remove = []
        for key in self.eval_cache:
            if key[1] in popped_xs and key[2] in popped_ys:  # NOTE(review): independent x/y membership — could match a non-popped row whose x and y coincide with different popped rows
                keys_to_remove.append(key)
        for key in keys_to_remove:
            self.eval_cache.pop(key, None)
            self.seq_cache.pop(key, None)

        return df_popped

    def increment_block_idx(self) -> None:
        """Increment the block index for subsampling.

        Raises:
            ValueError: If the eval_strategy does not contain "block".
        """
        if "block" not in self.eval_strategy:
            raise ValueError("Block increment is only valid for block subsampling.")
        assert isinstance(self.block_idx, int), "Block index must be an integer to increment."
        self.block_idx += 1
        if self.n_blocks > 0:  # Ensure n_blocks is not zero to avoid division by zero
            self.block_idx %= self.n_blocks
        else:
            self.block_idx = 0  # If no blocks, reset to 0

    def reset_block_idx(self) -> None:
        """Reset the block index for subsampling.

        Raises:
            ValueError: If the eval_strategy does not contain "block".
        """
        if "block" not in self.eval_strategy:
            raise ValueError("Block reset is only valid for block subsampling.")
        self.block_idx = 0

    def set_block_idx(self, idx: int) -> None:
        """Set the block index (or indices) for block subsampling strategies."""
        if "block" not in self.eval_strategy:
            raise ValueError("Block assignment is only valid for block subsampling.")

        assert isinstance(idx, int), "Block index must be an integer"

        self.block_idx = idx

    def get_evaluated_blocks(self, prompts: Union[Prompt, List[Prompt]]) -> Dict[Prompt, List[int]]:
        """Return a mapping of each prompt to the block indices it has been evaluated on."""
        prompts_list: List[Prompt] = [prompts] if isinstance(prompts, Prompt) else list(prompts)
        return {p: list(self.prompt_evaluated_blocks.get(p, [])) for p in prompts_list}

__init__(df, x_column, y_column=None, task_description=None, n_subsamples=30, eval_strategy='full', seed=42, config=None)

Initialize the BaseTask.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame containing the data.

required
x_column str

Name of the column containing input texts.

required
y_column Optional[str]

Name of the column containing labels/ground truth (if applicable).

None
task_description str

Description of the task.

None
n_subsamples int

Number of subsamples to use for evaluation.

30
eval_strategy Literal

Subsampling strategy ("full", "subsample", "sequential_block", "random_block", "evaluated").

'full'
seed int

Random seed for reproducibility.

42
config ExperimentConfig

Configuration for the task, overriding defaults.

None
Source code in promptolution/tasks/base_task.py
def __init__(
    self,
    df: pd.DataFrame,
    x_column: str,
    y_column: Optional[str] = None,
    task_description: Optional[str] = None,
    n_subsamples: int = 30,
    eval_strategy: "EvalStrategy" = "full",
    seed: int = 42,
    config: Optional["ExperimentConfig"] = None,
) -> None:
    """Initialize the BaseTask.

    Args:
        df (pd.DataFrame): The input DataFrame containing the data.
        x_column (str): Name of the column containing input texts.
        y_column (Optional[str]): Name of the column containing labels/ground truth (if applicable).
        task_description (str): Description of the task.
        n_subsamples (int): Number of subsamples to use for evaluation.
        eval_strategy (Literal): Subsampling strategy ("full", "subsample", "sequential_block", "random_block", "evaluated").
        seed (int): Random seed for reproducibility.
        config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
    """
    # Deduplicate on x so each input text maps to exactly one row; the
    # (prompt, x, y) cache keys below rely on this uniqueness.
    self.df = df.drop_duplicates(subset=[x_column])
    if len(self.df) != len(df):
        logger.warning(
            f"Duplicate entries detected for x_column '{x_column}' - dropped {len(df) - len(self.df)} rows to enforce uniqueness."
        )
    self.x_column: str = x_column
    self.y_column: Optional[str] = y_column
    self.task_type: TaskType | None = None
    self.task_description: Optional[str] = task_description
    self.n_subsamples: int = n_subsamples
    self.eval_strategy: EvalStrategy = eval_strategy
    self.seed: int = seed

    super().__init__()
    # Config values override the attribute defaults assigned above.
    if config is not None:
        config.apply_to(self)

    self.xs: List[str] = self.df[self.x_column].values.astype(str).tolist()
    self.has_y: bool = y_column is not None
    if self.has_y and y_column is not None:
        self.ys: List[str] = self.df[y_column].values.astype(str).tolist()
    else:
        # If no y_column is provided, create a dummy y array
        self.ys = [""] * len(self.xs)

    self.block_idx: int = 0
    # NOTE(review): floor division drops a trailing partial block, so the last
    # len(xs) % n_subsamples datapoints are unreachable via block strategies — confirm intended.
    self.n_blocks: int = len(self.xs) // self.n_subsamples if self.n_subsamples > 0 else 1
    self.rng = np.random.default_rng(seed)

    self.eval_cache: Dict[Tuple[str, str, str], float] = {}  # (prompt, x, y): scores per datapoint
    self.seq_cache: Dict[Tuple[str, str, str], str] = {}  # (prompt, x, y): raw model output per datapoint

    self.prompt_evaluated_blocks: Dict[Prompt, List[int]] = {}  # maps each Prompt to the list of block indices it was evaluated on

activate_scalarized_objective()

Activate scalarized objective for multi-objective tasks.

Source code in promptolution/tasks/base_task.py
def activate_scalarized_objective(self) -> None:
    """Activate scalarized objective for multi-objective tasks.

    The base implementation is a stub; multi-objective subclasses override it.
    """
    raise NotImplementedError

evaluate(prompts, predictor, system_prompts=None, eval_strategy=None, block_idx=None)

Evaluate a set of prompts using a given predictor.

This method orchestrates subsampling, prediction, caching, and result collection. Sequences, token costs, raw scores, and aggregated scores are always returned.

Parameters:

Name Type Description Default
prompts Union[Prompt, List[Prompt]]

A single prompt or a list of prompts to evaluate. Results will be returned in the same order.

required
predictor BasePredictor

The predictor to evaluate the prompts with.

required
system_prompts Optional[Union[str, List[str]]]

Optional system prompts to parse to the predictor.

None
eval_strategy Optional[EvalStrategy]

Subsampling strategy to use instead of self.eval_strategy. Defaults to None, which uses self.eval_strategy.

None
block_idx Optional[int | list[int]]

Specific block index or indices to evaluate, overriding eval_strategy. Defaults to None.

None
Source code in promptolution/tasks/base_task.py
def evaluate(
    self,
    prompts: Union[Prompt, List[Prompt]],
    predictor: "BasePredictor",
    system_prompts: Optional[Union[str, List[str]]] = None,
    eval_strategy: Optional["EvalStrategy"] = None,
    block_idx: int | list[int] | None = None,
) -> EvalResult:
    """Evaluate a set of prompts using a given predictor.

    This method orchestrates subsampling, prediction, caching, and result collection.
    Sequences, token costs, raw scores, and aggregated scores are always returned.

    Args:
        prompts (Union[Prompt, List[Prompt]]): A single prompt or a list of prompts to evaluate. Results will be returned in the same order.
        predictor (BasePredictor): The predictor to evaluate the prompts with.
        system_prompts (Optional[Union[str, List[str]]], optional): Optional system prompts to parse to the predictor.
        eval_strategy (Optional[EvalStrategy], optional): Subsampling strategy to use instead of self.eval_strategy. Defaults to None, which uses self.eval_strategy.
        block_idx (Optional[int | list[int]], optional): Specific block index or indices to evaluate, overriding eval_strategy. Defaults to None.
    """
    prompts_list: List[Prompt] = [prompts] if isinstance(prompts, Prompt) else list(prompts)
    # `or` falls back to the instance default when no override is given.
    eval_strategy = eval_strategy or self.eval_strategy

    if block_idx is not None and isinstance(block_idx, int):
        block_idx = [block_idx]

    xs, ys = self.subsample(eval_strategy=eval_strategy, block_idx=block_idx)
    # Only uncached triples are sent to the predictor; everything else comes from cache.
    (
        prompts_to_evaluate,
        xs_to_evaluate,
        ys_to_evaluate,
        cache_keys,
    ) = self._prepare_batch(prompts_list, xs, ys, eval_strategy=eval_strategy)

    preds, pred_seqs = predictor.predict(
        prompts=prompts_to_evaluate,
        xs=xs_to_evaluate,
        system_prompts=system_prompts,
    )

    scores = self._evaluate(xs_to_evaluate, ys_to_evaluate, preds)
    for i, cache_key in enumerate(cache_keys):
        self.eval_cache[cache_key] = scores[i]
        self.seq_cache[cache_key] = str(pred_seqs[i])

    # Re-read everything from the cache so results also cover previously evaluated datapoints.
    scores, agg_scores, seqs = self._collect_results_from_cache(
        prompts_list,
        xs,
        ys,
    )

    # Record evaluated block for block strategies
    for prompt in prompts_list:
        if block_idx is not None:
            self.prompt_evaluated_blocks.setdefault(prompt, []).extend(block_idx)
        elif eval_strategy in ["sequential_block", "random_block"]:
            # Handle case where self.block_idx is a list
            if isinstance(self.block_idx, list):
                self.prompt_evaluated_blocks.setdefault(prompt, []).extend(self.block_idx)
            else:
                self.prompt_evaluated_blocks.setdefault(prompt, []).append(self.block_idx)
        elif eval_strategy == "full":
            self.prompt_evaluated_blocks.setdefault(prompt, []).extend(list(range(self.n_blocks)))

    input_tokens, output_tokens, agg_input_tokens, agg_output_tokens = self._compute_costs(
        prompts_list, xs, ys, predictor
    )

    return EvalResult(
        scores=scores,
        agg_scores=agg_scores,
        sequences=seqs,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        agg_input_tokens=agg_input_tokens,
        agg_output_tokens=agg_output_tokens,
    )

get_evaluated_blocks(prompts)

Return a mapping of each prompt to the list of block indices it has been evaluated on.

Source code in promptolution/tasks/base_task.py
def get_evaluated_blocks(self, prompts: Union[Prompt, List[Prompt]]) -> Dict[Prompt, List[int]]:
    """Return a mapping of each prompt to the block indices it has been evaluated on."""
    if isinstance(prompts, Prompt):
        prompts = [prompts]
    evaluated: Dict[Prompt, List[int]] = {}
    for prompt in prompts:
        # Copy the stored list so callers cannot mutate internal bookkeeping.
        evaluated[prompt] = list(self.prompt_evaluated_blocks.get(prompt, []))
    return evaluated

increment_block_idx()

Increment the block index for subsampling.

Raises:

Type Description
ValueError

If the eval_strategy does not contain "block".

Source code in promptolution/tasks/base_task.py
def increment_block_idx(self) -> None:
    """Advance the block pointer by one, wrapping around the number of blocks.

    Raises:
        ValueError: If the eval_strategy does not contain "block".
    """
    if "block" not in self.eval_strategy:
        raise ValueError("Block increment is only valid for block subsampling.")
    assert isinstance(self.block_idx, int), "Block index must be an integer to increment."
    next_idx = self.block_idx + 1
    # Wrap around when blocks exist; collapse to 0 when there are none.
    self.block_idx = next_idx % self.n_blocks if self.n_blocks > 0 else 0

pop_datapoints(n=None, frac=None)

Pop a number of datapoints from the dataset.

Parameters:

Name Type Description Default
n int

Number of datapoints to pop. Defaults to None.

None
frac float

Fraction of datapoints to pop. Defaults to None.

None

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the popped datapoints.

Source code in promptolution/tasks/base_task.py
def pop_datapoints(self, n: Optional[int] = None, frac: Optional[float] = None) -> pd.DataFrame:
    """Pop a number of datapoints from the dataset.

    Exactly one of ``n`` or ``frac`` may be given. The selected datapoints are
    removed from ``self.xs``/``self.ys``, their cache entries are dropped, and
    they are returned as a DataFrame.

    Args:
        n (int, optional): Number of datapoints to pop. Defaults to None.
        frac (float, optional): Fraction of datapoints to pop. Defaults to None.

    Returns:
        pd.DataFrame: DataFrame containing the popped datapoints.

    Raises:
        ValueError: If neither n nor frac is specified.
    """
    assert n is None or frac is None, "Only one of n or frac can be specified."
    if n is not None:
        n_pop = n
    elif frac is not None:
        n_pop = int(len(self.xs) * frac)
    else:
        raise ValueError("Either n or frac must be specified.")
    indices = self.rng.choice(len(self.xs), n_pop, replace=False)

    popped_xs = [self.xs[i] for i in indices]
    popped_ys = [self.ys[i] for i in indices]
    df_popped = pd.DataFrame({self.x_column: popped_xs, self.y_column: popped_ys})

    # Use a set for O(1) membership tests instead of scanning the index array per element.
    index_set = {int(i) for i in indices}
    self.xs = [x for i, x in enumerate(self.xs) if i not in index_set]
    self.ys = [y for i, y in enumerate(self.ys) if i not in index_set]
    # NOTE(review): self.df is not updated here, so it drifts out of sync with xs/ys — confirm intended.

    # Update n_blocks and block_idx based on the new dataset size
    self.n_blocks = len(self.xs) // self.n_subsamples if self.n_subsamples > 0 else 1
    self.block_idx = min(self.block_idx, self.n_blocks - 1) if self.n_blocks > 0 else 0

    # Drop cache entries for exactly the popped (x, y) pairs. The previous
    # independent `x in popped_xs and y in popped_ys` test could match a
    # non-popped datapoint whose x and y each coincide with *different*
    # popped rows, and was O(n) per key.
    popped_pairs = set(zip(popped_xs, popped_ys))
    keys_to_remove = [key for key in self.eval_cache if (key[1], key[2]) in popped_pairs]
    for key in keys_to_remove:
        self.eval_cache.pop(key, None)
        self.seq_cache.pop(key, None)

    return df_popped

reset_block_idx()

Reset the block index for subsampling.

Raises:

Type Description
ValueError

If the eval_strategy does not contain "block".

Source code in promptolution/tasks/base_task.py
def reset_block_idx(self) -> None:
    """Rewind the block pointer to the first block.

    Raises:
        ValueError: If the eval_strategy does not contain "block".
    """
    if "block" in self.eval_strategy:
        self.block_idx = 0
    else:
        raise ValueError("Block reset is only valid for block subsampling.")

set_block_idx(idx)

Set the block index (or indices) for block subsampling strategies.

Source code in promptolution/tasks/base_task.py
def set_block_idx(self, idx: int) -> None:
    """Set the block index (or indices) for block subsampling strategies."""
    if "block" not in self.eval_strategy:
        raise ValueError("Block assignment is only valid for block subsampling.")

    assert isinstance(idx, int), "Block index must be an integer"

    self.block_idx = idx

subsample(eval_strategy=None, block_idx=None)

Subsample the dataset based on the specified parameters.

Parameters:

Name Type Description Default
eval_strategy EvalStrategy

Subsampling strategy to use instead of self.eval_strategy. Defaults to None.

None
block_idx List[int] | None

Specific block index or indices to evaluate, overriding eval_strategy. Defaults to None.

None

Returns:

Type Description
Tuple[List[str], List[str]]

Tuple[List[str], List[str]]: Subsampled input data and labels.

Source code in promptolution/tasks/base_task.py
def subsample(
    self, eval_strategy: Optional["EvalStrategy"] = None, block_idx: List[int] | None = None
) -> Tuple[List[str], List[str]]:
    """Subsample the dataset based on the specified parameters.

    Args:
        eval_strategy (EvalStrategy, optional): Subsampling strategy to use instead of self.eval_strategy. Defaults to None.
        block_idx (List[int] | None, optional): Specific block index or indices to evaluate, overriding eval_strategy. Defaults to None.

    Returns:
        Tuple[List[str], List[str]]: Subsampled input data and labels.
    """
    if block_idx is not None:
        indices: List[int] = []
        for idx in block_idx:
            start_idx = idx * self.n_subsamples
            end_idx = min((idx + 1) * self.n_subsamples, len(self.xs))
            indices.extend(range(start_idx, end_idx))

        return [self.xs[i] for i in indices], [self.ys[i] for i in indices]

    if eval_strategy is None:
        eval_strategy = self.eval_strategy

    if eval_strategy in ["full", "evaluated"]:
        return self.xs, self.ys
    elif eval_strategy == "subsample":
        indices = self.rng.choice(len(self.xs), min(self.n_subsamples, len(self.xs)), replace=False)
        return [self.xs[i] for i in indices], [self.ys[i] for i in indices]
    elif eval_strategy == "random_block":
        block_id = self.rng.integers(0, self.n_blocks)
        start_idx = block_id * self.n_subsamples
        end_idx = min((block_id + 1) * self.n_subsamples, len(self.xs))
        indices = np.arange(start_idx, end_idx)
        return [self.xs[i] for i in indices], [self.ys[i] for i in indices]
    elif eval_strategy == "sequential_block":
        # Handle case where self.block_idx is a list
        if isinstance(self.block_idx, list):
            indices_list: List[int] = []
            for idx in self.block_idx:
                start_idx = idx * self.n_subsamples
                end_idx = min((idx + 1) * self.n_subsamples, len(self.xs))
                indices_list.extend(range(start_idx, end_idx))
            return [self.xs[i] for i in indices_list], [self.ys[i] for i in indices_list]
        else:
            start_idx = self.block_idx * self.n_subsamples
            end_idx = min((self.block_idx + 1) * self.n_subsamples, len(self.xs))
            indices = np.arange(start_idx, end_idx)
            return [self.xs[i] for i in indices], [self.ys[i] for i in indices]
    else:
        raise ValueError(f"Unknown subsampling strategy: '{eval_strategy}'")

EvalResult dataclass

Evaluation outputs including scores, sequences, and costs.

Source code in promptolution/tasks/base_task.py
@dataclass
class EvalResult:
    """Evaluation outputs including scores, sequences, and costs.

    All arrays are aligned on the first (prompt) axis; the ``agg_*`` fields
    are the per-prompt means taken over the datapoint axis.
    """

    scores: np.ndarray  # shape: (n_prompts, n_datapoints)
    agg_scores: np.ndarray  # shape: (n_prompts,) - mean over datapoints
    sequences: np.ndarray  # shape: (n_prompts, n_datapoints)
    input_tokens: np.ndarray  # shape: (n_prompts, n_datapoints)
    output_tokens: np.ndarray  # shape: (n_prompts, n_datapoints)
    agg_input_tokens: np.ndarray  # shape: (n_prompts,) - mean over datapoints
    agg_output_tokens: np.ndarray  # shape: (n_prompts,) - mean over datapoints

classification_tasks

Module for classification tasks.

ClassificationTask

Bases: BaseTask

A class representing a classification task in the promptolution library.

This class handles the loading and management of classification datasets, as well as the evaluation of predictors on these datasets.

Source code in promptolution/tasks/classification_tasks.py
class ClassificationTask(BaseTask):
    """A class representing a classification task in the promptolution library.

    This class handles the loading and management of classification datasets,
    as well as the evaluation of predictors on these datasets.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        task_description: Optional[str] = None,
        x_column: str = "x",
        y_column: str = "y",
        n_subsamples: int = 30,
        eval_strategy: Literal["full", "evaluated", "subsample", "sequential_block", "random_block"] = "full",
        seed: int = 42,
        metric: Callable[[Any, Any], float] = accuracy_score,
        config: Optional["ExperimentConfig"] = None,
    ) -> None:
        """Initialize the ClassificationTask from a pandas DataFrame.

        Args:
            df (pd.DataFrame): Input DataFrame containing the data
            task_description (str): Description of the task
            x_column (str, optional): Name of the column containing input texts. Defaults to "x".
            y_column (str, optional): Name of the column containing labels. Defaults to "y".
            n_subsamples (int, optional): Number of subsamples to use per evaluation. Defaults to 30.
            eval_strategy (str, optional): Subsampling strategy to use. Options:
                - "full": Uses the entire dataset for evaluation.
                - "evaluated": Uses only previously evaluated datapoints from the cache.
                - "subsample": Randomly selects n_subsamples datapoints without replacement.
                - "sequential_block": Uses a block of block_size consecutive datapoints, advancing through blocks sequentially.
                - "random_block": Randomly selects a block of block_size consecutive datapoints.
                Defaults to "full".
            seed (int, optional): Random seed for reproducibility. Defaults to 42.
            metric (Callable, optional): Metric to use for evaluation. Defaults to accuracy_score.
            config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
        """
        self.metric = metric
        super().__init__(
            df=df,
            x_column=x_column,
            y_column=y_column,
            task_description=task_description,
            n_subsamples=n_subsamples,
            eval_strategy=eval_strategy,
            seed=seed,
            config=config,
        )
        self.task_type = "classification"
        # NOTE(review): assumes the y column holds strings — .str.lower()
        # raises on non-string dtypes; confirm upstream.
        self.ys: List[str] = (
            df[self.y_column].str.lower().values.tolist()
        )  # Ensure y values are lowercase for consistent comparison
        self.classes = np.unique(self.ys)

    def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
        """Score each prediction against its label using self.metric.

        Args:
            xs (List[str]): Input texts (not passed to the metric; kept for interface parity).
            ys (List[str]): Ground-truth labels.
            preds (List[str]): Predicted labels, aligned with ys.

        Returns:
            np.ndarray: Per-datapoint float scores with shape (len(preds),).
        """
        # The metric is applied per datapoint (singleton lists) so the result
        # is a per-datapoint score vector rather than a single aggregate.
        scores = [self.metric([y], [pred]) for pred, y in zip(preds, ys)]
        return np.asarray(scores, dtype=float)

__init__(df, task_description=None, x_column='x', y_column='y', n_subsamples=30, eval_strategy='full', seed=42, metric=accuracy_score, config=None)

Initialize the ClassificationTask from a pandas DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame containing the data

required
task_description str

Description of the task

None
x_column str

Name of the column containing input texts. Defaults to "x".

'x'
y_column str

Name of the column containing labels. Defaults to "y".

'y'
n_subsamples int

Number of subsamples to use. No subsampling if None. Defaults to 30.

30
eval_strategy str

Subsampling strategy to use. Options: - "full": Uses the entire dataset for evaluation. - "evaluated": Uses only previously evaluated datapoints from the cache. - "subsample": Randomly selects n_subsamples datapoints without replacement. - "sequential_block": Uses a block of block_size consecutive datapoints, advancing through blocks sequentially. - "random_block": Randomly selects a block of block_size consecutive datapoints. Defaults to "full".

'full'
seed int

Random seed for reproducibility. Defaults to 42.

42
metric Callable

Metric to use for evaluation. Defaults to accuracy_score.

accuracy_score
config ExperimentConfig

Configuration for the task, overriding defaults.

None
Source code in promptolution/tasks/classification_tasks.py
def __init__(
    self,
    df: pd.DataFrame,
    task_description: Optional[str] = None,
    x_column: str = "x",
    y_column: str = "y",
    n_subsamples: int = 30,
    eval_strategy: Literal["full", "evaluated", "subsample", "sequential_block", "random_block"] = "full",
    seed: int = 42,
    metric: Callable[[Any, Any], float] = accuracy_score,
    config: Optional["ExperimentConfig"] = None,
) -> None:
    """Initialize the ClassificationTask from a pandas DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame containing the data
        task_description (str): Description of the task
        x_column (str, optional): Name of the column containing input texts. Defaults to "x".
        y_column (str, optional): Name of the column containing labels. Defaults to "y".
        n_subsamples (int, optional): Number of subsamples to use per evaluation. Defaults to 30.
        eval_strategy (str, optional): Subsampling strategy to use. Options:
            - "full": Uses the entire dataset for evaluation.
            - "evaluated": Uses only previously evaluated datapoints from the cache.
            - "subsample": Randomly selects n_subsamples datapoints without replacement.
            - "sequential_block": Uses a block of block_size consecutive datapoints, advancing through blocks sequentially.
            - "random_block": Randomly selects a block of block_size consecutive datapoints.
            Defaults to "full".
        seed (int, optional): Random seed for reproducibility. Defaults to 42.
        metric (Callable, optional): Metric to use for evaluation. Defaults to accuracy_score.
        config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
    """
    self.metric = metric
    super().__init__(
        df=df,
        x_column=x_column,
        y_column=y_column,
        task_description=task_description,
        n_subsamples=n_subsamples,
        eval_strategy=eval_strategy,
        seed=seed,
        config=config,
    )
    self.task_type = "classification"
    # NOTE(review): assumes the y column holds strings — .str.lower()
    # raises on non-string dtypes; confirm upstream.
    self.ys: List[str] = (
        df[self.y_column].str.lower().values.tolist()
    )  # Ensure y values are lowercase for consistent comparison
    self.classes = np.unique(self.ys)

judge_tasks

Module for judge tasks.

JudgeTask

Bases: BaseTask

Task that evaluates a predictor using an LLM-as-a-judge, optionally accepting a ground truth.

Source code in promptolution/tasks/judge_tasks.py
class JudgeTask(BaseTask):
    """Task that evaluates a predictor using an LLM-as-a-judge, optionally accepting a ground truth."""

    def __init__(
        self,
        df: pd.DataFrame,
        judge_llm: "BaseLLM",
        x_column: str = "x",
        y_column: Optional[str] = None,
        task_description: Optional[str] = None,
        n_subsamples: int = 30,
        eval_strategy: "EvalStrategy" = "full",
        seed: int = 42,
        judge_prompt: Optional[str] = None,
        min_score: float = -5.0,
        max_score: float = 5.0,
        config: Optional["ExperimentConfig"] = None,
    ):
        """Initialize the JudgeTask.

        Args:
            df (pd.DataFrame): The input DataFrame containing the data.
            judge_llm (BaseLLM): The LLM judging the predictions.
            x_column (str): Name of the column containing input texts.
            y_column (Optional[str]): Name of the column containing labels/ground truth (if applicable).
            task_description (Optional[str]): Description of the task, parsed to the Judge-LLM and Meta-LLM.
            n_subsamples (int): Number of subsamples to use for evaluation.
            eval_strategy (EvalStrategy): Subsampling strategy to use for evaluation.
            seed (int): Random seed for reproducibility.
            judge_prompt (Optional[str]): Custom prompt for the judge. Note: The score of the Judge will be extracted inside <final_score> tags.
            min_score (float): Minimum score for evaluation.
            max_score (float): Maximum score for evaluation.
            config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
        """
        # Choose a default judge prompt depending on whether a ground-truth
        # column is available.
        if judge_prompt is None:
            judge_prompt = JUDGE_PROMPT_WITH_GROUND_TRUTH if y_column else JUDGE_PROMPT_WITHOUT_GROUND_TRUTH
        self.judge_prompt = judge_prompt
        self.min_score = min_score
        self.max_score = max_score

        super().__init__(
            df=df,
            x_column=x_column,
            y_column=y_column,
            task_description=task_description,
            n_subsamples=n_subsamples,
            eval_strategy=eval_strategy,
            seed=seed,
            config=config,
        )
        self.judge_llm = judge_llm
        self.task_type = "judge"

    def _construct_judge_prompt(self, x: str, pred: str, y: Optional[str] = None) -> str:
        """Construct the judge prompt based on whether ground truth is available.

        Args:
            x (str): Input text shown to the judge.
            pred (str): Prediction to be scored.
            y (Optional[str]): Ground truth, substituted into the prompt when given.

        Returns:
            str: The fully substituted judge prompt.
        """
        if y is not None:
            prompt = self.judge_prompt.replace("{ground_truth}", str(y))
        else:
            prompt = self.judge_prompt

        task_description = self.task_description or ""
        prompt = prompt.replace("{task}", task_description).replace("{input}", x).replace("{prediction}", pred)
        return prompt

    def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
        """Calculate the score for a single prediction using the LLM judge.

        Returns:
            np.ndarray: Scores normalized to [0, 1], shape (len(preds),);
            unparseable judge outputs score 0.0.
        """
        prompts: List[str] = []
        for x, y, pred in zip(xs, ys, preds):
            judge_prompt = self._construct_judge_prompt(x, pred, y)
            prompts.append(judge_prompt)
        judge_responses = self.judge_llm.get_response(prompts)
        scores_str = extract_from_tag(judge_responses, "<final_score>", "</final_score>")
        scores = []
        for score_str in scores_str:
            try:
                # only numeric chars, - or . are allowed
                score_str = "".join(filter(lambda c: c.isdigit() or c in "-.", score_str))
                score = float(score_str)
                # normalize from [min_score, max_score] to [0, 1]
                score = (score - self.min_score) / (self.max_score - self.min_score)
                score = max(0.0, min(1.0, score))
            except ValueError:
                logger.warning(f"Failed to parse score '{score_str}' as float. Defaulting to a score 0.0.")
                score = 0.0

            scores.append(score)

        return np.asarray(scores, dtype=float)

__init__(df, judge_llm, x_column='x', y_column=None, task_description=None, n_subsamples=30, eval_strategy='full', seed=42, judge_prompt=None, min_score=-5.0, max_score=5.0, config=None)

Initialize the JudgeTask.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame containing the data.

required
judge_llm BaseLLM

The LLM judging the predictions.

required
x_column str

Name of the column containing input texts.

'x'
y_column Optional[str]

Name of the column containing labels/ground truth (if applicable).

None
task_description Optional[str]

Description of the task, parsed to the Judge-LLM and Meta-LLM.

None
n_subsamples int

Number of subsamples to use for evaluation.

30
eval_strategy EvalStrategy

Subsampling strategy to use for evaluation.

'full'
seed int

Random seed for reproducibility.

42
judge_prompt Optional[str]

Custom prompt for the judge. Note: The score of the Judge will be extracted inside `<final_score>` tags.

None
min_score float

Minimum score for evaluation.

-5.0
max_score float

Maximum score for evaluation.

5.0
config ExperimentConfig

Configuration for the task, overriding defaults.

None
Source code in promptolution/tasks/judge_tasks.py
def __init__(
    self,
    df: pd.DataFrame,
    judge_llm: "BaseLLM",
    x_column: str = "x",
    y_column: Optional[str] = None,
    task_description: Optional[str] = None,
    n_subsamples: int = 30,
    eval_strategy: "EvalStrategy" = "full",
    seed: int = 42,
    judge_prompt: Optional[str] = None,
    min_score: float = -5.0,
    max_score: float = 5.0,
    config: Optional["ExperimentConfig"] = None,
):
    """Initialize the JudgeTask.

    Args:
        df (pd.DataFrame): The input DataFrame containing the data.
        judge_llm (BaseLLM): The LLM judging the predictions.
        x_column (str): Name of the column containing input texts.
        y_column (Optional[str]): Name of the column containing labels/ground truth (if applicable).
        task_description (Optional[str]): Description of the task, parsed to the Judge-LLM and Meta-LLM.
        n_subsamples (int): Number of subsamples to use for evaluation.
        eval_strategy (EvalStrategy): Subsampling strategy to use for evaluation.
        seed (int): Random seed for reproducibility.
        judge_prompt (Optional[str]): Custom prompt for the judge. Note: The score of the Judge will be extracted inside <final_score> tags.
        min_score (float): Minimum score for evaluation.
        max_score (float): Maximum score for evaluation.
        config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
    """
    # Choose a default judge prompt depending on whether a ground-truth
    # column is available.
    if judge_prompt is None:
        judge_prompt = JUDGE_PROMPT_WITH_GROUND_TRUTH if y_column else JUDGE_PROMPT_WITHOUT_GROUND_TRUTH
    self.judge_prompt = judge_prompt
    self.min_score = min_score
    self.max_score = max_score

    super().__init__(
        df=df,
        x_column=x_column,
        y_column=y_column,
        task_description=task_description,
        n_subsamples=n_subsamples,
        eval_strategy=eval_strategy,
        seed=seed,
        config=config,
    )
    self.judge_llm = judge_llm
    self.task_type = "judge"

multi_objective_task

Multi-objective task wrapper that evaluates prompts across multiple tasks.

MultiObjectiveEvalResult dataclass

Container for per-task evaluation outputs in multi-objective runs.

Source code in promptolution/tasks/multi_objective_task.py
@dataclass
class MultiObjectiveEvalResult:
    """Container for per-task evaluation outputs in multi-objective runs.

    Unlike EvalResult, ``scores`` and ``agg_scores`` hold one array per
    underlying task; sequences and token counts are shared across tasks.
    """

    scores: List[np.ndarray]  # one (n_prompts, n_datapoints) array per task
    agg_scores: List[np.ndarray]  # one (n_prompts,) array per task
    sequences: np.ndarray  # shared across tasks
    input_tokens: np.ndarray  # shared across tasks
    output_tokens: np.ndarray  # shared across tasks
    agg_input_tokens: np.ndarray  # shared across tasks
    agg_output_tokens: np.ndarray  # shared across tasks

MultiObjectiveTask

Bases: BaseTask

A task that aggregates evaluations across multiple underlying tasks.

Source code in promptolution/tasks/multi_objective_task.py
class MultiObjectiveTask(BaseTask):
    """A task that aggregates evaluations across multiple underlying tasks.

    Predictions are computed once per (prompt, x, y) triple and shared across
    all tasks; each task then scores the shared predictions with its own
    _evaluate, yielding one score array per objective.
    """

    def __init__(
        self,
        tasks: List[BaseTask],
        eval_strategy: Optional[EvalStrategy] = None,
    ) -> None:
        """Initialize with a list of tasks sharing subsampling and seed settings.

        Args:
            tasks (List[BaseTask]): Underlying tasks to aggregate. All must share
                n_subsamples, seed, and eval_strategy.
            eval_strategy (EvalStrategy, optional): Strategy override; falls back
                to the first task's strategy. Defaults to None.

        Raises:
            ValueError: If tasks is empty.
        """
        if not tasks:
            raise ValueError("tasks must be a non-empty list")

        primary = tasks[0]
        # NOTE(review): assert-based validation is stripped under `python -O`;
        # consider raising ValueError here instead.
        for t in tasks[1:]:
            assert t.n_subsamples == primary.n_subsamples, "All tasks must share n_subsamples"
            assert t.seed == primary.seed, "All tasks must share seed"
            assert t.eval_strategy == primary.eval_strategy, "All tasks must share eval_strategy"

        combined_description = "This task is a combination of the following tasks:\n" + "\n".join(
            [f"Task: {t.task_description}" for t in tasks if t.task_description]
        )

        super().__init__(
            df=primary.df,
            x_column=primary.x_column,
            y_column=primary.y_column,
            task_description=combined_description,
            n_subsamples=primary.n_subsamples,
            eval_strategy=eval_strategy or primary.eval_strategy,
            seed=primary.seed,
            config=None,
        )
        self.task_type = "multi"
        self.tasks = tasks
        # When True, evaluate() collapses per-task scores into one EvalResult.
        self._scalarized_objective: bool = False

    def activate_scalarized_objective(self) -> None:
        """Force single-objective behavior by equally averaging task scores."""
        self._scalarized_objective = True

    def evaluate(  # type: ignore
        self,
        prompts: Prompt | List[Prompt],
        predictor,
        system_prompts: Optional[str | List[str]] = None,
        eval_strategy: Optional[EvalStrategy] = None,
    ) -> MultiObjectiveEvalResult | EvalResult:
        """Run prediction once, then score via each task's _evaluate.

        Args:
            prompts (Prompt | List[Prompt]): Prompt(s) to evaluate.
            predictor: Object whose predict() maps (prompts, xs) to predictions and sequences.
            system_prompts (str | List[str], optional): System prompt(s) forwarded to the predictor.
            eval_strategy (EvalStrategy, optional): Strategy override for this call.

        Returns:
            MultiObjectiveEvalResult | EvalResult: Per-task results, or a single
            averaged EvalResult when the scalarized objective is active.
        """
        prompts_list: List[Prompt] = [prompts] if isinstance(prompts, Prompt) else list(prompts)
        strategy = eval_strategy or self.eval_strategy

        # Keep block alignment across tasks so block-based strategies stay in sync.
        for task in self.tasks:
            task.block_idx = self.block_idx

        xs, ys = self.subsample(eval_strategy=strategy)

        # Collect all uncached prompt/x/y triples across tasks to predict only once.
        prompts_to_evaluate: List[str] = []
        xs_to_evaluate: List[str] = []
        ys_to_evaluate: List[str] = []
        key_to_index: Dict[Tuple[str, str, str], int] = {}
        cache_keys: List[Tuple[str, str, str]] = []

        for task in self.tasks:
            t_prompts, t_xs, t_ys, t_keys = task._prepare_batch(prompts_list, xs, ys, eval_strategy=strategy)
            for prompt_str, x_val, y_val, key in zip(t_prompts, t_xs, t_ys, t_keys):
                if key in key_to_index:
                    continue
                key_to_index[key] = len(prompts_to_evaluate)
                prompts_to_evaluate.append(prompt_str)
                xs_to_evaluate.append(x_val)
                ys_to_evaluate.append(y_val)
                cache_keys.append(key)

        preds: List[str] = []
        pred_seqs: List[str] = []
        if prompts_to_evaluate:
            preds, pred_seqs = predictor.predict(
                prompts=prompts_to_evaluate,
                xs=xs_to_evaluate,
                system_prompts=system_prompts,
            )

        # Map predictions back to each task and populate caches via _evaluate.
        key_to_pred: Dict[Tuple[str, str, str], Tuple[str, str]] = {
            key: (preds[idx], pred_seqs[idx]) for key, idx in key_to_index.items()
        }

        per_task_results: List[EvalResult] = []
        for task in self.tasks:
            if cache_keys:
                xs_eval = [k[1] for k in cache_keys]
                ys_eval = [k[2] for k in cache_keys]
                preds_eval = [key_to_pred[k][0] for k in cache_keys]
                scores = task._evaluate(xs_eval, ys_eval, preds_eval)
                for score, cache_key in zip(scores, cache_keys):
                    task.eval_cache[cache_key] = score
                    task.seq_cache[cache_key] = key_to_pred[cache_key][1]

            scores_array, agg_scores, seqs = task._collect_results_from_cache(prompts_list, xs, ys)
            input_tokens, output_tokens, agg_input_tokens, agg_output_tokens = task._compute_costs(
                prompts_list, xs, ys, predictor
            )

            per_task_results.append(
                EvalResult(
                    scores=scores_array,
                    agg_scores=agg_scores,
                    sequences=seqs,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    agg_input_tokens=agg_input_tokens,
                    agg_output_tokens=agg_output_tokens,
                )
            )

        stacked_scores = [r.scores for r in per_task_results]
        stacked_agg_scores = [r.agg_scores for r in per_task_results]

        # Record evaluated blocks for this evaluation (mirroring BaseTask behavior)
        for prompt in prompts_list:
            # Use self.block_idx (the MultiObjectiveTask's block_idx) if in a block strategy
            if strategy in ["sequential_block", "random_block"]:
                if isinstance(self.block_idx, list):
                    self.prompt_evaluated_blocks.setdefault(prompt, []).extend(self.block_idx)
                else:
                    self.prompt_evaluated_blocks.setdefault(prompt, []).append(self.block_idx)
            elif strategy == "full":
                self.prompt_evaluated_blocks.setdefault(prompt, []).extend(list(range(self.n_blocks)))

        # Use first task's result for sequences and token counts (they're all the same across tasks)
        first_result = per_task_results[0]

        if self._scalarized_objective:
            return EvalResult(
                scores=np.mean(stacked_scores, axis=0),
                agg_scores=np.mean(stacked_agg_scores, axis=0),
                sequences=first_result.sequences,
                input_tokens=first_result.input_tokens,
                output_tokens=first_result.output_tokens,
                agg_input_tokens=first_result.agg_input_tokens,
                agg_output_tokens=first_result.agg_output_tokens,
            )

        return MultiObjectiveEvalResult(
            scores=stacked_scores,
            agg_scores=stacked_agg_scores,
            sequences=first_result.sequences,
            input_tokens=first_result.input_tokens,
            output_tokens=first_result.output_tokens,
            agg_input_tokens=first_result.agg_input_tokens,
            agg_output_tokens=first_result.agg_output_tokens,
        )

    def _evaluate(self, xs, ys, preds):  # pragma: no cover
        """Not used: evaluation is implemented directly in evaluate() above."""
        raise NotImplementedError("MultiObjectiveTask overrides evaluate directly")

__init__(tasks, eval_strategy=None)

Initialize with a list of tasks sharing subsampling and seed settings.

Source code in promptolution/tasks/multi_objective_task.py
def __init__(
    self,
    tasks: List[BaseTask],
    eval_strategy: Optional[EvalStrategy] = None,
) -> None:
    """Initialize with a list of tasks sharing subsampling and seed settings.

    Args:
        tasks (List[BaseTask]): Underlying tasks to aggregate. All must share
            n_subsamples, seed, and eval_strategy.
        eval_strategy (EvalStrategy, optional): Strategy override; falls back
            to the first task's strategy. Defaults to None.

    Raises:
        ValueError: If tasks is empty, or if the tasks disagree on
            n_subsamples, seed, or eval_strategy.
    """
    if not tasks:
        raise ValueError("tasks must be a non-empty list")

    primary = tasks[0]
    # Validate with explicit exceptions rather than `assert`, which is
    # silently stripped when Python runs with -O.
    for t in tasks[1:]:
        if t.n_subsamples != primary.n_subsamples:
            raise ValueError("All tasks must share n_subsamples")
        if t.seed != primary.seed:
            raise ValueError("All tasks must share seed")
        if t.eval_strategy != primary.eval_strategy:
            raise ValueError("All tasks must share eval_strategy")

    combined_description = "This task is a combination of the following tasks:\n" + "\n".join(
        [f"Task: {t.task_description}" for t in tasks if t.task_description]
    )

    super().__init__(
        df=primary.df,
        x_column=primary.x_column,
        y_column=primary.y_column,
        task_description=combined_description,
        n_subsamples=primary.n_subsamples,
        eval_strategy=eval_strategy or primary.eval_strategy,
        seed=primary.seed,
        config=None,
    )
    self.task_type = "multi"
    self.tasks = tasks
    # When True, evaluate() collapses per-task scores into one EvalResult.
    self._scalarized_objective: bool = False

activate_scalarized_objective()

Force single-objective behavior by equally averaging task scores.

Source code in promptolution/tasks/multi_objective_task.py
def activate_scalarized_objective(self) -> None:
    """Force single-objective behavior by equally averaging task scores.

    After this call, evaluate() returns a single EvalResult whose scores are
    the unweighted mean over the per-task scores, instead of a
    MultiObjectiveEvalResult.
    """
    self._scalarized_objective = True

evaluate(prompts, predictor, system_prompts=None, eval_strategy=None)

Run prediction once, then score via each task's _evaluate.

Source code in promptolution/tasks/multi_objective_task.py
def evaluate(  # type: ignore
    self,
    prompts: Prompt | List[Prompt],
    predictor,
    system_prompts: Optional[str | List[str]] = None,
    eval_strategy: Optional[EvalStrategy] = None,
) -> MultiObjectiveEvalResult | EvalResult:
    """Run prediction once, then score via each task's _evaluate.

    Args:
        prompts (Prompt | List[Prompt]): Prompt(s) to evaluate.
        predictor: Object whose predict() maps (prompts, xs) to predictions and sequences.
        system_prompts (str | List[str], optional): System prompt(s) forwarded to the predictor.
        eval_strategy (EvalStrategy, optional): Strategy override for this call.

    Returns:
        MultiObjectiveEvalResult | EvalResult: Per-task results, or a single
        averaged EvalResult when the scalarized objective is active.
    """
    prompts_list: List[Prompt] = [prompts] if isinstance(prompts, Prompt) else list(prompts)
    strategy = eval_strategy or self.eval_strategy

    # Keep block alignment across tasks so block-based strategies stay in sync.
    for task in self.tasks:
        task.block_idx = self.block_idx

    xs, ys = self.subsample(eval_strategy=strategy)

    # Collect all uncached prompt/x/y triples across tasks to predict only once.
    prompts_to_evaluate: List[str] = []
    xs_to_evaluate: List[str] = []
    ys_to_evaluate: List[str] = []
    key_to_index: Dict[Tuple[str, str, str], int] = {}
    cache_keys: List[Tuple[str, str, str]] = []

    for task in self.tasks:
        t_prompts, t_xs, t_ys, t_keys = task._prepare_batch(prompts_list, xs, ys, eval_strategy=strategy)
        for prompt_str, x_val, y_val, key in zip(t_prompts, t_xs, t_ys, t_keys):
            if key in key_to_index:
                continue
            key_to_index[key] = len(prompts_to_evaluate)
            prompts_to_evaluate.append(prompt_str)
            xs_to_evaluate.append(x_val)
            ys_to_evaluate.append(y_val)
            cache_keys.append(key)

    preds: List[str] = []
    pred_seqs: List[str] = []
    if prompts_to_evaluate:
        preds, pred_seqs = predictor.predict(
            prompts=prompts_to_evaluate,
            xs=xs_to_evaluate,
            system_prompts=system_prompts,
        )

    # Map predictions back to each task and populate caches via _evaluate.
    key_to_pred: Dict[Tuple[str, str, str], Tuple[str, str]] = {
        key: (preds[idx], pred_seqs[idx]) for key, idx in key_to_index.items()
    }

    per_task_results: List[EvalResult] = []
    for task in self.tasks:
        if cache_keys:
            xs_eval = [k[1] for k in cache_keys]
            ys_eval = [k[2] for k in cache_keys]
            preds_eval = [key_to_pred[k][0] for k in cache_keys]
            scores = task._evaluate(xs_eval, ys_eval, preds_eval)
            for score, cache_key in zip(scores, cache_keys):
                task.eval_cache[cache_key] = score
                task.seq_cache[cache_key] = key_to_pred[cache_key][1]

        scores_array, agg_scores, seqs = task._collect_results_from_cache(prompts_list, xs, ys)
        input_tokens, output_tokens, agg_input_tokens, agg_output_tokens = task._compute_costs(
            prompts_list, xs, ys, predictor
        )

        per_task_results.append(
            EvalResult(
                scores=scores_array,
                agg_scores=agg_scores,
                sequences=seqs,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                agg_input_tokens=agg_input_tokens,
                agg_output_tokens=agg_output_tokens,
            )
        )

    stacked_scores = [r.scores for r in per_task_results]
    stacked_agg_scores = [r.agg_scores for r in per_task_results]

    # Record evaluated blocks for this evaluation (mirroring BaseTask behavior)
    for prompt in prompts_list:
        # Use self.block_idx (the MultiObjectiveTask's block_idx) if in a block strategy
        if strategy in ["sequential_block", "random_block"]:
            if isinstance(self.block_idx, list):
                self.prompt_evaluated_blocks.setdefault(prompt, []).extend(self.block_idx)
            else:
                self.prompt_evaluated_blocks.setdefault(prompt, []).append(self.block_idx)
        elif strategy == "full":
            self.prompt_evaluated_blocks.setdefault(prompt, []).extend(list(range(self.n_blocks)))

    # Use first task's result for sequences and token counts (they're all the same across tasks)
    first_result = per_task_results[0]

    if self._scalarized_objective:
        return EvalResult(
            scores=np.mean(stacked_scores, axis=0),
            agg_scores=np.mean(stacked_agg_scores, axis=0),
            sequences=first_result.sequences,
            input_tokens=first_result.input_tokens,
            output_tokens=first_result.output_tokens,
            agg_input_tokens=first_result.agg_input_tokens,
            agg_output_tokens=first_result.agg_output_tokens,
        )

    return MultiObjectiveEvalResult(
        scores=stacked_scores,
        agg_scores=stacked_agg_scores,
        sequences=first_result.sequences,
        input_tokens=first_result.input_tokens,
        output_tokens=first_result.output_tokens,
        agg_input_tokens=first_result.agg_input_tokens,
        agg_output_tokens=first_result.agg_output_tokens,
    )

reward_tasks

Module for Reward tasks.

RewardTask

Bases: BaseTask

A task that evaluates a predictor using a reward function.

This task takes a DataFrame, a column name for input data, and a reward function. The reward function takes in a prediction as input and returns a scalar reward.

Source code in promptolution/tasks/reward_tasks.py
class RewardTask(BaseTask):
    """Task that scores a predictor's outputs through a user-supplied reward function.

    Built from a DataFrame and the name of its input column; each prediction is
    handed to the reward function (optionally together with extra per-row keyword
    arguments) which returns a scalar reward.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        reward_function: Callable[[str], float],
        x_column: str = "x",
        y_column: Optional[str] = None,
        reward_columns: Optional[List[str]] = None,
        task_description: Optional[str] = None,
        n_subsamples: int = 30,
        eval_strategy: "EvalStrategy" = "full",
        seed: int = 42,
        config: Optional["ExperimentConfig"] = None,
    ) -> None:
        """Set up the reward task.

        Args:
            df (pd.DataFrame): Input DataFrame containing the data.
            reward_function (Callable): Maps a prediction (plus optional keyword
                arguments taken from the dataframe) to a scalar reward.
                Note: The optimizers aim to maximize.
            x_column (str, optional): Column holding the input texts. Defaults to "x".
            y_column (str, optional): Column holding target texts, if available. Defaults to None.
            reward_columns (List[str], optional): Extra dataframe columns forwarded as
                keyword args to reward_function.
            task_description (str, optional): Description of the task.
            n_subsamples (int, optional): Number of subsamples to use. Defaults to 30.
            eval_strategy (str, optional): Subsampling strategy to use. Defaults to "full".
            seed (int, optional): Random seed for reproducibility. Defaults to 42.
            config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
        """
        self.reward_function = reward_function
        self.reward_columns = reward_columns if reward_columns else []
        super().__init__(
            df=df,
            x_column=x_column,
            y_column=y_column,
            task_description=task_description,
            n_subsamples=n_subsamples,
            eval_strategy=eval_strategy,
            seed=seed,
            config=config,
        )
        self.task_type = "reward"
        # Precompute, per input text, the keyword arguments its reward call receives.
        row_kwargs = self.df.set_index(x_column)[self.reward_columns].to_dict("index")
        self.kwargs_map = defaultdict(dict, row_kwargs)

    def _evaluate(self, xs: List[str], ys: List[str], preds: List[str]) -> np.ndarray:
        """Score each prediction with the reward function, forwarding per-row kwargs."""
        scores = [self.reward_function(pred, **self.kwargs_map[x]) for x, pred in zip(xs, preds)]
        return np.asarray(scores, dtype=float)

__init__(df, reward_function, x_column='x', y_column=None, reward_columns=None, task_description=None, n_subsamples=30, eval_strategy='full', seed=42, config=None)

Initialize the RewardTask.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame containing the data.

required
reward_function Callable

Function that takes a prediction, potential keyword arguments from the dataframe, and returns a reward score. Note: The optimizers aim to maximize.

required
x_column str

Name of the column containing input texts. Defaults to "x".

'x'
y_column str

Name of the column containing target texts if available. Defaults to None.

None
reward_columns List[str]

Additional dataframe columns to pass as keyword args to reward_function.

None
task_description str

Description of the task.

None
n_subsamples int

Number of subsamples to use. Defaults to 30.

30
eval_strategy str

Subsampling strategy to use. Defaults to "full".

'full'
seed int

Random seed for reproducibility. Defaults to 42.

42
config ExperimentConfig

Configuration for the task, overriding defaults.

None
Source code in promptolution/tasks/reward_tasks.py
def __init__(
    self,
    df: pd.DataFrame,
    reward_function: Callable[[str], float],
    x_column: str = "x",
    y_column: Optional[str] = None,
    reward_columns: Optional[List[str]] = None,
    task_description: Optional[str] = None,
    n_subsamples: int = 30,
    eval_strategy: "EvalStrategy" = "full",
    seed: int = 42,
    config: Optional["ExperimentConfig"] = None,
) -> None:
    """Set up the reward task.

    Args:
        df (pd.DataFrame): Input DataFrame containing the data.
        reward_function (Callable): Maps a prediction (plus optional keyword
            arguments taken from the dataframe) to a scalar reward.
            Note: The optimizers aim to maximize.
        x_column (str, optional): Column holding the input texts. Defaults to "x".
        y_column (str, optional): Column holding target texts, if available. Defaults to None.
        reward_columns (List[str], optional): Extra dataframe columns forwarded as
            keyword args to reward_function.
        task_description (str, optional): Description of the task.
        n_subsamples (int, optional): Number of subsamples to use. Defaults to 30.
        eval_strategy (str, optional): Subsampling strategy to use. Defaults to "full".
        seed (int, optional): Random seed for reproducibility. Defaults to 42.
        config (ExperimentConfig, optional): Configuration for the task, overriding defaults.
    """
    self.reward_function = reward_function
    self.reward_columns = reward_columns if reward_columns else []
    super().__init__(
        df=df,
        x_column=x_column,
        y_column=y_column,
        task_description=task_description,
        n_subsamples=n_subsamples,
        eval_strategy=eval_strategy,
        seed=seed,
        config=config,
    )
    self.task_type = "reward"
    # Precompute, per input text, the keyword arguments its reward call receives.
    row_kwargs = self.df.set_index(x_column)[self.reward_columns].to_dict("index")
    self.kwargs_map = defaultdict(dict, row_kwargs)