
Lab1: Implementing a Simple MapReduce Framework

Author: Internet

1. Goals

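Lab1 ships a single-machine, sequential MR implementation that runs as-is. The task is to rework it into a Master-Slave (coordinator/worker) architecture. Lab1 also provides the calling scaffolding and an RPC example, so our work can focus on the MR framework itself. The main pieces are:

  1. Workers repeatedly request Map tasks, and the Coordinator hands Map Tasks out to them (one original input file corresponds to one Map Task).

  2. Map phase
    a) The Worker processes the input file. The Map function takes (filename string, content string), where filename is the input file name and content is that file's contents, and returns an array of KeyValue pairs;
    b) We gather the pairs with the same Key together, then use the ihash function provided by Lab1 to write all pairs with the same ihash(Key)%ReduceN into the same intermediate file, where ReduceN is the number of Reduce jobs set by Lab1. With M input files there are at most M*ReduceN intermediate files in total (fewer if some map task emits no key for a given partition).

  3. Workers repeatedly request Reduce tasks, and the Coordinator hands Reduce Tasks out to them (one ReduceID corresponds to one Reduce Job; as I understand it, a Job here is the batch of data the Master hands a Worker in one go, and the Worker must finish processing it before requesting the next Job).

  4. Reduce phase
    a) The Worker processes all files belonging to its ReduceId. Since one intermediate file may contain different Keys, and the same Key may appear in files from several map tasks, we first gather the pairs with the same Key and then pass each group to Reduce;
    b) Once Reduce finishes, write the results to the final output file.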
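To make step 2b concrete, here is a minimal Go sketch of the partitioning rule. The ihash body is the FNV-1a helper as I recall it from the lab's worker.go (treat it as an assumption); partition and intermediateName are hypothetical names used only for illustration. The property that matters is that every occurrence of a key lands in the same reduce bucket, so exactly one reducer sees all of that key's values.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash: FNV-1a with the sign bit cleared, so the result is never negative
// (assumed to match the helper the lab provides).
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// intermediateName builds the mr-X-Y name for map task X and reduce partition Y.
func intermediateName(mapTask, reduceID int) string {
	return fmt.Sprintf("mr-%d-%d", mapTask, reduceID)
}

// partition buckets keys by ihash(key) % nReduce (hypothetical helper).
func partition(keys []string, nReduce int) map[int][]string {
	buckets := make(map[int][]string)
	for _, k := range keys {
		r := ihash(k) % nReduce
		buckets[r] = append(buckets[r], k)
	}
	return buckets
}

func main() {
	keys := []string{"apple", "banana", "apple", "cherry"}
	// map iteration order is random, so the lines may come out in any order
	for r, ks := range partition(keys, 3) {
		fmt.Println(intermediateName(0, r), ks)
	}
}
```

Because a map task only creates files for the buckets it actually fills, the total number of intermediate files can stay below M*ReduceN.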

2. Implementation

2.1 Worker Side

worker.go

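Each Worker keeps asking the Master for tasks. The Master reports its current state (which phase the job is in) back to the Worker, and the Worker uses that state to decide whether to request a Map Task or a Reduce Task.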

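We wrap the Map function: before calling the application-level Map, we prepare the input data; after the call, we partition the output and write it to intermediate files.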

func DoMap(reduceMax int, mapDone *bool, mapf func(string, string) []KeyValue) {
    //get map task
    reply := GetMapFileReply{}
    getMapFile(&reply, mapDone)
    if reply.MaptaskNumber >= 0 {
        //read the whole input file
        content, err := ioutil.ReadFile(reply.Filename)
        if err != nil {
            log.Fatalf("cannot read %v", reply.Filename)
        }
        //call application map function
        kva := mapf(reply.Filename, string(content))
        sort.Sort(ByKey(kva))

        //group the values of each key, then bucket the groups by reduceId
        mapOutFileContent := make(map[int][]MapOut)
        i := 0
        for i < len(kva) {
            j := i + 1
            for j < len(kva) && kva[j].Key == kva[i].Key {
                j++
            }
            mapout := MapOut{Key: kva[i].Key}
            for k := i; k < j; k++ {
                mapout.Value = append(mapout.Value, kva[k].Value)
            }
            reduceId := ihash(kva[i].Key) % reduceMax
            mapOutFileContent[reduceId] = append(mapOutFileContent[reduceId], mapout)
            i = j
        }
        //write one temp file per reduceId; the Master later renames it to mr-X-Y
        mapOutTmp2Final := make(map[string]string)
        for reduceId, groups := range mapOutFileContent {
            filename := "mr-" + strconv.Itoa(reply.MaptaskNumber) + "-" + strconv.Itoa(reduceId)
            //create the temp file in the current directory so that os.Rename
            //does not have to cross filesystems
            file, err := ioutil.TempFile(".", "tmp-"+filename+"-*")
            if err != nil {
                log.Fatalf("cannot create temp file for %v: %v", filename, err)
            }
            enc := json.NewEncoder(file)
            for _, group := range groups {
                if err := enc.Encode(&group); err != nil {
                    log.Printf("encode failed: %v", err)
                }
            }
            file.Close()
            mapOutTmp2Final[file.Name()] = filename
        }
        //report this map task as done
        sendMapDone(reply.MaptaskNumber, reply.Filename, mapOutTmp2Final)
    } else if !*mapDone {
        //no idle map task right now: wait for the running ones to finish
        //(time.Sleep(1) would only sleep one nanosecond)
        time.Sleep(time.Second)
    }
}

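We wrap the Reduce function the same way: before calling the application-level Reduce, we gather and group the data; after the call, we write the results to the output file.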

func DoReduce(reduceDone *bool, reducef func(string, []string) string) {
    reply := GetReduceFileReply{}
    //get reduce task
    getReduceFile(&reply, reduceDone)
    if !*reduceDone && reply.ReduceId >= 0 {
        //read every intermediate file of this reduceId
        kva := []MapOut{}
        for _, filename := range reply.Filename {
            file, err := os.Open(filename)
            if err != nil {
                log.Fatalf("cannot open %v", filename)
            }
            dec := json.NewDecoder(file)
            for {
                var kv MapOut
                if err := dec.Decode(&kv); err != nil {
                    break
                }
                kva = append(kva, kv)
            }
            //close right away instead of deferring inside the loop
            file.Close()
        }
        //the same key may come from several map tasks: sort, then merge equal keys
        sort.Sort(MapOutByKey(kva))

        //write to a temp file; the Master renames it to mr-out-Y
        outFileName := "mr-out-" + strconv.Itoa(reply.ReduceId)
        outfile, err := ioutil.TempFile(".", "tmp-"+outFileName+"-*")
        if err != nil {
            log.Fatalf("cannot create temp file for %v: %v", outFileName, err)
        }
        i := 0
        for i < len(kva) {
            j := i + 1
            for j < len(kva) && kva[j].Key == kva[i].Key {
                j++
            }
            intermediate := []string{}
            for k := i; k < j; k++ {
                intermediate = append(intermediate, kva[k].Value...)
            }
            //call application reduce
            reduceRes := reducef(kva[i].Key, intermediate)
            fmt.Fprintf(outfile, "%v %v\n", kva[i].Key, reduceRes)
            i = j
        }
        //close before reporting, so the Master renames a complete file
        outfile.Close()
        sendReduceDone(reply.ReduceId, map[string]string{outfile.Name(): outFileName})
    } else if !*reduceDone {
        //no idle reduce task right now: wait for the running ones to finish
        time.Sleep(time.Second)
    }
}

After finishing a task, the worker tells the Master "I'm done", and the Master then checks whether all tasks of the current phase have been completed.
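A minimal sketch of such a "done" report using Go's net/rpc, with hypothetical Master, TaskDoneArgs and TaskDoneReply types. It runs over an in-memory net.Pipe so that it is self-contained; the lab's skeleton connects over a Unix-domain socket instead. Recording finished tasks in a set makes a duplicate report (for example from a worker that was considered timed out) harmless.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"sync"
)

// Hypothetical argument/reply types; the post does not show its own.
type TaskDoneArgs struct{ TaskID int }
type TaskDoneReply struct{ AllDone bool }

// Master tracks which tasks of the current phase have reported done.
type Master struct {
	mu    sync.Mutex
	done  map[int]bool
	total int
}

// TaskDone is an RPC handler: mark the task finished (idempotently) and tell
// the caller whether every task of the phase is now finished.
func (m *Master) TaskDone(args *TaskDoneArgs, reply *TaskDoneReply) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.done[args.TaskID] = true
	reply.AllDone = len(m.done) == m.total
	return nil
}

// newClient serves m on one end of an in-memory pipe and returns a client
// connected to the other end.
func newClient(m *Master) *rpc.Client {
	server := rpc.NewServer()
	if err := server.Register(m); err != nil {
		panic(err)
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeConn(serverConn)
	return rpc.NewClient(clientConn)
}

// report sends one TaskDone RPC and returns the Master's answer.
func report(client *rpc.Client, id int) bool {
	var reply TaskDoneReply
	if err := client.Call("Master.TaskDone", &TaskDoneArgs{TaskID: id}, &reply); err != nil {
		panic(err)
	}
	return reply.AllDone
}

func main() {
	m := &Master{done: map[int]bool{}, total: 2}
	client := newClient(m)
	defer client.Close()
	fmt.Println(report(client, 0)) // false: 1 of 2 tasks done
	fmt.Println(report(client, 1)) // true: phase finished
}
```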

2.2 Coordinator Side

The Master has to keep track of the current phase, the progress of each Task, each task's start time for timeout checks, and so on.

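type Coordinator struct {
    //map task state, keyed by input file name: -1 not started / 0 running / 1 finished
    mapTaskState map[string]int
    //start time (Unix seconds) of each running map task, used for the timeout check
    mapTaskTime map[string]int64
    //number assigned to the next map task handed out
    mapTaskNumber int
    //true once all map tasks have finished
    mapDone bool

    //intermediate map output files, grouped by reduceId
    mapOutFileArray map[int][]string

    //true once all reduce tasks have finished
    reduceDone bool
    //reduce task state: -1 not started / 0 running / 1 finished
    reduceTaskState map[int]int
    //start time (Unix seconds) of each running reduce task, used for the timeout check
    reduceTaskTime map[int]int64
    //number of reduce tasks, given by the caller
    nReduce int

    //protects the task state/time maps and mapOutFileArray
    taskStatLock sync.Mutex
    //protects the phase flags mapDone and reduceDone
    taskDone sync.Mutex
}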
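The -1/0/1 encoding is easy to mix up. One option, sketched here with hypothetical names (TaskState, claimIdle, allFinished), is a small named type plus helpers for the two questions the Coordinator keeps asking: is there an idle task, and is the phase finished?

```go
package main

import "fmt"

// TaskState names the -1 / 0 / 1 values kept in mapTaskState and reduceTaskState.
type TaskState int

const (
	NotStarted TaskState = -1
	Running    TaskState = 0
	Finished   TaskState = 1
)

func (s TaskState) String() string {
	switch s {
	case NotStarted:
		return "not started"
	case Running:
		return "running"
	case Finished:
		return "finished"
	}
	return fmt.Sprintf("TaskState(%d)", int(s))
}

// claimIdle marks one not-started task as running and returns its name;
// ok is false when every task is already running or finished.
// The caller is expected to hold the lock that guards states.
func claimIdle(states map[string]TaskState) (task string, ok bool) {
	for name, s := range states {
		if s == NotStarted {
			states[name] = Running
			return name, true
		}
	}
	return "", false
}

// allFinished reports whether the current phase can end.
func allFinished(states map[string]TaskState) bool {
	for _, s := range states {
		if s != Finished {
			return false
		}
	}
	return true
}

func main() {
	states := map[string]TaskState{"pg-a.txt": NotStarted, "pg-b.txt": Finished}
	task, ok := claimIdle(states)
	fmt.Println(task, ok, states[task]) // pg-a.txt true running
	fmt.Println(allFinished(states))     // false
}
```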

The Map and Reduce flows are similar; the Map flow is shown below.

First, pick a Map task that has not been started and hand it to the requesting Worker:

func (c *Coordinator) GetMapInFile(args *GetMapFileArgs, reply *GetMapFileReply) error {
    reply.MaptaskNumber = -1
    //read the phase flag under its own lock (the original never unlocked it when mapDone was true)
    c.taskDone.Lock()
    reply.MapDone = c.mapDone
    c.taskDone.Unlock()
    if reply.MapDone {
        return nil
    }
    c.taskStatLock.Lock()
    //the deferred Unlock also covers the "no idle task" path, which used to keep the lock held
    defer c.taskStatLock.Unlock()
    for task, state := range c.mapTaskState {
        //hand out a task that has not been started (-1)
        if state == -1 {
            reply.Filename = task
            reply.MaptaskNumber = c.mapTaskNumber
            c.mapTaskNumber++
            c.mapTaskState[task] = 0
            c.mapTaskTime[task] = time.Now().Unix()
            return nil
        }
    }
    //every map task is running or finished: MaptaskNumber stays -1, so the worker waits
    return nil
}

When a Map Task completes, record its state and check whether all tasks are finished so the job can move on to the next phase:

func (c *Coordinator) MapSingleFileDone(args *MapDoneArgs, reply *MapDoneReply) error {
    //hold the lock for the whole update: the state maps and mapOutFileArray are shared
    c.taskStatLock.Lock()
    defer c.taskStatLock.Unlock()
    reply.Y = args.MaptaskNumber
    //ignore duplicate reports, e.g. from a worker whose task had timed out and was re-issued
    if c.mapTaskState[args.Filename] == 1 {
        return nil
    }
    //set this map task done
    c.mapTaskState[args.Filename] = 1
    //rename temp files and record reduceId <-> intermediate file name
    for tmpfile, filename := range args.MapOutTmp2Final {
        if err := os.Rename(tmpfile, filename); err != nil {
            log.Printf("rename %v failed: %v", tmpfile, err)
        }
        suffix := strings.Split(filename, "-")
        reduceN, _ := strconv.Atoi(suffix[2])
        c.mapOutFileArray[reduceN] = append(c.mapOutFileArray[reduceN], filename)
    }
    //test if all map done
    for _, state := range c.mapTaskState {
        if state != 1 {
            return nil
        }
    }
    //if all map done, set reduce task state
    for key := range c.mapOutFileArray {
        c.reduceTaskState[key] = -1
    }
    c.taskDone.Lock()
    c.mapDone = true
    c.taskDone.Unlock()
    return nil
}

3. Tips from the Lab1 Handout

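  1. For every output file, first create a temporary file with ioutil.TempFile and rename it to its final name only after the task has finished. This way a Worker that exits or crashes midway never leaves a half-written file under a final name.
//create a temp file in the same directory as the final file, so os.Rename
//stays on one filesystem; the "tmp-" prefix keeps it from matching mr-out*
file, _ = ioutil.TempFile(".", "tmp-"+filename+"-*")
//record temp file -> final file name
mapOutTmp2Final[file.Name()] = filename
...
//rename to the final name; the first two steps run in the Worker and this one in the Master, which is why the temp paths are passed along in a map
os.Rename(tmpName, ReduceOutTmp2Final[tmpName])
  2. Every task needs a timeout check: if a task times out, it is handed to another Worker. I run the timeout check in a separate goroutine.

  3. Build and run with -race (for example go run -race) to detect data races, and add locks where races show up.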

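4. Results and Improvements

1. All tests pass.

2. Performance dropped noticeably after adding locks; the lock granularity and lock types need optimizing.

3. Tasks should be abstracted into structs and passed around through channels, which would make the flow simpler.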
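Improvement 3 could look roughly like this. It is a sketch with hypothetical names (Task, queue): a buffered channel holds the idle tasks, so handing one out and putting a timed-out task back need no explicit mutex, because channel operations are already synchronized.

```go
package main

import "fmt"

// Task bundles what the post keeps in several parallel maps.
type Task struct {
	Kind string // "map" or "reduce"
	ID   int
	File string // input file, for map tasks
}

// queue hands out idle tasks through a buffered channel.
type queue struct {
	idle chan Task
}

func newQueue(tasks []Task) *queue {
	q := &queue{idle: make(chan Task, len(tasks))}
	for _, t := range tasks {
		q.idle <- t
	}
	return q
}

// Get returns an idle task without blocking; ok=false means "wait and retry".
func (q *queue) Get() (t Task, ok bool) {
	select {
	case t = <-q.idle:
		return t, true
	default:
		return Task{}, false
	}
}

// Requeue puts a timed-out task back. The capacity equals the task count, so
// this does not block as long as each task is in the channel at most once.
func (q *queue) Requeue(t Task) { q.idle <- t }

func main() {
	q := newQueue([]Task{{Kind: "map", ID: 0, File: "pg-a.txt"}})
	t, ok := q.Get()
	fmt.Println(t.ID, ok) // 0 true
	_, ok = q.Get()
	fmt.Println(ok) // false: nothing idle, the worker should wait
	q.Requeue(t)    // e.g. after the timeout checker decides t's worker died
	t, ok = q.Get()
	fmt.Println(t.ID, ok) // 0 true
}
```

Phase completion would still need a counter of finished tasks (or a second channel of results), so the Coordinator knows when to build the reduce queue.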

Source: https://www.cnblogs.com/vstone/p/16649409.html