Beanstalkd Queue
Overview
Section titled “Overview”With regard to extended tasks, there are many scenarios in which the return of stocks is automatically closed without payment after 20 minutes.
go-queue implements the time queue dq, in addition to providing kafka message queue kq.The bottom layer of the go-queue is used for the beanstalkd.
Config
Section titled “Config”type ( Beanstalk struct { Endpoint string Tube string }
DqConf struct { Beanstalks []Beanstalk Redis redis.RedisConf })-
Beantalks: multiple Beanstalk node configurations
-
Redis:redis configuration, mainly using Setnx here
pusher using dq in go-zero
Section titled “pusher using dq in go-zero”First pull go-queue dependencies in the project
$ go get github.com/zeromicro/go-queue@latestAdd current dq configuration information to the etc/xxx.yaml configuration file
Name: dqHost: 0.0.0.0Port: 8888
......
DqConf: Beanstalks: - Endpoint: 127.0.0.1:7771 Tube: tube1 - Endpoint: 127.0.0.1:7772 Tube: tube2Define configuration of go mapping in config.go under internal/config
type Config struct { ...... DqConf struct { Brokers []string Topic string }}Initialize a penchant dq client in svc/serviceContext.go
type ServiceContext struct { Config config.Config ..... DqPusherClient dq.Producer}
func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, ..... DqPusherClient: dq.NewProducer(c.DqConf.Beanstalks), }}Write business logic in logic, using go-queue dq client to send messages to beanstalk
.......func (l *PusherLogic) Pusher() error {
msg := "data"
// 1、5s后执行 deplayResp, err := l.svcCtx.DqPusherClient.Delay([]byte(msg), time.Second*5) if err != nil { logx.Errorf("error from DqPusherClient Delay err : %v", err) } logx.Infof("resp : %s", deplayResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)
// 2、在某个指定时间执行 atResp, err := l.svcCtx.DqPusherClient.At([]byte(msg), time.Now()) if err != nil { logx.Errorf("error from DqPusherClient Delay err : %v", err) } logx.Infof("resp : %s", atResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)
return nil}Use dq consumer consumer in go-zero
Section titled “Use dq consumer consumer in go-zero”First pull go-queue dependencies in the project
$ go get github.com/zeromicro/go-queue@latestAdd current kafka configuration information to the etc/xxx.yaml configuration file
Name: dqHost: 0.0.0.0Port: 8889
.....
#dqDqConf: Beanstalks: - Endpoint: 127.0.0.1:7771 Tube: tube1 - Endpoint: 127.0.0.1:7772 Tube: tube2 Redis: Host: 127.0.0.1:6379 Type: nodeDefine configuration of go mapping in config.go under internal/config
package config
import ( "github.com/zeromicro/go-queue/dq" "github.com/zeromicro/go-zero/rest")
type Config struct { rest.RestConf ....... DqConf dq.DqConf}Initialize consumer dq client in svc/serviceContext.go
type ServiceContext struct { Config config.Config ..... DqConsumer dq.Consumer}
func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, ..... DqConsumer: dq.NewConsumer(c.DqConf), }}logic 中消费延时消息
func (l *PusherLogic) Consumer() error { l.svcCtx.DqConsumer.Consume(func(body []byte) { logx.Infof("consumer job %s \n", string(body)) })}Write in the end, the beanstalk is not reliant on redis, but go-queue is the better we want to prevent repeated consumption in a short period of time, using redis Setnx to allow us to filter spent messages within a short period of time