Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: watch clinet #8

Merged
merged 1 commit into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import grpc
from client.protocol import ProtocolClient
from client.kv import KvClient
from client.watch import WatchClient
from client.auth import AuthClient


Expand All @@ -13,14 +14,17 @@ class Client:

Attributes:
kv_client: Kv client
watch_client: Watch client
auth_client: Auth client
"""

kv_client: KvClient
watch_client: WatchClient
auth_client: AuthClient

def __init__(self, kv: KvClient, auth: AuthClient) -> None:
def __init__(self, kv: KvClient, watch: WatchClient, auth: AuthClient) -> None:
self.kv_client = kv
self.watch_client = watch
self.auth_client = auth

@classmethod
Expand All @@ -34,6 +38,7 @@ async def connect(cls, addrs: list[str]) -> Client:
# TODO: Acquire the auth token

kv_client = KvClient("client", protocol_client, "")
watch_client = WatchClient(channel)
auth_client = AuthClient("client", protocol_client, channel, "")

return cls(kv_client, auth_client)
return cls(kv_client, watch_client, auth_client)
95 changes: 95 additions & 0 deletions client/watch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""Watch Client"""

import uuid
import asyncio
from grpc import Channel
from grpc.aio import StreamStreamCall
from api.xline.rpc_pb2_grpc import WatchStub
from api.xline.rpc_pb2 import (
WatchRequest,
WatchCreateRequest,
WatchCancelRequest,
)


class Watcher:
"""
Watcher for Watch operations.

Attributes:
watch_client: The watch RPC client, only communicate with one server at a time.
is_cancel: Whether the watcher is canceled.
watch_id: The ID of the watcher.
"""

watch_client: WatchStub
is_cancel: bool
watch_id: int

def __init__(self, watch_client: WatchStub) -> None:
self.watch_client = watch_client
self.is_cancel = False
self.watch_id = -1

def watch(self, req: WatchCreateRequest) -> StreamStreamCall:
"""
Watches for events happening or that have happened. Both input and output
are streams; the input stream is for creating and canceling watcher and the output
stream sends events. The entire event history can be watched starting from the
last compaction revision.
"""

async def watch():
yield WatchRequest(create_request=req)

while not self.is_cancel:
await asyncio.sleep(0.5)

yield WatchRequest(cancel_request=WatchCancelRequest(watch_id=self.watch_id))

res = self.watch_client.Watch(watch())
return res

def cancel(self, watch_id: int) -> None:
"""
Cancel the Watcher
"""
self.is_cancel = True
self.watch_id = watch_id


class WatchClient:
"""
Client for Watch operations.

Attributes:
watch_client: The watch RPC client, only communicate with one server at a time.
watchers: The list of watchers.
"""

watch_client: WatchStub
watchers: dict[str, Watcher]

def __init__(self, channel: Channel) -> None:
self.watch_client = WatchStub(channel=channel)
self.watchers = {}

def watch(self, req: WatchCreateRequest) -> tuple[StreamStreamCall, str]:
"""
Create Watcher to watch
"""
watcher = Watcher(self.watch_client)

watch_id = str(uuid.uuid4())
self.watchers[watch_id] = watcher

res = watcher.watch(req)
return res, watch_id

def cancel(self, watcher_id: str, watch_id: int) -> None:
"""
Cancel the Watcher
"""
if watcher_id in self.watchers:
self.watchers[watcher_id].cancel(watch_id)
del self.watchers[watcher_id]
57 changes: 57 additions & 0 deletions tests/watch_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Tests for the watch client"""

import asyncio
import pytest
from client import client
from api.xline.rpc_pb2 import (
WatchCreateRequest,
)


@pytest.mark.asyncio
async def test_watch_should_receive_consistent_events():
"""
Watch should receive consistent events
"""
curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"]
cli = await client.Client.connect(curp_members)
kv_client = cli.kv_client
watch_client = cli.watch_client

watch_res, watcher_id = watch_client.watch(WatchCreateRequest(key=b"watch01"))

async def changer():
await kv_client.put(b"watch01", b"value01")
await kv_client.put(b"watch01", b"value02")
await kv_client.put(b"watch01", b"value03")

async def watcher():
i = 1
async for res in watch_res:
if res.created is True:
watch_id = res.watch_id

if i == 1:
assert res.watch_id is not None
assert res.created
if i == 2:
assert res.events[0].kv.key == b"watch01"
assert res.events[0].kv.value == b"value01"
if i == 3:
assert res.events[0].kv.key == b"watch01"
assert res.events[0].kv.value == b"value02"
if i == 4:
assert res.events[0].kv.key == b"watch01"
assert res.events[0].kv.value == b"value03"
if i == 5:
assert res.watch_id is not None
assert res.canceled

i += 1

watch_client.cancel(watcher_id, watch_id)

await asyncio.gather(
changer(),
watcher(),
)
Loading