# -*- coding: utf-8 -*-

# py_tools_ds - A collection of geospatial data analysis tools that simplify standard
# operations when handling geospatial raster and vector data as well as projections.
#
# Copyright (C) 2016-2021
# - Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
# - Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences Potsdam,
#   Germany (https://www.gfz-potsdam.de/)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__author__ = 'Daniel Scheffler'

import sqlite3
import os
from typing import Union  # noqa F401  # flake8 issue
import csv


def data_DB_updater(gms_obj_dict, path_db):
    # type: (dict, str) -> None
    """Insert or update the record of a GMS object in the table "processed_data" of an SQLite database.

    The database file and the table are created if they do not exist yet. If an existing table lacks some of
    the expected columns, it is rebuilt with the full column set and its rows are copied over. A record is
    identified by the combination of 'entity_ID', 'sensor' and 'subsystem': if such a record exists, its
    'job_ID', 'job_CPUs', 'proc_level', 'path_procdata' and 'LayerBandsAssignment' values are updated;
    otherwise a new record is inserted. All changes are committed before the connection is closed.

    :param gms_obj_dict:    <dict> a copy of the dictionary of the respective GMS object; it must contain a key
                            for every column of the table (a missing key raises a KeyError). List values are
                            stored as the concatenation of the string representations of their items.
    :param path_db:         <str> the physical path of the SQLite database on disk
    """
    assert isinstance(gms_obj_dict, dict), 'The input for data_DB_updater() has to be a dictionary.'

    def to_db_value(val):
        # e.g. converts list of LayerBandsAssignment to string
        return ''.join([str(item) for item in val]) if isinstance(val, list) else val

    fullColumnList = ['job_ID', 'job_CPUs', 'image_type', 'satellite', 'sensor', 'subsystem', 'sensormode',
                      'acquisition_date', 'entity_ID', 'georef', 'proc_level', 'LayerBandsAssignment',
                      'path_procdata']
    updateColumns = ['job_ID', 'job_CPUs', 'proc_level', 'path_procdata', 'LayerBandsAssignment']
    keyColumns = ['entity_ID', 'sensor', 'subsystem']
    # 'IS' behaves like '=' in SQLite but also matches NULL, so records with a None key value are found again
    keyCondition = ' AND '.join(['%s IS ?' % col for col in keyColumns])

    if not os.path.isfile(path_db):
        print('No internal database found. Creating a new one...')

    connection = sqlite3.connect(path_db)
    try:
        cursor = connection.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS processed_data (%s)''' % ', '.join(fullColumnList))
        currentColumnList = [i[1] for i in cursor.execute("PRAGMA table_info('processed_data')").fetchall()]
        missingColumns = [col for col in fullColumnList if col not in currentColumnList]

        if missingColumns:  # automatic adding of missing columns
            # drop leftovers of a previously interrupted migration so that no stale rows are carried over
            cursor.execute("DROP TABLE IF EXISTS processed_data_temp")
            cursor.execute('''CREATE TABLE processed_data_temp (%s)''' % ', '.join(fullColumnList))
            cursor.execute("SELECT " + ','.join(currentColumnList) + " FROM processed_data")
            existingRows = cursor.fetchall()
            cursor.executemany(
                "INSERT INTO processed_data_temp (%(cols)s) VALUES (%(vals)s)"
                % {'cols': ','.join(currentColumnList), 'vals': ','.join(['?'] * len(currentColumnList))},
                existingRows)
            cursor.execute("DROP TABLE processed_data")
            cursor.execute("ALTER TABLE processed_data_temp RENAME TO processed_data")

        keyValues = [gms_obj_dict[key] for key in keyColumns]
        cursor.execute("SELECT EXISTS(SELECT 1 FROM processed_data WHERE %s)" % keyCondition, keyValues)

        if cursor.fetchone()[0] == 0:  # create new entry
            new_record = [to_db_value(gms_obj_dict[key]) for key in fullColumnList]
            # name the columns explicitly so that the insert does not depend on the column order of the table
            cursor.execute("INSERT INTO processed_data (%s) VALUES (%s)"
                           % (','.join(fullColumnList), ','.join(['?'] * len(new_record))), new_record)
        else:  # update existing entry
            values2update = [to_db_value(gms_obj_dict[key]) for key in updateColumns]
            cursor.execute("UPDATE processed_data SET %s WHERE %s"
                           % (', '.join(['%s=?' % col for col in updateColumns]), keyCondition),
                           values2update + keyValues)

        # without an explicit commit, sqlite3 rolls the pending changes back when the connection is closed
        connection.commit()
    finally:
        connection.close()


def get_info_from_SQLdb(path_db, tablename, vals2return, cond_dict, records2fetch=0):
    # type: (str,str,list,dict,int) -> Union[list, str]
    """Query an SQL database for the given parameters.

    NOTE: Table and column names are inserted into the SQL statement as plain text (identifiers cannot be
          bound as parameters), so they must come from trusted code. The condition values are bound as
          query parameters.

    :param path_db:         <str> the physical path of the SQL database on disk
    :param tablename:       <str> name of the table within the database to be queried
    :param vals2return:     <list or str> a list of strings containing the column titles of the values to be returned
    :param cond_dict:       <dict> a dictionary containing the query conditions in the form {'column_name':<value>};
                            all conditions are combined with AND; an empty dictionary selects all records
    :param records2fetch:   <int> number of records to be fetched (default=0: fetch unlimited records)
    :return:                a list of records (tuples), e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)];
                            with records2fetch=1 the list holds the result of fetchone(), i.e., [None] if nothing
                            matches; the string 'database connection fault' if path_db is not an existing file
    """
    if not isinstance(vals2return, list):
        vals2return = [vals2return]
    assert isinstance(records2fetch, int), \
        "get_info_from_SQLdb: Expected an integer for the argument 'records2fetch'. Got %s" % type(records2fetch)
    if not os.path.isfile(path_db):
        return 'database connection fault'

    query = "SELECT " + ','.join(vals2return) + " FROM " + tablename
    params = []
    if cond_dict:  # an empty condition must not leave a dangling 'WHERE' in the statement
        query += " WHERE " + " AND ".join(["%s=?" % column for column in cond_dict])
        params = list(cond_dict.values())

    connection = sqlite3.connect(path_db)
    try:
        cursor = connection.cursor()
        cursor.execute(query, params)
        if records2fetch == 0:
            records2return = cursor.fetchall()
        elif records2fetch == 1:
            records2return = [cursor.fetchone()]
        else:
            records2return = cursor.fetchmany(size=records2fetch)
        cursor.close()
    finally:
        # close the connection even if the query fails
        connection.close()
    return records2return


def SQL_DB_to_csv(path_db):
    # type: (str) -> None
    """Export the table "processed_data" of an SQLite database to a CSV file.

    The CSV file is named 'data_DB.csv' and is written into the directory of the database file. Its first row
    contains the column names of the table. Nothing is written if the database file does not exist or is empty.

    :param path_db:     <str> the physical path of the SQLite database on disk
    """
    if not os.path.exists(path_db) or not os.path.getsize(path_db) > 0:
        print('No database conversion to CSV performed, because DB does not exist or DB is empty.')
        return

    connection = sqlite3.connect(path_db)
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM processed_data")

        # newline='' lets the csv module control line endings (avoids blank rows on Windows)
        with open(os.path.join(os.path.dirname(path_db), 'data_DB.csv'), 'w', newline='') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow([i[0] for i in cursor.description])
            csvwriter.writerows(cursor)
    finally:
        connection.close()