Commit 2574d4b2 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Merge branch 'feature/ind03' into 'master'

Added more functions to create cells around SERA industrial points

See merge request !13
parents dc371d58 e4f41cac
......@@ -413,3 +413,244 @@ def adjust_coords(coords_dict, coords_uq):
out_adj_coord[ct][pos] = new_coord_j
return out_adj_coord
def build_adjusted_cells_dataframe(in_gdf, coords_dict_adj):
    """Return a copy of in_gdf with coordinates and geometry replaced per coords_dict_adj.

    The "lon_w", "lat_s", "lon_e" and "lat_n" columns of the copy are overwritten
    with the corresponding arrays of coords_dict_adj (elements are assumed to be in
    the same row order as in_gdf, which happens naturally when this function is run
    as part of the whole set of functions of this module). The "geometry" column is
    then rebuilt by creating a new cell polygon from each row's four replaced
    coordinates. The input in_gdf itself is not modified; an independent copy is
    used to build the output.

    Args:
        in_gdf (Geopandas GeoDataFrame): GeoDataFrame with the cells geometries and
                                         original centroids, meant to be the output
                                         of define_cells_in_dataframe(). Its
                                         "geometry" column contains the cells'
                                         polygons; it also contains the float
                                         columns "lon_w", "lat_s", "lon_e" and
                                         "lat_n" with the four coordinates that
                                         define each cell.
        coords_dict_adj (dictionary): Dictionary with keys "lon_w", "lon_e",
                                      "lat_s" and "lat_n", each an array of floats
                                      with as many elements as in_gdf has rows and
                                      corresponding to one another. Meant to be the
                                      output of adjust_coords().

    Returns:
        out_gdf (Geopandas GeoDataFrame): Same structure and overall content as
                                          in_gdf but with the coordinates and
                                          geometry replaced as indicated in
                                          coords_dict_adj. None if any array length
                                          does not match the number of rows.
    """
    coord_keys = ("lon_w", "lon_e", "lat_s", "lat_n")
    # Guard: every adjusted-coordinate array must have one entry per row of in_gdf.
    if any(coords_dict_adj[key].shape[0] != in_gdf.shape[0] for key in coord_keys):
        print(
            "ERROR!! One or more of the arrays in coords_dict_adj do not have the same length"
            " as the number of rows of in_gdf"
        )
        return
    # Work on an independent copy so the caller's GeoDataFrame is left untouched:
    out_gdf = deepcopy(in_gdf)
    # Replace the four coordinate columns with the adjusted arrays:
    for key in coord_keys:
        out_gdf[key] = coords_dict_adj[key]
    # Rebuild each cell polygon from the replaced coordinates:
    new_geoms = [
        define_cell_polygon(west, south, east, north)
        for west, south, east, north in zip(
            out_gdf["lon_w"].values,
            out_gdf["lat_s"].values,
            out_gdf["lon_e"].values,
            out_gdf["lat_n"].values,
        )
    ]
    del out_gdf["geometry"]
    out_gdf = gpd.GeoDataFrame(out_gdf, geometry=new_geoms, crs=in_gdf.crs)
    return out_gdf
def define_col_names(colnames, repeat=2):
    """Build the column names for a table of pairwise-combined geometries.

    Every name in colnames except "geometry" is repeated `repeat` times with the
    suffixes "_1" ... "_<repeat>"; a single "geometry" name is appended at the end.

    Args:
        colnames (pandas.core.indexes.base.Index): Input column names.
        repeat (int): Number of times the names will be repeated.

    Returns:
        out_cols: List of suffixed column names plus a final "geometry".

    Example:
        colnames = ['id', 'lon', 'lat', 'geometry']
        repeat = 2
        out_cols = ['id_1', 'lon_1', 'lat_1',
                    'id_2', 'lon_2', 'lat_2',
                    'geometry']
    """
    non_geometry = [name for name in colnames if name != "geometry"]
    # Outer loop over suffixes, inner over names: all "_1" names first, then "_2", etc.
    out_cols = [
        "%s_%s" % (name, suffix)
        for suffix in range(1, repeat + 1)
        for name in non_geometry
    ]
    out_cols.append("geometry")
    return out_cols
def overlap_by_full_geom_intersection(in_gdf, col_1="id_1", col_2="id_2", printprogress=False):
    """This function carries out the intersection of all the geometries in in_gdf with each
    other. For doing so, it goes row by row of in_gdf, creates two auxiliary GeoDataFrames, one
    with that row and another one without that row, and intersects the two. If the result of
    this intersection is non-null, then it checks that the identified intersections have not
    been already identified before, and appends to the output GeoDataFrame those that are "new".
    This process was faster than adding them all and eliminating duplicate entries at the
    end by 10% when testing at the development stage with in_gdf that contained 1165 rows.
    Args:
        in_gdf (Geopandas GeoDataFrame): GeoDataFrame whose geometry is polygons.
        col_1 (str): Name of first column of intsect_gdf to consider for comparing if the
                     entry already exists. Default: "id_1".
        col_2 (str): Name of second column of intsect_gdf to consider for comparing if the
                     entry already exists. Default: "id_2".
        printprogress (bool): If True, the progress is printed to screen.
    Returns:
        intsect_gdf (Geopandas GeoDataFrame): GeoDataFrame with the geometry of the
                                              intersections of the polygons in in_gdf.
    """
    # Output starts empty; its columns are the input columns suffixed "_1"/"_2"
    # (one set per side of each intersection) plus a single "geometry" column.
    intsect_gdf = gpd.GeoDataFrame(
        columns=define_col_names(in_gdf.columns, 2), geometry=[], crs=in_gdf.crs
    )
    for i in range(in_gdf.shape[0]):
        if printprogress:
            print("\r Working on row %s of %s" % (i + 1, in_gdf.shape[0]), end="")
        # Split in_gdf into the i-th row and all other rows, then overlay the two:
        aux_df_i = deepcopy(in_gdf[in_gdf.index == in_gdf.index[i]])
        aux_df_not_i = deepcopy(in_gdf[in_gdf.index != in_gdf.index[i]])
        insct_i = gpd.overlay(aux_df_i, aux_df_not_i, how="intersection")
        if insct_i.shape[0] > 0:
            # Check if these intersections have been identified before in intsect_gdf
            ind_to_drop_i = []  # empty list to collect indices to discard
            for j in range(insct_i.shape[0]):
                # If the partner's ID already appears in col_1 of intsect_gdf, the
                # pair was recorded when the partner's own row was processed in an
                # earlier iteration, so it would be a duplicate here.
                # NOTE(review): this assumes the IDs in col_1/col_2 uniquely
                # identify rows of in_gdf — confirm with callers.
                if insct_i[col_2].values[j] in intsect_gdf[col_1].values:
                    # do not include it again
                    ind_to_drop_i.append(insct_i.index[j])
            # Only append to intsect_gdf the intersections that are not there already:
            intsect_gdf = pd.concat([intsect_gdf, insct_i.drop(ind_to_drop_i)])
    # Reset indices because concat keeps those of its input parts:
    intsect_gdf.index = range(intsect_gdf.shape[0])
    return intsect_gdf
def enforce_boundaries_lon(longitudes):
    """Clamp longitude values to the valid range [-180.0, +180.0].

    Args:
        longitudes (array of floats)

    Returns:
        longitudes (array of floats): The same as the input, but with values smaller
                                      than -180.0 replaced by -180.0, and values
                                      larger than +180.0 replaced by +180.0.
    """
    # np.clip applies both bounds in one vectorized call; equivalent to the
    # previous chained np.maximum/np.minimum with np.ones_like temporaries.
    return np.clip(longitudes, -180.0, 180.0)
def enforce_boundaries_lat(latitudes):
    """Clamp latitude values to the valid range [-90.0, +90.0].

    Args:
        latitudes (array of floats)

    Returns:
        latitudes (array of floats): The same as the input, but with values smaller
                                     than -90.0 replaced by -90.0, and values larger
                                     than +90.0 replaced by +90.0.
    """
    # np.clip applies both bounds in one vectorized call; equivalent to the
    # previous chained np.maximum/np.minimum with np.ones_like temporaries.
    return np.clip(latitudes, -90.0, 90.0)
def swell_cells_with_buffer(in_gdf, offset_lon, offset_lat):
    """Return a copy of in_gdf with each cell expanded by offset_lon / offset_lat.

    The four coordinate columns ("lon_w", "lat_s", "lon_e", "lat_n") are moved
    outward by the offsets, clamped to the valid longitude/latitude ranges by
    enforce_boundaries_lon()/enforce_boundaries_lat(), and the "geometry" column
    is rebuilt from the new coordinates.

    Args:
        in_gdf (Geopandas GeoDataFrame): GeoDataFrame with the cells geometries and
                                         original centroids. Its "geometry" column
                                         contains the cells' polygons. It also
                                         contains the columns "lon_w", "lat_s",
                                         "lon_e", and "lat_n", with the four
                                         coordinates that define each cell (floats).
                                         It is meant to be the output of function
                                         build_adjusted_cells_dataframe().
        offset_lon (float): Value by which to offset the longitude coordinates, in
                            units compatible to those of the geometries of in_gdf.
                            Must be >= 0.
        offset_lat (float): Value by which to offset the latitude coordinates, in
                            units compatible to those of the geometries of in_gdf.
                            Must be >= 0.

    Returns:
        out_gdf (Geopandas GeoDataFrame): Copy of in_gdf with the expanded cells,
                                          or None if either offset is negative.
    """
    if offset_lon < 0.0 or offset_lat < 0.0:
        print("ERROR in swell_cells_with_buffer: offsets should be larger than/equal to zero.")
        return
    out_gdf = deepcopy(in_gdf)
    # NumPy broadcasting applies the scalar offsets element-wise (no need for an
    # explicit np.ones array); the enforce_boundaries_* functions then clamp the
    # results to [-180, 180] / [-90, 90].
    out_gdf["lon_w"] = enforce_boundaries_lon(out_gdf["lon_w"].values - offset_lon)
    out_gdf["lon_e"] = enforce_boundaries_lon(out_gdf["lon_e"].values + offset_lon)
    out_gdf["lat_s"] = enforce_boundaries_lat(out_gdf["lat_s"].values - offset_lat)
    out_gdf["lat_n"] = enforce_boundaries_lat(out_gdf["lat_n"].values + offset_lat)
    # Rebuild the polygons from the expanded coordinates:
    out_gdf["geometry"] = [
        define_cell_polygon(lon_w, lat_s, lon_e, lat_n)
        for lon_w, lat_s, lon_e, lat_n in zip(
            out_gdf["lon_w"].values,
            out_gdf["lat_s"].values,
            out_gdf["lon_e"].values,
            out_gdf["lat_n"].values,
        )
    ]
    return out_gdf
......@@ -456,3 +456,293 @@ def test_adjust_coords():
np.testing.assert_allclose(
function_dict[key], expected_output_dict[key], rtol=0.0, atol=1e-08
)
def test_build_adjusted_cells_dataframe():
    # Input GeoDataFrame; the polygons are placeholders only — the function under
    # test replaces the geometry, so any valid geometry will do here.
    dummy_gdf = gpd.GeoDataFrame(
        {
            "lon_w": [23.01234, -17.56791, 23.01237, 57.32492, -17.56789],
            "lon_e": [23.01218, 23.01225, -17.56783, 47.17212, 47.17214],
            "lat_s": [38.31234, 38.31213, -41.46789, 21.47281, 62.91624],
            "lat_n": [-15.25663, 38.31175, -41.46812, -41.46842, 21.47332],
            "geometry": [
                Polygon([(22.0, 37.8), (23.5, 37.8), (23.5, 38.8), (22.0, 38.8)]),
                Polygon([(22.5, -41.8), (23.5, -41.8), (23.5, -40.9), (22.5, -40.9)]),
                Polygon([(-18.0, 36.8), (-17.0, 36.8), (-17.0, 38.8), (-18.0, 38.8)]),
                Polygon([(22.5, -41.8), (23.5, -41.8), (23.5, -40.9), (22.5, -40.9)]),
                Polygon([(-18.5, -41.9), (-16.5, -41.9), (-16.5, -40.9), (-18.5, -40.9)]),
            ],
        }
    )
    # Adjusted coordinates (coords_dict_adj): averages over the clusters of nearly
    # identical input coordinates, each named once instead of repeated inline:
    avg_lon_23 = (23.01234 + 23.01237 + 23.01218 + 23.01225) / 4.0
    avg_lon_m17 = ((-17.56791) + (-17.56789) + (-17.56783)) / 3.0
    avg_lon_47 = (47.17212 + 47.17214) / 2.0
    avg_lat_38 = (38.31234 + 38.31213 + 38.31175) / 3.0
    avg_lat_m41 = ((-41.46789) + (-41.46812) + (-41.46842)) / 3.0
    avg_lat_21 = (21.47281 + 21.47332) / 2.0
    coords_dict = {
        "lon_w": np.array([avg_lon_23, avg_lon_m17, avg_lon_23, 57.32492, avg_lon_m17]),
        "lon_e": np.array([avg_lon_23, avg_lon_23, avg_lon_m17, avg_lon_47, avg_lon_47]),
        "lat_s": np.array([avg_lat_38, avg_lat_38, avg_lat_m41, avg_lat_21, 62.91624]),
        "lat_n": np.array([-15.25663, avg_lat_38, avg_lat_m41, avg_lat_m41, avg_lat_21]),
    }
    function_gdf = gdet_cr_ind.build_adjusted_cells_dataframe(dummy_gdf, coords_dict)
    # The coordinate columns of the output must match the adjusted arrays:
    for key in coords_dict.keys():
        np.testing.assert_allclose(
            function_gdf[key].values, coords_dict[key], rtol=0.0, atol=1e-08
        )
def test_define_col_names():
    # Expect three repetitions of every non-"geometry" input name, suffixed
    # "_1".."_3", followed by a single "geometry" entry at the end:
    expected_output = [
        "id_1",
        "lon_1",
        "lat_1",
        "anything_1",
        "id_2",
        "lon_2",
        "lat_2",
        "anything_2",
        "id_3",
        "lon_3",
        "lat_3",
        "anything_3",
        "geometry",
    ]
    function_columns = gdet_cr_ind.define_col_names(
        ["id", "lon", "lat", "geometry", "anything"], repeat=3
    )
    # List equality checks both the length and every element in order:
    assert function_columns == expected_output
def test_overlap_by_full_geom_intersection():
    # Create datasets for testing:
    col_1 = "id_1"
    col_2 = "id_2"
    # Define geometries of polygons (auxiliary step): a central 1x1-degree cell
    # (ID_1) plus four partially overlapping neighbours (to the S, W, NE and SW).
    spacing = 1.0
    lon_w = 12.0
    lat_s = 40.0
    lon_e = lon_w + spacing  # 13.0
    lat_n = lat_s + spacing  # 41.0
    geoms = [
        Polygon([(lon_w, lat_s), (lon_e, lat_s), (lon_e, lat_n), (lon_w, lat_n)]),
        Polygon([(lon_w, 39.2), (lon_e, 39.2), (lon_e, 40.2), (lon_w, 40.2)]),  # S
        Polygon([(11.2, lat_s), (12.2, lat_s), (12.2, lat_n), (11.2, lat_n)]),  # W
        Polygon([(12.8, 40.8), (13.8, 40.8), (13.8, 41.8), (12.8, 41.8)]),  # NE
        Polygon([(11.2, 39.2), (12.2, 39.2), (12.2, 40.2), (11.2, 40.2)]),
    ]  # SW
    d = {"id": np.array(["ID_%s" % (i) for i in range(1, len(geoms) + 1)]), "geometry": geoms}
    input_gdf = gpd.GeoDataFrame(d, geometry="geometry", crs="EPSG:4326")
    # Results should be (the order can be different, will not check by order):
    should_be_geoms = [
        Polygon([(lon_w, lat_s), (lon_e, lat_s), (lon_e, 40.2), (lon_w, 40.2)]),
        Polygon([(lon_w, lat_s), (12.2, lat_s), (12.2, lat_n), (lon_w, lat_n)]),
        Polygon([(12.8, 40.8), (lon_e, 40.8), (lon_e, lat_n), (12.8, lat_n)]),
        Polygon([(lon_w, lat_s), (12.2, lat_s), (12.2, 40.2), (lon_w, 40.2)]),
        Polygon([(lon_w, lat_s), (12.2, lat_s), (12.2, 40.2), (lon_w, 40.2)]),
        Polygon([(lon_w, 39.2), (12.2, 39.2), (12.2, 40.2), (lon_w, 40.2)]),
        Polygon([(11.2, lat_s), (12.2, lat_s), (12.2, 40.2), (11.2, 40.2)]),
    ]
    # Expected ID pairs, aligned element-wise with should_be_geoms:
    should_be_id1 = ["ID_1", "ID_1", "ID_1", "ID_1", "ID_2", "ID_2", "ID_3"]
    should_be_id2 = ["ID_2", "ID_3", "ID_4", "ID_5", "ID_3", "ID_5", "ID_5"]
    # Get result from function to test:
    function_out = gdet_cr_ind.overlap_by_full_geom_intersection(
        input_gdf, col_1=col_1, col_2=col_2
    )
    # Test:
    assert len(should_be_geoms) == function_out.shape[0]
    # Go one by one the intersections that should exist, keeping track of those found:
    was_checked = np.array([False for i in range(function_out.shape[0])])
    for i in range(len(should_be_geoms)):
        # The IDs can be inverted, i.e. id1 can be id2 and vice versa. So we first check in
        # one order and if no match is found check in the inverse order:
        id1 = should_be_id1[i]
        id2 = should_be_id2[i]
        # Rows of function_out whose (col_1, col_2) match (id1, id2) in this order:
        which1 = np.where(
            np.logical_and(function_out[col_1].values == id1, function_out[col_2].values == id2)
        )[0]
        if (len(which1) == 1) and (not was_checked[which1]):
            # Bounds and area together identify these axis-aligned rectangles:
            assert (
                function_out["geometry"].values[which1[0]].bounds == should_be_geoms[i].bounds
            )
            assert function_out["geometry"].values[which1[0]].area == should_be_geoms[i].area
            was_checked[which1[0]] = True
        else:  # Invert the order of id1 and id2
            which2 = np.where(
                np.logical_and(
                    function_out[col_1].values == id2, function_out[col_2].values == id1
                )
            )[0]
            if (len(which2) == 1) and (not was_checked[which2]):
                assert (
                    function_out["geometry"].values[which2[0]].bounds
                    == should_be_geoms[i].bounds
                )
                assert (
                    function_out["geometry"].values[which2[0]].area == should_be_geoms[i].area
                )
                was_checked[which2[0]] = True
            else:
                assert 0 == 1  # Create an assertion error because this result was not found
    # Check that all rows of function_out have been checked:
    assert not (np.any(np.logical_not(was_checked)))
def test_enforce_boundaries_lon():
    # Out-of-range longitudes must be clamped to +/-180.0; in-range values
    # (including the boundaries themselves) must pass through unchanged.
    raw_values = [-215.0, -180.00001, -180.0, -150.0, -30.0, 0.0, 25.0, 98.0, 180.0, 180.00001, 215.0]
    clamped_values = [-180.0, -180.0, -180.0, -150.0, -30.0, 0.0, 25.0, 98.0, 180.0, 180.0, 180.0]
    function_out = gdet_cr_ind.enforce_boundaries_lon(np.array(raw_values))
    np.testing.assert_allclose(function_out, np.array(clamped_values), rtol=0.0, atol=1e-08)
def test_enforce_boundaries_lat():
    # Out-of-range latitudes must be clamped to +/-90.0; in-range values
    # (including the boundaries themselves) must pass through unchanged.
    raw_values = [-215.0, -90.00001, -90.0, -30.0, 0.0, 25.0, 78.0, 90.0, 90.00001, 215.0]
    clamped_values = [-90.0, -90.0, -90.0, -30.0, 0.0, 25.0, 78.0, 90.0, 90.0, 90.0]
    function_out = gdet_cr_ind.enforce_boundaries_lat(np.array(raw_values))
    np.testing.assert_allclose(function_out, np.array(clamped_values), rtol=0.0, atol=1e-08)
def test_swell_cells_with_buffer():
    # Test that the function terminates if the input offsets are negative:
    assert gdet_cr_ind.swell_cells_with_buffer([], -0.5, 2.3) is None
    assert gdet_cr_ind.swell_cells_with_buffer([], 0.5, -2.3) is None
    assert gdet_cr_ind.swell_cells_with_buffer([], -0.5, -2.3) is None
    # Build a GeoPandas DataFrame for testing: a 1x1-degree cell plus a slightly
    # narrower cell directly south of it.
    spacing = 1.0
    lon_w = 12.0
    lat_s = 40.0
    geoms = [
        Polygon(
            [
                (lon_w, lat_s),
                (lon_w + spacing, lat_s),
                (lon_w + spacing, lat_s + spacing),
                (lon_w, lat_s + spacing),
            ]
        ),
        Polygon(
            [
                (lon_w + 0.1, lat_s - spacing),
                (lon_w + spacing, lat_s - spacing),
                (lon_w + spacing, lat_s),
                (lon_w + 0.1, lat_s),
            ]
        ),
    ]
    centr_lon = [lon_w + spacing / 2.0, lon_w + spacing / 2.0]
    centr_lat = [lat_s + spacing / 2.0, lat_s - spacing / 2.0]
    # Derive the bounding coordinates of each cell from the polygon bounds,
    # which are (minx, miny, maxx, maxy):
    longitudes_w = []
    longitudes_e = []
    latitudes_s = []
    latitudes_n = []
    for poly in geoms:
        longitudes_w.append(poly.bounds[0])
        longitudes_e.append(poly.bounds[2])
        latitudes_s.append(poly.bounds[1])
        latitudes_n.append(poly.bounds[3])
    d = {
        "id": np.array(["ID_%s" % (i) for i in range(1, len(geoms) + 1)]),
        "LONGITUDE": centr_lon,
        "LATITUDE": centr_lat,
        "lon_w": longitudes_w,
        "lon_e": longitudes_e,
        "lat_s": latitudes_s,
        "lat_n": latitudes_n,
        "geometry": geoms,
    }
    dummy_gdf = gpd.GeoDataFrame(d, geometry="geometry", crs="EPSG:4326")
    offset_EW = 0.2
    offset_NS = 0.1
    # Expected coordinates: each cell grows by offset_EW towards the west/east
    # and by offset_NS towards the south/north:
    expected_outputs = {}
    expected_outputs["lon_w"] = np.array([lon_w - offset_EW, lon_w + 0.1 - offset_EW])
    expected_outputs["lon_e"] = np.array(
        [lon_w + spacing + offset_EW, lon_w + spacing + offset_EW]
    )
    expected_outputs["lat_s"] = np.array([lat_s - offset_NS, lat_s - spacing - offset_NS])
    expected_outputs["lat_n"] = np.array([lat_s + spacing + offset_NS, lat_s + offset_NS])
    # Expected geometries: the original rectangles expanded by the offsets:
    expected_geometries = np.array(
        [
            Polygon(
                [
                    (lon_w - offset_EW, lat_s - offset_NS),
                    (lon_w + spacing + offset_EW, lat_s - offset_NS),
                    (lon_w + spacing + offset_EW, lat_s + spacing + offset_NS),
                    (lon_w - offset_EW, lat_s + spacing + offset_NS),
                ]
            ),
            Polygon(
                [
                    (lon_w + 0.1 - offset_EW, lat_s - spacing - offset_NS),
                    (lon_w + spacing + offset_EW, lat_s - spacing - offset_NS),
                    (lon_w + spacing + offset_EW, lat_s + offset_NS),
                    (lon_w + 0.1 - offset_EW, lat_s + offset_NS),
                ]
            ),
        ]
    )
    function_gdf = gdet_cr_ind.swell_cells_with_buffer(dummy_gdf, offset_EW, offset_NS)
    # Compare the coordinate columns numerically:
    for key in expected_outputs.keys():
        np.testing.assert_allclose(
            function_gdf[key].values, expected_outputs[key], rtol=0.0, atol=1e-08
        )
    # Compare the geometries one by one (shapely polygon equality):
    for i in range(len(expected_geometries)):
        assert expected_geometries[i] == function_gdf["geometry"].values[i]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment