其他分享
首页 > 其他分享 > 6.824 Lab 1: MapReduce

6.824 Lab 1: MapReduce

作者:互联网

lab源地址

简介

根据MapReduce论文构造一个MapReduce系统。该系统主要包括master和worker:master负责分发任务、处理worker故障;worker负责向master请求任务,调用用户提供的map/reduce函数,读取输入文件并写出中间文件和最终结果文件。

思路

具体实现

以下给出rpc.go、master.go、worker.go三个文件。

rpc.go

rpc.go定义了master和worker通信的数据结构:

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

// TaskRequest is the argument of the task-request RPC. It has no fields:
// the call itself is the whole message.
type TaskRequest struct{}

// TaskType identifies the kind of work a task carries.
type TaskType int

// Task kinds. The constants are explicitly typed as TaskType so they cannot
// be mixed up with plain integers; their numeric values (1 and 2) are
// unchanged, and the zero value of TaskType matches neither of them.
const (
	MapTask TaskType = iota + 1
	ReduceTask
)

// TaskResponse is the reply of the task-request RPC and describes one task.
type TaskResponse struct {
	Filename   string   // file to be mapped for a map task; empty string otherwise
	TypeOfTask TaskType // either map or reduce
	Serial     int      // serial number of the task
	NReduce    int      // number of buckets the intermediate result is divided into
}

// masterSock builds the unique-ish UNIX-domain socket path used by the
// master: a fixed prefix under /var/tmp followed by the current user id.
// The current directory is not used because Athena AFS doesn't support
// UNIX-domain sockets.
func masterSock() string {
	return "/var/tmp/824-mr-" + strconv.Itoa(os.Getuid())
}

master.go

master实现如下:

package mr

import (
	"log"
	"net"
	"net/http"
	"net/rpc"
	"strconv"
	"time"
)
import "os"

// Master holds the state shared between the RPC handlers and the
// dispatcher goroutine.
type Master struct {
	// TaskChannel delivers tasks to workers: the dispatcher sends on it and
	// DispatchTask receives from it.
	TaskChannel chan TaskResponse
	// done becomes true once every task has finished.
	done bool
	// sem guards done against concurrent read/write. It is used as a lock:
	// a goroutine receives the single token before touching done and sends
	// it back afterwards.
	sem chan struct{}
}

// TaskTrack is the dispatcher's bookkeeping entry for one task that has been
// queued but whose output has not been observed yet.
type TaskTrack struct {
	// taskResp is the task exactly as it was put on the task channel, kept so
	// it can be queued again if it expires.
	taskResp TaskResponse
	// startTime is when the task was last queued; isExpired measures from it.
	startTime time.Time
}

// DispatchTask is the RPC handler through which a worker obtains its next
// task. It takes one task off TaskChannel and copies it into response.
// When the channel is empty the call blocks until a task is queued.
// The request argument is not inspected. It always returns nil.
func (m *Master) DispatchTask(request *TaskRequest, response *TaskResponse) error {
	// TaskResponse consists solely of the four task fields, so assigning the
	// whole struct is the same as copying them one by one.
	*response = <-m.TaskChannel
	return nil
}

//
// Done reports whether the entire job has finished.
// main/mrmaster.go calls it periodically.
//
func (m *Master) Done() bool {
	// Take the token so m.done is read exclusively, and hand it back on the
	// way out.
	<-m.sem
	defer func() { m.sem <- struct{}{} }()

	return m.done
}


// isExpired reports whether more than ten seconds have passed since the
// task's startTime.
func isExpired(task TaskTrack) bool {
	const timeout = 10 * time.Second
	return time.Since(task.startTime) > timeout
}

// dispatcher drives the whole job. It queues one map task per input file and
// waits until every map output exists, then queues nReduce reduce tasks and
// waits for their outputs, and finally marks the master as done.
//
// A task counts as complete as soon as its output file exists in the current
// directory: "mr-<serial>" for map tasks and "mr-out-<serial>" for reduce
// tasks. Because of that, leftovers from an earlier run must be deleted
// before anything is dispatched; otherwise a stale file would make a task
// look finished before any worker has produced it.
//
// files is the list of input files, nReduce the number of reduce tasks, and
// m the master whose TaskChannel and done flag are driven.
func dispatcher(files []string, nReduce int, m *Master) {
	// Clear stale outputs of both phases.
	for i := range files {
		removeStaleOutput("mr-" + strconv.Itoa(i))
	}
	for i := 0; i < nReduce; i++ {
		removeStaleOutput("mr-out-" + strconv.Itoa(i))
	}

	// Map phase: one task per input file.
	mapTasks := make([]TaskResponse, 0, len(files))
	for i, file := range files {
		mapTasks = append(mapTasks, TaskResponse{Filename: file, TypeOfTask: MapTask, Serial: i})
	}
	runPhase(m, mapTasks, "mr-")

	// Reduce phase: starts only after every map output is present.
	var reduceTasks []TaskResponse
	for i := 0; i < nReduce; i++ {
		reduceTasks = append(reduceTasks, TaskResponse{TypeOfTask: ReduceTask, Serial: i, NReduce: nReduce})
	}
	runPhase(m, reduceTasks, "mr-out-")

	// Set the done flag exclusively.
	<-m.sem
	m.done = true
	m.sem <- struct{}{}
}

// removeStaleOutput deletes filename if it exists. A missing file is fine;
// any other failure aborts the process.
func removeStaleOutput(filename string) {
	if err := os.Remove(filename); err != nil && !os.IsNotExist(err) {
		log.Fatalf("error occurs while removing file %v: %v", filename, err)
	}
}

// runPhase queues every task on m.TaskChannel and returns once the output
// file of each one (outputPrefix followed by the task's serial number)
// exists. Pending tasks are polled once per second; a task that has expired
// is queued again, but only while the channel is empty.
func runPhase(m *Master, tasks []TaskResponse, outputPrefix string) {
	pending := make([]TaskTrack, 0, len(tasks))
	for _, task := range tasks {
		m.TaskChannel <- task
		pending = append(pending, TaskTrack{taskResp: task, startTime: time.Now()})
	}

	for len(pending) > 0 {
		// Filter in place: keep only the tasks whose output is still missing.
		remaining := pending[:0]
		for _, track := range pending {
			output := outputPrefix + strconv.Itoa(track.taskResp.Serial)
			if _, err := os.Stat(output); err == nil {
				// The output exists, so this task is complete.
				continue
			}
			if len(m.TaskChannel) == 0 && isExpired(track) {
				// Nothing is waiting to be picked up and this task has timed
				// out: emit it again and restart its clock.
				m.TaskChannel <- track.taskResp
				track.startTime = time.Now()
			}
			remaining = append(remaining, track)
		}
		pending = remaining
		time.Sleep(time.Second)
	}
}


//
// MakeMaster creates a Master, starts dispatching tasks in the background
// and starts the RPC server.
// main/mrmaster.go calls this function.
// files are the input files; nReduce is the number of reduce tasks to use.
//
func MakeMaster(files []string, nReduce int) *Master {
	master := &Master{
		TaskChannel: make(chan TaskResponse, 100),
		sem:         make(chan struct{}, 1),
	}
	// Deposit the single token so the first user of sem can take it.
	master.sem <- struct{}{}

	// Tasks are dispatched from a separate goroutine.
	go dispatcher(files, nReduce, master)

	// Begin listening for RPCs from worker.go.
	master.server()
	return master
}

//
// server registers the master's RPC methods and serves them over HTTP on
// the UNIX-domain socket named by masterSock, in a background goroutine.
// The process exits if the socket cannot be listened on.
//
func (m *Master) server() {
	rpc.Register(m)
	rpc.HandleHTTP()

	// TCP alternative: net.Listen("tcp", ":1234")
	sockPath := masterSock()
	// Remove the socket file first, in case one is left from an earlier run.
	os.Remove(sockPath)

	listener, err := net.Listen("unix", sockPath)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}

worker.go

worker实现如下:

package mr

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"strconv"
)
import "log"
import "net/rpc"
import "hash/fnv"


//
// KeyValue is one key/value pair.
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
	Key   string
	Value string
}

// ByKey is a slice of KeyValue that sorts in ascending Key order; it provides
// the Len, Swap and Less methods that sort.Sort needs.
type ByKey []KeyValue

// Len returns the number of pairs.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the pairs at indexes i and j.
func (kvs ByKey) Swap(i, j int) {
	kvs[i], kvs[j] = kvs[j], kvs[i]
}

// Less reports whether the key at index i sorts before the key at index j.
func (kvs ByKey) Less(i, j int) bool {
	return kvs[i].Key < kvs[j].Key
}

//
// ihash returns a non-negative 32-bit FNV-1a hash of key.
// Use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
	hasher := fnv.New32a()
	if _, err := hasher.Write([]byte(key)); err != nil {
		log.Fatalf("error occurs while hashing key %v", key)
	}
	// Clear the top bit so the result is never negative.
	const signMask = 0x7fffffff
	return int(hasher.Sum32() & signMask)
}


//
// Worker repeatedly asks the master for a task over RPC and executes it
// with the supplied map and reduce functions.
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		task, ok := askForTask()
		if !ok {
			// The request failed, which is taken to mean that all tasks are
			// done; stop working.
			return
		}

		switch task.TypeOfTask {
		case MapTask:
			doMapTask(task, mapf)
		default:
			// Every task that is not a map task is run as a reduce task.
			doReduceTask(task, reducef)
		}
	}
}

// askForTask requests one task from the master via the Master.DispatchTask
// RPC. It returns the reply and whether the call succeeded.
func askForTask() (TaskResponse, bool) {
	var reply TaskResponse
	succeeded := call("Master.DispatchTask", &TaskRequest{}, &reply)
	return reply, succeeded
}


// doMapTask executes one map task: it reads task.Filename, applies mapf to
// its content and stores the resulting key/value pairs, JSON-encoded one per
// line, in the intermediate file "mr-<task.Serial>" in the current directory.
//
// The pairs are first written to a temporary file which is then renamed to
// its final name. The temporary file is created in the current directory,
// next to the destination, because a rename may fail when source and
// destination live on different filesystems, as the system temp directory
// often does.
//
// Any failure terminates the process.
func doMapTask(task TaskResponse, mapf func(string, string) []KeyValue) {
	filename := task.Filename
	content, err := ioutil.ReadFile(filename)
	if err != nil {
		log.Fatalf("cannot read %v: %v", filename, err)
	}
	kva := mapf(filename, string(content))

	// Intermediate kv pairs are saved in mr-X.
	target := "mr-" + strconv.Itoa(task.Serial)
	// The "mr-tmp-" prefix cannot be mistaken for a finished "mr-<n>" file.
	tempFile, err := ioutil.TempFile(".", "mr-tmp-")
	if err != nil {
		log.Fatalf("cannot create tempFile: %v", err)
	}

	enc := json.NewEncoder(tempFile)
	for _, kv := range kva {
		if err := enc.Encode(&kv); err != nil {
			log.Fatalf("cannot encode kv %v into file %v: %v", kv, tempFile.Name(), err)
		}
	}
	if err := tempFile.Close(); err != nil {
		log.Fatalf("error occurs while closing file %v: %v", tempFile.Name(), err)
	}
	if err := os.Rename(tempFile.Name(), target); err != nil {
		log.Fatalf("err occurs while renaming file %v to %s: %v", tempFile.Name(), target, err)
	}
}

// doReduceTask executes one reduce task: it gathers from all intermediate
// files the pairs whose key hashes to bucket task.Serial, sorts them by key,
// calls reducef once per distinct key with all of that key's values, and
// writes one "<key> <result>" line per key to "mr-out-<task.Serial>" in the
// current directory.
//
// Output goes to a temporary file that is renamed to its final name once
// complete. The temporary file is created in the current directory, next to
// the destination, because a rename may fail when source and destination
// live on different filesystems.
//
// Any failure terminates the process.
func doReduceTask(task TaskResponse, reducef func(string, []string) string) {
	kva := collectBucket(task.Serial, task.NReduce)
	sort.Sort(ByKey(kva))

	target := "mr-out-" + strconv.Itoa(task.Serial)
	// The "mr-tmp-" prefix cannot be mistaken for an "mr-<n>" or "mr-out-<n>" file.
	tempFile, err := ioutil.TempFile(".", "mr-tmp-")
	if err != nil {
		log.Fatalf("cannot create tempFile: %v", err)
	}

	// kva is sorted, so equal keys are adjacent: walk it one run at a time.
	for start := 0; start < len(kva); {
		end := start + 1
		for end < len(kva) && kva[end].Key == kva[start].Key {
			end++
		}
		values := make([]string, 0, end-start)
		for _, kv := range kva[start:end] {
			values = append(values, kv.Value)
		}
		output := reducef(kva[start].Key, values)
		if _, err := fmt.Fprintf(tempFile, "%v %v\n", kva[start].Key, output); err != nil {
			log.Fatalf("error occurs while wrting into tempFile %v: %v", tempFile.Name(), err)
		}
		start = end
	}

	if err := tempFile.Close(); err != nil {
		log.Fatalf("error occurs while closing file %v: %v", tempFile.Name(), err)
	}
	if err := os.Rename(tempFile.Name(), target); err != nil {
		log.Fatalf("error occurs while renaming file %v to %s: %v", tempFile.Name(), target, err)
	}
}

// collectBucket reads the intermediate files "mr-0", "mr-1", ... in order,
// stopping at the first one that does not exist, and returns every pair whose
// key satisfies ihash(key) % nReduce == serial.
func collectBucket(serial, nReduce int) []KeyValue {
	var kva []KeyValue
	for i := 0; ; i++ {
		name := "mr-" + strconv.Itoa(i)
		file, err := os.Open(name)
		if err != nil {
			if os.IsNotExist(err) {
				// All intermediate files have been read.
				return kva
			}
			log.Fatalf("error occurs while openning a file %v: %v", name, err)
		}

		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			// Any decode error, normally end of file, ends this file's stream.
			if err := dec.Decode(&kv); err != nil {
				break
			}
			// Keep only the keys this reduce task is responsible for.
			if ihash(kv.Key)%nReduce == serial {
				kva = append(kva, kv)
			}
		}

		// Close each file before moving on so descriptors are not leaked.
		// The file was only read, so a close error loses no data.
		_ = file.Close()
	}
}


//
// call sends the RPC named rpcname to the master over its UNIX-domain
// socket and waits for the response, which is stored in reply.
// It returns true when the call succeeds. When the call itself fails the
// error is printed and false is returned. If the master cannot be dialed
// at all, the process exits.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
	// TCP alternative: rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, dialErr := rpc.DialHTTP("unix", masterSock())
	if dialErr != nil {
		log.Fatal("dialing:", dialErr)
	}
	defer client.Close()

	if callErr := client.Call(rpcname, args, reply); callErr != nil {
		fmt.Println(callErr)
		return false
	}
	return true
}

标签:6.824,string,err,nil,MapReduce,Lab,tempFile,task,master
来源: https://blog.csdn.net/weixin_43116322/article/details/119274470