Введение в распределённую блокировку

В мире распределённых систем управление параллельным доступом к общим ресурсам представляет собой критическую задачу. Представьте себе сценарий, в котором нескольким экземплярам вашего микросервиса необходимо получить доступ к общей базе данных или выполнить какую-либо эксклюзивную операцию. Именно здесь на помощь приходит распределённая блокировка, гарантируя, что только один процесс может получить доступ к ресурсу в любой момент времени.

Что такое etcd?

Прежде чем приступить к реализации, давайте разберёмся, что такое etcd. Etcd — это строго согласованное распределённое хранилище «ключ-значение», которое обеспечивает надёжный способ хранения данных в распределённой системе. Он широко используется в крупномасштабных системах, включая Kubernetes, для управления конфигурацией и обнаружения служб. Способность etcd поддерживать согласованность между несколькими узлами делает его идеальным выбором для реализации распределённой блокировки.

Настройка окружения

Чтобы начать создавать нашу систему распределённой блокировки, вам необходимо установить Go в вашей системе (версии 1.13 или новее) и запустить etcd локально или сделать его доступным через сервер.

Установка etcd

Если вы ещё не установили etcd, вы можете загрузить его из официального репозитория etcd и запустить локально с помощью следующей команды:

etcd

Настройка проекта Go

Создайте новый проект Go и инициализируйте его необходимыми зависимостями:

mkdir distributed-lock && cd distributed-lock
go mod init example.com/distributed-lock
go get go.etcd.io/etcd/clientv3

Реализация распределённой блокировки

Теперь давайте реализуем механизм распределённой блокировки с помощью Go и etcd.

Инициализация клиента etcd

Сначала нам нужно инициализировать клиент etcd. Вот как вы можете это сделать:

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    client "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

var lock *concurrency.Mutex

func main() {
    // Инициализируем клиент etcd
    etcdClient, err := client.New(client.Config{Endpoints: []string{"localhost:2379"}})
    if err != nil {
        log.Fatal(err)
    }
    defer etcdClient.Close() // Очистка

    session, err := concurrency.NewSession(etcdClient)
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()

    mutex := concurrency.NewMutex(session, "/lock-prefix")
    PerformImportantTask(mutex)
}

Функции блокировки и разблокировки

Вот подробный взгляд на функцию PerformImportantTask, которая демонстрирует, как получить и освободить блокировку:

func PerformImportantTask(mutex *concurrency.Mutex) {
    ctx := context.Background()
    t := time.Now().Unix()
    fmt.Println("Ожидание получения блокировки...")
    // Только один процесс может заблокировать, остальные будут ждать
    mutex.Lock(ctx)
    fmt.Printf("Получили блокировку после ожидания %d секунд\n", time.Now().Unix()-t)

    // Выполнение задачи критической секции
    fmt.Println("Выполнение сверхважной задачи")
    time.Sleep(5 * time.Second) // Моделирование критической секции

    mutex.Unlock(ctx)
    fmt.Println("Готово!")
}

Как это работает

  • Инициализация клиента etcd: мы подключаемся к серверу etcd с помощью функции client.New.
  • Создание сессии: мы создаём сессию выборов с помощью concurrency.NewSession, которая генерирует объект аренды в кластере etcd.
  • Получение блокировки: вызов mutex.Lock(ctx) создаёт пару «ключ-значение» с арендой, указывая, что блокировка принадлежит экземпляру.
  • Освобождение блокировки: вызов mutex.Unlock(ctx) удаляет пару «ключ-значение» и отзывает аренду.

Пример потока

Вот диаграмма последовательности, иллюстрирующая процесс получения и освобождения блокировки:

sequenceDiagram participant A as Instance A participant B as Instance B participant E as etcd Note over A,B: Несколько экземпляров работают одновременно A->>E: Запрос на получение блокировки E->>A: Блокировка получена (создать пару «ключ-значение» с арендой) A->>A: Выполнение задачи критической секции A->>E: Снятие блокировки (удалить пару «ключ-значение» и отозвать аренду) Note over B,E: Экземпляр B ожидает снятия блокировки B->>E: Запрос на получение блокировки (блокируется до снятия блокировки) E->>B: Блокировка получена (создать пару «ключ-значение» с арендой) B->>B: Выполнение задачи критической секции B->>E: Снятие блокировки (удалить пару «ключ-значение» и отозвать аренду)

Расширенные возможности

Блокировки чтения-записи

Для более сложных сценариев вам могут понадобиться блокировки чтения-записи. Пакет etcd-lock поддерживает эту функцию, позволяя нескольким читателям одновременно получать доступ к ресурсу, обеспечивая эксклюзивный доступ для писателей.

Тайм-ауты блокировки и продление

Чтобы сделать вашу систему блокировки более надёжной, вы можете реализовать тайм-ауты и продления блокировки. Это гарантирует, что блокировка будет снята, если процесс, удерживающий блокировку, аварийно завершится или станет невосприимчивым. Вот пример того, как вы могли бы реализовать это, используя аренду с указанным TTL (время жизни):

func (dl *DistributedLock) Lock(ctx context.Context, ttl int64) error {
    lease, err := dl.etcdClient.Grant(ctx, ttl)
    if err != nil {
        return err
    }

    _, err = dl.etcdClient.Put(ctx, dl.Key, dl.Value, clientv3.WithLease(lease.ID))
    if err != nil {
        return err
    }

    dl.LeaseID = lease.ID
    log.Printf("Блокировка получена: %s", dl.Key)
    return nil
}

Обработка ошибок и повторные попытки

Правильная обработка ошибок и механизмы повторных попыток имеют решающее значение для отказоустойчивой системы распределённой блокировки. Вот пример того, как вы можете обрабатывать ошибки и повторные попытки при получении блокировки:

func (dl *DistributedLock) Lock(ctx context.Context, ttl int64) error {
    for {
        lease, err := dl.etcdClient.Grant(ctx, ttl)
        if err != nil {
            log.Printf("Ошибка предоставления аренды: %v", err)
            time.Sleep(1 * time.Second) // Повторить попытку после короткой задержки
            continue
        }

        _, err = dl.etcdClient.Put(ctx, dl.Key, dl.Value, clientv3.WithLease(lease.ID))
        if err != nil {
            log.Printf("Ошибка помещения пары «ключ-значение»: %v", err)
            time.Sleep(1 * time.Second) // Повтор попытки после короткой задержки
            continue
        }

        dl.LeaseID = lease.ID
        log.Printf("Блокировка получена: %s", dl.Key)
        return nil
    }
}

Заключение

Создание системы распределённой блокировки с использованием Go и etcd — это мощный способ управления параллельным доступом к общим ресурсам в распределённых системах. Следуя описанным выше шагам, вы сможете обеспечить согласованность вашей системы и отсутствие условий гонки.

Помните, что ключ к успешной распределённой системе заключается не только в коде, но и в понимании лежащих в его основе принципов и тщательной их реализации. Итак, в следующий раз, когда вы будете иметь дело с несколькими экземплярами вашего микросервиса, не позволяйте им бороться за ресурсы — используйте etcd для поддержания порядка.

И в качестве завершающей мысли, если вы когда-нибудь окажетесь в ситуации, когда несколько процессов борются за блокировку, просто вспомните мудрые слова: «Блокировка в день держит условия гонки в страхе». Удачного кодирования!