Skip to main content

Delay Queue

Overview

With regard to extended tasks, there are many scenarios in which the return of stocks is automatically closed without payment after 20 minutes.

go-queue implements the time queue dq, in addition to providing kafka message queue kq.The bottom layer of the go-queue is used for the beanstalkd.

Config

type (
Beanstalk struct {
Endpoint string
Tube string
}

DqConf struct {
Beanstalks []Beanstalk
Redis redis.RedisConf
}
)
  • Beantalks: multiple Beanstalk node configurations

  • Redis:redis configuration, mainly using Setnx here

pusher using dq in go-zero

First pull go-queue dependencies in the project

$ go get github.com/zeromicro/go-queue@latest

Add current dq configuration information to the etc/xxx.yaml configuration file

Name: dq
Host: 0.0.0.0
Port: 8888

......

DqConf:
Beanstalks:
- Endpoint: 127.0.0.1:7771
Tube: tube1
- Endpoint: 127.0.0.1:7772
Tube: tube2

Define configuration of go mapping in config.go under internal/config

type Config struct {
......
DqConf struct {
Brokers []string
Topic string
}
}

Initialize a penchant dq client in svc/serviceContext.go

type ServiceContext struct {
Config config.Config
.....
DqPusherClient dq.Producer
}

func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
.....
DqPusherClient: dq.NewProducer(c.DqConf.Beanstalks),
}
}

Write business logic in logic, using go-queue dq client to send messages to beanstalk

.......
func (l *PusherLogic) Pusher() error {

msg := "data"

// 1、5s后执行
deplayResp, err := l.svcCtx.DqPusherClient.Delay([]byte(msg), time.Second*5)
if err != nil {
logx.Errorf("error from DqPusherClient Delay err : %v", err)
}
logx.Infof("resp : %s", deplayResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)

// 2、在某个指定时间执行
atResp, err := l.svcCtx.DqPusherClient.At([]byte(msg), time.Now())
if err != nil {
logx.Errorf("error from DqPusherClient Delay err : %v", err)
}
logx.Infof("resp : %s", atResp) // fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id)

return nil
}

Use dq consumer consumer in go-zero

First pull go-queue dependencies in the project

$ go get github.com/zeromicro/go-queue@latest

Add current kafka configuration information to the etc/xxx.yaml configuration file

Name: dq
Host: 0.0.0.0
Port: 8889

.....

#dq
DqConf:
Beanstalks:
- Endpoint: 127.0.0.1:7771
Tube: tube1
- Endpoint: 127.0.0.1:7772
Tube: tube2
Redis:
Host: 127.0.0.1:6379
Type: node

Define configuration of go mapping in config.go under internal/config

package config

import (
"github.com/zeromicro/go-queue/dq"
"github.com/zeromicro/go-zero/rest"
)

type Config struct {
rest.RestConf
.......
DqConf dq.DqConf
}

Initialize consumer dq client in svc/serviceContext.go

type ServiceContext struct {
Config config.Config
.....
DqConsumer dq.Consumer
}

func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
.....
DqConsumer: dq.NewConsumer(c.DqConf),
}
}

logic 中消费延时消息

func (l *PusherLogic) Consumer() error {
l.svcCtx.DqConsumer.Consume(func(body []byte) {
logx.Infof("consumer job %s \n", string(body))
})
}

Write in the end, the beanstalk is not reliant on redis, but go-queue is the better we want to prevent repeated consumption in a short period of time, using redis Setnx to allow us to filter spent messages within a short period of time

References

  1. Beanstalkd Introduction and Installation