Skip to content
Snippets Groups Projects

Resolve "Introduce a new filter for when rules should be run based on the source ID of a building."

Compare and Show latest version
6 files
+ 129
145
Compare changes
  • Side-by-side
  • Inline
Files
6
+ 38
79
@@ -18,7 +18,6 @@
import abc
from typing import Union
import shapely
@@ -29,22 +28,26 @@ class AbstractRule(abc.ABC):
structure that is or could be necessary for all rules.
Args:
filter_string (str):
String representing the source_id_filter for the rule. May contain more than one filter
input, such as the `source_id` and the `geographic_filter_boundary`.
rule_source_ids (list):
A list of all valid source IDs for the rule. This is provided by the rule's XML file
if relevant.
geographic_filter_boundary (str):
Boundary polygon in WKT format defining the geographic boundary of the rule.
"""
def __init__(self, filter_string:str = None):
rule_source_id, geographic_filter_boundary = self.parse_filters(filter_string)
def __init__(
self,
rule_source_ids: list | None = None,
geographic_filter_boundary: str | None = None,
):
self.geographic_filter_boundary_geometry = None
if geographic_filter_boundary is not None:
self.geographic_filter_boundary_geometry = self.read_geometry_from_wkt(
self.geographic_filter_boundary_geometry = shapely.from_wkt(
geographic_filter_boundary
)
self.rule_source_id = rule_source_id
self.rule_source_ids = rule_source_ids
@abc.abstractmethod
def __call__(self, *args, **kwargs):
@@ -52,7 +55,7 @@ class AbstractRule(abc.ABC):
The `__call__` method implements the core process of the rule.
"""
def parse_data(self, data: Union[str, bytes], source_file: str):
def parse_data(self, data: str | bytes, source_file: str):
"""
The `parse_data` method can be used to parse files that are attached to the
rule. As file types can widely vary, this method needs to be implemented for each
@@ -73,52 +76,18 @@ class AbstractRule(abc.ABC):
+ "please implement this class first."
)
@staticmethod
def parse_filters(filter_string: str) -> tuple | None:
"""
Parses the source_id_filter string input from the rule's XML file or the file attached to it.
Takes a single string separated by semicolons and splits it into the different filter
inputs it includes.
Args:
filter_string (str):
String with all the filter inputs, separated by semicolons.
Returns:
tuple | None:
A list of one or more strings each of which represents a different filter, or
None.
"""
if filter_string is not None:
source_id, geographic_filter_boundary = filter_string.split(';')
return (int(source_id) if source_id != "" else None), geographic_filter_boundary
return None, None
@staticmethod
def read_geometry_from_wkt(geographic_filter_boundary):
"""
Converts a WKT boundary into a Shapely polygon.
Args:
geographic_filter_boundary (str):
Boundary polygon in WKT format defining the geographic boundary of the rule.
Returns:
shapely.geometry.polygon.Polygon:
A Shapely polygon based on the WKT string geometry.
"""
return shapely.from_wkt(geographic_filter_boundary)
def filter(self, longitude:float =None, latitude:float =None, source_id:int =None, *args,
**kwargs):
def filter(
self,
longitude: float | None = None,
latitude: float | None = None,
source_id: int | None = None,
*args,
**kwargs
):
"""
Applies a spatial filter to ensure that only buildings within the provided geographic
polygon are processed and a source filter to ensure source specific rules only run
for inputs with a matching source ID.
polygon are processed and a source filter to ensure source-specific rules only run
for buildings with a matching source ID.
Args:
longitude (float):
@@ -126,34 +95,24 @@ class AbstractRule(abc.ABC):
latitude (float):
Latitude of the building being processed.
source_id (int):
Integer representing the data source for which a rule is relevant.
Integer representing the building-source ID.
Returns:
bool:
True if all tests pass and False if at least one doesn't pass.
True if all tests pass and False if at least one does not pass.
"""
filter_list = []
# Geographic filter, passes if the inputs coordinates lie within the boundary geometry.
if self.geographic_filter_boundary_geometry is None:
filter_list.append(True)
elif self.geographic_filter_boundary_geometry.contains(
shapely.geometry.Point(longitude, latitude)
# Geographic filter, passes if the building's coordinates lie within the boundary
# geometry.
if (
self.geographic_filter_boundary_geometry is not None
and self.geographic_filter_boundary_geometry.disjoint(
shapely.geometry.Point(longitude, latitude)
)
):
filter_list.append(True)
else:
filter_list.append(False)
# Source ID filter, passes if the rules and the inputs source ID match.
if self.rule_source_id is None:
filter_list.append(True)
elif self.rule_source_id == source_id:
filter_list.append(True)
else:
filter_list.append(False)
if all(filter_list):
return True
return False
\ No newline at end of file
return False
# Source ID filter, passes if the rule's source ID and the building-source ID match.
if self.rule_source_ids is not None and source_id not in self.rule_source_ids:
return False
return True
Loading