Support Server Sent Events #83

Open
Siecje opened this issue Jan 10, 2023 · 5 comments
Labels
documentation Improvements or additions to documentation enhancement New feature or request

Comments

@Siecje

Siecje commented Jan 10, 2023

No description provided.

@cirospaciari
Owner

cirospaciari commented Jan 10, 2023

I think before_response and after_response events would allow a lot of new extensions.
Today you can achieve this using extensions + middleware, but adding more events would be a great addition for sure!
I will add this as a new feature in the future and implement it in a way that does not affect performance if you choose not to use it.

@cirospaciari cirospaciari added documentation Improvements or additions to documentation enhancement New feature or request labels Jan 10, 2023
@Siecje
Author

Siecje commented Jan 10, 2023

How do you have multiple responses for one request? Or are you saying to have a response to a heartbeat and store the events until then?

@cirospaciari
Owner

You can send multiple res.write calls followed by one res.end, or multiple res.send_chunk or res.try_end calls.
With write you are basically using chunked encoding; with send_chunk and try_end you are streaming data.
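
To make the "chunked encoding" part concrete, here is a rough sketch of how an HTTP/1.1 chunked body is framed on the wire. The helper names (encode_chunk, encode_chunked_body) are made up for illustration and are not part of socketify; the server does this framing for you, so this only shows what several writes followed by one end amount to.

```python
def encode_chunk(payload: bytes) -> bytes:
    """Frame one chunk: hex length, CRLF, payload, CRLF."""
    return f"{len(payload):x}".encode("ascii") + b"\r\n" + payload + b"\r\n"


# Zero-length chunk that terminates the body (what "end" corresponds to).
LAST_CHUNK = b"0\r\n\r\n"


def encode_chunked_body(parts):
    """Join several writes into one chunked body, skipping empty parts."""
    return b"".join(encode_chunk(p) for p in parts if p) + LAST_CHUNK


body = encode_chunked_body([b"hello ", b"world"])
print(body)  # b'6\r\nhello \r\n5\r\nworld\r\n0\r\n\r\n'
```

Each write becomes its own length-prefixed chunk, which is why the total size does not need to be known up front.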

@nickchomey

nickchomey commented Nov 2, 2023

In addition to SSE, might it be possible to do Pub/Sub with SSE? My application is read-heavy, so SSE + AJAX is more appropriate than websockets (also SSE can be intercepted by service workers, is better on battery life, and more).

I see that PubSub is embedded within the uWS websockets mechanism, so I don't know if it could be exposed to an SSE mechanism... I'm happy to help implement all of this with some guidance!

@cirospaciari
Owner

cirospaciari commented Nov 2, 2023

SSE is a one-way channel (Server to Client). The Client can use the Last-Event-ID header to resume the stream, but it cannot send data over it.
Pub/Sub in the sense of publishing from a Client to the Server will not be possible over SSE, but you can easily replicate it on the client side by calling regular routes and sending URL params and headers.
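
As a sketch of how resuming with Last-Event-ID could work on the server side, one option is to keep a small buffer of recent events and replay the ones newer than the id the client reports. EventLog and its methods are hypothetical names for this illustration, not socketify APIs, and the buffer size is an arbitrary assumption.

```python
from collections import deque


class EventLog:
    """Keeps the most recent events so a reconnecting client can catch up."""

    def __init__(self, max_events=100):
        self._events = deque(maxlen=max_events)  # (event_id, data) pairs
        self._next_id = 1

    def append(self, data):
        """Store an event and return the id assigned to it."""
        event_id = self._next_id
        self._events.append((event_id, data))
        self._next_id += 1
        return event_id

    def since(self, last_event_id):
        """Return buffered events newer than last_event_id (None = all buffered)."""
        if last_event_id is None:
            return list(self._events)
        return [(i, d) for i, d in self._events if i > last_event_id]


log = EventLog(max_events=3)
for name in ["bulbasaur", "ivysaur", "venusaur", "charmander"]:
    log.append(name)

print(log.since(2))     # [(3, 'venusaur'), (4, 'charmander')]
print(log.since(None))  # [(2, 'ivysaur'), (3, 'venusaur'), (4, 'charmander')]
```

Events that have already fallen out of the buffer cannot be replayed, so the buffer size bounds how long a client can be disconnected without missing data.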

If you mean publishing from Server -> Client using topics, that is possible, and we can add it in the future.
A basic SSE example with socketify:

import aiohttp
from socketify import App

app = App()

async def get_pokemon(number):
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://pokeapi.co/api/v2/pokemon/{number}"
        ) as response:
            # return text (not bytes) so it can be interpolated into the SSE frame
            return await response.text()


async def home(res, req):
    try:
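        # get_header expects the lowercase header name
        lastEventID = req.get_header("last-event-id")
        if lastEventID is None:
            lastEventID = 1
        else:
            # resume with the event after the last one the client received
            lastEventID = int(lastEventID) + 1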

        res.write_status(200)
        res.write_header("Content-Type", "text/event-stream")
        res.write_header("Cache-Control", "no-cache")
        
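        # map the event id onto a valid pokemon id in 1..151
        lastPokemonID = (lastEventID - 1) % 151 + 1
        # stream pokemon data until we are aborted
        while not res.aborted:
            for pokemon_id in range(lastPokemonID, 152):
                pokemon = await get_pokemon(pokemon_id)
                # the client may have disconnected while we were awaiting
                if res.aborted:
                    break
                res.write(f"data: {pokemon}\nid: {lastEventID}\n\n")
                lastEventID += 1
            lastPokemonID = 1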
            
    except Exception as e:
        print(e)
        return res.end("Error")
    

app.get("/", home)

app.listen(
    3000,
    lambda config: print(
        "Listening on port http://localhost:%s now\n" % str(config.port)
    ),
)
app.run()
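
The strings passed to res.write above are SSE frames: "field: value" lines terminated by a blank line. A payload that contains line breaks needs one data: line per line of payload. The helper below is a minimal sketch of that formatting; format_sse is a made-up name and not part of socketify.

```python
def format_sse(data, event_id=None, event=None):
    """Build one SSE frame; multi-line data gets one 'data:' line per line."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # Normalize CRLF / CR to LF, then emit one data line per payload line.
    text = str(data).replace("\r\n", "\n").replace("\r", "\n")
    for line in text.split("\n"):
        lines.append(f"data: {line}")
    # A blank line ends the frame and tells the client to dispatch the event.
    return "\n".join(lines) + "\n\n"


print(repr(format_sse('{"name": "bulbasaur"}', event_id=1)))
print(repr(format_sse("line one\nline two")))
```

With a helper like this, the handler could call res.write(format_sse(pokemon, event_id=lastEventID)) instead of building the string inline.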

Consuming it from the browser:

const source = new EventSource("http://localhost:3000/");
source.onmessage = (event) => {
   console.log(event.data);
};
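
EventSource does the frame parsing for you in the browser. For a non-browser consumer, the same stream can be parsed by hand. The sketch below handles only a subset of the format (data and id fields plus comment lines; event and retry are ignored), and parse_sse is a hypothetical helper name. It takes any iterable of already-decoded lines, so how the lines are fetched is left open.

```python
def parse_sse(lines):
    """Yield (last_event_id, data) tuples from an iterable of decoded lines."""
    data_lines = []
    last_id = None  # persists across events until the server sends a new id
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event, if any data was collected.
            if data_lines:
                yield last_id, "\n".join(data_lines)
            data_lines = []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "data":
            data_lines.append(value)
        elif field == "id":
            last_id = value


stream = ["data: a\n", "id: 1\n", "\n", ": keep-alive\n", "data: b\n", "data: c\n", "\n"]
print(list(parse_sse(stream)))  # [('1', 'a'), ('1', 'b\nc')]
```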

Using some form of pub/sub:

import aiohttp
import asyncio
from socketify import App

app = App()

channels = {}

def subscribe(channel, client):
    subscribers = channels.get(channel, None)
    if subscribers is None:
        subscribers = []
        channels[channel] = subscribers
    subscribers.append(client)

def unsubscribe(channel, client):
    subscribers = channels.get(channel, None)
    if subscribers is None:
        return False
    for c in subscribers:
        if c == client:
            subscribers.remove(c)
            return True
    return False

def publish(channel, message, id=None):
    subscribers = channels.get(channel, None)
    if subscribers is None:
        return False
    # broadcast to all subscribers; iterate over a copy because
    # unsubscribe() mutates the list while we loop
    for c in list(subscribers):
        if c.aborted:
            unsubscribe(channel, c)
        elif id is None:
            c.write(f"data: {message}\n\n")
        else:
            c.write(f"data: {message}\nid: {id}\n\n")
    return True

async def get_pokemon(number):
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://pokeapi.co/api/v2/pokemon/{number}"
        ) as response:
            # return text (not bytes) so it can be interpolated into the SSE frame
            # we do not change anything here, so there is no need to parse the JSON
            return await response.text()


def home(res, req):
    res.send("Hello World subscribe to event stream /pokemon and to broadcast /broadcast_pokemon?id=1")

async def broadcast_pokemon(res, req):
    pokemon_id = req.get_query("id")
    if pokemon_id is None:
        return res.send("id is required", status=400)
    
    pokemon = await get_pokemon(pokemon_id)
    # broadcast to all subscribers
    publish("pokemon", pokemon)
    return res.send("ACK")

async def pokemon(res, req):
    try:
        res.on_aborted(lambda res: unsubscribe("pokemon", res))
        subscribe("pokemon", res)

        res.write_status(200)
        res.write_header("Content-Type", "text/event-stream")
        res.write_header("Cache-Control", "no-cache")
        
        # wait for broadcast
        while not res.aborted:
            await asyncio.sleep(1)
    
    except Exception as e:
        print(e)
        res.end("Error")
        

app.get("/", home)
app.get("/pokemon", pokemon)
app.get("/broadcast_pokemon", broadcast_pokemon)

app.listen(
    3000,
    lambda config: print(
        "Listening on port http://localhost:%s now\n" % str(config.port)
    ),
)
app.run()
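
The channel bookkeeping above can also be tested without a running server. The following is a sketch of the same subscribe/unsubscribe/publish idea as a small class, exercised with a stand-in client. Broker and FakeClient are hypothetical names; the only assumption about a real response object is that it exposes .aborted and .write(), as used in the example above.

```python
from collections import defaultdict


class Broker:
    """Topic -> set of clients; drops aborted clients while publishing."""

    def __init__(self):
        self._topics = defaultdict(set)

    def subscribe(self, topic, client):
        self._topics[topic].add(client)

    def unsubscribe(self, topic, client):
        self._topics[topic].discard(client)

    def publish(self, topic, message, event_id=None):
        """Write one SSE frame to every live subscriber; return how many got it."""
        frame = f"data: {message}\n"
        if event_id is not None:
            frame += f"id: {event_id}\n"
        frame += "\n"
        delivered = 0
        # Iterate over a copy so removing aborted clients is safe.
        for client in list(self._topics.get(topic, ())):
            if client.aborted:
                self.unsubscribe(topic, client)
                continue
            client.write(frame)
            delivered += 1
        return delivered


class FakeClient:
    """Stand-in for a response object: only .aborted and .write() are used."""

    def __init__(self):
        self.aborted = False
        self.sent = []

    def write(self, chunk):
        self.sent.append(chunk)


broker = Broker()
alive, gone = FakeClient(), FakeClient()
broker.subscribe("pokemon", alive)
broker.subscribe("pokemon", gone)
gone.aborted = True

print(broker.publish("pokemon", "pikachu", event_id=25))  # 1
print(alive.sent)  # ['data: pikachu\nid: 25\n\n']
```

Using a set instead of a list makes unsubscribe a single discard call and avoids subscribing the same client twice.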

3 participants