Source code for persim.landscapes.exact

"""
    Persistence Landscape Exact class
"""

import itertools
from operator import itemgetter

import numpy as np

from .approximate import PersLandscapeApprox
from .auxiliary import _p_norm, union_crit_pairs
from .base import PersLandscape

__all__ = ["PersLandscapeExact"]


[docs] class PersLandscapeExact(PersLandscape): """Persistence Landscape Exact class. This class implements an exact version of Persistence Landscapes. The landscape functions are stored as a list of critical pairs, and the actual function is the linear interpolation of these critical pairs. All computations done with these classes are exact. For much faster but approximate methods that should suffice for most applications, consider `PersLandscapeApprox`. Parameters ---------- dgms : list of (-,2) numpy.ndarrays, optional A nested list of numpy arrays, e.g., [array( array([:]), array([:]) ),..., array([:])] Each entry in the list corresponds to a single homological degree. Each array represents the birth-death pairs in that homological degree. This is the output format from ripser.py: ripser(data_user)['dgms']. Only one of diagrams or critical pairs should be specified. hom_deg : int Represents the homology degree of the persistence diagram. critical_pairs : list, optional List of lists of critical pairs (points, values) for specifying a persistence landscape. These do not necessarily have to arise from a persistence diagram. Only one of diagrams or critical pairs should be specified. compute : bool, optional Flag determining whether landscape functions are computed upon instantiation. Examples -------- Define a persistence diagram and instantiate the landscape:: >>> from persim import PersLandscapeExact >>> import numpy as np >>> pd = [ np.array([[0,3],[1,4]]), np.array([[1,4]]) ] >>> ple = PersLandscapeExact(dgms=pd, hom_deg=0) >>> ple `PersLandscapeExact` instances store the critical pairs of the landscape as a list of lists in the `critical_pairs` attribute. The `i`-th entry corresponds to the critical values of the depth `i` landscape:: >>> ple.critical_pairs [[[0, 0], [1.5, 1.5], [2.0, 1.0], [2.5, 1.5], [4, 0]], [[1, 0], [2.0, 1.0], [3, 0]]] Addition, subtraction, and scalar multiplication between landscapes is implemented:: >>> pd2 = [ np.array([[0.5,7],[3,5],[4.1,6.5]]), np.array([[1,4]])] >>> pl2 = PersLandscapeExact(dgms=pd2,hom_deg=0) >>> pl_sum = ple + pl2 >>> pl_sum.critical_pairs [[[0, 0], [0.5, 0.5], [1.5, 2.5], [2.0, 2.5], [2.5, 3.5], [3.75, 3.5],[4, 3.0], [7.0, 0.0]], [[1, 0], [2.0, 1.0], [3, 0.0], [4.0, 1.0], [4.55, 0.45], [5.3, 1.2], [6.5, 0.0]], [[4.1, 0], [4.55, 0.45], [5.0, 0]]] >>> diff_pl = ple - pl2 >>> diff_pl.critical_pairs [[[0, 0], [0.5, 0.5], [1.5, 0.5], [2.0, -0.5], [2.5, -0.5], [3.75, -3.0], [4, -3.0], [7.0, 0.0]], [[1, 0], [2.0, 1.0], [3, 0.0], [4.0, -1.0], [4.55, -0.45], [5.3, -1.2], [6.5, 0.0]], [[4.1, 0], [4.55, -0.45], [5.0, 0]]] >>> (5*ple).critical_pairs [[(0, 0), (1.5, 7.5), (2.0, 5.0), (2.5, 7.5), (4, 0)], [(1, 0), (2.0, 5.0), (3, 0)]] Landscapes are sliced by depth and slicing returns the critical pairs in the range specified:: >>> ple[0] [[0, 0], [1.5, 1.5], [2.0, 1.0], [2.5, 1.5], [4, 0]] >>> pl2[1:] [[[3.0, 0], [4.0, 1.0], [4.55, 0.4500000000000002], [5.3, 1.2000000000000002], [6.5, 0]], [[4.1, 0], [4.55, 0.4500000000000002], [5.0, 0]]] `p` norms are implemented for all `p` as well as the supremum norm:: >>> ple.p_norm(p=3) 1.7170713638299977 >>> pl2.sup_norm() 3.25 """
[docs] def __init__( self, dgms: list = [], hom_deg: int = 0, critical_pairs: list = [], compute: bool = True, ) -> None: super().__init__(dgms=dgms, hom_deg=hom_deg) self.critical_pairs = critical_pairs if dgms: self.dgms = dgms[self.hom_deg] else: # critical pairs are passed. Is this the best check for this? self.dgms = dgms if not dgms and not critical_pairs: raise ValueError("dgms and critical_pairs cannot both be empty") self.max_depth = len(self.critical_pairs) if compute: self.compute_landscape()
def __repr__(self): return f"Exact persistence landscape in homological degree {self.hom_deg}" def __neg__(self): """ Computes the negation of a persistence landscape object """ self.compute_landscape() return PersLandscapeExact( hom_deg=self.hom_deg, critical_pairs=[ [[a, -b] for a, b in depth_list] for depth_list in self.critical_pairs ], ) def __add__(self, other): """ Computes the sum of two persistence landscape objects """ if self.hom_deg != other.hom_deg: raise ValueError("homological degrees must match") return PersLandscapeExact( critical_pairs=union_crit_pairs(self, other), hom_deg=self.hom_deg ) def __sub__(self, other): """ Computes the difference of two persistence landscape objects """ return self + -other def __mul__(self, other: float): """ Computes the product of a persistence landscape object and a float Parameters ------- other: float the real scalar the persistence landscape will be multiplied by """ self.compute_landscape() return PersLandscapeExact( hom_deg=self.hom_deg, critical_pairs=[ [(a, other * b) for a, b in depth_list] for depth_list in self.critical_pairs ], ) def __rmul__(self, other: float): """ Computes the product of a persistence landscape object and a float Parameters ------- other: float the real scalar the persistence landscape will be multiplied by """ return self.__mul__(other) def __truediv__(self, other: float): """ Computes the quotient of a persistence landscape object and a float Parameters ------- other: float the real divisor of the persistence landscape object """ if other == 0.0: raise ValueError("Cannot divide by zero") return self * (1.0 / other) def __getitem__(self, key: slice) -> list: """ Returns a list of critical pairs corresponding in range specified by depth Parameters ---------- key : slice object Returns ------- list The critical pairs of the landscape function corresponding to depths given by key Note ---- If the slice is beyond `self.max_depth` an IndexError is raised. """ self.compute_landscape() return self.critical_pairs[key] def compute_landscape(self, verbose: bool = False) -> list: """ Stores the persistence landscape in `self.critical_pairs` as a list Parameters ---------- verbose: bool, optional If true, progress messages are printed during computation """ verboseprint = print if verbose else lambda *a, **k: None # check if landscapes were already computed if self.critical_pairs: verboseprint( "self.critical_pairs was not empty and stored value was returned" ) return self.critical_pairs A = self.dgms # change A into a list A = list(A) # change inner nparrays into lists for i in range(len(A)): A[i] = list(A[i]) if A[-1][1] == np.inf: A.pop(-1) landscape_idx = 0 L = [] # Sort A: read from right to left inside () A = sorted(A, key=lambda x: [x[0], -x[1]]) while A: verboseprint(f"computing landscape index {landscape_idx+1}...") # add a 0 element to begin count of lamda_k # size_landscapes = np.append(size_landscapes, [0]) # pop first term b, d = A.pop(0) verboseprint(f"(b,d) is ({b},{d})") # outer brackets for start of L_k L.append([[-np.inf, 0], [b, 0], [(b + d) / 2, (d - b) / 2]]) # check for duplicates of (b,d) duplicate = 0 for j, itemj in enumerate(A): if itemj == [b, d]: duplicate += 1 A.pop(j) else: break while L[landscape_idx][-1] != [np.inf, 0]: # if d is > = all remaining pairs, then end lambda # includes edge case with (b,d) pairs with the same death time if all(d >= _[1] for _ in A): # add to end of L_k L[landscape_idx].extend([[d, 0], [np.inf, 0]]) # for duplicates, add another copy of the last computed lambda for _ in range(duplicate): L.append(L[-1]) landscape_idx += 1 else: # set (b', d') to be the first term so that d' > d for i, item in enumerate(A): if item[1] > d: b_prime, d_prime = A.pop(i) verboseprint(f"(bp,dp) is ({b_prime},{d_prime})") break # Case I if b_prime > d: L[landscape_idx].extend([[d, 0]]) # Case II if b_prime >= d: L[landscape_idx].extend([[b_prime, 0]]) # Case III else: L[landscape_idx].extend( [[(b_prime + d) / 2, (d - b_prime) / 2]] ) # push (b', d) into A in order # find the first b_i in A so that b'<= b_i # push (b', d) to end of list if b' not <= any bi ind = len(A) for i in range(len(A)): if b_prime <= A[i][0]: ind = i # index to push (b', d) if b' != b_i break # if b' not < = any bi, put at the end of list if ind == len(A): pass # if b' = b_i elif b_prime == A[ind][0]: # pick out (bk,dk) such that b' = bk A_i = [item for item in A if item[0] == b_prime] # move index to the right one for every d_i such that d < d_i for j in range(len(A_i)): if d < A_i[j][1]: ind += 1 # d > dk for all k A.insert(ind, [b_prime, d]) L[landscape_idx].extend( [[(b_prime + d_prime) / 2, (d_prime - b_prime) / 2]] ) # size_landscapes[landscape_idx] += 1 b, d = b_prime, d_prime # Set (b',d')= (b, d) landscape_idx += 1 verboseprint("self.critical_pairs was empty and algorthim was executed") self.max_depth = len(L) self.critical_pairs = [item[1:-1] for item in L] def compute_landscape_by_depth(self, depth: int) -> list: """ Returns the function of depth from `self.critical_pairs` as a list Parameters ---------- depth: int the depth of the desired landscape function """ if self.critical_pairs: return self.critical_pairs[depth] else: return self.compute_landscape()[depth] def p_norm(self, p: int = 2) -> float: """ Returns the L_{`p`} norm of an exact persistence landscape Parameters ---------- p: float, default 2 value p of the L_{`p`} norm """ super().p_norm(p=p) return _p_norm(p=p, critical_pairs=self.critical_pairs) def sup_norm(self) -> float: """ Returns the supremum norm of an exact persistence landscape """ self.compute_landscape() cvals = list(itertools.chain.from_iterable(self.critical_pairs)) return max(np.abs(cvals), key=itemgetter(1))[1]