Skip to main content

Delay Queue

Overview

Delay queue is a special queue whose elements can only be consumed after the specified delay has arrived.Delayed queue implementation usually relies on a scheduled task, which regularly scan elements in the queue, move expired elements to another queue, from which consumers consume.

In go-zero we use go-queue to achieve delay queue

Task Targets

  • Learn about the basic usage of go-queue
  • Learn how to use the delay queue in go-zero

Preparing

Producer

dq Producer is simple and needs only beanstalkd addresses, tube can create a producer object.

type Beanstalk struct {
Endpoint string // beanstalkd server address
Tube string // tube name
}

NewProducer(beanstalks []Beanstalk) Producer

Code Example

package main

import (
"fmt"
"strconv"
"time"

"github.com/zeromicro/go-queue/dq"
)

func main() {
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
})

_, err := producer.Delay([]byte("hello"), time.Second*5)
if err != nil {
fmt.Println(err)
}

_, err = producer.At([]byte("hello"), time.Now().Add(time.Second*10))
if err != nil {
fmt.Println(err)
}
}

Consumer

Configure Introduction

type DqConf struct {
Beanstalks []Beanstalk
Redis redis.RedisConf
}

type Beanstalk struct {
Endpoint string // beanstalkd server address
Tube string // tube name
}
ParamsDataTypeRequired?Default valueNote
Beanstalks[]BeanstalkYESbeanstalkd configuration
RedisRedisConfYESredis configuration, referenceRedis configuration

Beanstalk

ParamsDataTypeRequired?Default valueNote
EndpointstringYESbeanstalkd address
TubestringYEStube name

Sample

package main

import (
"fmt"

"github.com/zeromicro/go-queue/dq"
"github.com/zeromicro/go-zero/core/stores/redis"
)

func main() {
consumer := dq.NewConsumer(dq.DqConf{
Beanstalks: []dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
},
Redis: redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
},
})
consumer.Consume(func(body []byte) {
fmt.Println(string(body))
})
}

References

  1. go-beanstalk:
  2. Beanstalkd