Kafka

0

Windows安装Kafka真是问题多多

AccessDeniedException

[2021-11-18 17:15:09,038] ERROR Error while writing to checkpoint file E:\develop\kafka_2.12-3.0.0\tmp\kafka-logs\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: E:\develop\kafka_2.12-3.0.0\tmp\kafka-logs
        at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
        at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
        at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
        at sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115)
        at java.nio.channels.FileChannel.open(FileChannel.java:287)
        at java.nio.channels.FileChannel.open(FileChannel.java:335)
        at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:953)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:941)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:916)
        at kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:114)
        at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:92)
        at kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:67)
        at kafka.log.LogManager.$anonfun$checkpointLogStartOffsetsInDir$1(LogManager.scala:698)
        at kafka.log.LogManager.$anonfun$checkpointLogStartOffsetsInDir$1$adapted(LogManager.scala:694)
        at scala.Option.foreach(Option.scala:407)
        at kafka.log.LogManager.checkpointLogStartOffsetsInDir(LogManager.scala:694)
        at kafka.log.LogManager.$anonfun$shutdown$9(LogManager.scala:545)
        at kafka.log.LogManager.$anonfun$shutdown$9$adapted(LogManager.scala:535)
        at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
        at scala.collection.compat.MapExtensionMethods$.$anonfun$foreachEntry$1(PackageShared.scala:431)
        at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
        at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
        at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
        at scala.collection.compat.MapExtensionMethods$.foreachEntry$extension(PackageShared.scala:431)
        at kafka.log.LogManager.shutdown(LogManager.scala:535)
        at kafka.server.KafkaServer.$anonfun$shutdown$18(KafkaServer.scala:701)
        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:701)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:435)
        at kafka.Kafka$.main(Kafka.scala:109)
        at kafka.Kafka.main(Kafka.scala)
[2021-11-18 17:15:09,040] ERROR Disk error while writing log start offsets checkpoint in directory E:\develop\kafka_2.12-3.0.0\tmp\kafka-logs: Error while writing to checkpoint file E:\develop\kafka_2.12-3.0.0\tmp\kafka-logs\log-start-offset-checkpoint (kafka.log.LogManager)

这个问题就是启动失败,我们手动删除了日志目录导致,需要我们从zookeeper删除配置。

使用zkCli删除节点[admin, brokers, cluster, config, consumers, isr_change_notification, latest_producer_id_block, log_dir_event_notification],当然最好是指定zookeeper的根节点可以一次全部删除,当然我没有就只能一个一个的删除了。

指定根节点

zookeeper.connect=localhost:2181/kafka

因为初次安装所以直接删除,其他情况删除需要谨慎操作。

InconsistentClusterIdException

[2021-11-18 17:20:31,698] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID ScLmrGv9QI2mzgXfIGShRA doesn't match stored clusterId Some(8n2fJuInRDSZSbXXJkmihg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
        at kafka.server.KafkaServer.startup(KafkaServer.scala:223)
        at kafka.Kafka$.main(Kafka.scala:109)
        at kafka.Kafka.main(Kafka.scala)

然后还是不行,这个3.0.0实在是头疼。
只能回去下载一个2.8.x版本了,启动成功,啥问题都没有。

监控

配合JMX提供监控

内外网访问

listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://:19092,EXTERNAL://:9092
advertised.listeners=INTERNAL://内网IP:19092,EXTERNAL://外网IP:9092
inter.broker.listener.name=INTERNAL

docker-compose配置内外网访问

  kafka:
    container_name: kafka
    image: bitnami/kafka:2.8.1
    restart: always
    privileged: true
    links:
      - zookeeper
    ports:
      - 9092:9092
      - 9093:9093
    volumes:
      - /etc/localtime:/etc/localtime:ro
      - $PWD/kafka:/bitnami/kafka
    depends_on:
      - zookeeper
    environment:
      TZ: Asia/Shanghai
      KAFKA_BROKER_ID: 0
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_LISTENERS: CLIENT://:9092,EXTERNAL://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: CLIENT://kafka:9092,EXTERNAL://192.168.1.100:9093
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: CLIENT
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT

注意:配置INTERNAL以后,内网只能通过这个端口访问,不能通过外网端口访问。

常用命令

# 查看主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
# 查看主题分区信息
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic topic --describe
# 修改分区
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic topic -alter --partitions 3
# 创建主题
# partitions = (broker < 12) ? (broker * 2) : 12(影响并发数量)
# replication-factor:至少两个,一般三个,最多四个。
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic topic --create --partitions 3 --replication-factor 3
# 删除主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic topic --delete
# 修改副本:increase-replication-factor.json
{
	"partitions": [{ 
		"topic": "topic",
		"partition": 0,
		"replicas": [0, 1]
	},
	{ 
		"topic": "topic",
		"partition": 1,
		"replicas": [0, 2]
	},
	{ 
		"topic": "topic",
		"partition": 2,
		"replicas": [1, 2]
	}],
	"version": 1
}
kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file increase-replication-factor.json --verify
kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file increase-replication-factor.json --execute