Executors
In `go-zero`, `executors` act as a task pool: they buffer multiple tasks and process them in batches. Typical uses are large batch inserts into `clickhouse` and SQL batch inserts. You can also find `executors` in `go-queue` (in `queue`, `ChunkExecutor` is used to limit the byte size of each task submission).
So when you have the following requirements, you can use this component:
- Submit tasks in batches
- Buffer part of tasks and submit lazily
- Delay task submission
Before explaining it in detail, let's give a rough overview:
# Interface design
Under the `executors` package, there are the following executors:
Name | Description |
---|---|
`bulkexecutor` | Submits once `maxTasks` (maximum number of tasks) is reached |
`chunkexecutor` | Submits once `maxChunkSize` (maximum number of bytes) is reached |
`periodicalexecutor` | Basic executor |
`delayexecutor` | Delays the execution of the passed `fn()` |
`lessexecutor` | |
Apart from `delay` and `less`, which serve special purposes, the other three are all combinations of `executor` + `container`:
And this `container` is an interface:
This shows the dependencies:
- `bulkexecutor`: `periodicalexecutor` + `bulkContainer`
- `chunkexecutor`: `periodicalexecutor` + `chunkContainer`
tip
So if you want to build your own executor, implement the three methods of the `container` interface and then combine it with `periodicalexecutor`.
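Following this tip, a custom container only has to supply the three methods. The sketch below is hedged: it assumes a three-method interface (`AddTask`, `Execute`, `RemoveAll`; names and signatures are an assumption) and defines a hypothetical `byteContainer` that flushes once the buffered strings reach a byte budget, similar in spirit to `chunkContainer` but not its actual code.

```go
package main

import "fmt"

// TaskContainer is an assumed three-method container interface.
type TaskContainer interface {
	AddTask(task interface{}) bool
	Execute(tasks interface{})
	RemoveAll() interface{}
}

// byteContainer is a hypothetical container that flushes once the buffered
// strings reach maxBytes in total.
type byteContainer struct {
	tasks    []string
	size     int
	maxBytes int
	execute  func(tasks []string)
}

func (c *byteContainer) AddTask(task interface{}) bool {
	s := task.(string) // this sketch only accepts string tasks
	c.tasks = append(c.tasks, s)
	c.size += len(s)
	return c.size >= c.maxBytes
}

func (c *byteContainer) Execute(tasks interface{}) {
	c.execute(tasks.([]string))
}

func (c *byteContainer) RemoveAll() interface{} {
	tasks := c.tasks
	c.tasks = nil
	c.size = 0 // the byte budget starts over with the next batch
	return tasks
}

// Compile-time check that byteContainer satisfies TaskContainer.
var _ TaskContainer = (*byteContainer)(nil)

func main() {
	c := &byteContainer{maxBytes: 10, execute: func(batch []string) {
		fmt.Println(len(batch), "tasks flushed")
	}}
	c.AddTask("hello")      // 5 bytes buffered
	if c.AddTask("world") { // 10 bytes: budget reached
		c.Execute(c.RemoveAll()) // prints: 2 tasks flushed
	}
}
```

Only the flush condition differs from a count-based container, which is why the same executor can drive both.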
So, back to the picture above: our focus is on `periodicalexecutor`. Let's see how it is designed.
# How to use
First, let's look at how this component is used in business code:
There is a scheduled service that synchronizes data from `mysql` to `clickhouse` at a fixed time every day:
Initialize `bulkExecutor`:
tip
A side note: `clickhouse` is well suited to bulk inserts. Its insert speed is very fast, and inserting in large batches makes full use of `clickhouse`.
Write the main business logic:
tip
You may be wondering why `Flush()` and `Wait()` are needed; I will explain this later through the source code.
Overall, usage takes 3 steps:
- `Add()`: add a task
- `Flush()`: flush the tasks in `container`
- `Wait()`: wait for all tasks to complete
# Source code analysis
tip
The analysis here focuses on `periodicalexecutor`, because the other two commonly used executors rely on it.
# Initialization
- `commander`: the channel that passes `tasks`
- `container`: temporarily stores the tasks passed to `Add()`
- `confirmChan`: blocks `Add()`; when `executeTasks()` begins, the block is released
- `ticker`: keeps tasks buffered by `Add()` from waiting indefinitely; it fires periodically so the temporarily stored tasks are released in time
# Add()
After initialization, the first step in the business logic is to add tasks to the executor:
In `addAndCheck()`, `AddTask()` enforces the maximum number of tasks. Once that limit is reached, `RemoveAll()` is executed: the tasks temporarily stored in the `container` are popped and passed to the `commander`, where a goroutine reading in a loop picks them up and executes them.
# backgroundFlush()
A background goroutine is started that continuously flushes the tasks in the `container`:
There are two flows overall:
- `commander` receives the tasks passed by `RemoveAll()`, releases the block on `Add()` so that `Add()` can continue, and executes the tasks
- When the `ticker` fires, if the first flow has not run since the previous tick, it automatically calls `Flush()` and executes the buffered tasks
# Wait()
In `backgroundFlush()`, a function is mentioned: `enterExecution()`:
Seen this way, you can tell why `dts.insertExecutor.Wait()` has to be called at the end: you have to wait for all goroutine tasks to complete.
# Thinking
While reading the source code, I thought about some other design ideas. Do you have similar questions?
- In the analysis of `executors`, you will find `lock` in many places
tip
Without it, `go test` reports race conditions; locking is used to avoid them
- After analyzing `confirmChan`, you will find that it was only introduced in a recent commit. Why is it designed like this?
It used to be: `wg.Add(1)` was written in `executeTasks()`; now it is: first `wg.Add(1)`, then release the `confirmChan` block. If the `executor func` blocks, `Add task` keeps going, and because nothing blocks it, execution may soon reach `Executor.Wait()`. That is how `wg.Wait()` can run before `wg.Add()`, which causes a `panic`.
For details, see `TestPeriodicalExecutor_WaitFast()` in the latest version; you may want to run it against the earlier version to reproduce the problem.
# Summary
There are a few more `executors` left to analyze; I leave them for you to explore in the source code.
In short, the overall design:
- Follow interface-oriented design
- Flexible use of concurrency tools such as `channel` and `waitgroup`
- The combination of execution unit + storage unit
There are many useful components in `go-zero`. Using them well goes a long way toward improving service performance and development efficiency. I hope this article has been useful to you.