Skip to content
Snippets Groups Projects

Resolve "Introduce a new filter for when rules should be run based on the source ID of a building."

Compare and Show latest version
6 files
+ 116
132
Compare changes
  • Side-by-side
  • Inline
Files
6
+ 30
76
@@ -18,7 +18,7 @@
import abc
from typing import Union
from typing import Union, Optional
import shapely
@@ -29,22 +29,26 @@ class AbstractRule(abc.ABC):
structure that is or could be necessary for all rules.
Args:
filter_string (str):
String representing the source_id_filter for the rule. May contain more than one
filter input, such as the `source_id` and the `geographic_filter_boundary`.
rule_source_ids (list):
A list of all valid source IDs for the rule. This is provided by the rule's XML file
if relevant.
geographic_filter_boundary (str):
Boundary polygon in WKT format defining the geographic boundary of the rule.
"""
def __init__(self, filter_string: str = None):
rule_source_id, geographic_filter_boundary = self.parse_filters(filter_string)
def __init__(
self,
rule_source_ids: Optional[list | None] = None,
geographic_filter_boundary: Optional[str | None] = None,
):
self.geographic_filter_boundary_geometry = None
if geographic_filter_boundary is not None:
self.geographic_filter_boundary_geometry = self.read_geometry_from_wkt(
self.geographic_filter_boundary_geometry = shapely.from_wkt(
geographic_filter_boundary
)
self.rule_source_id = rule_source_id
self.rule_source_ids = rule_source_ids
@abc.abstractmethod
def __call__(self, *args, **kwargs):
@@ -73,56 +77,17 @@ class AbstractRule(abc.ABC):
+ "please implement this class first."
)
@staticmethod
def parse_filters(filter_string: str) -> tuple | None:
"""
Parses the source_id_filter string input from the rule's XML file or the file attached to it.
Takes a single string separated by semicolons and splits it into the different filter
inputs it includes.
Args:
filter_string (str):
String with all the filter inputs, separated by semicolons.
Returns:
tuple | None:
A list of one or more strings each of which represents a different filter, or
None.
"""
if filter_string is not None:
source_id, geographic_filter_boundary = filter_string.split(";")
return (int(source_id) if source_id != "" else None), geographic_filter_boundary
return None, None
@staticmethod
def read_geometry_from_wkt(geographic_filter_boundary):
"""
Converts a WKT boundary into a Shapely polygon.
Args:
geographic_filter_boundary (str):
Boundary polygon in WKT format defining the geographic boundary of the rule.
Returns:
shapely.geometry.polygon.Polygon:
A Shapely polygon based on the WKT string geometry.
"""
return shapely.from_wkt(geographic_filter_boundary)
def filter(
self,
longitude: float = None,
latitude: float = None,
source_id: int = None,
longitude: Optional[float | None] = None,
latitude: Optional[float | None] = None,
source_id: Optional[int | None] = None,
*args,
**kwargs
):
"""
Applies a spatial filter to ensure that only buildings within the provided geographic
polygon are processed and a source filter to ensure source specific rules only run
polygon are processed and a source filter to ensure source-specific rules only run
for inputs with a matching source ID.
Args:
@@ -131,34 +96,23 @@ class AbstractRule(abc.ABC):
latitude (float):
Latitude of the building being processed.
source_id (int):
Integer representing the data source for which a rule is relevant.
Integer representing the input-source ID.
Returns:
bool:
True if all tests pass and False if at least one doesn't pass.
`True` if all tests pass and `False` if at least one doesn't pass.
"""
filter_list = []
# Geographic filter, passes if the inputs coordinates lie within the boundary geometry.
if self.geographic_filter_boundary_geometry is None:
filter_list.append(True)
elif self.geographic_filter_boundary_geometry.contains(
shapely.geometry.Point(longitude, latitude)
if (
self.geographic_filter_boundary_geometry is not None
and self.geographic_filter_boundary_geometry.disjoint(
shapely.geometry.Point(longitude, latitude)
)
):
filter_list.append(True)
else:
filter_list.append(False)
# Source ID filter, passes if the rules and the inputs source ID match.
if self.rule_source_id is None:
filter_list.append(True)
elif self.rule_source_id == source_id:
filter_list.append(True)
else:
filter_list.append(False)
if all(filter_list):
return True
return False
return False
# Source ID filter, passes if the rule's source ID and the input-source ID match.
if self.rule_source_ids is not None and source_id not in self.rule_source_ids:
return False
return True
Loading