"""
A-DBSCAN implementation
"""
__author__ = "Dani Arribas-Bel <daniel.arribas.bel@gmail.com>"
import warnings
from collections import Counter
import numpy as np
import pandas
from libpysal.cg.alpha_shapes import alpha_shape_auto
from scipy.spatial import cKDTree
from sklearn.base import BaseEstimator as _BaseEstimator
from sklearn.base import ClusterMixin as _ClusterMixin
from sklearn.cluster import DBSCAN
from sklearn.neighbors import KNeighborsClassifier
__all__ = ["ADBSCAN", "remap_lbls", "ensemble", "get_cluster_boundary"]
class ADBSCAN(_ClusterMixin, _BaseEstimator):
"""
A-DBSCAN, as introduced in :cite:`ab_gl_vm2020joue`.
A-DBSCAN is an extension of the original DBSCAN algorithm that builds an
ensemble of solutions, each generated by running DBSCAN on a random subset
of the data and "extending" that solution to the rest of the sample through
nearest-neighbor classification.
See the original reference (:cite:`ab_gl_vm2020joue`) for more details or
the notebook guide for an illustration.
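Conceptually, each draw in the ensemble follows the logic sketched below
(a minimal sketch, assuming a table `X` with the coordinate columns listed
in `xy`; the actual implementation lives in the private `_one_draw`
helper)::

    sub = X.sample(frac=pct_exact)                  # random subset
    dbs = DBSCAN(eps=eps, min_samples=max(1, int(min_samples * pct_exact)))
    lbls = dbs.fit(sub[xy]).labels_                 # cluster the subset only
    knn = KNeighborsClassifier(n_neighbors=1).fit(sub[xy], lbls)
    full_lbls = knn.predict(X[xy])                  # extend to every point

The final labels are then obtained by relabelling the `reps` solutions so
they are comparable and keeping, for each point, the label that wins at
least `pct_thr` of the votes.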
...
Parameters
----------
eps : float
The maximum distance between two samples for them to be considered
as in the same neighborhood.
min_samples : int
The number of samples (or total weight) in a neighborhood
for a point to be considered as a core point. This includes the
point itself.
algorithm : {'auto', 'ball_tree', 'kd_tree', 'brute'}, optional
The algorithm to be used by the NearestNeighbors module
to compute pointwise distances and find nearest neighbors.
See NearestNeighbors module documentation for details.
n_jobs : int
[Optional. Default=1] The number of parallel jobs to run. If -1, then
the number of jobs is set to the number of CPU cores.
pct_exact : float
[Optional. Default=0.1] Proportion of the entire dataset on which
DBSCAN is run in each draw
reps : int
[Optional. Default=100] Number of random samples to draw in order to
build the final solution
keep_solus : bool
[Optional. Default=False] If True, the `solus` and `solus_relabelled`
objects are kept; otherwise they are deleted to save memory
pct_thr : float
[Optional. Default=0.9] Minimum proportion of replications in which
a non-noise label needs to be assigned to an observation for that
observation to be labelled as such
Attributes
----------
labels_ : array
[Only available after `fit`] Cluster labels for each point in the
dataset given to fit().
Noisy samples (those whose most common label accounts for less than
`pct_thr` of the replications) are given the label -1.
votes : DataFrame
[Only available after `fit`] Table indexed on `X.index` with
`labels_` under the `lbls` column, and the frequency across draws of
that label under `pct`
solus : DataFrame, shape = [n, reps]
[Only available after `fit`] Each solution of labels for every draw
solus_relabelled : DataFrame, shape = [n, reps]
[Only available after `fit`] Each solution of labels for
every draw, relabelled to be consistent across solutions
Examples
--------
>>> import pandas
>>> from esda.adbscan import ADBSCAN
>>> import numpy as np
>>> np.random.seed(10)
>>> db = pandas.DataFrame({'X': np.random.random(25), \
'Y': np.random.random(25) \
})
ADBSCAN can be run following a scikit-learn-like API:
>>> np.random.seed(10)
>>> clusterer = ADBSCAN(0.03, 3, reps=10, keep_solus=True)
>>> _ = clusterer.fit(db)
>>> clusterer.labels_
array(['-1', '-1', '-1', '0', '-1', '-1', '-1', '0', '-1', '-1', '-1',
'-1', '-1', '-1', '0', '0', '0', '-1', '0', '-1', '0', '-1', '-1',
'-1', '-1'], dtype=object)
We can inspect the winning label for each observation, as well as the
proportion of votes:
>>> print(clusterer.votes.head().to_string())
lbls pct
0 -1 0.7
1 -1 0.5
2 -1 0.7
3 0 1.0
4 -1 0.7
If you have set the option to keep them, you can even inspect each
solution that makes up the ensemble:
>>> print(clusterer.solus.head().to_string())
rep-00 rep-01 rep-02 rep-03 rep-04 rep-05 rep-06 rep-07 rep-08 rep-09
0 0 1 1 0 1 0 0 0 1 0
1 1 1 1 1 0 1 0 1 1 1
2 0 1 1 0 0 1 0 0 1 0
3 0 1 1 0 0 1 1 1 0 0
4 0 1 1 1 0 1 0 1 0 1
If we run a single replication and set the proportion of the dataset
sampled in each draw to 100%, we recover a traditional DBSCAN solution:
>>> clusterer = ADBSCAN(0.2, 5, reps=1, pct_exact=1)
>>> np.random.seed(10)
>>> _ = clusterer.fit(db)
>>> clusterer.labels_
array(['0', '-1', '0', '0', '0', '-1', '-1', '0', '-1', '-1', '0', '-1',
'-1', '-1', '0', '0', '0', '-1', '0', '0', '0', '-1', '-1', '0',
'-1'], dtype=object)
"""
def __init__(
self,
eps,
min_samples,
algorithm="auto",
n_jobs=1,
pct_exact=0.1,
reps=100,
keep_solus=False,
pct_thr=0.9,
):
self.eps = eps
self.min_samples = min_samples
self.algorithm = algorithm
self.reps = reps
self.n_jobs = n_jobs
self.pct_exact = pct_exact
self.pct_thr = pct_thr
self.keep_solus = keep_solus
def fit(self, X, y=None, sample_weight=None, xy=["X", "Y"]): # noqa: ARG002
"""
Perform A-DBSCAN clustering from features
...
Parameters
----------
X : DataFrame
Features
sample_weight : Series, shape (n_samples,)
[Optional. Default=None] Weight of each sample, such
that a sample with a weight of at least ``min_samples``
is by itself a core sample; a sample with negative
weight may inhibit its eps-neighbor from being core.
Note that weights are absolute, and default to 1.
xy : list
[Default=`['X', 'Y']`] Ordered pair of names for XY
coordinates in `X`
y : Ignored
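A minimal usage sketch (assuming a DataFrame `db` with coordinate
columns `lon`/`lat` and a weights Series `w` aligned on `db.index`,
both hypothetical names)::

    clusterer = ADBSCAN(0.03, 3, reps=10)
    clusterer.fit(db, sample_weight=w, xy=["lon", "lat"])
    clusterer.votes.head()   # winning label and vote share per point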
"""
n = X.shape[0]
zfiller = len(str(self.reps))
solus = pandas.DataFrame(
np.zeros((X.shape[0], self.reps), dtype=str),
index=X.index,
columns=[f"rep-{str(i).zfill(zfiller)}" for i in range(self.reps)],
)
# Multi-core is currently applied to the relabelling stage only;
# the draws themselves are executed sequentially
if (self.n_jobs == -1) or (self.n_jobs > 1):
# Set different parallel seeds!!!
warnings.warn(
"Multi-core implementation only works on relabelling solutions. "
"Execution of draws is still sequential.",
UserWarning,
stacklevel=2,
)
for i in range(self.reps):
pars = (
n,
X,
sample_weight,
xy,
self.pct_exact,
self.eps,
self.min_samples,
self.algorithm,
self.n_jobs,
)
lbls_pred = _one_draw(pars)
solus.iloc[:, i] = lbls_pred
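# Make cluster IDs comparable across draws, then derive the winning
# label and its vote share for every observation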
solus_relabelled = remap_lbls(solus, X, xy=xy, n_jobs=self.n_jobs)
self.votes = ensemble(solus_relabelled)
lbls = self.votes["lbls"].values
lbl_type = type(solus.iloc[0, 0])
lbls[self.votes["pct"] < self.pct_thr] = lbl_type(-1)
self.labels_ = lbls
if not self.keep_solus:
del solus
del solus_relabelled
else:
self.solus = solus
self.solus_relabelled = solus_relabelled
return self
def _one_draw(pars):
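"""
Run a single replication: fit DBSCAN on a random subset of
`pct_exact * n` observations and extend its labels to the full table
with a 1-nearest-neighbor classifier. `pars` packs
(n, X, sample_weight, xy, pct_exact, eps, min_samples, algorithm, n_jobs)
and a Series of string labels indexed on `X.index` is returned.
"""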
n, X, sample_weight, xy, pct_exact, eps, min_samples, algorithm, n_jobs = pars
rids = np.arange(n)
np.random.shuffle(rids)
rids = rids[: int(n * pct_exact)]
X_thin = X.iloc[rids, :]
thin_sample_weight = None
if sample_weight is not None:
thin_sample_weight = sample_weight.iloc[rids]
min_samples = min_samples * pct_exact
min_samples = 1 if min_samples < 1 else int(np.floor(min_samples))
dbs = DBSCAN(
eps=eps,
min_samples=min_samples,
algorithm=algorithm,
n_jobs=n_jobs,
).fit(X_thin[xy], sample_weight=thin_sample_weight)
lbls_thin = pandas.Series(dbs.labels_.astype(str), index=X_thin.index)
NR = KNeighborsClassifier(n_neighbors=1)
NR.fit(X_thin[xy], lbls_thin)
lbls_pred = pandas.Series(NR.predict(X[xy]), index=X.index)
return lbls_pred
def remap_lbls(solus, xys, xy=["X", "Y"], n_jobs=1):
"""
Remap labels in solutions so they are comparable (same label
for same cluster)
...
Parameters
----------
solus : DataFrame
Table with labels for each point (row) and solution (column)
xys : DataFrame
Table including coordinates
xy : list
[Default=`['X', 'Y']`] Ordered pair of names for XY
coordinates in `xys`
n_jobs : int
[Optional. Default=1] The number of parallel jobs to run. If
-1, then the number of jobs is set to the number of CPU
cores.
Returns
-------
onel_solus : DataFrame
Table with original solutions remapped to consolidated
labels across all the columns
Examples
--------
>>> import pandas
>>> db = pandas.DataFrame({"X": [0, 0.1, 4, 6, 5], \
"Y": [0, 0.2, 5, 7, 5] \
})
>>> solus = pandas.DataFrame({"rep-00": [0, 0, 7, 7, -1], \
"rep-01": [4, 4, -1, 6, 6], \
"rep-02": [5, 5, 8, 8, 8] \
})
>>> print(remap_lbls(solus, db).to_string())
rep-00 rep-01 rep-02
0 0 0 0
1 0 0 0
2 7 -1 7
3 7 7 7
4 -1 7 7
"""
# N. of clusters by solution
ns_clusters = solus.apply(lambda x: x.unique().shape[0])
# Pick reference solution as one w/ max N. of clusters
ref = ns_clusters[ns_clusters == ns_clusters.max()].iloc[[0]].index[0]
lbl_type = type(solus[ref].iloc[0])
# Obtain centroids of reference solution
ref_centroids = (
xys.groupby(solus[ref])[xy]
.apply(lambda xys: xys.mean())
.drop(lbl_type(-1), errors="ignore")
)
# Only continue if at least one cluster was identified
if ref_centroids.shape[0] > 0:
# Build KDTree and setup results holder
ref_kdt = cKDTree(ref_centroids)
remapped_solus = pandas.DataFrame(
np.zeros(solus.shape, dtype=lbl_type),
index=solus.index,
columns=solus.columns,
)
if (n_jobs == -1) or (n_jobs > 1):
pool = _setup_pool(n_jobs)
s_ids = solus.drop(ref, axis=1).columns.tolist()
to_loop_over = [(solus[s], ref_centroids, ref_kdt, xys, xy) for s in s_ids]
remapped = pool.map(_remap_n_expand, to_loop_over)
pool.close()
remapped_df = pandas.concat(remapped, axis=1)
remapped_solus.loc[:, s_ids] = remapped_df
else:
for s in solus.drop(ref, axis=1):
# -
pars = (solus[s], ref_centroids, ref_kdt, xys, xy)
remap_ids = _remap_lbls_single(pars)
# -
remapped_solus.loc[:, s] = solus[s].map(remap_ids)
remapped_solus.loc[:, ref] = solus.loc[:, ref]
return remapped_solus.fillna(lbl_type(-1)).astype(lbl_type)
else:
warnings.warn("No clusters identified.", UserWarning, stacklevel=2)
return solus
def _remap_n_expand(pars):
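# Helper for the parallel path in `remap_lbls`: remap one solution's labels
# onto the reference solution and expand the mapping back to every point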
solus_s, ref_centroids, ref_kdt, xys, xy = pars
remap_ids = _remap_lbls_single(pars)
expanded = solus_s.map(remap_ids)
return expanded
def _remap_lbls_single(pars):
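# Build the cross-walk from one solution's labels to the reference labels by
# matching each of its cluster centroids to the nearest reference centroid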
new_lbls, ref_centroids, ref_kdt, xys, xy = pars
lbl_type = type(ref_centroids.index[0])
# Cross-walk to cluster IDs
ref_centroids_ids = pandas.Series(ref_centroids.index.values)
# Centroids for new solution
solu_centroids = (
xys.groupby(new_lbls)[xy]
.apply(lambda xys: xys.mean())
.drop(lbl_type(-1), errors="ignore")
)
# Remapping from old to new labels
_, nrst_ref_cl = ref_kdt.query(solu_centroids.values)
remap_ids = pandas.Series(nrst_ref_cl, index=solu_centroids.index).map(
ref_centroids_ids
)
return remap_ids
def ensemble(solus_relabelled):
"""
Generate unique class prediction based on majority/hard voting
...
Parameters
----------
solus_relabelled : DataFrame
Table with labels for each point (row) and solution
(column). Labels are assumed to be consistent across
solutions.
Returns
-------
pred : DataFrame
Table with one row per observation, a `lbls` column with the
winning label, and a `pct` column with the proportion of
replications in which the winning label was voted for
Examples
--------
>>> import pandas
>>> db = pandas.DataFrame({"X": [0, 0.1, 4, 6, 5], \
"Y": [0, 0.2, 5, 7, 5] \
})
>>> solus = pandas.DataFrame({"rep-00": [0, 0, 7, 7, -1], \
"rep-01": [4, 4, -1, 6, 6], \
"rep-02": [5, 5, 8, 8, 8] \
})
>>> solus_rl = remap_lbls(solus, db)
>>> print(round(ensemble(solus_rl), 2).to_string())
lbls pct
0 0 1.00
1 0 1.00
2 7 0.67
3 7 1.00
4 7 0.67
"""
counts = np.array(
list( # noqa: C417
map(lambda a: Counter(a).most_common(1)[0], solus_relabelled.values)
)
)
winner = counts[:, 0]
votes = counts[:, 1].astype(int) / solus_relabelled.shape[1]
pred = pandas.DataFrame(
{"lbls": winner, "pct": votes}, index=solus_relabelled.index
)
return pred
def _setup_pool(n_jobs):
"""
Set pool for multiprocessing
...
Parameters
----------
n_jobs : int
The number of parallel jobs to run. If -1, then the number
of jobs is set to the number of CPU cores.
"""
import multiprocessing as mp
return mp.Pool(mp.cpu_count()) if n_jobs == -1 else mp.Pool(n_jobs)
def get_cluster_boundary(labels, xys, xy=["X", "Y"], n_jobs=1, crs=None, step=1):
"""
Turn a set of labels associated with 2-D points into polygon boundaries
for each cluster using the auto alpha shape algorithm
(`libpysal.cg.alpha_shapes.alpha_shape_auto`)
...
Parameters
----------
labels : Series
Cluster labels for each point in the dataset (noise
samples expressed as -1), indexed as `xys`
xys : DataFrame
Table including coordinates
xy : list
[Default=`['X', 'Y']`] Ordered pair of names for XY
coordinates in `xys`
n_jobs : int
[Optional. Default=1] The number of parallel jobs to run
for remapping. If -1, then the number of jobs is set to
the number of CPU cores.
crs : str
[Optional] Coordinate system
step : int
[Optional. Default=1]
Number of points in `xys` to jump ahead in the alpha
shape stage after checking whether the largest possible
alpha that includes the point and all the other ones
with smaller radii
Returns
-------
polys : GeoSeries
GeoSeries with polygons for each cluster boundary, indexed
on the cluster label
Examples
--------
>>> import pandas
>>> from esda.adbscan import ADBSCAN, get_cluster_boundary
>>> import numpy as np
>>> np.random.seed(10)
>>> db = pandas.DataFrame({'X': np.random.random(25), \
'Y': np.random.random(25) \
})
ADBSCAN can be run following a scikit-learn-like API:
>>> np.random.seed(10)
>>> clusterer = ADBSCAN(0.03, 3, reps=10, keep_solus=True)
>>> _ = clusterer.fit(db)
>>> labels = pandas.Series(clusterer.labels_, index=db.index)
>>> polys = get_cluster_boundary(labels, db)
>>> polys[0].wkt
'POLYGON ((0.7217553174317995 0.8192869956700687, 0.7605307121989587 0.9086488808086682, 0.9177741225129434 0.8568503024577332, 0.8126209616521135 0.6262871483113925, 0.6125260668293881 0.5475861559192435, 0.5425443680112613 0.7546476915298572, 0.7217553174317995 0.8192869956700687))'
""" # noqa: E501
try:
from geopandas import GeoSeries
except ModuleNotFoundError:
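# Fallback when geopandas is not installed: keep the GeoSeries call
# signature but return a plain list of polygons instead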
def GeoSeries(data, index=None, crs=None): # noqa: ARG001, N802
return list(data)
lbl_type = type(labels.iloc[0])
noise = lbl_type(-1)
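# Keep only the observations assigned to a cluster and group their
# coordinates by cluster label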
ids_in_cluster = labels[labels != noise].index
g = xys.loc[ids_in_cluster, xy].groupby(labels[ids_in_cluster])
chunked_pts_step = []
cluster_lbls = []
for sub in g.groups:
chunked_pts_step.append((xys.loc[g.groups[sub], xy].values, step))
cluster_lbls.append(sub)
if n_jobs == 1:
polys = map(_asa, chunked_pts_step)
else:
pool = _setup_pool(n_jobs)
polys = pool.map(_asa, chunked_pts_step)
pool.close()
polys = GeoSeries(polys, index=cluster_lbls, crs=crs)
return polys
def _asa(pts_s):
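# Unpack a (points, step) pair and build its polygon via the auto alpha shape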
return alpha_shape_auto(pts_s[0], step=pts_s[1])