Commit 93562cbe authored by Marius Kriegerowski's avatar Marius Kriegerowski
Browse files

GEM rule refactoring

parent 3dce5c7b
......@@ -24,11 +24,12 @@ import argparse
from rabotnik import Rabotnik, Assembly
from rabotnik.bus import MessageBus
# These need to be absolute imports. Otherwise, celery will fail to load them
from rabotnikobm.rules import GetBuilding
from rabotnikobm.rules import GetFloorspace
from rabotnikobm.rules import GetBuildingLandUse
from rabotnikobm.rules import GetPointsInBuilding
from rabotnikobm.rules.gem_occupancy.mapping import OccupancyMapper
from .rules import GetBuilding
from .rules import GetFloorspace
from .rules import GetBuildingLandUse
from .rules import GetPointsInBuilding
logger = logging.getLogger(__name__)
......@@ -65,11 +66,11 @@ async def start_rabotnik_gem_occupancy(
message_bus, storage_consumer, storage_contributor, n_processes_max
):
fn_mapping = "building_and_PoIs_tags.csv"
occupancy_mapper = OccupancyMapper.from_csv(fn=fn_mapping)
rules = [
GetBuilding(storage_consumer, storage_contributor),
GetFloorspace(storage_consumer, storage_contributor),
GetBuildingLandUse(storage_consumer),
GetPointsInBuilding(storage_consumer),
GetBuildingOccupancy(storage_consumer, occupancy_mapper=occupancy_mapper),
]
rules = Assembly(rules=rules, n_processes_max=n_processes_max)
......@@ -84,7 +85,6 @@ async def start_rabotnik(args):
# await start_rabotnik_obm(
# message_bus, storage_consumer, storage_contributor, args.n_processes_max
# )
await
def main():
......
......@@ -20,13 +20,16 @@ from __future__ import annotations
import csv
import logging
from pathlib import Path
from rabotnik import Rule
from rabotnik.storages.base import StorageBase
from rabotnikobm.rules.gem_occupancy.mapping import OccupancyMapper, TagStatistics, GEMTag
logger = logging.getLogger()
logger = logging.getLogger(__name__)
MODULE_PATH = Path(__file__).parent
class TagResult(Exception):
......@@ -54,7 +57,7 @@ class OverridingOccupancy:
raise TagResult(candidate)
@classmethod
def from_csv(cls, fn: str) -> OverridingOccupancy:
def from_csv(cls, fn: Path) -> OverridingOccupancy:
"""Read a csv and initialize a `OverridingOccupancy` instance."""
with open(fn) as csvfile:
......@@ -64,25 +67,32 @@ class OverridingOccupancy:
return cls(mapping=occupancy_mapping)
@classmethod
def overriding_occupancies(cls):
fn = MODULE_PATH / "data/overriding_occupancies.csv"
return cls.from_csv(fn)
def check_exactly_one_unique_tag(occupancies: TagStatistics):
"""raises `TagResult` if there is exactly one unique group"""
if occupancies.exactly_one_unique_tag():
raise TagResult(occupancies.tags[0])
class RuleOneUniqueTag:
def apply(self, occupancies: TagStatistics) -> None:
"""raises `TagResult` if there is exactly one unique group"""
if occupancies.exactly_one_unique_tag():
raise TagResult(occupancies.tags[0])
def check_one_unique_sub_group(occupancies: TagStatistics):
print(occupancies)
print(occupancies.exactly_one_unique_tag())
print(occupancies.number_of_unique_groups)
if occupancies.number_of_unique_groups == 1 and occupancies.number_of_unique_subgroups <= 2:
raise TagResult(
GEMTag(
group=occupancies.unique_group,
sub_group=occupancies.unique_sub_group,
sub_sub_group=occupancies.unique_sub_sub_group,
class RulesOneUniqueSubGroup:
def apply(self, occupancies: TagStatistics) -> None:
if (
occupancies.number_of_unique_groups == 1
and occupancies.number_of_unique_subgroups <= 2
):
raise TagResult(
GEMTag(
group=occupancies.unique_group,
sub_group=occupancies.unique_sub_group,
sub_sub_group=occupancies.unique_sub_sub_group,
)
)
)
class GetBuildingOccupancy(Rule):
......@@ -92,6 +102,10 @@ class GetBuildingOccupancy(Rule):
self.storage = storage
self.occupancy_mapper = occupancy_mapper
self.candidates = [
OverridingOccupancy.overriding_occupancies(),
]
async def evaluate(self, payload: dict) -> list[str]:
building_id = payload["building_id"]
logger.debug("Processing building: %s", building_id)
......@@ -100,5 +114,6 @@ class GetBuildingOccupancy(Rule):
)
occupancies = self.occupancy_mapper.apply(tags)
logger.debug("occupancies %s: %s", building_id, occupancies)
return occupancies
......@@ -18,6 +18,7 @@
from __future__ import annotations
import re
import csv
from pathlib import Path
from collections import defaultdict, namedtuple, Counter
from dataclasses import dataclass
from typing import Optional
......@@ -29,6 +30,9 @@ from typing import Optional
REGEX_GEM_GROUP = r"(?P<group>^[A-Z]*)(?P<sub_group>[0-9]*)(?P<sub_sub_group>[A-Z]*$)"
MODULE_PATH = Path(__file__).parent
class GEMTagException(Exception):
pass
......@@ -107,7 +111,8 @@ class TagStatistics(
@property
def unique_group(self) -> str:
"""Get the unique group ID. In case of multiple available groups, raise GEMTagException."""
"""Get the unique group ID. In case of multiple available groups,
raises GEMTagException."""
if self.number_of_unique_groups > 1:
raise GEMTagException(f"{self} has more than unique group.")
......@@ -116,7 +121,8 @@ class TagStatistics(
@property
def unique_sub_group(self) -> Optional[int]:
"""Get the unique group ID. In case of multiple available groups, raise GEMTagException."""
"""Get the unique group ID. In case of multiple available groups,
raises GEMTagException."""
if len(self.subgroups) > 1:
raise GEMTagException(f"{self} has more than unique sub_group.")
......@@ -128,7 +134,8 @@ class TagStatistics(
@property
def unique_sub_sub_group(self) -> Optional[str]:
"""Get the unique group ID. In case of multiple available groups, raise GEMTagException."""
"""Get the unique group ID. In case of multiple available groups,
raises GEMTagException."""
if len(self.subsubgroups) > 1:
raise GEMTagException(f"{self} has more than unique sub_sub_group.")
......@@ -205,19 +212,22 @@ class OccupancyMapper:
return occupancies
@classmethod
def read_csv(cls, fn: str) -> list[dict[str, str]]:
"""Read content from csv into list of dicts."""
def from_csv(cls, fn: Path) -> OccupancyMapper:
"""Read a csv and initialize a `OccupancyMapper`."""
with open(fn) as csvfile:
mapping = list(csv.DictReader(csvfile))
occupancy_mapping = list(csv.DictReader(csvfile))
return mapping
@classmethod
def from_csv(cls, fn: str) -> OccupancyMapper:
"""Read a csv and initialize a `OccupancyMapper`."""
occupancy_mapping = cls.read_csv(fn)
occupancy_mapping_grouped = group_tags(occupancy_mapping)
return cls(mapping=occupancy_mapping_grouped)
@classmethod
def landuse_mapper(cls: OccupancyMapper):
fn_mapping = MODULE_PATH / "data/landuse_tags.csv"
return cls.from_csv(fn_mapping)
@classmethod
def building_poi_mapper(cls: OccupancyMapper):
fn_mapping = MODULE_PATH / "data/building_and_PoIs_tags.csv"
return cls.from_csv(fn_mapping)
......@@ -51,21 +51,14 @@ def storage_consumer(pytestconfig):
@pytest.fixture
def gem_data_path(pytestconfig):
yield pytestconfig.rootpath / "rabotnikobm/rules/gem_occupancy/data"
@pytest.fixture
def building_poi_mapper(gem_data_path):
fn_mapping = gem_data_path / "building_and_PoIs_tags.csv"
mapper = OccupancyMapper.from_csv(fn_mapping)
def building_poi_mapper():
mapper = OccupancyMapper.building_poi_mapper()
yield mapper
@pytest.fixture
def landuse_mapper(gem_data_path):
fn_mapping = gem_data_path / "landuse_tags.csv"
mapper = OccupancyMapper.from_csv(fn_mapping)
def landuse_mapper():
mapper = OccupancyMapper.landuse_mapper()
yield mapper
......
......@@ -20,20 +20,17 @@ import pytest
from rabotnikobm.rules.gem_occupancy.get_building_occupancy import (
OverridingOccupancy,
GetBuildingOccupancy,
check_exactly_one_unique_tag,
RuleOneUniqueTag,
TagStatistics,
TagResult,
check_one_unique_sub_group,
RulesOneUniqueSubGroup,
)
from rabotnikobm.rules.gem_occupancy.mapping import GEMTag
@pytest.fixture()
def overriding_occupancies(gem_data_path):
fn_mapping = gem_data_path / "overriding_occupancies.csv"
overriding_occupancies = OverridingOccupancy.from_csv(fn_mapping)
yield overriding_occupancies
def overriding_occupancies():
yield OverridingOccupancy.overriding_occupancies()
@pytest.mark.requires_storage
......@@ -46,15 +43,14 @@ async def test_rule_get_building_occupancy(storage_consumer, building_poi_mapper
assert result == ["ASS4", "UNDECIDABLE"]
def test_overriding_occupancy(overriding_occupancies):
def test_overriding_occupancy(overriding_occupancies: OverridingOccupancy):
"""Rule #1"""
demo_tags = TagStatistics.from_strings(["ASS1", "COM10"])
try:
occupancy = overriding_occupancies.apply(demo_tags)
overriding_occupancies.apply(demo_tags)
except TagResult as e:
occupancy = e.tag
assert occupancy == GEMTag.from_string("COM10")
assert occupancy == GEMTag.from_string("COM10")
def test_overriding_occupancy_unknown():
......@@ -65,20 +61,20 @@ def test_overriding_occupancy_unknown():
def test_unique_tags():
"""Rule"""
rule = RuleOneUniqueTag()
tags = TagStatistics.from_strings(["COM", "COM"])
with pytest.raises(TagResult) as e:
check_exactly_one_unique_tag(tags)
rule.apply(tags)
assert e.value.tag.group == "COM"
def test_rule2():
"""Rule #2"""
rule = RulesOneUniqueSubGroup()
tags = TagStatistics.from_strings(["RES", "RES1"])
with pytest.raises(TagResult) as e:
check_one_unique_sub_group(tags)
rule.apply(tags)
assert e.value.tag.group == "RES"
assert e.value.tag.sub_group == 1
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment