跳转到内容

RabbitMQ

go-zero 通过官方 amqp091-go 库与 RabbitMQ 集成。社区在 zeromicro/zero-contrib 下提供了 rabbitmq 插件。

Terminal window
go get github.com/zeromicro/zero-contrib/zrpc/rabbitmq
import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)
// Open the connection, then a channel on top of it. Each error is checked
// before the result is used, so a failed dial never reaches conn.Channel()
// with a nil connection.
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
	log.Fatalf("dial rabbitmq: %v", err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
	log.Fatalf("open channel: %v", err)
}
defer ch.Close()
err = ch.Publish(
"orders", // exchange
"created", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: orderJSON,
},
)
// Start a manual-ack consumer on order-queue.
msgs, err := ch.Consume(
	"order-queue", // queue
	"",            // consumer tag
	false,         // auto-ack
	false,         // exclusive
	false,         // no-local
	false,         // no-wait
	nil,           // args
)
if err != nil {
	log.Fatalf("consume order-queue: %v", err)
}

for msg := range msgs {
	if err := processOrder(msg.Body); err != nil {
		// Requeue only on the first failure. A message that already came
		// back once (Redelivered) is rejected without requeue, so a poison
		// message cannot loop forever; the broker drops it or routes it to
		// a dead-letter exchange if the queue has one configured.
		requeue := !msg.Redelivered
		if nackErr := msg.Nack(false, requeue); nackErr != nil {
			log.Printf("nack order (requeue=%t): %v", requeue, nackErr)
		}
		continue
	}
	if err := msg.Ack(false); err != nil {
		log.Printf("ack order: %v", err)
	}
}

AMQP 连接可能因网络抖动或 broker 重启而断开,断开后消费循环会退出。因此应将连接包裹在重连循环中:

// Reconnect forever: dial, hand the connection to the consumer, and redial
// once the consumer returns. The same delay applies after a failed dial and
// after a dropped consumer, so a persistent failure cannot spin the loop.
const retryDelay = 5 * time.Second

for {
	conn, err := amqp.Dial(url)
	if err != nil {
		log.Printf("dial rabbitmq: %v; retrying in %s", err, retryDelay)
		time.Sleep(retryDelay)
		continue
	}

	// NOTE(review): runConsumer is assumed to block until the connection or
	// its channel fails — confirm against its definition.
	runConsumer(conn)

	// runConsumer may have returned on a channel-level error with the
	// connection still open, so release it explicitly. The error is ignored
	// because the connection may already be closed.
	_ = conn.Close()

	log.Printf("rabbitmq consumer stopped; reconnecting in %s", retryDelay)
	time.Sleep(retryDelay)
}