What can message middleware do
Message middleware primarily solves the problem of message passing between distributed systems. It shields the differences between platforms and protocols and enables collaboration between applications. Take the registration feature of an e-commerce platform as a simple example. When a user registers, the service does not just insert a row into the database; it also needs to send an activation email, hand out a newcomer red envelope or points, send a marketing text message, and so on. Suppose each of these operations takes 1s; the whole registration flow then takes 4s to respond to the user.
Looking at this registration flow, however, each sub-operation is relatively independent. Based on domain boundaries, sending the activation email, sending the marketing text message, and granting points and red envelopes all belong to different sub-domains, so these sub-operations can be executed asynchronously, similar to multi-threaded parallel processing.
How do we achieve this asynchrony? Can it be done with multithreading? Multithreading can certainly make the calls asynchronous, but it cannot satisfy requirements such as message persistence and message retransmission, so some open-source middleware is needed, and a distributed message queue is a very good fit. After introducing a distributed message queue, the architecture becomes the one in the figure below (the asynchronous message queue scenario). The queue greatly improves the processing efficiency of the program and also decouples the individual modules, as sketched below.
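Below is a minimal sketch of this idea (not the author's code): the registration handler performs only the database insert and then publishes a single event; the topic name user.registered, the key choice, and the surrounding wiring are illustrative assumptions.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch: registration publishes an event instead of performing
// all four sub-operations inline; email/SMS/points services consume it later.
public class RegistrationService {
    private final KafkaProducer<String, String> producer;

    public RegistrationService(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    public void register(String userId) {
        // 1. persist the user (the only step that must happen synchronously;
        //    the repository call is omitted here)
        // 2. publish the event and return immediately
        producer.send(new ProducerRecord<>("user.registered", userId, userId));
    }
}
```

With this structure the user gets a response after roughly 1s instead of 4s, because the other three operations run asynchronously in their own services.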
This is the first scenario a distributed message queue solves: asynchronous processing.
Let's look at another scenario: traffic shaping with a distributed message queue. In the seckill (flash sale) scenario of an e-commerce platform, traffic can be enormous, and a message queue alleviates the pressure of such traffic spikes very well:
- Requests submitted by users are first written to the message queue. The queue has a bounded length; if it already holds the specified number of requests, new requests are discarded directly.
- The core seckill business then takes messages off the queue and processes them; the processing rate depends entirely on the consumer's own throughput. A minimal in-process sketch of this bounded-queue idea follows the list.
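Here is that sketch, using a plain BlockingQueue to stand in for the distributed queue (the capacity and names are illustrative, not production values):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative only: offer() returns false as soon as the queue is full, so
// excess requests are rejected instead of overwhelming the order backend.
public class SeckillBuffer {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10_000);

    /** Returns false when the queue is full and the request is discarded. */
    public boolean submit(String orderRequest) {
        return queue.offer(orderRequest);
    }

    /** A consumer thread drains requests at whatever rate the backend sustains. */
    public void consumeLoop() throws InterruptedException {
        while (true) {
            String request = queue.take();
            System.out.println("processing " + request); // business logic goes here
        }
    }
}
```

In production a distributed message queue plays this role, so the buffer survives restarts and is shared across machines.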
Of course, message middleware has many more application scenarios. For example, in a weakly consistent transaction model, a distributed message queue with best-effort notification can be used to achieve eventual consistency of data.
Communication using kafka in Java
Dependency
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
```
sender code
```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;

    public Producer(String topic) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "practice-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
    }

    @Override
    public void run() {
        int num = 0;
        while (num < 50) {
            String msg = "practice test message:" + num;
            try {
                // send() returns a Future; get() blocks, making this a synchronous send
                producer.send(new ProducerRecord<Integer, String>(topic, msg)).get();
                TimeUnit.SECONDS.sleep(2);
                num++;
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Producer("test").start();
    }
}
```
consumer code
```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;

    public Consumer(String topic) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "practice-consumer");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // commit offsets automatically
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // auto-commit interval
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // for a new group.id, start consuming from the earliest message
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new KafkaConsumer<>(properties);
        this.topic = topic;
    }

    @Override
    public void run() {
        // subscribe once, outside the poll loop
        consumer.subscribe(Collections.singleton(this.topic));
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record ->
                    System.out.println(record.key() + " " + record.value() + " -> offset:" + record.offset()));
        }
    }

    public static void main(String[] args) {
        new Consumer("test").start();
    }
}
```
Asynchronous send
Kafka supports both synchronous and asynchronous message sending. The case demonstrated above sends messages synchronously: a synchronous send blocks until the result comes back, while an asynchronous send does not wait.
Essentially, Kafka always sends messages to the broker asynchronously: rather than transmitting each message immediately, the producer puts it into a send buffer, and a background thread continuously takes messages out of that buffer and sends them, triggering the callback once the send succeeds. The client accumulates a certain number of messages and assembles them into a batch before sending; the trigger conditions are batch.size and linger.ms, discussed below.
So-called synchronous sending simply means waiting on the send result via future.get(), but this can seriously hurt send performance.
```java
public void run() {
    int num = 0;
    while (num < 50) {
        String msg = "practice test message:" + num;
        try {
            // no get(): pass a callback instead, so the send does not block
            producer.send(new ProducerRecord<>(topic, msg), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("callback: " + recordMetadata.offset() + "->" + recordMetadata.partition());
                }
            });
            TimeUnit.SECONDS.sleep(2);
            num++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
```
batch.size
When the producer sends multiple messages to the same partition on the broker, it submits them in batches to reduce the performance overhead of network requests. This parameter controls the batch size in bytes; the default is 16384 bytes (16 KB), meaning that once the accumulated messages for a batch reach batch.size, they are sent together.
linger.ms
By default, the producer aggregates all the requests collected between two sends and transmits them together to improve throughput. linger.ms adds a small delay before each request is sent to the broker, in order to aggregate more message requests into one. This is somewhat like the Nagle algorithm in TCP, which reduces the transmission of large numbers of small packets by buffering them briefly before sending.
batch.size and linger.ms are key parameters for Kafka performance tuning. Many readers notice that the two seem to serve the same purpose and wonder which one takes effect if both are configured. In fact, when both are set, a request is sent to the broker as soon as either condition is met, as the fragment below shows.
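A producer-configuration fragment showing the two parameters together (the values are illustrative, not recommendations; `props` is the producer's Properties object as in the earlier examples):

```java
// a batch is sent as soon as EITHER condition below is met:
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32 KB accumulated (default 16384)
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);     // or 20 ms since the first record (default 0)
```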
Some basic configuration analysis
group.id
The consumer group is a scalable and fault-tolerant consumer mechanism provided by Kafka. A group contains multiple consumers or consumer instances that share a common ID, the group ID. All consumers in the group coordinate to consume all partitions of the subscribed topics, and each partition can only be consumed by one consumer within the same group. As shown in the figure below, with three consumers belonging to two different groups, both groups can consume the messages of the topic firstTopic at the same time; in this arrangement firstTopic behaves like the topic concept in ActiveMQ. If instead, as in the figure on the right, all three consumers belong to the same group, firstTopic behaves like a queue.
enable.auto.commit
When enabled, the consumer's offsets are committed automatically after messages are consumed; only once an offset is committed will those messages not be delivered again. auto.commit.interval.ms controls the frequency of automatic commits. Alternatively, offsets can be committed manually via consumer.commitSync(), as the sketch below shows.
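A manual-commit sketch (it assumes the consumer was created with enable.auto.commit=false; process() is a hypothetical business method, not a Kafka API):

```java
consumer.subscribe(Collections.singleton(topic));
while (true) {
    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<Integer, String> record : records) {
        process(record); // hypothetical business handling
    }
    // commit only after processing: a crash before this line causes
    // redelivery (at-least-once) rather than message loss
    consumer.commitSync();
}
```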
auto.offset.reset
This parameter applies to consumers with a new group.id: when such a consumer starts consuming the specified topic, the value of this parameter determines where consumption begins.
With auto.offset.reset=latest, a new consumer group starts from the end of the log and only consumes messages produced after it subscribes.
With auto.offset.reset=earliest, a new consumer group starts consuming from the earliest message in the topic.
With auto.offset.reset=none, a new consumer throws an exception directly, because no committed offset exists for the group.
max.poll.records
This setting limits the number of records returned by a single call to poll(), making it easier to predict the maximum amount of work handled per poll interval. Lowering this value shortens each processing round and thus the effective poll interval; a configuration fragment follows.
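For example, as a fragment of the consumer configuration shown earlier (the value 100 is illustrative; the default is 500):

```java
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
```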
Spring Boot + Kafka
There is a compatibility table between Spring Boot versions and Kafka versions; importing a mismatched version causes ClassNotFound problems. For details, see spring.io/projects/sp...
jar package dependencies
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
```
KafkaProducer
```java
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {
        kafkaTemplate.send("test", "msgKey", "msgData");
    }
}
```
KafkaConsumer
```java
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"test"})
    public void listener(ConsumerRecord<String, String> record) {
        Optional<?> msg = Optional.ofNullable(record.value());
        if (msg.isPresent()) {
            System.out.println(msg.get());
        }
    }
}
```
application configuration
```properties
spring.kafka.bootstrap-servers=192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```
test
```java
public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);
    KafkaProducer kafkaProducer = context.getBean(KafkaProducer.class);
    for (int i = 0; i < 3; i++) {
        kafkaProducer.send();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
```
Principle analysis
From the demonstrations so far, as long as Kafka is not used at large scale there are basically no big problems. At scale, however, operating Kafka itself becomes very challenging, and tuning each parameter becomes very important. Kuaishou (Kwai), for example, is understood to run Kafka at considerable scale; at a developer conference in 2019 they mentioned roughly 2,000 machines in total, more than 30 clusters, 12,000 topics, about 200,000 topic partitions (TPs) in total, more than 4 trillion messages processed per day, and a peak value exceeding 100 million.
Using a technology is the easy part; to grasp its core value you need to understand some of the principles behind it. When designing this course I thought for a long time about where to start, and finally chose to begin with the most basic principles of message communication.
About Topic and Partition
Topic
In Kafka, a topic is a logical concept for storing messages and can be thought of as a collection of messages. Every message sent to the Kafka cluster belongs to a category, its topic. Physically, messages of different topics are stored separately; each topic can have multiple producers sending messages to it and multiple consumers consuming messages from it.
Partition
Each topic can be divided into multiple partitions (at least one per topic), and different partitions of the same topic contain different messages. As each message is appended to a partition it is assigned an offset, the message's unique sequence number within that partition; Kafka uses the offset to guarantee message order within the partition. Offsets are not ordered across partitions, that is, Kafka only guarantees that messages within the same partition are ordered.
In the figure below, for the topic named test, three partitions are made, namely p0, p1, and p2.
When a message is sent to the broker, a partition rule decides which partition it is stored in. If the partition rule is set properly, messages are distributed evenly across the partitions, similar to database sharding, where data is split into shards.
Storage of Topic & Partition
Partitions are stored as directories in the file system. For example, if a topic named firstTopic is created with 3 partitions, Kafka's data directory (/tmp/kafka-logs) will contain 3 directories, firstTopic-0 through firstTopic-2, following the naming rule <topic_name>-<partition_id>.

```shell
sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 1 --partitions 3 --topic firstTopic
```
About message distribution
kafka message distribution strategy
The message is the most basic data unit in Kafka; it consists of two parts, a key and a value. When sending a message we can specify the key, and the producer uses the key together with the partition mechanism to decide which partition the message is sent to and stored in. The producer's partition mechanism can be extended as needed.
code demo
Custom Partitioner
```java
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class MyPartitioner implements Partitioner {
    private Random random = new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // get all partition information for the specified topic from the cluster
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int numOfPartition = partitionInfos.size();
        int partitionNum;
        if (key == null) {
            // no key set: assign a partition at random
            partitionNum = random.nextInt(numOfPartition);
        } else {
            // key set: hash the key modulo the partition count
            partitionNum = Math.abs(key.hashCode()) % numOfPartition;
        }
        System.out.println("key->" + key + ",value->" + value + "->send to partition:" + partitionNum);
        return partitionNum;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
```
Sender-side code registering the custom partitioner
```java
public KafkaProducerDemo(String topic, boolean isAsync) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
    properties.put(ProducerConfig.ACKS_CONFIG, "-1");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.IntegerSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    // register the custom partitioner
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.gupaoedu.kafka.MyPartitioner");
    producer = new KafkaProducer<Integer, String>(properties);
    this.topic = topic;
    this.isAsync = isAsync;
}
```
Default message distribution mechanism
By default, Kafka uses a hash-modulo partitioning algorithm on the key. If the key is null, the producer picks a partition at random and keeps sending to that same partition until the metadata is refreshed; the refresh interval is controlled by metadata.max.age.ms, which defaults to 5 minutes (300000 ms). The fragment below contrasts the two cases.
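A fragment reusing the producer from the earlier example (a KafkaProducer<Integer, String> sending to topic "test"):

```java
// no key: the configured partitioner decides the partition itself
producer.send(new ProducerRecord<>("test", "no key, partitioner picks the partition"));
// with key 1: hash(key) % numPartitions maps it to the same partition every time
producer.send(new ProducerRecord<>("test", 1, "keyed message"));
```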
We have not mentioned Metadata before. It can be understood simply as the mapping between topics/partitions and brokers: for each partition of each topic, the client needs to know the corresponding broker list, which broker is the leader, and which are the followers. This information is stored in the Metadata class.
How does the consumer consume the specified partition
With the following code, a consumer consumes only partition 0 of the specified topic; data from other partitions will not be received.
```java
// when consuming a specified partition, there is no need to subscribe
// kafkaConsumer.subscribe(Collections.singletonList(topic));
// consume the specified partition instead
TopicPartition topicPartition = new TopicPartition(topic, 0);
kafkaConsumer.assign(Arrays.asList(topicPartition));
```
Message consumption principle
kafka message consumption principle demonstration
In real production, each topic has multiple partitions. Multiple partitions bring two benefits: on the one hand, data on the brokers is sharded, which reduces the message volume per broker and improves I/O performance; on the other hand, consumption capacity improves, because a topic is generally consumed by multiple consumers, which is the consumer-side load-balancing mechanism. What we will look at next is how consumers consume messages when there are multiple partitions and multiple consumers.
Kafka has the concept of a consumer group: consumers sharing the same group.id belong to one consumer group, and all consumers in the group coordinate to consume all partitions of the subscribed topic. Each partition can only be consumed by one consumer within the same group. So how do the consumers in a group decide which partition's data each of them consumes? As shown in the figure below, with 3 partitions and 3 consumers, which consumer gets which partition?
In the figure above, the three consumers consume the three partitions of topic test, that is, each consumer consumes exactly one partition. A small sketch of running this setup follows.
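The sketch reuses the Consumer class defined earlier (all three instances share the group.id practice-consumer configured there):

```java
public static void main(String[] args) {
    // with a 3-partition topic "test", after rebalancing each of the three
    // consumers in the group ends up owning exactly one partition
    for (int i = 0; i < 3; i++) {
        new Consumer("test").start();
    }
}
```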