zk源码—1.数据节点与Watcher机制及权限二
nanshan 2025-05-02 12:25 11 浏览 0 评论
大纲
1.ZooKeeper的数据模型、节点类型与应用
(1)数据模型之树形结构
(2)节点类型与特性(持久 + 临时 + 顺序 )
(3)节点的状态结构(各种zxid + 各种version)
(4)节点的版本(version + cversion + aversion)
(5)使用ZooKeeper实现锁(悲观锁 + 乐观锁)
2.发布订阅模式:用Watcher机制实现分布式通知
(1)Watcher机制是如何实现的
(2)Watcher机制的底层原理
(3)客户端Watcher注册实现过程
(4)服务端处理Watcher过程
(5)服务端Watch事件的触发过程
(6)客户端回调Watcher的处理过程
(7)利用Watcher实现发布订阅
(8)Watcher具有的特性
3.ACL权限控制:避免未经授权的访问
(1)ACL的使用(scheme:id:permission)
(2)实现自己的权限控制
(3)ACL内部实现原理之客户端处理过程
(4)ACL内部实现原理之服务端实现过程
(5)ACL权限总结
2.发布订阅模式:用Watcher机制实现分布式通知
(4)服务端处理Watcher过程
zk服务端处理Watcher事件基本有两个过程:
一.判断收到的请求是否需要注册Watcher事件
二.将对应的Watcher事件存储到WatchManager
以下是zk服务端处理Watcher的序列图:
zk服务端接收到客户端请求后的具体处理:
一.当服务端收到客户端标记了Watcher事件的getData请求时,会调用到FinalRequestProcessor的processRequest()方法,判断当前客户端请求是否需要注册Watcher事件。
二.当getDataRequest.getWatch()的值为true时,则表明当前客户端请求需要进行Watcher注册。
三.然后将当前的ServerCnxn对象(即Watcher事件)和数据节点路径,传入到zks.getZKDatabase()的getData()方法中来实现Watcher事件的注册,也就是实现存储Watcher事件到WatchManager中。具体就是:调用DataTree.dataWatches这个WatchManager的addWatch()方法,将该客户端请求的Watcher事件(也就是ServerCnxn对象)存储到DataTree.dataWatches这个WatchManager的两个HashMap(watchTable和watch2Paths)中。
补充说明:
首先,ServerCnxn对象代表了一个客户端和服务端的连接。ServerCnxn接口的默认实现是NIOServerCnxn,也可以选NettyServerCnxn。由于NIOServerCnxn和NettyServerCnxn都实现了Watcher的process接口,所以可以把ServerCnxn对象看作是一个Watcher对象。
然后,zk服务端的数据库DataTree中会有两个WatchManager,分别是dataWatches和childWatches,分别对应节点和子节点数据变更。
接着,WatchManager中有两个HashMap:watch2Paths和watchTable。当前的ServerCnxn对象和数据节点路径最终会被存储在这两HashMap中。watchTable可以根据数据节点路径来查找对应的Watcher,watch2Paths可以根据Watcher来查找对应的数据节点路径。
同时,WatchManager除了负责添加Watcher事件,还负责触发Watcher事件,以及移除那些已经被触发的Watcher事件。
public class FinalRequestProcessor implements RequestProcessor {
ZooKeeperServer zks;
...
public void processRequest(Request request) {
...
ServerCnxn cnxn = request.cnxn;
...
switch (request.type) {
...
case OpCode.getData: {
...
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
...
}
case OpCode.getChildren: {
...
List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);
rsp = new GetChildrenResponse(children);
...
}
}
...
}
}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private ZKDatabase zkDb;
private FileTxnSnapLog txnLogFactory = null;
...
public ZKDatabase getZKDatabase() {
return this.zkDb;
}
...
}
public class ZKDatabase {
protected DataTree dataTree;
protected FileTxnSnapLog snapLog;
...
public byte[] getData(String path, Stat stat, Watcher watcher) {
return dataTree.getData(path, stat, watcher);
}
public List<String> getChildren(String path, Stat stat, Watcher watcher) {
return dataTree.getChildren(path, stat, watcher);
}
...
}
public class DataTree {
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
...
public byte[] getData(String path, Stat stat, Watcher watcher) {
DataNode n = nodes.get(path);
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
public List<String> getChildren(String path, Stat stat, Watcher watcher) {
DataNode n = nodes.get(path);
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
childWatches.addWatch(path, watcher);
}
return new ArrayList<String>(n.getChildren());
}
}
...
}
class WatchManager {
private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();
private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();
...
synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);
HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
...
}
(5)服务端Watch事件的触发过程
对于标记了Watcher注册的请求,zk会将其对应的ServerCnxn对象(Watcher事件)存储到DataTree里的WatchManager的HashMap(watchTable和watch2Paths)中。之后,当服务端对指定节点进行数据更新后,会通过调用DataTree里的WatchManager的triggerWatch()方法来触发Watcher。
无论是触发DataTree的dataWatches,还是触发DataTree的childWatches,Watcher的触发逻辑都是一样的。
具体的Watcher触发逻辑如下:
步骤一:首先封装一个具有这三个属性的WatchedEvent对象:通知状态(KeeperState)、事件类型(EventType)、数据节点路径(path)。
步骤二:然后根据数据节点路径从DateTree的WatchManager中取出Watcher。如果为空,则说明没有任何客户端在该数据节点上注册过Watcher。如果存在,则将Watcher事件添加到自定义的Wathcers集合中,并且从DataTree的WatchManager的watchTable和watch2Paths中移除。最后调用Watcher的process()方法向客户端发送通知。
public class DataTree {
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
...
public Stat setData(String path, byte data[], int version, long zxid, long time) {
Stat s = new Stat();
DataNode n = nodes.get(path);
byte lastdata[] = null;
synchronized (n) {
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
...
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
...
}
class WatchManager {
...
Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}
...
}
具体的Watcher的process()方法,会由NIOServerCnxn来实现。Watcher的process()方法的具体逻辑如下:
步骤一:标记响应头ReplyHeader的xid为-1,表示当前响应是一个通知。
步骤二:将触发WatchManager.triggerWatch()方法时封装的WatchedEvent,包装成WatcherEvent,以便进行网络传输序列化。
步骤三:向客户端发送响应。
public interface Watcher {
abstract public void process(WatchedEvent event);
...
}
public abstract class ServerCnxn implements Stats, Watcher {
...
public abstract void process(WatchedEvent event);
}
public class NIOServerCnxn extends ServerCnxn {
...
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification");
}
}
(6)客户端回调Watcher的处理过程
对于来自服务端的响应:客户端会使用SendThread的readResponse()方法来进行统一处理。如果反序列化后得到的响应头replyHdr的xid为-1,则表明这是一个通知类型的响应。
SendThread接收事件通知的处理步骤如下:
步骤一:反序列化成WatcherEvent对象
zk客户端接收到请求后,首先将字节流反序列化成WatcherEvent对象。
步骤二:处理chrootPath
如果客户端设置了chrootPath为/app,而服务端响应的节点路径为/app/a,那么经过chrootPath处理后,就会统一变成一个相对路径:/a。
步骤三:还原成WatchedEvent对象
将WatcherEvent对象转换成WatchedEvent对象。
步骤四:回调Watcher
通过调用EventThread的queueEvent()方法,将WatchedEvent对象交给EventThread线程来回调Watcher。所以服务端的Watcher事件通知,最终会交给EventThread线程来处理。
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig;
...
}
public class ClientCnxn {
final SendThread sendThread;
final EventThread eventThread;
...
class SendThread extends ZooKeeperThread {
...
@Override
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
...
while (state.isAlive()) {
...
//将outgoingQueue里的请求发送出去 + 处理接收到的响应
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
...
}
}
void readResponse(ByteBuffer incomingBuffer) {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
...
//处理事务回调
if (replyHdr.getXid() == -1) {
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
if (chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else
...
}
WatchedEvent we = new WatchedEvent(event);
eventThread.queueEvent( we );
return;
}
...
finishPacket(packet);
}
...
}
}
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
...
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) {
...
doIO(pendingQueue, cnxn);
...
}
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) {
SocketChannel sock = (SocketChannel) sockKey.channel();
...
//处理接收响应
if (sockKey.isReadable()) {
...
sendThread.readResponse(incomingBuffer);
...
}
//处理发送请求
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
...
p.createBB();
sock.write(p.bb);
outgoingQueue.removeFirstOccurrence(p);
pendingQueue.add(p);
...
}
...
}
}
EventThread线程是zk客户端专门用来处理服务端事件通知的线程。
EventThread处理事件通知的步骤如下:
步骤一:EventThread的queueEvent()方法首先会根据WatchedEvent对象,从ZKWatchManager中取出所有注册过的客户端Watcher。
步骤二:然后从ZKWatchManager的管理中删除这些Watcher。这也说明客户端的Watcher机制是一次性的,触发后就会失效。
步骤三:接着将所有获取到的Watcher放入waitingEvents队列中。
步骤四:最后EventThread线程的run()方法,通过循环的方式,每次都会从waitingEvents队列中取出一个Watcher进行串行同步处理。也就是调用EventThread线程的processEvent()方法来最终执行实现了Watcher接口的process()方法,从而实现回调处理。
public class ClientCnxn {
final SendThread sendThread;
final EventThread eventThread;
private final ClientWatchManager watcher;
...
class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
//通过以下这两个变量来实现waitingEvents为空时,加入的Watcher要马上执行,而不用等待run()方法
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
...
public void queueEvent(WatchedEvent event) {
queueEvent(event, null);
}
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
final Set<Watcher> watchers;
if (materializedWatchers == null) {
//对WatchedEvent对象进行处理,从ZKWatchManager的管理中删除这些Watcher
watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
//将获取到的所有Watcher放入waitingEvents队列
waitingEvents.add(pair);
}
...
public void run() {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
}
private void processEvent(Object event) {
if (event instanceof WatcherSetEventPair) {
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
watcher.process(pair.event);
}
}
...
}
...
}
}
public class ZooKeeper implements AutoCloseable {
...
static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
...
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) {
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
...
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
...
}
return result;
}
final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}
...
}
...
}
总结zk的Watcher机制处理过程:
一.zk是通过在客户端和服务端创建观察者信息列表来实现Watcher机制的。
二.客户端调用getData()、getChildren()、exist()等方法时,会将Watcher事件放到本地的ZKWatchManager中进行管理。
三.服务端在接收到客户端的请求后首先判断是否需要注册Watcher,若是则将ServerCnxn对象当成Watcher事件放入DataTree的WatchManager中。
四.服务端触发Watcher事件时,会根据节点路径从WatchManager中取出对应的Watcher,然后发送通知类型的响应给客户端。
五.客户端在接收到通知类型的响应后,首先通过SendThread线程提取出WatchedEvent对象。然后将WatchedEvent对象交给EventThread线程来回调Watcher。也就是查询本地的ZKWatchManager获得对应的Watcher事件,删除ZKWatchManager的Watcher并将Watcher放入waitingEvents队列。后续EventThread线程便会在其run()方法中串行出队waitingEvents,执行Watcher的process()回调。
客户端的Watcher管理器是ZKWatchManager。
服务端的Watcher管理器是WatchManager。
以上的处理设计实现了一个分布式环境下的观察者模式,通过将客户端和服务端处理Watcher事件时所需要的信息分别保存在两端,减少了彼此通信的内容,大大提升了服务的处理性能。
(7)利用Watcher实现发布订阅
一.发布订阅系统一般有推模式和拉模式
推模式是指服务端主动将数据更新发送给所有订阅的客户端,拉模式是指客户端主动发起请求来获取最新数据(定时轮询拉取)。
二.zk采用了推拉相结合来实现发布和订阅功能
首先客户端需要向服务端注册自己关注的节点(添加Watcher事件)。一旦该节点发生变更,服务端就会向客户端发送Watcher事件通知。客户端接收到消息通知后,需要主动到服务端获取最新的数据。
如果将配置信息放到zk上进行集中管理,那么应用启动时需要主动到zk服务端获取配置信息,然后在指定节点上注册一个Watcher监听。接着只要配置信息发生变更,zk服务端就会实时通知所有订阅的应用。从而让应用能实时获取到所订阅的配置信息节点已发生变更了的消息。
注意:原生zk客户端可以通过getData()、exists()、getChildren()三个方法,向zk服务端注册Watcher监听。而且注册到Watcher监听具有一次性,所以zk客户端获得服务端的节点变更通知后需要再次注册Watcher。
三.使用zk来实现发布订阅功能总结
步骤一:将配置信息存储到zk的节点上。
步骤二:应用启动时先从zk节点上获取配置信息,然后再向该zk节点注册一个数据变更的Watcher监听。一旦该zk节点数据发生变更,所有订阅的客户端就能收到数据变更通知。
步骤三:应用收到zk服务端发过来的数据变更通知后重新获取最新数据。
(8)Watcher具有的特性
一.一次性
无论是客户端还是服务端,一旦Watcher被触发或者回调,zk都会将其移除,所以使用zk的Watcher时需要反复注册。这样的设计能够有效减轻服务端的压力。否则,如果一个Watcher注册后一直有效,那么频繁更新的节点就会频繁发送通知给客户端,这样就会影响网络性能和服务端性能。
二.客户端串行执行
客户端Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序。注意不要因一个Watcher的处理逻辑而影响整个客户端的Watcher回调。
三.轻量
WatchedEvent是Watcher机制的最小通知单元,WatchedEvent只包含三部分内容:通知状态、事件类型和节点路径。所以Watcher通知非常简单,只告诉客户端发生的事件,不包含具体内容。所以原始数据和变更后的数据无法从WatchedEvent中获取,需要客户端主动重新去获取数据。
客户端向服务端注册Watcher时:不会把客户端真实的Watcher对象传递到服务端,只会在客户端请求中使用boolean属性来标记Watcher对象,服务端也只会保存当前连接的ServerCnxn对象。
这种轻量的Watcher设计机制,在网络开销和服务端内存开销上都是很低的。
(9)总结
有一个问题:当服务端某一节点发生数据变更操作时,所有曾经设置了该节点监控事件的客户端都会收到服务器的通知吗?
答案是否定的,通过对zk内部实现机制的解析可以知道:Watcher事件的触发机制取决于会话的连接状态和客户端注册事件的类型,当客户端会话状态或数据节点发生改变时,都会触发对应的Watcher事件。Watcher具有一次性,曾经的监控要重新监控。
3.ACL权限控制:避免未经授权的访问
(1)ACL的使用(scheme:id:permission)
(2)实现自己的权限控制
(3)ACL内部实现原理之客户端处理过程
(4)ACL内部实现原理之服务端实现过程
(5)ACL权限总结
前面介绍完了数据模型、Watcher监控机制,并实现了在分布式环境中经常用到的分布式锁、配置管理等功能,这些功能的本质都在于操作数据节点。如果作为分布式锁或配置项的数据节点被错误删除或修改,那么对整个分布式系统有很大的影响,甚至会造成严重的生产事故。所以zk提供了一个很好的解决方案,那就是ACL权限控制。
(1)ACL的使用(scheme:id:permission)
如何使用zk的ACL机制来实现客户端对数据节点的访问控制。一个ACL权限设置通常可以分为3部分,分别是:权限模式(Scheme)、授权对象(ID)、权限信息(Permission),最终组成的一条ACL请求信息格式为"scheme:id:permission"。
一.权限模式:Scheme
权限模式就是用来设置zk服务器进行权限验证的方式。zk的权限验证方式大体分为两种类型:一种是范围验证,另外一种是口令验证。但具体来分,则有4种权限模式:
模式一:所谓的范围验证就是zk可针对一个IP或一段IP授予某种权限
比如通过"ip:192.168.0.11"让某机器对服务器的一数据节点具有写入权限,或者也可以通过"ip:192.168.0.11/22"给一段IP地址的机器赋权。
模式二:另一种权限模式就是口令验证,也可以理解为用户名密码的方式
在zk中这种验证方式是Digest认证。即在向客户端传送"username:password"这种形式的权限表示符时,服务端会对密码部分使用SHA-1和BASE64算法进行加密以保证安全。
模式三:还有一种权限模式Super可以认为是一种特殊的Digest认证
具有Super权限的客户端可以对zk上的任意数据节点进行任意操作。
模式四:最后一种授权模式是world模式
其实这种授权模式对应于系统中的所有用户,本质上起不到任何作用,设置了world权限模式系统中的所有用户操作都可以不进行权限验证。
下面代码给出了Digest模式下客户端的调用方式:
create /digest_node1
setAcl /digest_node1 digest:用户名:base64格式密码:rwadc
getAcl /digest_node1
addauth digest user:passwd
二.授权对象:ID
所谓的授权对象就是要把权限赋予谁,对应于4种不同的权限模式来说:
如果使用IP方式,那么授权对象可以是一个IP地址或IP地址段
如果使用Digest或Super方式,那么授权对象对应于一个用户名
如果使用World模式,那么就是授权系统中所有的用户
三.权限信息:Permission
权限就是指可以在数据节点上执行的操作种类,zk定义好的权限有5种:
数据节点(Create)创建权限,可以在该数据节点下创建子节点
数据节点(Wirte)更新权限,可以更新该数据节点
数据节点(Read)读取权限,可以读取该数据节点内容以及子节点信息
数据节点(Delete)删除权限,可以删除该数据节点的子节点
数据节点(Admin)管理者权限,可以对该数据节点体进行ACL权限设置
需要注意的是:每个节点都有维护自身的ACL权限数据,即使是该节点的子节点也有自己的ACL权限而不是直接继承其父节点权限。所以如果客户端只配置"/Config"节点的读取权限,该客户端是没有其子节点的"/Config/dataBase"的读取权限的。
(2)实现自己的权限控制
虽然zk自身的权限控制机制已经做得很细,但是zk还提供了一种权限扩展机制来让用户实现自己的权限控制方式。官方文档对这种机制的定义是"Pluggable ZooKeeper Authenication",意思是可插拔的授权机制,从名称上可看出它的灵活性。
那么这种机制是如何实现的呢?首先,要想实现自定义的权限控制机制,最核心的一点是实现zk提供的权限控制器接口AuthenticationProvider。然后,实现了自定义权限后,如何才能让服务端使用自定义的权限验证方式呢?接下来就需要将自定义的权限控制注册到服务端,而注册的方式有两种:第一种是通过设置系统属性来注册自定义的权限控制器,第二种是在配置文件zoo.cfg中进行配置。
//第一种注册方式
-Dzookeeper.authProvider.x=CustomAuthenticationProvider
//第二种方式
authProvider.x=CustomAuthenticationProvider
(3)ACL内部实现原理之客户端处理过程
下面深入到底层介绍zk是如何实现ACL权限控制机制的,先看一下客户端是如何操作的,以节点授权addAuth接口为例。
步骤一:客户端会通过ClientCnxn的addAuthInfo()方法,向服务端发送请求。
步骤二:addAuthInfo()方法会将scheme和auth封装成AuthPacket对象,并封装一个表示权限操作请求的RequestHeader对象。
步骤三:接着AuthPacket对象和RequestHeader对象会被封装到Packet对象中,最后会将该Packet对象添加到outgoingQueue队列,发送给服务端。
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
protected final HostProvider hostProvider;
protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig;
...
public void addAuthInfo(String scheme, byte auth[]) {
cnxn.addAuthInfo(scheme, auth);
}
...
}
public class ClientCnxn {
private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();
volatile States state = States.NOT_CONNECTED;
...
public void addAuthInfo(String scheme, byte auth[]) {
if (!state.isAlive()) {
return;
}
authInfo.add(new AuthData(scheme, auth));
queuePacket(
new RequestHeader(-4, OpCode.auth), null,
new AuthPacket(0, scheme, auth),
null, null, null, null, null, null
);
}
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
Packet packet = null;
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
...
}
ACL权限控制机制的客户端实现相对简单,只是封装请求类型为权限请求,方便服务器识别处理,而发送到服务器的信息包括前面提到的权限校验信息。
(4)ACL内部实现原理之服务端实现过程
相比于客户端的处理过程,服务器端对ACL内部实现就比较复杂。
一.当客户端发出的节点ACL授权请求到达服务端后
步骤一:首先调用NIOServerCnxn.readRequest()方法作为服务端处理的入口,而readRequest()方法其内部只是调用processPacket()方法。
public class NIOServerCnxn extends ServerCnxn {
private final ZooKeeperServer zkServer;
private ByteBuffer incomingBuffer = lenBuffer;
...
private void readRequest() throws IOException {
zkServer.processPacket(this, incomingBuffer);
}
...
}
步骤二:然后在ZooKeeperServer的processPacket()方法的内部,首先反序列化客户端的请求信息并封装到AuthPacket对象中,之后通过ProviderRegistry的getProvider()方法根据scheme判断具体实现类。
以Digest模式为例,该实现类是
DigestAuthenticationProvider,此时就会调用handleAuthentication方法进行权限验证。如果返回KeeperException.Code.OK则表示该请求已经通过了权限验证,如果返回的状态是其他或者抛出异常则表示权限验证失败。
所以权限认证的最终实现方法是handleAuthentication()方法,该方法的工作是解析客户端传递的权限验证类型,并通过addAuthInfo()方法将权限信息添加到authInfo集合中。
其中addAuthInfo()方法的作用是将解析到的权限信息存储到zk服务端的内存中,这些权限信息在整个会话存活期间会一直保存在zk服务端。如果会话关闭,那么权限信息就会被删除,这个特性类似于数据节点中的临时节点。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
...
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if (ap != null) {
try {
authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
} catch(RuntimeException e) {
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
LOG.info("auth success " + cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
...
}
return;
} else {
...
}
cnxn.incrOutstandingRequests(h);
}
...
}
public class DigestAuthenticationProvider implements AuthenticationProvider {
...
public KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData) {
String id = new String(authData);
try {
String digest = generateDigest(id);
if (digest.equals(superDigest)) {
cnxn.addAuthInfo(new Id("super", ""));
}
cnxn.addAuthInfo(new Id(getScheme(), digest));
return KeeperException.Code.OK;
} catch (NoSuchAlgorithmException e) {
LOG.error("Missing algorithm",e);
}
return KeeperException.Code.AUTHFAILED;
}
...
}
public abstract class ServerCnxn implements Stats, Watcher {
protected ArrayList<Id> authInfo = new ArrayList<Id>();
...
public void addAuthInfo(Id id) {
//将权限信息添加到authInfo集合
if (authInfo.contains(id) == false) {
authInfo.add(id);
}
}
}
二.当服务端已将客户端ACL授权请求解析并将对应的会话权限信息存储好后
服务端处理一次请求时,是如何进行权限验证的?
首先通过PrepRequestProcessor中的checkAcl()方法检查对应的请求权限。如果该节点没有任何权限设置则直接返回,如果该节点有权限设置则循环遍历节点的权限信息进行检查,如果具有相应的权限则直接返回表明权限认证成功,否则直接抛出NoAuthException异常表明权限认证失败。
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
...
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {
...
case OpCode.delete:
...
checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo);
...
case OpCode.setData:
...
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
...
...
}
static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm, List<Id> ids) {
...
for (ACL a : acl) {
if (authId.getScheme().equals(id.getScheme()) && ap.matches(authId.getId(), id.getId())) {
return;
}
}
throw new KeeperException.NoAuthException();
}
...
}
(5)ACL权限总结
客户端发送ACL权限请求时的处理:首先会封装请求类型,然后将权限信息封装到request中,最后发送request给服务端。
服务器对ACL权限请求的授权处理:首先分析请求类型是否是权限相关操作,然后根据不同的权限模式调用不同的实现类验证权限,最后存储权限信息。
注意:会话的授权信息存储在服务端内存。如果客户端会话关闭,授权信息会被删除。下次连接服务器后,需要重新调用授权接口进行授权;
zk作为分布式系统协调框架,往往在一个分布式系统下起到关键的作用。尤其是在分布式锁、配置管理等应用场景中。如果因错误操作对重要数据节点进行变更或删除,对整个系统影响很大,甚至可能会导致整个分布式服务不可用,所以设计使用zk时一定要考虑对关键节点添加权限控制。
问题:如果一个客户端对服务器上的一个节点设置了只有它自己才能操作的权限,那么等该客户端下线后,对其创建的节点要想进行修改应该怎么做?
可以通过"super模式"删除该节点或变更该节点的权限验证方式,正因为"super模式"有如此大的权限,在平时使用时应更加谨慎。
相关推荐
- F5负载均衡器如何通过irules实现应用的灵活转发?
-
F5是非常强大的商业负载均衡器。除了处理性能强劲,以及高稳定性之外,F5还可以通过irules编写强大灵活的转发规则,实现web业务的灵活应用。irules是基于TCL语法的,每个iRules必须包含...
- 映射域名到NAS
-
前面介绍已经将域名映射到家庭路由器上,现在只需要在路由器上设置一下端口转发即可。假设NAS在内网的IP是192.168.1.100,NAS管理端口2000.你的域名是www.xxx.com,配置外部端...
- 转发(Forward)和重定向(Redirect)的区别
-
转发是服务器行为,重定向是客户端行为。转发(Forward)通过RequestDispatcher对象的forward(HttpServletRequestrequest,HttpServletRe...
- SpringBoot应用中使用拦截器实现路由转发
-
1、背景项目中有一个SpringBoot开发的微服务,经过业务多年的演进,代码已经累积到令人恐怖的规模,亟需重构,将之拆解成多个微服务。该微服务的接口庞大,调用关系非常复杂,且实施重构的人员大部分不是...
- 公司想搭建个网站,网站如何进行域名解析?
-
域名解析是将域名指向网站空间IP,让人们通过注册的域名可以方便地访问到网站的一种服务。IP地址是网络上标识站点的数字地址,为方便记忆,采用域名来代替IP地址标识站点地址。域名解析就是域名到IP地址的转...
- 域名和IP地址什么关系?如何通过域名解析IP?
-
一般情况下,访客通过域名和IP地址都能访问到网站,那么两者之间有什么关系吗?本文中科三方针对域名和IP地址的关系和区别,以及如何实现域名与IP的绑定做下介绍。域名与IP地址之间的关系IP地址是计算机的...
- 分享网站域名301重定向的知识
-
网站域名做301重定向操作时,一般需要由专业的技术来协助完成,如果用户自己在维护,可以按照相应的说明进行操作。好了,下面说说重点,域名301重定向的操作步骤。首先,根据HTTP协议,在客户端向服务器发...
- NAS外网到底安全吗?一文看懂HTTP/HTTPS和SSL证书
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:可爱的小cherry搭好了NAS,但是不懂做好网络加密,那么隐私泄露也会随时发生!大家好,这里是Cherry,喜爱折腾、玩数码,热衷于分享数...
- ForwardEmail免费、开源、加密的邮件转发服务
-
ForwardEmail是一款免费、加密和开源的邮件转发服务,设置简单只需4步即可正常使用,通过测试来看也要比ImprovMX好得多,转发近乎秒到且未进入垃圾箱(仅以Mailbox.org发送、Out...
- 使用CloudFlare进行域名重定向
-
当网站变更域名的时候,经常会使用域名重定向的方式,将老域名指向到新域名,这通常叫做:URL转发(URLFORWARDING),善于使用URL转发,对SEO来说非常有用,因为用这种方式能明确告知搜索引...
- 要将端口5002和5003通过Nginx代理到一个域名上的操作笔记
-
要将端口5002和5003通过Nginx代理到域名www.4rvi.cn的不同路径下,请按照以下步骤配置Nginx:步骤说明创建或编辑Nginx配置文件通常配置文件位于/etc/nginx/sites...
- SEO浅谈:网站域名重定向的三种方式
-
在大多数情况下,我们输入网站访问网站的时候,很难发现www.***.com和***.com的区别,因为一般的网站主,都会把这两个域名指向到同一网站。但是对于网站运营和优化来说,www.***.com和...
- 花生壳出现诊断域名与转发服务器ip不一致的解决办法
-
出现诊断域名与转发服务器ip不一致您可以:1、更改客户端所处主机的drs为223.5.5.5备用dns为119.29.29.29;2、在windows上进入命令提示符输入ipconfig/flush...
- 涨知识了!带你认识什么是域名
-
1、什么是域名从技术角度来看,域名是在Internet上解决IP地址对应的一种方法。一个完整的域名由两个或两个以上部分组成,各部分之间用英文的句号“.”来分隔。如“abc.com”。其中“com”称...
- 域名被跳转到其他网站是怎么回事
-
当你输入域名时被跳转到另一个网站,这可能是由几种原因造成的:一、域名可能配置了域名转发服务。无论何时有人访问域名,比如.com、.top等,都会自动重定向到另一个指定的URL,这通常是在域名注册商设...
你 发表评论:
欢迎- 一周热门
-
-
爱折腾的特斯拉车主必看!手把手教你TESLAMATE的备份和恢复
-
如何在安装前及安装后修改黑群晖的Mac地址和Sn系列号
-
[常用工具] OpenCV_contrib库在windows下编译使用指南
-
WindowsServer2022|配置NTP服务器的命令
-
Ubuntu系统Daphne + Nginx + supervisor部署Django项目
-
WIN11 安装配置 linux 子系统 Ubuntu 图形界面 桌面系统
-
解决Linux终端中“-bash: nano: command not found”问题
-
NBA 2K25虚拟内存不足/爆内存/内存占用100% 一文速解
-
Linux 中的文件描述符是什么?(linux 打开文件表 文件描述符)
-
K3s禁用Service Load Balancer,解决获取浏览器IP不正确问题
-
- 最近发表
- 标签列表
-
- linux 查询端口号 (58)
- docker映射容器目录到宿主机 (66)
- 杀端口 (60)
- yum更换阿里源 (62)
- internet explorer 增强的安全配置已启用 (65)
- linux自动挂载 (56)
- 禁用selinux (55)
- sysv-rc-conf (69)
- ubuntu防火墙状态查看 (64)
- windows server 2022激活密钥 (56)
- 无法与服务器建立安全连接是什么意思 (74)
- 443/80端口被占用怎么解决 (56)
- ping无法访问目标主机怎么解决 (58)
- fdatasync (59)
- 405 not allowed (56)
- 免备案虚拟主机zxhost (55)
- linux根据pid查看进程 (60)
- dhcp工具 (62)
- mysql 1045 (57)
- 宝塔远程工具 (56)
- ssh服务器拒绝了密码 请再试一次 (56)
- ubuntu卸载docker (56)
- linux查看nginx状态 (63)
- tomcat 乱码 (76)
- 2008r2激活序列号 (65)