延时队列
概述
关于延时任务,在很多场景也会被使用到,比如订单 20 分钟后未支付自动关闭归还库存等。
go-queue 除了提供了 kafka 消息队列 kq 之外,也实现了延时队列 dq。目前 go-queue 的延时队列底层是使用的 beanstalkd。Config
type (
Beanstalk struct {
Endpoint string
Tube string
}
DqConf struct {
Beanstalks []Beanstalk
Redis redis.RedisConf
}
)
Beanstalks: 多个 Beanstalk 节点配置
Redis:redis 配置,主要在这里面使用 Setnx 去重
go-zero 中使用 dq 的 pusher
项目中首先要拉取 go-queue 的依赖
$ go get github.com/zeromicro/go-queue@latest
在 etc/xxx.yaml 配置文件中添加当前的 dq 配置信息
Name: dq
Host: 0.0.0.0
Port: 8888
......
DqConf:
Beanstalks:
- Endpoint: 127.0.0.1:7771
Tube: tube1
- Endpoint: 127.0.0.1:7772
Tube: tube2
在 internal/config 下的 config.go 中定义 go 映射的配置
type Config struct {
......
DqConf struct {
Brokers []string
Topic string
}
}
在 svc/serviceContext.go 中初始化 pusher 的 dq client
type ServiceContext struct {
Config config.Config
.....
DqPusherClient dq.Producer
}
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
.....
DqPusherClient: dq.NewProducer(c.DqConf.Beanstalks),
}
}
在 logic 中写业务逻辑使用 go-queue 的 dq client 发送消息到 beanstalk
.......
func (l *PusherLogic) Pusher() error {
msg := "data"
// 1、5s后执行
deplayResp, err := l.svcCtx.DqPusherClient.Delay([]byte(msg), time.Second*5)
if err != nil {
logx.Errorf("error from DqPusherClient Delay err : %v", err)
}
logx.Infof("resp : %s", deplayResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)
// 2、在某个指定时间执行
atResp, err := l.svcCtx.DqPusherClient.At([]byte(msg), time.Now())
if err != nil {
logx.Errorf("error from DqPusherClient Delay err : %v", err)
}
logx.Infof("resp : %s", atResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)
return nil
}
go-zero 中使用 dq 消费者 consumer
项目中首先要拉取 go-queue 的依赖
$ go get github.com/zeromicro/go-queue@latest
在 etc/xxx.yaml 配置文件中添加当前的 kafka 配置信息
Name: dq
Host: 0.0.0.0
Port: 8889
.....
#dq
DqConf:
Beanstalks:
- Endpoint: 127.0.0.1:7771
Tube: tube1
- Endpoint: 127.0.0.1:7772
Tube: tube2
Redis:
Host: 127.0.0.1:6379
Type: node
在 internal/config 下的 config.go 中定义 go 映射的配置
package config
import (
"github.com/zeromicro/go-queue/dq"
"github.com/zeromicro/go-zero/rest"
)
type Config struct {
rest.RestConf
.......
DqConf dq.DqConf
}
在 svc/serviceContext.go 中初始化 consumer 的 dq client
type ServiceContext struct {
Config config.Config
.....
DqConsumer dq.Consumer
}
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
.....
DqConsumer: dq.NewConsumer(c.DqConf),
}
}
logic 中消费延时消息
func (l *PusherLogic) Consumer() error {
l.svcCtx.DqConsumer.Consume(func(body []byte) {
logx.Infof("consumer job %s \n", string(body))
})
}
写在最后,本身 beanstalk 不依赖 redis 的,但是 go-queue 为我们想的更周到防止短时间内重复消费,便使用了 redis 的 Setnx 帮我们在短时间内过滤掉消费过的消息