Что такое Asynq?

Представьте, что вы находитесь в оживлённом ресторане, и заказы поступают быстрее, чем повара могут их обработать. Чтобы справиться с этим хаосом, вам нужна система, которая может эффективно ставить эти заказы в очередь и следить за их выполнением в правильном порядке. В мире разработки программного обеспечения именно здесь на помощь приходят очереди задач. Asynq — это библиотека Go, которая помогает легко управлять такими очередями задач, опираясь на возможности Redis.

Asynq разработан так, чтобы быть масштабируемым, надёжным и эффективным, что делает его идеальным для распределения работы между несколькими машинами. Он разработан и поддерживается Кеном Хибино, инженером-программистом из Google, поэтому вы можете доверять качеству кода [5].

Настройка среды

Прежде чем погрузиться в код, необходимо настроить среду. Вот краткое руководство:

  • Клонируйте репозиторий Asynq или создайте новый проект Go.
  • Запустите сервер Redis. Вы можете использовать Docker или запустить его локально.
docker run -d -p 6379:6379 redis
  • Установите необходимые пакеты Go:
go get github.com/hibiken/asynq

Создание задач и обработчиков задач

В Asynq задачи являются основными единицами работы, которые необходимо обработать. Вот как вы можете определять и ставить задачи в очередь.

Определение задач

Задачи в Asynq определяются с помощью структуры Task. Вот пример того, как вы можете определить задачи для отправки приветственных и напоминающих электронных писем:

package tasks

import (
    "time"
    "github.com/hibiken/asynq"
)

const (
    TypeWelcomeEmail = "email:welcome"
    TypeReminderEmail = "email:reminder"
)

func NewWelcomeEmailTask(id int) *asynq.Task {
    payload := map[string]interface{}{
        "user_id": id,
    }
    return asynq.NewTask(TypeWelcomeEmail, payload, nil)
}

func NewReminderEmailTask(id int, delay time.Duration) *asynq.Task {
    payload := map[string]interface{}{
        "user_id": id,
    }
    return asynq.NewTask(TypeReminderEmail, payload, nil)
}

Создание обработчиков задач

Обработчики задач — вот где происходит волшебство — они содержат бизнес-логику для обработки задач. Вот простой пример обработчика задач, который регистрирует сообщение:

package main

import (
    "log"
    "tutorial-go-asynq/tasks"
    "github.com/hibiken/asynq"
)

func main() {
    redisConnection := asynq.RedisClientOpt{
        Addr: "localhost:6379",
    }
    worker := asynq.NewServer(redisConnection, asynq.Config{
        Concurrency: 10,
        Queues: map[string]int{
            "critical": 6,
            "default": 3,
            "low": 1,
        },
    })

    worker.Register(TypeWelcomeEmail, handleWelcomeEmail)
    worker.Register(TypeReminderEmail, handleReminderEmail)

    log.Println("Starting worker...")
    worker.Run()
}

func handleWelcomeEmail(ctx *asynq.Context, task *asynq.Task) error {
    log.Println("Sending welcome email to user", task.Payload()["user_id"])
    return nil
}

func handleReminderEmail(ctx *asynq.Context, task *asynq.Task) error {
    log.Println("Sending reminder email to user", task.Payload()["user_id"])
    return nil
}

Постановка задач в очередь

Теперь, когда у вас настроены задачи и обработчики задач, пришло время поставить эти задачи в очередь. Вот как вы можете это сделать с помощью клиента Asynq:

package main

import (
    "log"
    "math/rand"
    "time"
    "tutorial-go-asynq/tasks"
    "github.com/hibiken/asynq"
)

func main() {
    redisConnection := asynq.RedisClientOpt{
        Addr: "localhost:6379",
    }
    client := asynq.NewClient(redisConnection)
    defer client.Close()

    for {
        userID := rand.Intn(1000) + 10
        delay := 2 * time.Minute

        task1 := tasks.NewWelcomeEmailTask(userID)
        task2 := tasks.NewReminderEmailTask(userID, delay)

        if _, err := client.Enqueue(task1, asynq.Queue("critical")); err != nil {
            log.Fatal(err)
        }

        if _, err := client.Enqueue(task2, asynq.Queue("low"), asynq.ProcessIn(delay)); err != nil {
            log.Fatal(err)
        }

        time.Sleep(time.Second)
    }
}

Мониторинг очередей

Asynq предоставляет веб-интерфейс и инструменты CLI для мониторинга и управления вашими очередями. Вот как можно настроить веб-интерфейс:

  • Установка Asynqmon:
go get github.com/hibiken/asynq/asynqmon/cmd/asynqmon
  • Запуск Asynqmon:
asynqmon --redis-addr=localhost:6379 --port=8080
  • Доступ к веб-интерфейсу по адресу http://localhost:8080.
graph TD A("Клиент") -->|Ставит задачи в очередь|B(Redis) B -->|Задачи|C(Рабочий сервер) C -->|Обработка задач|D(Обработчики задач) D -->|Логирование/Обработка|E(Вывод) B("Asynqmon") -->|Мониторинг| B F -->|Веб-интерфейс| C("Браузер")

Расширенные функции

Asynq предлагает несколько дополнительных функций, которые делают его ещё более мощным:

  • Планирование: вы можете планировать задачи для последующей обработки, используя опцию asynq.ProcessIn.
  • Повторные попытки: Asynq поддерживает автоматические повторные попытки для неудачных задач, которые можно настроить с помощью опции MaxRetry.
  • Приоритизация очередей: вы можете приоритизировать задачи в разных очередях, обеспечивая первоочередную обработку критически важных задач.
worker := asynq.NewServer(redisConnection, asynq.Config{
    Concurrency: 10,
    Queues: map[string]int{
        "critical": 6,
        "default": 3,
        "low": 1,
    },
    // Конфигурация повторных попыток
    RetryMax: 5,
    RetryDelay: func(attempt int, err error) time.Duration {
        return time.Duration(attempt) * time.Second
    },
})

Масштабирование системы

Одной из наиболее привлекательных особенностей Asynq является его способность к масштабированию. Вот как вы можете масштабировать свою систему:

  • Несколько рабочих серверов: вы можете создать несколько рабочих серверов, каждый из которых настроен на работу с разными очередями.
  • Docker Compose: используйте Docker Compose для автоматического запуска нужного количества реплик рабочего сервера.
  • Redis Cluster: Asynq поддерживает Redis Cluster для обеспечения высокой доступности и горизонтального масштабирования.
graph TD A("Клиент") -->|Ставит задачи в очередь|B(Redis Cluster) B -->|Задачи|C1(Рабочий сервер 1) B -->|Задачи|C2(Рабочий сервер 2) B -->|Задачи|C3(Рабочий сервер 3) C1 -->|Обработка задач|D1(Обработчики задач) C2 -->|Обработка задач|D2(Обработчики задач) C3 -->|Обработка задач|D3(Обработчики задач) D1 -->|Логирование/Обработка|E1(Вывод) D2 -->|Логирование/Обработка|E2(Вывод) D3 -->|Логирование/Обработка| B("Вывод")

Заключение

Asynq — мощный инструмент для управления очередями задач в ваших приложениях Go. Благодаря своей масштабируемости, надёжности и простоте использования он просто необходим в вашем наборе инструментов. Независимо от того, отправляете ли вы электронные письма, создаёте отчёты или обрабатываете изображения, Asynq может помочь вам эффективно обрабатывать фоновые задачи.

Так что в следующий раз, когда вы окажетесь в ситуации, когда задачи накапливаются быстрее, чем вы можете их обработать, вспомните об Asynq — невоспетый герой управления очередями задач в мире Go. Счастливого программирования!