"""Pycnophylactic Interpolation (contributed by @danlewis85)."""
# https://github.com/danlewis85/pycno/
import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio
from numpy import (
absolute,
apply_along_axis,
asarray,
convolve,
copy,
nan,
nanmax,
nanmean,
nansum,
pad,
power,
round,
unique,
)
from numpy.ma import masked_invalid, masked_where
from pandas import DataFrame
from rasterio.features import rasterize
def pycno(
gdf, value_field, cellsize, r=0.2, handle_null=True, converge=3, verbose=True
):
"""Returns a smooth pycnophylactic interpolation raster for a given geodataframe
Args:
gdf (geopandas.geodataframe.GeoDataFrame): Input GeoDataFrame.
value_field (str): Field name of values to be used to produce pycnophylactic surface
cellsize (int): Pixel size of raster in planar units (i.e. metres, feet)
r (float, optional): Relaxation parameter, default of 0.2 is generally fine.
handle_null (boolean, optional): Changes how nodata values are smoothed. Default True.
converge (int, optional): Index for stopping value, default 3 is generally fine.
verbose (boolean, optional): Print out progress at each iteration.
Returns:
Numpy Array: Smooth pycnophylactic interpolation.
Rasterio geotransform
GeoPandas crs
"""
# set nodata value
nodata = -9999
# work out raster rows and columns based on gdf extent and cellsize
xmin, ymin, xmax, ymax = gdf.total_bounds
xres = int((xmax - xmin) / cellsize)
yres = int((ymax - ymin) / cellsize)
# Work out transform so that we rasterize the area where the data are!
trans = rasterio.Affine.from_gdal(xmin, cellsize, 0, ymax, 0, -cellsize)
# First make a zone array
# NB using index values as ids can often be too large/alphanumeric. Limit is int32 polygon features.
# create a generator of geom, index pairs to use in rasterizing
shapes = ((geom, value) for geom, value in zip(gdf.geometry, gdf.index))
# burn the features into a raster array
feature_array = rasterize(
shapes=shapes, fill=nodata, out_shape=(yres, xres), transform=trans
)
# Get cell counts per index value (feature)
unique, count = np.unique(feature_array, return_counts=True)
cellcounts = asarray((unique, count)).T
# Lose the nodata counts
cellcounts = cellcounts[cellcounts[:, 0] != nodata, :]
# Adjust value totals by cells
# Make cell counts dataframe
celldf = DataFrame(cellcounts[:, 1], index=cellcounts[:, 0], columns=["cellcount"])
# Merge cell counts
gdf = gdf.merge(celldf, how="left", left_index=True, right_index=True)
# Calculate cell values
gdf["cellvalues"] = gdf[value_field] / gdf["cellcount"]
# create a generator of geom, cellvalue pairs to use in rasterizing
shapes = ((geom, value) for geom, value in zip(gdf.geometry, gdf.cellvalues))
# Now burn the initial value raster
value_array = rasterize(
shapes=shapes, fill=nodata, out_shape=(yres, xres), transform=trans
)
# Set nodata in value array to np.nan
value_array[value_array == -9999] = nan
# Set stopper value based on converge parameter
stopper = nanmax(value_array) * power(10.0, -converge)
# The basic numpy convolve function doesn't handle nulls.
def smooth2D(data):
# Create function that calls a 1 dimensionsal smoother.
s1d = lambda s: convolve(s, [0.5, 0.0, 0.5], mode="same")
# pad the data array with the mean value
padarray = pad(data, 1, "constant", constant_values=nanmean(data))
# make nodata mask
mask = masked_invalid(padarray).mask
# set nodata as zero to avoid eroding the raster
padarray[mask] = 0.0
# Apply the convolution along each axis of the data and average
padarray = (
apply_along_axis(s1d, 1, padarray) + apply_along_axis(s1d, 0, padarray)
) / 2
# Reinstate nodata
padarray[mask] = nan
return padarray[1:-1, 1:-1]
# The convolution function from astropy handles nulls.
def astroSmooth2d(data):
try:
from astropy.convolution import convolve as astro_convolve
except (ImportError, ModuleNotFoundError) as err:
raise ImportError(
"Pycnophylactic interpolation with handle_null=True "
"requires the astropy package"
) from err
s1d = lambda s: astro_convolve(s, [0.5, 0, 0.5])
# pad the data array with the mean value
padarray = pad(data, 1, "constant", constant_values=nanmean(data))
# Apply the convolution along each axis of the data and average
padarray = (
apply_along_axis(s1d, 1, padarray) + apply_along_axis(s1d, 0, padarray)
) / 2
return padarray[1:-1, 1:-1]
def correct2Da(data):
for idx, val in gdf[value_field].items():
# Create zone mask from feature_array
mask = masked_where(feature_array == idx, feature_array).mask
# Work out the correction factor
correct = (val - nansum(data[mask])) / mask.sum()
# Apply correction
data[mask] += correct
return data
def correct2Dm(data):
for idx, val in gdf[value_field].items():
# Create zone mask from feature_array
mask = masked_where(feature_array == idx, feature_array).mask
# Work out the correction factor
correct = val / nansum(data[mask])
if correct != 0.0:
# Apply correction
data[mask] *= correct
return data
while True:
# Store the current iteration
old = copy(value_array)
# Smooth the value_array
if handle_null:
sm = astroSmooth2d(value_array)
else:
sm = smooth2D(value_array)
# Relaxation to prevent overcompensation in the smoothing step
value_array = value_array * r + (1.0 - r) * sm
# Perform correction
value_array = correct2Da(value_array)
# Reset any negative values to zero.
value_array[value_array < 0] = 0.0
# Perform correction
value_array = correct2Dm(value_array)
if verbose:
print(
"Maximum Change: "
+ str(round(nanmax(absolute(old - value_array)), 4))
+ " - will stop at "
+ str(round(stopper, 4))
)
if nanmax(absolute(old - value_array)) < stopper:
break
return (value_array, trans, gdf.crs)
def save_pycno(pycno_array, transform, crs, filestring, driver="GTiff"):
"""Saves a numpy array as a raster, largely a helper function for pycno
Args:
pycno_array (numpy array): 2D numpy array of pycnophylactic surface
transform (rasterio geotransform): Relevant transform from pycno()
crs (GeoPandas crs): Coordinate reference system of GeoDataFrame used in pycno()
filestring (str): File path to save raster
driver (str, optional): Format for output raster, default: geoTiff.
Returns:
None
"""
import rasterio
# Save raster
new_dataset = rasterio.open(
filestring,
"w",
driver=driver,
height=pycno_array.shape[0],
width=pycno_array.shape[1],
count=1,
dtype="float64",
crs=crs,
transform=transform,
)
new_dataset.write(pycno_array.astype("float64"), 1)
new_dataset.close()
return None
def extract_values(pycno_array, gdf, transform, fieldname="Estimate"):
"""Extract raster value sums according to a provided polygon geodataframe
Args:
pycno_array (numpy array): 2D numpy array of pycnophylactic surface.
gdf (geopandas.geodataframe.GeoDataFrame): Target GeoDataFrame.
transform (rasterio geotransform): Relevant transform from pycno()
fieldname (str, optional): New gdf field to save estimates in. Default name: 'Estimate'.
Returns:
geopandas.geodataframe.GeoDataFrame: Target GeoDataFrame with appended estimates.
"""
from numpy import nansum
from rasterio.features import geometry_mask
estimates = []
# Iterate through geodataframe and extract values
for idx, geom in gdf["geometry"].items():
mask = geometry_mask(
[geom], pycno_array.shape, transform=transform, invert=True
)
estimates.append(nansum(pycno_array[mask]))
out = pd.Series(estimates, index=gdf.index)
return out
[docs]def pycno_interpolate(
source_df,
target_df,
variables,
cellsize,
r=0.2,
handle_null=True,
converge=3,
verbose=False,
):
"""Pycnophylactic Inerpolation.
Parameters
----------
source_df : geopandas.GeoDataFrame (required)
geodataframe with polygon geometries and data to transfer
target_df : geopandas.GeoDataFrame (required)
geodataframe with polygon geometries to receive new data
variables : list
columns on the source_df containing data to transfer
cellsize : int
Pixel size of intermediate raster in planar units (i.e. metres, feet)
r : float, optional
Relaxation parameter, default of 0.2 is generally fine
handle_null : bool, optional
Changes how nodata values are smoothed. Default True.
converge : int, optional
Index for stopping value, default 3 is generally fine.
verbose : bool, optional
Print out progress at each iteration.
Returns
-------
geopandas.GeoDataFrame
new geodataframe with interpolated variables as columns and target_df geometry
as output geometry
Notes
-----
The formula is based on Tobler, W. R. (1979). Smooth pycnophylactic interpolation for geographical regions. Journal of the American Statistical Association, 74(367), 519–529. https://doi.org/10.1080/01621459.1979.10481647
Original implementation written by @danlewis85 at <https://github.com/danlewis85/pycno/>
and based in part on the R pycno package by Chris Brusndon (<https://cran.r-project.org/web/packages/pycno/index.html>)
References: :cite:`tobler_smooth_1979`
"""
assert source_df.crs.equals(
target_df.crs
), "source_df CRS and target_df CRS are not the same. Reproject into consistent systems before proceeding"
output_vars = target_df.copy()[[target_df.geometry.name]]
for variable in variables:
pyc, trans, _ = pycno(
source_df,
variable,
cellsize=cellsize,
r=r,
handle_null=handle_null,
converge=converge,
verbose=verbose,
)
vals = extract_values(pyc, target_df, transform=trans)
output_vars[variable] = vals
return output_vars