Commit 3987a530 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added function auto_adjust_overlaps_gaps and its test

parent 1b509592
......@@ -962,3 +962,170 @@ def adjust_coord_of_polygons(poly1, poly2, cardinal_str):
)
return new_poly1, new_poly2
def auto_adjust_overlaps_gaps(
    in_gdf, overlaps_gdf, col_lon, col_lat, width_EW, width_NS, case, col_1="id_1", col_2="id_2"
):
    """Resolve overlaps/gaps between cells by averaging their shared boundary coordinates.

    Each row of overlaps_gdf is one overlap/gap case involving two cells of in_gdf.
    When case == "overlap", overlaps_gdf should contain the intersections of in_gdf
    with itself and is used simply to identify neighbouring cells that overlap each
    other. When case == "gap", overlaps_gdf should contain the intersections of an
    enlarged version of the cells; whether a gap actually exists is decided by
    subtracting the original cell geometries (from in_gdf) from the intersection of
    the enlarged cells: a single Polygon means there is a gap and it gets fixed (by
    averaging the coordinates), a MultiPolygon means there is no gap and the case is
    skipped.

    The relative position of the second cell with respect to the first one (north,
    north-east, east, south-east, south, south-west, west, north-west) is identified
    by calling angles_from_coords() and determine_cardinal_point(), using the
    threshold angle returned by define_threshold_angle(width_EW, width_NS). The
    adjustment of the cells' geometries is carried out by adjust_coord_of_polygons().

    Args:
        in_gdf (Geopandas GeoDataFrame): GeoDataFrame with the cells' geometries and
            original centroids. Its "geometry" column contains the cells' polygons;
            it also needs an "id" column. If present, the columns "lon_w", "lat_s",
            "lon_e" and "lat_n" (the four coordinates that define each cell, floats)
            are updated too. It is meant to be the output of function
            build_adjusted_cells_dataframe().
        overlaps_gdf (Geopandas GeoDataFrame): GeoDataFrame with the geometry of the
            intersections of the polygons in in_gdf (if case == "overlap") or of an
            expanded version of in_gdf (if case == "gap"). It is meant to be the
            output of function overlap_by_full_geom_intersection(). It needs columns
            col_1, col_2 and "<col_lon>_1", "<col_lat>_1", "<col_lon>_2",
            "<col_lat>_2".
        col_lon (str): Name of the column in the file of the aggregated exposure
            model that contains longitudes.
        col_lat (str): Name of the column in the file of the aggregated exposure
            model that contains latitudes.
        width_EW (float): Width of the cell in the East-West direction, in degrees, >0.
        width_NS (float): Width of the cell in the North-South direction, in degrees, >0.
        case (str): Indicates whether the function should deal with an "overlap" or "gap".
        col_1 (str): Name of the first ID column of overlaps_gdf. Default: "id_1".
        col_2 (str): Name of the second ID column of overlaps_gdf. Default: "id_2".

    Returns:
        out_gdf (Geopandas GeoDataFrame): GeoDataFrame with the same structure as was
            given as input. The geometries of the cells for which overlaps_gdf
            indicated there were overlaps/gaps are the adjusted geometries calculated
            within this function; the rest of the cells are unchanged.
        gaps_found (bool): True if actual gaps were found and case == "gap"; always
            False if case == "overlap".

        If case is invalid, an empty dict is returned instead (kept as-is for
        backward compatibility with existing callers/tests).
    """
    # Check that case is either "overlap" or "gap":
    if case not in ("overlap", "gap"):
        print("ERROR!! case can only be 'overlap' or 'gap' in auto_adjust_overlaps_gaps")
        return {}
    out_gdf = deepcopy(in_gdf)
    # Calculate threshold angle that allows to define the relative position between
    # overlapping cells:
    ang_thr = define_threshold_angle(width_EW, width_NS)
    # Calculate angles between the E-W line and a segment that starts at the centroid
    # of the first cell and finishes at the centroid of the second cell, with the
    # former centroid being the intersection between the two lines:
    angles = angles_from_coords(
        overlaps_gdf["%s_1" % (col_lon)].values,
        overlaps_gdf["%s_1" % (col_lat)].values,
        overlaps_gdf["%s_2" % (col_lon)].values,
        overlaps_gdf["%s_2" % (col_lat)].values,
    )
    # Determine the cardinal points at which the second cell (col_2) is located with
    # respect to the first cell (col_1):
    cardinal_pts = determine_cardinal_point(angles, ang_thr, inputangle="radians")
    # Hoist the ID arrays out of the loop (they are re-used on every iteration):
    ids_1_all = overlaps_gdf[col_1].values
    ids_2_all = overlaps_gdf[col_2].values
    # Go one by one the cases of overlaps/gaps found and adjust them:
    gaps_found = False
    for i in range(overlaps_gdf.shape[0]):
        # IDs of the two cells involved:
        id_1 = ids_1_all[i]
        id_2 = ids_2_all[i]
        if case == "gap":
            # Largest number of intersection cases either cell participates in:
            how_many_intersections = max(
                len(np.where(ids_1_all == id_1)[0]),
                len(np.where(ids_1_all == id_2)[0]),
                len(np.where(ids_2_all == id_1)[0]),
                len(np.where(ids_2_all == id_2)[0]),
            )
            escape_condition = (how_many_intersections > 1) and (
                cardinal_pts[i] in ("NW", "NE", "SW", "SE")
            )
            if escape_condition:
                # If the cells are in diagonal with respect to one another and they
                # participate of other cases of intersection, ignore:
                continue
            # Auxiliary GeoDataFrame with original cell geometries:
            aux_orig = out_gdf[(out_gdf.id == id_1) | (out_gdf.id == id_2)]
            # Auxiliary GeoDataFrame with just the geometry of this intersection.
            # Bug fix: use col_1/col_2 instead of the previously hard-coded
            # "id_1"/"id_2" column names, so non-default ID columns work:
            aux_intersect = overlaps_gdf[
                (overlaps_gdf[col_1] == id_1) & (overlaps_gdf[col_2] == id_2)
            ]
            # Subtract the original cells geometry from the intersection:
            subtraction = gpd.overlay(aux_intersect, aux_orig, how="difference")
            if subtraction["geometry"].values[0].geom_type == "MultiPolygon":
                # If MultiPolygon, no gap exists, thus no adjustment needed:
                continue
            gaps_found = True
        # Identify the rows of these cells in out_gdf:
        which_1 = np.where(out_gdf["id"].values == id_1)[0]
        which_2 = np.where(out_gdf["id"].values == id_2)[0]
        # Retrieve the original polygons of these cells:
        polygon1 = out_gdf["geometry"].values[which_1[0]]
        polygon2 = out_gdf["geometry"].values[which_2[0]]
        # Calculate new polygons:
        newpolygon1, newpolygon2 = adjust_coord_of_polygons(polygon1, polygon2, cardinal_pts[i])
        # Write the new geometries to out_gdf:
        out_gdf["geometry"].values[which_1[0]] = newpolygon1
        out_gdf["geometry"].values[which_2[0]] = newpolygon2
        # Write columns "lon_w", "lat_s", "lon_e", and "lat_n" of out_gdf, if they exist:
        if all(col in out_gdf.columns for col in ("lon_w", "lon_e", "lat_s", "lat_n")):
            # Polygon 1 (bounds = (minx, miny, maxx, maxy)):
            out_gdf["lon_w"].values[which_1[0]] = newpolygon1.bounds[0]
            out_gdf["lon_e"].values[which_1[0]] = newpolygon1.bounds[2]
            out_gdf["lat_s"].values[which_1[0]] = newpolygon1.bounds[1]
            out_gdf["lat_n"].values[which_1[0]] = newpolygon1.bounds[3]
            # Polygon 2:
            out_gdf["lon_w"].values[which_2[0]] = newpolygon2.bounds[0]
            out_gdf["lon_e"].values[which_2[0]] = newpolygon2.bounds[2]
            out_gdf["lat_s"].values[which_2[0]] = newpolygon2.bounds[1]
            out_gdf["lat_n"].values[which_2[0]] = newpolygon2.bounds[3]
    return out_gdf, gaps_found
