Source code for quickpaver._transfer_matrix
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2026 Antoine COLLET
"""
Code for efficient transfer matrix construction based on grid cells intersections.
"""
import numpy as np
import shapely
from scipy.sparse import coo_array, csc_array
from shapely.strtree import STRtree
[docs]
def compute_transfer_matrix(
source_grid: shapely.MultiPolygon,
target_grid: shapely.MultiPolygon,
is_sanity_check: bool = False,
) -> csc_array:
"""
Build a conservative transfer matrix between two polygonal grids.
The resulting sparse matrix distributes values defined on the
source grid onto the target grid using surface intersection
prorata weights.
The matrix coefficients are defined as:
:contentReference[oaicite:0]{index=0}
where:
- :math:`S_i` is the i-th source polygon
- :math:`T_j` is the j-th target polygon
Therefore:
.. math::
\\sum_j W_{ij} = 1
for every source polygon fully covered by the target grid.
Parameters
----------
source_grid : shapely.MultiPolygon
Source polygon grid.
target_grid : shapely.MultiPolygon
Target polygon grid.
is_sanity_check: bool
Whether to perform a sanity check at the end of the transfer.
The default is False.
Returns
-------
scipy.sparse.csc_array
Sparse conservative transfer matrix of shape:
.. math::
(n_{source}, n_{target})
such that:
.. math::
v_{target} = W^T v_{source}
Notes
-----
The implementation uses:
- STRtree spatial indexing
- vectorized Shapely intersection operations
- sparse COO matrix assembly
"""
# -----------------------------------------------------------------
# Convert geometries to NumPy object arrays
# -----------------------------------------------------------------
source_polygons: np.ndarray = np.asarray(
source_grid.geoms,
dtype=object,
)
target_polygons: np.ndarray = np.asarray(
target_grid.geoms,
dtype=object,
)
n_source: int = len(source_polygons)
n_target: int = len(target_polygons)
# -----------------------------------------------------------------
# Build spatial index on source polygons
# -----------------------------------------------------------------
tree: STRtree = STRtree(source_polygons)
# -----------------------------------------------------------------
# Query all intersecting polygon pairs
#
# Returned shape:
#
# pairs[0] -> indices in target_polygons
# pairs[1] -> indices in source_polygons
# -----------------------------------------------------------------
pairs: np.ndarray = tree.query(
target_polygons,
predicate="intersects",
)
target_indices: np.ndarray = pairs[0]
source_indices: np.ndarray = pairs[1]
# -----------------------------------------------------------------
# Compute vectorized polygon intersections
# -----------------------------------------------------------------
intersections: np.ndarray = shapely.intersection(
source_polygons[source_indices],
target_polygons[target_indices],
)
# -----------------------------------------------------------------
# Compute intersection areas
# -----------------------------------------------------------------
intersection_areas: np.ndarray = shapely.area(intersections)
# Remove empty / numerical-noise intersections
valid_mask: np.ndarray = intersection_areas > 1e-15
source_indices = source_indices[valid_mask]
target_indices = target_indices[valid_mask]
intersection_areas = intersection_areas[valid_mask]
# -----------------------------------------------------------------
# Conservative normalization
#
# Each source polygon distributes 100% of its quantity
# over intersecting target polygons.
# -----------------------------------------------------------------
source_areas: np.ndarray = shapely.area(source_polygons)
weights: np.ndarray = intersection_areas / source_areas[source_indices]
# -----------------------------------------------------------------
# Assemble sparse transfer matrix
# -----------------------------------------------------------------
transfer_matrix: csc_array = coo_array(
(
weights,
(
source_indices,
target_indices,
),
),
shape=(n_source, n_target),
).tocsc()
# -----------------------------------------------------------------
# Sanity check:
# each source polygon must conserve its full quantity
# -----------------------------------------------------------------
# sanity check
if is_sanity_check:
np.testing.assert_allclose(
(transfer_matrix.T * source_areas).sum(axis=1)
/ shapely.area(target_polygons),
np.ones(transfer_matrix.shape[1]),
atol=1e-10,
)
return transfer_matrix