"""
Methods for identifying space-time interaction in spatio-temporal event
data.
"""
__author__ = (
"Eli Knaap <eknaap@sdsu.edu>",
"Nicholas Malizia <nmalizia@asu.edu>",
"Sergio J. Rey <srey@sdsu.edu>",
"Philip Stephens <philip.stephens@asu.edu",
)
__all__ = [
"SpaceTimeEvents",
"knox",
"mantel",
"jacquez",
"modified_knox",
"Knox",
"KnoxLocal",
]
import os
from datetime import date
from warnings import warn
import geopandas as gpd
import libpysal as lps
import numpy as np
import pandas
import scipy.stats as stats
from libpysal import cg
from pandas.api.types import is_numeric_dtype
from scipy.spatial import KDTree
from scipy.stats import hypergeom, poisson
from shapely.geometry import LineString
class SpaceTimeEvents:
"""
Method for reformatting event data stored in a shapefile for use in
calculating metrics of spatio-temporal interaction.
Parameters
----------
path : string
the path to the appropriate shapefile, including the
file name and extension
time_col : string
column header in the DBF file indicating the column
containing the time stamp.
infer_timestamp : bool, optional
if True and the time column contains calendar dates,
convert them to integer days elapsed since the earliest
event (the default is False).
Attributes
----------
n : int
number of events.
x : array
(n, 1), array of the x coordinates for the events.
y : array
(n, 1), array of the y coordinates for the events.
t : array
(n, 1), array of the temporal coordinates for the events.
space : array
(n, 2), array of the spatial coordinates (x,y) for the
events.
time : array
(n, 2), array of the temporal coordinates (t,1) for the
events, the second column is a vector of ones.
Examples
--------
Read in the example shapefile data, including the file
extension. In order to successfully create the event data the .dbf file
associated with the shapefile should have a column of values that are a
timestamp for the events. This timestamp may be a numerical value
or a date. Date inference was added in version 1.6.
>>> import libpysal as lps
>>> path = lps.examples.get_path("burkitt.shp")
>>> from pointpats import SpaceTimeEvents
Create an instance of SpaceTimeEvents from a shapefile, where the
temporal information is stored in a column named "T".
>>> events = SpaceTimeEvents(path,'T')
See how many events are in the instance.
>>> events.n
188
Check the spatial coordinates of the first event.
>>> events.space[0]
array([300., 302.])
Check the time of the first event.
>>> events.t[0]
array([413.])
Calculate the time difference between the first two events.
>>> events.t[1] - events.t[0]
array([59.])
New in version 1.6: date support.
Now, create an instance of SpaceTimeEvents from a shapefile, where the
temporal information is stored in a column named "DATE".
>>> events = SpaceTimeEvents(path,'DATE')
See how many events are in the instance.
>>> events.n
188
Check the spatial coordinates of the first event.
>>> events.space[0]
array([300., 302.])
Check the time of the first event. Note that this value is equivalent to
413 days after January 1, 1900.
>>> events.t[0][0]
datetime.date(1901, 2, 16)
Calculate the time difference between the first two events.
>>> (events.t[1][0] - events.t[0][0]).days
59
"""
def __init__(self, path, time_col, infer_timestamp=False):
shp = lps.io.open(path)
head, tail = os.path.split(path)
dbf_tail = tail.split(".")[0] + ".dbf"
dbf = lps.io.open(os.path.join(head, dbf_tail))
# extract the spatial coordinates from the shapefile
x = [coords[0] for coords in shp]
y = [coords[1] for coords in shp]
self.n = n = len(shp)
x = np.array(x)
y = np.array(y)
self.x = np.reshape(x, (n, 1))
self.y = np.reshape(y, (n, 1))
self.space = np.hstack((self.x, self.y))
# extract the temporal information from the database
if infer_timestamp:
col = dbf.by_col(time_col)
if isinstance(col[0], date):
day1 = min(col)
col = [(d - day1).days for d in col]
t = np.array(col)
else:
print(
"Unable to parse your time column as Python datetime \
objects, proceeding as integers."
)
t = np.array(col)
else:
t = np.array(dbf.by_col(time_col))
line = np.ones((n, 1))
self.t = np.reshape(t, (n, 1))
self.time = np.hstack((self.t, line))
# close open objects
dbf.close()
shp.close()
def knox(s_coords, t_coords, delta, tau, permutations=99, debug=False):
"""
Knox test for spatio-temporal interaction. :cite:`Knox:1964`
Parameters
----------
s_coords : array
(n, 2), spatial coordinates.
t_coords : array
(n, 1), temporal coordinates.
delta : float
threshold for proximity in space.
tau : float
threshold for proximity in time.
permutations : int, optional
the number of permutations used to establish pseudo-
significance (the default is 99).
debug : bool, optional
if true, debugging information is printed (the default is
False).
Returns
-------
knox_result : dictionary
contains the statistic (stat) for the test and the
associated p-value (pvalue).
stat : float
value of the knox test for the dataset.
pvalue : float
pseudo p-value associated with the statistic.
Examples
--------
>>> import numpy as np
>>> import libpysal as lps
>>> from pointpats import SpaceTimeEvents, knox
Read in the example data and create an instance of SpaceTimeEvents.
>>> path = lps.examples.get_path("burkitt.shp")
>>> events = SpaceTimeEvents(path,'T')
Set the random seed generator. This is used by the permutation based
inference to replicate the pseudo-significance of our example results -
the end-user will normally omit this step.
>>> np.random.seed(100)
Run the Knox test with distance and time thresholds of 20 and 5,
respectively. This counts the pairs of events that are closer than 20
units in space and 5 units in time.
>>> result = knox(events.space, events.t, delta=20, tau=5, permutations=99)
Next, we examine the results. First, we call the statistic from the
results dictionary. This reports that there are 13 pairs of events close
in both space and time, according to our threshold definitions.
>>> result['stat'] == 13
True
Next, we look at the pseudo-significance of this value, calculated by
permuting the timestamps and rerunning the statistics. In this case,
the results indicate there is likely no space-time interaction between
the events.
>>> print("%2.2f"%result['pvalue'])
0.17
"""
warn("This function is deprecated. Use Knox", DeprecationWarning, stacklevel=2)
# Do a kdtree on space first as the number of ties (identical points) is
# likely to be lower for space than time.
kd_s = cg.KDTree(s_coords)
neigh_s = kd_s.query_pairs(delta)
tau2 = tau * tau
ids = np.array(list(neigh_s))
# For the neighboring pairs in space, determine which are also time
# neighbors
d_t = (t_coords[ids[:, 0]] - t_coords[ids[:, 1]]) ** 2
n_st = sum(d_t <= tau2)
knox_result = {"stat": n_st[0]}
if permutations:
joint = np.zeros((permutations, 1), int)
for p in range(permutations):
np.random.shuffle(t_coords)
d_t = (t_coords[ids[:, 0]] - t_coords[ids[:, 1]]) ** 2
joint[p] = np.sum(d_t <= tau2)
larger = np.sum(joint >= n_st[0])
if (permutations - larger) < larger:
larger = permutations - larger
p_sim = (larger + 1.0) / (permutations + 1.0)
knox_result["pvalue"] = p_sim
return knox_result
def mantel(
s_coords, t_coords, permutations=99, scon=1.0, spow=-1.0, tcon=1.0, tpow=-1.0
):
"""
Standardized Mantel test for spatio-temporal interaction. :cite:`Mantel:1967`
Parameters
----------
s_coords : array
(n, 2), spatial coordinates.
t_coords : array
(n, 1), temporal coordinates.
permutations : int, optional
the number of permutations used to establish pseudo-
significance (the default is 99).
scon : float, optional
constant added to spatial distances (the default is 1.0).
spow : float, optional
value for power transformation for spatial distances
(the default is -1.0).
tcon : float, optional
constant added to temporal distances (the default is 1.0).
tpow : float, optional
value for power transformation for temporal distances
(the default is -1.0).
Returns
-------
mantel_result : dictionary
contains the statistic (stat) for the test and the
associated p-value (pvalue).
stat : float
value of the Mantel test for the dataset.
pvalue : float
pseudo p-value associated with the statistic.
Examples
--------
>>> import numpy as np
>>> import libpysal as lps
>>> from pointpats import SpaceTimeEvents, mantel
Read in the example data and create an instance of SpaceTimeEvents.
>>> path = lps.examples.get_path("burkitt.shp")
>>> events = SpaceTimeEvents(path,'T')
Set the random seed generator. This is used by the permutation based
inference to replicate the pseudo-significance of our example results -
the end-user will normally omit this step.
>>> np.random.seed(100)
The standardized Mantel test is a measure of matrix correlation between
the spatial and temporal distance matrices of the event dataset. The
following example runs the standardized Mantel test with a constant of
1.0 and a power transformation of -1.0 applied to both the spatial and
temporal distances, as recommended by :cite:`Mantel:1967`. These can be
adjusted through the constant and power parameters.
>>> result = mantel(events.space, events.t, 99, scon=1.0, spow=-1.0, tcon=1.0, tpow=-1.0)
Next, we examine the result of the test.
>>> print("%6.6f"%result['stat'])
0.048368
Finally, we look at the pseudo-significance of this value, calculated by
permuting the timestamps and rerunning the statistic for each of the 99
permutations. According to these parameters, the results indicate
space-time interaction between the events.
>>> print("%2.2f"%result['pvalue'])
0.01
"""
t = t_coords
s = s_coords
n = len(t)
# calculate the spatial and temporal distance matrices for the events
distmat = cg.distance_matrix(s)
timemat = cg.distance_matrix(t)
# calculate the transformed standardized statistic
timevec = (timemat[np.tril_indices(timemat.shape[0], k=-1)] + tcon) ** tpow
distvec = (distmat[np.tril_indices(distmat.shape[0], k=-1)] + scon) ** spow
stat = stats.pearsonr(timevec, distvec)[0]
# return the results (if no inference)
if not permutations:
return stat
# loop for generating a random distribution to assess significance
dist = []
for _i in range(permutations):
trand = _shuffle_matrix(timemat, np.arange(n))
timevec = (trand[np.tril_indices(trand.shape[0], k=-1)] + tcon) ** tpow
m = stats.pearsonr(timevec, distvec)[0]
dist.append(m)
# establish the pseudo significance of the observed statistic
distribution = np.array(dist)
greater = np.ma.masked_greater_equal(distribution, stat)
count = np.ma.count_masked(greater)
pvalue = (count + 1.0) / (permutations + 1.0)
# report the results
mantel_result = {"stat": stat, "pvalue": pvalue}
return mantel_result
def jacquez(s_coords, t_coords, k, permutations=99):
"""
Jacquez k nearest neighbors test for spatio-temporal interaction.
:cite:`Jacquez:1996`
Parameters
----------
s_coords : array
(n, 2), spatial coordinates.
t_coords : array
(n, 1), temporal coordinates.
k : int
the number of nearest neighbors to be searched.
permutations : int, optional
the number of permutations used to establish pseudo-
significance (the default is 99).
Returns
-------
jacquez_result : dictionary
contains the statistic (stat) for the test and the
associated p-value (pvalue).
stat : float
value of the Jacquez k nearest neighbors test for the
dataset.
pvalue : float
pseudo p-value associated with the statistic, based on
random permutations.
Examples
--------
>>> import numpy as np
>>> import libpysal as lps
>>> from pointpats import SpaceTimeEvents, jacquez
Read in the example data and create an instance of SpaceTimeEvents.
>>> path = lps.examples.get_path("burkitt.shp")
>>> events = SpaceTimeEvents(path,'T')
The Jacquez test counts the number of events that are k nearest
neighbors in both time and space. The following runs the Jacquez test
on the example data and reports the resulting statistic. In this case,
there are 12 instances where events are nearest neighbors in both space
and time.
(Note: scipy versions earlier than 0.12 returned 13 for this example due
to differences in the KDTree implementation.)
>>> np.random.seed(100)
>>> result = jacquez(events.space, events.t ,k=3,permutations=99)
>>> print(result['stat'])
12
The significance of this can be assessed by calling the p-
value from the results dictionary, as shown below. Again, no
space-time interaction is observed.
>>> result['pvalue'] < 0.01
False
"""
time = t_coords
space = s_coords
n = len(time)
# calculate the nearest neighbors in space and time separately
knnt = lps.weights.KNN.from_array(time, k)
knns = lps.weights.KNN.from_array(space, k)
nnt = knnt.neighbors
nns = knns.neighbors
knn_sum = 0
# determine which events are nearest neighbors in both space and time
for i in range(n):
t_neighbors = nnt[i]
s_neighbors = nns[i]
check = set(t_neighbors)
inter = check.intersection(s_neighbors)
count = len(inter)
knn_sum += count
stat = knn_sum
# return the results (if no inference)
if not permutations:
return stat
# loop for generating a random distribution to assess significance
dist = []
for _p in range(permutations):
j = 0
trand = np.random.permutation(time)
knnt = lps.weights.KNN.from_array(trand, k)
nnt = knnt.neighbors
for i in range(n):
t_neighbors = nnt[i]
s_neighbors = nns[i]
check = set(t_neighbors)
inter = check.intersection(s_neighbors)
count = len(inter)
j += count
dist.append(j)
# establish the pseudo significance of the observed statistic
distribution = np.array(dist)
greater = np.ma.masked_greater_equal(distribution, stat)
count = np.ma.count_masked(greater)
pvalue = (count + 1.0) / (permutations + 1.0)
# report the results
jacquez_result = {"stat": stat, "pvalue": pvalue}
return jacquez_result
def modified_knox(s_coords, t_coords, delta, tau, permutations=99):
"""
Baker's modified Knox test for spatio-temporal interaction.
:cite:`Baker:2004`
Parameters
----------
s_coords : array
(n, 2), spatial coordinates.
t_coords : array
(n, 1), temporal coordinates.
delta : float
threshold for proximity in space.
tau : float
threshold for proximity in time.
permutations : int, optional
the number of permutations used to establish pseudo-
significance (the default is 99).
Returns
-------
modknox_result : dictionary
contains the statistic (stat) for the test and the
associated p-value (pvalue).
stat : float
value of the modified knox test for the dataset.
pvalue : float
pseudo p-value associated with the statistic.
Examples
--------
>>> import numpy as np
>>> import libpysal as lps
>>> from pointpats import SpaceTimeEvents, modified_knox
Read in the example data and create an instance of SpaceTimeEvents.
>>> path = lps.examples.get_path("burkitt.shp")
>>> events = SpaceTimeEvents(path, 'T')
Set the random seed generator. This is used by the permutation based
inference to replicate the pseudo-significance of our example results -
the end-user will normally omit this step.
>>> np.random.seed(100)
Run the modified Knox test with distance and time thresholds of 20 and 5,
respectively. This counts the pairs of events that are closer than 20
units in space and 5 units in time.
>>> result = modified_knox(events.space, events.t, delta=20, tau=5, permutations=99)
Next, we examine the results. First, we call the statistic from the
results dictionary. This reports the difference between the observed
and expected Knox statistic.
>>> print("%2.8f" % result['stat'])
2.81016043
Next, we look at the pseudo-significance of this value, calculated by
permuting the timestamps and rerunning the statistics. In this case,
the results indicate there is likely no space-time interaction.
>>> print("%2.2f" % result['pvalue'])
0.11
"""
s = s_coords
t = t_coords
n = len(t)
# calculate the spatial and temporal distance matrices for the events
sdistmat = cg.distance_matrix(s)
tdistmat = cg.distance_matrix(t)
# identify events within thresholds
spacmat = np.ones((n, n))
spacbin = sdistmat <= delta
spacmat = spacmat * spacbin
timemat = np.ones((n, n))
timebin = tdistmat <= tau
timemat = timemat * timebin
# calculate the observed (original) statistic
knoxmat = timemat * spacmat
obsstat = knoxmat.sum() - n
# calculate the expected value
ssumvec = np.reshape((spacbin.sum(axis=0) - 1), (n, 1))
tsumvec = np.reshape((timebin.sum(axis=0) - 1), (n, 1))
expstat = (ssumvec * tsumvec).sum()
# calculate the modified stat
stat = (obsstat - (expstat / (n - 1.0))) / 2.0
# return results (if no inference)
if not permutations:
return stat
distribution = []
# loop for generating a random distribution to assess significance
for _p in range(permutations):
rtdistmat = _shuffle_matrix(tdistmat, list(range(n)))
timemat = np.ones((n, n))
timebin = rtdistmat <= tau
timemat = timemat * timebin
# calculate the observed knox again
knoxmat = timemat * spacmat
obsstat = knoxmat.sum() - n
# calculate the expected value again
ssumvec = np.reshape((spacbin.sum(axis=0) - 1), (n, 1))
tsumvec = np.reshape((timebin.sum(axis=0) - 1), (n, 1))
expstat = (ssumvec * tsumvec).sum()
# calculate the modified stat
tempstat = (obsstat - (expstat / (n - 1.0))) / 2.0
distribution.append(tempstat)
# establish the pseudo significance of the observed statistic
distribution = np.array(distribution)
greater = np.ma.masked_greater_equal(distribution, stat)
count = np.ma.count_masked(greater)
pvalue = (count + 1.0) / (permutations + 1.0)
# return results
modknox_result = {"stat": stat, "pvalue": pvalue}
return modknox_result
def _shuffle_matrix(X, ids):
"""
Random permutation of rows and columns of a matrix
Parameters
----------
X : array
(k, k), array to be permuted.
ids : array
range (k, ).
Returns
-------
: array
(k, k) with rows and columns randomly shuffled.
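
Examples
--------
A minimal sketch (illustrative only; the row/column permutation is random):

>>> import numpy as np
>>> X = np.arange(9).reshape(3, 3)
>>> _shuffle_matrix(X, np.arange(3)).shape
(3, 3)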
"""
np.random.shuffle(ids)
return X[ids, :][:, ids]
def _knox(s_coords, t_coords, delta, tau, permutations=99, keep=False):
"""
Parameters
==========
s_coords: array-like
spatial coordinates
t_coords: array-like
temporal coordinates
delta: float
distance threshold
tau: float
temporal threshold
permutations: int
number of permutations
keep: bool
return values from permutations (default False)
Returns
=======
results: dict
keys include the pair counts ns, nt, and nst, the total number of
pairs, the observed and expected two-by-two contingency tables, the
analytical p_value_poisson, the set of st_pairs, and the neighbor
lists sneighbors, tneighbors, and stneighbors; when permutations > 0,
p_value_sim and exceedence are added (and st_perm when keep=True)
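
Examples
========
A minimal sketch on random points (illustrative only; `pairs` is the
total number of event pairs, n(n-1)/2):

>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> res = _knox(rng.random((50, 2)), rng.random((50, 1)), 0.1, 0.1, permutations=0)
>>> res["pairs"]
1225.0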
"""
n = s_coords.shape[0]
stree = KDTree(s_coords)
ttree = KDTree(t_coords)
sneighbors = stree.query_ball_tree(stree, r=delta)
sneighbors = [
set(neighbors).difference([i]) for i, neighbors in enumerate(sneighbors)
]
tneighbors = ttree.query_ball_tree(ttree, r=tau)
tneighbors = [
set(neighbors).difference([i]) for i, neighbors in enumerate(tneighbors)
]
# number of spatial neighbor pairs
ns = np.array([len(neighbors) for neighbors in sneighbors]) # by i
NS = ns.sum() / 2 # total
# number of temporal neighbor pairs
nt = np.array([len(neighbors) for neighbors in tneighbors])
NT = nt.sum() / 2
# s-t neighbors (list of lists)
stneighbors = [
sneighbors_i.intersection(tneighbors_i)
for sneighbors_i, tneighbors_i in zip(sneighbors, tneighbors)
]
# number of spatio-temporal neighbor pairs
nst = np.array([len(neighbors) for neighbors in stneighbors])
NST = nst.sum() / 2
all_pairs = []
for i, neigh in enumerate(stneighbors):
if len(neigh) > 0:
all_pairs.extend([sorted((i, j)) for j in neigh])
st_pairs = {tuple(pair) for pair in all_pairs}
# ENST: expected number of spatio-temporal neighbors under H0
pairs = n * (n - 1) / 2
ENST = NS * NT / pairs
# observed table
observed = np.zeros((2, 2))
NS_ = NS - NST # spatial only
NT_ = NT - NST # temporal only
observed[0, 0] = NST
observed[0, 1] = NS_
observed[1, 0] = NT_
observed[1, 1] = pairs - NST - NS_ - NT_
# expected table
expected = np.zeros((2, 2))
expected[0, 0] = ENST
expected[0, 1] = NS - expected[0, 0]
expected[1, 0] = NT - expected[0, 0]
expected[1, 1] = pairs - expected.sum()
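# Analytical p-value: probability that a Poisson variable with mean
# ENST exceeds the observed space-time pair count NST.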
p_value_poisson = 1 - poisson.cdf(NST, expected[0, 0])
results = {}
results["ns"] = ns.sum() / 2
results["nt"] = nt.sum() / 2
results["nst"] = nst.sum() / 2
results["pairs"] = pairs
results["expected"] = expected
results["observed"] = observed
results["p_value_poisson"] = p_value_poisson
results["st_pairs"] = st_pairs
results["sneighbors"] = sneighbors
results["tneighbors"] = tneighbors
results["stneighbors"] = stneighbors
if permutations > 0:
exceedence = 0
n = len(sneighbors)
ids = np.arange(n)
if keep:
ST = np.zeros(permutations)
for perm in range(permutations):
st = 0
rids = np.random.permutation(ids)
for i in range(n):
ri = rids[i]
tni = tneighbors[ri]
rjs = [rids[j] for j in sneighbors[i]]
sti = [j for j in rjs if j in tni]
st += len(sti)
st /= 2
if st >= results["nst"]:
exceedence += 1
if keep:
ST[perm] = st
results["p_value_sim"] = (exceedence + 1) / (permutations + 1)
results["exceedence"] = exceedence
if keep:
results["st_perm"] = ST
return results
class Knox:
"""Global Knox statistic for space-time interactions
Parameters
----------
s_coords: array-like
spatial coordinates of point events
t_coords: array-like
temporal coordinates of point events (floats or ints, not datetime)
delta: float
spatial threshold defining distance below which pairs are spatial
neighbors
tau: float
temporal threshold defining distance below which pairs are temporal
neighbors
permutations: int
number of random permutations for inference
keep: bool
whether to store realized values of the statistic under permutations
Attributes
----------
s_coords: array-like
spatial coordinates of point events
t_coords: array-like
temporal coordinates of point events (floats or ints, not datetime)
delta: float
spatial threshold defining distance below which pairs are spatial
neighbors
tau: float
temporal threshold defining distance below which pairs are temporal
neighbors
permutations: int
number of random permutations for inference
keep: bool
whether to store realized values of the statistic under permutations
nst: int
number of space-time pairs
p_poisson: float
Analytical p-value under Poisson assumption
p_sim: float
Pseudo p-value based on random permutations
expected: array
Two-by-two array with expected counts under the null of no space-time
interactions. [[NST, NS_], [NT_, N__]] where NST is the expected number
of space-time pairs, NS_ is the expected number of spatial (but not also
temporal) pairs, NT_ is the expected number of temporal (but not also
spatial) pairs, and N__ is the number of pairs that are neither spatial
nor temporal neighbors.
observed: array
Same structure as expected with the observed pair classifications
sim: array
Global statistics from permutations (if keep=True)
Notes
-----
Technical details can be found in :cite:`Rogerson:2001`
Examples
--------
>>> import libpysal
>>> path = libpysal.examples.get_path('burkitt.shp')
>>> import geopandas
>>> df = geopandas.read_file(path)
>>> from pointpats.spacetime import Knox
>>> global_knox = Knox(df[['X', 'Y']], df[["T"]], delta=20, tau=5)
>>> global_knox.statistic_
13
>>> global_knox.p_poisson
0.14624558197140414
>>> global_knox.observed
array([[1.300e+01, 3.416e+03],
[3.900e+01, 1.411e+04]])
>>> global_knox.expected
array([[1.01438161e+01, 3.41885618e+03],
[4.18561839e+01, 1.41071438e+04]])
>>> hasattr(global_knox, 'sim')
False
>>> import numpy
>>> numpy.random.seed(12345)
>>> global_knox = Knox(df[['X', 'Y']], df[["T"]], delta=20, tau=5, keep=True)
>>> hasattr(global_knox, 'sim')
True
>>> global_knox.p_sim
0.21
"""
def __init__(self, s_coords, t_coords, delta, tau, permutations=99, keep=False):
self.s_coords = s_coords
self.t_coords = t_coords
self.delta = delta
self.tau = tau
self.permutations = permutations
self.keep = keep
results = _knox(s_coords, t_coords, delta, tau, permutations, keep)
self.nst = int(results["nst"])
if permutations > 0:
self.p_sim = results["p_value_sim"]
if keep:
self.sim = results["st_perm"]
self.p_poisson = results["p_value_poisson"]
self.observed = results["observed"]
self.expected = results["expected"]
self.statistic_ = self.nst
@classmethod
def from_dataframe(
cls,
dataframe,
time_col: str,
delta: int,
tau: int,
permutations: int = 99,
keep: bool = False,
):
"""Compute a Knox statistic from a dataframe of Point observations
Parameters
----------
dataframe : geopandas.GeoDataFrame
geodataframe holding observations. Should be in a projected coordinate
system with geometries stored as Point
time_col : str
column in the dataframe storing the time values (integer coordinate)
for each observation. For example, if the observations are stored with
a timestamp, the time_col should first be converted to a series of
integers representing, e.g., hours, days, or seconds.
delta : int
delta parameter defining the spatial neighbor threshold measured in the
same units as the dataframe CRS
tau : int
tau parameter defining the temporal neighbor threshold (in the units
measured by `time_col`)
permutations : int, optional
permutations to use for computational inference, by default 99
keep : bool
whether to store realized values of the statistic under permutations
Returns
-------
pointpats.spacetime.Knox
a fitted Knox class
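
Examples
--------
A minimal sketch, assuming the libpysal `burkitt` example data; with these
thresholds the statistic should match the class-level example above:

>>> import geopandas, libpysal
>>> df = geopandas.read_file(libpysal.examples.get_path("burkitt.shp"))
>>> global_knox = Knox.from_dataframe(df, time_col="T", delta=20, tau=5)
>>> global_knox.statistic_  # doctest: +SKIP
13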
"""
s_coords, t_coords = _spacetime_points_to_arrays(dataframe, time_col)
return cls(s_coords, t_coords, delta, tau, permutations, keep)
def _knox_local(s_coords, t_coords, delta, tau, permutations=99, keep=False):
"""
Parameters
----------
s_coords: array (n,2)
spatial coordinates
t_coords: array (n,1)
temporal coordinates
delta: numeric
spatial threshold distance for neighbor relation
tau: numeric
temporal threshold distance for neighbor relation
permutations: int
number of permutations for conditional randomization inference
keep: bool
whether to store local statistics from the permutations
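
Examples
--------
A minimal sketch on random points (illustrative only; `nsti` holds the
local space-time pair counts, one per event):

>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> res = _knox_local(rng.random((30, 2)), rng.random((30, 1)), 0.1, 0.1, permutations=0)
>>> res["nsti"].shape
(30,)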
"""
# think about passing in the global object as an option to avoid recomputing the trees
res = _knox(s_coords, t_coords, delta, tau, permutations=permutations)
sneighbors = {i: tuple(ns) for i, ns in enumerate(res["sneighbors"])}
tneighbors = {i: tuple(nt) for i, nt in enumerate(res["tneighbors"])}
n = len(s_coords)
ids = np.arange(n)
res["nsti"] = np.zeros(n) # number of observed st_pairs for observation i
res["nsi"] = [len(r) for r in res["sneighbors"]]
res["nti"] = [len(r) for r in res["tneighbors"]]
for pair in res["st_pairs"]:
i, j = pair
res["nsti"][i] += 1
res["nsti"][j] += 1
nsti = res["nsti"]
nsi = res["nsi"]
res["nti"]
# rather than do n*permutations, we reuse the permutations
# ensuring that each permutation is conditional on a focal unit i
# for each of the permutations we loop over i and swap labels between the
# label at index i in the current permutation and the label at the index
# assigned i in the permutation.
if permutations > 0:
exceedence = np.zeros(n)
if keep:
STI = np.zeros((n, permutations))
for perm in range(permutations):
rids = np.random.permutation(ids)
for i in range(n):
rids_i = rids.copy()
# set observed value of focal unit i
# swap with value assigned to rids[i]
# example
# 0 1 2 (ids)
# 2 0 1 (rids)
# i=0
# 0 2 1 (rids_i)
# i=1
# 2 1 0 (rids_i)
# i=2
# 1 0 2 (rids_i)
rids_i[rids == i] = rids[i]
rids_i[i] = i
# calculate local stat
rjs = [rids_i[j] for j in sneighbors[i]]
tni = tneighbors[i]
sti = [j for j in rjs if j in tni]
count = len(sti)
if count >= res["nsti"][i]:
exceedence[i] += 1
if keep:
STI[i, perm] = count
if keep:
res["sti_perm"] = STI
res["exceedence_pvalue"] = (exceedence + 1) / (permutations + 1)
res["exceedences"] = exceedence
# analytical inference
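# For each focal event i, the analytical p-value is the probability of
# drawing at least nsti[i] temporal neighbors when i's nsi[i] spatial
# neighbors are selected at random from the other n - 1 events
# (hypergeometric), averaged over the temporal neighbor counts ntjis.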
ntjis = [len(r) for r in res["tneighbors"]]
n1 = n - 1
hg_pvalues = [
1 - hypergeom.cdf(nsti[i] - 1, n1, ntjis, nsi[i]).mean() for i in range(n)
]
res["hg_pvalues"] = np.array(hg_pvalues)
# identification of hot spots
adjlist = []
for i, j in res["st_pairs"]:
adjlist.append([i, j])
adjlist.append([j, i])
adjlist = pandas.DataFrame(data=adjlist, columns=["focal", "neighbor"])
adjlist = adjlist.sort_values(by=["focal", "neighbor"])
adjlist.reset_index(drop=True, inplace=True)
adjlist["orientation"] = ""
for index, row in adjlist.iterrows():
focal = row["focal"]
neighbor = row["neighbor"]
ft = t_coords[focal]
nt = t_coords[neighbor]
if ft < nt:
adjlist.iloc[index, 2] = "lead"
elif ft > nt:
adjlist.iloc[index, 2] = "lag"
else:
adjlist.iloc[index, 2] = "coincident"
res["stadjlist"] = adjlist
return res
class KnoxLocal:
"""Local Knox statistics for space-time interactions
Parameters
----------
s_coords: array (nx2)
spatial coordinates of point events
t_coords: array (nx1)
temporal coordinates of point events (floats or ints, not datetime)
delta: float
spatial threshold defining distance below which pairs are spatial
neighbors
tau: float
temporal threshold defining distance below which pairs are temporal
neighbors
permutations: int
number of random permutations for inference
keep: bool
whether to store realized values of the statistic under permutations
crit: float
significance level for local statistics
crs: str (optional)
coordinate reference system string for s_coords
ids: array (optional)
observation identifiers used as index labels; defaults to
positional integer indices
Attributes
----------
s_coords: array (nx2)
spatial coordinates of point events
t_coords: array (nx1)
temporal coordinates of point events (floats or ints, not datetime)
delta: float
spatial threshold defining distance below which pairs are spatial
neighbors
tau: float
temporal threshold defining distance below which pairs are temporal
neighbors
permutations: int
number of random permutations for inference
keep: bool
whether to store realized values of the statistic under permutations
nst: int
number of space-time pairs (global)
p_poisson: float
Analytical p-value under Poisson assumption (global)
p_sim: float
Pseudo p-value based on random permutations (global)
expected: array
Two-by-two array with expected counts under the null of no space-time
interactions. [[NST, NS_], [NT_, N__]] where NST is the expected number
of space-time pairs, NS_ is the expected number of spatial (but not also
temporal) pairs, NT_ is the expected number of temporal (but not also
spatial) pairs, and N__ is the number of pairs that are neither spatial
nor temporal neighbors. (global)
observed: array
Same structure as expected with the observed pair classifications (global)
sim: array
Global statistics from permutations (if keep=True) (global)
p_sims: array
Local pseudo p-values from conditional permutations (if permutations>0)
sims: array
Local statistics from conditional permutations (if keep=True and
permutations>0)
nsti: array
Local statistics
p_hypergeom: array
Analytical p-values based on hypergeometric distribution
Notes
-----
Technical details can be found in :cite:`Rogerson:2001`. The conditional
permutation inference is unique to pysal.pointpats.
Examples
--------
>>> import libpysal
>>> path = libpysal.examples.get_path('burkitt.shp')
>>> import geopandas
>>> df = geopandas.read_file(path)
>>> from pointpats.spacetime import KnoxLocal
>>> import numpy
>>> numpy.random.seed(12345)
>>> local_knox = KnoxLocal(df[['X', 'Y']].values, df[["T"]].values, delta=20, tau=5, keep=True)
>>> local_knox.statistic_.shape
(188,)
>>> lres = local_knox
>>> gt0ids = numpy.where(lres.nsti>0)
>>> gt0ids # doctest: +NORMALIZE_WHITESPACE
(array([ 25, 26, 30, 31, 35, 36, 41, 42, 46, 47, 51, 52, 102,
103, 116, 118, 122, 123, 137, 138, 139, 140, 158, 159, 162, 163]),)
>>> lres.nsti[gt0ids]
array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1.])
>>> lres.p_hypergeom[gt0ids]
array([0.1348993 , 0.14220663, 0.07335085, 0.08400282, 0.1494317 ,
0.21524073, 0.0175806 , 0.04599869, 0.17523687, 0.18209188,
0.19111321, 0.16830444, 0.13734428, 0.14703242, 0.06796364,
0.03192559, 0.13734428, 0.17523687, 0.12998154, 0.1933476 ,
0.13244507, 0.13244507, 0.12502644, 0.14703242, 0.12502644,
0.12998154])
>>> lres.p_sims[gt0ids]
array([0.3 , 0.33, 0.11, 0.17, 0.3 , 0.42, 0.06, 0.06, 0.33, 0.34, 0.36,
0.38, 0.3 , 0.29, 0.41, 0.19, 0.31, 0.39, 0.18, 0.39, 0.48, 0.41,
0.22, 0.41, 0.39, 0.32])
"""
def __init__(
self,
s_coords,
t_coords,
delta,
tau,
permutations=99,
keep=False,
crit=0.05,
crs=None,
ids=None,
):
if not isinstance(t_coords, np.ndarray):
raise ValueError("t_coords should be numpy.ndarray type")
if not isinstance(s_coords, np.ndarray):
raise ValueError("s_coords should be numpy.ndarray type")
n_s, k = s_coords.shape
rangeids = list(range(n_s))
if k < 2:
raise ValueError("s_coords shape required to be nx2")
n_t, k = t_coords.shape
if n_s != n_t:
raise ValueError("t_coords and s_coords need to be same length")
if ids is not None:
if len(ids) != n_s:
raise ValueError("`ids` must have the same length as the inputs")
else:
ids = rangeids
self._ids = ids
self.s_coords = s_coords
self.t_coords = t_coords
self.delta = delta
self.tau = tau
self.permutations = permutations
self.keep = keep
self.crit = crit
results = _knox_local(s_coords, t_coords, delta, tau, permutations, keep)
self.adjlist = results["stadjlist"]
self.nst = int(results["nst"])
if permutations > 0:
self.p_sim = results["p_value_sim"]
if keep:
self.sim = results["sti_perm"]
self.p_poisson = results["p_value_poisson"]
self.observed = results["observed"]
self.expected = results["expected"]
self.p_hypergeom = results["hg_pvalues"]
if permutations > 0:
self.p_sims = results["exceedence_pvalue"]
if keep:
self.sims = results["sti_perm"]
self.nsti = results["nsti"]
self._crs = crs
self.statistic_ = self.nsti
self._id_map = dict(zip(rangeids, self._ids))
self.adjlist["focal"] = self.adjlist["focal"].replace(self._id_map)
self.adjlist["neighbor"] = self.adjlist["neighbor"].replace(self._id_map)
# reconstruct df
geom = gpd.points_from_xy(self.s_coords[:, 0], self.s_coords[:, 1])
_gdf = gpd.GeoDataFrame(geometry=geom, crs=self._crs, index=self._ids)
_gdf["time"] = self.t_coords
if permutations > 0:
_gdf["p_sim"] = self.p_sims
_gdf["p_hypergeom"] = self.p_hypergeom
self._gdf_static = _gdf
@property
def _gdf(self):
return self._gdf_static.copy()
@classmethod
def from_dataframe(
cls,
dataframe,
time_col: str,
delta: int,
tau: int,
permutations: int = 99,
keep: bool = False,
):
"""Compute a set of local Knox statistics from a dataframe of Point observations
Parameters
----------
dataframe : geopandas.GeoDataFrame
dataframe holding observations. Should be in a projected coordinate system
with geometries stored as Points
time_col : str
column in the dataframe storing the time values (integer coordinate)
for each observation. For example, if the observations are stored with
a timestamp, the time_col should first be converted to a series of
integers representing, e.g., hours, days, or seconds.
delta : int
delta parameter defining the spatial neighbor threshold measured in the
same units as the dataframe CRS
tau : int
tau parameter defining the temporal neighbor threshold (in the units
measured by `time_col`)
permutations : int, optional
permutations to use for computational inference, by default 99
keep : bool
whether to store realized values of the statistic under permutations
Returns
-------
pointpats.spacetime.KnoxLocal
a fitted KnoxLocal class
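
Examples
--------
A minimal sketch, assuming the libpysal `burkitt` example data:

>>> import geopandas, libpysal
>>> df = geopandas.read_file(libpysal.examples.get_path("burkitt.shp"))
>>> local_knox = KnoxLocal.from_dataframe(df, time_col="T", delta=20, tau=5)
>>> local_knox.statistic_.shape
(188,)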
"""
s_coords, t_coords = _spacetime_points_to_arrays(dataframe, time_col)
return cls(
s_coords,
t_coords,
delta,
tau,
permutations,
keep,
crs=dataframe.crs,
ids=dataframe.index.values,
)
def hotspots(self, crit=0.05, inference="permutation"):
"""Table of significant space-time clusters that define local hotspots.
Parameters
----------
crit : float, optional
critical value for statistical inference, by default 0.05
inference : str, optional
whether p-values should use permutation or analytical inference, by default
"permutation"
Returns
-------
pandas.DataFrame
dataframe of significant hotspots
Raises
------
ValueError
if `inference` is not in {'permutation', 'analytic'}
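
Examples
--------
A minimal sketch, continuing from a fitted KnoxLocal instance named
``local_knox`` (outputs not verified here):

>>> hs = local_knox.hotspots(crit=0.05)  # doctest: +SKIP
>>> hs.columns.tolist()  # doctest: +SKIP
['pvalue', 'focal_time', 'focal', 'neighbor', 'orientation']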
"""
if inference == "permutation":
if not hasattr(self, "p_sim"):
warn(
"Pseudo-p values not availalable. Permutation-based p-values require "
"fitting the KnoxLocal class using `permutations` set to a large "
"number. Using analytic p-values instead"
)
col = "p_hypergeom"
else:
col = "p_sim"
elif inference == "analytic":
col = "p_hypergeom"
else:
raise ValueError("inference must be either `permutation` or `analytic`")
# determine hot spots
pdf_sig = self._gdf[self._gdf[col] <= crit][[col, "time"]].rename(
columns={col: "pvalue", "time": "focal_time"}
)
pdf_sig = pdf_sig.merge(
self.adjlist, how="inner", left_index=True, right_on="focal"
).reset_index(drop=True)
return pdf_sig.copy()
def plot(
self,
colors: dict = {"focal": "red", "neighbor": "yellow", "nonsig": "grey"},
crit: float = 0.05,
inference: str = "permutation",
point_kwargs: dict = None,
plot_edges: bool = True,
edge_color: str = "black",
edge_kwargs: dict = None,
ax=None,
):
"""plot hotspots
Parameters
----------
colors : dict, optional
mapping of colors to hotspot values, by default
{"focal": "red", "neighbor": "yellow", "nonsig": "grey"}
crit : float, optional
critical value for assessing statistical significance, by default 0.05
inference : str, optional
whether to use permutation or analytic inference, by default "permutation"
point_kwargs : dict, optional
additional keyword arguments passed to point plot, by default None
plot_edges : bool, optional
whether to plot edges connecting members of the same hotspot subgraph,
by default True
edge_color : str, optional
color of edges when plot_edges is True, by default 'black'
edge_kwargs : dict, optional
additional keyword arguments passed to edge plot, by default None
ax : matplotlib.axes.Axes, optional
axes object on which to create the plot, by default None
Returns
-------
matplotlib.axes.Axes
plot of local space-time hotspots
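
Examples
--------
A minimal sketch, assuming matplotlib is installed and ``local_knox`` is
a fitted KnoxLocal instance:

>>> ax = local_knox.plot()  # doctest: +SKIP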
"""
if point_kwargs is None:
point_kwargs = dict()
if edge_kwargs is None:
edge_kwargs = dict()
g = self._gdf.copy()
g["color"] = colors["nonsig"]
g["pvalue"] = self.p_hypergeom
if inference == "permutation":
if not hasattr(self, "p_sims"):
warn(
"Pseudo-p values not availalable. Permutation-based p-values require "
"fitting the KnoxLocal class using `permutations` set to a large "
"number. Using analytic p-values instead"
)
g["pvalue"] = self.p_hypergeom
else:
g["pvalue"] = self.p_sims
elif inference == "analytic":
g["pvalue"] = self.p_hypergeom
else:
raise ValueError("inference must be either `permutation` or `analytic`")
mask = g[g.pvalue <= crit].index.values
neighbors = self.adjlist[self.adjlist.focal.isin(mask)].neighbor.unique()
g.loc[neighbors, "color"] = colors["neighbor"]
g.loc[g.pvalue <= crit, "color"] = colors["focal"]
m = g[g.color == colors["nonsig"]].plot(
color=colors["nonsig"], ax=ax, **point_kwargs
)
g[g.color == colors["neighbor"]].plot(
ax=m, color=colors["neighbor"], **point_kwargs
)
g[g.color == colors["focal"]].plot(ax=m, color=colors["focal"], **point_kwargs)
if plot_edges:
# edges between hotspot and st-neighbors
ghs = self.hotspots(crit=crit, inference=inference)
ghs = ghs.dropna()
origins = g.loc[ghs.focal].geometry
destinations = g.loc[ghs.neighbor].geometry
ods = zip(origins, destinations)
lines = gpd.GeoSeries([LineString(od) for od in ods], crs=g.crs)
lines.plot(ax=m, color=edge_color, **edge_kwargs)
return m
def explore(
self,
crit: float = 0.05,
inference: str = "permutation",
radius: int = 5,
style_kwds: dict = None,
tiles: str = "CartoDB Positron",
plot_edges: bool = True,
edge_weight: int = 2,
edge_color: str = "black",
colors: dict = {"focal": "red", "neighbor": "yellow", "nonsig": "grey"},
):
"""Interactive plotting for space-time hotspots.
Parameters
----------
crit : float, optional
critical value for statistical inference, by default 0.05
inference : str, optional
which p-value to use for determining hotspots. Either "permutation" or
"analytic", by default "permutation"
radius : int, optional
radius of the circle marker plotted by folium, passed to
geopandas.GeoDataFrame.explore style_kwds as a convenience. Ignored if
`style_kwds` is passed directly, by default 5
style_kwds : dict, optional
additional style keywords passed to GeoDataFrame.explore, by default None
tiles : str, optional
tileset passed to GeoDataFrame.explore `tiles` argument, by default
"CartoDB Positron"
plot_edges : bool, optional
Whether to include lines drawn between members of a significant hotspot, by
default True
edge_weight : int, optional
line thickness when `plot_edges=True`, by default 2
edge_color : str, optional
color of line when `plot_edges=True`, by default "black"
colors : dict, optional
mapping of observation type to color,
by default {"focal": "red", "neighbor": "yellow", "nonsig": "grey"}
Returns
-------
folium.Map
an interactive map showing locally-significant spacetime hotspots
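
Examples
--------
A minimal sketch, assuming folium is installed and ``local_knox`` is a
fitted KnoxLocal instance:

>>> m = local_knox.explore(tiles="CartoDB Positron")  # doctest: +SKIP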
"""
if style_kwds is None:
style_kwds = {"radius": radius}
g = self._gdf.copy()
g["color"] = colors["nonsig"]
if inference == "permutation":
if not hasattr(self, "p_sims"):
warn(
"Pseudo-p values not availalable. Permutation-based p-values require "
"fitting the KnoxLocal class using `permutations` set to a large "
"number. Using analytic p-values instead"
)
g["pvalue"] = self.p_hypergeom
else:
g["pvalue"] = self.p_sims
elif inference == "analytic":
g["pvalue"] = self.p_hypergeom
else:
raise ValueError("inference must be either `permutation` or `analytic`")
mask = g[g.pvalue <= crit].index.values
neighbors = self.adjlist[self.adjlist.focal.isin(mask)].neighbor.unique()
# this is clunky, but enforces plotting order so significance is prioritized
g.loc[neighbors, "color"] = colors["neighbor"]
g.loc[g.pvalue <= crit, "color"] = colors["focal"]
nbs = self.adjlist.groupby("focal").agg(list)["neighbor"]
g = g.reset_index().merge(nbs, left_on="index", right_index=True, how="left")
m = g[g.color == colors["nonsig"]].explore(
color="grey", style_kwds=style_kwds, tiles=tiles
)
neighbor_pts = g[g.color == colors["neighbor"]]
if neighbor_pts.shape[0] == 0:
warn("empty neighbor set.")
else:
m = neighbor_pts.explore(m=m, color=colors["neighbor"], style_kwds=style_kwds)
m = g[g.color == colors["focal"]].explore(
m=m, color=colors["focal"], style_kwds=style_kwds
)
if plot_edges:
# edges between hotspot and st-neighbors
g = g.set_index("index")
ghs = self.hotspots(crit=crit, inference=inference)
ghs = ghs.dropna()
origins = g.loc[ghs.focal].geometry
destinations = g.loc[ghs.neighbor].geometry
ods = zip(origins, destinations)
lines = gpd.GeoSeries([LineString(od) for od in ods], crs=g.crs)
lines.explore(m=m, color=edge_color, style_kwds={"weight": edge_weight})
return m
def _gdfhs(self, crit=0.05, inference="permutation"):
# merge df with self.hotspots
return gpd.GeoDataFrame(
self._gdf.merge(
self.hotspots(crit=crit, inference=inference),
left_index=True,
right_on="focal",
)
)
def _spacetime_points_to_arrays(dataframe, time_col):
"""convert long-form geodataframe into arrays for kdtree
Parameters
----------
dataframe : geopandas.GeoDataFrame
geodataframe with point geometries
time_col : str
name of the column on dataframe that stores time values
Returns
-------
tuple
two numpy arrays holding spatial coordinates s_coords (n,2)
and temporal coordinates t_coords (n,1)
"""
if dataframe.crs is None:
warn(
"There is no CRS set on the dataframe. The KDTree will assume coordinates "
"are stored in Euclidean distances"
)
else:
if dataframe.crs.is_geographic:
raise ValueError(
"The input dataframe must be in a projected coordinate system."
)
assert dataframe.geom_type.unique().tolist() == [
"Point"
], "The Knox statistic is only defined for Point geometries"
# kdtree won't operate on datetime
if not is_numeric_dtype(dataframe[time_col].dtype):
raise ValueError(
"The time values must be stored as "
f"a numeric dtype but the column {time_col} is stored as "
f"{dataframe[time_col].dtype}"
)
s_coords = np.vstack((dataframe.geometry.x.values, dataframe.geometry.y.values)).T
t_coords = np.vstack(dataframe[time_col].values)
return s_coords, t_coords