MapReduce
Overview
Section titled “Overview”The mr package provides a framework for performing map-reduce operations in Go. It supports concurrent execution of mapping and reducing functions with customizable settings.
Errors
Section titled “Errors”var ( ErrCancelWithNil = errors.New("mapreduce cancelled with nil") ErrReduceNoOutput = errors.New("reduce not writing value"))- ErrCancelWithNil: Error indicating that the map-reduce operation was canceled with nil.
- ErrReduceNoOutput: Error indicating that the reduce function did not produce any output.
ForEachFunc
Section titled “ForEachFunc”type ForEachFunc[T any] func(item T)Function type for processing each element without output.
GenerateFunc
Section titled “GenerateFunc”type GenerateFunc[T any] func(source chan<- T)Function type for generating elements to be processed.
MapFunc
Section titled “MapFunc”type MapFunc[T, U any] func(item T, writer Writer[U])Function type for processing an element and writing the output using a writer.
MapperFunc
Section titled “MapperFunc”type MapperFunc[T, U any] func(item T, writer Writer[U], cancel func(error))Function type for processing an element with support for cancellation.
ReducerFunc
Section titled “ReducerFunc”type ReducerFunc[U, V any] func(pipe <-chan U, writer Writer[V], cancel func(error))Function type for reducing output elements from the mapping stage into a final result.
VoidReducerFunc
Section titled “VoidReducerFunc”type VoidReducerFunc[U any] func(pipe <-chan U, cancel func(error))Function type for reducing output elements without producing a final result.
Option
Section titled “Option”type Option func(opts *mapReduceOptions)Function type for customizing map-reduce options.
Writer
Section titled “Writer”type Writer[T any] interface { Write(v T)}Interface for writing values.
Functions
Section titled “Functions”Finish
Section titled “Finish”func Finish(fns ...func() error) errorRuns functions in parallel and cancels on any error.
FinishVoid
Section titled “FinishVoid”func FinishVoid(fns ...func())Runs functions in parallel without output.
ForEach
Section titled “ForEach”func ForEach[T any](generate GenerateFunc[T], mapper ForEachFunc[T], opts ...Option)Maps all elements from the generate function but produces no output.
MapReduce
Section titled “MapReduce”func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V],opts ...Option) (V, error)Performs map-reduce operation using the provided generate function, mapper, and reducer.
MapReduceChan
Section titled “MapReduceChan”func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U], reducer ReducerFunc[U, V],opts ...Option) (V, error)Performs map-reduce operation using the provided source channel, mapper, and reducer.
MapReduceVoid
Section titled “MapReduceVoid”func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U],reducer VoidReducerFunc[U], opts ...Option) errorPerforms map-reduce operation using the provided generate function and mapper, but produces no final result.
WithContext
Section titled “WithContext”func WithContext(ctx context.Context) OptionCustomizes a map-reduce operation to use a given context.
WithWorkers
Section titled “WithWorkers”func WithWorkers(workers int) OptionCustomizes a map-reduce operation to use a specified number of workers.
Below are some examples demonstrating various functionalities of the mr package:
Example 1: Processing Each Element (ForEach)
Section titled “Example 1: Processing Each Element (ForEach)”package main
import ( "fmt"
"github.com/zeromicro/go-zero/core/mr")
func main() { generateFunc := func(source chan<- int) { for i := 0; i < 10; i++ { source <- i } }
mapperFunc := func(item int) { fmt.Println("Processing item:", item) }
mr.ForEach(generateFunc, mapperFunc, mr.WithWorkers(4))}Example 2: Simple MapReduce Operation
Section titled “Example 2: Simple MapReduce Operation”package main
import ( "fmt"
"github.com/zeromicro/go-zero/core/mr")
func main() { generateFunc := func(source chan<- int) { for i := 0; i < 10; i++ { source <- i } }
mapperFunc := func(item int, writer mr.Writer[int], cancel func(error)) { writer.Write(item * 2) }
reducerFunc := func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) { sum := 0 for v := range pipe { sum += v } writer.Write(sum) }
result, err := mr.MapReduce(generateFunc, mapperFunc, reducerFunc, mr.WithWorkers(4)) if err != nil { fmt.Println("Error:", err) } else { fmt.Println("Result:", result) // Output: Result: 90 }}Example 3: MapReduce Operation with Cancellation
Section titled “Example 3: MapReduce Operation with Cancellation”package main
import ( "context" "fmt" "time"
"github.com/zeromicro/go-zero/core/mr")
func main() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel()
generateFunc := func(source chan<- int) { for i := 0; i < 100; i++ { source <- i time.Sleep(100 * time.Millisecond) } }
mapperFunc := func(item int, writer mr.Writer[int], cancel func(error)) { writer.Write(item * 2) }
reducerFunc := func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) { sum := 0 for v := range pipe { sum += v } writer.Write(sum) }
result, err := mr.MapReduce(generateFunc, mapperFunc, reducerFunc, mr.WithContext(ctx), mr.WithWorkers(4)) if err != nil { fmt.Println("Error:", err) // Expected to timeout } else { fmt.Println("Result:", result) }}Example 4: Parallel Execution of Multiple Functions (Finish and FinishVoid)
Section titled “Example 4: Parallel Execution of Multiple Functions (Finish and FinishVoid)”package main
import ( "fmt" "errors"
"github.com/zeromicro/go-zero/core/mr")
func main() { funcs := []func() error{ func() error { fmt.Println("Function 1 executed") return nil }, func() error { fmt.Println("Function 2 executed") return errors.New("error in function 2") }, }
err := mr.Finish(funcs...) if err != nil { fmt.Println("Finish encountered an error:", err) }
voidFuncs := []func(){ func() { fmt.Println("Void Function 1 executed") }, func() { fmt.Println("Void Function 2 executed") }, }
mr.FinishVoid(voidFuncs...)}These examples showcase different usages of the mr package from go-zero, including basic element processing, simple MapReduce operations, MapReduce operations with cancellation, and parallel execution of multiple functions. Choose and modify these examples according to your specific requirements.