"""Helper functions for evaluation of standard concept embedding analysis experiments.
Experiment results are assumed to have a folder structure of
For details see the respective sample Sacred experiment scripts.
# Copyright (c) 2022 Continental Automotive GmbH
import os
from typing import Callable, List, Sequence, Optional, Dict, Union, Mapping, \
Iterable, Tuple
import PIL.Image
import pandas as pd
import torchvision as tv
from matplotlib import pyplot as plt
from pandas.io.formats.style import Styler
# pylint: disable=import-error,wrong-import-position
from hybrid_learning.concepts.analysis import AnalysisResult, \
from hybrid_learning.concepts.models import ConceptEmbedding
to_img: Callable = tv.transforms.ToPILImage()
to_tens: Callable = tv.transforms.ToTensor()
# ======================================
# ======================================
# ALexNet
ALEXNET_LAYERS: Tuple[str, ...] = (
'features.1', 'features.2', 'features.4', 'features.5', 'features.7',
'features.9', 'features.11', 'features.12', 'avgpool')
"""Layer IDs of pytorch AlexNet model in correct order."""
# VGG16
VGG16_LAYERS: Tuple[str, ...] = (
'features.4', 'features.6', 'features.8', 'features.9', 'features.11',
'features.13', 'features.15', 'features.16', 'features.18', 'features.20',
'features.22', 'features.23', 'features.25', 'features.27', 'features.29',
'features.30', 'avgpool')
"""Layer IDs of pytorch VGG16 model in correct order."""
# Mask R-CNN
MASK_RCNN_LAYERS: Tuple[str, ...] = (
'backbone.body.relu', 'backbone.body.maxpool', 'backbone.body.layer1',
'backbone.body.layer1.0.bn1', 'backbone.body.layer1.0.bn2',
'backbone.body.layer1.0.relu', 'backbone.body.layer1.0',
'backbone.body.layer1.1.bn1', 'backbone.body.layer1.1.bn2',
'backbone.body.layer1.1.relu', 'backbone.body.layer1.1',
'backbone.body.layer1.2.bn1', 'backbone.body.layer1.2.bn2',
'backbone.body.layer1.2.relu', 'backbone.body.layer1.2',
'backbone.body.layer2', 'backbone.body.layer2.0.bn1',
'backbone.body.layer2.0.bn2', 'backbone.body.layer2.0.relu',
'backbone.body.layer2.0', 'backbone.body.layer2.1.bn1',
'backbone.body.layer2.1.bn2', 'backbone.body.layer2.1.relu',
'backbone.body.layer2.1', 'backbone.body.layer2.2.bn1',
'backbone.body.layer2.2.bn2', 'backbone.body.layer2.2.relu',
'backbone.body.layer2.2', 'backbone.body.layer2.3.bn1',
'backbone.body.layer2.3.bn2', 'backbone.body.layer2.3.relu',
'backbone.body.layer2.3', 'backbone.body.layer3',
'backbone.body.layer3.0.bn1', 'backbone.body.layer3.0.bn2',
'backbone.body.layer3.0.relu', 'backbone.body.layer3.0',
'backbone.body.layer3.1.bn1', 'backbone.body.layer3.1.bn2',
'backbone.body.layer3.1.relu', 'backbone.body.layer3.1',
'backbone.body.layer3.2.bn1', 'backbone.body.layer3.2.bn2',
'backbone.body.layer3.2.relu', 'backbone.body.layer3.2',
'backbone.body.layer3.3.bn1', 'backbone.body.layer3.3.bn2',
'backbone.body.layer3.3.relu', 'backbone.body.layer3.3',
'backbone.body.layer3.4.bn1', 'backbone.body.layer3.4.bn2',
'backbone.body.layer3.4.relu', 'backbone.body.layer3.4',
'backbone.body.layer3.5.bn1', 'backbone.body.layer3.5.bn2',
'backbone.body.layer3.5.relu', 'backbone.body.layer3.5',
'backbone.body.layer4', 'backbone.body.layer4.0.bn1',
'backbone.body.layer4.0.bn2', 'backbone.body.layer4.0.relu',
'backbone.body.layer4.0', 'backbone.body.layer4.1.bn1',
'backbone.body.layer4.1.bn2', 'backbone.body.layer4.1.relu',
'backbone.body.layer4.1', 'backbone.body.layer4.2.bn1',
'backbone.body.layer4.2.bn2', 'backbone.body.layer4.2.relu',
'backbone.body.layer4.2', 'backbone.body.layer1.0.downsample',
'backbone.body.layer2.0.downsample', 'backbone.body.layer3.0.downsample',
'backbone.body.layer4.0.downsample', 'backbone.fpn',
'backbone.fpn.inner_blocks.0', 'backbone.fpn.inner_blocks.1',
'backbone.fpn.inner_blocks.2', 'backbone.fpn.inner_blocks.3',
'backbone.fpn.layer_blocks.0', 'backbone.fpn.layer_blocks.1',
'backbone.fpn.layer_blocks.2', 'backbone.fpn.layer_blocks.3',
"""Layer IDs of pytorch Mask R-CNN model in correct order."""
# ======================================
# ======================================
def _experiments(layer_id: str, root: str) -> List[str]:
"""List of experiment folders for given layer and root.
Ones starting with underscore are excluded."""
layer_root = os.path.join(root, "layers", layer_id)
return [os.path.join(layer_root, exp) for exp in os.listdir(layer_root)
if not exp.startswith('_')]
[docs]def analysis_root(layer_id: str, concept_name: str, root: str) -> str:
"""Get the analysis root folder path for given
``root``, ``layer_id``, and ``concept_name``."""
exps: List[str] = [exp for exp in _experiments(layer_id=layer_id, root=root)
if os.path.isdir(os.path.join(exp, concept_name))]
if len(exps) == 0:
raise ValueError(
"Concept results not found for concept {} and layer {} in root {}"
.format(concept_name, layer_id, root))
if len(exps) > 1:
raise ValueError(
"Ambiguous results for concept {}: Found experiments {}"
[os.path.join(exp, concept_name) for exp in exps]))
concept_root = os.path.join(exps[0], concept_name)
return concept_root
[docs]def get_layers(root: str, model_layers: List[str] = None,
verbose: bool = False) -> List[str]:
"""Return all available layers for the given analysis root."""
available_layers = sorted(os.listdir(os.path.join(root, "layers")))
model_layers = model_layers or available_layers
layers = [l for l in model_layers if l in available_layers]
if verbose and len(layers) != len(model_layers):
print("WARNING: Skipping non-available layers {}".format(
[l for l in model_layers if l not in available_layers]))
return layers
[docs]def get_concepts(root: str, layers: List[str] = None) -> List[str]:
"""Get all the concept names available for the given experiment root."""
layers = layers or get_layers(root)
return sorted({c for layer in layers
for exp in _experiments(layer, root=root)
for c in os.listdir(exp)})
[docs]def get_common_concepts(root: str, layers: List[str] = None) -> List[str]:
"""Get all the concept names available for ALL layers in the
given experiment ``root``."""
layers = layers or get_layers(root)
return sorted(set.intersection(*[
set(concept for exp in _experiments(layer, root=root)
for concept in os.listdir(exp))
for layer in layers]))
# ======================================
# ======================================
[docs]def get_stats(root: str) -> pd.DataFrame:
"""Stats of an analysis run as DataFrame."""
return AnalysisResult.load(root).to_pandas()
[docs]def get_best_emb_stats(root: str) -> pd.DataFrame:
"""Stats of the best embeddings of an analysis as DataFrame."""
return BestEmbeddingResult.load(root).to_pandas()
[docs]def get_embs(root: str) -> pd.Series:
"""Embeddings of an analysis as series indexed by ``layer_id``.
Each embedding is encapsulated in a 1-element list to keep pandas
analysis_results = AnalysisResult.load(root).results
return pd.Series({(layer, run): [emb]
for layer, layer_stats in analysis_results.items()
for run, (emb, s) in layer_stats.items()}).transpose()
[docs]def get_best_emb(layer_id: str, root: str) -> ConceptEmbedding:
"""Best embedding for given layer."""
best_emb_result = BestEmbeddingResult.load(root)
return best_emb_result.results[layer_id][0]
[docs]def get_vis_best_embedding(root: str) -> PIL.Image.Image:
"""Get the saved visualization of the best embedding."""
return PIL.Image.open(os.path.join(root, "vis_best_embedding.png"))
[docs]def get_all_best_emb_stats(root: str, layers: List[str],
concepts: List[str] = None) -> pd.DataFrame:
"""Gather a DataFrame with the stats of all best embeddings.
The frame is indexed by ``concept`` and ``layer``, and columns are the
stats names.
Stats are retrieved from experiment ``root``, for given ``layers`` and
concepts = concepts or get_concepts(root, layers=layers)
best_emb_stats = pd.DataFrame(
{(concept, layer): get_best_emb_stats(
analysis_root(layer, concept, root=root)).loc[layer]
for layer in layers for concept in concepts
if os.path.isdir(analysis_root(layer, concept, root=root))}
best_emb_stats.index.names = ['concept', 'layer']
return best_emb_stats
[docs]def get_all_stats(root: str, layers: List[str],
concepts: List[str] = None) -> pd.DataFrame:
"""Gather a DataFrame with the stats of all embedding runs.
Same as ``get_all_best_emb_stats``, only indexed by
``concept``, ``layer``, *and* ``run``."""
concepts = concepts or get_concepts(root, layers=layers)
stats_dict = {}
for layer in layers:
for concept in [c for c in concepts if
os.path.isdir(analysis_root(layer, c, root=root))]:
results = get_stats(analysis_root(layer, concept, root=root))
{(concept, layer, run): results.loc[(layer, run)] for run in
stats = pd.DataFrame(stats_dict).transpose().infer_objects()
stats.index.names = ['concept', 'layer', 'run']
return stats
[docs]def gather_stats(roots: Union[List[str], Dict[str, str]] = None,
root_templ: str = None,
root_params: Iterable[Union[str, Sequence[str]]] = None,
model_layers: List[str] = None,
metric: str = 'set_iou'
"""Gather dicts with all_stats, best_emb_stats, and merged_stats for
different roots."""
# Prepare roots:
if roots is not None:
if not isinstance(roots, Mapping):
roots: Dict[str, str] = {
os.path.basename(os.path.dirname(root)): root
for root in roots}
elif root_templ is None or root_params is None:
raise ValueError("Either root, or root_templ and root_params must be "
root_params = [[p] if isinstance(p, str) else p for p in root_params]
roots = {"_".join(param): root_templ.format(*param)
for param in root_params}
for root in roots.values():
assert os.path.isdir(root), root
all_stats, best_stats, merged_stats = dict(), dict(), dict()
for graph_name, root in roots.items():
layers = get_layers(root, model_layers=model_layers)
all_stats[graph_name] = get_all_stats(root=root, layers=layers)
best_stats[graph_name] = get_all_best_emb_stats(root=root,
merged_stats[graph_name] = merge_to_overview(best_stats[graph_name],
return all_stats, best_stats, merged_stats
# ======================================
# ======================================
[docs]def highlight_max_blue(series: pd.Series) -> List[str]:
"""Highlight the maximum in a Series with red font color."""
return ['color: blue; font-weight: bold' if v else ''
for v in series == series.max()]
[docs]def merge_to_overview(best_ious: pd.DataFrame, all_stats: pd.DataFrame,
metric: str = 'set_iou', layers: List[str] = None):
"""Merge the stats from all runs and from the best embeddings
to an overview over the test values of ``metric``.
The resulting DataFrame is indexed by the ``layer_id``, and columns are a
multi-index of ``(concept_name, stats_name)``, with ``stats_name`` one of
- ``best_emb``: the best embedding performance
- ``mean``: the mean performance of the runs for that concept & layer
- ``std``: the corresponding standard deviation
:param all_stats: should an output of ``get_all_stats``
:param best_ious: should an output of ``get_all_best_emb_stats``
:param metric: the (column) name of the metric to use
:param layers: optionally restrict to given layers
if all_stats.index.names != ['layer', 'concept']:
all_stats = all_stats.set_index(['layer', 'concept'])
if best_ious.index.names != ['layer', 'concept']:
best_ious = best_ious.set_index(['layer', 'concept'])
layers = layers or all_stats.index.get_level_values('layer').unique()
best_set_iou_display = pd.concat({
'best_emb': best_ious[['test_' + metric]],
'mean': all_stats[['test_' + metric]].mean(level=[0, 1]),
'std': all_stats[['test_' + metric]].std(level=[0, 1]),
}, axis=1) \
.T.reset_index(level=1, drop=True).T \
.unstack(level=1) \
.swaplevel(0, 1, 1).sort_index(1) \
return best_set_iou_display
[docs]def display_overview(best_set_iou_display: pd.DataFrame) -> Styler:
"""Provide a nice display style of ``best_set_iou_display``.
E.g. highlight maxima, apply color gradients, and take care of precision.
:param best_set_iou_display:
best_emb_cols = best_set_iou_display.loc[:,
(slice(None), 'best_emb')].columns.unique()
std_cols = best_set_iou_display.loc[:, (slice(None), 'std')] \
mean_cols = best_set_iou_display.loc[:, (slice(None), 'mean')] \
return best_set_iou_display.style \
.set_caption("Set IoU values by layer and concept (both the set IoU of "
"the mean embedding and the mean set IoU with standard "
"deviation)") \
.set_precision(4) \
.background_gradient(cmap='Greens', axis=0, subset=best_emb_cols,
vmin=0, vmax=0.5) \
subset=list(best_emb_cols) + list(mean_cols)) \
.format({col: "±{:.4f}" for col in std_cols})
# ======================================
# ======================================
[docs]def plot_best_ious_wt_std(best_emb_ious: pd.DataFrame,
iou_stds: pd.DataFrame, save_as: str = None,
max_val: float = None, metric: str = None,
"""Provide a plot of metric values against layers for all concepts in
Plotted points are the best embedding performances, the plotted standard
deviation is that of the runs. Optionally save the figure."""
plot_args = {**dict(capsize=3, rot=90,
xticks=range(len(best_emb_ious.index))), **plot_args}
layers: Sequence[str] = best_emb_ious.index
max_val = max_val if max_val is not None else \
best_emb_ious.max().max() + 0.01
plt.figure(figsize=(len(layers) * 0.5, max_val * 8))
plt.title("Embedding performance")
axis: Optional[plt.Axes] = None
for concept in best_emb_ious.columns.get_level_values(0).unique():
axis = best_emb_ious[(concept, 'best_emb')] \
.plot(yerr=iou_stds[concept, 'std'], **plot_args)
plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5))
if metric is not None:
axis.set_ylabel(metric.replace("_", " "))
if save_as is not None:
plt.savefig(save_as, bbox_inches='tight')
[docs]def plot_best_iou_comparison(metric_mean_map: Dict[str, pd.DataFrame],
metric_std_map: Dict[str, pd.DataFrame],
concept: str = None,
max_val: float = None,
save_as: str = None,
metric: str = 'set_iou',
axis: plt.Axes = None,
set_title: bool = True,
"""Provide a plot of metric values against layers for one concept and all
settings in ``best_emb_ious``.
Plotted points are the best embedding performances, the plotted standard
deviation is that of the runs. Optionally save the figure."""
layers: Sequence[str] = list(metric_mean_map.values())[0].index
concept: str = concept or list(metric_mean_map.values())[0] \
max_val: float = max(max_val or 0,
max(ious.max().max() + 0.01 for ious in
fig = None
if axis is None:
fig = plt.figure(figsize=(len(layers) * 0.5, max_val * 12))
if set_title:
if set_title:
for graph_name, best_emb_ious in metric_mean_map.items():
values: pd.Series = pd.Series(best_emb_ious[(concept, 'best_emb')],
# Make sure layers are all aligned!!
values.sort_index(key=lambda idx: [list(layers).index(i) for i in idx],
values.name = " ".join(str(values.name).split("_"))
plot_args = {**dict(rot=90, capsize=3, xticks=range(len(values.index))),
axis = values.plot(yerr=metric_std_map[graph_name][concept, 'std'],
ax=axis, **plot_args)
axis.set_ylabel(metric.replace("_", " "))
if fig is not None:
axis.legend(loc='center left', bbox_to_anchor=(1.0, 0.5))
if save_as is not None:
plt.savefig(save_as, bbox_inches='tight')
[docs]def plot_overview(best_set_iou_display: pd.DataFrame,
model_name: str = None, metric: str = 'set_iou',
max_val: float = None, save_as: str = None,
"""Plot and save the output of ``merge_to_overview`` using
:param best_set_iou_display: DataFrame in the format as provided by
:param model_name: used to determine default for save_as
:param metric: used to determine default for save_as
:param max_val: see ``plot_best_ious_wt_std``
:param save_as: the filename to save plot as; no saving if set to ``False``;
if a directory or None, a default file name is used (under directory);
if ends with ``.svg``, both an SVG and a PNG image are saved
# Default save position
root = None
if save_as and os.path.isdir(save_as):
root, save_as = save_as, None
model_name = None if root is None else model_name or os.path.basename(root)
if save_as is None:
save_as: Optional[str] = None if root is None else \
os.path.join(root, f"{model_name}_{metric}.svg")
save_as = save_as or None
best_emb_ious=best_set_iou_display.loc[:, (slice(None), 'best_emb')],
iou_stds=best_set_iou_display.loc[:, (slice(None), 'std')],
save_as=save_as, max_val=max_val, metric=metric, **plot_args
best_emb_ious=best_set_iou_display.loc[:, (slice(None), 'best_emb')],
iou_stds=best_set_iou_display.loc[:, (slice(None), 'std')],
save_as=save_as.replace(".svg", ".png") if save_as else None,
max_val=max_val, metric=metric, **plot_args
[docs]def plot_overview_for_concept(merged_stats_map: Dict[str, pd.DataFrame],
concepts: Union[str, Iterable[str]],
max_val: float = None,
save_as: str = None, model_name: str = None,
metric: str = 'set_iou',
one_plot: bool = True, legend_in: int = -1,
fig_size: Tuple[float, float] = None,
axes: Optional[Sequence[plt.Axes]] = None,
**plot_args): # pylint: disable=too-many-branches
"""Plot and save the output of ``merge_to_overview`` using
:param merged_stats_map: dict with values the output of merge_to_overview;
each value will get its own graph
:param concepts: the concept or concepts to plot
:param max_val: the common x-axis limit for all created plots;
defaults to the maximum metric value for a concept
:param save_as: an optional save location;
must contain ``{concept}`` if ``one_plot`` is not ``True``;
if a directory is given, the file name is auto-inferred from the
other arguments
:param model_name: for file name auto-inference
:param metric: for axis label and file name auto-inference
:param one_plot: whether to put all into one common plot
(save_as must not contain formatting string for concept)
:param legend_in: if ``one_plot``, index of the axis the legend should be
placed in; if set to None, legend is placed outside of plot
:param axes: plot into given axes
:param fig_size: if ``one_plot``, used ``fig_size``
assert axes is None or save_as is None
# Collect data
metric_mean_map, metric_std_map = dict(), dict()
for graph_name, merged_stats in merged_stats_map.items():
metric_mean_map[graph_name] = \
merged_stats.loc[:, (slice(None), 'best_emb')]
metric_std_map[graph_name] = \
merged_stats.loc[:, (slice(None), 'std')]
max_val_by_concept: Dict[str, float] = {
concept: max(max_val or 0,
max(vals.loc[:, (concept, 'best_emb')].max().max() + 0.01
for vals in metric_mean_map.values()))
for concept in concepts}
if save_as is not None and os.path.isdir(save_as):
save_as: str = os.path.join(
save_as, f"{model_name or ''}_{metric or ''}"
f"{'' if one_plot else '_{concept}'}.svg")
if one_plot and axes is None:
layers: Sequence[str] = list(metric_mean_map.values())[0].index
max_val: float = max(max_val_by_concept.values())
max_val_by_concept: Dict[str, float] = {c: max_val for c in concepts}
_, axes = plt.subplots(
1, len(concepts), sharey='all',
figsize=fig_size or
(len(layers) * len(concepts) * 0.7, max_val * 10))
if isinstance(axes, plt.Axes):
axes: Sequence[plt.Axes] = [axes]
for i, concept in enumerate(concepts):
assert all(concept in means.columns.get_level_values('concept')
for means in metric_mean_map.values())
assert all(concept in stds.columns.get_level_values('concept')
for stds in metric_std_map.values())
max_val: float = max_val_by_concept[concept]
curr_save_as: Optional[str] = save_as.format(concept=concept) \
if save_as is not None and not one_plot else None
axis: Optional[plt.Axes] = axes[i] if axes is not None else None
if curr_save_as is not None and curr_save_as.endswith('.svg'):
concept=concept, max_val=max_val, metric=metric,
save_as=(curr_save_as.replace('.svg', '.png') if curr_save_as
else None),
axis=axis, **plot_args
if not axis:
concept=concept, max_val=max_val, metric=metric,
axis=axis, **plot_args
if not axis:
if one_plot:
# handles, labels = axes[-1].get_legend_handles_labels()
# fig.legend(handles, labels, loc='center left', bbox_to_anchor=(.9,0.5))
if legend_in is not None:
axes[-1].legend(loc='center left', bbox_to_anchor=(1.0, 0.5))
if save_as is not None:
if save_as.endswith('.svg'):
plt.savefig(save_as.replace('.svg', '.png'),
plt.savefig(save_as, bbox_inches='tight')