[ZooKeeper 3.6.2 source code analysis series] 13: The Reactor network model used by ZooKeeper

13 Starting the server's network listener: NIOServerCnxnFactory

13.1 Introduction

Back in the start() method of QuorumPeer, the server starts its network layer once data recovery has finished:
startServerCnxnFactory();

Read on:

private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start();
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}

The connection factories are created in the runFromConfig method of QuorumPeerMain by calling ServerCnxnFactory.createFactory(). That method checks whether the JVM system property zookeeper.serverCnxnFactory is set. If it is not set, the factory type defaults to NIOServerCnxnFactory.

The code in runFromConfig that creates and configures the factories is as follows:

 if (config.getClientPortAddress() != null) {
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
            }

            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
            }

ServerCnxnFactory.createFactory() creates the factory object from the configured class name:

 public static ServerCnxnFactory createFactory() throws IOException {
        String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
        if (serverCnxnFactoryName == null) {
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                                                                           .getDeclaredConstructor()
                                                                           .newInstance();
            LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
            return serverCnxnFactory;
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
            throw ioe;
        }
    }

A system property selects which connection factory is used. By default it is NIOServerCnxnFactory, which is built on the NIO classes that ship with the JDK. If the property zookeeper.serverCnxnFactory names another implementation, such as NettyServerCnxnFactory (built on Netty), that class is loaded during the initialization shown above. The factory then listens on the configured clientPort to serve client requests.
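
The selection logic can be tried out in isolation. The following is a minimal sketch, not ZooKeeper code: the interface ConnectionFactory, the two Demo classes, and the property name demo.connectionFactory are hypothetical stand-ins chosen for illustration. It shows the same pattern of reading a system property, falling back to a default class name, and instantiating the class reflectively.

```java
import java.io.IOException;

/** Hypothetical stand-in for ServerCnxnFactory. */
interface ConnectionFactory {
    String transport();
}

/** Hypothetical default implementation (plays the role of the NIO factory). */
class DemoNioFactory implements ConnectionFactory {
    public String transport() {
        return "nio";
    }
}

/** Hypothetical alternative implementation (plays the role of the Netty factory). */
class DemoNettyFactory implements ConnectionFactory {
    public String transport() {
        return "netty";
    }
}

class ConnectionFactoryLoader {
    // Hypothetical property name; ZooKeeper itself reads zookeeper.serverCnxnFactory.
    static final String FACTORY_PROPERTY = "demo.connectionFactory";

    static ConnectionFactory createFactory() throws IOException {
        String name = System.getProperty(FACTORY_PROPERTY);
        if (name == null) {
            // No property set: fall back to the default implementation.
            name = DemoNioFactory.class.getName();
        }
        try {
            // Load the class by name and call its no-argument constructor.
            return (ConnectionFactory) Class.forName(name)
                                            .getDeclaredConstructor()
                                            .newInstance();
        } catch (Exception e) {
            // Any reflection failure is reported as an IOException, as in the source.
            throw new IOException("Couldn't instantiate " + name, e);
        }
    }
}
```

With ZooKeeper itself, the equivalent switch is the JVM flag -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory.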

First, here is how this communication layer is usually described.
As a server, ZooKeeper has to communicate with its clients over the network, and it has to do so efficiently. ZooKeeper uses ServerCnxnFactory to manage the connections with clients. It has two implementations:

  • NIOServerCnxnFactory, which is implemented with the JDK's native NIO;
  • NettyServerCnxnFactory, which is implemented with Netty.

A ServerCnxn represents the connection between one client and the server.

ServerCnxnFactory
Note: in the text and comments below, a connection means a TCP connection initiated by a client, that is, a SocketChannel.
The system property zookeeper.serverCnxnFactory sets the implementation class of ServerCnxnFactory. NIOServerCnxnFactory is used by default.
NIOServerCnxnFactory

13.2 Main/sub Reactor network I/O model


The usual way to use Java NIO is to have one thread group listen for OP_ACCEPT events and handle incoming client connections, and a second thread group listen for OP_READ and OP_WRITE events on the connected clients and handle the I/O (Netty works this way).
ZooKeeper divides the work differently. When NIOServerCnxnFactory starts, it starts four kinds of threads:

  • accept thread: accepts connections from clients and assigns each one to a selector thread. Exactly one accept thread is started.
  • selector thread: executes select(). Because select() becomes a performance bottleneck when there are many connections, several selector threads are started. The system property zookeeper.nio.numSelectorThreads sets the number of these threads. The default is $\sqrt{\text{number of cores}/2}$, truncated to an integer, with a minimum of 1.
  • worker thread: performs the basic socket reads and writes. The system property zookeeper.nio.numWorkerThreads sets the number of these threads. The default is twice the number of cores. If it is set to 0, no worker threads are used and the selector thread performs the I/O itself. See the description of the worker thread below.
  • connection expiration thread: closes a connection when the session on that connection has expired.
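
To make the defaults concrete, here is a small sketch that evaluates the two default formulas for a few core counts. The formulas are the ones used in the configure method of NIOServerCnxnFactory; the class and method names are made up for this example.

```java
class NioThreadDefaults {
    /** Default selector thread count: sqrt(cores / 2), truncated, but at least 1. */
    static int defaultSelectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((float) numCores / 2), 1);
    }

    /** Default worker thread count: twice the number of cores. */
    static int defaultWorkerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        for (int cores : new int[] {1, 2, 8, 32, 64}) {
            System.out.printf("%d cores: %d selector thread(s), %d worker thread(s)%n",
                              cores, defaultSelectorThreads(cores), defaultWorkerThreads(cores));
        }
    }
}
```

For 32 cores this gives 4 selector threads, which matches the "sweet spot" comment in the source.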

As you can see, ZooKeeper splits the threads' work more finely. It assumes that selector.select() becomes a performance bottleneck when there are many client connections, so it spreads the select() calls across several selector threads.
Inter-thread communication

These threads communicate with each other through thread-safe queues. This section looks at which queues are used between which threads, and what each one is for.

  • SelectorThread.acceptedQueue
    acceptedQueue is a LinkedBlockingQueue owned by each selector thread. It holds the client connections accepted by the accept thread. The selector thread registers these connections with its selector and listens for OP_READ and OP_WRITE on them.
  • SelectorThread.updateQueue
    Like acceptedQueue, updateQueue is a LinkedBlockingQueue owned by each selector thread. Understanding its role requires a good understanding of how Java NIO is implemented.
    On Linux, Java NIO is built on the epoll system calls and is level-triggered: if selector.select() finds an event on a socketChannel, such as readable data, then every later selector.select() keeps reporting that event until the data has been read from the socketChannel.
    ZooKeeper treats selector.select() as the performance bottleneck throughout. To improve the performance of select() and avoid this drawback of level-triggered mode, ZooKeeper makes the socketChannel stop listening for OP_READ and OP_WRITE while its I/O is being processed, which lightens the load on selector.select().
    This raises a question: once the I/O has been processed, how does the socketChannel start listening for OP_READ and OP_WRITE again?
    It may seem easy: after the worker thread finishes the I/O, it could call key.interestOps(OP_READ | OP_WRITE) directly. It is not that simple, because selector.select() runs on the selector thread, and a call to key.interestOps(OP_READ | OP_WRITE) made by a worker thread while select() is in progress may block. For the sake of performance, ZooKeeper has the selector thread make the key.interestOps(...) call itself, so after processing the I/O the worker thread has to tell the selector thread that the socketChannel can listen for OP_READ and OP_WRITE again.
    updateQueue holds the SelectionKeys of the connections that need to resume listening for OP_READ and OP_WRITE.
  • NIOServerCnxn.outgoingBuffers
    outgoingBuffers holds the response data waiting to be sent to the client.
    Note: this is a guess. If key.interestOps(OP_READ | OP_WRITE) can block against selector.select(), then accepted.register(selector, SelectionKey.OP_READ) can too, so registering a newly accepted client connection with the selector also has to run on the selector thread. That would be why acceptedQueue exists.
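
The level-triggered behaviour and the effect of clearing the interest set can be observed with a small experiment. This is a sketch under stated assumptions, not ZooKeeper code: it uses a java.nio.channels.Pipe instead of a socket so that it needs no network, and the class name InterestOpsDemo is made up. It writes one byte that is never read, then calls select four times: twice with OP_READ registered, once with the interest set cleared, and once with it restored.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class InterestOpsDemo {
    /**
     * Returns the number of ready keys reported by four consecutive selects:
     * [first select, select again with the data still unread,
     *  select with interest ops cleared, select with interest ops restored].
     */
    static int[] run() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            source.configureBlocking(false);
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);

            // Make the source readable, and never read the byte.
            sink.write(ByteBuffer.wrap(new byte[] {42}));

            int first = selector.select(1000);
            selector.selectedKeys().clear();

            // Level-triggered: the unread byte is reported again.
            int second = selector.select(1000);
            selector.selectedKeys().clear();

            // Clearing the interest set hides the channel from select().
            key.interestOps(0);
            int third = selector.selectNow();

            // Restoring the interest set makes the pending byte visible again.
            key.interestOps(SelectionKey.OP_READ);
            int fourth = selector.select(1000);

            return new int[] {first, second, third, fourth};
        }
    }
}
```

The expected result is 1, 1, 0, 1: the same unread data is reported on every select until the interest set is cleared, which is the load ZooKeeper avoids by calling key.interestOps(0) while a worker is busy with the connection.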

With the thread and I/O model in mind, let's look at the startup source code.

13.3 Initialization and configuration of NIOServerCnxnFactory

The configure method of NIOServerCnxnFactory is called before ZooKeeper starts, once the configuration has been loaded:

@Override
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    initMaxCnxns();
    sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    String logMsg = "Configuring NIO connection handler with "
        + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
        + numSelectorThreads + " selector thread(s), "
        + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
        + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
    LOG.info(logMsg);
    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    listenBacklog = backlog;
    // Create the server socket channel (this obtains the underlying file descriptor)
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port {}", addr);
    if (listenBacklog == -1) {
        ss.socket().bind(addr);
    } else {
        ss.socket().bind(addr, listenBacklog);
    }
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

13.4 Startup method of NIOServerCnxnFactory

@Override
public void start() {
    stopped = false;
    // Create the worker thread pool
    if (workerPool == null) {
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    // Start the selector threads
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // Start the accept thread
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    // Start the connection expiration thread
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}
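
The repeated getState() == Thread.State.NEW checks exist because a Java thread can be started only once. The following sketch isolates that guard. The class and method names are invented for this example, and the guard is not atomic, so it assumes, as start() above does, that it is not called concurrently for the same thread.

```java
class StartOnceDemo {
    /**
     * Starts the thread only if it has never been started.
     * Returns true if this call started it, false if it was already started.
     */
    static boolean startIfNew(Thread thread) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
            return true;
        }
        // Already started (running or finished): calling start() again
        // would throw IllegalThreadStateException.
        return false;
    }
}
```

With this guard, calling the factory's start() a second time leaves threads that are already running untouched instead of failing.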


13.5 AcceptThread

13.5.1 Source code of the AcceptThread class

The source code of the accept thread is shown below. Start with an overall view:

private class AcceptThread extends AbstractSelectThread {

    private final ServerSocketChannel acceptSocket;
    private final SelectionKey acceptKey;
    private final RateLogger acceptErrorLogger = new RateLogger(LOG);
    private final Collection<SelectorThread> selectorThreads;
    private Iterator<SelectorThread> selectorIterator;
    private volatile boolean reconfiguring = false;

    public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
        super("NIOServerCxnFactory.AcceptThread:" + addr);
        this.acceptSocket = ss;
        // Register the server channel with the selector for accept events
        this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
        this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
        selectorIterator = this.selectorThreads.iterator();
    }
public void run() {
    try {
        while (!stopped && !acceptSocket.socket().isClosed()) {
            try {
                // Keep calling select() until the thread is stopped or the socket is closed
                select();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
    } finally {
        closeSelector();
        // This will wake up the selector threads, and tell the
        // worker thread pool to begin shutdown.
        if (!reconfiguring) {
            NIOServerCnxnFactory.this.stop();
        }
        LOG.info("accept thread exitted run method");
    }
}

private void select() {
    try {
        // Block until at least one registered channel is ready for an event of interest.
        selector.select();
        // After select() returns, the ready keys are available through the selector's selectedKeys() method.
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped && selectedKeys.hasNext()) {
            SelectionKey key = selectedKeys.next();
            selectedKeys.remove();

            if (!key.isValid()) {
                continue;
            }
            // Test whether this key's channel is ready to accept a new socket connection.
            if (key.isAcceptable()) {
                if (!doAccept()) {
                    // If unable to pull a new connection off the accept
                    // queue, pause accepting to give us time to free
                    // up file descriptors and so the accept thread
                    // doesn't spin in a tight loop.
                    pauseAccept(10);
                }
            } else {
                LOG.warn("Unexpected ops in accept select {}", key.readyOps());
            }
        }
    } catch (IOException e) {
        LOG.warn("Ignoring IOException while selecting", e);
    }
}

private boolean doAccept() {
        boolean accepted = false;
        SocketChannel sc = null;
        try {
            // accept() takes a pending connection and returns it as a SocketChannel. The server channel was set to non-blocking in configure(), so accept() does not block: it returns null immediately if no connection is pending.
            sc = acceptSocket.accept();
            accepted = true;
            // Reject the new connection if the total number of connections exceeds the configured maximum
            if (limitTotalNumberOfCnxns()) {
                throw new IOException("Too many connections max allowed is " + maxCnxns);
            }
            // Get the remote address of this connection
            InetAddress ia = sc.socket().getInetAddress();
            int cnxncount = getClientCnxnCount(ia);
            // Reject the connection if this client address already has the maximum number of connections
            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
            }

            LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
            // Put the SocketChannel into non-blocking mode so that read() and write() on it never block
            sc.configureBlocking(false);

            // Round-robin assign this connection to a selector thread
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();
            }
            // Pick the next selector thread
            SelectorThread selectorThread = selectorIterator.next();
            // Hand the newly accepted connection to that selector thread, which puts it on its queue of connections waiting to be registered
            if (!selectorThread.addAcceptedConnection(sc)) {
                throw new IOException("Unable to add connection to selector queue"
                                      + (stopped ? " (shutdown in progress)" : ""));
            }
            acceptErrorLogger.flush();
        } catch (IOException e) {
            // accept, maxClientCnxns, configureBlocking
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
            fastCloseSock(sc);
        }
        return accepted;
    }

}
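
The round-robin assignment in doAccept relies on a simple trick: keep an iterator over the selector threads and create a fresh one whenever it runs out. Below is a minimal generic sketch of that technique. The class RoundRobinAssigner is hypothetical and, like the original, is meant to be called from a single thread only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class RoundRobinAssigner<T> {
    private final List<T> targets;
    private Iterator<T> iterator;

    RoundRobinAssigner(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("need at least one target");
        }
        // Defensive, read-only copy so the rotation order cannot change.
        this.targets = Collections.unmodifiableList(new ArrayList<>(targets));
        this.iterator = this.targets.iterator();
    }

    /** Returns the next target, wrapping around to the first one at the end. */
    T next() {
        if (!iterator.hasNext()) {
            iterator = targets.iterator();
        }
        return iterator.next();
    }
}
```

With three targets, five calls to next() return the first, second, third, first, and second target in that order, so connections are spread evenly.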

pauseAccept: pausing accept

// If a new connection cannot be pulled off the accept queue, pause accepting to give the server time to free up file descriptors, so that the accept thread does not spin in a tight loop.
private void pauseAccept(long millisecs) {
    acceptKey.interestOps(0);
    try {
        selector.select(millisecs);
    } catch (IOException e) {
        // ignore
    } finally {
        acceptKey.interestOps(SelectionKey.OP_ACCEPT);
    }
}


13.6 SelectorThread

13.6.1 Source code of the SelectorThread class

SelectorThread
The SelectorThread receives newly accepted connections from the AcceptThread and is responsible for selecting for I/O readiness across those connections. It is the only thread that performs non-thread-safe or potentially blocking calls on the selector (registering new connections and reading or writing interest ops). Assigning a connection to a SelectorThread is permanent, and only that one SelectorThread ever interacts with the connection. There are 1 to N SelectorThreads, with connections spread evenly among them.
If there is a worker thread pool, then when a connection has I/O to perform, the SelectorThread removes it from selection by clearing its interest ops and schedules the I/O to be processed by a worker thread. When the work is complete, the connection is placed on the ready queue to have its interest ops restored and to resume selection. If there is no worker thread pool, the SelectorThread performs the I/O directly.

class SelectorThread extends AbstractSelectThread {

    private final int id;
    private final Queue<SocketChannel> acceptedQueue;
    private final Queue<SelectionKey> updateQueue;

    public SelectorThread(int id) throws IOException {
        super("NIOServerCxnFactory.SelectorThread-" + id);
        this.id = id;
        acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
        updateQueue = new LinkedBlockingQueue<SelectionKey>();
    }

    /**
     * Place new accepted connection onto a queue for adding. Do this
     * so only the selector thread modifies what keys are registered
     * with the selector.
     */
    public boolean addAcceptedConnection(SocketChannel accepted) {
        if (stopped || !acceptedQueue.offer(accepted)) {
            return false;
        }
        // Wake up the selector (wakeupSelector is defined in the parent class AbstractSelectThread)
        wakeupSelector();
        return true;
    }

    /**
     * Place interest op update requests onto a queue so that only the
     * selector thread modifies interest ops, because interest ops
     * reads/sets are potentially blocking operations if other select
     * operations are happening.
     */
    public boolean addInterestOpsUpdateRequest(SelectionKey sk) {
        if (stopped || !updateQueue.offer(sk)) {
            return false;
        }
        wakeupSelector();
        return true;
    }

    /**
     * The main loop for the thread selects() on the connections and
     * dispatches ready I/O work requests, then registers all pending
     * newly accepted connections and updates any interest ops on the
     * queue.
     */
    public void run() {
        try {
            while (!stopped) {
                try {

                    select();
                    processAcceptedConnections();
                    processInterestOpsUpdateRequests();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }

            // Close connections still pending on the selector. Any others
            // with in-flight work, let drain out of the work queue.
            for (SelectionKey key : selector.keys()) {
                NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
                if (cnxn.isSelectable()) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                }
                cleanupSelectionKey(key);
            }
            SocketChannel accepted;
            while ((accepted = acceptedQueue.poll()) != null) {
                fastCloseSock(accepted);
            }
            updateQueue.clear();
        } finally {
            closeSelector();
            // This will wake up the accept thread and the other selector
            // threads, and tell the worker thread pool to begin shutdown.
            NIOServerCnxnFactory.this.stop();
            LOG.info("selector thread exitted run method");
        }
    }

    private void select() {
        try {
            // Select the keys whose channels are ready for I/O.
            selector.select();
            // Get the selector's selected-key set.
            Set<SelectionKey> selected = selector.selectedKeys();
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
            Collections.shuffle(selectedList);
            Iterator<SelectionKey> selectedKeys = selectedList.iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selected.remove(key);

                if (!key.isValid()) {
                    cleanupSelectionKey(key);
                    continue;
                }
                if (key.isReadable() || key.isWritable()) {
                    // Schedule the I/O for this key's connection. Without a worker pool it runs directly on this thread.
                    handleIO(key);
                } else {
                    LOG.warn("Unexpected ops in select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

    /**
     * Schedule I/O for processing on the connection associated with
     * the given SelectionKey. If a worker thread pool is not being used,
     * I/O is run directly by this thread.
     */
    private void handleIO(SelectionKey key) {
        IOWorkRequest workRequest = new IOWorkRequest(this, key);
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

        // Stop selecting this key while processing on its
        // connection
        cnxn.disableSelectable();
        // Clear the interest set
        key.interestOps(0);
        // Refresh the connection's expiration time
        touchCnxn(cnxn);
        // Hand the request to the worker pool, which performs the I/O read/write
        workerPool.schedule(workRequest);
    }

    /**
     * Iterate over the queue of accepted connections that have been
     * assigned to this thread but not yet placed on the selector.
     */
    private void processAcceptedConnections() {
        SocketChannel accepted;
        while (!stopped && (accepted = acceptedQueue.poll()) != null) {
            SelectionKey key = null;
            try {
                key = accepted.register(selector, SelectionKey.OP_READ);
                NIOServerCnxn cnxn = createConnection(accepted, key, this);
                key.attach(cnxn);
                addCnxn(cnxn);
            } catch (IOException e) {
                // register, createConnection
                cleanupSelectionKey(key);
                fastCloseSock(accepted);
            }
        }
    }

    /**
     * Iterate over the queue of connections ready to resume selection,
     * and restore their interest ops selection mask.
     */
    private void processInterestOpsUpdateRequests() {
        SelectionKey key;
        while (!stopped && (key = updateQueue.poll()) != null) {
            if (!key.isValid()) {
                cleanupSelectionKey(key);
            }
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                key.interestOps(cnxn.getInterestOps());
            }
        }
    }

}

SelectorThread performs three operations, repeated in a while loop that ends when the stopped variable is set to true.
On each pass, the main loop of the thread calls select() on the connections and dispatches the ready I/O work requests, then registers all pending newly accepted connections and applies any interest ops updates waiting on the queue.

  • select();
    Calls select() on the connections and dispatches the ready I/O work requests.
  • processAcceptedConnections();
    Handles the connections newly assigned by the accept thread: (1) registers the new connection with the selector; (2) wraps it in an NIOServerCnxn and registers that with the NIOServerCnxnFactory.
  • processInterestOpsUpdateRequests();
    Updates the interest ops of the connections in updateQueue.
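
The hand-off used by addAcceptedConnection and addInterestOpsUpdateRequest (offer to a queue, then wake the selector so that its owning thread drains the queue) can be sketched on its own. This is a simplified illustration, not ZooKeeper code: the class name QueueHandoffDemo is invented, the queue carries plain strings instead of SelectionKeys, and no channels are registered, so select() returns only when wakeup() is called.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class QueueHandoffDemo {
    /** Hands one request to a selector-owning thread and returns what that thread processed. */
    static List<String> run() throws IOException, InterruptedException {
        Selector selector = Selector.open();
        Queue<String> updateQueue = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();

        Thread selectorThread = new Thread(() -> {
            try {
                while (processed.isEmpty()) {
                    // Blocks until another thread calls wakeup().
                    selector.select();
                    // Only this thread touches the selector, so it drains the queue itself.
                    String request;
                    while ((request = updateQueue.poll()) != null) {
                        processed.add(request);
                    }
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, "demo-selector");
        selectorThread.start();

        // Producer side: enqueue first, then wake the selector.
        updateQueue.offer("resume-key-1");
        selector.wakeup();

        selectorThread.join(5000);
        selector.close();
        return processed;
    }
}
```

Enqueueing before calling wakeup() matters: wakeup() also makes the next select() return immediately if none is in progress, so the request is already in the queue whenever the selector thread gets to drain it.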

13.6.2 Processing accepted connections: processAcceptedConnections

Next, let's look in detail at how processAcceptedConnections handles the accepted connections:

private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            // Register OP_READ on the SocketChannel so that read requests can be received
            key = accepted.register(selector, SelectionKey.OP_READ);
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            // Attach the connection object to the key. It is used to handle the connection and read its I/O data
            key.attach(cnxn);
            // Record the connection under its client IP in the ipMap of NIOServerCnxnFactory. ipMap is what limits the number of connections per client: when one client has too many, doAccept throws a "Too many connections" error and rejects the new connection
            addCnxn(cnxn);
        } catch (IOException e) {
            // register, createConnection
            cleanupSelectionKey(key);
            fastCloseSock(accepted);
        }
    }
}
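
The per-client limit mentioned in the comments can be illustrated with a small counter keyed by IP address. This is a simplified sketch, not the ZooKeeper implementation: the class PerClientLimiter and its methods are hypothetical, and it merges the check (done in doAccept in the source) and the bookkeeping (done in addCnxn) into one call.

```java
import java.net.InetAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PerClientLimiter {
    private final int maxPerClient; // 0 or less means unlimited
    private final ConcurrentMap<InetAddress, Integer> counts = new ConcurrentHashMap<>();

    PerClientLimiter(int maxPerClient) {
        this.maxPerClient = maxPerClient;
    }

    /** Counts one more connection for the address; returns false if the limit is already reached. */
    boolean tryAdd(InetAddress addr) {
        boolean[] accepted = {false};
        // compute() runs atomically for the key, so check and increment cannot interleave.
        counts.compute(addr, (k, v) -> {
            int current = (v == null) ? 0 : v;
            if (maxPerClient > 0 && current >= maxPerClient) {
                return v; // limit reached: leave the count unchanged
            }
            accepted[0] = true;
            return current + 1;
        });
        return accepted[0];
    }

    /** Removes one connection for the address, dropping the entry when it reaches zero. */
    void remove(InetAddress addr) {
        counts.computeIfPresent(addr, (k, v) -> v > 1 ? Integer.valueOf(v - 1) : null);
    }

    int count(InetAddress addr) {
        return counts.getOrDefault(addr, 0);
    }
}
```

A limit of 0 or less disables the check, mirroring the maxClientCnxns > 0 condition in doAccept.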

Once OP_READ has been registered on the SocketChannel, the connection object is created as follows:

protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException {
    return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
}

13.6.2.1 The NIOServerCnxn constructor

public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory, SelectorThread selectorThread) throws IOException {
    super(zk);
    this.sock = sock;
    this.sk = sk;
    this.factory = factory;
    this.selectorThread = selectorThread;
    if (this.factory.login != null) {
        this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
    }
    // Disable Nagle's algorithm so that small writes are sent without delay
    sock.socket().setTcpNoDelay(true);
    // Set socket linger to false so that closing the socket does not block.
    // For reference: enabling SO_LINGER with a timeout of 0 is also a way to reduce the
    // number of TIME_WAIT sockets, because an active close then skips the FIN exchange,
    // moves the connection straight to CLOSE, discards the send and receive buffers,
    // and sends an RST to the peer. That is not the configuration used here.
    // The first parameter enables or disables SO_LINGER. The second is the linger
    // duration, which only applies when SO_LINGER is enabled.
    sock.socket().setSoLinger(false, -1);
    InetAddress addr = ((InetSocketAddress) sock.socket().getRemoteSocketAddress()).getAddress();
    // Record the remote IP address as authentication info for this connection
    addAuthInfo(new Id("ip", addr.getHostAddress()));
    this.sessionTimeout = factory.sessionlessCnxnTimeout;
}
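
The two socket options set in the constructor can be checked on a plain java.net.Socket. This is a small sketch with an invented class name. It assumes the platform allows these options to be set on a socket that is not yet connected, which is the usual behaviour of the JDK on common operating systems.

```java
import java.io.IOException;
import java.net.Socket;

class SocketOptionsDemo {
    /**
     * Applies the same two options as the NIOServerCnxn constructor and
     * returns {1 if TCP_NODELAY is on else 0, the SO_LINGER value}.
     */
    static int[] configure(Socket socket) throws IOException {
        // Disable Nagle's algorithm.
        socket.setTcpNoDelay(true);
        // Disable SO_LINGER. The timeout argument is ignored when the flag is false.
        socket.setSoLinger(false, -1);
        // getSoLinger() reports -1 when the option is disabled.
        return new int[] {socket.getTcpNoDelay() ? 1 : 0, socket.getSoLinger()};
    }
}
```

With linger disabled, close() returns immediately and the operating system delivers any unsent data in the background.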

13.6.3 Updating the interest ops of connections in updateQueue: processInterestOpsUpdateRequests

The processInterestOpsUpdateRequests() method:
As described earlier, a connection stops listening for events while its I/O is being processed. After the I/O has been processed, the selector thread takes the connection's key from updateQueue and restores its interestOps.

    private void processInterestOpsUpdateRequests() {
        SelectionKey key;
        while (!stopped && (key = updateQueue.poll()) != null) {
            if (!key.isValid()) {
                cleanupSelectionKey(key);
            }
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                key.interestOps(cnxn.getInterestOps());
            }
        }
    }


13.6.4 handling IO events

13.6.4.1 IOWorkRequest

IOWorkRequest handles an I/O event when it occurs. When data is readable on the SocketChannel, the worker thread calls NIOServerCnxn.doIO() to perform the read.

Sticky packets and split packets
The awkward part of handling read events is that TCP is a byte stream: messages sent over it can arrive stuck together or split apart. To solve this, ZooKeeper's communication protocol divides each message into three parts:

  • Length of the request header plus request body (4 bytes)
  • Request header
  • Request body

Note: (1) the request header and request body are themselves divided into smaller parts, which are not examined here. It is enough to know that the first four bytes of a request give the combined length of the request header and request body. (2) The request header and request body together are called the payload.
A 4-byte length field is placed in front of the message to give the length of the whole message excluding the length field itself. Using this length, the server can split or reassemble stuck-together and split-apart data into complete messages. NIOServerCnxn reads data as follows:

1. NIOServerCnxn has two attributes. One is lenBuffer, with a capacity of 4 bytes, which is used to read the length information. The other is incomingBuffer, which initially refers to lenBuffer; once the length has been read, new space is allocated for incomingBuffer to read the payload.
2. Allocate the incomingBuffer size according to the length of the request message.
3. Store the bytes read into incomingBuffer until it is full (since the size allocated in step 2 is exactly the message length, incomingBuffer then holds exactly one message).
4. Process the message.
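The length-prefix scheme described above can be sketched outside ZooKeeper as follows. This is a minimal illustration, not the NIOServerCnxn implementation: the class FrameDecoderSketch and its methods feed, frames and frame are hypothetical names, and bytes are fed from arrays instead of being read from a SocketChannel.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder that mimics the lenBuffer / incomingBuffer hand-off. */
final class FrameDecoderSketch {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer; // starts out as the length buffer
    private final List<byte[]> frames = new ArrayList<>();

    /** Feeds whatever bytes arrived; they may hold part of a frame or several frames. */
    void feed(byte[] chunk) {
        ByteBuffer in = ByteBuffer.wrap(chunk);
        while (true) {
            // Copy as many bytes as the current target buffer can still take.
            while (in.hasRemaining() && incomingBuffer.hasRemaining()) {
                incomingBuffer.put(in.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return; // split message: wait for more data
            }
            if (incomingBuffer == lenBuffer) {
                // The 4 length bytes are complete: allocate space for the payload.
                lenBuffer.flip();
                int len = lenBuffer.getInt();
                lenBuffer.clear();
                if (len < 0) {
                    throw new IllegalArgumentException("negative length " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                // The payload is complete: emit it and go back to reading a length.
                frames.add(incomingBuffer.array());
                incomingBuffer = lenBuffer;
            }
        }
    }

    List<byte[]> frames() {
        return frames;
    }

    /** Encodes a string as a 4-byte length followed by its UTF-8 bytes. */
    static byte[] frame(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Feeding the concatenation of two frames in arbitrary slices still yields exactly two payloads, which is the property the length prefix is meant to guarantee.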

private class IOWorkRequest extends WorkerService.WorkRequest {

    private final SelectorThread selectorThread;
    private final SelectionKey key;
    private final NIOServerCnxn cnxn;

    IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
        this.selectorThread = selectorThread;
        this.key = key;
        this.cnxn = (NIOServerCnxn) key.attachment();
    }

    public void doWork() throws InterruptedException {
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }

        if (key.isReadable() || key.isWritable()) {
            // Perform the IO operation if the key is readable or writable
            cnxn.doIO(key);

            // Check if we shutdown or doIO() closed this connection
            if (stopped) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                return;
            }
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            touchCnxn(cnxn);
        }

        // Mark this connection as once again ready for selection
        cnxn.enableSelectable();
        // Push an update request on the queue to resume selecting
        // on the current set of interest ops, which may have changed
        // as a result of the I/O operations we just performed.
        if (!selectorThread.addInterestOpsUpdateRequest(key)) {
            cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
        }
    }

    @Override
    public void cleanup() {
        cnxn.close(ServerCnxn.DisconnectReason.CLEAN_UP);
    }

}

13.6.4.2 doIO of NIOServerCnxn

You can refer to this blog:
https://blog.csdn.net/jpf254/article/details/80792086

//Process IO
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (!isSocketOpen()) {
            LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));

            return;
        }
        if (k.isReadable()) {
            // sock.read() reads a sequence of bytes from this channel into the given buffer.
            // A client request triggers a read event here.
            // Initially incomingBuffer is lenBuffer, which has only 4 bytes allocated,
            // enough to read one int (the total length of the request message).
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                handleFailedRead();
            }
            /*
             * remaining() returns the number of elements between the current position and the limit.
             * Only when incomingBuffer.remaining() == 0 do we proceed to the next step; otherwise
             * reading continues on later read events until incomingBuffer is full. There are two cases:
             * 1. incomingBuffer is lenBuffer: its content is the length of this request message.
             *    After space is allocated for incomingBuffer according to that length, readPayload()
             *    is called, which immediately attempts another read:
             *    (1) if incomingBuffer can be filled, it holds a complete request, which is processed;
             *    (2) if incomingBuffer cannot be filled, the message was split and a complete request
             *        cannot be built yet; wait for the client to send more data and continue reading
             *        into incomingBuffer the next time the SocketChannel is readable.
             * 2. incomingBuffer is not lenBuffer: the message was split during the previous read and
             *    incomingBuffer holds only part of the request. The data read this time plus the data
             *    read earlier make up a complete request; call readPayload().
             */
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                // lenBuffer (capacity 4 bytes) reads the length; incomingBuffer refers to lenBuffer
                // until the length has been read, then gets its own space for the payload
                if (incomingBuffer == lenBuffer) { // start of next request
                    // Flip the buffer so that it can be read from
                    incomingBuffer.flip();
                    // readLength() reads the four bytes at the current position as an int (advancing
                    // the position by 4). It returns false only when the data is a four-letter-word
                    // command, in which case there is no payload to read and doIO() returns directly
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                // Read the payload unless this was a four-letter-word command
                if (isPayload) { // not the case for 4letterword
                    readPayload();
                } else {
                    // four letter words take care
                    // need not do anything else
                    // The four-letter-word command has already been handled, so return directly
                    return;
                }
            }
        }
        if (k.isWritable()) {
            handleWrite(k);

            if (!initialized && !getReadInterest() && !getWriteInterest()) {
                throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
            }
        }
    } catch (CancelledKeyException e) {
        LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));

        LOG.debug("CancelledKeyException stack trace", e);

        close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
    } catch (CloseRequestException e) {
        // expecting close to log session closure
        close();
    } catch (EndOfStreamException e) {
        LOG.warn("Unexpected exception", e);
        // expecting close to log session closure
        close(e.getReason());
    } catch (ClientCnxnLimitException e) {
        // Common case exception, print at debug level
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
        close(DisconnectReason.CLIENT_CNX_LIMIT);
    } catch (IOException e) {
        LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
        close(DisconnectReason.IO_EXCEPTION);
    }
}
/**
 * This method is called in two cases:
 * 1. Space has just been allocated for incomingBuffer according to the value in lenBuffer, and no data has yet been read from the SocketChannel into incomingBuffer
 * 2. Data has been read from the SocketChannel into incomingBuffer, and incomingBuffer is full
 * <p>
 * Read the request payload (everything following the length prefix)
 */
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        // Case 1: incomingBuffer has just been allocated and is still empty. Attempt one read:
        // (1) if incomingBuffer is filled, the request is processed directly below;
        // (2) if not, the data received so far does not form a complete request. Wait for more
        //     data to arrive; doIO() will be called again to continue reading.
        // Read from the SocketChannel into incomingBuffer
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            handleFailedRead();
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        incomingBuffer.flip();
        // In either case incomingBuffer is now full and holds exactly one request. Process it.
        // Update statistics
        packetReceived(4 + incomingBuffer.remaining());
        if (!initialized) {
            // Process the connect request
            readConnectRequest();
        } else {
            // Process a normal request
            readRequest();
        }
        // Request handling is finished; reset lenBuffer and incomingBuffer
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}
//Read connection data
private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    zkServer.processConnectRequest(this, incomingBuffer);
    initialized = true;
}
//Read a request packet
private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}
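The reason readLength() can recognise a four-letter-word command is that the four ASCII letters occupy the same four bytes that normally carry the length prefix, so they can be compared as an int. The following sketch illustrates that idea only; FourLetterWordSketch, asInt and isFourLetterWord are hypothetical names, and the set of known commands is passed in by the caller instead of being taken from ZooKeeper.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing how four letters collide with the length prefix. */
final class FourLetterWordSketch {
    /** Interprets four ASCII letters as a big-endian int, the way a length prefix is read. */
    static int asInt(String word) {
        byte[] bytes = word.getBytes(StandardCharsets.US_ASCII);
        if (bytes.length != 4) {
            throw new IllegalArgumentException("need exactly 4 bytes");
        }
        return ByteBuffer.wrap(bytes).getInt();
    }

    /** Returns true if the value read as "length" equals one of the given commands. */
    static boolean isFourLetterWord(int len, String... knownCommands) {
        for (String command : knownCommands) {
            if (asInt(command) == len) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // 'r' = 0x72, 'u' = 0x75, 'o' = 0x6f, 'k' = 0x6b, so "ruok" reads as 0x72756f6b
        System.out.println(Integer.toHexString(asInt("ruok")));
        System.out.println(isFourLetterWord(asInt("stat"), "ruok", "stat"));
        System.out.println(isFourLetterWord(44, "ruok", "stat"));
    }
}
```

An ordinary request length such as 44 does not match any command, so it is treated as a length and the payload is read next.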

How ZooKeeperServer processes a connection request: processConnectRequest
You can refer to these articles:

  • https://www.cnblogs.com/Benjious/p/11462064.html
    On sessions:
  • https://my.oschina.net/anxiaole/blog/3217373
  • https://segmentfault.com/a/1190000022193168

13.6.4.3 processing connection request: processConnectRequest

The calling code is as follows:
zkServer.processConnectRequest(this, incomingBuffer);

The parsing code is as follows:
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
    throws IOException, ClientCnxnLimitException {

    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    // Deserialize the connection information
    connReq.deserialize(bia, "connect");
    LOG.debug(
        "Session establishment request from client {} client's lastZxid is 0x{}",
        cnxn.getRemoteSocketAddress(),
        Long.toHexString(connReq.getLastZxidSeen()));

    long sessionId = connReq.getSessionId();
    int tokensNeeded = 1;
    // Whether connection weight is considered during throttling. The configuration parameter is
    // zookeeper.connection_throttle_weight_enabled; the default value is false
    if (connThrottle.isConnectionWeightEnabled()) {
        if (sessionId == 0) {
            if (localSessionEnabled) {
                tokensNeeded = connThrottle.getRequiredTokensForLocal();
            } else {
                tokensNeeded = connThrottle.getRequiredTokensForGlobal();
            }
        } else {
            tokensNeeded = connThrottle.getRequiredTokensForRenew();
        }
    }
    // Token-bucket rate limiting: check whether enough tokens are available
    if (!connThrottle.checkLimit(tokensNeeded)) {
        throw new ClientCnxnLimitException();
    }
    ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
    ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);

    boolean readOnly = false;
    try {
        // Read the read-only flag sent by the client
        readOnly = bia.readBool("readOnly");
        cnxn.isOldClient = false;
    } catch (IOException e) {
        // this is ok -- just a packet from an old client which
        // doesn't contain readOnly field
        LOG.warn(
            "Connection request from old client {}; will be dropped if server is in r-o mode",
            cnxn.getRemoteSocketAddress());
    }
    // This server is read-only, but the client did not ask for a read-only session
    if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
        LOG.info(msg);
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
    }
    // Refuse the request when the last zxid seen by the client is larger than the latest zxid of this server. This can happen when the server's snapshot and transaction log files were deleted and ZooKeeper was restarted, leaving the server with a smaller zxid, or when a split brain lets the client reach servers in different partitions
    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
                     + cnxn.getRemoteSocketAddress()
                     + " as it has seen zxid 0x"
                     + Long.toHexString(connReq.getLastZxidSeen())
                     + " our last zxid is 0x"
                     + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                     + " client must try another server";

        LOG.info(msg);
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
    }
//Initialize session timeout parameter
    int sessionTimeout = connReq.getTimeOut();
    byte[] passwd = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the
    // session is setup
    cnxn.disableRecv();
    if (sessionId == 0) {
        // Create a new session id
        long id = createSession(cnxn, passwd, sessionTimeout);
        LOG.debug(
            "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
            Long.toHexString(id),
            Long.toHexString(connReq.getLastZxidSeen()),
            connReq.getTimeOut(),
            cnxn.getRemoteSocketAddress());
    } else {
        long clientSessionId = connReq.getSessionId();
            LOG.debug(
                "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
                Long.toHexString(clientSessionId),
                Long.toHexString(connReq.getLastZxidSeen()),
                connReq.getTimeOut(),
                cnxn.getRemoteSocketAddress());
        if (serverCnxnFactory != null) {
            serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        if (secureServerCnxnFactory != null) {
            secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);

    }
}
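The connThrottle.checkLimit(tokensNeeded) call above is described as a token-bucket limiter. The sketch below shows the general token-bucket technique under stated assumptions; it is not ZooKeeper's throttle class. The name TokenBucketSketch, its constructor parameters, and the explicit nowMs argument (used instead of a system clock so that the behaviour is deterministic) are all hypothetical.

```java
/** Hypothetical token bucket: refills fillCount tokens every fillTimeMs, up to maxTokens. */
final class TokenBucketSketch {
    private final int maxTokens;
    private final int fillCount;
    private final long fillTimeMs;
    private int tokens;
    private long lastFillMs;

    TokenBucketSketch(int maxTokens, int fillCount, long fillTimeMs, long nowMs) {
        this.maxTokens = maxTokens;
        this.fillCount = fillCount;
        this.fillTimeMs = fillTimeMs;
        this.tokens = maxTokens; // start with a full bucket
        this.lastFillMs = nowMs;
    }

    /** Takes `need` tokens if available; returns false (reject) otherwise. */
    synchronized boolean checkLimit(int need, long nowMs) {
        long intervals = (nowMs - lastFillMs) / fillTimeMs;
        if (intervals > 0) {
            // Refill for every whole interval that has elapsed, capped at maxTokens.
            tokens = (int) Math.min(maxTokens, tokens + intervals * fillCount);
            lastFillMs += intervals * fillTimeMs;
        }
        if (tokens < need) {
            return false;
        }
        tokens -= need;
        return true;
    }
}
```

With weighting enabled, a new global session would simply ask for more tokens than a session renewal, which is what the tokensNeeded variable expresses in processConnectRequest.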


Create session and submit session: createSession
//Create session code
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    if (passwd == null) {
        // Possible since it's just deserialized from a packet on the wire.
        passwd = new byte[0];
    }
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
    // Wrap the session information in a Request and submit it
    submitRequest(si);
    return sessionId;
}

ZooKeeperServer uses SessionTrackerImpl to create the session id; the session id is incremented by 1 each time:
public long createSession(int sessionTimeout) {
    long sessionId = nextSessionId.getAndIncrement();
    trackSession(sessionId, sessionTimeout);
    return sessionId;
}
nextSessionId is initialized as follows:
public static long initializeNextSessionId(long id) {
    long nextSid;
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    nextSid = nextSid | (id << 56);
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
        ++nextSid;  // this is an unlikely edge case, but check it just in case
    }
    return nextSid;
}
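The two shifts in initializeNextSessionId produce a fixed bit layout: the server id lands in the top 8 bits, the low 40 bits of the timestamp land in bits 16 to 55, and the low 16 bits start at zero and leave room for the per-session increments. The sketch below reproduces only that arithmetic so the layout can be checked. SessionIdLayoutSketch and its decoding helpers are hypothetical, the timestamp is passed in as a parameter, and the CONTAINER_EPHEMERAL_OWNER edge case is left out.

```java
/** Hypothetical helper that reproduces the shifts and decodes the resulting fields. */
final class SessionIdLayoutSketch {
    /** Same arithmetic as the snippet above, with the time passed in explicitly. */
    static long initialSessionId(long serverId, long nowMs) {
        long nextSid = (nowMs << 24) >>> 8; // keep the low 40 bits of time in bits 16..55
        return nextSid | (serverId << 56);  // server id in the top 8 bits
    }

    static long serverIdOf(long sessionId) {
        return sessionId >>> 56;
    }

    static long timeBitsOf(long sessionId) {
        return (sessionId >>> 16) & 0xFFFFFFFFFFL; // 40 bits
    }

    static long counterBitsOf(long sessionId) {
        return sessionId & 0xFFFFL; // 16 bits
    }

    public static void main(String[] args) {
        long sid = initialSessionId(3, System.currentTimeMillis());
        System.out.println("0x" + Long.toHexString(sid));
        System.out.println("server id = " + serverIdOf(sid));
        System.out.println("counter after one increment = " + counterBitsOf(sid + 1));
    }
}
```

Because the server id sits in the highest bits, session ids created by different servers do not collide even when they start at the same moment.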

13.6.4.4 tracking session expiration time

//Track the expiration time of the session
@Override
public synchronized boolean trackSession(long id, int sessionTimeout) {
    boolean added = false;

    SessionImpl session = sessionsById.get(id);
    if (session == null) {
        session = new SessionImpl(id, sessionTimeout);
    }

    // findbugs2.0.3 complains about get after put.
    // long term strategy would be use computeIfAbsent after JDK 1.8
    // Store the mapping from session id to session object in the local sessionsById map
    SessionImpl existedSession = sessionsById.putIfAbsent(id, session);

    if (existedSession != null) {
        session = existedSession;
    } else {
        added = true;
        LOG.debug("Adding session 0x{}", Long.toHexString(id));
    }

    if (LOG.isTraceEnabled()) {
        String actionStr = added ? "Adding" : "Existing";
        ZooTrace.logTraceMessage(
            LOG,
            ZooTrace.SESSION_TRACE_MASK,
            "SessionTrackerImpl --- " + actionStr
            + " session 0x" + Long.toHexString(id) + " " + sessionTimeout);
    }

    updateSessionExpiry(session, sessionTimeout);
    return added;
}
//Save the expiration time of the corresponding session object into the sessionExpiryQueue
private void updateSessionExpiry(SessionImpl s, int timeout) {
    logTraceTouchSession(s.sessionId, timeout, "");
    sessionExpiryQueue.update(s, timeout);
}
The update method of sessionExpiryQueue rounds the expiration time up to a multiple of tickTime and places the session into the bucket for that time:
public Long update(E elem, int timeout) {
    Long prevExpiryTime = elemMap.get(elem);
    long now = Time.currentElapsedTime();
    Long newExpiryTime = roundToNextInterval(now + timeout);

    if (newExpiryTime.equals(prevExpiryTime)) {
        // No change, so nothing to update
        return null;
    }

    // First add the elem to the new expiry time bucket in expiryMap.
    // expiryMap maps each rounded expiration time to the set of elements that expire at that time
    Set<E> set = expiryMap.get(newExpiryTime);
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    // Add the element to the set for the new expiration time
    set.add(elem);

    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket.
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    // If the element was previously in a different expiration bucket, remove it from the old bucket so the data stays consistent
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem);
        }
    }
    return newExpiryTime;
}
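The bucketing idea can be tried in isolation with the single-threaded sketch below. It assumes the rounding is (time / interval + 1) * interval, which is how roundToNextInterval is understood here; ExpiryBucketsSketch and its bucket accessor are hypothetical names, and plain HashMap/HashSet replace the concurrent collections used in the code above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, single-threaded version of the expiry buckets. */
final class ExpiryBucketsSketch<E> {
    private final long interval;
    private final Map<E, Long> elemMap = new HashMap<>();
    private final Map<Long, Set<E>> expiryMap = new HashMap<>();

    ExpiryBucketsSketch(long intervalMs) {
        this.interval = intervalMs;
    }

    /** Assumed rounding rule: always move to the next multiple of the interval. */
    long roundToNextInterval(long time) {
        return (time / interval + 1) * interval;
    }

    /** Moves elem into the bucket for now + timeout and returns that bucket's time. */
    long update(E elem, long nowMs, int timeoutMs) {
        long newExpiry = roundToNextInterval(nowMs + timeoutMs);
        expiryMap.computeIfAbsent(newExpiry, k -> new HashSet<>()).add(elem);
        Long prev = elemMap.put(elem, newExpiry);
        if (prev != null && prev != newExpiry) {
            Set<E> old = expiryMap.get(prev);
            if (old != null) {
                old.remove(elem); // leave the old bucket
            }
        }
        return newExpiry;
    }

    Set<E> bucket(long expiryTime) {
        return expiryMap.getOrDefault(expiryTime, Collections.<E>emptySet());
    }
}
```

With a 2000 ms interval, a session touched at 10500 ms with a 4000 ms timeout lands in the 16000 ms bucket; touching it again at 12500 ms moves it to the 18000 ms bucket. Expiring sessions then only requires walking one bucket per interval instead of checking every session.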

13.6.4.5 submit session

ZooKeeperServer submits the session request through submitRequest:
public void submitRequest(Request si) {
    enqueueRequest(si);
}

public void enqueueRequest(Request si) {
    if (requestThrottler == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (requestThrottler == null) {
                throw new RuntimeException("Not started");
            }
        }
    }
    requestThrottler.submitRequest(si);
}

// submitRequest in RequestThrottler
public void submitRequest(Request request) {
    if (stopping) {
        LOG.debug("Shutdown in progress. Request cannot be processed");
        dropRequest(request);
    } else {
        submittedRequests.add(request);
    }
}

submittedRequests is a request queue of type LinkedBlockingQueue.

How are the requests placed in the queue handled next?
ZooKeeper uses a queue + asynchronous model. The request chain is as follows:

  • Submit request: RequestThrottler.run() > ZooKeeperServer.submitRequestNow(Request si)
  • Pre-process request: PrepRequestProcessor.processRequest(Request request)
  • Persist request: SyncRequestProcessor.run()
  • Apply the request (business logic): FinalRequestProcessor (processTxn)
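The queue + asynchronous model can be sketched as a producer that only enqueues and a consumer thread that takes requests and pushes them through a chain of stages. This is a simplified illustration, not ZooKeeper's processor classes: QueuePipelineSketch, Stage and the STOP sentinel are hypothetical, requests are plain strings, and all three stages run on one consumer thread, whereas in ZooKeeper some of the processors have their own threads and queues.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical pipeline: submitRequest() enqueues, a consumer thread runs the stages. */
final class QueuePipelineSketch {
    interface Stage {
        void process(String request);
    }

    // Sentinel object compared by identity, so no real request can be mistaken for it.
    private static final String STOP = new String("STOP");
    private final LinkedBlockingQueue<String> submittedRequests = new LinkedBlockingQueue<>();
    private final Thread consumer;

    QueuePipelineSketch(Stage firstStage) {
        consumer = new Thread(() -> {
            try {
                while (true) {
                    String request = submittedRequests.take(); // blocks until work arrives
                    if (request == STOP) {
                        return;
                    }
                    firstStage.process(request);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "request-consumer-sketch");
    }

    void start() {
        consumer.start();
    }

    /** Called by IO threads: returns immediately after enqueueing. */
    void submitRequest(String request) {
        submittedRequests.add(request);
    }

    void shutdown() {
        submittedRequests.add(STOP);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Wires prep > sync > final stages and returns the order in which they ran. */
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        Stage finalStage = r -> log.add("final:" + r);
        Stage syncStage = r -> { log.add("sync:" + r); finalStage.process(r); };
        Stage prepStage = r -> { log.add("prep:" + r); syncStage.process(r); };
        QueuePipelineSketch pipeline = new QueuePipelineSketch(prepStage);
        pipeline.start();
        pipeline.submitRequest("create");
        pipeline.submitRequest("getData");
        pipeline.shutdown();
        return log;
    }
}
```

The point of the design is that the thread doing network IO never waits for request processing; it only pays the cost of one queue insertion.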

13.6.4.6 submission request processing

The details of request processing are covered later. So far we have followed readConnectRequest through to processConnectRequest in ZooKeeperServer.

Next, let's look at how ZooKeeper reads and processes ordinary requests.
readRequest() in the NIOServerCnxn class calls the processPacket method in ZooKeeperServer:

private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}

How to process data packets? See the following code:

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    // Deserialize the request header using Jute
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");

    // Need to increase the outstanding request count first, otherwise
    // there might be a race condition that it enabled recv after
    // processing request and then disabled when check throttling.
    //
    // Be aware that we're actually checking the global outstanding
    // request before this request.
    //
    // It's fine if the IOException thrown before we decrease the count
    // in cnxn, since it will close the cnxn anyway.
    cnxn.incrOutstandingAndCheckThrottle(h);

    // Through the magic of byte buffers, txn will not be
    // pointing
    // to the start of the txn
    // slice() creates a new buffer that shares the content from the current read position onward
    incomingBuffer = incomingBuffer.slice();
	//If the current request is an authentication authorization request
    if (h.getType() == OpCode.auth) {
        LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
        AuthPacket authPacket = new AuthPacket();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        if (ap != null) {
            try {
                // handleAuthentication may close the connection, to allow the client to choose
                // a different server to connect to.  Process client authentication
                authReturn = ap.handleAuthentication(
                    new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                    authPacket.getAuth());
            } catch (RuntimeException e) {
                LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                authReturn = KeeperException.Code.AUTHFAILED;
            }
        }
        // If authentication succeeds, return a success response
        if (authReturn == KeeperException.Code.OK) {
            LOG.debug("Authentication succeeded for scheme: {}", scheme);
            LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, null, null);
        } else {
            // If authentication fails, return a failure response and close the connection
            if (ap == null) {
                LOG.warn(
                    "No authentication provider for scheme: {} has {}",
                    scheme,
                    ProviderRegistry.listProviders());
            } else {
                LOG.warn("Authentication failed for scheme: {}", scheme);
            }
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
            cnxn.sendResponse(rh, null, null);
            // ... and close connection
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            cnxn.disableRecv();
        }
        return;
    } else if (h.getType() == OpCode.sasl) {
        // Handle SASL authentication
        processSasl(incomingBuffer, cnxn, h);
    } else {
//Process request
        if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
            // The client has not completed the required SASL authentication: reply and close the session
            ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
            cnxn.sendResponse(replyHeader, null, "response");
            cnxn.sendCloseSession();
            cnxn.disableRecv();
        } else {
//Process request
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            int length = incomingBuffer.limit();
            // Check whether this is a large request; the threshold is configured with zookeeper.largeRequestThreshold
            if (isLargeRequest(length)) {
                // checkRequestSize will throw IOException if request is rejected
                // For large requests, check the limit on large-request bytes to protect the JVM heap from overflowing.
                // The limit is configured with zookeeper.largeRequestMaxBytes (default 100 MB)
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // Submit the request. It is handled the same way as the connection request; see the request processing code for details
            submitRequest(si);
        }
    }
}
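The incomingBuffer.slice() step in processPacket relies on a ByteBuffer property: after the header fields have been read, the position sits at the first body byte, and slice() returns a view that starts there. The sketch below demonstrates this with a header of two ints (xid and type); HeaderSliceSketch, packet and bodyOf are hypothetical helpers, not ZooKeeper code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo of reading a header and slicing off the body. */
final class HeaderSliceSketch {
    /** Builds a readable buffer holding xid (int), type (int), then the body bytes. */
    static ByteBuffer packet(int xid, int type, byte[] body) {
        ByteBuffer buffer = ByteBuffer.allocate(8 + body.length);
        buffer.putInt(xid).putInt(type).put(body);
        buffer.flip();
        return buffer;
    }

    /** Consumes the two header ints and returns a view that starts at the body. */
    static ByteBuffer bodyOf(ByteBuffer incoming) {
        incoming.getInt(); // xid
        incoming.getInt(); // type
        return incoming.slice(); // shares content, position 0 == first body byte
    }

    public static void main(String[] args) {
        ByteBuffer p = packet(7, 1, "abc".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer body = bodyOf(p);
        System.out.println(body.position() + " " + body.remaining());
    }
}
```

The slice shares the underlying bytes with the original buffer, so no copy of the request body is made when it is handed to the Request object.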




In the run() method of the accept thread, selector.select() is executed and doAccept() is called to accept client connections and add them to the acceptedQueue of a selector thread

selector thread
         @Override
        public void run() {
            try {
                while (!stopped) {
                    try {
                        //1. Call select() to collect the ready IO events, which are handed to worker threads
                        select();
                        //2. Handle the connections newly dispatched by the accept thread:
                        // (1) register the new connection with the selector; (2) wrap it as an NIOServerCnxn and register it with NIOServerCnxnFactory
                        processAcceptedConnections();
                        //3. Update the interest ops of the connections in updateQueue
                        processInterestOpsUpdateRequests();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
                //Perform a cleanup operation to close all connections waiting on the selector
                ...
            } finally {
                ...
                //Clean up work
            }
        }

The run() method of the selector thread mainly does three things:
1. Call select() to collect the ready IO events and hand them to worker threads for processing (key.interestOps(0) is called before a key is handed to a worker thread, so the key is not selected again while it is being processed).
2. Handle the connections newly dispatched by the accept thread:
   (1) register the new connection with the selector;
   (2) wrap it as an NIOServerCnxn and register it with NIOServerCnxnFactory.
3. Update the interest ops of the connections in updateQueue.
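The hand-off between the selector thread and a worker (clear the interest ops, let the worker do the IO, then re-arm the key through an update queue) can be sketched with plain java.nio. This is a minimal illustration under stated assumptions, not the NIOServerCnxnFactory code: SelectorHandoffSketch is a hypothetical class, a Pipe stands in for a client socket, and there is a single worker thread and a single key.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical one-key reactor loop; returns how many bytes the worker read. */
final class SelectorHandoffSketch {
    static int run() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        AtomicInteger bytesRead = new AtomicInteger();
        try {
            Selector selector = Selector.open();
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ);
            Queue<SelectionKey> updateQueue = new ConcurrentLinkedQueue<>();
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1, 2, 3}));

            long deadline = System.currentTimeMillis() + 5000;
            while (bytesRead.get() < 3 && System.currentTimeMillis() < deadline) {
                selector.select(200);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    key.interestOps(0); // stop selecting this key while a worker owns it
                    worker.execute(() -> {
                        try {
                            ByteBuffer buf = ByteBuffer.allocate(16);
                            int n = ((Pipe.SourceChannel) key.channel()).read(buf);
                            if (n > 0) {
                                bytesRead.addAndGet(n);
                            }
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                        updateQueue.add(key); // ask the selector thread to re-arm the key
                        selector.wakeup();
                    });
                }
                // Only the selector thread changes interest ops back.
                SelectionKey pending;
                while ((pending = updateQueue.poll()) != null) {
                    pending.interestOps(SelectionKey.OP_READ);
                }
            }
            // Let the worker finish (it may still call wakeup()) before closing the selector.
            worker.shutdown();
            worker.awaitTermination(5, TimeUnit.SECONDS);
            pipe.sink().close();
            pipe.source().close();
            selector.close();
            return bytesRead.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }
}
```

Clearing the interest ops before the hand-off is what prevents the same readable key from being dispatched to a second worker while the first one is still reading.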
worker thread
ZooKeeper manages a group of worker threads through WorkerService, which has two management modes:

Assignable thread mode
  • Description: a task can be assigned to a specific thread. If a series of tasks must be completed in order, use this mode to send them to the same thread
  • Usage scenario: a series of requests in the same session
  • Implementation: create N ExecutorServices, each containing only one thread

Non-assignable thread mode
  • Description: after a task is submitted, WorkerService hands it to an arbitrary thread. Use this mode when there is no ordering requirement between tasks
  • Usage scenario: performing network IO
  • Implementation: create 1 ExecutorService with N threads

Since there is no ordering requirement between the network IO tasks of different connections, the WorkerService used by NIOServerCnxnFactory adopts the non-assignable thread mode
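To make the assignable mode concrete, the sketch below builds N single-thread executors and routes each task by id, using a modulo mapping that also handles negative ids. It is an illustration only: AssignableWorkersSketch, workerIndex and shutdownAndWait are hypothetical names, not the WorkerService API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical "assignable" pool: N executors with one thread each. */
final class AssignableWorkersSketch {
    private final List<ExecutorService> workers = new ArrayList<>();

    AssignableWorkersSketch(int numThreads) {
        for (int i = 0; i < numThreads; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Maps any id, including negative ones, to an index in [0, size - 1]. */
    static int workerIndex(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    /** Tasks with the same id always run on the same thread, hence in order. */
    void schedule(Runnable task, long id) {
        workers.get(workerIndex(id, workers.size())).execute(task);
    }

    boolean shutdownAndWait(long timeoutMs) {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        try {
            for (ExecutorService w : workers) {
                if (!w.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Schedules 100 tasks under one id and returns the order in which they ran. */
    static List<Integer> demo() {
        AssignableWorkersSketch pool = new AssignableWorkersSketch(4);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 100; i++) {
            final int n = i;
            pool.schedule(() -> seen.add(n), 42L);
        }
        pool.shutdownAndWait(5000);
        return seen;
    }
}
```

The non-assignable mode would instead be a single Executors.newFixedThreadPool(N): any idle thread picks up the next task, which gives better load spreading but no per-id ordering.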

 /**
     * Schedule work to be done by the thread assigned to this id. Thread
     * assignment is a single mod operation on the number of threads.  If a
     * worker thread pool is not being used, work is done directly by
     * this thread.
     * That is, the workRequest is assigned to a thread by taking id modulo the number
     * of threads. If no worker threads are used (numWorkerThreads = 0), a
     * ScheduledWorkRequest thread is started to complete the task, and the current
     * thread blocks until the task is completed.
     *
     * @param workRequest the pending IO request
     * @param id          selects which thread processes the workRequest
     */
    public void schedule(WorkRequest workRequest, long id) {
        if (stopped) {
            workRequest.cleanup();
            return;
        }

        ScheduledWorkRequest scheduledWorkRequest =
                new ScheduledWorkRequest(workRequest);

        // If we have a worker thread pool, use that; 
        // otherwise, do the work directly.
        int size = workers.size();
        if (size > 0) {
            try {
                // make sure to map negative ids as well to [0, size-1]
                int workerNum = ((int) (id % size) + size) % size;
                ExecutorService worker = workers.get(workerNum);
                worker.execute(scheduledWorkRequest);
            } catch (RejectedExecutionException e) {
                LOG.warn("ExecutorService rejected execution", e);
                workRequest.cleanup();
            }
        } else {
            // When there is no worker thread pool, do the work directly
            // and wait for its completion
            scheduledWorkRequest.start();
            try {
                scheduledWorkRequest.join();
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
                Thread.currentThread().interrupt();
            }
        }
    }

The earlier introduction of worker threads said that "if the number of such threads is 0, the selector thread is used to directly execute IO read / write". The source code above shows otherwise: when the number of worker threads is 0, a new thread is started for each network IO task, and the calling thread blocks until that task finishes. This wastes resources: if the caller has to block until the IO completes anyway, why start a separate thread? It may be legacy code or preparation for future extension. Either way, the number of worker threads should not be set to 0.
Let's continue and see how ScheduledWorkRequest handles network IO:

@Override
        public void run() {
            try {
                // Check if stopped while request was on queue
                if (stopped) {
                    workRequest.cleanup();
                    return;
                }
                workRequest.doWork();
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
                workRequest.cleanup();
            }
        }

        // NIOServerCnxnFactory.IOWorkRequest
        @Override
        public void doWork() throws InterruptedException {
            //If the Channel has been closed, clear the SelectionKey
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            //1. If the key is readable or writable, let the NIOServerCnxn connection object handle the IO
            if (key.isReadable() || key.isWritable()) {
                //Call NIOServerCnxn.doIO() to perform the read / write
                cnxn.doIO(key);

                // Check if we shutdown or doIO() closed this connection
                //Close the connection if it has been shut down
                if (stopped) {
                    cnxn.close();
                    return;
                }
                //If the Channel has been closed, clear the SelectionKey
                if (!key.isValid()) {
                    selectorThread.cleanupSelectionKey(key);
                    return;
                }
                //2. update the expiration time of this session
                touchCnxn(cnxn);
            }

            //3. Read / write processing is complete. Mark the connection as selectable again
            cnxn.enableSelectable();
            //Put the key back into the selector thread's updateQueue. After the selector thread has handled
            //the read / write events and new connections of all channels, it updates the events registered for this channel
            if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                cnxn.close();
            }
        }

Setting aside the defensive checks, doWork() does three main things:

  • Calls NIOServerCnxn.doIO() so that the NIOServerCnxn connection object performs the read / write and processes the IO
  • Updates the expiration time of this connection
  • Once the network IO has been processed, sets the selectable flag again and adds the SelectionKey to the updateQueue of the selector thread. The purpose of this queue has been described above
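
The third step, handing the key back to the selector thread, can be sketched with plain java.nio as follows. This is a simplified model, not the ZooKeeper implementation: the class name, the use of a Pipe in place of a client socket, and the assumption that only read interest is restored are all illustrative. The method names addInterestOpsUpdateRequest and updateQueue follow the code above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class UpdateQueueSketch {
    private final Selector selector;
    // Keys whose interest ops should be restored by the selector thread.
    private final Queue<SelectionKey> updateQueue = new ConcurrentLinkedQueue<>();

    UpdateQueueSketch(Selector selector) {
        this.selector = selector;
    }

    // Selector thread: stop listening on the key while a worker owns the connection.
    void handOffToWorker(SelectionKey key) {
        key.interestOps(0);
    }

    // Worker thread: IO is done, ask the selector thread to listen again.
    boolean addInterestOpsUpdateRequest(SelectionKey key) {
        boolean accepted = updateQueue.offer(key);
        selector.wakeup(); // unblock a pending select() so the queue is drained soon
        return accepted;
    }

    // Selector thread: apply all queued updates and return how many keys were re-armed.
    int processInterestOpsUpdateRequests() {
        int updated = 0;
        SelectionKey key;
        while ((key = updateQueue.poll()) != null) {
            if (key.isValid()) {
                key.interestOps(SelectionKey.OP_READ); // assumption: read interest only
                updated++;
            }
        }
        return updated;
    }

    // Runs one hand-off round trip; returns true if the key listens for reads again.
    static boolean demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open(); // stands in for a client socket
            try {
                pipe.source().configureBlocking(false);
                SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
                UpdateQueueSketch sketch = new UpdateQueueSketch(selector);

                sketch.handOffToWorker(key);
                Thread worker = new Thread(() -> sketch.addInterestOpsUpdateRequest(key));
                worker.start();
                worker.join();

                return sketch.processInterestOpsUpdateRequests() == 1
                        && key.interestOps() == SelectionKey.OP_READ;
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("key re-armed: " + demo());
    }
}
```

Only the selector thread changes interest ops in this sketch; the worker merely enqueues the key and wakes the selector, which keeps all selector bookkeeping on one thread.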

When the selector thread processes a connection handed over by the accept thread, it not only registers the new connection with its selector, but also wraps the connection in an NIOServerCnxn and registers it with the NIOServerCnxnFactory. NIOServerCnxn encapsulates the client connection, and the worker thread calls NIOServerCnxn.doIO() to handle network IO. For details, see "NIOServerCnxn of ZooKeeper client connection ServerCnxn".

13.7 ConnectionExpirerThread

This thread cleans up expired connections. Its main method is as follows:

  @Override
        public void run() {
            try {
                while (!stopped) {
                    long waitTime = cnxnExpiryQueue.getWaitTime();
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                        continue;
                    }
                    for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
                        conn.close();
                    }
                }

            } catch (InterruptedException e) {
                LOG.info("ConnnectionExpirerThread interrupted");
            }
        }

The working principle of this thread is described in detail in "Zookeeper connection and session expiration cleanup policy (ExpiryQueue)".
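
As a rough illustration of how a queue with getWaitTime() and poll() can drive the loop above, here is a simplified, single-threaded sketch that groups connections into time buckets. It is written under stated assumptions and is not ZooKeeper's ExpiryQueue: the class name is hypothetical, the current time is passed in explicitly to keep the example deterministic, and no thread safety is attempted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ExpiryQueueSketch<E> {
    private final Map<E, Long> elemToBucket = new HashMap<>();
    private final Map<Long, Set<E>> buckets = new HashMap<>();
    private final long interval;
    private long nextExpiration;

    ExpiryQueueSketch(long interval, long now) {
        this.interval = interval;
        this.nextExpiration = roundToNextInterval(now);
    }

    // Expiry times are rounded up to a multiple of the interval,
    // so many connections share one bucket and expire together.
    private long roundToNextInterval(long time) {
        return (time / interval + 1) * interval;
    }

    // Called when a connection sees activity: move it to a later bucket.
    void update(E elem, long timeout, long now) {
        long newBucket = roundToNextInterval(now + timeout);
        Long oldBucket = elemToBucket.put(elem, newBucket);
        if (oldBucket != null && oldBucket != newBucket) {
            Set<E> old = buckets.get(oldBucket);
            if (old != null) {
                old.remove(elem);
            }
        }
        buckets.computeIfAbsent(newBucket, k -> new HashSet<>()).add(elem);
    }

    // How long the expirer thread may sleep before the next bucket is due.
    long getWaitTime(long now) {
        return Math.max(0L, nextExpiration - now);
    }

    // Returns the elements of the bucket that is due, or an empty set.
    Set<E> poll(long now) {
        if (now < nextExpiration) {
            return Collections.emptySet();
        }
        Set<E> expired = buckets.remove(nextExpiration);
        nextExpiration += interval;
        if (expired == null) {
            return Collections.emptySet();
        }
        for (E elem : expired) {
            elemToBucket.remove(elem);
        }
        return expired;
    }

    public static void main(String[] args) {
        ExpiryQueueSketch<String> q = new ExpiryQueueSketch<>(1000L, 0L);
        q.update("cnxn-1", 1500L, 0L);    // lands in the bucket at t=2000
        q.update("cnxn-1", 1500L, 1000L); // activity moves it to the bucket at t=3000
        for (long now = 1000L; now <= 3000L; now += 1000L) {
            System.out.println("t=" + now + " expired=" + q.poll(now));
        }
    }
}
```

Bucketing means the expirer thread wakes once per interval rather than once per connection, at the cost of connections living slightly longer than their exact timeout.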

13.8 NettyServerCnxnFactory

The NIO connection factory has been described in detail above. We can now compare it with the Netty-based one.
NettyServerCnxnFactory uses Netty for network IO, but it uses Netty 3.x. Although Netty 4.x follows the same implementation ideas, its API is very different. Therefore, we will not study NettyServerCnxnFactory in depth, and will only briefly introduce how it differs from NIOServerCnxnFactory.

Difference | NIO | Netty
---------- | --- | -----
accept event | Starts 1 accept thread | The boss group handles the accept event. One thread is started by default
select() | Starts selector threads | When adding a handler, call addLast(EventExecutorGroup, ChannelHandler...), and the handler will process IO events in the EventExecutorGroup
Network IO | Starts worker threads | Starts the worker group to process network IO. By default, the number of threads is the number of cores * 2
Handling read events | Calls NIOServerCnxn.doIO() in the worker thread | Read events are handled in a handler
Sticking and unpacking (message framing) | Solved through lenBuffer and incomingBuffer; the code is very complex | Insert a handler that handles sticking and unpacking
Handling write events | The thread executing FinalRequestProcessor.processRequest() and the worker thread communicate through NIOServerCnxn.outgoingBuffers, and the worker thread writes in batches | Netty naturally supports asynchronous writes. If the current thread is an EventLoop thread, the data to be written is stored in the ChannelOutboundBuffer. If the current thread is not an EventLoop thread, a write task is constructed and added to the EventLoop task queue
Direct memory | Uses direct memory through a ThreadLocal | I don't know exactly how direct memory is used in Netty, but Netty supports direct memory and it is easy to use
Handling connection close | Starts a connection expiration thread to manage connections | Connections are handled in a handler

Note: the comparison above is between Netty 4 and NIOServerCnxnFactory. Because ZooKeeper uses Netty 3, its NettyServerCnxnFactory contains some code that would otherwise be unnecessary, such as the code for handling sticking and unpacking.
From the comparison above, we can see that using Netty to process network IO is much more convenient than coding directly against Java NIO. Netty is a great tool~~
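
The "sticking and unpacking" problem in the comparison is message framing: TCP delivers a byte stream, so one read may contain several messages or only part of one. The sketch below shows the general length-prefix technique with a 4-byte length buffer followed by a payload buffer. It borrows the names lenBuffer and incomingBuffer from the comparison, but the class, the MAX_FRAME limit and the encode helper are hypothetical, and this is not the NIOServerCnxn code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FrameDecoderSketch {
    private static final int MAX_FRAME = 1024 * 1024; // hypothetical sanity limit

    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    // Points at lenBuffer while reading a length, at a payload buffer otherwise.
    private ByteBuffer incomingBuffer = lenBuffer;

    // Feed one chunk as it arrived from the network; returns every frame completed by it.
    List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            while (incomingBuffer.hasRemaining() && chunk.hasRemaining()) {
                incomingBuffer.put(chunk.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return frames; // partial length or payload: wait for more bytes
            }
            if (incomingBuffer == lenBuffer) {
                lenBuffer.flip();
                int len = lenBuffer.getInt();
                lenBuffer.clear();
                if (len < 0 || len > MAX_FRAME) {
                    throw new IllegalStateException("bad frame length " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                frames.add(incomingBuffer.array());
                incomingBuffer = lenBuffer; // next bytes start a new frame
            }
        }
    }

    // Helper for the demo: encode messages as [4-byte length][payload]...
    static byte[] encode(String... messages) {
        int total = 0;
        for (String m : messages) {
            total += 4 + m.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer out = ByteBuffer.allocate(total);
        for (String m : messages) {
            byte[] payload = m.getBytes(StandardCharsets.UTF_8);
            out.putInt(payload.length).put(payload);
        }
        return out.array();
    }

    public static void main(String[] args) {
        byte[] stream = encode("hello", "zk");
        FrameDecoderSketch decoder = new FrameDecoderSketch();
        // Split the stream at arbitrary points to simulate TCP reads.
        int[][] chunks = {{0, 6}, {6, 7}, {13, 2}};
        for (int[] c : chunks) {
            for (byte[] frame : decoder.feed(ByteBuffer.wrap(stream, c[0], c[1]))) {
                System.out.println("frame: " + new String(frame, StandardCharsets.UTF_8));
            }
        }
    }
}
```

With Netty, this state machine is typically replaced by adding a ready-made frame decoder to the pipeline, which is the convenience the comparison refers to.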
Summary
To summarize, three queues are used for communication between threads:

  • SelectorThread.acceptedQueue: the accept thread communicates with the selector thread
  • SelectorThread.updateQueue: the worker thread communicates with the selector thread
  • NIOServerCnxn.outgoingBuffers: the worker thread communicates with the request processing thread
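
The third queue can be sketched as a simple producer/consumer pair: the request processing thread enqueues response buffers, and the worker thread later drains them to the channel in one pass. This is a minimal model under stated assumptions, not the NIOServerCnxn implementation. The class name and demo are hypothetical, and a ByteArrayOutputStream stands in for the socket.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

class OutgoingBuffersSketch {
    // Thread-safe queue shared by the request processing thread and the worker thread.
    private final Queue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<>();

    // Request processing thread: queue a response instead of writing it directly.
    void sendBuffer(ByteBuffer response) {
        outgoingBuffers.add(response);
    }

    // Worker thread: write as many queued buffers as the channel accepts.
    int handleWrite(WritableByteChannel channel) throws IOException {
        int written = 0;
        ByteBuffer bb;
        while ((bb = outgoingBuffers.peek()) != null) {
            written += channel.write(bb);
            if (bb.hasRemaining()) {
                break; // channel is full: keep the buffer for the next writable event
            }
            outgoingBuffers.remove();
        }
        return written;
    }

    // Two responses are queued on one thread and written in one batch on another.
    static String demo() {
        OutgoingBuffersSketch cnxn = new OutgoingBuffersSketch();
        Thread requestProcessor = new Thread(() -> {
            cnxn.sendBuffer(ByteBuffer.wrap("resp-1;".getBytes(StandardCharsets.UTF_8)));
            cnxn.sendBuffer(ByteBuffer.wrap("resp-2;".getBytes(StandardCharsets.UTF_8)));
        });
        try {
            requestProcessor.start();
            requestProcessor.join();
            ByteArrayOutputStream socket = new ByteArrayOutputStream(); // stands in for the socket
            cnxn.handleWrite(Channels.newChannel(socket));
            return new String(socket.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```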

Tags: Java Zookeeper network

Posted by TronB24 on Sat, 04 Jun 2022 03:33:48 +0530