Beam / BEAM-13705

OOM / memory-leak in KafkaIO.read using Confluent KafkaAvroDeserializer with SpecificRecord

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P3
    • Resolution: Not A Bug
    • Affects Version/s: 2.35.0
    • Fix Version/s: None
    • Component/s: io-java-kafka
    • Labels: None

    Description

      Details - I am trying to use a generated Avro SpecificRecord subclass with KafkaIO.read (I was able to use it with KafkaIO.write without any problems).

      Problem - An OutOfMemoryError is thrown while constructing the deserializer when the value type is SpecificRecord, but not when it is GenericRecord. I am also unable to use my generated class directly, because I get errors saying it cannot be cast to a GenericRecord (even though it implements that interface through a chain of parent classes). Log and stack trace from the failing run:

      2022-01-19 17:17:47,163 DEBUG [main] options.PipelineOptionsFactory$Builder (PipelineOptionsFactory.java:325) - Provided Arguments: {}
      2022-01-19 17:17:47,345 DEBUG [main] sdk.Pipeline (Pipeline.java:158) - Creating Pipeline#817686795
      2022-01-19 17:17:47,382 DEBUG [main] sdk.Pipeline (Pipeline.java:544) - Adding KafkaIO.Read [KafkaIO.TypedWithoutMetadata] to Pipeline#817686795
      2022-01-19 17:17:47,383 DEBUG [main] sdk.Pipeline (Pipeline.java:544) - Adding KafkaIO.Read to Pipeline#817686795
      2022-01-19 17:17:47,445 DEBUG [main] coders.CoderRegistry (CoderRegistry.java:635) - Coder for [B: ByteArrayCoder
      java.lang.OutOfMemoryError: Java heap space
      Dumping heap to /tmp/beam-dump ...
      Heap dump file created [1086964638 bytes in 1.315 secs]
      Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
      	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap$HashEntry.newArray(BoundedConcurrentHashMap.java:247)
      	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap$Segment.<init>(BoundedConcurrentHashMap.java:1200)
      	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1637)
      	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1670)
      	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1654)
      	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1683)
      	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:181)
      	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:170)
      	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:136)
      	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:98)
      	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.lambda$of$282520f2$1(ConfluentSchemaRegistryDeserializerProvider.java:93)
      	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider$$Lambda$70/1932332324.apply(Unknown Source)
      	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaRegistryClient(ConfluentSchemaRegistryDeserializerProvider.java:134)
      	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:126)
      	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:120)
      	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:116)
      	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1476)
      	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1256)
      	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:605)
      	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
      	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
      	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
      	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1555)
      	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1529)
      	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
      	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
      	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
      	at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:177)
      	at cricket.jmoore.jmx.Main.main(Main.java:98)
      
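      The trace shows the failed allocation happening inside the Confluent CachedSchemaRegistryClient constructor, which Beam creates in ConfluentSchemaRegistryDeserializerProvider.getSchemaRegistryClient. As a hypothetical isolation step (not part of this report), the same client can be constructed directly against the local registry; the snippet below assumes it runs inside a main(...) throws Exception like the example further down, and the cache-capacity argument is an arbitrary placeholder, not necessarily what Beam passes internally.

        // Classes come from io.confluent.kafka.schemaregistry.client.
        // Build the Confluent client directly against the same registry and fetch the
        // latest schema for the topic's value subject ("foobar-2" + TopicNameStrategy).
        SchemaRegistryClient client =
            new CachedSchemaRegistryClient("http://localhost:8081", 1000);
        SchemaMetadata latest = client.getLatestSchemaMetadata("foobar-2-value");
        System.out.println(latest.getSchema());
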

      A small example, run against Kafka and the Confluent Schema Registry locally:

        public static void main(String[] args) throws Exception {
      
          PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
      //    Pipeline p = getWritePipeline(options);
      
          Pipeline p = Pipeline.create(options);
      
          final String topic = "foobar-2";
          final SubjectNameStrategy subjectStrategy = new TopicNameStrategy();
          final String valueSubject = subjectStrategy.subjectName(topic, false, null); // schema not used
          final ConfluentSchemaRegistryDeserializerProvider<SpecificRecord> valueProvider =
              ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", valueSubject, null,
                                                             // TODO: This doesn't seem to work to get the SpecificRecord subclass in the apply function below
                                                             ImmutableMap.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));
          p
              .apply(KafkaIO.<byte[], SpecificRecord>read()
                         .withBootstrapServers("localhost:9092")
                         .withTopic(topic)
                         .withKeyDeserializer(ByteArrayDeserializer.class) // Don't have any keys, but this is required
                         .withValueDeserializer(valueProvider)
                         .withConsumerConfigUpdates(ImmutableMap.of(
                             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT),
                             ConsumerConfig.GROUP_ID_CONFIG, "beam-" + UUID.randomUUID()
                         ))
                         .withoutMetadata()
              ).apply(Values.create())
              // TODO: How to get SpecificRecord subclass?
              .apply(MapElements.via(new SimpleFunction<SpecificRecord, Void>() {
                @Override
                public Void apply(SpecificRecord input) {
                  log.info("{}", input);
                  return null;
                }
              }));
      
          p.run().waitUntilFinish();
        }
      

      Avro schema that I am using, which generates a class Product.java that I would like to use in place of SpecificRecord above.

      {"type":"record","name":"Product","namespace":"cricket.jmoore.avro","fields":[{"name":"name","type":"string"}]}
      
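      For comparison, reading the same topic as GenericRecord does not hit the OOM, as described above. Below is a rough sketch of that variant, plus one hypothetical, untested way to address the TODO about getting the SpecificRecord subclass: converting each GenericRecord into the generated Product class with Avro's SpecificData. It reuses p, topic, and valueSubject from the example above; none of this is from the original report.

        final ConfluentSchemaRegistryDeserializerProvider<GenericRecord> genericProvider =
            ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", valueSubject);

        p
            .apply(KafkaIO.<byte[], GenericRecord>read()
                       .withBootstrapServers("localhost:9092")
                       .withTopic(topic)
                       .withKeyDeserializer(ByteArrayDeserializer.class)
                       .withValueDeserializer(genericProvider)
                       .withoutMetadata())
            .apply(Values.create())
            // Hypothetical workaround: SpecificData resolves "cricket.jmoore.avro.Product"
            // on the classpath and deep-copies the generic record into the generated class.
            .apply(MapElements.via(new SimpleFunction<GenericRecord, Product>() {
              @Override
              public Product apply(GenericRecord input) {
                return (Product) SpecificData.get().deepCopy(Product.getClassSchema(), input);
              }
            }))
            // The generated class has no coder registered by default, so set one explicitly.
            .setCoder(AvroCoder.of(Product.class));
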

      Beam Version: 2.35.0
      Confluent Version: 7.0.1 (the error seems to come from here... I will try downgrading this)

      Dependencies:

              <dependency>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka-clients</artifactId>
                  <version>2.8.1</version>
              </dependency>
              <dependency>
                  <groupId>io.confluent</groupId>
                  <artifactId>kafka-avro-serializer</artifactId>
                  <version>7.0.1</version>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.beam</groupId>
                  <artifactId>beam-runners-direct-java</artifactId>
                  <version>${beam.version}</version> <!-- 2.35.0 -->
                  <scope>provided</scope>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.beam</groupId>
                  <artifactId>beam-sdks-java-core</artifactId>
                  <version>${beam.version}</version>
              </dependency>
      
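      The Product class above is generated from the Avro schema; the report does not say how the class is produced. Assuming the standard Avro Maven plugin, a minimal sketch of that build configuration (plugin version and directories are placeholders):

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.10.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                            <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
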

          People

            Assignee: Unassigned
            Reporter: Jordan Moore (cricket007)
            Votes: 0
            Watchers: 1
