FLINK-31708

RuntimeException/KryoException thrown when deserializing an empty protobuf record



    Description

      Problem description

      I am using a protobuf-generated class in a Flink job. When the application runs in production, the job throws the following exception:

      java.lang.RuntimeException: Could not create class com.MYClass <==== generated by protobuf
              at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
              at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
              at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
              at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
              at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
              at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
              at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
              at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
              at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:121)
              at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:319)
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:494)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
              at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:127)
              at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
              at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
              ... 16 common frames omitted
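
      From the stack trace, the failure happens while chill-protobuf's ProtobufSerializer reads a record back. Below is a paraphrased sketch of the relevant write/read path, from my reading of the library source (simplified; the real class caches the parseFrom method via reflection). The key point: an empty protobuf message serializes to a length of 0 followed by zero payload bytes, so on the read side readBytes is invoked with a count of 0, and that is the call that surfaces the EOFException above.

      import com.esotericsoftware.kryo.Kryo;
      import com.esotericsoftware.kryo.Serializer;
      import com.esotericsoftware.kryo.io.Input;
      import com.esotericsoftware.kryo.io.Output;
      import com.google.protobuf.Message;

      // Paraphrase of com.twitter.chill.protobuf.ProtobufSerializer, for
      // illustration only; see the chill-protobuf source for the real code.
      public class ProtobufSerializerSketch extends Serializer<Message> {
        @Override
        public void write(Kryo kryo, Output output, Message message) {
          byte[] bytes = message.toByteArray(); // empty message => bytes.length == 0
          output.writeInt(bytes.length, true);
          output.writeBytes(bytes);             // writes no payload for an empty message
        }

        @Override
        public Message read(Kryo kryo, Input input, Class<Message> type) {
          try {
            int size = input.readInt(true);     // 0 for an empty message
            byte[] bytes = new byte[size];
            input.readBytes(bytes);             // count == 0; ProtobufSerializer.java:73 above
            // chill invokes the generated parseFrom(byte[]) reflectively:
            return (Message) type.getMethod("parseFrom", byte[].class).invoke(null, bytes);
          } catch (Exception e) {
            throw new RuntimeException("Could not create " + type, e);
          }
        }
      }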
       

      How to reproduce

      I think this is similar to another issue: FLINK-29347.

      The following is an example that reproduces the problem:

      package com.test;
      
      import com.test.ProtobufGeneratedClass;
      
      import com.google.protobuf.Message;
      import com.twitter.chill.protobuf.ProtobufSerializer;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.common.restartstrategy.RestartStrategies;
      import org.apache.flink.api.common.state.MapStateDescriptor;
      import org.apache.flink.api.common.time.Time;
      import org.apache.flink.api.java.utils.MultipleParameterTool;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.BroadcastStream;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
      import org.apache.flink.streaming.api.functions.sink.SinkFunction;
      import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
      import org.apache.flink.util.Collector;
      import org.apache.flink.util.OutputTag;
      
      import java.util.Random;
      @Slf4j
      public class App {
        public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_1 =
            new OutputTag<ProtobufGeneratedClass>("output-tag-1") {
        };
      
        public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_2 =
            new OutputTag<ProtobufGeneratedClass>("output-tag-2") {
        };
      
        public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_3 =
            new OutputTag<ProtobufGeneratedClass>("output-tag-3") {
        };
      
        public static class MySourceFunction extends RichParallelSourceFunction<ProtobufGeneratedClass> {
          Random rnd = new Random();
          private final String name;
      
          private volatile boolean running = true;
      
          private MySourceFunction(String name) {
            this.name = name;
          }
      
          @Override
          public void run(SourceContext<ProtobufGeneratedClass> sourceContext) throws Exception {
            final int index = getRuntimeContext().getIndexOfThisSubtask();
            int counter = 0;
      
            while (running) {
              synchronized (sourceContext.getCheckpointLock()) {
                ++counter;
                ProtobufGeneratedClass.Builder builder = ProtobufGeneratedClass.newBuilder();
                if (rnd.nextBoolean()) {
      
                  builder.addGraphIds(rnd.nextInt(10));
                  byte[] bytes;
                  if (rnd.nextInt(10) == 1) {
                    // make sure record is large enough to reproduce the problem
                    // in which case, SpillingAdaptiveSpanningRecordDeserializer#spanningWrapper may be activated
                    bytes = new byte[rnd.nextInt(5000000)];
                  } else if (rnd.nextInt(10) == 2) {
                    bytes = new byte[rnd.nextInt(50000)];
                  } else {
                    bytes = new byte[rnd.nextInt(50)];
                  }
                  builder.addUserTagNames(new String(bytes));
                } else {
                  // create an empty record by doing nothing
                }
                sourceContext.collect(builder.build());
                Thread.sleep(5);
              }
            }
          }
      
          @Override
          public void cancel() {
            running = false;
          }
        }
      
        public static void main(String[] args) throws Exception {
      
          final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
      
          // set up the execution environment
          Configuration config = new Configuration();
          config.setInteger("state.checkpoints.num-retained", 5);
          config.setInteger("taskmanager.numberOfTaskSlots", 1);
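          // several single-slot local TaskManagers, so records cross TaskManager
          // boundaries and take the network deserialization path from the stack trace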
          config.setInteger("local.number-taskmanager", 4);
          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3, config);
      
          RocksDBStateBackend rocksDBStateBackend =
              new RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/state/checkpoints/", true);
      
          env.setParallelism(3);
          env.setStateBackend(rocksDBStateBackend);
          env.getCheckpointConfig().setCheckpointTimeout(100000);
          env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
          env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, Time.seconds(10)));
          env.addDefaultKryoSerializer(Message.class, ProtobufSerializer.class); // route all protobuf Message types through chill's ProtobufSerializer
      
          // make parameters available in the web interface
          env.getConfig().setGlobalJobParameters(params);
      
          DataStreamSource<ProtobufGeneratedClass> stream1 = env.addSource(new MySourceFunction("randomProtobufGeneratedClass1")).setParallelism(4);
          BroadcastStream<ProtobufGeneratedClass> stream2 = env.addSource(new MySourceFunction("randomProtobufGeneratedClass2")).setParallelism(3)
              .broadcast(new MapStateDescriptor[0]);
          SingleOutputStreamOperator<ProtobufGeneratedClass> output = stream1.shuffle()
              .map(new MapFunction<ProtobufGeneratedClass, ProtobufGeneratedClass>() {
      
                @Override
                public ProtobufGeneratedClass map(ProtobufGeneratedClass value) throws Exception {
                  return value;
                }
              }).setParallelism(2).disableChaining()
              .keyBy(x -> x.hashCode() % 10)
              .connect(stream2)
              .process(new MyProcessFunction()).disableChaining();
      
          output.getSideOutput(OUTPUT_TAG_1).rescale()
              .addSink(new SinkFunction<ProtobufGeneratedClass>() {
                @Override
                public void invoke(ProtobufGeneratedClass value) throws Exception {
                  log.info("blah 1");
                }
              }).setParallelism(1);
      
          output.getSideOutput(OUTPUT_TAG_2).rescale()
              .addSink(new SinkFunction<ProtobufGeneratedClass>() {
                @Override
                public void invoke(ProtobufGeneratedClass value) throws Exception {
                  log.info("blah 2");
                }
              }).setParallelism(2);
      
          output.getSideOutput(OUTPUT_TAG_3).rescale()
              .addSink(new SinkFunction<ProtobufGeneratedClass>() {
                @Override
                public void invoke(ProtobufGeneratedClass value) throws Exception {
                  log.info("blah 3");
                }
              }).setParallelism(3);
      
          output.map(new MapFunction<ProtobufGeneratedClass, String>() {
            @Override
            public String map(ProtobufGeneratedClass value) throws Exception {
              return "" + value.toString().length();
            }
          }).print();
          env.execute("reproduce-the-problem");
        }
      
        public static class MyProcessFunction extends
            KeyedBroadcastProcessFunction<Integer, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass> {
      
          @Override
          public void processElement(ProtobufGeneratedClass value,
              KeyedBroadcastProcessFunction<Integer, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass>.ReadOnlyContext readOnlyContext,
              Collector<ProtobufGeneratedClass> collector) throws Exception {
            collector.collect(value);
          }
      
          @Override
          public void processBroadcastElement(ProtobufGeneratedClass s,
              KeyedBroadcastProcessFunction<Integer, ProtobufGeneratedClass, ProtobufGeneratedClass, ProtobufGeneratedClass>.Context context,
              Collector<ProtobufGeneratedClass> collector) throws Exception {
            context.output(OUTPUT_TAG_1, s);
            context.output(OUTPUT_TAG_2, s);
            context.output(OUTPUT_TAG_3, s);
          }
        }
      }
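
      When this job runs long enough, an empty record eventually ends exactly at a network buffer boundary and the trace above appears. My guess at the low-level trigger, based on the stack trace and the similarity to FLINK-29347, is sketched below: NoFetchingInput#readBytes appears to treat a -1 return from the underlying stream as end-of-input even when zero bytes were requested. The stream here is a hypothetical stand-in I wrote for illustration, not Flink's actual network stack.

      import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;

      import java.io.InputStream;

      public class EmptyReadAtEndOfInput {
        public static void main(String[] args) {
          // Hypothetical stand-in for an exhausted spanning buffer: it reports
          // -1 even for a zero-length read request.
          InputStream exhausted = new InputStream() {
            @Override
            public int read() {
              return -1; // end of input
            }

            @Override
            public int read(byte[] b, int off, int len) {
              return -1; // -1 even when len == 0
            }
          };

          // If readBytes checks for -1 before checking whether any bytes are
          // still needed, deserializing the zero-length payload of an empty
          // protobuf record throws
          // "KryoException: java.io.EOFException: No more bytes left."
          NoFetchingInput input = new NoFetchingInput(exhausted);
          input.readBytes(new byte[0]);
        }
      }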
      
      

People

  Assignee: shenjiaqi shen
  Reporter: shenjiaqi shen