Skip to content

Commit

Permalink
feat: add protocol client
Browse files Browse the repository at this point in the history
Signed-off-by: LingKa <[email protected]>
  • Loading branch information
LingKa28 committed Sep 30, 2023
1 parent aa7836f commit 0b55b35
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 0 deletions.
7 changes: 7 additions & 0 deletions client/__about__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# SPDX-FileCopyrightText: 2023-present LingKa <[email protected]>
#
# SPDX-License-Identifier: Apache 2.0

"""Xline clients"""

__version__ = "0.0.1"
211 changes: 211 additions & 0 deletions client/protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
"""Protocol client"""

import asyncio
import logging
from typing import List, Optional, Tuple, Union

import grpc
from google.protobuf.internal.containers import RepeatedCompositeFieldContainer

from api.curp import curp_command_pb2, curp_error_pb2, message_pb2, message_pb2_grpc
from api.xline import xline_command_pb2, xline_error_pb2


class Client:
"""
Protocol client
Attributes:
local_server_id: local server id. Only use in an inner client.
state: state of a client
inner: inner protocol clients
connects: all servers's `Connect`
"""

local_server_id: int
inner: List[message_pb2_grpc.ProtocolStub]
connects: RepeatedCompositeFieldContainer[message_pb2.Member]

def __init__(
self,
leader_id: int,
stubs: List[message_pb2_grpc.ProtocolStub],
connects: RepeatedCompositeFieldContainer[message_pb2.Member],
) -> None:
self.local_server_id = leader_id
self.inner = stubs
self.connects = connects

def propose(
self, cmd: xline_command_pb2.Command, use_fast_path: bool = False
) -> Tuple[
Tuple[Optional[xline_command_pb2.CommandResponse], Optional[xline_command_pb2.SyncResponse]],
Union[
curp_error_pb2.ProposeError,
xline_error_pb2.ExecuteError,
xline_error_pb2.ExecuteError,
curp_error_pb2.WaitSyncError,
None,
],
]:
"""Propose the request to servers, if use_fast_path is false, it will wait for the synced index"""

if use_fast_path:
fast_round_res, fast_round_err = asyncio.run(self.fast_round(cmd))
slow_round_res, slow_round_err = asyncio.run(self.slow_round(cmd))
if fast_round_res[1]:
return ((fast_round_res[0], None), fast_round_err)
if not fast_round_res[1]:
return ((slow_round_res[1], slow_round_res[0]), slow_round_err)
else:
fast_round_res, _ = asyncio.run(self.fast_round(cmd))
slow_round_res, slow_round_err = asyncio.run(self.slow_round(cmd))
return ((slow_round_res[1], slow_round_res[0]), slow_round_err)

async def fast_round(
self, cmd: xline_command_pb2.Command
) -> Tuple[
Tuple[Optional[xline_command_pb2.CommandResponse], bool],
Optional[Union[xline_error_pb2.ExecuteError, curp_error_pb2.ProposeError]],
]:
"""
The fast round of Curp protocol
It broadcast the requests to all the curp servers.
"""

logging.info("fast round start. propose id: %s", cmd.propose_id)

ok_cnt = 0
is_received_leader_res = False
is_received_success_res = None
cmd_res = xline_command_pb2.CommandResponse()
exe_err = xline_error_pb2.ExecuteError()
propose_err = curp_error_pb2.ProposeError()

for stub in self.inner:
res = await propose_wrapper(stub, cmd)

if res.result is not None:
cmd_result = res.result
ok_cnt += 1
is_received_leader_res = True
if len(cmd_result.er) != 0:
is_received_success_res = True
cmd_res.ParseFromString(cmd_result.er)
if len(cmd_result.error) != 0:
is_received_success_res = False
exe_err.ParseFromString(cmd_result.error)
if res.error is not None:
is_received_leader_res = True
propose_err.CopyFrom(res.error)
else:
logging.warning("propose response prase fail")

if is_received_leader_res and ok_cnt >= super_quorum(len(self.connects)):
logging.info("fast round success")
if is_received_success_res:
return ((cmd_res, True), None)
if not is_received_success_res:
return ((None, False), exe_err)

return ((None, False), propose_err)

async def slow_round(
self, cmd: xline_command_pb2.Command
) -> Tuple[
Tuple[Optional[xline_command_pb2.SyncResponse], Optional[xline_command_pb2.CommandResponse]],
Optional[Union[xline_error_pb2.ExecuteError, curp_error_pb2.WaitSyncError]],
]:
"""The slow round of Curp protocol"""

logging.info("slow round start. propose id: %s", cmd.propose_id)

addr = ""
sync_res = xline_command_pb2.SyncResponse()
cmd_res = xline_command_pb2.CommandResponse()
exe_err = xline_error_pb2.ExecuteError()
after_sync_err = curp_error_pb2.WaitSyncError()

for member in self.connects:
if member.id == self.local_server_id:
addr = member.name
break

channel = grpc.insecure_channel(addr)
stub = message_pb2_grpc.ProtocolStub(channel)
res = await wait_synced_wrapper(stub, cmd)

if res.success:
success = res.success
sync_res.ParseFromString(success.after_sync_result)
cmd_res.ParseFromString(success.exe_result)
return ((sync_res, cmd_res), None)
if res.error:
cmd_sync_err = res.error
if len(cmd_sync_err.execute) != 0:
exe_err.ParseFromString(cmd_sync_err.execute)
return ((None, None), exe_err)
if len(cmd_sync_err.after_sync) != 0:
after_sync_err.ParseFromString(cmd_sync_err.after_sync)
return ((None, None), after_sync_err)

return ((None, None), None)


def build_from_addrs(addrs: List[str]) -> Client:
"""Build client from addresses, this method will fetch all members from servers"""

stubs: List[message_pb2_grpc.ProtocolStub] = []

for addr in addrs:
channel = grpc.insecure_channel(addr)
stub = message_pb2_grpc.ProtocolStub(channel)
stubs.append(stub)

cluster = fetch_cluster(stubs)

return Client(
leader_id=cluster.leader_id,
stubs=stubs,
connects=cluster.members,
)


def fetch_cluster(stubs: List[message_pb2_grpc.ProtocolStub]) -> message_pb2.FetchClusterResponse:
"""Fetch cluster from server"""
for stub in stubs:
res: message_pb2.FetchClusterResponse = stub.FetchCluster(message_pb2.FetchClusterRequest())
return res


def super_quorum(nodes: int) -> int:
"""
Get the superquorum for curp protocol
Although curp can proceed with f + 1 available replicas, it needs f + 1 + (f + 1)/2 replicas
(for superquorum of witnesses) to use 1 RTT operations. With less than superquorum replicas,
clients must ask masters to commit operations in f + 1 replicas before returning result.(2 RTTs).
"""
fault_tolerance = nodes // 2
quorum = fault_tolerance + 1
superquorum = fault_tolerance + (quorum // 2) + 1
return superquorum


async def propose_wrapper(
stub: message_pb2_grpc.ProtocolStub, req: xline_command_pb2.Command
) -> curp_command_pb2.ProposeResponse:
"""Wrapper of propose"""
res: curp_command_pb2.ProposeResponse = stub.Propose(
curp_command_pb2.ProposeRequest(command=req.SerializeToString())
)
return res


async def wait_synced_wrapper(
stub: message_pb2_grpc.ProtocolStub, req: xline_command_pb2.Command
) -> curp_command_pb2.WaitSyncedResponse:
"""Wrapper of wait sync"""
res: curp_command_pb2.WaitSyncedResponse = stub.WaitSynced(
curp_command_pb2.WaitSyncedRequest(propose_id=req.propose_id)
)
return res

0 comments on commit 0b55b35

Please sign in to comment.