Apache NiFi / NIFI-7249

[Regression] AvroReader: Could not parse incoming data


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.12.0, 1.11.4
    • Component/s: Extensions
    • Labels: None
    • Environment: Debian, Java 11 and Java 8

    Description

      Assessment

      AvroTypeUtil.convertUnionFieldValue contains the following:

              Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
                      originalValue,
                      fieldSchema.getTypes().stream().filter(schema -> schema.getType() != Type.NULL).collect(Collectors.toList()),
                      subSchema -> AvroTypeUtil.determineDataType(subSchema)
              );
      

      which in turn (in DataTypeUtils.findMostSuitableType) calls:

      DataType inferredDataType = inferDataType(value, null);
      

      and further down that call chain:

              if (value instanceof Map) {
                  final Map<String, ?> map = (Map<String, ?>) value;
      

      originalValue (value further down the chain) is a map extracted from an Avro record, and its keys are org.apache.avro.util.Utf8 instances instead of String.
      More generally, however, the issue is that we are now dealing with an Avro-specific object where previously only NiFi-specific value objects were processed.
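      The sketch below shows why an unchecked cast to Map<String, ?> succeeds but the code still fails later. It is plain Java with no Avro dependency. FakeUtf8 is a hypothetical stand-in for org.apache.avro.util.Utf8: like the real class it is a CharSequence but not a String. Utf8KeyDemo is also a hypothetical name.

      Because of generic type erasure, the cast itself never fails. The ClassCastException appears only when a key is used as a String. This is consistent with the failure in DataTypeUtils.inferRecordDataType shown in the stack traces of this report.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.avro.util.Utf8: a CharSequence that is not a String.
final class FakeUtf8 implements CharSequence {
    private final String value;

    FakeUtf8(String value) {
        this.value = value;
    }

    @Override public int length() { return value.length(); }
    @Override public char charAt(int index) { return value.charAt(index); }
    @Override public CharSequence subSequence(int start, int end) { return value.subSequence(start, end); }
    @Override public String toString() { return value; }
}

final class Utf8KeyDemo {
    // Mimics the pattern from the assessment: an unchecked cast of a Map to Map<String, ?>,
    // followed by code that treats every key as a String.
    @SuppressWarnings("unchecked")
    static boolean readingKeysAsStringsFails(Object value) {
        if (value instanceof Map) {
            final Map<String, ?> map = (Map<String, ?>) value; // never fails: generics are erased
            try {
                for (String key : map.keySet()) { // the compiler inserts a cast to String here
                    key.length();
                }
                return false;
            } catch (ClassCastException e) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<CharSequence, Object> avroLikeMap = new HashMap<>();
        avroLikeMap.put(new FakeUtf8("k"), "x");
        System.out.println(readingKeysAsStringsFails(avroLikeMap)); // true

        Map<String, Object> plainMap = new HashMap<>();
        plainMap.put("k", "x");
        System.out.println(readingKeysAsStringsFails(plainMap)); // false
    }
}
```

      The same Map that fails with FakeUtf8 keys works once its keys are real String objects. That is what the first approach below relies on.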

      There are multiple approaches to fix this:

      1. Treat this special case as a technical issue. We accept that Avro objects can leak into this layer and make the layer handle them as needed, i.e. transform the Avro map into one whose keys are String objects.
      2. Treat this as an error-handling issue. Inference can be a best-effort attempt, and on error we can fall back to the original logic. Inference was added here to choose the best-matching type from a UNION/CHOICE. If inference does not yield a result, the original logic iterates over all types within the UNION/CHOICE and selects the first compatible one. When a Map is in a UNION/CHOICE, the other types do not pose compatibility issues, so the original logic works well.
        (1. and 2. are not mutually exclusive.)
      3. Enhance the inference logic so that the Avro object is converted to a general object before inference occurs. This would prevent Avro (or other third-party) objects from leaking into the framework's format-agnostic layer.
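      The sketch below shows how approaches 1 and 2 might combine. It is library-free Java and not necessarily how this issue was actually fixed. UnionValueSketch, normalize and inferOrFallback are all hypothetical names.

      • normalize recursively copies maps and lists and turns every non-String CharSequence (such as Avro's Utf8) into a String. This is approach 1, and applying it before inference is in the spirit of approach 3.
      • inferOrFallback treats inference as best effort. On a ClassCastException, or an empty result, it returns the first compatible candidate type, as in approach 2.

      The inference and compatibility checks are passed in as functions because NiFi's real signatures are not reproduced here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Function;

final class UnionValueSketch {

    // Approaches 1/3: copy a value so that no third-party CharSequence (e.g. Avro Utf8)
    // survives as a map key, map value or list element.
    static Object normalize(Object value) {
        if (value instanceof CharSequence && !(value instanceof String)) {
            return value.toString();
        }
        if (value instanceof Map) {
            Map<String, Object> copy = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                Object key = entry.getKey();
                copy.put(key == null ? null : key.toString(), normalize(entry.getValue()));
            }
            return copy;
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object element : (List<?>) value) {
                copy.add(normalize(element));
            }
            return copy;
        }
        return value;
    }

    // Approach 2: inference is best effort. If it throws a ClassCastException or finds nothing,
    // fall back to the pre-inference behaviour: the first candidate compatible with the value.
    static <T> Optional<T> inferOrFallback(Object value,
                                           List<T> candidates,
                                           Function<Object, Optional<T>> inference,
                                           BiPredicate<Object, T> isCompatible) {
        try {
            Optional<T> inferred = inference.apply(value);
            if (inferred.isPresent()) {
                return inferred;
            }
        } catch (ClassCastException e) {
            // Ignore and fall through to the original first-compatible-type logic.
        }
        for (T candidate : candidates) {
            if (isCompatible.test(value, candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // StringBuilder plays the role of a non-String CharSequence such as Utf8.
        Map<Object, Object> avroLike = new LinkedHashMap<>();
        avroLike.put(new StringBuilder("k"), List.of(new StringBuilder("v")));
        System.out.println(normalize(avroLike)); // {k=[v]}
    }
}
```

      Catching only ClassCastException keeps the fallback narrow, so that unrelated bugs in inference still surface. A real fix might choose a broader or narrower policy.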

      Issue report

      Severe regression in version 1.11.3 compared to 1.9.2:

      Record-based processors can no longer deserialize Avro messages. Examples:

      • ConsumeKafkaRecord: with embedded Avro schema or using Confluent Schema Registry
      • ConvertRecord: with embedded Avro schema or using Confluent Schema Registry, too
      • probably others as well...

      Error messages:

      ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d]
      Failed to process StandardFlowFileRecord[uuid=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,claim=StandardContentClaim 
      [resourceClaim=StandardResourceClaim[id=1584002690648-1091, container=default, section=67], offset=276387, length=3487]
      ,offset=0,name=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,size=3487]; will route to failure:
      Could not parse incoming data
      ConsumeKafkaRecord_2_0[id=d9ebdbda-51b7-38ce-b43e-3197322bd2e1]
      Failed to parse message from Kafka using the configured Record Reader.
      Will route message as its own FlowFile to the 'parse.failure' relationship: org.apache.nifi.serialization.MalformedRecordException:
      Error while getting next record. Root cause: java.lang.ClassCastException
      

      However, messages with an embedded schema can be converted to JSON flawlessly using ConvertAvroToJSON.

      The behavior has been confirmed using several different flows and configurations with different Java versions. A downgrade to NiFi 1.9.2 resolves the issue; a subsequent upgrade to 1.11.3 brings it back.

      A minimal example template is attached (AvroReader_bug_MWE.xml).

      Stack traces:

      2020-03-12 09:37:16,628 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.avro.AvroTypeUtil fail to convert field tags
      java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
              at org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
              at org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
              at org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
              at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
              at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
              at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
              at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
              at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
              at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
              at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
              at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
              at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
              at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
              at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
              at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
              at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
              at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
              at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:834)
      2020-03-12 09:37:16,632 ERROR [Timer-Driven Process Thread-4] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d] Failed to process StandardFlowFileRecord[uuid=33856f9d-1991-4c95-90c2-3ffd032fc840,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1584005835899-1, container=default, section=1], offset=851, length=3487],offset=0,name=33856f9d-1991-4c95-90c2-3ffd032fc840,size=3487]; will route to failure: org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
      org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
              at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:171)
              at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
              at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
              at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
              at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
              at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
              at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
              at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
              at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
              at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:52)
              at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
              at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
              ... 13 common frames omitted
      Caused by: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
              at org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
              at org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
              at org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
              at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
              at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
              at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
              at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
              at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
              ... 15 common frames omitted

      Attachments

        1. AvroReader_bug_MWE.xml (35 kB, Philipp Leufke)

            People

              Assignee: Matt Burgess (mattyb149)
              Reporter: Philipp Leufke (pleufke)
              Votes: 0
              Watchers: 3

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 1h 40m