共计 3784 个字符,预计需要花费 10 分钟才能阅读完成。
距离上一次发文又间隔了一段时间,都有点忘记写到哪一部分了,先回顾一下之前写到哪里了
- 介绍ps-lite的基本概念 https://www.deeplearn.me/4302.html
- 介绍ps-lite核心组成 postOffice https://www.deeplearn.me/4303.html
- 介绍ps-lite 通信模块van https://www.deeplearn.me/4306.html
本篇文章主要讲解 customer。customer的角色主要干的事情其实是一个中介的事情,它主要负责server 和worker之间的一些信息传递。
我们先看看 customer 在哪里被创建使用?
在test/test_kv_app_multi_workers.cc文件中我们看下如何调用的,先看下server端调用
void StartServer() {
if (!IsServer()) return;
//重点是kvserver的初始化,传递的参数是app_id
auto server = new KVServer<float>(0);
server->set_request_handle(KVServerDefaultHandle<float>());
RegisterExitCallback([server](){ delete server; });
}
对于一个server来说app_id可以理解为是kv数据库的id,这个在之前有提到过,kvsever的初始化函数如下所示:
explicit KVServer(int app_id) : SimpleApp() {
using namespace std::placeholders;
obj_ = new Customer(app_id, app_id, std::bind(&KVServer<Val>::Process, this, _1));
}
从上面就可以清晰的看到customer被创建的过程,直接调用customer初始化方法。这里有一点需要注意的是customer这个初始化参数第一个和第二个参数传递的都是一个值,如果是这样的话是不是意味着server端就一个customer?
这个其实是对比worker端而言,下面放一下woker端customer初始化对比
explicit KVWorker(int app_id, int customer_id) : SimpleApp() {
using namespace std::placeholders;
slicer_ = std::bind(&KVWorker<Val>::DefaultSlicer, this, _1, _2, _3);
obj_ = new Customer(app_id, customer_id, std::bind(&KVWorker<Val>::Process, this, _1));
}
这么一对比就发现差异蛮大的,kvworker的初始化需要指定传递app_id和customer_id的,那么意味着app-id和customer_id是一对多的关系。
以上只是看一下customer在哪了被调用,以及在worker端和server端之间的差异。
Customer
构造函数
接下来就是看下customer的真面貌来!!!还是先看下customer的构造函数:
Customer::Customer(int app_id, int customer_id, const Customer::RecvHandle& recv_handle)
: app_id_(app_id), customer_id_(customer_id), recv_handle_(recv_handle) {
Postoffice::Get()->AddCustomer(this);
recv_thread_ = std::unique_ptr<std::thread>(new std::thread(&Customer::Receiving, this));
}
首先看下传参,app_id和customer_id就不多解释了,const Customer::RecvHandle& recv_handle 这个参数的含义是表示接受到消息之后要对消息做什么处理,一般情况下这个是由kvserver和kvworker初始化的时候传递给customer的,也就是说这些处理逻辑本身跟customer没啥关系,由实际需求方自己定义。
举个最简单的例子,server端拿到worker端梯度信息要去更新参数,这个处理逻辑就是 const Customer::RecvHandle& recv_handle 这个要干的事情。
构造函数做了哪些事情?
- 调用PostOffice::AddCustomer将当前Customer注册到PostOffice;
- PostOffice的customers成员: 在对应的app_id的元素上添加custom_id;
- PostOffice的barrier_done成员将该custom_id的同步状态设为false
- 新起一个接收消息的线程,不断的从外界接收消息并且根据实际的处理逻辑做出相应的处理。
消息处理
生产和消费模式
Customer的消息处理遵循“producer+consumer”模式,之前也提到customer只是个中介,那么它这个生产和消费是如何进行的?
生产
核心的API是Accept函数
inline void Accept(const Message& recved) {
recv_queue_.Push(recved);
}
recv_queue_就是一个队列,存储调用房发送过来的消息,对于customer而言上游发送过来的消息就是生产出来的内容,customer拿到这个也只是存起来。
消费
核心函数就是这个 Receiving 成员函数,负责消息的consumer
void Customer::Receiving() {
while (true) {
Message recv;
//从队列中取出相应的数据
recv_queue_.WaitAndPop(&recv);
//判断当前的消息control指令是不是结束的指令,如果是则直接结束
if (!recv.meta.control.empty() &&
recv.meta.control.cmd == Control::TERMINATE) {
break;
}
//调用的制定的处理函数来消费内容
recv_handle_(recv);
//判断当前的消息是请求和响应,如果是响应则修改tracker_数据
//tracker是负责核对消息完整性,比如你发送3个请求,那么收到3个响应才算完整,那么这个参数就是干这个事
if (!recv.meta.request) {
std::lock_guard<std::mutex> lk(tracker_mu_);
tracker_[recv.meta.timestamp].second++;
tracker_cond_.notify_all();
}
}
}
请求相关
比如kvworker需要去server拉取参数,那么就会经过customer来进行的消息请求,由customer新建请求去server拉取数据。
int Customer::NewRequest(int recver) {
std::lock_guard<std::mutex> lk(tracker_mu_);
//这里是获取需要通知的nodeid下的节点个数,比如worker向server请求,那么这就是所有需要参与处理server的节点数
int num = Postoffice::Get()->GetNodeIDs(recver).size();
//构建一个tracker_ 用于校对请求的完整星,这个在Receiving函数里也有体现
tracker_.push_back(std::make_pair(num, 0));
return tracker_.size() - 1;
}
上面也提到一个worker会从多个server拉取数据,那么什么时候才能保证数据都获取到呢,那么有的处理响应快,有的慢,怎么保证数据的完整性,那么下面的这个函数就是做这个事情
void Customer::WaitRequest(int timestamp) {
std::unique_lock<std::mutex> lk(tracker_mu_);
tracker_cond_.wait(lk, [this, timestamp]{
return tracker_[timestamp].first == tracker_[timestamp].second;
});
}
就是用tracker_cond_
阻塞等待“请求节点数”和“回复节点数”相等。而tracker_cond_
是在Customer::Receiving
每次接受到消息时通知一下。
注意这个阻塞是“单机阻塞”,适用于比如,worker必须在从server拉取完最新参数之后,才能开始下一轮的训练。由于pull是异步的,所以worker需要调用WaitRequest阻塞等待那个pull request完成。多机之间的阻塞同步,要用PostOffice:Barrier
。
总结
主要讲解了customer的作用–中介,同时作为中介,它是如何进行消息的生产与消费,最后也提到了消息的同步这块的知识。
![post-qrcode](https://deeplearn-1251474370.cos.ap-guangzhou.myqcloud.com/2020/12/20201230041553952.jpeg)