Fx
fx
is a complete stream processing component.
It is similar to MapReduce
, fx
also has a concurrent processing function: Parallel(fn, options)
. But at the same time it is not only concurrent processing. From(chan)
, Map(fn)
, Filter(fn)
, Reduce(fn)
, etc., read from the data source into a stream, process the stream data, and finally aggregate the stream data. Is it a bit like Java Lambda? If you were a Java developer before, you can understand the basic design when you see this.
#
Overall APILet's get an overview of how fx
is constructed as a whole:
The marked part is the most important part of the entire fx
:
- From APIs such as
From(fn)
, a data streamStream
is generated - A collection of APIs for converting, aggregating, and evaluating
Stream
So list the currently supported Stream API
:
API | Function |
---|---|
Distinct(fn) | Select a specific item type in fn and de-duplicate it |
Filter(fn, option) | fn specifies specific rules, and the element that meets the rules is passed to the next stream |
Group(fn) | According to fn, the elements in stream are divided into different groups |
Head(num) | Take out the first num elements in stream and generate a new stream |
Map(fn, option) | Convert each ele to another corresponding ele and pass it to the next stream |
Merge() | Combine all ele into one slice and generate a new stream |
Reverse() | Reverse the element in stream . [Use double pointer] |
Sort(fn) | Sort elements in stream according to fn |
Tail(num) | Take out the last num elements of stream to generate a new stream . [Using a doubly linked list] |
Walk(fn, option) | Apply fn to every element of source . Generate a new stream |
No longer generates a new stream
, do the final evaluation operation:
API | Function |
---|---|
ForAll(fn) | Process stream according to fn, and no longer generate stream [evaluation operation] |
ForEach(fn) | Perform fn [evaluation operation] on all elements in stream |
Parallel(fn, option) | Concurrently apply the given fn and the given number of workers to each element [evaluation operation] |
Reduce(fn) | Directly process stream [evaluation operation] |
Done() | Do nothing, wait for all operations to complete |
#
How to use?From()
generatesstream
from aslice
Walk()
receives and astream
, transforms and reorganizes eachele
in the stream to generate a newstream
- Finally, the
stream
output (fmt.Println
), storage (map,slice
), and persistence (db operation
) are performed by theevaluation operation
#
Briefly analyzeThe function naming in fx
is semantically. Developers only need to know what kind of conversion is required for the business logic and call the matching function.
So here is a brief analysis of a few more typical functions.
#
Walk()Walk()
is implemented as the bottom layer by multiple functions throughout fx
, such as Map(), Filter()
, etc.
So the essence is: Walk()
is responsible for concurrently applying the passed function to each ele
of the input stream and generating a new stream
.
Following the source code, it is divided into two sub-functions: custom count by worker
, default count is worker
- Use
buffered channel
as a concurrent queue to limit the number of concurrent waitgroup
to ensure the completeness of the task completion
Another walkUnlimited()
: also uses waitgroup
for concurrency control, because there is no custom concurrency limit, so there is no other channel
for concurrency control.
#
Tail()The introduction of this is mainly because the ring
is a doubly linked list, and the simple algorithm is still very interesting.
As for why Tail()
can take out the last n of the source, this is left for everyone to fine-tune. Here is my understanding:
tip
Suppose there is the following scenario,Tail(5)
stream size
:7ring size
:5
Here you can use the method of pulling apart the ring-shaped linked list, Loop-to-line,At this point, divide the symmetry axis by the full length, flip the extra elements, and the following elements are the parts needed by Tail(5)
.
tip
The graph is used here for a clearer performance, but everyone should also look at the code. Algorithm to be tested
#
Stream Transform DesignAnalyzing the entire fx
, you will find that the overall design follows a design template:
channel
as a container for streams- Open
goroutine
to convertsource
, aggregate, and send tochannel
- Processed,
close(outputStream)
channel -> stream
#
SummaryThis concludes the basic introduction of fx
. If you are interested in other API source code, you can follow the API list above to read one by one.
At the same time, it is also recommended that you take a look at the API of java stream
, and you can have a deeper understanding of this stream call
.
At the same time, there are many useful component tools in go-zero
. Good use of tools will greatly help improve service performance and development efficiency. I hope this article can bring you some gains.