Commit b67b6c29 authored by Marius Kriegerowski's avatar Marius Kriegerowski
Browse files

add signal

parent a12776b8
Pipeline #16348 failed with stage
in 38 seconds
import pytest
import asyncio # noqa
from the_engine import signal
import logging
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_database_signal(event_loop, celery_app, celery_worker):
logger.info('loop %s' % event_loop)
dispatcher = signal.Dispatcher(
loop=event_loop, username='test', password='test')
@signal.task_slot('testSignal')
def function_test(payload):
'''This is testing docstring'''
logger.warning('called me!')
return payload
# tasks = []
# for i in range(3):
# tasks.append(function_test.delay(payload=str(i)))
# for i, t in enumerate(tasks):
# result = t.get()
# assert result == str(i)
assert list(dispatcher.callbacks.keys()).count('testSignal') == 1
signal.emit('testSignal', payload={'a': 'b'})
await asyncio.sleep(1)
@pytest.mark.asyncio
async def test_dispatcher_singelton():
dispatcher = signal.Dispatcher()
dispatcher_2 = signal.Dispatcher()
assert dispatcher.instance is dispatcher_2.instance
from .tasks import DatabaseTask # noqa
from .celery_app import app
from collections import defaultdict
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1
import logging
import json
import asyncio
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('Dispatcher')
class Dispatcher(object):
''' A signal dispatcher singelton
Existing throughout the lifetime of the engine.
- Keep track of registered signals and emit these.
- Check and validate signals
'''
class __Dispatcher:
def __init__(self, mqtt_broker='localhost', username=None,
password=None, loop=None):
self.username = username or 'test'
self.password = password or 'test'
self.mqtt_broker = mqtt_broker
self.callbacks = defaultdict(list)
self.client = MQTTClient()
self._connected = False
self._lock_connect = None
async def connect(self): # noqa
if self.username and self.password:
credentials = '%s:%s@' % (self.username, self.password)
logger.info('using mqtt credentials %s:%s' % (
self.username, self.password))
else:
credentials = ''
logger.info('no mqtt broker credentials provided')
await self.client.connect('mqtt://%s%s:1883' % (
credentials, self.mqtt_broker))
self._connected = True
async def connected(self):
self._lock_connect = self._lock_connect or asyncio.Lock()
async with self._lock_connect:
if self._connected:
return True
else:
await self.connect()
return True
async def listen(self):
await self.connected()
while True:
message = await self.client.deliver_message()
logger.warning(message)
packet = message.publish_packet
topic = packet.variable_header.topic_name
payload = packet.payload.data
await self.inform_listeners(topic, payload)
async def register(self, signal, func):
await self.connected()
logger.info('register %s on %s' % (signal, func))
await self.client.subscribe([(signal, QOS_1), ])
self.callbacks[signal].append(func)
logger.info('register %s on %s done' % (signal, func))
async def inform_listeners(self, signal, payload):
logger.info('dispatching %s - %s' % (signal, payload))
for func in self.signal.get('signal'):
func('payload')
async def emit(self, signal, payload):
logger.warning('emitting signal %s | payload %s' % (
signal, payload))
payload = {'signal': signal, 'payload': payload}
payload = json.dumps(payload).encode()
await self.client.publish(
topic='signal',
message=payload)
def __del__(self):
try:
self.client.unssubscribe(list(self.callbacks.keys()))
self.client.disconnect()
logger.info('unsubscribed and disconnected')
except AttributeError:
pass
instance = None
def __init__(self, *args, loop=None, **kwargs):
self.loop = loop
if not Dispatcher.instance:
Dispatcher.instance = Dispatcher.__Dispatcher(
*args, loop=loop, **kwargs)
else:
logger.info('Dispatcher already setup')
def emit(self, signal, payload):
return asyncio.ensure_future(
self.instance.emit(signal, payload), loop=self.loop)
def register(self, signal, func):
logger.info('register %s on %s' % (signal, func))
return asyncio.ensure_future(
self.instance.register(signal, func), loop=self.loop)
async def connect(self):
await self.instance.connect()
def __getattr__(self, name):
return getattr(self.instance, name)
def __str__(self):
return str(self.instance.callbacks)
def emit(signal, payload):
Dispatcher().emit(signal, payload)
class task_slot(object):
def __init__(self, signal, loop=None):
self.signal = signal
self._dispatcher = Dispatcher(loop=loop)
self._dispatcher.register(signal, self)
def __call__(self, func):
# @app.task(base=DatabaseTask)
@app.task
def _registered(*args, **kwargs):
return func(*args, **kwargs)
return _registered
if __name__ == '__main__':
@task_slot('testSignal')
def function_test(payload):
print('ddd %s' % payload)
function_test.delay(payload=None)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment