LLMs

Module for Large Language Models.

APILLM

Bases: BaseLLM

Persistent asynchronous LLM wrapper using a background event loop.

Source code in promptolution/llms/api_llm.py
class APILLM(BaseLLM):
    """Persistent asynchronous LLM wrapper using a background event loop."""

    def __init__(
        self,
        api_url: Optional[str] = None,
        model_id: Optional[str] = None,
        api_key: Optional[str] = None,
        max_concurrent_calls: int = 32,
        max_tokens: int = 4096,
        call_timeout_s: float = 200.0,  # per request
        gather_timeout_s: float = 500.0,  # whole batch
        max_retries: int = 5,
        retry_base_delay_s: float = 1,
        client_kwargs: Optional[Dict[str, Any]] = None,
        call_kwargs: Optional[Dict[str, Any]] = None,
        config: Optional["ExperimentConfig"] = None,
    ) -> None:
        """Initialize the APILLM.

        Args:
            api_url (Optional[str]): Base URL for the API endpoint.
            model_id (Optional[str]): Identifier of the model to call. Must be set.
            api_key (Optional[str]): API key/token for authentication.
            max_concurrent_calls (int): Maximum number of concurrent API calls.
            max_tokens (int): Default maximum number of tokens in model responses.
            call_timeout_s (float): Per-call timeout in seconds.
            gather_timeout_s (float): Timeout in seconds for the entire batch.
            max_retries (int): Number of retry attempts per prompt in addition to the initial call.
            retry_base_delay_s (float): Base delay in seconds for exponential backoff between retries.
            client_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments passed to `AsyncOpenAI(...)`.
            call_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments passed to `client.chat.completions.create(...)`.
            config (Optional[ExperimentConfig]): Configuration for the LLM, overriding defaults.
        """
        self.api_url = api_url
        self.model_id = model_id
        self.api_key = api_key
        self.max_tokens = max_tokens
        self.call_timeout_s = call_timeout_s
        self.gather_timeout_s = gather_timeout_s
        self.max_retries = max_retries
        self.retry_base_delay_s = retry_base_delay_s

        # extra kwargs
        self._client_kwargs: Dict[str, Any] = dict(client_kwargs or {})
        self._call_kwargs: Dict[str, Any] = dict(call_kwargs or {})

        self.max_concurrent_calls = max_concurrent_calls
        super().__init__(config=config)

        # --- persistent loop + semaphore ---
        self._loop = asyncio.new_event_loop()
        self._sem = asyncio.Semaphore(self.max_concurrent_calls)

        def _run_loop() -> None:
            """Run the background event loop forever."""
            asyncio.set_event_loop(self._loop)
            self._loop.run_forever()

        self._thread = threading.Thread(target=_run_loop, name="APILLMLoop", daemon=True)
        self._thread.start()

        # Create client once; can still be customised via client_kwargs.
        self.client = AsyncOpenAI(
            base_url=self.api_url,
            api_key=self.api_key,
            timeout=self.call_timeout_s,
            **self._client_kwargs,
        )

    # ---------- async bits that run inside the loop ----------
    async def _ainvoke_once(self, prompt: str, system_prompt: str) -> ChatCompletion:
        """Perform a single API call with a per-call timeout.

        Args:
            prompt (str): User prompt content.
            system_prompt (str): System-level instructions for the model.

        Returns:
            ChatCompletion: Raw completion response from the API.

        Raises:
            asyncio.TimeoutError: If the call exceeds `call_timeout_s`.
            Exception: Any exception raised by the underlying client call.
        """
        messages = [
            {"role": "system", "content": str(system_prompt)},
            {"role": "user", "content": str(prompt)},
        ]

        # base kwargs; user can override via call_kwargs
        kwargs: Dict[str, Any] = {
            "model": self.model_id,
            "messages": messages,
            "max_tokens": self.max_tokens,
        }
        kwargs.update(self._call_kwargs)

        async with self._sem:
            # per-call timeout enforces failure instead of hang
            return await asyncio.wait_for(
                self.client.chat.completions.create(**kwargs),
                timeout=self.call_timeout_s,
            )

    async def _ainvoke_with_retries(self, prompt: str, system_prompt: str) -> str:
        """Invoke the model with retries and exponential backoff.

        Args:
            prompt (str): User prompt content.
            system_prompt (str): System-level instructions for the model.

        Returns:
            str: The message content of the first choice in the completion.

        Raises:
            Exception: The last exception encountered after all retries are exhausted.
        """
        last_err: Optional[Exception] = None
        for attempt in range(self.max_retries + 1):
            try:
                r = await self._ainvoke_once(prompt, system_prompt)
                content = r.choices[0].message.content
                if content is None:
                    raise RuntimeError("Empty content from model")
                return content
            except Exception as e:
                last_err = e
                if attempt < self.max_retries:
                    delay = self.retry_base_delay_s * (2**attempt)
                    logger.error(
                        f"LLM call failed ({attempt + 1}/{self.max_retries + 1}): — retrying in {delay}s", exc_info=e
                    )
                    await asyncio.sleep(delay)
        assert last_err is not None
        raise last_err

    async def _aget_batch(self, prompts: List[str], system_prompts: List[str]) -> List[str]:
        """Execute a batch of prompts concurrently and collect responses.

        Args:
            prompts (List[str]): List of user prompts.
            system_prompts (List[str]): List of system prompts; must match `prompts` in length.

        Returns:
            List[str]: List of model outputs. For failed entries, an empty string is inserted.

        Raises:
            TimeoutError: If the entire batch exceeds `gather_timeout_s`.
            RuntimeError: If any of the tasks fails; the first exception is propagated.
        """
        tasks = [asyncio.create_task(self._ainvoke_with_retries(p, s)) for p, s in zip(prompts, system_prompts)]

        try:
            results = await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=self.gather_timeout_s,
            )
        except asyncio.TimeoutError:
            for t in tasks:
                t.cancel()
            raise TimeoutError(f"LLM batch timed out after {self.gather_timeout_s}s")

        outs: List[str] = []
        first_exc: Optional[BaseException] = None
        for r in results:
            if isinstance(r, BaseException):
                if first_exc is None:
                    first_exc = r
                outs.append("")
            else:
                outs.append(r)

        if first_exc:
            for t in tasks:
                if not t.done():
                    t.cancel()
            raise RuntimeError(f"LLM batch failed: {first_exc}") from first_exc

        return outs

    # ---------- sync API used by the threads ----------
    def _submit(self, coro):
        """Submit a coroutine to the background event loop.

        Args:
            coro: Coroutine object to be scheduled on the loop.

        Returns:
            concurrent.futures.Future: Future representing the coroutine result.
        """
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def _get_response(self, prompts: List[str], system_prompts: List[str]) -> List[str]:
        """Synchronously obtain responses for a batch of prompts.

        This is the main entrypoint used by external callers. It handles system
        prompt broadcasting and delegates the actual work to the async batch
        execution on the background loop.

        Args:
            prompts (List[str]): List of user prompts.
            system_prompts (List[str]): List of system prompts. If a single system
                prompt is provided and multiple prompts are given, the system
                prompt is broadcast to all prompts. Otherwise, the list is
                normalized to match the length of `prompts`.

        Returns:
            List[str]: List of model responses corresponding to `prompts`.

        Raises:
            TimeoutError: If waiting on the batch future exceeds `gather_timeout_s + 5.0`.
            Exception: Any underlying error from the async batch execution.
        """
        fut = self._submit(self._aget_batch(prompts, system_prompts))
        try:
            r = fut.result(timeout=self.gather_timeout_s + 5.0)
            return r
        except FuturesTimeout:
            fut.cancel()
            raise TimeoutError(f"LLM batch (future) timed out after {self.gather_timeout_s + 5.0}s")
        except Exception:
            raise
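
For reference, the retry loop above sleeps retry_base_delay_s * 2**attempt seconds between attempts. The following standalone snippet (not part of the library) prints the schedule implied by the defaults:

# Backoff schedule implied by _ainvoke_with_retries with the default settings.
retry_base_delay_s = 1
max_retries = 5
delays = [retry_base_delay_s * (2**attempt) for attempt in range(max_retries)]
print(delays)  # [1, 2, 4, 8, 16] -> sleeps before retries 1 through 5; attempt 6 is the last try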

__init__(api_url=None, model_id=None, api_key=None, max_concurrent_calls=32, max_tokens=4096, call_timeout_s=200.0, gather_timeout_s=500.0, max_retries=5, retry_base_delay_s=1, client_kwargs=None, call_kwargs=None, config=None)

Initialize the APILLM.

Parameters:

    api_url (Optional[str], default None): Base URL for the API endpoint.
    model_id (Optional[str], default None): Identifier of the model to call. Must be set.
    api_key (Optional[str], default None): API key/token for authentication.
    max_concurrent_calls (int, default 32): Maximum number of concurrent API calls.
    max_tokens (int, default 4096): Default maximum number of tokens in model responses.
    call_timeout_s (float, default 200.0): Per-call timeout in seconds.
    gather_timeout_s (float, default 500.0): Timeout in seconds for the entire batch.
    max_retries (int, default 5): Number of retry attempts per prompt in addition to the initial call.
    retry_base_delay_s (float, default 1): Base delay in seconds for exponential backoff between retries.
    client_kwargs (Optional[Dict[str, Any]], default None): Additional keyword arguments passed to AsyncOpenAI(...).
    call_kwargs (Optional[Dict[str, Any]], default None): Additional keyword arguments passed to client.chat.completions.create(...).
    config (Optional[ExperimentConfig], default None): Configuration for the LLM, overriding defaults.
Source code in promptolution/llms/api_llm.py
def __init__(
    self,
    api_url: Optional[str] = None,
    model_id: Optional[str] = None,
    api_key: Optional[str] = None,
    max_concurrent_calls: int = 32,
    max_tokens: int = 4096,
    call_timeout_s: float = 200.0,  # per request
    gather_timeout_s: float = 500.0,  # whole batch
    max_retries: int = 5,
    retry_base_delay_s: float = 1,
    client_kwargs: Optional[Dict[str, Any]] = None,
    call_kwargs: Optional[Dict[str, Any]] = None,
    config: Optional["ExperimentConfig"] = None,
) -> None:
    """Initialize the APILLM.

    Args:
        api_url (Optional[str]): Base URL for the API endpoint.
        model_id (Optional[str]): Identifier of the model to call. Must be set.
        api_key (Optional[str]): API key/token for authentication.
        max_concurrent_calls (int): Maximum number of concurrent API calls.
        max_tokens (int): Default maximum number of tokens in model responses.
        call_timeout_s (float): Per-call timeout in seconds.
        gather_timeout_s (float): Timeout in seconds for the entire batch.
        max_retries (int): Number of retry attempts per prompt in addition to the initial call.
        retry_base_delay_s (float): Base delay in seconds for exponential backoff between retries.
        client_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments passed to `AsyncOpenAI(...)`.
        call_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments passed to `client.chat.completions.create(...)`.
        config (Optional[ExperimentConfig]): Configuration for the LLM, overriding defaults.
    """
    self.api_url = api_url
    self.model_id = model_id
    self.api_key = api_key
    self.max_tokens = max_tokens
    self.call_timeout_s = call_timeout_s
    self.gather_timeout_s = gather_timeout_s
    self.max_retries = max_retries
    self.retry_base_delay_s = retry_base_delay_s

    # extra kwargs
    self._client_kwargs: Dict[str, Any] = dict(client_kwargs or {})
    self._call_kwargs: Dict[str, Any] = dict(call_kwargs or {})

    self.max_concurrent_calls = max_concurrent_calls
    super().__init__(config=config)

    # --- persistent loop + semaphore ---
    self._loop = asyncio.new_event_loop()
    self._sem = asyncio.Semaphore(self.max_concurrent_calls)

    def _run_loop() -> None:
        """Run the background event loop forever."""
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    self._thread = threading.Thread(target=_run_loop, name="APILLMLoop", daemon=True)
    self._thread.start()

    # Create client once; can still be customised via client_kwargs.
    self.client = AsyncOpenAI(
        base_url=self.api_url,
        api_key=self.api_key,
        timeout=self.call_timeout_s,
        **self._client_kwargs,
    )
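
A minimal usage sketch for APILLM. The endpoint URL, model identifier, and API key below are placeholders; the import path follows the source location shown above.

from promptolution.llms.api_llm import APILLM

llm = APILLM(
    api_url="https://api.example.com/v1",  # placeholder endpoint
    model_id="my-model",                   # placeholder model id; must be set
    api_key="sk-...",                      # placeholder key
    max_concurrent_calls=8,
)

# get_response is inherited from BaseLLM; a single system prompt is broadcast to all prompts.
responses = llm.get_response(
    ["Summarize the plot of Hamlet in one sentence."],
    system_prompts="You are a concise assistant.",
)
print(responses[0])
print(llm.get_token_count())  # input/output/total token counts tracked by the base class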

LocalLLM

Bases: BaseLLM

A class for running language models locally using the Hugging Face Transformers library.

This class sets up a text generation pipeline with specified model parameters and provides a method to generate responses for given prompts.

Attributes:

    pipeline (Pipeline): The text generation pipeline.

Methods:

    get_response: Generate responses for a list of prompts.

Source code in promptolution/llms/local_llm.py
class LocalLLM(BaseLLM):
    """A class for running language models locally using the Hugging Face Transformers library.

    This class sets up a text generation pipeline with specified model parameters
    and provides a method to generate responses for given prompts.

    Attributes:
        pipeline (transformers.Pipeline): The text generation pipeline.

    Methods:
        get_response: Generate responses for a list of prompts.
    """

    def __init__(self, model_id: str, batch_size: int = 8, config: Optional["ExperimentConfig"] = None) -> None:
        """Initialize the LocalLLM with a specific model.

        Args:
            model_id (str): The identifier of the model to use (e.g., "gpt2", "facebook/opt-1.3b").
            batch_size (int, optional): The batch size for text generation. Defaults to 8.
            config (ExperimentConfig, optional): Configuration for the LLM, overriding defaults.

        Note:
            This method sets up a text generation pipeline with bfloat16 precision,
            automatic device mapping, and specific generation parameters.
        """
        if not imports_successful:
            raise ImportError(
                "Could not import at least one of the required libraries: torch, transformers. "
                "Please ensure they are installed in your environment."
            )
        self.pipeline: Pipeline = pipeline(
            "text-generation",
            model=model_id,
            model_kwargs={"torch_dtype": torch.bfloat16},
            device_map="auto",
            max_new_tokens=256,
            batch_size=batch_size,
            num_return_sequences=1,
            return_full_text=False,
        )
        super().__init__(config)
        self.tokenizer = self.pipeline.tokenizer
        assert self.tokenizer is not None, "Tokenizer must be initialized."
        self.eos_token_id = self.tokenizer.eos_token_id
        self.tokenizer.pad_token_id = self.eos_token_id
        self.tokenizer.padding_side = "left"

    def _get_response(self, prompts: List[str], system_prompts: List[str]) -> List[str]:
        """Generate responses for a list of prompts using the local language model.

        Args:
            prompts (list[str]): A list of input prompts.
            system_prompts (list[str]): A list of system prompts to guide the model's behavior.

        Returns:
            list[str]: A list of generated responses corresponding to the input prompts.

        Note:
            This method uses torch.no_grad() for inference to reduce memory usage.
            It handles both single and batch inputs, ensuring consistent output format.
        """
        inputs: List[List[Dict[str, str]]] = []
        for prompt, sys_prompt in zip(prompts, system_prompts):
            inputs.append([{"role": "system", "content": sys_prompt}, {"role": "user", "content": prompt}])

        with torch.no_grad():
            response = self.pipeline(inputs, pad_token_id=self.eos_token_id)

        if len(response) != 1:
            response = [r[0] if isinstance(r, list) else r for r in response]

        response = [r["generated_text"] for r in response]
        return response

    def __del__(self) -> None:
        """Cleanup method to delete the pipeline and free up GPU memory."""
        if hasattr(self, "pipeline"):
            del self.pipeline
        if "torch" in globals() and hasattr(torch, "cuda") and torch.cuda.is_available():
            torch.cuda.empty_cache()

__del__()

Cleanup method to delete the pipeline and free up GPU memory.

Source code in promptolution/llms/local_llm.py
def __del__(self) -> None:
    """Cleanup method to delete the pipeline and free up GPU memory."""
    if hasattr(self, "pipeline"):
        del self.pipeline
    if "torch" in globals() and hasattr(torch, "cuda") and torch.cuda.is_available():
        torch.cuda.empty_cache()

__init__(model_id, batch_size=8, config=None)

Initialize the LocalLLM with a specific model.

Parameters:

    model_id (str, required): The identifier of the model to use (e.g., "gpt2", "facebook/opt-1.3b").
    batch_size (int, default 8): The batch size for text generation.
    config (ExperimentConfig, optional, default None): Configuration for the LLM, overriding defaults.
Note

This method sets up a text generation pipeline with bfloat16 precision, automatic device mapping, and specific generation parameters.

Source code in promptolution/llms/local_llm.py
def __init__(self, model_id: str, batch_size: int = 8, config: Optional["ExperimentConfig"] = None) -> None:
    """Initialize the LocalLLM with a specific model.

    Args:
        model_id (str): The identifier of the model to use (e.g., "gpt2", "facebook/opt-1.3b").
        batch_size (int, optional): The batch size for text generation. Defaults to 8.
        config (ExperimentConfig, optional): Configuration for the LLM, overriding defaults.

    Note:
        This method sets up a text generation pipeline with bfloat16 precision,
        automatic device mapping, and specific generation parameters.
    """
    if not imports_successful:
        raise ImportError(
            "Could not import at least one of the required libraries: torch, transformers. "
            "Please ensure they are installed in your environment."
        )
    self.pipeline: Pipeline = pipeline(
        "text-generation",
        model=model_id,
        model_kwargs={"torch_dtype": torch.bfloat16},
        device_map="auto",
        max_new_tokens=256,
        batch_size=batch_size,
        num_return_sequences=1,
        return_full_text=False,
    )
    super().__init__(config)
    self.tokenizer = self.pipeline.tokenizer
    assert self.tokenizer is not None, "Tokenizer must be initialized."
    self.eos_token_id = self.tokenizer.eos_token_id
    self.tokenizer.pad_token_id = self.eos_token_id
    self.tokenizer.padding_side = "left"
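
A minimal usage sketch for LocalLLM, assuming torch and transformers are installed; the model identifier below is a placeholder for any chat-capable Hugging Face model.

from promptolution.llms.local_llm import LocalLLM

llm = LocalLLM(model_id="Qwen/Qwen2.5-0.5B-Instruct", batch_size=4)  # placeholder model id
responses = llm.get_response(
    ["What is the capital of France?"],
    system_prompts="Answer in a single word.",
)
print(responses[0])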

VLLM

Bases: BaseLLM

A class for running language models using the vLLM library.

This class sets up a vLLM inference engine with specified model parameters and provides a method to generate responses for given prompts.

Attributes:

    llm (LLM): The vLLM inference engine.
    tokenizer (PreTrainedTokenizer): The tokenizer for the model.
    sampling_params (SamplingParams): Parameters for text generation.

Methods:

    get_response: Generate responses for a list of prompts.
    update_token_count: Update the token count based on the given inputs and outputs.

Source code in promptolution/llms/vllm.py
class VLLM(BaseLLM):
    """A class for running language models using the vLLM library.

    This class sets up a vLLM inference engine with specified model parameters
    and provides a method to generate responses for given prompts.

    Attributes:
        llm (vllm.LLM): The vLLM inference engine.
        tokenizer (PreTrainedTokenizer): The tokenizer for the model.
        sampling_params (vllm.SamplingParams): Parameters for text generation.

    Methods:
        get_response: Generate responses for a list of prompts.
        update_token_count: Update the token count based on the given inputs and outputs.
    """

    tokenizer: "PreTrainedTokenizer"

    def __init__(
        self,
        model_id: str,
        batch_size: Optional[int] = None,
        max_generated_tokens: int = 256,
        temperature: float = 0.1,
        top_p: float = 0.9,
        model_storage_path: Optional[str] = None,
        dtype: str = "auto",
        tensor_parallel_size: int = 1,
        gpu_memory_utilization: float = 0.95,
        max_model_len: int = 2048,
        trust_remote_code: bool = False,
        seed: int = 42,
        llm_kwargs: Optional[Dict[str, Any]] = None,
        config: Optional["ExperimentConfig"] = None,
    ) -> None:
        """Initialize the VLLM with a specific model.

        Args:
            model_id (str): The identifier of the model to use.
            batch_size (int, optional): The batch size for text generation. Defaults to None, in which case it is inferred from available GPU memory.
            max_generated_tokens (int, optional): Maximum number of tokens to generate. Defaults to 256.
            temperature (float, optional): Sampling temperature. Defaults to 0.1.
            top_p (float, optional): Top-p sampling parameter. Defaults to 0.9.
            model_storage_path (str, optional): Directory to store the model. Defaults to None.
            dtype (str, optional): Data type for model weights. Defaults to "auto".
            tensor_parallel_size (int, optional): Number of GPUs for tensor parallelism. Defaults to 1.
            gpu_memory_utilization (float, optional): Fraction of GPU memory to use. Defaults to 0.95.
            max_model_len (int, optional): Maximum sequence length for the model. Defaults to 2048.
            trust_remote_code (bool, optional): Whether to trust remote code. Defaults to False.
            seed (int, optional): Random seed for the model. Defaults to 42.
            llm_kwargs (dict, optional): Additional keyword arguments for the LLM. Defaults to None.
            config (ExperimentConfig, optional): Configuration for the LLM, overriding defaults.

        Note:
            This method sets up a vLLM engine with specified parameters for efficient inference.
        """
        if not imports_successful:
            raise ImportError(
                "Could not import at least one of the required libraries: transformers, vllm. "
                "Please ensure they are installed in your environment."
            )

        self.dtype = dtype
        self.tensor_parallel_size = tensor_parallel_size
        self.gpu_memory_utilization = gpu_memory_utilization
        self.max_model_len = max_model_len
        self.trust_remote_code = trust_remote_code

        super().__init__(config)

        # Configure sampling parameters
        self.sampling_params = SamplingParams(
            temperature=temperature, top_p=top_p, max_tokens=max_generated_tokens, seed=seed
        )

        llm_kwargs = llm_kwargs or {}
        # Initialize the vLLM engine with both explicit parameters and any additional kwargs
        llm_params: Dict[str, Any] = {
            "model": model_id,
            "tokenizer": model_id,
            "dtype": self.dtype,
            "tensor_parallel_size": self.tensor_parallel_size,
            "gpu_memory_utilization": self.gpu_memory_utilization,
            "max_model_len": self.max_model_len,
            "download_dir": model_storage_path,
            "trust_remote_code": self.trust_remote_code,
            "seed": seed,
            **llm_kwargs,
        }

        self.llm = LLM(**llm_params)

        # Initialize tokenizer separately for potential pre-processing
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)

        if batch_size is None:
            cache_config = self.llm.llm_engine.model_executor.cache_config
            if (
                cache_config.num_gpu_blocks is not None
                and cache_config.block_size is not None
                and self.max_model_len is not None
            ):
                self.batch_size = int(
                    (cache_config.num_gpu_blocks * cache_config.block_size / self.max_model_len) * 0.95
                )
                logger.info(f"🚀 Batch size set to {self.batch_size} based on GPU memory.")
            else:
                self.batch_size = 1
                logger.warning("⚠️ Could not determine batch size from GPU memory. Using batch size of 1.")
        else:
            self.batch_size = batch_size

    def _get_response(self, prompts: List[str], system_prompts: List[str]) -> List[str]:
        """Generate responses for a list of prompts using the vLLM engine.

        Args:
            prompts (list[str]): A list of input prompts.
            system_prompts (list[str]): A list of system prompts to guide the model's behavior.

        Returns:
            list[str]: A list of generated responses corresponding to the input prompts.

        Note:
            This method uses vLLM's batched generation capabilities for efficient inference.
            It also counts input and output tokens.
        """
        prompts = [
            str(
                self.tokenizer.apply_chat_template(
                    [
                        {
                            "role": "system",
                            "content": sys_prompt,
                        },
                        {"role": "user", "content": prompt},
                    ],
                    tokenize=False,
                    add_generation_prompt=True,
                )
            )
            for prompt, sys_prompt in zip(prompts, system_prompts)
        ]

        # generate responses for self.batch_size prompts at the same time
        all_responses = []
        for i in range(0, len(prompts), self.batch_size):
            batch = prompts[i : i + self.batch_size]
            outputs = self.llm.generate(batch, self.sampling_params)
            responses = [output.outputs[0].text for output in outputs]

            all_responses.extend(responses)

        return all_responses

    def update_token_count(self, inputs: List[str], outputs: List[str]) -> None:
        """Update the token count based on the given inputs and outputs.

        Uses the tokenizer to count the tokens.

        Args:
            inputs (List[str]): A list of input prompts.
            outputs (List[str]): A list of generated responses.
        """
        for input in inputs:
            self.input_token_count += len(self.tokenizer.encode(input))

        for output in outputs:
            self.output_token_count += len(self.tokenizer.encode(output))

    def set_generation_seed(self, seed: int) -> None:
        """Set the random seed for text generation.

        Args:
            seed (int): Random seed for text generation.
        """
        self.sampling_params.seed = seed

__init__(model_id, batch_size=None, max_generated_tokens=256, temperature=0.1, top_p=0.9, model_storage_path=None, dtype='auto', tensor_parallel_size=1, gpu_memory_utilization=0.95, max_model_len=2048, trust_remote_code=False, seed=42, llm_kwargs=None, config=None)

Initialize the VLLM with a specific model.

Parameters:

    model_id (str, required): The identifier of the model to use.
    batch_size (int, optional, default None): The batch size for text generation. If None, it is inferred from available GPU memory.
    max_generated_tokens (int, default 256): Maximum number of tokens to generate.
    temperature (float, default 0.1): Sampling temperature.
    top_p (float, default 0.9): Top-p sampling parameter.
    model_storage_path (str, optional, default None): Directory to store the model.
    dtype (str, default "auto"): Data type for model weights.
    tensor_parallel_size (int, default 1): Number of GPUs for tensor parallelism.
    gpu_memory_utilization (float, default 0.95): Fraction of GPU memory to use.
    max_model_len (int, default 2048): Maximum sequence length for the model.
    trust_remote_code (bool, default False): Whether to trust remote code.
    seed (int, default 42): Random seed for the model.
    llm_kwargs (dict, optional, default None): Additional keyword arguments for the LLM.
    config (ExperimentConfig, optional, default None): Configuration for the LLM, overriding defaults.
Note

This method sets up a vLLM engine with specified parameters for efficient inference.

Source code in promptolution/llms/vllm.py
def __init__(
    self,
    model_id: str,
    batch_size: Optional[int] = None,
    max_generated_tokens: int = 256,
    temperature: float = 0.1,
    top_p: float = 0.9,
    model_storage_path: Optional[str] = None,
    dtype: str = "auto",
    tensor_parallel_size: int = 1,
    gpu_memory_utilization: float = 0.95,
    max_model_len: int = 2048,
    trust_remote_code: bool = False,
    seed: int = 42,
    llm_kwargs: Optional[Dict[str, Any]] = None,
    config: Optional["ExperimentConfig"] = None,
) -> None:
    """Initialize the VLLM with a specific model.

    Args:
        model_id (str): The identifier of the model to use.
        batch_size (int, optional): The batch size for text generation. Defaults to None, in which case it is inferred from available GPU memory.
        max_generated_tokens (int, optional): Maximum number of tokens to generate. Defaults to 256.
        temperature (float, optional): Sampling temperature. Defaults to 0.1.
        top_p (float, optional): Top-p sampling parameter. Defaults to 0.9.
        model_storage_path (str, optional): Directory to store the model. Defaults to None.
        dtype (str, optional): Data type for model weights. Defaults to "auto".
        tensor_parallel_size (int, optional): Number of GPUs for tensor parallelism. Defaults to 1.
        gpu_memory_utilization (float, optional): Fraction of GPU memory to use. Defaults to 0.95.
        max_model_len (int, optional): Maximum sequence length for the model. Defaults to 2048.
        trust_remote_code (bool, optional): Whether to trust remote code. Defaults to False.
        seed (int, optional): Random seed for the model. Defaults to 42.
        llm_kwargs (dict, optional): Additional keyword arguments for the LLM. Defaults to None.
        config (ExperimentConfig, optional): Configuration for the LLM, overriding defaults.

    Note:
        This method sets up a vLLM engine with specified parameters for efficient inference.
    """
    if not imports_successful:
        raise ImportError(
            "Could not import at least one of the required libraries: transformers, vllm. "
            "Please ensure they are installed in your environment."
        )

    self.dtype = dtype
    self.tensor_parallel_size = tensor_parallel_size
    self.gpu_memory_utilization = gpu_memory_utilization
    self.max_model_len = max_model_len
    self.trust_remote_code = trust_remote_code

    super().__init__(config)

    # Configure sampling parameters
    self.sampling_params = SamplingParams(
        temperature=temperature, top_p=top_p, max_tokens=max_generated_tokens, seed=seed
    )

    llm_kwargs = llm_kwargs or {}
    # Initialize the vLLM engine with both explicit parameters and any additional kwargs
    llm_params: Dict[str, Any] = {
        "model": model_id,
        "tokenizer": model_id,
        "dtype": self.dtype,
        "tensor_parallel_size": self.tensor_parallel_size,
        "gpu_memory_utilization": self.gpu_memory_utilization,
        "max_model_len": self.max_model_len,
        "download_dir": model_storage_path,
        "trust_remote_code": self.trust_remote_code,
        "seed": seed,
        **llm_kwargs,
    }

    self.llm = LLM(**llm_params)

    # Initialize tokenizer separately for potential pre-processing
    self.tokenizer = AutoTokenizer.from_pretrained(model_id)

    if batch_size is None:
        cache_config = self.llm.llm_engine.model_executor.cache_config
        if (
            cache_config.num_gpu_blocks is not None
            and cache_config.block_size is not None
            and self.max_model_len is not None
        ):
            self.batch_size = int(
                (cache_config.num_gpu_blocks * cache_config.block_size / self.max_model_len) * 0.95
            )
            logger.info(f"🚀 Batch size set to {self.batch_size} based on GPU memory.")
        else:
            self.batch_size = 1
            logger.warning("⚠️ Could not determine batch size from GPU memory. Using batch size of 1.")
    else:
        self.batch_size = batch_size

set_generation_seed(seed)

Set the random seed for text generation.

Parameters:

    seed (int, required): Random seed for text generation.
Source code in promptolution/llms/vllm.py
def set_generation_seed(self, seed: int) -> None:
    """Set the random seed for text generation.

    Args:
        seed (int): Random seed for text generation.
    """
    self.sampling_params.seed = seed

update_token_count(inputs, outputs)

Update the token count based on the given inputs and outputs.

Uses the tokenizer to count the tokens.

Parameters:

    inputs (List[str], required): A list of input prompts.
    outputs (List[str], required): A list of generated responses.
Source code in promptolution/llms/vllm.py
def update_token_count(self, inputs: List[str], outputs: List[str]) -> None:
    """Update the token count based on the given inputs and outputs.

    Uses the tokenizer to count the tokens.

    Args:
        inputs (List[str]): A list of input prompts.
        outputs (List[str]): A list of generated responses.
    """
    for input in inputs:
        self.input_token_count += len(self.tokenizer.encode(input))

    for output in outputs:
        self.output_token_count += len(self.tokenizer.encode(output))
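
A minimal usage sketch for VLLM, assuming the vllm and transformers packages and a CUDA-capable GPU are available; the model identifier below is a placeholder.

from promptolution.llms.vllm import VLLM

llm = VLLM(
    model_id="meta-llama/Llama-3.1-8B-Instruct",  # placeholder model id
    max_generated_tokens=128,
    gpu_memory_utilization=0.90,
)
llm.set_generation_seed(123)  # reseed sampling for a reproducible generation
responses = llm.get_response(
    ["Name three prime numbers."],
    system_prompts="Reply with a comma-separated list only.",
)
print(responses[0])
print(llm.get_token_count())  # counted with the model tokenizer (see update_token_count)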

api_llm

Module to interface with various language models through their respective APIs.

base_llm

Base module for LLMs in the promptolution library.

BaseLLM

Bases: ABC

Abstract base class for Language Models in the promptolution library.

This class defines the interface that all concrete LLM implementations should follow. It's designed to track which configuration parameters are actually used.

Attributes:

    config (LLMModelConfig): Configuration for the language model.
    input_token_count (int): Count of input tokens processed.
    output_token_count (int): Count of output tokens generated.
    tokenizer (Optional[PreTrainedTokenizer]): The tokenizer for the model.

Source code in promptolution/llms/base_llm.py
class BaseLLM(ABC):
    """Abstract base class for Language Models in the promptolution library.

    This class defines the interface that all concrete LLM implementations should follow.
    It's designed to track which configuration parameters are actually used.

    Attributes:
        config (LLMModelConfig): Configuration for the language model.
        input_token_count (int): Count of input tokens processed.
        output_token_count (int): Count of output tokens generated.
        tokenizer (Optional[PreTrainedTokenizer]): The tokenizer for the model.
    """

    def __init__(self, config: Optional["ExperimentConfig"] = None):
        """Initialize the LLM with a configuration or direct parameters.

        This constructor supports both config-based and direct parameter initialization
        for backward compatibility.

        Args:
            config (ExperimentConfig, optional): Configuration for the LLM, overriding defaults.
        """
        if config is not None:
            config.apply_to(self)
        # Initialize token counters
        self.input_token_count = 0
        self.output_token_count = 0
        self.tokenizer: Optional["PreTrainedTokenizer"] = None

    def get_token_count(self) -> Dict[str, int]:
        """Get the current count of input and output tokens.

        Returns:
            dict: A dictionary containing the input and output token counts.
        """
        return {
            "input_tokens": self.input_token_count,
            "output_tokens": self.output_token_count,
            "total_tokens": self.input_token_count + self.output_token_count,
        }

    def reset_token_count(self) -> None:
        """Reset the token counters to zero."""
        self.input_token_count = 0
        self.output_token_count = 0

    def update_token_count(self, inputs: List[str], outputs: List[str]) -> None:
        """Update the token count based on the given inputs and outputs.

        It uses a simple tokenization method (splitting by whitespace) to count tokens in the base class.

        Args:
            inputs (List[str]): A list of input prompts.
            outputs (List[str]): A list of generated responses.
        """
        input_tokens = sum([len(i.split()) for i in inputs])
        output_tokens = sum([len(o.split()) for o in outputs])
        self.input_token_count += input_tokens
        self.output_token_count += output_tokens

    def get_response(
        self, prompts: Union[str, List[str]], system_prompts: Optional[Union[str, List[str]]] = None
    ) -> List[str]:
        """Generate responses for the given prompts.

        This method calls the _get_response method to generate responses
        for the given prompts. It also updates the token count for the
        input and output tokens.

        Args:
            prompts (str or List[str]): Input prompt(s). If a single string is provided,
                                        it's converted to a list containing that string.
            system_prompts (Optional, str or List[str]): System prompt(s) to provide context to the model.

        Returns:
            List[str]: A list of generated responses, one for each input prompt.
        """
        if system_prompts is None:
            system_prompts = DEFAULT_SYS_PROMPT
        if isinstance(prompts, str):
            prompts = [prompts]
        if isinstance(system_prompts, str):
            system_prompts = [system_prompts] * len(prompts)
        responses = self._get_response(prompts, system_prompts)
        self.update_token_count(prompts + system_prompts, responses)

        return responses

    def set_generation_seed(self, seed: int) -> None:
        """Set the random seed for reproducibility per request.

        Args:
            seed (int): Random seed value.
        """
        pass

    @abstractmethod
    def _get_response(self, prompts: List[str], system_prompts: List[str]) -> List[str]:
        """Generate responses for the given prompts.

        This method should be implemented by subclasses to define how
        the LLM generates responses.

        Args:
            prompts (List[str]): A list of input prompts.
            system_prompts (List[str]): A list of system prompts to provide context to the model.

        Returns:
            List[str]: A list of generated responses corresponding to the input prompts.
        """
        raise NotImplementedError
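
A minimal sketch of a custom subclass: only _get_response must be implemented, and get_response then handles prompt normalization, system-prompt broadcasting, and token counting. The EchoLLM class below is a toy example, not part of the library.

from typing import List

from promptolution.llms.base_llm import BaseLLM


class EchoLLM(BaseLLM):
    """Toy subclass that echoes prompts back, used only to illustrate the interface."""

    def _get_response(self, prompts: List[str], system_prompts: List[str]) -> List[str]:
        return [f"[{sys_prompt}] {prompt}" for prompt, sys_prompt in zip(prompts, system_prompts)]


llm = EchoLLM()
# A single system prompt string is broadcast across all prompts by get_response.
print(llm.get_response(["first prompt", "second prompt"], system_prompts="be brief"))
print(llm.get_token_count())  # whitespace-based counts maintained by the base class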

__init__(config=None)

Initialize the LLM with a configuration or direct parameters.

This constructor supports both config-based and direct parameter initialization for backward compatibility.

Parameters:

    config (ExperimentConfig, optional, default None): Configuration for the LLM, overriding defaults.
Source code in promptolution/llms/base_llm.py
def __init__(self, config: Optional["ExperimentConfig"] = None):
    """Initialize the LLM with a configuration or direct parameters.

    This constructor supports both config-based and direct parameter initialization
    for backward compatibility.

    Args:
        config (ExperimentConfig, optional): Configuration for the LLM, overriding defaults.
    """
    if config is not None:
        config.apply_to(self)
    # Initialize token counters
    self.input_token_count = 0
    self.output_token_count = 0
    self.tokenizer: Optional["PreTrainedTokenizer"] = None

get_response(prompts, system_prompts=None)

Generate responses for the given prompts.

This method calls the _get_response method to generate responses for the given prompts. It also updates the token count for the input and output tokens.

Parameters:

    prompts (str or List[str], required): Input prompt(s). If a single string is provided, it's converted to a list containing that string.
    system_prompts (str or List[str], optional, default None): System prompt(s) to provide context to the model.

Returns:

    List[str]: A list of generated responses, one for each input prompt.

Source code in promptolution/llms/base_llm.py
def get_response(
    self, prompts: Union[str, List[str]], system_prompts: Optional[Union[str, List[str]]] = None
) -> List[str]:
    """Generate responses for the given prompts.

    This method calls the _get_response method to generate responses
    for the given prompts. It also updates the token count for the
    input and output tokens.

    Args:
        prompts (str or List[str]): Input prompt(s). If a single string is provided,
                                    it's converted to a list containing that string.
        system_prompts (Optional, str or List[str]): System prompt(s) to provide context to the model.

    Returns:
        List[str]: A list of generated responses, one for each input prompt.
    """
    if system_prompts is None:
        system_prompts = DEFAULT_SYS_PROMPT
    if isinstance(prompts, str):
        prompts = [prompts]
    if isinstance(system_prompts, str):
        system_prompts = [system_prompts] * len(prompts)
    responses = self._get_response(prompts, system_prompts)
    self.update_token_count(prompts + system_prompts, responses)

    return responses

get_token_count()

Get the current count of input and output tokens.

Returns:

    dict (Dict[str, int]): A dictionary containing the input and output token counts.

Source code in promptolution/llms/base_llm.py
def get_token_count(self) -> Dict[str, int]:
    """Get the current count of input and output tokens.

    Returns:
        dict: A dictionary containing the input and output token counts.
    """
    return {
        "input_tokens": self.input_token_count,
        "output_tokens": self.output_token_count,
        "total_tokens": self.input_token_count + self.output_token_count,
    }

reset_token_count()

Reset the token counters to zero.

Source code in promptolution/llms/base_llm.py
def reset_token_count(self) -> None:
    """Reset the token counters to zero."""
    self.input_token_count = 0
    self.output_token_count = 0

set_generation_seed(seed)

Set the random seed for reproducibility per request.

Parameters:

    seed (int, required): Random seed value.
Source code in promptolution/llms/base_llm.py
def set_generation_seed(self, seed: int) -> None:
    """Set the random seed for reproducibility per request.

    Args:
        seed (int): Random seed value.
    """
    pass

update_token_count(inputs, outputs)

Update the token count based on the given inputs and outputs.

It uses a simple tokenization method (splitting by whitespace) to count tokens in the base class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | List[str] | A list of input prompts. | required |
| outputs | List[str] | A list of generated responses. | required |
Source code in promptolution/llms/base_llm.py
def update_token_count(self, inputs: List[str], outputs: List[str]) -> None:
    """Update the token count based on the given inputs and outputs.

    It uses a simple tokenization method (splitting by whitespace) to count tokens in the base class.

    Args:
        inputs (List[str]): A list of input prompts.
        outputs (List[str]): A list of generated responses.
    """
    input_tokens = sum([len(i.split()) for i in inputs])
    output_tokens = sum([len(o.split()) for o in outputs])
    self.input_token_count += input_tokens
    self.output_token_count += output_tokens
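
The whitespace heuristic can also be exercised directly. A small worked example, again using the hypothetical DummyLLM from above:

llm = DummyLLM()
llm.update_token_count(["Hello world", "Be concise"], ["Hi there"])
# 2 + 2 whitespace-separated input tokens and 2 output tokens.
assert llm.get_token_count() == {"input_tokens": 4, "output_tokens": 2, "total_tokens": 6}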

local_llm

Module for running LLMs locally using the Hugging Face Transformers library.

LocalLLM

Bases: BaseLLM

A class for running language models locally using the Hugging Face Transformers library.

This class sets up a text generation pipeline with specified model parameters and provides a method to generate responses for given prompts.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| pipeline | Pipeline | The text generation pipeline. |

Methods:

| Name | Description |
| --- | --- |
| get_response | Generate responses for a list of prompts. |

Source code in promptolution/llms/local_llm.py
class LocalLLM(BaseLLM):
    """A class for running language models locally using the Hugging Face Transformers library.

    This class sets up a text generation pipeline with specified model parameters
    and provides a method to generate responses for given prompts.

    Attributes:
        pipeline (transformers.Pipeline): The text generation pipeline.

    Methods:
        get_response: Generate responses for a list of prompts.
    """

    def __init__(self, model_id: str, batch_size: int = 8, config: Optional["ExperimentConfig"] = None) -> None:
        """Initialize the LocalLLM with a specific model.

        Args:
            model_id (str): The identifier of the model to use (e.g., "gpt2", "facebook/opt-1.3b").
            batch_size (int, optional): The batch size for text generation. Defaults to 8.
            config (ExperimentConfig, optional): Configuration for the LLM, overriding defaults.

        Note:
            This method sets up a text generation pipeline with bfloat16 precision,
            automatic device mapping, and specific generation parameters.
        """
        if not imports_successful:
            raise ImportError(
                "Could not import at least one of the required libraries: torch, transformers. "
                "Please ensure they are installed in your environment."
            )
        self.pipeline: Pipeline = pipeline(
            "text-generation",
            model=model_id,
            model_kwargs={"torch_dtype": torch.bfloat16},
            device_map="auto",
            max_new_tokens=256,
            batch_size=batch_size,
            num_return_sequences=1,
            return_full_text=False,
        )
        super().__init__(config)
        self.tokenizer = self.pipeline.tokenizer
        assert self.tokenizer is not None, "Tokenizer must be initialized."
        self.eos_token_id = self.tokenizer.eos_token_id
        self.tokenizer.pad_token_id = self.eos_token_id
        self.tokenizer.padding_side = "left"

    def _get_response(self, prompts: List[str], system_prompts: List[str]) -> List[str]:
        """Generate responses for a list of prompts using the local language model.

        Args:
            prompts (list[str]): A list of input prompts.
            system_prompts (list[str]): A list of system prompts to guide the model's behavior.

        Returns:
            list[str]: A list of generated responses corresponding to the input prompts.

        Note:
            This method uses torch.no_grad() for inference to reduce memory usage.
            It handles both single and batch inputs, ensuring consistent output format.
        """
        inputs: List[List[Dict[str, str]]] = []
        for prompt, sys_prompt in zip(prompts, system_prompts):
            inputs.append([{"role": "system", "content": sys_prompt}, {"role": "user", "content": prompt}])

        with torch.no_grad():
            response = self.pipeline(inputs, pad_token_id=self.eos_token_id)

        if len(response) != 1:
            response = [r[0] if isinstance(r, list) else r for r in response]

        response = [r["generated_text"] for r in response]
        return response

    def __del__(self) -> None:
        """Cleanup method to delete the pipeline and free up GPU memory."""
        if hasattr(self, "pipeline"):
            del self.pipeline
        if "torch" in globals() and hasattr(torch, "cuda") and torch.cuda.is_available():
            torch.cuda.empty_cache()
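
A minimal usage sketch, assuming torch and transformers are installed, a suitable accelerator is available, and LocalLLM is importable from promptolution.llms.local_llm as the source path suggests. The model id is only an example; any Hugging Face chat model with a chat template should work:

from promptolution.llms.local_llm import LocalLLM

llm = LocalLLM(model_id="Qwen/Qwen2.5-0.5B-Instruct", batch_size=4)
responses = llm.get_response(
    ["Classify the sentiment of: 'Great product, would buy again!'"],
    system_prompts="You are a concise assistant.",
)
print(responses[0])
print(llm.get_token_count())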

__del__()

Cleanup method to delete the pipeline and free up GPU memory.

Source code in promptolution/llms/local_llm.py
def __del__(self) -> None:
    """Cleanup method to delete the pipeline and free up GPU memory."""
    if hasattr(self, "pipeline"):
        del self.pipeline
    if "torch" in globals() and hasattr(torch, "cuda") and torch.cuda.is_available():
        torch.cuda.empty_cache()

__init__(model_id, batch_size=8, config=None)

Initialize the LocalLLM with a specific model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model_id | str | The identifier of the model to use (e.g., "gpt2", "facebook/opt-1.3b"). | required |
| batch_size | int | The batch size for text generation. Defaults to 8. | 8 |
| config | ExperimentConfig | Configuration for the LLM, overriding defaults. | None |
Note

This method sets up a text generation pipeline with bfloat16 precision, automatic device mapping, and specific generation parameters.

Source code in promptolution/llms/local_llm.py
def __init__(self, model_id: str, batch_size: int = 8, config: Optional["ExperimentConfig"] = None) -> None:
    """Initialize the LocalLLM with a specific model.

    Args:
        model_id (str): The identifier of the model to use (e.g., "gpt2", "facebook/opt-1.3b").
        batch_size (int, optional): The batch size for text generation. Defaults to 8.
        config (ExperimentConfig, optional): Configuration for the LLM, overriding defaults.

    Note:
        This method sets up a text generation pipeline with bfloat16 precision,
        automatic device mapping, and specific generation parameters.
    """
    if not imports_successful:
        raise ImportError(
            "Could not import at least one of the required libraries: torch, transformers. "
            "Please ensure they are installed in your environment."
        )
    self.pipeline: Pipeline = pipeline(
        "text-generation",
        model=model_id,
        model_kwargs={"torch_dtype": torch.bfloat16},
        device_map="auto",
        max_new_tokens=256,
        batch_size=batch_size,
        num_return_sequences=1,
        return_full_text=False,
    )
    super().__init__(config)
    self.tokenizer = self.pipeline.tokenizer
    assert self.tokenizer is not None, "Tokenizer must be initialized."
    self.eos_token_id = self.tokenizer.eos_token_id
    self.tokenizer.pad_token_id = self.eos_token_id
    self.tokenizer.padding_side = "left"

vllm

Module for running language models locally using the vLLM library.

VLLM

Bases: BaseLLM

A class for running language models using the vLLM library.

This class sets up a vLLM inference engine with specified model parameters and provides a method to generate responses for given prompts.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| llm | LLM | The vLLM inference engine. |
| tokenizer | PreTrainedTokenizer | The tokenizer for the model. |
| sampling_params | SamplingParams | Parameters for text generation. |

Methods:

| Name | Description |
| --- | --- |
| get_response | Generate responses for a list of prompts. |
| update_token_count | Update the token count based on the given inputs and outputs. |

Source code in promptolution/llms/vllm.py
class VLLM(BaseLLM):
    """A class for running language models using the vLLM library.

    This class sets up a vLLM inference engine with specified model parameters
    and provides a method to generate responses for given prompts.

    Attributes:
        llm (vllm.LLM): The vLLM inference engine.
        tokenizer (PreTrainedTokenizer): The tokenizer for the model.
        sampling_params (vllm.SamplingParams): Parameters for text generation.

    Methods:
        get_response: Generate responses for a list of prompts.
        update_token_count: Update the token count based on the given inputs and outputs.
    """

    tokenizer: "PreTrainedTokenizer"

    def __init__(
        self,
        model_id: str,
        batch_size: Optional[int] = None,
        max_generated_tokens: int = 256,
        temperature: float = 0.1,
        top_p: float = 0.9,
        model_storage_path: Optional[str] = None,
        dtype: str = "auto",
        tensor_parallel_size: int = 1,
        gpu_memory_utilization: float = 0.95,
        max_model_len: int = 2048,
        trust_remote_code: bool = False,
        seed: int = 42,
        llm_kwargs: Optional[Dict[str, Any]] = None,
        config: Optional["ExperimentConfig"] = None,
    ) -> None:
        """Initialize the VLLM with a specific model.

        Args:
            model_id (str): The identifier of the model to use.
            batch_size (int, optional): The batch size for text generation. If None, it is determined automatically from the available GPU memory. Defaults to None.
            max_generated_tokens (int, optional): Maximum number of tokens to generate. Defaults to 256.
            temperature (float, optional): Sampling temperature. Defaults to 0.1.
            top_p (float, optional): Top-p sampling parameter. Defaults to 0.9.
            model_storage_path (str, optional): Directory to store the model. Defaults to None.
            dtype (str, optional): Data type for model weights. Defaults to "auto".
            tensor_parallel_size (int, optional): Number of GPUs for tensor parallelism. Defaults to 1.
            gpu_memory_utilization (float, optional): Fraction of GPU memory to use. Defaults to 0.95.
            max_model_len (int, optional): Maximum sequence length for the model. Defaults to 2048.
            trust_remote_code (bool, optional): Whether to trust remote code. Defaults to False.
            seed (int, optional): Random seed for the model. Defaults to 42.
            llm_kwargs (dict, optional): Additional keyword arguments for the LLM. Defaults to None.
            config (ExperimentConfig, optional): Configuration for the LLM, overriding defaults.

        Note:
            This method sets up a vLLM engine with specified parameters for efficient inference.
        """
        if not imports_successful:
            raise ImportError(
                "Could not import at least one of the required libraries: transformers, vllm. "
                "Please ensure they are installed in your environment."
            )

        self.dtype = dtype
        self.tensor_parallel_size = tensor_parallel_size
        self.gpu_memory_utilization = gpu_memory_utilization
        self.max_model_len = max_model_len
        self.trust_remote_code = trust_remote_code

        super().__init__(config)

        # Configure sampling parameters
        self.sampling_params = SamplingParams(
            temperature=temperature, top_p=top_p, max_tokens=max_generated_tokens, seed=seed
        )

        llm_kwargs = llm_kwargs or {}
        # Initialize the vLLM engine with both explicit parameters and any additional kwargs
        llm_params: Dict[str, Any] = {
            "model": model_id,
            "tokenizer": model_id,
            "dtype": self.dtype,
            "tensor_parallel_size": self.tensor_parallel_size,
            "gpu_memory_utilization": self.gpu_memory_utilization,
            "max_model_len": self.max_model_len,
            "download_dir": model_storage_path,
            "trust_remote_code": self.trust_remote_code,
            "seed": seed,
            **llm_kwargs,
        }

        self.llm = LLM(**llm_params)

        # Initialize tokenizer separately for potential pre-processing
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)

        if batch_size is None:
            cache_config = self.llm.llm_engine.model_executor.cache_config
            if (
                cache_config.num_gpu_blocks is not None
                and cache_config.block_size is not None
                and self.max_model_len is not None
            ):
                self.batch_size = int(
                    (cache_config.num_gpu_blocks * cache_config.block_size / self.max_model_len) * 0.95
                )
                logger.info(f"🚀 Batch size set to {self.batch_size} based on GPU memory.")
            else:
                self.batch_size = 1
                logger.warning("⚠️ Could not determine batch size from GPU memory. Using batch size of 1.")
        else:
            self.batch_size = batch_size

    def _get_response(self, prompts: List[str], system_prompts: List[str]) -> List[str]:
        """Generate responses for a list of prompts using the vLLM engine.

        Args:
            prompts (list[str]): A list of input prompts.
            system_prompts (list[str]): A list of system prompts to guide the model's behavior.

        Returns:
            list[str]: A list of generated responses corresponding to the input prompts.

        Note:
            This method uses vLLM's batched generation capabilities for efficient inference.
            It also counts input and output tokens.
        """
        prompts = [
            str(
                self.tokenizer.apply_chat_template(
                    [
                        {
                            "role": "system",
                            "content": sys_prompt,
                        },
                        {"role": "user", "content": prompt},
                    ],
                    tokenize=False,
                    add_generation_prompt=True,
                )
            )
            for prompt, sys_prompt in zip(prompts, system_prompts)
        ]

        # generate responses for self.batch_size prompts at the same time
        all_responses = []
        for i in range(0, len(prompts), self.batch_size):
            batch = prompts[i : i + self.batch_size]
            outputs = self.llm.generate(batch, self.sampling_params)
            responses = [output.outputs[0].text for output in outputs]

            all_responses.extend(responses)

        return all_responses

    def update_token_count(self, inputs: List[str], outputs: List[str]) -> None:
        """Update the token count based on the given inputs and outputs.

        Uses the tokenizer to count the tokens.

        Args:
            inputs (List[str]): A list of input prompts.
            outputs (List[str]): A list of generated responses.
        """
        for input in inputs:
            self.input_token_count += len(self.tokenizer.encode(input))

        for output in outputs:
            self.output_token_count += len(self.tokenizer.encode(output))

    def set_generation_seed(self, seed: int) -> None:
        """Set the random seed for text generation.

        Args:
            seed (int): Random seed for text generation.
        """
        self.sampling_params.seed = seed
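
A minimal usage sketch, assuming vllm and transformers are installed, a CUDA GPU is available, and VLLM is importable from promptolution.llms.vllm as the source path suggests; the model id is illustrative:

from promptolution.llms.vllm import VLLM

llm = VLLM(
    model_id="Qwen/Qwen2.5-0.5B-Instruct",  # example model id
    max_generated_tokens=128,
    temperature=0.0,
    max_model_len=2048,
)
responses = llm.get_response(
    ["Summarize in one sentence: vLLM batches requests against a paged KV cache."],
    system_prompts="You are a concise assistant.",
)
print(responses[0])
print(llm.get_token_count())  # counted with the model tokenizer (see update_token_count)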

__init__(model_id, batch_size=None, max_generated_tokens=256, temperature=0.1, top_p=0.9, model_storage_path=None, dtype='auto', tensor_parallel_size=1, gpu_memory_utilization=0.95, max_model_len=2048, trust_remote_code=False, seed=42, llm_kwargs=None, config=None)

Initialize the VLLM with a specific model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model_id | str | The identifier of the model to use. | required |
| batch_size | int | The batch size for text generation; if None, it is determined automatically from the available GPU memory. | None |
| max_generated_tokens | int | Maximum number of tokens to generate. Defaults to 256. | 256 |
| temperature | float | Sampling temperature. Defaults to 0.1. | 0.1 |
| top_p | float | Top-p sampling parameter. Defaults to 0.9. | 0.9 |
| model_storage_path | str | Directory to store the model. Defaults to None. | None |
| dtype | str | Data type for model weights. Defaults to "auto". | 'auto' |
| tensor_parallel_size | int | Number of GPUs for tensor parallelism. Defaults to 1. | 1 |
| gpu_memory_utilization | float | Fraction of GPU memory to use. Defaults to 0.95. | 0.95 |
| max_model_len | int | Maximum sequence length for the model. Defaults to 2048. | 2048 |
| trust_remote_code | bool | Whether to trust remote code. Defaults to False. | False |
| seed | int | Random seed for the model. Defaults to 42. | 42 |
| llm_kwargs | dict | Additional keyword arguments for the LLM. Defaults to None. | None |
| config | ExperimentConfig | Configuration for the LLM, overriding defaults. | None |
Note

This method sets up a vLLM engine with specified parameters for efficient inference.

Source code in promptolution/llms/vllm.py
def __init__(
    self,
    model_id: str,
    batch_size: Optional[int] = None,
    max_generated_tokens: int = 256,
    temperature: float = 0.1,
    top_p: float = 0.9,
    model_storage_path: Optional[str] = None,
    dtype: str = "auto",
    tensor_parallel_size: int = 1,
    gpu_memory_utilization: float = 0.95,
    max_model_len: int = 2048,
    trust_remote_code: bool = False,
    seed: int = 42,
    llm_kwargs: Optional[Dict[str, Any]] = None,
    config: Optional["ExperimentConfig"] = None,
) -> None:
    """Initialize the VLLM with a specific model.

    Args:
        model_id (str): The identifier of the model to use.
        batch_size (int, optional): The batch size for text generation. If None, it is determined automatically from the available GPU memory. Defaults to None.
        max_generated_tokens (int, optional): Maximum number of tokens to generate. Defaults to 256.
        temperature (float, optional): Sampling temperature. Defaults to 0.1.
        top_p (float, optional): Top-p sampling parameter. Defaults to 0.9.
        model_storage_path (str, optional): Directory to store the model. Defaults to None.
        dtype (str, optional): Data type for model weights. Defaults to "auto".
        tensor_parallel_size (int, optional): Number of GPUs for tensor parallelism. Defaults to 1.
        gpu_memory_utilization (float, optional): Fraction of GPU memory to use. Defaults to 0.95.
        max_model_len (int, optional): Maximum sequence length for the model. Defaults to 2048.
        trust_remote_code (bool, optional): Whether to trust remote code. Defaults to False.
        seed (int, optional): Random seed for the model. Defaults to 42.
        llm_kwargs (dict, optional): Additional keyword arguments for the LLM. Defaults to None.
        config (ExperimentConfig, optional): Configuration for the LLM, overriding defaults.

    Note:
        This method sets up a vLLM engine with specified parameters for efficient inference.
    """
    if not imports_successful:
        raise ImportError(
            "Could not import at least one of the required libraries: transformers, vllm. "
            "Please ensure they are installed in your environment."
        )

    self.dtype = dtype
    self.tensor_parallel_size = tensor_parallel_size
    self.gpu_memory_utilization = gpu_memory_utilization
    self.max_model_len = max_model_len
    self.trust_remote_code = trust_remote_code

    super().__init__(config)

    # Configure sampling parameters
    self.sampling_params = SamplingParams(
        temperature=temperature, top_p=top_p, max_tokens=max_generated_tokens, seed=seed
    )

    llm_kwargs = llm_kwargs or {}
    # Initialize the vLLM engine with both explicit parameters and any additional kwargs
    llm_params: Dict[str, Any] = {
        "model": model_id,
        "tokenizer": model_id,
        "dtype": self.dtype,
        "tensor_parallel_size": self.tensor_parallel_size,
        "gpu_memory_utilization": self.gpu_memory_utilization,
        "max_model_len": self.max_model_len,
        "download_dir": model_storage_path,
        "trust_remote_code": self.trust_remote_code,
        "seed": seed,
        **llm_kwargs,
    }

    self.llm = LLM(**llm_params)

    # Initialize tokenizer separately for potential pre-processing
    self.tokenizer = AutoTokenizer.from_pretrained(model_id)

    if batch_size is None:
        cache_config = self.llm.llm_engine.model_executor.cache_config
        if (
            cache_config.num_gpu_blocks is not None
            and cache_config.block_size is not None
            and self.max_model_len is not None
        ):
            self.batch_size = int(
                (cache_config.num_gpu_blocks * cache_config.block_size / self.max_model_len) * 0.95
            )
            logger.info(f"🚀 Batch size set to {self.batch_size} based on GPU memory.")
        else:
            self.batch_size = 1
            logger.warning("⚠️ Could not determine batch size from GPU memory. Using batch size of 1.")
    else:
        self.batch_size = batch_size
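
When batch_size is left as None, the code above derives it from the KV-cache capacity reported by the engine. A worked example with illustrative numbers (the real values depend on your GPU and model):

# Hypothetical cache configuration reported by the vLLM engine.
num_gpu_blocks = 4096   # KV-cache blocks that fit in GPU memory
block_size = 16         # tokens per cache block
max_model_len = 2048    # maximum sequence length configured above

batch_size = int((num_gpu_blocks * block_size / max_model_len) * 0.95)
print(batch_size)  # 30: roughly how many max-length sequences fit, minus a 5% safety margin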

set_generation_seed(seed)

Set the random seed for text generation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| seed | int | Random seed for text generation. | required |
Source code in promptolution/llms/vllm.py
def set_generation_seed(self, seed: int) -> None:
    """Set the random seed for text generation.

    Args:
        seed (int): Random seed for text generation.
    """
    self.sampling_params.seed = seed

update_token_count(inputs, outputs)

Update the token count based on the given inputs and outputs.

Uses the tokenizer to count the tokens.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | List[str] | A list of input prompts. | required |
| outputs | List[str] | A list of generated responses. | required |
Source code in promptolution/llms/vllm.py
def update_token_count(self, inputs: List[str], outputs: List[str]) -> None:
    """Update the token count based on the given inputs and outputs.

    Uses the tokenizer to count the tokens.

    Args:
        inputs (List[str]): A list of input prompts.
        outputs (List[str]): A list of generated responses.
    """
    for input in inputs:
        self.input_token_count += len(self.tokenizer.encode(input))

    for output in outputs:
        self.output_token_count += len(self.tokenizer.encode(output))
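
Unlike the whitespace heuristic in BaseLLM.update_token_count, this override counts tokens with the model's own tokenizer. A standalone sketch of the difference (the tokenizer id is only an example):

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")  # example tokenizer
text = "Tokenizers usually split text into more pieces than whitespace does."
print(len(text.split()))            # whitespace heuristic used by the base class
print(len(tokenizer.encode(text)))  # tokenizer-based count used by VLLM.update_token_count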