diff --git a/setup.py b/setup.py index db9c20ac95b14af10fa4e515a9c61af43d16badb..7e20bcdc3f38a1d9bcc0ab318b7e2626f1b50152 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,7 @@ setup( "aiofiles", "osmdiff>=0.1.9", "rabotnik@git+https://git.gfz-potsdam.de/dynamicexposure/rabotnik/rabotnik.git", + "databaselib@git+https://git.gfz-potsdam.de/dynamicexposure/coding/database-lib.git", ], extras_require={ "tests": tests_require, diff --git a/spearhead/__init__.py b/spearhead/__init__.py index ed8a5fed366b3dc00e0cf033a65f89440f1c976e..8e08f27f8513b3cc8545335951ce1d2e017fb9b7 100644 --- a/spearhead/__init__.py +++ b/spearhead/__init__.py @@ -17,7 +17,8 @@ # along with this program. If not, see http://www.gnu.org/licenses/. -from .diff_analyzer import DiffAnalyzer +from .diff_analyzer import DiffAnalyzer, OSMDiffAnalyzer, OBMDiffAnalyzer +from .spearhead import Spearhead -__all__ = ["DiffAnalyzer"] +__all__ = ["Spearhead", "DiffAnalyzer", "OSMDiffAnalyzer", "OBMDiffAnalyzer"] diff --git a/spearhead/configure.py b/spearhead/configure.py index c8f1d62056f291d2bdde596ac353e24baad55ae3..c3b6515af32b29f2eb6dab11e596c3d4fd412a18 100644 --- a/spearhead/configure.py +++ b/spearhead/configure.py @@ -31,14 +31,18 @@ class Configuration: Args: config_file (str): Filepath to the configuration file that defines the following items: - overpass_base_url (str): + method (str): + Either `buildings`, to send all OSM ids of buildings from Overpass that + have been changed to the MessageBus, or `tiles`, to send all Quadkeys that + have been changed in the `obm_buildings` database. + overpass_base_url (str, optional): Overpass URL that is used to query the `AugmentedDiffs`. state_pathname (str): state_pathname of the file that defines the current `AugmentedDiff` state. state_filename (str): Filename of the file that defines the current `AugmentedDiff` state. - rabotnik (dict): - Credentials and host used to connect to the `Rabotnik` instance. + database (dict, optional): + Credentials to the `obm_buildings` database. """ def __init__(self, config_file: str = "config.yml") -> None: @@ -48,14 +52,20 @@ class Configuration: config = yaml.load(f.read(), Loader=yaml.FullLoader) try: - self.overpass_base_url = config["overpass_base_url"] + self.method = config["method"] + if self.method == "building": + self.overpass_base_url = config["overpass_base_url"] + else: + self.database_config = { + "host": config["database"]["host"], + "port": config["database"]["port"], + "dbname": config["database"]["dbname"], + "username": config["database"]["username"], + "password": config["database"]["password"], + } + self.table_name = config["database"]["table_name"] self.state_pathname = config["state_pathname"] self.state_filename = config["state_filename"] - self.rabotnik = { - "hostname": config["rabotnik"]["hostname"], - "username": config["rabotnik"]["username"], - "password": config["rabotnik"]["password"], - } except KeyError as e: logger.info("Error: The variable %s hasn't been set in the config file." % str(e)) sys.exit(1) diff --git a/spearhead/diff_analyzer.py b/spearhead/diff_analyzer.py index e756859c6ebc528a85f0599bc9d3c1d0d19eb8e1..1b94ead8e8564733b0980973a2cdc5555141914a 100644 --- a/spearhead/diff_analyzer.py +++ b/spearhead/diff_analyzer.py @@ -21,8 +21,8 @@ import sys from typing import List from osmdiff import AugmentedDiff, osm +from databaselib import PostGISDatabase -from rabotnik import MessageBus from spearhead.configure import Configuration from spearhead.file_handler import FileHandler @@ -31,6 +31,39 @@ logger.addHandler(logging.StreamHandler(sys.stdout)) class DiffAnalyzer: + """ + The `DiffAnalyzer` is the abstract class that has helper functions to read and update + the state of a `DiffAnalyzer`. The state can be either a timestamp, or the current state + of OSM `AugmentedDiffs`. + """ + + def __init__(self, config): + self.file_handler = FileHandler(config) + + async def update_state(self, state: int) -> None: + """ + Update the state that is logged in the `Configuration` to the current state. + + Args: + state (int): + Current state. + """ + + await self.file_handler.update_state_file(state) + + async def get_state(self) -> int: + """ + Get the state that is logged in the `Configuration` to the current state. + + Args: + state (int): + Current state. + """ + + return await self.file_handler.read_state_file() + + +class OSMDiffAnalyzer(DiffAnalyzer): """ This class contains the logic for obtaining and interpreting the OpenStreetMap's `AugmentedDiffs`. @@ -41,19 +74,15 @@ class DiffAnalyzer: config (Configuration): The configuration that includes the state_pathname and filename of the `AugmentedDiff` state file. - message_bus (MessageBus): - Connection instance to the `Rabotnik` MessageBus. """ def __init__( self, diff_provider: AugmentedDiff, config: Configuration, - message_bus: MessageBus, ): + super().__init__(config) self.diff_provider = diff_provider - self.file_handler = FileHandler(config) - self.message_bus = message_bus async def get_list_of_new_diffs(self) -> List[int]: """ @@ -66,7 +95,7 @@ class DiffAnalyzer: self.diff_provider.get_state() new_state = self.diff_provider.sequence_number - old_state = await self.file_handler.read_state_file() + old_state = await self.get_state() return list(range(old_state, new_state)) async def get_augmented_diff(self) -> AugmentedDiff: @@ -82,18 +111,25 @@ class DiffAnalyzer: self.diff_provider.retrieve(clear_cache=True) return self.diff_provider - async def update_state(self, state: int) -> None: + async def get_buildings(self, state: int): """ - Update the state that is logged in the `Configuration` to the current state. + Collect the OSM IDs of buildings objects that have changed in a given augmented diff + based on an `AugmentedDiff` state Args: state (int): Current `AugmentedDiff` state. """ - await self.file_handler.update_state_file(state) + # Obtain and read augmented diff from Overpass API + self.diff_provider.sequence_number = state + diff = await self.get_augmented_diff() + + # Get identifiers of all buildings that have changed + return await self._get_buildings(diff) - async def get_buildings(self, augmented_diff: AugmentedDiff) -> List[int]: + @staticmethod + async def _get_buildings(augmented_diff: AugmentedDiff) -> List[int]: """ Collect the OSM IDs of buildings objects that have changed in a given augmented diff. @@ -138,14 +174,53 @@ class DiffAnalyzer: return osm_ids - async def send_buildings_to_message_bus(self, osm_ids: list) -> None: + +class OBMDiffAnalyzer(DiffAnalyzer): + """ + This class contains the logic for detecting changes in the `obm_buildings` database and + sending the changes on to the `Rabotnik MessageBus`. + + Args: + config (Configuration): + The configuration that includes the state_pathname and filename of the + `AugmentedDiff` state file. + """ + + def __init__(self, config: Configuration): + super().__init__(config) + self.db = PostGISDatabase(**config.database_config) + self.table_name = config.table_name + + def get_quadkeys(self, state: int): + """ + Get a list of changed Quadkeys based on a state (timestamp). + + Args: + state (int): + Timestamp in seconds since the epoch. + """ + + self.db.connect() + sql_statement = self.get_sql_statement(state) + self.db.cursor.execute(sql_statement) + quadkeys = self.db.cursor.fetchall() + self.db.close() + if quadkeys: + return list(map(lambda q: q[0], quadkeys)) + else: + return None + + def get_sql_statement(self, timestamp: int): """ - Send a list of building IDs to the message bus. + Formulates the SQL query that returns all changed Quadkeys based on a timestamp. Args: - osm_ids (list): - List of OSM IDs of buildings that are sent to the `MessageBus`. + timestamp (int): + Timestamp in seconds since the epoch. """ - for number in osm_ids: - await self.message_bus.send(message="building", payload={"building_id": number}) + return f""" + SELECT DISTINCT(quadkey) + FROM {self.table_name} + WHERE last_update >= to_timestamp ({timestamp}) + """ diff --git a/spearhead/spearhead.py b/spearhead/spearhead.py index 173c6ebf167686e1a017a0353307f898aae8e4ab..b5f898485c82f2da63791a1b3d2e166e0af0d768 100644 --- a/spearhead/spearhead.py +++ b/spearhead/spearhead.py @@ -17,6 +17,7 @@ # along with this program. If not, see http://www.gnu.org/licenses/. import asyncio +import datetime import logging import sys @@ -24,10 +25,10 @@ import osmdiff from rabotnik import MessageBus from spearhead.configure import Configuration -from spearhead import DiffAnalyzer +from spearhead import OSMDiffAnalyzer, OBMDiffAnalyzer logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG) +logging.basicConfig(level=logging.INFO) logger.addHandler(logging.StreamHandler(sys.stdout)) @@ -46,58 +47,143 @@ async def async_main(): logger.info("spearhead started - it can be stopped with CTRL+c") config = Configuration("config.yml") - await spearhead(config) + spearhead = Spearhead(config) + await spearhead.run() -async def spearhead(config: Configuration): +class Spearhead: """ - Obtain a list of buildings that have changed since last check. + Checks an instance (OSM; the `obm_buildings` database; the `obm_tiles` database) on updates + and sends the updated IDs (OSM IDs or Quadkeys) to the Rabotnik MessageBus for processing. Args: - config_file (str): + config (Configuration): A configuration file that defines the following items: - overpass_base_url (str): + method (str): + The method can be `building`, to send all OSM ids of buildings from Overpass + that have been changed to the MessageBus, `quadkey`, to send all Quadkeys + that have been changed in the `obm_buildings` database for processing of + the built area of the Quadkey, or `exposure` to send all Quadkeys that have + been changed in the `obm_tiles` database for processing of the exposure + inside the Quadkeys. + overpass_base_url (str, optional): Overpass URL that is used to query the `AugmentedDiffs`. state_pathname (str): state_pathname of the file that defines the current `AugmentedDiff` state. state_filename (str): Filename of the file that defines the current `AugmentedDiff` state. - rabotnik (dict): - Credentials and host used to connect to the `Rabotnik` instance. + database (dict, optional): + Credentials to the `obm_buildings` database. """ - diff_provider = osmdiff.AugmentedDiff() - diff_provider.base_url = config.overpass_base_url - logger.info("Using Overpass API: " + diff_provider.base_url) + def __init__(self, config: Configuration): + self.config = config - message_bus = MessageBus() - await message_bus.connect() + async def run(self): + """ + Starts the Spearhead process. Based on the `method` in the Configuration, the + `spearhead_obm` function or the `spearhead_tiles` function is called. + """ - diff_analyzer = DiffAnalyzer(diff_provider, config, message_bus) + if self.config.method == "building": + callback = self.spearhead_osm + elif self.config.method == "quadkey" or "exposure": + callback = self.spearhead_database + else: + logger.info("Method should be `building`, `quadkey`, or `exposure`.") + return - while True: try: + await callback() + except KeyboardInterrupt: + logger.info("spearhead stopped") + sys.exit() + + async def spearhead_osm(self): + """ + Obtain a list of OSM building IDs that have changed since last check and send them on + the `Rabotnik Messagebus`. + """ + + diff_provider = osmdiff.AugmentedDiff() + diff_provider.base_url = self.config.overpass_base_url + logger.info("Using Overpass API: " + diff_provider.base_url) + + message_bus = MessageBus() + await message_bus.connect() + + diff_analyzer = OSMDiffAnalyzer(diff_provider, self.config) + + while True: # Obtain available list of diffs diffs = await diff_analyzer.get_list_of_new_diffs() for diff_number in diffs: + try: + # Get identifiers of all buildings that have changed + osm_ids = await diff_analyzer.get_buildings(diff_number) + await self.send_buildings_to_message_bus(message_bus, "building", osm_ids) + except TimeoutError: + logger.info(f"TimeoutError on state {diff_number}.") - # Obtain and read augmented diff from Overpass API - diff_provider.sequence_number = diff_number - diff = await diff_analyzer.get_augmented_diff() - - # Get identifiers of all buildings that have changed - osm_ids = await diff_analyzer.get_buildings(diff) - - await diff_analyzer.file_handler.write_diff_report( - diff.sequence_number, osm_ids - ) - await diff_analyzer.send_buildings_to_message_bus(osm_ids) await diff_analyzer.update_state(diff_number + 1) # Wait until requesting more data await asyncio.sleep(60) - except KeyboardInterrupt: - logger.info("spearhead stopped") - sys.exit() + async def spearhead_database(self): + """ + Obtain a list of Quadkeys that have changed since last check and send them on the + `Rabotnik Messagebus`. + """ + + message_bus = MessageBus() + await message_bus.connect() + + diff_analyzer = OBMDiffAnalyzer(self.config) + while True: + state = await diff_analyzer.get_state() + new_state = int(datetime.datetime.now().timestamp()) + quadkeys = diff_analyzer.get_quadkeys(state) + + if quadkeys: + logger.info( + f"Fetched {len(quadkeys)} Quadkeys from " + f"{datetime.datetime.fromtimestamp(state)} to " + f"{datetime.datetime.fromtimestamp(new_state)}." + ) + await self.send_buildings_to_message_bus( + message_bus, self.config.method, quadkeys + ) + else: + logger.info( + f"No Quadkeys between " + f"{datetime.datetime.fromtimestamp(state)} to " + f"{datetime.datetime.fromtimestamp(new_state)}." + ) + await diff_analyzer.update_state(new_state) + await asyncio.sleep(60) + + @staticmethod + async def send_buildings_to_message_bus( + message_bus: MessageBus, message: str, ids: list + ) -> None: + """ + Send a list of building IDs to the message bus. + + Args: + message_bus (MessageBus): + Rabotnik MessageBus instance + message (str): + Message identifier sent to the MessageBus. It can be `building`, `quadkey` or + `exposure. + ids (list): + List of IDs of buildings or Quadkeys that are sent to the `MessageBus`. + """ + + for idx in ids: + await message_bus.send(message=message, payload={"building_id": idx}) + + +if __name__ == "__main__": + main() diff --git a/tests/config.yml b/tests/config.yml index 1287dcd05f11f13de4e6c6cc8be38a140058371d..cd476a591751e50cac3c546ee623561365c47cbd 100644 --- a/tests/config.yml +++ b/tests/config.yml @@ -1,7 +1,9 @@ overpass_base_url: https://overpass.openbuildingmap.org/api state_pathname: ./tests/data/ state_filename: state.txt -rabotnik: +method: building +database: hostname: localhost username: user password: password + table_name: test_table diff --git a/tests/test_configure.py b/tests/test_configure.py index 742814112d245569637a6302bf75f498df48399b..687f844c0644b21bac4cc07a53e0550d1205e1cb 100644 --- a/tests/test_configure.py +++ b/tests/test_configure.py @@ -28,4 +28,3 @@ def test_configuration_init(): config = Configuration("tests/config.yml") assert config assert config.state_filename == "state.txt" - assert config.rabotnik["hostname"] == "localhost" diff --git a/tests/test_diff_analyzer.py b/tests/test_diff_analyzer.py index 88ab590810c77938e11cf7dd4997f170abf298bc..30abf82abacf12e184e8e9093d1065e813994891 100644 --- a/tests/test_diff_analyzer.py +++ b/tests/test_diff_analyzer.py @@ -23,7 +23,7 @@ import random import pytest import pytest_asyncio -from spearhead.diff_analyzer import DiffAnalyzer +from spearhead.diff_analyzer import OSMDiffAnalyzer logger = logging.getLogger() @@ -36,8 +36,8 @@ async def augmented_diff(): @pytest.fixture -def diff_analyzer(diff_provider, config, message_bus): - return DiffAnalyzer(diff_provider, config, message_bus) +def diff_analyzer(diff_provider, config): + return OSMDiffAnalyzer(diff_provider, config) @pytest.mark.asyncio @@ -64,6 +64,6 @@ async def test_update_state(diff_analyzer): @pytest.mark.asyncio async def test_get_buildings(diff_analyzer, augmented_diff): - buildings = await diff_analyzer.get_buildings(augmented_diff) + buildings = await diff_analyzer._get_buildings(augmented_diff) assert isinstance(random.choice(buildings), int) assert len(buildings) == 356 diff --git a/tests/test_file_handler.py b/tests/test_file_handler.py index 84adb601740b17c07f3fa2e57abb8f1880503c5c..46bb4cf22e65d0135ecdcc46b5ddcf05c82f8ebb 100644 --- a/tests/test_file_handler.py +++ b/tests/test_file_handler.py @@ -44,15 +44,15 @@ async def test_read_state_file(file_handler): @pytest.mark.asyncio async def test_update_state_file(file_handler): - await file_handler.update_state_file("4749545") + await file_handler.update_state_file(4749545) state_number = await file_handler.read_state_file() assert state_number == 4749545 - await file_handler.update_state_file("4749548") + await file_handler.update_state_file(4749548) state_number = await file_handler.read_state_file() assert state_number == 4749548 @pytest.mark.asyncio async def test_write_diff_report(file_handler): - await file_handler.write_diff_report("4749545", ["337202175", "17837838", "-387388722"]) + await file_handler.write_diff_report(4749545, ["337202175", "17837838", "-387388722"]) os.remove(file_handler.config.state_pathname + "augmented-diff-4749545.txt")