Source code for esda.moran

"""
Moran's I Spatial Autocorrelation Statistics

"""

__author__ = (
    "Sergio J. Rey <srey@asu.edu>, "
    "Dani Arribas-Bel <daniel.arribas.bel@gmail.com>, "
    "Levi John Wolf <levi.john.wolf@gmail.com>"
)

from warnings import simplefilter

import numpy as np
import pandas as pd
import scipy.stats as stats
from libpysal.weights import W
from libpysal.weights.spatial_lag import lag_spatial
from scipy import sparse

from .crand import _prepare_univariate
from .crand import crand as _crand_plus
from .crand import njit as _njit
from .smoothing import assuncao_rate
from .tabular import _bivariate_handler, _univariate_handler

__all__ = [
    "Moran",
    "Moran_Local",
    "Moran_BV",
    "Moran_BV_matrix",
    "Moran_Local_BV",
    "Moran_Rate",
    "Moran_Local_Rate",
    "plot_moran_facet",
]

PERMUTATIONS = 999


def _slag(w, y):
    """Helper to compute lag either for W or for Graph"""
    if isinstance(w, W):
        return lag_spatial(w, y)
    else:
        return w.lag(y)


def _transform(w, transformation):
    """Helper to transform W or Graph"""
    if isinstance(w, W):
        w.transform = transformation
        return w
    else:
        return w.transform(transformation)


