diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0ca2e92..8597b26 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,6 +7,57 @@ on: branches: [main] jobs: + lint: + name: Check Lint & Format + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Install Pip + run: sudo apt install pip + + - name: Install Hatch + run: pip install hatch + + - name: Run lint & fmt + run: hatch run lint:all ./client + + test: + name: Test Validation + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install Pip + run: sudo apt install pip + + - name: Generate API + uses: ./.github/workflows/protobuf + + - name: Install Hatch + run: pip install hatch + + - name: Start the cluster + run: ./scripts/quick_start.sh + + - name: Run test + run: hatch run cov + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: ./coverage.xml + fail_ci_if_error: true + verbose: true + commit: name: Commit Message Validation runs-on: ubuntu-latest diff --git a/.github/workflows/protobuf/action.yaml b/.github/workflows/protobuf/action.yaml new file mode 100644 index 0000000..f624b22 --- /dev/null +++ b/.github/workflows/protobuf/action.yaml @@ -0,0 +1,19 @@ +name: Generate API + +runs: + using: "composite" + steps: + - name: Install gRPC & gRPC tools + run: python3 -m pip install grpcio grpcio-tools + shell: bash + + - name: Initialize Git Submodules + run: git submodule init + shell: bash + - name: Update Git Submodules + run: git submodule update + shell: bash + + - name: Generate api + run: make + shell: bash diff --git a/client/__about__.py b/client/__about__.py new file mode 100644 index 0000000..b950b91 --- /dev/null +++ b/client/__about__.py @@ -0,0 +1,7 @@ +# SPDX-FileCopyrightText: 2023-present LingKa +# +# SPDX-License-Identifier: Apache 2.0 + +"""Xline clients""" + +__version__ = "0.0.1" diff --git a/client/error.py b/client/error.py new file mode 100644 index 0000000..c0ac471 --- /dev/null +++ b/client/error.py @@ -0,0 +1,52 @@ +""" +Client Errors +""" + +from api.curp.curp_error_pb2 import ( + ProposeError as _ProposeError, + CommandSyncError as _CommandSyncError, + WaitSyncError as _WaitSyncError, +) +from api.xline.xline_error_pb2 import ExecuteError as _ExecuteError + + +class ResDecodeError(Exception): + """Response decode error""" + + pass + + +class ProposeError(BaseException): + """Propose error""" + + inner: _ProposeError + + def __init__(self, err: _ProposeError) -> None: + self.inner = err + + +class CommandSyncError(BaseException): + """Command sync error""" + + inner: _CommandSyncError + + def __init__(self, err: _CommandSyncError) -> None: + self.inner = err + + +class WaitSyncError(BaseException): + """Wait sync error""" + + inner: _WaitSyncError + + def __init__(self, err: _WaitSyncError) -> None: + self.inner = err + + +class ExecuteError(BaseException): + """Execute error""" + + inner: _ExecuteError + + def __init__(self, err: _ExecuteError) -> None: + self.inner = err diff --git a/client/protocol.py b/client/protocol.py new file mode 100644 index 0000000..f431eb5 --- /dev/null +++ b/client/protocol.py @@ -0,0 +1,195 @@ +""" +Protocol Client +""" + +from __future__ import annotations +import asyncio +import logging +import grpc + +from api.curp.message_pb2_grpc import ProtocolStub +from api.curp.message_pb2 import FetchClusterRequest, FetchClusterResponse +from api.curp.curp_command_pb2 import ProposeRequest, WaitSyncedRequest +from api.xline.xline_command_pb2 import Command, CommandResponse, SyncResponse +from client.error import ResDecodeError, CommandSyncError, WaitSyncError, ExecuteError +from api.curp.curp_error_pb2 import ( + CommandSyncError as _CommandSyncError, + WaitSyncError as _WaitSyncError, +) +from api.xline.xline_error_pb2 import ExecuteError as _ExecuteError + + +class ProtocolClient: + """ + Protocol client + + Attributes: + leader_id: cluster `leader id` + connects: `all servers's `Connect` + """ + + leader_id: int + connects: dict[int, grpc.Channel] + + def __init__( + self, + leader_id: int, + connects: dict[int, grpc.Channel], + ) -> None: + self.leader_id = leader_id + self.connects = connects + + @classmethod + async def build_from_addrs(cls, addrs: list[str]) -> ProtocolClient: + """ + Build client from addresses, this method will fetch all members from servers + """ + cluster = await cls.fast_fetch_cluster(addrs) + + connects = {} + for member in cluster.members: + channel = grpc.aio.insecure_channel(member.name) + connects[member.id] = channel + + return cls( + cluster.leader_id, + connects, + ) + + @staticmethod + async def fast_fetch_cluster(addrs: list[str]) -> FetchClusterResponse: + """ + Fetch cluster from server, return the first `FetchClusterResponse + """ + futures = [] + for addr in addrs: + channel = grpc.aio.insecure_channel(addr) + stub = ProtocolStub(channel) + futures.append(stub.FetchCluster(FetchClusterRequest())) + for t in asyncio.as_completed(futures): + return await t + + msg = "fetch cluster error" + raise Exception(msg) + + async def propose(self, cmd: Command, use_fast_path: bool = False) -> tuple[CommandResponse, SyncResponse | None]: + """ + Propose the request to servers, if use_fast_path is false, it will wait for the synced index + """ + if use_fast_path: + return await self.fast_path(cmd) + else: + return await self.slow_path(cmd) + + async def fast_path(self, cmd: Command) -> tuple[CommandResponse, SyncResponse | None]: + """ + Fast path of propose + """ + for futures in asyncio.as_completed([self.fast_round(cmd), self.slow_round(cmd)]): + first, second = await futures + if isinstance(first, CommandResponse) and isinstance(second, bool): + return (first, None) + if isinstance(second, CommandResponse) and isinstance(first, SyncResponse): + return (second, first) + + msg = "fast path error" + raise Exception(msg) + + async def slow_path(self, cmd: Command) -> tuple[CommandResponse, SyncResponse]: + """ + Slow path of propose + """ + results = await asyncio.gather(self.fast_round(cmd), self.slow_round(cmd)) + for result in results: + if isinstance(result[0], SyncResponse) and isinstance(result[1], CommandResponse): + return (result[1], result[0]) + + msg = "slow path error" + raise Exception(msg) + + async def fast_round(self, cmd: Command) -> tuple[CommandResponse | None, bool]: + """ + The fast round of Curp protocol + It broadcast the requests to all the curp servers. + """ + logging.info("fast round start. propose id: %s", cmd.propose_id) + + ok_cnt = 0 + is_received_leader_res = False + cmd_res = CommandResponse() + exe_err = ExecuteError(_ExecuteError()) + + futures = [] + for server_id in self.connects: + stub = ProtocolStub(self.connects[server_id]) + futures.append(stub.Propose(ProposeRequest(command=cmd.SerializeToString()))) + + for future in asyncio.as_completed(futures): + res = await future + + if res.HasField("result"): + cmd_result = res.result + ok_cnt += 1 + is_received_leader_res = True + if cmd_result.HasField("er"): + cmd_res.ParseFromString(cmd_result.er) + if cmd_result.HasField("error"): + exe_err.inner.ParseFromString(cmd_result.error) + raise exe_err + elif res.HasField("error"): + raise res.error + else: + ok_cnt += 1 + + if is_received_leader_res and ok_cnt >= self.super_quorum(len(self.connects)): + logging.info("fast round succeed. propose id: %s", cmd.propose_id) + return (cmd_res, True) + + logging.info("fast round failed. propose id: %s", cmd.propose_id) + return (cmd_res, False) + + async def slow_round(self, cmd: Command) -> tuple[SyncResponse, CommandResponse]: + """ + The slow round of Curp protocol + """ + logging.info("slow round start. propose id: %s", cmd.propose_id) + + sync_res = SyncResponse() + cmd_res = CommandResponse() + exe_err = CommandSyncError(_CommandSyncError()) + after_sync_err = WaitSyncError(_WaitSyncError()) + + channel = self.connects[self.leader_id] + stub = ProtocolStub(channel) + res = await stub.WaitSynced(WaitSyncedRequest(propose_id=cmd.propose_id)) + + if res.HasField("success"): + success = res.success + sync_res.ParseFromString(success.after_sync_result) + cmd_res.ParseFromString(success.exe_result) + logging.info("slow round succeed. propose id: %s", cmd.propose_id) + return (sync_res, cmd_res) + if res.HasField("error"): + cmd_sync_err = res.error + if cmd_sync_err.HasField("execute"): + exe_err.inner.ParseFromString(cmd_sync_err.execute) + raise exe_err + if cmd_sync_err.HasField("after_sync"): + after_sync_err.inner.ParseFromString(cmd_sync_err.after_sync) + raise after_sync_err + + err_msg = "Response decode error" + raise ResDecodeError(err_msg) + + @staticmethod + def super_quorum(nodes: int) -> int: + """ + Get the superquorum for curp protocol + Although curp can proceed with f + 1 available replicas, it needs f + 1 + (f + 1)/2 replicas + (for superquorum of witnesses) to use 1 RTT operations. With less than superquorum replicas, + clients must ask masters to commit operations in f + 1 replicas before returning result.(2 RTTs). + """ + fault_tolerance = nodes // 2 + quorum = fault_tolerance + 1 + superquorum = fault_tolerance + (quorum // 2) + 1 + return superquorum diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..6425201 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,5 @@ +# Global options: + +[mypy] +ignore_missing_imports = True +disable_error_code = var-annotated diff --git a/pyproject.toml b/pyproject.toml index 7119a2a..c5035e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ dynamic = ["version"] description = 'py-xline is an official xline client sdk, written in Python.' readme = "README.md" requires-python = ">=3.7" -license = "Apache 2.0" +license = "Apache-2.0" keywords = [] authors = [ { name = "LingKa", email = "cnfty786@gmail.com" }, @@ -24,7 +24,11 @@ classifiers = [ "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", ] -dependencies = [] +dependencies = [ + "grpcio", + "grpcio-tools", + "pytest-asyncio", +] [project.urls] Documentation = "https://github.com/xline-kv/py-xline#readme" @@ -32,7 +36,7 @@ Issues = "https://github.com/xline-kv/py-xline/issues" Source = "https://github.com/xline-kv/py-xline" [tool.hatch.version] -path = "src/py_xline/__about__.py" +path = "client/__about__.py" [tool.hatch.envs.default] dependencies = [ @@ -63,7 +67,7 @@ dependencies = [ "ruff>=0.0.243", ] [tool.hatch.envs.lint.scripts] -typing = "mypy --install-types --non-interactive {args:src/py_xline tests}" +typing = "mypy --install-types --non-interactive {args:client tests}" style = [ "ruff {args:.}", "black --check --diff {args:.}", @@ -117,11 +121,15 @@ ignore = [ # Allow non-abstract empty methods in abstract base classes "B027", # Allow boolean positional values in function calls, like `dict.get(... True)` - "FBT003", + "FBT001", "FBT002", "FBT003", # Ignore checks for possible passwords "S105", "S106", "S107", # Ignore complexity "C901", "PLR0911", "PLR0912", "PLR0913", "PLR0915", + # Ignore import sort + "I001", + # Allow verable shadow + "A003", ] unfixable = [ # Don't touch unused imports @@ -139,16 +147,16 @@ ban-relative-imports = "all" "tests/**/*" = ["PLR2004", "S101", "TID252"] [tool.coverage.run] -source_pkgs = ["py_xline", "tests"] +source_pkgs = ["client", "tests"] branch = true parallel = true omit = [ - "src/py_xline/__about__.py", + "client/__about__.py", ] [tool.coverage.paths] -py_xline = ["src/py_xline", "*/py-xline/src/py_xline"] -tests = ["tests", "*/py-xline/tests"] +py_xline = ["client"] +tests = ["tests"] [tool.coverage.report] exclude_lines = [ diff --git a/scripts/quick_start.sh b/scripts/quick_start.sh index f522f54..57adf88 100755 --- a/scripts/quick_start.sh +++ b/scripts/quick_start.sh @@ -55,7 +55,7 @@ stop_all() { run_container() { echo container starting size=${1} - image="ghcr.io/xline-kv/xline:latest" + image="ghcr.io/xline-kv/xline:b573f16" for ((i = 1; i <= ${size}; i++)); do docker run -d -it --rm --name=node${i} --net=xline_net --ip=${SERVERS[$i]} --cap-add=NET_ADMIN --cpu-shares=1024 -m=512M -v ${DIR}:/mnt ${image} bash & done diff --git a/tests/__init__.py b/tests/__init__.py index 91a12bb..69fc850 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,3 +1,9 @@ # SPDX-FileCopyrightText: 2023-present LingKa # # SPDX-License-Identifier: Apache 2.0 + +import sys + +sys.path.append("./api/curp") + +sys.path.append("./api/xline") diff --git a/tests/protocol_test.py b/tests/protocol_test.py new file mode 100755 index 0000000..ae85253 --- /dev/null +++ b/tests/protocol_test.py @@ -0,0 +1,51 @@ +"""Tests for the protocol client.""" + +import uuid +import pytest + +from api.xline.xline_command_pb2 import Command, RequestWithToken +from api.xline.rpc_pb2 import PutRequest +from client.protocol import ProtocolClient + + +@pytest.mark.asyncio +async def test_propose_fast_path(): + """ + test propose fast path + """ + curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"] + client = await ProtocolClient.build_from_addrs(curp_members) + cmd = Command( + request=RequestWithToken( + put_request=PutRequest( + key=b"hello", + value=b"py-xline", + ) + ), + propose_id=f"client-{uuid.uuid4()}", + ) + + er, _ = await client.propose(cmd, True) + assert er.put_response is not None + + +@pytest.mark.asyncio +async def test_propose_slow_path(): + """ + test propose slow path + """ + curp_members = ["172.20.0.3:2379", "172.20.0.4:2379", "172.20.0.5:2379"] + client = await ProtocolClient.build_from_addrs(curp_members) + cmd = Command( + request=RequestWithToken( + put_request=PutRequest( + key=b"hello1", + value=b"py-xline1", + ) + ), + propose_id=f"client-{uuid.uuid4()}", + ) + + er, asr = await client.propose(cmd, False) + assert asr is not None + assert er.put_response is not None