add rules

# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import logging
import csv
from typing import Optional
from rabotnik import Rule
from rabotnik.storages.base import StorageBase
from rabotnikobm.rules.gem_occupancy.mapping import OccupancyMapper
from rabotnikobm.rules.gem_occupancy.mapping import OccupancyMapper, TagStatistics, GEMTag
# Module-level logger named after this module, so its records can be
# filtered and configured separately from the root logger.
logger = logging.getLogger(__name__)
class TagResult(Exception):
    """Exception that carries a tag out of a rule check.

    The check functions in this module raise it to hand a result to their
    caller.

    Attributes:
        tag: The tag passed to the constructor.
    """

    def __init__(self, tag: "GEMTag"):
        # Initialise the base class explicitly so `args` is always populated.
        super().__init__(tag)
        self.tag = tag
class OverridingOccupancy:
"""Takes precedence over other mappings.
return cls(mapping=occupancy_mapping)
def check_exactly_one_unique_tag(occupancies: "TagStatistics"):
    """Raise `TagResult` if there is exactly one unique tag.

    Args:
        occupancies: Tag statistics. Must provide `exactly_one_unique_tag`
            (a boolean attribute/property or a zero-argument method) and
            `tags`.

    Raises:
        TagResult: Carrying `occupancies.tags[0]` when the check succeeds.
    """
    only_one = occupancies.exactly_one_unique_tag
    # A bound method is always truthy, so call it when it is not a plain
    # value. This keeps the check correct whether the attribute is exposed
    # as a property or as a method.
    if callable(only_one):
        only_one = only_one()
    if only_one:
        raise TagResult(occupancies.tags[0])
def check_one_unique_sub_type(occupancies: "TagStatistics"):
    """Raise `TagResult` if there is one unique type and a single sub-group.

    Args:
        occupancies: Tag statistics. Must provide the method
            `exactly_one_unique_type()` and the attribute
            `number_of_sub_groups`.

    Raises:
        TagResult: When both conditions hold. The payload is still a
            placeholder string (see the TODO below).
    """
    if not occupancies.exactly_one_unique_type():
        return
    if occupancies.number_of_sub_groups != 1:
        return
    # TODO: raise the actual matching tag instead of this placeholder value.
    raise TagResult("ASDASDF")
class GetBuildingOccupancy(Rule):
"""A rule to map OSM tags to building occupancies."""
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import re
import csv
from collections import defaultdict
from collections import defaultdict, namedtuple, Counter
from dataclasses import dataclass
from typing import Optional
# Regular expression to extract group (0-N characters at beginning)
# and sub_group (0-N digits at end) of GEM tags
REGEX_GEM_GROUP = r"(?P<group>^[A-Z]*)(?P<sub_group>[0-9]*$)"


@dataclass
class GEMTag:
    """Represents a single GEM classification.

    Attributes:
        group (str): multi-character GEM classifier
        sub_group (int): multi-digit GEM classifier sub-group, `None` when
            the tag carries no trailing digits
    """

    group: str
    sub_group: Optional[int]

    @classmethod
    def from_string(cls, tag_as_string: str) -> "GEMTag":
        """Instantiate a `GEMTag` from a GEM occupancy string such as ``COM11``.

        Args:
            tag_as_string: Letters optionally followed by digits.

        Returns:
            The parsed tag. `sub_group` is `None` if there are no digits.

        Raises:
            AssertionError: If no GEM group can be extracted from
                `tag_as_string` (empty, digits only, or not matching the
                pattern at all).
        """
        matched = re.search(REGEX_GEM_GROUP, tag_as_string, re.IGNORECASE)
        # A string that does not match at all yields `None`; treat it like an
        # empty group so callers always get the same exception type.
        group = str(matched.group("group")) if matched is not None else ""
        if not group:
            # Raised explicitly (rather than via `assert`) so the check is
            # not stripped when Python runs with `-O`.
            raise AssertionError(f"could not extract a GEM group from {tag_as_string}")
        sub_group = matched.group("sub_group")
        return cls(group, int(sub_group) if sub_group != "" else None)

    def __hash__(self):
        # Hash on both fields so equal tags can be used as `Counter`/dict keys.
        return hash((self.group, self.sub_group))
class TagStatistics(
def exactly_one_unique_tag(self) -> bool:
    """Return True when `tags_counter` holds exactly one distinct tag."""
    # Count the distinct tags directly so the result does not depend on
    # whether `number_of_unique_tags` is a property or a plain method
    # (comparing a bound method with 1 would always be False).
    return len(self.tags_counter) == 1
def number_of_unique_tags(self) -> int:
    """Return the number of distinct tags held in `tags_counter`."""
    return len(self.tags_counter)
def number_of_unique_types(self) -> int:
    """Return the number of distinct types held in `types_counter`."""
    # Types are counted separately from full tags: two tags that differ
    # only in their sub-group still belong to the same type.
    return len(self.types_counter)
def number_of_unique_subtypes(self) -> int:
    """Return the number of distinct sub-groups held in `subtypes_counter`.

    NOTE(review): sub-groups are collected from `tag.sub_group`, which may be
    `None`; a missing sub-group therefore appears to count as its own entry.
    """
    return len(self.subtypes_counter)
def from_tags_string(cls, tags: str):
"""Analyse a string containing tags separated by `|` and do statistics."""
tags = [GEMTag.from_string(tag) for tag in tags.split("|")]
tags_counter = Counter(tags)
types = [ for tag in tags]
types_counter = Counter(types)
subtypes = [tag.sub_group for tag in tags]
subtypes_counter = Counter(subtypes)
return cls(
def group_tags(occupancy_mapping: list[dict[str, str]]) -> dict[str, dict[str, str]]:
def apply(self, osm_tags: dict[dict[str, str]]) -> list[str]:
"""Map `osm_tags` to lists of occupancies as defined in
building_and_POIs_tags.csv and landuse_tags.csv. Tags may be duplicated.
`building_and_POIs_tags.csv` and `landuse_tags.csv`. Tags may be duplicated.
osm_tags: list of OSM building tag strings associated with a specific building
from rabotnikobm.rules.gem_occupancy.get_building_occupancy import (
......@@ -42,12 +45,24 @@ async def test_rule_get_building_occupancy(storage_consumer, building_poi_mapper
def test_overriding_occupancy(overriding_occupancies):
    """Rule #1: for the tags `ASS1` and `COM10` the overriding mapping yields `COM10`."""
    demo_tags = ["ASS1", "COM10"]
    occupancy = overriding_occupancies.apply(demo_tags)
    assert occupancy == "COM10"
def test_overriding_occupancy_unknown(overriding_occupancies):
    """Rule #1: a tag unknown to the overriding mapping yields `None`."""
    demo_tags = ["unknown tag"]
    occupancy = overriding_occupancies.apply(demo_tags)
    assert occupancy is None
def test_unique_tags():
"""Rule #2"""
tags = TagStatistics.from_tags_string("COM|COM")
with pytest.raises(TagResult) as e:
assert == "COM"
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from rabotnikobm.rules.gem_occupancy.mapping import group_tags
from rabotnikobm.rules.gem_occupancy.mapping import group_tags, GEMTag, TagStatistics
import pytest
def test_tag_statistics():
    """`exactly_one_unique_tag` is True only if all tags in the string are equal."""
    stats = TagStatistics.from_tags_string("COM|COM")
    assert stats.exactly_one_unique_tag() is True

    stats = TagStatistics.from_tags_string("COM|ASS")
    assert stats.exactly_one_unique_tag() is False
def test_tag_hash():
    """Equal tags hash equally; tags differing in sub-group hash differently."""
    assert hash(GEMTag.from_string("COM")) == hash(GEMTag.from_string("COM"))
    assert hash(GEMTag.from_string("COM1")) != hash(GEMTag.from_string("COM"))
def test_gem_classification_parser():
assert GEMTag.from_string("COM").group == "COM"
assert GEMTag.from_string("COM").sub_group is None
assert GEMTag.from_string("COM1").group == "COM"
assert GEMTag.from_string("COM1").sub_group == 1
assert GEMTag.from_string("COM11").sub_group == 11
assert GEMTag.from_string("UNDECIDABLE").group == "UNDECIDABLE"
assert GEMTag.from_string("UNDECIDABLE").sub_group is None
with pytest.raises(AssertionError):
def test_group_tags():
