Commit 3dce5c7b authored by Marius Kriegerowski's avatar Marius Kriegerowski
Browse files

fix rules 1 and 2

parent 4edb8fde
...@@ -61,12 +61,30 @@ async def start_rabotnik_obm(message_bus): ...@@ -61,12 +61,30 @@ async def start_rabotnik_obm(message_bus):
await message_bus.subscribe("building", rules.run) await message_bus.subscribe("building", rules.run)
async def start_rabotnik_gem_occupancy(
message_bus, storage_consumer, storage_contributor, n_processes_max
):
rules = [
GetBuilding(storage_consumer, storage_contributor),
GetFloorspace(storage_consumer, storage_contributor),
GetBuildingLandUse(storage_consumer),
GetPointsInBuilding(storage_consumer),
]
rules = Assembly(rules=rules, n_processes_max=n_processes_max)
await message_bus.subscribe("building", rules.run)
async def start_rabotnik(args): async def start_rabotnik(args):
logger.info("start rabotnik") logger.info("start rabotnik")
message_bus = await connected_message_bus(args.config_message_bus) message_bus = await connected_message_bus(args.config_message_bus)
await start_rabotnik_obm(message_bus) # await start_rabotnik_obm(
# message_bus, storage_consumer, storage_contributor, args.n_processes_max
# )
await
def main(): def main():
......
...@@ -17,18 +17,19 @@ ...@@ -17,18 +17,19 @@
# along with this program. If not, see http://www.gnu.org/licenses/. # along with this program. If not, see http://www.gnu.org/licenses/.
from __future__ import annotations from __future__ import annotations
import logging
import csv import csv
import logging
from rabotnik import Rule from rabotnik import Rule
from rabotnik.storages.base import StorageBase from rabotnik.storages.base import StorageBase
from rabotnikobm.rules.gem_occupancy.mapping import OccupancyMapper, TagStatistics, GEMTag from rabotnikobm.rules.gem_occupancy.mapping import OccupancyMapper, TagStatistics, GEMTag
logger = logging.getLogger() logger = logging.getLogger()
class TagResult(Exception): class TagResult(Exception):
"""Raise this exception with the identified occupancy""" """Raise this exception with the identified occupancy"""
def __init__(self, tag: GEMTag): def __init__(self, tag: GEMTag):
...@@ -36,7 +37,6 @@ class TagResult(Exception): ...@@ -36,7 +37,6 @@ class TagResult(Exception):
class OverridingOccupancy: class OverridingOccupancy:
"""Takes precedence over other mappings. """Takes precedence over other mappings.
If `OverridingOccupancy.apply` returns a result this will be the designated occupancy. If `OverridingOccupancy.apply` returns a result this will be the designated occupancy.
...@@ -66,15 +66,23 @@ class OverridingOccupancy: ...@@ -66,15 +66,23 @@ class OverridingOccupancy:
def check_exactly_one_unique_tag(occupancies: TagStatistics): def check_exactly_one_unique_tag(occupancies: TagStatistics):
"""raises `TagResult` if there is exactly one unique type""" """raises `TagResult` if there is exactly one unique group"""
if occupancies.exactly_one_unique_tag(): if occupancies.exactly_one_unique_tag():
raise TagResult(occupancies.tags[0]) raise TagResult(occupancies.tags[0])
def check_one_unique_sub_type(occupancies: TagStatistics): def check_one_unique_sub_group(occupancies: TagStatistics):
if occupancies.exactly_one_unique_type() and occupancies.number_of_sub_groups == 1: print(occupancies)
# TODO: find raise TagResult(occupancies....) print(occupancies.exactly_one_unique_tag())
raise TagResult("ASDASDF") print(occupancies.number_of_unique_groups)
if occupancies.number_of_unique_groups == 1 and occupancies.number_of_unique_subgroups <= 2:
raise TagResult(
GEMTag(
group=occupancies.unique_group,
sub_group=occupancies.unique_sub_group,
sub_sub_group=occupancies.unique_sub_sub_group,
)
)
class GetBuildingOccupancy(Rule): class GetBuildingOccupancy(Rule):
......
...@@ -29,6 +29,10 @@ from typing import Optional ...@@ -29,6 +29,10 @@ from typing import Optional
REGEX_GEM_GROUP = r"(?P<group>^[A-Z]*)(?P<sub_group>[0-9]*)(?P<sub_sub_group>[A-Z]*$)" REGEX_GEM_GROUP = r"(?P<group>^[A-Z]*)(?P<sub_group>[0-9]*)(?P<sub_sub_group>[A-Z]*$)"
class GEMTagException(Exception):
pass
@dataclass @dataclass
class GEMTag: class GEMTag:
"""Represents a single GEM classification """Represents a single GEM classification
...@@ -70,25 +74,69 @@ class TagStatistics( ...@@ -70,25 +74,69 @@ class TagStatistics(
[ [
"tags", "tags",
"tags_counter", "tags_counter",
"types_counter", "groups_counter",
"subtypes_counter", "subgroups_counter",
"subsubgroups_counter",
], ],
) )
): ):
def exactly_one_unique_tag(self) -> bool: def exactly_one_unique_tag(self) -> bool:
return self.number_of_unique_tags == 1 return self.number_of_unique_tags == 1
@property
def subsubgroups(self):
"""list of available sub_sub_groups without None"""
return list(filter(None, self.subsubgroups_counter.keys()))
@property
def subgroups(self):
"""list of available sub_groups without None"""
return list(filter(None, self.subgroups_counter.keys()))
@property @property
def number_of_unique_tags(self) -> int: def number_of_unique_tags(self) -> int:
return len(self.tags_counter) return len(set(self.tags_counter))
@property @property
def number_of_unique_types(self) -> int: def number_of_unique_groups(self) -> int:
return len(self.tags_counter) return len(list(self.groups_counter))
@property @property
def number_of_unique_subtypes(self) -> int: def number_of_unique_subgroups(self) -> int:
return len(self.subtypes_counter) return len(self.subgroups_counter)
@property
def unique_group(self) -> str:
"""Get the unique group ID. In case of multiple available groups, raise GEMTagException."""
if self.number_of_unique_groups > 1:
raise GEMTagException(f"{self} has more than unique group.")
return list(self.groups_counter.keys())[0]
@property
def unique_sub_group(self) -> Optional[int]:
"""Get the unique group ID. In case of multiple available groups, raise GEMTagException."""
if len(self.subgroups) > 1:
raise GEMTagException(f"{self} has more than unique sub_group.")
if len(self.subgroups) == 0:
return None
return self.subgroups[0]
@property
def unique_sub_sub_group(self) -> Optional[str]:
"""Get the unique group ID. In case of multiple available groups, raise GEMTagException."""
if len(self.subsubgroups) > 1:
raise GEMTagException(f"{self} has more than unique sub_sub_group.")
if len(self.subsubgroups) == 0:
return None
return self.subsubgroups[0]
@classmethod @classmethod
def from_strings(cls, tags: list[str]) -> TagStatistics: def from_strings(cls, tags: list[str]) -> TagStatistics:
...@@ -99,17 +147,21 @@ class TagStatistics( ...@@ -99,17 +147,21 @@ class TagStatistics(
def from_tags(cls, tags: list[GEMTag]) -> TagStatistics: def from_tags(cls, tags: list[GEMTag]) -> TagStatistics:
tags_counter = Counter(tags) tags_counter = Counter(tags)
types = [tag.group for tag in tags] groups = [tag.group for tag in tags]
types_counter = Counter(types) groups_counter = Counter(groups)
subgroups = [tag.sub_group for tag in tags]
subgroups_counter = Counter(subgroups)
subtypes = [tag.sub_group for tag in tags] subsubgroups = [tag.sub_sub_group for tag in tags]
subtypes_counter = Counter(subtypes) subsubgroups_counter = Counter(subsubgroups)
return cls( return cls(
tags=tags, tags=tags,
tags_counter=tags_counter, tags_counter=tags_counter,
types_counter=types_counter, groups_counter=groups_counter,
subtypes_counter=subtypes_counter, subgroups_counter=subgroups_counter,
subsubgroups_counter=subsubgroups_counter,
) )
......
...@@ -23,6 +23,7 @@ from rabotnikobm.rules.gem_occupancy.get_building_occupancy import ( ...@@ -23,6 +23,7 @@ from rabotnikobm.rules.gem_occupancy.get_building_occupancy import (
check_exactly_one_unique_tag, check_exactly_one_unique_tag,
TagStatistics, TagStatistics,
TagResult, TagResult,
check_one_unique_sub_group,
) )
from rabotnikobm.rules.gem_occupancy.mapping import GEMTag from rabotnikobm.rules.gem_occupancy.mapping import GEMTag
...@@ -57,15 +58,27 @@ def test_overriding_occupancy(overriding_occupancies): ...@@ -57,15 +58,27 @@ def test_overriding_occupancy(overriding_occupancies):
def test_overriding_occupancy_unknown(): def test_overriding_occupancy_unknown():
"""Test Rule #1 """
with pytest.raises(AssertionError): with pytest.raises(AssertionError):
TagStatistics.from_strings(["unknown tag"]) TagStatistics.from_strings(["unknown tag"])
def test_unique_tags(): def test_unique_tags():
"""Rule #2""" """Rule"""
tags = TagStatistics.from_tags([GEMTag.from_string("COM"), GEMTag.from_string("COM")]) tags = TagStatistics.from_strings(["COM", "COM"])
with pytest.raises(TagResult) as e: with pytest.raises(TagResult) as e:
check_exactly_one_unique_tag(tags) check_exactly_one_unique_tag(tags)
assert e.value.tag.group == "COM" assert e.value.tag.group == "COM"
def test_rule2():
"""Rule #2"""
tags = TagStatistics.from_strings(["RES", "RES1"])
with pytest.raises(TagResult) as e:
check_one_unique_sub_group(tags)
assert e.value.tag.group == "RES"
assert e.value.tag.sub_group == 1
...@@ -16,18 +16,41 @@ ...@@ -16,18 +16,41 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/. # along with this program. If not, see http://www.gnu.org/licenses/.
from rabotnikobm.rules.gem_occupancy.mapping import group_tags, GEMTag, TagStatistics from rabotnikobm.rules.gem_occupancy.mapping import (
group_tags,
GEMTag,
TagStatistics,
GEMTagException,
)
import pytest import pytest
def test_tag_statistics(): def test_tag_statistics():
stats = TagStatistics.from_tags([GEMTag.from_string("COM"), GEMTag.from_string("COM")]) stats = TagStatistics.from_strings(["COM", "COM"])
assert stats.exactly_one_unique_tag() is True assert stats.exactly_one_unique_tag() is True
stats = TagStatistics.from_tags([GEMTag.from_string("COM"), GEMTag.from_string("COM1")]) stats = TagStatistics.from_strings(["COM", "COM1"])
assert stats.exactly_one_unique_tag() is False assert stats.exactly_one_unique_tag() is False
def test_tag_statistics_unique_getters():
stats = TagStatistics.from_strings(["COM", "COM"])
assert stats.unique_group == "COM"
assert stats.unique_sub_group is None
stats = TagStatistics.from_strings(["COM", "COM1"])
assert stats.unique_group == "COM"
assert stats.unique_sub_group is 1
assert stats.unique_sub_sub_group is None
stats = TagStatistics.from_strings(["COM", "COM1", "RES2"])
with pytest.raises(GEMTagException):
assert stats.unique_group != "COM"
with pytest.raises(GEMTagException):
assert stats.unique_sub_group != 1
def test_tag_hash(): def test_tag_hash():
assert hash(GEMTag.from_string("COM")) == hash(GEMTag.from_string("COM")) assert hash(GEMTag.from_string("COM")) == hash(GEMTag.from_string("COM"))
assert hash(GEMTag.from_string("COM1")) != hash(GEMTag.from_string("COM")) assert hash(GEMTag.from_string("COM1")) != hash(GEMTag.from_string("COM"))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment