Source code for MiLoMerge.merging.bin_optimizer

import warnings
import os

from abc import ABC, abstractmethod
import numpy as np
import tqdm
import numba as nb
import h5py
from collections.abc import Iterable
import typing


@nb.njit(
    "float64(int64, Array(float64, 2, 'C'), Array(float64, 2, 'C'), int64, int64, b1)",
    cache=True,
    parallel=False,
)
def mlm_driver(
    n: int,
    counts: Iterable[Iterable[float]],
    weights: Iterable[Iterable[float]],
    b: int,
    b_prime: int,
    comp_to_first: bool,
) -> float:
    """This method uses the MLM metric to compare all samples to each other
    and issue a score between the two bin indices.

    Parameters
    ----------
    n : int
        The number of samples being dealt with
    counts : numpy.ndarray
        An ndarray of the counts with shape (#samples, #bins)
    weights : numpy.ndarray
        An ndarray of per-sample weight with size (#samples)
    b : int
        The index of the first bin that is being calculated
    b_prime : int
        The index of the second bin that is being calculated
    comp_to_first : bool
        Whether comparisons are only done between every sample and sample 0

    Returns
    -------
    float
        The score, as prescribed by the MLM metric
    """

    metric_val = 0
    h_range = range(1) if comp_to_first else range(n)
    for h in h_range:
        t1 = counts[h, b]
        t3 = counts[h, b_prime]
        for h_prime in range(h + 1, n):
            t2 = counts[h_prime, b_prime]
            t4 = counts[h_prime, b]

            metric_val += (t1 * t2 - t3 * t4) ** 2 * weights[h, h_prime] ** 4

    return metric_val


@nb.njit(
    "(int64, Array(int64, 1, 'C'), Array(float64, 2, 'A'), Array(float64, 2, 'F'), int64, Array(float64, 2, 'C'), b1)",
    parallel=False,
    cache=True,
)
def _closest_pair_driver(
    n_items: int,
    things_to_recalculate: Iterable[int],
    scores: Iterable[Iterable[float]],
    counts: Iterable[Iterable[float]],
    n_hypotheses: int,
    weights: Iterable[Iterable[float]],
    comp_to_first: bool,
) -> int:
    h_range = range(1) if comp_to_first else range(n_hypotheses)

    for i in range(n_items):
        for j_idx in range(len(things_to_recalculate)):
            j = things_to_recalculate[j_idx]
            if i == j:
                scores[i][j] = np.inf
                continue
            metric_val = 0
            for h in h_range:
                t1 = counts[h, i]
                t3 = counts[h, j]
                for h_prime in range(h + 1, n_hypotheses):
                    t2 = counts[h_prime, j]
                    t4 = counts[h_prime, i]

                    term = t1 * t2 - t3 * t4
                    w = weights[h, h_prime]
                    metric_val += (term * term) * (w * w * w * w)

            scores[i][j] = metric_val

    return np.argmin(scores)


class Merger(ABC):
    """Abstract class that serves as a baseplate for both the local and nonlocal merger"""

    def __init__(
        self,
        bin_edges: Iterable[float],
        *counts: Iterable[float],
        weights: typing.Union[int, Iterable[float]] = None,
        comp_to_first: bool = False,
        map_at: Iterable[int] = None,
    ) -> None:
        """The initializer for the baseplate class

        Parameters
        ----------
        bin_edges : numpy.ndarray
            These are the edges of your histogram that correspond to physical quantities
        counts : numpy.ndarray
            A series of arrays that correspond to the number of events between your bin edges
        weights : numpy.ndarray, optional
            An array of the weights associated for each of the counts.
            If none are provided, the weights will be 1, by default None
        comp_to_first : bool, optional
            Whether you would like to compare all samples to the first one provided,
            as opposed to all of them to each other, by default False
        map_at : list, optional
            A list of bin numbers at which you would like the mapping
            from the original sample to be recorded, by default None
        brute_force_at : int, optional
            A value at or below which the merger will utilize the "brute-force"
            approach of merging by calculating a total ROC score, by default 10

        Raises
        ------
        ValueError
            If all the counts provided do not have the same length, raise an error
        ValueError
            If the length of the bin counts are not equal to 1 - len(bin_edges) raise an error
        ValueError
            If the number of samples and the number of weights are not the same, raise an error
        TypeError
            If map_at is not an iterable of some kind, raise an error
        """

        it = iter(counts)
        the_len = len(next(it))
        if not all(len(l) == the_len for l in it):
            errortext = [str(len(count)) for count in counts]
            errortext = " ".join(errortext)
            errortext = "Not all counts have same length! Lengths are: " + errortext
            raise ValueError("\n" + errortext)

        if len(counts[0]) != len(bin_edges) - 1:
            errortext = f"len(counts) = {len(counts[0])} != len(bin_edges) - 1 = {len(bin_edges)-1}"
            raise ValueError("\n" + errortext)

        if weights is not None and len(weights) != len(counts):
            errortext = (
                "The # of weight values and the # of samples should be the same!"
            )
            errortext += (
                f"\nThere are {len(counts)} samples and {len(weights)} weight values"
            )
            raise ValueError("\n" + errortext)

        if weights is None:
            weights = np.ones(len(counts), dtype=np.float64)
        else:
            weights = np.array(weights, dtype=np.float64)

        self._merger_type = None

        self.weights = np.outer(weights, weights)
        self.n_hypotheses = len(counts)
        self.n_items = self.original_n_items = len(counts[0])
        # self.n is the number of hypotheses, self.n_items is the current number of bin_edges

        self.comp_to_first = comp_to_first

        self.counts = np.vstack(counts).astype(np.float64)
        # self.counts[~np.isfinite(self.counts)] = 0

        self.bin_edges = np.array(bin_edges, dtype=np.float64)

        if map_at is None:
            map_at = []
        else:
            try:
                iter(map_at)
            except Exception as e:
                raise TypeError("Parameter map_at must be an iterable!") from e

        # sets are nice and hashed
        self.map_at = set([i for i in map_at if i < self.n_items])

    def __repr__(self) -> str:
        """Representation of the merger object

        Returns
        -------
        str
            A brief summary of the merger object
        """
        return f"Merger of type {self._merger_type} merging {self.original_n_items}"

    @abstractmethod
    def run(self, target_bin_number=2):
        """Runs the merger

        Parameters
        ----------
        target_bin_number : int, optional
            The number of bins you would like to merge down to, by default 2

        Returns
        -------
        NotImplemented
            Abstract class does not have a proper implementation!
        """
        return NotImplemented


[docs] class MergerLocal(Merger): """ A merger that merges bins locally. This will not change the physical ordering of the bin edges. """ def __init__( self, bin_edges: Iterable[float], *counts: Iterable[float], weights: Iterable[float] = None, comp_to_first: bool = False, map_at: Iterable[int] = None, file_path: str = "./", file_name: str = "", ) -> None: """The initializer for the local merging class Parameters ---------- bin_edges : numpy.ndarray These are the edges of your histogram that correspond to physical quantities counts : numpy.ndarray A series of arrays that correspond to the number of events between your bin edges. weights : numpy.ndarray, optional An array of the weights associated for each of the counts. If none are provided, the weights will be 1, by default None comp_to_first : bool, optional Whether you would like to compare all samples to the first one provided, as opposed to all of them to each other, by default False map_at : list, optional A list of bin numbers at which you would like the mapping from the original sample to be recorded, by default None file_path : str, optional The directory to place "_tracker.hdf5" should mapping be desired, by default "./" file_name : str, optional The file prefix before "_tracker.hdf5" to identify this mapping, by default "" Raises ------ ValueError The dimension of the bin edges can only be 1-dimensional NotADirectoryError If file_path is not a valid directory, raise an error """ super().__init__( bin_edges, *counts, weights=weights, comp_to_first=comp_to_first, map_at=map_at, ) if self.bin_edges.ndim > 1: raise ValueError("LOCAL MERGING CAN ONLY HANDLE 1-DIMENSIONAL ARRAYS") self._merger_type = "Local" if any(self.map_at): if not os.path.exists(file_path): raise NotADirectoryError(f"{file_path} is not a valid directory!") if file_path[-1] != "/": file_path += "/" self.tracker = h5py.File( f"{file_path}{file_name}_tracker.hdf5", "w", libver="latest", driver=None, ) for mapped_bincount in self.map_at: self.tracker.create_dataset( str(mapped_bincount), (mapped_bincount + 1), np.float64, compression="gzip", compression_opts=9, fletcher32=True, fillvalue=0, maxshape=len(bin_edges), shuffle=True, ) else: self.tracker = None @staticmethod @nb.njit( "(Array(float64, 2, 'C'), Array(float64, 1, 'C'), int64, int64)", cache=True, fastmath=True, ) def __merge_driver( counts: Iterable[Iterable[float]], bin_edges: Iterable[float], first_part: int, second_part: int, ): """This method is the numba-ified function that is called by __merge. It handles merging two histogram bins together and editing the bin edges inplace Parameters ---------- counts : numpy.ndarray A ndarray of the counts with shape (#samples, #bins) bin_edges : numpy.ndarray A 1-d array of bin edges that correspond to some physical meaning first_part : int The index that corresponds with the smaller of the two bin eges being merged second_part : int The index that corresponds with the larger of the two bin eges being merged Returns ------- tuple[numpy.ndarray, numpy.ndarray] A tuple of the new counts with two bins merged and a 1-d array of the new bin edges with one of the bin edges removed """ new_counts = counts[:, :-1].copy() new_counts[:, first_part] = (counts[:, first_part] + counts[:, second_part]).T new_counts[:, first_part + 1 :] = counts[:, second_part + 1 :] new_bin_edges = bin_edges[:-1].copy() new_bin_edges[first_part + 1 :] = bin_edges[second_part + 1 :] return (new_counts, new_bin_edges) def __merge(self, i: int, j: int): """Merges bins i and j such that the first and last bin edge are always preserved Parameters ---------- i : int The first index to merge j : int The second index to merge Returns ------- Tuple[numpy.ndarray, numpy.ndarray] A tuple of the new counts with two bins merged and a 1-d array of the new bin edges with one of the bin edges removed Raises ------ ValueError Raise an error if there are unhandled edge cases where i or j are invalid ValueError Raise an error if bins are sent to the function nonlocally """ if i == j + 1: if i > self.n_items - 1 or i < 0: raise ValueError("UNHANDLED EDGE CASE WHILE MERGING") return self.__merge_driver(self.counts, self.bin_edges, j, i) elif i == j - 1: if i > self.n_items - 1 or i < 1: raise ValueError("UNHANDLED EDGE CASE WHILE MERGING") return self.__merge_driver(self.counts, self.bin_edges, i, j) elif i == j: raise ValueError("TRYING TO MERGE THE SAME INDEX!") # return (self.counts, self.bin_edges) else: raise ValueError("This is local binning! Can only merge ahead or behind!")
[docs] def run(self, target_bin_number: int, return_counts: bool = False): """Runs the merger Parameters ---------- target_bin_number : int The number of bins you would like to merge down to return_counts : bool, optional Whether the returned value will have the bin counts returned alongside the bin edges, by default False Returns ------- numpy.ndarray or tuple(numpy.ndarray, numpy.ndarray) Returns the 1-d bin edges that would correspond to the best binning for the number of bins you want. If return_counts is set to true, also return the internal final bin counts, which have shape (#samples, target_bin_number). """ if self.n_items <= target_bin_number: warnings.warn("Merging is pointless! Number of bins already >= target") pbar = tqdm.tqdm( total=self.n_items - target_bin_number, desc="Binning locally:", leave=True, position=0, ) things_to_recalculate = range(1, self.n_items - 1) scores = np.full((self.n_items - 1, 3), np.inf, dtype=np.float64) # stores the 2 indices being merged and the score as a matrix while self.n_items > target_bin_number: for i in things_to_recalculate: scores[i] = ( i, i + 1, mlm_driver( self.n_hypotheses, self.counts, self.weights, i, i + 1, self.comp_to_first, ), ) if i == 1: scores[0] = ( 0, 1, mlm_driver( self.n_hypotheses, self.counts, self.weights, 1, 0, self.comp_to_first, ), ) # The last column is the scores, and the first 2 the indices i1, i2 = scores[np.argmin(scores[:, 2])][:-1].astype(int) # remove the merged index and move all indices down 1 scores[:, :-1][i1:] -= 1 scores = np.delete(scores, i1, axis=0) if i1 == 0: things_to_recalculate = (0, 1) i1, i2 = i2, i1 elif i1 == self.n_items - 2: if self.n_items <= 4: things_to_recalculate = (i1 - 1, i1 - 2) else: things_to_recalculate = (i1 - 2, i1 - 3) else: things_to_recalculate = (i1 - 1, i1) self.counts, self.bin_edges = self.__merge(i1, i2) self.n_items -= 1 if self.n_items in self.map_at: self.tracker[str(self.n_items)][:] = self.bin_edges pbar.update(1) if return_counts: return self.bin_edges, self.counts return self.bin_edges
[docs] class MergerNonlocal(Merger): """A merger that merges bins non-locally. Bin edges are irrelevant here.""" def __init__( self, bin_edges: Iterable[float], *counts: Iterable[float], weights: Iterable[float] = None, comp_to_first: bool = False, map_at: Iterable[int] = None, file_path: str = "./", file_name: str = "", ) -> None: """The initializer for the non-local merging class Parameters ---------- bin_edges : numpy.ndarray These are the edges of your histogram that correspond to physical quantities counts : numpy.ndarray A series of arrays that correspond to the number of events between your bin edges weights : numpy.ndarray, optional An array of the weights associated for each of the counts. If none are provided, the weights will be 1, by default None comp_to_first : bool, optional Whether you would like to compare all samples to the first one provided, as opposed to all of them to each other, by default False map_at : list, optional A list of bin numbers at which you would like the mapping from the original sample to be recorded, by default None file_path : str, optional The directory to place "_tracker.hdf5" should mapping be desired, by default "./" file_name : str, optional The file prefix before "_tracker.hdf5" to identify this mapping, by default "" Raises ------ ValueError The dimension of the bin edges can only be 1-dimensional """ unrolled_counts = list(map(np.ndarray.ravel, map(np.array, counts))) sizes = np.array([len(i) for i in unrolled_counts]) if np.any(sizes != sizes[0]): raise ValueError( f"All arrays must be the same length! Your unrolled arrays are of length: {sizes}" ) for nd_arr in counts: nd_arr = np.array(nd_arr) if len(nd_arr.shape) > 1: for i, size in enumerate(nd_arr.shape): if i == len(bin_edges): raise IndexError(f"No bin edges provided for dimension {i}!") if len(bin_edges[i]) != size + 1: raise ValueError( f"Bin edge for dimension {i} is of invalid size {len(bin_edges[i])} != {size + 1}!" ) else: if len(bin_edges) != len(counts[0]) + 1: raise ValueError( f"Bin edges are of invalid size {len(bin_edges)} != {len(counts[0]) + 1}!" ) unrolled_bins = np.arange(len(unrolled_counts[0]) + 1) super().__init__( unrolled_bins, *unrolled_counts, weights=weights, comp_to_first=comp_to_first, map_at=map_at, ) self._merger_type = "Non-local" self.counts = np.asfortranarray(self.counts) self.physical_bins = np.array(bin_edges, dtype=object) self.n_observables = len(bin_edges) self.scores = np.zeros((self.n_items, self.n_items), dtype=np.float64) self.things_to_recalculate = np.arange(self.n_items, dtype=int) self.__cur_iteration_tracker = nb.typed.Dict() for i in self.things_to_recalculate: self.__cur_iteration_tracker[i] = np.array([i], dtype=np.int64) if any(self.map_at): if not os.path.isdir(file_path): raise NotADirectoryError(f"{file_path} is not a valid directory!") if file_path[-1] != "/": file_path += "/" tracker_name = f"{file_path}{file_name}_tracker.hdf5" bins_name = f"{file_path}{file_name}_physical_bins.npy" if os.path.exists(tracker_name): os.system(f"rm {tracker_name} {bins_name}") self.tracker = h5py.File( tracker_name, "w", libver="latest", driver=None, ) for mapped_bincount in self.map_at: self.tracker.create_dataset( str(mapped_bincount), (self.original_n_items), np.uint32, compression="gzip", compression_opts=9, fletcher32=True, fillvalue=0, maxshape=(self.original_n_items), shuffle=True, ) np.save( bins_name, self.physical_bins, fix_imports=False, allow_pickle=True ) else: self.tracker = None @staticmethod @nb.njit( "(int64, int64, Array(float64, 2, 'F'), Array(float64, 2, 'A'), DictType(int64, Array(int64, 1, 'C')), int64, int64)", cache=True, fastmath=True, ) def __merge_driver_tracker( n_items, n_hypotheses, counts, scores, cur_tracker, i, j ): k = 0 sum_term = np.zeros(n_hypotheses) for c in range(n_items): if c != i and c != j: # if c == k then you don't need to do anything! # The bins pre and post merge will be the same if c != k: counts[:, k] = counts[:, c] scores[:, k], scores[k] = scores[:, c], scores[c] cur_tracker[k] = cur_tracker[c] k += 1 else: # add the merged terms to an accumulator sum_term += counts[:, c] counts[:, k] = sum_term.T counts = counts[:, :-1:1] scores = scores[: k + 1, : k + 1] scores[k] = np.inf scores[:, k] = np.inf return k, counts, scores @staticmethod @nb.njit("(DictType(int64, Array(int64, 1, 'C')), int64)", cache=True) def __convert_tracker(cur_tracker, original_n_items): """Converts the internal tracker into a bin map That maps original bin indices to new ones Returns ------- numpy.ndarray A 1-d array where the each element holds the new index of that index (i.e. the $i^{th}$ element of the array contains a value j, that value j is the new placement for any element that would land in i for the original binning) """ new_map = np.zeros(original_n_items, dtype=np.int64) for new_place, original_placement in cur_tracker.items(): for original_place in original_placement: new_map[original_place] = new_place return new_map
[docs] def run(self, target_bin_number: int): """Runs the merger Parameters ---------- target_bin_number : int, optional The number of bins you would like to merge down to Returns ------- numpy.ndarray Returns an array containing all the new counts for the nonlocal array since bin edges are now meaningless. The array has shape (# samples, target_bin_number) """ if self.n_items <= target_bin_number: warnings.warn("Merging is pointless! Number of bins already >= target") pbar = tqdm.tqdm( total=self.n_items - target_bin_number, desc="Binning non-locally:", leave=True, position=0, ) while self.n_items > target_bin_number: ###### FIND BINS TO MERGE ###### min_arg = _closest_pair_driver( self.n_items, self.things_to_recalculate, self.scores, self.counts, self.n_hypotheses, self.weights, self.comp_to_first, ) min_1, min_2 = np.unravel_index(min_arg, self.scores.shape) ##### MERGE STEP ###### if self.tracker is not None: # add tuples that contain original indices within the key of the new indices old_tracker_entries = np.concatenate( ( self.__cur_iteration_tracker[min_1], self.__cur_iteration_tracker[min_2], ) ) k, self.counts, self.scores = self.__merge_driver_tracker( self.n_items, self.n_hypotheses, self.counts, self.scores, self.__cur_iteration_tracker, min_1, min_2, ) self.things_to_recalculate = np.array( [ k, ], dtype=int, ) if self.tracker is not None: self.__cur_iteration_tracker[k] = old_tracker_entries del self.__cur_iteration_tracker[ k + 1 ] # in the end k is the number of entries left over ##### UPDATE STATE ###### self.n_items -= 1 if self.n_items in self.map_at: self.tracker[str(self.n_items)][:] = self.__convert_tracker( self.__cur_iteration_tracker, self.original_n_items ) pbar.update(1) if self.tracker is not None: self.tracker.close() return self.counts