Source code for MiLoMerge.merging.place_from_map

import warnings
import os
import numpy as np
import h5py
from collections.abc import Iterable


def __load_file_local(fname, key):
    """Simple function to load and return an h5py file

    Parameters
    ----------
    fname : str
        The filename to load
    key : str
        The key to load for the h5py file

    Returns
    -------
    numpy.ndarray
        Returns a copy of the array that is being accessed through h5py
    """
    f = h5py.File(fname, "r")
    return f[key][:]


def __load_file_nonlocal(fname_tracker, fname_bins, key):
    """Simple function to load and return an h5py file
    alongside its corresponding "physical bins" file

    Parameters
    ----------
    fname_tracker : str
        The filename of the tracker (h5py file)
    fname_bins : str
        The filename of the physical bins file
    key : str
        The key to load for the h5py file

    Returns
    -------
    tuple[numpy.ndarray, numpy.ndarray]
        Returns a copy of the array that is being accessed through h5py
        alongside a copy of the physical bins
    """
    f = h5py.File(fname_tracker, "r")
    bin_mapping = f[key][:]

    physical_bins = np.load(fname_bins, allow_pickle=True)

    return bin_mapping, physical_bins


[docs] def place_event_nonlocal( N: int, *observable: float, file_prefix: str, verbose: bool = False ): """This function takes in one N-dimensional observable from data and utilizes the mapping and saved physical bins from MiLoMerge.MergerNonlocal to output where that event would go in the final nonlocal binning. Parameters ---------- N : int The number of bins in the mapping to use observable : float args input where one inputs observables in the same order as the input binning to MiLoMerge file_prefix : str The entirety of the filepath before _tracker.hdf5 or "_physical_bins.npy". This argument should be the same as `f"{file_path} + {file_prefix}"`, where `file_path` and `file_prefix` are the inputs given to MiLoMerge.MergerNonlocal. verbose : bool, optional Whether additional print statements are turned on, by default False Returns ------- int The index of where the event is placed in the final binning. Raises ------ FileNotFoundError If the prefix is not suitable to find the appropriate tracker.hdf5 file, raise an error. FileNotFoundError If the prefix is not suitable to find the appropriate physical_bins.npy file, raise an error. ValueError If any observable is outside of the provided bins, raise an error ValueError If the dimensions of the observable and the dimensions of the bins are not compatible, raise an error """ if not os.path.exists(f"{file_prefix}_tracker.hdf5"): raise FileNotFoundError(f"{file_prefix}_tracker.hdf5 does not exist!") if not os.path.exists(f"{file_prefix}_physical_bins.npy"): raise FileNotFoundError(f"{file_prefix}_physical_bins.npy does not exist!") fname_tracker = f"{file_prefix}_tracker.hdf5" fname_bins = f"{file_prefix}_physical_bins.npy" bin_mapping, physical_bins = __load_file_nonlocal(fname_tracker, fname_bins, str(N)) observable = np.array(observable) if not (isinstance(physical_bins[0], int) or isinstance(physical_bins[0], float)): subarray_lengths = np.array([len(b) for b in physical_bins]) else: subarray_lengths = np.array([len(physical_bins)]) if len(physical_bins.shape) > 1 and len(observable) > 1: n_observables = physical_bins.shape[0] n_physical_bins = physical_bins.shape[1] nonzero_rolled = np.zeros(n_observables, dtype=np.uint64) for i in np.arange(n_observables): nonzero_rolled[i] = np.searchsorted(physical_bins[i], observable[i]) - 1 if np.any(nonzero_rolled < 0) or np.any(nonzero_rolled > n_physical_bins - 1): raise ValueError(f"observables are outside of the provided phase space!") if verbose: print("original index of:", nonzero_rolled) print("This places your point in the range:") for i in range(physical_bins.shape[0]): print( "[", physical_bins[i][nonzero_rolled[i]], ",", physical_bins[i][int(nonzero_rolled[i] + 1)], "]", ) unrolled_index = ( np.power( n_physical_bins - 1, np.arange(n_observables - 1, -1, -1, np.int16) ) * nonzero_rolled ).sum() elif any([subarray_lengths[0] != b for b in subarray_lengths]) and len( subarray_lengths ) == len(observable): n_observables = len(observable) n_physical_bins = np.array(subarray_lengths) # this is now per dimension nonzero_rolled = np.zeros(n_observables, dtype=np.uint64) for i in np.arange(n_observables): nonzero_rolled[i] = np.searchsorted(physical_bins[i], observable[i]) - 1 if nonzero_rolled[i] < 0 or nonzero_rolled[i] >= n_physical_bins[i]: raise ValueError( f"Observable of index {i} is outside of the provided phase space!" ) if verbose: print("original index of:", nonzero_rolled[i]) print("This places your point in the range:") print( "[", physical_bins[i][nonzero_rolled[i]], ",", physical_bins[i][int(nonzero_rolled[i] + 1)], "]", ) unrolled_index = 0 multiplier = 1 for i in reversed(range(len(n_physical_bins))): unrolled_index += nonzero_rolled[i] * multiplier if i > 0: multiplier *= subarray_lengths[i] - 1 elif len(physical_bins.shape) > 1 or len(observable) > 1: raise ValueError( f"Shapes are incompatible\n" f" shape of bins is {len(physical_bins.shape)} and" f" length of observables is {len(observable)}" ) else: if len(observable) > 1: raise ValueError unrolled_index = np.searchsorted(physical_bins, observable) - 1 unrolled_index = int(unrolled_index) try: mapped_index = bin_mapping[unrolled_index] except IndexError as e: raise ValueError(f"{observable} is outside of the provided phase space!") from e return mapped_index
[docs] def place_array_nonlocal( N: int, observables: Iterable[Iterable[float]], file_prefix: str = "", verbose: bool = False, ): """This function takes in an array of N-dimensional observables from data and utilizes the mapping and saved physical bins from MiLoMerge.MergerNonlocal to output where that event would go in the final nonlocal binning. The array version of place_event_nonlocal. Parameters ---------- N : int The number of bins in the mapping to use observables : numpy.ndarray[float] Array should be of shape (#events, #observables). Contains all the observables to be binned. file_prefix : str The entirety of the filepath before _tracker.hdf5 or "_physical_bins.npy". This argument should be the same as `f"{file_path} + {file_prefix}"`, where `file_path` and `file_prefix` are the inputs given to MiLoMerge.MergerNonlocal. verbose : bool, optional Whether additional print statements are turned on, by default False Returns ------- numpy.ndarray[int] A 1-d array of indices for where the events are placed in the final binning. Raises ------ FileNotFoundError If the prefix is not suitable to find the appropriate tracker.hdf5 file, raise an error. FileNotFoundError If the prefix is not suitable to find the appropriate physical_bins.npy file, raise an error. ValueError If any observable is outside of the provided bins, raise an error ValueError If the dimensions of the observable and the dimensions of the bins are not compatible, raise an error KeyError If any observable is outside the provided bins, raise an error """ if not os.path.exists(f"{file_prefix}_tracker.hdf5"): raise FileNotFoundError(f"{file_prefix}_tracker.hdf5 does not exist!") if not os.path.exists(f"{file_prefix}_physical_bins.npy"): raise FileNotFoundError(f"{file_prefix}_physical_bins.npy does not exist!") fname_tracker = f"{file_prefix}_tracker.hdf5" fname_bins = f"{file_prefix}_physical_bins.npy" bin_mapping, physical_bins = __load_file_nonlocal(fname_tracker, fname_bins, str(N)) observables_stacked = np.array(observables) if not (isinstance(physical_bins[0], int) or isinstance(physical_bins[0], float)): subarray_lengths = np.array([len(b) for b in physical_bins]) else: subarray_lengths = np.array([len(physical_bins)]) if physical_bins.ndim > 1: if len(observables_stacked[0]) != len(physical_bins): raise ValueError( f"Number of observables {len(observables_stacked[0])} != Number of bin dimensions {len(physical_bins)}" ) n_physical_bins = physical_bins.shape[1] n_datapoints, n_observables = observables_stacked.shape nonzero_rolled = np.zeros((n_datapoints, n_observables), dtype=np.uint64) for i in range(n_observables): left_edge_mask = observables_stacked[:, i] == physical_bins[i][0] nonzero_rolled[:, i][~left_edge_mask] = ( np.searchsorted( physical_bins[i], observables_stacked[:, i][~left_edge_mask] ) - 1 ) if verbose: print("Original indices") print(nonzero_rolled) unrolled_index = ( np.power( n_physical_bins - 1, np.arange(n_observables - 1, -1, -1, np.int64) ) * nonzero_rolled ).sum(axis=1) unrolled_index = unrolled_index.astype(int) elif np.any(subarray_lengths != subarray_lengths[0]): if len(observables_stacked[0]) != len(physical_bins): raise ValueError( f"Number of observables {len(observables_stacked[0])} != Number of bin dimensions {len(physical_bins)}" ) n_physical_bins = subarray_lengths n_datapoints, n_observables = observables_stacked.shape nonzero_rolled = np.zeros((n_datapoints, n_observables), dtype=np.uint64) for i in range(n_observables): nonzero_rolled[:, i] = ( np.searchsorted(physical_bins[i], observables_stacked[:, i]) - 1 ) if verbose: print("Original indices") print(nonzero_rolled) unrolled_index = np.zeros(n_datapoints, dtype=np.uint64) multiplier = 1 for i in reversed(range(len(n_physical_bins))): unrolled_index += nonzero_rolled[:, i] * multiplier if i > 0: multiplier *= subarray_lengths[i] - 1 else: if observables_stacked.ndim != physical_bins.ndim: raise ValueError( f"Number of observables {observables_stacked.ndim} != Number of bin dimensions {physical_bins.ndim}" ) n_physical_bins = len(physical_bins) nonzero_rolled = np.searchsorted(physical_bins, observables_stacked) - 1 unrolled_index = nonzero_rolled failed_events = unrolled_index > len(bin_mapping) if np.any(failed_events): print("The following events have indices that are too large:") for i, j in zip(nonzero_rolled[failed_events], unrolled_index[failed_events]): print(f"{i} = {j}") raise KeyError( "Please check your phasespace to ensure it is within your original binning!" ) return bin_mapping[unrolled_index].ravel()
[docs] def place_local( N: int, observable_array: Iterable[float], file_prefix: str, verbose: bool = False, ): """Places 1-dimensional data into the respective bins for a given bin number. Equivalent to running numpy.histogram for the given N+1 bin edges stored. Parameters ---------- N : int The number of bins that are desired observable_array : numpy.ndarray A 1-d array of datapoints to be binned with N bins file_prefix : str, optional The entirety of the filepath before _tracker.hdf5 or "_physical_bins.npy". This argument should be the same as `f"{file_path} + {file_prefix}"`, where `file_path` and `file_prefix` are the inputs given to MiLoMerge.MergerNonlocal. verbose : bool, optional Whether additional print statements are turned on, by default False, by default False Returns ------- tuple(numpy.ndarray[int], numpy.ndarray(float)) Returns the counts and bin edges for the data in the same fashion as numpy.histogram. Raises ------ FileNotFoundError If the prefix is not suitable to find the appropriate tracker.hdf5 file, raise an error. """ if not os.path.exists(f"{file_prefix}_tracker.hdf5"): raise FileNotFoundError(f"{file_prefix}_tracker.hdf5 does not exist!") fname_tracker = f"{file_prefix}_tracker.hdf5" bin_mapping = __load_file_local(fname_tracker, str(N)) if verbose: print(f"Using file {os.path.abspath(fname_tracker)}") print(np.array(bin_mapping)) placements = np.searchsorted(bin_mapping, observable_array) - 1 if np.any((placements < 0) | (placements >= len(bin_mapping))): warnings.warn( "Some items placed out of bounds! Please check your phasespace to ensure it is within your original binning!" ) return np.bincount(placements, minlength=N), bin_mapping