RAFT PART 3: Client KV Store, Persistence

CLIENT KV STORE

The Raft consensus algorithm ensures that multiple replicated state machines all take the same sequence of inputs from client requests and, after performing the same state transitions, return the same results to the clients, even if some replicas fail. In this project, the deterministic state machine replicated across the servers is a Key-Value store: every state machine is a replica of the same Key-Value store service.

The Key-Value store in this project is a hashmap that uses linked lists to handle collisions. The structure of this ‘store’ is:


type store struct {
  db       [length]*Linkedlist // the buckets of the hashmap; collisions are chained in linked lists
  mu       sync.RWMutex        // guards concurrent access to the store
  filename string              // file into which the store's contents are persisted
  db_temp  map[string]string   // plain-map mirror of the contents, used for gob encoding
}

The ‘db’ element is the actual database: an array with the capacity to hold ‘length’ linked lists. In this project, ‘length’ is arbitrarily set to 101, so each instance of the key-value store holds up to 101 linked lists (buckets).
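
A key is mapped to one of these 101 buckets by hashing it. The project’s actual hash function is not shown on this page; a typical scheme would look like this (an illustrative sketch using FNV-1a, not necessarily what the project uses):

import "hash/fnv"

const length = 101 // number of buckets in the hashmap

// hashIndex maps a key to a bucket index; illustrative only.
func hashIndex(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % length
}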

The ‘mu’ mutex is needed to keep the data consistent under concurrent access. The ‘filename’ field names the file into which the contents of the key-value store are regularly written, and from which the old values are restored when the store is initialized. The ‘db_temp’ map mirrors the store’s contents as a plain map so that they can be gob-encoded for persistence (see writeFile below).

The Client Key-Value store has been implemented as a REST-accessible Key-Value store: clients read and modify the data through HTTP requests. The supported methods are listed below, followed by a short usage sketch:

  • POST (to create key-value pair)
  • GET (to obtain value associated with key)
  • PUT (to update a key-value pair)
  • DELETE (to delete the key-value pair from the store).
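
As an illustration, a client could exercise the store with Go’s standard HTTP client (a sketch: the ‘:8080’ address and the ‘/{key}’ route are assumptions about how the server is wired up, not taken from the project):

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
)

func main() {
	// Create the pair ("name", "raft"); the value travels as form data,
	// matching the r.FormValue("value") call in PostHandler below.
	resp, err := http.PostForm("http://localhost:8080/name",
		url.Values{"value": {"raft"}})
	if err != nil {
		log.Fatal(err)
	}
	resp.Body.Close()

	// Read the value back with a GET on the same key.
	resp, err = http.Get("http://localhost:8080/name")
	if err != nil {
		log.Fatal(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(string(body))
}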

Thus, the four main functions for interacting with this store are PostHandler, GetHandler, PutHandler, and DeleteHandler, with the names being self-explanatory. The PostHandler function is given below:


//handles all post requests: creates the key-value pair if the key is new
func (kv *store) PostHandler(w http.ResponseWriter, r *http.Request) {

	log.Printf("\nPOST request received\n")

	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Connection", "close")

	if err := r.ParseForm(); err != nil {
		// report a client error instead of unconditionally writing 201 up front
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "ParseForm() err: %v", err)
		return
	}

	kv.mu.Lock()
	defer kv.mu.Unlock()

	value := r.FormValue("value")
	params := mux.Vars(r)
	key := params["key"]

	// Get returns "Invalid" when the key is absent, i.e. no duplicate exists
	duplicate := kv.Get(key)
	if duplicate == "Invalid" {
		w.WriteHeader(http.StatusCreated)
		fmt.Fprintf(w, "Key = %s\n", key)
		fmt.Fprintf(w, "Value = %s\n", value)
		kv.Push(key, value)
		kv.db_temp[key] = value
	} else {
		w.WriteHeader(http.StatusConflict)
		fmt.Fprintf(w, "This key already exists")
	}

	kv.Persist()
}

Upon being accessed, each handler returns an appropriate HTTP response status code. After checking for errors, it takes the mutex lock, ensuring data consistency by preventing concurrent access to the key-value store while changes are in progress. After performing its action on the store, the handler writes the updated store into the associated file via Persist, which saves the contents of the key-value store regularly, and then releases the lock to make the store available for other operations.
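
The handlers are dispatched by HTTP method. Wiring them up might look roughly like this (a sketch: gorilla/mux is implied by the mux.Vars call above, but the newStore constructor, the ‘/{key}’ route pattern, and the ‘:8080’ address are assumptions):

package main

import (
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func main() {
	kv := newStore("kv_persistence_file") // hypothetical constructor
	kv.Recover()                          // restore previously persisted contents, if any

	r := mux.NewRouter()
	r.HandleFunc("/{key}", kv.PostHandler).Methods("POST")
	r.HandleFunc("/{key}", kv.GetHandler).Methods("GET")
	r.HandleFunc("/{key}", kv.PutHandler).Methods("PUT")
	r.HandleFunc("/{key}", kv.DeleteHandler).Methods("DELETE")

	log.Fatal(http.ListenAndServe(":8080", r))
}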

PERSISTENCE

The Raft algorithm was designed with the possibility of one or more replicas failing in mind, and aims to keep the effect of such failures on the service minimal or nil. A Raft service with ‘2n+1’ servers tolerates ‘n’ failed servers and remains available as long as the other ‘n+1’ servers stay connected. The algorithm uses persistence to deal with the crash failure mode, in which a server stops working and then reboots. A Raft-based server must be able to pick up where it left off and continue, which requires that Raft keep persistent state that survives the reboot, during which the volatile in-memory state is lost.

Each server achieves this by writing Raft's persistent state to disk (nonvolatile storage) every time it changes, and by reading the latest saved state back from disk when restarting after a reboot. Any state that must be persisted is written out before the server finishes its reply to the ongoing RPC.

Persistence doesn't require saving all of a Raft server's state; only a few variables must survive. The five pieces of state persisted in this implementation are listed below (a sketch of the corresponding struct fields follows the list):

  • currentTerm - the latest term the server has seen

  • votedFor - the ID of the candidate this server voted for in the current term (if any)

  • log - the log entries of the Raft server

  • commitIndex - the index of the highest log entry known to be committed

  • lastApplied - the index of the highest log entry applied to the state machine
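
Reconstructed from the type assertions in the restore code below, the persisted fields of RaftNode are roughly the following (a sketch; the actual struct in the project contains additional volatile members):

type RaftNode struct {
	currentTerm int32             // latest term the server has seen
	votedFor    int32             // ID of the candidate voted for in the current term
	log         []protos.LogEntry // the replicated log
	commitIndex int32             // highest log index known to be committed
	lastApplied int32             // highest log index applied to the state machine
	storage     *Storage          // handle used to persist and restore the above
	// ... other volatile state (timers, peer connections, mutexes, etc.)
}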

The following code restores the last persisted state from the local file whose name is given by the string ‘raft_persistence_file’:


// RestoreFromStorage loads the five persisted fields from the persistence
// file back into the node's in-memory state. Persisted data that is missing
// any of these fields is treated as fatal.
func (node *RaftNode) RestoreFromStorage(storage *Storage) {

	var check bool

	var t1, t2, t3, t4, t5 interface{}

	if t1, check = node.storage.Get("currentTerm", node.Meta.raft_persistence_file); !check {
		log.Fatalf("\nFatal: persisted data found, but currentTerm not found in storage\n")
	}

	node.currentTerm = t1.(int32)

	if t2, check = node.storage.Get("votedFor", node.Meta.raft_persistence_file); !check {
		log.Fatalf("\nFatal: persisted data found, but votedFor not found in storage\n")
	}

	node.votedFor = t2.(int32)

	if t3, check = node.storage.Get("log", node.Meta.raft_persistence_file); !check {
		log.Fatalf("\nFatal: persisted data found, but log not found in storage\n")
	}

	node.log = t3.([]protos.LogEntry)

	if t4, check = node.storage.Get("commitIndex", node.Meta.raft_persistence_file); !check {
		log.Fatalf("\nFatal: persisted data found, but commitIndex not found in storage\n")
	}

	node.commitIndex = t4.(int32)

	if t5, check = node.storage.Get("lastApplied", node.Meta.raft_persistence_file); !check {
		log.Fatalf("\nFatal: persisted data found, but lastApplied not found in storage\n")
	}

	node.lastApplied = t5.(int32)
}

The code to persist (write) these values into the local file is:

// PersistToStorage writes the five persisted state variables out to the
// persistence file.
func (node *RaftNode) PersistToStorage() {

	node.storage.Set("currentTerm", node.currentTerm)
	node.storage.Set("votedFor", node.votedFor)
	node.storage.Set("log", node.log)
	node.storage.Set("commitIndex", node.commitIndex)
	node.storage.Set("lastApplied", node.lastApplied)

	node.storage.WriteFile(node.Meta.raft_persistence_file)

}

Persistence is implemented simply by calling ‘PersistToStorage’ at every point where these state variables change.
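
For example, an RPC handler that updates persistent state would persist before replying, so the change survives a crash between the update and the reply (an illustrative sketch, not the project's actual handler; the -1 sentinel for "no vote" is an assumption):

// On seeing a higher term, update the persistent state and save it
// before replying to the RPC.
if request.Term > node.currentTerm {
	node.currentTerm = request.Term
	node.votedFor = -1 // assumed sentinel meaning "voted for nobody yet"
	node.PersistToStorage()
}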

The functions ‘Get’, ‘Set’, ‘ReadFile’ and ‘WriteFile’ used in PersistToStorage() and RestoreFromStorage() are defined in the file ‘storage.go’. They either encode values and write them to the persistence file, or read the persistence file and decode values from it.

// Encode as gob and write to file for persistence.
func (stored *Storage) WriteFile(filename string) {
	// Hold the lock so the map cannot be mutated while it is being encoded.
	stored.mu.Lock()
	defer stored.mu.Unlock()

	dataFile, err := os.Create(filename)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer dataFile.Close()

	// serialize the data
	dataEncoder := gob.NewEncoder(dataFile)
	if err = dataEncoder.Encode(stored.m); err != nil {
		log.Printf("Error in WriteFile: %v", err.Error())
	}
}

// Write a particular value to be persisted.
func (stored *Storage) Set(key string, value interface{}) {
	stored.mu.Lock()
	defer stored.mu.Unlock()
	stored.m[key] = value
}

These functions belong to the struct Storage, whose members include a mutex lock; the lock keeps the data consistent when the underlying map and file are read or written concurrently.
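
Reconstructed from how it is used above, Storage looks roughly like this (a sketch; the real definition lives in storage.go):

type Storage struct {
	mu sync.Mutex             // guards the map and the file operations
	m  map[string]interface{} // the values persisted via gob encoding
}

One subtlety worth noting: because the map values are interface{}, gob can only encode and decode them if their concrete types are registered first, for example as below (the registration site is an assumption; the types come from the restore code above):

func init() {
	// Register the concrete types stored in the map so that gob can
	// encode and decode them through interface{} values.
	gob.Register(int32(0))
	gob.Register([]protos.LogEntry{})
}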

While the persistence discussed so far applies to the Raft server alone, the same concept is applied to the Key-Value store, which is equally susceptible to crashing.

The ‘filename’ member of the ‘store’ struct in the key-value store code names the file to which the values of the key-value store are regularly persisted.

The functions for recovering the key-value store and for persisting the same are:

func (kv *store) Recover() {

	// Take the write lock: readFile repopulates the in-memory store.
	kv.mu.Lock()
	defer kv.mu.Unlock()
	kv.readFile()

}

func (kv *store) Persist() {
	// Callers such as PostHandler already hold kv.mu when calling Persist.
	kv.writeFile()
}

Here the mutex lock, a member of the ‘store’ struct, ensures that multiple users cannot read and write the store and its file simultaneously, preserving data consistency.

‘writeFile’ and ‘readFile’ encode the data written to the file and decode the data read back from it, respectively. A sketch of ‘readFile’ follows ‘writeFile’ below.

func (kv *store) writeFile() {

	dataFile, err := os.Create(kv.filename)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer dataFile.Close()

	// serialize the data; check the error instead of discarding it
	dataEncoder := gob.NewEncoder(dataFile)
	if err = dataEncoder.Encode(kv.db_temp); err != nil {
		log.Printf("Error in writeFile: %v", err.Error())
	}
}
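
The matching ‘readFile’ is not shown on this page; it might look roughly like this (a sketch, assuming the file holds the gob-encoded map produced by writeFile and that recovered pairs are pushed back into the hashmap):

func (kv *store) readFile() {
	dataFile, err := os.Open(kv.filename)
	if err != nil {
		log.Printf("No persistence file to recover from: %v", err)
		return
	}
	defer dataFile.Close()

	// deserialize the data into the plain map
	dataDecoder := gob.NewDecoder(dataFile)
	if err = dataDecoder.Decode(&kv.db_temp); err != nil {
		log.Printf("Error in readFile: %v", err.Error())
		return
	}

	// rebuild the linked-list hashmap from the recovered map
	for key, value := range kv.db_temp {
		kv.Push(key, value)
	}
}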

Persistence helps the system withstand server crashes: as long as only a minority of the servers crash, and those that do eventually restart, the service remains available to all clients.