Commit 24c03bfe authored by Felix Delattre's avatar Felix Delattre
Browse files

Added rabotnik message bus logic

parent c18bc3eb
Pipeline #29031 passed with stage
in 2 minutes and 1 second
......@@ -27,7 +27,7 @@ setup(
description="Trigger calculations on the Rabotnik Message Bus based on \
OpenStreetMap's augmented diffs",
license="AGPLv3+",
install_requires=["aiofiles", "osmdiff>=0.1.9"],
install_requires=["aiofiles", "osmdiff>=0.1.9", "rabotnik>=0.0.3"],
extras_require={
"tests": tests_require,
"linters": linters_require,
......
......@@ -40,6 +40,11 @@ class Configuration:
self.overpass_base_url = config["overpass_base_url"]
self.filepath = config["filepath"]
self.state_filename = config["state_filename"]
self.rabotnik = {
"hostname": config["rabotnik"]["hostname"],
"username": config["rabotnik"]["username"],
"password": config["rabotnik"]["password"],
}
except KeyError as e:
logger.info("Error: The variable %s hasn't been set in the config file." % str(e))
sys.exit(1)
......@@ -21,6 +21,7 @@ import sys
import osmdiff
from rabotnik import MessageBus
from spearhead.configure import Configuration
from spearhead.file_handler import FileHandler
......@@ -35,9 +36,15 @@ class DiffAnalyzer:
"augmented diffs".
"""
def __init__(self, diff_provider: osmdiff.AugmentedDiff, config: Configuration):
def __init__(
self,
diff_provider: osmdiff.AugmentedDiff,
config: Configuration,
message_bus: MessageBus,
):
self.diff_provider = diff_provider
self.file_handler = FileHandler(config)
self.message_bus = message_bus
async def get_list_of_new_diffs(self):
self.diff_provider.get_state()
......@@ -67,30 +74,35 @@ class DiffAnalyzer:
augmented diff.
"""
logger.info(" " + str(augmented_diff))
building_count = 0
numbers = []
# Check on newly created objects
for n in augmented_diff.create:
if n.tags.get("building"):
if self._is_way_or_relation(n):
building_count += 1
numbers.append(n.attribs.get("id"))
# Check on objects that have been modified
for n in augmented_diff.modify:
if n["new"].tags.get("building"):
if self._is_way_or_relation(n["new"]):
building_count += 1
numbers.append(n["new"].attribs.get("id"))
elif n["old"].tags.get("building"):
if self._is_way_or_relation(n["old"]):
building_count += 1
numbers.append(n["old"].attribs.get("id"))
# Check on deleted objects
for n in augmented_diff.delete:
if self._is_way_or_relation(n):
building_count += 1
numbers.append(n.attribs.get("id"))
await self._set_buildings_on_message_bus(numbers)
logger.info(" building_count: " + str(building_count))
async def _set_buildings_on_message_bus(self, numbers: list) -> None:
"""Set a list of building ids on the message bus"""
for number in numbers:
await self.message_bus.send(message="building", payload={"building_id": number})
def _is_way_or_relation(self, element) -> bool:
"""Check if element is of type `osm.Way` or `osm.Relation`"""
......
......@@ -22,6 +22,7 @@ import sys
import osmdiff
from rabotnik import MessageBus
from spearhead.configure import Configuration
from spearhead import DiffAnalyzer
......@@ -48,7 +49,14 @@ async def get_changed_buildings(config: Configuration):
diff_provider.base_url = config.overpass_base_url
logger.info("Using Overpass API: " + diff_provider.base_url)
diff_analyzer = DiffAnalyzer(diff_provider, config)
message_bus = MessageBus()
await message_bus.connect(
host=config.rabotnik["hostname"],
username=config.rabotnik["username"],
password=config.rabotnik["password"],
)
diff_analyzer = DiffAnalyzer(diff_provider, config, message_bus)
while True:
try:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment