使用golang实现一个MapReduce的示例代码

  package test

  import (

  "math"

  "sync"

  )

  // ChunkProcess splits the index range [0, length) into consecutive chunks of
  // at most chunkSize elements, runs procedure (the "map" step) on each chunk
  // and passes every chunk's outcome to reduce (the "reduce" step).
  //
  // Parameters:
  //   - length: number of elements to cover; nothing happens when it is below 1.
  //   - procedure: called with the half-open bounds [start, end) of one chunk.
  //     A nil procedure makes the call a no-op.
  //   - reduce: receives procedure's result and error plus the chunk bounds.
  //     It may be nil, in which case results are discarded.
  //   - maxConcurrent: upper bound on chunks processed at the same time; a
  //     value of 1 or less selects the serial path.
  //   - chunkSize: maximum number of elements per chunk; a value below 1 is
  //     treated as "one chunk covering the whole range".
  //
  // The function returns once every chunk has been handled. It reports nothing
  // itself; all results and errors are delivered through reduce.
  func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
  	reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
  	// Calling a nil procedure could only panic in every chunk, so bail out early.
  	if length < 1 || procedure == nil {
  		return
  	}

  	// A non-positive chunk size would make the chunk-count division meaningless
  	// and would stop the chunk cursor from ever advancing; fall back to a
  	// single chunk instead.
  	if chunkSize < 1 {
  		chunkSize = length
  	}

  	// With one worker, or when everything fits in one chunk, goroutines would
  	// only add overhead.
  	if maxConcurrent <= 1 || length <= chunkSize {
  		doChunkProcessSerially(length, procedure, reduce, chunkSize)
  		return
  	}

  	doChunkProcessConcurrently(length, procedure, reduce, maxConcurrent, chunkSize)
  }

  // doChunkProcessSerially processes [0, length) chunk by chunk on the calling
  // goroutine, in ascending order.
  //
  // For every chunk it calls procedure(start, end) and, when reduce is non-nil,
  // forwards the result and error to reduce together with the chunk bounds.
  // A panic raised by procedure or reduce is recovered and dropped, so the
  // affected chunk is skipped and the remaining chunks still run.
  //
  // A length below 1 is a no-op. A chunkSize below 1 is treated as a single
  // chunk spanning the whole range, because dividing by it would otherwise
  // yield an undefined chunk count.
  func doChunkProcessSerially(length int, procedure func(start, end int) (interface{}, error),
  	reduce func(partialResult interface{}, partialErr error, start, end int), chunkSize int) {
  	if length < 1 {
  		return
  	}
  	if chunkSize < 1 {
  		chunkSize = length
  	}

  	// Number of sub-tasks the range is split into (the last one may be short).
  	chunkNums := int(math.Ceil(float64(length) / float64(chunkSize)))

  	for i := 0; i < chunkNums; i++ {
  		start := i * chunkSize
  		end := start + chunkSize
  		if end > length {
  			end = length
  		}

  		// Each chunk runs in its own closure so that the deferred recover is
  		// scoped to that chunk only.
  		func() {
  			defer func() {
  				if r := recover(); r != nil {
  					// Hook for custom error handling; the panic is
  					// intentionally not propagated.
  				}
  			}()

  			// Map step.
  			response, err := procedure(start, end)

  			// Reduce step.
  			if reduce != nil {
  				reduce(response, err, start, end)
  			}
  		}()
  	}
  }

  // doChunkProcessConcurrently processes [0, length) in chunks of at most
  // chunkSize elements, running up to maxConcurrent chunks at the same time,
  // and returns after every chunk has finished.
  //
  // Each chunk runs in its own goroutine: procedure(start, end) is called
  // first, then, when reduce is non-nil, reduce is called under a mutex so that
  // reduce invocations never overlap. The order in which chunks reach reduce is
  // not defined. A panic raised by procedure or reduce is recovered and
  // dropped, so the affected chunk is skipped without stopping the others.
  //
  // Input handling:
  //   - length below 1: no-op.
  //   - chunkSize below 1: treated as one chunk spanning the whole range
  //     (otherwise the chunk cursor would never advance).
  //   - maxConcurrent below 1: treated as 1 (a zero-capacity throttle channel
  //     would block the very first chunk forever).
  func doChunkProcessConcurrently(length int, procedure func(start, end int) (interface{}, error),
  	reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
  	if length < 1 {
  		return
  	}
  	if chunkSize < 1 {
  		chunkSize = length
  	}
  	if maxConcurrent < 1 {
  		maxConcurrent = 1
  	}

  	// Serialises calls to reduce.
  	var lock sync.Mutex

  	// Tracks in-flight chunks; incremented once per goroutine actually started
  	// so the counter can never disagree with the number of Done calls.
  	var wg sync.WaitGroup

  	// Counting semaphore that caps the number of running chunks.
  	throttleChan := make(chan struct{}, maxConcurrent)

  	for start := 0; start < length; {
  		// Computed via subtraction so start+chunkSize cannot overflow.
  		end := length
  		if length-start > chunkSize {
  			end = start + chunkSize
  		}

  		// Blocks while maxConcurrent chunks are already running.
  		throttleChan <- struct{}{}
  		wg.Add(1)

  		go func(start, end int) {
  			// Deferred calls run last-in first-out: unlock (if taken),
  			// recover, free the throttle slot, then signal completion.
  			defer wg.Done()
  			defer func() { <-throttleChan }()
  			defer func() {
  				if r := recover(); r != nil {
  					// Hook for custom error handling; the panic is
  					// intentionally not propagated.
  				}
  			}()

  			// Map step.
  			response, err := procedure(start, end)

  			// Reduce step.
  			if reduce != nil {
  				lock.Lock()
  				defer lock.Unlock()
  				reduce(response, err, start, end)
  			}
  		}(start, end)

  		start = end
  	}

  	wg.Wait()
  }