ᕕ( ᐛ )ᕗ xiaoli's blog

mit6.824-lab1

; 2789 words  

目录

快速开始

https://pdos.csail.mit.edu/6.824/notes/l01.txt

http://nil.csail.mit.edu/6.824/2022/labs/lab-mr.html

Go version:http://nil.csail.mit.edu/6.824/2022/labs/go.html

参考思路:https://blog.csdn.net/weixin_45938441/article/details/124018485

实现代码:https://github.com/Zerlina-ysl/6.824

流程图


主要任务

流程解析

lab1是否通过是要运行src/main/test-mr.sh的脚本。


$TIMEOUT ../mrcoordinator ../pg*txt &
pid=$!

# give the coordinator time to create the sockets.
sleep 1

# 启动三个worker进程
$TIMEOUT ../mrworker ../../mrapps/wc.so &
$TIMEOUT ../mrworker ../../mrapps/wc.so &
$TIMEOUT ../mrworker ../../mrapps/wc.so &

# 阻塞coordinator进程,无法响应worker进程
# wait for the coordinator to exit.
wait $pid

# since workers are required to exit when a job is completely finished,
# and not before, that means the job has finished.
sort mr-out* | grep . > mr-wc-all
if cmp mr-wc-all mr-correct-wc.txt
then
  echo '---' wc test: PASS

mrcoordinator实现的功能是启动coordinator,并时刻监控任务是否结束;mrworker实现了map和reduce功能,并传入Worker。
分析脚本和已知功能可以发现,测试过程中是启动了一个coordinator进程和三个worker进程,而后分析mr-out*的数据是否符合预期。
mrcoordinator.go的主要函数是MakeCoordinatormrworker.go的主要函数是Worker。因此在MakeCoordinator里初始化coordinator,并初始化map任务,在Worker里通过rpc请求任务并执行。

实现细节

  1. rpc通信需要通信什么?任务分配,coordinator维护chan,每次从chan中取一个任务,并返回给worker。

    1. worker->coordinator: asking for a task
    2. coordinator->worker: respond with the file name of an as-yet-unstarted map task
  2. rpc通信原理

func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := coordinatorSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

实际调用会构造请求发送到 Coordinator 的 Unix socket。:

c.Call(rpcname, args, reply)

worker使用:
ok := call("Coordinator.AssignTask", task, task)

coordinator使用:
func (c *Coordinator) AssignTask(req *Task, reply *Task) error {
    ...
}

HTTP 服务接收到请求后,查找注册过的 RPC 方法。找到 Coordinator.AssignTask(…) 方法。反射调用该方法,并将结果返回给 Worker

+------------------+        +------------------+
|     Worker       |        |   Coordinator    |
|                 rpc call |                  |
|   call(...) ------------->|   HandleRequest  |
|                 <---------|   AssignTask()   |
|                  response |                  |
+------------------+        +------------------+
           ↑
           | 使用 Unix Domain Socket 通信
           ↓
+------------------+
|     HTTP Server  |
|   Serve(l, nil)  |
+------------------+
           ↑
           | 使用 net/rpc 处理 
           ↓
+------------------+
|     RPC Registry |
|   Register(c)    |
+------------------+

脚本测试过程

测试共包含8个,使用test-mr.sh测试,其中test-mr-many.sh脚本支持对test-mr.sh的多次测试,且每次测试存在超时时间,避免某个测试出现故障,不停挂起重试,crash测试就会导致这个问题。

reduce parallelism test

该test使用了rtiming.go的功能,主要是对reduce并发功能的测试.


	myfilename := fmt.Sprintf("mr-worker-%s-%d", phase, pid)
	err := ioutil.WriteFile(myfilename, []byte("x"), 0666)
	for _, name := range names {
		var xpid int
		pat := fmt.Sprintf("mr-worker-%s-%%d", phase)
		n, err := fmt.Sscanf(name, pat, &xpid)
		if n == 1 && err == nil {
			err := syscall.Kill(xpid, 0)
			if err == nil {
				// if err == nil, xpid is alive.
				ret += 1
			}
		}
	}
  time.Sleep(1 * time.Second)

测试原理:每个进程运行时创建一个以自己 PID 命名的临时文件,扫描当前目录下的所有文件名,对每个匹配到的 PID发送空信号来判断该进程是否存活,如果存活,则计数器 ret 加一,通过ret来判断当前并发worker数量,ret 表示当前与本 Worker 同时运行的其他 Worker 数量,ret值为2说明当前有两个worker并发执行。 有一个细节粒度需要注意,这里检查并发的时间维度是1s,如果每次获取task的时间也是1s一次,就有可能并发测试失败,适当调低获取task的时间间隔即可。

crash test

	max := big.NewInt(1000)
	rr, _ := crand.Int(crand.Reader, max)
	if rr.Int64() < 330 {
		// crash!
		os.Exit(1)
	} else if rr.Int64() < 660 {
		// delay for a while.
		maxms := big.NewInt(10 * 1000)
		ms, _ := crand.Int(crand.Reader, maxms)
		time.Sleep(time.Duration(ms.Int64()) * time.Millisecond)
	}

测试原理:通过随机数生成,模拟概率性错误,如1/3的概率crash,1/3的概率会延迟。需要处理好这两种情况。

For this lab, have the coordinator wait for ten seconds; after that the coordinator should assume the worker has died To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.

文章这两条和crash测试有关,即如果coordinator等worker等10s,还是没结果,就可以当这个worker废了,调用下一个worker,为了保证调用的原子性,就可以把废worker的文件产出重命名一下。
这里map阶段会把file处理后输出到多个文件,因此考虑改造为原子性操作。最初思路使用map记录任务涉及到的文件和kv对,而后遍历map实现原子性,但这个思路是错的,因为遍历map的过程并非原子操作,因此考虑把task的产出输出到临时文件,而后对文件的名称直接重命名即可。
但实际开发的时候我并没有按照提示中使用临时文件的思路实现,假如任务在map阶段crash,就会一直是执行状态,corrdinator发现任务超时后就会取消并重试任务,任务再次执行就会新建文件覆盖原始中间文件,因此无需考虑中间文件问题,只要保证每次crash后的任务被重新拉起执行即可。
这里多线程环境,但凡共享资源都要涉及锁,控制休眠时间以及锁的粒度,可以一定程度提高性能,如果出现死锁,大概率是踩了go冰并发安全的坑。

感想

这个项目从大三开始学习,现在工作两年了,才算彻底只靠自己把最简单的lab1给完成😂收获颇丰啊,成就感也很强,熬了好几个工作日的大夜。

  1. ai是最好的工具!不仅是ai能够根据代码或思路提供的信息,且与ai交流的过程也是自己捋清楚思路的过程.
  2. lab1其实最考验设计能力,大的设计框架搭建好之后,小的细节根据日志慢慢就调出来了。如果实在没有设计思路也没关系,跟着网上已经跑通的成功思路复刻一遍,捋清楚后,再尝试按照相同思路自己实现一遍。
  3. 路漫漫而修远之,吾将上下而求索。继续lab2!

#mit6.824