Add support for limiting Raft quorum size
Add a new configuration, `clustering.raft.max.quorum.size`, to allow
limiting the number of servers that participate in the Raft quorum.

Resolves #41.
tylertreat committed Dec 30, 2020
1 parent 16997ba commit eb76613
Showing 10 changed files with 322 additions and 87 deletions.
32 changes: 23 additions & 9 deletions documentation/concepts.md
@@ -79,6 +79,10 @@ Each partition has its own message log, leader, and set of followers. To reduce
resource consumption, partitions can be [paused](./pausing_streams.md). Paused
partitions are subsequently resumed once they are published to.

Message streams and partitions are sometimes referred to as the _data plane_.
This is in contrast to the _control plane_, which refers to the metadata
[controller](#controller).

### Write-Ahead Log

Each stream partition is backed by a durable write-ahead log. All reads and
@@ -219,6 +223,10 @@ Partitions in yellow indicate the server is the leader for the partition.

![cluster](assets/cluster.png)

Refer to [Configuring for
Scalability](./scalability_configuration.md#scaling-the-control-plane) for
details on scaling the control plane.

### In-Sync Replica Set (ISR)

The In-Sync Replica set (ISR) is a key aspect of the replication protocol in
@@ -347,19 +355,25 @@ the brokers must be running.

Controller is also referred to as "metadata leader" in some contexts. There is
only a single controller (i.e. leader) at a given time which is elected by the
Liftbridge cluster.
Liftbridge cluster. The concept of the metadata cluster is sometimes referred
to as the _control plane_. This is in contrast to the _data plane_, which
refers to actual message data, i.e. [streams and
partitions](#streams-and-partitions).

> **Architect's Note**
>
> Guidance on cluster size depends, but one important point here is that,
> currently, all servers in the cluster participate in the Raft consensus
> group. This has implications on the scalability of the cluster control plane,
> which there are [plans to address](https://github.com/liftbridge-io/liftbridge/issues/41)
> in the future.
> Guidance on cluster size is use-case specific, but it is recommended to run
> an odd number of servers in the cluster, e.g. 3 or 5, depending on scaling
> needs. Ideally, cluster members are run in different availability zones or
> racks for improved fault-tolerance.
>
> General advice is to run an odd number of servers in the cluster, e.g. 3 or
> 5, depending on scaling needs. Ideally, cluster members are run in different
> availability zones or racks for improved fault-tolerance.
> It is also important to note that, by default, all servers in the cluster
> participate in the Raft consensus group. This has implications for the
> scalability of the cluster control plane, which can be addressed by setting
> [`clustering.raft.max.quorum.size`](./configuration.md#clustering-configuration-settings)
> to limit the number of nodes that participate in the Raft group. See
> [Configuring for Scalability](./scalability_configuration.md#scaling-the-control-plane) for
> more information.
## Message Envelope

5 changes: 3 additions & 2 deletions documentation/configuration.md
@@ -194,8 +194,9 @@ the configuration file.
| raft.snapshot.retain | | The number Raft log snapshots to retain on disk. | int | 2 | |
| raft.snapshot.threshold | | Controls how many outstanding logs there must be before taking a snapshot. This prevents excessive snapshots when a small set of logs can be replayed. | int | 8192 | |
| raft.cache.size | | The number of Raft logs to hold in memory for quick lookup. | int | 512 | |
| raft.bootstrap.seed | raft-bootstrap-seed | Bootstrap the Raft cluster by electing self as leader if there is no existing state. If this is enabled, `raft.bootstrap.peers` should generally not be used, either on this node or peer nodes, since cluster topology is not being explicitly defined. Instead, peers should be started without bootstrap flags which will cause them to automatically discover the bootstrapped leader and join the cluster. | bool | false | |
| raft.bootstrap.peers | raft-bootstrap-peers | Bootstrap the Raft cluster with the provided list of peer IDs if there is no existing state. This should generally not be used in combination with `raft.bootstrap.seed` since it is explicitly defining cluster topology and the configured topology will elect a leader. Note that once the cluster is established, new nodes can join without setting bootstrap flags since they will automatically discover the elected leader and join the cluster. | list | | |
| raft.bootstrap.seed | raft-bootstrap-seed | Bootstrap the Raft cluster by electing self as leader if there is no existing state. If this is enabled, `raft.bootstrap.peers` should generally not be used, either on this node or peer nodes, since cluster topology is not being explicitly defined. Instead, peers should be started without bootstrap flags which will cause them to automatically discover the bootstrapped leader and join the cluster. This is equivalent to setting `raft.bootstrap.peers` to be just this server, and it should only be enabled on one server in the cluster. | bool | false | |
| raft.bootstrap.peers | raft-bootstrap-peers | Bootstrap the Raft cluster with the provided list of peer IDs if there is no existing state. This should generally not be used in combination with `raft.bootstrap.seed` since it is explicitly defining cluster topology and the configured topology will elect a leader. Note that once the cluster is established, new nodes can join without setting bootstrap flags since they will automatically discover the elected leader and join the cluster. If `raft.bootstrap.peers` is set on multiple servers, it is recommended to set the full list of peers on each rather than a subset to avoid potential issues when setting `raft.max.quorum.size`. | list | | |
| raft.max.quorum.size | | The maximum number of servers to participate in the Raft quorum. Any servers added to the cluster beyond this number will participate as non-voters. Non-voter servers operate as normal but are not involved in the Raft election or commitment processes. Limiting this number allows the cluster to better scale since Raft requires a minimum of `N/2+1` nodes to perform operations. This should be set to the same value on all servers in the cluster. A value of 0 indicates no limit. | int | 0 | |
| replica.max.lag.time | | If a follower hasn't sent any replication requests or hasn't caught up to the leader's log end offset for at least this time, the leader will remove the follower from ISR. | duration | 15s | |
| replica.max.leader.timeout | | If a leader hasn't sent any replication responses for at least this time, the follower will report the leader to the controller. If a majority of the replicas report the leader, a new leader is selected by the controller. | duration | 15s | |
| replica.max.idle.wait | | The maximum amount of time a follower will wait before making a replication request once the follower is caught up with the leader. This value should always be less than `replica.max.lag.time` to avoid frequent shrinking of ISR for low-throughput streams. | duration | 10s | |
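
To illustrate how the bootstrap and quorum settings above fit together, below is a minimal configuration sketch (the server IDs and peer list are placeholders, and the nested key layout follows the YAML examples used elsewhere in the Liftbridge documentation). Each server would carry the full peer list and the same quorum cap:

```yaml
clustering:
  # This server's unique ID (placeholder value).
  server.id: server-a
  # Full list of bootstrap peers, including this server; the local ID is
  # skipped during bootstrap, so listing it on every server is safe.
  raft.bootstrap.peers:
    - server-a
    - server-b
    - server-c
    - server-d
    - server-e
  # Only three of the five servers participate in the Raft quorum; the rest
  # join as non-voters. Set the same value on every server.
  raft.max.quorum.size: 3
```
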
2 changes: 1 addition & 1 deletion documentation/roadmap.md
@@ -99,7 +99,7 @@ which turns NATS into an opt-in implementation detail. Eventually, Liftbridge
can be used fully on its own without NATS by allowing clients to transition to
the Liftbridge API.

### Quorum Size Limit ([#41](https://github.com/liftbridge-io/liftbridge/issues/41))
### ~~Quorum Size Limit ([#41](https://github.com/liftbridge-io/liftbridge/issues/41))~~

Currently, all servers in a cluster participate in the metadata Raft quorum.
This severely limits scalability of the cluster. Allow having a subset of
60 changes: 60 additions & 0 deletions documentation/scalability_configuration.md
@@ -0,0 +1,60 @@
---
id: scalability-configuration
title: Configuring for Scalability
---

Liftbridge is designed to be clustered and horizontally scalable, meaning nodes
can be added to handle additional load. There are two dimensions to this
scalability: the control plane and the data plane. The _control plane_ refers
to the [metadata controller](./concepts.md#controller), which performs
operational and coordination responsibilities for the cluster. This is
implemented using the [Raft consensus algorithm](https://raft.github.io). The
_data plane_ refers to the processes around actual messages and
[streams](./concepts.md#streams-and-partitions). It is important to understand
how the scalability of these two concerns interrelates.

## Scaling the Data Plane

There are a few different ways streams can be scaled in Liftbridge. These
different approaches are discussed [here](./concepts.md#scalability). To
summarize, both stream data and consumption can be scaled horizontally by
adding additional nodes to the cluster along with additional streams or stream
partitioning. However, adding nodes to the cluster has implications for the
metadata Raft cluster used by the control plane. This is discussed below.

## Scaling the Control Plane

By default, new servers that are added to a Liftbridge cluster participate in
the Raft consensus group used by the control plane. These are referred to as
_voters_. This means they are involved in elections for the metadata leader and
committing entries to the Raft log. However, because Raft requires a minimum of
`N/2+1` nodes to perform operations, this can severely limit the scalability of
the control plane. For example, in a 100-node cluster, 51 nodes have to respond
to commit an operation. Additionally, the messaging overhead of the Raft
protocol grows with the number of servers participating in the quorum.
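
As a quick, non-normative illustration of the majority requirement (this snippet is not part of the Liftbridge codebase), the following prints how many acknowledgements Raft needs as the number of voters grows:

```go
package main

import "fmt"

func main() {
	// Raft needs a majority of voters, N/2+1, to acknowledge each operation.
	for _, n := range []int{3, 5, 9, 25, 100} {
		fmt.Printf("%3d voters -> %d acknowledgements required to commit\n", n, n/2+1)
	}
}
```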

To address this, Liftbridge has a setting to limit the number of voters that
participate in the Raft group. The [`clustering.raft.max.quorum.size`](./configuration.md#clustering-configuration-settings)
setting restricts the number of servers that participate in the Raft quorum. Any
servers added to the cluster beyond this number participate as _non-voters_.
Non-voter servers operate as normal but are not involved in the Raft election
or commitment processes. By default, `clustering.raft.max.quorum.size` is set
to `0`, which means there is no limit. Limiting this number allows the control
plane to better scale as nodes are added. This is typically not a concern for
smaller clusters, such as those with a single-digit or low-double-digit number
of nodes, but it can become an issue for clusters beyond these sizes. This
configuration should be set to
the same value on all servers in the cluster.

The configuration example below shows how to limit the Raft quorum to five
servers.

```yaml
clustering:
  raft.max.quorum.size: 5
```

Guidance on cluster and quorum size is use-case specific, but it is recommended
to specify an odd number for `clustering.raft.max.quorum.size` (or to run an
odd number of servers if not limiting quorum size), e.g. 3 or 5, depending on
scaling needs. Ideally, cluster members are run in different availability zones
or racks for improved fault-tolerance.
7 changes: 7 additions & 0 deletions server/config.go
@@ -102,6 +102,7 @@ const (
configClusteringRaftCacheSize = "clustering.raft.cache.size"
configClusteringRaftBootstrapSeed = "clustering.raft.bootstrap.seed"
configClusteringRaftBootstrapPeers = "clustering.raft.bootstrap.peers"
configClusteringRaftMaxQuorumSize = "clustering.raft.max.quorum.size"
configClusteringReplicaMaxLagTime = "clustering.replica.max.lag.time"
configClusteringReplicaMaxLeaderTimeout = "clustering.replica.max.leader.timeout"
configClusteringReplicaMaxIdleWait = "clustering.replica.max.idle.wait"
@@ -158,6 +159,7 @@ var configKeys = map[string]struct{}{
configClusteringRaftCacheSize: {},
configClusteringRaftBootstrapSeed: {},
configClusteringRaftBootstrapPeers: {},
configClusteringRaftMaxQuorumSize: {},
configClusteringReplicaMaxLagTime: {},
configClusteringReplicaMaxLeaderTimeout: {},
configClusteringReplicaMaxIdleWait: {},
@@ -284,6 +286,7 @@ type ClusteringConfig struct {
RaftCacheSize int
RaftBootstrapSeed bool
RaftBootstrapPeers []string
RaftMaxQuorumSize uint
ReplicaMaxLagTime time.Duration
ReplicaMaxLeaderTimeout time.Duration
ReplicaFetchTimeout time.Duration
@@ -700,6 +703,10 @@ func parseClusteringConfig(config *Config, v *viper.Viper) error { // nolint: go
config.Clustering.RaftBootstrapPeers = v.GetStringSlice(configClusteringRaftBootstrapPeers)
}

if v.IsSet(configClusteringRaftMaxQuorumSize) {
config.Clustering.RaftMaxQuorumSize = v.GetUint(configClusteringRaftMaxQuorumSize)
}

if v.IsSet(configClusteringReplicaMaxLagTime) {
config.Clustering.ReplicaMaxLagTime = v.GetDuration(configClusteringReplicaMaxLagTime)
}
121 changes: 99 additions & 22 deletions server/raft.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"path/filepath"
"sort"
"strconv"
"sync"
"sync/atomic"
@@ -276,6 +277,10 @@ func (s *Server) bootstrapCluster(node *raft.Raft) error {
// Bootstrap using provided cluster configuration.
s.logger.Debug("Bootstrapping metadata Raft group using provided configuration")
for _, peer := range s.config.Clustering.RaftBootstrapPeers {
if peer == s.config.Clustering.ServerID {
// Don't add ourselves twice.
continue
}
servers = append(servers, raft.Server{
ID: raft.ServerID(peer),
Address: raft.ServerAddress(peer), // NATS transport uses ID as addr.
@@ -285,6 +290,21 @@
// Bootstrap as a seed node.
s.logger.Debug("Bootstrapping metadata Raft group as seed node")
}

// Enforce quorum size limit. Any servers beyond the limit are non-voters.
maxQuorum := s.config.Clustering.RaftMaxQuorumSize
if maxQuorum == 0 || maxQuorum > uint(len(servers)) {
maxQuorum = uint(len(servers))
}
sort.SliceStable(servers, func(i, j int) bool { return servers[i].ID < servers[j].ID })
for i, server := range servers[maxQuorum:] {
servers[i+int(maxQuorum)] = raft.Server{
ID: server.ID,
Address: server.Address,
Suffrage: raft.Nonvoter,
}
}

config := raft.Configuration{Servers: servers}
return node.BootstrapCluster(config).Error()
}
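
To make the suffrage assignment above easier to follow in isolation, here is a hedged, standalone sketch using the same `hashicorp/raft` types (the peer IDs are made up and `capVoters` is an illustrative helper, not a function in the Liftbridge codebase):

```go
package main

import (
	"fmt"
	"sort"

	"github.com/hashicorp/raft"
)

// capVoters mirrors the bootstrap logic: servers are sorted by ID for a
// deterministic ordering, and any server beyond maxQuorum is demoted to a
// non-voter. A maxQuorum of zero means no limit.
func capVoters(servers []raft.Server, maxQuorum uint) []raft.Server {
	if maxQuorum == 0 || maxQuorum > uint(len(servers)) {
		maxQuorum = uint(len(servers))
	}
	sort.SliceStable(servers, func(i, j int) bool { return servers[i].ID < servers[j].ID })
	for i := int(maxQuorum); i < len(servers); i++ {
		servers[i].Suffrage = raft.Nonvoter
	}
	return servers
}

func main() {
	peers := []raft.Server{
		{ID: "server-c", Address: "server-c", Suffrage: raft.Voter},
		{ID: "server-a", Address: "server-a", Suffrage: raft.Voter},
		{ID: "server-b", Address: "server-b", Suffrage: raft.Voter},
	}
	// With a quorum cap of 2, server-a and server-b remain voters and
	// server-c becomes a non-voter.
	for _, s := range capVoters(peers, 2) {
		fmt.Printf("%s -> %v\n", s.ID, s.Suffrage)
	}
}
```
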
@@ -395,7 +415,30 @@ func (s *Server) createRaftNode() (bool, error) {

// Handle requests to join the cluster.
subj := fmt.Sprintf("%s.join", s.baseMetadataRaftSubject())
sub, err := s.ncRaft.Subscribe(subj, func(msg *nats.Msg) {
sub, err := s.ncRaft.Subscribe(subj, s.newClusterJoinRequestHandler(node))
if err != nil {
node.Shutdown()
tr.Close()
logStore.Close()
return false, err
}

s.setRaft(&raftNode{
Raft: node,
store: logStore,
transport: tr,
logInput: logWriter,
notifyCh: raftNotifyCh,
joinSub: sub,
})

return existingState, nil
}

// newClusterJoinRequestHandler creates a NATS handler for handling requests
// to join the Raft cluster.
func (s *Server) newClusterJoinRequestHandler(node *raft.Raft) func(*nats.Msg) {
return func(msg *nats.Msg) {
// Drop the request if we're not the leader. There's no race condition
// after this check because even if we proceed with the cluster add, it
// will fail if the node is not the leader as cluster changes go
@@ -409,42 +452,76 @@ func (s *Server) createRaftNode() (bool, error) {
return
}

// Add the node as a voter. This is idempotent. No-op if the request
// came from ourselves.
resp := &proto.RaftJoinResponse{}
if req.NodeID != s.config.Clustering.ServerID {
future := node.AddVoter(
raft.ServerID(req.NodeID),
raft.ServerAddress(req.NodeAddr), 0, 0)

// No-op if the request came from ourselves.
if req.NodeID == s.config.Clustering.ServerID {
r, err := proto.MarshalRaftJoinResponse(resp)
if err != nil {
panic(err)
}
msg.Respond(r)
return
}

// Add the node to the cluster with appropriate suffrage. This is
// idempotent.
isVoter, err := s.addAsVoter(node)
if err != nil {
resp.Error = err.Error()
} else {
var future raft.IndexFuture
if isVoter {
s.logger.Debugf("Adding server %s to metadata Raft group as voter", req.NodeID)
future = node.AddVoter(
raft.ServerID(req.NodeID),
raft.ServerAddress(req.NodeAddr), 0, 0)
} else {
s.logger.Debugf("Adding server %s to metadata Raft group as non-voter", req.NodeID)
future = node.AddNonvoter(
raft.ServerID(req.NodeID),
raft.ServerAddress(req.NodeAddr), 0, 0)
}
if err := future.Error(); err != nil {
resp.Error = err.Error()
}
}

if resp.Error != "" {
s.logger.Errorf("Failed to add server %s to metadata Raft group: %s", req.NodeID, resp.Error)
}

// Send the response.
r, err := proto.MarshalRaftJoinResponse(resp)
if err != nil {
panic(err)
}
msg.Respond(r)
})
if err != nil {
node.Shutdown()
tr.Close()
logStore.Close()
return false, err
}
}

s.setRaft(&raftNode{
Raft: node,
store: logStore,
transport: tr,
logInput: logWriter,
notifyCh: raftNotifyCh,
joinSub: sub,
})
// addAsVoter returns a bool indicating if a new node to be added to the
// cluster should be added as a voter or not based on current configuration. If
// we are below the max quorum size or there is no quorum limit, the new node
// will be added as a voter. Otherwise, it's added as a non-voter.
func (s *Server) addAsVoter(node *raft.Raft) (bool, error) {
maxQuorum := s.config.Clustering.RaftMaxQuorumSize
if maxQuorum == 0 {
return true, nil
}
// If there is a quorum limit, count the number of voting members.
future := node.GetConfiguration()
if err := future.Error(); err != nil {
return false, err
}
voters := uint(0)
for _, server := range future.Configuration().Servers {
if server.Suffrage == raft.Voter || server.Suffrage == raft.Staging {
voters++
}
}

return existingState, nil
return voters < maxQuorum, nil
}
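
The decision in `addAsVoter` reduces to counting the current voting members against the cap. Below is a small, self-contained sketch of that check using the same `hashicorp/raft` types (`shouldAddAsVoter` and the example configuration are fabricated for illustration, not part of the Liftbridge API):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/raft"
)

// shouldAddAsVoter reproduces the core check: a joining node becomes a voter
// only while the number of voting (or staging) members is below the quorum
// cap; a cap of zero means there is no limit.
func shouldAddAsVoter(cfg raft.Configuration, maxQuorum uint) bool {
	if maxQuorum == 0 {
		return true
	}
	voters := uint(0)
	for _, server := range cfg.Servers {
		if server.Suffrage == raft.Voter || server.Suffrage == raft.Staging {
			voters++
		}
	}
	return voters < maxQuorum
}

func main() {
	cfg := raft.Configuration{Servers: []raft.Server{
		{ID: "a", Suffrage: raft.Voter},
		{ID: "b", Suffrage: raft.Voter},
		{ID: "c", Suffrage: raft.Voter},
	}}
	fmt.Println(shouldAddAsVoter(cfg, 3)) // false: the quorum is already full
	fmt.Println(shouldAddAsVoter(cfg, 5)) // true: room for two more voters
	fmt.Println(shouldAddAsVoter(cfg, 0)) // true: no limit configured
}
```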

// baseMetadataRaftSubject returns the base NATS subject used for Raft-related
11 changes: 0 additions & 11 deletions server/server.go
@@ -113,17 +113,6 @@ func (s *Server) Start() (err error) {

rand.Seed(time.Now().UnixNano())

// Remove server's ID from the cluster peers list if present.
if len(s.config.Clustering.RaftBootstrapPeers) > 0 {
peers := make([]string, 0, len(s.config.Clustering.RaftBootstrapPeers))
for _, peer := range s.config.Clustering.RaftBootstrapPeers {
if peer != s.config.Clustering.ServerID {
peers = append(peers, peer)
}
}
s.config.Clustering.RaftBootstrapPeers = peers
}

// Create the data directory if it doesn't exist.
if err := os.MkdirAll(s.config.DataDir, os.ModePerm); err != nil {
return errors.Wrap(err, "failed to create data path directories")