Source code for hybrid_learning.experimentation.ca_exp_eval

"""Helper functions for evaluation of standard concept embedding analysis experiments.
Experiment results are assumed to have a folder structure of
``<root>/layers/<layer_id>/<timestamp>/<concept_name>``.
For details see the respective sample Sacred experiment scripts.
"""
#  Copyright (c) 2022 Continental Automotive GmbH

import os
from typing import Callable, List, Sequence, Optional, Dict, Union, Mapping, \
    Iterable, Tuple

import PIL.Image
import pandas as pd
import torchvision as tv
from matplotlib import pyplot as plt
from pandas.io.formats.style import Styler

# pylint: disable=import-error,wrong-import-position
from hybrid_learning.concepts.analysis import AnalysisResult, \
    BestEmbeddingResult
from hybrid_learning.concepts.models import ConceptEmbedding

to_img: Callable = tv.transforms.ToPILImage()
to_tens: Callable = tv.transforms.ToTensor()

# ======================================
# DEFAULT MODEL LAYERS
# ======================================

# ALexNet
ALEXNET_LAYERS: Tuple[str, ...] = (
    'features.1', 'features.2', 'features.4', 'features.5', 'features.7',
    'features.9', 'features.11', 'features.12', 'avgpool')
"""Layer IDs of pytorch AlexNet model in correct order."""

# VGG16
VGG16_LAYERS: Tuple[str, ...] = (
    'features.4', 'features.6', 'features.8', 'features.9', 'features.11',
    'features.13', 'features.15', 'features.16', 'features.18', 'features.20',
    'features.22', 'features.23', 'features.25', 'features.27', 'features.29',
    'features.30', 'avgpool')
"""Layer IDs of pytorch VGG16 model in correct order."""

# Mask R-CNN
MASK_RCNN_LAYERS: Tuple[str, ...] = (
    'backbone.body.relu', 'backbone.body.maxpool', 'backbone.body.layer1',
    'backbone.body.layer1.0.bn1', 'backbone.body.layer1.0.bn2',
    'backbone.body.layer1.0.relu', 'backbone.body.layer1.0',
    'backbone.body.layer1.1.bn1', 'backbone.body.layer1.1.bn2',
    'backbone.body.layer1.1.relu', 'backbone.body.layer1.1',
    'backbone.body.layer1.2.bn1', 'backbone.body.layer1.2.bn2',
    'backbone.body.layer1.2.relu', 'backbone.body.layer1.2',
    'backbone.body.layer2', 'backbone.body.layer2.0.bn1',
    'backbone.body.layer2.0.bn2', 'backbone.body.layer2.0.relu',
    'backbone.body.layer2.0', 'backbone.body.layer2.1.bn1',
    'backbone.body.layer2.1.bn2', 'backbone.body.layer2.1.relu',
    'backbone.body.layer2.1', 'backbone.body.layer2.2.bn1',
    'backbone.body.layer2.2.bn2', 'backbone.body.layer2.2.relu',
    'backbone.body.layer2.2', 'backbone.body.layer2.3.bn1',
    'backbone.body.layer2.3.bn2', 'backbone.body.layer2.3.relu',
    'backbone.body.layer2.3', 'backbone.body.layer3',
    'backbone.body.layer3.0.bn1', 'backbone.body.layer3.0.bn2',
    'backbone.body.layer3.0.relu', 'backbone.body.layer3.0',
    'backbone.body.layer3.1.bn1', 'backbone.body.layer3.1.bn2',
    'backbone.body.layer3.1.relu', 'backbone.body.layer3.1',
    'backbone.body.layer3.2.bn1', 'backbone.body.layer3.2.bn2',
    'backbone.body.layer3.2.relu', 'backbone.body.layer3.2',
    'backbone.body.layer3.3.bn1', 'backbone.body.layer3.3.bn2',
    'backbone.body.layer3.3.relu', 'backbone.body.layer3.3',
    'backbone.body.layer3.4.bn1', 'backbone.body.layer3.4.bn2',
    'backbone.body.layer3.4.relu', 'backbone.body.layer3.4',
    'backbone.body.layer3.5.bn1', 'backbone.body.layer3.5.bn2',
    'backbone.body.layer3.5.relu', 'backbone.body.layer3.5',
    'backbone.body.layer4', 'backbone.body.layer4.0.bn1',
    'backbone.body.layer4.0.bn2', 'backbone.body.layer4.0.relu',
    'backbone.body.layer4.0', 'backbone.body.layer4.1.bn1',
    'backbone.body.layer4.1.bn2', 'backbone.body.layer4.1.relu',
    'backbone.body.layer4.1', 'backbone.body.layer4.2.bn1',
    'backbone.body.layer4.2.bn2', 'backbone.body.layer4.2.relu',
    'backbone.body.layer4.2', 'backbone.body.layer1.0.downsample',
    'backbone.body.layer2.0.downsample', 'backbone.body.layer3.0.downsample',
    'backbone.body.layer4.0.downsample', 'backbone.fpn',
    'backbone.fpn.inner_blocks.0', 'backbone.fpn.inner_blocks.1',
    'backbone.fpn.inner_blocks.2', 'backbone.fpn.inner_blocks.3',
    'backbone.fpn.layer_blocks.0', 'backbone.fpn.layer_blocks.1',
    'backbone.fpn.layer_blocks.2', 'backbone.fpn.layer_blocks.3',
    'backbone.fpn.extra_blocks')
"""Layer IDs of pytorch Mask R-CNN model in correct order."""


# ======================================
# FILEPATHS
# ======================================


def _experiments(layer_id: str, root: str) -> List[str]:
    """List of experiment folders for given layer and root.
    Ones starting with underscore are excluded."""
    layer_root = os.path.join(root, "layers", layer_id)
    return [os.path.join(layer_root, exp) for exp in os.listdir(layer_root)
            if not exp.startswith('_')]


[docs]def analysis_root(layer_id: str, concept_name: str, root: str) -> str: """Get the analysis root folder path for given ``root``, ``layer_id``, and ``concept_name``.""" exps: List[str] = [exp for exp in _experiments(layer_id=layer_id, root=root) if os.path.isdir(os.path.join(exp, concept_name))] if len(exps) == 0: raise ValueError( "Concept results not found for concept {} and layer {} in root {}" .format(concept_name, layer_id, root)) if len(exps) > 1: raise ValueError( "Ambiguous results for concept {}: Found experiments {}" .format(concept_name, [os.path.join(exp, concept_name) for exp in exps])) concept_root = os.path.join(exps[0], concept_name) return concept_root
[docs]def get_layers(root: str, model_layers: List[str] = None, verbose: bool = False) -> List[str]: """Return all available layers for the given analysis root.""" available_layers = sorted(os.listdir(os.path.join(root, "layers"))) model_layers = model_layers or available_layers layers = [l for l in model_layers if l in available_layers] if verbose and len(layers) != len(model_layers): print("WARNING: Skipping non-available layers {}".format( [l for l in model_layers if l not in available_layers])) return layers
[docs]def get_concepts(root: str, layers: List[str] = None) -> List[str]: """Get all the concept names available for the given experiment root.""" layers = layers or get_layers(root) return sorted({c for layer in layers for exp in _experiments(layer, root=root) for c in os.listdir(exp)})
[docs]def get_common_concepts(root: str, layers: List[str] = None) -> List[str]: """Get all the concept names available for ALL layers in the given experiment ``root``.""" layers = layers or get_layers(root) return sorted(set.intersection(*[ set(concept for exp in _experiments(layer, root=root) for concept in os.listdir(exp)) for layer in layers]))
# ====================================== # RESULTS RETRIEVAL # ======================================
[docs]def get_stats(root: str) -> pd.DataFrame: """Stats of an analysis run as DataFrame.""" return AnalysisResult.load(root).to_pandas()
[docs]def get_best_emb_stats(root: str) -> pd.DataFrame: """Stats of the best embeddings of an analysis as DataFrame.""" return BestEmbeddingResult.load(root).to_pandas()
[docs]def get_embs(root: str) -> pd.Series: """Embeddings of an analysis as series indexed by ``layer_id``. Each embedding is encapsulated in a 1-element list to keep pandas working.""" analysis_results = AnalysisResult.load(root).results return pd.Series({(layer, run): [emb] for layer, layer_stats in analysis_results.items() for run, (emb, s) in layer_stats.items()}).transpose()
[docs]def get_best_emb(layer_id: str, root: str) -> ConceptEmbedding: """Best embedding for given layer.""" best_emb_result = BestEmbeddingResult.load(root) return best_emb_result.results[layer_id][0]
[docs]def get_vis_best_embedding(root: str) -> PIL.Image.Image: """Get the saved visualization of the best embedding.""" return PIL.Image.open(os.path.join(root, "vis_best_embedding.png"))
[docs]def get_all_best_emb_stats(root: str, layers: List[str], concepts: List[str] = None) -> pd.DataFrame: """Gather a DataFrame with the stats of all best embeddings. The frame is indexed by ``concept`` and ``layer``, and columns are the stats names. Stats are retrieved from experiment ``root``, for given ``layers`` and ``concepts``.""" concepts = concepts or get_concepts(root, layers=layers) best_emb_stats = pd.DataFrame( {(concept, layer): get_best_emb_stats( analysis_root(layer, concept, root=root)).loc[layer] for layer in layers for concept in concepts if os.path.isdir(analysis_root(layer, concept, root=root))} ).transpose().infer_objects() best_emb_stats.index.names = ['concept', 'layer'] best_emb_stats.reset_index(inplace=True) return best_emb_stats
[docs]def get_all_stats(root: str, layers: List[str], concepts: List[str] = None) -> pd.DataFrame: """Gather a DataFrame with the stats of all embedding runs. Same as ``get_all_best_emb_stats``, only indexed by ``concept``, ``layer``, *and* ``run``.""" concepts = concepts or get_concepts(root, layers=layers) stats_dict = {} for layer in layers: for concept in [c for c in concepts if os.path.isdir(analysis_root(layer, c, root=root))]: results = get_stats(analysis_root(layer, concept, root=root)) stats_dict.update( {(concept, layer, run): results.loc[(layer, run)] for run in results.index.get_level_values(1).unique()}) stats = pd.DataFrame(stats_dict).transpose().infer_objects() stats.index.names = ['concept', 'layer', 'run'] stats.reset_index(inplace=True) return stats
[docs]def gather_stats(roots: Union[List[str], Dict[str, str]] = None, root_templ: str = None, root_params: Iterable[Union[str, Sequence[str]]] = None, model_layers: List[str] = None, metric: str = 'set_iou' ): """Gather dicts with all_stats, best_emb_stats, and merged_stats for different roots.""" # Prepare roots: if roots is not None: if not isinstance(roots, Mapping): roots: Dict[str, str] = { os.path.basename(os.path.dirname(root)): root for root in roots} elif root_templ is None or root_params is None: raise ValueError("Either root, or root_templ and root_params must be " "given") else: root_params = [[p] if isinstance(p, str) else p for p in root_params] roots = {"_".join(param): root_templ.format(*param) for param in root_params} for root in roots.values(): assert os.path.isdir(root), root all_stats, best_stats, merged_stats = dict(), dict(), dict() for graph_name, root in roots.items(): layers = get_layers(root, model_layers=model_layers) all_stats[graph_name] = get_all_stats(root=root, layers=layers) best_stats[graph_name] = get_all_best_emb_stats(root=root, layers=layers) merged_stats[graph_name] = merge_to_overview(best_stats[graph_name], all_stats[graph_name], metric=metric) return all_stats, best_stats, merged_stats
# ====================================== # PANDAS DISPLAY # ======================================
[docs]def highlight_max_blue(series: pd.Series) -> List[str]: """Highlight the maximum in a Series with red font color.""" return ['color: blue; font-weight: bold' if v else '' for v in series == series.max()]
[docs]def merge_to_overview(best_ious: pd.DataFrame, all_stats: pd.DataFrame, metric: str = 'set_iou', layers: List[str] = None): """Merge the stats from all runs and from the best embeddings to an overview over the test values of ``metric``. The resulting DataFrame is indexed by the ``layer_id``, and columns are a multi-index of ``(concept_name, stats_name)``, with ``stats_name`` one of - ``best_emb``: the best embedding performance - ``mean``: the mean performance of the runs for that concept & layer - ``std``: the corresponding standard deviation :param all_stats: should an output of ``get_all_stats`` :param best_ious: should an output of ``get_all_best_emb_stats`` :param metric: the (column) name of the metric to use :param layers: optionally restrict to given layers """ if all_stats.index.names != ['layer', 'concept']: all_stats = all_stats.set_index(['layer', 'concept']) if best_ious.index.names != ['layer', 'concept']: best_ious = best_ious.set_index(['layer', 'concept']) layers = layers or all_stats.index.get_level_values('layer').unique() best_set_iou_display = pd.concat({ 'best_emb': best_ious[['test_' + metric]], 'mean': all_stats[['test_' + metric]].mean(level=[0, 1]), 'std': all_stats[['test_' + metric]].std(level=[0, 1]), }, axis=1) \ .T.reset_index(level=1, drop=True).T \ .unstack(level=1) \ .swaplevel(0, 1, 1).sort_index(1) \ .loc[layers] return best_set_iou_display
[docs]def display_overview(best_set_iou_display: pd.DataFrame) -> Styler: """Provide a nice display style of ``best_set_iou_display``. E.g. highlight maxima, apply color gradients, and take care of precision. :param best_set_iou_display: """ best_emb_cols = best_set_iou_display.loc[:, (slice(None), 'best_emb')].columns.unique() std_cols = best_set_iou_display.loc[:, (slice(None), 'std')] \ .columns.unique() mean_cols = best_set_iou_display.loc[:, (slice(None), 'mean')] \ .columns.unique() return best_set_iou_display.style \ .set_caption("Set IoU values by layer and concept (both the set IoU of " "the mean embedding and the mean set IoU with standard " "deviation)") \ .set_precision(4) \ .background_gradient(cmap='Greens', axis=0, subset=best_emb_cols, vmin=0, vmax=0.5) \ .apply(highlight_max_blue, subset=list(best_emb_cols) + list(mean_cols)) \ .format({col: {:.4f}" for col in std_cols})
# ====================================== # PLOTTING # ======================================
[docs]def plot_best_ious_wt_std(best_emb_ious: pd.DataFrame, iou_stds: pd.DataFrame, save_as: str = None, max_val: float = None, metric: str = None, **plot_args): """Provide a plot of metric values against layers for all concepts in ``best_emb_ious``. Plotted points are the best embedding performances, the plotted standard deviation is that of the runs. Optionally save the figure.""" plot_args = {**dict(capsize=3, rot=90, xticks=range(len(best_emb_ious.index))), **plot_args} layers: Sequence[str] = best_emb_ious.index max_val = max_val if max_val is not None else \ best_emb_ious.max().max() + 0.01 plt.figure(figsize=(len(layers) * 0.5, max_val * 8)) plt.ylim(top=max_val) plt.title("Embedding performance") axis: Optional[plt.Axes] = None for concept in best_emb_ious.columns.get_level_values(0).unique(): axis = best_emb_ious[(concept, 'best_emb')] \ .plot(yerr=iou_stds[concept, 'std'], **plot_args) plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) axis.set_xticklabels(layers) if metric is not None: axis.set_ylabel(metric.replace("_", " ")) if save_as is not None: plt.savefig(save_as, bbox_inches='tight')
[docs]def plot_best_iou_comparison(metric_mean_map: Dict[str, pd.DataFrame], metric_std_map: Dict[str, pd.DataFrame], concept: str = None, max_val: float = None, save_as: str = None, metric: str = 'set_iou', axis: plt.Axes = None, set_title: bool = True, **plot_args): """Provide a plot of metric values against layers for one concept and all settings in ``best_emb_ious``. Plotted points are the best embedding performances, the plotted standard deviation is that of the runs. Optionally save the figure.""" layers: Sequence[str] = list(metric_mean_map.values())[0].index concept: str = concept or list(metric_mean_map.values())[0] \ .columns.get_level_values(0).unique()[0] max_val: float = max(max_val or 0, max(ious.max().max() + 0.01 for ious in metric_mean_map.values())) fig = None if axis is None: fig = plt.figure(figsize=(len(layers) * 0.5, max_val * 12)) if set_title: plt.title(concept) plt.ylim(top=max_val) else: if set_title: axis.set_title(concept) axis.set_ylim(top=max_val) for graph_name, best_emb_ious in metric_mean_map.items(): values: pd.Series = pd.Series(best_emb_ious[(concept, 'best_emb')], name=graph_name) # Make sure layers are all aligned!! values.sort_index(key=lambda idx: [list(layers).index(i) for i in idx], inplace=True) values.name = " ".join(str(values.name).split("_")) plot_args = {**dict(rot=90, capsize=3, xticks=range(len(values.index))), **plot_args} axis = values.plot(yerr=metric_std_map[graph_name][concept, 'std'], ax=axis, **plot_args) axis.set_xticklabels(layers) axis.set_ylabel(metric.replace("_", " ")) if fig is not None: axis.legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) if save_as is not None: plt.savefig(save_as, bbox_inches='tight')
[docs]def plot_overview(best_set_iou_display: pd.DataFrame, model_name: str = None, metric: str = 'set_iou', max_val: float = None, save_as: str = None, **plot_args): """Plot and save the output of ``merge_to_overview`` using ``plot_best_ious_wt_std``. :param best_set_iou_display: DataFrame in the format as provided by ``merge_to_overview`` :param model_name: used to determine default for save_as :param metric: used to determine default for save_as :param max_val: see ``plot_best_ious_wt_std`` :param save_as: the filename to save plot as; no saving if set to ``False``; if a directory or None, a default file name is used (under directory); if ends with ``.svg``, both an SVG and a PNG image are saved """ # Default save position root = None if save_as and os.path.isdir(save_as): root, save_as = save_as, None model_name = None if root is None else model_name or os.path.basename(root) if save_as is None: save_as: Optional[str] = None if root is None else \ os.path.join(root, f"{model_name}_{metric}.svg") save_as = save_as or None plot_best_ious_wt_std( best_emb_ious=best_set_iou_display.loc[:, (slice(None), 'best_emb')], iou_stds=best_set_iou_display.loc[:, (slice(None), 'std')], save_as=save_as, max_val=max_val, metric=metric, **plot_args ) plt.close() plot_best_ious_wt_std( best_emb_ious=best_set_iou_display.loc[:, (slice(None), 'best_emb')], iou_stds=best_set_iou_display.loc[:, (slice(None), 'std')], save_as=save_as.replace(".svg", ".png") if save_as else None, max_val=max_val, metric=metric, **plot_args ) plt.show()
[docs]def plot_overview_for_concept(merged_stats_map: Dict[str, pd.DataFrame], concepts: Union[str, Iterable[str]], max_val: float = None, save_as: str = None, model_name: str = None, metric: str = 'set_iou', one_plot: bool = True, legend_in: int = -1, fig_size: Tuple[float, float] = None, axes: Optional[Sequence[plt.Axes]] = None, **plot_args): # pylint: disable=too-many-branches """Plot and save the output of ``merge_to_overview`` using ``plot_best_ious_wt_std``. :param merged_stats_map: dict with values the output of merge_to_overview; each value will get its own graph :param concepts: the concept or concepts to plot :param max_val: the common x-axis limit for all created plots; defaults to the maximum metric value for a concept :param save_as: an optional save location; must contain ``{concept}`` if ``one_plot`` is not ``True``; if a directory is given, the file name is auto-inferred from the other arguments :param model_name: for file name auto-inference :param metric: for axis label and file name auto-inference :param one_plot: whether to put all into one common plot (save_as must not contain formatting string for concept) :param legend_in: if ``one_plot``, index of the axis the legend should be placed in; if set to None, legend is placed outside of plot :param axes: plot into given axes :param fig_size: if ``one_plot``, used ``fig_size`` """ assert axes is None or save_as is None # Collect data metric_mean_map, metric_std_map = dict(), dict() for graph_name, merged_stats in merged_stats_map.items(): metric_mean_map[graph_name] = \ merged_stats.loc[:, (slice(None), 'best_emb')] metric_std_map[graph_name] = \ merged_stats.loc[:, (slice(None), 'std')] max_val_by_concept: Dict[str, float] = { concept: max(max_val or 0, max(vals.loc[:, (concept, 'best_emb')].max().max() + 0.01 for vals in metric_mean_map.values())) for concept in concepts} if save_as is not None and os.path.isdir(save_as): save_as: str = os.path.join( save_as, f"{model_name or ''}_{metric or ''}" f"{'' if one_plot else '_{concept}'}.svg") if one_plot and axes is None: layers: Sequence[str] = list(metric_mean_map.values())[0].index max_val: float = max(max_val_by_concept.values()) max_val_by_concept: Dict[str, float] = {c: max_val for c in concepts} _, axes = plt.subplots( 1, len(concepts), sharey='all', figsize=fig_size or (len(layers) * len(concepts) * 0.7, max_val * 10)) if isinstance(axes, plt.Axes): axes: Sequence[plt.Axes] = [axes] for i, concept in enumerate(concepts): assert all(concept in means.columns.get_level_values('concept') for means in metric_mean_map.values()) assert all(concept in stds.columns.get_level_values('concept') for stds in metric_std_map.values()) max_val: float = max_val_by_concept[concept] curr_save_as: Optional[str] = save_as.format(concept=concept) \ if save_as is not None and not one_plot else None axis: Optional[plt.Axes] = axes[i] if axes is not None else None if curr_save_as is not None and curr_save_as.endswith('.svg'): plot_best_iou_comparison( metric_mean_map=metric_mean_map, metric_std_map=metric_std_map, concept=concept, max_val=max_val, metric=metric, save_as=(curr_save_as.replace('.svg', '.png') if curr_save_as else None), axis=axis, **plot_args ) if not axis: plt.close() plot_best_iou_comparison( metric_mean_map=metric_mean_map, metric_std_map=metric_std_map, concept=concept, max_val=max_val, metric=metric, save_as=curr_save_as, axis=axis, **plot_args ) if not axis: plt.show() if one_plot: # handles, labels = axes[-1].get_legend_handles_labels() # fig.legend(handles, labels, loc='center left', bbox_to_anchor=(.9,0.5)) if legend_in is not None: axes[legend_in].legend() else: axes[-1].legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) if save_as is not None: if save_as.endswith('.svg'): plt.savefig(save_as.replace('.svg', '.png'), bbox_inches='tight') plt.savefig(save_as, bbox_inches='tight')