FX Pipeline
The fx package provides a powerful and flexible API for stream processing. It allows users to perform various operations on streams, including filtering, mapping, reducing, grouping, and more.
Function Types
Section titled “Function Types”FilterFunc
Section titled “FilterFunc”Defines the method to filter a Stream.
type FilterFunc func(item any) boolForAllFunc
Section titled “ForAllFunc”Defines the method to handle all elements in a Stream.
type ForAllFunc func(pipe <-chan any)ForEachFunc
Section titled “ForEachFunc”Defines the method to handle each element in a Stream.
type ForEachFunc func(item any)GenerateFunc
Section titled “GenerateFunc”Defines the method to send elements into a Stream.
type GenerateFunc func(source chan<- any)KeyFunc
Section titled “KeyFunc”Defines the method to generate keys for the elements in a Stream.
type KeyFunc func(item any) anyLessFunc
Section titled “LessFunc”Defines the method to compare the elements in a Stream.
type LessFunc func(a, b any) boolMapFunc
Section titled “MapFunc”Defines the method to map each element to another object in a Stream.
type MapFunc func(item any) anyOption
Section titled “Option”Defines the method to customize a Stream.
type Option func(opts *rxOptions)ParallelFunc
Section titled “ParallelFunc”Defines the method to handle elements parallelly.
type ParallelFunc func(item any)ReduceFunc
Section titled “ReduceFunc”Defines the method to reduce all the elements in a Stream.
type ReduceFunc func(pipe <-chan any) (any, error)WalkFunc
Section titled “WalkFunc”Defines the method to walk through all the elements in a Stream.
type WalkFunc func(item any, pipe chan<- any)Stream
Section titled “Stream”A Stream is a stream that can be used to do stream processing.
type Stream struct { source <-chan any}Functions
Section titled “Functions”Concat
Section titled “Concat”Returns a concatenated Stream.
func Concat(s Stream, others ...Stream) StreamConstructs a Stream from the given GenerateFunc.
func From(generate GenerateFunc) StreamConverts the given arbitrary items to a Stream.
func Just(items ...any) StreamConverts the given channel to a Stream.
func Range(source <-chan any) StreamStream Methods
Section titled “Stream Methods”AllMatch
Section titled “AllMatch”Returns whether all elements of this stream match the provided predicate.
func (s Stream) AllMatch(predicate func(item any) bool) boolAnyMatch
Section titled “AnyMatch”Returns whether any elements of this stream match the provided predicate.
func (s Stream) AnyMatch(predicate func(item any) bool) boolBuffer
Section titled “Buffer”Buffers the items into a queue with size n.
func (s Stream) Buffer(n int) StreamConcat
Section titled “Concat”Returns a Stream that concatenated other streams.
func (s Stream) Concat(others ...Stream) StreamCounts the number of elements in the result.
func (s Stream) Count() (count int)Distinct
Section titled “Distinct”Removes the duplicated items based on the given KeyFunc.
func (s Stream) Distinct(fn KeyFunc) StreamWaits all upstreaming operations to be done.
func (s Stream) Done()Filter
Section titled “Filter”Filters the items by the given FilterFunc.
func (s Stream) Filter(fn FilterFunc, opts ...Option) StreamReturns the first item, nil if no items.
func (s Stream) First() anyForAll
Section titled “ForAll”Handles the streaming elements from the source and no later streams.
func (s Stream) ForAll(fn ForAllFunc)ForEach
Section titled “ForEach”Seals the Stream with the ForEachFunc on each item, no successive operations.
func (s Stream) ForEach(fn ForEachFunc)Groups the elements into different groups based on their keys.
func (s Stream) Group(fn KeyFunc) StreamReturns the first n elements in p.
func (s Stream) Head(n int64) StreamReturns the last item, or nil if no items.
func (s Stream) Last() (item any)Converts each item to another corresponding item, which means it’s a 1:1 model.
func (s Stream) Map(fn MapFunc, opts ...Option) StreamReturns the maximum item from the underlying source.
func (s Stream) Max(less LessFunc) anyMerges all the items into a slice and generates a new stream.
func (s Stream) Merge() StreamReturns the minimum item from the underlying source.
func (s Stream) Min(less LessFunc) anyNoneMatch
Section titled “NoneMatch”Returns whether all elements of this stream don’t match the provided predicate.
func (s Stream) NoneMatch(predicate func(item any) bool) boolParallel
Section titled “Parallel”Applies the given ParallelFunc to each item concurrently with given number of workers.
func (s Stream) Parallel(fn ParallelFunc, opts ...Option)Reduce
Section titled “Reduce”Is a utility method to let the caller deal with the underlying channel.
func (s Stream) Reduce(fn ReduceFunc) (any, error)Reverse
Section titled “Reverse”Reverses the elements in the stream.
func (s Stream) Reverse() StreamReturns a Stream that skips n elements.
func (s Stream) Skip(n int64) StreamSorts the items from the underlying source.
func (s Stream) Sort(less LessFunc) StreamSplits the elements into chunks with size up to n.
func (s Stream) Split(n int) StreamReturns the last n elements in p.
func (s Stream) Tail(n int64) StreamLets the callers handle each item. The caller may write zero, one, or more items based on the given item.
func (s Stream) Walk(fn WalkFunc, opts ...Option) StreamOptions
Section titled “Options”UnlimitedWorkers
Section titled “UnlimitedWorkers”Lets the caller use as many workers as the tasks.
func UnlimitedWorkers() OptionWithWorkers
Section titled “WithWorkers”Lets the caller customize the concurrent workers.
func WithWorkers(workers int) OptionExamples
Section titled “Examples”Here are some examples demonstrating how to use the fx package for stream processing operations.
Example 1: Creating a Stream from an Array and Filtering Elements
Section titled “Example 1: Creating a Stream from an Array and Filtering Elements”package main
import ( "fmt"
"github.com/zeromicro/go-zero/core/fx")
func main() { items := []any{1, 2, 3, 4, 5}
stream := fx.Just(items...). Filter(func(item any) bool { return item.(int)%2 == 0 })
for item := range stream.source { fmt.Println(item) }}Explanation:
- Use the
Justmethod to convert an array to a stream. - Use the
Filtermethod to filter out even numbers. - Iterate over the stream and print the results.
Example 2: Processing Stream Elements in Parallel
Section titled “Example 2: Processing Stream Elements in Parallel”package main
import ( "fmt"
"github.com/zeromicro/go-zero/core/fx")
func main() { items := []any{1, 2, 3, 4, 5}
fx.Just(items...). Parallel(func(item any) { fmt.Printf("Processing %v\n", item) }, fx.WithWorkers(3))}Explanation:
- Use the
Justmethod to convert an array to a stream. - Use the
Parallelmethod to process each element in parallel, specifying 3 workers. - Print each element being processed.
Example 3: Sorting Elements in a Stream
Section titled “Example 3: Sorting Elements in a Stream”package main
import ( "fmt"
"github.com/zeromicro/go-zero/core/fx")
func main() { items := []any{4, 2, 5, 1, 3}
stream := fx.Just(items...). Sort(func(a, b any) bool { return a.(int) < b.(int) })
for item := range stream.source { fmt.Println(item) }}Explanation:
- Use the
Justmethod to convert an array to a stream. - Use the
Sortmethod to sort the elements. - Iterate over the stream and print the sorted results.
Example 4: Reducing Elements in a Stream
Section titled “Example 4: Reducing Elements in a Stream”package main
import ( "fmt"
"github.com/zeromicro/go-zero/core/fx")
func main() { items := []any{1, 2, 3, 4, 5}
result, _ := fx.Just(items...). Reduce(func(pipe <-chan any) (any, error) { sum := 0 for item := range pipe { sum += item.(int) } return sum, nil })
fmt.Println(result) // Output: 15}Explanation:
- Use the
Justmethod to convert an array to a stream. - Use the
Reducemethod to calculate the sum of all elements in the stream. - Print the reduction result.
Example 5: Grouping Elements in a Stream
Section titled “Example 5: Grouping Elements in a Stream”package main
import ( "fmt"
"github.com/zeromicro/go-zero/core/fx")
func main() { items := []any{"apple", "banana", "avocado", "blueberry"}
stream := fx.Just(items...). Group(func(item any) any { return item.(string)[0] // Group by the first letter })
for group := range stream.source { fmt.Println(group) }}Explanation:
- Use the
Justmethod to convert an array to a stream. - Use the
Groupmethod to group elements by their first letter. - Iterate over the stream and print each group.
These examples demonstrate how the fx package can simplify data processing through stream operations. You can combine and apply these methods according to your specific requirements in your code.