Skip to content

Commit

Permalink
Merge pull request #65 from beer-garden/websocket_publisher
Browse files Browse the repository at this point in the history
Adding websocket event publisher
  • Loading branch information
hazmat345 authored Jun 19, 2018
2 parents 96758fc + 21d1bab commit e8e7771
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 6 deletions.
31 changes: 26 additions & 5 deletions brew_view/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from bg_utils.event_publisher import EventPublishers
from bg_utils.pika import TransientPikaClient
from bg_utils.plugin_logging_loader import PluginLoggingLoader
from brew_view.publishers import MongoPublisher, RequestPublisher, TornadoPikaPublisher
from brew_view.publishers import (MongoPublisher, RequestPublisher,
TornadoPikaPublisher, WebsocketPublisher)
from brew_view.specification import get_default_logging_config
from brewtils.schemas import ParameterSchema, CommandSchema, InstanceSchema, SystemSchema, \
RequestSchema, PatchSchema, LoggingConfigSchema, EventSchema, QueueSchema
Expand Down Expand Up @@ -51,6 +52,12 @@ def setup_brew_view(spec, cli_args):
_setup_application()


def shutdown():
"""Close any open websocket connections"""
from brew_view.controllers import EventSocket
EventSocket.shutdown()


def load_plugin_logging_config(input_config):
global plugin_logging_config

Expand Down Expand Up @@ -86,7 +93,7 @@ def _setup_tornado_app():
from brew_view.controllers import AdminAPI, CommandAPI, CommandListAPI, ConfigHandler, \
InstanceAPI, QueueAPI, QueueListAPI, RequestAPI, RequestListAPI, SystemAPI, SystemListAPI, \
VersionHandler, SpecHandler, SwaggerConfigHandler, OldAdminAPI, OldQueueAPI, \
OldQueueListAPI, LoggingConfigAPI, EventPublisherAPI
OldQueueListAPI, LoggingConfigAPI, EventPublisherAPI, EventSocket

prefix = config.web.url_prefix
static_base = os.path.join(os.path.dirname(__file__), 'static', 'dist')
Expand Down Expand Up @@ -116,13 +123,23 @@ def _setup_tornado_app():

# And these do not
unpublished_url_specs = [
# These are a little special - unpublished but still versioned
# The swagger spec
(r'{0}api/v1/spec/?'.format(prefix), SpecHandler),
# Events websocket
(r'{0}api/v1/socket/events/?'.format(prefix), EventSocket),

# Version / configs
(r'{0}version/?'.format(prefix), VersionHandler),
(r'{0}config/?'.format(prefix), ConfigHandler),
(r'{0}config/swagger/?'.format(prefix), SwaggerConfigHandler),
(r'{0}version/?'.format(prefix), VersionHandler),
(r'{0}api/v1/spec/?'.format(prefix), SpecHandler),

# Not sure if these are really necessary
(r'{0}'.format(prefix[:-1]), RedirectHandler, {"url": prefix}),
(r'{0}swagger/(.*)'.format(prefix), StaticFileHandler,
{'path': os.path.join(static_base, 'swagger')}),

# Static content
(r'{0}(.*)'.format(prefix), StaticFileHandler,
{'path': static_base, 'default_filename': 'index.html'})
]
Expand Down Expand Up @@ -184,9 +201,13 @@ def bg_thrift_context(async=True, **kwargs):


def _setup_event_publishers(ssl_context):
from brew_view.controllers.event_api import EventSocket

# Create the collection of event publishers and add concrete publishers to it
pubs = EventPublishers({'request': RequestPublisher(ssl_context=ssl_context)})
pubs = EventPublishers({
'request': RequestPublisher(ssl_context=ssl_context),
'websocket': WebsocketPublisher(EventSocket)
})

if config.event.mongo.enable:
pubs['mongo'] = MongoPublisher()
Expand Down
4 changes: 4 additions & 0 deletions brew_view/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ def shutdown():
brew_view.logger.info("Stopping server.")
brew_view.server.stop()

# Shutdown everything short of the event loop
# (we need the event loop to publish the shutdown event)
brew_view.shutdown()

# Publish shutdown notification
brew_view.event_publishers.publish_event(Event(name=Events.BREWVIEW_STOPPED.name))

Expand Down
2 changes: 1 addition & 1 deletion brew_view/controllers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
from brew_view.controllers.misc_controllers import ConfigHandler, VersionHandler, SpecHandler, \
SwaggerConfigHandler
from brew_view.controllers.logging_api import LoggingConfigAPI
from brew_view.controllers.event_api import EventPublisherAPI
from brew_view.controllers.event_api import EventPublisherAPI, EventSocket
43 changes: 43 additions & 0 deletions brew_view/controllers/event_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging

from tornado.websocket import WebSocketHandler

import brew_view
from bg_utils.parser import BeerGardenSchemaParser
from brew_view.base_handler import BaseHandler
from brewtils.schema_parser import SchemaParser

Expand Down Expand Up @@ -50,3 +53,43 @@ def post(self):
brew_view.event_publishers[publisher].publish_event(event)

self.set_status(204)


class EventSocket(WebSocketHandler):

logger = logging.getLogger(__name__)
parser = BeerGardenSchemaParser()

closing = False
listeners = set()

def check_origin(self, origin):
return True

def open(self):
if EventSocket.closing:
self.close(reason='Shutting down')
else:
EventSocket.listeners.add(self)

def on_close(self):
EventSocket.listeners.discard(self)

def on_message(self, message):
pass

@classmethod
def publish(cls, message):
# Don't bother if nobody is listening
if not len(cls.listeners):
return

for listener in cls.listeners:
listener.write_message(message)

@classmethod
def shutdown(cls):
EventSocket.closing = True

for listener in cls.listeners:
listener.close(reason='Shutting down')
12 changes: 12 additions & 0 deletions brew_view/publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ def _event_publish_args(self, event, **kwargs):
return {'urls': urls} if urls else {}


class WebsocketPublisher(BeergardenPublisher):
"""Publisher implementation that publishes to a websocket"""

def __init__(self, socket_class):
BeergardenPublisher.__init__(self)

self._socket = socket_class

def publish(self, message, **kwargs):
self._socket.publish(message)


class MongoPublisher(BeergardenPublisher):
"""Publisher implementation that 'publishes' to Mongo"""

Expand Down

0 comments on commit e8e7771

Please sign in to comment.