Introduction to MyDisruptor V6
Following v5, which added the DSL-style API, v6 is planned as the final version of MyDisruptor and focuses on a few finishing optimizations.
v6 contains three optimizations:
- Solving the false sharing problem
- Supporting graceful stop of consumer threads
- Changing the data structure that holds the consumer sequences in the producer sequencer from an ArrayList to a plain array (this avoids the extra range check that ArrayList performs on every get)
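The third change matters on the producer's hot path, which scans every gating consumer sequence to find the slowest one. The sketch below is hypothetical: `java.util.concurrent.atomic.AtomicLong` stands in for MySequence, and the helper names `add` and `minimumSequence` are illustrative, not MyDisruptor's real methods. It shows a copy-on-write array (suitable for publishing through a volatile field such as gatingConsumerSequences) and a plain array scan.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not MyDisruptor's actual code. AtomicLong stands in for MySequence.
final class GatingSequences {
    private GatingSequences() {
    }

    // Copy-on-write add: build a new, larger array; the caller then assigns it to a
    // volatile field, so readers always see either the old or the new complete array.
    static AtomicLong[] add(AtomicLong[] current, AtomicLong sequence) {
        AtomicLong[] next = Arrays.copyOf(current, current.length + 1);
        next[current.length] = sequence;
        return next;
    }

    // Producer-side hot path: find the slowest consumer. Iterating a plain array avoids
    // List.get(i), so there is no extra check against the list's logical size per element.
    static long minimumSequence(AtomicLong[] sequences, long minimum) {
        for (AtomicLong sequence : sequences) {
            minimum = Math.min(minimum, sequence.get());
        }
        return minimum;
    }
}
```

Registering or removing consumers is rare, so paying for an array copy there is cheap, while the scan that runs on every claim gets the simplest possible loop. The LMAX Disruptor has a comparable helper, `Util.getMinimumSequence`.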
Since this post is part of a series, reading the earlier posts first will make it easier to follow:
- v1 blog: Implementing the lmax-Disruptor queue from scratch (1): how the RingBuffer works with a single producer and a single consumer
- v2 blog: Implementing the lmax-Disruptor queue from scratch (2): consumption dependencies among multiple consumers and consumer groups
- v3 blog: Implementing the lmax-Disruptor queue from scratch (3): how the multi-threaded consumer WorkerPool works
- v4 blog: Implementing the lmax-Disruptor queue from scratch (4): how the MultiProducerSequencer for multi-threaded producers works
- v5 blog: Implementing the lmax-Disruptor queue from scratch (5): how the Disruptor DSL-style API works
Detailed explanation of false sharing
The first post in this series already discussed false sharing; the relevant part is reproduced here:
Modern CPUs are multi-core, and each core has its own cache. A cache is made up of fixed-size cache lines (usually 32 or 64 bytes). The CPU reads and writes memory in units of whole cache lines, and one cache line usually holds several variables (for example, a reference occupies 4 bytes as a 32-bit pointer or 8 bytes as a 64-bit pointer).
This design causes a problem: even if the variables on a cache line are unrelated (for example, they belong to different objects), as soon as one of them is modified, cache coherence is enforced for the whole line.
Keeping caches coherent across CPU cores costs performance and should be avoided wherever possible. When this synchronization is triggered only because unrelated variables happen to share a cache line, it is called "false sharing".
Disruptor avoids false sharing by padding the cache lines around key variables in the queue, so that reads and writes of unrelated variables do not cause needless cache invalidation.
An example of how false sharing hurts performance
- Suppose there is a Point object with two volatile long fields, x and y.
Two threads access the same Point object concurrently: thread 1 only reads and writes x, and thread 2 only reads and writes y.
Demo with the false sharing problem
```java
public class Point {
    // Declared as long to match the description above (and PointNoFalseSharing below)
    public volatile long x;
    public volatile long y;

    public Point(long x, long y) {
        this.x = x;
        this.y = y;
    }
}
```
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FalseSharingDemo {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Point point = new Point(1, 2);
        long start = System.currentTimeMillis();
        executor.execute(() -> {
            // Thread x increments x 100 million times
            for (int i = 0; i < 100000000; i++) {
                point.x++;
            }
            countDownLatch.countDown();
        });
        executor.execute(() -> {
            // Thread y increments y 100 million times
            for (int i = 0; i < 100000000; i++) {
                point.y++;
            }
            countDownLatch.countDown();
        });
        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("testNormal time consuming=" + (end - start));
        executor.shutdown();
    }
}
```
- The two threads access different data independently, but x and y are adjacent fields of the same object, so they are laid out next to each other in memory and are likely to end up in the same cache line.
Because both fields are volatile, every write to x by thread 1 triggers cache-coherence traffic for that cache line, invalidating the copy of the line (which also holds y) in the cache of the core running thread 2 and forcing it to reload the latest data.
Thread 1's accesses to x and thread 2's accesses to y are completely unrelated; the two threads never actually need to share this memory, so the forced cache-line synchronization is pure overhead that only hurts performance.
- Note that false sharing most often occurs between different objects: for example, thread 1 accesses volatile field aaa of object A while thread 2 accesses volatile field bbb of another object B.
If A.aaa and B.bbb happen to be loaded into the same cache line, false sharing occurs; this is in fact the most common scenario.
The example above, in which x and y of the same Point object interfere with each other, is therefore not entirely typical; two fields of one object are used only to keep the demonstration simple.
- The fix is cache-line padding: place unused padding fields before and after the volatile fields that must be protected, so that those fields sit on cache lines of their own and writes to unrelated variables by other threads no longer invalidate them.
However the object ends up placed in memory, the padded volatile field cannot share a cache line with unrelated variables, so it is no longer dragged into needless cache synchronization.
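Why seven longs on each side? A rough worked check, assuming 64-byte cache lines, 8-byte longs, and that the JVM keeps the padding fields in declaration order around the hot field (the JVM specification does not guarantee field order). The hypothetical `PaddingCheck` class tries every possible alignment of the cache line that holds the 8-byte value and verifies that the line never reaches past the padding into a neighbouring field.

```java
// Hypothetical worked check, not part of MyDisruptor.
// Offsets are relative to the first left-padding field; unrelated data is assumed to
// sit immediately before offset 0 and immediately after the right padding.
final class PaddingCheck {
    static final int CACHE_LINE = 64;
    static final int LONG_BYTES = 8;

    // True if, with `padLongs` long fields on each side of the value, every cache line
    // that can contain the value stays entirely inside the padded region.
    static boolean isolated(int padLongs) {
        int valueStart = padLongs * LONG_BYTES;
        int objectEnd = valueStart + LONG_BYTES + padLongs * LONG_BYTES;
        // The value is 8-byte aligned, so the 64-byte line holding it can start at
        // valueStart, valueStart - 8, ..., valueStart - 56.
        for (int k = 0; k < CACHE_LINE / LONG_BYTES; k++) {
            int lineStart = valueStart - k * LONG_BYTES;
            int lineEnd = lineStart + CACHE_LINE;
            if (lineStart < 0 || lineEnd > objectEnd) {
                return false; // this alignment lets the line reach a neighbouring field
            }
        }
        return true;
    }
}
```

With 7 longs (56 bytes) on each side, even the two extreme alignments keep the line inside the padded region; with 6 longs, a line starting exactly at the value already spills past the right padding.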
Demo that solves false sharing by padding with unused fields
```java
public class PointNoFalseSharing {

    // Left-hand padding for x
    private long lp1, lp2, lp3, lp4, lp5, lp6, lp7;
    public volatile long x;
    // Right-hand padding for x (also separates x from y)
    private long rp1, rp2, rp3, rp4, rp5, rp6, rp7;
    public volatile long y;

    public PointNoFalseSharing(int x, int y) {
        this.x = x;
        this.y = y;
    }
}
```
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NoFalseSharingDemo {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch countDownLatch = new CountDownLatch(2);
        PointNoFalseSharing point = new PointNoFalseSharing(1, 2);
        long start = System.currentTimeMillis();
        executor.execute(() -> {
            // Thread x increments x 100 million times
            for (int i = 0; i < 100000000; i++) {
                point.x++;
            }
            countDownLatch.countDown();
        });
        executor.execute(() -> {
            // Thread y increments y 100 million times
            for (int i = 0; i < 100000000; i++) {
                point.y++;
            }
            countDownLatch.countDown();
        });
        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("testNoFalseSharing time consuming=" + (end - start));
        executor.shutdown();
    }
}
```
- Interested readers can run both demos, the one with false sharing and the one without it.
On my machine, with two threads incrementing x and y 100 million times each, FalseSharingDemo (with false sharing) is three to five times slower than NoFalseSharingDemo (without it).
How Disruptor solves false sharing
- Disruptor pads the cache lines around all or some of the fields of three key components: Sequence, RingBuffer and SingleProducerSequencer.
These three components share two traits: each is written by only a single thread, and each is read frequently by many other threads. Protecting them from false sharing therefore greatly improves performance under high concurrency.
- MySingleProducerSequencer has many fields, but only nextValue and cachedConsumerSequenceValue are wrapped in padding, mainly because these two are the fields the producer reads and writes most frequently.
How MySequence solves false sharing
```java
import sun.misc.Unsafe;

/**
 * Sequence number object (like Disruptor.Sequence)
 *
 * Holds a volatile long, since it is accessed by both producer and consumer threads
 */
public class MySequence {

    /**
     * Left-hand padding against false sharing
     */
    private long lp1, lp2, lp3, lp4, lp5, lp6, lp7;
    /**
     * The sequence starts at -1 so that the next sequence is 0 (the first valid sequence number)
     */
    private volatile long value = -1;
    /**
     * Right-hand padding against false sharing
     */
    private long rp1, rp2, rp3, rp4, rp5, rp6, rp7;

    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static {
        try {
            UNSAFE = UnsafeUtil.getUnsafe();
            VALUE_OFFSET = UNSAFE.objectFieldOffset(MySequence.class.getDeclaredField("value"));
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Note: method code is omitted
}
```
How MyRingBuffer solves false sharing
```java
/**
 * Ring queue (like Disruptor.RingBuffer)
 */
public class MyRingBuffer<T> {

    /**
     * Left-hand padding against false sharing
     */
    protected long lp1, lp2, lp3, lp4, lp5, lp6, lp7;

    private final T[] elementList;
    private final MyProducerSequencer myProducerSequencer;
    private final int ringBufferSize;
    private final int mask;

    /**
     * Right-hand padding against false sharing
     */
    protected long rp1, rp2, rp3, rp4, rp5, rp6, rp7;

    // Note: method code is omitted
}
```
How MySingleProducerSequencer solves false sharing
```java
/**
 * Single-threaded producer sequencer (like Disruptor.SingleProducerSequencer)
 *
 * Designed for a single producer thread, so it is deliberately not thread-safe
 */
public class MySingleProducerSequencer implements MyProducerSequencer {

    /**
     * Size of the ringBuffer this sequencer belongs to
     */
    private final int ringBufferSize;

    /**
     * Sequence of the most recently published event
     * (not to be confused with nextValue)
     */
    private final MySequence currentProducerSequence = new MySequence();

    /**
     * Consumer sequences of the ringBuffer this sequencer belongs to
     */
    private volatile MySequence[] gatingConsumerSequences = new MySequence[0];

    private final MyWaitStrategy myWaitStrategy;

    /**
     * Left-hand padding against false sharing
     */
    private long lp1, lp2, lp3, lp4, lp5, lp6, lp7;

    /**
     * Highest sequence claimed so far (whether it is published depends on currentProducerSequence)
     *
     * Only used by the single producer thread, so a plain long is enough
     */
    private long nextValue = -1;

    /**
     * Cached value of the consumer sequence
     *
     * Only used by the single producer thread, so a plain long is enough
     */
    private long cachedConsumerSequenceValue = -1;

    /**
     * Right-hand padding against false sharing
     */
    private long rp1, rp2, rp3, rp4, rp5, rp6, rp7;

    // Note: method code is omitted
}
```
- Padding an object with extra fields avoids false sharing and improves performance, but it can also significantly increase the memory the object occupies.
In Disruptor, Sequence, RingBuffer and SingleProducerSequencer are accessed very frequently, but only a small number of them exist (proportional to the total number of producers and consumers), so the extra memory is not a real concern.
- Cache lines can be padded either manually with padding fields, as Disruptor does, or by annotating fields with the JDK's @Contended annotation (sun.misc.Contended in JDK 8, moved to jdk.internal.vm.annotation.Contended in JDK 9) and letting the JVM insert the padding. Outside the JDK's own classes, @Contended only takes effect when the JVM is started with -XX:-RestrictContended.
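Declaring all padding fields in one class, as MySequence does, relies on the JVM keeping fields in declaration order, which the JVM specification does not promise. The LMAX Disruptor's own Sequence instead spreads the padding across a class hierarchy (LhsPadding, Value, RhsPadding), because in practice HotSpot lays out superclass fields before subclass fields. The sketch below imitates that layout; the class names mirror Disruptor's, but the code is a simplified, hypothetical stand-in that uses `AtomicLongFieldUpdater` instead of Unsafe.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Left padding lives in the topmost superclass.
abstract class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

// The hot field sits in its own class between the two padding classes.
abstract class Value extends LhsPadding {
    protected volatile long value;

    // The updater must target the class that declares the field.
    static final AtomicLongFieldUpdater<Value> VALUE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(Value.class, "value");
}

// Right padding lives in a subclass, so (in practice on HotSpot) it follows `value`.
abstract class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

// Hypothetical padded sequence exposing the operations MySequence needs.
final class PaddedSequence extends RhsPadding {
    PaddedSequence(long initialValue) {
        this.value = initialValue;
    }

    long get() {
        return value;
    }

    void set(long newValue) {
        value = newValue;
    }

    // Ordered store, like Unsafe.putOrderedLong in MySequence.lazySet
    void lazySet(long newValue) {
        VALUE_UPDATER.lazySet(this, newValue);
    }

    boolean compareAndSet(long expected, long newValue) {
        return VALUE_UPDATER.compareAndSet(this, expected, newValue);
    }
}
```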
Why doesn't MultiProducerSequencer need the false sharing fix that SingleProducerSequencer gets?
- In a multi-threaded producer sequencer, state such as nextValue and cachedConsumerSequenceValue has to be shared among several producer threads, so the cache lines holding it genuinely must be synchronized between the cores running those threads.
This is true sharing rather than false sharing, so there is no false sharing to eliminate.
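To see why padding cannot help here, consider how several producers claim slots: they all compare-and-set the same cursor. The following is a minimal, hypothetical sketch (`SharedCursor` is not MyMultiProducerSequencer's code, and it omits the wrap-around check against consumer sequences) in which every producer thread writes the same AtomicLong, so the cache line holding it is shared for real.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: every producer thread claims sequences from one shared cursor.
final class SharedCursor {
    private final AtomicLong cursor = new AtomicLong(-1);

    // Claims n consecutive sequences and returns the highest one claimed.
    // The CAS retry loop is what forces all producers to share this cache line.
    long next(int n) {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    long current() {
        return cursor.get();
    }
}
```

Padding would only separate this cursor from unrelated neighbours; it cannot remove the contention between producers, which comes from the data itself being shared.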
Supporting graceful stop of consumer threads
Up to MyDisruptor v5, consumer threads run an endless while loop and cannot be shut down short of killing them, which is clearly not elegant.
Letting external code tell a consumer thread to stop itself
To solve this, Disruptor provides a mechanism that lets consumer threads stop on their own.
- The idea is to keep a running flag inside each consumer that says whether it should keep running. The flag is set to running when the consumer thread starts, and external code can switch it to stopped by calling the halt method.
- On every pass of the consumer's main loop, the sequence barrier checks an alert flag and throws MyAlertException once it is set. The main loop catches the exception, checks the running flag, and breaks out of the loop if the consumer has been stopped.
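The mechanism can be reduced to a small self-contained sketch. `HaltAlertException` and `StoppableConsumer` are hypothetical stand-ins for MyAlertException and the consumer classes, and the event-handling part of the loop is replaced by `Thread.yield()`, so only the stop protocol (running flag, alert flag, exception, break) remains. The `ready` latch is an extra test hook that MyDisruptor does not have.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for MyAlertException: preallocated, no stack trace.
final class HaltAlertException extends Exception {
    static final HaltAlertException INSTANCE = new HaltAlertException();

    private HaltAlertException() {
        super(null, null, false, false);
    }
}

// Hypothetical consumer keeping only the stop protocol of the main loop.
final class StoppableConsumer implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private volatile boolean alerted = false;
    private final CountDownLatch ready = new CountDownLatch(1); // test hook only

    private void checkAlert() throws HaltAlertException {
        if (alerted) {
            throw HaltAlertException.INSTANCE;
        }
    }

    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        alerted = false;   // clearAlert()
        ready.countDown(); // main loop is about to start
        while (true) {
            try {
                checkAlert();
                Thread.yield(); // placeholder for waiting on and consuming events
            } catch (HaltAlertException ex) {
                if (!running.get()) {
                    break; // stopped from outside: leave the main loop
                }
            }
        }
    }

    void halt() {
        running.set(false); // mark as stopped first...
        alerted = true;     // ...then alert so the loop notices
    }

    boolean isRunning() {
        return running.get();
    }

    boolean awaitReady(long millis) throws InterruptedException {
        return ready.await(millis, TimeUnit.MILLISECONDS);
    }
}
```

The `ready` latch exists because run() clears the alert flag after setting running, so a halt() landing in between would be erased and the loop would never end; as far as I can tell, the MyBatchEventProcessor and MyWorkProcessor code that follows has the same window.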
Single-threaded consumer with graceful stop
```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Single-threaded consumer (like Disruptor.BatchEventProcessor)
 */
public class MyBatchEventProcessor<T> implements MyEventProcessor {

    private final MySequence currentConsumeSequence = new MySequence(-1);
    private final MyRingBuffer<T> myRingBuffer;
    private final MyEventHandler<T> myEventConsumer;
    private final MySequenceBarrier mySequenceBarrier;
    private final AtomicBoolean running = new AtomicBoolean();

    public MyBatchEventProcessor(MyRingBuffer<T> myRingBuffer,
                                 MyEventHandler<T> myEventConsumer,
                                 MySequenceBarrier mySequenceBarrier) {
        this.myRingBuffer = myRingBuffer;
        this.myEventConsumer = myEventConsumer;
        this.mySequenceBarrier = mySequenceBarrier;
    }

    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        this.mySequenceBarrier.clearAlert();

        // Next sequence to be consumed
        long nextConsumerIndex = currentConsumeSequence.get() + 1;
        // Main loop: keep fetching available events and consuming them until halted
        while (true) {
            try {
                long availableConsumeIndex = this.mySequenceBarrier.getAvailableConsumeSequence(nextConsumerIndex);

                while (nextConsumerIndex <= availableConsumeIndex) {
                    // Take the event at the consumer's index and hand it to the event handler
                    T event = myRingBuffer.get(nextConsumerIndex);
                    this.myEventConsumer.consume(event, nextConsumerIndex, nextConsumerIndex == availableConsumeIndex);
                    // Batch processing: consume every available event in this pass
                    nextConsumerIndex++;
                }

                // Publish consumer progress with lazySet: it is cheaper, and the producer does not
                // need to see it immediately because it caches the consumer sequence anyway
                this.currentConsumeSequence.lazySet(availableConsumeIndex);
                LogUtil.logWithThreadName("Updated current consume sequence: " + availableConsumeIndex);
            } catch (final MyAlertException ex) {
                LogUtil.logWithThreadName("Consumer MyAlertException" + ex);
                // Woken up by an external alert: check the running flag
                if (!running.get()) {
                    // running == false: leave the main loop and end the thread
                    break;
                }
            } catch (final Throwable ex) {
                // On any other exception, still advance the consumer sequence (skipping the
                // failing event) so that progress is not blocked; lazySet as above
                this.currentConsumeSequence.lazySet(nextConsumerIndex);
                nextConsumerIndex++;
            }
        }
    }

    @Override
    public MySequence getCurrentConsumeSequence() {
        return this.currentConsumeSequence;
    }

    @Override
    public void halt() {
        // Mark the consumer as stopped
        running.set(false);
        // Wake the consumer thread so that it notices the stop request immediately
        this.mySequenceBarrier.alert();
    }

    @Override
    public boolean isRunning() {
        return this.running.get();
    }
}
```
Multi-threaded consumer with graceful stop
```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Worker thread of a multi-threaded consumer (like Disruptor.WorkProcessor)
 */
public class MyWorkProcessor<T> implements MyEventProcessor {

    private final MySequence currentConsumeSequence = new MySequence(-1);
    private final MyRingBuffer<T> myRingBuffer;
    private final MyWorkHandler<T> myWorkHandler;
    private final MySequenceBarrier sequenceBarrier;
    private final MySequence workGroupSequence;
    private final AtomicBoolean running = new AtomicBoolean(false);

    public MyWorkProcessor(MyRingBuffer<T> myRingBuffer,
                           MyWorkHandler<T> myWorkHandler,
                           MySequenceBarrier sequenceBarrier,
                           MySequence workGroupSequence) {
        this.myRingBuffer = myRingBuffer;
        this.myWorkHandler = myWorkHandler;
        this.sequenceBarrier = sequenceBarrier;
        this.workGroupSequence = workGroupSequence;
    }

    @Override
    public MySequence getCurrentConsumeSequence() {
        return currentConsumeSequence;
    }

    @Override
    public void halt() {
        // Mark the consumer as stopped
        running.set(false);
        // Wake the consumer thread so that it notices the stop request immediately
        this.sequenceBarrier.alert();
    }

    @Override
    public boolean isRunning() {
        return this.running.get();
    }

    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        this.sequenceBarrier.clearAlert();

        long nextConsumerIndex = this.currentConsumeSequence.get();
        // Sentinel: guarantees nextConsumerIndex <= cachedAvailableSequence is false on the first
        // iteration, so the else branch fetches the highest available sequence from the barrier
        long cachedAvailableSequence = Long.MIN_VALUE;

        // Whether the sequence currently held has already been processed
        boolean processedSequence = true;
        while (true) {
            try {
                if (processedSequence) {
                    // About to claim a new sequence that has not been consumed yet (hence false)
                    processedSequence = false;

                    // The previous sequence is done, so compete via CAS for a new one
                    do {
                        nextConsumerIndex = this.workGroupSequence.get() + 1L;
                        // currentConsumeSequence is registered with the producer, so keep it in step
                        // with the worker group: currentConsumeSequence = workGroupSequence
                        this.currentConsumeSequence.lazySet(nextConsumerIndex - 1L);

                        // Q: Why not use workGroupSequence alone and drop each worker's currentConsumeSequence?
                        // A: That would not work. As with single-threaded consumers, a worker may only make its
                        //    progress visible (to producers and downstream consumers) after its workHandler has run.
                        //    Because of consumer dependencies, event i may be consumed downstream only once the
                        //    upstream handler has finished with it. workGroupSequence only coordinates, via CAS,
                        //    which worker in the pool gets which sequence; visible progress must still be tracked
                        //    by each worker's own currentConsumeSequence.

                        // The CAS guarantees that every worker thread obtains a unique sequence
                    } while (!workGroupSequence.compareAndSet(nextConsumerIndex - 1L, nextConsumerIndex));
                } else {
                    // processedSequence == false: we hold a claimed sequence that has not been consumed.
                    // It was claimed earlier, but nextConsumerIndex > cachedAvailableSequence, so the worker
                    // blocked in the barrier, refreshed cachedAvailableSequence and looped back here.
                    // This sequence must be consumed before a new one is claimed.
                }

                // cachedAvailableSequence can only be in one of two states:
                // 1. First iteration: the Long.MIN_VALUE sentinel, so the else branch below runs
                // 2. Later iterations: the highest sequence the barrier currently allows to be consumed
                if (cachedAvailableSequence >= nextConsumerIndex) {
                    // The claimed sequence is within the barrier's limit, so consume it
                    T event = myRingBuffer.get(nextConsumerIndex);
                    this.myWorkHandler.consume(event);

                    // Mark as processed so that the next iteration claims a new sequence via CAS
                    processedSequence = true;
                } else {
                    // 1. First iteration: fetch the highest consumable sequence from the barrier
                    // 2. Otherwise the claimed sequence is beyond the barrier: wait for the producer to reach it
                    cachedAvailableSequence = sequenceBarrier.getAvailableConsumeSequence(nextConsumerIndex);
                }
            } catch (final MyAlertException ex) {
                // Woken up by an external alert: check the running flag
                if (!running.get()) {
                    // running == false: leave the main loop and end the thread
                    break;
                }
            } catch (final Throwable ex) {
                // An exception in the handler still counts as consumed, so the sequence does not
                // block progress; the next iteration claims a new sequence via CAS
                processedSequence = true;
            }
        }
    }
}
```
```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Multi-threaded consumer (like Disruptor.WorkerPool)
 */
public class MyWorkerPool<T> {

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final MySequence workSequence = new MySequence(-1);
    private final MyRingBuffer<T> myRingBuffer;
    private final List<MyWorkProcessor<T>> workEventProcessorList;

    public void halt() {
        for (MyWorkProcessor<?> processor : this.workEventProcessorList) {
            // Stop each worker thread in turn
            processor.halt();
        }

        started.set(false);
    }

    public boolean isRunning() {
        return this.started.get();
    }

    // Note: irrelevant code has been omitted
}
```
Sequence barrier support for graceful stop
- When halt sets the flag to stopped, the consumer thread may be blocked waiting for the producer to publish more events (for example, with BlockingWaitStrategy).
So the consumer's sequence barrier (SequenceBarrier) also needs an alert method that wakes the consumer up so that it can see the stop request.
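Why the alert must also signal the wait strategy is easiest to see with a blocking wait. In this hypothetical sketch, `BlockingBarrier` merges a lock/condition-based wait (in the spirit of BlockingWaitStrategy) with an alert flag; the names and the merged design are illustrative assumptions, since MyDisruptor keeps the wait strategy and the barrier in separate classes.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for MyAlertException.
final class BarrierAlertException extends Exception {
    static final BarrierAlertException INSTANCE = new BarrierAlertException();

    private BarrierAlertException() {
        super(null, null, false, false);
    }
}

// Hypothetical barrier + blocking wait strategy in one class.
final class BlockingBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition progressed = lock.newCondition();
    private final AtomicLong producerCursor = new AtomicLong(-1);
    private volatile boolean alerted = false;

    // Consumer side: block until `sequence` is published or the barrier is alerted.
    long waitFor(long sequence) throws BarrierAlertException, InterruptedException {
        if (producerCursor.get() < sequence) {
            lock.lock();
            try {
                while (producerCursor.get() < sequence) {
                    // Checked under the lock, so an alert cannot slip in between check and await
                    if (alerted) {
                        throw BarrierAlertException.INSTANCE;
                    }
                    progressed.await();
                }
            } finally {
                lock.unlock();
            }
        }
        return producerCursor.get();
    }

    // Producer side: publish a sequence and wake blocked consumers.
    void publish(long sequence) {
        producerCursor.set(sequence);
        signalAllWhenBlocking();
    }

    // halt() path: set the flag, then wake every blocked consumer so it re-checks the flag.
    void alert() {
        alerted = true;
        signalAllWhenBlocking();
    }

    private void signalAllWhenBlocking() {
        lock.lock();
        try {
            progressed.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Without the `signalAll()` in `alert()`, a consumer parked in `await()` would only notice the flag after the producer published again, which may never happen once the system is shutting down.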
```java
/**
 * Sequence barrier (like Disruptor.SequenceBarrier)
 */
public class MySequenceBarrier {

    private final MyProducerSequencer myProducerSequencer;
    private final MySequence currentProducerSequence;
    private volatile boolean alerted = false;
    private final MyWaitStrategy myWaitStrategy;
    private final MySequence[] dependentSequencesList;

    public MySequenceBarrier(MyProducerSequencer myProducerSequencer, MySequence currentProducerSequence,
                             MyWaitStrategy myWaitStrategy, MySequence[] dependentSequencesList) {
        this.myProducerSequencer = myProducerSequencer;
        this.currentProducerSequence = currentProducerSequence;
        this.myWaitStrategy = myWaitStrategy;

        if (dependentSequencesList.length != 0) {
            this.dependentSequencesList = dependentSequencesList;
        } else {
            // With no upstream dependencies, the producer sequence is the only dependency
            this.dependentSequencesList = new MySequence[]{currentProducerSequence};
        }
    }

    /**
     * Get the highest consumable sequence (waitFor in Disruptor)
     */
    public long getAvailableConsumeSequence(long currentConsumeSequence) throws InterruptedException, MyAlertException {
        // Check the alert flag on every call; if alerted, throw MyAlertException so the consumer can stop
        checkAlert();

        long availableSequence = this.myWaitStrategy.waitFor(currentConsumeSequence, currentProducerSequence, dependentSequencesList, this);

        if (availableSequence < currentConsumeSequence) {
            return availableSequence;
        }

        // With multi-threaded producers, the result must additionally be limited to what has actually been published (new in v4)
        return myProducerSequencer.getHighestPublishedSequence(currentConsumeSequence, availableSequence);
    }

    /**
     * Wake up consumers that may be blocked
     */
    public void alert() {
        this.alerted = true;
        this.myWaitStrategy.signalWhenBlocking();
    }

    /**
     * Clear the flag on restart
     */
    public void clearAlert() {
        this.alerted = false;
    }

    /**
     * Check whether the consumer has been alerted
     */
    public void checkAlert() throws MyAlertException {
        if (alerted) {
            throw MyAlertException.INSTANCE;
        }
    }
}
```
Stopping all consumer threads with the halt method exposed by Disruptor
- The Disruptor class provides a halt method that stops all registered consumer threads, built on the halt mechanism of the components above.
- MyConsumerInfo abstracts over single-threaded and multi-threaded consumers; the halt method of each subclass calls the halt method of the consumer it wraps.
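The consumer-info abstraction can be illustrated with a trimmed-down, hypothetical model: `StubProcessor` stands in for a consumer thread's run/halt state, `EventProcessorInfo` and `WorkerPoolInfo` mirror the roles of MyEventProcessorInfo and MyWorkerPoolInfo, and `MiniDisruptor.halt()` has the same shape as MyDisruptor.halt(). It shows how the Disruptor can stop every consumer without knowing whether each one is single- or multi-threaded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for one consumer thread's run/halt state (no event loop here).
final class StubProcessor {
    final AtomicBoolean running = new AtomicBoolean(true);

    void halt() {
        running.set(false);
    }
}

interface ConsumerInfo {
    void halt();

    boolean isRunning();
}

// Wraps one single-threaded consumer (role of MyEventProcessorInfo).
final class EventProcessorInfo implements ConsumerInfo {
    private final StubProcessor processor;

    EventProcessorInfo(StubProcessor processor) {
        this.processor = processor;
    }

    public void halt() {
        processor.halt();
    }

    public boolean isRunning() {
        return processor.running.get();
    }
}

// Wraps a worker pool (role of MyWorkerPoolInfo): halting it halts every worker.
final class WorkerPoolInfo implements ConsumerInfo {
    private final List<StubProcessor> workers;

    WorkerPoolInfo(List<StubProcessor> workers) {
        this.workers = workers;
    }

    public void halt() {
        workers.forEach(StubProcessor::halt);
    }

    // One reasonable choice for this sketch: running while any worker still runs
    public boolean isRunning() {
        return workers.stream().anyMatch(w -> w.running.get());
    }
}

final class MiniDisruptor {
    private final List<ConsumerInfo> consumerInfos = new ArrayList<>();

    void add(ConsumerInfo info) {
        consumerInfos.add(info);
    }

    // Same shape as MyDisruptor.halt(): no knowledge of each consumer's concrete type
    void halt() {
        consumerInfos.forEach(ConsumerInfo::halt);
    }
}
```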
```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * disruptor dsl (like Disruptor.Disruptor)
 */
public class MyDisruptor<T> {

    private final MyRingBuffer<T> ringBuffer;
    private final Executor executor;
    private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>();
    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * Start all registered consumers
     */
    public void start() {
        // CAS on the started flag prevents starting twice
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor can only be started once");
        }

        // Start every registered consumer
        this.consumerRepository.getConsumerInfos().forEach(
                item -> item.start(this.executor)
        );
    }

    /**
     * Stop all registered consumers
     */
    public void halt() {
        // Walk the consumer list and stop each consumer in turn
        for (final MyConsumerInfo consumerInfo : this.consumerRepository.getConsumerInfos()) {
            consumerInfo.halt();
        }
    }

    // Note: irrelevant code has been omitted
}
```
Gracefully stopping consumer threads
- With halt in place, external code can now control when consumer threads start and stop. One key issue remains, though: how can we make sure that no unconsumed events are left behind when the consumer threads are halted?
- Disruptor is an in-memory queue: if it is shut down before the consumers have processed the events already in the ringBuffer, those events are effectively lost. In some scenarios this is fatal and unacceptable.
- Disruptor therefore provides a shutdown method for a truly graceful stop. shutdown keeps checking the consumers' progress and waits until they have consumed every produced event before calling halt to stop the consumer threads.
This lets users stop the Disruptor gracefully without losing events.
Implementation of Disruptor's shutdown method
- In Disruptor's DSL-style API, updateGatingSequencesForNextInChain removes from the producer's gating set the sequences of consumers that are no longer at the end of the consumer chain (an optimization), and unmarks those consumers (MyConsumerInfo) as end of chain (endOfChain), so only the consumers at the end of the chain keep that mark.
- shutdown busy-waits, using hasBacklog to check whether any consumer at the end of the consumer chain (these are the slowest ones) is still behind the producer.
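The shutdown logic can be exercised in isolation. In this sketch, `ShutdownSketch` is a hypothetical class, AtomicLong stands in for MySequence, the end-of-chain sequences are passed in directly instead of coming from MyConsumerRepository, and `haltAll` is a Runnable standing in for halt(). Two deliberate differences from the MyDisruptor code: it measures time with `System.nanoTime()`, which is monotonic, and it throws `java.util.concurrent.TimeoutException` rather than RuntimeException.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, self-contained model of MyDisruptor.shutdown()/hasBacklog().
final class ShutdownSketch {
    private final AtomicLong producerCursor;
    private final List<AtomicLong> endOfChainSequences;
    private final Runnable haltAll;

    ShutdownSketch(AtomicLong producerCursor, List<AtomicLong> endOfChainSequences, Runnable haltAll) {
        this.producerCursor = producerCursor;
        this.endOfChainSequences = endOfChainSequences;
        this.haltAll = haltAll;
    }

    // True while any end-of-chain (slowest) consumer is behind the producer.
    private boolean hasBacklog() {
        long cursor = producerCursor.get();
        for (AtomicLong consumer : endOfChainSequences) {
            if (cursor > consumer.get()) {
                return true;
            }
        }
        return false;
    }

    // Busy-waits until the backlog drains, then halts; a negative timeout waits forever.
    void shutdown(long timeout, TimeUnit unit) throws TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (hasBacklog()) {
            if (timeout >= 0 && System.nanoTime() - deadline > 0) {
                throw new TimeoutException("consumers did not catch up before the timeout");
            }
            Thread.yield();
        }
        haltAll.run();
    }
}
```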
/** * disruptor dsl(Imitate Disruptor.Disruptor) * */ public class MyDisruptor<T> { private final MyRingBuffer<T> ringBuffer; private final Executor executor; private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>(); private final AtomicBoolean started = new AtomicBoolean(false); public MyDisruptor( final MyEventFactory<T> eventProducer, final int ringBufferSize, final Executor executor, final ProducerType producerType, final MyWaitStrategy myWaitStrategy) { this.ringBuffer = MyRingBuffer.create(producerType,eventProducer,ringBufferSize,myWaitStrategy); this.executor = executor; } /** * Register single-threaded consumers (no upstream dependent consumers, only producer sequence) * */ @SafeVarargs public final MyEventHandlerGroup<T> handleEventsWith(final MyEventHandler<T>... myEventHandlers){ return createEventProcessors(new MySequence[0], myEventHandlers); } /** * Register single-threaded consumers (upstream dependent consumers, producer sequence only) * @param barrierSequences Dependent Sequence Barrier * @param myEventHandlers User-defined collection of event consumers * */ public MyEventHandlerGroup<T> createEventProcessors( final MySequence[] barrierSequences, final MyEventHandler<T>[] myEventHandlers) { final MySequence[] processorSequences = new MySequence[myEventHandlers.length]; final MySequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); int i=0; for(MyEventHandler<T> myEventConsumer : myEventHandlers){ final MyBatchEventProcessor<T> batchEventProcessor = new MyBatchEventProcessor<>(ringBuffer, myEventConsumer, barrier); processorSequences[i] = batchEventProcessor.getCurrentConsumeSequence(); i++; // The consumer object is maintained for subsequent starts to start consumerRepository.add(batchEventProcessor); } // Update the current producer registered consumer sequence updateGatingSequencesForNextInChain(barrierSequences,processorSequences); return new 
MyEventHandlerGroup<>(this, this.consumerRepository, processorSequences);
    }

    /**
     * Register multithreaded consumers (no upstream consumers; they depend only on the producer sequence)
     */
    @SafeVarargs
    public final MyEventHandlerGroup<T> handleEventsWithWorkerPool(final MyWorkHandler<T>... myWorkHandlers) {
        return createWorkerPool(new MySequence[0], myWorkHandlers);
    }

    /**
     * Register multithreaded consumers (depending on upstream consumers, or only on the producer sequence)
     * @param barrierSequences sequences the new consumers' barrier depends on
     * @param myWorkHandlers user-defined event handlers
     */
    public MyEventHandlerGroup<T> createWorkerPool(
            final MySequence[] barrierSequences, final MyWorkHandler<T>[] myWorkHandlers) {
        final MySequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        final MyWorkerPool<T> workerPool = new MyWorkerPool<>(ringBuffer, sequenceBarrier, myWorkHandlers);

        // Save the consumer so that start() starts it and halt()/shutdown() stop it together with all the others
        consumerRepository.add(workerPool);

        final MySequence[] workerSequences = workerPool.getCurrentWorkerSequences();

        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);

        return new MyEventHandlerGroup<>(this, consumerRepository, workerSequences);
    }

    private void updateGatingSequencesForNextInChain(final MySequence[] barrierSequences, final MySequence[] processorSequences) {
        if (processorSequences.length != 0) {
            // Optimization: the new consumers were created via ringBuffer.newBarrier(barrierSequences),
            // so they already depend on the barrierSequences consumers that are registered in the ringBuffer.
            // Because those old barrierSequences are now tracked inside the new consumers' barrier, they can be
            // removed from the ringBuffer. Keeping only the sequences at the end of the consumer chain (the slowest ones)
            // lets the producer traverse fewer registered consumer sequences.
            for (MySequence sequence : barrierSequences) {
                ringBuffer.removeConsumerSequence(sequence);
            }
            for (MySequence sequence : processorSequences) {
                // Register the sequences that are now at the end of the consumer chain
                ringBuffer.addConsumerSequence(sequence);
            }

            // Mark the removed sequences as no longer at the end of the consumer chain (needed for graceful shutdown)
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

    /**
     * Start all registered consumers
     */
    public void start() {
        // CAS the started flag so the disruptor cannot be started twice
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor can only be started once");
        }

        // Start every registered consumer on the executor
        this.consumerRepository.getConsumerInfos().forEach(
                item -> item.start(this.executor)
        );
    }

    /**
     * Stop all registered consumers
     */
    public void halt() {
        // Walk the consumer info list and halt each consumer in turn
        for (final MyConsumerInfo consumerInfo : this.consumerRepository.getConsumerInfos()) {
            consumerInfo.halt();
        }
    }

    /**
     * Stop all consumer threads once every event published so far has been consumed
     */
    public void shutdown(long timeout, TimeUnit timeUnit) {
        final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        // Spin until every published event has been consumed
        while (hasBacklog()) {
            if (timeout >= 0 && System.currentTimeMillis() > timeOutAt) {
                throw new RuntimeException("disruptor shutdown timed out while waiting for consumers");
            }
            // Busy-wait
        }

        // hasBacklog() returned false: every event published so far has been consumed,
        // so all consumer threads can now be stopped safely and gracefully
        halt();
    }

    /**
     * Check whether the consumers still have unconsumed events
     */
    private boolean hasBacklog() {
        final long cursor = ringBuffer.getCurrentProducerSequence().get();

        // Only the end-of-chain consumer sequences (the slowest ones) need to be checked
        for (final MySequence consumer : consumerRepository.getLastSequenceInChain()) {
            if (cursor > consumer.get()) {
                // A consumer sequence behind the producer sequence means there are unconsumed events
                return true;
            }
        }

        // Every end-of-chain consumer has caught up with the producer sequence,
        // so all events published so far have been consumed
        return false;
    }

    /**
     * Get the ringBuffer held by this Disruptor
     */
    public MyRingBuffer<T> getRingBuffer() {
        return ringBuffer;
    }
}
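At its core, graceful shutdown compares the producer cursor with every end-of-chain consumer sequence and spins until none of them lags behind. Below is a minimal, self-contained sketch of that check. It uses plain `AtomicLong`s in place of `MySequence`, and `BacklogCheckSketch` and `awaitDrained` are hypothetical names. It is not MyDisruptor's code: it uses `System.nanoTime()` and `Thread.yield()` instead of a pure busy-wait, and it does not halt any consumers.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: AtomicLong stands in for MySequence.
public class BacklogCheckSketch {

    /** Returns true while any end-of-chain consumer sequence lags behind the producer cursor. */
    static boolean hasBacklog(AtomicLong producerCursor, List<AtomicLong> tailSequences) {
        final long cursor = producerCursor.get();
        for (AtomicLong consumer : tailSequences) {
            if (cursor > consumer.get()) {
                return true;
            }
        }
        return false;
    }

    /** Spins until the backlog is drained, or throws once the timeout has elapsed. */
    static void awaitDrained(AtomicLong producerCursor, List<AtomicLong> tailSequences,
                             long timeout, TimeUnit unit) {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (hasBacklog(producerCursor, tailSequences)) {
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("timed out waiting for consumers to drain");
            }
            Thread.yield(); // let consumer threads make progress instead of spinning hard
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong cursor = new AtomicLong(9);  // producer has published events 0..9
        AtomicLong tail = new AtomicLong(-1);   // tail consumer has consumed nothing yet
        List<AtomicLong> tails = Collections.singletonList(tail);
        System.out.println(hasBacklog(cursor, tails)); // true

        // Stand-in consumer thread that catches up with the producer
        Thread consumer = new Thread(() -> {
            while (tail.get() < cursor.get()) {
                tail.incrementAndGet();
            }
        });
        consumer.start();
        awaitDrained(cursor, tails, 1, TimeUnit.SECONDS);
        consumer.join();
        System.out.println(hasBacklog(cursor, tails)); // false
    }
}
```

Comparing `System.nanoTime()` values by subtraction keeps the deadline check correct even if the counter overflows. In the real shutdown path, `halt()` would be called only after the wait returns.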
/**
 * Repository of information about all consumers registered with the current disruptor (like Disruptor.ConsumerRepository)
 */
public class MyConsumerRepository<T> {

    private final ArrayList<MyConsumerInfo> consumerInfos = new ArrayList<>();

    /**
     * Do not override MySequence's hashCode/equals: lookups must match the original sequence objects,
     * which is why an IdentityHashMap (reference comparison) is used
     */
    private final Map<MySequence, MyConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<>();

    public ArrayList<MyConsumerInfo> getConsumerInfos() {
        return consumerInfos;
    }

    public void add(final MyEventProcessor processor) {
        final MyEventProcessorInfo<T> consumerInfo = new MyEventProcessorInfo<>(processor);
        eventProcessorInfoBySequence.put(processor.getCurrentConsumeSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }

    public void add(final MyWorkerPool<T> workerPool) {
        final MyWorkerPoolInfo<T> workerPoolInfo = new MyWorkerPoolInfo<>(workerPool);
        for (MySequence sequence : workerPool.getCurrentWorkerSequences()) {
            eventProcessorInfoBySequence.put(sequence, workerPoolInfo);
        }
        consumerInfos.add(workerPoolInfo);
    }

    /**
     * Find the sequences of all running consumers at the end of the consumer chain
     */
    public List<MySequence> getLastSequenceInChain() {
        List<MySequence> lastSequenceList = new ArrayList<>();
        for (MyConsumerInfo consumerInfo : consumerInfos) {
            // Keep only consumers that are running and at the end of the chain
            if (consumerInfo.isRunning() && consumerInfo.isEndOfChain()) {
                final MySequence[] sequences = consumerInfo.getSequences();
                // Collect all of their sequences into lastSequenceList
                Collections.addAll(lastSequenceList, sequences);
            }
        }
        return lastSequenceList;
    }

    public void unMarkEventProcessorsAsEndOfChain(final MySequence... barrierEventProcessors) {
        for (MySequence barrierEventProcessor : barrierEventProcessors) {
            eventProcessorInfoBySequence.get(barrierEventProcessor).markAsUsedInBarrier();
        }
    }
}
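The repository keys its map with an `IdentityHashMap`, so lookups compare object references rather than calling `equals`. The sketch below shows why this matters. It uses a hypothetical `ValueSeq` class that, unlike `MySequence`, overrides `equals`/`hashCode` by value. Two distinct consumers whose sequences hold the same value collapse into one entry in a `HashMap`, but they stay separate in an `IdentityHashMap`.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical sketch: ValueSeq is an illustrative type, not part of MyDisruptor.
public class IdentityKeySketch {

    /** Sequence-like type whose equals/hashCode compare the held value (what MySequence avoids). */
    static final class ValueSeq {
        final long value;
        ValueSeq(long value) { this.value = value; }
        @Override public boolean equals(Object o) {
            return o instanceof ValueSeq && ((ValueSeq) o).value == value;
        }
        @Override public int hashCode() { return Long.hashCode(value); }
    }

    /** Registers two distinct consumers whose sequences hold the same value and returns the map size. */
    static int countEntries(Map<ValueSeq, String> map) {
        ValueSeq consumerA = new ValueSeq(-1);
        ValueSeq consumerB = new ValueSeq(-1);
        map.put(consumerA, "consumer A info");
        map.put(consumerB, "consumer B info");
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(countEntries(new HashMap<>()));         // 1: B's entry replaced A's
        System.out.println(countEntries(new IdentityHashMap<>())); // 2: keyed by reference
    }
}
```

Value-based hashing would be doubly wrong for a real sequence. A sequence is mutable, so its hash would change as the consumer advances, and entries would become unreachable.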
MyConsumerInfo and its implementations
/**
 * Consumer information (like Disruptor.ConsumerInfo)
 */
public interface MyConsumerInfo {

    /**
     * Start the current consumer on the given executor
     * @param executor executor used to run the consumer
     */
    void start(Executor executor);

    /**
     * Stop the current consumer
     */
    void halt();

    /**
     * Whether the current consumer is at the end of the consumer chain
     */
    boolean isEndOfChain();

    /**
     * Mark the current consumer as no longer at the end of the consumer chain
     */
    void markAsUsedInBarrier();

    /**
     * Whether the current consumer is still running
     */
    boolean isRunning();

    /**
     * Get the consumer's sequences (a multithreaded consumer is represented by multiple sequence objects)
     */
    MySequence[] getSequences();
}
/**
 * Single-threaded event processor information (like Disruptor.EventProcessorInfo)
 */
public class MyEventProcessorInfo<T> implements MyConsumerInfo {

    private final MyEventProcessor myEventProcessor;

    /**
     * A newly registered consumer is at the end of the consumer chain by default
     */
    private boolean endOfChain = true;

    public MyEventProcessorInfo(MyEventProcessor myEventProcessor) {
        this.myEventProcessor = myEventProcessor;
    }

    @Override
    public void start(Executor executor) {
        executor.execute(myEventProcessor);
    }

    @Override
    public void halt() {
        this.myEventProcessor.halt();
    }

    @Override
    public boolean isEndOfChain() {
        return endOfChain;
    }

    @Override
    public void markAsUsedInBarrier() {
        this.endOfChain = false;
    }

    @Override
    public boolean isRunning() {
        return this.myEventProcessor.isRunning();
    }

    @Override
    public MySequence[] getSequences() {
        return new MySequence[]{this.myEventProcessor.getCurrentConsumeSequence()};
    }
}
/**
 * Multithreaded consumer information (like Disruptor.WorkerPoolInfo)
 */
public class MyWorkerPoolInfo<T> implements MyConsumerInfo {

    private final MyWorkerPool<T> workerPool;

    /**
     * A newly registered consumer is at the end of the consumer chain by default
     */
    private boolean endOfChain = true;

    public MyWorkerPoolInfo(MyWorkerPool<T> workerPool) {
        this.workerPool = workerPool;
    }

    @Override
    public void start(Executor executor) {
        workerPool.start(executor);
    }

    @Override
    public void halt() {
        this.workerPool.halt();
    }

    @Override
    public boolean isEndOfChain() {
        return endOfChain;
    }

    @Override
    public void markAsUsedInBarrier() {
        // A downstream consumer now depends on this pool, so it is no longer at the end of the chain
        this.endOfChain = false;
    }

    @Override
    public boolean isRunning() {
        return this.workerPool.isRunning();
    }

    @Override
    public MySequence[] getSequences() {
        return this.workerPool.getCurrentWorkerSequences();
    }
}
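The lifecycle of the `endOfChain` flag can be traced with a toy model. Every consumer starts at the end of the chain. Registering a downstream consumer that gates on it calls `markAsUsedInBarrier()`, which removes the upstream consumer from the set that `hasBacklog` inspects. The sketch below uses hypothetical `ToyConsumerInfo`/`ToyRepository` classes and string names instead of real sequences. It registers A, then B and C (both depending on A), then D (depending on B).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of MyConsumerInfo / MyConsumerRepository end-of-chain tracking.
public class EndOfChainSketch {

    static final class ToyConsumerInfo {
        final String name;
        boolean endOfChain = true; // every newly registered consumer starts at the tail
        ToyConsumerInfo(String name) { this.name = name; }
        void markAsUsedInBarrier() { endOfChain = false; }
    }

    static final class ToyRepository {
        private final Map<String, ToyConsumerInfo> infos = new LinkedHashMap<>();

        /** Registers a consumer that gates on the given upstream consumers. */
        void register(String name, String... upstream) {
            infos.put(name, new ToyConsumerInfo(name));
            for (String u : upstream) {
                infos.get(u).markAsUsedInBarrier(); // upstream is no longer at the tail
            }
        }

        /** Names of consumers still at the end of the chain, in registration order. */
        List<String> tailConsumers() {
            List<String> tails = new ArrayList<>();
            for (ToyConsumerInfo info : infos.values()) {
                if (info.endOfChain) {
                    tails.add(info.name);
                }
            }
            return tails;
        }
    }

    static List<String> demo() {
        ToyRepository repo = new ToyRepository();
        repo.register("A");
        repo.register("B", "A");
        repo.register("C", "A");
        repo.register("D", "B");
        return repo.tailConsumers();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [C, D]
    }
}
```

If `markAsUsedInBarrier()` set the flag to `true`, A and B would never leave the tail set. A downstream consumer cannot overtake its upstream, so `hasBacklog` should still reach the same verdict. However, `getLastSequenceInChain()` would no longer return only the end of the chain.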
- At this point, v6 of MyDisruptor fully implements graceful shutdown of consumers. Once the producer threads stop publishing events, all consumers can be stopped safely and gracefully through the shutdown method provided by the Disruptor.
- Compared with the previous version, the Disruptor gains many new methods and a fair amount of logic for graceful shutdown. This makes the overall code more complex and harder to follow, which is why MyDisruptor deferred this feature to the final version.
Replacing the ArrayList of consumer sequences in the producer sequencer with an array
Up to v5, MyDisruptor stores the consumer sequences registered with the producer sequencer in an ArrayList, whereas Disruptor stores them directly in an array. Why?
- In Disruptor, consumer sequences can be added to and removed from the set maintained by the producer sequencer at runtime. Earlier versions of MyDisruptor used an ArrayList so that they needed no extra code to grow and shrink an array, which kept the code easier to understand.
- ArrayList is only a thin wrapper around an array, yet accessing elements through it is still less efficient than indexing the array directly ("array[index]").
The reason is that ArrayList's get method performs an extra index range check (rangeCheck), and its iterator adds extra logic such as the modCount check that detects concurrent modification.
Because of this extra logic, an ArrayList is inevitably somewhat slower than a bare array at accessing its elements.
- In most scenarios this small difference between a bare array and an ArrayList is negligible. However, a Disruptor producer repeatedly traverses the consumer sequences it maintains through the getMinimumSequence method. On this hot path, trading some readability for a small performance gain is worth it.
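The hot path in question is the producer's scan for the slowest gating consumer. The sketch below shows that scan over a bare array. It follows the shape of Disruptor's `Util.getMinimumSequence(Sequence[], long)`, uses `AtomicLong` in place of `MySequence`, and `MinimumSequenceSketch` is a hypothetical name. Plain array indexing avoids the `ArrayList.get` range check and the iterator's `modCount` check. The JVM still bounds-checks array accesses, although the JIT can often eliminate those checks in simple counted loops like this one.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: AtomicLong stands in for MySequence.
public class MinimumSequenceSketch {

    /**
     * Returns the smallest value among the gating sequences, or {@code minimum}
     * if the array is empty or every sequence is at or above it.
     */
    static long getMinimumSequence(final AtomicLong[] sequences, long minimum) {
        for (int i = 0, n = sequences.length; i < n; i++) {
            long value = sequences[i].get(); // direct array access, no ArrayList wrapper
            minimum = Math.min(minimum, value);
        }
        return minimum;
    }

    public static void main(String[] args) {
        AtomicLong[] gating = { new AtomicLong(12), new AtomicLong(7), new AtomicLong(30) };
        // A producer would pass its own cursor as the upper bound
        System.out.println(getMinimumSequence(gating, 40));            // 7
        System.out.println(getMinimumSequence(new AtomicLong[0], 40)); // 40
    }
}
```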
The single-threaded producer sequencer switches from an ArrayList to an array (the multithreaded producer sequencer makes the same change)
/**
 * Single-threaded producer sequencer (like Disruptor.SingleProducerSequencer)
 *
 * It is meant for a single producer thread, so it is deliberately not thread-safe
 */
public class MySingleProducerSequencer implements MyProducerSequencer {

    // Requires a volatile MySequence[] field named "gatingConsumerSequences" (declaration omitted below);
    // AtomicReferenceFieldUpdater.newUpdater rejects fields that are not volatile
    private static final AtomicReferenceFieldUpdater<MySingleProducerSequencer, MySequence[]> SEQUENCE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(MySingleProducerSequencer.class, MySequence[].class, "gatingConsumerSequences");

    @Override
    public void addGatingConsumerSequence(MySequence newGatingConsumerSequence) {
        MySequenceGroups.addSequences(this, SEQUENCE_UPDATER, this.currentProducerSequence, newGatingConsumerSequence);
    }

    @Override
    public void addGatingConsumerSequenceList(MySequence... newGatingConsumerSequences) {
        MySequenceGroups.addSequences(this, SEQUENCE_UPDATER, this.currentProducerSequence, newGatingConsumerSequences);
    }

    @Override
    public void removeConsumerSequence(MySequence sequenceNeedRemove) {
        MySequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequenceNeedRemove);
    }

    // Note: irrelevant code has been omitted
}
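`SEQUENCE_UPDATER` relies on `AtomicReferenceFieldUpdater.newUpdater`, which accepts only a `volatile` field of the declared type and throws `IllegalArgumentException` otherwise. The self-contained sketch below shows the same pattern with a hypothetical `Holder` class and `AtomicLong[]` in place of `MySequence[]`. A `volatile` array field has its reference swapped atomically, and a CAS that uses a stale snapshot fails, which is what forces the retry loops in the sequence-group helpers.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical sketch: Holder and AtomicLong[] stand in for the sequencer and MySequence[].
public class FieldUpdaterSketch {

    static final class Holder {
        // newUpdater requires this field to be volatile and of type AtomicLong[]
        volatile AtomicLong[] gatingSequences = new AtomicLong[0];

        static final AtomicReferenceFieldUpdater<Holder, AtomicLong[]> UPDATER =
                AtomicReferenceFieldUpdater.newUpdater(Holder.class, AtomicLong[].class, "gatingSequences");
    }

    /** Returns {first CAS succeeded, second CAS succeeded, array now has one element}. */
    static boolean[] demo() {
        Holder holder = new Holder();
        AtomicLong[] snapshot = Holder.UPDATER.get(holder);
        AtomicLong[] grown = { new AtomicLong(-1) };

        // Succeeds: the field still refers to the snapshot
        boolean first = Holder.UPDATER.compareAndSet(holder, snapshot, grown);
        // Fails: the snapshot is stale, so a real caller would re-read the field and retry
        boolean second = Holder.UPDATER.compareAndSet(holder, snapshot, new AtomicLong[0]);
        return new boolean[] { first, second, holder.gatingSequences.length == 1 };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true false true
    }
}
```

A field updater keeps the holder object free of an extra `AtomicReference` wrapper per instance, and this is the usual reason to prefer it on a hot object such as a sequencer.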
/**
 * Utility class for updating sequence arrays (like Disruptor.SequenceGroups)
 * Note: the CAS-based insert/delete used here is not strictly necessary in MyDisruptor, because MyDisruptor does not support
 * registering new consumers dynamically at runtime (Disruptor does, at the cost of some extra complexity).
 * It is kept to stay consistent with Disruptor's implementation and to explain how it works: in essence it grows and shrinks the sequence array
 */
public class MySequenceGroups {

    /**
     * Add the sequences in sequencesToAdd to the holder object's sequence array
     */
    public static <T> void addSequences(
            final T holder,
            final AtomicReferenceFieldUpdater<T, MySequence[]> updater,
            final MySequence currentProducerSequence,
            final MySequence... sequencesToAdd) {
        long cursorSequence;
        MySequence[] updatedSequences;
        MySequence[] currentSequences;

        do {
            // Read the holder's current array reference
            currentSequences = updater.get(holder);
            // Copy the existing elements into a larger new array
            updatedSequences = Arrays.copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
            cursorSequence = currentProducerSequence.get();

            int index = currentSequences.length;
            // Each newly added sequence starts from the current producer sequence
            for (MySequence sequence : sequencesToAdd) {
                sequence.set(cursorSequence);
                // Put the newly registered sequence into the new array
                updatedSequences[index++] = sequence;
            }
            // CAS the new array into the holder so that consumer sequences can be registered concurrently at runtime.
            // The loop exits only when the CAS succeeds; on failure it re-reads the latest currentSequences
            // and rebuilds the merged updatedSequences array
        } while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

        // The producer may have advanced meanwhile: set the newly registered sequences to the latest producer sequence once more
        cursorSequence = currentProducerSequence.get();
        for (MySequence sequence : sequencesToAdd) {
            sequence.set(cursorSequence);
        }
    }

    /**
     * Remove a sequence from the holder's sequence array
     */
    public static <T> void removeSequence(
            final T holder,
            final AtomicReferenceFieldUpdater<T, MySequence[]> sequenceUpdater,
            final MySequence sequenceNeedRemove) {
        int numToRemove;
        MySequence[] oldSequences;
        MySequence[] newSequences;

        do {
            // Read the holder's current array reference
            oldSequences = sequenceUpdater.get(holder);
            // Count how many entries in the array must be removed
            numToRemove = countMatching(oldSequences, sequenceNeedRemove);

            if (0 == numToRemove) {
                // The sequence is not registered: nothing to remove
                return;
            }

            final int oldSize = oldSequences.length;
            // Build a smaller new array
            newSequences = new MySequence[oldSize - numToRemove];

            for (int i = 0, pos = 0; i < oldSize; i++) {
                final MySequence testSequence = oldSequences[i];
                if (sequenceNeedRemove != testSequence) {
                    // Copy only the sequences that are kept
                    newSequences[pos++] = testSequence;
                }
            }
        } while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));
    }

    private static int countMatching(MySequence[] values, final MySequence toMatch) {
        int numToRemove = 0;
        for (MySequence value : values) {
            if (value == toMatch) {
                // Compare references: count every entry that is the same object as toMatch
                numToRemove++;
            }
        }
        return numToRemove;
    }
}
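Together, the add/remove helpers form a copy-on-write array. Readers such as the producer's minimum-sequence scan always see a complete snapshot. Writers build a new array, publish it with a CAS, and retry on contention. The condensed sketch below makes several simplifying assumptions. It uses `AtomicLong` instead of `MySequence` and an `AtomicReference<AtomicLong[]>` instead of a field updater. The names `CopyOnWriteSequences`, `add`, `remove` and `snapshot` are hypothetical, and `remove` returns a `boolean`, unlike `removeSequence`. It keeps the same "set to cursor, publish, set to cursor again" order as `addSequences`.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a CAS-based copy-on-write sequence array.
public class CopyOnWriteSequences {
    private final AtomicReference<AtomicLong[]> sequences = new AtomicReference<>(new AtomicLong[0]);
    private final AtomicLong cursor; // stands in for the producer sequence

    CopyOnWriteSequences(AtomicLong cursor) { this.cursor = cursor; }

    AtomicLong[] snapshot() { return sequences.get(); }

    void add(AtomicLong... toAdd) {
        AtomicLong[] current;
        AtomicLong[] updated;
        do {
            current = sequences.get();
            updated = Arrays.copyOf(current, current.length + toAdd.length);
            long cursorValue = cursor.get();
            int index = current.length;
            for (AtomicLong s : toAdd) {
                s.set(cursorValue); // new consumers start at the producer's position
                updated[index++] = s;
            }
        } while (!sequences.compareAndSet(current, updated)); // retry if another writer won
        long latest = cursor.get(); // the producer may have moved during publication
        for (AtomicLong s : toAdd) {
            s.set(latest);
        }
    }

    boolean remove(AtomicLong toRemove) {
        AtomicLong[] current;
        AtomicLong[] updated;
        do {
            current = sequences.get();
            int matches = 0;
            for (AtomicLong s : current) {
                if (s == toRemove) matches++; // reference comparison
            }
            if (matches == 0) {
                return false;
            }
            updated = new AtomicLong[current.length - matches];
            int pos = 0;
            for (AtomicLong s : current) {
                if (s != toRemove) updated[pos++] = s;
            }
        } while (!sequences.compareAndSet(current, updated));
        return true;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(41);
        CopyOnWriteSequences group = new CopyOnWriteSequences(cursor);
        AtomicLong a = new AtomicLong();
        AtomicLong b = new AtomicLong();
        group.add(a, b);
        System.out.println(group.snapshot().length + " " + a.get());          // 2 41
        System.out.println(group.remove(a) + " " + group.snapshot().length); // true 1
        System.out.println(group.remove(a));                                  // false
    }
}
```

Writes copy the whole array, but consumers are registered and removed rarely while the array is read on every capacity check, so this trade-off favors the reader.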
Summary
- As the last post in this Disruptor learning series, v6 addresses several key performance details of MyDisruptor. Apart from a few rarely used features, the final v6 version of MyDisruptor differs little from Disruptor as a whole.
- From v1 to v6, MyDisruptor started with simple support for a single producer and a single consumer. It then grew steadily richer in features, better optimized, and more complex.
However, because each version was designed to implement one or more complete functional modules, the code is not hard to understand when read version by version.
- Implementing MyDisruptor from the designer's point of view taught me a lot, and it gradually made clear why Disruptor is implemented the way it is in certain places.
This imitate-and-rebuild approach can greatly flatten the learning curve of a project as clever and detail-oriented as Disruptor. It helps us better understand how Disruptor works and the design ideas behind it.
Both Disruptor's overall design and its code contain many details worth thinking about and learning from. I hope this series of posts helps readers who are interested in Disruptor.
The full code for this blog is on my github: https://github.com/1399852153/MyDisruptor Branch: feature/lab6