Flink / FLINK-33769

ExternalSorter hits "java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted" when using a custom serializer

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.17.2
    • Fix Version/s: None
    • Component/s: Runtime / Task
    • Labels: None

    Description

      The NormalizedKeySorter sorts records in memory. Internally, it stores the serialized records in a SimpleCollectingOutputView backed by a fixed chunk of managed memory. When the SimpleCollectingOutputView runs out of memory segments, it throws an EOFException. The sorter's write method is expected to catch this exception and return false, which (according to the method's javadoc) signals that the sort buffer is full.

      The problem is that the EOFException thrown by the SimpleCollectingOutputView is first caught by the record serializer, and nothing guarantees that the serializer passes it upwards unchanged. Kryo and Thrift serializers, for example, wrap the caught exception in their own exception classes (such as the KryoException in the trace below) and rethrow those. The sorter does not catch the wrapper exceptions, so the job crashes.
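      The failure mode can be illustrated with a minimal, self-contained Java sketch. Every name in it (SortBufferFullDemo, BoundedOutput, RecordSerializer, PASS_THROUGH, WRAPPING, write) is hypothetical and not Flink's API: BoundedOutput stands in for SimpleCollectingOutputView, write stands in for NormalizedKeySorter.write, and WRAPPING imitates a serializer that, like Kryo, wraps the EOFException in an unchecked exception. It shows the contract and how wrapping breaks it, not Flink's actual implementation.

```java
import java.io.EOFException;

public class SortBufferFullDemo {

    // Hypothetical stand-in for SimpleCollectingOutputView: a fixed-capacity
    // buffer that throws EOFException once its capacity would be exceeded.
    static final class BoundedOutput {
        private final byte[] buffer;
        private int position;

        BoundedOutput(int capacity) {
            this.buffer = new byte[capacity];
        }

        void write(byte[] bytes) throws EOFException {
            if (position + bytes.length > buffer.length) {
                throw new EOFException("Can't collect further: memorySource depleted");
            }
            System.arraycopy(bytes, 0, buffer, position, bytes.length);
            position += bytes.length;
        }
    }

    // Hypothetical serializer interface. Real Flink serializers declare
    // IOException; it is narrowed to EOFException here for brevity.
    interface RecordSerializer {
        void serialize(byte[] record, BoundedOutput out) throws EOFException;
    }

    // Lets the EOFException propagate unchanged: the behaviour the sorter relies on.
    static final RecordSerializer PASS_THROUGH = (record, out) -> out.write(record);

    // Imitates Kryo-style behaviour: wraps the EOFException in an unchecked exception.
    static final RecordSerializer WRAPPING = (record, out) -> {
        try {
            out.write(record);
        } catch (EOFException e) {
            throw new RuntimeException(e);
        }
    };

    // Hypothetical stand-in for the sorter's write: returns false when the buffer is full.
    static boolean write(byte[] record, RecordSerializer serializer, BoundedOutput out) {
        try {
            serializer.serialize(record, out);
            return true;
        } catch (EOFException e) {
            return false; // buffer full: the caller can sort/spill and retry
        }
    }

    public static void main(String[] args) {
        byte[] record = new byte[8];

        BoundedOutput a = new BoundedOutput(10);
        System.out.println(write(record, PASS_THROUGH, a)); // true
        System.out.println(write(record, PASS_THROUGH, a)); // false

        BoundedOutput b = new BoundedOutput(10);
        System.out.println(write(record, WRAPPING, b));     // true
        try {
            write(record, WRAPPING, b);                     // wrapped EOFException escapes
        } catch (RuntimeException e) {
            System.out.println("crash: " + e.getCause());
        }
    }
}
```

      With PASS_THROUGH the second write returns false as intended; with WRAPPING the same condition escapes write as a RuntimeException, which mirrors the crash in the trace below.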
       
      Example stack trace:

      java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:487)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
      at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
      at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
      at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.flink.util.WrappingRuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
      at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:262)
      at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1222)
      at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
      ... 6 more
      Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
      at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
      at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
      at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:259)
      ... 9 more
      Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
      at org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:256)
      at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
      at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
      at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
      at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
      at org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:397)
      at org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121)
      at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75)
      Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
      at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
      Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: Can't collect further: memorySource depleted
      at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
      at com.esotericsoftware.kryo.io.OutputChunked.flush(OutputChunked.java:45)
      at com.esotericsoftware.kryo.io.OutputChunked.endChunks(OutputChunked.java:82)
      at com.twitter.beam.coder.scala.ChillCoder.encode(ChillCoder.scala:101)
      at com.twitter.eventwrangler.core.attribution.AttributionEventCoder.encode(AttributionEvent.scala:40)
      at com.twitter.eventwrangler.core.attribution.AttributionEventCoder.encode(AttributionEvent.scala:22)
      at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
      at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:74)
      at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
      at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
      at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
      at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:607)
      at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:598)
      at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:558)
      at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:110)
      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140)
      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37)
      at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.write(NormalizedKeySorter.java:297)
      at org.apache.flink.runtime.operators.sort.SorterInputGateway.writeRecord(SorterInputGateway.java:77)
      at org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:69)
      at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:73)
      Caused by: java.io.EOFException: Can't collect further: memorySource depleted
      at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.nextSegment(SimpleCollectingOutputView.java:76)
      at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:139)
      at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:205)
      at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:44)
      at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
      ... 20 more
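      In the trace above, the EOFException only surfaces as the cause of a KryoException, so a catch clause for EOFException never sees it. One possible sorter-side mitigation, sketched here under stated assumptions, is to inspect the cause chain of unchecked exceptions before treating them as fatal. The names CauseChainCheck, WrappedSerializerException (a stand-in for KryoException), causedByEof, SerializeCall and write are hypothetical and not part of Flink.

```java
import java.io.EOFException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class CauseChainCheck {

    // Hypothetical stand-in for a third-party wrapper such as KryoException.
    static final class WrappedSerializerException extends RuntimeException {
        WrappedSerializerException(Throwable cause) {
            super(cause);
        }
    }

    // Returns true if t or any throwable in its cause chain is an EOFException.
    // The identity set stops the walk if a (rare) cyclic cause chain is encountered.
    static boolean causedByEof(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (cur instanceof EOFException) {
                return true;
            }
        }
        return false;
    }

    // Hypothetical serialization step; may throw a direct or a wrapped EOFException.
    interface SerializeCall {
        void run() throws EOFException;
    }

    // Hypothetical tolerant write: a wrapped EOFException is treated like a direct
    // one (buffer full); any other runtime exception is rethrown unchanged.
    static boolean write(SerializeCall call) {
        try {
            call.run();
            return true;
        } catch (EOFException e) {
            return false;
        } catch (RuntimeException e) {
            if (causedByEof(e)) {
                return false;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        EOFException depleted = new EOFException("Can't collect further: memorySource depleted");

        System.out.println(write(() -> { throw depleted; }));                                 // false
        System.out.println(write(() -> { throw new WrappedSerializerException(depleted); })); // false
        System.out.println(write(() -> { }));                                                 // true
        try {
            write(() -> { throw new IllegalStateException("unrelated failure"); });
        } catch (IllegalStateException e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

      Treating any EOFException found in the chain as "buffer full" assumes that it really originated from the sort buffer's output view and that the serializer's internal state is still usable after the aborted write; both would need to be verified against the real serializers. The alternative is for each serializer or wrapper to rethrow the original EOFException, which keeps the sorter simple but relies on every serializer honouring the contract.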
      


          People

            Assignee: Unassigned
            Reporter: Vishal Palla (vishalp)
            Votes: 0
            Watchers: 1
