Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 0.11.0.0
None
None
Environment: Linux
Description
Hi,
I have upgraded my 3-node Kafka cluster from 0.8 to 0.11 brokers and am trying to test the new consumer APIs.
Below is the code extract. When I use the consumer.subscribe() method, the consumer.poll() call gets stuck (thread dump attached). poll() does return records if I use the consumer.seek() methods instead. Please let me know what I am doing incorrectly. I have set advertised.host and listeners correctly in server.properties.
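A note on the configuration in the extract: the key auto_offset_reset is written with underscores, while the consumer setting is named auto.offset.reset. The client does not recognize the underscore form, so the default ("latest") applies. This may be unrelated to the stuck poll(), but it is easy to rule out. The following is a minimal sketch of such a check using only java.util. The class name ConsumerKeyCheck, the KNOWN_KEYS allow-list, and the unknownKeys helper are hypothetical and not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

public class ConsumerKeyCheck {
    // Hypothetical allow-list: only the consumer settings used in this report,
    // not the full set of keys the client accepts.
    static final Set<String> KNOWN_KEYS = new HashSet<>(Arrays.asList(
            "bootstrap.servers", "group.id", "enable.auto.commit",
            "auto.offset.reset", "request.timeout.ms",
            "key.deserializer", "value.deserializer"));

    // Returns the property names that are not in KNOWN_KEYS, in sorted order.
    static List<String> unknownKeys(Properties props) {
        Set<String> unknown = new TreeSet<>();
        for (Object key : props.keySet()) {
            String name = String.valueOf(key);
            if (!KNOWN_KEYS.contains(name)) {
                unknown.add(name);
            }
        }
        return new ArrayList<>(unknown);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "test3");
        props.put("auto_offset_reset", "earliest"); // underscores, as in the extract
        System.out.println(unknownKeys(props)); // prints [auto_offset_reset]
    }
}
```

Running the same check against props1 from the extract would flag only auto_offset_reset; the other keys match the allow-list.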
Properties props1 = new Properties();
props1.put("bootstrap.servers", "localhost:9092");
props1.put("group.id", "test3");
props1.put("enable.auto.commit", "false");
props1.put("auto_offset_reset", "earliest");
props1.put("request.timeout.ms", 30000);
props1.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props1.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
String TestTopic = "T3";
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props1);
consumer1.subscribe(Arrays.asList(TestTopic));
int j = 0;
while (j < 10) {
    j++;
    ConsumerRecords<String, String> records1 = consumer1.poll(100);
    for (ConsumerRecord<String, String> record1 : records1)