Zookeeper cluster + Kafka cluster

Overview of zookeeper

ZooKeeper is an open source Apache project that provides distributed coordination services for distributed frameworks.

Zookeeper working mechanism

Zookeeper is best understood from the perspective of design patterns: it is a distributed service management framework based on the observer pattern. It stores and manages the data everyone cares about and accepts registrations from observers; once the state of that data changes, Zookeeper notifies the registered observers so they can react accordingly. In short, Zookeeper = file system + notification mechanism.

Zookeeper Features

  • Zookeeper: a cluster consisting of one leader (Leader) and multiple followers (Follower).

  • As long as more than half of the nodes in the Zookeeper cluster survive, the Zookeeper cluster can serve normally, so Zookeeper is suited to installing an odd number of servers.

  • Global data consistency: Each server saves an identical copy of the data, and the data is consistent no matter which server the Client connects to.

  • Update requests are executed sequentially, and update requests from the same Client are executed in the order in which they are sent, that is, first-in, first-out.

  • Data update atomicity, a data update either succeeds or fails.

  • Real-time, within a certain time range, the Client can read the latest data.
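The "more than half" rule above is simple arithmetic. The following sketch (illustrative only, not part of the deployment steps) shows why an even-sized ensemble buys no extra fault tolerance:

```shell
# Quorum arithmetic for a ZooKeeper ensemble of n servers:
# the cluster keeps serving while more than n/2 nodes survive,
# so it tolerates n - (n/2 + 1) failures.
for n in 3 4 5; do
  quorum=$(( n / 2 + 1 ))
  tolerated=$(( n - quorum ))
  echo "n=$n quorum=$quorum tolerates=$tolerated failure(s)"
done
# prints:
# n=3 quorum=2 tolerates=1 failure(s)
# n=4 quorum=3 tolerates=1 failure(s)
# n=5 quorum=3 tolerates=2 failure(s)
```

A 4-node ensemble tolerates no more failures than a 3-node one, which is why odd sizes are preferred.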

Zookeeper data structure

The structure of the ZooKeeper data model is very similar to the Linux file system. It can be regarded as a tree in which each node is called a znode. Each znode can store 1MB of data by default, and each znode is uniquely identified by its path.

zookeeper application scenarios

The services provided include: unified naming service, unified configuration management, unified cluster management, dynamic online and offline of server nodes, soft load balancing, etc.

  • Unified Naming Service

In a distributed environment, it is often necessary to uniformly name applications/services for easy identification. For example: IP is not easy to remember, but domain name is easy to remember.

  • Unified configuration management

(1) In a distributed environment, synchronizing configuration files is very common. It is generally required that the configuration of all nodes in a cluster be consistent, such as in a Kafka cluster, and that after a configuration file is modified the change is quickly propagated to every node.
(2) Configuration management can be implemented with ZooKeeper. The configuration information is written to a znode on ZooKeeper, and each client server listens to that znode. Once the data in the znode is modified, ZooKeeper notifies every client server.

  • Unified cluster management

(1) In a distributed environment, it is necessary to grasp the status of each node in real time. Some adjustments can be made based on the real-time status of the node.
(2) ZooKeeper can monitor node state changes in real time: node information is written to a znode on ZooKeeper, and listening to that znode yields its state changes in real time.

  • The server goes online and offline dynamically

The client can gain real-time insight into the changes of the server online and offline.

zookeeper election mechanism

Election mechanism on first startup

  • Server 1 starts and initiates an election. Server 1 votes for itself. With one vote it does not have a majority (3 votes), so the election cannot be completed and server 1 stays in the LOOKING state;
  • Server 2 starts and another election is initiated. Servers 1 and 2 each vote for themselves and exchange vote information. Server 1 finds that server 2's myid is larger than that of its current candidate (itself) and changes its vote to server 2. Now server 1 has 0 votes and server 2 has 2 votes; without a majority the election cannot be completed, and servers 1 and 2 stay in the LOOKING state.
  • Server 3 starts and an election is initiated. At this point both servers 1 and 2 change their votes to server 3. The voting results: 0 votes for server 1, 0 votes for server 2, and 3 votes for server 3. Server 3 now has more than half of the votes and is elected Leader. Servers 1 and 2 change state to FOLLOWING, and server 3 changes state to LEADING;
  • Server 4 starts and an election is initiated. At this time, servers 1, 2, and 3 are no longer in the LOOKING state, and the vote information will not be changed. Result of exchanging vote information: 3 votes for server 3 and 1 vote for server 4. At this time, server 4 obeys the majority, changes the vote information to server 3, and changes the status to FOLLOWING;
  • Server 5 starts and, like server 4, simply follows the existing Leader.
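The five steps above can be condensed into a small simulation (a sketch of the logic only; real servers exchange votes over the network): before any data exists, every started server backs the largest myid seen so far, and a Leader needs a majority of the full 5-node ensemble.

```shell
# First-startup election sketch for a 5-server ensemble.
quorum=3                 # majority of 5 servers
leader=""; candidate=0; started=0
for myid in 1 2 3 4 5; do              # servers boot in myid order
  started=$(( started + 1 ))
  if [ -n "$leader" ]; then
    echo "server $myid: joins as FOLLOWING (leader is $leader)"
    continue
  fi
  [ "$myid" -gt "$candidate" ] && candidate=$myid   # back the largest myid seen
  if [ "$started" -ge "$quorum" ]; then
    leader=$candidate
    echo "server $myid: $started votes for server $candidate -> elected LEADING"
  else
    echo "server $myid: $started votes for server $candidate, no majority, stays LOOKING"
  fi
done
# server 3 ends up LEADING; servers 4 and 5 join as FOLLOWING
```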

Election mechanism after the first startup

  • When a server in the ZooKeeper cluster meets either of the following two conditions, it enters a Leader election:

    • The server is initialized at startup.
    • The connection to the Leader cannot be maintained while the server is running.
  • When a machine enters the Leader election process, the current cluster may also be in the following two states:

    • There is already a Leader in the cluster
      For a situation where a Leader already exists, when the machine tries to elect a Leader, it will be informed of the Leader information of the current server. For this machine, it only needs to establish a connection with the Leader machine and perform state synchronization.
    • The Leader does not exist in the cluster.
      Suppose the ZooKeeper cluster consists of 5 servers with SIDs 1, 2, 3, 4, and 5 and ZXIDs 8, 8, 8, 7, and 7, and that the server with SID 3 is the Leader. At some point servers 3 and 5 fail, so a Leader election begins.

Election Leader Rules

  • The vote with the larger EPOCH wins outright.

  • If EPOCHs are equal, the vote with the larger transaction ID (ZXID) wins; if ZXIDs are also equal, the larger server ID (SID) wins.

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">SID: server ID, used to uniquely identify a machine in a ZooKeeper cluster. It cannot be duplicated and is the same as myid.

ZXID: transaction ID, used to identify one change of server state. At a given moment, the ZXID value of each machine in the cluster is not necessarily the same; it depends on how fast each server has processed client update requests.

Epoch: the code name of each Leader's term. When there is no Leader, the logical clock value within the same round of voting is the same; it increases with every vote cast.
</code></span></span>
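The three rules can be written as a single comparison. This is a sketch: the function name `better_vote` and the A/B labels are made up for illustration, not part of ZooKeeper's code.

```shell
# Compare two votes (EPOCH, ZXID, SID); print A or B for the winner.
better_vote() {  # args: epochA zxidA sidA  epochB zxidB sidB
  if [ "$1" -ne "$4" ]; then
    [ "$1" -gt "$4" ] && echo A || echo B      # bigger EPOCH wins outright
  elif [ "$2" -ne "$5" ]; then
    [ "$2" -gt "$5" ] && echo A || echo B      # then bigger ZXID wins
  else
    [ "$3" -gt "$6" ] && echo A || echo B      # then bigger SID wins
  fi
}
better_vote 1 8 1  1 8 2   # same EPOCH and ZXID -> SID 2 wins: prints B
better_vote 1 8 2  1 7 4   # same EPOCH, ZXID 8 beats 7: prints A
better_vote 2 1 1  1 9 5   # EPOCH 2 wins outright: prints A
```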

Deploy the Zookeeper cluster

Prepare 3 servers for Zookeeper cluster

192.168.239.40
192.168.239.50
192.168.239.60

turn off firewall

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#008000">#All nodes execute</span>
systemctl stop firewalld
systemctl <span style="color:#0000ff">disable</span> firewalld
setenforce 0
</code></span></span>

Install JDK

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#008000">#A non-minimal installation usually ships with a JDK already</span>
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version
</code></span></span>

Download the installation package

Official download address: Index of /dist/zookeeper

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#008000">#Skip the download if the package is already installed</span>
<span style="color:#0000ff">cd</span> /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
</code></span></span>

Install Zookeeper

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#0000ff">cd</span> /opt
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz
<span style="color:#0000ff">mv</span> apache-zookeeper-3.6.3-bin /usr/local/zookeeper-3.6.3
</code></span></span>

Modify configuration files (all nodes)

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#0000ff">cd</span> /usr/local/zookeeper-3.6.3/conf/
<span style="color:#0000ff">cp</span> zoo_sample.cfg zoo.cfg

vim zoo.cfg
tickTime=2000   <span style="color:#008000">#Communication heartbeat time, heartbeat time between Zookeeper server and client, unit: ms</span>
initLimit=10    <span style="color:#008000">#The maximum number of heartbeats (the number of ticktimes) that leaders and followers can tolerate when initially connecting is expressed here as 10*2s</span>
syncLimit=5     <span style="color:#008000">#The timeout of synchronous communication between Leader and Follower, which means that if it exceeds 5*2s, Leader thinks Follower is dead and deletes Follower from the server list</span>
dataDir=/usr/local/zookeeper-3.6.3/data      <span style="color:#008000">#Modify: the directory where Zookeeper data is saved; it must be created separately</span>
dataLogDir=/usr/local/zookeeper-3.6.3/logs   <span style="color:#008000">#Add: the directory where logs are stored; it must be created separately</span>
clientPort=2181   <span style="color:#008000">#Client connection port</span>
<span style="color:#008000">#Add cluster information</span>
server.1=192.168.239.40:3188:3288
server.2=192.168.239.50:3188:3288
server.3=192.168.239.60:3188:3288
<span style="color:#008000">#Port 3188 is used for cluster node communication, and port 3288 is used for leader election</span>
-------------------------------------------------------------------------------------

server.A=B:C:D
●A is a number indicating which server this is. In cluster mode, create a file named myid in the directory specified by dataDir in zoo.cfg; the file contains the value of A. On startup, Zookeeper reads this file and compares the value it finds with the configuration in zoo.cfg to determine which server it is.
●B is the address of this server.
●C is the port this server (a Follower) uses to exchange information with the cluster Leader.

●D is the port used to re-elect a new Leader in case the cluster Leader fails; the servers communicate with each other over this port during the election.
</code></span></span>
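Since each node's myid must agree with the server.A lines, a small helper can derive it from zoo.cfg instead of typing it by hand. This is a hypothetical sketch: the temporary file and the hard-coded `local_ip` are assumptions for illustration, not part of the original steps.

```shell
# Derive myid by matching this node's IP against server.A=B:C:D lines.
conf=$(mktemp)
cat > "$conf" <<'EOF'
server.1=192.168.239.40:3188:3288
server.2=192.168.239.50:3188:3288
server.3=192.168.239.60:3188:3288
EOF
local_ip=192.168.239.50                    # would be detected per node in practice
myid=$(sed -n "s/^server\.\([0-9]*\)=$local_ip:.*/\1/p" "$conf")
echo "myid for $local_ip is $myid"         # prints: myid for 192.168.239.50 is 2
rm -f "$conf"
```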

Copy the configured Zookeeper configuration file to other machines

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">scp zoo.cfg 192.168.239.50:`<span style="color:#0000ff">pwd</span>`
scp zoo.cfg 192.168.239.60:`<span style="color:#0000ff">pwd</span>`
</code></span></span>

Create a data directory and log directory on each node

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">mkdir /usr/local/zookeeper-3.6.3/data
mkdir /usr/local/zookeeper-3.6.3/logs
</code></span></span>

Create a file myid in the directory specified by dataDir of each node

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#008000">#On 192.168.239.40</span>
echo 1 > /usr/local/zookeeper-3.6.3/data/myid
<span style="color:#008000">#On 192.168.239.50</span>
echo 2 > /usr/local/zookeeper-3.6.3/data/myid
<span style="color:#008000">#On 192.168.239.60</span>
echo 3 > /usr/local/zookeeper-3.6.3/data/myid
</code></span></span>

Configure the Zookeeper startup script

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">vim /etc/init.d/zookeeper
<span style="color:#008000">#!/bin/bash</span>
<span style="color:#008000">#chkconfig:2345 20 90</span>
<span style="color:#008000">#description:Zookeeper Service Control Script</span>
ZK_HOME=<span style="color:#a31515">'/usr/local/zookeeper-3.6.3'</span>
<span style="color:#0000ff">case</span> <span style="color:#008000">$1</span> <span style="color:#0000ff">in</span>
start)
	<span style="color:#0000ff">echo</span> <span style="color:#a31515">"---------- zookeeper start ------------"</span>
	<span style="color:#008000">$ZK_HOME</span>/bin/zkServer.sh start
;;
stop)
	<span style="color:#0000ff">echo</span> <span style="color:#a31515">"---------- zookeeper stop ------------"</span>
	<span style="color:#008000">$ZK_HOME</span>/bin/zkServer.sh stop
;;
restart)
	<span style="color:#0000ff">echo</span> <span style="color:#a31515">"---------- zookeeper restart ------------"</span>
	<span style="color:#008000">$ZK_HOME</span>/bin/zkServer.sh restart
;;
status)
	<span style="color:#0000ff">echo</span> <span style="color:#a31515">"---------- zookeeper status ------------"</span>
	<span style="color:#008000">$ZK_HOME</span>/bin/zkServer.sh status
;;
*)
    <span style="color:#0000ff">echo</span> <span style="color:#a31515">"Usage: <span style="color:#008000">$0</span> {start|stop|restart|status}"</span>
<span style="color:#0000ff">esac</span>
<span style="color:#008000">#Transfer service control scripts to other nodes</span>
scp /etc/init.d/zookeeper 192.168.239.50:/etc/init.d/
scp /etc/init.d/zookeeper 192.168.239.60:/etc/init.d/
<span style="color:#008000">#Set startup and self startup</span>
<span style="color:#0000ff">chmod</span> +x /etc/init.d/zookeeper
chkconfig --add zookeeper

<span style="color:#008000">#Start zookeeper respectively</span>
service zookeeper start

<span style="color:#008000">#View current status</span>
service zookeeper status
</code></span></span>

Kafka overview

Why do you need Message Queuing (MQ)

The main reason is that in a high-concurrency environment, synchronous requests cannot be processed in time and often block. For example, a large number of concurrent requests to the database cause row locks and table locks, and eventually too many request threads pile up, triggering a "too many connections" error and causing an avalanche effect.
We use message queues to ease the pressure on the system by processing requests asynchronously. Message queues are often used in scenarios such as asynchronous processing, traffic peak shaving, application decoupling, and message communication.

The most common middleware

Middleware for web applications: nginx, tomcat, apache, haproxy, squid, varnish.

MQ message queue middleware: ActiveMQ, RabbitMQ, RocketMQ, Kafka, redis, etc.

Benefits of using message queues

  • Decoupling
    Allows you to extend or modify both processes independently, as long as they adhere to the same interface constraints.
  • Recoverability
    When a part of the system fails, it does not affect the entire system. Message queues reduce the coupling between processes, so even if a process processing a message hangs, messages added to the queue can still be processed after the system is restored.
  • Buffering
    Helps to control and optimize the speed of data flow through the system, and solve the inconsistency of the processing speed of production and consumption messages.
  • Flexibility & peak processing power
    The application still needs to continue to function in the event of a surge in traffic, but such bursts of traffic are uncommon. It would be a huge waste to invest resources on standby at any time based on the ability to handle such peak visits. The use of message queues enables key components to withstand sudden access pressures without completely crashing due to sudden overloaded requests.
  • Asynchronous communication
    Many times the user does not want or need to process the message immediately. Message queues provide asynchronous processing mechanisms that allow users to put a message on the queue, but not process it immediately. Put as many messages as you want into the queue, and then process them when needed.

Two modes of message queue

  • Peer-to-peer mode (one-to-one, consumers actively pull data, and messages are cleared after messages are received)
    • The message producer produces the message and sends it to the message queue, and then the message consumer takes the message from the message queue and consumes the message. After the message is consumed, there is no more storage in the message queue, so it is impossible for the message consumer to consume the message that has been consumed. The message queue supports the existence of multiple consumers, but for a message, only one consumer can consume it.

  • Publish/subscribe mode (one-to-many, also known as observer mode, consumers will not clear messages after consuming data)
    • A message producer (publishing) publishes a message to a topic, and multiple message consumers (subscribing) consume the message at the same time. Unlike the peer-to-peer approach, messages published to a topic are consumed by all subscribers. The publish/subscribe pattern is to define a one-to-many dependency between objects, so that whenever the state of an object (target object) changes, all objects that depend on it (observer objects) will be notified and automatically updated.
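The difference between the two modes can be sketched with plain files (purely illustrative; `queue.txt` and `topic.log` are stand-ins, not real MQ storage): point-to-point removes a message once consumed, while publish/subscribe keeps the log and lets each subscriber read independently.

```shell
# Point-to-point: one queue; a message is removed once a consumer takes it.
printf 'm1\nm2\nm3\n' > queue.txt
taken=$(head -n 1 queue.txt)                               # a consumer takes m1
tail -n +2 queue.txt > queue.tmp && mv queue.tmp queue.txt # m1 is cleared
left=$(( $(wc -l < queue.txt) ))                           # only m2 and m3 remain

# Publish/subscribe: the topic log is kept; each subscriber tracks its own offset.
printf 'm1\nm2\nm3\n' > topic.log
sub_a=$(sed -n 1p topic.log)                               # subscriber A reads m1
sub_b=$(sed -n 1p topic.log)                               # subscriber B also reads m1
kept=$(( $(wc -l < topic.log) ))                           # all 3 messages still stored
echo "taken=$taken left=$left sub_a=$sub_a sub_b=$sub_b kept=$kept"
rm -f queue.txt topic.log
```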

Kafka Definition

  • Kafka is a distributed message queue (MQ, Message Queue) based on the publish/subscribe mode, which is mainly used in the field of real-time processing of big data.

Introduction to Kafka

Kafka was originally developed by LinkedIn. It is a distributed, partition-supporting, replica-based messaging middleware system coordinated by Zookeeper. Its biggest feature is that it can process large amounts of data in real time to meet various demand scenarios, such as Hadoop-based batch systems, low-latency real-time systems, Spark/Flink streaming engines, nginx access logs, and message services. It is written in Scala; LinkedIn contributed it to the Apache Foundation in 2010, where it became a top-level open source project.

Features of Kafka

High throughput, low latency

  • Kafka can process hundreds of thousands of messages per second, with latency as low as a few milliseconds. Each topic can be divided into multiple Partitions, and a Consumer Group consumes partitions in parallel to improve load balancing and consumption capacity.

Scalability

  • kafka cluster supports hot expansion

Durability and reliability

  • Messages are persisted to local disk, and data backup is supported to prevent data loss

Fault tolerance

  • Allow nodes in the cluster to fail (in the case of multiple copies, if the number of copies is n, n-1 nodes are allowed to fail)

High concurrency

  • Support thousands of clients to read and write at the same time

Kafka system architecture

Broker

  • A Kafka server is a broker. A cluster consists of multiple brokers, and one broker can hold multiple topics.

topic

  • It can be understood as a queue; both producers and consumers are oriented to a topic.
    It is similar to a database table name or an ES index.

    Physically, messages of different topics are stored separately.

partition

  • To achieve scalability, a very large topic can be distributed across multiple brokers (i.e. servers). A topic can be divided into one or more partitions, each of which is an ordered queue. Kafka only guarantees that records within a partition are ordered, not the order across different partitions of a topic.

Partition Data Routing Rules

  • If the partition is specified, it is used directly;

  • If no partition is specified but a key is given (equivalent to an attribute in the message), a partition is selected by hashing the key value and taking it modulo the number of partitions;

  • If neither partition nor key is specified, round-robin is used to select a partition.

    • Each message will have an auto-incrementing number that identifies the offset of the message, starting from 0.
    • The data in each partition is stored using multiple segment files.
    • If a topic has multiple partitions, the order of data across partitions cannot be guaranteed when consuming. In scenarios that require strictly ordered consumption (such as commodity flash sales or red-envelope grabbing), the number of partitions must be set to 1.
  • The broker stores topic data. If a topic has N partitions and the cluster has N brokers, then each broker stores a partition of the topic.

  • If a topic has N partitions and the cluster has (N+M) brokers, then N brokers store one partition of the topic, and the remaining M brokers do not store the topic's partition data.

  • If a topic has N partitions and the number of brokers in the cluster is less than N, then one broker stores one or more partitions of the topic. In the actual production environment, try to avoid this situation, which can easily lead to unbalanced data in the Kafka cluster.
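The hash-modulo routing rule above can be sketched as follows. Note the assumptions: `cksum` (a CRC) stands in for Kafka's real murmur2 hash and the key names are made up, so the mapping shown is not what Kafka itself would compute.

```shell
# Keyed routing sketch: partition = hash(key) % number_of_partitions.
partitions=3
for key in user-1001 user-1002 user-1003; do
  hash=$(printf '%s' "$key" | cksum | awk '{print $1}')   # CRC stand-in hash
  echo "$key -> partition $(( hash % partitions ))"
done
```

The important property is that the mapping is deterministic: the same key always lands in the same partition, so records for one key stay ordered.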

Reason for partition

  • It is convenient to expand in the cluster, each Partition can be adjusted to suit the machine where it is located, and a topic can be composed of multiple Partitions, so the entire cluster can adapt to data of any size;
  • Concurrency can be improved because it can be read and written in Partition units.

Replica

  • Replica. To ensure that when a node in the cluster fails, the partition data on that node is not lost and Kafka can keep working, Kafka provides a replica mechanism: each partition of a topic has several replicas, one Leader and several Followers.

Leader

  • Each partition has multiple replicas, of which exactly one is the Leader. The Leader is the replica currently responsible for reading and writing the partition's data.

Follower

  • Followers follow the Leader: all write requests are routed through the Leader, data changes are broadcast to all Followers, and Followers keep their data synchronized with the Leader. A Follower is only responsible for backup, not for reading or writing data.
    If the Leader fails, a new Leader is elected from among the Followers.
    When a Follower hangs, gets stuck, or synchronizes too slowly, the Leader removes it from the ISR list (the set of Followers maintained by the Leader that are in sync with it), and a new Follower is created.

Producer

  • The producer is the publisher of the data; this role pushes messages to a Kafka topic.
    After the broker receives a message from the producer, it appends the message to the segment file currently used for appending data.
    A message sent by the producer is stored in a partition, and the producer can also specify which partition to store it in.

Consumer

  • Consumers pull data from the broker. A consumer can consume data from multiple topics.

Consumer Group(CG)

  • A consumer group, consisting of multiple consumers.
    All consumers belong to a certain consumer group, that is, a consumer group is logically a subscriber. A group name can be specified for each consumer. If no group name is specified, it belongs to the default group.
    Having multiple consumers process the data of a Topic together improves data consumption capacity.
    Each consumer in the consumer group is responsible for consuming data from different partitions. One partition can only be consumed by one consumer in the group, preventing data from being read repeatedly.
    Consumer groups do not affect each other.

offset offset

  • The offset uniquely identifies a message.
    The offset determines the position of reading data, and there is no thread safety problem. Consumers use the offset to determine the next message to be read (ie, the consumption position).
    After the message is consumed, it is not deleted immediately, so that multiple businesses can reuse Kafka messages.
    A certain service can also achieve the purpose of re-reading the message by modifying the offset, and the offset is controlled by the user.
    Messages will eventually be deleted, and the default lifetime is 1 week (7*24 hours).
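The offset mechanics described above amount to a cursor into an append-only log. A file-based sketch (illustrative only; the log file and message names are made up):

```shell
# An offset is just a position in the partition log; consuming does not
# delete, and rewinding the offset replays messages.
log=$(mktemp)
printf 'msg0\nmsg1\nmsg2\n' > "$log"         # offsets 0, 1, 2
offset=0
first=$(sed -n "$(( offset + 1 ))p" "$log")  # consume offset 0 -> msg0
offset=$(( offset + 1 ))
second=$(sed -n "$(( offset + 1 ))p" "$log") # consume offset 1 -> msg1
offset=0                                     # rewind: re-read from the start
replay=$(sed -n "$(( offset + 1 ))p" "$log") # msg0 again -- nothing was deleted
stored=$(( $(wc -l < "$log") ))              # the log still holds all 3 messages
echo "first=$first second=$second replay=$replay stored=$stored"
rm -f "$log"
```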

Zookeeper

  • Kafka uses Zookeeper to store the cluster's meta information. Since a consumer may fail during consumption (power loss, crash), it needs to resume from the position it had reached before the failure, so the consumer must record in real time which offset it has consumed. Before Kafka version 0.9, consumers saved offsets in Zookeeper by default; starting from version 0.9, offsets are saved in a built-in Kafka topic named __consumer_offsets. In other words, Zookeeper's role is this: when a producer pushes data to the Kafka cluster, it must locate the cluster's nodes, which it finds through Zookeeper; and which data a consumer consumes also relies on Zookeeper, since the offset, which records where the last consumption stopped, was obtained from Zookeeper so that consumption can continue with the next message.

Deploy kafka cluster

Download the installation package

Official download address: Apache Kafka

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#0000ff">cd</span> /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz
</code></span></span>

Install Kafka

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#0000ff">cd</span> /opt/
tar zxvf kafka_2.13-2.7.1.tgz
<span style="color:#0000ff">mv</span> kafka_2.13-2.7.1 /usr/local/kafka
</code></span></span>

Modify the configuration file

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#0000ff">cd</span> /usr/local/kafka/config/
<span style="color:#0000ff">cp</span> server.properties{,.bak}

vim server.properties
broker.id=0                           
<span style="color:#008000">#Line 21, the globally unique number of the broker. Each broker cannot be repeated, so you need to configure the broker on other machines id=1,broker. id=2</span>
listeners=PLAINTEXT://192.168.239.40:9092    
<span style="color:#008000">#Line 31, specify the IP and port to listen to. If you modify the IP of each broker, you can also keep the default configuration without modifying it</span>
num.network.threads=3    <span style="color:#008000">#Line 42, the number of threads the broker uses to process network requests; usually no need to modify</span>
num.io.threads=8         <span style="color:#008000">#Line 45, the number of threads used for disk IO; should be greater than the number of hard disks</span>
socket.send.buffer.bytes=102400       <span style="color:#008000">#Line 48, buffer size of the send socket</span>
socket.receive.buffer.bytes=102400    <span style="color:#008000">#Line 51, buffer size of the receive socket</span>
socket.request.max.bytes=104857600    <span style="color:#008000">#Line 54, buffer size of the request socket</span>
log.dirs=/usr/local/kafka/logs        <span style="color:#008000">#Line 60, the path where kafka operation logs are stored, which is also where data is stored</span>
num.partitions=1    <span style="color:#008000">#Line 65, the default number of partitions for a topic on this broker; overridden by the parameters given when a topic is created</span>
num.recovery.threads.per.data.dir=1    <span style="color:#008000">#Line 69, the number of threads used to recover and clean data</span>
log.retention.hours=168    <span style="color:#008000">#Line 103, the maximum retention time of a segment file (data file) in hours; the default is 7 days, and expired segments are deleted</span>
log.segment.bytes=1073741824    <span style="color:#008000">#Line 110, the maximum size of a segment file, 1G by default; a new segment file is created when it is exceeded</span>
zookeeper.connect=192.168.239.40:2181,192.168.239.50:2181,192.168.239.60:2181    
<span style="color:#008000">#Line 123, configure the address of the Zookeeper cluster</span>

<span style="color:#0000ff">mkdir</span> /usr/local/kafka/logs
</code></span></span>

Transfer kafka to other nodes

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#0000ff">cd</span> /usr/local
scp -r kafka/ 192.168.239.50:`<span style="color:#0000ff">pwd</span>`
scp -r kafka/ 192.168.239.60:`<span style="color:#0000ff">pwd</span>`
</code></span></span>

Modify other node configuration files

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#008000">#On node 192.168.239.50</span>
<span style="color:#0000ff">cd</span> /usr/local/kafka/config/
vim server.properties
<span style="color:#008000">#Modify the globally unique broker number on line 21</span>
broker.id=1
<span style="color:#008000">#Modify the listening address on line 31</span>
listeners=PLAINTEXT://192.168.239.50:9092

<span style="color:#008000">#On node 192.168.239.60</span>
<span style="color:#0000ff">cd</span> /usr/local/kafka/config/
vim server.properties
<span style="color:#008000">#Modify the globally unique broker number on line 21</span>
broker.id=2
<span style="color:#008000">#Modify the listening address on line 31</span>
listeners=PLAINTEXT://192.168.239.60:9092
</code></span></span>

Modify environment variables (all nodes)

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#0000ff">cd</span> /usr/local/kafka/bin
<span style="color:#0000ff">ls</span>
vim /etc/profile
<span style="color:#0000ff">export</span> KAFKA_HOME=/usr/local/kafka
<span style="color:#0000ff">export</span> PATH=<span style="color:#008000">$PATH</span>:<span style="color:#008000">$KAFKA_HOME</span>/bin
<span style="color:#0000ff">source</span> /etc/profile
</code></span></span>

Write the Kafka service control script

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">vim /etc/init.d/kafka
<span style="color:#008000">#!/bin/bash</span>
<span style="color:#008000">#chkconfig:2345 22 88</span>
<span style="color:#008000">#description:Kafka Service Control Script</span>
KAFKA_HOME=<span style="color:#a31515">'/usr/local/kafka'</span>
<span style="color:#0000ff">case</span> <span style="color:#008000">$1</span> <span style="color:#0000ff">in</span>
start)
	<span style="color:#0000ff">echo</span> <span style="color:#a31515">"---------- Kafka start ------------"</span>
	<span style="color:#008000">${KAFKA_HOME}</span>/bin/kafka-server-start.sh -daemon <span style="color:#008000">${KAFKA_HOME}</span>/config/server.properties
;;
stop)
	<span style="color:#0000ff">echo</span> <span style="color:#a31515">"---------- Kafka stop ------------"</span>
	<span style="color:#008000">${KAFKA_HOME}</span>/bin/kafka-server-stop.sh
;;
restart)
	<span style="color:#008000">$0</span> stop
	<span style="color:#008000">$0</span> start
;;
status)
	<span style="color:#0000ff">echo</span> <span style="color:#a31515">"---------- Kafka status ------------"</span>
	count=$(ps -ef | grep kafka | egrep -cv <span style="color:#a31515">"grep|$$"</span>)
	<span style="color:#0000ff">if</span> [ <span style="color:#a31515">"<span style="color:#008000">$count</span>"</span> -eq 0 ];<span style="color:#0000ff">then</span>
        <span style="color:#0000ff">echo</span> <span style="color:#a31515">"kafka is not running"</span>
    <span style="color:#0000ff">else</span>
        <span style="color:#0000ff">echo</span> <span style="color:#a31515">"kafka is running"</span>
    <span style="color:#0000ff">fi</span>
;;
*)
    <span style="color:#0000ff">echo</span> <span style="color:#a31515">"Usage: <span style="color:#008000">$0</span> {start|stop|restart|status}"</span>
<span style="color:#0000ff">esac</span>
</code></span></span>

Transfer the service control script to other nodes

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">scp /etc/init.d/kafka 192.168.239.50:/etc/init.d/
scp /etc/init.d/kafka 192.168.239.60:/etc/init.d/
</code></span></span>

Set up auto-start and start Kafka (all nodes)

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash"><span style="color:#0000ff">chmod</span> +x /etc/init.d/kafka
chkconfig --add kafka
service kafka start
ps -ef | grep kafka  <span style="color:#008000">#Check whether the service is started</span>
</code></span></span>
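On newer systemd-based distributions, chkconfig/SysV scripts are deprecated; the same control can be expressed as a unit file. A minimal sketch, assuming the `KAFKA_HOME=/usr/local/kafka` path from the script above:

```ini
[Unit]
Description=Apache Kafka broker
After=network.target zookeeper.service

[Service]
Type=forking
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

After placing this at /etc/systemd/system/kafka.service, `systemctl daemon-reload && systemctl enable --now kafka` would replace the chkconfig/service steps.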

Kafka Command Line Operations

create topic

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">kafka-topics.sh --create --zookeeper 192.168.239.40:2181,192.168.239.50:2181,192.168.239.60:2181 --replication-factor 2 --partitions 3 --topic ky18

<span style="color:#008000">#--zookeeper: ZooKeeper cluster addresses; separate multiple servers with commas (one address is usually enough)</span>
<span style="color:#008000">#--replication-factor: number of replicas per partition; 1 means no redundancy, 2 is recommended</span>
<span style="color:#008000">#--partitions: number of partitions</span>
<span style="color:#008000">#--topic: topic name</span>
</code></span></span>
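To see what `--partitions 3 --replication-factor 2` costs in storage terms, note that every partition is stored once per replica. A quick arithmetic sketch (plain shell, not a Kafka call):

```shell
# Each of the 3 partitions of ky18 is stored replication-factor times,
# so the cluster holds partitions * replication_factor partition replicas,
# spread across the 3 brokers (2 per broker on average).
partitions=3
replication_factor=2
total_replicas=$((partitions * replication_factor))
echo "partition replicas across the cluster: $total_replicas"
```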

List all topics on the current server

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">kafka-topics.sh --list --zookeeper 192.168.239.40:2181,192.168.239.50:2181,192.168.239.60:2181 
</code></span></span>

View the details of a topic

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">kafka-topics.sh --describe --zookeeper 192.168.239.40:2181,192.168.239.50:2181,192.168.239.60:2181 --topic ky18
<span style="color:#008000"># Omit --topic to describe every topic in the cluster</span>
</code></span></span>

Producing messages

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">kafka-console-producer.sh --broker-list 192.168.239.40:9092,192.168.239.50:9092,192.168.239.60:9092  --topic ky18
</code></span></span>

Consuming messages

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">kafka-console-consumer.sh --bootstrap-server 192.168.239.40:9092,192.168.239.50:9092,192.168.239.60:9092 --topic ky18 --from-beginning
<span style="color:#008000">#--from-beginning: read all the existing data in the topic from the start</span>
</code></span></span>
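The `--from-beginning` flag only changes where the consumer starts reading; the log itself is append-only. The semantics can be sketched with a plain file standing in for a partition log (a toy model, not real Kafka):

```shell
topic_log=$(mktemp)                 # stand-in for a topic partition log

# "producer": push messages by appending
echo "msg1" >> "$topic_log"
echo "msg2" >> "$topic_log"

# consumer with --from-beginning: read every record from offset 0
from_beginning=$(cat "$topic_log")

# consumer without it: only records appended after it attaches
offset=$(wc -l < "$topic_log")      # current end offset
echo "msg3" >> "$topic_log"
new_only=$(tail -n +$((offset + 1)) "$topic_log")

echo "$from_beginning"              # msg1 and msg2
echo "$new_only"                    # msg3 only
rm -f "$topic_log"
```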

Modify the number of partitions

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">kafka-topics.sh --zookeeper 192.168.239.40:2181,192.168.239.50:2181,192.168.239.60:2181 --alter --topic ky18 --partitions 6
<span style="color:#008000"># The partition count can only be increased; Kafka does not support reducing it</span>
</code></span></span>

delete topic

<span style="color:#333333"><span style="background-color:#ffffff"><code class="language-bash">kafka-topics.sh --delete --zookeeper 192.168.239.40:2181,192.168.239.50:2181,192.168.239.60:2181 --topic ky18
<span style="color:#008000"># Takes full effect only when delete.topic.enable=true is set in server.properties</span>
</code></span></span>

Summary

Kafka architecture

broker: a Kafka server; a Kafka cluster consists of multiple brokers

topic: a message queue; both producers and consumers are topic-oriented

producer: the producer pushes message data to a topic on a broker

consumer: the consumer pulls message data from a topic on a broker

partition: a topic can be split into one or more partitions, which speed up message transmission (reads and writes). Replicas back up partitions: the leader handles reads and writes, while the followers only keep backups.

The messages within a partition are ordered; there is no ordering guarantee across partitions. Scenarios that require strict ordering, such as flash sales or red-envelope grabs, must use a single partition.

offset: the offset records the position up to which a consumer has consumed, so that the next read can resume from where the last one stopped
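This resume-from-offset behaviour can be sketched with a bash array as a toy partition log (a model of the concept, not a Kafka API):

```shell
# Toy model: records at offsets 0..3 of one partition
log=("a" "b" "c" "d")
offset=2                       # consumer previously committed offset 2

# The next poll resumes at the committed offset, not at the start
next_record=${log[$offset]}
echo "resume at offset $offset -> $next_record"

offset=$((offset + 1))         # commit the new position after consuming
```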

zookeeper: stores the Kafka cluster metadata and the consumer offsets

zookeeper combined with Kafka: when a producer pushes data to the Kafka cluster, it locates the brokers through zk; a consumer likewise relies on zk to know which data to consume, because the offsets are stored in zk

Tags: Linux Operation & Maintenance kafka

Posted by toyartist on Fri, 22 Jul 2022 01:07:27 +0530