Skip to content

Commit

Permalink
feat: add lease client
Browse files Browse the repository at this point in the history
Signed-off-by: LingKa <[email protected]>
  • Loading branch information
LingKa28 committed Dec 13, 2023
1 parent 4d2dede commit 55d290a
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 2 deletions.
10 changes: 8 additions & 2 deletions client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import grpc
from client.protocol import ProtocolClient
from client.kv import KvClient
from client.lease import LeaseClient, LeaseIdGenerator
from client.watch import WatchClient
from client.auth import AuthClient

Expand All @@ -14,16 +15,19 @@ class Client:
Attributes:
kv_client: Kv client
lease_client: Lease client
watch_client: Watch client
auth_client: Auth client
"""

kv_client: KvClient
lease_client: LeaseClient
watch_client: WatchClient
auth_client: AuthClient

def __init__(self, kv: KvClient, watch: WatchClient, auth: AuthClient) -> None:
def __init__(self, kv: KvClient, lease: LeaseClient, watch: WatchClient, auth: AuthClient) -> None:
self.kv_client = kv
self.lease_client = lease
self.watch_client = watch
self.auth_client = auth

Expand All @@ -36,9 +40,11 @@ async def connect(cls, addrs: list[str]) -> Client:
# TODO: Load balancing
channel = grpc.aio.insecure_channel(addrs[0])
# TODO: Acquire the auth token
id_gen = LeaseIdGenerator()

kv_client = KvClient("client", protocol_client, "")
lease_client = LeaseClient("client", protocol_client, channel, "", id_gen)
watch_client = WatchClient(channel)
auth_client = AuthClient("client", protocol_client, channel, "")

return cls(kv_client, watch_client, auth_client)
return cls(kv_client, lease_client, watch_client, auth_client)
179 changes: 179 additions & 0 deletions client/lease.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Lease Client"""

import asyncio
import random
import uuid
from typing import Optional
from grpc import Channel
from grpc.aio import StreamStreamCall
from client.protocol import ProtocolClient as CurpClient
from api.xline.xline_command_pb2 import Command, RequestWithToken
from api.xline.rpc_pb2_grpc import LeaseStub
from api.xline.rpc_pb2 import (
LeaseGrantRequest,
LeaseGrantResponse,
LeaseRevokeRequest,
LeaseRevokeResponse,
LeaseKeepAliveRequest,
LeaseTimeToLiveRequest,
LeaseTimeToLiveResponse,
LeaseLeasesRequest,
LeaseLeasesResponse,
)


class LeaseIdGenerator:
"""
Generator of unique lease id
Note that this Lease Id generation method may cause collisions,
the client should retry after informed by the server.
Attributes:
id: The current lease id.
"""

lease_id: int

def __init__(self) -> None:
self.lease_id = int.from_bytes(random.randbytes(8), "big")

def next(self) -> int:
"""Generate a new `leaseId`."""
lease_id = self.lease_id
self.lease_id += 1
if lease_id == 0:
return self.next()
return lease_id & 0x7FFF_FFFF_FFFF_FFFF


class LeaseKeeper:
"""
Keeper of lease
Attributes:
id: The current lease id.
remote: The lease RPC client.
is_cancel: Whether the keeper is canceled.
"""

id: str
remote: LeaseStub
is_cancel: bool

def __init__(self, remote: LeaseStub) -> None:
self.id = ""
self.remote = remote
self.is_cancel = False

def keep(self, req: LeaseKeepAliveRequest) -> StreamStreamCall:
"""Keep alive"""

async def keep():
while not self.is_cancel:
yield req
await asyncio.sleep(1)

res = self.remote.LeaseKeepAlive(keep())
return res

def cancel(self) -> None:
"""Cancel the keeper"""
self.is_cancel = True


class LeaseClient:
"""
Client for Lease operations.
Attributes:
name: Name of the LeaseClient, which will be used in CURP propose id generation.
curp_client: The client running the CURP protocol, communicate with all servers.
lease_client: The lease RPC client, only communicate with one server at a time.
token: The auth token.
id_gen: Lease Id generator.
"""

name: str
curp_client: CurpClient
lease_client: LeaseStub
token: Optional[str]
id_gen: LeaseIdGenerator
keepers: dict[str, LeaseKeeper]

def __init__(
self, name: str, curp_client: CurpClient, channel: Channel, token: Optional[str], id_gen: LeaseIdGenerator
) -> None:
self.name = name
self.curp_client = curp_client
self.lease_client = LeaseStub(channel=channel)
self.token = token
self.id_gen = id_gen
self.keepers = {}

async def grant(self, req: LeaseGrantRequest) -> LeaseGrantResponse:
"""
Creates a lease which expires if the server does not receive a keepAlive
within a given time to live period. All keys attached to the lease will be expired and
deleted if the lease expires. Each expired key generates a delete event in the event history.
"""
if req.ID == 0:
req.ID = self.id_gen.next()
request_with_token = RequestWithToken(token=self.token, lease_grant_request=req)
propose_id = self.generate_propose_id()
cmd = Command(
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.lease_grant_response

async def revoke(self, req: LeaseRevokeRequest) -> LeaseRevokeResponse:
"""
Revokes a lease. All keys attached to the lease will expire and be deleted.
"""
res: LeaseRevokeResponse = await self.lease_client.LeaseRevoke(req)
return res

async def keep_alive(self, lease_id: int):
"""
Keeps the lease alive by streaming keep alive requests from the client
to the server and streaming keep alive responses from the server to the client.
"""
keeper = LeaseKeeper(self.lease_client)

keep_id = str(uuid.uuid4())
self.keepers[keep_id] = keeper

res = keeper.keep(LeaseKeepAliveRequest(ID=lease_id))
return res, keep_id

def cancel_keep_alive(self, keep_id: str):
"""Cancel keep alive"""
if keep_id in self.keepers:
self.keepers[keep_id].cancel()
del self.keepers[keep_id]

def time_to_live(self, req: LeaseTimeToLiveRequest) -> LeaseTimeToLiveResponse:
"""
Retrieves lease information.
"""
res: LeaseTimeToLiveResponse = self.lease_client.LeaseTimeToLive(req)
return res

async def leases(self) -> LeaseLeasesResponse:
"""
Lists all existing leases.
"""
request_with_token = RequestWithToken(token=self.token, lease_leases_request=LeaseLeasesRequest())
propose_id = self.generate_propose_id()
cmd = Command(
request=request_with_token,
propose_id=propose_id,
)
er, _ = await self.curp_client.propose(cmd, True)
return er.lease_leases_response

def generate_propose_id(self) -> str:
"""Generate propose id with the given prefix."""
propose_id = f"{self.name}-{uuid.uuid4()}"
return propose_id
72 changes: 72 additions & 0 deletions tests/lease_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Tests for the lease client"""

import pytest
from client import client
from api.xline.rpc_pb2 import (
LeaseGrantRequest,
LeaseRevokeRequest,
)


@pytest.mark.asyncio
async def test_grant_revoke_should_success_in_normal_path():
"""
Grant revoke should success in normal path.
"""
curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"]
cli = await client.Client.connect(curp_members)
lease_client = cli.lease_client

res = await lease_client.grant(LeaseGrantRequest(TTL=123))
assert res.TTL == 123

lease_id = res.ID
await lease_client.revoke(LeaseRevokeRequest(ID=lease_id))


@pytest.mark.asyncio
async def test_keep_alive_should_success_in_normal_path():
"""
Keep alive should success in normal path.
"""
curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"]
cli = await client.Client.connect(curp_members)
lease_client = cli.lease_client

grant_res = await lease_client.grant(LeaseGrantRequest(TTL=60))
lease_id = grant_res.ID

responses, keep_id = await lease_client.keep_alive(lease_id)

async for res in responses:
assert res.ID == lease_id
assert res.TTL == 60

lease_client.cancel_keep_alive(keep_id)

await lease_client.revoke(LeaseRevokeRequest(ID=lease_id))


@pytest.mark.asyncio
async def test_leases_should_include_granted_in_normal_path():
"""
Leases should include granted in normal path.
"""
curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"]
cli = await client.Client.connect(curp_members)
lease_client = cli.lease_client

lease1 = 100
lease2 = 101
lease3 = 102

await lease_client.grant(LeaseGrantRequest(TTL=60, ID=lease1))
await lease_client.grant(LeaseGrantRequest(TTL=60, ID=lease2))
await lease_client.grant(LeaseGrantRequest(TTL=60, ID=lease3))

res = await lease_client.leases()
assert len(res.leases) == 3

await lease_client.revoke(LeaseRevokeRequest(ID=lease1))
await lease_client.revoke(LeaseRevokeRequest(ID=lease2))
await lease_client.revoke(LeaseRevokeRequest(ID=lease3))

0 comments on commit 55d290a

Please sign in to comment.