
A Minimal MapReduce Implementation


0 Overview

MapReduce is a widely used programming model for distributed big-data computation. It was first published by Google, and Hadoop is its best-known open-source implementation.

The MapReduce programming model is very simple: as the name suggests, the user only needs to implement a Map function and a Reduce function.
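For the 6.824 lab, the two user-supplied functions take the following shapes; the KeyValue declaration below mirrors the lab skeleton and is shown here only for context (the full word-count example appears in section 5):

// KeyValue is the intermediate pair emitted by Map and consumed by Reduce
// (this mirrors the 6.824 lab skeleton).
type KeyValue struct {
	Key   string
	Value string
}

// The two user-supplied functions have these shapes:
//   Map(filename string, contents string) []KeyValue
//   Reduce(key string, values []string) string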

Examples of tasks it can solve include word count (the running example in this post), distributed grep, and inverted-index construction.

1 MapReduce Architecture

A picture is worth a thousand words:

(Figure: MapReduce execution overview; the numbered steps 2-6 referenced below refer to this diagram.)

2 Overall Design

The goal is to complete Lab 1 of MIT 6.824 (Spring 2021).

The code can be cloned with the following git command:

git clone git://g.csail.mit.edu/6.824-golabs-2021 6.824

The master assigns tasks lazily: the worker actively triggers task assignment, task completion, and the other operations, and the master hands different input splits to different workers.

Therefore the worker needs RPCs such as "get task" and "finish task"; their argument and reply types are defined as follows:

// GetTaskArgs is empty: the worker simply asks for whatever task is available.
type GetTaskArgs struct {
}

// GetTaskReply describes the task handed out by the master.
type GetTaskReply struct {
	Type      TaskType // MAP, REDUCE, WAIT or STOP
	Filenames []string // input file(s) for the task
	Task_no   int      // task number
	NReduce   int      // number of reduce tasks, used to partition map output
	Err       Errno
}

// FinishTaskArgs tells the master which task has been completed.
type FinishTaskArgs struct {
	Type    TaskType
	Task_no int
}

type FinishTaskReply struct {
	Err Errno
}

3 Worker Design

The worker's job is to repeatedly fetch a task and, once the task is done, report its completion.

Its main code is:

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.
	for {
		args := GetTaskArgs{}
		reply := GetTaskReply{}
		log.Printf("get task request: %v\n", args)
		ok := CallGetTask(&args, &reply)
		log.Printf("recv get task reply: %v\n", reply)
		if !ok || reply.Type == STOP {
			break
		}

		// handle the task according to its type
		switch reply.Type {
		case MAP:
			if len(reply.Filenames) < 1 {
				log.Fatalf("don't have filename")
			}
			DoMAP(reply.Filenames[0], reply.Task_no, reply.NReduce, mapf)
			// map complete, send msg to master
			finish_args := FinishTaskArgs{
				Type:    MAP,
				Task_no: reply.Task_no,
			}
			finish_reply := FinishTaskReply{}
			log.Printf("finish request: %v\n", finish_args)
			CallFinishTask(&finish_args, &finish_reply)
			log.Printf("recv finish reply: %v\n", finish_reply)
			// time.Sleep(time.Second)
		case REDUCE:
			if len(reply.Filenames) < 1 {
				log.Fatalf("don't have filenames")
			}
			DoReduce(reply.Filenames, reply.Task_no, reducef)
			// reduce complete, send msg to master
			finish_args := FinishTaskArgs{
				Type:    REDUCE,
				Task_no: reply.Task_no,
			}
			finish_reply := FinishTaskReply{}
			log.Printf("finish request: %v\n", finish_args)
			CallFinishTask(&finish_args, &finish_reply)
			log.Printf("recv finish reply: %v\n", finish_reply)
			// time.Sleep(time.Second)
		case WAIT:
			log.Printf("wait task\n")
			time.Sleep(time.Second)
		default:
			time.Sleep(time.Second)
		}
	}
}

Tasks come in four types: MAP, REDUCE, WAIT, and STOP.
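The post does not show the declarations of TaskType and Errno, so the following is only a sketch of what they presumably look like, using the names that appear in the code above:

type TaskType int

const (
	MAP TaskType = iota // run a map task
	REDUCE              // run a reduce task
	WAIT                // no task available right now; retry later
	STOP                // all work is done; the worker should exit
)

type Errno int

const (
	SuccessCode Errno = iota
	ParaErrCode // invalid parameters, e.g. task number out of range
)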

The most important parts are the execution of the map and reduce tasks.

The map task is executed as follows (corresponding to steps 2, 3, and 4 in the figure above):

func DoMAP(filename string, task_no int, nReduce int, mapf func(string, string) []KeyValue) {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()

	kva := mapf(filename, string(content))

	sort.Sort(ByKey(kva))

	log.Println("encode to json")
	files := make([]*os.File, nReduce)
	encoders := make([]*json.Encoder, nReduce)
	for i := 0; i < nReduce; i++ {
		ofile, err := ioutil.TempFile("", "mr-tmp*")
		if err != nil {
			log.Fatalf("cannot create temp file")
		}
		defer ofile.Close()

		encoder := json.NewEncoder(ofile)
		encoders[i] = encoder
		files[i] = ofile
	}

	// partition intermediate pairs into nReduce buckets by hashing the key
	var index int
	for _, kv := range kva {
		index = ihash(kv.Key) % nReduce
		err = encoders[index].Encode(&kv)
		if err != nil {
			log.Fatalf("cannot encode %v", kv)
		}
	}

	// atomically rename
	for i := 0; i < nReduce; i++ {
		filename_tmp := fmt.Sprintf("mr-%d-%d", task_no, i)
		err := os.Rename(files[i].Name(), filename_tmp)
		if err != nil {
			log.Fatalf("cannot rename %v to %v", files[i].Name(), filename_tmp)
		}
	}
}

An interesting detail is that map uses a hash function to make sure all pairs with the same key land in the same intermediate output file:

// ihash(key) % nReduce picks the reduce bucket for a key; masking with
// 0x7fffffff keeps the hash non-negative so the modulo result is a valid index.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

var index int
for _, kv := range kva {
	index = ihash(kv.Key) % nReduce
	err = encoders[index].Encode(&kv)
	if err != nil {
		log.Fatalf("cannot encode %v", kv)
	}
}

The reduce task is executed as follows (corresponding to steps 5 and 6 in the figure above):

func DoReduce(filenames []string, task_no int, reducef func(string, []string) string) {
	// read data from mid-file
	kva := make([]KeyValue, 0)
	for _, filename := range filenames {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		defer file.Close()
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
	}

	sort.Sort(ByKey(kva))

	// call Reduce on each distinct key in kva[],
	// and write the result to a temp file that is later renamed to mr-out-<task_no>.
	ofile, err := ioutil.TempFile("", "mr-out-tmp*")
	if err != nil {
		log.Fatalf("cannot create temp file")
	}
	defer ofile.Close()

	i := 0
	for i < len(kva) {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := reducef(kva[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)

		i = j
	}

	output_filename := fmt.Sprintf("mr-out-%d", task_no)
	err = os.Rename(ofile.Name(), output_filename)
	if err != nil {
		log.Fatalf("cannot rename %v to %v", ofile.Name(), output_filename)
	}
}

Strictly speaking, the files should be read from and written to GFS, but since that is not available here, the local UNIX file system is used directly.

4 Master Design

The master's design is fairly simple and holds only a small amount of state:

type Coordinator struct {
	tasks   []Task            // tasks of the current phase
	nReduce int               // number of reduce tasks
	nMap    int               // number of map tasks
	status  CoordinatorStatus // MAP_PHASE, REDUCE_PHASE or FINISH_PHASE
	mu      sync.Mutex        // protects all of the fields above
}
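The CoordinatorStatus type is not shown in the post either; a sketch consistent with the phase names used below would be:

type CoordinatorStatus int

const (
	MAP_PHASE CoordinatorStatus = iota // handing out map tasks
	REDUCE_PHASE                       // handing out reduce tasks
	FINISH_PHASE                       // everything is done
)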

The per-task information is defined as follows:

type TaskStatus int

const (
	idle TaskStatus = iota
	in_progress
	completed
)

type Task struct {
	tno       int
	filenames []string
	status    TaskStatus
	startTime time.Time // used to re-assign tasks that run for more than 10s
}

Its main job is to serve the worker's two RPC requests.

The get-task RPC handler is implemented as follows:

func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	finish_flag := c.IsAllFinish()
	if finish_flag {
		c.NextPhase()
	}
	for i := 0; i < len(c.tasks); i++ {
		if c.tasks[i].status == idle {
			log.Printf("send task %d to worker\n", i)
			reply.Err = SuccessCode
			reply.Task_no = i
			reply.Filenames = c.tasks[i].filenames
			if c.status == MAP_PHASE {
				reply.Type = MAP
				reply.NReduce = c.nReduce
			} else if c.status == REDUCE_PHASE {
				reply.NReduce = 0
				reply.Type = REDUCE
			} else {
				log.Fatal("unexpected status")
			}
			c.tasks[i].startTime = time.Now()
			c.tasks[i].status = in_progress
			return nil
		} else if c.tasks[i].status == in_progress {
			curr := time.Now()
			if curr.Sub(c.tasks[i].startTime) > time.Second*10 {
				log.Printf("resend task %d to worker\n", i)
				reply.Err = SuccessCode
				reply.Task_no = i
				reply.Filenames = c.tasks[i].filenames
				if c.status == MAP_PHASE {
					reply.Type = MAP
					reply.NReduce = c.nReduce
				} else if c.status == REDUCE_PHASE {
					reply.NReduce = 0
					reply.Type = REDUCE
				} else {
					log.Fatal("unexpected status")
				}
				c.tasks[i].startTime = time.Now()
				return nil
			}
		}
	}
	reply.Err = SuccessCode
	reply.Type = WAIT
	return nil
}

The finish-task RPC handler is implemented as follows:

func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if args.Task_no >= len(c.tasks) || args.Task_no < 0 {
		reply.Err = ParaErrCode
		return nil
	}
	c.tasks[args.Task_no].status = completed
	if c.IsAllFinish() {
		c.NextPhase()
	}
	return nil
}

Check whether all tasks are finished; if so, move to the next phase:

func (c *Coordinator) IsAllFinish() bool {
	for i := len(c.tasks) - 1; i >= 0; i-- {
		if c.tasks[i].status != completed {
			return false
		}
	}
	return true
}

func (c *Coordinator) NextPhase() {
	if c.status == MAP_PHASE {
		log.Println("change to REDUCE_PHASE")
		c.MakeReduceTasks()
		c.status = REDUCE_PHASE
	} else if c.status == REDUCE_PHASE {
		log.Println("change to FINISH_PHASE")
		c.status = FINISH_PHASE
	} else {
		log.Println("unexpected status change!")
	}
}
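NextPhase calls MakeReduceTasks, which is not shown in the post. A possible implementation (an assumption on my part, not the author's original code) builds one reduce task per partition and points it at the intermediate files mr-<map task>-<reduce task> written by DoMAP:

func (c *Coordinator) MakeReduceTasks() {
	tasks := make([]Task, 0, c.nReduce)
	for i := 0; i < c.nReduce; i++ {
		// reduce task i reads the i-th bucket of every map task's output
		filenames := make([]string, 0, c.nMap)
		for m := 0; m < c.nMap; m++ {
			filenames = append(filenames, fmt.Sprintf("mr-%d-%d", m, i))
		}
		tasks = append(tasks, Task{tno: i, filenames: filenames, status: idle})
	}
	c.tasks = tasks
}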

The main program checks whether the whole MapReduce job has finished via Done():

func (c *Coordinator) Done() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.status == FINISH_PHASE {
		return true
	}
	return false
}
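For reference, the lab's coordinator process simply polls Done() in a loop; roughly like this (the MakeCoordinator call comes from the lab skeleton and is an assumption here, since the post does not show it):

func main() {
	// os.Args[1:] are the input files, 10 is the number of reduce tasks
	c := mr.MakeCoordinator(os.Args[1:], 10)
	for !c.Done() {
		time.Sleep(time.Second)
	}
	time.Sleep(time.Second)
}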

5 How Does a Client Use It?

Just write the two functions, Map and Reduce:

//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
	// function to detect word separators.
	ff := func(r rune) bool { return !unicode.IsLetter(r) }

	// split contents into an array of words.
	words := strings.FieldsFunc(contents, ff)

	kva := []mr.KeyValue{}
	for _, w := range words {
		kv := mr.KeyValue{w, "1"}
		kva = append(kva, kv)
	}
	return kva
}

//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func Reduce(key string, values []string) string {
	// return the number of occurrences of this word.
	return strconv.Itoa(len(values))
}

6 Appendix

The full code can be found at:

Repository

commit
