Source code for cave.plot.algorithm_footprint
import itertools
import logging
import os
import time
from collections import OrderedDict
import matplotlib.pyplot as plt
import numpy as np
plt.style.use(os.path.join(os.path.dirname(__file__), 'mpl_style')) # noqa
from scipy import spatial
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from bokeh.plotting import figure, ColumnDataSource
from bokeh.models import HoverTool, CustomJS, CDSView, GroupFilter
from bokeh.models.widgets import RadioButtonGroup
from bokeh.models.ranges import DataRange1d
from bokeh.layouts import row, column, widgetbox
from smac.runhistory.runhistory import RunHistory
from cave.utils.helpers import get_cost_dict_for_config
from cave.utils.io import export_bokeh
__author__ = "Joshua Marben"
__copyright__ = "Copyright 2017, ML4AAD"
__license__ = "3-clause BSD"
__maintainer__ = "Joshua Marben"
__email__ = "jo.ma@posteo.de"
[docs]class AlgorithmFootprintPlotter(object):
""" Class that provides the algorithmic footprints after
"Measuring algorithm footprints in instance space"
(Kate Smith-Miles, Kate Smith-Miles)
General procedure:
- label for each algorithm each instance with the same metric
- map the instances onto a plane using pca
NOTE:
The terms 'algorithm' and 'config/configuration' will be used synonymous
throughout the class.
"""
def __init__(self,
rh: RunHistory,
train_inst_feat,
test_inst_feat,
algorithms,
cutoff=np.inf,
output_dir=None,
rng=None):
"""
Parameters
----------
rh: RunHistory
runhistory to take cost from
train_inst_feat, test_inst_feat: dict[str->np.array]
instances names mapped to features
algorithms: List[Tuple(Configuration, str)]
list with configs and descriptive names
cutoff: int
cutoff (if available)
output_dir: str
output directory
"""
self.logger = logging.getLogger(self.__module__ + '.' + self.__class__.__name__)
self.output_dir = output_dir
self.rng = rng
if not self.rng:
self.logger.debug("No randomstate passed. Generate deterministic random state.")
self.rng = np.random.RandomState(42)
self.rh = rh
self.train_feats = train_inst_feat
self.test_feats = test_inst_feat
self.inst_to_feat = {**train_inst_feat, **test_inst_feat}
# This is the order of instances:
self.insts = list(train_inst_feat.keys()) + list(test_inst_feat.keys())
if self.output_dir and not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
self.algorithms = [config for config, name in algorithms] # Configuration-objects
self.algo_name = {algo: name for algo, name in algorithms} # Mapping config to name
self.name_algo = {name: algo for algo, name in algorithms} # and vice versa
if len(self.algo_name.keys()) < len(self.name_algo.keys()):
raise ValueError("Default and Incumbent are equal or some other error occured. Deactivate "
"algorithm-footprints with --no_algorithm_footprint")
self.algo_labels = {} # Maps algo -> label (good and bad)
self.features = np.array([self.inst_to_feat[k] for k in self.insts])
self.features_2d = self._reduce_dim(self.features, 2)
self.features_3d = self._reduce_dim(self.features, 3)
# self.clusters, self.cluster_dict = self.get_clusters(self.features_2d)
self.cutoff = cutoff
self._label_instances()
[docs] def _reduce_dim(self, feature_array, n=2):
""" Expects feature-array (not dict!), performs a PCA
Parameters
----------
feature_array: np.array
array containing features in order of self.inst_names
n: int
target dimension for pca, 2 or 3
Returns
-------
feature_array_nd: np.array
array with pca'ed features (n-dimensional)
"""
if n not in [2, 3]:
raise ValueError("Only 2 and 3 supported as target dimension!")
if feature_array.shape[1] > n:
self.logger.debug("Use PCA to reduce features to %d dimensions", n)
feature_array = StandardScaler().fit_transform(feature_array)
feature_array = PCA(n_components=n).fit_transform(feature_array)
return feature_array
[docs] def _get_cost(self, algorithm, instance=None):
"""
Return cost according to (possibly EPM-)validated runhistory.
Parameters
----------
algorithm: Configuration
config
instance: str
instance name
"""
if not hasattr(self, '__algo_cost'):
self.__algo_cost = {} # Use function self._get_cost!! Maps algo -> {instance -> cost}
if algorithm not in self.__algo_cost:
#self.logger.debug("Getting cost for %s, using PAR1-score", self.algo_name[algorithm])
self.__algo_cost[algorithm] = get_cost_dict_for_config(self.rh, algorithm)
if instance:
return self.__algo_cost[algorithm][instance]
else:
return self.__algo_cost[algorithm]
[docs] def _label_instances(self, epsilon=0.95):
"""
Returns dictionary with a label for each instance.
Returns
-------
labels: Dict[str:float]
maps instance-names (strings) to label (floats)
"""
start = time.time()
if len(self.algo_labels) > 0:
return
self.algo_labels = {a: {} for a in self.algo_name.keys()}
for i in self.insts:
best_cost = min([self._get_cost(a, i) for a in self.algo_name.keys()])
for a in self.algo_name.keys():
cost = self._get_cost(a, i)
# self.logger.debug("%s on \'%s\': best/this (%f/%f=%f)",
# self.algo_name[a], i,
# best_cost, cost,
# best_cost / cost)
if (cost == 0 or
(best_cost/cost >= epsilon and
(not self.cutoff or not cost >= self.cutoff))):
# Algorithm for instance is in threshhold epsilon and no timeout
label = 1
else:
label = 0
self.algo_labels[a][i] = label
self.logger.debug("Labeling instances in %.2f secs.", time.time() - start)
# -~-~-~-~ FOOTPRINT
[docs] def footprint(self, a, density_threshold, purity_threshold):
"""
Calculating the footprint within a portfolio using convex hulls that
depend on density and purity thresholds.
(algorithm 1 in Smith-Miles 2014)
We use 3 ways to refer to an instance here:
name: the name (unique!) of the instance
feat2d: the position as np.array
tup: the tuple-version of feat2d (hashable...)
Parameters
----------
a: Configuration
configuration to get footprint of
density_threshold: float
minimum density that regions must show to be merged
purity_threshold: float
minimum purity (percentage of good instance)
that regions must show to be merged
Returns
-------
footprint: float
the size of all resulting convex hulls
"""
def get_2NN(x, X):
""" Return indices in X of two nearest points in X to x. """
# map index to dist from point
dist = [(i, np.linalg.norm(tmp - x)) for i, tmp in enumerate(X)]
# sort after dist
dist = sorted(dist, key=lambda x: x[1])
# return indices of the two smallest values
return (dist[0][0], dist[1][0])
count_exceptions = 0
# -~-~ Initialise Stage
# Map inst-names to feat2d (np.array) and tup (tuple)
inst_feat2d = {i: self.features_2d[idx] for idx, i in enumerate(self.insts)}
inst_tup = {i: tuple(pos) for i, pos in inst_feat2d.items()} # noqa
# regions maps tuple(centroid) of region to inst-names in region
regions = OrderedDict()
# Instances (by name) in not_in_region
not_in_region = self.insts[:]
# Randomly select a good instance;
good = [i for i in self.insts if self.algo_labels[a][i] == 1]
if len(good) < 3:
self.logger.debug("Less than 3 good instances found in %s, footprint"
" not calculated.", self.algo_name[a])
return 0
# Repeat until no more triangles can be formed (at least 3 points left).
while (len(not_in_region) >= 3):
# Select random good instance TODO also from in_regions?!?!
rand_good = self.rng.choice(good)
try:
not_in_region.remove(rand_good) # Remove here so it's not its own nearest neighbor
except ValueError:
pass
# Form a closed region (triangle) with the two closest (smallest
# Euclidean distance in feature space) instances to
# rand_good, not already part of a triangle;
idx1, idx2 = get_2NN(inst_feat2d[rand_good],
[inst_feat2d[i] for i in not_in_region])
triangle = (rand_good, not_in_region[idx1], not_in_region[idx2]) # names
triangle_feat = np.array([inst_feat2d[i] for i in triangle])
centroid = np.sum(np.array(triangle_feat), axis=0)/len(triangle)
regions[tuple(centroid)] = triangle
for p in triangle:
try:
not_in_region.remove(p)
except ValueError:
pass
# -~-~ Merge Stage
# Repeat the Merge Stage until there are no more pairs to consider.
# If we iterated over whole list once, we are done.
stop = False
while not stop:
stop = True
centroids = list(regions.keys())
self.rng.shuffle(centroids)
# Randomly select a closed region;
for idx, cent in enumerate(centroids):
reg = regions[cent] # inst-names!
cent_array = np.array(cent) # Keys in dict are tuples!
# Find the closest closed region (minimum Euclidean
# centroid distance);
remaining_centroids = [np.array(c) for c in regions.keys() if not c == cent]
idx = get_2NN(cent_array, remaining_centroids)[0]
nearest_cent = tuple(remaining_centroids[idx])
nearest_reg = regions[nearest_cent] # inst-names!
# Check purity and density
new_reg = tuple(set(reg) | set(nearest_reg)) # names
new_reg_array = np.array([inst_feat2d[i] for i in new_reg]) # array
try:
combined_hull = spatial.ConvexHull(new_reg_array)
except spatial.qhull.QhullError:
count_exceptions += 1
continue
density = len(new_reg)/combined_hull.volume
purity = (len([i for i in reg if i in good]) +
len([i for i in nearest_reg if i in good])) / float(len(new_reg))
if density > density_threshold and purity > purity_threshold:
self.logger.debug("Purity: %f, density: %f", purity, density)
regions.pop(cent)
regions.pop(nearest_cent)
new_centroid = tuple(np.sum(new_reg_array, axis=0)/len(new_reg))
regions[new_centroid] = new_reg
stop = False
break
# We now have final regions -> return sum of individual convex hulls
area = 0
for reg in regions.values():
try:
hull = spatial.ConvexHull(np.array([inst_feat2d[p] for p in reg]))
area += hull.volume
except spatial.qhull.QhullError:
count_exceptions += 1
pass
self.logger.debug("Area for %s is %f (%d Qhull-exceptions, %d/%d good "
"insts, %d regions)",
self.algo_name[a], area, count_exceptions, len(good),
len(self.insts), len(regions))
return area
# -~-~-~ PLOTS
[docs] def _get_good_bad(self, conf, insts=[]):
""" Creates a list of indices for good and bad instances for a
configuration.
Parameters
----------
conf: Configuration
configuration for which to plot good vs bad
insts: List[str]
instances to be plotted
Returns
-------
outpath: str
output path
"""
if len(insts) == 0:
insts = self.insts
good_idx, bad_idx = [], []
for k, v in self._get_cost(conf).items():
# Only consider passed insts
if k not in insts:
continue
# Append inst-idx either to good or to bad
if self.algo_labels[conf][k] == 0:
bad_idx.append(self.insts.index(k))
else:
good_idx.append(self.insts.index(k))
assert(len(good_idx) == len(set(good_idx)))
assert(len(bad_idx) == len(set(bad_idx)))
good_idx, bad_idx = np.array(good_idx), np.array(bad_idx)
self.logger.debug("for config %s good: %d, bad: %d",
self.algo_name[conf], len(good_idx), len(bad_idx))
return (good_idx, bad_idx)
[docs] def plot_interactive_footprint(self):
"""Use bokeh to create an interactive algorithm footprint with zoom and
hover tooltips. Should avoid problems with overplotting (since we can
zoom) and provide better information about instances."""
features = np.array(self.features_2d)
instances = self.insts
runhistory = self.rh
algo = {v: k for k, v in self.algo_name.items()}
incumbent = algo['incumbent']
default = algo['default']
source = ColumnDataSource(data=dict(x=features[:, 0], y=features[:, 1]))
# Add all necessary information for incumbent and default
source.add(instances, 'instance_name')
instance_set = ['train' if i in self.train_feats.keys() else 'test' for i in instances]
source.add(instance_set, 'instance_set') # train or test
for config, name in [(incumbent, 'incumbent'), (default, 'default')]:
cost = get_cost_dict_for_config(runhistory, config)
source.add([cost[i] for i in instances], '{}_cost'.format(name))
# TODO should be in function
good, bad = self._get_good_bad(config)
color = [1 if idx in good else 0 for idx, i in enumerate(instances)]
# TODO end
color = ['blue' if c else 'red' for c in color]
self.logger.debug("%s colors: %s", name, str(color))
source.add(color, '{}_color'.format(name))
source.add(source.data['default_color'], 'color')
# Define what appears in tooltips
hover = HoverTool(tooltips=[('instance name', '@instance_name'),
('def cost', '@default_cost'),
('inc_cost', '@incumbent_cost'),
('set', '@instance_set'),
])
# Add radio-button
def_inc_callback = CustomJS(args=dict(source=source), code="""
var data = source.data;
if (cb_obj.active == 0) {
data['color'] = data['default_color'];
} else {
data['color'] = data['incumbent_color'];
}
source.change.emit();
""")
def_inc_radio_button = RadioButtonGroup(
labels=["default", "incumbent"], active=0,
callback=def_inc_callback)
# Plot
x_range = DataRange1d(bounds='auto', start=min(features[:, 0]) - 1, end=max(features[:, 0]) + 1)
y_range = DataRange1d(bounds='auto', start=min(features[:, 1]) - 1, end=max(features[:, 1]) + 1)
p = figure(plot_height=500, plot_width=600,
tools=[hover, 'save', 'wheel_zoom', 'box_zoom', 'pan', 'reset'], active_drag='box_zoom',
x_range=x_range, y_range=y_range)
# Scatter train and test individually to toggle them
train_view = CDSView(source=source, filters=[GroupFilter(column_name='instance_set', group='train')])
test_view = CDSView(source=source, filters=[GroupFilter(column_name='instance_set', group='test')])
train = p.scatter(x='x', y='y', source=source, view=train_view, color='color')
test = p.scatter(x='x', y='y', source=source, view=test_view, color='color')
p.xaxis.axis_label, p.yaxis.axis_label = 'principal component 1', 'principal component 2'
p.xaxis.axis_label_text_font_size = p.yaxis.axis_label_text_font_size = "15pt"
train_test_callback = CustomJS(args=dict(source=source, train_view=train, test_view=test), code="""
var data = source.data;
if (cb_obj.active == 0) {
train_view.visible = true;
test_view.visible = true;
} else if (cb_obj.active == 1) {
train_view.visible = true;
test_view.visible = false;
} else {
train_view.visible = false;
test_view.visible = true;
}
""")
train_test_radio_button = RadioButtonGroup(
labels=["all", "train", "test"], active=0,
callback=train_test_callback)
# Export and return
if self.output_dir:
path = os.path.join(self.output_dir, "content/images/algorithm_footprint.png")
export_bokeh(p, path, self.logger)
layout = column(p, row(widgetbox(def_inc_radio_button), widgetbox(train_test_radio_button)))
return layout
[docs] def plot3d(self):
""" Plot 3d-version of the algorithm footprint from four different
angles. """
plots = []
for a in self.algorithms:
# Plot without clustering (for all insts)
out_fns = [os.path.join(self.output_dir, 'footprint_' +
self.algo_name[a] + '_3d_{}.png'.format(i)) for i in range(4)]
self.logger.debug("Plot saved to '%s'", out_fns)
fig, ax = plt.subplots()
good_idx, bad_idx = self._get_good_bad(a)
good = np.array([self.features_3d[idx] for idx in good_idx])
bad = np.array([self.features_3d[idx] for idx in bad_idx])
axes = {0: 'principal component 1',
1: 'principal component 2',
2: 'principal component 3'}
for out_fn, axes_ordered in zip(out_fns, list(itertools.permutations([0, 1, 2]))[:len(out_fns)]):
# Plot 3d
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
x, y, z = axes_ordered
if len(good) > 0:
ax.scatter(xs=good[:, x], ys=good[:, y], zs=good[:, z], color="blue")
if len(bad) > 0:
ax.scatter(xs=bad[:, x], ys=bad[:, y], zs=bad[:, z], color="red")
ax.set_xlabel(axes[x], fontsize=12)
ax.set_ylabel(axes[y], fontsize=12)
ax.set_zlabel(axes[z], fontsize=12)
plt.tight_layout()
# for out_fn, angle in zip(out_fns, range(20, 381, 90)):
# ax.view_init(30, angle)
fig.savefig(out_fn)
plt.close(fig)
plots.append(out_fns)
return plots
# -~-~- CLUSTER
[docs] def plot_points_per_cluster(self):
""" Plot good versus bad for passed config per cluster.
Parameters
----------
conf: Configuration
configuration for which to plot good vs bad
out: str
output path
Returns
-------
outpaths: List[str]
output paths per cluster
"""
# For Development/Debug:
algo_fp_debug = os.path.join(self.output_dir, 'debug', 'algo_fp')
if not os.path.exists(algo_fp_debug):
os.makedirs(algo_fp_debug)
for e in np.hstack([np.arange(0.0, 1.0, .95), np.arange(0.96, 1.0, 0.02)]):
self._label_instances(e)
for a in self.algorithms:
# Plot without clustering (for all insts)
suffix = 'all_{:4.3f}.png'.format(e)
path = os.path.join(algo_fp_debug,
'_'.join([self.algo_name[a], suffix]))
path = self.plot2d(a, path)
self.logger.debug("Plot saved to '%s'", path)
self._label_instances()
for c in self.cluster_dict.keys():
# Plot per cluster
path = os.path.join(algo_fp_debug, 'cluster_' + str(c) + '_fp_' +
self.algo_name[a] + '_0.95.png')
path = self.plot2d(a, path, self.cluster_dict[c])
[docs] def get_clusters(self, features_2d):
""" Mapping instances to clusters, using silhouette-scores to determine
number of cluster.
Returns
-------
paths: List[str]
paths to plots
"""
# get silhouette scores for k_means with 2 to 12 clusters
# use number of clusters with highest silhouette score
best_score, best_n_clusters = -1, -1
min_clusters, max_clusters = 2, min(features_2d.shape[0], 12)
clusters = None
for n_clusters in range(min_clusters, max_clusters):
km = KMeans(n_clusters=n_clusters)
y_pred = km.fit_predict(features_2d)
score = silhouette_score(features_2d, y_pred)
if score > best_score:
best_n_clusters = n_clusters
best_score = score
clusters = y_pred
self.logger.debug("%d clusters detected using silhouette scores",
best_n_clusters)
cluster_dict = {n: [] for n in range(best_n_clusters)}
for i, c in enumerate(clusters):
cluster_dict[c].append(self.insts[i])
self.logger.debug("Distribution over clusters: %s",
str({k: len(v) for k, v in cluster_dict.items()}))
return clusters, cluster_dict