Lab 1: Implementing a Simple MapReduce Framework
Author: the internet
1. Goal
Lab 1 provides a single-machine, serial MR framework that runs as-is. Our job is to rewrite it into a master-slave architecture. Lab 1 also provides the driver code and an RPC example, so we can focus on the MR framework itself. The main pieces to implement are:
- Workers repeatedly request map tasks, and the Coordinator hands a Map Task to each Worker (one original input file corresponds to one Map Task).
- Map phase:
a) The Worker processes its input file. The map function takes (filename string, content string), where filename is the input file name and content is that file's contents, and returns an array of KV pairs.
b) We group pairs with the same key together, then use the ihash function provided by Lab 1 to write all pairs with the same ihash(Key) % ReduceN to the same intermediate file, where ReduceN is the number of reduce jobs configured by Lab 1. With M input files, the total number of intermediate files is at most M * ReduceN.
- Workers repeatedly request reduce tasks, and the Coordinator hands a Reduce Task to each Worker (one ReduceId corresponds to one Reduce Job; by "Job" I mean the batch of data the Master hands to a Worker in one shot, which the Worker must finish processing before requesting the next Job).
- Reduce phase:
a) The Worker processes all intermediate files for its ReduceId. Since one intermediate file may contain many different keys, we first group the pairs with the same key, then feed each group to reduce.
b) When reduce finishes, it writes the final output file.
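The partitioning rule above can be sketched as follows. The FNV-1a `ihash` below mirrors the helper Lab 1 ships in its skeleton code; treat it as an illustrative copy:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash mirrors the helper Lab 1 provides: FNV-1a, masked to a
// non-negative int so it can safely be used with the % operator.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	reduceN := 10
	for _, k := range []string{"apple", "banana", "apple"} {
		fmt.Printf("key=%s reduceId=%d\n", k, ihash(k)%reduceN)
	}
	// both "apple" lines print the same reduceId: the same key always
	// lands in the same reduce bucket, no matter which worker ran map
}
```

Because the hash is deterministic, every map task routes a given key to the same reduce bucket, which is exactly what lets reduce see all values for a key.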
2. Implementation
2.1 Worker side
worker.go
The Worker repeatedly requests tasks from the Master. The Master tells the Worker which phase it is in, and the Worker decides accordingly whether to request a Map Task or a Reduce Task.
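That top-level loop can be sketched as follows; `getTask` is a hypothetical stand-in for the RPC round trip to the coordinator, so only the control flow is shown:

```go
package main

import "fmt"

// Task is a hypothetical stand-in for the RPC replies; in the real
// worker, DoMap/DoReduce fill these in via calls to the coordinator.
type Task struct {
	Kind string // "map", "reduce", or "done"
	Id   int
}

// workerLoop keeps asking for work until the coordinator reports that
// both phases are finished. It returns how many tasks it processed.
func workerLoop(getTask func() Task, handle func(Task)) int {
	processed := 0
	for {
		t := getTask()
		if t.Kind == "done" {
			return processed
		}
		handle(t) // DoMap or DoReduce in the real worker
		processed++
	}
}

func main() {
	// fake coordinator: two map tasks, then one reduce task, then done
	queue := []Task{{"map", 0}, {"map", 1}, {"reduce", 0}, {"done", 0}}
	i := 0
	getTask := func() Task { t := queue[i]; i++; return t }
	n := workerLoop(getTask, func(t Task) { fmt.Println(t.Kind, t.Id) })
	fmt.Println("processed", n) // processed 3
}
```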
We wrap the application-level map function in DoMap: before calling it we fetch the task and read the input, and after it returns we write the intermediate files.
func DoMap(reduceMax int, mapDone *bool, mapf func(string, string) []KeyValue) {
	//get map task
	reply := GetMapFileReply{}
	getMapFile(&reply, mapDone)
	//get content
	if reply.MaptaskNumber >= 0 {
		file, err := os.Open(reply.Filename)
		if err != nil {
			log.Fatalf("cannot open %v", reply.Filename)
		}
		content, err := ioutil.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", reply.Filename)
		}
		file.Close()
		//call the application map function
		kva := mapf(reply.Filename, string(content))
		sort.Sort(ByKey(kva))
		i := 0
		mapOutFileName := []string{}
		mapOutTmp2Final := make(map[string]string)
		mapOutFileContent := make(map[int][]MapOut)
		//group the output by key
		for i < len(kva) {
			j := i + 1
			for j < len(kva) && kva[j].Key == kva[i].Key {
				j++
			}
			mapout := MapOut{}
			for k := i; k < j; k++ {
				mapout.Value = append(mapout.Value, kva[k].Value)
			}
			mapout.Key = kva[i].Key
			reduceId := ihash(kva[i].Key) % reduceMax
			if _, ok := mapOutFileContent[reduceId]; !ok {
				filename := "mr-" + strconv.Itoa(reply.MaptaskNumber) + "-" + strconv.Itoa(reduceId)
				mapOutFileName = append(mapOutFileName, filename)
			}
			mapOutFileContent[reduceId] = append(mapOutFileContent[reduceId], mapout)
			i = j
		}
		//generate the intermediate files
		for _, filename := range mapOutFileName {
			//recover the reduceId from the intermediate file name
			suffix := strings.Split(filename, "-")
			reduceId, _ := strconv.Atoi(suffix[2])
			//write to a temp file first; the coordinator renames it once the task is done
			file, _ := ioutil.TempFile("", filename+"*")
			mapOutTmp2Final[file.Name()] = filename
			enc := json.NewEncoder(file)
			for _, content := range mapOutFileContent[reduceId] {
				if err := enc.Encode(&content); err != nil {
					fmt.Println("encode failed " + err.Error())
				}
			}
			file.Close()
		}
		//tell the coordinator this map task is done
		sendMapDone(reply.MaptaskNumber, reply.Filename, mapOutTmp2Final)
	} else if !*mapDone && reply.MaptaskNumber < 0 {
		//no task available yet: wait for the remaining map tasks to finish
		time.Sleep(time.Second) //note: time.Sleep(1) would sleep 1ns, not 1s
	}
}
We wrap the application-level reduce function in DoReduce the same way: before calling it we group the data, and after it returns we write the output file.
func DoReduce(reduceDone *bool, reducef func(string, []string) string) {
	//get reduce task
	reply := GetReduceFileReply{}
	getReduceFile(&reply, reduceDone)
	kva := []MapOut{}
	ReduceOutTmp2Final := make(map[string]string)
	if !*reduceDone && reply.ReduceId >= 0 {
		//create the output file (temp first; the coordinator renames it)
		outFileName := "mr-out-" + strconv.Itoa(reply.ReduceId)
		outfile, _ := ioutil.TempFile("", outFileName+"*")
		ReduceOutTmp2Final[outfile.Name()] = outFileName
		defer outfile.Close()
		//read every intermediate file for this ReduceId
		for _, filename := range reply.Filename {
			file, _ := os.Open(filename)
			dec := json.NewDecoder(file)
			for {
				var kv MapOut
				if err := dec.Decode(&kv); err != nil {
					break
				}
				kva = append(kva, kv)
			}
			file.Close() //close here rather than defer, so we don't hold every file open
		}
		sort.Sort(MapOutByKey(kva))
		//group the content by key, then call the application reduce on each group
		i := 0
		for i < len(kva) {
			j := i + 1
			for j < len(kva) && kva[j].Key == kva[i].Key {
				j++
			}
			intermediate := []string{}
			for k := i; k < j; k++ {
				intermediate = append(intermediate, kva[k].Value...)
			}
			reduceRes := reducef(kva[i].Key, intermediate)
			fmt.Fprintf(outfile, "%v %v\n", kva[i].Key, reduceRes)
			i = j
		}
		//tell the coordinator this reduce task is done
		sendReduceDone(reply.ReduceId, ReduceOutTmp2Final)
	} else if !*reduceDone && reply.ReduceId < 0 {
		//no task available yet: wait for the remaining reduce tasks
		time.Sleep(time.Second)
	}
}
After finishing a task, the worker tells the Master "I'm done", and the Master then checks whether every task in the current phase has completed.
2.2 Coordinator side
The Master records the current phase, per-task progress, start times for timeout checks, and so on:
type Coordinator struct {
	//map task state: -1 not started / 0 running / 1 finished
	mapTaskState map[string]int
	//start time of each map task, used for timeout checks
	mapTaskTime map[string]int64
	//next map task number to hand out
	mapTaskNumber int
	//whether the map phase is finished
	mapDone bool
	//intermediate files produced by map, keyed by reduceId
	mapOutFileArray map[int][]string
	//whether the reduce phase is finished
	reduceDone bool
	//reduce task state: -1 not started / 0 running / 1 finished
	reduceTaskState map[int]int
	//start time of each reduce task, used for timeout checks
	reduceTaskTime map[int]int64
	//given by the caller, the number of reduce tasks
	nReduce int
	//protects the task state maps
	taskStatLock sync.Mutex
	//protects the phase flags
	taskDone sync.Mutex
}
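A constructor that fills these fields in might look like the following sketch. The field names follow the struct above, but the body is an assumption about the author's code; the real constructor would also start the RPC server and the timeout checker:

```go
package main

import (
	"fmt"
	"sync"
)

// trimmed copy of the Coordinator struct, enough to show initialization
type Coordinator struct {
	mapTaskState    map[string]int
	mapTaskTime     map[string]int64
	mapTaskNumber   int
	mapDone         bool
	mapOutFileArray map[int][]string
	reduceDone      bool
	reduceTaskState map[int]int
	reduceTaskTime  map[int]int64
	nReduce         int
	taskStatLock    sync.Mutex
	taskDone        sync.Mutex
}

// MakeCoordinator marks every input file "not started" (-1); the reduce
// states are filled in later, once the map phase finishes.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		mapTaskState:    map[string]int{},
		mapTaskTime:     map[string]int64{},
		mapOutFileArray: map[int][]string{},
		reduceTaskState: map[int]int{},
		reduceTaskTime:  map[int]int64{},
		nReduce:         nReduce,
	}
	for _, f := range files {
		c.mapTaskState[f] = -1
	}
	//the real constructor would also call c.server() and spawn the
	//timeout-checking goroutine here
	return c
}

func main() {
	c := MakeCoordinator([]string{"pg-1.txt", "pg-2.txt"}, 10)
	fmt.Println(len(c.mapTaskState), c.nReduce) // 2 10
}
```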
The map and reduce flows are similar, so here is the map flow.
First, hand out map tasks to workers:
func (c *Coordinator) GetMapInFile(args *GetMapFileArgs, reply *GetMapFileReply) error {
	//hand out an unstarted map task, if any
	c.taskDone.Lock()
	if !c.mapDone {
		c.taskDone.Unlock()
		c.taskStatLock.Lock()
		for task := range c.mapTaskState {
			if c.mapTaskState[task] == -1 {
				reply.Filename = task
				reply.MapDone = false
				reply.MaptaskNumber = c.mapTaskNumber
				c.mapTaskNumber++
				c.mapTaskState[task] = 0
				c.mapTaskTime[task] = time.Now().Unix()
				c.taskStatLock.Unlock()
				return nil
			}
		}
		c.taskStatLock.Unlock() //release even when no task is available
		reply.MapDone = false
	} else {
		c.taskDone.Unlock()
		reply.MapDone = true
	}
	reply.MaptaskNumber = -1
	return nil
}
When a Map Task completes, record its state and check whether all map tasks are done, so we can move to the next phase:
func (c *Coordinator) MapSingleFileDone(args *MapDoneArgs, reply *MapDoneReply) error {
	//mark this map task finished; hold the lock while we also update
	//mapOutFileArray and scan the task states, to avoid races
	c.taskStatLock.Lock()
	c.mapTaskState[args.Filename] = 1
	//rename temp files and record reduceId <-> intermediate file name
	for tmpfile, filename := range args.MapOutTmp2Final {
		os.Rename(tmpfile, filename)
		suffix := strings.Split(filename, "-")
		reduceN, _ := strconv.Atoi(suffix[2])
		c.mapOutFileArray[reduceN] = append(c.mapOutFileArray[reduceN], filename)
	}
	//check whether every map task is done
	reply.Y = args.MaptaskNumber
	for _, s := range c.mapTaskState {
		if s != 1 {
			c.taskStatLock.Unlock()
			return nil
		}
	}
	//all map tasks are done: initialize the reduce task states
	for key := range c.mapOutFileArray {
		c.reduceTaskState[key] = -1
	}
	c.taskStatLock.Unlock()
	c.taskDone.Lock()
	c.mapDone = true
	c.taskDone.Unlock()
	return nil
}
3. Tips from Lab 1
- For every output file, first create it with ioutil.TempFile and rename it to the final name only after the task completes. This prevents a Worker that exits or crashes midway from leaving a partially written final file behind.
//create a temp file; it is not yet visible under the final name
file, _ := ioutil.TempFile("", filename+"*")
//record the mapping from temp file path to final file name
mapOutTmp2Final[file.Name()] = filename
...
//rename to the final file; the first two steps run in the Worker and this one in the Master, so the map caches the temp file paths
os.Rename(ReduceOutTmp2Final[tmpName], filename)
- Every task needs a timeout check: if a task times out, hand it to another Worker. I run this check in a separate goroutine.
- Run with go run -race to detect data races, and add locks where needed.
4. Results and Improvements
1. All tests pass.
2. Performance drops noticeably with the locks in place; the lock granularity and type need tuning.
3. Tasks should be abstracted into a struct and passed over channels, which would make the flow much cleaner.
Source: https://www.cnblogs.com/vstone/p/16649409.html