Kafka
Installing Kafka on Windows turned out to be full of problems.
AccessDeniedException
[2021-11-18 17:15:09,038] ERROR Error while writing to checkpoint file E:\develop\kafka_2.12-3.0.0\tmp\kafka-logs\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: E:\develop\kafka_2.12-3.0.0\tmp\kafka-logs
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
at sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115)
at java.nio.channels.FileChannel.open(FileChannel.java:287)
at java.nio.channels.FileChannel.open(FileChannel.java:335)
at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:953)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:941)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:916)
at kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:114)
at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:92)
at kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:67)
at kafka.log.LogManager.$anonfun$checkpointLogStartOffsetsInDir$1(LogManager.scala:698)
at kafka.log.LogManager.$anonfun$checkpointLogStartOffsetsInDir$1$adapted(LogManager.scala:694)
at scala.Option.foreach(Option.scala:407)
at kafka.log.LogManager.checkpointLogStartOffsetsInDir(LogManager.scala:694)
at kafka.log.LogManager.$anonfun$shutdown$9(LogManager.scala:545)
at kafka.log.LogManager.$anonfun$shutdown$9$adapted(LogManager.scala:535)
at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
at scala.collection.compat.MapExtensionMethods$.$anonfun$foreachEntry$1(PackageShared.scala:431)
at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at scala.collection.compat.MapExtensionMethods$.foreachEntry$extension(PackageShared.scala:431)
at kafka.log.LogManager.shutdown(LogManager.scala:535)
at kafka.server.KafkaServer.$anonfun$shutdown$18(KafkaServer.scala:701)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:701)
at kafka.server.KafkaServer.startup(KafkaServer.scala:435)
at kafka.Kafka$.main(Kafka.scala:109)
at kafka.Kafka.main(Kafka.scala)
[2021-11-18 17:15:09,040] ERROR Disk error while writing log start offsets checkpoint in directory E:\develop\kafka_2.12-3.0.0\tmp\kafka-logs: Error while writing to checkpoint file E:\develop\kafka_2.12-3.0.0\tmp\kafka-logs\log-start-offset-checkpoint (kafka.log.LogManager)
This failure showed up at startup; it was caused by manually deleting the log directory, and the fix is to remove Kafka's leftover configuration from ZooKeeper.
Use zkCli to delete the nodes [admin, brokers, cluster, config, consumers, isr_change_notification, latest_producer_id_block, log_dir_event_notification]. Ideally you would have pointed Kafka at a dedicated ZooKeeper root node so everything could be removed in one go; since I hadn't, I had to delete them one by one, as sketched below.
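A minimal cleanup sketch, assuming ZooKeeper 3.5+ (whose zkCli has deleteall for removing a node together with its children) and that Kafka registered its nodes directly under the ZooKeeper root:
# Open the ZooKeeper CLI (zkCli.cmd on Windows, zkCli.sh on Linux)
zkCli.sh -server localhost:2181
# Inside the zkCli shell, remove each Kafka node and everything beneath it
deleteall /admin
deleteall /brokers
deleteall /cluster
deleteall /config
deleteall /consumers
deleteall /isr_change_notification
deleteall /latest_producer_id_block
deleteall /log_dir_event_notification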
Specifying a root node (chroot):
zookeeper.connect=localhost:2181/kafka
Since this was a fresh install I simply deleted everything; in any other situation, delete these nodes with great care.
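With a chroot like the one above, the whole subtree can be removed in one go (a sketch, assuming the /kafka chroot from the config line):
# Inside zkCli, this deletes every node Kafka created under its chroot
deleteall /kafka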
InconsistentClusterIdException
[2021-11-18 17:20:31,698] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID ScLmrGv9QI2mzgXfIGShRA doesn't match stored clusterId Some(8n2fJuInRDSZSbXXJkmihg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
at kafka.server.KafkaServer.startup(KafkaServer.scala:223)
at kafka.Kafka$.main(Kafka.scala:109)
at kafka.Kafka.main(Kafka.scala)
Even after that it still wouldn't start; this 3.0.0 release was a real headache. In the end I went back and downloaded a 2.8.x release instead, and it started up without any problems.
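For reference, this exception means the cluster ID stored in the broker's meta.properties no longer matches the one registered in ZooKeeper, which is exactly what happens after wiping the ZooKeeper nodes as above. The usual fix is to remove the stale file, sketched here under the assumption that log.dirs still points at the directory from the earlier stack trace:
# Delete the stale metadata so the broker adopts the cluster ID currently stored in ZooKeeper
# (Windows cmd; use rm on Linux)
del E:\develop\kafka_2.12-3.0.0\tmp\kafka-logs\meta.properties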
Monitoring
Kafka exposes its metrics over JMX, which monitoring tools can hook into.
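A minimal way to enable it, assuming the stock launch scripts (kafka-run-class honors the JMX_PORT environment variable):
# Expose JMX on port 9999 before starting the broker, then point jconsole or a JMX exporter at it
# Windows:
set JMX_PORT=9999
bin\windows\kafka-server-start.bat config\server.properties
# Linux:
JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties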
Internal and external network access
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://:19092,EXTERNAL://:9092
advertised.listeners=INTERNAL://<internal-ip>:19092,EXTERNAL://<public-ip>:9092
inter.broker.listener.name=INTERNAL
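With this layout, clients choose the bootstrap address that matches where they sit (a sketch; <internal-ip> and <public-ip> are the addresses advertised above, and the topic name is only an example):
# Clients on the internal network bootstrap against the INTERNAL listener
kafka-console-producer.sh --bootstrap-server <internal-ip>:19092 --topic test
# Clients outside the network bootstrap against the EXTERNAL listener
kafka-console-producer.sh --bootstrap-server <public-ip>:9092 --topic test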
Configuring internal/external access with docker-compose
kafka:
  container_name: kafka
  image: bitnami/kafka:2.8.1
  restart: always
  privileged: true
  links:
    - zookeeper
  ports:
    - "9092:9092"
    - "9093:9093"
  volumes:
    - /etc/localtime:/etc/localtime:ro
    - $PWD/kafka:/bitnami/kafka
  depends_on:
    - zookeeper
  environment:
    TZ: Asia/Shanghai
    KAFKA_BROKER_ID: 0
    ALLOW_PLAINTEXT_LISTENER: "yes"
    KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_CFG_LISTENERS: CLIENT://:9092,EXTERNAL://:9093
    KAFKA_CFG_ADVERTISED_LISTENERS: CLIENT://kafka:9092,EXTERNAL://192.168.1.100:9093
    KAFKA_CFG_INTER_BROKER_LISTENER_NAME: CLIENT
    KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
Note: once the internal listener is configured (INTERNAL above, CLIENT in the compose file), clients on the internal network can only reach the broker through that listener's port; they can no longer go through the external port.
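To check both paths, a sketch assuming the compose service above is running, 192.168.1.100 is the host's LAN address, and the bitnami image has the Kafka scripts on its PATH:
# From a container on the same compose network, go through the CLIENT listener
docker exec -it kafka kafka-topics.sh --bootstrap-server kafka:9092 --list
# From a machine outside the compose network, go through the EXTERNAL listener
kafka-topics.sh --bootstrap-server 192.168.1.100:9093 --list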
Common commands
# List topics
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
# Describe a topic's partition layout
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic topic --describe
# Alter the partition count (Kafka only allows increasing it)
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic topic --alter --partitions 3
# Create a topic
# partitions = (brokers < 12) ? (brokers * 2) : 12 (this caps consumer concurrency)
# replication-factor: at least 2, usually 3, at most 4
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic topic --create --partitions 3 --replication-factor 3
# Delete a topic
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic topic --delete
# Changing the replication factor: increase-replication-factor.json
{
  "partitions": [
    { "topic": "topic", "partition": 0, "replicas": [0, 1] },
    { "topic": "topic", "partition": 1, "replicas": [0, 2] },
    { "topic": "topic", "partition": 2, "replicas": [1, 2] }
  ],
  "version": 1
}
# Execute the reassignment
kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file increase-replication-factor.json --execute
# Verify that the reassignment has completed
kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file increase-replication-factor.json --verify