Skip to content

Commit

Permalink
Switch from socket.io to vanilla websockets & fix state updates for w…
Browse files Browse the repository at this point in the history
…ebsockets (#1048)
  • Loading branch information
wwwillchen authored Oct 25, 2024
1 parent 9f1ab4f commit 2a0e9e5
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 162 deletions.
8 changes: 2 additions & 6 deletions build_defs/defaults.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ THIRD_PARTY_JS_HIGHLIGHTJS = [
"@npm//highlight.js",
]

THIRD_PARTY_JS_SOCKETIO_CLIENT = [
"@npm//socket.io-client",
]

THIRD_PARTY_PY_ABSL_PY = [
requirement("absl-py"),
]
Expand All @@ -93,8 +89,8 @@ THIRD_PARTY_PY_FLASK = [
requirement("flask"),
]

THIRD_PARTY_PY_FLASK_SOCKETIO = [
requirement("flask-socketio"),
THIRD_PARTY_PY_FLASK_SOCK = [
requirement("flask-sock"),
]

THIRD_PARTY_PY_MATPLOTLIB = [
Expand Down
2 changes: 1 addition & 1 deletion build_defs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ python-dotenv

# Optional (lazily-loaded) deps:
sqlalchemy
flask-socketio
flask-sock

# greenlet is needed for SQL Alchemy depending on the architecture, but because of how
# Bazel works using requirements_lock.txt, it does seem to able to install the
Expand Down
22 changes: 5 additions & 17 deletions build_defs/requirements_lock.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ babel==2.15.0 \
--hash=sha256:08706bdad8d0a3413266ab61bd6c34d0c28d6e1e7badf40a2cebe67644e2e1fb \
--hash=sha256:8daf0e265d05768bc6c7a314cf1321e9a123afc328cc635c18622a2f30a04413
# via mkdocs-material
bidict==0.23.1 \
--hash=sha256:03069d763bc387bbd20e7d49914e75fc4132a41937fa3405417e1a5a2d006d71 \
--hash=sha256:5dae8d4d79b552a71cbabc7deb25dfe8ce710b17ff41711e13010ead2abfc3e5
# via python-socketio
blinker==1.8.2 \
--hash=sha256:1779309f71bf239144b9399d06ae925637cf6634cf6bd131104184531bf67c01 \
--hash=sha256:8f77b09d3bf7c795e969e9486f39c2c5e9c39d4ee07424be2bc594ece9642d83
Expand Down Expand Up @@ -296,10 +292,10 @@ flask==3.0.3 \
--hash=sha256:ceb27b0af3823ea2737928a4d99d125a06175b8512c445cbd9a9ce200ef76842
# via
# -r build_defs/requirements.txt
# flask-socketio
flask-socketio==5.4.1 \
--hash=sha256:2e9b8864a5be37ca54f6c76a4d06b1ac5e0df61fde12d03afc81ab4057e1eb86 \
--hash=sha256:895da879d162781b9193cbb8fe8f3cf25b263ff242980d5c5e6c16d3c03930d2
# flask-sock
flask-sock==0.7.0 \
--hash=sha256:caac4d679392aaf010d02fabcf73d52019f5bdaf1c9c131ec5a428cb3491204a \
--hash=sha256:e023b578284195a443b8d8bdb4469e6a6acf694b89aeb51315b1a34fcf427b7d
# via -r build_defs/requirements.txt
fonttools==4.53.0 \
--hash=sha256:099634631b9dd271d4a835d2b2a9e042ccc94ecdf7e2dd9f7f34f7daf333358d \
Expand Down Expand Up @@ -1276,14 +1272,6 @@ python-dotenv==1.0.1 \
--hash=sha256:e324ee90a023d808f1959c46bcbc04446a10ced277783dc6ee09987c37ec10ca \
--hash=sha256:f7b63ef50f1b690dddf550d03497b66d609393b40b564ed0d674909a68ebf16a
# via -r build_defs/requirements.txt
python-engineio==4.9.1 \
--hash=sha256:7631cf5563086076611e494c643b3fa93dd3a854634b5488be0bba0ef9b99709 \
--hash=sha256:f995e702b21f6b9ebde4e2000cd2ad0112ba0e5116ec8d22fe3515e76ba9dddd
# via python-socketio
python-socketio==5.11.4 \
--hash=sha256:42efaa3e3e0b166fc72a527488a13caaac2cefc76174252486503bd496284945 \
--hash=sha256:8b0b8ff2964b2957c865835e936310190639c00310a47d77321a594d1665355e
# via flask-socketio
pytz==2024.1 \
--hash=sha256:2a29735ea9c18baf14b448846bde5a48030ed267578472d8955cd0e7443a9812 \
--hash=sha256:328171f4e3623139da4983451950b28e95ac706e13f3f2630a879749e7a8b319
Expand Down Expand Up @@ -1445,7 +1433,7 @@ rsa==4.9 \
simple-websocket==1.1.0 \
--hash=sha256:4af6069630a38ed6c561010f0e11a5bc0d4ca569b36306eb257cd9a192497c8c \
--hash=sha256:7939234e7aa067c534abdab3a9ed933ec9ce4691b0713c78acb195560aa52ae4
# via python-engineio
# via flask-sock
six==1.16.0 \
--hash=sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926 \
--hash=sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254
Expand Down
35 changes: 21 additions & 14 deletions docs/api/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,6 @@ Mesop is configured at the application level using environment variables.

## Configuration values

### MESOP_CONCURRENT_UPDATES_ENABLED

Allows concurrent updates to state in the same session. If this is not updated, then updates are queued and processed sequentially.

By default, this is not enabled. You can enable this by setting it to `true`.

### MESOP_WEB_SOCKETS_ENABLED

!!! warning "Experimental feature"

This is an experimental feature and is subject to breaking change. Please follow [https://github.com/google/mesop/issues/1028](https://github.com/google/mesop/issues/1028) for updates.

This uses WebSockets instead of HTTP Server-Sent Events (SSE) as the transport protocol for UI updates. If you set this environment variable to `true`, then [`MESOP_CONCURRENT_UPDATES_ENABLED`](#MESOP_CONCURRENT_UPDATES_ENABLED) will automatically be enabled as well.

### MESOP_STATE_SESSION_BACKEND

Sets the backend to use for caching state data server-side. This makes it so state does
Expand Down Expand Up @@ -194,6 +180,27 @@ parameter specifies which SQL database table that Mesop will write state session

**Default:** `mesop_state_session`

## Experimental configuration values

These configuration values are experimental and are subject to breaking change, including removal in future releases.

### MESOP_CONCURRENT_UPDATES_ENABLED

!!! warning "Experimental feature"

This is an experimental feature and is subject to breaking change. There are many bugs and edge cases to this feature.

Allows concurrent updates to state in the same session. If this is not updated, then updates are queued and processed sequentially.

By default, this is not enabled. You can enable this by setting it to `true`.

### MESOP_WEB_SOCKETS_ENABLED

!!! warning "Experimental feature"

This is an experimental feature and is subject to breaking change. Please follow [https://github.com/google/mesop/issues/1028](https://github.com/google/mesop/issues/1028) for updates.

This uses WebSockets instead of HTTP Server-Sent Events (SSE) as the transport protocol for UI updates. If you set this environment variable to `true`, then [`MESOP_CONCURRENT_UPDATES_ENABLED`](#MESOP_CONCURRENT_UPDATES_ENABLED) will automatically be enabled as well.

## Usage Examples

Expand Down
13 changes: 1 addition & 12 deletions mesop/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
execute_module,
get_module_name_from_runfile_path,
)
from mesop.env.env import MESOP_WEBSOCKETS_ENABLED
from mesop.exceptions import format_traceback
from mesop.runtime import (
enable_debug_mode,
Expand Down Expand Up @@ -154,17 +153,7 @@ def main(argv: Sequence[str]):
log_startup(port=port())
logging.getLogger("werkzeug").setLevel(logging.WARN)

if MESOP_WEBSOCKETS_ENABLED:
socketio = flask_app.socketio # type: ignore
socketio.run(
flask_app,
host=get_public_host(),
port=port(),
use_reloader=False,
allow_unsafe_werkzeug=True,
)
else:
flask_app.run(host=get_public_host(), port=port(), use_reloader=False)
flask_app.run(host=get_public_host(), port=port(), use_reloader=False)


if __name__ == "__main__":
Expand Down
21 changes: 11 additions & 10 deletions mesop/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,22 @@ def __init__(self):
self._contexts = {}

def context(self) -> Context:
if MESOP_WEBSOCKETS_ENABLED and hasattr(request, "sid"):
# flask-socketio adds sid (session id) to the request object.
sid = request.sid # type: ignore
if sid not in self._contexts:
self._contexts[sid] = self.create_context()
return self._contexts[sid]
if MESOP_WEBSOCKETS_ENABLED and hasattr(request, "websocket_session_id"):
websocket_session_id = request.websocket_session_id # type: ignore
if websocket_session_id not in self._contexts:
self._contexts[websocket_session_id] = self.create_context()
return self._contexts[websocket_session_id]
if "_mesop_context" not in g:
g._mesop_context = self.create_context()
return g._mesop_context

def delete_context(self, sid: str) -> None:
if sid in self._contexts:
del self._contexts[sid]
def delete_context(self, websocket_session_id: str) -> None:
if websocket_session_id in self._contexts:
del self._contexts[websocket_session_id]
else:
warn(f"Tried to delete context with sid={sid} that doesn't exist.")
warn(
f"Tried to delete context with websocket_session_id={websocket_session_id} that doesn't exist."
)

def create_context(self) -> Context:
# If running in prod mode, *always* enable the has served traffic safety check.
Expand Down
4 changes: 2 additions & 2 deletions mesop/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ load(
"THIRD_PARTY_PY_DOTENV",
"THIRD_PARTY_PY_FIREBASE_ADMIN",
"THIRD_PARTY_PY_FLASK",
"THIRD_PARTY_PY_FLASK_SOCKETIO",
"THIRD_PARTY_PY_FLASK_SOCK",
"THIRD_PARTY_PY_GREENLET",
"THIRD_PARTY_PY_MSGPACK",
"THIRD_PARTY_PY_PYTEST",
Expand Down Expand Up @@ -40,7 +40,7 @@ py_library(
"//mesop/warn",
] + THIRD_PARTY_PY_ABSL_PY +
THIRD_PARTY_PY_FLASK +
THIRD_PARTY_PY_FLASK_SOCKETIO,
THIRD_PARTY_PY_FLASK_SOCK,
)

py_library(
Expand Down
87 changes: 58 additions & 29 deletions mesop/server/server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import base64
import secrets
import threading
from typing import Generator, Sequence

from flask import Flask, Response, abort, request, stream_with_context
from flask import (
Flask,
Response,
abort,
copy_current_request_context,
request,
stream_with_context,
)

import mesop.protos.ui_pb2 as pb
from mesop.component_helpers import diff_component
Expand Down Expand Up @@ -146,18 +155,20 @@ def generate_data(ui_request: pb.UiRequest) -> Generator[str, None, None]:
yield from render_loop(path=ui_request.path, init_request=True)
else:
yield from render_loop(path=ui_request.path, init_request=True)
yield create_update_state_event()
if not MESOP_WEBSOCKETS_ENABLED:
yield create_update_state_event()
yield STREAM_END
elif ui_request.HasField("user_event"):
event = ui_request.user_event
runtime().context().set_theme_settings(event.theme_settings)
runtime().context().set_viewport_size(event.viewport_size)
runtime().context().initialize_query_params(event.query_params)

if event.states.states:
runtime().context().update_state(event.states)
else:
runtime().context().restore_state_from_session(event.state_token)
if not MESOP_WEBSOCKETS_ENABLED:
if event.states.states:
runtime().context().update_state(event.states)
else:
runtime().context().restore_state_from_session(event.state_token)

for _ in render_loop(path=ui_request.path, trace_mode=True):
pass
Expand Down Expand Up @@ -215,7 +226,8 @@ def generate_data(ui_request: pb.UiRequest) -> Generator[str, None, None]:
yield from render_loop(path=path)
runtime().context().set_previous_node_from_current_node()
runtime().context().reset_current_node()
yield create_update_state_event(diff=True)
if not MESOP_WEBSOCKETS_ENABLED:
yield create_update_state_event(diff=True)
yield STREAM_END
else:
raise Exception(f"Unknown request type: {ui_request}")
Expand Down Expand Up @@ -255,34 +267,51 @@ def teardown_clear_stale_state_sessions(error=None):
configure_debug_routes(flask_app)

if MESOP_WEBSOCKETS_ENABLED:
from flask_socketio import SocketIO, emit
from flask_sock import Sock
from simple_websocket import Server

socketio = SocketIO(flask_app)
sock = Sock(flask_app)

@socketio.on_error(namespace=UI_PATH)
def handle_error(e):
print("WebSocket error", e)
sid = request.sid # type: ignore
runtime().delete_context(sid)
@sock.route(UI_PATH)
def handle_websocket(ws: Server):
@copy_current_request_context
def ws_generate_data(ws, ui_request):
for data_chunk in generate_data(ui_request):
if not ws.connected:
break
ws.send(data_chunk)

@socketio.on("disconnect", namespace=UI_PATH)
def handle_disconnect():
sid = request.sid # type: ignore
runtime().delete_context(sid)
# Generate a unique session ID for the WebSocket connection
session_id = secrets.token_urlsafe(32)
request.websocket_session_id = session_id # type: ignore

@socketio.on("message", namespace=UI_PATH)
def handle_message(message):
if not message:
emit("error", {"error": "Missing request payload"})
return
try:
while True:
message = ws.receive()
if not message:
continue # Ignore empty messages

ui_request = pb.UiRequest()
ui_request.ParseFromString(base64.urlsafe_b64decode(message))
ui_request = pb.UiRequest()
try:
decoded_message = base64.urlsafe_b64decode(message)
ui_request.ParseFromString(decoded_message)
except Exception as parse_error:
print("Failed to parse message:", parse_error)
continue # Skip processing this message

generator = generate_data(ui_request)
for data_chunk in generator:
emit("response", {"data": data_chunk})
# Start a new thread so we can handle multiple
# concurrent updates for the same websocket connection.
thread = threading.Thread(
target=ws_generate_data, args=(ws, ui_request), daemon=True
)
thread.start()

flask_app.socketio = socketio # type: ignore
except Exception as e:
print("WebSocket error:", e)
finally:
# Clean up context when connection closes
if hasattr(request, "websocket_session_id"):
websocket_session_id = request.websocket_session_id # type: ignore
runtime().delete_context(websocket_session_id)

return flask_app
16 changes: 2 additions & 14 deletions mesop/server/wsgi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from absl import flags
from flask import Flask

from mesop.env.env import MESOP_WEBSOCKETS_ENABLED
from mesop.runtime import enable_debug_mode
from mesop.server.constants import EDITOR_PACKAGE_PATH, PROD_PACKAGE_PATH
from mesop.server.flags import port
Expand All @@ -22,19 +21,8 @@ def __init__(self, flask_app: Flask):

def run(self):
log_startup(port=port())
if MESOP_WEBSOCKETS_ENABLED:
socketio = self._flask_app.socketio # type: ignore
socketio.run(
self._flask_app,
host=get_local_host(),
port=port(),
use_reloader=False,
allow_unsafe_werkzeug=True,
)
else:
self._flask_app.run(
host=get_local_host(), port=port(), use_reloader=False
)

self._flask_app.run(host=get_local_host(), port=port(), use_reloader=False)


def create_app(
Expand Down
4 changes: 2 additions & 2 deletions mesop/web/src/services/BUILD
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("//build_defs:defaults.bzl", "ANGULAR_CORE_DEPS", "ANGULAR_MATERIAL_TS_DEPS", "THIRD_PARTY_JS_SOCKETIO_CLIENT", "ng_module")
load("//build_defs:defaults.bzl", "ANGULAR_CORE_DEPS", "ANGULAR_MATERIAL_TS_DEPS", "ng_module")

package(
default_visibility = ["//build_defs:mesop_internal"],
Expand All @@ -13,5 +13,5 @@ ng_module(
"//mesop/protos:ui_jspb_proto",
"//mesop/web/src/dev_tools/services",
"//mesop/web/src/utils",
] + ANGULAR_CORE_DEPS + ANGULAR_MATERIAL_TS_DEPS + THIRD_PARTY_JS_SOCKETIO_CLIENT,
] + ANGULAR_CORE_DEPS + ANGULAR_MATERIAL_TS_DEPS,
)
Loading

0 comments on commit 2a0e9e5

Please sign in to comment.