[docs] class Moran: """Moran's I Global Autocorrelation Statistic Parameters ---------- y : array variable measured across n spatial units w : W | Graph spatial weights instance as W or Graph aligned with y transformation : string weights transformation, default is row-standardized "r". Other options include "B": binary, "D": doubly-standardized, "O": restore original transformation (applicable only if ``w`` is passed as ``W``) (general weights), "V": variance-stabilizing. permutations : int number of random permutations for calculation of pseudo-p_values two_tailed : boolean If True (default) analytical p-values for Moran are two tailed, otherwise if False, they are one-tailed. Attributes ---------- y : array original variable w : W | Graph original w object z : array zero-mean, unit standard deviation normalized y permutations : int number of permutations I : float value of Moran's I EI : float expected value under normality assumption VI_norm : float variance of I under normality assumption seI_norm : float standard deviation of I under normality assumption z_norm : float z-value of I under normality assumption p_norm : float p-value of I under normality assumption VI_rand : float variance of I under randomization assumption seI_rand : float standard deviation of I under randomization assumption z_rand : float z-value of I under randomization assumption p_rand : float p-value of I under randomization assumption two_tailed : boolean If True p_norm and p_rand are two-tailed, otherwise they are one-tailed. sim : array (if permutations>0) vector of I values for permuted samples p_sim : array (if permutations>0) p-value based on permutations (one-tailed) null: spatial randomness alternative: the observed I is extreme if it is either extremely greater or extremely lower than the values obtained based on permutations EI_sim : float (if permutations>0) average value of I from permutations VI_sim : float (if permutations>0) variance of I from permutations seI_sim : float (if permutations>0) standard deviation of I under permutations. z_sim : float (if permutations>0) standardized I based on permutations p_z_sim : float (if permutations>0) p-value based on standard normal approximation from permutations Notes ----- Technical details and derivations can be found in :cite:`cliff81`. Examples -------- >>> import libpysal >>> w = libpysal.io.open(libpysal.examples.get_path("stl.gal")).read() >>> f = libpysal.io.open(libpysal.examples.get_path("stl_hom.txt")) >>> y = np.array(f.by_col['HR8893']) >>> from esda.moran import Moran >>> mi = Moran(y, w) >>> round(mi.I, 3) 0.244 >>> mi.EI -0.012987012987012988 >>> mi.p_norm 0.00027147862770937614 SIDS example replicating OpenGeoda >>> w = libpysal.io.open(libpysal.examples.get_path("sids2.gal")).read() >>> f = libpysal.io.open(libpysal.examples.get_path("sids2.dbf")) >>> SIDR = np.array(f.by_col("SIDR74")) >>> mi = Moran(SIDR, w) >>> round(mi.I, 3) 0.248 >>> mi.p_norm 0.0001158330781489969 One-tailed >>> mi_1 = Moran(SIDR, w, two_tailed=False) >>> round(mi_1.I, 3) 0.248 >>> round(mi_1.p_norm, 4) 0.0001 """
[docs] def __init__( self, y, w, transformation="r", permutations=PERMUTATIONS, two_tailed=True ): y = np.asarray(y).flatten() self.y = y w = _transform(w, transformation) self.w = w self.permutations = permutations self.__moments() self.I = self.__calc(self.z) # noqa: E741 self.z_norm = (self.I - self.EI) / self.seI_norm self.z_rand = (self.I - self.EI) / self.seI_rand if self.z_norm > 0: self.p_norm = stats.norm.sf(self.z_norm) self.p_rand = stats.norm.sf(self.z_rand) else: self.p_norm = stats.norm.cdf(self.z_norm) self.p_rand = stats.norm.cdf(self.z_rand) if two_tailed: self.p_norm *= 2.0 self.p_rand *= 2.0 if permutations: sim = [ self.__calc(np.random.permutation(self.z)) for i in range(permutations) ] self.sim = sim = np.array(sim) above = sim >= self.I larger = above.sum() if (self.permutations - larger) < larger: larger = self.permutations - larger self.p_sim = (larger + 1.0) / (permutations + 1.0) self.EI_sim = sim.sum() / permutations self.seI_sim = np.array(sim).std() self.VI_sim = self.seI_sim**2 self.z_sim = (self.I - self.EI_sim) / self.seI_sim if self.z_sim > 0: self.p_z_sim = stats.norm.sf(self.z_sim) else: self.p_z_sim = stats.norm.cdf(self.z_sim) # provide .z attribute that is znormalized sy = y.std() self.z /= sy
def __moments(self): self.n = len(self.y) y = self.y z = y - y.mean() self.z = z self.z2ss = (z * z).sum() self.EI = -1.0 / (self.n - 1) n = self.n n2 = n * n if isinstance(self.w, W): s1 = self.w.s1 s0 = self.w.s0 s2 = self.w.s2 else: self.summary = self.w.summary() s1 = self.summary.s1 s0 = self.summary.s0 s2 = self.summary.s2 s02 = s0 * s0 v_num = n2 * s1 - n * s2 + 3 * s02 v_den = (n - 1) * (n + 1) * s02 self.VI_norm = v_num / v_den - (1.0 / (n - 1)) ** 2 self.seI_norm = self.VI_norm ** (1 / 2.0) # variance under randomization xd4 = z**4 xd2 = z**2 k_num = xd4.sum() / n k_den = (xd2.sum() / n) ** 2 k = k_num / k_den EI = self.EI A = n * ((n2 - 3 * n + 3) * s1 - n * s2 + 3 * s02) B = k * ((n2 - n) * s1 - 2 * n * s2 + 6 * s02) VIR = (A - B) / ((n - 1) * (n - 2) * (n - 3) * s02) - EI * EI self.VI_rand = VIR self.seI_rand = VIR ** (1 / 2.0) def __calc(self, z): zl = _slag(self.w, z) inum = (z * zl).sum() s0 = self.w.s0 if isinstance(self.w, W) else self.summary.s0 return self.n / s0 * inum / self.z2ss @property def _statistic(self): """More consistent hidden attribute to access ESDA statistics""" return self.I
[docs] @classmethod def by_col( cls, df, cols, w=None, inplace=False, pvalue="sim", outvals=None, **stat_kws ): """ Function to compute a Moran statistic on a dataframe Parameters ---------- df : pandas.DataFrame a pandas dataframe with a geometry column cols : string or list of string name or list of names of columns to use to compute the statistic w : W | Graph spatial weights instance as W or Graph aligned with the dataframe. If not provided, this is searched for in the dataframe's metadata inplace : bool a boolean denoting whether to operate on the dataframe inplace or to return a series contaning the results of the computation. If operating inplace, the derived columns will be named 'column_moran' pvalue : string a string denoting which pvalue should be returned. Refer to the the Moran statistic's documentation for available p-values outvals : list of strings list of arbitrary attributes to return as columns from the Moran statistic **stat_kws : dict options to pass to the underlying statistic. For this, see the documentation for the Moran statistic. Returns -------- If inplace, None, and operation is conducted on dataframe in memory. Otherwise, returns a copy of the dataframe with the relevant columns attached. """ return _univariate_handler( df, cols, w=w, inplace=inplace, pvalue=pvalue, outvals=outvals, stat=cls, swapname=cls.__name__.lower(), **stat_kws, )
[docs] def plot_scatter( self, ax=None, scatter_kwds=None, fitline_kwds=None, ): """ Plot a Moran scatterplot with optional coloring for significant points. Parameters ---------- ax : matplotlib.axes.Axes, optional Pre-existing axes for the plot, by default None. scatter_kwds : dict, optional Additional keyword arguments for scatter plot, by default None. fitline_kwds : dict, optional Additional keyword arguments for fit line, by default None. Returns ------- matplotlib.axes.Axes Axes object with the Moran scatterplot. """ return _scatterplot( self, crit_value=None, ax=ax, scatter_kwds=scatter_kwds, fitline_kwds=fitline_kwds, )
[docs] def plot_simulation(self, ax=None, legend=False, fitline_kwds=None, **kwargs): """ Global Moran's I simulated reference distribution. Parameters ---------- ax : matplotlib.axes.Axes, optional Pre-existing axes for the plot, by default None. legend : bool, optional Plot a legend, by default False fitline_kwds : dict, optional Additional keyword arguments for vertical Moran fit line, by default None. **kwargs : keyword arguments, optional Additional keyword arguments for KDE plot passed to ``seaborn.kdeplot``, by default None. Returns ------- matplotlib.axes.Axes Axes object with the Moran scatterplot. Notes ----- This requires optional dependencies ``matplotlib`` and ``seaborn``. Examples -------- >>> import libpysal >>> w = libpysal.io.open(libpysal.examples.get_path("stl.gal")).read() >>> f = libpysal.io.open(libpysal.examples.get_path("stl_hom.txt")) >>> y = np.array(f.by_col['HR8893']) >>> from esda.moran import Moran >>> mi = Moran(y, w) Default plot: >>> mi.plot_simulation() Customized styling that turns the distribution into a pink line and line indicating I to a black line: >>> mi.plot_simulation(fitline_kwds={"color": "k"}, color="pink", shade=False) """ return _simulation_plot( self, ax=ax, legend=legend, bivariate=False, fitline_kwds=fitline_kwds, **kwargs, )
[docs] class Moran_BV: # noqa: N801 """ Bivariate Moran's I Parameters ---------- x : array x-axis variable y : array wy will be on y axis w : W | Graph spatial weights instance as W or Graph aligned with x and y transformation : {'R', 'B', 'D', 'U', 'V'} weights transformation, default is row-standardized "r". Other options include "B": binary, "D": doubly-standardized, "O": restore original transformation (applicable only if ``w`` is passed as ``W``), "V": variance-stabilizing. permutations : int number of random permutations for calculation of pseudo p_values Attributes ---------- zx : array original x variable standardized by mean and std zy : array original y variable standardized by mean and std w : W | Graph original w object permutation : int number of permutations I : float value of bivariate Moran's I sim : array (if permutations>0) vector of I values for permuted samples p_sim : float (if permutations>0) p-value based on permutations (one-sided) null: spatial randomness alternative: the observed I is extreme it is either extremely high or extremely low EI_sim : array (if permutations>0) average value of I from permutations VI_sim : array (if permutations>0) variance of I from permutations seI_sim : array (if permutations>0) standard deviation of I under permutations. z_sim : array (if permutations>0) standardized I based on permutations p_z_sim : float (if permutations>0) p-value based on standard normal approximation from permutations Notes ----- Inference is only based on permutations as analytical results are not too reliable. Examples -------- >>> import libpysal >>> import numpy as np Set random number generator seed so we can replicate the example >>> np.random.seed(10) Open the sudden infant death dbf file and read in rates for 74 and 79 converting each to a numpy array >>> f = libpysal.io.open(libpysal.examples.get_path("sids2.dbf")) >>> SIDR74 = np.array(f.by_col['SIDR74']) >>> SIDR79 = np.array(f.by_col['SIDR79']) Read a GAL file and construct our spatial weights object >>> w = libpysal.io.open(libpysal.examples.get_path("sids2.gal")).read() Create an instance of Moran_BV >>> from esda.moran import Moran_BV >>> mbi = Moran_BV(SIDR79, SIDR74, w) What is the bivariate Moran's I value >>> round(mbi.I, 3) 0.156 Based on 999 permutations, what is the p-value of our statistic >>> round(mbi.p_z_sim, 3) 0.001 """
[docs] def __init__(self, x, y, w, transformation="r", permutations=PERMUTATIONS): x = np.asarray(x).flatten() y = np.asarray(y).flatten() zy = (y - y.mean()) / y.std(ddof=1) zx = (x - x.mean()) / x.std(ddof=1) self.y = y self.x = x self.zx = zx self.zy = zy n = x.shape[0] self.den = n - 1.0 # zx'zx = zy'zy = n-1 w = _transform(w, transformation) self.w = w self.I = self.__calc(zy) # noqa: E741 if permutations: nrp = np.random.permutation sim = [self.__calc(nrp(zy)) for i in range(permutations)] self.sim = sim = np.array(sim) above = sim >= self.I larger = above.sum() if (permutations - larger) < larger: larger = permutations - larger self.p_sim = (larger + 1.0) / (permutations + 1.0) self.EI_sim = sim.sum() / permutations self.seI_sim = np.array(sim).std() self.VI_sim = self.seI_sim**2 self.z_sim = (self.I - self.EI_sim) / self.seI_sim if self.z_sim > 0: self.p_z_sim = stats.norm.sf(self.z_sim) else: self.p_z_sim = stats.norm.cdf(self.z_sim)
def __calc(self, zy): wzy = _slag(self.w, zy) self.num = (self.zx * wzy).sum() return self.num / self.den @property def _statistic(self): """More consistent hidden attribute to access ESDA statistics""" return self.I
[docs] @classmethod def by_col( cls, df, x, y=None, w=None, inplace=False, pvalue="sim", outvals=None, **stat_kws, ): """ Function to compute a Moran_BV statistic on a dataframe Parameters ---------- df : pandas.DataFrame a pandas dataframe with a geometry column X : list of strings column name or list of column names to use as X values to compute the bivariate statistic. If no Y is provided, pairwise comparisons among these variates are used instead. Y : list of strings column name or list of column names to use as Y values to compute the bivariate statistic. if no Y is provided, pariwise comparisons among the X variates are used instead. w : W | Graph spatial weights instance as W or Graph aligned with the dataframe. If not provided, this is searched for in the dataframe's metadata inplace : bool a boolean denoting whether to operate on the dataframe inplace or to return a series contaning the results of the computation. If operating inplace, the derived columns will be named 'column_moran_local' pvalue : string a string denoting which pvalue should be returned. Refer to the the Moran_BV statistic's documentation for available p-values outvals : list of strings list of arbitrary attributes to return as columns from the Moran_BV statistic **stat_kws : keyword arguments options to pass to the underlying statistic. For this, see the documentation for the Moran_BV statistic. Returns -------- If inplace, None, and operation is conducted on dataframe in memory. Otherwise, returns a copy of the dataframe with the relevant columns attached. """ return _bivariate_handler( df, x, y=y, w=w, inplace=inplace, pvalue=pvalue, outvals=outvals, swapname=cls.__name__.lower(), stat=cls, **stat_kws, )
[docs] def plot_scatter( self, ax=None, scatter_kwds=None, fitline_kwds=None, ): """ Plot a Moran scatterplot with optional coloring for significant points. Parameters ---------- ax : matplotlib.axes.Axes, optional Pre-existing axes for the plot, by default None. scatter_kwds : dict, optional Additional keyword arguments for scatter plot, by default None. fitline_kwds : dict, optional Additional keyword arguments for fit line, by default None. Returns ------- matplotlib.axes.Axes Axes object with the Moran scatterplot. """ return _scatterplot( self, crit_value=None, bivariate=True, ax=ax, scatter_kwds=scatter_kwds, fitline_kwds=fitline_kwds, )
[docs] def plot_simulation(self, ax=None, legend=False, fitline_kwds=None, **kwargs): """ Global Moran's I simulated reference distribution. Parameters ---------- ax : matplotlib.axes.Axes, optional Pre-existing axes for the plot, by default None. legend : bool, optional Plot a legend, by default False fitline_kwds : dict, optional Additional keyword arguments for vertical Moran fit line, by default None. **kwargs : keyword arguments, optional Additional keyword arguments for KDE plot passed to ``seaborn.kdeplot``, by default None. Returns ------- matplotlib.axes.Axes Axes object with the Moran scatterplot. Notes ----- This requires optional dependencies ``matplotlib`` and ``seaborn``. """ return _simulation_plot( self, ax=ax, legend=legend, bivariate=True, fitline_kwds=fitline_kwds, **kwargs, )
[docs] def Moran_BV_matrix(variables, w, permutations=0, varnames=None): # noqa: N802 """ Bivariate Moran Matrix Calculates bivariate Moran between all pairs of a set of variables. Parameters ---------- variables : array or pandas.DataFrame sequence of variables to be assessed w : W | Graph spatial weights instance as W or Graph aligned with variables permutations : int number of permutations varnames : list, optional if variables is an array Strings for variable names. Will add an attribute to `Moran_BV` objects in results needed for plotting in `splot` or `.plot()`. Default =None. Note: If variables is a `pandas.DataFrame` varnames will automatically be generated Returns ------- results : dictionary (i, j) is the key for the pair of variables, values are the Moran_BV objects. Examples -------- open dbf >>> import libpysal >>> f = libpysal.io.open(libpysal.examples.get_path("sids2.dbf")) pull of selected variables from dbf and create numpy arrays for each >>> varnames = ['SIDR74', 'SIDR79', 'NWR74', 'NWR79'] >>> vars = [np.array(f.by_col[var]) for var in varnames] create a contiguity matrix from an external gal file >>> w = libpysal.io.open(libpysal.examples.get_path("sids2.gal")).read() create an instance of Moran_BV_matrix >>> from esda.moran import Moran_BV_matrix >>> res = Moran_BV_matrix(vars, w, varnames = varnames) check values >>> round(res[(0, 1)].I,7) 0.1936261 >>> round(res[(3, 0)].I,7) 0.3770138 """ try: # check if pandas is installed import pandas if isinstance(variables, pandas.DataFrame): # if yes use variables as df and convert to numpy_array varnames = pandas.Index.tolist(variables.columns) variables_n = [] for var in varnames: variables_n.append(variables[str(var)].values) else: variables_n = variables except ImportError: variables_n = variables results = _Moran_BV_Matrix_array( variables=variables_n, w=w, permutations=permutations, varnames=varnames ) return results
def _Moran_BV_Matrix_array(variables, w, permutations=0, varnames=None): # noqa: N802 """ Base calculation for MORAN_BV_Matrix """ k = len(variables) if varnames is None: varnames = [f"x{i}" for i in range(k)] rk = list(range(0, k - 1)) results = {} for i in rk: for j in range(i + 1, k): y1 = variables[i] y2 = variables[j] results[i, j] = Moran_BV(y1, y2, w, permutations=permutations) results[j, i] = Moran_BV(y2, y1, w, permutations=permutations) results[i, j].varnames = {"x": varnames[i], "y": varnames[j]} results[j, i].varnames = {"x": varnames[j], "y": varnames[i]} return results
[docs] def plot_moran_facet( moran_matrix, figsize=(16, 12), scatter_bv_kwds=None, fitline_bv_kwds=None, scatter_glob_kwds=dict(color="#737373"), fitline_glob_kwds=None, ): """ Moran Facet visualization. A matrix containing bivariate Moran plots between all pairs of variables present in the ``moran_matrix`` dictionary. On the diagonal contains global Moran plot. Parameters ---------- moran_matrix : dict Dictionary of Moran_BV objects returned by Moran_BV_matrix figsize : tuple, optional Size of the figure. Default is (16,12) scatter_bv_kwds : keyword arguments, optional Keywords used for creating and designing the scatter points of off-diagonal Moran_BV plots. Default =None. fitline_bv_kwds : keyword arguments, optional Keywords used for creating and designing the moran fitline of off-diagonal Moran_BV plots. Default =None. scatter_glob_kwds : keyword arguments, optional Keywords used for creating and designing the scatter points of diagonal Moran plots. Default =None. fitline_glob_kwds : keyword arguments, optional Keywords used for creating and designing the moran fitline of diagonal Moran plots. Default =None. Returns ------- ax : matplotlib Axes instance Axes in which the figure is plotted """ try: from matplotlib import pyplot as plt except ImportError as err: raise ImportError( "matplotlib must be installed to plot the simulation." ) from err nrows = int(np.sqrt(len(moran_matrix))) + 1 ncols = nrows fig, axarr = plt.subplots(nrows, ncols, figsize=figsize, sharey=True, sharex=True) fig.suptitle("Moran Facet") for row in range(nrows): for col in range(ncols): if row == col: global_m = Moran( moran_matrix[row, (row + 1) % 4].zy, moran_matrix[row, (row + 1) % 4].w, ) _scatterplot( global_m, crit_value=None, ax=axarr[row, col], scatter_kwds=scatter_glob_kwds, fitline_kwds=fitline_glob_kwds, ) axarr[row, col].set_facecolor("#d9d9d9") else: _scatterplot( moran_matrix[row, col], bivariate=True, crit_value=None, ax=axarr[row, col], scatter_kwds=scatter_bv_kwds, fitline_kwds=fitline_bv_kwds, ) axarr[row, col].spines[["left", "right", "top", "bottom"]].set_visible( False ) if row == nrows - 1: axarr[row, col].set_xlabel( str(moran_matrix[(col + 1) % 4, col].varnames["x"]).format(col) ) axarr[row, col].spines["bottom"].set_visible(True) else: axarr[row, col].set_xlabel("") if col == 0: axarr[row, col].set_ylabel( ( "Spatial Lag of " + str(moran_matrix[row, (row + 1) % 4].varnames["y"]) ).format(row) ) axarr[row, col].spines["left"].set_visible(True) else: axarr[row, col].set_ylabel("") axarr[row, col].set_title("") plt.tight_layout() return axarr
[docs] class Moran_Rate(Moran): # noqa: N801 """ Adjusted Moran's I Global Autocorrelation Statistic for Rate Variables :cite:`Assuncao1999` Parameters ---------- e : array an event variable measured across n spatial units b : array a population-at-risk variable measured across n spatial units w : W | Graph spatial weights instance as W or Graph aligned with e and b adjusted : boolean whether or not Moran's I needs to be adjusted for rate variable transformation : {'R', 'B', 'D', 'U', 'V'} weights transformation, default is row-standardized "r". Other options include "B": binary, "D": doubly-standardized, "O": restore original transformation (applicable only if ``w`` is passed as ``W``), "V": variance-stabilizing. two_tailed : boolean If True (default), analytical p-values for Moran's I are two-tailed, otherwise they are one tailed. permutations : int number of random permutations for calculation of pseudo p_values Attributes ---------- y : array rate variable computed from parameters e and b if adjusted is True, y is standardized rates otherwise, y is raw rates z : array zero-mean, unit standard deviation normalized y w : W | Graph original w object permutations : int number of permutations I : float value of Moran's I EI : float expected value under normality assumption VI_norm : float variance of I under normality assumption seI_norm : float standard deviation of I under normality assumption z_norm : float z-value of I under normality assumption p_norm : float p-value of I under normality assumption VI_rand : float variance of I under randomization assumption seI_rand : float standard deviation of I under randomization assumption z_rand : float z-value of I under randomization assumption p_rand : float p-value of I under randomization assumption two_tailed : boolean If True, p_norm and p_rand are two-tailed p-values, otherwise they are one-tailed. sim : array (if permutations>0) vector of I values for permuted samples p_sim : array (if permutations>0) p-value based on permutations (one-sided) null: spatial randomness alternative: the observed I is extreme if it is either extremely greater or extremely lower than the values obtained from permutaitons EI_sim : float (if permutations>0) average value of I from permutations VI_sim : float (if permutations>0) variance of I from permutations seI_sim : float (if permutations>0) standard deviation of I under permutations. z_sim : float (if permutations>0) standardized I based on permutations p_z_sim : float (if permutations>0) p-value based on standard normal approximation from Examples -------- >>> import libpysal >>> w = libpysal.io.open(libpysal.examples.get_path("sids2.gal")).read() >>> f = libpysal.io.open(libpysal.examples.get_path("sids2.dbf")) >>> e = np.array(f.by_col('SID79')) >>> b = np.array(f.by_col('BIR79')) >>> from esda.moran import Moran_Rate >>> mi = Moran_Rate(e, b, w, two_tailed=False) >>> "%6.4f" % mi.I '0.1662' >>> "%6.4f" % mi.p_norm '0.0042' """
[docs] def __init__( self, e, b, w, adjusted=True, transformation="r", permutations=PERMUTATIONS, two_tailed=True, ): e = np.asarray(e).flatten() b = np.asarray(b).flatten() y = assuncao_rate(e, b) if adjusted else e * 1.0 / b Moran.__init__( self, y, w, transformation=transformation, permutations=permutations, two_tailed=two_tailed, )
[docs] @classmethod def by_col( cls, df, events, populations, w=None, inplace=False, pvalue="sim", outvals=None, swapname="", **stat_kws, ): """ Function to compute a Moran_Rate statistic on a dataframe Parameters ---------- df : pandas.DataFrame a pandas dataframe with a geometry column events : string or list of strings one or more names where events are stored populations : string or list of strings one or more names where the populations corresponding to the events are stored. If one population column is provided, it is used for all event columns. If more than one population column is provided but there is not a population for every event column, an exception will be raised. w : W | Graph spatial weights instance as W or Graph aligned with the dataframe. If not provided, this is searched for in the dataframe's metadata inplace : bool a boolean denoting whether to operate on the dataframe inplace or to return a series contaning the results of the computation. If operating inplace, the derived columns will be named 'column_moran_rate' pvalue : string a string denoting which pvalue should be returned. Refer to the the Moran_Rate statistic's documentation for available p-values outvals : list of strings list of arbitrary attributes to return as columns from the Moran_Rate statistic **stat_kws : keyword arguments options to pass to the underlying statistic. For this, see the documentation for the Moran_Rate statistic. Returns -------- If inplace, None, and operation is conducted on dataframe in memory. Otherwise, returns a copy of the dataframe with the relevant columns attached. """ if not inplace: new = df.copy() cls.by_col( new, events, populations, w=w, inplace=True, pvalue=pvalue, outvals=outvals, swapname=swapname, **stat_kws, ) return new if isinstance(events, str): events = [events] if isinstance(populations, str): populations = [populations] if len(populations) < len(events): populations = populations * len(events) if len(events) != len(populations): raise ValueError( "There is not a one-to-one matching between events and populations!" f"\nEvents: {events}\nPopulations: {populations}" ) adjusted = stat_kws.pop("adjusted", True) if isinstance(adjusted, bool): adjusted = [adjusted] * len(events) if swapname == "": swapname = cls.__name__.lower() rates = [ assuncao_rate(df[e], df[pop]) if adj else df[e].astype(float) / df[pop] for e, pop, adj in zip(events, populations, adjusted, strict=True) ] names = ["-".join((e, p)) for e, p in zip(events, populations, strict=True)] out_df = df.copy() rate_df = out_df.from_dict( dict(zip(names, rates, strict=True)) ) # trick to avoid importing pandas stat_df = _univariate_handler( rate_df, names, w=w, inplace=False, pvalue=pvalue, outvals=outvals, swapname=swapname, stat=Moran, # how would this get done w/super? **stat_kws, ) for col in stat_df.columns: df[col] = stat_df[col]
# -----------------------------------------------------------------------------# # Local Statistics # # -----------------------------------------------------------------------------#
[docs] class Moran_Local: # noqa: N801 """Local Moran Statistics. Parameters ---------- y : array (n,1), attribute array w : W | Graph spatial weights instance as W or Graph aligned with y transformation : {'R', 'B', 'D', 'U', 'V'} weights transformation, default is row-standardized "r". Other options include "B": binary, "D": doubly-standardized, "O": restore original transformation (applicable only if ``w`` is passed as ``W``), "V": variance-stabilizing. permutations : int number of random permutations for calculation of pseudo p_values geoda_quads : boolean (default=False) If True use GeoDa scheme: HH=1, LL=2, LH=3, HL=4 If False use PySAL Scheme: HH=1, LH=2, LL=3, HL=4 n_jobs : int Number of cores to be used in the conditional randomisation. If -1, all available cores are used. keep_simulations : Boolean (default=True) If True, the entire matrix of replications under the null is stored in memory and accessible; otherwise, replications are not saved seed : None/int Seed to ensure reproducibility of conditional randomizations. Must be set here, and not outside of the function, since numba does not correctly interpret external seeds nor numpy.random.RandomState instances. island_weight: value to use as a weight for the "fake" neighbor for every island. If numpy.nan, will propagate to the final local statistic depending on the `stat_func`. If 0, then the lag is always zero for islands. Attributes ---------- y : array original variable w : W | Graph original w object z : array zero-mean, unit standard deviation normalized y permutations : int number of random permutations for calculation of pseudo p_values Is : array local Moran's I values q : array (if permutations>0) values indicate quandrant location 1 HH, 2 LH, 3 LL, 4 HL sim : array (permutations by n) (if permutations>0) I values for permuted samples p_sim : array (if permutations>0) p-values based on permutations (one-sided) null: spatial randomness alternative: the observed Ii is further away or extreme from the median of simulated values. It is either extremely high or extremely low in the distribution of simulated Is. EI_sim : array (if permutations>0) average values of local Is from permutations VI_sim : array (if permutations>0) variance of Is from permutations EI : array analytical expectation of Is under total permutation, from :cite:`Anselin1995`. Is the same at each site, and equal to the expectation of I itself when transformation='r'. We recommend using EI_sim, not EI, for analysis. This EI is only provided for reproducibility. VI : array analytical variance of Is under total permutation, from :cite:`Anselin1995`. Varies according only to cardinality. We recommend using VI_sim, not VI, for analysis. This VI is only provided for reproducibility. EIc : array analytical expectation of Is under conditional permutation, from :cite:`sokal1998local`. Varies strongly by site, since it conditions on z_i. We recommend using EI_sim, not EIc, for analysis. This EIc is only provided for reproducibility. VIc : array analytical variance of Is under conditional permutation, from :cite:`sokal1998local`. Varies strongly by site, since it conditions on z_i. We recommend using VI_sim, not VIc, for analysis. This VIc is only provided for reproducibility. seI_sim : array (if permutations>0) standard deviations of Is under permutations. z_sim : arrray (if permutations>0) standardized Is based on permutations p_z_sim : array (if permutations>0) p-values based on standard normal approximation from permutations (one-sided) for two-sided tests, these values should be multiplied by 2 n_jobs : int Number of cores to be used in the conditional randomisation. If -1, all available cores are used. keep_simulations : Boolean (default=True) If True, the entire matrix of replications under the null is stored in memory and accessible; otherwise, replications are not saved seed : None/int Seed to ensure reproducibility of conditional randomizations. Must be set here, and not outside of the function, since numba does not correctly interpret external seeds nor numpy.random.RandomState instances. Notes ----- For technical details see :cite:`Anselin95`. Examples -------- >>> import libpysal >>> import numpy as np >>> np.random.seed(10) >>> w = libpysal.io.open(libpysal.examples.get_path("desmith.gal")).read() >>> f = libpysal.io.open(libpysal.examples.get_path("desmith.txt")) >>> y = np.array(f.by_col['z']) >>> from esda.moran import Moran_Local >>> lm = Moran_Local(y, w, transformation = "r", permutations = 99) >>> lm.q array([4, 4, 4, 2, 3, 3, 1, 4, 3, 3]) >>> lm.p_z_sim[0] 0.24669152541631179 >>> lm = Moran_Local(y, w, transformation = "r", permutations = 99, \ geoda_quads=True) >>> lm.q array([4, 4, 4, 3, 2, 2, 1, 4, 2, 2]) Note random components result is slightly different values across architectures so the results have been removed from doctests and will be moved into unittests that are conditional on architectures. """
[docs] def __init__( self, y, w, transformation="r", permutations=PERMUTATIONS, geoda_quads=False, n_jobs=1, keep_simulations=True, seed=None, island_weight=0, # noqa: ARG002 ): y = np.asarray(y).flatten() self.y = y n = len(y) self.n = n self.n_1 = n - 1 z = y - y.mean() # setting for floating point noise orig_settings = np.seterr() np.seterr(all="ignore") sy = y.std() z /= sy np.seterr(**orig_settings) self.z = z w = _transform(w, transformation) self.w = w self.permutations = permutations self.den = (z * z).sum() self.Is = self.__calc(self.w, self.z) self.geoda_quads = geoda_quads quads = [1, 2, 3, 4] if geoda_quads: quads = [1, 3, 2, 4] self.quads = quads self.__quads() self.__moments() if permutations: self.p_sim, self.rlisas = _crand_plus( z, w, self.Is, permutations, keep_simulations, n_jobs=n_jobs, stat_func=_moran_local_crand, seed=seed, ) self.sim = np.transpose(self.rlisas) if keep_simulations: sim = np.transpose(self.rlisas) above = sim >= self.Is larger = above.sum(0) low_extreme = (self.permutations - larger) < larger larger[low_extreme] = self.permutations - larger[low_extreme] self.p_sim = (larger + 1.0) / (permutations + 1.0) self.sim = sim self.EI_sim = self.sim.mean(axis=0) self.seI_sim = self.sim.std(axis=0) self.VI_sim = self.seI_sim * self.seI_sim self.z_sim = (self.Is - self.EI_sim) / self.seI_sim self.p_z_sim = stats.norm.sf(np.abs(self.z_sim)) else: self.sim = self.rlisas = None self.EI_sim = np.nan self.seI_sim = np.nan self.VI_sim = np.nan self.z_sim = np.nan self.p_z_sim = np.nan
def __calc(self, w, z): zl = _slag(w, z) return self.n_1 * self.z * zl / self.den def __quads(self): zl = _slag(self.w, self.z) zp = self.z > 0 lp = zl > 0 pp = zp * lp np = (1 - zp) * lp nn = (1 - zp) * (1 - lp) pn = zp * (1 - lp) q0, q1, q2, q3 = self.quads self.q = (q0 * pp) + (q1 * np) + (q2 * nn) + (q3 * pn) def __moments(self): W = self.w.sparse z = self.z simplefilter("always", sparse.SparseEfficiencyWarning) n = self.n m2 = (z * z).sum() / n wi = np.asarray(W.sum(axis=1)).flatten() wi2 = np.asarray(W.multiply(W).sum(axis=1)).flatten() # --------------------------------------------------------- # Conditional randomization null, Sokal 1998, Eqs. A7 & A8 # assume that division is as written, so that # a - b / (n - 1) means a - (b / (n-1)) # --------------------------------------------------------- expectation = -(z**2 * wi) / ((n - 1) * m2) var_term1 = (z / m2) ** 2 var_term2 = n / (n - 2) var_term3 = wi2 - (wi**2 / (n - 1)) var_term4 = m2 - (z**2 / (n - 1)) variance = var_term1 * var_term2 * var_term3 * var_term4 self.EIc = expectation self.VIc = variance # --------------------------------------------------------- # Total randomization null, Sokal 1998, Eqs. A3 & A4* # --------------------------------------------------------- m4 = z**4 / n b2 = m4 / m2**2 expectation = -wi / (n - 1) # assume that "avoiding identical subscripts" in :cite:`Anselin1995` # includes i==h and i==k, we can use the form due to # :cite:`sokal1998local` below. # wikh = _wikh_fast(W) # variance_anselin = (wi2 * (n - b2)/(n-1) # + 2*wikh*(2*b2 - n) / ((n-1)*(n-2)) # - wi**2/(n-1)**2) self.EI = expectation n1 = n - 1 self.VI = wi2 * (n - b2) / n1 self.VI += (wi**2 - wi2) * (2 * b2 - n) / (n1 * (n - 2)) self.VI -= (-wi / n1) ** 2 @property def _statistic(self): """More consistent hidden attribute to access ESDA statistics.""" return self.Is
[docs] @classmethod def by_col( cls, df, cols, w=None, inplace=False, pvalue="sim", outvals=None, **stat_kws ): """ Function to compute a Moran_Local statistic on a dataframe. Parameters ---------- df : pandas.DataFrame a pandas dataframe with a geometry column cols : string or list of string name or list of names of columns to use to compute the statistic w : W | Graph spatial weights instance as W or Graph aligned with the dataframe. If not provided, this is searched for in the dataframe's metadata inplace : bool a boolean denoting whether to operate on the dataframe inplace or to return a series contaning the results of the computation. If operating inplace, the derived columns will be named 'column_moran_local' pvalue : string a string denoting which pvalue should be returned. Refer to the the Moran_Local statistic's documentation for available p-values outvals : list of strings list of arbitrary attributes to return as columns from the Moran_Local statistic **stat_kws : dict options to pass to the underlying statistic. For this, see the documentation for the Moran_Local statistic. Returns -------- If inplace, None, and operation is conducted on dataframe in memory. Otherwise, returns a copy of the dataframe with the relevant columns attached. """ return _univariate_handler( df, cols, w=w, inplace=inplace, pvalue=pvalue, outvals=outvals, stat=cls, swapname=cls.__name__.lower(), **stat_kws, )
[docs] def get_cluster_labels(self, crit_value=0.05): """Return LISA cluster labels for each observation. Parameters ---------- crit_value : float, optional crititical significance value for statistical inference, by default 0.05 Returns ------- numpy.array an array of cluster labels aligned with the input data used to conduct the local Moran analysis """ return _get_cluster_labels(self, crit_value)
[docs] def explore(self, gdf, crit_value=0.05, **kwargs): """Create interactive map of LISA indicators Parameters ---------- gdf : geopandas.GeoDataFrame geodataframe used to conduct the local Moran analysis crit_value : float, optional critical value to determine statistical significance, by default 0.05 kwargs : dict, optional additional keyword arguments passed to the geopandas `explore` method Returns ------- Folium.Map interactive map with LISA clusters """ gdf = gdf.copy() gdf["Moran Cluster"] = self.get_cluster_labels(crit_value) return _viz_local_moran(self, gdf, crit_value, "explore", **kwargs)
[docs] def plot(self, gdf, crit_value=0.05, **kwargs): """Create static map of LISA indicators Parameters ---------- gdf : geopandas.GeoDataFrame geodataframe used to conduct the local Moran analysis crit_value : float, optional critical value to determine statistical significance, by default 0.05 kwargs : dict, optional additional keyword arguments passed to the geopandas `explore` method Returns ------- ax matplotlib axis """ gdf = gdf.copy() gdf["Moran Cluster"] = self.get_cluster_labels(crit_value) return _viz_local_moran(self, gdf, crit_value, "plot", **kwargs)
[docs] def plot_scatter( self, crit_value=0.05, ax=None, scatter_kwds=None, fitline_kwds=None, ): """ Plot a Moran scatterplot with optional coloring for significant points. Parameters ---------- crit_value : float, optional Critical value to determine statistical significance, by default 0.05. ax : matplotlib.axes.Axes, optional Pre-existing axes for the plot, by default None. scatter_kwds : dict, optional Additional keyword arguments for scatter plot, by default None. fitline_kwds : dict, optional Additional keyword arguments for fit line, by default None. Returns ------- matplotlib.axes.Axes Axes object with the Moran scatterplot. """ return _scatterplot( self, crit_value=crit_value, ax=ax, scatter_kwds=scatter_kwds, fitline_kwds=fitline_kwds, )
[docs] def plot_combination( self, gdf, attribute, crit_value=0.05, region_column=None, mask=None, mask_color="#636363", quadrant=None, legend=True, scheme="Quantiles", cmap="YlGnBu", figsize=(15, 4), scatter_kwds=None, fitline_kwds=None, legend_kwds=None, ): """ Produce three-plot visualisation of Moran Scatteprlot, LISA cluster and Choropleth maps, with Local Moran region and quadrant masking Parameters ---------- gdf : geopandas.GeoDataFrame geodataframe used to conduct the local Moran analysis attribute : str Column name of attribute which should be depicted in Choropleth map. crit_value : float, optional critical value to determine statistical significance, by default 0.05 region_column: string, optional Column name containing mask region of interest, by default None mask: str, float, int, optional Identifier or name of the region to highlight, by default None Use the same dtype to specifiy as in original dataset. mask_color: str, optional Color of mask, by default '#636363'. quadrant : int, optional Quadrant 1-4 in scatterplot masking values in LISA cluster and Choropleth maps, by default None figsize: tuple, optional W, h of figure, by default (15,4) legend: boolean, optional If True, legend for maps will be depicted, by default True scheme: str, optional Name of mapclassify classifier to be used, by default 'Quantiles' cmap: str, optional Name of matplotlib colormap used for plotting the Choropleth. By default 'YlGnBu'. scatter_kwds : keyword arguments, optional Keywords used for creating and designing the scatter points, by default None. fitline_kwds : keyword arguments, optional Keywords used for creating and designing the moran fitline in the scatterplot, by default None. legend_kwds : dict Keyword arguments passed to geopandas.GeodataFrame.plot ``legend_kwds`` allowing repositioning of the legend in LISA cluster plot and choropleth. Returns ------- axs : array of Matplotlib axes """ return _plot_combination( self, gdf, attribute, crit_value=crit_value, region_column=region_column, mask=mask, mask_color=mask_color, quadrant=quadrant, legend=legend, scheme=scheme, cmap=cmap, figsize=figsize, scatter_kwds=scatter_kwds, fitline_kwds=fitline_kwds, legend_kwds=legend_kwds, )
[docs] class Moran_Local_BV: # noqa: N801 """Bivariate Local Moran Statistics. Parameters ---------- x : array x-axis variable y : array (n,1), wy will be on y axis w : W | Graph spatial weights instance as W or Graph aligned with y transformation : {'R', 'B', 'D', 'U', 'V'} weights transformation, default is row-standardized "r". Other options include "B": binary, "D": doubly-standardized, "O": restore original transformation (applicable only if ``w`` is passed as ``W``), "V": variance-stabilizing. permutations : int number of random permutations for calculation of pseudo p_values geoda_quads : boolean (default=False) If True use GeoDa scheme: HH=1, LL=2, LH=3, HL=4 If False use PySAL Scheme: HH=1, LH=2, LL=3, HL=4 njobs : int number of workers to use to compute the local statistic. keep_simulations : Boolean (default=True) If True, the entire matrix of replications under the null is stored in memory and accessible; otherwise, replications are not saved seed : None/int Seed to ensure reproducibility of conditional randomizations. Must be set here, and not outside of the function, since numba does not correctly interpret external seeds nor numpy.random.RandomState instances. island_weight: value to use as a weight for the "fake" neighbor for every island. If numpy.nan, will propagate to the final local statistic depending on the `stat_func`. If 0, then the lag is always zero for islands. Attributes ---------- zx : array original x variable standardized by mean and std zy : array original y variable standardized by mean and std w : W | Graph original w object permutations : int number of random permutations for calculation of pseudo p_values Is : float value of Moran's I q : array (if permutations>0) values indicate quandrant location 1 HH, 2 LH, 3 LL, 4 HL sim : array (if permutations>0) vector of I values for permuted samples p_sim : array (if permutations>0) p-value based on permutations (one-sided) null: spatial randomness alternative: the observed Ii is further away or extreme from the median of simulated values. It is either extremelyi high or extremely low in the distribution of simulated Is. EI_sim : array (if permutations>0) average values of local Is from permutations VI_sim : array (if permutations>0) variance of Is from permutations seI_sim: array (if permutations>0) standard deviations of Is under permutations. z_sim : arrray (if permutations>0) standardized Is based on permutations p_z_sim: array (if permutations>0) p-values based on standard normal approximation from permutations (one-sided) for two-sided tests, these values should be multiplied by 2 Examples -------- >>> import libpysal >>> import numpy as np >>> np.random.seed(10) >>> w = libpysal.io.open(libpysal.examples.get_path("sids2.gal")).read() >>> f = libpysal.io.open(libpysal.examples.get_path("sids2.dbf")) >>> x = np.array(f.by_col['SIDR79']) >>> y = np.array(f.by_col['SIDR74']) >>> from esda.moran import Moran_Local_BV >>> lm =Moran_Local_BV(x, y, w, transformation = "r", \ permutations = 99) >>> lm.q[:10] array([3, 4, 3, 4, 2, 1, 4, 4, 2, 4]) >>> lm = Moran_Local_BV(x, y, w, transformation = "r", \ permutations = 99, geoda_quads=True) >>> lm.q[:10] array([2, 4, 2, 4, 3, 1, 4, 4, 3, 4]) Note random components result is slightly different values across architectures so the results have been removed from doctests and will be moved into unittests that are conditional on architectures. """
[docs] def __init__( self, x, y, w, transformation="r", permutations=PERMUTATIONS, geoda_quads=False, n_jobs=1, keep_simulations=True, seed=None, island_weight=0, # noqa: ARG002 ): x = np.asarray(x).flatten() y = np.asarray(y).flatten() self.y = y self.x = x n = len(y) assert len(y) == len(x), "x and y must have the same shape!" self.n = n self.n_1 = n - 1 zx = x - x.mean() zy = y - y.mean() # setting for floating point noise orig_settings = np.seterr() np.seterr(all="ignore") sx = x.std() zx /= sx sy = y.std() zy /= sy np.seterr(**orig_settings) self.zx = zx self.zy = zy w = _transform(w, transformation) self.w = w self.permutations = permutations self.den = (zx * zx).sum() self.Is = self.__calc() self.geoda_quads = geoda_quads quads = [1, 2, 3, 4] if geoda_quads: quads = [1, 3, 2, 4] self.quads = quads self.__quads() if permutations: self.p_sim, self.rlisas = _crand_plus( np.column_stack((zx, zy)), w, self.Is, permutations, keep_simulations, n_jobs=n_jobs, stat_func=_moran_local_bv_crand, seed=seed, ) self.sim = np.transpose(self.rlisas) if keep_simulations: sim = np.transpose(self.rlisas) above = sim >= self.Is larger = above.sum(0) low_extreme = (self.permutations - larger) < larger larger[low_extreme] = self.permutations - larger[low_extreme] self.p_sim = (larger + 1.0) / (permutations + 1.0) self.sim = sim self.EI_sim = sim.mean(axis=0) self.seI_sim = sim.std(axis=0) self.VI_sim = self.seI_sim * self.seI_sim self.z_sim = (self.Is - self.EI_sim) / self.seI_sim self.p_z_sim = stats.norm.sf(np.abs(self.z_sim))
def __calc(self): zly = _slag(self.w, self.zy) return self.n_1 * self.zx * zly / self.den def __quads(self): zl = _slag(self.w, self.zy) zp = self.zx > 0 lp = zl > 0 pp = zp * lp np = (1 - zp) * lp nn = (1 - zp) * (1 - lp) pn = zp * (1 - lp) q0, q1, q2, q3 = self.quads self.q = (q0 * pp) + (q1 * np) + (q2 * nn) + (q3 * pn) @property def _statistic(self): """More consistent hidden attribute to access ESDA statistics.""" return self.Is
[docs] @classmethod def by_col( cls, df, x, y=None, w=None, inplace=False, pvalue="sim", outvals=None, **stat_kws, ): """ Function to compute a Moran_Local_BV statistic on a dataframe Parameters ---------- df : pandas.DataFrame a pandas dataframe with a geometry column X : list of strings column name or list of column names to use as X values to compute the bivariate statistic. If no Y is provided, pairwise comparisons among these variates are used instead. Y : list of strings column name or list of column names to use as Y values to compute the bivariate statistic. if no Y is provided, pariwise comparisons among the X variates are used instead. w : W | Graph spatial weights instance as W or Graph aligned with the dataframe. If not provided, this is searched for in the dataframe's metadata inplace : bool a boolean denoting whether to operate on the dataframe inplace or to return a series contaning the results of the computation. If operating inplace, the derived columns will be named 'column_moran_local_bv' pvalue : string a string denoting which pvalue should be returned. Refer to the the Moran_Local_BV statistic's documentation for available p-values outvals : list of strings list of arbitrary attributes to return as columns from the Moran_Local_BV statistic **stat_kws : dict options to pass to the underlying statistic. For this, see the documentation for the Moran_Local_BV statistic. Returns -------- If inplace, None, and operation is conducted on dataframe in memory. Otherwise, returns a copy of the dataframe with the relevant columns attached. """ return _bivariate_handler( df, x, y=y, w=w, inplace=inplace, pvalue=pvalue, outvals=outvals, swapname=cls.__name__.lower(), stat=cls, **stat_kws, )
[docs] def get_cluster_labels(self, crit_value=0.05): """Return LISA cluster labels for each observation. Parameters ---------- crit_value : float, optional crititical significance value for statistical inference, by default 0.05 Returns ------- numpy.array an array of cluster labels aligned with the input data used to conduct the local Moran analysis """ return _get_cluster_labels(self, crit_value)
[docs] def explore(self, gdf, crit_value=0.05, **kwargs): """Create interactive map of LISA indicators Parameters ---------- gdf : geopandas.GeoDataFrame geodataframe used to conduct the local Moran analysis crit_value : float, optional critical value to determine statistical significance, by default 0.05 kwargs : dict, optional additional keyword arguments passed to the geopandas `explore` method Returns ------- Folium.Map interactive map with LISA clusters """ gdf = gdf.copy() gdf["Moran Cluster"] = self.get_cluster_labels(crit_value) return _viz_local_moran(self, gdf, crit_value, "explore", **kwargs)
[docs] def plot(self, gdf, crit_value=0.05, **kwargs): """Create static map of LISA indicators Parameters ---------- gdf : geopandas.GeoDataFrame geodataframe used to conduct the local Moran analysis crit_value : float, optional critical value to determine statistical significance, by default 0.05 kwargs : dict, optional additional keyword arguments passed to the geopandas `explore` method Returns ------- ax matplotlib axis """ gdf = gdf.copy() gdf["Moran Cluster"] = self.get_cluster_labels(crit_value) return _viz_local_moran(self, gdf, crit_value, "plot", **kwargs)
[docs] def plot_scatter( self, crit_value=0.05, ax=None, scatter_kwds=None, fitline_kwds=None, ): """ Plot a Moran scatterplot with optional coloring for significant points. Parameters ---------- crit_value : float, optional Critical value to determine statistical significance, by default 0.05. ax : matplotlib.axes.Axes, optional Pre-existing axes for the plot, by default None. scatter_kwds : dict, optional Additional keyword arguments for scatter plot, by default None. fitline_kwds : dict, optional Additional keyword arguments for fit line, by default None. Returns ------- matplotlib.axes.Axes Axes object with the Moran scatterplot. """ return _scatterplot( self, crit_value=crit_value, bivariate=True, ax=ax, scatter_kwds=scatter_kwds, fitline_kwds=fitline_kwds, )
[docs] def plot_combination( self, gdf, attribute, crit_value=0.05, region_column=None, mask=None, mask_color="#636363", quadrant=None, legend=True, scheme="Quantiles", cmap="YlGnBu", figsize=(15, 4), scatter_kwds=None, fitline_kwds=None, legend_kwds=None, ): """ Produce three-plot visualisation of Moran Scatteprlot, LISA cluster and Choropleth maps, with Local Moran region and quadrant masking Parameters ---------- gdf : geopandas.GeoDataFrame geodataframe used to conduct the local Moran analysis attribute : str Column name of attribute which should be depicted in Choropleth map. crit_value : float, optional critical value to determine statistical significance, by default 0.05 region_column: string, optional Column name containing mask region of interest, by default None mask: str, float, int, optional Identifier or name of the region to highlight, by default None Use the same dtype to specifiy as in original dataset. mask_color: str, optional Color of mask, by default '#636363'. quadrant : int, optional Quadrant 1-4 in scatterplot masking values in LISA cluster and Choropleth maps, by default None figsize: tuple, optional W, h of figure, by default (15,4) legend: boolean, optional If True, legend for maps will be depicted, by default True scheme: str, optional Name of mapclassify classifier to be used, by default 'Quantiles' cmap: str, optional Name of matplotlib colormap used for plotting the Choropleth. By default 'YlGnBu'. scatter_kwds : keyword arguments, optional Keywords used for creating and designing the scatter points, by default None. fitline_kwds : keyword arguments, optional Keywords used for creating and designing the moran fitline in the scatterplot, by default None. legend_kwds : dict Keyword arguments passed to geopandas.GeodataFrame.plot ``legend_kwds`` allowing repositioning of the legend in LISA cluster plot and choropleth. Returns ------- axs : array of Matplotlib axes """ return _plot_combination( self, gdf, attribute, crit_value=crit_value, region_column=region_column, mask=mask, mask_color=mask_color, quadrant=quadrant, legend=legend, scheme=scheme, cmap=cmap, figsize=figsize, scatter_kwds=scatter_kwds, fitline_kwds=fitline_kwds, legend_kwds=legend_kwds, )
[docs] class Moran_Local_Rate(Moran_Local): # noqa: N801 """ Adjusted Local Moran Statistics for Rate Variables :cite:`Assuncao1999`. Parameters ---------- e : array (n,1), an event variable across n spatial units b : array (n,1), a population-at-risk variable across n spatial units w : W | Graph spatial weights instance as W or Graph aligned with y adjusted : boolean whether or not local Moran statistics need to be adjusted for rate variable transformation : {'R', 'B', 'D', 'U', 'V'} weights transformation, default is row-standardized "r". Other options include "B": binary, "D": doubly-standardized, "O": restore original transformation (applicable only if ``w`` is passed as ``W``), "V": variance-stabilizing. permutations : int number of random permutations for calculation of pseudo p_values geoda_quads : boolean (default=False) If True use GeoDa scheme: HH=1, LL=2, LH=3, HL=4 If False use PySAL Scheme: HH=1, LH=2, LL=3, HL=4 njobs : int number of workers to use to compute the local statistic. keep_simulations : Boolean (default=True) If True, the entire matrix of replications under the null is stored in memory and accessible; otherwise, replications are not saved seed : None/int Seed to ensure reproducibility of conditional randomizations. Must be set here, and not outside of the function, since numba does not correctly interpret external seeds nor numpy.random.RandomState instances. island_weight : float value to use as a weight for the "fake" neighbor for every island. If numpy.nan, will propagate to the final local statistic depending on the `stat_func`. If 0, then the lag is always zero for islands. Attributes ---------- y : array rate variables computed from parameters e and b if adjusted is True, y is standardized rates otherwise, y is raw rates z : array zero-mean, unit standard deviation normalized y w : W | Graph original w object permutations : int number of random permutations for calculation of pseudo p_values Is : float value of Local Moran's Ii q : array (if permutations>0) values indicate quandrant location 1 HH, 2 LH, 3 LL, 4 HL sim : array (if permutations>0) vector of I values for permuted samples p_sim : array (if permutations>0) p-value based on permutations (one-sided) null: spatial randomness alternative: the observed Ii is further away or extreme from the median of simulated Iis. It is either extremely high or extremely low in the distribution of simulated Is EI_sim : float (if permutations>0) average value of I from permutations VI_sim : float (if permutations>0) variance of I from permutations seI_sim : float (if permutations>0) standard deviation of I under permutations. z_sim : float (if permutations>0) standardized I based on permutations p_z_sim : float (if permutations>0) p-value based on standard normal approximation from permutations (one-sided) for two-sided tests, these values should be multiplied by 2 Examples -------- >>> import libpysal >>> import numpy as np >>> np.random.seed(10) >>> w = libpysal.io.open(libpysal.examples.get_path("sids2.gal")).read() >>> f = libpysal.io.open(libpysal.examples.get_path("sids2.dbf")) >>> e = np.array(f.by_col('SID79')) >>> b = np.array(f.by_col('BIR79')) >>> from esda.moran import Moran_Local_Rate >>> lm = Moran_Local_Rate(e, b, w, transformation="r", permutations=99) >>> lm.q[:10] array([2, 4, 3, 1, 2, 1, 1, 4, 2, 4]) >>> lm = Moran_Local_Rate( ... e, b, w, transformation = "r", permutations=99, geoda_quads=True ) >>> lm.q[:10] array([3, 4, 2, 1, 3, 1, 1, 4, 3, 4]) Note random components result is slightly different values across architectures so the results have been removed from doctests and will be moved into unittests that are conditional on architectures """
[docs] def __init__( self, e, b, w, adjusted=True, transformation="r", permutations=PERMUTATIONS, geoda_quads=False, n_jobs=1, keep_simulations=True, seed=None, island_weight=0, # noqa: ARG002 ): e = np.asarray(e).flatten() b = np.asarray(b).flatten() y = assuncao_rate(e, b) if adjusted else e * 1.0 / b Moran_Local.__init__( self, y, w, transformation=transformation, permutations=permutations, geoda_quads=geoda_quads, n_jobs=n_jobs, keep_simulations=keep_simulations, seed=seed, )
[docs] @classmethod def by_col( cls, df, events, populations, w=None, inplace=False, pvalue="sim", outvals=None, swapname="", **stat_kws, ): """ Function to compute a Moran_Local_Rate statistic on a dataframe Parameters ---------- df : pandas.DataFrame a pandas dataframe with a geometry column events : string or list of strings one or more names where events are stored populations : string or list of strings one or more names where the populations corresponding to the events are stored. If one population column is provided, it is used for all event columns. If more than one population column is provided but there is not a population for every event column, an exception will be raised. w : W | Graph spatial weights instance as W or Graph aligned with the dataframe. If not provided, this is searched for in the dataframe's metadata inplace : bool a boolean denoting whether to operate on the dataframe inplace or to return a series contaning the results of the computation. If operating inplace, the derived columns will be named 'column_moran_local_rate' pvalue : string a string denoting which pvalue should be returned. Refer to the the Moran_Local_Rate statistic's documentation for available p-values outvals : list of strings list of arbitrary attributes to return as columns from the Moran_Local_Rate statistic **stat_kws : dict options to pass to the underlying statistic. For this, see the documentation for the Moran_Local_Rate statistic. Returns -------- If inplace, None, and operation is conducted on dataframe in memory. Otherwise, returns a copy of the dataframe with the relevant columns attached. """ if not inplace: new = df.copy() cls.by_col( new, events, populations, w=w, inplace=True, pvalue=pvalue, outvals=outvals, swapname=swapname, **stat_kws, ) return new if isinstance(events, str): events = [events] if isinstance(populations, str): populations = [populations] if len(populations) < len(events): populations = populations * len(events) if len(events) != len(populations): raise ValueError( "There is not a one-to-one matching between events and populations!" f"\nEvents: {events}\nPopulations: {populations}" ) adjusted = stat_kws.pop("adjusted", True) if isinstance(adjusted, bool): adjusted = [adjusted] * len(events) if swapname == "": swapname = cls.__name__.lower() rates = [ assuncao_rate(df[e], df[pop]) if adj else df[e].astype(float) / df[pop] for e, pop, adj in zip(events, populations, adjusted, strict=True) ] names = ["-".join((e, p)) for e, p in zip(events, populations, strict=True)] out_df = df.copy() rate_df = out_df.from_dict( dict(zip(names, rates, strict=True)) ) # trick to avoid importing pandas _univariate_handler( rate_df, names, w=w, inplace=True, pvalue=pvalue, outvals=outvals, swapname=swapname, stat=Moran_Local, # how would this get done w/super? **stat_kws, ) for col in rate_df.columns: df[col] = rate_df[col]
def _viz_local_moran(moran_local, gdf, crit_value, method, **kwargs): """Common helper for local Moran's I vizualization Parameters ---------- moran_local : esda.Moran_Local a fitted local Moran class from the PySAL esda module gdf : geopandas.GeoDataFrame geodataframe used to create the Moran_Local class crit_value : float, optional critical value for determining statistical significance, by default 0.05 method : str {"explore", "plot"} GeoDataFrame method to be used kwargs : dict, optional additional keyword arguments are passed directly to the plotting method, by default None Returns ------- m | ax either folium.Map or maptlotlib.Axes """ try: from matplotlib import colors except ImportError as err: raise ImportError( "matplotlib library must be installed to use the vizualization feature" ) from err gdf = gdf.copy() gdf["Moran Cluster"] = moran_local.get_cluster_labels(crit_value) gdf["p-value"] = moran_local.p_sim x = gdf["Moran Cluster"].values y = np.unique(x) colors5_mpl = { "High-High": "#d7191c", "Low-High": "#89cff0", "Low-Low": "#2c7bb6", "High-Low": "#fdae61", "Insignificant": "lightgrey", } colors5 = [colors5_mpl[i] for i in y] # for mpl hmap = colors.ListedColormap(colors5) if "cmap" not in kwargs: kwargs["cmap"] = hmap return getattr(gdf[["Moran Cluster", "p-value", "geometry"]], method)( "Moran Cluster", **kwargs ) def _get_cluster_labels(moran_local, crit_value): gdf = pd.DataFrame() gdf["q"] = moran_local.q gdf["p_sim"] = moran_local.p_sim gdf["Moran Cluster"] = "Insignificant" gdf.loc[(gdf["p_sim"] < crit_value) & (gdf["q"] == 1), "Moran Cluster"] = ( "High-High" ) gdf.loc[(gdf["p_sim"] < crit_value) & (gdf["q"] == 2), "Moran Cluster"] = "Low-High" gdf.loc[(gdf["p_sim"] < crit_value) & (gdf["q"] == 3), "Moran Cluster"] = "Low-Low" gdf.loc[(gdf["p_sim"] < crit_value) & (gdf["q"] == 4), "Moran Cluster"] = "High-Low" return gdf["Moran Cluster"].values def _scatterplot( moran, crit_value=0.05, bivariate=False, ax=None, scatter_kwds=None, fitline_kwds=None, ): """Generates a Moran Local or Global Scatterplot. Parameters ---------- moran : Moran object An instance of a Moran or Moran_Local object. crit_value : float, optional The critical value for significance. Default is 0.05. ax : matplotlib.axes.Axes, optional The axes on which to draw the plot. If None, a new figure and axes are created. scatter_kwds : dict, optional Additional keyword arguments to pass to the scatter plot. fitline_kwds : dict, optional Additional keyword arguments to pass to the fit line plot. Returns ------- ax : matplotlib.axes.Axes The axes with the Moran Scatterplot. Raises ------ ImportError If matplotlib is not installed. """ try: from matplotlib import pyplot as plt except ImportError as err: raise ImportError( "matplotlib library must be installed to use the scatterplot feature" ) from err # to set default as an empty dictionary that is later filled with defaults if scatter_kwds is None: scatter_kwds = dict() if fitline_kwds is None: fitline_kwds = dict() if crit_value is not None: labels = _get_cluster_labels(moran, crit_value) # TODO: allow customization of colors in here and in plot and explore # TODO: in a way to keep them easily synced colors5_mpl = { "High-High": "#d7191c", "Low-High": "#89cff0", "Low-Low": "#2c7bb6", "High-Low": "#fdae61", "Insignificant": "lightgrey", } colors5 = [colors5_mpl[i] for i in labels] # for mpl # define customization scatter_kwds.setdefault("alpha", 0.6) fitline_kwds.setdefault("alpha", 0.9) if ax is None: _, ax = plt.subplots() ax.set_title("Moran Scatterplot") if bivariate: x = moran.zx lag = lag_spatial(moran.w, moran.zy) ax.set_xlabel("Attribute X") ax.set_ylabel("Spatial Lag of Y") else: x = moran.z lag = lag_spatial(moran.w, moran.z) ax.set_xlabel("Attribute") ax.set_ylabel("Spatial Lag") fit = stats.linregress( x, lag, ) # v- and hlines ax.axvline(0, alpha=0.5, color="k", linestyle="--") ax.axhline(0, alpha=0.5, color="k", linestyle="--") if crit_value is not None: fitline_kwds.setdefault("color", "k") scatter_kwds.setdefault("c", colors5) ax.plot(x, fit.intercept + fit.slope * x, **fitline_kwds) ax.scatter(x, lag, **scatter_kwds) else: scatter_kwds.setdefault("color", "#bababa") fitline_kwds.setdefault("color", "#d6604d") ax.plot(x, fit.intercept + fit.slope * x, **fitline_kwds) ax.scatter(x, lag, **scatter_kwds) ax.set_aspect("equal") return ax def _simulation_plot( moran, ax=None, legend=False, bivariate=False, fitline_kwds=None, **kwargs ): try: import seaborn as sns from matplotlib import pyplot as plt except ImportError as err: raise ImportError( "matplotlib and seaborn must be installed to plot the simulation." ) from err # to set default as an empty dictionary that is later filled with defaults if fitline_kwds is None: fitline_kwds = dict() if ax is None: _, ax = plt.subplots() # plot distribution shade = kwargs.pop("shade", True) color = kwargs.pop("color", "#bababa") sns.kdeplot( moran.sim, fill=shade, color=color, ax=ax, label="Distribution of simulated Is", **kwargs, ) exp = moran.EI_sim if bivariate else moran.EI # customize plot fitline_kwds.setdefault("color", "#d6604d") ax.vlines(moran.I, 0, 1, **fitline_kwds, label="Moran's I") ax.vlines(exp, 0, 1, label="Expected I") ax.set_title("Reference Distribution") ax.set_xlabel(f"Moran's I: {moran.I:.2f}") if legend: ax.legend() return ax def _plot_combination( moran_loc, gdf, attribute, crit_value=0.05, region_column=None, mask=None, mask_color="#636363", quadrant=None, legend=True, scheme="Quantiles", cmap="YlGnBu", figsize=(15, 4), scatter_kwds=None, fitline_kwds=None, legend_kwds=None, ): """ Produce three-plot visualisation of Moran Scatteprlot, LISA cluster and Choropleth maps, with Local Moran region and quadrant masking Parameters ---------- moran_loc : esda.moran.Moran_Local or Moran_Local_BV instance Values of Moran's Local Autocorrelation Statistic gdf : geopandas dataframe The Dataframe containing information to plot the two maps. attribute : str Column name of attribute which should be depicted in Choropleth map. p : float, optional The p-value threshold for significance. Points and polygons will be colored by significance. Default = 0.05. region_column: string, optional Column name containing mask region of interest. Default = None mask: str, float, int, optional Identifier or name of the region to highlight. Default = None Use the same dtype to specifiy as in original dataset. mask_color: str, optional Color of mask. Default = '#636363' quadrant : int, optional Quadrant 1-4 in scatterplot masking values in LISA cluster and Choropleth maps. Default = None figsize: tuple, optional W, h of figure. Default = (15,4) legend: boolean, optional If True, legend for maps will be depicted. Default = True scheme: str, optional Name of PySAL classifier to be used. Default = 'Quantiles' cmap: str, optional Name of matplotlib colormap used for plotting the Choropleth. Default = 'YlGnBu' scatter_kwds : keyword arguments, optional Keywords used for creating and designing the scatter points. Default =None. fitline_kwds : keyword arguments, optional Keywords used for creating and designing the moran fitline in the scatterplot. Default =None. legend_kwds : dict Keyword arguments passed to geopandas.GeodataFrame.plot ``legend_kwds`` allowing repositioning of the legend in LISA cluster plot and choropleth. Returns ------- axs : array of Matplotlib axes """ try: from matplotlib import patches from matplotlib import pyplot as plt except ImportError as err: raise ImportError( "matplotlib library must be installed to use the scatterplot feature" ) from err _, axs = plt.subplots( 1, 3, figsize=figsize, subplot_kw={"aspect": "equal", "adjustable": "datalim"} ) # Moran Scatterplot moran_loc.plot_scatter( crit_value=crit_value, ax=axs[0], scatter_kwds=scatter_kwds, fitline_kwds=fitline_kwds, ) # Lisa cluster map moran_loc.plot( gdf, crit_value=crit_value, ax=axs[1], legend=legend, legend_kwds=legend_kwds, ) # Choropleth for attribute gdf.plot( column=attribute, scheme=scheme, cmap=cmap, legend=legend, legend_kwds=legend_kwds, ax=axs[2], alpha=1, ) axs[2].set_axis_off() axs[2].set_aspect("equal") # MASKING QUADRANT VALUES if quadrant is not None: # Quadrant masking in Scatterplot mask_angles = {1: 0, 2: 90, 3: 180, 4: 270} # rectangle angles # We don't want to change the axis data limits, so use the current ones xmin, xmax = axs[0].get_xlim() ymin, ymax = axs[0].get_ylim() # We are rotating, so we start from 0 degrees and # figured out the right dimensions for the rectangles for other angles mask_width = {1: abs(xmax), 2: abs(ymax), 3: abs(xmin), 4: abs(ymin)} mask_height = {1: abs(ymax), 2: abs(xmin), 3: abs(ymin), 4: abs(xmax)} axs[0].add_patch( patches.Rectangle( (0, 0), width=mask_width[quadrant], height=mask_height[quadrant], angle=mask_angles[quadrant], color="#E5E5E5", zorder=-1, alpha=0.8, ) ) # quadrant selection in maps non_quadrant = ~(moran_loc.q == quadrant) mask_quadrant = gdf[non_quadrant] df_quadrant = gdf.iloc[~non_quadrant] union2 = df_quadrant.dissolve().boundary # LISA Cluster mask and cluster boundary mask_quadrant.plot( scheme=scheme, color="white", ax=axs[1], alpha=0.7, zorder=1, ) union2.plot(linewidth=1, ax=axs[1], color="#E5E5E5") # CHOROPLETH MASK mask_quadrant.plot( scheme=scheme, color="white", ax=axs[2], alpha=0.7, zorder=1, ) union2.plot(linewidth=1, ax=axs[2], color="#E5E5E5") # REGION MASKING if region_column is not None: # masking inside axs[0] or Moran Scatterplot # enforce the same dtype of list and mask if not isinstance(mask[0], type(gdf[region_column].iloc[0])): warn( "Values in `mask` are not the same dtype as" + " values in `region_column`. Converting `mask` values" + " to dtype of first observation in region_column.", stacklevel=3, ) data_type = type(gdf[region_column][0].item()) mask = list(map(data_type, mask)) ix = gdf[region_column].isin(mask) if not ix.any(): raise ValueError( f"Specified values {mask} in `mask` not in `region_column`" ) df_mask = gdf[ix] x_mask = moran_loc.z[ix] y_mask = lag_spatial(moran_loc.w, moran_loc.z)[ix] axs[0].plot( x_mask, y_mask, color=mask_color, marker="o", markersize=14, alpha=0.8, linestyle="None", zorder=-1, ) # masking inside axs[1] or Lisa cluster map union = df_mask.dissolve().boundary union.plot(linewidth=2, ax=axs[1], color=mask_color) # masking inside axs[2] or Chloropleth union.plot(linewidth=2, ax=axs[2], color=mask_color) axs[0].spines[["right", "top"]].set_visible(False) axs[1].set_axis_off() return axs # -------------------------------------------------------------- # Conditional Randomization Moment Estimators # -------------------------------------------------------------- def _wikh_fast(W, sokal_correction=False): """ This computes the outer product of weights for each observation. .. math:: w_{i(kh)} = \\sum_{k \neq i}^n \\sum_{h \neq i}^n w_ik * w_hk If the :cite:`sokal1998local` version is used, then we also have h \neq k Since this version introduces a simplification in the expression where this function is called, the defaults should always return the version in the original :cite:`Anselin1995 paper`. Arguments --------- W : scipy sparse matrix a sparse matrix describing the spatial relationships between observations. sokal_correction : bool Whether to avoid self-neighbors in the summation of weights. If False (default), then the outer product of all weights for observation i are used, regardless if they are of the form w_hh or w_kk. Returns ------- (n,) length numpy.ndarray containing the result. """ return _wikh_numba( W.shape[0], *W.nonzero(), W.data, sokal_correction=sokal_correction ) @_njit(fastmath=True) def _wikh_numba(n, row, col, data, sokal_correction=False): """ This is a fast implementation of the wi(kh) function from :cite:`Anselin1995`. This uses numpy to compute the outer product of each observation's weights, after removing the w_ii entry. Then, the sum of the outer product is taken. If the sokal correction is requested, the trace of the outer product matrix is removed from the result. """ result = np.empty((n,), dtype=data.dtype) ixs = np.arange(n) for i in ixs: # all weights that are not the self weight row_no_i = data[(row == i) & (col != i)] # compute the pairwise product pairwise_product = np.outer(row_no_i, row_no_i) # get the sum overall (wik*wih) result[i] = pairwise_product.sum() if sokal_correction: # minus the diagonal (wik*wih when k==h) result[i] -= np.trace(pairwise_product) return result / 2 def _wikh_slow(W, sokal_correction=False): """ This is a slow implementation of the wi(kh) function from :cite:`Anselin1995` This does three nested for-loops over n, doing the literal operations stated by the expression. """ W = W.toarray() (n, n) = W.shape result = np.empty((n,)) # for each observation for i in range(n): acc = 0 # we need the product wik for k in range(n): # excluding wii * wih if i == k: continue # and wij for h in range(n): # excluding wik * wii if i == h: continue if sokal_correction and h == k: # excluding wih * wih continue acc += W[i, k] * W[i, h] result[i] = acc return result / 2 # -------------------------------------------------------------- # Conditional Randomization Function Implementations # -------------------------------------------------------------- @_njit(fastmath=True) def _moran_local_bv_crand(i, z, permuted_ids, weights_i, scaling): self_weight = weights_i[0] other_weights = weights_i[1:] zx = z[:, 0] zy = z[:, 1] zyi, zyrand = _prepare_univariate(i, zy, permuted_ids, other_weights) return zx[i] * (zyrand @ other_weights + self_weight * zyi) * scaling @_njit(fastmath=True) def _moran_local_crand(i, z, permuted_ids, weights_i, scaling): self_weight = weights_i[0] other_weights = weights_i[1:] zi, zrand = _prepare_univariate(i, z, permuted_ids, other_weights) return zi * (zrand @ other_weights + self_weight * zi) * scaling