MIT yyds,真的爱了爱了

MIT 6.824真的是我拖了很久都一直想做的Project,以前本科的时候做完CSAPP以后,就挖坑做MIT 6.828,另一个目标就是做6.824。当时奈何Go并没怎么学,所以一直拖着连Lab1都没有认真做完。终于弄完毕业论文,找完工作以后这段摸鱼时光,想起年少的理想还没实现,于是赶紧打论文,看lecture,参考了一堆资料,开始写Project

准备

首先是阅读论文,2020版的作业相比以前,需要自己实现RPC部分,因此相对于以前只需要实现MapF和ReduceF,首先需要设计好RPC交互。

MapReduce流程

RPC主要是Master和Worker之间的通信,Master负责调度任务,Worker的注册以及将任务给Worker进行运行。Worker首先上线以后会去Master注册,获取workID,之后不断的获取任务,执行任务,反馈任务执行信息。如果Worker执行某个Task失败,会通知给Master,让其找其他可以运行的Worker重新执行该Task。

Worker

在复盘Lab 1时,感觉先写Worker会比较好理解,Master负责调度以及使用Worker来进行工作。Worker的几个工作相对好理解。对于MapReduce来说,Worker是不需要考虑map函数和reduce函数具体细节,更多是关注如何将map的结果写到本地磁盘,以及reduce函数如何读取数据进行合并输出结果。

注册

当worker上线后,第一步是发消息给Master进行注册自己的信息。

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.
	w := worker{}
	w.mapf = mapf
	w.reducef = reducef

	// uncomment to send the Example RPC to the master.
	//CallExample()

	w.register()
	w.run()
}

刚才说了,Master负责全部资源的调度和任务分配,因此Worker去注册自己的身份,主要是为了获得自己的workerID,这样后续的相应操作Master都可以知道是哪个Worker进行的请求。

func (w *worker) register() {
	args := &RegArgs{}
	reply := &RegReply{}

	if ok := call("Master.RegWorker", &args, &reply); !ok{
		log.Fatal("Register worker fail.")
	}
	w.id = reply.WorkerId
	display(fmt.Sprintf("worker %+v register\n", w.id))
}

请求任务

Worker向Master请求,获取当前需要执行的任务信息。

func (w *worker) reqTask() Task {
	args := TaskArgs{}
	args.WorkerId = w.id
	reply := ReqTaskReply{}

	if ok := call("Master.ReqTask", &args, &reply); !ok {
		log.Fatal("request for task fail...")
	}

	return *reply.Task
}

运行阶段

Worker在运行的阶段,实际上是一直轮询,从Master获取需要执行的任务,直到所有任务都结束。

func (w *worker) run() {
	for {
		t := w.reqTask()
		if !t.Alive {
			display(fmt.Sprintf("worker get task not alive, worker %d exit..\n", w.id))
			return
		}
		w.doTask(t)
	}
}

执行任务

之前的图中可以看到,Worker实际上也是有两个阶段的,一个是Map阶段,一个是Reduce阶段。每个任务不同阶段,因此在执行的时候,要根据任务的阶段执行不同的操作。

Map任务与Reduce任务

Map阶段使用mapf对内容构建出键值对,将所有的kvs通过hash放到中间文件中。按照Task的任务量(lab里也就是文件数量)和Reduce块数构建成中间文件矩阵。

Reduce阶段分为两部分,首先是读取文件,将本地的中间文件读取出来,之后进行reduce操作,其结果最后会写到本地的分块文件中。

Master

Master的职责就比较多,自顶向下来看,MakeMaster 是Master的入口,这里包含初始化Master节点,初始化Map任务相关状态然后启动Master的服务。

对于刚开始的任务,是处于Map阶段,从任务状态里将所有的Task添加到Task队列中,方便Worker获取Task进行执行。在运行阶段,通过计算任务执行之间来进行超时控制,当超过一段时间并没完成后,会将该任务重新塞回任务队列。通过不断的循环,当所有任务的Map阶段都完成好后,转为Reduce阶段。

// schedule task
func (m *Master) schedule() {
	//fmt.Println("Processing ...")

	// check all task status
	flag := true

	m.mu.Lock()
	defer m.mu.Unlock()

	if m.done {
		return
	}

	for id, taskStat := range m.taskStats {
		//fmt.Printf("Task %d : %v", id, taskStat.Status)
		switch taskStat.Status {
		case TaskStatusReady:
			flag = false
			m.addTask(id)
		case TaskStatusQueue:
			flag = false
		case TaskStatusRunning:
			flag = false
			m.checkBreak(id)
		case TaskStatusFinished:
		//	只有全部任务都是完成,才不会对flag变量进行修改,
		//	进入reduce环节
		case TaskStatusErr:
			flag = false
			m.addTask(id)
		default:
			panic("Tasks status schedule error...")
		}
	}

	if flag {
		if m.taskPhase == MapPhase {
			m.initReduceTasks()
		} else {
			m.done = true
		}
	}
}

当Worker向Master请求任务后,从任务队列中获取一个Task,然后将该Task的相关信息传给Worker,并且在Master处注册该任务,也就是修改维护的任务状态信息。Worker在执行的过程中,也会不断反馈任务相关执行状态。

Ref

MapReduce: Simplified Data Processing on Large Clusters

MIT 6.824 Lab1 MapReduce 2020 攻略

6.824 Spring 2020 – Lab 1: MapReduce

2020 Spring 6.824 Lab1: MapReduce笔记

MIT 6.824 分布式系统 Lab 1:MapReduce