Kafka single-machine development setup tutorial

Download and install

Download Apache Kafka 2.11 or later from the official Apache Kafka download page.

After downloading, unzip the archive.

Change the configuration (stand-alone setup is very simple)

In the config/ directory, create a new properties file called server.properties. I will cover the cluster setup separately later; that part is a bit long and not something a new developer needs right away. Let's get Kafka up and running first.

server.properties

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://127.0.0.1:9092
port=9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
host.name=127.0.0.1
 
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
 
socket.request.max.bytes=104857600
 
 
log.dirs=/Users/chrishu123126.com/opt/kafka2.11/logs
num.partitions=3
 
num.recovery.threads.per.data.dir=1
 
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
 
 
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
 
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
 
group.initial.rebalance.delay.ms=0
 

 

Explanation of key parameters:

  • broker.id=0: there is only one Kafka broker, so its id is 0;
  • delete.topic.enable=true: without this setting, running the delete-topic command against Kafka has no effect and you would have to delete the topic in ZooKeeper yourself;
  • listeners=PLAINTEXT://127.0.0.1:9092: the address Kafka listens on. Here it is the local machine; if the clients connecting to Kafka are not on the same server as Kafka, replace 127.0.0.1 with this machine's LAN IP (see the connectivity check after this list);
  • port=9092: the port on which the Kafka service is exposed to remote clients;
  • log.dirs=/Users/chrishu123126.com/opt/kafka2.11/logs: this directory holds not just Kafka's logs but also the data behind Kafka topics; in effect it is the path for both logs and Kafka's data files;
  • num.partitions=3: whether for a single machine or a multi-machine production cluster, three is the sensible minimum. When we get to the multi-machine production cluster later, I will describe the relationship among partitions, brokers, and consumers in Kafka in detail; these three concepts are important;
  • zookeeper.connect=localhost:2181,localhost:2182,localhost:2183: must be specified; coordination between Kafka nodes is handled by ZooKeeper;
  • zookeeper.connection.timeout.ms=6000: the ZooKeeper connection timeout, generally 2-6 s;
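To double-check that the broker is reachable on the listener configured above, here is a minimal connectivity check using Kafka's AdminClient. This is only a sketch: it assumes the kafka-clients library is on the classpath and that the broker from server.properties is already running on 127.0.0.1:9092.

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Must match listeners/advertised.listeners in server.properties
        props.put("bootstrap.servers", "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            // With this single-node setup, this prints the one broker with broker.id=0
            System.out.println("Brokers: " + cluster.nodes().get());
            System.out.println("Cluster id: " + cluster.clusterId().get());
        }
    }
}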

Building a local ZooKeeper cluster

This is simple enough that I will not go into it in this post; there are plenty of examples online.

Here I simply give the full configuration I used for the setup.

First, set up three zoo.cfg files

They are named zoo1.cfg, zoo2.cfg, zoo3.cfg

zoo1.cfg

# The number of milliseconds of each tick
#zk time unit (milliseconds)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# Time limit for followers to sync data from the leader at startup
# If the cluster is large and the data volume is large, increase this value appropriately.
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk1
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk1
clientPort=2181
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
# The leader does not serve client connections and focuses on coordination and elections
leaderServes=no
maxClientCnxns=1000
forceSync=no

zoo2.cfg

# The number of milliseconds of each tick
#zk time unit (milliseconds)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# Time limit for followers to sync data from the leader at startup
# If the cluster is large and the data volume is large, increase this value appropriately.
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk2
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk2
clientPort=2182
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
# The leader does not serve client connections and focuses on coordination and elections
leaderServes=no
maxClientCnxns=1000
forceSync=no

zoo3.cfg

# The number of milliseconds of each tick
#zk time unit (milliseconds)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# Time limit for followers to sync data from the leader at startup
# If the cluster is large and the data volume is large, increase this value appropriately.
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk3
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk3
clientPort=2183
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
# The leader does not serve client connections and focuses on coordination and elections
leaderServes=no
maxClientCnxns=1000
forceSync=no

Build and start the ZK cluster

We are using ZooKeeper 3.4.3.

Enter the following commands:

touch data/zk1/myid
touch data/zk2/myid
touch data/zk3/myid

This leaves an empty file named myid in each of the data directories (data/zk1, data/zk2, data/zk3).

Next, enter the following commands:

echo 1 > data/zk1/myid
echo 2 > data/zk2/myid
echo 3 > data/zk3/myid

Start the ZK group

sh zkServer.sh start ../conf/zoo1.cfg
sh zkServer.sh start ../conf/zoo2.cfg
sh zkServer.sh start ../conf/zoo3.cfg

Below are a script to start and a script to shut down the entire ZK group.

zk-start.sh

#!/bin/sh
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo1.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo2.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo3.cfg

zk-stop.sh

#!/bin/sh
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo1.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo2.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo3.cfg

Run Kafka

./kafka-server-start.sh -daemon ../config/server.properties

How do you create a Kafka topic and consume from it?

Creating a Kafka producer

## producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

After running this command, you get an interactive command window.

Typing anything into this window is equivalent to calling send("what you entered") on the message queue. If a consumer is connected to Kafka on the same topic, you can watch the message being consumed.
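For comparison, here is a minimal programmatic equivalent of the console producer. This is only a sketch: it assumes the kafka-clients library is available and that the broker configured above is running on 127.0.0.1:9092.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address from listeners/advertised.listeners above
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Equivalent to typing one line into the console producer window
            producer.send(new ProducerRecord<>("test", "hello kafka"));
            producer.flush();
        }
    }
}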

Creating a Kafka consumer

Example: run a producer window and a consumer window side by side; lines typed into the producer appear in the consumer.
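The consumer side can be the console consumer listed in the appendix below (bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test). Programmatically, a minimal sketch looks like the following; it assumes the kafka-clients library and uses a hypothetical group id test-consumer-group.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "test-consumer-group");   // hypothetical group id
        props.put("auto.offset.reset", "earliest");     // same idea as --from-beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}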

 

Appendix: some changes to Kafka commands in newer Kafka releases (version 2.x and later)

Later, when I cover Kafka in more depth, I will go through these commands one by one.

Start up
./kafka-server-start.sh -daemon ../config/server.properties
Manage
## Create topic (4 partitions, 2 replicas)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
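The same topic can also be created programmatically. This is only a sketch using Kafka's AdminClient (kafka-clients assumed on the classpath); note that a replication factor of 2 requires at least two brokers, so on the single-broker setup above you would use 1.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 4 partitions, replication factor 2 (use 1 on a single-broker dev setup)
            NewTopic topic = new NewTopic("test", 4, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}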
Query
## query cluster description
bin/kafka-topics.sh --describe --zookeeper
 
## topic list query
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
 
## topic list query (supports version 0.9+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
 
## New consumer list query (supports version 0.9+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
 
## New consumer list query (supports version 0.10+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
 
## Display the consumption details of a consumer group (only offsets stored in ZooKeeper are supported)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
 
## Display the consumption details of a consumer group (version 0.9 to before 0.10.1.0)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
 
## Display the consumption details of a consumer group (version 0.10.1.0+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
Send and consume
## producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 
## consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
 
## New producer (supports version 0.9+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
 
## New consumers (supports version 0.9+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
 
## More advanced usage
bin/kafka-simple-consumer-shell.sh --broker-list localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10
Balance leaders
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
Kafka built-in load-test command
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092
 Partition expansion
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
 Migrate partitions
1. Create a reassignment rule JSON
cat > increase-replication-factor.json <<EOF
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
}
EOF
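Typing the 50 partition entries by hand is tedious; here is a small sketch that generates the same increase-replication-factor.json (plain Java, no extra dependencies; the file is assumed to be written to the current directory):

import java.io.FileWriter;
import java.io.IOException;

public class GenReassignmentJson {
    public static void main(String[] args) throws IOException {
        StringBuilder sb = new StringBuilder("{\"version\":1, \"partitions\":[\n");
        for (int p = 0; p < 50; p++) {
            // Every __consumer_offsets partition gets the replica set [0,1]
            sb.append(String.format(
                "{\"topic\":\"__consumer_offsets\",\"partition\":%d,\"replicas\":[0,1]}%s%n",
                p, p < 49 ? "," : "]"));
        }
        sb.append("}\n");
        try (FileWriter out = new FileWriter("increase-replication-factor.json")) {
            out.write(sb.toString());
        }
    }
}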
2. Execute
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
3. Verify
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
