Commit 83726704 authored by Marius Kriegerowski, committed by Felix Delattre
Browse files

Adopted celery and async changes in Rabotnik

parent 976aefde
Pipeline #38650 passed with stage
in 2 minutes and 11 seconds
image: python:3-buster
image: python:3.9-buster
before_script:
- python3 -V
......
......@@ -35,10 +35,7 @@ logger = logging.getLogger(__name__)
# Initiate the rabotnik
rabotnik = Rabotnik("rabotnik-obm")
# Celery entrypoint
celery = rabotnik.processor # noqa
rabotnik = Rabotnik()
async def connected_message_bus(config_message_bus):
......@@ -51,44 +48,25 @@ async def connected_message_bus(config_message_bus):
return message_bus
async def connected_storages(config_consumer, config_producer):
# Get a storage connection to `osm_replication` (source)
storage_consumer = rabotnik.get_storage("postgresql")
await storage_consumer.connect(config_file=config_consumer)
# Get a storage connection to `obm_buildings` (receiver)
storage_contributor = rabotnik.get_storage("postgresql")
await storage_contributor.connect(config_file=config_producer)
return storage_consumer, storage_contributor
async def start_rabotnik_obm(
message_bus, storage_consumer, storage_contributor, n_processes_max
):
async def start_rabotnik_obm(message_bus):
rules = [
GetBuilding(storage_consumer, storage_contributor),
GetFloorspace(storage_consumer, storage_contributor),
GetBuildingLandUse(storage_consumer),
GetPointsInBuilding(storage_consumer),
GetBuilding(),
GetFloorspace(),
GetBuildingLandUse(),
GetPointsInBuilding(),
]
rules = Assembly(rules=rules, n_processes_max=n_processes_max)
rules = Assembly(rules=rules)
await message_bus.subscribe("building", rules.run)
async def start_rabotnik(args):
storage_consumer, storage_contributor = await connected_storages(
args.config_consumer, args.config_producer
)
logger.info("start rabotnik")
message_bus = await connected_message_bus(args.config_message_bus)
await start_rabotnik_obm(
message_bus, storage_consumer, storage_contributor, args.n_processes_max
)
await start_rabotnik_obm(message_bus)
def main():
......@@ -101,20 +79,6 @@ def main():
help="increase output verbosity (e.g., -vv is more than -v)",
)
parser.add_argument(
"-cp",
"--config-producer",
type=str,
help="Producer storage configuration",
)
parser.add_argument(
"-cc",
"--config-consumer",
type=str,
help="Consumer storage configuration",
)
parser.add_argument(
"-cmb",
"--config-message-bus",
......@@ -123,13 +87,9 @@ def main():
)
parser.add_argument(
"-np",
"--n-processes-max",
default=1,
type=int,
help="Max number of assemblies to run in parallel",
"--start-celery-worker",
action="store_true",
)
args = parser.parse_args()
default_log_level = logging.WARNING
......@@ -143,10 +103,19 @@ def main():
logger.addHandler(stream_handler)
if args.start_celery_worker:
argv = [
"worker",
"--loglevel=DEBUG",
"--concurrency=10",
]
rabotnik.processor.worker_main(argv)
print("... started celery worker")
sys.exit()
if not args.config_message_bus:
sys.exit("Missing parameter -cmb")
loop = asyncio.get_event_loop()
loop.create_task(start_rabotnik(args))
loop.run_forever()
if __name__ == "__main__":
main()
......@@ -24,10 +24,10 @@ from rabotnik.storages.base import StorageBase
logger = logging.getLogger(__name__)
async def upsert(storage: StorageBase, table: str, data: tuple):
def upsert(storage: StorageBase, table: str, data: tuple):
"""Insert `data` into `storage` and update if `data` is already present."""
osm_id, geometry = data
await storage.execute(
storage.execute(
f"INSERT INTO {table} VALUES (%s, %s) ON CONFLICT (osm_id) DO UPDATE SET geometry = %s",
(osm_id, geometry, geometry),
)
......@@ -36,22 +36,19 @@ async def upsert(storage: StorageBase, table: str, data: tuple):
class GetBuilding(Rule):
"""A rule to copy a building entry from a source database to a destination database."""
def __init__(self, storage_from, storage_to):
self.storage_from = storage_from
self.storage_to = storage_to
@Rule.app.task(bind=True, base=Rule)
def evaluate(self, building_id):
async def evaluate(self, payload: dict):
building_id = payload["building_id"]
logger.info("called building task")
table_from = "osm_building_polygons"
table_to = "obm_buildings"
# Pull the building data from storage_from
for row in await self.storage_from.get_results(
for row in self.storages.storage_from.get_results(
f"SELECT osm_id, geometry FROM {table_from} WHERE osm_id={building_id}"
):
# Push the building data to storage_to
await upsert(self.storage_to, table_to, row)
upsert(self.storages.storage_to, table_to, row)
logger.info("finished evaluating %s", building_id)
......@@ -16,7 +16,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
from typing import Optional
from rabotnik import Rule
......@@ -28,13 +27,9 @@ class GetBuildingLandUse(Rule):
"""A rule to get the land use of a building. If more than one land
use is found, returns None."""
def __init__(self, storage):
self.storage = storage
async def evaluate(self, payload: dict) -> Optional[str]:
building_id = payload["building_id"]
land_use = await self.storage.expect_one(
@Rule.app.task(bind=True, base=Rule)
def evaluate(self, building_id):
land_use = self.storages.storage_from.expect_one(
f"""SELECT land.class
FROM osm_building_relations AS building, osm_lands AS land
WHERE {building_id} = building.id
......
......@@ -7,9 +7,9 @@ from rabotnik import Rule # pylint: disable=import-error
logger = logging.getLogger(__name__)
async def upsert(storage: RabotnikStorage, osm_id: int, floorspace: float):
def upsert(storage: RabotnikStorage, osm_id: int, floorspace: float):
"""Insert data into `storage` and update if data is already present."""
await storage.execute(
storage.execute(
"INSERT INTO obm_buildings (osm_id, floorspace) VALUES (%s, %s) "
"ON CONFLICT (osm_id) DO UPDATE SET floorspace = %s",
(osm_id, floorspace, floorspace),
......@@ -17,18 +17,11 @@ async def upsert(storage: RabotnikStorage, osm_id: int, floorspace: float):
class GetFloorspace(Rule):
def __init__(self, storage: RabotnikStorage, storage_target: RabotnikStorage):
"""Calculate floorspace of buildings.
def __init__(self):
"""Calculate floorspace of buildings."""
Args:
storage: Storage serving building data.
storage_target: Storage to fill in result.
"""
self.storage = storage
self.storage_target = storage_target
async def evaluate(self, payload: dict) -> None:
building_id = payload["building_id"]
@Rule.app.task(bind=True, base=Rule)
def evaluate(self, building_id):
logger.debug("evaluating %s", building_id)
query_floorspace = f"""
......@@ -46,10 +39,10 @@ class GetFloorspace(Rule):
AND osm_id = {building_id};
"""
floorspace = await self.storage.expect_one(query_floorspace)
floorspace = self.storages.storage_from.expect_one(query_floorspace)
logger.info("building %s floorspace: %s.", building_id, floorspace)
if floorspace is None:
return
await upsert(self.storage_target, building_id, floorspace)
upsert(self.storages.storage_to, building_id, floorspace)
......@@ -25,15 +25,12 @@ logger = logging.getLogger(__name__)
class GetPointsInBuilding(Rule):
"""A rule to get the number of points within a building."""
def __init__(self, storage):
self.storage = storage
async def evaluate(self, payload: dict) -> int:
building_id = payload["building_id"]
@Rule.app.task(bind=True, base=Rule)
def evaluate(self, building_id):
logger.debug("evaluating %s", building_id)
n_points_total = 0
async for n_points, _ in self.storage.iter_results(
for n_points, _ in self.storages.storage_from.iter_results(
f"""SELECT COUNT(ST_NPOINTS(spots.geometry)), buildings.osm_id
FROM OSM_SPOTS as spots, OSM_Building_polygons as buildings
WHERE ST_Intersects(buildings.geometry, spots.geometry)
......
......@@ -27,7 +27,7 @@ setup(
description="Rabotnik instance for the OBM Buildings calculations.",
license="AGPLv3+",
install_requires=[
"rabotnik@https://git.gfz-potsdam.de/dynamicexposure/rabotnik/rabotnik/-/archive/master/rabotnik-master.zip" # noqa: E501
"rabotnik@git+https://git.gfz-potsdam.de/dynamicexposure/rabotnik/rabotnik.git" # noqa: E501
],
extras_require={
"tests": tests_require,
......
......@@ -27,39 +27,25 @@ def pytest_configure(config):
@pytest.fixture
@pytest.mark.asyncio
async def mock_storage():
storage = rabotnik.Rabotnik.get_storage("mockeddb")
storage.connect()
yield storage
@pytest.fixture
@pytest.mark.asyncio
async def storage_contributor(pytestconfig):
def storage_contributor(pytestconfig):
storage_configuration = pytestconfig.getoption("storage_contributor")
storage = rabotnik.Rabotnik.get_storage("postgresql")
await storage.connect(config_file=storage_configuration)
storage = rabotnik.storages.deserialize_storage(storage_configuration)
storage.connect()
yield storage
storage.disconnect()
await storage.pool.wait_closed()
@pytest.fixture
@pytest.mark.asyncio
async def storage_consumer(pytestconfig):
def storage_consumer(pytestconfig):
storage_configuration = pytestconfig.getoption("storage_consumer")
storage = rabotnik.Rabotnik.get_storage("postgresql")
await storage.connect(config_file=storage_configuration)
storage = rabotnik.storages.deserialize_storage(storage_configuration)
storage.connect()
yield storage
storage.disconnect()
await storage.pool.wait_closed()
def pytest_collection_modifyitems(config, items):
......
......@@ -3,21 +3,20 @@ from rabotnikobm.rules.get_building import GetBuilding, upsert
@pytest.mark.requires_storage
@pytest.mark.asyncio
async def test_upsert(storage_contributor):
def test_upsert(storage_contributor):
table = "obm_buildings"
osm_id = 0
test_data = (osm_id, "POLYGON((0 0, 1 0, 1 1, 0 0))")
test_data_update = (osm_id, "POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))")
await upsert(storage_contributor, table=table, data=test_data)
data_out = await storage_contributor.get_results(
upsert(storage_contributor, table=table, data=test_data)
data_out = storage_contributor.get_results(
f"SELECT * FROM {table} WHERE osm_id = {osm_id};"
)
await upsert(storage_contributor, table=table, data=test_data_update)
data_out_updated = await storage_contributor.get_results(
upsert(storage_contributor, table=table, data=test_data_update)
data_out_updated = storage_contributor.get_results(
f"SELECT * FROM {table} WHERE osm_id = {osm_id};"
)
assert len(data_out) == 1
......@@ -25,14 +24,13 @@ async def test_upsert(storage_contributor):
@pytest.mark.requires_storage
@pytest.mark.asyncio
async def test_get_building(storage_consumer, storage_contributor):
def test_get_building():
rule = GetBuilding(storage_from=storage_consumer, storage_to=storage_contributor)
rule = GetBuilding()
building_id = 848970775
await rule.evaluate(payload={"building_id": building_id})
data_out = await storage_contributor.get_results(
rule.evaluate(building_id=building_id)
data_out = rule.storages.storage_to.get_results(
f"SELECT * FROM obm_buildings WHERE osm_id = {building_id};"
)
......
......@@ -26,8 +26,8 @@ from rabotnikobm.rules.get_building_land_use import (
@pytest.mark.requires_storage
@pytest.mark.asyncio
async def test_get_points_in_building(storage_consumer):
rule = GetBuildingLandUse(storage=storage_consumer)
rule = GetBuildingLandUse()
payload = {"building_id": 193799}
result = [result async for result in rule.evaluate(payload=payload)]
result = [result async for result in rule.evaluate(**payload)]
assert result == [("forest",)]
......@@ -26,11 +26,11 @@ from rabotnikobm.rules.get_floorspace import (
@pytest.mark.requires_storage
@pytest.mark.asyncio
async def test_get_floorspace(storage_consumer, storage_contributor):
rule = GetFloorspace(storage=storage_consumer, storage_target=storage_contributor)
rule = GetFloorspace()
osm_id = 848970775
payload = {"building_id": osm_id}
await rule.evaluate(payload=payload)
await rule.evaluate(**payload)
data_out = await storage_contributor.get_results(
f"SELECT osm_id, floorspace FROM obm_buildings WHERE osm_id = {osm_id};"
)
......
......@@ -25,9 +25,9 @@ from rabotnikobm.rules.get_points_in_building import (
@pytest.mark.requires_storage
@pytest.mark.asyncio
async def test_get_points_in_building(storage_consumer):
rule = GetPointsInBuilding(storage=storage_consumer)
async def test_get_points_in_building():
rule = GetPointsInBuilding()
payload = {"building_id": -12412829}
result = [nbuildings async for nbuildings in rule.evaluate(payload=payload)]
result = [nbuildings async for nbuildings in rule.evaluate(**payload)]
assert result == [8]
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment