lease
monitor节点分为leader和peon两种角色,monitor会定义向peon发送lease消息,保证副本在一定的时间内可读.只要有一个peon没有
回复lease的消息就会发起重新选举.
lease消息是从extend_lease 这个函数发出的
void Paxos::extend_lease()
{
#从这里知道只有leader节点能发送lease消息
assert(mon->is_leader());
//assert(is_active());
#清空acked_lease并将自己加入其中,注意这里可以看出是通过插入monitor节点的rank
acked_lease.clear();
acked_lease.insert(mon->rank);
#向所有的monitor节点发送lease消息,但也包括leader节点
// bcast
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end(); ++p) {
#由于leader节点的rank已经加入到acked_lease中了,所以就不用给leader节点发送lease消息了
if (*p == mon->rank) continue;
#新建lease消息
MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE,
ceph_clock_now());
lease->last_committed = last_committed;
lease->lease_timestamp = lease_expire;
lease->first_committed = first_committed;
#发送消息
mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
}
#如果peon节点没有在规定的时间内回复lease消息,则会信息这个timeout的回调函数lease_ack_timeout
// if old timeout is still in place, leave it.
if (!lease_ack_timeout_event) {
lease_ack_timeout_event = mon->timer.add_event_after(
g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
new C_MonContext(mon, [this](int r) {
if (r == -ECANCELED)
return;
lease_ack_timeout();
}));
}
#准备下一次发送lease消息,这里是通过lease_renew_timeout->extend_lease 来周期发送lease消息的
// set renew event
utime_t at = lease_expire;
at -= g_conf->mon_lease;
at += g_conf->mon_lease_renew_interval_factor * g_conf->mon_lease;
lease_renew_event = mon->timer.add_event_at(
at, new C_MonContext(mon, [this](int r) {
if (r == -ECANCELED)
return;
lease_renew_timeout();
}));
}
下面我们来看看peon收到lease消息后的处理函数。
所有消息的处理函数都在dispatch中,我们这里分析的是OP_LEASE消息,其对应的处理函数是handle_lease
void Paxos::handle_lease(MonOpRequestRef op)
{
#从mon->is_peon()可以知道只有peon节点才能回复lease消息。
// sanity
if (!mon->is_peon() ||
last_committed != lease->last_committed) {
dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
<< " or the last_committed doesn't match, dropping" << dendl;
op->mark_paxos_event("invalid lease, ignore");
return;
}
#从这里知道回复给leader节点的是OP_LEASE_ACK 消息
// ack
MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK,
ceph_clock_now());
lease->get_connection()->send_message(ack);
}
leader节点收到消息OP_LEASE_ACK消息后的处理函数是handle_lease_ack
void Paxos::handle_lease_ack(MonOpRequestRef op)
{
#可以看到leader节点收到所有peon发送的ack消息后就通过mon->timer.cancel_event 来取消timer,如果有一个peon没有回复leader节点,则这里
#就不会取消time,因此在extend_lease 中定义的timer就会过期,就会调用lease_ack_timeout 来重新选举
if (acked_lease == mon->get_quorum()) {
// yay!
dout(10) << "handle_lease_ack from " << ack->get_source()
<< " -- got everyone" << dendl;
mon->timer.cancel_event(lease_ack_timeout_event);
lease_ack_timeout_event = 0;
}
}
timer的到期函数如下:可见是通过mon->bootstrap()来重新发起选举的.
void Paxos::lease_ack_timeout()
{
dout(1) << "lease_ack_timeout -- calling new election" << dendl;
assert(mon->is_leader());
assert(is_active());
logger->inc(l_paxos_lease_ack_timeout);
lease_ack_timeout_event = 0;
mon->bootstrap();
}
相关阅读
ReleaseSemaphore 功能按指定数量增加指定信号量对象的计数。语法C++BOOL WINAPI ReleaseSemaphore(_In_ HANDLE hSemaph
CreateMutex、WaitForSingleObject、ReleaseMutex——
CreateMutexCreateMutex作用是找出当前系统是否已经存在指定进程的实例。如果没有则创建一个互斥体。互斥对象是系统内核维护的一