fx is a complete stream processing component.
It is similar to
fx also has a concurrent processing function:
Parallel(fn, options). But at the same time it is not only concurrent processing.
Reduce(fn), etc., read from the data source into a stream, process the stream data, and finally aggregate the stream data. Is it a bit like Java Lambda? If you were a Java developer before, you can understand the basic design when you see this.
Let's get an overview of how
fx is constructed as a whole:
The marked part is the most important part of the entire
- From APIs such as
From(fn), a data stream
- A collection of APIs for converting, aggregating, and evaluating
So list the currently supported
|Select a specific item type in fn and de-duplicate it|
|fn specifies specific rules, and the |
|According to fn, the elements in |
|Take out the first num elements in |
|Convert each ele to another corresponding ele and pass it to the next |
|Combine all |
|Reverse the element in |
|Sort elements in |
|Take out the last num elements of |
|Apply fn to every element of |
No longer generates a new
stream, do the final evaluation operation:
|Perform fn [evaluation operation] on all elements in |
|Concurrently apply the given fn and the given number of workers to each |
|Directly process |
|Do nothing, wait for all operations to complete|
Walk()receives and a
stream, transforms and reorganizes each
elein the stream to generate a new
- Finally, the
fmt.Println), storage (
map,slice), and persistence (
db operation) are performed by the
The function naming in
fx is semantically. Developers only need to know what kind of conversion is required for the business logic and call the matching function.
So here is a brief analysis of a few more typical functions.
Walk() is implemented as the bottom layer by multiple functions throughout
fx, such as
Map(), Filter(), etc.
So the essence is:
Walk() is responsible for concurrently applying the passed function to each
ele of the input stream and generating a new
Following the source code, it is divided into two sub-functions: custom count by
worker, default count is
buffered channelas a concurrent queue to limit the number of concurrent
waitgroupto ensure the completeness of the task completion
walkUnlimited(): also uses
waitgroup for concurrency control, because there is no custom concurrency limit, so there is no other
channel for concurrency control.
The introduction of this is mainly because the
ring is a doubly linked list, and the simple algorithm is still very interesting.
As for why
Tail() can take out the last n of the source, this is left for everyone to fine-tune. Here is my understanding:
Suppose there is the following scenario,
Here you can use the method of pulling apart the ring-shaped linked list, Loop-to-line，At this point, divide the symmetry axis by the full length, flip the extra elements, and the following elements are the parts needed by
The graph is used here for a clearer performance, but everyone should also look at the code. Algorithm to be tested
Analyzing the entire
fx, you will find that the overall design follows a design template:
channelas a container for streams
source, aggregate, and send to
channel -> stream
This concludes the basic introduction of
fx. If you are interested in other API source code, you can follow the API list above to read one by one.
At the same time, it is also recommended that you take a look at the API of
java stream, and you can have a deeper understanding of this
At the same time, there are many useful component tools in
go-zero. Good use of tools will greatly help improve service performance and development efficiency. I hope this article can bring you some gains.