Skip to main content

消息队列

概述

消息队列是一种应用程序间通信的方式,它可以实现异步通信,提高系统的可用性和可扩展性。在 go-zero 中,我们使用了 go-queue

任务目标

  • 了解 go-queue 的基本使用
  • 了解如何在 go-zero 中使用消息队列

生产者

go-queue 生产者很简单,只需要 kafka 地址,topic 即可创建一个 Pusher 对象。

NewPusher(addrs []string, topic string, opts ...PushOption)

代码示例1

pusher := kq.NewPusher([]string{
"127.0.0.1:19092",
"127.0.0.1:19093",
"127.0.0.1:19094",
}, "test")

if err:=pusher.Push("foo");err!=nil{
log.Fatal(err)
}

消费者

配置介绍

kq 的配置结构体声明如下:

type KqConf struct {
service.ServiceConf
Brokers []string
Group string
Topic string
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
Username string `json:",optional"`
Password string `json:",optional"`
ForceCommit bool `json:",default=true"`
}

service.ServiceConf 请参考 基础服务配置

参数类型是否必填默认值说明
Brokers[]stringkafka 集群地址
Groupstring消费者组
Topicstring消息主题
Offsetstringlast消费者消费的起始位置,可选值为 first 和 last
Connsint1连接数
Consumersint8消费者数
Processorsint8消息处理器数
MinBytesint10240消息最小字节数
MaxBytesint10485760消息最大字节数
Usernamestring用户名
Passwordstring密码
ForceCommitbooltrue是否强制提交

代码示例2

Name: kq
Brokers:
- 127.0.0.1:19092
- 127.0.0.1:19093
- 127.0.0.1:19094
Group: foo
Topic: test
Offset: first
Consumers: 1