Introductory example
Public Static Void Main (String [] ARGS) Throws ExecutionException, InterruptedException {
MAP <string, Object> Configs = New HashMap <string, Object> ();
// Specify the Broker address used in the initial connection
configs.put ("Bootstrap.Servers", "192.168.0.103:9092");
// Specify the serialization class of the key
configs.put ("Key.serializer", Integerserializer.class);
// Specify the serialization class of value
Configs.put ("Value.serializer", StringSerializer.class);
// configs.put ("act"); "all");
// configs.put ("Reties", "3");
KafkapRoducer <Integer, String> Producer = New Kafkaproducer <Integer, String> (Configs);
// Used to set the user -defined message header field
List <header> headers = new arraylist <header> ();
Headers.add (New RecordHeader ("BIZ.NAME", "Producer.Demo" .Getbytes ()));
Producerrecord <Integer, String> Record = New ProducerRold <Integer, String> (
"Topic_1",
0,
0,
"Hello Hqk 0",
headers
);
// Synchronous confirmation of message
FINAL FUTURE <RecordMetAdata> Future = Producer.Send (Record);
FINAL RecordMetAdata Metadata = Future.get ();
System.out.println ("Theme of the message:" + metadata.topic ());
System.out.println ("partition number of messages: + metadata.partition ());
System.out.println ("Message of Message:" + Metadata.Offset ());
// Close the producer
Producer.close ();
}
WARN Client session timed out, have not heard from server in 30000ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:262)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:119)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1881)
at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:376)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Check some of the possible causes:
1. Whether the ZooKeeper address configured in Kafka's config directory is wrong
2. Whether ports 2181 and 9092 are already occupied
3. Firewall needs to be closed
4. The ZooKeeper connection port should be 2181
5. In newer Kafka versions, the parameter used when creating a topic is not --zookeeper but --bootstrap-server
In the end, none of the above turned out to be the cause.
The fix was to add one line at the end of Kafka’s config/server.properties: host.name = your own server IP
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
host.name=192.168.0.103
Then restart Kafka and connect again.
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.0.103:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 6000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
Theme of the message: Topic_1
The partition number of the message: 0
Offset of the message: 8
Process finished with exit code 0
The message was sent successfully.