From 92420d011d966204139b8450e6d126f8cba59ad7 Mon Sep 17 00:00:00 2001 From: LingKa Date: Tue, 5 Dec 2023 02:09:01 +0800 Subject: [PATCH] feat: add kv client Signed-off-by: LingKa --- .github/workflows/ci.yml | 3 + client/client.py | 30 ++++++ client/kv.py | 189 ++++++++++++++++++++++++++++++++++++++ client/protocol.py | 4 +- client/txn.py | 88 ++++++++++++++++++ tests/kv_test.py | 193 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 505 insertions(+), 2 deletions(-) create mode 100644 client/client.py create mode 100644 client/kv.py create mode 100644 client/txn.py create mode 100644 tests/kv_test.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8597b26..d0a0c40 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,6 +19,9 @@ jobs: - name: Install Hatch run: pip install hatch + + - name: Generate API + uses: ./.github/workflows/protobuf - name: Run lint & fmt run: hatch run lint:all ./client diff --git a/client/client.py b/client/client.py new file mode 100644 index 0000000..2203cc9 --- /dev/null +++ b/client/client.py @@ -0,0 +1,30 @@ +"""Xline Client""" + +from __future__ import annotations +from client.protocol import ProtocolClient +from client.kv import KvClient + + +class Client: + """ + Xline client + + Attributes: + kv: Kv client + """ + + kv_client: KvClient + + def __init__(self, kv: KvClient) -> None: + self.kv_client = kv + + @classmethod + async def connect(cls, addrs: list[str]) -> Client: + """ + New `Client` + """ + protocol_client = await ProtocolClient.build_from_addrs(addrs) + + kv_client = KvClient("client", protocol_client, "") + + return cls(kv_client) diff --git a/client/kv.py b/client/kv.py new file mode 100644 index 0000000..d430fd9 --- /dev/null +++ b/client/kv.py @@ -0,0 +1,189 @@ +"""Kv Client""" + +import uuid +from typing import Optional, Literal +from client.protocol import ProtocolClient as CurpClient +from client.txn import Txn +from api.xline.xline_command_pb2 import Command, RequestWithToken, KeyRange +from api.xline.rpc_pb2 import ( + RangeRequest, + RangeResponse as _RangeResponse, + PutRequest, + PutResponse as _PutResponse, + DeleteRangeRequest, + DeleteRangeResponse as _DeleteRangeResponse, + CompactionRequest, + CompactionResponse as _CompactionResponse, +) + +RangeResponse = _RangeResponse +PutResponse = _PutResponse +DeleteRangeResponse = _DeleteRangeResponse +CompactionResponse = _CompactionResponse + + +class KvClient: + """ + Client for KV operations. + + Attributes: + name: Name of the kv client, which will be used in CURP propose id generation. + curp_client: The client running the CURP protocol, communicate with all servers. + token: The auth token. + """ + + name: str + curp_client: CurpClient + token: Optional[str] + + def __init__(self, name: str, curp_client: CurpClient, token: Optional[str]) -> None: + self.name = name + self.curp_client = curp_client + self.token = token + + async def range( + self, + key: bytes, + range_end: bytes | None = None, + limit: int | None = None, + revision: int | None = None, + sort_order: Literal["none", "ascend", "descend"] | None = None, + sort_target: Literal["key", "version", "create", "mod", "value"] | None = None, + serializable: bool = False, + keys_only: bool = False, + count_only: bool = False, + min_mod_revision: int | None = None, + max_mod_revision: int | None = None, + min_create_revision: int | None = None, + max_create_revision: int | None = None, + ) -> RangeResponse: + """ + Get a range of keys from the store. + """ + req = RangeRequest( + key=key, + range_end=range_end, + limit=limit, + revision=revision, + sort_order=sort_order, + sort_target=sort_target, + serializable=serializable, + keys_only=keys_only, + count_only=count_only, + min_mod_revision=min_mod_revision, + max_mod_revision=max_mod_revision, + min_create_revision=min_create_revision, + max_create_revision=max_create_revision, + ) + key_ranges = [KeyRange(key=key, range_end=range_end)] + propose_id = generate_propose_id(self.name) + request_with_token = RequestWithToken( + range_request=req, + token=self.token, + ) + cmd = Command( + keys=key_ranges, + request=request_with_token, + propose_id=propose_id, + ) + er, _ = await self.curp_client.propose(cmd, True) + return er.range_response + + async def put( + self, + key: bytes, + value: bytes, + lease: int | None = None, + prev_kv: bool = False, + ignore_value: bool = False, + ignore_lease: bool = False, + ) -> PutResponse: + """ + Put a key-value into the store. + """ + req = PutRequest( + key=key, value=value, lease=lease, prev_kv=prev_kv, ignore_value=ignore_value, ignore_lease=ignore_lease + ) + key_ranges = [KeyRange(key=key, range_end=key)] + propose_id = generate_propose_id(self.name) + request_with_token = RequestWithToken( + put_request=req, + token=self.token, + ) + cmd = Command( + keys=key_ranges, + request=request_with_token, + propose_id=propose_id, + ) + er, _ = await self.curp_client.propose(cmd, True) + return er.put_response + + async def delete(self, key: bytes, range_end: bytes | None = None, prev_kv: bool = False) -> DeleteRangeResponse: + """ + Delete a range of keys from the store. + """ + req = DeleteRangeRequest( + key=key, + range_end=range_end, + prev_kv=prev_kv, + ) + key_ranges = [KeyRange(key=key, range_end=range_end)] + propose_id = generate_propose_id(self.name) + + request_with_token = RequestWithToken( + delete_range_request=req, + token=self.token, + ) + cmd = Command( + keys=key_ranges, + request=request_with_token, + propose_id=propose_id, + ) + er, _ = await self.curp_client.propose(cmd, True) + return er.delete_range_response + + def txn(self) -> Txn: + """ + Creates a transaction, which can provide serializable writes. + """ + + return Txn( + self.name, + self.curp_client, + self.token, + ) + + async def compact(self, revision: int, physical: bool = False) -> CompactionResponse: + """ + Compacts the key-value store up to a given revision. + """ + req = CompactionRequest( + revision=revision, + physical=physical, + ) + use_fast_path = physical + propose_id = generate_propose_id(self.name) + request_with_token = RequestWithToken( + compaction_request=req, + token=self.token, + ) + cmd = Command( + request=request_with_token, + propose_id=propose_id, + ) + + if use_fast_path: + er, asr = await self.curp_client.propose(cmd, True) + return er.compaction_response + else: + er, asr = await self.curp_client.propose(cmd, False) + if asr is None: + msg = "sync_res is always Some when use_fast_path is false" + raise Exception(msg) + return er.compaction_response + + +def generate_propose_id(prefix: str) -> str: + """Generate propose id with the given prefix""" + propose_id = f"{prefix}-{uuid.uuid4()}" + return propose_id diff --git a/client/protocol.py b/client/protocol.py index f431eb5..06bc4b8 100644 --- a/client/protocol.py +++ b/client/protocol.py @@ -87,7 +87,7 @@ async def fast_path(self, cmd: Command) -> tuple[CommandResponse, SyncResponse | """ for futures in asyncio.as_completed([self.fast_round(cmd), self.slow_round(cmd)]): first, second = await futures - if isinstance(first, CommandResponse) and isinstance(second, bool): + if isinstance(first, CommandResponse) and second: return (first, None) if isinstance(second, CommandResponse) and isinstance(first, SyncResponse): return (second, first) @@ -137,7 +137,7 @@ async def fast_round(self, cmd: Command) -> tuple[CommandResponse | None, bool]: exe_err.inner.ParseFromString(cmd_result.error) raise exe_err elif res.HasField("error"): - raise res.error + logging.info(res.error) else: ok_cnt += 1 diff --git a/client/txn.py b/client/txn.py new file mode 100644 index 0000000..28bd30d --- /dev/null +++ b/client/txn.py @@ -0,0 +1,88 @@ +"Transaction" + +import uuid +from typing import List, Optional +from client.client import ProtocolClient as CurpClient +from api.xline.xline_command_pb2 import Command, RequestWithToken, KeyRange +from api.xline.rpc_pb2 import ( + RangeRequest as _RangeRequest, + PutRequest as _PutRequest, + DeleteRangeRequest as _DeleteRangeRequest, + TxnRequest as _TxnRequest, + Compare as _Compare, + RequestOp as _RequestOp, + TxnResponse as _TxnResponse, +) + +RangeRequest = _RangeRequest +PutRequest = _PutRequest +DeleteRangeRequest = _DeleteRangeRequest +TxnRequest = _TxnRequest +Compare = _Compare +RequestOp = _RequestOp +TxnResponse = _TxnResponse + + +class Txn: + """ + Transaction. + + Attributes: + name: Name of the Transaction, which will be used in CURP propose id generation. + curp_client: The client running the CURP protocol, communicate with all servers. + token: The auth token. + """ + + name: str + curp_client: CurpClient + token: Optional[str] + + cmps: List[Compare] + sus: List[RequestOp] + fas: List[RequestOp] + + def __init__(self, name: str, curp_client: CurpClient, token: Optional[str]) -> None: + self.name = name + self.curp_client = curp_client + self.token = token + + def when(self, cmps: List[Compare]): + "compare" + self.cmps = cmps + return self + + def and_then(self, op: List[RequestOp]): + "true" + self.sus = op + return self + + def or_else(self, op: List[RequestOp]): + "false" + self.fas = op + return self + + async def commit(self) -> TxnResponse: + "commit" + # TODO: https://github.com/xline-kv/Xline/issues/470 + krs = [] + for cmp in self.cmps: + krs.append(KeyRange(key=cmp.key, range_end=cmp.range_end)) + propose_id = self.generate_propose_id(self.name) + r = TxnRequest(compare=self.cmps, success=self.sus, failure=self.fas) + req = RequestWithToken( + txn_request=r, + token=self.token, + ) + cmd = Command( + keys=krs, + request=req, + propose_id=propose_id, + ) + er, _ = await self.curp_client.propose(cmd, False) + return er.txn_response + + @staticmethod + def generate_propose_id(prefix: str) -> str: + """Generate propose id with the given prefix""" + propose_id = f"{prefix}-{uuid.uuid4()}" + return propose_id diff --git a/tests/kv_test.py b/tests/kv_test.py new file mode 100644 index 0000000..d61ccd6 --- /dev/null +++ b/tests/kv_test.py @@ -0,0 +1,193 @@ +"""Tests for the kv client""" + +import pytest +from client import client, kv, txn + + +@pytest.mark.asyncio +async def test_put_should_success_in_normal_path(): + """ + test put should success in normal path + """ + curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"] + cli = await client.Client.connect(curp_members) + kv_client = cli.kv_client + + await kv_client.put(b"put", b"123") + + # overwrite with prev key + res = await kv_client.put(b"put", b"456", prev_kv=True) + prev_kv = res.prev_kv + assert prev_kv is not None + assert prev_kv.key == b"put" + assert prev_kv.value == b"123" + + # overwrite again with prev key + res = await kv_client.put(b"put", b"456", prev_kv=True) + prev_kv = res.prev_kv + assert prev_kv is not None + assert prev_kv.key == b"put" + assert prev_kv.value == b"456" + + +@pytest.mark.asyncio +async def test_range_should_fetches_previously_put_keys(): + """ + test_range_should_fetches_previously_put_keys + """ + curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"] + cli = await client.Client.connect(curp_members) + kv_client = cli.kv_client + + await kv_client.put(b"get10", b"10") + await kv_client.put(b"get11", b"11") + await kv_client.put(b"get20", b"20") + await kv_client.put(b"get21", b"21") + + # get key + res = await kv_client.range(b"get11") + assert res.count == 1 + assert not res.more + assert len(res.kvs) == 1 + assert res.kvs[0].key == b"get11" + assert res.kvs[0].value == b"11" + + # get from key + res = await kv_client.range(b"get11", range_end=b"\0", limit=2) + assert res.more + assert len(res.kvs) == 2 + assert res.kvs[0].key == b"get11" + assert res.kvs[0].value == b"11" + assert res.kvs[1].key == b"get20" + assert res.kvs[1].value == b"20" + + +@pytest.mark.asyncio +async def test_delete_should_remove_previously_put_kvs(): + """ + test_delete_should_remove_previously_put_kvs + """ + curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"] + cli = await client.Client.connect(curp_members) + kv_client = cli.kv_client + + await kv_client.put(b"del10", b"10") + await kv_client.put(b"del11", b"11") + await kv_client.put(b"del20", b"20") + await kv_client.put(b"del21", b"21") + await kv_client.put(b"del31", b"31") + await kv_client.put(b"del32", b"32") + + # delete key + del_res = await kv_client.delete(b"del11", prev_kv=True) + assert del_res.deleted == 1 + assert del_res.prev_kvs[0].key == b"del11" + assert del_res.prev_kvs[0].value == b"11" + + range_res = await kv_client.range(b"del11", count_only=True) + assert range_res.count == 0 + + # delete a range of keys + del_res = await kv_client.delete(b"del11", b"del22", prev_kv=True) + assert del_res.deleted == 2 + assert del_res.prev_kvs[0].key == b"del20" + assert del_res.prev_kvs[0].value == b"20" + assert del_res.prev_kvs[1].key == b"del21" + assert del_res.prev_kvs[1].value == b"21" + + range_res = await kv_client.range(b"del11", range_end=b"del22", count_only=True) + assert range_res.count == 0 + + +@pytest.mark.asyncio +async def test_compact_should_remove_previous_revision(): + """ + test_compact_should_remove_previous_revision + """ + curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"] + cli = await client.Client.connect(curp_members) + kv_client = cli.kv_client + + await kv_client.put(b"compact", b"0") + put_res = await kv_client.put(b"compact", b"1") + rev = put_res.header.revision + + # before compacting + rev0_res = await kv_client.range(b"compact", revision=rev - 1) + assert rev0_res.kvs[0].value == b"0" + + rev1_res = await kv_client.range(b"compact", revision=rev) + assert rev1_res.kvs[0].value == b"1" + + await kv_client.compact(rev) + + # after compacting + try: + await kv_client.range(b"compact", revision=rev - 1) + except BaseException as e: + assert e is not None + + range_res = await kv_client.range(b"compact", revision=rev) + assert range_res.kvs[0].value == b"1" + + +@pytest.mark.asyncio +async def test_txn_should_execute_as_expected(): + """ + test_txn_should_execute_as_expected + """ + curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"] + cli = await client.Client.connect(curp_members) + kv_client = cli.kv_client + + await kv_client.put(b"txn01", b"01") + + # transaction 1 + txn_res = await ( + kv_client.txn() + .when( + [ + txn.Compare( + key=b"txn01", + value=b"01", + target=txn.Compare.CompareTarget.VALUE, + result=txn.Compare.CompareResult.EQUAL, + ) + ] + ) + .and_then([txn.RequestOp(request_put=txn.PutRequest(key=b"txn01", value=b"02", prev_kv=True))]) + .or_else([txn.RequestOp(request_range=txn.RangeRequest(key=b"txn01"))]) + .commit() + ) + + assert txn_res.succeeded + op_res = txn_res.responses + assert len(op_res) == 1 + assert isinstance(op_res[0].response_put, kv.PutResponse) + put_res = op_res[0].response_put + assert put_res.prev_kv.value == b"01" + + # transaction 2 + txn_res = await ( + kv_client.txn() + .when( + [ + txn.Compare( + key=b"txn01", + value=b"01", + target=txn.Compare.CompareTarget.VALUE, + result=txn.Compare.CompareResult.EQUAL, + ) + ] + ) + .and_then([txn.RequestOp(request_put=txn.PutRequest(key=b"txn01", value=b"02"))]) + .or_else([txn.RequestOp(request_range=txn.RangeRequest(key=b"txn01"))]) + .commit() + ) + + assert not txn_res.succeeded + op_res = txn_res.responses + assert len(op_res) == 1 + assert isinstance(op_res[0].response_range, kv.RangeResponse) + range_res = op_res[0].response_range + assert range_res.kvs[0].value == b"02"