MIT 6.824 分布式系统

准备

Ubuntu 20LTS
Go 1.7
课程主页

配置环境的时候遇到了一个小问题:go build命令报错,说是../mr下路径出错,找到的解决办法是在拉下来的代码的根目录(与src同层级运行go init mod 6.824,并将main/下的所有import ../mr 改为6.824/src/mr)

实现过程

Lab1的主要任务就是通过Golang来实现一个MapReduce的算法,最重要的就是读论文(MapReduce中文版),对于论文中的架构图要了然于心:
20211006181825
20211222155939
其次是看任务书
我们的代码主要写在src/mr目录下的几个文件,这几个文件由src/main目录下两个文件mrmaster.go, mrworker.go调用,这两个文件的作用是启动进程、加载map, reduce动态库,并进入定义在src/mr目录下的主流程。

定义数据结构

对于Master来说,要响应来自Worker的请求,包括任务请求、完成请求,那就需要存储Map、Reduce的任务数,Map任务队列、Reduce队列任务等,代码如下:

type Master struct {
	// Your definitions here.
	NumMap            int    //Map任务数
	NumMapFinished    int    //已完成Map任务数
	NumReduce         int    //Reduce任务数
	NumReduceFinished int    //已完成Reduce任务数
	mu                sync.Mutex

	MapTasks    []MapReduceTask  //存储所有map任务
	ReduceTasks []MapReduceTask  //存储所有reduce任务

	MapFinish    bool
	ReduceFinish bool
}

由于进程之间不能直接相互访问对方的变量,必须通过一定的进程间通信机制才能实现,该实验使用的进程间通信是rpc。那我们就可以将任务类型定义在RPC中,并将RPC请求和响应定义为携带任务的结构。最关键的就是对应的类型/状态/编号。同时利用MapFile和ReduceFiles字段来提供对应任务的输入。

// Add your RPC definitions here.
//rpc MR任务 通信消息格式
type MapReduceTask struct{
	TaskType string // 任务类型:Map / Reduce / Wait
	TaskStatus string //任务状态 : Unassigned / Assigned / Finished
	TaskNum int  //任务编号

	MapFile string //Map任务的输入
	ReduceFiles []string //Reduce任务的输入

	NumReduce int
	NumMap int
}

//RPC 请求
type MapReduceArgs struct {
	MessageType string // request / finish
	Task        MapReduceTask
}

// RPC响应
type MapReduceReply struct {
	Task MapReduceTask
}

Master实现

在mrmaster中,对输入参数检查后调用了MakeMaster方法进行初始化操作,传入了文件名数组以及reduce桶的数量。在mr/master中先实现这个方法。

//
// create a Master.
// main/mrmaster.go calls this function.
// NumReduce is the number of reduce tasks to use.
//
func MakeMaster(files []string, NumReduce int) *Master {
	m := Master{}

	// Your code here.
	//初始化master
	m.NumMap = len(files)
	m.NumReduce = NumReduce
	m.MapFinish = false
	m.ReduceFinish = false
	for index, file := range files {
		var tempTask MapReduceTask
		tempTask.NumMap = m.NumMap
		tempTask.NumReduce = m.NumReduce
		tempTask.TaskType = "Map"
		tempTask.TaskStatus = "Unassigned"
		tempTask.TaskNum = index
		tempTask.MapFile = file
		m.MapTasks = append(m.MapTasks, tempTask)
	}
	for i := 0; i < m.NumReduce; i++ {
		var tempTask MapReduceTask
		tempTask.NumMap = m.NumMap
		tempTask.NumReduce = m.NumReduce
		tempTask.TaskType = "Reduce"
		tempTask.TaskStatus = "Unassigned"
		tempTask.TaskNum = i
		for j := 0; j < m.NumMap; j++ {
			tempTask.ReduceFiles = append(tempTask.ReduceFiles, intermediateFilename(j, i))
		}
		m.ReduceTasks = append(m.ReduceTasks, tempTask)
	}


	m.server()
	return &m
}

同时mrmaster中会调用master的Done方法来检查任务是否完成,我们也重写一下,返回reduce是否完成:

func (m *Master) Done() bool {

	// Your code here.
	return m.ReduceFinish
}

接下来就是关键的处理Worker请求的实现了:
为防止多个Worker进行同时的修改,这里采用了一个非常暴力的大锁,在MapReduceHandler()的最开始对Master上锁,然后利用defer在函数退出之后进行锁的释放,保证每次只有一个MapReduceHandler()对Master当中的结构进行修改,确保对并发可以正确处理。

// Your code here -- RPC handlers for the worker to call.
func (m *Master) MapReduceHandler(args *MapReduceArgs, reply *MapReduceReply) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	//根据消息类型确定请求类型
	if args.MessageType == "request" {
		//Map未完成时
		if !m.MapFinish {
			//寻找未分配的任务
			for index, task := range m.MapTasks {
				if task.TaskStatus == "Unassigned" {
					m.MapTasks[index].TaskStatus = "Assigned"
					reply.Task = m.MapTasks[index]
					go m.checkTimeout("Map", index, 10)
					return nil
				}
			}
			//无则worker等待
			reply.Task.TaskType = "Wait"
			return nil
		} else if !m.ReduceFinish {
			//map任务已全部完成,则分配reduce任务
			for index, task := range m.ReduceTasks {
				if task.TaskStatus == "Unassigned" {
					m.ReduceTasks[index].TaskStatus = "Assigned"
					reply.Task = m.ReduceTasks[index]
					go m.checkTimeout("Reduce", index, 10)
					return nil
				}
			}
			//无则worker等待
			reply.Task.TaskType = "Wait"
			return nil
		} else {
			return nil
		}
	} else if args.MessageType == "finish" {
		if args.Task.TaskType == "Map" {
			m.MapTasks[args.Task.TaskNum].TaskStatus = "Finished"
			m.NumMapFinished = m.NumMapFinished + 1
			if m.NumMapFinished == m.NumMap {
				m.MapFinish = true
			}
		} else {
			m.ReduceTasks[args.Task.TaskNum].TaskStatus = "Finished"
			m.NumReduceFinished = m.NumReduceFinished + 1
			if m.NumReduceFinished == m.NumReduce {
				m.ReduceFinish = true
			}
		}
		return nil
	}
	return nil
}

其中,checkTimeout是用来处理Worker crash的情况的。如果某个任务在10S内后状态还是已分配,则认为该Worker 任务超时,crash掉了,随即将该任务标为未分配状态,分配给其他worker,代码如下:

func (m *Master) checkTimeout(taskType string, num int, timeout int) {
	time.Sleep(time.Second * time.Duration(timeout))
	m.mu.Lock()
	defer m.mu.Unlock()
	//任务超时则回收任务,下次分配给其他worker
	if taskType == "Map" {
		if m.MapTasks[num].TaskStatus == "Assigned" {
			m.MapTasks[num].TaskStatus = "Unassigned"
		}
	} else {
		if m.ReduceTasks[num].TaskStatus == "Assigned" {
			m.ReduceTasks[num].TaskStatus = "Unassigned"
		}
	}
}

Worker

在mrworker中,会调用mr.Worker方法,我们在mr.worker中实现这个方法:

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.
	for {
		args := MapReduceArgs{MessageType:"request"}
		reply := MapReduceReply{}

		resp := call("Master.MapReduceHandler",&args,&reply)

		if !resp {
			break
		}
		//判断Task任务类型
		switch reply.Task.TaskType {
		case "Map":
			mapTask(mapf,reply.Task)
		case "Reduce":
			reduceTask(reducef, reply.Task)
		case "Wait":
			waitTask()
		}
	}
}

首先构造请求和响应参数,通过call(实验给出的)来发起rpc请求,若无响应则退出,有则根据任务的类型执行相应的操作。

接下来实现mapTask方法:

//自定义mapTask处理
func mapTask(mapf func(string,string) []KeyValue,task MapReduceTask){
	filename := task.MapFile

	file,err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v",filename)
	}
	content,err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()

	//调用map函数获取map数组
	kva := mapf(filename,string(content))

	kvaa := make([][]KeyValue,task.NumReduce)

	//将key相同的kv对放入一个桶里
	for _, kv := range kva {
		idx := ihash(kv.Key) % task.NumReduce
		kvaa[idx] = append(kvaa[idx], kv)
	}

	//将桶里的map写入本地文件
	for i := 0;i < task.NumReduce;i++{
		storeIntermediateFile(kvaa[i], intermediateFilename(task.TaskNum, i))
	}

	defer finishTask(task)
}
// for sorting by key.
type ByKey []KeyValue
// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

//按照规则生成中间文件名
func intermediateFilename(numMapTask int, numReduceTask int) string {
	return fmt.Sprintf("mr-%v-%v", numMapTask, numReduceTask)
}

//将桶里的map写入本地文件
func storeIntermediateFile(kva []KeyValue,filename string){
	file, err := os.Create(filename)
	defer file.Close()

	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	enc := json.NewEncoder(file)
	if err != nil {
		log.Fatal("cannot create encoder")
	}
	for _,kv := range kva{
		err := enc.Encode(&kv)
		if err != nil {
			log.Fatal("cannot encode")
		}
	}
}

//加载中间文件
func loadIntermediateFile(filename string) []KeyValue {
	var kva []KeyValue
	file, err := os.Open(filename)
	defer file.Close()

	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	dec := json.NewDecoder(file)
	for {
		kv := KeyValue{}
		if err := dec.Decode(&kv); err != nil {
			break
		}
		kva = append(kva, kv)
	}

	return kva
}

func finishTask(task MapReduceTask){
	args := MapReduceArgs{MessageType: "finish", Task: task}
	reply := MapReduceReply{}
	call("Master.MapReduceHandler", &args, &reply)
}

实现reduceTask方法:

//自定义reducetask
func reduceTask(reducef func(string, []string) string, task MapReduceTask) {
	var intermediate []KeyValue
	for _, filename := range task.ReduceFiles {
		intermediate = append(intermediate, loadIntermediateFile(filename)...)
	}
	sort.Sort(ByKey(intermediate))  //将相同的key排列到一起
	oname := fmt.Sprintf("mr-out-%v", task.TaskNum)
	ofile, _ := os.Create(oname)

	//借用mrsequential的部分,计算相同key的个数
	i := 0
	for i<len(intermediate){
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}
	ofile.Close()

	defer finishTask(task)
}

实现WaitTask方法:

func waitTask() {
	time.Sleep(time.Second)
}

各文件导入的包:

//master
package mr

import "log"
import "net"
import "os"
import "sync"
import "time"
import "net/rpc"
import "net/http"


//Worker
package mr

import "fmt"
import "os"
import "log"
import "time"
import "sort"
import "io/ioutil"
import "encoding/json"
import "net/rpc"
import "hash/fnv"

//RPC
package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

测试

执行sh test-mr.sh的结果如下所示,Lab1的所有测试都完全通过。
20211222210104

Q.E.D.


励志成为年薪百块工程师