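I. Data Storage

Persistence

Data on the Leader and the Followers is kept both in memory and on disk, so the in-memory data has to be persisted to disk.
The classes in the org.apache.zookeeper.server.persistence package implement this persistence.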

TxnLog: the interface for handling transaction logs

SnapShot: the interface for handling snapshots

Serialization

The zookeeper-jute module contains the source code for ZooKeeper's serialization.

interface InputArchive
interface OutputArchive
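
To make the role of these two interfaces more concrete, here is a minimal sketch of the write-then-read pattern they stand for. It is only an illustration: it uses java.io's DataOutputStream and DataInputStream instead of jute, and NodeRecord, toBytes, and fromBytes are hypothetical names, not ZooKeeper classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Simplified stand-in for archive-style serialization; NOT the jute API.
class ArchiveSketch {
    // Hypothetical record holding a node path and a zxid.
    static final class NodeRecord {
        final String path;
        final long zxid;

        NodeRecord(String path, long zxid) {
            this.path = path;
            this.zxid = zxid;
        }

        // Write the fields in a fixed order (the job an output archive does).
        void serialize(DataOutputStream out) throws IOException {
            out.writeUTF(path);
            out.writeLong(zxid);
        }

        // Read the fields back in the same order (the job an input archive does).
        static NodeRecord deserialize(DataInputStream in) throws IOException {
            String path = in.readUTF();
            long zxid = in.readLong();
            return new NodeRecord(path, zxid);
        }
    }

    static byte[] toBytes(NodeRecord record) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            record.serialize(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static NodeRecord fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return NodeRecord.deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        NodeRecord copy = fromBytes(toBytes(new NodeRecord("/app/config", 0x100000005L)));
        System.out.println(copy.path + " zxid=0x" + Long.toHexString(copy.zxid));
    }
}
```

The point of the sketch is that writer and reader must agree on the field order; the record itself decides that order, and the archive only supplies the primitive read and write operations.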

II. Server Initialization Source Code

zkServer.sh

The script shows that the class it launches is org.apache.zookeeper.server.quorum.QuorumPeerMain.

The configuration it loads is the zoo.cfg set up in zkEnv.sh.

QuorumPeerMain

  1. The main method creates a QuorumPeerMain object and calls initializeAndRun() to complete the initialization.

    protected void initializeAndRun(String[] args)
            throws ConfigException, IOException, AdminServerException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }
  2. The first step of initializeAndRun is to create a QuorumPeerConfig object and call parse to read the configuration. parse opens the file as a stream, parseProperties reads the individual settings, and setupQuorumPeerConfig finishes processing them.

    void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
        quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
        setupMyId(); // read myid and assign it to serverId
        setupClientPort(); // set the client port
        setupPeerType();
        checkValidity();
    }
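
As a rough illustration of this parsing step, the sketch below loads zoo.cfg-style text with java.util.Properties and counts the server.N entries. ConfigSketch, countServers, and isDistributed are hypothetical names, and the rule "more than one server entry means quorum mode" is a simplifying assumption of this sketch, not the exact check in QuorumPeerConfig.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: a tiny stand-in for the config parsing step.
class ConfigSketch {
    // Count the "server.N=host:port:port" entries in zoo.cfg-style text.
    static int countServers(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int servers = 0;
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                servers++;
            }
        }
        return servers;
    }

    // Assumption of this sketch: more than one server entry means quorum mode.
    static boolean isDistributed(String cfgText) {
        return countServers(cfgText) > 1;
    }

    public static void main(String[] args) {
        String cfg = "tickTime=2000\n"
                + "dataDir=/tmp/zookeeper\n"
                + "clientPort=2181\n"
                + "server.1=host1:2888:3888\n"
                + "server.2=host2:2888:3888\n"
                + "server.3=host3:2888:3888\n";
        System.out.println("servers=" + countServers(cfg)
                + " distributed=" + isDistributed(cfg));
    }
}
```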
  3. The second step creates a DatadirCleanupManager, which runs the task that deletes expired snapshots.

    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
    // config.getSnapRetainCount(): minimum number of snapshots to retain (3)
    // config.getPurgeInterval(): interval, in hours, at which old snapshots are purged

    DatadirCleanupManager

    public class DatadirCleanupManager {
        public void start() {
            if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
                LOG.warn("Purge task is already running.");
                return;
            }
            // Don't schedule the purge task with zero or negative purge interval.
            if (purgeInterval <= 0) {
                LOG.info("Purge task is not scheduled.");
                return;
            }

            timer = new Timer("PurgeTask", true);
            // Start a timer thread that runs the purge task
            TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
            timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

            purgeTaskStatus = PurgeTaskStatus.STARTED;
        }
    }
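
The retention rule behind the purge task can be sketched as a pure function: keep the newest snapRetainCount snapshots and delete the rest. PurgeSketch and snapshotsToDelete are hypothetical names, and snapshots are represented only by their zxid; the real task works on files and also has to decide which log files can go, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: which snapshots would a "retain the newest N" rule delete?
class PurgeSketch {
    static List<Long> snapshotsToDelete(List<Long> snapshotZxids, int retainCount) {
        List<Long> newestFirst = new ArrayList<>(snapshotZxids);
        Collections.sort(newestFirst, Collections.reverseOrder());
        if (newestFirst.size() <= retainCount) {
            return new ArrayList<>(); // nothing is old enough to delete
        }
        // Everything after the first retainCount entries is expired.
        return new ArrayList<>(newestFirst.subList(retainCount, newestFirst.size()));
    }

    public static void main(String[] args) {
        List<Long> snapshots = Arrays.asList(10L, 30L, 20L, 50L, 40L);
        System.out.println("delete: " + snapshotsToDelete(snapshots, 3));
    }
}
```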
  4. Next, runFromConfig(config) creates the connection factory and starts communication.

    if (config.getClientPortAddress() != null) {
        cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns(),
                false);
    }

    The connection factory defaults to NIO: NIOServerCnxnFactory.

    static public ServerCnxnFactory createFactory() throws IOException {
        String serverCnxnFactoryName =
                System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
        if (serverCnxnFactoryName == null) {
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                    .getDeclaredConstructor().newInstance();
            LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
            return serverCnxnFactory;
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + serverCnxnFactoryName);
            ioe.initCause(e);
            throw ioe;
        }
    }
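
The same "system property with a default, instantiated by reflection" pattern can be tried out in isolation. In the sketch below, ConnectionFactory, NioFactory, AltFactory, and the property name sketch.connectionFactory are all made up for the example; only the Class.forName(...).getDeclaredConstructor().newInstance() call chain mirrors the listing above.

```java
// Illustrative only: pick an implementation class by system property, defaulting to one.
class FactorySketch {
    interface ConnectionFactory {
        String name();
    }

    public static class NioFactory implements ConnectionFactory {
        public NioFactory() {}
        public String name() { return "nio"; }
    }

    public static class AltFactory implements ConnectionFactory {
        public AltFactory() {}
        public String name() { return "alt"; }
    }

    // Hypothetical property name used only by this sketch.
    static final String FACTORY_PROPERTY = "sketch.connectionFactory";

    static ConnectionFactory createFactory() {
        String className = System.getProperty(FACTORY_PROPERTY);
        if (className == null) {
            className = NioFactory.class.getName(); // default implementation
        }
        try {
            return (ConnectionFactory) Class.forName(className)
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("default: " + createFactory().name());
        System.setProperty(FACTORY_PROPERTY, AltFactory.class.getName());
        System.out.println("override: " + createFactory().name());
    }
}
```

The benefit of this design is that a different transport can be swapped in at launch time with a -D flag, without touching the code that calls the factory.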

    configure() initializes the NIO socket and binds the client port (2181 by default).

    Then the corresponding ZooKeeper peer is started:

    quorumPeer.initialize();
    quorumPeer.start();
    quorumPeer.join();

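III. Loading Data on the Server

  1. ZooKeeper's data model is a tree, the DataTree; each node in it is a DataNode.
  2. The DataTree instances across a ZooKeeper cluster are kept in sync at all times.
  3. Every server in a ZooKeeper cluster holds a complete copy of the data both in memory and on disk.
    • In memory: the DataTree
    • On disk: snapshot files plus the transaction log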

QuorumPeer.java

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection();
    super.start();
}

// Load the data (QuorumPeer)
private void loadDataBase() {
    try {
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch(IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}

// ZKDatabase
public long loadDataBase() throws IOException {
    // Load the data
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    return zxid;
}

// The restore method:
// deserializes the on-disk snapshot into memory,
// then replays the transaction log to recover the remaining changes
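
The idea of "snapshot first, then replay the log" can be sketched with a plain map standing in for the DataTree. Everything here is hypothetical (RestoreSketch, Txn, a null value meaning delete), and the sketch assumes the log entries arrive in zxid order; it shows the principle, not the real FileTxnSnapLog code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: rebuild in-memory state from a snapshot plus a transaction log.
class RestoreSketch {
    // Hypothetical log entry: set `path` to `data`, or delete it when data is null.
    static final class Txn {
        final long zxid;
        final String path;
        final String data;

        Txn(long zxid, String path, String data) {
            this.zxid = zxid;
            this.path = path;
            this.data = data;
        }
    }

    // `tree` already holds the snapshot contents as of snapshotZxid.
    // Returns the zxid of the last transaction applied.
    static long restore(Map<String, String> tree, long snapshotZxid, List<Txn> log) {
        long lastZxid = snapshotZxid;
        for (Txn txn : log) {
            if (txn.zxid <= snapshotZxid) {
                continue; // already reflected in the snapshot
            }
            if (txn.data == null) {
                tree.remove(txn.path);
            } else {
                tree.put(txn.path, txn.data);
            }
            lastZxid = txn.zxid;
        }
        return lastZxid;
    }

    public static void main(String[] args) {
        Map<String, String> tree = new HashMap<>();
        tree.put("/a", "1"); // snapshot taken at zxid 5
        List<Txn> log = new ArrayList<>();
        log.add(new Txn(4, "/a", "0"));   // older than the snapshot, skipped
        log.add(new Txn(6, "/b", "2"));   // applied
        log.add(new Txn(7, "/a", null));  // applied: delete /a
        long last = restore(tree, 5, log);
        System.out.println("lastZxid=" + last + " tree=" + tree);
    }
}
```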

IV. Leader Election

Flow

Election preparation

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection(); // start the election
    super.start();
}

startLeaderElection

synchronized public void startLeaderElection() {
    try {
        // Create the vote: myid, zxid (transaction id), epoch (term)
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // if (!getView().containsKey(myid)) {
    //     throw new RuntimeException("My id " + myid + " not in the peer list");
    //}
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(getQuorumAddress().getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}

createElectionAlgorithm

protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // Create the connection manager and its queues
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        // Start the listener thread; it blocks while waiting for incoming connections
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            // Create FastLeaderElection, with a send queue for outgoing votes
            // and a receive queue for incoming votes
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

Running the election

The super.start() call inside start() starts the thread, which then executes run().

public void run() {
    updateThreadName();

    ...

    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                LOG.info("LOOKING");

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk =
                            new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // Set the current vote
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            ...
            start_fle = Time.currentElapsedTime();
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        MBeanRegistry instance = MBeanRegistry.getInstance();
        instance.unregister(jmxQuorumBean);
        instance.unregister(jmxLocalPeerBean);

        for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
            instance.unregister(remotePeerBean);
        }

        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
        jmxRemotePeerBean = null;
    }
}

Inside lookForLeader

Step 1: update the proposal (the peer starts by proposing itself)

synchronized(this){
    logicalclock.incrementAndGet();
    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}

Step 2: send the votes with sendNotifications

private void sendNotifications() {
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                    Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
                    " (n.round), " + sid + " (recipient), " + self.getId() +
                    " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        // The message is handed to the send queue
        sendqueue.offer(notmsg);
    }
}

Step 3: the inner class WorkerSender sends the votes, processing messages in a loop

public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;

            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

void process(ToSend m) {
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
            m.leader,
            m.zxid,
            m.electionEpoch,
            m.peerEpoch,
            m.configData);
    // The actual sending is delegated to the manager
    manager.toSend(m.sid, requestBuffer);

}
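
The worker loop above is a standard consumer pattern: poll the queue with a timeout so that the stop flag is re-checked regularly. Below is a small self-contained sketch of that pattern; WorkerSenderSketch and drain are hypothetical names, messages are plain strings, and "sending" just records the message in a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: a consumer thread draining a queue with a timed poll.
class WorkerSenderSketch {
    static List<String> drain(List<String> messages) {
        LinkedBlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(messages.size());
        AtomicBoolean stop = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            while (!stop.get()) {
                try {
                    // Timed poll: wake up periodically to re-check the stop flag.
                    String m = sendqueue.poll(100, TimeUnit.MILLISECONDS);
                    if (m == null) continue;
                    sent.add(m); // stands in for process(m)
                    done.countDown();
                } catch (InterruptedException e) {
                    break;
                }
            }
        }, "WorkerSenderSketch");
        worker.setDaemon(true);
        worker.start();

        for (String m : messages) {
            sendqueue.offer(m); // producer side, like sendNotifications()
        }
        try {
            done.await(5, TimeUnit.SECONDS);
            stop.set(true);
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        System.out.println(drain(Arrays.asList("vote-1", "vote-2", "vote-3")));
    }
}
```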

Step 4: the actual sending is done by the manager, here QuorumCnxManager

public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     */
    if (this.mySid == sid) { // sending to myself: add to my own receive queue
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else { // sending to another peer
        /*
         * Start a new connection if doesn't have one already.
         */
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
        if (oldq != null) {
            addToSendQueue(oldq, b); // add to the existing send queue
        } else {
            addToSendQueue(bq, b);
        }
        connectOne(sid); // connect to the target peer

    }
}
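
The routing decision in toSend can be isolated into a few lines: messages addressed to the local server go straight to the receive queue, while everything else goes into a per-peer send queue that is created on first use. The sketch below is a simplified stand-in with hypothetical names; in particular, the queue capacity of 1 and the "drop the oldest message when full" behaviour are assumptions of the sketch.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: loopback vs. per-peer send queues.
class SendRoutingSketch {
    static final int SEND_CAPACITY = 1; // assumption of this sketch

    final long mySid;
    final BlockingQueue<ByteBuffer> recvQueue = new LinkedBlockingQueue<>();
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap =
            new ConcurrentHashMap<>();

    SendRoutingSketch(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, ByteBuffer b) {
        if (sid == mySid) {
            // Loopback: no network involved, deliver to our own receive queue.
            recvQueue.offer(b.duplicate());
            return;
        }
        // One bounded queue per remote peer, created on first use.
        ArrayBlockingQueue<ByteBuffer> q = queueSendMap.computeIfAbsent(
                sid, k -> new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
        // When the queue is full, drop the oldest entry so the newest vote wins.
        while (!q.offer(b)) {
            q.poll();
        }
    }

    public static void main(String[] args) {
        SendRoutingSketch manager = new SendRoutingSketch(1L);
        manager.toSend(1L, ByteBuffer.wrap(new byte[] {7}));
        manager.toSend(2L, ByteBuffer.wrap(new byte[] {1}));
        manager.toSend(2L, ByteBuffer.wrap(new byte[] {2}));
        System.out.println("loopback=" + manager.recvQueue.size()
                + " queuedForPeer2=" + manager.queueSendMap.get(2L).size());
    }
}
```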

Step 5: connect to the peer, creating the socket connection and the input and output streams

synchronized void connectOne(long sid){
    if (senderWorkerMap.get(sid) != null) {
        LOG.debug("There is a connection already for server " + sid);
        return;
    }
    synchronized (self.QV_LOCK) {
        boolean knownId = false;
        // Resolve hostname for the remote server before attempting to
        // connect in case the underlying ip address has changed.
        self.recreateSocketAddresses(sid);
        Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
        if (lastCommittedView.containsKey(sid)) {
            knownId = true;
            LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getId(), sid);
            if (connectOne(sid, lastCommittedView.get(sid).electionAddr))
                return;
        }
        if (lastSeenQV != null && lastProposedView.containsKey(sid)
                && (!knownId || (lastProposedView.get(sid).electionAddr !=
                lastCommittedView.get(sid).electionAddr))) {
            knownId = true;
            LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getId(), sid);
            if (connectOne(sid, lastProposedView.get(sid).electionAddr))
                return;
        }
        if (!knownId) {
            LOG.warn("Invalid server id: " + sid);
            return;
        }
    }
}

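Step 6: create a SendWorker and a RecvWorker. The SendWorker keeps writing outgoing messages, while the RecvWorker keeps receiving messages and adding them to the recvQueue.

Step 7: the WorkerReceiver in the election class keeps taking messages from recvQueue, processes them, and puts them on the election's own receive queue, recvqueue. The election algorithm then counts whether the votes for a candidate exceed half of the voters; if they do, that candidate becomes the Leader.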
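
The two checks at the heart of the counting step can be sketched as small pure functions. The comparison order (epoch first, then zxid, then server id) follows my understanding of FastLeaderElection, and the epoch/counter split assumes the usual layout of a zxid with the epoch in the high 32 bits; VoteSketch and its method names are hypothetical.

```java
// Illustrative only: comparing two votes and checking for a majority.
class VoteSketch {
    // A zxid packs the epoch into the high 32 bits and a counter into the low 32 bits.
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // True when the incoming vote should replace the current proposal:
    // higher epoch wins; on a tie, higher zxid wins; on a tie, higher server id wins.
    static boolean newVoteWins(long newId, long newZxid, long newEpoch,
                               long curId, long curZxid, long curEpoch) {
        return newEpoch > curEpoch
                || (newEpoch == curEpoch
                    && (newZxid > curZxid
                        || (newZxid == curZxid && newId > curId)));
    }

    // Simple majority: strictly more than half of the voters.
    static boolean hasMajority(int votes, int clusterSize) {
        return votes > clusterSize / 2;
    }

    public static void main(String[] args) {
        long zxid = 0x200000005L;
        System.out.println("epoch=" + epochOf(zxid) + " counter=" + counterOf(zxid));
        System.out.println("sid 3 beats sid 1 on a full tie: "
                + newVoteWins(3, 10, 1, 1, 10, 1));
        System.out.println("2 of 3 is a majority: " + hasMajority(2, 3));
    }
}
```

This also explains a familiar start-up behaviour: with empty data on every server, epoch and zxid tie, so the server id decides.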

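V. State Synchronization between Followers and the Leader

Once the election ends, every node must update its state according to its role. The elected Leader sets its state to Leader, and the other nodes set theirs to Follower.
Entry point for the Leader: leader.lead()
Entry point for a Follower: follower.followLeader()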
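Notes:

  1. The follower must let the leader know its state: epoch, zxid, sid.
    It must find out which node is the leader;
    open a connection to the leader and send its own information;
    the leader, on receiving that information, must reply with the corresponding information.
  2. Once the leader knows the follower's state, it decides which kind of data synchronization is needed: DIFF, TRUNC, or SNAP.
  3. The data synchronization is carried out.
  4. After the leader has received ACKs from more than half of the followers, it enters its normal working state and the cluster startup is complete.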

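To summarize, the synchronization modes are:

  1. DIFF: the two sides already have the same data, so nothing needs to be transferred.
  2. TRUNC: the follower's zxid is larger than the leader's, so the follower must roll back.
  3. COMMIT: the leader's zxid is larger than the follower's, so the leader sends the missing Proposals to the follower, which commits and applies them.
  4. SNAP: if the follower has no data at all, the whole data set is serialized and sent to the follower.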
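
The decision between these modes can be sketched as a comparison of the follower's last zxid against what the leader still has available. This is a deliberately simplified version under stated assumptions: SyncModeSketch and choose are hypothetical names, minCommittedLog and maxCommittedLog stand for the range of recent proposals the leader keeps in memory, and the "leader is ahead" case is treated as a DIFF that carries the missing proposals and commits. The real logic in LearnerHandler handles more cases.

```java
// Illustrative only: a simplified choice of synchronization mode.
class SyncModeSketch {
    enum Mode { DIFF, TRUNC, SNAP }

    static Mode choose(long peerLastZxid, long minCommittedLog,
                       long maxCommittedLog, long leaderLastZxid) {
        if (peerLastZxid == leaderLastZxid) {
            return Mode.DIFF;  // identical: an empty diff, nothing to transfer
        }
        if (peerLastZxid > maxCommittedLog) {
            return Mode.TRUNC; // follower is ahead: it must roll back
        }
        if (peerLastZxid >= minCommittedLog) {
            return Mode.DIFF;  // follower is behind but within range: send what is missing
        }
        return Mode.SNAP;      // too far behind (or empty): send a full snapshot
    }

    public static void main(String[] args) {
        System.out.println(choose(100, 50, 100, 100)); // same as leader
        System.out.println(choose(120, 50, 100, 100)); // ahead of leader
        System.out.println(choose(70, 50, 100, 100));  // slightly behind
        System.out.println(choose(0, 50, 100, 100));   // no data
    }
}
```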

After the state change

case FOLLOWING:
    try {
        LOG.info("FOLLOWING");
        setFollower(makeFollower(logFactory));
        follower.followLeader();
    } catch (Exception e) {
        LOG.warn("Unexpected exception",e);
    } finally {
        follower.shutdown();
        setFollower(null);
        updateServerState();
    }
    break;
case LEADING:
    LOG.info("LEADING");
    try {
        setLeader(makeLeader(logFactory));
        leader.lead();
        setLeader(null);
    } catch (Exception e) {
        LOG.warn("Unexpected exception",e);
    } finally {
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        updateServerState();
    }
    break;
}

A Leader calls leader.lead()

void lead() throws IOException, InterruptedException {
    ....
    // Create an acceptor that waits for each follower to connect and register
    cnxAcceptor = new LearnerCnxAcceptor();
    cnxAcceptor.start();
    ....
}

// LearnerCnxAcceptor.run(); exception handling is omitted in this excerpt
public void run() {
    while (!stop) {
        Socket s = null;
        boolean error = false;
        try {
            s = ss.accept();

            // start with the initLimit, once the ack is processed
            // in LearnerHandler switch to the syncLimit
            s.setSoTimeout(self.tickTime * self.initLimit);
            s.setTcpNoDelay(nodelay);

            BufferedInputStream is = new BufferedInputStream(
                    s.getInputStream());
            // Create a LearnerHandler for each follower
            LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
            // The handler receives the follower's message, sends the epoch back and
            // waits for the reply, picks the sync strategy via syncFollower, and
            // then takes part in the two-phase commit
            fh.start();
        }
    }
}

A Follower calls follower.followLeader()

void followLeader() throws InterruptedException {
    ...
    // Find the leader
    QuorumServer leaderServer = findLeader();
    // Connect to the leader
    connectToLeader(leaderServer.addr, leaderServer.hostname);
    // Register: send our own information to the leader and get its reply
    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
    // Synchronize data
    syncWithLeader(newEpochZxid);
    // Keep reading and handling packets from the leader (two-phase commit: proposal, then commit)
    while (this.isRunning()) {
        readPacket(qp);
        processPacket(qp);
    }

}

protected QuorumServer findLeader() {
    QuorumServer leaderServer = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader's correct IP address before
            // attempting to connect.
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
    if (leaderServer == null) {
        LOG.warn("Couldn't find the leader with id = "
                + current.getId());
    }
    return leaderServer;
}

Leader server startup

Once the cluster has synchronized, the server is started with startZkServer(),

which calls startup() in ZooKeeperServer.

public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();

    registerJMX();

    setState(State.RUNNING);
    notifyAll();
}

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    // The pre-processing thread waits for requests and handles each one according to its type
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}
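
setupRequestProcessors wires the processors into a chain in which each stage hands the request to the next one. The sketch below shows only that chaining, built back to front like the listing above. It is a synchronous toy with hypothetical names (ProcessorChainSketch, Processor, stage); the real processors are threads with their own queues.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a three-stage processor chain (prep -> sync -> final).
class ProcessorChainSketch {
    interface Processor {
        void process(String request);
    }

    // Build a stage that records its name, then forwards to the next stage if any.
    static Processor stage(String name, List<String> trace, Processor next) {
        return request -> {
            trace.add(name + ":" + request);
            if (next != null) {
                next.process(request);
            }
        };
    }

    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        // Constructed last-to-first, so each stage can be given its successor.
        Processor finalProcessor = stage("final", trace, null);
        Processor syncProcessor = stage("sync", trace, finalProcessor);
        Processor firstProcessor = stage("prep", trace, syncProcessor);
        firstProcessor.process(request);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create /a"));
    }
}
```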

Follower server startup

image-20220328233709486

Client startup

image-20220328233901984