diff --git a/client/__about__.py b/client/__about__.py new file mode 100644 index 0000000..b950b91 --- /dev/null +++ b/client/__about__.py @@ -0,0 +1,7 @@ +# SPDX-FileCopyrightText: 2023-present LingKa +# +# SPDX-License-Identifier: Apache 2.0 + +"""Xline clients""" + +__version__ = "0.0.1" diff --git a/client/protocol.py b/client/protocol.py new file mode 100644 index 0000000..be7c6c4 --- /dev/null +++ b/client/protocol.py @@ -0,0 +1,211 @@ +"""Protocol client""" + +import asyncio +import logging +from typing import List, Optional, Tuple, Union + +import grpc +from google.protobuf.internal.containers import RepeatedCompositeFieldContainer + +from api.curp import curp_command_pb2, curp_error_pb2, message_pb2, message_pb2_grpc +from api.xline import xline_command_pb2, xline_error_pb2 + + +class Client: + """ + Protocol client + + Attributes: + local_server_id: local server id. Only use in an inner client. + state: state of a client + inner: inner protocol clients + connects: all servers's `Connect` + """ + + local_server_id: int + inner: List[message_pb2_grpc.ProtocolStub] + connects: RepeatedCompositeFieldContainer[message_pb2.Member] + + def __init__( + self, + leader_id: int, + stubs: List[message_pb2_grpc.ProtocolStub], + connects: RepeatedCompositeFieldContainer[message_pb2.Member], + ) -> None: + self.local_server_id = leader_id + self.inner = stubs + self.connects = connects + + def propose( + self, cmd: xline_command_pb2.Command, use_fast_path: bool = False + ) -> Tuple[ + Tuple[Optional[xline_command_pb2.CommandResponse], Optional[xline_command_pb2.SyncResponse]], + Union[ + curp_error_pb2.ProposeError, + xline_error_pb2.ExecuteError, + xline_error_pb2.ExecuteError, + curp_error_pb2.WaitSyncError, + None, + ], + ]: + """Propose the request to servers, if use_fast_path is false, it will wait for the synced index""" + + if use_fast_path: + fast_round_res, fast_round_err = asyncio.run(self.fast_round(cmd)) + slow_round_res, slow_round_err = asyncio.run(self.slow_round(cmd)) + if fast_round_res[1]: + return ((fast_round_res[0], None), fast_round_err) + if not fast_round_res[1]: + return ((slow_round_res[1], slow_round_res[0]), slow_round_err) + else: + fast_round_res, _ = asyncio.run(self.fast_round(cmd)) + slow_round_res, slow_round_err = asyncio.run(self.slow_round(cmd)) + return ((slow_round_res[1], slow_round_res[0]), slow_round_err) + + async def fast_round( + self, cmd: xline_command_pb2.Command + ) -> Tuple[ + Tuple[Optional[xline_command_pb2.CommandResponse], bool], + Optional[Union[xline_error_pb2.ExecuteError, curp_error_pb2.ProposeError]], + ]: + """ + The fast round of Curp protocol + It broadcast the requests to all the curp servers. + """ + + logging.info("fast round start. propose id: %s", cmd.propose_id) + + ok_cnt = 0 + is_received_leader_res = False + is_received_success_res = None + cmd_res = xline_command_pb2.CommandResponse() + exe_err = xline_error_pb2.ExecuteError() + propose_err = curp_error_pb2.ProposeError() + + for stub in self.inner: + res = await propose_wrapper(stub, cmd) + + if res.result is not None: + cmd_result = res.result + ok_cnt += 1 + is_received_leader_res = True + if len(cmd_result.er) != 0: + is_received_success_res = True + cmd_res.ParseFromString(cmd_result.er) + if len(cmd_result.error) != 0: + is_received_success_res = False + exe_err.ParseFromString(cmd_result.error) + if res.error is not None: + is_received_leader_res = True + propose_err.CopyFrom(res.error) + else: + logging.warning("propose response prase fail") + + if is_received_leader_res and ok_cnt >= super_quorum(len(self.connects)): + logging.info("fast round success") + if is_received_success_res: + return ((cmd_res, True), None) + if not is_received_success_res: + return ((None, False), exe_err) + + return ((None, False), propose_err) + + async def slow_round( + self, cmd: xline_command_pb2.Command + ) -> Tuple[ + Tuple[Optional[xline_command_pb2.SyncResponse], Optional[xline_command_pb2.CommandResponse]], + Optional[Union[xline_error_pb2.ExecuteError, curp_error_pb2.WaitSyncError]], + ]: + """The slow round of Curp protocol""" + + logging.info("slow round start. propose id: %s", cmd.propose_id) + + addr = "" + sync_res = xline_command_pb2.SyncResponse() + cmd_res = xline_command_pb2.CommandResponse() + exe_err = xline_error_pb2.ExecuteError() + after_sync_err = curp_error_pb2.WaitSyncError() + + for member in self.connects: + if member.id == self.local_server_id: + addr = member.name + break + + channel = grpc.insecure_channel(addr) + stub = message_pb2_grpc.ProtocolStub(channel) + res = await wait_synced_wrapper(stub, cmd) + + if res.success: + success = res.success + sync_res.ParseFromString(success.after_sync_result) + cmd_res.ParseFromString(success.exe_result) + return ((sync_res, cmd_res), None) + if res.error: + cmd_sync_err = res.error + if len(cmd_sync_err.execute) != 0: + exe_err.ParseFromString(cmd_sync_err.execute) + return ((None, None), exe_err) + if len(cmd_sync_err.after_sync) != 0: + after_sync_err.ParseFromString(cmd_sync_err.after_sync) + return ((None, None), after_sync_err) + + return ((None, None), None) + + +def build_from_addrs(addrs: List[str]) -> Client: + """Build client from addresses, this method will fetch all members from servers""" + + stubs: List[message_pb2_grpc.ProtocolStub] = [] + + for addr in addrs: + channel = grpc.insecure_channel(addr) + stub = message_pb2_grpc.ProtocolStub(channel) + stubs.append(stub) + + cluster = fetch_cluster(stubs) + + return Client( + leader_id=cluster.leader_id, + stubs=stubs, + connects=cluster.members, + ) + + +def fetch_cluster(stubs: List[message_pb2_grpc.ProtocolStub]) -> message_pb2.FetchClusterResponse: + """Fetch cluster from server""" + for stub in stubs: + res: message_pb2.FetchClusterResponse = stub.FetchCluster(message_pb2.FetchClusterRequest()) + return res + + +def super_quorum(nodes: int) -> int: + """ + Get the superquorum for curp protocol + Although curp can proceed with f + 1 available replicas, it needs f + 1 + (f + 1)/2 replicas + (for superquorum of witnesses) to use 1 RTT operations. With less than superquorum replicas, + clients must ask masters to commit operations in f + 1 replicas before returning result.(2 RTTs). + """ + fault_tolerance = nodes // 2 + quorum = fault_tolerance + 1 + superquorum = fault_tolerance + (quorum // 2) + 1 + return superquorum + + +async def propose_wrapper( + stub: message_pb2_grpc.ProtocolStub, req: xline_command_pb2.Command +) -> curp_command_pb2.ProposeResponse: + """Wrapper of propose""" + res: curp_command_pb2.ProposeResponse = stub.Propose( + curp_command_pb2.ProposeRequest(command=req.SerializeToString()) + ) + return res + + +async def wait_synced_wrapper( + stub: message_pb2_grpc.ProtocolStub, req: xline_command_pb2.Command +) -> curp_command_pb2.WaitSyncedResponse: + """Wrapper of wait sync""" + res: curp_command_pb2.WaitSyncedResponse = stub.WaitSynced( + curp_command_pb2.WaitSyncedRequest(propose_id=req.propose_id) + ) + return res