Skip to content

FX Pipeline

The fx package provides a powerful and flexible API for stream processing. It allows users to perform various operations on streams, including filtering, mapping, reducing, grouping, and more.

Defines the method to filter a Stream.

type FilterFunc func(item any) bool

Defines the method to handle all elements in a Stream.

type ForAllFunc func(pipe <-chan any)

Defines the method to handle each element in a Stream.

type ForEachFunc func(item any)

Defines the method to send elements into a Stream.

type GenerateFunc func(source chan<- any)

Defines the method to generate keys for the elements in a Stream.

type KeyFunc func(item any) any

Defines the method to compare the elements in a Stream.

type LessFunc func(a, b any) bool

Defines the method to map each element to another object in a Stream.

type MapFunc func(item any) any

Defines the method to customize a Stream.

type Option func(opts *rxOptions)

Defines the method to handle elements parallelly.

type ParallelFunc func(item any)

Defines the method to reduce all the elements in a Stream.

type ReduceFunc func(pipe <-chan any) (any, error)

Defines the method to walk through all the elements in a Stream.

type WalkFunc func(item any, pipe chan<- any)

A Stream is a stream that can be used to do stream processing.

type Stream struct {
source <-chan any
}

Returns a concatenated Stream.

func Concat(s Stream, others ...Stream) Stream

Constructs a Stream from the given GenerateFunc.

func From(generate GenerateFunc) Stream

Converts the given arbitrary items to a Stream.

func Just(items ...any) Stream

Converts the given channel to a Stream.

func Range(source <-chan any) Stream

Returns whether all elements of this stream match the provided predicate.

func (s Stream) AllMatch(predicate func(item any) bool) bool

Returns whether any elements of this stream match the provided predicate.

func (s Stream) AnyMatch(predicate func(item any) bool) bool

Buffers the items into a queue with size n.

func (s Stream) Buffer(n int) Stream

Returns a Stream that concatenated other streams.

func (s Stream) Concat(others ...Stream) Stream

Counts the number of elements in the result.

func (s Stream) Count() (count int)

Removes the duplicated items based on the given KeyFunc.

func (s Stream) Distinct(fn KeyFunc) Stream

Waits all upstreaming operations to be done.

func (s Stream) Done()

Filters the items by the given FilterFunc.

func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream

Returns the first item, nil if no items.

func (s Stream) First() any

Handles the streaming elements from the source and no later streams.

func (s Stream) ForAll(fn ForAllFunc)

Seals the Stream with the ForEachFunc on each item, no successive operations.

func (s Stream) ForEach(fn ForEachFunc)

Groups the elements into different groups based on their keys.

func (s Stream) Group(fn KeyFunc) Stream

Returns the first n elements in p.

func (s Stream) Head(n int64) Stream

Returns the last item, or nil if no items.

func (s Stream) Last() (item any)

Converts each item to another corresponding item, which means it’s a 1:1 model.

func (s Stream) Map(fn MapFunc, opts ...Option) Stream

Returns the maximum item from the underlying source.

func (s Stream) Max(less LessFunc) any

Merges all the items into a slice and generates a new stream.

func (s Stream) Merge() Stream

Returns the minimum item from the underlying source.

func (s Stream) Min(less LessFunc) any

Returns whether all elements of this stream don’t match the provided predicate.

func (s Stream) NoneMatch(predicate func(item any) bool) bool

Applies the given ParallelFunc to each item concurrently with given number of workers.

func (s Stream) Parallel(fn ParallelFunc, opts ...Option)

Is a utility method to let the caller deal with the underlying channel.

func (s Stream) Reduce(fn ReduceFunc) (any, error)

Reverses the elements in the stream.

func (s Stream) Reverse() Stream

Returns a Stream that skips n elements.

func (s Stream) Skip(n int64) Stream

Sorts the items from the underlying source.

func (s Stream) Sort(less LessFunc) Stream

Splits the elements into chunks with size up to n.

func (s Stream) Split(n int) Stream

Returns the last n elements in p.

func (s Stream) Tail(n int64) Stream

Lets the callers handle each item. The caller may write zero, one, or more items based on the given item.

func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream

Lets the caller use as many workers as the tasks.

func UnlimitedWorkers() Option

Lets the caller customize the concurrent workers.

func WithWorkers(workers int) Option

Here are some examples demonstrating how to use the fx package for stream processing operations.

Example 1: Creating a Stream from an Array and Filtering Elements

Section titled “Example 1: Creating a Stream from an Array and Filtering Elements”
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/fx"
)
func main() {
items := []any{1, 2, 3, 4, 5}
stream := fx.Just(items...).
Filter(func(item any) bool {
return item.(int)%2 == 0
})
for item := range stream.source {
fmt.Println(item)
}
}

Explanation:

  1. Use the Just method to convert an array to a stream.
  2. Use the Filter method to filter out even numbers.
  3. Iterate over the stream and print the results.

Example 2: Processing Stream Elements in Parallel

Section titled “Example 2: Processing Stream Elements in Parallel”
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/fx"
)
func main() {
items := []any{1, 2, 3, 4, 5}
fx.Just(items...).
Parallel(func(item any) {
fmt.Printf("Processing %v\n", item)
}, fx.WithWorkers(3))
}

Explanation:

  1. Use the Just method to convert an array to a stream.
  2. Use the Parallel method to process each element in parallel, specifying 3 workers.
  3. Print each element being processed.
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/fx"
)
func main() {
items := []any{4, 2, 5, 1, 3}
stream := fx.Just(items...).
Sort(func(a, b any) bool {
return a.(int) < b.(int)
})
for item := range stream.source {
fmt.Println(item)
}
}

Explanation:

  1. Use the Just method to convert an array to a stream.
  2. Use the Sort method to sort the elements.
  3. Iterate over the stream and print the sorted results.
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/fx"
)
func main() {
items := []any{1, 2, 3, 4, 5}
result, _ := fx.Just(items...).
Reduce(func(pipe <-chan any) (any, error) {
sum := 0
for item := range pipe {
sum += item.(int)
}
return sum, nil
})
fmt.Println(result) // Output: 15
}

Explanation:

  1. Use the Just method to convert an array to a stream.
  2. Use the Reduce method to calculate the sum of all elements in the stream.
  3. Print the reduction result.
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/fx"
)
func main() {
items := []any{"apple", "banana", "avocado", "blueberry"}
stream := fx.Just(items...).
Group(func(item any) any {
return item.(string)[0] // Group by the first letter
})
for group := range stream.source {
fmt.Println(group)
}
}

Explanation:

  1. Use the Just method to convert an array to a stream.
  2. Use the Group method to group elements by their first letter.
  3. Iterate over the stream and print each group.

These examples demonstrate how the fx package can simplify data processing through stream operations. You can combine and apply these methods according to your specific requirements in your code.