Skip to main content

Message queue

Overview

The message queue is a way of communicating between applications, enabling asynchronous communication and increasing the availability and scalability of the system.In go-zero, we used go-queue

Task Targets

  • Learn about the basic usage of go-queue
  • Learn how to use message queues in go-zero

Producer

go-queue producers are simple. Only kafka addresses are needed to create a Pusher object.

NewPusher(addrs []string, topic string, opts ...PushOption)

Code Example 2

pusher := kq.NewPusher([]string{
"127.0.0.1:19092",
"127.0.0.1:19093",
"127.0.0.1:19094",
}, "test")

if err:=pusher.Push("foo");err!=nil{
log.Fatal(err)
}

Consumer

Configure Introduction

kq Configuration Structure states the following:

type KqConf struct {
service.ServiceConf
Brokers []string
Group string
Topic string
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
Username string `json:",optional"`
Password string `json:",optional"`
ForceCommit bool `json:",default=true"`
}

service.ServiceConf reference Basic Service Configuration

ParamsDataTypeRequired?Default valueNote
Brokers[]stringYESkafka cluster address
GroupstringYESConsumer Group
TopicstringYESMessage topic
OffsetstringNOlastStarting position of consumer consumption, optional values first and last
ConnsintNO1Number of connections
ConsumersintNO8Number of consumers
ProcessorsintNO8Number of Message Processors
MinBytesintNO10240Minimum number of bytes
MaxBytesintNO10485760Maximum number of messages
UsernamestringNOUsername
PasswordstringNOPassword
ForceCommitboolNOtrueForce Commit or not

Code Example 1

Name: kq
Brokers:
- 127.0.0.1:19092
- 127.0.0.1:19093
- 127.0.0.1:19094
Group: foo
Topic: test
Offset: first
Consumers: 1