2024/05/14

概念

硬件

kafka硬件指标包含磁盘吞吐量、网络吞吐量、内存、磁盘容量、cpu层面:

磁盘吞吐量

客户端发送消息,等待服务端消息写入结果。磁盘的写入效率越高,生成消息的延迟就越低;

磁盘容量

消息存储所占磁盘空间;

网络吞吐量

kafka处理数据流量的瓶颈,每秒可以处理流入流出多大的流量大小,如1MB/s,10MB/s

内存

消费者读取消息需要存储到缓存中。

CPU

客户端发送消息需要压缩处理,来降低网络传输的大小、速度;kafka服务端需要解压消息,设置偏移量,重新要数据压缩到磁盘存储;这需要一定的CPU计算能力。

高可用

集群

配置

broker集群

zookeeper.Connect=broker1.id,broker2.id

集群的数量如何评估?

  1. 集群需要多大的磁盘存储空间?单个broker的存储空间耗用?

    需要10TB的磁盘存储空间,单个broker可以存储2TB,集群的数量:10/2=5。如果启用了数据复制,集群数量:10/2*2=10

  2. 集群的处理能力?磁盘吞吐量、网络吞吐量、内存、CPU?

    通常与网络接口处理客户端流量的能力有关,特别是当有多个消费这存在或在数据保留期间流量发生波动时。如果单个broker的网络接口在高峰时段可以达到80%的使用量,并且有两个消费者,那么消费者无法保持峰值,就需要加broker。如果数据启用了复制功能,则需要把这个额外的消费者考虑在内。其他内存、磁盘吞吐量、cpu性能问题都可以通过扩展多个broker来解决。

架构模式

发布订阅

https://cloud.tencent.com/developer/article/1639449

常见命令

创建主题(topic)

# 127.0.0.1创建topic,10个分区
/opt/app/kafka/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic test-topic --replication-factor 1 --partitions 10

另外配置了auto.create.topics.enble=true,kafka会在以下几种情形下自动创建主题。

  • 当一个生产者开始往主体写入消息时;
  • 当一个消费者开始从主题读取消息时;
  • 当任意一个客户端往主题发送原数据请求时;

发消息

/opt/app/kafka/bin/kafka-console-producer.sh  --topic  test-topic   --broker-list 127.0.0.1:9092
>{"test":"qi-sgssst027_ls","clientRealIp":"127.0.0.1","clientUa":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}    # 点击数据

查看topic数据量

/opt/app/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test-topic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'

查看group消费某个topic的滞后数量

/opt/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test-group --describe | awk '$1 == "test-topic" {sum += $3} END {print sum}'

/opt/app/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.16.30.206:9092 --topic click_reader_free --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'

查看group消费某个topic的当前offset

/opt/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test-group --describe | awk '$1 == "test_topic" {sum += $3} END {print sum}'

查看topic内容

/opt/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic --from-beginning

查看对应group的描述信息

/opt/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.16.30.206:9092 --group test-group --describe

查看topic信息、分区数量

/opt/app/kafka/bin/kafka-topics.sh --zookeeper  127.0.0.1:2181 --describe --topic click-reader-free-topic 

查看所有topic列表

/opt/app/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

查看所有group列表

/opt/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

获取topic指定时间戳的offset

/opt/app/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.16.30.207:9092  -topic send-book-ks -time 1608526800000

上述的-time,为毫秒值,-time=-1表示latest,-2表示earliest

zookeeper操作

1.删除Kafka日志文件

rm -rf /opt/app/kafka/logs/kafka-logs

2.登录Zookeeper

/opt/app/kafka/bin/zookeeper-shell.sh 172.16.30.207:2181

3.查看topic列表

ls /brokers/topics

4.删除topic

rmr /brokers/topics/click_reader_free
rmr /brokers/topics/cleantopic

5.Ctr + c退出Zookeeper 6.重置topic group的offset至最早的数据

/opt/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test_group --topic click_reader_free_topic --reset-offsets --to-earliest --execute

7.查看Kafka topic数据量和group消费情况

/opt/app/kafka/bin/kafka-topics.sh --delete --topic test-topic --zookeeper 127.0.0.1:2181

Docker-compose部署及kafka配置

部署,学习研究

version: '3'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: on-failure:5
    volumes:
      - ./data/zookeeper:/data
    ports:
      - "2181:2181"
    networks:
      - backend
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: on-failure:1 # always、unless-stopped
    ports:
      - "9092:9092"
    depends_on: [ zookeeper ]
    environment: 
#    broker配置
      KAFKA_BROKER_ID: 0											  # broker标识
      KAFKA_ADVERTISED_PORT: 9092 							# 监听端口
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1     # 修改:宿主机IP (如不需要外网访问,设置成内网IP,否则设成外网IP)
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       # kafka运行是基于zookeeper的;用于保存broker元数据的Zookeeper地址是通过zookeeper.connnect来指定
      KAFKA_NUM_PARTITIONS: 3												# 指定新创建的主体将包含多少个分区(阿里云分区数量按6的倍数创建)
      KAFKA_LOG_RETENTION_HOURS: 120							# 配置数据可以保留的时间
      KAFKA_LOG_RETENTION_BYTES: 1000000000				# 另一种方式通过保留的消息字节数来判断过期 同时指定字节数、保留时间,任意条件满足,消息就会删除
      KAFKA_MESSAGE_MAX_BYTES: 10000000					  # 限制单个消息的大小	
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_DELETE_RETENTION_MS: 1000
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092 # 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 # 配置kafka的监听端口
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./data/kafka:/kafka
#      - /etc/localtime:/etc/localtime # mac error: is not shared from OS X and is not known to Docker.
    networks:
      - backend
networks: # 容器配置可保证容器在同一网络
  backend:

验证kafka是否可以使用

# 进入容器
$ docker exec -it kafka bash

# 进入卡夫卡命令 目录下
$ cd $KAFKA_HOME/bin/ #/opt/kafka_2.13-2.7.0/bin/

#  --broker-list localhost:9092 指定远程kafka实例地址
# 运行kafka生产者发送消息
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic mmr

# 发送消息
> {"datas":[{"channel":"","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}

# --bootstrap-server localhost:9092 指定远程kafka实例地址
# 运行kafka消费者接收消息
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mmr --from-beginning

# 查看topic信息、分区数量
./kafka-topics.sh --bootstrap-server localhost:9092 --describe  --topic mmr
# or
./kafka-topics.sh --zookeeper zookeeper:2181 --describe  --topic mmr 

Search

    Table of Contents