Source code for spopt.locate.p_median

# ruff: noqa: B009, B010

import warnings

import numpy as np
import pulp
from geopandas import GeoDataFrame
from pointpats.geometry import build_best_tree
from scipy.sparse import csr_matrix, find
from scipy.spatial.distance import cdist

from .base import (
    BaseOutputMixin,
    FacilityModelBuilder,
    LocateSolver,
    MeanDistanceMixin,
    SpecificationError,
)


[docs] class PMedian(LocateSolver, BaseOutputMixin, MeanDistanceMixin): r""" Implement the :math:`p`-median optimization model and solve it. The :math:`p`-median problem, as adapted from :cite:`daskin_2013`, can be formulated as: .. math:: \begin{array}{lllll} \displaystyle \textbf{Minimize} & \displaystyle \sum_{i \in I}\sum_{j \in J}{a_i d_{ij} X_{ij}} && & (1) \\ \displaystyle \textbf{Subject To} & \displaystyle \sum_{j \in J}{X_{ij} = 1} && \forall i \in I & (2) \\ & \displaystyle \sum_{j \in J}{Y_j} = p && & (3) \\ & X_{ij} \leq Y_{j} && \forall i \in I \quad \forall j \in J & (4) \\ & X_{ij} \in \{0, 1\} && \forall i \in I \quad \forall j \in J & (5) \\ & Y_j \in \{0, 1\} && \forall j \in J & (6) \\ & && & \\ \displaystyle \textbf{Where} && i & = & \textrm{index of demand points/areas/objects in set } I \\ && j & = & \textrm{index of potential facility sites in set } J \\ && p & = & \textrm{the number of facilities to be sited} \\ && a_i & = & \textrm{service load or population demand at client location } i \\ && d_{ij} & = & \textrm{shortest distance or travel time between locations } i \textrm{ and } j \\ && X_{ij} & = & \begin{cases} 1, \textrm{if client location } i \textrm{ is served by facility } j \\ 0, \textrm{otherwise} \\ \end{cases} \\ && Y_j & = & \begin{cases} 1, \textrm{if a facility is sited at location } j \\ 0, \textrm{otherwise} \\ \end{cases} \\ \end{array} Parameters ---------- name : str The problem name. problem : pulp.LpProblem A ``pulp`` instance of an optimization model that contains constraints, variables, and an objective function. aij : numpy.array A cost matrix in the form of a 2D array between origins and destinations. Attributes ---------- name : str The problem name. problem : pulp.LpProblem A ``pulp`` instance of an optimization model that contains constraints, variables, and an objective function. fac2cli : numpy.array A 2D array storing facility to client relationships where each row represents a facility and contains an array of client indices with which it is associated. An empty client array indicates the facility is associated with no clients. cli2fac : numpy.array The inverse of ``fac2cli`` where client to facility relationships are shown. aij : numpy.array A cost matrix in the form of a 2D array between origins and destinations. """ # noqa: E501
[docs] def __init__( self, name: str, problem: pulp.LpProblem, aij: np.array, weights_sum: int | float, ): self.aij = aij self.ai_sum = weights_sum self.name = name self.problem = problem
def __add_obj(self, range_clients: range, range_facility: range) -> None: """ Add the objective function to the model. Minimize s0_0 * z0_0 + s0_1 * z0_1 + ... + si_j * zi_j Parameters ---------- range_clients : range The range of demand points. range_facility : range The range of facility point. Returns ------- None """ cli_assgn_vars = getattr(self, "cli_assgn_vars") self.problem += ( pulp.lpSum( [ self.aij[i, j] * cli_assgn_vars[i, j] for i in range_clients for j in range_facility ] ), "objective function", )
[docs] @classmethod def from_cost_matrix( cls, cost_matrix: np.array, weights: np.array, p_facilities: int, predefined_facilities_arr: np.array = None, facility_capacities: np.array = None, fulfill_predefined_fac: bool = False, name: str = "p-median", ): """ Create a ``PMedian`` object based on a cost matrix. Parameters ---------- cost_matrix : numpy.array A cost matrix in the form of a 2D array between origins and destinations. weights : numpy.array A 1D array of service load or population demand. p_facilities : int The number of facilities to be located. predefined_facilities_arr : numpy.array (default None) A binary 1D array of service facilities that must appear in the solution. For example, consider 3 facilites ``['A', 'B', 'C']``. If facility ``'B'`` must be in the model solution, then the passed in array should be ``[0, 1, 0]``. facility_capacity : numpy.array (default None) The capacity of each facility. fulfill_predefined_fac : bool (default False) If the predefined facilities need to be fulfilled. name : str (default 'p-median') The problem name. Returns ------- spopt.locate.p_median.PMedian Examples -------- >>> from spopt.locate import PMedian >>> from spopt.locate.util import simulated_geo_points >>> import geopandas >>> import numpy >>> import pulp >>> import spaghetti Create a regular lattice. >>> lattice = spaghetti.regular_lattice((0, 0, 10, 10), 9, exterior=True) >>> ntw = spaghetti.Network(in_data=lattice) >>> streets = spaghetti.element_as_gdf(ntw, arcs=True) >>> streets_buffered = geopandas.GeoDataFrame( ... geopandas.GeoSeries(streets["geometry"].buffer(0.2).unary_union), ... crs=streets.crs, ... columns=["geometry"] ... ) Simulate points about the lattice. >>> demand_points = simulated_geo_points(streets_buffered, needed=100, seed=5) >>> facility_points = simulated_geo_points(streets_buffered, needed=5, seed=6) Snap the points to the network of lattice edges. >>> ntw.snapobservations(demand_points, "clients", attribute=True) >>> clients_snapped = spaghetti.element_as_gdf( ... ntw, pp_name="clients", snapped=True ... ) >>> ntw.snapobservations(facility_points, "facilities", attribute=True) >>> facilities_snapped = spaghetti.element_as_gdf( ... ntw, pp_name="facilities", snapped=True ... ) Calculate the cost matrix from origins to destinations. >>> cost_matrix = ntw.allneighbordistances( ... sourcepattern=ntw.pointpatterns["clients"], ... destpattern=ntw.pointpatterns["facilities"] ... ) Simulate demand weights from ``1`` to ``12``. >>> ai = numpy.random.randint(1, 12, 100) Create and solve a ``PMedian`` instance from the cost matrix. >>> pmedian_from_cost_matrix = PMedian.from_cost_matrix( ... cost_matrix, ai, p_facilities=4 ... ) >>> pmedian_from_cost_matrix = pmedian_from_cost_matrix.solve( ... pulp.PULP_CBC_CMD(msg=False) ... ) Get the facility-client associations. >>> for fac, cli in enumerate(pmedian_from_cost_matrix.fac2cli): ... print(f"facility {fac} serving {len(cli)} clients") facility 0 serving 14 clients facility 1 serving 29 clients facility 2 serving 31 clients facility 3 serving 0 clients facility 4 serving 26 clients Get the total and average weighted travel cost. >>> round(pmedian_from_cost_matrix.problem.objective.value(), 3) 1870.747 >>> round(pmedian_from_cost_matrix.mean_dist, 3) 3.027 """ n_cli = cost_matrix.shape[0] r_cli = range(n_cli) r_fac = range(cost_matrix.shape[1]) model = pulp.LpProblem(name, pulp.LpMinimize) weights_sum = weights.sum() weights = np.reshape(weights, (n_cli, 1)) aij = weights * cost_matrix p_median = PMedian(name, model, aij, weights_sum) FacilityModelBuilder.add_facility_integer_variable(p_median, r_fac, "y[{i}]") FacilityModelBuilder.add_client_assign_variable( p_median, r_cli, r_fac, "z[{i}_{j}]" ) if predefined_facilities_arr is not None: if fulfill_predefined_fac and facility_capacities is not None: sum_predefined_fac_cap = np.sum( facility_capacities[predefined_facilities_arr] ) if sum_predefined_fac_cap <= weights.sum(): FacilityModelBuilder.add_predefined_facility_constraint( p_median, predefined_facilities_arr, weights, facility_capacities, ) else: raise SpecificationError( "Problem is infeasible. The predefined facilities can't be " "fulfilled, because their capacity is larger than the total " f"demand {weights.sum()}." ) elif fulfill_predefined_fac and facility_capacities is None: raise SpecificationError( "Data on the capacity of the facility is missing, " "so the model cannot be calculated." ) else: FacilityModelBuilder.add_predefined_facility_constraint( p_median, predefined_facilities_arr ) if facility_capacities is not None: sorted_capacities = np.sort(facility_capacities) highest_possible_capacity = sorted_capacities[-p_facilities:].sum() if highest_possible_capacity < weights.sum(): raise SpecificationError( "Problem is infeasible. The highest possible capacity " f"{highest_possible_capacity}, coming from the {p_facilities} " "sites with the highest capacity, is smaller than " f"the total demand {weights.sum()}." ) FacilityModelBuilder.add_facility_capacity_constraint( p_median, weights, facility_capacities, range(len(weights)), range(len(facility_capacities)), ) p_median.__add_obj(r_cli, r_fac) FacilityModelBuilder.add_facility_constraint(p_median, p_facilities) FacilityModelBuilder.add_assignment_constraint(p_median, r_fac, r_cli) FacilityModelBuilder.add_opening_constraint(p_median, r_fac, r_cli) return p_median
[docs] @classmethod def from_geodataframe( cls, gdf_demand: GeoDataFrame, gdf_fac: GeoDataFrame, demand_col: str, facility_col: str, weights_cols: str, p_facilities: int, facility_capacity_col: str = None, predefined_facility_col: str = None, fulfill_predefined_fac: bool = False, distance_metric: str = "euclidean", name: str = "p-median", ): """ Create an ``PMedian`` object from ``geopandas.GeoDataFrame`` objects. Calculate the cost matrix between demand and facility locations before building the problem within the ``from_cost_matrix()`` method. Parameters ---------- gdf_demand : geopandas.GeoDataFrame Demand locations. gdf_fac : geopandas.GeoDataFrame Facility locations. demand_col : str Demand sites geometry column name. facility_col : str Facility candidate sites geometry column name. weights_cols : str The weight column name representing service load or demand. p_facilities: int The number of facilities to be located. predefined_facility_col : str (default None) Column name representing facilities are already defined. This a binary assignment per facility. For example, consider 3 facilites ``['A', 'B', 'C']``. If facility ``'B'`` must be in the model solution, then the column should be ``[0, 1, 0]``. facility_capacities_col: str (default None) Column name representing the capacities of each facility. fulfill_predefined_fac : bool (default False) If the predefined facilities need to be fulfilled. distance_metric : str (default 'euclidean') A metric used for the distance calculations supported by `scipy.spatial.distance.cdist <https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.cdist.html>`_. name : str (default 'p-median') The name of the problem. Returns ------- spopt.locate.p_median.PMedian Examples -------- >>> from spopt.locate import PMedian >>> from spopt.locate.util import simulated_geo_points >>> import geopandas >>> import numpy >>> import pulp >>> import spaghetti Create a regular lattice. >>> lattice = spaghetti.regular_lattice((0, 0, 10, 10), 9, exterior=True) >>> ntw = spaghetti.Network(in_data=lattice) >>> streets = spaghetti.element_as_gdf(ntw, arcs=True) >>> streets_buffered = geopandas.GeoDataFrame( ... geopandas.GeoSeries(streets["geometry"].buffer(0.2).unary_union), ... crs=streets.crs, ... columns=["geometry"] ... ) Simulate points about the lattice. >>> demand_points = simulated_geo_points(streets_buffered, needed=100, seed=5) >>> facility_points = simulated_geo_points(streets_buffered, needed=5, seed=6) Snap the points to the network of lattice edges and extract as ``GeoDataFrame`` objects. >>> ntw.snapobservations(demand_points, "clients", attribute=True) >>> clients_snapped = spaghetti.element_as_gdf( ... ntw, pp_name="clients", snapped=True ... ) >>> ntw.snapobservations(facility_points, "facilities", attribute=True) >>> facilities_snapped = spaghetti.element_as_gdf( ... ntw, pp_name="facilities", snapped=True ... ) Simulate demand weights from ``1`` to ``12``. >>> ai = numpy.random.randint(1, 12, 100) >>> clients_snapped['weights'] = ai Create and solve a ``PMedian`` instance from the ``GeoDataFrame`` object. >>> pmedian_from_geodataframe = PMedian.from_geodataframe( ... clients_snapped, ... facilities_snapped, ... "geometry", ... "geometry", ... "weights", ... p_facilities=4, ... distance_metric="euclidean" ... ) >>> pmedian_from_geodataframe = pmedian_from_geodataframe.solve( ... pulp.PULP_CBC_CMD(msg=False) ... ) Get the facility-client associations. >>> for fac, cli in enumerate(pmedian_from_geodataframe.fac2cli): ... print(f"facility {fac} serving {len(cli)} clients") facility 0 serving 13 clients facility 1 serving 29 clients facility 2 serving 31 clients facility 3 serving 0 clients facility 4 serving 27 clients """ # noqa: E501 predefined_facilities_arr = None if predefined_facility_col is not None: predefined_facilities_arr = gdf_fac[predefined_facility_col].to_numpy() facility_capacities = None if facility_capacity_col is not None: facility_capacities = gdf_fac[facility_capacity_col].to_numpy() service_load = gdf_demand[weights_cols].to_numpy() dem = gdf_demand[demand_col] fac = gdf_fac[facility_col] dem_type_geom = dem.geom_type.unique() fac_type_geom = fac.geom_type.unique() _msg = ( " geodataframe contains mixed type geometries or is not a point. Be " "sure deriving centroid from geometries doesn't affect the results." ) if len(dem_type_geom) > 1 or "Point" not in dem_type_geom: warnings.warn(f"Demand{_msg}", UserWarning, stacklevel=2) dem = dem.centroid if len(fac_type_geom) > 1 or "Point" not in fac_type_geom: warnings.warn(f"Facility{_msg}", UserWarning, stacklevel=2) fac = fac.centroid dem_data = np.array([dem.x.to_numpy(), dem.y.to_numpy()]).T fac_data = np.array([fac.x.to_numpy(), fac.y.to_numpy()]).T if gdf_demand.crs != gdf_fac.crs: raise ValueError( "Geodataframes crs are different: " f"gdf_demand-{gdf_demand.crs}, gdf_fac-{gdf_fac.crs}" ) distances = cdist(dem_data, fac_data, distance_metric) return cls.from_cost_matrix( cost_matrix=distances, weights=service_load, p_facilities=p_facilities, predefined_facilities_arr=predefined_facilities_arr, facility_capacities=facility_capacities, fulfill_predefined_fac=fulfill_predefined_fac, name=("capacitated-" + name if facility_capacities is not None else name), )
[docs] def facility_client_array(self) -> None: """ Create a 2D array storing **facility to client relationships** where each row represents a facility and contains an array of client indices with which it is associated. An empty client array indicates the facility is associated with no clients. Returns ------- None """ fac_vars = getattr(self, "fac_vars") cli_vars = getattr(self, "cli_assgn_vars") len_fac_vars = len(fac_vars) self.fac2cli = [] for j in range(len_fac_vars): array_cli = [] if fac_vars[j].value() > 0: for i in range(len(cli_vars)): if cli_vars[i, j].value() > 0: array_cli.append(i) self.fac2cli.append(array_cli)
[docs] def solve(self, solver: pulp.LpSolver, results: bool = True): """ Solve the ``PMedian`` model. Parameters ---------- solver : pulp.LpSolver A solver supported by ``pulp``. results : bool (default True) If ``True`` it will create metainfo (which facilities cover which demand) and vice-versa, and the uncovered demand. Returns ------- spopt.locate.p_median.PMedian """ self.problem.solve(solver) self.check_status() if results: self.facility_client_array() self.client_facility_array() self.get_mean_distance() return self
[docs] class KNearestPMedian(PMedian): r""" Implement the P-Median Model with Near-Far Cost Allocation and solve it. The model is adapted from :cite:`church_2018`, and can be formulated as: .. math:: \begin{array}{lllll} \displaystyle \textbf{Minimize} & \displaystyle \sum_{i \in I}\sum_{k \in k_{i}}{a_i d_{ik} X_{ik}} + \sum_{i \in I}{g_i (d_{i{k_i}} + 1)} && & (1) \\ \displaystyle \textbf{Subject To} & \sum_{k \in k_{i}}{X_{ik} + g_i = 1} && \forall i \in I & (2) \\ & \sum_{j \in J}{Y_j} = p && & (3) \\ & \sum_{i \in I}{a_i X_{ik}} \leq {Y_{k} c_{k}} && \forall k \in k_{i} & (4) \\ & X_{ij} \leq Y_{j} && \forall i \in I \quad \forall j \in J & (5) \\ & X_{ij} \in \{0, 1\} && \forall i \in I \quad \forall j \in J & (6) \\ & Y_j \in \{0, 1\} && \forall j \in J & (7) \\ & && & \\ \displaystyle \textbf{Where} && i & = & \textrm{index of demand points/areas/objects in set } I \\ && j & = & \textrm{index of potential facility sites in set } J \\ && p & = & \textrm{the number of facilities to be sited} \\ && a_i & = & \textrm{service load or population demand at client location } i \\ && k_{i} & = & \textrm{the } k \textrm{nearest facilities of client location } i \\ && c_{j} & = & \textrm{the capacity of facility} j \\ && d_{ij} & = & \textrm{shortest distance or travel time between locations } i \textrm{ and } j \\ && X_{ij} & = & \begin{cases} 1, \textrm{if client location } i \textrm{ is served by facility } j \\ 0, \textrm{otherwise} \\ \end{cases} \\ && Y_j & = & \begin{cases} 1, \textrm{if a facility is sited at location } j \\ 0, \textrm{otherwise} \\ \end{cases} \\ && g_i & = & \begin{cases} 1, \textrm{if the client } i \textrm{ needs to be served by non-k-nearest facilities} \\ 0, \textrm{otherwise} \\ \end{cases} \\ \end{array} Parameters ---------- name : str The problem name. ai_sum : Union[int, float] The sum of weights representing the service loads of the clients. clients : np.array An array of coordinates of clients. facilities : np.array An array of coordinates of facilities. weights : np.array An array of weights representing the service loads of the clients. k_array : np.array An array of k values representing the number of nearest facilities for each client. p_facilities: int The number of facilities to be located. capacities : np.array or None An array of facility capacities. None if capacity constraints are not considered. distance_metric : str The distance metric used for computing distances between clients and facilities. Attributes ---------- sparse_matrix : Compressed Sparse Row matrix A cost matrix in the form of a compressed sparse row matrix between origins and destinations. aij : Compressed Sparse Row matrix A weighted cost matrix in the form of a compressed sparse row matrix between origins and destinations. problem : pulp.LpProblem A ``pulp`` instance of an optimization model that contains constraints, variables, and an objective function. fac2cli : numpy.array A 2D array storing facility to client relationships where each row represents a facility and contains an array of client indices with which it is associated. An empty client array indicates the facility is associated with no clients. cli2fac : numpy.array The inverse of ``fac2cli`` where client to facility relationships are shown. """ # noqa: E501
[docs] def __init__( self, name: str, ai_sum: int | float, clients: np.array, facilities: np.array, weights: np.array, k_array: np.array, p_facilities: int, capacities: np.array = None, distance_metric: str = "euclidean", ): self.ai_sum = ai_sum self.clients = clients self.facilities = facilities self.weights = weights self.k_array = k_array self.p_facilities = p_facilities self.capacities = capacities self.distance_metric = distance_metric self.name = name
def __add_obj( self, max_distance: np.array, range_clients: range, range_facility: range ) -> None: """ Add the objective function to the model. Parameters ---------- max_distance : np.array An array of distances between each client and their k nearest facility. For example, when k = 2, this array will only store the distances between each client and their nearest and second nearest facility. range_clients : range The range of demand points. range_facility : range The range of facility point. Returns ------- None """ cli_assgn_vars = getattr(self, "cli_assgn_vars") placeholder_vars = getattr(self, "placeholder_vars") self.problem += ( pulp.lpSum( pulp.lpSum( self.aij[i, j] * cli_assgn_vars.get((i, j), 0) for j in range_facility ) + (placeholder_vars[i] * (max_distance[i] + 1)) for i in range_clients ), "objective function", )
[docs] @classmethod def from_cost_matrix(cls, *args, **kwargs): """ Warning: This method is not supported in the KNearestPMedian subclass. """ raise NotImplementedError( "The `from_cost_matrix()` method is not " "supported in `KNearestPMedian` class." )
def _create_sparse_matrix(self) -> None: """ Create a sparse matrix representing the distance between clients and their k nearest facilities. This method uses a suitable tree data structure (built with the ``pointpats.geometry.build_best_tree()`` function) to efficiently find the ``k`` nearest facilities for each client based on the specified distance metric. The resulting distances are stored in a sparse matrix format to conserve memory for large datasets. Returns ------- None """ row_shape = len(self.clients) column_shape = len(self.facilities) # check the k value with the total number of facilities if not (self.k_array <= column_shape).all(): raise ValueError( "The value of `k` should be no more than the number " f"of total facilities: ({column_shape})." ) # Initialize empty lists to store the data for the sparse matrix data = [] row_index = [] col_index = [] # create the suitable Tree tree = build_best_tree(self.facilities, self.distance_metric) for i, k in enumerate(self.k_array): # Query the Tree to find the k nearest facilities for each client distances, k_nearest_facilities_indices = tree.query([self.clients[i]], k=k) # extract the contents of the inner array distances = distances[0].tolist() k_nearest_facilities_indices = k_nearest_facilities_indices[0].tolist() # Append the data for the sparse matrix data.extend(distances) row_index.extend([i] * k) col_index.extend(k_nearest_facilities_indices) # Create the sparse matrix using csr_matrix self.sparse_matrix = csr_matrix( (data, (row_index, col_index)), shape=(row_shape, column_shape) ) def _update_k_array(self) -> None: """ Increase the ``k`` value for clients with any :math:`g_i > 0` and update the ``k`` array. This method is used to adjust the ``k`` values for clients based on their placeholder variable :math:`g_i`. For clients with :math:`g_i` greater than ``0``, the corresponding ``k`` value is increased by ``1`` in the new ``k`` array. Returns ------- None """ new_k_array = self.k_array.copy() placeholder_vars = getattr(self, "placeholder_vars") for i in range(len(placeholder_vars)): if placeholder_vars[i].value() > 0: new_k_array[i] = new_k_array[i] + 1 self.k_array = new_k_array def _from_sparse_matrix(self) -> None: """ Create the k nearest p-median problem from the sparse distance matrix. Returns ------- None """ n_cli = self.sparse_matrix.shape[0] r_cli = range(n_cli) r_fac = range(self.sparse_matrix.shape[1]) self.weights = np.reshape(self.weights, (n_cli, 1)) # create and store the weighted sparse matrix self.aij = self.sparse_matrix.multiply(self.weights).tocsr() # create the model/problem self.problem = pulp.LpProblem(self.name, pulp.LpMinimize) # add all the 1)decision variable, 2)objective function, and 3)constraints # Facility integer decision variable FacilityModelBuilder.add_facility_integer_variable(self, r_fac, "y[{i}]") fac_vars = getattr(self, "fac_vars") # Placeholder facility decision variable placeholder_vars = pulp.LpVariable.dicts( "g", (i for i in r_cli), 0, 1, pulp.LpBinary ) setattr(self, "placeholder_vars", placeholder_vars) # Client assignment integer decision variables row_indices, col_indices, values = find(self.aij) cli_assgn_vars = pulp.LpVariable.dicts( "z", list(zip(row_indices, col_indices, strict=True)), 0, 1, pulp.LpBinary ) setattr(self, "cli_assgn_vars", cli_assgn_vars) # Add the objective function max_distance = self.aij.max(axis=1).toarray().flatten() self.__add_obj(max_distance, r_cli, r_fac) # Create the capacity constraints if self.capacities is not None: sorted_capacities = np.sort(self.capacities) highest_possible_capacity = sorted_capacities[-self.p_facilities :].sum() if highest_possible_capacity < self.ai_sum: raise SpecificationError( "Problem is infeasible. The highest possible capacity " f"({highest_possible_capacity}), coming from the " f"{self.p_facilities} sites with the highest capacity, " f"is smaller than the total demand ({self.ai_sum})." ) for j in col_indices: self.problem += ( pulp.lpSum( self.weights[i] * cli_assgn_vars.get((i, j), 0) for i in r_cli ) <= fac_vars[j] * self.capacities[j] ) # Create assignment constraints. for i in r_cli: self.problem += ( pulp.lpSum(cli_assgn_vars.get((i, j), 0) for j in set(col_indices)) + placeholder_vars[i] == 1 ) # Create the facility constraint FacilityModelBuilder.add_facility_constraint(self, self.p_facilities) # Create opening constraints FacilityModelBuilder.add_opening_constraint(self, r_fac, r_cli)
[docs] @classmethod def from_geodataframe( cls, gdf_demand: GeoDataFrame, gdf_fac: GeoDataFrame, demand_col: str, facility_col: str, weights_cols: str, p_facilities: int, facility_capacity_col: str = None, k_array: np.array = None, distance_metric: str = "euclidean", name: str = "k-nearest-p-median", ): """ Create the object of KNearestPMedian class using input data. Parameters ---------- gdf_demand : GeoDataFrame A GeoDataFrame containing demand points with their associated attributes. gdf_fac : GeoDataFrame A GeoDataFrame containing facility points with their associated attributes. demand_col : str The column name in gdf_demand representing the coordinate of each client. facility_col : str The column name in gdf_fac representing the coordinate of each facility. weights_cols : str The column name in gdf_demand representing the weights for each client. p_facilities: int The number of facilities to be located. facility_capacity_col : str, optional The column name in gdf_fac representing the capacity of each facility, by default None. k_array : np.array, optional An array of integers representing the k values for each client. If not provided, a default value of 5 or the number of facilities, whichever is smaller, will be used. distance_metric : str, optional The distance metric to be used in calculating distances between clients and facilities, by default "euclidean". name : str, optional The name of the problem, by default "k-nearest-p-median". Returns ------- spopt.locate.p_median.KNearestPMedian Examples -------- >>> from spopt.locate import KNearestPMedian >>> import geopandas Create the input data and attributes. >>> k = np.array([1, 1]) >>> demand_data = { ... 'ID': [1, 2], ... 'geometry': [Point(0.5, 1), Point(1.5, 1)], ... 'demand': [1, 1]} >>> facility_data = { ... 'ID': [101, 102], ... 'geometry': [Point(1,1), Point(0, 2), Point(2, 0)], ... 'capacity': [1, 1, 1]} >>> gdf_demand = geopandas.GeoDataFrame(demand_data, crs='EPSG:4326') >>> gdf_fac = geopandas.GeoDataFrame(facility_data, crs='EPSG:4326') Create and solve a ``KNearestPMedian`` instance from the geodataframe. >>> k_nearest_pmedian = KNearestPMedian.from_geodataframe( ... gdf_demand, gdf_fac,'geometry','geometry', weights_cols='demand', ... 2, facility_capacity_col='capacity', k_array = k) >>> k_nearest_pmedian = k_nearest_pmedian.solve(pulp.PULP_CBC_CMD(msg=False)) Get the facility-client associations. >>> for fac, cli in enumerate(k_nearest_pmedian.fac2cli): ... print(f"facility {fac} serving {len(cli)} clients") facility 0 serving 1 clients facility 1 serving 1 clients facility 2 serving 0 clients Get the total and average weighted travel cost. >>> round(k_nearest_pmedian.problem.objective.value(), 3) 1.618 >>> round(k_nearest_pmedian.mean_dist, 3) 0.809 Get the k values for the last iteration. >>> print(k_nearest_pmedian.k_array) [2, 1] """ # check the crs of two geodataframes if gdf_demand.crs is None: raise ValueError("GeoDataFrame ``gdf_demand`` does not have a valid CRS.") if gdf_fac.crs is None: raise ValueError("GeoDataFrame ``gdf_fac`` does not have a valid CRS.") if gdf_demand.crs != gdf_fac.crs: raise ValueError( "Geodataframes crs are different: " f"gdf_demand-{gdf_demand.crs}, gdf_fac-{gdf_fac.crs}" ) # create the array of coordinate of clients and facilities dem = gdf_demand[demand_col] fac = gdf_fac[facility_col] dem_data = np.array([dem.x.to_numpy(), dem.y.to_numpy()]).T fac_data = np.array([fac.x.to_numpy(), fac.y.to_numpy()]).T # check the values of k_array if k_array is None: k_array = np.full(len(dem_data), np.minimum(len(fac_data), 5)) elif not isinstance(k_array, np.ndarray): raise TypeError("`k_array` should be a numpy array.") elif not (k_array <= len(fac_data)).all(): raise ValueError( f"The value of `k` should be no more than the number " f"of total facilities, which is {len(fac_data)}." ) # demand and capacity service_load = gdf_demand[weights_cols].to_numpy() weights_sum = service_load.sum() facility_capacities = None if facility_capacity_col is not None: facility_capacities = gdf_fac[facility_capacity_col].to_numpy() return KNearestPMedian( ("capacitated-" + name if facility_capacities is not None else name), weights_sum, dem_data, fac_data, service_load, k_array, p_facilities, facility_capacities, distance_metric, )
[docs] def facility_client_array(self) -> None: """ Create a 2D array storing **facility to client relationships** where each row represents a facility and contains an array of client indices with which it is associated. An empty client array indicates the facility is associated with no clients. Returns ------- None """ fac_vars = getattr(self, "fac_vars") cli_vars = getattr(self, "cli_assgn_vars") len_fac_vars = len(fac_vars) self.fac2cli = [] for j in range(len_fac_vars): array_cli = [] if fac_vars[j].value() > 0: for i in range(len(cli_vars)): if (i, j) in cli_vars and cli_vars[i, j].value() > 0: array_cli.append(i) self.fac2cli.append(array_cli)
[docs] def solve(self, solver: pulp.LpSolver, results: bool = True): """ Solve the k-nearest p-median model. This method iteratively solves the ``KNearestPMedian`` model using a specified solver until no more clients need to be assigned to placeholder facilities. The ``k`` values for clients are increased dynamically based on the presence of clients not assigned to ``k`` nearest facilities. Parameters ---------- solver : pulp.LpSolver The solver to be used for solving the optimization model. results : bool (default True) If ``True`` it will create metainfo (which facilities cover which demand) and vice versa, and the uncovered demand. Returns ------- spopt.locate.p_median.KNearestPMedian """ # initialize the sum of the values of g_i sum_gi = 1 # start the loop while sum_gi > 0: self._create_sparse_matrix() self._from_sparse_matrix() self.problem.solve(solver) self.check_status() # check the result placeholder_vars = getattr(self, "placeholder_vars") sum_gi = sum( placeholder_vars[i].value() for i in range(len(placeholder_vars)) if placeholder_vars[i].value() > 0 ) if sum_gi > 0: self._update_k_array() if results: self.facility_client_array() self.client_facility_array() self.get_mean_distance() return self