Beanstalkd 队列
关于延时任务,在很多场景也会被使用到,比如订单 20 分钟后未支付自动关闭归还库存等。
go-queue 除了提供了 kafka 消息队列 kq 之外,也实现了延时队列 dq。目前 go-queue 的延时队列底层是使用的 beanstalkd。
Config
Section titled “Config”type ( Beanstalk struct { Endpoint string Tube string }
DqConf struct { Beanstalks []Beanstalk Redis redis.RedisConf })-
Beanstalks: 多个 Beanstalk 节点配置
-
Redis:redis 配置,主要在这里面使用 Setnx 去重
go-zero 中使用 dq 的 pusher
Section titled “go-zero 中使用 dq 的 pusher”项目中首先要拉取 go-queue 的依赖
$ go get github.com/zeromicro/go-queue@latest在 etc/xxx.yaml 配置文件中添加当前的 dq 配置信息
Name: dqHost: 0.0.0.0Port: 8888
......
DqConf: Beanstalks: - Endpoint: 127.0.0.1:7771 Tube: tube1 - Endpoint: 127.0.0.1:7772 Tube: tube2在 internal/config 下的 config.go 中定义 go 映射的配置
type Config struct { ...... DqConf struct { Brokers []string Topic string }}在 svc/serviceContext.go 中初始化 pusher 的 dq client
type ServiceContext struct { Config config.Config ..... DqPusherClient dq.Producer}
func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, ..... DqPusherClient: dq.NewProducer(c.DqConf.Beanstalks), }}在 logic 中写业务逻辑使用 go-queue 的 dq client 发送消息到 beanstalk
.......func (l *PusherLogic) Pusher() error {
msg := "data"
// 1、5s后执行 deplayResp, err := l.svcCtx.DqPusherClient.Delay([]byte(msg), time.Second*5) if err != nil { logx.Errorf("error from DqPusherClient Delay err : %v", err) } logx.Infof("resp : %s", deplayResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)
// 2、在某个指定时间执行 atResp, err := l.svcCtx.DqPusherClient.At([]byte(msg), time.Now()) if err != nil { logx.Errorf("error from DqPusherClient Delay err : %v", err) } logx.Infof("resp : %s", atResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)
return nil}go-zero 中使用 dq 消费者 consumer
Section titled “go-zero 中使用 dq 消费者 consumer”项目中首先要拉取 go-queue 的依赖
$ go get github.com/zeromicro/go-queue@latest在 etc/xxx.yaml 配置文件中添加当前的 kafka 配置信息
Name: dqHost: 0.0.0.0Port: 8889
.....
#dqDqConf: Beanstalks: - Endpoint: 127.0.0.1:7771 Tube: tube1 - Endpoint: 127.0.0.1:7772 Tube: tube2 Redis: Host: 127.0.0.1:6379 Type: node在 internal/config 下的 config.go 中定义 go 映射的配置
package config
import ( "github.com/zeromicro/go-queue/dq" "github.com/zeromicro/go-zero/rest")
type Config struct { rest.RestConf ....... DqConf dq.DqConf}在 svc/serviceContext.go 中初始化 consumer 的 dq client
type ServiceContext struct { Config config.Config ..... DqConsumer dq.Consumer}
func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, ..... DqConsumer: dq.NewConsumer(c.DqConf), }}logic 中消费延时消息
func (l *PusherLogic) Consumer() error { l.svcCtx.DqConsumer.Consume(func(body []byte) { logx.Infof("consumer job %s \n", string(body)) })}写在最后,本身 beanstalk 不依赖 redis 的,但是 go-queue 为我们想的更周到防止短时间内重复消费,便使用了 redis 的 Setnx 帮我们在短时间内过滤掉消费过的消息