Source code for tobler.area_weighted.area_interpolate

"""
Area Weighted Interpolation

"""

import os

import geopandas as gpd
import numpy as np
import pandas as pd
from scipy.sparse import coo_matrix, diags

from tobler.util.util import _check_crs, _inf_check, _nan_check

__all__ = ["area_interpolate"]


def _chunk_dfs(geoms_to_chunk, geoms_full, n_jobs):
    chunk_size = geoms_to_chunk.shape[0] // n_jobs + 1
    for i in range(n_jobs):
        start = i * chunk_size
        yield geoms_to_chunk.iloc[start : start + chunk_size], geoms_full


def _index_n_query(geoms1, geoms2):
    # Pick largest for STRTree, query the smallest
    if geoms1.shape[0] > geoms2.shape[0]:
        large = geoms1
        small = geoms2
    else:
        large = geoms2
        small = geoms1
    # Build tree + query
    qry_polyIDs, tree_polyIDs = large.sindex.query(small, predicate="intersects")
    # Remap IDs to global
    large_global_ids = large.iloc[tree_polyIDs].index.values
    small_global_ids = small.iloc[qry_polyIDs].index.values
    # Return always global IDs for geoms1, geoms2
    if geoms1.shape[0] > geoms2.shape[0]:
        return np.array([large_global_ids, small_global_ids]).T
    else:
        return np.array([small_global_ids, large_global_ids]).T


def _chunk_polys(id_pairs, geoms_left, geoms_right, n_jobs):
    chunk_size = id_pairs.shape[0] // n_jobs + 1
    for i in range(n_jobs):
        start = i * chunk_size
        chunk1 = geoms_left.array[id_pairs[start : start + chunk_size, 0]]
        chunk2 = geoms_right.array[id_pairs[start : start + chunk_size, 1]]
        yield chunk1, chunk2


def _intersect_area_on_chunk(geoms1, geoms2):
    areas = geoms1.intersection(geoms2).area
    return areas
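

# A minimal sketch of how the helpers above compose, run serially here for
# clarity; `_area_tables_binning_parallel` below dispatches the same steps
# through joblib. The toy geometries and the `_demo_helper_pipeline` name are
# illustrative only, not part of tobler.
def _demo_helper_pipeline(n_jobs=2):
    from shapely.geometry import box

    # Two source squares side by side and one target square straddling both
    src = gpd.GeoSeries([box(0, 0, 1, 1), box(1, 0, 2, 1)])
    tgt = gpd.GeoSeries([box(0.5, 0, 1.5, 1)])

    # 1. Split the larger layer into chunks, each paired with the full
    #    smaller layer
    pairs = _chunk_dfs(src, tgt, n_jobs)
    # 2. Per chunk, find intersecting pairs of positional IDs; columns come
    #    back in (src, tgt) order because src was passed as geoms1
    id_pairs = np.concatenate([_index_n_query(g1, g2) for g1, g2 in pairs])
    # 3. Pull the paired geometries chunk by chunk...
    polys = _chunk_polys(id_pairs, src, tgt, n_jobs)
    # 4. ...and compute intersection areas (0.5 and 0.5 for these squares)
    areas = np.concatenate([_intersect_area_on_chunk(g1, g2) for g1, g2 in polys])
    return id_pairs, areas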


def _area_tables_binning_parallel(source_df, target_df, n_jobs=-1):
    """Construct area allocation and source-target correspondence tables using
    a parallel spatial indexing approach
    ...

    NOTE: currently, the largest df is chunked and the other one is shipped in
    full to each core; within each process, the spatial index is built on the
    larger set of geometries, and the smaller one is used for `query`

    Parameters
    ----------
    source_df : geopandas.GeoDataFrame
        GeoDataFrame containing input data and polygons
    target_df : geopandas.GeoDataFrame
        GeoDataFrame defining the output geometries
    n_jobs : int
        [Optional. Default=-1] Number of processes to run in parallel. If -1,
        this is set to the number of CPUs available

    Returns
    -------
    tables : scipy.sparse.csr_matrix
        Sparse matrix of shape (n_source, n_target) holding the area of each
        source-target intersection

    """
    from joblib import Parallel, delayed, parallel_backend

    if _check_crs(source_df, target_df):
        pass
    else:
        return None
    if n_jobs == -1:
        n_jobs = os.cpu_count()

    df1 = source_df.copy()
    df2 = target_df.copy()

    # Chunk the largest, ship the smallest in full
    if df1.shape[0] > df2.shape[0]:
        to_chunk = df1
        df_full = df2
    else:
        to_chunk = df2
        df_full = df1

    # Spatial index query
    ## Reindex on positional IDs
    to_workers = _chunk_dfs(
        gpd.GeoSeries(to_chunk.geometry.values, crs=to_chunk.crs),
        gpd.GeoSeries(df_full.geometry.values, crs=df_full.crs),
        n_jobs,
    )

    with parallel_backend("loky", inner_max_num_threads=1):
        worker_out = Parallel(n_jobs=n_jobs)(
            delayed(_index_n_query)(*chunk_pair) for chunk_pair in to_workers
        )

    ids_chunked, ids_full = np.concatenate(worker_out).T
    # The first column of each worker result refers to the chunked frame,
    # which may be either the source or the target, so map back explicitly
    if to_chunk is df1:
        ids_src, ids_tgt = ids_chunked, ids_full
    else:
        ids_src, ids_tgt = ids_full, ids_chunked

    # Intersection + area calculation
    chunks_to_intersection = _chunk_polys(
        np.vstack([ids_src, ids_tgt]).T, df1.geometry, df2.geometry, n_jobs
    )
    with parallel_backend("loky", inner_max_num_threads=1):
        worker_out = Parallel(n_jobs=n_jobs)(
            delayed(_intersect_area_on_chunk)(*chunk_pair)
            for chunk_pair in chunks_to_intersection
        )
    areas = np.concatenate(worker_out)

    # Build CSR table
    table = coo_matrix(
        (
            areas,
            (ids_src, ids_tgt),
        ),
        shape=(df1.shape[0], df2.shape[0]),
        dtype=np.float32,
    )
    table = table.tocsr()
    return table
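

# Illustrative usage sketch of the parallel builder; the layer contents and
# CRS below are made up for the example. The result matches the serial
# `_area_tables_binning` defined next.
def _demo_parallel_table():
    from shapely.geometry import box

    crs = "EPSG:3857"  # any shared, projected CRS
    source = gpd.GeoDataFrame(geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)], crs=crs)
    target = gpd.GeoDataFrame(geometry=[box(0.5, 0, 1.5, 1)], crs=crs)

    parallel = _area_tables_binning_parallel(source, target, n_jobs=2)
    serial = _area_tables_binning(source, target, spatial_index="auto")
    # Both are (n_source, n_target) CSR matrices of intersection areas
    assert np.allclose(parallel.toarray(), serial.toarray())
    return parallel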


def _area_tables_binning(source_df, target_df, spatial_index):
    """Construct area allocation and source-target correspondence tables using a spatial indexing approach
    ...

    NOTE: this currently relies on Geopandas' spatial index machinery

    Parameters
    ----------
    source_df : geopandas.GeoDataFrame
        GeoDataFrame containing input data and polygons
    target_df : geopandas.GeoDataFrame
        GeoDataFrame defining the output geometries
    spatial_index : str
        Spatial index to use to build the allocation of area from source to
        target tables. It currently supports the following values:
            - "source": build the spatial index on `source_df`
            - "target": build the spatial index on `target_df`
            - "auto": attempts to guess the most efficient alternative.
              Currently, this option uses the largest table to build the
              index, and performs a bulk `query` with the shorter table.

    Returns
    -------
    tables : scipy.sparse.csr_matrix
        Sparse matrix of shape (n_source, n_target) holding the area of each
        source-target intersection

    """
    if _check_crs(source_df, target_df):
        pass
    else:
        return None

    df1 = source_df.copy()
    df2 = target_df.copy()

    # it is generally more performant to use the longer df as spatial index
    if spatial_index == "auto":
        if df1.shape[0] > df2.shape[0]:
            spatial_index = "source"
        else:
            spatial_index = "target"

    if spatial_index == "source":
        ids_tgt, ids_src = df1.sindex.query(df2.geometry, predicate="intersects")
    elif spatial_index == "target":
        ids_src, ids_tgt = df2.sindex.query(df1.geometry, predicate="intersects")
    else:
        raise ValueError(
            f"'{spatial_index}' is not a valid option. Use 'auto', 'source' or 'target'."
        )

    areas = df1.geometry.values[ids_src].intersection(df2.geometry.values[ids_tgt]).area

    table = coo_matrix(
        (
            areas,
            (ids_src, ids_tgt),
        ),
        shape=(df1.shape[0], df2.shape[0]),
        dtype=np.float32,
    )

    table = table.tocsr()

    return table
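

# Sketch of the table's semantics on a toy pair of layers (geometries and
# CRS invented for the example): entry (i, j) stores the area shared by
# source polygon i and target polygon j, which `area_interpolate` below
# row- or column-standardizes into interpolation weights.
def _demo_binning_table():
    from shapely.geometry import box

    crs = "EPSG:3857"
    source = gpd.GeoDataFrame(geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)], crs=crs)
    target = gpd.GeoDataFrame(geometry=[box(0.5, 0, 1.5, 1)], crs=crs)

    table = _area_tables_binning(source, target, spatial_index="auto")
    # Each source square cedes half a unit of area to the target:
    # [[0.5], [0.5]]
    return table.toarray()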


def area_interpolate(
    source_df,
    target_df,
    extensive_variables=None,
    intensive_variables=None,
    table=None,
    allocate_total=True,
    spatial_index="auto",
    n_jobs=1,
    categorical_variables=None,
    categorical_frequency=True,
):
    """
    Area interpolation for extensive, intensive and categorical variables.

    Parameters
    ----------
    source_df : geopandas.GeoDataFrame
    target_df : geopandas.GeoDataFrame
    extensive_variables : list
        [Optional. Default=None] Columns in dataframes for extensive variables
    intensive_variables : list
        [Optional. Default=None] Columns in dataframes for intensive variables
    table : scipy.sparse.csr_matrix
        [Optional. Default=None] Area allocation source-target correspondence
        table. If not provided, it will be built from `source_df` and
        `target_df` using `tobler.area_weighted.area_interpolate._area_tables_binning`
    allocate_total : bool
        [Optional. Default=True] True if total value of source area should be
        allocated. False if denominator is area of i. Note that the two cases
        would be identical when the area of the source polygon is exhausted by
        intersections. See Notes for more details.
    spatial_index : str
        [Optional. Default="auto"] Spatial index to use to build the
        allocation of area from source to target tables. It currently supports
        the following values:
            - "source": build the spatial index on `source_df`
            - "target": build the spatial index on `target_df`
            - "auto": attempts to guess the most efficient alternative.
              Currently, this option uses the largest table to build the
              index, and performs a bulk `query` with the shorter table.
        This argument is ignored if n_jobs>1 (or n_jobs=-1).
    n_jobs : int
        [Optional. Default=1] Number of processes to run in parallel to
        generate the area allocation. If -1, this is set to the number of
        CPUs available. If `table` is passed, this is ignored.
    categorical_variables : list
        [Optional. Default=None] Columns in dataframes for categorical variables
    categorical_frequency : bool
        [Optional. Default=True] If True, `estimates` returns the frequency of
        each value in a categorical variable in every polygon of `target_df`
        (proportion of area). If False, `estimates` contains the area in every
        polygon of `target_df` that is occupied by each value of the
        categorical variable.

    Returns
    -------
    estimates : geopandas.GeoDataFrame
        new geodataframe with interpolated variables as columns and target_df
        geometry as output geometry

    Notes
    -----
    The assumption is both dataframes have the same coordinate reference
    system.

    For an extensive variable, the estimate at target polygon j (default
    case) is:

    .. math::
        v_j = \\sum_i v_i w_{i,j}

        w_{i,j} = a_{i,j} / \\sum_k a_{i,k}

    If the area of the source polygon is not exhausted by intersections with
    target polygons and there is reason to not allocate the complete value of
    an extensive attribute, then setting allocate_total=False will use the
    following weights:

    .. math::
        v_j = \\sum_i v_i w_{i,j}

        w_{i,j} = a_{i,j} / a_i

    where a_i is the total area of source polygon i.

    For an intensive variable, the estimate at target polygon j is:

    .. math::
        v_j = \\sum_i v_i w_{i,j}

        w_{i,j} = a_{i,j} / \\sum_k a_{k,j}

    For categorical variables, the estimate returns the ratio of presence of
    each unique category.
    """
    source_df = source_df.copy()
    target_df = target_df.copy()

    if _check_crs(source_df, target_df):
        pass
    else:
        return None

    if table is None:
        if n_jobs == 1:
            table = _area_tables_binning(source_df, target_df, spatial_index)
        else:
            table = _area_tables_binning_parallel(
                source_df, target_df, n_jobs=n_jobs
            )

    dfs = []
    extensive = []
    if extensive_variables:
        den = source_df.area.values
        if allocate_total:
            den = np.asarray(table.sum(axis=1))
        den = den + (den == 0)
        den = 1.0 / den
        n = den.shape[0]
        den = den.reshape((n,))
        den = diags([den], [0])
        weights = den.dot(table)  # row standardize table

        for variable in extensive_variables:
            vals = _nan_check(source_df, variable)
            vals = _inf_check(source_df, variable)
            estimates = diags([vals], [0]).dot(weights)
            estimates = estimates.sum(axis=0)
            extensive.append(estimates.tolist()[0])

        extensive = np.asarray(extensive)
        extensive = pd.DataFrame(extensive.T, columns=extensive_variables)

    intensive = []
    if intensive_variables:
        area = np.asarray(table.sum(axis=0))
        den = 1.0 / (area + (area == 0))
        n, k = den.shape
        den = den.reshape((k,))
        den = diags([den], [0])
        weights = table.dot(den)

        for variable in intensive_variables:
            vals = _nan_check(source_df, variable)
            vals = _inf_check(source_df, variable)
            n = vals.shape[0]
            vals = vals.reshape((n,))
            estimates = diags([vals], [0])
            estimates = estimates.dot(weights).sum(axis=0)
            intensive.append(estimates.tolist()[0])

        intensive = np.asarray(intensive)
        intensive = pd.DataFrame(intensive.T, columns=intensive_variables)

    if categorical_variables:
        categorical = {}
        for variable in categorical_variables:
            unique = source_df[variable].unique()
            for value in unique:
                mask = source_df[variable] == value
                categorical[f"{variable}_{value}"] = np.asarray(
                    table[mask.to_numpy()].sum(axis=0)
                )[0]

        categorical = pd.DataFrame(categorical)
        if categorical_frequency is True:
            categorical = categorical.div(target_df.area.values, axis="rows")

    if extensive_variables:
        dfs.append(extensive)
    if intensive_variables:
        dfs.append(intensive)
    if categorical_variables:
        dfs.append(categorical)

    df = pd.concat(dfs, axis=1)
    df["geometry"] = target_df[target_df.geometry.name].reset_index(drop=True)
    df = gpd.GeoDataFrame(df.replace(np.inf, np.nan))
    return df.set_index(target_df.index)