From 83726704f3b7575e9bb83303843d90d3b427588b Mon Sep 17 00:00:00 2001 From: Marius Kriegerowski Date: Fri, 5 Nov 2021 18:00:34 +0100 Subject: [PATCH] Adopted celery and async changes in Rabotnik --- .gitlab-ci.yml | 2 +- rabotnikobm/instance.py | 79 +++++++-------------- rabotnikobm/rules/get_building.py | 17 ++--- rabotnikobm/rules/get_building_land_use.py | 11 +-- rabotnikobm/rules/get_floorspace.py | 23 +++--- rabotnikobm/rules/get_points_in_building.py | 9 +-- setup.py | 2 +- tests/conftest.py | 26 ++----- tests/test_get_building.py | 20 +++--- tests/test_get_building_land_use.py | 4 +- tests/test_get_floorspace.py | 4 +- tests/test_get_points_in_building.py | 6 +- 12 files changed, 69 insertions(+), 134 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 3ecd123..cea3aec 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,4 +1,4 @@ -image: python:3-buster +image: python:3.9-buster before_script: - python3 -V diff --git a/rabotnikobm/instance.py b/rabotnikobm/instance.py index a71de75..6165d37 100644 --- a/rabotnikobm/instance.py +++ b/rabotnikobm/instance.py @@ -35,10 +35,7 @@ logger = logging.getLogger(__name__) # Initiate the rabotnik -rabotnik = Rabotnik("rabotnik-obm") - -# Celery entrypoint -celery = rabotnik.processor # noqa +rabotnik = Rabotnik() async def connected_message_bus(config_message_bus): @@ -51,44 +48,25 @@ async def connected_message_bus(config_message_bus): return message_bus -async def connected_storages(config_consumer, config_producer): - - # Get a storage connection to `osm_replication` (source) - storage_consumer = rabotnik.get_storage("postgresql") - await storage_consumer.connect(config_file=config_consumer) - - # Get a storage connection to `obm_buildings` (receiver) - storage_contributor = rabotnik.get_storage("postgresql") - await storage_contributor.connect(config_file=config_producer) - - return storage_consumer, storage_contributor - - -async def start_rabotnik_obm( - message_bus, storage_consumer, storage_contributor, n_processes_max -): +async def start_rabotnik_obm(message_bus): rules = [ - GetBuilding(storage_consumer, storage_contributor), - GetFloorspace(storage_consumer, storage_contributor), - GetBuildingLandUse(storage_consumer), - GetPointsInBuilding(storage_consumer), + GetBuilding(), + GetFloorspace(), + GetBuildingLandUse(), + GetPointsInBuilding(), ] - rules = Assembly(rules=rules, n_processes_max=n_processes_max) + rules = Assembly(rules=rules) await message_bus.subscribe("building", rules.run) async def start_rabotnik(args): - storage_consumer, storage_contributor = await connected_storages( - args.config_consumer, args.config_producer - ) + logger.info("start rabotnik") message_bus = await connected_message_bus(args.config_message_bus) - await start_rabotnik_obm( - message_bus, storage_consumer, storage_contributor, args.n_processes_max - ) + await start_rabotnik_obm(message_bus) def main(): @@ -101,20 +79,6 @@ def main(): help="increase output verbosity (e.g., -vv is more than -v)", ) - parser.add_argument( - "-cp", - "--config-producer", - type=str, - help="Producer storage configuration", - ) - - parser.add_argument( - "-cc", - "--config-consumer", - type=str, - help="Consumer storage configuration", - ) - parser.add_argument( "-cmb", "--config-message-bus", @@ -123,13 +87,9 @@ def main(): ) parser.add_argument( - "-np", - "--n-processes-max", - default=1, - type=int, - help="Max number of assemblies to run in parallel", + "--start-celery-worker", + action="store_true", ) - args = parser.parse_args() default_log_level = logging.WARNING @@ -143,10 +103,19 @@ def main(): logger.addHandler(stream_handler) + if args.start_celery_worker: + argv = [ + "worker", + "--loglevel=DEBUG", + "--concurrency=10", + ] + rabotnik.processor.worker_main(argv) + print("... started celery worker") + sys.exit() + + if not args.config_message_bus: + sys.exit("Missing parameter -cmb") + loop = asyncio.get_event_loop() loop.create_task(start_rabotnik(args)) loop.run_forever() - - -if __name__ == "__main__": - main() diff --git a/rabotnikobm/rules/get_building.py b/rabotnikobm/rules/get_building.py index f78c324..9705639 100644 --- a/rabotnikobm/rules/get_building.py +++ b/rabotnikobm/rules/get_building.py @@ -24,10 +24,10 @@ from rabotnik.storages.base import StorageBase logger = logging.getLogger(__name__) -async def upsert(storage: StorageBase, table: str, data: tuple): +def upsert(storage: StorageBase, table: str, data: tuple): """Insert `data` into `storage` and update if `data` is already present.""" osm_id, geometry = data - await storage.execute( + storage.execute( f"INSERT INTO {table} VALUES (%s, %s) ON CONFLICT (osm_id) DO UPDATE SET geometry = %s", (osm_id, geometry, geometry), ) @@ -36,22 +36,19 @@ async def upsert(storage: StorageBase, table: str, data: tuple): class GetBuilding(Rule): """A rule to copy a building entry from a source database to a destination database.""" - def __init__(self, storage_from, storage_to): - self.storage_from = storage_from - self.storage_to = storage_to + @Rule.app.task(bind=True, base=Rule) + def evaluate(self, building_id): - async def evaluate(self, payload: dict): - building_id = payload["building_id"] + logger.info("called building task") table_from = "osm_building_polygons" table_to = "obm_buildings" # Pull the building data from storage_from - for row in await self.storage_from.get_results( + for row in self.storages.storage_from.get_results( f"SELECT osm_id, geometry FROM {table_from} WHERE osm_id={building_id}" ): - # Push the building data to storage_to - await upsert(self.storage_to, table_to, row) + upsert(self.storages.storage_to, table_to, row) logger.info("finished evaluating %s", building_id) diff --git a/rabotnikobm/rules/get_building_land_use.py b/rabotnikobm/rules/get_building_land_use.py index b52384e..dc4bf3c 100644 --- a/rabotnikobm/rules/get_building_land_use.py +++ b/rabotnikobm/rules/get_building_land_use.py @@ -16,7 +16,6 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see http://www.gnu.org/licenses/. import logging -from typing import Optional from rabotnik import Rule @@ -28,13 +27,9 @@ class GetBuildingLandUse(Rule): """A rule to get the land use of a building. If more than one land use is found, returns None.""" - def __init__(self, storage): - self.storage = storage - - async def evaluate(self, payload: dict) -> Optional[str]: - building_id = payload["building_id"] - - land_use = await self.storage.expect_one( + @Rule.app.task(bind=True, base=Rule) + def evaluate(self, building_id): + land_use = self.storages.storage_from.expect_one( f"""SELECT land.class FROM osm_building_relations AS building, osm_lands AS land WHERE {building_id} = building.id diff --git a/rabotnikobm/rules/get_floorspace.py b/rabotnikobm/rules/get_floorspace.py index 579882f..a40d19e 100644 --- a/rabotnikobm/rules/get_floorspace.py +++ b/rabotnikobm/rules/get_floorspace.py @@ -7,9 +7,9 @@ from rabotnik import Rule # pylint: disable=import-error logger = logging.getLogger(__name__) -async def upsert(storage: RabotnikStorage, osm_id: int, floorspace: float): +def upsert(storage: RabotnikStorage, osm_id: int, floorspace: float): """Insert data into `storage` and update if data is already present.""" - await storage.execute( + storage.execute( "INSERT INTO obm_buildings (osm_id, floorspace) VALUES (%s, %s) " "ON CONFLICT (osm_id) DO UPDATE SET floorspace = %s", (osm_id, floorspace, floorspace), @@ -17,18 +17,11 @@ async def upsert(storage: RabotnikStorage, osm_id: int, floorspace: float): class GetFloorspace(Rule): - def __init__(self, storage: RabotnikStorage, storage_target: RabotnikStorage): - """Calculate floorspace of buildings. + def __init__(self): + """Calculate floorspace of buildings.""" - Args: - storage: Storage serving building data. - storage_target: Storage to fill in result. - """ - self.storage = storage - self.storage_target = storage_target - - async def evaluate(self, payload: dict) -> None: - building_id = payload["building_id"] + @Rule.app.task(bind=True, base=Rule) + def evaluate(self, building_id): logger.debug("evaluating %s", building_id) query_floorspace = f""" @@ -46,10 +39,10 @@ class GetFloorspace(Rule): AND osm_id = {building_id}; """ - floorspace = await self.storage.expect_one(query_floorspace) + floorspace = self.storages.storage_from.expect_one(query_floorspace) logger.info("building %s floorspace: %s.", building_id, floorspace) if floorspace is None: return - await upsert(self.storage_target, building_id, floorspace) + upsert(self.storages.storage_to, building_id, floorspace) diff --git a/rabotnikobm/rules/get_points_in_building.py b/rabotnikobm/rules/get_points_in_building.py index 932ccba..8adc064 100644 --- a/rabotnikobm/rules/get_points_in_building.py +++ b/rabotnikobm/rules/get_points_in_building.py @@ -25,15 +25,12 @@ logger = logging.getLogger(__name__) class GetPointsInBuilding(Rule): """A rule to get the number of points within a building.""" - def __init__(self, storage): - self.storage = storage - - async def evaluate(self, payload: dict) -> int: - building_id = payload["building_id"] + @Rule.app.task(bind=True, base=Rule) + def evaluate(self, building_id): logger.debug("evaluating %s", building_id) n_points_total = 0 - async for n_points, _ in self.storage.iter_results( + for n_points, _ in self.storages.storage_from.iter_results( f"""SELECT COUNT(ST_NPOINTS(spots.geometry)), buildings.osm_id FROM OSM_SPOTS as spots, OSM_Building_polygons as buildings WHERE ST_Intersects(buildings.geometry, spots.geometry) diff --git a/setup.py b/setup.py index 2bcd095..9fa49bb 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ setup( description="Rabotnik instance for the OBM Buildings calculations.", license="AGPLv3+", install_requires=[ - "rabotnik@https://git.gfz-potsdam.de/dynamicexposure/rabotnik/rabotnik/-/archive/master/rabotnik-master.zip" # noqa: E501 + "rabotnik@git+https://git.gfz-potsdam.de/dynamicexposure/rabotnik/rabotnik.git" # noqa: E501 ], extras_require={ "tests": tests_require, diff --git a/tests/conftest.py b/tests/conftest.py index fad8731..d4ec532 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,39 +27,25 @@ def pytest_configure(config): @pytest.fixture -@pytest.mark.asyncio -async def mock_storage(): - storage = rabotnik.Rabotnik.get_storage("mockeddb") - storage.connect() - yield storage - - -@pytest.fixture -@pytest.mark.asyncio -async def storage_contributor(pytestconfig): +def storage_contributor(pytestconfig): storage_configuration = pytestconfig.getoption("storage_contributor") - - storage = rabotnik.Rabotnik.get_storage("postgresql") - await storage.connect(config_file=storage_configuration) + storage = rabotnik.storages.deserialize_storage(storage_configuration) + storage.connect() yield storage storage.disconnect() - await storage.pool.wait_closed() @pytest.fixture -@pytest.mark.asyncio -async def storage_consumer(pytestconfig): +def storage_consumer(pytestconfig): storage_configuration = pytestconfig.getoption("storage_consumer") - - storage = rabotnik.Rabotnik.get_storage("postgresql") - await storage.connect(config_file=storage_configuration) + storage = rabotnik.storages.deserialize_storage(storage_configuration) + storage.connect() yield storage storage.disconnect() - await storage.pool.wait_closed() def pytest_collection_modifyitems(config, items): diff --git a/tests/test_get_building.py b/tests/test_get_building.py index 7cc3cfc..646904f 100644 --- a/tests/test_get_building.py +++ b/tests/test_get_building.py @@ -3,21 +3,20 @@ from rabotnikobm.rules.get_building import GetBuilding, upsert @pytest.mark.requires_storage -@pytest.mark.asyncio -async def test_upsert(storage_contributor): +def test_upsert(storage_contributor): table = "obm_buildings" osm_id = 0 test_data = (osm_id, "POLYGON((0 0, 1 0, 1 1, 0 0))") test_data_update = (osm_id, "POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))") - await upsert(storage_contributor, table=table, data=test_data) - data_out = await storage_contributor.get_results( + upsert(storage_contributor, table=table, data=test_data) + data_out = storage_contributor.get_results( f"SELECT * FROM {table} WHERE osm_id = {osm_id};" ) - await upsert(storage_contributor, table=table, data=test_data_update) - data_out_updated = await storage_contributor.get_results( + upsert(storage_contributor, table=table, data=test_data_update) + data_out_updated = storage_contributor.get_results( f"SELECT * FROM {table} WHERE osm_id = {osm_id};" ) assert len(data_out) == 1 @@ -25,14 +24,13 @@ async def test_upsert(storage_contributor): @pytest.mark.requires_storage -@pytest.mark.asyncio -async def test_get_building(storage_consumer, storage_contributor): +def test_get_building(): - rule = GetBuilding(storage_from=storage_consumer, storage_to=storage_contributor) + rule = GetBuilding() building_id = 848970775 - await rule.evaluate(payload={"building_id": building_id}) - data_out = await storage_contributor.get_results( + rule.evaluate(building_id=building_id) + data_out = rule.storages.storage_to.get_results( f"SELECT * FROM obm_buildings WHERE osm_id = {building_id};" ) diff --git a/tests/test_get_building_land_use.py b/tests/test_get_building_land_use.py index 112c10d..8386592 100644 --- a/tests/test_get_building_land_use.py +++ b/tests/test_get_building_land_use.py @@ -26,8 +26,8 @@ from rabotnikobm.rules.get_building_land_use import ( @pytest.mark.requires_storage @pytest.mark.asyncio async def test_get_points_in_building(storage_consumer): - rule = GetBuildingLandUse(storage=storage_consumer) + rule = GetBuildingLandUse() payload = {"building_id": 193799} - result = [result async for result in rule.evaluate(payload=payload)] + result = [result async for result in rule.evaluate(**payload)] assert result == [("forest",)] diff --git a/tests/test_get_floorspace.py b/tests/test_get_floorspace.py index d8d5c11..a224002 100644 --- a/tests/test_get_floorspace.py +++ b/tests/test_get_floorspace.py @@ -26,11 +26,11 @@ from rabotnikobm.rules.get_floorspace import ( @pytest.mark.requires_storage @pytest.mark.asyncio async def test_get_floorspace(storage_consumer, storage_contributor): - rule = GetFloorspace(storage=storage_consumer, storage_target=storage_contributor) + rule = GetFloorspace() osm_id = 848970775 payload = {"building_id": osm_id} - await rule.evaluate(payload=payload) + await rule.evaluate(**payload) data_out = await storage_contributor.get_results( f"SELECT osm_id, floorspace FROM obm_buildings WHERE osm_id = {osm_id};" ) diff --git a/tests/test_get_points_in_building.py b/tests/test_get_points_in_building.py index 2f18c22..5e430db 100644 --- a/tests/test_get_points_in_building.py +++ b/tests/test_get_points_in_building.py @@ -25,9 +25,9 @@ from rabotnikobm.rules.get_points_in_building import ( @pytest.mark.requires_storage @pytest.mark.asyncio -async def test_get_points_in_building(storage_consumer): - rule = GetPointsInBuilding(storage=storage_consumer) +async def test_get_points_in_building(): + rule = GetPointsInBuilding() payload = {"building_id": -12412829} - result = [nbuildings async for nbuildings in rule.evaluate(payload=payload)] + result = [nbuildings async for nbuildings in rule.evaluate(**payload)] assert result == [8] -- GitLab