# 服务启动和停止

kafka启动

JMX_PORT=9999 /app/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /app/kafka_2.12-2.5.0/config/server.properties
1

kafka停止

/app/kafka_2.12-2.5.0/bin/kafka-server-stop.sh stop
1

# 指定server

开发环境

  • --zookeeper 129.0.4.37:2181,129.0.4.38:2181,129.0.4.39:2181
  • --bootstrap-server 129.0.4.37:9092,129.0.4.38:9092,129.0.4.39:9092

测试环境:

  • --zookeeper 129.0.4.32:2181,129.0.4.33:2181,129.0.4.34:2181
  • --bootstrap-server 129.0.4.32:9092,129.0.4.33:9092,129.0.4.34:9092

# topic

# 查看当前服务器中所有topic

kafka-topics.sh --zookeeper 129.0.4.32:2181 --list
1

# 查看某个Topic的详情

kafka-topics.sh --describe --bootstrap-server 129.0.4.32:9092 --topic first
1

或者

kafka-topics.sh --describe --zookeeper 129.0.4.32:2181 --topic first
1

# 创建topic

topic的创建要通过zookpeer来创建

1个副本集,1个分区

kafka-topics.sh --zookeeper 129.0.4.32:2181 --create --replication-factor 1 --partitions 1 --topic first
1

3个副本集,10个分区

kafka-topics.sh --zookeeper 129.0.4.32:2181 --create --replication-factor 3 --partitions 10 --topic second
1

选项说明:

  • --topic 定义topic名称
  • --replication-factor 定义副本数(副本数不超过机器数)
  • --partitions 定义分区数(提高并发)

# 删除topic

kafka-topics.sh --zookeeper 129.0.4.32:2181 --delete --topic second
1

注:

  • 删除topic操作,需要server.properties中设置delete.topic.enable=true,否则只是标记删除。

# 修改分区数

kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 6
1

# 消息

  • 生产消息命令:kafka-console-producer.sh
  • 消费消息命令:kafka-console-consumer.sh

# 生产者

发送消息

创建生产者长连接:

kafka-console-producer.sh --broker-list 129.0.4.32:9092 --topic first
1

发送消息到指定分区

# 消费者

消费者可以用不同的方式进行消费

# 默认消费模式

kafka-console-consumer.sh --bootstrap-server 129.0.4.32:9092 --topic first
1

# 从头消费

kafka-console-consumer.sh --bootstrap-server 129.0.4.32:9092 --from-beginning --topic first
1

# 指定group消费

group是消费者的最小单元

kafka-console-consumer.sh --bootstrap-server 129.0.4.32:9092 --group fltrp_nodejs_group --topic first
1

# 指定分区消费

kafka-console-consumer.sh --bootstrap-server 129.0.4.32:9092 --partition 0 --topic first
1

# 指定offset消费

指定offset时,必须指定分区

kafka-console-consumer.sh --bootstrap-server 129.0.4.32:9092 --partition 0 --offset 0 --topic first 
1

选项:

  • --from-beginning: 会把主题中以往所有的数据都读取出来
  • --group <String: consumer group id> The consumer group id of the consumer.
  • --partition <Integer: partition> The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.
  • --offset <String: consume offset> The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest)

注意:

  • The partition is required when offset is specified.

# offset

# 查看group的消费进度

kafka-consumer-offset-checker --bootstrap-server localhost:9092 --group fltrp_nodejs_group  --topic first
1