From eadc4c916279c4f69293adc53e687d09bee0c4bd Mon Sep 17 00:00:00 2001
From: Laurens <laurens@gfz-potsdam.de>
Date: Thu, 12 Jan 2023 16:49:59 +0100
Subject: [PATCH 1/2] Add Uploader class

---
 allin/uploader.py      | 249 +++++++++++++++++++++++++++++++++++++++++
 setup.py               |   2 +
 tests/test_uploader.py |  25 +++++
 3 files changed, 276 insertions(+)
 create mode 100644 allin/uploader.py
 create mode 100644 tests/test_uploader.py

diff --git a/allin/uploader.py b/allin/uploader.py
new file mode 100644
index 0000000..7ca7aeb
--- /dev/null
+++ b/allin/uploader.py
@@ -0,0 +1,249 @@
+import logging
+import copy
+import rasterio
+import glob
+
+from datetime import datetime
+from typing import Iterable, Tuple, List
+from rasterio.features import shapes
+from rasterio.windows import transform
+from shapely.geometry import shape
+from databaselib import PostGISDatabase
+from numpy import ndarray
+
+
+logger = logging.getLogger(__name__)
+
+
+class Uploader:
+    """
+    The `Uploader` class takes one or a set of raster files, converts these to vector format
+    and uploads the vector features to a database.
+
+    Args:
+        database_config (dictionary):
+            Dictionary containing the database connection credentials:
+            `host`: host name of the server
+            `dbname`: name of the database
+            `port`: port to access the server
+            `username`: name of the database user
+            `password`: password of the database user
+        crs (optional, default: 4326):
+            EPSG code of the Coordinate Reference System (CRS).
+        raster_band (optional, default: 1):
+            The raster band that should be used to create vector features.
+
+    Attributes:
+        database (PostGISDatabase):
+            The database that the vector features are uploaded to.
+        default_crs (int):
+            EPSG code of the Coordinate Reference System (CRS), if no other is provided.
+        default_raster_band (int)
+            The raster band that should be used to create vector features, if no other is
+            provided.
+    """
+
+    def __init__(self, database_config, crs: int = 4326, raster_band: int = 1):
+        self.database = PostGISDatabase(**database_config)
+        self.default_crs = crs
+        self.default_raster_band = raster_band
+
+    def upload_raster_batch(
+        self, directory: str, extension: str, crs: int = None, raster_band: int = None
+    ):
+        """
+            Vectorizes multiple raster files after each other and uploads these to the database.
+
+            Args:
+                directory (str):
+        `           The directory where the raster files are located.
+                extension (str):
+                    The raster file extension (TIF, JPG, PNG)
+                crs (optional, int):
+                    EPSG code of the Coordinate Reference System (CRS) of the files.
+                raster_band (optional, int):
+                    The raster band that should be used to create vector features.
+        """
+
+        files = glob.glob(f"{directory}/**/*.{extension}")
+        for file in files:
+            self.upload_raster(file, crs, raster_band)
+
+    def upload_raster(self, filepath: str, crs: int = None, raster_band: int = None):
+        """
+        Vectorizes a raster file and uploads it to the database.
+
+        Args:
+            filepath:
+                The filepath to the raster file.
+            crs:
+                EPSG code of the Coordinate Reference System (CRS) of the file.
+            raster_band:
+        """
+
+        # Set default data if not provided
+        if crs is None:
+            crs = self.default_crs
+        if raster_band is None:
+            raster_band = self.default_raster_band
+
+        # Read raster file
+        dataset = self.read_raster(filepath)
+
+        # Read a slice of the raster file (window)
+        window_iter = self.read_windows(dataset, raster_band)
+        start_time = datetime.now()
+        n_geoms = 0
+
+        # Vectorize each window and upload the geometries that are found to the database
+        self.database.connect()
+        for i, geometries in enumerate(self.get_built_area(window_iter)):
+            n_geoms += len(geometries)
+            self.insert_geometries(filepath, geometries, crs)
+            if i % 1000 == 0 and i != 0:
+                self.database.connection.commit()
+        self.database.commit_and_close()
+
+        # Output some statistics of the processing of the file
+        total_time = (datetime.now() - start_time).total_seconds()
+        logger.info(
+            f"File {filepath} took {total_time:.1f} seconds.\n"
+            f"In total {n_geoms} geometries were found."
+        )
+
+    def get_built_area(self, windows: Iterable[Tuple]) -> List[List[str]]:
+        """
+        Transforms a slice of a raster file into WKT geometries.
+
+        Args:
+            windows (Iterable):
+                A iterable list of a window with its transform parameters. The window is a slice
+                of the raster that is used as input for the vectorization.
+
+        Returns:
+            A set of WKT geometries for each window.
+        """
+
+        # Vectorize each window and if there are geometries, return them.
+        for i, (window, window_transform) in enumerate(windows):
+            geometries = self.vectorize_image(window, window_transform)
+            if geometries:
+                yield geometries
+
+    def insert_geometries(self, filepath: str, geometries: List[str], crs: int):
+        """
+        A list of geometries is inserted into the database. The `filepath` is also inserted,
+        for easy removal of the data from one file.
+
+        Args:
+            filepath:
+                The filepath to the raster file.
+            geometries:
+                A list of WKT geometries of the vectorized features.
+            crs:
+                EPSG code of the Coordinate Reference System (CRS) of the file.
+        Returns:
+        """
+
+        vals = ",".join(
+            f"('{filepath}', " f"ST_Transform(ST_GeomFromText('{geom}', {crs}),4326)" f")"
+            for geom in geometries
+        )
+        sql_statement = f"""
+            INSERT INTO ghsl_vector(file, geom)
+            VALUES {vals}
+            """
+        self.database.cursor.execute(sql_statement)
+
+    @staticmethod
+    def vectorize_image(image: ndarray, transform: rasterio.transform) -> List[str]:
+        """
+        Reclassifies a numpy ndarray and transforms the desired data points into WKT geometries.
+
+        Args:
+            image:
+                The raster band data in a Numpy `ndarray`.
+            transform:
+                The transform parameters that can be used for geo-referencing.
+
+        Returns:
+            A list of WKT geometries of the vectorized features.
+        """
+
+        # All building classifications are between 11 and 25. All other values are either
+        # vegetation, roads, water or no data. Therefore, first all other values are set to 0.
+        # Afterwards, all values that are not 0 are set to 1. Now, all buildings, regardless of
+        # there classification (residential/non-residential; various building heights) will be
+        # merged into one polygon.
+        image[image >= 26] = 0
+        image[image <= 10] = 0
+        image[image > 0] = 1
+        logger.debug("reclassified image")
+
+        # A mask is set for all values that are not 0. If these are not set, two sets of
+        # polygons are created: one set for all polygons that are 0 and one for all that are 1.
+        # We are only interested in the pixels that contain buildings.
+        mask = copy.copy(image)
+        mask[image == 0] = False
+        mask[image != 0] = True
+
+        # If all values are `False`, no buildings are found in the image.
+        if mask.sum() == 0:
+            return []
+
+        # The image is vectorized using the image, the mask and the transform parameters.
+        features = (
+            {"properties": {"raster_val": v}, "geometry": s}
+            for i, (s, v) in enumerate(shapes(image, mask=mask, transform=transform))
+        )
+
+        # The features are converted to shapely geometries. The WKT representation of these
+        # geometries is returned.
+        wkt_geometries = []
+        for feature in features:
+            # If, for any reason, the raster value is not 1, an error is raised.
+            if feature["properties"]["raster_val"] != 1.0:
+                raise ValueError("Vectorized features can only have raster value 1.0")
+            wkt_geometry = shape(feature["geometry"]).wkt
+            wkt_geometries.append(wkt_geometry)
+        return wkt_geometries
+
+    @staticmethod
+    def read_raster(filepath: str) -> rasterio.DatasetReader:
+        """
+        Reads a geo-referenced raster file with rasterio.
+
+        Args:
+            filepath:
+                The filepath to the raster file.
+
+        returns:
+            A DatasetReader of the raster file with metadata included.
+        """
+
+        with rasterio.Env():
+            with rasterio.open(filepath) as dataset:
+                return dataset
+
+    @staticmethod
+    def read_windows(dataset: rasterio.DatasetReader, raster_band: int):
+        """
+        Slices a geo-referenced raster into many rasterio `Windows` that are returned
+        one-by-one.
+
+        Args:
+            dataset (DatasetReader):
+                The raster dataset with metadata included.
+            raster_band (optional, int):
+                The raster band that should be used to create vector features.
+
+
+        returns:
+            An numpy `ndarray` with the data of the image and the image transform for
+            geo-referencing.
+        """
+
+        for pixel_set, window in dataset.block_windows(raster_band):
+            image = dataset.read(raster_band, window=window)
+            image_transform = transform(window, dataset.transform)
+            yield image, image_transform
diff --git a/setup.py b/setup.py
index bf7192d..c6f418c 100644
--- a/setup.py
+++ b/setup.py
@@ -27,6 +27,8 @@ setup(
     license="AGPLv3+",
     install_requires=[
         "geopandas",
+        "rasterio",
+        "numpy",
         "databaselib==1.0.0",
     ],
     extras_require={
diff --git a/tests/test_uploader.py b/tests/test_uploader.py
new file mode 100644
index 0000000..7cb39ef
--- /dev/null
+++ b/tests/test_uploader.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2023:
+#   Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or (at
+# your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
+# General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see http://www.gnu.org/licenses/.
+
+import logging
+
+logger = logging.getLogger()
+
+
+def test_func():
+    pass
-- 
GitLab


From 22749761ffb42f1d9d78d48604c46d592c735f8c Mon Sep 17 00:00:00 2001
From: Laurens Oostwegel <laurens@gfz-potsdam.de>
Date: Fri, 23 Feb 2024 09:16:25 +0100
Subject: [PATCH 2/2] VectorUploader

---
 allin/vectoruploader.py | 232 ++++++++++++++++++++++++++++++++++++++++
 setup.py                |   2 +-
 2 files changed, 233 insertions(+), 1 deletion(-)
 create mode 100644 allin/vectoruploader.py

diff --git a/allin/vectoruploader.py b/allin/vectoruploader.py
new file mode 100644
index 0000000..a47d426
--- /dev/null
+++ b/allin/vectoruploader.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2024:
+#   Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or (at
+# your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
+# General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see http://www.gnu.org/licenses/.
+
+
+import csv
+import copy
+import json
+import gzip
+import logging
+import time
+from abc import ABC, abstractmethod
+from multiprocessing import Queue, Pool
+from queue import Empty
+
+from shapely.geometry import shape
+
+
+logger = logging.getLogger(__name__)
+
+
+class AbstractVectorUploader(ABC):
+    """
+    An abstract base class for uploading vector data into a database using multiprocessing.
+    """
+
+    items_per_batch = 5000
+
+    def __init__(self, db):
+        """
+        Initializes the uploader with a database connection.
+
+        Args:
+            db (AbstractDatabase):
+                The database connection.
+        """
+
+        self.db = db
+
+    def multiprocess(self, file_paths, num_processors, items_per_batch=None):
+        """
+        Initiates multiprocessing for uploading vector data.
+
+        Args:
+            file_paths (list):
+                List of file paths to upload.
+            num_processors (int):
+                Number of processors to use for multiprocessing.
+        """
+
+        logger.info(f"Starting multiprocessing on {num_processors} processors.")
+
+        if items_per_batch:
+            self.items_per_batch = items_per_batch
+
+        # Fill the queue
+        queue = Queue()
+        for file_path in file_paths:
+            queue.put(file_path)
+
+        # Initialize the pool and start processing
+        pool = Pool(num_processors, self.worker, (queue,))
+
+        # Wait until the queue is empty
+        while queue.qsize() > 0:
+            logger.info(f"Length queue: {queue.qsize()}")
+            time.sleep(10)
+
+        # Close the pool to prevent the pool from accepting new tasks
+        pool.close()
+        # Wait for all processes to finished
+        pool.join()
+        logger.info("All items are processed.")
+
+    def worker(self, queue):
+        """
+        Worker function for multiprocessing.
+
+        Args:
+            queue (Queue):
+                Queue for storing file paths.
+        """
+
+        db = copy.copy(self.db)
+        db.connect()
+        while True:
+            try:
+                file_path = queue.get(timeout=5)
+            # If no objects are found in the queue, the queue raises an `Empty` exception,
+            # therefore the process can be stopped
+            except Empty:
+                break
+
+            for values in self.read_batch(file_path, self.items_per_batch):
+                if not values:
+                    continue
+                db.cursor.execute(self.get_insert_sql_statement(values))
+
+            # Only commit per file, so if there is an error it is easier to spot.
+            db.connection.commit()
+            logger.info(f"inserted {file_path} into database")
+
+    @staticmethod
+    @abstractmethod
+    def read_batch(file_path, items_per_batch):
+        """
+        Abstract method for reading data in batches.
+
+        Args:
+            file_path (str):
+                Path to the file containing Microsoft vector data.
+            items_per_batch (int, default: 5000):
+                Number of items to read per batch.
+        """
+
+        raise NotImplementedError
+
+    @staticmethod
+    @abstractmethod
+    def get_insert_sql_statement(values):
+        """
+        Abstract method for generating SQL insert statements.
+
+        Args:
+            values (str):
+                Values to be inserted into the database.
+        """
+
+        raise NotImplementedError
+
+
+class GoogleUploader(AbstractVectorUploader):
+    """
+    A class for uploading Google vector data into a database.
+    """
+
+    @staticmethod
+    def read_batch(file_path, items_per_batch=5000):
+        """
+        Reads Google vector data in batches.
+
+        Args:
+            file_path (str):
+                Path to the file containing Microsoft vector data.
+            items_per_batch (int, default: 5000):
+                Number of items to read per batch.
+        """
+
+        rows = []
+        with gzip.open(file_path, "rt") as csvfile:
+            reader = csv.DictReader(csvfile)
+            for idx, row in enumerate(reader):
+                geom = f"ST_GeomFromText('{row['geometry']}', 4326)"
+                area = row["area_in_meters"]
+                confidence = row["confidence"]
+                google_id = row["full_plus_code"]
+                rows.append(f"({geom},{area},{confidence},'{google_id}')")
+                if idx >= items_per_batch:
+                    yield ",".join(rows)
+                    rows = []
+            yield rows
+
+    @staticmethod
+    def get_insert_sql_statement(values):
+        """
+        Generates SQL insert statements for Google vector data.
+
+        Args:
+            values (str):
+                Values to be inserted into the database.
+        """
+
+        return f"""
+            INSERT INTO google_building (geom, area, confidence, google_id)
+            VALUES {values}
+            """
+
+
+class MicrosoftUploader(AbstractVectorUploader):
+    @staticmethod
+    def read_batch(file_path, items_per_batch=5000):
+        """
+        Reads Microsoft vector data in batches.
+
+        Args:
+            file_path (str):
+                Path to the file containing Microsoft vector data.
+            items_per_batch (int, default: 5000):
+                Number of items to read per batch.
+        """
+
+        rows = []
+        with gzip.open(file_path, "r") as jsonfile:
+            reader = jsonfile.readlines()
+            for idx, row in enumerate(reader):
+                data = json.loads(row)
+                height = data["properties"]["height"]
+                geometry = f"(ST_GeomFromText('{shape(data['geometry'])}', 4326)"
+                rows.append(f"{geometry},{height})")
+                if idx >= items_per_batch:
+                    yield ",".join(rows)
+                    rows = []
+            yield rows
+
+    @staticmethod
+    def get_insert_sql_statement(values):
+        """
+        Generates SQL insert statements for Microsoft vector data.
+
+        Args:
+            values (str):
+                Values to be inserted into the database.
+        """
+
+        return f"""
+            INSERT INTO microsoft_building (geom, height)
+            VALUES {values}
+            """
diff --git a/setup.py b/setup.py
index c6f418c..86c71ad 100644
--- a/setup.py
+++ b/setup.py
@@ -29,7 +29,7 @@ setup(
         "geopandas",
         "rasterio",
         "numpy",
-        "databaselib==1.0.0",
+        "databaselib>=1.0.0",
     ],
     extras_require={
         "tests": tests_require,
-- 
GitLab