Fixing the Kafka "Failed to send messages after 3 tries" Error

When Kafka and the Java client run on the same machine (localhost), everything works: the producer and consumer can send and receive messages normally. But once they are deployed on two separate machines, the default configuration no longer works, and the producer fails with "kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries".


[2015-07-02 15:15:39,295] WARN Error while fetching metadata [{TopicMetadata for topic datadog-dev -> 
No partition metadata for topic datadog-dev due to kafka.common.LeaderNotAvailableException}] for topic [datadog-dev]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-07-02 15:15:39,295] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: datadog-dev (kafka.producer.async.DefaultEventHandler)
[2015-07-02 15:15:39,399] WARN Error while fetching metadata [{TopicMetadata for topic datadog-dev -> 
No partition metadata for topic datadog-dev due to kafka.common.LeaderNotAvailableException}] for topic [datadog-dev]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-07-02 15:15:39,399] ERROR Failed to send requests for topics datadog-dev with correlation ids in [9,16] (kafka.producer.async.DefaultEventHandler)
[2015-07-02 15:15:39,399] ERROR Error in handling batch of 4 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
  at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
  at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
  at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
  at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
  at scala.collection.immutable.Stream.foreach(Stream.scala:547)
  at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
  at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
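For context, the producer that hits the errors above would typically be configured along these lines. This is only a sketch of a 0.8-era (old Scala) producer configuration; queue-server1 is the broker hostname used later in this post:

```properties
# Old (Scala) producer configuration -- a sketch, assuming the broker
# runs on queue-server1 and messages are plain strings
metadata.broker.list=queue-server1:9092
serializer.class=kafka.serializer.StringEncoder
request.required.acks=1
```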

The fix is simple: set the hostname in Kafka's server.properties configuration file:

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=queue-server1

As the comment explains, this property specifies the address the broker binds to (strictly speaking, the network interface it listens on). It also interacts with the following property:

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=

In other words, producers and consumers connect to the broker using this hostname (advertised.host.name). If it is not set, the value of host.name is used instead; and if host.name is not set either, the broker falls back to the value returned by java.net.InetAddress.getCanonicalHostName().
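To see what that last fallback resolves to on a given machine, you can invoke the same JVM call yourself. A minimal sketch; the printed name depends entirely on the host's DNS and /etc/hosts configuration:

```java
import java.net.InetAddress;

public class ShowCanonicalHostName {
    public static void main(String[] args) throws Exception {
        // Same call Kafka falls back to when neither host.name
        // nor advertised.host.name is configured.
        String name = InetAddress.getLocalHost().getCanonicalHostName();
        System.out.println(name);
        // If this prints "localhost" (or a name other machines cannot
        // resolve), remote producers and consumers will fail to connect.
    }
}
```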

Looking at the broker's registration in ZooKeeper confirms this: by default the broker registers itself as localhost, so connecting from any other machine is bound to fail.


get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1435818382516","host":"localhost","version":1,"port":9092}
cZxid = 0x12
ctime = Thu Jul 02 06:26:22 UTC 2015
mZxid = 0x12
mtime = Thu Jul 02 06:26:22 UTC 2015
pZxid = 0x12
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x14e4d71a0bc0000
dataLength = 86
numChildren = 0
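One more caveat: whatever hostname ends up registered here must also be resolvable from the producer and consumer machines. If DNS does not cover it, an /etc/hosts entry on each client host will do (the IP address below is purely illustrative):

```
# /etc/hosts on each producer/consumer machine (illustrative IP)
192.168.1.10    queue-server1
```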

References

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan’tmyconsumers/producersconnecttothebrokers?

https://sathyatechblog.wordpress.com/2014/07/17/kafka-and-storm-commonly-faced-issues/



