Implementation principles of Kafka's distributed message communication

What can message middleware do

Message middleware primarily solves the problem of passing messages between distributed systems. It shields the differences between platforms and protocols and lets applications collaborate. As a very simple example, consider the registration feature of an e-commerce platform. When a user registers, it is not just a matter of inserting a row into the database: the service also needs to send an activation email, issue a newcomer red envelope or points, send a marketing text message, and so on. If each of these operations takes 1s, the whole registration flow takes 4s to respond to the user.

Looking at this registration flow, however, each sub-operation is relatively independent. Moreover, by domain, sending the activation email, sending the marketing text message, and issuing points and red envelopes all belong to different sub-domains, so these sub-operations can be executed asynchronously, much like multi-threaded parallel processing.
How do we achieve this asynchrony? Can it be done with multithreading? It can, but multithreading alone cannot satisfy requirements such as message persistence and message redelivery, so some open-source middleware is needed to solve this, and a distributed message queue is a very good fit. After introducing a distributed message queue, the architecture becomes the following (the figure below shows the asynchronous message-queue scenario). The distributed queue greatly improves the program's processing efficiency and also removes the coupling between the modules.

- This is the first scenario that a distributed message queue solves: asynchronous processing (a minimal sketch follows).
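
A minimal sketch of what the registration service could look like after the change. The names here are illustrative assumptions (a userDao, a "user-registered" topic, and a pre-built producer), not part of the original setup:

public void register(User user) {
    userDao.insert(user); // the only synchronous step (~1s); respond to the user right after
    // Publish an event instead of doing the remaining work inline; the email, SMS,
    // points and red-envelope services each consume "user-registered" asynchronously
    producer.send(new ProducerRecord<String, String>("user-registered", user.getId(), user.toJson()));
}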

Let's expand to another scenario: traffic shaping with a distributed message queue. For example, in the seckill (flash sale) scenario of an e-commerce platform, traffic is very high, and a message queue is a very good way to absorb the spike.

 

- Requests submitted by users are first written to the message queue. The queue has a bounded length; if that length is exceeded, new requests are discarded immediately (see the sketch after this list).
- The core seckill business then consumes the messages from the queue and processes them; the processing capacity here depends on the throughput of the consumer itself.
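
Kafka itself does not bound the length of a topic, so the "queue length" check above is usually enforced at the entry point before producing. A minimal sketch, assuming a hypothetical in-JVM admission counter that is decremented when the consuming side acknowledges processing:

private static final int MAX_PENDING = 10_000; // illustrative bound, not a Kafka setting
private final AtomicInteger pending = new AtomicInteger();

public boolean tryAccept(String orderRequest) {
    if (pending.incrementAndGet() > MAX_PENDING) {
        pending.decrementAndGet();
        return false; // "queue" is full: discard the request immediately
    }
    // Write the request into the message queue; the seckill service consumes it later
    producer.send(new ProducerRecord<String, String>("seckill-orders", orderRequest));
    return true;
}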

Of course, message middleware has many more application scenarios. For example, in a weakly consistent transaction model, a distributed message queue's best-effort notification can be used to achieve eventual consistency of data.

Using Kafka for communication in Java

Dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

Producer code

public class Producer extends Thread {
        private final KafkaProducer<Integer, String> producer;
        private final String topic;

        public Producer(String topic) {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
            properties.put(ProducerConfig.CLIENT_ID_CONFIG, "practice-producer");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producer = new KafkaProducer<Integer, String>(properties);
            this.topic = topic;
        }

        @Override
        public void run() {
            int num = 0;
            while (num < 50) {
                String msg = "pratice test message:" + num;
                try {
                    producer.send(new ProducerRecord<Integer, String>(topic, msg)).get();
                    TimeUnit.SECONDS.sleep(2);
                    num++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }

        public static void main(String[] args) {
            new Producer("test").start();
        }
    }

Consumer code

public class Consumer extends Thread {
        private final KafkaConsumer<Integer, String> consumer;
        private final String topic;

        public Consumer(String topic) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "practice-consumer");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//Set offset to commit automatically
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// Auto commit interval 
            properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//For the current groupid, the offset of the message is consumed from the earliest message 
            consumer = new KafkaConsumer<>(properties);
            this.topic = topic;
        }

        @Override
        public void run() {
            // Subscribe once, before the poll loop; resubscribing on every iteration is unnecessary
            consumer.subscribe(Collections.singleton(this.topic));
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(record -> {
                    System.out.println(record.key() + " " + record.value() + " -> offset:" + record.offset());
                });
            }
        }

        public static void main(String[] args) {
            new Consumer("test").start();
        }
    }

Asynchronous send

Kafka supports both synchronous and asynchronous sending of messages. In the case demonstrated above, we sent messages synchronously: a synchronous send blocks until the result is available, while an asynchronous send does not wait.

Essentially, Kafka always sends messages to the broker asynchronously: rather than transmitting each message immediately, the producer puts it into a send buffer, and a background thread continuously takes messages out of the buffer and sends them, triggering the callback once a send succeeds. The client accumulates a number of messages and assembles them into a batch before sending; the trigger conditions are the batch.size and linger.ms parameters described below.

A synchronous send is nothing more than waiting for the send result via future.get(), but this blocking seriously limits send performance.
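
For reference, a synchronous send is the same send() call followed by a blocking get() on the returned Future, exactly as in the first producer example:

// Block until the broker acknowledges the record (or the send fails)
RecordMetadata metadata = producer.send(new ProducerRecord<Integer, String>(topic, msg)).get();
System.out.println("sync send -> partition:" + metadata.partition() + ", offset:" + metadata.offset());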

public void run() {
        int num = 0;
        while (num < 50) {
            String msg = "pratice test message:" + num;
            try {
                producer.send(new ProducerRecord<>(topic, msg), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        System.out.println("callback: " + recordMetadata.offset() + "->" + recordMetadata.partition());
                    }
                });
                TimeUnit.SECONDS.sleep(2);
                num++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

batch.size

When the producer sends multiple messages to the same partition on the broker, the messages are submitted in batches to reduce the performance overhead of network requests. This parameter controls the batch size in bytes; the default is 16384 bytes (16KB), which means that once a batch of messages reaches batch.size, it is sent as a whole.

linger.ms

By default, the producer aggregates all the requests collected between two sends and sends them together to improve throughput. linger.ms adds a small delay to each request sent to the broker in order to aggregate more messages into it. This is a bit like the Nagle algorithm in TCP, which reduces the transmission of large numbers of small packets by waiting briefly to coalesce them.

batch.size and linger.ms are key parameters for Kafka performance tuning. Many people notice that the two seem to serve the same purpose and wonder, if both are configured, how they interact: in fact, when both are set, a request is sent to the broker as soon as either condition is met.
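
A minimal sketch of setting both on the producer from the earlier example (the values are illustrative, not recommendations):

// Accumulate up to 16KB per partition batch, but wait at most 5ms for a batch to fill;
// whichever condition is met first triggers the actual send to the broker
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);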

Some basic configuration analysis

group.id

The consumer group is a scalable, fault-tolerant consumer mechanism provided by Kafka. A group contains one or more consumers (consumer instances) that share a common ID, the group ID. All consumers in the group cooperate to consume all partitions of the subscribed topics, and each partition can only be consumed by one consumer within the same group. As shown in the figure below, three consumers belong to two different groups; for the topic firstTopic, the consumers of both groups can consume its messages at the same time, so firstTopic behaves like the topic concept in ActiveMQ. As shown in the figure on the right, if the three consumers all belong to the same group, then firstTopic behaves like a queue.
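
A minimal sketch of both arrangements, assuming a hypothetical variant of the earlier Consumer class that takes the group ID as a second constructor argument:

// Different group IDs: both consumers independently receive every message
// of firstTopic (topic / publish-subscribe semantics)
new Consumer("firstTopic", "group-a").start();
new Consumer("firstTopic", "group-b").start();

// Same group ID: the partitions of firstTopic are split between
// the members of group-a (queue semantics)
new Consumer("firstTopic", "group-a").start();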

enable.auto.commit

When enabled, offsets are committed automatically after the consumer consumes messages; once an offset is committed, those messages will not be delivered again. This works together with auto.commit.interval.ms, which controls the frequency of automatic commits. Alternatively, offsets can be committed manually via consumer.commitSync(), as in the sketch below.
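
A minimal sketch of the manual-commit variant of the earlier consumer, with auto commit disabled:

// In the constructor: disable auto commit so offsets advance only on explicit commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// In the poll loop: commit synchronously once the whole batch has been handled
while (true) {
    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(record -> System.out.println(record.value())); // business processing goes here
    consumer.commitSync();
}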

auto.offset.reset

This parameter applies to consumers with a new group ID: when a consumer whose group ID has no committed offset starts consuming the specified topic, the configured value determines where consumption begins.
- With auto.offset.reset=latest, the new consumer starts from the end of the log, i.e. it only sees messages produced after it joins.
- With auto.offset.reset=earliest, the new consumer starts from the earliest message in the topic.
- With auto.offset.reset=none, the new consumer throws an exception, because no prior offset exists for the group.

max.poll.records

This setting caps the number of records returned by a single call to poll(), which makes it easier to predict the maximum amount of work to be handled per poll interval. Lowering this value keeps each processing iteration short, so poll() is called again sooner.
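
For example, on the consumer from earlier (the value is illustrative):

// Return at most 100 records per poll(), bounding the work done per iteration
// so the next poll() happens again quickly
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);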

Spring Boot + Kafka

There is a compatibility table between Spring Boot versions and Kafka versions; if the dependency is not imported according to the matching version, version conflicts will lead to ClassNotFound problems. For details, see spring.io/projects/sp...

Maven dependency

<dependency> 
    <groupId>org.springframework.kafka</groupId> 
    <artifactId>spring-kafka</artifactId> 
    <version>2.2.0.RELEASE</version> 
</dependency>

KafkaProducer

@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {
        kafkaTemplate.send("test", "msgKey", "msgData");
    }
}

KafkaConsumer

@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"test"})
    public void listener(ConsumerRecord<String, String> record) {
        Optional<?> msg = Optional.ofNullable(record.value());
        if (msg.isPresent()) {
            System.out.println(msg.get());
        }
    }
}

application configuration

spring.kafka.bootstrap-servers=192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer 

spring.kafka.consumer.group-id=test-consumer-group 
spring.kafka.consumer.auto-offset-reset=earliest 
spring.kafka.consumer.enable-auto-commit=true 

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer 
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

test

public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);
        KafkaProducer kafkaProducer = context.getBean(KafkaProducer.class);
        for (int i = 0; i < 3; i++) {
            kafkaProducer.send();
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Principle analysis

From the demonstrations so far, as long as Kafka is not used at a large scale, there are basically no big problems. At scale, however, operating Kafka itself becomes very challenging, and tuning each parameter becomes very important. For reference, Kwai runs quite a large Kafka deployment: at a developer conference in 2019 they mentioned roughly 2000 machines in total, more than 30 clusters, 12000 topics, about 200000 topic partitions (TPs) in total, more than 4 trillion messages processed per day, with peaks above 100 million messages.

Using a technology is the easy part; to grasp its core value, you need to understand some of the principles behind it. When designing this course, I thought for a long time about where to start, and finally chose to begin with the most basic principles of message communication.

About Topic and Partition

Topic

In Kafka, a topic is a logical concept for storing messages; it can be thought of as a collection of messages. Every message sent to the Kafka cluster belongs to a category. Physically, the messages of different topics are stored separately. Each topic can have multiple producers sending messages to it and multiple consumers consuming messages from it.

Partition

Each topic can be divided into multiple partitions (every topic has at least one), and different partitions of the same topic contain different messages. When a message is appended to a partition, it is assigned an offset, which is the message's unique number within that partition; Kafka uses the offset to guarantee message order within the partition. Offsets are not ordered across partitions: Kafka only guarantees that messages within the same partition are ordered.

In the figure below, the topic named test has three partitions: p0, p1, and p2.
- When a message is sent to the broker, the partitioning rules decide which partition it is stored in. If the partitioning rules are set up properly, all messages are distributed evenly across the partitions; this is similar to database sharding, where the data is split into shards.

Storage of Topic&Partition

Partitions are stored as directories in the file system. For example, if a topic named firstTopic is created with 3 partitions, then Kafka's data directory (/tmp/kafka-logs) contains 3 directories, firstTopic-0 through firstTopic-2; the naming rule is <topic_name>-<partition_id>.

sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 1 --partitions 3 --topic firstTopic

About message distribution

kafka message distribution strategy

The message is the most basic data unit in Kafka. A message consists of two parts: a key and a value. When sending a message, we can specify the key, and the producer then uses the key and the partitioning mechanism to decide which partition the message should be sent to and stored in. The producer's partitioning mechanism can be extended as needed.

code demo

Custom Partitioner

public class MyPartitioner implements Partitioner {
        private final Random random = new Random();

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // Get all partition information for the specified topic in the cluster
            List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
            int numOfPartition = partitionInfos.size();
            int partitionNum;
            if (key == null) {
                // No key set: assign a partition at random
                partitionNum = random.nextInt(numOfPartition);
            } else {
                // Key set: partition by the key's hash
                partitionNum = Math.abs(key.hashCode()) % numOfPartition;
            }
            System.out.println("key->" + key + ",value->" + value + "->send to partition:" + partitionNum);
            return partitionNum;
        }

        // The Partitioner interface also requires these two methods; empty implementations suffice here
        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }
}

Producer-side code registering the custom partitioner

public KafkaProducerDemo(String topic, boolean isAysnc) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.gupaoedu.kafka.MyPartitioner");
        producer = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
        this.isAysnc = isAysnc;
    }

Message default distribution mechanism

By default, Kafka uses a hash-modulo partitioning algorithm on the message key. If the key is null, a partition is chosen at random, and the random choice is made once per metadata refresh window controlled by metadata.max.age.ms: within that window, messages with a null key are sent to the same partition. The metadata is refreshed every 10 minutes by default.
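
Conceptually, the keyed case behaves like the simplified sketch below (the real DefaultPartitioner uses a murmur2 hash of the serialized key rather than plain array hashing):

// Simplified sketch of default partition selection for a non-null key
int partitionForKey(byte[] keyBytes, int numPartitions) {
    int hash = Arrays.hashCode(keyBytes); // Kafka actually uses murmur2 here
    return (hash & 0x7fffffff) % numPartitions; // mask keeps the hash non-negative
}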

Metadata has not been mentioned before; put simply, it is the mapping between topics/partitions and brokers. For each partition of each topic, the client needs to know the corresponding broker list, which broker is the leader, and which brokers are the followers. This information is stored in the Metadata class.

How a consumer consumes a specific partition

With the following code, the consumer consumes only partition 0 of the specified topic and will not receive data from the other partitions.

// When consuming a specific partition, do not subscribe to the topic as well
// kafkaConsumer.subscribe(Collections.singletonList(topic));
// Consume the specified partition
TopicPartition topicPartition = new TopicPartition(topic, 0);
kafkaConsumer.assign(Arrays.asList(topicPartition));

Message consumption principle

A demonstration of Kafka's message consumption principle

In production, each topic has multiple partitions. Multiple partitions help in two ways. On the one hand, the data on the brokers is sharded, which reduces the message volume per broker and improves I/O performance. On the other hand, consumption capacity improves, because the same topic is generally consumed by multiple consumers: this is the consumer-side load-balancing mechanism. What we look at next is how consumers consume messages when there are multiple partitions and multiple consumers.

Kafka has the concept of a consumer group: consumers with the same group.id belong to one consumer group, and all consumers in the group cooperate to consume all partitions of the subscribed topic. Each partition can only be consumed by one consumer in the same group. So how do consumers in the same group decide which of them consumes which partition's data? As shown in the figure below, with 3 partitions and 3 consumers, which consumer gets which partition?

For the figure above, the three consumers consume the three partitions of the topic test, one partition per consumer; the sketch below reproduces this.
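
This can be reproduced with the Consumer class from earlier: start three instances sharing the same group.id against a 3-partition topic, and each instance ends up owning one partition (a minimal sketch):

public static void main(String[] args) {
    // All three share group.id "practice-consumer", so Kafka's group coordinator
    // assigns each of them one of the three partitions of topic "test"
    for (int i = 0; i < 3; i++) {
        new Consumer("test").start();
    }
}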
