This article describes some best practices for Pulsar client coding and provides commercial-grade sample code for reference during development, to help you access Pulsar more efficiently. In a production environment, Pulsar address information is usually obtained through a configuration center or through Kubernetes domain-name discovery; that is not the focus of this article, so the constant PulsarConstant.SERVICE_HTTP_URL is used in its place. The examples in this article have been uploaded to GitHub.
Client initialization and configuration
Initialize Client -- demo level
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarClientInit {

    private static final DemoPulsarClientInit INSTANCE = new DemoPulsarClientInit();

    private PulsarClient pulsarClient;

    public static DemoPulsarClientInit getInstance() {
        return INSTANCE;
    }

    public void init() throws Exception {
        pulsarClient = PulsarClient.builder()
                .serviceUrl(PulsarConstant.SERVICE_HTTP_URL)
                .build();
    }

    public PulsarClient getPulsarClient() {
        return pulsarClient;
    }
}
```
The demo-level Pulsar client is initialized without any custom parameters and without any exception handling; exceptions are thrown directly from init.
Initialize Client -- online level
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarClientInitRetry {

    private static final DemoPulsarClientInitRetry INSTANCE = new DemoPulsarClientInitRetry();

    private volatile PulsarClient pulsarClient;

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1,
            new DefaultThreadFactory("pulsar-cli-init"));

    public static DemoPulsarClientInitRetry getInstance() {
        return INSTANCE;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        try {
            pulsarClient = PulsarClient.builder()
                    .serviceUrl(PulsarConstant.SERVICE_HTTP_URL)
                    .build();
            log.info("pulsar client init success");
            this.executorService.shutdown();
        } catch (Exception e) {
            log.error("init pulsar error, exception is ", e);
        }
    }

    public PulsarClient getPulsarClient() {
        return pulsarClient;
    }
}
```
In a real environment, we often need to ensure that a failure to initialize the Pulsar client does not prevent the microservice from starting; that is, after the microservice starts, it keeps trying to create the Pulsar client.
The code above achieves this with a volatile field plus a scheduled retry loop, and shuts down the timer thread once the client has been created successfully.
Initialize Client -- commercial level
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarClientInitUltimate {

    private static final DemoPulsarClientInitUltimate INSTANCE = new DemoPulsarClientInitUltimate();

    private volatile PulsarClient pulsarClient;

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1,
            new DefaultThreadFactory("pulsar-cli-init"));

    public static DemoPulsarClientInitUltimate getInstance() {
        return INSTANCE;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        try {
            pulsarClient = PulsarClient.builder()
                    .serviceUrl(PulsarConstant.SERVICE_HTTP_URL)
                    .ioThreads(4)
                    .listenerThreads(10)
                    .memoryLimit(64, SizeUnit.MEGA_BYTES)
                    .operationTimeout(5, TimeUnit.SECONDS)
                    .connectionTimeout(15, TimeUnit.SECONDS)
                    .build();
            log.info("pulsar client init success");
            this.executorService.shutdown();
        } catch (Exception e) {
            log.error("init pulsar error, exception is ", e);
        }
    }

    public PulsarClient getPulsarClient() {
        return pulsarClient;
    }
}
```
The commercial-level Pulsar client adds five configuration parameters:
- ioThreads: the number of Netty IO threads, which are responsible for network IO operations. If traffic is heavy, increase ioThreads;
- listenerThreads: the threads that invoke the callbacks of consumers started in listener mode. It is recommended to set this to the number of partitions the client is responsible for;
- memoryLimit: currently limits the maximum memory available to Pulsar producers. It effectively prevents a producer-side message backlog from causing a Java OOM in scenarios such as network interruptions or Pulsar failures;
- operationTimeout: the timeout for certain metadata operations. The Pulsar default of 30s is rather conservative and can be tuned down according to your network conditions and processing performance;
- connectionTimeout: the timeout for connecting to Pulsar. The same tuning principle as above applies.
Client advanced parameters (memory allocation related)
We can also control the Pulsar client's memory allocation by passing Java system properties. The important ones are:
- pulsar.allocator.pooled: if true, an off-heap memory pool is used; if false, memory is allocated on the heap without a pool. The default is the efficient off-heap memory pool;
- pulsar.allocator.exit_on_oom: whether to shut down the JVM when memory overflows. The default is false;
- pulsar.allocator.out_of_memory_policy: introduced in https://github.com/apache/pul... and not yet included in a formal release; it configures the behavior when off-heap memory is insufficient. The options are FallbackToHeap and ThrowException, and the default is FallbackToHeap. If you do not want message serialization to affect heap memory allocation, configure ThrowException.
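The three properties above are JVM system properties rather than client-builder options. Below is a minimal sketch of setting them (the class name DemoPulsarAllocatorProperties and the values are only examples, and per the article the third property may not yet exist in a released version). In practice they are usually passed as -D flags on the JVM command line; setting them programmatically only takes effect if it runs before any Pulsar client class is loaded.

```java
public class DemoPulsarAllocatorProperties {

    public static void main(String[] args) throws Exception {
        // Usually passed as JVM flags, e.g.
        //   -Dpulsar.allocator.pooled=true
        //   -Dpulsar.allocator.exit_on_oom=false
        //   -Dpulsar.allocator.out_of_memory_policy=ThrowException
        // Programmatic setting only works before the Pulsar allocator class is loaded.
        System.setProperty("pulsar.allocator.pooled", "true");
        System.setProperty("pulsar.allocator.exit_on_oom", "false");
        System.setProperty("pulsar.allocator.out_of_memory_policy", "ThrowException");

        // build the Pulsar client only after the properties have been set
        DemoPulsarClientInit.getInstance().init();
    }
}
```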
producer
Important parameters for initializing a producer
maxPendingMessages
The size of the producer's pending-send queue. Configure it reasonably according to the actual business topics to avoid OOM during network interruptions or Pulsar failures. It is recommended to configure either this or the client-level memoryLimit, but not both.
messageRoutingMode
The message routing mode. The default is RoundRobinPartition; choose according to business requirements. If ordering is required, SinglePartition is usually chosen so that messages with the same key are sent to the same partition.
autoUpdatePartitions
Automatically updates partition information. If the topic's partition information will not change, it does not need to be enabled, which reduces load on the cluster.
Batch-related parameters
Because batch sending is implemented with scheduled tasks under the hood, it is not recommended to enable batching on topics with low message volume. In particular, a large number of short-interval scheduled tasks will cause the CPU usage of the Netty threads to soar.
- enableBatching: whether to enable batch sending;
- batchingMaxMessages: the maximum number of messages sent in one batch;
- batchingMaxPublishDelay: the interval of the scheduled batch-send task (the parameters above are combined in the configuration sketch below).
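For reference, a minimal sketch that combines the producer parameters above. The class name, topic name, and concrete values are placeholders rather than recommendations, and batching should only be enabled if the topic's message rate justifies it, as discussed above.

```java
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

import java.util.concurrent.TimeUnit;

public class DemoProducerConfigSketch {

    public Producer<byte[]> createProducer(PulsarClient client) throws Exception {
        return client.newProducer()
                // placeholder topic name
                .topic("persistent://public/default/demo-topic")
                // bound the pending-send queue to avoid OOM (or rely on the client-level memoryLimit)
                .maxPendingMessages(1000)
                // SinglePartition routes messages with the same key to the same partition
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                // disable if the topic's partition count never changes
                .autoUpdatePartitions(true)
                // enable batching only for topics with enough traffic
                .enableBatching(true)
                .batchingMaxMessages(1000)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .create();
    }
}
```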
Static producer initialization
A static producer is one that is not started or closed as business traffic changes. Initialize it right after the microservice has started and the client has been initialized. An example follows:
One thread per producer, applicable to scenarios with a small number of producers
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarStaticProducerInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1,
            new DefaultThreadFactory("pulsar-producer-init"));

    private final String topic;

    private volatile Producer<byte[]> producer;

    public DemoPulsarStaticProducerInit(String topic) {
        this.topic = topic;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        try {
            final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
            producer = instance.getPulsarClient().newProducer().topic(topic).create();
            // stop the retry task once the producer has been created successfully
            this.executorService.shutdown();
        } catch (Exception e) {
            log.error("init pulsar producer error, exception is ", e);
        }
    }

    public Producer<byte[]> getProducer() {
        return producer;
    }
}
```
One thread for multiple producers, applicable to scenarios with a large number of producers
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarStaticProducersInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1,
            new DefaultThreadFactory("pulsar-producer-init"));

    private final CopyOnWriteArrayList<Producer<byte[]>> producers = new CopyOnWriteArrayList<>();

    private int initIndex;

    private final List<String> topics;

    public DemoPulsarStaticProducersInit(List<String> topics) {
        this.topics = topics;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        if (initIndex == topics.size()) {
            // all producers have been created, stop the retry task
            executorService.shutdown();
            return;
        }
        for (; initIndex < topics.size(); initIndex++) {
            try {
                final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
                final Producer<byte[]> producer = instance.getPulsarClient().newProducer()
                        .topic(topics.get(initIndex)).create();
                producers.add(producer);
            } catch (Exception e) {
                log.error("init pulsar producer error, exception is ", e);
                break;
            }
        }
    }

    public CopyOnWriteArrayList<Producer<byte[]>> getProducers() {
        return producers;
    }
}
```
Example of dynamically created and destroyed producers
Some businesses also need producers to be started or destroyed dynamically, for example when receiving data from vehicles on the road and sending it to a topic dedicated to each vehicle. We do not want all producers to stay resident in memory, which would consume a large amount of memory, so we can manage the producer life cycle with something like an LRU cache.
```java
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarDynamicProducerInit {

    /**
     * topic -- producer
     */
    private AsyncLoadingCache<String, Producer<byte[]>> producerCache;

    public DemoPulsarDynamicProducerInit() {
        this.producerCache = Caffeine.newBuilder()
                .expireAfterAccess(600, TimeUnit.SECONDS)
                .maximumSize(3000)
                .removalListener((RemovalListener<String, Producer<byte[]>>) (topic, value, cause) -> {
                    log.info("topic {} cache removed, because of {}", topic, cause);
                    try {
                        value.close();
                    } catch (Exception e) {
                        log.error("close failed, ", e);
                    }
                })
                .buildAsync(new AsyncCacheLoader<>() {
                    @Override
                    public CompletableFuture<Producer<byte[]>> asyncLoad(String topic, Executor executor) {
                        return acquireFuture(topic);
                    }

                    @Override
                    public CompletableFuture<Producer<byte[]>> asyncReload(String topic, Producer<byte[]> oldValue,
                                                                           Executor executor) {
                        return acquireFuture(topic);
                    }
                });
    }

    private CompletableFuture<Producer<byte[]>> acquireFuture(String topic) {
        CompletableFuture<Producer<byte[]>> future = new CompletableFuture<>();
        try {
            ProducerBuilder<byte[]> builder = DemoPulsarClientInit.getInstance().getPulsarClient()
                    .newProducer().enableBatching(true);
            final Producer<byte[]> producer = builder.topic(topic).create();
            future.complete(producer);
        } catch (Exception e) {
            log.error("create producer exception ", e);
            future.completeExceptionally(e);
        }
        return future;
    }
}
```
In this mode, processing can continue gracefully based on the returned CompletableFuture<Producer<byte[]>>.
Sending when message loss is acceptable
```java
final CompletableFuture<Producer<byte[]>> cacheFuture = producerCache.get(topic);
cacheFuture.whenComplete((producer, e) -> {
    if (e != null) {
        log.error("create pulsar client exception ", e);
        return;
    }
    try {
        producer.sendAsync(msg).whenComplete(((messageId, throwable) -> {
            if (throwable != null) {
                log.error("send producer msg error ", throwable);
                return;
            }
            log.info("topic {} send success, msg id is {}", topic, messageId);
        }));
    } catch (Exception ex) {
        log.error("send async failed ", ex);
    }
});
```
The callbacks above correctly handle client-creation failures and send failures. In production, however, Pulsar is not always available: sends can fail because of virtual machine faults or Pulsar service upgrades. If you want to ensure the message is eventually sent, the send must be retried.
Sending that tolerates message loss only in extreme scenarios
```java
final Timer timer = new HashedWheelTimer();

/**
 * Maximum number of backoff retries; with an initial interval of 1s this tolerates
 * roughly 1+2+4+8+16+32+64=127s of unavailability.
 */
private static final int MAX_RETRY_TIMES = 7;

private void sendMsgWithRetry(String topic, byte[] msg, int retryTimes) {
    final CompletableFuture<Producer<byte[]>> cacheFuture = producerCache.get(topic);
    cacheFuture.whenComplete((producer, e) -> {
        if (e != null) {
            log.error("create pulsar client exception ", e);
            return;
        }
        try {
            producer.sendAsync(msg).whenComplete(((messageId, throwable) -> {
                if (throwable == null) {
                    log.info("topic {} send success, msg id is {}", topic, messageId);
                    return;
                }
                log.error("send producer msg error ", throwable);
                if (retryTimes < MAX_RETRY_TIMES) {
                    // exponential backoff: wait 1 << retryTimes seconds before the next attempt
                    timer.newTimeout(timeout -> DemoPulsarDynamicProducerInit.this
                                    .sendMsgWithRetry(topic, msg, retryTimes + 1),
                            1 << retryTimes, TimeUnit.SECONDS);
                }
                // once the retries are exhausted, the failure should be reported upstream
            }));
        } catch (Exception ex) {
            log.error("send async failed ", ex);
        }
    });
}
```
Here, a backoff retry is performed after a send fails, which can tolerate a Pulsar server outage for some time. For example, with 7 backoff retries and an initial interval of 1s, an outage of 1+2+4+8+16+32+64=127s can be tolerated, which is enough for most production environments.
In theory, an outage can still exceed 127s, so in extreme scenarios the failure must still be returned upstream.
Strict order preservation at the producer partition level
The key to strict ordering on the producer side: send only one message at a time, and send the next message only after the previous one has been confirmed successful. This can be implemented in either synchronous or asynchronous mode:
- In synchronous mode, the key point is to send the message in a loop until the previous message succeeds, and only then start the next one;
- In asynchronous mode, the key point is to observe the future of the previous send: keep retrying on failure, and start the next send only on success.
It is worth mentioning that in this mode different partitions can still be processed in parallel, for example with an OrderedExecutor or one thread per partition.
Example of the synchronous mode:
```java
/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarProducerSyncStrictlyOrdered {

    Producer<byte[]> producer;

    public void sendMsg(byte[] msg) {
        while (true) {
            try {
                final MessageId messageId = producer.send(msg);
                log.info("topic {} send success, msg id is {}", producer.getTopic(), messageId);
                break;
            } catch (Exception e) {
                log.error("exception is ", e);
            }
        }
    }
}
```
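The asynchronous mode is not shown in the original examples; the following is a minimal sketch of the approach described above (the class name DemoPulsarProducerAsyncStrictlyOrdered is hypothetical). The caller must send the next message only after the returned future completes, and a failed send is retried until it succeeds.

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;

import java.util.concurrent.CompletableFuture;

@Slf4j
public class DemoPulsarProducerAsyncStrictlyOrdered {

    Producer<byte[]> producer;

    /**
     * Returns a future that completes only after the message has been sent successfully.
     * The caller should chain the next send on this future to preserve ordering.
     */
    public CompletableFuture<MessageId> sendMsg(byte[] msg) {
        final CompletableFuture<MessageId> result = new CompletableFuture<>();
        sendUntilSuccess(msg, result);
        return result;
    }

    private void sendUntilSuccess(byte[] msg, CompletableFuture<MessageId> result) {
        producer.sendAsync(msg).whenComplete((messageId, throwable) -> {
            if (throwable == null) {
                log.info("topic {} send success, msg id is {}", producer.getTopic(), messageId);
                result.complete(messageId);
                return;
            }
            log.error("send failed, retrying ", throwable);
            // keep retrying the same message; the next message is not sent until this one succeeds
            sendUntilSuccess(msg, result);
        });
    }
}
```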
consumer
Important parameters for initializing a consumer
receiverQueueSize
Note: if messages cannot be processed in time, the receive queue will back up in memory. Configure its size reasonably to prevent OOM.
autoUpdatePartitions
Automatically updates partition information. If the topic's partition information will not change, it does not need to be enabled, which reduces load on the cluster.
subscriptionType
The subscription type is determined according to business requirements.
subscriptionInitialPosition
The position from which the subscription starts consuming, earliest or latest, according to business needs.
messageListener
Consume in listener mode: you only need to provide a callback, and there is no need to actively call receive() to pull messages. Unless there is a special requirement, listener mode is recommended.
ackTimeout
When the server pushes a message but the consumer does not reply with an ack in time, the message is pushed to the consumer again after ackTimeout, i.e., the redeliver mechanism.
Note that the redeliver mechanism should only be used to retry recoverable errors. For example, if the code decodes the message and decoding fails, redelivery is not appropriate, because the client will keep retrying forever.
If you are not sure, you can also configure a dead letter queue through deadLetterPolicy (below) to prevent messages from being retried indefinitely.
negativeAckRedeliveryDelay
The delay before the redeliver mechanism is triggered when the client calls negativeAcknowledge. The same cautions as for ackTimeout apply.
Note that it is not recommended to use ackTimeout and negativeAckRedeliveryDelay at the same time. Generally negativeAck is preferred, because it gives the user more flexible control; once ackTimeout is configured unreasonably, messages may be retried unnecessarily when processing time is uncertain.
deadLetterPolicy
Configures the maximum number of redeliveries and the dead letter topic.
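For reference, a minimal sketch that combines the consumer parameters above. The class name, topic, subscription name, and concrete values are placeholders, and DemoMessageListener stands for the listener implementations shown later in this article.

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.TimeUnit;

public class DemoConsumerConfigSketch {

    public Consumer<byte[]> createConsumer(PulsarClient client) throws Exception {
        return client.newConsumer()
                // placeholder topic and subscription names
                .topic("persistent://public/default/demo-topic")
                .subscriptionName("demo-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                // bound the in-memory receive queue to avoid OOM
                .receiverQueueSize(500)
                // disable if the topic's partition count never changes
                .autoUpdatePartitions(true)
                // prefer negativeAck over ackTimeout, as discussed above
                .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
                // cap redeliveries and route poison messages to a dead letter topic
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(3)
                        .deadLetterTopic("persistent://public/default/demo-topic-dlq")
                        .build())
                // listener mode: the callback is invoked for each message
                .messageListener(new DemoMessageListener<>())
                .subscribe();
    }
}
```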
Consumer initialization strategy
A consumer can only start working after it has been created successfully. Unlike a producer, which can return failures upstream, a consumer should keep retrying creation. Sample code follows. Note: the relationship between consumers and topics can be one-to-many; a consumer can subscribe to multiple topics.
One thread per consumer, applicable to scenarios with a small number of consumers
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarConsumerInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1,
            new DefaultThreadFactory("pulsar-consumer-init"));

    private final String topic;

    private volatile Consumer<byte[]> consumer;

    public DemoPulsarConsumerInit(String topic) {
        this.topic = topic;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        try {
            final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
            consumer = instance.getPulsarClient().newConsumer()
                    .topic(topic)
                    // a subscription name is required; "demo-subscription" is a placeholder
                    .subscriptionName("demo-subscription")
                    .messageListener(new DemoMessageListener<>())
                    .subscribe();
            // stop the retry task once the consumer has been created successfully
            this.executorService.shutdown();
        } catch (Exception e) {
            log.error("init pulsar consumer error, exception is ", e);
        }
    }

    public Consumer<byte[]> getConsumer() {
        return consumer;
    }
}
```
One thread for multiple consumers, applicable to scenarios with a large number of consumers
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarConsumersInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1,
            new DefaultThreadFactory("pulsar-consumer-init"));

    private final CopyOnWriteArrayList<Consumer<byte[]>> consumers = new CopyOnWriteArrayList<>();

    private int initIndex;

    private final List<String> topics;

    public DemoPulsarConsumersInit(List<String> topics) {
        this.topics = topics;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        if (initIndex == topics.size()) {
            // all consumers have been created, stop the retry task
            executorService.shutdown();
            return;
        }
        for (; initIndex < topics.size(); initIndex++) {
            try {
                final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
                final Consumer<byte[]> consumer = instance.getPulsarClient().newConsumer()
                        .topic(topics.get(initIndex))
                        // a subscription name is required; "demo-subscription" is a placeholder
                        .subscriptionName("demo-subscription")
                        .messageListener(new DemoMessageListener<>())
                        .subscribe();
                consumers.add(consumer);
            } catch (Exception e) {
                log.error("init pulsar consumer error, exception is ", e);
                break;
            }
        }
    }

    public CopyOnWriteArrayList<Consumer<byte[]>> getConsumers() {
        return consumers;
    }
}
```
Consumer at-least-once semantics
Use manual acknowledgement: ack only after processing succeeds. If processing fails, retry on your own or rely on the negativeAck mechanism to have the message redelivered.
Example of the synchronous mode
Note that if message processing times vary widely, synchronous processing may delay messages that could otherwise have been processed quickly.
```java
/**
 * @author hezhangjian
 */
@Slf4j
public class DemoMessageListenerSyncAtLeastOnce<T> implements MessageListener<T> {

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        try {
            final boolean result = syncPayload(msg.getData());
            if (result) {
                consumer.acknowledgeAsync(msg);
            } else {
                consumer.negativeAcknowledge(msg);
            }
        } catch (Exception e) {
            // Business methods may throw exceptions
            log.error("exception is ", e);
            consumer.negativeAcknowledge(msg);
        }
    }

    /**
     * Business method for simulating synchronous execution
     *
     * @param msg Message body content
     * @return
     */
    private boolean syncPayload(byte[] msg) {
        return System.currentTimeMillis() % 2 == 0;
    }
}
```
Asynchronous mode example
With asynchronous processing, memory limits must be considered, because asynchronous consumption can pull messages from the broker very quickly without being throttled by business processing, so a large number of messages may be in flight. If the subscription mode is Shared or KeyShared, the number of unacknowledged messages can be restricted through the maxUnackedMessages limit. In Failover mode, you can instead block inside the listener when the consumer is too busy, for example by checking the number of in-flight messages, so that no further messages are pulled and processed.
```java
/**
 * @author hezhangjian
 */
@Slf4j
public class DemoMessageListenerAsyncAtLeastOnce<T> implements MessageListener<T> {

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        try {
            asyncPayload(msg.getData(), new DemoSendCallback() {
                @Override
                public void callback(Exception e) {
                    if (e == null) {
                        consumer.acknowledgeAsync(msg);
                    } else {
                        log.error("exception is ", e);
                        consumer.negativeAcknowledge(msg);
                    }
                }
            });
        } catch (Exception e) {
            // Business methods may throw exceptions
            consumer.negativeAcknowledge(msg);
        }
    }

    /**
     * Business method simulating asynchronous execution
     *
     * @param msg Message body
     * @param demoSendCallback callback of the asynchronous function
     */
    private void asyncPayload(byte[] msg, DemoSendCallback demoSendCallback) {
        if (System.currentTimeMillis() % 2 == 0) {
            demoSendCallback.callback(null);
        } else {
            demoSendCallback.callback(new Exception("exception"));
        }
    }
}
```
Blocking the pull of new messages when the consumer is busy
When the consumer cannot keep up, block in the listener method to stop processing, so that too many messages do not accumulate in the microservice and cause OOM. This can be controlled with a RateLimiter or a Semaphore, as in the sketch below.
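A minimal sketch of this blocking approach (the class name DemoMessageListenerAsyncBlockWhenBusy and the permit count of 500 are hypothetical; DemoSendCallback and asyncPayload are reused from the previous example): a Semaphore bounds the number of in-flight messages, and received() blocks on acquire() when the bound is reached, which stops the listener from processing further messages until earlier ones complete. A RateLimiter could be used at the same point instead.

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

import java.util.concurrent.Semaphore;

@Slf4j
public class DemoMessageListenerAsyncBlockWhenBusy<T> implements MessageListener<T> {

    /**
     * Hypothetical upper bound on the number of in-flight messages
     */
    private final Semaphore inflightMsgLimit = new Semaphore(500);

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        try {
            // blocks the listener thread when too many messages are in flight
            inflightMsgLimit.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            consumer.negativeAcknowledge(msg);
            return;
        }
        try {
            asyncPayload(msg.getData(), new DemoSendCallback() {
                @Override
                public void callback(Exception ex) {
                    inflightMsgLimit.release();
                    if (ex == null) {
                        consumer.acknowledgeAsync(msg);
                    } else {
                        log.error("exception is ", ex);
                        consumer.negativeAcknowledge(msg);
                    }
                }
            });
        } catch (Exception e) {
            // Business methods may throw exceptions
            inflightMsgLimit.release();
            consumer.negativeAcknowledge(msg);
        }
    }

    /**
     * Business method simulating asynchronous execution
     */
    private void asyncPayload(byte[] msg, DemoSendCallback demoSendCallback) {
        if (System.currentTimeMillis() % 2 == 0) {
            demoSendCallback.callback(null);
        } else {
            demoSendCallback.callback(new Exception("exception"));
        }
    }
}
```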
Consumer strict order preservation at the partition level
To strictly preserve order at the partition level, the messages of a single partition must be processed serially. If processing a message fails, the other messages of that partition cannot be processed until that message has been retried successfully. An example follows:
```java
/**
 * @author hezhangjian
 */
@Slf4j
public class DemoMessageListenerSyncAtLeastOnceStrictlyOrdered<T> implements MessageListener<T> {

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        retryUntilSuccess(msg.getData());
        consumer.acknowledgeAsync(msg);
    }

    private void retryUntilSuccess(byte[] msg) {
        while (true) {
            try {
                final boolean result = syncPayload(msg);
                if (result) {
                    break;
                }
            } catch (Exception e) {
                log.error("exception is ", e);
            }
        }
    }

    /**
     * Business method for simulating synchronous execution
     *
     * @param msg Message body content
     * @return
     */
    private boolean syncPayload(byte[] msg) {
        return System.currentTimeMillis() % 2 == 0;
    }
}
```
Acknowledgements
Thanks to Penghui and Luo Tian for reviewing the draft.
About the author
He Zhangjian is an Apache Pulsar contributor, a graduate of Xidian University, and a senior engineer at Huawei Cloud IoT, where Pulsar has been commercialized at large scale. For more information, visit his Jianshu blog.