Download and install
Download the Apache Kafka 2.11 package or later from the Apache Kafka website.
After downloading, unzip it.
Change configuration (stand-alone is very simple)
In the config/ directory, create a new properties file called server.properties. I will cover clustering separately later; that part is long and not suited to getting started with development. Let's get Kafka running first.
server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://127.0.0.1:9092
port=9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
host.name=127.0.0.1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/Users/chrishu123126.com/opt/kafka2.11/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
Explanation of key parameters:
- broker.id=0: the ID of this broker; there is only one Kafka broker here;
- delete.topic.enable=true: if this value is not set, running the delete topic command in Kafka has no effect, and you need to go to zk to delete the topic;
- listeners=PLAINTEXT://127.0.0.1:9092: the listening address of Kafka. Here it is the local machine; if the clients connecting to Kafka are not on the same server as Kafka, replace 127.0.0.1 with this machine's LAN IP;
- port=9092: the port on which the Kafka service is exposed to remote clients;
- log.dirs=/Users/chrishu123126.com/opt/kafka2.11/logs: this directory holds not only Kafka's logs but also the data of Kafka's topics, so it is effectively the path for both the logs and the Kafka data files;
- num.partitions=3: use at least 3, whether on a single machine or in a multi-machine production cluster. When we cover the multi-machine production cluster environment later, we will describe the relationship among partitions, brokers, and consumers in Kafka in detail. These three concepts are important;
- zookeeper.connect=localhost:2181,localhost:2182,localhost:2183: must be specified. The information between Kafka nodes is coordinated by zk;
- zookeeper.connection.timeout.ms=6000: zk connection timeout, generally 2-6 s.
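To make the note on listeners concrete, here is a minimal sketch that derives a LAN-facing copy of server.properties from the local one. The helper name rewrite_listeners, the IP 192.168.1.10, and the output file name are assumptions for illustration only; they are not part of Kafka or of the setup above.

```shell
#!/bin/sh
# Sketch: copy a properties file, replacing 127.0.0.1 with a LAN IP
# in the three address-related keys used in this post.
# rewrite_listeners is a hypothetical helper, not a Kafka tool.

rewrite_listeners() {
  # $1 = source properties file, $2 = LAN IP, $3 = output file
  sed -e "s#^listeners=PLAINTEXT://127\.0\.0\.1:#listeners=PLAINTEXT://$2:#" \
      -e "s#^advertised\.listeners=PLAINTEXT://127\.0\.0\.1:#advertised.listeners=PLAINTEXT://$2:#" \
      -e "s#^host\.name=127\.0\.0\.1\$#host.name=$2#" \
      "$1" > "$3"
}

# Example call (the IP and the output file name are placeholders):
# rewrite_listeners config/server.properties 192.168.1.10 config/server-lan.properties
```

All other keys are copied unchanged, so the original file stays usable for purely local work.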
Building a local zk cluster
This is simple enough that I will not expand on it in this blog post; there are plenty of examples on the Internet.
Here I give the full configuration used for the setup directly.
Set up 3 zoo.cfg files first
They are named zoo1.cfg, zoo2.cfg, and zoo3.cfg
zoo1.cfg
# The number of milliseconds of each tick
# zk time unit (milliseconds)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# Time limit for followers to sync data from the leader during startup
# If the cluster is large and holds a lot of data, increase this parameter appropriately.
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk1
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk1
clientPort=2181
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
# The leader does not accept client connections and focuses on communication and elections, etc.
leaderServes=no
maxClientCnxns=1000
forceSync=no
zoo2.cfg
# The number of milliseconds of each tick
# zk time unit (milliseconds)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# Time limit for followers to sync data from the leader during startup
# If the cluster is large and holds a lot of data, increase this parameter appropriately.
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk2
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk2
clientPort=2182
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
# The leader does not accept client connections and focuses on communication and elections, etc.
leaderServes=no
maxClientCnxns=1000
forceSync=no
zoo3.cfg
# The number of milliseconds of each tick
# zk time unit (milliseconds)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# Time limit for followers to sync data from the leader during startup
# If the cluster is large and holds a lot of data, increase this parameter appropriately.
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk3
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk3
clientPort=2183
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
# The leader does not accept client connections and focuses on communication and elections, etc.
leaderServes=no
maxClientCnxns=1000
forceSync=no
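The three files differ only in dataDir, dataLogDir, and clientPort, so they can also be generated from one template. The following is a rough sketch under that assumption; write_zoo_cfgs is a hypothetical helper name, and the ZooKeeper directory is passed in as an argument rather than hard-coded.

```shell
#!/bin/sh
# Sketch: generate zoo1.cfg, zoo2.cfg and zoo3.cfg from a single template.
# write_zoo_cfgs is a hypothetical helper; $1 is the ZooKeeper install directory.

write_zoo_cfgs() {
  zk_home=$1
  mkdir -p "$zk_home/conf"
  for i in 1 2 3; do
    # Only the data directories and the client port depend on $i.
    cat > "$zk_home/conf/zoo$i.cfg" <<EOF
tickTime=5000
initLimit=50
syncLimit=6
dataDir=$zk_home/data/zk$i
dataLogDir=$zk_home/logs/zk$i
clientPort=$((2180 + $i))
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
leaderServes=no
maxClientCnxns=1000
forceSync=no
EOF
  done
}

# Example call, using the path from this post:
# write_zoo_cfgs /Users/chrishu123126.com/opt/zookeeper
```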
Start building ZK cluster
We are using ZooKeeper 3.4.3.
Enter the following command
touch data/zk1/myid
touch data/zk2/myid
touch data/zk3/myid
In this way, there will be an empty file called myid in each directory such as /data/zk1
Next, enter the following command
echo 1 > data/zk1/myid
echo 2 > data/zk2/myid
echo 3 > data/zk3/myid
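The commands above assume that data/zk1, data/zk2, and data/zk3 already exist. A small sketch that creates the directories and writes the myid files in one pass could look like this; init_myid is a hypothetical helper name, and creating the logs directories at the same time is my own addition.

```shell
#!/bin/sh
# Sketch: create the data/log directories and write each node's myid.
# init_myid is a hypothetical helper; $1 is the ZooKeeper install directory.

init_myid() {
  zk_home=$1
  for i in 1 2 3; do
    mkdir -p "$zk_home/data/zk$i" "$zk_home/logs/zk$i"
    # The number in myid must match the N of the server.N line in zoo.cfg.
    echo "$i" > "$zk_home/data/zk$i/myid"
  done
}

# Example call, using the path from this post:
# init_myid /Users/chrishu123126.com/opt/zookeeper
```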
Start the zk cluster
sh zkServer.sh start ../conf/zoo1.cfg
sh zkServer.sh start ../conf/zoo2.cfg
sh zkServer.sh start ../conf/zoo3.cfg
The following are two scripts: one to start and one to shut down the entire zk cluster.
zk-start.sh
#!/bin/sh
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo1.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo2.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo3.cfg
zk-stop.sh
#!/bin/sh
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo1.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo2.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo3.cfg
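Since the two scripts differ only in the action, they can be folded into one function that loops over the three configs. This is a sketch, not a replacement for the scripts above: zk_all is a hypothetical name, and it also accepts status, which zkServer.sh supports for checking whether each node is running.

```shell
#!/bin/sh
# Sketch: run one zkServer.sh action against all three configs.
# zk_all is a hypothetical helper; the default path is the one used in this post.

zk_all() {
  action=$1
  zk_home=${2:-/Users/chrishu123126.com/opt/zookeeper}
  case "$action" in
    start|stop|status) ;;
    *) echo "usage: zk_all start|stop|status [zk_home]" >&2; return 2 ;;
  esac
  for i in 1 2 3; do
    "$zk_home/bin/zkServer.sh" "$action" "$zk_home/conf/zoo$i.cfg"
  done
}

# Example calls:
# zk_all start
# zk_all status
# zk_all stop
```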
Run Kafka
./kafka-server-start.sh -daemon ../config/server.properties
How to create a Kafka topic and consume from it?
Kafka creates a producer
## producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
After the command runs, you get an input prompt.
Anything you type at this prompt is sent as a message, which is equivalent to calling send("what you entered") in an MQ client. If a corresponding consumer is connected to Kafka, you can see the message being consumed there.
Kafka creates a consumer
example:
producer
consumer
Appendix: some changes to the Kafka commands in newer versions of Kafka (2.x and later)
In later posts on Kafka, I will go through these commands one by one.
Start up
./kafka-server-start.sh -daemon ../config/server.properties

Manage
## Create topic (4 partitions, 2 replicas)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test

Query
## query cluster description
bin/kafka-topics.sh --describe --zookeeper
## topic list query
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
## topic list query (supports version 2.2+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
## New consumer list query (supports version 0.9+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
## New consumer list query (supports version 0.10+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
## Display the consumption details of a consumer group (only offsets stored in zookeeper are supported)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
## Display the consumption details of a consumer group (version 0.9 up to, but not including, 0.10.1.0)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
## Display the consumption details of a consumer group (version 0.10.1.0+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

Send and consume
## producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
## consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
## New producer (supports version 0.9+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
## New consumer (supports version 0.9+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
## Advanced usage
bin/kafka-simple-consumer-shell.sh --broker-list localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10

Balance leaders
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

Kafka's built-in stress test command
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092

Partition expansion
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

Migrate partitions
1. Create a rule json
cat > increase-replication-factor.json <<EOF
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
}
EOF
2. Execute
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
3. Verify
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
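The rule json above lists all 50 partitions of __consumer_offsets by hand. As a sketch, a small loop can produce the same structure for any topic, partition count, and replica list. gen_reassignment is a hypothetical helper name, not a Kafka tool, and it assumes every partition gets the same replica list, as in the example above.

```shell
#!/bin/sh
# Sketch: print a reassignment json in which every partition of a topic
# gets the same replica list. gen_reassignment is a hypothetical helper.

gen_reassignment() {
  topic=$1        # topic name, e.g. __consumer_offsets
  partitions=$2   # number of partitions, e.g. 50
  replicas=$3     # comma-separated broker ids, e.g. 0,1
  printf '{"version":1,\n "partitions":[\n'
  p=0
  while [ "$p" -lt "$partitions" ]; do
    # Every entry except the last is followed by a comma.
    if [ "$p" -eq $(($partitions - 1)) ]; then sep=''; else sep=','; fi
    printf '  {"topic":"%s","partition":%d,"replicas":[%s]}%s\n' \
      "$topic" "$p" "$replicas" "$sep"
    p=$(($p + 1))
  done
  printf ' ]\n}\n'
}

# Example call, matching the file used in the steps above:
# gen_reassignment __consumer_offsets 50 0,1 > increase-replication-factor.json
```

The generated file can then be passed to the --execute and --verify commands in the same way as the handwritten one.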