In go-zero, executors act as a task pool: they buffer multiple tasks and process them in batches, for example for large batch inserts into clickhouse or SQL batch inserts. Executors also show up elsewhere; for instance, ChunkExecutor is used to limit the byte size of submitted tasks.
You can use this component when you need to:
- Submit tasks in batches
- Buffer some tasks and submit them lazily
- Delay task submission
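Before going into detail, here is a rough overview. The executors package contains several executors with different roles; one of them, for example, delays the execution of the passed function. Apart from the two with special functions, delay and less, the other three all combine an execution unit with a container. The container is an interface, so these executors depend on that abstraction rather than on a concrete way of storing tasks.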
So if you want to build your own executor, implement the three methods of the container interface and then combine your container with periodicalexecutor.
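To show what "implementing the container" means, here is a standalone sketch. The interface copies the three method names go-zero's container interface uses (AddTask, Execute and RemoveAll); the parameter types may differ between go-zero versions, and sliceContainer with its maxTasks rule is a hypothetical example, not a go-zero type. It is not goroutine-safe on its own; in the real executor, the periodical executor guards the container with a lock.

```go
package main

import "fmt"

// TaskContainer is a standalone copy of the three-method container
// interface, written for illustration (it does not import go-zero).
type TaskContainer interface {
	// AddTask stores a task and reports whether the container should be flushed.
	AddTask(task interface{}) bool
	// Execute handles a batch that RemoveAll returned.
	Execute(tasks interface{})
	// RemoveAll pops every stored task and returns them as one batch.
	RemoveAll() interface{}
}

// sliceContainer is a hypothetical container that asks for a flush once
// it holds maxTasks tasks and hands each batch to execute.
type sliceContainer struct {
	tasks    []interface{}
	maxTasks int
	execute  func(batch []interface{})
}

func (c *sliceContainer) AddTask(task interface{}) bool {
	c.tasks = append(c.tasks, task)
	return len(c.tasks) >= c.maxTasks
}

func (c *sliceContainer) Execute(tasks interface{}) {
	c.execute(tasks.([]interface{}))
}

func (c *sliceContainer) RemoveAll() interface{} {
	tasks := c.tasks
	c.tasks = nil
	return tasks
}

// Compile-time check that sliceContainer satisfies TaskContainer.
var _ TaskContainer = (*sliceContainer)(nil)

func main() {
	c := &sliceContainer{maxTasks: 3, execute: func(batch []interface{}) {
		fmt.Println("insert batch:", batch)
	}}
	for i := 1; i <= 7; i++ {
		if c.AddTask(i) {
			c.Execute(c.RemoveAll())
		}
	}
	// Task 7 is still buffered; an executor's ticker or Flush() would pick it up.
	fmt.Println("still buffered:", len(c.tasks))
}
```

Running it prints two batches, `[1 2 3]` and `[4 5 6]`, and reports one task still buffered, which is exactly the leftover that the periodic flushing discussed later exists to handle.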
Our focus, then, is periodicalexecutor: let's see how it is designed.
First, let's look at how this component is used in business code. Suppose a scheduled service synchronizes data to clickhouse at a fixed time every day.
A side note: clickhouse is well suited to mass insertion. Its insert speed is very fast, and inserting in large batches lets you make full use of it.
In the main business logic, you may wonder why Flush() and Wait() are needed; I will analyze this through the source code later.
Overall, using it takes three steps:
- Add(): add a task
- Flush(): flush the tasks in the container
- Wait(): wait for all tasks to complete
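To make the three steps concrete without depending on go-zero, here is a hypothetical batchExecutor (not a go-zero type) that buffers tasks, runs each full batch in its own goroutine, and exposes Add, Flush and Wait. It deliberately omits the ticker and background goroutine of the real periodicalexecutor; it only shows why a caller needs Flush() for a partial last batch and Wait() for batches that are still running.

```go
package main

import (
	"fmt"
	"sync"
)

// batchExecutor is a hypothetical, much-simplified executor used only to
// show the Add / Flush / Wait steps.
type batchExecutor struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	buf     []int
	size    int
	execute func(batch []int)
}

// Add buffers a task; a full buffer is executed asynchronously.
func (e *batchExecutor) Add(task int) {
	e.mu.Lock()
	e.buf = append(e.buf, task)
	var batch []int
	if len(e.buf) >= e.size {
		batch, e.buf = e.buf, nil
	}
	e.mu.Unlock()
	if batch != nil {
		e.run(batch)
	}
}

// Flush executes whatever is still buffered, even a partial batch.
func (e *batchExecutor) Flush() {
	e.mu.Lock()
	batch := e.buf
	e.buf = nil
	e.mu.Unlock()
	if len(batch) > 0 {
		e.run(batch)
	}
}

// Wait blocks until every batch started so far has finished.
func (e *batchExecutor) Wait() { e.wg.Wait() }

func (e *batchExecutor) run(batch []int) {
	e.wg.Add(1) // counted before the goroutine starts, so Wait cannot miss it
	go func() {
		defer e.wg.Done()
		e.execute(batch)
	}()
}

func main() {
	var mu sync.Mutex
	inserted := 0
	e := &batchExecutor{size: 3, execute: func(batch []int) {
		mu.Lock()
		inserted += len(batch)
		mu.Unlock()
	}}
	for i := 0; i < 7; i++ {
		e.Add(i) // step 1: two full batches of 3 start running
	}
	e.Flush() // step 2: without it, the 7th task would stay in the buffer
	e.Wait()  // step 3: without it, main could print before the batches finish
	fmt.Println("inserted:", inserted)
}
```

With all three steps the program prints `inserted: 7`; dropping Flush() loses the last task, and dropping Wait() lets main read the counter while batches may still be running.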
The main subject of analysis here is periodicalexecutor, because the other two commonly used executors rely on it. Its key fields are:
- commander: the channel that passes batches of tasks
- container: temporarily stores the tasks added by Add()
- confirmChan: blocks Add() until the batch is about to be executed by executeTasks(), which then releases the block
- ticker: so that execution does not depend only on Add() filling the container, the ticker gives tasks a regular chance to run and releases temporarily stored tasks in time
After initialization, the first step in the business logic is to add tasks to the executor. AddTask() controls the maximum number of tasks: once the limit is reached, RemoveAll() is executed, and the tasks popped from the temporary container are passed to the commander. A goroutine then loops reading from commander and executes the tasks.
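The hand-off described above can be sketched in plain Go. The names commander, confirmChan and container follow the article, but the handoff type and its add and consume methods are hypothetical, and the sketch leaves out locking and the ticker: add() appends to the container, and once maxTasks is reached it pops the whole batch, sends it on commander and blocks on confirmChan until the consuming goroutine has taken it.

```go
package main

import "fmt"

// handoff is a hypothetical illustration of the Add() side of the design.
type handoff struct {
	commander   chan []int    // full batches go to the background goroutine
	confirmChan chan struct{} // add() blocks here until the batch is taken
	container   []int         // temporarily stored tasks
	maxTasks    int
}

// add stores a task and, once maxTasks is reached, hands the whole batch over.
func (h *handoff) add(task int) {
	h.container = append(h.container, task) // like AddTask
	if len(h.container) < h.maxTasks {
		return
	}
	batch := h.container // like RemoveAll
	h.container = nil
	h.commander <- batch
	<-h.confirmChan // blocked until the consumer has received the batch
}

// consume plays the background goroutine: it receives each batch,
// releases the blocked add(), then executes the batch.
func (h *handoff) consume(execute func([]int), done chan<- struct{}) {
	for batch := range h.commander {
		h.confirmChan <- struct{}{}
		execute(batch)
	}
	close(done)
}

func main() {
	h := &handoff{
		commander:   make(chan []int, 1),
		confirmChan: make(chan struct{}),
		maxTasks:    2,
	}
	done := make(chan struct{})
	go h.consume(func(batch []int) { fmt.Println("execute", batch) }, done)
	for i := 1; i <= 4; i++ {
		h.add(i)
	}
	close(h.commander)
	<-done
}
```

Because the consumer handles one batch at a time, the output is `execute [1 2]` followed by `execute [3 4]`. Setting the container to nil after the hand-off means later appends allocate a fresh slice, so the consumer never shares a backing array with the producer.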
It also starts a background goroutine that continuously flushes the tasks in the container. Overall, there are two paths:
- commander receives the tasks passed by RemoveAll(), releases the blocking of Add(), and executes them
- When the ticker fires, if the first path has not run, it automatically calls Flush() and executes the tasks
backgroundFlush() also calls a helper function that registers each batch with the WaitGroup (wg.Add(1)) before executing it. Seen this way, it is clear why dts.insertExecutor.Wait() is needed at the end: you have to wait for all goroutine tasks to complete.
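Putting the pieces together, here is a minimal sketch of a periodical executor under these assumptions: tasks are plain ints, the container is a slice guarded by a mutex (Add() runs on the caller's goroutine while ticker flushes run in the background), and the background goroutine simply runs for the life of the program. It is not go-zero's source, and names such as newPeriodicalExecutor are illustrative. Note that both paths call wg.Add(1) before a batch can slip past Wait(): the commander path counts the batch before releasing confirmChan, and Flush() counts before taking tasks out of the container.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// periodicalExecutor is a simplified sketch of the design described above,
// not go-zero's source. Its background goroutine runs until the program exits.
type periodicalExecutor struct {
	commander   chan []int     // full batches handed over by Add()
	confirmChan chan struct{}  // releases Add() once the batch is counted
	lock        sync.Mutex     // guards container across goroutines
	container   []int          // temporarily stored tasks
	maxTasks    int            // container size that triggers a hand-off
	execute     func([]int)    // the business function, e.g. a batch insert
	wg          sync.WaitGroup // counts batches that have not finished
}

func newPeriodicalExecutor(maxTasks int, interval time.Duration, execute func([]int)) *periodicalExecutor {
	pe := &periodicalExecutor{
		commander:   make(chan []int, 1),
		confirmChan: make(chan struct{}),
		maxTasks:    maxTasks,
		execute:     execute,
	}
	go pe.backgroundFlush(interval)
	return pe
}

// Add buffers a task; a full container is handed to the background goroutine.
func (pe *periodicalExecutor) Add(task int) {
	pe.lock.Lock()
	pe.container = append(pe.container, task)
	var batch []int
	if len(pe.container) >= pe.maxTasks {
		batch, pe.container = pe.container, nil // RemoveAll
	}
	pe.lock.Unlock()
	if batch != nil {
		pe.commander <- batch
		<-pe.confirmChan // blocked until backgroundFlush has counted the batch
	}
}

// Flush executes whatever is buffered, even a partial batch.
func (pe *periodicalExecutor) Flush() {
	pe.wg.Add(1) // count first so that Wait() cannot miss this batch
	pe.lock.Lock()
	batch := pe.container
	pe.container = nil
	pe.lock.Unlock()
	pe.executeTasks(batch)
}

// Wait blocks until every counted batch has finished.
func (pe *periodicalExecutor) Wait() { pe.wg.Wait() }

func (pe *periodicalExecutor) executeTasks(batch []int) {
	defer pe.wg.Done()
	if len(batch) > 0 {
		pe.execute(batch)
	}
}

// backgroundFlush serves both paths: batches from commander, and ticks
// that flush leftovers when Add() has not filled the container.
func (pe *periodicalExecutor) backgroundFlush(interval time.Duration) {
	ticker := time.NewTicker(interval)
	for {
		select {
		case batch := <-pe.commander:
			pe.wg.Add(1)                 // count the batch...
			pe.confirmChan <- struct{}{} // ...then release Add()
			pe.executeTasks(batch)
		case <-ticker.C:
			pe.Flush()
		}
	}
}

func main() {
	var mu sync.Mutex
	executed := 0
	pe := newPeriodicalExecutor(3, 50*time.Millisecond, func(batch []int) {
		mu.Lock()
		executed += len(batch)
		mu.Unlock()
	})
	for i := 0; i < 7; i++ {
		pe.Add(i) // two full batches go through commander
	}
	pe.Flush() // the seventh task would otherwise wait for a tick
	pe.Wait()  // all batches counted so far have finished
	fmt.Println("executed:", executed)
}
```

Whether the leftover task is taken by the explicit Flush() or by a tick that happens to fire first, it has been counted before main reaches Wait(), so the program prints `executed: 7`.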
While reading the source code, I thought about some other design questions. Have you wondered about similar things?
- Analyzing executors, you will find a lock taken in many places. Why? Running go test exposes a race condition, and locking avoids it.
- While analyzing confirmChan, I found that it was only introduced in a specific commit. Why was it designed like this?
Previously, wg.Add(1) was written in executeTasks(); now wg.Add(1) is called first, and only then is the confirmChan blocking released. In the old design, if the executor func blocked, Add() kept going because nothing blocked it, so the caller could quickly reach Executor.Wait(). That is where the problem appears: wg.Wait() could run before wg.Add(), and Wait() would return too early.
For details, see TestPeriodicalExecutor_WaitFast() in the latest version; you may wish to run it against the old version to reproduce the problem.
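The fixed ordering can be reproduced in a few lines. This is a hypothetical harness (submitAndWait is not a go-zero function) in which one goroutine plays backgroundFlush and the caller plays Add() followed by Wait(). Because wg.Add(1) runs before confirmChan is released, the caller's wg.Wait() always sees the in-flight batch, even when the executor func is slow.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// submitAndWait is a hypothetical harness: one goroutine plays
// backgroundFlush, the caller plays Add() followed by Wait().
func submitAndWait(task func()) {
	var wg sync.WaitGroup
	commander := make(chan func(), 1)
	confirmChan := make(chan struct{})

	go func() {
		t := <-commander
		wg.Add(1)                 // 1. count the batch first...
		confirmChan <- struct{}{} // 2. ...then let Add() return
		defer wg.Done()
		t() // executeTasks: may be slow or blocked
	}()

	commander <- task // Add(): hand over the batch
	<-confirmChan     // Add(): block until it has been counted
	wg.Wait()         // sees a counter of 1, so it waits for task()
}

func main() {
	finished := false
	submitAndWait(func() {
		time.Sleep(100 * time.Millisecond) // a slow executor func
		finished = true
	})
	fmt.Println("task finished before Wait returned:", finished)
}
```

This prints `task finished before Wait returned: true`. If wg.Add(1) were moved after the confirmChan send (for example into the task body, as in the old executeTasks()), wg.Wait() could observe a zero counter and return while the slow task is still running, which is the early return described above.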
There are a few more points in executors worth analyzing; I leave them for you to explore in the source code.
In short, the overall design:
- Follows interface-oriented design
- Makes flexible use of concurrency tools such as channels and sync.WaitGroup
- Combines an execution unit with a storage unit
There are many useful component tools in go-zero. Using them well helps a lot with service performance and development efficiency. I hope this article brings you something useful.