共计 17000 个字符,预计需要花费 43 分钟才能阅读完成。
学习还是要继续,在上一篇文章讲解PostOffice当中其实也提到了Van,但是那一篇文章主要还是在讲PostOffice相关的信息,所以Van 暂时搁置放到这里。还是延续之前的学习的思路,按照ps服务启动的顺序,到那个模块加载的时候,在重点介绍这块的信息。一句题外话,其实在写这些东西的时候,也是会看很多大佬们的讲解,单靠自己去看源码,很多时候也看不懂,也许c++代码看懂了,但是为什么这么设计?所以写的有些不足或者错误也欢迎email我。
ok,那我们先回顾一下Van是在哪里被调用的,是在Postoffice的启动过程中。InitEnvironment 负责创建,van_->Start(customer_id) 负责启动。
void Postoffice::InitEnvironment() {
const char* val = NULL;
std::string van_type = GetEnv("DMLC_PS_VAN_TYPE", "zmq");
van_ = Van::Create(van_type);
val = CHECK_NOTNULL(Environment::Get()->find("DMLC_NUM_WORKER"));
num_workers_ = atoi(val);
val = CHECK_NOTNULL(Environment::Get()->find("DMLC_NUM_SERVER"));
num_servers_ = atoi(val);
val = CHECK_NOTNULL(Environment::Get()->find("DMLC_ROLE"));
std::string role(val);
is_worker_ = role == "worker";
is_server_ = role == "server";
is_scheduler_ = role == "scheduler";
verbose_ = GetEnv("PS_VERBOSE", 0);
}
void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier) {
start_mu_.lock();
if (init_stage_ == 0) {
InitEnvironment();
// init glog
if (argv0) {
dmlc::InitLogging(argv0);
} else {
dmlc::InitLogging("ps-lite\0");
}
// init node info.
for (int i = 0; i < num_workers_; ++i) {
int id = WorkerRankToID(i);
for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup,
kWorkerGroup + kScheduler,
kWorkerGroup + kServerGroup + kScheduler}) {
node_ids_[g].push_back(id);
}
}
for (int i = 0; i < num_servers_; ++i) {
int id = ServerRankToID(i);
for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup,
kServerGroup + kScheduler,
kWorkerGroup + kServerGroup + kScheduler}) {
node_ids_[g].push_back(id);
}
}
for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup,
kScheduler + kWorkerGroup, kScheduler + kServerGroup}) {
node_ids_[g].push_back(kScheduler);
}
init_stage_++;
}
start_mu_.unlock();
// start van
van_->Start(customer_id);
start_mu_.lock();
if (init_stage_ == 1) {
// record start time
start_time_ = time(NULL);
init_stage_++;
}
start_mu_.unlock();
// do a barrier here
if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler);
}
Van 的创建
我们先看看Van对象的创建
Van* Van::Create(const std::string& type) {
if (type == "zmq") {
return new ZMQVan();
} else if (type == "p3") {
return new P3Van();
#ifdef DMLC_USE_IBVERBS
} else if (type == "ibverbs") {
return new IBVerbsVan();
#endif
} else {
LOG(FATAL) << "Unsupported van type: " << type;
return nullptr;
}
}
根据初始化传递的type参数,我们也可以判断出它是一个通信模块,zmq是一个开源的socket编程库,ZMQVan也是基于zmq的Van库,至于下面的p3和ibverbs是额外的两种方式,ibverbs是字节提交的库,有兴趣的可以研究下,对Van本身对理解影响不大。
前面也说了,程序启动分三次,分别是scheduler、server和worker的启动,这里还是先以scheduler这个角色启动为代表讲,跟之前一样,在实际的讲解过程中也会拓展到其他角色。
Van 的启动
先看下代码
void Van::Start(int customer_id) {
// get scheduler info
start_mu_.lock();
if (init_stage == 0) {
scheduler_.hostname = std::string(
CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
scheduler_.port =
atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
scheduler_.role = Node::SCHEDULER;
scheduler_.id = kScheduler;
is_scheduler_ = Postoffice::Get()->is_scheduler();
// get my node info
if (is_scheduler_) {
my_node_ = scheduler_;
} else {
auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
std::string ip;
if (nhost) ip = std::string(nhost);
if (ip.empty()) {
const char* itf = Environment::Get()->find("DMLC_INTERFACE");
std::string interface;
if (itf) interface = std::string(itf);
if (interface.size()) {
GetIP(interface, &ip);
} else {
GetAvailableInterfaceAndIP(&interface, &ip);
}
CHECK(!interface.empty()) << "failed to get the interface";
}
int port = GetAvailablePort();
const char* pstr = Environment::Get()->find("PORT");
if (pstr) port = atoi(pstr);
CHECK(!ip.empty()) << "failed to get ip";
CHECK(port) << "failed to get a port";
my_node_.hostname = ip;
my_node_.role = role;
my_node_.port = port;
// cannot determine my id now, the scheduler will assign it later
// set it explicitly to make re-register within a same process possible
my_node_.id = Node::kEmpty;
my_node_.customer_id = customer_id;
}
// bind.
my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);
PS_VLOG(1) << "Bind to " << my_node_.DebugString();
CHECK_NE(my_node_.port, -1) << "bind failed";
// connect to the scheduler
Connect(scheduler_);
// for debug use
if (Environment::Get()->find("PS_DROP_MSG")) {
drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG"));
}
// start receiver
receiver_thread_ =
std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
init_stage++;
}
start_mu_.unlock();
if (!is_scheduler_) {
// let the scheduler know myself
Message msg;
Node customer_specific_node = my_node_;
customer_specific_node.customer_id = customer_id;
msg.meta.recver = kScheduler;
msg.meta.control.cmd = Control::ADD_NODE;
msg.meta.control.node.push_back(customer_specific_node);
msg.meta.timestamp = timestamp_++;
Send(msg);
}
// wait until ready
while (!ready_.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
start_mu_.lock();
if (init_stage == 1) {
// resender
if (Environment::Get()->find("PS_RESEND") &&
atoi(Environment::Get()->find("PS_RESEND")) != 0) {
int timeout = 1000;
if (Environment::Get()->find("PS_RESEND_TIMEOUT")) {
timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT"));
}
resender_ = new Resender(timeout, 10, this);
}
if (!is_scheduler_) {
// start heartbeat thread
heartbeat_thread_ =
std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
}
init_stage++;
}
start_mu_.unlock();
}
代码首先在获取scheduler的基础信息
//获取host
scheduler_.hostname = std::string(
CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
//获取端口
scheduler_.port =
atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
//赋值 SCHEDULER 角色名
scheduler_.role = Node::SCHEDULER;
// kScheduler=1 ,之前讲解nodeid的时候提到过 1,2,4分别代表相应的group
scheduler_.id = kScheduler;
// 判断当前的节点是否是 SCHEDULER 节点,这个也好理解,我们不同角色启动程序,只有在 SCHEDULER 角色启动这个才是 true
is_scheduler_ = Postoffice::Get()->is_scheduler();
Van的成员里面有个my_node是记录自己的node节点信息,接下来就是要当前的节点信息写进去
// get my node info
//如果是SCHEDULER 那么 my_node直接赋值
if (is_scheduler_) {
my_node_ = scheduler_;
} else {
//判断在worker或者server下的角色操作,主要也是获取ip、host和端口等信息
auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
std::string ip;
if (nhost) ip = std::string(nhost);
if (ip.empty()) {
const char* itf = Environment::Get()->find("DMLC_INTERFACE");
std::string interface;
if (itf) interface = std::string(itf);
if (interface.size()) {
GetIP(interface, &ip);
} else {
GetAvailableInterfaceAndIP(&interface, &ip);
}
CHECK(!interface.empty()) << "failed to get the interface";
}
int port = GetAvailablePort();
const char* pstr = Environment::Get()->find("PORT");
if (pstr) port = atoi(pstr);
CHECK(!ip.empty()) << "failed to get ip";
CHECK(port) << "failed to get a port";
my_node_.hostname = ip;
my_node_.role = role;
my_node_.port = port;
// cannot determine my id now, the scheduler will assign it later
// set it explicitly to make re-register within a same process possible
my_node_.id = Node::kEmpty;
my_node_.customer_id = customer_id;
}
获取基础的node信息之后就是绑定端口,这是为了后续通信服务使用
// bind.
my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);
PS_VLOG(1) << "Bind to " << my_node_.DebugString();
CHECK_NE(my_node_.port, -1) << "bind failed";
接下来就是比较关键的点了,需要跟scheduler连接上
// connect to the scheduler
Connect(scheduler_);
这一点乍一看有点问题,按道理worker和server说连接scheduler还好理解,你说scheduler本身也要连接自己?
在connect函数中
// worker doesn't need to connect to the other workers. same for server
if ((node.role == my_node_.role) && (node.id != my_node_.id)) {
return;
}
那么按照这段逻辑,在scheduler角色启动的时候,应该也是直接return,没有做任何的操作。跟这个源码的诸事应该保持一致。
至于其他的worker和server在启动的时候都要连接到 scheduler。这里也仅仅只是连接而已,只能说明在指定的端口,不同的node之间建立一个通道,实际上还没有进行任何有用的数据传输。
接下来就是启动一个接收消息的线程,负责消息的接收
// start receiver,启动一个消息接收的线程
receiver_thread_ =
std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
真正干点事还是在这里
if (!is_scheduler_) {
// let the scheduler know myself
Message msg;
Node customer_specific_node = my_node_;
customer_specific_node.customer_id = customer_id;
msg.meta.recver = kScheduler;
msg.meta.control.cmd = Control::ADD_NODE;
msg.meta.control.node.push_back(customer_specific_node);
msg.meta.timestamp = timestamp_++;
Send(msg);
}
当当前的启动不是以 scheduler启动的时候,需要向 scheduler 注册信息,比如worker 节点1 启动了,那么需要让 scheduler 知道又个代号 1 的worker 节点即将上线。那么之前在启动scheduler 的那个消息接收的线程接收到这个请求之后,开始处理这个请求。
现在我们看看这个接收消息的线程是如何处理消息的?
void Van::Receiving() {
Meta nodes;
Meta recovery_nodes; // store recovery nodes
recovery_nodes.control.cmd = Control::ADD_NODE;
while (true) {
Message msg;
int recv_bytes = RecvMsg(&msg);
// For debug, drop received message
if (ready_.load() && drop_rate_ > 0) {
unsigned seed = time(NULL) + my_node_.id;
if (rand_r(&seed) % 100 < drop_rate_) {
LOG(WARNING) << "Drop message " << msg.DebugString();
continue;
}
}
CHECK_NE(recv_bytes, -1);
recv_bytes_ += recv_bytes;
if (Postoffice::Get()->verbose() >= 2) {
PS_VLOG(2) << msg.DebugString();
}
// duplicated message
if (resender_ && resender_->AddIncomming(msg)) continue;
if (!msg.meta.control.empty()) {
// control msg
auto& ctrl = msg.meta.control;
if (ctrl.cmd == Control::TERMINATE) {
ProcessTerminateCommand();
break;
} else if (ctrl.cmd == Control::ADD_NODE) {
ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes);
} else if (ctrl.cmd == Control::BARRIER) {
ProcessBarrierCommand(&msg);
} else if (ctrl.cmd == Control::HEARTBEAT) {
ProcessHearbeat(&msg);
} else {
LOG(WARNING) << "Drop unknown typed message " << msg.DebugString();
}
} else {
ProcessDataMsg(&msg);
}
}
}
在上面的Receiving函数中重点是那个while循环,在循环外定义了
Meta nodes;
Meta recovery_nodes; // store recovery nodes 这个是指之前有问题重新恢复使用的节点注册信息
在循环的刚开始那有些debug代码可以不用管
if (resender_ && resender_->AddIncomming(msg)) continue;
resender是一个重试机制,假设你之前的请求失败,失败的原因有很多比如超时,如果你设置了resender,那么就会重试
接下来就是要根据接受到的消息内容执行相应的任务
停止任务 ProcessTerminateCommand
void Van::ProcessTerminateCommand() {
PS_VLOG(1) << my_node().ShortDebugString() << " is stopped";
ready_ = false;
}
这个ready_标志位是表示当前这个节点的可用性,如果置为false那么就表示当前这个节点不可用。
新增节点 ProcessAddNodeCommand
void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes,
Meta* recovery_nodes) {
// 获取之前记录的dead node
auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);
std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());
auto& ctrl = msg->meta.control;
UpdateLocalID(msg, &dead_set, nodes, recovery_nodes);
if (is_scheduler_) {
ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
} else {
for (const auto& node : ctrl.node) {
std::string addr_str = node.hostname + ":" + std::to_string(node.port);
if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
Connect(node);
connected_nodes_[addr_str] = node.id;
}
if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
}
PS_VLOG(1) << my_node_.ShortDebugString() << " is connected to others";
ready_ = true;
}
}
UpdateLocalID 的函数作用如下
此函数作用是更新节点内部的node id 信息,也是分为两种情况,函数逻辑如下:
- 如果msg->meta.sender是Meta::kEmpty,即未设定,则处理此message的一定是Scheduler,会进入 if 分支。
- 如果目前 nodes 的control.node数目小于 “配置的server数目 + 配置的worker数目”,则说明是系统启动阶段,将当前消息的node信息加入到 control.node 之中。
- 否则说明是系统运行阶段,应该是有些节点死掉重启后再次连接。那么,就从 nodes 的control.node 之中找到一个已经死掉的且节点role 与当前消息一致(同类型)的 node id,把这个 node id 赋给这个重启的节点。并且更新 nodes->control.node 和 recovery_nodes。
- 下面就是普通节点处理的逻辑:
- 即在 scheduler 传回来的所有节点信息中查找,目的是找到与自己的ip,port一致的节点。
- 如果找到,就更新本地节点信息(因为在本节点启动时候,并没有设置 node_id 这个信息,这个需要scheduler统一设置,从注释看,目的是为了使重新注册成为可能)。包括全局 rank 信息。
void Van::UpdateLocalID(Message* msg, std::unordered_set<int>* deadnodes_set,
Meta* nodes, Meta* recovery_nodes) {
auto& ctrl = msg->meta.control;
// 获取这个ps系统worker和server的总数
size_t num_nodes =
Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
// assign an id
if (msg->meta.sender == Meta::kEmpty) {
CHECK(is_scheduler_);
CHECK_EQ(ctrl.node.size(), 1);
//判断当前的已经正常运行的节点数与实际要求的节点数,如果小于实际要求数量,说明系统还处于启动阶段,还需要将
//这个节点加进来,处理这个只有scheduler才能处理
if (nodes->control.node.size() < num_nodes) {
nodes->control.node.push_back(ctrl.node[0]);
} else {
// some node dies and restarts ,节点挂掉和重启的处理逻辑
CHECK(ready_.load());
for (size_t i = 0; i < nodes->control.node.size() - 1; ++i) {
const auto& node = nodes->control.node[i];
if (deadnodes_set->find(node.id) != deadnodes_set->end() &&
node.role == ctrl.node[0].role) {
auto& recovery_node = ctrl.node[0];
// assign previous node id
recovery_node.id = node.id;
recovery_node.is_recovery = true;
PS_VLOG(1) << "replace dead node " << node.DebugString()
<< " by node " << recovery_node.DebugString();
nodes->control.node[i] = recovery_node;
recovery_nodes->control.node.push_back(recovery_node);
break;
}
}
}
}
// update my id,根据scheduler回传的数据更新自己的信息,这样才能保证注册成功
for (size_t i = 0; i < ctrl.node.size(); ++i) {
const auto& node = ctrl.node[i];
if (my_node_.hostname == node.hostname && my_node_.port == node.port) {
if (getenv("DMLC_RANK") == nullptr || my_node_.id == Meta::kEmpty) {
my_node_ = node;
std::string rank = std::to_string(Postoffice::IDtoRank(node.id));
#ifdef _MSC_VER
_putenv_s("DMLC_RANK", rank.c_str());
#else
setenv("DMLC_RANK", rank.c_str(), true);
#endif
}
}
}
}
ok,接下来才是真正的注册了
sheduler 任务
// 在 sheduler 处理添加节点
if (is_scheduler_) {
ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
}
看看 ProcessAddNodeCommandAtScheduler 做了哪些事情?
void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes,
Meta* recovery_nodes) {
recovery_nodes->control.cmd = Control::ADD_NODE;
time_t t = time(NULL);
// 还是获取系统要求的worker和server总数
size_t num_nodes =
Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
// 判断实际收到注册的节点数与要求的节点数一致,那么就需要按照一定规则排序分配id
if (nodes->control.node.size() == num_nodes) {
// 按照 host+port的方式来排序
std::sort(nodes->control.node.begin(), nodes->control.node.end(),
[](const Node& a, const Node& b) {
return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0;
});
// 分配节点rank值,循环遍历所有的节点
for (auto& node : nodes->control.node) {
std::string node_host_ip =
node.hostname + ":" + std::to_string(node.port);
//如果ip:port不存在van_中的话
if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
CHECK_EQ(node.id, Node::kEmpty);
// 根据server和worker的角色调用rank to id 方法,这个在上一篇文章中有提到
// server 2*rank+8 ,worker 2*rank+9
int id = node.role == Node::SERVER
? Postoffice::ServerRankToID(num_servers_)
: Postoffice::WorkerRankToID(num_workers_);
PS_VLOG(1) << "assign rank=" << id << " to node " << node.DebugString();
node.id = id;
// scheduler连接上这个节点
Connect(node);
// 更新心跳,这个心跳机制是为了监控这个节点是否还在线
Postoffice::Get()->UpdateHeartbeat(node.id, t);
connected_nodes_[node_host_ip] = id;
} else {
int id = node.role == Node::SERVER
? Postoffice::ServerRankToID(num_servers_)
: Postoffice::WorkerRankToID(num_workers_);
shared_node_mapping_[id] = connected_nodes_[node_host_ip];
node.id = connected_nodes_[node_host_ip];
}
// num_servers_ 这就是上面注释里面的rank(server 2*rank+8 )
if (node.role == Node::SERVER) num_servers_++;
if (node.role == Node::WORKER) num_workers_++;
}
// 开始向这些节点回复消息,消息的 cmd = Control::ADD_NODE
nodes->control.node.push_back(my_node_);
nodes->control.cmd = Control::ADD_NODE;
Message back;
back.meta = *nodes;
for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
int recver_id = r;
if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
back.meta.recver = recver_id;
back.meta.timestamp = timestamp_++;
Send(back);
}
}
PS_VLOG(1) << "the scheduler is connected to " << num_workers_
<< " workers and " << num_servers_ << " servers";
// 这里表示 scheduler 已经准备好了
ready_ = true;
} else if (!recovery_nodes->control.node.empty()) {
// 处理重启节点
// 回去失去心跳的dead node
auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);
std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());
// send back the recovery node
CHECK_EQ(recovery_nodes->control.node.size(), 1);
//连接节点和更新心跳
Connect(recovery_nodes->control.node[0]);
Postoffice::Get()->UpdateHeartbeat(recovery_nodes->control.node[0].id, t);
Message back;
for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
if (r != recovery_nodes->control.node[0].id &&
dead_set.find(r) != dead_set.end()) {
// do not try to send anything to dead node
continue;
}
// only send recovery_node to nodes already exist
// but send all nodes to the recovery_node
back.meta =
(r == recovery_nodes->control.node[0].id) ? *nodes : *recovery_nodes;
back.meta.recver = r;
back.meta.timestamp = timestamp_++;
Send(back);
}
}
}
worker和server任务
for (const auto& node : ctrl.node) {
std::string addr_str = node.hostname + ":" + std::to_string(node.port);
//在已连接的节点里去找这个要即将 add node 的节点
if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
//没找到则说明是个新节点,那么就连接这个节点并且记录下来
Connect(node);
connected_nodes_[addr_str] = node.id;
}
//根据角色名更新系统的server 和worker计数,
if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
}
PS_VLOG(1) << my_node_.ShortDebugString() << " is connected to others";
ready_ = true;
至此 ProcessAddNodeCommand 所有的任务都完成了。
阻塞任务 ProcessBarrierCommand
这块在上一篇文章讲了,我放个链接吧!https://www.deeplearn.me/4303.html
心跳管理 ProcessHearbeat
void Van::ProcessHearbeat(Message* msg) {
auto& ctrl = msg->meta.control;
time_t t = time(NULL);
for (auto& node : ctrl.node) {
//定时更新心跳,心跳记录即使时间戳
Postoffice::Get()->UpdateHeartbeat(node.id, t);
//scheduler节点需要相应心跳请求
if (is_scheduler_) {
Message heartbeat_ack;
heartbeat_ack.meta.recver = node.id;
heartbeat_ack.meta.control.cmd = Control::HEARTBEAT;
heartbeat_ack.meta.control.node.push_back(my_node_);
heartbeat_ack.meta.timestamp = timestamp_++;
// send back heartbeat
Send(heartbeat_ack);
}
}
}
为了记录网络的可达性,PS Lite 设计了心跳机制。具体而言:
- 每一个节点的 PostOffice 单例中维护了一个 MAP 结构,存储了心跳关联的节点的活跃信息。键为节点编号,值为上次收到其 HEARTBEAT 消息的时间戳。
- Worker/Server 只记录 Scheduler 的心跳,Scheduler 则记录所有节点的心跳。基于时间戳和心跳超时,可以输出所有的死亡节点。
- 每一个 Worker/Server 节点,会新建立一个心跳线程,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 发送一条 HEARTBEAT 消息;
- Scheduler 节点收到后,响应一个 HEARTBEAT 消息。
- scheduler进行应答,通过当前时间与心跳包接收时间之差判断是否alive。
- Scheduler 会依据心跳节点的时间戳来判断死亡节点。如果新增的节点id在dead_node容器里,表示这个节点是重新恢复的;而新增节点通过schedular的中转与现有节点形成互联。
在Van::start函数中,给出了心跳的调用,对于非 scheduler 节点启动了一个线程,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 发送一条 HEARTBEAT 消息:
if (!is_scheduler_) {
// start heartbeat thread
heartbeat_thread_ =
std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
}
具体心跳函数是:
void Van::Heartbeat() {
//从环境变量里读取间隔时间,定时发送心跳请求
const char* val = Environment::Get()->find("PS_HEARTBEAT_INTERVAL");
const int interval = val ? atoi(val) : kDefaultHeartbeatInterval;
while (interval > 0 && ready_.load()) {
std::this_thread::sleep_for(std::chrono::seconds(interval));
Message msg;
msg.meta.recver = kScheduler;
msg.meta.control.cmd = Control::HEARTBEAT;
msg.meta.control.node.push_back(my_node_);
msg.meta.timestamp = timestamp_++;
Send(msg);
}
}
差不多到这里 Van的启动已经基本上都完成了。其实在Van里面还有关于通信消息的优化,比如Pb序列化,有兴趣可以去看看。