// Package test is example code that implements MapReduce in Go.
package test
import (
	"fmt"
	"math"
	"sync"
)
// ChunkProcess splits the index range [0, length) into consecutive chunks of
// at most chunkSize items, runs procedure (the map step) on every chunk and
// hands each chunk's outcome to reduce (the reduce step).
//
// Parameters:
//   - length: total number of items; the call is a no-op when it is below 1.
//   - procedure: invoked as procedure(start, end) for the half-open range
//     [start, end); the call is a no-op when it is nil.
//   - reduce: receives the result and error returned by procedure together
//     with the chunk bounds; it may be nil, in which case results are dropped.
//   - maxConcurrent: upper bound on chunks processed at the same time; a
//     value of 1 or less selects serial processing.
//   - chunkSize: maximum number of items per chunk; a value below 1 cannot
//     partition the range and is treated as one chunk covering everything.
//
// When the whole range fits into a single chunk the work is always done
// serially, regardless of maxConcurrent.
func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
	reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
	if length < 1 || procedure == nil {
		return
	}
	// A non-positive chunk size would make the chunk count a division by zero
	// (or negative) and would never advance through the range, so fall back
	// to processing everything as one chunk.
	if chunkSize < 1 {
		chunkSize = length
	}
	if maxConcurrent <= 1 || length <= chunkSize {
		doChunkProcessSerially(length, procedure, reduce, chunkSize)
		return
	}
	doChunkProcessConcurrently(length, procedure, reduce, maxConcurrent, chunkSize)
}
// doChunkProcessSerially walks the range [0, length) chunk by chunk on the
// calling goroutine, running procedure and then reduce for each chunk in
// ascending order of start index.
//
// Parameters:
//   - length: total number of items; nothing is done when it is below 1.
//   - procedure: map step, invoked as procedure(start, end) for the half-open
//     range [start, end); nothing is done when it is nil.
//   - reduce: receives the result and error of procedure plus the chunk
//     bounds; when nil, procedure still runs and its outcome is dropped.
//   - chunkSize: maximum number of items per chunk; a value below 1 is
//     treated as a single chunk covering the whole range.
//
// A panic raised by procedure is converted into an error and delivered to
// reduce, so a failed chunk is reported instead of silently disappearing.
// A panic raised by reduce itself is recovered and dropped so the remaining
// chunks are still processed.
func doChunkProcessSerially(length int, procedure func(start, end int) (interface{}, error),
	reduce func(partialResult interface{}, partialErr error, start, end int), chunkSize int) {
	if length < 1 || procedure == nil {
		return
	}
	// A non-positive chunk size would turn the chunk count below into a
	// division by zero (or a negative count); use one chunk instead.
	if chunkSize < 1 {
		chunkSize = length
	}
	// Number of chunks needed to cover the whole range.
	chunkNums := int(math.Ceil(float64(length) / float64(chunkSize)))
	for i := 0; i < chunkNums; i++ {
		start := i * chunkSize
		// Clamp the last chunk to length; comparing the remaining size avoids
		// overflowing start+chunkSize for very large chunk sizes.
		end := length
		if length-start > chunkSize {
			end = start + chunkSize
		}

		// Map step: turn a panic into an ordinary error for this chunk.
		response, err := func() (result interface{}, err error) {
			defer func() {
				if r := recover(); r != nil {
					result = nil
					err = fmt.Errorf("procedure panicked on chunk [%d, %d): %v", start, end, r)
				}
			}()
			return procedure(start, end)
		}()

		if reduce == nil {
			continue
		}
		// Reduce step: isolated in its own function so that a panic here is
		// contained to this chunk and the loop carries on.
		func() {
			defer func() {
				// There is no channel through which a failing reduce could be
				// reported, so the panic is deliberately discarded.
				_ = recover()
			}()
			reduce(response, err, start, end)
		}()
	}
}
// doChunkProcessConcurrently splits the range [0, length) into chunks of at
// most chunkSize items and processes them in separate goroutines, returning
// only after every chunk has finished.
//
// Parameters:
//   - length: total number of items; nothing is done when it is below 1.
//   - procedure: map step, invoked as procedure(start, end) for the half-open
//     range [start, end); it runs concurrently for different chunks. Nothing
//     is done when it is nil.
//   - reduce: receives the result and error of procedure plus the chunk
//     bounds. Calls are serialized by a mutex, so they never overlap, but
//     they arrive in no particular order. When nil, outcomes are dropped.
//   - maxConcurrent: maximum number of chunk goroutines alive at once; a
//     value below 1 is raised to 1 because a zero-capacity throttle would
//     block forever and a negative capacity makes make panic.
//   - chunkSize: maximum number of items per chunk; a value below 1 is
//     treated as a single chunk covering the whole range.
//
// A panic raised by procedure is converted into an error and delivered to
// reduce. A panic raised by reduce itself is recovered and dropped so that it
// cannot crash the process or leave the other chunks unprocessed.
func doChunkProcessConcurrently(length int, procedure func(start, end int) (interface{}, error),
	reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
	if length < 1 || procedure == nil {
		return
	}
	// A non-positive chunk size would never advance through the range.
	if chunkSize < 1 {
		chunkSize = length
	}
	if maxConcurrent < 1 {
		maxConcurrent = 1
	}
	// Number of chunks; the same value drives both the WaitGroup counter and
	// the spawn loop so the two can never disagree.
	lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))

	// Serializes reduce calls.
	var lock sync.Mutex
	// Tracks completion of every chunk goroutine.
	var wg sync.WaitGroup
	wg.Add(lengthTask)
	// Counting semaphore: one slot per goroutine allowed to run.
	throttleChan := make(chan struct{}, maxConcurrent)

	for i := 0; i < lengthTask; i++ {
		start := i * chunkSize
		// Clamp the last chunk to length without overflowing start+chunkSize.
		end := length
		if length-start > chunkSize {
			end = start + chunkSize
		}

		// Blocks while maxConcurrent goroutines are still running.
		throttleChan <- struct{}{}
		go func(start, end int) {
			defer func() {
				// Only a panic from reduce can reach this point; it has
				// nowhere to be reported, so it is deliberately discarded.
				_ = recover()
				<-throttleChan
				wg.Done()
			}()

			// Map step: turn a panic into an ordinary error for this chunk.
			response, err := func() (result interface{}, err error) {
				defer func() {
					if r := recover(); r != nil {
						result = nil
						err = fmt.Errorf("procedure panicked on chunk [%d, %d): %v", start, end, r)
					}
				}()
				return procedure(start, end)
			}()

			if reduce == nil {
				return
			}
			// Reduce step: the deferred Unlock runs before the recover above,
			// so the mutex is released even when reduce panics.
			lock.Lock()
			defer lock.Unlock()
			reduce(response, err, start, end)
		}(start, end)
	}
	wg.Wait()
}