Details
Type: Bug
Status: Resolved
Priority: Trivial
Resolution: Not A Problem
Affects Version/s: 0.11.0.0
Fix Version/s: None
Environment: macOS 10.12, Kafka 0.11.0.1
Description
Hi.
"org.apache.kafka.streams.errors.StreamsException" is thrown in the following case.
Create topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic word-count-input
Create Kafka Streams Application
public class WordCountApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        ...
        ...
Ensure that it works fine
$ java -jar wordcount.jar
KafkaStreams processID: b4a559cb-7075-4ece-a718-5043a432900b
StreamsThread appId: wordcount-application
...
...
Change "partitions" of the input topic from 6 to 8
$ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 8 --topic word-count-input
Adding partitions succeeded!
When I start the application again, a StreamsException is thrown
$ java -jar wordcount.jar
KafkaStreams processID: 8a9cbf03-b841-4cb2-9d44-6456b4520522
StreamsThread appId: wordcount-application
StreamsThread clientId: wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522
StreamsThread threadId: wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1
Active tasks:
Running:
Suspended:
Restoring:
New:
Standby tasks:
Running:
Suspended:
Restoring:
New:
Exception in thread "wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:82)
at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:660)
at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:398)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:365)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:522)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
If I change the application ID, the application works again.
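A likely explanation for why a new application ID helps, sketched below under stated assumptions: Kafka Streams prefixes the names of its internal topics (changelog and repartition topics) with the application ID, and the internal topics created on the first run were sized for the 6-partition input. After the input topic grows to 8 partitions, the existing internal topics no longer match the expected partition count, which is probably what makes InternalTopicManager.makeReady fail. The class InternalTopicCheck, the store name "Counts", and the ID "wordcount-application-v2" are hypothetical names for illustration; this is a plain-Java model of the check, not the actual Kafka Streams code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the internal-topic partition check. Not Kafka code.
final class InternalTopicCheck {

    // Internal topic names are prefixed with the application ID,
    // e.g. <application.id>-<store name>-changelog.
    static String internalTopicName(String applicationId, String name, String suffix) {
        return applicationId + "-" + name + "-" + suffix;
    }

    // Returns null if the topic can be used or created, otherwise a message
    // describing the partition-count mismatch.
    static String validate(Map<String, Integer> existingTopics, String topic, int expectedPartitions) {
        Integer actual = existingTopics.get(topic);
        if (actual == null) {
            return null; // topic does not exist yet, so it can be created with the expected count
        }
        if (actual != expectedPartitions) {
            return "Existing internal topic " + topic + " has " + actual
                    + " partitions, expected " + expectedPartitions;
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical broker state: topic name -> partition count.
        Map<String, Integer> broker = new HashMap<>();

        // "Counts" is an assumed store name; the original topology is elided.
        String oldTopic = internalTopicName("wordcount-application", "Counts", "changelog");
        broker.put(oldTopic, 6); // created on the first run, when the input had 6 partitions

        // Same application ID after the input grew to 8 partitions: mismatch.
        System.out.println(validate(broker, oldTopic, 8));

        // New application ID: different topic name, nothing to conflict with.
        String newTopic = internalTopicName("wordcount-application-v2", "Counts", "changelog");
        System.out.println(validate(broker, newTopic, 8));
    }
}
```

If this model matches what happened, an alternative to changing the application ID would be to clean up the stale internal topics, for example with the application reset tool shipped with Kafka (bin/kafka-streams-application-reset.sh), before restarting.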
Thank you.