spatial_index_mediator.py 16.2 KB
Newer Older
1
2
3
4
# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
5
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
6
7
8
9
10
11
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
12
# the terms of the GNU General Public License as published by the Free Software
13
14
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
15
16
17
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
18
19
20
21
22
23
24
25
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
26
27
28

import socket
import struct
29
30
31
import os
import re
import warnings
32
from datetime import datetime, timedelta
33
from shapely.geometry import Polygon
34
import pytz
35
from logging import getLogger
36
from typing import List  # noqa F401  # flake8 issue
37
38

from ..misc.exceptions import GMSEnvironmentError
39
from ..misc.logging import close_logger
40

41

42
class SpatialIndexMediatorServer:
    """Controller for the spatial index mediator server.

    Every command is delegated to the control script ('index-mediator-server.sh' or 'index-mediator-server')
    located in ``rootDir``; the script is executed via bash with ``rootDir`` as working directory.
    """

    def __init__(self, rootDir, logger=None):
        """Get an instance of SpatialIndexMediatorServer.

        :param rootDir: directory that contains the control script of the index mediator server
        :param logger:  logger to use (default: the logger named 'SpatialIndexMediatorServer')
        :raises GMSEnvironmentError: if neither 'index-mediator-server.sh' nor 'index-mediator-server'
                                     exists within rootDir
        """
        self.rootDir = rootDir
        self.path_idxMedSrv = os.path.join(rootDir, 'index-mediator-server.sh')
        self.logger = logger or getLogger('SpatialIndexMediatorServer')

        # validate: fall back to the script name without file extension
        if not os.path.isfile(self.path_idxMedSrv):
            self.path_idxMedSrv = os.path.join(rootDir, 'index-mediator-server')

            if not os.path.isfile(self.path_idxMedSrv):
                raise GMSEnvironmentError('File path of index mediator server does not exist at %s.'
                                          % self.path_idxMedSrv)

    def __getstate__(self):
        """Defines how the attributes of SpatialIndexMediatorServer are pickled.

        NOTE: The logger is closed and set to None on the instance itself before its __dict__ is returned.
        """

        if self.logger not in [None, 'not set']:
            close_logger(self.logger)
            self.logger = None
        return self.__dict__

    def __del__(self):
        # getattr() keeps the finalizer silent for instances that never got a logger attribute;
        # the guard mirrors __getstate__, i.e., only a real logger is handed to close_logger()
        logger = getattr(self, 'logger', None)
        if logger not in [None, 'not set']:
            close_logger(logger)
        self.logger = None

    @property
    def is_running(self):
        """Return True if the status output of the control script reports a running server."""
        return self.status['running']

    @property
    def process_id(self):
        """Return the process ID reported by the control script (None if no PID is reported)."""
        return self.status['process_id']

    @property
    def status(self):
        """Check server status.

        NOTE: Each access runs the control script with the 'status' command.

        :return: dictionary with the keys
                 'running' (bool):           True if the status output contains 'is running'
                 'process_id' (int or None): PID parsed from 'with pid <number>', None if not contained
        """
        outputStr = self._communicate('status')

        # interpret the status output
        running = 'is running' in outputStr

        # get PID
        _process_id = re.search(r'with pid ([\d]*)', outputStr)
        if _process_id and _process_id.group(1):
            process_id = int(_process_id.group(1))
        else:
            process_id = None

        return {'running': running, 'process_id': process_id}

    def start(self):  # FIXME can be executed twice without a message that server is already running
        """Start the server.

        :return: 'started' if the server is running afterwards, otherwise None (a warning is logged)
        """
        outputStr = self._communicate('start')
        if outputStr == 'success' and self.is_running:
            self.logger.info('Spatial Index Mediator Server started successfully.')
            return 'started'
        else:
            if outputStr != 'success':
                self.logger.warning("\nAttempt to start Spatial Index Mediator Server failed with message '%s'!"
                                    % outputStr.replace('\n', ''))
            else:
                self.logger.warning("\nCommunication to Spatial Index Mediator Server was successful but "
                                    "the server is still not running. Returned message was: %s"
                                    % outputStr.replace('\n', ''))

    def stop(self):
        """Stop the server.

        :return: 'stopped' on success, otherwise None (a warning is issued via the warnings module)
        """
        outputStr = self._communicate('stop')

        if outputStr == 'success' or re.search(r'index-mediator-server stopped', outputStr, re.I):
            return 'stopped'
        else:
            warnings.warn("\nStopping Spatial Index Mediator Server failed with message '%s'!"
                          % outputStr.replace('\n', ''))

    def restart(self):
        """Restart the server.

        :return: 'restarted' if the server is running afterwards, otherwise None (a warning is issued via the
                 warnings module)
        """
        outputStr = self._communicate('restart')
        if outputStr == 'success' and self.is_running:
            return 'restarted'
        else:
            warnings.warn("\nRestarting Spatial Index Mediator Server failed with message '%s'!"
                          % outputStr.replace('\n', ''))

    def _communicate(self, controller_cmd):
        """Run the control script with the given command and return its output.

        :param controller_cmd:  command passed to the control script, e.g. 'status', 'start', 'stop', 'restart'
        :return: decoded (UTF-8) output of the script or 'success' if the script did not return any output
        :raises Exception: if the script returns a non-zero exit code (the message is the returned error output)
        """
        # local import as in the original implementation
        # NOTE(review): presumably avoids a circular import at module level - confirm
        from ..misc.helper_functions import subcall_with_output

        # FIXME workaround: otherwise subcall_with_output hangs at proc.communicate (waiting for EOF forever)
        no_stdout = no_stderr = controller_cmd in ['start', 'restart']
        # no_stdout = no_stderr = None, None

        curdir = os.path.abspath(os.curdir)
        os.chdir(self.rootDir)
        try:
            output, exitcode, err = subcall_with_output('bash %s %s' % (self.path_idxMedSrv, controller_cmd),
                                                        no_stdout, no_stderr)
        finally:
            # always restore the previous working directory, also if the call above raises
            os.chdir(curdir)

        if exitcode:
            raise Exception(err)
        else:
            if output:
                return output.decode('UTF-8')
            else:
                # FIXME actually there should be always an output (also with controller commands 'start' and 'restart'
                return 'success'


