8 вопросов
RabbitMQ - брокер сообщений: producers отправляют в exchanges, exchanges маршрутизируют в очереди по правилам (routing key, bindings), consumers читают из очередей. Модель AMQP. В Go используют библиотеку amqp (streadway/amqp) или обертки. Подходит для задач воркерам, отложенных заданий, событий с гибкой маршрутизацией.
conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
ch, _ := conn.Channel()
ch.Publish("", "queue1", false, false, amqp.Publishing{Body: []byte("msg")})Direct: маршрутизация по точному совпадению routing key. Topic: по паттерну (например, "user.*.created"). Fanout: игнорирует ключ, рассылает во все привязанные очереди. Headers: по заголовкам. В Go при Publish указывают exchange и routing key; при Bind очереди к exchange задают ключ или паттерн. Выбор типа определяет гибкость маршрутизации.
ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
ch.QueueBind("q1", "", "logs", false, nil)Сообщение считается обработанным после ack от consumer. Без ack при отключении канал сообщение возвращается в очередь (или в DLQ при настройке). Nack (requeue=true) возвращает в очередь; requeue=false отбрасывает или отправляет в DLQ. В Go: после успешной обработки channel.Ack(deliveryTag, false). При ошибке - Nack с requeue в зависимости от политики (ретраи без requeue через DLQ предпочтительнее при постоянной ошибке).
err := process(msg)
if err != nil {
ch.Nack(d.Tag, false, false) // в DLQ
return
}
ch.Ack(d.Tag, false)Сообщения делают persistent (deliveryMode=2), очереди durable. Publisher confirms: канал в режиме confirm, после Publish ждут Confirm; при потере сообщения можно ретраить. В Go включают confirms: ch.Confirm(false), после Publish слушают ch.NotifyPublish. При падении брокера неподтвержденные сообщения теряются; повторная отправка на стороне приложения при таймауте confirm.
ch.Confirm(false)
ch.Publish(...)
select {
case confirm := <-ch.NotifyPublish(...):
if !confirm.Ack { /* retry */ }
}DLQ - очередь для сообщений, которые не удалось обработать (nack без requeue, TTL истек, limit exceeded). При объявлении очереди задают x-dead-letter-exchange и опционально x-dead-letter-routing-key. Необработанные сообщения попадают в DLQ для разбора и ретраев. В Go настраивают при QueueDeclare; отдельный consumer обрабатывает DLQ (логирование, алерты, повторная публикация).
args := amqp.Table{"x-dead-letter-exchange": "dlx"}
ch.QueueDeclare("tasks", true, false, false, false, args)Prefetch ограничивает число неподтвержденных сообщений на канал у consumer. Без лимита брокер отдает все доступные сообщения; при медленной обработке память растет и распределение между воркерами неравномерное. В Go: channel.Qos(prefetchCount, prefetchSize, global). Разумное значение - 1 или несколько, чтобы один consumer не забирал всю очередь. Равномерная загрузка воркеров.
ch.Qos(10, 0, false)TTL сообщения: при публикации задают expiration (миллисекунды). TTL очереди: x-message-ttl при объявлении очереди; все сообщения без своего expiration получают этот TTL. Истекшие сообщения удаляются или отправляются в DLQ (x-dead-letter-exchange). В Go используют для отложенных заданий: публикация с TTL или через delayed exchange плагин. Ограничение: до истечения TTL сообщение не удаляется мгновенно.
ch.Publish("", "queue", false, false, amqp.Publishing{
Body: []byte("data"),
Expiration: "60000",
})RabbitMQ: AMQP, очереди с ack, гибкая маршрутизация (exchanges), приоритеты, TTL. Меньше латентность для простых сценариев; сообщение удаляется после ack. Kafka: персистентный лог, партиции, consumer groups, повторное чтение, высокая пропускная способность. В Go для задач воркерам и коротких очередей удобен RabbitMQ (или облачные SQS). Для потоков событий, логов, ретроспективы - Kafka. Выбор по требованиям к персистентности, масштабу и семантике.