根据MapReduce Paper构造一个MapReduce系统。该系统主要包括master和worker。master主要负责分发任务、处理worker故障；worker主要负责根据map/reduce函数读写文件。






package mr

// RPC definitions.
// remember to capitalize all names.

import "os"
import "strconv"

// TaskRequest is the argument a worker sends when asking the master for
// work. It carries no data; the request itself is the message.
type TaskRequest struct{}

// TaskType distinguishes the two kinds of work the master hands out.
type TaskType int

// The task kinds. The values start at 1, so the zero value of TaskType
// matches neither of them.
const (
	MapTask    TaskType = 1
	ReduceTask TaskType = 2
)

type TaskResponse struct {

	// if it is a map task, Filename indicates file that need to be mapped, else it is empty string
	Filename string

	// task type is either map/reduce
	TypeOfTask TaskType

	// this is the serial number of task
	Serial int

	// NReduce is for dividing intermediate result into buckets
	NReduce int


// masterSock cooks up a unique-ish UNIX-domain socket name in /var/tmp for
// the master, derived from the current user id.
// It can't use the current directory since Athena AFS doesn't support
// UNIX-domain sockets.
func masterSock() string {
	return "/var/tmp/824-mr-" + strconv.Itoa(os.Getuid())
}



package mr

import (
	"log"
	"net"
	"net/http"
	"net/rpc"
	"os"
	"strconv"
	"time"
)

type Master struct {
	// user TaskChannel to deliver task to workers
	TaskChannel chan TaskResponse
	// done will be true if all task is done
	done bool
	// sem is to protect done from concurrent read/write
	sem chan struct{}

// keep track of task
type TaskTrack struct {
	taskResp TaskResponse
	startTime time.Time

func (m *Master) DispatchTask(request *TaskRequest, response *TaskResponse) error {
	// extract a task from channel
	// if there is no task available, the thread which calls this function will go to sleep
	temp := <-m.TaskChannel
	response.Filename = temp.Filename
	response.TypeOfTask = temp.TypeOfTask
	response.Serial = temp.Serial
	response.NReduce = temp.NReduce
	return nil

// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
func (m *Master) Done() bool {
	ret := false

	// read m.done exclusively
	<- m.sem
	ret = m.done
	m.sem <- struct{}{}

	return ret

// task expires after ten seconds
func isExpired(task TaskTrack) bool {
	return time.Now().Sub(task.startTime).Seconds() > 10

func dispatcher(files []string, nReduce int, m *Master) {

	// remove intermediate files in case there is any collision
	for i := 0; i < len(files); i++ {
		filename := "mr-" + strconv.Itoa(i)
		err := os.Remove(filename)
		if err != nil && !os.IsNotExist(err) {
			log.Fatalf("error occurs while removing file %v", filename)

	var unfinishedTasks []TaskTrack
	//-------------------------------------------- dispatch map task --------------------------------------------
	for i, file := range files {
		resp := TaskResponse{Filename: file, TypeOfTask: MapTask, Serial: i}
		m.TaskChannel <- resp
		unfinishedTasks = append(unfinishedTasks, TaskTrack{taskResp: resp, startTime: time.Now()})
	// check if all map tasks are complete
	for len(unfinishedTasks) > 0 {
		for i := 0; i < len(unfinishedTasks); i++ {
			track := unfinishedTasks[i]
			filename := "mr-" + strconv.Itoa(track.taskResp.Serial)
			// check if intermediate file exists
			if _, err := os.Stat(filename); err == nil {
				// filename exists, which indicates that this track is completed
				unfinishedTasks = append(unfinishedTasks[:i], unfinishedTasks[i + 1:]...)
			} else if len(m.TaskChannel) == 0 && isExpired(track) {
				// track dispatch channel is empty && this task is expired, emit this task again
				m.TaskChannel <- track.taskResp
				// reset startTime of this task
				unfinishedTasks[i].startTime = time.Now()

	//-------------------------------------------- dispatch reduce task --------------------------------------------
	// all map tasks are completed, now start to emit reduce task
	// there are nReduce reduce tasks in total
	for i := 0; i < nReduce; i++ {
		resp := TaskResponse{TypeOfTask: ReduceTask, Serial: i, NReduce: nReduce}
		m.TaskChannel <- resp
		unfinishedTasks = append(unfinishedTasks, TaskTrack{taskResp: resp, startTime: time.Now()})

	// check if all reduce tasks are complete
	for len(unfinishedTasks) > 0 {
		for i := 0; i < len(unfinishedTasks); i++ {
			track := unfinishedTasks[i]
			filename := "mr-out-" + strconv.Itoa(track.taskResp.Serial)
			if _, err := os.Stat(filename); err == nil {
				unfinishedTasks = append(unfinishedTasks[:i], unfinishedTasks[i + 1:]...)
			} else if len(m.TaskChannel) == 0 && isExpired(track) {
				m.TaskChannel <- track.taskResp
				// reset startTime
				unfinishedTasks[i].startTime = time.Now()

	// exclusively set status to done
	<- m.sem
	m.done = true
	m.sem <- struct{}{}

// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeMaster(files []string, nReduce int) *Master {

	// initialize master
	m := Master{TaskChannel: make(chan TaskResponse, 100), sem: make(chan struct{}, 1)}
	m.sem <- struct{}{}

	// dispatcher tasks in another thread
	go dispatcher(files, nReduce, &m)

	// start a thread that listens for RPCs from worker.go
	return &m

// start a thread that listens for RPCs from worker.go
func (m *Master) server() {
	//l, e := net.Listen("tcp", ":1234")
	sockname := masterSock()
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	go http.Serve(l, nil)



package mr

import (
import "log"
import "net/rpc"
import "hash/fnv"

// KeyValue is a single key/value pair. Map functions return a slice of
// KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey is a slice of KeyValue that can be ordered by Key.
type ByKey []KeyValue

// Len returns the number of pairs in the slice.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the pairs at positions i and j.
func (kvs ByKey) Swap(i, j int) {
	kvs[i], kvs[j] = kvs[j], kvs[i]
}

// Less reports whether the key at position i orders before the key at j.
func (kvs ByKey) Less(i, j int) bool {
	return kvs[i].Key < kvs[j].Key
}

// ihash returns a non-negative 32-bit FNV-1a hash of key.
// Use ihash(key) % NReduce to choose the reduce task number for each
// KeyValue emitted by Map.
func ihash(key string) int {
	h := fnv.New32a()
	// hash.Hash documents that Write never returns an error, so there is
	// nothing to check here.
	h.Write([]byte(key))
	// Clear the top bit so the result is non-negative even where int is 32
	// bits wide.
	return int(h.Sum32() & 0x7fffffff)
}

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Your worker implementation here.
	// worker调用rpc向master请求工作
	for true {
		task, ok := askForTask()
		// fail to contact master, which indicates that all tasks are done
		if !ok {

		if task.TypeOfTask == MapTask {
			doMapTask(task, mapf)
		} else {
			doReduceTask(task, reducef)

	// uncomment to send the Example RPC to the master.

func askForTask() (TaskResponse, bool) {
	request := TaskRequest{}
	response := TaskResponse{}
	ok := call("Master.DispatchTask", &request, &response)
	return response, ok

func doMapTask(task TaskResponse, mapf func(string, string) []KeyValue) {
	filename := task.Filename
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	err = file.Close()
	if err != nil {
		log.Fatalf("cannot close file %v", file.Name())
	kva := mapf(filename, string(content))
	tempFile, err := ioutil.TempFile("", "")
	if err != nil {
		log.Fatalf("cannot create tempFile %v", tempFile)
	enc := json.NewEncoder(tempFile)
	for _, kv := range kva {
		err := enc.Encode(&kv)
		if err != nil {
			log.Fatalf("cannot encode kv %v into file %v", kv, tempFile)
	err = tempFile.Close()
	if err != nil {
		log.Fatalf("error occurs while closing file %v", tempFile)
	// intermediate kv pairs are saved in mr-X
	err = os.Rename(tempFile.Name(), "mr-"+strconv.Itoa(task.Serial))
	if err != nil {
		log.Fatalf("err occurs while renaming file %v to %s", tempFile, "mr-"+strconv.Itoa(task.Serial))

func doReduceTask(task TaskResponse, reducef func(string, []string) string) {
	var kva []KeyValue
	tempFile, err := ioutil.TempFile("", "")
	if err != nil {
		log.Fatalf("cannot create tempFile %v", tempFile)
	// go to every intermediate file to collect corresponding keys
	i := 0
	for {
		file, err := os.Open("mr-" + strconv.Itoa(i))
		if err != nil {
			if os.IsNotExist(err) {
				// all intermediate files are read
			} else {
				log.Fatalf("error occurs while openning a file %v", "mr-" + strconv.Itoa(i))
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
			// select keys that this worker need to reduce
			if ihash(kv.Key) % task.NReduce == task.Serial {
				kva = append(kva, kv)
	j := 0
	for j < len(kva) {
		k := j + 1
		for k < len(kva) && kva[j].Key == kva[k].Key {
		var values []string
		for u := j; u < k; u++ {
			values = append(values, kva[u].Value)
		output := reducef(kva[j].Key, values)
		_, err := fmt.Fprintf(tempFile, "%v %v\n", kva[j].Key, output)
		if err != nil {
			log.Fatalf("error occurs while wrting into tempFile %v", tempFile)
		j = k
	err = tempFile.Close()
	if err != nil {
		log.Fatalf("error occurs while closing file %v", tempFile)
	err = os.Rename(tempFile.Name(), "mr-out-"+strconv.Itoa(task.Serial))
	if err != nil {
		log.Fatalf("error occurs while renaming file %v to %s", tempFile, "mr-out-"+strconv.Itoa(task.Serial))

// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", ""+":1234")
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true

	return false

