Beanstalkd Queue
Overview
Section titled “Overview”Delay queues are useful for scheduled tasks — for example, automatically cancelling an unpaid order 20 minutes after it is placed.
go-queue implements the delay queue dq (backed by beanstalkd), in addition to the Kafka message queue kq.
Config
Section titled “Config”type ( Beanstalk struct { Endpoint string Tube string }
DqConf struct { Beanstalks []Beanstalk Redis redis.RedisConf })-
Beantalks: multiple Beanstalk node configurations
-
Redis:redis configuration, mainly using Setnx here
pusher using dq in go-zero
Section titled “pusher using dq in go-zero”First pull go-queue dependencies in the project
$ go get github.com/zeromicro/go-queue@latestAdd current dq configuration information to the etc/xxx.yaml configuration file
Name: dqHost: 0.0.0.0Port: 8888
......
DqConf: Beanstalks: - Endpoint: 127.0.0.1:7771 Tube: tube1 - Endpoint: 127.0.0.1:7772 Tube: tube2Define configuration of go mapping in config.go under internal/config
type Config struct { ...... DqConf struct { Brokers []string Topic string }}Initialize a dq producer client in svc/serviceContext.go:
type ServiceContext struct { Config config.Config ..... DqPusherClient dq.Producer}
func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, ..... DqPusherClient: dq.NewProducer(c.DqConf.Beanstalks), }}Write business logic in logic, using go-queue dq client to send messages to beanstalk
.......func (l *PusherLogic) Pusher() error {
msg := "data"
// 1、5s后执行 deplayResp, err := l.svcCtx.DqPusherClient.Delay([]byte(msg), time.Second*5) if err != nil { logx.Errorf("error from DqPusherClient Delay err : %v", err) } logx.Infof("resp : %s", deplayResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)
// 2、在某个指定时间执行 atResp, err := l.svcCtx.DqPusherClient.At([]byte(msg), time.Now()) if err != nil { logx.Errorf("error from DqPusherClient Delay err : %v", err) } logx.Infof("resp : %s", atResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)
return nil}Use dq consumer consumer in go-zero
Section titled “Use dq consumer consumer in go-zero”First pull go-queue dependencies in the project
$ go get github.com/zeromicro/go-queue@latestAdd current kafka configuration information to the etc/xxx.yaml configuration file
Name: dqHost: 0.0.0.0Port: 8889
.....
#dqDqConf: Beanstalks: - Endpoint: 127.0.0.1:7771 Tube: tube1 - Endpoint: 127.0.0.1:7772 Tube: tube2 Redis: Host: 127.0.0.1:6379 Type: nodeDefine configuration of go mapping in config.go under internal/config
package config
import ( "github.com/zeromicro/go-queue/dq" "github.com/zeromicro/go-zero/rest")
type Config struct { rest.RestConf ....... DqConf dq.DqConf}Initialize consumer dq client in svc/serviceContext.go
type ServiceContext struct { Config config.Config ..... DqConsumer dq.Consumer}
func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, ..... DqConsumer: dq.NewConsumer(c.DqConf), }}Consume delay messages in logic:
func (l *PusherLogic) Consumer() error { l.svcCtx.DqConsumer.Consume(func(body []byte) { logx.Infof("consumer job %s \n", string(body)) })}Note that beanstalkd itself does not depend on Redis. go-queue uses Redis SETNX as a short-window deduplication filter to prevent the same message from being consumed more than once.