Введение в распределённый консенсус и Raft

В мире распределённых систем достижение консенсуса между узлами является критически важной задачей. Оно гарантирует, что все узлы в кластере согласованы относительно одного состояния, даже в случае сбоев. Одним из самых популярных и понятных алгоритмов консенсуса является Raft, разработанный так, чтобы быть более доступным по сравнению с его предшественником, Paxos. В этой статье мы погрузимся в мир Raft и реализуем систему распределённого консенсуса с использованием Go.

Зачем нужен Raft?

Raft назван в честь своих ключевых характеристик: надёжный (Reliable), реплицированный (Replicated), избыточный (Redundant) и отказоустойчивый (Fault-Tolerant). Он был создан для упрощения процесса достижения консенсуса в распределённых системах, делая его более понятным и простым в реализации по сравнению с другими алгоритмами, такими как Paxos.

Основные компоненты Raft

Выборы лидера

В кластере Raft узлы могут находиться в одном из трёх состояний: лидер (Leader), последователь (Follower) или кандидат (Candidate). Лидер отвечает за управление журналом и обеспечение синхронизации всех последователей. Вот упрощённый обзор процесса выборов лидера:

sequenceDiagram participant Follower1 as Follower 1 participant Follower2 as Follower 2 participant Follower3 as Follower 3 participant Candidate as Candidate Note over Follower1,Follower3: Leader failure detected Follower1->>Follower2: RequestVote Follower1->>Follower3: RequestVote Follower2->>Follower1: VoteGranted Follower3->>Follower1: VoteGranted Note over Follower1: Majority votes received Follower1->>Follower2: BecomeLeader Follower1->>Follower3: BecomeLeader

Когда лидер выходит из строя, последователи обнаруживают это через отсутствие сообщений пульса и переходят в состояние кандидата. Затем они начинают выборы, отправляя запросы на голосование (RequestVote) другим узлам. Кандидат, первым получивший большинство голосов, становится новым лидером.

Репликация журнала

После избрания лидера он отвечает за репликацию журнала. Вот как это работает:

  • Клиентский запрос: клиент отправляет запрос лидеру для выполнения операции (например, установки пары ключ-значение).
  • Запись в журнал: лидер добавляет этот запрос как запись в журнал и отправляет её всем последователям.
  • Консенсус большинства: лидер ждёт, пока большинство последователей подтвердит запись в журнале.
  • Фиксация: когда достигается большинство, лидер фиксирует запись в журнале и применяет её к своей машине состояний.
  • Обновление машины состояний: последователи узнают о зафиксированной записи в журнале от лидера и применяют её к своим локальным машинам состояний.

Реализация Raft в Go

Настройка проекта

Чтобы начать, вам нужно настроить проект Go. Вот базовая структура:

package main

import (
    "fmt"
    "log"
    "net"
    "net/http"
    "sync"
)

type Node struct {
    ID        uint64
    Address   string
    State     string // Leader, Follower, Candidate
    Log       []LogEntry
    Peers     []Node
    Mutex     sync.Mutex
}

type LogEntry struct {
    Command  []byte
    Term     uint64
    Index    uint64
}

func main() {
    // Initialize nodes
    nodes := []Node{
        {ID: 1, Address: "localhost:8080", State: "Follower"},
        {ID: 2, Address: "localhost:8081", State: "Follower"},
        {ID: 3, Address: "localhost:8082", State: "Follower"},
    }

    // Start each node
    for _, node := range nodes {
        go startNode(node)
    }

    // Start HTTP server for client requests
    http.HandleFunc("/set", setHandler)
    http.HandleFunc("/get", getHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}```

### Выборы лидера

Вот упрощённая реализация процесса выборов лидера:

```go
func startNode(node Node) {
    // Start listening for incoming connections
    ln, err := net.Listen("tcp", node.Address)
    if err != nil {
        log.Fatal(err)
    }

    // Handle incoming connections
    go func() {
        for {
            conn, err := ln.Accept()
            if err != nil {
                log.Println(err)
                return
            }
            go handleConnection(conn, node)
        }
    }()

    // Periodically check for leader failure
    go func() {
        for {
            time.Sleep(150 * time.Millisecond)
            if node.State == "Follower" {
                // Check if heartbeat received within timeout
                if !heartbeatReceived(node) {
                    // Transition to Candidate state and start election
                    node.State = "Candidate"
                    startElection(node)
                }
            }
        }
    }()
}

func startElection(node Node) {
    // Send RequestVote RPCs to all peers
    for _, peer := range node.Peers {
        go func(peer Node) {
            resp, err := requestVote(peer, node)
            if err != nil {
                log.Println(err)
                return
            }
            if resp.VoteGranted {
                // Increment vote count
                node.Mutex.Lock()
                node.Votes++
                node.Mutex.Unlock()
                // Check if majority votes received
                if node.Votes > len(node.Peers)/2 {
                    node.State = "Leader"
                    // Send heartbeats to followers
                    go sendHeartbeats(node)
                }
            }
        }(peer)
    }
}```

### Репликация журнала

Вот как вы можете реализовать репликацию журнала:

```go
func setHandler(w http.ResponseWriter, r *http.Request) {
    key := r.FormValue("key")
    value := r.FormValue("value")

    // Create log entry
    logEntry := LogEntry{
        Command:  []byte(fmt.Sprintf("set %s %s", key, value)),
        Term:     currentTerm,
        Index:    len(leader.Log),
    }

    // Add log entry to leader's log
    leader.Log = append(leader.Log, logEntry)

    // Send AppendEntries RPCs to followers
    for _, follower := range leader.Peers {
        go appendEntries(follower, logEntry)
    }

    // Wait for majority consensus
    for {
        if majorityConsensus(logEntry.Index) {
            // Commit log entry
            commitLogEntry(logEntry)
            break
        }
        time.Sleep(10 * time.Millisecond)
    }

    w.Write([]byte("Set operation successful"))
}

func appendEntries(follower Node, logEntry LogEntry) {
    resp, err := appendEntriesRPC(follower, logEntry)
    if err != nil {
        log.Println(err)
        return
    }
    if resp.Success {
        // Update matchIndex and nextIndex
        follower.MatchIndex = logEntry.Index
        follower.NextIndex = logEntry.Index + 1
    }
}```