Commit e7d44e18 authored by Daniel Eggert's avatar Daniel Eggert
Browse files

added ping thread to keep the server connection alive

parent 48b070cc
......@@ -5,6 +5,7 @@ import asyncio
import base64
import json
import websocket
import threading
class PulsarConfigKeys:
......@@ -35,6 +36,7 @@ class MessageType:
class PulsarMessageHandler:
MAX_CONNECTION_ATTEMPTS = 20
PULSAR_PING_INTERVAL = 60 # 1min
def __init__(self, pulsar_config, handle_request, handle_response=None):
self.pulsarConfig = pulsar_config
......@@ -73,6 +75,9 @@ class PulsarMessageHandler:
self.connectionAttempts += 1
self.subscription = self.open_socket(subscription=subscription_name)
if self.subscription:
self.start_ping_loop()
def disconnect(self):
# close consumer
if self.subscription:
......@@ -92,6 +97,23 @@ class PulsarMessageHandler:
self.producers.clear()
def _pulsar_ping(self, timeout, event: threading.Event):
while not event.wait(timeout):
if self.subscription:
try:
print('ping {}'.format(datetime.now()))
self.subscription.ping()
except Exception as e:
print("error in pulsar ping routine: {}".format(e))
break
def start_ping_loop(self):
event = threading.Event()
thread = threading.Thread(
target=self._pulsar_ping, args=(PulsarMessageHandler.PULSAR_PING_INTERVAL, event))
thread.setDaemon(True)
thread.start()
def wait_for_request(self):
# register request event handler
print('waiting for incoming request')
......
......@@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='demessaging',
version='0.0.5',
version='0.0.6',
description='basic back-end messaging module for the digital earth framework',
license='TBD',
packages=['demessaging'],
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment