diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000000000000000000000000000000000000..9b6a330ee6e6927d765a7a9e5b200cb6b6c4ffdf --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include rabotnikobm/rules/gem_occupancy/data/*.csv +include LICENSE diff --git a/docs/occupancy.md b/data/occupancy.md similarity index 100% rename from docs/occupancy.md rename to data/occupancy.md diff --git a/rabotnikobm/instance.py b/rabotnikobm/instance.py index 6165d3763de0f2a5479ab07bb4a84b5bf99561d7..8ab5994ee47b889863f9820cfba4c193c5f0b31d 100644 --- a/rabotnikobm/instance.py +++ b/rabotnikobm/instance.py @@ -24,11 +24,12 @@ import argparse from rabotnik import Rabotnik, Assembly from rabotnik.bus import MessageBus -# These need to be absolute imports. Otherwise, celery will fail to load them -from rabotnikobm.rules import GetBuilding -from rabotnikobm.rules import GetFloorspace -from rabotnikobm.rules import GetBuildingLandUse -from rabotnikobm.rules import GetPointsInBuilding +from rabotnikobm.rules.gem_occupancy.get_building_occupancy import GetBuildingOccupancy + +from .rules import GetBuilding +from .rules import GetFloorspace +from .rules import GetBuildingLandUse +from .rules import GetPointsInBuilding logger = logging.getLogger(__name__) @@ -61,12 +62,28 @@ async def start_rabotnik_obm(message_bus): await message_bus.subscribe("building", rules.run) +async def start_rabotnik_gem_occupancy(message_bus, storage_consumer, _, n_processes_max): + + rules = [ + GetBuildingOccupancy(storage_consumer), + ] + rules = Assembly(rules=rules, n_processes_max=n_processes_max) + + await message_bus.subscribe("building", rules.run) + + async def start_rabotnik(args): logger.info("start rabotnik") message_bus = await connected_message_bus(args.config_message_bus) - await start_rabotnik_obm(message_bus) + rabotnik_args = (message_bus, storage_consumer, storage_contributor, args.n_processes_max) + + if args.target in ("all", "obm"): + asyncio.create_task(start_rabotnik_obm(*rabotnik_args)) + + if args.target in ("all", "gem"): + asyncio.create_task(start_rabotnik_gem_occupancy(*rabotnik_args)) def main(): @@ -90,6 +107,15 @@ def main(): "--start-celery-worker", action="store_true", ) + + parser.add_argument( + "-t", + "--target", + default="all", + type=str, + help="Process to start (defaults to 'all')", + ) + args = parser.parse_args() default_log_level = logging.WARNING diff --git a/rabotnikobm/rules/gem_occupancy/__init__.py b/rabotnikobm/rules/gem_occupancy/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/docs/rules/occupancy/GEM_taxonomy_strings.csv b/rabotnikobm/rules/gem_occupancy/data/GEM_taxonomy_strings.csv similarity index 100% rename from docs/rules/occupancy/GEM_taxonomy_strings.csv rename to rabotnikobm/rules/gem_occupancy/data/GEM_taxonomy_strings.csv diff --git a/docs/rules/occupancy/README.md b/rabotnikobm/rules/gem_occupancy/data/README.md similarity index 89% rename from docs/rules/occupancy/README.md rename to rabotnikobm/rules/gem_occupancy/data/README.md index 47c384d91099e172880565860b33588f570e31c6..bb79ea15227b56d658e17f03e1ff248ab561d989 100644 --- a/docs/rules/occupancy/README.md +++ b/rabotnikobm/rules/gem_occupancy/data/README.md @@ -14,5 +14,5 @@ In the first two CSV files, the columns are the following: - `key`: OpenStreetMap key. - `value`: value of the key (together, `key=value` are a `tag`). - `taginfo_description`: description of the tag according to [taginfo](https://taginfo.openstreetmap.org/). -- `GEM_taxonomy_occupancy`: the occupancy string according to the GEM Taxonomy. If more than one value is possible, they are separated by "|". +- `GEM_occupancy`: the occupancy string according to the GEM Taxonomy. If more than one value is possible, they are separated by "|". - `comments`: comments associated with the established mapping. diff --git a/docs/rules/occupancy/building_and_PoIs_tags.csv b/rabotnikobm/rules/gem_occupancy/data/building_and_PoIs_tags.csv similarity index 99% rename from docs/rules/occupancy/building_and_PoIs_tags.csv rename to rabotnikobm/rules/gem_occupancy/data/building_and_PoIs_tags.csv index 82e6327fb4d520eb1278e74e2268b37e6af1d2c1..656f6f7aab807a46dd91610ea6ecdb7be06ee8c3 100644 --- a/docs/rules/occupancy/building_and_PoIs_tags.csv +++ b/rabotnikobm/rules/gem_occupancy/data/building_and_PoIs_tags.csv @@ -1,4 +1,4 @@ -key,value,taginfo_description,GEM_taxonomy_occupancy,comments +key,value,taginfo_description,occupancy,comments aerialway,station,"A station where passengers and/or goods can enter and/or leave the aerialway (forms of transport that use wires, including cable-cars, chair-lifts and drag-lifts)",COM,COM contains other kinds of transport stations but not one specific for aerialways aeroway,hangar,A large airport building with extensive floor areas for housing aircraft or spacecraft,COM10,Unclear if hangar is treated as standard part of an airport or not aeroway,terminal,An airport passenger building,COM10, diff --git a/docs/rules/occupancy/landuse_tags.csv b/rabotnikobm/rules/gem_occupancy/data/landuse_tags.csv similarity index 99% rename from docs/rules/occupancy/landuse_tags.csv rename to rabotnikobm/rules/gem_occupancy/data/landuse_tags.csv index 6c450ff9887170877ab97885afec9dbffb947d50..f259d8dadeee0ad7b7f7a7e8ef45cdc42612cd19 100644 --- a/docs/rules/occupancy/landuse_tags.csv +++ b/rabotnikobm/rules/gem_occupancy/data/landuse_tags.csv @@ -1,4 +1,4 @@ -key,value,taginfo_description,GEM_taxonomy_occupancy,comments +key,value,taginfo_description,occupancy,comments amenity,university,"An educational institution designed for instruction, examination, or both, of students in many branches of advanced learning.",EDU3, amenity,school,A primary or secondary school (pupils typically aged 6 to 18).,EDU2, amenity,college,"A place for further education, a post-secondary education institution which is not a University",EDU3, diff --git a/docs/rules/occupancy/overriding_occupancies.csv b/rabotnikobm/rules/gem_occupancy/data/overriding_occupancies.csv similarity index 100% rename from docs/rules/occupancy/overriding_occupancies.csv rename to rabotnikobm/rules/gem_occupancy/data/overriding_occupancies.csv index df17dc8bbb9cf59842155fa40a05613dca4071b7..7f21514fed554d071fd86b0b6d80e498f183628c 100644 --- a/docs/rules/occupancy/overriding_occupancies.csv +++ b/rabotnikobm/rules/gem_occupancy/data/overriding_occupancies.csv @@ -1,13 +1,13 @@ -ASS1, religious gathering -ASS2, arena +COM10, airport +COM9, railway station +COM8, bus station COM4, hospital/medical clinic +GOV2, government emergency response +GOV1, government general services COM6, public building (gallery;museum;monument building;library) -COM8, bus station -COM9, railway station -COM10, airport -RES3, temporary lodging (hotels;motels;guest lodges;cabins) +ASS2, arena EDU2, school EDU3, offices and/or classrooms of college/university EDU4, research facilities and/or labs of college/university -GOV1, government general services -GOV2, government emergency response +RES3, temporary lodging (hotels;motels;guest lodges;cabins) +ASS1, religious gathering diff --git a/rabotnikobm/rules/gem_occupancy/get_building_occupancy.py b/rabotnikobm/rules/gem_occupancy/get_building_occupancy.py new file mode 100644 index 0000000000000000000000000000000000000000..82ef276b1459c1b876b6fb86f42d23067e1d3b55 --- /dev/null +++ b/rabotnikobm/rules/gem_occupancy/get_building_occupancy.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2021: +# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or (at +# your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +# General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. + +from __future__ import annotations + +import csv +import logging +from pathlib import Path +from typing import Optional + +from rabotnik import Rule +from rabotnik.storages.base import StorageBase + +from rabotnikobm.rules.gem_occupancy.mapping import OccupancyMapper, TagStatistics, GEMTag + +logger = logging.getLogger(__name__) + +MODULE_PATH = Path(__file__).parent + + +class TagResult(Exception): + """Raise this exception with the identified occupancy""" + + def __init__(self, tag: GEMTag): + self.tag = tag + + +def apply_rules(rules, occupancies: TagStatistics) -> Optional[GEMTag]: + for rule in rules: + try: + rule.apply(occupancies) + except TagResult as result: + return result.tag + + +class OverridingOccupancy: + """Takes precedence over other mappings. + + If `OverridingOccupancy.apply` returns a result this will be the designated occupancy. + Otherwise will return None. + """ + + def __init__(self, mapping): + self.mapping = mapping + + def apply(self, occupancies: TagStatistics) -> None: + """Apply the loaded mapping to a list of `occupancies`.""" + + for candidate in self.mapping.keys(): + if candidate in occupancies.tags: + raise TagResult(candidate) + + @classmethod + def from_csv(cls, fn: Path) -> OverridingOccupancy: + """Read a csv and initialize a `OverridingOccupancy` instance.""" + + with open(fn) as csvfile: + occupancy_mapping = {k.strip(): v.strip() for (k, v) in csv.reader(csvfile)} + + occupancy_mapping = {GEMTag.from_string(k): v for k, v in occupancy_mapping.items()} + + return cls(mapping=occupancy_mapping) + + @classmethod + def overriding_occupancies(cls): + fn = MODULE_PATH / "data/overriding_occupancies.csv" + return cls.from_csv(fn) + + +class RuleOneUniqueTag: + @staticmethod + def apply(occupancies: TagStatistics) -> None: + """raises `TagResult` if there is exactly one unique group""" + if occupancies.exactly_one_unique_tag(): + raise TagResult(occupancies.tags[0]) + + +class RulesOneUniqueSubGroup: + @staticmethod + def apply(occupancies: TagStatistics) -> None: + if ( + occupancies.number_of_unique_groups == 1 + and occupancies.number_of_unique_subgroups <= 2 + ): + raise TagResult( + GEMTag( + group=occupancies.unique_group, + sub_group=occupancies.unique_sub_group, + sub_sub_group=occupancies.unique_sub_sub_group, + ) + ) + + +@Rule.app.task(bind=True, base=Rule) +class GetBuildingOccupancy(Rule): + """A rule to map OSM tags to building occupancies.""" + + def __init__(self, storage: StorageBase): + self.storage = storage + + self.mappers = [ + OccupancyMapper.landuse_mapper(), + OccupancyMapper.building_poi_mapper(), + ] + + self.candidates = [ + OverridingOccupancy.overriding_occupancies(), + RuleOneUniqueTag, + RulesOneUniqueSubGroup, + ] + + def evaluate(self, building_id: int) -> GEMTag | None: + logger.debug("Processing building: %s", building_id) + tags = self.storage.expect_one( + f"SELECT tags FROM osm_building_relations WHERE osm_id={building_id} AND index=0" + ) + + logger.debug(f"working on tags: {tags}") + if not tags: + return + + # Mapping to GEM taxonomy strings + occupancies = [] + for mapper in self.mappers: + occupancies.extend(mapper.apply(tags)) + + # Convert derived occupancies to GEMTags + occupancies = TagStatistics.from_strings(occupancies) + + # Apply rules to find final taxonomy + occupancy = apply_rules(self.candidates, occupancies) + + logger.debug("occupancies %s: %s", building_id, occupancy) + return occupancy diff --git a/rabotnikobm/rules/gem_occupancy/mapping.py b/rabotnikobm/rules/gem_occupancy/mapping.py new file mode 100644 index 0000000000000000000000000000000000000000..ccbf1b4f319a4ef7481d1a5843f796303221b486 --- /dev/null +++ b/rabotnikobm/rules/gem_occupancy/mapping.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2021: +# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or (at +# your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +# General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. +from __future__ import annotations +import re +import csv +from pathlib import Path +from collections import defaultdict, namedtuple, Counter +from dataclasses import dataclass +from typing import Optional + + +# Regular expression to extract group (0-N characters at beginning), +# sub_group (0-N digits) and sub_sub_group (0-N characters and end) of GEM tags +# Can be letters-number-letters(-numbers) +REGEX_GEM_GROUP = r"(?P^[A-Z]*)(?P[0-9]*)(?P[A-Z]*$)" + + +MODULE_PATH = Path(__file__).parent + + +class GEMTagException(Exception): + pass + + +@dataclass +class GEMTag: + """Represents a single GEM classification + + Args: + group (str): multi-character GEM classifier + sub_group (int): multi-digit GEM classifier sub-group + sub_sub_group (str): multi-char GEM classifier sub-sub-group + """ + + group: str + sub_group: Optional[int] + sub_sub_group: Optional[str] + + @classmethod + def from_string(cls, tag_as_string: str) -> GEMTag: + """Instantiate `GEMClassification`s from GEM occupancy string.""" + matched = re.search(REGEX_GEM_GROUP, tag_as_string, re.IGNORECASE) + assert matched is not None, f"could not convert '{tag_as_string}' to GEMTag" + + group = str(matched.group("group")) + assert group, f"could not extract `group` from {tag_as_string}" + + sub_group = matched.group("sub_group") + sub_group = int(sub_group) if sub_group != "" else None + + sub_sub_group = matched.group("sub_sub_group") + sub_sub_group = sub_sub_group if sub_sub_group != "" else None + + return cls(group, sub_group, sub_sub_group) + + def __hash__(self): + return hash((self.group, self.sub_group, self.sub_sub_group)) + + +class TagStatistics( + namedtuple( + "TagStatistics", + [ + "tags", + "tags_counter", + "groups_counter", + "subgroups_counter", + "subsubgroups_counter", + ], + ) +): + def exactly_one_unique_tag(self) -> bool: + return self.number_of_unique_tags == 1 + + @property + def subsubgroups(self): + """list of available sub_sub_groups without None""" + return list(filter(None, self.subsubgroups_counter.keys())) + + @property + def subgroups(self): + """list of available sub_groups without None""" + return list(filter(None, self.subgroups_counter.keys())) + + @property + def number_of_unique_tags(self) -> int: + return len(set(self.tags_counter)) + + @property + def number_of_unique_groups(self) -> int: + return len(list(self.groups_counter)) + + @property + def number_of_unique_subgroups(self) -> int: + return len(self.subgroups_counter) + + @property + def unique_group(self) -> str: + """Get the unique group ID. In case of multiple available groups, + raises GEMTagException.""" + + if self.number_of_unique_groups > 1: + raise GEMTagException(f"{self} has more than unique group.") + + return list(self.groups_counter.keys())[0] + + @property + def unique_sub_group(self) -> Optional[int]: + """Get the unique group ID. In case of multiple available groups, + raises GEMTagException.""" + + if len(self.subgroups) > 1: + raise GEMTagException(f"{self} has more than unique sub_group.") + + if len(self.subgroups) == 0: + return None + + return self.subgroups[0] + + @property + def unique_sub_sub_group(self) -> Optional[str]: + """Get the unique group ID. In case of multiple available groups, + raises GEMTagException.""" + + if len(self.subsubgroups) > 1: + raise GEMTagException(f"{self} has more than unique sub_sub_group.") + + if len(self.subsubgroups) == 0: + return None + + return self.subsubgroups[0] + + @classmethod + def from_strings(cls, tags: list[str]) -> TagStatistics: + tags = [GEMTag.from_string(tag_as_string) for tag_as_string in tags] + return cls.from_tags(tags) + + @classmethod + def from_tags(cls, tags: list[GEMTag]) -> TagStatistics: + tags_counter = Counter(tags) + + groups = [tag.group for tag in tags] + groups_counter = Counter(groups) + + subgroups = [tag.sub_group for tag in tags] + subgroups_counter = Counter(subgroups) + + subsubgroups = [tag.sub_sub_group for tag in tags] + subsubgroups_counter = Counter(subsubgroups) + + return cls( + tags=tags, + tags_counter=tags_counter, + groups_counter=groups_counter, + subgroups_counter=subgroups_counter, + subsubgroups_counter=subsubgroups_counter, + ) + + +def group_tags(occupancy_mapping: list[dict[str, str]]) -> dict[str, dict[str, str]]: + """Extract groups of identical tags and group them in nested dicts.""" + + grouped_tag_mapping = defaultdict(dict) + + for row in occupancy_mapping: + key = row.pop("key") + value = row.pop("value") + grouped_tag_mapping[key].update({value: row}) + + return grouped_tag_mapping + + +class OccupancyMapper: + + """Map osm tags to `occupancy` categories.""" + + def __init__(self, mapping): + self.mapping = mapping + + def apply(self, osm_tags: dict[dict[str, str]]) -> list[str]: + """Map `osm_tags` to lists of occupancies as defined in + `building_and_POIs_tags.csv` and `landuse_tags.csv`. Tags may be duplicated. + + Args: + osm_tags: list of OSM building tag strings associated with a specific building + Returns: + list of `occupancy`s + """ + + occupancies = [] + for osm_tag in osm_tags: + for key, value in osm_tag.items(): + occupancy = self.mapping.get(key, {}).get(value, None) + if occupancy is not None: + occupancies.append(occupancy["occupancy"]) + + return occupancies + + @classmethod + def from_csv(cls, fn: Path) -> OccupancyMapper: + """Read a csv and initialize a `OccupancyMapper`.""" + + with open(fn) as csvfile: + occupancy_mapping = list(csv.DictReader(csvfile)) + + occupancy_mapping_grouped = group_tags(occupancy_mapping) + + return cls(mapping=occupancy_mapping_grouped) + + @classmethod + def landuse_mapper(cls: OccupancyMapper): + fn_mapping = MODULE_PATH / "data/landuse_tags.csv" + return cls.from_csv(fn_mapping) + + @classmethod + def building_poi_mapper(cls: OccupancyMapper): + fn_mapping = MODULE_PATH / "data/building_and_PoIs_tags.csv" + return cls.from_csv(fn_mapping) diff --git a/rabotnikobm/rules/get_building.py b/rabotnikobm/rules/get_building.py index 9705639cfcd00ac8313d4973ba9400fa940547ff..1bbec1486d7fda8f48b98ad709e227cfcaa9b56c 100644 --- a/rabotnikobm/rules/get_building.py +++ b/rabotnikobm/rules/get_building.py @@ -37,7 +37,7 @@ class GetBuilding(Rule): """A rule to copy a building entry from a source database to a destination database.""" @Rule.app.task(bind=True, base=Rule) - def evaluate(self, building_id): + def evaluate(self, building_id: int): logger.info("called building task") diff --git a/setup.py b/setup.py index 9fa49bb811a8c9f1caad668024e9ce3091635f0a..dc717a125bf8e75b32e818aee86ac43d609187d2 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ from setuptools import setup, find_packages -tests_require = ["pytest"] +tests_require = ["pytest", "pytest-asyncio"] linters_require = ["pylint", "pre-commit"] setup( @@ -33,7 +33,8 @@ setup( "tests": tests_require, "linters": linters_require, }, - packages=find_packages(), entry_points={"console_scripts": ["rabotnikobm = rabotnikobm.instance:main"]}, python_requires=">=3.6", + packages=find_packages(), + include_package_data=True, ) diff --git a/tests/conftest.py b/tests/conftest.py index d4ec532e2f70ed25b11981d7c8254f661b6727a4..779703ee913ca5ae3d7a4e41040fdc0603b43d5c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,8 @@ import logging import pytest import rabotnik +from rabotnikobm.rules.gem_occupancy.mapping import OccupancyMapper + logger = logging.getLogger(__name__) @@ -48,6 +50,18 @@ def storage_consumer(pytestconfig): storage.disconnect() +@pytest.fixture +def building_poi_mapper(): + mapper = OccupancyMapper.building_poi_mapper() + yield mapper + + +@pytest.fixture +def landuse_mapper(): + mapper = OccupancyMapper.landuse_mapper() + yield mapper + + def pytest_collection_modifyitems(config, items): storage_configuration = config.getoption("storage_contributor") if storage_configuration: diff --git a/tests/test_get_gem_occupancy.py b/tests/test_get_gem_occupancy.py new file mode 100644 index 0000000000000000000000000000000000000000..7c514903a4fd31de9e4527aad4d7e7f5d837732b --- /dev/null +++ b/tests/test_get_gem_occupancy.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2021: +# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or (at +# your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +# General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. + +import pytest +from rabotnikobm.rules.gem_occupancy.get_building_occupancy import ( + OverridingOccupancy, + GetBuildingOccupancy, + RuleOneUniqueTag, + TagStatistics, + TagResult, + RulesOneUniqueSubGroup, +) +from rabotnikobm.rules.gem_occupancy.mapping import GEMTag + + +@pytest.fixture() +def overriding_occupancies(): + yield OverridingOccupancy.overriding_occupancies() + + +@pytest.mark.requires_storage +@pytest.mark.asyncio +async def test_rule_get_building_occupancy(storage_consumer, building_poi_mapper): + rule = GetBuildingOccupancy(storage=storage_consumer, occupancy_mapper=building_poi_mapper) + + payload = {"building_id": -6744517} + result = await rule.evaluate(payload=payload) + + assert result == ["ASS4"] + + +def test_overriding_occupancy(overriding_occupancies: OverridingOccupancy): + """Rule #1""" + demo_tags = TagStatistics.from_strings(["ASS1", "COM10"]) + try: + overriding_occupancies.apply(demo_tags) + except TagResult as e: + occupancy = e.tag + assert occupancy == GEMTag.from_string("COM10") + + +def test_overriding_occupancy_unknown(): + """Test Rule #1 """ + with pytest.raises(AssertionError): + TagStatistics.from_strings(["unknown tag"]) + + +def test_unique_tags(): + """Rule""" + rule = RuleOneUniqueTag() + tags = TagStatistics.from_strings(["COM", "COM"]) + with pytest.raises(TagResult) as e: + rule.apply(tags) + + assert e.value.tag.group == "COM" + + +def test_rule2(): + """Rule #2""" + rule = RulesOneUniqueSubGroup() + tags = TagStatistics.from_strings(["RES", "RES1"]) + with pytest.raises(TagResult) as e: + rule.apply(tags) + + assert e.value.tag.group == "RES" + assert e.value.tag.sub_group == 1 diff --git a/tests/test_occupancy_mapping.py b/tests/test_occupancy_mapping.py new file mode 100644 index 0000000000000000000000000000000000000000..4cbb5f99d9ad7396665561aab6c4d02a2d05be0a --- /dev/null +++ b/tests/test_occupancy_mapping.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2021: +# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or (at +# your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +# General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. + +from rabotnikobm.rules.gem_occupancy.mapping import ( + group_tags, + GEMTag, + TagStatistics, + GEMTagException, +) +import pytest + + +def test_tag_statistics(): + stats = TagStatistics.from_strings(["COM", "COM"]) + assert stats.exactly_one_unique_tag() is True + + stats = TagStatistics.from_strings(["COM", "COM1"]) + assert stats.exactly_one_unique_tag() is False + + +def test_tag_statistics_unique_getters(): + stats = TagStatistics.from_strings(["COM", "COM"]) + assert stats.unique_group == "COM" + assert stats.unique_sub_group is None + + stats = TagStatistics.from_strings(["COM", "COM1"]) + assert stats.unique_group == "COM" + assert stats.unique_sub_group is 1 + assert stats.unique_sub_sub_group is None + + stats = TagStatistics.from_strings(["COM", "COM1", "RES2"]) + with pytest.raises(GEMTagException): + assert stats.unique_group != "COM" + + with pytest.raises(GEMTagException): + assert stats.unique_sub_group != 1 + + +def test_tag_hash(): + assert hash(GEMTag.from_string("COM")) == hash(GEMTag.from_string("COM")) + assert hash(GEMTag.from_string("COM1")) != hash(GEMTag.from_string("COM")) + + +def test_gem_classification_parser(): + assert GEMTag.from_string("COM").group == "COM" + assert GEMTag.from_string("COM").sub_group is None + + assert GEMTag.from_string("COM1").group == "COM" + assert GEMTag.from_string("COM1").sub_group == 1 + + assert GEMTag.from_string("COM11").sub_group == 11 + assert GEMTag.from_string("COM11A").sub_sub_group == "A" + + assert GEMTag.from_string("UNDECIDABLE").group == "UNDECIDABLE" + assert GEMTag.from_string("UNDECIDABLE").sub_group is None + + with pytest.raises(AssertionError): + # Tag must not contain whitespace + GEMTag.from_string("unknown tag") + + with pytest.raises(AssertionError): + # Tag must start with character (group) + GEMTag.from_string("1") + + +def test_group_tags(): + rows = [ + {"key": "a", "value": 0}, + {"key": "a", "value": 1}, + {"key": "b", "value": 2}, + ] + + grouped = group_tags(rows) + assert grouped == {"a": {0: {}, 1: {}}, "b": {2: {}}} + + +def test_mapper_building_pois(building_poi_mapper): + sample_tags = [{"amenity": "community_centre"}, {"amenity": "cafe"}, {"x": "y"}] + assert building_poi_mapper.apply(sample_tags) == ["ASS4", "COM5"] + + +def test_mapper_landuse(landuse_mapper): + sample_tags = [{"amenity": "university", "a": "b"}, {"landuse": "brownfield"}, {"x": "y"}] + assert landuse_mapper.apply(sample_tags) == ["EDU3", "UNDECIDABLE"]