Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding websocket event publisher #65

Merged
merged 3 commits into from
Jun 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions brew_view/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from bg_utils.event_publisher import EventPublishers
from bg_utils.pika import TransientPikaClient
from bg_utils.plugin_logging_loader import PluginLoggingLoader
from brew_view.publishers import MongoPublisher, RequestPublisher, TornadoPikaPublisher
from brew_view.publishers import (MongoPublisher, RequestPublisher,
TornadoPikaPublisher, WebsocketPublisher)
from brew_view.specification import get_default_logging_config
from brewtils.schemas import ParameterSchema, CommandSchema, InstanceSchema, SystemSchema, \
RequestSchema, PatchSchema, LoggingConfigSchema, EventSchema, QueueSchema
Expand Down Expand Up @@ -51,6 +52,12 @@ def setup_brew_view(spec, cli_args):
_setup_application()


def shutdown():
"""Close any open websocket connections"""
from brew_view.controllers import EventSocket
EventSocket.shutdown()


def load_plugin_logging_config(input_config):
global plugin_logging_config

Expand Down Expand Up @@ -86,7 +93,7 @@ def _setup_tornado_app():
from brew_view.controllers import AdminAPI, CommandAPI, CommandListAPI, ConfigHandler, \
InstanceAPI, QueueAPI, QueueListAPI, RequestAPI, RequestListAPI, SystemAPI, SystemListAPI, \
VersionHandler, SpecHandler, SwaggerConfigHandler, OldAdminAPI, OldQueueAPI, \
OldQueueListAPI, LoggingConfigAPI, EventPublisherAPI
OldQueueListAPI, LoggingConfigAPI, EventPublisherAPI, EventSocket

prefix = config.web.url_prefix
static_base = os.path.join(os.path.dirname(__file__), 'static', 'dist')
Expand Down Expand Up @@ -116,13 +123,23 @@ def _setup_tornado_app():

# And these do not
unpublished_url_specs = [
# These are a little special - unpublished but still versioned
# The swagger spec
(r'{0}api/v1/spec/?'.format(prefix), SpecHandler),
# Events websocket
(r'{0}api/v1/socket/events/?'.format(prefix), EventSocket),

# Version / configs
(r'{0}version/?'.format(prefix), VersionHandler),
(r'{0}config/?'.format(prefix), ConfigHandler),
(r'{0}config/swagger/?'.format(prefix), SwaggerConfigHandler),
(r'{0}version/?'.format(prefix), VersionHandler),
(r'{0}api/v1/spec/?'.format(prefix), SpecHandler),

# Not sure if these are really necessary
(r'{0}'.format(prefix[:-1]), RedirectHandler, {"url": prefix}),
(r'{0}swagger/(.*)'.format(prefix), StaticFileHandler,
{'path': os.path.join(static_base, 'swagger')}),

# Static content
(r'{0}(.*)'.format(prefix), StaticFileHandler,
{'path': static_base, 'default_filename': 'index.html'})
]
Expand Down Expand Up @@ -184,9 +201,13 @@ def bg_thrift_context(async=True, **kwargs):


def _setup_event_publishers(ssl_context):
from brew_view.controllers.event_api import EventSocket

# Create the collection of event publishers and add concrete publishers to it
pubs = EventPublishers({'request': RequestPublisher(ssl_context=ssl_context)})
pubs = EventPublishers({
'request': RequestPublisher(ssl_context=ssl_context),
'websocket': WebsocketPublisher(EventSocket)
})

if config.event.mongo.enable:
pubs['mongo'] = MongoPublisher()
Expand Down
4 changes: 4 additions & 0 deletions brew_view/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ def shutdown():
brew_view.logger.info("Stopping server.")
brew_view.server.stop()

# Shutdown everything short of the event loop
# (we need the event loop to publish the shutdown event)
brew_view.shutdown()

# Publish shutdown notification
brew_view.event_publishers.publish_event(Event(name=Events.BREWVIEW_STOPPED.name))

Expand Down
2 changes: 1 addition & 1 deletion brew_view/controllers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
from brew_view.controllers.misc_controllers import ConfigHandler, VersionHandler, SpecHandler, \
SwaggerConfigHandler
from brew_view.controllers.logging_api import LoggingConfigAPI
from brew_view.controllers.event_api import EventPublisherAPI
from brew_view.controllers.event_api import EventPublisherAPI, EventSocket
43 changes: 43 additions & 0 deletions brew_view/controllers/event_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging

from tornado.websocket import WebSocketHandler

import brew_view
from bg_utils.parser import BeerGardenSchemaParser
from brew_view.base_handler import BaseHandler
from brewtils.schema_parser import SchemaParser

Expand Down Expand Up @@ -50,3 +53,43 @@ def post(self):
brew_view.event_publishers[publisher].publish_event(event)

self.set_status(204)


class EventSocket(WebSocketHandler):

logger = logging.getLogger(__name__)
parser = BeerGardenSchemaParser()

closing = False
listeners = set()

def check_origin(self, origin):
return True

def open(self):
if EventSocket.closing:
self.close(reason='Shutting down')
else:
EventSocket.listeners.add(self)

def on_close(self):
EventSocket.listeners.discard(self)

def on_message(self, message):
pass

@classmethod
def publish(cls, message):
# Don't bother if nobody is listening
if not len(cls.listeners):
return

for listener in cls.listeners:
listener.write_message(message)

@classmethod
def shutdown(cls):
EventSocket.closing = True

for listener in cls.listeners:
listener.close(reason='Shutting down')
12 changes: 12 additions & 0 deletions brew_view/publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ def _event_publish_args(self, event, **kwargs):
return {'urls': urls} if urls else {}


class WebsocketPublisher(BeergardenPublisher):
"""Publisher implementation that publishes to a websocket"""

def __init__(self, socket_class):
BeergardenPublisher.__init__(self)

self._socket = socket_class

def publish(self, message, **kwargs):
self._socket.publish(message)


class MongoPublisher(BeergardenPublisher):
"""Publisher implementation that 'publishes' to Mongo"""

Expand Down