Stream processing
Stream processing is a programming paradigm in which, given a sequence of data (the stream source), a series of operations (functions) is applied to each element in the stream. Stream processing tools can also significantly improve development efficiency, helping programmers write effective, clean, and concise code.
Streaming data processing is very common in daily work. For example, business applications often record many logs. These logs are usually sent to Kafka first, then consumed by a job and written to Elasticsearch. Along the way the logs are often processed: invalid logs are filtered out, some calculations are performed, logs are recombined, and so on. The schematic diagram is as follows:
fx
go-zero is a full-featured microservice framework. It ships with many useful built-in tools, including the stream processing tool fx. Let's use a simple example to understand the tool:
The inputStream function simulates the generation of stream data, and the outputStream function simulates its processing. The From function is the input of the stream. The Walk function acts on each item concurrently. The Filter function filters items: an item is kept when the function returns true and dropped when it returns false. The ForEach function iterates over the stream and outputs each item.
Intermediate operations of streaming data processing
A stream may pass through many intermediate operations, and each of them acts on the stream. Like workers on an assembly line, where each worker hands back a new, processed part after operating on it, each intermediate operation returns a new stream once it completes.
Intermediate operations of fx stream processing:
Operation function | Description | Input |
---|---|---|
Distinct | Remove duplicate items | KeyFunc, returns the key used for deduplication |
Filter | Filter out items that do not meet the condition | FilterFunc, Option controls the amount of concurrency |
Group | Group items | KeyFunc, group by key |
Head | Take the first n items and return a new stream | int64, number of items to keep |
Map | Object conversion | MapFunc, Option controls the amount of concurrency |
Merge | Merge all items into a slice and generate a new stream | |
Reverse | Reverse the order of items | |
Sort | Sort items | LessFunc, defines the ordering |
Tail | Similar to Head, but takes the last n items and returns a new stream | int64, number of items to keep |
Walk | Act on each item | WalkFunc, Option controls the amount of concurrency |
The following figure shows each step and the result of each step:
Usage and principle analysis
From
The From function constructs a stream and returns a Stream. The stream data is stored in a channel.
Filter
The Filter function filters items. FilterFunc defines the filtering logic: an item is kept when it returns true and dropped when it returns false.
Group
Group groups the stream data by a key that the caller defines. After grouping, each group is stored in the channel as a slice.
Reverse
Reverse reverses the order of the elements in the stream.
Distinct
Distinct de-duplicates the elements in the stream. De-duplication is common in business development, for example when de-duplicating user IDs.
Walk
Walk applies a function to each item in the stream concurrently. You can set the number of concurrent workers with WithWorkers. The default is 16 and the minimum is 1. If unlimitedWorkers is set to true, the number of workers is unlimited, but the data written concurrently into the stream is still bounded by defaultWorkers. In WalkFunc, the user decides which elements are written to the downstream stream, and may write multiple elements or none at all.
Concurrent processing
In addition to stream processing, fx also provides function-level concurrency. In microservices, implementing a feature often depends on several services. Calling those dependencies concurrently can effectively reduce the time spent waiting on them and improve service performance.
Note that fx.Parallel does not return errors when it runs dependencies in parallel. If you need an error to be returned, or need a failing dependency to end the remaining requests immediately, use the MapReduce tool instead.
Summary
This article introduces the basic concepts of stream processing and fx, the stream processing tool in go-zero. Stream processing scenarios are common in real production. I hope this article gives you some inspiration and helps you handle stream processing scenarios at work.