Commit 0a296f52 authored by Marius Kriegerowski's avatar Marius Kriegerowski
Browse files

Added logic to start assembly from dedicated function

parent 08f9effb
Pipeline #28021 passed with stage
in 1 minute and 13 seconds
......@@ -40,34 +40,52 @@ logger = logging.getLogger(__name__)
rabotnik = Rabotnik("rabotnik-obm")
async def start_rabotnik_obm():
async def connected_message_bus(config_message_bus):
# Initiate the message bus
message_bus = MessageBus()
# pylint: disable=no-value-for-parameter
await message_bus.connect(config_file="/etc/rabotnik/rabotnik-message-bus.yml")
await message_bus.connect(config_file=config_message_bus)
return message_bus
logger.info("Started rabotnik for OpenBuildingMap")
async def connected_storages(config_consumer, config_producer):
# Get a storage connection to `osm_replication` (source)
storage_osmreplication = rabotnik.get_storage("postgresql")
await storage_osmreplication.connect(config_file="/etc/rabotnik/storage-osmreplication.yml")
storage_consumer = rabotnik.get_storage("postgresql")
await storage_consumer.connect(config_file=config_consumer)
# Get a storage connection to `obm_buildings` (receiver)
storage_obmbuildings = rabotnik.get_storage("postgresql")
await storage_obmbuildings.connect(config_file="/etc/rabotnik/storage-obmbuildings.yml")
storage_contributor = rabotnik.get_storage("postgresql")
await storage_contributor.connect(config_file=config_producer)
return storage_consumer, storage_contributor
async def start_rabotnik_obm(message_bus, storage_consumer, storage_contributor):
rules = [
GetBuilding(storage_osmreplication, storage_obmbuildings),
GetFloorspace(storage_osmreplication, storage_obmbuildings),
GetBuildingLandUse(storage_osmreplication),
GetPointsInBuilding(storage_osmreplication),
GetBuilding(storage_consumer, storage_contributor),
GetFloorspace(storage_consumer, storage_contributor),
GetBuildingLandUse(storage_consumer),
GetPointsInBuilding(storage_consumer),
]
rules = Assembly(rules=rules)
await message_bus.subscribe("building", rules.run)
async def start_rabotnik(args):
storage_consumer, storage_contributor = await connected_storages(
args.config_consumer, args.config_producer
)
message_bus = await connected_message_bus(args.config_message_bus)
await start_rabotnik_obm(message_bus, storage_consumer, storage_contributor)
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
......@@ -78,6 +96,27 @@ def main():
help="increase output verbosity (e.g., -vv is more than -v)",
)
parser.add_argument(
"-cp",
"--config-producer",
type=str,
help="Producer storage configuration",
)
parser.add_argument(
"-cc",
"--config-consumer",
type=str,
help="Consumer storage configuration",
)
parser.add_argument(
"-cmb",
"--config-message-bus",
type=str,
help="Rabotnik message bus configuration",
)
args = parser.parse_args()
default_log_level = logging.WARNING
......@@ -92,7 +131,7 @@ def main():
logger.addHandler(stream_handler)
loop = asyncio.get_event_loop()
loop.create_task(start_rabotnik_obm())
loop.create_task(start_rabotnik(args))
loop.run_forever()
......
......@@ -42,7 +42,6 @@ class GetBuilding(Rule):
async def evaluate(self, payload: dict):
building_id = payload["building_id"]
logger.debug("evaluating %s", building_id)
table_from = "osm_building_polygons"
table_to = "obm_buildings"
......@@ -54,3 +53,5 @@ class GetBuilding(Rule):
# Push the building data to storage_to
await upsert(self.storage_to, table_to, row)
logger.info("finished evaluating %s", building_id)
......@@ -44,4 +44,6 @@ class GetPointsInBuilding(Rule):
n_points_total += n_points
logger.info("Found %i total points in %s", n_points_total, building_id)
return n_points_total
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment