Commit b6484603 authored by Marius Kriegerowski's avatar Marius Kriegerowski
Browse files

restructure the tag generation

parent 93562cbe
......@@ -21,6 +21,7 @@ from __future__ import annotations
import csv
import logging
from pathlib import Path
from typing import Optional
from rabotnik import Rule
from rabotnik.storages.base import StorageBase
......@@ -39,6 +40,14 @@ class TagResult(Exception):
self.tag = tag
def apply_rules(rules, occupancies: TagStatistics) -> Optional[GEMTag]:
for rule in rules:
try:
rule.apply(occupancies)
except TagResult as result:
return result.tag
class OverridingOccupancy:
"""Takes precedence over other mappings.
......@@ -74,14 +83,16 @@ class OverridingOccupancy:
class RuleOneUniqueTag:
def apply(self, occupancies: TagStatistics) -> None:
@staticmethod
def apply(occupancies: TagStatistics) -> None:
"""raises `TagResult` if there is exactly one unique group"""
if occupancies.exactly_one_unique_tag():
raise TagResult(occupancies.tags[0])
class RulesOneUniqueSubGroup:
def apply(self, occupancies: TagStatistics) -> None:
@staticmethod
def apply(occupancies: TagStatistics) -> None:
if (
occupancies.number_of_unique_groups == 1
and occupancies.number_of_unique_subgroups <= 2
......@@ -100,20 +111,38 @@ class GetBuildingOccupancy(Rule):
def __init__(self, storage: StorageBase, occupancy_mapper: OccupancyMapper):
self.storage = storage
self.occupancy_mapper = occupancy_mapper
self.mappers = [
occupancy_mapper.landuse_mapper(),
occupancy_mapper.building_poi_mapper(),
]
self.candidates = [
OverridingOccupancy.overriding_occupancies(),
RuleOneUniqueTag,
RulesOneUniqueSubGroup,
]
async def evaluate(self, payload: dict) -> list[str]:
async def evaluate(self, payload: dict) -> GEMTag:
building_id = payload["building_id"]
logger.debug("Processing building: %s", building_id)
tags = await self.storage.expect_one(
f"SELECT tags FROM osm_building_relations WHERE osm_id={building_id} AND index=0"
)
occupancies = self.occupancy_mapper.apply(tags)
# Mapping to GEM taxonomies
occupancies = []
for mapper in self.mappers:
occupancies.extend(mapper.apply(tags))
occupancies = TagStatistics.from_strings(occupancies)
# Apply rules to find ultimate taxonomy
occupancy = apply_rules(self.candidates, occupancies)
logger.debug("occupancies %s: %s", building_id, occupancy)
return occupancy
logger.debug("occupancies %s: %s", building_id, occupancies)
return occupancies
async def apply_mappings(self, tags):
for rule in self.candidates:
rule.apply(tags)
......@@ -40,7 +40,8 @@ async def test_rule_get_building_occupancy(storage_consumer, building_poi_mapper
payload = {"building_id": -6744517}
result = await rule.evaluate(payload=payload)
assert result == ["ASS4", "UNDECIDABLE"]
assert result == ["ASS4"]
def test_overriding_occupancy(overriding_occupancies: OverridingOccupancy):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment