A Minimal MapReduce Implementation
Author: Internet
0 Overview
MapReduce is a widely used programming model for distributed big-data computation. It was first published by Google; Hadoop is its best-known open-source implementation.
The programming model is very simple: as the name suggests, the user only needs to implement a Map function and a Reduce function.
- Map function (the mapping step): it takes a key-value pair and transforms it into zero or more new key-value pairs, which it emits:
  map(k1, v1) -> list(k2, v2)
- Reduce function (the reduction step): it takes a key together with the list of all values produced for that key, and reduces them to a new list of values:
  reduce(k2, list(v2)) -> list(v3)
Examples of tasks it can solve:
- distributed grep;
- counting URL access frequency;
- reversing the web-link graph;
- per-host term vectors;
- building an inverted index;
- distributed sort.
1 MapReduce Architecture
A picture is worth a thousand words: (architecture diagram omitted in this text version)
2 Overall Design
The goal is to complete lab 1 of MIT 6.824, 2021 Spring.
Clone the code with the following git command:
git clone git://g.csail.mit.edu/6.824-golabs-2021 6.824
The master assigns tasks lazily: workers actively trigger task assignment and task completion. The master hands different input splits to different workers.
The worker therefore needs RPCs for getting a task and for finishing a task, declared as follows:
// GetTaskArgs is empty: the worker just asks for whatever task is available.
type GetTaskArgs struct {
}

// GetTaskReply tells the worker what kind of task to run, which input
// files to process, the task number, and how many reduce buckets exist.
type GetTaskReply struct {
	Type      TaskType
	Filenames []string
	Task_no   int
	NReduce   int
	Err       Errno
}

// FinishTaskArgs reports which task (by type and number) has completed.
type FinishTaskArgs struct {
	Type    TaskType
	Task_no int
}

type FinishTaskReply struct {
	Err Errno
}
3 Worker Design
The worker's job is to repeatedly fetch a task, execute it, and report its completion.
Its main loop:
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// Your worker implementation here.
for {
args := GetTaskArgs{}
reply := GetTaskReply{}
log.Printf("get task request: %v\n", args)
ok := CallGetTask(&args, &reply)
log.Printf("recv get task reply: %v\n", reply)
if !ok || reply.Type == STOP {
break
}
// dispatch on the task type
switch reply.Type {
case MAP:
if len(reply.Filenames) < 1 {
log.Fatalf("don't have filename")
}
DoMAP(reply.Filenames[0], reply.Task_no, reply.NReduce, mapf)
// map complete, send msg to master
finish_args := FinishTaskArgs{
Type: MAP,
Task_no: reply.Task_no,
}
finish_reply := FinishTaskReply{}
log.Printf("finish request: %v\n", finish_args)
CallFinishTask(&finish_args, &finish_reply)
log.Printf("recv finish reply: %v\n", finish_reply)
// time.Sleep(time.Second)
case REDUCE:
if len(reply.Filenames) < 1 {
log.Fatalf("don't have filenames")
}
DoReduce(reply.Filenames, reply.Task_no, reducef)
// reduce complete, send msg to master
finish_args := FinishTaskArgs{
Type: REDUCE,
Task_no: reply.Task_no,
}
finish_reply := FinishTaskReply{}
log.Printf("finish request: %v\n", finish_args)
CallFinishTask(&finish_args, &finish_reply)
log.Printf("recv finish reply: %v\n", finish_reply)
// time.Sleep(time.Second)
case WAIT:
log.Printf("wait task\n")
time.Sleep(time.Second)
default:
time.Sleep(time.Second)
}
}
}
The reply distinguishes four task types: MAP, REDUCE, WAIT, and STOP:
- MAP: execute a map task
- REDUCE: execute a reduce task
- WAIT: wait for other workers to finish (e.g. the map phase is winding down and there are no more map tasks left to hand out)
- STOP: the worker stops and exits
The most important parts are the execution of map and reduce tasks.
The map task is implemented as follows (steps 2, 3 and 4 in the figure above):
func DoMAP(filename string, task_no int, nReduce int, mapf func(string, string) []KeyValue) {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
kva := mapf(filename, string(content))
sort.Sort(ByKey(kva))
log.Println("encode to json")
files := make([]*os.File, nReduce)
encoders := make([]*json.Encoder, nReduce)
for i := 0; i < nReduce; i++ {
		// create the temp file in the current directory so the later
		// os.Rename stays on one file system (rename fails across devices)
		ofile, err := ioutil.TempFile(".", "mr-tmp*")
if err != nil {
log.Fatalf("cannot create temp file")
}
defer ofile.Close()
encoder := json.NewEncoder(ofile)
encoders[i] = encoder
files[i] = ofile
}
var index int
for _, kv := range kva {
index = ihash(kv.Key) % nReduce
err = encoders[index].Encode(&kv)
if err != nil {
log.Fatalf("cannot encode %v", kv)
}
}
// atomically rename
for i := 0; i < nReduce; i++ {
filename_tmp := fmt.Sprintf("mr-%d-%d", task_no, i)
err := os.Rename(files[i].Name(), filename_tmp)
if err != nil {
log.Fatalf("cannot rename %v to %v", files[i].Name(), filename_tmp)
}
}
}
An interesting detail: map uses a hash function so that all entries with the same key land in the same intermediate output file:
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
var index int
for _, kv := range kva {
index = ihash(kv.Key) % nReduce
err = encoders[index].Encode(&kv)
if err != nil {
log.Fatalf("cannot encode %v", kv)
}
}
The reduce task is implemented as follows (steps 5 and 6 in the figure above):
func DoReduce(filenames []string, task_no int, reducef func(string, []string) string) {
// read data from mid-file
kva := make([]KeyValue, 0)
for _, filename := range filenames {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
defer file.Close()
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
}
sort.Sort(ByKey(kva))
// call Reduce on each distinct key in kva[],
// and print the result to mr-out-0.
	// create the temp file in the current directory so the later
	// os.Rename stays on one file system (rename fails across devices)
	ofile, err := ioutil.TempFile(".", "mr-out-tmp*")
if err != nil {
log.Fatalf("cannot create temp file")
}
defer ofile.Close()
i := 0
for i < len(kva) {
j := i + 1
for j < len(kva) && kva[j].Key == kva[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, kva[k].Value)
}
output := reducef(kva[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)
i = j
}
output_filename := fmt.Sprintf("mr-out-%d", task_no)
err = os.Rename(ofile.Name(), output_filename)
if err != nil {
log.Fatalf("cannot rename %v to %v", ofile.Name(), output_filename)
}
}
Ideally the files would be read and written on GFS; since that is not available here, the local UNIX file system is used directly.
4 Master Design
The master's design is fairly simple; it holds only a small amount of state:
type Coordinator struct {
tasks []Task
nReduce int
nMap int
status CoordinatorStatus
mu sync.Mutex
}
The information for each task is defined as follows:
type TaskStatus int
const (
idle TaskStatus = iota
in_progress
completed
)
type Task struct {
tno int
filenames []string
status TaskStatus
startTime time.Time
}
The master mainly serves the worker's two RPC requests.
The get-task RPC handler is implemented below:
- A task that has been in progress for too long (over 10s) is reassigned to another worker.
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.IsAllFinish() {
		c.NextPhase()
	}
	// fill populates the reply for task i according to the current phase.
	fill := func(i int) {
		reply.Err = SuccessCode
		reply.Task_no = i
		reply.Filenames = c.tasks[i].filenames
		if c.status == MAP_PHASE {
			reply.Type = MAP
			reply.NReduce = c.nReduce
		} else if c.status == REDUCE_PHASE {
			reply.Type = REDUCE
			reply.NReduce = 0
		} else {
			log.Fatal("unexpected status")
		}
		c.tasks[i].startTime = time.Now()
	}
	for i := 0; i < len(c.tasks); i++ {
		if c.tasks[i].status == idle {
			log.Printf("send task %d to worker\n", i)
			fill(i)
			c.tasks[i].status = in_progress
			return nil
		} else if c.tasks[i].status == in_progress {
			// assume the original worker is dead if the task has run
			// for over 10s, and hand it to whoever is asking now
			if time.Since(c.tasks[i].startTime) > time.Second*10 {
				log.Printf("resend task %d to worker\n", i)
				fill(i)
				return nil
			}
		}
	}
	// nothing to hand out right now: tell the worker to wait and retry
	reply.Err = SuccessCode
	reply.Type = WAIT
	return nil
}
The finish-task RPC handler is implemented as follows:
func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if args.Task_no >= len(c.tasks) || args.Task_no < 0 {
reply.Err = ParaErrCode
return nil
}
c.tasks[args.Task_no].status = completed
if c.IsAllFinish() {
c.NextPhase()
}
return nil
}
Check whether all tasks are finished; if so, move on to the next phase:
func (c *Coordinator) IsAllFinish() bool {
for i := len(c.tasks) - 1; i >= 0; i-- {
if c.tasks[i].status != completed {
return false
}
}
return true
}
func (c *Coordinator) NextPhase() {
if c.status == MAP_PHASE {
log.Println("change to REDUCE_PHASE")
c.MakeReduceTasks()
c.status = REDUCE_PHASE
} else if c.status == REDUCE_PHASE {
log.Println("change to FINISH_PHASE")
c.status = FINISH_PHASE
} else {
log.Println("unexpected status change!")
}
}
The client checks whether the whole MapReduce job is done:
func (c *Coordinator) Done() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.status == FINISH_PHASE
}
5 How Does a Client Use It?
Just write two functions, Map and Reduce:
//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }
// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)
kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}
//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}
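To actually run word count against this implementation, the 6.824 lab 1 handout uses roughly the following commands (run from src/main in the cloned repo; wc.go contains the two functions above and is built as a Go plugin):

```shell
# build the word-count Map/Reduce functions as a Go plugin
go build -race -buildmode=plugin ../mrapps/wc.go

# start the coordinator with the input files
go run -race mrcoordinator.go pg-*.txt

# in one or more other terminals, start workers with the plugin
go run -race mrworker.go wc.so

# inspect the merged output
cat mr-out-* | sort | more
```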
6 Appendix
The full code is available at:
Source: https://www.cnblogs.com/cxl-/p/16425143.html