Details
Type: Bug
Status: Resolved
Priority: P2
Resolution: Not A Bug
Affects Version: 2.2.0
Fix Version: None
Description
I am trying to read from a file and write to Kafka running on Google Cloud, and I am getting the following error:
org.apache.beam.sdk.util.UserCodeException: java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
    at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
    at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.checkForFailures(KafkaIO.java:1639)
    at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.processElement(KafkaIO.java:1581)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("ip1:9092,ip2:9092")
    .withTopic("feed")
    .withValueSerializer(StringSerializer.class)
    .withKeySerializer(StringSerializer.class)
    //.updateProducerProperties(ImmutableMap.of("security.protocol","PLAINTEXT"))
    //.updateProducerProperties(ImmutableMap.of("sasl.mechanism","PLAIN"))
    .values() // writes values to Kafka with default key
);
Kafka is running on a Bitnami image on Google Cloud, and I am using the Flink runner.
How do I pass security information to Kafka IO?
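One possible approach, as a rough sketch only: build the Kafka producer security settings as a plain map and hand that map to the `updateProducerProperties(...)` call that is commented out in the snippet above. The sketch assumes the broker exposes a SASL/PLAIN listener without TLS; the class name `KafkaSecurityProps`, the helper `saslPlainProps`, and the credentials are hypothetical placeholders. Note that with `security.protocol` set to `PLAINTEXT` the Kafka client does not use SASL at all, so `sasl.mechanism` would have no effect. The "Failed to update metadata" timeout can also mean the brokers are simply not reachable from the Flink workers, in which case no security setting would help.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: collects Kafka producer security settings in one map.
class KafkaSecurityProps {

    // Builds producer properties for SASL/PLAIN over an unencrypted connection.
    // Assumption: the broker has a SASL_PLAINTEXT listener configured.
    static Map<String, Object> saslPlainProps(String username, String password) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Inline JAAS configuration understood by the Kafka client.
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" "
                + "password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder credentials, not taken from the report.
        Map<String, Object> props = saslPlainProps("my-user", "my-password");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // In the pipeline, the map would be passed once instead of two separate calls:
        //   KafkaIO.<String, String>write() ... .updateProducerProperties(props)
    }
}
```

Passing one map keeps all security-related keys together, which makes it easier to compare them with the listener configuration on the broker side.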