Skip to content
Snippets Groups Projects

Resolve "Introduce a new filter for when rules should be run based on the source ID of a building."

Compare and Show latest version
5 files
+ 50
59
Compare changes
  • Side-by-side
  • Inline
Files
5
+ 9
39
@@ -29,14 +29,13 @@ class AbstractRule(abc.ABC):
structure that is or could be necessary for all rules.
Args:
filters_dict (dict):
String representing the source_id_filter for the rule. May contain more than one
filter input, such as the `source_id` and the `geographic_filter_boundary`.
rule_source_ids (list):
A list of all valid source IDs for the rule.
geographic_filter_boundary (str):
Boundary polygon in WKT format defining the geographic boundary of the rule.
"""
def __init__(self, filters_dict: dict = None):
rule_source_ids, geographic_filter_boundary = self.parse_filters(filters_dict)
def __init__(self, rule_source_ids: list = None, geographic_filter_boundary: str = None):
self.geographic_filter_boundary_geometry = None
if geographic_filter_boundary is not None:
@@ -44,9 +43,6 @@ class AbstractRule(abc.ABC):
geographic_filter_boundary
)
if rule_source_ids is not None:
rule_source_ids = [int(source_id) for source_id in rule_source_ids]
self.rule_source_ids = rule_source_ids
@abc.abstractmethod
@@ -68,7+64,7 @@
source_file (str):
Name of the file where the data came from. This can be used to distinguish
how to parse the data.
"""
raise NotImplementedError(
"The `parse_data` method is not implemented for this class, but you are trying "
@@ -76,7+72,7 @@
+ "please implement this class first."
)
@staticmethod
def parse_filters(filters_dict: dict) -> tuple | None:
"""
Parses the source_id_filter string input from the rule's XML file or the file attached
to it. Takes a single string separated by semicolons and splits it into the different
filter inputs it includes.
Args:
filters_dict (dict):
List with all the filter inputs.
Returns:
tuple | None:
A list of one or more strings each of which represents a different filter, or
None.
"""
if bool(filters_dict):
if "source_id" in filters_dict:
source_ids = filters_dict["source_id"].split(",")
else:
source_ids = None
return source_ids, filters_dict["boundary"]
return None, None
@staticmethod
def read_geometry_from_wkt(geographic_filter_boundary):
"""
@@ -128,0+98,0 @@
):
"""
Applies a spatial filter to ensure that only buildings within the provided geographic
polygon are processed and a source filter to ensure source specific rules only run
polygon are processed and a source filter to ensure source-specific rules only run
for inputs with a matching source ID.
Args:
@@ -137,17 +107,17 @@ class AbstractRule(abc.ABC):
latitude (float):
Latitude of the building being processed.
source_id (int):
Integer representing the data's source ID, and determines the rule's relevance.
Integer representing the data-source ID, and determines the rule's relevance.
Returns:
bool:
True if all tests pass and False if at least one doesn't pass.
`True` if all tests pass and `False` if at least one doesn't pass.
"""
# Geographic filter, passes if the inputs coordinates lie within the boundary geometry.
if (
self.geographic_filter_boundary_geometry is not None
and not self.geographic_filter_boundary_geometry.contains(
and self.geographic_filter_boundary_geometry.disjoint(
shapely.geometry.Point(longitude, latitude)
)
):
Loading