Unverified Commit 719f0798 authored by Philipp Sommer's avatar Philipp Sommer
Browse files

include websocket header

parent 97350998
Pipeline #45273 passed with stage
in 1 minute and 7 seconds
......@@ -179,14 +179,13 @@ backend_config = ModuleConfig.parse_raw(
{
"messaging_config": {
"topic": "test-topic",
"header": {},
"max_workers": 4,
"queue_size": 4,
"max_payload_size": 512000,
"host": "localhost",
"port": "8080",
"persistent": "non-persistent",
"tenant": "public",
"namespace": "default"
"websocket_url": "ws://127.0.0.1:8000/ws/",
"producer_url": "",
"consumer_url": ""
}
}
"""
......
......@@ -130,6 +130,14 @@ def get_parser(
default=messaging_config["topic"],
)
connection_group.add_argument(
"--header",
help=desc("header", PulsarConfig),
metavar='{"authorization": "Token ..."}',
dest="messaging_config.header",
default=pulsar_config.header,
)
module_help = "Name of the backend module."
if module_name == UNKNOWN_MODULE:
module_help += " This option is required!"
......
......@@ -21,6 +21,7 @@ from pydantic import BaseModel # pylint: disable=no-name-in-module
from pydantic import (
BaseSettings,
Field,
Json,
PositiveInt,
root_validator,
validator,
......@@ -320,6 +321,10 @@ class BaseMessagingConfig(BaseSettings):
)
)
header: Union[Json[Dict[str, Any]], Dict[str, Any]] = Field( # type: ignore
default_factory=dict, description="Header parameters for the request"
)
max_workers: Optional[PositiveInt] = Field(
default=None,
description=(
......
......@@ -25,13 +25,13 @@ class WebsocketConnection(ABC):
return topic_name + "_" + get_random_letters(8)
def open_socket(
self, subscription: str = None, topic: str = None
self, subscription: str = None, topic: str = None, **connection_kws
) -> websocket.WebSocket:
topic_name = topic or self.pulsar_config.topic
topic_url = self.pulsar_config.get_topic_url(topic_name, subscription)
sock = websocket.create_connection(topic_url)
sock = websocket.create_connection(topic_url, **connection_kws)
if sock:
print("connection to {0} established".format(topic_url))
......
......@@ -80,7 +80,8 @@ class MessageConsumer(WebsocketConnection):
self.connection_attempts += 1
print("connection attempt {}".format(self.connection_attempts))
self.subscription = self.open_socket(
subscription=subscription_name
subscription=subscription_name,
header=self.pulsar_config.header,
)
if self.subscription:
......@@ -331,7 +332,9 @@ class MessageConsumer(WebsocketConnection):
producer = self.producers[response_topic]
else:
# no producer yet, create one
producer = self.open_socket(topic=response_topic)
producer = self.open_socket(
topic=response_topic, header=self.pulsar_config.header
)
self.producers[response_topic] = producer
# send the response
......
......@@ -73,11 +73,15 @@ class MessageProducer(WebsocketConnection):
# topic override if given
if topic is not None:
out_topic = topic
out_socket: WebSocket = self.open_socket(topic=out_topic)
out_socket: WebSocket = self.open_socket(
topic=out_topic, header=self.pulsar_config.header
)
# To be thread safe, we generate the response topic here
response_topic: str = self.generate_response_topic()
in_socket: WebSocket = self.open_socket(
subscription=self.subscription_name, topic=response_topic
subscription=self.subscription_name,
topic=response_topic,
header=self.pulsar_config.header,
)
self.start_ping_loop([in_socket, out_socket])
......
......@@ -34,7 +34,7 @@ __all__ = [
backend_config = ModuleConfig.parse_raw("""
{
"messaging_config": {{ config.messaging_config.json(indent=8) }}
"messaging_config": {{ config.messaging_config.json(indent=8, exclude=dict(header=set(["authorization"]))) }}
}
""")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment