【MIT6.824】lab 1 MapReduce实现总结

MIT6.824是一门经典的分布式课程,课程链接:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html,对于lab 1我们需要在提供的代码框架的基础上补充coordinator和worker的代码,以实现分布式的MapReduce程序。

本人在借鉴了部分其他人的设计思想的基础上,独立完成了所有的代码,最后设计的实现能够通过所有的测试脚本。

实现的代码厂库:https://github.com/slipegg/MIT6.824/tree/main/6.5840

实现目标

在给定的代码框架中实现一个单词计数的MapReduce程序。原本的框架中已经给了一个在本地串行执行单词计数的独立程序,并提供了一个通过UNIX-domain sockets实现的RPC(RPC介绍),我们需要完成的部分有:

  1. 设计coordinator和worker之间交流的流程和格式,以方便worker向coordinator申请任务,coordinator将taks发送给worker,worker把task的完成情况返回给coordinator
  2. coordinator对Map类型的task和Reduce类型的task进行管理,需要初始化这些任务,需要记录任务完成的情况,并生成新的任务,直到全部完成
  3. worker如何完成Map类型以及Reduce类型task

总体设计

worker会不断向coordinator发送心跳,申请任务,拿到任务后进行map或者renduce类型的task的执行,在执行完毕后发送请求给coordinator以表示该任务完成了。当coordinator告诉其所有任务都完成时,他会结束运行

coordinator只维护task的状态不维护各个worker的状态。worker向其发送心跳申请任务时,coordinator会去遍历任务,取出还没有发送的任务或者过了太长时间都没有完成的任务返回回去,如果没有,就返回一个等待任务。coordinator接收到worker的某个任务完成的请求时会改变这个任务的状态,如果当前阶段所有的任务都完成了就转向下一个阶段,知道转到了所有MapReduce任务都完成的阶段。

整体流程如下图所示:

MapReduce流程

rpc信息传递设计

Heartbeat

worker通过rpc向coordinator发送心跳(Heartbeat)来申请任务。如下:

  • 关键结构体定义如下,HeartbeatRequest是个空结构,HeartbeatResponse承载了coordinator返回给worker的信息,这里的信息实际上是运行map类型和reduce类型的task所必须的信息的集合。所有的返回都需要JobType来标明其类型,需要id来标明其是哪个作业,对于map类型作业,其额外需要FilePath来获取任务的输入,还需要NReduce来决定输出的数量,对于reduce类型作业,其额外需要NMap来辅助获取map类型的中间输出。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    type HeartbeatRequest struct {
    }

    type HeartbeatResponse struct {
    FilePath string
    JobType JobType
    NReduce int
    NMap int
    Id int
    }
  • 调用请求如下,它会调用coordinator的heartbeat函数来处理,并将任务返回到response中。

    1
    call("Coordinator.Heartbeat", &HeartbeatRequest{}, &response)

Report

worker完成任务后通过rpc向coordinator发送回复。如下:

  • 关键结构体设计如下。ReportRequest通过phase和id来联合表示是哪个任务完成了。

    1
    2
    3
    4
    5
    6
    7
    8
    type ReportRequest struct {
    Id int
    Phase SchedulePhase
    }

    type ReportResponse struct {
    }

  • 调用请求如下,它会调用coordinator的Report函数来处理,来将该任务标记为运行结束。

    1
    call("Coordinator.Report", &ReportRequest{Id: id, Phase: phase}, &ReportResponse{})

coordinator设计

coordinator会衍生出2个额外的协程,一个负责给rpc注册,并响应rpc传来的函数调用请求,一个负责给worker选择task生成resopnse

rpc函数调用处理

给rpc注册的程序就是原本框架提供的代码,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
rpc.Register(c)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", ":1234")
sockname := coordinatorSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}

其相应的也是上面提到的hearbeat和report事件。比较有go特色的的地方在于如何等待另一个进程生成对应的回应,采用的是如下的代码:

1
2
3
4
5
6
7
8
9
10
11
type heartbeatMsg struct {
response *HeartbeatResponse
ok chan struct{}
}

func (c *Coordinator) Heartbeat(request *HeartbeatRequest, response *HeartbeatResponse) error {
msg := heartbeatMsg{response, make(chan struct{})}
c.heartbeatCh <- msg
<-msg.ok
return nil
}

构建msg将信息传递过去给c.heartbeach,然后等待msg.ok准备就绪,也就是response填好了数据,再返回。Report也是同理

task管理

在coordinator初始化时会生成一个schedule协程来负责task生成和管理

  • task有4种类型Map类型、Reduce类型、等待类型和完成类型,Map类型和Reduce类需要worker进行实际处理,等待类型只需要worker去sleep一段时间就好了,然后再去询问有没有新任务,完成类型的任务发送过来之后worker就可以结束运行了
  • task有3个状态,分别为等待、运行、完成,一开始初始化时为等待状态,交给worker运行后为运行状态,worker发送report回来说明自己运行完毕后为完成状态。
  • coordinator有三个阶段分别为Map阶段、Reduce阶段和Complete阶段,一开始为Map阶段,其需要处理Map类型的task,当Map类型的task全部完成后需要转变到Reduce阶段,处理Reduce类型的task,当Reduce类型的状态也全部完成后就转为Complete状态,可以结束运行了。

schedule代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (c *Coordinator) schedule() {
c.initMapPhase()

for {
select {
case msg := <-c.heartbeatCh:
isAllTaskDoneInPhase := c.selectANewTask(msg.response)
if isAllTaskDoneInPhase {
c.switchPhase()
c.selectTaskAfterSwitchPhase(msg.response)
}
log.Printf("Coordinator: Heartbeat response: %v\n", msg.response)
msg.ok <- struct{}{}

case msg := <-c.reportCh:
if msg.request.Phase == c.phase {
log.Printf("Coordinator: Worker has finished %v-task%d\n", c.phase, msg.request.Id)
c.tasks[msg.request.Id].status = Finished
}
msg.ok <- struct{}{}
}
}
}

生成一个新任务时需要去遍历查看是否有处于等待状态的任务或者是运行时间过久(说明worker可能已经挂掉了)的任务,然后将其分配出去,主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (c *Coordinator) selectANewTask(response *HeartbeatResponse) bool {
isAllTaskDone, isNewTaskScheduled := true, false

for id, task := range c.tasks {
switch task.status {
case Idle:
isAllTaskDone, isNewTaskScheduled = false, true
c.tasks[id].status, c.tasks[id].startTime = Working, time.Now()
c.scheduleTaskToResponse(id, response)

case Working:
isAllTaskDone = false
if time.Since(task.startTime) > MaxTaskRunInterval {
isNewTaskScheduled = true
c.tasks[id].startTime = time.Now()
c.scheduleTaskToResponse(id, response)
}

case Finished:
}

if isNewTaskScheduled {
break
}
}

if !isNewTaskScheduled && !isAllTaskDone {
response.JobType = WaitJob
}

return isAllTaskDone
}

当coordinator进行Complete阶段后其实并不会再去处理其他事情,比如给worker发送运行结束的指令,而是直接给doneCh赋值,然后以此退出运行

1
2
3
4
5
func (c *Coordinator) Done() bool {
<-c.doneCh
log.Printf("Coordinator: Done\n")
return true
}

worker 设计

运行流程

worker就是不断地发送heartbeat命令然后获取任务进行运行,直到接收到了Complete任务或者发送heartbeat失败,就可以结束运行了。如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

for {
response := doHeartbeat()
log.Printf("Worker: receive coordinator's response, new job is %v \n", response)

switch response.JobType {
case MapJob:
doMapTask(mapf, response)
case ReduceJob:
doReduceTask(reducef, response)
case WaitJob:
time.Sleep(1 * time.Second)
case CompleteJob:
return
default:
panic(fmt.Sprintf("worker get an unexpected jobType %v", response.JobType))
}
}
}

Map类型task

Map类型的task的处理如下所示,总体就是调用mapF统计文件中各个单词的数量,并记录到中间文件中,由于将中间结果写入到文件中是可以并行运行的,所以这里启动了多个协程来进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func doMapTask(mapF func(string, string) []KeyValue, response *HeartbeatResponse) {
wordCountList := getWordCountListOfFile(mapF, response.FilePath)

intermediate := splitWordCountListToReduceNNum(wordCountList, response.NReduce)

var writeIntermediateFilewg sync.WaitGroup
for reduceNumber, splitedWordCountList := range intermediate {
writeIntermediateFilewg.Add(1)
go func(reduceNumber int, splitedWordCountList []KeyValue) {
defer writeIntermediateFilewg.Done()
writeIntermediateFile(response.Id, reduceNumber, splitedWordCountList)
}(reduceNumber, splitedWordCountList)
}
writeIntermediateFilewg.Wait()

doReport(response.Id, MapPhase)
}

Reduce类型task

Reduce类型task的处理如下所示,总体就是把对应的中间文件读出来,将结果通过reduceF进行聚集,输出到最终的文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
func doReduceTask(reduceF func(string, []string) string, response *HeartbeatResponse) {
wordCountList := getWordCountListFromIntermediateFile(response.NMap, response.Id)

wordCountMap := gatherAndSortIntermediateWordCountList(wordCountList)

var buf bytes.Buffer
reducIntermediateWordCount(reduceF, wordCountMap, &buf)

fileName := generateReduceResultFileName(response.Id)
atomicWriteFile(fileName, &buf)

doReport(response.Id, ReducePhase)
}

原子写入文件

这里采用了一种原子写入的方式,以防止多个worker都需要写入同一个文件名的文件时可能出现的问题。总体思想就是先写入到一个临时文件中,然后再将其改名为对应的文件名,如果临时文件没有写成功,就用defer命令将其删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func atomicWriteFile(filename string, reader io.Reader) (err error) {
tmpFileName, err := writeToTmpFile(filename, reader)
if err != nil {
return fmt.Errorf("cannot write to temp file: %v", err)
}

if err := os.Rename(tmpFileName, filename); err != nil {
return fmt.Errorf("cannot rename temp file: %v", err)
}

return nil
}

func writeToTmpFile(filename string, reader io.Reader) (tmpFileName string, err error) {
dir, file := filepath.Split(filename)
if dir == "" {
dir = "."
}

tmpFile, err := os.CreateTemp(dir, file)
if err != nil {
return "", fmt.Errorf("cannot create temp file: %v", err)
}
defer tmpFile.Close()
defer func() {
if err != nil {
os.Remove(tmpFile.Name())
}
}()

_, err = io.Copy(tmpFile, reader)
if err != nil {
return "", fmt.Errorf("cannot write to temp file: %v", err)
}
if err := tmpFile.Close(); err != nil {
return "", fmt.Errorf("cannot close temp file: %v", err)
}

return tmpFile.Name(), nil
}

运行结果

coordinator运行结果:

coordinator运行结果

worker运行结果:

worker运行结果

中间文件:

中间文件

输出结果:

输出结果

测试脚本结果:

测试脚本结果

后记

最后进行脚本测试的时候发现early_exit这个点总是通不过,这个脚本会捕捉最早退出运行的进程,然后拷贝所有输出文件,然后再在所有进程都退出的时候拷贝所有输出文件,以此对比两个文件是否相同,来判断是否coordinator和所有的woker都在任务全部完成后再退出。

后面仔细查看不通过的原因发现是因为其依靠下面的部分来进行捕捉退出的进程,本机器上使用的是wait -n,但是实际查看发现其并没有正确地在相关进程退出时进行触发,而是一开始就触发了,其触发时coordinator和所有worker其实都还在前台运行了,后面讲这部分改成了if里面的测试,就可以正常捕捉退出的进程然后顺利通过了。

1
2
3
4
5
6
7
8
9
10
11
12
13
jobs &> /dev/null
if [[ "$OSTYPE" = "darwin"* ]]
then
# bash on the Mac doesn't have wait -n
while [ ! -e $DF ]
do
sleep 0.2
done
else
# the -n causes wait to wait for just one child process,
# rather than waiting for all to finish.
wait -n
fi

从看MapReduce论文到学go再到能看懂作业要求,再到能看懂别人写的代码,再到能自己独立完成这部分代码总共断断续续持续了一个月,能够感受到自己在这之中的不断的精进,MapReduce的设计确实也很巧妙,go总体的设计确实很适合分布式,MIT6.824确实不愧是一名深受好评的课,学一下是很有必要的,希望自己后面也能都将其他部分啃下来了。

参考链接

  1. https://github.com/OneSizeFitsQuorum/MIT6.824-2021
  2. https://github.com/PKUFlyingPig/MIT6.824
  3. https://github.com/szw2021/MIT6.824-2021/tree/practice/src

【MIT6.824】lab 1 MapReduce实现总结
http://example.com/2023/11/22/MIT6-824lab1/
作者
John Doe
发布于
2023年11月22日
许可协议