From eadc4c916279c4f69293adc53e687d09bee0c4bd Mon Sep 17 00:00:00 2001
From: Laurens <laurens@gfz-potsdam.de>
Date: Thu, 12 Jan 2023 16:49:59 +0100
Subject: [PATCH 1/2] Add Uploader class

---
 allin/uploader.py      | 249 +++++++++++++++++++++++++++++++++++++++++
 setup.py               |   2 +
 tests/test_uploader.py |  25 +++++
 3 files changed, 276 insertions(+)
 create mode 100644 allin/uploader.py
 create mode 100644 tests/test_uploader.py

diff --git a/allin/uploader.py b/allin/uploader.py
new file mode 100644
index 0000000..7ca7aeb
--- /dev/null
+++ b/allin/uploader.py
@@ -0,0 +1,249 @@
+import logging
+import copy
+import rasterio
+import glob
+
+from datetime import datetime
+from typing import Iterable, Tuple, List
+from rasterio.features import shapes
+from rasterio.windows import transform
+from shapely.geometry import shape
+from databaselib import PostGISDatabase
+from numpy import ndarray
+
+
+logger = logging.getLogger(__name__)
+
+
+class Uploader:
+    """
+    The `Uploader` class takes one raster file or a set of raster files, converts them
+    to vector format and uploads the vector features to a database.
+
+    Args:
+        database_config (dict):
+            Dictionary containing the database connection credentials:
+            `host`: host name of the server
+            `dbname`: name of the database
+            `port`: port to access the server
+            `username`: name of the database user
+            `password`: password of the database user
+        crs (optional, default: 4326):
+            EPSG code of the Coordinate Reference System (CRS).
+        raster_band (optional, default: 1):
+            The raster band that should be used to create vector features.
+
+    Attributes:
+        database (PostGISDatabase):
+            The database that the vector features are uploaded to.
+        default_crs (int):
+            EPSG code of the Coordinate Reference System (CRS), if no other is provided.
+        default_raster_band (int):
+            The raster band that should be used to create vector features, if no other is
+            provided.
+    """
+
+    def __init__(self, database_config, crs: int = 4326, raster_band: int = 1):
+        self.database = PostGISDatabase(**database_config)
+        self.default_crs = crs
+        self.default_raster_band = raster_band
+
+    def upload_raster_batch(
+        self, directory: str, extension: str, crs: int = None, raster_band: int = None
+    ):
+        """
+        Vectorizes multiple raster files one after another and uploads them to the
+        database.
+
+        Args:
+            directory (str):
+                The directory where the raster files are located.
+            extension (str):
+                The raster file extension (TIF, JPG, PNG).
+            crs (optional, int):
+                EPSG code of the Coordinate Reference System (CRS) of the files.
+            raster_band (optional, int):
+                The raster band that should be used to create vector features.
+        """
+
+        # `recursive=True` is required, otherwise the `**` pattern does not descend
+        # into subdirectories.
+        files = glob.glob(f"{directory}/**/*.{extension}", recursive=True)
+        for file in files:
+            self.upload_raster(file, crs, raster_band)
+
+    def upload_raster(self, filepath: str, crs: int = None, raster_band: int = None):
+        """
+        Vectorizes a raster file and uploads it to the database.
+
+        Args:
+            filepath:
+                The filepath to the raster file.
+            crs:
+                EPSG code of the Coordinate Reference System (CRS) of the file.
+            raster_band:
+                The raster band that should be used to create vector features.
+        """
+
+        # Set the defaults if no values are provided
+        if crs is None:
+            crs = self.default_crs
+        if raster_band is None:
+            raster_band = self.default_raster_band
+
+        # Read the raster file
+        dataset = self.read_raster(filepath)
+
+        # Read slices of the raster file (windows)
+        window_iter = self.read_windows(dataset, raster_band)
+        start_time = datetime.now()
+        n_geoms = 0
+
+        # Vectorize each window and upload the geometries that are found to the database
+        self.database.connect()
+        for i, geometries in enumerate(self.get_built_area(window_iter)):
+            n_geoms += len(geometries)
+            self.insert_geometries(filepath, geometries, crs)
+            # Commit in batches of 1000 windows
+            if i % 1000 == 0 and i != 0:
+                self.database.connection.commit()
+        self.database.commit_and_close()
+
+        # Output some statistics on the processing of the file
+        total_time = (datetime.now() - start_time).total_seconds()
+        logger.info(
+            f"File {filepath} took {total_time:.1f} seconds.\n"
+            f"In total {n_geoms} geometries were found."
+        )
+
+    def get_built_area(self, windows: Iterable[Tuple]) -> Iterable[List[str]]:
+        """
+        Transforms the slices of a raster file into WKT geometries.
+
+        Args:
+            windows (Iterable):
+                An iterable of windows with their transform parameters. Each window is a
+                slice of the raster that is used as input for the vectorization.
+
+        Yields:
+            A list of WKT geometries for each window in which features are found.
+        """
+
+        # Vectorize each window and, if there are geometries, yield them.
+        for window, window_transform in windows:
+            geometries = self.vectorize_image(window, window_transform)
+            if geometries:
+                yield geometries
+
+    def insert_geometries(self, filepath: str, geometries: List[str], crs: int):
+        """
+        Inserts a list of geometries into the database. The `filepath` is stored
+        alongside each geometry, so that the data of a single file can easily be removed.
+
+        Args:
+            filepath:
+                The filepath to the raster file.
+            geometries:
+                A list of WKT geometries of the vectorized features.
+            crs:
+                EPSG code of the Coordinate Reference System (CRS) of the file.
+        """
+
+        vals = ",".join(
+            f"('{filepath}', ST_Transform(ST_GeomFromText('{geom}', {crs}), 4326))"
+            for geom in geometries
+        )
+        sql_statement = f"""
+            INSERT INTO ghsl_vector(file, geom)
+            VALUES {vals}
+        """
+        self.database.cursor.execute(sql_statement)
+
+    @staticmethod
+    def vectorize_image(image: ndarray, transform: rasterio.Affine) -> List[str]:
+        """
+        Reclassifies a numpy ndarray and transforms the desired data points into WKT
+        geometries.
+
+        Args:
+            image:
+                The raster band data in a numpy `ndarray`.
+            transform:
+                The affine transform parameters that can be used for geo-referencing.
+
+        Returns:
+            A list of WKT geometries of the vectorized features.
+        """
+
+        # All building classifications are between 11 and 25. All other values are either
+        # vegetation, roads, water or no data. Therefore, all other values are first set
+        # to 0. Afterwards, all values that are not 0 are set to 1. Now all buildings,
+        # regardless of their classification (residential/non-residential; various
+        # building heights), will be merged into one polygon.
+        image[image >= 26] = 0
+        image[image <= 10] = 0
+        image[image > 0] = 1
+        logger.debug("reclassified image")
+
+        # A mask is set for all values that are not 0. Without the mask, two sets of
+        # polygons would be created: one for all pixels that are 0 and one for all that
+        # are 1. We are only interested in the pixels that contain buildings.
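+        # (`rasterio.features.shapes` accepts boolean and uint8 masks in which zero or
+        # `False` marks the excluded pixels; copying the image preserves its dtype,
+        # which is assumed here to be uint8.)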
+        mask = copy.copy(image)
+        mask[image == 0] = False
+        mask[image != 0] = True
+
+        # If all values are `False`, no buildings are found in the image.
+        if mask.sum() == 0:
+            return []
+
+        # The image is vectorized using the image, the mask and the transform parameters.
+        features = (
+            {"properties": {"raster_val": v}, "geometry": s}
+            for s, v in shapes(image, mask=mask, transform=transform)
+        )
+
+        # The features are converted to shapely geometries. The WKT representation of
+        # these geometries is returned.
+        wkt_geometries = []
+        for feature in features:
+            # If, for any reason, the raster value is not 1, an error is raised.
+            if feature["properties"]["raster_val"] != 1.0:
+                raise ValueError("Vectorized features can only have raster value 1.0")
+            wkt_geometry = shape(feature["geometry"]).wkt
+            wkt_geometries.append(wkt_geometry)
+        return wkt_geometries
+
+    @staticmethod
+    def read_raster(filepath: str) -> rasterio.DatasetReader:
+        """
+        Reads a geo-referenced raster file with rasterio.
+
+        Args:
+            filepath:
+                The filepath to the raster file.
+
+        Returns:
+            An open DatasetReader of the raster file with metadata included. The caller
+            is responsible for closing it.
+        """
+
+        # The dataset must not be opened in a `with` block: leaving the block would
+        # close the dataset before the caller could read any windows from it.
+        with rasterio.Env():
+            return rasterio.open(filepath)
+
+    @staticmethod
+    def read_windows(dataset: rasterio.DatasetReader, raster_band: int):
+        """
+        Slices a geo-referenced raster into many rasterio `Windows` that are returned
+        one-by-one.
+
+        Args:
+            dataset (DatasetReader):
+                The raster dataset with metadata included.
+            raster_band (int):
+                The raster band that should be used to create vector features.
+
+        Yields:
+            A numpy `ndarray` with the data of the window and the window transform for
+            geo-referencing.
+        """
+
+        for _, window in dataset.block_windows(raster_band):
+            image = dataset.read(raster_band, window=window)
+            image_transform = transform(window, dataset.transform)
+            yield image, image_transform
diff --git a/setup.py b/setup.py
index bf7192d..c6f418c 100644
--- a/setup.py
+++ b/setup.py
@@ -27,6 +27,8 @@ setup(
     license="AGPLv3+",
     install_requires=[
         "geopandas",
+        "rasterio",
+        "numpy",
         "databaselib==1.0.0",
     ],
     extras_require={
diff --git a/tests/test_uploader.py b/tests/test_uploader.py
new file mode 100644
index 0000000..7cb39ef
--- /dev/null
+++ b/tests/test_uploader.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2023:
+# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or (at
+# your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
+# General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see http://www.gnu.org/licenses/.
+
+import logging
+
+logger = logging.getLogger()
+
+
+def test_func():
+    pass
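+
+
+# A minimal sketch of how `Uploader.vectorize_image` could be exercised without a
+# database connection; the 3x3 array and the identity transform are illustrative
+# values, not fixtures shipped with this repository. The imports are kept local so
+# that the sketch stays self-contained.
+def test_vectorize_image_sketch():
+    import numpy as np
+    from rasterio.transform import Affine
+
+    from allin.uploader import Uploader
+
+    # A single "building" pixel (class value 11) surrounded by no-data (0); the
+    # identity transform maps pixel coordinates directly to CRS coordinates.
+    image = np.zeros((3, 3), dtype=np.uint8)
+    image[1, 1] = 11
+    geometries = Uploader.vectorize_image(image, Affine.identity())
+    assert len(geometries) == 1
+    assert geometries[0].startswith("POLYGON")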
--
GitLab


From 22749761ffb42f1d9d78d48604c46d592c735f8c Mon Sep 17 00:00:00 2001
From: Laurens Oostwegel <laurens@gfz-potsdam.de>
Date: Fri, 23 Feb 2024 09:16:25 +0100
Subject: [PATCH 2/2] VectorUploader

---
 allin/vectoruploader.py | 232 ++++++++++++++++++++++++++++++++++++++++
 setup.py                |   2 +-
 2 files changed, 233 insertions(+), 1 deletion(-)
 create mode 100644 allin/vectoruploader.py

diff --git a/allin/vectoruploader.py b/allin/vectoruploader.py
new file mode 100644
index 0000000..a47d426
--- /dev/null
+++ b/allin/vectoruploader.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2024:
+# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or (at
+# your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
+# General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see http://www.gnu.org/licenses/.
+
+
+import csv
+import copy
+import json
+import gzip
+import logging
+import time
+from abc import ABC, abstractmethod
+from multiprocessing import Queue, Pool
+from queue import Empty
+
+from shapely.geometry import shape
+
+
+logger = logging.getLogger(__name__)
+
+
+class AbstractVectorUploader(ABC):
+    """
+    An abstract base class for uploading vector data into a database using
+    multiprocessing.
+    """
+
+    items_per_batch = 5000
+
+    def __init__(self, db):
+        """
+        Initializes the uploader with a database connection.
+
+        Args:
+            db (AbstractDatabase):
+                The database connection.
+        """
+
+        self.db = db
+
+    def multiprocess(self, file_paths, num_processors, items_per_batch=None):
+        """
+        Initiates multiprocessing for uploading vector data.
+
+        Args:
+            file_paths (list):
+                List of file paths to upload.
+            num_processors (int):
+                Number of processors to use for multiprocessing.
+            items_per_batch (int, optional):
+                Number of items to insert per SQL statement. Defaults to the class
+                attribute `items_per_batch`.
+        """
+
+        logger.info(f"Starting multiprocessing on {num_processors} processors.")
+
+        if items_per_batch:
+            self.items_per_batch = items_per_batch
+
+        # Fill the queue
+        queue = Queue()
+        for file_path in file_paths:
+            queue.put(file_path)
+
+        # Initialize the pool and start processing
+        pool = Pool(num_processors, self.worker, (queue,))
+
+        # Wait until the queue is empty
+        while queue.qsize() > 0:
+            logger.info(f"Queue length: {queue.qsize()}")
+            time.sleep(10)
+
+        # Close the pool to prevent it from accepting new tasks
+        pool.close()
+        # Wait for all processes to finish
+        pool.join()
+        logger.info("All items are processed.")
+
+    def worker(self, queue):
+        """
+        Worker function for multiprocessing.
+
+        Args:
+            queue (Queue):
+                Queue holding the file paths that are to be processed.
+        """
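+
+        # Each worker copies the (not yet connected) database handle and opens its own
+        # connection, since an open database connection generally cannot be shared
+        # safely across processes.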
+        db = copy.copy(self.db)
+        db.connect()
+        while True:
+            # If no object is found in the queue within the timeout, an `Empty`
+            # exception is raised and the worker can stop.
+            try:
+                file_path = queue.get(timeout=5)
+            except Empty:
+                break
+
+            for values in self.read_batch(file_path, self.items_per_batch):
+                if not values:
+                    continue
+                db.cursor.execute(self.get_insert_sql_statement(values))
+
+            # Commit once per file, so that an error is easier to trace back to a file.
+            db.connection.commit()
+            logger.info(f"inserted {file_path} into database")
+
+    @staticmethod
+    @abstractmethod
+    def read_batch(file_path, items_per_batch):
+        """
+        Abstract method for reading data in batches.
+
+        Args:
+            file_path (str):
+                Path to the file containing vector data.
+            items_per_batch (int, default: 5000):
+                Number of items to read per batch.
+        """
+
+        raise NotImplementedError
+
+    @staticmethod
+    @abstractmethod
+    def get_insert_sql_statement(values):
+        """
+        Abstract method for generating SQL insert statements.
+
+        Args:
+            values (str):
+                Values to be inserted into the database.
+        """
+
+        raise NotImplementedError
+
+
+class GoogleUploader(AbstractVectorUploader):
+    """
+    A class for uploading Google vector data into a database.
+    """
+
+    @staticmethod
+    def read_batch(file_path, items_per_batch=5000):
+        """
+        Reads Google vector data in batches.
+
+        Args:
+            file_path (str):
+                Path to the file containing Google vector data.
+            items_per_batch (int, default: 5000):
+                Number of items to read per batch.
+        """
+
+        rows = []
+        with gzip.open(file_path, "rt") as csvfile:
+            reader = csv.DictReader(csvfile)
+            for row in reader:
+                geom = f"ST_GeomFromText('{row['geometry']}', 4326)"
+                area = row["area_in_meters"]
+                confidence = row["confidence"]
+                google_id = row["full_plus_code"]
+                rows.append(f"({geom},{area},{confidence},'{google_id}')")
+                # Counting the collected rows (rather than the reader index) makes
+                # sure that every batch contains `items_per_batch` items.
+                if len(rows) >= items_per_batch:
+                    yield ",".join(rows)
+                    rows = []
+        # Yield the last, incomplete batch as well.
+        if rows:
+            yield ",".join(rows)
+
+    @staticmethod
+    def get_insert_sql_statement(values):
+        """
+        Generates SQL insert statements for Google vector data.
+
+        Args:
+            values (str):
+                Values to be inserted into the database.
+        """
+
+        return f"""
+            INSERT INTO google_building (geom, area, confidence, google_id)
+            VALUES {values}
+        """
+
+
+class MicrosoftUploader(AbstractVectorUploader):
+    """
+    A class for uploading Microsoft vector data into a database.
+    """
+
+    @staticmethod
+    def read_batch(file_path, items_per_batch=5000):
+        """
+        Reads Microsoft vector data in batches.
+
+        Args:
+            file_path (str):
+                Path to the file containing Microsoft vector data.
+            items_per_batch (int, default: 5000):
+                Number of items to read per batch.
+        """
+
+        rows = []
+        with gzip.open(file_path, "rt") as jsonfile:
+            for row in jsonfile:
+                data = json.loads(row)
+                height = data["properties"]["height"]
+                geometry = f"ST_GeomFromText('{shape(data['geometry']).wkt}', 4326)"
+                rows.append(f"({geometry},{height})")
+                if len(rows) >= items_per_batch:
+                    yield ",".join(rows)
+                    rows = []
+        # Yield the last, incomplete batch as well.
+        if rows:
+            yield ",".join(rows)
+
+    @staticmethod
+    def get_insert_sql_statement(values):
+        """
+        Generates SQL insert statements for Microsoft vector data.
+
+        Args:
+            values (str):
+                Values to be inserted into the database.
+        """
+
+        return f"""
+            INSERT INTO microsoft_building (geom, height)
+            VALUES {values}
+        """
diff --git a/setup.py b/setup.py
index c6f418c..86c71ad 100644
--- a/setup.py
+++ b/setup.py
@@ -29,7 +29,7 @@ setup(
         "geopandas",
         "rasterio",
         "numpy",
-        "databaselib==1.0.0",
+        "databaselib>=1.0.0",
     ],
     extras_require={
--
GitLab