Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.10.0, 1.17.0
-
None
Description
Problem description
I am using a protobuf-generated class in a Flink job. When the application runs in production, the job throws the following exception:
java.lang.RuntimeException: Could not create class com.MYClass <==== generated by protobuf at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76) at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:121) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:319) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:494) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left. 
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:127) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332) at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ... 16 common frames omitted
How to reproduce
I think this is similar to another issue: FLINK-29347.
The following example reproduces the problem:
package com.test;

import com.test.ProtobufGeneratedClass;
import com.google.protobuf.Message;
import com.twitter.chill.protobuf.ProtobufSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.nio.charset.StandardCharsets;
import java.util.Random;

/**
 * Reproducer job: two parallel sources emit protobuf records of widely varying size
 * (empty, tiny, tens of kilobytes, and up to ~5 MB). One stream is shuffled, mapped and
 * keyed; the other is broadcast. Both meet in a {@link MyProcessFunction}, which forwards
 * keyed records to the main output and broadcast records to three side outputs.
 *
 * <p>Protobuf messages are serialized through Kryo using chill's {@link ProtobufSerializer}.
 */
@Slf4j
public class app {

    public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_1 =
            new OutputTag<ProtobufGeneratedClass>("output-tag-1") {
            };
    public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_2 =
            new OutputTag<ProtobufGeneratedClass>("output-tag-2") {
            };
    public static final OutputTag<ProtobufGeneratedClass> OUTPUT_TAG_3 =
            new OutputTag<ProtobufGeneratedClass>("output-tag-3") {
            };

    /**
     * Source that emits one randomly sized {@link ProtobufGeneratedClass} roughly every
     * 5 ms until cancelled.
     */
    public static class MySourceFunction extends RichParallelSourceFunction<ProtobufGeneratedClass> {

        private final Random rnd = new Random();
        private final String name;
        // cancel() and run() are invoked from different threads, so the flag must be
        // volatile for the stop request to become visible to the emit loop.
        private volatile boolean running = true;

        private MySourceFunction(String name) {
            this.name = name;
        }

        /**
         * Emits records until {@link #cancel()} is called.
         *
         * @param sourceContext context used to emit records and obtain the checkpoint lock
         * @throws Exception if the thread is interrupted while pausing between records
         */
        @Override
        public void run(SourceContext<ProtobufGeneratedClass> sourceContext) throws Exception {
            while (running) {
                ProtobufGeneratedClass record = nextRecord();
                // Only the emission itself needs the checkpoint lock.
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(record);
                }
                // Pause outside the lock so checkpoints are not blocked while sleeping.
                Thread.sleep(5);
            }
        }

        /** Builds either an empty record or one carrying a payload of random length. */
        private ProtobufGeneratedClass nextRecord() {
            ProtobufGeneratedClass.Builder builder = ProtobufGeneratedClass.newBuilder();
            if (rnd.nextBoolean()) {
                builder.addGraphIds(rnd.nextInt(10));
                byte[] bytes;
                if (rnd.nextInt(10) == 1) {
                    // make sure record is large enough to reproduce the problem
                    // in which case, SpillingAdaptiveSpanningRecordDeserializer#spanningWrapper
                    // may be activated
                    bytes = new byte[rnd.nextInt(5000000)];
                } else if (rnd.nextInt(10) == 2) {
                    bytes = new byte[rnd.nextInt(50000)];
                } else {
                    bytes = new byte[rnd.nextInt(50)];
                }
                // Explicit charset: the payload length must not depend on the platform default.
                builder.addUserTagNames(new String(bytes, StandardCharsets.UTF_8));
            }
            // Otherwise nothing is set on the builder, which yields an empty record.
            return builder.build();
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /**
     * Builds and runs the reproducer topology in a local Flink environment.
     *
     * @param args command-line arguments, exposed as global job parameters
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        Configuration config = new Configuration();
        config.setInteger("state.checkpoints.num-retained", 5);
        config.setInteger("taskmanager.numberOfTaskSlots", 1);
        config.setInteger("local.number-taskmanager", 4);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3, config);
        RocksDBStateBackend rocksDBStateBackend =
                new RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/state/checkpoints/", true);
        env.setParallelism(3);
        env.setStateBackend(rocksDBStateBackend);
        env.getCheckpointConfig().setCheckpointTimeout(100000);
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, Time.seconds(10)));
        // Register chill's ProtobufSerializer as the Kryo serializer for protobuf Messages.
        env.addDefaultKryoSerializer(Message.class, ProtobufSerializer.class);

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        DataStreamSource<ProtobufGeneratedClass> stream1 =
                env.addSource(new MySourceFunction("randomProtobufGeneratedClass1")).setParallelism(4);
        BroadcastStream<ProtobufGeneratedClass> stream2 =
                env.addSource(new MySourceFunction("randomProtobufGeneratedClass2")).setParallelism(3)
                        .broadcast(new MapStateDescriptor[0]);

        SingleOutputStreamOperator<ProtobufGeneratedClass> output = stream1.shuffle()
                .map(new MapFunction<ProtobufGeneratedClass, ProtobufGeneratedClass>() {
                    @Override
                    public ProtobufGeneratedClass map(ProtobufGeneratedClass value) throws Exception {
                        return value;
                    }
                }).setParallelism(2).disableChaining()
                // NOTE(review): the key may be negative and relies on hashCode() being
                // stable across JVMs; verify before reusing this outside a local run.
                .keyBy(x -> x.hashCode() % 10)
                .connect(stream2)
                .process(new MyProcessFunction()).disableChaining();

        output.getSideOutput(OUTPUT_TAG_1).rescale()
                .addSink(new SinkFunction<ProtobufGeneratedClass>() {
                    @Override
                    public void invoke(ProtobufGeneratedClass value) throws Exception {
                        log.info("blah 1");
                    }
                }).setParallelism(1);

        output.getSideOutput(OUTPUT_TAG_2).rescale()
                .addSink(new SinkFunction<ProtobufGeneratedClass>() {
                    @Override
                    public void invoke(ProtobufGeneratedClass value) throws Exception {
                        log.info("blah 2");
                    }
                }).setParallelism(2);

        output.getSideOutput(OUTPUT_TAG_3).rescale()
                .addSink(new SinkFunction<ProtobufGeneratedClass>() {
                    @Override
                    public void invoke(ProtobufGeneratedClass value) throws Exception {
                        log.info("blah 3");
                    }
                }).setParallelism(3);

        output.map(new MapFunction<ProtobufGeneratedClass, String>() {
            @Override
            public String map(ProtobufGeneratedClass value) throws Exception {
                return "" + value.toString().length();
            }
        }).print();

        env.execute("reproduce-the-problem");
    }

    /**
     * Forwards keyed elements unchanged to the main output and copies every broadcast
     * element to all three side outputs.
     *
     * <p>NOTE(review): the first type argument (key type) is declared as
     * {@code ProtobufGeneratedClass}, while the stream in {@code main} is keyed by an
     * {@code Integer} expression; confirm before calling {@code getCurrentKey()}.
     */
    public static class MyProcessFunction
            extends KeyedBroadcastProcessFunction<ProtobufGeneratedClass, ProtobufGeneratedClass,
                    ProtobufGeneratedClass, ProtobufGeneratedClass> {

        @Override
        public void processElement(ProtobufGeneratedClass value,
                                   ReadOnlyContext readOnlyContext,
                                   Collector<ProtobufGeneratedClass> collector) throws Exception {
            collector.collect(value);
        }

        @Override
        public void processBroadcastElement(ProtobufGeneratedClass s,
                                            Context context,
                                            Collector<ProtobufGeneratedClass> collector) throws Exception {
            context.output(OUTPUT_TAG_1, s);
            context.output(OUTPUT_TAG_2, s);
            context.output(OUTPUT_TAG_3, s);
        }
    }
}
Attachments
Issue Links
- links to