fx
概述
fx 包提供了一系列用于流处理的函数和类型。流(Stream)是一个可以使用这些方法进行处理的数据集合。
函数类型
FilterFunc: 定义了过滤流元素的方法。ForAllFunc: 定义了处理所有流元素的方法。ForEachFunc: 定义了处理每个流元素的方法。GenerateFunc: 定义了向流中发送元素的方法。KeyFunc: 定义了生成流元素键值的方法。LessFunc: 定义了比较流元素的方法。MapFunc: 定义了将每个流元素映射到另一个对象的方法。Option: 定义了自定义流的方法。ParallelFunc: 定义了并行处理元素的方法。ReduceFunc: 定义了归约流元素的方法。WalkFunc: 定义了遍历所有流元素的方法。
Stream
type Stream struct {
source <-chan any // 流数据源
}
功能函数
创建流
Concat(s Stream, others ...Stream) Stream: 返回连接其他流的流。From(generate GenerateFunc) Stream: 从给定的生成函数构造一个流。Just(items ...any) Stream: 将给定的任意项目转换为流。Range(source <-chan any) Stream: 将给定的通道转换为流。
流操作
AllMatch(predicate func(item any) bool) bool: 返回流的所有元素是否满足提供的判断函数。AnyMatch(predicate func(item any) bool) bool: 返回流的任何元素是否满足提供的判断函数。Buffer(n int) Stream: 将项目缓冲到大小为n的队列中。Count() (count int): 计算结果中的元素数量。Distinct(fn KeyFunc) Stream: 根据给定的KeyFunc删除重复项。Done(): 等待所有上游操作完成。Filter(fn FilterFunc, opts ...Option) Stream: 根据给定的FilterFunc过滤项目。First() any: 返回第一个项目,如果没有项目则返回 nil。ForAll(fn ForAllFunc): 处理源中的流元素,并且没有后续的流操作。ForEach(fn ForEachFunc): 使用ForEachFunc处理每个项目,没有后续操作。Group(fn KeyFunc) Stream: 根据键将元素分组。Head(n int64) Stream: 返回前 n 个元素。Last() (item any): 返回最后一个项目,如果没有项目则返回 nil。Map(fn MapFunc, opts ...Option) Stream: 将每个项目映射到另一个相应的项目。Max(less LessFunc) any: 返回源中的最大项目。Merge() Stream: 将所有项目合并到一个切片中并生成一个新的流。Min(less LessFunc) any: 返回源中的最小项目。NoneMatch(predicate func(item any) bool) bool: 返回流的所有元素不满足提供的谓词。Parallel(fn ParallelFunc, opts ...Option): 并行应用给定的ParallelFunc到每个项目。Reduce(fn ReduceFunc) (any, error): 一个实用方法,用于处理底层通道。Reverse() Stream: 反转流中的元素。Skip(n int64) Stream: 返回跳过前 n 个元素后的流。Sort(less LessFunc) Stream: 对底层源中的项目进行排序。Split(n int) Stream: 将元素拆分为大小最多为 n 的块。Tail(n int64) Stream: 返回最后 n 个元素。Walk(fn WalkFunc, opts ...Option) Stream: 让调用者处理每个项目,调用者可以根据给定的项目写入零个、一个或多个项目。
配置选项
UnlimitedWorkers() Option: 允许调用者使用与任务一样多的工作线程。WithWorkers(workers int) Option: 允许调用者自定义并发工作线程数。
使用示例
以下是一些使用 fx 包的示例,展示了如何进行流处理操作。
示例 1: 从数组创建流并过滤元素
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/fx"
)
func main() {
items := []any{1, 2, 3, 4, 5}
stream := fx.Just(items...).
Filter(func(item any) bool {
return item.(int)%2 == 0
})
for item := range stream.source {
fmt.Println(item)
}
}
解释:
- 使用
Just方法将数组转换为流。 - 使用
Filter过滤出偶数元素。 - 遍历流并打印结果。
示例 2: 并行处理流中的元素
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/fx"
)
func main() {
items := []any{1, 2, 3, 4, 5}
fx.Just(items...).
Parallel(func(item any) {
fmt.Printf("Processing %v\n", item)
}, fx.WithWorkers(3))
}
解释:
- 使用
Just方法将数组转换为流。 - 使用
Parallel方法并行处理每个元素,指定使用 3 个工作线程。 - 打印正在处理的每个元素。
示例 3: 对流中的元素进行排序
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/fx"
)
func main() {
items := []any{4, 2, 5, 1, 3}
stream := fx.Just(items...).
Sort(func(a, b any) bool {
return a.(int) < b.(int)
})
for item := range stream.source {
fmt.Println(item)
}
}
解释:
- 使用
Just方法将数组转换为流。 - 使用
Sort方法对元素进行排序。 - 遍历流并打印排序后的结果。
示例 4: 归约流中的元素
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/fx"
)
func main() {
items := []any{1, 2, 3, 4, 5}
result, _ := fx.Just(items...).
Reduce(func(pipe <-chan any) (any, error) {
sum := 0
for item := range pipe {
sum += item.(int)
}
return sum, nil
})
fmt.Println(result) // 输出:15
}
解释:
- 使用
Just方法将数组转换为流。 - 使用
Reduce方法计算流中所有元素的总和。 - 打印归约结果。
示例 5: 分组流中的元素
package main
import (
"fmt"
"github.com/zeromicro/go-zero/core/fx"
)
func main() {
items := []any{"apple", "banana", "avocado", "blueberry"}
stream := fx.Just(items...).
Group(func(item any) any {
return item.(string)[0] // 按首字母分组
})
for group := range stream.source {
fmt.Println(group)
}
}
解释:
- 使用
Just方法将数组转换为流。 - 使用
Group方法按首字母对元素进行分组。 - 遍历流并打印每个分组。
这些示例展示了 fx 包如何通过流操作简化数据处理。可以根据实际需求在代码中组合和应用这些方法。