13 Enabling the server's network listener: NIOServerCnxnFactory
13.1 Introduction
Going back to the start() method of QuorumPeer: after data recovery completes, network interaction is started:
startServerCnxnFactory();
Read on:
private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start();
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}
In the runFromConfig method of QuorumPeerMain, ServerCnxnFactory.createFactory() is called to create the connection factory. createFactory() checks whether the JVM system property zookeeper.serverCnxnFactory is set; if it is not, NIOServerCnxnFactory is used by default.
The factory creation code, called from QuorumPeerMain.runFromConfig():
if (config.getClientPortAddress() != null) {
    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
if (config.getSecureClientPortAddress() != null) {
    secureCnxnFactory = ServerCnxnFactory.createFactory();
    secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}
ServerCnxnFactory.createFactory() instantiates the factory according to the configured type:
public static ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
            .getDeclaredConstructor()
            .newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
        throw ioe;
    }
}
So the factory type used at startup is determined by the system property zookeeper.serverCnxnFactory. If it is not set, NIOServerCnxnFactory, built on the NIO facilities that ship with the JDK, is used by default; if it names another implementation such as NettyServerCnxnFactory (built on Netty), that class is loaded during the initialization above. Either way, the factory listens on the clientPort we configured and serves client requests.
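For instance, a minimal sketch of switching the implementation through that property (the two class names are the real ZooKeeper implementations; setting the property programmatically right before createFactory() is only for illustration, in practice it is passed as a -D JVM flag):

```java
import org.apache.zookeeper.server.ServerCnxnFactory;

public class FactorySelectionDemo {
    public static void main(String[] args) throws Exception {
        // Equivalent to starting the server with
        //   -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
        System.setProperty("zookeeper.serverCnxnFactory",
                "org.apache.zookeeper.server.NettyServerCnxnFactory");
        ServerCnxnFactory factory = ServerCnxnFactory.createFactory();
        System.out.println(factory.getClass().getName());
        // Prints org.apache.zookeeper.server.NettyServerCnxnFactory;
        // without the property it would print ...NIOServerCnxnFactory
    }
}
```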
Before diving into the source, here is how this communication layer is usually described. As a server, ZooKeeper naturally needs to communicate with clients over the network. How does it do so efficiently? ZooKeeper uses ServerCnxnFactory to manage client connections, and it has two implementations:
- NIOServerCnxnFactory, implemented with Java's native NIO;
- NettyServerCnxnFactory, implemented with Netty.
A ServerCnxn represents the connection between a client and the server.
ServerCnxnFactory
Note: the connections discussed below (and in the code comments) are TCP connections initiated by clients, i.e., SocketChannel instances.
The implementation class of ServerCnxnFactory can be chosen with the system property zookeeper.serverCnxnFactory; NIOServerCnxnFactory is used by default.
NIOServerCnxnFactory
13.2 Main-sub Reactor network IO model
The usual way to structure Java NIO code is to have one thread group listen for OP_ACCEPT events and handle client connections, and another thread group listen for the clients' OP_READ and OP_WRITE events and handle the IO (this is how Netty is implemented).
ZooKeeper, however, does not divide thread responsibilities that way. NIOServerCnxnFactory starts four kinds of threads:
- accept thread: accepts client connections and assigns them to selector threads (one such thread is started)
- selector thread: executes select(). Because select() becomes a performance bottleneck under a large number of connections, several selector threads are started; their count is configured with the system property zookeeper.nio.numSelectorThreads and defaults to sqrt(numCores / 2), with a minimum of 1 (see the sketch after this list)
- worker thread: performs the actual socket reads/writes; the count is configured with the system property zookeeper.nio.numWorkerThreads and defaults to numCores * 2. If it is 0, a separate thread is started per IO task; see the worker thread discussion below
- connection expirer thread: closes a connection once the session on it has expired
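A quick way to see what those defaults work out to; this is a minimal sketch that simply mirrors the expressions used in configure() below, not ZooKeeper code:

```java
public class NioThreadDefaults {
    public static void main(String[] args) {
        int numCores = Runtime.getRuntime().availableProcessors();
        // Same expressions as NIOServerCnxnFactory.configure();
        // e.g. on a 32-core host: sqrt(16) = 4 selector threads, 64 workers
        int selectorThreads = Math.max((int) Math.sqrt((float) numCores / 2), 1);
        int workerThreads = 2 * numCores;
        System.out.println("selector threads: " + selectorThreads
                + ", worker threads: " + workerThreads);
    }
}
```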
As you can see, ZooKeeper splits the threads' work at a finer granularity. The reasoning is that with a large number of client connections, selector.select() becomes the performance bottleneck, so the select() work is split out and handled by dedicated selector threads.
Inter-thread communication
These threads communicate with each other through synchronized queues. In this section we look at which queues are used for the various thread interactions, and what each is for.
- SelectorThread.acceptedQueue

acceptedQueue is a LinkedBlockingQueue held by each selector thread. It contains the client connections handed over by the accept thread; the selector thread registers them with its selector and listens for their OP_READ and OP_WRITE events.

- SelectorThread.updateQueue

Like acceptedQueue, updateQueue is a LinkedBlockingQueue in the selector thread. Understanding its role, however, requires a good grasp of how Java NIO is implemented.

Java NIO (backed by the epoll system call on Linux) is level-triggered: if selector.select() finds that a socketChannel has an event, say readable data, then as long as that data has not been read from the socketChannel, the next selector.select() will report the event again, until the data is finally consumed.

ZooKeeper treats selector.select() as the performance bottleneck. To improve select()'s performance and sidestep this level-triggered behavior, ZooKeeper stops the socketChannel from listening for OP_READ and OP_WRITE while its IO is being processed, which lightens the load on selector.select(). (A small demo of the level-triggered behavior follows this list.)

At this point a problem arises: once the IO has been processed, how does the socketChannel start listening for OP_READ and OP_WRITE again?

You might think this is easy: after the worker thread finishes the IO, it just calls key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE), right? It is not that simple, because selector.select() executes on the selector thread; if a worker thread calls key.interestOps(...) while select() is running, the call may block against select(). In pursuit of maximum performance, ZooKeeper is designed so that only the selector thread calls key.interestOps(...). So, after finishing the IO, the worker thread must tell the selector thread that the socketChannel may listen for OP_READ and OP_WRITE again.

updateQueue stores the SelectionKeys whose OP_READ and OP_WRITE interest needs to be restored.

- NIOServerCnxn.outgoingBuffers

outgoingBuffers stores the response data waiting to be sent to the client.
Note: my guess is that since key.interestOps(OP_READ | OP_WRITE) can block selector.select(), accepted.register(selector, SelectionKey.OP_READ) would block selector.select() as well, so registering newly accepted client connections with the selector must also happen on the selector thread. That is why acceptedQueue exists. (A sketch of this queue-and-wakeup pattern follows the demo below.)
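First, a self-contained demo of the level-triggered behavior described above; this is my own sketch, not ZooKeeper code. As long as bytes remain unread in the channel, every call to select() reports the key as readable again:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;

public class LevelTriggerDemo {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));

        // Client side: connect and send four bytes, then leave the socket open
        SocketChannel client = SocketChannel.open(server.getLocalAddress());
        client.write(ByteBuffer.wrap(new byte[]{1, 2, 3, 4}));

        SocketChannel accepted = server.accept();
        accepted.configureBlocking(false);
        Selector selector = Selector.open();
        accepted.register(selector, SelectionKey.OP_READ);

        ByteBuffer oneByte = ByteBuffer.allocate(1);
        for (int i = 1; i <= 4; i++) {
            selector.select();               // returns immediately every time:
            selector.selectedKeys().clear(); // unread data keeps the key readable
            oneByte.clear();
            accepted.read(oneByte);          // drain just one byte per iteration
            System.out.println("select() #" + i + " reported readable");
        }
        client.close();
        accepted.close();
        selector.close();
        server.close();
    }
}
```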
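And here is the queue-and-wakeup pattern itself, condensed into a sketch (again my own simplification; the real SelectorThread code appears in 13.6). Worker threads never touch the selector: they enqueue the key and wake up select(), and the selector thread applies the interest-ops change between select() calls:

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class SelectorLoop implements Runnable {
    private final Selector selector;
    private final Queue<SelectionKey> updateQueue = new LinkedBlockingQueue<>();

    public SelectorLoop(Selector selector) {
        this.selector = selector;
    }

    /** Called from worker threads once their IO is done. */
    public boolean addInterestOpsUpdateRequest(SelectionKey sk) {
        if (!updateQueue.offer(sk)) {
            return false;
        }
        selector.wakeup(); // unblock select() so the update is applied promptly
        return true;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                selector.select();
                // ... dispatch ready keys to worker threads here, calling
                // key.interestOps(0) before each hand-off ...
                SelectionKey key;
                while ((key = updateQueue.poll()) != null) {
                    if (key.isValid()) {
                        // Only this thread ever touches interestOps
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    }
                }
            } catch (IOException e) {
                return; // selector broken; give up in this sketch
            }
        }
    }
}
```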
After understanding the thread IO model, let's take a look at the startup source code:
13.3 Initialization: the configure method of NIOServerCnxnFactory
NIOServerCnxnFactory's configure method is called while the configuration is being loaded, before the ZooKeeper server starts:
@Override
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    initMaxCnxns();
    sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    String logMsg = "Configuring NIO connection handler with "
        + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
        + numSelectorThreads + " selector thread(s), "
        + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
        + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
    LOG.info(logMsg);

    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    listenBacklog = backlog;
    // Create the socket object and get the file descriptor
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port {}", addr);
    if (listenBacklog == -1) {
        ss.socket().bind(addr);
    } else {
        ss.socket().bind(addr, listenBacklog);
    }
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

13.4 Startup method of NIOServerCnxnFactory

@Override
public void start() {
    stopped = false;
    // Start the worker thread pool
    if (workerPool == null) {
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    // Start the selector threads
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // Start the accept thread
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    // Start the connection expirer thread
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}
13.5 AcceptThread
13.5.1 AcceptThread source code
The source of the accept thread, first as a global view:
private class AcceptThread extends AbstractSelectThread {

    private final ServerSocketChannel acceptSocket;
    private final SelectionKey acceptKey;
    private final RateLogger acceptErrorLogger = new RateLogger(LOG);
    private final Collection<SelectorThread> selectorThreads;
    private Iterator<SelectorThread> selectorIterator;
    private volatile boolean reconfiguring = false;

    public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
        super("NIOServerCxnFactory.AcceptThread:" + addr);
        this.acceptSocket = ss;
        // Register the accept event with the selector
        this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
        this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
        selectorIterator = this.selectorThreads.iterator();
    }

    public void run() {
        try {
            while (!stopped && !acceptSocket.socket().isClosed()) {
                try {
                    // As long as we are not closed, keep calling select() in a loop
                    select();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
        } finally {
            closeSelector();
            // This will wake up the selector threads, and tell the
            // worker thread pool to begin shutdown.
            if (!reconfiguring) {
                NIOServerCnxnFactory.this.stop();
            }
            LOG.info("accept thread exitted run method");
        }
    }

    private void select() {
        try {
            // Block until at least one channel is ready for a registered event
            selector.select();
            // Once select() returns a non-zero value, the ready keys can be
            // retrieved through the selector's selectedKeys() method
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }
                // Tests whether this key's channel is ready to accept a new socket connection
                if (key.isAcceptable()) {
                    if (!doAccept()) {
                        // If unable to pull a new connection off the accept
                        // queue, pause accepting to give us time to free
                        // up file descriptors and so the accept thread
                        // doesn't spin in a tight loop.
                        pauseAccept(10);
                    }
                } else {
                    LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

    private boolean doAccept() {
        boolean accepted = false;
        SocketChannel sc = null;
        try {
            // accept() returns a SocketChannel for the new incoming connection,
            // blocking until one arrives
            sc = acceptSocket.accept();
            accepted = true;
            // If the current number of connections exceeds the configured maximum, reject the new connection
            if (limitTotalNumberOfCnxns()) {
                throw new IOException("Too many connections max allowed is " + maxCnxns);
            }
            // Get the address of the new connection
            InetAddress ia = sc.socket().getInetAddress();
            int cnxncount = getClientCnxnCount(ia);
            // Reject if this single client already has too many connections
            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
            }

            LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
            // Switch the SocketChannel to non-blocking mode, so that connect(),
            // read() and write() can be called asynchronously
            sc.configureBlocking(false);

            // Round-robin assign this connection to a selector thread
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();
            }
            SelectorThread selectorThread = selectorIterator.next();
            // Hand the newly accepted connection to the chosen selector thread,
            // which places it on its queue of connections waiting to be added
            if (!selectorThread.addAcceptedConnection(sc)) {
                throw new IOException("Unable to add connection to selector queue"
                    + (stopped ? " (shutdown in progress)" : ""));
            }
            acceptErrorLogger.flush();
        } catch (IOException e) {
            // accept, maxClientCnxns, configureBlocking
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
            fastCloseSock(sc);
        }
        return accepted;
    }
}
pauseAccept: pausing accepts
// If a new connection cannot be pulled off the accept queue, pause accepting
// to give us time to free file descriptors, so the accept thread does not
// spin in a tight loop.
private void pauseAccept(long millisecs) {
    acceptKey.interestOps(0);
    try {
        selector.select(millisecs);
    } catch (IOException e) {
        // ignore
    } finally {
        acceptKey.interestOps(SelectionKey.OP_ACCEPT);
    }
}
13.6 SelectorThread
13.6.1 SelectorThread source code
The SelectorThread receives newly accepted connections from the AcceptThread and is responsible for select()ing for I/O readiness across those connections. This thread is the only one that performs the non-thread-safe or potentially blocking calls against the selector (registering new connections, and reading/setting interest ops). A connection's assignment to a SelectorThread is permanent: only that one SelectorThread will ever interact with it. There are 1..N SelectorThreads, with connections distributed evenly between them.
If there is a worker thread pool, then when a connection has I/O to perform, the SelectorThread removes it from selection by clearing its interest ops and schedules the I/O on a worker thread. When the work completes, the connection is placed on the ready queue to have its interest ops restored and resume selection. Without a worker thread pool, the SelectorThread performs the I/O directly.
class SelectorThread extends AbstractSelectThread {

    private final int id;
    private final Queue<SocketChannel> acceptedQueue;
    private final Queue<SelectionKey> updateQueue;

    public SelectorThread(int id) throws IOException {
        super("NIOServerCxnFactory.SelectorThread-" + id);
        this.id = id;
        acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
        updateQueue = new LinkedBlockingQueue<SelectionKey>();
    }

    /**
     * Place new accepted connection onto a queue for adding. Do this
     * so only the selector thread modifies what keys are registered
     * with the selector.
     */
    public boolean addAcceptedConnection(SocketChannel accepted) {
        if (stopped || !acceptedQueue.offer(accepted)) {
            return false;
        }
        // Wake up the selector; wakeupSelector() is defined in the parent type AbstractSelectThread
        wakeupSelector();
        return true;
    }

    /**
     * Place interest op update requests onto a queue so that only the
     * selector thread modifies interest ops, because interest ops
     * reads/sets are potentially blocking operations if other select
     * operations are happening.
     */
    public boolean addInterestOpsUpdateRequest(SelectionKey sk) {
        if (stopped || !updateQueue.offer(sk)) {
            return false;
        }
        wakeupSelector();
        return true;
    }

    /**
     * The main loop for the thread selects() on the connections and
     * dispatches ready I/O work requests, then registers all pending
     * newly accepted connections and updates any interest ops on the
     * queue.
     */
    public void run() {
        try {
            while (!stopped) {
                try {
                    select();
                    processAcceptedConnections();
                    processInterestOpsUpdateRequests();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }

            // Close connections still pending on the selector. Any others
            // with in-flight work, let drain out of the work queue.
            for (SelectionKey key : selector.keys()) {
                NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
                if (cnxn.isSelectable()) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                }
                cleanupSelectionKey(key);
            }

            SocketChannel accepted;
            while ((accepted = acceptedQueue.poll()) != null) {
                fastCloseSock(accepted);
            }
            updateQueue.clear();
        } finally {
            closeSelector();
            // This will wake up the accept thread and the other selector
            // threads, and tell the worker thread pool to begin shutdown.
            NIOServerCnxnFactory.this.stop();
            LOG.info("selector thread exitted run method");
        }
    }

    private void select() {
        try {
            // Select the keys whose channels are ready for I/O
            selector.select();

            // Returns this selector's selected-key set
            Set<SelectionKey> selected = selector.selectedKeys();
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
            Collections.shuffle(selectedList);
            Iterator<SelectionKey> selectedKeys = selectedList.iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selected.remove(key);

                if (!key.isValid()) {
                    cleanupSelectionKey(key);
                    continue;
                }
                if (key.isReadable() || key.isWritable()) {
                    // Schedule I/O for the connection associated with this key;
                    // if there is no worker thread pool, the I/O runs on this thread
                    handleIO(key);
                } else {
                    LOG.warn("Unexpected ops in select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

    /**
     * Schedule I/O for processing on the connection associated with
     * the given SelectionKey. If a worker thread pool is not being used,
     * I/O is run directly by this thread.
     */
    private void handleIO(SelectionKey key) {
        IOWorkRequest workRequest = new IOWorkRequest(this, key);
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

        // Stop selecting this key while processing on its connection
        cnxn.disableSelectable();
        // Clear the interest ops
        key.interestOps(0);
        // Refresh the connection's session expiration time
        touchCnxn(cnxn);
        // Hand off to a worker thread, which performs the IO read/write
        workerPool.schedule(workRequest);
    }

    /**
     * Iterate over the queue of accepted connections that have been
     * assigned to this thread but not yet placed on the selector.
     */
    private void processAcceptedConnections() {
        SocketChannel accepted;
        while (!stopped && (accepted = acceptedQueue.poll()) != null) {
            SelectionKey key = null;
            try {
                key = accepted.register(selector, SelectionKey.OP_READ);
                NIOServerCnxn cnxn = createConnection(accepted, key, this);
                key.attach(cnxn);
                addCnxn(cnxn);
            } catch (IOException e) {
                // register, createConnection
                cleanupSelectionKey(key);
                fastCloseSock(accepted);
            }
        }
    }

    /**
     * Iterate over the queue of connections ready to resume selection,
     * and restore their interest ops selection mask.
     */
    private void processInterestOpsUpdateRequests() {
        SelectionKey key;
        while (!stopped && (key = updateQueue.poll()) != null) {
            if (!key.isValid()) {
                cleanupSelectionKey(key);
            }
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                key.interestOps(cnxn.getInterestOps());
            }
        }
    }
}
SelectorThread runs three operations in an infinite while loop; the loop terminates when the stopped flag is set to true. On each iteration the thread select()s on its connections and dispatches the ready I/O work requests, then registers any pending newly accepted connections and applies any interest-ops updates on the queue:

- select(): select() over the connections and dispatch the ready I/O work requests
- processAcceptedConnections(): handle connections newly assigned by the accept thread: (1) register the new connection with the selector; (2) wrap it as a NIOServerCnxn and register that with the NIOServerCnxnFactory
- processInterestOpsUpdateRequests(): update the listened-to events of the connections in updateQueue
13.6.2 Processing accepted connections: processAcceptedConnections
Next, let's look in detail at how processAcceptedConnections handles the newly accepted connections:
private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            // Register OP_READ for the SocketChannel so that read requests can be received
            key = accepted.register(selector, SelectionKey.OP_READ);
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            // Attach the connection object to the key; it is used to process the
            // connection and read IO data
            key.attach(cnxn);
            // Cache connections from the same client IP in the ipMap of
            // NIOServerCnxnFactory to limit the number of connections per client.
            // If a single client has too many connections, a "Too many connections"
            // error is thrown and the accepted connection is rejected
            addCnxn(cnxn);
        } catch (IOException e) {
            // register, createConnection
            cleanupSelectionKey(key);
            fastCloseSock(accepted);
        }
    }
}
After registering OP_READ for the SocketChannel so read requests can be received, the connection object is created as follows:
protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException {
    return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
}
13.6.2.1 The NIOServerCnxn constructor
public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory, SelectorThread selectorThread) throws IOException {
    super(zk);
    this.sock = sock;
    this.sk = sk;
    this.factory = factory;
    this.selectorThread = selectorThread;
    if (this.factory.login != null) {
        this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
    }
    // Disable the Nagle algorithm: sends are not delayed for buffering
    sock.socket().setTcpNoDelay(true);
    /*
     * Setting SO_LINGER also helps reduce the number of sockets in TIME_WAIT.
     * With SO_LINGER enabled and a linger time of 0, an active close does not
     * send a FIN to end the connection; instead the connection goes straight
     * to the CLOSED state, the socket's send and receive buffers are cleared,
     * and an RST packet is sent directly to the peer.
     */
    // The first parameter says whether to enable SO_LINGER; the second is the
    // linger duration if it is enabled
    sock.socket().setSoLinger(false, -1);
    InetAddress addr = ((InetSocketAddress) sock.socket().getRemoteSocketAddress()).getAddress();
    // Record the remote IP address as auth info
    addAuthInfo(new Id("ip", addr.getHostAddress()));
    this.sessionTimeout = factory.sessionlessCnxnTimeout;
}
13.6.3 Updating listened-to events of connections in updateQueue: processInterestOpsUpdateRequests
The processInterestOpsUpdateRequests() method:
As we said earlier, a connection's subscribed events are suspended while its IO is being processed. Once the IO is done, the keys queued in updateQueue get their interestOps subscribed again:
private void processInterestOpsUpdateRequests() {
    SelectionKey key;
    while (!stopped && (key = updateQueue.poll()) != null) {
        if (!key.isValid()) {
            cleanupSelectionKey(key);
        }
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
        if (cnxn.isSelectable()) {
            key.interestOps(cnxn.getInterestOps());
        }
    }
}
13.6.4 Handling IO events
13.6.4.1 IOWorkRequest
IOWorkRequest handles IO events as they occur: when data is readable on the SocketChannel, a worker thread calls NIOServerCnxn.doIO() to perform the read.
Sticky packets and packet splitting

The awkward part of handling read events is that data sent over TCP can arrive merged (sticky packets) or split across reads. To deal with this, ZooKeeper's wire protocol divides every packet into three parts:

- length of request header plus body (4 bytes)
- request header
- request body

Note: (1) the request header and body are themselves subdivided further, but we do not go deeper here; it is enough to know that the first four bytes of a request carry the length of the header plus body. (2) The header plus body together are called the payload.

The 4-byte length field at the head of a message states the length of everything that follows it, so the server can split merged messages, or reassemble split ones, into complete messages. NIOServerCnxn reads data as follows:

NIOServerCnxn has two buffers: lenBuffer, with a fixed capacity of 4 bytes, used to read the length information; and incomingBuffer, which initially is lenBuffer itself. Once the length has been read, incomingBuffer is reallocated with the corresponding capacity to read the payload. The steps are (see the framing sketch after this list):

1. Allocate incomingBuffer according to the request message's length.
2. Read bytes into incomingBuffer until it is full (since step 1 sized it to exactly the message length, a full incomingBuffer holds exactly one message).
3. Process the message.
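To make the lenBuffer/incomingBuffer dance concrete, here is a stripped-down sketch of the same framing logic; this is my own illustration, not the NIOServerCnxn source, which additionally validates the length and special-cases four-letter-word commands:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class LengthPrefixedReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer; // initially points at lenBuffer

    /** Returns one complete message payload, or null if more bytes are needed. */
    public ByteBuffer readFrom(ReadableByteChannel ch) throws IOException {
        if (ch.read(incomingBuffer) < 0) {
            throw new IOException("channel closed");
        }
        if (incomingBuffer.remaining() > 0) {
            return null;                     // partial read: wait for more data
        }
        incomingBuffer.flip();
        if (incomingBuffer == lenBuffer) {   // just finished the 4-byte length prefix
            int len = incomingBuffer.getInt();
            incomingBuffer = ByteBuffer.allocate(len); // sized to exactly one message
            return readFrom(ch);             // immediately try to read the payload
        }
        ByteBuffer payload = incomingBuffer; // full buffer == one complete message
        lenBuffer.clear();
        incomingBuffer = lenBuffer;          // reset for the next message
        return payload;
    }
}
```

With the framing in mind, back to the worker side; the IOWorkRequest source: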
private class IOWorkRequest extends WorkerService.WorkRequest {

    private final SelectorThread selectorThread;
    private final SelectionKey key;
    private final NIOServerCnxn cnxn;

    IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
        this.selectorThread = selectorThread;
        this.key = key;
        this.cnxn = (NIOServerCnxn) key.attachment();
    }

    public void doWork() throws InterruptedException {
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }

        if (key.isReadable() || key.isWritable()) {
            // Perform the IO if the key is readable or writable
            cnxn.doIO(key);

            // Check if we shutdown or doIO() closed this connection
            if (stopped) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                return;
            }
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            touchCnxn(cnxn);
        }

        // Mark this connection as once again ready for selection
        cnxn.enableSelectable();
        // Push an update request on the queue to resume selecting
        // on the current set of interest ops, which may have changed
        // as a result of the I/O operations we just performed.
        if (!selectorThread.addInterestOpsUpdateRequest(key)) {
            cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
        }
    }

    @Override
    public void cleanup() {
        cnxn.close(ServerCnxn.DisconnectReason.CLEAN_UP);
    }
}
13.6.4.2 doIO of NIOServerCnxn
You can refer to this blog:
https://blog.csdn.net/jpf254/article/details/80792086
// Handle IO
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (!isSocketOpen()) {
            LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));
            return;
        }
        if (k.isReadable()) {
            // Read a sequence of bytes from this channel into the given buffer.
            // A client request triggers a read event here.
            // At initialization incomingBuffer is simply lenBuffer: only 4 bytes
            // are allocated, enough to read one int (the total length of the request)
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                handleFailedRead();
            }
            // remaining() returns the number of elements between the position and the limit.
            /*
             * Only when incomingBuffer.remaining() == 0 do we proceed; otherwise we
             * keep reading until incomingBuffer is full. There are two possibilities:
             * 1. incomingBuffer is lenBuffer: its content is the length of this
             *    request. Space is allocated for incomingBuffer according to
             *    lenBuffer, then readPayload() is called. readPayload() immediately
             *    attempts another read: (1) if incomingBuffer can be filled, it
             *    holds one complete request, which is processed; (2) if not, the
             *    packet was split and a complete request cannot be assembled yet,
             *    so we wait for the client to send more data and continue reading
             *    into incomingBuffer the next time the socketChannel is readable.
             * 2. incomingBuffer is not lenBuffer: the previous read was partial and
             *    incomingBuffer holds only part of the request. The bytes read this
             *    time plus the bytes read last time make up a complete request:
             *    call readPayload().
             */
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) { // start of next request
                    // Flip the buffer into read mode
                    incomingBuffer.flip();
                    // Read four bytes at the current position as an int (the payload
                    // length), advancing the position by 4. readLength() returns
                    // false only for a four-letter-word command, in which case there
                    // is no payload to read and the result is returned directly
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                if (isPayload) {
                    // Not a four-letter word: read the payload
                    readPayload();
                } else {
                    // A four-letter-word command has already been handled;
                    // nothing else to do
                    return;
                }
            }
        }
        if (k.isWritable()) {
            handleWrite(k);

            if (!initialized && !getReadInterest() && !getWriteInterest()) {
                throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
            }
        }
    } catch (CancelledKeyException e) {
        LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));
        LOG.debug("CancelledKeyException stack trace", e);
        close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
    } catch (CloseRequestException e) {
        // expecting close to log session closure
        close();
    } catch (EndOfStreamException e) {
        LOG.warn("Unexpected exception", e);
        // expecting close to log session closure
        close(e.getReason());
    } catch (ClientCnxnLimitException e) {
        // Common case exception, print at debug level
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
        close(DisconnectReason.CLIENT_CNX_LIMIT);
    } catch (IOException e) {
        LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
        close(DisconnectReason.IO_EXCEPTION);
    }
}
/**
 * This method is called in two cases:
 * 1. Space has just been allocated for incomingBuffer according to lenBuffer,
 *    and no payload data has been read from the socketChannel yet
 * 2. Payload data has been read from the socketChannel into incomingBuffer,
 *    and the read is complete
 *
 * Read the request payload (everything following the length prefix)
 */
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        // Case 1: incomingBuffer has just been allocated and is empty; read once.
        // (1) If incomingBuffer fills up, process the request directly;
        // (2) if not, this round of data cannot form a complete request; wait for
        //     the next data arrival, when doIO() will be called again
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            handleFailedRead();
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        incomingBuffer.flip();
        // In either case incomingBuffer is now full and must contain exactly one
        // request; process it. First update the statistics
        packetReceived(4 + incomingBuffer.remaining());
        if (!initialized) {
            // Process a connection request
            readConnectRequest();
        } else {
            // Process an ordinary request
            readRequest();
        }
        // Request processing is finished; reset lenBuffer and incomingBuffer
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}
// Read the connection request data
private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    zkServer.processConnectRequest(this, incomingBuffer);
    initialized = true;
}

// Read the packet data
private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}
ZooKeeperServer's handling of connection requests (processConnectRequest) is covered next. You can refer to this article:
- https://www.cnblogs.com/Benjious/p/11462064.html
On sessions:
- https://my.oschina.net/anxiaole/blog/3217373
- https://segmentfault.com/a/1190000022193168
13.6.4.3 Processing connection requests: processConnectRequest
It is invoked as follows:
zkServer.processConnectRequest(this, incomingBuffer);
The parsing code is as follows:

@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException, ClientCnxnLimitException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    // Deserialize the connection information
    connReq.deserialize(bia, "connect");
    LOG.debug(
        "Session establishment request from client {} client's lastZxid is 0x{}",
        cnxn.getRemoteSocketAddress(),
        Long.toHexString(connReq.getLastZxidSeen()));

    long sessionId = connReq.getSessionId();
    int tokensNeeded = 1;
    // Whether connection weights are considered when throttling; configured with
    // zookeeper.connection_throttle_weight_enabled, default false
    if (connThrottle.isConnectionWeightEnabled()) {
        if (sessionId == 0) {
            if (localSessionEnabled) {
                tokensNeeded = connThrottle.getRequiredTokensForLocal();
            } else {
                tokensNeeded = connThrottle.getRequiredTokensForGlobal();
            }
        } else {
            tokensNeeded = connThrottle.getRequiredTokensForRenew();
        }
    }
    // Token-bucket rate limiting: are there tokens available?
    if (!connThrottle.checkLimit(tokensNeeded)) {
        throw new ClientCnxnLimitException();
    }
    ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
    ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);

    boolean readOnly = false;
    try {
        // Read-only flag
        readOnly = bia.readBool("readOnly");
        cnxn.isOldClient = false;
    } catch (IOException e) {
        // this is ok -- just a packet from an old client which
        // doesn't contain readOnly field
        LOG.warn(
            "Connection request from old client {}; will be dropped if server is in r-o mode",
            cnxn.getRemoteSocketAddress());
    }
    // This server is read-only but the client is not a read-only client
    if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
        LOG.info(msg);
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
    }
    // If the zxid the client has seen is larger than the server's latest zxid,
    // refuse the request. This can happen if the server's snapshots and
    // transaction logs were deleted and ZooKeeper was restarted (leaving the
    // server with a smaller zxid), or under a split brain where the client
    // reaches a server in a different partition
    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
            + cnxn.getRemoteSocketAddress()
            + " as it has seen zxid 0x"
            + Long.toHexString(connReq.getLastZxidSeen())
            + " our last zxid is 0x"
            + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
            + " client must try another server";
        LOG.info(msg);
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
    }
    // Initialize the session timeout parameters
    int sessionTimeout = connReq.getTimeOut();
    byte[] passwd = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the
    // session is setup
    cnxn.disableRecv();
    if (sessionId == 0) {
        // Create a new session id
        long id = createSession(cnxn, passwd, sessionTimeout);
        LOG.debug(
            "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
            Long.toHexString(id),
            Long.toHexString(connReq.getLastZxidSeen()),
            connReq.getTimeOut(),
            cnxn.getRemoteSocketAddress());
    } else {
        long clientSessionId = connReq.getSessionId();
        LOG.debug(
            "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
            Long.toHexString(clientSessionId),
            Long.toHexString(connReq.getLastZxidSeen()),
            connReq.getTimeOut(),
            cnxn.getRemoteSocketAddress());
        if (serverCnxnFactory != null) {
            serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        if (secureServerCnxnFactory != null) {
            secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
    }
}

Creating and submitting the session: createSession

// Session creation code
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    if (passwd == null) {
        // Possible since it's just deserialized from a packet on the wire.
        passwd = new byte[0];
    }
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    // Wrap the session information in a Request and submit it
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
    submitRequest(si);
    return sessionId;
}
The ZooKeeper server uses SessionTrackerImpl to create session ids; the session id is incremented by 1 each time:

public long createSession(int sessionTimeout) {
    long sessionId = nextSessionId.getAndIncrement();
    trackSession(sessionId, sessionTimeout);
    return sessionId;
}
The initial nextSessionId is computed as:

public static long initializeNextSessionId(long id) {
    long nextSid;
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    nextSid = nextSid | (id << 56);
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
        ++nextSid; // this is an unlikely edge case, but check it just in case
    }
    return nextSid;
}
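My reading of the bit layout produced by the expression above: the top 8 bits carry the server id, the next 40 bits carry a snapshot of the elapsed time, and the low 16 bits start at zero, leaving room for getAndIncrement(). A tiny sketch:

```java
public class SessionIdSketch {
    public static void main(String[] args) {
        long serverId = 3;                         // this server's myid
        long elapsed = System.currentTimeMillis(); // stand-in for Time.currentElapsedTime()
        // (elapsed << 24) >>> 8 clears the top byte and shifts the time bits up,
        // leaving the low 16 bits zero; (serverId << 56) stamps the top byte
        long nextSid = ((elapsed << 24) >>> 8) | (serverId << 56);
        System.out.printf("first session id:  0x%016x%n", nextSid);
        System.out.printf("second session id: 0x%016x%n", nextSid + 1); // getAndIncrement()
    }
}
```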
13.6.4.4 Tracking session expiration
// Track the expiration time of the session
@Override
public synchronized boolean trackSession(long id, int sessionTimeout) {
    boolean added = false;

    SessionImpl session = sessionsById.get(id);
    if (session == null) {
        session = new SessionImpl(id, sessionTimeout);
    }

    // findbugs2.0.3 complains about get after put.
    // long term strategy would be use computeIfAbsent after JDK 1.8
    // Store the sessionId -> session mapping in the local sessionsById map
    SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
    if (existedSession != null) {
        session = existedSession;
    } else {
        added = true;
        LOG.debug("Adding session 0x{}", Long.toHexString(id));
    }

    if (LOG.isTraceEnabled()) {
        String actionStr = added ? "Adding" : "Existing";
        ZooTrace.logTraceMessage(
            LOG,
            ZooTrace.SESSION_TRACE_MASK,
            "SessionTrackerImpl --- " + actionStr + " session 0x" + Long.toHexString(id) + " " + sessionTimeout);
    }

    updateSessionExpiry(session, sessionTimeout);
    return added;
}
// Save the session's expiration time into the sessionExpiryQueue
private void updateSessionExpiry(SessionImpl s, int timeout) {
    logTraceTouchSession(s.sessionId, timeout, "");
    sessionExpiryQueue.update(s, timeout);
}

sessionExpiryQueue's update method puts expiration times into buckets in units of the tick interval:

public Long update(E elem, int timeout) {
    Long prevExpiryTime = elemMap.get(elem);
    long now = Time.currentElapsedTime();
    Long newExpiryTime = roundToNextInterval(now + timeout);

    if (newExpiryTime.equals(prevExpiryTime)) {
        // No change, so nothing to update
        return null;
    }

    // First add the elem to the new expiry time bucket in expiryMap.
    // expiryMap maps a (rounded) expiration time to the set of elements that
    // expire at that time
    Set<E> set = expiryMap.get(newExpiryTime);
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());

        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    // Add the element to the set for its expiration time
    set.add(elem);

    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket.
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    // If the element was filed under an earlier expiration time, remove the old
    // entry so the data refreshes correctly
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem);
        }
    }
    return newExpiryTime;
}
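The bucketing idea in update() boils down to a few lines. Below is a simplified sketch (my own, not the real ExpiryQueue, which additionally removes the element from its previous bucket as shown above): every deadline is rounded up to the next tick, so all sessions expiring within the same tick share one bucket and are closed in one sweep:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class TickBuckets<E> {
    private final long tickMs;
    private final Map<Long, Set<E>> expiryMap = new ConcurrentHashMap<>();

    public TickBuckets(long tickMs) {
        this.tickMs = tickMs;
    }

    // Round a deadline up to the next tick boundary, as roundToNextInterval does
    private long roundToNextInterval(long time) {
        return (time / tickMs + 1) * tickMs;
    }

    public void touch(E elem, long nowMs, long timeoutMs) {
        long bucket = roundToNextInterval(nowMs + timeoutMs);
        expiryMap.computeIfAbsent(bucket, k -> ConcurrentHashMap.<E>newKeySet()).add(elem);
    }

    public static void main(String[] args) {
        TickBuckets<String> q = new TickBuckets<>(2000);
        // Timeouts of 4100ms and 4900ms both round up to the 6000ms bucket,
        // so the two sessions will be expired together in one sweep
        q.touch("session-0x1", 0, 4100);
        q.touch("session-0x2", 0, 4900);
        System.out.println(q.expiryMap); // {6000=[session-0x1, session-0x2]}
    }
}
```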
13.6.4.5 Submitting the session request

ZooKeeperServer submits the session request through submitRequest:

public void submitRequest(Request si) {
    enqueueRequest(si);
}

public void enqueueRequest(Request si) {
    if (requestThrottler == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (requestThrottler == null) {
                throw new RuntimeException("Not started");
            }
        }
    }
    requestThrottler.submitRequest(si);
}

public void submitRequest(Request request) {
    if (stopping) {
        LOG.debug("Shutdown in progress. Request cannot be processed");
        dropRequest(request);
    } else {
        submittedRequests.add(request);
    }
}
submittedRequests is a LinkedBlockingQueue of requests.
How are the requests placed on this queue handled next?
ZooKeeper uses a queue + asynchronous processing model; the request chain is as follows (a stripped-down sketch of this model follows the list):
- Submit the request: RequestThrottler.run() > ZooKeeperServer.submitRequestNow(Request si)
- Pre-process the request: PrepRequestProcessor.processRequest(Request request)
- Persist the request: SyncRequestProcessor.run()
- Business processing of the request: FinalRequestProcessor / processTxn
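The shape of this queue + asynchronous model, reduced to its essentials (an illustrative sketch with made-up class names, not the real RequestThrottler or processor chain): the IO thread only enqueues, and a dedicated thread drains the queue and pushes each request down the chain:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class SimpleThrottler extends Thread {
    private final LinkedBlockingQueue<String> submittedRequests = new LinkedBlockingQueue<>();

    /** Producer side: called from IO threads, never blocks on processing. */
    public void submitRequest(String request) {
        submittedRequests.add(request);
    }

    /** Consumer side: drains the queue and hands off to the next processor. */
    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                String request = submittedRequests.take(); // blocks while idle
                System.out.println("processing " + request); // next processor goes here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdown
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleThrottler throttler = new SimpleThrottler();
        throttler.start();
        throttler.submitRequest("createSession");  // returns immediately
        throttler.submitRequest("getData /foo");
        Thread.sleep(100);
        throttler.interrupt();
    }
}
```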
13.6.4.6 Processing submitted requests
The details of request processing are covered later; so far we have followed the readConnectRequest method in ZooKeeperServer.
Next, let's look at how ZooKeeper handles read requests.
ZooKeeper's read-request handling starts from readRequest() in the NIOServerCnxn class, which calls the processPacket method of ZooKeeperServer:
private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}
How to process data packets? See the following code:
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    // Deserialize the request header
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");

    // Need to increase the outstanding request count first, otherwise
    // there might be a race condition that it enabled recv after
    // processing request and then disabled when check throttling.
    //
    // Be aware that we're actually checking the global outstanding
    // request before this request.
    //
    // It's fine if the IOException thrown before we decrease the count
    // in cnxn, since it will close the cnxn anyway.
    cnxn.incrOutstandingAndCheckThrottle(h);

    // Through the magic of byte buffers, txn will not be pointing
    // to the start of the txn: slice() generates a new read-only buffer
    // from the current read position
    incomingBuffer = incomingBuffer.slice();
    // If the current request is an authentication request
    if (h.getType() == OpCode.auth) {
        LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
        AuthPacket authPacket = new AuthPacket();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        if (ap != null) {
            try {
                // handleAuthentication may close the connection, to allow the client to choose
                // a different server to connect to. Authenticate the client
                authReturn = ap.handleAuthentication(
                    new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                    authPacket.getAuth());
            } catch (RuntimeException e) {
                LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                authReturn = KeeperException.Code.AUTHFAILED;
            }
        }
        // On success, reply that authentication succeeded
        if (authReturn == KeeperException.Code.OK) {
            LOG.debug("Authentication succeeded for scheme: {}", scheme);
            LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, null, null);
        } else {
            // On failure, reply that authentication failed and close the connection
            if (ap == null) {
                LOG.warn(
                    "No authentication provider for scheme: {} has {}",
                    scheme,
                    ProviderRegistry.listProviders());
            } else {
                LOG.warn("Authentication failed for scheme: {}", scheme);
            }
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
            cnxn.sendResponse(rh, null, null);
            // ... and close connection
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            cnxn.disableRecv();
        }
        return;
    } else if (h.getType() == OpCode.sasl) {
        // Handle SASL authentication
        processSasl(incomingBuffer, cnxn, h);
    } else {
        // An ordinary request
        if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
            // If the request is unauthenticated, close the connection directly
            ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
            cnxn.sendResponse(replyHeader, null, "response");
            cnxn.sendCloseSession();
            cnxn.disableRecv();
        } else {
            // Process the request
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            int length = incomingBuffer.limit();
            // Check whether this is a large request; the threshold is configured
            // with zookeeper.largeRequestThreshold
            if (isLargeRequest(length)) {
                // checkRequestSize will throw IOException if request is rejected.
                // For large transfers, the maximum request byte size is checked to
                // prevent JVM heap overflow; the default size is 100 KB, configured
                // with zookeeper.largeRequestMaxBytes
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // Submit the packet request; it is processed the same way as a
            // connection request, see the submit code above
            submitRequest(si);
        }
    }
}
In the accept thread's run(), selector.select() is executed and doAccept() is called to accept client connections and add them to a SelectorThread's acceptedQueue.

selector thread

@Override
public void run() {
    try {
        while (!stopped) {
            try {
                // 1. Call select() to fetch ready IO events and hand them to worker threads
                select();
                // 2. Handle connections newly dispatched by the accept thread:
                //    (1) register the new connection with the selector;
                //    (2) wrap it as a NIOServerCnxn and register it with NIOServerCnxnFactory
                processAcceptedConnections();
                // 3. Update the listened-to events of connections in updateQueue
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
        // Perform cleanup: close all connections still waiting on the selector
        ...
    } finally {
        ... // cleanup work
    }
}
The selector thread's run() mainly does three things:
1. Call select() to fetch the ready IO events and hand them to worker threads (key.interestOps(0) is called before the hand-off)
2. Handle the connections newly dispatched by the accept thread:
(1) register the new connection with the selector;
(2) wrap it as a NIOServerCnxn and register it with the NIOServerCnxnFactory
3. Update the listened-to events of the connections in updateQueue
worker thread
ZooKeeper manages a group of worker threads through WorkerService, which has two modes:

| Mode | Explanation | Usage scenario | Implementation |
| --- | --- | --- | --- |
| Assignable threads | A task is assigned to a specific thread. If a series of tasks must complete in order, this mode assigns them all to the same thread | A series of requests within the same session | Creates N ExecutorServices, each containing exactly one thread |
| Non-assignable threads | After a task is submitted, WorkerService hands it to an arbitrary thread. Used when there is no ordering requirement between tasks | Performing network IO | Creates one ExecutorService with N threads |

Since there is no ordering requirement between the network IO tasks of different connections, the WorkerService used by NIOServerCnxnFactory runs in the non-assignable mode.
/**
 * Schedule work to be done by the thread assigned to this id. Thread
 * assignment is a single mod operation on the number of threads. If a
 * worker thread pool is not being used, work is done directly by
 * this thread.
 *
 * The workRequest is assigned to a thread by taking the id modulo the thread
 * count. If no worker threads are used (numWorkerThreads == 0), a
 * ScheduledWorkRequest thread is started to do the work, and the current
 * thread blocks until it completes.
 *
 * @param workRequest the pending IO request
 * @param id selects which thread processes the workRequest
 */
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);

    // If we have a worker thread pool, use that; otherwise, do the work directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.start();
        try {
            scheduledWorkRequest.join();
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
            Thread.currentThread().interrupt();
        }
    }
}
When introducing worker threads earlier, we said "if the number of such threads is 0, the selector thread performs the IO read/write directly". From the source above, however, if the worker thread count is 0, a new thread is started for each network IO task, and the calling thread blocks until the IO completes. That wastes resources: if we must block until the IO finishes anyway, why start a separate thread at all? It may be legacy code, or groundwork for future extension, which would explain this seemingly unreasonable code. Either way, never set the number of worker threads to 0.
Let's continue to see how ScheduledWorkRequest handles network IO
@Override
public void run() {
    try {
        // Check if stopped while request was on queue
        if (stopped) {
            workRequest.cleanup();
            return;
        }
        workRequest.doWork();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        workRequest.cleanup();
    }
}

@Override
public void doWork() throws InterruptedException {
    // If the channel has been closed, clean up the SelectionKey
    if (!key.isValid()) {
        selectorThread.cleanupSelectionKey(key);
        return;
    }

    // 1. If readable or writable, call NIOServerCnxn.doIO() so the connection
    //    object performs the read/write IO
    if (key.isReadable() || key.isWritable()) {
        cnxn.doIO(key);

        // Check if we shutdown or doIO() closed this connection
        if (stopped) {
            cnxn.close();
            return;
        }
        // If the channel has been closed, clean up the SelectionKey
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }
        // 2. Refresh the expiration time of this session
        touchCnxn(cnxn);
    }

    // 3. IO processing is done: mark the connection ready for new select events
    cnxn.enableSelectable();
    // Put the key back on the selector thread's updateQueue; the selector thread
    // will re-register the channel's interest ops after it finishes processing
    // reads/writes and new connections
    if (!selectorThread.addInterestOpsUpdateRequest(key)) {
        cnxn.close();
    }
}
Stripping away the defensive code, doWork() accomplishes three main things:
1. Call NIOServerCnxn.doIO() so the connection object performs the read/write IO
2. Refresh the expiration time of this connection's session
3. After the network IO is done, set the selectable flag again and add the socketChannel to the selector thread's updateQueue; its role was described earlier
When the selector thread processes a connection received from the accept thread, besides registering it with the selector, it also wraps it as a NIOServerCnxn and registers that with the NIOServerCnxnFactory. NIOServerCnxn encapsulates the client connection, and the worker thread calls NIOServerCnxn.doIO() to handle the network IO. For details, see the discussion of ZooKeeper's client connection classes ServerCnxn and NIOServerCnxn.
13.7 ConnectionExpirerThread
This thread is used to clean up expired connections. The main methods are as follows:
@Override
public void run() {
    try {
        while (!stopped) {
            long waitTime = cnxnExpiryQueue.getWaitTime();
            if (waitTime > 0) {
                Thread.sleep(waitTime);
                continue;
            }
            for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
                conn.close();
            }
        }
    } catch (InterruptedException e) {
        LOG.info("ConnnectionExpirerThread interrupted");
    }
}
How this thread works is detailed in the discussion of ZooKeeper's connection and session expiration cleanup policy (ExpiryQueue).
13.8 NettyServerCnxnFactory
The NIO-based connection factory was described in detail above; let's now compare the two implementations.
NettyServerCnxnFactory uses Netty for its network IO, but it is written against Netty 3.x; Netty 4.x follows the same implementation ideas, but its APIs differ greatly. We will therefore not study NettyServerCnxnFactory in depth, and only briefly summarize how it differs from NIOServerCnxnFactory.
| Difference | NIO | Netty |
| --- | --- | --- |
| accept events | starts 1 accept thread | the boss group handles accept events; 1 thread is started by default |
| select() | starts selector threads | when adding a handler, addLast(EventExecutorGroup, ChannelHandler...) is called, and the handler processes IO events on the EventExecutorGroup |
| network IO | starts worker threads | starts the worker group to process network IO; by default numCores * 2 threads |
| handling read events | a worker thread calls NIOServerCnxn.doIO() | read events are handled in a handler |
| sticky/split packets | solved via lenBuffer and incomingBuffer; the code is quite complex | insert a handler that deals with sticky/split packets |
| handling write events | the thread running FinalRequestProcessor.processRequest() and the worker thread communicate through NIOServerCnxn.outgoingBuffers; the worker thread writes in batches | Netty natively supports asynchronous writes: if the current thread is the EventLoop thread, the data to write is stored in the ChannelOutboundBuffer; otherwise a write task is added to the EventLoop's task queue |
| direct memory | uses ThreadLocal direct buffers | I have not looked into how Netty uses direct memory, but Netty supports it and it is easy to use |
| handling connection close | starts a connection expirer thread | connection close is handled in a handler |
Note: the comparison above is between Netty 4.x and NIOServerCnxnFactory. Because ZooKeeper uses Netty 3, its NettyServerCnxnFactory contains some code that would otherwise be unnecessary, such as the code for handling sticky/split packets.
From the comparison above we can see that handling network IO with Netty is much more convenient than coding against raw Java NIO. Netty is great~~
Summary
To summarize, the three queues used for inter-thread communication:
- SelectorThread.acceptedQueue: the accept thread communicates with the selector threads
- SelectorThread.updateQueue: the worker threads communicate with the selector threads
- NIOServerCnxn.outgoingBuffers: the worker threads communicate with the request-processing threads