......@@ -895,3 +895,337 @@ def test_adjust_coord_of_polygons():
assert function_poly1.area == should_be_1[i].area
assert function_poly2.bounds == should_be_2[i].bounds
assert function_poly2.area == should_be_2[i].area
def test_auto_adjust_overlaps_gaps():
    """Test auto_adjust_overlaps_gaps() for an invalid case, overlaps and gaps.

    The expected values are built manually, applying the same boundary-averaging
    adjustments in the same row order as the function under test (the order of the
    adjustments matters, as each averaged value feeds into the next one).
    NOTE(review): assumes gdet_cr_ind, gpd, np, Polygon and deepcopy are available at
    module level of this test file.
    """
    # Test that the function terminates if input variable "case" is not correct:
    # ==========================================================================
    assert {} == gdet_cr_ind.auto_adjust_overlaps_gaps([], [], "", "", 0.0, 0.0, "some_case")
    # General variables for all tests that follow:
    col_lon = "LONGITUDE"
    col_lat = "LATITUDE"
    width_EW = 30.0 / (60.0 * 60.0)  # 30 arcsec
    width_NS = 30.0 / (60.0 * 60.0)  # 30 arcsec
    col_1 = "id_1"
    col_2 = "id_2"
    # Test a case of overlaps:
    # ========================
    # Define a GeoDataFrame with the cells' geometry. The centroid spacing is smaller
    # than the cell width, so neighbouring cells overlap:
    smaller_spacing = 0.0082
    points_lon = np.array(
        [
            12.0,
            12.0,
            12.0 + smaller_spacing,
            12.0 + (smaller_spacing - 0.0001),
            12.0 + 2.0 * (smaller_spacing - 0.0001),
        ]
    )
    points_lat = np.array(
        [
            40.0,
            40.0 + smaller_spacing,
            40.0,
            40.0 + smaller_spacing,
            40.0 + 2.0 * (smaller_spacing - 0.0001),
        ]
    )
    longitudes_w = points_lon - width_EW / 2.0
    longitudes_e = points_lon + width_EW / 2.0
    latitudes_s = points_lat - width_NS / 2.0
    latitudes_n = points_lat + width_NS / 2.0
    geoms = []
    for i in range(0, len(points_lon)):
        geoms.append(
            Polygon(
                [
                    (longitudes_w[i], latitudes_s[i]),
                    (longitudes_e[i], latitudes_s[i]),
                    (longitudes_e[i], latitudes_n[i]),
                    (longitudes_w[i], latitudes_n[i]),
                ]
            )
        )
    d = {
        "id": np.array(["ID_%s" % (i) for i in range(1, len(geoms) + 1)]),
        "LONGITUDE": points_lon,
        "LATITUDE": points_lat,
        "lon_w": longitudes_w,
        "lon_e": longitudes_e,
        "lat_s": latitudes_s,
        "lat_n": latitudes_n,
        "geometry": geoms,
    }
    cells_gdf = gpd.GeoDataFrame(d, geometry="geometry", crs="EPSG:4326")
    # Define a GeoDataFrame with the cells' intersections (one row per pair of
    # overlapping cells, identified by the IDs in ids[1] and ids[2]):
    ids = {}
    ids[1] = ["ID_1", "ID_1", "ID_1", "ID_2", "ID_2", "ID_3", "ID_4"]
    ids[2] = ["ID_2", "ID_3", "ID_4", "ID_3", "ID_4", "ID_4", "ID_5"]
    geoms = [
        Polygon(
            [
                (12.0 - width_EW / 2.0, 40.0 + smaller_spacing - width_NS / 2.0),  # IDs 1 & 2
                (12.0 + width_EW / 2.0, 40.0 + smaller_spacing - width_NS / 2.0),
                (12.0 + width_EW / 2.0, 40.0 + width_NS / 2.0),
                (12.0 - width_EW / 2.0, 40.0 + width_NS / 2.0),
            ]
        ),
        Polygon(
            [
                (12.0 + smaller_spacing - width_EW / 2.0, 40.0 - width_NS / 2.0),  # IDs 1 & 3
                (12.0 + width_EW / 2.0, 40.0 - width_NS / 2.0),
                (12.0 + width_EW / 2.0, 40.0 + width_NS / 2.0),
                (12.0 + smaller_spacing - width_EW / 2.0, 40.0 + width_NS / 2.0),
            ]
        ),
        Polygon(
            [
                (
                    12.0 + (smaller_spacing - 0.0001) - width_EW / 2.0,
                    40.0 + smaller_spacing - width_NS / 2.0,
                ),  # IDs 1 & 4
                (12.0 + width_EW / 2.0, 40.0 + smaller_spacing - width_NS / 2.0),
                (12.0 + width_EW / 2.0, 40.0 + width_NS / 2.0),
                (12.0 + (smaller_spacing - 0.0001) - width_EW / 2.0, 40.0 + width_NS / 2.0),
            ]
        ),
        Polygon(
            [
                (
                    12.0 + smaller_spacing - width_EW / 2.0,
                    40.0 + smaller_spacing - width_NS / 2.0,
                ),  # IDs 2 & 3
                (12.0 + width_EW / 2.0, 40.0 + smaller_spacing - width_NS / 2.0),
                (12.0 + width_EW / 2.0, 40.0 + width_NS / 2.0),
                (12.0 + smaller_spacing - width_EW / 2.0, 40.0 + width_NS / 2.0),
            ]
        ),
        Polygon(
            [
                (
                    12.0 + (smaller_spacing - 0.0001) - width_EW / 2.0,
                    40.0 + smaller_spacing - width_NS / 2.0,
                ),  # IDs 2 & 4
                (12.0 + width_EW / 2.0, 40.0 + smaller_spacing - width_NS / 2.0),
                (12.0 + width_EW / 2.0, 40.0 + smaller_spacing + width_NS / 2.0),
                (
                    12.0 + (smaller_spacing - 0.0001) - width_EW / 2.0,
                    40.0 + smaller_spacing + width_NS / 2.0,
                ),
            ]
        ),
        Polygon(
            [
                (
                    12.0 + smaller_spacing - width_EW / 2.0,
                    40.0 + smaller_spacing - width_NS / 2.0,
                ),  # IDs 3 & 4
                (
                    12.0 + (smaller_spacing - 0.0001) + width_EW / 2.0,
                    40.0 + smaller_spacing - width_NS / 2.0,
                ),
                (12.0 + (smaller_spacing - 0.0001) + width_EW / 2.0, 40.0 + width_NS / 2.0),
                (12.0 + smaller_spacing - width_EW / 2.0, 40.0 + width_NS / 2.0),
            ]
        ),
        Polygon(
            [
                (
                    12.0 + 2.0 * (smaller_spacing - 0.0001) - width_EW / 2.0,
                    40.0 + 2.0 * (smaller_spacing - 0.0001) - width_NS / 2.0,
                ),  # IDs 4 & 5
                (
                    12.0 + (smaller_spacing - 0.0001) + width_EW / 2.0,
                    40.0 + 2.0 * (smaller_spacing - 0.0001) - width_NS / 2.0,
                ),
                (
                    12.0 + (smaller_spacing - 0.0001) + width_EW / 2.0,
                    40.0 + smaller_spacing + width_NS / 2.0,
                ),
                (
                    12.0 + 2.0 * (smaller_spacing - 0.0001) - width_EW / 2.0,
                    40.0 + smaller_spacing + width_NS / 2.0,
                ),
            ]
        ),
    ]
    # Build the "<col>_1"/"<col>_2" columns of the intersections GeoDataFrame by
    # looking up each cell's values in cells_gdf:
    cols = ["LONGITUDE", "LATITUDE", "lon_w", "lon_e", "lat_s", "lat_n"]
    aux_dict = {}
    for col_name in cols:
        for val in range(1, 3):
            aux_list = []
            for i in range(0, len(ids[val])):
                aux_list.append(cells_gdf[cells_gdf.id == ids[val][i]][col_name].values[0])
            aux_dict["%s_%s" % (col_name, str(val))] = np.array(aux_list)
    d = {"id_1": ids[1], "id_2": ids[2]}
    for key in aux_dict.keys():
        d[key] = aux_dict[key]
    d["geometry"] = geoms
    intersections_gdf = gpd.GeoDataFrame(d, geometry="geometry", crs="EPSG:4326")
    # Expected output:
    expected_out_gdf = deepcopy(cells_gdf)
    # Manual adjustment done by row of the intersections GeoDataFrame (i.e. the order matters):
    # IDs 1 & 2
    expected_out_gdf["lat_n"].values[0] = (
        expected_out_gdf["lat_n"].values[0] + expected_out_gdf["lat_s"].values[1]
    ) / 2.0
    expected_out_gdf["lat_s"].values[1] = expected_out_gdf["lat_n"].values[0]
    # IDs 1 & 3:
    expected_out_gdf["lon_e"].values[0] = (
        expected_out_gdf["lon_e"].values[0] + expected_out_gdf["lon_w"].values[2]
    ) / 2.0
    expected_out_gdf["lon_w"].values[2] = expected_out_gdf["lon_e"].values[0]
    # IDs 1 & 4:
    expected_out_gdf["lat_n"].values[0] = (
        expected_out_gdf["lat_n"].values[0] + expected_out_gdf["lat_s"].values[3]
    ) / 2.0
    expected_out_gdf["lat_s"].values[3] = expected_out_gdf["lat_n"].values[0]
    expected_out_gdf["lon_e"].values[0] = (
        expected_out_gdf["lon_e"].values[0] + expected_out_gdf["lon_w"].values[3]
    ) / 2.0
    expected_out_gdf["lon_w"].values[3] = expected_out_gdf["lon_e"].values[0]
    # IDs 2 & 3:
    expected_out_gdf["lon_e"].values[1] = (
        expected_out_gdf["lon_e"].values[1] + expected_out_gdf["lon_w"].values[2]
    ) / 2.0
    expected_out_gdf["lon_w"].values[2] = expected_out_gdf["lon_e"].values[1]
    expected_out_gdf["lat_s"].values[1] = (
        expected_out_gdf["lat_s"].values[1] + expected_out_gdf["lat_n"].values[2]
    ) / 2.0
    expected_out_gdf["lat_n"].values[2] = expected_out_gdf["lat_s"].values[1]
    # IDs 2 & 4:
    expected_out_gdf["lon_e"].values[1] = (
        expected_out_gdf["lon_e"].values[1] + expected_out_gdf["lon_w"].values[3]
    ) / 2.0
    expected_out_gdf["lon_w"].values[3] = expected_out_gdf["lon_e"].values[1]
    # IDs 3 & 4:
    expected_out_gdf["lat_n"].values[2] = (
        expected_out_gdf["lat_n"].values[2] + expected_out_gdf["lat_s"].values[3]
    ) / 2.0
    expected_out_gdf["lat_s"].values[3] = expected_out_gdf["lat_n"].values[2]
    # IDs 4 & 5:
    expected_out_gdf["lat_n"].values[3] = (
        expected_out_gdf["lat_n"].values[3] + expected_out_gdf["lat_s"].values[4]
    ) / 2.0
    expected_out_gdf["lat_s"].values[4] = expected_out_gdf["lat_n"].values[3]
    expected_out_gdf["lon_e"].values[3] = (
        expected_out_gdf["lon_e"].values[3] + expected_out_gdf["lon_w"].values[4]
    ) / 2.0
    expected_out_gdf["lon_w"].values[4] = expected_out_gdf["lon_e"].values[3]
    # Rebuild the expected geometries from the adjusted bounding coordinates:
    new_geoms = []
    for i in range(0, expected_out_gdf.shape[0]):
        lon_w = expected_out_gdf["lon_w"].values[i]
        lon_e = expected_out_gdf["lon_e"].values[i]
        lat_s = expected_out_gdf["lat_s"].values[i]
        lat_n = expected_out_gdf["lat_n"].values[i]
        new_geoms.append(
            Polygon([(lon_w, lat_s), (lon_e, lat_s), (lon_e, lat_n), (lon_w, lat_n)])
        )
    expected_out_gdf["geometry"] = new_geoms
    # Output from function to be tested:
    function_out_gdf, flag = gdet_cr_ind.auto_adjust_overlaps_gaps(
        cells_gdf,
        intersections_gdf,
        col_lon,
        col_lat,
        width_EW,
        width_NS,
        "overlap",
        col_1=col_1,
        col_2=col_2,
    )
    # case == "overlap" must never report gaps:
    assert flag == False
    for rownum in range(0, expected_out_gdf.shape[0]):
        bounds_expected = expected_out_gdf["geometry"].values[rownum].bounds
        bounds_function = function_out_gdf["geometry"].values[rownum].bounds
        for position in range(0, 4):
            assert round(bounds_expected[position], 12) == round(bounds_function[position], 12)
    np.testing.assert_allclose(
        expected_out_gdf["lon_w"].values, function_out_gdf["lon_w"].values, rtol=0.0, atol=1e-08
    )
    np.testing.assert_allclose(
        expected_out_gdf["lon_e"].values, function_out_gdf["lon_e"].values, rtol=0.0, atol=1e-08
    )
    np.testing.assert_allclose(
        expected_out_gdf["lat_s"].values, function_out_gdf["lat_s"].values, rtol=0.0, atol=1e-08
    )
    np.testing.assert_allclose(
        expected_out_gdf["lat_n"].values, function_out_gdf["lat_n"].values, rtol=0.0, atol=1e-08
    )
    # Test a case of gaps:
    # ====================
    # The output GeoDataFrame from the test of overlaps (expected_out_gdf) contains gaps and
    # is used herein to test case="gap" (this would be the normal flow of the main code too):
    # The new input is:
    cells_gdf = deepcopy(expected_out_gdf)
    cells_offset_gdf = gdet_cr_ind.swell_cells_with_buffer(
        cells_gdf, 0.25 * width_EW, 0.25 * width_NS
    )
    intersections_gdf = gdet_cr_ind.overlap_by_full_geom_intersection(
        cells_offset_gdf, "id_1", "id_2"
    )
    # The new expected output is:
    # IDs 1 & 2
    expected_out_gdf["lat_n"].values[0] = (
        expected_out_gdf["lat_n"].values[0] + expected_out_gdf["lat_s"].values[1]
    ) / 2.0
    expected_out_gdf["lat_s"].values[1] = expected_out_gdf["lat_n"].values[0]
    # IDs 1 & 3:
    expected_out_gdf["lon_e"].values[0] = (
        expected_out_gdf["lon_e"].values[0] + expected_out_gdf["lon_w"].values[2]
    ) / 2.0
    expected_out_gdf["lon_w"].values[2] = expected_out_gdf["lon_e"].values[0]
    new_geoms = []
    for i in range(0, expected_out_gdf.shape[0]):
        lon_w = expected_out_gdf["lon_w"].values[i]
        lon_e = expected_out_gdf["lon_e"].values[i]
        lat_s = expected_out_gdf["lat_s"].values[i]
        lat_n = expected_out_gdf["lat_n"].values[i]
        new_geoms.append(
            Polygon([(lon_w, lat_s), (lon_e, lat_s), (lon_e, lat_n), (lon_w, lat_n)])
        )
    expected_out_gdf["geometry"] = new_geoms
    # Output from function to be tested:
    function_out_gdf, flag = gdet_cr_ind.auto_adjust_overlaps_gaps(
        cells_gdf,
        intersections_gdf,
        col_lon,
        col_lat,
        width_EW,
        width_NS,
        "gap",
        col_1=col_1,
        col_2=col_2,
    )
    # Actual gaps exist in this input, so the flag must be raised:
    assert flag == True
    for rownum in range(0, expected_out_gdf.shape[0]):
        bounds_expected = expected_out_gdf["geometry"].values[rownum].bounds
        bounds_function = function_out_gdf["geometry"].values[rownum].bounds
        for position in range(0, 4):
            assert round(bounds_expected[position], 12) == round(bounds_function[position], 12)
    np.testing.assert_allclose(
        expected_out_gdf["lon_w"].values, function_out_gdf["lon_w"].values, rtol=0.0, atol=1e-08
    )
    np.testing.assert_allclose(
        expected_out_gdf["lon_e"].values, function_out_gdf["lon_e"].values, rtol=0.0, atol=1e-08
    )
    np.testing.assert_allclose(
        expected_out_gdf["lat_s"].values, function_out_gdf["lat_s"].values, rtol=0.0, atol=1e-08
    )
    np.testing.assert_allclose(
        expected_out_gdf["lat_n"].values, function_out_gdf["lat_n"].values, rtol=0.0, atol=1e-08
    )
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment