Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • globaldynamicexposure/libraries/rule-lib
1 result
Show changes
Commits on Source (2)
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
import abc import abc
from typing import Union
import shapely import shapely
...@@ -29,24 +28,34 @@ class AbstractRule(abc.ABC): ...@@ -29,24 +28,34 @@ class AbstractRule(abc.ABC):
structure that is or could be necessary for all rules. structure that is or could be necessary for all rules.
Args: Args:
rule_source_ids (list):
A list of all valid source IDs for the rule. This is provided by the rule's XML file
if relevant.
geographic_filter_boundary (str): geographic_filter_boundary (str):
Boundary polygon in WKT format defining the geographic boundary of the rule. Boundary polygon in WKT format defining the geographic boundary of the rule.
""" """
def __init__(self, geographic_filter_boundary: str = None): def __init__(
self,
rule_source_ids: list | None = None,
geographic_filter_boundary: str | None = None,
):
self.geographic_filter_boundary_geometry = None self.geographic_filter_boundary_geometry = None
if geographic_filter_boundary is not None: if geographic_filter_boundary is not None:
self.geographic_filter_boundary_geometry = self.read_geometry_from_wkt( self.geographic_filter_boundary_geometry = shapely.from_wkt(
geographic_filter_boundary geographic_filter_boundary
) )
self.rule_source_ids = rule_source_ids
@abc.abstractmethod @abc.abstractmethod
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
""" """
The `__call__` method implements the core process of the rule. The `__call__` method implements the core process of the rule.
""" """
def parse_data(self, data: Union[str, bytes], source_file: str): def parse_data(self, data: str | bytes, source_file: str):
""" """
The `parse_data` method can be used to parse files that are attached to the The `parse_data` method can be used to parse files that are attached to the
rule. As file types can widely vary, this method needs to be implemented for each rule. As file types can widely vary, this method needs to be implemented for each
...@@ -67,41 +76,43 @@ class AbstractRule(abc.ABC): ...@@ -67,41 +76,43 @@ class AbstractRule(abc.ABC):
+ "please implement this class first." + "please implement this class first."
) )
@staticmethod def filter(
def read_geometry_from_wkt(geographic_filter_boundary): self,
""" longitude: float | None = None,
Converts a WKT boundary into a Shapely polygon. latitude: float | None = None,
source_id: int | None = None,
Args: *args,
geographic_filter_boundary (str): **kwargs
Boundary polygon in WKT format defining the geographic boundary of the rule. ):
Returns:
A Shapely polygon based on the geometry.
"""
return shapely.from_wkt(geographic_filter_boundary)
def filter(self, longitude=None, latitude=None, *args, **kwargs):
""" """
Applies a spatial filter to ensure that only buildings within the provided geographic Applies a spatial filter to ensure that only buildings within the provided geographic
polygon are processed. polygon are processed and a source filter to ensure source-specific rules only run
for buildings with a matching source ID.
Args: Args:
longitude (float): longitude (float):
Longitude of the building being processed. Longitude of the building being processed.
latitude (float): latitude (float):
Latitude of the building being processed. Latitude of the building being processed.
source_id (int):
Integer representing the building-source ID.
Returns: Returns:
True if the point is in the boundary and False if it isn't, which skips the rule. bool:
True if all tests pass and False if at least one does not pass.
""" """
if self.geographic_filter_boundary_geometry is None: # Geographic filter, passes if the building's coordinates lie within the boundary
return True # geometry.
elif self.geographic_filter_boundary_geometry.contains( if (
shapely.geometry.Point(longitude, latitude) self.geographic_filter_boundary_geometry is not None
and self.geographic_filter_boundary_geometry.disjoint(
shapely.geometry.Point(longitude, latitude)
)
): ):
return True
else:
return False return False
# Source ID filter, passes if the rule's source ID and the building-source ID match.
if self.rule_source_ids is not None and source_id not in self.rule_source_ids:
return False
return True
...@@ -74,6 +74,7 @@ class Rule: ...@@ -74,6 +74,7 @@ class Rule:
dependencies: List = None, dependencies: List = None,
additional_data: List = None, additional_data: List = None,
db_name: str = None, db_name: str = None,
rule_source_ids: list = None,
geographic_filter_boundary: str = None, geographic_filter_boundary: str = None,
): ):
if dependencies is None: if dependencies is None:
...@@ -102,7 +103,8 @@ class Rule: ...@@ -102,7 +103,8 @@ class Rule:
# Initialize the function, so it can be run. # Initialize the function, so it can be run.
self.function = function_definition[self.name]( self.function = function_definition[self.name](
geographic_filter_boundary=geographic_filter_boundary rule_source_ids=rule_source_ids,
geographic_filter_boundary=geographic_filter_boundary,
) )
# Add extra data that is delivered with the rule. # Add extra data that is delivered with the rule.
...@@ -144,7 +146,6 @@ class Rule: ...@@ -144,7 +146,6 @@ class Rule:
<param type="float"> test_parameter </param> <param type="float"> test_parameter </param>
</input> </input>
<function> squared=test_parameter * test_parameter </function> <function> squared=test_parameter * test_parameter </function>
<filter type="bbox">POLYGON ((60 -10, 60 0, 30 0, 30 -10, 60 -10))</filter>
<dependencies> <dependencies>
<dependency name="DependentOnRule"> <dependency name="DependentOnRule">
</dependencies> </dependencies>
...@@ -152,6 +153,9 @@ class Rule: ...@@ -152,6 +153,9 @@ class Rule:
<output> <output>
<param type="float"> squared </param> <param type="float"> squared </param>
</output> </output>
<filters>
<filter type='boundary'>POLYGON ((60 39, 60 78, 10 78, 10 39, 60 39))</filter>
</filters>
</rule> </rule>
Args: Args:
...@@ -178,17 +182,27 @@ class Rule: ...@@ -178,17 +182,27 @@ class Rule:
inputs = [param.text for param in rule.find("input").findall("param")] inputs = [param.text for param in rule.find("input").findall("param")]
outputs = [param.text for param in rule.find("output").findall("param")] outputs = [param.text for param in rule.find("output").findall("param")]
# Get filter boundary if it exists. # Get filters if they exist.
rule_source_ids = None
geographic_filter_boundary = None geographic_filter_boundary = None
rule_filter = rule.find("filter") rule_filters = rule.find("filters")
if rule_filter is not None: if rule_filters is not None:
filter_filepath = rule_filter.get("filepath") for param in rule_filters.findall("filter"):
if filter_filepath is None: key = param.attrib.get("type")
geographic_filter_boundary = rule_filter.text filter_filepath = param.get("filepath")
else: if filter_filepath is None:
geographic_filter_boundary = file_directory[filter_filepath] filter_content = param.text if param.text is not None else ""
if type(geographic_filter_boundary) is bytes: else:
geographic_filter_boundary = geographic_filter_boundary.decode("utf-8") filter_content = file_directory[filter_filepath]
if type(filter_content) is bytes:
filter_content = filter_content.decode("utf-8")
if key == "source_ids":
filter_content = filter_content.split(",")
filter_content = [int(source_id) for source_id in filter_content]
rule_source_ids = filter_content
elif key == "boundary":
geographic_filter_boundary = filter_content
# Add dependencies if they exist. # Add dependencies if they exist.
dependencies = rule.find("dependencies") dependencies = rule.find("dependencies")
...@@ -216,6 +230,7 @@ class Rule: ...@@ -216,6 +230,7 @@ class Rule:
dependencies, dependencies,
files, files,
db_name, db_name,
rule_source_ids,
geographic_filter_boundary, geographic_filter_boundary,
) )
......
...@@ -133,6 +133,12 @@ class RuleHandler: ...@@ -133,6 +133,12 @@ class RuleHandler:
if rule_result is not None: if rule_result is not None:
args.update(rule_result) args.update(rule_result)
# Check if any command is passed using the `execute_command` key. If set to
# `CANCEL`, terminate the current loop, stopping further processing.
execute_command = args.get("execute_command", "")
if execute_command.upper() == "CANCEL":
break
# Close all database connections. # Close all database connections.
for database in self.databases.values(): for database in self.databases.values():
database.close() database.close()
......
...@@ -30,13 +30,17 @@ logger = logging.getLogger() ...@@ -30,13 +30,17 @@ logger = logging.getLogger()
@pytest.fixture @pytest.fixture
def zip_filepath_without_xml(): def zip_filepath_without_xml():
"""
Returns a filepath to a rule directory without an XML definition.
"""
# Create a temporary ZIP file from the `tests/data/zip_filepath_without_xml` directory. # Create a temporary ZIP file from the `tests/data/zip_filepath_without_xml` directory.
tmp_dir = tempfile.mkdtemp() tmp_dir = tempfile.mkdtemp()
file_path = os.path.join(tmp_dir, "zip_filepath_without_xml") filepath = os.path.join(tmp_dir, "zip_filepath_without_xml")
shutil.make_archive(file_path, "zip", "tests/data/zip_filepath_without_xml") shutil.make_archive(filepath, "zip", "tests/data/zip_filepath_without_xml")
# Yield filepath. # Yield filepath.
yield file_path + ".zip" yield filepath + ".zip"
# Remove temporary ZIP file. # Remove temporary ZIP file.
shutil.rmtree(tmp_dir, ignore_errors=True) shutil.rmtree(tmp_dir, ignore_errors=True)
...@@ -44,13 +48,17 @@ def zip_filepath_without_xml(): ...@@ -44,13 +48,17 @@ def zip_filepath_without_xml():
@pytest.fixture @pytest.fixture
def floorspace_rule(): def floorspace_rule():
"""
Returns the `FloorspaceRule` rule.
"""
# Create a temporary ZIP file from the `tests/data/floorspace` directory # Create a temporary ZIP file from the `tests/data/floorspace` directory
tmp_dir = tempfile.mkdtemp() tmp_dir = tempfile.mkdtemp()
file_path = os.path.join(tmp_dir, "floorspace") filepath = os.path.join(tmp_dir, "floorspace")
shutil.make_archive(file_path, "zip", "tests/data/floorspace") shutil.make_archive(filepath, "zip", "tests/data/floorspace")
# Yield rule # Yield rule
yield Rule.load_rule_from_zip(open(file_path + ".zip", "rb")) yield Rule.load_rule_from_zip(open(filepath + ".zip", "rb"))
# Remove temporary ZIP file # Remove temporary ZIP file
shutil.rmtree(tmp_dir, ignore_errors=True) shutil.rmtree(tmp_dir, ignore_errors=True)
...@@ -58,13 +66,17 @@ def floorspace_rule(): ...@@ -58,13 +66,17 @@ def floorspace_rule():
@pytest.fixture @pytest.fixture
def apartments_rule(): def apartments_rule():
"""
Returns the `ApartmentsRule` rule.
"""
# Create a temporary ZIP file from the `tests/data/apartments` directory # Create a temporary ZIP file from the `tests/data/apartments` directory
tmp_dir = tempfile.mkdtemp() tmp_dir = tempfile.mkdtemp()
file_path = os.path.join(tmp_dir, "apartments") filepath = os.path.join(tmp_dir, "apartments")
shutil.make_archive(file_path, "zip", "tests/data/apartments") shutil.make_archive(filepath, "zip", "tests/data/apartments")
# Yield rule # Yield rule
yield Rule.load_rule_from_zip(open(file_path + ".zip", "rb")) yield Rule.load_rule_from_zip(open(filepath + ".zip", "rb"))
# Remove temporary ZIP file # Remove temporary ZIP file
shutil.rmtree(tmp_dir, ignore_errors=True) shutil.rmtree(tmp_dir, ignore_errors=True)
...@@ -72,13 +84,17 @@ def apartments_rule(): ...@@ -72,13 +84,17 @@ def apartments_rule():
@pytest.fixture @pytest.fixture
def apartment_size_rule(): def apartment_size_rule():
"""
Returns the `ApartmentSizeRule` rule.
"""
# Create a temporary ZIP file from the `tests/data/apartment_size` directory # Create a temporary ZIP file from the `tests/data/apartment_size` directory
tmp_dir = tempfile.mkdtemp() tmp_dir = tempfile.mkdtemp()
file_path = os.path.join(tmp_dir, "apartment_size") filepath = os.path.join(tmp_dir, "apartment_size")
shutil.make_archive(file_path, "zip", "tests/data/apartment_size") shutil.make_archive(filepath, "zip", "tests/data/apartment_size")
# Yield rule # Yield rule
yield Rule.load_rule_from_zip(open(file_path + ".zip", "rb")) yield Rule.load_rule_from_zip(open(filepath + ".zip", "rb"))
# Remove temporary ZIP file # Remove temporary ZIP file
shutil.rmtree(tmp_dir, ignore_errors=True) shutil.rmtree(tmp_dir, ignore_errors=True)
...@@ -86,17 +102,59 @@ def apartment_size_rule(): ...@@ -86,17 +102,59 @@ def apartment_size_rule():
@pytest.fixture @pytest.fixture
def building_tags_rule(): def building_tags_rule():
"""
Returns the `TagRule` rule.
"""
yield Rule.load_rule_from_xml(open("./tests/data/building_tags.xml", "r").read()) yield Rule.load_rule_from_xml(open("./tests/data/building_tags.xml", "r").read())
@pytest.fixture @pytest.fixture
def boundary_filter_rule(): def boundary_filter_rule():
"""
Returns the `BoundaryTestRule` rule.
"""
tmp_dir = tempfile.mkdtemp()
filepath = os.path.join(tmp_dir, "boundary_filter")
shutil.make_archive(filepath, "zip", "tests/data/boundary_filter")
# Yield rule.
yield Rule.load_rule_from_zip(open(filepath + ".zip", "rb"))
# Remove temporary ZIP file.
shutil.rmtree(tmp_dir, ignore_errors=True)
@pytest.fixture
def source_id_filter_rule():
    """
    Returns the `SourceIDFilterRule` rule.

    The rule is loaded from a temporary ZIP archive built from the
    `tests/data/source_id_filter` directory. The archive's file handle is closed and the
    temporary directory is removed when the fixture is torn down.
    """

    tmp_dir = tempfile.mkdtemp()
    try:
        filepath = os.path.join(tmp_dir, "source_id_filter")
        shutil.make_archive(filepath, "zip", "tests/data/source_id_filter")

        # Keep the archive open while the test runs and close it on teardown, instead of
        # leaking the handle returned by a bare `open()`.
        with open(filepath + ".zip", "rb") as zip_file:
            # Yield rule.
            yield Rule.load_rule_from_zip(zip_file)
    finally:
        # Remove temporary ZIP file, also when building or loading the archive fails.
        shutil.rmtree(tmp_dir, ignore_errors=True)
@pytest.fixture
def execute_cancel_command_rule():
"""
Returns the `ExecuteCancelCommandRule` rule.
"""
tmp_dir = tempfile.mkdtemp() tmp_dir = tempfile.mkdtemp()
file_path = os.path.join(tmp_dir, "boundary_filter") filepath = os.path.join(tmp_dir, "execute_cancel_command")
shutil.make_archive(file_path, "zip", "tests/data/boundary_filter") shutil.make_archive(filepath, "zip", "tests/data/execute_cancel_command")
# Yield rule. # Yield rule.
yield Rule.load_rule_from_zip(open(file_path + ".zip", "rb")) yield Rule.load_rule_from_zip(open(filepath + ".zip", "rb"))
# Remove temporary ZIP file. # Remove temporary ZIP file.
shutil.rmtree(tmp_dir, ignore_errors=True) shutil.rmtree(tmp_dir, ignore_errors=True)
......
...@@ -5,5 +5,7 @@ ...@@ -5,5 +5,7 @@
<function filepath="boundary_filter.py"/> <function filepath="boundary_filter.py"/>
<output> <output>
</output> </output>
<filter>POLYGON ((60 39, 60 78, 10 78, 10 39, 60 39))</filter> <filters>
<filter type='boundary'>POLYGON ((60 39, 60 78, 10 78, 10 39, 60 39))</filter>
</filters>
</rule> </rule>
#!/usr/bin/env python3
# Copyright (C) 2024:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
from rulelib import AbstractRule
class ExecuteCancelCommandRule(AbstractRule):
    """
    Test rule whose only job is to return the `CANCEL` command.
    """

    def __call__(self, *args, **kwargs):
        """
        This rule cancels any further processing when passed to the `RuleHandler`.

        Returns:
            dict:
                A dictionary with the `execute_command` key set to `CANCEL`.
        """

        rule_result = {"execute_command": "CANCEL"}
        return rule_result
<?xml version="1.0" encoding="UTF-8" ?>
<!-- Test rule definition without input parameters, output parameters or filters. The rule's
implementation is referenced through the `filepath` of the `function` element. -->
<rule name="ExecuteCancelCommandRule" category="building">
<input/>
<function filepath="execute_cancel_command.py"/>
<output/>
</rule>
<?xml version="1.0" encoding="UTF-8" ?>
<!-- Test rule definition that combines a source-ID filter with a geographic boundary
filter. The rule has no input or output parameters. -->
<rule name="SourceIDFilterRule" category="building">
<input/>
<function filepath="source_id_filter.py"/>
<output/>
<filters>
<!-- Comma-separated list of integer source IDs for which the rule is valid. -->
<filter type='source_ids'>0</filter>
<!-- Boundary polygon in WKT format defining the geographic boundary of the rule. -->
<filter type='boundary'>POLYGON ((60 39, 60 78, 10 78, 10 39, 60 39))</filter>
</filters>
</rule>
#!/usr/bin/env python3
# Copyright (C) 2024:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
from rulelib import AbstractRule
class SourceIDFilterRule(AbstractRule):
    """
    Minimal rule used by the source-ID filter tests; calling it always returns `True`.
    """

    def __call__(self, *args, **kwargs):
        """
        Ignores all positional and keyword arguments and returns `True`.
        """

        return True
...@@ -139,12 +139,12 @@ def test_boundary_filter(boundary_filter_rule): ...@@ -139,12 +139,12 @@ def test_boundary_filter(boundary_filter_rule):
# Set of tests with initial geometry, one point inside, one outside and four points on the # Set of tests with initial geometry, one point inside, one outside and four points on the
# borders. # borders.
test_data = [ test_data = [
[60, 45, False], # Point in the boundary. [60, 45, True], # Point on the boundary.
[59, 45, True], # Point inside the polygon. [59, 45, True], # Point inside the polygon.
[61, 45, False], # Point outside the polygon. [61, 45, False], # Point outside the polygon.
[59, 39, False], # Point in the boundary. [59, 39, True], # Point on the boundary.
[10, 45, False], # Point in the boundary. [10, 45, True], # Point on the boundary.
[59, 78, False], # Point in the boundary. [59, 78, True], # Point on the boundary.
] ]
for lon, lat, result in test_data: for lon, lat, result in test_data:
filter_output = boundary_filter_rule.function.filter(longitude=lon, latitude=lat) filter_output = boundary_filter_rule.function.filter(longitude=lon, latitude=lat)
...@@ -173,8 +173,8 @@ def test_boundary_filter(boundary_filter_rule): ...@@ -173,8 +173,8 @@ def test_boundary_filter(boundary_filter_rule):
] ]
for geom_wkt, lon, lat, result in test_data: for geom_wkt, lon, lat, result in test_data:
boundary_filter_rule.function.geographic_filter_boundary_geometry = ( boundary_filter_rule.function.geographic_filter_boundary_geometry = shapely.from_wkt(
boundary_filter_rule.function.read_geometry_from_wkt(geom_wkt) geom_wkt
) )
filter_output = boundary_filter_rule.function.filter(longitude=lon, latitude=lat) filter_output = boundary_filter_rule.function.filter(longitude=lon, latitude=lat)
assert filter_output == result, ( assert filter_output == result, (
...@@ -210,9 +210,55 @@ def test_boundary_filter(boundary_filter_rule): ...@@ -210,9 +210,55 @@ def test_boundary_filter(boundary_filter_rule):
for geom_wkt, lon, lat, result in test_data: for geom_wkt, lon, lat, result in test_data:
with pytest.raises(result): with pytest.raises(result):
boundary_filter_rule.geographic_filter_boundary_geometry = ( boundary_filter_rule.geographic_filter_boundary_geometry = shapely.from_wkt(
boundary_filter_rule.function.read_geometry_from_wkt(geom_wkt) geom_wkt
) )
filter_output = boundary_filter_rule.function.filter(longitude=lon, latitude=lat) filter_output = boundary_filter_rule.function.filter(longitude=lon, latitude=lat)
assert filter_output == result, f"The input geometry `{geom_wkt}` or point " assert filter_output == result, f"The input geometry `{geom_wkt}` or point "
f"`{lon, lat}` have invalid values." f"`{lon, lat}` have invalid values."
def test_source_id_filter(source_id_filter_rule):
    """
    Test the source-ID filter, which ensures that rules that are only relevant for data from
    certain sources are only run for buildings from those sources. The filter is checked both
    in combination with the spatial boundary filter and on its own.
    """

    rule_function = source_id_filter_rule.function

    # Set of tests with the initial rule source ID, with valid and invalid source IDs and
    # coordinates. Each row holds the longitude, the latitude, the building-source ID and the
    # expected filter result.
    test_data = (
        (59, 45, 0, True),
        (59, 45, 1, False),
        (60, 45, 2, False),
        (59, 45, "3", False),
    )
    for lon, lat, source_id, expected_result in test_data:
        filter_output = rule_function.filter(longitude=lon, latitude=lat, source_id=source_id)
        assert filter_output == expected_result, (
            f"The result `{filter_output}` does not match the expected output "
            f"`{expected_result}` for coordinates {lon}, {lat} and source ID {source_id}."
        )

    # Set of tests without a geographic boundary, so only the source-ID filter is evaluated.
    # Each row holds the rule source IDs, the building-source ID and the expected filter
    # result.
    rule_function.geographic_filter_boundary_geometry = None
    test_data = (
        ([1], 0, False),
        ([1], 1, True),
        (["1"], 1, False),
        ([0], None, False),
        (None, 8, True),
        (None, None, True),
    )
    for rule_source_id, source_id, expected_result in test_data:
        rule_function.rule_source_ids = rule_source_id
        filter_output = rule_function.filter(source_id=source_id)
        assert filter_output == expected_result, (
            f"The result `{filter_output}` does not match the expected output "
            f"'{expected_result}' for source ID {source_id} and rule source ID "
            f"{rule_source_id}."
        )
...@@ -200,3 +200,25 @@ def test_fail_rule_dependencies(rule_handler, apartments_rule): ...@@ -200,3 +200,25 @@ def test_fail_rule_dependencies(rule_handler, apartments_rule):
with pytest.raises(ValueError): with pytest.raises(ValueError):
rule_handler.register(apartments_rule) rule_handler.register(apartments_rule)
def test_execute_cancel_command(rule_handler, execute_cancel_command_rule, floorspace_rule):
    """
    Test that the `eval` method of the `RuleHandler` stops processing when the value `CANCEL`
    is returned by the `ExecuteCancelCommandRule`.
    """

    # Baseline: before the `execute_cancel_command_rule` is registered, the floorspace
    # computation must still deliver its result. A fresh input dictionary is passed to every
    # `eval` call, exactly as two separate literals.
    baseline = rule_handler.eval({"footprint": 100.0, "floors": 2})
    assert baseline["area"] == 200.0

    rule_handler.register(execute_cancel_command_rule)

    # Make the `floorspace_rule` depend on the cancelling rule and register it again,
    # overwriting the previous registration.
    floorspace_rule.dependencies = ["ExecuteCancelCommandRule"]
    rule_handler.register(floorspace_rule, overwrite=True)

    # The processing must now be cancelled before the `floorspace_rule` is executed, so no
    # `area` is produced.
    cancelled = rule_handler.eval({"footprint": 100.0, "floors": 2})
    assert "area" not in cancelled