使用ZooKeeper实现分布式队列、分布式锁和选举详解
java1234
共 36258字,需浏览 73分钟
·
2021-08-01 16:26
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
1、分布式队列
1)、offer方法
public class DistributedQueue {
public boolean offer(byte[] data) throws KeeperException, InterruptedException {
for (; ; ) {
try {
zookeeper.create(dir + "/" + prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
return true;
} catch (KeeperException.NoNodeException e) {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
}
}
}
2)、element方法
public class DistributedQueue {
public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {
Map<Long, String> orderedChildren;
while (true) {
try {
//获取所有排好序的子节点
orderedChildren = orderedChildren(null);
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException();
}
if (orderedChildren.size() == 0) {
throw new NoSuchElementException();
}
//返回队头节点的数据
for (String headNode : orderedChildren.values()) {
if (headNode != null) {
try {
return zookeeper.getData(dir + "/" + headNode, false, null);
} catch (KeeperException.NoNodeException e) {
//另一个客户端已经移除了队头节点,尝试获取下一个节点
}
}
}
}
}
private Map<Long, String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {
Map<Long, String> orderedChildren = new TreeMap<>();
List<String> childNames;
childNames = zookeeper.getChildren(dir, watcher);
for (String childName : childNames) {
try {
if (!childName.regionMatches(0, prefix, 0, prefix.length())) {
LOG.warn("Found child node with improper name: {}", childName);
continue;
}
String suffix = childName.substring(prefix.length());
Long childId = Long.parseLong(suffix);
orderedChildren.put(childId, childName);
} catch (NumberFormatException e) {
LOG.warn("Found child node with improper format : {}", childName, e);
}
}
return orderedChildren;
}
3)、remove方法
public class DistributedQueue {
public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
Map<Long, String> orderedChildren;
while (true) {
try {
//获取所有排好序的子节点
orderedChildren = orderedChildren(null);
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException();
}
if (orderedChildren.size() == 0) {
throw new NoSuchElementException();
}
//移除队头节点
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, false, null);
zookeeper.delete(path, -1);
return data;
} catch (KeeperException.NoNodeException e) {
//另一个客户端已经移除了队头节点,尝试移除下一个节点
}
}
}
}
2、分布式锁
1)、排他锁
2)、羊群效应
3)、共享锁
4)、排他锁源码解析
public class WriteLock extends ProtocolSupport {
public synchronized boolean lock() throws KeeperException, InterruptedException {
if (isClosed()) {
return false;
}
//确认持久父节点是否存在
ensurePathExists(dir);
//真正获取锁的逻辑 调用ProtocolSupport的retryOperation()方法
return (Boolean) retryOperation(zop);
}
class ProtocolSupport {
protected Object retryOperation(ZooKeeperOperation operation)
throws KeeperException, InterruptedException {
KeeperException exception = null;
for (int i = 0; i < RETRY_COUNT; i++) {
try {
//调用LockZooKeeperOperation的execute()方法
return operation.execute();
} catch (KeeperException.SessionExpiredException e) {
LOG.warn("Session expired {}. Reconnecting...", zookeeper, e);
throw e;
} catch (KeeperException.ConnectionLossException e) {
if (exception == null) {
exception = e;
}
LOG.debug("Attempt {} failed with connection loss. Reconnecting...", i);
retryDelay(i);
}
}
throw exception;
}
public class WriteLock extends ProtocolSupport {
private class LockZooKeeperOperation implements ZooKeeperOperation {
private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
throws KeeperException, InterruptedException {
List<String> names = zookeeper.getChildren(dir, false);
for (String name : names) {
if (name.startsWith(prefix)) {
id = name;
LOG.debug("Found id created last time: {}", id);
break;
}
}
if (id == null) {
id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL);
LOG.debug("Created id: {}", id);
}
}
@SuppressFBWarnings(
value = "NP_NULL_PARAM_DEREF_NONVIRTUAL",
justification = "findPrefixInChildren will assign a value to this.id")
public boolean execute() throws KeeperException, InterruptedException {
do {
if (id == null) {
long sessionId = zookeeper.getSessionId();
String prefix = "x-" + sessionId + "-";
//创建临时顺序节点
findPrefixInChildren(prefix, zookeeper, dir);
idName = new ZNodeName(id);
}
//获取所有子节点
List<String> names = zookeeper.getChildren(dir, false);
if (names.isEmpty()) {
LOG.warn("No children in: {} when we've just created one! Lets recreate it...", dir);
id = null;
} else {
//对所有子节点进行排序
SortedSet<ZNodeName> sortedNames = new TreeSet<>();
for (String name : names) {
sortedNames.add(new ZNodeName(dir + "/" + name));
}
ownerId = sortedNames.first().getName();
SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
//是否存在序号比自己小的节点
if (!lessThanMe.isEmpty()) {
ZNodeName lastChildName = lessThanMe.last();
lastChildId = lastChildName.getName();
LOG.debug("Watching less than me node: {}", lastChildId);
//有序号比自己小的节点,则调用exist()向前一个节点注册watcher
Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
if (stat != null) {
return Boolean.FALSE;
} else {
LOG.warn("Could not find the stats for less than me: {}", lastChildName.getName());
}
}
//没有序号比自己小的节点,则获取锁
else {
if (isOwner()) {
LockListener lockListener = getLockListener();
if (lockListener != null) {
lockListener.lockAcquired();
}
return Boolean.TRUE;
}
}
}
}
while (id == null);
return Boolean.FALSE;
}
public class WriteLock extends ProtocolSupport {
public synchronized void unlock() throws RuntimeException {
if (!isClosed() && id != null) {
try {
//删除当前节点,此时会触发后一个节点的watcher
ZooKeeperOperation zopdel = () -> {
zookeeper.delete(id, -1);
return Boolean.TRUE;
};
zopdel.execute();
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
Thread.currentThread().interrupt();
} catch (KeeperException.NoNodeException e) {
} catch (KeeperException e) {
LOG.warn("Unexpected exception", e);
throw new RuntimeException(e.getMessage(), e);
} finally {
LockListener lockListener = getLockListener();
if (lockListener != null) {
lockListener.lockReleased();
}
id = null;
}
}
}
3、选举
public class LeaderElectionSupport implements Watcher {
public synchronized void start() {
state = State.START;
dispatchEvent(EventType.START);
LOG.info("Starting leader election support");
if (zooKeeper == null) {
throw new IllegalStateException(
"No instance of zookeeper provided. Hint: use setZooKeeper()");
}
if (hostName == null) {
throw new IllegalStateException(
"No hostname provided. Hint: use setHostName()");
}
try {
//发起选举请求 创建临时顺序节点
makeOffer();
//选举请求是否被满足
determineElectionStatus();
} catch (KeeperException | InterruptedException e) {
becomeFailed(e);
}
}
private void makeOffer() throws KeeperException, InterruptedException {
state = State.OFFER;
dispatchEvent(EventType.OFFER_START);
LeaderOffer newLeaderOffer = new LeaderOffer();
byte[] hostnameBytes;
synchronized (this) {
newLeaderOffer.setHostName(hostName);
hostnameBytes = hostName.getBytes();
newLeaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",
hostnameBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL));
leaderOffer = newLeaderOffer;
}
LOG.debug("Created leader offer {}", leaderOffer);
dispatchEvent(EventType.OFFER_COMPLETE);
}
private void determineElectionStatus() throws KeeperException, InterruptedException {
state = State.DETERMINE;
dispatchEvent(EventType.DETERMINE_START);
LeaderOffer currentLeaderOffer = getLeaderOffer();
String[] components = currentLeaderOffer.getNodePath().split("/");
currentLeaderOffer.setId(Integer.valueOf(components[components.length - 1].substring("n_".length())));
//获取所有子节点并排序
List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(rootNodeName, false));
for (int i = 0; i < leaderOffers.size(); i++) {
LeaderOffer leaderOffer = leaderOffers.get(i);
if (leaderOffer.getId().equals(currentLeaderOffer.getId())) {
LOG.debug("There are {} leader offers. I am {} in line.", leaderOffers.size(), i);
dispatchEvent(EventType.DETERMINE_COMPLETE);
//如果当前节点是第一个,则成为Leader
if (i == 0) {
becomeLeader();
}
//如果有选举请求在当前节点前面,则进行等待,调用exist()向前一个节点注册watcher
else {
becomeReady(leaderOffers.get(i - 1));
}
break;
}
}
}
作者 | 邋遢的流浪剑客
来源 | csdn.net/qq_40378034/article/details/117014648
评论