"""
Openshaw, S. and Rao, L. (1995). Algorithms for reengineering 1991 census geography.
Environment and Planning A, 27(3):425-446.
"""
# ruff: noqa: B008, N806
import abc
import math
import random
from collections import deque
import networkx as nx
import numpy as np
from spopt.region.azp_util import (
AllowMoveAZP,
AllowMoveAZPSimulatedAnnealing,
AllowMoveStrategy,
)
from spopt.region.csgraph_utils import is_connected, neighbors, sub_adj_matrix
from spopt.region.objective_function import ObjectiveFunctionPairwise
from spopt.region.util import (
Move,
array_from_df_col,
array_from_dict_values,
array_from_graph_or_dict,
assert_feasible,
boolean_assert_feasible,
copy_func,
count,
generate_initial_sol,
make_move,
pop_randomly_from,
random_element_from,
scipy_sparse_matrix_from_dict,
scipy_sparse_matrix_from_w,
separate_components,
w_from_gdf,
)
from ..BaseClass import BaseSpOptHeuristicSolver
[docs]
class AZP(BaseSpOptHeuristicSolver):
"""AZP involves class offering the implementation of
the automatic zoning procedure algorithm.
Parameters
----------
gdf : geopandas.GeoDataFrame
Geodataframe containing original data.
w : libpysal.weights.W
Weights object created from given data.
attrs_name : list
Strings for attribute names (cols of ``geopandas.GeoDataFrame``).
n_clusters : int
The number of clusters to form. Default is ``5``.
allow_move_strategy : None or AllowMoveStrategy
For a different behavior for allowing moves an AllowMoveStrategy
instance can be passed as argument. Default is ``None``.
random_state : None, int, str, bytes, or bytearray
Random seed. Default is ``None``.
initial_labels : numpy.ndarray or None
One-dimensional array of labels at the beginning of the algorithm.
If ``None``, then a random initial clustering will be generated.
Default is ``None``.
objective_func : spopt.region.objective_function.ObjectiveFunction
The objective function to use. Default is
``ObjectiveFunctionPairwise()``.
Attributes
----------
labels_ : numpy.ndarray
Each element is a region label specifying to which region the
corresponding area was assigned to by the last run of a fit-method.
Examples
--------
>>> import numpy as np
>>> import libpysal
>>> import geopandas as gpd
>>> from spopt.region import AZP
Read the data.
>>> pth = libpysal.examples.get_path('mexicojoin.shp')
>>> mexico = gpd.read_file(pth)
Initialize the parameters.
>>> attrs_name = [f'PCGDP{year}' for year in range(1950,2010, 10)]
>>> w = libpysal.weights.Queen.from_dataframe(mexico)
>>> n_clusters = 8
>>> floor = 3
>>> allow_move_strategy = None
>>> random_state = 12345
Run the skater algorithm.
>>> model = AZP(
... mexico, w, attrs_name, n_clusters, allow_move_strategy, random_state
... )
>>> model.solve()
Get the region IDs for unit areas.
>>> model.labels_
Show the clustering results.
>>> mexico['azp_new'] = model.labels_
>>> mexico.plot(column='azp_new', categorical=True, figsize=(12,8), edgecolor='w')
"""
[docs]
def __init__(
self,
gdf,
w,
attrs_name,
n_clusters=5,
allow_move_strategy=None,
random_state=None,
initial_labels=None,
objective_func=ObjectiveFunctionPairwise(),
):
self.gdf = gdf
self.w = w
self.attrs_name = attrs_name
self.n_clusters = n_clusters
self.allow_move_strategy = allow_move_strategy
self.random_state = random_state
self.initial_labels = initial_labels
self.objective_func = objective_func
[docs]
def solve(self):
"""Solve the azp"""
data = self.gdf
X = data[self.attrs_name].values
model = AZPOrig(self.allow_move_strategy, self.random_state)
model.fit_from_w(
self.w,
X,
self.n_clusters,
initial_labels=self.initial_labels,
objective_func=self.objective_func,
)
self.labels_ = model.labels_
class AZPOrig:
"""
Class offering the implementation of the AZP algorithm.
Attributes
----------
labels_ : numpy.ndarray
Each element is a region label specifying to which region the
corresponding area was assigned to by the last run of a fit-method.
"""
def __init__(self, allow_move_strategy=None, random_state=None):
"""
Parameters
----------
allow_move_strategy : None or AllowMoveStrategy
If None, then the AZP algorithm in [OR1995]_ is chosen.
For a different behavior for allowing moves an AllowMoveStrategy
instance can be passed as argument. Default is ``None``.
random_state : None, int, str, bytes, or bytearray
Random seed. Default is ``None``.
"""
self.n_regions = None
self.labels_ = None
self.random_state = random_state
random.seed(self.random_state)
if isinstance(allow_move_strategy, AllowMoveStrategy):
self.allow_move_strategy = allow_move_strategy
elif allow_move_strategy is None:
self.allow_move_strategy = AllowMoveAZP()
else:
raise ValueError(
"The allow_move_strategy argument must be either "
"None, or an instance of AllowMoveStrategy."
)
self.objective_func = None
def fit_from_scipy_sparse_matrix(
self,
adj,
attr,
n_regions,
initial_labels=None,
objective_func=ObjectiveFunctionPairwise(),
):
"""
Perform the AZP algorithm as described in [OR1995]_.
The resulting region labels are assigned to the instance's
:attr:`labels_` attribute.
Parameters
----------
adj : scipy.sparse.csr_matrix
Adjacency matrix representing the contiguity relation.
attr : numpy.ndarray
Array (number of areas x number of attributes) of areas' attributes
relevant to clustering.
n_regions : int
Number of desired regions.
initial_labels : numpy.ndarray or None
One-dimensional array of labels at the beginning of the algorithm.
If ``None``, then a random initial clustering will be generated.
Default is ``None``.
objective_func : :class:`region.ObjectiveFunction` (default ObjectiveFunctionPairwise())
The objective function to use. Default is
``ObjectiveFunctionPairwise()``.
""" # noqa: E501
if attr.ndim == 1:
attr = attr.reshape(adj.shape[0], -1)
self.allow_move_strategy.attr_all = attr
self.objective_func = objective_func
# step 1
if initial_labels is not None:
assert_feasible(initial_labels, adj, n_regions)
initial_labels_gen = separate_components(adj, initial_labels)
else:
initial_labels_gen = generate_initial_sol(adj, n_regions)
labels = -np.ones(adj.shape[0])
for labels_comp in initial_labels_gen:
comp_idx = np.where(labels_comp != -1)[0]
adj_comp = sub_adj_matrix(adj, comp_idx)
labels_comp = labels_comp[comp_idx]
attr_comp = attr[comp_idx]
self.allow_move_strategy.start_new_component(
labels_comp, attr_comp, self.objective_func, comp_idx
)
labels_comp = self._azp_connected_component(
adj_comp, labels_comp, attr_comp
)
labels[comp_idx] = labels_comp
self.n_regions = n_regions
self.labels_ = labels
fit = copy_func(fit_from_scipy_sparse_matrix)
fit.__doc__ = (
"Alias for :meth:`fit_from_scipy_sparse_matrix`.\n\n"
+ fit_from_scipy_sparse_matrix.__doc__
)
def fit_from_w(
self,
w,
attr,
n_regions,
initial_labels=None,
objective_func=ObjectiveFunctionPairwise(),
):
"""
Alternative API for ``fit_from_scipy_sparse_matrix``.
Parameters
----------
w : libpysal.weights.weights.W
`W` object representing the contiguity relation.
attr : numpy.ndarray
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
n_regions : int
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
initial_labels : numpy.ndarray or None
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``. Default is ``None``.
objective_func : :class:`region.ObjectiveFunction` (default ObjectiveFunctionPairwise())
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
Default is ``ObjectiveFunctionPairwise()``.
""" # noqa: E501
adj = scipy_sparse_matrix_from_w(w)
self.fit_from_scipy_sparse_matrix(
adj, attr, n_regions, initial_labels, objective_func=objective_func
)
def fit_from_networkx(
self,
graph,
attr,
n_regions,
initial_labels=None,
objective_func=ObjectiveFunctionPairwise(),
):
"""
Alternative API for ``fit_from_scipy_sparse_matrix``.
Parameters
----------
graph : networkx.Graph
Graph representing the contiguity relation.
attr : str, list, or dict
If the clustering criteria are present in the ``networkx.Graph``
`graph` as node attributes, then they can be specified as a string
(for one criterion) or as a list of strings (for multiple
criteria).
Alternatively, a dict can be used with each key being a node of the
networkx.Graph `graph` and each value being the corresponding
clustering criterion (a scalar [e.g. ``float`` or ``int``] or a
``numpy.ndarray``).
If there are no clustering criteria present in the ``networkx.Graph``
`graph` as node attributes, then a dictionary must be used for this
argument. Refer to the corresponding argument in
``fit_from_dict`` for more details about the expected dict.
n_regions : int
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
initial_labels : str, dict, or None
If str, then the string names the graph's attribute holding the
information about the initial clustering.
If ``dict``, then each key is a node and each value is the region the
key area is assigned to at the beginning of the algorithm.
If ``None``, then a random initial clustering will be generated.
Default is ``None``.
objective_func : :class:`region.ObjectiveFunction` (default ObjectiveFunctionPairwise())
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
Default is ``ObjectiveFunctionPairwise()``.
""" # noqa: E501
adj = nx.to_scipy_sparse_matrix(graph)
attr = array_from_graph_or_dict(graph, attr)
if initial_labels is not None:
initial_labels = array_from_graph_or_dict(graph, initial_labels)
self.fit_from_scipy_sparse_matrix(
adj, attr, n_regions, initial_labels, objective_func=objective_func
)
def fit_from_geodataframe(
self,
gdf,
attr,
n_regions,
contiguity="rook",
initial_labels=None,
objective_func=ObjectiveFunctionPairwise(),
):
"""
Alternative API for ``fit_from_scipy_sparse_matrix``.
Parameters
----------
gdf : geopandas.GeoDataFrame
Refer to the corresponding argument in
``AZP.fit_from_geodataframe``.
attr : str or list
The clustering-relevant attributes (columns of the GeoDataFrame
``gdf``) are specified as string (for one column) or list of strings
(for multiple columns).
n_regions : int
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
contiguity : str
Defines the contiguity relationship between areas.
Default is ``'rook'``. Possible contiguity definitions are:
* "rook" - Rook contiguity.
* "queen" - Queen contiguity.
initial_labels : numpy.ndarray or None
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
Default is ``None``.
objective_func : :class:`region.ObjectiveFunction` (default ObjectiveFunctionPairwise())
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
Default is ``ObjectiveFunctionPairwise()``.
""" # noqa: E501
w = w_from_gdf(gdf, contiguity)
attr = array_from_df_col(gdf, attr)
self.fit_from_w(
w, attr, n_regions, initial_labels, objective_func=objective_func
)
def fit_from_dict(
self,
neighbor_dict,
attr,
n_regions,
initial_labels=None,
objective_func=ObjectiveFunctionPairwise(),
):
"""
Alternative API for :meth:`fit_from_scipy_sparse_matrix`.
Parameters
----------
neighbor_dict : `dict`
Each key is an area and each value is an iterable of the key area's
neighbors.
attr : `dict`
Each key is an area and each value is the corresponding
clustering-relevant attribute.
n_regions : `int`
Refer to the corresponding argument in
:meth:`fit_from_scipy_sparse_matrix`.
initial_labels : `dict` or None, default: None
Each key represents an area. Each value represents the region, the
corresponding area is assigned to at the beginning of the
algorithm.
If None, then a random initial clustering will be generated.
objective_func : :class:`region.ObjectiveFunction` (default ObjectiveFunctionPairwise())
Refer to the corresponding argument in
:meth:`fit_from_scipy_sparse_matrix`.
""" # noqa: E501
sorted_areas = sorted(neighbor_dict)
adj = scipy_sparse_matrix_from_dict(neighbor_dict)
attr_arr = array_from_dict_values(attr, sorted_areas)
if initial_labels is not None:
initial_labels = array_from_dict_values(
initial_labels, sorted_areas, flat_output=True, dtype=np.int32
)
self.fit_from_scipy_sparse_matrix(
adj, attr_arr, n_regions, initial_labels, objective_func=objective_func
)
def _azp_connected_component(self, adj, initial_clustering, attr): # noqa: ARG002
"""
Implementation of the AZP algorithm for a spatially connected set of
areas (i.e. for every area there is a path to every other area).
Parameters
----------
adj : scipy.sparse.csr_matrix
Adjacency matrix representing the contiguity relation. The matrix'
shape is `(N, N)` where `N` denotes the number of areas in the
currently considered connected component.
initial_clustering : numpy.ndarray
Array of labels. Shape: `(N,)` where `N` denotes the number of areas in
the currently considered connected component.
attr : numpy.ndarray
Array of labels. Shape: `(N, M)` where `N` denotes the number of areas
in the currently considered connected component and `M` denotes the
number of attributes per area.
Returns
-------
labels : numpy.ndarray
One-dimensional array of region labels after the AZP algorithm has
been performed. Only region labels of the currently considered
connected component are returned.
"""
# if there is only one region in the initial solution, just return it.
distinct_regions = list(np.unique(initial_clustering))
if len(distinct_regions) == 1:
return initial_clustering
distinct_regions_copy = distinct_regions.copy()
# step 2: make a list of the M regions
labels = initial_clustering
obj_val_start = float("inf")
obj_val_end = self.allow_move_strategy.objective_val
region_neighbors = {}
for region in distinct_regions:
region_areas = set(np.where(labels == region)[0])
neighs = set()
for area in region_areas:
neighs.update(neighbors(adj, area))
region_neighbors[region] = neighs.difference(region_areas)
del neighs
# step 7: Repeat until no further improving moves are made
while obj_val_end < obj_val_start: # improvement
obj_val_start = float(obj_val_end)
distinct_regions = distinct_regions_copy.copy()
# step 6: when the list for region K is exhausted return to step 3
# and select another region and repeat steps 4-6
while distinct_regions:
# step 3: select & remove any region K at random from this list
recipient = pop_randomly_from(distinct_regions)
while True:
# step 4: identify a set of zones bordering on members of
# region K that could be moved into region K without
# destroying the internal contiguity of the donor region(s)
candidates = []
for neigh in region_neighbors[recipient]:
neigh_region = labels[neigh]
sub_adj = sub_adj_matrix(
adj, np.where(labels == neigh_region)[0], wo_nodes=neigh
)
# if area is alone in its region, it must stay
if is_connected(sub_adj) and count(labels, neigh_region) > 1:
candidates.append(neigh)
# step 5: randomly select zones from this list until either
# there is a local improvement in the current value of the
# objective function or a move that is equivalently as good
# as the current best. Then make the move, update the list
# of candidate zones, and return to step 4 or else repeat
# step 5 until the list is exhausted.
while candidates:
cand = pop_randomly_from(candidates)
if self.allow_move_strategy(cand, recipient, labels):
donor = labels[cand]
make_move(cand, recipient, labels)
region_neighbors[donor].add(cand)
region_neighbors[recipient].discard(cand)
neighs_of_cand = neighbors(adj, cand)
recipient_region_areas = set(
np.where(labels == recipient)[0]
)
region_neighbors[recipient].update(neighs_of_cand)
region_neighbors[recipient].difference_update(
recipient_region_areas
)
donor_region_areas = set(np.where(labels == donor)[0])
not_donor_neighs_anymore = {
area
for area in neighs_of_cand
if not any(
a in donor_region_areas
for a in neighbors(adj, area)
)
}
region_neighbors[donor].difference_update(
not_donor_neighs_anymore
)
break
else:
break
obj_val_end = float(self.allow_move_strategy.objective_val)
return labels
class AZPSimulatedAnnealing:
"""
Class offering the implementation of the AZP-SA algorithm (see [OR1995]_).
Attributes
----------
labels_ : numpy.ndarray
Each element is a region label specifying to which region the
corresponding area was assigned to by the last run of a fit-method.
"""
def __init__(
self,
init_temperature=None,
max_iterations=float("inf"),
sa_moves_term=10,
nonmoving_steps_before_stop=3,
repetitions_before_termination=5,
random_state=None,
):
"""
Parameters
----------
init_temperature : float
The initial temperature used in the simulated annealing algorithm.
max_iterations : int or float("inf")
Termination condition for step b: Terminate if the AZP algorithm
has run for `max_iterations` times. Default is ``float('inf')``.
sa_moves_term : int
Termination condition for step b: Count the SA-moves made by the
repeated runs of the AZP (modified in step 5) and terminate after
the AZP run that made the cumulative number of SA-moves reach or
exceed `sa_moves_term`. Default is ``10``.
nonmoving_steps_before_stop : int
Termination condition: Repeat steps b and c until no further moves
occur for `nonmoving_steps_before_stop` times. Default is ``3``.
repetitions_before_termination : int
Termination condition not present in [OR1995]_: Terminate if the
AZP runs returned a given solution for
`repetitions_before_termination` times. Default is ``5``.
random_state : None, int, str, bytes, or bytearray
Random seed. Default is ``None``.
"""
self.allow_move_strategy = None
self.azp = None
if init_temperature is not None:
self.init_temperature = init_temperature
else:
raise NotImplementedError("TODO") # todo
self.maxit = max_iterations
self.sa_moves_term = sa_moves_term
self.sa_moves_term_reached = False
self.move_made = False
self.nonmoving_steps_before_stop = nonmoving_steps_before_stop
self.visited = []
self.reps_before_termination = repetitions_before_termination
self.random_state = random_state
self.n_regions = None
def fit_from_geodataframe(
self,
gdf,
attr,
n_regions,
contiguity="rook",
initial_labels=None,
cooling_factor=0.85,
objective_func=ObjectiveFunctionPairwise(),
):
"""
Parameters
----------
gdf : geopandas.GeoDataFrame
Refer to the corresponding argument in
``AZP.fit_from_geodataframe``.
attr : str or list
Refer to the corresponding argument in
``AZP.fit_from_geodataframe``.
n_regions : int
Refer to the corresponding argument in
``AZP.fit_from_geodataframe``.
contiguity : str
Refer to the corresponding argument in
``AZP.fit_from_geodataframe``. Default is ``'rook'``.
initial_labels : numpy.ndarray
Refer to the corresponding argument in
``AZP.fit_from_geodataframe``. Default is ``None``.
cooling_factor : float
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``. Default is ``0.85``.
objective_func : :class:`region.ObjectiveFunction` (default ObjectiveFunctionPairwise())
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
Default is ``ObjectiveFunctionPairwise()``.
""" # noqa: E501
w = w_from_gdf(gdf, contiguity)
attr = array_from_df_col(gdf, attr)
self.fit_from_w(
w,
attr,
n_regions,
initial_labels,
cooling_factor=cooling_factor,
objective_func=objective_func,
)
def fit_from_dict(
self,
neighbor_dict,
attr,
n_regions,
initial_labels=None,
cooling_factor=0.85,
objective_func=ObjectiveFunctionPairwise(),
):
"""
Parameters
----------
neighbor_dict : dict
Refer to the corresponding argument in ``AZP.fit_from_dict``.
attr : dict
Refer to the corresponding argument in ``AZP.fit_from_dict``.
n_regions : int
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
initial_labels : dict or None
Refer to the corresponding argument in
``AZP.fit_from_dict``. Default is ``None``.
cooling_factor : float
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
Default is ``0.85``.
objective_func : :class:`region.ObjectiveFunction` (default ObjectiveFunctionPairwise())
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
Default is ``ObjectiveFunctionPairwise()``.
""" # noqa: E501
sorted_areas = sorted(neighbor_dict)
adj = scipy_sparse_matrix_from_dict(neighbor_dict)
attr_arr = array_from_dict_values(attr, sorted_areas)
if initial_labels is not None:
initial_labels = array_from_dict_values(
initial_labels, sorted_areas, flat_output=True, dtype=np.int32
)
self.fit_from_scipy_sparse_matrix(
adj,
attr_arr,
n_regions,
initial_labels=initial_labels,
cooling_factor=cooling_factor,
objective_func=objective_func,
)
def fit_from_networkx(
self,
graph,
attr,
n_regions,
initial_labels=None,
cooling_factor=0.85,
objective_func=ObjectiveFunctionPairwise(),
):
"""
Parameters
----------
graph : networkx.Graph
Refer to the corresponding argument in
``AZP.fit_from_networkx``.
attr : str, list, or dict
Refer to the corresponding argument in
``AZP.fit_from_networkx``.
n_regions : int
Refer to the corresponding argument in
``AZP.fit_from_networkx``.
initial_labels : str or dict or None
Refer to the corresponding argument in
``AZP.fit_from_networkx``.
Default is ``None``.
cooling_factor : float
Refer to the corresponding argument in
``AZP.fit_from_networkx``.
Default is ``0.85``.
objective_func : :class:`region.ObjectiveFunction` (default ObjectiveFunctionPairwise())
Refer to the corresponding argument in
``AZP.fit_from_networkx``.
Default is ``ObjectiveFunctionPairwise()``.
""" # noqa: E501
adj = nx.to_scipy_sparse_matrix(graph)
attr = array_from_graph_or_dict(graph, attr)
if initial_labels is not None:
initial_labels = array_from_graph_or_dict(graph, initial_labels)
self.fit_from_scipy_sparse_matrix(
adj,
attr,
n_regions,
initial_labels,
cooling_factor=cooling_factor,
objective_func=objective_func,
)
def fit_from_scipy_sparse_matrix(
self,
adj,
attr,
n_regions,
initial_labels=None,
cooling_factor=0.85,
objective_func=ObjectiveFunctionPairwise(),
):
"""
Parameters
----------
adj : scipy.sparse.csr_matrix
Refer to the corresponding argument in
``AZP.fit_from_scipy_sparse_matrix``.
attr : numpy.ndarray
Refer to the corresponding argument in
``AZP.fit_from_scipy_sparse_matrix``.
n_regions : int
Refer to the corresponding argument in
``AZP.fit_from_scipy_sparse_matrix``.
initial_labels : numpy.ndarray or None
Refer to the corresponding argument in
``AZP.fit_from_scipy_sparse_matrix``.
Default is ``None``.
cooling_factor : float
Float :math:`\\in (0, 1)` specifying the cooling factor for the
simulated annealing.
Default is ``0.85``.
objective_func : :class:`region.ObjectiveFunction` (default ObjectiveFunctionPairwise())
Refer to the corresponding argument in
``AZP.fit_from_scipy_sparse_matrix``.
""" # noqa: E501
if not (0 < cooling_factor < 1):
raise ValueError(
"The cooling_factor argument must be greater than 0 and less than 1"
)
if attr.ndim == 1:
attr = attr.reshape(adj.shape[0], -1)
self.allow_move_strategy = AllowMoveAZPSimulatedAnnealing(
init_temperature=self.init_temperature, sa_moves_term=self.sa_moves_term
)
self.allow_move_strategy.register_sa_moves_term(self.sa_moves_alert)
self.allow_move_strategy.register_move_made(self.move_made_alert)
self.azp = AZP(
allow_move_strategy=self.allow_move_strategy, random_state=self.random_state
)
# step a
t = self.init_temperature
nonmoving_steps = 0
# step d: repeat step b and c
while nonmoving_steps < self.nonmoving_steps_before_stop:
it = 0
self.sa_moves_term_reached = False
self.allow_move_strategy.reset()
# step b
while it < self.maxit and not self.sa_moves_term_reached:
it += 1
old_sol = initial_labels
self.azp.fit_from_scipy_sparse_matrix(
adj, attr, n_regions, initial_labels, objective_func
)
initial_labels = self.azp.labels_
if old_sol is not None and (old_sol == initial_labels).all():
break
# added termination condition (not in Openshaw & Rao (1995))
if (
self.visited.count(tuple(initial_labels))
>= self.reps_before_termination
):
break
self.visited.append(tuple(initial_labels))
# step c
t *= cooling_factor
self.allow_move_strategy.update_temperature(t)
if self.move_made:
self.move_made = False
else:
nonmoving_steps += 1
self.labels_ = initial_labels
def fit_from_w(
self,
w,
attr,
n_regions,
initial_labels=None,
cooling_factor=0.85,
objective_func=ObjectiveFunctionPairwise(),
):
"""
Parameters
----------
w : libpysal.weights.weights.W
Refer to the corresponding argument in ``AZP.fit_from_w``.
attr : numpy.ndarray
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
n_regions : int
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
initial_labels : numpy.ndarray or None
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
Default is ``None``.
cooling_factor : float
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
Default is ``0.85``.
objective_func : :class:`region.ObjectiveFunction` (default ObjectiveFunctionPairwise())
Refer to the corresponding argument in
``fit_from_scipy_sparse_matrix``.
Default is ``ObjectiveFunctionPairwise()``.
""" # noqa: E501
adj = scipy_sparse_matrix_from_w(w)
self.fit_from_scipy_sparse_matrix(
adj,
attr,
n_regions,
initial_labels,
cooling_factor=cooling_factor,
objective_func=objective_func,
)
def sa_moves_alert(self):
self.sa_moves_term_reached = True
def move_made_alert(self):
self.move_made = True
class AZPTabu(AZP, abc.ABC):
"""
Superclass for tabu variants of the AZP.
"""
def _make_move(self, area, new_region, labels, adj):
old_region = labels[area]
make_move(area, new_region, labels)
if not boolean_assert_feasible(
labels, adj
): # If the move breaks the contiguity, revert it!
make_move(area, old_region, labels) # Revert Move!
reverse_move = Move(area, new_region, old_region)
self.tabu.append(reverse_move)
return False
else:
# step 5: Tabu the reverse move for R iterations.
reverse_move = Move(area, new_region, old_region)
self.tabu.append(reverse_move)
return True
def reset_tabu(self, tabu_len=None):
tabu_len = self.tabu.maxlen if tabu_len is None else tabu_len
self.tabu = deque([], tabu_len)
class AZPBasicTabu(AZPTabu):
"""
Implementation of the AZP with basic tabu (refer to [OR1995]_).
Attributes
----------
labels_ : numpy.ndarray
Each element is a region label specifying to which region the
corresponding area was assigned to by the last run of a fit-method.
"""
def __init__(
self, tabu_length=None, repetitions_before_termination=5, random_state=None
):
"""
Parameters
----------
tabu_length : numbers.Integral
The size of the tabu list.
repetitions_before_termination : numbers.integral
This argument specifies a termination condition. If a solution has
been visited for `repetitions_before_termination` times, the
clustering function will terminate.
random_state : None, int, str, bytes, or bytearray
Refer to the corresponding argument in
``AZP.__init__``. Default is ``None``.
"""
self.tabu = deque([], tabu_length)
self.visited = []
self.reps_before_termination = repetitions_before_termination
super().__init__(random_state=random_state)
def _azp_connected_component(self, adj, initial_clustering, attr):
"""
Implementation of the basic tabu version of the AZP algorithm (refer
to [OR1995]_) for a spatially connected set of areas (i.e. for every
area there is a path to every other area).
Parameters
----------
adj : scipy.sparse.csr_matrix
Refer to the corresponding argument in
``AZP._azp_connected_component``.
initial_clustering : numpy.ndarray
Refer to the corresponding argument in
``AZP._azp_connected_component``.
attr : numpy.ndarray
Refer to the corresponding argument in
``AZP._azp_connected_component``.
Returns
-------
labels : numpy.ndarray
Refer to the return value in ``AZP._azp_connected_component``.
"""
self.reset_tabu()
# if there is only one region in the initial solution, just return it.
distinct_regions = list(np.unique(initial_clustering))
if len(distinct_regions) == 1:
return initial_clustering
# step 2: make a list of the M regions
labels = initial_clustering
visited = []
stop = False
while True:
# added termination condition (not in Openshaw & Rao (1995))
label_tup = tuple(labels)
if visited.count(label_tup) >= self.reps_before_termination:
stop = True
visited.append(label_tup)
# step 1 Find the global best move that is not prohibited or tabu.
# find possible moves (globally)
best_move = None
best_objval_diff = float("inf")
for area in range(labels.shape[0]):
old_region = labels[area]
sub_adj = sub_adj_matrix(
adj, np.where(labels == old_region)[0], wo_nodes=area
)
# moving the area must not destroy spatial contiguity in donor
# region and if area is alone in its region, it must stay:
if is_connected(sub_adj) and count(labels, old_region) > 1:
for neigh in neighbors(adj, area):
new_region = labels[neigh]
if new_region != old_region:
possible_move = Move(area, old_region, new_region)
if possible_move not in self.tabu:
objval_diff = self.objective_func.update(
possible_move.area,
possible_move.new_region,
labels,
attr,
)
if objval_diff < best_objval_diff:
best_move = possible_move
best_objval_diff = objval_diff
# step 2: Make this move if it is an improvement or equivalet in
# value.
if (
best_move is not None
and best_objval_diff <= 0
and self.allow_move_strategy(
best_move.area, best_move.new_region, labels
)
):
self._make_move(best_move.area, best_move.new_region, labels, adj)
else:
# step 3: if no improving move can be made, then see if a tabu
# move can be made which improves on the current local best
# (termed an aspiration move)
improving_tabus = [
move
for move in self.tabu
if labels[move.area] == move.old_region
and self.objective_func.update(
move.area, move.new_region, labels, attr
)
< 0
]
if improving_tabus:
aspiration_move = random_element_from(improving_tabus)
if self.allow_move_strategy(
aspiration_move.area, aspiration_move.new_region, labels
):
self._make_move(
aspiration_move.area,
aspiration_move.new_region,
labels,
adj,
)
if stop:
break
else:
# step 4: If there is no improving move and no aspirational
# move, then make the best move even if it is nonimproving
# (that is, results in a worse value of the objective
# function).
if stop:
break
if best_move is not None and self.allow_move_strategy(
best_move.area, best_move.new_region, labels
):
self._make_move(
best_move.area, best_move.new_region, labels, adj
)
return labels
class AZPReactiveTabu(AZPTabu):
"""
Implementation of the AZP with reactive tabu (refer to [OR1995]_).
Attributes
----------
labels_ : numpy.ndarray
Each element is a region label specifying to which region the
corresponding area was assigned to by the last run of a fit-method.
"""
def __init__(self, max_iterations, k1, k2, random_state=None):
"""
Parameters
----------
max_iterations : int
Termination condition: The algorithm terminates after steps 3-11
(see [OR1995]_) are repeated for `max_max_iterations` times.
k1 : int
Defining a necessary condition for jumping from step 7 to step 11
in the algorithm (see [OR1995]_). Such a jump requires (besides the
condition involving `k2`) a solution to be visited more than `k1`
times.
k2 : int
Defining a necessary condition for jumping from step 7 to step 11
in the algorithm (see [OR1995]_). Such a jump requires (besides the
condition involving `k1`) a cycle of solutions to be found at least
`k2` times.
random_state : None, int, str, bytes, or bytearray
Refer to the corresponding argument in
``AZP.__init__``. Default is ``None``.
"""
self.tabu = deque([], maxlen=1)
super().__init__(random_state=random_state)
self.avg_it_until_rep = 1
self.rep_counter = 1
if max_iterations <= 0:
raise ValueError("The `max_iterations` argument must be > 0.")
self.maxit = max_iterations
self.visited = []
self.k1 = k1
self.k2 = k2
def _azp_connected_component(self, adj, initial_labels, attr):
"""
Implementation of the reactive tabu version of the AZP algorithm (refer
to [OR1995]_) for a spatially connected set of areas (i.e. for every
area there is a path to every other area).
Parameters
----------
adj : :class:`scipy.sparse.csr_matrix`
Refer to the corresponding argument in
:meth:`AZP._azp_connected_component`.
initial_labels : :class:`numpy.ndarray`
Refer to the corresponding argument in
:meth:`AZP._azp_connected_component`.
attr : :class:`numpy.ndarray`
Refer to the corresponding argument in
:meth:`AZP._azp_connected_component`.
Returns
-------
labels : :class:`numpy.ndarray`
Refer to the return value in :meth:`AZP._azp_connected_component`.
"""
self.reset_tabu(1)
# if there is only one region in the initial solution, just return it.
distinct_regions = list(np.unique(initial_labels))
if len(distinct_regions) == 1:
return initial_labels
# step 2: make a list of the M regions
labels = initial_labels
it_since_tabu_len_changed = 0
obj_val_start = float("inf")
# step 12: Repeat steps 3-11 until either no further improvements are
# made or a maximum number of iterations are exceeded.
for _it in range(self.maxit):
obj_val_end = self.objective_func(labels, attr)
if not obj_val_end < obj_val_start:
break # step 12
obj_val_start = obj_val_end
it_since_tabu_len_changed += 1
# step 3: Define the list of all possible moves that are not tabu
# and retain regional connectivity.
possible_moves = []
for area in range(labels.shape[0]):
old_region = labels[area]
sub_adj = sub_adj_matrix(
adj, np.where(labels == old_region)[0], wo_nodes=area
)
# moving the area must not destroy spatial contiguity in donor
# region and if area is alone in its region, it must stay:
if is_connected(sub_adj) and count(labels, old_region) > 1:
for neigh in neighbors(adj, area):
new_region = labels[neigh]
if new_region != old_region:
possible_move = Move(area, old_region, new_region)
if possible_move not in self.tabu:
possible_moves.append(possible_move)
# step 4: Find the best nontabu move.
best_move = None
best_move_index = None
best_objval_diff = float("inf")
for i, move in enumerate(possible_moves):
obj_val_diff = self.objective_func.update(
move.area, move.new_region, labels, attr
)
if obj_val_diff < best_objval_diff:
best_move_index, best_move = i, move
best_objval_diff = obj_val_diff
# step 5: Make the move if possible. Update the tabu status.
if self.allow_move_strategy(best_move.area, best_move.new_region, labels):
self._make_move(best_move.area, best_move.new_region, labels, adj)
# step 6: Look up the current zoning system in a list of all zoning
# systems visited so far during the search. If not found then go
# to step 10.
# Sets can't be permuted so we convert our list to a set:
label_tup = tuple(labels)
if label_tup in self.visited:
# step 7: If it is found and it has been visited more than K1
# times already and this cyclical behavior has been found on
# at least K2 other occasions (involving other zones) then go
# to step 11.
times_visited = self.visited.count(label_tup)
cycle = list(reversed(self.visited))
cycle = cycle[: cycle.index(label_tup) + 1]
cycle = list(reversed(cycle))
it_until_repetition = len(cycle)
if times_visited > self.k1:
times_cycle_found = 0
if self.k2 > 0:
for i in range(len(self.visited) - len(cycle)):
if self.visited[i : i + len(cycle)] == cycle:
times_cycle_found += 1
if times_cycle_found >= self.k2:
break
if times_cycle_found >= self.k2:
# step 11: Delete all stored zoning systems and make P
# random moves, P = 1 + self.avg_it_until_rep/2, and
# update tabu to preclude a return to the previous
# state.
# we save the labels such that we can access it if
# this step yields a poor solution.
last_step = (11, tuple(labels))
self.visited = []
p = math.floor(1 + self.avg_it_until_rep / 2)
possible_moves.pop(best_move_index)
for _ in range(p):
move = possible_moves.pop(
random.randrange(len(possible_moves))
)
if self.allow_move_strategy(
move.area, move.new_region, labels
):
self._make_move(move.area, move.new_region, labels, adj)
continue
# step 8: Update a moving average of the repetition
# interval self.avg_it_until_rep, and increase the
# prohibition period R to 1.1*R.
self.rep_counter += 1
avg_it = self.avg_it_until_rep
self.avg_it_until_rep = (
1
/ self.rep_counter
* ((self.rep_counter - 1) * avg_it + it_until_repetition)
)
self.tabu = deque(self.tabu, 1.1 * self.tabu.maxlen)
# step 9: If the number of iterations since R was last
# changed exceeds self.avg_it_until_rep, then decrease R to
# max(0.9*R, 1).
if it_since_tabu_len_changed > self.avg_it_until_rep:
new_tabu_len = max([0.9 * self.tabu.maxlen, 1])
new_tabu_len = math.floor(new_tabu_len)
self.tabu = deque(self.tabu, new_tabu_len)
it_since_tabu_len_changed = 0 # step 8
# step 10: Save the zoning system and go to step 12.
self.visited.append(tuple(labels))
last_step = 10
if last_step == 10:
try:
return np.array(self.visited[-2])
except IndexError:
return np.array(self.visited[-1])
# if step 11 was the last one, the result is in last_step[1]
return np.array(last_step[1])