import copy
import warnings
from typing import Any, Dict, List, Optional, Tuple, Union
from ConfigSpace.configuration_space import Configuration, ConfigurationSpace
from ConfigSpace.forbidden import (
ForbiddenAndConjunction,
ForbiddenEqualsClause,
ForbiddenInClause
)
import numpy as np
import pandas as pd
from sklearn.base import RegressorMixin
import torch
from autoPyTorch.constants import STRING_TO_TASK_TYPES
from autoPyTorch.datasets.base_dataset import BaseDatasetPropertiesType
from autoPyTorch.datasets.time_series_dataset import TimeSeriesSequence
from autoPyTorch.pipeline.base_pipeline import BasePipeline
from autoPyTorch.pipeline.components.base_choice import autoPyTorchChoice
from autoPyTorch.pipeline.components.base_component import autoPyTorchComponent
from autoPyTorch.pipeline.components.preprocessing.time_series_preprocessing.TimeSeriesTransformer import (
TimeSeriesFeatureTransformer
)
from autoPyTorch.pipeline.components.preprocessing.time_series_preprocessing.encoding import TimeSeriesEncoderChoice
from autoPyTorch.pipeline.components.preprocessing.time_series_preprocessing.imputation.TimeSeriesImputer import (
TimeSeriesFeatureImputer,
TimeSeriesTargetImputer
)
from autoPyTorch.pipeline.components.preprocessing.time_series_preprocessing.scaling.base_scaler import (
BaseScaler
)
from autoPyTorch.pipeline.components.setup.early_preprocessor.TimeSeriesEarlyPreProcessing import (
TimeSeriesEarlyPreprocessing,
TimeSeriesTargetEarlyPreprocessing
)
from autoPyTorch.pipeline.components.setup.forecasting_target_scaling.base_target_scaler import BaseTargetScaler
from autoPyTorch.pipeline.components.setup.forecasting_training_loss import ForecastingLossChoices
from autoPyTorch.pipeline.components.setup.lr_scheduler import SchedulerChoice
from autoPyTorch.pipeline.components.setup.network.forecasting_network import ForecastingNetworkComponent
from autoPyTorch.pipeline.components.setup.network_backbone.forecasting_backbone import ForecastingNetworkChoice
from autoPyTorch.pipeline.components.setup.network_embedding import NetworkEmbeddingChoice
from autoPyTorch.pipeline.components.setup.network_head.forecasting_network_head.forecasting_head import ForecastingHead
from autoPyTorch.pipeline.components.setup.network_initializer import NetworkInitializerChoice
from autoPyTorch.pipeline.components.setup.optimizer import OptimizerChoice
from autoPyTorch.pipeline.components.training.data_loader.time_series_forecasting_data_loader import (
TimeSeriesForecastingDataLoader
)
from autoPyTorch.pipeline.components.training.trainer.forecasting_trainer import ForecastingTrainerChoice
from autoPyTorch.utils.hyperparameter_search_space_update import HyperparameterSearchSpaceUpdates
class TimeSeriesForecastingPipeline(RegressorMixin, BasePipeline):
"""This class is a proof of concept to integrate AutoPyTorch Components
It implements a pipeline, which includes as steps:
->One preprocessing step
->One neural network
Contrary to the sklearn API it is not possible to enumerate the
possible parameters in the __init__ function because we only know the
available regressors at runtime. For this reason the user must
specifiy the parameters by passing an instance of
ConfigSpace.configuration_space.Configuration.
Args:
config (Configuration):
The configuration to evaluate.
random_state (Optional[np.random.RandomState]):
random_state is the random number generator
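Example:
A minimal, illustrative sketch (assumes dataset_properties built from an
existing forecasting dataset; not a fixed API):
>>> pipeline = TimeSeriesForecastingPipeline(dataset_properties=dataset_properties)
>>> cs = pipeline.get_hyperparameter_search_space()
>>> pipeline.set_hyperparameters(cs.sample_configuration())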
"""
def __init__(self,
config: Optional[Configuration] = None,
steps: Optional[List[Tuple[str, Union[autoPyTorchComponent, autoPyTorchChoice]]]] = None,
dataset_properties: Optional[Dict[str, BaseDatasetPropertiesType]] = None,
include: Optional[Dict[str, Any]] = None,
exclude: Optional[Dict[str, Any]] = None,
random_state: Optional[np.random.RandomState] = None,
init_params: Optional[Dict[str, Any]] = None,
search_space_updates: Optional[HyperparameterSearchSpaceUpdates] = None,
):
BasePipeline.__init__(self,
config, steps, dataset_properties, include, exclude,
random_state, init_params, search_space_updates)
# Because a pipeline is passed to a worker, we need to honor the random seed
# in this context. A time series forecasting pipeline will implement a torch
# model, so we comply with https://pytorch.org/docs/stable/notes/randomness.html
torch.manual_seed(self.random_state.get_state()[1][0])
def score(self, X: List[Union[np.ndarray, pd.DataFrame, TimeSeriesSequence]],
y: np.ndarray, batch_size: Optional[int] = None, **score_kwargs: Any) -> float:
"""Scores the fitted estimator on (X, y)
Args:
X (List[Union[np.ndarray, pd.DataFrame, TimeSeriesSequence]]):
input to the pipeline, from which to guess targets
y (np.ndarray):
ground truth target values against which the predictions are scored
batch_size (Optional[int]):
batch_size controls whether the pipeline will be called on small chunks of the data.
Useful when calling the predict method on the whole array X results in a MemoryError.
Returns:
float:
the 'mean_MAPE_forecasting' score of the prediction
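Example (illustrative sketch; assumes a fitted pipeline and held-out test data):
>>> test_score = pipeline.score(X_test, y_test, batch_size=512)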
"""
from autoPyTorch.pipeline.components.training.metrics.utils import (
calculate_score, get_metrics)
metrics = get_metrics(self.dataset_properties, ['mean_MAPE_forecasting'])
y_pred = self.predict(X, batch_size=batch_size)  # type: ignore[arg-type]
score = calculate_score(y, y_pred, task_type=STRING_TO_TASK_TYPES[str(self.dataset_properties['task_type'])],
metrics=metrics, **score_kwargs)['mean_MAPE_forecasting']
return score
def _get_hyperparameter_search_space(self,
dataset_properties: Dict[str, Any],
include: Optional[Dict[str, Any]] = None,
exclude: Optional[Dict[str, Any]] = None,
) -> ConfigurationSpace:
"""Create the hyperparameter configuration space.
For the given steps, and the choices within those steps,
this procedure returns a configuration space object to
explore.
Args:
include (Optional[Dict[str, Any]]):
what hyper-parameter configurations to honor when creating the configuration space
exclude (Optional[Dict[str, Any]]):
what hyper-parameter configurations to remove from the configuration space
dataset_properties (Dict[str, Any]):
Characteristics of the dataset to guide the pipeline choices of components
Returns:
cs (ConfigurationSpace):
The configuration space describing the TimeSeriesForecastingPipeline.
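Example (illustrative sketch; the public get_hyperparameter_search_space()
wraps this method):
>>> cs = pipeline.get_hyperparameter_search_space()
>>> config = cs.sample_configuration()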
"""
cs = ConfigurationSpace()
if not isinstance(dataset_properties, dict):
warnings.warn('The given dataset_properties argument contains an illegal value. '
'Proceeding with the default value')
dataset_properties = dict()
if 'target_type' not in dataset_properties:
dataset_properties['target_type'] = 'time_series_forecasting'
if dataset_properties['target_type'] != 'time_series_forecasting':
warnings.warn('Time series forecasting is being used, however the target_type '
'is not given as "time_series_forecasting". Overriding it.')
dataset_properties['target_type'] = 'time_series_forecasting'
# get the base search space given these
# dataset properties, then overwrite with custom
# forecasting requirements
cs = self._get_base_search_space(
cs=cs, dataset_properties=dataset_properties,
exclude=exclude, include=include, pipeline=self.steps)
# Here we add custom code to forbid hyperparameter combinations
# that do not form a valid configuration, e.g.:
# Learned Entity Embedding is only valid when the encoder is a one-hot encoder
if 'network_embedding' in self.named_steps.keys():
embeddings = cs.get_hyperparameter('network_embedding:__choice__').choices
if 'LearnedEntityEmbedding' in embeddings:
if 'feature_encoding' in self.named_steps.keys():
feature_encodings = cs.get_hyperparameter('feature_encoding:__choice__').choices
default = cs.get_hyperparameter('network_embedding:__choice__').default_value
possible_default_embeddings = copy.copy(list(embeddings))
del possible_default_embeddings[possible_default_embeddings.index(default)]
for encoding in feature_encodings:
if encoding == 'OneHotEncoder':
continue
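# ConfigSpace raises a ValueError if a forbidden clause would exclude the
# current default configuration; in that case we move the default of
# 'network_embedding' to another choice and retry adding the clause.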
while True:
try:
cs.add_forbidden_clause(ForbiddenAndConjunction(
ForbiddenEqualsClause(cs.get_hyperparameter(
'network_embedding:__choice__'), 'LearnedEntityEmbedding'),
ForbiddenEqualsClause(
cs.get_hyperparameter('feature_encoding:__choice__'), encoding
)))
break
except ValueError:
# change the default and try again
try:
default = possible_default_embeddings.pop()
except IndexError:
raise ValueError("Cannot find a legal default configuration")
cs.get_hyperparameter('network_embedding:__choice__').default_value = default
if 'network_backbone:flat_encoder:__choice__' in cs:
hp_flat_encoder = cs.get_hyperparameter('network_backbone:flat_encoder:__choice__')
if 'NBEATSEncoder' in hp_flat_encoder.choices:
cs.add_forbidden_clause(ForbiddenAndConjunction(
ForbiddenEqualsClause(hp_flat_encoder, 'NBEATSEncoder'),
ForbiddenEqualsClause(cs.get_hyperparameter(
'network_embedding:__choice__'), 'LearnedEntityEmbedding'))
)
# dist_cls and auto_regressive are only active if the network outputs a distribution
if 'loss' in self.named_steps.keys() and 'network_backbone' in self.named_steps.keys():
hp_loss = cs.get_hyperparameter('loss:__choice__')
ar_forbidden = True
hp_deepAR = []
for hp_name in cs.get_hyperparameter_names():
if hp_name.startswith('network_backbone:'):
if hp_name.endswith(':auto_regressive'):
hp_deepAR.append(cs.get_hyperparameter(hp_name))
# DeepAR
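# DeepAR-style (auto-regressive) decoding samples from the predicted
# distribution and feeds the samples back into the network, so it is only
# meaningful with a DistributionLoss: forbid auto_regressive=True together
# with any other loss.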
forbidden_losses_all = []
losses_non_ar = []
for loss in hp_loss.choices:
if loss != "DistributionLoss":
losses_non_ar.append(loss)
if losses_non_ar:
forbidden_hp_regression_loss = ForbiddenInClause(hp_loss, losses_non_ar)
for hp_ar in hp_deepAR:
if True in hp_ar.choices:
forbidden_hp_dist = ForbiddenEqualsClause(hp_ar, ar_forbidden)
forbidden_hp_dist = ForbiddenAndConjunction(forbidden_hp_dist, forbidden_hp_regression_loss)
forbidden_losses_all.append(forbidden_hp_dist)
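# An auto-regressive decoder combined with the temporal fusion layer does not
# support sample-based forecast strategies, so this triple is forbidden.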
if "network_backbone:seq_encoder:decoder_auto_regressive" in cs:
decoder_auto_regressive = cs.get_hyperparameter("network_backbone:seq_encoder:decoder_auto_regressive")
forecast_strategy = cs.get_hyperparameter("loss:DistributionLoss:forecast_strategy")
use_tf = cs.get_hyperparameter("network_backbone:seq_encoder:use_temporal_fusion")
if True in decoder_auto_regressive.choices and\
'sample' in forecast_strategy.choices and True in use_tf.choices:
cs.add_forbidden_clause(
ForbiddenAndConjunction(
ForbiddenEqualsClause(decoder_auto_regressive, True),
ForbiddenEqualsClause(forecast_strategy, 'sample'),
ForbiddenEqualsClause(use_tf, True)
)
)
if 'network_backbone:flat_encoder:__choice__' in cs:
network_flat_encoder_hp = cs.get_hyperparameter('network_backbone:flat_encoder:__choice__')
if 'MLPEncoder' in network_flat_encoder_hp.choices:
forbidden = ['MLPEncoder']
forbidden_deepAREncoder = [
forbid for forbid in forbidden if forbid in network_flat_encoder_hp.choices
]
for hp_ar in hp_deepAR:
if True in hp_ar.choices:
forbidden_hp_ar = ForbiddenEqualsClause(hp_ar, ar_forbidden)
forbidden_hp_mlpencoder = ForbiddenInClause(network_flat_encoder_hp,
forbidden_deepAREncoder)
forbidden_hp_ar_mlp = ForbiddenAndConjunction(forbidden_hp_ar, forbidden_hp_mlpencoder)
forbidden_losses_all.append(forbidden_hp_ar_mlp)
if 'loss:DistributionLoss:forecast_strategy' in cs:
forecast_strategy = cs.get_hyperparameter('loss:DistributionLoss:forecast_strategy')
if 'mean' in forecast_strategy.choices:
for hp_ar in hp_deepAR:
if True in hp_ar.choices:
forbidden_hp_ar = ForbiddenEqualsClause(hp_ar, ar_forbidden)
forbidden_hp_forecast_strategy = ForbiddenEqualsClause(forecast_strategy, 'mean')
forbidden_hp_ar_forecast_strategy = ForbiddenAndConjunction(forbidden_hp_ar,
forbidden_hp_forecast_strategy)
forbidden_losses_all.append(forbidden_hp_ar_forecast_strategy)
if forbidden_losses_all:
cs.add_forbidden_clauses(forbidden_losses_all)
# NBEATS
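# N-BEATS is a flat (MLP-based) architecture trained with a regression loss
# on backcast + forecast windows: forbid NBEATSEncoder with non-regression
# losses and restrict backcast-based data loading to flat encoders.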
network_encoder_hp = cs.get_hyperparameter("network_backbone:__choice__")
forbidden_NBEATS = []
encoder_non_flat = [choice for choice in network_encoder_hp.choices if choice != 'flat_encoder']
loss_non_regression = [choice for choice in hp_loss.choices if choice != 'RegressionLoss']
data_loader_backcast = cs.get_hyperparameter('data_loader:backcast')
forbidden_encoder_non_flat = ForbiddenInClause(network_encoder_hp, encoder_non_flat)
forbidden_loss_non_regression = ForbiddenInClause(hp_loss, loss_non_regression)
forbidden_backcast = ForbiddenEqualsClause(data_loader_backcast, True)
if 'network_backbone:flat_encoder:__choice__' in cs:
hp_flat_encoder = cs.get_hyperparameter("network_backbone:flat_encoder:__choice__")
# Ensure that NBEATS encoder only works with NBEATS decoder
if 'NBEATSEncoder' in hp_flat_encoder.choices:
forbidden_NBEATS.append(ForbiddenAndConjunction(
ForbiddenEqualsClause(hp_flat_encoder, 'NBEATSEncoder'),
forbidden_loss_non_regression)
)
transform_time_features = "data_loader:transform_time_features"
if transform_time_features in cs:
hp_ttf = cs.get_hyperparameter(transform_time_features)
forbidden_NBEATS.append(ForbiddenAndConjunction(
ForbiddenEqualsClause(hp_flat_encoder, 'NBEATSEncoder'),
ForbiddenEqualsClause(hp_ttf, True))
)
forbidden_NBEATS.append(ForbiddenAndConjunction(
forbidden_backcast,
forbidden_encoder_non_flat
))
cs.add_forbidden_clauses(forbidden_NBEATS)
self.configuration_space = cs
self.dataset_properties = dataset_properties
return cs
def _get_pipeline_steps(self, dataset_properties: Optional[Dict[str, Any]]) -> List[Tuple[str, autoPyTorchChoice]]:
"""
Defines what steps a pipeline should follow.
The step itself has choices given via autoPyTorchChoice.
One key difference between the forecasting pipeline and the others is that we put "data_loader"
before "network_backbone", so that the fitted data loader can provide sequence information
(e.g., the window size) to the network components.
Returns:
List[Tuple[str, autoPyTorchChoice]]:
list of steps sequentially exercised by the pipeline.
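Example (illustrative; for a uni-variant dataset the feature-preprocessing
steps at the front are skipped):
>>> [name for name, _ in pipeline.steps] # doctest: +SKIP
['target_imputer', 'target_preprocessing', 'loss', 'target_scaler', 'data_loader',
'network_embedding', 'network_backbone', 'network_head', 'network', 'network_init',
'optimizer', 'lr_scheduler', 'trainer']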
"""
steps = [] # type: List[Tuple[str, autoPyTorchChoice]]
default_dataset_properties: Dict[str, BaseDatasetPropertiesType] = {'target_type': 'time_series_forecasting'}
if dataset_properties is not None:
default_dataset_properties.update(dataset_properties)
if not default_dataset_properties.get("uni_variant", False):
steps.extend([("impute", TimeSeriesFeatureImputer(random_state=self.random_state)),
("scaler", BaseScaler(random_state=self.random_state)),
('feature_encoding', TimeSeriesEncoderChoice(default_dataset_properties,
random_state=self.random_state)),
("time_series_transformer", TimeSeriesFeatureTransformer(random_state=self.random_state)),
("preprocessing", TimeSeriesEarlyPreprocessing(random_state=self.random_state)),
])
steps.extend([
("target_imputer", TimeSeriesTargetImputer(random_state=self.random_state)),
("target_preprocessing", TimeSeriesTargetEarlyPreprocessing(random_state=self.random_state)),
('loss', ForecastingLossChoices(default_dataset_properties, random_state=self.random_state)),
("target_scaler", BaseTargetScaler(random_state=self.random_state)),
("data_loader", TimeSeriesForecastingDataLoader(random_state=self.random_state)),
("network_embedding", NetworkEmbeddingChoice(default_dataset_properties,
random_state=self.random_state)),
("network_backbone", ForecastingNetworkChoice(dataset_properties=default_dataset_properties,
random_state=self.random_state)),
("network_head", ForecastingHead(random_state=self.random_state)),
("network", ForecastingNetworkComponent(random_state=self.random_state)),
("network_init", NetworkInitializerChoice(default_dataset_properties,
random_state=self.random_state)),
("optimizer", OptimizerChoice(default_dataset_properties,
random_state=self.random_state)),
("lr_scheduler", SchedulerChoice(default_dataset_properties,
random_state=self.random_state)),
("trainer", ForecastingTrainerChoice(default_dataset_properties, random_state=self.random_state)),
])
return steps
def get_pipeline_representation(self) -> Dict[str, str]:
"""
Returns a representation of the pipeline, so that it can be
consumed and formatted by the API.
It should be a representation that follows:
{'Preprocessing': <>, 'Estimator': <>}
Returns:
Dict: contains the pipeline representation in a short format
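Example (illustrative output format; the actual shortnames depend on the
chosen components):
>>> pipeline.get_pipeline_representation() # doctest: +SKIP
{'Preprocessing': '<shortname>,<shortname>,...', 'Estimator': '<shortname>,...'}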
"""
preprocessing: List[str] = []
estimator: List[str] = []
skip_steps = ['data_loader', 'trainer', 'lr_scheduler', 'optimizer', 'network_init',
'preprocessing', 'time_series_transformer']
for step_name, step_component in self.steps:
if step_name in skip_steps:
continue
properties: Dict[str, Any] = {}
if isinstance(step_component, autoPyTorchChoice) and step_component.choice is not None:
properties = step_component.choice.get_properties()
elif isinstance(step_component, autoPyTorchComponent):
properties = step_component.get_properties()
if 'shortname' in properties:
if 'network' in step_name:
estimator.append(properties['shortname'])
else:
preprocessing.append(properties['shortname'])
return {
'Preprocessing': ','.join(preprocessing),
'Estimator': ','.join(estimator),
}
def _get_estimator_hyperparameter_name(self) -> str:
"""
Returns the name of the current estimator.
Returns:
str: name of the pipeline type
"""
return "time_series_forecasting"
def predict(self,
X: List[Union[np.ndarray, pd.DataFrame, TimeSeriesSequence]], # type: ignore[override]
batch_size: Optional[int] = None) -> np.ndarray:
"""Predict the output using the selected model.
Args:
X (List[Union[np.ndarray, pd.DataFrame, TimeSeriesSequence]]):
input data to predict
batch_size (Optional[int]):
batch_size controls whether the pipeline will be called on small chunks of the data.
Useful when calling the predict method on the whole array X results in a MemoryError.
Returns:
np.ndarray:
the predicted values given input X
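Example (illustrative sketch; assumes a fitted pipeline and a list of test
sequences):
>>> y_pred = pipeline.predict(test_sequences, batch_size=512)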
"""
# Pre-process X
if batch_size is None:
warnings.warn("Batch size not provided. "
"Will use 1000 instead")
batch_size = 1000
loader = self.named_steps['data_loader'].get_loader(X=X, batch_size=batch_size)
try:
return self.named_steps['network'].predict(loader).flatten()
except Exception as e:
# https://github.com/pytorch/fairseq/blob/50a671f78d0c8de0392f924180db72ac9b41b801/fairseq/trainer.py#L283
if 'out of memory' in str(e):
if batch_size <= 1:
raise e
warnings.warn('| WARNING: ran out of memory, retrying batch')
torch.cuda.empty_cache()
# halve the batch size and retry recursively until it fits in memory
batch_size = batch_size // 2
return self.predict(X, batch_size=batch_size).flatten()
else:
raise e