149
150
151
152
class SpatialIndexMediator:
    """Client for scene queries against the spatial index mediator server."""

    FULL_SCENE_QUERY_MSG = 3
    """ message value for a full scene query message """

    def __init__(self, host="localhost", port=8654, timeout=5.0, retries=10):
        """
        Stores the connection parameters of the spatial index mediator server.

        The connection itself is opened per query within getFullSceneDataForDataset().

        :param host:    host address of the index mediator server (default "localhost")
        :param port:    port number of the index mediator server (default 8654)
        :param timeout: timeout as float in seconds (default 5.0 sec)
        :param retries: number of query attempts in case of a timeout or an incomplete response
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.retries = retries

    @staticmethod
    def __deriveSeasonCode(refDate, maxDaysDelta):
        """Derive a 12-bit month mask (bit 0 = January ... bit 11 = December) for refDate +/- maxDaysDelta.

        :param refDate:         reference date as datetime instance or None
        :param maxDaysDelta:    number of days before/after refDate or None
        :return: integer with one bit set per calendar month touched by the time window;
                 0 if refDate or maxDaysDelta is None
        """
        if refDate is None or maxDaysDelta is None:
            return 0

        delta = timedelta(days=maxDaysDelta)
        windowStart = refDate - delta
        windowEnd = refDate + delta

        # A window touching 12 or more calendar months covers every month. Without this check the loop below
        # would stop at the first (wrapped) match of endMonth and return only a few months.
        numMonths = (windowEnd.year - windowStart.year) * 12 + (windowEnd.month - windowStart.month) + 1
        if numMonths >= 12:
            return 0xFFF

        startMonth = windowStart.month - 1
        endMonth = windowEnd.month - 1

        seasonCode = 0

        for x in range(12):
            month = (startMonth + x) % 12

            seasonCode |= 1 << month

            if month == endMonth:
                break

        return seasonCode

    def getFullSceneDataForDataset(self, envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid,
                                   dayNight=0, refDate=None, maxDaysDelta=None):
        # type: (list, datetime, datetime, float, float, int, int, datetime, int) -> List[Scene]
        """
        Query the spatial index with the given parameters in order to get a list of matching scenes intersecting the
        given envelope

        :param envelope:        list of left, right and low, up coordinates (in lat/lon wgs84) of the region of
                                interest in the form of (min_lon, max_lon, min_lat, max_lat),
                                e.g. envelope = (10.0, 16.0, 50.0, 60.0)
        :param timeStart:       start timestamp of the relevant timerange as datetime instance,
                                e.g., datetime(2015, 1, 1)
        :param timeEnd:         end timestamp of the relevant timerange as datetime instance, e.g. datetime(2016, 6, 15)
        :param minCloudCover:   minimum cloudcover in percent, e.g. 12, will return scenes with cloudcover >= 12% only
        :param maxCloudCover:   maximum cloudcover in percent, e.g. 23, will return scenes with cloudcover <= 23% only
        :param datasetid:       datasetid of the dataset in question, e.g. 104 for Landsat-8
        :param dayNight:        day/night indicator, with (0 = both, 1 = day, 2 = night)
        :param refDate:         reference timestamp as datetime instance, e.g. datetime(2015, 1, 1) [optional];
                                must be timezone-aware if maxDaysDelta is given because it is compared with
                                UTC-aware scene timestamps
        :param maxDaysDelta:    maximum allowed number of days the target scenes might be apart from the given refDate
                                [optional]
        :return: list of Scene instances (empty if self.retries is 0)
        :raises TimeoutError:       if every attempt timed out
        :raises struct.error:       if the data could not be packed or the last attempt returned incomplete data
        :raises EnvironmentError:   if the server response does not follow the expected protocol
        """
        scenes = []
        filterTimerange = not (refDate is None or maxDaysDelta is None)

        # The request only depends on the arguments, so it is built once instead of once per attempt.
        # numbytes = 1 + 4*8 + 8 + 8 + 4 + 1 + 1 + 2 + 1 + 1 + 1
        request = bytearray(60)

        # pack msg header and envelope
        offset = 0
        struct.pack_into('> b 4d', request, offset, self.FULL_SCENE_QUERY_MSG, *envelope)
        offset += 33

        # pack the dates
        struct.pack_into('> h 6b', request, offset, timeStart.year, timeStart.month, timeStart.day, timeStart.hour,
                         timeStart.minute, timeStart.second, 0)
        offset += 8
        struct.pack_into('> h 6b', request, offset, timeEnd.year, timeEnd.month, timeEnd.day, timeEnd.hour,
                         timeEnd.minute, timeEnd.second, 0)
        offset += 8

        # derive season code
        seasonCode = self.__deriveSeasonCode(refDate, maxDaysDelta)

        # pack the rest
        #  TODO: send unconstraint min/max proclevel values
        struct.pack_into('> i 2b h 3b', request, offset, seasonCode, minCloudCover, maxCloudCover, datasetid, 0, 127,
                         dayNight)

        for i in range(self.retries):
            con = None
            try:
                # get connection
                con = Connection(self.host, self.port, self.timeout)

                # send the buffer
                con.socket.sendall(request)

                # receive the response
                # read first byte, indicating the response type, must match full scene query msg
                if con.recvByte() != self.FULL_SCENE_QUERY_MSG:
                    raise EnvironmentError('Bad Protocol')

                # now read the number of bytes that follow
                numBytes = con.recvInt()
                b = bytearray(numBytes)
                offset = 0

                # read all data from the channel
                con.recvBuffer(b, numBytes)

                # we received the entire message - tell the server and close the connection
                con.disconnect()

                # interpret received data
                # extract datasetid and number of scenes
                dataset = struct.unpack_from('> h', b, offset)[0]
                offset += 2
                if dataset != datasetid:
                    raise EnvironmentError('Bad Protocol')

                numScenes = struct.unpack_from('> i', b, offset)[0]
                offset += 4

                # start from scratch in each attempt
                scenes = []

                for _x in range(numScenes):
                    # [0] id (4 byte)
                    # [1] year (2 byte)
                    # [2] month (1 byte)
                    # [3] day (1 byte)
                    # [4] hour (1 byte)
                    # [5] minute (1 byte)
                    # [6] second (1 byte)
                    # [7] empty (1 byte)
                    # [8] cloud cover (1 byte)
                    # [9] proc_level (1 byte) caution: this gets not yet updated in the index
                    # [10] day/night (1 byte)
                    # [11] length of bounds array (1 byte)
                    scenedata = struct.unpack_from('> i h 10b', b, offset)
                    offset += 16

                    timestamp = datetime(scenedata[1], scenedata[2], scenedata[3], scenedata[4], scenedata[5],
                                         scenedata[6])

                    # read bounds
                    numBounds = scenedata[11]
                    fmt = "> {0}d".format(numBounds)
                    bounds = struct.unpack_from(fmt, b, offset)
                    offset += numBounds * 8

                    # check ref date
                    # NOTE(review): The scene date is moved into the year of refDate only, i.e., scenes on the
                    #               other side of a year boundary (e.g. Dec 30 vs. refDate Jan 5) are skipped
                    #               although their month is part of the season code - confirm that this is intended.
                    if filterTimerange:
                        if timestamp.month == 2 and timestamp.day == 29:
                            # deal with feb.29th
                            timestampInRefYear = timestamp.replace(refDate.year, 3, 1).replace(tzinfo=pytz.UTC)
                        else:
                            timestampInRefYear = timestamp.replace(refDate.year).replace(tzinfo=pytz.UTC)

                        if abs(refDate - timestampInRefYear).days > maxDaysDelta:
                            # skip scene
                            continue

                    # create scene
                    scenes.append(Scene(scenedata[0], timestamp, scenedata[8], scenedata[9], scenedata[10], bounds))

                break

            except socket.timeout:
                if i < self.retries - 1:
                    continue
                else:
                    raise TimeoutError('Spatial query timed out %d times!' % self.retries)

            except struct.error:
                if i < self.retries - 1:
                    continue
                else:
                    raise

            finally:
                # do not leak the socket of a failed attempt (closing an already closed socket is harmless)
                if con is not None:
                    con.socket.close()

        return scenes


class Connection:
    """ Connection to the spatial index mediator server """

    HELLO_MSG = 1
    """ message value for a "hello" message """

    DISCONNECT_MSG = 6
    """ message value for a disconnect message """

    def __init__(self, host, port, timeout):
        """Connect to the index mediator server and exchange the "hello" message.

        :param host:    host address of the index mediator server
        :param port:    port number of the index mediator server
        :param timeout: timeout in seconds, passed to socket.create_connection()
        :raises ConnectionRefusedError: if the server refuses the connection
        :raises EnvironmentError:       if the server does not echo the "hello" message
        """
        # connect to index mediator server
        try:
            self.socket = socket.create_connection((host, port), timeout)
        except ConnectionRefusedError:
            raise ConnectionRefusedError('The spatial index mediator server refused the connection!')

        # send hello and confirm response; do not leave the socket open if the greeting fails or raises
        greeted = False
        try:
            greeted = self.__greet()
        finally:
            if not greeted:
                self.socket.close()

        if not greeted:
            raise EnvironmentError('Bad protocol')

    def __greet(self):
        """Send the "hello" byte and return True if the server echoes it."""
        # send hello byte
        self.writeByte(self.HELLO_MSG)

        # receive hello byte echo
        response = self.recvByte()

        return response == self.HELLO_MSG

    def _recvExact(self, numBytes):
        """Receive up to numBytes bytes, repeating recv() until all bytes arrived or the peer closed the connection.

        :return: the received bytes (shorter than numBytes only if the connection was closed early)
        """
        chunks = []
        remaining = numBytes
        while remaining:
            chunk = self.socket.recv(remaining)
            if not chunk:
                # the peer closed the connection
                break
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def writeByte(self, byte):
        """Send a single signed byte."""
        self.socket.sendall(struct.pack('b', byte))

    def recvByte(self):
        """Receive a single signed byte.

        :raises struct.error: if the connection was closed before the byte arrived
        """
        return struct.unpack('b', self._recvExact(1))[0]

    def recvInt(self):
        """Receive a big-endian signed 4-byte integer.

        :raises struct.error: if the connection was closed before all 4 bytes arrived
        """
        # a single recv(4) may return less than 4 bytes -> read until the integer is complete
        return struct.unpack('>i', self._recvExact(4))[0]

    def recvBuffer(self, buffer, numBytes):
        """Fill the given writable buffer with exactly numBytes bytes received from the server.

        :param buffer:      writable buffer (e.g. bytearray) with a size of at least numBytes
        :param numBytes:    number of bytes to receive
        :raises ConnectionError: if the connection is closed before numBytes bytes were received
        """
        toread = numBytes
        view = memoryview(buffer)
        while toread:
            nbytes = self.socket.recv_into(view, toread)
            if nbytes == 0:
                # recv_into() returns 0 if the peer closed the connection -> without this check the loop never ends
                raise ConnectionError('The spatial index mediator server closed the connection with %d of %d '
                                      'bytes still missing.' % (toread, numBytes))
            view = view[nbytes:]
            toread -= nbytes

    def disconnect(self):
        """Closes the connection to the index mediator server.

        No further communication, like placing queries will be possible.
        """
        try:
            self.writeByte(self.DISCONNECT_MSG)
        finally:
            # close the socket also if the disconnect message could not be sent
            self.socket.close()
387

388

389
390
391
class Scene:
    """Scene Metadata class"""

    def __init__(self, sceneid, acquisitiondate, cloudcover, proclevel, daynight, bounds):
        """Store the scene metadata and derive the coordinate pairs and the footprint polygon from the bounds.

        :param sceneid:         database sceneid, e.g. 26366229
        :param acquisitiondate: acquisition date of the scene as datetime instance, e.g. 2016-03-25 10:15:26
        :param cloudcover:      cloudcover value of the scene, e.g. 11
        :param proclevel:       proclevel value of the scene (presumably the processing level - TODO confirm)
        :param daynight:        day/night indicator (0=unknown, 1=day, 2=night)
        :param bounds:          scene bounds as list of lat/lon wgs84 coordinates (lon1, lat1, lon2, lat2, ...),
                                e.g. (10.00604, 49.19385, 7.45638, 49.64513, 8.13739, 51.3515, 10.77705, 50.89307)
        """
        # plain metadata
        self.sceneid = sceneid
        self.acquisitiondate = acquisitiondate
        self.cloudcover = cloudcover
        self.proclevel = proclevel
        self.daynight = daynight
        self.bounds = bounds

        # split the flat sequence into [lon, lat] pairs
        # (the two padding values complete the last pair with None if the number of values is odd)
        padded = list(bounds) + [None, None]
        pairs = []
        for start in range(0, len(bounds), 2):
            pairs.append(padded[start:start + 2])
        self.coordsLonLat = pairs

        # set validated (!) polygon: an invalid polygon is replaced by its zero-width buffer
        footprint = Polygon(self.coordsLonLat)
        if not footprint.is_valid:
            footprint = footprint.buffer(0)
            assert footprint.is_valid
        self.polyLonLat = footprint