Source code for esda.join_counts_local_bv

import numpy as np
import pandas as pd
from libpysal import weights
from sklearn.base import BaseEstimator

from esda.crand import _prepare_bivariate, _prepare_univariate
from esda.crand import crand as _crand_plus
from esda.crand import njit as _njit

PERMUTATIONS = 999


class Join_Counts_Local_BV(BaseEstimator):
    """Bivariate Local Join Count Statistic"""
    def __init__(
        self,
        connectivity=None,
        permutations=PERMUTATIONS,
        n_jobs=1,
        keep_simulations=True,
        seed=None,
        island_weight=0,
        drop_islands=True,
    ):
        """
        Initialize a Join_Counts_Local_BV estimator

        Parameters
        ----------
        connectivity : scipy.sparse matrix object
            the connectivity structure describing the relationships
            between observed units. Need not be row-standardized.
        permutations : int
            number of random permutations for calculation of pseudo
            p_values
        n_jobs : int
            Number of cores to be used in the conditional randomization.
            If -1, all available cores are used.
        keep_simulations : bool (default True)
            If True, the entire matrix of replications under the null
            is stored in memory and accessible; otherwise, replications
            are not saved
        seed : None/int
            Seed to ensure reproducibility of conditional randomizations.
            Must be set here, and not outside of the function, since numba
            does not correctly interpret external seeds nor
            numpy.random.RandomState instances.
        island_weight : int/float
            value to use as a weight for the "fake" neighbor for every
            island. If numpy.nan, will propagate to the final local
            statistic depending on the `stat_func`. If 0, then the lag
            is always zero for islands.
        drop_islands : bool (default True)
            Whether or not to preserve islands as entries in the
            adjacency list. By default, observations with no neighbors
            do not appear in the adjacency list. If islands are kept,
            they are coded as self-neighbors with zero weight. See
            ``libpysal.weights.to_adjlist()``.
        """
        self.connectivity = connectivity
        self.permutations = permutations
        self.n_jobs = n_jobs
        self.keep_simulations = keep_simulations
        self.seed = seed
        self.island_weight = island_weight
        self.drop_islands = drop_islands
    def fit(self, x, z, case="CLC", n_jobs=1, permutations=999):
        """
        Parameters
        ----------
        x : numpy.ndarray
            array containing binary (0/1) data
        z : numpy.ndarray
            array containing binary (0/1) data
        case : str (default "CLC")
            "BJC" for the bivariate local join count, which counts joins
            between a focal unit with x=1, z=0 and neighbors with x=0,
            z=1; "CLC" for the co-location local join count, which counts
            joins where both variables equal 1 at the focal unit and at
            the neighbor.
        n_jobs : int
            ignored; the ``n_jobs`` value passed at initialization is
            used for the conditional randomization.
        permutations : int (default 999)
            number of random permutations for calculation of pseudo
            p_values

        Returns
        -------
        self : Join_Counts_Local_BV
            the fitted estimator.

        Notes
        -----
        Technical details and derivations can be found in
        :cite:`AnselinLi2019`.

        Examples
        --------
        >>> import libpysal
        >>> w = libpysal.weights.lat2W(4, 4)
        >>> x = np.ones(16)
        >>> x[0:8] = 0
        >>> z = [0,1,0,1,1,1,1,1,0,0,1,1,0,0,1,1]
        >>> LJC_BV_C1 = Join_Counts_Local_BV(connectivity=w).fit(x, z, case="BJC")
        >>> LJC_BV_C2 = Join_Counts_Local_BV(connectivity=w).fit(x, z, case="CLC")
        >>> LJC_BV_C1.LJC
        >>> LJC_BV_C1.p_sim
        >>> LJC_BV_C2.LJC
        >>> LJC_BV_C2.p_sim

        Commpop data replicating GeoDa tutorial (Case 1)

        >>> import libpysal
        >>> import geopandas as gpd
        >>> commpop = gpd.read_file(
        ...     "https://github.com/jeffcsauer/GSOC2020/raw/master/"
        ...     "validation/data/commpop.gpkg"
        ... )
        >>> w = libpysal.weights.Queen.from_dataframe(commpop)
        >>> LJC_BV_Case1 = Join_Counts_Local_BV(
        ...     connectivity=w
        ... ).fit(commpop['popneg'], commpop['popplus'], case='BJC')
        >>> LJC_BV_Case1.LJC
        >>> LJC_BV_Case1.p_sim

        Guerry data replicating GeoDa tutorial (Case 2)

        >>> import libpysal
        >>> import geopandas as gpd
        >>> guerry = libpysal.examples.load_example('Guerry')
        >>> guerry_ds = gpd.read_file(guerry.get_path('Guerry.shp'))
        >>> guerry_ds['infq5'] = 0
        >>> guerry_ds['donq5'] = 0
        >>> guerry_ds.loc[(guerry_ds['Infants'] > 23574), 'infq5'] = 1
        >>> guerry_ds.loc[(guerry_ds['Donatns'] > 10973), 'donq5'] = 1
        >>> w = libpysal.weights.Queen.from_dataframe(guerry_ds)
        >>> LJC_BV_Case2 = Join_Counts_Local_BV(
        ...     connectivity=w
        ... ).fit(guerry_ds['infq5'], guerry_ds['donq5'], case='CLC')
        >>> LJC_BV_Case2.LJC
        >>> LJC_BV_Case2.p_sim
        """
        # Ensure the arrays are of dtype='float' for numba
        x = np.array(x, dtype="float")
        z = np.array(z, dtype="float")

        w = self.connectivity
        # Fill the diagonal with 0s and binarize the weights
        w = weights.util.fill_diagonal(w, val=0)
        w.transform = "b"

        self.x = x
        self.z = z
        self.n = len(x)
        self.w = w
        self.case = case
        # The instance-level setting takes precedence over the `n_jobs`
        # argument passed to `fit`
        n_jobs = self.n_jobs

        self.LJC = self._statistic(x, z, w, case, self.drop_islands)

        if permutations:
            if case == "BJC":
                self.p_sim, self.rjoins = _crand_plus(
                    z=np.column_stack((x, z)),
                    w=self.w,
                    observed=self.LJC,
                    permutations=permutations,
                    keep=True,
                    n_jobs=n_jobs,
                    stat_func=_ljc_bv_case1,
                    island_weight=self.island_weight,
                )
                # Set p-values for those with LJC of 0 to NaN
                self.p_sim[self.LJC == 0] = np.nan
            elif case == "CLC":
                self.p_sim, self.rjoins = _crand_plus(
                    z=np.column_stack((x, z)),
                    w=self.w,
                    observed=self.LJC,
                    permutations=permutations,
                    keep=True,
                    n_jobs=n_jobs,
                    stat_func=_ljc_bv_case2,
                    island_weight=self.island_weight,
                )
                # Set p-values for those with LJC of 0 to NaN
                self.p_sim[self.LJC == 0] = np.nan
            else:
                raise NotImplementedError(
                    f"The requested LJC method ({case}) is not currently supported!"
                )

        return self
    @staticmethod
    def _statistic(x, z, w, case, drop_islands):
        # Create adjacency list. Note that remove_symmetric=False - this is
        # different from the esda.Join_Counts() function.
        adj_list = w.to_adjlist(remove_symmetric=False, drop_islands=drop_islands)

        # First, set up series that map the values to the weights table
        zseries_x = pd.Series(x, index=w.id_order)
        zseries_z = pd.Series(z, index=w.id_order)

        # Map the values to the focal (i) values
        focal_x = zseries_x.loc[adj_list.focal].values
        focal_z = zseries_z.loc[adj_list.focal].values

        # Map the values to the neighbor (j) values
        neighbor_x = zseries_x.loc[adj_list.neighbor].values
        neighbor_z = zseries_z.loc[adj_list.neighbor].values

        if case == "BJC":
            # Case 1: a join is counted when the focal unit has x=1, z=0
            # and the neighbor has x=0, z=1. (An equivalent dense-matrix
            # form is sketched after this class.)
            BJC = (
                (focal_x == 1) & (focal_z == 0) & (neighbor_x == 0) & (neighbor_z == 1)
            )
            # Build a frame indexed by the join indicator with the focal
            # IDs as data, then sum the indicator per focal ID
            adj_list_BJC = pd.DataFrame(
                adj_list.focal.values, BJC.astype("uint8")
            ).reset_index()
            adj_list_BJC.columns = ["BJC", "ID"]
            adj_list_BJC = adj_list_BJC.groupby(by="ID").sum()
            return np.array(adj_list_BJC.BJC.values, dtype="float")
        elif case == "CLC":
            # Case 2: a join is counted when both variables equal 1 at the
            # focal unit and at the neighbor (co-location)
            CLC = (
                (focal_x == 1) & (focal_z == 1) & (neighbor_x == 1) & (neighbor_z == 1)
            )
            adj_list_CLC = pd.DataFrame(
                adj_list.focal.values, CLC.astype("uint8")
            ).reset_index()
            adj_list_CLC.columns = ["CLC", "ID"]
            adj_list_CLC = adj_list_CLC.groupby(by="ID").sum()
            return np.array(adj_list_CLC.CLC.values, dtype="float")
        else:
            raise NotImplementedError(
                f"The requested LJC method ({case}) is not currently supported!"
            )
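
# For intuition: with binary x, z, and a binary weights matrix W, the two
# cases above reduce to BJC_i = x_i (1 - z_i) * sum_j w_ij (1 - x_j) z_j and
# CLC_i = x_i z_i * sum_j w_ij x_j z_j. The helpers below are a minimal
# dense-array sketch of those identities; they are illustrative only and
# not part of the original module.
def _bjc_dense_sketch(x, z, W):
    """Dense re-expression of the BJC (Case 1) statistic (illustrative)."""
    x = np.asarray(x, dtype=float)
    z = np.asarray(z, dtype=float)
    W = np.asarray(W, dtype=float)
    # Focal indicator x_i(1 - z_i) times the count of (0, 1) neighbors
    return x * (1 - z) * (W @ ((1 - x) * z))


def _clc_dense_sketch(x, z, W):
    """Dense re-expression of the CLC (Case 2) statistic (illustrative)."""
    x = np.asarray(x, dtype=float)
    z = np.asarray(z, dtype=float)
    W = np.asarray(W, dtype=float)
    # Co-location indicator x_i z_i times the count of co-located neighbors
    return (x * z) * (W @ (x * z))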
# --------------------------------------------------------------
# Conditional Randomization Function Implementations
# --------------------------------------------------------------

# Note: scaling not used


@_njit(fastmath=True)
def _ljc_bv_case1(i, z, permuted_ids, weights_i, scaling):
    # z holds the two variables as columns: x in column 0, z in column 1
    zx = z[:, 0]
    zy = z[:, 1]
    other_weights = weights_i[1:]
    zyi, zyrand = _prepare_univariate(i, zy, permuted_ids, other_weights)
    # Count of permuted neighbors with z=1, gated by the observed x_i
    return zx[i] * (zyrand @ other_weights)


@_njit(fastmath=True)
def _ljc_bv_case2(i, z, permuted_ids, weights_i, scaling):
    zy = z[:, 1]
    other_weights = weights_i[1:]
    zxi, zxrand, zyi, zyrand = _prepare_bivariate(i, z, permuted_ids, other_weights)
    # Count of permuted neighbors where both variables equal 1,
    # gated by the observed z_i
    zf = zxrand * zyrand
    return zy[i] * (zf @ other_weights)
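

# A minimal usage sketch, taken from the 4 x 4 lattice example in the fit
# docstring. It is not part of the original module; the variable names and
# seed value are illustrative only, and the __main__ guard keeps it from
# running on import.
if __name__ == "__main__":
    import libpysal

    w_demo = libpysal.weights.lat2W(4, 4)
    x_demo = np.ones(16)
    x_demo[0:8] = 0
    z_demo = [0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1]

    # Case 1 (BJC): focal x=1, z=0 joined to neighbor x=0, z=1
    ljc_bjc = Join_Counts_Local_BV(connectivity=w_demo, seed=12345).fit(
        x_demo, z_demo, case="BJC"
    )
    print(ljc_bjc.LJC)
    print(ljc_bjc.p_sim)

    # Case 2 (CLC): co-location, all four indicators equal to 1
    ljc_clc = Join_Counts_Local_BV(connectivity=w_demo, seed=12345).fit(
        x_demo, z_demo, case="CLC"
    )
    print(ljc_clc.LJC)
    print(ljc_clc.p_sim)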