diff --git a/client/client.py b/client/client.py index 9a5841a..ff8b662 100644 --- a/client/client.py +++ b/client/client.py @@ -4,6 +4,7 @@ import grpc from client.protocol import ProtocolClient from client.kv import KvClient +from client.lease import LeaseClient, LeaseIdGenerator from client.watch import WatchClient from client.auth import AuthClient @@ -14,16 +15,19 @@ class Client: Attributes: kv_client: Kv client + lease_client: Lease client watch_client: Watch client auth_client: Auth client """ kv_client: KvClient + lease_client: LeaseClient watch_client: WatchClient auth_client: AuthClient - def __init__(self, kv: KvClient, watch: WatchClient, auth: AuthClient) -> None: + def __init__(self, kv: KvClient, lease: LeaseClient, watch: WatchClient, auth: AuthClient) -> None: self.kv_client = kv + self.lease_client = lease self.watch_client = watch self.auth_client = auth @@ -36,9 +40,11 @@ async def connect(cls, addrs: list[str]) -> Client: # TODO: Load balancing channel = grpc.aio.insecure_channel(addrs[0]) # TODO: Acquire the auth token + id_gen = LeaseIdGenerator() kv_client = KvClient("client", protocol_client, "") + lease_client = LeaseClient("client", protocol_client, channel, "", id_gen) watch_client = WatchClient(channel) auth_client = AuthClient("client", protocol_client, channel, "") - return cls(kv_client, watch_client, auth_client) + return cls(kv_client, lease_client, watch_client, auth_client) diff --git a/client/lease.py b/client/lease.py new file mode 100644 index 0000000..ecfb60a --- /dev/null +++ b/client/lease.py @@ -0,0 +1,179 @@ +"""Lease Client""" + +import asyncio +import random +import uuid +from typing import Optional +from grpc import Channel +from grpc.aio import StreamStreamCall +from client.protocol import ProtocolClient as CurpClient +from api.xline.xline_command_pb2 import Command, RequestWithToken +from api.xline.rpc_pb2_grpc import LeaseStub +from api.xline.rpc_pb2 import ( + LeaseGrantRequest, + LeaseGrantResponse, + LeaseRevokeRequest, + LeaseRevokeResponse, + LeaseKeepAliveRequest, + LeaseTimeToLiveRequest, + LeaseTimeToLiveResponse, + LeaseLeasesRequest, + LeaseLeasesResponse, +) + + +class LeaseIdGenerator: + """ + Generator of unique lease id + Note that this Lease Id generation method may cause collisions, + the client should retry after informed by the server. + + Attributes: + id: The current lease id. + """ + + lease_id: int + + def __init__(self) -> None: + self.lease_id = int.from_bytes(random.randbytes(8), "big") + + def next(self) -> int: + """Generate a new `leaseId`.""" + lease_id = self.lease_id + self.lease_id += 1 + if lease_id == 0: + return self.next() + return lease_id & 0x7FFF_FFFF_FFFF_FFFF + + +class LeaseKeeper: + """ + Keeper of lease + + Attributes: + id: The current lease id. + remote: The lease RPC client. + is_cancel: Whether the keeper is canceled. + """ + + id: str + remote: LeaseStub + is_cancel: bool + + def __init__(self, remote: LeaseStub) -> None: + self.id = "" + self.remote = remote + self.is_cancel = False + + def keep(self, req: LeaseKeepAliveRequest) -> StreamStreamCall: + """Keep alive""" + + async def keep(): + while not self.is_cancel: + yield req + await asyncio.sleep(1) + + res = self.remote.LeaseKeepAlive(keep()) + return res + + def cancel(self) -> None: + """Cancel the keeper""" + self.is_cancel = True + + +class LeaseClient: + """ + Client for Lease operations. + + Attributes: + name: Name of the LeaseClient, which will be used in CURP propose id generation. + curp_client: The client running the CURP protocol, communicate with all servers. + lease_client: The lease RPC client, only communicate with one server at a time. + token: The auth token. + id_gen: Lease Id generator. + """ + + name: str + curp_client: CurpClient + lease_client: LeaseStub + token: Optional[str] + id_gen: LeaseIdGenerator + keepers: dict[str, LeaseKeeper] + + def __init__( + self, name: str, curp_client: CurpClient, channel: Channel, token: Optional[str], id_gen: LeaseIdGenerator + ) -> None: + self.name = name + self.curp_client = curp_client + self.lease_client = LeaseStub(channel=channel) + self.token = token + self.id_gen = id_gen + self.keepers = {} + + async def grant(self, req: LeaseGrantRequest) -> LeaseGrantResponse: + """ + Creates a lease which expires if the server does not receive a keepAlive + within a given time to live period. All keys attached to the lease will be expired and + deleted if the lease expires. Each expired key generates a delete event in the event history. + """ + if req.ID == 0: + req.ID = self.id_gen.next() + request_with_token = RequestWithToken(token=self.token, lease_grant_request=req) + propose_id = self.generate_propose_id() + cmd = Command( + request=request_with_token, + propose_id=propose_id, + ) + er, _ = await self.curp_client.propose(cmd, True) + return er.lease_grant_response + + async def revoke(self, req: LeaseRevokeRequest) -> LeaseRevokeResponse: + """ + Revokes a lease. All keys attached to the lease will expire and be deleted. + """ + res: LeaseRevokeResponse = await self.lease_client.LeaseRevoke(req) + return res + + async def keep_alive(self, lease_id: int): + """ + Keeps the lease alive by streaming keep alive requests from the client + to the server and streaming keep alive responses from the server to the client. + """ + keeper = LeaseKeeper(self.lease_client) + + keep_id = str(uuid.uuid4()) + self.keepers[keep_id] = keeper + + res = keeper.keep(LeaseKeepAliveRequest(ID=lease_id)) + return res, keep_id + + def cancel_keep_alive(self, keep_id: str): + """Cancel keep alive""" + if keep_id in self.keepers: + self.keepers[keep_id].cancel() + del self.keepers[keep_id] + + def time_to_live(self, req: LeaseTimeToLiveRequest) -> LeaseTimeToLiveResponse: + """ + Retrieves lease information. + """ + res: LeaseTimeToLiveResponse = self.lease_client.LeaseTimeToLive(req) + return res + + async def leases(self) -> LeaseLeasesResponse: + """ + Lists all existing leases. + """ + request_with_token = RequestWithToken(token=self.token, lease_leases_request=LeaseLeasesRequest()) + propose_id = self.generate_propose_id() + cmd = Command( + request=request_with_token, + propose_id=propose_id, + ) + er, _ = await self.curp_client.propose(cmd, True) + return er.lease_leases_response + + def generate_propose_id(self) -> str: + """Generate propose id with the given prefix.""" + propose_id = f"{self.name}-{uuid.uuid4()}" + return propose_id diff --git a/tests/lease_test.py b/tests/lease_test.py new file mode 100644 index 0000000..7f75e28 --- /dev/null +++ b/tests/lease_test.py @@ -0,0 +1,72 @@ +"""Tests for the lease client""" + +import pytest +from client import client +from api.xline.rpc_pb2 import ( + LeaseGrantRequest, + LeaseRevokeRequest, +) + + +@pytest.mark.asyncio +async def test_grant_revoke_should_success_in_normal_path(): + """ + Grant revoke should success in normal path. + """ + curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"] + cli = await client.Client.connect(curp_members) + lease_client = cli.lease_client + + res = await lease_client.grant(LeaseGrantRequest(TTL=123)) + assert res.TTL == 123 + + lease_id = res.ID + await lease_client.revoke(LeaseRevokeRequest(ID=lease_id)) + + +@pytest.mark.asyncio +async def test_keep_alive_should_success_in_normal_path(): + """ + Keep alive should success in normal path. + """ + curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"] + cli = await client.Client.connect(curp_members) + lease_client = cli.lease_client + + grant_res = await lease_client.grant(LeaseGrantRequest(TTL=60)) + lease_id = grant_res.ID + + responses, keep_id = await lease_client.keep_alive(lease_id) + + async for res in responses: + assert res.ID == lease_id + assert res.TTL == 60 + + lease_client.cancel_keep_alive(keep_id) + + await lease_client.revoke(LeaseRevokeRequest(ID=lease_id)) + + +@pytest.mark.asyncio +async def test_leases_should_include_granted_in_normal_path(): + """ + Leases should include granted in normal path. + """ + curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"] + cli = await client.Client.connect(curp_members) + lease_client = cli.lease_client + + lease1 = 100 + lease2 = 101 + lease3 = 102 + + await lease_client.grant(LeaseGrantRequest(TTL=60, ID=lease1)) + await lease_client.grant(LeaseGrantRequest(TTL=60, ID=lease2)) + await lease_client.grant(LeaseGrantRequest(TTL=60, ID=lease3)) + + res = await lease_client.leases() + assert len(res.leases) == 3 + + await lease_client.revoke(LeaseRevokeRequest(ID=lease1)) + await lease_client.revoke(LeaseRevokeRequest(ID=lease2)) + await lease_client.revoke(LeaseRevokeRequest(ID=lease3))