mit6.824-lab1
; 2789 words
目录
快速开始
https://pdos.csail.mit.edu/6.824/notes/l01.txt
http://nil.csail.mit.edu/6.824/2022/labs/lab-mr.html
Go version:http://nil.csail.mit.edu/6.824/2022/labs/go.html
参考思路:https://blog.csdn.net/weixin_45938441/article/details/124018485
实现代码:https://github.com/Zerlina-ysl/6.824
流程图
主要任务
-
coordinator
- 检查worker执行是否超时,超时后把任务给其他worker执行
-
worker
- 向coordinator请求task
- 读取task输入
- 执行任务
- 将输出写回到文件
流程解析
- coordinator
- 初始化
- 读取file,并通过file长度创建task,类型为map
- 分配task给worker执行
- 监控当前阶段,如果file被读取完,且所有map task的状态都为已完成,则整个阶段到Reduce阶段
- 读取中间文件mr-X-Y,根据nReduce创建Task,分配task给reduce执行
- 监控当前阶段,如果所有reduce task的状态已完成,整个阶段达到done阶段,程序退出
- worker
- 通过rpc通信接收map执行请求,执行map,输出到mr-X-Y, X是map task id,Y是reduce task id.其中Y由ihash(key)%nReduce得到
- 通过rpc通信接收reduce执行请求,执行reduce,输出到mr-out-X, X for each reduce task.
lab1是否通过是要运行src/main/test-mr.sh的脚本。
$TIMEOUT ../mrcoordinator ../pg*txt &
pid=$!
# give the coordinator time to create the sockets.
sleep 1
# 启动三个worker进程
$TIMEOUT ../mrworker ../../mrapps/wc.so &
$TIMEOUT ../mrworker ../../mrapps/wc.so &
$TIMEOUT ../mrworker ../../mrapps/wc.so &
# 阻塞coordinator进程,无法响应worker进程
# wait for the coordinator to exit.
wait $pid
# since workers are required to exit when a job is completely finished,
# and not before, that means the job has finished.
sort mr-out* | grep . > mr-wc-all
if cmp mr-wc-all mr-correct-wc.txt
then
echo '---' wc test: PASS
mrcoordinator实现的功能是启动coordinator,并时刻监控任务是否结束;mrworker实现了map和reduce功能,并传入Worker。
分析脚本和已知功能可以发现,测试过程中是启动了一个coordinator进程和三个worker进程,而后分析mr-out*的数据是否符合预期。
mrcoordinator.go的主要函数是MakeCoordinator,mrworker.go的主要函数是Worker。因此在MakeCoordinator里初始化coordinator,并初始化map任务,在Worker里通过rpc请求任务并执行。
实现细节
-
rpc通信需要通信什么?任务分配,coordinator维护chan,每次从chan中取一个任务,并返回给worker。
- worker->coordinator: asking for a task
- coordinator->worker: respond with the file name of an as-yet-unstarted map task
-
rpc通信原理
func (c *Coordinator) server() {
rpc.Register(c)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", ":1234")
sockname := coordinatorSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
rpc.Register(c)把整个 Coordinator 对象注册为 RPC 服务,所有符合签名格式的方法func (c *Coordinator) AssignTask(req *Task, reply *Task) error都会被注册。rpc.HandleHTTP()将 RPC 服务绑定到 HTTP 协议上,可以通过 HTTP 请求来发起 RPC 调用。通过 HTTP 发起请求时,Go 会根据路径识别这是不是 RPC 请求并做相应处理。sockname := coordinatorSock()和os.Remove(sockname)使用 Unix 域套接字(Unix Domain Socket),先删除旧的 socket 文件,避免重复创建失败,然后监听这个 socket。l, e := net.Listen("unix", sockname)启动监听器,监听 Unix 域 socket。go http.Serve(l, nil)启动 HTTP 服务器,处理来自 Worker 的 RPC 请求。每个请求会自动路由到对应的 RPC 方法。
实际调用会构造请求发送到 Coordinator 的 Unix socket。:
c.Call(rpcname, args, reply)
worker使用:
ok := call("Coordinator.AssignTask", task, task)
coordinator使用:
func (c *Coordinator) AssignTask(req *Task, reply *Task) error {
...
}
HTTP 服务接收到请求后,查找注册过的 RPC 方法。找到 Coordinator.AssignTask(…) 方法。反射调用该方法,并将结果返回给 Worker
+------------------+ +------------------+
| Worker | | Coordinator |
| rpc call | |
| call(...) ------------->| HandleRequest |
| <---------| AssignTask() |
| response | |
+------------------+ +------------------+
↑
| 使用 Unix Domain Socket 通信
↓
+------------------+
| HTTP Server |
| Serve(l, nil) |
+------------------+
↑
| 使用 net/rpc 处理
↓
+------------------+
| RPC Registry |
| Register(c) |
+------------------+
脚本测试过程
测试共包含8个,使用test-mr.sh测试,其中test-mr-many.sh脚本支持对test-mr.sh的多次测试,且每次测试存在超时时间,避免某个测试出现故障,不停挂起重试,crash测试就会导致这个问题。
reduce parallelism test
该test使用了rtiming.go的功能,主要是对reduce并发功能的测试.
myfilename := fmt.Sprintf("mr-worker-%s-%d", phase, pid)
err := ioutil.WriteFile(myfilename, []byte("x"), 0666)
for _, name := range names {
var xpid int
pat := fmt.Sprintf("mr-worker-%s-%%d", phase)
n, err := fmt.Sscanf(name, pat, &xpid)
if n == 1 && err == nil {
err := syscall.Kill(xpid, 0)
if err == nil {
// if err == nil, xpid is alive.
ret += 1
}
}
}
time.Sleep(1 * time.Second)
测试原理:每个进程运行时创建一个以自己 PID 命名的临时文件,扫描当前目录下的所有文件名,对每个匹配到的 PID发送空信号来判断该进程是否存活,如果存活,则计数器 ret 加一,通过ret来判断当前并发worker数量,ret 表示当前与本 Worker 同时运行的其他 Worker 数量,ret值为2说明当前有两个worker并发执行。 有一个细节粒度需要注意,这里检查并发的时间维度是1s,如果每次获取task的时间也是1s一次,就有可能并发测试失败,适当调低获取task的时间间隔即可。
crash test
max := big.NewInt(1000)
rr, _ := crand.Int(crand.Reader, max)
if rr.Int64() < 330 {
// crash!
os.Exit(1)
} else if rr.Int64() < 660 {
// delay for a while.
maxms := big.NewInt(10 * 1000)
ms, _ := crand.Int(crand.Reader, maxms)
time.Sleep(time.Duration(ms.Int64()) * time.Millisecond)
}
测试原理:通过随机数生成,模拟概率性错误,如1/3的概率crash,1/3的概率会延迟。需要处理好这两种情况。
For this lab, have the coordinator wait for ten seconds; after that the coordinator should assume the worker has died To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.
文章这两条和crash测试有关,即如果coordinator等worker等10s,还是没结果,就可以当这个worker废了,调用下一个worker,为了保证调用的原子性,就可以把废worker的文件产出重命名一下。
这里map阶段会把file处理后输出到多个文件,因此考虑改造为原子性操作。最初思路使用map记录任务涉及到的文件和kv对,而后遍历map实现原子性,但这个思路是错的,因为遍历map的过程并非原子操作,因此考虑把task的产出输出到临时文件,而后对文件的名称直接重命名即可。
但实际开发的时候我并没有按照提示中使用临时文件的思路实现,假如任务在map阶段crash,就会一直是执行状态,corrdinator发现任务超时后就会取消并重试任务,任务再次执行就会新建文件覆盖原始中间文件,因此无需考虑中间文件问题,只要保证每次crash后的任务被重新拉起执行即可。
这里多线程环境,但凡共享资源都要涉及锁,控制休眠时间以及锁的粒度,可以一定程度提高性能,如果出现死锁,大概率是踩了go冰并发安全的坑。
感想
这个项目从大三开始学习,现在工作两年了,才算彻底只靠自己把最简单的lab1给完成😂收获颇丰啊,成就感也很强,熬了好几个工作日的大夜。- ai是最好的工具!不仅是ai能够根据代码或思路提供的信息,且与ai交流的过程也是自己捋清楚思路的过程.
- lab1其实最考验设计能力,大的设计框架搭建好之后,小的细节根据日志慢慢就调出来了。如果实在没有设计思路也没关系,跟着网上已经跑通的成功思路复刻一遍,捋清楚后,再尝试按照相同思路自己实现一遍。
- 路漫漫而修远之,吾将上下而求索。继续lab2!