Message queue
A message queue lets applications communicate asynchronously, which improves the availability and scalability of the system. In go-zero, we use go-queue.
Task Targets
- Learn about the basic usage of go-queue
- Learn how to use message queues in go-zero
Creating a go-queue producer is simple: only the Kafka broker addresses and a topic are needed to create a Pusher object.
NewPusher(addrs []string, topic string, opts ...PushOption)
Code Example 2
pusher := kq.NewPusher([]string{
	// Kafka broker addresses go here.
}, "test")
if err := pusher.Push("foo"); err != nil {
	log.Fatal(err)
}
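Because Push takes a string, structured messages have to be serialized before they are sent. The following is a minimal sketch, assuming JSON encoding. The `stringPusher` interface, the `Order` type, the `pushJSON` helper, and the `memoryPusher` test double are hypothetical names introduced for illustration and are not part of go-queue. The interface only mirrors the `Push(string) error` call shown above, so the sketch runs without a Kafka cluster.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stringPusher is a hypothetical interface mirroring the Push(string) error
// call used above; it is not a go-queue type.
type stringPusher interface {
	Push(v string) error
}

// Order is a hypothetical message payload.
type Order struct {
	ID     int64  `json:"id"`
	Status string `json:"status"`
}

// pushJSON serializes msg to JSON and hands the resulting string to p.
func pushJSON(p stringPusher, msg interface{}) error {
	body, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("marshal message: %w", err)
	}
	return p.Push(string(body))
}

// memoryPusher records pushed values in memory so the sketch runs without Kafka.
type memoryPusher struct {
	sent []string
}

// Push appends v to the in-memory log and never fails.
func (m *memoryPusher) Push(v string) error {
	m.sent = append(m.sent, v)
	return nil
}

func main() {
	p := &memoryPusher{}
	if err := pushJSON(p, Order{ID: 1, Status: "paid"}); err != nil {
		fmt.Println("push failed:", err)
		return
	}
	fmt.Println(p.sent[0]) // prints {"id":1,"status":"paid"}
}
```

If the Pusher in your go-queue version exposes the same `Push(string) error` method, it could be passed to `pushJSON` in place of `memoryPusher`.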
Configuration Introduction
The kq configuration structure is defined as follows:
type KqConf struct {
	service.ServiceConf
	Brokers     []string
	Group       string
	Topic       string
	Offset      string `json:",options=first|last,default=last"`
	Conns       int    `json:",default=1"`
	Consumers   int    `json:",default=8"`
	Processors  int    `json:",default=8"`
	MinBytes    int    `json:",default=10240"`    // 10K
	MaxBytes    int    `json:",default=10485760"` // 10M
	Username    string `json:",optional"`
	Password    string `json:",optional"`
	ForceCommit bool   `json:",default=true"`
}
For service.ServiceConf, see Basic Service Configuration.
Parameter | Type | Required | Default | Description |
Brokers | []string | YES | | Kafka cluster addresses |
Group | string | YES | | Consumer group |
Topic | string | YES | | Message topic |
Offset | string | NO | last | Starting position for consumption; allowed values are first and last |
Conns | int | NO | 1 | Number of connections |
Consumers | int | NO | 8 | Number of consumers |
Processors | int | NO | 8 | Number of message processors |
MinBytes | int | NO | 10240 | Minimum number of bytes |
MaxBytes | int | NO | 10485760 | Maximum number of bytes |
Username | string | NO | | Username |
Password | string | NO | | Password |
ForceCommit | bool | NO | true | Whether to force commit |
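To make the defaults and allowed values in the table concrete, here is a minimal sketch of how they resolve for a partially filled configuration. The `consumerConf` struct, `applyDefaults`, and `validate` are hypothetical stand-ins written for illustration. In practice the go-zero `conf` loader handles the `default` and `options` struct tags itself, and this code does not reproduce its implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerConf is a hypothetical, trimmed-down mirror of the fields in the table.
type consumerConf struct {
	Brokers    []string
	Group      string
	Topic      string
	Offset     string
	Conns      int
	Consumers  int
	Processors int
	MinBytes   int
	MaxBytes   int
}

// applyDefaults fills unset fields with the defaults listed in the table.
func applyDefaults(c *consumerConf) {
	if c.Offset == "" {
		c.Offset = "last"
	}
	if c.Conns == 0 {
		c.Conns = 1
	}
	if c.Consumers == 0 {
		c.Consumers = 8
	}
	if c.Processors == 0 {
		c.Processors = 8
	}
	if c.MinBytes == 0 {
		c.MinBytes = 10240 // 10K
	}
	if c.MaxBytes == 0 {
		c.MaxBytes = 10485760 // 10M
	}
}

// validate checks the required fields and the allowed Offset values.
func validate(c consumerConf) error {
	if len(c.Brokers) == 0 {
		return errors.New("missing required field Brokers")
	}
	if c.Group == "" {
		return errors.New("missing required field Group")
	}
	if c.Topic == "" {
		return errors.New("missing required field Topic")
	}
	if c.Offset != "first" && c.Offset != "last" {
		return fmt.Errorf("offset must be first or last, got %q", c.Offset)
	}
	return nil
}

func main() {
	c := consumerConf{
		Brokers:   []string{"broker-1:9092"}, // placeholder address
		Group:     "foo",
		Topic:     "test",
		Offset:    "first",
		Consumers: 1,
	}
	applyDefaults(&c)
	if err := validate(c); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("%+v\n", c)
}
```

Fields set explicitly (here Offset and Consumers) keep their values, while Conns, Processors, MinBytes, and MaxBytes fall back to the defaults from the table.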
Code Example 1
- config.yaml
- main.go
# config.yaml
Name: kq
# Brokers is required: list the Kafka cluster addresses here.
Group: foo
Topic: test
Offset: first
Consumers: 1
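package main

import (
	"fmt"
	"github.com/zeromicro/go-queue/kq"
	"github.com/zeromicro/go-zero/core/conf"
)

func main() {
	var c kq.KqConf
	conf.MustLoad("config.yaml", &c) // load the consumer configuration
	q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
		fmt.Printf("=> %s\n", v) // print each consumed message value
		return nil
	}))
	defer q.Stop()
	q.Start() // start consuming
}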
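Handlers usually do more than print the value. The sketch below, assuming JSON-encoded message values, builds a handler with the same `func(k, v string) error` shape that is passed to `kq.WithHandle` above. The `Event` type and the `newHandler` function are hypothetical and not part of go-queue. The handler is invoked directly here so the example runs without Kafka.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical payload carried in the message value.
type Event struct {
	Name string `json:"name"`
}

// newHandler returns a func(k, v string) error that decodes v as JSON
// and passes the decoded event to process.
func newHandler(process func(key string, e Event) error) func(k, v string) error {
	return func(k, v string) error {
		var e Event
		if err := json.Unmarshal([]byte(v), &e); err != nil {
			return fmt.Errorf("decode message with key %q: %w", k, err)
		}
		return process(k, e)
	}
}

func main() {
	handle := newHandler(func(key string, e Event) error {
		fmt.Printf("=> key=%s name=%s\n", key, e.Name)
		return nil
	})

	// A well-formed message is decoded and processed.
	if err := handle("k1", `{"name":"signup"}`); err != nil {
		fmt.Println("handle failed:", err)
	}
	// A malformed message surfaces a decode error instead of being processed.
	if err := handle("k2", "not json"); err != nil {
		fmt.Println("handle failed:", err)
	}
}
```

Keeping the decoding in a small wrapper like this lets the business logic in `process` be tested without a running consumer. If your go-queue version accepts this handler signature, `handle` could be passed as `kq.WithHandle(handle)`.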